159 lines
5.2 KiB
Python
159 lines
5.2 KiB
Python
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from .frames import seconds_to_timecode
|
|
from .manifest import read_jsonl, write_manifest
|
|
from .timeline import derive_time_from_reference
|
|
|
|
|
|
def build_clip_records(
    frame_records: list[dict[str, Any]],
    clip_config: dict[str, Any],
) -> list[dict[str, Any]]:
    """Build clip records for every video present in ``frame_records``.

    Only records whose ``status`` equals ``"sampled"`` are considered.  They
    are grouped by the string form of their ``video_id`` and each group is
    handed to ``_build_video_clips``; groups are processed in sorted
    ``video_id`` order and their clips are concatenated.

    Args:
        frame_records: Frame records; sampled ones must carry ``video_id``.
        clip_config: Clip settings passed through unchanged to
            ``_build_video_clips``.

    Returns:
        All clip records, ordered by video id and then by the order in which
        ``_build_video_clips`` returned them.
    """
    usable = [item for item in frame_records if item.get("status") == "sampled"]

    grouped: dict[str, list[dict[str, Any]]] = {}
    for item in usable:
        key = str(item["video_id"])
        if key not in grouped:
            grouped[key] = []
        grouped[key].append(item)

    result: list[dict[str, Any]] = []
    # Keys are unique, so sorting the keys gives the same order as sorting
    # the (key, frames) pairs.
    for key in sorted(grouped):
        result += _build_video_clips(key, grouped[key], clip_config)
    return result
|
|
|
|
|
|
def build_clip_records_from_manifest(
    frame_manifest_path: str | Path,
    clip_manifest_path: str | Path,
    clip_config: dict[str, Any],
) -> list[dict[str, Any]]:
    """Build clip records from a frame manifest and persist them.

    The frame records are obtained with ``read_jsonl(frame_manifest_path)``,
    turned into clips by ``build_clip_records`` and then handed to
    ``write_manifest(clip_manifest_path, ...)``.

    Args:
        frame_manifest_path: Location passed to ``read_jsonl``.
        clip_manifest_path: Location passed to ``write_manifest``.
        clip_config: Clip settings forwarded to ``build_clip_records``.

    Returns:
        The same clip records that were given to ``write_manifest``.
    """
    frame_records = read_jsonl(frame_manifest_path)
    clip_records = build_clip_records(frame_records, clip_config)
    write_manifest(clip_manifest_path, clip_records)
    return clip_records
|
|
|
|
|
|
def _build_video_clips(
|
|
video_id: str,
|
|
frames: list[dict[str, Any]],
|
|
clip_config: dict[str, Any],
|
|
) -> list[dict[str, Any]]:
|
|
sorted_frames = sorted(frames, key=lambda frame: float(frame["offset_seconds"]))
|
|
if not sorted_frames:
|
|
return []
|
|
|
|
length_seconds = float(clip_config.get("length_seconds", 10))
|
|
stride_seconds = float(clip_config.get("stride_seconds", length_seconds))
|
|
frames_per_clip = int(clip_config.get("frames_per_clip", 8))
|
|
min_frames_per_clip = int(clip_config.get("min_frames_per_clip", 4))
|
|
max_offset = max(float(frame["offset_seconds"]) for frame in sorted_frames)
|
|
timeline_end = _estimated_timeline_end(sorted_frames)
|
|
|
|
clips = []
|
|
clip_index = 1
|
|
start = 0.0
|
|
while start <= max_offset:
|
|
end = min(start + length_seconds, timeline_end)
|
|
in_window = [
|
|
frame
|
|
for frame in sorted_frames
|
|
if start <= float(frame["offset_seconds"]) < end
|
|
]
|
|
if len(in_window) >= min_frames_per_clip:
|
|
selected_frames = _uniform_sample(in_window, frames_per_clip)
|
|
start_beijing_time, end_beijing_time = _clip_beijing_time_range(
|
|
in_window,
|
|
start,
|
|
end,
|
|
)
|
|
clip = {
|
|
"video_id": video_id,
|
|
"clip_id": f"{video_id}_c{clip_index:06d}",
|
|
"clip_start_seconds": round(start, 6),
|
|
"clip_end_seconds": round(end, 6),
|
|
"clip_start_timecode": seconds_to_timecode(start),
|
|
"clip_end_timecode": seconds_to_timecode(end),
|
|
"frame_times": [_frame_time(frame) for frame in selected_frames],
|
|
"status": "pending",
|
|
"retry_count": 0,
|
|
"last_error": None,
|
|
}
|
|
if start_beijing_time is not None:
|
|
clip["clip_start_beijing_time"] = start_beijing_time
|
|
if end_beijing_time is not None:
|
|
clip["clip_end_beijing_time"] = end_beijing_time
|
|
clips.append(clip)
|
|
clip_index += 1
|
|
start += stride_seconds
|
|
return clips
|
|
|
|
|
|
def _estimated_timeline_end(frames: list[dict[str, Any]]) -> float:
|
|
offsets = [float(frame["offset_seconds"]) for frame in frames]
|
|
if len(offsets) < 2:
|
|
return offsets[-1]
|
|
intervals = [
|
|
current - previous
|
|
for previous, current in zip(offsets, offsets[1:])
|
|
if current > previous
|
|
]
|
|
if not intervals:
|
|
return offsets[-1]
|
|
return offsets[-1] + min(intervals)
|
|
|
|
|
|
def _uniform_sample(
|
|
frames: list[dict[str, Any]],
|
|
frames_per_clip: int,
|
|
) -> list[dict[str, Any]]:
|
|
if len(frames) <= frames_per_clip:
|
|
return frames
|
|
if frames_per_clip <= 1:
|
|
return [frames[0]]
|
|
last_index = len(frames) - 1
|
|
indexes = [
|
|
round(position * last_index / (frames_per_clip - 1))
|
|
for position in range(frames_per_clip)
|
|
]
|
|
return [frames[index] for index in indexes]
|
|
|
|
|
|
def _frame_time(frame: dict[str, Any]) -> dict[str, Any]:
|
|
record = {
|
|
"frame_id": frame.get("frame_id"),
|
|
"frame_path": frame.get("frame_path"),
|
|
"offset_seconds": frame.get("offset_seconds"),
|
|
"timecode": frame.get("timecode"),
|
|
"pts_time": frame.get("pts_time"),
|
|
}
|
|
if frame.get("beijing_time") is not None:
|
|
record["beijing_time"] = frame.get("beijing_time")
|
|
return record
|
|
|
|
|
|
def _clip_beijing_time_range(
|
|
frames: list[dict[str, Any]],
|
|
start: float,
|
|
end: float,
|
|
) -> tuple[str | None, str | None]:
|
|
for frame in frames:
|
|
reference_time = frame.get("beijing_time")
|
|
if not reference_time:
|
|
continue
|
|
reference_offset = frame.get("offset_seconds")
|
|
return (
|
|
derive_time_from_reference(
|
|
str(reference_time),
|
|
reference_offset_seconds=reference_offset,
|
|
target_offset_seconds=start,
|
|
),
|
|
derive_time_from_reference(
|
|
str(reference_time),
|
|
reference_offset_seconds=reference_offset,
|
|
target_offset_seconds=end,
|
|
),
|
|
)
|
|
return None, None
|