feat: initialize cold display guard

This commit is contained in:
Yoilun
2026-04-27 10:59:13 +08:00
commit 36dc3548e6
17 changed files with 918 additions and 0 deletions

9
.gitignore vendored Normal file
View File

@@ -0,0 +1,9 @@
__pycache__/
*.py[cod]
.pytest_cache/
.venv/
dist/
build/
*.egg-info/
logs/
*.jsonl

56
README_zh.md Normal file
View File

@@ -0,0 +1,56 @@
# 冷藏展示柜食品批次计时报警
这是一个独立项目,用于单摄像头监控冷藏展示柜和同画面垃圾桶,记录每个展示区域内食品批次的放置时长,并发现 3 小时到期后的违规行为。
## 已确认业务规则
- 摄像头同时看到展示柜和垃圾桶。
- 展示柜初始布局为横向 4 列、竖向 2 行。
- 布局后期可以通过配置调整。
- 每个区域可以放多份食品,但这些食品按同一批次计时。
- 同一区域不允许混批,必须清空后才能放入新批次。
- 食品放入区域时记录开始时间。
- 区域清空时记录结束时间。
- 未满 3 小时清空视为正常消耗。
- 超过 3 小时清空后必须在确认窗口内看到垃圾桶投放动作。
- 超过 3 小时的食品拿出后又放回展示柜,触发报警。
## 当前实现范围
当前版本先实现纯业务状态机,不依赖摄像头模型。后续视觉模块只需要输出标准观察数据:
```json
{
"ts": "2026-04-27T10:00:00+08:00",
"zone_counts": {
"r1c1": 3,
"r1c2": 0
},
"trash_deposit": false
}
```
程序会输出 JSONL 事件,例如:
- `batch_started`
- `batch_consumed`
- `batch_pending_disposal`
- `batch_discarded`
- `mixed_batch_violation`
- `overdue_return_violation`
- `missing_disposal_violation`
## 配置
示例配置在 `config/example.toml`
默认阈值:
- 最大放置时间:`10800` 秒,也就是 3 小时
- 垃圾桶投放确认窗口:`120`
## 本地测试
```bash
python3 -m unittest discover -s tests -v
```

46
config/example.toml Normal file
View File

@@ -0,0 +1,46 @@
camera_id = "cold_display_cam_01"
timezone = "Asia/Shanghai"
[thresholds]
max_dwell_seconds = 10800
trash_confirmation_seconds = 120
[layout]
rows = 2
cols = 4
zone_ids = ["r1c1", "r1c2", "r1c3", "r1c4", "r2c1", "r2c2", "r2c3", "r2c4"]
[[zones]]
id = "r1c1"
polygon = [[0.00, 0.00], [0.25, 0.00], [0.25, 0.50], [0.00, 0.50]]
[[zones]]
id = "r1c2"
polygon = [[0.25, 0.00], [0.50, 0.00], [0.50, 0.50], [0.25, 0.50]]
[[zones]]
id = "r1c3"
polygon = [[0.50, 0.00], [0.75, 0.00], [0.75, 0.50], [0.50, 0.50]]
[[zones]]
id = "r1c4"
polygon = [[0.75, 0.00], [1.00, 0.00], [1.00, 0.50], [0.75, 0.50]]
[[zones]]
id = "r2c1"
polygon = [[0.00, 0.50], [0.25, 0.50], [0.25, 1.00], [0.00, 1.00]]
[[zones]]
id = "r2c2"
polygon = [[0.25, 0.50], [0.50, 0.50], [0.50, 1.00], [0.25, 1.00]]
[[zones]]
id = "r2c3"
polygon = [[0.50, 0.50], [0.75, 0.50], [0.75, 1.00], [0.50, 1.00]]
[[zones]]
id = "r2c4"
polygon = [[0.75, 0.50], [1.00, 0.50], [1.00, 1.00], [0.75, 1.00]]
[trash]
roi = [[0.80, 0.65], [1.00, 0.65], [1.00, 1.00], [0.80, 1.00]]

