Files
2026-06-17 11:33:54 +08:00

451 lines
15 KiB
Python

from __future__ import annotations
import json
import os
import re
from datetime import datetime
from pathlib import Path
from typing import Any
from urllib.parse import urlparse, urlunparse
import urllib.request
from zoneinfo import ZoneInfo
from .manifest import read_jsonl, write_manifest
from .paths import hik_cloud_download_path
# Zone used for string timestamps when ``runtime.timezone`` is not configured.
DEFAULT_TIMEZONE = "Asia/Shanghai"
# Chunk length used when ``hik_cloud.chunk_seconds`` is not configured.
DEFAULT_CHUNK_SECONDS = 600
# Largest chunk length accepted by build_download_chunks.
MAX_CHUNK_SECONDS = 3600
# Base URL and path joined to form the download-address request URL.
DEFAULT_API_BASE_URL = "https://api2.hik-cloud.com"
DEFAULT_DOWNLOAD_PATH = "/v1/carrier/cstorage/open/play/download"
# Timeout for the address request when ``timeout_seconds`` is not configured.
DEFAULT_TIMEOUT_SECONDS = 60
# Timeout for fetching a recording when ``download_timeout_seconds`` is not
# configured.
DEFAULT_DOWNLOAD_TIMEOUT_SECONDS = 600
# File name of the JSONL manifest written inside the output directory.
DOWNLOAD_MANIFEST_NAME = "hik_cloud_download_manifest.jsonl"
# API response code that is reported as status "no_recording" instead of
# "address_failed".
NO_RECORDING_CODE = 80438027
# strptime format expected for string time values.
TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
def parse_hik_time(value: str | int | float, timezone: str = DEFAULT_TIMEZONE) -> int:
    """Convert a configured time value into an integer timestamp.

    Numbers are passed through ``int()`` unchanged (no rescaling or range
    check). Strings are parsed with ``TIME_FORMAT`` and interpreted as
    wall-clock time in ``timezone`` before being converted to POSIX seconds.

    Args:
        value: A number, or a string matching ``TIME_FORMAT``.
        timezone: Zone name handed to ``ZoneInfo`` for string values.

    Returns:
        The timestamp as an ``int``.

    Raises:
        ValueError: If ``value`` is a ``bool``, is neither a number nor a
            string, or is a string that does not match ``TIME_FORMAT``.
    """
    # bool is a subclass of int, so it has to be rejected explicitly.
    if isinstance(value, bool) or not isinstance(value, (int, float, str)):
        raise ValueError(f"unsupported time value: {value!r}")
    if not isinstance(value, str):
        return int(value)
    naive = datetime.strptime(value, TIME_FORMAT)
    localized = naive.replace(tzinfo=ZoneInfo(timezone))
    return int(localized.timestamp())
def build_download_chunks(config: dict[str, Any]) -> list[dict[str, Any]]:
    """Split every configured time range into per-device download chunks.

    Reads ``config["hik_cloud"]`` (``chunk_seconds``, ``devices``,
    ``time_ranges``) and ``config["runtime"]["timezone"]``. Sections or lists
    that are missing or null are treated as empty.

    Args:
        config: Full configuration mapping.

    Returns:
        One dict per chunk, ordered by device, then time range, then start
        time. Each dict carries ``device_serial``, ``channel_no``, the
        enclosing ``requested_begin``/``requested_end`` and the chunk's own
        ``time_begin``/``time_end``. The last chunk of a range is shortened
        so it never extends past ``requested_end``.

    Raises:
        ValueError: If ``chunk_seconds`` is not in ``1..MAX_CHUNK_SECONDS``,
            a range's end is not after its begin, or a time value cannot be
            parsed.
        KeyError: If a device or time range lacks a required key.
    """
    hik_config = config.get("hik_cloud") or {}
    runtime_config = config.get("runtime") or {}
    timezone = runtime_config.get("timezone") or DEFAULT_TIMEZONE

    # Only an absent/null value falls back to the default; an explicit 0 must
    # still reach the validation below.
    raw_chunk_seconds = hik_config.get("chunk_seconds")
    chunk_seconds = (
        DEFAULT_CHUNK_SECONDS if raw_chunk_seconds is None else int(raw_chunk_seconds)
    )
    if chunk_seconds <= 0:
        raise ValueError("chunk_seconds must be greater than 0")
    if chunk_seconds > MAX_CHUNK_SECONDS:
        raise ValueError(
            f"chunk_seconds must be less than or equal to {MAX_CHUNK_SECONDS}"
        )

    # Parse and validate each range once instead of once per device. This
    # also means invalid ranges are reported even if no device is configured.
    windows: list[tuple[int, int]] = []
    for time_range in hik_config.get("time_ranges") or []:
        requested_begin = parse_hik_time(time_range["begin"], timezone)
        requested_end = parse_hik_time(time_range["end"], timezone)
        if requested_end <= requested_begin:
            raise ValueError("time range end must be after begin")
        windows.append((requested_begin, requested_end))

    chunks: list[dict[str, Any]] = []
    for device in hik_config.get("devices") or []:
        for requested_begin, requested_end in windows:
            for time_begin in range(requested_begin, requested_end, chunk_seconds):
                chunks.append(
                    {
                        "device_serial": device["device_serial"],
                        "channel_no": device["channel_no"],
                        "requested_begin": requested_begin,
                        "requested_end": requested_end,
                        "time_begin": time_begin,
                        "time_end": min(time_begin + chunk_seconds, requested_end),
                    }
                )
    return chunks
