diff --git a/README_zh.md b/README_zh.md index 4d13053..b374548 100644 --- a/README_zh.md +++ b/README_zh.md @@ -200,6 +200,7 @@ path = "logs/webhook_retry.jsonl" enabled = true event_url = "https://example.com/runtime-events" case_url = "https://example.com/case-events" +source_id = "cold-display-guard" callback_token = "shared-secret" connect_timeout_seconds = 3 read_timeout_seconds = 5 @@ -223,6 +224,16 @@ retry_max_backoff_seconds = 1800 相关 webhook 字段: +- `event_code`:下游事件列表可直接使用的稳定编码,当前取批次 ID +- `camera_id` / `camera_ip`:来源设备和摄像头 IP +- `zone_id` / `zone_label`:所属区域 +- `started_at`:开始计时时间点 +- `ended_at` / `removed_at`:取出时间点 +- `dwell_seconds`:当前批次累计计时时长 +- `is_discarded` / `discarded_at`:是否已丢弃及丢弃时间点 +- `created_at`:该条外部事件记录的创建时间 +- `alerted_at` / `alarm_at`:时长告警时间点 +- `updated_at`:该条外部事件记录的最新更新时间 - `snapshot_upload_status`:`uploaded` 或 `error` - `snapshot_object_key`:上传成功后的 OSS 路径 - `snapshot_file_name`:上传文件名 diff --git a/config/example.toml b/config/example.toml index 8581d56..0b55c2a 100644 --- a/config/example.toml +++ b/config/example.toml @@ -77,6 +77,7 @@ path = "logs/webhook_delivery.jsonl" enabled = false event_url = "" case_url = "" +source_id = "" callback_token = "" connect_timeout_seconds = 3 read_timeout_seconds = 5 diff --git a/src/cold_display_guard/config.py b/src/cold_display_guard/config.py index 45b4c14..1de5a08 100644 --- a/src/cold_display_guard/config.py +++ b/src/cold_display_guard/config.py @@ -242,6 +242,7 @@ def format_config_document(data: dict[str, Any]) -> str: "retry_batch_limit", "retry_max_attempts", "retry_max_backoff_seconds", + "source_id", ): if key not in webhooks: continue diff --git a/src/cold_display_guard/webhooks.py b/src/cold_display_guard/webhooks.py index fbca134..fe693bc 100644 --- a/src/cold_display_guard/webhooks.py +++ b/src/cold_display_guard/webhooks.py @@ -7,6 +7,7 @@ from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any, Callable from urllib import request +from urllib.parse import urlsplit @dataclass(frozen=True, slots=True) @@ -14,6 +15,7 @@ class WebhookSettings: enabled: bool = False event_url: str = "" case_url: str = "" + source_id: str = "" callback_token: str = "" connect_timeout_seconds: float = 3.0 read_timeout_seconds: float = 5.0 @@ -123,6 +125,7 @@ def load_webhook_settings(config: dict[str, Any]) -> WebhookSettings: enabled=bool(payload.get("enabled", False)), event_url=str(payload.get("event_url", "")), case_url=str(payload.get("case_url", "")), + source_id=str(payload.get("source_id", "")), callback_token=str(payload.get("callback_token", "")), connect_timeout_seconds=float(payload.get("connect_timeout_seconds", 3.0)), read_timeout_seconds=float(payload.get("read_timeout_seconds", 5.0)), @@ -136,39 +139,81 @@ def load_webhook_settings(config: dict[str, Any]) -> WebhookSettings: def build_batch_event_payload( event: dict[str, object], *, + camera_ip: str = "", snapshot_upload: dict[str, object] | None = None, ) -> dict[str, object]: + batch_id = str(event.get("batch_id", "")) + event_name = str(event.get("event", "")) + ts = str(event.get("ts", "")) + alerted_at = str(event.get("alerted_at", "")) + ended_at = str(event.get("ended_at", "")) payload = { "kind": "batch_event", - "event": event.get("event", ""), - "ts": event.get("ts", ""), - "batch_id": event.get("batch_id", ""), + "event": event_name, + "event_code": batch_id, + "ts": ts, + "batch_id": batch_id, "camera_id": event.get("camera_id", ""), + "camera_ip": camera_ip, "zone_id": event.get("zone_id", ""), "zone_label": event.get("zone_label", ""), "severity": event.get("severity", ""), "state": event.get("state", ""), + "started_at": event.get("started_at", ""), + "ended_at": ended_at, + "removed_at": ended_at, + "dwell_seconds": event.get("dwell_seconds", ""), + "is_discarded": event_name == "batch_discarded", + "discarded_at": ts if event_name == "batch_discarded" else "", + "created_at": alerted_at or ts, + "alerted_at": alerted_at, + "alarm_at": alerted_at, + "updated_at": ts, } - return attach_snapshot_upload(payload, batch_id=str(event.get("batch_id", "")), snapshot_upload=snapshot_upload) + return attach_snapshot_upload(payload, batch_id=batch_id, snapshot_upload=snapshot_upload) def build_case_event_payload( snapshot: dict[str, object], *, + camera_ip: str = "", snapshot_upload: dict[str, object] | None = None, ) -> dict[str, object]: + batch_id = str(snapshot.get("batch_id", "")) + created_at = str(snapshot.get("created_at", "")) + updated_at = str(snapshot.get("updated_at", "")) + handled_at = str(snapshot.get("handled_at", "")) + handled_source = str(snapshot.get("handled_source", "")) + event = snapshot_event(snapshot) + alerted_at = str(event.get("alerted_at", "")) + ended_at = str(event.get("ended_at", "")) + discarded = handled_source == "auto_closed" payload = { "kind": "case_event", "action": infer_case_action(snapshot), "case_id": snapshot.get("case_id", ""), + "event_code": batch_id or snapshot.get("case_id", ""), "case_type": snapshot.get("case_type", ""), "case_status": snapshot.get("case_status", ""), - "batch_id": snapshot.get("batch_id", ""), + "batch_id": batch_id, + "camera_id": snapshot.get("camera_id", ""), + "camera_ip": camera_ip, + "zone_id": snapshot.get("zone_id", ""), + "zone_label": snapshot.get("zone_label", ""), "source_event": snapshot.get("source_event", ""), - "handled_source": snapshot.get("handled_source", ""), - "updated_at": snapshot.get("updated_at", ""), + "handled_source": handled_source, + "started_at": event.get("started_at", ""), + "ended_at": ended_at, + "removed_at": ended_at, + "dwell_seconds": event.get("dwell_seconds", ""), + "is_discarded": discarded, + "discarded_at": handled_at if discarded else "", + "created_at": alerted_at or created_at, + "alerted_at": alerted_at, + "alarm_at": alerted_at, + "updated_at": updated_at, } - return attach_snapshot_upload(payload, batch_id=str(snapshot.get("batch_id", "")), snapshot_upload=snapshot_upload) + return attach_snapshot_upload(payload, batch_id=batch_id, snapshot_upload=snapshot_upload) def infer_case_action(snapshot: dict[str, object]) -> str: @@ -192,12 +237,15 @@ def send_batch_event_webhooks( settings = load_webhook_settings(config) if not settings.enabled or not settings.event_url: return [] + camera_ip = infer_camera_ip(config) attempted_at = now or datetime.now(timezone.utc) deliveries: list[dict[str, object]] = [] retry_updates: list[dict[str, object]] = [] store = load_retry_store(retry_path) if retry_path is not None else None for event in events: - payload = build_batch_event_payload(event, snapshot_upload=snapshot_upload) + payload = build_batch_event_payload(event, camera_ip=camera_ip, snapshot_upload=snapshot_upload) + if settings.source_id: + payload["source_id"] = settings.source_id record = deliver_webhook( settings.event_url, payload, @@ -239,12 +287,15 @@ def send_case_webhooks( settings = load_webhook_settings(config) if not settings.enabled or not settings.case_url: return [] + camera_ip = infer_camera_ip(config) attempted_at = now or datetime.now(timezone.utc) deliveries: list[dict[str, object]] = [] retry_updates: list[dict[str, object]] = [] store = load_retry_store(retry_path) if retry_path is not None else None for snapshot in snapshots: - payload = build_case_event_payload(snapshot, snapshot_upload=snapshot_upload) + payload = build_case_event_payload(snapshot, camera_ip=camera_ip, snapshot_upload=snapshot_upload) + if settings.source_id: + payload["source_id"] = settings.source_id record = deliver_webhook( settings.case_url, payload, @@ -467,6 +518,29 @@ def optional_int(value: object) -> int | None: return None +def infer_camera_ip(config: dict[str, Any]) -> str: + stream = config.get("stream", {}) + if not isinstance(stream, dict): + return "" + rtsp_url = str(stream.get("rtsp_url", "")).strip() + if not rtsp_url: + return "" + try: + return urlsplit(rtsp_url).hostname or "" + except ValueError: + return "" + + +def snapshot_event(snapshot: dict[str, object]) -> dict[str, object]: + payload = snapshot.get("payload", {}) + if not isinstance(payload, dict): + return {} + event = payload.get("event", {}) + if not isinstance(event, dict): + return {} + return event + + def attach_snapshot_upload( payload: dict[str, object], *, diff --git a/tasks/lessons.md b/tasks/lessons.md new file mode 100644 index 0000000..7090f33 --- /dev/null +++ b/tasks/lessons.md @@ -0,0 +1,7 @@ +# Lessons + +- 2026-06-10: 远端接收端路由不能只凭已有相似服务或历史路径推断,必须先对用户指定的精确路径做真实 HTTP 探测,再决定配置值。 + Prevention: + 1. 对每个用户指定的 Webhook 路径,先在目标主机上用与真实请求接近的 `POST` 探测并记录状态码。 + 2. 如果存在多个相似路径,只能在验证过用户指定路径不可用后,才考虑回退到其它路径。 + 3. 切换远端配置前,先确认发送端容器对目标主机名或 IP 实际可达,避免写入不可解析的地址。 diff --git a/tasks/todo.md b/tasks/todo.md index 76393ef..27ce5eb 100644 --- a/tasks/todo.md +++ b/tasks/todo.md @@ -28,3 +28,39 @@ - Final verification passed: - `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest discover -s tests -v` - `cd web && pnpm install --frozen-lockfile && pnpm build` + +## Current Task: Webhook Payload Field Gap Check + +- [x] Pull the actual payload currently received by `video-recognition` and compare it against the required event list fields. +- [x] Patch webhook payload builders to include the missing non-store fields required by the downstream table. +- [x] Add or update focused webhook tests for the enriched payload shape. +- [x] Run targeted verification and record the result here. + +### Current Findings + +- Current received payload only includes `batch_id`, `camera_id`, `event`, `kind`, `severity`, `source_id`, `state`, `ts`, `zone_id`, and `zone_label`. +- Missing or not explicitly populated for the downstream event table: event code, camera IP, batch start time, removal time, dwell duration, discard flag, discard time, create time, alarm time, and update time. + +### Field Gap Verification + +- Actual receiver payload before the fix, from `video-recognition` result JSONL on `10.8.0.11`, confirmed only the base fields above and did not include the downstream table time/discard/IP fields. +- Updated `src/cold_display_guard/webhooks.py` so both `batch_event` and `case_event` now include: + - `event_code` + - `camera_ip` + - `started_at` + - `ended_at` + - `removed_at` + - `dwell_seconds` + - `is_discarded` + - `discarded_at` + - `created_at` + - `alerted_at` + - `alarm_at` + - `updated_at` +- `case_event` also now carries the missing contextual fields `camera_id`, `zone_id`, and `zone_label`. +- Verification passed: + - `PYTHONPATH=src python3 -m unittest tests/test_webhooks.py -v` + - `PYTHONPATH=src python3 -m unittest tests/test_main.py -v` + - `PYTHONPATH=src python3 -m unittest discover -s tests -v` +- Deployed updated code to `xiaozheng@10.8.0.11` without overwriting the remote `config/example.toml`, rebuilt `cold-display-guard:dev`, and restarted only `cold-display-guard-api` plus `cold-display-guard-runtime`. +- Natural post-deploy traffic did not arrive during the 2-minute observation window, so final runtime verification used the deployed container to build representative batch/case webhook payloads with the live remote config and confirmed `camera_ip = 192.168.3.4` plus all new downstream fields were present. diff --git a/tests/test_config.py b/tests/test_config.py index e66d709..be8a114 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -138,6 +138,7 @@ zone_ids = ["1", "2", "3"] "enabled": True, "event_url": "https://example.com/events", "case_url": "https://example.com/cases", + "source_id": "cold-display-guard", "callback_token": "secret", "connect_timeout_seconds": 3, "read_timeout_seconds": 5, @@ -159,6 +160,7 @@ zone_ids = ["1", "2", "3"] self.assertIn("[webhooks]", text) self.assertIn('event_url = "https://example.com/events"', text) self.assertIn('case_url = "https://example.com/cases"', text) + self.assertIn('source_id = "cold-display-guard"', text) self.assertIn('callback_token = "secret"', text) self.assertIn("retry_max_attempts = 4", text) self.assertIn("retry_backoff_seconds = 30", text) diff --git a/tests/test_webhooks.py b/tests/test_webhooks.py index 1a89411..388836d 100644 --- a/tests/test_webhooks.py +++ b/tests/test_webhooks.py @@ -28,6 +28,7 @@ class WebhookTests(unittest.TestCase): "enabled": True, "event_url": "https://example.com/events", "case_url": "https://example.com/cases", + "source_id": "cold-display-guard", "callback_token": "secret", "connect_timeout_seconds": 4, "read_timeout_seconds": 6, @@ -42,6 +43,7 @@ class WebhookTests(unittest.TestCase): self.assertTrue(settings.enabled) self.assertEqual(settings.event_url, "https://example.com/events") self.assertEqual(settings.case_url, "https://example.com/cases") + self.assertEqual(settings.source_id, "cold-display-guard") self.assertEqual(settings.callback_token, "secret") self.assertEqual(settings.connect_timeout_seconds, 4) self.assertEqual(settings.read_timeout_seconds, 6) @@ -61,12 +63,22 @@ class WebhookTests(unittest.TestCase): "zone_label": "区域 1", "severity": "alarm", "state": "alerted", - } + "started_at": datetime(2026, 6, 9, 8, 40, tzinfo=UTC).isoformat(), + "dwell_seconds": 1200, + "alerted_at": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(), + }, + camera_ip="192.168.3.4", ) self.assertEqual(payload["kind"], "batch_event") self.assertEqual(payload["event"], "time_alarm") + self.assertEqual(payload["event_code"], "batch_000001") + self.assertEqual(payload["camera_ip"], "192.168.3.4") self.assertEqual(payload["zone_label"], "区域 1") + self.assertEqual(payload["started_at"], datetime(2026, 6, 9, 8, 40, tzinfo=UTC).isoformat()) + self.assertEqual(payload["dwell_seconds"], 1200) + self.assertFalse(payload["is_discarded"]) + self.assertEqual(payload["alerted_at"], datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat()) def test_build_batch_event_payload_includes_uploaded_snapshot_path(self) -> None: payload = build_batch_event_payload( @@ -91,17 +103,41 @@ class WebhookTests(unittest.TestCase): { "case_id": "case_batch_000001", "case_type": "warning_escalated", - "case_status": "open", + "case_status": "handled", "batch_id": "batch_000001", + "camera_id": "cam_01", + "zone_id": "1", + "zone_label": "区域 1", "source_event": "warning_escalated", - "handled_source": "", + "created_at": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(), + "handled_at": datetime(2026, 6, 9, 9, 6, tzinfo=UTC).isoformat(), + "handled_source": "auto_closed", "updated_at": datetime(2026, 6, 9, 9, 5, tzinfo=UTC).isoformat(), - } + "payload": { + "event": { + "started_at": datetime(2026, 6, 9, 8, 40, tzinfo=UTC).isoformat(), + "ended_at": datetime(2026, 6, 9, 9, 4, tzinfo=UTC).isoformat(), + "dwell_seconds": 1440, + "alerted_at": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(), + } + }, + }, + camera_ip="192.168.3.4", ) self.assertEqual(payload["kind"], "case_event") - self.assertEqual(payload["action"], "updated") + self.assertEqual(payload["action"], "handled") self.assertEqual(payload["case_id"], "case_batch_000001") + self.assertEqual(payload["event_code"], "batch_000001") + self.assertEqual(payload["camera_id"], "cam_01") + self.assertEqual(payload["camera_ip"], "192.168.3.4") + self.assertEqual(payload["zone_label"], "区域 1") + self.assertEqual(payload["started_at"], datetime(2026, 6, 9, 8, 40, tzinfo=UTC).isoformat()) + self.assertEqual(payload["ended_at"], datetime(2026, 6, 9, 9, 4, tzinfo=UTC).isoformat()) + self.assertEqual(payload["dwell_seconds"], 1440) + self.assertTrue(payload["is_discarded"]) + self.assertEqual(payload["discarded_at"], datetime(2026, 6, 9, 9, 6, tzinfo=UTC).isoformat()) + self.assertEqual(payload["alerted_at"], datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat()) def test_build_case_event_payload_includes_uploaded_snapshot_path(self) -> None: payload = build_case_event_payload( @@ -146,9 +182,11 @@ class WebhookTests(unittest.TestCase): "webhooks": { "enabled": True, "event_url": "https://example.com/events", + "source_id": "cold-display-guard", "connect_timeout_seconds": 4, "read_timeout_seconds": 6, - } + }, + "stream": {"rtsp_url": "rtsp://admin:secret@192.168.3.4:554/h264/ch1/main/av_stream"}, }, audit_path, http_post=fake_post, @@ -156,6 +194,8 @@ class WebhookTests(unittest.TestCase): self.assertEqual(deliveries[0][0], "https://example.com/events") self.assertEqual(deliveries[0][1]["kind"], "batch_event") + self.assertEqual(deliveries[0][1]["camera_ip"], "192.168.3.4") + self.assertEqual(deliveries[0][1]["source_id"], "cold-display-guard") self.assertEqual(deliveries[0][2], (4.0, 6.0)) def test_send_case_webhooks_delivers_payload(self) -> None: @@ -183,7 +223,9 @@ class WebhookTests(unittest.TestCase): "webhooks": { "enabled": True, "case_url": "https://example.com/cases", - } + "source_id": "cold-display-guard", + }, + "stream": {"rtsp_url": "rtsp://admin:secret@192.168.3.4:554/h264/ch1/main/av_stream"}, }, audit_path, http_post=fake_post, @@ -192,6 +234,8 @@ class WebhookTests(unittest.TestCase): self.assertEqual(deliveries[0][0], "https://example.com/cases") self.assertEqual(deliveries[0][1]["kind"], "case_event") self.assertEqual(deliveries[0][1]["action"], "handled") + self.assertEqual(deliveries[0][1]["camera_ip"], "192.168.3.4") + self.assertEqual(deliveries[0][1]["source_id"], "cold-display-guard") def test_failed_delivery_is_logged_without_raising(self) -> None: def fake_post(url: str, payload: dict[str, object], timeout: tuple[float, float]) -> tuple[int, str]: