From b1c39d3fa7f7185028e885de6a0bde20ecd875c1 Mon Sep 17 00:00:00 2001 From: Yoilun Date: Tue, 28 Apr 2026 19:00:23 +0800 Subject: [PATCH] feat: add rtsp runtime pipeline --- .gitignore | 1 + README_zh.md | 41 +++++ pyproject.toml | 1 + scripts/run_runtime.sh | 6 + src/cold_display_guard/config.py | 13 ++ src/cold_display_guard/frame_source.py | 60 +++++++ src/cold_display_guard/main.py | 122 +++++++++++++++ src/cold_display_guard/vision.py | 208 +++++++++++++++++++++++++ tests/test_vision.py | 72 +++++++++ 9 files changed, 524 insertions(+) create mode 100755 scripts/run_runtime.sh create mode 100644 src/cold_display_guard/frame_source.py create mode 100644 src/cold_display_guard/main.py create mode 100644 src/cold_display_guard/vision.py create mode 100644 tests/test_vision.py diff --git a/.gitignore b/.gitignore index a71bcb5..e6ab47b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ __pycache__/ *.py[cod] +.DS_Store .pytest_cache/ .venv/ dist/ diff --git a/README_zh.md b/README_zh.md index d1bac16..5d7522a 100644 --- a/README_zh.md +++ b/README_zh.md @@ -98,6 +98,47 @@ http://127.0.0.1:19080 - `GET /api/manage/summary` - `GET /api/manage/events` +## 运行识别计时进程 + +管理页只负责配置和查看数据。要产生数据,还需要启动运行进程: + +```bash +scripts/run_runtime.sh +``` + +运行进程会: + +1. 按配置读取 RTSP。 +2. 用 `ffmpeg` 周期抓取小尺寸 RGB 帧。 +3. 按标定区域做占用变化检测。 +4. 判断垃圾桶区域是否有明显投放动作。 +5. 调用批次计时状态机。 +6. 
写入 `logs/events.jsonl`,管理页会读取这个文件。 + +当前视觉版本是可运行的启发式版本: + +- 每个格口输出 `0/1` 占用状态,不识别单份数量。 +- 启动后的前几帧用于建立空柜基线,默认 `3` 帧。 +- 如果启动时格口里已经有食品,系统会把它当作基线,后续要等画面变化后才会产生计时事件。 +- 真实生产精度后续应接食品检测模型。 + +可选运行参数可以放在配置文件的 `[runtime]` 中: + +```toml +[runtime] +sample_interval_seconds = 5.0 +frame_width = 640 +frame_height = 360 +capture_timeout_seconds = 12.0 +baseline_frames = 3 +sample_stride_pixels = 8 +occupancy_mean_delta = 24.0 +occupancy_texture_delta = 18.0 +trash_motion_delta = 18.0 +trash_motion_cooldown_seconds = 8 +diagnostics_path = "logs/runtime_diagnostics.jsonl" +``` + ## 本地测试 ```bash diff --git a/pyproject.toml b/pyproject.toml index 1604743..f3ddffa 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,6 +13,7 @@ dependencies = [] [project.scripts] cold-display-guard = "cold_display_guard.cli:main" cold-display-guard-manage = "cold_display_guard.manage_api:main" +cold-display-guard-run = "cold_display_guard.main:main" [tool.setuptools.packages.find] where = ["src"] diff --git a/scripts/run_runtime.sh b/scripts/run_runtime.sh new file mode 100755 index 0000000..8e2b1ea --- /dev/null +++ b/scripts/run_runtime.sh @@ -0,0 +1,6 @@ +#!/usr/bin/env bash +set -euo pipefail + +CONFIG_PATH="${CONFIG_PATH:-config/example.toml}" + +PYTHONPATH=src python3 -m cold_display_guard.main --config "$CONFIG_PATH" diff --git a/src/cold_display_guard/config.py b/src/cold_display_guard/config.py index 03363b9..3aca8c0 100644 --- a/src/cold_display_guard/config.py +++ b/src/cold_display_guard/config.py @@ -109,6 +109,19 @@ def format_config_document(data: dict[str, Any]) -> str: lines.append(f'trash_confirmation_seconds = {int(thresholds.get("trash_confirmation_seconds", 120))}') lines.append("") + runtime = data.get("runtime", {}) + if runtime: + lines.append("[runtime]") + for key in sorted(runtime): + value = runtime[key] + if isinstance(value, bool): + lines.append(f"{key} = {str(value).lower()}") + elif isinstance(value, int | float): + lines.append(f"{key} = {value}") + else: + 
class FrameCaptureError(RuntimeError):
    """Raised when a single frame cannot be captured from the stream."""