def resolve_access_token(config_or_hik_config: dict[str, Any]) -> str:
    """Return the access token to send to the Hik-Cloud API.

    An inline ``access_token`` wins; otherwise the environment variable named
    by ``access_token_env`` is consulted. Empty values count as missing.

    Args:
        config_or_hik_config: Either the full configuration or just its
            ``hik_cloud`` section.

    Returns:
        The token as a string.

    Raises:
        ValueError: If neither source yields a non-empty token.
    """
    hik_config = _hik_config(config_or_hik_config)

    inline_token = hik_config.get("access_token")
    if inline_token:
        return str(inline_token)

    env_name = hik_config.get("access_token_env")
    token_from_env = os.environ.get(str(env_name)) if env_name else None
    if token_from_env:
        return token_from_env

    raise ValueError(
        "missing hik_cloud access_token; configure access_token or access_token_env"
    )
def request_download_address(
    chunk: dict[str, Any],
    hik_config: dict[str, Any],
    *,
    http_post: Any | None = None,
) -> dict[str, Any]:
    """Ask the API for the download address of one chunk.

    The request is a JSON POST to ``api_base_url + download_path`` with an
    ``Authorization: bearer <token>`` header. Transport errors and malformed
    or unsuccessful responses are reported in the returned dict rather than
    raised, so one bad chunk does not abort the caller's loop.

    Args:
        chunk: Chunk dict with ``device_serial``, ``channel_no``,
            ``time_begin`` and ``time_end``.
        hik_config: The ``hik_cloud`` configuration (or the full config).
        http_post: Optional replacement for ``_post_json``; called as
            ``http_post(url, json_body, headers, timeout_seconds)``.

    Returns:
        The chunk metadata plus ``status`` and ``code``. On success
        (``status == "address_ok"``) it also has ``url``, ``actual_begin``
        and ``actual_end``; otherwise ``status`` is ``"no_recording"`` or
        ``"address_failed"`` and ``last_error`` holds a sanitized message.

    Raises:
        ValueError: If no access token is configured.
    """
    token = resolve_access_token(hik_config)
    api_base_url = str(hik_config.get("api_base_url") or DEFAULT_API_BASE_URL)
    download_path = str(hik_config.get("download_path") or DEFAULT_DOWNLOAD_PATH)
    # Keep exactly one slash between base URL and path even if the configured
    # path omits its leading slash.
    if not download_path.startswith("/"):
        download_path = "/" + download_path
    url = api_base_url.rstrip("/") + download_path
    headers = {
        "Authorization": f"bearer {token}",
        "Content-Type": "application/json",
    }
    json_body = {
        "deviceSerial": chunk["device_serial"],
        "channelNo": chunk["channel_no"],
        "timeBegin": chunk["time_begin"],
        "timeEnd": chunk["time_end"],
    }
    timeout_seconds = int(hik_config.get("timeout_seconds", DEFAULT_TIMEOUT_SECONDS))
    post = http_post or _post_json

    def failure(
        code: int | None,
        last_error: str | None,
        status: str = "address_failed",
    ) -> dict[str, Any]:
        return {
            **_chunk_metadata(chunk),
            "status": status,
            "code": code,
            "last_error": last_error,
        }

    def lenient_int(value: Any) -> int | None:
        # A malformed timestamp should not discard an otherwise usable URL.
        try:
            return _optional_int(value)
        except (TypeError, ValueError):
            return None

    try:
        response = post(url, json_body, headers, timeout_seconds)
    except Exception as exc:  # pragma: no cover - exact urllib failures vary.
        return failure(None, _sanitize_error(exc, token))

    if not isinstance(response, dict):
        return failure(
            None,
            _sanitize_error(
                f"unexpected hik api response: {type(response).__name__}", token
            ),
        )

    try:
        code = _optional_int(response.get("code"))
    except (TypeError, ValueError):
        # Non-numeric code: report the raw response instead of crashing.
        return failure(None, _api_error_message(response, token))

    if code != 0:
        status = "no_recording" if code == NO_RECORDING_CODE else "address_failed"
        return failure(code, _api_error_message(response, token), status)

    data = response.get("data")
    if not isinstance(data, dict):
        data = {}
    download_address = data.get("url")
    if not download_address:
        # A success code without a URL cannot be downloaded; do not report
        # it as address_ok.
        return failure(code, "hik api returned no download url")

    return {
        **_chunk_metadata(chunk),
        "status": "address_ok",
        "code": code,
        "url": download_address,
        "actual_begin": lenient_int(data.get("actualBeginTime")),
        "actual_end": lenient_int(data.get("actualEndTime")),
    }
