Files
video-ai-analysis/video_ai_analysis_poc/probe.py
2026-06-17 11:33:54 +08:00

100 lines
3.1 KiB
Python

from __future__ import annotations
import json
import subprocess
from pathlib import Path
from typing import Any
def probe_video(path: str | Path, *, timeout_seconds: int = 30) -> dict[str, Any]:
    """Probe a video file with ``ffprobe`` and summarize it as a flat record.

    Failures of the ffprobe invocation (timeout, non-zero exit, executable
    missing or not runnable) and failures while interpreting its JSON output
    are reported through the returned record instead of being raised.

    Args:
        path: Location of the video file. ``~`` is expanded and the path is
            made absolute; the file is not required to exist.
        timeout_seconds: Maximum time to wait for ffprobe before giving up.

    Returns:
        A dict that always contains ``path``, ``status``, ``retry_count``
        (initialized to 0) and ``last_error``. On success ``status`` is
        ``"probed"`` and the record additionally carries
        ``duration_seconds``, ``codec_name``, ``width``, ``height``, ``fps``,
        ``format_name`` and ``start_time``. On failure ``status`` is
        ``"probe_failed"`` and ``last_error`` holds a description.
    """
    video_path = Path(path).expanduser().resolve(strict=False)
    base_record: dict[str, Any] = {
        "path": str(video_path),
        "status": "probe_failed",
        "retry_count": 0,
        "last_error": None,
    }
    command = [
        "ffprobe",
        "-v",
        "error",
        "-print_format",
        "json",
        "-show_format",
        "-show_streams",
        str(video_path),
    ]

    # Phase 1: run ffprobe. Only the subprocess call lives in this ``try`` so
    # each handler maps to exactly one way the invocation can fail.
    try:
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            check=True,
            timeout=timeout_seconds,
        )
    except subprocess.TimeoutExpired as exc:
        message = f"ffprobe timed out after {timeout_seconds}s"
        # TimeoutExpired.stderr can be bytes even when text=True was
        # requested; normalize it so the message never contains "b'...'".
        detail = _error_text(exc.stderr) if exc.stderr else ""
        if detail:
            message += f": {detail}"
        base_record["last_error"] = message
        return base_record
    except subprocess.CalledProcessError as exc:
        base_record["last_error"] = _error_text(exc.stderr or exc.stdout or str(exc))
        return base_record
    except OSError as exc:
        # Raised when the ffprobe executable is missing or cannot be started.
        base_record["last_error"] = f"ffprobe could not be executed: {exc}"
        return base_record

    # Phase 2: interpret the JSON report.
    try:
        payload = json.loads(completed.stdout or "{}")
        if not isinstance(payload, dict):
            raise ValueError("ffprobe output was not a JSON object")
        video_stream = _first_video_stream(payload)
        # Tolerate an explicit null "format" entry as well as a missing one.
        format_info = payload.get("format") or {}
        # "0/0" is a truthy string, so a plain ``or`` would never reach
        # r_frame_rate; fall back whenever the average rate is unusable.
        fps = _parse_frame_rate(video_stream.get("avg_frame_rate"))
        if fps is None:
            fps = _parse_frame_rate(video_stream.get("r_frame_rate"))
        return {
            **base_record,
            "status": "probed",
            "duration_seconds": _optional_float(format_info.get("duration")),
            "codec_name": video_stream.get("codec_name"),
            "width": _optional_int(video_stream.get("width")),
            "height": _optional_int(video_stream.get("height")),
            "fps": fps,
            "format_name": format_info.get("format_name"),
            "start_time": _optional_float(format_info.get("start_time")),
        }
    except (ValueError, TypeError) as exc:
        # json.JSONDecodeError is a ValueError subclass; TypeError covers
        # numeric fields that arrive with an unexpected JSON type.
        base_record["last_error"] = f"ffprobe parse failed: {exc}"
        return base_record
def _first_video_stream(payload: dict[str, Any]) -> dict[str, Any]:
for stream in payload.get("streams", []):
if stream.get("codec_type") == "video":
return stream
raise ValueError("ffprobe output did not contain a video stream")
def _parse_frame_rate(value: str | None) -> float | None:
if not value or value == "0/0":
return None
if "/" in value:
numerator, denominator = value.split("/", 1)
denominator_value = float(denominator)
if denominator_value == 0:
return None
return float(numerator) / denominator_value
return float(value)
def _optional_float(value: Any) -> float | None:
if value is None or value == "":
return None
return float(value)
def _optional_int(value: Any) -> int | None:
if value is None or value == "":
return None
return int(value)
def _error_text(value: Any) -> str:
if isinstance(value, bytes):
return value.decode("utf-8", errors="replace").strip()
return str(value).strip()