diff --git a/Dockerfile b/Dockerfile
index 3641d06..3b8513f 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,7 +1,8 @@
FROM swr.cn-north-4.myhuaweicloud.com/ddn-k8s/docker.io/library/golang:1.25.4-alpine AS builder
ENV TZ=Asia/Shanghai \
- GOPROXY=https://goproxy.cn,direct
+ GOPROXY=https://goproxy.cn,direct \
+ GOSUMDB=sum.golang.google.cn
RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories
RUN apk add --no-cache ca-certificates tzdata
diff --git a/deploy/managed-portal.10.8.0.11.env b/deploy/managed-portal.10.8.0.11.env
new file mode 100644
index 0000000..b680288
--- /dev/null
+++ b/deploy/managed-portal.10.8.0.11.env
@@ -0,0 +1,13 @@
+IMAGE_VERSION=dev
+TZ=Asia/Shanghai
+MANAGED_PORTAL_WEB_PORT=13000
+
+MANAGED_STORE_DWELL_CAMERA_ID=cam_192_168_3_3
+MANAGED_STORE_DWELL_RTSP_URL=rtsp://admin:Zxjp2026@192.168.3.3:554/h264/ch1/main/av_stream
+MANAGED_STORE_DWELL_EVENT_SINK_PATH=logs/events.jsonl
+MANAGED_STORE_DWELL_CONFIG_DIR=../managed/store_dwell_alert/config
+MANAGED_STORE_DWELL_DATA_DIR=../managed/store_dwell_alert/data
+
+MANAGED_PEOPLE_FLOW_RTSP_URL=rtsp://admin:Zxjp2026@192.168.3.3:554/h264/ch1/main/av_stream
+MANAGED_PEOPLE_FLOW_CONFIG_DIR=../managed/people_flow_project/config
+MANAGED_PEOPLE_FLOW_OUTPUT_DIR=../managed/people_flow_project/outputs
\ No newline at end of file
diff --git a/docs/managed-queue-webhook.md b/docs/managed-queue-webhook.md
new file mode 100644
index 0000000..1df8c88
--- /dev/null
+++ b/docs/managed-queue-webhook.md
@@ -0,0 +1,249 @@
+# Managed Queue Webhook 对接说明
+
+本文档说明 `managed/store_dwell_alert` 和 `managed/people_flow_project` 两个工程新增的排队统计与 webhook 推送结构,便于接收方按统一协议完成对接。
+
+## 业务规则
+
+- 统计窗口固定为每 30 分钟一个窗口。
+- 每个窗口统计两类人数:
+ - `over_threshold_count`:该窗口内累计排队时间大于等于 5 分钟的人数。
+ - `under_threshold_count`:该窗口内累计排队时间小于 5 分钟但大于 0 的人数。
+- 一阶段排队等级规则:
+ - `crowded`:`over_threshold_count > 5`
+ - `normal`:`2 <= over_threshold_count <= 5`
+ - `few`:`over_threshold_count < 2`
+- 状态变化规则:
+ - `queue_increased`:`normal -> crowded` 或 `few -> crowded`
+ - `queue_decreased`:`normal -> few` 或 `crowded -> few`
+ - `queue_normalized`:`crowded -> normal` 或 `few -> normal`
+ - `unchanged`:窗口等级未变化
+ - `initial`:首个统计窗口,没有上一个窗口可比较
+
+## 配置方式
+
+### store_dwell_alert
+
+配置文件示例:`managed/store_dwell_alert/config/config.example.yaml`
+
+```yaml
+thresholds:
+ queue_time_threshold_seconds: 300
+ crowded_count_threshold: 5
+ normal_count_threshold: 2
+ pause_timeout_seconds: 300
+ alert_cooldown_seconds: 600
+
+webhook:
+ url: "https://receiver.example.com/queue-report"
+ timeout_seconds: 5.0
+```
+
+### people_flow_project
+
+配置文件示例:`managed/people_flow_project/config/config.example.yaml`
+
+```yaml
+queue:
+ enabled: true
+ area: [0.0, 0.0, 1.0, 1.0]
+ area_mode: "normalized"
+ queue_time_threshold_seconds: 300
+ crowded_count_threshold: 5
+ normal_count_threshold: 2
+ pause_timeout_seconds: 5
+ source_id: "queue_cam_01"
+
+webhook:
+ url: "https://receiver.example.com/queue-report"
+ timeout_seconds: 5.0
+ event_log_path: "outputs/rtsp_stream/webhook_events.jsonl"
+```
+
+说明:
+
+- `queue.area` 用于限定排队区域,默认 `[0, 0, 1, 1]` 表示全画面。
+- `queue.area_mode=normalized` 表示区域坐标按画面宽高归一化。
+- 两个工程都会先把 webhook 数据写入本地结果文件,再尝试 HTTP POST。
+
+## 推送时机
+
+- 两个工程都会在每个 30 分钟窗口结束时推送一次 `half_hour_report`。
+- `store_dwell_alert` 仍会继续生成本地事件日志;用于对接的窗口报文以本文档的 `half_hour_report` 结构为准。
+
+## 公共字段
+
+两个工程的 webhook 都包含以下公共字段:
+
+| 字段 | 类型 | 说明 |
+| --------------- | ------ | ---------------------------------------------------------------------------------------------- |
+| `event` | string | 固定为 `half_hour_report` |
+| `project_type` | string | 工程类型,值为 `store_dwell_alert` 或 `people_flow_project` |
+| `source_id` | string | 数据源标识。`store_dwell_alert` 使用 `camera_id`,`people_flow_project` 使用 `queue.source_id` |
+| `window_start` | string | 窗口开始时间,ISO 8601 |
+| `window_end` | string | 窗口结束时间,ISO 8601 |
+| `queue_metrics` | object | 排队统计主体 |
+
+`queue_metrics` 结构:
+
+| 字段 | 类型 | 说明 |
+| ------------------------------ | ----------- | ------------------------------------------------------------------------------------ |
+| `queue_time_threshold_seconds` | integer | 长等待阈值,当前默认 300 秒 |
+| `over_threshold_count` | integer | 窗口内累计排队时间大于等于阈值的人数 |
+| `under_threshold_count` | integer | 窗口内累计排队时间小于阈值但大于 0 的人数 |
+| `queue_level` | string | `few` / `normal` / `crowded` |
+| `previous_queue_level` | string/null | 上一个窗口的等级 |
+| `status_change` | string | `initial` / `unchanged` / `queue_increased` / `queue_decreased` / `queue_normalized` |
+| `people` | array | 当前窗口内参与统计的人员列表 |
+
+`queue_metrics.people[]` 结构:
+
+| 字段 | 类型 | 说明 |
+| --------------- | ------- | ------------------------------------- |
+| `person_id` | string | 人员标识 |
+| `queue_seconds` | integer | 该窗口内累计排队秒数 |
+| `bucket` | string | `over_threshold` 或 `under_threshold` |
+
+## store_dwell_alert 完整报文
+
+`store_dwell_alert` 会额外带上门店停留会话明细:
+
+```json
+{
+ "event": "half_hour_report",
+ "project_type": "store_dwell_alert",
+ "camera_id": "store_cam_01",
+ "source_id": "store_cam_01",
+ "window_start": "2026-05-08T09:00:00+08:00",
+ "window_end": "2026-05-08T09:30:00+08:00",
+ "active_customer_count": 3,
+ "active_customers": [
+ {
+ "person_id": "cust_101",
+ "session_id": "cust_101-s1",
+ "role": "customer",
+ "status": "active",
+ "dwell_seconds": 820,
+ "window_queue_seconds": 820
+ },
+ {
+ "person_id": "cust_102",
+ "session_id": "cust_102-s1",
+ "role": "customer",
+ "status": "active",
+ "dwell_seconds": 460,
+ "window_queue_seconds": 460
+ }
+ ],
+ "closed_customers": [
+ {
+ "person_id": "cust_103",
+ "session_id": "cust_103-s1",
+ "final_dwell_seconds": 260,
+ "window_queue_seconds": 260
+ }
+ ],
+ "staff_seen_count": 1,
+ "queue_metrics": {
+ "queue_time_threshold_seconds": 300,
+ "over_threshold_count": 6,
+ "under_threshold_count": 2,
+ "queue_level": "crowded",
+ "previous_queue_level": "normal",
+ "status_change": "queue_increased",
+ "people": [
+ {
+ "person_id": "cust_101",
+ "queue_seconds": 820,
+ "bucket": "over_threshold"
+ },
+ {
+ "person_id": "cust_102",
+ "queue_seconds": 460,
+ "bucket": "over_threshold"
+ },
+ {
+ "person_id": "cust_103",
+ "queue_seconds": 260,
+ "bucket": "under_threshold"
+ }
+ ]
+ }
+}
+```
+
+## people_flow_project 完整报文
+
+`people_flow_project` 会额外带上过线统计和属性统计结果:
+
+```json
+{
+ "event": "half_hour_report",
+ "project_type": "people_flow_project",
+ "source_type": "rtsp",
+ "source": "rtsp://user:password@camera-ip:554/h264/ch1/main/av_stream",
+ "source_id": "queue_cam_01",
+ "window_index": 12,
+ "window_start": "2026-05-08T09:00:00+08:00",
+ "window_end": "2026-05-08T09:30:00+08:00",
+ "window_duration_seconds": 1800,
+ "config_path": "/opt/people_flow_project/config/local.yaml",
+ "line": {
+ "coordinates": [0.1, 0.55, 0.9, 0.55],
+ "mode": "normalized"
+ },
+ "total_people": 7,
+ "age_counts": {
+ "minor": 1,
+ "adult": 5,
+ "senior": 1
+ },
+ "gender_counts": {
+ "male": 4,
+ "female": 3
+ },
+ "unknown_attributes": 2,
+ "tracks": [
+ {
+ "track_id": 1,
+ "direction": "in",
+ "age": 26,
+ "age_bucket": "adult",
+ "gender": "male",
+ "samples_used": 3
+ }
+ ],
+ "queue_metrics": {
+ "queue_time_threshold_seconds": 300,
+ "over_threshold_count": 6,
+ "under_threshold_count": 2,
+ "queue_level": "crowded",
+ "previous_queue_level": "normal",
+ "status_change": "queue_increased",
+ "people": [
+ {
+ "person_id": "track_12",
+ "queue_seconds": 810,
+ "bucket": "over_threshold"
+ },
+ {
+ "person_id": "track_21",
+ "queue_seconds": 180,
+ "bucket": "under_threshold"
+ }
+ ]
+ }
+}
+```
+
+## 接收方建议
+
+- 按 `event + project_type + source_id + window_end` 做幂等去重。
+- 业务判断优先读取 `queue_metrics.queue_level` 和 `queue_metrics.status_change`。
+- 如果只关心图片里的需求,最少只需要解析:
+ - `source_id`
+ - `window_start`
+ - `window_end`
+ - `queue_metrics.over_threshold_count`
+ - `queue_metrics.under_threshold_count`
+ - `queue_metrics.queue_level`
+ - `queue_metrics.status_change`
diff --git a/managed/people_flow_project/config/config.example.yaml b/managed/people_flow_project/config/config.example.yaml
index 7ae074c..ca27a73 100644
--- a/managed/people_flow_project/config/config.example.yaml
+++ b/managed/people_flow_project/config/config.example.yaml
@@ -39,3 +39,18 @@ rtsp:
stream_open_timeout_seconds: 10.0
idle_sleep_seconds: 0.05
output_subdir: "rtsp_stream"
+
+queue:
+ enabled: true
+ area: [0.0, 0.0, 1.0, 1.0]
+ area_mode: "normalized"
+ queue_time_threshold_seconds: 300
+ crowded_count_threshold: 5
+ normal_count_threshold: 2
+ pause_timeout_seconds: 5
+ source_id: "people_flow_queue"
+
+webhook:
+ url: ""
+ timeout_seconds: 5.0
+ event_log_path: "outputs/rtsp_stream/webhook_events.jsonl"
diff --git a/managed/people_flow_project/configs/default_config.yaml b/managed/people_flow_project/configs/default_config.yaml
index 47995c7..d2097cd 100644
--- a/managed/people_flow_project/configs/default_config.yaml
+++ b/managed/people_flow_project/configs/default_config.yaml
@@ -35,3 +35,18 @@ rtsp:
stream_open_timeout_seconds: 10.0
idle_sleep_seconds: 0.05
output_subdir: "rtsp_stream"
+
+queue:
+ enabled: true
+ area: [0.0, 0.0, 1.0, 1.0]
+ area_mode: "normalized"
+ queue_time_threshold_seconds: 300
+ crowded_count_threshold: 5
+ normal_count_threshold: 2
+ pause_timeout_seconds: 5
+ source_id: "people_flow_queue"
+
+webhook:
+ url: ""
+ timeout_seconds: 5.0
+ event_log_path: "outputs/rtsp_stream/webhook_events.jsonl"
diff --git a/managed/people_flow_project/scripts/docker-entrypoint.sh b/managed/people_flow_project/scripts/docker-entrypoint.sh
index e6f4463..eade454 100755
--- a/managed/people_flow_project/scripts/docker-entrypoint.sh
+++ b/managed/people_flow_project/scripts/docker-entrypoint.sh
@@ -37,4 +37,67 @@ config_path.write_text(
)
PY
-exec python main.py --config "${CONFIG_PATH}" manage-api --host "${API_HOST}" --port "${API_PORT}"
+exec python - "$CONFIG_PATH" "$API_HOST" "$API_PORT" <<'PY'
+import signal
+import subprocess
+import sys
+import time
+
+config_path, api_host, api_port = sys.argv[1:4]
+commands = [
+ [sys.executable, "main.py", "--config", config_path, "rtsp"],
+ [
+ sys.executable,
+ "main.py",
+ "--config",
+ config_path,
+ "manage-api",
+ "--host",
+ api_host,
+ "--port",
+ api_port,
+ ],
+]
+processes = [subprocess.Popen(command) for command in commands]
+
+
+def terminate_all(signum, _frame):
+ for process in processes:
+ if process.poll() is None:
+ process.terminate()
+ deadline = time.time() + 10
+ for process in processes:
+ if process.poll() is not None:
+ continue
+ timeout = max(0, deadline - time.time())
+ try:
+ process.wait(timeout=timeout)
+ except subprocess.TimeoutExpired:
+ process.kill()
+ raise SystemExit(128 + signum)
+
+
+for handled_signal in (signal.SIGINT, signal.SIGTERM):
+ signal.signal(handled_signal, terminate_all)
+
+while True:
+ for index, process in enumerate(processes):
+ return_code = process.poll()
+ if return_code is None:
+ continue
+ for other_index, other_process in enumerate(processes):
+ if other_index == index or other_process.poll() is not None:
+ continue
+ other_process.terminate()
+ deadline = time.time() + 10
+ for other_index, other_process in enumerate(processes):
+ if other_index == index or other_process.poll() is not None:
+ continue
+ timeout = max(0, deadline - time.time())
+ try:
+ other_process.wait(timeout=timeout)
+ except subprocess.TimeoutExpired:
+ other_process.kill()
+ raise SystemExit(return_code)
+ time.sleep(0.5)
+PY
diff --git a/managed/people_flow_project/src/people_flow/config.py b/managed/people_flow_project/src/people_flow/config.py
index 2d458ab..76a609f 100644
--- a/managed/people_flow_project/src/people_flow/config.py
+++ b/managed/people_flow_project/src/people_flow/config.py
@@ -10,8 +10,10 @@ from .models import (
AttributeConfig,
CountingConfig,
OutputConfig,
+ QueueConfig,
RtspConfig,
RuntimeConfig,
+ WebhookConfig,
YoloConfig,
)
@@ -59,6 +61,8 @@ def load_config(config_path: Path) -> AppConfig:
attributes=AttributeConfig(**data.get("attributes", {})),
output=OutputConfig(**data.get("output", {})),
rtsp=RtspConfig(**data.get("rtsp", {})),
+ queue=QueueConfig(**_normalize_queue_config(data.get("queue", {}))),
+ webhook=WebhookConfig(**data.get("webhook", {})),
runtime=RuntimeConfig(**data.get("runtime", {})),
config_path=config_path.resolve(),
)
@@ -73,6 +77,14 @@ def _normalize_counting_config(data: dict) -> dict:
return normalized
+def _normalize_queue_config(data: dict) -> dict:
+ normalized = dict(data)
+ area = normalized.get("area")
+ if area is not None:
+ normalized["area"] = tuple(float(value) for value in area)
+ return normalized
+
+
def parse_line_override(raw_line: str) -> tuple[float, float, float, float]:
parts = [part.strip() for part in raw_line.split(",")]
if len(parts) != 4:
diff --git a/managed/people_flow_project/src/people_flow/manage_api.py b/managed/people_flow_project/src/people_flow/manage_api.py
index e84af50..65a8d20 100644
--- a/managed/people_flow_project/src/people_flow/manage_api.py
+++ b/managed/people_flow_project/src/people_flow/manage_api.py
@@ -16,7 +16,6 @@ from .config import (
save_config_document,
)
-
PROJECT_TYPE = "people_flow_project"
DEFAULT_MANAGE_PORT = 18082
MAX_PREVIEW_LINES = 2000
@@ -135,7 +134,12 @@ def parse_args() -> ArgumentParser:
parser = ArgumentParser(description="People flow management API")
parser.add_argument("--config", required=True, help="Path to YAML config file")
parser.add_argument("--host", default="0.0.0.0", help="Host for the management API")
- parser.add_argument("--port", type=int, default=DEFAULT_MANAGE_PORT, help="Port for the management API")
+ parser.add_argument(
+ "--port",
+ type=int,
+ default=DEFAULT_MANAGE_PORT,
+ help="Port for the management API",
+ )
return parser
@@ -160,6 +164,19 @@ def _config_payload(ctx: ManageContext) -> dict:
"output_subdir": config.rtsp.output_subdir,
"window_seconds": config.rtsp.window_seconds,
},
+ "queue": {
+ "source_id": config.queue.source_id,
+ "queue_time_threshold_seconds": config.queue.queue_time_threshold_seconds,
+ "crowded_count_threshold": config.queue.crowded_count_threshold,
+ "normal_count_threshold": config.queue.normal_count_threshold,
+ },
+ "webhook": {
+ "url": config.webhook.url,
+ "event_log_path": str(
+ resolve_project_path(ctx.project_root, config.webhook.event_log_path)
+ ),
+ "timeout_seconds": config.webhook.timeout_seconds,
+ },
}
@@ -191,15 +208,33 @@ def _build_summary(ctx: ManageContext) -> dict:
total_people = _int_value(payload.get("total_people"))
window_end = _string_value(payload.get("window_end"))
+ queue_metrics = (
+ payload.get("queue_metrics")
+ if isinstance(payload.get("queue_metrics"), dict)
+ else {}
+ )
return {
"result_type": PROJECT_TYPE,
- "headline": f"Latest window counted {total_people} people",
+ "headline": (
+ "Latest report shows "
+ f"{_string_value(queue_metrics.get('queue_level')) or 'few'} queue, "
+ f"{_int_value(queue_metrics.get('over_threshold_count'))} over 5 min and "
+ f"{_int_value(queue_metrics.get('under_threshold_count'))} under 5 min"
+ ),
"last_result_time": window_end,
"metrics": {
"summary_path": str(summary_path) if summary_path else "",
"window_start": _string_value(payload.get("window_start")),
"window_end": window_end,
"total_people": total_people,
+ "queue_level": _string_value(queue_metrics.get("queue_level")),
+ "over_threshold_count": _int_value(
+ queue_metrics.get("over_threshold_count")
+ ),
+ "under_threshold_count": _int_value(
+ queue_metrics.get("under_threshold_count")
+ ),
+ "status_change": _string_value(queue_metrics.get("status_change")),
"direction_counts": direction_counts,
"age_counts": _map_string_int(payload.get("age_counts")),
"gender_counts": _map_string_int(payload.get("gender_counts")),
@@ -246,6 +281,14 @@ def _load_window_stats(ctx: ManageContext) -> list[dict]:
"window_start": _string_value(payload.get("window_start")),
"window_end": _string_value(payload.get("window_end")),
"total_people": _int_value(payload.get("total_people")),
+ "queue_level": _queue_metric_value(payload, "queue_level"),
+ "over_threshold_count": _queue_metric_int(
+ payload, "over_threshold_count"
+ ),
+ "under_threshold_count": _queue_metric_int(
+ payload, "under_threshold_count"
+ ),
+ "status_change": _queue_metric_value(payload, "status_change"),
"age_counts": _map_string_int(payload.get("age_counts")),
"gender_counts": _map_string_int(payload.get("gender_counts")),
"unknown_attributes": _int_value(payload.get("unknown_attributes")),
@@ -259,6 +302,7 @@ def _list_result_files(ctx: ManageContext) -> list[dict]:
files: list[dict] = []
for path, label in (
(_latest_json_path(ctx), "Latest Summary"),
+ (_webhook_log_path(ctx), "Webhook Event Log"),
(_runtime_log_path(ctx), "Runtime Log"),
):
if path.exists() and path.is_file():
@@ -305,6 +349,11 @@ def _runtime_log_path(ctx: ManageContext) -> Path:
return _output_root(ctx) / "rtsp_run.log"
+def _webhook_log_path(ctx: ManageContext) -> Path:
+ config = load_config(ctx.config_path)
+ return resolve_project_path(ctx.project_root, config.webhook.event_log_path)
+
+
def _window_files(ctx: ManageContext) -> list[Path]:
windows_dir = _windows_dir(ctx)
if not windows_dir.exists():
@@ -385,5 +434,19 @@ def _map_string_int(value) -> dict[str, int]:
return {str(key): _int_value(raw) for key, raw in value.items()}
+def _queue_metric_value(payload: dict, field: str) -> str:
+ queue_metrics = payload.get("queue_metrics")
+ if not isinstance(queue_metrics, dict):
+ return ""
+ return _string_value(queue_metrics.get(field))
+
+
+def _queue_metric_int(payload: dict, field: str) -> int:
+ queue_metrics = payload.get("queue_metrics")
+ if not isinstance(queue_metrics, dict):
+ return 0
+ return _int_value(queue_metrics.get(field))
+
+
if __name__ == "__main__":
raise SystemExit(main())
diff --git a/managed/people_flow_project/src/people_flow/models.py b/managed/people_flow_project/src/people_flow/models.py
index 6a92110..b3eee55 100644
--- a/managed/people_flow_project/src/people_flow/models.py
+++ b/managed/people_flow_project/src/people_flow/models.py
@@ -20,7 +20,9 @@ class CountingConfig:
line_mode: str = "normalized"
crossing_tolerance: float = 12.0
- def to_pixel_line(self, width: int, height: int) -> tuple[float, float, float, float]:
+ def to_pixel_line(
+ self, width: int, height: int
+ ) -> tuple[float, float, float, float]:
x1, y1, x2, y2 = self.line
if self.line_mode == "pixel":
return x1, y1, x2, y2
@@ -58,6 +60,33 @@ class RtspConfig:
output_subdir: str = "rtsp_stream"
+@dataclass
+class QueueConfig:
+ enabled: bool = True
+ area: tuple[float, float, float, float] = (0.0, 0.0, 1.0, 1.0)
+ area_mode: str = "normalized"
+ queue_time_threshold_seconds: int = 300
+ crowded_count_threshold: int = 5
+ normal_count_threshold: int = 2
+ pause_timeout_seconds: int = 5
+ source_id: str = "people_flow_queue"
+
+ def to_pixel_area(
+ self, width: int, height: int
+ ) -> tuple[float, float, float, float]:
+ x1, y1, x2, y2 = self.area
+ if self.area_mode == "pixel":
+ return x1, y1, x2, y2
+ return x1 * width, y1 * height, x2 * width, y2 * height
+
+
+@dataclass
+class WebhookConfig:
+ url: str = ""
+ timeout_seconds: float = 5.0
+ event_log_path: str = "outputs/rtsp_stream/webhook_events.jsonl"
+
+
@dataclass
class RuntimeConfig:
rtsp_url: str = "rtsp://user:password@camera-ip:554/h264/ch1/main/av_stream"
@@ -71,6 +100,8 @@ class AppConfig:
attributes: AttributeConfig = field(default_factory=AttributeConfig)
output: OutputConfig = field(default_factory=OutputConfig)
rtsp: RtspConfig = field(default_factory=RtspConfig)
+ queue: QueueConfig = field(default_factory=QueueConfig)
+ webhook: WebhookConfig = field(default_factory=WebhookConfig)
runtime: RuntimeConfig = field(default_factory=RuntimeConfig)
config_path: Path | None = None
diff --git a/managed/people_flow_project/src/people_flow/pipeline.py b/managed/people_flow_project/src/people_flow/pipeline.py
index ede6b3d..aacea72 100644
--- a/managed/people_flow_project/src/people_flow/pipeline.py
+++ b/managed/people_flow_project/src/people_flow/pipeline.py
@@ -20,8 +20,9 @@ from .io_utils import (
write_window_json,
)
from .models import AppConfig
+from .queue_analytics import QueueWindowTracker
from .tracking import extract_person_tracks
-
+from .webhook import dispatch_json_event
SUPPORTED_EXTENSIONS = {".mp4", ".mov", ".mkv", ".avi"}
@@ -104,7 +105,9 @@ class PeopleFlowPipeline:
writer = None
if self.config.output.save_video:
- writer = make_video_writer(video_output_path, width=width, height=height, fps=fps)
+ writer = make_video_writer(
+ video_output_path, width=width, height=height, fps=fps
+ )
counter = LineCrossCounter(pixel_line, self.config.counting)
attributes = AttributeAggregator(self.config.attributes)
@@ -118,7 +121,9 @@ class PeopleFlowPipeline:
observations = self._track_frame(frame)
for observation in observations:
- attributes.maybe_collect(frame=frame, frame_index=frame_index, track=observation)
+ attributes.maybe_collect(
+ frame=frame, frame_index=frame_index, track=observation
+ )
counter.update(observations)
@@ -154,7 +159,9 @@ class PeopleFlowPipeline:
sample_interval = max(float(self.config.rtsp.sample_interval_seconds), 0.01)
window_seconds = max(int(self.config.rtsp.window_seconds), 1)
reconnect_delay = max(float(self.config.rtsp.reconnect_delay_seconds), 0.1)
- open_timeout_seconds = max(float(self.config.rtsp.stream_open_timeout_seconds), 1.0)
+ open_timeout_seconds = max(
+ float(self.config.rtsp.stream_open_timeout_seconds), 1.0
+ )
idle_sleep = max(float(self.config.rtsp.idle_sleep_seconds), 0.0)
window_index = 0
@@ -168,7 +175,18 @@ class PeopleFlowPipeline:
capture = None
pixel_line = None
counter = None
+ queue_tracker = None
attributes = AttributeAggregator(self.config.attributes)
+ project_root = (
+ self.config.config_path.parent.parent
+ if self.config.config_path is not None
+ else self.output_root
+ )
+ webhook_event_log_path = (
+ project_root / self.config.webhook.event_log_path
+ if not Path(self.config.webhook.event_log_path).is_absolute()
+ else Path(self.config.webhook.event_log_path)
+ )
try:
while True:
@@ -181,6 +199,7 @@ class PeopleFlowPipeline:
window_end=window_end,
counter=counter,
attributes=attributes,
+ queue_tracker=queue_tracker,
)
json_path = write_window_json(
rtsp_paths["windows"],
@@ -188,6 +207,12 @@ class PeopleFlowPipeline:
payload,
window_end,
)
+ dispatch_json_event(
+ webhook_event_log_path,
+ payload,
+ webhook_url=self.config.webhook.url,
+ timeout_seconds=self.config.webhook.timeout_seconds,
+ )
print(f"window_json={json_path}", flush=True)
print(f"window_total_people={payload['total_people']}", flush=True)
window_index += 1
@@ -195,6 +220,8 @@ class PeopleFlowPipeline:
window_end = window_start + timedelta(seconds=window_seconds)
if counter is not None:
counter.reset()
+ if queue_tracker is not None:
+ queue_tracker.reset()
attributes.reset()
now = datetime.now().astimezone()
@@ -213,8 +240,14 @@ class PeopleFlowPipeline:
if pixel_line is None:
height, width = frame.shape[:2]
- pixel_line = self.config.counting.to_pixel_line(width=width, height=height)
+ pixel_line = self.config.counting.to_pixel_line(
+ width=width, height=height
+ )
counter = LineCrossCounter(pixel_line, self.config.counting)
+ queue_tracker = QueueWindowTracker(
+ self.config.queue,
+ self.config.queue.to_pixel_area(width=width, height=height),
+ )
current_time = time.monotonic()
if current_time - last_processed_at < sample_interval:
@@ -225,9 +258,13 @@ class PeopleFlowPipeline:
last_processed_at = current_time
observations = self._track_frame(frame)
for observation in observations:
- attributes.maybe_collect(frame=frame, frame_index=frame_index, track=observation)
+ attributes.maybe_collect(
+ frame=frame, frame_index=frame_index, track=observation
+ )
if counter is not None:
counter.update(observations)
+ if queue_tracker is not None and self.config.queue.enabled:
+ queue_tracker.observe(observations, now)
if current_time >= next_heartbeat_at:
self._print_rtsp_heartbeat(
process_started_at=process_started_at,
@@ -283,7 +320,9 @@ class PeopleFlowPipeline:
capture.release()
return None
- def _build_live_stats(self, counter: LineCrossCounter, attributes: AttributeAggregator) -> dict:
+ def _build_live_stats(
+ self, counter: LineCrossCounter, attributes: AttributeAggregator
+ ) -> dict:
age_counts = {"minor": 0, "adult": 0, "senior": 0}
gender_counts = {"male": 0, "female": 0}
unknown_attributes = 0
@@ -313,7 +352,9 @@ class PeopleFlowPipeline:
last_processed_wall_time: datetime | None,
) -> None:
stats = self._build_live_stats(counter, attributes)
- runtime_seconds = int((datetime.now().astimezone() - process_started_at).total_seconds())
+ runtime_seconds = int(
+ (datetime.now().astimezone() - process_started_at).total_seconds()
+ )
last_processed = (
last_processed_wall_time.isoformat(timespec="seconds")
if last_processed_wall_time is not None
@@ -387,20 +428,41 @@ class PeopleFlowPipeline:
window_end: datetime,
counter: LineCrossCounter | None,
attributes: AttributeAggregator,
+ queue_tracker: QueueWindowTracker | None,
) -> dict:
- age_counts, gender_counts, unknown_attributes, track_summaries = self._collect_track_summaries(
- counter,
- attributes,
+ age_counts, gender_counts, unknown_attributes, track_summaries = (
+ self._collect_track_summaries(
+ counter,
+ attributes,
+ )
)
total_people = 0 if counter is None else counter.total_people
+ queue_metrics = (
+ queue_tracker.build_queue_metrics(window_start, window_end)
+ if queue_tracker is not None and self.config.queue.enabled
+ else {
+ "queue_time_threshold_seconds": self.config.queue.queue_time_threshold_seconds,
+ "over_threshold_count": 0,
+ "under_threshold_count": 0,
+ "queue_level": "few",
+ "previous_queue_level": None,
+ "status_change": "initial",
+ "people": [],
+ }
+ )
return {
+ "event": "half_hour_report",
+ "project_type": "people_flow_project",
"source_type": "rtsp",
"source": source,
+ "source_id": self.config.queue.source_id,
"window_index": window_index,
"window_start": window_start.isoformat(),
"window_end": window_end.isoformat(),
"window_duration_seconds": int((window_end - window_start).total_seconds()),
- "config_path": str(self.config.config_path) if self.config.config_path else None,
+ "config_path": (
+ str(self.config.config_path) if self.config.config_path else None
+ ),
"line": {
"coordinates": list(self.config.counting.line),
"mode": self.config.counting.line_mode,
@@ -410,6 +472,7 @@ class PeopleFlowPipeline:
"gender_counts": gender_counts,
"unknown_attributes": unknown_attributes,
"tracks": track_summaries,
+ "queue_metrics": queue_metrics,
}
def _finalize_summary(
@@ -419,15 +482,19 @@ class PeopleFlowPipeline:
attributes: AttributeAggregator,
json_path: Path,
) -> dict:
- age_counts, gender_counts, unknown_attributes, track_summaries = self._collect_track_summaries(
- counter,
- attributes,
+ age_counts, gender_counts, unknown_attributes, track_summaries = (
+ self._collect_track_summaries(
+ counter,
+ attributes,
+ )
)
payload = {
"video_name": video_path.name,
"video_path": str(video_path),
- "config_path": str(self.config.config_path) if self.config.config_path else None,
+ "config_path": (
+ str(self.config.config_path) if self.config.config_path else None
+ ),
"line": {
"coordinates": list(self.config.counting.line),
"mode": self.config.counting.line_mode,
diff --git a/managed/people_flow_project/src/people_flow/queue_analytics.py b/managed/people_flow_project/src/people_flow/queue_analytics.py
new file mode 100644
index 0000000..ea735d5
--- /dev/null
+++ b/managed/people_flow_project/src/people_flow/queue_analytics.py
@@ -0,0 +1,201 @@
+from __future__ import annotations
+
+from dataclasses import dataclass, field
+from datetime import datetime
+
+from .models import QueueConfig, TrackObservation
+
+
+@dataclass
+class QueueTrackState:
+ track_id: int
+ entered_at: datetime
+ accumulated_queue_seconds: int = 0
+ active_started_at: datetime | None = None
+ last_seen_at: datetime | None = None
+ pause_started_at: datetime | None = None
+ completed_periods: list[tuple[datetime, datetime]] = field(default_factory=list)
+
+ def __post_init__(self) -> None:
+ if self.active_started_at is None:
+ self.active_started_at = self.entered_at
+ if self.last_seen_at is None:
+ self.last_seen_at = self.entered_at
+
+ def mark_seen(self, when: datetime) -> None:
+ if self.active_started_at is None:
+ self.active_started_at = when
+ self.last_seen_at = when
+ self.pause_started_at = None
+
+ def pause(self, when: datetime) -> None:
+ if self.active_started_at is None:
+ return
+ self.completed_periods.append((self.active_started_at, when))
+ self.accumulated_queue_seconds += max(
+ 0,
+ int((when - self.active_started_at).total_seconds()),
+ )
+ self.active_started_at = None
+ self.pause_started_at = when
+ self.last_seen_at = when
+
+ def expire(self, when: datetime, pause_timeout_seconds: int) -> bool:
+ if self.pause_started_at is None:
+ return False
+ return (
+ int((when - self.pause_started_at).total_seconds()) > pause_timeout_seconds
+ )
+
+ def window_queue_seconds(self, window_start: datetime, window_end: datetime) -> int:
+ total = 0
+ for period_start, period_end in self.completed_periods:
+ total += _overlap_seconds(
+ period_start, period_end, window_start, window_end
+ )
+ if self.active_started_at is not None:
+ current_end = self.last_seen_at or window_end
+ total += _overlap_seconds(
+ self.active_started_at, current_end, window_start, window_end
+ )
+ return total
+
+
+class QueueWindowTracker:
+ def __init__(
+ self, config: QueueConfig, pixel_area: tuple[float, float, float, float]
+ ) -> None:
+ self.config = config
+ self.pixel_area = pixel_area
+ self.states: dict[int, QueueTrackState] = {}
+ self.closed_states: list[QueueTrackState] = []
+ self.last_queue_level: str | None = None
+
+ def observe(self, observations: list[TrackObservation], when: datetime) -> None:
+ seen_ids: set[int] = set()
+ for observation in observations:
+ if not _point_in_area(observation.center, self.pixel_area):
+ continue
+ seen_ids.add(observation.track_id)
+ state = self.states.get(observation.track_id)
+ if state is None:
+ state = QueueTrackState(track_id=observation.track_id, entered_at=when)
+ self.states[observation.track_id] = state
+ state.mark_seen(when)
+
+ for track_id, state in list(self.states.items()):
+ if track_id in seen_ids:
+ continue
+ if state.active_started_at is not None:
+ state.pause(when)
+ if state.expire(when, self.config.pause_timeout_seconds):
+ self.closed_states.append(state)
+ del self.states[track_id]
+
+ def build_queue_metrics(self, window_start: datetime, window_end: datetime) -> dict:
+ totals: dict[int, int] = {}
+ for state in self.closed_states:
+ queue_seconds = state.window_queue_seconds(window_start, window_end)
+ if queue_seconds > 0:
+ totals[state.track_id] = totals.get(state.track_id, 0) + queue_seconds
+ for track_id, state in self.states.items():
+ queue_seconds = state.window_queue_seconds(window_start, window_end)
+ if queue_seconds > 0:
+ totals[track_id] = queue_seconds
+
+ over_threshold_count = sum(
+ 1
+ for queue_seconds in totals.values()
+ if queue_seconds >= self.config.queue_time_threshold_seconds
+ )
+ under_threshold_count = sum(
+ 1
+ for queue_seconds in totals.values()
+ if 0 < queue_seconds < self.config.queue_time_threshold_seconds
+ )
+ queue_level = _queue_level(
+ over_threshold_count,
+ crowded_count_threshold=self.config.crowded_count_threshold,
+ normal_count_threshold=self.config.normal_count_threshold,
+ )
+ previous_queue_level = self.last_queue_level
+ status_change = _queue_status_change(previous_queue_level, queue_level)
+ self.last_queue_level = queue_level
+ return {
+ "queue_time_threshold_seconds": self.config.queue_time_threshold_seconds,
+ "over_threshold_count": over_threshold_count,
+ "under_threshold_count": under_threshold_count,
+ "queue_level": queue_level,
+ "previous_queue_level": previous_queue_level,
+ "status_change": status_change,
+ "people": [
+ {
+ "person_id": f"track_{track_id}",
+ "queue_seconds": queue_seconds,
+ "bucket": (
+ "over_threshold"
+ if queue_seconds >= self.config.queue_time_threshold_seconds
+ else "under_threshold"
+ ),
+ }
+ for track_id, queue_seconds in sorted(
+ totals.items(), key=lambda item: item[1], reverse=True
+ )
+ ],
+ }
+
+ def reset(self) -> None:
+ self.states.clear()
+ self.closed_states.clear()
+
+
+def _point_in_area(
+ point: tuple[float, float],
+ area: tuple[float, float, float, float],
+) -> bool:
+ px, py = point
+ x1, y1, x2, y2 = area
+ left = min(x1, x2)
+ right = max(x1, x2)
+ top = min(y1, y2)
+ bottom = max(y1, y2)
+ return left <= px <= right and top <= py <= bottom
+
+
+def _overlap_seconds(
+ period_start: datetime,
+ period_end: datetime,
+ window_start: datetime,
+ window_end: datetime,
+) -> int:
+ overlap_start = max(period_start, window_start)
+ overlap_end = min(period_end, window_end)
+ if overlap_end <= overlap_start:
+ return 0
+ return int((overlap_end - overlap_start).total_seconds())
+
+
+def _queue_level(
+ over_threshold_count: int,
+ crowded_count_threshold: int,
+ normal_count_threshold: int,
+) -> str:
+ if over_threshold_count > crowded_count_threshold:
+ return "crowded"
+ if over_threshold_count >= normal_count_threshold:
+ return "normal"
+ return "few"
+
+
+def _queue_status_change(previous_level: str | None, current_level: str) -> str:
+ if previous_level is None:
+ return "initial"
+ if previous_level == current_level:
+ return "unchanged"
+ if current_level == "crowded" and previous_level in {"normal", "few"}:
+ return "queue_increased"
+ if current_level == "few" and previous_level in {"normal", "crowded"}:
+ return "queue_decreased"
+ if current_level == "normal" and previous_level in {"crowded", "few"}:
+ return "queue_normalized"
+ return "changed"
diff --git a/managed/people_flow_project/src/people_flow/webhook.py b/managed/people_flow_project/src/people_flow/webhook.py
new file mode 100644
index 0000000..7241df8
--- /dev/null
+++ b/managed/people_flow_project/src/people_flow/webhook.py
@@ -0,0 +1,29 @@
+from __future__ import annotations
+
+import json
+from pathlib import Path
+from urllib import request
+
+
+def dispatch_json_event(
+ path: str | Path,
+ payload: dict,
+ webhook_url: str = "",
+ timeout_seconds: float = 5.0,
+) -> None:
+ output_path = Path(path)
+ output_path.parent.mkdir(parents=True, exist_ok=True)
+ with output_path.open("a", encoding="utf-8") as handle:
+ handle.write(json.dumps(payload, ensure_ascii=False) + "\n")
+
+ if not webhook_url.strip():
+ return
+
+ req = request.Request(
+ url=webhook_url,
+ data=json.dumps(payload).encode("utf-8"),
+ method="POST",
+ )
+ req.add_header("Content-Type", "application/json")
+ with request.urlopen(req, timeout=timeout_seconds):
+ return
diff --git a/managed/people_flow_project/tests/test_manage_api.py b/managed/people_flow_project/tests/test_manage_api.py
index 621d940..50dbebb 100644
--- a/managed/people_flow_project/tests/test_manage_api.py
+++ b/managed/people_flow_project/tests/test_manage_api.py
@@ -16,7 +16,15 @@ def build_client(project_root: Path):
" rtsp_url: rtsp://before-update\n"
" output_dir: outputs\n"
"rtsp:\n"
- " output_subdir: rtsp_stream\n",
+ " output_subdir: rtsp_stream\n"
+ "queue:\n"
+ " source_id: queue_cam_01\n"
+ " queue_time_threshold_seconds: 300\n"
+ " crowded_count_threshold: 5\n"
+ " normal_count_threshold: 2\n"
+ "webhook:\n"
+ " url: https://example.test/webhook\n"
+ " event_log_path: outputs/rtsp_stream/webhook_events.jsonl\n",
encoding="utf-8",
)
@@ -24,13 +32,23 @@ def build_client(project_root: Path):
windows_dir = rtsp_dir / "windows"
windows_dir.mkdir(parents=True, exist_ok=True)
latest_payload = {
+ "event": "half_hour_report",
"source_type": "rtsp",
+ "source_id": "queue_cam_01",
"window_start": "2026-04-16T09:30:00+08:00",
"window_end": "2026-04-16T10:00:00+08:00",
"total_people": 7,
"age_counts": {"minor": 1, "adult": 5, "senior": 1},
"gender_counts": {"male": 4, "female": 3},
"unknown_attributes": 2,
+ "queue_metrics": {
+ "queue_time_threshold_seconds": 300,
+ "over_threshold_count": 6,
+ "under_threshold_count": 2,
+ "queue_level": "crowded",
+ "previous_queue_level": "normal",
+ "status_change": "queue_increased",
+ },
"tracks": [
{"track_id": 1, "direction": "in"},
{"track_id": 2, "direction": "out"},
@@ -47,6 +65,14 @@ def build_client(project_root: Path):
"window_start": "2026-04-16T09:00:00+08:00",
"window_end": "2026-04-16T09:30:00+08:00",
"total_people": 5,
+ "queue_metrics": {
+ "queue_time_threshold_seconds": 300,
+ "over_threshold_count": 2,
+ "under_threshold_count": 1,
+ "queue_level": "normal",
+ "previous_queue_level": None,
+ "status_change": "initial",
+ },
"age_counts": {"minor": 0, "adult": 4, "senior": 1},
"gender_counts": {"male": 2, "female": 3},
"unknown_attributes": 1,
@@ -58,7 +84,12 @@ def build_client(project_root: Path):
json.dumps(latest_payload),
encoding="utf-8",
)
- (project_root / "outputs" / "rtsp_run.log").write_text("rtsp ok\n", encoding="utf-8")
+ (project_root / "outputs" / "rtsp_run.log").write_text(
+ "rtsp ok\n", encoding="utf-8"
+ )
+ (rtsp_dir / "webhook_events.jsonl").write_text(
+ json.dumps(latest_payload) + "\n", encoding="utf-8"
+ )
app = create_app(config_path)
app.testing = True
@@ -85,6 +116,8 @@ def test_get_manage_config(tmp_path: Path):
assert response.json["config_path"] == str(config_path)
assert response.json["runtime"]["rtsp_url"] == "rtsp://before-update"
assert response.json["rtsp"]["output_subdir"] == "rtsp_stream"
+ assert response.json["queue"]["source_id"] == "queue_cam_01"
+ assert response.json["webhook"]["url"] == "https://example.test/webhook"
def test_put_manage_config_updates_rtsp_url(tmp_path: Path):
@@ -111,8 +144,15 @@ def test_get_manage_summary(tmp_path: Path):
assert response.json["result_type"] == "people_flow_project"
assert response.json["last_result_time"] == "2026-04-16T10:00:00+08:00"
assert response.json["metrics"]["total_people"] == 7
+ assert response.json["metrics"]["queue_level"] == "crowded"
+ assert response.json["metrics"]["over_threshold_count"] == 6
+ assert response.json["metrics"]["under_threshold_count"] == 2
+ assert response.json["metrics"]["status_change"] == "queue_increased"
assert response.json["metrics"]["direction_counts"] == {"in": 2, "out": 1}
- assert response.json["metrics"]["recent_window_stats"][0]["window_end"] == "2026-04-16T10:00:00+08:00"
+ assert (
+ response.json["metrics"]["recent_window_stats"][0]["window_end"]
+ == "2026-04-16T10:00:00+08:00"
+ )
def test_get_manage_windows(tmp_path: Path):
@@ -126,6 +166,7 @@ def test_get_manage_windows(tmp_path: Path):
assert response.json["page_size"] == 1
assert response.json["items"][0]["window_end"] == "2026-04-16T10:00:00+08:00"
assert response.json["items"][0]["total_people"] == 7
+ assert response.json["items"][0]["queue_level"] == "crowded"
def test_get_manage_files(tmp_path: Path):
@@ -137,6 +178,7 @@ def test_get_manage_files(tmp_path: Path):
assert {item["path"] for item in response.json["files"]} == {
"outputs/rtsp_run.log",
"outputs/rtsp_stream/latest.json",
+ "outputs/rtsp_stream/webhook_events.jsonl",
"outputs/rtsp_stream/windows/stats_2026-04-16_09-00-00.json",
"outputs/rtsp_stream/windows/stats_2026-04-16_09-30-00.json",
}
diff --git a/managed/people_flow_project/tests/test_queue_analytics.py b/managed/people_flow_project/tests/test_queue_analytics.py
new file mode 100644
index 0000000..f3c69c3
--- /dev/null
+++ b/managed/people_flow_project/tests/test_queue_analytics.py
@@ -0,0 +1,43 @@
+from datetime import datetime
+from zoneinfo import ZoneInfo
+
+from src.people_flow.models import QueueConfig, TrackObservation
+from src.people_flow.queue_analytics import QueueWindowTracker
+
+TZ = ZoneInfo("Asia/Shanghai")
+
+
+def test_queue_window_tracker_builds_crowded_report():
+ tracker = QueueWindowTracker(
+ QueueConfig(
+ queue_time_threshold_seconds=300,
+ crowded_count_threshold=5,
+ normal_count_threshold=2,
+ pause_timeout_seconds=5,
+ ),
+ pixel_area=(0, 0, 100, 100),
+ )
+ start = datetime(2026, 4, 15, 11, 0, tzinfo=TZ)
+ crowded_tracks = [
+ TrackObservation(
+ track_id=index, bbox=(0, 0, 10, 10), confidence=0.9, center=(10, 10)
+ )
+ for index in range(1, 7)
+ ]
+ short_tracks = [
+ TrackObservation(
+ track_id=index, bbox=(0, 0, 10, 10), confidence=0.9, center=(10, 10)
+ )
+ for index in range(7, 9)
+ ]
+
+ tracker.observe(crowded_tracks, start)
+ tracker.observe(crowded_tracks, start.replace(minute=6))
+ tracker.observe(crowded_tracks + short_tracks, start.replace(minute=27))
+ tracker.observe(short_tracks, start.replace(minute=30))
+
+ queue_metrics = tracker.build_queue_metrics(start, start.replace(minute=30))
+
+ assert queue_metrics["over_threshold_count"] == 6
+ assert queue_metrics["under_threshold_count"] == 2
+ assert queue_metrics["queue_level"] == "crowded"
diff --git a/managed/store_dwell_alert/app/config.py b/managed/store_dwell_alert/app/config.py
index 8318822..f6b683c 100644
--- a/managed/store_dwell_alert/app/config.py
+++ b/managed/store_dwell_alert/app/config.py
@@ -8,8 +8,9 @@ import yaml
@dataclass(slots=True)
class Thresholds:
- min_people: int = 5
- min_dwell_seconds: int = 600
+ queue_time_threshold_seconds: int = 300
+ crowded_count_threshold: int = 5
+ normal_count_threshold: int = 2
pause_timeout_seconds: int = 300
alert_cooldown_seconds: int = 600
@@ -30,6 +31,7 @@ class StaffConfig:
@dataclass(slots=True)
class WebhookConfig:
+ url: str = ""
alert_url: str = ""
report_url: str = ""
timeout_seconds: float = 5.0
@@ -52,7 +54,16 @@ class AppConfig:
def _load_section(raw: dict, key: str, cls):
- return cls(**raw.get(key, {}))
+ payload = dict(raw.get(key, {}))
+ if cls is Thresholds:
+ if (
+ "queue_time_threshold_seconds" not in payload
+ and "min_dwell_seconds" in payload
+ ):
+ payload["queue_time_threshold_seconds"] = payload["min_dwell_seconds"]
+ if "crowded_count_threshold" not in payload and "min_people" in payload:
+ payload["crowded_count_threshold"] = payload["min_people"]
+ return cls(**payload)
def resolve_config_path(config_path: str | Path | None = None) -> Path:
diff --git a/managed/store_dwell_alert/app/main.py b/managed/store_dwell_alert/app/main.py
index de7dbf8..b376460 100644
--- a/managed/store_dwell_alert/app/main.py
+++ b/managed/store_dwell_alert/app/main.py
@@ -15,7 +15,7 @@ from app.config import (
from app.modules.detector_tracker import YOLOTrackerAdapter
from app.modules.dwell_engine import DwellEngine
from app.modules.identity_resolver import IdentityResolver
-from app.modules.notifier import append_json_event
+from app.modules.notifier import dispatch_json_event
from app.modules.staff_filter import StaffMatcher, load_staff_gallery
from app.modules.stream_reader import RTSPFrameReader
@@ -46,12 +46,20 @@ def build_app(config_path: str | Path | None = None) -> dict:
),
"dwell_engine": DwellEngine(
camera_id=config.camera_id,
- min_people=config.thresholds.min_people,
- min_dwell_seconds=config.thresholds.min_dwell_seconds,
+ queue_time_threshold_seconds=config.thresholds.queue_time_threshold_seconds,
+ crowded_count_threshold=config.thresholds.crowded_count_threshold,
+ normal_count_threshold=config.thresholds.normal_count_threshold,
pause_timeout_seconds=config.thresholds.pause_timeout_seconds,
alert_cooldown_seconds=config.thresholds.alert_cooldown_seconds,
),
- "notifier": append_json_event,
+ "notifier": lambda path, event: dispatch_json_event(
+ path,
+ event,
+ webhook_url=config.webhook.url
+ or config.webhook.report_url
+ or config.webhook.alert_url,
+ timeout_seconds=config.webhook.timeout_seconds,
+ ),
"event_sink_path": event_sink_path,
}
@@ -102,14 +110,18 @@ def run_forever(app: dict, max_frames: int | None = None) -> int:
def parse_args() -> ArgumentParser:
parser = ArgumentParser(description="Store dwell alert service bootstrap")
parser.add_argument("--config", help="Path to YAML config file")
- parser.add_argument("--once", action="store_true", help="Read and process one frame")
+ parser.add_argument(
+ "--once", action="store_true", help="Read and process one frame"
+ )
parser.add_argument(
"--manage-api",
action="store_true",
help="Start the management API instead of the RTSP worker loop",
)
parser.add_argument("--host", default="0.0.0.0", help="Host for the management API")
- parser.add_argument("--port", type=int, default=18081, help="Port for the management API")
+ parser.add_argument(
+ "--port", type=int, default=18081, help="Port for the management API"
+ )
parser.add_argument(
"--max-frames",
type=int,
diff --git a/managed/store_dwell_alert/app/manage_api.py b/managed/store_dwell_alert/app/manage_api.py
index 5baf543..d906eff 100644
--- a/managed/store_dwell_alert/app/manage_api.py
+++ b/managed/store_dwell_alert/app/manage_api.py
@@ -17,7 +17,6 @@ from app.config import (
save_config_document,
)
-
PROJECT_TYPE = "store_dwell_alert"
DEFAULT_MANAGE_PORT = 18081
MAX_PREVIEW_LINES = 2000
@@ -136,7 +135,12 @@ def parse_args() -> ArgumentParser:
parser = ArgumentParser(description="Store dwell alert management API")
parser.add_argument("--config", help="Path to YAML config file")
parser.add_argument("--host", default="0.0.0.0", help="Host for the management API")
- parser.add_argument("--port", type=int, default=DEFAULT_MANAGE_PORT, help="Port for the management API")
+ parser.add_argument(
+ "--port",
+ type=int,
+ default=DEFAULT_MANAGE_PORT,
+ help="Port for the management API",
+ )
return parser
@@ -160,9 +164,18 @@ def _config_payload(ctx: ManageContext) -> dict:
"sample_fps": config.stream.sample_fps,
"reconnect_backoff_seconds": config.stream.reconnect_backoff_seconds,
},
+ "thresholds": {
+ "queue_time_threshold_seconds": config.thresholds.queue_time_threshold_seconds,
+ "crowded_count_threshold": config.thresholds.crowded_count_threshold,
+ "normal_count_threshold": config.thresholds.normal_count_threshold,
+ },
"event_sink": {
"path": str(event_sink_path),
},
+ "webhook": {
+ "url": config.webhook.url,
+ "timeout_seconds": config.webhook.timeout_seconds,
+ },
}
@@ -179,11 +192,13 @@ def _build_summary(ctx: ManageContext) -> dict:
},
}
- alert_count = 0
- last_alert_time = ""
last_report_time = ""
active_count = 0
longest_dwell_seconds = 0
+ queue_level = ""
+ over_threshold_count = 0
+ under_threshold_count = 0
+ status_change = ""
window_stats: list[dict] = []
with events_path.open("r", encoding="utf-8") as handle:
@@ -196,10 +211,7 @@ def _build_summary(ctx: ManageContext) -> dict:
except json.JSONDecodeError:
continue
- if payload.get("event") == "long_stay_alert":
- alert_count += 1
- last_alert_time = _string_value(payload.get("ts"))
- elif payload.get("event") == "half_hour_report":
+ if payload.get("event") == "half_hour_report":
last_report_time = _string_value(payload.get("window_end"))
active_count = _int_value(payload.get("active_customer_count"))
stat = _build_window_stat(payload)
@@ -208,31 +220,36 @@ def _build_summary(ctx: ManageContext) -> dict:
longest_dwell_seconds,
stat["max_wait_seconds"],
)
+ queue_level = stat["queue_level"]
+ over_threshold_count = stat["over_threshold_count"]
+ under_threshold_count = stat["under_threshold_count"]
+ status_change = stat["status_change"]
window_stats.sort(
key=lambda item: _parse_timestamp(item["window_end"]),
reverse=True,
)
- headline = "No alerts or reports yet"
+ headline = "No reports yet"
if last_report_time:
headline = (
"Latest report shows "
- f"{active_count} active customers, longest dwell {longest_dwell_seconds}s"
+ f"{queue_level or 'unknown'} queue, "
+ f"{over_threshold_count} over 5 min and {under_threshold_count} under 5 min"
)
- elif alert_count > 0:
- headline = f"Observed {alert_count} alert(s), latest alert at {last_alert_time}"
return {
"result_type": PROJECT_TYPE,
"headline": headline,
- "last_result_time": _latest_timestamp(last_alert_time, last_report_time),
+ "last_result_time": _latest_timestamp(last_report_time),
"metrics": {
- "alert_count": alert_count,
- "last_alert_time": last_alert_time,
"last_half_hour_report_time": last_report_time,
"active_customer_count": active_count,
"longest_dwell_seconds": longest_dwell_seconds,
+ "queue_level": queue_level,
+ "over_threshold_count": over_threshold_count,
+ "under_threshold_count": under_threshold_count,
+ "status_change": status_change,
"events_path": str(events_path),
"recent_window_stats": window_stats[:24],
"all_window_stats": window_stats,
@@ -260,7 +277,9 @@ def _list_result_files(ctx: ManageContext) -> list[dict]:
"label": label,
"kind": path.suffix.lstrip(".").lower(),
"size": info.st_size,
- "modified_at": datetime.fromtimestamp(info.st_mtime).astimezone().isoformat(),
+ "modified_at": datetime.fromtimestamp(info.st_mtime)
+ .astimezone()
+ .isoformat(),
}
)
@@ -279,12 +298,21 @@ def _build_window_stat(payload: dict) -> dict:
payload.get("closed_customers"),
"final_dwell_seconds",
)
+ queue_metrics = (
+ payload.get("queue_metrics")
+ if isinstance(payload.get("queue_metrics"), dict)
+ else {}
+ )
return {
"window_start": _string_value(payload.get("window_start")),
"window_end": _string_value(payload.get("window_end")),
"active_customer_count": _int_value(payload.get("active_customer_count")),
"active_wait_seconds": active_wait_seconds,
"closed_wait_seconds": closed_wait_seconds,
+ "queue_level": _string_value(queue_metrics.get("queue_level")),
+ "over_threshold_count": _int_value(queue_metrics.get("over_threshold_count")),
+ "under_threshold_count": _int_value(queue_metrics.get("under_threshold_count")),
+ "status_change": _string_value(queue_metrics.get("status_change")),
"max_wait_seconds": max(
max(active_wait_seconds, default=0),
max(closed_wait_seconds, default=0),
diff --git a/managed/store_dwell_alert/app/modules/dwell_engine.py b/managed/store_dwell_alert/app/modules/dwell_engine.py
index 031f7a8..e42d811 100644
--- a/managed/store_dwell_alert/app/modules/dwell_engine.py
+++ b/managed/store_dwell_alert/app/modules/dwell_engine.py
@@ -1,6 +1,6 @@
from __future__ import annotations
-from dataclasses import dataclass
+from dataclasses import dataclass, field
from datetime import datetime
from app.modules.reporter import floor_half_hour, previous_half_hour_window
@@ -18,6 +18,7 @@ class DwellSession:
last_seen_at: datetime | None = None
pause_started_at: datetime | None = None
closed_at: datetime | None = None
+ completed_periods: list[tuple[datetime, datetime]] = field(default_factory=list)
def __post_init__(self) -> None:
if self.active_started_at is None:
@@ -46,6 +47,7 @@ class DwellSession:
def pause(self, when: datetime) -> None:
if self.state != "active" or self.active_started_at is None:
return
+ self.completed_periods.append((self.active_started_at, when))
self.accumulated_dwell_seconds += max(
0,
int((when - self.active_started_at).total_seconds()),
@@ -73,19 +75,41 @@ class DwellSession:
"dwell_seconds": self.dwell_seconds(when),
}
+ def window_dwell_seconds(
+ self,
+ window_start: datetime,
+ window_end: datetime,
+ when: datetime | None = None,
+ ) -> int:
+ total = 0
+ for period_start, period_end in self.completed_periods:
+ total += _overlap_seconds(
+ period_start, period_end, window_start, window_end
+ )
+
+ if self.state == "active" and self.active_started_at is not None:
+ current_end = when or self.last_seen_at or window_end
+ total += _overlap_seconds(
+ self.active_started_at, current_end, window_start, window_end
+ )
+
+ return total
+
class DwellEngine:
def __init__(
self,
camera_id: str,
- min_people: int,
- min_dwell_seconds: int,
+ queue_time_threshold_seconds: int,
+ crowded_count_threshold: int,
+ normal_count_threshold: int,
pause_timeout_seconds: int,
alert_cooldown_seconds: int,
) -> None:
self.camera_id = camera_id
- self.min_people = min_people
- self.min_dwell_seconds = min_dwell_seconds
+ self.queue_time_threshold_seconds = queue_time_threshold_seconds
+ self.crowded_count_threshold = crowded_count_threshold
+ self.normal_count_threshold = normal_count_threshold
self.pause_timeout_seconds = pause_timeout_seconds
self.alert_cooldown_seconds = alert_cooldown_seconds
self.sessions: dict[str, DwellSession] = {}
@@ -94,13 +118,16 @@ class DwellEngine:
self.alert_rearmed = True
self.last_alert_at: datetime | None = None
self.last_report_boundary: datetime | None = None
+ self.last_queue_level: str | None = None
def _next_session_id(self, person_id: str) -> str:
next_index = self.session_counts.get(person_id, 0) + 1
self.session_counts[person_id] = next_index
return f"{person_id}-s{next_index}"
- def _create_session(self, person_id: str, role: str, when: datetime) -> DwellSession:
+ def _create_session(
+ self, person_id: str, role: str, when: datetime
+ ) -> DwellSession:
session = DwellSession(
person_id=person_id,
session_id=self._next_session_id(person_id),
@@ -110,7 +137,9 @@ class DwellEngine:
self.sessions[person_id] = session
return session
- def process_observations(self, observations: list[dict], when: datetime) -> list[dict]:
+ def process_observations(
+ self, observations: list[dict], when: datetime
+ ) -> list[dict]:
events: list[dict] = []
seen_people: set[str] = set()
@@ -151,12 +180,12 @@ class DwellEngine:
for session in self.sessions.values()
if session.role == "customer"
and session.state == "active"
- and session.dwell_seconds(when) >= self.min_dwell_seconds
+ and session.dwell_seconds(when) >= self.queue_time_threshold_seconds
]
def _build_alert_event(self, when: datetime) -> dict | None:
long_stay_sessions = self._active_customer_sessions(when)
- if len(long_stay_sessions) < self.min_people:
+ if len(long_stay_sessions) < self.crowded_count_threshold:
self.alert_rearmed = True
return None
if not self.alert_rearmed:
@@ -168,8 +197,8 @@ class DwellEngine:
"camera_id": self.camera_id,
"ts": when.isoformat(),
"threshold": {
- "min_people": self.min_people,
- "min_dwell_seconds": self.min_dwell_seconds,
+ "min_people": self.crowded_count_threshold,
+ "min_dwell_seconds": self.queue_time_threshold_seconds,
},
"active_long_stay_count": len(long_stay_sessions),
"people": [
@@ -192,8 +221,15 @@ class DwellEngine:
return None
window_start, window_end = previous_half_hour_window(when)
+ queue_totals = self._queue_totals(window_start, window_end, when)
+ queue_metrics = self._build_queue_metrics(queue_totals)
active_customers = [
- session.as_event_dict(when)
+ {
+ **session.as_event_dict(when),
+ "window_queue_seconds": session.window_dwell_seconds(
+ window_start, window_end, when
+ ),
+ }
for session in self.sessions.values()
if session.role == "customer" and session.state == "active"
]
@@ -202,25 +238,147 @@ class DwellEngine:
"person_id": session.person_id,
"session_id": session.session_id,
"final_dwell_seconds": session.dwell_seconds(window_end),
+ "window_queue_seconds": session.window_dwell_seconds(
+ window_start, window_end, window_end
+ ),
}
for session in self.closed_sessions
if session.role == "customer"
and session.closed_at is not None
and window_start < session.closed_at <= window_end
]
- staff_seen_count = sum(1 for session in self.sessions.values() if session.role == "staff")
+ staff_seen_count = sum(
+ 1 for session in self.sessions.values() if session.role == "staff"
+ )
self.last_report_boundary = boundary
return {
"event": "half_hour_report",
+ "project_type": "store_dwell_alert",
"camera_id": self.camera_id,
+ "source_id": self.camera_id,
"window_start": window_start.isoformat(),
"window_end": window_end.isoformat(),
"active_customer_count": len(active_customers),
"active_customers": active_customers,
"closed_customers": closed_customers,
"staff_seen_count": staff_seen_count,
+ "queue_metrics": queue_metrics,
}
+ def _queue_totals(
+ self,
+ window_start: datetime,
+ window_end: datetime,
+ when: datetime,
+ ) -> dict[str, int]:
+ totals: dict[str, int] = {}
+ for session in self.closed_sessions:
+ if session.role != "customer":
+ continue
+ window_seconds = session.window_dwell_seconds(
+ window_start, window_end, window_end
+ )
+ if window_seconds > 0:
+ totals[session.person_id] = (
+ totals.get(session.person_id, 0) + window_seconds
+ )
+
+ for session in self.sessions.values():
+ if session.role != "customer":
+ continue
+ window_seconds = session.window_dwell_seconds(
+ window_start, window_end, when
+ )
+ if window_seconds > 0:
+ totals[session.person_id] = (
+ totals.get(session.person_id, 0) + window_seconds
+ )
+
+ return totals
+
+ def _build_queue_metrics(self, queue_totals: dict[str, int]) -> dict:
+ over_threshold_count = sum(
+ 1
+ for seconds in queue_totals.values()
+ if seconds >= self.queue_time_threshold_seconds
+ )
+ under_threshold_count = sum(
+ 1
+ for seconds in queue_totals.values()
+ if 0 < seconds < self.queue_time_threshold_seconds
+ )
+ queue_level = _queue_level(
+ over_threshold_count,
+ crowded_count_threshold=self.crowded_count_threshold,
+ normal_count_threshold=self.normal_count_threshold,
+ )
+ previous_queue_level = self.last_queue_level
+ status_change = _queue_status_change(previous_queue_level, queue_level)
+ self.last_queue_level = queue_level
+ return {
+ "queue_time_threshold_seconds": self.queue_time_threshold_seconds,
+ "over_threshold_count": over_threshold_count,
+ "under_threshold_count": under_threshold_count,
+ "queue_level": queue_level,
+ "previous_queue_level": previous_queue_level,
+ "status_change": status_change,
+ "people": [
+ {
+ "person_id": person_id,
+ "queue_seconds": queue_seconds,
+ "bucket": (
+ "over_threshold"
+ if queue_seconds >= self.queue_time_threshold_seconds
+ else "under_threshold"
+ ),
+ }
+ for person_id, queue_seconds in sorted(
+ queue_totals.items(),
+ key=lambda item: item[1],
+ reverse=True,
+ )
+ ],
+ }
+
+
+def _overlap_seconds(
+ period_start: datetime,
+ period_end: datetime,
+ window_start: datetime,
+ window_end: datetime,
+) -> int:
+ overlap_start = max(period_start, window_start)
+ overlap_end = min(period_end, window_end)
+ if overlap_end <= overlap_start:
+ return 0
+ return int((overlap_end - overlap_start).total_seconds())
+
+
+def _queue_level(
+ over_threshold_count: int,
+ crowded_count_threshold: int,
+ normal_count_threshold: int,
+) -> str:
+ if over_threshold_count > crowded_count_threshold:
+ return "crowded"
+ if over_threshold_count >= normal_count_threshold:
+ return "normal"
+ return "few"
+
+
+def _queue_status_change(previous_level: str | None, current_level: str) -> str:
+ if previous_level is None:
+ return "initial"
+ if previous_level == current_level:
+ return "unchanged"
+ if current_level == "crowded" and previous_level in {"normal", "few"}:
+ return "queue_increased"
+ if current_level == "few" and previous_level in {"normal", "crowded"}:
+ return "queue_decreased"
+ if current_level == "normal" and previous_level in {"crowded", "few"}:
+ return "queue_normalized"
+ return "changed"
+
def long_stay_count(sessions: list[dict], min_dwell_seconds: int) -> int:
return sum(
diff --git a/managed/store_dwell_alert/app/modules/notifier.py b/managed/store_dwell_alert/app/modules/notifier.py
index 8966551..e22ec77 100644
--- a/managed/store_dwell_alert/app/modules/notifier.py
+++ b/managed/store_dwell_alert/app/modules/notifier.py
@@ -5,7 +5,9 @@ from pathlib import Path
from urllib import request
-def build_json_request(url: str, payload: dict, timeout_seconds: float = 5.0) -> request.Request:
+def build_json_request(
+ url: str, payload: dict, timeout_seconds: float = 5.0
+) -> request.Request:
data = json.dumps(payload).encode("utf-8")
req = request.Request(url=url, data=data, method="POST")
req.add_header("Content-Type", "application/json")
@@ -18,3 +20,22 @@ def append_json_event(path: str | Path, payload: dict) -> None:
output_path.parent.mkdir(parents=True, exist_ok=True)
with output_path.open("a", encoding="utf-8") as handle:
handle.write(json.dumps(payload, ensure_ascii=False) + "\n")
+
+
+def post_json_event(url: str, payload: dict, timeout_seconds: float = 5.0) -> None:
+ if not url.strip():
+ return
+ req = build_json_request(url, payload, timeout_seconds=timeout_seconds)
+ with request.urlopen(req, timeout=timeout_seconds):
+ return
+
+
+def dispatch_json_event(
+ path: str | Path,
+ payload: dict,
+ webhook_url: str = "",
+ timeout_seconds: float = 5.0,
+) -> None:
+ append_json_event(path, payload)
+ if webhook_url.strip():
+ post_json_event(webhook_url, payload, timeout_seconds=timeout_seconds)
diff --git a/managed/store_dwell_alert/config/config.example.yaml b/managed/store_dwell_alert/config/config.example.yaml
index b723e57..e95b432 100644
--- a/managed/store_dwell_alert/config/config.example.yaml
+++ b/managed/store_dwell_alert/config/config.example.yaml
@@ -2,8 +2,9 @@ camera_id: store_cam_01
timezone: Asia/Shanghai
thresholds:
- min_people: 5
- min_dwell_seconds: 600
+ queue_time_threshold_seconds: 300
+ crowded_count_threshold: 5
+ normal_count_threshold: 2
pause_timeout_seconds: 300
alert_cooldown_seconds: 600
@@ -21,6 +22,7 @@ event_sink:
path: logs/events.jsonl
webhook:
+ url: ""
alert_url: ""
report_url: ""
timeout_seconds: 5.0
diff --git a/managed/store_dwell_alert/scripts/docker-entrypoint.sh b/managed/store_dwell_alert/scripts/docker-entrypoint.sh
index 6e4a774..221b5a2 100755
--- a/managed/store_dwell_alert/scripts/docker-entrypoint.sh
+++ b/managed/store_dwell_alert/scripts/docker-entrypoint.sh
@@ -40,4 +40,67 @@ config_path.write_text(
)
PY
-exec python -m app.manage_api --config "${CONFIG_PATH}" --host "${API_HOST}" --port "${API_PORT}"
+exec python - "$CONFIG_PATH" "$API_HOST" "$API_PORT" <<'PY'
+import signal
+import subprocess
+import sys
+import time
+
+config_path, api_host, api_port = sys.argv[1:4]
+commands = [
+ [sys.executable, "-m", "app.main", "--config", config_path],
+ [
+ sys.executable,
+ "-m",
+ "app.manage_api",
+ "--config",
+ config_path,
+ "--host",
+ api_host,
+ "--port",
+ api_port,
+ ],
+]
+processes = [subprocess.Popen(command) for command in commands]
+
+
+def terminate_all(signum, _frame):
+ for process in processes:
+ if process.poll() is None:
+ process.terminate()
+ deadline = time.time() + 10
+ for process in processes:
+ if process.poll() is not None:
+ continue
+ timeout = max(0, deadline - time.time())
+ try:
+ process.wait(timeout=timeout)
+ except subprocess.TimeoutExpired:
+ process.kill()
+ raise SystemExit(128 + signum)
+
+
+for handled_signal in (signal.SIGINT, signal.SIGTERM):
+ signal.signal(handled_signal, terminate_all)
+
+while True:
+ for index, process in enumerate(processes):
+ return_code = process.poll()
+ if return_code is None:
+ continue
+ for other_index, other_process in enumerate(processes):
+ if other_index == index or other_process.poll() is not None:
+ continue
+ other_process.terminate()
+ deadline = time.time() + 10
+ for other_index, other_process in enumerate(processes):
+ if other_index == index or other_process.poll() is not None:
+ continue
+ timeout = max(0, deadline - time.time())
+ try:
+ other_process.wait(timeout=timeout)
+ except subprocess.TimeoutExpired:
+ other_process.kill()
+ raise SystemExit(return_code)
+ time.sleep(0.5)
+PY
diff --git a/managed/store_dwell_alert/tests/test_config.py b/managed/store_dwell_alert/tests/test_config.py
index b7868b4..85c80df 100644
--- a/managed/store_dwell_alert/tests/test_config.py
+++ b/managed/store_dwell_alert/tests/test_config.py
@@ -8,16 +8,18 @@ def test_load_config_reads_thresholds(tmp_path: Path):
cfg.write_text(
"camera_id: store_cam_01\n"
"thresholds:\n"
- " min_people: 5\n"
- " min_dwell_seconds: 600\n",
+ " queue_time_threshold_seconds: 300\n"
+ " crowded_count_threshold: 5\n"
+ " normal_count_threshold: 2\n",
encoding="utf-8",
)
data = load_config(cfg)
assert data.camera_id == "store_cam_01"
- assert data.thresholds.min_people == 5
- assert data.thresholds.min_dwell_seconds == 600
+ assert data.thresholds.queue_time_threshold_seconds == 300
+ assert data.thresholds.crowded_count_threshold == 5
+ assert data.thresholds.normal_count_threshold == 2
def test_load_config_uses_defaults_for_optional_sections(tmp_path: Path):
@@ -29,4 +31,5 @@ def test_load_config_uses_defaults_for_optional_sections(tmp_path: Path):
assert data.stream.sample_fps == 2.0
assert data.staff.min_hits == 3
assert data.event_sink.path == "logs/events.jsonl"
+ assert data.webhook.url == ""
assert data.webhook.timeout_seconds == 5.0
diff --git a/managed/store_dwell_alert/tests/test_dwell_engine.py b/managed/store_dwell_alert/tests/test_dwell_engine.py
index 4448834..cd85fa5 100644
--- a/managed/store_dwell_alert/tests/test_dwell_engine.py
+++ b/managed/store_dwell_alert/tests/test_dwell_engine.py
@@ -1,8 +1,7 @@
from datetime import datetime
from zoneinfo import ZoneInfo
-from app.modules.dwell_engine import DwellEngine, DwellSession, long_stay_count
-
+from app.modules.dwell_engine import DwellEngine, DwellSession
TZ = ZoneInfo("Asia/Shanghai")
@@ -12,34 +11,81 @@ def test_session_pauses_without_adding_absence_time():
session = DwellSession(person_id="cust_1", session_id="cust_1-s1", entered_at=start)
session.mark_seen(start.replace(minute=2))
session.pause(start.replace(minute=2, second=10))
- session.close_if_expired(start.replace(minute=7, second=11), pause_timeout_seconds=300)
+ session.close_if_expired(
+ start.replace(minute=7, second=11), pause_timeout_seconds=300
+ )
assert session.state == "closed"
assert session.dwell_seconds() == 130
-def test_engine_emits_alert_when_five_long_stays_are_active():
+def test_engine_emits_half_hour_report_with_queue_classification():
engine = DwellEngine(
camera_id="store_cam_01",
- min_people=5,
- min_dwell_seconds=600,
+ queue_time_threshold_seconds=300,
+ crowded_count_threshold=5,
+ normal_count_threshold=2,
pause_timeout_seconds=300,
alert_cooldown_seconds=600,
)
- now = datetime(2026, 4, 15, 11, 20, tzinfo=TZ)
- observations = [{"person_id": f"cust_{idx}", "role": "customer"} for idx in range(5)]
+ start = datetime(2026, 4, 15, 11, 0, tzinfo=TZ)
+ crowded_group = [
+ {"person_id": f"cust_{idx}", "role": "customer"} for idx in range(6)
+ ]
+ short_wait_group = [
+ {"person_id": f"short_{idx}", "role": "customer"} for idx in range(2)
+ ]
- engine.process_observations(observations, now.replace(minute=9, second=0))
- events = engine.process_observations(observations, now)
+ engine.process_observations(crowded_group, start.replace(minute=0, second=0))
+ engine.process_observations(crowded_group, start.replace(minute=6, second=0))
+ engine.process_observations(
+ crowded_group + short_wait_group, start.replace(minute=27, second=0)
+ )
+ events = engine.process_observations(
+ short_wait_group, start.replace(minute=30, second=0)
+ )
- assert [event["event"] for event in events] == ["long_stay_alert"]
- assert events[0]["active_long_stay_count"] == 5
+ report = next(event for event in events if event["event"] == "half_hour_report")
+ assert report["queue_metrics"]["over_threshold_count"] == 6
+ assert report["queue_metrics"]["under_threshold_count"] == 2
+ assert report["queue_metrics"]["queue_level"] == "crowded"
+ assert report["queue_metrics"]["status_change"] == "initial"
+
+
+def test_engine_tracks_queue_status_change_between_windows():
+ engine = DwellEngine(
+ camera_id="store_cam_01",
+ queue_time_threshold_seconds=300,
+ crowded_count_threshold=5,
+ normal_count_threshold=2,
+ pause_timeout_seconds=300,
+ alert_cooldown_seconds=600,
+ )
+ start = datetime(2026, 4, 15, 11, 0, tzinfo=TZ)
+ engine.process_observations(
+ [{"person_id": f"crowded_{idx}", "role": "customer"} for idx in range(6)],
+ start,
+ )
+ engine.process_observations([], start.replace(minute=30))
+ engine.process_observations(
+ [{"person_id": f"normal_{idx}", "role": "customer"} for idx in range(3)],
+ start.replace(minute=31),
+ )
+ report_events = engine.process_observations([], start.replace(hour=12, minute=0))
+
+ report = next(
+ event for event in report_events if event["event"] == "half_hour_report"
+ )
+ assert report["queue_metrics"]["queue_level"] == "normal"
+ assert report["queue_metrics"]["status_change"] == "queue_normalized"
+ assert report["queue_metrics"]["previous_queue_level"] == "crowded"
def test_engine_emits_half_hour_report_with_closed_customers():
engine = DwellEngine(
camera_id="store_cam_01",
- min_people=5,
- min_dwell_seconds=600,
+ queue_time_threshold_seconds=300,
+ crowded_count_threshold=5,
+ normal_count_threshold=2,
pause_timeout_seconds=300,
alert_cooldown_seconds=600,
)
@@ -53,11 +99,5 @@ def test_engine_emits_half_hour_report_with_closed_customers():
report = next(event for event in events if event["event"] == "half_hour_report")
assert report["window_end"] == "2026-04-15T11:30:00+08:00"
assert report["closed_customers"][0]["person_id"] == "cust_1"
-
-
-def test_long_stay_count_excludes_staff():
- sessions = [
- {"role": "customer", "state": "active", "dwell_seconds": 700},
- {"role": "staff", "state": "active", "dwell_seconds": 40000},
- ]
- assert long_stay_count(sessions, min_dwell_seconds=600) == 1
+ assert report["queue_metrics"]["over_threshold_count"] == 0
+ assert report["queue_metrics"]["under_threshold_count"] == 1
diff --git a/managed/store_dwell_alert/tests/test_manage_api.py b/managed/store_dwell_alert/tests/test_manage_api.py
index 810728f..b3dad88 100644
--- a/managed/store_dwell_alert/tests/test_manage_api.py
+++ b/managed/store_dwell_alert/tests/test_manage_api.py
@@ -14,6 +14,10 @@ def build_client(project_root: Path):
config_path.write_text(
"camera_id: store_cam_01\n"
"timezone: Asia/Shanghai\n"
+ "thresholds:\n"
+ " queue_time_threshold_seconds: 300\n"
+ " crowded_count_threshold: 5\n"
+ " normal_count_threshold: 2\n"
"stream:\n"
" rtsp_url: rtsp://before-update\n"
" sample_fps: 2.0\n"
@@ -28,13 +32,6 @@ def build_client(project_root: Path):
(logs_dir / "events.jsonl").write_text(
"\n".join(
[
- json.dumps(
- {
- "event": "long_stay_alert",
- "camera_id": "store_cam_01",
- "ts": "2026-04-16T09:00:00+08:00",
- }
- ),
json.dumps(
{
"event": "half_hour_report",
@@ -50,6 +47,14 @@ def build_client(project_root: Path):
{"person_id": "cust_3", "final_dwell_seconds": 450}
],
"staff_seen_count": 1,
+ "queue_metrics": {
+ "queue_time_threshold_seconds": 300,
+ "over_threshold_count": 2,
+ "under_threshold_count": 1,
+ "queue_level": "normal",
+ "previous_queue_level": null,
+ "status_change": "initial",
+ },
}
),
json.dumps(
@@ -67,6 +72,14 @@ def build_client(project_root: Path):
{"person_id": "cust_6", "final_dwell_seconds": 120},
],
"staff_seen_count": 0,
+ "queue_metrics": {
+ "queue_time_threshold_seconds": 300,
+ "over_threshold_count": 6,
+ "under_threshold_count": 2,
+ "queue_level": "crowded",
+ "previous_queue_level": "normal",
+ "status_change": "queue_increased",
+ },
}
),
]
@@ -126,10 +139,14 @@ def test_get_manage_summary(tmp_path: Path):
assert response.status_code == 200
assert response.json["result_type"] == "store_dwell_alert"
assert response.json["last_result_time"] == "2026-04-16T10:00:00+08:00"
- assert response.json["metrics"]["alert_count"] == 1
- assert response.json["metrics"]["active_customer_count"] == 1
- assert response.json["metrics"]["longest_dwell_seconds"] == 900
- assert response.json["metrics"]["recent_window_stats"][0]["window_end"] == "2026-04-16T10:00:00+08:00"
+ assert response.json["metrics"]["queue_level"] == "crowded"
+ assert response.json["metrics"]["over_threshold_count"] == 6
+ assert response.json["metrics"]["under_threshold_count"] == 2
+ assert response.json["metrics"]["status_change"] == "queue_increased"
+ assert (
+ response.json["metrics"]["recent_window_stats"][0]["window_end"]
+ == "2026-04-16T10:00:00+08:00"
+ )
def test_get_manage_windows(tmp_path: Path):
diff --git a/web/src/views/ManagedServiceDetail.vue b/web/src/views/ManagedServiceDetail.vue
index d3ef57a..e842cc5 100644
--- a/web/src/views/ManagedServiceDetail.vue
+++ b/web/src/views/ManagedServiceDetail.vue
@@ -5,15 +5,13 @@