commit 36dc3548e63eb073e1b783a471ced06a3f0fbcd2 Author: Yoilun Date: Mon Apr 27 10:59:13 2026 +0800 feat: initialize cold display guard diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..8d49296 --- /dev/null +++ b/.gitignore @@ -0,0 +1,9 @@ +__pycache__/ +*.py[cod] +.pytest_cache/ +.venv/ +dist/ +build/ +*.egg-info/ +logs/ +*.jsonl diff --git a/README_zh.md b/README_zh.md new file mode 100644 index 0000000..23fe2c6 --- /dev/null +++ b/README_zh.md @@ -0,0 +1,56 @@ +# 冷藏展示柜食品批次计时报警 + +这是一个独立项目,用于单摄像头监控冷藏展示柜和同画面垃圾桶,记录每个展示区域内食品批次的放置时长,并发现 3 小时到期后的违规行为。 + +## 已确认业务规则 + +- 摄像头同时看到展示柜和垃圾桶。 +- 展示柜初始布局为横向 4 列、竖向 2 行。 +- 布局后期可以通过配置调整。 +- 每个区域可以放多份食品,但这些食品按同一批次计时。 +- 同一区域不允许混批,必须清空后才能放入新批次。 +- 食品放入区域时记录开始时间。 +- 区域清空时记录结束时间。 +- 未满 3 小时清空视为正常消耗。 +- 超过 3 小时清空后必须在确认窗口内看到垃圾桶投放动作。 +- 超过 3 小时的食品拿出后又放回展示柜,触发报警。 + +## 当前实现范围 + +当前版本先实现纯业务状态机,不依赖摄像头模型。后续视觉模块只需要输出标准观察数据: + +```json +{ + "ts": "2026-04-27T10:00:00+08:00", + "zone_counts": { + "r1c1": 3, + "r1c2": 0 + }, + "trash_deposit": false +} +``` + +程序会输出 JSONL 事件,例如: + +- `batch_started` +- `batch_consumed` +- `batch_pending_disposal` +- `batch_discarded` +- `mixed_batch_violation` +- `overdue_return_violation` +- `missing_disposal_violation` + +## 配置 + +示例配置在 `config/example.toml`。 + +默认阈值: + +- 最大放置时间:`10800` 秒,也就是 3 小时 +- 垃圾桶投放确认窗口:`120` 秒 + +## 本地测试 + +```bash +python3 -m unittest discover -s tests -v +``` diff --git a/config/example.toml b/config/example.toml new file mode 100644 index 0000000..234aa3b --- /dev/null +++ b/config/example.toml @@ -0,0 +1,46 @@ +camera_id = "cold_display_cam_01" +timezone = "Asia/Shanghai" + +[thresholds] +max_dwell_seconds = 10800 +trash_confirmation_seconds = 120 + +[layout] +rows = 2 +cols = 4 +zone_ids = ["r1c1", "r1c2", "r1c3", "r1c4", "r2c1", "r2c2", "r2c3", "r2c4"] + +[[zones]] +id = "r1c1" +polygon = [[0.00, 0.00], [0.25, 0.00], [0.25, 0.50], [0.00, 0.50]] + +[[zones]] +id = "r1c2" +polygon = [[0.25, 0.00], [0.50, 0.00], [0.50, 0.50], [0.25, 0.50]] + +[[zones]] +id = "r1c3" +polygon = [[0.50, 0.00], [0.75, 0.00], [0.75, 0.50], [0.50, 0.50]] + +[[zones]] +id = "r1c4" +polygon = [[0.75, 0.00], [1.00, 0.00], [1.00, 0.50], [0.75, 0.50]] + +[[zones]] +id = "r2c1" +polygon = [[0.00, 0.50], [0.25, 0.50], [0.25, 1.00], [0.00, 1.00]] + +[[zones]] +id = "r2c2" +polygon = [[0.25, 0.50], [0.50, 0.50], [0.50, 1.00], [0.25, 1.00]] + +[[zones]] +id = "r2c3" +polygon = [[0.50, 0.50], [0.75, 0.50], [0.75, 1.00], [0.50, 1.00]] + +[[zones]] +id = "r2c4" +polygon = [[0.75, 0.50], [1.00, 0.50], [1.00, 1.00], [0.75, 1.00]] + +[trash] +roi = [[0.80, 0.65], [1.00, 0.65], [1.00, 1.00], [0.80, 1.00]] diff --git a/docs/plans/2026-04-27-cold-display-guard-design.md b/docs/plans/2026-04-27-cold-display-guard-design.md new file mode 100644 index 0000000..0ca5e39 --- /dev/null +++ b/docs/plans/2026-04-27-cold-display-guard-design.md @@ -0,0 +1,113 @@ +# Cold Display Guard Design + +**Date:** 2026-04-27 + +## Goal + +Build a single-camera refrigerated display monitoring service that records how long each food batch remains in each display zone and raises alerts when over-threshold food is removed without confirmed disposal or is placed back into the display. + +## Scope + +- One camera sees both the refrigerated display cabinet and the trash bin. +- The initial cabinet layout is 4 columns by 2 rows. +- The layout must be configurable because the physical arrangement may change. +- Each zone may contain multiple food items. +- Items in one zone are treated as one batch. +- Mixed batches are not allowed. +- A new batch can only start after a zone becomes empty. + +## Architecture + +The project separates business state from computer vision: + +1. Vision adapters detect display-zone occupancy and trash-bin deposit events. +2. The batch engine receives normalized observations with timestamps. +3. The engine maintains zone and batch state. +4. The notifier layer emits JSONL events and later can send webhooks or UI alerts. + +The first implementation focuses on the batch engine and event contract. Camera inference can be added later without changing compliance rules. + +## Zone Model + +Each zone has: + +- `zone_id` +- configured polygon or bounding box +- current observed item count +- optional active batch + +The initial default zones are `r1c1` through `r2c4`. + +## Batch State Machine + +### `active` + +Food is currently visible in the zone. The batch has `started_at`, `zone_id`, and current observed count. + +### `pending_disposal` + +The batch exceeded the maximum dwell time and the zone became empty. The system waits for a trash-bin deposit event within a configurable window. + +### `discarded` + +The over-threshold batch was removed and a trash-bin deposit was observed in the confirmation window. + +### `consumed` + +The batch was removed before the maximum dwell threshold. No trash confirmation is required. + +### `violation` + +The system observed one of these conditions: + +- food was added to an already occupied zone, which indicates a mixed batch +- an over-threshold removed batch was put back into any display zone before confirmed disposal +- an over-threshold removed batch was not followed by trash disposal before the confirmation deadline + +## Timing Rules + +- Default maximum dwell time: 3 hours (`10800` seconds). +- Default trash confirmation window: 2 minutes (`120` seconds). +- A zone changing from `0` to `>0` starts a new batch. +- A zone changing from `>0` to `0` ends the visible dwell period. +- Count decreases while still `>0` do not end the batch. +- Count increases while already `>0` produce a mixed-batch violation. + +## Event Contract + +The engine emits JSON-compatible events: + +- `batch_started` +- `batch_count_changed` +- `batch_consumed` +- `batch_pending_disposal` +- `batch_discarded` +- `mixed_batch_violation` +- `overdue_return_violation` +- `missing_disposal_violation` + +Each event includes: + +- `event` +- `ts` +- `camera_id` +- `zone_id` when applicable +- `batch_id` when applicable +- timing fields such as `started_at`, `ended_at`, and `dwell_seconds` + +## Future Vision Integration + +The vision layer should output normalized observations: + +```json +{ + "ts": "2026-04-27T10:00:00+08:00", + "zone_counts": { + "r1c1": 3, + "r1c2": 0 + }, + "trash_deposit": false +} +``` + +Trash disposal confirmation should use motion/object evidence inside the trash ROI, not merely a person standing near the bin. diff --git a/docs/plans/2026-04-27-cold-display-guard.md b/docs/plans/2026-04-27-cold-display-guard.md new file mode 100644 index 0000000..4690314 --- /dev/null +++ b/docs/plans/2026-04-27-cold-display-guard.md @@ -0,0 +1,62 @@ +# Cold Display Guard Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** Build a standalone Python project for refrigerated display food-batch timing and 3-hour disposal compliance alerts. + +**Architecture:** Keep camera and model logic outside the first business core. Implement a pure Python batch engine that consumes timestamped zone occupancy and trash-bin events, emits JSON-compatible compliance events, and can later be driven by a video adapter. + +**Tech Stack:** Python 3.11+ standard library, TOML config via `tomllib`, tests via `unittest`. + +--- + +### Task 1: Project Skeleton + +**Files:** +- Create: `pyproject.toml` +- Create: `src/cold_display_guard/__init__.py` +- Create: `README_zh.md` +- Create: `config/example.toml` + +**Steps:** + +1. Create packaging metadata with a console script named `cold-display-guard`. +2. Create a Chinese README documenting the confirmed business rules. +3. Create default TOML config for camera id, thresholds, 4x2 zones, and trash ROI placeholder. +4. Verify imports with `python3 -m unittest discover -s tests`. + +### Task 2: Batch Engine + +**Files:** +- Create: `src/cold_display_guard/models.py` +- Create: `src/cold_display_guard/engine.py` +- Test: `tests/test_engine.py` + +**Steps:** + +1. Write tests for batch start, normal consumption, over-threshold pending disposal, trash confirmation, mixed-batch violation, overdue return violation, and missing disposal violation. +2. Implement dataclasses for observations, batches, zones, and events. +3. Implement `BatchEngine.process()`. +4. Run `python3 -m unittest tests.test_engine -v`. + +### Task 3: Config and CLI + +**Files:** +- Create: `src/cold_display_guard/config.py` +- Create: `src/cold_display_guard/cli.py` +- Test: `tests/test_config.py` + +**Steps:** + +1. Load TOML config into an engine settings dataclass. +2. Add CLI support for processing a JSONL observation file. +3. Write emitted events to stdout as JSONL. +4. Run `python3 -m unittest discover -s tests -v`. + +### Task 4: Git + +**Steps:** + +1. Initialize git in `~/Code/cold_display_guard`. +2. Review `git status --short`. +3. Commit the initial project. diff --git a/findings.md b/findings.md new file mode 100644 index 0000000..5cdacdd --- /dev/null +++ b/findings.md @@ -0,0 +1,16 @@ +# Findings + +## Existing Local Context + +- `~/Code/video_recognition_local/store_dwell_alert` exists, but it tracks people dwell time, not food batch dwell time. +- The new project should be independent and should not share git history with the existing project. + +## Domain Model + +- The reliable business unit is a display-zone batch, not an individual food item. +- A batch starts when a zone changes from empty to occupied. +- A batch continues while the zone remains occupied, even if the item count decreases. +- A batch ends when the zone becomes empty. +- If a batch is removed after the maximum dwell threshold, the system expects a trash-bin deposit event within a configurable window. +- If a removed over-threshold batch reappears in any display zone before being discarded, that is a violation. +- If food is added while a zone is already occupied, that is a mixed-batch violation. diff --git a/progress.md b/progress.md new file mode 100644 index 0000000..8dfa22d --- /dev/null +++ b/progress.md @@ -0,0 +1,11 @@ +# Progress + +## 2026-04-27 + +- Created project directory `~/Code/cold_display_guard`. +- Confirmed standalone project scope and initial project name. +- Started file-based planning files. +- Created core engine, config loader, CLI, README, design doc, implementation plan, and tests. +- First test run failed because `_end_batch()` set `ended_at` before calculating dwell seconds, causing ended batches to report `0` seconds. Fixed by calculating dwell before assigning `ended_at`. +- Test suite now passes: `PYTHONPATH=src python3 -m unittest discover -s tests -v`. +- Initialized git repository and created the initial project commit. diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..c18e7b9 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,17 @@ +[build-system] +requires = ["setuptools>=68"] +build-backend = "setuptools.build_meta" + +[project] +name = "cold-display-guard" +version = "0.1.0" +description = "Food batch dwell-time and disposal compliance monitor for refrigerated displays" +readme = "README_zh.md" +requires-python = ">=3.11" +dependencies = [] + +[project.scripts] +cold-display-guard = "cold_display_guard.cli:main" + +[tool.setuptools.packages.find] +where = ["src"] diff --git a/src/cold_display_guard/__init__.py b/src/cold_display_guard/__init__.py new file mode 100644 index 0000000..9069797 --- /dev/null +++ b/src/cold_display_guard/__init__.py @@ -0,0 +1,6 @@ +"""Cold display food-batch timing and compliance engine.""" + +from cold_display_guard.engine import BatchEngine +from cold_display_guard.models import EngineSettings, Observation + +__all__ = ["BatchEngine", "EngineSettings", "Observation"] diff --git a/src/cold_display_guard/cli.py b/src/cold_display_guard/cli.py new file mode 100644 index 0000000..a27c8c9 --- /dev/null +++ b/src/cold_display_guard/cli.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +import argparse +import json +import sys +from pathlib import Path +from typing import TextIO + +from cold_display_guard.config import load_settings +from cold_display_guard.engine import BatchEngine +from cold_display_guard.models import EngineSettings, Observation + + +def build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="Process cold-display observation JSONL and emit compliance events.", + ) + parser.add_argument( + "--config", + default="config/example.toml", + help="Path to TOML config. Defaults to config/example.toml.", + ) + parser.add_argument( + "--input", + required=True, + help="Path to observation JSONL file.", + ) + return parser + + +def run_jsonl(input_path: str | Path, settings: EngineSettings, output: TextIO) -> int: + engine = BatchEngine(settings) + with Path(input_path).open("r", encoding="utf-8") as handle: + for line_number, line in enumerate(handle, start=1): + line = line.strip() + if not line: + continue + try: + observation = Observation.from_dict(json.loads(line)) + except (KeyError, TypeError, ValueError, json.JSONDecodeError) as exc: + raise ValueError(f"invalid observation at line {line_number}: {exc}") from exc + + for event in engine.process(observation): + output.write(json.dumps(event, ensure_ascii=False, sort_keys=True)) + output.write("\n") + return 0 + + +def main(argv: list[str] | None = None) -> int: + parser = build_parser() + args = parser.parse_args(argv) + settings = load_settings(args.config) + try: + return run_jsonl(args.input, settings, sys.stdout) + except ValueError as exc: + print(str(exc), file=sys.stderr) + return 2 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/cold_display_guard/config.py b/src/cold_display_guard/config.py new file mode 100644 index 0000000..6ff0763 --- /dev/null +++ b/src/cold_display_guard/config.py @@ -0,0 +1,32 @@ +from __future__ import annotations + +import tomllib +from pathlib import Path +from typing import Any + +from cold_display_guard.models import DEFAULT_ZONE_IDS, EngineSettings + + +def load_settings(path: str | Path) -> EngineSettings: + data = tomllib.loads(Path(path).read_text(encoding="utf-8")) + thresholds: dict[str, Any] = data.get("thresholds", {}) + layout: dict[str, Any] = data.get("layout", {}) + + zone_ids = tuple(layout.get("zone_ids") or _zone_ids_from_rows_cols(layout)) + if not zone_ids: + zone_ids = DEFAULT_ZONE_IDS + + return EngineSettings( + camera_id=str(data.get("camera_id", "cold_display_cam_01")), + max_dwell_seconds=int(thresholds.get("max_dwell_seconds", 10_800)), + trash_confirmation_seconds=int(thresholds.get("trash_confirmation_seconds", 120)), + zone_ids=zone_ids, + ) + + +def _zone_ids_from_rows_cols(layout: dict[str, Any]) -> tuple[str, ...]: + rows = int(layout.get("rows", 0)) + cols = int(layout.get("cols", 0)) + if rows <= 0 or cols <= 0: + return () + return tuple(f"r{row}c{col}" for row in range(1, rows + 1) for col in range(1, cols + 1)) diff --git a/src/cold_display_guard/engine.py b/src/cold_display_guard/engine.py new file mode 100644 index 0000000..ca7c094 --- /dev/null +++ b/src/cold_display_guard/engine.py @@ -0,0 +1,195 @@ +from __future__ import annotations + +from datetime import datetime +from typing import Any + +from cold_display_guard.models import Batch, EngineSettings, Observation + + +class BatchEngine: + def __init__(self, settings: EngineSettings | None = None) -> None: + self.settings = settings or EngineSettings() + self.active_by_zone: dict[str, Batch] = {} + self.pending_disposal: list[Batch] = [] + self.closed_batches: list[Batch] = [] + self._zone_counts: dict[str, int] = {zone_id: 0 for zone_id in self.settings.zone_ids} + self._next_batch_index = 1 + + def process(self, observation: Observation) -> list[dict[str, Any]]: + events: list[dict[str, Any]] = [] + zone_counts = self._normalized_counts(observation.zone_counts) + + events.extend(self._expire_pending_disposal(observation.ts)) + events.extend(self._apply_trash_deposits(observation.ts, observation.trash_deposit_count)) + + appeared_zones = [ + zone_id + for zone_id, count in zone_counts.items() + if self._zone_counts.get(zone_id, 0) == 0 and count > 0 + ] + if appeared_zones and self.pending_disposal: + events.extend(self._mark_pending_as_returned(observation.ts, appeared_zones)) + + for zone_id, new_count in zone_counts.items(): + previous_count = self._zone_counts.get(zone_id, 0) + if previous_count == 0 and new_count > 0: + events.append(self._start_batch(zone_id, new_count, observation.ts)) + elif previous_count > 0 and new_count == 0: + event = self._end_batch(zone_id, observation.ts) + if event is not None: + events.append(event) + elif previous_count > 0 and new_count > previous_count: + event = self._mark_mixed_batch(zone_id, previous_count, new_count, observation.ts) + if event is not None: + events.append(event) + self.active_by_zone[zone_id].last_count = new_count + elif previous_count > 0 and new_count != previous_count: + batch = self.active_by_zone.get(zone_id) + if batch is not None: + batch.last_count = new_count + events.append( + self._event( + "batch_count_changed", + observation.ts, + batch, + previous_count=previous_count, + current_count=new_count, + ) + ) + + self._zone_counts[zone_id] = new_count + + return events + + def _normalized_counts(self, incoming: dict[str, int]) -> dict[str, int]: + return { + zone_id: max(0, int(incoming.get(zone_id, self._zone_counts.get(zone_id, 0)))) + for zone_id in self.settings.zone_ids + } + + def _next_batch_id(self) -> str: + batch_id = f"batch_{self._next_batch_index:06d}" + self._next_batch_index += 1 + return batch_id + + def _start_batch(self, zone_id: str, count: int, when: datetime) -> dict[str, Any]: + batch = Batch( + batch_id=self._next_batch_id(), + zone_id=zone_id, + started_at=when, + last_count=count, + ) + self.active_by_zone[zone_id] = batch + return self._event("batch_started", when, batch, current_count=count) + + def _end_batch(self, zone_id: str, when: datetime) -> dict[str, Any] | None: + batch = self.active_by_zone.pop(zone_id, None) + if batch is None: + return None + + batch.dwell_seconds = batch.current_dwell_seconds(when) + batch.ended_at = when + + if batch.dwell_seconds >= self.settings.max_dwell_seconds: + batch.state = "pending_disposal" + batch.pending_since = when + batch.disposal_deadline = when + self.settings.trash_confirmation_window + self.pending_disposal.append(batch) + return self._event("batch_pending_disposal", when, batch) + + batch.state = "consumed" + self.closed_batches.append(batch) + return self._event("batch_consumed", when, batch) + + def _mark_mixed_batch( + self, + zone_id: str, + previous_count: int, + current_count: int, + when: datetime, + ) -> dict[str, Any] | None: + batch = self.active_by_zone.get(zone_id) + if batch is None: + return None + if "mixed_batch" in batch.violation_reasons: + return None + batch.state = "violation" + batch.violation_reasons.add("mixed_batch") + return self._event( + "mixed_batch_violation", + when, + batch, + previous_count=previous_count, + current_count=current_count, + reason="food_added_before_zone_cleared", + ) + + def _mark_pending_as_returned(self, when: datetime, appeared_zones: list[str]) -> list[dict[str, Any]]: + events: list[dict[str, Any]] = [] + pending = list(self.pending_disposal) + self.pending_disposal.clear() + for batch in pending: + batch.state = "violation" + batch.violation_reasons.add("overdue_return") + self.closed_batches.append(batch) + events.append( + self._event( + "overdue_return_violation", + when, + batch, + appeared_zones=appeared_zones, + reason="overdue_batch_reappeared_before_disposal", + ) + ) + return events + + def _apply_trash_deposits(self, when: datetime, deposit_count: int) -> list[dict[str, Any]]: + events: list[dict[str, Any]] = [] + while deposit_count > 0 and self.pending_disposal: + batch = self.pending_disposal.pop(0) + batch.state = "discarded" + self.closed_batches.append(batch) + events.append(self._event("batch_discarded", when, batch)) + deposit_count -= 1 + return events + + def _expire_pending_disposal(self, when: datetime) -> list[dict[str, Any]]: + events: list[dict[str, Any]] = [] + still_pending: list[Batch] = [] + for batch in self.pending_disposal: + if batch.disposal_deadline is not None and when > batch.disposal_deadline: + batch.state = "violation" + batch.violation_reasons.add("missing_disposal") + self.closed_batches.append(batch) + events.append( + self._event( + "missing_disposal_violation", + when, + batch, + reason="trash_deposit_not_observed_before_deadline", + ) + ) + else: + still_pending.append(batch) + self.pending_disposal = still_pending + return events + + def _event(self, event_name: str, when: datetime, batch: Batch, **extra: Any) -> dict[str, Any]: + payload: dict[str, Any] = { + "event": event_name, + "ts": when.isoformat(), + "camera_id": self.settings.camera_id, + "zone_id": batch.zone_id, + "batch_id": batch.batch_id, + "state": batch.state, + "started_at": batch.started_at.isoformat(), + "dwell_seconds": batch.current_dwell_seconds(when), + } + if batch.ended_at is not None: + payload["ended_at"] = batch.ended_at.isoformat() + if batch.disposal_deadline is not None: + payload["disposal_deadline"] = batch.disposal_deadline.isoformat() + if batch.violation_reasons: + payload["violation_reasons"] = sorted(batch.violation_reasons) + payload.update(extra) + return payload diff --git a/src/cold_display_guard/models.py b/src/cold_display_guard/models.py new file mode 100644 index 0000000..05c5d7a --- /dev/null +++ b/src/cold_display_guard/models.py @@ -0,0 +1,68 @@ +from __future__ import annotations + +from dataclasses import dataclass, field +from datetime import datetime, timedelta +from typing import Any + + +DEFAULT_ZONE_IDS = tuple(f"r{row}c{col}" for row in range(1, 3) for col in range(1, 5)) + + +@dataclass(frozen=True, slots=True) +class EngineSettings: + camera_id: str = "cold_display_cam_01" + max_dwell_seconds: int = 10_800 + trash_confirmation_seconds: int = 120 + zone_ids: tuple[str, ...] = DEFAULT_ZONE_IDS + + @property + def max_dwell(self) -> timedelta: + return timedelta(seconds=self.max_dwell_seconds) + + @property + def trash_confirmation_window(self) -> timedelta: + return timedelta(seconds=self.trash_confirmation_seconds) + + +@dataclass(frozen=True, slots=True) +class Observation: + ts: datetime + zone_counts: dict[str, int] + trash_deposit_count: int = 0 + + @classmethod + def from_dict(cls, payload: dict[str, Any]) -> "Observation": + ts = payload["ts"] + if isinstance(ts, str): + ts = datetime.fromisoformat(ts) + + raw_trash = payload.get("trash_deposit_count", payload.get("trash_deposit", 0)) + if isinstance(raw_trash, bool): + trash_deposit_count = 1 if raw_trash else 0 + else: + trash_deposit_count = int(raw_trash) + + return cls( + ts=ts, + zone_counts={key: max(0, int(value)) for key, value in payload["zone_counts"].items()}, + trash_deposit_count=max(0, trash_deposit_count), + ) + + +@dataclass(slots=True) +class Batch: + batch_id: str + zone_id: str + started_at: datetime + last_count: int + state: str = "active" + ended_at: datetime | None = None + pending_since: datetime | None = None + disposal_deadline: datetime | None = None + dwell_seconds: int = 0 + violation_reasons: set[str] = field(default_factory=set) + + def current_dwell_seconds(self, when: datetime) -> int: + if self.ended_at is not None: + return self.dwell_seconds + return max(0, int((when - self.started_at).total_seconds())) diff --git a/task_plan.md b/task_plan.md new file mode 100644 index 0000000..74aae75 --- /dev/null +++ b/task_plan.md @@ -0,0 +1,31 @@ +# Cold Display Guard Task Plan + +## Goal + +Create an independent git project under `~/Code` for monitoring food batches in a refrigerated display cabinet. The system tracks each configured display zone, starts a batch timer when food appears, ends it when the zone clears, and raises compliance alerts for over-3-hour removal without trash disposal or for over-3-hour food being put back. + +## Confirmed Decisions + +- The trash bin is visible in the same camera frame. +- The display cabinet starts as a 4-column by 2-row layout, but zones must be configurable. +- A zone may contain multiple food items. +- Items in the same zone are treated as one batch. +- Mixed batches are not allowed; a zone must clear before a new batch can start. +- The first implementation is a standalone project, not a modification of `store_dwell_alert`. + +## Phases + +| Phase | Status | Notes | +| --- | --- | --- | +| Create project skeleton | complete | Built under `~/Code/cold_display_guard`. | +| Write design and implementation plan | complete | Saved in `docs/plans/`. | +| Implement core state engine | complete | Pure Python, no camera dependency. | +| Add config, CLI, README | complete | TOML config and JSONL event CLI skeleton. | +| Add tests | complete | `python3 -m unittest discover -s tests -v` passes. | +| Initialize git and commit | complete | Independent repository created. | + +## Errors Encountered + +| Error | Attempt | Resolution | +| --- | --- | --- | +| Ended batches reported `0` dwell seconds | First `unittest` run | Calculate dwell seconds before assigning `ended_at`. | diff --git a/tests/test_cli.py b/tests/test_cli.py new file mode 100644 index 0000000..5134ace --- /dev/null +++ b/tests/test_cli.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +import io +import json +import tempfile +import unittest +from datetime import datetime, timedelta, timezone +from pathlib import Path + +from cold_display_guard.cli import run_jsonl +from cold_display_guard.models import EngineSettings + + +class CliTests(unittest.TestCase): + def test_processes_jsonl_observations(self) -> None: + settings = EngineSettings( + max_dwell_seconds=10, + trash_confirmation_seconds=5, + zone_ids=("r1c1",), + ) + t0 = datetime(2026, 4, 27, 10, 0, tzinfo=timezone.utc) + + with tempfile.TemporaryDirectory() as tmpdir: + input_path = Path(tmpdir) / "obs.jsonl" + input_path.write_text( + "\n".join( + [ + json.dumps({"ts": t0.isoformat(), "zone_counts": {"r1c1": 1}}), + json.dumps( + { + "ts": (t0 + timedelta(seconds=11)).isoformat(), + "zone_counts": {"r1c1": 0}, + } + ), + json.dumps( + { + "ts": (t0 + timedelta(seconds=12)).isoformat(), + "zone_counts": {"r1c1": 0}, + "trash_deposit": True, + } + ), + ] + ), + encoding="utf-8", + ) + + output = io.StringIO() + exit_code = run_jsonl(input_path, settings, output) + + self.assertEqual(exit_code, 0) + events = [json.loads(line) for line in output.getvalue().splitlines()] + self.assertEqual( + [event["event"] for event in events], + ["batch_started", "batch_pending_disposal", "batch_discarded"], + ) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_config.py b/tests/test_config.py new file mode 100644 index 0000000..ddd1f7b --- /dev/null +++ b/tests/test_config.py @@ -0,0 +1,38 @@ +from __future__ import annotations + +import tempfile +import unittest +from pathlib import Path + +from cold_display_guard.config import load_settings + + +class ConfigTests(unittest.TestCase): + def test_loads_settings_from_toml(self) -> None: + with tempfile.TemporaryDirectory() as tmpdir: + path = Path(tmpdir) / "config.toml" + path.write_text( + """ +camera_id = "cam_a" + +[thresholds] +max_dwell_seconds = 30 +trash_confirmation_seconds = 4 + +[layout] +rows = 1 +cols = 2 +""".strip(), + encoding="utf-8", + ) + + settings = load_settings(path) + + self.assertEqual(settings.camera_id, "cam_a") + self.assertEqual(settings.max_dwell_seconds, 30) + self.assertEqual(settings.trash_confirmation_seconds, 4) + self.assertEqual(settings.zone_ids, ("r1c1", "r1c2")) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_engine.py b/tests/test_engine.py new file mode 100644 index 0000000..db98932 --- /dev/null +++ b/tests/test_engine.py @@ -0,0 +1,98 @@ +from __future__ import annotations + +import unittest +from datetime import datetime, timedelta, timezone + +from cold_display_guard import BatchEngine, EngineSettings, Observation + + +UTC = timezone.utc + + +def obs(ts: datetime, counts: dict[str, int], trash: bool | int = False) -> Observation: + return Observation.from_dict( + { + "ts": ts.isoformat(), + "zone_counts": counts, + "trash_deposit": trash, + } + ) + + +class BatchEngineTests(unittest.TestCase): + def setUp(self) -> None: + self.settings = EngineSettings( + camera_id="test_cam", + max_dwell_seconds=10, + trash_confirmation_seconds=5, + zone_ids=("r1c1", "r1c2"), + ) + self.engine = BatchEngine(self.settings) + self.t0 = datetime(2026, 4, 27, 10, 0, tzinfo=UTC) + + def test_starts_batch_when_zone_becomes_occupied(self) -> None: + events = self.engine.process(obs(self.t0, {"r1c1": 3})) + + self.assertEqual([event["event"] for event in events], ["batch_started"]) + self.assertEqual(events[0]["zone_id"], "r1c1") + self.assertEqual(events[0]["current_count"], 3) + + def test_consumes_batch_when_removed_before_threshold(self) -> None: + self.engine.process(obs(self.t0, {"r1c1": 2})) + events = self.engine.process(obs(self.t0 + timedelta(seconds=9), {"r1c1": 0})) + + self.assertEqual([event["event"] for event in events], ["batch_consumed"]) + self.assertEqual(events[0]["dwell_seconds"], 9) + + def test_over_threshold_removal_waits_for_disposal_confirmation(self) -> None: + self.engine.process(obs(self.t0, {"r1c1": 2})) + events = self.engine.process(obs(self.t0 + timedelta(seconds=10), {"r1c1": 0})) + + self.assertEqual([event["event"] for event in events], ["batch_pending_disposal"]) + self.assertEqual(events[0]["dwell_seconds"], 10) + self.assertIn("disposal_deadline", events[0]) + + def test_trash_deposit_confirms_pending_disposal(self) -> None: + self.engine.process(obs(self.t0, {"r1c1": 2})) + self.engine.process(obs(self.t0 + timedelta(seconds=11), {"r1c1": 0})) + events = self.engine.process(obs(self.t0 + timedelta(seconds=12), {"r1c1": 0}, trash=True)) + + self.assertEqual([event["event"] for event in events], ["batch_discarded"]) + + def test_missing_trash_deposit_raises_violation_after_deadline(self) -> None: + self.engine.process(obs(self.t0, {"r1c1": 2})) + self.engine.process(obs(self.t0 + timedelta(seconds=11), {"r1c1": 0})) + events = self.engine.process(obs(self.t0 + timedelta(seconds=17), {"r1c1": 0})) + + self.assertEqual([event["event"] for event in events], ["missing_disposal_violation"]) + self.assertEqual(events[0]["violation_reasons"], ["missing_disposal"]) + + def test_adding_food_before_zone_clears_raises_mixed_batch_violation(self) -> None: + self.engine.process(obs(self.t0, {"r1c1": 2})) + events = self.engine.process(obs(self.t0 + timedelta(seconds=1), {"r1c1": 3})) + + self.assertEqual([event["event"] for event in events], ["mixed_batch_violation"]) + self.assertEqual(events[0]["reason"], "food_added_before_zone_cleared") + + def test_count_decrease_keeps_same_batch_active(self) -> None: + self.engine.process(obs(self.t0, {"r1c1": 3})) + events = self.engine.process(obs(self.t0 + timedelta(seconds=1), {"r1c1": 1})) + + self.assertEqual([event["event"] for event in events], ["batch_count_changed"]) + self.assertEqual(events[0]["previous_count"], 3) + self.assertEqual(events[0]["current_count"], 1) + + def test_overdue_food_reappearing_before_disposal_raises_violation(self) -> None: + self.engine.process(obs(self.t0, {"r1c1": 2})) + self.engine.process(obs(self.t0 + timedelta(seconds=11), {"r1c1": 0})) + events = self.engine.process(obs(self.t0 + timedelta(seconds=12), {"r1c2": 1})) + + self.assertEqual( + [event["event"] for event in events], + ["overdue_return_violation", "batch_started"], + ) + self.assertEqual(events[0]["appeared_zones"], ["r1c2"]) + + +if __name__ == "__main__": + unittest.main()