View File

@@ -0,0 +1,113 @@
# Cold Display Guard Design
**Date:** 2026-04-27
## Goal
Build a single-camera refrigerated display monitoring service that records how long each food batch remains in each display zone and raises alerts when over-threshold food is removed without confirmed disposal or is placed back into the display.
## Scope
- One camera sees both the refrigerated display cabinet and the trash bin.
- The initial cabinet layout is 4 columns by 2 rows.
- The layout must be configurable because the physical arrangement may change.
- Each zone may contain multiple food items.
- Items in one zone are treated as one batch.
- Mixed batches are not allowed.
- A new batch can only start after a zone becomes empty.
## Architecture
The project separates business state from computer vision:
1. Vision adapters detect display-zone occupancy and trash-bin deposit events.
2. The batch engine receives normalized observations with timestamps.
3. The engine maintains zone and batch state.
4. The notifier layer emits JSONL events and later can send webhooks or UI alerts.
The first implementation focuses on the batch engine and event contract. Camera inference can be added later without changing compliance rules.
## Zone Model
Each zone has:
- `zone_id`
- configured polygon or bounding box
- current observed item count
- optional active batch
The initial default zones are `r1c1` through `r2c4`.
## Batch State Machine
### `active`
Food is currently visible in the zone. The batch has `started_at`, `zone_id`, and current observed count.
### `pending_disposal`
The batch exceeded the maximum dwell time and the zone became empty. The system waits for a trash-bin deposit event within a configurable window.
### `discarded`
The over-threshold batch was removed and a trash-bin deposit was observed in the confirmation window.
### `consumed`
The batch was removed before the maximum dwell threshold. No trash confirmation is required.
### `violation`
The system observed one of these conditions:
- food was added to an already occupied zone, which indicates a mixed batch
- an over-threshold removed batch was put back into any display zone before confirmed disposal
- an over-threshold removed batch was not followed by trash disposal before the confirmation deadline
## Timing Rules
- Default maximum dwell time: 3 hours (`10800` seconds).
- Default trash confirmation window: 2 minutes (`120` seconds).
- A zone changing from `0` to `>0` starts a new batch.
- A zone changing from `>0` to `0` ends the visible dwell period.
- Count decreases while still `>0` do not end the batch.
- Count increases while already `>0` produce a mixed-batch violation.
## Event Contract
The engine emits JSON-compatible events:
- `batch_started`
- `batch_count_changed`
- `batch_consumed`
- `batch_pending_disposal`
- `batch_discarded`
- `mixed_batch_violation`
- `overdue_return_violation`
- `missing_disposal_violation`
Each event includes:
- `event`
- `ts`
- `camera_id`
- `zone_id` when applicable
- `batch_id` when applicable
- timing fields such as `started_at`, `ended_at`, and `dwell_seconds`
## Future Vision Integration
The vision layer should output normalized observations:
```json
{
"ts": "2026-04-27T10:00:00+08:00",
"zone_counts": {
"r1c1": 3,
"r1c2": 0
},
"trash_deposit": false
}
```
Trash disposal confirmation should use motion/object evidence inside the trash ROI, not merely a person standing near the bin.

View File

