feat: allow segmented disposal trajectories
This commit is contained in:
@@ -378,6 +378,45 @@ class VisionTests(unittest.TestCase):
|
||||
self.assertGreaterEqual(len(emitted.track_points), 3)
|
||||
self.assertGreaterEqual(emitted_evidence_count, 1)
|
||||
|
||||
def test_segmented_source_to_trash_track_survives_temporary_occlusion(self) -> None:
|
||||
source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
|
||||
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
|
||||
tracker = TrajectoryTracker(
|
||||
[source],
|
||||
trash,
|
||||
RuntimeVisionSettings(
|
||||
trajectory_sample_interval_seconds=0.0,
|
||||
trajectory_min_points=3,
|
||||
trajectory_min_confidence=0.72,
|
||||
trajectory_motion_delta=20.0,
|
||||
trajectory_min_blob_area=12,
|
||||
),
|
||||
)
|
||||
now = datetime(2026, 6, 1, 10, 55, tzinfo=timezone.utc)
|
||||
|
||||
tracker.observe(frame_with_motion_patch(80, 80, (8, 36)), now, {"source": 1})
|
||||
source_motion_frame = frame_with_motion_patch(80, 80, (16, 36))
|
||||
first_evidence, _ = tracker.observe(source_motion_frame, now + timedelta(seconds=1), {"source": 0})
|
||||
occluded_evidence, occluded_diagnostics = tracker.observe(source_motion_frame, now + timedelta(seconds=2), {"source": 0})
|
||||
trash_evidence, trash_diagnostics = tracker.observe(
|
||||
frame_with_motion_patch(80, 80, (66, 36)),
|
||||
now + timedelta(seconds=3),
|
||||
{"source": 0},
|
||||
)
|
||||
|
||||
self.assertEqual(first_evidence, [])
|
||||
self.assertEqual(occluded_evidence, [])
|
||||
self.assertEqual(occluded_diagnostics["active_candidates"], 1)
|
||||
self.assertEqual(len(trash_evidence), 1)
|
||||
emitted = trash_evidence[0]
|
||||
self.assertEqual(emitted.source_zone_id, "source")
|
||||
self.assertEqual(emitted.target, "trash")
|
||||
self.assertEqual(emitted.method, "motion")
|
||||
self.assertEqual(len(emitted.track_points), 2)
|
||||
self.assertGreaterEqual(emitted.confidence, 0.72)
|
||||
self.assertEqual(trash_diagnostics["emitted"][0]["reason"], "emitted")
|
||||
self.assertTrue(trash_diagnostics["emitted"][0]["segmented"])
|
||||
|
||||
def test_motion_that_starts_away_from_source_zone_is_rejected(self) -> None:
|
||||
source = Region("source", ((0.05, 0.35), (0.25, 0.35), (0.25, 0.65), (0.05, 0.65)))
|
||||
trash = Region("trash", ((0.72, 0.35), (0.95, 0.35), (0.95, 0.65), (0.72, 0.65)))
|
||||
@@ -633,6 +672,8 @@ class VisionTests(unittest.TestCase):
|
||||
self.assertEqual(settings.trajectory_window_seconds, 8)
|
||||
self.assertEqual(settings.trajectory_sample_interval_seconds, 1.0)
|
||||
self.assertEqual(settings.trajectory_min_points, 3)
|
||||
self.assertTrue(settings.trajectory_segmented_enabled)
|
||||
self.assertEqual(settings.trajectory_segmented_min_points, 2)
|
||||
self.assertEqual(settings.trajectory_min_confidence, 0.72)
|
||||
self.assertEqual(settings.trajectory_motion_delta, 20.0)
|
||||
self.assertEqual(settings.trajectory_min_blob_area, 12)
|
||||
@@ -651,6 +692,8 @@ class VisionTests(unittest.TestCase):
|
||||
"trajectory_window_seconds": 11,
|
||||
"trajectory_sample_interval_seconds": 0.5,
|
||||
"trajectory_min_points": 4,
|
||||
"trajectory_segmented_enabled": False,
|
||||
"trajectory_segmented_min_points": 3,
|
||||
"trajectory_min_confidence": 0.8,
|
||||
"trajectory_motion_delta": 25.0,
|
||||
"trajectory_min_blob_area": 20,
|
||||
@@ -672,6 +715,8 @@ class VisionTests(unittest.TestCase):
|
||||
self.assertEqual(settings.trajectory_window_seconds, 11)
|
||||
self.assertEqual(settings.trajectory_sample_interval_seconds, 0.5)
|
||||
self.assertEqual(settings.trajectory_min_points, 4)
|
||||
self.assertFalse(settings.trajectory_segmented_enabled)
|
||||
self.assertEqual(settings.trajectory_segmented_min_points, 3)
|
||||
self.assertEqual(settings.trajectory_min_confidence, 0.8)
|
||||
self.assertEqual(settings.trajectory_motion_delta, 25.0)
|
||||
self.assertEqual(settings.trajectory_min_blob_area, 20)
|
||||
|
||||
Reference in New Issue
Block a user