Files
managed-portal/managed/people_flow_project/main.py
2026-04-27 10:04:36 +08:00

137 lines
4.4 KiB
Python

from __future__ import annotations
import argparse
from pathlib import Path
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="People-flow counting with YOLO tracking and DeepFace demographics."
)
parser.add_argument(
"--config",
default="configs/default_config.yaml",
help="Path to the YAML config file.",
)
parser.add_argument(
"--output-dir",
default=None,
help="Directory for generated artifacts.",
)
parser.add_argument(
"--line",
help="Override counting line as x1,y1,x2,y2.",
)
parser.add_argument(
"--line-mode",
choices=["normalized", "pixel"],
help="Coordinate mode for --line.",
)
parser.add_argument(
"--device",
help="Override inference device, for example cuda:0 or cpu.",
)
subparsers = parser.add_subparsers(dest="command", required=True)
video_parser = subparsers.add_parser("video", help="Process one video.")
video_parser.add_argument("--input", required=True, help="Path to the video file.")
video_parser.add_argument(
"--skip-video-save",
action="store_true",
help="Do not write the annotated video.",
)
batch_parser = subparsers.add_parser("batch", help="Process a directory of videos.")
batch_parser.add_argument(
"--input-dir",
required=True,
help="Directory scanned recursively for videos.",
)
batch_parser.add_argument(
"--pattern",
default="*.mp4",
help="Glob pattern used during recursive discovery.",
)
batch_parser.add_argument(
"--skip-video-save",
action="store_true",
help="Do not write annotated videos.",
)
rtsp_parser = subparsers.add_parser("rtsp", help="Process a live RTSP stream.")
rtsp_parser.add_argument("--input", help="RTSP URL.")
manage_api_parser = subparsers.add_parser("manage-api", help="Start the management API.")
manage_api_parser.add_argument("--host", default="0.0.0.0", help="Host for the management API.")
manage_api_parser.add_argument("--port", type=int, default=18082, help="Port for the management API.")
return parser
def build_config(args: argparse.Namespace):
from src.people_flow.config import load_config, merge_cli_overrides
save_video = None
if hasattr(args, "skip_video_save"):
save_video = not args.skip_video_save
config = load_config(Path(args.config))
return merge_cli_overrides(
config=config,
line=args.line,
line_mode=args.line_mode,
device=args.device,
save_video=save_video,
)
def main() -> int:
parser = build_parser()
args = parser.parse_args()
if args.command == "manage-api":
from src.people_flow.manage_api import run_manage_api
run_manage_api(args.config, host=args.host, port=args.port)
return 0
config = build_config(args)
from src.people_flow.pipeline import PeopleFlowPipeline, discover_videos
output_root = Path(args.output_dir or config.runtime.output_dir)
pipeline = PeopleFlowPipeline(config=config, output_root=output_root)
if args.command == "rtsp":
paths = pipeline.get_rtsp_output_paths()
print(f"rtsp_output_dir={paths['root']}", flush=True)
print(f"latest_json={paths['latest_json']}", flush=True)
source = args.input or config.runtime.rtsp_url
if not source:
raise SystemExit("RTSP source is required. Pass --input or set runtime.rtsp_url in the config.")
pipeline.process_rtsp(source)
return 0
if args.command == "video":
result = pipeline.process_video(Path(args.input))
print(f"processed_video={result['video_name']}")
print(f"total_people={result['total_people']}")
print(f"unknown_attributes={result['unknown_attributes']}")
print(f"json={result['json_path']}")
if result.get("video_output_path"):
print(f"annotated_video={result['video_output_path']}")
return 0
videos = discover_videos(Path(args.input_dir), pattern=args.pattern)
if not videos:
raise SystemExit(f"No videos found under {args.input_dir} with pattern {args.pattern}")
summary = pipeline.process_batch(videos)
print(f"videos_processed={len(summary['videos'])}")
print(f"csv={summary['csv_path']}")
return 0
if __name__ == "__main__":
raise SystemExit(main())