feat: integrate trajectory runtime diagnostics

This commit is contained in:
Yoilun
2026-05-29 15:58:26 +08:00
parent 39cfc76fa2
commit 90aa5dd704
6 changed files with 314 additions and 7 deletions

View File

@@ -36,6 +36,7 @@ polygon = [[0.581255, 0.408928], [0.717971, 0.468544], [0.711092, 0.574018], [0.
roi = [[0.776842, 0.486901], [0.896842, 0.522456], [0.841053, 0.857427], [0.716842, 0.853684]]
[runtime]
sample_interval_seconds = 5.0
sample_stride_pixels = 4
occupancy_mean_delta = 55.0
occupancy_dark_luma_threshold = 80.0
@@ -51,6 +52,19 @@ trash_motion_delta = 18.0
trash_sustained_motion_delta = 8.0
trash_sustained_motion_frames = 2
trash_motion_cooldown_seconds = 3
trajectory_enabled = true
trajectory_window_seconds = 8
trajectory_sample_interval_seconds = 1.0
trajectory_min_points = 3
trajectory_min_confidence = 0.72
trajectory_motion_delta = 20.0
trajectory_min_blob_area = 12
trajectory_max_blob_area_fraction = 0.35
trajectory_trash_entry_margin = 0.04
trajectory_backend = "motion"
yolo_enabled = false
yolo_model_path = ""
yolo_min_confidence = 0.65
[event_sink]
path = "logs/events.jsonl"

View File

@@ -205,6 +205,12 @@
| 2026-05-29 | Phase 2 | Coding Agent | Fixed trajectory tracker findings | Added blob consumption, strict source polygon origin, and per-candidate diagnostics |
| 2026-05-29 | Phase 2 | Testing Agent | Re-tested phase 2 fixes | Verdict pass; no bugs found |
| 2026-05-29 | Phase 2 | Main Agent | Ran local verification | `tests.test_vision` passed with 20 tests; full Python suite passed with 64 tests; dependency scan had no model/heavy vision matches |
| 2026-05-29 | Phase 3 | Main Agent | Marked Phase 3 as `in_progress` | Preparing fresh coding/testing agents for runtime integration |
| 2026-05-29 | Phase 3 | Coding Agent | Implemented initial runtime integration | Target main/vision tests and full Python tests passed in coding agent run |
| 2026-05-29 | Phase 3 | Testing Agent | Reviewed runtime integration | Verdict pass with non-blocking concerns: capture-error diagnostics schema, `create=True` patch robustness, broad helper type |
| 2026-05-29 | Phase 3 | Coding Agent | Fixed runtime integration review concerns | Error diagnostics keep trajectory schema; tests no longer use `create=True`; evidence payload helper type narrowed |
| 2026-05-29 | Phase 3 | Testing Agent | Re-tested runtime integration concerns | Verdict pass; no new issues |
| 2026-05-29 | Phase 3 | Main Agent | Ran local verification | `tests.test_main tests.test_vision` passed with 26 tests; full Python suite passed with 68 tests; dependency scan had no model/heavy vision matches |
### Test Results
@@ -215,6 +221,9 @@
| 2026-05-29 | `PYTHONPATH=src python3 -m unittest tests.test_vision -v` | pass | 20 vision tests passed after phase 2 trajectory tracker |
| 2026-05-29 | `PYTHONPATH=src python3 -m unittest discover -s tests -v` | pass | 64 full Python tests passed after phase 2 |
| 2026-05-29 | `rg -n "ultralytics|torch|onnxruntime|openvino|opencv|cv2|numpy" src tests pyproject.toml` | pass | No matches; command exited 1 because no heavy vision/model dependency was found |
| 2026-05-29 | `PYTHONPATH=src python3 -m unittest tests.test_main tests.test_vision -v` | pass | 26 runtime/vision tests passed after phase 3 |
| 2026-05-29 | `PYTHONPATH=src python3 -m unittest discover -s tests -v` | pass | 68 full Python tests passed after phase 3 |
| 2026-05-29 | `rg -n "ultralytics|torch|onnxruntime|openvino|opencv|cv2|numpy" src tests pyproject.toml` | pass | No matches; command exited 1 because no heavy vision/model dependency was found |
### Bug Loop
@@ -226,6 +235,35 @@
| Phase 2 | Single motion blob can confirm multiple active candidates | Added frame-local blob IDs and consume each sampled blob once per frame | Resolved; testing agent and local full Python suite passed |
| Phase 2 | Source-zone margin can treat near-outside movement as source-origin movement | Source-origin check now requires strict source polygon containment | Resolved; testing agent and local full Python suite passed |
| Phase 2 | Trajectory diagnostics only expose aggregate counts | Added `emitted`, `rejected`, and `expired` diagnostic lists with source, reason, point count, confidence, and direction score | Resolved; testing agent and local full Python suite passed |
| Phase 3 | Capture-error diagnostics rows lack `disposal_evidence` and `diagnostics.trajectory` schema | Added regression test and wrote empty evidence plus trajectory error diagnostics on capture failure | Resolved; testing agent and local full Python suite passed |
| Phase 3 | Runtime tests patch `TrajectoryTracker` with `create=True`, which can mask missing imports | Removed `create=True` and asserted fake tracker observe calls | Resolved; testing agent and local full Python suite passed |
| Phase 3 | `disposal_evidence_payloads()` accepts `list[object]` but blindly calls `asdict()` | Narrowed helper signature to `list[DisposalEvidence]` | Resolved; testing agent and local full Python suite passed |
## 2026-05-29 Phase Completed: Phase 3 - Runtime Integration
Status: complete
Files Changed:
- `src/cold_display_guard/main.py`
- `config/example.toml`
- `tests/test_main.py`
- `tests/test_vision.py`
- `task_plan.md`
- `progress.md`
Tests:
- `PYTHONPATH=src python3 -m unittest tests.test_main tests.test_vision -v`: pass
- `PYTHONPATH=src python3 -m unittest discover -s tests -v`: pass
- `rg -n "ultralytics|torch|onnxruntime|openvino|opencv|cv2|numpy" src tests pyproject.toml`: pass, no matches
Notes:
- Runtime now passes trajectory `disposal_evidence` into `Observation`.
- Diagnostics rows include top-level serialized `disposal_evidence` and nested `diagnostics.trajectory`.
- Capture failure diagnostics keep the same trajectory/evidence schema.
- Runtime sleeps at `trajectory_sample_interval_seconds` while trajectory candidates are active.
Risks:
- No live-camera validation has run yet in this phase; deployment and remote runtime observation remain phase 4 work.
## 2026-05-29 Phase Completed: Phase 2 - Lightweight Motion Trajectory Backend

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
import argparse
import json
import time
from dataclasses import asdict
from datetime import datetime
from pathlib import Path
from zoneinfo import ZoneInfo
@@ -10,9 +11,10 @@ from zoneinfo import ZoneInfo
from cold_display_guard.config import load_config_document, load_settings, resolve_config_path, resolve_project_root
from cold_display_guard.engine import BatchEngine
from cold_display_guard.frame_source import FrameCaptureError, RTSPFrameSource
from cold_display_guard.models import Observation
from cold_display_guard.models import DisposalEvidence, Observation
from cold_display_guard.vision import (
RegionMetrics,
TrajectoryTracker,
ZoneOccupancyDetector,
load_regions,
load_runtime_vision_settings,
@@ -65,6 +67,7 @@ def run(config_path: str | Path, once: bool = False, max_iterations: int = 0) ->
)
vision_settings = load_runtime_vision_settings(config)
detector = ZoneOccupancyDetector(regions, trash_region, vision_settings)
trajectory_tracker = TrajectoryTracker(regions, trash_region, vision_settings)
engine = BatchEngine(settings)
baseline_seed, active_zone_counts = restore_runtime_state(diagnostics_path, config)
if baseline_seed:
@@ -87,7 +90,13 @@ def run(config_path: str | Path, once: bool = False, max_iterations: int = 0) ->
try:
frame = source.capture()
zone_counts, trash_deposit_count, diagnostics = detector.observe(frame, when)
observation = Observation(ts=when, zone_counts=zone_counts, trash_deposit_count=trash_deposit_count)
disposal_evidence, trajectory_diagnostics = trajectory_tracker.observe(frame, when, zone_counts)
observation = Observation(
ts=when,
zone_counts=zone_counts,
trash_deposit_count=trash_deposit_count,
disposal_evidence=disposal_evidence,
)
events = engine.process(observation)
append_jsonl(event_path, events)
append_jsonl(
@@ -97,7 +106,8 @@ def run(config_path: str | Path, once: bool = False, max_iterations: int = 0) ->
"ts": when.isoformat(),
"zone_counts": zone_counts,
"trash_deposit_count": trash_deposit_count,
"diagnostics": diagnostics,
"disposal_evidence": disposal_evidence_payloads(disposal_evidence),
"diagnostics": {**diagnostics, "trajectory": trajectory_diagnostics},
}
],
)
@@ -106,13 +116,32 @@ def run(config_path: str | Path, once: bool = False, max_iterations: int = 0) ->
except FrameCaptureError as exc:
append_jsonl(
diagnostics_path,
[{"ts": when.isoformat(), "error": "frame_capture_failed", "message": str(exc)}],
[
{
"ts": when.isoformat(),
"error": "frame_capture_failed",
"message": str(exc),
"disposal_evidence": [],
"diagnostics": {
"trajectory": {
"disabled": True,
"reason": "frame_capture_failed",
"emitted_evidence": 0,
}
},
}
],
)
print(f"{when.isoformat()} frame capture failed: {exc}")
if once or (max_iterations > 0 and iteration >= max_iterations):
break
time.sleep(sample_interval_seconds)
sleep_seconds = (
vision_settings.trajectory_sample_interval_seconds
if trajectory_tracker.has_active_candidates
else sample_interval_seconds
)
time.sleep(sleep_seconds)
def resolve_project_path(project_root: Path, raw_path: str) -> Path:
@@ -131,6 +160,10 @@ def append_jsonl(path: Path, payloads: list[dict]) -> None:
handle.write("\n")
def disposal_evidence_payloads(disposal_evidence: list[DisposalEvidence]) -> list[dict]:
return [asdict(item) for item in disposal_evidence]
def restore_runtime_state(diagnostics_path: Path, config: dict) -> tuple[dict[str, RegionMetrics], dict[str, int]]:
latest = load_jsonl_tail(diagnostics_path, 1)
if not latest:

