from __future__ import annotations import argparse import json import mimetypes import subprocess from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer from pathlib import Path from urllib.parse import unquote ROOT = Path(__file__).resolve().parent class CalibratorHandler(BaseHTTPRequestHandler): server_version = "ColdDisplayCalibrator/0.1" def do_GET(self) -> None: path = "/" if self.path == "/" else unquote(self.path.split("?", 1)[0]) if path == "/": self._send_file(ROOT / "index.html") return target = (ROOT / path.lstrip("/")).resolve() if ROOT not in target.parents or not target.is_file(): self.send_error(404) return self._send_file(target) def do_POST(self) -> None: if self.path != "/api/capture": self.send_error(404) return try: payload = self._read_json() rtsp_url = str(payload.get("rtsp_url", "")).strip() timeout_seconds = float(payload.get("timeout_seconds", 10)) if not rtsp_url.lower().startswith("rtsp://"): self._send_json({"error": "rtsp_url must start with rtsp://"}, status=400) return image = capture_rtsp_frame(rtsp_url, timeout_seconds) except CaptureError as exc: self._send_json({"error": str(exc)}, status=502) return except (ValueError, json.JSONDecodeError) as exc: self._send_json({"error": f"invalid request: {exc}"}, status=400) return self.send_response(200) self.send_header("Content-Type", "image/jpeg") self.send_header("Content-Length", str(len(image))) self.send_header("Cache-Control", "no-store") self.end_headers() self.wfile.write(image) def log_message(self, format: str, *args: object) -> None: print(f"{self.address_string()} - {format % args}") def _read_json(self) -> dict: length = int(self.headers.get("Content-Length", "0")) return json.loads(self.rfile.read(length).decode("utf-8")) def _send_file(self, path: Path) -> None: data = path.read_bytes() content_type = mimetypes.guess_type(path.name)[0] or "application/octet-stream" self.send_response(200) self.send_header("Content-Type", content_type) self.send_header("Content-Length", str(len(data))) self.send_header("Cache-Control", "no-store") self.end_headers() self.wfile.write(data) def _send_json(self, payload: dict, status: int = 200) -> None: data = json.dumps(payload, ensure_ascii=False).encode("utf-8") self.send_response(status) self.send_header("Content-Type", "application/json; charset=utf-8") self.send_header("Content-Length", str(len(data))) self.send_header("Cache-Control", "no-store") self.end_headers() self.wfile.write(data) class CaptureError(RuntimeError): pass def capture_rtsp_frame(rtsp_url: str, timeout_seconds: float) -> bytes: command = [ "ffmpeg", "-hide_banner", "-loglevel", "error", "-rtsp_transport", "tcp", "-i", rtsp_url, "-frames:v", "1", "-f", "image2pipe", "-vcodec", "mjpeg", "-", ] try: result = subprocess.run( command, check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE, timeout=max(1.0, timeout_seconds), ) except FileNotFoundError as exc: raise CaptureError("ffmpeg not found; install ffmpeg first") from exc except subprocess.TimeoutExpired as exc: raise CaptureError(f"ffmpeg timed out after {timeout_seconds:g}s") from exc if result.returncode != 0: message = result.stderr.decode("utf-8", errors="replace").strip() raise CaptureError(message or f"ffmpeg exited with code {result.returncode}") if not result.stdout: raise CaptureError("ffmpeg returned no image data") return result.stdout def main() -> int: parser = argparse.ArgumentParser(description="RTSP snapshot calibration web tool.") parser.add_argument("--host", default="127.0.0.1") parser.add_argument("--port", default=18090, type=int) args = parser.parse_args() server = ThreadingHTTPServer((args.host, args.port), CalibratorHandler) print(f"Calibration server: http://{args.host}:{args.port}") try: server.serve_forever() except KeyboardInterrupt: print("\nStopping calibration server") finally: server.server_close() return 0 if __name__ == "__main__": raise SystemExit(main())