diff --git a/api/services/retention/workflow_run/archive_paid_plan_workflow_run.py b/api/services/retention/workflow_run/archive_paid_plan_workflow_run.py index 2c1f99a3bc..ab60986bfe 100644 --- a/api/services/retention/workflow_run/archive_paid_plan_workflow_run.py +++ b/api/services/retention/workflow_run/archive_paid_plan_workflow_run.py @@ -24,7 +24,7 @@ import zipfile from collections.abc import Sequence from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field -from typing import Any +from typing import Any, TypedDict import click from graphon.enums import WorkflowType @@ -49,6 +49,23 @@ from services.retention.workflow_run.constants import ARCHIVE_BUNDLE_NAME, ARCHI logger = logging.getLogger(__name__) +class TableStatsManifestEntry(TypedDict): + row_count: int + checksum: str + size_bytes: int + + +class ArchiveManifestDict(TypedDict): + schema_version: str + workflow_run_id: str + tenant_id: str + app_id: str + workflow_id: str + created_at: str + archived_at: str + tables: dict[str, TableStatsManifestEntry] + + @dataclass class TableStats: """Statistics for a single archived table.""" @@ -472,25 +489,26 @@ class WorkflowRunArchiver: self, run: WorkflowRun, table_stats: list[TableStats], - ) -> dict[str, Any]: + ) -> ArchiveManifestDict: """Generate a manifest for the archived workflow run.""" - return { - "schema_version": ARCHIVE_SCHEMA_VERSION, - "workflow_run_id": run.id, - "tenant_id": run.tenant_id, - "app_id": run.app_id, - "workflow_id": run.workflow_id, - "created_at": run.created_at.isoformat(), - "archived_at": datetime.datetime.now(datetime.UTC).isoformat(), - "tables": { - stat.table_name: { - "row_count": stat.row_count, - "checksum": stat.checksum, - "size_bytes": stat.size_bytes, - } - for stat in table_stats - }, + tables: dict[str, TableStatsManifestEntry] = { + stat.table_name: { + "row_count": stat.row_count, + "checksum": stat.checksum, + "size_bytes": stat.size_bytes, + } + for stat in table_stats } + return ArchiveManifestDict( + schema_version=ARCHIVE_SCHEMA_VERSION, + workflow_run_id=run.id, + tenant_id=run.tenant_id, + app_id=run.app_id, + workflow_id=run.workflow_id, + created_at=run.created_at.isoformat(), + archived_at=datetime.datetime.now(datetime.UTC).isoformat(), + tables=tables, + ) def _build_archive_bundle(self, manifest_data: bytes, table_payloads: dict[str, bytes]) -> bytes: buffer = io.BytesIO()