@@ -0,0 +1,62 @@
# Cold Display Guard Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
**Goal:** Build a standalone Python project for refrigerated display food-batch timing and 3-hour disposal compliance alerts.
**Architecture:** Keep camera and model logic outside the first business core. Implement a pure Python batch engine that consumes timestamped zone occupancy and trash-bin events, emits JSON-compatible compliance events, and can later be driven by a video adapter.
**Tech Stack:** Python 3.11+ standard library, TOML config via `tomllib`, tests via `unittest`.
---
### Task 1: Project Skeleton
**Files:**
- Create: `pyproject.toml`
- Create: `src/cold_display_guard/__init__.py`
- Create: `README_zh.md`
- Create: `config/example.toml`
**Steps:**
1. Create packaging metadata with a console script named `cold-display-guard`.
2. Create a Chinese README documenting the confirmed business rules.
3. Create default TOML config for camera id, thresholds, 4x2 zones, and trash ROI placeholder.
4. Verify imports with `python3 -m unittest discover -s tests`.
### Task 2: Batch Engine
**Files:**
- Create: `src/cold_display_guard/models.py`
- Create: `src/cold_display_guard/engine.py`
- Test: `tests/test_engine.py`
**Steps:**
1. Write tests for batch start, normal consumption, over-threshold pending disposal, trash confirmation, mixed-batch violation, overdue return violation, and missing disposal violation.
2. Implement dataclasses for observations, batches, zones, and events.
3. Implement `BatchEngine.process()`.
4. Run `python3 -m unittest tests.test_engine -v`.
### Task 3: Config and CLI
**Files:**
- Create: `src/cold_display_guard/config.py`
- Create: `src/cold_display_guard/cli.py`
- Test: `tests/test_config.py`
**Steps:**
1. Load TOML config into an engine settings dataclass.
2. Add CLI support for processing a JSONL observation file.
3. Write emitted events to stdout as JSONL.
4. Run `python3 -m unittest discover -s tests -v`.
### Task 4: Git
**Steps:**
1. Initialize git in `~/Code/cold_display_guard`.
2. Review `git status --short`.
3. Commit the initial project.

16
findings.md Normal file
View File

@@ -0,0 +1,16 @@
# Findings
## Existing Local Context
- `~/Code/video_recognition_local/store_dwell_alert` exists, but it tracks people dwell time, not food batch dwell time.
- The new project should be independent and should not share git history with the existing project.
## Domain Model
- The reliable business unit is a display-zone batch, not an individual food item.
- A batch starts when a zone changes from empty to occupied.
- A batch continues while the zone remains occupied, even if the item count decreases.
- A batch ends when the zone becomes empty.
- If a batch is removed after the maximum dwell threshold, the system expects a trash-bin deposit event within a configurable window.
- If a removed over-threshold batch reappears in any display zone before being discarded, that is a violation.
- If food is added while a zone is already occupied, that is a mixed-batch violation.

11
progress.md Normal file
View File

@@ -0,0 +1,11 @@
# Progress
## 2026-04-27
- Created project directory `~/Code/cold_display_guard`.
- Confirmed standalone project scope and initial project name.
- Started file-based planning files.
- Created core engine, config loader, CLI, README, design doc, implementation plan, and tests.
- First test run failed because `_end_batch()` set `ended_at` before calculating dwell seconds, causing ended batches to report `0` seconds. Fixed by calculating dwell before assigning `ended_at`.
- Test suite now passes: `PYTHONPATH=src python3 -m unittest discover -s tests -v`.
- Initialized git repository and created the initial project commit.

17
pyproject.toml Normal file
View File

@@ -0,0 +1,17 @@
[build-system]
requires = ["setuptools>=68"]
build-backend = "setuptools.build_meta"
[project]
name = "cold-display-guard"
version = "0.1.0"
description = "Food batch dwell-time and disposal compliance monitor for refrigerated displays"
readme = "README_zh.md"
requires-python = ">=3.11"
dependencies = []
[project.scripts]
cold-display-guard = "cold_display_guard.cli:main"
[tool.setuptools.packages.find]
where = ["src"]

View File

@@ -0,0 +1,6 @@
"""Cold display food-batch timing and compliance engine."""
from cold_display_guard.engine import BatchEngine
from cold_display_guard.models import EngineSettings, Observation
__all__ = ["BatchEngine", "EngineSettings", "Observation"]

View File

