# File: video-ai-analysis/tests/test_aggregator.py
# Snapshot: 2026-06-17 11:33:54 +08:00 (310 lines, 13 KiB, Python)

import json
import tempfile
import unittest
from datetime import datetime, timedelta
from pathlib import Path
from video_ai_analysis_poc.aggregator import aggregate_outputs
class AggregatorTests(unittest.TestCase):
    """Tests for ``aggregate_outputs`` over a batch output directory.

    Each test writes ``video_manifest.jsonl``, ``clip_manifest.jsonl`` and
    ``clip_results.jsonl`` into a temporary directory, runs the aggregator
    with a minimal config, and inspects the JSON files it writes back
    (``videos/<video_id>/video_result.json`` and ``folder_summary.json``).
    """

    # Fixed wall-clock origin used to fabricate per-clip Beijing timestamps:
    # a clip's offset in seconds is added to it (see ``_result``).
    _BEIJING_BASE = datetime(2026, 6, 15, 7, 0, 0)

    def test_aggregates_video_results_folder_summary_and_merges_adjacent_events(self):
        """Adjacent same-type events merge; per-video and folder outputs are written.

        video-a has three successful clips: two ``queue_detected`` events
        separated by a 2 s gap (within ``merge_gap_seconds=3``) and one
        ``staff_absent`` event. video-b has ``probe_failed`` status and its
        single clip has ``inference_failed`` status.
        """
        with tempfile.TemporaryDirectory() as tmp:
            output_dir = Path(tmp)
            video_a = {
                "video_id": "video-a",
                "path": "/videos/a.mp4",
                "status": "probed",
                "duration_seconds": 40.0,
                "codec_name": "h264",
                "width": 1920,
                "height": 1080,
            }
            video_b = {
                "video_id": "video-b",
                "path": "/videos/b.mp4",
                "status": "probe_failed",
                "last_error": "bad file",
            }
            self._write_jsonl(output_dir / "video_manifest.jsonl", [video_a, video_b])
            clips = [
                self._clip("video-a", "video-a_c000001", 0.0, 10.0),
                self._clip("video-a", "video-a_c000002", 12.0, 20.0),
                self._clip("video-a", "video-a_c000003", 21.0, 30.0),
                self._clip("video-b", "video-b_c000001", 0.0, 10.0),
            ]
            self._write_jsonl(output_dir / "clip_manifest.jsonl", clips)
            results = [
                self._result(
                    "video-a",
                    "video-a_c000001",
                    "/videos/a.mp4",
                    0.0,
                    10.0,
                    "09:00:01",
                    [{"event_type": "queue_detected", "start_offset_seconds": 1.0, "end_offset_seconds": 10.0}],
                ),
                self._result(
                    "video-a",
                    "video-a_c000002",
                    "/videos/a.mp4",
                    12.0,
                    20.0,
                    "09:00:13",
                    [{"event_type": "queue_detected", "start_offset_seconds": 12.0, "end_offset_seconds": 16.0}],
                ),
                self._result(
                    "video-a",
                    "video-a_c000003",
                    "/videos/a.mp4",
                    21.0,
                    30.0,
                    "09:00:22",
                    [{"event_type": "staff_absent", "start_offset_seconds": 21.0, "end_offset_seconds": 25.0}],
                ),
                # Hand-written failed-inference record: it deliberately carries a
                # sparser monitoring_timeline than the successful ``_result`` ones.
                {
                    "schema_version": "local-batch-v1",
                    "video_id": "video-b",
                    "video_path": "/videos/b.mp4",
                    "clip_id": "video-b_c000001",
                    "status": "inference_failed",
                    "monitoring_timeline": {
                        "video_start_time": None,
                        "clip_start_seconds": 0.0,
                        "clip_end_seconds": 10.0,
                        "frame_times": [],
                        "screen_time": "",
                    },
                    "events": [],
                    "raw_response": "",
                    "processing": {},
                    "error": "offline",
                },
            ]
            self._write_jsonl(output_dir / "clip_results.jsonl", results)

            aggregate_outputs(
                output_dir,
                {
                    "input": {"dir": "/videos"},
                    "schema": {"version": "local-batch-v1", "merge_gap_seconds": 3},
                    "runtime": {"timezone": "Asia/Shanghai"},
                },
            )

            video_result_path = output_dir / "videos" / "video-a" / "video_result.json"
            self.assertTrue(video_result_path.exists())
            video_result = self._read_json(video_result_path)
            self.assertEqual(video_result["schema_version"], "local-batch-v1")
            self.assertEqual(video_result["video_id"], "video-a")
            self.assertEqual(video_result["video_path"], "/videos/a.mp4")
            self.assertEqual(video_result["probe"]["codec_name"], "h264")
            self.assertIsNone(video_result["monitoring_timeline"]["video_start_time"])
            self.assertEqual(video_result["monitoring_timeline"]["video_duration_seconds"], 40.0)
            self.assertEqual(video_result["clip_count"], 3)
            self.assertEqual(video_result["failed_clip_count"], 0)
            self.assertEqual(video_result["event_counts"], {"queue_detected": 1, "staff_absent": 1})
            self.assertEqual(len(video_result["events"]), 2)

            # The two queue_detected events (1-10 s and 12-16 s) collapse into one.
            merged = video_result["events"][0]
            self.assertEqual(merged["event_type"], "queue_detected")
            self.assertEqual(merged["start_offset_seconds"], 1.0)
            self.assertEqual(merged["end_offset_seconds"], 16.0)
            self.assertEqual(merged["screen_times"], ["09:00:01", "09:00:13"])
            self.assertEqual(merged["evidence"]["clip_ids"], ["video-a_c000001", "video-a_c000002"])
            self.assertEqual(
                [clip["clip_start_beijing_time"] for clip in merged["evidence"]["clips"]],
                ["2026-06-15 07:00:00", "2026-06-15 07:00:12"],
            )
            self.assertEqual(
                [clip["clip_end_beijing_time"] for clip in merged["evidence"]["clips"]],
                ["2026-06-15 07:00:10", "2026-06-15 07:00:20"],
            )
            self.assertEqual(video_result["outputs"]["clip_results_jsonl"], "clip_results.jsonl")
            self.assertIn("started_at", video_result["processing"])
            self.assertIn("finished_at", video_result["processing"])

            failed_video_result = self._read_json(
                output_dir / "videos" / "video-b" / "video_result.json"
            )
            self.assertEqual(failed_video_result["clip_count"], 1)
            self.assertEqual(failed_video_result["failed_clip_count"], 1)
            self.assertEqual(failed_video_result["event_counts"], {})

            folder_summary = self._read_json(output_dir / "folder_summary.json")
            self.assertEqual(folder_summary["schema_version"], "local-batch-v1")
            self.assertEqual(folder_summary["input_dir"], "/videos")
            self.assertEqual(folder_summary["video_count"], 2)
            self.assertEqual(folder_summary["processed_video_count"], 1)
            self.assertEqual(folder_summary["failed_video_count"], 1)
            self.assertEqual(folder_summary["event_counts"], {"queue_detected": 1, "staff_absent": 1})
            self.assertEqual(
                [video["video_id"] for video in folder_summary["videos"]],
                ["video-a", "video-b"],
            )
            self.assertIn("processing", folder_summary)

    def test_ffprobe_start_time_is_not_treated_as_monitoring_timeline_start(self):
        """A probe ``start_time`` is kept under ``probe`` but not used as the monitoring start."""
        with tempfile.TemporaryDirectory() as tmp:
            output_dir = Path(tmp)
            self._write_jsonl(
                output_dir / "video_manifest.jsonl",
                [
                    {
                        "video_id": "video-local",
                        "path": "/videos/local.mp4",
                        "status": "probed",
                        "duration_seconds": 12.0,
                        "start_time": 0.0,
                    }
                ],
            )
            self._write_jsonl(
                output_dir / "clip_manifest.jsonl",
                [self._clip("video-local", "video-local_c000001", 0.0, 10.0)],
            )
            self._write_jsonl(output_dir / "clip_results.jsonl", [])

            aggregate_outputs(
                output_dir,
                {
                    "input": {"dir": "/videos"},
                    "schema": {"version": "local-batch-v1", "merge_gap_seconds": 3},
                },
            )

            video_result = self._read_json(
                output_dir / "videos" / "video-local" / "video_result.json"
            )
            self.assertEqual(video_result["probe"]["start_time"], 0.0)
            self.assertIsNone(video_result["monitoring_timeline"]["video_start_time"])

    def test_does_not_merge_different_event_types_videos_or_large_gaps(self):
        """Events stay separate across event types, across videos, and across wide gaps.

        a1/a2 are both ``queue_detected`` but 35 s apart (well beyond
        ``merge_gap_seconds=3``); a3 is a different type; b1 belongs to
        another video.
        """
        with tempfile.TemporaryDirectory() as tmp:
            output_dir = Path(tmp)
            self._write_jsonl(
                output_dir / "video_manifest.jsonl",
                [
                    {"video_id": "video-a", "path": "/videos/a.mp4", "status": "probed"},
                    {"video_id": "video-b", "path": "/videos/b.mp4", "status": "probed"},
                ],
            )
            self._write_jsonl(
                output_dir / "clip_manifest.jsonl",
                [
                    self._clip("video-a", "a1", 0.0, 10.0),
                    self._clip("video-a", "a2", 40.0, 50.0),
                    self._clip("video-a", "a3", 51.0, 60.0),
                    self._clip("video-b", "b1", 0.0, 10.0),
                ],
            )
            self._write_jsonl(
                output_dir / "clip_results.jsonl",
                [
                    self._result(
                        "video-a", "a1", "/videos/a.mp4", 0.0, 10.0, "",
                        [{"event_type": "queue_detected", "start_offset_seconds": 1.0, "end_offset_seconds": 5.0}],
                    ),
                    self._result(
                        "video-a", "a2", "/videos/a.mp4", 40.0, 50.0, "",
                        [{"event_type": "queue_detected", "start_offset_seconds": 40.0, "end_offset_seconds": 45.0}],
                    ),
                    self._result(
                        "video-a", "a3", "/videos/a.mp4", 51.0, 60.0, "",
                        [{"event_type": "staff_absent", "start_offset_seconds": 51.0, "end_offset_seconds": 55.0}],
                    ),
                    self._result(
                        "video-b", "b1", "/videos/b.mp4", 0.0, 10.0, "",
                        [{"event_type": "queue_detected", "start_offset_seconds": 1.0, "end_offset_seconds": 5.0}],
                    ),
                ],
            )

            aggregate_outputs(
                output_dir,
                {
                    "input": {"dir": "/videos"},
                    "schema": {"version": "local-batch-v1", "merge_gap_seconds": 3},
                },
            )

            video_a = self._read_json(output_dir / "videos" / "video-a" / "video_result.json")
            video_b = self._read_json(output_dir / "videos" / "video-b" / "video_result.json")
            self.assertEqual(len(video_a["events"]), 3)
            self.assertEqual(video_a["event_counts"], {"queue_detected": 2, "staff_absent": 1})
            self.assertEqual(len(video_b["events"]), 1)
            self.assertEqual(video_b["event_counts"], {"queue_detected": 1})

    @staticmethod
    def _timecode(seconds):
        """Format an offset in seconds as ``HH:MM:SS`` (fractional part truncated)."""
        hours, remainder = divmod(int(seconds), 3600)
        minutes, secs = divmod(remainder, 60)
        return f"{hours:02d}:{minutes:02d}:{secs:02d}"

    def _clip(self, video_id, clip_id, start, end):
        """Build a ``clip_manifest.jsonl`` record with ``status == "pending"``.

        Timecodes are derived from ``start``/``end`` so they agree with the
        second offsets; the single frame is placed at the clip start.
        """
        start_timecode = self._timecode(start)
        return {
            "video_id": video_id,
            "clip_id": clip_id,
            "clip_start_seconds": start,
            "clip_end_seconds": end,
            "clip_start_timecode": start_timecode,
            "clip_end_timecode": self._timecode(end),
            "frame_times": [
                {
                    "frame_path": f"frames/{video_id}/{clip_id}.jpg",
                    "offset_seconds": start,
                    "timecode": start_timecode,
                }
            ],
            "status": "pending",
        }

    def _result(self, video_id, clip_id, video_path, start, end, screen_time, events):
        """Build a successful (``status == "ok"``) ``clip_results.jsonl`` record.

        ``clip_*_beijing_time`` values are ``_BEIJING_BASE`` shifted by the
        clip offsets and formatted ``%Y-%m-%d %H:%M:%S``; timecodes are derived
        from the offsets; ``video_start_time`` is ``None``; ``events`` is
        embedded as given.
        """
        clip_start_beijing_time = (self._BEIJING_BASE + timedelta(seconds=start)).strftime(
            "%Y-%m-%d %H:%M:%S"
        )
        clip_end_beijing_time = (self._BEIJING_BASE + timedelta(seconds=end)).strftime(
            "%Y-%m-%d %H:%M:%S"
        )
        start_timecode = self._timecode(start)
        return {
            "schema_version": "local-batch-v1",
            "video_id": video_id,
            "video_path": video_path,
            "clip_id": clip_id,
            "status": "ok",
            "monitoring_timeline": {
                "video_start_time": None,
                "clip_start_seconds": start,
                "clip_end_seconds": end,
                "clip_start_timecode": start_timecode,
                "clip_end_timecode": self._timecode(end),
                "clip_start_beijing_time": clip_start_beijing_time,
                "clip_end_beijing_time": clip_end_beijing_time,
                "frame_times": [
                    {
                        "frame_path": f"frames/{video_id}/{clip_id}.jpg",
                        "offset_seconds": start,
                        "timecode": start_timecode,
                        "beijing_time": clip_start_beijing_time,
                    }
                ],
                "screen_time": screen_time,
            },
            "events": events,
            "raw_response": "{}",
            "processing": {},
            "error": None,
        }

    def _read_json(self, path):
        """Load and return the UTF-8 JSON document stored at ``path``."""
        return json.loads(path.read_text(encoding="utf-8"))

    def _write_jsonl(self, path, records):
        """Write ``records`` to ``path`` as JSON Lines (one sorted-key object per line)."""
        path.write_text(
            "".join(json.dumps(record, sort_keys=True) + "\n" for record in records),
            encoding="utf-8",
        )
if __name__ == "__main__":
    # Allow `python test_aggregator.py` to run this module's tests through
    # unittest's command-line runner; "__main__" is unittest.main's default module.
    unittest.main(module="__main__")