feat: enhance LineCrossCounter to deduplicate re-entries and add WindowIdentityResolver for person tracking
chore: update docker-compose for additional host configuration test: add unit tests for LineCrossCounter and WindowIdentityResolver functionality
This commit is contained in:
@@ -19,8 +19,6 @@ services:
|
||||
MANAGED_PORTAL_REGISTRY_PATH: "/app/managed_services.yaml"
|
||||
volumes:
|
||||
- /var/run/docker.sock:/var/run/docker.sock
|
||||
ports:
|
||||
- "31000-31255:31000-31255"
|
||||
networks:
|
||||
- managed-portal
|
||||
|
||||
@@ -39,6 +37,8 @@ services:
|
||||
API_HOST: 0.0.0.0
|
||||
API_PORT: 18081
|
||||
CONFIG_PATH: /app/config/local.yaml
|
||||
extra_hosts:
|
||||
- "host.docker.internal:host-gateway"
|
||||
volumes:
|
||||
- ${MANAGED_STORE_DWELL_CONFIG_DIR:-../managed/store_dwell_alert/config}:/app/config
|
||||
- ${MANAGED_STORE_DWELL_DATA_DIR:-../managed/store_dwell_alert/data}:/app/data
|
||||
@@ -61,6 +61,8 @@ services:
|
||||
OUTPUT_DIR: /opt/people-flow/outputs
|
||||
API_HOST: 0.0.0.0
|
||||
API_PORT: 18082
|
||||
extra_hosts:
|
||||
- "host.docker.internal:host-gateway"
|
||||
volumes:
|
||||
- ${MANAGED_PEOPLE_FLOW_CONFIG_DIR:-../managed/people_flow_project/config}:/opt/people-flow/config
|
||||
- ${MANAGED_PEOPLE_FLOW_OUTPUT_DIR:-../managed/people_flow_project/outputs}:/opt/people-flow/outputs
|
||||
|
||||
@@ -12,31 +12,55 @@ def _line_side(
|
||||
|
||||
|
||||
class LineCrossCounter:
|
||||
def __init__(self, line: tuple[float, float, float, float], config: CountingConfig) -> None:
|
||||
def __init__(
|
||||
self, line: tuple[float, float, float, float], config: CountingConfig
|
||||
) -> None:
|
||||
self.line = line
|
||||
self.config = config
|
||||
self.previous_side: dict[int, float] = {}
|
||||
self.counted_ids: set[int] = set()
|
||||
self.counted_person_keys: set[str] = set()
|
||||
self.crossed_track_ids: set[int] = set()
|
||||
self.crossings: list[CrossingEvent] = []
|
||||
|
||||
def update(self, observations: list[TrackObservation]) -> list[CrossingEvent]:
|
||||
def update(
|
||||
self,
|
||||
observations: list[TrackObservation],
|
||||
person_keys: dict[int, str] | None = None,
|
||||
) -> list[CrossingEvent]:
|
||||
events: list[CrossingEvent] = []
|
||||
for observation in observations:
|
||||
side = _line_side(observation.center, self.line)
|
||||
previous = self.previous_side.get(observation.track_id)
|
||||
self.previous_side[observation.track_id] = side
|
||||
|
||||
if observation.track_id in self.counted_ids:
|
||||
if observation.track_id in self.crossed_track_ids:
|
||||
continue
|
||||
if previous is None:
|
||||
continue
|
||||
if abs(previous) <= self.config.crossing_tolerance or abs(side) <= self.config.crossing_tolerance:
|
||||
if (
|
||||
abs(previous) <= self.config.crossing_tolerance
|
||||
or abs(side) <= self.config.crossing_tolerance
|
||||
):
|
||||
continue
|
||||
if previous * side >= 0:
|
||||
continue
|
||||
|
||||
direction = "negative_to_positive" if previous < 0 < side else "positive_to_negative"
|
||||
direction = (
|
||||
"negative_to_positive"
|
||||
if previous < 0 < side
|
||||
else "positive_to_negative"
|
||||
)
|
||||
person_key = (
|
||||
person_keys.get(observation.track_id, f"track:{observation.track_id}")
|
||||
if person_keys is not None
|
||||
else f"track:{observation.track_id}"
|
||||
)
|
||||
self.crossed_track_ids.add(observation.track_id)
|
||||
if person_key in self.counted_person_keys:
|
||||
continue
|
||||
event = CrossingEvent(track_id=observation.track_id, direction=direction)
|
||||
self.counted_person_keys.add(person_key)
|
||||
self.counted_ids.add(observation.track_id)
|
||||
self.crossings.append(event)
|
||||
events.append(event)
|
||||
@@ -45,8 +69,10 @@ class LineCrossCounter:
|
||||
def reset(self) -> None:
|
||||
self.previous_side.clear()
|
||||
self.counted_ids.clear()
|
||||
self.counted_person_keys.clear()
|
||||
self.crossed_track_ids.clear()
|
||||
self.crossings.clear()
|
||||
|
||||
@property
|
||||
def total_people(self) -> int:
|
||||
return len(self.counted_ids)
|
||||
return len(self.counted_person_keys)
|
||||
|
||||
@@ -22,6 +22,7 @@ from .io_utils import (
|
||||
from .models import AppConfig
|
||||
from .queue_analytics import QueueWindowTracker
|
||||
from .tracking import extract_person_tracks
|
||||
from .window_identity import WindowIdentityResolver
|
||||
from .webhook import dispatch_json_event
|
||||
|
||||
SUPPORTED_EXTENSIONS = {".mp4", ".mov", ".mkv", ".avi"}
|
||||
@@ -176,6 +177,7 @@ class PeopleFlowPipeline:
|
||||
pixel_line = None
|
||||
counter = None
|
||||
queue_tracker = None
|
||||
identity_resolver = WindowIdentityResolver()
|
||||
attributes = AttributeAggregator(self.config.attributes)
|
||||
project_root = (
|
||||
self.config.config_path.parent.parent
|
||||
@@ -222,6 +224,7 @@ class PeopleFlowPipeline:
|
||||
counter.reset()
|
||||
if queue_tracker is not None:
|
||||
queue_tracker.reset()
|
||||
identity_resolver.reset()
|
||||
attributes.reset()
|
||||
now = datetime.now().astimezone()
|
||||
|
||||
@@ -257,12 +260,13 @@ class PeopleFlowPipeline:
|
||||
|
||||
last_processed_at = current_time
|
||||
observations = self._track_frame(frame)
|
||||
person_keys = identity_resolver.resolve(frame, observations)
|
||||
for observation in observations:
|
||||
attributes.maybe_collect(
|
||||
frame=frame, frame_index=frame_index, track=observation
|
||||
)
|
||||
if counter is not None:
|
||||
counter.update(observations)
|
||||
counter.update(observations, person_keys=person_keys)
|
||||
if queue_tracker is not None and self.config.queue.enabled:
|
||||
queue_tracker.observe(observations, now)
|
||||
if current_time >= next_heartbeat_at:
|
||||
|
||||
118
managed/people_flow_project/src/people_flow/window_identity.py
Normal file
118
managed/people_flow_project/src/people_flow/window_identity.py
Normal file
@@ -0,0 +1,118 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
|
||||
import numpy as np
|
||||
|
||||
from .models import TrackObservation
|
||||
|
||||
|
||||
def build_color_signature(crop: np.ndarray) -> list[float]:
|
||||
if crop.size == 0:
|
||||
return [0.0] * 9
|
||||
bands = np.array_split(crop[:, :, :3], 3, axis=0)
|
||||
signature: list[float] = []
|
||||
for band in bands:
|
||||
if band.size == 0:
|
||||
signature.extend([0.0, 0.0, 0.0])
|
||||
continue
|
||||
mean_bgr = band.reshape(-1, 3).mean(axis=0) / 255.0
|
||||
signature.extend(round(float(value), 4) for value in mean_bgr)
|
||||
return signature
|
||||
|
||||
|
||||
def signature_similarity(left: list[float], right: list[float]) -> float:
|
||||
if not left or not right:
|
||||
return 0.0
|
||||
size = min(len(left), len(right))
|
||||
if size == 0:
|
||||
return 0.0
|
||||
distance = sum((left[index] - right[index]) ** 2 for index in range(size)) ** 0.5
|
||||
return max(0.0, 1.0 - distance / (size**0.5))
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class ActiveWindowPerson:
|
||||
person_key: str
|
||||
signature: list[float]
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class PausedWindowPerson:
|
||||
person_key: str
|
||||
signature: list[float]
|
||||
|
||||
|
||||
class WindowIdentityResolver:
|
||||
def __init__(self, similarity_threshold: float = 0.97) -> None:
|
||||
self.similarity_threshold = similarity_threshold
|
||||
self.active_by_track: dict[int, ActiveWindowPerson] = {}
|
||||
self.paused_by_person: dict[str, PausedWindowPerson] = {}
|
||||
self.person_counter = 0
|
||||
|
||||
def reset(self) -> None:
|
||||
self.active_by_track.clear()
|
||||
self.paused_by_person.clear()
|
||||
self.person_counter = 0
|
||||
|
||||
def resolve(
|
||||
self, frame: np.ndarray, observations: list[TrackObservation]
|
||||
) -> dict[int, str]:
|
||||
current_track_ids = {observation.track_id for observation in observations}
|
||||
disappeared_track_ids = [
|
||||
track_id
|
||||
for track_id in self.active_by_track
|
||||
if track_id not in current_track_ids
|
||||
]
|
||||
for track_id in disappeared_track_ids:
|
||||
active = self.active_by_track.pop(track_id)
|
||||
self.paused_by_person[active.person_key] = PausedWindowPerson(
|
||||
person_key=active.person_key,
|
||||
signature=active.signature,
|
||||
)
|
||||
|
||||
person_keys: dict[int, str] = {}
|
||||
for observation in observations:
|
||||
signature = build_color_signature(self._crop(frame, observation.bbox))
|
||||
active = self.active_by_track.get(observation.track_id)
|
||||
if active is None:
|
||||
person_key = self._match_paused(signature)
|
||||
if person_key is None:
|
||||
person_key = self._next_person_key()
|
||||
active = ActiveWindowPerson(person_key=person_key, signature=signature)
|
||||
self.active_by_track[observation.track_id] = active
|
||||
else:
|
||||
active.signature = signature
|
||||
|
||||
person_keys[observation.track_id] = active.person_key
|
||||
return person_keys
|
||||
|
||||
def _next_person_key(self) -> str:
|
||||
self.person_counter += 1
|
||||
return f"person:{self.person_counter:05d}"
|
||||
|
||||
def _match_paused(self, signature: list[float]) -> str | None:
|
||||
if max(signature, default=0.0) <= 0.0:
|
||||
return None
|
||||
best_person_key = None
|
||||
best_similarity = 0.0
|
||||
for person_key, paused in self.paused_by_person.items():
|
||||
similarity = signature_similarity(signature, paused.signature)
|
||||
if similarity < self.similarity_threshold:
|
||||
continue
|
||||
if similarity > best_similarity:
|
||||
best_person_key = person_key
|
||||
best_similarity = similarity
|
||||
if best_person_key is not None:
|
||||
del self.paused_by_person[best_person_key]
|
||||
return best_person_key
|
||||
|
||||
@staticmethod
|
||||
def _crop(frame: np.ndarray, bbox: tuple[int, int, int, int]) -> np.ndarray:
|
||||
x1, y1, x2, y2 = bbox
|
||||
height, width = frame.shape[:2]
|
||||
left = max(0, min(x1, width))
|
||||
top = max(0, min(y1, height))
|
||||
right = max(left, min(x2, width))
|
||||
bottom = max(top, min(y2, height))
|
||||
return frame[top:bottom, left:right]
|
||||
43
managed/people_flow_project/tests/test_counting.py
Normal file
43
managed/people_flow_project/tests/test_counting.py
Normal file
@@ -0,0 +1,43 @@
|
||||
import numpy as np
|
||||
|
||||
from src.people_flow.counting import LineCrossCounter
|
||||
from src.people_flow.models import CountingConfig, TrackObservation
|
||||
from src.people_flow.window_identity import WindowIdentityResolver
|
||||
|
||||
|
||||
def _observation(track_id: int, center_y: float) -> TrackObservation:
|
||||
return TrackObservation(
|
||||
track_id=track_id,
|
||||
bbox=(0, 0, 10, 10),
|
||||
confidence=0.9,
|
||||
center=(5.0, center_y),
|
||||
)
|
||||
|
||||
|
||||
def test_line_cross_counter_dedupes_reentry_with_same_person_key():
|
||||
counter = LineCrossCounter(
|
||||
(0.0, 5.0, 10.0, 5.0), CountingConfig(crossing_tolerance=0.0)
|
||||
)
|
||||
|
||||
counter.update([_observation(1, 2.0)], person_keys={1: "person:1"})
|
||||
first_events = counter.update([_observation(1, 8.0)], person_keys={1: "person:1"})
|
||||
counter.update([_observation(2, 2.0)], person_keys={2: "person:1"})
|
||||
second_events = counter.update([_observation(2, 8.0)], person_keys={2: "person:1"})
|
||||
|
||||
assert len(first_events) == 1
|
||||
assert second_events == []
|
||||
assert counter.total_people == 1
|
||||
assert len(counter.crossings) == 1
|
||||
assert counter.crossings[0].track_id == 1
|
||||
|
||||
|
||||
def test_window_identity_resolver_matches_reentry_after_track_switch():
|
||||
resolver = WindowIdentityResolver(similarity_threshold=0.97)
|
||||
frame = np.zeros((16, 16, 3), dtype=np.uint8)
|
||||
frame[:, :] = (30, 60, 180)
|
||||
|
||||
first_keys = resolver.resolve(frame, [_observation(1, 2.0)])
|
||||
resolver.resolve(frame, [])
|
||||
second_keys = resolver.resolve(frame, [_observation(2, 8.0)])
|
||||
|
||||
assert first_keys[1] == second_keys[2]
|
||||
@@ -2,24 +2,30 @@
|
||||
|
||||
## Checklist
|
||||
|
||||
- [x] Clarify scope, repository boundaries, and validation requirements for a repository-level AGENTS.md.
|
||||
- [x] Verify that the plan covers the user-specified workflow rules, project structure, and risk points.
|
||||
- [x] Create a complete AGENTS.md that embeds the requested workflow orchestration rules and repo-specific guardrails.
|
||||
- [x] Verify the generated AGENTS.md against the current repository structure and required workflow sections.
|
||||
- [x] Update the Review section with the final result and verification evidence.
|
||||
- [x] Confirm the current `store_dwell_alert` half-hour report path and identify the runtime control point.
|
||||
- [x] Verify the plan covers behavior change, focused tests, deployment scope, and post-deploy validation.
|
||||
- [ ] Update focused tests so `half_hour_report` is expected on rolling 1800-second windows from startup time.
|
||||
- [ ] Implement the rolling window behavior in `store_dwell_alert` runtime code.
|
||||
- [ ] Run focused `store_dwell_alert` tests for the changed slice.
|
||||
- [ ] Deploy the updated `store_dwell_alert` code to `xiaozheng@10.8.0.11` and restart only the affected service(s).
|
||||
- [ ] Validate the remote deployment and update the Review section with evidence.
|
||||
|
||||
## Scope And Risks
|
||||
|
||||
- Scope: add a repository-root AGENTS.md that governs future modifications across the Go backend, Vue frontend, deployment assets, and embedded Python managed services.
|
||||
- Risk: omit a repository area or validation command and leave future work under-constrained.
|
||||
- Risk: copy the requested workflow rules without adapting them to this repo's actual layout and toolchain.
|
||||
- Risk: finish without writing persistent task evidence for plan, review, and lessons.
|
||||
- Scope: change `managed/store_dwell_alert` so `half_hour_report` uses rolling 1800-second windows from service startup instead of natural `:00` / `:30` boundaries, then deploy the change to `10.8.0.11`.
|
||||
- Expected touch points: `managed/store_dwell_alert/app/modules/dwell_engine.py`, `managed/store_dwell_alert/app/modules/reporter.py`, and focused tests under `managed/store_dwell_alert/tests/`.
|
||||
- Risk: changing the window model can alter `window_start` and `window_end` values consumed by downstream webhook receivers and manage APIs.
|
||||
- Risk: a delayed observation call may span more than one 30-minute window; the implementation should behave predictably and avoid duplicate emissions for the same window.
|
||||
- Risk: deployment should be limited to `store-dwell-alert` unless code or config diffs prove broader scope is required.
|
||||
|
||||
## Validation Intent
|
||||
|
||||
- First pin the new expected behavior with focused tests.
|
||||
- After the code change, run the narrowest `store_dwell_alert` tests that cover report timing and report payloads.
|
||||
- After deployment, verify the remote service is healthy and that the deployed code matches local content.
|
||||
|
||||
## Review
|
||||
|
||||
- Status: completed.
|
||||
- Result: added a repository-root `AGENTS.md` plus `tasks/todo.md` and `tasks/lessons.md` to enforce the requested workflow for future non-trivial tasks.
|
||||
- Verification:
|
||||
- confirmed `AGENTS.md` exists at the repository root;
|
||||
- read back `AGENTS.md` and verified it covers workflow orchestration, task management, repo structure, validation matrix, and definition of done;
|
||||
- matched required repository-specific references including `internal/webdevice/`, `managed/people_flow_project/`, `managed/store_dwell_alert/`, `tasks/todo.md`, and `tasks/lessons.md`.
|
||||
- Status: in progress.
|
||||
- Result: pending.
|
||||
- Verification: pending.
|
||||
|
||||
Reference in New Issue
Block a user