@@ -0,0 +1,61 @@
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
from typing import TextIO
from cold_display_guard.config import load_settings
from cold_display_guard.engine import BatchEngine
from cold_display_guard.models import EngineSettings, Observation
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="Process cold-display observation JSONL and emit compliance events.",
)
parser.add_argument(
"--config",
default="config/example.toml",
help="Path to TOML config. Defaults to config/example.toml.",
)
parser.add_argument(
"--input",
required=True,
help="Path to observation JSONL file.",
)
return parser
def run_jsonl(input_path: str | Path, settings: EngineSettings, output: TextIO) -> int:
engine = BatchEngine(settings)
with Path(input_path).open("r", encoding="utf-8") as handle:
for line_number, line in enumerate(handle, start=1):
line = line.strip()
if not line:
continue
try:
observation = Observation.from_dict(json.loads(line))
except (KeyError, TypeError, ValueError, json.JSONDecodeError) as exc:
raise ValueError(f"invalid observation at line {line_number}: {exc}") from exc
for event in engine.process(observation):
output.write(json.dumps(event, ensure_ascii=False, sort_keys=True))
output.write("\n")
return 0
def main(argv: list[str] | None = None) -> int:
parser = build_parser()
args = parser.parse_args(argv)
settings = load_settings(args.config)
try:
return run_jsonl(args.input, settings, sys.stdout)
except ValueError as exc:
print(str(exc), file=sys.stderr)
return 2
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,32 @@
from __future__ import annotations
import tomllib
from pathlib import Path
from typing import Any
from cold_display_guard.models import DEFAULT_ZONE_IDS, EngineSettings
def load_settings(path: str | Path) -> EngineSettings:
data = tomllib.loads(Path(path).read_text(encoding="utf-8"))
thresholds: dict[str, Any] = data.get("thresholds", {})
layout: dict[str, Any] = data.get("layout", {})
zone_ids = tuple(layout.get("zone_ids") or _zone_ids_from_rows_cols(layout))
if not zone_ids:
zone_ids = DEFAULT_ZONE_IDS
return EngineSettings(
camera_id=str(data.get("camera_id", "cold_display_cam_01")),
max_dwell_seconds=int(thresholds.get("max_dwell_seconds", 10_800)),
trash_confirmation_seconds=int(thresholds.get("trash_confirmation_seconds", 120)),
zone_ids=zone_ids,
)
def _zone_ids_from_rows_cols(layout: dict[str, Any]) -> tuple[str, ...]:
rows = int(layout.get("rows", 0))
cols = int(layout.get("cols", 0))
if rows <= 0 or cols <= 0:
return ()
return tuple(f"r{row}c{col}" for row in range(1, rows + 1) for col in range(1, cols + 1))

View File

