Files
video-ai-analysis/tests/test_ffmpeg_sampler.py
2026-06-18 03:27:09 +08:00

400 lines
16 KiB
Python

import json
import subprocess
import tempfile
import unittest
from pathlib import Path
from unittest.mock import patch
from video_ai_analysis_poc.ffmpeg_sampler import (
build_sample_command,
sample_video_frames,
)
class FfmpegSamplerTests(unittest.TestCase):
    """Tests for ``build_sample_command`` and ``sample_video_frames``.

    Every test that calls ``sample_video_frames`` patches ``subprocess.run``
    with a stub, and all files are written under a temporary directory.
    """

    @staticmethod
    def _nvdec_config(codec_decoders, **extra):
        """Build the sampler config dict shared by most tests.

        The seven base keys are inserted first, followed by ``extra`` in the
        order the keyword arguments were given.
        """
        config = {
            "prefer_nvdec": True,
            "allow_cpu_fallback": False,
            "hwaccel": "cuda",
            "codec_decoders": codec_decoders,
            "frame_fps": 1,
            "frame_width": 640,
            "jpeg_quality": 4,
        }
        config.update(extra)
        return config

    @staticmethod
    def _write_fake_frames(frame_dir, count):
        """Create ``count`` placeholder frames ``000001.jpg`` .. in ``frame_dir``."""
        frame_dir.mkdir(parents=True, exist_ok=True)
        for number in range(1, count + 1):
            (frame_dir / f"{number:06d}.jpg").write_bytes(b"jpg")

    @staticmethod
    def _completed(command, stderr=""):
        """Return a zero-exit ``CompletedProcess`` for ``command``."""
        return subprocess.CompletedProcess(
            args=command,
            returncode=0,
            stdout="",
            stderr=stderr,
        )

    @staticmethod
    def _load_manifest(manifest_path):
        """Parse a JSONL manifest file into a list, one object per line."""
        text = manifest_path.read_text(encoding="utf-8")
        return [json.loads(line) for line in text.splitlines()]

    def test_build_sample_command_uses_nvdec_decoder_for_h264(self):
        """An h264 input produces a cuda / h264_cuvid command ending in the frame pattern."""
        with tempfile.TemporaryDirectory() as tmp:
            output_dir = Path(tmp) / "output"
            config = self._nvdec_config(
                {"h264": "h264_cuvid", "hevc": "hevc_cuvid"}
            )

            command = build_sample_command(
                Path("/tmp/input.mp4"),
                output_dir,
                "video-abc",
                config,
                codec_name="h264",
            )

            for token in ("-hwaccel", "cuda", "-c:v", "h264_cuvid"):
                self.assertIn(token, command)
            expected_pattern = output_dir / "frames" / "video-abc" / "%06d.jpg"
            self.assertEqual(command[-1], str(expected_pattern))

    def test_build_sample_command_uses_nvdec_decoder_for_hevc(self):
        """An hevc input produces a cuda / hevc_cuvid command."""
        with tempfile.TemporaryDirectory() as tmp:
            config = self._nvdec_config(
                {"h264": "h264_cuvid", "hevc": "hevc_cuvid"}
            )

            command = build_sample_command(
                Path("/tmp/input.mp4"),
                Path(tmp) / "output",
                "video-abc",
                config,
                codec_name="hevc",
            )

            for token in ("-hwaccel", "cuda", "-c:v", "hevc_cuvid"):
                self.assertIn(token, command)

    def test_build_sample_command_refuses_cpu_fallback_by_default(self):
        """A codec with no mapped decoder (vp9) raises ``ValueError``."""
        # This config deliberately omits hwaccel and the frame_* keys.
        config = {
            "prefer_nvdec": True,
            "allow_cpu_fallback": False,
            "codec_decoders": {"h264": "h264_cuvid", "hevc": "hevc_cuvid"},
        }
        with tempfile.TemporaryDirectory() as tmp, self.assertRaisesRegex(
            ValueError, "NVDEC decoder is required"
        ):
            build_sample_command(
                Path("/tmp/input.mp4"),
                Path(tmp),
                "video-abc",
                config,
                codec_name="vp9",
            )

    def test_sample_video_frames_writes_structured_failure_record(self):
        """A failing ffmpeg call yields one ``sample_failed`` record, also persisted."""
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            manifest_path = root / "frame_manifest.jsonl"
            ffmpeg_error = subprocess.CalledProcessError(
                returncode=1,
                cmd=["ffmpeg"],
                stderr="No decoder h264_cuvid",
            )
            video = {
                "video_id": "video-abc",
                "path": str(root / "input.mp4"),
                "codec_name": "h264",
            }
            config = self._nvdec_config(
                {"h264": "h264_cuvid"},
                timeout_seconds_per_video=30,
            )

            with patch("subprocess.run", side_effect=ffmpeg_error):
                records = sample_video_frames(
                    video,
                    root,
                    config,
                    manifest_path=manifest_path,
                )

            self.assertEqual(len(records), 1)
            self.assertEqual(records[0]["video_id"], "video-abc")
            self.assertEqual(records[0]["status"], "sample_failed")
            self.assertIn("h264_cuvid", records[0]["last_error"])
            persisted = self._load_manifest(manifest_path)
            self.assertEqual(persisted, records)

    def test_sample_video_frames_persists_success_nvdec_evidence(self):
        """A successful run records decoder, hwaccel, command and stderr summary."""
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            manifest_path = root / "frame_manifest.jsonl"
            video_id = "video-abc"
            frame_dir = root / "frames" / video_id

            def run_success(*args, **kwargs):
                # Stand-in for ffmpeg: write one frame and report the decoder on stderr.
                self._write_fake_frames(frame_dir, 1)
                return self._completed(
                    args[0],
                    stderr="Using decoder h264_cuvid with hwaccel cuda",
                )

            video = {
                "video_id": video_id,
                "path": str(root / "input.mp4"),
                "codec_name": "h264",
            }
            config = self._nvdec_config(
                {"h264": "h264_cuvid"},
                timeout_seconds_per_video=30,
            )

            with patch("subprocess.run", side_effect=run_success):
                records = sample_video_frames(
                    video,
                    root,
                    config,
                    manifest_path=manifest_path,
                )

            first = records[0]
            self.assertEqual(first["status"], "sampled")
            self.assertEqual(first["decoder"], "h264_cuvid")
            self.assertEqual(first["hwaccel"], "cuda")
            self.assertIn("h264_cuvid", first["ffmpeg_command"])
            self.assertIn("Using decoder h264_cuvid", first["stderr_summary"])
            persisted = self._load_manifest(manifest_path)
            self.assertEqual(persisted, records)

    def test_sample_video_frames_adds_beijing_time_from_hik_actual_begin(self):
        """Records carry ``beijing_time`` one second apart, starting at ``actual_begin``."""
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            video_id = "video-abc"
            frame_dir = root / "frames" / video_id

            def run_success(command, *args, **kwargs):
                # Two frames, so two consecutive timestamps can be checked.
                self._write_fake_frames(frame_dir, 2)
                return self._completed(command)

            video = {
                "video_id": video_id,
                "path": str(root / "input.mp4"),
                "codec_name": "h264",
                "actual_begin": 1781478000,
                "actual_end": 1781478600,
            }
            config = self._nvdec_config(
                {"h264": "h264_cuvid"},
                timeout_seconds_per_video=30,
                timezone="Asia/Shanghai",
            )

            with patch("subprocess.run", side_effect=run_success):
                records = sample_video_frames(video, root, config)

            # 1781478000 is 2026-06-14 23:00:00 UTC, i.e. 07:00:00 the next day at UTC+8.
            self.assertEqual(records[0]["beijing_time"], "2026-06-15 07:00:00")
            self.assertEqual(records[1]["beijing_time"], "2026-06-15 07:00:01")

    def test_sample_video_frames_caps_output_frames_to_requested_duration(self):
        """A 600 s requested window at 1 fps passes ``-frames:v 601`` to ffmpeg."""
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            video_id = "video-abc"
            frame_dir = root / "frames" / video_id
            captured_command = []

            def run_success(command, *args, **kwargs):
                # Record the argument list so the flags can be inspected afterwards.
                captured_command.extend(command)
                self._write_fake_frames(frame_dir, 1)
                return self._completed(command)

            video = {
                "video_id": video_id,
                "path": str(root / "input.mp4"),
                "codec_name": "hevc",
                "requested_begin": 1000,
                "requested_end": 1600,
            }
            config = self._nvdec_config(
                {"hevc": "hevc_cuvid"},
                timeout_seconds_per_video=30,
            )

            with patch("subprocess.run", side_effect=run_success):
                sample_video_frames(video, root, config)

            self.assertIn("-frames:v", captured_command)
            frames_flag_index = captured_command.index("-frames:v")
            self.assertEqual(captured_command[frames_flag_index + 1], "601")

    def test_sample_video_frames_limits_decode_window_to_requested_duration(self):
        """``-t 600`` appears after ``-i`` and before ``-vf`` for a 600 s window."""
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            video_id = "video-abc"
            frame_dir = root / "frames" / video_id
            captured_command = []

            def run_success(command, *args, **kwargs):
                # Record the argument list so the flag order can be inspected afterwards.
                captured_command.extend(command)
                self._write_fake_frames(frame_dir, 1)
                return self._completed(command)

            video = {
                "video_id": video_id,
                "path": str(root / "input.mp4"),
                "codec_name": "hevc",
                "requested_begin": 1000,
                "requested_end": 1600,
                "duration_seconds": 104259.921,
            }
            config = self._nvdec_config(
                {"hevc": "hevc_cuvid"},
                timeout_seconds_per_video=30,
            )

            with patch("subprocess.run", side_effect=run_success):
                sample_video_frames(video, root, config)

            self.assertIn("-t", captured_command)
            input_index = captured_command.index("-i")
            t_flag_index = captured_command.index("-t")
            vf_index = captured_command.index("-vf")
            self.assertLess(input_index, t_flag_index)
            self.assertLess(t_flag_index, vf_index)
            self.assertEqual(captured_command[t_flag_index + 1], "600")

    def test_sample_video_frames_uses_complete_frames_when_ffmpeg_exits_nonzero(self):
        """All 601 frames written before a non-zero exit are reported as sampled."""
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            video_id = "video-abc"
            frame_dir = root / "frames" / video_id
            manifest_path = root / "frame_manifest.jsonl"

            def run_with_nonzero_exit(command, *args, **kwargs):
                # Write the full set of frames, then fail like a late decoder error.
                self._write_fake_frames(frame_dir, 601)
                raise subprocess.CalledProcessError(
                    returncode=1,
                    cmd=command,
                    stderr="trailing decoder error after requested frames",
                )

            video = {
                "video_id": video_id,
                "path": str(root / "input.mp4"),
                "codec_name": "hevc",
                "requested_begin": 1000,
                "requested_end": 1600,
            }
            config = self._nvdec_config(
                {"hevc": "hevc_cuvid"},
                timeout_seconds_per_video=30,
            )

            with patch("subprocess.run", side_effect=run_with_nonzero_exit):
                records = sample_video_frames(
                    video,
                    root,
                    config,
                    manifest_path=manifest_path,
                )

            self.assertEqual(len(records), 601)
            self.assertEqual({record["status"] for record in records}, {"sampled"})
            self.assertIn("-t", records[0]["ffmpeg_command"])
            self.assertIn("trailing decoder error", records[0]["stderr_summary"])
            persisted = self._load_manifest(manifest_path)
            self.assertEqual(persisted, records)

    def test_sample_video_frames_uses_nearly_complete_frames_when_ffmpeg_exits_nonzero(self):
        """596 frames written before a non-zero exit are still reported as sampled."""
        with tempfile.TemporaryDirectory() as tmp:
            root = Path(tmp)
            video_id = "video-abc"
            frame_dir = root / "frames" / video_id

            def run_with_nonzero_exit(command, *args, **kwargs):
                # Write slightly fewer frames than the 600 s window, then fail.
                self._write_fake_frames(frame_dir, 596)
                raise subprocess.CalledProcessError(
                    returncode=1,
                    cmd=command,
                    stderr="trailing decoder error after near-complete chunk",
                )

            video = {
                "video_id": video_id,
                "path": str(root / "input.mp4"),
                "codec_name": "hevc",
                "actual_begin": 1000,
                "actual_end": 1600,
            }
            config = self._nvdec_config(
                {"hevc": "hevc_cuvid"},
                timeout_seconds_per_video=30,
            )

            with patch("subprocess.run", side_effect=run_with_nonzero_exit):
                records = sample_video_frames(video, root, config)

            self.assertEqual(len(records), 596)
            self.assertEqual({record["status"] for record in records}, {"sampled"})
            self.assertIn("near-complete chunk", records[0]["stderr_summary"])
if __name__ == "__main__":
    # Run the tests in this module when the file is executed as a script.
    unittest.main()