"""Tests for ``aggregate_outputs``: per-video results and the folder summary."""

import json
import tempfile
import unittest
from datetime import datetime, timedelta
from pathlib import Path

from video_ai_analysis_poc.aggregator import aggregate_outputs


class AggregatorTests(unittest.TestCase):
    """Drive ``aggregate_outputs`` over JSONL fixtures in a temp directory.

    Each test writes ``video_manifest.jsonl``, ``clip_manifest.jsonl`` and
    ``clip_results.jsonl`` into a fresh directory, runs the aggregator, and
    inspects the JSON files found under that directory afterwards.
    """

    def test_aggregates_video_results_folder_summary_and_merges_adjacent_events(self):
        """Check per-video output, the folder summary and adjacent-event merging.

        The two ``queue_detected`` events of ``video-a`` are 2 seconds apart
        (10.0 -> 12.0) with ``merge_gap_seconds`` set to 3, and the test
        expects them to come back as a single merged event.
        """
        with tempfile.TemporaryDirectory() as scratch:
            output_dir = Path(scratch)

            probed_video = {
                "video_id": "video-a",
                "path": "/videos/a.mp4",
                "status": "probed",
                "duration_seconds": 40.0,
                "codec_name": "h264",
                "width": 1920,
                "height": 1080,
            }
            broken_video = {
                "video_id": "video-b",
                "path": "/videos/b.mp4",
                "status": "probe_failed",
                "last_error": "bad file",
            }
            self._write_jsonl(
                output_dir / "video_manifest.jsonl",
                [probed_video, broken_video],
            )

            clip_rows = [
                self._clip("video-a", "video-a_c000001", 0.0, 10.0),
                self._clip("video-a", "video-a_c000002", 12.0, 20.0),
                self._clip("video-a", "video-a_c000003", 21.0, 30.0),
                self._clip("video-b", "video-b_c000001", 0.0, 10.0),
            ]
            self._write_jsonl(output_dir / "clip_manifest.jsonl", clip_rows)

            # Hand-written record for the clip whose inference failed; it
            # carries no events and a non-"ok" status.
            failed_clip_result = {
                "schema_version": "local-batch-v1",
                "video_id": "video-b",
                "video_path": "/videos/b.mp4",
                "clip_id": "video-b_c000001",
                "status": "inference_failed",
                "monitoring_timeline": {
                    "video_start_time": None,
                    "clip_start_seconds": 0.0,
                    "clip_end_seconds": 10.0,
                    "frame_times": [],
                    "screen_time": "",
                },
                "events": [],
                "raw_response": "",
                "processing": {},
                "error": "offline",
            }
            result_rows = [
                self._result(
                    "video-a",
                    "video-a_c000001",
                    "/videos/a.mp4",
                    0.0,
                    10.0,
                    "09:00:01",
                    [self._event("queue_detected", 1.0, 10.0)],
                ),
                self._result(
                    "video-a",
                    "video-a_c000002",
                    "/videos/a.mp4",
                    12.0,
                    20.0,
                    "09:00:13",
                    [self._event("queue_detected", 12.0, 16.0)],
                ),
                self._result(
                    "video-a",
                    "video-a_c000003",
                    "/videos/a.mp4",
                    21.0,
                    30.0,
                    "09:00:22",
                    [self._event("staff_absent", 21.0, 25.0)],
                ),
                failed_clip_result,
            ]
            self._write_jsonl(output_dir / "clip_results.jsonl", result_rows)

            aggregate_outputs(
                output_dir,
                self._config(runtime={"timezone": "Asia/Shanghai"}),
            )

            # --- per-video result for the successfully probed video ---
            video_result_path = self._video_result_path(output_dir, "video-a")
            self.assertTrue(video_result_path.exists())
            video_result = self._load_json(video_result_path)
            timeline = video_result["monitoring_timeline"]

            self.assertEqual(video_result["schema_version"], "local-batch-v1")
            self.assertEqual(video_result["video_id"], "video-a")
            self.assertEqual(video_result["video_path"], "/videos/a.mp4")
            self.assertEqual(video_result["probe"]["codec_name"], "h264")
            self.assertIsNone(timeline["video_start_time"])
            self.assertEqual(timeline["video_duration_seconds"], 40.0)
            self.assertEqual(video_result["clip_count"], 3)
            self.assertEqual(video_result["failed_clip_count"], 0)
            self.assertEqual(
                video_result["event_counts"],
                {"queue_detected": 1, "staff_absent": 1},
            )
            self.assertEqual(len(video_result["events"]), 2)

            # --- the merged queue_detected event and its evidence ---
            merged = video_result["events"][0]
            evidence_clips = merged["evidence"]["clips"]

            self.assertEqual(merged["event_type"], "queue_detected")
            self.assertEqual(merged["start_offset_seconds"], 1.0)
            self.assertEqual(merged["end_offset_seconds"], 16.0)
            self.assertEqual(merged["screen_times"], ["09:00:01", "09:00:13"])
            self.assertEqual(
                merged["evidence"]["clip_ids"],
                ["video-a_c000001", "video-a_c000002"],
            )
            self.assertEqual(
                [entry["clip_start_beijing_time"] for entry in evidence_clips],
                ["2026-06-15 07:00:00", "2026-06-15 07:00:12"],
            )
            self.assertEqual(
                [entry["clip_end_beijing_time"] for entry in evidence_clips],
                ["2026-06-15 07:00:10", "2026-06-15 07:00:20"],
            )
            self.assertEqual(
                video_result["outputs"]["clip_results_jsonl"],
                "clip_results.jsonl",
            )
            self.assertIn("started_at", video_result["processing"])
            self.assertIn("finished_at", video_result["processing"])

            # --- per-video result for the video whose only clip failed ---
            failed_video_result = self._load_json(
                self._video_result_path(output_dir, "video-b")
            )
            self.assertEqual(failed_video_result["clip_count"], 1)
            self.assertEqual(failed_video_result["failed_clip_count"], 1)
            self.assertEqual(failed_video_result["event_counts"], {})

            # --- folder-level summary ---
            folder_summary = self._load_json(output_dir / "folder_summary.json")
            self.assertEqual(folder_summary["schema_version"], "local-batch-v1")
            self.assertEqual(folder_summary["input_dir"], "/videos")
            self.assertEqual(folder_summary["video_count"], 2)
            self.assertEqual(folder_summary["processed_video_count"], 1)
            self.assertEqual(folder_summary["failed_video_count"], 1)
            self.assertEqual(
                folder_summary["event_counts"],
                {"queue_detected": 1, "staff_absent": 1},
            )
            self.assertEqual(
                [entry["video_id"] for entry in folder_summary["videos"]],
                ["video-a", "video-b"],
            )
            self.assertIn("processing", folder_summary)

    def test_ffprobe_start_time_is_not_treated_as_monitoring_timeline_start(self):
        """A manifest ``start_time`` shows up under ``probe`` only.

        The manifest row carries ``start_time: 0.0``; the test expects that
        value in ``probe`` while ``monitoring_timeline.video_start_time``
        stays ``None``.
        """
        with tempfile.TemporaryDirectory() as scratch:
            output_dir = Path(scratch)

            manifest_row = {
                "video_id": "video-local",
                "path": "/videos/local.mp4",
                "status": "probed",
                "duration_seconds": 12.0,
                "start_time": 0.0,
            }
            self._write_jsonl(output_dir / "video_manifest.jsonl", [manifest_row])
            self._write_jsonl(
                output_dir / "clip_manifest.jsonl",
                [self._clip("video-local", "video-local_c000001", 0.0, 10.0)],
            )
            # No clip results at all for this video.
            self._write_jsonl(output_dir / "clip_results.jsonl", [])

            aggregate_outputs(output_dir, self._config())

            video_result = self._load_json(
                self._video_result_path(output_dir, "video-local")
            )
            self.assertEqual(video_result["probe"]["start_time"], 0.0)
            self.assertIsNone(
                video_result["monitoring_timeline"]["video_start_time"]
            )

    def test_does_not_merge_different_event_types_videos_or_large_gaps(self):
        """Events stay separate across large gaps, event types and videos.

        In ``video-a`` the two ``queue_detected`` events are 35 seconds apart
        (5.0 -> 40.0, against ``merge_gap_seconds`` of 3) and the third event
        has another type; ``video-b`` holds its own ``queue_detected`` event.
        """
        with tempfile.TemporaryDirectory() as scratch:
            output_dir = Path(scratch)

            manifest_rows = [
                {"video_id": "video-a", "path": "/videos/a.mp4", "status": "probed"},
                {"video_id": "video-b", "path": "/videos/b.mp4", "status": "probed"},
            ]
            self._write_jsonl(output_dir / "video_manifest.jsonl", manifest_rows)

            clip_rows = [
                self._clip("video-a", "a1", 0.0, 10.0),
                self._clip("video-a", "a2", 40.0, 50.0),
                self._clip("video-a", "a3", 51.0, 60.0),
                self._clip("video-b", "b1", 0.0, 10.0),
            ]
            self._write_jsonl(output_dir / "clip_manifest.jsonl", clip_rows)

            result_rows = [
                self._result(
                    "video-a", "a1", "/videos/a.mp4", 0.0, 10.0, "",
                    [self._event("queue_detected", 1.0, 5.0)],
                ),
                self._result(
                    "video-a", "a2", "/videos/a.mp4", 40.0, 50.0, "",
                    [self._event("queue_detected", 40.0, 45.0)],
                ),
                self._result(
                    "video-a", "a3", "/videos/a.mp4", 51.0, 60.0, "",
                    [self._event("staff_absent", 51.0, 55.0)],
                ),
                self._result(
                    "video-b", "b1", "/videos/b.mp4", 0.0, 10.0, "",
                    [self._event("queue_detected", 1.0, 5.0)],
                ),
            ]
            self._write_jsonl(output_dir / "clip_results.jsonl", result_rows)

            aggregate_outputs(output_dir, self._config())

            video_a = self._load_json(self._video_result_path(output_dir, "video-a"))
            video_b = self._load_json(self._video_result_path(output_dir, "video-b"))

            self.assertEqual(len(video_a["events"]), 3)
            self.assertEqual(
                video_a["event_counts"],
                {"queue_detected": 2, "staff_absent": 1},
            )
            self.assertEqual(len(video_b["events"]), 1)
            self.assertEqual(video_b["event_counts"], {"queue_detected": 1})

    def _config(self, runtime=None):
        """Return the aggregator config shared by every test.

        ``runtime`` is added under the ``"runtime"`` key only when given, so
        tests that omit it pass a config without that key.
        """
        config = {
            "input": {"dir": "/videos"},
            "schema": {"version": "local-batch-v1", "merge_gap_seconds": 3},
        }
        if runtime is not None:
            config["runtime"] = runtime
        return config

    def _event(self, event_type, start, end):
        """Build one event dict as it appears inside a clip result."""
        return {
            "event_type": event_type,
            "start_offset_seconds": start,
            "end_offset_seconds": end,
        }

    def _video_result_path(self, output_dir, video_id):
        """Return the ``video_result.json`` path the tests read for a video."""
        return output_dir / "videos" / video_id / "video_result.json"

    def _load_json(self, path):
        """Read ``path`` as UTF-8 text and parse it as JSON."""
        return json.loads(path.read_text(encoding="utf-8"))

    def _clip(self, video_id, clip_id, start, end):
        """Build a pending ``clip_manifest.jsonl`` row with a single frame.

        The timecode strings are fixed placeholders; only the numeric
        ``start``/``end`` offsets vary between clips.
        """
        frame = {
            "frame_path": f"frames/{video_id}/{clip_id}.jpg",
            "offset_seconds": start,
            "timecode": "00:00:00",
        }
        return {
            "video_id": video_id,
            "clip_id": clip_id,
            "clip_start_seconds": start,
            "clip_end_seconds": end,
            "clip_start_timecode": "00:00:00",
            "clip_end_timecode": "00:00:10",
            "frame_times": [frame],
            "status": "pending",
        }

    def _result(self, video_id, clip_id, video_path, start, end, screen_time, events):
        """Build a successful (``status == "ok"``) ``clip_results.jsonl`` row.

        Wall-clock strings are derived by adding the clip offsets to a fixed
        base of 2026-06-15 07:00:00 and formatting as ``%Y-%m-%d %H:%M:%S``.
        """
        origin = datetime(2026, 6, 15, 7, 0, 0)

        def wall_clock(offset_seconds):
            # Offset in seconds from the fixed origin, rendered as text.
            moment = origin + timedelta(seconds=offset_seconds)
            return moment.strftime("%Y-%m-%d %H:%M:%S")

        clip_start_beijing_time = wall_clock(start)
        clip_end_beijing_time = wall_clock(end)

        frame = {
            "frame_path": f"frames/{video_id}/{clip_id}.jpg",
            "offset_seconds": start,
            "timecode": "00:00:00",
            "beijing_time": clip_start_beijing_time,
        }
        timeline = {
            "video_start_time": None,
            "clip_start_seconds": start,
            "clip_end_seconds": end,
            "clip_start_timecode": "00:00:00",
            "clip_end_timecode": "00:00:10",
            "clip_start_beijing_time": clip_start_beijing_time,
            "clip_end_beijing_time": clip_end_beijing_time,
            "frame_times": [frame],
            "screen_time": screen_time,
        }
        return {
            "schema_version": "local-batch-v1",
            "video_id": video_id,
            "video_path": video_path,
            "clip_id": clip_id,
            "status": "ok",
            "monitoring_timeline": timeline,
            "events": events,
            "raw_response": "{}",
            "processing": {},
            "error": None,
        }

    def _write_jsonl(self, path, records):
        """Write ``records`` to ``path`` as UTF-8 JSON Lines.

        Each record becomes one line with sorted keys; an empty ``records``
        produces an empty file.
        """
        lines = [f"{json.dumps(record, sort_keys=True)}\n" for record in records]
        path.write_text("".join(lines), encoding="utf-8")


if __name__ == "__main__":
    unittest.main()