def download_hik_cloud_recordings(
    config: dict[str, Any],
    output_dir: str | Path,
    *,
    address_client: Any | None = None,
    download_url: Any | None = None,
    download: bool = True,
) -> list[dict[str, Any]]:
    """Download every configured chunk and record the outcome in a manifest.

    For each chunk from ``build_download_chunks`` the function requests a
    download address, optionally fetches the recording into ``output_dir``,
    and upserts one manifest record describing the result. The manifest
    (``DOWNLOAD_MANIFEST_NAME`` inside ``output_dir``) is written when the
    loop ends, including when it ends because of an exception or interrupt,
    so completed downloads remain resumable.

    Args:
        config: Full configuration. ``config["output"]["resume"]`` enables
            reuse of chunks that the existing manifest marks as downloaded
            and whose file still exists.
        output_dir: Directory that receives recordings and the manifest.
        address_client: Optional replacement for ``request_download_address``;
            called as ``address_client(chunk, hik_config)``.
        download_url: Optional replacement for ``_download_url``; called as
            ``download_url(url, timeout_seconds=...)`` and expected to return
            bytes.
        download: When false, only addresses are requested and recorded; no
            file is fetched and resumable records are not reused.

    Returns:
        Video records for chunks that were downloaded in this run or reused
        from the manifest.
    """
    output_path = Path(output_dir).expanduser().resolve(strict=False)
    manifest_path = output_path / DOWNLOAD_MANIFEST_NAME
    hik_config = _hik_config(config)
    chunks = build_download_chunks(config)
    resume = bool((config.get("output") or {}).get("resume", False))
    manifest_records = read_jsonl(manifest_path) if resume else []
    existing_downloads = {
        _manifest_key(record): record
        for record in manifest_records
        if _is_resumable_download(record)
    }
    get_address = address_client or request_download_address
    fetch = download_url or _download_url
    download_timeout_seconds = int(
        hik_config.get("download_timeout_seconds", DEFAULT_DOWNLOAD_TIMEOUT_SECONDS)
    )
    token = _redaction_token(hik_config)
    video_records: list[dict[str, Any]] = []

    def remember(
        chunk: dict[str, Any],
        address_result: dict[str, Any],
        status: str,
        **extra: Any,
    ) -> None:
        # Single place that turns an outcome into a manifest entry.
        _upsert_manifest_record(
            manifest_records,
            _manifest_record(
                chunk,
                address_result,
                status=status,
                token=token,
                **extra,
            ),
        )

    try:
        for chunk in chunks:
            existing_record = existing_downloads.get(_chunk_key(chunk))
            if download and existing_record is not None:
                video_records.append(_video_record_from_manifest(existing_record))
                continue

            address_result = get_address(chunk, hik_config)
            status = address_result.get("status")
            if status != "address_ok":
                remember(chunk, address_result, str(status or "address_failed"))
                continue
            if not download:
                remember(chunk, address_result, "address_ok")
                continue

            url = str(address_result.get("url") or "")
            target_path = hik_cloud_download_path(
                output_path,
                str(chunk["device_serial"]),
                chunk["channel_no"],
                int(chunk["time_begin"]),
                int(chunk["time_end"]),
            )
            try:
                payload = fetch(url, timeout_seconds=download_timeout_seconds)
                target_path.parent.mkdir(parents=True, exist_ok=True)
                target_path.write_bytes(payload)
            except Exception as exc:  # pragma: no cover - concrete network failures vary.
                remember(
                    chunk,
                    address_result,
                    "download_failed",
                    path=target_path,
                    last_error=_sanitize_error(exc, token),
                )
                continue

            video_records.append(
                _downloaded_video_record(chunk, address_result, target_path)
            )
            remember(chunk, address_result, "downloaded", path=target_path)
    finally:
        # Persist whatever was recorded even if the loop was cut short;
        # otherwise finished downloads would be invisible to a resumed run.
        write_manifest(manifest_path, manifest_records)
    return video_records
