fix: ignore global lighting shifts in occupancy

This commit is contained in:
Yoilun
2026-06-01 09:56:11 +08:00
parent 100b949f1f
commit 1ecf881684
6 changed files with 162 additions and 3 deletions

View File

@@ -183,6 +183,10 @@ occupancy_reflection_dark_fraction = 0.10
occupancy_reflection_bright_dark_ratio = 2.0
occupancy_confirm_frames = 2
empty_confirm_frames = 2
lighting_shift_guard_enabled = true
lighting_shift_min_regions = 3
lighting_shift_region_fraction = 0.6
lighting_shift_mean_delta = 45.0
trash_motion_delta = 18.0
trash_sustained_motion_delta = 8.0
trash_sustained_motion_frames = 2
@@ -208,6 +212,7 @@ diagnostics_path = "logs/runtime_diagnostics.jsonl"
运行诊断写入 `logs/runtime_diagnostics.jsonl`。每行包含顶层 `disposal_evidence`,以及 `diagnostics.trajectory`
- 顶层 `disposal_evidence`:本帧实际输出给状态机的来源区域到垃圾桶证据。
- `diagnostics.lighting_shift`:多数区域同时同方向亮度漂移时启用,防止灯光/曝光变化被当成食品进出。
- `diagnostics.trajectory`:轻量轨迹 backend 的候选、过期、拒绝和已发出证据等调试信息。
## 本地测试

View File

@@ -48,6 +48,10 @@ occupancy_reflection_dark_fraction = 0.10
occupancy_reflection_bright_dark_ratio = 2.0
occupancy_confirm_frames = 2
empty_confirm_frames = 2
lighting_shift_guard_enabled = true
lighting_shift_min_regions = 3
lighting_shift_region_fraction = 0.6
lighting_shift_mean_delta = 45.0
trash_motion_delta = 18.0
trash_sustained_motion_delta = 8.0
trash_sustained_motion_frames = 2

View File

@@ -50,6 +50,8 @@ The `v1.2 轨迹识别` batch adds source-zone trajectory evidence for disposal
- Stored under `[trash] roi`.
- Does not use a food zone number.
- v1.2 trajectory settings:
- `lighting_shift_guard_enabled`: freezes occupancy changes when many regions shift brightness in the same direction.
- `lighting_shift_min_regions`, `lighting_shift_region_fraction`, `lighting_shift_mean_delta`: tune the global lighting/exposure guard.
- `trajectory_enabled`: enables source-zone trajectory evidence.
- `trajectory_window_seconds`: seconds after a zone clears where movement can confirm disposal.
- `trajectory_sample_interval_seconds`: faster runtime delay while a candidate is active.
@@ -78,6 +80,7 @@ The current tracker is a motion backend only. A later trained YOLO detector shou
- Runtime diagnostics JSONL records one item per runtime iteration.
- Root `disposal_evidence` is the exact evidence list passed into the engine for that iteration.
- `diagnostics.zones` contains occupancy metrics used to derive `zone_counts`.
- `diagnostics.lighting_shift` reports whether global brightness drift suppressed occupancy transitions.
- `diagnostics.trash` contains generic trash-motion metrics and cooldown state.
- `diagnostics.trajectory` contains v1.2 candidate counts, emitted evidence count, motion point count, and per-candidate emitted/rejected/expired records.
- Capture failures still keep the v1.2 schema with root `disposal_evidence: []` and `diagnostics.trajectory.reason = "frame_capture_failed"`.
@@ -122,6 +125,7 @@ In v1.2, `batch_discarded` can be triggered by zone-scoped `disposal_evidence` b
## Known Risks
- The current vision detector is heuristic and reports binary occupancy, not item counts.
- The lighting-shift guard suppresses multi-zone brightness/exposure jumps; if operators intentionally fill most zones at once under a large lighting change, diagnostics should be reviewed before treating that interval as clean data.
- v1.2 motion tracking improves disposal matching but can still miss movement if the hand/object path is occluded, too broad, too small, or sampled too sparsely.
- YOLO config fields are present for compatibility, but no trained YOLO model is part of the current runtime.
- If food is already present during baseline collection, those regions may be treated as empty baseline until visual changes occur.

