import json import subprocess import tempfile import unittest from pathlib import Path from unittest.mock import patch from video_ai_analysis_poc.ffmpeg_sampler import ( build_sample_command, sample_video_frames, ) class FfmpegSamplerTests(unittest.TestCase): def test_build_sample_command_uses_nvdec_decoder_for_h264(self): with tempfile.TemporaryDirectory() as tmp: output_dir = Path(tmp) / "output" command = build_sample_command( Path("/tmp/input.mp4"), output_dir, "video-abc", { "prefer_nvdec": True, "allow_cpu_fallback": False, "hwaccel": "cuda", "codec_decoders": {"h264": "h264_cuvid", "hevc": "hevc_cuvid"}, "frame_fps": 1, "frame_width": 640, "jpeg_quality": 4, }, codec_name="h264", ) self.assertIn("-hwaccel", command) self.assertIn("cuda", command) self.assertIn("-c:v", command) self.assertIn("h264_cuvid", command) self.assertEqual(command[-1], str(output_dir / "frames" / "video-abc" / "%06d.jpg")) def test_build_sample_command_uses_nvdec_decoder_for_hevc(self): with tempfile.TemporaryDirectory() as tmp: command = build_sample_command( Path("/tmp/input.mp4"), Path(tmp) / "output", "video-abc", { "prefer_nvdec": True, "allow_cpu_fallback": False, "hwaccel": "cuda", "codec_decoders": {"h264": "h264_cuvid", "hevc": "hevc_cuvid"}, "frame_fps": 1, "frame_width": 640, "jpeg_quality": 4, }, codec_name="hevc", ) self.assertIn("-hwaccel", command) self.assertIn("cuda", command) self.assertIn("-c:v", command) self.assertIn("hevc_cuvid", command) def test_build_sample_command_refuses_cpu_fallback_by_default(self): with tempfile.TemporaryDirectory() as tmp: with self.assertRaisesRegex(ValueError, "NVDEC decoder is required"): build_sample_command( Path("/tmp/input.mp4"), Path(tmp), "video-abc", { "prefer_nvdec": True, "allow_cpu_fallback": False, "codec_decoders": {"h264": "h264_cuvid", "hevc": "hevc_cuvid"}, }, codec_name="vp9", ) def test_sample_video_frames_writes_structured_failure_record(self): with tempfile.TemporaryDirectory() as tmp: root = Path(tmp) manifest_path = root / "frame_manifest.jsonl" failure = subprocess.CalledProcessError( returncode=1, cmd=["ffmpeg"], stderr="No decoder h264_cuvid", ) with patch("subprocess.run", side_effect=failure): records = sample_video_frames( { "video_id": "video-abc", "path": str(root / "input.mp4"), "codec_name": "h264", }, root, { "prefer_nvdec": True, "allow_cpu_fallback": False, "hwaccel": "cuda", "codec_decoders": {"h264": "h264_cuvid"}, "frame_fps": 1, "frame_width": 640, "jpeg_quality": 4, "timeout_seconds_per_video": 30, }, manifest_path=manifest_path, ) self.assertEqual(len(records), 1) self.assertEqual(records[0]["video_id"], "video-abc") self.assertEqual(records[0]["status"], "sample_failed") self.assertIn("h264_cuvid", records[0]["last_error"]) persisted = [ json.loads(line) for line in manifest_path.read_text(encoding="utf-8").splitlines() ] self.assertEqual(persisted, records) def test_sample_video_frames_persists_success_nvdec_evidence(self): with tempfile.TemporaryDirectory() as tmp: root = Path(tmp) manifest_path = root / "frame_manifest.jsonl" video_id = "video-abc" frame_dir = root / "frames" / video_id def run_success(*args, **kwargs): frame_dir.mkdir(parents=True, exist_ok=True) (frame_dir / "000001.jpg").write_bytes(b"jpg") return subprocess.CompletedProcess( args=args[0], returncode=0, stdout="", stderr="Using decoder h264_cuvid with hwaccel cuda", ) with patch("subprocess.run", side_effect=run_success): records = sample_video_frames( { "video_id": video_id, "path": str(root / "input.mp4"), "codec_name": "h264", }, root, { "prefer_nvdec": True, "allow_cpu_fallback": False, "hwaccel": "cuda", "codec_decoders": {"h264": "h264_cuvid"}, "frame_fps": 1, "frame_width": 640, "jpeg_quality": 4, "timeout_seconds_per_video": 30, }, manifest_path=manifest_path, ) self.assertEqual(records[0]["status"], "sampled") self.assertEqual(records[0]["decoder"], "h264_cuvid") self.assertEqual(records[0]["hwaccel"], "cuda") self.assertIn("h264_cuvid", records[0]["ffmpeg_command"]) self.assertIn("Using decoder h264_cuvid", records[0]["stderr_summary"]) persisted = [ json.loads(line) for line in manifest_path.read_text(encoding="utf-8").splitlines() ] self.assertEqual(persisted, records) def test_sample_video_frames_adds_beijing_time_from_hik_actual_begin(self): with tempfile.TemporaryDirectory() as tmp: root = Path(tmp) video_id = "video-abc" frame_dir = root / "frames" / video_id def run_success(command, *args, **kwargs): frame_dir.mkdir(parents=True, exist_ok=True) (frame_dir / "000001.jpg").write_bytes(b"jpg") (frame_dir / "000002.jpg").write_bytes(b"jpg") return subprocess.CompletedProcess( args=command, returncode=0, stdout="", stderr="", ) with patch("subprocess.run", side_effect=run_success): records = sample_video_frames( { "video_id": video_id, "path": str(root / "input.mp4"), "codec_name": "h264", "actual_begin": 1781478000, "actual_end": 1781478600, }, root, { "prefer_nvdec": True, "allow_cpu_fallback": False, "hwaccel": "cuda", "codec_decoders": {"h264": "h264_cuvid"}, "frame_fps": 1, "frame_width": 640, "jpeg_quality": 4, "timeout_seconds_per_video": 30, "timezone": "Asia/Shanghai", }, ) self.assertEqual(records[0]["beijing_time"], "2026-06-15 07:00:00") self.assertEqual(records[1]["beijing_time"], "2026-06-15 07:00:01") def test_sample_video_frames_caps_output_frames_to_requested_duration(self): with tempfile.TemporaryDirectory() as tmp: root = Path(tmp) video_id = "video-abc" frame_dir = root / "frames" / video_id captured_command = [] def run_success(command, *args, **kwargs): captured_command.extend(command) frame_dir.mkdir(parents=True, exist_ok=True) (frame_dir / "000001.jpg").write_bytes(b"jpg") return subprocess.CompletedProcess( args=command, returncode=0, stdout="", stderr="", ) with patch("subprocess.run", side_effect=run_success): sample_video_frames( { "video_id": video_id, "path": str(root / "input.mp4"), "codec_name": "hevc", "requested_begin": 1000, "requested_end": 1600, }, root, { "prefer_nvdec": True, "allow_cpu_fallback": False, "hwaccel": "cuda", "codec_decoders": {"hevc": "hevc_cuvid"}, "frame_fps": 1, "frame_width": 640, "jpeg_quality": 4, "timeout_seconds_per_video": 30, }, ) self.assertIn("-frames:v", captured_command) frames_flag_index = captured_command.index("-frames:v") self.assertEqual(captured_command[frames_flag_index + 1], "601") def test_sample_video_frames_limits_decode_window_to_requested_duration(self): with tempfile.TemporaryDirectory() as tmp: root = Path(tmp) video_id = "video-abc" frame_dir = root / "frames" / video_id captured_command = [] def run_success(command, *args, **kwargs): captured_command.extend(command) frame_dir.mkdir(parents=True, exist_ok=True) (frame_dir / "000001.jpg").write_bytes(b"jpg") return subprocess.CompletedProcess( args=command, returncode=0, stdout="", stderr="", ) with patch("subprocess.run", side_effect=run_success): sample_video_frames( { "video_id": video_id, "path": str(root / "input.mp4"), "codec_name": "hevc", "requested_begin": 1000, "requested_end": 1600, "duration_seconds": 104259.921, }, root, { "prefer_nvdec": True, "allow_cpu_fallback": False, "hwaccel": "cuda", "codec_decoders": {"hevc": "hevc_cuvid"}, "frame_fps": 1, "frame_width": 640, "jpeg_quality": 4, "timeout_seconds_per_video": 30, }, ) self.assertIn("-t", captured_command) input_index = captured_command.index("-i") t_flag_index = captured_command.index("-t") vf_index = captured_command.index("-vf") self.assertLess(input_index, t_flag_index) self.assertLess(t_flag_index, vf_index) self.assertEqual(captured_command[t_flag_index + 1], "600") def test_sample_video_frames_uses_complete_frames_when_ffmpeg_exits_nonzero(self): with tempfile.TemporaryDirectory() as tmp: root = Path(tmp) video_id = "video-abc" frame_dir = root / "frames" / video_id manifest_path = root / "frame_manifest.jsonl" def run_with_nonzero_exit(command, *args, **kwargs): frame_dir.mkdir(parents=True, exist_ok=True) for index in range(1, 602): (frame_dir / f"{index:06d}.jpg").write_bytes(b"jpg") raise subprocess.CalledProcessError( returncode=1, cmd=command, stderr="trailing decoder error after requested frames", ) with patch("subprocess.run", side_effect=run_with_nonzero_exit): records = sample_video_frames( { "video_id": video_id, "path": str(root / "input.mp4"), "codec_name": "hevc", "requested_begin": 1000, "requested_end": 1600, }, root, { "prefer_nvdec": True, "allow_cpu_fallback": False, "hwaccel": "cuda", "codec_decoders": {"hevc": "hevc_cuvid"}, "frame_fps": 1, "frame_width": 640, "jpeg_quality": 4, "timeout_seconds_per_video": 30, }, manifest_path=manifest_path, ) self.assertEqual(len(records), 601) self.assertEqual({record["status"] for record in records}, {"sampled"}) self.assertIn("-t", records[0]["ffmpeg_command"]) self.assertIn("trailing decoder error", records[0]["stderr_summary"]) persisted = [ json.loads(line) for line in manifest_path.read_text(encoding="utf-8").splitlines() ] self.assertEqual(persisted, records) def test_sample_video_frames_uses_nearly_complete_frames_when_ffmpeg_exits_nonzero(self): with tempfile.TemporaryDirectory() as tmp: root = Path(tmp) video_id = "video-abc" frame_dir = root / "frames" / video_id def run_with_nonzero_exit(command, *args, **kwargs): frame_dir.mkdir(parents=True, exist_ok=True) for index in range(1, 597): (frame_dir / f"{index:06d}.jpg").write_bytes(b"jpg") raise subprocess.CalledProcessError( returncode=1, cmd=command, stderr="trailing decoder error after near-complete chunk", ) with patch("subprocess.run", side_effect=run_with_nonzero_exit): records = sample_video_frames( { "video_id": video_id, "path": str(root / "input.mp4"), "codec_name": "hevc", "actual_begin": 1000, "actual_end": 1600, }, root, { "prefer_nvdec": True, "allow_cpu_fallback": False, "hwaccel": "cuda", "codec_decoders": {"hevc": "hevc_cuvid"}, "frame_fps": 1, "frame_width": 640, "jpeg_quality": 4, "timeout_seconds_per_video": 30, }, ) self.assertEqual(len(records), 596) self.assertEqual({record["status"] for record in records}, {"sampled"}) self.assertIn("near-complete chunk", records[0]["stderr_summary"]) if __name__ == "__main__": unittest.main()