310 lines
13 KiB
Python
310 lines
13 KiB
Python
import json
|
|
import tempfile
|
|
import unittest
|
|
from datetime import datetime, timedelta
|
|
from pathlib import Path
|
|
|
|
from video_ai_analysis_poc.aggregator import aggregate_outputs
|
|
|
|
|
|
class AggregatorTests(unittest.TestCase):
    """Tests for ``aggregate_outputs`` driven by on-disk JSONL fixtures.

    Each test writes ``video_manifest.jsonl``, ``clip_manifest.jsonl`` and
    ``clip_results.jsonl`` into a temporary directory, runs
    ``aggregate_outputs`` on it, and inspects the JSON files it produces:
    ``videos/<video_id>/video_result.json`` and ``folder_summary.json``.
    """

    # Wall-clock time that ``_result`` maps to second 0 of every video when
    # filling the ``*_beijing_time`` fields (a naive datetime; no tz attached).
    _BASE_BEIJING_TIME = datetime(2026, 6, 15, 7, 0, 0)
    _BEIJING_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"

    def test_aggregates_video_results_folder_summary_and_merges_adjacent_events(self):
        """Merge nearby same-type events and summarize probed and failed videos.

        video-a: two ``queue_detected`` events separated by a 2 s gap (within
        ``merge_gap_seconds=3``) must collapse into one, while the later
        ``staff_absent`` event stays separate. video-b failed probing and its
        only clip failed inference.
        """
        with tempfile.TemporaryDirectory() as tmp:
            output_dir = Path(tmp)

            video_a = {
                "video_id": "video-a",
                "path": "/videos/a.mp4",
                "status": "probed",
                "duration_seconds": 40.0,
                "codec_name": "h264",
                "width": 1920,
                "height": 1080,
            }
            video_b = {
                "video_id": "video-b",
                "path": "/videos/b.mp4",
                "status": "probe_failed",
                "last_error": "bad file",
            }
            self._write_jsonl(output_dir / "video_manifest.jsonl", [video_a, video_b])

            clips = [
                self._clip("video-a", "video-a_c000001", 0.0, 10.0),
                self._clip("video-a", "video-a_c000002", 12.0, 20.0),
                self._clip("video-a", "video-a_c000003", 21.0, 30.0),
                self._clip("video-b", "video-b_c000001", 0.0, 10.0),
            ]
            self._write_jsonl(output_dir / "clip_manifest.jsonl", clips)

            results = [
                self._result(
                    "video-a",
                    "video-a_c000001",
                    "/videos/a.mp4",
                    0.0,
                    10.0,
                    "09:00:01",
                    [self._event("queue_detected", 1.0, 10.0)],
                ),
                # Starts 2 s after the previous event ends -> expected to merge.
                self._result(
                    "video-a",
                    "video-a_c000002",
                    "/videos/a.mp4",
                    12.0,
                    20.0,
                    "09:00:13",
                    [self._event("queue_detected", 12.0, 16.0)],
                ),
                # Different event type -> never merged with queue_detected.
                self._result(
                    "video-a",
                    "video-a_c000003",
                    "/videos/a.mp4",
                    21.0,
                    30.0,
                    "09:00:22",
                    [self._event("staff_absent", 21.0, 25.0)],
                ),
                # A failed inference record: no events and an error message.
                {
                    "schema_version": "local-batch-v1",
                    "video_id": "video-b",
                    "video_path": "/videos/b.mp4",
                    "clip_id": "video-b_c000001",
                    "status": "inference_failed",
                    "monitoring_timeline": {
                        "video_start_time": None,
                        "clip_start_seconds": 0.0,
                        "clip_end_seconds": 10.0,
                        "frame_times": [],
                        "screen_time": "",
                    },
                    "events": [],
                    "raw_response": "",
                    "processing": {},
                    "error": "offline",
                },
            ]
            self._write_jsonl(output_dir / "clip_results.jsonl", results)

            aggregate_outputs(
                output_dir,
                {
                    "input": {"dir": "/videos"},
                    "schema": {"version": "local-batch-v1", "merge_gap_seconds": 3},
                    "runtime": {"timezone": "Asia/Shanghai"},
                },
            )

            video_result_path = output_dir / "videos" / "video-a" / "video_result.json"
            self.assertTrue(video_result_path.exists())
            video_result = self._read_json(video_result_path)
            self.assertEqual(video_result["schema_version"], "local-batch-v1")
            self.assertEqual(video_result["video_id"], "video-a")
            self.assertEqual(video_result["video_path"], "/videos/a.mp4")
            self.assertEqual(video_result["probe"]["codec_name"], "h264")
            self.assertIsNone(video_result["monitoring_timeline"]["video_start_time"])
            self.assertEqual(video_result["monitoring_timeline"]["video_duration_seconds"], 40.0)
            self.assertEqual(video_result["clip_count"], 3)
            self.assertEqual(video_result["failed_clip_count"], 0)
            self.assertEqual(video_result["event_counts"], {"queue_detected": 1, "staff_absent": 1})
            self.assertEqual(len(video_result["events"]), 2)

            merged = video_result["events"][0]
            self.assertEqual(merged["event_type"], "queue_detected")
            self.assertEqual(merged["start_offset_seconds"], 1.0)
            self.assertEqual(merged["end_offset_seconds"], 16.0)
            self.assertEqual(merged["screen_times"], ["09:00:01", "09:00:13"])
            self.assertEqual(merged["evidence"]["clip_ids"], ["video-a_c000001", "video-a_c000002"])

            evidence_clips = merged["evidence"]["clips"]
            self.assertEqual(
                [clip["clip_start_beijing_time"] for clip in evidence_clips],
                ["2026-06-15 07:00:00", "2026-06-15 07:00:12"],
            )
            self.assertEqual(
                [clip["clip_end_beijing_time"] for clip in evidence_clips],
                ["2026-06-15 07:00:10", "2026-06-15 07:00:20"],
            )
            self.assertEqual(video_result["outputs"]["clip_results_jsonl"], "clip_results.jsonl")
            self.assertIn("started_at", video_result["processing"])
            self.assertIn("finished_at", video_result["processing"])

            failed_video_result = self._read_video_result(output_dir, "video-b")
            self.assertEqual(failed_video_result["clip_count"], 1)
            self.assertEqual(failed_video_result["failed_clip_count"], 1)
            self.assertEqual(failed_video_result["event_counts"], {})

            folder_summary = self._read_json(output_dir / "folder_summary.json")
            self.assertEqual(folder_summary["schema_version"], "local-batch-v1")
            self.assertEqual(folder_summary["input_dir"], "/videos")
            self.assertEqual(folder_summary["video_count"], 2)
            self.assertEqual(folder_summary["processed_video_count"], 1)
            self.assertEqual(folder_summary["failed_video_count"], 1)
            self.assertEqual(folder_summary["event_counts"], {"queue_detected": 1, "staff_absent": 1})
            self.assertEqual(
                [video["video_id"] for video in folder_summary["videos"]],
                ["video-a", "video-b"],
            )
            self.assertIn("processing", folder_summary)

    def test_ffprobe_start_time_is_not_treated_as_monitoring_timeline_start(self):
        """A manifest ``start_time`` is kept under ``probe``, not used as ``video_start_time``."""
        with tempfile.TemporaryDirectory() as tmp:
            output_dir = Path(tmp)
            self._write_jsonl(
                output_dir / "video_manifest.jsonl",
                [
                    {
                        "video_id": "video-local",
                        "path": "/videos/local.mp4",
                        "status": "probed",
                        "duration_seconds": 12.0,
                        "start_time": 0.0,
                    }
                ],
            )
            self._write_jsonl(
                output_dir / "clip_manifest.jsonl",
                [self._clip("video-local", "video-local_c000001", 0.0, 10.0)],
            )
            self._write_jsonl(output_dir / "clip_results.jsonl", [])

            aggregate_outputs(
                output_dir,
                {
                    "input": {"dir": "/videos"},
                    "schema": {"version": "local-batch-v1", "merge_gap_seconds": 3},
                },
            )

            video_result = self._read_video_result(output_dir, "video-local")
            self.assertEqual(video_result["probe"]["start_time"], 0.0)
            self.assertIsNone(video_result["monitoring_timeline"]["video_start_time"])

    def test_does_not_merge_different_event_types_videos_or_large_gaps(self):
        """Events stay separate across event types, across videos, and across gaps > 3 s."""
        with tempfile.TemporaryDirectory() as tmp:
            output_dir = Path(tmp)
            self._write_jsonl(
                output_dir / "video_manifest.jsonl",
                [
                    {"video_id": "video-a", "path": "/videos/a.mp4", "status": "probed"},
                    {"video_id": "video-b", "path": "/videos/b.mp4", "status": "probed"},
                ],
            )
            self._write_jsonl(
                output_dir / "clip_manifest.jsonl",
                [
                    self._clip("video-a", "a1", 0.0, 10.0),
                    self._clip("video-a", "a2", 40.0, 50.0),
                    self._clip("video-a", "a3", 51.0, 60.0),
                    self._clip("video-b", "b1", 0.0, 10.0),
                ],
            )
            self._write_jsonl(
                output_dir / "clip_results.jsonl",
                [
                    self._result(
                        "video-a", "a1", "/videos/a.mp4", 0.0, 10.0, "",
                        [self._event("queue_detected", 1.0, 5.0)],
                    ),
                    # 35 s after the a1 event ends: far beyond merge_gap_seconds.
                    self._result(
                        "video-a", "a2", "/videos/a.mp4", 40.0, 50.0, "",
                        [self._event("queue_detected", 40.0, 45.0)],
                    ),
                    # Adjacent in time but a different event type.
                    self._result(
                        "video-a", "a3", "/videos/a.mp4", 51.0, 60.0, "",
                        [self._event("staff_absent", 51.0, 55.0)],
                    ),
                    # Same type and offsets as a1, but in another video.
                    self._result(
                        "video-b", "b1", "/videos/b.mp4", 0.0, 10.0, "",
                        [self._event("queue_detected", 1.0, 5.0)],
                    ),
                ],
            )

            aggregate_outputs(
                output_dir,
                {
                    "input": {"dir": "/videos"},
                    "schema": {"version": "local-batch-v1", "merge_gap_seconds": 3},
                },
            )

            video_a = self._read_video_result(output_dir, "video-a")
            video_b = self._read_video_result(output_dir, "video-b")
            self.assertEqual(len(video_a["events"]), 3)
            self.assertEqual(video_a["event_counts"], {"queue_detected": 2, "staff_absent": 1})
            self.assertEqual(len(video_b["events"]), 1)
            self.assertEqual(video_b["event_counts"], {"queue_detected": 1})

    @staticmethod
    def _timecode(seconds):
        """Format *seconds* as ``HH:MM:SS``, truncating any fractional part."""
        minutes, secs = divmod(int(seconds), 60)
        hours, minutes = divmod(minutes, 60)
        return f"{hours:02d}:{minutes:02d}:{secs:02d}"

    def _beijing_time(self, seconds):
        """Return ``_BASE_BEIJING_TIME`` shifted by *seconds*, as ``YYYY-MM-DD HH:MM:SS``."""
        shifted = self._BASE_BEIJING_TIME + timedelta(seconds=seconds)
        return shifted.strftime(self._BEIJING_TIME_FORMAT)

    @staticmethod
    def _event(event_type, start, end):
        """Build a model event dict covering ``[start, end]`` offset seconds."""
        return {
            "event_type": event_type,
            "start_offset_seconds": start,
            "end_offset_seconds": end,
        }

    def _clip(self, video_id, clip_id, start, end):
        """Build a pending ``clip_manifest.jsonl`` record spanning ``[start, end]`` seconds.

        The record carries one frame sampled at *start*. Its timecodes are
        derived from *start* and *end*, so they agree with the ``*_seconds``
        fields. The previous version hard-coded ``00:00:00``/``00:00:10``
        for every clip.
        """
        start_timecode = self._timecode(start)
        return {
            "video_id": video_id,
            "clip_id": clip_id,
            "clip_start_seconds": start,
            "clip_end_seconds": end,
            "clip_start_timecode": start_timecode,
            "clip_end_timecode": self._timecode(end),
            "frame_times": [
                {
                    "frame_path": f"frames/{video_id}/{clip_id}.jpg",
                    "offset_seconds": start,
                    "timecode": start_timecode,
                }
            ],
            "status": "pending",
        }

    def _result(self, video_id, clip_id, video_path, start, end, screen_time, events):
        """Build a successful (``status: ok``) ``clip_results.jsonl`` record.

        The ``*_beijing_time`` fields are ``_BASE_BEIJING_TIME`` shifted by
        *start*/*end* seconds. The timecodes are derived from the same seconds.
        *events* is embedded as-is.
        """
        clip_start_beijing_time = self._beijing_time(start)
        start_timecode = self._timecode(start)
        return {
            "schema_version": "local-batch-v1",
            "video_id": video_id,
            "video_path": video_path,
            "clip_id": clip_id,
            "status": "ok",
            "monitoring_timeline": {
                "video_start_time": None,
                "clip_start_seconds": start,
                "clip_end_seconds": end,
                "clip_start_timecode": start_timecode,
                "clip_end_timecode": self._timecode(end),
                "clip_start_beijing_time": clip_start_beijing_time,
                "clip_end_beijing_time": self._beijing_time(end),
                "frame_times": [
                    {
                        "frame_path": f"frames/{video_id}/{clip_id}.jpg",
                        "offset_seconds": start,
                        "timecode": start_timecode,
                        "beijing_time": clip_start_beijing_time,
                    }
                ],
                "screen_time": screen_time,
            },
            "events": events,
            "raw_response": "{}",
            "processing": {},
            "error": None,
        }

    @staticmethod
    def _read_json(path):
        """Parse the UTF-8 JSON document stored at *path*."""
        return json.loads(path.read_text(encoding="utf-8"))

    def _read_video_result(self, output_dir, video_id):
        """Load ``<output_dir>/videos/<video_id>/video_result.json``."""
        return self._read_json(output_dir / "videos" / video_id / "video_result.json")

    def _write_jsonl(self, path, records):
        """Write *records* to *path* as JSON Lines, one sorted-key object per line."""
        path.write_text(
            "".join(json.dumps(record, sort_keys=True) + "\n" for record in records),
            encoding="utf-8",
        )
|
|
|
|
|
|
if __name__ == "__main__":
    # Run this module's test cases when it is executed as a script.
    # verbosity=1 is unittest.main's default; it is spelled out here for clarity.
    unittest.main(verbosity=1)
|