145 lines
4.7 KiB
Python
145 lines
4.7 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import mimetypes
|
|
import subprocess
|
|
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
|
|
from pathlib import Path
|
|
from urllib.parse import unquote
|
|
|
|
|
|
ROOT = Path(__file__).resolve().parent
|
|
|
|
|
|
class CalibratorHandler(BaseHTTPRequestHandler):
|
|
server_version = "ColdDisplayCalibrator/0.1"
|
|
|
|
def do_GET(self) -> None:
|
|
path = "/" if self.path == "/" else unquote(self.path.split("?", 1)[0])
|
|
if path == "/":
|
|
self._send_file(ROOT / "index.html")
|
|
return
|
|
target = (ROOT / path.lstrip("/")).resolve()
|
|
if ROOT not in target.parents or not target.is_file():
|
|
self.send_error(404)
|
|
return
|
|
self._send_file(target)
|
|
|
|
def do_POST(self) -> None:
|
|
if self.path != "/api/capture":
|
|
self.send_error(404)
|
|
return
|
|
|
|
try:
|
|
payload = self._read_json()
|
|
rtsp_url = str(payload.get("rtsp_url", "")).strip()
|
|
timeout_seconds = float(payload.get("timeout_seconds", 10))
|
|
if not rtsp_url.lower().startswith("rtsp://"):
|
|
self._send_json({"error": "rtsp_url must start with rtsp://"}, status=400)
|
|
return
|
|
image = capture_rtsp_frame(rtsp_url, timeout_seconds)
|
|
except CaptureError as exc:
|
|
self._send_json({"error": str(exc)}, status=502)
|
|
return
|
|
except (ValueError, json.JSONDecodeError) as exc:
|
|
self._send_json({"error": f"invalid request: {exc}"}, status=400)
|
|
return
|
|
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", "image/jpeg")
|
|
self.send_header("Content-Length", str(len(image)))
|
|
self.send_header("Cache-Control", "no-store")
|
|
self.end_headers()
|
|
self.wfile.write(image)
|
|
|
|
def log_message(self, format: str, *args: object) -> None:
|
|
print(f"{self.address_string()} - {format % args}")
|
|
|
|
def _read_json(self) -> dict:
|
|
length = int(self.headers.get("Content-Length", "0"))
|
|
return json.loads(self.rfile.read(length).decode("utf-8"))
|
|
|
|
def _send_file(self, path: Path) -> None:
|
|
data = path.read_bytes()
|
|
content_type = mimetypes.guess_type(path.name)[0] or "application/octet-stream"
|
|
self.send_response(200)
|
|
self.send_header("Content-Type", content_type)
|
|
self.send_header("Content-Length", str(len(data)))
|
|
self.send_header("Cache-Control", "no-store")
|
|
self.end_headers()
|
|
self.wfile.write(data)
|
|
|
|
def _send_json(self, payload: dict, status: int = 200) -> None:
|
|
data = json.dumps(payload, ensure_ascii=False).encode("utf-8")
|
|
self.send_response(status)
|
|
self.send_header("Content-Type", "application/json; charset=utf-8")
|
|
self.send_header("Content-Length", str(len(data)))
|
|
self.send_header("Cache-Control", "no-store")
|
|
self.end_headers()
|
|
self.wfile.write(data)
|
|
|
|
|
|
class CaptureError(RuntimeError):
|
|
pass
|
|
|
|
|
|
def capture_rtsp_frame(rtsp_url: str, timeout_seconds: float) -> bytes:
|
|
command = [
|
|
"ffmpeg",
|
|
"-hide_banner",
|
|
"-loglevel",
|
|
"error",
|
|
"-rtsp_transport",
|
|
"tcp",
|
|
"-i",
|
|
rtsp_url,
|
|
"-frames:v",
|
|
"1",
|
|
"-f",
|
|
"image2pipe",
|
|
"-vcodec",
|
|
"mjpeg",
|
|
"-",
|
|
]
|
|
try:
|
|
result = subprocess.run(
|
|
command,
|
|
check=False,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE,
|
|
timeout=max(1.0, timeout_seconds),
|
|
)
|
|
except FileNotFoundError as exc:
|
|
raise CaptureError("ffmpeg not found; install ffmpeg first") from exc
|
|
except subprocess.TimeoutExpired as exc:
|
|
raise CaptureError(f"ffmpeg timed out after {timeout_seconds:g}s") from exc
|
|
|
|
if result.returncode != 0:
|
|
message = result.stderr.decode("utf-8", errors="replace").strip()
|
|
raise CaptureError(message or f"ffmpeg exited with code {result.returncode}")
|
|
if not result.stdout:
|
|
raise CaptureError("ffmpeg returned no image data")
|
|
return result.stdout
|
|
|
|
|
|
def main() -> int:
|
|
parser = argparse.ArgumentParser(description="RTSP snapshot calibration web tool.")
|
|
parser.add_argument("--host", default="127.0.0.1")
|
|
parser.add_argument("--port", default=18090, type=int)
|
|
args = parser.parse_args()
|
|
|
|
server = ThreadingHTTPServer((args.host, args.port), CalibratorHandler)
|
|
print(f"Calibration server: http://{args.host}:{args.port}")
|
|
try:
|
|
server.serve_forever()
|
|
except KeyboardInterrupt:
|
|
print("\nStopping calibration server")
|
|
finally:
|
|
server.server_close()
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|