555 lines
22 KiB
Python
555 lines
22 KiB
Python
import os
|
|
import tempfile
|
|
import unittest
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from unittest.mock import patch
|
|
from zoneinfo import ZoneInfo
|
|
|
|
from video_ai_analysis_poc import hik_cloud
|
|
from video_ai_analysis_poc.hik_cloud import (
|
|
build_download_chunks,
|
|
request_download_address,
|
|
resolve_access_token,
|
|
)
|
|
from video_ai_analysis_poc.manifest import read_jsonl, write_manifest
|
|
|
|
|
|
class HikCloudTests(unittest.TestCase):
|
|
def test_build_download_chunks_defaults_to_600_second_chunks(self):
|
|
config = {
|
|
"runtime": {"timezone": "Asia/Shanghai"},
|
|
"hik_cloud": {
|
|
"devices": [
|
|
{
|
|
"device_serial": "EXAMPLE_DEVICE_SERIAL",
|
|
"channel_no": 1,
|
|
"name": "front",
|
|
}
|
|
],
|
|
"time_ranges": [
|
|
{
|
|
"begin": "2026-02-03 09:00:00",
|
|
"end": "2026-02-03 10:30:00",
|
|
}
|
|
],
|
|
},
|
|
}
|
|
|
|
chunks = build_download_chunks(config)
|
|
|
|
requested_begin = int(
|
|
datetime(2026, 2, 3, 9, 0, 0, tzinfo=ZoneInfo("Asia/Shanghai")).timestamp()
|
|
)
|
|
requested_end = int(
|
|
datetime(2026, 2, 3, 10, 30, 0, tzinfo=ZoneInfo("Asia/Shanghai")).timestamp()
|
|
)
|
|
self.assertEqual(len(chunks), 9)
|
|
self.assertEqual(chunks[0]["time_begin"], requested_begin)
|
|
self.assertEqual(chunks[0]["time_end"], requested_begin + 600)
|
|
self.assertEqual(chunks[-1]["time_begin"], requested_begin + 4800)
|
|
self.assertEqual(chunks[-1]["time_end"], requested_end)
|
|
for chunk in chunks:
|
|
self.assertLessEqual(chunk["time_end"] - chunk["time_begin"], 600)
|
|
|
|
def test_build_download_chunks_allows_explicit_3600_second_chunks(self):
|
|
config = {
|
|
"runtime": {"timezone": "Asia/Shanghai"},
|
|
"hik_cloud": {
|
|
"chunk_seconds": 3600,
|
|
"devices": [{"device_serial": "EXAMPLE_DEVICE_SERIAL", "channel_no": 1}],
|
|
"time_ranges": [
|
|
{
|
|
"begin": "2026-02-03 09:00:00",
|
|
"end": "2026-02-03 10:30:00",
|
|
}
|
|
],
|
|
},
|
|
}
|
|
|
|
chunks = build_download_chunks(config)
|
|
|
|
requested_begin = int(
|
|
datetime(2026, 2, 3, 9, 0, 0, tzinfo=ZoneInfo("Asia/Shanghai")).timestamp()
|
|
)
|
|
requested_end = int(
|
|
datetime(2026, 2, 3, 10, 30, 0, tzinfo=ZoneInfo("Asia/Shanghai")).timestamp()
|
|
)
|
|
self.assertEqual(len(chunks), 2)
|
|
self.assertEqual(chunks[0]["time_begin"], requested_begin)
|
|
self.assertEqual(chunks[0]["time_end"], requested_begin + 3600)
|
|
self.assertEqual(chunks[1]["time_begin"], requested_begin + 3600)
|
|
self.assertEqual(chunks[1]["time_end"], requested_end)
|
|
for chunk in chunks:
|
|
self.assertLessEqual(chunk["time_end"] - chunk["time_begin"], 3600)
|
|
|
|
def test_build_download_chunks_accepts_epoch_time_ranges(self):
|
|
config = {
|
|
"hik_cloud": {
|
|
"devices": [{"device_serial": "EXAMPLE_DEVICE_SERIAL", "channel_no": 1}],
|
|
"time_ranges": [{"begin": 1770080400, "end": 1770084000.0}],
|
|
}
|
|
}
|
|
|
|
chunks = build_download_chunks(config)
|
|
|
|
self.assertEqual(len(chunks), 6)
|
|
self.assertEqual(chunks[0]["time_begin"], 1770080400)
|
|
self.assertEqual(chunks[0]["time_end"], 1770081000)
|
|
self.assertEqual(chunks[-1]["time_begin"], 1770083400)
|
|
self.assertEqual(chunks[-1]["time_end"], 1770084000)
|
|
|
|
def test_build_download_chunks_rejects_end_before_begin(self):
|
|
config = {
|
|
"hik_cloud": {
|
|
"devices": [{"device_serial": "EXAMPLE_DEVICE_SERIAL", "channel_no": 1}],
|
|
"time_ranges": [
|
|
{
|
|
"begin": "2026-02-03 10:30:00",
|
|
"end": "2026-02-03 09:00:00",
|
|
}
|
|
],
|
|
},
|
|
}
|
|
|
|
with self.assertRaisesRegex(ValueError, "end must be after begin"):
|
|
build_download_chunks(config)
|
|
|
|
def test_build_download_chunks_rejects_chunk_seconds_over_3600(self):
|
|
config = {
|
|
"hik_cloud": {
|
|
"chunk_seconds": 7200,
|
|
"devices": [{"device_serial": "EXAMPLE_DEVICE_SERIAL", "channel_no": 1}],
|
|
"time_ranges": [
|
|
{
|
|
"begin": "2026-02-03 09:00:00",
|
|
"end": "2026-02-03 11:30:00",
|
|
}
|
|
],
|
|
},
|
|
}
|
|
|
|
with self.assertRaisesRegex(
|
|
ValueError, "chunk_seconds must be less than or equal to 3600"
|
|
):
|
|
build_download_chunks(config)
|
|
|
|
def test_resolve_access_token_prefers_literal_token_over_environment(self):
|
|
config = {
|
|
"hik_cloud": {
|
|
"access_token": "DIRECT_TOKEN",
|
|
"access_token_env": "HIK_CLOUD_ACCESS_TOKEN",
|
|
}
|
|
}
|
|
|
|
with patch.dict(os.environ, {"HIK_CLOUD_ACCESS_TOKEN": "ENV_TOKEN"}):
|
|
token = resolve_access_token(config)
|
|
|
|
self.assertEqual(token, "DIRECT_TOKEN")
|
|
|
|
def test_resolve_access_token_reads_configured_environment_variable(self):
|
|
hik_config = {"access_token_env": "HIK_CLOUD_ACCESS_TOKEN"}
|
|
|
|
with patch.dict(os.environ, {"HIK_CLOUD_ACCESS_TOKEN": "ENV_TOKEN"}):
|
|
token = resolve_access_token(hik_config)
|
|
|
|
self.assertEqual(token, "ENV_TOKEN")
|
|
|
|
def test_resolve_access_token_raises_without_leaking_secret_values(self):
|
|
hik_config = {"access_token_env": "HIK_CLOUD_ACCESS_TOKEN"}
|
|
|
|
with patch.dict(os.environ, {}, clear=True):
|
|
with self.assertRaises(ValueError) as raised:
|
|
resolve_access_token(hik_config)
|
|
|
|
message = str(raised.exception)
|
|
self.assertIn("access_token", message)
|
|
self.assertNotIn("TOKEN", message)
|
|
|
|
def test_request_download_address_posts_expected_request_and_returns_success(self):
|
|
chunk = {
|
|
"device_serial": "EXAMPLE_DEVICE_SERIAL",
|
|
"channel_no": 1,
|
|
"requested_begin": 1764856787,
|
|
"requested_end": 1764856978,
|
|
"time_begin": 1764856787,
|
|
"time_end": 1764856978,
|
|
}
|
|
hik_config = {
|
|
"api_base_url": "https://api2.hik-cloud.com/",
|
|
"download_path": "/v1/carrier/cstorage/open/play/download",
|
|
"access_token": "TOKEN",
|
|
"timeout_seconds": 12,
|
|
}
|
|
calls = []
|
|
|
|
def fake_http_post(url, json_body, headers, timeout_seconds):
|
|
calls.append(
|
|
{
|
|
"url": url,
|
|
"json_body": json_body,
|
|
"headers": headers,
|
|
"timeout_seconds": timeout_seconds,
|
|
}
|
|
)
|
|
return {
|
|
"code": 0,
|
|
"success": True,
|
|
"data": {
|
|
"url": "https://download.example/video.mp4?sig=abc",
|
|
"actualBeginTime": "1764856787",
|
|
"actualEndTime": "1764856978",
|
|
},
|
|
}
|
|
|
|
result = request_download_address(chunk, hik_config, http_post=fake_http_post)
|
|
|
|
self.assertEqual(len(calls), 1)
|
|
self.assertEqual(
|
|
calls[0]["url"],
|
|
"https://api2.hik-cloud.com/v1/carrier/cstorage/open/play/download",
|
|
)
|
|
self.assertEqual(calls[0]["headers"]["Authorization"], "bearer TOKEN")
|
|
self.assertEqual(calls[0]["headers"]["Content-Type"], "application/json")
|
|
self.assertEqual(
|
|
calls[0]["json_body"],
|
|
{
|
|
"deviceSerial": "EXAMPLE_DEVICE_SERIAL",
|
|
"channelNo": 1,
|
|
"timeBegin": 1764856787,
|
|
"timeEnd": 1764856978,
|
|
},
|
|
)
|
|
self.assertEqual(calls[0]["timeout_seconds"], 12)
|
|
self.assertEqual(result["status"], "address_ok")
|
|
self.assertEqual(result["url"], "https://download.example/video.mp4?sig=abc")
|
|
self.assertEqual(result["actual_begin"], 1764856787)
|
|
self.assertEqual(result["actual_end"], 1764856978)
|
|
self.assertEqual(result["device_serial"], "EXAMPLE_DEVICE_SERIAL")
|
|
self.assertEqual(result["channel_no"], 1)
|
|
self.assertEqual(result["requested_begin"], 1764856787)
|
|
self.assertEqual(result["requested_end"], 1764856978)
|
|
|
|
def test_request_download_address_returns_no_recording_for_known_empty_code(self):
|
|
chunk = {
|
|
"device_serial": "EXAMPLE_DEVICE_SERIAL",
|
|
"channel_no": 1,
|
|
"requested_begin": 1764856787,
|
|
"requested_end": 1764856978,
|
|
"time_begin": 1764856787,
|
|
"time_end": 1764856978,
|
|
}
|
|
hik_config = {
|
|
"api_base_url": "https://api2.hik-cloud.com",
|
|
"download_path": "/v1/carrier/cstorage/open/play/download",
|
|
"access_token": "TOKEN",
|
|
}
|
|
|
|
def fake_http_post(url, json_body, headers, timeout_seconds):
|
|
return {"code": 80438027, "msg": "no recording"}
|
|
|
|
result = request_download_address(chunk, hik_config, http_post=fake_http_post)
|
|
|
|
self.assertEqual(result["status"], "no_recording")
|
|
self.assertEqual(result["code"], 80438027)
|
|
self.assertEqual(result["device_serial"], "EXAMPLE_DEVICE_SERIAL")
|
|
self.assertNotIn("url", result)
|
|
|
|
def test_request_download_address_returns_sanitized_failure_for_other_codes(self):
|
|
chunk = {
|
|
"device_serial": "EXAMPLE_DEVICE_SERIAL",
|
|
"channel_no": 1,
|
|
"requested_begin": 1764856787,
|
|
"requested_end": 1764856978,
|
|
"time_begin": 1764856787,
|
|
"time_end": 1764856978,
|
|
}
|
|
hik_config = {
|
|
"api_base_url": "https://api2.hik-cloud.com",
|
|
"download_path": "/v1/carrier/cstorage/open/play/download",
|
|
"access_token": "TOKEN",
|
|
}
|
|
|
|
def fake_http_post(url, json_body, headers, timeout_seconds):
|
|
return {"code": 80430002, "msg": "bad TOKEN Authorization request"}
|
|
|
|
result = request_download_address(chunk, hik_config, http_post=fake_http_post)
|
|
|
|
self.assertEqual(result["status"], "address_failed")
|
|
self.assertEqual(result["code"], 80430002)
|
|
self.assertIn("last_error", result)
|
|
self.assertNotIn("TOKEN", str(result))
|
|
self.assertNotIn("Authorization", str(result))
|
|
|
|
def test_download_hik_cloud_recordings_writes_file_records_and_manifest(self):
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
output_dir = Path(tmp)
|
|
config = _download_config()
|
|
address_calls = []
|
|
download_calls = []
|
|
|
|
def fake_address_client(chunk, hik_config):
|
|
address_calls.append((chunk, hik_config))
|
|
return {
|
|
**chunk,
|
|
"status": "address_ok",
|
|
"url": (
|
|
"https://download.example/video.mp4?"
|
|
"sign=SECRET&sig=SECRET&TOKEN=SECRET"
|
|
),
|
|
"actual_begin": chunk["time_begin"] + 1,
|
|
"actual_end": chunk["time_end"] - 1,
|
|
}
|
|
|
|
def fake_download_url(url, timeout_seconds=None):
|
|
download_calls.append((url, timeout_seconds))
|
|
return b"fake mp4 bytes"
|
|
|
|
records = hik_cloud.download_hik_cloud_recordings(
|
|
config,
|
|
output_dir,
|
|
address_client=fake_address_client,
|
|
download_url=fake_download_url,
|
|
)
|
|
|
|
self.assertEqual(len(address_calls), 1)
|
|
self.assertEqual(len(download_calls), 1)
|
|
self.assertEqual(download_calls[0][1], 600)
|
|
expected_path = (
|
|
output_dir
|
|
/ "downloads"
|
|
/ "hik_cloud"
|
|
/ "EXAMPLE_DEVICE_SERIAL"
|
|
/ "ch1"
|
|
/ "EXAMPLE_DEVICE_SERIAL_ch1_1764856787_1764856978.mp4"
|
|
).resolve(strict=False)
|
|
self.assertEqual(expected_path.read_bytes(), b"fake mp4 bytes")
|
|
self.assertEqual(len(records), 1)
|
|
self.assertEqual(records[0]["path"], str(expected_path))
|
|
self.assertEqual(records[0]["source"], "hik_cloud")
|
|
self.assertEqual(records[0]["source_path"], "hik_cloud://EXAMPLE_DEVICE_SERIAL/ch1/1764856787-1764856978")
|
|
self.assertEqual(records[0]["device_serial"], "EXAMPLE_DEVICE_SERIAL")
|
|
self.assertEqual(records[0]["channel_no"], 1)
|
|
self.assertEqual(records[0]["requested_begin"], 1764856787)
|
|
self.assertEqual(records[0]["requested_end"], 1764856978)
|
|
self.assertEqual(records[0]["actual_begin"], 1764856788)
|
|
self.assertEqual(records[0]["actual_end"], 1764856977)
|
|
self.assertEqual(records[0]["status"], "downloaded")
|
|
|
|
manifest = read_jsonl(output_dir / "hik_cloud_download_manifest.jsonl")
|
|
self.assertEqual(len(manifest), 1)
|
|
self.assertEqual(manifest[0]["status"], "downloaded")
|
|
self.assertIsNone(manifest[0]["last_error"])
|
|
self.assertEqual(manifest[0]["download_url_host"], "download.example")
|
|
self.assertEqual(manifest[0]["path"], str(expected_path))
|
|
serialized_path = expected_path.name
|
|
serialized_manifest = str(manifest)
|
|
self.assertNotIn("sign=", serialized_path)
|
|
self.assertNotIn("sig=", serialized_path)
|
|
self.assertNotIn("TOKEN", serialized_path)
|
|
self.assertNotIn("sign=", serialized_manifest)
|
|
self.assertNotIn("sig=", serialized_manifest)
|
|
self.assertNotIn("TOKEN", serialized_manifest)
|
|
|
|
def test_download_hik_cloud_recordings_can_plan_without_downloading(self):
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
output_dir = Path(tmp)
|
|
config = _download_config()
|
|
download_calls = []
|
|
|
|
def fake_address_client(chunk, hik_config):
|
|
return {
|
|
**chunk,
|
|
"status": "address_ok",
|
|
"url": (
|
|
"https://download.example/video.mp4?"
|
|
"sign=SECRET&sig=SECRET&TOKEN=SECRET"
|
|
),
|
|
"actual_begin": chunk["time_begin"],
|
|
"actual_end": chunk["time_end"],
|
|
}
|
|
|
|
def fake_download_url(url, timeout_seconds=None):
|
|
download_calls.append(url)
|
|
return b"unexpected"
|
|
|
|
records = hik_cloud.download_hik_cloud_recordings(
|
|
config,
|
|
output_dir,
|
|
address_client=fake_address_client,
|
|
download_url=fake_download_url,
|
|
download=False,
|
|
)
|
|
|
|
self.assertEqual(records, [])
|
|
self.assertEqual(download_calls, [])
|
|
manifest = read_jsonl(output_dir / "hik_cloud_download_manifest.jsonl")
|
|
self.assertEqual(len(manifest), 1)
|
|
self.assertEqual(manifest[0]["status"], "address_ok")
|
|
self.assertIsNone(manifest[0]["path"])
|
|
self.assertEqual(manifest[0]["download_url_host"], "download.example")
|
|
self.assertNotIn("sign=", str(manifest))
|
|
self.assertNotIn("sig=", str(manifest))
|
|
self.assertNotIn("TOKEN", str(manifest))
|
|
|
|
def test_download_hik_cloud_recordings_records_empty_and_address_failures(self):
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
output_dir = Path(tmp)
|
|
config = _download_config(
|
|
time_ranges=[
|
|
{"begin": 1764856787, "end": 1764856978},
|
|
{"begin": 1764857000, "end": 1764857100},
|
|
]
|
|
)
|
|
statuses = ["no_recording", "address_failed"]
|
|
download_calls = []
|
|
|
|
def fake_address_client(chunk, hik_config):
|
|
status = statuses.pop(0)
|
|
return {
|
|
**chunk,
|
|
"status": status,
|
|
"actual_begin": None,
|
|
"actual_end": None,
|
|
"last_error": None if status == "no_recording" else "api failed",
|
|
}
|
|
|
|
def fake_download_url(url, timeout_seconds=None):
|
|
download_calls.append(url)
|
|
return b"unexpected"
|
|
|
|
records = hik_cloud.download_hik_cloud_recordings(
|
|
config,
|
|
output_dir,
|
|
address_client=fake_address_client,
|
|
download_url=fake_download_url,
|
|
)
|
|
|
|
self.assertEqual(records, [])
|
|
self.assertEqual(download_calls, [])
|
|
manifest = read_jsonl(output_dir / "hik_cloud_download_manifest.jsonl")
|
|
self.assertEqual([record["status"] for record in manifest], ["no_recording", "address_failed"])
|
|
|
|
def test_download_hik_cloud_recordings_records_download_failure_and_continues(self):
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
output_dir = Path(tmp)
|
|
config = _download_config(
|
|
time_ranges=[
|
|
{"begin": 1764856787, "end": 1764856978},
|
|
{"begin": 1764857000, "end": 1764857100},
|
|
]
|
|
)
|
|
download_calls = []
|
|
|
|
def fake_address_client(chunk, hik_config):
|
|
return {
|
|
**chunk,
|
|
"status": "address_ok",
|
|
"url": (
|
|
"https://download.example/video.mp4?"
|
|
"sign=SECRET&sig=SECRET&TOKEN=SECRET"
|
|
),
|
|
"actual_begin": chunk["time_begin"],
|
|
"actual_end": chunk["time_end"],
|
|
}
|
|
|
|
def fake_download_url(url, timeout_seconds=None):
|
|
download_calls.append(url)
|
|
if len(download_calls) == 1:
|
|
raise RuntimeError(
|
|
"download failed for query sign=SECRET&sig=SECRET&TOKEN=SECRET"
|
|
)
|
|
return b"second chunk"
|
|
|
|
records = hik_cloud.download_hik_cloud_recordings(
|
|
config,
|
|
output_dir,
|
|
address_client=fake_address_client,
|
|
download_url=fake_download_url,
|
|
)
|
|
|
|
self.assertEqual(len(download_calls), 2)
|
|
self.assertEqual(len(records), 1)
|
|
self.assertEqual(records[0]["status"], "downloaded")
|
|
manifest = read_jsonl(output_dir / "hik_cloud_download_manifest.jsonl")
|
|
self.assertEqual([record["status"] for record in manifest], ["download_failed", "downloaded"])
|
|
self.assertIn("last_error", manifest[0])
|
|
self.assertNotIn("sign=", str(manifest))
|
|
self.assertNotIn("sig=", str(manifest))
|
|
self.assertNotIn("TOKEN", str(manifest))
|
|
self.assertNotIn("SECRET", str(manifest))
|
|
|
|
def test_download_hik_cloud_recordings_resume_skips_existing_downloaded_file(self):
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
output_dir = Path(tmp)
|
|
config = _download_config(resume=True)
|
|
downloaded_path = (
|
|
output_dir
|
|
/ "downloads"
|
|
/ "hik_cloud"
|
|
/ "EXAMPLE_DEVICE_SERIAL"
|
|
/ "ch1"
|
|
/ "EXAMPLE_DEVICE_SERIAL_ch1_1764856787_1764856978.mp4"
|
|
)
|
|
downloaded_path.parent.mkdir(parents=True, exist_ok=True)
|
|
downloaded_path.write_bytes(b"existing")
|
|
existing_record = {
|
|
"source": "hik_cloud",
|
|
"path": str(downloaded_path),
|
|
"device_serial": "EXAMPLE_DEVICE_SERIAL",
|
|
"channel_no": 1,
|
|
"requested_begin": 1764856787,
|
|
"requested_end": 1764856978,
|
|
"actual_begin": 1764856787,
|
|
"actual_end": 1764856978,
|
|
"status": "downloaded",
|
|
"retry_count": 0,
|
|
"last_error": None,
|
|
}
|
|
write_manifest(
|
|
output_dir / "hik_cloud_download_manifest.jsonl",
|
|
[existing_record],
|
|
)
|
|
|
|
def failing_address_client(chunk, hik_config):
|
|
raise AssertionError("resume should skip address lookup")
|
|
|
|
def failing_download_url(url, timeout_seconds=None):
|
|
raise AssertionError("resume should skip download")
|
|
|
|
records = hik_cloud.download_hik_cloud_recordings(
|
|
config,
|
|
output_dir,
|
|
address_client=failing_address_client,
|
|
download_url=failing_download_url,
|
|
)
|
|
|
|
expected_video_record = {
|
|
**existing_record,
|
|
"source_path": "hik_cloud://EXAMPLE_DEVICE_SERIAL/ch1/1764856787-1764856978",
|
|
}
|
|
self.assertEqual(records, [expected_video_record])
|
|
manifest = read_jsonl(output_dir / "hik_cloud_download_manifest.jsonl")
|
|
self.assertEqual(manifest, [existing_record])
|
|
|
|
|
|
def _download_config(
|
|
*,
|
|
time_ranges=None,
|
|
resume: bool = False,
|
|
):
|
|
return {
|
|
"output": {"resume": resume},
|
|
"hik_cloud": {
|
|
"access_token": "TOKEN",
|
|
"download_timeout_seconds": 600,
|
|
"devices": [{"device_serial": "EXAMPLE_DEVICE_SERIAL", "channel_no": 1}],
|
|
"time_ranges": time_ranges
|
|
or [{"begin": 1764856787, "end": 1764856978}],
|
|
},
|
|
}
|
|
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main()
|