feat: upload alarm snapshots to webhook payloads

This commit is contained in:
2026-06-09 13:01:15 +08:00
parent 523f928303
commit 04729a0fd1
14 changed files with 853 additions and 23 deletions

View File

@@ -184,6 +184,15 @@ diagnostics_path = "logs/runtime_diagnostics.jsonl"
[case_sink]
path = "logs/cases.jsonl"
[alarm_snapshot_upload]
enabled = true
service_url = "https://ota.zhengxinshipin.com"
secret = "change-me-in-production"
object_key_prefix = "cold-display-guard/alarms"
connect_timeout_seconds = 5
read_timeout_seconds = 20
encode_timeout_seconds = 10
[webhook_retry_sink]
path = "logs/webhook_retry.jsonl"
@@ -206,6 +215,20 @@ retry_max_backoff_seconds = 1800
- `logs/webhook_retry.jsonl`:Webhook 重试队列状态快照
- `logs/webhook_delivery.jsonl`:Webhook 投递结果审计
当某一轮识别结果里出现 `severity=alarm` 或 `severity=warning` 的事件时,运行时会直接复用当前检测帧:
1. 用 `ffmpeg` 把当前 RGB 帧编码成 JPEG
2. 通过 `https://ota.zhengxinshipin.com` 的 chunk-upload API 上传
3. 把上传返回的 `object_key` 追加到对应 webhook payload
相关 webhook 字段:
- `snapshot_upload_status`:`uploaded` 或 `error`
- `snapshot_object_key`:上传成功后的 OSS 路径
- `snapshot_file_name`:上传文件名
- `snapshot_captured_at`:抓帧时间
- `snapshot_upload_error`:上传失败原因,仅失败时返回
## 本地测试
```bash

View File

@@ -58,6 +58,15 @@ path = "logs/events.jsonl"
[case_sink]
path = "logs/cases.jsonl"
[alarm_snapshot_upload]
enabled = true
service_url = "https://ota.zhengxinshipin.com"
secret = "change-me-in-production"
object_key_prefix = "cold-display-guard/alarms"
connect_timeout_seconds = 5
read_timeout_seconds = 20
encode_timeout_seconds = 10
[webhook_retry_sink]
path = "logs/webhook_retry.jsonl"

View File

@@ -0,0 +1,111 @@
# Alarm Snapshot Upload Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Capture one current-frame snapshot for alerting runtime events, upload it to the OTA chunk-upload service, and include the returned path in outbound webhook payloads.
**Architecture:** Keep `BatchEngine` unchanged and treat snapshot upload as runtime-side enrichment. Reuse the already captured RGB frame from the active detection loop, encode it to JPEG with `ffmpeg`, upload it through the documented token/init/chunk/complete flow, then merge the returned `object_key` into the webhook payload for alert-level batch events and the derived case events from the same cycle.
**Tech Stack:** Python 3.12 standard library backend, existing `ffmpeg` dependency, JSONL webhook retry flow, unittest.
---
### Task 1: Snapshot Upload Client
**Files:**
- Create: `src/cold_display_guard/alarm_snapshots.py`
- Test: `tests/test_alarm_snapshots.py`
- [ ] **Step 1: Write the failing test**
Add tests that cover:
- loading upload settings from config
- encoding a current RGB frame into JPEG via injected encoder helper
- successful OTA upload flow returning `object_key`
- disabled or non-alert events skipping upload
- [ ] **Step 2: Run test to verify it fails**
Run: `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_alarm_snapshots.py -v`
Expected: FAIL because the snapshot upload module does not exist yet.
- [ ] **Step 3: Write minimal implementation**
Implement:
- upload settings parsing with defaults for `https://ota.zhengxinshipin.com` and secret `change-me-in-production`
- current-frame JPEG encoding
- token/init/chunk/complete upload workflow with injectable HTTP helpers for tests
- per-cycle alert snapshot metadata structure carrying `object_key`, file name, and upload status
- [ ] **Step 4: Run test to verify it passes**
Run: `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_alarm_snapshots.py -v`
Expected: PASS
### Task 2: Runtime And Webhook Integration
**Files:**
- Modify: `src/cold_display_guard/main.py`
- Modify: `src/cold_display_guard/webhooks.py`
- Test: `tests/test_main.py`
- Test: `tests/test_webhooks.py`
- [ ] **Step 1: Write the failing test**
Add tests that cover:
- runtime uploads one snapshot when a cycle contains alert-severity events
- webhook payload includes uploaded `object_key` for alert batch events
- derived case webhook payload includes the same snapshot path for matching case-creation events
- upload failure does not block webhook delivery and instead records failure metadata in payload
- [ ] **Step 2: Run test to verify it fails**
Run:
- `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_main.py -v`
- `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_webhooks.py -v`
Expected: FAIL because runtime/webhook code does not accept snapshot metadata yet.
- [ ] **Step 3: Write minimal implementation**
Implement:
- alert event selection based on event severity
- one-per-cycle snapshot upload using the current frame
- payload enrichment for batch-event and matching case-event webhooks
- retry queue persistence of the already enriched payload
- [ ] **Step 4: Run test to verify it passes**
Run:
- `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_main.py -v`
- `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_webhooks.py -v`
Expected: PASS
### Task 3: Config, Secrets, Docs, And Final Verification
**Files:**
- Modify: `src/cold_display_guard/config.py`
- Modify: `src/cold_display_guard/manage_api.py`
- Modify: `config/example.toml`
- Modify: `README_zh.md`
- Test: `tests/test_config.py`
- Test: `tests/test_manage_api.py`
- [ ] **Step 1: Write the failing test**
Extend tests so:
- config formatting writes snapshot-upload settings
- management config payload strips sensitive upload secret
- [ ] **Step 2: Run test to verify it fails**
Run:
- `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_config.py -v`
- `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_manage_api.py -v`
Expected: FAIL because snapshot upload settings are not exposed/formatted yet.
- [ ] **Step 3: Write minimal implementation**
Implement:
- config keys for snapshot upload URL, secret, object prefix, enable flag, and timeout/chunk settings
- config payload secret stripping
- README updates for alert snapshot upload behavior and returned webhook fields
- [ ] **Step 4: Run targeted and full verification**
Run:
- `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_alarm_snapshots.py -v`
- `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_main.py -v`
- `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_webhooks.py -v`
- `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_config.py -v`
- `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_manage_api.py -v`
- `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest discover -s tests -v`
- `cd web && pnpm build`
Expected: PASS

View File

