404 lines
14 KiB
Python
404 lines
14 KiB
Python
from __future__ import annotations

import json
import math
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from .manifest import read_jsonl
|
|
|
|
|
|
def aggregate_outputs(
    output_dir: str | Path,
    config: dict[str, Any],
) -> dict[str, Any]:
    """Aggregate clip results into per-video results and a folder summary.

    Reads ``video_manifest.jsonl``, ``clip_manifest.jsonl`` and
    ``clip_results.jsonl`` from *output_dir*, writes one
    ``videos/<video_id>/video_result.json`` for every manifest row that has a
    ``video_id``, then writes ``folder_summary.json`` and returns its content.

    Args:
        output_dir: Directory holding the manifests. ``~`` is expanded and the
            path is resolved without requiring it to exist.
        config: Run configuration. ``schema.version``,
            ``schema.merge_gap_seconds`` and ``input.dir`` are read; a section
            that is missing or ``None`` falls back to the defaults.

    Returns:
        The folder summary dictionary that was written to disk.
    """
    root = Path(output_dir).expanduser().resolve(strict=False)
    started_at = _now_iso()
    video_records = read_jsonl(root / "video_manifest.jsonl")
    clip_records = read_jsonl(root / "clip_manifest.jsonl")
    clip_results = read_jsonl(root / "clip_results.jsonl")

    # A section may be present but set to None; treat it like a missing one
    # instead of failing on ``None.get``.
    schema_config = config.get("schema") or {}
    input_config = config.get("input") or {}
    schema_version = str(schema_config.get("version", "local-batch-v1"))
    merge_gap_seconds = float(schema_config.get("merge_gap_seconds", 30))
    input_dir = input_config.get("dir")

    clips_by_video = _group_by_video(clip_records)
    results_by_video = _group_by_video(clip_results)

    videos_summary = []
    folder_event_counts: dict[str, int] = {}
    processed_video_count = 0
    failed_video_count = 0

    for video_record in video_records:
        video_id = str(video_record.get("video_id") or "")
        if not video_id:
            # Rows without an id cannot be matched to clips or given a path.
            continue
        video_result = _build_video_result(
            video_record,
            clips_by_video.get(video_id, []),
            results_by_video.get(video_id, []),
            schema_version=schema_version,
            merge_gap_seconds=merge_gap_seconds,
            started_at=started_at,
        )
        _write_json(root / "videos" / video_id / "video_result.json", video_result)

        failed_clip_count = int(video_result["failed_clip_count"])
        # A video is failed when it was not probed or any of its clips failed.
        video_failed = video_record.get("status") != "probed" or failed_clip_count > 0
        if video_failed:
            failed_video_count += 1
        else:
            processed_video_count += 1

        # Events are tallied for every video, including failed ones.
        for event_type, count in video_result["event_counts"].items():
            folder_event_counts[event_type] = folder_event_counts.get(event_type, 0) + int(count)

        videos_summary.append(
            {
                "video_id": video_id,
                "video_path": video_result["video_path"],
                "status": "failed" if video_failed else "processed",
                "clip_count": video_result["clip_count"],
                "failed_clip_count": failed_clip_count,
                "failed_clip_counts": video_result["failed_clip_counts"],
                "event_counts": video_result["event_counts"],
                "outputs": {"video_result_json": f"videos/{video_id}/video_result.json"},
                "error": video_record.get("last_error"),
            }
        )

    folder_summary = {
        "schema_version": schema_version,
        # Emit JSON null rather than the literal string "None" when the input
        # directory is not configured.
        "input_dir": None if input_dir is None else str(input_dir),
        # Counts every manifest row, including rows skipped for lacking an id.
        "video_count": len(video_records),
        "processed_video_count": processed_video_count,
        "failed_video_count": failed_video_count,
        "event_counts": dict(sorted(folder_event_counts.items())),
        "videos": videos_summary,
        "processing": {
            "started_at": started_at,
            "finished_at": _now_iso(),
        },
    }
    _write_json(root / "folder_summary.json", folder_summary)
    return folder_summary
