diff --git a/progress.md b/progress.md index 9608c23..731fd89 100644 --- a/progress.md +++ b/progress.md @@ -199,6 +199,12 @@ | 2026-05-29 | Phase 1 | Coding Agent | Fixed testing-agent findings | Added target validation, nullable optional fields, and evidence/count double-consume guard | | 2026-05-29 | Phase 1 | Testing Agent | Re-tested phase 1 fixes | Verdict pass; no bugs found | | 2026-05-29 | Phase 1 | Main Agent | Ran local verification | `tests.test_engine` passed with 24 tests; full Python suite passed with 55 tests | +| 2026-05-29 | Phase 2 | Main Agent | Marked Phase 2 as `in_progress` | Preparing fresh coding/testing agents for lightweight motion trajectory detection | +| 2026-05-29 | Phase 2 | Coding Agent | Implemented initial lightweight `TrajectoryTracker` | Target vision tests passed locally, but testing agent found multi-candidate and source-margin risks | +| 2026-05-29 | Phase 2 | Testing Agent | Reviewed initial trajectory tracker | Verdict fail: single blob can confirm multiple candidates, source margin false positive, diagnostics lack per-candidate reasons | +| 2026-05-29 | Phase 2 | Coding Agent | Fixed trajectory tracker findings | Added blob consumption, strict source polygon origin, and per-candidate diagnostics | +| 2026-05-29 | Phase 2 | Testing Agent | Re-tested phase 2 fixes | Verdict pass; no bugs found | +| 2026-05-29 | Phase 2 | Main Agent | Ran local verification | `tests.test_vision` passed with 20 tests; full Python suite passed with 64 tests; dependency scan had no model/heavy vision matches | ### Test Results @@ -206,6 +212,9 @@ | --- | --- | --- | --- | | 2026-05-29 | `PYTHONPATH=src python3 -m unittest tests.test_engine -v` | pass | 24 engine tests passed after phase 1 evidence fixes | | 2026-05-29 | `PYTHONPATH=src python3 -m unittest discover -s tests -v` | pass | 55 full Python tests passed after phase 1 | +| 2026-05-29 | `PYTHONPATH=src python3 -m unittest tests.test_vision -v` | pass | 20 vision tests passed after phase 2 trajectory tracker | +| 2026-05-29 | `PYTHONPATH=src python3 -m unittest discover -s tests -v` | pass | 64 full Python tests passed after phase 2 | +| 2026-05-29 | `rg -n "ultralytics|torch|onnxruntime|openvino|opencv|cv2|numpy" src tests pyproject.toml` | pass | No matches; command exited 1 because no heavy vision/model dependency was found | ### Bug Loop @@ -214,6 +223,32 @@ | Phase 1 | `disposal_evidence` and `trash_deposit_count` can double-consume the same disposal signal | Added regression test and suppress generic trash fallback when confirming source-specific evidence exists in the observation | Resolved; testing agent and local full Python suite passed | | Phase 1 | High-confidence evidence with non-trash target can close pending disposal | Added target whitelist for `trash` / `trash_bin` plus regression test | Resolved; testing agent and local full Python suite passed | | Phase 1 | `item_class: null` and `detector_score: null` lose null semantics | Changed optional evidence fields to preserve `None` plus regression test | Resolved; testing agent and local full Python suite passed | +| Phase 2 | Single motion blob can confirm multiple active candidates | Added frame-local blob IDs and consume each sampled blob once per frame | Resolved; testing agent and local full Python suite passed | +| Phase 2 | Source-zone margin can treat near-outside movement as source-origin movement | Source-origin check now requires strict source polygon containment | Resolved; testing agent and local full Python suite passed | +| Phase 2 | Trajectory diagnostics only expose aggregate counts | Added `emitted`, `rejected`, and `expired` diagnostic lists with source, reason, point count, confidence, and direction score | Resolved; testing agent and local full Python suite passed | + +## 2026-05-29 Phase Completed: Phase 2 - Lightweight Motion Trajectory Backend + +Status: complete + +Files Changed: +- `src/cold_display_guard/vision.py` +- `tests/test_vision.py` +- `task_plan.md` +- `progress.md` + +Tests: +- `PYTHONPATH=src python3 -m unittest tests.test_vision -v`: pass +- `PYTHONPATH=src python3 -m unittest discover -s tests -v`: pass +- `rg -n "ultralytics|torch|onnxruntime|openvino|opencv|cv2|numpy" src tests pyproject.toml`: pass, no matches + +Notes: +- `TrajectoryTracker` now emits `DisposalEvidence` with `target=trash` and `method=motion`. +- Tracker uses frame-delta motion blobs, strict source-origin validation, target ROI validation, direction scoring, and per-candidate diagnostics. +- Multiple candidates cannot reuse the same frame-local blob for confirmation. + +Risks: +- Tracker is implemented but not yet wired into `main.py`; phase 3 will integrate runtime observation, diagnostics, and faster active-candidate sampling. ## 2026-05-29 Phase Completed: Phase 1 - Data Contract And Engine Evidence Handling diff --git a/src/cold_display_guard/vision.py b/src/cold_display_guard/vision.py index 72b6d0d..f566462 100644 --- a/src/cold_display_guard/vision.py +++ b/src/cold_display_guard/vision.py @@ -4,6 +4,8 @@ from dataclasses import dataclass from datetime import datetime, timedelta from typing import Any +from cold_display_guard.models import DisposalEvidence + @dataclass(frozen=True, slots=True) class Frame: @@ -41,6 +43,19 @@ class RuntimeVisionSettings: trash_sustained_motion_delta: float = 8.0 trash_sustained_motion_frames: int = 2 trash_motion_cooldown_seconds: int = 3 + trajectory_enabled: bool = True + trajectory_window_seconds: int = 8 + trajectory_sample_interval_seconds: float = 1.0 + trajectory_min_points: int = 3 + trajectory_min_confidence: float = 0.72 + trajectory_motion_delta: float = 20.0 + trajectory_min_blob_area: int = 12 + trajectory_max_blob_area_fraction: float = 0.35 + trajectory_trash_entry_margin: float = 0.04 + trajectory_backend: str = "motion" + yolo_enabled: bool = False + yolo_model_path: str = "" + yolo_min_confidence: float = 0.65 @dataclass(frozen=True, slots=True) @@ -52,6 +67,28 @@ class RegionMetrics: bright_fraction: float = 0.0 +@dataclass(slots=True) +class _MotionPoint: + blob_id: int + x: float + y: float + area: int + when: datetime + + +@dataclass(slots=True) +class _TrajectoryCandidate: + source_region: Region + opened_at: datetime + last_sample_at: datetime | None = None + points: list[_MotionPoint] | None = None + source_motion_seen: bool = False + + def __post_init__(self) -> None: + if self.points is None: + self.points = [] + + class ZoneOccupancyDetector: def __init__( self, @@ -212,6 +249,351 @@ class ZoneOccupancyDetector: return 1 if deposit else 0 +class TrajectoryTracker: + def __init__( + self, + regions: list[Region], + trash_region: Region | None, + settings: RuntimeVisionSettings | None = None, + ) -> None: + self.regions = regions + self.trash_region = trash_region + self.settings = settings or RuntimeVisionSettings() + self._previous_frame: Frame | None = None + self._previous_zone_counts: dict[str, int] = {} + self._candidates: list[_TrajectoryCandidate] = [] + + @property + def has_active_candidates(self) -> bool: + return bool(self._candidates) + + def observe( + self, + frame: Frame, + when: datetime, + zone_counts: dict[str, int], + ) -> tuple[list[DisposalEvidence], dict[str, Any]]: + diagnostics: dict[str, Any] = { + "active_candidates": len(self._candidates), + "emitted_evidence": 0, + "expired_candidates": 0, + "rejected_candidates": 0, + "emitted": [], + "rejected": [], + "expired": [], + "disabled": False, + "reason": None, + } + if not self.settings.trajectory_enabled: + diagnostics["disabled"] = True + diagnostics["reason"] = "trajectory_disabled" + self._remember(frame, zone_counts) + return [], diagnostics + if self.trash_region is None: + diagnostics["disabled"] = True + diagnostics["reason"] = "missing_trash_region" + self._remember(frame, zone_counts) + return [], diagnostics + if self.settings.trajectory_backend != "motion": + diagnostics["disabled"] = True + diagnostics["reason"] = "unsupported_trajectory_backend" + self._remember(frame, zone_counts) + return [], diagnostics + + blobs = self._motion_points(frame, when) if self._previous_frame is not None else [] + self._open_candidates(when, zone_counts) + + emitted: list[DisposalEvidence] = [] + remaining: list[_TrajectoryCandidate] = [] + consumed_blob_ids: set[int] = set() + emitted_track_signatures: set[tuple[tuple[float, float], ...]] = set() + for candidate in self._candidates: + rejected_reason = self._sample_candidate(candidate, blobs, when, consumed_blob_ids) + if rejected_reason is not None: + diagnostics["rejected_candidates"] += 1 + diagnostics["rejected"].append(self._candidate_event(candidate, rejected_reason)) + continue + if when - candidate.opened_at > timedelta(seconds=self.settings.trajectory_window_seconds): + diagnostics["expired_candidates"] += 1 + diagnostics["expired"].append(self._candidate_event(candidate, "expired")) + if not self._candidate_ready(candidate): + diagnostics["rejected_candidates"] += 1 + diagnostics["rejected"].append(self._candidate_event(candidate, self._rejection_reason(candidate))) + else: + signature = self._track_signature(candidate) + if signature in emitted_track_signatures: + diagnostics["rejected_candidates"] += 1 + diagnostics["rejected"].append(self._candidate_event(candidate, "ambiguous_motion_track")) + else: + emitted.append(self._evidence(candidate, when)) + emitted_track_signatures.add(signature) + diagnostics["emitted"].append(self._candidate_event(candidate, "emitted")) + continue + if self._candidate_reached_trash(candidate): + if self._candidate_ready(candidate): + signature = self._track_signature(candidate) + if signature in emitted_track_signatures: + diagnostics["rejected_candidates"] += 1 + diagnostics["rejected"].append(self._candidate_event(candidate, "ambiguous_motion_track")) + else: + emitted.append(self._evidence(candidate, when)) + emitted_track_signatures.add(signature) + diagnostics["emitted"].append(self._candidate_event(candidate, "emitted")) + else: + diagnostics["rejected_candidates"] += 1 + diagnostics["rejected"].append(self._candidate_event(candidate, self._rejection_reason(candidate))) + continue + remaining.append(candidate) + + self._candidates = remaining + diagnostics["emitted_evidence"] = len(emitted) + diagnostics["active_candidates"] = len(self._candidates) + diagnostics["motion_points"] = len(blobs) + self._remember(frame, zone_counts) + return emitted, diagnostics + + def _remember(self, frame: Frame, zone_counts: dict[str, int]) -> None: + self._previous_frame = frame + self._previous_zone_counts = {region_id: max(0, int(count)) for region_id, count in zone_counts.items()} + + def _open_candidates(self, when: datetime, zone_counts: dict[str, int]) -> None: + active_region_ids = {candidate.source_region.region_id for candidate in self._candidates} + for region in self.regions: + previous = self._previous_zone_counts.get(region.region_id, 0) + current = max(0, int(zone_counts.get(region.region_id, 0))) + if previous > 0 and current == 0 and region.region_id not in active_region_ids: + self._candidates.append(_TrajectoryCandidate(source_region=region, opened_at=when)) + + def _motion_points(self, frame: Frame, when: datetime) -> list[_MotionPoint]: + previous = self._previous_frame + if previous is None or previous.width != frame.width or previous.height != frame.height: + return [] + + width = frame.width + height = frame.height + changed = bytearray(width * height) + threshold = self.settings.trajectory_motion_delta + for y in range(height): + for x in range(width): + offset = (y * width + x) * 3 + current_luma = _luma(frame.rgb[offset], frame.rgb[offset + 1], frame.rgb[offset + 2]) + previous_luma = _luma(previous.rgb[offset], previous.rgb[offset + 1], previous.rgb[offset + 2]) + if abs(current_luma - previous_luma) >= threshold: + changed[y * width + x] = 1 + + min_area = max(1, int(self.settings.trajectory_min_blob_area)) + max_area = max(min_area, int(width * height * self.settings.trajectory_max_blob_area_fraction)) + points: list[_MotionPoint] = [] + next_blob_id = 0 + for start in range(width * height): + if not changed[start]: + continue + stack = [start] + changed[start] = 0 + area = 0 + sum_x = 0 + sum_y = 0 + while stack: + index = stack.pop() + x = index % width + y = index // width + area += 1 + sum_x += x + sum_y += y + if x > 0: + neighbor = index - 1 + if changed[neighbor]: + changed[neighbor] = 0 + stack.append(neighbor) + if x + 1 < width: + neighbor = index + 1 + if changed[neighbor]: + changed[neighbor] = 0 + stack.append(neighbor) + if y > 0: + neighbor = index - width + if changed[neighbor]: + changed[neighbor] = 0 + stack.append(neighbor) + if y + 1 < height: + neighbor = index + width + if changed[neighbor]: + changed[neighbor] = 0 + stack.append(neighbor) + if min_area <= area <= max_area: + points.append( + _MotionPoint( + blob_id=next_blob_id, + x=(sum_x / area + 0.5) / width, + y=(sum_y / area + 0.5) / height, + area=area, + when=when, + ) + ) + next_blob_id += 1 + return points + + def _sample_candidate( + self, + candidate: _TrajectoryCandidate, + blobs: list[_MotionPoint], + when: datetime, + consumed_blob_ids: set[int], + ) -> str | None: + if not blobs: + return None + if ( + candidate.last_sample_at is not None + and (when - candidate.last_sample_at).total_seconds() < self.settings.trajectory_sample_interval_seconds + ): + return None + + if not candidate.source_motion_seen: + source_blobs = [ + blob for blob in blobs if region_contains(candidate.source_region, blob.x, blob.y) + ] + if source_blobs: + available_source_blobs = [blob for blob in source_blobs if blob.blob_id not in consumed_blob_ids] + if not available_source_blobs: + return "ambiguous_motion_track" + candidate.source_motion_seen = True + point = _nearest_point(region_center(candidate.source_region), available_source_blobs) + else: + available_blobs = [blob for blob in blobs if blob.blob_id not in consumed_blob_ids] + if not available_blobs: + return None + point = _nearest_point(region_center(candidate.source_region), available_blobs) + else: + previous = candidate.points[-1] if candidate.points else None + available_blobs = [blob for blob in blobs if blob.blob_id not in consumed_blob_ids] + if not available_blobs: + return None + point = self._next_progress_point(candidate, available_blobs, previous) + + if candidate.points and _distance((candidate.points[-1].x, candidate.points[-1].y), (point.x, point.y)) < 0.015: + return None + candidate.points.append(point) + consumed_blob_ids.add(point.blob_id) + candidate.last_sample_at = when + return None + + def _next_progress_point( + self, + candidate: _TrajectoryCandidate, + blobs: list[_MotionPoint], + previous: _MotionPoint | None, + ) -> _MotionPoint: + if self.trash_region is None: + return blobs[0] + source = region_center(candidate.source_region) + target = region_center(self.trash_region) + expected = (target[0] - source[0], target[1] - source[1]) + expected_length = (expected[0] ** 2 + expected[1] ** 2) ** 0.5 + if expected_length <= 1e-9: + origin = (previous.x, previous.y) if previous is not None else source + return _nearest_point(origin, blobs) + unit = (expected[0] / expected_length, expected[1] / expected_length) + origin = (previous.x, previous.y) if previous is not None else source + + def score(point: _MotionPoint) -> float: + dx = point.x - origin[0] + dy = point.y - origin[1] + projection = dx * unit[0] + dy * unit[1] + perpendicular = abs(dx * unit[1] - dy * unit[0]) + return projection - 0.25 * perpendicular + + return max(blobs, key=score) + + def _candidate_reached_trash(self, candidate: _TrajectoryCandidate) -> bool: + points = candidate.points or [] + return any( + region_contains(self.trash_region, point.x, point.y, margin=self.settings.trajectory_trash_entry_margin) + for point in points + if self.trash_region is not None + ) + + def _candidate_ready(self, candidate: _TrajectoryCandidate) -> bool: + confidence = self._confidence(candidate) + return ( + candidate.source_motion_seen + and self._candidate_reached_trash(candidate) + and len(candidate.points or []) >= self.settings.trajectory_min_points + and self._direction_score(candidate) >= 0.35 + and confidence >= self.settings.trajectory_min_confidence + ) + + def _rejection_reason(self, candidate: _TrajectoryCandidate) -> str: + if not candidate.source_motion_seen: + return "missing_source_motion" + if not self._candidate_reached_trash(candidate): + return "did_not_reach_trash" + if len(candidate.points or []) < self.settings.trajectory_min_points: + return "insufficient_points" + if self._direction_score(candidate) < 0.35: + return "bad_direction" + if self._confidence(candidate) < self.settings.trajectory_min_confidence: + return "low_confidence" + return "rejected" + + def _candidate_event(self, candidate: _TrajectoryCandidate, reason: str) -> dict[str, Any]: + return { + "source_zone_id": candidate.source_region.region_id, + "reason": reason, + "point_count": len(candidate.points or []), + "confidence": round(self._confidence(candidate), 3), + "direction_score": round(self._direction_score(candidate), 3), + } + + def _track_signature(self, candidate: _TrajectoryCandidate) -> tuple[tuple[float, float], ...]: + return tuple((round(point.x, 4), round(point.y, 4)) for point in candidate.points or []) + + def _confidence(self, candidate: _TrajectoryCandidate) -> float: + point_count = len(candidate.points or []) + point_score = min(1.0, point_count / max(1, self.settings.trajectory_min_points)) + source_score = 1.0 if candidate.source_motion_seen else 0.0 + trash_score = 1.0 if self._candidate_reached_trash(candidate) else 0.0 + direction_score = max(0.0, self._direction_score(candidate)) + return min(1.0, 0.20 * source_score + 0.35 * trash_score + 0.25 * direction_score + 0.20 * point_score) + + def _direction_score(self, candidate: _TrajectoryCandidate) -> float: + points = candidate.points or [] + if len(points) < 2 or self.trash_region is None: + return 0.0 + start = points[0] + end = points[-1] + motion = (end.x - start.x, end.y - start.y) + motion_length = (motion[0] ** 2 + motion[1] ** 2) ** 0.5 + if motion_length <= 1e-9: + return 0.0 + source = region_center(candidate.source_region) + target = region_center(self.trash_region) + expected = (target[0] - source[0], target[1] - source[1]) + expected_length = (expected[0] ** 2 + expected[1] ** 2) ** 0.5 + if expected_length <= 1e-9: + return 0.0 + return (motion[0] * expected[0] + motion[1] * expected[1]) / (motion_length * expected_length) + + def _evidence(self, candidate: _TrajectoryCandidate, when: datetime) -> DisposalEvidence: + return DisposalEvidence( + source_zone_id=candidate.source_region.region_id, + target="trash", + confidence=round(self._confidence(candidate), 3), + method="motion", + track_points=[ + { + "x": round(point.x, 4), + "y": round(point.y, 4), + "area": point.area, + "observed_at": point.when.isoformat(), + } + for point in candidate.points or [] + ], + item_class=None, + detector_score=None, + observed_at=when.isoformat(), + ) + + def load_regions(config: dict[str, Any]) -> tuple[list[Region], Region | None]: regions: list[Region] = [] for zone in config.get("zones", []): @@ -247,6 +629,22 @@ def load_runtime_vision_settings(config: dict[str, Any]) -> RuntimeVisionSetting trash_sustained_motion_delta=float(runtime.get("trash_sustained_motion_delta", 8.0)), trash_sustained_motion_frames=max(1, int(runtime.get("trash_sustained_motion_frames", 2))), trash_motion_cooldown_seconds=max(0, int(runtime.get("trash_motion_cooldown_seconds", 3))), + trajectory_enabled=bool(runtime.get("trajectory_enabled", True)), + trajectory_window_seconds=max(1, int(runtime.get("trajectory_window_seconds", 8))), + trajectory_sample_interval_seconds=max(0.0, float(runtime.get("trajectory_sample_interval_seconds", 1.0))), + trajectory_min_points=max(1, int(runtime.get("trajectory_min_points", 3))), + trajectory_min_confidence=float(runtime.get("trajectory_min_confidence", 0.72)), + trajectory_motion_delta=float(runtime.get("trajectory_motion_delta", 20.0)), + trajectory_min_blob_area=max(1, int(runtime.get("trajectory_min_blob_area", 12))), + trajectory_max_blob_area_fraction=max( + 0.0, + min(1.0, float(runtime.get("trajectory_max_blob_area_fraction", 0.35))), + ), + trajectory_trash_entry_margin=max(0.0, float(runtime.get("trajectory_trash_entry_margin", 0.04))), + trajectory_backend=str(runtime.get("trajectory_backend", "motion")), + yolo_enabled=bool(runtime.get("yolo_enabled", False)), + yolo_model_path=str(runtime.get("yolo_model_path", "")), + yolo_min_confidence=float(runtime.get("yolo_min_confidence", 0.65)), ) @@ -310,6 +708,37 @@ def average_metrics(samples: list[RegionMetrics]) -> RegionMetrics: ) +def region_center(region: Region) -> tuple[float, float]: + return ( + sum(point[0] for point in region.polygon) / len(region.polygon), + sum(point[1] for point in region.polygon) / len(region.polygon), + ) + + +def region_contains(region: Region, x: float, y: float, margin: float = 0.0) -> bool: + if margin <= 0: + return point_in_polygon(x, y, region.polygon) + xs = [point[0] for point in region.polygon] + ys = [point[1] for point in region.polygon] + if x < min(xs) - margin or x > max(xs) + margin or y < min(ys) - margin or y > max(ys) + margin: + return False + return point_in_polygon(x, y, region.polygon) or ( + min(xs) - margin <= x <= max(xs) + margin and min(ys) - margin <= y <= max(ys) + margin + ) + + +def _nearest_point(origin: tuple[float, float], points: list[_MotionPoint]) -> _MotionPoint: + return min(points, key=lambda point: _distance(origin, (point.x, point.y))) + + +def _distance(first: tuple[float, float], second: tuple[float, float]) -> float: + return ((first[0] - second[0]) ** 2 + (first[1] - second[1]) ** 2) ** 0.5 + + +def _luma(r: int, g: int, b: int) -> float: + return 0.299 * r + 0.587 * g + 0.114 * b + + def metrics_indicate_occupied( settings: RuntimeVisionSettings, mean_delta: float, diff --git a/task_plan.md b/task_plan.md index a52861c..2df7eeb 100644 --- a/task_plan.md +++ b/task_plan.md @@ -97,7 +97,7 @@ Create and evolve an independent git project under `~/Code` for monitoring food | Phase | Status | Goal | Acceptance Criteria | | --- | --- | --- | --- | | 1 | complete | 建立 `disposal_evidence` 数据契约并让状态机优先按来源区域丢弃 | `Observation` 支持 evidence;engine 能按 `source_zone_id` 精确关闭 pending batch;同帧移除+evidence 有回归测试;旧 `trash_deposit_count` 仍可兜底 | -| 2 | pending | 实现无 YOLO 依赖的轻量轨迹检测 | synthetic frame 测试覆盖源区域到垃圾桶、非源区域运动、未到垃圾桶、单帧反光、多候选互不串扰;不引入模型依赖 | +| 2 | complete | 实现无 YOLO 依赖的轻量轨迹检测 | synthetic frame 测试覆盖源区域到垃圾桶、非源区域运动、未到垃圾桶、单帧反光、多候选互不串扰;不引入模型依赖 | | 3 | pending | 集成 runtime 配置、诊断和候选窗口加速采样 | `main.py` 写入 `disposal_evidence` 与 trajectory diagnostics;配置默认 `trajectory_enabled=true`、`yolo_enabled=false`;候选活跃时使用更短采样间隔 | | 4 | pending | 文档、全量验证和部署准备 | README/project/progress 更新;Python 全量测试通过;前端测试/构建按影响范围验证;远端部署命令和风险记录清楚 | diff --git a/tests/test_vision.py b/tests/test_vision.py index 7144360..ba30b69 100644 --- a/tests/test_vision.py +++ b/tests/test_vision.py @@ -8,6 +8,7 @@ from cold_display_guard.vision import ( Region, RegionMetrics, RuntimeVisionSettings, + TrajectoryTracker, ZoneOccupancyDetector, load_runtime_vision_settings, point_in_polygon, @@ -38,6 +39,11 @@ def multi_patched_frame(width: int, height: int, base: int, patches: list[tuple[ return Frame(width=width, height=height, rgb=bytes(pixels)) +def frame_with_motion_patch(width: int, height: int, top_left: tuple[int, int]) -> Frame: + x, y = top_left + return patched_frame(width, height, 40, (x, y, x + 8, y + 8, 180)) + + class VisionTests(unittest.TestCase): def test_point_in_polygon(self) -> None: polygon = ((0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)) @@ -271,6 +277,276 @@ class VisionTests(unittest.TestCase): self.assertGreaterEqual(zone["dark_fraction"], 0.06) self.assertGreaterEqual(zone["bright_fraction"], 0.18) + def test_motion_track_from_source_zone_to_trash_roi_emits_evidence(self) -> None: + source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65))) + trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65))) + tracker = TrajectoryTracker( + [source], + trash, + RuntimeVisionSettings( + trajectory_sample_interval_seconds=0.0, + trajectory_min_points=3, + trajectory_min_confidence=0.72, + trajectory_motion_delta=20.0, + trajectory_min_blob_area=12, + ), + ) + now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc) + + tracker.observe(frame_with_motion_patch(80, 80, (8, 36)), now, {"source": 1}) + all_evidence: list[object] = [] + emitted_evidence_count = 0 + for index, point in enumerate([(16, 36), (28, 36), (42, 36), (58, 36), (66, 36)]): + evidence, diagnostics = tracker.observe( + frame_with_motion_patch(80, 80, point), + now + timedelta(seconds=index + 1), + {"source": 0}, + ) + all_evidence.extend(evidence) + emitted_evidence_count += diagnostics["emitted_evidence"] + + self.assertTrue(all_evidence) + emitted = all_evidence[0] + self.assertEqual(emitted.source_zone_id, "source") + self.assertEqual(emitted.target, "trash") + self.assertEqual(emitted.method, "motion") + self.assertGreaterEqual(emitted.confidence, 0.72) + self.assertGreaterEqual(len(emitted.track_points), 3) + self.assertGreaterEqual(emitted_evidence_count, 1) + + def test_motion_that_starts_away_from_source_zone_is_rejected(self) -> None: + source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65))) + trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65))) + tracker = TrajectoryTracker( + [source], + trash, + RuntimeVisionSettings(trajectory_sample_interval_seconds=0.0, trajectory_min_points=3), + ) + now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc) + + tracker.observe(frame_with_motion_patch(80, 80, (50, 10)), now, {"source": 1}) + all_evidence: list[object] = [] + rejected_candidates = 0 + for index, point in enumerate([(52, 14), (56, 20), (60, 28), (66, 36)]): + evidence, diagnostics = tracker.observe( + frame_with_motion_patch(80, 80, point), + now + timedelta(seconds=index + 1), + {"source": 0}, + ) + all_evidence.extend(evidence) + rejected_candidates += diagnostics["rejected_candidates"] + + self.assertEqual(all_evidence, []) + self.assertGreaterEqual(rejected_candidates, 1) + + def test_motion_that_never_reaches_trash_roi_is_rejected(self) -> None: + source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65))) + trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65))) + tracker = TrajectoryTracker( + [source], + trash, + RuntimeVisionSettings( + trajectory_window_seconds=3, + trajectory_sample_interval_seconds=0.0, + trajectory_min_points=3, + ), + ) + now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc) + + tracker.observe(frame_with_motion_patch(80, 80, (8, 36)), now, {"source": 1}) + all_evidence: list[object] = [] + diagnostics = {} + for index, point in enumerate([(16, 36), (26, 36), (36, 36), (42, 36), (46, 36)]): + evidence, diagnostics = tracker.observe( + frame_with_motion_patch(80, 80, point), + now + timedelta(seconds=index + 1), + {"source": 0}, + ) + all_evidence.extend(evidence) + + self.assertEqual(all_evidence, []) + self.assertGreaterEqual(diagnostics["expired_candidates"], 1) + self.assertGreaterEqual(diagnostics["rejected_candidates"], 1) + + def test_one_frame_reflection_flash_is_rejected(self) -> None: + source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65))) + trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65))) + tracker = TrajectoryTracker( + [source], + trash, + RuntimeVisionSettings(trajectory_sample_interval_seconds=0.0, trajectory_min_points=3), + ) + now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc) + + tracker.observe(solid_frame(80, 80, 40), now, {"source": 1}) + flash_frame = patched_frame(80, 80, 40, (56, 28, 72, 52, 255)) + evidence, diagnostics = tracker.observe(flash_frame, now + timedelta(seconds=1), {"source": 0}) + later_evidence, later_diagnostics = tracker.observe(solid_frame(80, 80, 40), now + timedelta(seconds=2), {"source": 0}) + + self.assertEqual(evidence, []) + self.assertEqual(later_evidence, []) + self.assertEqual(diagnostics["emitted_evidence"], 0) + self.assertEqual(later_diagnostics["emitted_evidence"], 0) + self.assertEqual(later_diagnostics["rejected_candidates"], 0) + + def test_multiple_active_candidates_do_not_cross_close_each_other(self) -> None: + left = Region("left", ((0.05, 0.15), (0.25, 0.15), (0.25, 0.35), (0.05, 0.35))) + right = Region("right", ((0.05, 0.65), (0.25, 0.65), (0.25, 0.85), (0.05, 0.85))) + trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65))) + tracker = TrajectoryTracker( + [left, right], + trash, + RuntimeVisionSettings(trajectory_sample_interval_seconds=0.0, trajectory_min_points=3), + ) + now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc) + + tracker.observe( + multi_patched_frame(80, 80, 40, [(8, 18, 16, 26, 180), (8, 54, 16, 62, 180)]), + now, + {"left": 1, "right": 1}, + ) + all_evidence = [] + frames = [ + [(16, 20, 24, 28, 180), (18, 54, 26, 62, 180)], + [(28, 24, 36, 32, 180), (30, 52, 38, 60, 180)], + [(44, 30, 52, 38, 180), (44, 50, 52, 58, 180)], + [(60, 36, 68, 44, 180), (60, 50, 68, 58, 180)], + ] + for index, patches in enumerate(frames): + evidence, _ = tracker.observe( + multi_patched_frame(80, 80, 40, patches), + now + timedelta(seconds=index + 1), + {"left": 0, "right": 0}, + ) + all_evidence.extend(evidence) + + self.assertEqual({item.source_zone_id for item in all_evidence}, {"left", "right"}) + self.assertEqual(len(all_evidence), 2) + + def test_multiple_active_candidates_do_not_emit_same_motion_track(self) -> None: + first = Region("first", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65))) + second = Region("second", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65))) + trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65))) + tracker = TrajectoryTracker( + [first, second], + trash, + RuntimeVisionSettings(trajectory_sample_interval_seconds=0.0, trajectory_min_points=3), + ) + now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc) + + tracker.observe(frame_with_motion_patch(80, 80, (8, 36)), now, {"first": 1, "second": 1}) + all_evidence = [] + rejected = [] + for index, point in enumerate([(16, 36), (28, 36), (42, 36), (58, 36), (66, 36)]): + evidence, diagnostics = tracker.observe( + frame_with_motion_patch(80, 80, point), + now + timedelta(seconds=index + 1), + {"first": 0, "second": 0}, + ) + all_evidence.extend(evidence) + rejected.extend(diagnostics["rejected"]) + + tracks = [tuple((point["x"], point["y"]) for point in item.track_points) for item in all_evidence] + self.assertLessEqual(len(all_evidence), 1) + self.assertEqual(len(tracks), len(set(tracks))) + self.assertTrue(any(item["reason"] == "ambiguous_motion_track" for item in rejected)) + + def test_motion_inside_source_margin_but_outside_source_polygon_is_rejected(self) -> None: + source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65))) + trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65))) + tracker = TrajectoryTracker( + [source], + trash, + RuntimeVisionSettings(trajectory_sample_interval_seconds=0.0, trajectory_min_points=3), + ) + now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc) + + tracker.observe(frame_with_motion_patch(80, 80, (20, 36)), now, {"source": 1}) + all_evidence = [] + rejected = [] + for index, point in enumerate([(24, 36), (36, 36), (50, 36), (66, 36), (70, 36)]): + evidence, diagnostics = tracker.observe( + frame_with_motion_patch(80, 80, point), + now + timedelta(seconds=index + 1), + {"source": 0}, + ) + all_evidence.extend(evidence) + rejected.extend(diagnostics["rejected"]) + + self.assertEqual(all_evidence, []) + self.assertTrue(any(item["reason"] == "missing_source_motion" for item in rejected)) + + def test_trajectory_diagnostics_include_per_candidate_events(self) -> None: + source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65))) + trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65))) + tracker = TrajectoryTracker( + [source], + trash, + RuntimeVisionSettings( + trajectory_window_seconds=3, + trajectory_sample_interval_seconds=0.0, + trajectory_min_points=3, + ), + ) + now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc) + + tracker.observe(frame_with_motion_patch(80, 80, (8, 36)), now, {"source": 1}) + emitted_diagnostics = {} + for index, point in enumerate([(16, 36), (28, 36), (42, 36), (58, 36)]): + _, emitted_diagnostics = tracker.observe( + frame_with_motion_patch(80, 80, point), + now + timedelta(seconds=index + 1), + {"source": 0}, + ) + emitted_event = emitted_diagnostics["emitted"][0] + self.assert_candidate_event(emitted_event, "source", "emitted") + + tracker = TrajectoryTracker( + [source], + trash, + RuntimeVisionSettings( + trajectory_window_seconds=2, + trajectory_sample_interval_seconds=0.0, + trajectory_min_points=3, + ), + ) + tracker.observe(frame_with_motion_patch(80, 80, (8, 36)), now, {"source": 1}) + rejected_diagnostics = {} + for index, point in enumerate([(16, 36), (26, 36), (36, 36), (42, 36)]): + _, rejected_diagnostics = tracker.observe( + frame_with_motion_patch(80, 80, point), + now + timedelta(seconds=index + 1), + {"source": 0}, + ) + expired_event = rejected_diagnostics["expired"][0] + rejected_event = rejected_diagnostics["rejected"][0] + self.assert_candidate_event(expired_event, "source", "expired") + self.assert_candidate_event(rejected_event, "source", "did_not_reach_trash") + + def assert_candidate_event(self, event: dict[str, object], source_zone_id: str, reason: str) -> None: + self.assertEqual(event["source_zone_id"], source_zone_id) + self.assertEqual(event["reason"], reason) + self.assertIn("point_count", event) + self.assertIn("confidence", event) + self.assertIn("direction_score", event) + + def test_runtime_vision_defaults_include_trajectory_and_yolo_fields(self) -> None: + settings = load_runtime_vision_settings({}) + + self.assertTrue(settings.trajectory_enabled) + self.assertEqual(settings.trajectory_window_seconds, 8) + self.assertEqual(settings.trajectory_sample_interval_seconds, 1.0) + self.assertEqual(settings.trajectory_min_points, 3) + self.assertEqual(settings.trajectory_min_confidence, 0.72) + self.assertEqual(settings.trajectory_motion_delta, 20.0) + self.assertEqual(settings.trajectory_min_blob_area, 12) + self.assertEqual(settings.trajectory_max_blob_area_fraction, 0.35) + self.assertEqual(settings.trajectory_trash_entry_margin, 0.04) + self.assertEqual(settings.trajectory_backend, "motion") + self.assertFalse(settings.yolo_enabled) + self.assertEqual(settings.yolo_model_path, "") + self.assertEqual(settings.yolo_min_confidence, 0.65) + if __name__ == "__main__": unittest.main()