feat: add disposal evidence engine handling
This commit is contained in:
@@ -9,16 +9,39 @@ from cold_display_guard import BatchEngine, EngineSettings, Observation
|
||||
UTC = timezone.utc
|
||||
|
||||
|
||||
def obs(ts: datetime, counts: dict[str, int], trash: bool | int = False) -> Observation:
|
||||
def obs(
|
||||
ts: datetime,
|
||||
counts: dict[str, int],
|
||||
trash: bool | int = False,
|
||||
disposal_evidence: list[dict[str, object]] | None = None,
|
||||
) -> Observation:
|
||||
return Observation.from_dict(
|
||||
{
|
||||
"ts": ts.isoformat(),
|
||||
"zone_counts": counts,
|
||||
"trash_deposit": trash,
|
||||
"disposal_evidence": disposal_evidence or [],
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def disposal_evidence(
|
||||
source_zone_id: str,
|
||||
confidence: float = 0.93,
|
||||
target: str = "trash_bin",
|
||||
) -> dict[str, object]:
|
||||
return {
|
||||
"source_zone_id": source_zone_id,
|
||||
"target": target,
|
||||
"confidence": confidence,
|
||||
"method": "trajectory",
|
||||
"track_points": [{"x": 101, "y": 202, "ts": "2026-04-27T10:20:01+00:00"}],
|
||||
"item_class": "prepared_food",
|
||||
"detector_score": 0.88,
|
||||
"observed_at": "2026-04-27T10:20:02+00:00",
|
||||
}
|
||||
|
||||
|
||||
class BatchEngineTests(unittest.TestCase):
|
||||
def setUp(self) -> None:
|
||||
self.settings = EngineSettings(
|
||||
@@ -80,6 +103,200 @@ class BatchEngineTests(unittest.TestCase):
|
||||
|
||||
self.assertEqual([event["event"] for event in events], ["batch_discarded"])
|
||||
|
||||
def test_observation_from_dict_normalizes_disposal_evidence(self) -> None:
|
||||
observation = Observation.from_dict(
|
||||
{
|
||||
"ts": self.t0.isoformat(),
|
||||
"zone_counts": {"1": "2"},
|
||||
"disposal_evidence": [
|
||||
{
|
||||
"source_zone_id": 1,
|
||||
"target": "trash_bin",
|
||||
"confidence": "0.83",
|
||||
"method": "trajectory",
|
||||
"track_points": [{"x": 1, "y": 2}],
|
||||
"item_class": "prepared_food",
|
||||
"detector_score": "0.91",
|
||||
"observed_at": "2026-04-27T10:00:01+00:00",
|
||||
}
|
||||
],
|
||||
}
|
||||
)
|
||||
|
||||
evidence = observation.disposal_evidence[0]
|
||||
self.assertEqual(evidence.source_zone_id, "1")
|
||||
self.assertEqual(evidence.target, "trash_bin")
|
||||
self.assertAlmostEqual(evidence.confidence, 0.83)
|
||||
self.assertEqual(evidence.method, "trajectory")
|
||||
self.assertEqual(evidence.track_points, [{"x": 1, "y": 2}])
|
||||
self.assertEqual(evidence.item_class, "prepared_food")
|
||||
self.assertAlmostEqual(evidence.detector_score, 0.91)
|
||||
self.assertEqual(evidence.observed_at, "2026-04-27T10:00:01+00:00")
|
||||
|
||||
def test_observation_from_dict_preserves_null_optional_disposal_fields(self) -> None:
|
||||
observation = Observation.from_dict(
|
||||
{
|
||||
"ts": self.t0.isoformat(),
|
||||
"zone_counts": {"1": 1},
|
||||
"disposal_evidence": [
|
||||
{
|
||||
"source_zone_id": "1",
|
||||
"target": "trash_bin",
|
||||
"confidence": 0.83,
|
||||
"method": "trajectory",
|
||||
"track_points": [],
|
||||
"item_class": None,
|
||||
"detector_score": None,
|
||||
}
|
||||
],
|
||||
}
|
||||
)
|
||||
|
||||
evidence = observation.disposal_evidence[0]
|
||||
self.assertIsNone(evidence.item_class)
|
||||
self.assertIsNone(evidence.detector_score)
|
||||
|
||||
def test_matching_disposal_evidence_discards_pending_batch(self) -> None:
|
||||
settings = EngineSettings(
|
||||
camera_id="test_cam",
|
||||
max_dwell_seconds=1200,
|
||||
trash_confirmation_seconds=120,
|
||||
zone_ids=("1",),
|
||||
)
|
||||
engine = BatchEngine(settings)
|
||||
engine.process(obs(self.t0, {"1": 1}))
|
||||
engine.process(obs(self.t0 + timedelta(seconds=1200), {"1": 1}))
|
||||
engine.process(obs(self.t0 + timedelta(seconds=1300), {"1": 0}))
|
||||
|
||||
events = engine.process(
|
||||
obs(
|
||||
self.t0 + timedelta(seconds=1310),
|
||||
{"1": 0},
|
||||
disposal_evidence=[disposal_evidence("1")],
|
||||
)
|
||||
)
|
||||
|
||||
self.assertEqual([event["event"] for event in events], ["batch_discarded"])
|
||||
self.assertEqual(events[0]["zone_id"], "1")
|
||||
self.assertEqual(engine.pending_disposal, [])
|
||||
|
||||
def test_disposal_evidence_for_another_zone_does_not_discard_wrong_pending_batch(self) -> None:
|
||||
settings = EngineSettings(
|
||||
camera_id="test_cam",
|
||||
max_dwell_seconds=1200,
|
||||
trash_confirmation_seconds=120,
|
||||
zone_ids=("1", "4"),
|
||||
)
|
||||
engine = BatchEngine(settings)
|
||||
engine.process(obs(self.t0, {"1": 1, "4": 0}))
|
||||
engine.process(obs(self.t0 + timedelta(seconds=1200), {"1": 1, "4": 0}))
|
||||
engine.process(obs(self.t0 + timedelta(seconds=1300), {"1": 0, "4": 0}))
|
||||
|
||||
events = engine.process(
|
||||
obs(
|
||||
self.t0 + timedelta(seconds=1310),
|
||||
{"1": 0, "4": 0},
|
||||
disposal_evidence=[disposal_evidence("4")],
|
||||
)
|
||||
)
|
||||
|
||||
self.assertEqual(events, [])
|
||||
self.assertEqual([batch.zone_id for batch in engine.pending_disposal], ["1"])
|
||||
|
||||
def test_non_trash_disposal_evidence_target_is_ignored(self) -> None:
|
||||
settings = EngineSettings(
|
||||
camera_id="test_cam",
|
||||
max_dwell_seconds=1200,
|
||||
trash_confirmation_seconds=120,
|
||||
zone_ids=("1",),
|
||||
)
|
||||
engine = BatchEngine(settings)
|
||||
engine.process(obs(self.t0, {"1": 1}))
|
||||
engine.process(obs(self.t0 + timedelta(seconds=1200), {"1": 1}))
|
||||
engine.process(obs(self.t0 + timedelta(seconds=1300), {"1": 0}))
|
||||
|
||||
events = engine.process(
|
||||
obs(
|
||||
self.t0 + timedelta(seconds=1310),
|
||||
{"1": 0},
|
||||
disposal_evidence=[disposal_evidence("1", target="customer_hand")],
|
||||
)
|
||||
)
|
||||
|
||||
self.assertEqual(events, [])
|
||||
self.assertEqual([batch.zone_id for batch in engine.pending_disposal], ["1"])
|
||||
|
||||
def test_disposal_evidence_and_trash_count_do_not_double_consume_same_signal(self) -> None:
|
||||
settings = EngineSettings(
|
||||
camera_id="test_cam",
|
||||
max_dwell_seconds=1200,
|
||||
trash_confirmation_seconds=120,
|
||||
zone_ids=("1", "4"),
|
||||
)
|
||||
engine = BatchEngine(settings)
|
||||
engine.process(obs(self.t0, {"1": 1, "4": 1}))
|
||||
engine.process(obs(self.t0 + timedelta(seconds=1200), {"1": 1, "4": 1}))
|
||||
engine.process(obs(self.t0 + timedelta(seconds=1300), {"1": 0, "4": 0}))
|
||||
|
||||
events = engine.process(
|
||||
obs(
|
||||
self.t0 + timedelta(seconds=1310),
|
||||
{"1": 0, "4": 0},
|
||||
trash=True,
|
||||
disposal_evidence=[disposal_evidence("4")],
|
||||
)
|
||||
)
|
||||
|
||||
self.assertEqual([(event["event"], event["zone_id"]) for event in events], [("batch_discarded", "4")])
|
||||
self.assertEqual([batch.zone_id for batch in engine.pending_disposal], ["1"])
|
||||
|
||||
def test_same_observation_removal_and_disposal_evidence_discards_newly_pending_batch(self) -> None:
|
||||
settings = EngineSettings(
|
||||
camera_id="test_cam",
|
||||
max_dwell_seconds=1200,
|
||||
trash_confirmation_seconds=120,
|
||||
zone_ids=("1",),
|
||||
)
|
||||
engine = BatchEngine(settings)
|
||||
engine.process(obs(self.t0, {"1": 1}))
|
||||
engine.process(obs(self.t0 + timedelta(seconds=1200), {"1": 1}))
|
||||
|
||||
events = engine.process(
|
||||
obs(
|
||||
self.t0 + timedelta(seconds=1300),
|
||||
{"1": 0},
|
||||
disposal_evidence=[disposal_evidence("1")],
|
||||
)
|
||||
)
|
||||
later_events = engine.process(obs(self.t0 + timedelta(seconds=1421), {"1": 0}))
|
||||
|
||||
self.assertEqual([event["event"] for event in events], ["batch_pending_disposal", "batch_discarded"])
|
||||
self.assertEqual(events[1]["zone_id"], "1")
|
||||
self.assertEqual(later_events, [])
|
||||
|
||||
def test_low_confidence_disposal_evidence_is_ignored(self) -> None:
|
||||
settings = EngineSettings(
|
||||
camera_id="test_cam",
|
||||
max_dwell_seconds=1200,
|
||||
trash_confirmation_seconds=120,
|
||||
zone_ids=("1",),
|
||||
)
|
||||
engine = BatchEngine(settings)
|
||||
engine.process(obs(self.t0, {"1": 1}))
|
||||
engine.process(obs(self.t0 + timedelta(seconds=1200), {"1": 1}))
|
||||
engine.process(obs(self.t0 + timedelta(seconds=1300), {"1": 0}))
|
||||
|
||||
events = engine.process(
|
||||
obs(
|
||||
self.t0 + timedelta(seconds=1310),
|
||||
{"1": 0},
|
||||
disposal_evidence=[disposal_evidence("1", confidence=0.71)],
|
||||
)
|
||||
)
|
||||
|
||||
self.assertEqual(events, [])
|
||||
self.assertEqual([batch.zone_id for batch in engine.pending_disposal], ["1"])
|
||||
|
||||
def test_missing_trash_deposit_escalates_warning_after_deadline(self) -> None:
|
||||
self.engine.process(obs(self.t0, {"r1c1": 2}))
|
||||
self.engine.process(obs(self.t0 + timedelta(seconds=11), {"r1c1": 0}))
|
||||
|
||||
Reference in New Issue
Block a user