feat: enrich webhook payloads with downstream event table fields

Add missing fields (event_code, camera_ip, started_at, ended_at,
dwell_seconds, is_discarded, alerted_at, etc.) to both batch_event
and case_event payloads. Introduce source_id config for payload
injection and infer_camera_ip to extract IP from RTSP stream URL.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-06-10 17:04:58 +08:00
parent e919ffd561
commit 45e2cf70f7
8 changed files with 193 additions and 17 deletions

View File

@@ -200,6 +200,7 @@ path = "logs/webhook_retry.jsonl"
enabled = true enabled = true
event_url = "https://example.com/runtime-events" event_url = "https://example.com/runtime-events"
case_url = "https://example.com/case-events" case_url = "https://example.com/case-events"
source_id = "cold-display-guard"
callback_token = "shared-secret" callback_token = "shared-secret"
connect_timeout_seconds = 3 connect_timeout_seconds = 3
read_timeout_seconds = 5 read_timeout_seconds = 5
@@ -223,6 +224,16 @@ retry_max_backoff_seconds = 1800
相关 webhook 字段: 相关 webhook 字段:
- `event_code`:下游事件列表可直接使用的稳定编码,当前取批次 ID
- `camera_id` / `camera_ip`:来源设备和摄像头 IP
- `zone_id` / `zone_label`:所属区域
- `started_at`:开始计时时间点
- `ended_at` / `removed_at`:取出时间点
- `dwell_seconds`:当前批次累计计时时长
- `is_discarded` / `discarded_at`:是否已丢弃及丢弃时间点
- `created_at`:该条外部事件记录的创建时间
- `alerted_at` / `alarm_at`:时长告警时间点
- `updated_at`:该条外部事件记录的最新更新时间
- `snapshot_upload_status``uploaded``error` - `snapshot_upload_status``uploaded``error`
- `snapshot_object_key`:上传成功后的 OSS 路径 - `snapshot_object_key`:上传成功后的 OSS 路径
- `snapshot_file_name`:上传文件名 - `snapshot_file_name`:上传文件名

View File

@@ -77,6 +77,7 @@ path = "logs/webhook_delivery.jsonl"
enabled = false enabled = false
event_url = "" event_url = ""
case_url = "" case_url = ""
source_id = ""
callback_token = "" callback_token = ""
connect_timeout_seconds = 3 connect_timeout_seconds = 3
read_timeout_seconds = 5 read_timeout_seconds = 5

View File

@@ -242,6 +242,7 @@ def format_config_document(data: dict[str, Any]) -> str:
"retry_batch_limit", "retry_batch_limit",
"retry_max_attempts", "retry_max_attempts",
"retry_max_backoff_seconds", "retry_max_backoff_seconds",
"source_id",
): ):
if key not in webhooks: if key not in webhooks:
continue continue

View File

