feat: implement rolling half-hour report windows anchored to service startup time
This commit is contained in:
@@ -26,6 +26,7 @@ def build_app(config_path: str | Path | None = None) -> dict:
|
||||
project_root = resolve_project_root(resolved_config_path)
|
||||
event_sink_path = resolve_project_path(project_root, config.event_sink.path)
|
||||
gallery_dir = resolve_project_path(project_root, config.staff.gallery_dir)
|
||||
startup_time = datetime.now().astimezone()
|
||||
|
||||
return {
|
||||
"config": config,
|
||||
@@ -51,6 +52,7 @@ def build_app(config_path: str | Path | None = None) -> dict:
|
||||
normal_count_threshold=config.thresholds.normal_count_threshold,
|
||||
pause_timeout_seconds=config.thresholds.pause_timeout_seconds,
|
||||
alert_cooldown_seconds=config.thresholds.alert_cooldown_seconds,
|
||||
report_window_start=startup_time,
|
||||
),
|
||||
"notifier": lambda path, event: dispatch_json_event(
|
||||
path,
|
||||
|
||||
@@ -3,7 +3,7 @@ from __future__ import annotations
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
|
||||
from app.modules.reporter import floor_half_hour, previous_half_hour_window
|
||||
from app.modules.reporter import initial_half_hour_window, should_emit_half_hour_report
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
@@ -105,6 +105,7 @@ class DwellEngine:
|
||||
normal_count_threshold: int,
|
||||
pause_timeout_seconds: int,
|
||||
alert_cooldown_seconds: int,
|
||||
report_window_start: datetime | None = None,
|
||||
) -> None:
|
||||
self.camera_id = camera_id
|
||||
self.queue_time_threshold_seconds = queue_time_threshold_seconds
|
||||
@@ -117,8 +118,13 @@ class DwellEngine:
|
||||
self.session_counts: dict[str, int] = {}
|
||||
self.alert_rearmed = True
|
||||
self.last_alert_at: datetime | None = None
|
||||
self.last_report_boundary: datetime | None = None
|
||||
self.last_queue_level: str | None = None
|
||||
self.report_window_start: datetime | None = None
|
||||
self.report_window_end: datetime | None = None
|
||||
if report_window_start is not None:
|
||||
self.report_window_start, self.report_window_end = initial_half_hour_window(
|
||||
report_window_start
|
||||
)
|
||||
|
||||
def _next_session_id(self, person_id: str) -> str:
|
||||
next_index = self.session_counts.get(person_id, 0) + 1
|
||||
@@ -140,6 +146,7 @@ class DwellEngine:
|
||||
def process_observations(
|
||||
self, observations: list[dict], when: datetime
|
||||
) -> list[dict]:
|
||||
self._ensure_report_window(when)
|
||||
events: list[dict] = []
|
||||
seen_people: set[str] = set()
|
||||
|
||||
@@ -168,12 +175,21 @@ class DwellEngine:
|
||||
if alert_event is not None:
|
||||
events.append(alert_event)
|
||||
|
||||
report_event = self._build_half_hour_report(when)
|
||||
if report_event is not None:
|
||||
while True:
|
||||
report_event = self._build_half_hour_report(when)
|
||||
if report_event is None:
|
||||
break
|
||||
events.append(report_event)
|
||||
|
||||
return events
|
||||
|
||||
def _ensure_report_window(self, when: datetime) -> None:
|
||||
if self.report_window_start is not None and self.report_window_end is not None:
|
||||
return
|
||||
self.report_window_start, self.report_window_end = initial_half_hour_window(
|
||||
when
|
||||
)
|
||||
|
||||
def _active_customer_sessions(self, when: datetime) -> list[DwellSession]:
|
||||
return [
|
||||
session
|
||||
@@ -212,27 +228,32 @@ class DwellEngine:
|
||||
}
|
||||
|
||||
def _build_half_hour_report(self, when: datetime) -> dict | None:
|
||||
boundary = floor_half_hour(when)
|
||||
if boundary == when and self.last_report_boundary == boundary:
|
||||
return
|
||||
if boundary == self.last_report_boundary:
|
||||
return None
|
||||
if when < boundary:
|
||||
self._ensure_report_window(when)
|
||||
if self.report_window_start is None or self.report_window_end is None:
|
||||
return None
|
||||
|
||||
window_start, window_end = previous_half_hour_window(when)
|
||||
queue_totals = self._queue_totals(window_start, window_end, when)
|
||||
window_start = self.report_window_start
|
||||
window_end = self.report_window_end
|
||||
if not should_emit_half_hour_report(window_end, when):
|
||||
return None
|
||||
|
||||
queue_totals = self._queue_totals(window_start, window_end)
|
||||
queue_metrics = self._build_queue_metrics(queue_totals)
|
||||
active_customers = [
|
||||
{
|
||||
**session.as_event_dict(when),
|
||||
"window_queue_seconds": session.window_dwell_seconds(
|
||||
window_start, window_end, when
|
||||
),
|
||||
}
|
||||
for session in self.sessions.values()
|
||||
if session.role == "customer" and session.state == "active"
|
||||
]
|
||||
active_customers = []
|
||||
for session in self.sessions.values():
|
||||
if session.role != "customer" or session.state != "active":
|
||||
continue
|
||||
window_queue_seconds = session.window_dwell_seconds(
|
||||
window_start, window_end, window_end
|
||||
)
|
||||
if window_queue_seconds <= 0:
|
||||
continue
|
||||
active_customers.append(
|
||||
{
|
||||
**session.as_event_dict(window_end),
|
||||
"window_queue_seconds": window_queue_seconds,
|
||||
}
|
||||
)
|
||||
closed_customers = [
|
||||
{
|
||||
"person_id": session.person_id,
|
||||
@@ -248,9 +269,14 @@ class DwellEngine:
|
||||
and window_start < session.closed_at <= window_end
|
||||
]
|
||||
staff_seen_count = sum(
|
||||
1 for session in self.sessions.values() if session.role == "staff"
|
||||
1
|
||||
for session in self.sessions.values()
|
||||
if session.role == "staff"
|
||||
and session.window_dwell_seconds(window_start, window_end, window_end) > 0
|
||||
)
|
||||
self.report_window_start, self.report_window_end = initial_half_hour_window(
|
||||
window_end
|
||||
)
|
||||
self.last_report_boundary = boundary
|
||||
return {
|
||||
"event": "half_hour_report",
|
||||
"project_type": "store_dwell_alert",
|
||||
@@ -269,7 +295,6 @@ class DwellEngine:
|
||||
self,
|
||||
window_start: datetime,
|
||||
window_end: datetime,
|
||||
when: datetime,
|
||||
) -> dict[str, int]:
|
||||
totals: dict[str, int] = {}
|
||||
for session in self.closed_sessions:
|
||||
@@ -287,7 +312,7 @@ class DwellEngine:
|
||||
if session.role != "customer":
|
||||
continue
|
||||
window_seconds = session.window_dwell_seconds(
|
||||
window_start, window_end, when
|
||||
window_start, window_end, window_end
|
||||
)
|
||||
if window_seconds > 0:
|
||||
totals[session.person_id] = (
|
||||
|
||||
@@ -2,18 +2,12 @@ from __future__ import annotations
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
|
||||
def should_emit_half_hour_report(ts: str) -> bool:
|
||||
dt = datetime.fromisoformat(ts)
|
||||
return dt.minute in {0, 30} and dt.second == 0
|
||||
HALF_HOUR_REPORT_SECONDS = 1800
|
||||
|
||||
|
||||
def floor_half_hour(dt: datetime) -> datetime:
|
||||
minute = 0 if dt.minute < 30 else 30
|
||||
return dt.replace(minute=minute, second=0, microsecond=0)
|
||||
def should_emit_half_hour_report(window_end: datetime, when: datetime) -> bool:
|
||||
return when >= window_end
|
||||
|
||||
|
||||
def previous_half_hour_window(dt: datetime) -> tuple[datetime, datetime]:
|
||||
window_end = floor_half_hour(dt)
|
||||
window_start = window_end - timedelta(minutes=30)
|
||||
return window_start, window_end
|
||||
def initial_half_hour_window(started_at: datetime) -> tuple[datetime, datetime]:
|
||||
return started_at, started_at + timedelta(seconds=HALF_HOUR_REPORT_SECONDS)
|
||||
|
||||
Reference in New Issue
Block a user