"""Unit tests for the ``cold_display_guard`` batch engine.

The tests feed hand-built ``Observation`` objects into a ``BatchEngine`` and
assert on the event dictionaries returned by ``BatchEngine.process``.
"""

from __future__ import annotations

import unittest
from datetime import datetime, timedelta, timezone

from cold_display_guard import BatchEngine, EngineSettings, Observation

UTC = timezone.utc


def obs(
    ts: datetime,
    counts: dict[str, int],
    trash: bool | int = False,
    disposal_evidence: list[dict[str, object]] | None = None,
) -> Observation:
    """Build an ``Observation`` through ``Observation.from_dict``.

    Args:
        ts: Observation timestamp; serialised with ``isoformat()``.
        counts: Mapping passed as ``zone_counts``.
        trash: Value passed as ``trash_deposit`` (a flag or a count).
        disposal_evidence: Evidence dictionaries passed as
            ``disposal_evidence``; ``None`` (or any falsy value) becomes an
            empty list.

    Returns:
        The ``Observation`` produced by ``Observation.from_dict``.
    """
    # NOTE: inside this function the ``disposal_evidence`` parameter shadows
    # the module-level ``disposal_evidence`` builder defined below. The name
    # is kept because callers pass it by keyword.
    return Observation.from_dict(
        {
            "ts": ts.isoformat(),
            "zone_counts": counts,
            "trash_deposit": trash,
            "disposal_evidence": disposal_evidence or [],
        }
    )


def disposal_evidence(
    source_zone_id: str,
    confidence: float = 0.93,
    target: str = "trash_bin",
) -> dict[str, object]:
    """Return a disposal-evidence dictionary with fixed sample detector data.

    Args:
        source_zone_id: Value stored under ``source_zone_id``.
        confidence: Value stored under ``confidence``.
        target: Value stored under ``target``.

    Returns:
        A dictionary suitable for the ``disposal_evidence`` list of ``obs``.
    """
    return {
        "source_zone_id": source_zone_id,
        "target": target,
        "confidence": confidence,
        "method": "trajectory",
        "track_points": [{"x": 101, "y": 202, "ts": "2026-04-27T10:20:01+00:00"}],
        "item_class": "prepared_food",
        "detector_score": 0.88,
        "observed_at": "2026-04-27T10:20:02+00:00",
    }


