# Listing metadata (from the code viewer): 208 lines, 8.2 KiB, Python.
from __future__ import annotations
|
|
|
|
import tempfile
|
|
import unittest
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from cold_display_guard.cases import CaseStore, append_case_snapshots, load_case_snapshots
|
|
|
|
|
|
# Shared UTC tzinfo; the base timestamp built in CaseStoreTests.setUp uses it.
UTC = timezone.utc
|
|
|
|
|
|
def event(
    event_name: str,
    when: datetime,
    *,
    batch_id: str = "batch_000001",
    zone_id: str = "1",
    zone_label: str = "区域 1",
    camera_id: str = "cam_01",
    severity: str = "info",
    state: str = "active",
) -> dict[str, object]:
    """Build one batch-event payload dict for the tests in this module.

    Args:
        event_name: Value stored under the ``"event"`` key.
        when: Event time; stored under ``"ts"`` as ``when.isoformat()``.
        batch_id: Value stored under ``"batch_id"``.
        zone_id: Value stored under ``"zone_id"``.
        zone_label: Value stored under ``"zone_label"``.
        camera_id: Value stored under ``"camera_id"``.
        severity: Value stored under ``"severity"``.
        state: Value stored under ``"state"``.

    Returns:
        A new dict holding the eight fields above, in a fixed key order.
    """
    # Keys and values are listed in lockstep so the insertion order of the
    # resulting dict is explicit.
    field_names = (
        "event",
        "ts",
        "batch_id",
        "camera_id",
        "zone_id",
        "zone_label",
        "severity",
        "state",
    )
    field_values = (
        event_name,
        when.isoformat(),
        batch_id,
        camera_id,
        zone_id,
        zone_label,
        severity,
        state,
    )
    return dict(zip(field_names, field_values))
