diff --git a/Dockerfile b/Dockerfile index 4b3969a..a04a2b3 100644 --- a/Dockerfile +++ b/Dockerfile @@ -11,6 +11,7 @@ RUN sed -i 's|http://deb.debian.org/debian|http://mirrors.aliyun.com/debian|g; s apt-get update && apt-get install -y --no-install-recommends \ ca-certificates \ ffmpeg \ + fonts-noto-cjk \ tzdata \ && rm -rf /var/lib/apt/lists/* diff --git a/config/example.toml b/config/example.toml index 0b55c2a..cd688d2 100644 --- a/config/example.toml +++ b/config/example.toml @@ -5,7 +5,9 @@ timezone = "Asia/Shanghai" rtsp_url = "" [thresholds] +pre_warning_seconds = 900 max_dwell_seconds = 1200 +alarm_removal_seconds = 1800 trash_confirmation_seconds = 120 [layout] diff --git a/src/cold_display_guard/alarm_snapshots.py b/src/cold_display_guard/alarm_snapshots.py index 179ccc5..c48326d 100644 --- a/src/cold_display_guard/alarm_snapshots.py +++ b/src/cold_display_guard/alarm_snapshots.py @@ -33,10 +33,46 @@ class AlarmSnapshotSettings: @dataclass(frozen=True, slots=True) class CalibrationOverlayRegion: polygon: tuple[tuple[float, float], ...] + label: str outline_rgb: tuple[int, int, int] fill_rgb: tuple[int, int, int] +@dataclass(frozen=True, slots=True) +class OverlayLabel: + text: str + fallback_text: str + x: int + y: int + accent_rgb: tuple[int, int, int] + + +ZONE_OVERLAY_PALETTE: tuple[tuple[int, int, int], ...] = ( + (255, 196, 0), + (0, 190, 255), + (112, 224, 96), + (255, 128, 32), + (180, 128, 255), + (0, 220, 180), + (255, 96, 176), + (192, 224, 48), + (96, 144, 255), + (255, 96, 96), +) +TRASH_OVERLAY_RGB = (255, 64, 64) +LABEL_TEXT_RGB = (255, 255, 255) +LABEL_SHADOW_RGB = (0, 0, 0) +CJK_FONT_CANDIDATES = ( + Path("/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc"), + Path("/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.otf"), + Path("/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc"), + Path("/usr/share/fonts/truetype/arphic/uming.ttc"), + Path("/System/Library/Fonts/PingFang.ttc"), + Path("/System/Library/Fonts/STHeiti Light.ttc"), + Path("/Library/Fonts/Arial Unicode.ttf"), +) + + class SnapshotUploadError(RuntimeError): pass @@ -108,27 +144,44 @@ def apply_calibration_overlay(frame: Frame, config: dict[str, Any]) -> Frame: if len(rgb) != frame.width * frame.height * 3: return frame + labels: list[OverlayLabel] = [] for region in regions: polygon = normalized_polygon_to_pixels(region.polygon, frame.width, frame.height) if len(polygon) < 3: continue fill_polygon(rgb, frame.width, frame.height, polygon, region.fill_rgb, alpha=0.24) draw_polygon_outline(rgb, frame.width, frame.height, polygon, region.outline_rgb, alpha=0.85) + label = label_for_polygon(polygon, region.label, frame.width, frame.height, region.outline_rgb) + if label is not None: + labels.append(label) + if labels and not render_region_labels_with_ffmpeg(rgb, frame.width, frame.height, labels): + for label in labels: + draw_region_label( + rgb, + frame.width, + frame.height, + label.x, + label.y, + label.fallback_text, + label.accent_rgb, + ) return Frame(width=frame.width, height=frame.height, rgb=bytes(rgb)) def load_calibration_overlay_regions(config: dict[str, Any]) -> list[CalibrationOverlayRegion]: regions: list[CalibrationOverlayRegion] = [] - for zone in config.get("zones", []): + for index, zone in enumerate(config.get("zones", [])): if not isinstance(zone, dict): continue polygon = normalize_overlay_polygon(zone.get("polygon", [])) if len(polygon) >= 3: + color = ZONE_OVERLAY_PALETTE[index % len(ZONE_OVERLAY_PALETTE)] regions.append( CalibrationOverlayRegion( polygon=polygon, - outline_rgb=(255, 196, 0), - fill_rgb=(255, 196, 0), + label=overlay_region_label(zone, fallback=f"区域 {index + 1}"), + outline_rgb=color, + fill_rgb=color, ) ) @@ -139,13 +192,22 @@ def load_calibration_overlay_regions(config: dict[str, Any]) -> list[Calibration regions.append( CalibrationOverlayRegion( polygon=polygon, - outline_rgb=(255, 64, 64), - fill_rgb=(255, 64, 64), + label=overlay_region_label(trash, fallback="垃圾区"), + outline_rgb=TRASH_OVERLAY_RGB, + fill_rgb=TRASH_OVERLAY_RGB, ) ) return regions +def overlay_region_label(payload: dict[str, Any], *, fallback: str) -> str: + for key in ("label", "name", "id"): + value = str(payload.get(key, "")).strip() + if value: + return value + return fallback + + def normalize_overlay_polygon(value: object) -> tuple[tuple[float, float], ...]: points: list[tuple[float, float]] = [] if not isinstance(value, list | tuple): @@ -206,6 +268,664 @@ def draw_polygon_outline( previous = point +def label_for_polygon( + polygon: tuple[tuple[int, int], ...], + text: str, + width: int, + height: int, + accent_rgb: tuple[int, int, int], +) -> OverlayLabel | None: + label_text = text.strip() + fallback_text = fallback_label_text(label_text) + if not label_text or not fallback_text or width < 12 or height < 10: + return None + min_x = max(0, min(point[0] for point in polygon)) + min_y = max(0, min(point[1] for point in polygon)) + max_x = min(width - 1, max(point[0] for point in polygon)) + max_y = min(height - 1, max(point[1] for point in polygon)) + if max_x - min_x + 1 < 4 or max_y - min_y + 1 < 4: + return None + x = min(max(0, min_x + 2), max(0, width - 8)) + y = min(max(0, min_y + 2), max(0, height - 8)) + return OverlayLabel( + text=label_text, + fallback_text=fallback_text, + x=x, + y=y, + accent_rgb=accent_rgb, + ) + + +def render_region_labels_with_ffmpeg( + rgb: bytearray, + width: int, + height: int, + labels: list[OverlayLabel], +) -> bool: + font_file = find_cjk_font_file() + if font_file is None: + return False + filter_text = build_drawtext_filter(labels, font_file, height) + if not filter_text: + return False + command = [ + "ffmpeg", + "-hide_banner", + "-loglevel", + "error", + "-f", + "rawvideo", + "-pix_fmt", + "rgb24", + "-s", + f"{width}x{height}", + "-i", + "-", + "-vf", + filter_text, + "-frames:v", + "1", + "-f", + "rawvideo", + "-pix_fmt", + "rgb24", + "-", + ] + try: + result = subprocess.run( + command, + input=bytes(rgb), + check=False, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + timeout=3, + ) + except (FileNotFoundError, subprocess.TimeoutExpired, OSError): + return False + expected_size = width * height * 3 + if result.returncode != 0 or len(result.stdout) != expected_size: + return False + rgb[:] = result.stdout + return True + + +def build_drawtext_filter(labels: list[OverlayLabel], font_file: Path, height: int) -> str: + font_size = max(14, min(30, round(height * 0.052))) + filters: list[str] = [] + for label in labels: + text = escape_drawtext_value(label.text) + font = escape_drawtext_value(str(font_file)) + color = rgb_to_hex(LABEL_TEXT_RGB) + box_color = "black@0.62" + filters.append( + "drawtext=" + f"fontfile='{font}':" + f"text='{text}':" + f"x={label.x}:" + f"y={label.y}:" + f"fontsize={font_size}:" + f"fontcolor={color}:" + "box=1:" + f"boxcolor={box_color}:" + "boxborderw=4" + ) + return ",".join(filters) + + +def escape_drawtext_value(value: str) -> str: + escaped = value.replace("\\", "\\\\") + for char in (":", "'", ",", "[", "]", "%"): + escaped = escaped.replace(char, f"\\{char}") + return escaped + + +def rgb_to_hex(color: tuple[int, int, int]) -> str: + return f"0x{color[0]:02x}{color[1]:02x}{color[2]:02x}" + + +def find_cjk_font_file() -> Path | None: + for candidate in CJK_FONT_CANDIDATES: + if candidate.exists(): + return candidate + try: + result = subprocess.run( + ["fc-match", "-f", "%{file}", ":lang=zh-cn"], + check=False, + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + timeout=1, + text=True, + ) + except (FileNotFoundError, subprocess.TimeoutExpired, OSError): + return None + font_path = Path(result.stdout.strip()) + if font_path.exists() and any(marker in font_path.name.lower() for marker in ("noto", "cjk", "hei", "song", "fang")): + return font_path + return None + + +def fallback_label_text(label: str) -> str: + text = label.strip() + if not text: + return "" + if "垃圾" in text: + return "TRASH" + digits = "".join(char for char in text if char.isdigit()) + if digits: + return f"R{digits}" + ascii_text = "".join(char if char.isascii() and (char.isalnum() or char in {"-", "_"}) else "" for char in text) + return ascii_text[:12] or "ZONE" + + +def draw_region_label( + rgb: bytearray, + width: int, + height: int, + x: int, + y: int, + label: str, + accent_rgb: tuple[int, int, int], +) -> None: + text = label.strip() + if not text or width < 12 or height < 10: + return + glyphs = text_to_glyphs(text) + if not glyphs: + return + text_width = sum(len(glyph[0]) for glyph in glyphs) + max(0, len(glyphs) - 1) + text_height = max(len(glyph) for glyph in glyphs) + box_width = text_width + 4 + box_height = text_height + 4 + x = min(max(0, x), max(0, width - box_width - 1)) + y = min(max(0, y), max(0, height - box_height - 1)) + box_width = min(box_width, width - x) + box_height = min(box_height, height - y) + if box_width < 6 or box_height < 6: + return + fill_rect(rgb, width, height, x, y, box_width, box_height, LABEL_SHADOW_RGB, alpha=0.62) + draw_rect_outline(rgb, width, height, x, y, box_width, box_height, accent_rgb, alpha=0.9) + draw_text(rgb, width, height, x + 2, y + 2, glyphs, LABEL_TEXT_RGB, LABEL_SHADOW_RGB) + + +def fill_rect( + rgb: bytearray, + width: int, + height: int, + x: int, + y: int, + rect_width: int, + rect_height: int, + color: tuple[int, int, int], + *, + alpha: float, +) -> None: + for yy in range(max(0, y), min(height, y + rect_height)): + for xx in range(max(0, x), min(width, x + rect_width)): + blend_pixel(rgb, width, xx, yy, color, alpha) + + +def draw_rect_outline( + rgb: bytearray, + width: int, + height: int, + x: int, + y: int, + rect_width: int, + rect_height: int, + color: tuple[int, int, int], + *, + alpha: float, +) -> None: + if rect_width <= 0 or rect_height <= 0: + return + x1 = min(width - 1, x + rect_width - 1) + y1 = min(height - 1, y + rect_height - 1) + draw_line(rgb, width, height, (x, y), (x1, y), color, alpha=alpha) + draw_line(rgb, width, height, (x1, y), (x1, y1), color, alpha=alpha) + draw_line(rgb, width, height, (x1, y1), (x, y1), color, alpha=alpha) + draw_line(rgb, width, height, (x, y1), (x, y), color, alpha=alpha) + + +def draw_text( + rgb: bytearray, + width: int, + height: int, + x: int, + y: int, + glyphs: list[tuple[str, ...]], + color: tuple[int, int, int], + shadow: tuple[int, int, int], +) -> None: + cursor_x = x + for glyph in glyphs: + draw_glyph(rgb, width, height, cursor_x + 1, y + 1, glyph, shadow, alpha=0.8) + draw_glyph(rgb, width, height, cursor_x, y, glyph, color, alpha=1.0) + cursor_x += len(glyph[0]) + 1 + + +def draw_glyph( + rgb: bytearray, + width: int, + height: int, + x: int, + y: int, + glyph: tuple[str, ...], + color: tuple[int, int, int], + *, + alpha: float, +) -> None: + for row_index, row in enumerate(glyph): + yy = y + row_index + if yy < 0 or yy >= height: + continue + for col_index, marker in enumerate(row): + if marker != "1": + continue + xx = x + col_index + if 0 <= xx < width: + blend_pixel(rgb, width, xx, yy, color, alpha) + + +def text_to_glyphs(text: str) -> list[tuple[str, ...]]: + glyphs: list[tuple[str, ...]] = [] + for char in text[:12]: + glyph = GLYPHS.get(char.upper(), GLYPHS.get(char)) + if glyph is not None: + glyphs.append(glyph) + return glyphs + + +GLYPHS: dict[str, tuple[str, ...]] = { + " ": ( + "000", + "000", + "000", + "000", + "000", + "000", + "000", + ), + "0": ( + "111", + "101", + "101", + "101", + "101", + "101", + "111", + ), + "1": ( + "010", + "110", + "010", + "010", + "010", + "010", + "111", + ), + "2": ( + "111", + "001", + "001", + "111", + "100", + "100", + "111", + ), + "3": ( + "111", + "001", + "001", + "111", + "001", + "001", + "111", + ), + "4": ( + "101", + "101", + "101", + "111", + "001", + "001", + "001", + ), + "5": ( + "111", + "100", + "100", + "111", + "001", + "001", + "111", + ), + "6": ( + "111", + "100", + "100", + "111", + "101", + "101", + "111", + ), + "7": ( + "111", + "001", + "001", + "010", + "010", + "100", + "100", + ), + "8": ( + "111", + "101", + "101", + "111", + "101", + "101", + "111", + ), + "9": ( + "111", + "101", + "101", + "111", + "001", + "001", + "111", + ), + "A": ( + "010", + "101", + "101", + "111", + "101", + "101", + "101", + ), + "B": ( + "110", + "101", + "101", + "110", + "101", + "101", + "110", + ), + "C": ( + "111", + "100", + "100", + "100", + "100", + "100", + "111", + ), + "D": ( + "110", + "101", + "101", + "101", + "101", + "101", + "110", + ), + "E": ( + "111", + "100", + "100", + "110", + "100", + "100", + "111", + ), + "F": ( + "111", + "100", + "100", + "110", + "100", + "100", + "100", + ), + "G": ( + "111", + "100", + "100", + "101", + "101", + "101", + "111", + ), + "H": ( + "101", + "101", + "101", + "111", + "101", + "101", + "101", + ), + "I": ( + "111", + "010", + "010", + "010", + "010", + "010", + "111", + ), + "J": ( + "001", + "001", + "001", + "001", + "101", + "101", + "111", + ), + "K": ( + "101", + "101", + "110", + "100", + "110", + "101", + "101", + ), + "L": ( + "100", + "100", + "100", + "100", + "100", + "100", + "111", + ), + "M": ( + "101", + "111", + "111", + "101", + "101", + "101", + "101", + ), + "N": ( + "101", + "111", + "111", + "111", + "101", + "101", + "101", + ), + "O": ( + "111", + "101", + "101", + "101", + "101", + "101", + "111", + ), + "P": ( + "111", + "101", + "101", + "111", + "100", + "100", + "100", + ), + "Q": ( + "111", + "101", + "101", + "101", + "111", + "001", + "001", + ), + "R": ( + "111", + "101", + "101", + "111", + "110", + "101", + "101", + ), + "S": ( + "111", + "100", + "100", + "111", + "001", + "001", + "111", + ), + "T": ( + "111", + "010", + "010", + "010", + "010", + "010", + "010", + ), + "U": ( + "101", + "101", + "101", + "101", + "101", + "101", + "111", + ), + "V": ( + "101", + "101", + "101", + "101", + "101", + "101", + "010", + ), + "W": ( + "101", + "101", + "101", + "101", + "111", + "111", + "101", + ), + "X": ( + "101", + "101", + "101", + "010", + "101", + "101", + "101", + ), + "Y": ( + "101", + "101", + "101", + "010", + "010", + "010", + "010", + ), + "Z": ( + "111", + "001", + "001", + "010", + "100", + "100", + "111", + ), + "-": ( + "000", + "000", + "000", + "111", + "000", + "000", + "000", + ), + "_": ( + "000", + "000", + "000", + "000", + "000", + "000", + "111", + ), + "区": ( + "1111111", + "1000001", + "1011001", + "1001011", + "1011001", + "1000001", + "1111111", + ), + "域": ( + "0010010", + "1111111", + "0010010", + "1111011", + "0010110", + "1111011", + "0010101", + ), + "垃": ( + "0100100", + "1111111", + "0100100", + "0101110", + "0100100", + "1100100", + "0111110", + ), + "圾": ( + "0101110", + "1110010", + "0100010", + "0101110", + "0101010", + "1101010", + "0110011", + ), +} + + def draw_line( rgb: bytearray, width: int, diff --git a/src/cold_display_guard/cases.py b/src/cold_display_guard/cases.py index 34aef7f..38ea2fc 100644 --- a/src/cold_display_guard/cases.py +++ b/src/cold_display_guard/cases.py @@ -8,15 +8,19 @@ from typing import Any EVENT_CASE_TYPES = { + "time_pre_warning": "pre_warning", "time_alarm": "time_alarm", "batch_pending_disposal": "pending_disposal", + "alarm_removal_timeout": "alarm_removal_timeout", "warning_escalated": "warning_escalated", } CASE_PRIORITY = { - "time_alarm": 1, - "pending_disposal": 2, - "warning_escalated": 3, + "pre_warning": 1, + "time_alarm": 2, + "pending_disposal": 3, + "alarm_removal_timeout": 4, + "warning_escalated": 5, } @@ -139,10 +143,11 @@ class CaseStore: case_id = build_case_id(batch_id) existing = self._cases.get(case_id) - if event_name == "batch_discarded": + if event_name in {"batch_discarded", "pre_warning_handled"}: if existing is None or existing.case_status == "handled": return None - return self._close_case(existing, when, handled_source="auto_closed") + handled_source = "auto_removed_before_alarm" if event_name == "pre_warning_handled" else "auto_closed" + return self._close_case(existing, when, handled_source=handled_source) case_type = EVENT_CASE_TYPES.get(event_name) if case_type is None: diff --git a/src/cold_display_guard/config.py b/src/cold_display_guard/config.py index 1de5a08..db874fb 100644 --- a/src/cold_display_guard/config.py +++ b/src/cold_display_guard/config.py @@ -23,7 +23,9 @@ def load_settings(path: str | Path) -> EngineSettings: return EngineSettings( camera_id=str(data.get("camera_id", "cold_display_cam_01")), + pre_warning_seconds=int(thresholds.get("pre_warning_seconds", 0)), max_dwell_seconds=int(thresholds.get("max_dwell_seconds", 10_800)), + alarm_removal_seconds=int(thresholds.get("alarm_removal_seconds", 0)), trash_confirmation_seconds=int(thresholds.get("trash_confirmation_seconds", 120)), zone_ids=zone_ids, ) @@ -135,7 +137,9 @@ def format_config_document(data: dict[str, Any]) -> str: thresholds = data.get("thresholds", {}) lines.append("[thresholds]") + lines.append(f'pre_warning_seconds = {int(thresholds.get("pre_warning_seconds", 0))}') lines.append(f'max_dwell_seconds = {int(thresholds.get("max_dwell_seconds", 10_800))}') + lines.append(f'alarm_removal_seconds = {int(thresholds.get("alarm_removal_seconds", 0))}') lines.append(f'trash_confirmation_seconds = {int(thresholds.get("trash_confirmation_seconds", 120))}') lines.append("") diff --git a/src/cold_display_guard/engine.py b/src/cold_display_guard/engine.py index 77ac601..acf50d4 100644 --- a/src/cold_display_guard/engine.py +++ b/src/cold_display_guard/engine.py @@ -34,7 +34,9 @@ class BatchEngine: if appeared_zones and self.pending_disposal: events.extend(self._mark_pending_as_returned(observation.ts, appeared_zones)) + events.extend(self._apply_pre_warnings(observation.ts, previous_zone_counts)) events.extend(self._apply_time_alarms(observation.ts, previous_zone_counts)) + events.extend(self._apply_alarm_removal_timeouts(observation.ts)) pending_count_before_zone_transitions = len(self.pending_disposal) for zone_id, new_count in zone_counts.items(): @@ -99,7 +101,14 @@ class BatchEngine: if zone_id not in self._zone_counts: continue event_name = str(event.get("event", "")) - if event_name in {"batch_started", "batch_count_changed", "mixed_batch_violation", "time_alarm"}: + if event_name in { + "batch_started", + "batch_count_changed", + "mixed_batch_violation", + "time_pre_warning", + "time_alarm", + "alarm_removal_timeout", + }: if active_zone_counts is not None and active_counts.get(zone_id, 0) <= 0: self.active_by_zone.pop(zone_id, None) self._zone_counts[zone_id] = 0 @@ -131,9 +140,19 @@ class BatchEngine: last_count=max(1, int(event.get("current_count", 1) or 1)), state=str(event.get("state", "active") or "active"), ) + batch.pre_warned_at = parse_event_datetime(event.get("pre_warned_at")) batch.alerted_at = parse_event_datetime(event.get("alerted_at")) if batch.alerted_at is not None: batch.state = "alerted" + if self.settings.alarm_removal_seconds > 0: + batch.alarm_removal_deadline = parse_event_datetime(event.get("alarm_removal_deadline")) + if batch.alarm_removal_deadline is None: + batch.alarm_removal_deadline = batch.alerted_at + self.settings.alarm_removal_window + elif batch.pre_warned_at is not None: + batch.state = "pre_warning" + batch.alarm_removal_timed_out_at = parse_event_datetime(event.get("alarm_removal_timed_out_at")) + if batch.alarm_removal_timed_out_at is not None: + batch.state = "alarm_removal_timeout" batch.dwell_seconds = max(0, int(event.get("dwell_seconds", 0) or 0)) return batch @@ -162,10 +181,44 @@ class BatchEngine: self.pending_disposal.append(batch) return self._event("batch_pending_disposal", when, batch, severity="warning") + if batch.pre_warned_at is not None: + batch.state = "handled" + self.closed_batches.append(batch) + return self._event( + "pre_warning_handled", + when, + batch, + severity="info", + handled_source="auto_removed_before_alarm", + ) + batch.state = "consumed" self.closed_batches.append(batch) return self._event("batch_consumed", when, batch, severity="info") + def _apply_pre_warnings(self, when: datetime, zone_counts: dict[str, int]) -> list[dict[str, Any]]: + if self.settings.pre_warning_seconds <= 0: + return [] + events: list[dict[str, Any]] = [] + for zone_id, batch in self.active_by_zone.items(): + if batch.pre_warned_at is not None or batch.alerted_at is not None: + continue + dwell_seconds = batch.current_dwell_seconds(when) + if dwell_seconds < self.settings.pre_warning_seconds: + continue + batch.state = "pre_warning" + batch.pre_warned_at = when + events.append( + self._event( + "time_pre_warning", + when, + batch, + severity="warning", + current_count=zone_counts.get(zone_id, batch.last_count), + ) + ) + return events + def _apply_time_alarms(self, when: datetime, zone_counts: dict[str, int]) -> list[dict[str, Any]]: events: list[dict[str, Any]] = [] for zone_id, batch in self.active_by_zone.items(): @@ -176,6 +229,8 @@ class BatchEngine: continue batch.state = "alerted" batch.alerted_at = when + if self.settings.alarm_removal_seconds > 0: + batch.alarm_removal_deadline = when + self.settings.alarm_removal_window events.append( self._event( "time_alarm", @@ -187,6 +242,30 @@ class BatchEngine: ) return events + def _apply_alarm_removal_timeouts(self, when: datetime) -> list[dict[str, Any]]: + if self.settings.alarm_removal_seconds <= 0: + return [] + events: list[dict[str, Any]] = [] + for batch in self.active_by_zone.values(): + if batch.alerted_at is None or batch.alarm_removal_deadline is None: + continue + if batch.alarm_removal_timed_out_at is not None: + continue + if when <= batch.alarm_removal_deadline: + continue + batch.state = "alarm_removal_timeout" + batch.alarm_removal_timed_out_at = when + events.append( + self._event( + "alarm_removal_timeout", + when, + batch, + severity="alarm", + reason="alarmed_batch_not_removed_after_alarm_window", + ) + ) + return events + def _mark_mixed_batch( self, zone_id: str, @@ -273,13 +352,21 @@ class BatchEngine: "state": batch.state, "started_at": batch.started_at.isoformat(), "dwell_seconds": batch.current_dwell_seconds(when), + "pre_warning_seconds": self.settings.pre_warning_seconds, "max_dwell_seconds": self.settings.max_dwell_seconds, + "alarm_removal_seconds": self.settings.alarm_removal_seconds, } zone_index = self._zone_index(batch.zone_id) if zone_index is not None: payload["zone_index"] = zone_index + if batch.pre_warned_at is not None: + payload["pre_warned_at"] = batch.pre_warned_at.isoformat() if batch.alerted_at is not None: payload["alerted_at"] = batch.alerted_at.isoformat() + if batch.alarm_removal_deadline is not None: + payload["alarm_removal_deadline"] = batch.alarm_removal_deadline.isoformat() + if batch.alarm_removal_timed_out_at is not None: + payload["alarm_removal_timed_out_at"] = batch.alarm_removal_timed_out_at.isoformat() if batch.ended_at is not None: payload["ended_at"] = batch.ended_at.isoformat() if batch.disposal_deadline is not None: @@ -290,9 +377,9 @@ class BatchEngine: return payload def _event_severity(self, event_name: str) -> str: - if event_name == "time_alarm": + if event_name in {"time_alarm", "alarm_removal_timeout"}: return "alarm" - if event_name in {"warning_escalated", "batch_pending_disposal"}: + if event_name in {"warning_escalated", "batch_pending_disposal", "time_pre_warning"}: return "warning" if event_name.endswith("_violation"): return "warning" diff --git a/src/cold_display_guard/models.py b/src/cold_display_guard/models.py index e9e2c21..1035c9b 100644 --- a/src/cold_display_guard/models.py +++ b/src/cold_display_guard/models.py @@ -11,14 +11,24 @@ DEFAULT_ZONE_IDS = tuple(f"r{row}c{col}" for row in range(1, 3) for col in range @dataclass(frozen=True, slots=True) class EngineSettings: camera_id: str = "cold_display_cam_01" + pre_warning_seconds: int = 0 max_dwell_seconds: int = 10_800 + alarm_removal_seconds: int = 0 trash_confirmation_seconds: int = 120 zone_ids: tuple[str, ...] = DEFAULT_ZONE_IDS + @property + def pre_warning(self) -> timedelta: + return timedelta(seconds=self.pre_warning_seconds) + @property def max_dwell(self) -> timedelta: return timedelta(seconds=self.max_dwell_seconds) + @property + def alarm_removal_window(self) -> timedelta: + return timedelta(seconds=self.alarm_removal_seconds) + @property def trash_confirmation_window(self) -> timedelta: return timedelta(seconds=self.trash_confirmation_seconds) @@ -56,7 +66,10 @@ class Batch: started_at: datetime last_count: int state: str = "active" + pre_warned_at: datetime | None = None alerted_at: datetime | None = None + alarm_removal_deadline: datetime | None = None + alarm_removal_timed_out_at: datetime | None = None ended_at: datetime | None = None pending_since: datetime | None = None disposal_deadline: datetime | None = None diff --git a/src/cold_display_guard/webhooks.py b/src/cold_display_guard/webhooks.py index fe693bc..0b70b34 100644 --- a/src/cold_display_guard/webhooks.py +++ b/src/cold_display_guard/webhooks.py @@ -145,6 +145,7 @@ def build_batch_event_payload( batch_id = str(event.get("batch_id", "")) event_name = str(event.get("event", "")) ts = str(event.get("ts", "")) + pre_warned_at = str(event.get("pre_warned_at", "")) alerted_at = str(event.get("alerted_at", "")) ended_at = str(event.get("ended_at", "")) payload = { @@ -165,7 +166,8 @@ def build_batch_event_payload( "dwell_seconds": event.get("dwell_seconds", ""), "is_discarded": event_name == "batch_discarded", "discarded_at": ts if event_name == "batch_discarded" else "", - "created_at": alerted_at or ts, + "created_at": pre_warned_at or alerted_at or ts, + "pre_warned_at": pre_warned_at, "alerted_at": alerted_at, "alarm_at": alerted_at, "updated_at": ts, @@ -185,6 +187,7 @@ def build_case_event_payload( handled_at = str(snapshot.get("handled_at", "")) handled_source = str(snapshot.get("handled_source", "")) event = snapshot_event(snapshot) + pre_warned_at = str(event.get("pre_warned_at", "")) alerted_at = str(event.get("alerted_at", "")) ended_at = str(event.get("ended_at", "")) discarded = handled_source == "auto_closed" @@ -208,7 +211,8 @@ def build_case_event_payload( "dwell_seconds": event.get("dwell_seconds", ""), "is_discarded": discarded, "discarded_at": handled_at if discarded else "", - "created_at": alerted_at or created_at, + "created_at": pre_warned_at or alerted_at or created_at, + "pre_warned_at": pre_warned_at, "alerted_at": alerted_at, "alarm_at": alerted_at, "updated_at": updated_at, diff --git a/tasks/lessons.md b/tasks/lessons.md index 7090f33..dd4b440 100644 --- a/tasks/lessons.md +++ b/tasks/lessons.md @@ -5,3 +5,9 @@ 1. 对每个用户指定的 Webhook 路径,先在目标主机上用与真实请求接近的 `POST` 探测并记录状态码。 2. 如果存在多个相似路径,只能在验证过用户指定路径不可用后,才考虑回退到其它路径。 3. 切换远端配置前,先确认发送端容器对目标主机名或 IP 实际可达,避免写入不可解析的地址。 + +- 2026-06-15: 告警截图叠加中文区域名时,不能依赖默认西文字体或手写点阵字形;这会在现场截图里表现为乱码或不可读文字。 + Prevention: + 1. 涉及中文截图叠字时,镜像必须安装可验证的 CJK 字体包,并在容器内用 `fc-match :lang=zh-cn` 确认命中 CJK 字体。 + 2. 部署后必须在目标容器内跑一次中文标签叠图烟测,确认真实渲染路径可用,而不只检查像素变化。 + 3. 字体渲染不可用时,回退文本必须转换成可读 ASCII 标识,例如 `区域 1` -> `R1`、`垃圾区` -> `TRASH`,避免继续绘制乱码中文。 diff --git a/tasks/todo.md b/tasks/todo.md index 807275f..1488c89 100644 --- a/tasks/todo.md +++ b/tasks/todo.md @@ -229,3 +229,58 @@ - MQTT probe confirmed `video-recognition` published to `video/cold-display-guard/result/cold-display-guard` with `device_identifier=cold-display-guard`. - `store_data_platform` is not deployed on `10.8.0.23` under that repository name or as an identifiable container; platform handling changes were completed and verified in the local repository. - The cold-display retry queue has no pending entries; old `192.168.5.103` failures are already dead-letter history. + +## Current Task: Alarm Snapshot Labels And Zone Colors + +**Goal:** Uploaded alarm screenshots should show each calibrated region name directly on the image, and different cold-display zones should use different overlay colors. + +**Design:** Extend the existing standard-library overlay path. Keep drawing configured polygons before JPEG upload, but carry a display label for each region, choose a stable color from a fixed palette by zone order, and draw a small high-contrast text label inside the polygon. Keep trash ROI red and labeled separately. + +- [x] Inspect the current calibration overlay helper and tests. +- [x] Add failing tests for per-zone colors and visible region labels. +- [x] Implement labels and stable zone color palette. +- [x] Run snapshot tests and full Python tests. +- [x] Deploy the overlay update to `xiaozheng@10.8.0.23`. +- [x] Verify remote API/runtime health and deployed overlay helper. + +### Review + +- `apply_calibration_overlay` now assigns each cold-display zone a stable color from a fixed palette and keeps the trash ROI red. +- Each overlay region now carries a label and draws a small high-contrast label box directly on the frame before JPEG encoding/upload. +- The built-in label renderer covers common现场 labels such as `区域 1` through digits and `垃圾区`, plus basic ASCII for custom numeric/English labels. +- Verification passed: + - `PYTHONPATH=src python3 -m unittest tests/test_alarm_snapshots.py -v` + - `PYTHONPATH=src python3 -m unittest discover -s tests -v` (`99` tests) +- Deployed `src/cold_display_guard/alarm_snapshots.py` to `xiaozheng@10.8.0.23` after backing up the previous remote file. +- Rebuilt `cold-display-guard:dev` and restarted `cold-display-guard-api` plus `cold-display-guard-runtime`. +- Remote verification passed: + - `GET /api/manage/health` returned `status=ok` and `runtime_status=running`. + - Container-side overlay smoke test confirmed two zones render different RGB values and label text pixels are present. + +## Current Task: Alarm Snapshot Chinese Label Rendering Fix + +**Goal:** Fix unreadable/garbled Chinese region names on uploaded alarm screenshots while keeping per-zone colors and fallback labeling robust. + +**Design:** Use a real CJK font renderer for Chinese labels in the alarm snapshot overlay path. Install Noto CJK fonts in the runtime image, render labels through ffmpeg `drawtext` when the font is available, and fall back to readable ASCII labels if the font renderer is unavailable. + +- [x] Reproduce and identify the likely root cause: remote container only matched DejaVu for `zh-cn`, so Chinese labels had no real CJK font path. +- [x] Add regression tests for Docker CJK font installation and readable ASCII fallback labels. +- [x] Update `Dockerfile` to install `fonts-noto-cjk`. +- [x] Update `alarm_snapshots.py` to prefer CJK font rendering and use `R1`/`TRASH` fallback text when needed. +- [x] Run focused and full local Python verification. +- [x] Deploy `Dockerfile` and `alarm_snapshots.py` to `xiaozheng@10.8.0.23` without overwriting live config. +- [x] Rebuild/restart `cold-display-guard-api` and `cold-display-guard-runtime`. +- [x] Verify remote API/runtime health, CJK font availability, overlay smoke behavior, and runtime logs. + +### Review + +- Root cause was the screenshot overlay path not having a real Chinese font renderer in the deployed image; the container matched DejaVu before this fix. +- The rebuilt remote container now reports `NotoSansCJK-Regular.ttc: "Noto Sans CJK SC" "Regular"` for `fc-match :lang=zh-cn`. +- Remote overlay smoke test confirmed `find_cjk_font_file()` returns `/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc`, Chinese labels change the frame, bright label pixels are present, and different regions retain distinct colors. +- Local verification passed: + - `PYTHONPATH=src python3 -m unittest tests/test_alarm_snapshots.py -v` + - `PYTHONPATH=src python3 -m unittest discover -s tests -v` (`101` tests) +- Remote verification passed: + - `GET /api/manage/health` returned `status=ok`, `runtime_status=running`, and version `dev`. + - `cold-display-guard-api` is healthy and `cold-display-guard-runtime` is running after restart. + - Runtime logs show normal startup after the restart. diff --git a/tests/test_alarm_snapshots.py b/tests/test_alarm_snapshots.py index ad3b72e..639ee3d 100644 --- a/tests/test_alarm_snapshots.py +++ b/tests/test_alarm_snapshots.py @@ -2,10 +2,12 @@ from __future__ import annotations import unittest from datetime import datetime, timezone +from pathlib import Path from cold_display_guard import alarm_snapshots from cold_display_guard.alarm_snapshots import ( capture_alert_snapshot, + fallback_label_text, load_alarm_snapshot_settings, upload_snapshot_bytes, ) @@ -153,6 +155,40 @@ class AlarmSnapshotTests(unittest.TestCase): self.assertNotEqual(annotated.pixel(2, 2), (0, 0, 0)) self.assertNotEqual(annotated.pixel(0, 0), (0, 0, 0)) + def test_calibration_overlay_uses_distinct_zone_colors_and_draws_labels(self) -> None: + frame = Frame(width=40, height=20, rgb=b"\x00\x00\x00" * 800) + + annotated = alarm_snapshots.apply_calibration_overlay( + frame, + { + "zones": [ + { + "id": "1", + "label": "区域 1", + "polygon": [[0.05, 0.10], [0.40, 0.10], [0.40, 0.90], [0.05, 0.90]], + }, + { + "id": "2", + "label": "区域 2", + "polygon": [[0.55, 0.10], [0.90, 0.10], [0.90, 0.90], [0.55, 0.90]], + }, + ] + }, + ) + + self.assertNotEqual(annotated.pixel(10, 15), annotated.pixel(30, 15)) + label_pixels = [annotated.pixel(x, y) for y in range(2, 10) for x in range(2, 18)] + self.assertTrue(any(max(pixel) >= 220 for pixel in label_pixels), "expected bright label text pixels") + + def test_chinese_label_fallback_uses_readable_ascii_when_font_renderer_is_unavailable(self) -> None: + self.assertEqual(fallback_label_text("区域 1"), "R1") + self.assertEqual(fallback_label_text("区域 12"), "R12") + self.assertEqual(fallback_label_text("垃圾区"), "TRASH") + + def test_docker_image_installs_cjk_fonts_for_alarm_snapshot_labels(self) -> None: + dockerfile = (Path(__file__).resolve().parents[1] / "Dockerfile").read_text(encoding="utf-8") + self.assertIn("fonts-noto-cjk", dockerfile) + def test_capture_alert_snapshot_encodes_frame_with_configured_calibration_overlay(self) -> None: encoded_frames: list[Frame] = [] diff --git a/tests/test_cases.py b/tests/test_cases.py index 94ac3b7..271ccf4 100644 --- a/tests/test_cases.py +++ b/tests/test_cases.py @@ -48,6 +48,55 @@ class CaseStoreTests(unittest.TestCase): self.assertEqual(snapshots[0]["case_status"], "open") self.assertEqual(snapshots[0]["source_event"], "time_alarm") + def test_time_pre_warning_creates_open_pre_warning_case(self) -> None: + store = CaseStore() + + snapshots = store.apply_batch_events( + [event("time_pre_warning", self.t0, severity="warning", state="pre_warning")] + ) + + self.assertEqual(len(snapshots), 1) + self.assertEqual(snapshots[0]["case_type"], "pre_warning") + self.assertEqual(snapshots[0]["case_status"], "open") + self.assertEqual(snapshots[0]["source_event"], "time_pre_warning") + + def test_pre_warning_handled_auto_closes_open_case(self) -> None: + store = CaseStore() + store.apply_batch_events([event("time_pre_warning", self.t0, severity="warning", state="pre_warning")]) + + snapshots = store.apply_batch_events( + [event("pre_warning_handled", self.t0.replace(minute=1), severity="info", state="handled")] + ) + + self.assertEqual(len(snapshots), 1) + self.assertEqual(snapshots[0]["case_status"], "handled") + self.assertEqual(snapshots[0]["handled_source"], "auto_removed_before_alarm") + + def test_time_alarm_upgrades_pre_warning_case(self) -> None: + store = CaseStore() + store.apply_batch_events([event("time_pre_warning", self.t0, severity="warning", state="pre_warning")]) + + snapshots = store.apply_batch_events([event("time_alarm", self.t0.replace(minute=2), severity="alarm", state="alerted")]) + + self.assertEqual(len(snapshots), 1) + self.assertEqual(snapshots[0]["case_type"], "time_alarm") + self.assertEqual(snapshots[0]["case_status"], "open") + self.assertEqual(snapshots[0]["source_event"], "time_alarm") + + def test_alarm_removal_timeout_upgrades_same_case(self) -> None: + store = CaseStore() + store.apply_batch_events([event("time_pre_warning", self.t0, severity="warning", state="pre_warning")]) + store.apply_batch_events([event("time_alarm", self.t0.replace(minute=2), severity="alarm", state="alerted")]) + + snapshots = store.apply_batch_events( + [event("alarm_removal_timeout", self.t0.replace(minute=3), severity="alarm", state="alarm_removal_timeout")] + ) + + self.assertEqual(len(snapshots), 1) + self.assertEqual(snapshots[0]["case_type"], "alarm_removal_timeout") + self.assertEqual(snapshots[0]["case_status"], "open") + self.assertEqual(snapshots[0]["source_event"], "alarm_removal_timeout") + def test_pending_disposal_upgrades_existing_case(self) -> None: store = CaseStore() store.apply_batch_events([event("time_alarm", self.t0, severity="alarm", state="alerted")]) diff --git a/tests/test_config.py b/tests/test_config.py index be8a114..4425b6b 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -16,7 +16,9 @@ class ConfigTests(unittest.TestCase): camera_id = "cam_a" [thresholds] +pre_warning_seconds = 20 max_dwell_seconds = 30 +alarm_removal_seconds = 2 trash_confirmation_seconds = 4 [layout] @@ -29,7 +31,9 @@ cols = 2 settings = load_settings(path) self.assertEqual(settings.camera_id, "cam_a") + self.assertEqual(settings.pre_warning_seconds, 20) self.assertEqual(settings.max_dwell_seconds, 30) + self.assertEqual(settings.alarm_removal_seconds, 2) self.assertEqual(settings.trash_confirmation_seconds, 4) self.assertEqual(settings.zone_ids, ("r1c1", "r1c2")) @@ -118,6 +122,8 @@ zone_ids = ["1", "2", "3"] text = path.read_text(encoding="utf-8") self.assertIn("zone_count = 2", text) + self.assertIn("pre_warning_seconds = 0", text) + self.assertIn("alarm_removal_seconds = 0", text) self.assertIn('label = "区域 1"', text) self.assertIn("[trash]", text) self.assertNotIn('"trash"', text.split("[layout]", maxsplit=1)[1].split("[[zones]]", maxsplit=1)[0]) diff --git a/tests/test_engine.py b/tests/test_engine.py index 7db9948..ee1a74f 100644 --- a/tests/test_engine.py +++ b/tests/test_engine.py @@ -140,6 +140,96 @@ class BatchEngineTests(unittest.TestCase): self.assertEqual(alarm_events[0]["current_count"], 1) self.assertIn("alerted_at", alarm_events[0]) + def test_pre_warning_emits_once_before_alarm_threshold(self) -> None: + settings = EngineSettings( + camera_id="test_cam", + pre_warning_seconds=60, + max_dwell_seconds=120, + alarm_removal_seconds=30, + trash_confirmation_seconds=30, + zone_ids=("1",), + ) + engine = BatchEngine(settings) + engine.process(obs(self.t0, {"1": 1})) + + pre_warning_events = engine.process(obs(self.t0 + timedelta(seconds=60), {"1": 1})) + repeated_events = engine.process(obs(self.t0 + timedelta(seconds=90), {"1": 1})) + + self.assertEqual([event["event"] for event in pre_warning_events], ["time_pre_warning"]) + self.assertEqual(pre_warning_events[0]["severity"], "warning") + self.assertEqual(pre_warning_events[0]["state"], "pre_warning") + self.assertEqual(pre_warning_events[0]["pre_warning_seconds"], 60) + self.assertEqual(pre_warning_events[0]["pre_warned_at"], (self.t0 + timedelta(seconds=60)).isoformat()) + self.assertEqual(pre_warning_events[0]["current_count"], 1) + self.assertEqual(repeated_events, []) + + def test_pre_warning_removed_before_alarm_is_auto_handled(self) -> None: + settings = EngineSettings( + camera_id="test_cam", + pre_warning_seconds=60, + max_dwell_seconds=120, + alarm_removal_seconds=30, + trash_confirmation_seconds=30, + zone_ids=("1",), + ) + engine = BatchEngine(settings) + engine.process(obs(self.t0, {"1": 1})) + engine.process(obs(self.t0 + timedelta(seconds=60), {"1": 1})) + + events = engine.process(obs(self.t0 + timedelta(seconds=90), {"1": 0})) + + self.assertEqual([event["event"] for event in events], ["pre_warning_handled"]) + self.assertEqual(events[0]["severity"], "info") + self.assertEqual(events[0]["state"], "handled") + self.assertEqual(events[0]["handled_source"], "auto_removed_before_alarm") + self.assertEqual(events[0]["ended_at"], (self.t0 + timedelta(seconds=90)).isoformat()) + + def test_alarm_removal_timeout_emits_once_before_late_removal(self) -> None: + settings = EngineSettings( + camera_id="test_cam", + pre_warning_seconds=60, + max_dwell_seconds=120, + alarm_removal_seconds=30, + trash_confirmation_seconds=30, + zone_ids=("1",), + ) + engine = BatchEngine(settings) + engine.process(obs(self.t0, {"1": 1})) + engine.process(obs(self.t0 + timedelta(seconds=60), {"1": 1})) + alarm_events = engine.process(obs(self.t0 + timedelta(seconds=120), {"1": 1})) + + timeout_events = engine.process(obs(self.t0 + timedelta(seconds=151), {"1": 1})) + repeated_events = engine.process(obs(self.t0 + timedelta(seconds=160), {"1": 1})) + removal_events = engine.process(obs(self.t0 + timedelta(seconds=170), {"1": 0}, trash=True)) + + self.assertEqual([event["event"] for event in alarm_events], ["time_alarm"]) + self.assertEqual(alarm_events[0]["alarm_removal_deadline"], (self.t0 + timedelta(seconds=150)).isoformat()) + self.assertEqual([event["event"] for event in timeout_events], ["alarm_removal_timeout"]) + self.assertEqual(timeout_events[0]["severity"], "alarm") + self.assertEqual(timeout_events[0]["state"], "alarm_removal_timeout") + self.assertEqual(timeout_events[0]["reason"], "alarmed_batch_not_removed_after_alarm_window") + self.assertEqual(repeated_events, []) + self.assertEqual([event["event"] for event in removal_events], ["batch_pending_disposal", "batch_discarded"]) + + def test_alarmed_batch_removed_within_alarm_window_does_not_emit_removal_timeout(self) -> None: + settings = EngineSettings( + camera_id="test_cam", + pre_warning_seconds=60, + max_dwell_seconds=120, + alarm_removal_seconds=30, + trash_confirmation_seconds=30, + zone_ids=("1",), + ) + engine = BatchEngine(settings) + engine.process(obs(self.t0, {"1": 1})) + engine.process(obs(self.t0 + timedelta(seconds=60), {"1": 1})) + engine.process(obs(self.t0 + timedelta(seconds=120), {"1": 1})) + + events = engine.process(obs(self.t0 + timedelta(seconds=150), {"1": 0}, trash=True)) + + self.assertEqual([event["event"] for event in events], ["batch_pending_disposal", "batch_discarded"]) + self.assertTrue(all(event["event"] != "alarm_removal_timeout" for event in events)) + def test_alarmed_batch_removed_without_trash_deposit_escalates_warning(self) -> None: settings = EngineSettings( camera_id="test_cam", @@ -260,6 +350,37 @@ class BatchEngineTests(unittest.TestCase): self.assertEqual(removal_events[0]["batch_id"], "batch_000124") self.assertEqual(removal_events[0]["dwell_seconds"], 1400) + def test_restore_keeps_alarm_removal_timeout_deadline_after_runtime_restart(self) -> None: + settings = EngineSettings( + camera_id="test_cam", + pre_warning_seconds=60, + max_dwell_seconds=120, + alarm_removal_seconds=30, + trash_confirmation_seconds=30, + zone_ids=("1",), + ) + engine = BatchEngine(settings) + engine.restore_from_events( + [ + { + "event": "time_alarm", + "zone_id": "1", + "batch_id": "batch_000124", + "started_at": self.t0.isoformat(), + "pre_warned_at": (self.t0 + timedelta(seconds=60)).isoformat(), + "alerted_at": (self.t0 + timedelta(seconds=120)).isoformat(), + "current_count": 1, + "state": "alerted", + }, + ], + active_zone_counts={"1": 1}, + ) + + timeout_events = engine.process(obs(self.t0 + timedelta(seconds=151), {"1": 1})) + + self.assertEqual([event["event"] for event in timeout_events], ["alarm_removal_timeout"]) + self.assertEqual(timeout_events[0]["batch_id"], "batch_000124") + def test_restore_skips_active_false_positive_when_latest_zone_count_is_empty(self) -> None: settings = EngineSettings( camera_id="test_cam", diff --git a/tests/test_webhooks.py b/tests/test_webhooks.py index 388836d..adb606b 100644 --- a/tests/test_webhooks.py +++ b/tests/test_webhooks.py @@ -80,6 +80,46 @@ class WebhookTests(unittest.TestCase): self.assertFalse(payload["is_discarded"]) self.assertEqual(payload["alerted_at"], datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat()) + def test_build_batch_event_payload_preserves_pre_warning_and_alarm_times(self) -> None: + pre_warned_at = datetime(2026, 6, 9, 8, 59, tzinfo=UTC).isoformat() + alarm_at = datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat() + + pre_warning_payload = build_batch_event_payload( + { + "event": "time_pre_warning", + "ts": pre_warned_at, + "batch_id": "batch_000001", + "camera_id": "cam_01", + "zone_id": "1", + "zone_label": "区域 1", + "severity": "warning", + "state": "pre_warning", + "started_at": datetime(2026, 6, 9, 8, 58, tzinfo=UTC).isoformat(), + "pre_warned_at": pre_warned_at, + } + ) + alarm_payload = build_batch_event_payload( + { + "event": "time_alarm", + "ts": alarm_at, + "batch_id": "batch_000001", + "camera_id": "cam_01", + "zone_id": "1", + "zone_label": "区域 1", + "severity": "alarm", + "state": "alerted", + "started_at": datetime(2026, 6, 9, 8, 58, tzinfo=UTC).isoformat(), + "pre_warned_at": pre_warned_at, + "alerted_at": alarm_at, + } + ) + + self.assertEqual(pre_warning_payload["pre_warned_at"], pre_warned_at) + self.assertEqual(pre_warning_payload["created_at"], pre_warned_at) + self.assertEqual(pre_warning_payload["alarm_at"], "") + self.assertEqual(alarm_payload["pre_warned_at"], pre_warned_at) + self.assertEqual(alarm_payload["alarm_at"], alarm_at) + def test_build_batch_event_payload_includes_uploaded_snapshot_path(self) -> None: payload = build_batch_event_payload( {