620 lines
27 KiB
Python
620 lines
27 KiB
Python
from __future__ import annotations
|
|
|
|
import unittest
|
|
from datetime import datetime, timedelta, timezone
|
|
|
|
from cold_display_guard.vision import (
|
|
Frame,
|
|
Region,
|
|
RegionMetrics,
|
|
RuntimeVisionSettings,
|
|
TrajectoryTracker,
|
|
ZoneOccupancyDetector,
|
|
load_runtime_vision_settings,
|
|
point_in_polygon,
|
|
)
|
|
|
|
|
|
def solid_frame(width: int, height: int, value: int) -> Frame:
|
|
return Frame(width=width, height=height, rgb=bytes([value, value, value]) * width * height)
|
|
|
|
|
|
def patched_frame(width: int, height: int, base: int, patch: tuple[int, int, int, int, int]) -> Frame:
|
|
x1, y1, x2, y2, value = patch
|
|
pixels = bytearray(bytes([base, base, base]) * width * height)
|
|
for y in range(y1, y2):
|
|
for x in range(x1, x2):
|
|
offset = (y * width + x) * 3
|
|
pixels[offset : offset + 3] = bytes([value, value, value])
|
|
return Frame(width=width, height=height, rgb=bytes(pixels))
|
|
|
|
|
|
def multi_patched_frame(width: int, height: int, base: int, patches: list[tuple[int, int, int, int, int]]) -> Frame:
|
|
pixels = bytearray(bytes([base, base, base]) * width * height)
|
|
for x1, y1, x2, y2, value in patches:
|
|
for y in range(y1, y2):
|
|
for x in range(x1, x2):
|
|
offset = (y * width + x) * 3
|
|
pixels[offset : offset + 3] = bytes([value, value, value])
|
|
return Frame(width=width, height=height, rgb=bytes(pixels))
|
|
|
|
|
|
def frame_with_motion_patch(width: int, height: int, top_left: tuple[int, int]) -> Frame:
|
|
x, y = top_left
|
|
return patched_frame(width, height, 40, (x, y, x + 8, y + 8, 180))
|
|
|
|
|
|
class VisionTests(unittest.TestCase):
|
|
def test_point_in_polygon(self) -> None:
|
|
polygon = ((0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0))
|
|
|
|
self.assertTrue(point_in_polygon(0.5, 0.5, polygon))
|
|
self.assertFalse(point_in_polygon(1.5, 0.5, polygon))
|
|
|
|
def test_detector_reports_occupied_after_baseline_changes(self) -> None:
|
|
detector = ZoneOccupancyDetector(
|
|
[Region("r1c1", ((0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)))],
|
|
trash_region=None,
|
|
settings=RuntimeVisionSettings(
|
|
baseline_frames=1,
|
|
sample_stride_pixels=4,
|
|
occupancy_mean_delta=10,
|
|
occupancy_texture_delta=10,
|
|
occupancy_confirm_frames=1,
|
|
empty_confirm_frames=1,
|
|
),
|
|
)
|
|
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
|
|
|
|
baseline_counts, _, _ = detector.observe(solid_frame(32, 32, 30), now)
|
|
changed_counts, _, _ = detector.observe(patched_frame(32, 32, 30, (0, 0, 32, 32, 90)), now)
|
|
|
|
self.assertEqual(baseline_counts, {"r1c1": 0})
|
|
self.assertEqual(changed_counts, {"r1c1": 1})
|
|
|
|
def test_detector_reports_trash_motion(self) -> None:
|
|
trash = Region("trash", ((0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)))
|
|
detector = ZoneOccupancyDetector(
|
|
[],
|
|
trash_region=trash,
|
|
settings=RuntimeVisionSettings(sample_stride_pixels=4, trash_motion_delta=10),
|
|
)
|
|
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
|
|
|
|
_, first_deposit, _ = detector.observe(solid_frame(32, 32, 30), now)
|
|
_, second_deposit, _ = detector.observe(solid_frame(32, 32, 90), now)
|
|
|
|
self.assertEqual(first_deposit, 0)
|
|
self.assertEqual(second_deposit, 1)
|
|
|
|
def test_detector_reports_sustained_trash_motion_below_single_frame_threshold(self) -> None:
|
|
trash = Region("trash", ((0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)))
|
|
detector = ZoneOccupancyDetector(
|
|
[],
|
|
trash_region=trash,
|
|
settings=RuntimeVisionSettings(
|
|
sample_stride_pixels=4,
|
|
trash_motion_delta=18,
|
|
trash_sustained_motion_delta=8,
|
|
trash_sustained_motion_frames=2,
|
|
),
|
|
)
|
|
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
|
|
|
|
_, first_deposit, _ = detector.observe(solid_frame(32, 32, 30), now)
|
|
_, second_deposit, second_diagnostics = detector.observe(solid_frame(32, 32, 39), now)
|
|
_, third_deposit, third_diagnostics = detector.observe(solid_frame(32, 32, 48), now)
|
|
|
|
self.assertEqual(first_deposit, 0)
|
|
self.assertEqual(second_deposit, 0)
|
|
self.assertEqual(second_diagnostics["trash"]["motion_streak"], 1)
|
|
self.assertEqual(third_deposit, 1)
|
|
self.assertEqual(third_diagnostics["trash"]["motion_streak"], 2)
|
|
|
|
def test_detector_allows_quick_sequential_strong_trash_motions(self) -> None:
|
|
trash = Region("trash", ((0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)))
|
|
detector = ZoneOccupancyDetector(
|
|
[],
|
|
trash_region=trash,
|
|
settings=RuntimeVisionSettings(sample_stride_pixels=4, trash_motion_delta=18),
|
|
)
|
|
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
|
|
|
|
_, first_deposit, _ = detector.observe(solid_frame(32, 32, 30), now)
|
|
_, second_deposit, _ = detector.observe(solid_frame(32, 32, 90), now + timedelta(seconds=1))
|
|
_, third_deposit, third_diagnostics = detector.observe(solid_frame(32, 32, 30), now + timedelta(seconds=7))
|
|
|
|
self.assertEqual(first_deposit, 0)
|
|
self.assertEqual(second_deposit, 1)
|
|
self.assertEqual(third_deposit, 1)
|
|
self.assertFalse(third_diagnostics["trash"]["in_cooldown"])
|
|
|
|
def test_detector_requires_consecutive_occupied_frames(self) -> None:
|
|
detector = ZoneOccupancyDetector(
|
|
[Region("1", ((0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)))],
|
|
trash_region=None,
|
|
settings=RuntimeVisionSettings(
|
|
baseline_frames=1,
|
|
sample_stride_pixels=4,
|
|
occupancy_mean_delta=10,
|
|
occupancy_texture_delta=10,
|
|
occupancy_confirm_frames=2,
|
|
empty_confirm_frames=2,
|
|
),
|
|
)
|
|
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
|
|
|
|
detector.observe(solid_frame(32, 32, 30), now)
|
|
first_counts, _, first_diagnostics = detector.observe(solid_frame(32, 32, 90), now)
|
|
second_counts, _, second_diagnostics = detector.observe(solid_frame(32, 32, 90), now)
|
|
first_empty_counts, _, _ = detector.observe(solid_frame(32, 32, 30), now)
|
|
second_empty_counts, _, _ = detector.observe(solid_frame(32, 32, 30), now)
|
|
|
|
self.assertEqual(first_counts, {"1": 0})
|
|
self.assertTrue(first_diagnostics["zones"]["1"]["raw_occupied"])
|
|
self.assertEqual(first_diagnostics["zones"]["1"]["occupied_streak"], 1)
|
|
self.assertEqual(second_counts, {"1": 1})
|
|
self.assertTrue(second_diagnostics["zones"]["1"]["occupied"])
|
|
self.assertEqual(first_empty_counts, {"1": 1})
|
|
self.assertEqual(second_empty_counts, {"1": 0})
|
|
|
|
def test_runtime_vision_defaults_raise_brightness_reflection_threshold(self) -> None:
|
|
settings = load_runtime_vision_settings({})
|
|
|
|
self.assertEqual(settings.sample_stride_pixels, 4)
|
|
self.assertEqual(settings.occupancy_mean_delta, 55.0)
|
|
self.assertEqual(settings.occupancy_confirm_frames, 2)
|
|
self.assertEqual(settings.empty_confirm_frames, 2)
|
|
self.assertEqual(settings.trash_motion_delta, 18.0)
|
|
self.assertEqual(settings.trash_sustained_motion_delta, 8.0)
|
|
self.assertEqual(settings.trash_sustained_motion_frames, 2)
|
|
self.assertEqual(settings.trash_motion_cooldown_seconds, 3)
|
|
|
|
def test_detector_can_seed_previous_baseline_and_occupancy(self) -> None:
|
|
detector = ZoneOccupancyDetector(
|
|
[Region("1", ((0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)))],
|
|
trash_region=None,
|
|
settings=RuntimeVisionSettings(
|
|
baseline_frames=10,
|
|
sample_stride_pixels=4,
|
|
occupancy_mean_delta=55,
|
|
occupancy_texture_delta=18,
|
|
occupancy_confirm_frames=2,
|
|
empty_confirm_frames=2,
|
|
),
|
|
)
|
|
detector.seed_baseline({"1": RegionMetrics(mean_luma=30.0, texture=0.0, sample_count=1)})
|
|
detector.seed_occupancy({"1": 1})
|
|
|
|
counts, _, diagnostics = detector.observe(solid_frame(32, 32, 90), datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc))
|
|
|
|
self.assertTrue(diagnostics["baseline_ready"])
|
|
self.assertEqual(counts, {"1": 1})
|
|
self.assertTrue(diagnostics["zones"]["1"]["occupied"])
|
|
|
|
def test_detector_reports_compact_dark_object_as_occupied(self) -> None:
|
|
detector = ZoneOccupancyDetector(
|
|
[Region("1", ((0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)))],
|
|
trash_region=None,
|
|
settings=RuntimeVisionSettings(
|
|
baseline_frames=1,
|
|
sample_stride_pixels=4,
|
|
occupancy_mean_delta=55,
|
|
occupancy_texture_delta=100,
|
|
occupancy_dark_luma_threshold=80,
|
|
occupancy_dark_fraction=0.06,
|
|
occupancy_confirm_frames=1,
|
|
empty_confirm_frames=1,
|
|
),
|
|
)
|
|
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
|
|
|
|
detector.observe(solid_frame(32, 32, 180), now)
|
|
counts, _, diagnostics = detector.observe(patched_frame(32, 32, 180, (0, 0, 8, 32, 20)), now)
|
|
|
|
self.assertEqual(counts, {"1": 1})
|
|
self.assertGreaterEqual(diagnostics["zones"]["1"]["dark_fraction"], 0.06)
|
|
|
|
def test_detector_ignores_bright_reflection_without_dark_object(self) -> None:
|
|
detector = ZoneOccupancyDetector(
|
|
[Region("1", ((0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)))],
|
|
trash_region=None,
|
|
settings=RuntimeVisionSettings(
|
|
baseline_frames=1,
|
|
sample_stride_pixels=4,
|
|
occupancy_mean_delta=55,
|
|
occupancy_texture_delta=10,
|
|
occupancy_dark_luma_threshold=80,
|
|
occupancy_dark_fraction=0.06,
|
|
occupancy_texture_dark_fraction=0.04,
|
|
occupancy_confirm_frames=1,
|
|
empty_confirm_frames=1,
|
|
),
|
|
)
|
|
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
|
|
|
|
detector.observe(solid_frame(32, 32, 160), now)
|
|
counts, _, diagnostics = detector.observe(patched_frame(32, 32, 160, (0, 0, 8, 32, 255)), now)
|
|
|
|
self.assertEqual(counts, {"1": 0})
|
|
self.assertGreaterEqual(diagnostics["zones"]["1"]["texture_delta"], 10)
|
|
self.assertLess(diagnostics["zones"]["1"]["dark_fraction"], 0.04)
|
|
|
|
def test_detector_ignores_bright_reflection_with_small_dark_edge(self) -> None:
|
|
detector = ZoneOccupancyDetector(
|
|
[Region("1", ((0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)))],
|
|
trash_region=None,
|
|
settings=RuntimeVisionSettings(
|
|
baseline_frames=1,
|
|
sample_stride_pixels=4,
|
|
occupancy_mean_delta=55,
|
|
occupancy_texture_delta=18,
|
|
occupancy_dark_luma_threshold=80,
|
|
occupancy_dark_fraction=0.06,
|
|
occupancy_texture_dark_fraction=0.04,
|
|
occupancy_confirm_frames=1,
|
|
empty_confirm_frames=1,
|
|
),
|
|
)
|
|
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
|
|
|
|
detector.observe(solid_frame(40, 40, 180), now)
|
|
counts, _, diagnostics = detector.observe(
|
|
multi_patched_frame(
|
|
40,
|
|
40,
|
|
180,
|
|
[
|
|
(0, 0, 12, 40, 255),
|
|
(12, 0, 16, 32, 20),
|
|
],
|
|
),
|
|
now,
|
|
)
|
|
|
|
zone = diagnostics["zones"]["1"]
|
|
self.assertEqual(counts, {"1": 0})
|
|
self.assertGreaterEqual(zone["dark_fraction"], 0.06)
|
|
self.assertGreaterEqual(zone["bright_fraction"], 0.18)
|
|
|
|
def test_motion_track_from_source_zone_to_trash_roi_emits_evidence(self) -> None:
|
|
source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
|
|
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
|
|
tracker = TrajectoryTracker(
|
|
[source],
|
|
trash,
|
|
RuntimeVisionSettings(
|
|
trajectory_sample_interval_seconds=0.0,
|
|
trajectory_min_points=3,
|
|
trajectory_min_confidence=0.72,
|
|
trajectory_motion_delta=20.0,
|
|
trajectory_min_blob_area=12,
|
|
),
|
|
)
|
|
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
|
|
|
|
tracker.observe(frame_with_motion_patch(80, 80, (8, 36)), now, {"source": 1})
|
|
all_evidence: list[object] = []
|
|
emitted_evidence_count = 0
|
|
for index, point in enumerate([(16, 36), (28, 36), (42, 36), (58, 36), (66, 36)]):
|
|
evidence, diagnostics = tracker.observe(
|
|
frame_with_motion_patch(80, 80, point),
|
|
now + timedelta(seconds=index + 1),
|
|
{"source": 0},
|
|
)
|
|
all_evidence.extend(evidence)
|
|
emitted_evidence_count += diagnostics["emitted_evidence"]
|
|
|
|
self.assertTrue(all_evidence)
|
|
emitted = all_evidence[0]
|
|
self.assertEqual(emitted.source_zone_id, "source")
|
|
self.assertEqual(emitted.target, "trash")
|
|
self.assertEqual(emitted.method, "motion")
|
|
self.assertGreaterEqual(emitted.confidence, 0.72)
|
|
self.assertGreaterEqual(len(emitted.track_points), 3)
|
|
self.assertGreaterEqual(emitted_evidence_count, 1)
|
|
|
|
def test_motion_that_starts_away_from_source_zone_is_rejected(self) -> None:
|
|
source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
|
|
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
|
|
tracker = TrajectoryTracker(
|
|
[source],
|
|
trash,
|
|
RuntimeVisionSettings(trajectory_sample_interval_seconds=0.0, trajectory_min_points=3),
|
|
)
|
|
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
|
|
|
|
tracker.observe(frame_with_motion_patch(80, 80, (50, 10)), now, {"source": 1})
|
|
all_evidence: list[object] = []
|
|
rejected_candidates = 0
|
|
for index, point in enumerate([(52, 14), (56, 20), (60, 28), (66, 36)]):
|
|
evidence, diagnostics = tracker.observe(
|
|
frame_with_motion_patch(80, 80, point),
|
|
now + timedelta(seconds=index + 1),
|
|
{"source": 0},
|
|
)
|
|
all_evidence.extend(evidence)
|
|
rejected_candidates += diagnostics["rejected_candidates"]
|
|
|
|
self.assertEqual(all_evidence, [])
|
|
self.assertGreaterEqual(rejected_candidates, 1)
|
|
|
|
def test_motion_that_never_reaches_trash_roi_is_rejected(self) -> None:
|
|
source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
|
|
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
|
|
tracker = TrajectoryTracker(
|
|
[source],
|
|
trash,
|
|
RuntimeVisionSettings(
|
|
trajectory_window_seconds=3,
|
|
trajectory_sample_interval_seconds=0.0,
|
|
trajectory_min_points=3,
|
|
),
|
|
)
|
|
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
|
|
|
|
tracker.observe(frame_with_motion_patch(80, 80, (8, 36)), now, {"source": 1})
|
|
all_evidence: list[object] = []
|
|
diagnostics = {}
|
|
for index, point in enumerate([(16, 36), (26, 36), (36, 36), (42, 36), (46, 36)]):
|
|
evidence, diagnostics = tracker.observe(
|
|
frame_with_motion_patch(80, 80, point),
|
|
now + timedelta(seconds=index + 1),
|
|
{"source": 0},
|
|
)
|
|
all_evidence.extend(evidence)
|
|
|
|
self.assertEqual(all_evidence, [])
|
|
self.assertGreaterEqual(diagnostics["expired_candidates"], 1)
|
|
self.assertGreaterEqual(diagnostics["rejected_candidates"], 1)
|
|
|
|
def test_one_frame_reflection_flash_is_rejected(self) -> None:
|
|
source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
|
|
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
|
|
tracker = TrajectoryTracker(
|
|
[source],
|
|
trash,
|
|
RuntimeVisionSettings(trajectory_sample_interval_seconds=0.0, trajectory_min_points=3),
|
|
)
|
|
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
|
|
|
|
tracker.observe(solid_frame(80, 80, 40), now, {"source": 1})
|
|
flash_frame = patched_frame(80, 80, 40, (56, 28, 72, 52, 255))
|
|
evidence, diagnostics = tracker.observe(flash_frame, now + timedelta(seconds=1), {"source": 0})
|
|
later_evidence, later_diagnostics = tracker.observe(solid_frame(80, 80, 40), now + timedelta(seconds=2), {"source": 0})
|
|
|
|
self.assertEqual(evidence, [])
|
|
self.assertEqual(later_evidence, [])
|
|
self.assertEqual(diagnostics["emitted_evidence"], 0)
|
|
self.assertEqual(later_diagnostics["emitted_evidence"], 0)
|
|
self.assertEqual(later_diagnostics["rejected_candidates"], 0)
|
|
|
|
def test_multiple_active_candidates_do_not_cross_close_each_other(self) -> None:
|
|
left = Region("left", ((0.05, 0.15), (0.25, 0.15), (0.25, 0.35), (0.05, 0.35)))
|
|
right = Region("right", ((0.05, 0.65), (0.25, 0.65), (0.25, 0.85), (0.05, 0.85)))
|
|
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
|
|
tracker = TrajectoryTracker(
|
|
[left, right],
|
|
trash,
|
|
RuntimeVisionSettings(trajectory_sample_interval_seconds=0.0, trajectory_min_points=3),
|
|
)
|
|
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
|
|
|
|
tracker.observe(
|
|
multi_patched_frame(80, 80, 40, [(8, 18, 16, 26, 180), (8, 54, 16, 62, 180)]),
|
|
now,
|
|
{"left": 1, "right": 1},
|
|
)
|
|
all_evidence = []
|
|
frames = [
|
|
[(16, 20, 24, 28, 180), (18, 54, 26, 62, 180)],
|
|
[(28, 24, 36, 32, 180), (30, 52, 38, 60, 180)],
|
|
[(44, 30, 52, 38, 180), (44, 50, 52, 58, 180)],
|
|
[(60, 36, 68, 44, 180), (60, 50, 68, 58, 180)],
|
|
]
|
|
for index, patches in enumerate(frames):
|
|
evidence, _ = tracker.observe(
|
|
multi_patched_frame(80, 80, 40, patches),
|
|
now + timedelta(seconds=index + 1),
|
|
{"left": 0, "right": 0},
|
|
)
|
|
all_evidence.extend(evidence)
|
|
|
|
self.assertEqual({item.source_zone_id for item in all_evidence}, {"left", "right"})
|
|
self.assertEqual(len(all_evidence), 2)
|
|
|
|
def test_multiple_active_candidates_do_not_emit_same_motion_track(self) -> None:
|
|
first = Region("first", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
|
|
second = Region("second", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
|
|
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
|
|
tracker = TrajectoryTracker(
|
|
[first, second],
|
|
trash,
|
|
RuntimeVisionSettings(trajectory_sample_interval_seconds=0.0, trajectory_min_points=3),
|
|
)
|
|
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
|
|
|
|
tracker.observe(frame_with_motion_patch(80, 80, (8, 36)), now, {"first": 1, "second": 1})
|
|
all_evidence = []
|
|
rejected = []
|
|
for index, point in enumerate([(16, 36), (28, 36), (42, 36), (58, 36), (66, 36)]):
|
|
evidence, diagnostics = tracker.observe(
|
|
frame_with_motion_patch(80, 80, point),
|
|
now + timedelta(seconds=index + 1),
|
|
{"first": 0, "second": 0},
|
|
)
|
|
all_evidence.extend(evidence)
|
|
rejected.extend(diagnostics["rejected"])
|
|
|
|
tracks = [tuple((point["x"], point["y"]) for point in item.track_points) for item in all_evidence]
|
|
self.assertLessEqual(len(all_evidence), 1)
|
|
self.assertEqual(len(tracks), len(set(tracks)))
|
|
self.assertTrue(any(item["reason"] == "ambiguous_motion_track" for item in rejected))
|
|
|
|
def test_motion_inside_source_margin_but_outside_source_polygon_is_rejected(self) -> None:
|
|
source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
|
|
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
|
|
tracker = TrajectoryTracker(
|
|
[source],
|
|
trash,
|
|
RuntimeVisionSettings(trajectory_sample_interval_seconds=0.0, trajectory_min_points=3),
|
|
)
|
|
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
|
|
|
|
tracker.observe(frame_with_motion_patch(80, 80, (20, 36)), now, {"source": 1})
|
|
all_evidence = []
|
|
rejected = []
|
|
for index, point in enumerate([(24, 36), (36, 36), (50, 36), (66, 36), (70, 36)]):
|
|
evidence, diagnostics = tracker.observe(
|
|
frame_with_motion_patch(80, 80, point),
|
|
now + timedelta(seconds=index + 1),
|
|
{"source": 0},
|
|
)
|
|
all_evidence.extend(evidence)
|
|
rejected.extend(diagnostics["rejected"])
|
|
|
|
self.assertEqual(all_evidence, [])
|
|
self.assertTrue(any(item["reason"] == "motion_started_outside_source" for item in rejected))
|
|
|
|
def test_motion_before_source_motion_cannot_seed_later_trash_evidence(self) -> None:
|
|
source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
|
|
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
|
|
tracker = TrajectoryTracker(
|
|
[source],
|
|
trash,
|
|
RuntimeVisionSettings(trajectory_sample_interval_seconds=0.0, trajectory_min_points=3),
|
|
)
|
|
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
|
|
|
|
tracker.observe(solid_frame(80, 80, 40), now, {"source": 1})
|
|
all_evidence = []
|
|
rejected = []
|
|
frames = [
|
|
frame_with_motion_patch(80, 80, (34, 36)),
|
|
frame_with_motion_patch(80, 80, (16, 36)),
|
|
frame_with_motion_patch(80, 80, (34, 36)),
|
|
frame_with_motion_patch(80, 80, (50, 36)),
|
|
frame_with_motion_patch(80, 80, (66, 36)),
|
|
]
|
|
for index, frame in enumerate(frames):
|
|
evidence, diagnostics = tracker.observe(
|
|
frame,
|
|
now + timedelta(seconds=index + 1),
|
|
{"source": 0},
|
|
)
|
|
all_evidence.extend(evidence)
|
|
rejected.extend(diagnostics["rejected"])
|
|
|
|
self.assertEqual(all_evidence, [])
|
|
self.assertTrue(any(item["reason"] == "motion_started_outside_source" for item in rejected))
|
|
|
|
def test_trajectory_diagnostics_include_per_candidate_events(self) -> None:
|
|
source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
|
|
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
|
|
tracker = TrajectoryTracker(
|
|
[source],
|
|
trash,
|
|
RuntimeVisionSettings(
|
|
trajectory_window_seconds=3,
|
|
trajectory_sample_interval_seconds=0.0,
|
|
trajectory_min_points=3,
|
|
),
|
|
)
|
|
now = datetime(2026, 4, 28, 10, 0, tzinfo=timezone.utc)
|
|
|
|
tracker.observe(frame_with_motion_patch(80, 80, (8, 36)), now, {"source": 1})
|
|
emitted_diagnostics = {}
|
|
for index, point in enumerate([(16, 36), (28, 36), (42, 36), (58, 36)]):
|
|
_, emitted_diagnostics = tracker.observe(
|
|
frame_with_motion_patch(80, 80, point),
|
|
now + timedelta(seconds=index + 1),
|
|
{"source": 0},
|
|
)
|
|
emitted_event = emitted_diagnostics["emitted"][0]
|
|
self.assert_candidate_event(emitted_event, "source", "emitted")
|
|
|
|
tracker = TrajectoryTracker(
|
|
[source],
|
|
trash,
|
|
RuntimeVisionSettings(
|
|
trajectory_window_seconds=2,
|
|
trajectory_sample_interval_seconds=0.0,
|
|
trajectory_min_points=3,
|
|
),
|
|
)
|
|
tracker.observe(frame_with_motion_patch(80, 80, (8, 36)), now, {"source": 1})
|
|
rejected_diagnostics = {}
|
|
for index, point in enumerate([(16, 36), (26, 36), (36, 36), (42, 36)]):
|
|
_, rejected_diagnostics = tracker.observe(
|
|
frame_with_motion_patch(80, 80, point),
|
|
now + timedelta(seconds=index + 1),
|
|
{"source": 0},
|
|
)
|
|
expired_event = rejected_diagnostics["expired"][0]
|
|
rejected_event = rejected_diagnostics["rejected"][0]
|
|
self.assert_candidate_event(expired_event, "source", "expired")
|
|
self.assert_candidate_event(rejected_event, "source", "did_not_reach_trash")
|
|
|
|
def assert_candidate_event(self, event: dict[str, object], source_zone_id: str, reason: str) -> None:
|
|
self.assertEqual(event["source_zone_id"], source_zone_id)
|
|
self.assertEqual(event["reason"], reason)
|
|
self.assertIn("point_count", event)
|
|
self.assertIn("confidence", event)
|
|
self.assertIn("direction_score", event)
|
|
|
|
def test_runtime_vision_defaults_include_trajectory_and_yolo_fields(self) -> None:
|
|
settings = load_runtime_vision_settings({})
|
|
|
|
self.assertTrue(settings.trajectory_enabled)
|
|
self.assertEqual(settings.trajectory_window_seconds, 8)
|
|
self.assertEqual(settings.trajectory_sample_interval_seconds, 1.0)
|
|
self.assertEqual(settings.trajectory_min_points, 3)
|
|
self.assertEqual(settings.trajectory_min_confidence, 0.72)
|
|
self.assertEqual(settings.trajectory_motion_delta, 20.0)
|
|
self.assertEqual(settings.trajectory_min_blob_area, 12)
|
|
self.assertEqual(settings.trajectory_max_blob_area_fraction, 0.35)
|
|
self.assertEqual(settings.trajectory_trash_entry_margin, 0.04)
|
|
self.assertEqual(settings.trajectory_backend, "motion")
|
|
self.assertFalse(settings.yolo_enabled)
|
|
self.assertEqual(settings.yolo_model_path, "")
|
|
self.assertEqual(settings.yolo_min_confidence, 0.65)
|
|
|
|
def test_runtime_vision_settings_read_trajectory_and_yolo_fields_from_config(self) -> None:
|
|
settings = load_runtime_vision_settings(
|
|
{
|
|
"runtime": {
|
|
"trajectory_enabled": False,
|
|
"trajectory_window_seconds": 11,
|
|
"trajectory_sample_interval_seconds": 0.5,
|
|
"trajectory_min_points": 4,
|
|
"trajectory_min_confidence": 0.8,
|
|
"trajectory_motion_delta": 25.0,
|
|
"trajectory_min_blob_area": 20,
|
|
"trajectory_max_blob_area_fraction": 0.25,
|
|
"trajectory_trash_entry_margin": 0.02,
|
|
"trajectory_backend": "motion",
|
|
"yolo_enabled": True,
|
|
"yolo_model_path": "models/yolo.onnx",
|
|
"yolo_min_confidence": 0.7,
|
|
}
|
|
}
|
|
)
|
|
|
|
self.assertFalse(settings.trajectory_enabled)
|
|
self.assertEqual(settings.trajectory_window_seconds, 11)
|
|
self.assertEqual(settings.trajectory_sample_interval_seconds, 0.5)
|
|
self.assertEqual(settings.trajectory_min_points, 4)
|
|
self.assertEqual(settings.trajectory_min_confidence, 0.8)
|
|
self.assertEqual(settings.trajectory_motion_delta, 25.0)
|
|
self.assertEqual(settings.trajectory_min_blob_area, 20)
|
|
self.assertEqual(settings.trajectory_max_blob_area_fraction, 0.25)
|
|
self.assertEqual(settings.trajectory_trash_entry_margin, 0.02)
|
|
self.assertEqual(settings.trajectory_backend, "motion")
|
|
self.assertTrue(settings.yolo_enabled)
|
|
self.assertEqual(settings.yolo_model_path, "models/yolo.onnx")
|
|
self.assertEqual(settings.yolo_min_confidence, 0.7)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|