Files
2026-04-27 11:12:00 +08:00

145 lines
4.7 KiB
Python

from __future__ import annotations
import argparse
import json
import mimetypes
import subprocess
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path
from urllib.parse import unquote
ROOT = Path(__file__).resolve().parent
class CalibratorHandler(BaseHTTPRequestHandler):
server_version = "ColdDisplayCalibrator/0.1"
def do_GET(self) -> None:
path = "/" if self.path == "/" else unquote(self.path.split("?", 1)[0])
if path == "/":
self._send_file(ROOT / "index.html")
return
target = (ROOT / path.lstrip("/")).resolve()
if ROOT not in target.parents or not target.is_file():
self.send_error(404)
return
self._send_file(target)
def do_POST(self) -> None:
if self.path != "/api/capture":
self.send_error(404)
return
try:
payload = self._read_json()
rtsp_url = str(payload.get("rtsp_url", "")).strip()
timeout_seconds = float(payload.get("timeout_seconds", 10))
if not rtsp_url.lower().startswith("rtsp://"):
self._send_json({"error": "rtsp_url must start with rtsp://"}, status=400)
return
image = capture_rtsp_frame(rtsp_url, timeout_seconds)
except CaptureError as exc:
self._send_json({"error": str(exc)}, status=502)
return
except (ValueError, json.JSONDecodeError) as exc:
self._send_json({"error": f"invalid request: {exc}"}, status=400)
return
self.send_response(200)
self.send_header("Content-Type", "image/jpeg")
self.send_header("Content-Length", str(len(image)))
self.send_header("Cache-Control", "no-store")
self.end_headers()
self.wfile.write(image)
def log_message(self, format: str, *args: object) -> None:
print(f"{self.address_string()} - {format % args}")
def _read_json(self) -> dict:
length = int(self.headers.get("Content-Length", "0"))
return json.loads(self.rfile.read(length).decode("utf-8"))
def _send_file(self, path: Path) -> None:
data = path.read_bytes()
content_type = mimetypes.guess_type(path.name)[0] or "application/octet-stream"
self.send_response(200)
self.send_header("Content-Type", content_type)
self.send_header("Content-Length", str(len(data)))
self.send_header("Cache-Control", "no-store")
self.end_headers()
self.wfile.write(data)
def _send_json(self, payload: dict, status: int = 200) -> None:
data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
self.send_response(status)
self.send_header("Content-Type", "application/json; charset=utf-8")
self.send_header("Content-Length", str(len(data)))
self.send_header("Cache-Control", "no-store")
self.end_headers()
self.wfile.write(data)
class CaptureError(RuntimeError):
pass
def capture_rtsp_frame(rtsp_url: str, timeout_seconds: float) -> bytes:
command = [
"ffmpeg",
"-hide_banner",
"-loglevel",
"error",
"-rtsp_transport",
"tcp",
"-i",
rtsp_url,
"-frames:v",
"1",
"-f",
"image2pipe",
"-vcodec",
"mjpeg",
"-",
]
try:
result = subprocess.run(
command,
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=max(1.0, timeout_seconds),
)
except FileNotFoundError as exc:
raise CaptureError("ffmpeg not found; install ffmpeg first") from exc
except subprocess.TimeoutExpired as exc:
raise CaptureError(f"ffmpeg timed out after {timeout_seconds:g}s") from exc
if result.returncode != 0:
message = result.stderr.decode("utf-8", errors="replace").strip()
raise CaptureError(message or f"ffmpeg exited with code {result.returncode}")
if not result.stdout:
raise CaptureError("ffmpeg returned no image data")
return result.stdout
def main() -> int:
parser = argparse.ArgumentParser(description="RTSP snapshot calibration web tool.")
parser.add_argument("--host", default="127.0.0.1")
parser.add_argument("--port", default=18090, type=int)
args = parser.parse_args()
server = ThreadingHTTPServer((args.host, args.port), CalibratorHandler)
print(f"Calibration server: http://{args.host}:{args.port}")
try:
server.serve_forever()
except KeyboardInterrupt:
print("\nStopping calibration server")
finally:
server.server_close()
return 0
if __name__ == "__main__":
raise SystemExit(main())