@@ -0,0 +1,325 @@
from __future__ import annotations
import hashlib
import json
import subprocess
import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Callable
from urllib import error, parse, request
from cold_display_guard.vision import Frame
# Call signatures of the collaborators that can be injected in place of the
# real implementations defined in this module.
# JsonRequest(url, payload, (connect_timeout, read_timeout)) -> (status, decoded body)
JsonRequest = Callable[[str, dict[str, object], tuple[float, float]], tuple[int, dict[str, object]]]
# MultipartRequest(url, fields, file_field, file_name, file_bytes, timeout) -> (status, decoded body)
MultipartRequest = Callable[[str, dict[str, str], str, str, bytes, tuple[float, float]], tuple[int, dict[str, object]]]
# JpegEncoder(frame, timeout_seconds) -> encoded image bytes
JpegEncoder = Callable[[Frame, float], bytes]
# Uploader(image_bytes, *, file_name, object_key_hint, settings) -> upload metadata
Uploader = Callable[..., dict[str, object]]
@dataclass(frozen=True, slots=True)
class AlarmSnapshotSettings:
    """Immutable settings for the alarm snapshot upload.

    The defaults here match the fallbacks applied by
    ``load_alarm_snapshot_settings`` when a key is absent from the config.
    """

    # When false, ``capture_alert_snapshot`` returns None without encoding or uploading.
    enabled: bool = True
    # Base URL of the upload service; endpoint paths are appended to it.
    service_url: str = "https://ota.zhengxinshipin.com"
    # Posted to ``/token/generate`` to obtain an upload token.
    secret: str = "change-me-in-production"
    # Leading path segment of the object key hint sent to the service.
    object_key_prefix: str = "cold-display-guard/alarms"
    # Combined with ``read_timeout_seconds`` into the timeout pair for HTTP calls.
    connect_timeout_seconds: float = 5.0
    read_timeout_seconds: float = 20.0
    # Time limit handed to the JPEG encoder.
    encode_timeout_seconds: float = 10.0
class SnapshotUploadError(RuntimeError):
    """Raised when a step of the snapshot upload workflow fails."""
def load_alarm_snapshot_settings(config: dict[str, Any]) -> AlarmSnapshotSettings:
    """Build ``AlarmSnapshotSettings`` from the ``alarm_snapshot_upload`` config table.

    A missing or non-dict table is treated as empty, so every field falls back
    to its default. Trailing slashes are removed from the service URL, leading
    and trailing slashes from the object key prefix, and each timeout is
    raised to at least one second.

    Args:
        config: Parsed configuration document.

    Returns:
        The normalized settings object.
    """
    section = config.get("alarm_snapshot_upload", {})
    if not isinstance(section, dict):
        section = {}

    def clamped_seconds(key: str, fallback: float) -> float:
        # Timeouts below one second are raised to one second.
        return max(1.0, float(section.get(key, fallback)))

    enabled = bool(section.get("enabled", True))
    service_url = str(section.get("service_url", "https://ota.zhengxinshipin.com")).rstrip("/")
    secret = str(section.get("secret", "change-me-in-production"))
    prefix = str(section.get("object_key_prefix", "cold-display-guard/alarms")).strip("/")
    return AlarmSnapshotSettings(
        enabled=enabled,
        service_url=service_url,
        secret=secret,
        object_key_prefix=prefix,
        connect_timeout_seconds=clamped_seconds("connect_timeout_seconds", 5.0),
        read_timeout_seconds=clamped_seconds("read_timeout_seconds", 20.0),
        encode_timeout_seconds=clamped_seconds("encode_timeout_seconds", 10.0),
    )
def capture_alert_snapshot(
    frame: Frame,
    events: list[dict[str, object]],
    config: dict[str, Any],
    *,
    now: datetime | None = None,
    jpeg_encoder: JpegEncoder | None = None,
    uploader: Uploader | None = None,
) -> dict[str, object] | None:
    """Encode and upload one snapshot for the alert events of a cycle.

    The first alert event supplies the camera/zone/event labels used for the
    file name and object key hint. Failures are reported in the returned
    metadata instead of being raised, so the caller can continue.

    Args:
        frame: Frame handed to the JPEG encoder.
        events: Events produced in this cycle; only those accepted by
            ``event_needs_snapshot`` trigger a capture.
        config: Parsed configuration document.
        now: Capture timestamp; falls back to the first alert event's ``ts``
            and then to the current UTC time.
        jpeg_encoder: Replacement for ``encode_frame_to_jpeg``.
        uploader: Replacement for ``upload_snapshot_bytes``.

    Returns:
        None when there are no alert events or the feature is disabled.
        Otherwise a metadata dict: the uploader's result on success, or
        ``status="error"`` plus ``error`` on failure, in both cases extended
        with ``captured_at``, ``batch_ids`` and ``event_names``.
    """
    alert_events = [event for event in events if event_needs_snapshot(event)]
    if not alert_events:
        return None
    settings = load_alarm_snapshot_settings(config)
    if not settings.enabled:
        return None

    first_alert = alert_events[0]
    captured_at = now or parse_event_datetime(first_alert.get("ts")) or datetime.now(timezone.utc)
    file_name = build_snapshot_file_name(first_alert, captured_at)
    object_key_hint = build_object_key_hint(settings.object_key_prefix, first_alert, captured_at, file_name)

    def distinct_values(field: str) -> list[str]:
        # Sorted, de-duplicated, non-blank values of one field across the alert events.
        values = {str(event.get(field, "")).strip() for event in alert_events}
        values.discard("")
        return sorted(values)

    # Shared by the success and the failure result, so it is built once.
    context: dict[str, object] = {
        "captured_at": captured_at.isoformat(),
        "batch_ids": distinct_values("batch_id"),
        "event_names": distinct_values("event"),
    }
    encode = jpeg_encoder or encode_frame_to_jpeg
    upload = uploader or upload_snapshot_bytes
    try:
        image_bytes = encode(frame, settings.encode_timeout_seconds)
        result = upload(
            image_bytes,
            file_name=file_name,
            object_key_hint=object_key_hint,
            settings=settings,
        )
        metadata = dict(result)
    except (SnapshotUploadError, OSError, ValueError) as exc:
        # ValueError is included because urllib rejects a service URL without
        # a scheme with ValueError; that is an upload failure, not a reason to
        # abort the caller.
        return {"status": "error", "error": str(exc), **context}
    metadata.update(context)
    return metadata
