from __future__ import annotations import argparse from contextlib import contextmanager import json import time from datetime import datetime, timezone from pathlib import Path from typing import Callable, Iterator, Sequence, TypeVar from .aggregator import aggregate_outputs from .clips import build_clip_records from .config import DEFAULT_CONFIG_PATH, load_config from .discovery import discover_videos from .ffmpeg_sampler import sample_video_frames from .hik_cloud import download_hik_cloud_recordings from .manifest import read_jsonl, write_manifest from .paths import stable_video_id from .probe import probe_video from .result_parser import build_clip_result from .timeline import DEFAULT_TIMEZONE, format_beijing_time, timeline_start_epoch from .vlm_client import infer_clip T = TypeVar("T") def _new_phase_timings() -> dict[str, object]: return { "schema_version": "phase-timings-v1", "started_at": _utc_now_iso(), "updated_at": _utc_now_iso(), "phases": {}, } def _write_phase_timings( output_dir: Path, phase_timings: dict[str, object], ) -> None: phase_timings["updated_at"] = _utc_now_iso() (output_dir / "phase_timings.json").write_text( json.dumps(phase_timings, ensure_ascii=False, sort_keys=True, indent=2) + "\n", encoding="utf-8", ) def _measure_phase( phase_timings: dict[str, object] | None, phase_name: str, func: Callable[[], T], ) -> T: with _timed_phase(phase_timings, phase_name): return func() @contextmanager def _timed_phase( phase_timings: dict[str, object] | None, phase_name: str, ) -> Iterator[None]: started = time.perf_counter() try: yield finally: if phase_timings is not None: phases = phase_timings.get("phases") if not isinstance(phases, dict): phases = {} phase_timings["phases"] = phases previous = phases.get(phase_name, 0) if not isinstance(previous, (int, float)): previous = 0 phases[phase_name] = round( float(previous) + time.perf_counter() - started, 6, ) def _utc_now_iso() -> str: return datetime.now(timezone.utc).isoformat() def main(argv: Sequence[str] | None = None) -> int: parser = argparse.ArgumentParser( description="Local video batch analysis PoC entrypoint." ) parser.add_argument("--config", default=str(DEFAULT_CONFIG_PATH)) parser.add_argument("--input-dir") parser.add_argument("--output-dir") parser.add_argument("--dry-run", action="store_true") parser.add_argument("--until", choices=["clips", "inference"]) parser.add_argument("--limit-clips", type=int) args = parser.parse_args(argv) config = load_config( args.config, input_dir=args.input_dir, output_dir=args.output_dir, ) if args.dry_run and args.until: parser.error("--dry-run cannot be combined with --until") if args.limit_clips is not None and args.limit_clips < 0: parser.error("--limit-clips must be non-negative") output_dir = Path(config["output"]["dir"]) output_dir.mkdir(parents=True, exist_ok=True) phase_timings = _new_phase_timings() video_manifest_path = output_dir / "video_manifest.jsonl" resume_enabled = bool(config.get("output", {}).get("resume", False)) records = _load_resume_records( video_manifest_path, resume=resume_enabled, ) record_indexes = { _record_key(record): index for index, record in enumerate(records) if _record_key(record) is not None } try: _acquire_source_records( config, output_dir, records, record_indexes, download_source=not args.dry_run, phase_timings=phase_timings, ) except ValueError as exc: parser.error(str(exc)) write_manifest(video_manifest_path, records) _write_phase_timings(output_dir, phase_timings) if args.dry_run: return 0 clip_manifest_path = output_dir / "clip_manifest.jsonl" existing_clip_records = read_jsonl(clip_manifest_path) if resume_enabled else [] existing_clip_video_ids = { str(record.get("video_id")) for record in existing_clip_records if record.get("video_id") } frame_manifest_path = output_dir / "frame_manifest.jsonl" frame_records = read_jsonl(frame_manifest_path) if resume_enabled else [] timezone_name = str(config.get("runtime", {}).get("timezone", DEFAULT_TIMEZONE)) backfilled_frame_video_ids = _backfill_frame_beijing_times( frame_records, records, timezone_name=timezone_name, ) existing_sampled_video_ids = { str(record.get("video_id")) for record in frame_records if record.get("status") == "sampled" and record.get("video_id") } changed_frame_video_ids: set[str] = set(backfilled_frame_video_ids) with _timed_phase(phase_timings, "frame_sampling_seconds"): for record in records: if record.get("status") != "probed": continue video_id = str(record.get("video_id")) if args.until == "inference" and video_id in existing_clip_video_ids: continue if video_id in existing_sampled_video_ids: continue frame_records = _without_video_records(frame_records, video_id) ffmpeg_config = dict(config["ffmpeg"]) ffmpeg_config["timezone"] = timezone_name frame_records.extend( sample_video_frames( record, output_dir, ffmpeg_config, manifest_path=None, ) ) changed_frame_video_ids.add(video_id) write_manifest(frame_manifest_path, frame_records) _write_phase_timings(output_dir, phase_timings) sampled_video_ids = { str(record.get("video_id")) for record in frame_records if record.get("status") == "sampled" and record.get("video_id") } clip_rebuild_video_ids = changed_frame_video_ids | ( sampled_video_ids - existing_clip_video_ids ) clip_records = [ record for record in existing_clip_records if str(record.get("video_id")) not in clip_rebuild_video_ids ] frames_to_build = [ record for record in frame_records if str(record.get("video_id")) in clip_rebuild_video_ids ] with _timed_phase(phase_timings, "clip_generation_seconds"): clip_records.extend(build_clip_records(frames_to_build, config["clip"])) write_manifest(output_dir / "clip_manifest.jsonl", clip_records) _write_phase_timings(output_dir, phase_timings) if args.until == "clips": return 0 with _timed_phase(phase_timings, "inference_seconds"): _run_inference( clip_records, records, output_dir, config, limit_clips=args.limit_clips, resume=resume_enabled, ) _write_phase_timings(output_dir, phase_timings) if args.until == "inference": return 0 with _timed_phase(phase_timings, "aggregation_seconds"): aggregate_outputs(output_dir, config) _write_phase_timings(output_dir, phase_timings) return 0 def _load_resume_records(path: Path, *, resume: bool) -> list[dict[str, object]]: if not resume: return [] return read_jsonl(path) def _record_key(record: dict[str, object]) -> str | None: video_id = record.get("video_id") if video_id: return str(video_id) path = record.get("path") if path: return stable_video_id(str(path)) return None def _acquire_source_records( config: dict[str, object], output_dir: Path, records: list[dict[str, object]], record_indexes: dict[str, int], *, download_source: bool = True, phase_timings: dict[str, object] | None = None, ) -> None: source_records = _measure_phase( phase_timings, "source_acquisition_seconds", lambda: _source_video_records( config, output_dir, download_source=download_source, ) ) with _timed_phase(phase_timings, "video_probe_seconds"): for source_record in source_records: path = source_record.get("path") if not path: continue video_id = stable_video_id(str(path)) existing_index = record_indexes.get(video_id) if ( existing_index is not None and records[existing_index].get("status") == "probed" ): continue probe_record = probe_video( str(path), timeout_seconds=config["ffprobe"]["timeout_seconds"], ) record = {**source_record, **probe_record, "video_id": video_id} if existing_index is None: record_indexes[video_id] = len(records) records.append(record) else: records[existing_index] = record def _source_video_records( config: dict[str, object], output_dir: Path, *, download_source: bool = True, ) -> list[dict[str, object]]: source_config = config.get("source", {}) source_mode = "local" if isinstance(source_config, dict): source_mode = str(source_config.get("mode", "local")) if source_mode == "local": videos = discover_videos( config["input"]["dir"], config["input"]["extensions"], recursive=config["input"]["recursive"], ) return [{"path": path} for path in videos] if source_mode == "hik_cloud": return [ record for record in download_hik_cloud_recordings( config, output_dir, download=download_source, ) if record.get("status") == "downloaded" ] raise ValueError(f"unsupported source.mode: {source_mode}") def _without_video_records( records: list[dict[str, object]], video_id: str, ) -> list[dict[str, object]]: return [record for record in records if str(record.get("video_id")) != video_id] def _backfill_frame_beijing_times( frame_records: list[dict[str, object]], video_records: list[dict[str, object]], *, timezone_name: str, ) -> set[str]: video_by_id = { str(record.get("video_id")): record for record in video_records if record.get("video_id") } changed_video_ids: set[str] = set() for frame_record in frame_records: if frame_record.get("status") != "sampled" or frame_record.get("beijing_time"): continue video_id = str(frame_record.get("video_id") or "") start_epoch = timeline_start_epoch(video_by_id.get(video_id, {})) beijing_time = format_beijing_time( start_epoch, offset_seconds=float(frame_record.get("offset_seconds") or 0), timezone_name=timezone_name, ) if beijing_time is None: continue frame_record["beijing_time"] = beijing_time changed_video_ids.add(video_id) return changed_video_ids def _run_inference( clip_records: list[dict[str, object]], video_records: list[dict[str, object]], output_dir: Path, config: dict[str, object], *, limit_clips: int | None, resume: bool, ) -> None: results_path = output_dir / "clip_results.jsonl" result_records = read_jsonl(results_path) if resume else [] clip_by_id = { str(record.get("clip_id")): record for record in clip_records if record.get("clip_id") } result_records = [ _refresh_result_timeline(record, clip_by_id, config) for record in result_records ] ok_clip_ids = { str(record.get("clip_id")) for record in result_records if record.get("status") == "ok" and record.get("clip_id") } video_by_id = { str(record.get("video_id")): record for record in video_records if record.get("video_id") } processed = 0 for clip_record in clip_records: clip_id = str(clip_record.get("clip_id")) if clip_id in ok_clip_ids: continue if limit_clips is not None and processed >= limit_clips: break result_records = [ record for record in result_records if str(record.get("clip_id")) != clip_id ] video_record = video_by_id.get(str(clip_record.get("video_id")), {}) result = _infer_and_parse_clip(clip_record, video_record, output_dir, config) result_records.append(result) _write_jsonl_exact(results_path, result_records) processed += 1 _write_jsonl_exact(results_path, result_records) def _refresh_result_timeline( result_record: dict[str, object], clip_by_id: dict[str, dict[str, object]], config: dict[str, object], ) -> dict[str, object]: clip_record = clip_by_id.get(str(result_record.get("clip_id"))) if not clip_record: return result_record if not _clip_has_beijing_timing(clip_record): return result_record timeline = dict(result_record.get("monitoring_timeline") or {}) timeline.update( { "timezone": config.get("runtime", {}).get("timezone", DEFAULT_TIMEZONE), "clip_start_seconds": clip_record.get("clip_start_seconds"), "clip_end_seconds": clip_record.get("clip_end_seconds"), "clip_start_timecode": clip_record.get("clip_start_timecode"), "clip_end_timecode": clip_record.get("clip_end_timecode"), "clip_start_beijing_time": clip_record.get("clip_start_beijing_time"), "clip_end_beijing_time": clip_record.get("clip_end_beijing_time"), "frame_times": clip_record.get("frame_times", []), } ) refreshed = dict(result_record) refreshed["monitoring_timeline"] = timeline return refreshed def _clip_has_beijing_timing(clip_record: dict[str, object]) -> bool: if clip_record.get("clip_start_beijing_time") or clip_record.get("clip_end_beijing_time"): return True for frame in clip_record.get("frame_times", []) or []: if isinstance(frame, dict) and frame.get("beijing_time"): return True return False def _infer_and_parse_clip( clip_record: dict[str, object], video_record: dict[str, object], output_dir: Path, config: dict[str, object], ) -> dict[str, object]: schema_config = config.get("schema", {}) parse_retry = 0 if isinstance(schema_config, dict): parse_retry = int(schema_config.get("parse_retry", 0)) attempts = parse_retry + 1 result: dict[str, object] | None = None for attempt in range(attempts): try: inference = infer_clip( clip_record, output_dir, config["vlm"], config["prompt"], ) except Exception as exc: return build_clip_result( "", clip_record, video_record, config, processing={}, status="inference_failed", error=str(exc), ) result = build_clip_result( str(inference.get("raw_response", "")), clip_record, video_record, config, processing={ "latency_ms": inference.get("latency_ms"), "http_status": inference.get("http_status"), "attempt": attempt + 1, }, ) if result.get("status") != "parse_failed": return result if result is None: raise RuntimeError("unreachable inference state") return result def _write_jsonl_exact( path: Path, records: list[dict[str, object]], ) -> None: path.parent.mkdir(parents=True, exist_ok=True) with path.open("w", encoding="utf-8") as handle: for record in records: handle.write(json.dumps(record, ensure_ascii=False, sort_keys=True) + "\n") if __name__ == "__main__": raise SystemExit(main())