@@ -0,0 +1,195 @@
from __future__ import annotations
from datetime import datetime
from typing import Any
from cold_display_guard.models import Batch, EngineSettings, Observation
class BatchEngine:
def __init__(self, settings: EngineSettings | None = None) -> None:
self.settings = settings or EngineSettings()
self.active_by_zone: dict[str, Batch] = {}
self.pending_disposal: list[Batch] = []
self.closed_batches: list[Batch] = []
self._zone_counts: dict[str, int] = {zone_id: 0 for zone_id in self.settings.zone_ids}
self._next_batch_index = 1
def process(self, observation: Observation) -> list[dict[str, Any]]:
events: list[dict[str, Any]] = []
zone_counts = self._normalized_counts(observation.zone_counts)
events.extend(self._expire_pending_disposal(observation.ts))
events.extend(self._apply_trash_deposits(observation.ts, observation.trash_deposit_count))
appeared_zones = [
zone_id
for zone_id, count in zone_counts.items()
if self._zone_counts.get(zone_id, 0) == 0 and count > 0
]
if appeared_zones and self.pending_disposal:
events.extend(self._mark_pending_as_returned(observation.ts, appeared_zones))
for zone_id, new_count in zone_counts.items():
previous_count = self._zone_counts.get(zone_id, 0)
if previous_count == 0 and new_count > 0:
events.append(self._start_batch(zone_id, new_count, observation.ts))
elif previous_count > 0 and new_count == 0:
event = self._end_batch(zone_id, observation.ts)
if event is not None:
events.append(event)
elif previous_count > 0 and new_count > previous_count:
event = self._mark_mixed_batch(zone_id, previous_count, new_count, observation.ts)
if event is not None:
events.append(event)
self.active_by_zone[zone_id].last_count = new_count
elif previous_count > 0 and new_count != previous_count:
batch = self.active_by_zone.get(zone_id)
if batch is not None:
batch.last_count = new_count
events.append(
self._event(
"batch_count_changed",
observation.ts,
batch,
previous_count=previous_count,
current_count=new_count,
)
)
self._zone_counts[zone_id] = new_count
return events
def _normalized_counts(self, incoming: dict[str, int]) -> dict[str, int]:
return {
zone_id: max(0, int(incoming.get(zone_id, self._zone_counts.get(zone_id, 0))))
for zone_id in self.settings.zone_ids
}
def _next_batch_id(self) -> str:
batch_id = f"batch_{self._next_batch_index:06d}"
self._next_batch_index += 1
return batch_id
def _start_batch(self, zone_id: str, count: int, when: datetime) -> dict[str, Any]:
batch = Batch(
batch_id=self._next_batch_id(),
zone_id=zone_id,
started_at=when,
last_count=count,
)
self.active_by_zone[zone_id] = batch
return self._event("batch_started", when, batch, current_count=count)
def _end_batch(self, zone_id: str, when: datetime) -> dict[str, Any] | None:
batch = self.active_by_zone.pop(zone_id, None)
if batch is None:
return None
batch.dwell_seconds = batch.current_dwell_seconds(when)
batch.ended_at = when
if batch.dwell_seconds >= self.settings.max_dwell_seconds:
batch.state = "pending_disposal"
batch.pending_since = when
batch.disposal_deadline = when + self.settings.trash_confirmation_window
self.pending_disposal.append(batch)
return self._event("batch_pending_disposal", when, batch)
batch.state = "consumed"
self.closed_batches.append(batch)
return self._event("batch_consumed", when, batch)
def _mark_mixed_batch(
self,
zone_id: str,
previous_count: int,
current_count: int,
when: datetime,
) -> dict[str, Any] | None:
batch = self.active_by_zone.get(zone_id)
if batch is None:
return None
if "mixed_batch" in batch.violation_reasons:
return None
batch.state = "violation"
batch.violation_reasons.add("mixed_batch")
return self._event(
"mixed_batch_violation",
when,
batch,
previous_count=previous_count,
current_count=current_count,
reason="food_added_before_zone_cleared",
)
def _mark_pending_as_returned(self, when: datetime, appeared_zones: list[str]) -> list[dict[str, Any]]:
events: list[dict[str, Any]] = []
pending = list(self.pending_disposal)
self.pending_disposal.clear()
for batch in pending:
batch.state = "violation"
batch.violation_reasons.add("overdue_return")
self.closed_batches.append(batch)
events.append(
self._event(
"overdue_return_violation",
when,
batch,
appeared_zones=appeared_zones,
reason="overdue_batch_reappeared_before_disposal",
)
)
return events
def _apply_trash_deposits(self, when: datetime, deposit_count: int) -> list[dict[str, Any]]:
events: list[dict[str, Any]] = []
while deposit_count > 0 and self.pending_disposal:
batch = self.pending_disposal.pop(0)
batch.state = "discarded"
self.closed_batches.append(batch)
events.append(self._event("batch_discarded", when, batch))
deposit_count -= 1
return events
def _expire_pending_disposal(self, when: datetime) -> list[dict[str, Any]]:
events: list[dict[str, Any]] = []
still_pending: list[Batch] = []
for batch in self.pending_disposal:
if batch.disposal_deadline is not None and when > batch.disposal_deadline:
batch.state = "violation"
batch.violation_reasons.add("missing_disposal")
self.closed_batches.append(batch)
events.append(
self._event(
"missing_disposal_violation",
when,
batch,
reason="trash_deposit_not_observed_before_deadline",
)
)
else:
still_pending.append(batch)
self.pending_disposal = still_pending
return events
def _event(self, event_name: str, when: datetime, batch: Batch, **extra: Any) -> dict[str, Any]:
payload: dict[str, Any] = {
"event": event_name,
"ts": when.isoformat(),
"camera_id": self.settings.camera_id,
"zone_id": batch.zone_id,
"batch_id": batch.batch_id,
"state": batch.state,
"started_at": batch.started_at.isoformat(),
"dwell_seconds": batch.current_dwell_seconds(when),
}
if batch.ended_at is not None:
payload["ended_at"] = batch.ended_at.isoformat()
if batch.disposal_deadline is not None:
payload["disposal_deadline"] = batch.disposal_deadline.isoformat()
if batch.violation_reasons:
payload["violation_reasons"] = sorted(batch.violation_reasons)
payload.update(extra)
return payload

