Compare commits

...

9 Commits

Author SHA1 Message Date
Yoilun
a4657a4bdf fix: seed trajectories from recent source motion 2026-06-01 12:03:05 +08:00
Yoilun
03bc7085ea feat: allow segmented disposal trajectories 2026-06-01 11:06:42 +08:00
Yoilun
1ecf881684 fix: ignore global lighting shifts in occupancy 2026-06-01 09:56:11 +08:00
Yoilun
100b949f1f fix: harden v1.2 trajectory disposal matching 2026-05-29 16:26:15 +08:00
Yoilun
90aa5dd704 feat: integrate trajectory runtime diagnostics 2026-05-29 15:58:26 +08:00
Yoilun
39cfc76fa2 feat: add lightweight trajectory tracking 2026-05-29 15:48:06 +08:00
Yoilun
d805273a10 feat: add disposal evidence engine handling 2026-05-29 15:30:07 +08:00
Yoilun
5f518991bf docs: plan v1.2 trajectory recognition workflow 2026-05-29 15:18:58 +08:00
Yoilun
ac6d368810 docs: design yolo-ready trajectory evidence 2026-05-29 15:13:48 +08:00
17 changed files with 2530 additions and 16 deletions

1
.gitignore vendored
View File

@@ -3,6 +3,7 @@ __pycache__/
.DS_Store
*.textClipping
.pytest_cache/
.superpowers/
.venv/
dist/
build/

View File

@@ -20,7 +20,9 @@
## 当前实现范围
当前版本先实现纯业务状态机,不依赖摄像头模型。后续视觉模块只需要输出标准观察数据:
当前版本已经接入可运行的轻量视觉流程:区域占用、垃圾桶动作和 v1.2 的轻量 motion trajectory 都使用启发式图像差分实现,不使用 YOLO。后续训练好的 YOLO 食品检测模型会通过统一的 `disposal_evidence` / backend 合约接入,不改变批次计时状态机的业务输入形态。
视觉或 backend 模块需要输出标准观察数据:
```json
{
@@ -29,7 +31,23 @@
"1": 1,
"2": 0
},
"trash_deposit": false
"trash_deposit": false,
"disposal_evidence": [
{
"source_zone_id": "1",
"target": "trash",
"confidence": 0.9,
"method": "motion",
"track_points": [
{"x": 0.22, "y": 0.30},
{"x": 0.48, "y": 0.58},
{"x": 0.76, "y": 0.78}
],
"item_class": null,
"detector_score": null,
"observed_at": "2026-04-27T10:00:03+08:00"
}
]
}
```
@@ -132,15 +150,17 @@ scripts/run_runtime.sh
2.`ffmpeg` 周期抓取小尺寸 RGB 帧。
3. 按标定区域做占用变化检测。
4. 判断垃圾桶区域是否有明显投放动作。
5. 调用批次计时状态机
6. 写入 `logs/events.jsonl`,管理页会读取这个文件
5. 对刚清空的来源区域运行轻量 motion trajectory生成可选的 `disposal_evidence`
6. 调用批次计时状态机,优先使用匹配 `source_zone_id``disposal_evidence` 确认丢弃,再回退到通用垃圾桶动作
7. 写入 `logs/events.jsonl`,管理页会读取这个文件。
当前视觉版本是可运行的启发式版本:
- 每个格口输出 `0/1` 占用状态,不识别单份数量。
- 启动后的前几帧用于建立空柜基线,默认 `3` 帧。
- 如果启动时格口里已经有食品,系统会把它当作基线,后续要等画面变化后才会产生计时事件。
- 真实生产精度后续应接食品检测模型
- v1.2 轨迹识别是轻量 motion trajectory不加载 YOLO不要求模型文件
- 训练好的 YOLO 模型后续应作为新的 backend 接入,并继续输出统一的 `disposal_evidence`
可选运行参数可以放在配置文件的 `[runtime]` 中:
@@ -163,13 +183,40 @@ occupancy_reflection_dark_fraction = 0.10
occupancy_reflection_bright_dark_ratio = 2.0
occupancy_confirm_frames = 2
empty_confirm_frames = 2
lighting_shift_guard_enabled = true
lighting_shift_min_regions = 3
lighting_shift_region_fraction = 0.6
lighting_shift_mean_delta = 45.0
trash_motion_delta = 18.0
trash_sustained_motion_delta = 8.0
trash_sustained_motion_frames = 2
trash_motion_cooldown_seconds = 3
trajectory_enabled = true
trajectory_window_seconds = 8
trajectory_sample_interval_seconds = 1.0
trajectory_min_points = 3
trajectory_segmented_enabled = true
trajectory_segmented_min_points = 2
trajectory_min_confidence = 0.72
trajectory_motion_delta = 20.0
trajectory_min_blob_area = 12
trajectory_max_blob_area_fraction = 0.35
trajectory_trash_entry_margin = 0.04
trajectory_backend = "motion"
yolo_enabled = false
yolo_model_path = ""
yolo_min_confidence = 0.65
diagnostics_path = "logs/runtime_diagnostics.jsonl"
```
`trajectory_backend = "motion"` 表示当前使用轻量轨迹 backend。`yolo_enabled``yolo_model_path``yolo_min_confidence` 是为后续训练模型预留的配置项;当前版本即使保留这些字段,也不会启用 YOLO 推理。
运行诊断写入 `logs/runtime_diagnostics.jsonl`。每行包含顶层 `disposal_evidence`,以及 `diagnostics.trajectory`
- 顶层 `disposal_evidence`:本帧实际输出给状态机的来源区域到垃圾桶证据。
- `diagnostics.lighting_shift`:多数区域同时同方向亮度漂移时启用,防止灯光/曝光变化被当成食品进出。
- `diagnostics.trajectory`:轻量轨迹 backend 的候选、来源 motion 预缓存、过期、拒绝、分段轨迹和已发出证据等调试信息。
## 本地测试
```bash

View File

