Compare commits

6 Commits

Author SHA1 Message Date
0c3895c24c fix: preserve handled display cabinet cases 2026-06-15 16:08:12 +08:00
7b9ec2e148 fix: reduce alarm snapshot label obstruction 2026-06-15 14:21:31 +08:00
fa2c90e250 fix: stabilize cold display occupancy detection 2026-06-15 13:40:20 +08:00
1059850378 feat: add cold display alarm flow and labeled snapshots 2026-06-15 12:59:25 +08:00
46889c0621 feat: draw calibration overlay on alarm snapshots
Before JPEG encoding and OTA upload, paint the configured [[zones]]
polygons (yellow) and the [trash].roi (red) directly onto a copied
Frame.rgb so uploaded alarm snapshots visually carry the calibrated
regions. Normalized coordinates are clamped to image bounds, the source
frame stays untouched for downstream runtime processing, and
non-alert/disabled paths are unchanged. Adds stdlib-only polygon
fill/outline helpers plus focused unit tests.

Co-Authored-By: Claude <noreply@anthropic.com>
2026-06-15 12:34:46 +08:00
547fb6290f fix: use dynamic upstream resolver in nginx api proxy
Add an explicit Docker resolver and switch the /api/ proxy to a
variable-based upstream so nginx re-resolves cold-display-guard-api
instead of caching stale DNS after the backend container restarts.

Co-Authored-By: Claude <noreply@anthropic.com>
2026-06-15 11:43:48 +08:00
20 changed files with 1917 additions and 14 deletions

View File

@@ -11,6 +11,7 @@ RUN sed -i 's|http://deb.debian.org/debian|http://mirrors.aliyun.com/debian|g; s
apt-get update && apt-get install -y --no-install-recommends \ apt-get update && apt-get install -y --no-install-recommends \
ca-certificates \ ca-certificates \
ffmpeg \ ffmpeg \
fonts-noto-cjk \
tzdata \ tzdata \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*

View File