View File

@@ -0,0 +1,68 @@
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any
DEFAULT_ZONE_IDS = tuple(f"r{row}c{col}" for row in range(1, 3) for col in range(1, 5))
@dataclass(frozen=True, slots=True)
class EngineSettings:
camera_id: str = "cold_display_cam_01"
max_dwell_seconds: int = 10_800
trash_confirmation_seconds: int = 120
zone_ids: tuple[str, ...] = DEFAULT_ZONE_IDS
@property
def max_dwell(self) -> timedelta:
return timedelta(seconds=self.max_dwell_seconds)
@property
def trash_confirmation_window(self) -> timedelta:
return timedelta(seconds=self.trash_confirmation_seconds)
@dataclass(frozen=True, slots=True)
class Observation:
ts: datetime
zone_counts: dict[str, int]
trash_deposit_count: int = 0
@classmethod
def from_dict(cls, payload: dict[str, Any]) -> "Observation":
ts = payload["ts"]
if isinstance(ts, str):
ts = datetime.fromisoformat(ts)
raw_trash = payload.get("trash_deposit_count", payload.get("trash_deposit", 0))
if isinstance(raw_trash, bool):
trash_deposit_count = 1 if raw_trash else 0
else:
trash_deposit_count = int(raw_trash)
return cls(
ts=ts,
zone_counts={key: max(0, int(value)) for key, value in payload["zone_counts"].items()},
trash_deposit_count=max(0, trash_deposit_count),
)
@dataclass(slots=True)
class Batch:
batch_id: str
zone_id: str
started_at: datetime
last_count: int
state: str = "active"
ended_at: datetime | None = None
pending_since: datetime | None = None
disposal_deadline: datetime | None = None
dwell_seconds: int = 0
violation_reasons: set[str] = field(default_factory=set)
def current_dwell_seconds(self, when: datetime) -> int:
if self.ended_at is not None:
return self.dwell_seconds
return max(0, int((when - self.started_at).total_seconds()))

31
task_plan.md Normal file
View File