@@ -36,6 +36,7 @@ polygon = [[0.581255, 0.408928], [0.717971, 0.468544], [0.711092, 0.574018], [0.
roi = [[0.776842, 0.486901], [0.896842, 0.522456], [0.841053, 0.857427], [0.716842, 0.853684]]
[runtime]
sample_interval_seconds = 5.0
sample_stride_pixels = 4
occupancy_mean_delta = 55.0
occupancy_dark_luma_threshold = 80.0
@@ -47,10 +48,29 @@ occupancy_reflection_dark_fraction = 0.10
occupancy_reflection_bright_dark_ratio = 2.0
occupancy_confirm_frames = 2
empty_confirm_frames = 2
lighting_shift_guard_enabled = true
lighting_shift_min_regions = 3
lighting_shift_region_fraction = 0.6
lighting_shift_mean_delta = 45.0
trash_motion_delta = 18.0
trash_sustained_motion_delta = 8.0
trash_sustained_motion_frames = 2
trash_motion_cooldown_seconds = 3
trajectory_enabled = true
trajectory_window_seconds = 8
trajectory_sample_interval_seconds = 1.0
trajectory_min_points = 3
trajectory_segmented_enabled = true
trajectory_segmented_min_points = 2
trajectory_min_confidence = 0.72
trajectory_motion_delta = 20.0
trajectory_min_blob_area = 12
trajectory_max_blob_area_fraction = 0.35
trajectory_trash_entry_margin = 0.04
trajectory_backend = "motion"
yolo_enabled = false
yolo_model_path = ""
yolo_min_confidence = 0.65
[event_sink]
path = "logs/events.jsonl"

View File

@@ -6,6 +6,8 @@
The `v1.1 优化改造` batch upgrades the product from a fixed 8-zone layout to a configurable 1-10 zone workflow with numeric region labels and editable trash ROI calibration. All v1.1 items are part of one batch; backend, API, frontend, and documentation are implementation workstreams inside that same batch.
The `v1.2 轨迹识别` batch adds source-zone trajectory evidence for disposal confirmation. The first implementation uses lightweight motion tracking and keeps YOLO disabled, while preserving an evidence contract that a later trained YOLO product detector can enrich.
## Architecture
- Backend package: `src/cold_display_guard/`
@@ -15,6 +17,7 @@ The `v1.1 优化改造` batch upgrades the product from a fixed 8-zone layout to
- `manage_api.py`: standard-library HTTP management API.
- `main.py`: RTSP runtime loop connecting frame capture, vision, state engine, and JSONL sinks.
- `vision.py`: heuristic ROI occupancy and trash-motion detection.
- v1.2 adds trajectory evidence between vision and engine: `TrajectoryTracker` emits source-zone-to-trash evidence; `BatchEngine` consumes the backend-neutral `disposal_evidence` contract.
- Frontend package: `web/`
- Vite + vanilla JavaScript management console.
- Default web port `23000`.
@@ -23,6 +26,7 @@ The `v1.1 优化改造` batch upgrades the product from a fixed 8-zone layout to
- Runtime data:
- Events JSONL default path `logs/events.jsonl`.
- Diagnostics JSONL default path `logs/runtime_diagnostics.jsonl`.
- v1.2 diagnostics include root-level `disposal_evidence` plus `diagnostics.trajectory`.
- Deployment:
- Root Python Docker image for API/runtime.
- `web/Dockerfile` for static web console.
@@ -45,6 +49,42 @@ The `v1.1 优化改造` batch upgrades the product from a fixed 8-zone layout to
- Trash ROI:
- Stored under `[trash] roi`.
- Does not use a food zone number.
- v1.2 trajectory settings:
- `lighting_shift_guard_enabled`: freezes occupancy changes when many regions shift brightness in the same direction.
- `lighting_shift_min_regions`, `lighting_shift_region_fraction`, `lighting_shift_mean_delta`: tune the global lighting/exposure guard.
- `trajectory_enabled`: enables source-zone trajectory evidence.
- `trajectory_window_seconds`: seconds after a zone clears where movement can confirm disposal.
- `trajectory_sample_interval_seconds`: faster runtime delay while a candidate is active.
- `trajectory_min_points`: minimum sampled motion points required before evidence can emit.
- `trajectory_segmented_enabled`, `trajectory_segmented_min_points`: allow a source point plus trash-entry point to confirm disposal when the middle of the path is occluded.
- `trajectory_min_confidence`: minimum confidence before evidence can close pending disposal.
- `trajectory_motion_delta`: frame-difference threshold for trajectory motion points.
- `trajectory_min_blob_area`: minimum connected motion area to keep as a point.
- `trajectory_max_blob_area_fraction`: rejects overly broad frame motion as ambiguous.
- `trajectory_trash_entry_margin`: margin for treating a track point as entering the trash ROI.
- `trajectory_backend`: first valid value is `"motion"`.
- `yolo_enabled`, `yolo_model_path`, `yolo_min_confidence`: reserved for a future trained model backend. Current v1.2 keeps YOLO disabled.
## v1.2 Runtime Flow
1. `RTSPFrameSource` captures a resized RGB frame.
2. `ZoneOccupancyDetector` updates per-zone binary occupancy and generic trash-motion count from calibrated ROIs.
3. `TrajectoryTracker` caches recent source-region motion while a zone is occupied, watches zones that just cleared, follows lightweight motion points toward the trash ROI, and emits source-specific `DisposalEvidence` when confidence passes the configured threshold. If the middle of the path is occluded, a segmented source-to-trash track can still emit evidence.
4. `BatchEngine` processes `Observation(zone_counts, trash_deposit_count, disposal_evidence)`.
5. For pending disposal, matching `disposal_evidence.source_zone_id` confirms `batch_discarded` before generic FIFO `trash_deposit_count` fallback is used.
6. Runtime writes events to `logs/events.jsonl` and diagnostics to `logs/runtime_diagnostics.jsonl`.
The current tracker is a motion backend only. A later trained YOLO detector should plug in as another backend that enriches or replaces the evidence producer while preserving the same `disposal_evidence` contract consumed by the engine.
## Diagnostics
- Runtime diagnostics JSONL records one item per runtime iteration.
- Root `disposal_evidence` is the exact evidence list passed into the engine for that iteration.
- `diagnostics.zones` contains occupancy metrics used to derive `zone_counts`.
- `diagnostics.lighting_shift` reports whether global brightness drift suppressed occupancy transitions.
- `diagnostics.trash` contains generic trash-motion metrics and cooldown state.
- `diagnostics.trajectory` contains v1.2 candidate counts, emitted evidence count, motion point count, source-seeded and segmented-track flags, and per-candidate emitted/rejected/expired records.
- Capture failures still keep the v1.2 schema with root `disposal_evidence: []` and `diagnostics.trajectory.reason = "frame_capture_failed"`.
## Event Model
@@ -59,6 +99,8 @@ The `v1.1 优化改造` batch upgrades the product from a fixed 8-zone layout to
Events should include `zone_id`, `zone_index`, `zone_label`, `started_at`, `dwell_seconds`, and relevant alarm/removal/deadline timestamps when available.
In v1.2, `batch_discarded` can be triggered by zone-scoped `disposal_evidence` before falling back to generic `trash_deposit_count`. Evidence must match the pending batch's `source_zone_id`.
## Runbook
- Python tests:
@@ -75,10 +117,18 @@ Events should include `zone_id`, `zone_index`, `zone_label`, `started_at`, `dwel
- `scripts/run_runtime.sh`
- One-frame smoke test when camera and `ffmpeg` are available:
- `PYTHONPATH=src python3 -m cold_display_guard.main --config config/example.toml --once`
- v1.2 operating notes:
- Keep `trajectory_backend = "motion"` and `yolo_enabled = false` unless a trained YOLO backend has been explicitly deployed.
- Confirm `logs/runtime_diagnostics.jsonl` contains top-level `disposal_evidence` and `diagnostics.trajectory` before judging trajectory behavior from events alone.
- When `TrajectoryTracker` has active candidates, runtime sampling uses `trajectory_sample_interval_seconds`; this can temporarily be faster than the normal `sample_interval_seconds`.
- On remote deployments, preserve the remote `config/example.toml` calibration and stream settings when syncing code.
## Known Risks
- The current vision detector is heuristic and reports binary occupancy, not item counts.
- The lighting-shift guard suppresses multi-zone brightness/exposure jumps; if operators intentionally fill most zones at once under a large lighting change, diagnostics should be reviewed before treating that interval as clean data.
- v1.2 motion tracking improves disposal matching but can still miss movement if the hand/object path is occluded, too broad, too small, or sampled too sparsely.
- YOLO config fields are present for compatibility, but no trained YOLO model is part of the current runtime.
- If food is already present during baseline collection, those regions may be treated as empty baseline until visual changes occur.
- Changing calibration while the runtime process has active batches can create operational ambiguity; v1.1 should document or enforce a pause/restart expectation.
- Historical events must keep the zone index at the time of emission so later region reordering does not reinterpret old logs.

View File

@@ -0,0 +1,169 @@
# v1.2 Trajectory Recognition Implementation Plan
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
**Goal:** Add source-zone trajectory evidence so alarmed items moved to the trash are discarded by their actual source zone, while keeping YOLO as a future optional backend.
**Architecture:** Extend `Observation` with backend-neutral `disposal_evidence`, make `BatchEngine` consume matching evidence before generic trash fallback, then add a no-dependency motion trajectory tracker in the vision layer. Runtime writes diagnostics and uses faster sampling only while trajectory candidates are active.
**Tech Stack:** Python 3.11+ standard library, existing `Frame` RGB bytes, `unittest`, Vite/Node tests only if frontend files change.
---
### Task 1: Data Contract And Engine Evidence Handling
**Files:**
- Modify: `src/cold_display_guard/models.py`
- Modify: `src/cold_display_guard/engine.py`
- Test: `tests/test_engine.py`
- [ ] **Step 1: Write failing tests**
Add tests for:
- `Observation.from_dict()` normalizes `disposal_evidence`.
- Matching evidence discards the pending batch for the same source zone.
- Evidence for zone 4 does not discard pending zone 1.
- Same-observation removal plus evidence closes the newly pending batch.
- Low-confidence evidence is ignored.
- [ ] **Step 2: Run RED tests**
Run: `PYTHONPATH=src python3 -m unittest tests.test_engine -v`
Expected: FAIL because `Observation` has no `disposal_evidence` and engine ignores evidence.
- [ ] **Step 3: Implement minimal contract and engine logic**
Add a `DisposalEvidence` dataclass and `Observation.disposal_evidence`. In `BatchEngine.process()`, apply evidence to matching `pending_disposal` before generic trash deposits and again after zone transitions for same-frame removals.
- [ ] **Step 4: Run GREEN tests**
Run: `PYTHONPATH=src python3 -m unittest tests.test_engine -v`
Expected: PASS.
- [ ] **Step 5: Commit phase**
Run:
```bash
git add src/cold_display_guard/models.py src/cold_display_guard/engine.py tests/test_engine.py
git commit -m "feat: add disposal evidence engine handling"
```
### Task 2: Lightweight Motion Trajectory Backend
**Files:**
- Modify: `src/cold_display_guard/vision.py`
- Test: `tests/test_vision.py`
- [ ] **Step 1: Write failing tests**
Add synthetic RGB-frame tests for:
- Motion from source zone to trash ROI emits evidence.
- Motion that starts away from source zone is rejected.
- Motion that never reaches trash ROI is rejected.
- One-frame reflection flash is rejected.
- Multiple active candidates do not cross-close each other.
- [ ] **Step 2: Run RED tests**
Run: `PYTHONPATH=src python3 -m unittest tests.test_vision -v`
Expected: FAIL because no trajectory tracker exists.
- [ ] **Step 3: Implement minimal motion tracker**
Add trajectory settings, candidate state, motion blob extraction from frame deltas, confidence scoring, and diagnostics. Keep the implementation standard-library only.
- [ ] **Step 4: Run GREEN tests**
Run: `PYTHONPATH=src python3 -m unittest tests.test_vision -v`
Expected: PASS.
- [ ] **Step 5: Commit phase**
Run:
```bash
git add src/cold_display_guard/vision.py tests/test_vision.py
git commit -m "feat: add lightweight trajectory tracking"
```
### Task 3: Runtime Configuration And Diagnostics Integration
**Files:**
- Modify: `src/cold_display_guard/main.py`
- Modify: `src/cold_display_guard/vision.py`
- Modify: `config/example.toml`
- Test: `tests/test_vision.py`
- Test: `tests/test_main.py`
- [ ] **Step 1: Write failing tests**
Add tests that verify runtime defaults include trajectory settings with YOLO disabled and diagnostics rows include emitted evidence when present.
- [ ] **Step 2: Run RED tests**
Run: `PYTHONPATH=src python3 -m unittest tests.test_vision tests.test_main -v`
Expected: FAIL because runtime does not pass evidence into `Observation` or expose trajectory sampling state.
- [ ] **Step 3: Implement runtime integration**
Return `disposal_evidence` from vision observation, write it to diagnostics, pass it to `Observation`, and use `trajectory_sample_interval_seconds` while candidates are active.
- [ ] **Step 4: Run GREEN tests**
Run: `PYTHONPATH=src python3 -m unittest tests.test_vision tests.test_main -v`
Expected: PASS.
- [ ] **Step 5: Commit phase**
Run:
```bash
git add src/cold_display_guard/main.py src/cold_display_guard/vision.py config/example.toml tests/test_vision.py tests/test_main.py
git commit -m "feat: integrate trajectory runtime diagnostics"
```
### Task 4: Documentation, Full Verification, And Deployment Prep
**Files:**
- Modify: `README_zh.md`
- Modify: `docs/project.md`
- Modify: `task_plan.md`
- Modify: `findings.md`
- Modify: `progress.md`
- Modify: `memories.md`
- [ ] **Step 1: Update docs**
Document v1.2 trajectory settings, evidence semantics, tests, and remote deployment notes.
- [ ] **Step 2: Run full verification**
Run:
```bash
PYTHONPATH=src python3 -m unittest discover -s tests -v
node --test web/test/zone-state.test.js
cd web && pnpm build
```
Expected: PASS for all commands. If frontend files did not change, frontend commands still provide regression coverage for the management console.
- [ ] **Step 3: Commit phase**
Run:
```bash
git add README_zh.md docs/project.md task_plan.md findings.md progress.md memories.md docs/superpowers/plans/2026-05-29-v1.2-trajectory-recognition.md
git commit -m "docs: document v1.2 trajectory recognition"
```
- [ ] **Step 4: Prepare remote deploy**
Use rsync excluding `config/example.toml`, rebuild runtime/API, and verify Docker services. Record the exact commands and results in `progress.md`.

View File

@@ -0,0 +1,278 @@
# Lightweight Trajectory Tracking With YOLO-Ready Evidence
Date: 2026-05-29
Branch: `lightweight-trajectory-tracking`
## Summary
The runtime currently confirms disposal by matching a zone becoming empty with generic trash-bin motion. That produces false matches when several zones change close together, when the trash ROI moves for an unrelated reason, or when reflection changes look like motion.
This design adds a trajectory evidence layer. Version 1 uses lightweight motion tracking to infer "source zone -> trash ROI" during a short window after a zone becomes empty. Version 2 can add a trained YOLO backend later without changing the event engine contract.
The first implementation must not require YOLO, PyTorch, ONNX Runtime, or OpenVINO. It must keep the current ROI occupancy timer and add a stronger disposal confirmation path.
## Goals
- Confirm disposal by source zone, not by FIFO matching alone.
- Reduce cases where zone 1 or zone 4 removal is incorrectly matched to another zone.
- Suppress reflection-only and trash-bin-only movement from confirming disposal.
- Keep CPU load low by activating trajectory analysis only after a zone becomes empty.
- Preserve a stable data contract that a future trained YOLO model can enrich.
## Non-Goals
- Do not convert the whole project to YOLO in the first trajectory version.
- Do not train or bundle a model in this branch.
- Do not replace ROI occupancy timing; it remains the authority for zone occupied/empty state.
- Do not require visual access inside the trash bin. Confirmation is based on motion entering the trash mouth ROI.
## Current Architecture
`main.py` captures one RTSP frame per sample interval with `ffmpeg`, passes it to `ZoneOccupancyDetector.observe()`, creates an `Observation`, and sends it to `BatchEngine.process()`.
`vision.py` currently outputs:
- `zone_counts`: stable occupied/empty state per configured zone.
- `trash_deposit_count`: count of generic trash ROI motion events.
- `diagnostics`: metrics for zones and trash motion.
`engine.py` currently consumes:
- `Observation.zone_counts`
- `Observation.trash_deposit_count`
When a timed-out batch is removed, it becomes pending disposal. A later trash motion can close pending batches, using FIFO order when source-zone evidence is missing.
## Proposed Architecture
Add a trajectory evidence path between vision and engine:
1. Zone occupancy still runs first.
2. When a zone transitions from occupied to empty, vision opens a short tracking window for that zone.
3. While any tracking window is active, the runtime temporarily shortens the capture delay so movement is sampled densely enough for a path.
4. During the window, a lightweight motion backend tracks moving blobs across the source zone, the path/corridor, and the trash mouth ROI.
5. If the path is coherent, vision emits a zone-scoped disposal evidence item.
6. The engine applies zone-scoped disposal evidence before using generic trash motion fallback.
The engine should depend on a neutral evidence format, not on YOLO or any specific tracking backend.
## Data Contract
Add `disposal_evidence` to `Observation`.
Example V1 evidence:
```json
{
"source_zone_id": "1",
"target": "trash",
"confidence": 0.86,
"method": "motion",
"started_at": "2026-05-29T14:03:20+08:00",
"ended_at": "2026-05-29T14:03:25+08:00",
"track_points": [[152, 210], [181, 219], [226, 235], [275, 252]],
"item_class": null,
"detector_score": null
}
```
Example later YOLO-enriched evidence:
```json
{
"source_zone_id": "1",
"target": "trash",
"confidence": 0.94,
"method": "motion+yolo",
"started_at": "2026-05-29T14:03:20+08:00",
"ended_at": "2026-05-29T14:03:25+08:00",
"track_points": [[152, 210], [181, 219], [226, 235], [275, 252]],
"item_class": "trained_product_a",
"detector_score": 0.91
}
```
`trash_deposit_count` remains for compatibility and fallback, but zone-scoped `disposal_evidence` takes priority.
## Components
### `TrajectoryTracker`
Owns active tracking windows. It receives current frame, timestamp, zone counts, region polygons, and trash ROI.
Responsibilities:
- Detect occupied-to-empty transitions.
- Start a per-zone candidate window.
- Keep recent motion observations for each active candidate.
- Decide whether a candidate has enough evidence to emit disposal evidence.
- Expire weak candidates without closing a batch.
- Report whether any candidate is active so `main.py` can use the faster trajectory sample interval.
### `MotionTrajectoryBackend`
The default V1 backend. It uses frame-to-frame differences and connected motion regions.
Responsibilities:
- Compute motion mask from the current and previous frame.
- Filter out tiny, static, and reflection-like changes.
- Extract moving blob centroids and bounding boxes.
- Associate centroids over time into a short track.
- Return backend-neutral track observations.
The backend must work without external model dependencies.
### `YoloDetectionBackend`
An optional future backend. It is not implemented in V1 but the interface is reserved.
Responsibilities when enabled later:
- Run only during active tracking windows or on configured path crops.
- Detect trained product classes and optionally hands/person keypoints.
- Attach `item_class`, `detector_score`, and bounding boxes to the same evidence contract.
- Never bypass trajectory validation. YOLO detections enrich confidence; they do not directly close events.
### `EvidenceFusion`
Combines backend output into final evidence.
V1 uses motion-only signals:
- Origin score: first meaningful motion is near or inside the source zone.
- Direction score: track generally moves from source zone toward trash ROI.
- Target score: final track points intersect or approach the trash mouth ROI.
- Stability score: track persists across enough frames and is not a one-frame flash.
V2 can add YOLO class and detector confidence into the same confidence calculation.
### `BatchEngine`
The engine should process evidence in this order:
1. Expire old pending disposal records.
2. Apply zone-scoped `disposal_evidence` to matching pending batches first.
3. Process zone transitions.
4. Apply any evidence created in the same observation to newly pending batches.
5. Use remaining generic `trash_deposit_count` as fallback for older behavior.
Zone-scoped evidence should only discard the pending batch from `source_zone_id`. It must not close a different zone when the source zone has no pending disposal.
## Runtime Flow
1. A zone is occupied long enough to create an active batch.
2. The batch reaches the dwell alarm threshold and emits `time_alarm`.
3. The item is removed from the zone.
4. Occupancy confirms the zone is empty, and the tracker opens or continues a short candidate window for that zone.
5. The engine emits `batch_pending_disposal` for that zone.
6. While the candidate is active, the runtime samples faster than the normal dwell timer interval.
7. Motion backend observes a track from source zone toward trash ROI.
8. If the track enters the trash mouth ROI with enough confidence, `disposal_evidence` is emitted.
9. The engine emits `batch_discarded` for that same zone. If evidence is emitted in the same observation that created pending disposal, the engine applies it after processing the zone-empty transition.
10. If no evidence arrives before the pending deadline, the current warning escalation behavior remains.
## Configuration
Add runtime settings with conservative defaults:
```toml
[runtime]
trajectory_enabled = true
trajectory_window_seconds = 8
trajectory_sample_interval_seconds = 1.0
trajectory_min_points = 3
trajectory_min_confidence = 0.72
trajectory_motion_delta = 20.0
trajectory_min_blob_area = 12
trajectory_max_blob_area_fraction = 0.35
trajectory_trash_entry_margin = 0.04
trajectory_backend = "motion"
yolo_enabled = false
yolo_model_path = ""
yolo_min_confidence = 0.65
```
`yolo_enabled = false` is the only valid first implementation mode. The config keys are included so deployment files and UI can evolve without changing the observation contract.
`trajectory_sample_interval_seconds` applies only while at least one trajectory candidate is active. Normal monitoring keeps using the existing `sample_interval_seconds`.
## Diagnostics
Append trajectory diagnostics to each runtime diagnostics row:
```json
{
"trajectory": {
"active_candidates": ["1"],
"emitted_evidence": [
{
"source_zone_id": "1",
"confidence": 0.86,
"method": "motion"
}
],
"expired_candidates": [],
"rejected_candidates": [
{
"source_zone_id": "4",
"reason": "target_not_reached"
}
]
}
}
```
Diagnostics should explain why a candidate was accepted, expired, or rejected. This is required for tuning the live camera.
## Error Handling
- If tracking cannot run because there is no previous frame, no evidence is emitted.
- If trash ROI is not configured, trajectory evidence is disabled and current generic behavior remains.
- If faster sampling cannot keep up with RTSP capture time, runtime should continue at the achievable rate and record capture timing in diagnostics.
- If multiple zones become empty at once, keep independent candidates. A track can confirm only one source zone unless future YOLO tracking explicitly supports multiple objects.
- If evidence confidence is below threshold, do not close pending disposal.
- If YOLO is enabled later but the model fails to load, runtime should fall back to motion-only tracking and record a diagnostic error.
## Testing Strategy
Unit tests:
- `Observation.from_dict()` accepts and normalizes `disposal_evidence`.
- Engine discards a pending batch from the matching source zone when evidence arrives.
- Engine does not discard zone 1 when evidence says source zone 4.
- Same-observation zone removal plus disposal evidence closes the newly pending batch.
- Generic `trash_deposit_count` still works as fallback.
- Low-confidence evidence is ignored.
Vision tests:
- Motion track from zone polygon to trash ROI emits evidence.
- Motion that starts away from the source zone is rejected.
- Motion that never reaches trash ROI is rejected.
- One-frame reflection flash is rejected.
- Multiple active candidates do not cross-close each other.
Runtime tests:
- Diagnostics include trajectory status.
- Config defaults load with trajectory enabled and YOLO disabled.
- Existing tests for zone occupancy, trash motion, restore state, API summary, and web zone rendering keep passing.
## Rollout Plan
1. Implement the data contract and engine evidence handling behind config.
2. Add motion trajectory backend and diagnostics.
3. Keep generic trash motion fallback enabled during rollout.
4. Deploy to the remote runtime and observe diagnostics for zones 1, 2, 4, 5, 6, and trash ROI.
5. Tune thresholds from live diagnostics.
6. Later, add YOLO backend as a separate implementation that feeds the same evidence contract.
## Acceptance Criteria
- Removing an alarmed item from zone 1 and moving it visibly to the trash mouth closes zone 1, not another zone.
- Removing alarmed items from multiple zones close together does not rely on FIFO when trajectory evidence identifies the source zone.
- Motion inside trash ROI alone does not confirm disposal if no source-zone trajectory exists.
- Reflection-only changes do not emit disposal evidence.
- The runtime works without YOLO dependencies installed.
- The future YOLO path can be added by implementing the reserved backend without changing `BatchEngine` event semantics.

View File

@@ -52,6 +52,37 @@
Use `阶段 x` only as a workflow stage inside the same `v1.1 优化改造` batch.
## v1.2 轨迹识别 Findings
## Architecture
- `main.py` 当前每轮从 RTSP 截一帧,调用 `ZoneOccupancyDetector.observe()` 得到 `zone_counts``trash_deposit_count``diagnostics`,再构造 `Observation``BatchEngine.process()`
- `vision.py` 当前只有区域占用和垃圾桶动作计数,没有从“来源区域到垃圾桶”的轨迹证据。
- `engine.py` 当前先处理 pending 过期和旧 trash deposit再处理区域转移最后对同帧剩余 trash deposit 做兜底;这为同帧 evidence 处理提供了插入点。
- `models.py``Observation` 是最适合扩展统一 evidence contract 的位置。
- v1.2 设计规格已保存并提交:`docs/superpowers/specs/2026-05-29-lightweight-trajectory-yolo-ready-design.md`commit `ac6d368`
## Constraints
- 第一版必须在无 YOLO 依赖下运行。
- 轨迹检测只应在区域变空后的短窗口活跃,避免持续 CPU 压力。
- 垃圾桶内部不可见;确认点是进入垃圾桶口 ROI不是看到桶内物品。
- 不能让一个来源区域的 evidence 关闭另一个区域的 pending batch。
- 远端部署必须继续排除 `config/example.toml`,以保留 RTSP 和现场标定。
## Decisions
- 新字段命名为 `disposal_evidence`,元素使用 `source_zone_id``target``confidence``method``track_points`、可选 `item_class``detector_score`
- `trash_deposit_count` 保留兼容和兜底,但 engine 先使用 zone-scoped evidence。
- 轨迹诊断必须记录 active、emitted、rejected、expired方便现场调参。
- 后续 YOLO 模型只作为 evidence 增强输入,不能修改 `BatchEngine` 的业务语义。
## Final Review Findings
- Evidence and generic trash fallback must share the same count budget: one matched `disposal_evidence` consumes one observed trash deposit signal when `trash_deposit_count` is also present, but extra trash deposits must still fall back to pending batches.
- A trajectory candidate must not append source-zone-external motion before source motion has been seen. If outside motion appears first and is later followed by source noise, the candidate is rejected with `motion_started_outside_source` instead of producing evidence.
- Runtime diagnostics on the deployed host should be checked for schema only, not by printing config, because the remote config may contain RTSP credentials and calibration.
## Backend Planning Notes
- `EngineSettings.zone_ids` should remain config driven; numeric zones are preferred for new configs, but old `r1c1` style IDs should continue loading.

42
memories.md Normal file
View File

@@ -0,0 +1,42 @@
# Memories
## User and Workflow
- User wants Chinese responses.
- Project path for subagent headers is `/Users/yoilun/Code/cold_display_guard`.
- Current workflow batch is `v1.2 轨迹识别`.
- User requested following `/Users/yoilun/Code/goal-subagents-workflow-prompt.md`.
- Every subagent task must begin with:
```text
[项目: /Users/yoilun/Code/cold_display_guard]
[工作流批次: v1.2 轨迹识别]
[阶段: 阶段 x]
[角色: 对应智能体角色]
```
## Technical Direction
- First implement lightweight motion trajectory detection without YOLO dependencies.
- Preserve a stable `disposal_evidence` contract for a future trained YOLO product detector.
- Keep ROI occupancy timing as the source of zone occupied/empty state.
- Use trajectory evidence before generic trash-motion FIFO fallback.
- Current runtime writes top-level `disposal_evidence` and nested `diagnostics.trajectory` into runtime diagnostics JSONL.
## v1.2 Completed Facts
- Stage 1 established the backend contract: `Observation.disposal_evidence` normalizes backend-neutral disposal evidence, and the engine can discard a pending batch only when evidence targets `trash`, meets confidence, and matches the pending `source_zone_id`.
- Stage 2 added the lightweight motion trajectory runtime path: ROI occupancy still drives occupied/empty state, `TrajectoryTracker` emits source-zone-to-trash evidence, and generic trash-motion count remains as a fallback.
- Stage 3 added diagnostics and tests for runtime evidence propagation, trajectory sampling interval behavior, capture-failure schema, and trajectory/yolo runtime config parsing.
- Final review fixes: matched evidence now only subtracts the trash fallback budget by the number of batches it actually closed, and trajectory candidates reject outside-before-source motion with `motion_started_outside_source`.
- 2026-06-01 false events across zones 1-7 were caused by a global brightness/exposure drop around 04:55 and recovery around 08:16; the fix is a lighting-shift guard that freezes occupancy transitions when many regions shift brightness in the same direction.
- 2026-06-01 trajectory update allows segmented source-to-trash tracks: after a source-zone motion point is seen, the middle of the path may be occluded, and a later trash-entry point can still emit `disposal_evidence` when direction/confidence pass.
- 2026-06-01 follow-up fixed empty-confirmation lag: `TrajectoryTracker` now caches recent source-region motion while a zone is still occupied and seeds the disposal candidate when the stable zone count finally clears.
- Current v1.2 does not use YOLO. `yolo_enabled`, `yolo_model_path`, and `yolo_min_confidence` are reserved for a future trained model backend that should keep emitting the same `disposal_evidence` shape.
## Remote Deployment Notes
- Remote deployment target is `xiaozheng@192.168.5.206:/home/xiaozheng/cold_display_guard`.
- Preserve the remote `config/example.toml`; it may contain camera, calibration, threshold, and deployment-specific runtime settings that must not be overwritten blindly.
- When syncing code remotely, verify that runtime diagnostics still show top-level `disposal_evidence` and `diagnostics.trajectory` before evaluating v1.2 trajectory behavior from `logs/events.jsonl`.
- The latest v1.2 deployment was verified with `cold-display-guard-runtime` and `cold-display-guard-api` up, API health `status=ok`, and diagnostics schema showing `has_disposal_evidence=True` plus `has_trajectory=True`.

View File

@@ -183,3 +183,185 @@
| Runtime restart can drop threshold-edge timers | Runtime restore used raw threshold recompute instead of stable `occupied`, so a restart during a one-frame raw dip could lose an active timer | Restore now uses stable `occupied` when present and keeps raw recompute only as a fallback | Resolved; regression test covers zone 2-style flicker |
| Zone 1 timer reset | Zone 1's ROI had too few samples with the default stride `8`; dark evidence jumped between `0.0714` and `0.0357/0`, causing two occupied frames followed by two empty frames and repeated short batches | Reduced default and remote `sample_stride_pixels` to `4` so small ROI/object evidence is less quantized | Resolved in current verification; zone 1 remains continuously occupied after deploy |
| Zone 1/4 disposal missed after zone 2 discard | Zone 2 generated a trash deposit at `14:32:13`; zone 1/4 cleared at `14:32:19`, but their strong trash motion was inside the old 8-second cooldown, and one same-frame deposit could only discard one newly pending batch | Reduced trash cooldown to 3 seconds and let one same-frame trash motion discard all newly pending alerted batches from that observation | Resolved for future events; historical zone 1/4 rows are not rewritten |
## 2026-05-29 v1.2 轨迹识别
### Session Log
| Time | Phase | Actor | Action | Result |
| --- | --- | --- | --- | --- |
| 2026-05-29 | Setup | Main Agent | Created Goal for `v1.2 轨迹识别` using `/Users/yoilun/Code/cold_display_guard` as the real project path | Active goal tracks lightweight trajectory detection plus YOLO-ready evidence contract |
| 2026-05-29 | Setup | Main Agent | Read `/Users/yoilun/Code/goal-subagents-workflow-prompt.md` | Workflow requires task files, stage-based coding/testing agents, bug loop limit, and standard subagent context header |
| 2026-05-29 | Setup | Main Agent | Read existing `task_plan.md`, `findings.md`, `progress.md`, and `docs/project.md` | v1.1 state is complete; v1.2 plan can start on branch `lightweight-trajectory-tracking` |
| 2026-05-29 | Phase 1 | Main Agent | Marked Phase 1 as `in_progress` | Preparing coding agent for data contract and engine evidence handling |
| 2026-05-29 | Phase 1 | Coding Agent | Implemented initial `disposal_evidence` contract and engine handling | Target engine tests and full Python tests passed in coding agent run, but testing agent found review issues |
| 2026-05-29 | Phase 1 | Testing Agent | Reviewed phase 1 implementation | Verdict fail: evidence/count double consumption, missing target validation, null classifier fields coercion |
| 2026-05-29 | Phase 1 | Coding Agent | Fixed testing-agent findings | Added target validation, nullable optional fields, and evidence/count double-consume guard |
| 2026-05-29 | Phase 1 | Testing Agent | Re-tested phase 1 fixes | Verdict pass; no bugs found |
| 2026-05-29 | Phase 1 | Main Agent | Ran local verification | `tests.test_engine` passed with 24 tests; full Python suite passed with 55 tests |
| 2026-05-29 | Phase 2 | Main Agent | Marked Phase 2 as `in_progress` | Preparing fresh coding/testing agents for lightweight motion trajectory detection |
| 2026-05-29 | Phase 2 | Coding Agent | Implemented initial lightweight `TrajectoryTracker` | Target vision tests passed locally, but testing agent found multi-candidate and source-margin risks |
| 2026-05-29 | Phase 2 | Testing Agent | Reviewed initial trajectory tracker | Verdict fail: single blob can confirm multiple candidates, source margin false positive, diagnostics lack per-candidate reasons |
| 2026-05-29 | Phase 2 | Coding Agent | Fixed trajectory tracker findings | Added blob consumption, strict source polygon origin, and per-candidate diagnostics |
| 2026-05-29 | Phase 2 | Testing Agent | Re-tested phase 2 fixes | Verdict pass; no bugs found |
| 2026-05-29 | Phase 2 | Main Agent | Ran local verification | `tests.test_vision` passed with 20 tests; full Python suite passed with 64 tests; dependency scan had no model/heavy vision matches |
| 2026-05-29 | Phase 3 | Main Agent | Marked Phase 3 as `in_progress` | Preparing fresh coding/testing agents for runtime integration |
| 2026-05-29 | Phase 3 | Coding Agent | Implemented initial runtime integration | Target main/vision tests and full Python tests passed in coding agent run |
| 2026-05-29 | Phase 3 | Testing Agent | Reviewed runtime integration | Verdict pass with non-blocking concerns: capture-error diagnostics schema, `create=True` patch robustness, broad helper type |
| 2026-05-29 | Phase 3 | Coding Agent | Fixed runtime integration review concerns | Error diagnostics keep trajectory schema; tests no longer use `create=True`; evidence payload helper type narrowed |
| 2026-05-29 | Phase 3 | Testing Agent | Re-tested runtime integration concerns | Verdict pass; no new issues |
| 2026-05-29 | Phase 3 | Main Agent | Ran local verification | `tests.test_main tests.test_vision` passed with 26 tests; full Python suite passed with 68 tests; dependency scan had no model/heavy vision matches |
| 2026-05-29 | Phase 4 | Main Agent | Marked Phase 4 as `in_progress` | Preparing documentation, final verification, remote deployment, and final review |
| 2026-05-29 | Phase 4 | Main Agent | Synced v1.2 code to `xiaozheng@192.168.5.206` | `rsync` completed while excluding remote `config/example.toml` |
| 2026-05-29 | Phase 4 | Main Agent | Tried remote config patch with inline heredoc | Failed with shell quoting `SyntaxError`; switching to scp temporary patch script |
| 2026-05-29 | Phase 4 | Main Agent | Patched remote runtime config via uploaded Python script | Added missing trajectory/yolo runtime keys without printing RTSP config |
| 2026-05-29 | Phase 4 | Main Agent | Rebuilt and restarted remote runtime/API containers | `cold-display-guard-runtime` and `cold-display-guard-api` recreated and started |
| 2026-05-29 | Phase 4 | Final Code Review Agent | Reviewed full v1.2 implementation | Verdict fail: extra trash fallback was suppressed, tracker could seed from outside-source motion, and docs named a non-existent backend |
| 2026-05-29 | Phase 4 | Main Agent | Fixed final review findings | Added regression tests, adjusted trash fallback budgeting, rejected outside-before-source trajectories, and renamed docs to `TrajectoryTracker` |
| 2026-05-29 | Phase 4 | Main Agent | Re-ran local full verification | Python 70 tests passed; frontend 22 tests passed; Vite build passed; no heavy model dependency matches |
| 2026-05-29 | Phase 4 | Main Agent | Re-synced and redeployed fix to `xiaozheng@192.168.5.206` | Runtime/API rebuilt and restarted from the fixed code |
| 2026-05-29 | Phase 4 | Main Agent | Verified remote runtime after redeploy | Containers are up, API health returns `status=ok`, diagnostics contain `disposal_evidence` and `diagnostics.trajectory` |
### Test Results
| Time | Command | Result | Notes |
| --- | --- | --- | --- |
| 2026-05-29 | `PYTHONPATH=src python3 -m unittest tests.test_engine -v` | pass | 24 engine tests passed after phase 1 evidence fixes |
| 2026-05-29 | `PYTHONPATH=src python3 -m unittest discover -s tests -v` | pass | 55 full Python tests passed after phase 1 |
| 2026-05-29 | `PYTHONPATH=src python3 -m unittest tests.test_vision -v` | pass | 20 vision tests passed after phase 2 trajectory tracker |
| 2026-05-29 | `PYTHONPATH=src python3 -m unittest discover -s tests -v` | pass | 64 full Python tests passed after phase 2 |
| 2026-05-29 | `rg -n "ultralytics|torch|onnxruntime|openvino|opencv|cv2|numpy" src tests pyproject.toml` | pass | No matches; command exited 1 because no heavy vision/model dependency was found |
| 2026-05-29 | `PYTHONPATH=src python3 -m unittest tests.test_main tests.test_vision -v` | pass | 26 runtime/vision tests passed after phase 3 |
| 2026-05-29 | `PYTHONPATH=src python3 -m unittest discover -s tests -v` | pass | 68 full Python tests passed after phase 3 |
| 2026-05-29 | `rg -n "ultralytics|torch|onnxruntime|openvino|opencv|cv2|numpy" src tests pyproject.toml` | pass | No matches; command exited 1 because no heavy vision/model dependency was found |
| 2026-05-29 | `PYTHONPATH=src python3 -m unittest discover -s tests -v` | pass | 70 full Python tests passed after final review fixes |
| 2026-05-29 | `node --test web/test/zone-state.test.js` | pass | 22 frontend model tests passed |
| 2026-05-29 | `cd web && pnpm build` | pass | Vite production build passed |
| 2026-05-29 | `rg -n "ultralytics|torch|onnxruntime|openvino|opencv|cv2|numpy" src tests pyproject.toml` | pass | No matches; command exited 1 because no heavy vision/model dependency was found |
| 2026-05-29 | `docker compose ... build cold-display-guard-runtime cold-display-guard-api && docker compose ... up -d --no-deps ...` on remote | pass | Runtime/API rebuilt and restarted after final fixes |
| 2026-05-29 | `curl --max-time 5 http://192.168.5.206:19080/api/manage/health` | pass | `status=ok`, `runtime_status=running` |
| 2026-05-29 | Remote diagnostics schema check script | pass | `has_disposal_evidence=True`, `has_trajectory=True` |
### Bug Loop
| Phase | Bug | Fix Attempt | Retest Result |
| --- | --- | --- | --- |
| Phase 1 | `disposal_evidence` and `trash_deposit_count` can double-consume the same disposal signal | Added regression test and suppress generic trash fallback when confirming source-specific evidence exists in the observation | Resolved; testing agent and local full Python suite passed |
| Phase 1 | High-confidence evidence with non-trash target can close pending disposal | Added target whitelist for `trash` / `trash_bin` plus regression test | Resolved; testing agent and local full Python suite passed |
| Phase 1 | `item_class: null` and `detector_score: null` lose null semantics | Changed optional evidence fields to preserve `None` plus regression test | Resolved; testing agent and local full Python suite passed |
| Phase 2 | Single motion blob can confirm multiple active candidates | Added frame-local blob IDs and consume each sampled blob once per frame | Resolved; testing agent and local full Python suite passed |
| Phase 2 | Source-zone margin can treat near-outside movement as source-origin movement | Source-origin check now requires strict source polygon containment | Resolved; testing agent and local full Python suite passed |
| Phase 2 | Trajectory diagnostics only expose aggregate counts | Added `emitted`, `rejected`, and `expired` diagnostic lists with source, reason, point count, confidence, and direction score | Resolved; testing agent and local full Python suite passed |
| Phase 3 | Capture-error diagnostics rows lack `disposal_evidence` and `diagnostics.trajectory` schema | Added regression test and wrote empty evidence plus trajectory error diagnostics on capture failure | Resolved; testing agent and local full Python suite passed |
| Phase 3 | Runtime tests patch `TrajectoryTracker` with `create=True`, which can mask missing imports | Removed `create=True` and asserted fake tracker observe calls | Resolved; testing agent and local full Python suite passed |
| Phase 3 | `disposal_evidence_payloads()` accepts `list[object]` but blindly calls `asdict()` | Narrowed helper signature to `list[DisposalEvidence]` | Resolved; testing agent and local full Python suite passed |
| Phase 4 | Remote config patch inline heredoc lost the empty-string value for `yolo_model_path` | Switched to an scp-uploaded Python patch script instead of repeating inline quoting | Resolved; remote config patch reported 13 runtime keys patched |
| Phase 4 | Source-specific evidence disabled all generic fallback trash deposits | Subtract only the count of evidence-discard events from `remaining_trash_deposits`; add regression test for evidence plus `trash_deposit_count=2` | Resolved; targeted regression and full Python suite passed |
| Phase 4 | Tracker could keep an outside-source blob as the first point before source motion | Track outside-before-source contamination, reject later source noise with `motion_started_outside_source`, and never append those outside points | Resolved; targeted regression and full Python suite passed |
| Phase 4 | `docs/project.md` referenced non-existent `MotionTrajectoryBackend` | Changed architecture docs to name the implemented `TrajectoryTracker` | Resolved; `rg` no longer finds the stale name in project docs/README/src/tests |
## 2026-05-29 Phase Completed: Phase 4 - Documentation, Verification, And Deployment
Status: complete
Files Changed:
- `.gitignore`
- `README_zh.md`
- `docs/project.md`
- `findings.md`
- `memories.md`
- `progress.md`
- `task_plan.md`
- `src/cold_display_guard/engine.py`
- `src/cold_display_guard/vision.py`
- `tests/test_engine.py`
- `tests/test_vision.py`
Tests:
- `PYTHONPATH=src python3 -m unittest discover -s tests -v`: pass, 70 tests
- `node --test web/test/zone-state.test.js`: pass, 22 tests
- `cd web && pnpm build`: pass
- `rg -n "ultralytics|torch|onnxruntime|openvino|opencv|cv2|numpy" src tests pyproject.toml`: pass, no matches
- Remote Docker rebuild/restart: pass
- Remote API health: pass
- Remote diagnostics schema check: pass
Notes:
- Final review findings were fixed before redeploy.
- Remote sync continued to exclude `config/example.toml` to preserve camera/calibration settings.
- Remote diagnostics check intentionally printed only schema booleans and safe trajectory keys.
Risks:
- v1.2 is still heuristic motion tracking. Live precision should be tuned from diagnostics, and future YOLO integration should continue using the same `disposal_evidence` contract.
## 2026-05-29 Phase Completed: Phase 3 - Runtime Integration
Status: complete
Files Changed:
- `src/cold_display_guard/main.py`
- `config/example.toml`
- `tests/test_main.py`
- `tests/test_vision.py`
- `task_plan.md`
- `progress.md`
Tests:
- `PYTHONPATH=src python3 -m unittest tests.test_main tests.test_vision -v`: pass
- `PYTHONPATH=src python3 -m unittest discover -s tests -v`: pass
- `rg -n "ultralytics|torch|onnxruntime|openvino|opencv|cv2|numpy" src tests pyproject.toml`: pass, no matches
Notes:
- Runtime now passes trajectory `disposal_evidence` into `Observation`.
- Diagnostics rows include top-level serialized `disposal_evidence` and nested `diagnostics.trajectory`.
- Capture failure diagnostics keep the same trajectory/evidence schema.
- Runtime sleeps at `trajectory_sample_interval_seconds` while trajectory candidates are active.
Risks:
- No live-camera validation has run yet in this phase; deployment and remote runtime observation remain phase 4 work.
## 2026-05-29 Phase Completed: Phase 2 - Lightweight Motion Trajectory Backend
Status: complete
Files Changed:
- `src/cold_display_guard/vision.py`
- `tests/test_vision.py`
- `task_plan.md`
- `progress.md`
Tests:
- `PYTHONPATH=src python3 -m unittest tests.test_vision -v`: pass
- `PYTHONPATH=src python3 -m unittest discover -s tests -v`: pass
- `rg -n "ultralytics|torch|onnxruntime|openvino|opencv|cv2|numpy" src tests pyproject.toml`: pass, no matches
Notes:
- `TrajectoryTracker` now emits `DisposalEvidence` with `target=trash` and `method=motion`.
- Tracker uses frame-delta motion blobs, strict source-origin validation, target ROI validation, direction scoring, and per-candidate diagnostics.
- Multiple candidates cannot reuse the same frame-local blob for confirmation.
Risks:
- Tracker is implemented but not yet wired into `main.py`; phase 3 will integrate runtime observation, diagnostics, and faster active-candidate sampling.
## 2026-05-29 Phase Completed: Phase 1 - Data Contract And Engine Evidence Handling
Status: complete
Files Changed:
- `src/cold_display_guard/models.py`
- `src/cold_display_guard/engine.py`
- `tests/test_engine.py`
- `task_plan.md`
- `progress.md`
Tests:
- `PYTHONPATH=src python3 -m unittest tests.test_engine -v`: pass
- `PYTHONPATH=src python3 -m unittest discover -s tests -v`: pass
Notes:
- `Observation` now supports `disposal_evidence`.
- `BatchEngine` applies source-zone evidence before generic trash fallback and again after same-frame zone transitions.
- Evidence must meet confidence threshold and target the trash.
Risks:
- Threshold is currently a phase-1 constant; later runtime config integration will make it configurable.

View File

@@ -3,7 +3,11 @@ from __future__ import annotations
from datetime import datetime
from typing import Any
from cold_display_guard.models import Batch, EngineSettings, Observation
from cold_display_guard.models import Batch, DisposalEvidence, EngineSettings, Observation
DISPOSAL_EVIDENCE_CONFIDENCE_THRESHOLD = 0.72
TRASH_DISPOSAL_TARGETS = {"trash", "trash_bin"}
class BatchEngine:
@@ -20,8 +24,16 @@ class BatchEngine:
zone_counts = self._normalized_counts(observation.zone_counts)
previous_zone_counts = dict(self._zone_counts)
remaining_trash_deposits = observation.trash_deposit_count
used_disposal_evidence: set[int] = set()
events.extend(self._expire_pending_disposal(observation.ts))
evidence_events = self._apply_disposal_evidence(
observation.ts,
observation.disposal_evidence,
used_disposal_evidence,
)
remaining_trash_deposits = max(0, remaining_trash_deposits - len(evidence_events))
events.extend(evidence_events)
trash_events = self._apply_trash_deposits(observation.ts, remaining_trash_deposits)
remaining_trash_deposits = max(0, remaining_trash_deposits - len(trash_events))
events.extend(trash_events)
@@ -67,6 +79,13 @@ class BatchEngine:
self._zone_counts[zone_id] = new_count
newly_pending_count = max(0, len(self.pending_disposal) - pending_count_before_zone_transitions)
evidence_events = self._apply_disposal_evidence(
observation.ts,
observation.disposal_evidence,
used_disposal_evidence,
)
remaining_trash_deposits = max(0, remaining_trash_deposits - len(evidence_events))
events.extend(evidence_events)
trash_deposits_to_apply = remaining_trash_deposits
if remaining_trash_deposits > 0 and newly_pending_count > 1:
trash_deposits_to_apply = max(remaining_trash_deposits, newly_pending_count)
@@ -239,6 +258,39 @@ class BatchEngine:
deposit_count -= 1
return events
def _apply_disposal_evidence(
self,
when: datetime,
disposal_evidence: list[DisposalEvidence],
used_evidence_indices: set[int],
) -> list[dict[str, Any]]:
events: list[dict[str, Any]] = []
for index, evidence in enumerate(disposal_evidence):
if index in used_evidence_indices:
continue
if not self._is_confirming_disposal_evidence(evidence):
continue
pending_index = self._pending_index_for_source_zone(evidence.source_zone_id)
if pending_index is None:
continue
batch = self.pending_disposal.pop(pending_index)
batch.state = "discarded"
self.closed_batches.append(batch)
used_evidence_indices.add(index)
events.append(self._event("batch_discarded", when, batch, severity="info"))
return events
def _is_confirming_disposal_evidence(self, evidence: DisposalEvidence) -> bool:
if evidence.confidence < DISPOSAL_EVIDENCE_CONFIDENCE_THRESHOLD:
return False
return evidence.target.lower() in TRASH_DISPOSAL_TARGETS
def _pending_index_for_source_zone(self, source_zone_id: str) -> int | None:
for index, batch in enumerate(self.pending_disposal):
if batch.zone_id == source_zone_id:
return index
return None
def _expire_pending_disposal(self, when: datetime) -> list[dict[str, Any]]:
events: list[dict[str, Any]] = []
still_pending: list[Batch] = []

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
import argparse
import json
import time
from dataclasses import asdict
from datetime import datetime
from pathlib import Path
from zoneinfo import ZoneInfo
@@ -10,9 +11,10 @@ from zoneinfo import ZoneInfo
from cold_display_guard.config import load_config_document, load_settings, resolve_config_path, resolve_project_root
from cold_display_guard.engine import BatchEngine
from cold_display_guard.frame_source import FrameCaptureError, RTSPFrameSource
from cold_display_guard.models import Observation
from cold_display_guard.models import DisposalEvidence, Observation
from cold_display_guard.vision import (
RegionMetrics,
TrajectoryTracker,
ZoneOccupancyDetector,
load_regions,
load_runtime_vision_settings,
@@ -65,6 +67,7 @@ def run(config_path: str | Path, once: bool = False, max_iterations: int = 0) ->
)
vision_settings = load_runtime_vision_settings(config)
detector = ZoneOccupancyDetector(regions, trash_region, vision_settings)
trajectory_tracker = TrajectoryTracker(regions, trash_region, vision_settings)
engine = BatchEngine(settings)
baseline_seed, active_zone_counts = restore_runtime_state(diagnostics_path, config)
if baseline_seed:
@@ -87,7 +90,13 @@ def run(config_path: str | Path, once: bool = False, max_iterations: int = 0) ->
try:
frame = source.capture()
zone_counts, trash_deposit_count, diagnostics = detector.observe(frame, when)
observation = Observation(ts=when, zone_counts=zone_counts, trash_deposit_count=trash_deposit_count)
disposal_evidence, trajectory_diagnostics = trajectory_tracker.observe(frame, when, zone_counts)
observation = Observation(
ts=when,
zone_counts=zone_counts,
trash_deposit_count=trash_deposit_count,
disposal_evidence=disposal_evidence,
)
events = engine.process(observation)
append_jsonl(event_path, events)
append_jsonl(
@@ -97,7 +106,8 @@ def run(config_path: str | Path, once: bool = False, max_iterations: int = 0) ->
"ts": when.isoformat(),
"zone_counts": zone_counts,
"trash_deposit_count": trash_deposit_count,
"diagnostics": diagnostics,
"disposal_evidence": disposal_evidence_payloads(disposal_evidence),
"diagnostics": {**diagnostics, "trajectory": trajectory_diagnostics},
}
],
)
@@ -106,13 +116,32 @@ def run(config_path: str | Path, once: bool = False, max_iterations: int = 0) ->
except FrameCaptureError as exc:
append_jsonl(
diagnostics_path,
[{"ts": when.isoformat(), "error": "frame_capture_failed", "message": str(exc)}],
[
{
"ts": when.isoformat(),
"error": "frame_capture_failed",
"message": str(exc),
"disposal_evidence": [],
"diagnostics": {
"trajectory": {
"disabled": True,
"reason": "frame_capture_failed",
"emitted_evidence": 0,
}
},
}
],
)
print(f"{when.isoformat()} frame capture failed: {exc}")
if once or (max_iterations > 0 and iteration >= max_iterations):
break
time.sleep(sample_interval_seconds)
sleep_seconds = (
vision_settings.trajectory_sample_interval_seconds
if trajectory_tracker.has_active_candidates
else sample_interval_seconds
)
time.sleep(sleep_seconds)
def resolve_project_path(project_root: Path, raw_path: str) -> Path:
@@ -131,6 +160,10 @@ def append_jsonl(path: Path, payloads: list[dict]) -> None:
handle.write("\n")
def disposal_evidence_payloads(disposal_evidence: list[DisposalEvidence]) -> list[dict]:
return [asdict(item) for item in disposal_evidence]
def restore_runtime_state(diagnostics_path: Path, config: dict) -> tuple[dict[str, RegionMetrics], dict[str, int]]:
latest = load_jsonl_tail(diagnostics_path, 1)
if not latest:

