From ea618fd6745b593f747d1a466e8ede9a4f59f56a Mon Sep 17 00:00:00 2001 From: "skye.yue" Date: Sat, 9 May 2026 11:35:55 +0800 Subject: [PATCH] Refactor store dwell alert management API and dwell engine - Updated argument parsing in manage_api.py to include new threshold parameters. - Enhanced _config_payload to include thresholds and webhook configurations. - Modified _build_summary to track queue metrics and adjust alert reporting. - Refactored DwellEngine to utilize queue thresholds for alerting and reporting. - Added queue metrics calculations and status change tracking in dwell_engine.py. - Updated notifier.py to support posting JSON events to webhooks. - Adjusted example configuration to reflect new threshold parameters. - Enhanced Docker entrypoint script for better process management. - Updated tests to cover new queue metrics and thresholds. - Improved ManagedServiceDetail and ManagedServices Vue components to display queue metrics. --- Dockerfile | 3 +- deploy/managed-portal.10.8.0.11.env | 13 + docs/managed-queue-webhook.md | 249 ++++++++++++++++++ .../config/config.example.yaml | 15 ++ .../configs/default_config.yaml | 15 ++ .../scripts/docker-entrypoint.sh | 65 ++++- .../src/people_flow/config.py | 12 + .../src/people_flow/manage_api.py | 69 ++++- .../src/people_flow/models.py | 33 ++- .../src/people_flow/pipeline.py | 99 +++++-- .../src/people_flow/queue_analytics.py | 201 ++++++++++++++ .../src/people_flow/webhook.py | 29 ++ .../tests/test_manage_api.py | 48 +++- .../tests/test_queue_analytics.py | 43 +++ managed/store_dwell_alert/app/config.py | 17 +- managed/store_dwell_alert/app/main.py | 24 +- managed/store_dwell_alert/app/manage_api.py | 60 +++-- .../app/modules/dwell_engine.py | 184 ++++++++++++- .../store_dwell_alert/app/modules/notifier.py | 23 +- .../config/config.example.yaml | 6 +- .../scripts/docker-entrypoint.sh | 65 ++++- .../store_dwell_alert/tests/test_config.py | 11 +- .../tests/test_dwell_engine.py | 84 ++++-- .../tests/test_manage_api.py | 39 ++- web/src/views/ManagedServiceDetail.vue | 188 ++++++++++++- web/src/views/ManagedServices.vue | 127 ++++++++- 26 files changed, 1605 insertions(+), 117 deletions(-) create mode 100644 deploy/managed-portal.10.8.0.11.env create mode 100644 docs/managed-queue-webhook.md create mode 100644 managed/people_flow_project/src/people_flow/queue_analytics.py create mode 100644 managed/people_flow_project/src/people_flow/webhook.py create mode 100644 managed/people_flow_project/tests/test_queue_analytics.py diff --git a/Dockerfile b/Dockerfile index 3641d06..3b8513f 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,7 +1,8 @@ FROM swr.cn-north-4.myhuaweicloud.com/ddn-k8s/docker.io/library/golang:1.25.4-alpine AS builder ENV TZ=Asia/Shanghai \ - GOPROXY=https://goproxy.cn,direct + GOPROXY=https://goproxy.cn,direct \ + GOSUMDB=sum.golang.google.cn RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories RUN apk add --no-cache ca-certificates tzdata diff --git a/deploy/managed-portal.10.8.0.11.env b/deploy/managed-portal.10.8.0.11.env new file mode 100644 index 0000000..b680288 --- /dev/null +++ b/deploy/managed-portal.10.8.0.11.env @@ -0,0 +1,13 @@ +IMAGE_VERSION=dev +TZ=Asia/Shanghai +MANAGED_PORTAL_WEB_PORT=13000 + +MANAGED_STORE_DWELL_CAMERA_ID=cam_192_168_3_3 +MANAGED_STORE_DWELL_RTSP_URL=rtsp://admin:Zxjp2026@192.168.3.3:554/h264/ch1/main/av_stream +MANAGED_STORE_DWELL_EVENT_SINK_PATH=logs/events.jsonl +MANAGED_STORE_DWELL_CONFIG_DIR=../managed/store_dwell_alert/config +MANAGED_STORE_DWELL_DATA_DIR=../managed/store_dwell_alert/data + +MANAGED_PEOPLE_FLOW_RTSP_URL=rtsp://admin:Zxjp2026@192.168.3.3:554/h264/ch1/main/av_stream +MANAGED_PEOPLE_FLOW_CONFIG_DIR=../managed/people_flow_project/config +MANAGED_PEOPLE_FLOW_OUTPUT_DIR=../managed/people_flow_project/outputs \ No newline at end of file diff --git a/docs/managed-queue-webhook.md b/docs/managed-queue-webhook.md new file mode 100644 index 0000000..1df8c88 --- /dev/null +++ b/docs/managed-queue-webhook.md @@ -0,0 +1,249 @@ +# Managed Queue Webhook 对接说明 + +本文档说明 `managed/store_dwell_alert` 和 `managed/people_flow_project` 两个工程新增的排队统计与 webhook 推送结构,便于接收方按统一协议完成对接。 + +## 业务规则 + +- 统计窗口固定为每 30 分钟一个窗口。 +- 每个窗口统计两类人数: + - `over_threshold_count`:该窗口内累计排队时间大于等于 5 分钟的人数。 + - `under_threshold_count`:该窗口内累计排队时间小于 5 分钟但大于 0 的人数。 +- 一阶段排队等级规则: + - `crowded`:`over_threshold_count > 5` + - `normal`:`2 <= over_threshold_count <= 5` + - `few`:`over_threshold_count < 2` +- 状态变化规则: + - `queue_increased`:`normal -> crowded` 或 `few -> crowded` + - `queue_decreased`:`normal -> few` 或 `crowded -> few` + - `queue_normalized`:`crowded -> normal` 或 `few -> normal` + - `unchanged`:窗口等级未变化 + - `initial`:首个统计窗口,没有上一个窗口可比较 + +## 配置方式 + +### store_dwell_alert + +配置文件示例:`managed/store_dwell_alert/config/config.example.yaml` + +```yaml +thresholds: + queue_time_threshold_seconds: 300 + crowded_count_threshold: 5 + normal_count_threshold: 2 + pause_timeout_seconds: 300 + alert_cooldown_seconds: 600 + +webhook: + url: "https://receiver.example.com/queue-report" + timeout_seconds: 5.0 +``` + +### people_flow_project + +配置文件示例:`managed/people_flow_project/config/config.example.yaml` + +```yaml +queue: + enabled: true + area: [0.0, 0.0, 1.0, 1.0] + area_mode: "normalized" + queue_time_threshold_seconds: 300 + crowded_count_threshold: 5 + normal_count_threshold: 2 + pause_timeout_seconds: 5 + source_id: "queue_cam_01" + +webhook: + url: "https://receiver.example.com/queue-report" + timeout_seconds: 5.0 + event_log_path: "outputs/rtsp_stream/webhook_events.jsonl" +``` + +说明: + +- `queue.area` 用于限定排队区域,默认 `[0, 0, 1, 1]` 表示全画面。 +- `queue.area_mode=normalized` 表示区域坐标按画面宽高归一化。 +- 两个工程都会先把 webhook 数据写入本地结果文件,再尝试 HTTP POST。 + +## 推送时机 + +- 两个工程都会在每个 30 分钟窗口结束时推送一次 `half_hour_report`。 +- `store_dwell_alert` 仍会继续生成本地事件日志;用于对接的窗口报文以本文档的 `half_hour_report` 结构为准。 + +## 公共字段 + +两个工程的 webhook 都包含以下公共字段: + +| 字段 | 类型 | 说明 | +| --------------- | ------ | ---------------------------------------------------------------------------------------------- | +| `event` | string | 固定为 `half_hour_report` | +| `project_type` | string | 工程类型,值为 `store_dwell_alert` 或 `people_flow_project` | +| `source_id` | string | 数据源标识。`store_dwell_alert` 使用 `camera_id`,`people_flow_project` 使用 `queue.source_id` | +| `window_start` | string | 窗口开始时间,ISO 8601 | +| `window_end` | string | 窗口结束时间,ISO 8601 | +| `queue_metrics` | object | 排队统计主体 | + +`queue_metrics` 结构: + +| 字段 | 类型 | 说明 | +| ------------------------------ | ----------- | ------------------------------------------------------------------------------------ | +| `queue_time_threshold_seconds` | integer | 长等待阈值,当前默认 300 秒 | +| `over_threshold_count` | integer | 窗口内累计排队时间大于等于阈值的人数 | +| `under_threshold_count` | integer | 窗口内累计排队时间小于阈值但大于 0 的人数 | +| `queue_level` | string | `few` / `normal` / `crowded` | +| `previous_queue_level` | string/null | 上一个窗口的等级 | +| `status_change` | string | `initial` / `unchanged` / `queue_increased` / `queue_decreased` / `queue_normalized` | +| `people` | array | 当前窗口内参与统计的人员列表 | + +`queue_metrics.people[]` 结构: + +| 字段 | 类型 | 说明 | +| --------------- | ------- | ------------------------------------- | +| `person_id` | string | 人员标识 | +| `queue_seconds` | integer | 该窗口内累计排队秒数 | +| `bucket` | string | `over_threshold` 或 `under_threshold` | + +## store_dwell_alert 完整报文 + +`store_dwell_alert` 会额外带上门店停留会话明细: + +```json +{ + "event": "half_hour_report", + "project_type": "store_dwell_alert", + "camera_id": "store_cam_01", + "source_id": "store_cam_01", + "window_start": "2026-05-08T09:00:00+08:00", + "window_end": "2026-05-08T09:30:00+08:00", + "active_customer_count": 3, + "active_customers": [ + { + "person_id": "cust_101", + "session_id": "cust_101-s1", + "role": "customer", + "status": "active", + "dwell_seconds": 820, + "window_queue_seconds": 820 + }, + { + "person_id": "cust_102", + "session_id": "cust_102-s1", + "role": "customer", + "status": "active", + "dwell_seconds": 460, + "window_queue_seconds": 460 + } + ], + "closed_customers": [ + { + "person_id": "cust_103", + "session_id": "cust_103-s1", + "final_dwell_seconds": 260, + "window_queue_seconds": 260 + } + ], + "staff_seen_count": 1, + "queue_metrics": { + "queue_time_threshold_seconds": 300, + "over_threshold_count": 6, + "under_threshold_count": 2, + "queue_level": "crowded", + "previous_queue_level": "normal", + "status_change": "queue_increased", + "people": [ + { + "person_id": "cust_101", + "queue_seconds": 820, + "bucket": "over_threshold" + }, + { + "person_id": "cust_102", + "queue_seconds": 460, + "bucket": "over_threshold" + }, + { + "person_id": "cust_103", + "queue_seconds": 260, + "bucket": "under_threshold" + } + ] + } +} +``` + +## people_flow_project 完整报文 + +`people_flow_project` 会额外带上过线统计和属性统计结果: + +```json +{ + "event": "half_hour_report", + "project_type": "people_flow_project", + "source_type": "rtsp", + "source": "rtsp://user:password@camera-ip:554/h264/ch1/main/av_stream", + "source_id": "queue_cam_01", + "window_index": 12, + "window_start": "2026-05-08T09:00:00+08:00", + "window_end": "2026-05-08T09:30:00+08:00", + "window_duration_seconds": 1800, + "config_path": "/opt/people_flow_project/config/local.yaml", + "line": { + "coordinates": [0.1, 0.55, 0.9, 0.55], + "mode": "normalized" + }, + "total_people": 7, + "age_counts": { + "minor": 1, + "adult": 5, + "senior": 1 + }, + "gender_counts": { + "male": 4, + "female": 3 + }, + "unknown_attributes": 2, + "tracks": [ + { + "track_id": 1, + "direction": "in", + "age": 26, + "age_bucket": "adult", + "gender": "male", + "samples_used": 3 + } + ], + "queue_metrics": { + "queue_time_threshold_seconds": 300, + "over_threshold_count": 6, + "under_threshold_count": 2, + "queue_level": "crowded", + "previous_queue_level": "normal", + "status_change": "queue_increased", + "people": [ + { + "person_id": "track_12", + "queue_seconds": 810, + "bucket": "over_threshold" + }, + { + "person_id": "track_21", + "queue_seconds": 180, + "bucket": "under_threshold" + } + ] + } +} +``` + +## 接收方建议 + +- 按 `event + project_type + source_id + window_end` 做幂等去重。 +- 业务判断优先读取 `queue_metrics.queue_level` 和 `queue_metrics.status_change`。 +- 如果只关心图片里的需求,最少只需要解析: + - `source_id` + - `window_start` + - `window_end` + - `queue_metrics.over_threshold_count` + - `queue_metrics.under_threshold_count` + - `queue_metrics.queue_level` + - `queue_metrics.status_change` diff --git a/managed/people_flow_project/config/config.example.yaml b/managed/people_flow_project/config/config.example.yaml index 7ae074c..ca27a73 100644 --- a/managed/people_flow_project/config/config.example.yaml +++ b/managed/people_flow_project/config/config.example.yaml @@ -39,3 +39,18 @@ rtsp: stream_open_timeout_seconds: 10.0 idle_sleep_seconds: 0.05 output_subdir: "rtsp_stream" + +queue: + enabled: true + area: [0.0, 0.0, 1.0, 1.0] + area_mode: "normalized" + queue_time_threshold_seconds: 300 + crowded_count_threshold: 5 + normal_count_threshold: 2 + pause_timeout_seconds: 5 + source_id: "people_flow_queue" + +webhook: + url: "" + timeout_seconds: 5.0 + event_log_path: "outputs/rtsp_stream/webhook_events.jsonl" diff --git a/managed/people_flow_project/configs/default_config.yaml b/managed/people_flow_project/configs/default_config.yaml index 47995c7..d2097cd 100644 --- a/managed/people_flow_project/configs/default_config.yaml +++ b/managed/people_flow_project/configs/default_config.yaml @@ -35,3 +35,18 @@ rtsp: stream_open_timeout_seconds: 10.0 idle_sleep_seconds: 0.05 output_subdir: "rtsp_stream" + +queue: + enabled: true + area: [0.0, 0.0, 1.0, 1.0] + area_mode: "normalized" + queue_time_threshold_seconds: 300 + crowded_count_threshold: 5 + normal_count_threshold: 2 + pause_timeout_seconds: 5 + source_id: "people_flow_queue" + +webhook: + url: "" + timeout_seconds: 5.0 + event_log_path: "outputs/rtsp_stream/webhook_events.jsonl" diff --git a/managed/people_flow_project/scripts/docker-entrypoint.sh b/managed/people_flow_project/scripts/docker-entrypoint.sh index e6f4463..eade454 100755 --- a/managed/people_flow_project/scripts/docker-entrypoint.sh +++ b/managed/people_flow_project/scripts/docker-entrypoint.sh @@ -37,4 +37,67 @@ config_path.write_text( ) PY -exec python main.py --config "${CONFIG_PATH}" manage-api --host "${API_HOST}" --port "${API_PORT}" +exec python - "$CONFIG_PATH" "$API_HOST" "$API_PORT" <<'PY' +import signal +import subprocess +import sys +import time + +config_path, api_host, api_port = sys.argv[1:4] +commands = [ + [sys.executable, "main.py", "--config", config_path, "rtsp"], + [ + sys.executable, + "main.py", + "--config", + config_path, + "manage-api", + "--host", + api_host, + "--port", + api_port, + ], +] +processes = [subprocess.Popen(command) for command in commands] + + +def terminate_all(signum, _frame): + for process in processes: + if process.poll() is None: + process.terminate() + deadline = time.time() + 10 + for process in processes: + if process.poll() is not None: + continue + timeout = max(0, deadline - time.time()) + try: + process.wait(timeout=timeout) + except subprocess.TimeoutExpired: + process.kill() + raise SystemExit(128 + signum) + + +for handled_signal in (signal.SIGINT, signal.SIGTERM): + signal.signal(handled_signal, terminate_all) + +while True: + for index, process in enumerate(processes): + return_code = process.poll() + if return_code is None: + continue + for other_index, other_process in enumerate(processes): + if other_index == index or other_process.poll() is not None: + continue + other_process.terminate() + deadline = time.time() + 10 + for other_index, other_process in enumerate(processes): + if other_index == index or other_process.poll() is not None: + continue + timeout = max(0, deadline - time.time()) + try: + other_process.wait(timeout=timeout) + except subprocess.TimeoutExpired: + other_process.kill() + raise SystemExit(return_code) + time.sleep(0.5) +PY diff --git a/managed/people_flow_project/src/people_flow/config.py b/managed/people_flow_project/src/people_flow/config.py index 2d458ab..76a609f 100644 --- a/managed/people_flow_project/src/people_flow/config.py +++ b/managed/people_flow_project/src/people_flow/config.py @@ -10,8 +10,10 @@ from .models import ( AttributeConfig, CountingConfig, OutputConfig, + QueueConfig, RtspConfig, RuntimeConfig, + WebhookConfig, YoloConfig, ) @@ -59,6 +61,8 @@ def load_config(config_path: Path) -> AppConfig: attributes=AttributeConfig(**data.get("attributes", {})), output=OutputConfig(**data.get("output", {})), rtsp=RtspConfig(**data.get("rtsp", {})), + queue=QueueConfig(**_normalize_queue_config(data.get("queue", {}))), + webhook=WebhookConfig(**data.get("webhook", {})), runtime=RuntimeConfig(**data.get("runtime", {})), config_path=config_path.resolve(), ) @@ -73,6 +77,14 @@ def _normalize_counting_config(data: dict) -> dict: return normalized +def _normalize_queue_config(data: dict) -> dict: + normalized = dict(data) + area = normalized.get("area") + if area is not None: + normalized["area"] = tuple(float(value) for value in area) + return normalized + + def parse_line_override(raw_line: str) -> tuple[float, float, float, float]: parts = [part.strip() for part in raw_line.split(",")] if len(parts) != 4: diff --git a/managed/people_flow_project/src/people_flow/manage_api.py b/managed/people_flow_project/src/people_flow/manage_api.py index e84af50..65a8d20 100644 --- a/managed/people_flow_project/src/people_flow/manage_api.py +++ b/managed/people_flow_project/src/people_flow/manage_api.py @@ -16,7 +16,6 @@ from .config import ( save_config_document, ) - PROJECT_TYPE = "people_flow_project" DEFAULT_MANAGE_PORT = 18082 MAX_PREVIEW_LINES = 2000 @@ -135,7 +134,12 @@ def parse_args() -> ArgumentParser: parser = ArgumentParser(description="People flow management API") parser.add_argument("--config", required=True, help="Path to YAML config file") parser.add_argument("--host", default="0.0.0.0", help="Host for the management API") - parser.add_argument("--port", type=int, default=DEFAULT_MANAGE_PORT, help="Port for the management API") + parser.add_argument( + "--port", + type=int, + default=DEFAULT_MANAGE_PORT, + help="Port for the management API", + ) return parser @@ -160,6 +164,19 @@ def _config_payload(ctx: ManageContext) -> dict: "output_subdir": config.rtsp.output_subdir, "window_seconds": config.rtsp.window_seconds, }, + "queue": { + "source_id": config.queue.source_id, + "queue_time_threshold_seconds": config.queue.queue_time_threshold_seconds, + "crowded_count_threshold": config.queue.crowded_count_threshold, + "normal_count_threshold": config.queue.normal_count_threshold, + }, + "webhook": { + "url": config.webhook.url, + "event_log_path": str( + resolve_project_path(ctx.project_root, config.webhook.event_log_path) + ), + "timeout_seconds": config.webhook.timeout_seconds, + }, } @@ -191,15 +208,33 @@ def _build_summary(ctx: ManageContext) -> dict: total_people = _int_value(payload.get("total_people")) window_end = _string_value(payload.get("window_end")) + queue_metrics = ( + payload.get("queue_metrics") + if isinstance(payload.get("queue_metrics"), dict) + else {} + ) return { "result_type": PROJECT_TYPE, - "headline": f"Latest window counted {total_people} people", + "headline": ( + "Latest report shows " + f"{_string_value(queue_metrics.get('queue_level')) or 'few'} queue, " + f"{_int_value(queue_metrics.get('over_threshold_count'))} over 5 min and " + f"{_int_value(queue_metrics.get('under_threshold_count'))} under 5 min" + ), "last_result_time": window_end, "metrics": { "summary_path": str(summary_path) if summary_path else "", "window_start": _string_value(payload.get("window_start")), "window_end": window_end, "total_people": total_people, + "queue_level": _string_value(queue_metrics.get("queue_level")), + "over_threshold_count": _int_value( + queue_metrics.get("over_threshold_count") + ), + "under_threshold_count": _int_value( + queue_metrics.get("under_threshold_count") + ), + "status_change": _string_value(queue_metrics.get("status_change")), "direction_counts": direction_counts, "age_counts": _map_string_int(payload.get("age_counts")), "gender_counts": _map_string_int(payload.get("gender_counts")), @@ -246,6 +281,14 @@ def _load_window_stats(ctx: ManageContext) -> list[dict]: "window_start": _string_value(payload.get("window_start")), "window_end": _string_value(payload.get("window_end")), "total_people": _int_value(payload.get("total_people")), + "queue_level": _queue_metric_value(payload, "queue_level"), + "over_threshold_count": _queue_metric_int( + payload, "over_threshold_count" + ), + "under_threshold_count": _queue_metric_int( + payload, "under_threshold_count" + ), + "status_change": _queue_metric_value(payload, "status_change"), "age_counts": _map_string_int(payload.get("age_counts")), "gender_counts": _map_string_int(payload.get("gender_counts")), "unknown_attributes": _int_value(payload.get("unknown_attributes")), @@ -259,6 +302,7 @@ def _list_result_files(ctx: ManageContext) -> list[dict]: files: list[dict] = [] for path, label in ( (_latest_json_path(ctx), "Latest Summary"), + (_webhook_log_path(ctx), "Webhook Event Log"), (_runtime_log_path(ctx), "Runtime Log"), ): if path.exists() and path.is_file(): @@ -305,6 +349,11 @@ def _runtime_log_path(ctx: ManageContext) -> Path: return _output_root(ctx) / "rtsp_run.log" +def _webhook_log_path(ctx: ManageContext) -> Path: + config = load_config(ctx.config_path) + return resolve_project_path(ctx.project_root, config.webhook.event_log_path) + + def _window_files(ctx: ManageContext) -> list[Path]: windows_dir = _windows_dir(ctx) if not windows_dir.exists(): @@ -385,5 +434,19 @@ def _map_string_int(value) -> dict[str, int]: return {str(key): _int_value(raw) for key, raw in value.items()} +def _queue_metric_value(payload: dict, field: str) -> str: + queue_metrics = payload.get("queue_metrics") + if not isinstance(queue_metrics, dict): + return "" + return _string_value(queue_metrics.get(field)) + + +def _queue_metric_int(payload: dict, field: str) -> int: + queue_metrics = payload.get("queue_metrics") + if not isinstance(queue_metrics, dict): + return 0 + return _int_value(queue_metrics.get(field)) + + if __name__ == "__main__": raise SystemExit(main()) diff --git a/managed/people_flow_project/src/people_flow/models.py b/managed/people_flow_project/src/people_flow/models.py index 6a92110..b3eee55 100644 --- a/managed/people_flow_project/src/people_flow/models.py +++ b/managed/people_flow_project/src/people_flow/models.py @@ -20,7 +20,9 @@ class CountingConfig: line_mode: str = "normalized" crossing_tolerance: float = 12.0 - def to_pixel_line(self, width: int, height: int) -> tuple[float, float, float, float]: + def to_pixel_line( + self, width: int, height: int + ) -> tuple[float, float, float, float]: x1, y1, x2, y2 = self.line if self.line_mode == "pixel": return x1, y1, x2, y2 @@ -58,6 +60,33 @@ class RtspConfig: output_subdir: str = "rtsp_stream" +@dataclass +class QueueConfig: + enabled: bool = True + area: tuple[float, float, float, float] = (0.0, 0.0, 1.0, 1.0) + area_mode: str = "normalized" + queue_time_threshold_seconds: int = 300 + crowded_count_threshold: int = 5 + normal_count_threshold: int = 2 + pause_timeout_seconds: int = 5 + source_id: str = "people_flow_queue" + + def to_pixel_area( + self, width: int, height: int + ) -> tuple[float, float, float, float]: + x1, y1, x2, y2 = self.area + if self.area_mode == "pixel": + return x1, y1, x2, y2 + return x1 * width, y1 * height, x2 * width, y2 * height + + +@dataclass +class WebhookConfig: + url: str = "" + timeout_seconds: float = 5.0 + event_log_path: str = "outputs/rtsp_stream/webhook_events.jsonl" + + @dataclass class RuntimeConfig: rtsp_url: str = "rtsp://user:password@camera-ip:554/h264/ch1/main/av_stream" @@ -71,6 +100,8 @@ class AppConfig: attributes: AttributeConfig = field(default_factory=AttributeConfig) output: OutputConfig = field(default_factory=OutputConfig) rtsp: RtspConfig = field(default_factory=RtspConfig) + queue: QueueConfig = field(default_factory=QueueConfig) + webhook: WebhookConfig = field(default_factory=WebhookConfig) runtime: RuntimeConfig = field(default_factory=RuntimeConfig) config_path: Path | None = None diff --git a/managed/people_flow_project/src/people_flow/pipeline.py b/managed/people_flow_project/src/people_flow/pipeline.py index ede6b3d..aacea72 100644 --- a/managed/people_flow_project/src/people_flow/pipeline.py +++ b/managed/people_flow_project/src/people_flow/pipeline.py @@ -20,8 +20,9 @@ from .io_utils import ( write_window_json, ) from .models import AppConfig +from .queue_analytics import QueueWindowTracker from .tracking import extract_person_tracks - +from .webhook import dispatch_json_event SUPPORTED_EXTENSIONS = {".mp4", ".mov", ".mkv", ".avi"} @@ -104,7 +105,9 @@ class PeopleFlowPipeline: writer = None if self.config.output.save_video: - writer = make_video_writer(video_output_path, width=width, height=height, fps=fps) + writer = make_video_writer( + video_output_path, width=width, height=height, fps=fps + ) counter = LineCrossCounter(pixel_line, self.config.counting) attributes = AttributeAggregator(self.config.attributes) @@ -118,7 +121,9 @@ class PeopleFlowPipeline: observations = self._track_frame(frame) for observation in observations: - attributes.maybe_collect(frame=frame, frame_index=frame_index, track=observation) + attributes.maybe_collect( + frame=frame, frame_index=frame_index, track=observation + ) counter.update(observations) @@ -154,7 +159,9 @@ class PeopleFlowPipeline: sample_interval = max(float(self.config.rtsp.sample_interval_seconds), 0.01) window_seconds = max(int(self.config.rtsp.window_seconds), 1) reconnect_delay = max(float(self.config.rtsp.reconnect_delay_seconds), 0.1) - open_timeout_seconds = max(float(self.config.rtsp.stream_open_timeout_seconds), 1.0) + open_timeout_seconds = max( + float(self.config.rtsp.stream_open_timeout_seconds), 1.0 + ) idle_sleep = max(float(self.config.rtsp.idle_sleep_seconds), 0.0) window_index = 0 @@ -168,7 +175,18 @@ class PeopleFlowPipeline: capture = None pixel_line = None counter = None + queue_tracker = None attributes = AttributeAggregator(self.config.attributes) + project_root = ( + self.config.config_path.parent.parent + if self.config.config_path is not None + else self.output_root + ) + webhook_event_log_path = ( + project_root / self.config.webhook.event_log_path + if not Path(self.config.webhook.event_log_path).is_absolute() + else Path(self.config.webhook.event_log_path) + ) try: while True: @@ -181,6 +199,7 @@ class PeopleFlowPipeline: window_end=window_end, counter=counter, attributes=attributes, + queue_tracker=queue_tracker, ) json_path = write_window_json( rtsp_paths["windows"], @@ -188,6 +207,12 @@ class PeopleFlowPipeline: payload, window_end, ) + dispatch_json_event( + webhook_event_log_path, + payload, + webhook_url=self.config.webhook.url, + timeout_seconds=self.config.webhook.timeout_seconds, + ) print(f"window_json={json_path}", flush=True) print(f"window_total_people={payload['total_people']}", flush=True) window_index += 1 @@ -195,6 +220,8 @@ class PeopleFlowPipeline: window_end = window_start + timedelta(seconds=window_seconds) if counter is not None: counter.reset() + if queue_tracker is not None: + queue_tracker.reset() attributes.reset() now = datetime.now().astimezone() @@ -213,8 +240,14 @@ class PeopleFlowPipeline: if pixel_line is None: height, width = frame.shape[:2] - pixel_line = self.config.counting.to_pixel_line(width=width, height=height) + pixel_line = self.config.counting.to_pixel_line( + width=width, height=height + ) counter = LineCrossCounter(pixel_line, self.config.counting) + queue_tracker = QueueWindowTracker( + self.config.queue, + self.config.queue.to_pixel_area(width=width, height=height), + ) current_time = time.monotonic() if current_time - last_processed_at < sample_interval: @@ -225,9 +258,13 @@ class PeopleFlowPipeline: last_processed_at = current_time observations = self._track_frame(frame) for observation in observations: - attributes.maybe_collect(frame=frame, frame_index=frame_index, track=observation) + attributes.maybe_collect( + frame=frame, frame_index=frame_index, track=observation + ) if counter is not None: counter.update(observations) + if queue_tracker is not None and self.config.queue.enabled: + queue_tracker.observe(observations, now) if current_time >= next_heartbeat_at: self._print_rtsp_heartbeat( process_started_at=process_started_at, @@ -283,7 +320,9 @@ class PeopleFlowPipeline: capture.release() return None - def _build_live_stats(self, counter: LineCrossCounter, attributes: AttributeAggregator) -> dict: + def _build_live_stats( + self, counter: LineCrossCounter, attributes: AttributeAggregator + ) -> dict: age_counts = {"minor": 0, "adult": 0, "senior": 0} gender_counts = {"male": 0, "female": 0} unknown_attributes = 0 @@ -313,7 +352,9 @@ class PeopleFlowPipeline: last_processed_wall_time: datetime | None, ) -> None: stats = self._build_live_stats(counter, attributes) - runtime_seconds = int((datetime.now().astimezone() - process_started_at).total_seconds()) + runtime_seconds = int( + (datetime.now().astimezone() - process_started_at).total_seconds() + ) last_processed = ( last_processed_wall_time.isoformat(timespec="seconds") if last_processed_wall_time is not None @@ -387,20 +428,41 @@ class PeopleFlowPipeline: window_end: datetime, counter: LineCrossCounter | None, attributes: AttributeAggregator, + queue_tracker: QueueWindowTracker | None, ) -> dict: - age_counts, gender_counts, unknown_attributes, track_summaries = self._collect_track_summaries( - counter, - attributes, + age_counts, gender_counts, unknown_attributes, track_summaries = ( + self._collect_track_summaries( + counter, + attributes, + ) ) total_people = 0 if counter is None else counter.total_people + queue_metrics = ( + queue_tracker.build_queue_metrics(window_start, window_end) + if queue_tracker is not None and self.config.queue.enabled + else { + "queue_time_threshold_seconds": self.config.queue.queue_time_threshold_seconds, + "over_threshold_count": 0, + "under_threshold_count": 0, + "queue_level": "few", + "previous_queue_level": None, + "status_change": "initial", + "people": [], + } + ) return { + "event": "half_hour_report", + "project_type": "people_flow_project", "source_type": "rtsp", "source": source, + "source_id": self.config.queue.source_id, "window_index": window_index, "window_start": window_start.isoformat(), "window_end": window_end.isoformat(), "window_duration_seconds": int((window_end - window_start).total_seconds()), - "config_path": str(self.config.config_path) if self.config.config_path else None, + "config_path": ( + str(self.config.config_path) if self.config.config_path else None + ), "line": { "coordinates": list(self.config.counting.line), "mode": self.config.counting.line_mode, @@ -410,6 +472,7 @@ class PeopleFlowPipeline: "gender_counts": gender_counts, "unknown_attributes": unknown_attributes, "tracks": track_summaries, + "queue_metrics": queue_metrics, } def _finalize_summary( @@ -419,15 +482,19 @@ class PeopleFlowPipeline: attributes: AttributeAggregator, json_path: Path, ) -> dict: - age_counts, gender_counts, unknown_attributes, track_summaries = self._collect_track_summaries( - counter, - attributes, + age_counts, gender_counts, unknown_attributes, track_summaries = ( + self._collect_track_summaries( + counter, + attributes, + ) ) payload = { "video_name": video_path.name, "video_path": str(video_path), - "config_path": str(self.config.config_path) if self.config.config_path else None, + "config_path": ( + str(self.config.config_path) if self.config.config_path else None + ), "line": { "coordinates": list(self.config.counting.line), "mode": self.config.counting.line_mode, diff --git a/managed/people_flow_project/src/people_flow/queue_analytics.py b/managed/people_flow_project/src/people_flow/queue_analytics.py new file mode 100644 index 0000000..ea735d5 --- /dev/null +++ b/managed/people_flow_project/src/people_flow/queue_analytics.py @@ -0,0 +1,201 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime + +from .models import QueueConfig, TrackObservation + + +@dataclass +class QueueTrackState: + track_id: int + entered_at: datetime + accumulated_queue_seconds: int = 0 + active_started_at: datetime | None = None + last_seen_at: datetime | None = None + pause_started_at: datetime | None = None + completed_periods: list[tuple[datetime, datetime]] = field(default_factory=list) + + def __post_init__(self) -> None: + if self.active_started_at is None: + self.active_started_at = self.entered_at + if self.last_seen_at is None: + self.last_seen_at = self.entered_at + + def mark_seen(self, when: datetime) -> None: + if self.active_started_at is None: + self.active_started_at = when + self.last_seen_at = when + self.pause_started_at = None + + def pause(self, when: datetime) -> None: + if self.active_started_at is None: + return + self.completed_periods.append((self.active_started_at, when)) + self.accumulated_queue_seconds += max( + 0, + int((when - self.active_started_at).total_seconds()), + ) + self.active_started_at = None + self.pause_started_at = when + self.last_seen_at = when + + def expire(self, when: datetime, pause_timeout_seconds: int) -> bool: + if self.pause_started_at is None: + return False + return ( + int((when - self.pause_started_at).total_seconds()) > pause_timeout_seconds + ) + + def window_queue_seconds(self, window_start: datetime, window_end: datetime) -> int: + total = 0 + for period_start, period_end in self.completed_periods: + total += _overlap_seconds( + period_start, period_end, window_start, window_end + ) + if self.active_started_at is not None: + current_end = self.last_seen_at or window_end + total += _overlap_seconds( + self.active_started_at, current_end, window_start, window_end + ) + return total + + +class QueueWindowTracker: + def __init__( + self, config: QueueConfig, pixel_area: tuple[float, float, float, float] + ) -> None: + self.config = config + self.pixel_area = pixel_area + self.states: dict[int, QueueTrackState] = {} + self.closed_states: list[QueueTrackState] = [] + self.last_queue_level: str | None = None + + def observe(self, observations: list[TrackObservation], when: datetime) -> None: + seen_ids: set[int] = set() + for observation in observations: + if not _point_in_area(observation.center, self.pixel_area): + continue + seen_ids.add(observation.track_id) + state = self.states.get(observation.track_id) + if state is None: + state = QueueTrackState(track_id=observation.track_id, entered_at=when) + self.states[observation.track_id] = state + state.mark_seen(when) + + for track_id, state in list(self.states.items()): + if track_id in seen_ids: + continue + if state.active_started_at is not None: + state.pause(when) + if state.expire(when, self.config.pause_timeout_seconds): + self.closed_states.append(state) + del self.states[track_id] + + def build_queue_metrics(self, window_start: datetime, window_end: datetime) -> dict: + totals: dict[int, int] = {} + for state in self.closed_states: + queue_seconds = state.window_queue_seconds(window_start, window_end) + if queue_seconds > 0: + totals[state.track_id] = totals.get(state.track_id, 0) + queue_seconds + for track_id, state in self.states.items(): + queue_seconds = state.window_queue_seconds(window_start, window_end) + if queue_seconds > 0: + totals[track_id] = queue_seconds + + over_threshold_count = sum( + 1 + for queue_seconds in totals.values() + if queue_seconds >= self.config.queue_time_threshold_seconds + ) + under_threshold_count = sum( + 1 + for queue_seconds in totals.values() + if 0 < queue_seconds < self.config.queue_time_threshold_seconds + ) + queue_level = _queue_level( + over_threshold_count, + crowded_count_threshold=self.config.crowded_count_threshold, + normal_count_threshold=self.config.normal_count_threshold, + ) + previous_queue_level = self.last_queue_level + status_change = _queue_status_change(previous_queue_level, queue_level) + self.last_queue_level = queue_level + return { + "queue_time_threshold_seconds": self.config.queue_time_threshold_seconds, + "over_threshold_count": over_threshold_count, + "under_threshold_count": under_threshold_count, + "queue_level": queue_level, + "previous_queue_level": previous_queue_level, + "status_change": status_change, + "people": [ + { + "person_id": f"track_{track_id}", + "queue_seconds": queue_seconds, + "bucket": ( + "over_threshold" + if queue_seconds >= self.config.queue_time_threshold_seconds + else "under_threshold" + ), + } + for track_id, queue_seconds in sorted( + totals.items(), key=lambda item: item[1], reverse=True + ) + ], + } + + def reset(self) -> None: + self.states.clear() + self.closed_states.clear() + + +def _point_in_area( + point: tuple[float, float], + area: tuple[float, float, float, float], +) -> bool: + px, py = point + x1, y1, x2, y2 = area + left = min(x1, x2) + right = max(x1, x2) + top = min(y1, y2) + bottom = max(y1, y2) + return left <= px <= right and top <= py <= bottom + + +def _overlap_seconds( + period_start: datetime, + period_end: datetime, + window_start: datetime, + window_end: datetime, +) -> int: + overlap_start = max(period_start, window_start) + overlap_end = min(period_end, window_end) + if overlap_end <= overlap_start: + return 0 + return int((overlap_end - overlap_start).total_seconds()) + + +def _queue_level( + over_threshold_count: int, + crowded_count_threshold: int, + normal_count_threshold: int, +) -> str: + if over_threshold_count > crowded_count_threshold: + return "crowded" + if over_threshold_count >= normal_count_threshold: + return "normal" + return "few" + + +def _queue_status_change(previous_level: str | None, current_level: str) -> str: + if previous_level is None: + return "initial" + if previous_level == current_level: + return "unchanged" + if current_level == "crowded" and previous_level in {"normal", "few"}: + return "queue_increased" + if current_level == "few" and previous_level in {"normal", "crowded"}: + return "queue_decreased" + if current_level == "normal" and previous_level in {"crowded", "few"}: + return "queue_normalized" + return "changed" diff --git a/managed/people_flow_project/src/people_flow/webhook.py b/managed/people_flow_project/src/people_flow/webhook.py new file mode 100644 index 0000000..7241df8 --- /dev/null +++ b/managed/people_flow_project/src/people_flow/webhook.py @@ -0,0 +1,29 @@ +from __future__ import annotations + +import json +from pathlib import Path +from urllib import request + + +def dispatch_json_event( + path: str | Path, + payload: dict, + webhook_url: str = "", + timeout_seconds: float = 5.0, +) -> None: + output_path = Path(path) + output_path.parent.mkdir(parents=True, exist_ok=True) + with output_path.open("a", encoding="utf-8") as handle: + handle.write(json.dumps(payload, ensure_ascii=False) + "\n") + + if not webhook_url.strip(): + return + + req = request.Request( + url=webhook_url, + data=json.dumps(payload).encode("utf-8"), + method="POST", + ) + req.add_header("Content-Type", "application/json") + with request.urlopen(req, timeout=timeout_seconds): + return diff --git a/managed/people_flow_project/tests/test_manage_api.py b/managed/people_flow_project/tests/test_manage_api.py index 621d940..50dbebb 100644 --- a/managed/people_flow_project/tests/test_manage_api.py +++ b/managed/people_flow_project/tests/test_manage_api.py @@ -16,7 +16,15 @@ def build_client(project_root: Path): " rtsp_url: rtsp://before-update\n" " output_dir: outputs\n" "rtsp:\n" - " output_subdir: rtsp_stream\n", + " output_subdir: rtsp_stream\n" + "queue:\n" + " source_id: queue_cam_01\n" + " queue_time_threshold_seconds: 300\n" + " crowded_count_threshold: 5\n" + " normal_count_threshold: 2\n" + "webhook:\n" + " url: https://example.test/webhook\n" + " event_log_path: outputs/rtsp_stream/webhook_events.jsonl\n", encoding="utf-8", ) @@ -24,13 +32,23 @@ def build_client(project_root: Path): windows_dir = rtsp_dir / "windows" windows_dir.mkdir(parents=True, exist_ok=True) latest_payload = { + "event": "half_hour_report", "source_type": "rtsp", + "source_id": "queue_cam_01", "window_start": "2026-04-16T09:30:00+08:00", "window_end": "2026-04-16T10:00:00+08:00", "total_people": 7, "age_counts": {"minor": 1, "adult": 5, "senior": 1}, "gender_counts": {"male": 4, "female": 3}, "unknown_attributes": 2, + "queue_metrics": { + "queue_time_threshold_seconds": 300, + "over_threshold_count": 6, + "under_threshold_count": 2, + "queue_level": "crowded", + "previous_queue_level": "normal", + "status_change": "queue_increased", + }, "tracks": [ {"track_id": 1, "direction": "in"}, {"track_id": 2, "direction": "out"}, @@ -47,6 +65,14 @@ def build_client(project_root: Path): "window_start": "2026-04-16T09:00:00+08:00", "window_end": "2026-04-16T09:30:00+08:00", "total_people": 5, + "queue_metrics": { + "queue_time_threshold_seconds": 300, + "over_threshold_count": 2, + "under_threshold_count": 1, + "queue_level": "normal", + "previous_queue_level": None, + "status_change": "initial", + }, "age_counts": {"minor": 0, "adult": 4, "senior": 1}, "gender_counts": {"male": 2, "female": 3}, "unknown_attributes": 1, @@ -58,7 +84,12 @@ def build_client(project_root: Path): json.dumps(latest_payload), encoding="utf-8", ) - (project_root / "outputs" / "rtsp_run.log").write_text("rtsp ok\n", encoding="utf-8") + (project_root / "outputs" / "rtsp_run.log").write_text( + "rtsp ok\n", encoding="utf-8" + ) + (rtsp_dir / "webhook_events.jsonl").write_text( + json.dumps(latest_payload) + "\n", encoding="utf-8" + ) app = create_app(config_path) app.testing = True @@ -85,6 +116,8 @@ def test_get_manage_config(tmp_path: Path): assert response.json["config_path"] == str(config_path) assert response.json["runtime"]["rtsp_url"] == "rtsp://before-update" assert response.json["rtsp"]["output_subdir"] == "rtsp_stream" + assert response.json["queue"]["source_id"] == "queue_cam_01" + assert response.json["webhook"]["url"] == "https://example.test/webhook" def test_put_manage_config_updates_rtsp_url(tmp_path: Path): @@ -111,8 +144,15 @@ def test_get_manage_summary(tmp_path: Path): assert response.json["result_type"] == "people_flow_project" assert response.json["last_result_time"] == "2026-04-16T10:00:00+08:00" assert response.json["metrics"]["total_people"] == 7 + assert response.json["metrics"]["queue_level"] == "crowded" + assert response.json["metrics"]["over_threshold_count"] == 6 + assert response.json["metrics"]["under_threshold_count"] == 2 + assert response.json["metrics"]["status_change"] == "queue_increased" assert response.json["metrics"]["direction_counts"] == {"in": 2, "out": 1} - assert response.json["metrics"]["recent_window_stats"][0]["window_end"] == "2026-04-16T10:00:00+08:00" + assert ( + response.json["metrics"]["recent_window_stats"][0]["window_end"] + == "2026-04-16T10:00:00+08:00" + ) def test_get_manage_windows(tmp_path: Path): @@ -126,6 +166,7 @@ def test_get_manage_windows(tmp_path: Path): assert response.json["page_size"] == 1 assert response.json["items"][0]["window_end"] == "2026-04-16T10:00:00+08:00" assert response.json["items"][0]["total_people"] == 7 + assert response.json["items"][0]["queue_level"] == "crowded" def test_get_manage_files(tmp_path: Path): @@ -137,6 +178,7 @@ def test_get_manage_files(tmp_path: Path): assert {item["path"] for item in response.json["files"]} == { "outputs/rtsp_run.log", "outputs/rtsp_stream/latest.json", + "outputs/rtsp_stream/webhook_events.jsonl", "outputs/rtsp_stream/windows/stats_2026-04-16_09-00-00.json", "outputs/rtsp_stream/windows/stats_2026-04-16_09-30-00.json", } diff --git a/managed/people_flow_project/tests/test_queue_analytics.py b/managed/people_flow_project/tests/test_queue_analytics.py new file mode 100644 index 0000000..f3c69c3 --- /dev/null +++ b/managed/people_flow_project/tests/test_queue_analytics.py @@ -0,0 +1,43 @@ +from datetime import datetime +from zoneinfo import ZoneInfo + +from src.people_flow.models import QueueConfig, TrackObservation +from src.people_flow.queue_analytics import QueueWindowTracker + +TZ = ZoneInfo("Asia/Shanghai") + + +def test_queue_window_tracker_builds_crowded_report(): + tracker = QueueWindowTracker( + QueueConfig( + queue_time_threshold_seconds=300, + crowded_count_threshold=5, + normal_count_threshold=2, + pause_timeout_seconds=5, + ), + pixel_area=(0, 0, 100, 100), + ) + start = datetime(2026, 4, 15, 11, 0, tzinfo=TZ) + crowded_tracks = [ + TrackObservation( + track_id=index, bbox=(0, 0, 10, 10), confidence=0.9, center=(10, 10) + ) + for index in range(1, 7) + ] + short_tracks = [ + TrackObservation( + track_id=index, bbox=(0, 0, 10, 10), confidence=0.9, center=(10, 10) + ) + for index in range(7, 9) + ] + + tracker.observe(crowded_tracks, start) + tracker.observe(crowded_tracks, start.replace(minute=6)) + tracker.observe(crowded_tracks + short_tracks, start.replace(minute=27)) + tracker.observe(short_tracks, start.replace(minute=30)) + + queue_metrics = tracker.build_queue_metrics(start, start.replace(minute=30)) + + assert queue_metrics["over_threshold_count"] == 6 + assert queue_metrics["under_threshold_count"] == 2 + assert queue_metrics["queue_level"] == "crowded" diff --git a/managed/store_dwell_alert/app/config.py b/managed/store_dwell_alert/app/config.py index 8318822..f6b683c 100644 --- a/managed/store_dwell_alert/app/config.py +++ b/managed/store_dwell_alert/app/config.py @@ -8,8 +8,9 @@ import yaml @dataclass(slots=True) class Thresholds: - min_people: int = 5 - min_dwell_seconds: int = 600 + queue_time_threshold_seconds: int = 300 + crowded_count_threshold: int = 5 + normal_count_threshold: int = 2 pause_timeout_seconds: int = 300 alert_cooldown_seconds: int = 600 @@ -30,6 +31,7 @@ class StaffConfig: @dataclass(slots=True) class WebhookConfig: + url: str = "" alert_url: str = "" report_url: str = "" timeout_seconds: float = 5.0 @@ -52,7 +54,16 @@ class AppConfig: def _load_section(raw: dict, key: str, cls): - return cls(**raw.get(key, {})) + payload = dict(raw.get(key, {})) + if cls is Thresholds: + if ( + "queue_time_threshold_seconds" not in payload + and "min_dwell_seconds" in payload + ): + payload["queue_time_threshold_seconds"] = payload["min_dwell_seconds"] + if "crowded_count_threshold" not in payload and "min_people" in payload: + payload["crowded_count_threshold"] = payload["min_people"] + return cls(**payload) def resolve_config_path(config_path: str | Path | None = None) -> Path: diff --git a/managed/store_dwell_alert/app/main.py b/managed/store_dwell_alert/app/main.py index de7dbf8..b376460 100644 --- a/managed/store_dwell_alert/app/main.py +++ b/managed/store_dwell_alert/app/main.py @@ -15,7 +15,7 @@ from app.config import ( from app.modules.detector_tracker import YOLOTrackerAdapter from app.modules.dwell_engine import DwellEngine from app.modules.identity_resolver import IdentityResolver -from app.modules.notifier import append_json_event +from app.modules.notifier import dispatch_json_event from app.modules.staff_filter import StaffMatcher, load_staff_gallery from app.modules.stream_reader import RTSPFrameReader @@ -46,12 +46,20 @@ def build_app(config_path: str | Path | None = None) -> dict: ), "dwell_engine": DwellEngine( camera_id=config.camera_id, - min_people=config.thresholds.min_people, - min_dwell_seconds=config.thresholds.min_dwell_seconds, + queue_time_threshold_seconds=config.thresholds.queue_time_threshold_seconds, + crowded_count_threshold=config.thresholds.crowded_count_threshold, + normal_count_threshold=config.thresholds.normal_count_threshold, pause_timeout_seconds=config.thresholds.pause_timeout_seconds, alert_cooldown_seconds=config.thresholds.alert_cooldown_seconds, ), - "notifier": append_json_event, + "notifier": lambda path, event: dispatch_json_event( + path, + event, + webhook_url=config.webhook.url + or config.webhook.report_url + or config.webhook.alert_url, + timeout_seconds=config.webhook.timeout_seconds, + ), "event_sink_path": event_sink_path, } @@ -102,14 +110,18 @@ def run_forever(app: dict, max_frames: int | None = None) -> int: def parse_args() -> ArgumentParser: parser = ArgumentParser(description="Store dwell alert service bootstrap") parser.add_argument("--config", help="Path to YAML config file") - parser.add_argument("--once", action="store_true", help="Read and process one frame") + parser.add_argument( + "--once", action="store_true", help="Read and process one frame" + ) parser.add_argument( "--manage-api", action="store_true", help="Start the management API instead of the RTSP worker loop", ) parser.add_argument("--host", default="0.0.0.0", help="Host for the management API") - parser.add_argument("--port", type=int, default=18081, help="Port for the management API") + parser.add_argument( + "--port", type=int, default=18081, help="Port for the management API" + ) parser.add_argument( "--max-frames", type=int, diff --git a/managed/store_dwell_alert/app/manage_api.py b/managed/store_dwell_alert/app/manage_api.py index 5baf543..d906eff 100644 --- a/managed/store_dwell_alert/app/manage_api.py +++ b/managed/store_dwell_alert/app/manage_api.py @@ -17,7 +17,6 @@ from app.config import ( save_config_document, ) - PROJECT_TYPE = "store_dwell_alert" DEFAULT_MANAGE_PORT = 18081 MAX_PREVIEW_LINES = 2000 @@ -136,7 +135,12 @@ def parse_args() -> ArgumentParser: parser = ArgumentParser(description="Store dwell alert management API") parser.add_argument("--config", help="Path to YAML config file") parser.add_argument("--host", default="0.0.0.0", help="Host for the management API") - parser.add_argument("--port", type=int, default=DEFAULT_MANAGE_PORT, help="Port for the management API") + parser.add_argument( + "--port", + type=int, + default=DEFAULT_MANAGE_PORT, + help="Port for the management API", + ) return parser @@ -160,9 +164,18 @@ def _config_payload(ctx: ManageContext) -> dict: "sample_fps": config.stream.sample_fps, "reconnect_backoff_seconds": config.stream.reconnect_backoff_seconds, }, + "thresholds": { + "queue_time_threshold_seconds": config.thresholds.queue_time_threshold_seconds, + "crowded_count_threshold": config.thresholds.crowded_count_threshold, + "normal_count_threshold": config.thresholds.normal_count_threshold, + }, "event_sink": { "path": str(event_sink_path), }, + "webhook": { + "url": config.webhook.url, + "timeout_seconds": config.webhook.timeout_seconds, + }, } @@ -179,11 +192,13 @@ def _build_summary(ctx: ManageContext) -> dict: }, } - alert_count = 0 - last_alert_time = "" last_report_time = "" active_count = 0 longest_dwell_seconds = 0 + queue_level = "" + over_threshold_count = 0 + under_threshold_count = 0 + status_change = "" window_stats: list[dict] = [] with events_path.open("r", encoding="utf-8") as handle: @@ -196,10 +211,7 @@ def _build_summary(ctx: ManageContext) -> dict: except json.JSONDecodeError: continue - if payload.get("event") == "long_stay_alert": - alert_count += 1 - last_alert_time = _string_value(payload.get("ts")) - elif payload.get("event") == "half_hour_report": + if payload.get("event") == "half_hour_report": last_report_time = _string_value(payload.get("window_end")) active_count = _int_value(payload.get("active_customer_count")) stat = _build_window_stat(payload) @@ -208,31 +220,36 @@ def _build_summary(ctx: ManageContext) -> dict: longest_dwell_seconds, stat["max_wait_seconds"], ) + queue_level = stat["queue_level"] + over_threshold_count = stat["over_threshold_count"] + under_threshold_count = stat["under_threshold_count"] + status_change = stat["status_change"] window_stats.sort( key=lambda item: _parse_timestamp(item["window_end"]), reverse=True, ) - headline = "No alerts or reports yet" + headline = "No reports yet" if last_report_time: headline = ( "Latest report shows " - f"{active_count} active customers, longest dwell {longest_dwell_seconds}s" + f"{queue_level or 'unknown'} queue, " + f"{over_threshold_count} over 5 min and {under_threshold_count} under 5 min" ) - elif alert_count > 0: - headline = f"Observed {alert_count} alert(s), latest alert at {last_alert_time}" return { "result_type": PROJECT_TYPE, "headline": headline, - "last_result_time": _latest_timestamp(last_alert_time, last_report_time), + "last_result_time": _latest_timestamp(last_report_time), "metrics": { - "alert_count": alert_count, - "last_alert_time": last_alert_time, "last_half_hour_report_time": last_report_time, "active_customer_count": active_count, "longest_dwell_seconds": longest_dwell_seconds, + "queue_level": queue_level, + "over_threshold_count": over_threshold_count, + "under_threshold_count": under_threshold_count, + "status_change": status_change, "events_path": str(events_path), "recent_window_stats": window_stats[:24], "all_window_stats": window_stats, @@ -260,7 +277,9 @@ def _list_result_files(ctx: ManageContext) -> list[dict]: "label": label, "kind": path.suffix.lstrip(".").lower(), "size": info.st_size, - "modified_at": datetime.fromtimestamp(info.st_mtime).astimezone().isoformat(), + "modified_at": datetime.fromtimestamp(info.st_mtime) + .astimezone() + .isoformat(), } ) @@ -279,12 +298,21 @@ def _build_window_stat(payload: dict) -> dict: payload.get("closed_customers"), "final_dwell_seconds", ) + queue_metrics = ( + payload.get("queue_metrics") + if isinstance(payload.get("queue_metrics"), dict) + else {} + ) return { "window_start": _string_value(payload.get("window_start")), "window_end": _string_value(payload.get("window_end")), "active_customer_count": _int_value(payload.get("active_customer_count")), "active_wait_seconds": active_wait_seconds, "closed_wait_seconds": closed_wait_seconds, + "queue_level": _string_value(queue_metrics.get("queue_level")), + "over_threshold_count": _int_value(queue_metrics.get("over_threshold_count")), + "under_threshold_count": _int_value(queue_metrics.get("under_threshold_count")), + "status_change": _string_value(queue_metrics.get("status_change")), "max_wait_seconds": max( max(active_wait_seconds, default=0), max(closed_wait_seconds, default=0), diff --git a/managed/store_dwell_alert/app/modules/dwell_engine.py b/managed/store_dwell_alert/app/modules/dwell_engine.py index 031f7a8..e42d811 100644 --- a/managed/store_dwell_alert/app/modules/dwell_engine.py +++ b/managed/store_dwell_alert/app/modules/dwell_engine.py @@ -1,6 +1,6 @@ from __future__ import annotations -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime from app.modules.reporter import floor_half_hour, previous_half_hour_window @@ -18,6 +18,7 @@ class DwellSession: last_seen_at: datetime | None = None pause_started_at: datetime | None = None closed_at: datetime | None = None + completed_periods: list[tuple[datetime, datetime]] = field(default_factory=list) def __post_init__(self) -> None: if self.active_started_at is None: @@ -46,6 +47,7 @@ class DwellSession: def pause(self, when: datetime) -> None: if self.state != "active" or self.active_started_at is None: return + self.completed_periods.append((self.active_started_at, when)) self.accumulated_dwell_seconds += max( 0, int((when - self.active_started_at).total_seconds()), @@ -73,19 +75,41 @@ class DwellSession: "dwell_seconds": self.dwell_seconds(when), } + def window_dwell_seconds( + self, + window_start: datetime, + window_end: datetime, + when: datetime | None = None, + ) -> int: + total = 0 + for period_start, period_end in self.completed_periods: + total += _overlap_seconds( + period_start, period_end, window_start, window_end + ) + + if self.state == "active" and self.active_started_at is not None: + current_end = when or self.last_seen_at or window_end + total += _overlap_seconds( + self.active_started_at, current_end, window_start, window_end + ) + + return total + class DwellEngine: def __init__( self, camera_id: str, - min_people: int, - min_dwell_seconds: int, + queue_time_threshold_seconds: int, + crowded_count_threshold: int, + normal_count_threshold: int, pause_timeout_seconds: int, alert_cooldown_seconds: int, ) -> None: self.camera_id = camera_id - self.min_people = min_people - self.min_dwell_seconds = min_dwell_seconds + self.queue_time_threshold_seconds = queue_time_threshold_seconds + self.crowded_count_threshold = crowded_count_threshold + self.normal_count_threshold = normal_count_threshold self.pause_timeout_seconds = pause_timeout_seconds self.alert_cooldown_seconds = alert_cooldown_seconds self.sessions: dict[str, DwellSession] = {} @@ -94,13 +118,16 @@ class DwellEngine: self.alert_rearmed = True self.last_alert_at: datetime | None = None self.last_report_boundary: datetime | None = None + self.last_queue_level: str | None = None def _next_session_id(self, person_id: str) -> str: next_index = self.session_counts.get(person_id, 0) + 1 self.session_counts[person_id] = next_index return f"{person_id}-s{next_index}" - def _create_session(self, person_id: str, role: str, when: datetime) -> DwellSession: + def _create_session( + self, person_id: str, role: str, when: datetime + ) -> DwellSession: session = DwellSession( person_id=person_id, session_id=self._next_session_id(person_id), @@ -110,7 +137,9 @@ class DwellEngine: self.sessions[person_id] = session return session - def process_observations(self, observations: list[dict], when: datetime) -> list[dict]: + def process_observations( + self, observations: list[dict], when: datetime + ) -> list[dict]: events: list[dict] = [] seen_people: set[str] = set() @@ -151,12 +180,12 @@ class DwellEngine: for session in self.sessions.values() if session.role == "customer" and session.state == "active" - and session.dwell_seconds(when) >= self.min_dwell_seconds + and session.dwell_seconds(when) >= self.queue_time_threshold_seconds ] def _build_alert_event(self, when: datetime) -> dict | None: long_stay_sessions = self._active_customer_sessions(when) - if len(long_stay_sessions) < self.min_people: + if len(long_stay_sessions) < self.crowded_count_threshold: self.alert_rearmed = True return None if not self.alert_rearmed: @@ -168,8 +197,8 @@ class DwellEngine: "camera_id": self.camera_id, "ts": when.isoformat(), "threshold": { - "min_people": self.min_people, - "min_dwell_seconds": self.min_dwell_seconds, + "min_people": self.crowded_count_threshold, + "min_dwell_seconds": self.queue_time_threshold_seconds, }, "active_long_stay_count": len(long_stay_sessions), "people": [ @@ -192,8 +221,15 @@ class DwellEngine: return None window_start, window_end = previous_half_hour_window(when) + queue_totals = self._queue_totals(window_start, window_end, when) + queue_metrics = self._build_queue_metrics(queue_totals) active_customers = [ - session.as_event_dict(when) + { + **session.as_event_dict(when), + "window_queue_seconds": session.window_dwell_seconds( + window_start, window_end, when + ), + } for session in self.sessions.values() if session.role == "customer" and session.state == "active" ] @@ -202,25 +238,147 @@ class DwellEngine: "person_id": session.person_id, "session_id": session.session_id, "final_dwell_seconds": session.dwell_seconds(window_end), + "window_queue_seconds": session.window_dwell_seconds( + window_start, window_end, window_end + ), } for session in self.closed_sessions if session.role == "customer" and session.closed_at is not None and window_start < session.closed_at <= window_end ] - staff_seen_count = sum(1 for session in self.sessions.values() if session.role == "staff") + staff_seen_count = sum( + 1 for session in self.sessions.values() if session.role == "staff" + ) self.last_report_boundary = boundary return { "event": "half_hour_report", + "project_type": "store_dwell_alert", "camera_id": self.camera_id, + "source_id": self.camera_id, "window_start": window_start.isoformat(), "window_end": window_end.isoformat(), "active_customer_count": len(active_customers), "active_customers": active_customers, "closed_customers": closed_customers, "staff_seen_count": staff_seen_count, + "queue_metrics": queue_metrics, } + def _queue_totals( + self, + window_start: datetime, + window_end: datetime, + when: datetime, + ) -> dict[str, int]: + totals: dict[str, int] = {} + for session in self.closed_sessions: + if session.role != "customer": + continue + window_seconds = session.window_dwell_seconds( + window_start, window_end, window_end + ) + if window_seconds > 0: + totals[session.person_id] = ( + totals.get(session.person_id, 0) + window_seconds + ) + + for session in self.sessions.values(): + if session.role != "customer": + continue + window_seconds = session.window_dwell_seconds( + window_start, window_end, when + ) + if window_seconds > 0: + totals[session.person_id] = ( + totals.get(session.person_id, 0) + window_seconds + ) + + return totals + + def _build_queue_metrics(self, queue_totals: dict[str, int]) -> dict: + over_threshold_count = sum( + 1 + for seconds in queue_totals.values() + if seconds >= self.queue_time_threshold_seconds + ) + under_threshold_count = sum( + 1 + for seconds in queue_totals.values() + if 0 < seconds < self.queue_time_threshold_seconds + ) + queue_level = _queue_level( + over_threshold_count, + crowded_count_threshold=self.crowded_count_threshold, + normal_count_threshold=self.normal_count_threshold, + ) + previous_queue_level = self.last_queue_level + status_change = _queue_status_change(previous_queue_level, queue_level) + self.last_queue_level = queue_level + return { + "queue_time_threshold_seconds": self.queue_time_threshold_seconds, + "over_threshold_count": over_threshold_count, + "under_threshold_count": under_threshold_count, + "queue_level": queue_level, + "previous_queue_level": previous_queue_level, + "status_change": status_change, + "people": [ + { + "person_id": person_id, + "queue_seconds": queue_seconds, + "bucket": ( + "over_threshold" + if queue_seconds >= self.queue_time_threshold_seconds + else "under_threshold" + ), + } + for person_id, queue_seconds in sorted( + queue_totals.items(), + key=lambda item: item[1], + reverse=True, + ) + ], + } + + +def _overlap_seconds( + period_start: datetime, + period_end: datetime, + window_start: datetime, + window_end: datetime, +) -> int: + overlap_start = max(period_start, window_start) + overlap_end = min(period_end, window_end) + if overlap_end <= overlap_start: + return 0 + return int((overlap_end - overlap_start).total_seconds()) + + +def _queue_level( + over_threshold_count: int, + crowded_count_threshold: int, + normal_count_threshold: int, +) -> str: + if over_threshold_count > crowded_count_threshold: + return "crowded" + if over_threshold_count >= normal_count_threshold: + return "normal" + return "few" + + +def _queue_status_change(previous_level: str | None, current_level: str) -> str: + if previous_level is None: + return "initial" + if previous_level == current_level: + return "unchanged" + if current_level == "crowded" and previous_level in {"normal", "few"}: + return "queue_increased" + if current_level == "few" and previous_level in {"normal", "crowded"}: + return "queue_decreased" + if current_level == "normal" and previous_level in {"crowded", "few"}: + return "queue_normalized" + return "changed" + def long_stay_count(sessions: list[dict], min_dwell_seconds: int) -> int: return sum( diff --git a/managed/store_dwell_alert/app/modules/notifier.py b/managed/store_dwell_alert/app/modules/notifier.py index 8966551..e22ec77 100644 --- a/managed/store_dwell_alert/app/modules/notifier.py +++ b/managed/store_dwell_alert/app/modules/notifier.py @@ -5,7 +5,9 @@ from pathlib import Path from urllib import request -def build_json_request(url: str, payload: dict, timeout_seconds: float = 5.0) -> request.Request: +def build_json_request( + url: str, payload: dict, timeout_seconds: float = 5.0 +) -> request.Request: data = json.dumps(payload).encode("utf-8") req = request.Request(url=url, data=data, method="POST") req.add_header("Content-Type", "application/json") @@ -18,3 +20,22 @@ def append_json_event(path: str | Path, payload: dict) -> None: output_path.parent.mkdir(parents=True, exist_ok=True) with output_path.open("a", encoding="utf-8") as handle: handle.write(json.dumps(payload, ensure_ascii=False) + "\n") + + +def post_json_event(url: str, payload: dict, timeout_seconds: float = 5.0) -> None: + if not url.strip(): + return + req = build_json_request(url, payload, timeout_seconds=timeout_seconds) + with request.urlopen(req, timeout=timeout_seconds): + return + + +def dispatch_json_event( + path: str | Path, + payload: dict, + webhook_url: str = "", + timeout_seconds: float = 5.0, +) -> None: + append_json_event(path, payload) + if webhook_url.strip(): + post_json_event(webhook_url, payload, timeout_seconds=timeout_seconds) diff --git a/managed/store_dwell_alert/config/config.example.yaml b/managed/store_dwell_alert/config/config.example.yaml index b723e57..e95b432 100644 --- a/managed/store_dwell_alert/config/config.example.yaml +++ b/managed/store_dwell_alert/config/config.example.yaml @@ -2,8 +2,9 @@ camera_id: store_cam_01 timezone: Asia/Shanghai thresholds: - min_people: 5 - min_dwell_seconds: 600 + queue_time_threshold_seconds: 300 + crowded_count_threshold: 5 + normal_count_threshold: 2 pause_timeout_seconds: 300 alert_cooldown_seconds: 600 @@ -21,6 +22,7 @@ event_sink: path: logs/events.jsonl webhook: + url: "" alert_url: "" report_url: "" timeout_seconds: 5.0 diff --git a/managed/store_dwell_alert/scripts/docker-entrypoint.sh b/managed/store_dwell_alert/scripts/docker-entrypoint.sh index 6e4a774..221b5a2 100755 --- a/managed/store_dwell_alert/scripts/docker-entrypoint.sh +++ b/managed/store_dwell_alert/scripts/docker-entrypoint.sh @@ -40,4 +40,67 @@ config_path.write_text( ) PY -exec python -m app.manage_api --config "${CONFIG_PATH}" --host "${API_HOST}" --port "${API_PORT}" +exec python - "$CONFIG_PATH" "$API_HOST" "$API_PORT" <<'PY' +import signal +import subprocess +import sys +import time + +config_path, api_host, api_port = sys.argv[1:4] +commands = [ + [sys.executable, "-m", "app.main", "--config", config_path], + [ + sys.executable, + "-m", + "app.manage_api", + "--config", + config_path, + "--host", + api_host, + "--port", + api_port, + ], +] +processes = [subprocess.Popen(command) for command in commands] + + +def terminate_all(signum, _frame): + for process in processes: + if process.poll() is None: + process.terminate() + deadline = time.time() + 10 + for process in processes: + if process.poll() is not None: + continue + timeout = max(0, deadline - time.time()) + try: + process.wait(timeout=timeout) + except subprocess.TimeoutExpired: + process.kill() + raise SystemExit(128 + signum) + + +for handled_signal in (signal.SIGINT, signal.SIGTERM): + signal.signal(handled_signal, terminate_all) + +while True: + for index, process in enumerate(processes): + return_code = process.poll() + if return_code is None: + continue + for other_index, other_process in enumerate(processes): + if other_index == index or other_process.poll() is not None: + continue + other_process.terminate() + deadline = time.time() + 10 + for other_index, other_process in enumerate(processes): + if other_index == index or other_process.poll() is not None: + continue + timeout = max(0, deadline - time.time()) + try: + other_process.wait(timeout=timeout) + except subprocess.TimeoutExpired: + other_process.kill() + raise SystemExit(return_code) + time.sleep(0.5) +PY diff --git a/managed/store_dwell_alert/tests/test_config.py b/managed/store_dwell_alert/tests/test_config.py index b7868b4..85c80df 100644 --- a/managed/store_dwell_alert/tests/test_config.py +++ b/managed/store_dwell_alert/tests/test_config.py @@ -8,16 +8,18 @@ def test_load_config_reads_thresholds(tmp_path: Path): cfg.write_text( "camera_id: store_cam_01\n" "thresholds:\n" - " min_people: 5\n" - " min_dwell_seconds: 600\n", + " queue_time_threshold_seconds: 300\n" + " crowded_count_threshold: 5\n" + " normal_count_threshold: 2\n", encoding="utf-8", ) data = load_config(cfg) assert data.camera_id == "store_cam_01" - assert data.thresholds.min_people == 5 - assert data.thresholds.min_dwell_seconds == 600 + assert data.thresholds.queue_time_threshold_seconds == 300 + assert data.thresholds.crowded_count_threshold == 5 + assert data.thresholds.normal_count_threshold == 2 def test_load_config_uses_defaults_for_optional_sections(tmp_path: Path): @@ -29,4 +31,5 @@ def test_load_config_uses_defaults_for_optional_sections(tmp_path: Path): assert data.stream.sample_fps == 2.0 assert data.staff.min_hits == 3 assert data.event_sink.path == "logs/events.jsonl" + assert data.webhook.url == "" assert data.webhook.timeout_seconds == 5.0 diff --git a/managed/store_dwell_alert/tests/test_dwell_engine.py b/managed/store_dwell_alert/tests/test_dwell_engine.py index 4448834..cd85fa5 100644 --- a/managed/store_dwell_alert/tests/test_dwell_engine.py +++ b/managed/store_dwell_alert/tests/test_dwell_engine.py @@ -1,8 +1,7 @@ from datetime import datetime from zoneinfo import ZoneInfo -from app.modules.dwell_engine import DwellEngine, DwellSession, long_stay_count - +from app.modules.dwell_engine import DwellEngine, DwellSession TZ = ZoneInfo("Asia/Shanghai") @@ -12,34 +11,81 @@ def test_session_pauses_without_adding_absence_time(): session = DwellSession(person_id="cust_1", session_id="cust_1-s1", entered_at=start) session.mark_seen(start.replace(minute=2)) session.pause(start.replace(minute=2, second=10)) - session.close_if_expired(start.replace(minute=7, second=11), pause_timeout_seconds=300) + session.close_if_expired( + start.replace(minute=7, second=11), pause_timeout_seconds=300 + ) assert session.state == "closed" assert session.dwell_seconds() == 130 -def test_engine_emits_alert_when_five_long_stays_are_active(): +def test_engine_emits_half_hour_report_with_queue_classification(): engine = DwellEngine( camera_id="store_cam_01", - min_people=5, - min_dwell_seconds=600, + queue_time_threshold_seconds=300, + crowded_count_threshold=5, + normal_count_threshold=2, pause_timeout_seconds=300, alert_cooldown_seconds=600, ) - now = datetime(2026, 4, 15, 11, 20, tzinfo=TZ) - observations = [{"person_id": f"cust_{idx}", "role": "customer"} for idx in range(5)] + start = datetime(2026, 4, 15, 11, 0, tzinfo=TZ) + crowded_group = [ + {"person_id": f"cust_{idx}", "role": "customer"} for idx in range(6) + ] + short_wait_group = [ + {"person_id": f"short_{idx}", "role": "customer"} for idx in range(2) + ] - engine.process_observations(observations, now.replace(minute=9, second=0)) - events = engine.process_observations(observations, now) + engine.process_observations(crowded_group, start.replace(minute=0, second=0)) + engine.process_observations(crowded_group, start.replace(minute=6, second=0)) + engine.process_observations( + crowded_group + short_wait_group, start.replace(minute=27, second=0) + ) + events = engine.process_observations( + short_wait_group, start.replace(minute=30, second=0) + ) - assert [event["event"] for event in events] == ["long_stay_alert"] - assert events[0]["active_long_stay_count"] == 5 + report = next(event for event in events if event["event"] == "half_hour_report") + assert report["queue_metrics"]["over_threshold_count"] == 6 + assert report["queue_metrics"]["under_threshold_count"] == 2 + assert report["queue_metrics"]["queue_level"] == "crowded" + assert report["queue_metrics"]["status_change"] == "initial" + + +def test_engine_tracks_queue_status_change_between_windows(): + engine = DwellEngine( + camera_id="store_cam_01", + queue_time_threshold_seconds=300, + crowded_count_threshold=5, + normal_count_threshold=2, + pause_timeout_seconds=300, + alert_cooldown_seconds=600, + ) + start = datetime(2026, 4, 15, 11, 0, tzinfo=TZ) + engine.process_observations( + [{"person_id": f"crowded_{idx}", "role": "customer"} for idx in range(6)], + start, + ) + engine.process_observations([], start.replace(minute=30)) + engine.process_observations( + [{"person_id": f"normal_{idx}", "role": "customer"} for idx in range(3)], + start.replace(minute=31), + ) + report_events = engine.process_observations([], start.replace(hour=12, minute=0)) + + report = next( + event for event in report_events if event["event"] == "half_hour_report" + ) + assert report["queue_metrics"]["queue_level"] == "normal" + assert report["queue_metrics"]["status_change"] == "queue_normalized" + assert report["queue_metrics"]["previous_queue_level"] == "crowded" def test_engine_emits_half_hour_report_with_closed_customers(): engine = DwellEngine( camera_id="store_cam_01", - min_people=5, - min_dwell_seconds=600, + queue_time_threshold_seconds=300, + crowded_count_threshold=5, + normal_count_threshold=2, pause_timeout_seconds=300, alert_cooldown_seconds=600, ) @@ -53,11 +99,5 @@ def test_engine_emits_half_hour_report_with_closed_customers(): report = next(event for event in events if event["event"] == "half_hour_report") assert report["window_end"] == "2026-04-15T11:30:00+08:00" assert report["closed_customers"][0]["person_id"] == "cust_1" - - -def test_long_stay_count_excludes_staff(): - sessions = [ - {"role": "customer", "state": "active", "dwell_seconds": 700}, - {"role": "staff", "state": "active", "dwell_seconds": 40000}, - ] - assert long_stay_count(sessions, min_dwell_seconds=600) == 1 + assert report["queue_metrics"]["over_threshold_count"] == 0 + assert report["queue_metrics"]["under_threshold_count"] == 1 diff --git a/managed/store_dwell_alert/tests/test_manage_api.py b/managed/store_dwell_alert/tests/test_manage_api.py index 810728f..b3dad88 100644 --- a/managed/store_dwell_alert/tests/test_manage_api.py +++ b/managed/store_dwell_alert/tests/test_manage_api.py @@ -14,6 +14,10 @@ def build_client(project_root: Path): config_path.write_text( "camera_id: store_cam_01\n" "timezone: Asia/Shanghai\n" + "thresholds:\n" + " queue_time_threshold_seconds: 300\n" + " crowded_count_threshold: 5\n" + " normal_count_threshold: 2\n" "stream:\n" " rtsp_url: rtsp://before-update\n" " sample_fps: 2.0\n" @@ -28,13 +32,6 @@ def build_client(project_root: Path): (logs_dir / "events.jsonl").write_text( "\n".join( [ - json.dumps( - { - "event": "long_stay_alert", - "camera_id": "store_cam_01", - "ts": "2026-04-16T09:00:00+08:00", - } - ), json.dumps( { "event": "half_hour_report", @@ -50,6 +47,14 @@ def build_client(project_root: Path): {"person_id": "cust_3", "final_dwell_seconds": 450} ], "staff_seen_count": 1, + "queue_metrics": { + "queue_time_threshold_seconds": 300, + "over_threshold_count": 2, + "under_threshold_count": 1, + "queue_level": "normal", + "previous_queue_level": null, + "status_change": "initial", + }, } ), json.dumps( @@ -67,6 +72,14 @@ def build_client(project_root: Path): {"person_id": "cust_6", "final_dwell_seconds": 120}, ], "staff_seen_count": 0, + "queue_metrics": { + "queue_time_threshold_seconds": 300, + "over_threshold_count": 6, + "under_threshold_count": 2, + "queue_level": "crowded", + "previous_queue_level": "normal", + "status_change": "queue_increased", + }, } ), ] @@ -126,10 +139,14 @@ def test_get_manage_summary(tmp_path: Path): assert response.status_code == 200 assert response.json["result_type"] == "store_dwell_alert" assert response.json["last_result_time"] == "2026-04-16T10:00:00+08:00" - assert response.json["metrics"]["alert_count"] == 1 - assert response.json["metrics"]["active_customer_count"] == 1 - assert response.json["metrics"]["longest_dwell_seconds"] == 900 - assert response.json["metrics"]["recent_window_stats"][0]["window_end"] == "2026-04-16T10:00:00+08:00" + assert response.json["metrics"]["queue_level"] == "crowded" + assert response.json["metrics"]["over_threshold_count"] == 6 + assert response.json["metrics"]["under_threshold_count"] == 2 + assert response.json["metrics"]["status_change"] == "queue_increased" + assert ( + response.json["metrics"]["recent_window_stats"][0]["window_end"] + == "2026-04-16T10:00:00+08:00" + ) def test_get_manage_windows(tmp_path: Path): diff --git a/web/src/views/ManagedServiceDetail.vue b/web/src/views/ManagedServiceDetail.vue index d3ef57a..e842cc5 100644 --- a/web/src/views/ManagedServiceDetail.vue +++ b/web/src/views/ManagedServiceDetail.vue @@ -5,15 +5,13 @@ 返回列表 -

{{ service?.display_name || route.params.id }}

+

+ {{ service?.display_name || route.params.id }} +

刷新 - + 重启容器 编辑 RTSP @@ -96,6 +94,36 @@

{{ service.summary.headline }}

+
+
+ 排队等级 +
+ + {{ queueLevelText(queueSummaryMetrics.queue_level) }} + +
+
+
+ 超 5 分钟人数 + + {{ queueSummaryMetrics.over_threshold_count ?? 0 }} + +
+
+ 低于 5 分钟人数 + + {{ queueSummaryMetrics.under_threshold_count ?? 0 }} + +
+
+ 状态变化 + + {{ queueChangeText(queueSummaryMetrics.status_change) }} + +
+
+ + + + + + + + + + + +