@@ -0,0 +1,31 @@
# Cold Display Guard Task Plan
## Goal
Create an independent git project under `~/Code` for monitoring food batches in a refrigerated display cabinet. The system tracks each configured display zone, starts a batch timer when food appears, ends it when the zone clears, and raises compliance alerts for over-3-hour removal without trash disposal or for over-3-hour food being put back.
## Confirmed Decisions
- The trash bin is visible in the same camera frame.
- The display cabinet starts as a 4-column by 2-row layout, but zones must be configurable.
- A zone may contain multiple food items.
- Items in the same zone are treated as one batch.
- Mixed batches are not allowed; a zone must clear before a new batch can start.
- The first implementation is a standalone project, not a modification of `store_dwell_alert`.
## Phases
| Phase | Status | Notes |
| --- | --- | --- |
| Create project skeleton | complete | Built under `~/Code/cold_display_guard`. |
| Write design and implementation plan | complete | Saved in `docs/plans/`. |
| Implement core state engine | complete | Pure Python, no camera dependency. |
| Add config, CLI, README | complete | TOML config and JSONL event CLI skeleton. |
| Add tests | complete | `python3 -m unittest discover -s tests -v` passes. |
| Initialize git and commit | complete | Independent repository created. |
## Errors Encountered
| Error | Attempt | Resolution |
| --- | --- | --- |
| Ended batches reported `0` dwell seconds | First `unittest` run | Calculate dwell seconds before assigning `ended_at`. |

59
tests/test_cli.py Normal file
View File

@@ -0,0 +1,59 @@
from __future__ import annotations
import io
import json
import tempfile
import unittest
from datetime import datetime, timedelta, timezone
from pathlib import Path
from cold_display_guard.cli import run_jsonl
from cold_display_guard.models import EngineSettings
class CliTests(unittest.TestCase):
def test_processes_jsonl_observations(self) -> None:
settings = EngineSettings(
max_dwell_seconds=10,
trash_confirmation_seconds=5,
zone_ids=("r1c1",),
)
t0 = datetime(2026, 4, 27, 10, 0, tzinfo=timezone.utc)
with tempfile.TemporaryDirectory() as tmpdir:
input_path = Path(tmpdir) / "obs.jsonl"
input_path.write_text(
"\n".join(
[
json.dumps({"ts": t0.isoformat(), "zone_counts": {"r1c1": 1}}),
json.dumps(
{
"ts": (t0 + timedelta(seconds=11)).isoformat(),
"zone_counts": {"r1c1": 0},
}
),
json.dumps(
{
"ts": (t0 + timedelta(seconds=12)).isoformat(),
"zone_counts": {"r1c1": 0},
"trash_deposit": True,
}
),
]
),
encoding="utf-8",
)
output = io.StringIO()
exit_code = run_jsonl(input_path, settings, output)
self.assertEqual(exit_code, 0)
events = [json.loads(line) for line in output.getvalue().splitlines()]
self.assertEqual(
[event["event"] for event in events],
["batch_started", "batch_pending_disposal", "batch_discarded"],
)
if __name__ == "__main__":
unittest.main()

38
tests/test_config.py Normal file
View File

