"""Tests for the runtime helpers exposed by ``cold_display_guard.main``.

Covers sink-path defaults, case persistence, webhook delivery (including the
retry queue), alarm snapshot capture, and restoring state from diagnostics.
"""

from __future__ import annotations

import json
import tempfile
import unittest
from datetime import datetime, timezone
from pathlib import Path

from cold_display_guard.cases import CaseStore, append_case_snapshots, load_case_snapshots
from cold_display_guard.main import (
    case_sink_path,
    capture_runtime_alarm_snapshot,
    deliver_runtime_webhooks,
    persist_case_updates,
    restore_runtime_state,
    webhook_retry_sink_path,
)
from cold_display_guard.vision import Frame
from cold_display_guard.webhooks import load_retry_snapshots

UTC = timezone.utc


class RuntimeRestoreTests(unittest.TestCase):
    """Exercise the runtime glue functions against temporary files and fake I/O."""

    def test_case_sink_path_uses_default_logs_location(self) -> None:
        """With an empty config the case sink resolves to ``<root>/logs/cases.jsonl``."""
        with tempfile.TemporaryDirectory() as tmpdir:
            root = Path(tmpdir)

            path = case_sink_path(root, {})

            self.assertEqual(path, (root / "logs" / "cases.jsonl").resolve())

    def test_webhook_retry_sink_path_uses_default_logs_location(self) -> None:
        """With an empty config the retry sink resolves to ``<root>/logs/webhook_retry.jsonl``."""
        with tempfile.TemporaryDirectory() as tmpdir:
            root = Path(tmpdir)

            path = webhook_retry_sink_path(root, {})

            self.assertEqual(path, (root / "logs" / "webhook_retry.jsonl").resolve())

    def test_persist_case_updates_writes_case_snapshots(self) -> None:
        """A ``time_alarm`` event yields one snapshot, written as an open case."""
        with tempfile.TemporaryDirectory() as tmpdir:
            path = Path(tmpdir) / "cases.jsonl"
            store = CaseStore()

            snapshots = persist_case_updates(
                store,
                path,
                [
                    {
                        "event": "time_alarm",
                        "ts": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
                        "batch_id": "batch_000001",
                        "camera_id": "cam_01",
                        "zone_id": "1",
                        "zone_label": "区域 1",
                        "severity": "alarm",
                        "state": "alerted",
                    }
                ],
            )

            # Read back the JSONL sink while the temporary directory still exists.
            written = [json.loads(line) for line in path.read_text(encoding="utf-8").splitlines()]

            self.assertEqual(len(snapshots), 1)
            self.assertEqual(written[0]["case_type"], "time_alarm")
            self.assertEqual(written[0]["case_status"], "open")

    def test_persist_case_updates_preserves_api_handled_snapshot(self) -> None:
        """A case handled via a second store is not overwritten by a later runtime event.

        The runtime store creates the case, a separate store loaded from the file
        marks it handled and appends that snapshot, and then the runtime store
        receives a follow-up event for the same batch. The follow-up must emit no
        snapshots and the latest case on disk must remain handled.
        """
        with tempfile.TemporaryDirectory() as tmpdir:
            path = Path(tmpdir) / "cases.jsonl"
            runtime_store = CaseStore()

            created = persist_case_updates(
                runtime_store,
                path,
                [
                    {
                        "event": "time_alarm",
                        "ts": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
                        "batch_id": "batch_000001",
                        "camera_id": "cam_01",
                        "zone_id": "1",
                        "zone_label": "区域 1",
                        "severity": "alarm",
                        "state": "alerted",
                    }
                ],
            )[0]

            # Simulate another writer that works from the persisted file.
            api_store = CaseStore(load_case_snapshots(path))
            append_case_snapshots(
                path,
                [
                    api_store.mark_handled(
                        str(created["case_id"]),
                        handled_at=datetime(2026, 6, 9, 9, 5, tzinfo=UTC),
                        handled_by="alice",
                        handled_source="manual",
                    )
                ],
            )

            snapshots = persist_case_updates(
                runtime_store,
                path,
                [
                    {
                        "event": "batch_pending_disposal",
                        "ts": datetime(2026, 6, 9, 9, 6, tzinfo=UTC).isoformat(),
                        "batch_id": "batch_000001",
                        "camera_id": "cam_01",
                        "zone_id": "1",
                        "zone_label": "区域 1",
                        "severity": "warning",
                        "state": "pending_disposal",
                    }
                ],
            )

            latest = CaseStore(load_case_snapshots(path)).latest_cases()[0]

            self.assertEqual(snapshots, [])
            self.assertEqual(latest["case_status"], "handled")
            self.assertEqual(latest["handled_source"], "manual")

    def test_deliver_runtime_webhooks_sends_event_and_case_payloads(self) -> None:
        """One event plus one case snapshot produce two posts: batch_event then case_event."""
        deliveries: list[tuple[str, dict[str, object]]] = []

        def fake_post(url: str, payload: dict[str, object], timeout: tuple[float, float]) -> tuple[int, str]:
            deliveries.append((url, payload))
            return 200, "ok"

        with tempfile.TemporaryDirectory() as tmpdir:
            audit_path = Path(tmpdir) / "webhook_delivery.jsonl"

            deliver_runtime_webhooks(
                [
                    {
                        "event": "time_alarm",
                        "ts": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
                        "batch_id": "batch_000001",
                        "camera_id": "cam_01",
                        "zone_id": "1",
                        "zone_label": "区域 1",
                        "severity": "alarm",
                        "state": "alerted",
                    }
                ],
                [
                    {
                        "case_id": "case_batch_000001",
                        "batch_id": "batch_000001",
                        "case_type": "time_alarm",
                        "case_status": "open",
                        "source_event": "time_alarm",
                        "handled_source": "",
                        "created_at": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
                        "updated_at": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
                    }
                ],
                {
                    "webhooks": {
                        "enabled": True,
                        "event_url": "https://example.com/events",
                        "case_url": "https://example.com/cases",
                    }
                },
                audit_path,
                http_post=fake_post,
            )

        self.assertEqual(len(deliveries), 2)
        self.assertEqual(deliveries[0][1]["kind"], "batch_event")
        self.assertEqual(deliveries[1][1]["kind"], "case_event")

    def test_capture_runtime_alarm_snapshot_uses_current_frame_for_alert_events(self) -> None:
        """With upload enabled and an alarm event, the uploader's result is returned."""
        frame = Frame(width=1, height=1, rgb=b"\x01\x02\x03")

        result = capture_runtime_alarm_snapshot(
            frame,
            [
                {
                    "event": "time_alarm",
                    "severity": "alarm",
                    "batch_id": "batch_000001",
                    "camera_id": "cam_01",
                    "zone_id": "1",
                    "ts": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
                }
            ],
            {"alarm_snapshot_upload": {"enabled": True}},
            now=datetime(2026, 6, 9, 9, 0, tzinfo=UTC),
            # Stub encoder/uploader so the test performs no real encoding or network I/O.
            jpeg_encoder=lambda frame, timeout_seconds: b"jpeg",
            uploader=lambda image_bytes, **kwargs: {
                "status": "uploaded",
                "object_key": "uploads/alarms/a.jpg",
                "file_name": "a.jpg",
            },
        )

        self.assertEqual(result["status"], "uploaded")
        self.assertEqual(result["object_key"], "uploads/alarms/a.jpg")

    def test_deliver_runtime_webhooks_enqueues_failure_and_drains_due_retry(self) -> None:
        """A 503 on the first post is retried on a later call once the backoff has elapsed.

        The second call passes no new events and a ``now`` one minute later
        (backoff is 30 seconds); the retry record must end up delivered with an
        attempt count of 2.
        """
        attempts = {"count": 0}

        def flaky_post(url: str, payload: dict[str, object], timeout: tuple[float, float]) -> tuple[int, str]:
            # Fail only the very first request, succeed afterwards.
            attempts["count"] += 1
            if attempts["count"] == 1:
                return 503, "down"
            return 200, "ok"

        with tempfile.TemporaryDirectory() as tmpdir:
            audit_path = Path(tmpdir) / "webhook_delivery.jsonl"
            retry_path = Path(tmpdir) / "webhook_retry.jsonl"
            config = {
                "webhooks": {
                    "enabled": True,
                    "event_url": "https://example.com/events",
                    "retry_max_attempts": 3,
                    "retry_backoff_seconds": 30,
                }
            }

            deliver_runtime_webhooks(
                [
                    {
                        "event": "time_alarm",
                        "ts": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
                        "batch_id": "batch_000001",
                        "camera_id": "cam_01",
                        "zone_id": "1",
                        "zone_label": "区域 1",
                        "severity": "alarm",
                        "state": "alerted",
                    }
                ],
                [],
                config,
                audit_path,
                retry_path=retry_path,
                http_post=flaky_post,
                now=datetime(2026, 6, 9, 9, 0, tzinfo=UTC),
            )
            deliver_runtime_webhooks(
                [],
                [],
                config,
                audit_path,
                retry_path=retry_path,
                http_post=flaky_post,
                now=datetime(2026, 6, 9, 9, 1, tzinfo=UTC),
            )

            retries = load_retry_snapshots(retry_path)

            self.assertEqual(attempts["count"], 2)
            self.assertEqual(retries[-1]["status"], "delivered")
            self.assertEqual(retries[-1]["attempt_count"], 2)

    def test_deliver_runtime_webhooks_includes_snapshot_path_in_alert_payloads(self) -> None:
        """Both the event and the case payload carry the uploaded snapshot's object key."""
        deliveries: list[dict[str, object]] = []

        def fake_post(url: str, payload: dict[str, object], timeout: tuple[float, float]) -> tuple[int, str]:
            deliveries.append(payload)
            return 200, "ok"

        # TemporaryDirectory (rather than mkdtemp) so the audit directory is
        # removed after the test instead of being leaked on every run.
        with tempfile.TemporaryDirectory() as tmpdir:
            deliver_runtime_webhooks(
                [
                    {
                        "event": "time_alarm",
                        "ts": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
                        "batch_id": "batch_000001",
                        "camera_id": "cam_01",
                        "zone_id": "1",
                        "zone_label": "区域 1",
                        "severity": "alarm",
                        "state": "alerted",
                    }
                ],
                [
                    {
                        "case_id": "case_batch_000001",
                        "batch_id": "batch_000001",
                        "case_type": "time_alarm",
                        "case_status": "open",
                        "source_event": "time_alarm",
                        "handled_source": "",
                        "created_at": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
                        "updated_at": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
                    }
                ],
                {
                    "webhooks": {
                        "enabled": True,
                        "event_url": "https://example.com/events",
                        "case_url": "https://example.com/cases",
                    }
                },
                Path(tmpdir) / "webhook_delivery.jsonl",
                http_post=fake_post,
                snapshot_upload={
                    "status": "uploaded",
                    "object_key": "uploads/alarms/a.jpg",
                    "batch_ids": ["batch_000001"],
                },
            )

        self.assertEqual(deliveries[0]["snapshot_object_key"], "uploads/alarms/a.jpg")
        self.assertEqual(deliveries[1]["snapshot_object_key"], "uploads/alarms/a.jpg")

    def test_restore_runtime_state_uses_stable_occupancy_when_raw_metrics_flicker(self) -> None:
        """The restored count stays 1 when the record has ``raw_occupied`` False but ``occupied`` True."""
        with tempfile.TemporaryDirectory() as tmpdir:
            diagnostics_path = Path(tmpdir) / "runtime_diagnostics.jsonl"
            diagnostics_path.write_text(
                json.dumps(
                    {
                        "ts": "2026-05-29T10:05:26+08:00",
                        "zone_counts": {"2": 1},
                        "diagnostics": {
                            "baseline_ready": True,
                            "zones": {
                                "2": {
                                    "baseline_mean_luma": 165.0,
                                    "baseline_texture": 16.0,
                                    "baseline_dark_fraction": 0.0,
                                    "baseline_bright_fraction": 0.0,
                                    "mean_delta": 17.077,
                                    "texture_delta": 8.819,
                                    "dark_fraction": 0.0357,
                                    "bright_fraction": 0.0,
                                    "raw_occupied": False,
                                    "occupied": True,
                                    "empty_streak": 1,
                                },
                            },
                        },
                    }
                ),
                encoding="utf-8",
            )

            _, zone_counts = restore_runtime_state(
                diagnostics_path,
                {
                    "runtime": {
                        "occupancy_mean_delta": 55.0,
                        "occupancy_texture_delta": 18.0,
                        "occupancy_dark_fraction": 0.06,
                        "occupancy_texture_dark_fraction": 0.04,
                    }
                },
            )

            self.assertEqual(zone_counts, {"2": 1})

    def test_restore_runtime_state_uses_dark_fraction_rules(self) -> None:
        """Zone "1" keeps its count of 1 while zone "4" is restored as 0.

        Neither zone record carries an ``occupied`` flag, so the outcome depends
        on the recorded metrics and the configured thresholds. The restored
        baselines are expected to match the ``baseline_*`` fields of the record
        (0.0 here), not the per-frame metrics.
        """
        with tempfile.TemporaryDirectory() as tmpdir:
            diagnostics_path = Path(tmpdir) / "runtime_diagnostics.jsonl"
            diagnostics_path.write_text(
                json.dumps(
                    {
                        "ts": "2026-05-29T10:00:00+08:00",
                        "zone_counts": {"1": 1, "4": 1},
                        "diagnostics": {
                            "baseline_ready": True,
                            "zones": {
                                "1": {
                                    "baseline_mean_luma": 165.0,
                                    "baseline_texture": 16.0,
                                    "baseline_dark_fraction": 0.0,
                                    "baseline_bright_fraction": 0.0,
                                    "mean_delta": 40.0,
                                    "texture_delta": 18.0,
                                    "dark_fraction": 0.10,
                                    "bright_fraction": 0.0,
                                },
                                "4": {
                                    "baseline_mean_luma": 177.0,
                                    "baseline_texture": 9.0,
                                    "baseline_dark_fraction": 0.0,
                                    "baseline_bright_fraction": 0.0,
                                    "mean_delta": 16.0,
                                    "texture_delta": 40.0,
                                    "dark_fraction": 0.0769,
                                    "bright_fraction": 0.3077,
                                },
                            },
                        },
                    }
                ),
                encoding="utf-8",
            )

            baselines, zone_counts = restore_runtime_state(
                diagnostics_path,
                {
                    "runtime": {
                        "occupancy_mean_delta": 55.0,
                        "occupancy_texture_delta": 18.0,
                        "occupancy_dark_fraction": 0.06,
                        "occupancy_texture_dark_fraction": 0.04,
                    }
                },
            )

            self.assertEqual(zone_counts, {"1": 1, "4": 0})
            self.assertEqual(baselines["1"].dark_fraction, 0.0)
            self.assertEqual(baselines["4"].bright_fraction, 0.0)


if __name__ == "__main__":
    unittest.main()