Files
2026-06-17 11:33:54 +08:00

36 lines
1.2 KiB
Python

from __future__ import annotations
import json
from pathlib import Path
from typing import Any, Iterable
def write_manifest(path: str | Path, records: Iterable[dict[str, Any]]) -> None:
    """Write *records* to *path* as a JSON Lines manifest.

    The target path is user-expanded and resolved, and any missing parent
    directories are created. The file is opened in ``"w"`` mode, so existing
    content is replaced. Each record is passed through ``_normalize_record``
    and then serialized as one JSON object per line, with keys sorted and
    non-ASCII characters written verbatim as UTF-8.

    Args:
        path: Destination file; ``~`` is expanded.
        records: Iterable of record dictionaries; it is consumed lazily
            while the file is open.
    """
    target = Path(path).expanduser().resolve(strict=False)
    target.parent.mkdir(parents=True, exist_ok=True)
    with target.open("w", encoding="utf-8") as sink:
        # Generator keeps the write streaming: one record is serialized and
        # written at a time, exactly like an explicit loop.
        sink.writelines(
            json.dumps(_normalize_record(entry), ensure_ascii=False, sort_keys=True)
            + "\n"
            for entry in records
        )
def read_jsonl(path: str | Path) -> list[dict[str, Any]]:
    """Read a JSON Lines file and return its records in file order.

    Blank and whitespace-only lines are skipped. A missing file yields an
    empty list rather than an error.

    Args:
        path: File to read; ``~`` is expanded and the path is resolved.

    Returns:
        One decoded JSON value per non-blank line.

    Raises:
        json.JSONDecodeError: If a non-blank line is not valid JSON.
    """
    jsonl_path = Path(path).expanduser().resolve(strict=False)
    try:
        handle = jsonl_path.open("r", encoding="utf-8")
    except (FileNotFoundError, NotADirectoryError):
        # Treat "nothing there" as an empty manifest; opening directly avoids
        # the race between an existence check and the read.
        return []
    with handle:
        # Iterate the file object instead of str.splitlines(): splitlines()
        # also breaks on U+0085, U+2028 and U+2029, which may legally appear
        # unescaped inside JSON strings (e.g. when written with
        # ensure_ascii=False) and would cut a record in half. File iteration
        # only splits on \n, \r and \r\n.
        return [json.loads(line) for line in handle if line.strip()]
def _normalize_record(record: dict[str, Any]) -> dict[str, Any]:
    """Return a shallow copy of *record* with missing bookkeeping keys filled.

    Keys already present in *record* keep their values (even falsy ones);
    absent keys are appended with these defaults: ``status`` -> ``"pending"``,
    ``retry_count`` -> ``0``, ``last_error`` -> ``None``. The input mapping
    is not modified.
    """
    fallback_values = (
        ("status", "pending"),
        ("retry_count", 0),
        ("last_error", None),
    )
    filled = dict(record)
    for field_name, fallback in fallback_values:
        if field_name not in filled:
            filled[field_name] = fallback
    return filled