"""Tests for the ``/api/manage/*`` endpoints exposed by ``create_app``."""

from __future__ import annotations

import json
from pathlib import Path

import yaml

from src.people_flow.manage_api import create_app

# YAML written to ``config/local.yaml`` before the app under test is created.
_CONFIG_YAML = (
    "runtime:\n"
    " rtsp_url: rtsp://before-update\n"
    " output_dir: outputs\n"
    "rtsp:\n"
    " output_subdir: rtsp_stream\n"
    "queue:\n"
    " source_id: queue_cam_01\n"
    " queue_time_threshold_seconds: 300\n"
    " crowded_count_threshold: 5\n"
    " normal_count_threshold: 2\n"
    "webhook:\n"
    " url: https://example.test/webhook\n"
    " event_log_path: outputs/rtsp_stream/webhook_events.jsonl\n"
)


def _dump_json(target: Path, payload: dict) -> None:
    """Serialize *payload* with ``json.dumps`` and write it to *target* as UTF-8."""
    target.write_text(json.dumps(payload), encoding="utf-8")


def build_client(project_root: Path):
    """Create a fixture project under *project_root* and return a test client.

    Writes ``config/local.yaml``, two window statistics files, ``latest.json``,
    a run log and a webhook event log, then builds the app from the config.

    Args:
        project_root: Directory that receives the ``config`` and ``outputs``
            trees.

    Returns:
        A ``(client, config_path)`` tuple: the app's test client and the path
        of the YAML configuration file that was written.
    """
    config_dir = project_root / "config"
    config_dir.mkdir(parents=True, exist_ok=True)
    config_path = config_dir / "local.yaml"
    config_path.write_text(_CONFIG_YAML, encoding="utf-8")

    outputs_dir = project_root / "outputs"
    rtsp_dir = outputs_dir / "rtsp_stream"
    windows_dir = rtsp_dir / "windows"
    windows_dir.mkdir(parents=True, exist_ok=True)

    # Most recent window (09:30-10:00); reused for latest.json, its window
    # statistics file and the webhook event log.
    directions = ("in", "out", "in")
    latest_payload = {
        "event": "half_hour_report",
        "source_type": "rtsp",
        "source_id": "queue_cam_01",
        "window_start": "2026-04-16T09:30:00+08:00",
        "window_end": "2026-04-16T10:00:00+08:00",
        "total_people": 7,
        "age_counts": {"minor": 1, "adult": 5, "senior": 1},
        "gender_counts": {"male": 4, "female": 3},
        "unknown_attributes": 2,
        "queue_metrics": {
            "queue_time_threshold_seconds": 300,
            "over_threshold_count": 6,
            "under_threshold_count": 2,
            "queue_level": "crowded",
            "previous_queue_level": "normal",
            "status_change": "queue_increased",
        },
        "tracks": [
            {"track_id": number, "direction": direction}
            for number, direction in enumerate(directions, start=1)
        ],
    }

    # Preceding window (09:00-09:30).
    earlier_payload = {
        "window_start": "2026-04-16T09:00:00+08:00",
        "window_end": "2026-04-16T09:30:00+08:00",
        "total_people": 5,
        "queue_metrics": {
            "queue_time_threshold_seconds": 300,
            "over_threshold_count": 2,
            "under_threshold_count": 1,
            "queue_level": "normal",
            "previous_queue_level": None,
            "status_change": "initial",
        },
        "age_counts": {"minor": 0, "adult": 4, "senior": 1},
        "gender_counts": {"male": 2, "female": 3},
        "unknown_attributes": 1,
    }

    _dump_json(rtsp_dir / "latest.json", latest_payload)
    _dump_json(windows_dir / "stats_2026-04-16_09-00-00.json", earlier_payload)
    _dump_json(windows_dir / "stats_2026-04-16_09-30-00.json", latest_payload)

    run_log = outputs_dir / "rtsp_run.log"
    run_log.write_text("rtsp ok\n", encoding="utf-8")

    # One JSON document per line.
    event_log = rtsp_dir / "webhook_events.jsonl"
    event_log.write_text(json.dumps(latest_payload) + "\n", encoding="utf-8")

    app = create_app(config_path)
    app.testing = True
    return app.test_client(), config_path