|
|
|
|
|
|
class CaseStoreTests(unittest.TestCase):
    """Exercise ``CaseStore`` case transitions and the JSONL snapshot helpers.

    Each test starts from a fresh ``CaseStore`` and feeds it payloads built by
    the module-level ``event`` helper, relying on that helper's default
    batch, zone and camera identifiers.
    """

    def setUp(self) -> None:
        # Fixed, timezone-aware base instant; later events only vary the minute.
        self.t0 = datetime(2026, 6, 9, 9, 0, tzinfo=UTC)

    # ------------------------------------------------------------------
    # Private helpers (not collected as tests: names do not start with "test")
    # ------------------------------------------------------------------

    def _minute(self, value: int) -> datetime:
        """Return ``t0`` with its minute field replaced by *value*."""
        return self.t0.replace(minute=value)

    def _pre_warning_event(self, when: datetime) -> dict[str, object]:
        """Build a ``time_pre_warning`` payload at *when*."""
        return event("time_pre_warning", when, severity="warning", state="pre_warning")

    def _alarm_event(self, when: datetime) -> dict[str, object]:
        """Build a ``time_alarm`` payload at *when*."""
        return event("time_alarm", when, severity="alarm", state="alerted")

    def _pending_disposal_event(self, when: datetime) -> dict[str, object]:
        """Build a ``batch_pending_disposal`` payload at *when*."""
        return event("batch_pending_disposal", when, severity="warning", state="pending_disposal")

    def _expect_single(self, emitted: list, expected: dict[str, str]) -> None:
        """Assert *emitted* holds exactly one snapshot carrying *expected* fields.

        Fields are checked in the iteration order of *expected*.
        """
        self.assertEqual(len(emitted), 1)
        only = emitted[0]
        for field, wanted in expected.items():
            self.assertEqual(only[field], wanted)

    # ------------------------------------------------------------------
    # Case creation
    # ------------------------------------------------------------------

    def test_time_alarm_creates_open_case(self) -> None:
        cases = CaseStore()

        emitted = cases.apply_batch_events([self._alarm_event(self.t0)])

        self._expect_single(
            emitted,
            {
                "case_type": "time_alarm",
                "case_status": "open",
                "source_event": "time_alarm",
            },
        )

    def test_time_pre_warning_creates_open_pre_warning_case(self) -> None:
        cases = CaseStore()

        emitted = cases.apply_batch_events([self._pre_warning_event(self.t0)])

        self._expect_single(
            emitted,
            {
                "case_type": "pre_warning",
                "case_status": "open",
                "source_event": "time_pre_warning",
            },
        )

    # ------------------------------------------------------------------
    # Transitions driven by follow-up events
    # ------------------------------------------------------------------

    def test_pre_warning_handled_auto_closes_open_case(self) -> None:
        cases = CaseStore()
        cases.apply_batch_events([self._pre_warning_event(self.t0)])

        handled_event = event(
            "pre_warning_handled",
            self._minute(1),
            severity="info",
            state="handled",
        )
        emitted = cases.apply_batch_events([handled_event])

        self._expect_single(
            emitted,
            {
                "case_status": "handled",
                "handled_source": "auto_removed_before_alarm",
            },
        )

    def test_time_alarm_upgrades_pre_warning_case(self) -> None:
        cases = CaseStore()
        cases.apply_batch_events([self._pre_warning_event(self.t0)])

        emitted = cases.apply_batch_events([self._alarm_event(self._minute(2))])

        self._expect_single(
            emitted,
            {
                "case_type": "time_alarm",
                "case_status": "open",
                "source_event": "time_alarm",
            },
        )

    def test_alarm_removal_timeout_upgrades_same_case(self) -> None:
        cases = CaseStore()
        cases.apply_batch_events([self._pre_warning_event(self.t0)])
        cases.apply_batch_events([self._alarm_event(self._minute(2))])

        timeout_event = event(
            "alarm_removal_timeout",
            self._minute(3),
            severity="alarm",
            state="alarm_removal_timeout",
        )
        emitted = cases.apply_batch_events([timeout_event])

        self._expect_single(
            emitted,
            {
                "case_type": "alarm_removal_timeout",
                "case_status": "open",
                "source_event": "alarm_removal_timeout",
            },
        )

    def test_pending_disposal_upgrades_existing_case(self) -> None:
        cases = CaseStore()
        cases.apply_batch_events([self._alarm_event(self.t0)])

        emitted = cases.apply_batch_events([self._pending_disposal_event(self._minute(1))])

        self._expect_single(
            emitted,
            {
                "case_type": "pending_disposal",
                "case_status": "open",
                "source_event": "batch_pending_disposal",
            },
        )

    def test_warning_escalated_upgrades_same_case(self) -> None:
        cases = CaseStore()
        cases.apply_batch_events([self._alarm_event(self.t0)])
        cases.apply_batch_events([self._pending_disposal_event(self._minute(1))])

        escalation = event(
            "warning_escalated",
            self._minute(2),
            severity="warning",
            state="warning",
        )
        emitted = cases.apply_batch_events([escalation])

        self._expect_single(
            emitted,
            {
                "case_type": "warning_escalated",
                "case_status": "open",
                "source_event": "warning_escalated",
            },
        )

    def test_batch_discarded_auto_closes_open_case(self) -> None:
        cases = CaseStore()
        cases.apply_batch_events([self._alarm_event(self.t0)])

        discard = event(
            "batch_discarded",
            self._minute(3),
            severity="info",
            state="discarded",
        )
        emitted = cases.apply_batch_events([discard])

        self._expect_single(
            emitted,
            {
                "case_status": "handled",
                "handled_source": "auto_closed",
            },
        )

    # ------------------------------------------------------------------
    # Explicit handling via mark_handled
    # ------------------------------------------------------------------

    def test_manual_handle_closes_case(self) -> None:
        cases = CaseStore()
        opened = cases.apply_batch_events([self._alarm_event(self.t0)])[0]
        case_id = str(opened["case_id"])

        result = cases.mark_handled(
            case_id,
            handled_at=self._minute(4),
            handled_by="alice",
            handled_source="manual",
            note="checked",
        )

        self.assertEqual(result["case_status"], "handled")
        self.assertEqual(result["handled_source"], "manual")
        self.assertEqual(result["handled_by"], "alice")
        self.assertEqual(result["payload"]["note"], "checked")

    def test_callback_handle_closes_case(self) -> None:
        cases = CaseStore()
        opened = cases.apply_batch_events([self._alarm_event(self.t0)])[0]
        case_id = str(opened["case_id"])

        result = cases.mark_handled(
            case_id,
            handled_at=self._minute(5),
            handled_by="crm-bot",
            handled_source="webhook_callback",
            source_ref="crm-123",
        )

        self.assertEqual(result["case_status"], "handled")
        self.assertEqual(result["handled_source"], "webhook_callback")
        self.assertEqual(result["payload"]["source_ref"], "crm-123")

    def test_handled_case_does_not_reopen_on_stale_event(self) -> None:
        cases = CaseStore()
        opened = cases.apply_batch_events([self._alarm_event(self.t0)])[0]
        cases.mark_handled(
            str(opened["case_id"]),
            handled_at=self._minute(5),
            handled_by="alice",
            handled_source="manual",
        )

        # The follow-up event is stamped at minute 1, earlier than the
        # minute-5 handled_at above.
        stale = self._pending_disposal_event(self._minute(1))
        emitted = cases.apply_batch_events([stale])

        self.assertEqual(emitted, [])
        latest = cases.latest_cases()[0]
        self.assertEqual(latest["case_status"], "handled")
        self.assertEqual(latest["handled_source"], "manual")

    # ------------------------------------------------------------------
    # Persistence helpers
    # ------------------------------------------------------------------

    def test_case_snapshots_round_trip_through_jsonl(self) -> None:
        cases = CaseStore()
        emitted = cases.apply_batch_events([self._alarm_event(self.t0)])

        with tempfile.TemporaryDirectory() as scratch_dir:
            jsonl_path = Path(scratch_dir) / "cases.jsonl"
            append_case_snapshots(jsonl_path, emitted)
            loaded = load_case_snapshots(jsonl_path)

        self.assertEqual(len(loaded), 1)
        self.assertEqual(loaded[0]["case_type"], "time_alarm")
|
|
|
|
|
|
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
|