class BatchEngineTests(unittest.TestCase):
    """Event-level tests for ``BatchEngine`` and ``Observation.from_dict``."""

    def setUp(self) -> None:
        """Create a short-threshold engine (10 s dwell, 5 s confirmation)."""
        self.settings = EngineSettings(
            camera_id="test_cam",
            max_dwell_seconds=10,
            trash_confirmation_seconds=5,
            zone_ids=("r1c1", "r1c2"),
        )
        self.engine = BatchEngine(self.settings)
        self.t0 = datetime(2026, 4, 27, 10, 0, tzinfo=UTC)

    def _make_engine(
        self,
        zone_ids: tuple[str, ...] = ("1",),
        max_dwell_seconds: int = 1200,
        trash_confirmation_seconds: int = 120,
    ) -> BatchEngine:
        """Return a fresh ``BatchEngine`` for camera ``test_cam``.

        The defaults are the configuration most tests in this class use, so
        each test only spells out the settings it actually varies.
        """
        settings = EngineSettings(
            camera_id="test_cam",
            max_dwell_seconds=max_dwell_seconds,
            trash_confirmation_seconds=trash_confirmation_seconds,
            zone_ids=zone_ids,
        )
        return BatchEngine(settings)

    def test_starts_batch_when_zone_becomes_occupied(self) -> None:
        """A first non-zero count yields one ``batch_started`` info event."""
        events = self.engine.process(obs(self.t0, {"r1c1": 3}))

        self.assertEqual([event["event"] for event in events], ["batch_started"])
        self.assertEqual(events[0]["zone_id"], "r1c1")
        self.assertEqual(events[0]["current_count"], 3)
        self.assertEqual(events[0]["severity"], "info")

    def test_consumes_batch_when_removed_before_threshold(self) -> None:
        """Emptying the zone after 9 s (limit 10 s) yields ``batch_consumed``."""
        self.engine.process(obs(self.t0, {"r1c1": 2}))
        events = self.engine.process(obs(self.t0 + timedelta(seconds=9), {"r1c1": 0}))

        self.assertEqual([event["event"] for event in events], ["batch_consumed"])
        self.assertEqual(events[0]["dwell_seconds"], 9)

    def test_over_threshold_removal_waits_for_disposal_confirmation(self) -> None:
        """Emptying the zone at the 10 s limit alarms, then awaits disposal."""
        self.engine.process(obs(self.t0, {"r1c1": 2}))
        events = self.engine.process(obs(self.t0 + timedelta(seconds=10), {"r1c1": 0}))

        self.assertEqual(
            [event["event"] for event in events],
            ["time_alarm", "batch_pending_disposal"],
        )
        self.assertEqual(events[0]["severity"], "alarm")
        self.assertEqual(events[1]["dwell_seconds"], 10)
        self.assertIn("disposal_deadline", events[1])

    def test_removal_observation_at_threshold_emits_alarm_before_pending_disposal(self) -> None:
        """At exactly the dwell limit, ``time_alarm`` precedes the pending event."""
        engine = self._make_engine()
        engine.process(obs(self.t0, {"1": 1}))

        events = engine.process(obs(self.t0 + timedelta(seconds=1200), {"1": 0}))

        self.assertEqual(
            [event["event"] for event in events],
            ["time_alarm", "batch_pending_disposal"],
        )
        self.assertEqual(events[0]["severity"], "alarm")
        self.assertEqual(events[0]["current_count"], 1)
        self.assertEqual(events[0]["zone_index"], 1)
        self.assertEqual(events[1]["severity"], "warning")
        self.assertEqual(events[1]["state"], "pending_disposal")

    def test_trash_deposit_confirms_pending_disposal(self) -> None:
        """A trash deposit after an overdue removal yields ``batch_discarded``."""
        self.engine.process(obs(self.t0, {"r1c1": 2}))
        self.engine.process(obs(self.t0 + timedelta(seconds=11), {"r1c1": 0}))
        events = self.engine.process(
            obs(self.t0 + timedelta(seconds=12), {"r1c1": 0}, trash=True)
        )

        self.assertEqual([event["event"] for event in events], ["batch_discarded"])

    def test_observation_from_dict_normalizes_disposal_evidence(self) -> None:
        """``from_dict`` coerces an int zone id to str and numeric strings to float."""
        observation = Observation.from_dict(
            {
                "ts": self.t0.isoformat(),
                "zone_counts": {"1": "2"},
                "disposal_evidence": [
                    {
                        "source_zone_id": 1,
                        "target": "trash_bin",
                        "confidence": "0.83",
                        "method": "trajectory",
                        "track_points": [{"x": 1, "y": 2}],
                        "item_class": "prepared_food",
                        "detector_score": "0.91",
                        "observed_at": "2026-04-27T10:00:01+00:00",
                    }
                ],
            }
        )

        evidence = observation.disposal_evidence[0]
        self.assertEqual(evidence.source_zone_id, "1")
        self.assertEqual(evidence.target, "trash_bin")
        self.assertAlmostEqual(evidence.confidence, 0.83)
        self.assertEqual(evidence.method, "trajectory")
        self.assertEqual(evidence.track_points, [{"x": 1, "y": 2}])
        self.assertEqual(evidence.item_class, "prepared_food")
        self.assertAlmostEqual(evidence.detector_score, 0.91)
        self.assertEqual(evidence.observed_at, "2026-04-27T10:00:01+00:00")

    def test_observation_from_dict_preserves_null_optional_disposal_fields(self) -> None:
        """``None`` for ``item_class`` / ``detector_score`` stays ``None``."""
        observation = Observation.from_dict(
            {
                "ts": self.t0.isoformat(),
                "zone_counts": {"1": 1},
                "disposal_evidence": [
                    {
                        "source_zone_id": "1",
                        "target": "trash_bin",
                        "confidence": 0.83,
                        "method": "trajectory",
                        "track_points": [],
                        "item_class": None,
                        "detector_score": None,
                    }
                ],
            }
        )

        evidence = observation.disposal_evidence[0]
        self.assertIsNone(evidence.item_class)
        self.assertIsNone(evidence.detector_score)

    def test_matching_disposal_evidence_discards_pending_batch(self) -> None:
        """Evidence naming the pending batch's zone discards that batch."""
        engine = self._make_engine()
        engine.process(obs(self.t0, {"1": 1}))
        engine.process(obs(self.t0 + timedelta(seconds=1200), {"1": 1}))
        engine.process(obs(self.t0 + timedelta(seconds=1300), {"1": 0}))

        events = engine.process(
            obs(
                self.t0 + timedelta(seconds=1310),
                {"1": 0},
                disposal_evidence=[disposal_evidence("1")],
            )
        )

        self.assertEqual([event["event"] for event in events], ["batch_discarded"])
        self.assertEqual(events[0]["zone_id"], "1")
        self.assertEqual(engine.pending_disposal, [])

    def test_disposal_evidence_for_another_zone_does_not_discard_wrong_pending_batch(self) -> None:
        """Evidence for zone 4 leaves the pending batch from zone 1 untouched."""
        engine = self._make_engine(zone_ids=("1", "4"))
        engine.process(obs(self.t0, {"1": 1, "4": 0}))
        engine.process(obs(self.t0 + timedelta(seconds=1200), {"1": 1, "4": 0}))
        engine.process(obs(self.t0 + timedelta(seconds=1300), {"1": 0, "4": 0}))

        events = engine.process(
            obs(
                self.t0 + timedelta(seconds=1310),
                {"1": 0, "4": 0},
                disposal_evidence=[disposal_evidence("4")],
            )
        )

        self.assertEqual(events, [])
        self.assertEqual([batch.zone_id for batch in engine.pending_disposal], ["1"])

    def test_non_trash_disposal_evidence_target_is_ignored(self) -> None:
        """Evidence whose target is ``customer_hand`` does not discard the batch."""
        engine = self._make_engine()
        engine.process(obs(self.t0, {"1": 1}))
        engine.process(obs(self.t0 + timedelta(seconds=1200), {"1": 1}))
        engine.process(obs(self.t0 + timedelta(seconds=1300), {"1": 0}))

        events = engine.process(
            obs(
                self.t0 + timedelta(seconds=1310),
                {"1": 0},
                disposal_evidence=[disposal_evidence("1", target="customer_hand")],
            )
        )

        self.assertEqual(events, [])
        self.assertEqual([batch.zone_id for batch in engine.pending_disposal], ["1"])

    def test_disposal_evidence_and_trash_count_do_not_double_consume_same_signal(self) -> None:
        """One deposit plus evidence for zone 4 discards only the zone 4 batch."""
        engine = self._make_engine(zone_ids=("1", "4"))
        engine.process(obs(self.t0, {"1": 1, "4": 1}))
        engine.process(obs(self.t0 + timedelta(seconds=1200), {"1": 1, "4": 1}))
        engine.process(obs(self.t0 + timedelta(seconds=1300), {"1": 0, "4": 0}))

        events = engine.process(
            obs(
                self.t0 + timedelta(seconds=1310),
                {"1": 0, "4": 0},
                trash=True,
                disposal_evidence=[disposal_evidence("4")],
            )
        )

        self.assertEqual(
            [(event["event"], event["zone_id"]) for event in events],
            [("batch_discarded", "4")],
        )
        self.assertEqual([batch.zone_id for batch in engine.pending_disposal], ["1"])

    def test_extra_trash_deposits_still_fallback_after_matching_disposal_evidence(self) -> None:
        """Two deposits with evidence for zone 1 discard both pending batches."""
        engine = self._make_engine(zone_ids=("1", "2"))
        engine.process(obs(self.t0, {"1": 1, "2": 1}))
        engine.process(obs(self.t0 + timedelta(seconds=1200), {"1": 1, "2": 1}))
        engine.process(obs(self.t0 + timedelta(seconds=1300), {"1": 0, "2": 0}))

        events = engine.process(
            obs(
                self.t0 + timedelta(seconds=1310),
                {"1": 0, "2": 0},
                trash=2,
                disposal_evidence=[disposal_evidence("1")],
            )
        )

        self.assertEqual(
            [(event["event"], event["zone_id"]) for event in events],
            [("batch_discarded", "1"), ("batch_discarded", "2")],
        )
        self.assertEqual(engine.pending_disposal, [])

    def test_same_observation_removal_and_disposal_evidence_discards_newly_pending_batch(self) -> None:
        """Removal and matching evidence in one observation discard the batch."""
        engine = self._make_engine()
        engine.process(obs(self.t0, {"1": 1}))
        engine.process(obs(self.t0 + timedelta(seconds=1200), {"1": 1}))

        events = engine.process(
            obs(
                self.t0 + timedelta(seconds=1300),
                {"1": 0},
                disposal_evidence=[disposal_evidence("1")],
            )
        )
        # 1421 s is just past removal (1300 s) plus the 120 s confirmation window.
        later_events = engine.process(obs(self.t0 + timedelta(seconds=1421), {"1": 0}))

        self.assertEqual(
            [event["event"] for event in events],
            ["batch_pending_disposal", "batch_discarded"],
        )
        self.assertEqual(events[1]["zone_id"], "1")
        self.assertEqual(later_events, [])

    def test_low_confidence_disposal_evidence_is_ignored(self) -> None:
        """Evidence with confidence 0.71 does not discard the pending batch."""
        engine = self._make_engine()
        engine.process(obs(self.t0, {"1": 1}))
        engine.process(obs(self.t0 + timedelta(seconds=1200), {"1": 1}))
        engine.process(obs(self.t0 + timedelta(seconds=1300), {"1": 0}))

        events = engine.process(
            obs(
                self.t0 + timedelta(seconds=1310),
                {"1": 0},
                disposal_evidence=[disposal_evidence("1", confidence=0.71)],
            )
        )

        self.assertEqual(events, [])
        self.assertEqual([batch.zone_id for batch in engine.pending_disposal], ["1"])

    def test_missing_trash_deposit_escalates_warning_after_deadline(self) -> None:
        """No deposit 6 s after removal (window 5 s) yields ``warning_escalated``."""
        self.engine.process(obs(self.t0, {"r1c1": 2}))
        self.engine.process(obs(self.t0 + timedelta(seconds=11), {"r1c1": 0}))
        events = self.engine.process(obs(self.t0 + timedelta(seconds=17), {"r1c1": 0}))

        self.assertEqual([event["event"] for event in events], ["warning_escalated"])
        self.assertEqual(events[0]["severity"], "warning")
        self.assertEqual(events[0]["violation_reasons"], ["missing_disposal"])

    def test_adding_food_before_zone_clears_raises_mixed_batch_violation(self) -> None:
        """A count increase on an occupied zone yields ``mixed_batch_violation``."""
        self.engine.process(obs(self.t0, {"r1c1": 2}))
        events = self.engine.process(obs(self.t0 + timedelta(seconds=1), {"r1c1": 3}))

        self.assertEqual([event["event"] for event in events], ["mixed_batch_violation"])
        self.assertEqual(events[0]["severity"], "warning")
        self.assertEqual(events[0]["reason"], "food_added_before_zone_cleared")

    def test_count_decrease_keeps_same_batch_active(self) -> None:
        """A drop to a non-zero count yields only ``batch_count_changed``."""
        self.engine.process(obs(self.t0, {"r1c1": 3}))
        events = self.engine.process(obs(self.t0 + timedelta(seconds=1), {"r1c1": 1}))

        self.assertEqual([event["event"] for event in events], ["batch_count_changed"])
        self.assertEqual(events[0]["previous_count"], 3)
        self.assertEqual(events[0]["current_count"], 1)

    def test_overdue_food_reappearing_before_disposal_raises_violation(self) -> None:
        """Food appearing in another zone while disposal is pending is flagged."""
        self.engine.process(obs(self.t0, {"r1c1": 2}))
        self.engine.process(obs(self.t0 + timedelta(seconds=11), {"r1c1": 0}))
        events = self.engine.process(obs(self.t0 + timedelta(seconds=12), {"r1c2": 1}))

        self.assertEqual(
            [event["event"] for event in events],
            ["overdue_return_violation", "batch_started"],
        )
        self.assertEqual(events[0]["appeared_zones"], ["r1c2"])

    def test_time_alarm_emits_once_while_batch_remains_in_zone(self) -> None:
        """``time_alarm`` fires at the limit and is not repeated afterwards."""
        engine = self._make_engine()
        engine.process(obs(self.t0, {"1": 1}))

        alarm_events = engine.process(obs(self.t0 + timedelta(seconds=1200), {"1": 1}))
        repeated_events = engine.process(obs(self.t0 + timedelta(seconds=1300), {"1": 1}))

        self.assertEqual([event["event"] for event in alarm_events], ["time_alarm"])
        self.assertEqual(repeated_events, [])
        self.assertEqual(alarm_events[0]["severity"], "alarm")
        self.assertEqual(alarm_events[0]["zone_id"], "1")
        self.assertEqual(alarm_events[0]["zone_index"], 1)
        self.assertEqual(alarm_events[0]["zone_label"], "区域 1")
        self.assertEqual(alarm_events[0]["dwell_seconds"], 1200)
        self.assertEqual(alarm_events[0]["max_dwell_seconds"], 1200)
        self.assertEqual(alarm_events[0]["current_count"], 1)
        self.assertIn("alerted_at", alarm_events[0])

    def test_alarmed_batch_removed_without_trash_deposit_escalates_warning(self) -> None:
        """An alarmed batch removed with no deposit escalates after the window."""
        engine = self._make_engine()
        engine.process(obs(self.t0, {"1": 1}))
        engine.process(obs(self.t0 + timedelta(seconds=1200), {"1": 1}))

        pending_events = engine.process(obs(self.t0 + timedelta(seconds=1300), {"1": 0}))
        warning_events = engine.process(obs(self.t0 + timedelta(seconds=1421), {"1": 0}))

        self.assertEqual(
            [event["event"] for event in pending_events],
            ["batch_pending_disposal"],
        )
        self.assertEqual(pending_events[0]["severity"], "warning")
        self.assertEqual(pending_events[0]["state"], "pending_disposal")
        self.assertEqual(pending_events[0]["zone_index"], 1)
        self.assertEqual(
            pending_events[0]["ended_at"],
            (self.t0 + timedelta(seconds=1300)).isoformat(),
        )
        self.assertEqual(
            [event["event"] for event in warning_events],
            ["warning_escalated"],
        )
        self.assertEqual(warning_events[0]["severity"], "warning")
        self.assertEqual(warning_events[0]["state"], "warning")
        self.assertEqual(
            warning_events[0]["reason"],
            "alarmed_batch_removed_without_trash_deposit",
        )
        self.assertEqual(warning_events[0]["zone_label"], "区域 1")

    def test_alarmed_batch_removed_with_trash_deposit_is_discarded(self) -> None:
        """A deposit within the window discards the removed, alarmed batch."""
        engine = self._make_engine()
        engine.process(obs(self.t0, {"1": 1}))
        engine.process(obs(self.t0 + timedelta(seconds=1200), {"1": 1}))
        engine.process(obs(self.t0 + timedelta(seconds=1300), {"1": 0}))

        events = engine.process(
            obs(self.t0 + timedelta(seconds=1310), {"1": 0}, trash=True)
        )

        self.assertEqual([event["event"] for event in events], ["batch_discarded"])
        self.assertEqual(events[0]["severity"], "info")
        self.assertEqual(events[0]["state"], "discarded")

    def test_same_observation_removal_and_trash_motion_discards_alerted_batch(self) -> None:
        """Removal and a deposit in one observation go pending, then discarded."""
        engine = self._make_engine()
        engine.process(obs(self.t0, {"1": 1}))
        engine.process(obs(self.t0 + timedelta(seconds=1200), {"1": 1}))

        events = engine.process(
            obs(self.t0 + timedelta(seconds=1300), {"1": 0}, trash=True)
        )
        later_events = engine.process(obs(self.t0 + timedelta(seconds=1421), {"1": 0}))

        self.assertEqual(
            [event["event"] for event in events],
            ["batch_pending_disposal", "batch_discarded"],
        )
        self.assertEqual(events[1]["state"], "discarded")
        self.assertEqual(later_events, [])

    def test_same_observation_trash_motion_discards_multiple_newly_pending_batches(self) -> None:
        """One deposit discards both batches removed in the same observation."""
        engine = self._make_engine(zone_ids=("1", "4"), max_dwell_seconds=300)
        engine.process(obs(self.t0, {"1": 1, "4": 1}))
        engine.process(obs(self.t0 + timedelta(seconds=300), {"1": 1, "4": 1}))

        events = engine.process(
            obs(self.t0 + timedelta(seconds=360), {"1": 0, "4": 0}, trash=True)
        )
        later_events = engine.process(
            obs(self.t0 + timedelta(seconds=481), {"1": 0, "4": 0})
        )

        self.assertEqual(
            [event["event"] for event in events],
            [
                "batch_pending_disposal",
                "batch_pending_disposal",
                "batch_discarded",
                "batch_discarded",
            ],
        )
        self.assertEqual(
            [event["zone_id"] for event in events if event["event"] == "batch_discarded"],
            ["1", "4"],
        )
        self.assertEqual(later_events, [])

    def test_restore_keeps_active_alarm_batch_after_runtime_restart(self) -> None:
        """A restored alarmed batch keeps its id and start time, with no re-alarm."""
        engine = self._make_engine()
        engine.restore_from_events(
            [
                {
                    "event": "batch_started",
                    "zone_id": "1",
                    "batch_id": "batch_000124",
                    "started_at": self.t0.isoformat(),
                    "current_count": 1,
                    "state": "active",
                },
                {
                    "event": "time_alarm",
                    "zone_id": "1",
                    "batch_id": "batch_000124",
                    "started_at": self.t0.isoformat(),
                    "alerted_at": (self.t0 + timedelta(seconds=1200)).isoformat(),
                    "current_count": 1,
                    "state": "alerted",
                },
            ],
            active_zone_counts={"1": 1},
        )

        repeated_events = engine.process(obs(self.t0 + timedelta(seconds=1300), {"1": 1}))
        removal_events = engine.process(obs(self.t0 + timedelta(seconds=1400), {"1": 0}))

        self.assertEqual(repeated_events, [])
        self.assertEqual(
            [event["event"] for event in removal_events],
            ["batch_pending_disposal"],
        )
        self.assertEqual(removal_events[0]["batch_id"], "batch_000124")
        self.assertEqual(removal_events[0]["dwell_seconds"], 1400)

    def test_restore_skips_active_false_positive_when_latest_zone_count_is_empty(self) -> None:
        """A logged alarm is not restored when the zone's latest count is zero."""
        engine = self._make_engine(zone_ids=("3",))
        engine.restore_from_events(
            [
                {
                    "event": "time_alarm",
                    "zone_id": "3",
                    "batch_id": "batch_000213",
                    "started_at": self.t0.isoformat(),
                    "alerted_at": (self.t0 + timedelta(seconds=1200)).isoformat(),
                    "current_count": 1,
                    "state": "alerted",
                },
            ],
            active_zone_counts={"3": 0},
        )

        events = engine.process(obs(self.t0 + timedelta(seconds=1300), {"3": 0}))

        self.assertEqual(events, [])
        self.assertEqual(engine.active_by_zone, {})


if __name__ == "__main__":
    unittest.main()