Files
video-ai-analysis/tests/test_hik_cloud.py
2026-06-17 11:33:54 +08:00

555 lines
22 KiB
Python

import os
import tempfile
import unittest
from datetime import datetime
from pathlib import Path
from unittest.mock import patch
from zoneinfo import ZoneInfo
from video_ai_analysis_poc import hik_cloud
from video_ai_analysis_poc.hik_cloud import (
build_download_chunks,
request_download_address,
resolve_access_token,
)
from video_ai_analysis_poc.manifest import read_jsonl, write_manifest
class HikCloudTests(unittest.TestCase):
def test_build_download_chunks_defaults_to_600_second_chunks(self):
config = {
"runtime": {"timezone": "Asia/Shanghai"},
"hik_cloud": {
"devices": [
{
"device_serial": "EXAMPLE_DEVICE_SERIAL",
"channel_no": 1,
"name": "front",
}
],
"time_ranges": [
{
"begin": "2026-02-03 09:00:00",
"end": "2026-02-03 10:30:00",
}
],
},
}
chunks = build_download_chunks(config)
requested_begin = int(
datetime(2026, 2, 3, 9, 0, 0, tzinfo=ZoneInfo("Asia/Shanghai")).timestamp()
)
requested_end = int(
datetime(2026, 2, 3, 10, 30, 0, tzinfo=ZoneInfo("Asia/Shanghai")).timestamp()
)
self.assertEqual(len(chunks), 9)
self.assertEqual(chunks[0]["time_begin"], requested_begin)
self.assertEqual(chunks[0]["time_end"], requested_begin + 600)
self.assertEqual(chunks[-1]["time_begin"], requested_begin + 4800)
self.assertEqual(chunks[-1]["time_end"], requested_end)
for chunk in chunks:
self.assertLessEqual(chunk["time_end"] - chunk["time_begin"], 600)
def test_build_download_chunks_allows_explicit_3600_second_chunks(self):
config = {
"runtime": {"timezone": "Asia/Shanghai"},
"hik_cloud": {
"chunk_seconds": 3600,
"devices": [{"device_serial": "EXAMPLE_DEVICE_SERIAL", "channel_no": 1}],
"time_ranges": [
{
"begin": "2026-02-03 09:00:00",
"end": "2026-02-03 10:30:00",
}
],
},
}
chunks = build_download_chunks(config)
requested_begin = int(
datetime(2026, 2, 3, 9, 0, 0, tzinfo=ZoneInfo("Asia/Shanghai")).timestamp()
)
requested_end = int(
datetime(2026, 2, 3, 10, 30, 0, tzinfo=ZoneInfo("Asia/Shanghai")).timestamp()
)
self.assertEqual(len(chunks), 2)
self.assertEqual(chunks[0]["time_begin"], requested_begin)
self.assertEqual(chunks[0]["time_end"], requested_begin + 3600)
self.assertEqual(chunks[1]["time_begin"], requested_begin + 3600)
self.assertEqual(chunks[1]["time_end"], requested_end)
for chunk in chunks:
self.assertLessEqual(chunk["time_end"] - chunk["time_begin"], 3600)
def test_build_download_chunks_accepts_epoch_time_ranges(self):
config = {
"hik_cloud": {
"devices": [{"device_serial": "EXAMPLE_DEVICE_SERIAL", "channel_no": 1}],
"time_ranges": [{"begin": 1770080400, "end": 1770084000.0}],
}
}
chunks = build_download_chunks(config)
self.assertEqual(len(chunks), 6)
self.assertEqual(chunks[0]["time_begin"], 1770080400)
self.assertEqual(chunks[0]["time_end"], 1770081000)
self.assertEqual(chunks[-1]["time_begin"], 1770083400)
self.assertEqual(chunks[-1]["time_end"], 1770084000)
def test_build_download_chunks_rejects_end_before_begin(self):
config = {
"hik_cloud": {
"devices": [{"device_serial": "EXAMPLE_DEVICE_SERIAL", "channel_no": 1}],
"time_ranges": [
{
"begin": "2026-02-03 10:30:00",
"end": "2026-02-03 09:00:00",
}
],
},
}
with self.assertRaisesRegex(ValueError, "end must be after begin"):
build_download_chunks(config)
def test_build_download_chunks_rejects_chunk_seconds_over_3600(self):
config = {
"hik_cloud": {
"chunk_seconds": 7200,
"devices": [{"device_serial": "EXAMPLE_DEVICE_SERIAL", "channel_no": 1}],
"time_ranges": [
{
"begin": "2026-02-03 09:00:00",
"end": "2026-02-03 11:30:00",
}
],
},
}
with self.assertRaisesRegex(
ValueError, "chunk_seconds must be less than or equal to 3600"
):
build_download_chunks(config)
def test_resolve_access_token_prefers_literal_token_over_environment(self):
config = {
"hik_cloud": {
"access_token": "DIRECT_TOKEN",
"access_token_env": "HIK_CLOUD_ACCESS_TOKEN",
}
}
with patch.dict(os.environ, {"HIK_CLOUD_ACCESS_TOKEN": "ENV_TOKEN"}):
token = resolve_access_token(config)
self.assertEqual(token, "DIRECT_TOKEN")
def test_resolve_access_token_reads_configured_environment_variable(self):
hik_config = {"access_token_env": "HIK_CLOUD_ACCESS_TOKEN"}
with patch.dict(os.environ, {"HIK_CLOUD_ACCESS_TOKEN": "ENV_TOKEN"}):
token = resolve_access_token(hik_config)
self.assertEqual(token, "ENV_TOKEN")
def test_resolve_access_token_raises_without_leaking_secret_values(self):
hik_config = {"access_token_env": "HIK_CLOUD_ACCESS_TOKEN"}
with patch.dict(os.environ, {}, clear=True):
with self.assertRaises(ValueError) as raised:
resolve_access_token(hik_config)
message = str(raised.exception)
self.assertIn("access_token", message)
self.assertNotIn("TOKEN", message)
def test_request_download_address_posts_expected_request_and_returns_success(self):
chunk = {
"device_serial": "EXAMPLE_DEVICE_SERIAL",
"channel_no": 1,
"requested_begin": 1764856787,
"requested_end": 1764856978,
"time_begin": 1764856787,
"time_end": 1764856978,
}
hik_config = {
"api_base_url": "https://api2.hik-cloud.com/",
"download_path": "/v1/carrier/cstorage/open/play/download",
"access_token": "TOKEN",
"timeout_seconds": 12,
}
calls = []
def fake_http_post(url, json_body, headers, timeout_seconds):
calls.append(
{
"url": url,
"json_body": json_body,
"headers": headers,
"timeout_seconds": timeout_seconds,
}
)
return {
"code": 0,
"success": True,
"data": {
"url": "https://download.example/video.mp4?sig=abc",
"actualBeginTime": "1764856787",
"actualEndTime": "1764856978",
},
}
result = request_download_address(chunk, hik_config, http_post=fake_http_post)
self.assertEqual(len(calls), 1)
self.assertEqual(
calls[0]["url"],
"https://api2.hik-cloud.com/v1/carrier/cstorage/open/play/download",
)
self.assertEqual(calls[0]["headers"]["Authorization"], "bearer TOKEN")
self.assertEqual(calls[0]["headers"]["Content-Type"], "application/json")
self.assertEqual(
calls[0]["json_body"],
{
"deviceSerial": "EXAMPLE_DEVICE_SERIAL",
"channelNo": 1,
"timeBegin": 1764856787,
"timeEnd": 1764856978,
},
)
self.assertEqual(calls[0]["timeout_seconds"], 12)
self.assertEqual(result["status"], "address_ok")
self.assertEqual(result["url"], "https://download.example/video.mp4?sig=abc")
self.assertEqual(result["actual_begin"], 1764856787)
self.assertEqual(result["actual_end"], 1764856978)
self.assertEqual(result["device_serial"], "EXAMPLE_DEVICE_SERIAL")
self.assertEqual(result["channel_no"], 1)
self.assertEqual(result["requested_begin"], 1764856787)
self.assertEqual(result["requested_end"], 1764856978)
def test_request_download_address_returns_no_recording_for_known_empty_code(self):
chunk = {
"device_serial": "EXAMPLE_DEVICE_SERIAL",
"channel_no": 1,
"requested_begin": 1764856787,
"requested_end": 1764856978,
"time_begin": 1764856787,
"time_end": 1764856978,
}
hik_config = {
"api_base_url": "https://api2.hik-cloud.com",
"download_path": "/v1/carrier/cstorage/open/play/download",
"access_token": "TOKEN",
}
def fake_http_post(url, json_body, headers, timeout_seconds):
return {"code": 80438027, "msg": "no recording"}
result = request_download_address(chunk, hik_config, http_post=fake_http_post)
self.assertEqual(result["status"], "no_recording")
self.assertEqual(result["code"], 80438027)
self.assertEqual(result["device_serial"], "EXAMPLE_DEVICE_SERIAL")
self.assertNotIn("url", result)
def test_request_download_address_returns_sanitized_failure_for_other_codes(self):
chunk = {
"device_serial": "EXAMPLE_DEVICE_SERIAL",
"channel_no": 1,
"requested_begin": 1764856787,
"requested_end": 1764856978,
"time_begin": 1764856787,
"time_end": 1764856978,
}
hik_config = {
"api_base_url": "https://api2.hik-cloud.com",
"download_path": "/v1/carrier/cstorage/open/play/download",
"access_token": "TOKEN",
}
def fake_http_post(url, json_body, headers, timeout_seconds):
return {"code": 80430002, "msg": "bad TOKEN Authorization request"}
result = request_download_address(chunk, hik_config, http_post=fake_http_post)
self.assertEqual(result["status"], "address_failed")
self.assertEqual(result["code"], 80430002)
self.assertIn("last_error", result)
self.assertNotIn("TOKEN", str(result))
self.assertNotIn("Authorization", str(result))
def test_download_hik_cloud_recordings_writes_file_records_and_manifest(self):
with tempfile.TemporaryDirectory() as tmp:
output_dir = Path(tmp)
config = _download_config()
address_calls = []
download_calls = []
def fake_address_client(chunk, hik_config):
address_calls.append((chunk, hik_config))
return {
**chunk,
"status": "address_ok",
"url": (
"https://download.example/video.mp4?"
"sign=SECRET&sig=SECRET&TOKEN=SECRET"
),
"actual_begin": chunk["time_begin"] + 1,
"actual_end": chunk["time_end"] - 1,
}
def fake_download_url(url, timeout_seconds=None):
download_calls.append((url, timeout_seconds))
return b"fake mp4 bytes"
records = hik_cloud.download_hik_cloud_recordings(
config,
output_dir,
address_client=fake_address_client,
download_url=fake_download_url,
)
self.assertEqual(len(address_calls), 1)
self.assertEqual(len(download_calls), 1)
self.assertEqual(download_calls[0][1], 600)
expected_path = (
output_dir
/ "downloads"
/ "hik_cloud"
/ "EXAMPLE_DEVICE_SERIAL"
/ "ch1"
/ "EXAMPLE_DEVICE_SERIAL_ch1_1764856787_1764856978.mp4"
).resolve(strict=False)
self.assertEqual(expected_path.read_bytes(), b"fake mp4 bytes")
self.assertEqual(len(records), 1)
self.assertEqual(records[0]["path"], str(expected_path))
self.assertEqual(records[0]["source"], "hik_cloud")
self.assertEqual(records[0]["source_path"], "hik_cloud://EXAMPLE_DEVICE_SERIAL/ch1/1764856787-1764856978")
self.assertEqual(records[0]["device_serial"], "EXAMPLE_DEVICE_SERIAL")
self.assertEqual(records[0]["channel_no"], 1)
self.assertEqual(records[0]["requested_begin"], 1764856787)
self.assertEqual(records[0]["requested_end"], 1764856978)
self.assertEqual(records[0]["actual_begin"], 1764856788)
self.assertEqual(records[0]["actual_end"], 1764856977)
self.assertEqual(records[0]["status"], "downloaded")
manifest = read_jsonl(output_dir / "hik_cloud_download_manifest.jsonl")
self.assertEqual(len(manifest), 1)
self.assertEqual(manifest[0]["status"], "downloaded")
self.assertIsNone(manifest[0]["last_error"])
self.assertEqual(manifest[0]["download_url_host"], "download.example")
self.assertEqual(manifest[0]["path"], str(expected_path))
serialized_path = expected_path.name
serialized_manifest = str(manifest)
self.assertNotIn("sign=", serialized_path)
self.assertNotIn("sig=", serialized_path)
self.assertNotIn("TOKEN", serialized_path)
self.assertNotIn("sign=", serialized_manifest)
self.assertNotIn("sig=", serialized_manifest)
self.assertNotIn("TOKEN", serialized_manifest)
def test_download_hik_cloud_recordings_can_plan_without_downloading(self):
with tempfile.TemporaryDirectory() as tmp:
output_dir = Path(tmp)
config = _download_config()
download_calls = []
def fake_address_client(chunk, hik_config):
return {
**chunk,
"status": "address_ok",
"url": (
"https://download.example/video.mp4?"
"sign=SECRET&sig=SECRET&TOKEN=SECRET"
),
"actual_begin": chunk["time_begin"],
"actual_end": chunk["time_end"],
}
def fake_download_url(url, timeout_seconds=None):
download_calls.append(url)
return b"unexpected"
records = hik_cloud.download_hik_cloud_recordings(
config,
output_dir,
address_client=fake_address_client,
download_url=fake_download_url,
download=False,
)
self.assertEqual(records, [])
self.assertEqual(download_calls, [])
manifest = read_jsonl(output_dir / "hik_cloud_download_manifest.jsonl")
self.assertEqual(len(manifest), 1)
self.assertEqual(manifest[0]["status"], "address_ok")
self.assertIsNone(manifest[0]["path"])
self.assertEqual(manifest[0]["download_url_host"], "download.example")
self.assertNotIn("sign=", str(manifest))
self.assertNotIn("sig=", str(manifest))
self.assertNotIn("TOKEN", str(manifest))
def test_download_hik_cloud_recordings_records_empty_and_address_failures(self):
with tempfile.TemporaryDirectory() as tmp:
output_dir = Path(tmp)
config = _download_config(
time_ranges=[
{"begin": 1764856787, "end": 1764856978},
{"begin": 1764857000, "end": 1764857100},
]
)
statuses = ["no_recording", "address_failed"]
download_calls = []
def fake_address_client(chunk, hik_config):
status = statuses.pop(0)
return {
**chunk,
"status": status,
"actual_begin": None,
"actual_end": None,
"last_error": None if status == "no_recording" else "api failed",
}
def fake_download_url(url, timeout_seconds=None):
download_calls.append(url)
return b"unexpected"
records = hik_cloud.download_hik_cloud_recordings(
config,
output_dir,
address_client=fake_address_client,
download_url=fake_download_url,
)
self.assertEqual(records, [])
self.assertEqual(download_calls, [])
manifest = read_jsonl(output_dir / "hik_cloud_download_manifest.jsonl")
self.assertEqual([record["status"] for record in manifest], ["no_recording", "address_failed"])
def test_download_hik_cloud_recordings_records_download_failure_and_continues(self):
with tempfile.TemporaryDirectory() as tmp:
output_dir = Path(tmp)
config = _download_config(
time_ranges=[
{"begin": 1764856787, "end": 1764856978},
{"begin": 1764857000, "end": 1764857100},
]
)
download_calls = []
def fake_address_client(chunk, hik_config):
return {
**chunk,
"status": "address_ok",
"url": (
"https://download.example/video.mp4?"
"sign=SECRET&sig=SECRET&TOKEN=SECRET"
),
"actual_begin": chunk["time_begin"],
"actual_end": chunk["time_end"],
}
def fake_download_url(url, timeout_seconds=None):
download_calls.append(url)
if len(download_calls) == 1:
raise RuntimeError(
"download failed for query sign=SECRET&sig=SECRET&TOKEN=SECRET"
)
return b"second chunk"
records = hik_cloud.download_hik_cloud_recordings(
config,
output_dir,
address_client=fake_address_client,
download_url=fake_download_url,
)
self.assertEqual(len(download_calls), 2)
self.assertEqual(len(records), 1)
self.assertEqual(records[0]["status"], "downloaded")
manifest = read_jsonl(output_dir / "hik_cloud_download_manifest.jsonl")
self.assertEqual([record["status"] for record in manifest], ["download_failed", "downloaded"])
self.assertIn("last_error", manifest[0])
self.assertNotIn("sign=", str(manifest))
self.assertNotIn("sig=", str(manifest))
self.assertNotIn("TOKEN", str(manifest))
self.assertNotIn("SECRET", str(manifest))
def test_download_hik_cloud_recordings_resume_skips_existing_downloaded_file(self):
with tempfile.TemporaryDirectory() as tmp:
output_dir = Path(tmp)
config = _download_config(resume=True)
downloaded_path = (
output_dir
/ "downloads"
/ "hik_cloud"
/ "EXAMPLE_DEVICE_SERIAL"
/ "ch1"
/ "EXAMPLE_DEVICE_SERIAL_ch1_1764856787_1764856978.mp4"
)
downloaded_path.parent.mkdir(parents=True, exist_ok=True)
downloaded_path.write_bytes(b"existing")
existing_record = {
"source": "hik_cloud",
"path": str(downloaded_path),
"device_serial": "EXAMPLE_DEVICE_SERIAL",
"channel_no": 1,
"requested_begin": 1764856787,
"requested_end": 1764856978,
"actual_begin": 1764856787,
"actual_end": 1764856978,
"status": "downloaded",
"retry_count": 0,
"last_error": None,
}
write_manifest(
output_dir / "hik_cloud_download_manifest.jsonl",
[existing_record],
)
def failing_address_client(chunk, hik_config):
raise AssertionError("resume should skip address lookup")
def failing_download_url(url, timeout_seconds=None):
raise AssertionError("resume should skip download")
records = hik_cloud.download_hik_cloud_recordings(
config,
output_dir,
address_client=failing_address_client,
download_url=failing_download_url,
)
expected_video_record = {
**existing_record,
"source_path": "hik_cloud://EXAMPLE_DEVICE_SERIAL/ch1/1764856787-1764856978",
}
self.assertEqual(records, [expected_video_record])
manifest = read_jsonl(output_dir / "hik_cloud_download_manifest.jsonl")
self.assertEqual(manifest, [existing_record])
def _download_config(
*,
time_ranges=None,
resume: bool = False,
):
return {
"output": {"resume": resume},
"hik_cloud": {
"access_token": "TOKEN",
"download_timeout_seconds": 600,
"devices": [{"device_serial": "EXAMPLE_DEVICE_SERIAL", "channel_no": 1}],
"time_ranges": time_ranges
or [{"begin": 1764856787, "end": 1764856978}],
},
}
if __name__ == "__main__":
unittest.main()