From 04729a0fd17e9890aebc539d148108f566ae659a Mon Sep 17 00:00:00 2001 From: "skye.yue" Date: Tue, 9 Jun 2026 13:01:15 +0800 Subject: [PATCH] feat: upload alarm snapshots to webhook payloads --- README_zh.md | 23 ++ config/example.toml | 9 + .../plans/2026-06-09-alarm-snapshot-upload.md | 111 ++++++ src/cold_display_guard/alarm_snapshots.py | 325 ++++++++++++++++++ src/cold_display_guard/config.py | 23 ++ src/cold_display_guard/main.py | 43 ++- src/cold_display_guard/manage_api.py | 3 + src/cold_display_guard/webhooks.py | 53 ++- tasks/todo.md | 29 +- tests/test_alarm_snapshots.py | 132 +++++++ tests/test_config.py | 10 + tests/test_main.py | 72 ++++ tests/test_manage_api.py | 8 + tests/test_webhooks.py | 35 ++ 14 files changed, 853 insertions(+), 23 deletions(-) create mode 100644 docs/superpowers/plans/2026-06-09-alarm-snapshot-upload.md create mode 100644 src/cold_display_guard/alarm_snapshots.py create mode 100644 tests/test_alarm_snapshots.py diff --git a/README_zh.md b/README_zh.md index 5e5e98d..4d13053 100644 --- a/README_zh.md +++ b/README_zh.md @@ -184,6 +184,15 @@ diagnostics_path = "logs/runtime_diagnostics.jsonl" [case_sink] path = "logs/cases.jsonl" +[alarm_snapshot_upload] +enabled = true +service_url = "https://ota.zhengxinshipin.com" +secret = "change-me-in-production" +object_key_prefix = "cold-display-guard/alarms" +connect_timeout_seconds = 5 +read_timeout_seconds = 20 +encode_timeout_seconds = 10 + [webhook_retry_sink] path = "logs/webhook_retry.jsonl" @@ -206,6 +215,20 @@ retry_max_backoff_seconds = 1800 - `logs/webhook_retry.jsonl`:Webhook 重试队列状态快照 - `logs/webhook_delivery.jsonl`:Webhook 投递结果审计 +当某一轮识别结果里出现 `severity=alarm` 或 `severity=warning` 的事件时,运行时会直接复用当前检测帧: + +1. 用 `ffmpeg` 把当前 RGB 帧编码成 JPEG +2. 通过 `https://ota.zhengxinshipin.com` 的 chunk-upload API 上传 +3. 把上传返回的 `object_key` 追加到对应 webhook payload + +相关 webhook 字段: + +- `snapshot_upload_status`:`uploaded` 或 `error` +- `snapshot_object_key`:上传成功后的 OSS 路径 +- `snapshot_file_name`:上传文件名 +- `snapshot_captured_at`:抓帧时间 +- `snapshot_upload_error`:上传失败原因,仅失败时返回 + ## 本地测试 ```bash diff --git a/config/example.toml b/config/example.toml index 9844367..8581d56 100644 --- a/config/example.toml +++ b/config/example.toml @@ -58,6 +58,15 @@ path = "logs/events.jsonl" [case_sink] path = "logs/cases.jsonl" +[alarm_snapshot_upload] +enabled = true +service_url = "https://ota.zhengxinshipin.com" +secret = "change-me-in-production" +object_key_prefix = "cold-display-guard/alarms" +connect_timeout_seconds = 5 +read_timeout_seconds = 20 +encode_timeout_seconds = 10 + [webhook_retry_sink] path = "logs/webhook_retry.jsonl" diff --git a/docs/superpowers/plans/2026-06-09-alarm-snapshot-upload.md b/docs/superpowers/plans/2026-06-09-alarm-snapshot-upload.md new file mode 100644 index 0000000..42ef591 --- /dev/null +++ b/docs/superpowers/plans/2026-06-09-alarm-snapshot-upload.md @@ -0,0 +1,111 @@ +# Alarm Snapshot Upload Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Capture one current-frame snapshot for alerting runtime events, upload it to the OTA chunk-upload service, and include the returned path in outbound webhook payloads. + +**Architecture:** Keep `BatchEngine` unchanged and treat snapshot upload as runtime-side enrichment. Reuse the already captured RGB frame from the active detection loop, encode it to JPEG with `ffmpeg`, upload it through the documented token/init/chunk/complete flow, then merge the returned `object_key` into the webhook payload for alert-level batch events and the derived case events from the same cycle. + +**Tech Stack:** Python 3.12 standard library backend, existing `ffmpeg` dependency, JSONL webhook retry flow, unittest. + +--- + +### Task 1: Snapshot Upload Client + +**Files:** +- Create: `src/cold_display_guard/alarm_snapshots.py` +- Test: `tests/test_alarm_snapshots.py` + +- [ ] **Step 1: Write the failing test** + Add tests that cover: + - loading upload settings from config + - encoding a current RGB frame into JPEG via injected encoder helper + - successful OTA upload flow returning `object_key` + - disabled or non-alert events skipping upload + +- [ ] **Step 2: Run test to verify it fails** + Run: `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_alarm_snapshots.py -v` + Expected: FAIL because the snapshot upload module does not exist yet. + +- [ ] **Step 3: Write minimal implementation** + Implement: + - upload settings parsing with defaults for `https://ota.zhengxinshipin.com` and secret `change-me-in-production` + - current-frame JPEG encoding + - token/init/chunk/complete upload workflow with injectable HTTP helpers for tests + - per-cycle alert snapshot metadata structure carrying `object_key`, file name, and upload status + +- [ ] **Step 4: Run test to verify it passes** + Run: `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_alarm_snapshots.py -v` + Expected: PASS + +### Task 2: Runtime And Webhook Integration + +**Files:** +- Modify: `src/cold_display_guard/main.py` +- Modify: `src/cold_display_guard/webhooks.py` +- Test: `tests/test_main.py` +- Test: `tests/test_webhooks.py` + +- [ ] **Step 1: Write the failing test** + Add tests that cover: + - runtime uploads one snapshot when a cycle contains alert-severity events + - webhook payload includes uploaded `object_key` for alert batch events + - derived case webhook payload includes the same snapshot path for matching case-creation events + - upload failure does not block webhook delivery and instead records failure metadata in payload + +- [ ] **Step 2: Run test to verify it fails** + Run: + - `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_main.py -v` + - `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_webhooks.py -v` + Expected: FAIL because runtime/webhook code does not accept snapshot metadata yet. + +- [ ] **Step 3: Write minimal implementation** + Implement: + - alert event selection based on event severity + - one-per-cycle snapshot upload using the current frame + - payload enrichment for batch-event and matching case-event webhooks + - retry queue persistence of the already enriched payload + +- [ ] **Step 4: Run test to verify it passes** + Run: + - `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_main.py -v` + - `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_webhooks.py -v` + Expected: PASS + +### Task 3: Config, Secrets, Docs, And Final Verification + +**Files:** +- Modify: `src/cold_display_guard/config.py` +- Modify: `src/cold_display_guard/manage_api.py` +- Modify: `config/example.toml` +- Modify: `README_zh.md` +- Test: `tests/test_config.py` +- Test: `tests/test_manage_api.py` + +- [ ] **Step 1: Write the failing test** + Extend tests so: + - config formatting writes snapshot-upload settings + - management config payload strips sensitive upload secret + +- [ ] **Step 2: Run test to verify it fails** + Run: + - `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_config.py -v` + - `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_manage_api.py -v` + Expected: FAIL because snapshot upload settings are not exposed/formatted yet. + +- [ ] **Step 3: Write minimal implementation** + Implement: + - config keys for snapshot upload URL, secret, object prefix, enable flag, and timeout/chunk settings + - config payload secret stripping + - README updates for alert snapshot upload behavior and returned webhook fields + +- [ ] **Step 4: Run targeted and full verification** + Run: + - `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_alarm_snapshots.py -v` + - `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_main.py -v` + - `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_webhooks.py -v` + - `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_config.py -v` + - `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_manage_api.py -v` + - `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest discover -s tests -v` + - `cd web && pnpm build` + Expected: PASS diff --git a/src/cold_display_guard/alarm_snapshots.py b/src/cold_display_guard/alarm_snapshots.py new file mode 100644 index 0000000..604be68 --- /dev/null +++ b/src/cold_display_guard/alarm_snapshots.py @@ -0,0 +1,325 @@ +from __future__ import annotations + +import hashlib +import json +import subprocess +import uuid +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path +from typing import Any, Callable +from urllib import error, parse, request + +from cold_display_guard.vision import Frame + + +JsonRequest = Callable[[str, dict[str, object], tuple[float, float]], tuple[int, dict[str, object]]] +MultipartRequest = Callable[[str, dict[str, str], str, str, bytes, tuple[float, float]], tuple[int, dict[str, object]]] +JpegEncoder = Callable[[Frame, float], bytes] +Uploader = Callable[..., dict[str, object]] + + +@dataclass(frozen=True, slots=True) +class AlarmSnapshotSettings: + enabled: bool = True + service_url: str = "https://ota.zhengxinshipin.com" + secret: str = "change-me-in-production" + object_key_prefix: str = "cold-display-guard/alarms" + connect_timeout_seconds: float = 5.0 + read_timeout_seconds: float = 20.0 + encode_timeout_seconds: float = 10.0 + + +class SnapshotUploadError(RuntimeError): + pass + + +def load_alarm_snapshot_settings(config: dict[str, Any]) -> AlarmSnapshotSettings: + payload = config.get("alarm_snapshot_upload", {}) + if not isinstance(payload, dict): + payload = {} + return AlarmSnapshotSettings( + enabled=bool(payload.get("enabled", True)), + service_url=str(payload.get("service_url", "https://ota.zhengxinshipin.com")).rstrip("/"), + secret=str(payload.get("secret", "change-me-in-production")), + object_key_prefix=str(payload.get("object_key_prefix", "cold-display-guard/alarms")).strip("/"), + connect_timeout_seconds=max(1.0, float(payload.get("connect_timeout_seconds", 5.0))), + read_timeout_seconds=max(1.0, float(payload.get("read_timeout_seconds", 20.0))), + encode_timeout_seconds=max(1.0, float(payload.get("encode_timeout_seconds", 10.0))), + ) + + +def capture_alert_snapshot( + frame: Frame, + events: list[dict[str, object]], + config: dict[str, Any], + *, + now: datetime | None = None, + jpeg_encoder: JpegEncoder | None = None, + uploader: Uploader | None = None, +) -> dict[str, object] | None: + alert_events = [event for event in events if event_needs_snapshot(event)] + if not alert_events: + return None + settings = load_alarm_snapshot_settings(config) + if not settings.enabled: + return None + + captured_at = now or parse_event_datetime(alert_events[0].get("ts")) or datetime.now(timezone.utc) + file_name = build_snapshot_file_name(alert_events[0], captured_at) + object_key_hint = build_object_key_hint(settings.object_key_prefix, alert_events[0], captured_at, file_name) + try: + image_bytes = (jpeg_encoder or encode_frame_to_jpeg)(frame, settings.encode_timeout_seconds) + result = (uploader or upload_snapshot_bytes)( + image_bytes, + file_name=file_name, + object_key_hint=object_key_hint, + settings=settings, + ) + metadata = dict(result) + metadata["captured_at"] = captured_at.isoformat() + metadata["batch_ids"] = sorted({str(event.get("batch_id", "")).strip() for event in alert_events if str(event.get("batch_id", "")).strip()}) + metadata["event_names"] = sorted({str(event.get("event", "")).strip() for event in alert_events if str(event.get("event", "")).strip()}) + return metadata + except (SnapshotUploadError, OSError) as exc: + return { + "status": "error", + "error": str(exc), + "captured_at": captured_at.isoformat(), + "batch_ids": sorted({str(event.get("batch_id", "")).strip() for event in alert_events if str(event.get("batch_id", "")).strip()}), + "event_names": sorted({str(event.get("event", "")).strip() for event in alert_events if str(event.get("event", "")).strip()}), + } + + +def upload_snapshot_bytes( + image_bytes: bytes, + *, + file_name: str, + object_key_hint: str, + settings: AlarmSnapshotSettings, + post_json_request: JsonRequest | None = None, + post_multipart_request: MultipartRequest | None = None, +) -> dict[str, object]: + timeout = (settings.connect_timeout_seconds, settings.read_timeout_seconds) + post_json_impl = post_json_request or post_json + post_multipart_impl = post_multipart_request or post_multipart + file_md5 = hashlib.md5(image_bytes).hexdigest() + + status, token_payload = post_json_impl( + f"{settings.service_url}/token/generate", + {"secret": settings.secret}, + timeout, + ) + ensure_ok(status, token_payload, "generate upload token") + token = str(token_payload.get("token", "")).strip() + if not token: + raise SnapshotUploadError("generate upload token failed: token missing") + + status, init_payload = post_json_impl( + f"{settings.service_url}/upload/init", + { + "file_name": file_name, + "file_size": len(image_bytes), + "file_md5": file_md5, + "total_chunks": 1, + "chunk_size": len(image_bytes), + "object_key_hint": object_key_hint, + "token": token, + }, + timeout, + ) + ensure_ok(status, init_payload, "init upload session") + upload_id = str(init_payload.get("upload_id", "")).strip() + if not upload_id: + raise SnapshotUploadError("init upload session failed: upload_id missing") + + query = parse.urlencode({"token": token, "upload_id": upload_id, "index": 0}) + status, chunk_payload = post_multipart_impl( + f"{settings.service_url}/upload/chunk?{query}", + {"chunk_md5": file_md5}, + "chunk", + file_name, + image_bytes, + timeout, + ) + ensure_ok(status, chunk_payload, "upload snapshot chunk") + + status, complete_payload = post_json_impl( + f"{settings.service_url}/upload/complete", + {"upload_id": upload_id, "file_md5": file_md5, "token": token}, + timeout, + ) + ensure_ok(status, complete_payload, "complete snapshot upload") + object_key = str(complete_payload.get("object_key", "")).strip() + if not object_key: + raise SnapshotUploadError("complete snapshot upload failed: object_key missing") + return { + "status": "uploaded", + "object_key": object_key, + "file_name": file_name, + "file_md5": str(complete_payload.get("file_md5", file_md5)), + "file_size": int(complete_payload.get("file_size", len(image_bytes))), + "upload_id": upload_id, + } + + +def event_needs_snapshot(event: dict[str, object]) -> bool: + severity = str(event.get("severity", "")).strip().lower() + return severity in {"alarm", "warning"} + + +def encode_frame_to_jpeg(frame: Frame, timeout_seconds: float) -> bytes: + command = [ + "ffmpeg", + "-hide_banner", + "-loglevel", + "error", + "-f", + "rawvideo", + "-pix_fmt", + "rgb24", + "-s", + f"{frame.width}x{frame.height}", + "-i", + "-", + "-frames:v", + "1", + "-f", + "image2pipe", + "-vcodec", + "mjpeg", + "-", + ] + try: + result = subprocess.run( + command, + input=frame.rgb, + check=False, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + timeout=max(1.0, timeout_seconds), + ) + except FileNotFoundError as exc: + raise OSError("ffmpeg not found; install ffmpeg first") from exc + except subprocess.TimeoutExpired as exc: + raise OSError(f"ffmpeg jpeg encode timed out after {timeout_seconds:g}s") from exc + if result.returncode != 0: + message = result.stderr.decode("utf-8", errors="replace").strip() + raise OSError(message or f"ffmpeg exited with code {result.returncode}") + if not result.stdout: + raise OSError("ffmpeg returned no jpeg data") + return result.stdout + + +def build_snapshot_file_name(event: dict[str, object], when: datetime) -> str: + camera_id = sanitize_path_component(str(event.get("camera_id", "")).strip() or "camera") + zone_id = sanitize_path_component(str(event.get("zone_id", "")).strip() or "zone") + event_name = sanitize_path_component(str(event.get("event", "")).strip() or "event") + stamp = when.astimezone(timezone.utc).strftime("%Y%m%dT%H%M%SZ") + return f"{camera_id}_{zone_id}_{event_name}_{stamp}.jpg" + + +def build_object_key_hint(prefix: str, event: dict[str, object], when: datetime, file_name: str) -> str: + date_prefix = when.astimezone(timezone.utc).strftime("%Y/%m/%d") + camera_id = sanitize_path_component(str(event.get("camera_id", "")).strip() or "camera") + event_name = sanitize_path_component(str(event.get("event", "")).strip() or "event") + pieces = [piece for piece in (prefix.strip("/"), camera_id, event_name, date_prefix, file_name) if piece] + return "/".join(pieces) + + +def parse_event_datetime(value: object) -> datetime | None: + text = str(value or "").strip() + if not text: + return None + try: + parsed = datetime.fromisoformat(text) + except ValueError: + return None + return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc) + + +def sanitize_path_component(value: str) -> str: + safe = "".join(ch if ch.isalnum() or ch in {"-", "_"} else "-" for ch in value.strip()) + while "--" in safe: + safe = safe.replace("--", "-") + return safe.strip("-") or "item" + + +def ensure_ok(status: int, payload: dict[str, object], action: str) -> None: + if 200 <= status < 300 and "error" not in payload: + return + message = str(payload.get("error", "")).strip() or f"http {status}" + raise SnapshotUploadError(f"{action} failed: {message}") + + +def post_json(url: str, payload: dict[str, object], timeout: tuple[float, float]) -> tuple[int, dict[str, object]]: + data = json.dumps(payload, ensure_ascii=False, sort_keys=True).encode("utf-8") + req = request.Request(url, data=data, headers={"Content-Type": "application/json"}, method="POST") + return perform_request(req, timeout) + + +def post_multipart( + url: str, + fields: dict[str, str], + file_field: str, + file_name: str, + file_bytes: bytes, + timeout: tuple[float, float], +) -> tuple[int, dict[str, object]]: + boundary = f"----cold-display-{uuid.uuid4().hex}" + body = build_multipart_body(boundary, fields, file_field, file_name, file_bytes) + req = request.Request( + url, + data=body, + headers={"Content-Type": f"multipart/form-data; boundary={boundary}"}, + method="POST", + ) + return perform_request(req, timeout) + + +def build_multipart_body( + boundary: str, + fields: dict[str, str], + file_field: str, + file_name: str, + file_bytes: bytes, +) -> bytes: + parts: list[bytes] = [] + for key, value in fields.items(): + parts.extend( + [ + f"--{boundary}\r\n".encode("utf-8"), + f'Content-Disposition: form-data; name="{key}"\r\n\r\n'.encode("utf-8"), + str(value).encode("utf-8"), + b"\r\n", + ] + ) + parts.extend( + [ + f"--{boundary}\r\n".encode("utf-8"), + f'Content-Disposition: form-data; name="{file_field}"; filename="{Path(file_name).name}"\r\n'.encode("utf-8"), + b"Content-Type: image/jpeg\r\n\r\n", + file_bytes, + b"\r\n", + f"--{boundary}--\r\n".encode("utf-8"), + ] + ) + return b"".join(parts) + + +def perform_request(req: request.Request, timeout: tuple[float, float]) -> tuple[int, dict[str, object]]: + try: + with request.urlopen(req, timeout=sum(timeout)) as response: + return response.getcode(), decode_json_response(response.read()) + except error.HTTPError as exc: + return exc.code, decode_json_response(exc.read()) + + +def decode_json_response(data: bytes) -> dict[str, object]: + if not data: + return {} + try: + payload = json.loads(data.decode("utf-8")) + except (UnicodeDecodeError, json.JSONDecodeError): + return {"error": data.decode("utf-8", errors="replace")} + return payload if isinstance(payload, dict) else {"error": "invalid json response"} diff --git a/src/cold_display_guard/config.py b/src/cold_display_guard/config.py index be515c5..45b4c14 100644 --- a/src/cold_display_guard/config.py +++ b/src/cold_display_guard/config.py @@ -199,6 +199,29 @@ def format_config_document(data: dict[str, Any]) -> str: lines.append(f'path = "{_escape(str(case_sink.get("path", "logs/cases.jsonl")))}"') lines.append("") + alarm_snapshot_upload = data.get("alarm_snapshot_upload", {}) + if alarm_snapshot_upload: + lines.append("[alarm_snapshot_upload]") + for key in ( + "connect_timeout_seconds", + "encode_timeout_seconds", + "enabled", + "object_key_prefix", + "read_timeout_seconds", + "secret", + "service_url", + ): + if key not in alarm_snapshot_upload: + continue + value = alarm_snapshot_upload[key] + if isinstance(value, bool): + lines.append(f"{key} = {str(value).lower()}") + elif isinstance(value, int | float): + lines.append(f"{key} = {value}") + else: + lines.append(f'{key} = "{_escape(str(value))}"') + lines.append("") + webhook_retry_sink = data.get("webhook_retry_sink", {}) if webhook_retry_sink: lines.append("[webhook_retry_sink]") diff --git a/src/cold_display_guard/main.py b/src/cold_display_guard/main.py index c7d1cd7..57b10d7 100644 --- a/src/cold_display_guard/main.py +++ b/src/cold_display_guard/main.py @@ -7,6 +7,7 @@ from datetime import datetime from pathlib import Path from zoneinfo import ZoneInfo +from cold_display_guard.alarm_snapshots import capture_alert_snapshot from cold_display_guard.cases import CaseStore, append_case_snapshots, load_case_snapshots from cold_display_guard.config import load_config_document, load_settings, resolve_config_path, resolve_project_root from cold_display_guard.engine import BatchEngine @@ -104,6 +105,7 @@ def run(config_path: str | Path, once: bool = False, max_iterations: int = 0) -> events = engine.process(observation) append_jsonl(event_path, events) case_snapshots = persist_case_updates(case_store, case_path, events) + snapshot_upload = capture_runtime_alarm_snapshot(frame, events, config, now=when) deliver_runtime_webhooks( events, case_snapshots, @@ -111,6 +113,7 @@ def run(config_path: str | Path, once: bool = False, max_iterations: int = 0) -> webhook_delivery_path, retry_path=webhook_retry_path, now=when, + snapshot_upload=snapshot_upload, ) append_jsonl( diagnostics_path, @@ -173,6 +176,25 @@ def persist_case_updates(case_store: CaseStore, path: Path, events: list[dict[st return snapshots +def capture_runtime_alarm_snapshot( + frame, + events: list[dict[str, object]], + config: dict[str, object], + *, + now: datetime | None = None, + jpeg_encoder=None, + uploader=None, +) -> dict[str, object] | None: + return capture_alert_snapshot( + frame, + events, + config, + now=now, + jpeg_encoder=jpeg_encoder, + uploader=uploader, + ) + + def deliver_runtime_webhooks( events: list[dict[str, object]], case_snapshots: list[dict[str, object]], @@ -182,9 +204,26 @@ def deliver_runtime_webhooks( retry_path: Path | None = None, http_post=None, now: datetime | None = None, + snapshot_upload: dict[str, object] | None = None, ) -> None: - send_batch_event_webhooks(events, config, audit_path, retry_path=retry_path, http_post=http_post, now=now) - send_case_webhooks(case_snapshots, config, audit_path, retry_path=retry_path, http_post=http_post, now=now) + send_batch_event_webhooks( + events, + config, + audit_path, + retry_path=retry_path, + http_post=http_post, + now=now, + snapshot_upload=snapshot_upload, + ) + send_case_webhooks( + case_snapshots, + config, + audit_path, + retry_path=retry_path, + http_post=http_post, + now=now, + snapshot_upload=snapshot_upload, + ) if retry_path is not None: drain_webhook_retries(config, retry_path, audit_path, http_post=http_post, now=now) diff --git a/src/cold_display_guard/manage_api.py b/src/cold_display_guard/manage_api.py index 0914cda..54ba727 100644 --- a/src/cold_display_guard/manage_api.py +++ b/src/cold_display_guard/manage_api.py @@ -311,6 +311,8 @@ def config_payload(ctx: ManageContext) -> dict[str, Any]: event_path = event_sink_path(ctx, data) case_path = case_sink_path(ctx, data) retry_path = webhook_retry_sink_path(ctx, data) + alarm_snapshot_upload = dict(data.get("alarm_snapshot_upload", {}) or {}) + alarm_snapshot_upload.pop("secret", None) webhooks = dict(data.get("webhooks", {}) or {}) webhooks.pop("callback_token", None) return { @@ -329,6 +331,7 @@ def config_payload(ctx: ManageContext) -> dict[str, Any]: "event_sink": {"path": str(event_path)}, "case_sink": {"path": str(case_path)}, "webhook_retry_sink": {"path": str(retry_path)}, + "alarm_snapshot_upload": alarm_snapshot_upload, "webhooks": webhooks, } diff --git a/src/cold_display_guard/webhooks.py b/src/cold_display_guard/webhooks.py index 64475f9..fbca134 100644 --- a/src/cold_display_guard/webhooks.py +++ b/src/cold_display_guard/webhooks.py @@ -133,8 +133,12 @@ def load_webhook_settings(config: dict[str, Any]) -> WebhookSettings: ) -def build_batch_event_payload(event: dict[str, object]) -> dict[str, object]: - return { +def build_batch_event_payload( + event: dict[str, object], + *, + snapshot_upload: dict[str, object] | None = None, +) -> dict[str, object]: + payload = { "kind": "batch_event", "event": event.get("event", ""), "ts": event.get("ts", ""), @@ -145,10 +149,15 @@ def build_batch_event_payload(event: dict[str, object]) -> dict[str, object]: "severity": event.get("severity", ""), "state": event.get("state", ""), } + return attach_snapshot_upload(payload, batch_id=str(event.get("batch_id", "")), snapshot_upload=snapshot_upload) -def build_case_event_payload(snapshot: dict[str, object]) -> dict[str, object]: - return { +def build_case_event_payload( + snapshot: dict[str, object], + *, + snapshot_upload: dict[str, object] | None = None, +) -> dict[str, object]: + payload = { "kind": "case_event", "action": infer_case_action(snapshot), "case_id": snapshot.get("case_id", ""), @@ -159,6 +168,7 @@ def build_case_event_payload(snapshot: dict[str, object]) -> dict[str, object]: "handled_source": snapshot.get("handled_source", ""), "updated_at": snapshot.get("updated_at", ""), } + return attach_snapshot_upload(payload, batch_id=str(snapshot.get("batch_id", "")), snapshot_upload=snapshot_upload) def infer_case_action(snapshot: dict[str, object]) -> str: @@ -177,6 +187,7 @@ def send_batch_event_webhooks( retry_path: Path | None = None, http_post: HttpPost | None = None, now: datetime | None = None, + snapshot_upload: dict[str, object] | None = None, ) -> list[dict[str, object]]: settings = load_webhook_settings(config) if not settings.enabled or not settings.event_url: @@ -186,7 +197,7 @@ def send_batch_event_webhooks( retry_updates: list[dict[str, object]] = [] store = load_retry_store(retry_path) if retry_path is not None else None for event in events: - payload = build_batch_event_payload(event) + payload = build_batch_event_payload(event, snapshot_upload=snapshot_upload) record = deliver_webhook( settings.event_url, payload, @@ -223,6 +234,7 @@ def send_case_webhooks( retry_path: Path | None = None, http_post: HttpPost | None = None, now: datetime | None = None, + snapshot_upload: dict[str, object] | None = None, ) -> list[dict[str, object]]: settings = load_webhook_settings(config) if not settings.enabled or not settings.case_url: @@ -232,7 +244,7 @@ def send_case_webhooks( retry_updates: list[dict[str, object]] = [] store = load_retry_store(retry_path) if retry_path is not None else None for snapshot in snapshots: - payload = build_case_event_payload(snapshot) + payload = build_case_event_payload(snapshot, snapshot_upload=snapshot_upload) record = deliver_webhook( settings.case_url, payload, @@ -455,6 +467,35 @@ def optional_int(value: object) -> int | None: return None +def attach_snapshot_upload( + payload: dict[str, object], + *, + batch_id: str, + snapshot_upload: dict[str, object] | None, +) -> dict[str, object]: + if not snapshot_upload: + return payload + batch_ids = {str(item).strip() for item in snapshot_upload.get("batch_ids", []) if str(item).strip()} + if batch_ids and batch_id not in batch_ids: + return payload + status = str(snapshot_upload.get("status", "")).strip() + if status: + payload["snapshot_upload_status"] = status + object_key = str(snapshot_upload.get("object_key", "")).strip() + if object_key: + payload["snapshot_object_key"] = object_key + file_name = str(snapshot_upload.get("file_name", "")).strip() + if file_name: + payload["snapshot_file_name"] = file_name + captured_at = str(snapshot_upload.get("captured_at", "")).strip() + if captured_at: + payload["snapshot_captured_at"] = captured_at + error_message = str(snapshot_upload.get("error", "")).strip() + if error_message: + payload["snapshot_upload_error"] = error_message + return payload + + def file_ends_with_newline(path: Path) -> bool: with path.open("rb") as handle: handle.seek(-1, 2) diff --git a/tasks/todo.md b/tasks/todo.md index 375ac4d..76393ef 100644 --- a/tasks/todo.md +++ b/tasks/todo.md @@ -1,31 +1,30 @@ # Task Todo - [x] Review the current project instructions and check for task-relevant lessons. -- [x] Check repository status before starting retry-queue work. -- [x] Re-verify that `main` includes webhook case management before layering retries on top. -- [x] Inspect the current webhook delivery path, config surface, runtime integration point, and manage API hooks. -- [x] Write the detailed retry-queue implementation plan to `docs/superpowers/plans/2026-06-09-webhook-retry-queue.md`. -- [x] Execute webhook retry queue backend TDD cycle. -- [x] Execute runtime/manage API retry integration TDD cycle. -- [x] Update documentation/config formatting for retry queue settings and sinks. +- [x] Inspect the OTA upload API document and current runtime/webhook capture path. +- [x] Create an isolated worktree for alarm snapshot upload implementation. +- [x] Write the detailed implementation plan to `docs/superpowers/plans/2026-06-09-alarm-snapshot-upload.md`. +- [x] Execute alarm snapshot upload client TDD cycle. +- [x] Execute runtime and webhook payload integration TDD cycle. +- [x] Update config surface, docs, and verification notes. - [x] Run targeted verification and final full verification. ## Notes - `tasks/lessons.md` is absent in this repository/worktree, so there were no prior session lessons to review. -- Main branch merge result is available locally at `81f1709`; retry-queue work continues from branch `feat/webhook-retry-queue`. +- Upload API reference: `/Users/glo/code/go/wenma/ai_manager/zd-ai-manager/chunk-upload-oss-service/UPLOAD_API.md` +- User-provided upload target: `https://ota.zhengxinshipin.com` +- User-provided token secret: `change-me-in-production` ## Review -- Plan saved to `docs/superpowers/plans/2026-06-09-webhook-retry-queue.md`. -- Chosen scope keeps the first outbound webhook attempt synchronous, then persists failures into a JSONL-backed retry queue with bounded backoff and dead-letter cutoff. -- Retry queue observability and manual compensation will be exposed through the management API rather than the frontend in this phase. -- Implemented queue-aware webhook delivery in `src/cold_display_guard/webhooks.py`, runtime retry draining in `src/cold_display_guard/main.py`, manage API retry list/drain endpoints in `src/cold_display_guard/manage_api.py`, and config/doc updates in `src/cold_display_guard/config.py`, `config/example.toml`, and `README_zh.md`. +- Plan saved to `docs/superpowers/plans/2026-06-09-alarm-snapshot-upload.md`. +- Chosen implementation keeps snapshot upload entirely outside `BatchEngine` and enriches webhook payloads from the runtime side using the already captured frame. +- Implemented `src/cold_display_guard/alarm_snapshots.py` for JPEG encoding plus OTA chunk-upload orchestration, runtime integration in `src/cold_display_guard/main.py`, webhook payload enrichment in `src/cold_display_guard/webhooks.py`, config exposure/secret stripping in `src/cold_display_guard/config.py` and `src/cold_display_guard/manage_api.py`, and config/doc updates in `config/example.toml` and `README_zh.md`. - Targeted verification passed: - - `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_webhooks.py -v` + - `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_alarm_snapshots.py -v` - `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_main.py -v` - - `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_manage_api.py -v` - - `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_config.py -v` + - `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_webhooks.py tests/test_config.py tests/test_manage_api.py -v` - Final verification passed: - `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest discover -s tests -v` - `cd web && pnpm install --frozen-lockfile && pnpm build` diff --git a/tests/test_alarm_snapshots.py b/tests/test_alarm_snapshots.py new file mode 100644 index 0000000..ad697ea --- /dev/null +++ b/tests/test_alarm_snapshots.py @@ -0,0 +1,132 @@ +from __future__ import annotations + +import unittest +from datetime import datetime, timezone + +from cold_display_guard.alarm_snapshots import ( + capture_alert_snapshot, + load_alarm_snapshot_settings, + upload_snapshot_bytes, +) +from cold_display_guard.vision import Frame + + +UTC = timezone.utc + + +class AlarmSnapshotTests(unittest.TestCase): + def test_load_alarm_snapshot_settings_from_config(self) -> None: + settings = load_alarm_snapshot_settings( + { + "alarm_snapshot_upload": { + "enabled": True, + "service_url": "https://ota.zhengxinshipin.com", + "secret": "change-me-in-production", + "object_key_prefix": "alarms/cold-display", + "connect_timeout_seconds": 4, + "read_timeout_seconds": 9, + "encode_timeout_seconds": 7, + } + } + ) + + self.assertTrue(settings.enabled) + self.assertEqual(settings.service_url, "https://ota.zhengxinshipin.com") + self.assertEqual(settings.secret, "change-me-in-production") + self.assertEqual(settings.object_key_prefix, "alarms/cold-display") + self.assertEqual(settings.connect_timeout_seconds, 4) + self.assertEqual(settings.read_timeout_seconds, 9) + self.assertEqual(settings.encode_timeout_seconds, 7) + + def test_upload_snapshot_bytes_uses_documented_chunk_upload_flow(self) -> None: + json_calls: list[tuple[str, dict[str, object]]] = [] + chunk_calls: list[tuple[str, dict[str, str], bytes, dict[str, str]]] = [] + + def fake_post_json(url: str, payload: dict[str, object], timeout: tuple[float, float]) -> tuple[int, dict[str, object]]: + json_calls.append((url, payload)) + if url.endswith("/token/generate"): + return 200, {"token": "token-1", "expires_at": 1770003600} + if url.endswith("/upload/init"): + return 200, {"upload_id": "upload-1"} + if url.endswith("/upload/complete"): + return 200, {"upload_id": "upload-1", "object_key": "uploads/alarms/a.jpg", "file_size": 3, "file_md5": "900150983cd24fb0d6963f7d28e17f72"} + raise AssertionError(url) + + def fake_post_multipart( + url: str, + fields: dict[str, str], + file_field: str, + file_name: str, + file_bytes: bytes, + timeout: tuple[float, float], + ) -> tuple[int, dict[str, object]]: + chunk_calls.append((url, fields, file_bytes, {"file_field": file_field, "file_name": file_name})) + return 200, {"upload_id": "upload-1", "index": 0, "size": len(file_bytes), "received_chunks": 1, "total_chunks": 1} + + result = upload_snapshot_bytes( + b"abc", + file_name="alarm.jpg", + object_key_hint="alarms/a.jpg", + settings=load_alarm_snapshot_settings({}), + post_json_request=fake_post_json, + post_multipart_request=fake_post_multipart, + ) + + self.assertEqual(result["status"], "uploaded") + self.assertEqual(result["object_key"], "uploads/alarms/a.jpg") + self.assertEqual(json_calls[0][0], "https://ota.zhengxinshipin.com/token/generate") + self.assertEqual(json_calls[1][0], "https://ota.zhengxinshipin.com/upload/init") + self.assertEqual(json_calls[2][0], "https://ota.zhengxinshipin.com/upload/complete") + self.assertIn("token=token-1", chunk_calls[0][0]) + self.assertIn("upload_id=upload-1", chunk_calls[0][0]) + self.assertEqual(chunk_calls[0][1]["chunk_md5"], "900150983cd24fb0d6963f7d28e17f72") + self.assertEqual(chunk_calls[0][3]["file_field"], "chunk") + + def test_capture_alert_snapshot_skips_non_alert_events(self) -> None: + result = capture_alert_snapshot( + Frame(width=1, height=1, rgb=b"\x00\x00\x00"), + [{"event": "batch_started", "severity": "info", "batch_id": "batch_1"}], + {}, + now=datetime(2026, 6, 9, 9, 0, tzinfo=UTC), + ) + + self.assertIsNone(result) + + def test_capture_alert_snapshot_uploads_current_frame_for_alert_events(self) -> None: + encode_calls: list[Frame] = [] + upload_calls: list[tuple[bytes, str, str]] = [] + + def fake_encode(frame: Frame, timeout_seconds: float) -> bytes: + encode_calls.append(frame) + return b"jpeg-bytes" + + def fake_upload( + image_bytes: bytes, + *, + file_name: str, + object_key_hint: str, + settings, + post_json_request=None, + post_multipart_request=None, + ) -> dict[str, object]: + upload_calls.append((image_bytes, file_name, object_key_hint)) + return {"status": "uploaded", "object_key": "uploads/alarms/test.jpg", "file_name": file_name} + + result = capture_alert_snapshot( + Frame(width=1, height=1, rgb=b"\x01\x02\x03"), + [{"event": "time_alarm", "severity": "alarm", "batch_id": "batch_1", "camera_id": "cam_1", "zone_id": "1", "ts": "2026-06-09T09:00:00+00:00"}], + {"alarm_snapshot_upload": {"enabled": True, "object_key_prefix": "alarms/cold-display"}}, + now=datetime(2026, 6, 9, 9, 0, tzinfo=UTC), + jpeg_encoder=fake_encode, + uploader=fake_upload, + ) + + self.assertEqual(len(encode_calls), 1) + self.assertEqual(upload_calls[0][0], b"jpeg-bytes") + self.assertEqual(result["status"], "uploaded") + self.assertEqual(result["object_key"], "uploads/alarms/test.jpg") + self.assertEqual(result["batch_ids"], ["batch_1"]) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_config.py b/tests/test_config.py index f1711d6..e66d709 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -128,6 +128,12 @@ zone_ids = ["1", "2", "3"] save_config_document( path, { + "alarm_snapshot_upload": { + "enabled": True, + "service_url": "https://ota.zhengxinshipin.com", + "secret": "change-me-in-production", + "object_key_prefix": "cold-display/alarms", + }, "webhooks": { "enabled": True, "event_url": "https://example.com/events", @@ -146,6 +152,10 @@ zone_ids = ["1", "2", "3"] ) text = path.read_text(encoding="utf-8") + self.assertIn("[alarm_snapshot_upload]", text) + self.assertIn('service_url = "https://ota.zhengxinshipin.com"', text) + self.assertIn('secret = "change-me-in-production"', text) + self.assertIn('object_key_prefix = "cold-display/alarms"', text) self.assertIn("[webhooks]", text) self.assertIn('event_url = "https://example.com/events"', text) self.assertIn('case_url = "https://example.com/cases"', text) diff --git a/tests/test_main.py b/tests/test_main.py index 9ac0e67..dce2774 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -9,11 +9,13 @@ from pathlib import Path from cold_display_guard.cases import CaseStore from cold_display_guard.main import ( case_sink_path, + capture_runtime_alarm_snapshot, deliver_runtime_webhooks, persist_case_updates, restore_runtime_state, webhook_retry_sink_path, ) +from cold_display_guard.vision import Frame from cold_display_guard.webhooks import load_retry_snapshots @@ -114,6 +116,29 @@ class RuntimeRestoreTests(unittest.TestCase): self.assertEqual(deliveries[0][1]["kind"], "batch_event") self.assertEqual(deliveries[1][1]["kind"], "case_event") + def test_capture_runtime_alarm_snapshot_uses_current_frame_for_alert_events(self) -> None: + frame = Frame(width=1, height=1, rgb=b"\x01\x02\x03") + result = capture_runtime_alarm_snapshot( + frame, + [ + { + "event": "time_alarm", + "severity": "alarm", + "batch_id": "batch_000001", + "camera_id": "cam_01", + "zone_id": "1", + "ts": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(), + } + ], + {"alarm_snapshot_upload": {"enabled": True}}, + now=datetime(2026, 6, 9, 9, 0, tzinfo=UTC), + jpeg_encoder=lambda frame, timeout_seconds: b"jpeg", + uploader=lambda image_bytes, **kwargs: {"status": "uploaded", "object_key": "uploads/alarms/a.jpg", "file_name": "a.jpg"}, + ) + + self.assertEqual(result["status"], "uploaded") + self.assertEqual(result["object_key"], "uploads/alarms/a.jpg") + def test_deliver_runtime_webhooks_enqueues_failure_and_drains_due_retry(self) -> None: attempts = {"count": 0} @@ -169,6 +194,53 @@ class RuntimeRestoreTests(unittest.TestCase): self.assertEqual(retries[-1]["status"], "delivered") self.assertEqual(retries[-1]["attempt_count"], 2) + def test_deliver_runtime_webhooks_includes_snapshot_path_in_alert_payloads(self) -> None: + deliveries: list[dict[str, object]] = [] + + def fake_post(url: str, payload: dict[str, object], timeout: tuple[float, float]) -> tuple[int, str]: + deliveries.append(payload) + return 200, "ok" + + deliver_runtime_webhooks( + [ + { + "event": "time_alarm", + "ts": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(), + "batch_id": "batch_000001", + "camera_id": "cam_01", + "zone_id": "1", + "zone_label": "区域 1", + "severity": "alarm", + "state": "alerted", + } + ], + [ + { + "case_id": "case_batch_000001", + "batch_id": "batch_000001", + "case_type": "time_alarm", + "case_status": "open", + "source_event": "time_alarm", + "handled_source": "", + "created_at": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(), + "updated_at": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(), + } + ], + { + "webhooks": { + "enabled": True, + "event_url": "https://example.com/events", + "case_url": "https://example.com/cases", + } + }, + Path(tempfile.mkdtemp()) / "webhook_delivery.jsonl", + http_post=fake_post, + snapshot_upload={"status": "uploaded", "object_key": "uploads/alarms/a.jpg", "batch_ids": ["batch_000001"]}, + ) + + self.assertEqual(deliveries[0]["snapshot_object_key"], "uploads/alarms/a.jpg") + self.assertEqual(deliveries[1]["snapshot_object_key"], "uploads/alarms/a.jpg") + def test_restore_runtime_state_uses_stable_occupancy_when_raw_metrics_flicker(self) -> None: with tempfile.TemporaryDirectory() as tmpdir: diagnostics_path = Path(tmpdir) / "runtime_diagnostics.jsonl" diff --git a/tests/test_manage_api.py b/tests/test_manage_api.py index a3ec055..688df37 100644 --- a/tests/test_manage_api.py +++ b/tests/test_manage_api.py @@ -392,6 +392,11 @@ class ManageApiTests(unittest.TestCase): { "case_sink": {"path": "logs/cases.jsonl"}, "webhook_retry_sink": {"path": "logs/webhook_retry.jsonl"}, + "alarm_snapshot_upload": { + "enabled": True, + "service_url": "https://ota.zhengxinshipin.com", + "secret": "change-me-in-production", + }, "webhooks": { "enabled": True, "event_url": "https://example.com/events", @@ -406,6 +411,9 @@ class ManageApiTests(unittest.TestCase): self.assertEqual(payload["case_sink"]["path"], str((root / "logs" / "cases.jsonl").resolve())) self.assertEqual(payload["webhook_retry_sink"]["path"], str((root / "logs" / "webhook_retry.jsonl").resolve())) + self.assertTrue(payload["alarm_snapshot_upload"]["enabled"]) + self.assertEqual(payload["alarm_snapshot_upload"]["service_url"], "https://ota.zhengxinshipin.com") + self.assertNotIn("secret", payload["alarm_snapshot_upload"]) self.assertTrue(payload["webhooks"]["enabled"]) self.assertEqual(payload["webhooks"]["retry_max_attempts"], 4) self.assertNotIn("callback_token", payload["webhooks"]) diff --git a/tests/test_webhooks.py b/tests/test_webhooks.py index 35bca5f..1a89411 100644 --- a/tests/test_webhooks.py +++ b/tests/test_webhooks.py @@ -68,6 +68,24 @@ class WebhookTests(unittest.TestCase): self.assertEqual(payload["event"], "time_alarm") self.assertEqual(payload["zone_label"], "区域 1") + def test_build_batch_event_payload_includes_uploaded_snapshot_path(self) -> None: + payload = build_batch_event_payload( + { + "event": "time_alarm", + "ts": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(), + "batch_id": "batch_000001", + "camera_id": "cam_01", + "zone_id": "1", + "zone_label": "区域 1", + "severity": "alarm", + "state": "alerted", + }, + snapshot_upload={"status": "uploaded", "object_key": "uploads/alarms/a.jpg", "batch_ids": ["batch_000001"]}, + ) + + self.assertEqual(payload["snapshot_upload_status"], "uploaded") + self.assertEqual(payload["snapshot_object_key"], "uploads/alarms/a.jpg") + def test_build_case_event_payload_wraps_case_snapshot(self) -> None: payload = build_case_event_payload( { @@ -85,6 +103,23 @@ class WebhookTests(unittest.TestCase): self.assertEqual(payload["action"], "updated") self.assertEqual(payload["case_id"], "case_batch_000001") + def test_build_case_event_payload_includes_uploaded_snapshot_path(self) -> None: + payload = build_case_event_payload( + { + "case_id": "case_batch_000001", + "case_type": "warning_escalated", + "case_status": "open", + "batch_id": "batch_000001", + "source_event": "warning_escalated", + "handled_source": "", + "updated_at": datetime(2026, 6, 9, 9, 5, tzinfo=UTC).isoformat(), + }, + snapshot_upload={"status": "uploaded", "object_key": "uploads/alarms/a.jpg", "batch_ids": ["batch_000001"]}, + ) + + self.assertEqual(payload["snapshot_upload_status"], "uploaded") + self.assertEqual(payload["snapshot_object_key"], "uploads/alarms/a.jpg") + def test_send_batch_event_webhooks_delivers_payload(self) -> None: deliveries: list[tuple[str, dict[str, object], tuple[float, float]]] = []