|
|
|
|
|
|
def _build_video_result(
    video_record: dict[str, Any],
    clip_records: list[dict[str, Any]],
    clip_results: list[dict[str, Any]],
    *,
    schema_version: str,
    merge_gap_seconds: float,
    started_at: str,
) -> dict[str, Any]:
    """Assemble the ``video_result.json`` payload for one video.

    Args:
        video_record: The video's row from the video manifest.
        clip_records: Clip manifest rows belonging to this video.
        clip_results: Clip result rows belonging to this video.
        schema_version: Value stored under ``schema_version``.
        merge_gap_seconds: Gap threshold passed on to the event merger.
        started_at: Timestamp stored under ``processing.started_at``.

    Returns:
        The per-video result dictionary, including merged events and counts.
    """
    failures = _failed_clip_counts(clip_results)
    events = _merge_events(_event_records(clip_results), merge_gap_seconds)
    duration_keys = ("duration_seconds", "video_duration_seconds", "duration")

    timeline = {
        "video_start_time": _video_start_time(video_record, clip_results),
        "video_duration_seconds": _first_present(video_record, duration_keys),
    }

    payload: dict[str, Any] = {
        "schema_version": schema_version,
        "video_id": str(video_record.get("video_id")),
        "video_path": _video_path(video_record, clip_results),
        "probe": _probe(video_record),
        "monitoring_timeline": timeline,
        "clip_count": len(clip_records),
        "failed_clip_count": sum(failures.values()),
        "failed_clip_counts": failures,
        "event_counts": _event_counts(events),
        "events": events,
        "outputs": {"clip_results_jsonl": "clip_results.jsonl"},
    }
    # Stamp the finish time last so it reflects the completed assembly.
    payload["processing"] = {"started_at": started_at, "finished_at": _now_iso()}
    return payload