View File

@@ -98,7 +98,7 @@ Create and evolve an independent git project under `~/Code` for monitoring food
| --- | --- | --- | --- |
| 1 | complete | 建立 `disposal_evidence` 数据契约并让状态机优先按来源区域丢弃 | `Observation` 支持 evidenceengine 能按 `source_zone_id` 精确关闭 pending batch同帧移除+evidence 有回归测试;旧 `trash_deposit_count` 仍可兜底 |
| 2 | complete | 实现无 YOLO 依赖的轻量轨迹检测 | synthetic frame 测试覆盖源区域到垃圾桶、非源区域运动、未到垃圾桶、单帧反光、多候选互不串扰;不引入模型依赖 |
| 3 | pending | 集成 runtime 配置、诊断和候选窗口加速采样 | `main.py` 写入 `disposal_evidence` 与 trajectory diagnostics配置默认 `trajectory_enabled=true``yolo_enabled=false`;候选活跃时使用更短采样间隔 |
| 3 | complete | 集成 runtime 配置、诊断和候选窗口加速采样 | `main.py` 写入 `disposal_evidence` 与 trajectory diagnostics配置默认 `trajectory_enabled=true``yolo_enabled=false`;候选活跃时使用更短采样间隔 |
| 4 | pending | 文档、全量验证和部署准备 | README/project/progress 更新Python 全量测试通过;前端测试/构建按影响范围验证;远端部署命令和风险记录清楚 |
### v1.2 Decisions