def test_get_manage_health(tmp_path: Path) -> None:
    """The health endpoint reports an ok status for a running project."""
    client, _ = build_client(tmp_path)

    response = client.get("/api/manage/health")

    assert response.status_code == 200
    body = response.json
    assert body["status"] == "ok"
    assert body["project_type"] == "people_flow_project"
    assert body["runtime_status"] == "running"


def test_get_manage_config(tmp_path: Path) -> None:
    """The config endpoint returns the values written to the YAML file."""
    client, config_path = build_client(tmp_path)

    response = client.get("/api/manage/config")

    assert response.status_code == 200
    body = response.json
    assert body["config_path"] == str(config_path)
    assert body["runtime"]["rtsp_url"] == "rtsp://before-update"
    assert body["rtsp"]["output_subdir"] == "rtsp_stream"
    assert body["queue"]["source_id"] == "queue_cam_01"
    assert body["webhook"]["url"] == "https://example.test/webhook"


def test_put_manage_config_updates_rtsp_url(tmp_path: Path) -> None:
    """A PUT with ``rtsp_url`` changes both the response and the saved YAML."""
    client, config_path = build_client(tmp_path)
    new_settings = {"rtsp_url": "rtsp://after-update"}

    response = client.put("/api/manage/config", json=new_settings)

    assert response.status_code == 200
    assert response.json["runtime"]["rtsp_url"] == "rtsp://after-update"

    on_disk = config_path.read_text(encoding="utf-8")
    saved = yaml.safe_load(on_disk)
    assert saved["runtime"]["rtsp_url"] == "rtsp://after-update"


def test_get_manage_summary(tmp_path: Path) -> None:
    """The summary endpoint reflects the latest window and its metrics."""
    client, _ = build_client(tmp_path)

    response = client.get("/api/manage/summary")

    assert response.status_code == 200
    body = response.json
    assert body["result_type"] == "people_flow_project"
    assert body["last_result_time"] == "2026-04-16T10:00:00+08:00"

    metrics = body["metrics"]
    assert metrics["total_people"] == 7
    assert metrics["queue_level"] == "crowded"
    assert metrics["over_threshold_count"] == 6
    assert metrics["under_threshold_count"] == 2
    assert metrics["status_change"] == "queue_increased"
    assert metrics["direction_counts"] == {"in": 2, "out": 1}

    newest_window = metrics["recent_window_stats"][0]
    assert newest_window["window_end"] == "2026-04-16T10:00:00+08:00"


def test_get_manage_windows(tmp_path: Path) -> None:
    """Page 1 with page size 1 returns the 09:30-10:00 window out of two."""
    client, _ = build_client(tmp_path)

    response = client.get("/api/manage/windows?page=1&page_size=1")

    assert response.status_code == 200
    body = response.json
    assert body["total"] == 2
    assert body["page"] == 1
    assert body["page_size"] == 1

    first_item = body["items"][0]
    assert first_item["window_end"] == "2026-04-16T10:00:00+08:00"
    assert first_item["total_people"] == 7
    assert first_item["queue_level"] == "crowded"


def test_get_manage_files(tmp_path: Path) -> None:
    """The file listing contains exactly the output files the fixture wrote."""
    client, _ = build_client(tmp_path)
    expected_paths = {
        "outputs/rtsp_run.log",
        "outputs/rtsp_stream/latest.json",
        "outputs/rtsp_stream/webhook_events.jsonl",
        "outputs/rtsp_stream/windows/stats_2026-04-16_09-00-00.json",
        "outputs/rtsp_stream/windows/stats_2026-04-16_09-30-00.json",
    }

    response = client.get("/api/manage/files")

    assert response.status_code == 200
    listed_paths = {item["path"] for item in response.json["files"]}
    assert listed_paths == expected_paths


def test_get_manage_files_preview(tmp_path: Path) -> None:
    """Previewing one line of ``latest.json`` returns its JSON content."""
    client, _ = build_client(tmp_path)

    response = client.get(
        "/api/manage/files/preview?path=outputs/rtsp_stream/latest.json&lines=1"
    )

    assert response.status_code == 200
    body = response.json
    assert body["path"] == "outputs/rtsp_stream/latest.json"
    assert body["count"] == 1
    assert "total_people" in body["lines"][0]


def test_get_manage_files_download(tmp_path: Path) -> None:
    """Downloading the run log returns the exact bytes that were written."""
    client, _ = build_client(tmp_path)

    response = client.get("/api/manage/files/download?path=outputs/rtsp_run.log")

    assert response.status_code == 200
    assert response.data == b"rtsp ok\n"