from __future__ import annotations

import contextlib
import http.client
import json
import tempfile
import threading
import unittest
from collections.abc import Iterator
from http.server import ThreadingHTTPServer
from pathlib import Path
from unittest import mock

from cold_display_guard.config import load_config_document, merge_calibration, save_config_document
from cold_display_guard.manage_api import ManageContext, build_summary, config_payload, create_handler


class ManageApiTests(unittest.TestCase):
    """Tests for calibration merging, config round-trips, summaries and the manage HTTP API.

    HTTP tests start a real ``ThreadingHTTPServer`` on an ephemeral localhost
    port and talk to it through ``http.client``.
    """

    # ------------------------------------------------------------------
    # Server helpers
    # ------------------------------------------------------------------

    def _serve_once(self, ctx: ManageContext) -> tuple[ThreadingHTTPServer, threading.Thread]:
        """Start a manage-API server for ``ctx`` on a free port in a daemon thread.

        Returns the server and the thread running ``serve_forever``; callers
        must hand both to ``_stop_server`` when done.
        """
        # Port 0 lets the OS pick a free port; read it back via server_address.
        server = ThreadingHTTPServer(("127.0.0.1", 0), create_handler(ctx))
        thread = threading.Thread(target=server.serve_forever, daemon=True)
        thread.start()
        return server, thread

    def _request(
        self,
        server: ThreadingHTTPServer,
        method: str,
        path: str,
        body: dict | None = None,
        headers: dict[str, str] | None = None,
    ) -> tuple[int, dict]:
        """Send one JSON request to ``server`` and return ``(status, decoded_body)``.

        ``body`` is JSON-encoded when given. ``headers`` are merged over the
        default ``Content-Type: application/json``. An empty response body is
        decoded as ``{}``.
        """
        conn = http.client.HTTPConnection("127.0.0.1", server.server_address[1], timeout=5)
        # Close the socket even when the request or read raises (e.g. timeout),
        # so a failing test does not leak a connection.
        try:
            payload = None if body is None else json.dumps(body)
            final_headers = {"Content-Type": "application/json"}
            final_headers.update(headers or {})
            conn.request(method, path, body=payload, headers=final_headers)
            response = conn.getresponse()
            raw = response.read().decode("utf-8")
        finally:
            conn.close()
        return response.status, json.loads(raw or "{}")

    def _stop_server(self, server: ThreadingHTTPServer, thread: threading.Thread) -> None:
        """Shut the server down, wait for its thread, then release the socket."""
        server.shutdown()
        thread.join()
        server.server_close()

    @contextlib.contextmanager
    def _running_server(self, ctx: ManageContext) -> Iterator[ThreadingHTTPServer]:
        """Context manager that runs a server for ``ctx`` and always stops it on exit."""
        server, thread = self._serve_once(ctx)
        try:
            yield server
        finally:
            self._stop_server(server, thread)

    # ------------------------------------------------------------------
    # Fixture helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _write_config(root: Path, document: dict) -> Path:
        """Save ``document`` as ``<root>/config/local.toml`` and return that path."""
        config_path = root / "config" / "local.toml"
        save_config_document(config_path, document)
        return config_path

    @staticmethod
    def _write_jsonl(path: Path, records: list[dict]) -> None:
        """Write ``records`` to ``path`` as newline-separated JSON (no trailing newline)."""
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text("\n".join(json.dumps(record) for record in records), encoding="utf-8")

    @staticmethod
    def _read_jsonl(path: Path) -> list[dict]:
        """Decode every line of ``path`` as JSON."""
        return [json.loads(line) for line in path.read_text(encoding="utf-8").splitlines()]

    @staticmethod
    def _open_case_record() -> dict:
        """Return the open ``time_alarm`` case snapshot shared by the case tests."""
        return {
            "case_id": "case_batch_000001",
            "batch_id": "batch_000001",
            "camera_id": "cam_01",
            "zone_id": "1",
            "zone_label": "区域 1",
            "case_type": "time_alarm",
            "case_status": "open",
            "source_event": "time_alarm",
            "created_at": "2026-06-09T09:00:00+08:00",
            "updated_at": "2026-06-09T09:00:00+08:00",
            "payload": {},
        }

    @staticmethod
    def _pending_retry_record(payload: dict) -> dict:
        """Return a pending ``case_event`` retry-queue entry carrying ``payload``."""
        return {
            "retry_id": "retry_000001",
            "target": "case_event",
            "url": "https://example.com/cases",
            "status": "pending",
            "attempt_count": 1,
            "payload": payload,
            "created_at": "2026-06-09T09:00:00+08:00",
            "updated_at": "2026-06-09T09:00:00+08:00",
            "next_attempt_at": "2026-06-09T09:01:00+08:00",
        }

    # ------------------------------------------------------------------
    # merge_calibration
    # ------------------------------------------------------------------

    def test_merge_calibration_updates_zones_and_trash(self) -> None:
        data = {
            "camera_id": "cam",
            "layout": {"rows": 2, "cols": 4, "zone_ids": ["r1c1", "r1c2"]},
            "zones": [{"id": "r1c2", "polygon": [[0.5, 0], [1, 0], [1, 0.5]]}],
        }

        merged = merge_calibration(
            data,
            [{"id": "r1c1", "polygon": [[0, 0], [0.5, 0], [0.5, 0.5], [0, 0.5]]}],
            [[0.8, 0.8], [1, 0.8], [1, 1], [0.8, 1]],
        )

        self.assertEqual(merged["layout"]["zone_ids"], ["r1c1", "r1c2"])
        self.assertEqual(merged["zones"][0]["id"], "r1c1")
        self.assertEqual(merged["zones"][1]["id"], "r1c2")
        self.assertEqual(merged["trash"]["roi"][0], [0.8, 0.8])

    def test_merge_calibration_replaces_numeric_food_zones_and_keeps_trash_separate(self) -> None:
        data = {
            "layout": {"zone_count": 2, "zone_ids": ["1", "2"]},
            "zones": [
                {"id": "1", "polygon": [[0, 0], [0.3, 0], [0.3, 0.3]]},
                {"id": "2", "polygon": [[0.3, 0], [0.6, 0], [0.6, 0.3]]},
            ],
        }

        merged = merge_calibration(
            data,
            [
                {"id": "1", "label": "区域 1", "polygon": [[0, 0], [0.2, 0], [0.2, 0.2]]},
                {"id": "2", "label": "区域 2", "polygon": [[0.2, 0], [0.4, 0], [0.4, 0.2]]},
                {"id": "3", "label": "区域 3", "polygon": [[0.4, 0], [0.6, 0], [0.6, 0.2]]},
            ],
            [[0.8, 0.8], [1, 0.8], [1, 1], [0.8, 1]],
        )

        self.assertEqual(merged["layout"]["zone_count"], 3)
        self.assertEqual(merged["layout"]["zone_ids"], ["1", "2", "3"])
        self.assertEqual([zone["label"] for zone in merged["zones"]], ["区域 1", "区域 2", "区域 3"])
        self.assertEqual(merged["trash"]["roi"][0], [0.8, 0.8])
        self.assertNotIn("trash", merged["layout"]["zone_ids"])

    def test_merge_calibration_preserves_numeric_zone_count_when_some_zones_are_unmarked(self) -> None:
        data = {
            "layout": {"zone_count": 3, "zone_ids": ["1", "2", "3"]},
            "zones": [
                {"id": "1", "label": "区域 1", "polygon": [[0, 0], [0.2, 0], [0.2, 0.2]]},
                {"id": "2", "label": "区域 2", "polygon": [[0.2, 0], [0.4, 0], [0.4, 0.2]]},
            ],
        }

        merged = merge_calibration(
            data,
            [{"id": "1", "label": "区域 1", "polygon": [[0, 0], [0.3, 0], [0.3, 0.3]]}],
            [[0.8, 0.8], [1, 0.8], [1, 1]],
            {"zone_count": 3, "zone_ids": ["1", "2", "3"]},
        )

        self.assertEqual(merged["layout"]["zone_count"], 3)
        self.assertEqual(merged["layout"]["zone_ids"], ["1", "2", "3"])
        self.assertEqual([zone["id"] for zone in merged["zones"]], ["1", "2"])
        self.assertEqual(merged["zones"][0]["polygon"], [[0.0, 0.0], [0.3, 0.0], [0.3, 0.3]])
        self.assertEqual(merged["zones"][1]["polygon"], [[0.2, 0.0], [0.4, 0.0], [0.4, 0.2]])

    def test_merge_calibration_rejects_more_than_ten_numeric_food_zones(self) -> None:
        # Eleven zones ("1".."11"): one more than the limit named in the error.
        zones = [
            {"id": str(index), "polygon": [[0, 0], [0.1, 0], [0.1, 0.1]]}
            for index in range(1, 12)
        ]

        with self.assertRaisesRegex(ValueError, "1 to 10"):
            merge_calibration({"layout": {}}, zones, None)

    # ------------------------------------------------------------------
    # Config persistence
    # ------------------------------------------------------------------

    def test_save_config_document_round_trips_manage_fields(self) -> None:
        with tempfile.TemporaryDirectory() as tmpdir:
            path = Path(tmpdir) / "config.toml"
            save_config_document(
                path,
                {
                    "camera_id": "cam",
                    "timezone": "Asia/Shanghai",
                    "stream": {"rtsp_url": "rtsp://example"},
                    "thresholds": {"max_dwell_seconds": 30, "trash_confirmation_seconds": 5},
                    "layout": {"rows": 1, "cols": 1, "zone_ids": ["r1c1"]},
                    "zones": [{"id": "r1c1", "polygon": [[0, 0], [1, 0], [1, 1]]}],
                    "trash": {"roi": [[0, 0], [1, 0], [1, 1]]},
                    "event_sink": {"path": "logs/events.jsonl"},
                },
            )

            loaded = load_config_document(path)

            self.assertEqual(loaded["stream"]["rtsp_url"], "rtsp://example")
            self.assertEqual(loaded["zones"][0]["polygon"][1], [1.0, 0.0])

    # ------------------------------------------------------------------
    # build_summary
    # ------------------------------------------------------------------

    def test_summary_reads_event_jsonl(self) -> None:
        with tempfile.TemporaryDirectory() as tmpdir:
            root = Path(tmpdir)
            config_path = self._write_config(
                root,
                {
                    "event_sink": {"path": "logs/events.jsonl"},
                    "layout": {"rows": 1, "cols": 1, "zone_ids": ["r1c1"]},
                },
            )
            self._write_jsonl(
                root / "logs" / "events.jsonl",
                [
                    {"event": "batch_started", "severity": "info", "ts": "2026-04-27T10:00:00+08:00"},
                    {"event": "time_alarm", "severity": "alarm", "ts": "2026-04-27T12:00:00+08:00"},
                    {"event": "warning_escalated", "severity": "warning", "ts": "2026-04-27T13:02:00+08:00"},
                ],
            )

            summary = build_summary(ManageContext(config_path=config_path, project_root=root))

            self.assertEqual(summary["metrics"]["event_count"], 3)
            self.assertEqual(summary["metrics"]["alert_count"], 1)
            self.assertEqual(summary["metrics"]["warning_count"], 1)
            self.assertEqual(summary["metrics"]["violation_count"], 1)
            self.assertEqual(summary["metrics"]["latest_alert_time"], "2026-04-27T13:02:00+08:00")

    def test_summary_counts_escalated_and_legacy_warnings_without_pending_disposal(self) -> None:
        with tempfile.TemporaryDirectory() as tmpdir:
            root = Path(tmpdir)
            config_path = self._write_config(
                root,
                {
                    "event_sink": {"path": "logs/events.jsonl"},
                    "layout": {"rows": 1, "cols": 1, "zone_ids": ["r1c1"]},
                },
            )
            self._write_jsonl(
                root / "logs" / "events.jsonl",
                [
                    {"event": "batch_pending_disposal", "severity": "warning", "ts": "2026-04-27T12:01:00+08:00"},
                    # Legacy event: carries no "severity" field.
                    {"event": "mixed_batch_violation", "ts": "2026-04-27T12:02:00+08:00"},
                    {"event": "warning_escalated", "severity": "warning", "ts": "2026-04-27T12:03:00+08:00"},
                ],
            )

            summary = build_summary(ManageContext(config_path=config_path, project_root=root))

            self.assertEqual(summary["metrics"]["event_count"], 3)
            self.assertEqual(summary["metrics"]["alert_count"], 0)
            self.assertEqual(summary["metrics"]["warning_count"], 2)
            self.assertEqual(summary["metrics"]["violation_count"], 2)
            self.assertEqual(summary["metrics"]["latest_alert_time"], "2026-04-27T12:03:00+08:00")

    def test_summary_reads_runtime_diagnostics(self) -> None:
        with tempfile.TemporaryDirectory() as tmpdir:
            root = Path(tmpdir)
            config_path = self._write_config(
                root,
                {
                    "runtime": {"diagnostics_path": "logs/runtime_diagnostics.jsonl"},
                    "event_sink": {"path": "logs/events.jsonl"},
                    "layout": {"rows": 1, "cols": 1, "zone_ids": ["r1c1"]},
                },
            )
            self._write_jsonl(
                root / "logs" / "runtime_diagnostics.jsonl",
                [
                    {
                        "ts": "2026-04-28T10:00:00+08:00",
                        "zone_counts": {"r1c1": 1},
                        "diagnostics": {"baseline_ready": True},
                    }
                ],
            )

            summary = build_summary(ManageContext(config_path=config_path, project_root=root))

            self.assertEqual(summary["metrics"]["diagnostics_count"], 1)
            self.assertEqual(summary["metrics"]["latest_zone_counts"], {"r1c1": 1})
            self.assertTrue(summary["metrics"]["baseline_ready"])

    def test_summary_uses_stable_runtime_occupancy_when_raw_metrics_flicker(self) -> None:
        with tempfile.TemporaryDirectory() as tmpdir:
            root = Path(tmpdir)
            config_path = self._write_config(
                root,
                {
                    "runtime": {
                        "diagnostics_path": "logs/runtime_diagnostics.jsonl",
                        "occupancy_mean_delta": 55.0,
                        "occupancy_texture_delta": 18.0,
                        "occupancy_dark_fraction": 0.06,
                        "occupancy_texture_dark_fraction": 0.04,
                    },
                    "event_sink": {"path": "logs/events.jsonl"},
                    "layout": {"zone_count": 2, "zone_ids": ["1", "2"]},
                },
            )
            self._write_jsonl(
                root / "logs" / "runtime_diagnostics.jsonl",
                [
                    {
                        "ts": "2026-05-29T10:05:26+08:00",
                        "zone_counts": {"1": 0, "2": 1},
                        "diagnostics": {
                            "baseline_ready": True,
                            "zones": {
                                "1": {
                                    "mean_delta": 0.0,
                                    "texture_delta": 0.0,
                                    "dark_fraction": 0.0,
                                    "baseline_dark_fraction": 0.0,
                                    "bright_fraction": 0.0,
                                    "occupied": False,
                                },
                                # Raw reading says empty while the stable
                                # "occupied" flag is still set.
                                "2": {
                                    "mean_delta": 17.077,
                                    "texture_delta": 8.819,
                                    "dark_fraction": 0.0357,
                                    "baseline_dark_fraction": 0.0,
                                    "bright_fraction": 0.0,
                                    "raw_occupied": False,
                                    "occupied": True,
                                    "empty_streak": 1,
                                },
                            },
                        },
                    }
                ],
            )

            summary = build_summary(ManageContext(config_path=config_path, project_root=root))

            self.assertEqual(summary["metrics"]["latest_zone_counts"], {"1": 0, "2": 1})

    def test_summary_recomputes_latest_zone_counts_from_runtime_thresholds_when_stable_state_is_absent(self) -> None:
        with tempfile.TemporaryDirectory() as tmpdir:
            root = Path(tmpdir)
            config_path = self._write_config(
                root,
                {
                    "runtime": {
                        "diagnostics_path": "logs/runtime_diagnostics.jsonl",
                        "occupancy_mean_delta": 45.0,
                        "occupancy_texture_delta": 18.0,
                    },
                    "event_sink": {"path": "logs/events.jsonl"},
                    "layout": {"zone_count": 3, "zone_ids": ["1", "2", "3"]},
                },
            )
            self._write_jsonl(
                root / "logs" / "runtime_diagnostics.jsonl",
                [
                    {
                        "ts": "2026-05-27T11:02:23+08:00",
                        "zone_counts": {"1": 1, "3": 1},
                        "diagnostics": {
                            "baseline_ready": True,
                            "zones": {
                                "1": {"mean_delta": 70.0, "texture_delta": 27.0},
                                "3": {"mean_delta": 36.0, "texture_delta": -9.0},
                            },
                        },
                    }
                ],
            )

            summary = build_summary(ManageContext(config_path=config_path, project_root=root))

            self.assertEqual(summary["metrics"]["latest_zone_counts"], {"1": 1, "3": 0})

    def test_summary_recomputes_latest_zone_counts_with_dark_fraction_rule(self) -> None:
        with tempfile.TemporaryDirectory() as tmpdir:
            root = Path(tmpdir)
            config_path = self._write_config(
                root,
                {
                    "runtime": {
                        "diagnostics_path": "logs/runtime_diagnostics.jsonl",
                        "occupancy_mean_delta": 55.0,
                        "occupancy_texture_delta": 18.0,
                        "occupancy_dark_fraction": 0.06,
                        "occupancy_texture_dark_fraction": 0.04,
                        "occupancy_bright_reflection_fraction": 0.18,
                    },
                    "event_sink": {"path": "logs/events.jsonl"},
                    "layout": {"zone_count": 2, "zone_ids": ["1", "2"]},
                },
            )
            self._write_jsonl(
                root / "logs" / "runtime_diagnostics.jsonl",
                [
                    {
                        "ts": "2026-05-28T09:41:13+08:00",
                        "zone_counts": {"1": 1, "2": 1},
                        "diagnostics": {
                            "baseline_ready": True,
                            "zones": {
                                "1": {
                                    "mean_delta": 45.0,
                                    "texture_delta": 20.0,
                                    "dark_fraction": 0.20,
                                    "baseline_dark_fraction": 0.0,
                                    "bright_fraction": 0.0,
                                },
                                "2": {
                                    "mean_delta": 16.0,
                                    "texture_delta": 40.0,
                                    "dark_fraction": 0.0769,
                                    "baseline_dark_fraction": 0.0,
                                    "bright_fraction": 0.3077,
                                },
                            },
                        },
                    }
                ],
            )

            summary = build_summary(ManageContext(config_path=config_path, project_root=root))

            self.assertEqual(summary["metrics"]["latest_zone_counts"], {"1": 1, "2": 0})

    # ------------------------------------------------------------------
    # config_payload
    # ------------------------------------------------------------------

    def test_config_payload_exposes_case_sink_and_webhooks_without_callback_token(self) -> None:
        with tempfile.TemporaryDirectory() as tmpdir:
            root = Path(tmpdir)
            config_path = self._write_config(
                root,
                {
                    "case_sink": {"path": "logs/cases.jsonl"},
                    "webhook_retry_sink": {"path": "logs/webhook_retry.jsonl"},
                    "alarm_snapshot_upload": {
                        "enabled": True,
                        "service_url": "https://ota.zhengxinshipin.com",
                        "secret": "change-me-in-production",
                    },
                    "webhooks": {
                        "enabled": True,
                        "event_url": "https://example.com/events",
                        "case_url": "https://example.com/cases",
                        "callback_token": "secret",
                        "retry_max_attempts": 4,
                    },
                },
            )

            payload = config_payload(ManageContext(config_path=config_path, project_root=root))

            self.assertEqual(payload["case_sink"]["path"], str((root / "logs" / "cases.jsonl").resolve()))
            self.assertEqual(
                payload["webhook_retry_sink"]["path"],
                str((root / "logs" / "webhook_retry.jsonl").resolve()),
            )
            self.assertTrue(payload["alarm_snapshot_upload"]["enabled"])
            self.assertEqual(payload["alarm_snapshot_upload"]["service_url"], "https://ota.zhengxinshipin.com")
            # Secrets must not appear in the payload.
            self.assertNotIn("secret", payload["alarm_snapshot_upload"])
            self.assertTrue(payload["webhooks"]["enabled"])
            self.assertEqual(payload["webhooks"]["retry_max_attempts"], 4)
            self.assertNotIn("callback_token", payload["webhooks"])

    # ------------------------------------------------------------------
    # Case endpoints
    # ------------------------------------------------------------------

    def test_cases_endpoint_returns_latest_snapshots(self) -> None:
        with tempfile.TemporaryDirectory() as tmpdir:
            root = Path(tmpdir)
            config_path = self._write_config(
                root,
                {
                    "case_sink": {"path": "logs/cases.jsonl"},
                    "layout": {"zone_ids": ["1"]},
                },
            )
            self._write_jsonl(root / "logs" / "cases.jsonl", [self._open_case_record()])
            ctx = ManageContext(config_path=config_path, project_root=root)

            with self._running_server(ctx) as server:
                status, payload = self._request(server, "GET", "/api/manage/cases?status=open")

            self.assertEqual(status, 200)
            self.assertEqual(len(payload["items"]), 1)
            self.assertEqual(payload["items"][0]["case_id"], "case_batch_000001")

    def test_case_summary_endpoint_counts_open_and_handled(self) -> None:
        with tempfile.TemporaryDirectory() as tmpdir:
            root = Path(tmpdir)
            config_path = self._write_config(
                root,
                {
                    "case_sink": {"path": "logs/cases.jsonl"},
                    "layout": {"zone_ids": ["1"]},
                },
            )
            self._write_jsonl(
                root / "logs" / "cases.jsonl",
                [
                    self._open_case_record(),
                    {
                        "case_id": "case_batch_000002",
                        "batch_id": "batch_000002",
                        "camera_id": "cam_01",
                        "zone_id": "1",
                        "zone_label": "区域 1",
                        "case_type": "warning_escalated",
                        "case_status": "handled",
                        "source_event": "warning_escalated",
                        "created_at": "2026-06-09T09:01:00+08:00",
                        "updated_at": "2026-06-09T09:05:00+08:00",
                        "handled_source": "manual",
                        "payload": {},
                    },
                ],
            )
            ctx = ManageContext(config_path=config_path, project_root=root)

            with self._running_server(ctx) as server:
                status, payload = self._request(server, "GET", "/api/manage/cases/summary")

            self.assertEqual(status, 200)
            self.assertEqual(payload["open_case_count"], 1)
            self.assertEqual(payload["handled_case_count"], 1)
            self.assertEqual(payload["warning_escalated_case_count"], 1)

    def test_manual_handle_endpoint_appends_handled_snapshot(self) -> None:
        with tempfile.TemporaryDirectory() as tmpdir:
            root = Path(tmpdir)
            config_path = self._write_config(
                root,
                {
                    "case_sink": {"path": "logs/cases.jsonl"},
                    "layout": {"zone_ids": ["1"]},
                },
            )
            cases_path = root / "logs" / "cases.jsonl"
            self._write_jsonl(cases_path, [self._open_case_record()])
            ctx = ManageContext(config_path=config_path, project_root=root)

            with self._running_server(ctx) as server:
                status, payload = self._request(
                    server,
                    "POST",
                    "/api/manage/cases/case_batch_000001/handle",
                    body={"handled_by": "alice", "note": "checked"},
                )

            lines = self._read_jsonl(cases_path)
            self.assertEqual(status, 200)
            self.assertEqual(payload["case_status"], "handled")
            self.assertEqual(lines[-1]["handled_source"], "manual")
            self.assertEqual(lines[-1]["payload"]["note"], "checked")

    def test_manual_handle_endpoint_enqueues_failed_case_webhook_for_retry(self) -> None:
        with tempfile.TemporaryDirectory() as tmpdir:
            root = Path(tmpdir)
            config_path = self._write_config(
                root,
                {
                    "case_sink": {"path": "logs/cases.jsonl"},
                    "webhooks": {
                        "enabled": True,
                        "case_url": "https://example.com/cases",
                        "retry_max_attempts": 3,
                        "retry_backoff_seconds": 30,
                    },
                    "layout": {"zone_ids": ["1"]},
                },
            )
            cases_path = root / "logs" / "cases.jsonl"
            retry_path = root / "logs" / "webhook_retry.jsonl"
            self._write_jsonl(cases_path, [self._open_case_record()])
            ctx = ManageContext(config_path=config_path, project_root=root)

            # Make the outbound webhook POST fail.
            failing_post = mock.patch(
                "cold_display_guard.webhooks.post_json",
                side_effect=OSError("network down"),
            )
            with self._running_server(ctx) as server, failing_post:
                status, payload = self._request(
                    server,
                    "POST",
                    "/api/manage/cases/case_batch_000001/handle",
                    body={"handled_by": "alice"},
                )

            retries = self._read_jsonl(retry_path)
            self.assertEqual(status, 200)
            self.assertEqual(payload["case_status"], "handled")
            self.assertEqual(retries[-1]["status"], "pending")
            self.assertEqual(retries[-1]["target"], "case_event")
            self.assertEqual(retries[-1]["attempt_count"], 1)

    # ------------------------------------------------------------------
    # Webhook retry queue and callback endpoints
    # ------------------------------------------------------------------

    def test_retry_queue_endpoint_returns_pending_items(self) -> None:
        with tempfile.TemporaryDirectory() as tmpdir:
            root = Path(tmpdir)
            config_path = self._write_config(root, {"layout": {"zone_ids": ["1"]}})
            self._write_jsonl(
                root / "logs" / "webhook_retry.jsonl",
                [self._pending_retry_record({"kind": "case_event"})],
            )
            ctx = ManageContext(config_path=config_path, project_root=root)

            with self._running_server(ctx) as server:
                status, payload = self._request(server, "GET", "/api/manage/webhooks/retries?status=pending")

            self.assertEqual(status, 200)
            self.assertEqual(payload["items"][0]["retry_id"], "retry_000001")
            self.assertEqual(payload["items"][0]["status"], "pending")

    def test_retry_drain_endpoint_retries_pending_item(self) -> None:
        with tempfile.TemporaryDirectory() as tmpdir:
            root = Path(tmpdir)
            config_path = self._write_config(
                root,
                {
                    "webhooks": {"enabled": True, "retry_max_attempts": 3, "retry_backoff_seconds": 30},
                    "layout": {"zone_ids": ["1"]},
                },
            )
            retry_path = root / "logs" / "webhook_retry.jsonl"
            self._write_jsonl(
                retry_path,
                [self._pending_retry_record({"kind": "case_event", "case_id": "case_batch_000001"})],
            )
            ctx = ManageContext(config_path=config_path, project_root=root)

            # Make the outbound webhook POST succeed.
            successful_post = mock.patch("cold_display_guard.webhooks.post_json", return_value=(200, "ok"))
            with self._running_server(ctx) as server, successful_post:
                status, payload = self._request(server, "POST", "/api/manage/webhooks/retries/drain", body={})

            lines = self._read_jsonl(retry_path)
            self.assertEqual(status, 200)
            self.assertEqual(payload["retried_count"], 1)
            self.assertEqual(payload["delivered_count"], 1)
            self.assertEqual(lines[-1]["status"], "delivered")
            self.assertEqual(lines[-1]["attempt_count"], 2)

    def test_callback_endpoint_requires_token_and_handles_case(self) -> None:
        with tempfile.TemporaryDirectory() as tmpdir:
            root = Path(tmpdir)
            config_path = self._write_config(
                root,
                {
                    "case_sink": {"path": "logs/cases.jsonl"},
                    "webhooks": {"callback_token": "secret"},
                    "layout": {"zone_ids": ["1"]},
                },
            )
            cases_path = root / "logs" / "cases.jsonl"
            self._write_jsonl(cases_path, [self._open_case_record()])
            ctx = ManageContext(config_path=config_path, project_root=root)

            with self._running_server(ctx) as server:
                # First call omits the token header; second call supplies it.
                unauthorized_status, _ = self._request(
                    server,
                    "POST",
                    "/api/manage/webhooks/case-update",
                    body={"case_id": "case_batch_000001", "status": "handled"},
                )
                status, payload = self._request(
                    server,
                    "POST",
                    "/api/manage/webhooks/case-update",
                    body={"case_id": "case_batch_000001", "status": "handled", "handled_by": "crm-bot"},
                    headers={"X-Webhook-Token": "secret"},
                )

            lines = self._read_jsonl(cases_path)
            self.assertEqual(unauthorized_status, 403)
            self.assertEqual(status, 200)
            self.assertEqual(payload["handled_source"], "webhook_callback")
            self.assertEqual(lines[-1]["handled_source"], "webhook_callback")


if __name__ == "__main__":
    unittest.main()