View File

@@ -3,9 +3,14 @@ from __future__ import annotations
import json
import tempfile
import unittest
from datetime import datetime
from pathlib import Path
from unittest.mock import patch
from cold_display_guard.main import restore_runtime_state
from cold_display_guard.frame_source import FrameCaptureError
from cold_display_guard.main import run, restore_runtime_state
from cold_display_guard.models import DisposalEvidence
from cold_display_guard.vision import Frame
class RuntimeRestoreTests(unittest.TestCase):
@@ -109,5 +114,187 @@ class RuntimeRestoreTests(unittest.TestCase):
self.assertEqual(baselines["4"].bright_fraction, 0.0)
class RuntimeLoopTests(unittest.TestCase):
def test_run_writes_disposal_evidence_and_trajectory_diagnostics(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
config_path, diagnostics_path = write_runtime_config(tmpdir)
captured_observations = []
tracker_calls = []
class FakeSource:
def __init__(self, **kwargs: object) -> None:
pass
def capture(self) -> Frame:
return Frame(width=2, height=2, rgb=bytes([0, 0, 0]) * 4)
class FakeDetector:
def __init__(self, *args: object) -> None:
pass
def observe(self, frame: Frame, when: datetime) -> tuple[dict[str, int], int, dict[str, object]]:
return {"1": 0}, 0, {"zones": {"1": {"occupied": False}}}
class FakeTracker:
def __init__(self, *args: object) -> None:
self.has_active_candidates = False
def observe(
self,
frame: Frame,
when: datetime,
zone_counts: dict[str, int],
) -> tuple[list[DisposalEvidence], dict[str, object]]:
tracker_calls.append(zone_counts)
return [
DisposalEvidence(
source_zone_id="1",
target="trash",
confidence=0.9,
method="motion",
track_points=[{"x": 0.1, "y": 0.2}],
item_class=None,
detector_score=None,
observed_at=when.isoformat(),
)
], {"active_candidates": 0, "emitted_evidence": 1}
class FakeEngine:
def __init__(self, settings: object) -> None:
pass
def process(self, observation: object) -> list[dict[str, object]]:
captured_observations.append(observation)
return []
with patch("cold_display_guard.main.RTSPFrameSource", FakeSource), patch(
"cold_display_guard.main.ZoneOccupancyDetector", FakeDetector
), patch("cold_display_guard.main.TrajectoryTracker", FakeTracker), patch(
"cold_display_guard.main.BatchEngine", FakeEngine
):
run(config_path, max_iterations=1)
diagnostics = [json.loads(line) for line in diagnostics_path.read_text(encoding="utf-8").splitlines()]
self.assertEqual(len(captured_observations), 1)
self.assertEqual(tracker_calls, [{"1": 0}])
self.assertEqual(captured_observations[0].disposal_evidence[0].source_zone_id, "1")
self.assertEqual(diagnostics[0]["disposal_evidence"][0]["source_zone_id"], "1")
self.assertEqual(diagnostics[0]["disposal_evidence"][0]["target"], "trash")
self.assertEqual(diagnostics[0]["diagnostics"]["trajectory"]["emitted_evidence"], 1)
def test_run_uses_trajectory_sample_interval_when_candidates_are_active(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
config_path, _ = write_runtime_config(tmpdir, sample_interval=5.0, trajectory_interval=1.0)
sleeps = []
tracker_calls = []
class FakeSource:
def __init__(self, **kwargs: object) -> None:
pass
def capture(self) -> Frame:
return Frame(width=2, height=2, rgb=bytes([0, 0, 0]) * 4)
class FakeDetector:
def __init__(self, *args: object) -> None:
pass
def observe(self, frame: Frame, when: datetime) -> tuple[dict[str, int], int, dict[str, object]]:
return {"1": 0}, 0, {}
class FakeTracker:
def __init__(self, *args: object) -> None:
self.has_active_candidates = False
def observe(
self,
frame: Frame,
when: datetime,
zone_counts: dict[str, int],
) -> tuple[list[DisposalEvidence], dict[str, object]]:
tracker_calls.append(zone_counts)
self.has_active_candidates = True
return [], {"active_candidates": 1}
with patch("cold_display_guard.main.RTSPFrameSource", FakeSource), patch(
"cold_display_guard.main.ZoneOccupancyDetector", FakeDetector
), patch("cold_display_guard.main.TrajectoryTracker", FakeTracker), patch(
"cold_display_guard.main.time.sleep", sleeps.append
):
run(config_path, max_iterations=2)
self.assertEqual(tracker_calls, [{"1": 0}, {"1": 0}])
self.assertEqual(sleeps, [1.0])
def test_capture_failure_diagnostics_keep_trajectory_schema(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
config_path, diagnostics_path = write_runtime_config(tmpdir)
class FailingSource:
def __init__(self, **kwargs: object) -> None:
pass
def capture(self) -> Frame:
raise FrameCaptureError("camera offline")
with patch("cold_display_guard.main.RTSPFrameSource", FailingSource):
run(config_path, max_iterations=1)
diagnostics = [json.loads(line) for line in diagnostics_path.read_text(encoding="utf-8").splitlines()]
self.assertEqual(diagnostics[0]["error"], "frame_capture_failed")
self.assertEqual(diagnostics[0]["disposal_evidence"], [])
self.assertEqual(diagnostics[0]["diagnostics"]["trajectory"]["reason"], "frame_capture_failed")
def write_runtime_config(
tmpdir: str,
sample_interval: float = 5.0,
trajectory_interval: float = 1.0,
) -> tuple[Path, Path]:
root = Path(tmpdir)
event_path = root / "events.jsonl"
diagnostics_path = root / "runtime_diagnostics.jsonl"
config_path = root / "config.toml"
config_path.write_text(
"\n".join(
[
'camera_id = "test-camera"',
'timezone = "UTC"',
"",
"[stream]",
'rtsp_url = "rtsp://example.invalid/stream"',
"",
"[thresholds]",
"max_dwell_seconds = 1200",
"trash_confirmation_seconds = 120",
"",
"[layout]",
"zone_count = 1",
'zone_ids = ["1"]',
"",
"[[zones]]",
'id = "1"',
"polygon = [[0.0, 0.0], [0.5, 0.0], [0.5, 0.5], [0.0, 0.5]]",
"",
"[trash]",
"roi = [[0.6, 0.6], [1.0, 0.6], [1.0, 1.0], [0.6, 1.0]]",
"",
"[runtime]",
f"sample_interval_seconds = {sample_interval}",
f"trajectory_sample_interval_seconds = {trajectory_interval}",
f'diagnostics_path = "{diagnostics_path}"',
"",
"[event_sink]",
f'path = "{event_path}"',
"",
]
),
encoding="utf-8",
)
return config_path, diagnostics_path
if __name__ == "__main__":
unittest.main()

View File

@@ -547,6 +547,41 @@ class VisionTests(unittest.TestCase):
self.assertEqual(settings.yolo_model_path, "")
self.assertEqual(settings.yolo_min_confidence, 0.65)
def test_runtime_vision_settings_read_trajectory_and_yolo_fields_from_config(self) -> None:
settings = load_runtime_vision_settings(
{
"runtime": {
"trajectory_enabled": False,
"trajectory_window_seconds": 11,
"trajectory_sample_interval_seconds": 0.5,
"trajectory_min_points": 4,
"trajectory_min_confidence": 0.8,
"trajectory_motion_delta": 25.0,
"trajectory_min_blob_area": 20,
"trajectory_max_blob_area_fraction": 0.25,
"trajectory_trash_entry_margin": 0.02,
"trajectory_backend": "motion",
"yolo_enabled": True,
"yolo_model_path": "models/yolo.onnx",
"yolo_min_confidence": 0.7,
}
}
)
self.assertFalse(settings.trajectory_enabled)
self.assertEqual(settings.trajectory_window_seconds, 11)
self.assertEqual(settings.trajectory_sample_interval_seconds, 0.5)
self.assertEqual(settings.trajectory_min_points, 4)
self.assertEqual(settings.trajectory_min_confidence, 0.8)
self.assertEqual(settings.trajectory_motion_delta, 25.0)
self.assertEqual(settings.trajectory_min_blob_area, 20)
self.assertEqual(settings.trajectory_max_blob_area_fraction, 0.25)
self.assertEqual(settings.trajectory_trash_entry_margin, 0.02)
self.assertEqual(settings.trajectory_backend, "motion")
self.assertTrue(settings.yolo_enabled)
self.assertEqual(settings.yolo_model_path, "models/yolo.onnx")
self.assertEqual(settings.yolo_min_confidence, 0.7)
if __name__ == "__main__":
unittest.main()