@dataclass(frozen=True, slots=True)
class RTSPFrameSource:
    """Capture individual RGB frames from an RTSP stream via the ``ffmpeg`` CLI.

    Every call to :meth:`capture` spawns a fresh ``ffmpeg`` process that reads
    the stream over TCP, takes one video frame, scales it to
    ``width`` x ``height`` and writes it to stdout as raw ``rgb24`` bytes.
    """

    rtsp_url: str  # passed verbatim to ffmpeg's ``-i`` option
    width: int = 640  # output frame width in pixels
    height: int = 360  # output frame height in pixels
    timeout_seconds: float = 12.0  # per-capture limit; values below 1.0 are raised to 1.0

    def capture(self) -> Frame:
        """Run ``ffmpeg`` once and return the decoded frame.

        Returns:
            A ``Frame`` holding exactly ``width * height * 3`` RGB bytes.

        Raises:
            FrameCaptureError: if ``ffmpeg`` is not installed, does not finish
                within the timeout, exits with a non-zero status, or produces
                an unexpected number of bytes.
        """
        # Compute the clamped timeout once so the value enforced by
        # subprocess.run and the value reported in the error message agree.
        effective_timeout = max(1.0, self.timeout_seconds)
        command = [
            "ffmpeg",
            "-hide_banner",
            "-loglevel",
            "error",
            "-rtsp_transport",
            "tcp",
            "-i",
            self.rtsp_url,
            "-frames:v",
            "1",
            "-vf",
            f"scale={self.width}:{self.height}",
            "-f",
            "rawvideo",
            "-pix_fmt",
            "rgb24",
            "-",
        ]
        try:
            result = subprocess.run(
                command,
                check=False,
                # Keep ffmpeg away from the parent's stdin: by default it
                # listens for interactive commands there, which can swallow
                # terminal input of the long-running monitor process.
                stdin=subprocess.DEVNULL,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                timeout=effective_timeout,
            )
        except FileNotFoundError as exc:
            raise FrameCaptureError("ffmpeg not found; install ffmpeg first") from exc
        except subprocess.TimeoutExpired as exc:
            raise FrameCaptureError(f"ffmpeg timed out after {effective_timeout:g}s") from exc

        if result.returncode != 0:
            message = result.stderr.decode("utf-8", errors="replace").strip()
            raise FrameCaptureError(message or f"ffmpeg exited with code {result.returncode}")

        # A short or long buffer means ffmpeg did not deliver exactly one
        # frame at the requested size; reject it instead of mis-indexing later.
        expected_size = self.width * self.height * 3
        if len(result.stdout) != expected_size:
            raise FrameCaptureError(f"expected {expected_size} RGB bytes, got {len(result.stdout)}")
        return Frame(width=self.width, height=self.height, rgb=result.stdout)
