feat: add rtsp snapshot calibrator
This commit is contained in:
144
tools/calibrator/server.py
Normal file
144
tools/calibrator/server.py
Normal file
@@ -0,0 +1,144 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import mimetypes
|
||||
import subprocess
|
||||
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
||||
from pathlib import Path
|
||||
from urllib.parse import unquote
|
||||
|
||||
|
||||
ROOT = Path(__file__).resolve().parent
|
||||
|
||||
|
||||
class CalibratorHandler(BaseHTTPRequestHandler):
|
||||
server_version = "ColdDisplayCalibrator/0.1"
|
||||
|
||||
def do_GET(self) -> None:
|
||||
path = "/" if self.path == "/" else unquote(self.path.split("?", 1)[0])
|
||||
if path == "/":
|
||||
self._send_file(ROOT / "index.html")
|
||||
return
|
||||
target = (ROOT / path.lstrip("/")).resolve()
|
||||
if ROOT not in target.parents or not target.is_file():
|
||||
self.send_error(404)
|
||||
return
|
||||
self._send_file(target)
|
||||
|
||||
def do_POST(self) -> None:
|
||||
if self.path != "/api/capture":
|
||||
self.send_error(404)
|
||||
return
|
||||
|
||||
try:
|
||||
payload = self._read_json()
|
||||
rtsp_url = str(payload.get("rtsp_url", "")).strip()
|
||||
timeout_seconds = float(payload.get("timeout_seconds", 10))
|
||||
if not rtsp_url.lower().startswith("rtsp://"):
|
||||
self._send_json({"error": "rtsp_url must start with rtsp://"}, status=400)
|
||||
return
|
||||
image = capture_rtsp_frame(rtsp_url, timeout_seconds)
|
||||
except CaptureError as exc:
|
||||
self._send_json({"error": str(exc)}, status=502)
|
||||
return
|
||||
except (ValueError, json.JSONDecodeError) as exc:
|
||||
self._send_json({"error": f"invalid request: {exc}"}, status=400)
|
||||
return
|
||||
|
||||
self.send_response(200)
|
||||
self.send_header("Content-Type", "image/jpeg")
|
||||
self.send_header("Content-Length", str(len(image)))
|
||||
self.send_header("Cache-Control", "no-store")
|
||||
self.end_headers()
|
||||
self.wfile.write(image)
|
||||
|
||||
def log_message(self, format: str, *args: object) -> None:
|
||||
print(f"{self.address_string()} - {format % args}")
|
||||
|
||||
def _read_json(self) -> dict:
|
||||
length = int(self.headers.get("Content-Length", "0"))
|
||||
return json.loads(self.rfile.read(length).decode("utf-8"))
|
||||
|
||||
def _send_file(self, path: Path) -> None:
|
||||
data = path.read_bytes()
|
||||
content_type = mimetypes.guess_type(path.name)[0] or "application/octet-stream"
|
||||
self.send_response(200)
|
||||
self.send_header("Content-Type", content_type)
|
||||
self.send_header("Content-Length", str(len(data)))
|
||||
self.send_header("Cache-Control", "no-store")
|
||||
self.end_headers()
|
||||
self.wfile.write(data)
|
||||
|
||||
def _send_json(self, payload: dict, status: int = 200) -> None:
|
||||
data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
|
||||
self.send_response(status)
|
||||
self.send_header("Content-Type", "application/json; charset=utf-8")
|
||||
self.send_header("Content-Length", str(len(data)))
|
||||
self.send_header("Cache-Control", "no-store")
|
||||
self.end_headers()
|
||||
self.wfile.write(data)
|
||||
|
||||
|
||||
class CaptureError(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
def capture_rtsp_frame(rtsp_url: str, timeout_seconds: float) -> bytes:
|
||||
command = [
|
||||
"ffmpeg",
|
||||
"-hide_banner",
|
||||
"-loglevel",
|
||||
"error",
|
||||
"-rtsp_transport",
|
||||
"tcp",
|
||||
"-i",
|
||||
rtsp_url,
|
||||
"-frames:v",
|
||||
"1",
|
||||
"-f",
|
||||
"image2pipe",
|
||||
"-vcodec",
|
||||
"mjpeg",
|
||||
"-",
|
||||
]
|
||||
try:
|
||||
result = subprocess.run(
|
||||
command,
|
||||
check=False,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
timeout=max(1.0, timeout_seconds),
|
||||
)
|
||||
except FileNotFoundError as exc:
|
||||
raise CaptureError("ffmpeg not found; install ffmpeg first") from exc
|
||||
except subprocess.TimeoutExpired as exc:
|
||||
raise CaptureError(f"ffmpeg timed out after {timeout_seconds:g}s") from exc
|
||||
|
||||
if result.returncode != 0:
|
||||
message = result.stderr.decode("utf-8", errors="replace").strip()
|
||||
raise CaptureError(message or f"ffmpeg exited with code {result.returncode}")
|
||||
if not result.stdout:
|
||||
raise CaptureError("ffmpeg returned no image data")
|
||||
return result.stdout
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(description="RTSP snapshot calibration web tool.")
|
||||
parser.add_argument("--host", default="127.0.0.1")
|
||||
parser.add_argument("--port", default=18090, type=int)
|
||||
args = parser.parse_args()
|
||||
|
||||
server = ThreadingHTTPServer((args.host, args.port), CalibratorHandler)
|
||||
print(f"Calibration server: http://{args.host}:{args.port}")
|
||||
try:
|
||||
server.serve_forever()
|
||||
except KeyboardInterrupt:
|
||||
print("\nStopping calibration server")
|
||||
finally:
|
||||
server.server_close()
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user