from __future__ import annotations

import json
import subprocess
from pathlib import Path
from typing import Any


def probe_video(path: str | Path, *, timeout_seconds: int = 30) -> dict[str, Any]:
    """Probe a video file with ``ffprobe`` and return a metadata record.

    The function never raises for probe failures; it reports them in the
    returned record instead.

    Args:
        path: Location of the video file. ``~`` is expanded and the path is
            made absolute (the file is not required to exist).
        timeout_seconds: Maximum time to wait for ``ffprobe`` to finish.

    Returns:
        A dict that always contains ``path``, ``status``, ``retry_count`` and
        ``last_error``. On success ``status`` is ``"probed"`` and the record
        also carries ``duration_seconds``, ``codec_name``, ``width``,
        ``height``, ``fps``, ``format_name`` and ``start_time``. On failure
        ``status`` is ``"probe_failed"`` and ``last_error`` describes why.
    """
    video_path = Path(path).expanduser().resolve(strict=False)
    base_record: dict[str, Any] = {
        "path": str(video_path),
        "status": "probe_failed",
        "retry_count": 0,
        "last_error": None,
    }
    command = [
        "ffprobe",
        "-v",
        "error",
        "-print_format",
        "json",
        "-show_format",
        "-show_streams",
        str(video_path),
    ]
    try:
        completed = subprocess.run(
            command,
            capture_output=True,
            text=True,
            check=True,
            timeout=timeout_seconds,
        )
        payload = json.loads(completed.stdout or "{}")
        # "status" keeps its original position in the dict; the metadata
        # fields are appended after the base keys.
        return {**base_record, "status": "probed", **_extract_metadata(payload)}
    except subprocess.TimeoutExpired as exc:
        message = f"ffprobe timed out after {timeout_seconds}s"
        # TimeoutExpired carries captured output as bytes even when
        # text=True, so normalise it before embedding it in the message.
        detail = _error_text(exc.stderr) if exc.stderr else ""
        if detail:
            message += f": {detail}"
        base_record["last_error"] = message
    except subprocess.CalledProcessError as exc:
        base_record["last_error"] = _error_text(exc.stderr or exc.stdout or str(exc))
    except OSError as exc:
        # Raised when the ffprobe executable is missing or cannot be started.
        base_record["last_error"] = f"ffprobe could not be executed: {exc}"
    except (ValueError, TypeError) as exc:
        # ValueError covers json.JSONDecodeError, a missing video stream and
        # non-numeric field values; TypeError covers fields of the wrong type.
        base_record["last_error"] = f"ffprobe parse failed: {exc}"
    return base_record


def _extract_metadata(payload: Any) -> dict[str, Any]:
    """Map a decoded ffprobe JSON payload to the metadata fields of a record.

    Raises:
        ValueError: If the payload is not a JSON object, contains no video
            stream, or holds a numeric field that cannot be converted.
    """
    if not isinstance(payload, dict):
        raise ValueError("ffprobe output was not a JSON object")
    video_stream = _first_video_stream(payload)
    format_info = payload.get("format")
    if not isinstance(format_info, dict):
        # Treat an absent or malformed "format" section as empty.
        format_info = {}
    return {
        "duration_seconds": _optional_float(format_info.get("duration")),
        "codec_name": video_stream.get("codec_name"),
        "width": _optional_int(video_stream.get("width")),
        "height": _optional_int(video_stream.get("height")),
        "fps": _stream_frame_rate(video_stream),
        "format_name": format_info.get("format_name"),
        "start_time": _optional_float(format_info.get("start_time")),
    }


def _first_video_stream(payload: dict[str, Any]) -> dict[str, Any]:
    """Return the first stream whose ``codec_type`` is ``"video"``.

    Raises:
        ValueError: If the payload lists no video stream.
    """
    # ``or []`` also covers an explicit ``"streams": null``.
    for stream in payload.get("streams") or []:
        if isinstance(stream, dict) and stream.get("codec_type") == "video":
            return stream
    raise ValueError("ffprobe output did not contain a video stream")


def _stream_frame_rate(stream: dict[str, Any]) -> float | None:
    """Return the stream's frame rate, preferring ``avg_frame_rate``.

    Falls back to ``r_frame_rate`` when the average rate is absent or
    unusable (for example ``"0/0"``). A plain ``a or b`` on the raw strings
    would not do this, because ``"0/0"`` is a truthy string.
    """
    for key in ("avg_frame_rate", "r_frame_rate"):
        rate = _parse_frame_rate(stream.get(key))
        if rate is not None:
            return rate
    return None


def _parse_frame_rate(value: str | None) -> float | None:
    """Convert a rate such as ``"30000/1001"`` or ``"25"`` to a float.

    Returns ``None`` for a missing or empty value and for any fraction with a
    zero denominator.

    Raises:
        ValueError: If a component is not numeric.
    """
    if not value or value == "0/0":
        return None
    if "/" in value:
        numerator, denominator = value.split("/", 1)
        denominator_value = float(denominator)
        if denominator_value == 0:
            return None
        return float(numerator) / denominator_value
    return float(value)


def _optional_float(value: Any) -> float | None:
    """Return ``float(value)``, or ``None`` when value is ``None`` or ``""``."""
    if value is None or value == "":
        return None
    return float(value)


def _optional_int(value: Any) -> int | None:
    """Return ``int(value)``, or ``None`` when value is ``None`` or ``""``."""
    if value is None or value == "":
        return None
    return int(value)


def _error_text(value: Any) -> str:
    """Return *value* as stripped text, decoding bytes as UTF-8 leniently."""
    if isinstance(value, bytes):
        return value.decode("utf-8", errors="replace").strip()
    return str(value).strip()