"""Unit tests for the Hik-Cloud chunk planning, address lookup and download helpers."""

import os
import tempfile
import unittest
from datetime import datetime
from pathlib import Path
from unittest.mock import patch
from zoneinfo import ZoneInfo

from video_ai_analysis_poc import hik_cloud
from video_ai_analysis_poc.hik_cloud import (
    build_download_chunks,
    request_download_address,
    resolve_access_token,
)
from video_ai_analysis_poc.manifest import read_jsonl, write_manifest

# File name of the manifest the download tests read back from the output directory.
_MANIFEST_NAME = "hik_cloud_download_manifest.jsonl"

# Fake signed download URL; its query string carries the markers that the
# tests assert never show up in file names or manifest records.
_SIGNED_URL = (
    "https://download.example/video.mp4?"
    "sign=SECRET&sig=SECRET&TOKEN=SECRET"
)


def _shanghai_epoch(hour, minute):
    """Return the epoch second of 2026-02-03 ``hour:minute:00`` in Asia/Shanghai."""
    moment = datetime(2026, 2, 3, hour, minute, 0, tzinfo=ZoneInfo("Asia/Shanghai"))
    return int(moment.timestamp())


def _address_chunk():
    """Return a fresh copy of the chunk passed to ``request_download_address``."""
    return {
        "device_serial": "EXAMPLE_DEVICE_SERIAL",
        "channel_no": 1,
        "requested_begin": 1764856787,
        "requested_end": 1764856978,
        "time_begin": 1764856787,
        "time_end": 1764856978,
    }


def _address_config(api_base_url="https://api2.hik-cloud.com", **extra):
    """Return the ``hik_config`` mapping used by the address-request tests.

    ``extra`` entries (for example ``timeout_seconds``) are appended after the
    common keys.
    """
    return {
        "api_base_url": api_base_url,
        "download_path": "/v1/carrier/cstorage/open/play/download",
        "access_token": "TOKEN",
        **extra,
    }


def _signed_address_client(*, begin_offset=0, end_offset=0, calls=None):
    """Build a fake address client that always answers ``address_ok``.

    The returned record echoes the chunk, carries ``_SIGNED_URL`` and reports
    ``actual_begin`` / ``actual_end`` as the chunk bounds shifted by the given
    offsets. When ``calls`` is a list, every ``(chunk, hik_config)`` pair is
    appended to it.
    """

    def fake_address_client(chunk, hik_config):
        if calls is not None:
            calls.append((chunk, hik_config))
        return {
            **chunk,
            "status": "address_ok",
            "url": _SIGNED_URL,
            "actual_begin": chunk["time_begin"] + begin_offset,
            "actual_end": chunk["time_end"] + end_offset,
        }

    return fake_address_client


def _expected_download_path(output_dir):
    """Return the (unresolved) file path expected for the default test chunk."""
    return (
        output_dir
        / "downloads"
        / "hik_cloud"
        / "EXAMPLE_DEVICE_SERIAL"
        / "ch1"
        / "EXAMPLE_DEVICE_SERIAL_ch1_1764856787_1764856978.mp4"
    )


