from __future__ import annotations import tempfile import unittest from datetime import datetime, timezone from pathlib import Path from cold_display_guard.cases import CaseStore, append_case_snapshots, load_case_snapshots UTC = timezone.utc def event( event_name: str, when: datetime, *, batch_id: str = "batch_000001", zone_id: str = "1", zone_label: str = "区域 1", camera_id: str = "cam_01", severity: str = "info", state: str = "active", ) -> dict[str, object]: return { "event": event_name, "ts": when.isoformat(), "batch_id": batch_id, "camera_id": camera_id, "zone_id": zone_id, "zone_label": zone_label, "severity": severity, "state": state, } class CaseStoreTests(unittest.TestCase): def setUp(self) -> None: self.t0 = datetime(2026, 6, 9, 9, 0, tzinfo=UTC) def test_time_alarm_creates_open_case(self) -> None: store = CaseStore() snapshots = store.apply_batch_events([event("time_alarm", self.t0, severity="alarm", state="alerted")]) self.assertEqual(len(snapshots), 1) self.assertEqual(snapshots[0]["case_type"], "time_alarm") self.assertEqual(snapshots[0]["case_status"], "open") self.assertEqual(snapshots[0]["source_event"], "time_alarm") def test_time_pre_warning_creates_open_pre_warning_case(self) -> None: store = CaseStore() snapshots = store.apply_batch_events( [event("time_pre_warning", self.t0, severity="warning", state="pre_warning")] ) self.assertEqual(len(snapshots), 1) self.assertEqual(snapshots[0]["case_type"], "pre_warning") self.assertEqual(snapshots[0]["case_status"], "open") self.assertEqual(snapshots[0]["source_event"], "time_pre_warning") def test_pre_warning_handled_auto_closes_open_case(self) -> None: store = CaseStore() store.apply_batch_events([event("time_pre_warning", self.t0, severity="warning", state="pre_warning")]) snapshots = store.apply_batch_events( [event("pre_warning_handled", self.t0.replace(minute=1), severity="info", state="handled")] ) self.assertEqual(len(snapshots), 1) self.assertEqual(snapshots[0]["case_status"], "handled") self.assertEqual(snapshots[0]["handled_source"], "auto_removed_before_alarm") def test_time_alarm_upgrades_pre_warning_case(self) -> None: store = CaseStore() store.apply_batch_events([event("time_pre_warning", self.t0, severity="warning", state="pre_warning")]) snapshots = store.apply_batch_events([event("time_alarm", self.t0.replace(minute=2), severity="alarm", state="alerted")]) self.assertEqual(len(snapshots), 1) self.assertEqual(snapshots[0]["case_type"], "time_alarm") self.assertEqual(snapshots[0]["case_status"], "open") self.assertEqual(snapshots[0]["source_event"], "time_alarm") def test_alarm_removal_timeout_upgrades_same_case(self) -> None: store = CaseStore() store.apply_batch_events([event("time_pre_warning", self.t0, severity="warning", state="pre_warning")]) store.apply_batch_events([event("time_alarm", self.t0.replace(minute=2), severity="alarm", state="alerted")]) snapshots = store.apply_batch_events( [event("alarm_removal_timeout", self.t0.replace(minute=3), severity="alarm", state="alarm_removal_timeout")] ) self.assertEqual(len(snapshots), 1) self.assertEqual(snapshots[0]["case_type"], "alarm_removal_timeout") self.assertEqual(snapshots[0]["case_status"], "open") self.assertEqual(snapshots[0]["source_event"], "alarm_removal_timeout") def test_pending_disposal_upgrades_existing_case(self) -> None: store = CaseStore() store.apply_batch_events([event("time_alarm", self.t0, severity="alarm", state="alerted")]) snapshots = store.apply_batch_events( [event("batch_pending_disposal", self.t0.replace(minute=1), severity="warning", state="pending_disposal")] ) self.assertEqual(len(snapshots), 1) self.assertEqual(snapshots[0]["case_type"], "pending_disposal") self.assertEqual(snapshots[0]["case_status"], "open") self.assertEqual(snapshots[0]["source_event"], "batch_pending_disposal") def test_warning_escalated_upgrades_same_case(self) -> None: store = CaseStore() store.apply_batch_events([event("time_alarm", self.t0, severity="alarm", state="alerted")]) store.apply_batch_events( [event("batch_pending_disposal", self.t0.replace(minute=1), severity="warning", state="pending_disposal")] ) snapshots = store.apply_batch_events( [event("warning_escalated", self.t0.replace(minute=2), severity="warning", state="warning")] ) self.assertEqual(len(snapshots), 1) self.assertEqual(snapshots[0]["case_type"], "warning_escalated") self.assertEqual(snapshots[0]["case_status"], "open") self.assertEqual(snapshots[0]["source_event"], "warning_escalated") def test_batch_discarded_auto_closes_open_case(self) -> None: store = CaseStore() store.apply_batch_events([event("time_alarm", self.t0, severity="alarm", state="alerted")]) snapshots = store.apply_batch_events( [event("batch_discarded", self.t0.replace(minute=3), severity="info", state="discarded")] ) self.assertEqual(len(snapshots), 1) self.assertEqual(snapshots[0]["case_status"], "handled") self.assertEqual(snapshots[0]["handled_source"], "auto_closed") def test_manual_handle_closes_case(self) -> None: store = CaseStore() created = store.apply_batch_events([event("time_alarm", self.t0, severity="alarm", state="alerted")])[0] snapshot = store.mark_handled( str(created["case_id"]), handled_at=self.t0.replace(minute=4), handled_by="alice", handled_source="manual", note="checked", ) self.assertEqual(snapshot["case_status"], "handled") self.assertEqual(snapshot["handled_source"], "manual") self.assertEqual(snapshot["handled_by"], "alice") self.assertEqual(snapshot["payload"]["note"], "checked") def test_callback_handle_closes_case(self) -> None: store = CaseStore() created = store.apply_batch_events([event("time_alarm", self.t0, severity="alarm", state="alerted")])[0] snapshot = store.mark_handled( str(created["case_id"]), handled_at=self.t0.replace(minute=5), handled_by="crm-bot", handled_source="webhook_callback", source_ref="crm-123", ) self.assertEqual(snapshot["case_status"], "handled") self.assertEqual(snapshot["handled_source"], "webhook_callback") self.assertEqual(snapshot["payload"]["source_ref"], "crm-123") def test_handled_case_does_not_reopen_on_stale_event(self) -> None: store = CaseStore() created = store.apply_batch_events([event("time_alarm", self.t0, severity="alarm", state="alerted")])[0] store.mark_handled( str(created["case_id"]), handled_at=self.t0.replace(minute=5), handled_by="alice", handled_source="manual", ) snapshots = store.apply_batch_events( [event("batch_pending_disposal", self.t0.replace(minute=1), severity="warning", state="pending_disposal")] ) self.assertEqual(snapshots, []) case = store.latest_cases()[0] self.assertEqual(case["case_status"], "handled") self.assertEqual(case["handled_source"], "manual") def test_case_snapshots_round_trip_through_jsonl(self) -> None: store = CaseStore() snapshots = store.apply_batch_events([event("time_alarm", self.t0, severity="alarm", state="alerted")]) with tempfile.TemporaryDirectory() as tmpdir: path = Path(tmpdir) / "cases.jsonl" append_case_snapshots(path, snapshots) loaded = load_case_snapshots(path) self.assertEqual(len(loaded), 1) self.assertEqual(loaded[0]["case_type"], "time_alarm") if __name__ == "__main__": unittest.main()