0000000..6657435 --- /dev/null +++ b/src/cold_display_guard/main.py @@ -0,0 +1,122 @@ +from __future__ import annotations + +import argparse +import json +import time +from datetime import datetime +from pathlib import Path +from zoneinfo import ZoneInfo + +from cold_display_guard.config import load_config_document, load_settings, resolve_config_path, resolve_project_root +from cold_display_guard.engine import BatchEngine +from cold_display_guard.frame_source import FrameCaptureError, RTSPFrameSource +from cold_display_guard.models import Observation +from cold_display_guard.vision import ZoneOccupancyDetector, load_regions, load_runtime_vision_settings + + +def main() -> int: + args = parse_args().parse_args() + run(args.config, once=args.once, max_iterations=args.max_iterations) + return 0 + + +def parse_args() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(description="Run Cold Display Guard RTSP batch monitor") + parser.add_argument("--config", default=str(resolve_config_path(None)), help="Path to TOML config") + parser.add_argument("--once", action="store_true", help="Process one frame and exit") + parser.add_argument("--max-iterations", type=int, default=0, help="Stop after N frames; 0 means forever") + return parser + + +def run(config_path: str | Path, once: bool = False, max_iterations: int = 0) -> None: + resolved_config = resolve_config_path(config_path) + project_root = resolve_project_root(resolved_config) + config = load_config_document(resolved_config) + settings = load_settings(resolved_config) + runtime = config.get("runtime", {}) + stream = config.get("stream", {}) + rtsp_url = str(stream.get("rtsp_url", "")).strip() + if not rtsp_url: + raise ValueError("stream.rtsp_url is required") + + regions, trash_region = load_regions(config) + if not regions: + raise ValueError("at least one [[zones]] polygon is required") + + timezone = ZoneInfo(str(config.get("timezone", "Asia/Shanghai"))) + event_path = resolve_project_path(project_root, 
str(config.get("event_sink", {}).get("path", "logs/events.jsonl"))) + diagnostics_path = resolve_project_path(project_root, str(runtime.get("diagnostics_path", "logs/runtime_diagnostics.jsonl"))) + sample_interval_seconds = max(0.1, float(runtime.get("sample_interval_seconds", 5.0))) + frame_width = max(64, int(runtime.get("frame_width", 640))) + frame_height = max(64, int(runtime.get("frame_height", 360))) + capture_timeout_seconds = max(1.0, float(runtime.get("capture_timeout_seconds", 12.0))) + + source = RTSPFrameSource( + rtsp_url=rtsp_url, + width=frame_width, + height=frame_height, + timeout_seconds=capture_timeout_seconds, + ) + detector = ZoneOccupancyDetector(regions, trash_region, load_runtime_vision_settings(config)) + engine = BatchEngine(settings) + + event_path.parent.mkdir(parents=True, exist_ok=True) + diagnostics_path.parent.mkdir(parents=True, exist_ok=True) + print(f"Cold Display Guard runtime started") + print(f"Config: {resolved_config}") + print(f"Events: {event_path}") + print(f"Diagnostics: {diagnostics_path}") + + iteration = 0 + while True: + iteration += 1 + when = datetime.now(timezone) + try: + frame = source.capture() + zone_counts, trash_deposit_count, diagnostics = detector.observe(frame, when) + observation = Observation(ts=when, zone_counts=zone_counts, trash_deposit_count=trash_deposit_count) + events = engine.process(observation) + append_jsonl(event_path, events) + append_jsonl( + diagnostics_path, + [ + { + "ts": when.isoformat(), + "zone_counts": zone_counts, + "trash_deposit_count": trash_deposit_count, + "diagnostics": diagnostics, + } + ], + ) + if events: + print(f"{when.isoformat()} wrote {len(events)} event(s)") + except FrameCaptureError as exc: + append_jsonl( + diagnostics_path, + [{"ts": when.isoformat(), "error": "frame_capture_failed", "message": str(exc)}], + ) + print(f"{when.isoformat()} frame capture failed: {exc}") + + if once or (max_iterations > 0 and iteration >= max_iterations): + break + 
time.sleep(sample_interval_seconds) + + +def resolve_project_path(project_root: Path, raw_path: str) -> Path: + path = Path(raw_path).expanduser() + if not path.is_absolute(): + path = project_root / path + return path.resolve() + + +def append_jsonl(path: Path, payloads: list[dict]) -> None: + if not payloads: + return + with path.open("a", encoding="utf-8") as handle: + for payload in payloads: + handle.write(json.dumps(payload, ensure_ascii=False, sort_keys=True)) + handle.write("\n") + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/cold_display_guard/vision.py b/src/cold_display_guard/vision.py new file mode 100644 index 0000000..d1f9dfd --- /dev/null +++ b/src/cold_display_guard/vision.py @@ -0,0 +1,208 @@ +from __future__ import annotations + +from dataclasses import dataclass +from datetime import datetime, timedelta +from typing import Any + + +@dataclass(frozen=True, slots=True) +class Frame: + width: int + height: int + rgb: bytes + + def pixel(self, x: int, y: int) -> tuple[int, int, int]: + offset = (y * self.width + x) * 3 + return self.rgb[offset], self.rgb[offset + 1], self.rgb[offset + 2] + + +@dataclass(frozen=True, slots=True) +class Region: + region_id: str + polygon: tuple[tuple[float, float], ...] 
+ + +@dataclass(frozen=True, slots=True) +class RuntimeVisionSettings: + baseline_frames: int = 3 + sample_stride_pixels: int = 8 + occupancy_mean_delta: float = 24.0 + occupancy_texture_delta: float = 18.0 + trash_motion_delta: float = 18.0 + trash_motion_cooldown_seconds: int = 8 + + +@dataclass(frozen=True, slots=True) +class RegionMetrics: + mean_luma: float + texture: float + sample_count: int + + +class ZoneOccupancyDetector: + def __init__( + self, + regions: list[Region], + trash_region: Region | None, + settings: RuntimeVisionSettings | None = None, + ) -> None: + self.regions = regions + self.trash_region = trash_region + self.settings = settings or RuntimeVisionSettings() + self._baseline: dict[str, RegionMetrics] = {} + self._baseline_samples: dict[str, list[RegionMetrics]] = {region.region_id: [] for region in regions} + if trash_region is not None: + self._baseline_samples[trash_region.region_id] = [] + self._previous_trash_metrics: RegionMetrics | None = None + self._last_trash_motion_at: datetime | None = None + + def observe(self, frame: Frame, when: datetime) -> tuple[dict[str, int], int, dict[str, Any]]: + metrics_by_region = {region.region_id: region_metrics(frame, region, self.settings.sample_stride_pixels) for region in self.regions} + self._update_baseline(metrics_by_region) + + zone_counts: dict[str, int] = {} + diagnostics: dict[str, Any] = {"zones": {}, "baseline_ready": self.baseline_ready} + for region in self.regions: + metrics = metrics_by_region[region.region_id] + baseline = self._baseline.get(region.region_id) + occupied = False + if baseline is not None: + mean_delta = abs(metrics.mean_luma - baseline.mean_luma) + texture_delta = metrics.texture - baseline.texture + occupied = ( + mean_delta >= self.settings.occupancy_mean_delta + or texture_delta >= self.settings.occupancy_texture_delta + ) + diagnostics["zones"][region.region_id] = { + "mean_luma": round(metrics.mean_luma, 3), + "baseline_mean_luma": round(baseline.mean_luma, 3), 
+ "mean_delta": round(mean_delta, 3), + "texture": round(metrics.texture, 3), + "baseline_texture": round(baseline.texture, 3), + "texture_delta": round(texture_delta, 3), + "occupied": occupied, + } + zone_counts[region.region_id] = 1 if occupied else 0 + + trash_deposit_count = self._trash_deposit_count(frame, when, diagnostics) + return zone_counts, trash_deposit_count, diagnostics + + @property + def baseline_ready(self) -> bool: + return all(region.region_id in self._baseline for region in self.regions) + + def _update_baseline(self, metrics_by_region: dict[str, RegionMetrics]) -> None: + for region_id, metrics in metrics_by_region.items(): + if region_id in self._baseline: + continue + samples = self._baseline_samples.setdefault(region_id, []) + samples.append(metrics) + if len(samples) >= self.settings.baseline_frames: + self._baseline[region_id] = average_metrics(samples) + + def _trash_deposit_count(self, frame: Frame, when: datetime, diagnostics: dict[str, Any]) -> int: + if self.trash_region is None: + return 0 + + metrics = region_metrics(frame, self.trash_region, self.settings.sample_stride_pixels) + previous = self._previous_trash_metrics + self._previous_trash_metrics = metrics + if previous is None: + diagnostics["trash"] = {"motion_delta": 0.0, "deposit": False} + return 0 + + motion_delta = abs(metrics.mean_luma - previous.mean_luma) + abs(metrics.texture - previous.texture) + cooldown = timedelta(seconds=self.settings.trash_motion_cooldown_seconds) + in_cooldown = self._last_trash_motion_at is not None and when - self._last_trash_motion_at < cooldown + deposit = motion_delta >= self.settings.trash_motion_delta and not in_cooldown + if deposit: + self._last_trash_motion_at = when + diagnostics["trash"] = { + "motion_delta": round(motion_delta, 3), + "deposit": deposit, + } + return 1 if deposit else 0 + + +def load_regions(config: dict[str, Any]) -> tuple[list[Region], Region | None]: + regions: list[Region] = [] + for zone in config.get("zones", 
[]): + zone_id = str(zone.get("id", "")).strip() + polygon = normalize_polygon(zone.get("polygon", [])) + if zone_id and len(polygon) >= 3: + regions.append(Region(zone_id, polygon)) + + trash_region = None + trash_polygon = normalize_polygon(config.get("trash", {}).get("roi", [])) + if len(trash_polygon) >= 3: + trash_region = Region("trash", trash_polygon) + return regions, trash_region + + +def load_runtime_vision_settings(config: dict[str, Any]) -> RuntimeVisionSettings: + runtime = config.get("runtime", {}) + return RuntimeVisionSettings( + baseline_frames=max(1, int(runtime.get("baseline_frames", 3))), + sample_stride_pixels=max(1, int(runtime.get("sample_stride_pixels", 8))), + occupancy_mean_delta=float(runtime.get("occupancy_mean_delta", 24.0)), + occupancy_texture_delta=float(runtime.get("occupancy_texture_delta", 18.0)), + trash_motion_delta=float(runtime.get("trash_motion_delta", 18.0)), + trash_motion_cooldown_seconds=max(0, int(runtime.get("trash_motion_cooldown_seconds", 8))), + ) + + +def normalize_polygon(value: Any) -> tuple[tuple[float, float], ...]: + points: list[tuple[float, float]] = [] + if not isinstance(value, list): + return () + for item in value: + if not isinstance(item, list | tuple) or len(item) != 2: + continue + points.append((min(1.0, max(0.0, float(item[0]))), min(1.0, max(0.0, float(item[1]))))) + return tuple(points) + + +def region_metrics(frame: Frame, region: Region, stride: int) -> RegionMetrics: + xs = [point[0] for point in region.polygon] + ys = [point[1] for point in region.polygon] + min_x = max(0, int(min(xs) * frame.width)) + max_x = min(frame.width - 1, int(max(xs) * frame.width)) + min_y = max(0, int(min(ys) * frame.height)) + max_y = min(frame.height - 1, int(max(ys) * frame.height)) + + values: list[float] = [] + for y in range(min_y, max_y + 1, stride): + norm_y = (y + 0.5) / frame.height + for x in range(min_x, max_x + 1, stride): + norm_x = (x + 0.5) / frame.width + if not point_in_polygon(norm_x, norm_y, 
region.polygon): + continue + r, g, b = frame.pixel(x, y) + values.append(0.299 * r + 0.587 * g + 0.114 * b) + + if not values: + return RegionMetrics(mean_luma=0.0, texture=0.0, sample_count=0) + mean = sum(values) / len(values) + variance = sum((value - mean) ** 2 for value in values) / len(values) + return RegionMetrics(mean_luma=mean, texture=variance ** 0.5, sample_count=len(values)) + + +def average_metrics(samples: list[RegionMetrics]) -> RegionMetrics: + return RegionMetrics( + mean_luma=sum(item.mean_luma for item in samples) / len(samples), + texture=sum(item.texture for item in samples) / len(samples), + sample_count=min(item.sample_count for item in samples), + ) + + +def point_in_polygon(x: float, y: float, polygon: tuple[tuple[float, float], ...]) -> bool: + inside = False + j = len(polygon) - 1 + for i, point in enumerate(polygon): + xi, yi = point + xj, yj = polygon[j] + intersects = (yi > y) != (yj > y) and x < (xj - xi) * (y - yi) / ((yj - yi) or 1e-12) + xi + if intersects: + inside = not inside + j = i + return inside diff --git a/tests/test_vision.py b/tests/test_vision.py new file mode 100644 index 0000000..50260fb --- /dev/null +++ b/tests/test_vision.py @@ -0,0 +1,72 @@ +from __future__ import annotations + +import unittest +from datetime import datetime, timezone + +from cold_display_guard.vision import ( + Frame, + Region, + RuntimeVisionSettings, + ZoneOccupancyDetector, + point_in_polygon, +) + + +def solid_frame(width: int, height: int, value: int) -> Frame: + return Frame(width=width, height=height, rgb=bytes([value, value, value]) * width * height) + + +def patched_frame(width: int, height: int, base: int, patch: tuple[int, int, int, int, int]) -> Frame: + x1, y1, x2, y2, value = patch + pixels = bytearray(bytes([base, base, base]) * width * height) + for y in range(y1, y2): + for x in range(x1, x2): + offset = (y * width + x) * 3 + pixels[offset : offset + 3] = bytes([value, value, value]) + return Frame(width=width, height=height, 
def solid_frame(width: int, height: int, value: int) -> Frame:
    """Build a frame in which every channel of every pixel equals *value*."""
    one_pixel = bytes([value, value, value])
    return Frame(width=width, height=height, rgb=one_pixel * width * height)


def patched_frame(width: int, height: int, base: int, patch: tuple[int, int, int, int, int]) -> Frame:
    """Build a grey frame of level *base* with one rectangle repainted.

    *patch* is ``(x1, y1, x2, y2, value)``; columns ``x1..x2-1`` of rows
    ``y1..y2-1`` are set to grey level ``value``.
    """
    left, top, right, bottom, value = patch
    buffer = bytearray(bytes([base, base, base]) * width * height)
    for row in range(top, bottom):
        for col in range(left, right):
            start = (row * width + col) * 3
            buffer[start : start + 3] = bytes([value, value, value])
    return Frame(width=width, height=height, rgb=bytes(buffer))


# Polygon covering the whole normalized image.
UNIT_SQUARE = ((0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0))
# Fixed, timezone-aware timestamp shared by the detector tests.
FIXED_TIME = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)