def upload_snapshot_bytes(
    image_bytes: bytes,
    *,
    file_name: str,
    object_key_hint: str,
    settings: AlarmSnapshotSettings,
    post_json_request: JsonRequest | None = None,
    post_multipart_request: MultipartRequest | None = None,
) -> dict[str, object]:
    """Upload the image as a single chunk through the token/init/chunk/complete flow.

    Steps, all relative to ``settings.service_url``:
      1. ``POST /token/generate`` with the secret to obtain a token.
      2. ``POST /upload/init`` describing the file (one chunk covering it all).
      3. ``POST /upload/chunk`` (token, upload_id and index 0 in the query
         string) with the bytes as multipart field ``chunk``.
      4. ``POST /upload/complete`` to finalize and receive ``object_key``.

    Args:
        image_bytes: Encoded image to upload.
        file_name: File name reported to the service.
        object_key_hint: Suggested object key sent with the init request.
        settings: Service URL, secret and timeouts.
        post_json_request: Replacement for ``post_json``.
        post_multipart_request: Replacement for ``post_multipart``.

    Returns:
        A dict with ``status="uploaded"``, ``object_key``, ``file_name``,
        ``file_md5``, ``file_size`` and ``upload_id``.

    Raises:
        SnapshotUploadError: When a step reports a failure or its response
            lacks ``token``, ``upload_id`` or ``object_key``.
    """
    timeout = (settings.connect_timeout_seconds, settings.read_timeout_seconds)
    post_json_impl = post_json_request or post_json
    post_multipart_impl = post_multipart_request or post_multipart
    # MD5 is only the integrity checksum the upload API exchanges; marking it
    # as non-security use keeps it available on FIPS-restricted builds.
    file_md5 = hashlib.md5(image_bytes, usedforsecurity=False).hexdigest()
    total_size = len(image_bytes)

    status, token_payload = post_json_impl(
        f"{settings.service_url}/token/generate",
        {"secret": settings.secret},
        timeout,
    )
    ensure_ok(status, token_payload, "generate upload token")
    token = str(token_payload.get("token", "")).strip()
    if not token:
        raise SnapshotUploadError("generate upload token failed: token missing")

    status, init_payload = post_json_impl(
        f"{settings.service_url}/upload/init",
        {
            "file_name": file_name,
            "file_size": total_size,
            "file_md5": file_md5,
            "total_chunks": 1,
            "chunk_size": total_size,
            "object_key_hint": object_key_hint,
            "token": token,
        },
        timeout,
    )
    ensure_ok(status, init_payload, "init upload session")
    upload_id = str(init_payload.get("upload_id", "")).strip()
    if not upload_id:
        raise SnapshotUploadError("init upload session failed: upload_id missing")

    query = parse.urlencode({"token": token, "upload_id": upload_id, "index": 0})
    status, chunk_payload = post_multipart_impl(
        f"{settings.service_url}/upload/chunk?{query}",
        # The single chunk is the whole file, so its checksum is the file checksum.
        {"chunk_md5": file_md5},
        "chunk",
        file_name,
        image_bytes,
        timeout,
    )
    ensure_ok(status, chunk_payload, "upload snapshot chunk")

    status, complete_payload = post_json_impl(
        f"{settings.service_url}/upload/complete",
        {"upload_id": upload_id, "file_md5": file_md5, "token": token},
        timeout,
    )
    ensure_ok(status, complete_payload, "complete snapshot upload")
    object_key = str(complete_payload.get("object_key", "")).strip()
    if not object_key:
        raise SnapshotUploadError("complete snapshot upload failed: object_key missing")

    # The upload has already succeeded here; a null or malformed size in the
    # response must not turn that into an exception, so fall back to the
    # locally known size.
    try:
        file_size = int(complete_payload.get("file_size", total_size))
    except (TypeError, ValueError):
        file_size = total_size
    return {
        "status": "uploaded",
        "object_key": object_key,
        "file_name": file_name,
        "file_md5": str(complete_payload.get("file_md5", file_md5)),
        "file_size": file_size,
        "upload_id": upload_id,
    }
def event_needs_snapshot(event: dict[str, object]) -> bool:
    """Return True when the event's severity is ``alarm`` or ``warning``.

    The severity is compared after trimming whitespace and lower-casing.
    """
    normalized = str(event.get("severity", "")).strip().lower()
    if normalized == "alarm":
        return True
    return normalized == "warning"
def encode_frame_to_jpeg(frame: Frame, timeout_seconds: float) -> bytes:
    """Encode a raw RGB frame to JPEG by piping it through ``ffmpeg``.

    ``frame.rgb`` is written to ffmpeg's stdin as ``rgb24`` raw video of size
    ``frame.width`` x ``frame.height``; one MJPEG frame is read from stdout.

    Args:
        frame: Source frame (``width``, ``height`` and ``rgb`` are read).
        timeout_seconds: Time limit for the ffmpeg process; values below one
            second are raised to one second.

    Returns:
        The JPEG bytes written by ffmpeg.

    Raises:
        OSError: If ffmpeg is missing, times out, exits with a non-zero code
            or produces no output.
    """
    input_args = ["-f", "rawvideo", "-pix_fmt", "rgb24", "-s", f"{frame.width}x{frame.height}", "-i", "-"]
    output_args = ["-frames:v", "1", "-f", "image2pipe", "-vcodec", "mjpeg", "-"]
    argv = ["ffmpeg", "-hide_banner", "-loglevel", "error", *input_args, *output_args]
    try:
        completed = subprocess.run(
            argv,
            input=frame.rgb,
            check=False,
            capture_output=True,
            timeout=max(1.0, timeout_seconds),
        )
    except FileNotFoundError as exc:
        raise OSError("ffmpeg not found; install ffmpeg first") from exc
    except subprocess.TimeoutExpired as exc:
        raise OSError(f"ffmpeg jpeg encode timed out after {timeout_seconds:g}s") from exc

    if completed.returncode != 0:
        # Prefer ffmpeg's own diagnostics; fall back to the exit code when stderr is empty.
        stderr_text = completed.stderr.decode("utf-8", errors="replace").strip()
        raise OSError(stderr_text or f"ffmpeg exited with code {completed.returncode}")
    jpeg_bytes = completed.stdout
    if not jpeg_bytes:
        raise OSError("ffmpeg returned no jpeg data")
    return jpeg_bytes
