"""Sample still frames from videos with ffmpeg and record them in a manifest."""

from __future__ import annotations

import math
import subprocess
from pathlib import Path
from typing import Any

from .frames import build_frame_records
from .manifest import read_jsonl, write_manifest
from .timeline import DEFAULT_TIMEZONE, timeline_start_epoch

# Codecs for which a hardware (NVDEC) decoder may be selected.
NVDEC_CODECS = {"h264", "hevc"}


def build_sample_command(
    video_path: str | Path,
    output_dir: str | Path,
    video_id: str,
    ffmpeg_config: dict[str, Any],
    *,
    codec_name: str | None,
    max_frames: int | None = None,
    max_duration_seconds: float | None = None,
) -> list[str]:
    """Build the ffmpeg argument list that extracts JPEG frames from a video.

    Frames are written to ``<output_dir>/frames/<video_id>/%06d.jpg``.

    Args:
        video_path: Source video file.
        output_dir: Root directory under which the ``frames`` tree lives.
        video_id: Identifier used as the per-video frame directory name.
        ffmpeg_config: Options read here: ``prefer_nvdec``,
            ``allow_cpu_fallback``, ``codec_decoders``, ``hwaccel``,
            ``frame_fps``, ``frame_width`` and ``jpeg_quality``.
        codec_name: Codec of the source video; matched case-insensitively.
        max_frames: If positive, passed to ffmpeg as ``-frames:v``.
        max_duration_seconds: If positive and finite, passed as ``-t``.

    Returns:
        The command as a list of arguments, suitable for ``subprocess.run``.

    Raises:
        ValueError: If no hardware decoder can be selected for the codec and
            ``allow_cpu_fallback`` is false.
    """
    frame_dir = Path(output_dir).expanduser() / "frames" / video_id
    frame_pattern = frame_dir / "%06d.jpg"

    command = ["ffmpeg", "-hide_banner", "-y"]

    codec = (codec_name or "").lower()
    prefer_nvdec = bool(ffmpeg_config.get("prefer_nvdec", True))
    allow_cpu_fallback = bool(ffmpeg_config.get("allow_cpu_fallback", False))
    decoders = ffmpeg_config.get("codec_decoders", {})
    decoder = decoders.get(codec) if isinstance(decoders, dict) else None

    if prefer_nvdec and codec in NVDEC_CODECS and decoder:
        # Decoder options must precede "-i" so they apply to the input.
        command.extend(
            [
                "-hwaccel",
                str(ffmpeg_config.get("hwaccel", "cuda")),
                "-c:v",
                str(decoder),
            ]
        )
    elif not allow_cpu_fallback:
        raise ValueError(
            f"NVDEC decoder is required for codec {codec_name!r}; CPU fallback is disabled"
        )

    frame_fps = ffmpeg_config.get("frame_fps", 1)
    frame_width = ffmpeg_config.get("frame_width", 640)
    jpeg_quality = ffmpeg_config.get("jpeg_quality", 4)

    command.extend(["-i", str(Path(video_path).expanduser())])

    # A non-finite duration cannot be expressed as a "-t" value, so it is
    # treated the same as "no limit".
    if (
        max_duration_seconds is not None
        and math.isfinite(max_duration_seconds)
        and max_duration_seconds > 0
    ):
        command.extend(["-t", f"{max_duration_seconds:g}"])

    command.extend(
        [
            "-vf",
            f"fps={frame_fps},scale={frame_width}:-2",
            "-q:v",
            str(jpeg_quality),
        ]
    )

    if max_frames is not None and max_frames > 0:
        command.extend(["-frames:v", str(max_frames)])

    command.append(str(frame_pattern))
    return command


