feat: add webhook case management
This commit is contained in:
158
tests/test_cases.py
Normal file
158
tests/test_cases.py
Normal file
@@ -0,0 +1,158 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import tempfile
|
||||
import unittest
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
from cold_display_guard.cases import CaseStore, append_case_snapshots, load_case_snapshots
|
||||
|
||||
|
||||
UTC = timezone.utc
|
||||
|
||||
|
||||
def event(
|
||||
event_name: str,
|
||||
when: datetime,
|
||||
*,
|
||||
batch_id: str = "batch_000001",
|
||||
zone_id: str = "1",
|
||||
zone_label: str = "区域 1",
|
||||
camera_id: str = "cam_01",
|
||||
severity: str = "info",
|
||||
state: str = "active",
|
||||
) -> dict[str, object]:
|
||||
return {
|
||||
"event": event_name,
|
||||
"ts": when.isoformat(),
|
||||
"batch_id": batch_id,
|
||||
"camera_id": camera_id,
|
||||
"zone_id": zone_id,
|
||||
"zone_label": zone_label,
|
||||
"severity": severity,
|
||||
"state": state,
|
||||
}
|
||||
|
||||
|
||||
class CaseStoreTests(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
self.t0 = datetime(2026, 6, 9, 9, 0, tzinfo=UTC)
|
||||
|
||||
def test_time_alarm_creates_open_case(self) -> None:
|
||||
store = CaseStore()
|
||||
|
||||
snapshots = store.apply_batch_events([event("time_alarm", self.t0, severity="alarm", state="alerted")])
|
||||
|
||||
self.assertEqual(len(snapshots), 1)
|
||||
self.assertEqual(snapshots[0]["case_type"], "time_alarm")
|
||||
self.assertEqual(snapshots[0]["case_status"], "open")
|
||||
self.assertEqual(snapshots[0]["source_event"], "time_alarm")
|
||||
|
||||
def test_pending_disposal_upgrades_existing_case(self) -> None:
|
||||
store = CaseStore()
|
||||
store.apply_batch_events([event("time_alarm", self.t0, severity="alarm", state="alerted")])
|
||||
|
||||
snapshots = store.apply_batch_events(
|
||||
[event("batch_pending_disposal", self.t0.replace(minute=1), severity="warning", state="pending_disposal")]
|
||||
)
|
||||
|
||||
self.assertEqual(len(snapshots), 1)
|
||||
self.assertEqual(snapshots[0]["case_type"], "pending_disposal")
|
||||
self.assertEqual(snapshots[0]["case_status"], "open")
|
||||
self.assertEqual(snapshots[0]["source_event"], "batch_pending_disposal")
|
||||
|
||||
def test_warning_escalated_upgrades_same_case(self) -> None:
|
||||
store = CaseStore()
|
||||
store.apply_batch_events([event("time_alarm", self.t0, severity="alarm", state="alerted")])
|
||||
store.apply_batch_events(
|
||||
[event("batch_pending_disposal", self.t0.replace(minute=1), severity="warning", state="pending_disposal")]
|
||||
)
|
||||
|
||||
snapshots = store.apply_batch_events(
|
||||
[event("warning_escalated", self.t0.replace(minute=2), severity="warning", state="warning")]
|
||||
)
|
||||
|
||||
self.assertEqual(len(snapshots), 1)
|
||||
self.assertEqual(snapshots[0]["case_type"], "warning_escalated")
|
||||
self.assertEqual(snapshots[0]["case_status"], "open")
|
||||
self.assertEqual(snapshots[0]["source_event"], "warning_escalated")
|
||||
|
||||
def test_batch_discarded_auto_closes_open_case(self) -> None:
|
||||
store = CaseStore()
|
||||
store.apply_batch_events([event("time_alarm", self.t0, severity="alarm", state="alerted")])
|
||||
|
||||
snapshots = store.apply_batch_events(
|
||||
[event("batch_discarded", self.t0.replace(minute=3), severity="info", state="discarded")]
|
||||
)
|
||||
|
||||
self.assertEqual(len(snapshots), 1)
|
||||
self.assertEqual(snapshots[0]["case_status"], "handled")
|
||||
self.assertEqual(snapshots[0]["handled_source"], "auto_closed")
|
||||
|
||||
def test_manual_handle_closes_case(self) -> None:
|
||||
store = CaseStore()
|
||||
created = store.apply_batch_events([event("time_alarm", self.t0, severity="alarm", state="alerted")])[0]
|
||||
|
||||
snapshot = store.mark_handled(
|
||||
str(created["case_id"]),
|
||||
handled_at=self.t0.replace(minute=4),
|
||||
handled_by="alice",
|
||||
handled_source="manual",
|
||||
note="checked",
|
||||
)
|
||||
|
||||
self.assertEqual(snapshot["case_status"], "handled")
|
||||
self.assertEqual(snapshot["handled_source"], "manual")
|
||||
self.assertEqual(snapshot["handled_by"], "alice")
|
||||
self.assertEqual(snapshot["payload"]["note"], "checked")
|
||||
|
||||
def test_callback_handle_closes_case(self) -> None:
|
||||
store = CaseStore()
|
||||
created = store.apply_batch_events([event("time_alarm", self.t0, severity="alarm", state="alerted")])[0]
|
||||
|
||||
snapshot = store.mark_handled(
|
||||
str(created["case_id"]),
|
||||
handled_at=self.t0.replace(minute=5),
|
||||
handled_by="crm-bot",
|
||||
handled_source="webhook_callback",
|
||||
source_ref="crm-123",
|
||||
)
|
||||
|
||||
self.assertEqual(snapshot["case_status"], "handled")
|
||||
self.assertEqual(snapshot["handled_source"], "webhook_callback")
|
||||
self.assertEqual(snapshot["payload"]["source_ref"], "crm-123")
|
||||
|
||||
def test_handled_case_does_not_reopen_on_stale_event(self) -> None:
|
||||
store = CaseStore()
|
||||
created = store.apply_batch_events([event("time_alarm", self.t0, severity="alarm", state="alerted")])[0]
|
||||
store.mark_handled(
|
||||
str(created["case_id"]),
|
||||
handled_at=self.t0.replace(minute=5),
|
||||
handled_by="alice",
|
||||
handled_source="manual",
|
||||
)
|
||||
|
||||
snapshots = store.apply_batch_events(
|
||||
[event("batch_pending_disposal", self.t0.replace(minute=1), severity="warning", state="pending_disposal")]
|
||||
)
|
||||
|
||||
self.assertEqual(snapshots, [])
|
||||
case = store.latest_cases()[0]
|
||||
self.assertEqual(case["case_status"], "handled")
|
||||
self.assertEqual(case["handled_source"], "manual")
|
||||
|
||||
def test_case_snapshots_round_trip_through_jsonl(self) -> None:
|
||||
store = CaseStore()
|
||||
snapshots = store.apply_batch_events([event("time_alarm", self.t0, severity="alarm", state="alerted")])
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
path = Path(tmpdir) / "cases.jsonl"
|
||||
append_case_snapshots(path, snapshots)
|
||||
loaded = load_case_snapshots(path)
|
||||
|
||||
self.assertEqual(len(loaded), 1)
|
||||
self.assertEqual(loaded[0]["case_type"], "time_alarm")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -122,6 +122,32 @@ zone_ids = ["1", "2", "3"]
|
||||
self.assertIn("[trash]", text)
|
||||
self.assertNotIn('"trash"', text.split("[layout]", maxsplit=1)[1].split("[[zones]]", maxsplit=1)[0])
|
||||
|
||||
def test_save_config_document_writes_webhooks_and_case_sink(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
path = Path(tmpdir) / "config.toml"
|
||||
save_config_document(
|
||||
path,
|
||||
{
|
||||
"webhooks": {
|
||||
"enabled": True,
|
||||
"event_url": "https://example.com/events",
|
||||
"case_url": "https://example.com/cases",
|
||||
"callback_token": "secret",
|
||||
"connect_timeout_seconds": 3,
|
||||
"read_timeout_seconds": 5,
|
||||
},
|
||||
"case_sink": {"path": "logs/cases.jsonl"},
|
||||
},
|
||||
)
|
||||
text = path.read_text(encoding="utf-8")
|
||||
|
||||
self.assertIn("[webhooks]", text)
|
||||
self.assertIn('event_url = "https://example.com/events"', text)
|
||||
self.assertIn('case_url = "https://example.com/cases"', text)
|
||||
self.assertIn('callback_token = "secret"', text)
|
||||
self.assertIn("[case_sink]", text)
|
||||
self.assertIn('path = "logs/cases.jsonl"', text)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
@@ -3,12 +3,102 @@ from __future__ import annotations
|
||||
import json
|
||||
import tempfile
|
||||
import unittest
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
from cold_display_guard.main import restore_runtime_state
|
||||
from cold_display_guard.cases import CaseStore
|
||||
from cold_display_guard.main import case_sink_path, deliver_runtime_webhooks, persist_case_updates, restore_runtime_state
|
||||
|
||||
|
||||
UTC = timezone.utc
|
||||
|
||||
|
||||
class RuntimeRestoreTests(unittest.TestCase):
|
||||
def test_case_sink_path_uses_default_logs_location(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
root = Path(tmpdir)
|
||||
|
||||
path = case_sink_path(root, {})
|
||||
|
||||
self.assertEqual(path, (root / "logs" / "cases.jsonl").resolve())
|
||||
|
||||
def test_persist_case_updates_writes_case_snapshots(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
path = Path(tmpdir) / "cases.jsonl"
|
||||
store = CaseStore()
|
||||
|
||||
snapshots = persist_case_updates(
|
||||
store,
|
||||
path,
|
||||
[
|
||||
{
|
||||
"event": "time_alarm",
|
||||
"ts": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
|
||||
"batch_id": "batch_000001",
|
||||
"camera_id": "cam_01",
|
||||
"zone_id": "1",
|
||||
"zone_label": "区域 1",
|
||||
"severity": "alarm",
|
||||
"state": "alerted",
|
||||
}
|
||||
],
|
||||
)
|
||||
|
||||
written = [json.loads(line) for line in path.read_text(encoding="utf-8").splitlines()]
|
||||
|
||||
self.assertEqual(len(snapshots), 1)
|
||||
self.assertEqual(written[0]["case_type"], "time_alarm")
|
||||
self.assertEqual(written[0]["case_status"], "open")
|
||||
|
||||
def test_deliver_runtime_webhooks_sends_event_and_case_payloads(self) -> None:
|
||||
deliveries: list[tuple[str, dict[str, object]]] = []
|
||||
|
||||
def fake_post(url: str, payload: dict[str, object], timeout: tuple[float, float]) -> tuple[int, str]:
|
||||
deliveries.append((url, payload))
|
||||
return 200, "ok"
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
audit_path = Path(tmpdir) / "webhook_delivery.jsonl"
|
||||
deliver_runtime_webhooks(
|
||||
[
|
||||
{
|
||||
"event": "time_alarm",
|
||||
"ts": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
|
||||
"batch_id": "batch_000001",
|
||||
"camera_id": "cam_01",
|
||||
"zone_id": "1",
|
||||
"zone_label": "区域 1",
|
||||
"severity": "alarm",
|
||||
"state": "alerted",
|
||||
}
|
||||
],
|
||||
[
|
||||
{
|
||||
"case_id": "case_batch_000001",
|
||||
"batch_id": "batch_000001",
|
||||
"case_type": "time_alarm",
|
||||
"case_status": "open",
|
||||
"source_event": "time_alarm",
|
||||
"handled_source": "",
|
||||
"created_at": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
|
||||
"updated_at": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
|
||||
}
|
||||
],
|
||||
{
|
||||
"webhooks": {
|
||||
"enabled": True,
|
||||
"event_url": "https://example.com/events",
|
||||
"case_url": "https://example.com/cases",
|
||||
}
|
||||
},
|
||||
audit_path,
|
||||
http_post=fake_post,
|
||||
)
|
||||
|
||||
self.assertEqual(len(deliveries), 2)
|
||||
self.assertEqual(deliveries[0][1]["kind"], "batch_event")
|
||||
self.assertEqual(deliveries[1][1]["kind"], "case_event")
|
||||
|
||||
def test_restore_runtime_state_uses_stable_occupancy_when_raw_metrics_flicker(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
diagnostics_path = Path(tmpdir) / "runtime_diagnostics.jsonl"
|
||||
|
||||
@@ -1,15 +1,47 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import http.client
|
||||
import json
|
||||
import tempfile
|
||||
import threading
|
||||
import unittest
|
||||
from http.server import ThreadingHTTPServer
|
||||
from pathlib import Path
|
||||
|
||||
from cold_display_guard.config import load_config_document, merge_calibration, save_config_document
|
||||
from cold_display_guard.manage_api import ManageContext, build_summary
|
||||
from cold_display_guard.manage_api import ManageContext, build_summary, config_payload, create_handler
|
||||
|
||||
|
||||
class ManageApiTests(unittest.TestCase):
|
||||
def _serve_once(self, ctx: ManageContext) -> tuple[ThreadingHTTPServer, threading.Thread]:
|
||||
server = ThreadingHTTPServer(("127.0.0.1", 0), create_handler(ctx))
|
||||
thread = threading.Thread(target=server.serve_forever, daemon=True)
|
||||
thread.start()
|
||||
return server, thread
|
||||
|
||||
def _request(
|
||||
self,
|
||||
server: ThreadingHTTPServer,
|
||||
method: str,
|
||||
path: str,
|
||||
body: dict | None = None,
|
||||
headers: dict[str, str] | None = None,
|
||||
) -> tuple[int, dict]:
|
||||
conn = http.client.HTTPConnection("127.0.0.1", server.server_address[1], timeout=5)
|
||||
payload = None if body is None else json.dumps(body)
|
||||
final_headers = {"Content-Type": "application/json"}
|
||||
final_headers.update(headers or {})
|
||||
conn.request(method, path, body=payload, headers=final_headers)
|
||||
response = conn.getresponse()
|
||||
raw = response.read().decode("utf-8")
|
||||
conn.close()
|
||||
return response.status, json.loads(raw or "{}")
|
||||
|
||||
def _stop_server(self, server: ThreadingHTTPServer, thread: threading.Thread) -> None:
|
||||
server.shutdown()
|
||||
thread.join()
|
||||
server.server_close()
|
||||
|
||||
def test_merge_calibration_updates_zones_and_trash(self) -> None:
|
||||
data = {
|
||||
"camera_id": "cam",
|
||||
@@ -350,6 +382,242 @@ class ManageApiTests(unittest.TestCase):
|
||||
|
||||
self.assertEqual(summary["metrics"]["latest_zone_counts"], {"1": 1, "2": 0})
|
||||
|
||||
def test_config_payload_exposes_case_sink_and_webhooks_without_callback_token(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
root = Path(tmpdir)
|
||||
config_path = root / "config" / "local.toml"
|
||||
save_config_document(
|
||||
config_path,
|
||||
{
|
||||
"case_sink": {"path": "logs/cases.jsonl"},
|
||||
"webhooks": {
|
||||
"enabled": True,
|
||||
"event_url": "https://example.com/events",
|
||||
"case_url": "https://example.com/cases",
|
||||
"callback_token": "secret",
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
payload = config_payload(ManageContext(config_path=config_path, project_root=root))
|
||||
|
||||
self.assertEqual(payload["case_sink"]["path"], str((root / "logs" / "cases.jsonl").resolve()))
|
||||
self.assertTrue(payload["webhooks"]["enabled"])
|
||||
self.assertNotIn("callback_token", payload["webhooks"])
|
||||
|
||||
def test_cases_endpoint_returns_latest_snapshots(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
root = Path(tmpdir)
|
||||
config_path = root / "config" / "local.toml"
|
||||
save_config_document(
|
||||
config_path,
|
||||
{
|
||||
"case_sink": {"path": "logs/cases.jsonl"},
|
||||
"layout": {"zone_ids": ["1"]},
|
||||
},
|
||||
)
|
||||
cases_path = root / "logs" / "cases.jsonl"
|
||||
cases_path.parent.mkdir()
|
||||
cases_path.write_text(
|
||||
json.dumps(
|
||||
{
|
||||
"case_id": "case_batch_000001",
|
||||
"batch_id": "batch_000001",
|
||||
"camera_id": "cam_01",
|
||||
"zone_id": "1",
|
||||
"zone_label": "区域 1",
|
||||
"case_type": "time_alarm",
|
||||
"case_status": "open",
|
||||
"source_event": "time_alarm",
|
||||
"created_at": "2026-06-09T09:00:00+08:00",
|
||||
"updated_at": "2026-06-09T09:00:00+08:00",
|
||||
"payload": {},
|
||||
}
|
||||
),
|
||||
encoding="utf-8",
|
||||
)
|
||||
ctx = ManageContext(config_path=config_path, project_root=root)
|
||||
server, thread = self._serve_once(ctx)
|
||||
try:
|
||||
status, payload = self._request(server, "GET", "/api/manage/cases?status=open")
|
||||
finally:
|
||||
self._stop_server(server, thread)
|
||||
|
||||
self.assertEqual(status, 200)
|
||||
self.assertEqual(len(payload["items"]), 1)
|
||||
self.assertEqual(payload["items"][0]["case_id"], "case_batch_000001")
|
||||
|
||||
def test_case_summary_endpoint_counts_open_and_handled(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
root = Path(tmpdir)
|
||||
config_path = root / "config" / "local.toml"
|
||||
save_config_document(
|
||||
config_path,
|
||||
{
|
||||
"case_sink": {"path": "logs/cases.jsonl"},
|
||||
"layout": {"zone_ids": ["1"]},
|
||||
},
|
||||
)
|
||||
cases_path = root / "logs" / "cases.jsonl"
|
||||
cases_path.parent.mkdir()
|
||||
cases_path.write_text(
|
||||
"\n".join(
|
||||
[
|
||||
json.dumps(
|
||||
{
|
||||
"case_id": "case_batch_000001",
|
||||
"batch_id": "batch_000001",
|
||||
"camera_id": "cam_01",
|
||||
"zone_id": "1",
|
||||
"zone_label": "区域 1",
|
||||
"case_type": "time_alarm",
|
||||
"case_status": "open",
|
||||
"source_event": "time_alarm",
|
||||
"created_at": "2026-06-09T09:00:00+08:00",
|
||||
"updated_at": "2026-06-09T09:00:00+08:00",
|
||||
"payload": {},
|
||||
}
|
||||
),
|
||||
json.dumps(
|
||||
{
|
||||
"case_id": "case_batch_000002",
|
||||
"batch_id": "batch_000002",
|
||||
"camera_id": "cam_01",
|
||||
"zone_id": "1",
|
||||
"zone_label": "区域 1",
|
||||
"case_type": "warning_escalated",
|
||||
"case_status": "handled",
|
||||
"source_event": "warning_escalated",
|
||||
"created_at": "2026-06-09T09:01:00+08:00",
|
||||
"updated_at": "2026-06-09T09:05:00+08:00",
|
||||
"handled_source": "manual",
|
||||
"payload": {},
|
||||
}
|
||||
),
|
||||
]
|
||||
),
|
||||
encoding="utf-8",
|
||||
)
|
||||
ctx = ManageContext(config_path=config_path, project_root=root)
|
||||
server, thread = self._serve_once(ctx)
|
||||
try:
|
||||
status, payload = self._request(server, "GET", "/api/manage/cases/summary")
|
||||
finally:
|
||||
self._stop_server(server, thread)
|
||||
|
||||
self.assertEqual(status, 200)
|
||||
self.assertEqual(payload["open_case_count"], 1)
|
||||
self.assertEqual(payload["handled_case_count"], 1)
|
||||
self.assertEqual(payload["warning_escalated_case_count"], 1)
|
||||
|
||||
def test_manual_handle_endpoint_appends_handled_snapshot(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
root = Path(tmpdir)
|
||||
config_path = root / "config" / "local.toml"
|
||||
save_config_document(
|
||||
config_path,
|
||||
{
|
||||
"case_sink": {"path": "logs/cases.jsonl"},
|
||||
"layout": {"zone_ids": ["1"]},
|
||||
},
|
||||
)
|
||||
cases_path = root / "logs" / "cases.jsonl"
|
||||
cases_path.parent.mkdir()
|
||||
cases_path.write_text(
|
||||
json.dumps(
|
||||
{
|
||||
"case_id": "case_batch_000001",
|
||||
"batch_id": "batch_000001",
|
||||
"camera_id": "cam_01",
|
||||
"zone_id": "1",
|
||||
"zone_label": "区域 1",
|
||||
"case_type": "time_alarm",
|
||||
"case_status": "open",
|
||||
"source_event": "time_alarm",
|
||||
"created_at": "2026-06-09T09:00:00+08:00",
|
||||
"updated_at": "2026-06-09T09:00:00+08:00",
|
||||
"payload": {},
|
||||
}
|
||||
),
|
||||
encoding="utf-8",
|
||||
)
|
||||
ctx = ManageContext(config_path=config_path, project_root=root)
|
||||
server, thread = self._serve_once(ctx)
|
||||
try:
|
||||
status, payload = self._request(
|
||||
server,
|
||||
"POST",
|
||||
"/api/manage/cases/case_batch_000001/handle",
|
||||
body={"handled_by": "alice", "note": "checked"},
|
||||
)
|
||||
finally:
|
||||
self._stop_server(server, thread)
|
||||
|
||||
lines = [json.loads(line) for line in cases_path.read_text(encoding="utf-8").splitlines()]
|
||||
|
||||
self.assertEqual(status, 200)
|
||||
self.assertEqual(payload["case_status"], "handled")
|
||||
self.assertEqual(lines[-1]["handled_source"], "manual")
|
||||
self.assertEqual(lines[-1]["payload"]["note"], "checked")
|
||||
|
||||
def test_callback_endpoint_requires_token_and_handles_case(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
root = Path(tmpdir)
|
||||
config_path = root / "config" / "local.toml"
|
||||
save_config_document(
|
||||
config_path,
|
||||
{
|
||||
"case_sink": {"path": "logs/cases.jsonl"},
|
||||
"webhooks": {"callback_token": "secret"},
|
||||
"layout": {"zone_ids": ["1"]},
|
||||
},
|
||||
)
|
||||
cases_path = root / "logs" / "cases.jsonl"
|
||||
cases_path.parent.mkdir()
|
||||
cases_path.write_text(
|
||||
json.dumps(
|
||||
{
|
||||
"case_id": "case_batch_000001",
|
||||
"batch_id": "batch_000001",
|
||||
"camera_id": "cam_01",
|
||||
"zone_id": "1",
|
||||
"zone_label": "区域 1",
|
||||
"case_type": "time_alarm",
|
||||
"case_status": "open",
|
||||
"source_event": "time_alarm",
|
||||
"created_at": "2026-06-09T09:00:00+08:00",
|
||||
"updated_at": "2026-06-09T09:00:00+08:00",
|
||||
"payload": {},
|
||||
}
|
||||
),
|
||||
encoding="utf-8",
|
||||
)
|
||||
ctx = ManageContext(config_path=config_path, project_root=root)
|
||||
server, thread = self._serve_once(ctx)
|
||||
try:
|
||||
unauthorized_status, _ = self._request(
|
||||
server,
|
||||
"POST",
|
||||
"/api/manage/webhooks/case-update",
|
||||
body={"case_id": "case_batch_000001", "status": "handled"},
|
||||
)
|
||||
status, payload = self._request(
|
||||
server,
|
||||
"POST",
|
||||
"/api/manage/webhooks/case-update",
|
||||
body={"case_id": "case_batch_000001", "status": "handled", "handled_by": "crm-bot"},
|
||||
headers={"X-Webhook-Token": "secret"},
|
||||
)
|
||||
finally:
|
||||
self._stop_server(server, thread)
|
||||
|
||||
lines = [json.loads(line) for line in cases_path.read_text(encoding="utf-8").splitlines()]
|
||||
|
||||
self.assertEqual(unauthorized_status, 403)
|
||||
self.assertEqual(status, 200)
|
||||
self.assertEqual(payload["handled_source"], "webhook_callback")
|
||||
self.assertEqual(lines[-1]["handled_source"], "webhook_callback")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
187
tests/test_webhooks.py
Normal file
187
tests/test_webhooks.py
Normal file
@@ -0,0 +1,187 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import tempfile
|
||||
import unittest
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
from cold_display_guard.webhooks import (
|
||||
build_batch_event_payload,
|
||||
build_case_event_payload,
|
||||
load_webhook_settings,
|
||||
send_batch_event_webhooks,
|
||||
send_case_webhooks,
|
||||
)
|
||||
|
||||
|
||||
UTC = timezone.utc
|
||||
|
||||
|
||||
class WebhookTests(unittest.TestCase):
|
||||
def test_load_webhook_settings_from_config(self) -> None:
|
||||
settings = load_webhook_settings(
|
||||
{
|
||||
"webhooks": {
|
||||
"enabled": True,
|
||||
"event_url": "https://example.com/events",
|
||||
"case_url": "https://example.com/cases",
|
||||
"callback_token": "secret",
|
||||
"connect_timeout_seconds": 4,
|
||||
"read_timeout_seconds": 6,
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
self.assertTrue(settings.enabled)
|
||||
self.assertEqual(settings.event_url, "https://example.com/events")
|
||||
self.assertEqual(settings.case_url, "https://example.com/cases")
|
||||
self.assertEqual(settings.callback_token, "secret")
|
||||
self.assertEqual(settings.connect_timeout_seconds, 4)
|
||||
self.assertEqual(settings.read_timeout_seconds, 6)
|
||||
|
||||
def test_build_batch_event_payload_wraps_runtime_event(self) -> None:
|
||||
payload = build_batch_event_payload(
|
||||
{
|
||||
"event": "time_alarm",
|
||||
"ts": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
|
||||
"batch_id": "batch_000001",
|
||||
"camera_id": "cam_01",
|
||||
"zone_id": "1",
|
||||
"zone_label": "区域 1",
|
||||
"severity": "alarm",
|
||||
"state": "alerted",
|
||||
}
|
||||
)
|
||||
|
||||
self.assertEqual(payload["kind"], "batch_event")
|
||||
self.assertEqual(payload["event"], "time_alarm")
|
||||
self.assertEqual(payload["zone_label"], "区域 1")
|
||||
|
||||
def test_build_case_event_payload_wraps_case_snapshot(self) -> None:
|
||||
payload = build_case_event_payload(
|
||||
{
|
||||
"case_id": "case_batch_000001",
|
||||
"case_type": "warning_escalated",
|
||||
"case_status": "open",
|
||||
"batch_id": "batch_000001",
|
||||
"source_event": "warning_escalated",
|
||||
"handled_source": "",
|
||||
"updated_at": datetime(2026, 6, 9, 9, 5, tzinfo=UTC).isoformat(),
|
||||
}
|
||||
)
|
||||
|
||||
self.assertEqual(payload["kind"], "case_event")
|
||||
self.assertEqual(payload["action"], "updated")
|
||||
self.assertEqual(payload["case_id"], "case_batch_000001")
|
||||
|
||||
def test_send_batch_event_webhooks_delivers_payload(self) -> None:
|
||||
deliveries: list[tuple[str, dict[str, object], tuple[float, float]]] = []
|
||||
|
||||
def fake_post(url: str, payload: dict[str, object], timeout: tuple[float, float]) -> tuple[int, str]:
|
||||
deliveries.append((url, payload, timeout))
|
||||
return 202, "ok"
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
audit_path = Path(tmpdir) / "webhook_delivery.jsonl"
|
||||
send_batch_event_webhooks(
|
||||
[
|
||||
{
|
||||
"event": "time_alarm",
|
||||
"ts": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
|
||||
"batch_id": "batch_000001",
|
||||
"camera_id": "cam_01",
|
||||
"zone_id": "1",
|
||||
"zone_label": "区域 1",
|
||||
"severity": "alarm",
|
||||
"state": "alerted",
|
||||
}
|
||||
],
|
||||
{
|
||||
"webhooks": {
|
||||
"enabled": True,
|
||||
"event_url": "https://example.com/events",
|
||||
"connect_timeout_seconds": 4,
|
||||
"read_timeout_seconds": 6,
|
||||
}
|
||||
},
|
||||
audit_path,
|
||||
http_post=fake_post,
|
||||
)
|
||||
|
||||
self.assertEqual(deliveries[0][0], "https://example.com/events")
|
||||
self.assertEqual(deliveries[0][1]["kind"], "batch_event")
|
||||
self.assertEqual(deliveries[0][2], (4.0, 6.0))
|
||||
|
||||
def test_send_case_webhooks_delivers_payload(self) -> None:
|
||||
deliveries: list[tuple[str, dict[str, object]]] = []
|
||||
|
||||
def fake_post(url: str, payload: dict[str, object], timeout: tuple[float, float]) -> tuple[int, str]:
|
||||
deliveries.append((url, payload))
|
||||
return 200, "ok"
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
audit_path = Path(tmpdir) / "webhook_delivery.jsonl"
|
||||
send_case_webhooks(
|
||||
[
|
||||
{
|
||||
"case_id": "case_batch_000001",
|
||||
"case_type": "time_alarm",
|
||||
"case_status": "handled",
|
||||
"batch_id": "batch_000001",
|
||||
"source_event": "time_alarm",
|
||||
"handled_source": "manual",
|
||||
"updated_at": datetime(2026, 6, 9, 9, 10, tzinfo=UTC).isoformat(),
|
||||
}
|
||||
],
|
||||
{
|
||||
"webhooks": {
|
||||
"enabled": True,
|
||||
"case_url": "https://example.com/cases",
|
||||
}
|
||||
},
|
||||
audit_path,
|
||||
http_post=fake_post,
|
||||
)
|
||||
|
||||
self.assertEqual(deliveries[0][0], "https://example.com/cases")
|
||||
self.assertEqual(deliveries[0][1]["kind"], "case_event")
|
||||
self.assertEqual(deliveries[0][1]["action"], "handled")
|
||||
|
||||
def test_failed_delivery_is_logged_without_raising(self) -> None:
|
||||
def fake_post(url: str, payload: dict[str, object], timeout: tuple[float, float]) -> tuple[int, str]:
|
||||
raise OSError("network down")
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
audit_path = Path(tmpdir) / "webhook_delivery.jsonl"
|
||||
send_batch_event_webhooks(
|
||||
[
|
||||
{
|
||||
"event": "time_alarm",
|
||||
"ts": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
|
||||
"batch_id": "batch_000001",
|
||||
"camera_id": "cam_01",
|
||||
"zone_id": "1",
|
||||
"zone_label": "区域 1",
|
||||
"severity": "alarm",
|
||||
"state": "alerted",
|
||||
}
|
||||
],
|
||||
{
|
||||
"webhooks": {
|
||||
"enabled": True,
|
||||
"event_url": "https://example.com/events",
|
||||
}
|
||||
},
|
||||
audit_path,
|
||||
http_post=fake_post,
|
||||
)
|
||||
logged = [json.loads(line) for line in audit_path.read_text(encoding="utf-8").splitlines()]
|
||||
|
||||
self.assertEqual(logged[0]["status"], "error")
|
||||
self.assertEqual(logged[0]["target"], "batch_event")
|
||||
self.assertIn("network down", logged[0]["message"])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
Reference in New Issue
Block a user