View File

@@ -24,11 +24,39 @@ class EngineSettings:
return timedelta(seconds=self.trash_confirmation_seconds)
@dataclass(frozen=True, slots=True)
class DisposalEvidence:
source_zone_id: str
target: str
confidence: float
method: str
track_points: list[Any]
item_class: str | None
detector_score: float | None
observed_at: str | None = None
@classmethod
def from_dict(cls, payload: dict[str, Any]) -> "DisposalEvidence":
return cls(
source_zone_id=str(payload.get("source_zone_id", "")).strip(),
target=str(payload.get("target", "")).strip(),
confidence=_float_or_zero(payload.get("confidence", 0.0)),
method=str(payload.get("method", "")).strip(),
track_points=_normalize_track_points(payload.get("track_points", [])),
item_class=_optional_string(payload.get("item_class")),
detector_score=_optional_float(payload.get("detector_score")),
observed_at=_optional_string(
payload.get("observed_at", payload.get("detected_at", payload.get("ts")))
),
)
@dataclass(frozen=True, slots=True)
class Observation:
ts: datetime
zone_counts: dict[str, int]
trash_deposit_count: int = 0
disposal_evidence: list[DisposalEvidence] = field(default_factory=list)
@classmethod
def from_dict(cls, payload: dict[str, Any]) -> "Observation":
@@ -46,6 +74,7 @@ class Observation:
ts=ts,
zone_counts={key: max(0, int(value)) for key, value in payload["zone_counts"].items()},
trash_deposit_count=max(0, trash_deposit_count),
disposal_evidence=_normalize_disposal_evidence(payload.get("disposal_evidence", [])),
)
@@ -67,3 +96,47 @@ class Batch:
if self.ended_at is not None:
return self.dwell_seconds
return max(0, int((when - self.started_at).total_seconds()))
def _normalize_disposal_evidence(raw_evidence: Any) -> list[DisposalEvidence]:
if raw_evidence is None:
return []
if isinstance(raw_evidence, dict):
raw_items = [raw_evidence]
else:
raw_items = raw_evidence
return [
DisposalEvidence.from_dict(item)
for item in raw_items
if isinstance(item, dict)
]
def _normalize_track_points(raw_track_points: Any) -> list[Any]:
if isinstance(raw_track_points, list):
return list(raw_track_points)
if isinstance(raw_track_points, tuple):
return list(raw_track_points)
return []
def _float_or_zero(value: Any) -> float:
try:
return float(value)
except (TypeError, ValueError):
return 0.0
def _optional_float(value: Any) -> float | None:
if value is None:
return None
try:
return float(value)
except (TypeError, ValueError):
return None
def _optional_string(value: Any) -> str | None:
if value is None:
return None
return str(value).strip()