def _post_json(
    url: str,
    json_body: dict[str, Any],
    headers: dict[str, str],
    timeout_seconds: int,
) -> dict[str, Any]:
    """POST ``json_body`` as UTF-8 JSON and return the decoded JSON reply.

    Args:
        url: Target URL.
        json_body: Object serialized with ``json.dumps`` as the request body.
        headers: Request headers.
        timeout_seconds: Timeout passed to ``urlopen``.

    Returns:
        The response body parsed as JSON.
    """
    encoded_body = json.dumps(json_body).encode("utf-8")
    request = urllib.request.Request(
        url, data=encoded_body, headers=headers, method="POST"
    )
    with urllib.request.urlopen(request, timeout=timeout_seconds) as response:
        raw_reply = response.read()
    return json.loads(raw_reply.decode("utf-8"))
def _download_url(url: str, *, timeout_seconds: int | None = None) -> bytes:
with urllib.request.urlopen(url, timeout=timeout_seconds) as response:
return response.read()
def _hik_config(config_or_hik_config: dict[str, Any]) -> dict[str, Any]:
hik_config = config_or_hik_config.get("hik_cloud")
if isinstance(hik_config, dict):
return hik_config
return config_or_hik_config
def _chunk_metadata(chunk: dict[str, Any]) -> dict[str, Any]:
return {
"device_serial": chunk["device_serial"],
"channel_no": chunk["channel_no"],
"requested_begin": chunk.get("requested_begin"),
"requested_end": chunk.get("requested_end"),
"time_begin": chunk["time_begin"],
"time_end": chunk["time_end"],
}
def _optional_int(value: Any) -> int | None:
if value is None or value == "":
return None
return int(value)
def _api_error_message(response: dict[str, Any], token: str) -> str:
    """Build a sanitized error string from an API response.

    The text is taken from ``msg``, then ``message``, then a fixed fallback,
    and is prefixed with the response ``code``.
    """
    detail = response.get("msg") or response.get("message") or "hik api error"
    return _sanitize_error(f"hik api code {response.get('code')}: {detail}", token)
def _sanitize_error(value: Any, token: str = "") -> str | None:
if value is None:
return None
message = str(value)
for raw_url in re.findall(r"https?://[^\s'\"<>]+", message):
parsed = urlparse(raw_url)
sanitized_url = urlunparse(
(parsed.scheme, parsed.netloc, parsed.path, "", "", "")
)
message = message.replace(raw_url, sanitized_url)
message = re.sub(
r"\b(?:sign|sig|token|access_token)=[^&\s'\"<>]+",
"[redacted-query]",
message,
flags=re.IGNORECASE,
)
if token:
message = message.replace(token, "[redacted]")
message = message.replace("Authorization", "[redacted-header]")
return message
def _downloaded_video_record(
    chunk: dict[str, Any],
    address_result: dict[str, Any],
    path: Path,
) -> dict[str, Any]:
    """Describe a freshly downloaded chunk as a video record.

    ``requested_begin``/``requested_end`` hold the chunk's own
    ``time_begin``/``time_end``; the actual times come from
    ``address_result``.
    """
    video = {
        "source": "hik_cloud",
        "path": str(path),
        "source_path": _source_path(chunk),
    }
    video["device_serial"] = chunk["device_serial"]
    video["channel_no"] = chunk["channel_no"]
    video["requested_begin"] = chunk["time_begin"]
    video["requested_end"] = chunk["time_end"]
    video["actual_begin"] = address_result.get("actual_begin")
    video["actual_end"] = address_result.get("actual_end")
    # A fresh download always starts with a clean status.
    video.update(status="downloaded", retry_count=0, last_error=None)
    return video