class HikCloudTests(unittest.TestCase):
    """Tests for ``video_ai_analysis_poc.hik_cloud`` using injected fakes only."""

    def _assert_no_signed_query_leak(self, text, *extra_markers):
        """Assert that no signed-URL marker (or extra marker) occurs in ``text``."""
        for marker in ("sign=", "sig=", "TOKEN", *extra_markers):
            self.assertNotIn(marker, text)

    def test_build_download_chunks_defaults_to_600_second_chunks(self):
        config = {
            "runtime": {"timezone": "Asia/Shanghai"},
            "hik_cloud": {
                "devices": [
                    {
                        "device_serial": "EXAMPLE_DEVICE_SERIAL",
                        "channel_no": 1,
                        "name": "front",
                    }
                ],
                "time_ranges": [
                    {
                        "begin": "2026-02-03 09:00:00",
                        "end": "2026-02-03 10:30:00",
                    }
                ],
            },
        }

        chunks = build_download_chunks(config)

        requested_begin = _shanghai_epoch(9, 0)
        requested_end = _shanghai_epoch(10, 30)
        # 90 minutes split into 600-second pieces -> 9 chunks.
        self.assertEqual(len(chunks), 9)
        self.assertEqual(chunks[0]["time_begin"], requested_begin)
        self.assertEqual(chunks[0]["time_end"], requested_begin + 600)
        self.assertEqual(chunks[-1]["time_begin"], requested_begin + 4800)
        self.assertEqual(chunks[-1]["time_end"], requested_end)
        for index, chunk in enumerate(chunks):
            with self.subTest(chunk=index):
                self.assertLessEqual(chunk["time_end"] - chunk["time_begin"], 600)

    def test_build_download_chunks_allows_explicit_3600_second_chunks(self):
        config = {
            "runtime": {"timezone": "Asia/Shanghai"},
            "hik_cloud": {
                "chunk_seconds": 3600,
                "devices": [{"device_serial": "EXAMPLE_DEVICE_SERIAL", "channel_no": 1}],
                "time_ranges": [
                    {
                        "begin": "2026-02-03 09:00:00",
                        "end": "2026-02-03 10:30:00",
                    }
                ],
            },
        }

        chunks = build_download_chunks(config)

        requested_begin = _shanghai_epoch(9, 0)
        requested_end = _shanghai_epoch(10, 30)
        # One full hour plus the remaining 30 minutes.
        self.assertEqual(len(chunks), 2)
        self.assertEqual(chunks[0]["time_begin"], requested_begin)
        self.assertEqual(chunks[0]["time_end"], requested_begin + 3600)
        self.assertEqual(chunks[1]["time_begin"], requested_begin + 3600)
        self.assertEqual(chunks[1]["time_end"], requested_end)
        for index, chunk in enumerate(chunks):
            with self.subTest(chunk=index):
                self.assertLessEqual(chunk["time_end"] - chunk["time_begin"], 3600)

    def test_build_download_chunks_accepts_epoch_time_ranges(self):
        # Bounds are given as an int and a float epoch; no "runtime" section.
        config = {
            "hik_cloud": {
                "devices": [{"device_serial": "EXAMPLE_DEVICE_SERIAL", "channel_no": 1}],
                "time_ranges": [{"begin": 1770080400, "end": 1770084000.0}],
            }
        }

        chunks = build_download_chunks(config)

        self.assertEqual(len(chunks), 6)
        self.assertEqual(chunks[0]["time_begin"], 1770080400)
        self.assertEqual(chunks[0]["time_end"], 1770081000)
        self.assertEqual(chunks[-1]["time_begin"], 1770083400)
        self.assertEqual(chunks[-1]["time_end"], 1770084000)

    def test_build_download_chunks_rejects_end_before_begin(self):
        config = {
            "hik_cloud": {
                "devices": [{"device_serial": "EXAMPLE_DEVICE_SERIAL", "channel_no": 1}],
                "time_ranges": [
                    {
                        "begin": "2026-02-03 10:30:00",
                        "end": "2026-02-03 09:00:00",
                    }
                ],
            },
        }

        with self.assertRaisesRegex(ValueError, "end must be after begin"):
            build_download_chunks(config)

    def test_build_download_chunks_rejects_chunk_seconds_over_3600(self):
        config = {
            "hik_cloud": {
                "chunk_seconds": 7200,
                "devices": [{"device_serial": "EXAMPLE_DEVICE_SERIAL", "channel_no": 1}],
                "time_ranges": [
                    {
                        "begin": "2026-02-03 09:00:00",
                        "end": "2026-02-03 11:30:00",
                    }
                ],
            },
        }

        with self.assertRaisesRegex(
            ValueError, "chunk_seconds must be less than or equal to 3600"
        ):
            build_download_chunks(config)

    def test_resolve_access_token_prefers_literal_token_over_environment(self):
        # NOTE(review): this test passes the full config (with a nested
        # "hik_cloud" section) while the sibling tests pass the hik_cloud
        # section directly; presumably resolve_access_token accepts both
        # shapes -- confirm against the implementation.
        config = {
            "hik_cloud": {
                "access_token": "DIRECT_TOKEN",
                "access_token_env": "HIK_CLOUD_ACCESS_TOKEN",
            }
        }

        with patch.dict(os.environ, {"HIK_CLOUD_ACCESS_TOKEN": "ENV_TOKEN"}):
            token = resolve_access_token(config)

        self.assertEqual(token, "DIRECT_TOKEN")

    def test_resolve_access_token_reads_configured_environment_variable(self):
        hik_config = {"access_token_env": "HIK_CLOUD_ACCESS_TOKEN"}

        with patch.dict(os.environ, {"HIK_CLOUD_ACCESS_TOKEN": "ENV_TOKEN"}):
            token = resolve_access_token(hik_config)

        self.assertEqual(token, "ENV_TOKEN")

    def test_resolve_access_token_raises_without_leaking_secret_values(self):
        hik_config = {"access_token_env": "HIK_CLOUD_ACCESS_TOKEN"}

        # clear=True empties os.environ so the configured variable is unset.
        with patch.dict(os.environ, {}, clear=True):
            with self.assertRaises(ValueError) as raised:
                resolve_access_token(hik_config)

        message = str(raised.exception)
        self.assertIn("access_token", message)
        # Case-sensitive: also rules out echoing the upper-case variable name.
        self.assertNotIn("TOKEN", message)

    def test_request_download_address_posts_expected_request_and_returns_success(self):
        chunk = _address_chunk()
        # Trailing slash on the base URL: the expected URL below has no "//".
        hik_config = _address_config(
            api_base_url="https://api2.hik-cloud.com/",
            timeout_seconds=12,
        )
        calls = []

        def fake_http_post(url, json_body, headers, timeout_seconds):
            calls.append(
                {
                    "url": url,
                    "json_body": json_body,
                    "headers": headers,
                    "timeout_seconds": timeout_seconds,
                }
            )
            return {
                "code": 0,
                "success": True,
                "data": {
                    "url": "https://download.example/video.mp4?sig=abc",
                    "actualBeginTime": "1764856787",
                    "actualEndTime": "1764856978",
                },
            }

        result = request_download_address(chunk, hik_config, http_post=fake_http_post)

        self.assertEqual(len(calls), 1)
        self.assertEqual(
            calls[0]["url"],
            "https://api2.hik-cloud.com/v1/carrier/cstorage/open/play/download",
        )
        self.assertEqual(calls[0]["headers"]["Authorization"], "bearer TOKEN")
        self.assertEqual(calls[0]["headers"]["Content-Type"], "application/json")
        self.assertEqual(
            calls[0]["json_body"],
            {
                "deviceSerial": "EXAMPLE_DEVICE_SERIAL",
                "channelNo": 1,
                "timeBegin": 1764856787,
                "timeEnd": 1764856978,
            },
        )
        self.assertEqual(calls[0]["timeout_seconds"], 12)
        self.assertEqual(result["status"], "address_ok")
        self.assertEqual(result["url"], "https://download.example/video.mp4?sig=abc")
        # The fake returns the actual times as strings; ints are expected back.
        self.assertEqual(result["actual_begin"], 1764856787)
        self.assertEqual(result["actual_end"], 1764856978)
        self.assertEqual(result["device_serial"], "EXAMPLE_DEVICE_SERIAL")
        self.assertEqual(result["channel_no"], 1)
        self.assertEqual(result["requested_begin"], 1764856787)
        self.assertEqual(result["requested_end"], 1764856978)

    def test_request_download_address_returns_no_recording_for_known_empty_code(self):
        chunk = _address_chunk()
        hik_config = _address_config()

        def fake_http_post(url, json_body, headers, timeout_seconds):
            return {"code": 80438027, "msg": "no recording"}

        result = request_download_address(chunk, hik_config, http_post=fake_http_post)

        self.assertEqual(result["status"], "no_recording")
        self.assertEqual(result["code"], 80438027)
        self.assertEqual(result["device_serial"], "EXAMPLE_DEVICE_SERIAL")
        self.assertNotIn("url", result)

    def test_request_download_address_returns_sanitized_failure_for_other_codes(self):
        chunk = _address_chunk()
        hik_config = _address_config()

        def fake_http_post(url, json_body, headers, timeout_seconds):
            # The message deliberately contains the words that must be scrubbed.
            return {"code": 80430002, "msg": "bad TOKEN Authorization request"}

        result = request_download_address(chunk, hik_config, http_post=fake_http_post)

        self.assertEqual(result["status"], "address_failed")
        self.assertEqual(result["code"], 80430002)
        self.assertIn("last_error", result)
        self.assertNotIn("TOKEN", str(result))
        self.assertNotIn("Authorization", str(result))

    def test_download_hik_cloud_recordings_writes_file_records_and_manifest(self):
        with tempfile.TemporaryDirectory() as tmp:
            output_dir = Path(tmp)
            config = _download_config()
            address_calls = []
            download_calls = []

            def fake_download_url(url, timeout_seconds=None):
                download_calls.append((url, timeout_seconds))
                return b"fake mp4 bytes"

            records = hik_cloud.download_hik_cloud_recordings(
                config,
                output_dir,
                # Actual bounds are one second inside the requested window.
                address_client=_signed_address_client(
                    begin_offset=1, end_offset=-1, calls=address_calls
                ),
                download_url=fake_download_url,
            )

            self.assertEqual(len(address_calls), 1)
            self.assertEqual(len(download_calls), 1)
            # Matches "download_timeout_seconds" in _download_config().
            self.assertEqual(download_calls[0][1], 600)

            expected_path = _expected_download_path(output_dir).resolve(strict=False)
            self.assertEqual(expected_path.read_bytes(), b"fake mp4 bytes")
            self.assertEqual(len(records), 1)
            self.assertEqual(records[0]["path"], str(expected_path))
            self.assertEqual(records[0]["source"], "hik_cloud")
            self.assertEqual(
                records[0]["source_path"],
                "hik_cloud://EXAMPLE_DEVICE_SERIAL/ch1/1764856787-1764856978",
            )
            self.assertEqual(records[0]["device_serial"], "EXAMPLE_DEVICE_SERIAL")
            self.assertEqual(records[0]["channel_no"], 1)
            self.assertEqual(records[0]["requested_begin"], 1764856787)
            self.assertEqual(records[0]["requested_end"], 1764856978)
            self.assertEqual(records[0]["actual_begin"], 1764856788)
            self.assertEqual(records[0]["actual_end"], 1764856977)
            self.assertEqual(records[0]["status"], "downloaded")

            manifest = read_jsonl(output_dir / _MANIFEST_NAME)
            self.assertEqual(len(manifest), 1)
            self.assertEqual(manifest[0]["status"], "downloaded")
            self.assertIsNone(manifest[0]["last_error"])
            self.assertEqual(manifest[0]["download_url_host"], "download.example")
            self.assertEqual(manifest[0]["path"], str(expected_path))

            # Neither the file name nor the manifest may carry URL secrets.
            self._assert_no_signed_query_leak(expected_path.name)
            self._assert_no_signed_query_leak(str(manifest))

    def test_download_hik_cloud_recordings_can_plan_without_downloading(self):
        with tempfile.TemporaryDirectory() as tmp:
            output_dir = Path(tmp)
            config = _download_config()
            download_calls = []

            def fake_download_url(url, timeout_seconds=None):
                download_calls.append(url)
                return b"unexpected"

            records = hik_cloud.download_hik_cloud_recordings(
                config,
                output_dir,
                address_client=_signed_address_client(),
                download_url=fake_download_url,
                download=False,
            )

            self.assertEqual(records, [])
            self.assertEqual(download_calls, [])
            manifest = read_jsonl(output_dir / _MANIFEST_NAME)
            self.assertEqual(len(manifest), 1)
            self.assertEqual(manifest[0]["status"], "address_ok")
            self.assertIsNone(manifest[0]["path"])
            self.assertEqual(manifest[0]["download_url_host"], "download.example")
            self._assert_no_signed_query_leak(str(manifest))

    def test_download_hik_cloud_recordings_records_empty_and_address_failures(self):
        with tempfile.TemporaryDirectory() as tmp:
            output_dir = Path(tmp)
            config = _download_config(
                time_ranges=[
                    {"begin": 1764856787, "end": 1764856978},
                    {"begin": 1764857000, "end": 1764857100},
                ]
            )
            # Consumed front to back, one status per address lookup.
            statuses = ["no_recording", "address_failed"]
            download_calls = []

            def fake_address_client(chunk, hik_config):
                status = statuses.pop(0)
                return {
                    **chunk,
                    "status": status,
                    "actual_begin": None,
                    "actual_end": None,
                    "last_error": None if status == "no_recording" else "api failed",
                }

            def fake_download_url(url, timeout_seconds=None):
                download_calls.append(url)
                return b"unexpected"

            records = hik_cloud.download_hik_cloud_recordings(
                config,
                output_dir,
                address_client=fake_address_client,
                download_url=fake_download_url,
            )

            self.assertEqual(records, [])
            self.assertEqual(download_calls, [])
            manifest = read_jsonl(output_dir / _MANIFEST_NAME)
            self.assertEqual(
                [record["status"] for record in manifest],
                ["no_recording", "address_failed"],
            )

    def test_download_hik_cloud_recordings_records_download_failure_and_continues(self):
        with tempfile.TemporaryDirectory() as tmp:
            output_dir = Path(tmp)
            config = _download_config(
                time_ranges=[
                    {"begin": 1764856787, "end": 1764856978},
                    {"begin": 1764857000, "end": 1764857100},
                ]
            )
            download_calls = []

            def fake_download_url(url, timeout_seconds=None):
                download_calls.append(url)
                # Only the first download fails; its message embeds the secrets.
                if len(download_calls) == 1:
                    raise RuntimeError(
                        "download failed for query sign=SECRET&sig=SECRET&TOKEN=SECRET"
                    )
                return b"second chunk"

            records = hik_cloud.download_hik_cloud_recordings(
                config,
                output_dir,
                address_client=_signed_address_client(),
                download_url=fake_download_url,
            )

            self.assertEqual(len(download_calls), 2)
            self.assertEqual(len(records), 1)
            self.assertEqual(records[0]["status"], "downloaded")
            manifest = read_jsonl(output_dir / _MANIFEST_NAME)
            self.assertEqual(
                [record["status"] for record in manifest],
                ["download_failed", "downloaded"],
            )
            self.assertIn("last_error", manifest[0])
            self._assert_no_signed_query_leak(str(manifest), "SECRET")

    def test_download_hik_cloud_recordings_resume_skips_existing_downloaded_file(self):
        with tempfile.TemporaryDirectory() as tmp:
            output_dir = Path(tmp)
            config = _download_config(resume=True)
            # Seed both the file and a matching "downloaded" manifest record.
            downloaded_path = _expected_download_path(output_dir)
            downloaded_path.parent.mkdir(parents=True, exist_ok=True)
            downloaded_path.write_bytes(b"existing")
            existing_record = {
                "source": "hik_cloud",
                "path": str(downloaded_path),
                "device_serial": "EXAMPLE_DEVICE_SERIAL",
                "channel_no": 1,
                "requested_begin": 1764856787,
                "requested_end": 1764856978,
                "actual_begin": 1764856787,
                "actual_end": 1764856978,
                "status": "downloaded",
                "retry_count": 0,
                "last_error": None,
            }
            write_manifest(output_dir / _MANIFEST_NAME, [existing_record])

            def failing_address_client(chunk, hik_config):
                raise AssertionError("resume should skip address lookup")

            def failing_download_url(url, timeout_seconds=None):
                raise AssertionError("resume should skip download")

            records = hik_cloud.download_hik_cloud_recordings(
                config,
                output_dir,
                address_client=failing_address_client,
                download_url=failing_download_url,
            )

            # The returned record gains "source_path"; the manifest does not.
            expected_video_record = {
                **existing_record,
                "source_path": "hik_cloud://EXAMPLE_DEVICE_SERIAL/ch1/1764856787-1764856978",
            }
            self.assertEqual(records, [expected_video_record])
            manifest = read_jsonl(output_dir / _MANIFEST_NAME)
            self.assertEqual(manifest, [existing_record])


def _download_config(
    *,
    time_ranges=None,
    resume: bool = False,
):
    """Return a minimal config for ``download_hik_cloud_recordings`` tests.

    Args:
        time_ranges: Optional list of ``{"begin": ..., "end": ...}`` mappings;
            a falsy value selects the single default range
            1764856787-1764856978.
        resume: Value stored under ``output.resume``.
    """
    return {
        "output": {"resume": resume},
        "hik_cloud": {
            "access_token": "TOKEN",
            "download_timeout_seconds": 600,
            "devices": [{"device_serial": "EXAMPLE_DEVICE_SERIAL", "channel_no": 1}],
            "time_ranges": time_ranges or [{"begin": 1764856787, "end": 1764856978}],
        },
    }


if __name__ == "__main__":
    unittest.main()