def sample_video_frames(
    video_record: dict[str, Any],
    output_dir: str | Path,
    ffmpeg_config: dict[str, Any],
    *,
    manifest_path: str | Path | None = None,
) -> list[dict[str, Any]]:
    """Extract frames for one video and return its frame records.

    Runs ffmpeg, then builds one record per ``*.jpg`` found in the video's
    frame directory. A non-zero ffmpeg exit is still reported as success when
    enough frames were produced (see
    ``_has_usable_frames_after_nonzero_exit``). A timeout or a ``ValueError``
    (bad configuration, missing path, unavailable decoder) yields a single
    ``sample_failed`` record instead of raising.

    Args:
        video_record: Must contain ``video_id``; the source file is read from
            ``path`` or, failing that, ``source_path``.
        output_dir: Root directory under which ``frames/<video_id>`` is created.
        ffmpeg_config: Sampling options; also forwarded to
            ``build_sample_command``.
        manifest_path: If given, the manifest's existing records for this
            video are replaced with the returned records.

    Returns:
        The frame records, or a one-element list holding a failure record.
    """
    video_id = str(video_record["video_id"])
    output_root = Path(output_dir).expanduser().resolve(strict=False)
    frame_dir = output_root / "frames" / video_id
    frame_dir.mkdir(parents=True, exist_ok=True)

    try:
        records = _run_sampling(
            video_record,
            video_id,
            output_root,
            frame_dir,
            ffmpeg_config,
        )
    except (subprocess.TimeoutExpired, ValueError) as exc:
        records = [_failure_record(video_id, exc)]

    if manifest_path is not None:
        _replace_video_records(Path(manifest_path), video_id, records)
    return records


def _run_sampling(
    video_record: dict[str, Any],
    video_id: str,
    output_root: Path,
    frame_dir: Path,
    ffmpeg_config: dict[str, Any],
) -> list[dict[str, Any]]:
    """Run ffmpeg for one video and build its records.

    Raises ``ValueError`` or ``subprocess.TimeoutExpired`` for the caller to
    turn into a failure record.
    """
    video_path = video_record.get("path") or video_record.get("source_path")
    if not video_path:
        # Fail as a ValueError so the caller records it rather than crashing
        # on Path(None).
        raise ValueError(
            f"video record {video_id!r} has neither 'path' nor 'source_path'"
        )

    # Every config value is parsed before ffmpeg runs, so a malformed value
    # surfaces here rather than inside the non-zero-exit handler below.
    max_frames = _max_output_frames(video_record, ffmpeg_config)
    timezone_name = str(ffmpeg_config.get("timezone", DEFAULT_TIMEZONE))
    start_epoch = timeline_start_epoch(video_record)
    frame_fps = float(ffmpeg_config.get("frame_fps", 1))
    timeout_seconds = int(ffmpeg_config.get("timeout_seconds_per_video", 3600))

    command = build_sample_command(
        video_path,
        output_root,
        video_id,
        ffmpeg_config,
        codec_name=video_record.get("codec_name"),
        max_frames=max_frames,
        max_duration_seconds=_record_duration_seconds(video_record),
    )

    def collect_records() -> list[dict[str, Any]]:
        return build_frame_records(
            video_id,
            output_root,
            frame_dir.glob("*.jpg"),
            frame_fps=frame_fps,
            timeline_start_epoch=start_epoch,
            timezone_name=timezone_name,
        )

    try:
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            check=True,
            timeout=timeout_seconds,
        )
    except subprocess.CalledProcessError as exc:
        # ffmpeg may exit non-zero after writing most of the frames; keep the
        # result if enough of them are on disk.
        records = collect_records()
        if not _has_usable_frames_after_nonzero_exit(
            records,
            max_frames=max_frames,
            ffmpeg_config=ffmpeg_config,
        ):
            return [_failure_record(video_id, exc)]
        stderr = exc.stderr
    else:
        records = collect_records()
        stderr = completed.stderr

    _attach_success_evidence(records, command, stderr=stderr)
    return records


def _has_usable_frames_after_nonzero_exit(
    records: list[dict[str, Any]],
    *,
    max_frames: int | None,
    ffmpeg_config: dict[str, Any],
) -> bool:
    """Decide whether frames left behind by a failed ffmpeg run are usable.

    No records is never usable. With no known frame cap, or with at least
    ``max_frames`` records, any result is accepted. Otherwise the count must
    reach ``min_success_frame_ratio`` (default 0.98) of ``max_frames`` or fall
    short by at most ``max_missing_success_frames`` (default 5).
    """
    if not records:
        return False
    if max_frames is None or len(records) >= max_frames:
        return True

    min_ratio = float(ffmpeg_config.get("min_success_frame_ratio", 0.98))
    missing_tolerance = int(ffmpeg_config.get("max_missing_success_frames", 5))
    return (
        len(records) >= math.floor(max_frames * min_ratio)
        or max_frames - len(records) <= missing_tolerance
    )