def build_snapshot_file_name(event: dict[str, object], when: datetime) -> str:
    """Build ``<camera>_<zone>_<event>_<UTC stamp>.jpg`` for a snapshot.

    Blank or missing ``camera_id``, ``zone_id`` and ``event`` values are
    replaced by ``camera``, ``zone`` and ``event`` before sanitizing.
    """
    labels: list[str] = []
    for field, placeholder in (("camera_id", "camera"), ("zone_id", "zone"), ("event", "event")):
        raw = str(event.get(field, "")).strip()
        labels.append(sanitize_path_component(raw or placeholder))
    utc_stamp = when.astimezone(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    return "_".join([*labels, utc_stamp]) + ".jpg"
def build_object_key_hint(prefix: str, event: dict[str, object], when: datetime, file_name: str) -> str:
    """Build the object key hint ``<prefix>/<camera>/<event>/<YYYY/MM/DD>/<file_name>``.

    The date is taken in UTC. Empty segments (for example an empty prefix) are
    left out of the joined path.
    """
    utc_when = when.astimezone(timezone.utc)
    camera_label = str(event.get("camera_id", "")).strip() or "camera"
    event_label = str(event.get("event", "")).strip() or "event"
    segments = (
        prefix.strip("/"),
        sanitize_path_component(camera_label),
        sanitize_path_component(event_label),
        utc_when.strftime("%Y/%m/%d"),
        file_name,
    )
    return "/".join(filter(None, segments))
def parse_event_datetime(value: object) -> datetime | None:
text = str(value or "").strip()
if not text:
return None
try:
parsed = datetime.fromisoformat(text)
except ValueError:
return None
return parsed if parsed.tzinfo is not None else parsed.replace(tzinfo=timezone.utc)
def sanitize_path_component(value: str) -> str:
    """Reduce a label to alphanumerics, ``-`` and ``_`` for use in a path.

    Every other character becomes ``-``, runs of ``-`` collapse to one, and
    leading/trailing ``-`` are removed. An empty result becomes ``item``.
    """
    kept: list[str] = []
    for ch in value.strip():
        mapped = ch if (ch.isalnum() or ch in "-_") else "-"
        # Skip a dash that would directly follow another dash.
        if mapped == "-" and kept and kept[-1] == "-":
            continue
        kept.append(mapped)
    return "".join(kept).strip("-") or "item"
def ensure_ok(status: int, payload: dict[str, object], action: str) -> None:
    """Raise ``SnapshotUploadError`` unless the response indicates success.

    A response is successful when the status is 2xx and the payload has no
    ``error`` key. The error message uses the payload's ``error`` text, or
    ``http <status>`` when that text is blank.
    """
    succeeded = 200 <= status < 300 and "error" not in payload
    if not succeeded:
        detail = str(payload.get("error", "")).strip()
        if not detail:
            detail = f"http {status}"
        raise SnapshotUploadError(f"{action} failed: {detail}")
def post_json(url: str, payload: dict[str, object], timeout: tuple[float, float]) -> tuple[int, dict[str, object]]:
    """POST ``payload`` as UTF-8 JSON and return ``(status, decoded body)``."""
    body = json.dumps(payload, ensure_ascii=False, sort_keys=True).encode("utf-8")
    headers = {"Content-Type": "application/json"}
    return perform_request(request.Request(url, data=body, headers=headers, method="POST"), timeout)
def post_multipart(
    url: str,
    fields: dict[str, str],
    file_field: str,
    file_name: str,
    file_bytes: bytes,
    timeout: tuple[float, float],
) -> tuple[int, dict[str, object]]:
    """POST form fields plus one file as ``multipart/form-data``.

    A fresh random boundary is generated for every call.

    Returns:
        ``(status, decoded body)`` as produced by ``perform_request``.
    """
    boundary = f"----cold-display-{uuid.uuid4().hex}"
    content_type = f"multipart/form-data; boundary={boundary}"
    req = request.Request(
        url,
        data=build_multipart_body(boundary, fields, file_field, file_name, file_bytes),
        headers={"Content-Type": content_type},
        method="POST",
    )
    return perform_request(req, timeout)
def build_multipart_body(
    boundary: str,
    fields: dict[str, str],
    file_field: str,
    file_name: str,
    file_bytes: bytes,
) -> bytes:
    """Serialize form fields and one JPEG file part as ``multipart/form-data``.

    Text fields come first, in dict order, followed by the file part (sent
    with ``Content-Type: image/jpeg`` and only the base name of ``file_name``)
    and the closing boundary.

    Args:
        boundary: Boundary string, without the leading dashes.
        fields: Plain text form fields.
        file_field: Form field name of the file part.
        file_name: File name advertised for the file part.
        file_bytes: Raw content of the file part.

    Returns:
        The encoded request body.
    """

    def quote_param(text: str) -> str:
        # Percent-encode the characters that would end the quoted parameter or
        # start a new header line, as browsers do for multipart/form-data.
        return text.replace('"', "%22").replace("\r", "%0D").replace("\n", "%0A")

    delimiter = f"--{boundary}\r\n".encode("utf-8")
    chunks: list[bytes] = []
    for key, value in fields.items():
        chunks += [
            delimiter,
            f'Content-Disposition: form-data; name="{quote_param(str(key))}"\r\n\r\n'.encode("utf-8"),
            str(value).encode("utf-8"),
            b"\r\n",
        ]
    safe_field = quote_param(file_field)
    safe_name = quote_param(Path(file_name).name)
    chunks += [
        delimiter,
        f'Content-Disposition: form-data; name="{safe_field}"; filename="{safe_name}"\r\n'.encode("utf-8"),
        b"Content-Type: image/jpeg\r\n\r\n",
        file_bytes,
        b"\r\n",
        f"--{boundary}--\r\n".encode("utf-8"),
    ]
    return b"".join(chunks)
def perform_request(req: request.Request, timeout: tuple[float, float]) -> tuple[int, dict[str, object]]:
    """Send ``req`` and return ``(status code, decoded JSON body)``.

    HTTP error responses are returned like normal responses rather than
    raised, so the caller can inspect the status and body.

    Args:
        req: Prepared request.
        timeout: ``(connect, read)`` pair; ``urlopen`` takes a single timeout,
            so the two values are summed.

    Raises:
        OSError: On transport failures, including HTTP protocol errors such as
            a truncated body.
    """
    # Imported here because the module's import block does not bring in http.client.
    from http.client import HTTPException

    try:
        with request.urlopen(req, timeout=sum(timeout)) as response:
            return response.getcode(), decode_json_response(response.read())
    except error.HTTPError as exc:
        # HTTPError doubles as the response object; close it after reading so
        # the underlying connection is released.
        try:
            return exc.code, decode_json_response(exc.read())
        finally:
            exc.close()
    except HTTPException as exc:
        # Protocol-level failures are not OSError subclasses; re-raise them as
        # OSError so callers that treat OSError as "request failed" see them too.
        raise OSError(f"http protocol error: {exc!r}") from exc
def decode_json_response(data: bytes) -> dict[str, object]:
    """Decode a response body into a dict.

    An empty body yields ``{}``. A body that is not valid UTF-8 JSON yields
    ``{"error": <body text>}``, and valid JSON that is not an object yields
    ``{"error": "invalid json response"}``.
    """
    if not data:
        return {}
    try:
        text = data.decode("utf-8")
        parsed = json.loads(text)
    except (UnicodeDecodeError, json.JSONDecodeError):
        # Keep whatever text is recoverable so it can surface as the error message.
        return {"error": data.decode("utf-8", errors="replace")}
    if isinstance(parsed, dict):
        return parsed
    return {"error": "invalid json response"}

View File

@@ -199,6 +199,29 @@ def format_config_document(data: dict[str, Any]) -> str:
lines.append(f'path = "{_escape(str(case_sink.get("path", "logs/cases.jsonl")))}"')
lines.append("")
alarm_snapshot_upload = data.get("alarm_snapshot_upload", {})
if alarm_snapshot_upload:
lines.append("[alarm_snapshot_upload]")
for key in (
"connect_timeout_seconds",
"encode_timeout_seconds",
"enabled",
"object_key_prefix",
"read_timeout_seconds",
"secret",
"service_url",
):
if key not in alarm_snapshot_upload:
continue
value = alarm_snapshot_upload[key]
if isinstance(value, bool):
lines.append(f"{key} = {str(value).lower()}")
elif isinstance(value, int | float):
lines.append(f"{key} = {value}")
else:
lines.append(f'{key} = "{_escape(str(value))}"')
lines.append("")
webhook_retry_sink = data.get("webhook_retry_sink", {})
if webhook_retry_sink:
lines.append("[webhook_retry_sink]")

View File

@@ -7,6 +7,7 @@ from datetime import datetime
from pathlib import Path
from zoneinfo import ZoneInfo
from cold_display_guard.alarm_snapshots import capture_alert_snapshot
from cold_display_guard.cases import CaseStore, append_case_snapshots, load_case_snapshots
from cold_display_guard.config import load_config_document, load_settings, resolve_config_path, resolve_project_root
from cold_display_guard.engine import BatchEngine
@@ -104,6 +105,7 @@ def run(config_path: str | Path, once: bool = False, max_iterations: int = 0) ->
events = engine.process(observation)
append_jsonl(event_path, events)
case_snapshots = persist_case_updates(case_store, case_path, events)
snapshot_upload = capture_runtime_alarm_snapshot(frame, events, config, now=when)
deliver_runtime_webhooks(
events,
case_snapshots,
@@ -111,6 +113,7 @@ def run(config_path: str | Path, once: bool = False, max_iterations: int = 0) ->
webhook_delivery_path,
retry_path=webhook_retry_path,
now=when,
snapshot_upload=snapshot_upload,
)
append_jsonl(
diagnostics_path,
@@ -173,6 +176,25 @@ def persist_case_updates(case_store: CaseStore, path: Path, events: list[dict[st
return snapshots
def capture_runtime_alarm_snapshot(
    frame,
    events: list[dict[str, object]],
    config: dict[str, object],
    *,
    now: datetime | None = None,
    jpeg_encoder=None,
    uploader=None,
) -> dict[str, object] | None:
    """Forward the cycle's frame and events to ``capture_alert_snapshot``.

    All arguments are passed through unchanged and its result is returned as is.
    """
    overrides = {"now": now, "jpeg_encoder": jpeg_encoder, "uploader": uploader}
    return capture_alert_snapshot(frame, events, config, **overrides)
def deliver_runtime_webhooks(
events: list[dict[str, object]],
case_snapshots: list[dict[str, object]],
@@ -182,9 +204,26 @@ def deliver_runtime_webhooks(
retry_path: Path | None = None,
http_post=None,
now: datetime | None = None,
snapshot_upload: dict[str, object] | None = None,
) -> None:
send_batch_event_webhooks(events, config, audit_path, retry_path=retry_path, http_post=http_post, now=now)
send_case_webhooks(case_snapshots, config, audit_path, retry_path=retry_path, http_post=http_post, now=now)
send_batch_event_webhooks(
events,
config,
audit_path,
retry_path=retry_path,
http_post=http_post,
now=now,
snapshot_upload=snapshot_upload,
)
send_case_webhooks(
case_snapshots,
config,
audit_path,
retry_path=retry_path,
http_post=http_post,
now=now,
snapshot_upload=snapshot_upload,
)
if retry_path is not None:
drain_webhook_retries(config, retry_path, audit_path, http_post=http_post, now=now)

View File

@@ -311,6 +311,8 @@ def config_payload(ctx: ManageContext) -> dict[str, Any]:
event_path = event_sink_path(ctx, data)
case_path = case_sink_path(ctx, data)
retry_path = webhook_retry_sink_path(ctx, data)
alarm_snapshot_upload = dict(data.get("alarm_snapshot_upload", {}) or {})
alarm_snapshot_upload.pop("secret", None)
webhooks = dict(data.get("webhooks", {}) or {})
webhooks.pop("callback_token", None)
return {
@@ -329,6 +331,7 @@ def config_payload(ctx: ManageContext) -> dict[str, Any]:
"event_sink": {"path": str(event_path)},
"case_sink": {"path": str(case_path)},
"webhook_retry_sink": {"path": str(retry_path)},
"alarm_snapshot_upload": alarm_snapshot_upload,
"webhooks": webhooks,
}

View File

@@ -133,8 +133,12 @@ def load_webhook_settings(config: dict[str, Any]) -> WebhookSettings:
)
def build_batch_event_payload(event: dict[str, object]) -> dict[str, object]:
return {
def build_batch_event_payload(
event: dict[str, object],
*,
snapshot_upload: dict[str, object] | None = None,
) -> dict[str, object]:
payload = {
"kind": "batch_event",
"event": event.get("event", ""),
"ts": event.get("ts", ""),
@@ -145,10 +149,15 @@ def build_batch_event_payload(event: dict[str, object]) -> dict[str, object]:
"severity": event.get("severity", ""),
"state": event.get("state", ""),
}
return attach_snapshot_upload(payload, batch_id=str(event.get("batch_id", "")), snapshot_upload=snapshot_upload)
def build_case_event_payload(snapshot: dict[str, object]) -> dict[str, object]:
return {
def build_case_event_payload(
snapshot: dict[str, object],
*,
snapshot_upload: dict[str, object] | None = None,
) -> dict[str, object]:
payload = {
"kind": "case_event",
"action": infer_case_action(snapshot),
"case_id": snapshot.get("case_id", ""),
@@ -159,6 +168,7 @@ def build_case_event_payload(snapshot: dict[str, object]) -> dict[str, object]:
"handled_source": snapshot.get("handled_source", ""),
"updated_at": snapshot.get("updated_at", ""),
}
return attach_snapshot_upload(payload, batch_id=str(snapshot.get("batch_id", "")), snapshot_upload=snapshot_upload)
def infer_case_action(snapshot: dict[str, object]) -> str:
@@ -177,6 +187,7 @@ def send_batch_event_webhooks(
retry_path: Path | None = None,
http_post: HttpPost | None = None,
now: datetime | None = None,
snapshot_upload: dict[str, object] | None = None,
) -> list[dict[str, object]]:
settings = load_webhook_settings(config)
if not settings.enabled or not settings.event_url:
@@ -186,7 +197,7 @@ def send_batch_event_webhooks(
retry_updates: list[dict[str, object]] = []
store = load_retry_store(retry_path) if retry_path is not None else None
for event in events:
payload = build_batch_event_payload(event)
payload = build_batch_event_payload(event, snapshot_upload=snapshot_upload)
record = deliver_webhook(
settings.event_url,
payload,
@@ -223,6 +234,7 @@ def send_case_webhooks(
retry_path: Path | None = None,
http_post: HttpPost | None = None,
now: datetime | None = None,
snapshot_upload: dict[str, object] | None = None,
) -> list[dict[str, object]]:
settings = load_webhook_settings(config)
if not settings.enabled or not settings.case_url:
@@ -232,7 +244,7 @@ def send_case_webhooks(
retry_updates: list[dict[str, object]] = []
store = load_retry_store(retry_path) if retry_path is not None else None
for snapshot in snapshots:
payload = build_case_event_payload(snapshot)
payload = build_case_event_payload(snapshot, snapshot_upload=snapshot_upload)
record = deliver_webhook(
settings.case_url,
payload,
@@ -455,6 +467,35 @@ def optional_int(value: object) -> int | None:
return None
def attach_snapshot_upload(
payload: dict[str, object],
*,
batch_id: str,
snapshot_upload: dict[str, object] | None,
) -> dict[str, object]:
if not snapshot_upload:
return payload
batch_ids = {str(item).strip() for item in snapshot_upload.get("batch_ids", []) if str(item).strip()}
if batch_ids and batch_id not in batch_ids:
return payload
status = str(snapshot_upload.get("status", "")).strip()
if status:
payload["snapshot_upload_status"] = status
object_key = str(snapshot_upload.get("object_key", "")).strip()
if object_key:
payload["snapshot_object_key"] = object_key
file_name = str(snapshot_upload.get("file_name", "")).strip()
if file_name:
payload["snapshot_file_name"] = file_name
captured_at = str(snapshot_upload.get("captured_at", "")).strip()
if captured_at:
payload["snapshot_captured_at"] = captured_at
error_message = str(snapshot_upload.get("error", "")).strip()
if error_message:
payload["snapshot_upload_error"] = error_message
return payload
def file_ends_with_newline(path: Path) -> bool:
with path.open("rb") as handle:
handle.seek(-1, 2)

View File

@@ -1,31 +1,30 @@
# Task Todo
- [x] Review the current project instructions and check for task-relevant lessons.
- [x] Check repository status before starting retry-queue work.
- [x] Re-verify that `main` includes webhook case management before layering retries on top.
- [x] Inspect the current webhook delivery path, config surface, runtime integration point, and manage API hooks.
- [x] Write the detailed retry-queue implementation plan to `docs/superpowers/plans/2026-06-09-webhook-retry-queue.md`.
- [x] Execute webhook retry queue backend TDD cycle.
- [x] Execute runtime/manage API retry integration TDD cycle.
- [x] Update documentation/config formatting for retry queue settings and sinks.
- [x] Inspect the OTA upload API document and current runtime/webhook capture path.
- [x] Create an isolated worktree for alarm snapshot upload implementation.
- [x] Write the detailed implementation plan to `docs/superpowers/plans/2026-06-09-alarm-snapshot-upload.md`.
- [x] Execute alarm snapshot upload client TDD cycle.
- [x] Execute runtime and webhook payload integration TDD cycle.
- [x] Update config surface, docs, and verification notes.
- [x] Run targeted verification and final full verification.
## Notes
- `tasks/lessons.md` is absent in this repository/worktree, so there were no prior session lessons to review.
- Main branch merge result is available locally at `81f1709`; retry-queue work continues from branch `feat/webhook-retry-queue`.
- Upload API reference: `/Users/glo/code/go/wenma/ai_manager/zd-ai-manager/chunk-upload-oss-service/UPLOAD_API.md`
- User-provided upload target: `https://ota.zhengxinshipin.com`
- User-provided token secret: `change-me-in-production`
## Review
- Plan saved to `docs/superpowers/plans/2026-06-09-webhook-retry-queue.md`.
- Chosen scope keeps the first outbound webhook attempt synchronous, then persists failures into a JSONL-backed retry queue with bounded backoff and dead-letter cutoff.
- Retry queue observability and manual compensation will be exposed through the management API rather than the frontend in this phase.
- Implemented queue-aware webhook delivery in `src/cold_display_guard/webhooks.py`, runtime retry draining in `src/cold_display_guard/main.py`, manage API retry list/drain endpoints in `src/cold_display_guard/manage_api.py`, and config/doc updates in `src/cold_display_guard/config.py`, `config/example.toml`, and `README_zh.md`.
- Plan saved to `docs/superpowers/plans/2026-06-09-alarm-snapshot-upload.md`.
- Chosen implementation keeps snapshot upload entirely outside `BatchEngine` and enriches webhook payloads from the runtime side using the already captured frame.
- Implemented `src/cold_display_guard/alarm_snapshots.py` for JPEG encoding plus OTA chunk-upload orchestration, runtime integration in `src/cold_display_guard/main.py`, webhook payload enrichment in `src/cold_display_guard/webhooks.py`, config exposure/secret stripping in `src/cold_display_guard/config.py` and `src/cold_display_guard/manage_api.py`, and config/doc updates in `config/example.toml` and `README_zh.md`.
- Targeted verification passed:
- `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_webhooks.py -v`
- `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_alarm_snapshots.py -v`
- `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_main.py -v`
- `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_manage_api.py -v`
- `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_config.py -v`
- `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_webhooks.py tests/test_config.py tests/test_manage_api.py -v`
- Final verification passed:
- `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest discover -s tests -v`
- `cd web && pnpm install --frozen-lockfile && pnpm build`

View File

@@ -0,0 +1,132 @@
from __future__ import annotations
import unittest
from datetime import datetime, timezone
from cold_display_guard.alarm_snapshots import (
capture_alert_snapshot,
load_alarm_snapshot_settings,
upload_snapshot_bytes,
)
from cold_display_guard.vision import Frame
UTC = timezone.utc
class AlarmSnapshotTests(unittest.TestCase):
def test_load_alarm_snapshot_settings_from_config(self) -> None:
settings = load_alarm_snapshot_settings(
{
"alarm_snapshot_upload": {
"enabled": True,
"service_url": "https://ota.zhengxinshipin.com",
"secret": "change-me-in-production",
"object_key_prefix": "alarms/cold-display",
"connect_timeout_seconds": 4,
"read_timeout_seconds": 9,
"encode_timeout_seconds": 7,
}
}
)
self.assertTrue(settings.enabled)
self.assertEqual(settings.service_url, "https://ota.zhengxinshipin.com")
self.assertEqual(settings.secret, "change-me-in-production")
self.assertEqual(settings.object_key_prefix, "alarms/cold-display")
self.assertEqual(settings.connect_timeout_seconds, 4)
self.assertEqual(settings.read_timeout_seconds, 9)
self.assertEqual(settings.encode_timeout_seconds, 7)
def test_upload_snapshot_bytes_uses_documented_chunk_upload_flow(self) -> None:
json_calls: list[tuple[str, dict[str, object]]] = []
chunk_calls: list[tuple[str, dict[str, str], bytes, dict[str, str]]] = []
def fake_post_json(url: str, payload: dict[str, object], timeout: tuple[float, float]) -> tuple[int, dict[str, object]]:
json_calls.append((url, payload))
if url.endswith("/token/generate"):
return 200, {"token": "token-1", "expires_at": 1770003600}
if url.endswith("/upload/init"):
return 200, {"upload_id": "upload-1"}
if url.endswith("/upload/complete"):
return 200, {"upload_id": "upload-1", "object_key": "uploads/alarms/a.jpg", "file_size": 3, "file_md5": "900150983cd24fb0d6963f7d28e17f72"}
raise AssertionError(url)
def fake_post_multipart(
url: str,
fields: dict[str, str],
file_field: str,
file_name: str,
file_bytes: bytes,
timeout: tuple[float, float],
) -> tuple[int, dict[str, object]]:
chunk_calls.append((url, fields, file_bytes, {"file_field": file_field, "file_name": file_name}))
return 200, {"upload_id": "upload-1", "index": 0, "size": len(file_bytes), "received_chunks": 1, "total_chunks": 1}
result = upload_snapshot_bytes(
b"abc",
file_name="alarm.jpg",
object_key_hint="alarms/a.jpg",
settings=load_alarm_snapshot_settings({}),
post_json_request=fake_post_json,
post_multipart_request=fake_post_multipart,
)
self.assertEqual(result["status"], "uploaded")
self.assertEqual(result["object_key"], "uploads/alarms/a.jpg")
self.assertEqual(json_calls[0][0], "https://ota.zhengxinshipin.com/token/generate")
self.assertEqual(json_calls[1][0], "https://ota.zhengxinshipin.com/upload/init")
self.assertEqual(json_calls[2][0], "https://ota.zhengxinshipin.com/upload/complete")
self.assertIn("token=token-1", chunk_calls[0][0])
self.assertIn("upload_id=upload-1", chunk_calls[0][0])
self.assertEqual(chunk_calls[0][1]["chunk_md5"], "900150983cd24fb0d6963f7d28e17f72")
self.assertEqual(chunk_calls[0][3]["file_field"], "chunk")
def test_capture_alert_snapshot_skips_non_alert_events(self) -> None:
    """``capture_alert_snapshot`` returns ``None`` when the only event has severity ``info``."""
    single_pixel_frame = Frame(width=1, height=1, rgb=b"\x00\x00\x00")
    info_only_events = [
        {"event": "batch_started", "severity": "info", "batch_id": "batch_1"},
    ]
    captured_at = datetime(2026, 6, 9, 9, 0, tzinfo=UTC)

    outcome = capture_alert_snapshot(
        single_pixel_frame,
        info_only_events,
        {},
        now=captured_at,
    )

    self.assertIsNone(outcome)
def test_capture_alert_snapshot_uploads_current_frame_for_alert_events(self) -> None:
    """An ``alarm`` event makes ``capture_alert_snapshot`` encode and upload the frame.

    The JPEG encoder and the uploader are replaced with recording fakes.
    The test checks that the encoder ran exactly once, that the uploader
    received the encoder's bytes, and that the result exposes the upload
    status, the object key and the batch ids of the alerting events.
    """
    encoded_frames: list[Frame] = []
    upload_requests: list[tuple[bytes, str, str]] = []

    def fake_encode(frame: Frame, timeout_seconds: float) -> bytes:
        encoded_frames.append(frame)
        return b"jpeg-bytes"

    def fake_upload(
        image_bytes: bytes,
        *,
        file_name: str,
        object_key_hint: str,
        settings,
        post_json_request=None,
        post_multipart_request=None,
    ) -> dict[str, object]:
        upload_requests.append((image_bytes, file_name, object_key_hint))
        return {
            "status": "uploaded",
            "object_key": "uploads/alarms/test.jpg",
            "file_name": file_name,
        }

    current_frame = Frame(width=1, height=1, rgb=b"\x01\x02\x03")
    alarm_event = {
        "event": "time_alarm",
        "severity": "alarm",
        "batch_id": "batch_1",
        "camera_id": "cam_1",
        "zone_id": "1",
        "ts": "2026-06-09T09:00:00+00:00",
    }
    config = {
        "alarm_snapshot_upload": {
            "enabled": True,
            "object_key_prefix": "alarms/cold-display",
        },
    }

    result = capture_alert_snapshot(
        current_frame,
        [alarm_event],
        config,
        now=datetime(2026, 6, 9, 9, 0, tzinfo=UTC),
        jpeg_encoder=fake_encode,
        uploader=fake_upload,
    )

    self.assertEqual(len(encoded_frames), 1)
    uploaded_bytes = upload_requests[0][0]
    self.assertEqual(uploaded_bytes, b"jpeg-bytes")
    self.assertEqual(result["status"], "uploaded")
    self.assertEqual(result["object_key"], "uploads/alarms/test.jpg")
    self.assertEqual(result["batch_ids"], ["batch_1"])
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()

View File

@@ -128,6 +128,12 @@ zone_ids = ["1", "2", "3"]
save_config_document(
path,
{
"alarm_snapshot_upload": {
"enabled": True,
"service_url": "https://ota.zhengxinshipin.com",
"secret": "change-me-in-production",
"object_key_prefix": "cold-display/alarms",
},
"webhooks": {
"enabled": True,
"event_url": "https://example.com/events",
@@ -146,6 +152,10 @@ zone_ids = ["1", "2", "3"]
)
text = path.read_text(encoding="utf-8")
self.assertIn("[alarm_snapshot_upload]", text)
self.assertIn('service_url = "https://ota.zhengxinshipin.com"', text)
self.assertIn('secret = "change-me-in-production"', text)
self.assertIn('object_key_prefix = "cold-display/alarms"', text)
self.assertIn("[webhooks]", text)
self.assertIn('event_url = "https://example.com/events"', text)
self.assertIn('case_url = "https://example.com/cases"', text)

View File

@@ -9,11 +9,13 @@ from pathlib import Path
from cold_display_guard.cases import CaseStore
from cold_display_guard.main import (
case_sink_path,
capture_runtime_alarm_snapshot,
deliver_runtime_webhooks,
persist_case_updates,
restore_runtime_state,
webhook_retry_sink_path,
)
from cold_display_guard.vision import Frame
from cold_display_guard.webhooks import load_retry_snapshots
@@ -114,6 +116,29 @@ class RuntimeRestoreTests(unittest.TestCase):
self.assertEqual(deliveries[0][1]["kind"], "batch_event")
self.assertEqual(deliveries[1][1]["kind"], "case_event")
def test_capture_runtime_alarm_snapshot_uses_current_frame_for_alert_events(self) -> None:
    """``capture_runtime_alarm_snapshot`` returns the uploader's result for an alarm event.

    The JPEG encoder and the uploader are stubbed out, so the test only
    verifies that the status and object key produced by the uploader are
    visible on the returned mapping.
    """
    observed_at = datetime(2026, 6, 9, 9, 0, tzinfo=UTC)
    frame = Frame(width=1, height=1, rgb=b"\x01\x02\x03")

    def stub_encoder(frame, timeout_seconds):
        return b"jpeg"

    def stub_uploader(image_bytes, **kwargs):
        return {
            "status": "uploaded",
            "object_key": "uploads/alarms/a.jpg",
            "file_name": "a.jpg",
        }

    alarm_event = {
        "event": "time_alarm",
        "severity": "alarm",
        "batch_id": "batch_000001",
        "camera_id": "cam_01",
        "zone_id": "1",
        "ts": observed_at.isoformat(),
    }

    result = capture_runtime_alarm_snapshot(
        frame,
        [alarm_event],
        {"alarm_snapshot_upload": {"enabled": True}},
        now=observed_at,
        jpeg_encoder=stub_encoder,
        uploader=stub_uploader,
    )

    self.assertEqual(result["status"], "uploaded")
    self.assertEqual(result["object_key"], "uploads/alarms/a.jpg")
def test_deliver_runtime_webhooks_enqueues_failure_and_drains_due_retry(self) -> None:
attempts = {"count": 0}
@@ -169,6 +194,53 @@ class RuntimeRestoreTests(unittest.TestCase):
self.assertEqual(retries[-1]["status"], "delivered")
self.assertEqual(retries[-1]["attempt_count"], 2)
def test_deliver_runtime_webhooks_includes_snapshot_path_in_alert_payloads(self) -> None:
    """The uploaded snapshot's object key is attached to delivered webhook payloads.

    One alarm event and one open case for the same batch are passed to
    ``deliver_runtime_webhooks`` together with a ``snapshot_upload`` result
    that lists that batch id. The HTTP poster is replaced with a recording
    fake, and the first two payloads it receives must both carry
    ``snapshot_object_key``.
    """
    deliveries: list[dict[str, object]] = []

    def fake_post(url: str, payload: dict[str, object], timeout: tuple[float, float]) -> tuple[int, str]:
        deliveries.append(payload)
        return 200, "ok"

    # Use a self-cleaning temporary directory for the delivery log so the test
    # does not leave a stray directory behind on every run.
    with tempfile.TemporaryDirectory() as tmpdir:
        deliver_runtime_webhooks(
            [
                {
                    "event": "time_alarm",
                    "ts": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
                    "batch_id": "batch_000001",
                    "camera_id": "cam_01",
                    "zone_id": "1",
                    "zone_label": "区域 1",
                    "severity": "alarm",
                    "state": "alerted",
                }
            ],
            [
                {
                    "case_id": "case_batch_000001",
                    "batch_id": "batch_000001",
                    "case_type": "time_alarm",
                    "case_status": "open",
                    "source_event": "time_alarm",
                    "handled_source": "",
                    "created_at": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
                    "updated_at": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
                }
            ],
            {
                "webhooks": {
                    "enabled": True,
                    "event_url": "https://example.com/events",
                    "case_url": "https://example.com/cases",
                }
            },
            Path(tmpdir) / "webhook_delivery.jsonl",
            http_post=fake_post,
            snapshot_upload={
                "status": "uploaded",
                "object_key": "uploads/alarms/a.jpg",
                "batch_ids": ["batch_000001"],
            },
        )

    # The payloads were captured in memory, so they remain available after
    # the temporary directory has been removed.
    self.assertEqual(deliveries[0]["snapshot_object_key"], "uploads/alarms/a.jpg")
    self.assertEqual(deliveries[1]["snapshot_object_key"], "uploads/alarms/a.jpg")
def test_restore_runtime_state_uses_stable_occupancy_when_raw_metrics_flicker(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
diagnostics_path = Path(tmpdir) / "runtime_diagnostics.jsonl"

View File

@@ -392,6 +392,11 @@ class ManageApiTests(unittest.TestCase):
{
"case_sink": {"path": "logs/cases.jsonl"},
"webhook_retry_sink": {"path": "logs/webhook_retry.jsonl"},
"alarm_snapshot_upload": {
"enabled": True,
"service_url": "https://ota.zhengxinshipin.com",
"secret": "change-me-in-production",
},
"webhooks": {
"enabled": True,
"event_url": "https://example.com/events",
@@ -406,6 +411,9 @@ class ManageApiTests(unittest.TestCase):
self.assertEqual(payload["case_sink"]["path"], str((root / "logs" / "cases.jsonl").resolve()))
self.assertEqual(payload["webhook_retry_sink"]["path"], str((root / "logs" / "webhook_retry.jsonl").resolve()))
self.assertTrue(payload["alarm_snapshot_upload"]["enabled"])
self.assertEqual(payload["alarm_snapshot_upload"]["service_url"], "https://ota.zhengxinshipin.com")
self.assertNotIn("secret", payload["alarm_snapshot_upload"])
self.assertTrue(payload["webhooks"]["enabled"])
self.assertEqual(payload["webhooks"]["retry_max_attempts"], 4)
self.assertNotIn("callback_token", payload["webhooks"])

View File

@@ -68,6 +68,24 @@ class WebhookTests(unittest.TestCase):
self.assertEqual(payload["event"], "time_alarm")
self.assertEqual(payload["zone_label"], "区域 1")
def test_build_batch_event_payload_includes_uploaded_snapshot_path(self) -> None:
    """``build_batch_event_payload`` copies upload status and object key into the payload.

    The event's batch id is listed in the ``snapshot_upload`` result, and the
    built payload must expose ``snapshot_upload_status`` and
    ``snapshot_object_key``.
    """
    alarm_event = {
        "event": "time_alarm",
        "ts": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
        "batch_id": "batch_000001",
        "camera_id": "cam_01",
        "zone_id": "1",
        "zone_label": "区域 1",
        "severity": "alarm",
        "state": "alerted",
    }
    upload_result = {
        "status": "uploaded",
        "object_key": "uploads/alarms/a.jpg",
        "batch_ids": ["batch_000001"],
    }

    payload = build_batch_event_payload(alarm_event, snapshot_upload=upload_result)

    self.assertEqual(payload["snapshot_upload_status"], "uploaded")
    self.assertEqual(payload["snapshot_object_key"], "uploads/alarms/a.jpg")
def test_build_case_event_payload_wraps_case_snapshot(self) -> None:
payload = build_case_event_payload(
{
@@ -85,6 +103,23 @@ class WebhookTests(unittest.TestCase):
self.assertEqual(payload["action"], "updated")
self.assertEqual(payload["case_id"], "case_batch_000001")
def test_build_case_event_payload_includes_uploaded_snapshot_path(self) -> None:
    """``build_case_event_payload`` copies upload status and object key into the payload.

    The case's batch id is listed in the ``snapshot_upload`` result, and the
    built payload must expose ``snapshot_upload_status`` and
    ``snapshot_object_key``.
    """
    open_case = {
        "case_id": "case_batch_000001",
        "case_type": "warning_escalated",
        "case_status": "open",
        "batch_id": "batch_000001",
        "source_event": "warning_escalated",
        "handled_source": "",
        "updated_at": datetime(2026, 6, 9, 9, 5, tzinfo=UTC).isoformat(),
    }
    upload_result = {
        "status": "uploaded",
        "object_key": "uploads/alarms/a.jpg",
        "batch_ids": ["batch_000001"],
    }

    payload = build_case_event_payload(open_case, snapshot_upload=upload_result)

    self.assertEqual(payload["snapshot_upload_status"], "uploaded")
    self.assertEqual(payload["snapshot_object_key"], "uploads/alarms/a.jpg")
def test_send_batch_event_webhooks_delivers_payload(self) -> None:
deliveries: list[tuple[str, dict[str, object], tuple[float, float]]] = []