feat: add deployment configuration and scripts for managed-portal, including Dockerfiles and environment settings

This commit is contained in:
2026-05-13 16:49:21 +08:00
parent 330373b8f1
commit f8a6d9803d
13 changed files with 563 additions and 71 deletions

View File

@@ -3,7 +3,7 @@ FROM swr.cn-north-4.myhuaweicloud.com/ddn-k8s/docker.io/library/python:3.12-slim
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PIP_NO_CACHE_DIR=1 \
PIP_INDEX_URL=https://pypi.tuna.tsinghua.edu.cn/simple \
PIP_INDEX_URL=https://mirrors.aliyun.com/pypi/simple/ \
DEEPFACE_HOME=/root/.deepface \
TF_CPP_MIN_LOG_LEVEL=2
@@ -19,8 +19,7 @@ RUN sed -i 's|http://deb.debian.org/debian|http://mirrors.aliyun.com/debian|g; s
COPY requirements-docker.txt ./requirements-docker.txt
RUN python -m pip install --upgrade pip setuptools wheel && \
pip install "numpy<2"
RUN pip install "numpy<2"
RUN pip install --extra-index-url https://download.pytorch.org/whl/cpu \
"torch==2.6.0+cpu" "torchvision==0.21.0+cpu"
@@ -32,17 +31,12 @@ RUN pip install -r requirements-docker.txt
COPY . .
COPY scripts/docker-entrypoint.sh /opt/people-flow/scripts/docker-entrypoint.sh
RUN test -f /opt/people-flow/weights/yolo11n.pt && \
test -f /opt/people-flow/weights/deepface/age_model_weights.h5 && \
test -f /opt/people-flow/weights/deepface/gender_model_weights.h5 && \
test -f /opt/people-flow/weights/deepface/retinaface.h5 && \
mkdir -p /root/.deepface/weights /opt/people-flow/outputs && \
cp /opt/people-flow/weights/deepface/*.h5 /root/.deepface/weights/ && \
RUN mkdir -p /root/.deepface/weights /opt/people-flow/outputs && \
chmod +x /opt/people-flow/scripts/docker-entrypoint.sh
EXPOSE 18082
HEALTHCHECK --interval=30s --timeout=5s --start-period=15s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://127.0.0.1:18082/api/manage/health', timeout=3).read()" || exit 1
CMD python -c "import urllib.request; urllib.request.urlopen('http://127.0.0.1:18082/api/manage/health', timeout=3).read()" || exit 1
ENTRYPOINT ["/opt/people-flow/scripts/docker-entrypoint.sh"]

View File

@@ -8,8 +8,15 @@ OUTPUT_DIR="${OUTPUT_DIR:-${PROJECT_DIR}/outputs}"
RTSP_URL="${RTSP_URL:-}"
API_HOST="${API_HOST:-0.0.0.0}"
API_PORT="${API_PORT:-18082}"
RTSP_STALL_TIMEOUT_SECONDS="${RTSP_STALL_TIMEOUT_SECONDS:-180}"
DEEPFACE_CACHE_DIR="/root/.deepface/weights"
DEEPFACE_SOURCE_DIR="${PROJECT_DIR}/weights/deepface"
mkdir -p "${OUTPUT_DIR}" "$(dirname "${CONFIG_PATH}")"
mkdir -p "${OUTPUT_DIR}" "$(dirname "${CONFIG_PATH}")" "${DEEPFACE_CACHE_DIR}"
if [ -d "${DEEPFACE_SOURCE_DIR}" ]; then
find "${DEEPFACE_SOURCE_DIR}" -maxdepth 1 -name '*.h5' -exec cp {} "${DEEPFACE_CACHE_DIR}/" \;
fi
if [ ! -f "${CONFIG_PATH}" ]; then
cp "${CONFIG_TEMPLATE}" "${CONFIG_PATH}"
@@ -37,13 +44,31 @@ config_path.write_text(
)
PY
exec python - "$CONFIG_PATH" "$API_HOST" "$API_PORT" <<'PY'
RTSP_OUTPUT_SUBDIR="$(python - "$CONFIG_PATH" <<'PY'
from pathlib import Path
import sys
import yaml
config_path = Path(sys.argv[1])
raw = yaml.safe_load(config_path.read_text(encoding="utf-8")) or {}
rtsp = raw.get("rtsp") or {}
print(rtsp.get("output_subdir", "rtsp_stream"))
PY
)"
RTSP_STATUS_PATH="${OUTPUT_DIR}/${RTSP_OUTPUT_SUBDIR}/worker_status.json"
exec python - "$CONFIG_PATH" "$API_HOST" "$API_PORT" "$RTSP_STATUS_PATH" "$RTSP_STALL_TIMEOUT_SECONDS" <<'PY'
from pathlib import Path
import signal
import subprocess
import sys
import time
config_path, api_host, api_port = sys.argv[1:4]
from src.people_flow.worker_status import worker_status_stall_reason
config_path, api_host, api_port, status_path_raw, stall_timeout_raw = sys.argv[1:6]
status_path = Path(status_path_raw)
stall_timeout_seconds = max(float(stall_timeout_raw), 30.0)
commands = [
[sys.executable, "main.py", "--config", config_path, "rtsp"],
[
@@ -59,21 +84,38 @@ commands = [
],
]
processes = [subprocess.Popen(command) for command in commands]
supervisor_started_at = time.time()
def terminate_all(signum, _frame):
for process in processes:
if process.poll() is None:
process.terminate()
def stop_all(excluded_index=None):
for index, process in enumerate(processes):
if index == excluded_index or process.poll() is not None:
continue
process.terminate()
deadline = time.time() + 10
for process in processes:
if process.poll() is not None:
for index, process in enumerate(processes):
if index == excluded_index or process.poll() is not None:
continue
timeout = max(0, deadline - time.time())
try:
process.wait(timeout=timeout)
except subprocess.TimeoutExpired:
process.kill()
def stale_reason():
if processes[0].poll() is not None:
return None
return worker_status_stall_reason(
status_path,
started_at=supervisor_started_at,
max_age_seconds=stall_timeout_seconds,
)
def terminate_all(signum, _frame):
stop_all()
raise SystemExit(128 + signum)
@@ -85,19 +127,12 @@ while True:
return_code = process.poll()
if return_code is None:
continue
for other_index, other_process in enumerate(processes):
if other_index == index or other_process.poll() is not None:
continue
other_process.terminate()
deadline = time.time() + 10
for other_index, other_process in enumerate(processes):
if other_index == index or other_process.poll() is not None:
continue
timeout = max(0, deadline - time.time())
try:
other_process.wait(timeout=timeout)
except subprocess.TimeoutExpired:
other_process.kill()
stop_all(excluded_index=index)
raise SystemExit(return_code)
reason = stale_reason()
if reason is not None:
print(reason, flush=True)
stop_all()
raise SystemExit(1)
time.sleep(0.5)
PY

View File

@@ -24,6 +24,7 @@ from .queue_analytics import QueueWindowTracker
from .tracking import extract_person_tracks
from .window_identity import WindowIdentityResolver
from .webhook import dispatch_json_event
from .worker_status import write_worker_status
SUPPORTED_EXTENSIONS = {".mp4", ".mov", ".mkv", ".avi"}
@@ -157,6 +158,7 @@ class PeopleFlowPipeline:
def process_rtsp(self, source: str) -> dict:
rtsp_paths = self.get_rtsp_output_paths()
status_path = rtsp_paths["root"] / "worker_status.json"
sample_interval = max(float(self.config.rtsp.sample_interval_seconds), 0.01)
window_seconds = max(int(self.config.rtsp.window_seconds), 1)
reconnect_delay = max(float(self.config.rtsp.reconnect_delay_seconds), 0.1)
@@ -189,8 +191,39 @@ class PeopleFlowPipeline:
if not Path(self.config.webhook.event_log_path).is_absolute()
else Path(self.config.webhook.event_log_path)
)
last_status_phase: str | None = None
last_status_written_at = 0.0
def update_status(
phase: str,
*,
force: bool = False,
note: str | None = None,
) -> None:
nonlocal last_status_phase, last_status_written_at
current_time = time.monotonic()
if (
not force
and phase == last_status_phase
and current_time - last_status_written_at < 5.0
):
return
write_worker_status(
status_path,
phase,
source=source,
window_index=window_index,
frame_index=frame_index,
last_processed_at=last_processed_wall_time,
note=note,
)
last_status_phase = phase
last_status_written_at = current_time
try:
update_status("starting", force=True)
while True:
now = datetime.now().astimezone()
while now >= window_end:
@@ -215,6 +248,7 @@ class PeopleFlowPipeline:
webhook_url=self.config.webhook.url,
timeout_seconds=self.config.webhook.timeout_seconds,
)
update_status("window_flushed", force=True)
print(f"window_json={json_path}", flush=True)
print(f"window_total_people={payload['total_people']}", flush=True)
window_index += 1
@@ -229,13 +263,25 @@ class PeopleFlowPipeline:
now = datetime.now().astimezone()
if capture is None or not capture.isOpened():
update_status("opening_stream")
capture = self._open_rtsp_capture(source, open_timeout_seconds)
if capture is None:
update_status(
"waiting_to_reconnect",
force=True,
note="open_failed",
)
time.sleep(reconnect_delay)
continue
update_status("reading_frame")
ok, frame = capture.read()
if not ok or frame is None:
update_status(
"waiting_to_reconnect",
force=True,
note="read_failed",
)
capture.release()
capture = None
time.sleep(reconnect_delay)
@@ -251,6 +297,7 @@ class PeopleFlowPipeline:
self.config.queue,
self.config.queue.to_pixel_area(width=width, height=height),
)
update_status("capture_ready", force=True)
current_time = time.monotonic()
if current_time - last_processed_at < sample_interval:
@@ -258,6 +305,7 @@ class PeopleFlowPipeline:
time.sleep(idle_sleep)
continue
update_status("tracking_frame")
last_processed_at = current_time
observations = self._track_frame(frame)
person_keys = identity_resolver.resolve(frame, observations)
@@ -281,9 +329,11 @@ class PeopleFlowPipeline:
next_heartbeat_at = current_time + 60.0
last_processed_wall_time = now
frame_index += 1
update_status("processed_frame", force=True)
except KeyboardInterrupt:
pass
finally:
update_status("stopped", force=True)
if capture is not None:
capture.release()

View File

@@ -0,0 +1,84 @@
from __future__ import annotations
import json
from datetime import datetime
from pathlib import Path
def write_worker_status(
path: Path,
phase: str,
*,
source: str,
window_index: int,
frame_index: int,
last_processed_at: datetime | None,
note: str | None = None,
) -> dict:
path.parent.mkdir(parents=True, exist_ok=True)
payload = {
"phase": phase,
"updated_at": datetime.now().astimezone().isoformat(timespec="seconds"),
"source": source,
"window_index": window_index,
"frame_index": frame_index,
"last_processed_at": (
last_processed_at.isoformat(timespec="seconds")
if last_processed_at is not None
else None
),
}
if note:
payload["note"] = note
temp_path = path.with_suffix(path.suffix + ".tmp")
temp_path.write_text(
json.dumps(payload, ensure_ascii=True, indent=2),
encoding="utf-8",
)
temp_path.replace(path)
return payload
def load_worker_status(path: Path) -> dict | None:
try:
payload = json.loads(path.read_text(encoding="utf-8"))
except (FileNotFoundError, json.JSONDecodeError):
return None
return payload if isinstance(payload, dict) else None
def worker_status_age_seconds(path: Path, now: float | None = None) -> float | None:
try:
stat_result = path.stat()
except FileNotFoundError:
return None
current_time = datetime.now().timestamp() if now is None else now
return max(0.0, current_time - stat_result.st_mtime)
def worker_status_stall_reason(
path: Path,
*,
started_at: float,
max_age_seconds: float,
now: float | None = None,
) -> str | None:
current_time = datetime.now().timestamp() if now is None else now
age_seconds = worker_status_age_seconds(path, now=current_time)
if age_seconds is None:
if current_time - started_at < max_age_seconds:
return None
return f"rtsp worker status missing path={path}"
if age_seconds <= max_age_seconds:
return None
payload = load_worker_status(path) or {}
phase = payload.get("phase", "unknown")
updated_at = payload.get("updated_at", "unknown")
return (
f"rtsp worker stalled path={path} phase={phase} "
f"updated_at={updated_at} age_seconds={age_seconds:.1f}"
)

View File

@@ -0,0 +1,101 @@
from __future__ import annotations
import os
from datetime import datetime
from pathlib import Path
from src.people_flow.worker_status import (
load_worker_status,
worker_status_age_seconds,
worker_status_stall_reason,
write_worker_status,
)
def test_write_worker_status_persists_progress(tmp_path: Path):
status_path = tmp_path / "outputs" / "rtsp_stream" / "worker_status.json"
last_processed_at = datetime(2026, 5, 13, 16, 30, 0).astimezone()
write_worker_status(
status_path,
"processed_frame",
source="rtsp://camera/stream",
window_index=3,
frame_index=42,
last_processed_at=last_processed_at,
note="healthy",
)
payload = load_worker_status(status_path)
assert payload is not None
assert payload["phase"] == "processed_frame"
assert payload["source"] == "rtsp://camera/stream"
assert payload["window_index"] == 3
assert payload["frame_index"] == 42
assert payload["last_processed_at"] == last_processed_at.isoformat(
timespec="seconds"
)
assert payload["note"] == "healthy"
assert "updated_at" in payload
def test_worker_status_age_seconds_uses_file_mtime(tmp_path: Path):
status_path = tmp_path / "worker_status.json"
write_worker_status(
status_path,
"tracking_frame",
source="rtsp://camera/stream",
window_index=0,
frame_index=0,
last_processed_at=None,
)
os.utime(status_path, (100.0, 100.0))
assert worker_status_age_seconds(status_path, now=280.0) == 180.0
assert worker_status_age_seconds(tmp_path / "missing.json", now=280.0) is None
def test_worker_status_stall_reason_reports_missing_and_stale_status(tmp_path: Path):
missing_path = tmp_path / "missing.json"
assert (
worker_status_stall_reason(
missing_path,
started_at=150.0,
max_age_seconds=180.0,
now=300.0,
)
is None
)
assert "status missing" in worker_status_stall_reason(
missing_path,
started_at=0.0,
max_age_seconds=180.0,
now=300.0,
)
status_path = tmp_path / "worker_status.json"
write_worker_status(
status_path,
"tracking_frame",
source="rtsp://camera/stream",
window_index=0,
frame_index=2,
last_processed_at=None,
)
os.utime(status_path, (100.0, 100.0))
reason = worker_status_stall_reason(
status_path,
started_at=0.0,
max_age_seconds=180.0,
now=300.0,
)
assert reason is not None
assert "status=missing" not in reason
assert "phase=tracking_frame" in reason
assert "age_seconds=200.0" in reason

View File

@@ -3,23 +3,22 @@ FROM swr.cn-north-4.myhuaweicloud.com/ddn-k8s/docker.io/library/python:3.12-slim
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PIP_NO_CACHE_DIR=1 \
PIP_INDEX_URL=https://pypi.tuna.tsinghua.edu.cn/simple
PIP_INDEX_URL=https://mirrors.aliyun.com/pypi/simple/
WORKDIR /app
RUN sed -i 's|http://deb.debian.org/debian|http://mirrors.aliyun.com/debian|g; s|http://deb.debian.org/debian-security|http://mirrors.aliyun.com/debian-security|g' /etc/apt/sources.list.d/debian.sources \
&& apt-get update \
&& apt-get install -y --no-install-recommends \
ffmpeg \
libgl1 \
libglib2.0-0 \
libgomp1 \
ffmpeg \
libgl1 \
libglib2.0-0 \
libgomp1 \
&& rm -rf /var/lib/apt/lists/*
COPY requirements.txt /app/requirements.txt
RUN python -m pip install --upgrade pip setuptools wheel \
&& python -m pip install --extra-index-url https://download.pytorch.org/whl/cpu \
"torch==2.6.0+cpu" "torchvision==0.21.0+cpu" \
RUN python -m pip install --extra-index-url https://download.pytorch.org/whl/cpu \
"torch==2.6.0+cpu" "torchvision==0.21.0+cpu" \
&& python -m pip install -r /app/requirements.txt
COPY app /app/app
@@ -35,6 +34,6 @@ RUN test -f /app/weights/yolo11n.pt \
EXPOSE 18081
HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://127.0.0.1:18081/api/manage/health', timeout=3).read()" || exit 1
CMD python -c "import urllib.request; urllib.request.urlopen('http://127.0.0.1:18081/api/manage/health', timeout=3).read()" || exit 1
ENTRYPOINT ["/app/scripts/docker-entrypoint.sh"]