def _replace_video_records(
    manifest_path: Path,
    video_id: str,
    new_records: list[dict[str, Any]],
) -> None:
    """Rewrite the manifest with this video's records swapped for new ones.

    Records of other videos are kept in their existing order; the new records
    are appended after them.
    """
    existing = [
        record
        for record in read_jsonl(manifest_path)
        if str(record.get("video_id")) != video_id
    ]
    write_manifest(manifest_path, [*existing, *new_records])


def _failure_record(video_id: str, exc: BaseException) -> dict[str, Any]:
    """Return the placeholder record stored when sampling a video fails."""
    return {
        "video_id": video_id,
        "frame_id": None,
        "frame_path": None,
        "offset_seconds": None,
        "timecode": None,
        "pts_time": None,
        "status": "sample_failed",
        "retry_count": 0,
        "last_error": _error_text(exc),
    }


def _attach_success_evidence(
    records: list[dict[str, Any]],
    command: list[str],
    *,
    stderr: str | None,
) -> None:
    """Annotate each record in place with how ffmpeg was invoked.

    Adds ``ffmpeg_command``, ``decoder``, ``hwaccel`` and ``stderr_summary``.
    """
    decoder = _command_value_after(command, "-c:v")
    hwaccel = _command_value_after(command, "-hwaccel")
    summary = _stderr_summary(stderr)
    for record in records:
        record.update(
            {
                # Each record gets its own copy so that mutating one record's
                # command cannot affect the others.
                "ffmpeg_command": list(command),
                "decoder": decoder,
                "hwaccel": hwaccel,
                "stderr_summary": summary,
            }
        )


def _command_value_after(command: list[str], flag: str) -> str | None:
    """Return the argument following the first ``flag``, or ``None``.

    ``None`` is returned when the flag is absent or is the last argument.
    """
    try:
        index = command.index(flag)
    except ValueError:
        return None
    if index + 1 >= len(command):
        return None
    return command[index + 1]


def _stderr_summary(stderr: str | None, *, limit: int = 2000) -> str:
    """Return stripped stderr text, truncated to its first ``limit`` characters."""
    if not stderr:
        return ""
    return stderr.strip()[:limit]


def _error_text(exc: BaseException) -> str:
    """Return a human-readable message for a sampling failure."""
    if isinstance(exc, subprocess.CalledProcessError):
        # Prefer what ffmpeg printed over the generic exit-status message.
        return str(exc.stderr or exc.stdout or exc)
    if isinstance(exc, subprocess.TimeoutExpired):
        return f"ffmpeg timed out after {exc.timeout}s"
    return str(exc)


def _max_output_frames(
    video_record: dict[str, Any],
    ffmpeg_config: dict[str, Any],
) -> int | None:
    """Return an upper bound on the frames ffmpeg should emit, if computable.

    The bound is ``ceil(duration * frame_fps) + 1`` and at least 1. ``None``
    is returned when the frame rate or the duration is missing, non-positive
    or non-finite.

    Raises:
        ValueError: If a configured value cannot be parsed as a float.
    """
    frame_fps = _optional_float(ffmpeg_config.get("frame_fps", 1))
    if frame_fps is None or not math.isfinite(frame_fps) or frame_fps <= 0:
        return None

    duration_seconds = _record_duration_seconds(video_record)
    if (
        duration_seconds is None
        or not math.isfinite(duration_seconds)
        or duration_seconds <= 0
    ):
        return None

    expected_frames = duration_seconds * frame_fps
    if not math.isfinite(expected_frames):
        # math.ceil raises OverflowError on infinity; treat it as unknown.
        return None
    return max(1, math.ceil(expected_frames) + 1)


def _record_duration_seconds(video_record: dict[str, Any]) -> float | None:
    """Return the record's duration in seconds, or ``None`` if unavailable.

    The first usable ``begin``/``end`` pair wins, trying ``actual_*`` before
    ``requested_*``; a pair is usable when both values are present and the end
    is after the beginning. Otherwise ``duration_seconds`` is used.
    """
    for begin_key, end_key in (
        ("actual_begin", "actual_end"),
        ("requested_begin", "requested_end"),
    ):
        begin = _optional_float(video_record.get(begin_key))
        end = _optional_float(video_record.get(end_key))
        if begin is not None and end is not None and end > begin:
            return end - begin
    return _optional_float(video_record.get("duration_seconds"))


def _optional_float(value: Any) -> float | None:
    """Convert ``value`` to float, mapping ``None`` and ``""`` to ``None``."""
    if value is None or value == "":
        return None
    return float(value)