- Skip half_hour_report events from webhook posts in people_flow - Handle pre-existing stale worker status files during startup gracefully - Make store_dwell_alert timestamp parsing robust against invalid/empty values - Update lessons learned and todo documentation Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
40 lines
1.0 KiB
Python
40 lines
1.0 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from pathlib import Path
|
|
from urllib import request
|
|
|
|
|
|
def _payload_for_webhook(payload: dict) -> dict:
|
|
outbound = dict(payload)
|
|
outbound.pop("tracks", None)
|
|
return outbound
|
|
|
|
|
|
def _should_post_webhook(payload: dict) -> bool:
|
|
return payload.get("event") != "half_hour_report"
|
|
|
|
|
|
def dispatch_json_event(
|
|
path: str | Path,
|
|
payload: dict,
|
|
webhook_url: str = "",
|
|
timeout_seconds: float = 5.0,
|
|
) -> None:
|
|
output_path = Path(path)
|
|
output_path.parent.mkdir(parents=True, exist_ok=True)
|
|
with output_path.open("a", encoding="utf-8") as handle:
|
|
handle.write(json.dumps(payload, ensure_ascii=False) + "\n")
|
|
|
|
if not webhook_url.strip() or not _should_post_webhook(payload):
|
|
return
|
|
|
|
req = request.Request(
|
|
url=webhook_url,
|
|
data=json.dumps(_payload_for_webhook(payload)).encode("utf-8"),
|
|
method="POST",
|
|
)
|
|
req.add_header("Content-Type", "application/json")
|
|
with request.urlopen(req, timeout=timeout_seconds):
|
|
return
|