Files
2026-06-17 22:52:54 +08:00

151 lines
4.7 KiB
Python

from __future__ import annotations
import json
from typing import Any
def extract_json_payload(raw_response: str) -> dict[str, Any]:
    """Return the first JSON object that can be decoded from *raw_response*.

    The stripped text is first decoded as a whole. If that fails, or the
    result is not an object, each ``{`` in the text is tried in order as the
    start of an embedded object, so surrounding prose or code fences do not
    prevent extraction.

    Args:
        raw_response: Text expected to contain a JSON object.

    Returns:
        The first decoded value that is a ``dict``.

    Raises:
        ValueError: If the text is empty after stripping, or no JSON object
            can be decoded from it.
    """
    text = raw_response.strip()
    if not text:
        raise ValueError("JSON payload is empty")

    try:
        payload = json.loads(text)
    except json.JSONDecodeError:
        pass
    else:
        if isinstance(payload, dict):
            return payload

    decoder = json.JSONDecoder()
    start = text.find("{")
    while start != -1:
        try:
            # Decode in place from ``start``: slicing the tail for every
            # candidate brace would copy the remaining text each time.
            payload, _ = decoder.raw_decode(text, start)
        except json.JSONDecodeError:
            payload = None
        if isinstance(payload, dict):
            return payload
        start = text.find("{", start + 1)

    raise ValueError("JSON object not found in model response")
def build_clip_result(
    raw_response: str,
    clip_record: dict[str, Any],
    video_record: dict[str, Any] | None,
    config: dict[str, Any],
    *,
    processing: dict[str, Any] | None = None,
    status: str | None = None,
    error: str | None = None,
) -> dict[str, Any]:
    """Assemble the result record for one clip.

    Args:
        raw_response: Model output text; stored verbatim in the result and,
            when *status* is not given, parsed for a JSON object.
        clip_record: Clip metadata; supplies the ids and the timeline fields.
        video_record: Optional video metadata used to resolve the video path.
        config: Configuration mapping; ``schema.version`` and
            ``runtime.timezone`` are read from it.
        processing: Optional processing metadata; a shallow copy is stored.
        status: Explicit status. When given, *raw_response* is not parsed and
            *status* / *error* are recorded as-is.
        error: Error text recorded together with an explicit *status*.

    Returns:
        The result dict. ``events`` and ``qsc_events`` are populated only when
        the final status is ``"ok"``; a response that cannot be parsed yields
        status ``"parse_failed"`` with the parser's message as ``error``.
    """
    processing_copy = dict(processing or {})

    parsed: dict[str, Any] = {}
    if status is None:
        try:
            parsed = extract_json_payload(raw_response)
        except ValueError as exc:
            outcome, failure = "parse_failed", str(exc)
        else:
            outcome, failure = "ok", None
    else:
        # Caller already decided the outcome; skip parsing entirely.
        outcome, failure = status, error

    timeline = _timeline(clip_record, config, parsed)
    succeeded = outcome == "ok"

    result: dict[str, Any] = {}
    result["schema_version"] = config.get("schema", {}).get(
        "version", "local-batch-v1"
    )
    result["video_id"] = str(clip_record.get("video_id"))
    result["video_path"] = _video_path(video_record)
    result["clip_id"] = str(clip_record.get("clip_id"))
    result["status"] = outcome
    result["monitoring_timeline"] = timeline
    result["events"] = _events(parsed, clip_record) if succeeded else []
    result["qsc_events"] = _qsc_events(parsed) if succeeded else []
    result["raw_response"] = raw_response
    result["processing"] = processing_copy
    result["error"] = failure
    return result
def _timeline(
clip_record: dict[str, Any],
config: dict[str, Any],
payload: dict[str, Any],
) -> dict[str, Any]:
return {
"timezone": config.get("runtime", {}).get("timezone", "Asia/Shanghai"),
"video_start_time": clip_record.get("video_start_time"),
"clip_start_seconds": clip_record.get("clip_start_seconds"),
"clip_end_seconds": clip_record.get("clip_end_seconds"),
"clip_start_timecode": clip_record.get("clip_start_timecode"),
"clip_end_timecode": clip_record.get("clip_end_timecode"),
"clip_start_beijing_time": clip_record.get("clip_start_beijing_time"),
"clip_end_beijing_time": clip_record.get("clip_end_beijing_time"),
"frame_times": clip_record.get("frame_times", []),
"screen_time": str(
payload.get("screen_time") or payload.get("画面时间") or payload.get("时间") or ""
),
}
def _events(
    payload: dict[str, Any],
    clip_record: dict[str, Any],
) -> list[dict[str, Any]]:
    """Normalize the ``events`` list of a parsed payload.

    Returns an empty list when ``events`` is missing, falsy, or not a list.
    Entries that are not dicts are dropped; the rest are passed through
    ``_event`` together with *clip_record*.
    """
    candidates = payload.get("events") or []
    if isinstance(candidates, list):
        return [
            _event(item, clip_record)
            for item in candidates
            if isinstance(item, dict)
        ]
    return []
def _event(
event: dict[str, Any],
clip_record: dict[str, Any],
) -> dict[str, Any]:
normalized = dict(event)
normalized.setdefault("event_type", "unknown")
normalized.setdefault("start_time", None)
normalized.setdefault("end_time", None)
normalized.setdefault("start_offset_seconds", clip_record.get("clip_start_seconds"))
normalized.setdefault("end_offset_seconds", clip_record.get("clip_end_seconds"))
normalized.setdefault("confidence", None)
normalized.setdefault("severity", None)
normalized.setdefault("attributes", {})
normalized.setdefault(
"evidence",
{
"clip_id": clip_record.get("clip_id"),
"frame_paths": [
frame.get("frame_path")
for frame in clip_record.get("frame_times", [])
if frame.get("frame_path")
],
},
)
return normalized
def _qsc_events(payload: dict[str, Any]) -> list[dict[str, Any]]:
raw_events = payload.get("qsc_events") or []
if not isinstance(raw_events, list):
return []
return [
dict(event)
for event in raw_events
if isinstance(event, dict)
]
def _video_path(video_record: dict[str, Any] | None) -> str | None:
if not video_record:
return None
value = video_record.get("path") or video_record.get("source_path")
return str(value) if value is not None else None