def _manifest_record(
    chunk: dict[str, Any],
    address_result: dict[str, Any],
    *,
    status: str,
    token: str,
    path: Path | None = None,
    last_error: str | None = None,
) -> dict[str, Any]:
    """Build the manifest entry for one chunk outcome.

    Args:
        chunk: The chunk that was processed.
        address_result: Result of the address request for that chunk.
        status: Outcome to record.
        token: Access token scrubbed from the error text.
        path: Target file, if one was chosen.
        last_error: Error text; falls back to ``address_result["last_error"]``.

    Returns:
        The record. Only the host of the download URL is stored
        (``download_url_host``), ``code`` is copied when present, and
        ``source_path`` is added for ``"downloaded"`` records.
    """
    entry: dict[str, Any] = {
        "source": "hik_cloud",
        "device_serial": chunk["device_serial"],
        "channel_no": chunk["channel_no"],
        "requested_begin": chunk["time_begin"],
        "requested_end": chunk["time_end"],
        "actual_begin": address_result.get("actual_begin"),
        "actual_end": address_result.get("actual_end"),
        "path": None if path is None else str(path),
        "status": status,
        "retry_count": 0,
        "last_error": _sanitize_error(
            last_error or address_result.get("last_error"), token
        ),
    }

    download_address = address_result.get("url")
    if download_address:
        entry["download_url_host"] = urlparse(str(download_address)).netloc
    if "code" in address_result:
        entry["code"] = address_result.get("code")
    if status == "downloaded":
        entry["source_path"] = _source_path(chunk)
    return entry
def _source_path(chunk: dict[str, Any]) -> str:
time_begin = chunk.get("time_begin", chunk.get("requested_begin"))
time_end = chunk.get("time_end", chunk.get("requested_end"))
return (
f"hik_cloud://{chunk['device_serial']}/ch{chunk['channel_no']}/"
f"{int(time_begin)}-{int(time_end)}"
)
def _is_resumable_download(record: dict[str, Any]) -> bool:
path = record.get("path")
return (
record.get("status") == "downloaded"
and isinstance(path, str)
and Path(path).exists()
)
def _video_record_from_manifest(record: dict[str, Any]) -> dict[str, Any]:
return {
"source": "hik_cloud",
"path": record["path"],
"source_path": record.get("source_path") or _source_path(record),
"device_serial": record["device_serial"],
"channel_no": record["channel_no"],
"requested_begin": record["requested_begin"],
"requested_end": record["requested_end"],
"actual_begin": record.get("actual_begin"),
"actual_end": record.get("actual_end"),
"status": "downloaded",
"retry_count": record.get("retry_count", 0),
"last_error": record.get("last_error"),
}
def _upsert_manifest_record(
    records: list[dict[str, Any]],
    new_record: dict[str, Any],
) -> None:
    """Insert ``new_record`` into ``records`` in place.

    The first existing record with the same ``_manifest_key`` is replaced;
    if there is none, ``new_record`` is appended.
    """
    wanted_key = _manifest_key(new_record)
    position = next(
        (
            index
            for index, existing in enumerate(records)
            if _manifest_key(existing) == wanted_key
        ),
        None,
    )
    if position is None:
        records.append(new_record)
    else:
        records[position] = new_record
def _chunk_key(chunk: dict[str, Any]) -> tuple[Any, Any, Any, Any]:
return (
chunk.get("device_serial"),
chunk.get("channel_no"),
chunk.get("time_begin"),
chunk.get("time_end"),
)
def _manifest_key(record: dict[str, Any]) -> tuple[Any, Any, Any, Any]:
return (
record.get("device_serial"),
record.get("channel_no"),
record.get("requested_begin"),
record.get("requested_end"),
)
def _redaction_token(hik_config: dict[str, Any]) -> str:
token = hik_config.get("access_token")
if token:
return str(token)
token_env = hik_config.get("access_token_env")
if token_env:
return os.environ.get(str(token_env), "")
return ""