266 lines
8.0 KiB
Python
266 lines
8.0 KiB
Python
from __future__ import annotations
|
|
|
|
import math
|
|
import subprocess
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from .frames import build_frame_records
|
|
from .manifest import read_jsonl, write_manifest
|
|
from .timeline import DEFAULT_TIMEZONE, timeline_start_epoch
|
|
|
|
|
|
# Lower-cased codec names for which build_sample_command will request the
# hardware decoder configured under ffmpeg_config["codec_decoders"].
NVDEC_CODECS = {"h264", "hevc"}
|
|
|
|
|
|
def build_sample_command(
    video_path: str | Path,
    output_dir: str | Path,
    video_id: str,
    ffmpeg_config: dict[str, Any],
    *,
    codec_name: str | None,
    max_frames: int | None = None,
    max_duration_seconds: float | None = None,
) -> list[str]:
    """Build the ffmpeg argument list that samples JPEG frames from a video.

    Frames are written to ``<output_dir>/frames/<video_id>/%06d.jpg``.

    Args:
        video_path: Input video; ``~`` is expanded.
        output_dir: Root directory under which ``frames/<video_id>`` lives;
            ``~`` is expanded.
        video_id: Sub-directory name for this video's frames.
        ffmpeg_config: Options read with ``.get``: ``prefer_nvdec`` (default
            True), ``allow_cpu_fallback`` (default False), ``codec_decoders``
            (mapping of codec name to decoder name), ``hwaccel`` (default
            ``"cuda"``), ``frame_fps`` (default 1), ``frame_width`` (default
            640) and ``jpeg_quality`` (default 4).
        codec_name: Codec of the input stream; compared case-insensitively.
        max_frames: When positive, added as ``-frames:v``.
        max_duration_seconds: When positive, added as ``-t`` after the input.

    Returns:
        The command as a list of strings, starting with ``"ffmpeg"``.

    Raises:
        ValueError: If hardware decoding is not selected and
            ``allow_cpu_fallback`` is falsy.
    """
    frame_dir = Path(output_dir).expanduser() / "frames" / video_id
    frame_pattern = frame_dir / "%06d.jpg"
    command = ["ffmpeg", "-hide_banner", "-y"]

    codec = (codec_name or "").lower()
    prefer_nvdec = bool(ffmpeg_config.get("prefer_nvdec", True))
    allow_cpu_fallback = bool(ffmpeg_config.get("allow_cpu_fallback", False))
    decoders = ffmpeg_config.get("codec_decoders", {})
    decoder = decoders.get(codec) if isinstance(decoders, dict) else None

    if prefer_nvdec and codec in NVDEC_CODECS and decoder:
        # Decoder options must precede "-i" so they apply to the input.
        command.extend(
            [
                "-hwaccel",
                str(ffmpeg_config.get("hwaccel", "cuda")),
                "-c:v",
                str(decoder),
            ]
        )
    elif not allow_cpu_fallback:
        raise ValueError(
            f"NVDEC decoder is required for codec {codec_name!r}; CPU fallback is disabled"
        )

    frame_fps = ffmpeg_config.get("frame_fps", 1)
    frame_width = ffmpeg_config.get("frame_width", 640)
    jpeg_quality = ffmpeg_config.get("jpeg_quality", 4)

    command.extend(["-i", str(Path(video_path).expanduser())])

    if max_duration_seconds is not None and max_duration_seconds > 0:
        # Fixed-point rather than "%g": "%g" keeps only six significant digits
        # and switches to scientific notation for large values, which is not
        # part of ffmpeg's duration syntax. Trailing zeros are trimmed so
        # short values still render as e.g. "10" or "2.5".
        duration_text = f"{max_duration_seconds:.6f}".rstrip("0").rstrip(".")
        command.extend(["-t", duration_text])

    command.extend(
        [
            "-vf",
            f"fps={frame_fps},scale={frame_width}:-2",
            "-q:v",
            str(jpeg_quality),
        ]
    )

    if max_frames is not None and max_frames > 0:
        command.extend(["-frames:v", str(max_frames)])

    command.append(str(frame_pattern))
    return command