class VisionTests(unittest.TestCase):
    """Unit tests for the heuristic vision helpers."""

    def test_point_in_polygon(self) -> None:
        """A point inside the unit square is inside; one to its right is not."""
        self.assertTrue(point_in_polygon(0.5, 0.5, UNIT_SQUARE))
        self.assertFalse(point_in_polygon(1.5, 0.5, UNIT_SQUARE))

    def test_detector_reports_occupied_after_baseline_changes(self) -> None:
        """A uniformly brighter frame after the baseline marks the zone occupied."""
        vision_settings = RuntimeVisionSettings(
            baseline_frames=1,
            sample_stride_pixels=4,
            occupancy_mean_delta=10,
            occupancy_texture_delta=10,
        )
        detector = ZoneOccupancyDetector(
            [Region("r1c1", UNIT_SQUARE)],
            trash_region=None,
            settings=vision_settings,
        )

        baseline_result = detector.observe(solid_frame(32, 32, 30), FIXED_TIME)
        changed_result = detector.observe(patched_frame(32, 32, 30, (0, 0, 32, 32, 90)), FIXED_TIME)

        self.assertEqual(baseline_result[0], {"r1c1": 0})
        self.assertEqual(changed_result[0], {"r1c1": 1})

    def test_detector_reports_trash_motion(self) -> None:
        """A brightness jump in the trash region counts as one deposit."""
        detector = ZoneOccupancyDetector(
            [],
            trash_region=Region("trash", UNIT_SQUARE),
            settings=RuntimeVisionSettings(sample_stride_pixels=4, trash_motion_delta=10),
        )

        first_deposit = detector.observe(solid_frame(32, 32, 30), FIXED_TIME)[1]
        second_deposit = detector.observe(solid_frame(32, 32, 90), FIXED_TIME)[1]

        self.assertEqual(first_deposit, 0)
        self.assertEqual(second_deposit, 1)


if __name__ == "__main__":
    unittest.main()