View File

@@ -2,8 +2,11 @@ from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timedelta
from math import ceil
from typing import Any
from cold_display_guard.models import DisposalEvidence
@dataclass(frozen=True, slots=True)
class Frame:
@@ -37,10 +40,29 @@ class RuntimeVisionSettings:
occupancy_reflection_bright_dark_ratio: float = 2.0
occupancy_confirm_frames: int = 2
empty_confirm_frames: int = 2
lighting_shift_guard_enabled: bool = True
lighting_shift_min_regions: int = 3
lighting_shift_region_fraction: float = 0.6
lighting_shift_mean_delta: float = 45.0
trash_motion_delta: float = 18.0
trash_sustained_motion_delta: float = 8.0
trash_sustained_motion_frames: int = 2
trash_motion_cooldown_seconds: int = 3
trajectory_enabled: bool = True
trajectory_window_seconds: int = 8
trajectory_sample_interval_seconds: float = 1.0
trajectory_min_points: int = 3
trajectory_segmented_enabled: bool = True
trajectory_segmented_min_points: int = 2
trajectory_min_confidence: float = 0.72
trajectory_motion_delta: float = 20.0
trajectory_min_blob_area: int = 12
trajectory_max_blob_area_fraction: float = 0.35
trajectory_trash_entry_margin: float = 0.04
trajectory_backend: str = "motion"
yolo_enabled: bool = False
yolo_model_path: str = ""
yolo_min_confidence: float = 0.65
@dataclass(frozen=True, slots=True)
@@ -52,6 +74,31 @@ class RegionMetrics:
bright_fraction: float = 0.0
@dataclass(slots=True)
class _MotionPoint:
blob_id: int
x: float
y: float
area: int
when: datetime
@dataclass(slots=True)
class _TrajectoryCandidate:
source_region: Region
opened_at: datetime
last_sample_at: datetime | None = None
points: list[_MotionPoint] | None = None
source_motion_seen: bool = False
pre_source_motion_seen: bool = False
source_seeded: bool = False
forced_rejection_reason: str | None = None
def __post_init__(self) -> None:
if self.points is None:
self.points = []
class ZoneOccupancyDetector:
def __init__(
self,
@@ -87,7 +134,12 @@ class ZoneOccupancyDetector:
self._update_baseline(metrics_by_region)
zone_counts: dict[str, int] = {}
diagnostics: dict[str, Any] = {"zones": {}, "baseline_ready": self.baseline_ready}
lighting_shift = self._lighting_shift(metrics_by_region)
diagnostics: dict[str, Any] = {
"zones": {},
"baseline_ready": self.baseline_ready,
"lighting_shift": lighting_shift,
}
for region in self.regions:
metrics = metrics_by_region[region.region_id]
baseline = self._baseline.get(region.region_id)
@@ -95,8 +147,12 @@ class ZoneOccupancyDetector:
if baseline is not None:
mean_delta = abs(metrics.mean_luma - baseline.mean_luma)
texture_delta = metrics.texture - baseline.texture
raw_occupied = self._raw_occupied(metrics, baseline, mean_delta, texture_delta)
occupied = self._confirmed_occupancy(region.region_id, raw_occupied)
if lighting_shift["active"]:
raw_occupied = False
occupied = self._stable_occupancy.get(region.region_id, False)
else:
raw_occupied = self._raw_occupied(metrics, baseline, mean_delta, texture_delta)
occupied = self._confirmed_occupancy(region.region_id, raw_occupied)
diagnostics["zones"][region.region_id] = {
"mean_luma": round(metrics.mean_luma, 3),
"baseline_mean_luma": round(baseline.mean_luma, 3),
@@ -113,6 +169,7 @@ class ZoneOccupancyDetector:
"occupied": occupied,
"occupied_streak": self._occupied_streaks[region.region_id],
"empty_streak": self._empty_streaks[region.region_id],
"lighting_shift_suppressed": lighting_shift["active"],
}
zone_counts[region.region_id] = 1 if occupied else 0
@@ -163,6 +220,60 @@ class ZoneOccupancyDetector:
if len(samples) >= self.settings.baseline_frames:
self._baseline[region_id] = average_metrics(samples)
def _lighting_shift(self, metrics_by_region: dict[str, RegionMetrics]) -> dict[str, Any]:
if not self.settings.lighting_shift_guard_enabled:
return self._lighting_shift_diagnostics(False, None, 0, 0, 0)
eligible_region_count = 0
darker_count = 0
brighter_count = 0
for region in self.regions:
metrics = metrics_by_region.get(region.region_id)
baseline = self._baseline.get(region.region_id)
if metrics is None or baseline is None:
continue
eligible_region_count += 1
signed_delta = metrics.mean_luma - baseline.mean_luma
if abs(signed_delta) < self.settings.lighting_shift_mean_delta:
continue
if signed_delta < 0:
darker_count += 1
else:
brighter_count += 1
required_regions = max(
self.settings.lighting_shift_min_regions,
ceil(eligible_region_count * self.settings.lighting_shift_region_fraction),
)
active_direction: str | None = None
shifted_count = max(darker_count, brighter_count)
if eligible_region_count >= self.settings.lighting_shift_min_regions and shifted_count >= required_regions:
active_direction = "darker" if darker_count >= brighter_count else "brighter"
return self._lighting_shift_diagnostics(
active_direction is not None,
active_direction,
shifted_count,
eligible_region_count,
required_regions,
)
def _lighting_shift_diagnostics(
self,
active: bool,
direction: str | None,
shifted_regions: int,
eligible_regions: int,
required_regions: int,
) -> dict[str, Any]:
return {
"active": active,
"direction": direction,
"shifted_regions": shifted_regions,
"eligible_regions": eligible_regions,
"required_regions": required_regions,
"mean_delta_threshold": self.settings.lighting_shift_mean_delta,
}
def _confirmed_occupancy(self, region_id: str, raw_occupied: bool) -> bool:
if raw_occupied:
self._occupied_streaks[region_id] = self._occupied_streaks.get(region_id, 0) + 1
@@ -212,6 +323,418 @@ class ZoneOccupancyDetector:
return 1 if deposit else 0
class TrajectoryTracker:
def __init__(
self,
regions: list[Region],
trash_region: Region | None,
settings: RuntimeVisionSettings | None = None,
) -> None:
self.regions = regions
self.trash_region = trash_region
self.settings = settings or RuntimeVisionSettings()
self._previous_frame: Frame | None = None
self._previous_zone_counts: dict[str, int] = {}
self._candidates: list[_TrajectoryCandidate] = []
self._recent_source_motion: dict[str, _MotionPoint] = {}
self._emitted_track_signatures: set[tuple[tuple[float, float], ...]] = set()
@property
def has_active_candidates(self) -> bool:
return bool(self._candidates)
def observe(
self,
frame: Frame,
when: datetime,
zone_counts: dict[str, int],
) -> tuple[list[DisposalEvidence], dict[str, Any]]:
diagnostics: dict[str, Any] = {
"active_candidates": len(self._candidates),
"emitted_evidence": 0,
"expired_candidates": 0,
"rejected_candidates": 0,
"emitted": [],
"rejected": [],
"expired": [],
"disabled": False,
"reason": None,
}
if not self.settings.trajectory_enabled:
diagnostics["disabled"] = True
diagnostics["reason"] = "trajectory_disabled"
self._remember(frame, zone_counts)
return [], diagnostics
if self.trash_region is None:
diagnostics["disabled"] = True
diagnostics["reason"] = "missing_trash_region"
self._remember(frame, zone_counts)
return [], diagnostics
if self.settings.trajectory_backend != "motion":
diagnostics["disabled"] = True
diagnostics["reason"] = "unsupported_trajectory_backend"
self._remember(frame, zone_counts)
return [], diagnostics
blobs = self._motion_points(frame, when) if self._previous_frame is not None else []
self._remember_recent_source_motion(blobs, when, zone_counts)
self._open_candidates(when, zone_counts)
emitted: list[DisposalEvidence] = []
remaining: list[_TrajectoryCandidate] = []
consumed_blob_ids: set[int] = set()
emitted_track_signatures = set(self._emitted_track_signatures)
for candidate in self._candidates:
rejected_reason = self._sample_candidate(candidate, blobs, when, consumed_blob_ids)
if rejected_reason is not None:
diagnostics["rejected_candidates"] += 1
diagnostics["rejected"].append(self._candidate_event(candidate, rejected_reason))
continue
if when - candidate.opened_at > timedelta(seconds=self.settings.trajectory_window_seconds):
diagnostics["expired_candidates"] += 1
diagnostics["expired"].append(self._candidate_event(candidate, "expired"))
if not self._candidate_ready(candidate):
diagnostics["rejected_candidates"] += 1
diagnostics["rejected"].append(self._candidate_event(candidate, self._rejection_reason(candidate)))
else:
signature = self._track_signature(candidate)
if signature in emitted_track_signatures:
diagnostics["rejected_candidates"] += 1
diagnostics["rejected"].append(self._candidate_event(candidate, "ambiguous_motion_track"))
else:
emitted.append(self._evidence(candidate, when))
emitted_track_signatures.add(signature)
self._emitted_track_signatures.add(signature)
diagnostics["emitted"].append(self._candidate_event(candidate, "emitted"))
continue
if self._candidate_reached_trash(candidate):
if self._candidate_ready(candidate):
signature = self._track_signature(candidate)
if signature in emitted_track_signatures:
diagnostics["rejected_candidates"] += 1
diagnostics["rejected"].append(self._candidate_event(candidate, "ambiguous_motion_track"))
else:
emitted.append(self._evidence(candidate, when))
emitted_track_signatures.add(signature)
self._emitted_track_signatures.add(signature)
diagnostics["emitted"].append(self._candidate_event(candidate, "emitted"))
else:
diagnostics["rejected_candidates"] += 1
diagnostics["rejected"].append(self._candidate_event(candidate, self._rejection_reason(candidate)))
continue
remaining.append(candidate)
self._candidates = remaining
if not self._candidates:
self._emitted_track_signatures.clear()
diagnostics["emitted_evidence"] = len(emitted)
diagnostics["active_candidates"] = len(self._candidates)
diagnostics["motion_points"] = len(blobs)
self._remember(frame, zone_counts)
return emitted, diagnostics
def _remember(self, frame: Frame, zone_counts: dict[str, int]) -> None:
self._previous_frame = frame
self._previous_zone_counts = {region_id: max(0, int(count)) for region_id, count in zone_counts.items()}
def _open_candidates(self, when: datetime, zone_counts: dict[str, int]) -> None:
active_region_ids = {candidate.source_region.region_id for candidate in self._candidates}
used_seed_signatures: set[tuple[float, float, str]] = set()
for region in self.regions:
previous = self._previous_zone_counts.get(region.region_id, 0)
current = max(0, int(zone_counts.get(region.region_id, 0)))
if previous > 0 and current == 0 and region.region_id not in active_region_ids:
candidate = _TrajectoryCandidate(source_region=region, opened_at=when)
recent = self._recent_source_motion.get(region.region_id)
if recent is not None and when - recent.when <= timedelta(seconds=self.settings.trajectory_window_seconds):
signature = (round(recent.x, 4), round(recent.y, 4), recent.when.isoformat())
if signature in used_seed_signatures:
candidate.forced_rejection_reason = "ambiguous_motion_track"
else:
used_seed_signatures.add(signature)
candidate.points.append(recent)
candidate.source_motion_seen = True
candidate.source_seeded = True
candidate.last_sample_at = recent.when
self._candidates.append(candidate)
self._recent_source_motion.pop(region.region_id, None)
def _remember_recent_source_motion(self, blobs: list[_MotionPoint], when: datetime, zone_counts: dict[str, int]) -> None:
if not blobs:
return
for region in self.regions:
previous = self._previous_zone_counts.get(region.region_id, 0)
current = max(0, int(zone_counts.get(region.region_id, 0)))
if previous <= 0 and current <= 0:
continue
source_blobs = [blob for blob in blobs if region_contains(region, blob.x, blob.y)]
if not source_blobs:
continue
self._recent_source_motion[region.region_id] = _nearest_point(region_center(region), source_blobs)
def _motion_points(self, frame: Frame, when: datetime) -> list[_MotionPoint]:
previous = self._previous_frame
if previous is None or previous.width != frame.width or previous.height != frame.height:
return []
width = frame.width
height = frame.height
changed = bytearray(width * height)
threshold = self.settings.trajectory_motion_delta
for y in range(height):
for x in range(width):
offset = (y * width + x) * 3
current_luma = _luma(frame.rgb[offset], frame.rgb[offset + 1], frame.rgb[offset + 2])
previous_luma = _luma(previous.rgb[offset], previous.rgb[offset + 1], previous.rgb[offset + 2])
if abs(current_luma - previous_luma) >= threshold:
changed[y * width + x] = 1
min_area = max(1, int(self.settings.trajectory_min_blob_area))
max_area = max(min_area, int(width * height * self.settings.trajectory_max_blob_area_fraction))
points: list[_MotionPoint] = []
next_blob_id = 0
for start in range(width * height):
if not changed[start]:
continue
stack = [start]
changed[start] = 0
area = 0
sum_x = 0
sum_y = 0
while stack:
index = stack.pop()
x = index % width
y = index // width
area += 1
sum_x += x
sum_y += y
if x > 0:
neighbor = index - 1
if changed[neighbor]:
changed[neighbor] = 0
stack.append(neighbor)
if x + 1 < width:
neighbor = index + 1
if changed[neighbor]:
changed[neighbor] = 0
stack.append(neighbor)
if y > 0:
neighbor = index - width
if changed[neighbor]:
changed[neighbor] = 0
stack.append(neighbor)
if y + 1 < height:
neighbor = index + width
if changed[neighbor]:
changed[neighbor] = 0
stack.append(neighbor)
if min_area <= area <= max_area:
points.append(
_MotionPoint(
blob_id=next_blob_id,
x=(sum_x / area + 0.5) / width,
y=(sum_y / area + 0.5) / height,
area=area,
when=when,
)
)
next_blob_id += 1
return points
def _sample_candidate(
self,
candidate: _TrajectoryCandidate,
blobs: list[_MotionPoint],
when: datetime,
consumed_blob_ids: set[int],
) -> str | None:
if candidate.forced_rejection_reason is not None:
return candidate.forced_rejection_reason
if not blobs:
return None
if (
candidate.last_sample_at is not None
and (when - candidate.last_sample_at).total_seconds() < self.settings.trajectory_sample_interval_seconds
):
return None
if not candidate.source_motion_seen:
source_blobs = [
blob for blob in blobs if region_contains(candidate.source_region, blob.x, blob.y)
]
if source_blobs:
available_source_blobs = [blob for blob in source_blobs if blob.blob_id not in consumed_blob_ids]
if not available_source_blobs:
return "ambiguous_motion_track"
if candidate.pre_source_motion_seen:
return "motion_started_outside_source"
candidate.source_motion_seen = True
point = _nearest_point(region_center(candidate.source_region), available_source_blobs)
else:
available_blobs = [blob for blob in blobs if blob.blob_id not in consumed_blob_ids]
if not available_blobs:
return None
candidate.pre_source_motion_seen = True
if any(
region_contains(self.trash_region, blob.x, blob.y, margin=self.settings.trajectory_trash_entry_margin)
for blob in available_blobs
if self.trash_region is not None
):
return "motion_started_outside_source"
return None
else:
previous = candidate.points[-1] if candidate.points else None
available_blobs = [blob for blob in blobs if blob.blob_id not in consumed_blob_ids]
if not available_blobs:
return None
point = self._next_progress_point(candidate, available_blobs, previous)
if candidate.points and _distance((candidate.points[-1].x, candidate.points[-1].y), (point.x, point.y)) < 0.015:
return None
candidate.points.append(point)
consumed_blob_ids.add(point.blob_id)
candidate.last_sample_at = when
return None
def _next_progress_point(
self,
candidate: _TrajectoryCandidate,
blobs: list[_MotionPoint],
previous: _MotionPoint | None,
) -> _MotionPoint:
if self.trash_region is None:
return blobs[0]
source = region_center(candidate.source_region)
target = region_center(self.trash_region)
expected = (target[0] - source[0], target[1] - source[1])
expected_length = (expected[0] ** 2 + expected[1] ** 2) ** 0.5
if expected_length <= 1e-9:
origin = (previous.x, previous.y) if previous is not None else source
return _nearest_point(origin, blobs)
unit = (expected[0] / expected_length, expected[1] / expected_length)
origin = (previous.x, previous.y) if previous is not None else source
def score(point: _MotionPoint) -> float:
dx = point.x - origin[0]
dy = point.y - origin[1]
projection = dx * unit[0] + dy * unit[1]
perpendicular = abs(dx * unit[1] - dy * unit[0])
return projection - 0.25 * perpendicular
return max(blobs, key=score)
def _candidate_reached_trash(self, candidate: _TrajectoryCandidate) -> bool:
points = candidate.points or []
return any(
region_contains(self.trash_region, point.x, point.y, margin=self.settings.trajectory_trash_entry_margin)
for point in points
if self.trash_region is not None
)
def _candidate_ready(self, candidate: _TrajectoryCandidate) -> bool:
confidence = self._confidence(candidate)
return (
candidate.source_motion_seen
and self._candidate_reached_trash(candidate)
and self._has_enough_track_points(candidate)
and self._direction_score(candidate) >= 0.35
and confidence >= self.settings.trajectory_min_confidence
)
def _rejection_reason(self, candidate: _TrajectoryCandidate) -> str:
if not candidate.source_motion_seen:
return "missing_source_motion"
if not self._candidate_reached_trash(candidate):
return "did_not_reach_trash"
if not self._has_enough_track_points(candidate):
return "insufficient_points"
if self._direction_score(candidate) < 0.35:
return "bad_direction"
if self._confidence(candidate) < self.settings.trajectory_min_confidence:
return "low_confidence"
return "rejected"
def _has_enough_track_points(self, candidate: _TrajectoryCandidate) -> bool:
point_count = len(candidate.points or [])
if point_count >= self.settings.trajectory_min_points:
return True
return (
self.settings.trajectory_segmented_enabled
and point_count >= self.settings.trajectory_segmented_min_points
and candidate.source_motion_seen
and self._candidate_reached_trash(candidate)
)
def _candidate_event(self, candidate: _TrajectoryCandidate, reason: str) -> dict[str, Any]:
return {
"source_zone_id": candidate.source_region.region_id,
"reason": reason,
"point_count": len(candidate.points or []),
"confidence": round(self._confidence(candidate), 3),
"direction_score": round(self._direction_score(candidate), 3),
"segmented": self._is_segmented_track(candidate),
"source_seeded": candidate.source_seeded,
}
def _is_segmented_track(self, candidate: _TrajectoryCandidate) -> bool:
point_count = len(candidate.points or [])
return (
self.settings.trajectory_segmented_enabled
and self.settings.trajectory_segmented_min_points <= point_count < self.settings.trajectory_min_points
and candidate.source_motion_seen
and self._candidate_reached_trash(candidate)
)
def _track_signature(self, candidate: _TrajectoryCandidate) -> tuple[tuple[float, float], ...]:
return tuple((round(point.x, 4), round(point.y, 4)) for point in candidate.points or [])
def _confidence(self, candidate: _TrajectoryCandidate) -> float:
point_count = len(candidate.points or [])
point_score = min(1.0, point_count / max(1, self.settings.trajectory_min_points))
source_score = 1.0 if candidate.source_motion_seen else 0.0
trash_score = 1.0 if self._candidate_reached_trash(candidate) else 0.0
direction_score = max(0.0, self._direction_score(candidate))
return min(1.0, 0.20 * source_score + 0.35 * trash_score + 0.25 * direction_score + 0.20 * point_score)
def _direction_score(self, candidate: _TrajectoryCandidate) -> float:
points = candidate.points or []
if len(points) < 2 or self.trash_region is None:
return 0.0
start = points[0]
end = points[-1]
motion = (end.x - start.x, end.y - start.y)
motion_length = (motion[0] ** 2 + motion[1] ** 2) ** 0.5
if motion_length <= 1e-9:
return 0.0
source = region_center(candidate.source_region)
target = region_center(self.trash_region)
expected = (target[0] - source[0], target[1] - source[1])
expected_length = (expected[0] ** 2 + expected[1] ** 2) ** 0.5
if expected_length <= 1e-9:
return 0.0
return (motion[0] * expected[0] + motion[1] * expected[1]) / (motion_length * expected_length)
def _evidence(self, candidate: _TrajectoryCandidate, when: datetime) -> DisposalEvidence:
return DisposalEvidence(
source_zone_id=candidate.source_region.region_id,
target="trash",
confidence=round(self._confidence(candidate), 3),
method="motion",
track_points=[
{
"x": round(point.x, 4),
"y": round(point.y, 4),
"area": point.area,
"observed_at": point.when.isoformat(),
}
for point in candidate.points or []
],
item_class=None,
detector_score=None,
observed_at=when.isoformat(),
)
def load_regions(config: dict[str, Any]) -> tuple[list[Region], Region | None]:
regions: list[Region] = []
for zone in config.get("zones", []):
@@ -243,10 +766,32 @@ def load_runtime_vision_settings(config: dict[str, Any]) -> RuntimeVisionSetting
occupancy_reflection_bright_dark_ratio=float(runtime.get("occupancy_reflection_bright_dark_ratio", 2.0)),
occupancy_confirm_frames=max(1, int(runtime.get("occupancy_confirm_frames", 2))),
empty_confirm_frames=max(1, int(runtime.get("empty_confirm_frames", 2))),
lighting_shift_guard_enabled=bool(runtime.get("lighting_shift_guard_enabled", True)),
lighting_shift_min_regions=max(1, int(runtime.get("lighting_shift_min_regions", 3))),
lighting_shift_region_fraction=max(0.0, min(1.0, float(runtime.get("lighting_shift_region_fraction", 0.6)))),
lighting_shift_mean_delta=float(runtime.get("lighting_shift_mean_delta", 45.0)),
trash_motion_delta=float(runtime.get("trash_motion_delta", 18.0)),
trash_sustained_motion_delta=float(runtime.get("trash_sustained_motion_delta", 8.0)),
trash_sustained_motion_frames=max(1, int(runtime.get("trash_sustained_motion_frames", 2))),
trash_motion_cooldown_seconds=max(0, int(runtime.get("trash_motion_cooldown_seconds", 3))),
trajectory_enabled=bool(runtime.get("trajectory_enabled", True)),
trajectory_window_seconds=max(1, int(runtime.get("trajectory_window_seconds", 8))),
trajectory_sample_interval_seconds=max(0.0, float(runtime.get("trajectory_sample_interval_seconds", 1.0))),
trajectory_min_points=max(1, int(runtime.get("trajectory_min_points", 3))),
trajectory_segmented_enabled=bool(runtime.get("trajectory_segmented_enabled", True)),
trajectory_segmented_min_points=max(2, int(runtime.get("trajectory_segmented_min_points", 2))),
trajectory_min_confidence=float(runtime.get("trajectory_min_confidence", 0.72)),
trajectory_motion_delta=float(runtime.get("trajectory_motion_delta", 20.0)),
trajectory_min_blob_area=max(1, int(runtime.get("trajectory_min_blob_area", 12))),
trajectory_max_blob_area_fraction=max(
0.0,
min(1.0, float(runtime.get("trajectory_max_blob_area_fraction", 0.35))),
),
trajectory_trash_entry_margin=max(0.0, float(runtime.get("trajectory_trash_entry_margin", 0.04))),
trajectory_backend=str(runtime.get("trajectory_backend", "motion")),
yolo_enabled=bool(runtime.get("yolo_enabled", False)),
yolo_model_path=str(runtime.get("yolo_model_path", "")),
yolo_min_confidence=float(runtime.get("yolo_min_confidence", 0.65)),
)
@@ -310,6 +855,37 @@ def average_metrics(samples: list[RegionMetrics]) -> RegionMetrics:
)
def region_center(region: Region) -> tuple[float, float]:
return (
sum(point[0] for point in region.polygon) / len(region.polygon),
sum(point[1] for point in region.polygon) / len(region.polygon),
)
def region_contains(region: Region, x: float, y: float, margin: float = 0.0) -> bool:
if margin <= 0:
return point_in_polygon(x, y, region.polygon)
xs = [point[0] for point in region.polygon]
ys = [point[1] for point in region.polygon]
if x < min(xs) - margin or x > max(xs) + margin or y < min(ys) - margin or y > max(ys) + margin:
return False
return point_in_polygon(x, y, region.polygon) or (
min(xs) - margin <= x <= max(xs) + margin and min(ys) - margin <= y <= max(ys) + margin
)
def _nearest_point(origin: tuple[float, float], points: list[_MotionPoint]) -> _MotionPoint:
return min(points, key=lambda point: _distance(origin, (point.x, point.y)))
def _distance(first: tuple[float, float], second: tuple[float, float]) -> float:
return ((first[0] - second[0]) ** 2 + (first[1] - second[1]) ** 2) ** 0.5
def _luma(r: int, g: int, b: int) -> float:
return 0.299 * r + 0.587 * g + 0.114 * b
def metrics_indicate_occupied(
settings: RuntimeVisionSettings,
mean_delta: float,

View File

@@ -76,3 +76,42 @@ Create and evolve an independent git project under `~/Code` for monitoring food
```
其中 `阶段 x` 表示同一 `v1.1 优化改造` 批次内的工作阶段,不代表拆分成独立批次。
## v1.2 轨迹识别
### Goal
`/Users/yoilun/Code/cold_display_guard` 中完成轨迹识别改造:保留现有 ROI 占用计时和垃圾桶动作兜底,新增轻量轨迹移动检测,输出可被未来 YOLO 物品识别模型复用的统一 `disposal_evidence`,让报警后移出的物品按来源区域确认是否进入垃圾桶。
### Stop Conditions
- [x] v1.2 所有阶段完成。
- [x] 必要 Python 测试通过。
- [x] 前端测试或构建在受影响时通过。
- [x] `docs/project.md` 记录 v1.2 架构、配置、运行方式和关键决策。
- [x] 没有 blocking bug 或未处理的高风险问题。
- [x] 如果同一问题连续 3 次修复仍失败,暂停并报告原因、已尝试方案和建议下一步。
### Phases
| Phase | Status | Goal | Acceptance Criteria |
| --- | --- | --- | --- |
| 1 | complete | 建立 `disposal_evidence` 数据契约并让状态机优先按来源区域丢弃 | `Observation` 支持 evidenceengine 能按 `source_zone_id` 精确关闭 pending batch同帧移除+evidence 有回归测试;旧 `trash_deposit_count` 仍可兜底 |
| 2 | complete | 实现无 YOLO 依赖的轻量轨迹检测 | synthetic frame 测试覆盖源区域到垃圾桶、非源区域运动、未到垃圾桶、单帧反光、多候选互不串扰;不引入模型依赖 |
| 3 | complete | 集成 runtime 配置、诊断和候选窗口加速采样 | `main.py` 写入 `disposal_evidence` 与 trajectory diagnostics配置默认 `trajectory_enabled=true``yolo_enabled=false`;候选活跃时使用更短采样间隔 |
| 4 | complete | 文档、全量验证和部署准备 | README/project/progress 更新Python 全量测试通过;前端测试/构建按影响范围验证;远端部署命令和风险记录清楚 |
### v1.2 Decisions
- 第一版使用 `MotionTrajectoryBackend`,不安装 YOLO、PyTorch、ONNX Runtime 或 OpenVINO。
- YOLO 作为后续 `YoloDetectionBackend` 接入统一 evidence contract不能绕过轨迹校验直接关闭业务事件。
- 状态机只消费 `disposal_evidence`,不依赖具体视觉后端。
- 轨迹 evidence 优先级高于 FIFO 垃圾桶动作兜底。
- 子 agent 派发必须使用标准上下文头:
```text
[项目: /Users/yoilun/Code/cold_display_guard]
[工作流批次: v1.2 轨迹识别]
[阶段: 阶段 x]
[角色: 对应智能体角色]
```

View File

@@ -9,16 +9,39 @@ from cold_display_guard import BatchEngine, EngineSettings, Observation
UTC = timezone.utc
def obs(ts: datetime, counts: dict[str, int], trash: bool | int = False) -> Observation:
def obs(
ts: datetime,
counts: dict[str, int],
trash: bool | int = False,
disposal_evidence: list[dict[str, object]] | None = None,
) -> Observation:
return Observation.from_dict(
{
"ts": ts.isoformat(),
"zone_counts": counts,
"trash_deposit": trash,
"disposal_evidence": disposal_evidence or [],
}
)
def disposal_evidence(
source_zone_id: str,
confidence: float = 0.93,
target: str = "trash_bin",
) -> dict[str, object]:
return {
"source_zone_id": source_zone_id,
"target": target,
"confidence": confidence,
"method": "trajectory",
"track_points": [{"x": 101, "y": 202, "ts": "2026-04-27T10:20:01+00:00"}],
"item_class": "prepared_food",
"detector_score": 0.88,
"observed_at": "2026-04-27T10:20:02+00:00",
}
class BatchEngineTests(unittest.TestCase):
def setUp(self) -> None:
self.settings = EngineSettings(
@@ -80,6 +103,227 @@ class BatchEngineTests(unittest.TestCase):
self.assertEqual([event["event"] for event in events], ["batch_discarded"])
def test_observation_from_dict_normalizes_disposal_evidence(self) -> None:
observation = Observation.from_dict(
{
"ts": self.t0.isoformat(),
"zone_counts": {"1": "2"},
"disposal_evidence": [
{
"source_zone_id": 1,
"target": "trash_bin",
"confidence": "0.83",
"method": "trajectory",
"track_points": [{"x": 1, "y": 2}],
"item_class": "prepared_food",
"detector_score": "0.91",
"observed_at": "2026-04-27T10:00:01+00:00",
}
],
}
)
evidence = observation.disposal_evidence[0]
self.assertEqual(evidence.source_zone_id, "1")
self.assertEqual(evidence.target, "trash_bin")
self.assertAlmostEqual(evidence.confidence, 0.83)
self.assertEqual(evidence.method, "trajectory")
self.assertEqual(evidence.track_points, [{"x": 1, "y": 2}])
self.assertEqual(evidence.item_class, "prepared_food")
self.assertAlmostEqual(evidence.detector_score, 0.91)
self.assertEqual(evidence.observed_at, "2026-04-27T10:00:01+00:00")
def test_observation_from_dict_preserves_null_optional_disposal_fields(self) -> None:
observation = Observation.from_dict(
{
"ts": self.t0.isoformat(),
"zone_counts": {"1": 1},
"disposal_evidence": [
{
"source_zone_id": "1",
"target": "trash_bin",
"confidence": 0.83,
"method": "trajectory",
"track_points": [],
"item_class": None,
"detector_score": None,
}
],
}
)
evidence = observation.disposal_evidence[0]
self.assertIsNone(evidence.item_class)
self.assertIsNone(evidence.detector_score)
def test_matching_disposal_evidence_discards_pending_batch(self) -> None:
settings = EngineSettings(
camera_id="test_cam",
max_dwell_seconds=1200,
trash_confirmation_seconds=120,
zone_ids=("1",),
)
engine = BatchEngine(settings)
engine.process(obs(self.t0, {"1": 1}))
engine.process(obs(self.t0 + timedelta(seconds=1200), {"1": 1}))
engine.process(obs(self.t0 + timedelta(seconds=1300), {"1": 0}))
events = engine.process(
obs(
self.t0 + timedelta(seconds=1310),
{"1": 0},
disposal_evidence=[disposal_evidence("1")],
)
)
self.assertEqual([event["event"] for event in events], ["batch_discarded"])
self.assertEqual(events[0]["zone_id"], "1")
self.assertEqual(engine.pending_disposal, [])
def test_disposal_evidence_for_another_zone_does_not_discard_wrong_pending_batch(self) -> None:
settings = EngineSettings(
camera_id="test_cam",
max_dwell_seconds=1200,
trash_confirmation_seconds=120,
zone_ids=("1", "4"),
)
engine = BatchEngine(settings)
engine.process(obs(self.t0, {"1": 1, "4": 0}))
engine.process(obs(self.t0 + timedelta(seconds=1200), {"1": 1, "4": 0}))
engine.process(obs(self.t0 + timedelta(seconds=1300), {"1": 0, "4": 0}))
events = engine.process(
obs(
self.t0 + timedelta(seconds=1310),
{"1": 0, "4": 0},
disposal_evidence=[disposal_evidence("4")],
)
)
self.assertEqual(events, [])
self.assertEqual([batch.zone_id for batch in engine.pending_disposal], ["1"])
def test_non_trash_disposal_evidence_target_is_ignored(self) -> None:
settings = EngineSettings(
camera_id="test_cam",
max_dwell_seconds=1200,
trash_confirmation_seconds=120,
zone_ids=("1",),
)
engine = BatchEngine(settings)
engine.process(obs(self.t0, {"1": 1}))
engine.process(obs(self.t0 + timedelta(seconds=1200), {"1": 1}))
engine.process(obs(self.t0 + timedelta(seconds=1300), {"1": 0}))
events = engine.process(
obs(
self.t0 + timedelta(seconds=1310),
{"1": 0},
disposal_evidence=[disposal_evidence("1", target="customer_hand")],
)
)
self.assertEqual(events, [])
self.assertEqual([batch.zone_id for batch in engine.pending_disposal], ["1"])
def test_disposal_evidence_and_trash_count_do_not_double_consume_same_signal(self) -> None:
settings = EngineSettings(
camera_id="test_cam",
max_dwell_seconds=1200,
trash_confirmation_seconds=120,
zone_ids=("1", "4"),
)
engine = BatchEngine(settings)
engine.process(obs(self.t0, {"1": 1, "4": 1}))
engine.process(obs(self.t0 + timedelta(seconds=1200), {"1": 1, "4": 1}))
engine.process(obs(self.t0 + timedelta(seconds=1300), {"1": 0, "4": 0}))
events = engine.process(
obs(
self.t0 + timedelta(seconds=1310),
{"1": 0, "4": 0},
trash=True,
disposal_evidence=[disposal_evidence("4")],
)
)
self.assertEqual([(event["event"], event["zone_id"]) for event in events], [("batch_discarded", "4")])
self.assertEqual([batch.zone_id for batch in engine.pending_disposal], ["1"])
def test_extra_trash_deposits_still_fallback_after_matching_disposal_evidence(self) -> None:
settings = EngineSettings(
camera_id="test_cam",
max_dwell_seconds=1200,
trash_confirmation_seconds=120,
zone_ids=("1", "2"),
)
engine = BatchEngine(settings)
engine.process(obs(self.t0, {"1": 1, "2": 1}))
engine.process(obs(self.t0 + timedelta(seconds=1200), {"1": 1, "2": 1}))
engine.process(obs(self.t0 + timedelta(seconds=1300), {"1": 0, "2": 0}))
events = engine.process(
obs(
self.t0 + timedelta(seconds=1310),
{"1": 0, "2": 0},
trash=2,
disposal_evidence=[disposal_evidence("1")],
)
)
self.assertEqual(
[(event["event"], event["zone_id"]) for event in events],
[("batch_discarded", "1"), ("batch_discarded", "2")],
)
self.assertEqual(engine.pending_disposal, [])
def test_same_observation_removal_and_disposal_evidence_discards_newly_pending_batch(self) -> None:
settings = EngineSettings(
camera_id="test_cam",
max_dwell_seconds=1200,
trash_confirmation_seconds=120,
zone_ids=("1",),
)
engine = BatchEngine(settings)
engine.process(obs(self.t0, {"1": 1}))
engine.process(obs(self.t0 + timedelta(seconds=1200), {"1": 1}))
events = engine.process(
obs(
self.t0 + timedelta(seconds=1300),
{"1": 0},
disposal_evidence=[disposal_evidence("1")],
)
)
later_events = engine.process(obs(self.t0 + timedelta(seconds=1421), {"1": 0}))
self.assertEqual([event["event"] for event in events], ["batch_pending_disposal", "batch_discarded"])
self.assertEqual(events[1]["zone_id"], "1")
self.assertEqual(later_events, [])
def test_low_confidence_disposal_evidence_is_ignored(self) -> None:
settings = EngineSettings(
camera_id="test_cam",
max_dwell_seconds=1200,
trash_confirmation_seconds=120,
zone_ids=("1",),
)
engine = BatchEngine(settings)
engine.process(obs(self.t0, {"1": 1}))
engine.process(obs(self.t0 + timedelta(seconds=1200), {"1": 1}))
engine.process(obs(self.t0 + timedelta(seconds=1300), {"1": 0}))
events = engine.process(
obs(
self.t0 + timedelta(seconds=1310),
{"1": 0},
disposal_evidence=[disposal_evidence("1", confidence=0.71)],
)
)
self.assertEqual(events, [])
self.assertEqual([batch.zone_id for batch in engine.pending_disposal], ["1"])
def test_missing_trash_deposit_escalates_warning_after_deadline(self) -> None:
self.engine.process(obs(self.t0, {"r1c1": 2}))
self.engine.process(obs(self.t0 + timedelta(seconds=11), {"r1c1": 0}))

View File

@@ -3,9 +3,14 @@ from __future__ import annotations
import json
import tempfile
import unittest
from datetime import datetime
from pathlib import Path
from unittest.mock import patch
from cold_display_guard.main import restore_runtime_state
from cold_display_guard.frame_source import FrameCaptureError
from cold_display_guard.main import run, restore_runtime_state
from cold_display_guard.models import DisposalEvidence
from cold_display_guard.vision import Frame
class RuntimeRestoreTests(unittest.TestCase):
@@ -109,5 +114,187 @@ class RuntimeRestoreTests(unittest.TestCase):
self.assertEqual(baselines["4"].bright_fraction, 0.0)
class RuntimeLoopTests(unittest.TestCase):
def test_run_writes_disposal_evidence_and_trajectory_diagnostics(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
config_path, diagnostics_path = write_runtime_config(tmpdir)
captured_observations = []
tracker_calls = []
class FakeSource:
def __init__(self, **kwargs: object) -> None:
pass
def capture(self) -> Frame:
return Frame(width=2, height=2, rgb=bytes([0, 0, 0]) * 4)
class FakeDetector:
def __init__(self, *args: object) -> None:
pass
def observe(self, frame: Frame, when: datetime) -> tuple[dict[str, int], int, dict[str, object]]:
return {"1": 0}, 0, {"zones": {"1": {"occupied": False}}}
class FakeTracker:
def __init__(self, *args: object) -> None:
self.has_active_candidates = False
def observe(
self,
frame: Frame,
when: datetime,
zone_counts: dict[str, int],
) -> tuple[list[DisposalEvidence], dict[str, object]]:
tracker_calls.append(zone_counts)
return [
DisposalEvidence(
source_zone_id="1",
target="trash",
confidence=0.9,
method="motion",
track_points=[{"x": 0.1, "y": 0.2}],
item_class=None,
detector_score=None,
observed_at=when.isoformat(),
)
], {"active_candidates": 0, "emitted_evidence": 1}
class FakeEngine:
def __init__(self, settings: object) -> None:
pass
def process(self, observation: object) -> list[dict[str, object]]:
captured_observations.append(observation)
return []
with patch("cold_display_guard.main.RTSPFrameSource", FakeSource), patch(
"cold_display_guard.main.ZoneOccupancyDetector", FakeDetector
), patch("cold_display_guard.main.TrajectoryTracker", FakeTracker), patch(
"cold_display_guard.main.BatchEngine", FakeEngine
):
run(config_path, max_iterations=1)
diagnostics = [json.loads(line) for line in diagnostics_path.read_text(encoding="utf-8").splitlines()]
self.assertEqual(len(captured_observations), 1)
self.assertEqual(tracker_calls, [{"1": 0}])
self.assertEqual(captured_observations[0].disposal_evidence[0].source_zone_id, "1")
self.assertEqual(diagnostics[0]["disposal_evidence"][0]["source_zone_id"], "1")
self.assertEqual(diagnostics[0]["disposal_evidence"][0]["target"], "trash")
self.assertEqual(diagnostics[0]["diagnostics"]["trajectory"]["emitted_evidence"], 1)
def test_run_uses_trajectory_sample_interval_when_candidates_are_active(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
config_path, _ = write_runtime_config(tmpdir, sample_interval=5.0, trajectory_interval=1.0)
sleeps = []
tracker_calls = []
class FakeSource:
def __init__(self, **kwargs: object) -> None:
pass
def capture(self) -> Frame:
return Frame(width=2, height=2, rgb=bytes([0, 0, 0]) * 4)
class FakeDetector:
def __init__(self, *args: object) -> None:
pass
def observe(self, frame: Frame, when: datetime) -> tuple[dict[str, int], int, dict[str, object]]:
return {"1": 0}, 0, {}
class FakeTracker:
def __init__(self, *args: object) -> None:
self.has_active_candidates = False
def observe(
self,
frame: Frame,
when: datetime,
zone_counts: dict[str, int],
) -> tuple[list[DisposalEvidence], dict[str, object]]:
tracker_calls.append(zone_counts)
self.has_active_candidates = True
return [], {"active_candidates": 1}
with patch("cold_display_guard.main.RTSPFrameSource", FakeSource), patch(
"cold_display_guard.main.ZoneOccupancyDetector", FakeDetector
), patch("cold_display_guard.main.TrajectoryTracker", FakeTracker), patch(
"cold_display_guard.main.time.sleep", sleeps.append
):
run(config_path, max_iterations=2)
self.assertEqual(tracker_calls, [{"1": 0}, {"1": 0}])
self.assertEqual(sleeps, [1.0])
def test_capture_failure_diagnostics_keep_trajectory_schema(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
config_path, diagnostics_path = write_runtime_config(tmpdir)
class FailingSource:
def __init__(self, **kwargs: object) -> None:
pass
def capture(self) -> Frame:
raise FrameCaptureError("camera offline")
with patch("cold_display_guard.main.RTSPFrameSource", FailingSource):
run(config_path, max_iterations=1)
diagnostics = [json.loads(line) for line in diagnostics_path.read_text(encoding="utf-8").splitlines()]
self.assertEqual(diagnostics[0]["error"], "frame_capture_failed")
self.assertEqual(diagnostics[0]["disposal_evidence"], [])
self.assertEqual(diagnostics[0]["diagnostics"]["trajectory"]["reason"], "frame_capture_failed")
def write_runtime_config(
tmpdir: str,
sample_interval: float = 5.0,
trajectory_interval: float = 1.0,
) -> tuple[Path, Path]:
root = Path(tmpdir)
event_path = root / "events.jsonl"
diagnostics_path = root / "runtime_diagnostics.jsonl"
config_path = root / "config.toml"
config_path.write_text(
"\n".join(
[
'camera_id = "test-camera"',
'timezone = "UTC"',
"",
"[stream]",
'rtsp_url = "rtsp://example.invalid/stream"',
"",
"[thresholds]",
"max_dwell_seconds = 1200",
"trash_confirmation_seconds = 120",
"",
"[layout]",
"zone_count = 1",
'zone_ids = ["1"]',
"",
"[[zones]]",
'id = "1"',
"polygon = [[0.0, 0.0], [0.5, 0.0], [0.5, 0.5], [0.0, 0.5]]",
"",
"[trash]",
"roi = [[0.6, 0.6], [1.0, 0.6], [1.0, 1.0], [0.6, 1.0]]",
"",
"[runtime]",
f"sample_interval_seconds = {sample_interval}",
f"trajectory_sample_interval_seconds = {trajectory_interval}",
f'diagnostics_path = "{diagnostics_path}"',
"",
"[event_sink]",
f'path = "{event_path}"',
"",
]
),
encoding="utf-8",
)
return config_path, diagnostics_path
if __name__ == "__main__":
unittest.main()

View File

@@ -8,6 +8,7 @@ from cold_display_guard.vision import (
Region,
RegionMetrics,
RuntimeVisionSettings,
TrajectoryTracker,
ZoneOccupancyDetector,
load_runtime_vision_settings,
point_in_polygon,
@@ -38,6 +39,11 @@ def multi_patched_frame(width: int, height: int, base: int, patches: list[tuple[
return Frame(width=width, height=height, rgb=bytes(pixels))
def frame_with_motion_patch(width: int, height: int, top_left: tuple[int, int]) -> Frame:
x, y = top_left
return patched_frame(width, height, 40, (x, y, x + 8, y + 8, 180))
class VisionTests(unittest.TestCase):
def test_point_in_polygon(self) -> None:
polygon = ((0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0))
@@ -152,6 +158,66 @@ class VisionTests(unittest.TestCase):
self.assertEqual(first_empty_counts, {"1": 1})
self.assertEqual(second_empty_counts, {"1": 0})
def test_detector_ignores_global_lighting_dimming_across_many_zones(self) -> None:
regions = [
Region(str(index + 1), ((index / 7, 0.0), ((index + 1) / 7, 0.0), ((index + 1) / 7, 1.0), (index / 7, 1.0)))
for index in range(7)
]
detector = ZoneOccupancyDetector(
regions,
trash_region=None,
settings=RuntimeVisionSettings(
baseline_frames=1,
sample_stride_pixels=2,
occupancy_mean_delta=55,
occupancy_texture_delta=18,
occupancy_confirm_frames=2,
empty_confirm_frames=2,
),
)
now = datetime(2026, 6, 1, 4, 55, tzinfo=timezone.utc)
detector.observe(solid_frame(70, 20, 180), now)
first_counts, _, first_diagnostics = detector.observe(solid_frame(70, 20, 100), now + timedelta(seconds=5))
second_counts, _, second_diagnostics = detector.observe(solid_frame(70, 20, 100), now + timedelta(seconds=10))
self.assertEqual(first_counts, {str(index): 0 for index in range(1, 8)})
self.assertEqual(second_counts, {str(index): 0 for index in range(1, 8)})
self.assertTrue(first_diagnostics["lighting_shift"]["active"])
self.assertTrue(second_diagnostics["lighting_shift"]["active"])
self.assertTrue(all(not zone["raw_occupied"] for zone in second_diagnostics["zones"].values()))
def test_detector_allows_single_zone_object_while_lighting_guard_is_available(self) -> None:
regions = [
Region(str(index + 1), ((index / 7, 0.0), ((index + 1) / 7, 0.0), ((index + 1) / 7, 1.0), (index / 7, 1.0)))
for index in range(7)
]
detector = ZoneOccupancyDetector(
regions,
trash_region=None,
settings=RuntimeVisionSettings(
baseline_frames=1,
sample_stride_pixels=2,
occupancy_mean_delta=55,
occupancy_texture_delta=100,
occupancy_dark_luma_threshold=80,
occupancy_dark_fraction=0.06,
occupancy_confirm_frames=2,
empty_confirm_frames=2,
),
)
now = datetime(2026, 6, 1, 10, 0, tzinfo=timezone.utc)
detector.observe(solid_frame(70, 20, 180), now)
object_frame = patched_frame(70, 20, 180, (0, 0, 10, 20, 20))
detector.observe(object_frame, now + timedelta(seconds=5))
counts, _, diagnostics = detector.observe(object_frame, now + timedelta(seconds=10))
self.assertEqual(counts["1"], 1)
self.assertEqual(sum(counts.values()), 1)
self.assertFalse(diagnostics["lighting_shift"]["active"])
self.assertTrue(diagnostics["zones"]["1"]["raw_occupied"])
def test_runtime_vision_defaults_raise_brightness_reflection_threshold(self) -> None:
settings = load_runtime_vision_settings({})
@@ -163,6 +229,10 @@ class VisionTests(unittest.TestCase):
self.assertEqual(settings.trash_sustained_motion_delta, 8.0)
self.assertEqual(settings.trash_sustained_motion_frames, 2)
self.assertEqual(settings.trash_motion_cooldown_seconds, 3)
self.assertTrue(settings.lighting_shift_guard_enabled)
self.assertEqual(settings.lighting_shift_min_regions, 3)
self.assertAlmostEqual(settings.lighting_shift_region_fraction, 0.6)
self.assertEqual(settings.lighting_shift_mean_delta, 45.0)
def test_detector_can_seed_previous_baseline_and_occupancy(self) -> None:
detector = ZoneOccupancyDetector(
@@ -271,6 +341,426 @@ class VisionTests(unittest.TestCase):
self.assertGreaterEqual(zone["dark_fraction"], 0.06)
self.assertGreaterEqual(zone["bright_fraction"], 0.18)
def test_motion_track_from_source_zone_to_trash_roi_emits_evidence(self) -> None:
source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
tracker = TrajectoryTracker(
[source],
trash,
RuntimeVisionSettings(
trajectory_sample_interval_seconds=0.0,
trajectory_min_points=3,
trajectory_min_confidence=0.72,
trajectory_motion_delta=20.0,
trajectory_min_blob_area=12,
),
)
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
tracker.observe(frame_with_motion_patch(80, 80, (8, 36)), now, {"source": 1})
all_evidence: list[object] = []
emitted_evidence_count = 0
for index, point in enumerate([(16, 36), (28, 36), (42, 36), (58, 36), (66, 36)]):
evidence, diagnostics = tracker.observe(
frame_with_motion_patch(80, 80, point),
now + timedelta(seconds=index + 1),
{"source": 0},
)
all_evidence.extend(evidence)
emitted_evidence_count += diagnostics["emitted_evidence"]
self.assertTrue(all_evidence)
emitted = all_evidence[0]
self.assertEqual(emitted.source_zone_id, "source")
self.assertEqual(emitted.target, "trash")
self.assertEqual(emitted.method, "motion")
self.assertGreaterEqual(emitted.confidence, 0.72)
self.assertGreaterEqual(len(emitted.track_points), 3)
self.assertGreaterEqual(emitted_evidence_count, 1)
def test_segmented_source_to_trash_track_survives_temporary_occlusion(self) -> None:
source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
tracker = TrajectoryTracker(
[source],
trash,
RuntimeVisionSettings(
trajectory_sample_interval_seconds=0.0,
trajectory_min_points=3,
trajectory_min_confidence=0.72,
trajectory_motion_delta=20.0,
trajectory_min_blob_area=12,
),
)
now = datetime(2026, 6, 1, 10, 55, tzinfo=timezone.utc)
tracker.observe(frame_with_motion_patch(80, 80, (8, 36)), now, {"source": 1})
source_motion_frame = frame_with_motion_patch(80, 80, (16, 36))
first_evidence, _ = tracker.observe(source_motion_frame, now + timedelta(seconds=1), {"source": 0})
occluded_evidence, occluded_diagnostics = tracker.observe(source_motion_frame, now + timedelta(seconds=2), {"source": 0})
trash_evidence, trash_diagnostics = tracker.observe(
frame_with_motion_patch(80, 80, (66, 36)),
now + timedelta(seconds=3),
{"source": 0},
)
self.assertEqual(first_evidence, [])
self.assertEqual(occluded_evidence, [])
self.assertEqual(occluded_diagnostics["active_candidates"], 1)
self.assertEqual(len(trash_evidence), 1)
emitted = trash_evidence[0]
self.assertEqual(emitted.source_zone_id, "source")
self.assertEqual(emitted.target, "trash")
self.assertEqual(emitted.method, "motion")
self.assertEqual(len(emitted.track_points), 2)
self.assertGreaterEqual(emitted.confidence, 0.72)
self.assertEqual(trash_diagnostics["emitted"][0]["reason"], "emitted")
self.assertTrue(trash_diagnostics["emitted"][0]["segmented"])
def test_recent_source_motion_seeds_track_when_empty_confirmation_lags(self) -> None:
source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
tracker = TrajectoryTracker(
[source],
trash,
RuntimeVisionSettings(
trajectory_sample_interval_seconds=0.0,
trajectory_min_points=3,
trajectory_min_confidence=0.72,
trajectory_motion_delta=20.0,
trajectory_min_blob_area=12,
),
)
now = datetime(2026, 6, 1, 11, 25, tzinfo=timezone.utc)
tracker.observe(frame_with_motion_patch(80, 80, (8, 36)), now, {"source": 1})
tracker.observe(frame_with_motion_patch(80, 80, (16, 36)), now + timedelta(seconds=1), {"source": 1})
evidence, diagnostics = tracker.observe(
frame_with_motion_patch(80, 80, (66, 36)),
now + timedelta(seconds=2),
{"source": 0},
)
self.assertEqual(len(evidence), 1)
self.assertEqual(evidence[0].source_zone_id, "source")
self.assertEqual(len(evidence[0].track_points), 2)
self.assertTrue(diagnostics["emitted"][0]["segmented"])
self.assertTrue(diagnostics["emitted"][0]["source_seeded"])
def test_motion_that_starts_away_from_source_zone_is_rejected(self) -> None:
source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
tracker = TrajectoryTracker(
[source],
trash,
RuntimeVisionSettings(trajectory_sample_interval_seconds=0.0, trajectory_min_points=3),
)
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
tracker.observe(frame_with_motion_patch(80, 80, (50, 10)), now, {"source": 1})
all_evidence: list[object] = []
rejected_candidates = 0
for index, point in enumerate([(52, 14), (56, 20), (60, 28), (66, 36)]):
evidence, diagnostics = tracker.observe(
frame_with_motion_patch(80, 80, point),
now + timedelta(seconds=index + 1),
{"source": 0},
)
all_evidence.extend(evidence)
rejected_candidates += diagnostics["rejected_candidates"]
self.assertEqual(all_evidence, [])
self.assertGreaterEqual(rejected_candidates, 1)
def test_motion_that_never_reaches_trash_roi_is_rejected(self) -> None:
source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
tracker = TrajectoryTracker(
[source],
trash,
RuntimeVisionSettings(
trajectory_window_seconds=3,
trajectory_sample_interval_seconds=0.0,
trajectory_min_points=3,
),
)
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
tracker.observe(frame_with_motion_patch(80, 80, (8, 36)), now, {"source": 1})
all_evidence: list[object] = []
diagnostics = {}
for index, point in enumerate([(16, 36), (26, 36), (36, 36), (42, 36), (46, 36)]):
evidence, diagnostics = tracker.observe(
frame_with_motion_patch(80, 80, point),
now + timedelta(seconds=index + 1),
{"source": 0},
)
all_evidence.extend(evidence)
self.assertEqual(all_evidence, [])
self.assertGreaterEqual(diagnostics["expired_candidates"], 1)
self.assertGreaterEqual(diagnostics["rejected_candidates"], 1)
def test_one_frame_reflection_flash_is_rejected(self) -> None:
source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
tracker = TrajectoryTracker(
[source],
trash,
RuntimeVisionSettings(trajectory_sample_interval_seconds=0.0, trajectory_min_points=3),
)
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
tracker.observe(solid_frame(80, 80, 40), now, {"source": 1})
flash_frame = patched_frame(80, 80, 40, (56, 28, 72, 52, 255))
evidence, diagnostics = tracker.observe(flash_frame, now + timedelta(seconds=1), {"source": 0})
later_evidence, later_diagnostics = tracker.observe(solid_frame(80, 80, 40), now + timedelta(seconds=2), {"source": 0})
self.assertEqual(evidence, [])
self.assertEqual(later_evidence, [])
self.assertEqual(diagnostics["emitted_evidence"], 0)
self.assertEqual(later_diagnostics["emitted_evidence"], 0)
self.assertEqual(later_diagnostics["rejected_candidates"], 0)
def test_multiple_active_candidates_do_not_cross_close_each_other(self) -> None:
left = Region("left", ((0.05, 0.15), (0.25, 0.15), (0.25, 0.35), (0.05, 0.35)))
right = Region("right", ((0.05, 0.65), (0.25, 0.65), (0.25, 0.85), (0.05, 0.85)))
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
tracker = TrajectoryTracker(
[left, right],
trash,
RuntimeVisionSettings(trajectory_sample_interval_seconds=0.0, trajectory_min_points=3),
)
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
tracker.observe(
multi_patched_frame(80, 80, 40, [(8, 18, 16, 26, 180), (8, 54, 16, 62, 180)]),
now,
{"left": 1, "right": 1},
)
all_evidence = []
frames = [
[(16, 20, 24, 28, 180), (18, 54, 26, 62, 180)],
[(28, 24, 36, 32, 180), (30, 52, 38, 60, 180)],
[(44, 30, 52, 38, 180), (44, 50, 52, 58, 180)],
[(60, 36, 68, 44, 180), (60, 50, 68, 58, 180)],
]
for index, patches in enumerate(frames):
evidence, _ = tracker.observe(
multi_patched_frame(80, 80, 40, patches),
now + timedelta(seconds=index + 1),
{"left": 0, "right": 0},
)
all_evidence.extend(evidence)
self.assertEqual({item.source_zone_id for item in all_evidence}, {"left", "right"})
self.assertEqual(len(all_evidence), 2)
def test_multiple_active_candidates_do_not_emit_same_motion_track(self) -> None:
first = Region("first", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
second = Region("second", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
tracker = TrajectoryTracker(
[first, second],
trash,
RuntimeVisionSettings(trajectory_sample_interval_seconds=0.0, trajectory_min_points=3),
)
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
tracker.observe(frame_with_motion_patch(80, 80, (8, 36)), now, {"first": 1, "second": 1})
all_evidence = []
rejected = []
for index, point in enumerate([(16, 36), (28, 36), (42, 36), (58, 36), (66, 36)]):
evidence, diagnostics = tracker.observe(
frame_with_motion_patch(80, 80, point),
now + timedelta(seconds=index + 1),
{"first": 0, "second": 0},
)
all_evidence.extend(evidence)
rejected.extend(diagnostics["rejected"])
tracks = [tuple((point["x"], point["y"]) for point in item.track_points) for item in all_evidence]
self.assertLessEqual(len(all_evidence), 1)
self.assertEqual(len(tracks), len(set(tracks)))
self.assertTrue(any(item["reason"] == "ambiguous_motion_track" for item in rejected))
def test_motion_inside_source_margin_but_outside_source_polygon_is_rejected(self) -> None:
source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
tracker = TrajectoryTracker(
[source],
trash,
RuntimeVisionSettings(trajectory_sample_interval_seconds=0.0, trajectory_min_points=3),
)
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
tracker.observe(frame_with_motion_patch(80, 80, (20, 36)), now, {"source": 1})
all_evidence = []
rejected = []
for index, point in enumerate([(24, 36), (36, 36), (50, 36), (66, 36), (70, 36)]):
evidence, diagnostics = tracker.observe(
frame_with_motion_patch(80, 80, point),
now + timedelta(seconds=index + 1),
{"source": 0},
)
all_evidence.extend(evidence)
rejected.extend(diagnostics["rejected"])
self.assertEqual(all_evidence, [])
self.assertTrue(any(item["reason"] == "motion_started_outside_source" for item in rejected))
def test_motion_before_source_motion_cannot_seed_later_trash_evidence(self) -> None:
source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
tracker = TrajectoryTracker(
[source],
trash,
RuntimeVisionSettings(trajectory_sample_interval_seconds=0.0, trajectory_min_points=3),
)
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
tracker.observe(solid_frame(80, 80, 40), now, {"source": 1})
all_evidence = []
rejected = []
frames = [
frame_with_motion_patch(80, 80, (34, 36)),
frame_with_motion_patch(80, 80, (16, 36)),
frame_with_motion_patch(80, 80, (34, 36)),
frame_with_motion_patch(80, 80, (50, 36)),
frame_with_motion_patch(80, 80, (66, 36)),
]
for index, frame in enumerate(frames):
evidence, diagnostics = tracker.observe(
frame,
now + timedelta(seconds=index + 1),
{"source": 0},
)
all_evidence.extend(evidence)
rejected.extend(diagnostics["rejected"])
self.assertEqual(all_evidence, [])
self.assertTrue(any(item["reason"] == "motion_started_outside_source" for item in rejected))
def test_trajectory_diagnostics_include_per_candidate_events(self) -> None:
source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
tracker = TrajectoryTracker(
[source],
trash,
RuntimeVisionSettings(
trajectory_window_seconds=3,
trajectory_sample_interval_seconds=0.0,
trajectory_min_points=3,
),
)
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
tracker.observe(frame_with_motion_patch(80, 80, (8, 36)), now, {"source": 1})
emitted_diagnostics = {}
for index, point in enumerate([(16, 36), (28, 36), (42, 36), (58, 36)]):
_, emitted_diagnostics = tracker.observe(
frame_with_motion_patch(80, 80, point),
now + timedelta(seconds=index + 1),
{"source": 0},
)
emitted_event = emitted_diagnostics["emitted"][0]
self.assert_candidate_event(emitted_event, "source", "emitted")
tracker = TrajectoryTracker(
[source],
trash,
RuntimeVisionSettings(
trajectory_window_seconds=2,
trajectory_sample_interval_seconds=0.0,
trajectory_min_points=3,
),
)
tracker.observe(frame_with_motion_patch(80, 80, (8, 36)), now, {"source": 1})
rejected_diagnostics = {}
for index, point in enumerate([(16, 36), (26, 36), (36, 36), (42, 36)]):
_, rejected_diagnostics = tracker.observe(
frame_with_motion_patch(80, 80, point),
now + timedelta(seconds=index + 1),
{"source": 0},
)
expired_event = rejected_diagnostics["expired"][0]
rejected_event = rejected_diagnostics["rejected"][0]
self.assert_candidate_event(expired_event, "source", "expired")
self.assert_candidate_event(rejected_event, "source", "did_not_reach_trash")
def assert_candidate_event(self, event: dict[str, object], source_zone_id: str, reason: str) -> None:
self.assertEqual(event["source_zone_id"], source_zone_id)
self.assertEqual(event["reason"], reason)
self.assertIn("point_count", event)
self.assertIn("confidence", event)
self.assertIn("direction_score", event)
def test_runtime_vision_defaults_include_trajectory_and_yolo_fields(self) -> None:
settings = load_runtime_vision_settings({})
self.assertTrue(settings.trajectory_enabled)
self.assertEqual(settings.trajectory_window_seconds, 8)
self.assertEqual(settings.trajectory_sample_interval_seconds, 1.0)
self.assertEqual(settings.trajectory_min_points, 3)
self.assertTrue(settings.trajectory_segmented_enabled)
self.assertEqual(settings.trajectory_segmented_min_points, 2)
self.assertEqual(settings.trajectory_min_confidence, 0.72)
self.assertEqual(settings.trajectory_motion_delta, 20.0)
self.assertEqual(settings.trajectory_min_blob_area, 12)
self.assertEqual(settings.trajectory_max_blob_area_fraction, 0.35)
self.assertEqual(settings.trajectory_trash_entry_margin, 0.04)
self.assertEqual(settings.trajectory_backend, "motion")
self.assertFalse(settings.yolo_enabled)
self.assertEqual(settings.yolo_model_path, "")
self.assertEqual(settings.yolo_min_confidence, 0.65)
def test_runtime_vision_settings_read_trajectory_and_yolo_fields_from_config(self) -> None:
settings = load_runtime_vision_settings(
{
"runtime": {
"trajectory_enabled": False,
"trajectory_window_seconds": 11,
"trajectory_sample_interval_seconds": 0.5,
"trajectory_min_points": 4,
"trajectory_segmented_enabled": False,
"trajectory_segmented_min_points": 3,
"trajectory_min_confidence": 0.8,
"trajectory_motion_delta": 25.0,
"trajectory_min_blob_area": 20,
"trajectory_max_blob_area_fraction": 0.25,
"trajectory_trash_entry_margin": 0.02,
"trajectory_backend": "motion",
"yolo_enabled": True,
"yolo_model_path": "models/yolo.onnx",
"yolo_min_confidence": 0.7,
"lighting_shift_guard_enabled": False,
"lighting_shift_min_regions": 4,
"lighting_shift_region_fraction": 0.75,
"lighting_shift_mean_delta": 60.0,
}
}
)
self.assertFalse(settings.trajectory_enabled)
self.assertEqual(settings.trajectory_window_seconds, 11)
self.assertEqual(settings.trajectory_sample_interval_seconds, 0.5)
self.assertEqual(settings.trajectory_min_points, 4)
self.assertFalse(settings.trajectory_segmented_enabled)
self.assertEqual(settings.trajectory_segmented_min_points, 3)
self.assertEqual(settings.trajectory_min_confidence, 0.8)
self.assertEqual(settings.trajectory_motion_delta, 25.0)
self.assertEqual(settings.trajectory_min_blob_area, 20)
self.assertEqual(settings.trajectory_max_blob_area_fraction, 0.25)
self.assertEqual(settings.trajectory_trash_entry_margin, 0.02)
self.assertEqual(settings.trajectory_backend, "motion")
self.assertTrue(settings.yolo_enabled)
self.assertEqual(settings.yolo_model_path, "models/yolo.onnx")
self.assertEqual(settings.yolo_min_confidence, 0.7)
self.assertFalse(settings.lighting_shift_guard_enabled)
self.assertEqual(settings.lighting_shift_min_regions, 4)
self.assertAlmostEqual(settings.lighting_shift_region_fraction, 0.75)
self.assertEqual(settings.lighting_shift_mean_delta, 60.0)
if __name__ == "__main__":
unittest.main()