@@ -5,7 +5,9 @@ timezone = "Asia/Shanghai"
rtsp_url = "" rtsp_url = ""
[thresholds] [thresholds]
pre_warning_seconds = 900
max_dwell_seconds = 1200 max_dwell_seconds = 1200
alarm_removal_seconds = 1800
trash_confirmation_seconds = 120 trash_confirmation_seconds = 120
[layout] [layout]
@@ -39,6 +41,7 @@ roi = [[0.776842, 0.486901], [0.896842, 0.522456], [0.841053, 0.857427], [0.7168
sample_stride_pixels = 4 sample_stride_pixels = 4
occupancy_mean_delta = 55.0 occupancy_mean_delta = 55.0
occupancy_dark_luma_threshold = 80.0 occupancy_dark_luma_threshold = 80.0
occupancy_absolute_dark_fraction = 0.0
occupancy_dark_fraction = 0.06 occupancy_dark_fraction = 0.06
occupancy_texture_dark_fraction = 0.04 occupancy_texture_dark_fraction = 0.04
occupancy_bright_luma_threshold = 220.0 occupancy_bright_luma_threshold = 220.0

View File

@@ -30,6 +30,51 @@ class AlarmSnapshotSettings:
encode_timeout_seconds: float = 10.0 encode_timeout_seconds: float = 10.0
@dataclass(frozen=True, slots=True)
class CalibrationOverlayRegion:
polygon: tuple[tuple[float, float], ...]
label: str
outline_rgb: tuple[int, int, int]
fill_rgb: tuple[int, int, int]
@dataclass(frozen=True, slots=True)
class OverlayLabel:
text: str
fallback_text: str
x: int
y: int
accent_rgb: tuple[int, int, int]
ZONE_OVERLAY_PALETTE: tuple[tuple[int, int, int], ...] = (
(255, 196, 0),
(0, 190, 255),
(112, 224, 96),
(255, 128, 32),
(180, 128, 255),
(0, 220, 180),
(255, 96, 176),
(192, 224, 48),
(96, 144, 255),
(255, 96, 96),
)
TRASH_OVERLAY_RGB = (255, 64, 64)
LABEL_TEXT_RGB = (255, 255, 255)
LABEL_SHADOW_RGB = (0, 0, 0)
LABEL_BOX_ALPHA = 0.34
LABEL_OUTLINE_ALPHA = 0.64
CJK_FONT_CANDIDATES = (
Path("/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc"),
Path("/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.otf"),
Path("/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc"),
Path("/usr/share/fonts/truetype/arphic/uming.ttc"),
Path("/System/Library/Fonts/PingFang.ttc"),
Path("/System/Library/Fonts/STHeiti Light.ttc"),
Path("/Library/Fonts/Arial Unicode.ttf"),
)
class SnapshotUploadError(RuntimeError): class SnapshotUploadError(RuntimeError):
pass pass
@@ -69,7 +114,13 @@ def capture_alert_snapshot(
file_name = build_snapshot_file_name(alert_events[0], captured_at) file_name = build_snapshot_file_name(alert_events[0], captured_at)
object_key_hint = build_object_key_hint(settings.object_key_prefix, alert_events[0], captured_at, file_name) object_key_hint = build_object_key_hint(settings.object_key_prefix, alert_events[0], captured_at, file_name)
try: try:
image_bytes = (jpeg_encoder or encode_frame_to_jpeg)(frame, settings.encode_timeout_seconds) annotated_frame = apply_calibration_overlay(
frame,
config,
zone_ids=alert_event_zone_ids(alert_events),
include_trash=False,
)
image_bytes = (jpeg_encoder or encode_frame_to_jpeg)(annotated_frame, settings.encode_timeout_seconds)
result = (uploader or upload_snapshot_bytes)( result = (uploader or upload_snapshot_bytes)(
image_bytes, image_bytes,
file_name=file_name, file_name=file_name,
@@ -91,6 +142,888 @@ def capture_alert_snapshot(
} }
def apply_calibration_overlay(
frame: Frame,
config: dict[str, Any],
*,
zone_ids: set[str] | None = None,
include_trash: bool = True,
) -> Frame:
regions = load_calibration_overlay_regions(config, zone_ids=zone_ids, include_trash=include_trash)
if not regions or frame.width <= 0 or frame.height <= 0:
return frame
rgb = bytearray(frame.rgb)
if len(rgb) != frame.width * frame.height * 3:
return frame
labels: list[OverlayLabel] = []
for region in regions:
polygon = normalized_polygon_to_pixels(region.polygon, frame.width, frame.height)
if len(polygon) < 3:
continue
fill_polygon(rgb, frame.width, frame.height, polygon, region.fill_rgb, alpha=0.24)
draw_polygon_outline(rgb, frame.width, frame.height, polygon, region.outline_rgb, alpha=0.85)
label = label_for_polygon(polygon, region.label, frame.width, frame.height, region.outline_rgb)
if label is not None:
labels.append(label)
if labels and not render_region_labels_with_ffmpeg(rgb, frame.width, frame.height, labels):
for label in labels:
draw_region_label(
rgb,
frame.width,
frame.height,
label.x,
label.y,
label.fallback_text,
label.accent_rgb,
)
return Frame(width=frame.width, height=frame.height, rgb=bytes(rgb))
def load_calibration_overlay_regions(
config: dict[str, Any],
*,
zone_ids: set[str] | None = None,
include_trash: bool = True,
) -> list[CalibrationOverlayRegion]:
regions: list[CalibrationOverlayRegion] = []
for index, zone in enumerate(config.get("zones", [])):
if not isinstance(zone, dict):
continue
zone_id = str(zone.get("id", "")).strip()
if zone_ids is not None and zone_id not in zone_ids:
continue
polygon = normalize_overlay_polygon(zone.get("polygon", []))
if len(polygon) >= 3:
color = ZONE_OVERLAY_PALETTE[index % len(ZONE_OVERLAY_PALETTE)]
regions.append(
CalibrationOverlayRegion(
polygon=polygon,
label=overlay_region_label(zone, fallback=f"区域 {index + 1}"),
outline_rgb=color,
fill_rgb=color,
)
)
trash = config.get("trash", {})
if include_trash and isinstance(trash, dict):
polygon = normalize_overlay_polygon(trash.get("roi", []))
if len(polygon) >= 3:
regions.append(
CalibrationOverlayRegion(
polygon=polygon,
label=overlay_region_label(trash, fallback="垃圾区"),
outline_rgb=TRASH_OVERLAY_RGB,
fill_rgb=TRASH_OVERLAY_RGB,
)
)
return regions
def alert_event_zone_ids(events: list[dict[str, object]]) -> set[str]:
return {zone_id for event in events if (zone_id := str(event.get("zone_id", "")).strip())}
def overlay_region_label(payload: dict[str, Any], *, fallback: str) -> str:
for key in ("label", "name", "id"):
value = str(payload.get(key, "")).strip()
if value:
return value
return fallback
def normalize_overlay_polygon(value: object) -> tuple[tuple[float, float], ...]:
points: list[tuple[float, float]] = []
if not isinstance(value, list | tuple):
return ()
for item in value:
if not isinstance(item, list | tuple) or len(item) != 2:
continue
try:
x = clamp_unit(float(item[0]))
y = clamp_unit(float(item[1]))
except (TypeError, ValueError):
continue
points.append((x, y))
return tuple(points)
def normalized_polygon_to_pixels(
polygon: tuple[tuple[float, float], ...],
width: int,
height: int,
) -> tuple[tuple[int, int], ...]:
max_x = max(0, width - 1)
max_y = max(0, height - 1)
return tuple((round(x * max_x), round(y * max_y)) for x, y in polygon)
def fill_polygon(
rgb: bytearray,
width: int,
height: int,
polygon: tuple[tuple[int, int], ...],
color: tuple[int, int, int],
*,
alpha: float,
) -> None:
min_x = max(0, min(point[0] for point in polygon))
max_x = min(width - 1, max(point[0] for point in polygon))
min_y = max(0, min(point[1] for point in polygon))
max_y = min(height - 1, max(point[1] for point in polygon))
for y in range(min_y, max_y + 1):
for x in range(min_x, max_x + 1):
if point_in_pixel_polygon(x, y, polygon):
blend_pixel(rgb, width, x, y, color, alpha)
def draw_polygon_outline(
rgb: bytearray,
width: int,
height: int,
polygon: tuple[tuple[int, int], ...],
color: tuple[int, int, int],
*,
alpha: float,
) -> None:
previous = polygon[-1]
for point in polygon:
draw_line(rgb, width, height, previous, point, color, alpha=alpha)
previous = point
def label_for_polygon(
polygon: tuple[tuple[int, int], ...],
text: str,
width: int,
height: int,
accent_rgb: tuple[int, int, int],
) -> OverlayLabel | None:
label_text = text.strip()
fallback_text = fallback_label_text(label_text)
if not label_text or not fallback_text or width < 12 or height < 10:
return None
min_x = max(0, min(point[0] for point in polygon))
min_y = max(0, min(point[1] for point in polygon))
max_x = min(width - 1, max(point[0] for point in polygon))
max_y = min(height - 1, max(point[1] for point in polygon))
if max_x - min_x + 1 < 4 or max_y - min_y + 1 < 4:
return None
x = min(max(0, min_x + 2), max(0, width - 8))
y = min(max(0, min_y + 2), max(0, height - 8))
return OverlayLabel(
text=label_text,
fallback_text=fallback_text,
x=x,
y=y,
accent_rgb=accent_rgb,
)
def render_region_labels_with_ffmpeg(
rgb: bytearray,
width: int,
height: int,
labels: list[OverlayLabel],
) -> bool:
font_file = find_cjk_font_file()
if font_file is None:
return False
filter_text = build_drawtext_filter(labels, font_file, height)
if not filter_text:
return False
command = [
"ffmpeg",
"-hide_banner",
"-loglevel",
"error",
"-f",
"rawvideo",
"-pix_fmt",
"rgb24",
"-s",
f"{width}x{height}",
"-i",
"-",
"-vf",
filter_text,
"-frames:v",
"1",
"-f",
"rawvideo",
"-pix_fmt",
"rgb24",
"-",
]
try:
result = subprocess.run(
command,
input=bytes(rgb),
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=3,
)
except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
return False
expected_size = width * height * 3
if result.returncode != 0 or len(result.stdout) != expected_size:
return False
rgb[:] = result.stdout
return True
def build_drawtext_filter(labels: list[OverlayLabel], font_file: Path, height: int) -> str:
font_size = max(10, min(22, round(height * 0.036)))
filters: list[str] = []
for label in labels:
text = escape_drawtext_value(label.text)
font = escape_drawtext_value(str(font_file))
color = rgb_to_hex(LABEL_TEXT_RGB)
box_color = f"black@{LABEL_BOX_ALPHA:.2f}"
filters.append(
"drawtext="
f"fontfile='{font}':"
f"text='{text}':"
f"x={label.x}:"
f"y={label.y}:"
f"fontsize={font_size}:"
f"fontcolor={color}:"
"box=1:"
f"boxcolor={box_color}:"
"boxborderw=2"
)
return ",".join(filters)
def escape_drawtext_value(value: str) -> str:
escaped = value.replace("\\", "\\\\")
for char in (":", "'", ",", "[", "]", "%"):
escaped = escaped.replace(char, f"\\{char}")
return escaped
def rgb_to_hex(color: tuple[int, int, int]) -> str:
return f"0x{color[0]:02x}{color[1]:02x}{color[2]:02x}"
def find_cjk_font_file() -> Path | None:
for candidate in CJK_FONT_CANDIDATES:
if candidate.exists():
return candidate
try:
result = subprocess.run(
["fc-match", "-f", "%{file}", ":lang=zh-cn"],
check=False,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
timeout=1,
text=True,
)
except (FileNotFoundError, subprocess.TimeoutExpired, OSError):
return None
font_path = Path(result.stdout.strip())
if font_path.exists() and any(marker in font_path.name.lower() for marker in ("noto", "cjk", "hei", "song", "fang")):
return font_path
return None
def fallback_label_text(label: str) -> str:
text = label.strip()
if not text:
return ""
if "垃圾" in text:
return "TRASH"
digits = "".join(char for char in text if char.isdigit())
if digits:
return f"R{digits}"
ascii_text = "".join(char if char.isascii() and (char.isalnum() or char in {"-", "_"}) else "" for char in text)
return ascii_text[:12] or "ZONE"
def draw_region_label(
rgb: bytearray,
width: int,
height: int,
x: int,
y: int,
label: str,
accent_rgb: tuple[int, int, int],
) -> None:
text = label.strip()
if not text or width < 12 or height < 10:
return
glyphs = text_to_glyphs(text)
if not glyphs:
return
text_width = sum(len(glyph[0]) for glyph in glyphs) + max(0, len(glyphs) - 1)
text_height = max(len(glyph) for glyph in glyphs)
box_width = text_width + 4
box_height = text_height + 4
x = min(max(0, x), max(0, width - box_width - 1))
y = min(max(0, y), max(0, height - box_height - 1))
box_width = min(box_width, width - x)
box_height = min(box_height, height - y)
if box_width < 6 or box_height < 6:
return
fill_rect(rgb, width, height, x, y, box_width, box_height, LABEL_SHADOW_RGB, alpha=LABEL_BOX_ALPHA)
draw_rect_outline(rgb, width, height, x, y, box_width, box_height, accent_rgb, alpha=LABEL_OUTLINE_ALPHA)
draw_text(rgb, width, height, x + 2, y + 2, glyphs, LABEL_TEXT_RGB, LABEL_SHADOW_RGB)
def fill_rect(
rgb: bytearray,
width: int,
height: int,
x: int,
y: int,
rect_width: int,
rect_height: int,
color: tuple[int, int, int],
*,
alpha: float,
) -> None:
for yy in range(max(0, y), min(height, y + rect_height)):
for xx in range(max(0, x), min(width, x + rect_width)):
blend_pixel(rgb, width, xx, yy, color, alpha)
def draw_rect_outline(
rgb: bytearray,
width: int,
height: int,
x: int,
y: int,
rect_width: int,
rect_height: int,
color: tuple[int, int, int],
*,
alpha: float,
) -> None:
if rect_width <= 0 or rect_height <= 0:
return
x1 = min(width - 1, x + rect_width - 1)
y1 = min(height - 1, y + rect_height - 1)
draw_line(rgb, width, height, (x, y), (x1, y), color, alpha=alpha)
draw_line(rgb, width, height, (x1, y), (x1, y1), color, alpha=alpha)
draw_line(rgb, width, height, (x1, y1), (x, y1), color, alpha=alpha)
draw_line(rgb, width, height, (x, y1), (x, y), color, alpha=alpha)
def draw_text(
rgb: bytearray,
width: int,
height: int,
x: int,
y: int,
glyphs: list[tuple[str, ...]],
color: tuple[int, int, int],
shadow: tuple[int, int, int],
) -> None:
cursor_x = x
for glyph in glyphs:
draw_glyph(rgb, width, height, cursor_x + 1, y + 1, glyph, shadow, alpha=0.8)
draw_glyph(rgb, width, height, cursor_x, y, glyph, color, alpha=1.0)
cursor_x += len(glyph[0]) + 1
def draw_glyph(
rgb: bytearray,
width: int,
height: int,
x: int,
y: int,
glyph: tuple[str, ...],
color: tuple[int, int, int],
*,
alpha: float,
) -> None:
for row_index, row in enumerate(glyph):
yy = y + row_index
if yy < 0 or yy >= height:
continue
for col_index, marker in enumerate(row):
if marker != "1":
continue
xx = x + col_index
if 0 <= xx < width:
blend_pixel(rgb, width, xx, yy, color, alpha)
def text_to_glyphs(text: str) -> list[tuple[str, ...]]:
glyphs: list[tuple[str, ...]] = []
for char in text[:12]:
glyph = GLYPHS.get(char.upper(), GLYPHS.get(char))
if glyph is not None:
glyphs.append(glyph)
return glyphs
GLYPHS: dict[str, tuple[str, ...]] = {
" ": (
"000",
"000",
"000",
"000",
"000",
"000",
"000",
),
"0": (
"111",
"101",
"101",
"101",
"101",
"101",
"111",
),
"1": (
"010",
"110",
"010",
"010",
"010",
"010",
"111",
),
"2": (
"111",
"001",
"001",
"111",
"100",
"100",
"111",
),
"3": (
"111",
"001",
"001",
"111",
"001",
"001",
"111",
),
"4": (
"101",
"101",
"101",
"111",
"001",
"001",
"001",
),
"5": (
"111",
"100",
"100",
"111",
"001",
"001",
"111",
),
"6": (
"111",
"100",
"100",
"111",
"101",
"101",
"111",
),
"7": (
"111",
"001",
"001",
"010",
"010",
"100",
"100",
),
"8": (
"111",
"101",
"101",
"111",
"101",
"101",
"111",
),
"9": (
"111",
"101",
"101",
"111",
"001",
"001",
"111",
),
"A": (
"010",
"101",
"101",
"111",
"101",
"101",
"101",
),
"B": (
"110",
"101",
"101",
"110",
"101",
"101",
"110",
),
"C": (
"111",
"100",
"100",
"100",
"100",
"100",
"111",
),
"D": (
"110",
"101",
"101",
"101",
"101",
"101",
"110",
),
"E": (
"111",
"100",
"100",
"110",
"100",
"100",
"111",
),
"F": (
"111",
"100",
"100",
"110",
"100",
"100",
"100",
),
"G": (
"111",
"100",
"100",
"101",
"101",
"101",
"111",
),
"H": (
"101",
"101",
"101",
"111",
"101",
"101",
"101",
),
"I": (
"111",
"010",
"010",
"010",
"010",
"010",
"111",
),
"J": (
"001",
"001",
"001",
"001",
"101",
"101",
"111",
),
"K": (
"101",
"101",
"110",
"100",
"110",
"101",
"101",
),
"L": (
"100",
"100",
"100",
"100",
"100",
"100",
"111",
),
"M": (
"101",
"111",
"111",
"101",
"101",
"101",
"101",
),
"N": (
"101",
"111",
"111",
"111",
"101",
"101",
"101",
),
"O": (
"111",
"101",
"101",
"101",
"101",
"101",
"111",
),
"P": (
"111",
"101",
"101",
"111",
"100",
"100",
"100",
),
"Q": (
"111",
"101",
"101",
"101",
"111",
"001",
"001",
),
"R": (
"111",
"101",
"101",
"111",
"110",
"101",
"101",
),
"S": (
"111",
"100",
"100",
"111",
"001",
"001",
"111",
),
"T": (
"111",
"010",
"010",
"010",
"010",
"010",
"010",
),
"U": (
"101",
"101",
"101",
"101",
"101",
"101",
"111",
),
"V": (
"101",
"101",
"101",
"101",
"101",
"101",
"010",
),
"W": (
"101",
"101",
"101",
"101",
"111",
"111",
"101",
),
"X": (
"101",
"101",
"101",
"010",
"101",
"101",
"101",
),
"Y": (
"101",
"101",
"101",
"010",
"010",
"010",
"010",
),
"Z": (
"111",
"001",
"001",
"010",
"100",
"100",
"111",
),
"-": (
"000",
"000",
"000",
"111",
"000",
"000",
"000",
),
"_": (
"000",
"000",
"000",
"000",
"000",
"000",
"111",
),
"": (
"1111111",
"1000001",
"1011001",
"1001011",
"1011001",
"1000001",
"1111111",
),
"": (
"0010010",
"1111111",
"0010010",
"1111011",
"0010110",
"1111011",
"0010101",
),
"": (
"0100100",
"1111111",
"0100100",
"0101110",
"0100100",
"1100100",
"0111110",
),
"": (
"0101110",
"1110010",
"0100010",
"0101110",
"0101010",
"1101010",
"0110011",
),
}
def draw_line(
rgb: bytearray,
width: int,
height: int,
start: tuple[int, int],
end: tuple[int, int],
color: tuple[int, int, int],
*,
alpha: float,
) -> None:
x0, y0 = start
x1, y1 = end
dx = abs(x1 - x0)
dy = -abs(y1 - y0)
step_x = 1 if x0 < x1 else -1
step_y = 1 if y0 < y1 else -1
error = dx + dy
while True:
blend_pixel(rgb, width, x0, y0, color, alpha)
if x0 == x1 and y0 == y1:
break
twice_error = 2 * error
if twice_error >= dy:
error += dy
x0 += step_x
if twice_error <= dx:
error += dx
y0 += step_y
def point_in_pixel_polygon(x: int, y: int, polygon: tuple[tuple[int, int], ...]) -> bool:
inside = False
px = float(x)
py = float(y)
previous_x, previous_y = polygon[-1]
for current_x, current_y in polygon:
if point_on_segment(px, py, previous_x, previous_y, current_x, current_y):
return True
crosses = (current_y > py) != (previous_y > py)
if crosses:
slope_x = (previous_x - current_x) * (py - current_y) / (previous_y - current_y) + current_x
if px < slope_x:
inside = not inside
previous_x, previous_y = current_x, current_y
return inside
def point_on_segment(px: float, py: float, x0: int, y0: int, x1: int, y1: int) -> bool:
cross = (px - x0) * (y1 - y0) - (py - y0) * (x1 - x0)
if abs(cross) > 1e-9:
return False
return min(x0, x1) <= px <= max(x0, x1) and min(y0, y1) <= py <= max(y0, y1)
def blend_pixel(
rgb: bytearray,
width: int,
x: int,
y: int,
color: tuple[int, int, int],
alpha: float,
) -> None:
offset = (y * width + x) * 3
inverse = 1.0 - alpha
rgb[offset] = int(rgb[offset] * inverse + color[0] * alpha)
rgb[offset + 1] = int(rgb[offset + 1] * inverse + color[1] * alpha)
rgb[offset + 2] = int(rgb[offset + 2] * inverse + color[2] * alpha)
def clamp_unit(value: float) -> float:
return min(1.0, max(0.0, value))
def upload_snapshot_bytes( def upload_snapshot_bytes(
image_bytes: bytes, image_bytes: bytes,
*, *,

View File

@@ -8,15 +8,19 @@ from typing import Any
EVENT_CASE_TYPES = { EVENT_CASE_TYPES = {
"time_pre_warning": "pre_warning",
"time_alarm": "time_alarm", "time_alarm": "time_alarm",
"batch_pending_disposal": "pending_disposal", "batch_pending_disposal": "pending_disposal",
"alarm_removal_timeout": "alarm_removal_timeout",
"warning_escalated": "warning_escalated", "warning_escalated": "warning_escalated",
} }
CASE_PRIORITY = { CASE_PRIORITY = {
"time_alarm": 1, "pre_warning": 1,
"pending_disposal": 2, "time_alarm": 2,
"warning_escalated": 3, "pending_disposal": 3,
"alarm_removal_timeout": 4,
"warning_escalated": 5,
} }
@@ -139,10 +143,11 @@ class CaseStore:
case_id = build_case_id(batch_id) case_id = build_case_id(batch_id)
existing = self._cases.get(case_id) existing = self._cases.get(case_id)
if event_name == "batch_discarded": if event_name in {"batch_discarded", "pre_warning_handled"}:
if existing is None or existing.case_status == "handled": if existing is None or existing.case_status == "handled":
return None return None
return self._close_case(existing, when, handled_source="auto_closed") handled_source = "auto_removed_before_alarm" if event_name == "pre_warning_handled" else "auto_closed"
return self._close_case(existing, when, handled_source=handled_source)
case_type = EVENT_CASE_TYPES.get(event_name) case_type = EVENT_CASE_TYPES.get(event_name)
if case_type is None: if case_type is None:

View File

@@ -23,7 +23,9 @@ def load_settings(path: str | Path) -> EngineSettings:
return EngineSettings( return EngineSettings(
camera_id=str(data.get("camera_id", "cold_display_cam_01")), camera_id=str(data.get("camera_id", "cold_display_cam_01")),
pre_warning_seconds=int(thresholds.get("pre_warning_seconds", 0)),
max_dwell_seconds=int(thresholds.get("max_dwell_seconds", 10_800)), max_dwell_seconds=int(thresholds.get("max_dwell_seconds", 10_800)),
alarm_removal_seconds=int(thresholds.get("alarm_removal_seconds", 0)),
trash_confirmation_seconds=int(thresholds.get("trash_confirmation_seconds", 120)), trash_confirmation_seconds=int(thresholds.get("trash_confirmation_seconds", 120)),
zone_ids=zone_ids, zone_ids=zone_ids,
) )
@@ -135,7 +137,9 @@ def format_config_document(data: dict[str, Any]) -> str:
thresholds = data.get("thresholds", {}) thresholds = data.get("thresholds", {})
lines.append("[thresholds]") lines.append("[thresholds]")
lines.append(f'pre_warning_seconds = {int(thresholds.get("pre_warning_seconds", 0))}')
lines.append(f'max_dwell_seconds = {int(thresholds.get("max_dwell_seconds", 10_800))}') lines.append(f'max_dwell_seconds = {int(thresholds.get("max_dwell_seconds", 10_800))}')
lines.append(f'alarm_removal_seconds = {int(thresholds.get("alarm_removal_seconds", 0))}')
lines.append(f'trash_confirmation_seconds = {int(thresholds.get("trash_confirmation_seconds", 120))}') lines.append(f'trash_confirmation_seconds = {int(thresholds.get("trash_confirmation_seconds", 120))}')
lines.append("") lines.append("")

View File

@@ -34,7 +34,9 @@ class BatchEngine:
if appeared_zones and self.pending_disposal: if appeared_zones and self.pending_disposal:
events.extend(self._mark_pending_as_returned(observation.ts, appeared_zones)) events.extend(self._mark_pending_as_returned(observation.ts, appeared_zones))
events.extend(self._apply_pre_warnings(observation.ts, previous_zone_counts))
events.extend(self._apply_time_alarms(observation.ts, previous_zone_counts)) events.extend(self._apply_time_alarms(observation.ts, previous_zone_counts))
events.extend(self._apply_alarm_removal_timeouts(observation.ts))
pending_count_before_zone_transitions = len(self.pending_disposal) pending_count_before_zone_transitions = len(self.pending_disposal)
for zone_id, new_count in zone_counts.items(): for zone_id, new_count in zone_counts.items():
@@ -99,7 +101,14 @@ class BatchEngine:
if zone_id not in self._zone_counts: if zone_id not in self._zone_counts:
continue continue
event_name = str(event.get("event", "")) event_name = str(event.get("event", ""))
if event_name in {"batch_started", "batch_count_changed", "mixed_batch_violation", "time_alarm"}: if event_name in {
"batch_started",
"batch_count_changed",
"mixed_batch_violation",
"time_pre_warning",
"time_alarm",
"alarm_removal_timeout",
}:
if active_zone_counts is not None and active_counts.get(zone_id, 0) <= 0: if active_zone_counts is not None and active_counts.get(zone_id, 0) <= 0:
self.active_by_zone.pop(zone_id, None) self.active_by_zone.pop(zone_id, None)
self._zone_counts[zone_id] = 0 self._zone_counts[zone_id] = 0
@@ -131,9 +140,19 @@ class BatchEngine:
last_count=max(1, int(event.get("current_count", 1) or 1)), last_count=max(1, int(event.get("current_count", 1) or 1)),
state=str(event.get("state", "active") or "active"), state=str(event.get("state", "active") or "active"),
) )
batch.pre_warned_at = parse_event_datetime(event.get("pre_warned_at"))
batch.alerted_at = parse_event_datetime(event.get("alerted_at")) batch.alerted_at = parse_event_datetime(event.get("alerted_at"))
if batch.alerted_at is not None: if batch.alerted_at is not None:
batch.state = "alerted" batch.state = "alerted"
if self.settings.alarm_removal_seconds > 0:
batch.alarm_removal_deadline = parse_event_datetime(event.get("alarm_removal_deadline"))
if batch.alarm_removal_deadline is None:
batch.alarm_removal_deadline = batch.alerted_at + self.settings.alarm_removal_window
elif batch.pre_warned_at is not None:
batch.state = "pre_warning"
batch.alarm_removal_timed_out_at = parse_event_datetime(event.get("alarm_removal_timed_out_at"))
if batch.alarm_removal_timed_out_at is not None:
batch.state = "alarm_removal_timeout"
batch.dwell_seconds = max(0, int(event.get("dwell_seconds", 0) or 0)) batch.dwell_seconds = max(0, int(event.get("dwell_seconds", 0) or 0))
return batch return batch
@@ -162,10 +181,44 @@ class BatchEngine:
self.pending_disposal.append(batch) self.pending_disposal.append(batch)
return self._event("batch_pending_disposal", when, batch, severity="warning") return self._event("batch_pending_disposal", when, batch, severity="warning")
if batch.pre_warned_at is not None:
batch.state = "handled"
self.closed_batches.append(batch)
return self._event(
"pre_warning_handled",
when,
batch,
severity="info",
handled_source="auto_removed_before_alarm",
)
batch.state = "consumed" batch.state = "consumed"
self.closed_batches.append(batch) self.closed_batches.append(batch)
return self._event("batch_consumed", when, batch, severity="info") return self._event("batch_consumed", when, batch, severity="info")
def _apply_pre_warnings(self, when: datetime, zone_counts: dict[str, int]) -> list[dict[str, Any]]:
if self.settings.pre_warning_seconds <= 0:
return []
events: list[dict[str, Any]] = []
for zone_id, batch in self.active_by_zone.items():
if batch.pre_warned_at is not None or batch.alerted_at is not None:
continue
dwell_seconds = batch.current_dwell_seconds(when)
if dwell_seconds < self.settings.pre_warning_seconds:
continue
batch.state = "pre_warning"
batch.pre_warned_at = when
events.append(
self._event(
"time_pre_warning",
when,
batch,
severity="warning",
current_count=zone_counts.get(zone_id, batch.last_count),
)
)
return events
def _apply_time_alarms(self, when: datetime, zone_counts: dict[str, int]) -> list[dict[str, Any]]: def _apply_time_alarms(self, when: datetime, zone_counts: dict[str, int]) -> list[dict[str, Any]]:
events: list[dict[str, Any]] = [] events: list[dict[str, Any]] = []
for zone_id, batch in self.active_by_zone.items(): for zone_id, batch in self.active_by_zone.items():
@@ -176,6 +229,8 @@ class BatchEngine:
continue continue
batch.state = "alerted" batch.state = "alerted"
batch.alerted_at = when batch.alerted_at = when
if self.settings.alarm_removal_seconds > 0:
batch.alarm_removal_deadline = when + self.settings.alarm_removal_window
events.append( events.append(
self._event( self._event(
"time_alarm", "time_alarm",
@@ -187,6 +242,30 @@ class BatchEngine:
) )
return events return events
def _apply_alarm_removal_timeouts(self, when: datetime) -> list[dict[str, Any]]:
if self.settings.alarm_removal_seconds <= 0:
return []
events: list[dict[str, Any]] = []
for batch in self.active_by_zone.values():
if batch.alerted_at is None or batch.alarm_removal_deadline is None:
continue
if batch.alarm_removal_timed_out_at is not None:
continue
if when <= batch.alarm_removal_deadline:
continue
batch.state = "alarm_removal_timeout"
batch.alarm_removal_timed_out_at = when
events.append(
self._event(
"alarm_removal_timeout",
when,
batch,
severity="alarm",
reason="alarmed_batch_not_removed_after_alarm_window",
)
)
return events
def _mark_mixed_batch( def _mark_mixed_batch(
self, self,
zone_id: str, zone_id: str,
@@ -273,13 +352,21 @@ class BatchEngine:
"state": batch.state, "state": batch.state,
"started_at": batch.started_at.isoformat(), "started_at": batch.started_at.isoformat(),
"dwell_seconds": batch.current_dwell_seconds(when), "dwell_seconds": batch.current_dwell_seconds(when),
"pre_warning_seconds": self.settings.pre_warning_seconds,
"max_dwell_seconds": self.settings.max_dwell_seconds, "max_dwell_seconds": self.settings.max_dwell_seconds,
"alarm_removal_seconds": self.settings.alarm_removal_seconds,
} }
zone_index = self._zone_index(batch.zone_id) zone_index = self._zone_index(batch.zone_id)
if zone_index is not None: if zone_index is not None:
payload["zone_index"] = zone_index payload["zone_index"] = zone_index
if batch.pre_warned_at is not None:
payload["pre_warned_at"] = batch.pre_warned_at.isoformat()
if batch.alerted_at is not None: if batch.alerted_at is not None:
payload["alerted_at"] = batch.alerted_at.isoformat() payload["alerted_at"] = batch.alerted_at.isoformat()
if batch.alarm_removal_deadline is not None:
payload["alarm_removal_deadline"] = batch.alarm_removal_deadline.isoformat()
if batch.alarm_removal_timed_out_at is not None:
payload["alarm_removal_timed_out_at"] = batch.alarm_removal_timed_out_at.isoformat()
if batch.ended_at is not None: if batch.ended_at is not None:
payload["ended_at"] = batch.ended_at.isoformat() payload["ended_at"] = batch.ended_at.isoformat()
if batch.disposal_deadline is not None: if batch.disposal_deadline is not None:
@@ -290,9 +377,9 @@ class BatchEngine:
return payload return payload
def _event_severity(self, event_name: str) -> str: def _event_severity(self, event_name: str) -> str:
if event_name == "time_alarm": if event_name in {"time_alarm", "alarm_removal_timeout"}:
return "alarm" return "alarm"
if event_name in {"warning_escalated", "batch_pending_disposal"}: if event_name in {"warning_escalated", "batch_pending_disposal", "time_pre_warning"}:
return "warning" return "warning"
if event_name.endswith("_violation"): if event_name.endswith("_violation"):
return "warning" return "warning"

View File

@@ -171,6 +171,7 @@ def load_case_store(path: Path) -> CaseStore:
def persist_case_updates(case_store: CaseStore, path: Path, events: list[dict[str, object]]) -> list[dict[str, object]]: def persist_case_updates(case_store: CaseStore, path: Path, events: list[dict[str, object]]) -> list[dict[str, object]]:
case_store = load_case_store(path)
snapshots = case_store.apply_batch_events(events) snapshots = case_store.apply_batch_events(events)
append_case_snapshots(path, snapshots) append_case_snapshots(path, snapshots)
return snapshots return snapshots

View File

@@ -11,14 +11,24 @@ DEFAULT_ZONE_IDS = tuple(f"r{row}c{col}" for row in range(1, 3) for col in range
@dataclass(frozen=True, slots=True) @dataclass(frozen=True, slots=True)
class EngineSettings: class EngineSettings:
camera_id: str = "cold_display_cam_01" camera_id: str = "cold_display_cam_01"
pre_warning_seconds: int = 0
max_dwell_seconds: int = 10_800 max_dwell_seconds: int = 10_800
alarm_removal_seconds: int = 0
trash_confirmation_seconds: int = 120 trash_confirmation_seconds: int = 120
zone_ids: tuple[str, ...] = DEFAULT_ZONE_IDS zone_ids: tuple[str, ...] = DEFAULT_ZONE_IDS
@property
def pre_warning(self) -> timedelta:
return timedelta(seconds=self.pre_warning_seconds)
@property @property
def max_dwell(self) -> timedelta: def max_dwell(self) -> timedelta:
return timedelta(seconds=self.max_dwell_seconds) return timedelta(seconds=self.max_dwell_seconds)
@property
def alarm_removal_window(self) -> timedelta:
return timedelta(seconds=self.alarm_removal_seconds)
@property @property
def trash_confirmation_window(self) -> timedelta: def trash_confirmation_window(self) -> timedelta:
return timedelta(seconds=self.trash_confirmation_seconds) return timedelta(seconds=self.trash_confirmation_seconds)
@@ -56,7 +66,10 @@ class Batch:
started_at: datetime started_at: datetime
last_count: int last_count: int
state: str = "active" state: str = "active"
pre_warned_at: datetime | None = None
alerted_at: datetime | None = None alerted_at: datetime | None = None
alarm_removal_deadline: datetime | None = None
alarm_removal_timed_out_at: datetime | None = None
ended_at: datetime | None = None ended_at: datetime | None = None
pending_since: datetime | None = None pending_since: datetime | None = None
disposal_deadline: datetime | None = None disposal_deadline: datetime | None = None

View File

@@ -30,6 +30,7 @@ class RuntimeVisionSettings:
occupancy_texture_delta: float = 18.0 occupancy_texture_delta: float = 18.0
occupancy_dark_luma_threshold: float = 80.0 occupancy_dark_luma_threshold: float = 80.0
occupancy_dark_fraction: float = 0.06 occupancy_dark_fraction: float = 0.06
occupancy_absolute_dark_fraction: float = 0.0
occupancy_texture_dark_fraction: float = 0.04 occupancy_texture_dark_fraction: float = 0.04
occupancy_bright_luma_threshold: float = 220.0 occupancy_bright_luma_threshold: float = 220.0
occupancy_bright_reflection_fraction: float = 0.18 occupancy_bright_reflection_fraction: float = 0.18
@@ -236,6 +237,7 @@ def load_runtime_vision_settings(config: dict[str, Any]) -> RuntimeVisionSetting
occupancy_texture_delta=float(runtime.get("occupancy_texture_delta", 18.0)), occupancy_texture_delta=float(runtime.get("occupancy_texture_delta", 18.0)),
occupancy_dark_luma_threshold=float(runtime.get("occupancy_dark_luma_threshold", 80.0)), occupancy_dark_luma_threshold=float(runtime.get("occupancy_dark_luma_threshold", 80.0)),
occupancy_dark_fraction=float(runtime.get("occupancy_dark_fraction", 0.06)), occupancy_dark_fraction=float(runtime.get("occupancy_dark_fraction", 0.06)),
occupancy_absolute_dark_fraction=float(runtime.get("occupancy_absolute_dark_fraction", 0.0)),
occupancy_texture_dark_fraction=float(runtime.get("occupancy_texture_dark_fraction", 0.04)), occupancy_texture_dark_fraction=float(runtime.get("occupancy_texture_dark_fraction", 0.04)),
occupancy_bright_luma_threshold=float(runtime.get("occupancy_bright_luma_threshold", 220.0)), occupancy_bright_luma_threshold=float(runtime.get("occupancy_bright_luma_threshold", 220.0)),
occupancy_bright_reflection_fraction=float(runtime.get("occupancy_bright_reflection_fraction", 0.18)), occupancy_bright_reflection_fraction=float(runtime.get("occupancy_bright_reflection_fraction", 0.18)),
@@ -324,13 +326,18 @@ def metrics_indicate_occupied(
dark_delta = dark_fraction - baseline_dark_fraction dark_delta = dark_fraction - baseline_dark_fraction
bright_reflection = is_bright_reflection(settings, dark_delta, bright_fraction) bright_reflection = is_bright_reflection(settings, dark_delta, bright_fraction)
dark_occupied = dark_delta >= settings.occupancy_dark_fraction and not bright_reflection dark_occupied = dark_delta >= settings.occupancy_dark_fraction and not bright_reflection
absolute_dark_occupied = (
settings.occupancy_absolute_dark_fraction > 0
and dark_fraction >= settings.occupancy_absolute_dark_fraction
and not bright_reflection
)
mean_occupied = mean_delta >= settings.occupancy_mean_delta and not bright_reflection mean_occupied = mean_delta >= settings.occupancy_mean_delta and not bright_reflection
texture_occupied = ( texture_occupied = (
texture_delta >= settings.occupancy_texture_delta texture_delta >= settings.occupancy_texture_delta
and dark_delta >= settings.occupancy_texture_dark_fraction and dark_delta >= settings.occupancy_texture_dark_fraction
and not bright_reflection and not bright_reflection
) )
return dark_occupied or mean_occupied or texture_occupied return dark_occupied or absolute_dark_occupied or mean_occupied or texture_occupied
def is_bright_reflection(settings: RuntimeVisionSettings, dark_delta: float, bright_fraction: float) -> bool: def is_bright_reflection(settings: RuntimeVisionSettings, dark_delta: float, bright_fraction: float) -> bool:

View File

@@ -145,6 +145,7 @@ def build_batch_event_payload(
batch_id = str(event.get("batch_id", "")) batch_id = str(event.get("batch_id", ""))
event_name = str(event.get("event", "")) event_name = str(event.get("event", ""))
ts = str(event.get("ts", "")) ts = str(event.get("ts", ""))
pre_warned_at = str(event.get("pre_warned_at", ""))
alerted_at = str(event.get("alerted_at", "")) alerted_at = str(event.get("alerted_at", ""))
ended_at = str(event.get("ended_at", "")) ended_at = str(event.get("ended_at", ""))
payload = { payload = {
@@ -165,7 +166,8 @@ def build_batch_event_payload(
"dwell_seconds": event.get("dwell_seconds", ""), "dwell_seconds": event.get("dwell_seconds", ""),
"is_discarded": event_name == "batch_discarded", "is_discarded": event_name == "batch_discarded",
"discarded_at": ts if event_name == "batch_discarded" else "", "discarded_at": ts if event_name == "batch_discarded" else "",
"created_at": alerted_at or ts, "created_at": pre_warned_at or alerted_at or ts,
"pre_warned_at": pre_warned_at,
"alerted_at": alerted_at, "alerted_at": alerted_at,
"alarm_at": alerted_at, "alarm_at": alerted_at,
"updated_at": ts, "updated_at": ts,
@@ -185,6 +187,7 @@ def build_case_event_payload(
handled_at = str(snapshot.get("handled_at", "")) handled_at = str(snapshot.get("handled_at", ""))
handled_source = str(snapshot.get("handled_source", "")) handled_source = str(snapshot.get("handled_source", ""))
event = snapshot_event(snapshot) event = snapshot_event(snapshot)
pre_warned_at = str(event.get("pre_warned_at", ""))
alerted_at = str(event.get("alerted_at", "")) alerted_at = str(event.get("alerted_at", ""))
ended_at = str(event.get("ended_at", "")) ended_at = str(event.get("ended_at", ""))
discarded = handled_source == "auto_closed" discarded = handled_source == "auto_closed"
@@ -208,7 +211,8 @@ def build_case_event_payload(
"dwell_seconds": event.get("dwell_seconds", ""), "dwell_seconds": event.get("dwell_seconds", ""),
"is_discarded": discarded, "is_discarded": discarded,
"discarded_at": handled_at if discarded else "", "discarded_at": handled_at if discarded else "",
"created_at": alerted_at or created_at, "created_at": pre_warned_at or alerted_at or created_at,
"pre_warned_at": pre_warned_at,
"alerted_at": alerted_at, "alerted_at": alerted_at,
"alarm_at": alerted_at, "alarm_at": alerted_at,
"updated_at": updated_at, "updated_at": updated_at,

View File

@@ -5,3 +5,15 @@
1. 对每个用户指定的 Webhook 路径,先在目标主机上用与真实请求接近的 `POST` 探测并记录状态码。 1. 对每个用户指定的 Webhook 路径,先在目标主机上用与真实请求接近的 `POST` 探测并记录状态码。
2. 如果存在多个相似路径,只能在验证过用户指定路径不可用后,才考虑回退到其它路径。 2. 如果存在多个相似路径,只能在验证过用户指定路径不可用后,才考虑回退到其它路径。
3. 切换远端配置前,先确认发送端容器对目标主机名或 IP 实际可达,避免写入不可解析的地址。 3. 切换远端配置前,先确认发送端容器对目标主机名或 IP 实际可达,避免写入不可解析的地址。
- 2026-06-15: 告警截图叠加中文区域名时,不能依赖默认西文字体或手写点阵字形;这会在现场截图里表现为乱码或不可读文字。
Prevention:
1. 涉及中文截图叠字时,镜像必须安装可验证的 CJK 字体包,并在容器内用 `fc-match :lang=zh-cn` 确认命中 CJK 字体。
2. 部署后必须在目标容器内跑一次中文标签叠图烟测,确认真实渲染路径可用,而不只检查像素变化。
3. 字体渲染不可用时,回退文本必须转换成可读 ASCII 标识,例如 `区域 1` -> `R1``垃圾区` -> `TRASH`,避免继续绘制乱码中文。
- 2026-06-15: 现场识别抖动排查时,不能先假设某个区域为空;用户指出区域 1、2、6 实际都有物后,原先单纯调高相对暗区阈值会压掉真实占用。
Prevention:
1. 调整视觉阈值前,必须向现场实际状态对齐,明确每个被分析区域当前应该是有物还是空。
2. 如果物品已存在于启动基线中,不能只依赖相对基线变化;需要绝对特征或重新采空基线来识别。
3. 对“正常取用”误报,应优先检查有物状态是否短暂掉空,并用判空确认帧数或滞后来处理抖动,而不是只提高占用阈值。

View File

@@ -1,5 +1,37 @@
# Task Todo # Task Todo
## Current Task: Runtime/API Case State Reopen Fix
**Goal:** When the management API marks a display-cabinet case as handled, the runtime process must not later append a newer `open` snapshot for the same case from stale in-memory state.
- [x] Add a failing regression test for API-written `handled` state being preserved when runtime persists later events.
- [x] Fix runtime case persistence to reconcile with the latest JSONL snapshots before applying new events.
- [x] Run targeted case/runtime tests.
- [x] Record remote chain verification and deployment status.
### Findings
- On `xiaozheng@10.8.0.23`, `case_batch_000911` was marked `handled` at `2026-06-15T07:27:12Z`, then runtime appended a newer `open` snapshot for the same case at `2026-06-15T15:38:03+08:00`.
- The API and runtime are separate processes sharing `logs/cases.jsonl`; runtime keeps a long-lived `CaseStore` loaded at startup and did not see the API-written handled snapshot.
### Verification
- RED:
- `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests.test_main.RuntimeRestoreTests.test_persist_case_updates_preserves_api_handled_snapshot -v`
- Result before fix: failed because runtime appended a later `open` snapshot.
- Local targeted verification:
- `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests.test_main.RuntimeRestoreTests.test_persist_case_updates_preserves_api_handled_snapshot -v`
- `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_cases.py -v`
- `eval "$(/opt/homebrew/bin/pyenv init -)" && PYTHONPATH=src python -m unittest tests/test_main.py -v`
- Result: all passed.
- Remote deployment:
- Synced only `src/cold_display_guard/main.py` to `xiaozheng@10.8.0.23:/home/xiaozheng/cold_display_guard/src/cold_display_guard/main.py`.
- Ran `docker compose --env-file deploy/cold-display-guard.env -f deploy/docker-compose.yml up -d --build cold-display-guard-runtime`.
- Compose recreated `cold-display-guard-api` and `cold-display-guard-runtime`; health check returned `status=ok`.
- Remote behavior check:
- Ran the same API-handled/runtime-later-event scenario inside `cold-display-guard-runtime` using a temp JSONL file.
- Result: `{"handled_source": "manual", "latest_status": "handled", "new_snapshots": 0}`.
- [x] Review the current project instructions and check for task-relevant lessons. - [x] Review the current project instructions and check for task-relevant lessons.
- [x] Inspect the OTA upload API document and current runtime/webhook capture path. - [x] Inspect the OTA upload API document and current runtime/webhook capture path.
- [x] Create an isolated worktree for alarm snapshot upload implementation. - [x] Create an isolated worktree for alarm snapshot upload implementation.
@@ -64,3 +96,326 @@
- `PYTHONPATH=src python3 -m unittest discover -s tests -v` - `PYTHONPATH=src python3 -m unittest discover -s tests -v`
- Deployed updated code to `xiaozheng@10.8.0.11` without overwriting the remote `config/example.toml`, rebuilt `cold-display-guard:dev`, and restarted only `cold-display-guard-api` plus `cold-display-guard-runtime`. - Deployed updated code to `xiaozheng@10.8.0.11` without overwriting the remote `config/example.toml`, rebuilt `cold-display-guard:dev`, and restarted only `cold-display-guard-api` plus `cold-display-guard-runtime`.
- Natural post-deploy traffic did not arrive during the 2-minute observation window, so final runtime verification used the deployed container to build representative batch/case webhook payloads with the live remote config and confirmed `camera_ip = 192.168.3.4` plus all new downstream fields were present. - Natural post-deploy traffic did not arrive during the 2-minute observation window, so final runtime verification used the deployed container to build representative batch/case webhook payloads with the live remote config and confirmed `camera_ip = 192.168.3.4` plus all new downstream fields were present.
## Current Task: Deploy To 192.168.5.103
- [x] Inspect the existing deployment layout and active containers on `xiaozheng@192.168.5.103`.
- [x] Verify the exact webhook route on that host before writing config.
- [x] Sync the current project code to the remote deployment directory without overwriting the live RTSP and calibration config.
- [x] Configure the remote webhook settings for the local `video-recognition` receiver.
- [x] Rebuild and restart the remote API/runtime containers, then verify health and outbound webhook configuration.
### Deployment Findings
- Existing deployment path on `192.168.5.103` is `/home/xiaozheng/cold_display_guard`, not `~/apps/cold-display-guard/app`.
- The host already runs `cold-display-guard-api`, `cold-display-guard-runtime`, and `cold-display-guard-web` on ports `19080` and `23000`.
- The same host also runs `video-recognition`, and a direct probe to `http://127.0.0.1:8080/api/webhook/cold-display-guard` returned `200 OK`, so this is the verified webhook target for this environment.
### Deployment Verification
- From inside the running `cold-display-guard-api` container on `192.168.5.103`:
- `http://host.docker.internal:8080/api/webhook/cold-display-guard` failed DNS resolution.
- `http://172.17.0.1:8080/api/webhook/cold-display-guard` returned `200 OK`.
- `http://192.168.5.103:8080/api/webhook/cold-display-guard` returned `200 OK`.
- The configured webhook target was set to `http://192.168.5.103:8080/api/webhook/cold-display-guard` for both `event_url` and `case_url`.
- Remote config was enriched to include:
- `case_sink`
- `alarm_snapshot_upload`
- `webhook_retry_sink`
- `webhook_delivery_sink`
- `webhooks`
- Code sync used `rsync` with `config/example.toml` excluded so the live RTSP URL and calibration polygons were preserved.
- Remote rebuild/restart completed for `cold-display-guard-api` and `cold-display-guard-runtime`.
- Verified after restart:
- `GET http://127.0.0.1:19080/api/manage/health` returned `status=ok`
- `GET http://127.0.0.1:19080/api/manage/config` showed `webhooks.enabled=true`
- `event_url` and `case_url` both active on `http://192.168.5.103:8080/api/webhook/cold-display-guard`
- `alarm_snapshot_upload.enabled=true`
## Current Task: Alarm Snapshot Calibration Overlay
**Goal:** Webhook-linked uploaded alarm snapshots should visually include the calibrated cold display zones and trash confirmation ROI from the current config.
**Design:** Keep the existing runtime flow intact: capture current RTSP frame, process events, then upload an alarm snapshot only for warning/alarm events. Before JPEG encoding, build overlay regions from `[[zones]]` plus `[trash].roi`, clamp normalized polygon coordinates to the image bounds, draw a semi-transparent fill and visible outline directly onto a copied `Frame.rgb`, and pass that annotated frame to the existing encoder/uploader. Do not change `BatchEngine`, Webhook payload shape, OTA upload protocol, or management snapshot capture.
- [x] Review task-relevant lessons and current dirty worktree.
- [x] Inspect `alarm_snapshots.py`, `main.py`, config polygon shape, and existing tests.
- [x] Write a failing unit test proving alert snapshot upload encodes an annotated frame when zones/trash ROI are configured.
- [x] Write focused unit tests for polygon overlay behavior using a tiny RGB frame.
- [x] Run targeted tests and confirm the new tests fail for the expected missing overlay behavior.
- [x] Implement the smallest standard-library overlay helper in `src/cold_display_guard/alarm_snapshots.py`.
- [x] Wire `capture_alert_snapshot` to apply configured overlays before JPEG encoding.
- [x] Run targeted snapshot/runtime tests.
- [x] Run the full Python test suite.
### Review
- Added `apply_calibration_overlay` in `src/cold_display_guard/alarm_snapshots.py` to draw configured food-zone polygons in yellow and the trash ROI in red onto a copied frame before JPEG encoding and OTA upload.
- The overlay clamps normalized coordinates to image bounds, draws semi-transparent fills plus outlines, and leaves the original `Frame.rgb` unchanged for downstream runtime processing.
- `capture_alert_snapshot` now encodes the annotated frame when warning/alarm events trigger snapshot upload; non-alert events and disabled upload behavior are unchanged.
- Targeted verification passed:
- `PYTHONPATH=src python3 -m unittest tests/test_alarm_snapshots.py -v`
- `PYTHONPATH=src python3 -m unittest tests/test_main.py -v`
- Full verification passed:
- `PYTHONPATH=src python3 -m unittest discover -s tests -v`
## Current Task: Deploy Overlay Update To 10.8.0.23
**Goal:** Deploy the alarm snapshot calibration overlay change to `xiaozheng@10.8.0.23` without overwriting live RTSP/calibration config or unrelated local changes.
**Plan:** Inspect the remote deployment layout first, confirm which containers are active, sync only the runtime source file required for the overlay change, rebuild/restart the API/runtime services that use the Python image, and verify both service health and the deployed source code.
- [x] Inspect remote deployment directory, Docker/Compose files, and active containers on `xiaozheng@10.8.0.23`.
- [x] Confirm the remote config file remains present and is not overwritten.
- [x] Sync `src/cold_display_guard/alarm_snapshots.py` to the remote deployment path.
- [x] Rebuild and restart only the affected `cold-display-guard-api` and `cold-display-guard-runtime` services when Compose is available.
- [x] Verify management API health after restart.
- [x] Verify the deployed remote source contains `apply_calibration_overlay`.
### Deployment Review
- Remote deployment path confirmed as `/home/xiaozheng/cold_display_guard`.
- Active services before deployment: `cold-display-guard-api`, `cold-display-guard-runtime`, and `cold-display-guard-web`.
- Remote live `config/example.toml` was checked before and after deployment and was not overwritten.
- Synced only `src/cold_display_guard/alarm_snapshots.py` to avoid deploying unrelated local `web/nginx.conf` changes.
- Created a timestamped backup of the previous remote `alarm_snapshots.py` beside the source file before syncing.
- Rebuilt `cold-display-guard:dev` with `docker compose --env-file deploy/cold-display-guard.env -f deploy/docker-compose.yml build cold-display-guard-api`.
- Restarted only `cold-display-guard-api` and `cold-display-guard-runtime` with Compose; `cold-display-guard-web` remained untouched.
- Verification passed:
- `curl http://127.0.0.1:19080/api/manage/health` returned `status=ok` and `runtime_status=running`.
- `docker exec cold-display-guard-api python3 -c ...` confirmed `apply_calibration_overlay` exists in the running image with signature `(frame, config) -> Frame`.
- API and runtime logs show normal startup after restart.
## Current Task: Update Timing Parameters On 10.8.0.23
**Goal:** Adjust the live timing settings on `xiaozheng@10.8.0.23` per operator request.
**Applied mapping:** The current application has no separate pre-warning threshold. It supports `max_dwell_seconds` for the time alarm/overdue threshold and `trash_confirmation_seconds` for the disposal confirmation window before warning escalation. Applied `max_dwell_seconds = 120` and `trash_confirmation_seconds = 30`.
- [x] Back up `/home/xiaozheng/cold_display_guard/config/example.toml`.
- [x] Update `[thresholds].max_dwell_seconds` from `300` to `120`.
- [x] Update `[thresholds].trash_confirmation_seconds` from `120` to `30`.
- [x] Restart `cold-display-guard-api` and `cold-display-guard-runtime`.
- [x] Verify `/api/manage/health`.
- [x] Verify `/api/manage/config` returns `{"max_dwell_seconds": 120, "trash_confirmation_seconds": 30}`.
### Timing Update Review
- Remote config was edited in place after creating a timestamped backup.
- `cold-display-guard-api` and `cold-display-guard-runtime` were explicitly restarted with Docker Compose.
- `cold-display-guard-web` was not restarted.
- Verification passed:
- `GET http://127.0.0.1:19080/api/manage/health` returned `status=ok` and `runtime_status=running`.
- `GET http://127.0.0.1:19080/api/manage/config` returned `max_dwell_seconds = 120` and `trash_confirmation_seconds = 30`.
- Container status showed `cold-display-guard-api` healthy and `cold-display-guard-runtime` running after restart.
- Note: requested `预警时长 = 1min` is not independently configurable in the current codebase; supporting distinct pre-warning at 60 seconds and overdue at 120 seconds would require a code change.
## Current Task: Pre-Warning Alarm Flow And Full Webhook/MQTT Chain
**Goal:** Implement the requested camera-side timing flow, deploy it to `xiaozheng@10.8.0.23`, and verify the Webhook -> `video_recognition_local` -> MQTT -> `store_data_platform` chain.
**Design:** Keep all timing decisions inside `cold_display_guard.BatchEngine`. Add separate thresholds for pre-warning, alarm, and alarm-removal timeout; emit explicit lifecycle events so downstream services do not infer camera-side timers. Keep `video_recognition_local` as a transparent Webhook/MQTT bridge, and update `store_data_platform` only where event names map to notifications, case types, and CRM penalty submission.
- [x] Review task-relevant instructions, lessons, and dirty worktree.
- [x] Inspect the current cold-display engine, case store, webhook payload, and tests.
- [x] Inspect `video_recognition_local` cold-display Webhook receiver and MQTT publisher.
- [x] Inspect `store_data_platform` cold-display MQTT consumer, notification mapping, and CRM submission trigger.
- [x] Inspect `xiaozheng@10.8.0.23` active containers and deployment paths.
- [x] Add failing cold-display engine/case/config/webhook tests for `time_pre_warning`, `pre_warning_handled`, `time_alarm`, and `alarm_removal_timeout`.
- [x] Implement the camera-side state machine and config fields.
- [x] Add/adjust `video_recognition_local` passthrough tests for the new event names.
- [x] Add/adjust `store_data_platform` tests and mappings for new event semantics.
- [x] Run local targeted and full relevant verification.
- [x] Deploy changed services to `xiaozheng@10.8.0.23` without overwriting live RTSP/calibration secrets.
- [x] Update the remote timing config to `pre_warning_seconds=60`, `max_dwell_seconds=120`, `alarm_removal_seconds=30`, `trash_confirmation_seconds=30`.
- [x] Verify remote Webhook target reachability from the cold-display container to local `video-recognition`.
- [x] Observe cold-display, video-recognition, MQTT, and platform logs; record the result.
### Current Findings
- `cold_display_guard` currently has only `max_dwell_seconds` and `trash_confirmation_seconds`; it cannot independently represent 1-minute pre-warning, 2-minute alarm, and 30-second alarm-removal timeout.
- `video_recognition_local` receives `/api/webhook/cold-display-guard` payloads as generic JSON and forwards them to MQTT; new event names should remain transparent, but tests should lock this behavior.
- `store_data_platform` currently treats `time_alarm` and `batch_pending_disposal` as warning notifications, and only `warning_escalated` triggers CRM penalty submission. This must change so `time_pre_warning` is the warning, `time_alarm` is the alert reminder, and `alarm_removal_timeout` triggers CRM submission.
- On `10.8.0.23`, active containers include `cold-display-guard-*`, `video-recognition`, and `mosquitto`; `video-recognition` runs with host networking, while `cold-display-guard-api` runs on its Compose network.
### Local Verification
- Cold-display full Python suite passed: `PYTHONPATH=src python3 -m unittest discover -s tests -v` (`98` tests).
- `video_recognition_local` cold-display focused tests passed: `go test ./internal/server ./internal/mqtt ./cmd -run 'TestColdDisplayGuard|Test.*ColdDisplayGuard' -count=1`.
- `store_data_platform` display-cabinet service focused tests passed: `go test ./store_data/service -run 'Test.*StoreDisplayCabinet|TestResolveStoreDisplayCabinet.*|TestShouldSubmitStoreDisplayCabinetPenalty|TestBuildStoreDisplayCabinet.*' -count=1`.
### Deployment Review
- Synced only these cold-display source files to `xiaozheng@10.8.0.23:/home/xiaozheng/cold_display_guard/src/cold_display_guard/`: `models.py`, `config.py`, `engine.py`, `cases.py`, `webhooks.py`.
- Backed up the remote source files and live `config/example.toml` before deployment.
- Updated the live remote thresholds to `pre_warning_seconds=60`, `max_dwell_seconds=120`, `alarm_removal_seconds=30`, and `trash_confirmation_seconds=30`.
- Updated the live remote Webhook target from the unreachable old host to `http://10.8.0.23:8080/api/webhook/cold-display-guard`.
- Rebuilt `cold-display-guard:dev` and restarted only `cold-display-guard-api` and `cold-display-guard-runtime`.
- Remote verification passed:
- `GET /api/manage/health` returned `status=ok` and `runtime_status=running`.
- `GET /api/manage/config` returned the four expected threshold values and the new Webhook target.
- Container-side synthetic engine run emitted `batch_started`, `time_pre_warning`, `time_alarm`, `alarm_removal_timeout`, then `batch_pending_disposal` plus `batch_discarded`.
- Natural runtime log emitted `alarm_removal_timeout` for `batch_000881` at `2026-06-15T11:52:20+08:00`.
- Webhook delivery for that event returned HTTP `200` from `video-recognition`.
- `video_recognition_local` result JSONL recorded both `alarm_removal_timeout` batch and case events.
- MQTT probe confirmed `video-recognition` published to `video/cold-display-guard/result/cold-display-guard` with `device_identifier=cold-display-guard`.
- `store_data_platform` is not deployed on `10.8.0.23` under that repository name or as an identifiable container; platform handling changes were completed and verified in the local repository.
- The cold-display retry queue has no pending entries; old `192.168.5.103` failures are already dead-letter history.
## Current Task: Alarm Snapshot Labels And Zone Colors
**Goal:** Uploaded alarm screenshots should show each calibrated region name directly on the image, and different cold-display zones should use different overlay colors.
**Design:** Extend the existing standard-library overlay path. Keep drawing configured polygons before JPEG upload, but carry a display label for each region, choose a stable color from a fixed palette by zone order, and draw a small high-contrast text label inside the polygon. Keep trash ROI red and labeled separately.
- [x] Inspect the current calibration overlay helper and tests.
- [x] Add failing tests for per-zone colors and visible region labels.
- [x] Implement labels and stable zone color palette.
- [x] Run snapshot tests and full Python tests.
- [x] Deploy the overlay update to `xiaozheng@10.8.0.23`.
- [x] Verify remote API/runtime health and deployed overlay helper.
### Review
- `apply_calibration_overlay` now assigns each cold-display zone a stable color from a fixed palette and keeps the trash ROI red.
- Each overlay region now carries a label and draws a small high-contrast label box directly on the frame before JPEG encoding/upload.
- The built-in label renderer covers common现场 labels such as `区域 1` through digits and `垃圾区`, plus basic ASCII for custom numeric/English labels.
- Verification passed:
- `PYTHONPATH=src python3 -m unittest tests/test_alarm_snapshots.py -v`
- `PYTHONPATH=src python3 -m unittest discover -s tests -v` (`99` tests)
- Deployed `src/cold_display_guard/alarm_snapshots.py` to `xiaozheng@10.8.0.23` after backing up the previous remote file.
- Rebuilt `cold-display-guard:dev` and restarted `cold-display-guard-api` plus `cold-display-guard-runtime`.
- Remote verification passed:
- `GET /api/manage/health` returned `status=ok` and `runtime_status=running`.
- Container-side overlay smoke test confirmed two zones render different RGB values and label text pixels are present.
## Current Task: Alarm Snapshot Chinese Label Rendering Fix
**Goal:** Fix unreadable/garbled Chinese region names on uploaded alarm screenshots while keeping per-zone colors and fallback labeling robust.
**Design:** Use a real CJK font renderer for Chinese labels in the alarm snapshot overlay path. Install Noto CJK fonts in the runtime image, render labels through ffmpeg `drawtext` when the font is available, and fall back to readable ASCII labels if the font renderer is unavailable.
- [x] Reproduce and identify the likely root cause: remote container only matched DejaVu for `zh-cn`, so Chinese labels had no real CJK font path.
- [x] Add regression tests for Docker CJK font installation and readable ASCII fallback labels.
- [x] Update `Dockerfile` to install `fonts-noto-cjk`.
- [x] Update `alarm_snapshots.py` to prefer CJK font rendering and use `R1`/`TRASH` fallback text when needed.
- [x] Run focused and full local Python verification.
- [x] Deploy `Dockerfile` and `alarm_snapshots.py` to `xiaozheng@10.8.0.23` without overwriting live config.
- [x] Rebuild/restart `cold-display-guard-api` and `cold-display-guard-runtime`.
- [x] Verify remote API/runtime health, CJK font availability, overlay smoke behavior, and runtime logs.
### Review
- Root cause was the screenshot overlay path not having a real Chinese font renderer in the deployed image; the container matched DejaVu before this fix.
- The rebuilt remote container now reports `NotoSansCJK-Regular.ttc: "Noto Sans CJK SC" "Regular"` for `fc-match :lang=zh-cn`.
- Remote overlay smoke test confirmed `find_cjk_font_file()` returns `/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc`, Chinese labels change the frame, bright label pixels are present, and different regions retain distinct colors.
- Local verification passed:
- `PYTHONPATH=src python3 -m unittest tests/test_alarm_snapshots.py -v`
- `PYTHONPATH=src python3 -m unittest discover -s tests -v` (`101` tests)
- Remote verification passed:
- `GET /api/manage/health` returned `status=ok`, `runtime_status=running`, and version `dev`.
- `cold-display-guard-api` is healthy and `cold-display-guard-runtime` is running after restart.
- Runtime logs show normal startup after the restart.
## Current Task: Investigate False Normal Consumption Events On 10.8.0.23
**Goal:** Determine why the live system records a normal consumption event about every two minutes with a dwell time near 13 seconds even when no one touched the cold display cabinet.
**Debug plan:** Inspect remote runtime/event/case/diagnostic logs first, correlate `batch_started` and `batch_consumed` pairs by zone and dwell time, then trace the vision metrics for those timestamps to identify whether the source is occupancy flicker, runtime restart state restoration, config thresholds, or downstream display interpretation.
- [ ] Inspect recent remote events and confirm the exact event names, zones, dwell seconds, and cadence.
- [ ] Inspect runtime diagnostics around those timestamps for occupancy and vision metric flicker.
- [ ] Inspect live config and runtime logs for sampling/stabilization settings and restarts.
- [x] Form and test a root-cause hypothesis before changing code or live thresholds.
- [x] Record findings, fix if needed, and verify with logs/tests.
### Findings And Fix
- The repeated records were real `batch_started` -> `batch_consumed` events from the camera-side engine, not a downstream display issue.
- Before the fix, recent events showed repeated zone 1 batches ending after 13-33 seconds, matching the two-frame confirmation cadence at the current sampling rate.
- Root cause had two parts:
- Zone 1 was genuinely occupied, but its vision signal hovered around the old relative dark threshold, so short raw-occupancy dips were interpreted as item removal.
- Zone 2 was occupied before or during baseline learning, so its relative difference from baseline stayed near zero and it was not detected as occupied.
- Added `occupancy_absolute_dark_fraction` in `src/cold_display_guard/vision.py`, defaulting to `0.0` so existing configs are unchanged unless they opt in.
- Updated the live config on `xiaozheng@10.8.0.23`:
- `occupancy_dark_fraction = 0.12`
- `occupancy_absolute_dark_fraction = 0.085`
- `empty_confirm_frames = 6`
- Rebuilt and restarted `cold-display-guard-api` and `cold-display-guard-runtime`.
- Verification:
- Local full Python suite passed: `PYTHONPATH=src python3 -m unittest discover -s tests -v` (`102` tests).
- Remote health returned `status=ok` and `runtime_status=running`.
- Remote container config shows the new thresholds.
- After deployment, latest diagnostics stabilized at `zone_counts = {"1": 1, "2": 1, "6": 1}`.
- During a two-minute observation window after `13:25`, no new `batch_consumed` events were emitted; only expected pre-warning/alarm lifecycle events appeared for the occupied zones.
## Current Task: Reduce Alarm Snapshot Label Visual Obstruction
**Goal:** Region labels on uploaded alarm screenshots should be smaller and more transparent so operators can inspect the food/display image underneath.
**Design:** Keep the existing label content, placement, CJK font rendering, and per-zone colors. Only reduce the visual weight of the label layer by lowering font size, black label-box opacity, border width, and fallback label-box opacity.
- [x] Inspect current alarm snapshot label rendering style.
- [x] Add a regression test for smaller ffmpeg drawtext label style.
- [x] Reduce drawtext font size and label-box opacity.
- [x] Keep fallback label renderer visually consistent with the ffmpeg path.
- [x] Run full local verification.
- [x] Deploy the updated snapshot overlay style to `xiaozheng@10.8.0.23`.
- [x] Verify remote runtime health and deployed label style.
### Notes
- Targeted snapshot test passed: `PYTHONPATH=src python3 -m unittest tests/test_alarm_snapshots.py -v`.
- Full local verification passed: `PYTHONPATH=src python3 -m unittest discover -s tests -v` (`103` tests).
- Remote verification passed:
- `GET /api/manage/health` returned `status=ok` and `runtime_status=running`.
- Running container uses `fontsize=13`, `boxcolor=black@0.34`, and `boxborderw=2` for region labels.
- `cold-display-guard-runtime` logs show normal startup after restart.
## Current Task: Limit Alert Snapshot Overlay To Event Zones
**Goal:** Uploaded warning/alarm screenshots should only draw the cold-display region polygons and names for the zones that actually triggered the warning/alarm event. Other configured zones and the trash ROI should not be drawn on those uploaded screenshots.
**Plan:** Keep the full calibration overlay helper available for tests and general use, but pass alert event zone IDs from `capture_alert_snapshot` into the overlay loader and disable trash ROI drawing for alert uploads.
- [x] Add a regression test proving alert snapshot upload only annotates the triggering event zone.
- [x] Filter snapshot overlay regions by event `zone_id` during alert upload.
- [x] Preserve full overlay behavior when `apply_calibration_overlay` is called without filters.
- [x] Run full local Python verification.
- [x] Deploy `alarm_snapshots.py` to `xiaozheng@10.8.0.23`.
- [x] Verify remote API/runtime health and deployed filtered-overlay behavior.
### Review
- Local verification passed:
- `PYTHONPATH=src python3 -m unittest tests/test_alarm_snapshots.py -v`
- `PYTHONPATH=src python3 -m unittest discover -s tests -v` (`104` tests)
- Deployed only `src/cold_display_guard/alarm_snapshots.py` to `xiaozheng@10.8.0.23` after backing up the previous remote file; live config was not overwritten.
- Rebuilt `cold-display-guard:dev` and restarted `cold-display-guard-api` plus `cold-display-guard-runtime`.
- Remote verification passed:
- `GET /api/manage/health` returned `status=ok` and `runtime_status=running`.
- Container-side smoke test for a zone-1 alert returned `zone1_changed=True`, `zone2_unchanged=True`, and `trash_unchanged=True`.
- API/runtime logs show normal startup after restart.
## Current Task: Check Webhook Duplicate Delivery
**Goal:** Verify whether `cold_display_guard` is sending duplicate Webhook requests to `video-recognition` on `xiaozheng@10.8.0.23`.
**Investigation:** Compare the sending code path, remote webhook delivery audit, retry queue state, cold-display event/case logs, `video-recognition` HTTP logs, and the receiver-side JSONL payloads.
- [x] Inspect sender code path for direct event/case delivery and retry drain behavior.
- [x] Confirm remote Webhook config uses the same URL for `event_url` and `case_url`.
- [x] Check sender delivery audit for duplicate receiver `task_id` values.
- [x] Check retry queue for pending successful redelivery risk.
- [x] Check receiver-side cold-display JSONL for duplicate payloads and duplicate business keys.
- [x] Trace the only coarse duplicate-looking case around `batch_000898`.
### Review
- Current remote config sends both `batch_event` and `case_event` to `http://10.8.0.23:8080/api/webhook/cold-display-guard`, so one business transition can produce two HTTP POSTs to the same endpoint with different `kind` values.
- Sender audit `logs/webhook_delivery.jsonl` contains `3056` records total; recent valid delivery has `321` direct `ok` records and `0` retry `ok` records.
- Receiver-returned `task_id` values are unique: `321` unique task IDs and `0` duplicate task IDs.
- Retry queue has `547` latest retry items, all `dead_letter`; there are no pending retries.
- Receiver-side `video-recognition` cold-display files for `2026-06-15` contain `181` business payloads; exact payload duplicates are `0`, and fine-grained business key duplicates are `0`.
- Sender `events.jsonl` contains `3325` events; duplicate `(batch_id, event, ts, zone_id)` keys are `0`.
- The only coarse duplicate-looking receiver entry was `batch_000898` at `13:20:26`: the same frame emitted `time_pre_warning` and `pre_warning_handled`, which produced separate `case_event` actions `created` and `handled`. This is not the same Webhook request repeated.

View File

@@ -2,9 +2,12 @@ from __future__ import annotations
import unittest import unittest
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path
from cold_display_guard import alarm_snapshots
from cold_display_guard.alarm_snapshots import ( from cold_display_guard.alarm_snapshots import (
capture_alert_snapshot, capture_alert_snapshot,
fallback_label_text,
load_alarm_snapshot_settings, load_alarm_snapshot_settings,
upload_snapshot_bytes, upload_snapshot_bytes,
) )
@@ -127,6 +130,186 @@ class AlarmSnapshotTests(unittest.TestCase):
self.assertEqual(result["object_key"], "uploads/alarms/test.jpg") self.assertEqual(result["object_key"], "uploads/alarms/test.jpg")
self.assertEqual(result["batch_ids"], ["batch_1"]) self.assertEqual(result["batch_ids"], ["batch_1"])
def test_calibration_overlay_draws_zones_and_trash_roi_without_mutating_source(self) -> None:
apply_overlay = getattr(alarm_snapshots, "apply_calibration_overlay", None)
self.assertTrue(callable(apply_overlay), "apply_calibration_overlay should be available")
frame = Frame(width=5, height=5, rgb=b"\x00\x00\x00" * 25)
annotated = apply_overlay(
frame,
{
"zones": [
{
"id": "1",
"label": "区域 1",
"polygon": [[0.2, 0.2], [0.8, 0.2], [0.8, 0.8], [0.2, 0.8]],
}
],
"trash": {"roi": [[0.0, 0.0], [0.4, 0.0], [0.0, 0.4]]},
},
)
self.assertEqual(frame.rgb, b"\x00\x00\x00" * 25)
self.assertNotEqual(annotated.rgb, frame.rgb)
self.assertNotEqual(annotated.pixel(1, 1), (0, 0, 0))
self.assertNotEqual(annotated.pixel(2, 2), (0, 0, 0))
self.assertNotEqual(annotated.pixel(0, 0), (0, 0, 0))
def test_calibration_overlay_uses_distinct_zone_colors_and_draws_labels(self) -> None:
frame = Frame(width=40, height=20, rgb=b"\x00\x00\x00" * 800)
annotated = alarm_snapshots.apply_calibration_overlay(
frame,
{
"zones": [
{
"id": "1",
"label": "区域 1",
"polygon": [[0.05, 0.10], [0.40, 0.10], [0.40, 0.90], [0.05, 0.90]],
},
{
"id": "2",
"label": "区域 2",
"polygon": [[0.55, 0.10], [0.90, 0.10], [0.90, 0.90], [0.55, 0.90]],
},
]
},
)
self.assertNotEqual(annotated.pixel(10, 15), annotated.pixel(30, 15))
label_pixels = [annotated.pixel(x, y) for y in range(2, 10) for x in range(2, 18)]
self.assertTrue(any(max(pixel) >= 220 for pixel in label_pixels), "expected bright label text pixels")
def test_chinese_label_fallback_uses_readable_ascii_when_font_renderer_is_unavailable(self) -> None:
self.assertEqual(fallback_label_text("区域 1"), "R1")
self.assertEqual(fallback_label_text("区域 12"), "R12")
self.assertEqual(fallback_label_text("垃圾区"), "TRASH")
def test_docker_image_installs_cjk_fonts_for_alarm_snapshot_labels(self) -> None:
dockerfile = (Path(__file__).resolve().parents[1] / "Dockerfile").read_text(encoding="utf-8")
self.assertIn("fonts-noto-cjk", dockerfile)
def test_drawtext_label_style_stays_small_and_translucent(self) -> None:
filter_text = alarm_snapshots.build_drawtext_filter(
[
alarm_snapshots.OverlayLabel(
text="区域 1",
fallback_text="R1",
x=10,
y=20,
accent_rgb=(255, 196, 0),
)
],
Path("/tmp/NotoSansCJK-Regular.ttc"),
height=360,
)
self.assertIn("fontsize=13", filter_text)
self.assertIn("boxcolor=black@0.34", filter_text)
self.assertIn("boxborderw=2", filter_text)
def test_capture_alert_snapshot_encodes_frame_with_configured_calibration_overlay(self) -> None:
encoded_frames: list[Frame] = []
def fake_encode(frame: Frame, timeout_seconds: float) -> bytes:
encoded_frames.append(frame)
return b"jpeg-bytes"
def fake_upload(
image_bytes: bytes,
*,
file_name: str,
object_key_hint: str,
settings,
post_json_request=None,
post_multipart_request=None,
) -> dict[str, object]:
return {"status": "uploaded", "object_key": "uploads/alarms/overlay.jpg", "file_name": file_name}
source_frame = Frame(width=5, height=5, rgb=b"\x00\x00\x00" * 25)
result = capture_alert_snapshot(
source_frame,
[{"event": "time_alarm", "severity": "alarm", "batch_id": "batch_1", "camera_id": "cam_1", "zone_id": "1", "ts": "2026-06-09T09:00:00+00:00"}],
{
"alarm_snapshot_upload": {"enabled": True},
"zones": [
{
"id": "1",
"label": "区域 1",
"polygon": [[0.2, 0.2], [0.8, 0.2], [0.8, 0.8], [0.2, 0.8]],
}
],
"trash": {"roi": [[0.0, 0.0], [0.4, 0.0], [0.0, 0.4]]},
},
now=datetime(2026, 6, 9, 9, 0, tzinfo=UTC),
jpeg_encoder=fake_encode,
uploader=fake_upload,
)
self.assertEqual(result["status"], "uploaded")
self.assertEqual(source_frame.rgb, b"\x00\x00\x00" * 25)
self.assertEqual(len(encoded_frames), 1)
self.assertNotEqual(encoded_frames[0].rgb, source_frame.rgb)
self.assertNotEqual(encoded_frames[0].pixel(1, 1), (0, 0, 0))
def test_capture_alert_snapshot_only_draws_alert_event_zones(self) -> None:
encoded_frames: list[Frame] = []
def fake_encode(frame: Frame, timeout_seconds: float) -> bytes:
encoded_frames.append(frame)
return b"jpeg-bytes"
def fake_upload(
image_bytes: bytes,
*,
file_name: str,
object_key_hint: str,
settings,
post_json_request=None,
post_multipart_request=None,
) -> dict[str, object]:
return {"status": "uploaded", "object_key": "uploads/alarms/zone-only.jpg", "file_name": file_name}
source_frame = Frame(width=30, height=20, rgb=b"\x00\x00\x00" * 600)
result = capture_alert_snapshot(
source_frame,
[
{
"event": "time_alarm",
"severity": "alarm",
"batch_id": "batch_1",
"camera_id": "cam_1",
"zone_id": "1",
"ts": "2026-06-09T09:00:00+00:00",
}
],
{
"alarm_snapshot_upload": {"enabled": True},
"zones": [
{
"id": "1",
"label": "区域 1",
"polygon": [[0.00, 0.00], [0.45, 0.00], [0.45, 1.00], [0.00, 1.00]],
},
{
"id": "2",
"label": "区域 2",
"polygon": [[0.55, 0.00], [1.00, 0.00], [1.00, 1.00], [0.55, 1.00]],
},
],
"trash": {"roi": [[0.45, 0.50], [0.55, 0.50], [0.55, 1.00], [0.45, 1.00]]},
},
now=datetime(2026, 6, 9, 9, 0, tzinfo=UTC),
jpeg_encoder=fake_encode,
uploader=fake_upload,
)
self.assertEqual(result["status"], "uploaded")
self.assertEqual(len(encoded_frames), 1)
self.assertNotEqual(encoded_frames[0].pixel(5, 10), (0, 0, 0))
self.assertEqual(encoded_frames[0].pixel(25, 10), (0, 0, 0))
self.assertEqual(encoded_frames[0].pixel(15, 15), (0, 0, 0))
if __name__ == "__main__": if __name__ == "__main__":
unittest.main() unittest.main()

View File

@@ -48,6 +48,55 @@ class CaseStoreTests(unittest.TestCase):
self.assertEqual(snapshots[0]["case_status"], "open") self.assertEqual(snapshots[0]["case_status"], "open")
self.assertEqual(snapshots[0]["source_event"], "time_alarm") self.assertEqual(snapshots[0]["source_event"], "time_alarm")
def test_time_pre_warning_creates_open_pre_warning_case(self) -> None:
store = CaseStore()
snapshots = store.apply_batch_events(
[event("time_pre_warning", self.t0, severity="warning", state="pre_warning")]
)
self.assertEqual(len(snapshots), 1)
self.assertEqual(snapshots[0]["case_type"], "pre_warning")
self.assertEqual(snapshots[0]["case_status"], "open")
self.assertEqual(snapshots[0]["source_event"], "time_pre_warning")
def test_pre_warning_handled_auto_closes_open_case(self) -> None:
store = CaseStore()
store.apply_batch_events([event("time_pre_warning", self.t0, severity="warning", state="pre_warning")])
snapshots = store.apply_batch_events(
[event("pre_warning_handled", self.t0.replace(minute=1), severity="info", state="handled")]
)
self.assertEqual(len(snapshots), 1)
self.assertEqual(snapshots[0]["case_status"], "handled")
self.assertEqual(snapshots[0]["handled_source"], "auto_removed_before_alarm")
def test_time_alarm_upgrades_pre_warning_case(self) -> None:
store = CaseStore()
store.apply_batch_events([event("time_pre_warning", self.t0, severity="warning", state="pre_warning")])
snapshots = store.apply_batch_events([event("time_alarm", self.t0.replace(minute=2), severity="alarm", state="alerted")])
self.assertEqual(len(snapshots), 1)
self.assertEqual(snapshots[0]["case_type"], "time_alarm")
self.assertEqual(snapshots[0]["case_status"], "open")
self.assertEqual(snapshots[0]["source_event"], "time_alarm")
def test_alarm_removal_timeout_upgrades_same_case(self) -> None:
store = CaseStore()
store.apply_batch_events([event("time_pre_warning", self.t0, severity="warning", state="pre_warning")])
store.apply_batch_events([event("time_alarm", self.t0.replace(minute=2), severity="alarm", state="alerted")])
snapshots = store.apply_batch_events(
[event("alarm_removal_timeout", self.t0.replace(minute=3), severity="alarm", state="alarm_removal_timeout")]
)
self.assertEqual(len(snapshots), 1)
self.assertEqual(snapshots[0]["case_type"], "alarm_removal_timeout")
self.assertEqual(snapshots[0]["case_status"], "open")
self.assertEqual(snapshots[0]["source_event"], "alarm_removal_timeout")
def test_pending_disposal_upgrades_existing_case(self) -> None: def test_pending_disposal_upgrades_existing_case(self) -> None:
store = CaseStore() store = CaseStore()
store.apply_batch_events([event("time_alarm", self.t0, severity="alarm", state="alerted")]) store.apply_batch_events([event("time_alarm", self.t0, severity="alarm", state="alerted")])

View File

@@ -16,7 +16,9 @@ class ConfigTests(unittest.TestCase):
camera_id = "cam_a" camera_id = "cam_a"
[thresholds] [thresholds]
pre_warning_seconds = 20
max_dwell_seconds = 30 max_dwell_seconds = 30
alarm_removal_seconds = 2
trash_confirmation_seconds = 4 trash_confirmation_seconds = 4
[layout] [layout]
@@ -29,7 +31,9 @@ cols = 2
settings = load_settings(path) settings = load_settings(path)
self.assertEqual(settings.camera_id, "cam_a") self.assertEqual(settings.camera_id, "cam_a")
self.assertEqual(settings.pre_warning_seconds, 20)
self.assertEqual(settings.max_dwell_seconds, 30) self.assertEqual(settings.max_dwell_seconds, 30)
self.assertEqual(settings.alarm_removal_seconds, 2)
self.assertEqual(settings.trash_confirmation_seconds, 4) self.assertEqual(settings.trash_confirmation_seconds, 4)
self.assertEqual(settings.zone_ids, ("r1c1", "r1c2")) self.assertEqual(settings.zone_ids, ("r1c1", "r1c2"))
@@ -118,6 +122,8 @@ zone_ids = ["1", "2", "3"]
text = path.read_text(encoding="utf-8") text = path.read_text(encoding="utf-8")
self.assertIn("zone_count = 2", text) self.assertIn("zone_count = 2", text)
self.assertIn("pre_warning_seconds = 0", text)
self.assertIn("alarm_removal_seconds = 0", text)
self.assertIn('label = "区域 1"', text) self.assertIn('label = "区域 1"', text)
self.assertIn("[trash]", text) self.assertIn("[trash]", text)
self.assertNotIn('"trash"', text.split("[layout]", maxsplit=1)[1].split("[[zones]]", maxsplit=1)[0]) self.assertNotIn('"trash"', text.split("[layout]", maxsplit=1)[1].split("[[zones]]", maxsplit=1)[0])

View File

@@ -140,6 +140,96 @@ class BatchEngineTests(unittest.TestCase):
self.assertEqual(alarm_events[0]["current_count"], 1) self.assertEqual(alarm_events[0]["current_count"], 1)
self.assertIn("alerted_at", alarm_events[0]) self.assertIn("alerted_at", alarm_events[0])
def test_pre_warning_emits_once_before_alarm_threshold(self) -> None:
settings = EngineSettings(
camera_id="test_cam",
pre_warning_seconds=60,
max_dwell_seconds=120,
alarm_removal_seconds=30,
trash_confirmation_seconds=30,
zone_ids=("1",),
)
engine = BatchEngine(settings)
engine.process(obs(self.t0, {"1": 1}))
pre_warning_events = engine.process(obs(self.t0 + timedelta(seconds=60), {"1": 1}))
repeated_events = engine.process(obs(self.t0 + timedelta(seconds=90), {"1": 1}))
self.assertEqual([event["event"] for event in pre_warning_events], ["time_pre_warning"])
self.assertEqual(pre_warning_events[0]["severity"], "warning")
self.assertEqual(pre_warning_events[0]["state"], "pre_warning")
self.assertEqual(pre_warning_events[0]["pre_warning_seconds"], 60)
self.assertEqual(pre_warning_events[0]["pre_warned_at"], (self.t0 + timedelta(seconds=60)).isoformat())
self.assertEqual(pre_warning_events[0]["current_count"], 1)
self.assertEqual(repeated_events, [])
def test_pre_warning_removed_before_alarm_is_auto_handled(self) -> None:
settings = EngineSettings(
camera_id="test_cam",
pre_warning_seconds=60,
max_dwell_seconds=120,
alarm_removal_seconds=30,
trash_confirmation_seconds=30,
zone_ids=("1",),
)
engine = BatchEngine(settings)
engine.process(obs(self.t0, {"1": 1}))
engine.process(obs(self.t0 + timedelta(seconds=60), {"1": 1}))
events = engine.process(obs(self.t0 + timedelta(seconds=90), {"1": 0}))
self.assertEqual([event["event"] for event in events], ["pre_warning_handled"])
self.assertEqual(events[0]["severity"], "info")
self.assertEqual(events[0]["state"], "handled")
self.assertEqual(events[0]["handled_source"], "auto_removed_before_alarm")
self.assertEqual(events[0]["ended_at"], (self.t0 + timedelta(seconds=90)).isoformat())
def test_alarm_removal_timeout_emits_once_before_late_removal(self) -> None:
settings = EngineSettings(
camera_id="test_cam",
pre_warning_seconds=60,
max_dwell_seconds=120,
alarm_removal_seconds=30,
trash_confirmation_seconds=30,
zone_ids=("1",),
)
engine = BatchEngine(settings)
engine.process(obs(self.t0, {"1": 1}))
engine.process(obs(self.t0 + timedelta(seconds=60), {"1": 1}))
alarm_events = engine.process(obs(self.t0 + timedelta(seconds=120), {"1": 1}))
timeout_events = engine.process(obs(self.t0 + timedelta(seconds=151), {"1": 1}))
repeated_events = engine.process(obs(self.t0 + timedelta(seconds=160), {"1": 1}))
removal_events = engine.process(obs(self.t0 + timedelta(seconds=170), {"1": 0}, trash=True))
self.assertEqual([event["event"] for event in alarm_events], ["time_alarm"])
self.assertEqual(alarm_events[0]["alarm_removal_deadline"], (self.t0 + timedelta(seconds=150)).isoformat())
self.assertEqual([event["event"] for event in timeout_events], ["alarm_removal_timeout"])
self.assertEqual(timeout_events[0]["severity"], "alarm")
self.assertEqual(timeout_events[0]["state"], "alarm_removal_timeout")
self.assertEqual(timeout_events[0]["reason"], "alarmed_batch_not_removed_after_alarm_window")
self.assertEqual(repeated_events, [])
self.assertEqual([event["event"] for event in removal_events], ["batch_pending_disposal", "batch_discarded"])
def test_alarmed_batch_removed_within_alarm_window_does_not_emit_removal_timeout(self) -> None:
settings = EngineSettings(
camera_id="test_cam",
pre_warning_seconds=60,
max_dwell_seconds=120,
alarm_removal_seconds=30,
trash_confirmation_seconds=30,
zone_ids=("1",),
)
engine = BatchEngine(settings)
engine.process(obs(self.t0, {"1": 1}))
engine.process(obs(self.t0 + timedelta(seconds=60), {"1": 1}))
engine.process(obs(self.t0 + timedelta(seconds=120), {"1": 1}))
events = engine.process(obs(self.t0 + timedelta(seconds=150), {"1": 0}, trash=True))
self.assertEqual([event["event"] for event in events], ["batch_pending_disposal", "batch_discarded"])
self.assertTrue(all(event["event"] != "alarm_removal_timeout" for event in events))
def test_alarmed_batch_removed_without_trash_deposit_escalates_warning(self) -> None: def test_alarmed_batch_removed_without_trash_deposit_escalates_warning(self) -> None:
settings = EngineSettings( settings = EngineSettings(
camera_id="test_cam", camera_id="test_cam",
@@ -260,6 +350,37 @@ class BatchEngineTests(unittest.TestCase):
self.assertEqual(removal_events[0]["batch_id"], "batch_000124") self.assertEqual(removal_events[0]["batch_id"], "batch_000124")
self.assertEqual(removal_events[0]["dwell_seconds"], 1400) self.assertEqual(removal_events[0]["dwell_seconds"], 1400)
def test_restore_keeps_alarm_removal_timeout_deadline_after_runtime_restart(self) -> None:
settings = EngineSettings(
camera_id="test_cam",
pre_warning_seconds=60,
max_dwell_seconds=120,
alarm_removal_seconds=30,
trash_confirmation_seconds=30,
zone_ids=("1",),
)
engine = BatchEngine(settings)
engine.restore_from_events(
[
{
"event": "time_alarm",
"zone_id": "1",
"batch_id": "batch_000124",
"started_at": self.t0.isoformat(),
"pre_warned_at": (self.t0 + timedelta(seconds=60)).isoformat(),
"alerted_at": (self.t0 + timedelta(seconds=120)).isoformat(),
"current_count": 1,
"state": "alerted",
},
],
active_zone_counts={"1": 1},
)
timeout_events = engine.process(obs(self.t0 + timedelta(seconds=151), {"1": 1}))
self.assertEqual([event["event"] for event in timeout_events], ["alarm_removal_timeout"])
self.assertEqual(timeout_events[0]["batch_id"], "batch_000124")
def test_restore_skips_active_false_positive_when_latest_zone_count_is_empty(self) -> None: def test_restore_skips_active_false_positive_when_latest_zone_count_is_empty(self) -> None:
settings = EngineSettings( settings = EngineSettings(
camera_id="test_cam", camera_id="test_cam",

View File

@@ -6,7 +6,7 @@ import unittest
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from cold_display_guard.cases import CaseStore from cold_display_guard.cases import CaseStore, append_case_snapshots, load_case_snapshots
from cold_display_guard.main import ( from cold_display_guard.main import (
case_sink_path, case_sink_path,
capture_runtime_alarm_snapshot, capture_runtime_alarm_snapshot,
@@ -67,6 +67,61 @@ class RuntimeRestoreTests(unittest.TestCase):
self.assertEqual(written[0]["case_type"], "time_alarm") self.assertEqual(written[0]["case_type"], "time_alarm")
self.assertEqual(written[0]["case_status"], "open") self.assertEqual(written[0]["case_status"], "open")
def test_persist_case_updates_preserves_api_handled_snapshot(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
path = Path(tmpdir) / "cases.jsonl"
runtime_store = CaseStore()
created = persist_case_updates(
runtime_store,
path,
[
{
"event": "time_alarm",
"ts": datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat(),
"batch_id": "batch_000001",
"camera_id": "cam_01",
"zone_id": "1",
"zone_label": "区域 1",
"severity": "alarm",
"state": "alerted",
}
],
)[0]
api_store = CaseStore(load_case_snapshots(path))
append_case_snapshots(
path,
[
api_store.mark_handled(
str(created["case_id"]),
handled_at=datetime(2026, 6, 9, 9, 5, tzinfo=UTC),
handled_by="alice",
handled_source="manual",
)
],
)
snapshots = persist_case_updates(
runtime_store,
path,
[
{
"event": "batch_pending_disposal",
"ts": datetime(2026, 6, 9, 9, 6, tzinfo=UTC).isoformat(),
"batch_id": "batch_000001",
"camera_id": "cam_01",
"zone_id": "1",
"zone_label": "区域 1",
"severity": "warning",
"state": "pending_disposal",
}
],
)
latest = CaseStore(load_case_snapshots(path)).latest_cases()[0]
self.assertEqual(snapshots, [])
self.assertEqual(latest["case_status"], "handled")
self.assertEqual(latest["handled_source"], "manual")
def test_deliver_runtime_webhooks_sends_event_and_case_payloads(self) -> None: def test_deliver_runtime_webhooks_sends_event_and_case_payloads(self) -> None:
deliveries: list[tuple[str, dict[str, object]]] = [] deliveries: list[tuple[str, dict[str, object]]] = []

View File

@@ -10,6 +10,7 @@ from cold_display_guard.vision import (
RuntimeVisionSettings, RuntimeVisionSettings,
ZoneOccupancyDetector, ZoneOccupancyDetector,
load_runtime_vision_settings, load_runtime_vision_settings,
metrics_indicate_occupied,
point_in_polygon, point_in_polygon,
) )
@@ -157,6 +158,7 @@ class VisionTests(unittest.TestCase):
self.assertEqual(settings.sample_stride_pixels, 4) self.assertEqual(settings.sample_stride_pixels, 4)
self.assertEqual(settings.occupancy_mean_delta, 55.0) self.assertEqual(settings.occupancy_mean_delta, 55.0)
self.assertEqual(settings.occupancy_absolute_dark_fraction, 0.0)
self.assertEqual(settings.occupancy_confirm_frames, 2) self.assertEqual(settings.occupancy_confirm_frames, 2)
self.assertEqual(settings.empty_confirm_frames, 2) self.assertEqual(settings.empty_confirm_frames, 2)
self.assertEqual(settings.trash_motion_delta, 18.0) self.assertEqual(settings.trash_motion_delta, 18.0)
@@ -164,6 +166,25 @@ class VisionTests(unittest.TestCase):
self.assertEqual(settings.trash_sustained_motion_frames, 2) self.assertEqual(settings.trash_sustained_motion_frames, 2)
self.assertEqual(settings.trash_motion_cooldown_seconds, 3) self.assertEqual(settings.trash_motion_cooldown_seconds, 3)
def test_absolute_dark_fraction_can_detect_food_already_present_in_baseline(self) -> None:
settings = RuntimeVisionSettings(
occupancy_mean_delta=55,
occupancy_texture_delta=18,
occupancy_dark_fraction=0.06,
occupancy_absolute_dark_fraction=0.085,
)
occupied = metrics_indicate_occupied(
settings,
mean_delta=5.0,
texture_delta=0.5,
dark_fraction=0.09,
baseline_dark_fraction=0.10,
bright_fraction=0.0,
)
self.assertTrue(occupied)
def test_detector_can_seed_previous_baseline_and_occupancy(self) -> None: def test_detector_can_seed_previous_baseline_and_occupancy(self) -> None:
detector = ZoneOccupancyDetector( detector = ZoneOccupancyDetector(
[Region("1", ((0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)))], [Region("1", ((0.0, 0.0), (1.0, 0.0), (1.0, 1.0), (0.0, 1.0)))],

View File

@@ -80,6 +80,46 @@ class WebhookTests(unittest.TestCase):
self.assertFalse(payload["is_discarded"]) self.assertFalse(payload["is_discarded"])
self.assertEqual(payload["alerted_at"], datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat()) self.assertEqual(payload["alerted_at"], datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat())
def test_build_batch_event_payload_preserves_pre_warning_and_alarm_times(self) -> None:
pre_warned_at = datetime(2026, 6, 9, 8, 59, tzinfo=UTC).isoformat()
alarm_at = datetime(2026, 6, 9, 9, 0, tzinfo=UTC).isoformat()
pre_warning_payload = build_batch_event_payload(
{
"event": "time_pre_warning",
"ts": pre_warned_at,
"batch_id": "batch_000001",
"camera_id": "cam_01",
"zone_id": "1",
"zone_label": "区域 1",
"severity": "warning",
"state": "pre_warning",
"started_at": datetime(2026, 6, 9, 8, 58, tzinfo=UTC).isoformat(),
"pre_warned_at": pre_warned_at,
}
)
alarm_payload = build_batch_event_payload(
{
"event": "time_alarm",
"ts": alarm_at,
"batch_id": "batch_000001",
"camera_id": "cam_01",
"zone_id": "1",
"zone_label": "区域 1",
"severity": "alarm",
"state": "alerted",
"started_at": datetime(2026, 6, 9, 8, 58, tzinfo=UTC).isoformat(),
"pre_warned_at": pre_warned_at,
"alerted_at": alarm_at,
}
)
self.assertEqual(pre_warning_payload["pre_warned_at"], pre_warned_at)
self.assertEqual(pre_warning_payload["created_at"], pre_warned_at)
self.assertEqual(pre_warning_payload["alarm_at"], "")
self.assertEqual(alarm_payload["pre_warned_at"], pre_warned_at)
self.assertEqual(alarm_payload["alarm_at"], alarm_at)
def test_build_batch_event_payload_includes_uploaded_snapshot_path(self) -> None: def test_build_batch_event_payload_includes_uploaded_snapshot_path(self) -> None:
payload = build_batch_event_payload( payload = build_batch_event_payload(
{ {

View File

@@ -2,11 +2,14 @@ server {
listen 80; listen 80;
server_name _; server_name _;
resolver 127.0.0.11 ipv6=off valid=10s;
root /usr/share/nginx/html; root /usr/share/nginx/html;
index index.html; index index.html;
location /api/ { location /api/ {
proxy_pass http://cold-display-guard-api:19080; set $api_upstream http://cold-display-guard-api:19080;
proxy_pass $api_upstream;
proxy_http_version 1.1; proxy_http_version 1.1;
proxy_set_header Host $host; proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr; proxy_set_header X-Real-IP $remote_addr;