Files
2026-06-17 11:33:54 +08:00

135 lines
3.8 KiB
Python

from __future__ import annotations
import base64
import json
import time
import urllib.request
from pathlib import Path
from typing import Any, Callable
# Signature of the HTTP transport accepted by infer_clip. It is called as
# (url, payload, timeout_seconds) and must return a dict; infer_clip reads
# the "status" and "body" keys from that dict.
HttpPost = Callable[[str, dict[str, Any], int], dict[str, Any]]
def infer_clip(
    clip_record: dict[str, Any],
    output_dir: str | Path,
    vlm_config: dict[str, Any],
    prompt_config: dict[str, Any],
    *,
    http_post: HttpPost | None = None,
) -> dict[str, Any]:
    """Send one clip to the chat-completions endpoint and summarize the reply.

    Args:
        clip_record: Clip description forwarded to ``build_payload``.
        output_dir: Directory forwarded to ``build_payload``.
        vlm_config: Endpoint/model settings. ``timeout_seconds`` is read here
            (default 120) and converted with ``int``.
        prompt_config: Prompt texts forwarded to ``build_payload``.
        http_post: Optional transport used instead of ``_post_json``.

    Returns:
        A dict with ``raw_response`` (message content extracted from the
        transport's ``"body"``), ``http_status`` (the transport's
        ``"status"``) and ``latency_ms``.
    """
    # The clock starts before the URL and payload are built, so latency_ms
    # covers request preparation as well as the HTTP round trip.
    started_at = time.monotonic()
    send = http_post if http_post else _post_json
    endpoint = build_chat_url(vlm_config)
    request_body = build_payload(clip_record, output_dir, vlm_config, prompt_config)
    timeout = int(vlm_config.get("timeout_seconds", 120))
    reply = send(endpoint, request_body, timeout)
    elapsed_seconds = time.monotonic() - started_at

    summary: dict[str, Any] = {}
    summary["raw_response"] = _extract_message_content(reply.get("body"))
    summary["http_status"] = reply.get("status")
    summary["latency_ms"] = int(elapsed_seconds * 1000)
    return summary
def build_chat_url(vlm_config: dict[str, Any]) -> str:
    """Join the configured base URL and chat-completions path into one URL.

    Trailing slashes on ``api_base_url`` are removed. If the path does not
    already start with ``/`` (or with ``?``/``#``), a single ``/`` is
    inserted so the two parts never run together. An empty path yields the
    base URL alone.

    Args:
        vlm_config: Must contain ``api_base_url`` and
            ``chat_completions_path``; both are converted with ``str``.

    Returns:
        The combined endpoint URL.

    Raises:
        KeyError: If either required key is missing.
    """
    base = str(vlm_config["api_base_url"]).rstrip("/")
    path = str(vlm_config["chat_completions_path"])
    # Without this, "http://host/v1" + "chat/completions" would silently
    # produce "http://host/v1chat/completions".
    if path and not path.startswith(("/", "?", "#")):
        path = "/" + path
    return base + path
def build_payload(
    clip_record: dict[str, Any],
    output_dir: str | Path,
    vlm_config: dict[str, Any],
    prompt_config: dict[str, Any],
) -> dict[str, Any]:
    """Build the chat-completions request body for one clip.

    The user message starts with the ``user`` prompt text, followed by one
    ``image_url`` part for every entry of ``clip_record["frame_times"]``
    that has a non-empty ``frame_path``. Each image URL is produced by
    ``_image_url`` using the configured ``image_transport`` (default
    ``"data_uri"``).

    Args:
        clip_record: Clip description; ``frame_times`` may be missing,
            ``None`` or empty, in which case no images are attached.
        output_dir: Passed through to ``_image_url``.
        vlm_config: Supplies ``model``, ``temperature`` (default 0),
            ``max_tokens`` (default 512) and ``image_transport``.
        prompt_config: Supplies the ``system`` and ``user`` prompt texts
            (each defaults to an empty string).

    Returns:
        A dict with ``model``, ``messages``, ``temperature`` and
        ``max_tokens`` keys.
    """
    # The transport is the same for every frame, so resolve it once.
    transport = str(vlm_config.get("image_transport", "data_uri"))

    content: list[dict[str, Any]] = [
        {"type": "text", "text": str(prompt_config.get("user", ""))}
    ]
    # `or []` also covers a record whose "frame_times" is explicitly None
    # (e.g. a JSON null), which `.get(key, [])` alone would not.
    for frame in clip_record.get("frame_times") or []:
        frame_path = frame.get("frame_path")
        if not frame_path:
            continue
        content.append(
            {
                "type": "image_url",
                "image_url": {
                    "url": _image_url(frame_path, output_dir, transport)
                },
            }
        )

    return {
        "model": vlm_config.get("model"),
        "messages": [
            {"role": "system", "content": str(prompt_config.get("system", ""))},
            {"role": "user", "content": content},
        ],
        "temperature": vlm_config.get("temperature", 0),
        "max_tokens": vlm_config.get("max_tokens", 512),
    }
def _image_url(
frame_path: str | Path,
output_dir: str | Path,
image_transport: str,
) -> str:
if image_transport != "data_uri":
return str(frame_path)
path = Path(frame_path).expanduser()
if not path.is_absolute():
path = Path(output_dir).expanduser() / path
data = base64.b64encode(path.read_bytes()).decode("ascii")
return f"data:{_mime_type(path)};base64,{data}"
def _mime_type(path: Path) -> str:
suffix = path.suffix.lower()
if suffix in {".jpg", ".jpeg"}:
return "image/jpeg"
if suffix == ".png":
return "image/png"
if suffix == ".webp":
return "image/webp"
return "application/octet-stream"
def _post_json(
url: str,
payload: dict[str, Any],
timeout_seconds: int,
) -> dict[str, Any]:
body = json.dumps(payload).encode("utf-8")
request = urllib.request.Request(
url,
data=body,
headers={"Content-Type": "application/json"},
method="POST",
)
with urllib.request.urlopen(request, timeout=timeout_seconds) as response:
response_body = response.read().decode("utf-8")
return {
"status": response.status,
"body": json.loads(response_body) if response_body else {},
}
def _extract_message_content(body: Any) -> str:
if not isinstance(body, dict):
return ""
choices = body.get("choices")
if not choices:
return ""
message = choices[0].get("message", {}) if isinstance(choices[0], dict) else {}
content = message.get("content", "")
if isinstance(content, str):
return content
return json.dumps(content, ensure_ascii=False)