@@ -7,6 +7,7 @@ from datetime import datetime, timedelta, timezone
from pathlib import Path from pathlib import Path
from typing import Any, Callable from typing import Any, Callable
from urllib import request from urllib import request
from urllib.parse import urlsplit
@dataclass(frozen=True, slots=True) @dataclass(frozen=True, slots=True)
@@ -14,6 +15,7 @@ class WebhookSettings:
enabled: bool = False enabled: bool = False
event_url: str = "" event_url: str = ""
case_url: str = "" case_url: str = ""
source_id: str = ""
callback_token: str = "" callback_token: str = ""
connect_timeout_seconds: float = 3.0 connect_timeout_seconds: float = 3.0
read_timeout_seconds: float = 5.0 read_timeout_seconds: float = 5.0
@@ -123,6 +125,7 @@ def load_webhook_settings(config: dict[str, Any]) -> WebhookSettings:
enabled=bool(payload.get("enabled", False)), enabled=bool(payload.get("enabled", False)),
event_url=str(payload.get("event_url", "")), event_url=str(payload.get("event_url", "")),
case_url=str(payload.get("case_url", "")), case_url=str(payload.get("case_url", "")),
source_id=str(payload.get("source_id", "")),
callback_token=str(payload.get("callback_token", "")), callback_token=str(payload.get("callback_token", "")),
connect_timeout_seconds=float(payload.get("connect_timeout_seconds", 3.0)), connect_timeout_seconds=float(payload.get("connect_timeout_seconds", 3.0)),
read_timeout_seconds=float(payload.get("read_timeout_seconds", 5.0)), read_timeout_seconds=float(payload.get("read_timeout_seconds", 5.0)),
@@ -136,39 +139,81 @@ def load_webhook_settings(config: dict[str, Any]) -> WebhookSettings:
def build_batch_event_payload( def build_batch_event_payload(
event: dict[str, object], event: dict[str, object],
*, *,
camera_ip: str = "",
snapshot_upload: dict[str, object] | None = None, snapshot_upload: dict[str, object] | None = None,
) -> dict[str, object]: ) -> dict[str, object]:
batch_id = str(event.get("batch_id", ""))
event_name = str(event.get("event", ""))
ts = str(event.get("ts", ""))
alerted_at = str(event.get("alerted_at", ""))
ended_at = str(event.get("ended_at", ""))
payload = { payload = {
"kind": "batch_event", "kind": "batch_event",
"event": event.get("event", ""), "event": event_name,
"ts": event.get("ts", ""), "event_code": batch_id,
"batch_id": event.get("batch_id", ""), "ts": ts,
"batch_id": batch_id,
"camera_id": event.get("camera_id", ""), "camera_id": event.get("camera_id", ""),
"camera_ip": camera_ip,
"zone_id": event.get("zone_id", ""), "zone_id": event.get("zone_id", ""),
"zone_label": event.get("zone_label", ""), "zone_label": event.get("zone_label", ""),
"severity": event.get("severity", ""), "severity": event.get("severity", ""),
"state": event.get("state", ""), "state": event.get("state", ""),
"started_at": event.get("started_at", ""),
"ended_at": ended_at,
"removed_at": ended_at,
"dwell_seconds": event.get("dwell_seconds", ""),
"is_discarded": event_name == "batch_discarded",
"discarded_at": ts if event_name == "batch_discarded" else "",
"created_at": alerted_at or ts,
"alerted_at": alerted_at,
"alarm_at": alerted_at,
"updated_at": ts,
} }
return attach_snapshot_upload(payload, batch_id=str(event.get("batch_id", "")), snapshot_upload=snapshot_upload) return attach_snapshot_upload(payload, batch_id=batch_id, snapshot_upload=snapshot_upload)
def build_case_event_payload( def build_case_event_payload(
snapshot: dict[str, object], snapshot: dict[str, object],
*, *,
camera_ip: str = "",
snapshot_upload: dict[str, object] | None = None, snapshot_upload: dict[str, object] | None = None,
) -> dict[str, object]: ) -> dict[str, object]:
batch_id = str(snapshot.get("batch_id", ""))
created_at = str(snapshot.get("created_at", ""))
updated_at = str(snapshot.get("updated_at", ""))
handled_at = str(snapshot.get("handled_at", ""))
handled_source = str(snapshot.get("handled_source", ""))
event = snapshot_event(snapshot)
alerted_at = str(event.get("alerted_at", ""))
ended_at = str(event.get("ended_at", ""))
discarded = handled_source == "auto_closed"
payload = { payload = {
"kind": "case_event", "kind": "case_event",
"action": infer_case_action(snapshot), "action": infer_case_action(snapshot),
"case_id": snapshot.get("case_id", ""), "case_id": snapshot.get("case_id", ""),
"event_code": batch_id or snapshot.get("case_id", ""),
"case_type": snapshot.get("case_type", ""), "case_type": snapshot.get("case_type", ""),
"case_status": snapshot.get("case_status", ""), "case_status": snapshot.get("case_status", ""),
"batch_id": snapshot.get("batch_id", ""), "batch_id": batch_id,
"camera_id": snapshot.get("camera_id", ""),
"camera_ip": camera_ip,
"zone_id": snapshot.get("zone_id", ""),
"zone_label": snapshot.get("zone_label", ""),
"source_event": snapshot.get("source_event", ""), "source_event": snapshot.get("source_event", ""),
"handled_source": snapshot.get("handled_source", ""), "handled_source": handled_source,
"updated_at": snapshot.get("updated_at", ""), "started_at": event.get("started_at", ""),
"ended_at": ended_at,
"removed_at": ended_at,
"dwell_seconds": event.get("dwell_seconds", ""),
"is_discarded": discarded,
"discarded_at": handled_at if discarded else "",
"created_at": alerted_at or created_at,
"alerted_at": alerted_at,
"alarm_at": alerted_at,
"updated_at": updated_at,
} }
return attach_snapshot_upload(payload, batch_id=str(snapshot.get("batch_id", "")), snapshot_upload=snapshot_upload) return attach_snapshot_upload(payload, batch_id=batch_id, snapshot_upload=snapshot_upload)
def infer_case_action(snapshot: dict[str, object]) -> str: def infer_case_action(snapshot: dict[str, object]) -> str:
@@ -192,12 +237,15 @@ def send_batch_event_webhooks(
settings = load_webhook_settings(config) settings = load_webhook_settings(config)
if not settings.enabled or not settings.event_url: if not settings.enabled or not settings.event_url:
return [] return []
camera_ip = infer_camera_ip(config)
attempted_at = now or datetime.now(timezone.utc) attempted_at = now or datetime.now(timezone.utc)
deliveries: list[dict[str, object]] = [] deliveries: list[dict[str, object]] = []
retry_updates: list[dict[str, object]] = [] retry_updates: list[dict[str, object]] = []
store = load_retry_store(retry_path) if retry_path is not None else None store = load_retry_store(retry_path) if retry_path is not None else None
for event in events: for event in events:
payload = build_batch_event_payload(event, snapshot_upload=snapshot_upload) payload = build_batch_event_payload(event, camera_ip=camera_ip, snapshot_upload=snapshot_upload)
if settings.source_id:
payload["source_id"] = settings.source_id
record = deliver_webhook( record = deliver_webhook(
settings.event_url, settings.event_url,
payload, payload,
@@ -239,12 +287,15 @@ def send_case_webhooks(
settings = load_webhook_settings(config) settings = load_webhook_settings(config)
if not settings.enabled or not settings.case_url: if not settings.enabled or not settings.case_url:
return [] return []
camera_ip = infer_camera_ip(config)
attempted_at = now or datetime.now(timezone.utc) attempted_at = now or datetime.now(timezone.utc)
deliveries: list[dict[str, object]] = [] deliveries: list[dict[str, object]] = []
retry_updates: list[dict[str, object]] = [] retry_updates: list[dict[str, object]] = []
store = load_retry_store(retry_path) if retry_path is not None else None store = load_retry_store(retry_path) if retry_path is not None else None
for snapshot in snapshots: for snapshot in snapshots:
payload = build_case_event_payload(snapshot, snapshot_upload=snapshot_upload) payload = build_case_event_payload(snapshot, camera_ip=camera_ip, snapshot_upload=snapshot_upload)
if settings.source_id:
payload["source_id"] = settings.source_id
record = deliver_webhook( record = deliver_webhook(
settings.case_url, settings.case_url,
payload, payload,
@@ -467,6 +518,29 @@ def optional_int(value: object) -> int | None:
return None return None
def infer_camera_ip(config: dict[str, Any]) -> str:
stream = config.get("stream", {})
if not isinstance(stream, dict):
return ""
rtsp_url = str(stream.get("rtsp_url", "")).strip()
if not rtsp_url:
return ""
try:
return urlsplit(rtsp_url).hostname or ""
except ValueError:
return ""
def snapshot_event(snapshot: dict[str, object]) -> dict[str, object]:
payload = snapshot.get("payload", {})
if not isinstance(payload, dict):
return {}
event = payload.get("event", {})
if not isinstance(event, dict):
return {}
return event
def attach_snapshot_upload( def attach_snapshot_upload(
payload: dict[str, object], payload: dict[str, object],
*, *,

7
tasks/lessons.md Normal file
View File

@@ -0,0 +1,7 @@
# Lessons
- 2026-06-10: 远端接收端路由不能只凭已有相似服务或历史路径推断,必须先对用户指定的精确路径做真实 HTTP 探测,再决定配置值。
Prevention:
1. 对每个用户指定的 Webhook 路径,先在目标主机上用与真实请求接近的 `POST` 探测并记录状态码。
2. 如果存在多个相似路径,只能在验证过用户指定路径不可用后,才考虑回退到其它路径。
3. 切换远端配置前,先确认发送端容器对目标主机名或 IP 实际可达,避免写入不可解析的地址。

View File

@@ -28,3 +28,39 @@
- Final verification passed: - Final verification passed:
- `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest discover -s tests -v` - `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest discover -s tests -v`
- `cd web && pnpm install --frozen-lockfile && pnpm build` - `cd web && pnpm install --frozen-lockfile && pnpm build`
## Current Task: Webhook Payload Field Gap Check
- [x] Pull the actual payload currently received by `video-recognition` and compare it against the required event list fields.
- [x] Patch webhook payload builders to include the missing non-store fields required by the downstream table.
- [x] Add or update focused webhook tests for the enriched payload shape.
- [x] Run targeted verification and record the result here.
### Current Findings
- Current received payload only includes `batch_id`, `camera_id`, `event`, `kind`, `severity`, `source_id`, `state`, `ts`, `zone_id`, and `zone_label`.
- Missing or not explicitly populated for the downstream event table: event code, camera IP, batch start time, removal time, dwell duration, discard flag, discard time, create time, alarm time, and update time.
### Field Gap Verification
- Actual receiver payload before the fix, from `video-recognition` result JSONL on `10.8.0.11`, confirmed only the base fields above and did not include the downstream table time/discard/IP fields.
- Updated `src/cold_display_guard/webhooks.py` so both `batch_event` and `case_event` now include:
- `event_code`
- `camera_ip`
- `started_at`
- `ended_at`
- `removed_at`
- `dwell_seconds`
- `is_discarded`
- `discarded_at`
- `created_at`
- `alerted_at`
- `alarm_at`
- `updated_at`
- `case_event` also now carries the missing contextual fields `camera_id`, `zone_id`, and `zone_label`.
- Verification passed:
- `PYTHONPATH=src python3 -m unittest tests/test_webhooks.py -v`
- `PYTHONPATH=src python3 -m unittest tests/test_main.py -v`
- `PYTHONPATH=src python3 -m unittest discover -s tests -v`
- Deployed updated code to `xiaozheng@10.8.0.11` without overwriting the remote `config/example.toml`, rebuilt `cold-display-guard:dev`, and restarted only `cold-display-guard-api` plus `cold-display-guard-runtime`.
- Natural post-deploy traffic did not arrive during the 2-minute observation window, so final runtime verification used the deployed container to build representative batch/case webhook payloads with the live remote config and confirmed `camera_ip = 192.168.3.4` plus all new downstream fields were present.

View File

@@ -138,6 +138,7 @@ zone_ids = ["1", "2", "3"]
"enabled": True, "enabled": True,
"event_url": "https://example.com/events", "event_url": "https://example.com/events",
"case_url": "https://example.com/cases", "case_url": "https://example.com/cases",
"source_id": "cold-display-guard",
"callback_token": "secret", "callback_token": "secret",
"connect_timeout_seconds": 3, "connect_timeout_seconds": 3,
"read_timeout_seconds": 5, "read_timeout_seconds": 5,
@@ -159,6 +160,7 @@ zone_ids = ["1", "2", "3"]
self.assertIn("[webhooks]", text) self.assertIn("[webhooks]", text)
self.assertIn('event_url = "https://example.com/events"', text) self.assertIn('event_url = "https://example.com/events"', text)
self.assertIn('case_url = "https://example.com/cases"', text) self.assertIn('case_url = "https://example.com/cases"', text)
self.assertIn('source_id = "cold-display-guard"', text)
self.assertIn('callback_token = "secret"', text) self.assertIn('callback_token = "secret"', text)
self.assertIn("retry_max_attempts = 4", text) self.assertIn("retry_max_attempts = 4", text)
self.assertIn("retry_backoff_seconds = 30", text) self.assertIn("retry_backoff_seconds = 30", text)

View File

@@ -28,6 +28,7 @@ class WebhookTests(unittest.TestCase):
"enabled": True, "enabled": True,
"event_url": "https://example.com/events", "event_url": "https://example.com/events",
"case_url": "https://example.com/cases", "case_url": "https://example.com/cases",
"source_id": "cold-display-guard",
"callback_token": "secret", "callback_token": "secret",
"connect_timeout_seconds": 4, "connect_timeout_seconds": 4,
"read_timeout_seconds": 6, "read_timeout_seconds": 6,
@@ -42,6 +43,7 @@ class WebhookTests(unittest.TestCase):
self.assertTrue(settings.enabled) self.assertTrue(settings.enabled)
self.assertEqual(settings.event_url, "https://example.com/events") self.assertEqual(settings.event_url, "https://example.com/events")
self.assertEqual(settings.case_url, "https://example.com/cases") self.assertEqual(settings.case_url, "https://example.com/cases")
self.assertEqual(settings.source_id, "cold-display-guard")
self.assertEqual(settings.callback_token, "secret") self.assertEqual(settings.callback_token, "secret")
self.assertEqual(settings.connect_timeout_seconds, 4) self.assertEqual(settings.connect_timeout_seconds, 4)
self.assertEqual(settings.read_timeout_seconds, 6) self.assertEqual(settings.read_timeout_seconds, 6)
@@ -61,12 +63,22 @@ class WebhookTests(unittest.TestCase):
"zone_label": "区域 1", "zone_label": "区域 1",
"severity": "alarm", "severity": "alarm",
"state": "alerted", "state": "alerted",
} "started_at": datetime(2026, 6, 9, 8, 40, tzinfo=UTC).isoformat(),
"dwell_seconds": 1200,
"alerted_at": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
},
camera_ip="192.168.3.4",
) )
self.assertEqual(payload["kind"], "batch_event") self.assertEqual(payload["kind"], "batch_event")
self.assertEqual(payload["event"], "time_alarm") self.assertEqual(payload["event"], "time_alarm")
self.assertEqual(payload["event_code"], "batch_000001")
self.assertEqual(payload["camera_ip"], "192.168.3.4")
self.assertEqual(payload["zone_label"], "区域 1") self.assertEqual(payload["zone_label"], "区域 1")
self.assertEqual(payload["started_at"], datetime(2026, 6, 9, 8, 40, tzinfo=UTC).isoformat())
self.assertEqual(payload["dwell_seconds"], 1200)
self.assertFalse(payload["is_discarded"])
self.assertEqual(payload["alerted_at"], datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat())
def test_build_batch_event_payload_includes_uploaded_snapshot_path(self) -> None: def test_build_batch_event_payload_includes_uploaded_snapshot_path(self) -> None:
payload = build_batch_event_payload( payload = build_batch_event_payload(
@@ -91,17 +103,41 @@ class WebhookTests(unittest.TestCase):
{ {
"case_id": "case_batch_000001", "case_id": "case_batch_000001",
"case_type": "warning_escalated", "case_type": "warning_escalated",
"case_status": "open", "case_status": "handled",
"batch_id": "batch_000001", "batch_id": "batch_000001",
"camera_id": "cam_01",
"zone_id": "1",
"zone_label": "区域 1",
"source_event": "warning_escalated", "source_event": "warning_escalated",
"handled_source": "", "created_at": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
"handled_at": datetime(2026, 6, 9, 9, 6, tzinfo=UTC).isoformat(),
"handled_source": "auto_closed",
"updated_at": datetime(2026, 6, 9, 9, 5, tzinfo=UTC).isoformat(), "updated_at": datetime(2026, 6, 9, 9, 5, tzinfo=UTC).isoformat(),
"payload": {
"event": {
"started_at": datetime(2026, 6, 9, 8, 40, tzinfo=UTC).isoformat(),
"ended_at": datetime(2026, 6, 9, 9, 4, tzinfo=UTC).isoformat(),
"dwell_seconds": 1440,
"alerted_at": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
} }
},
},
camera_ip="192.168.3.4",
) )
self.assertEqual(payload["kind"], "case_event") self.assertEqual(payload["kind"], "case_event")
self.assertEqual(payload["action"], "updated") self.assertEqual(payload["action"], "handled")
self.assertEqual(payload["case_id"], "case_batch_000001") self.assertEqual(payload["case_id"], "case_batch_000001")
self.assertEqual(payload["event_code"], "batch_000001")
self.assertEqual(payload["camera_id"], "cam_01")
self.assertEqual(payload["camera_ip"], "192.168.3.4")
self.assertEqual(payload["zone_label"], "区域 1")
self.assertEqual(payload["started_at"], datetime(2026, 6, 9, 8, 40, tzinfo=UTC).isoformat())
self.assertEqual(payload["ended_at"], datetime(2026, 6, 9, 9, 4, tzinfo=UTC).isoformat())
self.assertEqual(payload["dwell_seconds"], 1440)
self.assertTrue(payload["is_discarded"])
self.assertEqual(payload["discarded_at"], datetime(2026, 6, 9, 9, 6, tzinfo=UTC).isoformat())
self.assertEqual(payload["alerted_at"], datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat())
def test_build_case_event_payload_includes_uploaded_snapshot_path(self) -> None: def test_build_case_event_payload_includes_uploaded_snapshot_path(self) -> None:
payload = build_case_event_payload( payload = build_case_event_payload(
@@ -146,9 +182,11 @@ class WebhookTests(unittest.TestCase):
"webhooks": { "webhooks": {
"enabled": True, "enabled": True,
"event_url": "https://example.com/events", "event_url": "https://example.com/events",
"source_id": "cold-display-guard",
"connect_timeout_seconds": 4, "connect_timeout_seconds": 4,
"read_timeout_seconds": 6, "read_timeout_seconds": 6,
} },
"stream": {"rtsp_url": "rtsp://admin:secret@192.168.3.4:554/h264/ch1/main/av_stream"},
}, },
audit_path, audit_path,
http_post=fake_post, http_post=fake_post,
@@ -156,6 +194,8 @@ class WebhookTests(unittest.TestCase):
self.assertEqual(deliveries[0][0], "https://example.com/events") self.assertEqual(deliveries[0][0], "https://example.com/events")
self.assertEqual(deliveries[0][1]["kind"], "batch_event") self.assertEqual(deliveries[0][1]["kind"], "batch_event")
self.assertEqual(deliveries[0][1]["camera_ip"], "192.168.3.4")
self.assertEqual(deliveries[0][1]["source_id"], "cold-display-guard")
self.assertEqual(deliveries[0][2], (4.0, 6.0)) self.assertEqual(deliveries[0][2], (4.0, 6.0))
def test_send_case_webhooks_delivers_payload(self) -> None: def test_send_case_webhooks_delivers_payload(self) -> None:
@@ -183,7 +223,9 @@ class WebhookTests(unittest.TestCase):
"webhooks": { "webhooks": {
"enabled": True, "enabled": True,
"case_url": "https://example.com/cases", "case_url": "https://example.com/cases",
} "source_id": "cold-display-guard",
},
"stream": {"rtsp_url": "rtsp://admin:secret@192.168.3.4:554/h264/ch1/main/av_stream"},
}, },
audit_path, audit_path,
http_post=fake_post, http_post=fake_post,
@@ -192,6 +234,8 @@ class WebhookTests(unittest.TestCase):
self.assertEqual(deliveries[0][0], "https://example.com/cases") self.assertEqual(deliveries[0][0], "https://example.com/cases")
self.assertEqual(deliveries[0][1]["kind"], "case_event") self.assertEqual(deliveries[0][1]["kind"], "case_event")
self.assertEqual(deliveries[0][1]["action"], "handled") self.assertEqual(deliveries[0][1]["action"], "handled")
self.assertEqual(deliveries[0][1]["camera_ip"], "192.168.3.4")
self.assertEqual(deliveries[0][1]["source_id"], "cold-display-guard")
def test_failed_delivery_is_logged_without_raising(self) -> None: def test_failed_delivery_is_logged_without_raising(self) -> None:
def fake_post(url: str, payload: dict[str, object], timeout: tuple[float, float]) -> tuple[int, str]: def fake_post(url: str, payload: dict[str, object], timeout: tuple[float, float]) -> tuple[int, str]: