# Builds fixed-length clip records from per-frame sampling records.
from __future__ import annotations

from bisect import bisect_left
from pathlib import Path
from typing import Any

from .frames import seconds_to_timecode
from .manifest import read_jsonl, write_manifest
from .timeline import derive_time_from_reference


def build_clip_records(
    frame_records: list[dict[str, Any]],
    clip_config: dict[str, Any],
) -> list[dict[str, Any]]:
    """Group sampled frames by video and cut each video into clip records.

    Only records whose ``status`` equals ``"sampled"`` are used. Videos are
    processed in sorted ``video_id`` order (ids are compared as strings).

    Args:
        frame_records: Frame dictionaries; every sampled record must carry
            ``video_id`` and ``offset_seconds``.
        clip_config: Clip settings read by ``_build_video_clips``
            (``length_seconds``, ``stride_seconds``, ``frames_per_clip``,
            ``min_frames_per_clip``).

    Returns:
        The clip records of all videos, concatenated in ``video_id`` order.

    Raises:
        KeyError: If a sampled record lacks ``video_id`` or ``offset_seconds``.
        ValueError: If ``stride_seconds`` is not positive.
    """
    sampled_frames = [
        record for record in frame_records if record.get("status") == "sampled"
    ]

    by_video: dict[str, list[dict[str, Any]]] = {}
    for frame in sampled_frames:
        by_video.setdefault(str(frame["video_id"]), []).append(frame)

    clips: list[dict[str, Any]] = []
    for video_id, frames in sorted(by_video.items()):
        clips.extend(_build_video_clips(video_id, frames, clip_config))
    return clips


def build_clip_records_from_manifest(
    frame_manifest_path: str | Path,
    clip_manifest_path: str | Path,
    clip_config: dict[str, Any],
) -> list[dict[str, Any]]:
    """Build clip records from a frame manifest and write them out.

    The frame records are obtained from ``read_jsonl(frame_manifest_path)``
    and the resulting clips are handed to
    ``write_manifest(clip_manifest_path, clips)``.

    Returns:
        The same clip records that were passed to ``write_manifest``.
    """
    clips = build_clip_records(read_jsonl(frame_manifest_path), clip_config)
    write_manifest(clip_manifest_path, clips)
    return clips


def _build_video_clips(
    video_id: str,
    frames: list[dict[str, Any]],
    clip_config: dict[str, Any],
) -> list[dict[str, Any]]:
    """Slide a fixed-length window over one video's frames and emit clips.

    Windows start at offset 0.0 and advance by ``stride_seconds`` while the
    window start does not exceed the largest frame offset. A window becomes a
    clip only when it holds at least ``min_frames_per_clip`` frames; clip ids
    are numbered consecutively over emitted clips only.

    Config keys and defaults: ``length_seconds`` (10), ``stride_seconds``
    (defaults to ``length_seconds``), ``frames_per_clip`` (8),
    ``min_frames_per_clip`` (4).

    Raises:
        ValueError: If ``stride_seconds`` is zero or negative, because the
            window would never advance past the last frame.
    """
    sorted_frames = sorted(frames, key=lambda frame: float(frame["offset_seconds"]))
    if not sorted_frames:
        return []

    length_seconds = float(clip_config.get("length_seconds", 10))
    stride_seconds = float(clip_config.get("stride_seconds", length_seconds))
    frames_per_clip = int(clip_config.get("frames_per_clip", 8))
    min_frames_per_clip = int(clip_config.get("min_frames_per_clip", 4))

    if stride_seconds <= 0:
        # A non-positive stride keeps `start <= max_offset` true forever.
        raise ValueError(
            f"clip_config['stride_seconds'] must be positive, got {stride_seconds!r}"
        )

    # Parse the offsets once; the list is ascending because the frames are
    # sorted by the same key, which lets each window be located by bisection.
    offsets = [float(frame["offset_seconds"]) for frame in sorted_frames]
    max_offset = offsets[-1]
    timeline_end = _estimated_timeline_end(sorted_frames)

    clips: list[dict[str, Any]] = []
    clip_index = 1
    start = 0.0
    while start <= max_offset:
        # The last window is truncated at the estimated end of the timeline.
        end = min(start + length_seconds, timeline_end)

        # Half-open window: start <= offset < end.
        first = bisect_left(offsets, start)
        stop = bisect_left(offsets, end)
        in_window = sorted_frames[first:stop]

        if len(in_window) >= min_frames_per_clip:
            selected_frames = _uniform_sample(in_window, frames_per_clip)
            start_beijing_time, end_beijing_time = _clip_beijing_time_range(
                in_window,
                start,
                end,
            )
            clip = {
                "video_id": video_id,
                "clip_id": f"{video_id}_c{clip_index:06d}",
                "clip_start_seconds": round(start, 6),
                "clip_end_seconds": round(end, 6),
                "clip_start_timecode": seconds_to_timecode(start),
                "clip_end_timecode": seconds_to_timecode(end),
                "frame_times": [_frame_time(frame) for frame in selected_frames],
                "status": "pending",
                "retry_count": 0,
                "last_error": None,
            }
            # Wall-clock fields are only present when they could be derived.
            if start_beijing_time is not None:
                clip["clip_start_beijing_time"] = start_beijing_time
            if end_beijing_time is not None:
                clip["clip_end_beijing_time"] = end_beijing_time
            clips.append(clip)
            clip_index += 1

        start += stride_seconds
    return clips


def _estimated_timeline_end(frames: list[dict[str, Any]]) -> float:
    """Estimate where the timeline ends, just past the last frame.

    Expects ``frames`` to be non-empty and ordered by ``offset_seconds``. The
    result is the last offset plus the smallest positive gap between
    consecutive frames. When there are fewer than two frames, or no positive
    gap exists, the last offset itself is returned; a half-open window ending
    there does not include the final frame.
    """
    offsets = [float(frame["offset_seconds"]) for frame in frames]
    if len(offsets) < 2:
        return offsets[-1]

    intervals = [
        current - previous
        for previous, current in zip(offsets, offsets[1:])
        if current > previous
    ]
    if not intervals:
        return offsets[-1]
    return offsets[-1] + min(intervals)


def _uniform_sample(
    frames: list[dict[str, Any]],
    frames_per_clip: int,
) -> list[dict[str, Any]]:
    """Pick up to ``frames_per_clip`` frames spread evenly over ``frames``.

    When ``frames`` already fits, the input list itself is returned (not a
    copy). When ``frames_per_clip`` is 1 or less, only the first frame is
    kept. Otherwise the first and last frames are always included and the
    intermediate indexes are rounded to the nearest position.
    """
    if len(frames) <= frames_per_clip:
        return frames
    if frames_per_clip <= 1:
        return [frames[0]]

    last_index = len(frames) - 1
    indexes = [
        round(position * last_index / (frames_per_clip - 1))
        for position in range(frames_per_clip)
    ]
    return [frames[index] for index in indexes]


def _frame_time(frame: dict[str, Any]) -> dict[str, Any]:
    """Project a frame record onto the timing fields stored inside a clip.

    Missing fields are emitted as ``None``; ``beijing_time`` is included only
    when the frame has a non-``None`` value for it.
    """
    record = {
        "frame_id": frame.get("frame_id"),
        "frame_path": frame.get("frame_path"),
        "offset_seconds": frame.get("offset_seconds"),
        "timecode": frame.get("timecode"),
        "pts_time": frame.get("pts_time"),
    }
    if frame.get("beijing_time") is not None:
        record["beijing_time"] = frame.get("beijing_time")
    return record


def _clip_beijing_time_range(
    frames: list[dict[str, Any]],
    start: float,
    end: float,
) -> tuple[str | None, str | None]:
    """Derive the clip's start/end ``beijing_time`` from a reference frame.

    The first frame with a truthy ``beijing_time`` serves as the reference;
    both clip boundaries are computed from it via
    ``derive_time_from_reference``.

    Returns:
        ``(start_time, end_time)``, or ``(None, None)`` when no frame carries
        a ``beijing_time``.
    """
    for frame in frames:
        reference_time = frame.get("beijing_time")
        if not reference_time:
            continue
        # NOTE(review): the offset is forwarded as stored in the record, with
        # no float() conversion - confirm derive_time_from_reference accepts it.
        reference_offset = frame.get("offset_seconds")
        return (
            derive_time_from_reference(
                str(reference_time),
                reference_offset_seconds=reference_offset,
                target_offset_seconds=start,
            ),
            derive_time_from_reference(
                str(reference_time),
                reference_offset_seconds=reference_offset,
                target_offset_seconds=end,
            ),
        )
    return None, None