346 lines
14 KiB
Python
346 lines
14 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import tempfile
|
|
import unittest
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
|
|
from cold_display_guard.cases import CaseStore
|
|
from cold_display_guard.main import (
|
|
case_sink_path,
|
|
capture_runtime_alarm_snapshot,
|
|
deliver_runtime_webhooks,
|
|
persist_case_updates,
|
|
restore_runtime_state,
|
|
webhook_retry_sink_path,
|
|
)
|
|
from cold_display_guard.vision import Frame
|
|
from cold_display_guard.webhooks import load_retry_snapshots
|
|
|
|
|
|
UTC = timezone.utc
|
|
|
|
|
|
class RuntimeRestoreTests(unittest.TestCase):
|
|
def test_case_sink_path_uses_default_logs_location(self) -> None:
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
root = Path(tmpdir)
|
|
|
|
path = case_sink_path(root, {})
|
|
|
|
self.assertEqual(path, (root / "logs" / "cases.jsonl").resolve())
|
|
|
|
def test_webhook_retry_sink_path_uses_default_logs_location(self) -> None:
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
root = Path(tmpdir)
|
|
|
|
path = webhook_retry_sink_path(root, {})
|
|
|
|
self.assertEqual(path, (root / "logs" / "webhook_retry.jsonl").resolve())
|
|
|
|
def test_persist_case_updates_writes_case_snapshots(self) -> None:
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
path = Path(tmpdir) / "cases.jsonl"
|
|
store = CaseStore()
|
|
|
|
snapshots = persist_case_updates(
|
|
store,
|
|
path,
|
|
[
|
|
{
|
|
"event": "time_alarm",
|
|
"ts": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
|
|
"batch_id": "batch_000001",
|
|
"camera_id": "cam_01",
|
|
"zone_id": "1",
|
|
"zone_label": "区域 1",
|
|
"severity": "alarm",
|
|
"state": "alerted",
|
|
}
|
|
],
|
|
)
|
|
|
|
written = [json.loads(line) for line in path.read_text(encoding="utf-8").splitlines()]
|
|
|
|
self.assertEqual(len(snapshots), 1)
|
|
self.assertEqual(written[0]["case_type"], "time_alarm")
|
|
self.assertEqual(written[0]["case_status"], "open")
|
|
|
|
def test_deliver_runtime_webhooks_sends_event_and_case_payloads(self) -> None:
|
|
deliveries: list[tuple[str, dict[str, object]]] = []
|
|
|
|
def fake_post(url: str, payload: dict[str, object], timeout: tuple[float, float]) -> tuple[int, str]:
|
|
deliveries.append((url, payload))
|
|
return 200, "ok"
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
audit_path = Path(tmpdir) / "webhook_delivery.jsonl"
|
|
deliver_runtime_webhooks(
|
|
[
|
|
{
|
|
"event": "time_alarm",
|
|
"ts": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
|
|
"batch_id": "batch_000001",
|
|
"camera_id": "cam_01",
|
|
"zone_id": "1",
|
|
"zone_label": "区域 1",
|
|
"severity": "alarm",
|
|
"state": "alerted",
|
|
}
|
|
],
|
|
[
|
|
{
|
|
"case_id": "case_batch_000001",
|
|
"batch_id": "batch_000001",
|
|
"case_type": "time_alarm",
|
|
"case_status": "open",
|
|
"source_event": "time_alarm",
|
|
"handled_source": "",
|
|
"created_at": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
|
|
"updated_at": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
|
|
}
|
|
],
|
|
{
|
|
"webhooks": {
|
|
"enabled": True,
|
|
"event_url": "https://example.com/events",
|
|
"case_url": "https://example.com/cases",
|
|
}
|
|
},
|
|
audit_path,
|
|
http_post=fake_post,
|
|
)
|
|
|
|
self.assertEqual(len(deliveries), 2)
|
|
self.assertEqual(deliveries[0][1]["kind"], "batch_event")
|
|
self.assertEqual(deliveries[1][1]["kind"], "case_event")
|
|
|
|
def test_capture_runtime_alarm_snapshot_uses_current_frame_for_alert_events(self) -> None:
|
|
frame = Frame(width=1, height=1, rgb=b"\x01\x02\x03")
|
|
result = capture_runtime_alarm_snapshot(
|
|
frame,
|
|
[
|
|
{
|
|
"event": "time_alarm",
|
|
"severity": "alarm",
|
|
"batch_id": "batch_000001",
|
|
"camera_id": "cam_01",
|
|
"zone_id": "1",
|
|
"ts": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
|
|
}
|
|
],
|
|
{"alarm_snapshot_upload": {"enabled": True}},
|
|
now=datetime(2026, 6, 9, 9, 0, tzinfo=UTC),
|
|
jpeg_encoder=lambda frame, timeout_seconds: b"jpeg",
|
|
uploader=lambda image_bytes, **kwargs: {"status": "uploaded", "object_key": "uploads/alarms/a.jpg", "file_name": "a.jpg"},
|
|
)
|
|
|
|
self.assertEqual(result["status"], "uploaded")
|
|
self.assertEqual(result["object_key"], "uploads/alarms/a.jpg")
|
|
|
|
def test_deliver_runtime_webhooks_enqueues_failure_and_drains_due_retry(self) -> None:
|
|
attempts = {"count": 0}
|
|
|
|
def flaky_post(url: str, payload: dict[str, object], timeout: tuple[float, float]) -> tuple[int, str]:
|
|
attempts["count"] += 1
|
|
if attempts["count"] == 1:
|
|
return 503, "down"
|
|
return 200, "ok"
|
|
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
audit_path = Path(tmpdir) / "webhook_delivery.jsonl"
|
|
retry_path = Path(tmpdir) / "webhook_retry.jsonl"
|
|
config = {
|
|
"webhooks": {
|
|
"enabled": True,
|
|
"event_url": "https://example.com/events",
|
|
"retry_max_attempts": 3,
|
|
"retry_backoff_seconds": 30,
|
|
}
|
|
}
|
|
deliver_runtime_webhooks(
|
|
[
|
|
{
|
|
"event": "time_alarm",
|
|
"ts": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
|
|
"batch_id": "batch_000001",
|
|
"camera_id": "cam_01",
|
|
"zone_id": "1",
|
|
"zone_label": "区域 1",
|
|
"severity": "alarm",
|
|
"state": "alerted",
|
|
}
|
|
],
|
|
[],
|
|
config,
|
|
audit_path,
|
|
retry_path=retry_path,
|
|
http_post=flaky_post,
|
|
now=datetime(2026, 6, 9, 9, 0, tzinfo=UTC),
|
|
)
|
|
deliver_runtime_webhooks(
|
|
[],
|
|
[],
|
|
config,
|
|
audit_path,
|
|
retry_path=retry_path,
|
|
http_post=flaky_post,
|
|
now=datetime(2026, 6, 9, 9, 1, tzinfo=UTC),
|
|
)
|
|
retries = load_retry_snapshots(retry_path)
|
|
|
|
self.assertEqual(attempts["count"], 2)
|
|
self.assertEqual(retries[-1]["status"], "delivered")
|
|
self.assertEqual(retries[-1]["attempt_count"], 2)
|
|
|
|
def test_deliver_runtime_webhooks_includes_snapshot_path_in_alert_payloads(self) -> None:
|
|
deliveries: list[dict[str, object]] = []
|
|
|
|
def fake_post(url: str, payload: dict[str, object], timeout: tuple[float, float]) -> tuple[int, str]:
|
|
deliveries.append(payload)
|
|
return 200, "ok"
|
|
|
|
deliver_runtime_webhooks(
|
|
[
|
|
{
|
|
"event": "time_alarm",
|
|
"ts": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
|
|
"batch_id": "batch_000001",
|
|
"camera_id": "cam_01",
|
|
"zone_id": "1",
|
|
"zone_label": "区域 1",
|
|
"severity": "alarm",
|
|
"state": "alerted",
|
|
}
|
|
],
|
|
[
|
|
{
|
|
"case_id": "case_batch_000001",
|
|
"batch_id": "batch_000001",
|
|
"case_type": "time_alarm",
|
|
"case_status": "open",
|
|
"source_event": "time_alarm",
|
|
"handled_source": "",
|
|
"created_at": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
|
|
"updated_at": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
|
|
}
|
|
],
|
|
{
|
|
"webhooks": {
|
|
"enabled": True,
|
|
"event_url": "https://example.com/events",
|
|
"case_url": "https://example.com/cases",
|
|
}
|
|
},
|
|
Path(tempfile.mkdtemp()) / "webhook_delivery.jsonl",
|
|
http_post=fake_post,
|
|
snapshot_upload={"status": "uploaded", "object_key": "uploads/alarms/a.jpg", "batch_ids": ["batch_000001"]},
|
|
)
|
|
|
|
self.assertEqual(deliveries[0]["snapshot_object_key"], "uploads/alarms/a.jpg")
|
|
self.assertEqual(deliveries[1]["snapshot_object_key"], "uploads/alarms/a.jpg")
|
|
|
|
def test_restore_runtime_state_uses_stable_occupancy_when_raw_metrics_flicker(self) -> None:
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
diagnostics_path = Path(tmpdir) / "runtime_diagnostics.jsonl"
|
|
diagnostics_path.write_text(
|
|
json.dumps(
|
|
{
|
|
"ts": "2026-05-29T10:05:26+08:00",
|
|
"zone_counts": {"2": 1},
|
|
"diagnostics": {
|
|
"baseline_ready": True,
|
|
"zones": {
|
|
"2": {
|
|
"baseline_mean_luma": 165.0,
|
|
"baseline_texture": 16.0,
|
|
"baseline_dark_fraction": 0.0,
|
|
"baseline_bright_fraction": 0.0,
|
|
"mean_delta": 17.077,
|
|
"texture_delta": 8.819,
|
|
"dark_fraction": 0.0357,
|
|
"bright_fraction": 0.0,
|
|
"raw_occupied": False,
|
|
"occupied": True,
|
|
"empty_streak": 1,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
|
|
_, zone_counts = restore_runtime_state(
|
|
diagnostics_path,
|
|
{
|
|
"runtime": {
|
|
"occupancy_mean_delta": 55.0,
|
|
"occupancy_texture_delta": 18.0,
|
|
"occupancy_dark_fraction": 0.06,
|
|
"occupancy_texture_dark_fraction": 0.04,
|
|
}
|
|
},
|
|
)
|
|
|
|
self.assertEqual(zone_counts, {"2": 1})
|
|
|
|
def test_restore_runtime_state_uses_dark_fraction_rules(self) -> None:
|
|
with tempfile.TemporaryDirectory() as tmpdir:
|
|
diagnostics_path = Path(tmpdir) / "runtime_diagnostics.jsonl"
|
|
diagnostics_path.write_text(
|
|
json.dumps(
|
|
{
|
|
"ts": "2026-05-29T10:00:00+08:00",
|
|
"zone_counts": {"1": 1, "4": 1},
|
|
"diagnostics": {
|
|
"baseline_ready": True,
|
|
"zones": {
|
|
"1": {
|
|
"baseline_mean_luma": 165.0,
|
|
"baseline_texture": 16.0,
|
|
"baseline_dark_fraction": 0.0,
|
|
"baseline_bright_fraction": 0.0,
|
|
"mean_delta": 40.0,
|
|
"texture_delta": 18.0,
|
|
"dark_fraction": 0.10,
|
|
"bright_fraction": 0.0,
|
|
},
|
|
"4": {
|
|
"baseline_mean_luma": 177.0,
|
|
"baseline_texture": 9.0,
|
|
"baseline_dark_fraction": 0.0,
|
|
"baseline_bright_fraction": 0.0,
|
|
"mean_delta": 16.0,
|
|
"texture_delta": 40.0,
|
|
"dark_fraction": 0.0769,
|
|
"bright_fraction": 0.3077,
|
|
},
|
|
},
|
|
},
|
|
}
|
|
),
|
|
encoding="utf-8",
|
|
)
|
|
|
|
baselines, zone_counts = restore_runtime_state(
|
|
diagnostics_path,
|
|
{
|
|
"runtime": {
|
|
"occupancy_mean_delta": 55.0,
|
|
"occupancy_texture_delta": 18.0,
|
|
"occupancy_dark_fraction": 0.06,
|
|
"occupancy_texture_dark_fraction": 0.04,
|
|
}
|
|
},
|
|
)
|
|
|
|
self.assertEqual(zone_counts, {"1": 1, "4": 0})
|
|
self.assertEqual(baselines["1"].dark_fraction, 0.0)
|
|
self.assertEqual(baselines["4"].bright_fraction, 0.0)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|