chore: commit all pending changes
This commit is contained in:
@@ -5,6 +5,12 @@ from pathlib import Path
|
||||
from urllib import request
|
||||
|
||||
|
||||
def _payload_for_webhook(payload: dict) -> dict:
|
||||
outbound = dict(payload)
|
||||
outbound.pop("tracks", None)
|
||||
return outbound
|
||||
|
||||
|
||||
def dispatch_json_event(
|
||||
path: str | Path,
|
||||
payload: dict,
|
||||
@@ -21,7 +27,7 @@ def dispatch_json_event(
|
||||
|
||||
req = request.Request(
|
||||
url=webhook_url,
|
||||
data=json.dumps(payload).encode("utf-8"),
|
||||
data=json.dumps(_payload_for_webhook(payload)).encode("utf-8"),
|
||||
method="POST",
|
||||
)
|
||||
req.add_header("Content-Type", "application/json")
|
||||
|
||||
52
managed/people_flow_project/tests/test_webhook.py
Normal file
52
managed/people_flow_project/tests/test_webhook.py
Normal file
@@ -0,0 +1,52 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
|
||||
from src.people_flow.webhook import dispatch_json_event
|
||||
|
||||
|
||||
def test_dispatch_json_event_omits_tracks_from_webhook_but_keeps_local_log(
|
||||
tmp_path, monkeypatch
|
||||
):
|
||||
sent: dict[str, object] = {}
|
||||
|
||||
class DummyResponse:
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def fake_urlopen(req, timeout):
|
||||
sent["url"] = req.full_url
|
||||
sent["timeout"] = timeout
|
||||
sent["payload"] = json.loads(req.data.decode("utf-8"))
|
||||
return DummyResponse()
|
||||
|
||||
monkeypatch.setattr("src.people_flow.webhook.request.urlopen", fake_urlopen)
|
||||
|
||||
output = tmp_path / "logs" / "events.jsonl"
|
||||
payload = {
|
||||
"event": "half_hour_report",
|
||||
"total_people": 3,
|
||||
"tracks": [
|
||||
{"track_id": 1, "direction": "in"},
|
||||
{"track_id": 2, "direction": "out"},
|
||||
],
|
||||
}
|
||||
|
||||
dispatch_json_event(
|
||||
output,
|
||||
payload,
|
||||
webhook_url="https://example.test/webhook",
|
||||
timeout_seconds=7.5,
|
||||
)
|
||||
|
||||
lines = output.read_text(encoding="utf-8").splitlines()
|
||||
assert json.loads(lines[0]) == payload
|
||||
assert sent["url"] == "https://example.test/webhook"
|
||||
assert sent["timeout"] == 7.5
|
||||
assert sent["payload"] == {
|
||||
"event": "half_hour_report",
|
||||
"total_people": 3,
|
||||
}
|
||||
@@ -1,6 +1,9 @@
|
||||
import json
|
||||
from datetime import datetime
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
from app.modules.notifier import append_json_event
|
||||
from app.modules.dwell_engine import DwellEngine
|
||||
from app.modules.notifier import append_json_event, dispatch_json_event
|
||||
|
||||
|
||||
def test_append_json_event_writes_jsonl(tmp_path):
|
||||
@@ -13,3 +16,54 @@ def test_append_json_event_writes_jsonl(tmp_path):
|
||||
|
||||
assert json.loads(lines[0]) == {"event": "long_stay_alert", "count": 5}
|
||||
assert json.loads(lines[1]) == {"event": "half_hour_report", "count": 3}
|
||||
|
||||
|
||||
def test_dispatch_json_event_posts_report_without_tracks(tmp_path, monkeypatch):
|
||||
tz = ZoneInfo("Asia/Shanghai")
|
||||
window_start = datetime(2026, 4, 15, 11, 7, tzinfo=tz)
|
||||
engine = DwellEngine(
|
||||
camera_id="store_cam_01",
|
||||
queue_time_threshold_seconds=300,
|
||||
crowded_count_threshold=5,
|
||||
normal_count_threshold=2,
|
||||
pause_timeout_seconds=300,
|
||||
alert_cooldown_seconds=600,
|
||||
report_window_start=window_start,
|
||||
)
|
||||
|
||||
engine.process_observations(
|
||||
[{"person_id": f"cust_{idx}", "role": "customer"} for idx in range(6)],
|
||||
window_start,
|
||||
)
|
||||
events = engine.process_observations([], window_start.replace(minute=37))
|
||||
report_event = next(event for event in events if event["event"] == "half_hour_report")
|
||||
|
||||
sent: dict[str, object] = {}
|
||||
|
||||
class DummyResponse:
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
def fake_urlopen(req, timeout):
|
||||
sent["url"] = req.full_url
|
||||
sent["timeout"] = timeout
|
||||
sent["payload"] = json.loads(req.data.decode("utf-8"))
|
||||
return DummyResponse()
|
||||
|
||||
monkeypatch.setattr("app.modules.notifier.request.urlopen", fake_urlopen)
|
||||
|
||||
output = tmp_path / "logs" / "events.jsonl"
|
||||
dispatch_json_event(
|
||||
output,
|
||||
report_event,
|
||||
webhook_url="https://example.test/webhook",
|
||||
timeout_seconds=5.0,
|
||||
)
|
||||
|
||||
assert sent["url"] == "https://example.test/webhook"
|
||||
assert sent["timeout"] == 5.0
|
||||
assert sent["payload"]["event"] == "half_hour_report"
|
||||
assert "tracks" not in sent["payload"]
|
||||
|
||||
Reference in New Issue
Block a user