|
|
|
|
|
|
def _event_records(clip_results: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Collect normalized events from successful clip results, in merge order.

    Only results whose ``status`` is ``"ok"`` contribute, and only event
    entries that are dictionaries are kept. The returned list is ordered by
    video id, event type, start offset and end offset; a missing offset sorts
    as 0.
    """

    def _ordering(record: dict[str, Any]) -> tuple[str, str, float, float]:
        return (
            str(record.get("video_id")),
            str(record.get("event_type")),
            float(record.get("start_offset_seconds") or 0),
            float(record.get("end_offset_seconds") or 0),
        )

    collected: list[dict[str, Any]] = []
    for clip_result in clip_results:
        if clip_result.get("status") == "ok":
            raw_timeline = clip_result.get("monitoring_timeline") or {}
            clip_timeline = raw_timeline if isinstance(raw_timeline, dict) else {}
            collected.extend(
                _normalize_event(raw_event, clip_result, clip_timeline)
                for raw_event in clip_result.get("events") or []
                if isinstance(raw_event, dict)
            )
    collected.sort(key=_ordering)
    return collected
|
|
|
|
|
|
def _normalize_event(
    event: dict[str, Any],
    result: dict[str, Any],
    timeline: dict[str, Any],
) -> dict[str, Any]:
    """Convert one raw clip event into the record shape used for merging.

    Args:
        event: Raw event dictionary taken from a clip result's ``events``.
        result: The clip result that produced *event*; supplies ``clip_id``
            and ``video_id``.
        timeline: The clip result's ``monitoring_timeline`` (may be empty).

    Returns:
        A new dictionary with float-or-None offsets, evidence lists seeded
        from the clip, and ``source_event_count`` set to 1.
    """
    clip_id = str(result.get("clip_id"))

    # ``frame_times`` may be missing, null or malformed; keep only a real
    # sequence of dictionaries instead of failing on ``None``.
    raw_frame_times = timeline.get("frame_times")
    if not isinstance(raw_frame_times, (list, tuple)):
        raw_frame_times = []
    frame_times = [dict(frame) for frame in raw_frame_times if isinstance(frame, dict)]
    frame_paths = [
        str(frame.get("frame_path"))
        for frame in frame_times
        if frame.get("frame_path") is not None
    ]

    # Fall back to the clip bounds whenever the event's own offset is missing,
    # null or not numeric, so the event can still be ordered and merged.
    start = _float_or_none(event.get("start_offset_seconds"))
    if start is None:
        start = _float_or_none(timeline.get("clip_start_seconds"))
    end = _float_or_none(event.get("end_offset_seconds"))
    if end is None:
        end = _float_or_none(timeline.get("clip_end_seconds"))

    screen_time = str(timeline.get("screen_time") or "")
    attributes = event.get("attributes")

    evidence: dict[str, Any] = {
        "clip_ids": [clip_id],
        "frame_paths": frame_paths,
        "frame_times": frame_times,
        "clips": [
            {
                "clip_id": clip_id,
                "clip_start_seconds": timeline.get("clip_start_seconds"),
                "clip_end_seconds": timeline.get("clip_end_seconds"),
                "clip_start_timecode": timeline.get("clip_start_timecode"),
                "clip_end_timecode": timeline.get("clip_end_timecode"),
                "clip_start_beijing_time": timeline.get("clip_start_beijing_time"),
                "clip_end_beijing_time": timeline.get("clip_end_beijing_time"),
                "screen_time": screen_time,
            }
        ],
    }

    # Evidence reported on the event itself is added to the clip's evidence.
    original_evidence = event.get("evidence")
    if isinstance(original_evidence, dict):
        original_clip_id = original_evidence.get("clip_id")
        if original_clip_id:
            evidence["clip_ids"] = _unique([*evidence["clip_ids"], str(original_clip_id)])
        original_frame_paths = original_evidence.get("frame_paths")
        if isinstance(original_frame_paths, list):
            # Skip null entries so they do not become the string "None".
            reported_paths = [str(path) for path in original_frame_paths if path is not None]
            evidence["frame_paths"] = _unique([*evidence["frame_paths"], *reported_paths])

    return {
        "video_id": str(result.get("video_id")),
        "event_type": str(event.get("event_type") or "unknown"),
        "start_time": event.get("start_time"),
        "end_time": event.get("end_time"),
        "start_offset_seconds": start,
        "end_offset_seconds": end,
        "confidence": event.get("confidence"),
        "severity": event.get("severity"),
        # Copy so later edits to the record never alter the raw clip result.
        "attributes": dict(attributes) if isinstance(attributes, dict) else {},
        "screen_times": [screen_time] if screen_time else [],
        "evidence": evidence,
        "source_event_count": 1,
    }
|
|
|
|
|
|
def _merge_events(
    events: list[dict[str, Any]],
    merge_gap_seconds: float,
) -> list[dict[str, Any]]:
    """Fold consecutive mergeable events together and return the new list.

    Each event is compared only with the most recently emitted one, so the
    input is expected to already be in merge order. Input events are copied,
    never modified, and ``video_id`` is removed from every returned event.
    """
    combined: list[dict[str, Any]] = []
    for candidate in events:
        if combined and _can_merge(combined[-1], candidate, merge_gap_seconds):
            _merge_into(combined[-1], candidate)
        else:
            combined.append(_copy_event(candidate))

    # ``video_id`` is only needed while deciding what may merge.
    for merged_event in combined:
        merged_event.pop("video_id", None)
    return combined
|
|
|
|
|
|
def _can_merge(
    previous: dict[str, Any],
    current: dict[str, Any],
    merge_gap_seconds: float,
) -> bool:
    """Return whether *current* may be folded into *previous*.

    Both events must share ``video_id`` and ``event_type``, both boundary
    offsets must be numeric, and the gap from the end of *previous* to the
    start of *current* must not exceed *merge_gap_seconds*.
    """
    same_stream = all(
        previous.get(field) == current.get(field)
        for field in ("video_id", "event_type")
    )
    if not same_stream:
        return False

    gap_from = _float_or_none(previous.get("end_offset_seconds"))
    gap_to = _float_or_none(current.get("start_offset_seconds"))
    if gap_from is None or gap_to is None:
        return False
    return gap_to - gap_from <= merge_gap_seconds
|
|
|
|
|
|
def _merge_into(target: dict[str, Any], event: dict[str, Any]) -> None:
    """Fold *event* into *target* in place.

    The offset span is widened to cover both events, ``screen_times`` and all
    four evidence lists are unioned without duplicates, the source event
    counts are added, and the higher numeric confidence is kept.

    Args:
        target: The accumulated event; modified in place. Must have an
            ``evidence`` dictionary.
        event: The event being absorbed; must have an ``evidence`` dictionary.
    """
    target_start = _float_or_none(target.get("start_offset_seconds"))
    target_end = _float_or_none(target.get("end_offset_seconds"))
    event_start = _float_or_none(event.get("start_offset_seconds"))
    event_end = _float_or_none(event.get("end_offset_seconds"))

    # Keep the start_time/end_time labels in step with the offsets: when the
    # incoming event moves a boundary, its label describes the new boundary.
    if (
        event.get("start_time") is not None
        and event_start is not None
        and (target_start is None or event_start < target_start)
    ):
        target["start_time"] = event["start_time"]
    if (
        event.get("end_time") is not None
        and event_end is not None
        and (target_end is None or event_end > target_end)
    ):
        target["end_time"] = event["end_time"]

    target["start_offset_seconds"] = _min_number(target_start, event_start)
    target["end_offset_seconds"] = _max_number(target_end, event_end)
    target["screen_times"] = _unique(
        [*target.get("screen_times", []), *event.get("screen_times", [])]
    )
    target["source_event_count"] = int(target.get("source_event_count", 1)) + int(
        event.get("source_event_count", 1)
    )

    # Two events from the same clip carry identical frame and clip entries, so
    # every evidence list is de-duplicated, not only the id and path lists.
    target_evidence = target["evidence"]
    event_evidence = event["evidence"]
    for field in ("clip_ids", "frame_paths", "frame_times", "clips"):
        target_evidence[field] = _unique(
            [*target_evidence.get(field, []), *event_evidence.get(field, [])]
        )

    event_confidence = event.get("confidence")
    if target.get("confidence") is None:
        target["confidence"] = event_confidence
    elif event_confidence is not None:
        current = _float_or_none(target["confidence"])
        incoming = _float_or_none(event_confidence)
        # A non-numeric confidence on either side leaves the target unchanged
        # instead of aborting the whole aggregation.
        if current is not None and incoming is not None:
            target["confidence"] = max(current, incoming)
|
|
|
|
|
|
def _copy_event(event: dict[str, Any]) -> dict[str, Any]:
    """Return a copy of *event* whose mutable containers are independent.

    ``screen_times``, ``attributes`` and the four evidence lists are rebuilt,
    and each ``frame_times`` / ``clips`` entry is copied one level deep. All
    other values are shared with the original. *event* must have an
    ``evidence`` dictionary.
    """
    source_evidence = event["evidence"]
    clone = event.copy()
    clone.update(
        screen_times=[*event.get("screen_times", [])],
        attributes=dict(event.get("attributes", {})),
        evidence={
            "clip_ids": [*source_evidence.get("clip_ids", [])],
            "frame_paths": [*source_evidence.get("frame_paths", [])],
            "frame_times": [dict(entry) for entry in source_evidence.get("frame_times", [])],
            "clips": [dict(entry) for entry in source_evidence.get("clips", [])],
        },
    )
    return clone
|
|
|
|
|
|
def _group_by_video(records: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
    """Bucket *records* by their stringified ``video_id``.

    Records whose ``video_id`` is missing or falsy are left out. Records keep
    their input order inside each bucket.
    """
    buckets: dict[str, list[dict[str, Any]]] = {}
    for entry in records:
        raw_id = entry.get("video_id")
        if not raw_id:
            continue
        bucket_key = str(raw_id)
        if bucket_key not in buckets:
            buckets[bucket_key] = []
        buckets[bucket_key].append(entry)
    return buckets
|
|
|
|
|
|
def _failed_clip_counts(clip_results: list[dict[str, Any]]) -> dict[str, int]:
    """Count clip results per failure status.

    Only ``parse_failed`` and ``inference_failed`` are tallied; both keys are
    always present in the result, even when their count is zero.
    """
    tally = dict.fromkeys(("parse_failed", "inference_failed"), 0)
    for status in (entry.get("status") for entry in clip_results):
        if status in tally:
            tally[str(status)] += 1
    return tally
|
|
|
|
|
|
def _event_counts(events: list[dict[str, Any]]) -> dict[str, int]:
    """Count events per ``event_type``, with keys in sorted order.

    An event whose type is missing or falsy is counted under ``"unknown"``.
    """
    tally: dict[str, int] = {}
    for label in (str(item.get("event_type") or "unknown") for item in events):
        tally[label] = tally.setdefault(label, 0) + 1
    return {label: tally[label] for label in sorted(tally)}
|
|
|
|
|
|
def _probe(video_record: dict[str, Any]) -> dict[str, Any]:
    """Build the ``probe`` section of a video result from its manifest row.

    Identity, path and bookkeeping fields are dropped, every other field is
    carried over, ``status`` is always included, and ``last_error`` is
    included only when it is not None.
    """
    hidden = frozenset(
        ("video_id", "path", "source_path", "status", "retry_count", "last_error")
    )
    details: dict[str, Any] = {}
    for field, content in video_record.items():
        if field in hidden:
            continue
        details[field] = content

    details["status"] = video_record.get("status")
    error = video_record.get("last_error")
    if error is not None:
        details["last_error"] = error
    return details
|
|
|
|
|
|
def _video_path(
    video_record: dict[str, Any],
    clip_results: list[dict[str, Any]],
) -> str | None:
    """Return the video's path as a string, or None when none is recorded.

    The manifest row's ``path`` is preferred, then its ``source_path``; if
    neither yields a value, the first clip result with a non-None
    ``video_path`` is used.
    """
    declared = video_record.get("path") or video_record.get("source_path")
    if declared is None:
        fallback = next(
            (
                clip_result["video_path"]
                for clip_result in clip_results
                if clip_result.get("video_path") is not None
            ),
            None,
        )
        return None if fallback is None else str(fallback)
    return str(declared)
|
|
|
|
|
|
def _video_start_time(
    video_record: dict[str, Any],
    clip_results: list[dict[str, Any]],
) -> Any:
    """Return the video's start time, or None when it is not recorded.

    The manifest row's ``video_start_time`` wins; otherwise the first clip
    result whose ``monitoring_timeline`` dictionary has a non-None
    ``video_start_time`` supplies the value.
    """
    declared = video_record.get("video_start_time")
    if declared is not None:
        return declared

    for clip_result in clip_results:
        clip_timeline = clip_result.get("monitoring_timeline")
        if not isinstance(clip_timeline, dict):
            continue
        candidate = clip_timeline.get("video_start_time")
        if candidate is not None:
            return candidate
    return None
|
|
|
|
|
|
def _first_present(record: dict[str, Any], keys: tuple[str, ...]) -> Any:
    """Return the value of the first key in *keys* that is not None in *record*.

    Keys are tried in the given order. Falsy values such as 0 or "" count as
    present. Returns None when no key has a value.
    """
    candidates = (record.get(name) for name in keys)
    return next((found for found in candidates if found is not None), None)
|
|
|
|
|
|
def _float_or_none(value: Any) -> float | None:
    """Convert *value* to a finite float, or return None when that fails.

    None is returned for ``None``, for values ``float()`` rejects, for
    integers too large to represent as a float, and for NaN or infinity.
    Non-finite numbers are rejected because they cannot be ordered reliably
    and are not valid JSON.
    """
    if value is None:
        return None
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        # OverflowError: float() raises it for integers beyond float range.
        return None
    return number if math.isfinite(number) else None
|
|
|
|
|
|
def _min_number(left: Any, right: Any) -> float | None:
    """Return the smaller of two values after float conversion.

    A value that does not convert is ignored; None is returned when neither
    converts.
    """
    first = _float_or_none(left)
    second = _float_or_none(right)
    if first is None:
        return second
    if second is None:
        return first
    return min(first, second)
|
|
|
|
|
|
def _max_number(left: Any, right: Any) -> float | None:
    """Return the larger of two values after float conversion.

    A value that does not convert is ignored; None is returned when neither
    converts.
    """
    first = _float_or_none(left)
    second = _float_or_none(right)
    if first is None:
        return second
    if second is None:
        return first
    return max(first, second)
|
|
|
|
|
|
def _unique(values: list[Any]) -> list[Any]:
    """Return *values* without duplicates, keeping first occurrences in order.

    Dictionaries and lists are not hashable, so they are compared through
    their JSON encoding with sorted keys; two dictionaries with the same items
    in a different key order count as duplicates. Every other value is
    compared by normal equality and must be hashable.

    Raises:
        TypeError: If a dictionary or list is not JSON serializable, or a
            value of another type is not hashable.
    """
    seen: set[tuple[str, Any]] = set()
    unique_values = []
    for value in values:
        # Tag the marker with how it was built so a string that happens to
        # equal a container's JSON text is not mistaken for that container.
        if isinstance(value, (dict, list)):
            marker = ("json", json.dumps(value, sort_keys=True))
        else:
            marker = ("value", value)
        if marker in seen:
            continue
        seen.add(marker)
        unique_values.append(value)
    return unique_values
|
|
|
|
|
|
def _write_json(path: Path, payload: dict[str, Any]) -> None:
    """Write *payload* to *path* as indented UTF-8 JSON with sorted keys.

    Missing parent directories are created first. Non-ASCII characters are
    written as-is and the file ends with a newline.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    rendered = json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True)
    with path.open("w", encoding="utf-8") as handle:
        handle.write(rendered + "\n")
|
|
|
|
|
|
def _now_iso() -> str:
    """Return the current UTC time as a timezone-aware ISO 8601 string."""
    current = datetime.now(tz=timezone.utc)
    return current.isoformat()
|