feat: add lightweight trajectory tracking

This commit is contained in:
Yoilun
2026-05-29 15:48:06 +08:00
parent d805273a10
commit 39cfc76fa2
4 changed files with 741 additions and 1 deletions

View File

@@ -8,6 +8,7 @@ from cold_display_guard.vision import (
Region,
RegionMetrics,
RuntimeVisionSettings,
TrajectoryTracker,
ZoneOccupancyDetector,
load_runtime_vision_settings,
point_in_polygon,
@@ -38,6 +39,11 @@ def multi_patched_frame(width: int, height: int, base: int, patches: list[tuple[
return Frame(width=width, height=height, rgb=bytes(pixels))
def frame_with_motion_patch(width: int, height: int, top_left: tuple[int, int]) -> Frame:
x, y = top_left
return patched_frame(width, height, 40, (x, y, x + 8, y + 8, 180))
class VisionTests(unittest.TestCase):
def test_point_in_polygon(self) -> None:
polygon = ((0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0))
@@ -271,6 +277,276 @@ class VisionTests(unittest.TestCase):
self.assertGreaterEqual(zone["dark_fraction"], 0.06)
self.assertGreaterEqual(zone["bright_fraction"], 0.18)
def test_motion_track_from_source_zone_to_trash_roi_emits_evidence(self) -> None:
source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
tracker = TrajectoryTracker(
[source],
trash,
RuntimeVisionSettings(
trajectory_sample_interval_seconds=0.0,
trajectory_min_points=3,
trajectory_min_confidence=0.72,
trajectory_motion_delta=20.0,
trajectory_min_blob_area=12,
),
)
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
tracker.observe(frame_with_motion_patch(80, 80, (8, 36)), now, {"source": 1})
all_evidence: list[object] = []
emitted_evidence_count = 0
for index, point in enumerate([(16, 36), (28, 36), (42, 36), (58, 36), (66, 36)]):
evidence, diagnostics = tracker.observe(
frame_with_motion_patch(80, 80, point),
now + timedelta(seconds=index + 1),
{"source": 0},
)
all_evidence.extend(evidence)
emitted_evidence_count += diagnostics["emitted_evidence"]
self.assertTrue(all_evidence)
emitted = all_evidence[0]
self.assertEqual(emitted.source_zone_id, "source")
self.assertEqual(emitted.target, "trash")
self.assertEqual(emitted.method, "motion")
self.assertGreaterEqual(emitted.confidence, 0.72)
self.assertGreaterEqual(len(emitted.track_points), 3)
self.assertGreaterEqual(emitted_evidence_count, 1)
def test_motion_that_starts_away_from_source_zone_is_rejected(self) -> None:
source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
tracker = TrajectoryTracker(
[source],
trash,
RuntimeVisionSettings(trajectory_sample_interval_seconds=0.0, trajectory_min_points=3),
)
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
tracker.observe(frame_with_motion_patch(80, 80, (50, 10)), now, {"source": 1})
all_evidence: list[object] = []
rejected_candidates = 0
for index, point in enumerate([(52, 14), (56, 20), (60, 28), (66, 36)]):
evidence, diagnostics = tracker.observe(
frame_with_motion_patch(80, 80, point),
now + timedelta(seconds=index + 1),
{"source": 0},
)
all_evidence.extend(evidence)
rejected_candidates += diagnostics["rejected_candidates"]
self.assertEqual(all_evidence, [])
self.assertGreaterEqual(rejected_candidates, 1)
def test_motion_that_never_reaches_trash_roi_is_rejected(self) -> None:
source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
tracker = TrajectoryTracker(
[source],
trash,
RuntimeVisionSettings(
trajectory_window_seconds=3,
trajectory_sample_interval_seconds=0.0,
trajectory_min_points=3,
),
)
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
tracker.observe(frame_with_motion_patch(80, 80, (8, 36)), now, {"source": 1})
all_evidence: list[object] = []
diagnostics = {}
for index, point in enumerate([(16, 36), (26, 36), (36, 36), (42, 36), (46, 36)]):
evidence, diagnostics = tracker.observe(
frame_with_motion_patch(80, 80, point),
now + timedelta(seconds=index + 1),
{"source": 0},
)
all_evidence.extend(evidence)
self.assertEqual(all_evidence, [])
self.assertGreaterEqual(diagnostics["expired_candidates"], 1)
self.assertGreaterEqual(diagnostics["rejected_candidates"], 1)
def test_one_frame_reflection_flash_is_rejected(self) -> None:
source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
tracker = TrajectoryTracker(
[source],
trash,
RuntimeVisionSettings(trajectory_sample_interval_seconds=0.0, trajectory_min_points=3),
)
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
tracker.observe(solid_frame(80, 80, 40), now, {"source": 1})
flash_frame = patched_frame(80, 80, 40, (56, 28, 72, 52, 255))
evidence, diagnostics = tracker.observe(flash_frame, now + timedelta(seconds=1), {"source": 0})
later_evidence, later_diagnostics = tracker.observe(solid_frame(80, 80, 40), now + timedelta(seconds=2), {"source": 0})
self.assertEqual(evidence, [])
self.assertEqual(later_evidence, [])
self.assertEqual(diagnostics["emitted_evidence"], 0)
self.assertEqual(later_diagnostics["emitted_evidence"], 0)
self.assertEqual(later_diagnostics["rejected_candidates"], 0)
def test_multiple_active_candidates_do_not_cross_close_each_other(self) -> None:
left = Region("left", ((0.05, 0.15), (0.25, 0.15), (0.25, 0.35), (0.05, 0.35)))
right = Region("right", ((0.05, 0.65), (0.25, 0.65), (0.25, 0.85), (0.05, 0.85)))
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
tracker = TrajectoryTracker(
[left, right],
trash,
RuntimeVisionSettings(trajectory_sample_interval_seconds=0.0, trajectory_min_points=3),
)
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
tracker.observe(
multi_patched_frame(80, 80, 40, [(8, 18, 16, 26, 180), (8, 54, 16, 62, 180)]),
now,
{"left": 1, "right": 1},
)
all_evidence = []
frames = [
[(16, 20, 24, 28, 180), (18, 54, 26, 62, 180)],
[(28, 24, 36, 32, 180), (30, 52, 38, 60, 180)],
[(44, 30, 52, 38, 180), (44, 50, 52, 58, 180)],
[(60, 36, 68, 44, 180), (60, 50, 68, 58, 180)],
]
for index, patches in enumerate(frames):
evidence, _ = tracker.observe(
multi_patched_frame(80, 80, 40, patches),
now + timedelta(seconds=index + 1),
{"left": 0, "right": 0},
)
all_evidence.extend(evidence)
self.assertEqual({item.source_zone_id for item in all_evidence}, {"left", "right"})
self.assertEqual(len(all_evidence), 2)
def test_multiple_active_candidates_do_not_emit_same_motion_track(self) -> None:
first = Region("first", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
second = Region("second", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
tracker = TrajectoryTracker(
[first, second],
trash,
RuntimeVisionSettings(trajectory_sample_interval_seconds=0.0, trajectory_min_points=3),
)
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
tracker.observe(frame_with_motion_patch(80, 80, (8, 36)), now, {"first": 1, "second": 1})
all_evidence = []
rejected = []
for index, point in enumerate([(16, 36), (28, 36), (42, 36), (58, 36), (66, 36)]):
evidence, diagnostics = tracker.observe(
frame_with_motion_patch(80, 80, point),
now + timedelta(seconds=index + 1),
{"first": 0, "second": 0},
)
all_evidence.extend(evidence)
rejected.extend(diagnostics["rejected"])
tracks = [tuple((point["x"], point["y"]) for point in item.track_points) for item in all_evidence]
self.assertLessEqual(len(all_evidence), 1)
self.assertEqual(len(tracks), len(set(tracks)))
self.assertTrue(any(item["reason"] == "ambiguous_motion_track" for item in rejected))
def test_motion_inside_source_margin_but_outside_source_polygon_is_rejected(self) -> None:
source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
tracker = TrajectoryTracker(
[source],
trash,
RuntimeVisionSettings(trajectory_sample_interval_seconds=0.0, trajectory_min_points=3),
)
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
tracker.observe(frame_with_motion_patch(80, 80, (20, 36)), now, {"source": 1})
all_evidence = []
rejected = []
for index, point in enumerate([(24, 36), (36, 36), (50, 36), (66, 36), (70, 36)]):
evidence, diagnostics = tracker.observe(
frame_with_motion_patch(80, 80, point),
now + timedelta(seconds=index + 1),
{"source": 0},
)
all_evidence.extend(evidence)
rejected.extend(diagnostics["rejected"])
self.assertEqual(all_evidence, [])
self.assertTrue(any(item["reason"] == "missing_source_motion" for item in rejected))
def test_trajectory_diagnostics_include_per_candidate_events(self) -> None:
source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
tracker = TrajectoryTracker(
[source],
trash,
RuntimeVisionSettings(
trajectory_window_seconds=3,
trajectory_sample_interval_seconds=0.0,
trajectory_min_points=3,
),
)
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
tracker.observe(frame_with_motion_patch(80, 80, (8, 36)), now, {"source": 1})
emitted_diagnostics = {}
for index, point in enumerate([(16, 36), (28, 36), (42, 36), (58, 36)]):
_, emitted_diagnostics = tracker.observe(
frame_with_motion_patch(80, 80, point),
now + timedelta(seconds=index + 1),
{"source": 0},
)
emitted_event = emitted_diagnostics["emitted"][0]
self.assert_candidate_event(emitted_event, "source", "emitted")
tracker = TrajectoryTracker(
[source],
trash,
RuntimeVisionSettings(
trajectory_window_seconds=2,
trajectory_sample_interval_seconds=0.0,
trajectory_min_points=3,
),
)
tracker.observe(frame_with_motion_patch(80, 80, (8, 36)), now, {"source": 1})
rejected_diagnostics = {}
for index, point in enumerate([(16, 36), (26, 36), (36, 36), (42, 36)]):
_, rejected_diagnostics = tracker.observe(
frame_with_motion_patch(80, 80, point),
now + timedelta(seconds=index + 1),
{"source": 0},
)
expired_event = rejected_diagnostics["expired"][0]
rejected_event = rejected_diagnostics["rejected"][0]
self.assert_candidate_event(expired_event, "source", "expired")
self.assert_candidate_event(rejected_event, "source", "did_not_reach_trash")
def assert_candidate_event(self, event: dict[str, object], source_zone_id: str, reason: str) -> None:
self.assertEqual(event["source_zone_id"], source_zone_id)
self.assertEqual(event["reason"], reason)
self.assertIn("point_count", event)
self.assertIn("confidence", event)
self.assertIn("direction_score", event)
def test_runtime_vision_defaults_include_trajectory_and_yolo_fields(self) -> None:
settings = load_runtime_vision_settings({})
self.assertTrue(settings.trajectory_enabled)
self.assertEqual(settings.trajectory_window_seconds, 8)
self.assertEqual(settings.trajectory_sample_interval_seconds, 1.0)
self.assertEqual(settings.trajectory_min_points, 3)
self.assertEqual(settings.trajectory_min_confidence, 0.72)
self.assertEqual(settings.trajectory_motion_delta, 20.0)
self.assertEqual(settings.trajectory_min_blob_area, 12)
self.assertEqual(settings.trajectory_max_blob_area_fraction, 0.35)
self.assertEqual(settings.trajectory_trash_entry_margin, 0.04)
self.assertEqual(settings.trajectory_backend, "motion")
self.assertFalse(settings.yolo_enabled)
self.assertEqual(settings.yolo_model_path, "")
self.assertEqual(settings.yolo_min_confidence, 0.65)
if __name__ == "__main__":
unittest.main()