@@ -0,0 +1,38 @@
from __future__ import annotations
import tempfile
import unittest
from pathlib import Path
from cold_display_guard.config import load_settings
class ConfigTests(unittest.TestCase):
def test_loads_settings_from_toml(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
path = Path(tmpdir) / "config.toml"
path.write_text(
"""
camera_id = "cam_a"
[thresholds]
max_dwell_seconds = 30
trash_confirmation_seconds = 4
[layout]
rows = 1
cols = 2
""".strip(),
encoding="utf-8",
)
settings = load_settings(path)
self.assertEqual(settings.camera_id, "cam_a")
self.assertEqual(settings.max_dwell_seconds, 30)
self.assertEqual(settings.trash_confirmation_seconds, 4)
self.assertEqual(settings.zone_ids, ("r1c1", "r1c2"))
if __name__ == "__main__":
unittest.main()

98
tests/test_engine.py Normal file
View File

@@ -0,0 +1,98 @@
from __future__ import annotations
import unittest
from datetime import datetime, timedelta, timezone
from cold_display_guard import BatchEngine, EngineSettings, Observation
UTC = timezone.utc
def obs(ts: datetime, counts: dict[str, int], trash: bool | int = False) -> Observation:
return Observation.from_dict(
{
"ts": ts.isoformat(),
"zone_counts": counts,
"trash_deposit": trash,
}
)
class BatchEngineTests(unittest.TestCase):
def setUp(self) -> None:
self.settings = EngineSettings(
camera_id="test_cam",
max_dwell_seconds=10,
trash_confirmation_seconds=5,
zone_ids=("r1c1", "r1c2"),
)
self.engine = BatchEngine(self.settings)
self.t0 = datetime(2026, 4, 27, 10, 0, tzinfo=UTC)
def test_starts_batch_when_zone_becomes_occupied(self) -> None:
events = self.engine.process(obs(self.t0, {"r1c1": 3}))
self.assertEqual([event["event"] for event in events], ["batch_started"])
self.assertEqual(events[0]["zone_id"], "r1c1")
self.assertEqual(events[0]["current_count"], 3)
def test_consumes_batch_when_removed_before_threshold(self) -> None:
self.engine.process(obs(self.t0, {"r1c1": 2}))
events = self.engine.process(obs(self.t0 + timedelta(seconds=9), {"r1c1": 0}))
self.assertEqual([event["event"] for event in events], ["batch_consumed"])
self.assertEqual(events[0]["dwell_seconds"], 9)
def test_over_threshold_removal_waits_for_disposal_confirmation(self) -> None:
self.engine.process(obs(self.t0, {"r1c1": 2}))
events = self.engine.process(obs(self.t0 + timedelta(seconds=10), {"r1c1": 0}))
self.assertEqual([event["event"] for event in events], ["batch_pending_disposal"])
self.assertEqual(events[0]["dwell_seconds"], 10)
self.assertIn("disposal_deadline", events[0])
def test_trash_deposit_confirms_pending_disposal(self) -> None:
self.engine.process(obs(self.t0, {"r1c1": 2}))
self.engine.process(obs(self.t0 + timedelta(seconds=11), {"r1c1": 0}))
events = self.engine.process(obs(self.t0 + timedelta(seconds=12), {"r1c1": 0}, trash=True))
self.assertEqual([event["event"] for event in events], ["batch_discarded"])
def test_missing_trash_deposit_raises_violation_after_deadline(self) -> None:
self.engine.process(obs(self.t0, {"r1c1": 2}))
self.engine.process(obs(self.t0 + timedelta(seconds=11), {"r1c1": 0}))
events = self.engine.process(obs(self.t0 + timedelta(seconds=17), {"r1c1": 0}))
self.assertEqual([event["event"] for event in events], ["missing_disposal_violation"])
self.assertEqual(events[0]["violation_reasons"], ["missing_disposal"])
def test_adding_food_before_zone_clears_raises_mixed_batch_violation(self) -> None:
self.engine.process(obs(self.t0, {"r1c1": 2}))
events = self.engine.process(obs(self.t0 + timedelta(seconds=1), {"r1c1": 3}))
self.assertEqual([event["event"] for event in events], ["mixed_batch_violation"])
self.assertEqual(events[0]["reason"], "food_added_before_zone_cleared")
def test_count_decrease_keeps_same_batch_active(self) -> None:
self.engine.process(obs(self.t0, {"r1c1": 3}))
events = self.engine.process(obs(self.t0 + timedelta(seconds=1), {"r1c1": 1}))
self.assertEqual([event["event"] for event in events], ["batch_count_changed"])
self.assertEqual(events[0]["previous_count"], 3)
self.assertEqual(events[0]["current_count"], 1)
def test_overdue_food_reappearing_before_disposal_raises_violation(self) -> None:
self.engine.process(obs(self.t0, {"r1c1": 2}))
self.engine.process(obs(self.t0 + timedelta(seconds=11), {"r1c1": 0}))
events = self.engine.process(obs(self.t0 + timedelta(seconds=12), {"r1c2": 1}))
self.assertEqual(
[event["event"] for event in events],
["overdue_return_violation", "batch_started"],
)
self.assertEqual(events[0]["appeared_zones"], ["r1c2"])
if __name__ == "__main__":
unittest.main()