feat: add rtsp runtime pipeline

This commit is contained in:
Yoilun
2026-04-28 19:00:23 +08:00
parent be3d2ac3af
commit b1c39d3fa7
9 changed files with 524 additions and 0 deletions

1
.gitignore vendored
View File

@@ -1,5 +1,6 @@
__pycache__/
*.py[cod]
.DS_Store
.pytest_cache/
.venv/
dist/

View File

@@ -98,6 +98,47 @@ http://127.0.0.1:19080
- `GET /api/manage/summary`
- `GET /api/manage/events`
## 运行识别计时进程
管理页只负责配置和查看数据。要产生数据,还需要启动运行进程:
```bash
scripts/run_runtime.sh
```
运行进程会:
1. 按配置读取 RTSP。
2.`ffmpeg` 周期抓取小尺寸 RGB 帧。
3. 按标定区域做占用变化检测。
4. 判断垃圾桶区域是否有明显投放动作。
5. 调用批次计时状态机。
6. 写入 `logs/events.jsonl`,管理页会读取这个文件。
当前视觉版本是可运行的启发式版本:
- 每个格口输出 `0/1` 占用状态,不识别单份数量。
- 启动后的前几帧用于建立空柜基线,默认 `3` 帧。
- 如果启动时格口里已经有食品,系统会把它当作基线,后续要等画面变化后才会产生计时事件。
- 真实生产精度后续应接食品检测模型。
可选运行参数可以放在配置文件的 `[runtime]` 中:
```toml
[runtime]
sample_interval_seconds = 5.0
frame_width = 640
frame_height = 360
capture_timeout_seconds = 12.0
baseline_frames = 3
sample_stride_pixels = 8
occupancy_mean_delta = 24.0
occupancy_texture_delta = 18.0
trash_motion_delta = 18.0
trash_motion_cooldown_seconds = 8
diagnostics_path = "logs/runtime_diagnostics.jsonl"
```
## 本地测试
```bash

View File

@@ -13,6 +13,7 @@ dependencies = []
[project.scripts]
cold-display-guard = "cold_display_guard.cli:main"
cold-display-guard-manage = "cold_display_guard.manage_api:main"
cold-display-guard-run = "cold_display_guard.main:main"
[tool.setuptools.packages.find]
where = ["src"]

6
scripts/run_runtime.sh Executable file
View File

@@ -0,0 +1,6 @@
#!/usr/bin/env bash
set -euo pipefail
CONFIG_PATH="${CONFIG_PATH:-config/example.toml}"
PYTHONPATH=src python3 -m cold_display_guard.main --config "$CONFIG_PATH"

View File

@@ -109,6 +109,19 @@ def format_config_document(data: dict[str, Any]) -> str:
lines.append(f'trash_confirmation_seconds = {int(thresholds.get("trash_confirmation_seconds", 120))}')
lines.append("")
runtime = data.get("runtime", {})
if runtime:
lines.append("[runtime]")
for key in sorted(runtime):
value = runtime[key]
if isinstance(value, bool):
lines.append(f"{key} = {str(value).lower()}")
elif isinstance(value, int | float):
lines.append(f"{key} = {value}")
else:
lines.append(f'{key} = "{_escape(str(value))}"')
lines.append("")
layout = data.get("layout", {})
zone_ids = [str(item) for item in layout.get("zone_ids", DEFAULT_ZONE_IDS)]
rows = int(layout.get("rows", 2))

View File

@@ -0,0 +1,60 @@
from __future__ import annotations
import subprocess
from dataclasses import dataclass
from cold_display_guard.vision import Frame
class FrameCaptureError(RuntimeError):
pass
@dataclass(frozen=True, slots=True)
class RTSPFrameSource:
rtsp_url: str
width: int = 640
height: int = 360
timeout_seconds: float = 12.0
def capture(self) -> Frame:
command = [
"ffmpeg",
"-hide_banner",
"-loglevel",
"error",
"-rtsp_transport",
"tcp",
"-i",
self.rtsp_url,
"-frames:v",
"1",
"-vf",
f"scale={self.width}:{self.height}",
"-f",
"rawvideo",
"-pix_fmt",
"rgb24",
"-",
]
try:
result = subprocess.run(
command,
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=max(1.0, self.timeout_seconds),
)
except FileNotFoundError as exc:
raise FrameCaptureError("ffmpeg not found; install ffmpeg first") from exc
except subprocess.TimeoutExpired as exc:
raise FrameCaptureError(f"ffmpeg timed out after {self.timeout_seconds:g}s") from exc
if result.returncode != 0:
message = result.stderr.decode("utf-8", errors="replace").strip()
raise FrameCaptureError(message or f"ffmpeg exited with code {result.returncode}")
expected_size = self.width * self.height * 3
if len(result.stdout) != expected_size:
raise FrameCaptureError(f"expected {expected_size} RGB bytes, got {len(result.stdout)}")
return Frame(width=self.width, height=self.height, rgb=result.stdout)

View File

@@ -0,0 +1,122 @@
from __future__ import annotations
import argparse
import json
import time
from datetime import datetime
from pathlib import Path
from zoneinfo import ZoneInfo
from cold_display_guard.config import load_config_document, load_settings, resolve_config_path, resolve_project_root
from cold_display_guard.engine import BatchEngine
from cold_display_guard.frame_source import FrameCaptureError, RTSPFrameSource
from cold_display_guard.models import Observation
from cold_display_guard.vision import ZoneOccupancyDetector, load_regions, load_runtime_vision_settings
def main() -> int:
args = parse_args().parse_args()
run(args.config, once=args.once, max_iterations=args.max_iterations)
return 0
def parse_args() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="Run Cold Display Guard RTSP batch monitor")
parser.add_argument("--config", default=str(resolve_config_path(None)), help="Path to TOML config")
parser.add_argument("--once", action="store_true", help="Process one frame and exit")
parser.add_argument("--max-iterations", type=int, default=0, help="Stop after N frames; 0 means forever")
return parser
def run(config_path: str | Path, once: bool = False, max_iterations: int = 0) -> None:
resolved_config = resolve_config_path(config_path)
project_root = resolve_project_root(resolved_config)
config = load_config_document(resolved_config)
settings = load_settings(resolved_config)
runtime = config.get("runtime", {})
stream = config.get("stream", {})
rtsp_url = str(stream.get("rtsp_url", "")).strip()
if not rtsp_url:
raise ValueError("stream.rtsp_url is required")
regions, trash_region = load_regions(config)
if not regions:
raise ValueError("at least one [[zones]] polygon is required")
timezone = ZoneInfo(str(config.get("timezone", "Asia/Shanghai")))
event_path = resolve_project_path(project_root, str(config.get("event_sink", {}).get("path", "logs/events.jsonl")))
diagnostics_path = resolve_project_path(project_root, str(runtime.get("diagnostics_path", "logs/runtime_diagnostics.jsonl")))
sample_interval_seconds = max(0.1, float(runtime.get("sample_interval_seconds", 5.0)))
frame_width = max(64, int(runtime.get("frame_width", 640)))
frame_height = max(64, int(runtime.get("frame_height", 360)))
capture_timeout_seconds = max(1.0, float(runtime.get("capture_timeout_seconds", 12.0)))
source = RTSPFrameSource(
rtsp_url=rtsp_url,
width=frame_width,
height=frame_height,
timeout_seconds=capture_timeout_seconds,
)
detector = ZoneOccupancyDetector(regions, trash_region, load_runtime_vision_settings(config))
engine = BatchEngine(settings)
event_path.parent.mkdir(parents=True, exist_ok=True)
diagnostics_path.parent.mkdir(parents=True, exist_ok=True)
print(f"Cold Display Guard runtime started")
print(f"Config: {resolved_config}")
print(f"Events: {event_path}")
print(f"Diagnostics: {diagnostics_path}")
iteration = 0
while True:
iteration += 1
when = datetime.now(timezone)
try:
frame = source.capture()
zone_counts, trash_deposit_count, diagnostics = detector.observe(frame, when)
observation = Observation(ts=when, zone_counts=zone_counts, trash_deposit_count=trash_deposit_count)
events = engine.process(observation)
append_jsonl(event_path, events)
append_jsonl(
diagnostics_path,
[
{
"ts": when.isoformat(),
"zone_counts": zone_counts,
"trash_deposit_count": trash_deposit_count,
"diagnostics": diagnostics,
}
],
)
if events:
print(f"{when.isoformat()} wrote {len(events)} event(s)")
except FrameCaptureError as exc:
append_jsonl(
diagnostics_path,
[{"ts": when.isoformat(), "error": "frame_capture_failed", "message": str(exc)}],
)
print(f"{when.isoformat()} frame capture failed: {exc}")
if once or (max_iterations > 0 and iteration >= max_iterations):
break
time.sleep(sample_interval_seconds)
def resolve_project_path(project_root: Path, raw_path: str) -> Path:
path = Path(raw_path).expanduser()
if not path.is_absolute():
path = project_root / path
return path.resolve()
def append_jsonl(path: Path, payloads: list[dict]) -> None:
if not payloads:
return
with path.open("a", encoding="utf-8") as handle:
for payload in payloads:
handle.write(json.dumps(payload, ensure_ascii=False, sort_keys=True))
handle.write("\n")
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,208 @@
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any
@dataclass(frozen=True, slots=True)
class Frame:
width: int
height: int
rgb: bytes
def pixel(self, x: int, y: int) -> tuple[int, int, int]:
offset = (y * self.width + x) * 3
return self.rgb[offset], self.rgb[offset + 1], self.rgb[offset + 2]
@dataclass(frozen=True, slots=True)
class Region:
region_id: str
polygon: tuple[tuple[float, float], ...]
@dataclass(frozen=True, slots=True)
class RuntimeVisionSettings:
baseline_frames: int = 3
sample_stride_pixels: int = 8
occupancy_mean_delta: float = 24.0
occupancy_texture_delta: float = 18.0
trash_motion_delta: float = 18.0
trash_motion_cooldown_seconds: int = 8
@dataclass(frozen=True, slots=True)
class RegionMetrics:
mean_luma: float
texture: float
sample_count: int
class ZoneOccupancyDetector:
def __init__(
self,
regions: list[Region],
trash_region: Region | None,
settings: RuntimeVisionSettings | None = None,
) -> None:
self.regions = regions
self.trash_region = trash_region
self.settings = settings or RuntimeVisionSettings()
self._baseline: dict[str, RegionMetrics] = {}
self._baseline_samples: dict[str, list[RegionMetrics]] = {region.region_id: [] for region in regions}
if trash_region is not None:
self._baseline_samples[trash_region.region_id] = []
self._previous_trash_metrics: RegionMetrics | None = None
self._last_trash_motion_at: datetime | None = None
def observe(self, frame: Frame, when: datetime) -> tuple[dict[str, int], int, dict[str, Any]]:
metrics_by_region = {region.region_id: region_metrics(frame, region, self.settings.sample_stride_pixels) for region in self.regions}
self._update_baseline(metrics_by_region)
zone_counts: dict[str, int] = {}
diagnostics: dict[str, Any] = {"zones": {}, "baseline_ready": self.baseline_ready}
for region in self.regions:
metrics = metrics_by_region[region.region_id]
baseline = self._baseline.get(region.region_id)
occupied = False
if baseline is not None:
mean_delta = abs(metrics.mean_luma - baseline.mean_luma)
texture_delta = metrics.texture - baseline.texture
occupied = (
mean_delta >= self.settings.occupancy_mean_delta
or texture_delta >= self.settings.occupancy_texture_delta
)
diagnostics["zones"][region.region_id] = {
"mean_luma": round(metrics.mean_luma, 3),
"baseline_mean_luma": round(baseline.mean_luma, 3),
"mean_delta": round(mean_delta, 3),
"texture": round(metrics.texture, 3),
"baseline_texture": round(baseline.texture, 3),
"texture_delta": round(texture_delta, 3),
"occupied": occupied,
}
zone_counts[region.region_id] = 1 if occupied else 0
trash_deposit_count = self._trash_deposit_count(frame, when, diagnostics)
return zone_counts, trash_deposit_count, diagnostics
@property
def baseline_ready(self) -> bool:
return all(region.region_id in self._baseline for region in self.regions)
def _update_baseline(self, metrics_by_region: dict[str, RegionMetrics]) -> None:
for region_id, metrics in metrics_by_region.items():
if region_id in self._baseline:
continue
samples = self._baseline_samples.setdefault(region_id, [])
samples.append(metrics)
if len(samples) >= self.settings.baseline_frames:
self._baseline[region_id] = average_metrics(samples)
def _trash_deposit_count(self, frame: Frame, when: datetime, diagnostics: dict[str, Any]) -> int:
if self.trash_region is None:
return 0
metrics = region_metrics(frame, self.trash_region, self.settings.sample_stride_pixels)
previous = self._previous_trash_metrics
self._previous_trash_metrics = metrics
if previous is None:
diagnostics["trash"] = {"motion_delta": 0.0, "deposit": False}
return 0
motion_delta = abs(metrics.mean_luma - previous.mean_luma) + abs(metrics.texture - previous.texture)
cooldown = timedelta(seconds=self.settings.trash_motion_cooldown_seconds)
in_cooldown = self._last_trash_motion_at is not None and when - self._last_trash_motion_at < cooldown
deposit = motion_delta >= self.settings.trash_motion_delta and not in_cooldown
if deposit:
self._last_trash_motion_at = when
diagnostics["trash"] = {
"motion_delta": round(motion_delta, 3),
"deposit": deposit,
}
return 1 if deposit else 0
def load_regions(config: dict[str, Any]) -> tuple[list[Region], Region | None]:
regions: list[Region] = []
for zone in config.get("zones", []):
zone_id = str(zone.get("id", "")).strip()
polygon = normalize_polygon(zone.get("polygon", []))
if zone_id and len(polygon) >= 3:
regions.append(Region(zone_id, polygon))
trash_region = None
trash_polygon = normalize_polygon(config.get("trash", {}).get("roi", []))
if len(trash_polygon) >= 3:
trash_region = Region("trash", trash_polygon)
return regions, trash_region
def load_runtime_vision_settings(config: dict[str, Any]) -> RuntimeVisionSettings:
runtime = config.get("runtime", {})
return RuntimeVisionSettings(
baseline_frames=max(1, int(runtime.get("baseline_frames", 3))),
sample_stride_pixels=max(1, int(runtime.get("sample_stride_pixels", 8))),
occupancy_mean_delta=float(runtime.get("occupancy_mean_delta", 24.0)),
occupancy_texture_delta=float(runtime.get("occupancy_texture_delta", 18.0)),
trash_motion_delta=float(runtime.get("trash_motion_delta", 18.0)),
trash_motion_cooldown_seconds=max(0, int(runtime.get("trash_motion_cooldown_seconds", 8))),
)
def normalize_polygon(value: Any) -> tuple[tuple[float, float], ...]:
points: list[tuple[float, float]] = []
if not isinstance(value, list):
return ()
for item in value:
if not isinstance(item, list | tuple) or len(item) != 2:
continue
points.append((min(1.0, max(0.0, float(item[0]))), min(1.0, max(0.0, float(item[1])))))
return tuple(points)
def region_metrics(frame: Frame, region: Region, stride: int) -> RegionMetrics:
xs = [point[0] for point in region.polygon]
ys = [point[1] for point in region.polygon]
min_x = max(0, int(min(xs) * frame.width))
max_x = min(frame.width - 1, int(max(xs) * frame.width))
min_y = max(0, int(min(ys) * frame.height))
max_y = min(frame.height - 1, int(max(ys) * frame.height))
values: list[float] = []
for y in range(min_y, max_y + 1, stride):
norm_y = (y + 0.5) / frame.height
for x in range(min_x, max_x + 1, stride):
norm_x = (x + 0.5) / frame.width
if not point_in_polygon(norm_x, norm_y, region.polygon):
continue
r, g, b = frame.pixel(x, y)
values.append(0.299 * r + 0.587 * g + 0.114 * b)
if not values:
return RegionMetrics(mean_luma=0.0, texture=0.0, sample_count=0)
mean = sum(values) / len(values)
variance = sum((value - mean) ** 2 for value in values) / len(values)
return RegionMetrics(mean_luma=mean, texture=variance ** 0.5, sample_count=len(values))
def average_metrics(samples: list[RegionMetrics]) -> RegionMetrics:
return RegionMetrics(
mean_luma=sum(item.mean_luma for item in samples) / len(samples),
texture=sum(item.texture for item in samples) / len(samples),
sample_count=min(item.sample_count for item in samples),
)
def point_in_polygon(x: float, y: float, polygon: tuple[tuple[float, float], ...]) -> bool:
inside = False
j = len(polygon) - 1
for i, point in enumerate(polygon):
xi, yi = point
xj, yj = polygon[j]
intersects = (yi > y) != (yj > y) and x < (xj - xi) * (y - yi) / ((yj - yi) or 1e-12) + xi
if intersects:
inside = not inside
j = i
return inside

72
tests/test_vision.py Normal file
View File

@@ -0,0 +1,72 @@
from __future__ import annotations
import unittest
from datetime import datetime, timezone
from cold_display_guard.vision import (
Frame,
Region,
RuntimeVisionSettings,
ZoneOccupancyDetector,
point_in_polygon,
)
def solid_frame(width: int, height: int, value: int) -> Frame:
return Frame(width=width, height=height, rgb=bytes([value, value, value]) * width * height)
def patched_frame(width: int, height: int, base: int, patch: tuple[int, int, int, int, int]) -> Frame:
x1, y1, x2, y2, value = patch
pixels = bytearray(bytes([base, base, base]) * width * height)
for y in range(y1, y2):
for x in range(x1, x2):
offset = (y * width + x) * 3
pixels[offset : offset + 3] = bytes([value, value, value])
return Frame(width=width, height=height, rgb=bytes(pixels))
class VisionTests(unittest.TestCase):
def test_point_in_polygon(self) -> None:
polygon = ((0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0))
self.assertTrue(point_in_polygon(0.5, 0.5, polygon))
self.assertFalse(point_in_polygon(1.5, 0.5, polygon))
def test_detector_reports_occupied_after_baseline_changes(self) -> None:
detector = ZoneOccupancyDetector(
[Region("r1c1", ((0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)))],
trash_region=None,
settings=RuntimeVisionSettings(
baseline_frames=1,
sample_stride_pixels=4,
occupancy_mean_delta=10,
occupancy_texture_delta=10,
),
)
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
baseline_counts, _, _ = detector.observe(solid_frame(32, 32, 30), now)
changed_counts, _, _ = detector.observe(patched_frame(32, 32, 30, (0, 0, 32, 32, 90)), now)
self.assertEqual(baseline_counts, {"r1c1": 0})
self.assertEqual(changed_counts, {"r1c1": 1})
def test_detector_reports_trash_motion(self) -> None:
trash = Region("trash", ((0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)))
detector = ZoneOccupancyDetector(
[],
trash_region=trash,
settings=RuntimeVisionSettings(sample_stride_pixels=4, trash_motion_delta=10),
)
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
_, first_deposit, _ = detector.observe(solid_frame(32, 32, 30), now)
_, second_deposit, _ = detector.observe(solid_frame(32, 32, 90), now)
self.assertEqual(first_deposit, 0)
self.assertEqual(second_deposit, 1)
if __name__ == "__main__":
unittest.main()