feat: add webhook retry queue
This commit is contained in:
@@ -135,8 +135,13 @@ zone_ids = ["1", "2", "3"]
|
||||
"callback_token": "secret",
|
||||
"connect_timeout_seconds": 3,
|
||||
"read_timeout_seconds": 5,
|
||||
"retry_max_attempts": 4,
|
||||
"retry_backoff_seconds": 30,
|
||||
"retry_max_backoff_seconds": 300,
|
||||
"retry_batch_limit": 12,
|
||||
},
|
||||
"case_sink": {"path": "logs/cases.jsonl"},
|
||||
"webhook_retry_sink": {"path": "logs/webhook_retry.jsonl"},
|
||||
},
|
||||
)
|
||||
text = path.read_text(encoding="utf-8")
|
||||
@@ -145,8 +150,14 @@ zone_ids = ["1", "2", "3"]
|
||||
self.assertIn('event_url = "https://example.com/events"', text)
|
||||
self.assertIn('case_url = "https://example.com/cases"', text)
|
||||
self.assertIn('callback_token = "secret"', text)
|
||||
self.assertIn("retry_max_attempts = 4", text)
|
||||
self.assertIn("retry_backoff_seconds = 30", text)
|
||||
self.assertIn("retry_max_backoff_seconds = 300", text)
|
||||
self.assertIn("retry_batch_limit = 12", text)
|
||||
self.assertIn("[case_sink]", text)
|
||||
self.assertIn('path = "logs/cases.jsonl"', text)
|
||||
self.assertIn("[webhook_retry_sink]", text)
|
||||
self.assertIn('path = "logs/webhook_retry.jsonl"', text)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -7,7 +7,14 @@ from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
from cold_display_guard.cases import CaseStore
|
||||
from cold_display_guard.main import case_sink_path, deliver_runtime_webhooks, persist_case_updates, restore_runtime_state
|
||||
from cold_display_guard.main import (
|
||||
case_sink_path,
|
||||
deliver_runtime_webhooks,
|
||||
persist_case_updates,
|
||||
restore_runtime_state,
|
||||
webhook_retry_sink_path,
|
||||
)
|
||||
from cold_display_guard.webhooks import load_retry_snapshots
|
||||
|
||||
|
||||
UTC = timezone.utc
|
||||
@@ -22,6 +29,14 @@ class RuntimeRestoreTests(unittest.TestCase):
|
||||
|
||||
self.assertEqual(path, (root / "logs" / "cases.jsonl").resolve())
|
||||
|
||||
def test_webhook_retry_sink_path_uses_default_logs_location(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
root = Path(tmpdir)
|
||||
|
||||
path = webhook_retry_sink_path(root, {})
|
||||
|
||||
self.assertEqual(path, (root / "logs" / "webhook_retry.jsonl").resolve())
|
||||
|
||||
def test_persist_case_updates_writes_case_snapshots(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
path = Path(tmpdir) / "cases.jsonl"
|
||||
@@ -99,6 +114,61 @@ class RuntimeRestoreTests(unittest.TestCase):
|
||||
self.assertEqual(deliveries[0][1]["kind"], "batch_event")
|
||||
self.assertEqual(deliveries[1][1]["kind"], "case_event")
|
||||
|
||||
def test_deliver_runtime_webhooks_enqueues_failure_and_drains_due_retry(self) -> None:
|
||||
attempts = {"count": 0}
|
||||
|
||||
def flaky_post(url: str, payload: dict[str, object], timeout: tuple[float, float]) -> tuple[int, str]:
|
||||
attempts["count"] += 1
|
||||
if attempts["count"] == 1:
|
||||
return 503, "down"
|
||||
return 200, "ok"
|
||||
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
audit_path = Path(tmpdir) / "webhook_delivery.jsonl"
|
||||
retry_path = Path(tmpdir) / "webhook_retry.jsonl"
|
||||
config = {
|
||||
"webhooks": {
|
||||
"enabled": True,
|
||||
"event_url": "https://example.com/events",
|
||||
"retry_max_attempts": 3,
|
||||
"retry_backoff_seconds": 30,
|
||||
}
|
||||
}
|
||||
deliver_runtime_webhooks(
|
||||
[
|
||||
{
|
||||
"event": "time_alarm",
|
||||
"ts": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
|
||||
"batch_id": "batch_000001",
|
||||
"camera_id": "cam_01",
|
||||
"zone_id": "1",
|
||||
"zone_label": "区域 1",
|
||||
"severity": "alarm",
|
||||
"state": "alerted",
|
||||
}
|
||||
],
|
||||
[],
|
||||
config,
|
||||
audit_path,
|
||||
retry_path=retry_path,
|
||||
http_post=flaky_post,
|
||||
now=datetime(2026, 6, 9, 9, 0, tzinfo=UTC),
|
||||
)
|
||||
deliver_runtime_webhooks(
|
||||
[],
|
||||
[],
|
||||
config,
|
||||
audit_path,
|
||||
retry_path=retry_path,
|
||||
http_post=flaky_post,
|
||||
now=datetime(2026, 6, 9, 9, 1, tzinfo=UTC),
|
||||
)
|
||||
retries = load_retry_snapshots(retry_path)
|
||||
|
||||
self.assertEqual(attempts["count"], 2)
|
||||
self.assertEqual(retries[-1]["status"], "delivered")
|
||||
self.assertEqual(retries[-1]["attempt_count"], 2)
|
||||
|
||||
def test_restore_runtime_state_uses_stable_occupancy_when_raw_metrics_flicker(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
diagnostics_path = Path(tmpdir) / "runtime_diagnostics.jsonl"
|
||||
|
||||
@@ -7,6 +7,7 @@ import threading
|
||||
import unittest
|
||||
from http.server import ThreadingHTTPServer
|
||||
from pathlib import Path
|
||||
from unittest import mock
|
||||
|
||||
from cold_display_guard.config import load_config_document, merge_calibration, save_config_document
|
||||
from cold_display_guard.manage_api import ManageContext, build_summary, config_payload, create_handler
|
||||
@@ -390,11 +391,13 @@ class ManageApiTests(unittest.TestCase):
|
||||
config_path,
|
||||
{
|
||||
"case_sink": {"path": "logs/cases.jsonl"},
|
||||
"webhook_retry_sink": {"path": "logs/webhook_retry.jsonl"},
|
||||
"webhooks": {
|
||||
"enabled": True,
|
||||
"event_url": "https://example.com/events",
|
||||
"case_url": "https://example.com/cases",
|
||||
"callback_token": "secret",
|
||||
"retry_max_attempts": 4,
|
||||
},
|
||||
},
|
||||
)
|
||||
@@ -402,7 +405,9 @@ class ManageApiTests(unittest.TestCase):
|
||||
payload = config_payload(ManageContext(config_path=config_path, project_root=root))
|
||||
|
||||
self.assertEqual(payload["case_sink"]["path"], str((root / "logs" / "cases.jsonl").resolve()))
|
||||
self.assertEqual(payload["webhook_retry_sink"]["path"], str((root / "logs" / "webhook_retry.jsonl").resolve()))
|
||||
self.assertTrue(payload["webhooks"]["enabled"])
|
||||
self.assertEqual(payload["webhooks"]["retry_max_attempts"], 4)
|
||||
self.assertNotIn("callback_token", payload["webhooks"])
|
||||
|
||||
def test_cases_endpoint_returns_latest_snapshots(self) -> None:
|
||||
@@ -560,6 +565,144 @@ class ManageApiTests(unittest.TestCase):
|
||||
self.assertEqual(lines[-1]["handled_source"], "manual")
|
||||
self.assertEqual(lines[-1]["payload"]["note"], "checked")
|
||||
|
||||
def test_manual_handle_endpoint_enqueues_failed_case_webhook_for_retry(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
root = Path(tmpdir)
|
||||
config_path = root / "config" / "local.toml"
|
||||
save_config_document(
|
||||
config_path,
|
||||
{
|
||||
"case_sink": {"path": "logs/cases.jsonl"},
|
||||
"webhooks": {
|
||||
"enabled": True,
|
||||
"case_url": "https://example.com/cases",
|
||||
"retry_max_attempts": 3,
|
||||
"retry_backoff_seconds": 30,
|
||||
},
|
||||
"layout": {"zone_ids": ["1"]},
|
||||
},
|
||||
)
|
||||
cases_path = root / "logs" / "cases.jsonl"
|
||||
retry_path = root / "logs" / "webhook_retry.jsonl"
|
||||
cases_path.parent.mkdir()
|
||||
cases_path.write_text(
|
||||
json.dumps(
|
||||
{
|
||||
"case_id": "case_batch_000001",
|
||||
"batch_id": "batch_000001",
|
||||
"camera_id": "cam_01",
|
||||
"zone_id": "1",
|
||||
"zone_label": "区域 1",
|
||||
"case_type": "time_alarm",
|
||||
"case_status": "open",
|
||||
"source_event": "time_alarm",
|
||||
"created_at": "2026-06-09T09:00:00+08:00",
|
||||
"updated_at": "2026-06-09T09:00:00+08:00",
|
||||
"payload": {},
|
||||
}
|
||||
),
|
||||
encoding="utf-8",
|
||||
)
|
||||
ctx = ManageContext(config_path=config_path, project_root=root)
|
||||
server, thread = self._serve_once(ctx)
|
||||
try:
|
||||
with mock.patch("cold_display_guard.webhooks.post_json", side_effect=OSError("network down")):
|
||||
status, payload = self._request(
|
||||
server,
|
||||
"POST",
|
||||
"/api/manage/cases/case_batch_000001/handle",
|
||||
body={"handled_by": "alice"},
|
||||
)
|
||||
finally:
|
||||
self._stop_server(server, thread)
|
||||
|
||||
retries = [json.loads(line) for line in retry_path.read_text(encoding="utf-8").splitlines()]
|
||||
|
||||
self.assertEqual(status, 200)
|
||||
self.assertEqual(payload["case_status"], "handled")
|
||||
self.assertEqual(retries[-1]["status"], "pending")
|
||||
self.assertEqual(retries[-1]["target"], "case_event")
|
||||
self.assertEqual(retries[-1]["attempt_count"], 1)
|
||||
|
||||
def test_retry_queue_endpoint_returns_pending_items(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
root = Path(tmpdir)
|
||||
config_path = root / "config" / "local.toml"
|
||||
save_config_document(config_path, {"layout": {"zone_ids": ["1"]}})
|
||||
retry_path = root / "logs" / "webhook_retry.jsonl"
|
||||
retry_path.parent.mkdir()
|
||||
retry_path.write_text(
|
||||
json.dumps(
|
||||
{
|
||||
"retry_id": "retry_000001",
|
||||
"target": "case_event",
|
||||
"url": "https://example.com/cases",
|
||||
"status": "pending",
|
||||
"attempt_count": 1,
|
||||
"payload": {"kind": "case_event"},
|
||||
"created_at": "2026-06-09T09:00:00+08:00",
|
||||
"updated_at": "2026-06-09T09:00:00+08:00",
|
||||
"next_attempt_at": "2026-06-09T09:01:00+08:00",
|
||||
}
|
||||
),
|
||||
encoding="utf-8",
|
||||
)
|
||||
ctx = ManageContext(config_path=config_path, project_root=root)
|
||||
server, thread = self._serve_once(ctx)
|
||||
try:
|
||||
status, payload = self._request(server, "GET", "/api/manage/webhooks/retries?status=pending")
|
||||
finally:
|
||||
self._stop_server(server, thread)
|
||||
|
||||
self.assertEqual(status, 200)
|
||||
self.assertEqual(payload["items"][0]["retry_id"], "retry_000001")
|
||||
self.assertEqual(payload["items"][0]["status"], "pending")
|
||||
|
||||
def test_retry_drain_endpoint_retries_pending_item(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
root = Path(tmpdir)
|
||||
config_path = root / "config" / "local.toml"
|
||||
save_config_document(
|
||||
config_path,
|
||||
{
|
||||
"webhooks": {"enabled": True, "retry_max_attempts": 3, "retry_backoff_seconds": 30},
|
||||
"layout": {"zone_ids": ["1"]},
|
||||
},
|
||||
)
|
||||
retry_path = root / "logs" / "webhook_retry.jsonl"
|
||||
retry_path.parent.mkdir()
|
||||
retry_path.write_text(
|
||||
json.dumps(
|
||||
{
|
||||
"retry_id": "retry_000001",
|
||||
"target": "case_event",
|
||||
"url": "https://example.com/cases",
|
||||
"status": "pending",
|
||||
"attempt_count": 1,
|
||||
"payload": {"kind": "case_event", "case_id": "case_batch_000001"},
|
||||
"created_at": "2026-06-09T09:00:00+08:00",
|
||||
"updated_at": "2026-06-09T09:00:00+08:00",
|
||||
"next_attempt_at": "2026-06-09T09:01:00+08:00",
|
||||
}
|
||||
),
|
||||
encoding="utf-8",
|
||||
)
|
||||
ctx = ManageContext(config_path=config_path, project_root=root)
|
||||
server, thread = self._serve_once(ctx)
|
||||
try:
|
||||
with mock.patch("cold_display_guard.webhooks.post_json", return_value=(200, "ok")):
|
||||
status, payload = self._request(server, "POST", "/api/manage/webhooks/retries/drain", body={})
|
||||
finally:
|
||||
self._stop_server(server, thread)
|
||||
|
||||
lines = [json.loads(line) for line in retry_path.read_text(encoding="utf-8").splitlines()]
|
||||
|
||||
self.assertEqual(status, 200)
|
||||
self.assertEqual(payload["retried_count"], 1)
|
||||
self.assertEqual(payload["delivered_count"], 1)
|
||||
self.assertEqual(lines[-1]["status"], "delivered")
|
||||
self.assertEqual(lines[-1]["attempt_count"], 2)
|
||||
|
||||
def test_callback_endpoint_requires_token_and_handles_case(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
root = Path(tmpdir)
|
||||
|
||||
@@ -9,7 +9,9 @@ from pathlib import Path
|
||||
from cold_display_guard.webhooks import (
|
||||
build_batch_event_payload,
|
||||
build_case_event_payload,
|
||||
drain_webhook_retries,
|
||||
load_webhook_settings,
|
||||
load_retry_snapshots,
|
||||
send_batch_event_webhooks,
|
||||
send_case_webhooks,
|
||||
)
|
||||
@@ -29,6 +31,10 @@ class WebhookTests(unittest.TestCase):
|
||||
"callback_token": "secret",
|
||||
"connect_timeout_seconds": 4,
|
||||
"read_timeout_seconds": 6,
|
||||
"retry_max_attempts": 4,
|
||||
"retry_backoff_seconds": 15,
|
||||
"retry_max_backoff_seconds": 90,
|
||||
"retry_batch_limit": 8,
|
||||
}
|
||||
}
|
||||
)
|
||||
@@ -39,6 +45,10 @@ class WebhookTests(unittest.TestCase):
|
||||
self.assertEqual(settings.callback_token, "secret")
|
||||
self.assertEqual(settings.connect_timeout_seconds, 4)
|
||||
self.assertEqual(settings.read_timeout_seconds, 6)
|
||||
self.assertEqual(settings.retry_max_attempts, 4)
|
||||
self.assertEqual(settings.retry_backoff_seconds, 15)
|
||||
self.assertEqual(settings.retry_max_backoff_seconds, 90)
|
||||
self.assertEqual(settings.retry_batch_limit, 8)
|
||||
|
||||
def test_build_batch_event_payload_wraps_runtime_event(self) -> None:
|
||||
payload = build_batch_event_payload(
|
||||
@@ -182,6 +192,139 @@ class WebhookTests(unittest.TestCase):
|
||||
self.assertEqual(logged[0]["target"], "batch_event")
|
||||
self.assertIn("network down", logged[0]["message"])
|
||||
|
||||
def test_non_2xx_delivery_is_enqueued_for_retry(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
audit_path = Path(tmpdir) / "webhook_delivery.jsonl"
|
||||
retry_path = Path(tmpdir) / "webhook_retry.jsonl"
|
||||
send_batch_event_webhooks(
|
||||
[
|
||||
{
|
||||
"event": "time_alarm",
|
||||
"ts": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
|
||||
"batch_id": "batch_000001",
|
||||
"camera_id": "cam_01",
|
||||
"zone_id": "1",
|
||||
"zone_label": "区域 1",
|
||||
"severity": "alarm",
|
||||
"state": "alerted",
|
||||
}
|
||||
],
|
||||
{
|
||||
"webhooks": {
|
||||
"enabled": True,
|
||||
"event_url": "https://example.com/events",
|
||||
"retry_max_attempts": 3,
|
||||
"retry_backoff_seconds": 30,
|
||||
}
|
||||
},
|
||||
audit_path,
|
||||
retry_path=retry_path,
|
||||
http_post=lambda url, payload, timeout: (503, "service unavailable"),
|
||||
now=datetime(2026, 6, 9, 9, 0, tzinfo=UTC),
|
||||
)
|
||||
|
||||
retries = load_retry_snapshots(retry_path)
|
||||
logged = [json.loads(line) for line in audit_path.read_text(encoding="utf-8").splitlines()]
|
||||
|
||||
self.assertEqual(logged[0]["status"], "error")
|
||||
self.assertEqual(logged[0]["status_code"], 503)
|
||||
self.assertEqual(retries[-1]["status"], "pending")
|
||||
self.assertEqual(retries[-1]["attempt_count"], 1)
|
||||
self.assertEqual(retries[-1]["target"], "batch_event")
|
||||
self.assertEqual(retries[-1]["url"], "https://example.com/events")
|
||||
|
||||
def test_due_retry_is_marked_delivered_after_success(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
audit_path = Path(tmpdir) / "webhook_delivery.jsonl"
|
||||
retry_path = Path(tmpdir) / "webhook_retry.jsonl"
|
||||
config = {
|
||||
"webhooks": {
|
||||
"enabled": True,
|
||||
"event_url": "https://example.com/events",
|
||||
"retry_max_attempts": 3,
|
||||
"retry_backoff_seconds": 30,
|
||||
}
|
||||
}
|
||||
send_batch_event_webhooks(
|
||||
[
|
||||
{
|
||||
"event": "time_alarm",
|
||||
"ts": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
|
||||
"batch_id": "batch_000001",
|
||||
"camera_id": "cam_01",
|
||||
"zone_id": "1",
|
||||
"zone_label": "区域 1",
|
||||
"severity": "alarm",
|
||||
"state": "alerted",
|
||||
}
|
||||
],
|
||||
config,
|
||||
audit_path,
|
||||
retry_path=retry_path,
|
||||
http_post=lambda url, payload, timeout: (503, "service unavailable"),
|
||||
now=datetime(2026, 6, 9, 9, 0, tzinfo=UTC),
|
||||
)
|
||||
|
||||
drained = drain_webhook_retries(
|
||||
config,
|
||||
retry_path,
|
||||
audit_path,
|
||||
http_post=lambda url, payload, timeout: (200, "ok"),
|
||||
now=datetime(2026, 6, 9, 9, 1, tzinfo=UTC),
|
||||
)
|
||||
retries = load_retry_snapshots(retry_path)
|
||||
|
||||
self.assertEqual(len(drained), 1)
|
||||
self.assertEqual(retries[-1]["status"], "delivered")
|
||||
self.assertEqual(retries[-1]["attempt_count"], 2)
|
||||
self.assertEqual(retries[-1]["last_status_code"], 200)
|
||||
|
||||
def test_retry_reaches_dead_letter_after_attempt_limit(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
audit_path = Path(tmpdir) / "webhook_delivery.jsonl"
|
||||
retry_path = Path(tmpdir) / "webhook_retry.jsonl"
|
||||
config = {
|
||||
"webhooks": {
|
||||
"enabled": True,
|
||||
"event_url": "https://example.com/events",
|
||||
"retry_max_attempts": 2,
|
||||
"retry_backoff_seconds": 30,
|
||||
}
|
||||
}
|
||||
send_batch_event_webhooks(
|
||||
[
|
||||
{
|
||||
"event": "time_alarm",
|
||||
"ts": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
|
||||
"batch_id": "batch_000001",
|
||||
"camera_id": "cam_01",
|
||||
"zone_id": "1",
|
||||
"zone_label": "区域 1",
|
||||
"severity": "alarm",
|
||||
"state": "alerted",
|
||||
}
|
||||
],
|
||||
config,
|
||||
audit_path,
|
||||
retry_path=retry_path,
|
||||
http_post=lambda url, payload, timeout: (503, "service unavailable"),
|
||||
now=datetime(2026, 6, 9, 9, 0, tzinfo=UTC),
|
||||
)
|
||||
|
||||
drained = drain_webhook_retries(
|
||||
config,
|
||||
retry_path,
|
||||
audit_path,
|
||||
http_post=lambda url, payload, timeout: (503, "still down"),
|
||||
now=datetime(2026, 6, 9, 9, 1, tzinfo=UTC),
|
||||
)
|
||||
retries = load_retry_snapshots(retry_path)
|
||||
|
||||
self.assertEqual(len(drained), 1)
|
||||
self.assertEqual(retries[-1]["status"], "dead_letter")
|
||||
self.assertEqual(retries[-1]["attempt_count"], 2)
|
||||
self.assertEqual(retries[-1]["last_status_code"], 503)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user