# File: managed-portal/managed/people_flow_project/tests/test_pipeline.py
# Listing metadata: 105 lines, 3.5 KiB, Python

from __future__ import annotations
import json
import sys
from datetime import datetime
from pathlib import Path
from types import SimpleNamespace
from src.people_flow.models import AttributeConfig
from src.people_flow.pipeline import PeopleFlowPipeline, resolve_inference_device
def test_resolve_inference_device_keeps_cpu():
    """An explicit ``"cpu"`` request must come back unchanged."""
    resolved = resolve_inference_device("cpu")
    assert resolved == "cpu"
def test_resolve_inference_device_falls_back_when_cuda_unavailable(monkeypatch):
    """A ``"cuda:0"`` request must resolve to ``"cpu"`` when CUDA is reported absent.

    A stand-in ``torch`` module whose ``cuda.is_available()`` returns ``False``
    is registered in ``sys.modules`` for the duration of the test.
    """
    # Minimal torch replacement: only cuda.is_available() is provided.
    cuda_stub = SimpleNamespace(is_available=lambda: False)
    torch_stub = SimpleNamespace(cuda=cuda_stub)
    monkeypatch.setitem(sys.modules, "torch", torch_stub)

    resolved = resolve_inference_device("cuda:0")
    assert resolved == "cpu"
def test_resolve_inference_device_keeps_cuda_when_available(monkeypatch):
    """A ``"cuda:0"`` request must be kept as-is when CUDA is reported available.

    A stand-in ``torch`` module whose ``cuda.is_available()`` returns ``True``
    is registered in ``sys.modules`` for the duration of the test.
    """
    # Minimal torch replacement: only cuda.is_available() is provided.
    cuda_stub = SimpleNamespace(is_available=lambda: True)
    torch_stub = SimpleNamespace(cuda=cuda_stub)
    monkeypatch.setitem(sys.modules, "torch", torch_stub)

    resolved = resolve_inference_device("cuda:0")
    assert resolved == "cuda:0"
def test_attribute_config_defaults_to_disabled():
    """A default-constructed ``AttributeConfig`` must have ``enabled`` set to ``False``."""
    default_config = AttributeConfig()
    assert default_config.enabled is False
def test_write_live_rtsp_summary_updates_latest_json(tmp_path: Path):
    """The live RTSP summary written to ``latest.json`` must describe an open window.

    ``_build_rtsp_summary`` is replaced by a stub that returns a
    ``half_hour_report`` payload; the test then expects the JSON file to carry
    the ``rtsp_live_summary`` event, an ``in_progress`` window status, and a
    window spanning ``window_start`` to ``observed_at``.
    """
    # Allocate the pipeline without running __init__.
    pipeline = PeopleFlowPipeline.__new__(PeopleFlowPipeline)
    window_start = datetime.fromisoformat("2026-05-18T10:30:00+08:00")
    observed_at = datetime.fromisoformat("2026-05-18T10:45:18+08:00")

    def stub_build_rtsp_summary(**kwargs):
        # Echo the window bounds the pipeline passes in; other fields are fixed.
        start_text = kwargs["window_start"].isoformat()
        end_text = kwargs["window_end"].isoformat()
        return {
            "event": "half_hour_report",
            "window_start": start_text,
            "window_end": end_text,
            "total_people": 4,
            "queue_metrics": {"queue_level": "normal"},
        }

    pipeline._build_rtsp_summary = stub_build_rtsp_summary  # type: ignore[method-assign]

    latest_path = tmp_path / "latest.json"
    pipeline._write_live_rtsp_summary(
        latest_path=latest_path,
        source="rtsp://example",
        window_index=0,
        window_start=window_start,
        observed_at=observed_at,
        counter=None,
        attributes=SimpleNamespace(),
        queue_tracker=None,
    )

    payload = json.loads(latest_path.read_text(encoding="utf-8"))
    expectations = {
        "event": "rtsp_live_summary",
        "window_status": "in_progress",
        "window_start": window_start.isoformat(),
        "window_end": observed_at.isoformat(),
    }
    for key, expected in expectations.items():
        assert payload[key] == expected
def test_write_live_rtsp_summary_does_not_commit_queue_level(tmp_path: Path):
    """Writing a live summary must call the builder with ``commit_queue_level=False``.

    ``_build_rtsp_summary`` is replaced by a stub that records the keyword
    arguments it receives, so the flag passed by ``_write_live_rtsp_summary``
    can be inspected afterwards.
    """
    # Allocate the pipeline without running __init__.
    pipeline = PeopleFlowPipeline.__new__(PeopleFlowPipeline)
    window_start = datetime.fromisoformat("2026-05-18T10:30:00+08:00")
    observed_at = datetime.fromisoformat("2026-05-18T10:45:18+08:00")
    captured: dict[str, object] = {}

    def recording_build_rtsp_summary(**kwargs):
        # Keep every keyword argument so the assertion below can check the flag.
        captured.update(kwargs)
        start_text = kwargs["window_start"].isoformat()
        end_text = kwargs["window_end"].isoformat()
        return {
            "event": "half_hour_report",
            "window_start": start_text,
            "window_end": end_text,
            "total_people": 4,
            "queue_metrics": {"queue_level": "normal"},
        }

    pipeline._build_rtsp_summary = recording_build_rtsp_summary  # type: ignore[method-assign]

    latest_path = tmp_path / "latest.json"
    pipeline._write_live_rtsp_summary(
        latest_path=latest_path,
        source="rtsp://example",
        window_index=0,
        window_start=window_start,
        observed_at=observed_at,
        counter=None,
        attributes=SimpleNamespace(),
        queue_tracker=SimpleNamespace(),
    )

    commit_flag = captured["commit_queue_level"]
    assert commit_flag is False