Compare commits
2 Commits
be3d2ac3af
...
c81a20b2ea
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c81a20b2ea | ||
|
|
b1c39d3fa7 |
1
.gitignore
vendored
1
.gitignore
vendored
@@ -1,5 +1,6 @@
|
||||
__pycache__/
|
||||
*.py[cod]
|
||||
.DS_Store
|
||||
.pytest_cache/
|
||||
.venv/
|
||||
dist/
|
||||
|
||||
41
README_zh.md
41
README_zh.md
@@ -98,6 +98,47 @@ http://127.0.0.1:19080
|
||||
- `GET /api/manage/summary`
|
||||
- `GET /api/manage/events`
|
||||
|
||||
## 运行识别计时进程
|
||||
|
||||
管理页只负责配置和查看数据。要产生数据,还需要启动运行进程:
|
||||
|
||||
```bash
|
||||
scripts/run_runtime.sh
|
||||
```
|
||||
|
||||
运行进程会:
|
||||
|
||||
1. 按配置读取 RTSP。
|
||||
2. 用 `ffmpeg` 周期抓取小尺寸 RGB 帧。
|
||||
3. 按标定区域做占用变化检测。
|
||||
4. 判断垃圾桶区域是否有明显投放动作。
|
||||
5. 调用批次计时状态机。
|
||||
6. 写入 `logs/events.jsonl`,管理页会读取这个文件。
|
||||
|
||||
当前视觉版本是可运行的启发式版本:
|
||||
|
||||
- 每个格口输出 `0/1` 占用状态,不识别单份数量。
|
||||
- 启动后的前几帧用于建立空柜基线,默认 `3` 帧。
|
||||
- 如果启动时格口里已经有食品,系统会把它当作基线,后续要等画面变化后才会产生计时事件。
|
||||
- 真实生产精度后续应接食品检测模型。
|
||||
|
||||
可选运行参数可以放在配置文件的 `[runtime]` 中:
|
||||
|
||||
```toml
|
||||
[runtime]
|
||||
sample_interval_seconds = 5.0
|
||||
frame_width = 640
|
||||
frame_height = 360
|
||||
capture_timeout_seconds = 12.0
|
||||
baseline_frames = 3
|
||||
sample_stride_pixels = 8
|
||||
occupancy_mean_delta = 24.0
|
||||
occupancy_texture_delta = 18.0
|
||||
trash_motion_delta = 18.0
|
||||
trash_motion_cooldown_seconds = 8
|
||||
diagnostics_path = "logs/runtime_diagnostics.jsonl"
|
||||
```
|
||||
|
||||
## 本地测试
|
||||
|
||||
```bash
|
||||
|
||||
@@ -13,6 +13,7 @@ dependencies = []
|
||||
[project.scripts]
|
||||
cold-display-guard = "cold_display_guard.cli:main"
|
||||
cold-display-guard-manage = "cold_display_guard.manage_api:main"
|
||||
cold-display-guard-run = "cold_display_guard.main:main"
|
||||
|
||||
[tool.setuptools.packages.find]
|
||||
where = ["src"]
|
||||
|
||||
6
scripts/run_runtime.sh
Executable file
6
scripts/run_runtime.sh
Executable file
@@ -0,0 +1,6 @@
|
||||
#!/usr/bin/env bash
|
||||
set -euo pipefail
|
||||
|
||||
CONFIG_PATH="${CONFIG_PATH:-config/example.toml}"
|
||||
|
||||
PYTHONPATH=src python3 -m cold_display_guard.main --config "$CONFIG_PATH"
|
||||
@@ -109,6 +109,19 @@ def format_config_document(data: dict[str, Any]) -> str:
|
||||
lines.append(f'trash_confirmation_seconds = {int(thresholds.get("trash_confirmation_seconds", 120))}')
|
||||
lines.append("")
|
||||
|
||||
runtime = data.get("runtime", {})
|
||||
if runtime:
|
||||
lines.append("[runtime]")
|
||||
for key in sorted(runtime):
|
||||
value = runtime[key]
|
||||
if isinstance(value, bool):
|
||||
lines.append(f"{key} = {str(value).lower()}")
|
||||
elif isinstance(value, int | float):
|
||||
lines.append(f"{key} = {value}")
|
||||
else:
|
||||
lines.append(f'{key} = "{_escape(str(value))}"')
|
||||
lines.append("")
|
||||
|
||||
layout = data.get("layout", {})
|
||||
zone_ids = [str(item) for item in layout.get("zone_ids", DEFAULT_ZONE_IDS)]
|
||||
rows = int(layout.get("rows", 2))
|
||||
|
||||
60
src/cold_display_guard/frame_source.py
Normal file
60
src/cold_display_guard/frame_source.py
Normal file
@@ -0,0 +1,60 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import subprocess
|
||||
from dataclasses import dataclass
|
||||
|
||||
from cold_display_guard.vision import Frame
|
||||
|
||||
|
||||
class FrameCaptureError(RuntimeError):
|
||||
pass
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class RTSPFrameSource:
|
||||
rtsp_url: str
|
||||
width: int = 640
|
||||
height: int = 360
|
||||
timeout_seconds: float = 12.0
|
||||
|
||||
def capture(self) -> Frame:
|
||||
command = [
|
||||
"ffmpeg",
|
||||
"-hide_banner",
|
||||
"-loglevel",
|
||||
"error",
|
||||
"-rtsp_transport",
|
||||
"tcp",
|
||||
"-i",
|
||||
self.rtsp_url,
|
||||
"-frames:v",
|
||||
"1",
|
||||
"-vf",
|
||||
f"scale={self.width}:{self.height}",
|
||||
"-f",
|
||||
"rawvideo",
|
||||
"-pix_fmt",
|
||||
"rgb24",
|
||||
"-",
|
||||
]
|
||||
try:
|
||||
result = subprocess.run(
|
||||
command,
|
||||
check=False,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE,
|
||||
timeout=max(1.0, self.timeout_seconds),
|
||||
)
|
||||
except FileNotFoundError as exc:
|
||||
raise FrameCaptureError("ffmpeg not found; install ffmpeg first") from exc
|
||||
except subprocess.TimeoutExpired as exc:
|
||||
raise FrameCaptureError(f"ffmpeg timed out after {self.timeout_seconds:g}s") from exc
|
||||
|
||||
if result.returncode != 0:
|
||||
message = result.stderr.decode("utf-8", errors="replace").strip()
|
||||
raise FrameCaptureError(message or f"ffmpeg exited with code {result.returncode}")
|
||||
|
||||
expected_size = self.width * self.height * 3
|
||||
if len(result.stdout) != expected_size:
|
||||
raise FrameCaptureError(f"expected {expected_size} RGB bytes, got {len(result.stdout)}")
|
||||
return Frame(width=self.width, height=self.height, rgb=result.stdout)
|
||||
122
src/cold_display_guard/main.py
Normal file
122
src/cold_display_guard/main.py
Normal file
@@ -0,0 +1,122 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import time
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
from cold_display_guard.config import load_config_document, load_settings, resolve_config_path, resolve_project_root
|
||||
from cold_display_guard.engine import BatchEngine
|
||||
from cold_display_guard.frame_source import FrameCaptureError, RTSPFrameSource
|
||||
from cold_display_guard.models import Observation
|
||||
from cold_display_guard.vision import ZoneOccupancyDetector, load_regions, load_runtime_vision_settings
|
||||
|
||||
|
||||
def main() -> int:
|
||||
args = parse_args().parse_args()
|
||||
run(args.config, once=args.once, max_iterations=args.max_iterations)
|
||||
return 0
|
||||
|
||||
|
||||
def parse_args() -> argparse.ArgumentParser:
|
||||
parser = argparse.ArgumentParser(description="Run Cold Display Guard RTSP batch monitor")
|
||||
parser.add_argument("--config", default=str(resolve_config_path(None)), help="Path to TOML config")
|
||||
parser.add_argument("--once", action="store_true", help="Process one frame and exit")
|
||||
parser.add_argument("--max-iterations", type=int, default=0, help="Stop after N frames; 0 means forever")
|
||||
return parser
|
||||
|
||||
|
||||
def run(config_path: str | Path, once: bool = False, max_iterations: int = 0) -> None:
|
||||
resolved_config = resolve_config_path(config_path)
|
||||
project_root = resolve_project_root(resolved_config)
|
||||
config = load_config_document(resolved_config)
|
||||
settings = load_settings(resolved_config)
|
||||
runtime = config.get("runtime", {})
|
||||
stream = config.get("stream", {})
|
||||
rtsp_url = str(stream.get("rtsp_url", "")).strip()
|
||||
if not rtsp_url:
|
||||
raise ValueError("stream.rtsp_url is required")
|
||||
|
||||
regions, trash_region = load_regions(config)
|
||||
if not regions:
|
||||
raise ValueError("at least one [[zones]] polygon is required")
|
||||
|
||||
timezone = ZoneInfo(str(config.get("timezone", "Asia/Shanghai")))
|
||||
event_path = resolve_project_path(project_root, str(config.get("event_sink", {}).get("path", "logs/events.jsonl")))
|
||||
diagnostics_path = resolve_project_path(project_root, str(runtime.get("diagnostics_path", "logs/runtime_diagnostics.jsonl")))
|
||||
sample_interval_seconds = max(0.1, float(runtime.get("sample_interval_seconds", 5.0)))
|
||||
frame_width = max(64, int(runtime.get("frame_width", 640)))
|
||||
frame_height = max(64, int(runtime.get("frame_height", 360)))
|
||||
capture_timeout_seconds = max(1.0, float(runtime.get("capture_timeout_seconds", 12.0)))
|
||||
|
||||
source = RTSPFrameSource(
|
||||
rtsp_url=rtsp_url,
|
||||
width=frame_width,
|
||||
height=frame_height,
|
||||
timeout_seconds=capture_timeout_seconds,
|
||||
)
|
||||
detector = ZoneOccupancyDetector(regions, trash_region, load_runtime_vision_settings(config))
|
||||
engine = BatchEngine(settings)
|
||||
|
||||
event_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
diagnostics_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
print(f"Cold Display Guard runtime started")
|
||||
print(f"Config: {resolved_config}")
|
||||
print(f"Events: {event_path}")
|
||||
print(f"Diagnostics: {diagnostics_path}")
|
||||
|
||||
iteration = 0
|
||||
while True:
|
||||
iteration += 1
|
||||
when = datetime.now(timezone)
|
||||
try:
|
||||
frame = source.capture()
|
||||
zone_counts, trash_deposit_count, diagnostics = detector.observe(frame, when)
|
||||
observation = Observation(ts=when, zone_counts=zone_counts, trash_deposit_count=trash_deposit_count)
|
||||
events = engine.process(observation)
|
||||
append_jsonl(event_path, events)
|
||||
append_jsonl(
|
||||
diagnostics_path,
|
||||
[
|
||||
{
|
||||
"ts": when.isoformat(),
|
||||
"zone_counts": zone_counts,
|
||||
"trash_deposit_count": trash_deposit_count,
|
||||
"diagnostics": diagnostics,
|
||||
}
|
||||
],
|
||||
)
|
||||
if events:
|
||||
print(f"{when.isoformat()} wrote {len(events)} event(s)")
|
||||
except FrameCaptureError as exc:
|
||||
append_jsonl(
|
||||
diagnostics_path,
|
||||
[{"ts": when.isoformat(), "error": "frame_capture_failed", "message": str(exc)}],
|
||||
)
|
||||
print(f"{when.isoformat()} frame capture failed: {exc}")
|
||||
|
||||
if once or (max_iterations > 0 and iteration >= max_iterations):
|
||||
break
|
||||
time.sleep(sample_interval_seconds)
|
||||
|
||||
|
||||
def resolve_project_path(project_root: Path, raw_path: str) -> Path:
|
||||
path = Path(raw_path).expanduser()
|
||||
if not path.is_absolute():
|
||||
path = project_root / path
|
||||
return path.resolve()
|
||||
|
||||
|
||||
def append_jsonl(path: Path, payloads: list[dict]) -> None:
|
||||
if not payloads:
|
||||
return
|
||||
with path.open("a", encoding="utf-8") as handle:
|
||||
for payload in payloads:
|
||||
handle.write(json.dumps(payload, ensure_ascii=False, sort_keys=True))
|
||||
handle.write("\n")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
@@ -65,6 +65,11 @@ def create_handler(ctx: ManageContext) -> type[BaseHTTPRequestHandler]:
|
||||
limit = bounded_int(query.get("limit", ["200"])[0], 1, MAX_EVENT_LINES)
|
||||
self._send_json({"items": load_events(ctx, limit), "limit": limit})
|
||||
return
|
||||
if parsed.path == "/api/manage/diagnostics":
|
||||
query = parse_qs(parsed.query)
|
||||
limit = bounded_int(query.get("limit", ["50"])[0], 1, MAX_EVENT_LINES)
|
||||
self._send_json({"items": load_diagnostics(ctx, limit), "limit": limit})
|
||||
return
|
||||
self.send_error(HTTPStatus.NOT_FOUND)
|
||||
|
||||
def do_PUT(self) -> None:
|
||||
@@ -234,6 +239,7 @@ def config_payload(ctx: ManageContext) -> dict[str, Any]:
|
||||
|
||||
def build_summary(ctx: ManageContext) -> dict[str, Any]:
|
||||
events = load_events(ctx, MAX_EVENT_LINES)
|
||||
diagnostics = load_diagnostics(ctx, MAX_EVENT_LINES)
|
||||
counts: dict[str, int] = {}
|
||||
last_event_time = ""
|
||||
latest_alert = ""
|
||||
@@ -261,22 +267,35 @@ def build_summary(ctx: ManageContext) -> dict[str, Any]:
|
||||
"violation_count": active_alert_count,
|
||||
"latest_alert_time": latest_alert,
|
||||
"events_path": str(event_sink_path(ctx)),
|
||||
"diagnostics_path": str(diagnostics_path(ctx)),
|
||||
"diagnostics_count": len(diagnostics),
|
||||
"latest_zone_counts": latest_zone_counts(diagnostics),
|
||||
"baseline_ready": latest_baseline_ready(diagnostics),
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
def load_events(ctx: ManageContext, limit: int) -> list[dict[str, Any]]:
|
||||
path = event_sink_path(ctx)
|
||||
return load_jsonl_tail(path, limit)
|
||||
|
||||
|
||||
def load_diagnostics(ctx: ManageContext, limit: int) -> list[dict[str, Any]]:
|
||||
path = diagnostics_path(ctx)
|
||||
return load_jsonl_tail(path, limit)
|
||||
|
||||
|
||||
def load_jsonl_tail(path: Path, limit: int) -> list[dict[str, Any]]:
|
||||
lines = tail_lines(path, limit)
|
||||
events: list[dict[str, Any]] = []
|
||||
items: list[dict[str, Any]] = []
|
||||
for line in lines:
|
||||
try:
|
||||
payload = json.loads(line)
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
if isinstance(payload, dict):
|
||||
events.append(payload)
|
||||
return events
|
||||
items.append(payload)
|
||||
return items
|
||||
|
||||
|
||||
def event_sink_path(ctx: ManageContext, data: dict[str, Any] | None = None) -> Path:
|
||||
@@ -289,6 +308,32 @@ def event_sink_path(ctx: ManageContext, data: dict[str, Any] | None = None) -> P
|
||||
return path.resolve()
|
||||
|
||||
|
||||
def diagnostics_path(ctx: ManageContext, data: dict[str, Any] | None = None) -> Path:
|
||||
if data is None:
|
||||
data = load_config_document(ctx.config_path)
|
||||
raw_path = str(data.get("runtime", {}).get("diagnostics_path", "logs/runtime_diagnostics.jsonl"))
|
||||
path = Path(raw_path).expanduser()
|
||||
if not path.is_absolute():
|
||||
path = ctx.project_root / path
|
||||
return path.resolve()
|
||||
|
||||
|
||||
def latest_zone_counts(diagnostics: list[dict[str, Any]]) -> dict[str, int]:
|
||||
for item in reversed(diagnostics):
|
||||
zone_counts = item.get("zone_counts")
|
||||
if isinstance(zone_counts, dict):
|
||||
return {str(key): int(value) for key, value in zone_counts.items()}
|
||||
return {}
|
||||
|
||||
|
||||
def latest_baseline_ready(diagnostics: list[dict[str, Any]]) -> bool:
|
||||
for item in reversed(diagnostics):
|
||||
diagnostics_payload = item.get("diagnostics")
|
||||
if isinstance(diagnostics_payload, dict):
|
||||
return bool(diagnostics_payload.get("baseline_ready", False))
|
||||
return False
|
||||
|
||||
|
||||
def tail_lines(path: Path, limit: int) -> list[str]:
|
||||
if not path.exists():
|
||||
return []
|
||||
|
||||
208
src/cold_display_guard/vision.py
Normal file
208
src/cold_display_guard/vision.py
Normal file
@@ -0,0 +1,208 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Any
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class Frame:
|
||||
width: int
|
||||
height: int
|
||||
rgb: bytes
|
||||
|
||||
def pixel(self, x: int, y: int) -> tuple[int, int, int]:
|
||||
offset = (y * self.width + x) * 3
|
||||
return self.rgb[offset], self.rgb[offset + 1], self.rgb[offset + 2]
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class Region:
|
||||
region_id: str
|
||||
polygon: tuple[tuple[float, float], ...]
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class RuntimeVisionSettings:
|
||||
baseline_frames: int = 3
|
||||
sample_stride_pixels: int = 8
|
||||
occupancy_mean_delta: float = 24.0
|
||||
occupancy_texture_delta: float = 18.0
|
||||
trash_motion_delta: float = 18.0
|
||||
trash_motion_cooldown_seconds: int = 8
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
class RegionMetrics:
|
||||
mean_luma: float
|
||||
texture: float
|
||||
sample_count: int
|
||||
|
||||
|
||||
class ZoneOccupancyDetector:
|
||||
def __init__(
|
||||
self,
|
||||
regions: list[Region],
|
||||
trash_region: Region | None,
|
||||
settings: RuntimeVisionSettings | None = None,
|
||||
) -> None:
|
||||
self.regions = regions
|
||||
self.trash_region = trash_region
|
||||
self.settings = settings or RuntimeVisionSettings()
|
||||
self._baseline: dict[str, RegionMetrics] = {}
|
||||
self._baseline_samples: dict[str, list[RegionMetrics]] = {region.region_id: [] for region in regions}
|
||||
if trash_region is not None:
|
||||
self._baseline_samples[trash_region.region_id] = []
|
||||
self._previous_trash_metrics: RegionMetrics | None = None
|
||||
self._last_trash_motion_at: datetime | None = None
|
||||
|
||||
def observe(self, frame: Frame, when: datetime) -> tuple[dict[str, int], int, dict[str, Any]]:
|
||||
metrics_by_region = {region.region_id: region_metrics(frame, region, self.settings.sample_stride_pixels) for region in self.regions}
|
||||
self._update_baseline(metrics_by_region)
|
||||
|
||||
zone_counts: dict[str, int] = {}
|
||||
diagnostics: dict[str, Any] = {"zones": {}, "baseline_ready": self.baseline_ready}
|
||||
for region in self.regions:
|
||||
metrics = metrics_by_region[region.region_id]
|
||||
baseline = self._baseline.get(region.region_id)
|
||||
occupied = False
|
||||
if baseline is not None:
|
||||
mean_delta = abs(metrics.mean_luma - baseline.mean_luma)
|
||||
texture_delta = metrics.texture - baseline.texture
|
||||
occupied = (
|
||||
mean_delta >= self.settings.occupancy_mean_delta
|
||||
or texture_delta >= self.settings.occupancy_texture_delta
|
||||
)
|
||||
diagnostics["zones"][region.region_id] = {
|
||||
"mean_luma": round(metrics.mean_luma, 3),
|
||||
"baseline_mean_luma": round(baseline.mean_luma, 3),
|
||||
"mean_delta": round(mean_delta, 3),
|
||||
"texture": round(metrics.texture, 3),
|
||||
"baseline_texture": round(baseline.texture, 3),
|
||||
"texture_delta": round(texture_delta, 3),
|
||||
"occupied": occupied,
|
||||
}
|
||||
zone_counts[region.region_id] = 1 if occupied else 0
|
||||
|
||||
trash_deposit_count = self._trash_deposit_count(frame, when, diagnostics)
|
||||
return zone_counts, trash_deposit_count, diagnostics
|
||||
|
||||
@property
|
||||
def baseline_ready(self) -> bool:
|
||||
return all(region.region_id in self._baseline for region in self.regions)
|
||||
|
||||
def _update_baseline(self, metrics_by_region: dict[str, RegionMetrics]) -> None:
|
||||
for region_id, metrics in metrics_by_region.items():
|
||||
if region_id in self._baseline:
|
||||
continue
|
||||
samples = self._baseline_samples.setdefault(region_id, [])
|
||||
samples.append(metrics)
|
||||
if len(samples) >= self.settings.baseline_frames:
|
||||
self._baseline[region_id] = average_metrics(samples)
|
||||
|
||||
def _trash_deposit_count(self, frame: Frame, when: datetime, diagnostics: dict[str, Any]) -> int:
|
||||
if self.trash_region is None:
|
||||
return 0
|
||||
|
||||
metrics = region_metrics(frame, self.trash_region, self.settings.sample_stride_pixels)
|
||||
previous = self._previous_trash_metrics
|
||||
self._previous_trash_metrics = metrics
|
||||
if previous is None:
|
||||
diagnostics["trash"] = {"motion_delta": 0.0, "deposit": False}
|
||||
return 0
|
||||
|
||||
motion_delta = abs(metrics.mean_luma - previous.mean_luma) + abs(metrics.texture - previous.texture)
|
||||
cooldown = timedelta(seconds=self.settings.trash_motion_cooldown_seconds)
|
||||
in_cooldown = self._last_trash_motion_at is not None and when - self._last_trash_motion_at < cooldown
|
||||
deposit = motion_delta >= self.settings.trash_motion_delta and not in_cooldown
|
||||
if deposit:
|
||||
self._last_trash_motion_at = when
|
||||
diagnostics["trash"] = {
|
||||
"motion_delta": round(motion_delta, 3),
|
||||
"deposit": deposit,
|
||||
}
|
||||
return 1 if deposit else 0
|
||||
|
||||
|
||||
def load_regions(config: dict[str, Any]) -> tuple[list[Region], Region | None]:
|
||||
regions: list[Region] = []
|
||||
for zone in config.get("zones", []):
|
||||
zone_id = str(zone.get("id", "")).strip()
|
||||
polygon = normalize_polygon(zone.get("polygon", []))
|
||||
if zone_id and len(polygon) >= 3:
|
||||
regions.append(Region(zone_id, polygon))
|
||||
|
||||
trash_region = None
|
||||
trash_polygon = normalize_polygon(config.get("trash", {}).get("roi", []))
|
||||
if len(trash_polygon) >= 3:
|
||||
trash_region = Region("trash", trash_polygon)
|
||||
return regions, trash_region
|
||||
|
||||
|
||||
def load_runtime_vision_settings(config: dict[str, Any]) -> RuntimeVisionSettings:
|
||||
runtime = config.get("runtime", {})
|
||||
return RuntimeVisionSettings(
|
||||
baseline_frames=max(1, int(runtime.get("baseline_frames", 3))),
|
||||
sample_stride_pixels=max(1, int(runtime.get("sample_stride_pixels", 8))),
|
||||
occupancy_mean_delta=float(runtime.get("occupancy_mean_delta", 24.0)),
|
||||
occupancy_texture_delta=float(runtime.get("occupancy_texture_delta", 18.0)),
|
||||
trash_motion_delta=float(runtime.get("trash_motion_delta", 18.0)),
|
||||
trash_motion_cooldown_seconds=max(0, int(runtime.get("trash_motion_cooldown_seconds", 8))),
|
||||
)
|
||||
|
||||
|
||||
def normalize_polygon(value: Any) -> tuple[tuple[float, float], ...]:
|
||||
points: list[tuple[float, float]] = []
|
||||
if not isinstance(value, list):
|
||||
return ()
|
||||
for item in value:
|
||||
if not isinstance(item, list | tuple) or len(item) != 2:
|
||||
continue
|
||||
points.append((min(1.0, max(0.0, float(item[0]))), min(1.0, max(0.0, float(item[1])))))
|
||||
return tuple(points)
|
||||
|
||||
|
||||
def region_metrics(frame: Frame, region: Region, stride: int) -> RegionMetrics:
|
||||
xs = [point[0] for point in region.polygon]
|
||||
ys = [point[1] for point in region.polygon]
|
||||
min_x = max(0, int(min(xs) * frame.width))
|
||||
max_x = min(frame.width - 1, int(max(xs) * frame.width))
|
||||
min_y = max(0, int(min(ys) * frame.height))
|
||||
max_y = min(frame.height - 1, int(max(ys) * frame.height))
|
||||
|
||||
values: list[float] = []
|
||||
for y in range(min_y, max_y + 1, stride):
|
||||
norm_y = (y + 0.5) / frame.height
|
||||
for x in range(min_x, max_x + 1, stride):
|
||||
norm_x = (x + 0.5) / frame.width
|
||||
if not point_in_polygon(norm_x, norm_y, region.polygon):
|
||||
continue
|
||||
r, g, b = frame.pixel(x, y)
|
||||
values.append(0.299 * r + 0.587 * g + 0.114 * b)
|
||||
|
||||
if not values:
|
||||
return RegionMetrics(mean_luma=0.0, texture=0.0, sample_count=0)
|
||||
mean = sum(values) / len(values)
|
||||
variance = sum((value - mean) ** 2 for value in values) / len(values)
|
||||
return RegionMetrics(mean_luma=mean, texture=variance ** 0.5, sample_count=len(values))
|
||||
|
||||
|
||||
def average_metrics(samples: list[RegionMetrics]) -> RegionMetrics:
|
||||
return RegionMetrics(
|
||||
mean_luma=sum(item.mean_luma for item in samples) / len(samples),
|
||||
texture=sum(item.texture for item in samples) / len(samples),
|
||||
sample_count=min(item.sample_count for item in samples),
|
||||
)
|
||||
|
||||
|
||||
def point_in_polygon(x: float, y: float, polygon: tuple[tuple[float, float], ...]) -> bool:
|
||||
inside = False
|
||||
j = len(polygon) - 1
|
||||
for i, point in enumerate(polygon):
|
||||
xi, yi = point
|
||||
xj, yj = polygon[j]
|
||||
intersects = (yi > y) != (yj > y) and x < (xj - xi) * (y - yi) / ((yj - yi) or 1e-12) + xi
|
||||
if intersects:
|
||||
inside = not inside
|
||||
j = i
|
||||
return inside
|
||||
@@ -77,6 +77,37 @@ class ManageApiTests(unittest.TestCase):
|
||||
self.assertEqual(summary["metrics"]["event_count"], 2)
|
||||
self.assertEqual(summary["metrics"]["violation_count"], 1)
|
||||
|
||||
def test_summary_reads_runtime_diagnostics(self) -> None:
|
||||
with tempfile.TemporaryDirectory() as tmpdir:
|
||||
root = Path(tmpdir)
|
||||
config_path = root / "config" / "local.toml"
|
||||
save_config_document(
|
||||
config_path,
|
||||
{
|
||||
"runtime": {"diagnostics_path": "logs/runtime_diagnostics.jsonl"},
|
||||
"event_sink": {"path": "logs/events.jsonl"},
|
||||
"layout": {"rows": 1, "cols": 1, "zone_ids": ["r1c1"]},
|
||||
},
|
||||
)
|
||||
diagnostics_path = root / "logs" / "runtime_diagnostics.jsonl"
|
||||
diagnostics_path.parent.mkdir()
|
||||
diagnostics_path.write_text(
|
||||
json.dumps(
|
||||
{
|
||||
"ts": "2026-04-28T10:00:00+08:00",
|
||||
"zone_counts": {"r1c1": 1},
|
||||
"diagnostics": {"baseline_ready": True},
|
||||
}
|
||||
),
|
||||
encoding="utf-8",
|
||||
)
|
||||
|
||||
summary = build_summary(ManageContext(config_path=config_path, project_root=root))
|
||||
|
||||
self.assertEqual(summary["metrics"]["diagnostics_count"], 1)
|
||||
self.assertEqual(summary["metrics"]["latest_zone_counts"], {"r1c1": 1})
|
||||
self.assertTrue(summary["metrics"]["baseline_ready"])
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
||||
72
tests/test_vision.py
Normal file
72
tests/test_vision.py
Normal file
@@ -0,0 +1,72 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import unittest
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from cold_display_guard.vision import (
|
||||
Frame,
|
||||
Region,
|
||||
RuntimeVisionSettings,
|
||||
ZoneOccupancyDetector,
|
||||
point_in_polygon,
|
||||
)
|
||||
|
||||
|
||||
def solid_frame(width: int, height: int, value: int) -> Frame:
|
||||
return Frame(width=width, height=height, rgb=bytes([value, value, value]) * width * height)
|
||||
|
||||
|
||||
def patched_frame(width: int, height: int, base: int, patch: tuple[int, int, int, int, int]) -> Frame:
|
||||
x1, y1, x2, y2, value = patch
|
||||
pixels = bytearray(bytes([base, base, base]) * width * height)
|
||||
for y in range(y1, y2):
|
||||
for x in range(x1, x2):
|
||||
offset = (y * width + x) * 3
|
||||
pixels[offset : offset + 3] = bytes([value, value, value])
|
||||
return Frame(width=width, height=height, rgb=bytes(pixels))
|
||||
|
||||
|
||||
class VisionTests(unittest.TestCase):
|
||||
def test_point_in_polygon(self) -> None:
|
||||
polygon = ((0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0))
|
||||
|
||||
self.assertTrue(point_in_polygon(0.5, 0.5, polygon))
|
||||
self.assertFalse(point_in_polygon(1.5, 0.5, polygon))
|
||||
|
||||
def test_detector_reports_occupied_after_baseline_changes(self) -> None:
|
||||
detector = ZoneOccupancyDetector(
|
||||
[Region("r1c1", ((0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)))],
|
||||
trash_region=None,
|
||||
settings=RuntimeVisionSettings(
|
||||
baseline_frames=1,
|
||||
sample_stride_pixels=4,
|
||||
occupancy_mean_delta=10,
|
||||
occupancy_texture_delta=10,
|
||||
),
|
||||
)
|
||||
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
|
||||
|
||||
baseline_counts, _, _ = detector.observe(solid_frame(32, 32, 30), now)
|
||||
changed_counts, _, _ = detector.observe(patched_frame(32, 32, 30, (0, 0, 32, 32, 90)), now)
|
||||
|
||||
self.assertEqual(baseline_counts, {"r1c1": 0})
|
||||
self.assertEqual(changed_counts, {"r1c1": 1})
|
||||
|
||||
def test_detector_reports_trash_motion(self) -> None:
|
||||
trash = Region("trash", ((0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)))
|
||||
detector = ZoneOccupancyDetector(
|
||||
[],
|
||||
trash_region=trash,
|
||||
settings=RuntimeVisionSettings(sample_stride_pixels=4, trash_motion_delta=10),
|
||||
)
|
||||
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
|
||||
|
||||
_, first_deposit, _ = detector.observe(solid_frame(32, 32, 30), now)
|
||||
_, second_deposit, _ = detector.observe(solid_frame(32, 32, 90), now)
|
||||
|
||||
self.assertEqual(first_deposit, 0)
|
||||
self.assertEqual(second_deposit, 1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
@@ -484,10 +484,18 @@ function renderMetrics() {
|
||||
const cards = [
|
||||
["事件总数", metrics.event_count ?? 0],
|
||||
["违规事件", metrics.violation_count ?? 0],
|
||||
["诊断帧数", metrics.diagnostics_count ?? 0],
|
||||
["基线状态", metrics.baseline_ready ? "ready" : "learning"],
|
||||
["最新报警", metrics.latest_alert_time || "-"],
|
||||
["事件文件", metrics.events_path || "-"],
|
||||
];
|
||||
els.metrics.innerHTML = cards.map(([label, value]) => `<div class="metric"><span>${label}</span><strong>${value}</strong></div>`).join("");
|
||||
const zoneCounts = metrics.latest_zone_counts || {};
|
||||
const zoneSummary = Object.keys(zoneCounts).length
|
||||
? `<div class="metric wide"><span>最新区域状态</span><strong>${Object.entries(zoneCounts)
|
||||
.map(([zoneId, count]) => `${zoneId}:${count}`)
|
||||
.join(" ")}</strong></div>`
|
||||
: "";
|
||||
els.metrics.innerHTML = cards.map(([label, value]) => `<div class="metric"><span>${label}</span><strong>${value}</strong></div>`).join("") + zoneSummary;
|
||||
}
|
||||
|
||||
function renderEvents() {
|
||||
|
||||
@@ -226,6 +226,10 @@ canvas {
|
||||
font-size: 18px;
|
||||
}
|
||||
|
||||
.metric.wide {
|
||||
grid-column: 1 / -1;
|
||||
}
|
||||
|
||||
.events-table {
|
||||
overflow: auto;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user