View File

@@ -29,6 +29,7 @@
- Stage 2 added the lightweight motion trajectory runtime path: ROI occupancy still drives occupied/empty state, `TrajectoryTracker` emits source-zone-to-trash evidence, and generic trash-motion count remains as a fallback.
- Stage 3 added diagnostics and tests for runtime evidence propagation, trajectory sampling interval behavior, capture-failure schema, and trajectory/yolo runtime config parsing.
- Final review fixes: matched evidence now only subtracts the trash fallback budget by the number of batches it actually closed, and trajectory candidates reject outside-before-source motion with `motion_started_outside_source`.
- 2026-06-01 false events across zones 1-7 were caused by a global brightness/exposure drop around 04:55 and recovery around 08:16; the fix is a lighting-shift guard that freezes occupancy transitions when many regions shift brightness in the same direction.
- Current v1.2 does not use YOLO. `yolo_enabled`, `yolo_model_path`, and `yolo_min_confidence` are reserved for a future trained model backend that should keep emitting the same `disposal_evidence` shape.
## Remote Deployment Notes

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime, timedelta
from math import ceil
from typing import Any
from cold_display_guard.models import DisposalEvidence
@@ -39,6 +40,10 @@ class RuntimeVisionSettings:
occupancy_reflection_bright_dark_ratio: float = 2.0
occupancy_confirm_frames: int = 2
empty_confirm_frames: int = 2
lighting_shift_guard_enabled: bool = True
lighting_shift_min_regions: int = 3
lighting_shift_region_fraction: float = 0.6
lighting_shift_mean_delta: float = 45.0
trash_motion_delta: float = 18.0
trash_sustained_motion_delta: float = 8.0
trash_sustained_motion_frames: int = 2
@@ -125,7 +130,12 @@ class ZoneOccupancyDetector:
self._update_baseline(metrics_by_region)
zone_counts: dict[str, int] = {}
diagnostics: dict[str, Any] = {"zones": {}, "baseline_ready": self.baseline_ready}
lighting_shift = self._lighting_shift(metrics_by_region)
diagnostics: dict[str, Any] = {
"zones": {},
"baseline_ready": self.baseline_ready,
"lighting_shift": lighting_shift,
}
for region in self.regions:
metrics = metrics_by_region[region.region_id]
baseline = self._baseline.get(region.region_id)
@@ -133,8 +143,12 @@ class ZoneOccupancyDetector:
if baseline is not None:
mean_delta = abs(metrics.mean_luma - baseline.mean_luma)
texture_delta = metrics.texture - baseline.texture
raw_occupied = self._raw_occupied(metrics, baseline, mean_delta, texture_delta)
occupied = self._confirmed_occupancy(region.region_id, raw_occupied)
if lighting_shift["active"]:
raw_occupied = False
occupied = self._stable_occupancy.get(region.region_id, False)
else:
raw_occupied = self._raw_occupied(metrics, baseline, mean_delta, texture_delta)
occupied = self._confirmed_occupancy(region.region_id, raw_occupied)
diagnostics["zones"][region.region_id] = {
"mean_luma": round(metrics.mean_luma, 3),
"baseline_mean_luma": round(baseline.mean_luma, 3),
@@ -151,6 +165,7 @@ class ZoneOccupancyDetector:
"occupied": occupied,
"occupied_streak": self._occupied_streaks[region.region_id],
"empty_streak": self._empty_streaks[region.region_id],
"lighting_shift_suppressed": lighting_shift["active"],
}
zone_counts[region.region_id] = 1 if occupied else 0
@@ -201,6 +216,60 @@ class ZoneOccupancyDetector:
if len(samples) >= self.settings.baseline_frames:
self._baseline[region_id] = average_metrics(samples)
def _lighting_shift(self, metrics_by_region: dict[str, RegionMetrics]) -> dict[str, Any]:
if not self.settings.lighting_shift_guard_enabled:
return self._lighting_shift_diagnostics(False, None, 0, 0, 0)
eligible_region_count = 0
darker_count = 0
brighter_count = 0
for region in self.regions:
metrics = metrics_by_region.get(region.region_id)
baseline = self._baseline.get(region.region_id)
if metrics is None or baseline is None:
continue
eligible_region_count += 1
signed_delta = metrics.mean_luma - baseline.mean_luma
if abs(signed_delta) < self.settings.lighting_shift_mean_delta:
continue
if signed_delta < 0:
darker_count += 1
else:
brighter_count += 1
required_regions = max(
self.settings.lighting_shift_min_regions,
ceil(eligible_region_count * self.settings.lighting_shift_region_fraction),
)
active_direction: str | None = None
shifted_count = max(darker_count, brighter_count)
if eligible_region_count >= self.settings.lighting_shift_min_regions and shifted_count >= required_regions:
active_direction = "darker" if darker_count >= brighter_count else "brighter"
return self._lighting_shift_diagnostics(
active_direction is not None,
active_direction,
shifted_count,
eligible_region_count,
required_regions,
)
def _lighting_shift_diagnostics(
self,
active: bool,
direction: str | None,
shifted_regions: int,
eligible_regions: int,
required_regions: int,
) -> dict[str, Any]:
return {
"active": active,
"direction": direction,
"shifted_regions": shifted_regions,
"eligible_regions": eligible_regions,
"required_regions": required_regions,
"mean_delta_threshold": self.settings.lighting_shift_mean_delta,
}
def _confirmed_occupancy(self, region_id: str, raw_occupied: bool) -> bool:
if raw_occupied:
self._occupied_streaks[region_id] = self._occupied_streaks.get(region_id, 0) + 1
@@ -635,6 +704,10 @@ def load_runtime_vision_settings(config: dict[str, Any]) -> RuntimeVisionSetting
occupancy_reflection_bright_dark_ratio=float(runtime.get("occupancy_reflection_bright_dark_ratio", 2.0)),
occupancy_confirm_frames=max(1, int(runtime.get("occupancy_confirm_frames", 2))),
empty_confirm_frames=max(1, int(runtime.get("empty_confirm_frames", 2))),
lighting_shift_guard_enabled=bool(runtime.get("lighting_shift_guard_enabled", True)),
lighting_shift_min_regions=max(1, int(runtime.get("lighting_shift_min_regions", 3))),
lighting_shift_region_fraction=max(0.0, min(1.0, float(runtime.get("lighting_shift_region_fraction", 0.6)))),
lighting_shift_mean_delta=float(runtime.get("lighting_shift_mean_delta", 45.0)),
trash_motion_delta=float(runtime.get("trash_motion_delta", 18.0)),
trash_sustained_motion_delta=float(runtime.get("trash_sustained_motion_delta", 8.0)),
trash_sustained_motion_frames=max(1, int(runtime.get("trash_sustained_motion_frames", 2))),