|
|
|
|
|
|
def sample_video_frames(
    video_record: dict[str, Any],
    output_dir: str | Path,
    ffmpeg_config: dict[str, Any],
    *,
    manifest_path: str | Path | None = None,
) -> list[dict[str, Any]]:
    """Run ffmpeg for one video and return its frame (or failure) records.

    The frame directory ``<output_dir>/frames/<video_id>`` is created if
    needed, ffmpeg is invoked with the command from ``build_sample_command``,
    and the JPEGs found in that directory are turned into records by
    ``build_frame_records``.

    Args:
        video_record: Must contain ``video_id``; the input file is taken from
            ``path`` or, failing that, ``source_path``.
        output_dir: Root output directory; expanded and resolved.
        ffmpeg_config: Sampling options; ``timezone`` and
            ``timeout_seconds_per_video`` (default 3600) are read here, the
            rest by the helpers this function calls.
        manifest_path: When given, this video's records in that manifest are
            replaced with the returned ones.

    Returns:
        Frame records carrying success evidence, or a single
        ``sample_failed`` record when ffmpeg times out, a ``ValueError`` is
        raised while preparing or running the command, or ffmpeg exits
        non-zero without leaving enough usable frames.
    """
    video_id = str(video_record["video_id"])
    output_root = Path(output_dir).expanduser().resolve(strict=False)
    frame_dir = output_root / "frames" / video_id
    # NOTE(review): exist_ok=True means JPEGs from an earlier run stay in this
    # directory and are picked up by the glob below — confirm callers clear it.
    frame_dir.mkdir(parents=True, exist_ok=True)

    try:
        max_frames = _max_output_frames(video_record, ffmpeg_config)
        timezone_name = str(ffmpeg_config.get("timezone", DEFAULT_TIMEZONE))
        start_epoch = timeline_start_epoch(video_record)

        video_path = video_record.get("path") or video_record.get("source_path")
        if not video_path:
            # Without this guard Path(None) raises TypeError, which no handler
            # below catches; ValueError is turned into a failure record.
            raise ValueError(
                f"video record {video_id!r} has no 'path' or 'source_path'"
            )

        command = build_sample_command(
            video_path,
            output_root,
            video_id,
            ffmpeg_config,
            codec_name=video_record.get("codec_name"),
            max_frames=max_frames,
            max_duration_seconds=_record_duration_seconds(video_record),
        )
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            check=True,
            timeout=int(ffmpeg_config.get("timeout_seconds_per_video", 3600)),
        )
        records = _collect_frame_records(
            video_id,
            output_root,
            frame_dir,
            ffmpeg_config,
            start_epoch=start_epoch,
            timezone_name=timezone_name,
        )
        _attach_success_evidence(records, command, stderr=completed.stderr)
    except subprocess.CalledProcessError as exc:
        # Only subprocess.run(check=True) raises this, so every name assigned
        # earlier in the try body is bound here.
        records = _collect_frame_records(
            video_id,
            output_root,
            frame_dir,
            ffmpeg_config,
            start_epoch=start_epoch,
            timezone_name=timezone_name,
        )
        if _has_usable_frames_after_nonzero_exit(
            records,
            max_frames=max_frames,
            ffmpeg_config=ffmpeg_config,
        ):
            # A non-zero exit that still left enough frames counts as success.
            _attach_success_evidence(records, command, stderr=exc.stderr)
        else:
            records = [_failure_record(video_id, exc)]
    except (subprocess.TimeoutExpired, ValueError) as exc:
        records = [_failure_record(video_id, exc)]

    if manifest_path is not None:
        _replace_video_records(Path(manifest_path), video_id, records)
    return records


def _collect_frame_records(
    video_id: str,
    output_root: Path,
    frame_dir: Path,
    ffmpeg_config: dict[str, Any],
    *,
    start_epoch: Any,
    timezone_name: str,
) -> list[dict[str, Any]]:
    """Build frame records from every ``*.jpg`` currently in ``frame_dir``."""
    return build_frame_records(
        video_id,
        output_root,
        frame_dir.glob("*.jpg"),
        frame_fps=float(ffmpeg_config.get("frame_fps", 1)),
        timeline_start_epoch=start_epoch,
        timezone_name=timezone_name,
    )
|
|
|
|
|
|
def _has_usable_frames_after_nonzero_exit(
|
|
records: list[dict[str, Any]],
|
|
*,
|
|
max_frames: int | None,
|
|
ffmpeg_config: dict[str, Any],
|
|
) -> bool:
|
|
if not records:
|
|
return False
|
|
if max_frames is None or len(records) >= max_frames:
|
|
return True
|
|
min_ratio = float(ffmpeg_config.get("min_success_frame_ratio", 0.98))
|
|
missing_tolerance = int(ffmpeg_config.get("max_missing_success_frames", 5))
|
|
return (
|
|
len(records) >= math.floor(max_frames * min_ratio)
|
|
or max_frames - len(records) <= missing_tolerance
|
|
)
|
|
|
|
|
|
def _replace_video_records(
    manifest_path: Path,
    video_id: str,
    new_records: list[dict[str, Any]],
) -> None:
    """Rewrite the manifest with ``video_id``'s records swapped for new ones.

    Records whose stringified ``video_id`` differs are kept in their existing
    order; ``new_records`` are appended after them.
    """
    merged: list[dict[str, Any]] = []
    for entry in read_jsonl(manifest_path):
        if str(entry.get("video_id")) == video_id:
            continue  # superseded by new_records
        merged.append(entry)
    merged.extend(new_records)
    write_manifest(manifest_path, merged)
