refactor(api): type archive manifest with ArchiveManifestDict TypedDict (#34594)

This commit is contained in:
YBoy 2026-04-06 13:35:11 +02:00 committed by GitHub
parent 9081c46565
commit 1e5cd69205
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -24,7 +24,7 @@ import zipfile
from collections.abc import Sequence
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass, field
from typing import Any
from typing import Any, TypedDict
import click
from graphon.enums import WorkflowType
@ -49,6 +49,23 @@ from services.retention.workflow_run.constants import ARCHIVE_BUNDLE_NAME, ARCHI
logger = logging.getLogger(__name__)
class TableStatsManifestEntry(TypedDict):
row_count: int
checksum: str
size_bytes: int
class ArchiveManifestDict(TypedDict):
schema_version: str
workflow_run_id: str
tenant_id: str
app_id: str
workflow_id: str
created_at: str
archived_at: str
tables: dict[str, TableStatsManifestEntry]
@dataclass
class TableStats:
"""Statistics for a single archived table."""
@ -472,25 +489,26 @@ class WorkflowRunArchiver:
self,
run: WorkflowRun,
table_stats: list[TableStats],
) -> dict[str, Any]:
) -> ArchiveManifestDict:
"""Generate a manifest for the archived workflow run."""
return {
"schema_version": ARCHIVE_SCHEMA_VERSION,
"workflow_run_id": run.id,
"tenant_id": run.tenant_id,
"app_id": run.app_id,
"workflow_id": run.workflow_id,
"created_at": run.created_at.isoformat(),
"archived_at": datetime.datetime.now(datetime.UTC).isoformat(),
"tables": {
stat.table_name: {
"row_count": stat.row_count,
"checksum": stat.checksum,
"size_bytes": stat.size_bytes,
}
for stat in table_stats
},
tables: dict[str, TableStatsManifestEntry] = {
stat.table_name: {
"row_count": stat.row_count,
"checksum": stat.checksum,
"size_bytes": stat.size_bytes,
}
for stat in table_stats
}
return ArchiveManifestDict(
schema_version=ARCHIVE_SCHEMA_VERSION,
workflow_run_id=run.id,
tenant_id=run.tenant_id,
app_id=run.app_id,
workflow_id=run.workflow_id,
created_at=run.created_at.isoformat(),
archived_at=datetime.datetime.now(datetime.UTC).isoformat(),
tables=tables,
)
def _build_archive_bundle(self, manifest_data: bytes, table_payloads: dict[str, bytes]) -> bytes:
buffer = io.BytesIO()