"""Send clip frames to a chat-completions style endpoint and read the reply.

The public entry point is :func:`infer_clip`; :func:`build_chat_url` and
:func:`build_payload` are exposed so callers can inspect the request that
would be sent.
"""

from __future__ import annotations

import base64
import json
import time
import urllib.request
from pathlib import Path
from typing import Any, Callable

# Transport hook: (url, payload, timeout_seconds) -> {"status": ..., "body": ...}.
HttpPost = Callable[[str, dict[str, Any], int], dict[str, Any]]

# File suffix (lower-cased) -> MIME type used in generated data URIs.
_MIME_BY_SUFFIX = {
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".png": "image/png",
    ".webp": "image/webp",
}


def infer_clip(
    clip_record: dict[str, Any],
    output_dir: str | Path,
    vlm_config: dict[str, Any],
    prompt_config: dict[str, Any],
    *,
    http_post: HttpPost | None = None,
) -> dict[str, Any]:
    """Run one request for a clip and return the raw reply plus metadata.

    Args:
        clip_record: Clip description; its ``"frame_times"`` entries supply
            the ``"frame_path"`` values attached to the request.
        output_dir: Directory against which relative frame paths are resolved.
        vlm_config: Endpoint settings (``api_base_url``,
            ``chat_completions_path``, ``model``, ``timeout_seconds`` ...).
        prompt_config: Holds the ``"system"`` and ``"user"`` prompt texts.
        http_post: Optional transport replacement; defaults to the built-in
            ``urllib`` based POST.

    Returns:
        A dict with ``"raw_response"`` (message content as text),
        ``"http_status"`` and ``"latency_ms"``.
    """
    # The clock starts before the payload is built, so latency covers frame
    # encoding as well as the HTTP round trip.
    start = time.monotonic()
    client = http_post or _post_json
    url = build_chat_url(vlm_config)
    payload = build_payload(clip_record, output_dir, vlm_config, prompt_config)
    timeout_seconds = int(vlm_config.get("timeout_seconds", 120))
    response = client(url, payload, timeout_seconds)
    latency_ms = int((time.monotonic() - start) * 1000)
    return {
        "raw_response": _extract_message_content(response.get("body")),
        "http_status": response.get("status"),
        "latency_ms": latency_ms,
    }


def build_chat_url(vlm_config: dict[str, Any]) -> str:
    """Join ``api_base_url`` (trailing slashes removed) with the endpoint path.

    Raises:
        KeyError: If ``api_base_url`` or ``chat_completions_path`` is missing.
    """
    base_url = str(vlm_config["api_base_url"]).rstrip("/")
    return base_url + str(vlm_config["chat_completions_path"])


def build_payload(
    clip_record: dict[str, Any],
    output_dir: str | Path,
    vlm_config: dict[str, Any],
    prompt_config: dict[str, Any],
) -> dict[str, Any]:
    """Assemble the request body: system prompt, user prompt and frame images.

    Frames without a truthy ``"frame_path"`` are skipped. A missing or
    ``None`` ``"frame_times"`` value produces a text-only request.
    """
    # The transport mode is the same for every frame, so resolve it once.
    image_transport = str(vlm_config.get("image_transport", "data_uri"))

    content: list[dict[str, Any]] = [
        {"type": "text", "text": str(prompt_config.get("user", ""))}
    ]
    # ``or []`` also covers an explicit ``None`` stored under the key, which
    # ``.get(key, [])`` would hand straight to the loop.
    for frame in clip_record.get("frame_times") or []:
        frame_path = frame.get("frame_path")
        if not frame_path:
            continue
        content.append(
            {
                "type": "image_url",
                "image_url": {
                    "url": _image_url(frame_path, output_dir, image_transport)
                },
            }
        )

    return {
        "model": vlm_config.get("model"),
        "messages": [
            {"role": "system", "content": str(prompt_config.get("system", ""))},
            {"role": "user", "content": content},
        ],
        "temperature": vlm_config.get("temperature", 0),
        "max_tokens": vlm_config.get("max_tokens", 512),
    }


def _image_url(
    frame_path: str | Path,
    output_dir: str | Path,
    image_transport: str,
) -> str:
    """Return the value placed in an ``image_url`` part for one frame.

    For any transport other than ``"data_uri"`` the frame path is passed
    through as a string. Otherwise the file is read (relative paths are
    resolved against ``output_dir``) and embedded as a base64 data URI.
    """
    if image_transport != "data_uri":
        return str(frame_path)

    path = Path(frame_path).expanduser()
    if not path.is_absolute():
        path = Path(output_dir).expanduser() / path
    encoded = base64.b64encode(path.read_bytes()).decode("ascii")
    return f"data:{_mime_type(path)};base64,{encoded}"


def _mime_type(path: Path) -> str:
    """Map a file suffix to a MIME type, case-insensitively.

    Unknown suffixes fall back to ``application/octet-stream``.
    """
    return _MIME_BY_SUFFIX.get(path.suffix.lower(), "application/octet-stream")


def _post_json(
    url: str,
    payload: dict[str, Any],
    timeout_seconds: int,
) -> dict[str, Any]:
    """POST ``payload`` as JSON and return ``{"status", "body"}``.

    An empty response body is reported as ``{}``.
    """
    request = urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    # NOTE: urlopen raises urllib.error.HTTPError for error statuses (4xx/5xx),
    # so those propagate to the caller instead of being returned from here.
    with urllib.request.urlopen(request, timeout=timeout_seconds) as response:
        response_body = response.read().decode("utf-8")
        return {
            "status": response.status,
            "body": json.loads(response_body) if response_body else {},
        }


def _extract_message_content(body: Any) -> str:
    """Pull the first choice's message content out of a response body.

    Returns ``""`` whenever the expected structure is absent: a non-dict
    body, missing/empty/non-sequence ``choices``, a non-dict choice or
    message, or ``null`` content. String content is returned unchanged;
    any other content (e.g. a list of parts) is serialised as JSON.
    """
    if not isinstance(body, dict):
        return ""

    choices = body.get("choices")
    # Indexing with [0] is only meaningful for a sequence; a dict here would
    # otherwise raise KeyError.
    if not isinstance(choices, (list, tuple)) or not choices:
        return ""

    first_choice = choices[0]
    message = first_choice.get("message") if isinstance(first_choice, dict) else None
    # ``"message": null`` must not be dereferenced.
    if not isinstance(message, dict):
        return ""

    content = message.get("content")
    # ``"content": null`` means "no text"; do not emit the literal "null".
    if content is None:
        return ""
    if isinstance(content, str):
        return content
    return json.dumps(content, ensure_ascii=False)