|
|
|
|
|
|
def _failure_record(video_id: str, exc: BaseException) -> dict[str, Any]:
    """Return the placeholder record written when sampling a video fails.

    All frame-specific fields are ``None``; ``status`` is ``"sample_failed"``
    and ``last_error`` holds the text derived from ``exc``.
    """
    record: dict[str, Any] = {"video_id": video_id}
    # Frame fields are present but empty; key order matches a frame record.
    record.update(
        dict.fromkeys(
            ("frame_id", "frame_path", "offset_seconds", "timecode", "pts_time")
        )
    )
    record["status"] = "sample_failed"
    record["retry_count"] = 0
    record["last_error"] = _error_text(exc)
    return record
|
|
|
|
|
|
def _attach_success_evidence(
    records: list[dict[str, Any]],
    command: list[str],
    *,
    stderr: str | None,
) -> None:
    """Annotate every record in place with how ffmpeg was invoked.

    Each record receives the command list itself, the values following
    ``-c:v`` and ``-hwaccel`` in it (``None`` when absent), and a trimmed
    summary of ffmpeg's stderr.
    """
    decoder = _command_value_after(command, "-c:v")
    hwaccel = _command_value_after(command, "-hwaccel")
    summary = _stderr_summary(stderr)
    for entry in records:
        entry["ffmpeg_command"] = command
        entry["decoder"] = decoder
        entry["hwaccel"] = hwaccel
        entry["stderr_summary"] = summary
|
|
|
|
|
|
def _command_value_after(command: list[str], flag: str) -> str | None:
|
|
try:
|
|
index = command.index(flag)
|
|
except ValueError:
|
|
return None
|
|
if index + 1 >= len(command):
|
|
return None
|
|
return command[index + 1]
|
|
|
|
|
|
def _stderr_summary(stderr: str | None, *, limit: int = 2000) -> str:
|
|
if not stderr:
|
|
return ""
|
|
text = stderr.strip()
|
|
if len(text) <= limit:
|
|
return text
|
|
return text[:limit]
|
|
|
|
|
|
def _error_text(exc: BaseException) -> str:
    """Turn an exception into the message stored in a failure record.

    A timeout reports the configured limit; a failed process reports its
    stderr, then stdout, then the exception text, whichever is non-empty
    first; anything else is stringified as-is.
    """
    if isinstance(exc, subprocess.TimeoutExpired):
        return f"ffmpeg timed out after {exc.timeout}s"
    if isinstance(exc, subprocess.CalledProcessError):
        detail = exc.stderr or exc.stdout or exc
        return str(detail)
    return str(exc)
|
|
|
|
|
|
def _max_output_frames(
    video_record: dict[str, Any],
    ffmpeg_config: dict[str, Any],
) -> int | None:
    """Estimate the largest number of frames ffmpeg should emit for a video.

    Returns ``None`` when either the configured ``frame_fps`` (default 1) or
    the record's duration is missing or not positive; otherwise the rounded-up
    product of the two plus one extra frame.
    """
    fps = _optional_float(ffmpeg_config.get("frame_fps", 1))
    if fps is None or fps <= 0:
        return None

    duration = _record_duration_seconds(video_record)
    if duration is None or duration <= 0:
        return None

    expected = math.ceil(duration * fps) + 1
    return max(1, expected)
|
|
|
|
|
|
def _record_duration_seconds(video_record: dict[str, Any]) -> float | None:
    """Return the record's duration in seconds, if one can be derived.

    The ``actual_begin``/``actual_end`` pair is tried first, then
    ``requested_begin``/``requested_end``; a pair is used only when both
    values are present and the end is later than the begin. Otherwise the
    record's ``duration_seconds`` field is returned (``None`` when absent).
    """
    span_keys = (
        ("actual_begin", "actual_end"),
        ("requested_begin", "requested_end"),
    )
    for begin_key, end_key in span_keys:
        begin = _optional_float(video_record.get(begin_key))
        end = _optional_float(video_record.get(end_key))
        if begin is None or end is None:
            continue
        if end > begin:
            return end - begin
    return _optional_float(video_record.get("duration_seconds"))
|
|
|
|
|
|
def _optional_float(value: Any) -> float | None:
|
|
if value is None or value == "":
|
|
return None
|
|
return float(value)
|