View File

@@ -158,6 +158,66 @@ class VisionTests(unittest.TestCase):
self.assertEqual(first_empty_counts, {"1": 1})
self.assertEqual(second_empty_counts, {"1": 0})
def test_detector_ignores_global_lighting_dimming_across_many_zones(self) -> None:
regions = [
Region(str(index + 1), ((index / 7, 0.0), ((index + 1) / 7, 0.0), ((index + 1) / 7, 1.0), (index / 7, 1.0)))
for index in range(7)
]
detector = ZoneOccupancyDetector(
regions,
trash_region=None,
settings=RuntimeVisionSettings(
baseline_frames=1,
sample_stride_pixels=2,
occupancy_mean_delta=55,
occupancy_texture_delta=18,
occupancy_confirm_frames=2,
empty_confirm_frames=2,
),
)
now = datetime(2026, 6, 1, 4, 55, tzinfo=timezone.utc)
detector.observe(solid_frame(70, 20, 180), now)
first_counts, _, first_diagnostics = detector.observe(solid_frame(70, 20, 100), now + timedelta(seconds=5))
second_counts, _, second_diagnostics = detector.observe(solid_frame(70, 20, 100), now + timedelta(seconds=10))
self.assertEqual(first_counts, {str(index): 0 for index in range(1, 8)})
self.assertEqual(second_counts, {str(index): 0 for index in range(1, 8)})
self.assertTrue(first_diagnostics["lighting_shift"]["active"])
self.assertTrue(second_diagnostics["lighting_shift"]["active"])
self.assertTrue(all(not zone["raw_occupied"] for zone in second_diagnostics["zones"].values()))
def test_detector_allows_single_zone_object_while_lighting_guard_is_available(self) -> None:
regions = [
Region(str(index + 1), ((index / 7, 0.0), ((index + 1) / 7, 0.0), ((index + 1) / 7, 1.0), (index / 7, 1.0)))
for index in range(7)
]
detector = ZoneOccupancyDetector(
regions,
trash_region=None,
settings=RuntimeVisionSettings(
baseline_frames=1,
sample_stride_pixels=2,
occupancy_mean_delta=55,
occupancy_texture_delta=100,
occupancy_dark_luma_threshold=80,
occupancy_dark_fraction=0.06,
occupancy_confirm_frames=2,
empty_confirm_frames=2,
),
)
now = datetime(2026, 6, 1, 10, 0, tzinfo=timezone.utc)
detector.observe(solid_frame(70, 20, 180), now)
object_frame = patched_frame(70, 20, 180, (0, 0, 10, 20, 20))
detector.observe(object_frame, now + timedelta(seconds=5))
counts, _, diagnostics = detector.observe(object_frame, now + timedelta(seconds=10))
self.assertEqual(counts["1"], 1)
self.assertEqual(sum(counts.values()), 1)
self.assertFalse(diagnostics["lighting_shift"]["active"])
self.assertTrue(diagnostics["zones"]["1"]["raw_occupied"])
def test_runtime_vision_defaults_raise_brightness_reflection_threshold(self) -> None:
settings = load_runtime_vision_settings({})
@@ -169,6 +229,10 @@ class VisionTests(unittest.TestCase):
self.assertEqual(settings.trash_sustained_motion_delta, 8.0)
self.assertEqual(settings.trash_sustained_motion_frames, 2)
self.assertEqual(settings.trash_motion_cooldown_seconds, 3)
self.assertTrue(settings.lighting_shift_guard_enabled)
self.assertEqual(settings.lighting_shift_min_regions, 3)
self.assertAlmostEqual(settings.lighting_shift_region_fraction, 0.6)
self.assertEqual(settings.lighting_shift_mean_delta, 45.0)
def test_detector_can_seed_previous_baseline_and_occupancy(self) -> None:
detector = ZoneOccupancyDetector(
@@ -596,6 +660,10 @@ class VisionTests(unittest.TestCase):
"yolo_enabled": True,
"yolo_model_path": "models/yolo.onnx",
"yolo_min_confidence": 0.7,
"lighting_shift_guard_enabled": False,
"lighting_shift_min_regions": 4,
"lighting_shift_region_fraction": 0.75,
"lighting_shift_mean_delta": 60.0,
}
}
)
@@ -613,6 +681,10 @@ class VisionTests(unittest.TestCase):
self.assertTrue(settings.yolo_enabled)
self.assertEqual(settings.yolo_model_path, "models/yolo.onnx")
self.assertEqual(settings.yolo_min_confidence, 0.7)
self.assertFalse(settings.lighting_shift_guard_enabled)
self.assertEqual(settings.lighting_shift_min_regions, 4)
self.assertAlmostEqual(settings.lighting_shift_region_fraction, 0.75)
self.assertEqual(settings.lighting_shift_mean_delta, 60.0)
if __name__ == "__main__":