From cf1ebdadf5aec185fe4210cd74cece41d5982fec Mon Sep 17 00:00:00 2001 From: "Byron.wang" Date: Tue, 23 Jun 2026 00:20:25 -0700 Subject: [PATCH] feat(retention): add V2 workflow run archive bundlesa (#37747) Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- api/commands/__init__.py | 2 + api/commands/retention.py | 639 +++++++++++-- api/extensions/ext_commands.py | 2 + .../api_workflow_run_repository.py | 5 + .../sqlalchemy_api_workflow_run_repository.py | 47 +- .../archive_paid_plan_workflow_run.py | 600 ++++++++---- .../bundle_archive_maintenance.py | 872 ++++++++++++++++++ .../retention/workflow_run/constants.py | 8 + .../delete_archived_workflow_run.py | 333 ++++++- .../retention/workflow_run/tenant_prefix.py | 20 + .../retention/test_workflow_run_archiver.py | 245 ++++- .../test_delete_archived_workflow_run.py | 177 +++- .../test_delete_archived_workflow_run.py | 96 ++ .../test_archive_workflow_run_logs.py | 18 +- 14 files changed, 2736 insertions(+), 328 deletions(-) create mode 100644 api/services/retention/workflow_run/bundle_archive_maintenance.py create mode 100644 api/services/retention/workflow_run/tenant_prefix.py create mode 100644 api/tests/unit_tests/services/retention/workflow_run/test_delete_archived_workflow_run.py diff --git a/api/commands/__init__.py b/api/commands/__init__.py index 94321ed1e49..e4207bea74e 100644 --- a/api/commands/__init__.py +++ b/api/commands/__init__.py @@ -25,6 +25,7 @@ from .plugin import ( from .rbac import migrate_member_roles_to_rbac from .retention import ( archive_workflow_runs, + archive_workflow_runs_plan, clean_expired_messages, clean_workflow_runs, cleanup_orphaned_draft_variables, @@ -51,6 +52,7 @@ from .vector import ( __all__ = [ "add_qdrant_index", "archive_workflow_runs", + "archive_workflow_runs_plan", "backfill_plugin_auto_upgrade", "clean_expired_messages", "clean_workflow_runs", diff --git a/api/commands/retention.py b/api/commands/retention.py index 657a2a2e839..1386e367aff 100644 --- a/api/commands/retention.py +++ b/api/commands/retention.py @@ -12,10 +12,160 @@ from services.clear_free_plan_tenant_expired_logs import ClearFreePlanTenantExpi from services.retention.conversation.messages_clean_policy import create_message_clean_policy from services.retention.conversation.messages_clean_service import MessagesCleanService from services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs import WorkflowRunCleanup +from services.retention.workflow_run.tenant_prefix import tenant_prefix_condition from tasks.remove_app_and_related_data_task import delete_draft_variables_batch logger = logging.getLogger(__name__) +_HEX_PREFIXES = tuple("0123456789abcdef") + + +class WorkflowRunArchivePlanRow(TypedDict): + tenant_prefix: str + total_tenants: int + workflow_runs: int + workflow_node_executions: int + paid_tenants: int + unpaid_tenants: int + + +class WorkflowRunArchiveTenantPlan(TypedDict): + archive_tenant_ids: list[str] | None + paid_tenant_ids: list[str] + unpaid_tenant_ids: list[str] + + +def _parse_tenant_prefixes(prefixes: str | None) -> list[str]: + if not prefixes: + return [] + + parsed = [] + for raw_prefix in prefixes.split(","): + prefix = raw_prefix.strip().lower() + if not prefix: + continue + if len(prefix) != 1 or prefix not in _HEX_PREFIXES: + raise click.UsageError("--tenant-prefixes must be a comma-separated list of hex digits, e.g. 0,1,a,f.") + parsed.append(prefix) + return sorted(set(parsed)) + + +def _get_archive_candidate_tenant_ids_by_prefix( + prefix: str, + *, + start_from: datetime.datetime | None, + end_before: datetime.datetime, +) -> list[str]: + from graphon.enums import WorkflowExecutionStatus + from models.workflow import WorkflowRun + from services.retention.workflow_run.archive_paid_plan_workflow_run import WorkflowRunArchiver + + conditions = [ + WorkflowRun.created_at < end_before, + WorkflowRun.status.in_(WorkflowExecutionStatus.ended_values()), + WorkflowRun.type.in_(WorkflowRunArchiver.ARCHIVED_TYPE), + tenant_prefix_condition(WorkflowRun.tenant_id, prefix), + ] + if start_from is not None: + conditions.append(WorkflowRun.created_at >= start_from) + + tenant_ids = db.session.scalars( + sa.select(WorkflowRun.tenant_id).where(*conditions).distinct().order_by(WorkflowRun.tenant_id) + ).all() + return list(tenant_ids) + + +def _filter_paid_workflow_archive_tenant_ids(tenant_ids: list[str]) -> tuple[list[str], list[str]]: + from configs import dify_config + from enums.cloud_plan import CloudPlan + from services.billing_service import BillingService + + tenant_ids = sorted(set(tenant_ids)) + if not tenant_ids: + return [], [] + if not dify_config.BILLING_ENABLED: + return tenant_ids, [] + + plans = BillingService.get_plan_bulk_with_cache(tenant_ids) + paid_tenant_ids = [ + tenant_id + for tenant_id in tenant_ids + if plans.get(tenant_id) and plans[tenant_id].get("plan") in (CloudPlan.PROFESSIONAL, CloudPlan.TEAM) + ] + unpaid_tenant_ids = sorted(set(tenant_ids) - set(paid_tenant_ids)) + return paid_tenant_ids, unpaid_tenant_ids + + +def _resolve_archive_tenant_ids_from_plan( + *, + tenant_ids: str | None, + tenant_prefixes: list[str], + start_from: datetime.datetime | None, + end_before: datetime.datetime, +) -> WorkflowRunArchiveTenantPlan: + """ + Resolve the archive tenant scope once before scanning workflow_runs. + + Prefix rollout should use the tenant list collected by the same planning path, then archive by + tenant_id IN (...). Scanning workflow_runs with a tenant prefix range in every archive run is too expensive on + the large production table this command is meant to shrink. + """ + if tenant_ids: + requested_tenant_ids = [tid.strip() for tid in tenant_ids.split(",") if tid.strip()] + elif tenant_prefixes: + requested_tenant_ids = [] + for prefix in tenant_prefixes: + requested_tenant_ids.extend( + _get_archive_candidate_tenant_ids_by_prefix( + prefix, + start_from=start_from, + end_before=end_before, + ) + ) + else: + return WorkflowRunArchiveTenantPlan( + archive_tenant_ids=None, + paid_tenant_ids=[], + unpaid_tenant_ids=[], + ) + + paid_tenant_ids, unpaid_tenant_ids = _filter_paid_workflow_archive_tenant_ids(requested_tenant_ids) + return WorkflowRunArchiveTenantPlan( + archive_tenant_ids=paid_tenant_ids, + paid_tenant_ids=paid_tenant_ids, + unpaid_tenant_ids=unpaid_tenant_ids, + ) + + +def _resolve_archive_time_range( + *, + before_days: int, + from_days_ago: int | None, + to_days_ago: int | None, + start_from: datetime.datetime | None, + end_before: datetime.datetime | None, +) -> tuple[int, datetime.datetime | None, datetime.datetime | None]: + if (start_from is None) ^ (end_before is None): + raise click.UsageError("--start-from and --end-before must be provided together.") + + if (from_days_ago is None) ^ (to_days_ago is None): + raise click.UsageError("--from-days-ago and --to-days-ago must be provided together.") + + if from_days_ago is not None and to_days_ago is not None: + if start_from or end_before: + raise click.UsageError("Choose either day offsets or explicit dates, not both.") + if from_days_ago <= to_days_ago: + raise click.UsageError("--from-days-ago must be greater than --to-days-ago.") + now = datetime.datetime.now() + start_from = now - datetime.timedelta(days=from_days_ago) + end_before = now - datetime.timedelta(days=to_days_ago) + before_days = 0 + + if start_from and end_before and start_from >= end_before: + raise click.UsageError("--start-from must be earlier than --end-before.") + + return before_days, start_from, end_before + @click.command("clear-free-plan-tenant-expired-logs", help="Clear free plan tenant expired logs.") @click.option("--days", prompt=True, help="The days to clear free plan tenant expired logs.", default=30) @@ -139,11 +289,143 @@ def clean_workflow_runs( ) +@click.command( + "archive-workflow-runs-plan", + help="Plan workflow run archive rollout by tenant ID first hex digit.", +) +@click.option("--before-days", default=90, show_default=True, help="Plan runs older than N days.") +@click.option( + "--from-days-ago", + default=None, + type=click.IntRange(min=0), + help="Lower bound in days ago (older). Must be paired with --to-days-ago.", +) +@click.option( + "--to-days-ago", + default=None, + type=click.IntRange(min=0), + help="Upper bound in days ago (newer). Must be paired with --from-days-ago.", +) +@click.option( + "--start-from", + type=click.DateTime(formats=["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"]), + default=None, + help="Plan runs created at or after this timestamp (UTC if no timezone).", +) +@click.option( + "--end-before", + type=click.DateTime(formats=["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"]), + default=None, + help="Plan runs created before this timestamp (UTC if no timezone).", +) +@click.option( + "--include-archived", + is_flag=True, + help="Compatibility no-op for V2 bundle archive; plan counts source rows in the requested window.", +) +def archive_workflow_runs_plan( + before_days: int, + from_days_ago: int | None, + to_days_ago: int | None, + start_from: datetime.datetime | None, + end_before: datetime.datetime | None, + include_archived: bool, +): + """ + Print the 16 tenant-prefix rollout rows used to choose archive execution order. + + Counts use the same workflow run eligibility as archive-workflow-runs: ended runs, + supported workflow types, and the requested created_at window. V2 bundle archive + does not maintain per-run archive logs, so this plan reports source-table volume. + """ + from graphon.enums import WorkflowExecutionStatus + from models.workflow import WorkflowNodeExecutionModel, WorkflowRun + from services.retention.workflow_run.archive_paid_plan_workflow_run import WorkflowRunArchiver + + before_days, start_from, end_before = _resolve_archive_time_range( + before_days=before_days, + from_days_ago=from_days_ago, + to_days_ago=to_days_ago, + start_from=start_from, + end_before=end_before, + ) + plan_end_before = end_before or datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=before_days) + if include_archived: + click.echo(click.style("--include-archived is a no-op for V2 bundle archive plans.", fg="yellow")) + + rows: list[WorkflowRunArchivePlanRow] = [] + for prefix in _HEX_PREFIXES: + tenant_ids = _get_archive_candidate_tenant_ids_by_prefix( + prefix, + start_from=start_from, + end_before=plan_end_before, + ) + total_tenants = len(tenant_ids) + paid_tenant_ids, unpaid_tenant_ids = _filter_paid_workflow_archive_tenant_ids(tenant_ids) + + run_conditions = [ + WorkflowRun.created_at < plan_end_before, + WorkflowRun.status.in_(WorkflowExecutionStatus.ended_values()), + WorkflowRun.type.in_(WorkflowRunArchiver.ARCHIVED_TYPE), + tenant_prefix_condition(WorkflowRun.tenant_id, prefix), + ] + if start_from is not None: + run_conditions.append(WorkflowRun.created_at >= start_from) + workflow_runs = ( + db.session.scalar(sa.select(sa.func.count()).select_from(WorkflowRun).where(*run_conditions)) or 0 + ) + candidate_runs = sa.select(WorkflowRun.id).where(*run_conditions).subquery() + workflow_node_executions = ( + db.session.scalar( + sa.select(sa.func.count()) + .select_from(WorkflowNodeExecutionModel) + .join(candidate_runs, WorkflowNodeExecutionModel.workflow_run_id == candidate_runs.c.id) + ) + or 0 + ) + + rows.append( + WorkflowRunArchivePlanRow( + tenant_prefix=prefix, + total_tenants=total_tenants, + workflow_runs=workflow_runs, + workflow_node_executions=workflow_node_executions, + paid_tenants=len(paid_tenant_ids), + unpaid_tenants=len(unpaid_tenant_ids), + ) + ) + + click.echo( + click.style( + f"Workflow archive plan for runs before {plan_end_before.isoformat()}" + f"{f' and at/after {start_from.isoformat()}' if start_from else ''}.", + fg="white", + ) + ) + click.echo("tenant_prefix,total_tenants,workflow_runs,workflow_node_executions,paid_tenants,unpaid_tenants") + for row in rows: + click.echo( + f"{row['tenant_prefix']},{row['total_tenants']},{row['workflow_runs']}," + f"{row['workflow_node_executions']},{row['paid_tenants']},{row['unpaid_tenants']}" + ) + + ordered_rows = sorted( + rows, + key=lambda row: (row["workflow_runs"] + row["workflow_node_executions"], row["tenant_prefix"]), + ) + click.echo("suggested_execution_order=" + ",".join(row["tenant_prefix"] for row in ordered_rows)) + + @click.command( "archive-workflow-runs", help="Archive workflow runs for paid plan tenants to S3-compatible storage.", ) @click.option("--tenant-ids", default=None, help="Optional comma-separated tenant IDs for grayscale rollout.") +@click.option( + "--tenant-prefixes", + default=None, + help="Optional comma-separated tenant ID first hex digits for rollout waves, e.g. 0,1,a,f.", +) @click.option("--before-days", default=90, show_default=True, help="Archive runs older than N days.") @click.option( "--from-days-ago", @@ -169,13 +451,36 @@ def clean_workflow_runs( default=None, help="Archive runs created before this timestamp (UTC if no timezone).", ) -@click.option("--batch-size", default=100, show_default=True, help="Batch size for processing.") -@click.option("--workers", default=1, show_default=True, type=int, help="Concurrent workflow runs to archive.") +@click.option("--batch-size", default=100, show_default=True, help="Maximum workflow runs per archive bundle.") +@click.option( + "--workers", + default=1, + show_default=True, + type=int, + help="Reserved; bundle archive currently runs serially.", +) +@click.option( + "--run-shard-index", + default=None, + type=click.IntRange(min=0), + help="Zero-based workflow run shard index for parallel cron jobs. Must be paired with --run-shard-total.", +) +@click.option( + "--run-shard-total", + default=None, + type=click.IntRange(min=1, max=16), + help="Total workflow run shard count for parallel cron jobs. Must be paired with --run-shard-index.", +) @click.option("--limit", default=None, type=int, help="Maximum number of runs to archive.") @click.option("--dry-run", is_flag=True, help="Preview without archiving.") -@click.option("--delete-after-archive", is_flag=True, help="Delete runs and related data after archiving.") +@click.option( + "--delete-after-archive", + is_flag=True, + help="Not supported by bundle archive; use a separate bundle delete workflow after validation.", +) def archive_workflow_runs( tenant_ids: str | None, + tenant_prefixes: str | None, before_days: int, from_days_ago: int | None, to_days_ago: int | None, @@ -183,6 +488,8 @@ def archive_workflow_runs( end_before: datetime.datetime | None, batch_size: int, workers: int, + run_shard_index: int | None, + run_shard_total: int | None, limit: int | None, dry_run: bool, delete_after_archive: bool, @@ -190,14 +497,19 @@ def archive_workflow_runs( """ Archive workflow runs for paid plan tenants older than the specified days. - This command archives the following tables to storage: + This command writes V2 tenant/month/shard archive bundles. Each bundle contains Parquet snapshots from: + - workflow_runs + - workflow_app_logs - workflow_node_executions - workflow_node_execution_offload - workflow_pauses - workflow_pause_reasons - workflow_trigger_logs - The workflow_runs and workflow_app_logs tables are preserved for UI listing. + Source database rows are always preserved by archive. Deletion must be handled by + a separate bundle-level delete workflow after manifest, checksum, row-count, and + restore-sampling validation. In --dry-run mode, no storage or database writes + happen; the command estimates per-table Parquet bytes and object size instead. """ from services.retention.workflow_run.archive_paid_plan_workflow_run import WorkflowRunArchiver @@ -209,32 +521,58 @@ def archive_workflow_runs( ) ) - if (start_from is None) ^ (end_before is None): - click.echo(click.style("start-from and end-before must be provided together.", fg="red")) - return - - if (from_days_ago is None) ^ (to_days_ago is None): - click.echo(click.style("from-days-ago and to-days-ago must be provided together.", fg="red")) - return - - if from_days_ago is not None and to_days_ago is not None: - if start_from or end_before: - click.echo(click.style("Choose either day offsets or explicit dates, not both.", fg="red")) - return - if from_days_ago <= to_days_ago: - click.echo(click.style("from-days-ago must be greater than to-days-ago.", fg="red")) - return - now = datetime.datetime.now() - start_from = now - datetime.timedelta(days=from_days_ago) - end_before = now - datetime.timedelta(days=to_days_ago) - before_days = 0 - - if start_from and end_before and start_from >= end_before: - click.echo(click.style("start-from must be earlier than end-before.", fg="red")) + try: + before_days, start_from, end_before = _resolve_archive_time_range( + before_days=before_days, + from_days_ago=from_days_ago, + to_days_ago=to_days_ago, + start_from=start_from, + end_before=end_before, + ) + parsed_tenant_prefixes = _parse_tenant_prefixes(tenant_prefixes) + except click.UsageError as e: + click.echo(click.style(e.message, fg="red")) return + plan_end_before = end_before or datetime.datetime.now(datetime.UTC) - datetime.timedelta(days=before_days) if workers < 1: click.echo(click.style("workers must be at least 1.", fg="red")) return + if (run_shard_index is None) ^ (run_shard_total is None): + click.echo(click.style("run-shard-index and run-shard-total must be provided together.", fg="red")) + return + if run_shard_index is not None and run_shard_total is not None and run_shard_index >= run_shard_total: + click.echo(click.style("run-shard-index must be less than run-shard-total.", fg="red")) + return + if delete_after_archive: + click.echo(click.style("delete-after-archive is not supported by bundle archive.", fg="red")) + return + + try: + tenant_plan = _resolve_archive_tenant_ids_from_plan( + tenant_ids=tenant_ids, + tenant_prefixes=parsed_tenant_prefixes, + start_from=start_from, + end_before=plan_end_before, + ) + except Exception: + logger.exception("Failed to resolve workflow archive tenant plan") + click.echo(click.style("Failed to resolve workflow archive tenant plan.", fg="red")) + return + + planned_tenant_ids = tenant_plan["archive_tenant_ids"] + planned_paid_tenant_ids = tenant_plan["paid_tenant_ids"] if planned_tenant_ids is not None else None + paid_tenants = len(tenant_plan["paid_tenant_ids"]) + unpaid_tenants = len(tenant_plan["unpaid_tenant_ids"]) + if planned_tenant_ids is not None: + click.echo( + click.style( + f"Resolved archive tenant plan: paid_tenants={paid_tenants}, unpaid_tenants={unpaid_tenants}.", + fg="white", + ) + ) + if not planned_tenant_ids: + click.echo(click.style("No paid tenants matched the archive plan; nothing to archive.", fg="yellow")) + return archiver = WorkflowRunArchiver( days=before_days, @@ -242,7 +580,11 @@ def archive_workflow_runs( start_from=start_from, end_before=end_before, workers=workers, - tenant_ids=[tid.strip() for tid in tenant_ids.split(",")] if tenant_ids else None, + tenant_ids=planned_tenant_ids, + tenant_prefixes=parsed_tenant_prefixes, + paid_tenant_ids=planned_paid_tenant_ids, + run_shard_index=run_shard_index, + run_shard_total=run_shard_total, limit=limit, dry_run=dry_run, delete_after_archive=delete_after_archive, @@ -252,7 +594,9 @@ def archive_workflow_runs( click.style( f"Summary: processed={summary.total_runs_processed}, archived={summary.runs_archived}, " f"skipped={summary.runs_skipped}, failed={summary.runs_failed}, " - f"time={summary.total_elapsed_time:.2f}s", + f"bundles_archived={summary.bundles_archived}, bundles_skipped={summary.bundles_skipped}, " + f"bundles_failed={summary.bundles_failed}, " + f"object_size_bytes={summary.total_object_size_bytes}, time={summary.total_elapsed_time:.2f}s", fg="cyan", ) ) @@ -268,6 +612,52 @@ def archive_workflow_runs( ) +def _echo_bundle_archive_operation_summary(summary) -> None: + status = "completed successfully" if summary.bundles_failed == 0 else "completed with failures" + fg = "green" if summary.bundles_failed == 0 else "red" + click.echo( + click.style( + f"{summary.operation} {status}. " + f"bundles_success={summary.bundles_succeeded} bundles_failed={summary.bundles_failed} " + f"runs={summary.runs_processed} rows={summary.rows_processed} " + f"archive_bytes={summary.archive_bytes} duration={summary.elapsed_time:.2f}s " + f"validation_time={summary.validation_time:.2f}s " + f"runs_per_second={summary.runs_per_second:.2f} rows_per_second={summary.rows_per_second:.2f} " + f"bytes_per_second={summary.bytes_per_second:.2f}", + fg=fg, + ) + ) + click.echo(click.style("table,row_count", fg="white")) + for table_name in [ + "workflow_runs", + "workflow_app_logs", + "workflow_node_executions", + "workflow_node_execution_offload", + "workflow_pauses", + "workflow_pause_reasons", + "workflow_trigger_logs", + ]: + click.echo(f"{table_name},{summary.table_counts.get(table_name, 0)}") + for result in summary.results: + if result.success: + click.echo( + click.style( + f" bundle={result.bundle_id} tenant={result.tenant_id} runs={result.run_count} " + f"rows={result.row_count} archive_bytes={result.archive_bytes} " + f"time={result.elapsed_time:.2f}s validation={result.validation_time:.2f}s", + fg="white", + ) + ) + else: + click.echo( + click.style( + f" failed bundle={result.bundle_id} tenant={result.tenant_id} " + f"object_prefix={result.object_prefix} error={result.error}", + fg="red", + ) + ) + + @click.command( "restore-workflow-runs", help="Restore archived workflow runs from S3-compatible storage.", @@ -290,8 +680,8 @@ def archive_workflow_runs( default=None, help="Optional upper bound (exclusive) for created_at; must be paired with --start-from.", ) -@click.option("--workers", default=1, show_default=True, type=int, help="Concurrent workflow runs to restore.") -@click.option("--limit", type=int, default=100, show_default=True, help="Maximum number of runs to restore.") +@click.option("--workers", default=1, show_default=True, type=int, help="V1 --run-id compatibility only.") +@click.option("--limit", type=int, default=100, show_default=True, help="Maximum number of V2 bundles to restore.") @click.option("--dry-run", is_flag=True, help="Preview without restoring.") def restore_workflow_runs( tenant_ids: str | None, @@ -303,15 +693,18 @@ def restore_workflow_runs( dry_run: bool, ): """ - Restore an archived workflow run from storage to the database. + Restore archived workflow runs from storage to the database. - This restores the following tables: + Batch restore uses V2 bundle metadata and validates archive objects before writing source rows. This restores: + - workflow_runs + - workflow_app_logs - workflow_node_executions - workflow_node_execution_offload - workflow_pauses - workflow_pause_reasons - workflow_trigger_logs """ + from services.retention.workflow_run.bundle_archive_maintenance import WorkflowRunBundleArchiveMaintenance from services.retention.workflow_run.restore_archived_workflow_run import WorkflowRunRestore parsed_tenant_ids = None @@ -335,39 +728,46 @@ def restore_workflow_runs( ) ) - restorer = WorkflowRunRestore(dry_run=dry_run, workers=workers) if run_id: + restorer = WorkflowRunRestore(dry_run=dry_run, workers=workers) results = [restorer.restore_by_run_id(run_id)] - else: - assert start_from is not None - assert end_before is not None - results = restorer.restore_batch( - parsed_tenant_ids, - start_date=start_from, - end_date=end_before, - limit=limit, - ) + end_time = datetime.datetime.now(datetime.UTC) + elapsed = end_time - start_time - end_time = datetime.datetime.now(datetime.UTC) - elapsed = end_time - start_time + successes = sum(1 for result in results if result.success) + failures = len(results) - successes - successes = sum(1 for result in results if result.success) - failures = len(results) - successes - - if failures == 0: - click.echo( - click.style( - f"Restore completed successfully. success={successes} duration={elapsed}", - fg="green", + if failures == 0: + click.echo( + click.style( + f"Restore completed successfully. success={successes} duration={elapsed}", + fg="green", + ) ) - ) - else: - click.echo( - click.style( - f"Restore completed with failures. success={successes} failed={failures} duration={elapsed}", - fg="red", + else: + click.echo( + click.style( + f"Restore completed with failures. success={successes} failed={failures} duration={elapsed}", + fg="red", + ) ) + return + + if workers != 1: + click.echo( + click.style("--workers is ignored for V2 bundle restore; bundles are processed serially.", fg="yellow") ) + assert start_from is not None + assert end_before is not None + bundle_restorer = WorkflowRunBundleArchiveMaintenance(dry_run=dry_run, strict_content_validation=True) + summary = bundle_restorer.restore_batch( + tenant_ids=parsed_tenant_ids, + start_date=start_from, + end_date=end_before, + limit=limit, + ) + _echo_bundle_archive_operation_summary(summary) + return @click.command( @@ -392,8 +792,20 @@ def restore_workflow_runs( default=None, help="Optional upper bound (exclusive) for created_at; must be paired with --start-from.", ) -@click.option("--limit", type=int, default=100, show_default=True, help="Maximum number of runs to delete.") +@click.option("--limit", type=int, default=100, show_default=True, help="Maximum number of V2 bundles to delete.") @click.option("--dry-run", is_flag=True, help="Preview without deleting.") +@click.option( + "--skip-bad-archives", + is_flag=True, + help="Continue batch deletion when one archive object fails validation.", +) +@click.option( + "--restore-sample-interval", + type=int, + default=0, + show_default=True, + help="Run restore dry-run after every N successful deletes; 0 disables restore sampling.", +) def delete_archived_workflow_runs( tenant_ids: str | None, run_id: str | None, @@ -401,10 +813,16 @@ def delete_archived_workflow_runs( end_before: datetime.datetime | None, limit: int, dry_run: bool, + skip_bad_archives: bool, + restore_sample_interval: int, ): """ Delete archived workflow runs from the database. + + Batch delete uses V2 bundle metadata and validates object existence, manifest schema, object size, checksum, row + counts, and source/archive content checksums before deleting source rows. `--run-id` keeps the V1 per-run path. """ + from services.retention.workflow_run.bundle_archive_maintenance import WorkflowRunBundleArchiveMaintenance from services.retention.workflow_run.delete_archived_workflow_run import ArchivedWorkflowRunDeletion parsed_tenant_ids = None @@ -417,6 +835,8 @@ def delete_archived_workflow_runs( raise click.UsageError("--start-from and --end-before must be provided together.") if run_id is None and (start_from is None or end_before is None): raise click.UsageError("--start-from and --end-before are required for batch delete.") + if restore_sample_interval < 0: + raise click.BadParameter("restore-sample-interval must be >= 0") start_time = datetime.datetime.now(datetime.UTC) target_desc = f"workflow run {run_id}" if run_id else "workflow runs" @@ -427,56 +847,85 @@ def delete_archived_workflow_runs( ) ) - deleter = ArchivedWorkflowRunDeletion(dry_run=dry_run) if run_id: - results = [deleter.delete_by_run_id(run_id)] - else: - assert start_from is not None - assert end_before is not None - results = deleter.delete_batch( - parsed_tenant_ids, - start_date=start_from, - end_date=end_before, - limit=limit, + deleter = ArchivedWorkflowRunDeletion( + dry_run=dry_run, + skip_bad_archives=skip_bad_archives, + restore_sample_interval=restore_sample_interval, ) + results = [deleter.delete_by_run_id(run_id)] + for result in results: + if result.success: + click.echo( + click.style( + f"{'[DRY RUN] Would delete' if dry_run else 'Deleted'} " + f"workflow run {result.run_id} (tenant={result.tenant_id}, " + f"archive_key={result.archive_key}, counts={result.validated_counts})", + fg="green", + ) + ) + if result.restore_sampled: + sample_status = "passed" if result.restore_sample_success else "failed" + click.echo( + click.style( + f" restore dry-run sample {sample_status} for workflow run {result.run_id}", + fg="green" if result.restore_sample_success else "red", + ) + ) + else: + click.echo( + click.style( + f"Failed to delete workflow run {result.run_id}: {result.error}", + fg="red", + ) + ) + click.echo( + click.style( + " runbook: pause this delete window, verify archive storage object and manifest/checksum, " + "retry the same run after fixing storage or DB drift, or rerun with --skip-bad-archives " + "to quarantine this run and continue the batch.", + fg="yellow", + ) + ) - for result in results: - if result.success: + end_time = datetime.datetime.now(datetime.UTC) + elapsed = end_time - start_time + + successes = sum(1 for result in results if result.success) + failures = len(results) - successes + + if failures == 0: click.echo( click.style( - f"{'[DRY RUN] Would delete' if dry_run else 'Deleted'} " - f"workflow run {result.run_id} (tenant={result.tenant_id})", + f"Delete completed successfully. success={successes} duration={elapsed}", fg="green", ) ) else: click.echo( click.style( - f"Failed to delete workflow run {result.run_id}: {result.error}", + f"Delete completed with failures. success={successes} failed={failures} duration={elapsed}", fg="red", ) ) + return - end_time = datetime.datetime.now(datetime.UTC) - elapsed = end_time - start_time - - successes = sum(1 for result in results if result.success) - failures = len(results) - successes - - if failures == 0: - click.echo( - click.style( - f"Delete completed successfully. success={successes} duration={elapsed}", - fg="green", - ) - ) - else: - click.echo( - click.style( - f"Delete completed with failures. success={successes} failed={failures} duration={elapsed}", - fg="red", - ) - ) + if restore_sample_interval: + click.echo(click.style("--restore-sample-interval is ignored for V2 bundle delete.", fg="yellow")) + assert start_from is not None + assert end_before is not None + bundle_deleter = WorkflowRunBundleArchiveMaintenance( + dry_run=dry_run, + strict_content_validation=True, + stop_on_error=not skip_bad_archives, + ) + summary = bundle_deleter.delete_batch( + tenant_ids=parsed_tenant_ids, + start_date=start_from, + end_date=end_before, + limit=limit, + ) + _echo_bundle_archive_operation_summary(summary) def _find_orphaned_draft_variables(batch_size: int = 1000) -> list[str]: diff --git a/api/extensions/ext_commands.py b/api/extensions/ext_commands.py index 6cd4b08b900..a85f6569978 100644 --- a/api/extensions/ext_commands.py +++ b/api/extensions/ext_commands.py @@ -5,6 +5,7 @@ def init_app(app: DifyApp): from commands import ( add_qdrant_index, archive_workflow_runs, + archive_workflow_runs_plan, backfill_plugin_auto_upgrade, clean_expired_messages, clean_workflow_runs, @@ -72,6 +73,7 @@ def init_app(app: DifyApp): setup_datasource_oauth_client, transform_datasource_credentials, install_rag_pipeline_plugins, + archive_workflow_runs_plan, archive_workflow_runs, delete_archived_workflow_runs, restore_workflow_runs, diff --git a/api/repositories/api_workflow_run_repository.py b/api/repositories/api_workflow_run_repository.py index 2659e550552..bc30e980619 100644 --- a/api/repositories/api_workflow_run_repository.py +++ b/api/repositories/api_workflow_run_repository.py @@ -290,7 +290,10 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol): batch_size: int, run_types: Sequence[WorkflowType] | None = None, tenant_ids: Sequence[str] | None = None, + tenant_prefixes: Sequence[str] | None = None, workflow_ids: Sequence[str] | None = None, + run_shard_index: int | None = None, + run_shard_total: int | None = None, ) -> Sequence[WorkflowRun]: """ Fetch ended workflow runs in a time window for archival and clean batching. @@ -298,7 +301,9 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol): Optional filters: - run_types - tenant_ids + - tenant_prefixes, using the first hexadecimal digit of tenant_id for rollout waves - workflow_ids + - run_shard_index/run_shard_total, using a deterministic workflow_run_id shard """ ... diff --git a/api/repositories/sqlalchemy_api_workflow_run_repository.py b/api/repositories/sqlalchemy_api_workflow_run_repository.py index b40eb4bdd8a..2394377c9d4 100644 --- a/api/repositories/sqlalchemy_api_workflow_run_repository.py +++ b/api/repositories/sqlalchemy_api_workflow_run_repository.py @@ -56,6 +56,7 @@ from repositories.types import ( DailyTerminalsStats, DailyTokenCostStats, ) +from services.retention.workflow_run.tenant_prefix import tenant_prefix_condition logger = logging.getLogger(__name__) @@ -64,6 +65,40 @@ class _WorkflowRunError(Exception): pass +_HEX_SHARD_VALUES = { + "0": 0, + "1": 1, + "2": 2, + "3": 3, + "4": 4, + "5": 5, + "6": 6, + "7": 7, + "8": 8, + "9": 9, + "a": 10, + "b": 11, + "c": 12, + "d": 13, + "e": 14, + "f": 15, +} + + +def _tenant_prefix_condition(prefixes: Sequence[str]) -> sa.ColumnElement[bool]: + conditions = [tenant_prefix_condition(WorkflowRun.tenant_id, prefix) for prefix in prefixes] + return sa.or_(*conditions) + + +def _workflow_run_id_shard_expr() -> sa.ColumnElement[int]: + normalized_id = func.lower(func.replace(sa.cast(WorkflowRun.id, sa.String()), "-", "")) + last_hex = func.substr(normalized_id, func.length(normalized_id), 1) + return sa.case( + *[(last_hex == hex_digit, shard_value) for hex_digit, shard_value in _HEX_SHARD_VALUES.items()], + else_=0, + ) + + def _build_human_input_required_reason( reason_model: WorkflowPauseReason, form_model: HumanInputForm | None, @@ -378,7 +413,10 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository): batch_size: int, run_types: Sequence[WorkflowType] | None = None, tenant_ids: Sequence[str] | None = None, + tenant_prefixes: Sequence[str] | None = None, workflow_ids: Sequence[str] | None = None, + run_shard_index: int | None = None, + run_shard_total: int | None = None, ) -> Sequence[WorkflowRun]: """ Fetch ended workflow runs in a time window for archival and clean batching. @@ -387,7 +425,8 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository): - created_at in [start_from, end_before) - type in run_types (when provided) - status is an ended state - - optional tenant_id, workflow_id filters and cursor (last_seen) for pagination + - optional tenant_id, tenant_prefix, workflow_id filters and cursor (last_seen) for pagination + - optional deterministic shard by the last hexadecimal digit of workflow_run_id """ with self._session_maker() as session: stmt = ( @@ -410,9 +449,15 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository): if tenant_ids: stmt = stmt.where(WorkflowRun.tenant_id.in_(tenant_ids)) + if tenant_prefixes: + stmt = stmt.where(_tenant_prefix_condition(tenant_prefixes)) + if workflow_ids: stmt = stmt.where(WorkflowRun.workflow_id.in_(workflow_ids)) + if run_shard_index is not None and run_shard_total is not None: + stmt = stmt.where((_workflow_run_id_shard_expr() % run_shard_total) == run_shard_index) + if last_seen: stmt = stmt.where( tuple_(WorkflowRun.created_at, WorkflowRun.id) diff --git a/api/services/retention/workflow_run/archive_paid_plan_workflow_run.py b/api/services/retention/workflow_run/archive_paid_plan_workflow_run.py index 21be411bea7..e046013b2ce 100644 --- a/api/services/retention/workflow_run/archive_paid_plan_workflow_run.py +++ b/api/services/retention/workflow_run/archive_paid_plan_workflow_run.py @@ -1,8 +1,12 @@ """ Archive Paid Plan Workflow Run Logs Service. -This service archives workflow run logs for paid plan users older than the configured -retention period (default: 90 days) to S3-compatible storage. +This service archives workflow run logs for paid plan users older than the configured retention period (default: +90 days) to S3-compatible storage. + +Archive V2 writes bundle-level Parquet objects. A bundle contains many workflow runs and their related table rows. +Bundle metadata lives in the object-store manifest instead of a database table, so archive/delete/restore does not move +the large-table retention problem into another OLTP table. Archived tables: - workflow_runs @@ -16,18 +20,19 @@ Archived tables: """ import datetime -import io +import hashlib import json import logging import time -import zipfile from collections.abc import Sequence -from concurrent.futures import ThreadPoolExecutor from dataclasses import dataclass, field +from enum import Enum from typing import Any, TypedDict import click -from sqlalchemy import inspect +import pyarrow as pa +import pyarrow.parquet as pq +from sqlalchemy import inspect, select from sqlalchemy.orm import Session, sessionmaker from configs import dify_config @@ -39,12 +44,24 @@ from libs.archive_storage import ( ArchiveStorageNotConfiguredError, get_archive_storage, ) -from models.workflow import WorkflowAppLog, WorkflowRun +from models.trigger import WorkflowTriggerLog +from models.workflow import ( + WorkflowAppLog, + WorkflowNodeExecutionModel, + WorkflowNodeExecutionOffload, + WorkflowPause, + WorkflowPauseReason, + WorkflowRun, +) from repositories.api_workflow_node_execution_repository import DifyAPIWorkflowNodeExecutionRepository from repositories.api_workflow_run_repository import APIWorkflowRunRepository from repositories.sqlalchemy_workflow_trigger_log_repository import SQLAlchemyWorkflowTriggerLogRepository from services.billing_service import BillingService -from services.retention.workflow_run.constants import ARCHIVE_BUNDLE_NAME, ARCHIVE_SCHEMA_VERSION +from services.retention.workflow_run.constants import ( + ARCHIVE_BUNDLE_FORMAT, + ARCHIVE_BUNDLE_MANIFEST_NAME, + ARCHIVE_BUNDLE_SCHEMA_VERSION, +) logger = logging.getLogger(__name__) @@ -53,17 +70,41 @@ class TableStatsManifestEntry(TypedDict): row_count: int checksum: str size_bytes: int + object_key: str class ArchiveManifestDict(TypedDict): schema_version: str - workflow_run_id: str + archive_format: str tenant_id: str - app_id: str - workflow_id: str - created_at: str + tenant_prefix: str + year: int + month: int + shard: str + bundle_id: str + object_prefix: str + workflow_run_count: int + workflow_node_execution_count: int + min_created_at: str + max_created_at: str + min_run_id: str + max_run_id: str archived_at: str tables: dict[str, TableStatsManifestEntry] + run_ids: list[str] + + +@dataclass(frozen=True) +class ArchiveBundleIdentity: + """Stable identity and object prefix for one V2 archive bundle.""" + + tenant_prefix: str + tenant_id: str + year: int + month: int + shard: str + bundle_id: str + object_prefix: str @dataclass @@ -74,16 +115,21 @@ class TableStats: row_count: int checksum: str size_bytes: int + object_key: str = "" @dataclass class ArchiveResult: - """Result of archiving a single workflow run.""" + """Result of archiving a bundle of workflow runs.""" - run_id: str + bundle_id: str tenant_id: str + object_prefix: str success: bool + run_count: int = 0 tables: list[TableStats] = field(default_factory=list) + object_size_bytes: int = 0 + skipped: bool = False error: str | None = None elapsed_time: float = 0.0 @@ -96,6 +142,12 @@ class ArchiveSummary: runs_archived: int = 0 runs_skipped: int = 0 runs_failed: int = 0 + total_bundles_processed: int = 0 + bundles_archived: int = 0 + bundles_skipped: int = 0 + bundles_failed: int = 0 + total_object_size_bytes: int = 0 + table_stats: dict[str, TableStats] = field(default_factory=dict) total_elapsed_time: float = 0.0 @@ -104,16 +156,20 @@ class WorkflowRunArchiver: Archive workflow run logs for paid plan users. Storage Layout: - {tenant_id}/app_id={app_id}/year={YYYY}/month={MM}/workflow_run_id={run_id}/ - └── archive.v1.0.zip + workflow-runs/v2/tenant_prefix={prefix}/tenant_id={tenant_id}/year={YYYY}/month={MM}/ + shard={shard}/bundle={bundle_id}/ ├── manifest.json - ├── workflow_runs.jsonl - ├── workflow_app_logs.jsonl - ├── workflow_node_executions.jsonl - ├── workflow_node_execution_offload.jsonl - ├── workflow_pauses.jsonl - ├── workflow_pause_reasons.jsonl - └── workflow_trigger_logs.jsonl + ├── workflow_runs.parquet + ├── workflow_app_logs.parquet + ├── workflow_node_executions.parquet + ├── workflow_node_execution_offload.parquet + ├── workflow_pauses.parquet + ├── workflow_pause_reasons.parquet + └── workflow_trigger_logs.parquet + + `batch_size` is the maximum workflow_runs per bundle. The current implementation groups each fetched page by + tenant/month before writing bundles. Bundle idempotency is based on the manifest object key; the manifest is + uploaded after all table objects, so a missing manifest means the bundle should be retried. """ ARCHIVED_TYPE = [ @@ -132,6 +188,10 @@ class WorkflowRunArchiver: start_from: datetime.datetime | None end_before: datetime.datetime + paid_tenant_ids: set[str] | None + tenant_prefixes: list[str] + run_shard_index: int | None + run_shard_total: int | None def __init__( self, @@ -141,6 +201,10 @@ class WorkflowRunArchiver: end_before: datetime.datetime | None = None, workers: int = 1, tenant_ids: Sequence[str] | None = None, + tenant_prefixes: Sequence[str] | None = None, + paid_tenant_ids: Sequence[str] | None = None, + run_shard_index: int | None = None, + run_shard_total: int | None = None, limit: int | None = None, dry_run: bool = False, delete_after_archive: bool = False, @@ -156,10 +220,19 @@ class WorkflowRunArchiver: end_before: Optional end time (exclusive) for archiving workers: Number of concurrent workflow runs to archive tenant_ids: Optional tenant IDs for grayscale rollout + tenant_prefixes: Optional tenant ID first-hex prefixes for rollout waves. CLI callers should resolve these + to tenant_ids during planning so workflow_runs scan uses tenant_id IN (...) instead of a prefix range. + paid_tenant_ids: Optional paid-tenant whitelist resolved by the archive plan. When provided, archive uses it + for per-run paid filtering and does not call billing on every fetched page. + run_shard_index: Optional zero-based workflow run shard index for parallel cron jobs + run_shard_total: Optional total workflow run shard count for parallel cron jobs limit: Maximum number of runs to archive (None for unlimited) dry_run: If True, only preview without making changes - delete_after_archive: If True, delete runs and related data after archiving + delete_after_archive: Reserved for the V1 per-run path. Bundle archive requires a separate validated + bundle delete workflow. """ + if delete_after_archive: + raise ValueError("delete_after_archive is not supported by bundle archive") self.days = days self.batch_size = batch_size if start_from or end_before: @@ -176,6 +249,16 @@ class WorkflowRunArchiver: raise ValueError("workers must be at least 1") self.workers = workers self.tenant_ids = sorted(set(tenant_ids)) if tenant_ids else [] + self.tenant_prefixes = sorted(set(tenant_prefixes)) if tenant_prefixes else [] + self.paid_tenant_ids = set(paid_tenant_ids) if paid_tenant_ids is not None else None + if (run_shard_index is None) ^ (run_shard_total is None): + raise ValueError("run_shard_index and run_shard_total must be provided together") + if run_shard_total is not None and not 1 <= run_shard_total <= 16: + raise ValueError("run_shard_total must be between 1 and 16") + if run_shard_index is not None and run_shard_total is not None and not 0 <= run_shard_index < run_shard_total: + raise ValueError("run_shard_index must be between 0 and run_shard_total - 1") + self.run_shard_index = run_shard_index + self.run_shard_total = run_shard_total self.limit = limit self.dry_run = dry_run self.delete_after_archive = delete_after_archive @@ -209,124 +292,185 @@ class WorkflowRunArchiver: return summary session_maker = sessionmaker(bind=db.engine, expire_on_commit=False) - repo = self._get_workflow_run_repo() + attempted_count = 0 - def _archive_with_session(run: WorkflowRun) -> ArchiveResult: - with session_maker() as session: - return self._archive_run(session, storage, run) - - last_seen: tuple[datetime.datetime, str] | None = None - archived_count = 0 - - with ThreadPoolExecutor(max_workers=self.workers) as executor: + for tenant_scope in self._tenant_scan_scopes(): + last_seen: tuple[datetime.datetime, str] | None = None while True: - # Check limit - if self.limit and archived_count >= self.limit: + if self.limit and attempted_count >= self.limit: click.echo(click.style(f"Reached limit of {self.limit} runs", fg="yellow")) break - # Fetch batch of runs - runs = self._get_runs_batch(last_seen) - + runs = self._get_runs_batch(last_seen, tenant_scope=tenant_scope) if not runs: break - run_ids = [run.id for run in runs] - with session_maker() as session: - archived_run_ids = repo.get_archived_run_ids(session, run_ids) - last_seen = (runs[-1].created_at, runs[-1].id) - - # Filter to paid tenants only tenant_ids = {run.tenant_id for run in runs} paid_tenants = self._filter_paid_tenants(tenant_ids) runs_to_process: list[WorkflowRun] = [] for run in runs: summary.total_runs_processed += 1 - - # Skip non-paid tenants if run.tenant_id not in paid_tenants: summary.runs_skipped += 1 continue - - # Skip already archived runs - if run.id in archived_run_ids: - summary.runs_skipped += 1 - continue - - # Check limit - if self.limit and archived_count + len(runs_to_process) >= self.limit: + if self.limit and attempted_count + len(runs_to_process) >= self.limit: break - runs_to_process.append(run) if not runs_to_process: continue - results = list(executor.map(_archive_with_session, runs_to_process)) + for bundle_runs in self._group_runs_for_bundles(runs_to_process): + summary.total_bundles_processed += 1 + with session_maker() as session: + result = self._archive_bundle(session, storage, bundle_runs) - for run, result in zip(runs_to_process, results): - if result.success: - summary.runs_archived += 1 - archived_count += 1 + if result.skipped: + attempted_count += result.run_count + summary.bundles_skipped += 1 + summary.runs_skipped += result.run_count + click.echo( + click.style( + f"Skipped bundle {result.bundle_id} (tenant={result.tenant_id}, " + f"runs={result.run_count}, reason={result.error or 'already handled'})", + fg="yellow", + ) + ) + elif result.success: + attempted_count += result.run_count + summary.bundles_archived += 1 + summary.runs_archived += result.run_count + self._merge_result_stats(summary, result) click.echo( click.style( f"{'[DRY RUN] Would archive' if self.dry_run else 'Archived'} " - f"run {run.id} (tenant={run.tenant_id}, " - f"tables={len(result.tables)}, time={result.elapsed_time:.2f}s)", + f"bundle {result.bundle_id} (tenant={result.tenant_id}, runs={result.run_count}, " + f"tables={len(result.tables)}, object_size_bytes={result.object_size_bytes}, " + f"time={result.elapsed_time:.2f}s)", fg="green", ) ) + if self.dry_run: + self._echo_table_estimates(result.tables) else: - summary.runs_failed += 1 + attempted_count += result.run_count + summary.bundles_failed += 1 + summary.runs_failed += result.run_count click.echo( click.style( - f"Failed to archive run {run.id}: {result.error}", + f"Failed to archive bundle {result.bundle_id}: {result.error}", fg="red", ) ) + if self.limit and attempted_count >= self.limit: + break + summary.total_elapsed_time = time.time() - start_time click.echo( click.style( f"{'[DRY RUN] ' if self.dry_run else ''}Archive complete: " f"processed={summary.total_runs_processed}, archived={summary.runs_archived}, " f"skipped={summary.runs_skipped}, failed={summary.runs_failed}, " + f"bundles_archived={summary.bundles_archived}, bundles_skipped={summary.bundles_skipped}, " + f"bundles_failed={summary.bundles_failed}, " + f"object_size_bytes={summary.total_object_size_bytes}, " f"time={summary.total_elapsed_time:.2f}s", fg="white", ) ) + if self.dry_run: + self._echo_summary_estimates(summary) return summary + @staticmethod + def _merge_result_stats(summary: ArchiveSummary, result: ArchiveResult) -> None: + summary.total_object_size_bytes += result.object_size_bytes + for table_stat in result.tables: + summary_stat = summary.table_stats.get(table_stat.table_name) + if summary_stat is None: + summary.table_stats[table_stat.table_name] = TableStats( + table_name=table_stat.table_name, + row_count=table_stat.row_count, + checksum="", + size_bytes=table_stat.size_bytes, + ) + continue + summary_stat.row_count += table_stat.row_count + summary_stat.size_bytes += table_stat.size_bytes + + @staticmethod + def _echo_table_estimates(table_stats: Sequence[TableStats]) -> None: + for stat in table_stats: + click.echo( + click.style( + f" table={stat.table_name} rows={stat.row_count} parquet_bytes={stat.size_bytes}", + fg="white", + ) + ) + + def _echo_summary_estimates(self, summary: ArchiveSummary) -> None: + click.echo(click.style("[DRY RUN] Estimated archive totals by table:", fg="white")) + for table_name in self.ARCHIVED_TABLES: + stat = summary.table_stats.get(table_name) + row_count = stat.row_count if stat else 0 + size_bytes = stat.size_bytes if stat else 0 + click.echo(click.style(f" table={table_name} rows={row_count} parquet_bytes={size_bytes}", fg="white")) + def _get_runs_batch( self, last_seen: tuple[datetime.datetime, str] | None, + tenant_scope: Sequence[str] | None = None, ) -> Sequence[WorkflowRun]: """Fetch a batch of workflow runs to archive.""" repo = self._get_workflow_run_repo() + tenant_ids = list(tenant_scope) if tenant_scope is not None else self.tenant_ids or None return repo.get_runs_batch_by_time_range( start_from=self.start_from, end_before=self.end_before, last_seen=last_seen, batch_size=self.batch_size, run_types=self.ARCHIVED_TYPE, - tenant_ids=self.tenant_ids or None, + tenant_ids=tenant_ids, + tenant_prefixes=None if tenant_ids else self.tenant_prefixes or None, + run_shard_index=self.run_shard_index, + run_shard_total=self.run_shard_total, ) + def _tenant_scan_scopes(self) -> list[list[str] | None]: + if not self.tenant_ids: + return [None] + return [[tenant_id] for tenant_id in self.tenant_ids] + def _build_start_message(self) -> str: range_desc = f"before {self.end_before.isoformat()}" if self.start_from: range_desc = f"between {self.start_from.isoformat()} and {self.end_before.isoformat()}" + run_shard_desc = "all" + if self.run_shard_index is not None and self.run_shard_total is not None: + run_shard_desc = f"{self.run_shard_index}/{self.run_shard_total}" return ( f"{'[DRY RUN] ' if self.dry_run else ''}Starting workflow run archiving " f"for runs {range_desc} " - f"(batch_size={self.batch_size}, tenant_ids={','.join(self.tenant_ids) or 'all'})" + f"(batch_size={self.batch_size}, tenant_ids={self._format_tenant_scope()}, " + f"tenant_prefixes={','.join(self.tenant_prefixes) or 'all'}, run_shard={run_shard_desc})" ) + def _format_tenant_scope(self) -> str: + if not self.tenant_ids: + return "all" + if len(self.tenant_ids) <= 10: + return ",".join(self.tenant_ids) + return f"{len(self.tenant_ids)} planned tenants" + def _filter_paid_tenants(self, tenant_ids: set[str]) -> set[str]: """Filter tenant IDs to only include paid tenants.""" + if self.paid_tenant_ids is not None: + return tenant_ids & self.paid_tenant_ids + if not dify_config.BILLING_ENABLED: # If billing is not enabled, treat all tenants as paid return tenant_ids @@ -349,177 +493,293 @@ class WorkflowRunArchiver: return paid - def _archive_run( + def _archive_bundle( self, session: Session, storage: ArchiveStorage | None, - run: WorkflowRun, + runs: Sequence[WorkflowRun], ) -> ArchiveResult: - """Archive a single workflow run.""" + """Archive one tenant/month bundle of workflow runs.""" + if not runs: + raise ValueError("runs must not be empty") start_time = time.time() - result = ArchiveResult(run_id=run.id, tenant_id=run.tenant_id, success=False) + identity = self._build_bundle_identity(runs) + result = ArchiveResult( + bundle_id=identity.bundle_id, + tenant_id=identity.tenant_id, + object_prefix=identity.object_prefix, + run_count=len(runs), + success=False, + ) try: - # Extract data from all tables - table_data, app_logs, trigger_metadata = self._extract_data(session, run) + if not self.dry_run: + if storage is None: + raise ArchiveStorageNotConfiguredError("Archive storage not configured") + if storage.object_exists(self._get_manifest_object_key(identity)): + result.success = True + result.skipped = True + result.error = "bundle already archived" + result.elapsed_time = time.time() - start_time + return result + + locked_runs = self._lock_runs_for_archive(session, [run.id for run in runs]) + if len(locked_runs) != len(runs): + result.success = True + result.skipped = True + result.error = "one or more runs locked or deleted by another archiver" + result.elapsed_time = time.time() - start_time + return result + runs = locked_runs + + table_data = self._extract_bundle_data(session, runs) + table_stats, table_payloads, manifest_data = self._build_archive_payload(identity, runs, table_data) + object_size = len(manifest_data) + sum(len(payload) for payload in table_payloads.values()) if self.dry_run: - # In dry run, just report what would be archived - for table_name in self.ARCHIVED_TABLES: - records = table_data.get(table_name, []) - result.tables.append( - TableStats( - table_name=table_name, - row_count=len(records), - checksum="", - size_bytes=0, - ) - ) + result.tables = table_stats + result.object_size_bytes = object_size result.success = True else: if storage is None: raise ArchiveStorageNotConfiguredError("Archive storage not configured") - archive_key = self._get_archive_key(run) - # Serialize tables for the archive bundle - table_stats: list[TableStats] = [] - table_payloads: dict[str, bytes] = {} - for table_name in self.ARCHIVED_TABLES: - records = table_data.get(table_name, []) - data = ArchiveStorage.serialize_to_jsonl(records) - table_payloads[table_name] = data - checksum = ArchiveStorage.compute_checksum(data) - - table_stats.append( - TableStats( - table_name=table_name, - row_count=len(records), - checksum=checksum, - size_bytes=len(data), - ) - ) - - # Generate and upload archive bundle - manifest = self._generate_manifest(run, table_stats) - manifest_data = json.dumps(manifest, indent=2, default=str).encode("utf-8") - archive_data = self._build_archive_bundle(manifest_data, table_payloads) - storage.put_object(archive_key, archive_data) - - repo = self._get_workflow_run_repo() - archived_log_count = repo.create_archive_logs(session, run, app_logs, trigger_metadata) + for table_name, payload in table_payloads.items(): + storage.put_object(self._get_table_object_key(identity, table_name), payload) + storage.put_object(self._get_manifest_object_key(identity), manifest_data) session.commit() - deleted_counts = None - if self.delete_after_archive: - deleted_counts = repo.delete_runs_with_related( - [run], - delete_node_executions=self._delete_node_executions, - delete_trigger_logs=self._delete_trigger_logs, - ) - logger.info( - "Archived workflow run %s: tables=%s, archived_logs=%s, deleted=%s", - run.id, + "Archived workflow run bundle %s: tenant=%s runs=%s tables=%s object_prefix=%s", + identity.bundle_id, + identity.tenant_id, + len(runs), {s.table_name: s.row_count for s in table_stats}, - archived_log_count, - deleted_counts, + identity.object_prefix, ) result.tables = table_stats + result.object_size_bytes = object_size result.success = True except Exception as e: - logger.exception("Failed to archive workflow run %s", run.id) + logger.exception("Failed to archive workflow run bundle %s", identity.bundle_id) result.error = str(e) session.rollback() result.elapsed_time = time.time() - start_time return result - def _extract_data( + def _lock_runs_for_archive( self, session: Session, - run: WorkflowRun, - ) -> tuple[dict[str, list[dict[str, Any]]], Sequence[WorkflowAppLog], str | None]: + run_ids: Sequence[str], + ) -> list[WorkflowRun]: + """ + Lock workflow runs before archiving a bundle. + + Parallel cron jobs may select overlapping pages. Row-level SKIP LOCKED keeps duplicate archivers from uploading + conflicting bundle objects for the same source rows. + """ + if not run_ids: + return [] + stmt = ( + select(WorkflowRun) + .where(WorkflowRun.id.in_(run_ids)) + .order_by(WorkflowRun.created_at.asc(), WorkflowRun.id.asc()) + .with_for_update(skip_locked=True) + ) + return list(session.scalars(stmt)) + + def _extract_bundle_data( + self, + session: Session, + runs: Sequence[WorkflowRun], + ) -> dict[str, list[dict[str, Any]]]: + """Extract all archived table rows for a bundle.""" + run_ids = [run.id for run in runs] table_data: dict[str, list[dict[str, Any]]] = {} - table_data["workflow_runs"] = [self._row_to_dict(run)] - repo = self._get_workflow_run_repo() - app_logs = repo.get_app_logs_by_run_id(session, run.id) + table_data["workflow_runs"] = [self._row_to_dict(run) for run in runs] + + app_logs = list(session.scalars(select(WorkflowAppLog).where(WorkflowAppLog.workflow_run_id.in_(run_ids)))) table_data["workflow_app_logs"] = [self._row_to_dict(row) for row in app_logs] - node_exec_repo = self._get_workflow_node_execution_repo(session) - node_exec_records = node_exec_repo.get_executions_by_workflow_run( - tenant_id=run.tenant_id, - app_id=run.app_id, - workflow_run_id=run.id, + + node_exec_records = list( + session.scalars( + select(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.workflow_run_id.in_(run_ids)) + ) ) node_exec_ids = [record.id for record in node_exec_records] - offload_records = node_exec_repo.get_offloads_by_execution_ids(session, node_exec_ids) + offload_records = [] + if node_exec_ids: + offload_records = list( + session.scalars( + select(WorkflowNodeExecutionOffload).where( + WorkflowNodeExecutionOffload.node_execution_id.in_(node_exec_ids) + ) + ) + ) table_data["workflow_node_executions"] = [self._row_to_dict(row) for row in node_exec_records] table_data["workflow_node_execution_offload"] = [self._row_to_dict(row) for row in offload_records] - repo = self._get_workflow_run_repo() - pause_records = repo.get_pause_records_by_run_id(session, run.id) + + pause_records = list(session.scalars(select(WorkflowPause).where(WorkflowPause.workflow_run_id.in_(run_ids)))) pause_ids = [pause.id for pause in pause_records] - pause_reason_records = repo.get_pause_reason_records_by_run_id( - session, - pause_ids, - ) + pause_reason_records = [] + if pause_ids: + pause_reason_records = list( + session.scalars(select(WorkflowPauseReason).where(WorkflowPauseReason.pause_id.in_(pause_ids))) + ) table_data["workflow_pauses"] = [self._row_to_dict(row) for row in pause_records] table_data["workflow_pause_reasons"] = [self._row_to_dict(row) for row in pause_reason_records] + trigger_repo = SQLAlchemyWorkflowTriggerLogRepository(session) - trigger_records = trigger_repo.list_by_run_id(run.id) + trigger_records: list[WorkflowTriggerLog] = [] + for run_id in run_ids: + trigger_records.extend(trigger_repo.list_by_run_id(run_id)) table_data["workflow_trigger_logs"] = [self._row_to_dict(row) for row in trigger_records] - trigger_metadata = trigger_records[0].trigger_metadata if trigger_records else None - return table_data, app_logs, trigger_metadata + return table_data @staticmethod def _row_to_dict(row: Any) -> dict[str, Any]: mapper = inspect(row).mapper return {str(column.name): getattr(row, mapper.get_property_by_column(column).key) for column in mapper.columns} - def _get_archive_key(self, run: WorkflowRun) -> str: - """Get the storage key for the archive bundle.""" - created_at = run.created_at - prefix = ( - f"{run.tenant_id}/app_id={run.app_id}/year={created_at.strftime('%Y')}/" - f"month={created_at.strftime('%m')}/workflow_run_id={run.id}" - ) - return f"{prefix}/{ARCHIVE_BUNDLE_NAME}" + def _build_archive_payload( + self, + identity: ArchiveBundleIdentity, + runs: Sequence[WorkflowRun], + table_data: dict[str, list[dict[str, Any]]], + ) -> tuple[list[TableStats], dict[str, bytes], bytes]: + """Build the archive payload and size stats without writing it to storage.""" + table_stats: list[TableStats] = [] + table_payloads: dict[str, bytes] = {} + for table_name in self.ARCHIVED_TABLES: + records = table_data.get(table_name, []) + data = self._serialize_to_parquet(records) + table_payloads[table_name] = data + checksum = ArchiveStorage.compute_checksum(data) + + table_stats.append( + TableStats( + table_name=table_name, + row_count=len(records), + checksum=checksum, + size_bytes=len(data), + object_key=self._get_table_object_key(identity, table_name), + ) + ) + + manifest = self._generate_manifest(identity, runs, table_stats) + manifest_data = json.dumps(manifest, indent=2, default=str).encode("utf-8") + return table_stats, table_payloads, manifest_data def _generate_manifest( self, - run: WorkflowRun, + identity: ArchiveBundleIdentity, + runs: Sequence[WorkflowRun], table_stats: list[TableStats], ) -> ArchiveManifestDict: - """Generate a manifest for the archived workflow run.""" + """Generate a manifest for the archived workflow run bundle.""" tables: dict[str, TableStatsManifestEntry] = { stat.table_name: { "row_count": stat.row_count, "checksum": stat.checksum, "size_bytes": stat.size_bytes, + "object_key": stat.object_key, } for stat in table_stats } + sorted_runs = sorted(runs, key=lambda run: (run.created_at, run.id)) return ArchiveManifestDict( - schema_version=ARCHIVE_SCHEMA_VERSION, - workflow_run_id=run.id, - tenant_id=run.tenant_id, - app_id=run.app_id, - workflow_id=run.workflow_id, - created_at=run.created_at.isoformat(), + schema_version=ARCHIVE_BUNDLE_SCHEMA_VERSION, + archive_format=ARCHIVE_BUNDLE_FORMAT, + tenant_id=identity.tenant_id, + tenant_prefix=identity.tenant_prefix, + year=identity.year, + month=identity.month, + shard=identity.shard, + bundle_id=identity.bundle_id, + object_prefix=identity.object_prefix, + workflow_run_count=len(runs), + workflow_node_execution_count=tables["workflow_node_executions"]["row_count"], + min_created_at=sorted_runs[0].created_at.isoformat(), + max_created_at=sorted_runs[-1].created_at.isoformat(), + min_run_id=min(run.id for run in runs), + max_run_id=max(run.id for run in runs), archived_at=datetime.datetime.now(datetime.UTC).isoformat(), tables=tables, + run_ids=[run.id for run in sorted_runs], ) - def _build_archive_bundle(self, manifest_data: bytes, table_payloads: dict[str, bytes]) -> bytes: - buffer = io.BytesIO() - with zipfile.ZipFile(buffer, mode="w", compression=zipfile.ZIP_DEFLATED) as archive: - archive.writestr("manifest.json", manifest_data) - for table_name in self.ARCHIVED_TABLES: - data = table_payloads.get(table_name) - if data is None: - raise ValueError(f"Missing archive payload for {table_name}") - archive.writestr(f"{table_name}.jsonl", data) - return buffer.getvalue() + @staticmethod + def _serialize_to_parquet(records: list[dict[str, Any]]) -> bytes: + normalized_records = [WorkflowRunArchiver._normalize_record_for_parquet(record) for record in records] + table = pa.Table.from_pylist(normalized_records) if normalized_records else pa.table({}) + sink = pa.BufferOutputStream() + pq.write_table(table, sink, compression="zstd") + return sink.getvalue().to_pybytes() + + @staticmethod + def _normalize_record_for_parquet(record: dict[str, Any]) -> dict[str, Any]: + def normalize(value: Any) -> Any: + if isinstance(value, Enum): + return value.value + if isinstance(value, dict | list): + return json.dumps(value, default=str, ensure_ascii=False) + return value + + return {key: normalize(value) for key, value in record.items()} + + def _group_runs_for_bundles(self, runs: Sequence[WorkflowRun]) -> list[list[WorkflowRun]]: + """Group a fetched page into tenant/month bundles.""" + grouped: dict[tuple[str, int, int], list[WorkflowRun]] = {} + for run in runs: + key = (run.tenant_id, run.created_at.year, run.created_at.month) + grouped.setdefault(key, []).append(run) + return [sorted(group, key=lambda run: (run.created_at, run.id)) for group in grouped.values()] + + def _build_bundle_identity(self, runs: Sequence[WorkflowRun]) -> ArchiveBundleIdentity: + """Build the object-store identity for a bundle.""" + sorted_runs = sorted(runs, key=lambda run: (run.created_at, run.id)) + first_run = sorted_runs[0] + tenant_ids = {run.tenant_id for run in sorted_runs} + if len(tenant_ids) != 1: + raise ValueError("archive bundle cannot span multiple tenants") + years_months = {(run.created_at.year, run.created_at.month) for run in sorted_runs} + if len(years_months) != 1: + raise ValueError("archive bundle cannot span multiple months") + + run_ids_digest = hashlib.sha256(",".join(run.id for run in sorted_runs).encode("utf-8")).hexdigest() + tenant_prefix = first_run.tenant_id[0].lower() + shard = self._bundle_shard_name() + year, month = next(iter(years_months)) + bundle_id = run_ids_digest[:16] + object_prefix = ( + f"workflow-runs/v2/tenant_prefix={tenant_prefix}/tenant_id={first_run.tenant_id}/" + f"year={year:04d}/month={month:02d}/shard={shard}/bundle={bundle_id}" + ) + return ArchiveBundleIdentity( + tenant_prefix=tenant_prefix, + tenant_id=first_run.tenant_id, + year=year, + month=month, + shard=shard, + bundle_id=bundle_id, + object_prefix=object_prefix, + ) + + def _bundle_shard_name(self) -> str: + if self.run_shard_index is None or self.run_shard_total is None: + return "00-of-01" + return f"{self.run_shard_index:02d}-of-{self.run_shard_total:02d}" + + @staticmethod + def _get_table_object_key(identity: ArchiveBundleIdentity, table_name: str) -> str: + return f"{identity.object_prefix}/{table_name}.parquet" + + @staticmethod + def _get_manifest_object_key(identity: ArchiveBundleIdentity) -> str: + return f"{identity.object_prefix}/{ARCHIVE_BUNDLE_MANIFEST_NAME}" def _delete_trigger_logs(self, session: Session, run_ids: Sequence[str]) -> int: trigger_repo = SQLAlchemyWorkflowTriggerLogRepository(session) diff --git a/api/services/retention/workflow_run/bundle_archive_maintenance.py b/api/services/retention/workflow_run/bundle_archive_maintenance.py new file mode 100644 index 00000000000..8678e044c5e --- /dev/null +++ b/api/services/retention/workflow_run/bundle_archive_maintenance.py @@ -0,0 +1,872 @@ +""" +Maintain V2 workflow-run archive bundles. + +Archive V2 keeps bundle metadata in object-store manifests, not in a database table. This module discovers bundles by +listing `manifest.json` objects, uses object-store marker files for delete/restore state, and only touches the database +for source-table validation, deletion, and restoration. + +Each bundle is processed in its own database transaction. A failed bundle leaves source rows unchanged unless the +transaction has already committed; marker handling makes the next run able to reconcile the common committed-but-marker +not-updated case. +""" + +import datetime +import io +import json +import logging +import time +from collections.abc import Sequence +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, TypedDict, cast + +import pyarrow.parquet as pq +import sqlalchemy as sa +from sqlalchemy import delete, func, inspect, select +from sqlalchemy.dialects.postgresql import insert as pg_insert +from sqlalchemy.engine import CursorResult +from sqlalchemy.orm import Session, sessionmaker + +from extensions.ext_database import db +from libs.archive_storage import ArchiveStorage, ArchiveStorageNotConfiguredError, get_archive_storage +from models.trigger import WorkflowTriggerLog +from models.workflow import ( + WorkflowAppLog, + WorkflowNodeExecutionModel, + WorkflowNodeExecutionOffload, + WorkflowPause, + WorkflowPauseReason, + WorkflowRun, +) +from services.retention.workflow_run.constants import ( + ARCHIVE_BUNDLE_DELETE_STARTED_MARKER_NAME, + ARCHIVE_BUNDLE_DELETED_MARKER_NAME, + ARCHIVE_BUNDLE_FORMAT, + ARCHIVE_BUNDLE_MANIFEST_NAME, + ARCHIVE_BUNDLE_RESTORE_STARTED_MARKER_NAME, + ARCHIVE_BUNDLE_RESTORED_MARKER_NAME, + ARCHIVE_BUNDLE_SCHEMA_VERSION, +) + +logger = logging.getLogger(__name__) + +_ARCHIVE_ROOT_PREFIX = "workflow-runs/v2/" +_CHUNK_SIZE = 5_000 + + +class TableManifestEntry(TypedDict): + row_count: int + checksum: str + size_bytes: int + object_key: str + + +class BundleManifest(TypedDict): + schema_version: str + archive_format: str + tenant_id: str + tenant_prefix: str + year: int + month: int + shard: str + bundle_id: str + object_prefix: str + workflow_run_count: int + workflow_node_execution_count: int + min_created_at: str + max_created_at: str + min_run_id: str + max_run_id: str + archived_at: str + tables: dict[str, TableManifestEntry] + run_ids: list[str] + + +@dataclass(frozen=True) +class BundleReference: + """Object-store reference for one V2 archive bundle.""" + + object_prefix: str + manifest_key: str + manifest: BundleManifest + + +@dataclass +class BundleOperationResult: + """Result for one V2 bundle delete or restore operation.""" + + bundle_id: str + tenant_id: str + object_prefix: str + success: bool = False + table_counts: dict[str, int] = field(default_factory=dict) + archive_bytes: int = 0 + elapsed_time: float = 0.0 + validation_time: float = 0.0 + error: str | None = None + + @property + def run_count(self) -> int: + return self.table_counts.get("workflow_runs", 0) + + @property + def row_count(self) -> int: + return sum(self.table_counts.values()) + + +@dataclass +class BundleOperationSummary: + """Aggregate metrics for a V2 bundle maintenance command.""" + + operation: str + bundles_processed: int = 0 + bundles_succeeded: int = 0 + bundles_failed: int = 0 + rows_processed: int = 0 + runs_processed: int = 0 + archive_bytes: int = 0 + elapsed_time: float = 0.0 + validation_time: float = 0.0 + table_counts: dict[str, int] = field(default_factory=dict) + results: list[BundleOperationResult] = field(default_factory=list) + + @property + def runs_per_second(self) -> float: + if self.elapsed_time <= 0: + return 0.0 + return self.runs_processed / self.elapsed_time + + @property + def rows_per_second(self) -> float: + if self.elapsed_time <= 0: + return 0.0 + return self.rows_processed / self.elapsed_time + + @property + def bytes_per_second(self) -> float: + if self.elapsed_time <= 0: + return 0.0 + return self.archive_bytes / self.elapsed_time + + +TABLE_MODELS: dict[str, Any] = { + "workflow_runs": WorkflowRun, + "workflow_app_logs": WorkflowAppLog, + "workflow_node_executions": WorkflowNodeExecutionModel, + "workflow_node_execution_offload": WorkflowNodeExecutionOffload, + "workflow_pauses": WorkflowPause, + "workflow_pause_reasons": WorkflowPauseReason, + "workflow_trigger_logs": WorkflowTriggerLog, +} + +ARCHIVED_TABLES = [ + "workflow_runs", + "workflow_app_logs", + "workflow_node_executions", + "workflow_node_execution_offload", + "workflow_pauses", + "workflow_pause_reasons", + "workflow_trigger_logs", +] + +RESTORE_ORDER = [ + "workflow_runs", + "workflow_app_logs", + "workflow_node_executions", + "workflow_node_execution_offload", + "workflow_pauses", + "workflow_pause_reasons", + "workflow_trigger_logs", +] + + +class WorkflowRunBundleArchiveMaintenance: + """ + Delete and restore V2 workflow-run archive bundles. + + Args: + dry_run: Validate and report counts without changing source rows or object-store markers. + strict_content_validation: Compare source-table content checksums against Parquet content before destructive + delete and after restore. Keep enabled for real maintenance. + stop_on_error: Stop batch processing after the first failed bundle. + """ + + dry_run: bool + strict_content_validation: bool + stop_on_error: bool + + def __init__( + self, + *, + dry_run: bool = False, + strict_content_validation: bool = True, + stop_on_error: bool = True, + ) -> None: + self.dry_run = dry_run + self.strict_content_validation = strict_content_validation + self.stop_on_error = stop_on_error + + def delete_batch( + self, + *, + tenant_ids: Sequence[str] | None, + start_date: datetime.datetime, + end_date: datetime.datetime, + limit: int, + ) -> BundleOperationSummary: + """Validate and delete source rows for archived V2 bundles in the requested created_at window.""" + return self._process_batch( + operation="delete", + tenant_ids=tenant_ids, + start_date=start_date, + end_date=end_date, + limit=limit, + ) + + def restore_batch( + self, + *, + tenant_ids: Sequence[str] | None, + start_date: datetime.datetime, + end_date: datetime.datetime, + limit: int, + ) -> BundleOperationSummary: + """Restore source rows for deleted V2 bundles in the requested created_at window.""" + return self._process_batch( + operation="restore", + tenant_ids=tenant_ids, + start_date=start_date, + end_date=end_date, + limit=limit, + ) + + def _process_batch( + self, + *, + operation: str, + tenant_ids: Sequence[str] | None, + start_date: datetime.datetime, + end_date: datetime.datetime, + limit: int, + ) -> BundleOperationSummary: + start_time = time.time() + summary = BundleOperationSummary(operation=operation) + if tenant_ids is not None and not tenant_ids: + return summary + + storage = self._get_archive_storage() + bundle_refs = self._list_bundle_refs( + storage, + operation=operation, + tenant_ids=tenant_ids, + start_date=start_date, + end_date=end_date, + limit=limit, + ) + + logger.info("Found %s V2 archive bundles for %s", len(bundle_refs), operation) + session_maker = sessionmaker(bind=db.engine, expire_on_commit=False) + for bundle_ref in bundle_refs: + with session_maker() as session: + if operation == "delete": + result = self._delete_bundle(session, storage, bundle_ref) + elif operation == "restore": + result = self._restore_bundle(session, storage, bundle_ref) + else: + raise ValueError(f"Unsupported operation: {operation}") + + self._merge_result(summary, result) + if not result.success and self.stop_on_error: + logger.error("Stopping V2 bundle %s after failure: %s", operation, result.error) + break + + summary.elapsed_time = time.time() - start_time + return summary + + def _list_bundle_refs( + self, + storage: ArchiveStorage, + *, + operation: str, + tenant_ids: Sequence[str] | None, + start_date: datetime.datetime, + end_date: datetime.datetime, + limit: int, + ) -> list[BundleReference]: + start_date = self._to_naive_utc(start_date) + end_date = self._to_naive_utc(end_date) + manifest_keys = self._list_manifest_keys(storage, tenant_ids) + refs: list[BundleReference] = [] + for manifest_key in manifest_keys: + manifest_data = self._get_checked_object(storage, manifest_key) + object_prefix = manifest_key.removesuffix(f"/{ARCHIVE_BUNDLE_MANIFEST_NAME}") + manifest = self._load_and_validate_manifest(manifest_data, object_prefix=object_prefix) + min_created_at = self._parse_manifest_datetime(manifest["min_created_at"]) + max_created_at = self._parse_manifest_datetime(manifest["max_created_at"]) + if max_created_at < start_date or min_created_at >= end_date: + continue + if tenant_ids and manifest["tenant_id"] not in tenant_ids: + continue + if operation == "delete" and self._is_deleted(storage, object_prefix): + continue + if operation == "restore" and not self._is_deleted(storage, object_prefix): + continue + refs.append(BundleReference(object_prefix=object_prefix, manifest_key=manifest_key, manifest=manifest)) + + refs.sort( + key=lambda ref: ( + self._parse_manifest_datetime(ref.manifest["min_created_at"]), + ref.manifest["tenant_id"], + ref.manifest["bundle_id"], + ) + ) + return refs[:limit] + + @staticmethod + def _list_manifest_keys(storage: ArchiveStorage, tenant_ids: Sequence[str] | None) -> list[str]: + keys: list[str] = [] + if tenant_ids: + prefixes = [ + f"{_ARCHIVE_ROOT_PREFIX}tenant_prefix={tenant_id[0].lower()}/tenant_id={tenant_id}/" + for tenant_id in tenant_ids + ] + else: + prefixes = [_ARCHIVE_ROOT_PREFIX] + for prefix in prefixes: + keys.extend(storage.list_objects(prefix)) + return sorted(key for key in keys if key.endswith(f"/{ARCHIVE_BUNDLE_MANIFEST_NAME}")) + + def _delete_bundle( + self, + session: Session, + storage: ArchiveStorage, + bundle_ref: BundleReference, + ) -> BundleOperationResult: + start_time = time.time() + result = self._new_result(bundle_ref.manifest) + try: + validation_start = time.time() + manifest, table_records, archive_bytes = self._validate_archive_object(storage, bundle_ref) + result.table_counts = self._manifest_table_counts(manifest) + result.archive_bytes = archive_bytes + + self._lock_workflow_runs(session, manifest["run_ids"]) + if self._is_delete_started(storage, bundle_ref.object_prefix) and self._live_counts_match( + session, manifest, expected_present=False + ): + result.validation_time = time.time() - validation_start + if not self.dry_run: + self._mark_deleted(storage, bundle_ref.object_prefix) + self._delete_marker(storage, bundle_ref.object_prefix, ARCHIVE_BUNDLE_DELETE_STARTED_MARKER_NAME) + result.success = True + return result + + self._validate_live_counts(session, manifest, expected_present=True) + if self.strict_content_validation: + self._validate_live_content(session, table_records) + result.validation_time = time.time() - validation_start + + if not self.dry_run: + self._put_marker(storage, bundle_ref.object_prefix, ARCHIVE_BUNDLE_DELETE_STARTED_MARKER_NAME) + deleted_counts = self._delete_bundle_rows(session, table_records) + if deleted_counts != result.table_counts: + raise ValueError( + f"Deleted row count mismatch: expected={result.table_counts}, actual={deleted_counts}" + ) + self._validate_live_counts(session, manifest, expected_present=False) + session.commit() + self._mark_deleted(storage, bundle_ref.object_prefix) + self._delete_marker(storage, bundle_ref.object_prefix, ARCHIVE_BUNDLE_DELETE_STARTED_MARKER_NAME) + self._delete_marker(storage, bundle_ref.object_prefix, ARCHIVE_BUNDLE_RESTORED_MARKER_NAME) + result.success = True + except Exception as e: + session.rollback() + result.error = str(e) + logger.exception("Failed to delete V2 archive bundle %s", bundle_ref.object_prefix) + result.elapsed_time = time.time() - start_time + return result + + def _restore_bundle( + self, + session: Session, + storage: ArchiveStorage, + bundle_ref: BundleReference, + ) -> BundleOperationResult: + start_time = time.time() + result = self._new_result(bundle_ref.manifest) + try: + validation_start = time.time() + manifest, table_records, archive_bytes = self._validate_archive_object(storage, bundle_ref) + result.table_counts = self._manifest_table_counts(manifest) + result.archive_bytes = archive_bytes + + if self._live_counts_match(session, manifest, expected_present=True): + if self.strict_content_validation: + self._validate_live_content(session, table_records) + result.validation_time = time.time() - validation_start + if not self.dry_run: + self._mark_restored(storage, bundle_ref.object_prefix) + result.success = True + return result + + self._validate_live_counts(session, manifest, expected_present=False) + result.validation_time = time.time() - validation_start + + if not self.dry_run: + self._put_marker(storage, bundle_ref.object_prefix, ARCHIVE_BUNDLE_RESTORE_STARTED_MARKER_NAME) + restored_counts = self._restore_bundle_rows(session, table_records) + if restored_counts != result.table_counts: + self._validate_live_counts(session, manifest, expected_present=True) + self._validate_live_counts(session, manifest, expected_present=True) + if self.strict_content_validation: + self._validate_live_content(session, table_records) + session.commit() + self._mark_restored(storage, bundle_ref.object_prefix) + result.success = True + except Exception as e: + session.rollback() + result.error = str(e) + logger.exception("Failed to restore V2 archive bundle %s", bundle_ref.object_prefix) + result.elapsed_time = time.time() - start_time + return result + + @staticmethod + def _new_result(manifest: BundleManifest) -> BundleOperationResult: + return BundleOperationResult( + bundle_id=manifest["bundle_id"], + tenant_id=manifest["tenant_id"], + object_prefix=manifest["object_prefix"], + ) + + def _validate_archive_object( + self, + storage: ArchiveStorage, + bundle_ref: BundleReference, + ) -> tuple[BundleManifest, dict[str, list[dict[str, Any]]], int]: + manifest = bundle_ref.manifest + table_records: dict[str, list[dict[str, Any]]] = {} + total_size = len(storage.get_object(bundle_ref.manifest_key)) + for table_name in ARCHIVED_TABLES: + info = manifest["tables"][table_name] + payload = self._get_checked_object(storage, info["object_key"]) + total_size += len(payload) + if len(payload) != info["size_bytes"]: + raise ValueError( + f"Archive object size mismatch for {info['object_key']}: " + f"expected={info['size_bytes']}, actual={len(payload)}" + ) + checksum = ArchiveStorage.compute_checksum(payload) + if checksum != info["checksum"]: + raise ValueError( + f"Archive object checksum mismatch for {info['object_key']}: " + f"expected={info['checksum']}, actual={checksum}" + ) + records = self._deserialize_parquet(payload) + if len(records) != info["row_count"]: + raise ValueError( + f"Parquet row count mismatch for {info['object_key']}: " + f"expected={info['row_count']}, actual={len(records)}" + ) + table_records[table_name] = records + return manifest, table_records, total_size + + @staticmethod + def _get_checked_object(storage: ArchiveStorage, object_key: str) -> bytes: + if not storage.object_exists(object_key): + raise FileNotFoundError(f"Archive object not found: {object_key}") + return storage.get_object(object_key) + + @staticmethod + def _load_and_validate_manifest( + manifest_data: bytes, + *, + object_prefix: str, + ) -> BundleManifest: + loaded = json.loads(manifest_data) + if not isinstance(loaded, dict): + raise ValueError("manifest.json must be an object") + required_fields = { + "schema_version", + "archive_format", + "tenant_id", + "tenant_prefix", + "year", + "month", + "shard", + "bundle_id", + "object_prefix", + "workflow_run_count", + "workflow_node_execution_count", + "tables", + "run_ids", + } + missing_fields = sorted(required_fields - set(loaded)) + if missing_fields: + raise ValueError(f"manifest missing required fields: {', '.join(missing_fields)}") + manifest = cast(BundleManifest, loaded) + if manifest["schema_version"] != ARCHIVE_BUNDLE_SCHEMA_VERSION: + raise ValueError(f"unsupported bundle schema_version: {manifest['schema_version']}") + if manifest["archive_format"] != ARCHIVE_BUNDLE_FORMAT: + raise ValueError(f"unsupported bundle archive_format: {manifest['archive_format']}") + if manifest["object_prefix"] != object_prefix: + raise ValueError("manifest object_prefix does not match object key") + if manifest["tenant_id"][0].lower() != manifest["tenant_prefix"]: + raise ValueError("manifest tenant_prefix does not match tenant_id") + if len(manifest["run_ids"]) != manifest["workflow_run_count"]: + raise ValueError("manifest run_ids count does not match workflow_run_count") + + tables = manifest["tables"] + if not isinstance(tables, dict): + raise ValueError("manifest tables must be an object") + for table_name in ARCHIVED_TABLES: + if table_name not in tables: + raise ValueError(f"manifest missing table: {table_name}") + info = tables[table_name] + for key in ("row_count", "checksum", "size_bytes", "object_key"): + if key not in info: + raise ValueError(f"manifest table {table_name} missing {key}") + expected_key = f"{object_prefix}/{table_name}.parquet" + if info["object_key"] != expected_key: + raise ValueError( + f"manifest object_key mismatch for {table_name}: " + f"expected={expected_key}, actual={info['object_key']}" + ) + return manifest + + @staticmethod + def _deserialize_parquet(payload: bytes) -> list[dict[str, Any]]: + table = pq.read_table(io.BytesIO(payload)) + return table.to_pylist() + + def _validate_live_counts( + self, + session: Session, + manifest: BundleManifest, + *, + expected_present: bool, + ) -> None: + expected_counts = self._manifest_table_counts(manifest) + actual_counts = self._count_live_rows(session, manifest["run_ids"]) + if not expected_present: + expected_counts = dict.fromkeys(expected_counts, 0) + if actual_counts != expected_counts: + state = "present" if expected_present else "deleted" + raise ValueError( + f"Live row count mismatch for {state} bundle: expected={expected_counts}, actual={actual_counts}" + ) + + def _live_counts_match(self, session: Session, manifest: BundleManifest, *, expected_present: bool) -> bool: + expected_counts = self._manifest_table_counts(manifest) + if not expected_present: + expected_counts = dict.fromkeys(expected_counts, 0) + return self._count_live_rows(session, manifest["run_ids"]) == expected_counts + + @staticmethod + def _manifest_table_counts(manifest: BundleManifest) -> dict[str, int]: + return {table_name: manifest["tables"][table_name]["row_count"] for table_name in ARCHIVED_TABLES} + + def _count_live_rows(self, session: Session, run_ids: Sequence[str]) -> dict[str, int]: + node_ids = self._select_ids_by_run_ids(session, WorkflowNodeExecutionModel, run_ids) + pause_ids = self._select_ids_by_run_ids(session, WorkflowPause, run_ids) + return { + "workflow_runs": self._count_by_run_ids(session, WorkflowRun, run_ids), + "workflow_app_logs": self._count_by_run_ids(session, WorkflowAppLog, run_ids), + "workflow_node_executions": len(node_ids), + "workflow_node_execution_offload": self._count_by_column( + session, WorkflowNodeExecutionOffload, WorkflowNodeExecutionOffload.node_execution_id, node_ids + ), + "workflow_pauses": len(pause_ids), + "workflow_pause_reasons": self._count_by_column( + session, WorkflowPauseReason, WorkflowPauseReason.pause_id, pause_ids + ), + "workflow_trigger_logs": self._count_by_run_ids(session, WorkflowTriggerLog, run_ids), + } + + def _validate_live_content( + self, + session: Session, + table_records: dict[str, list[dict[str, Any]]], + ) -> None: + run_ids = [str(record["id"]) for record in table_records["workflow_runs"]] + node_ids = [str(record["id"]) for record in table_records["workflow_node_executions"]] + pause_ids = [str(record["id"]) for record in table_records["workflow_pauses"]] + + live_records = { + "workflow_runs": self._load_records_by_run_ids(session, WorkflowRun, run_ids), + "workflow_app_logs": self._load_records_by_run_ids(session, WorkflowAppLog, run_ids), + "workflow_node_executions": self._load_records_by_run_ids(session, WorkflowNodeExecutionModel, run_ids), + "workflow_node_execution_offload": self._load_records_by_column( + session, WorkflowNodeExecutionOffload, WorkflowNodeExecutionOffload.node_execution_id, node_ids + ), + "workflow_pauses": self._load_records_by_run_ids(session, WorkflowPause, run_ids), + "workflow_pause_reasons": self._load_records_by_column( + session, WorkflowPauseReason, WorkflowPauseReason.pause_id, pause_ids + ), + "workflow_trigger_logs": self._load_records_by_run_ids(session, WorkflowTriggerLog, run_ids), + } + for table_name in ARCHIVED_TABLES: + live_checksum = self._records_checksum(live_records[table_name]) + archive_checksum = self._records_checksum(table_records[table_name]) + if live_checksum != archive_checksum: + raise ValueError( + f"Live/archive content checksum mismatch for {table_name}: " + f"expected={archive_checksum}, actual={live_checksum}" + ) + + def _delete_bundle_rows( + self, + session: Session, + table_records: dict[str, list[dict[str, Any]]], + ) -> dict[str, int]: + run_ids = [str(record["id"]) for record in table_records["workflow_runs"]] + node_ids = [str(record["id"]) for record in table_records["workflow_node_executions"]] + pause_ids = [str(record["id"]) for record in table_records["workflow_pauses"]] + + deleted_counts = dict.fromkeys(ARCHIVED_TABLES, 0) + deleted_counts["workflow_pause_reasons"] = self._delete_by_column( + session, WorkflowPauseReason, WorkflowPauseReason.pause_id, pause_ids + ) + deleted_counts["workflow_node_execution_offload"] = self._delete_by_column( + session, WorkflowNodeExecutionOffload, WorkflowNodeExecutionOffload.node_execution_id, node_ids + ) + deleted_counts["workflow_trigger_logs"] = self._delete_by_run_ids(session, WorkflowTriggerLog, run_ids) + deleted_counts["workflow_app_logs"] = self._delete_by_run_ids(session, WorkflowAppLog, run_ids) + deleted_counts["workflow_node_executions"] = self._delete_by_run_ids( + session, WorkflowNodeExecutionModel, run_ids + ) + deleted_counts["workflow_pauses"] = self._delete_by_run_ids(session, WorkflowPause, run_ids) + deleted_counts["workflow_runs"] = self._delete_by_run_ids(session, WorkflowRun, run_ids) + return deleted_counts + + def _restore_bundle_rows( + self, + session: Session, + table_records: dict[str, list[dict[str, Any]]], + ) -> dict[str, int]: + restored_counts = dict.fromkeys(ARCHIVED_TABLES, 0) + for table_name in RESTORE_ORDER: + restored_counts[table_name] = self._restore_table_records(session, table_name, table_records[table_name]) + return restored_counts + + def _restore_table_records( + self, + session: Session, + table_name: str, + records: list[dict[str, Any]], + ) -> int: + if not records: + return 0 + model = TABLE_MODELS[table_name] + total = 0 + for chunk in self._chunks(records, _CHUNK_SIZE): + converted = [self._prepare_insert_record(model, record) for record in chunk] + stmt = pg_insert(cast(Any, model.__table__)).values(converted) + stmt = stmt.on_conflict_do_nothing(index_elements=["id"]) + result = session.execute(stmt) + total += cast(CursorResult, result).rowcount or 0 + return total + + def _prepare_insert_record( + self, + model: Any, + record: dict[str, Any], + ) -> dict[str, Any]: + table = model.__table__ + columns_by_name = {column.name: column for column in table.columns} + prepared = {key: value for key, value in record.items() if key in columns_by_name} + for column_name, value in list(prepared.items()): + column = columns_by_name[column_name] + if value is None: + continue + if isinstance(column.type, sa.DateTime) and isinstance(value, str): + prepared[column_name] = datetime.datetime.fromisoformat(value) + elif isinstance(column.type, sa.JSON) and isinstance(value, str): + prepared[column_name] = json.loads(value) + return prepared + + @staticmethod + def _row_to_dict(row: Any) -> dict[str, Any]: + mapper = inspect(row).mapper + return {str(column.name): getattr(row, mapper.get_property_by_column(column).key) for column in mapper.columns} + + @staticmethod + def _normalize_record_for_checksum(record: dict[str, Any]) -> dict[str, Any]: + def normalize(value: Any) -> Any: + if isinstance(value, Enum): + return value.value + if isinstance(value, dict | list): + return json.dumps(value, default=str, ensure_ascii=False) + return value + + return {key: normalize(value) for key, value in record.items()} + + @classmethod + def _records_checksum(cls, records: list[dict[str, Any]]) -> str: + normalized = [cls._normalize_record_for_checksum(record) for record in records] + normalized.sort(key=lambda record: json.dumps(record, sort_keys=True, default=str, ensure_ascii=False)) + payload = json.dumps(normalized, sort_keys=True, default=str, ensure_ascii=False, separators=(",", ":")) + return ArchiveStorage.compute_checksum(payload.encode("utf-8")) + + @staticmethod + def _lock_workflow_runs(session: Session, run_ids: Sequence[str]) -> None: + for chunk in WorkflowRunBundleArchiveMaintenance._chunks(run_ids, _CHUNK_SIZE): + list(session.scalars(select(WorkflowRun.id).where(WorkflowRun.id.in_(chunk)).with_for_update())) + + @staticmethod + def _select_ids_by_run_ids( + session: Session, + model: Any, + run_ids: Sequence[str], + ) -> list[str]: + if not run_ids: + return [] + ids: list[str] = [] + for chunk in WorkflowRunBundleArchiveMaintenance._chunks(run_ids, _CHUNK_SIZE): + ids.extend( + str(row_id) for row_id in session.scalars(select(model.id).where(model.workflow_run_id.in_(chunk))) + ) + return ids + + @staticmethod + def _count_by_run_ids( + session: Session, + model: Any, + run_ids: Sequence[str], + ) -> int: + return WorkflowRunBundleArchiveMaintenance._count_by_column( + session, model, WorkflowRunBundleArchiveMaintenance._run_id_column(model), run_ids + ) + + @staticmethod + def _count_by_column( + session: Session, + model: Any, + column: Any, + values: Sequence[str], + ) -> int: + if not values: + return 0 + total = 0 + for chunk in WorkflowRunBundleArchiveMaintenance._chunks(values, _CHUNK_SIZE): + total += session.scalar(select(func.count()).select_from(model).where(column.in_(chunk))) or 0 + return total + + def _load_records_by_run_ids( + self, + session: Session, + model: Any, + run_ids: Sequence[str], + ) -> list[dict[str, Any]]: + return self._load_records_by_column(session, model, self._run_id_column(model), run_ids) + + def _load_records_by_column( + self, + session: Session, + model: Any, + column: Any, + values: Sequence[str], + ) -> list[dict[str, Any]]: + if not values: + return [] + rows: list[Any] = [] + for chunk in self._chunks(values, _CHUNK_SIZE): + rows.extend(session.scalars(select(model).where(column.in_(chunk)))) + return [self._row_to_dict(row) for row in rows] + + @staticmethod + def _delete_by_run_ids( + session: Session, + model: Any, + run_ids: Sequence[str], + ) -> int: + return WorkflowRunBundleArchiveMaintenance._delete_by_column( + session, model, WorkflowRunBundleArchiveMaintenance._run_id_column(model), run_ids + ) + + @staticmethod + def _run_id_column(model: Any) -> Any: + if model is WorkflowRun: + return WorkflowRun.id + return model.workflow_run_id + + @staticmethod + def _delete_by_column( + session: Session, + model: Any, + column: Any, + values: Sequence[str], + ) -> int: + if not values: + return 0 + total = 0 + for chunk in WorkflowRunBundleArchiveMaintenance._chunks(values, _CHUNK_SIZE): + result = session.execute(delete(model).where(column.in_(chunk))) + total += cast(CursorResult, result).rowcount or 0 + return total + + @staticmethod + def _is_deleted(storage: ArchiveStorage, object_prefix: str) -> bool: + return storage.object_exists(f"{object_prefix}/{ARCHIVE_BUNDLE_DELETED_MARKER_NAME}") + + @staticmethod + def _is_delete_started(storage: ArchiveStorage, object_prefix: str) -> bool: + return storage.object_exists(f"{object_prefix}/{ARCHIVE_BUNDLE_DELETE_STARTED_MARKER_NAME}") + + @staticmethod + def _mark_deleted(storage: ArchiveStorage, object_prefix: str) -> None: + WorkflowRunBundleArchiveMaintenance._put_marker(storage, object_prefix, ARCHIVE_BUNDLE_DELETED_MARKER_NAME) + + @staticmethod + def _mark_restored(storage: ArchiveStorage, object_prefix: str) -> None: + WorkflowRunBundleArchiveMaintenance._delete_marker(storage, object_prefix, ARCHIVE_BUNDLE_DELETED_MARKER_NAME) + WorkflowRunBundleArchiveMaintenance._delete_marker( + storage, object_prefix, ARCHIVE_BUNDLE_RESTORE_STARTED_MARKER_NAME + ) + WorkflowRunBundleArchiveMaintenance._put_marker(storage, object_prefix, ARCHIVE_BUNDLE_RESTORED_MARKER_NAME) + + @staticmethod + def _put_marker(storage: ArchiveStorage, object_prefix: str, marker_name: str) -> None: + payload = json.dumps({"created_at": datetime.datetime.now(datetime.UTC).isoformat()}).encode("utf-8") + storage.put_object(f"{object_prefix}/{marker_name}", payload) + + @staticmethod + def _delete_marker(storage: ArchiveStorage, object_prefix: str, marker_name: str) -> None: + marker_key = f"{object_prefix}/{marker_name}" + if storage.object_exists(marker_key): + storage.delete_object(marker_key) + + @staticmethod + def _parse_manifest_datetime(value: str) -> datetime.datetime: + return WorkflowRunBundleArchiveMaintenance._to_naive_utc(datetime.datetime.fromisoformat(value)) + + @staticmethod + def _to_naive_utc(value: datetime.datetime) -> datetime.datetime: + if value.tzinfo is None: + return value + return value.astimezone(datetime.UTC).replace(tzinfo=None) + + @staticmethod + def _chunks(values: Sequence[Any], size: int) -> list[Sequence[Any]]: + return [values[index : index + size] for index in range(0, len(values), size)] + + @staticmethod + def _get_archive_storage() -> ArchiveStorage: + try: + return get_archive_storage() + except ArchiveStorageNotConfiguredError as e: + raise RuntimeError(f"Archive storage not configured: {e}") from e + + @staticmethod + def _merge_result(summary: BundleOperationSummary, result: BundleOperationResult) -> None: + summary.results.append(result) + summary.bundles_processed += 1 + summary.validation_time += result.validation_time + if result.success: + summary.bundles_succeeded += 1 + summary.rows_processed += result.row_count + summary.runs_processed += result.run_count + summary.archive_bytes += result.archive_bytes + for table_name, count in result.table_counts.items(): + summary.table_counts[table_name] = summary.table_counts.get(table_name, 0) + count + else: + summary.bundles_failed += 1 diff --git a/api/services/retention/workflow_run/constants.py b/api/services/retention/workflow_run/constants.py index 162bb4947df..d8d2807bb16 100644 --- a/api/services/retention/workflow_run/constants.py +++ b/api/services/retention/workflow_run/constants.py @@ -1,2 +1,10 @@ ARCHIVE_SCHEMA_VERSION = "1.0" ARCHIVE_BUNDLE_NAME = f"archive.v{ARCHIVE_SCHEMA_VERSION}.zip" + +ARCHIVE_BUNDLE_SCHEMA_VERSION = "2.0" +ARCHIVE_BUNDLE_FORMAT = "parquet" +ARCHIVE_BUNDLE_MANIFEST_NAME = "manifest.json" +ARCHIVE_BUNDLE_DELETE_STARTED_MARKER_NAME = "_DELETE_STARTED" +ARCHIVE_BUNDLE_DELETED_MARKER_NAME = "_DELETED" +ARCHIVE_BUNDLE_RESTORE_STARTED_MARKER_NAME = "_RESTORE_STARTED" +ARCHIVE_BUNDLE_RESTORED_MARKER_NAME = "_RESTORED" diff --git a/api/services/retention/workflow_run/delete_archived_workflow_run.py b/api/services/retention/workflow_run/delete_archived_workflow_run.py index 937a1067105..8fd8b3c8989 100644 --- a/api/services/retention/workflow_run/delete_archived_workflow_run.py +++ b/api/services/retention/workflow_run/delete_archived_workflow_run.py @@ -2,20 +2,68 @@ Delete Archived Workflow Run Service. This service deletes archived workflow run data from the database while keeping -archive logs intact. +archive logs intact. Deletion is intentionally gated by archive-object validation: +the archive bundle must exist, have a supported manifest, pass zip/member checksum +checks, and match the live row counts for every cleanup-owned table before rows +are removed from the primary database. """ +import io +import json +import logging import time +import zipfile from collections.abc import Sequence from dataclasses import dataclass, field from datetime import datetime +from typing import TypedDict from sqlalchemy.orm import Session, sessionmaker from extensions.ext_database import db -from models.workflow import WorkflowRun +from libs.archive_storage import ArchiveStorage, ArchiveStorageNotConfiguredError, get_archive_storage +from models.workflow import WorkflowArchiveLog, WorkflowRun from repositories.api_workflow_run_repository import APIWorkflowRunRepository, RunsWithRelatedCountsDict from repositories.sqlalchemy_workflow_trigger_log_repository import SQLAlchemyWorkflowTriggerLogRepository +from services.retention.workflow_run.constants import ARCHIVE_BUNDLE_NAME, ARCHIVE_SCHEMA_VERSION + +logger = logging.getLogger(__name__) + + +class _TableManifestEntry(TypedDict): + row_count: int + checksum: str + size_bytes: int + + +class _ArchiveManifest(TypedDict): + schema_version: str + workflow_run_id: str + tenant_id: str + app_id: str + workflow_id: str + tables: dict[str, _TableManifestEntry] + + +_ARCHIVED_TABLES = [ + "workflow_runs", + "workflow_app_logs", + "workflow_node_executions", + "workflow_node_execution_offload", + "workflow_pauses", + "workflow_pause_reasons", + "workflow_trigger_logs", +] + +_TABLE_TO_COUNT_KEY = { + "workflow_runs": "runs", + "workflow_app_logs": "app_logs", + "workflow_node_executions": "node_executions", + "workflow_node_execution_offload": "offloads", + "workflow_pauses": "pauses", + "workflow_pause_reasons": "pause_reasons", + "workflow_trigger_logs": "trigger_logs", +} @dataclass @@ -34,13 +82,49 @@ class DeleteResult: "pause_reasons": 0, } ) + validated_counts: RunsWithRelatedCountsDict = field( + default_factory=lambda: { # type: ignore[assignment] + "runs": 0, + "node_executions": 0, + "offloads": 0, + "app_logs": 0, + "trigger_logs": 0, + "pauses": 0, + "pause_reasons": 0, + } + ) + archive_key: str | None = None + restore_sampled: bool = False + restore_sample_success: bool | None = None error: str | None = None elapsed_time: float = 0.0 class ArchivedWorkflowRunDeletion: - def __init__(self, dry_run: bool = False): + """ + Delete archived workflow-run rows after validating the archive bundle. + + Args: + dry_run: Preview validation and row counts without deleting. + skip_bad_archives: Continue batch deletion after a validation/delete failure. + restore_sample_interval: Run restore dry-run for every Nth successful deletion; 0 disables sampling. + """ + + _delete_attempt_count: int + + def __init__( + self, + dry_run: bool = False, + *, + skip_bad_archives: bool = False, + restore_sample_interval: int = 0, + ): self.dry_run = dry_run + self.skip_bad_archives = skip_bad_archives + if restore_sample_interval < 0: + raise ValueError("restore_sample_interval must be >= 0") + self.restore_sample_interval = restore_sample_interval + self._delete_attempt_count = 0 self.workflow_run_repo: APIWorkflowRunRepository | None = None def delete_by_run_id(self, run_id: str) -> DeleteResult: @@ -57,12 +141,13 @@ class ArchivedWorkflowRunDeletion: return result result.tenant_id = run.tenant_id - if not repo.get_archived_run_ids(session, [run.id]): + archive_log = repo.get_archived_log_by_run_id(run.id) + if archive_log is None: result.error = f"Workflow run {run_id} is not archived" result.elapsed_time = time.time() - start_time return result - result = self._delete_run(run) + result = self._delete_run(run, archive_log) result.elapsed_time = time.time() - start_time return result @@ -78,8 +163,8 @@ class ArchivedWorkflowRunDeletion: repo = self._get_workflow_run_repo() with session_maker() as session: - runs = list( - repo.get_archived_runs_by_time_range( + archive_logs = list( + repo.get_archived_logs_by_time_range( session=session, tenant_ids=tenant_ids, start_date=start_date, @@ -87,14 +172,44 @@ class ArchivedWorkflowRunDeletion: limit=limit, ) ) - for run in runs: - results.append(self._delete_run(run)) + run_ids = [archive_log.workflow_run_id for archive_log in archive_logs] + runs_by_id = {run.id: run for run in session.query(WorkflowRun).where(WorkflowRun.id.in_(run_ids)).all()} + for archive_log in archive_logs: + run = runs_by_id.get(archive_log.workflow_run_id) + if run is None: + result = DeleteResult( + run_id=archive_log.workflow_run_id, + tenant_id=archive_log.tenant_id, + success=False, + error=f"Workflow run {archive_log.workflow_run_id} not found", + ) + else: + result = self._delete_run(run, archive_log) + results.append(result) + if not result.success and not self.skip_bad_archives: + logger.error("Stopping archived workflow run deletion after failure: %s", result.error) + break return results - def _delete_run(self, run: WorkflowRun) -> DeleteResult: + def _delete_run(self, run: WorkflowRun, archive_log: WorkflowArchiveLog | None = None) -> DeleteResult: start_time = time.time() result = DeleteResult(run_id=run.id, tenant_id=run.tenant_id, success=False) + if archive_log is None: + archive_log = self._get_workflow_run_repo().get_archived_log_by_run_id(run.id) + if archive_log is None: + result.error = f"Workflow run {run.id} is not archived" + result.elapsed_time = time.time() - start_time + return result + + try: + result.archive_key = self._validate_archive_before_delete(run, archive_log) + result.validated_counts = self._count_live_related_rows(run) + except Exception as e: + result.error = str(e) + result.elapsed_time = time.time() - start_time + return result + if self.dry_run: result.success = True result.elapsed_time = time.time() - start_time @@ -108,17 +223,202 @@ class ArchivedWorkflowRunDeletion: delete_trigger_logs=self._delete_trigger_logs, ) result.deleted_counts = deleted_counts + self._verify_post_delete(run.id) + if self._should_run_restore_sample(): + result.restore_sampled = True + result.restore_sample_success = self._run_restore_dry_run_sample(archive_log) + if not result.restore_sample_success: + raise RuntimeError(f"Restore dry-run sample failed for workflow run {run.id}") result.success = True except Exception as e: result.error = str(e) result.elapsed_time = time.time() - start_time return result + def _validate_archive_before_delete(self, run: WorkflowRun, archive_log: WorkflowArchiveLog) -> str: + storage = self._get_archive_storage() + archive_key = self._get_archive_key(archive_log) + if not storage.object_exists(archive_key): + raise FileNotFoundError(f"Archive bundle not found: {archive_key}") + + archive_data = storage.get_object(archive_key) + manifest = self._validate_archive_bundle( + archive_data, + run_id=run.id, + tenant_id=run.tenant_id, + app_id=run.app_id, + workflow_id=run.workflow_id, + ) + expected_counts = self._counts_from_manifest(manifest) + current_counts = self._count_live_related_rows(run) + if current_counts != expected_counts: + raise ValueError( + "Archive row count mismatch before delete: " + f"run_id={run.id}, expected={expected_counts}, current={current_counts}" + ) + return archive_key + + @staticmethod + def _validate_archive_bundle( + archive_data: bytes, + *, + run_id: str, + tenant_id: str, + app_id: str, + workflow_id: str, + ) -> _ArchiveManifest: + try: + with zipfile.ZipFile(io.BytesIO(archive_data), mode="r") as archive: + bad_member = archive.testzip() + if bad_member: + raise ValueError(f"zip CRC check failed for member {bad_member}") + try: + manifest_data = archive.read("manifest.json") + except KeyError as e: + raise ValueError("manifest.json missing from archive bundle") from e + loaded = json.loads(manifest_data) + if not isinstance(loaded, dict): + raise ValueError("manifest.json must be an object") + manifest = loaded + + required_fields = { + "schema_version", + "workflow_run_id", + "tenant_id", + "app_id", + "workflow_id", + "tables", + } + missing_fields = sorted(required_fields - set(manifest)) + if missing_fields: + raise ValueError(f"manifest missing required fields: {', '.join(missing_fields)}") + if manifest["schema_version"] != ARCHIVE_SCHEMA_VERSION: + raise ValueError( + f"unsupported archive schema_version: {manifest['schema_version']} " + f"(expected {ARCHIVE_SCHEMA_VERSION})" + ) + if manifest["workflow_run_id"] != run_id: + raise ValueError("manifest workflow_run_id does not match delete target") + if manifest["tenant_id"] != tenant_id: + raise ValueError("manifest tenant_id does not match delete target") + if manifest["app_id"] != app_id: + raise ValueError("manifest app_id does not match delete target") + if manifest["workflow_id"] != workflow_id: + raise ValueError("manifest workflow_id does not match delete target") + + tables = manifest["tables"] + if not isinstance(tables, dict): + raise ValueError("manifest tables must be an object") + missing_tables = [table_name for table_name in _ARCHIVED_TABLES if table_name not in tables] + if missing_tables: + raise ValueError(f"manifest missing tables: {', '.join(missing_tables)}") + + for table_name in _ARCHIVED_TABLES: + info = tables[table_name] + if not isinstance(info, dict): + raise ValueError(f"manifest table entry must be an object: {table_name}") + for key in ("row_count", "checksum", "size_bytes"): + if key not in info: + raise ValueError(f"manifest table {table_name} missing {key}") + member_path = f"{table_name}.jsonl" + try: + payload = archive.read(member_path) + except KeyError as e: + raise ValueError(f"archive member missing: {member_path}") from e + if len(payload) != info["size_bytes"]: + raise ValueError( + f"archive member size mismatch for {member_path}: " + f"expected={info['size_bytes']}, actual={len(payload)}" + ) + checksum = ArchiveStorage.compute_checksum(payload) + if checksum != info["checksum"]: + raise ValueError( + f"archive member checksum mismatch for {member_path}: " + f"expected={info['checksum']}, actual={checksum}" + ) + row_count = len(ArchiveStorage.deserialize_from_jsonl(payload)) + if row_count != info["row_count"]: + raise ValueError( + f"archive row count mismatch for {member_path}: " + f"expected={info['row_count']}, actual={row_count}" + ) + + return manifest # type: ignore[return-value] + except zipfile.BadZipFile as e: + raise ValueError("archive bundle is not a valid zip file") from e + + @staticmethod + def _counts_from_manifest(manifest: _ArchiveManifest) -> RunsWithRelatedCountsDict: + counts: RunsWithRelatedCountsDict = { + "runs": 0, + "node_executions": 0, + "offloads": 0, + "app_logs": 0, + "trigger_logs": 0, + "pauses": 0, + "pause_reasons": 0, + } + for table_name, count_key in _TABLE_TO_COUNT_KEY.items(): + counts[count_key] = manifest["tables"][table_name]["row_count"] # type: ignore[literal-required] + return counts + + def _count_live_related_rows(self, run: WorkflowRun) -> RunsWithRelatedCountsDict: + repo = self._get_workflow_run_repo() + return repo.count_runs_with_related( + [run], + count_node_executions=self._count_node_executions, + count_trigger_logs=self._count_trigger_logs, + ) + + def _verify_post_delete(self, run_id: str) -> None: + with sessionmaker(bind=db.engine, expire_on_commit=False)() as session: + if session.get(WorkflowRun, run_id) is not None: + raise RuntimeError(f"Post-delete verification failed: workflow run {run_id} still exists") + + def _should_run_restore_sample(self) -> bool: + if self.restore_sample_interval == 0: + return False + self._delete_attempt_count += 1 + return self._delete_attempt_count % self.restore_sample_interval == 0 + + @staticmethod + def _run_restore_dry_run_sample(archive_log: WorkflowArchiveLog) -> bool: + from services.retention.workflow_run.restore_archived_workflow_run import WorkflowRunRestore + + restorer = WorkflowRunRestore(dry_run=True, workers=1) + # Reuse restore's dry-run path so the runbook exercises the actual restore code. + result = restorer._restore_from_run( + archive_log, + session_maker=sessionmaker(bind=db.engine, expire_on_commit=False), + ) + return result.success + + @staticmethod + def _get_archive_key(archive_log: WorkflowArchiveLog) -> str: + created_at = archive_log.run_created_at + prefix = ( + f"{archive_log.tenant_id}/app_id={archive_log.app_id}/year={created_at.strftime('%Y')}/" + f"month={created_at.strftime('%m')}/workflow_run_id={archive_log.workflow_run_id}" + ) + return f"{prefix}/{ARCHIVE_BUNDLE_NAME}" + + @staticmethod + def _get_archive_storage() -> ArchiveStorage: + try: + return get_archive_storage() + except ArchiveStorageNotConfiguredError as e: + raise RuntimeError(f"Archive storage not configured: {e}") from e + @staticmethod def _delete_trigger_logs(session: Session, run_ids: Sequence[str]) -> int: trigger_repo = SQLAlchemyWorkflowTriggerLogRepository(session) return trigger_repo.delete_by_run_ids(run_ids) + @staticmethod + def _count_trigger_logs(session: Session, run_ids: Sequence[str]) -> int: + trigger_repo = SQLAlchemyWorkflowTriggerLogRepository(session) + return trigger_repo.count_by_run_ids(run_ids) + @staticmethod def _delete_node_executions( session: Session, @@ -132,6 +432,19 @@ class ArchivedWorkflowRunDeletion: ) return repo.delete_by_runs(session, run_ids) + @staticmethod + def _count_node_executions( + session: Session, + runs: Sequence[WorkflowRun], + ) -> tuple[int, int]: + from repositories.factory import DifyAPIRepositoryFactory + + run_ids = [run.id for run in runs] + repo = DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository( + session_maker=sessionmaker(bind=session.get_bind(), expire_on_commit=False) + ) + return repo.count_by_runs(session, run_ids) + def _get_workflow_run_repo(self) -> APIWorkflowRunRepository: if self.workflow_run_repo is not None: return self.workflow_run_repo diff --git a/api/services/retention/workflow_run/tenant_prefix.py b/api/services/retention/workflow_run/tenant_prefix.py new file mode 100644 index 00000000000..34d1591c70d --- /dev/null +++ b/api/services/retention/workflow_run/tenant_prefix.py @@ -0,0 +1,20 @@ +from __future__ import annotations + +import sqlalchemy as sa + + +def tenant_prefix_bounds(prefix: str) -> tuple[str, str | None]: + prefix_value = int(prefix, 16) + lower_bound = f"{prefix}0000000-0000-0000-0000-000000000000" + if prefix_value == 15: + return lower_bound, None + upper_bound = f"{prefix_value + 1:x}0000000-0000-0000-0000-000000000000" + return lower_bound, upper_bound + + +def tenant_prefix_condition(column, prefix: str): + lower_bound, upper_bound = tenant_prefix_bounds(prefix) + condition = column >= lower_bound + if upper_bound is not None: + condition = sa.and_(condition, column < upper_bound) + return condition diff --git a/api/tests/integration_tests/services/retention/test_workflow_run_archiver.py b/api/tests/integration_tests/services/retention/test_workflow_run_archiver.py index 5728eacdfb4..9ce15c86e75 100644 --- a/api/tests/integration_tests/services/retention/test_workflow_run_archiver.py +++ b/api/tests/integration_tests/services/retention/test_workflow_run_archiver.py @@ -1,17 +1,17 @@ import datetime -import io import json import uuid -import zipfile from unittest.mock import MagicMock, patch +import pyarrow as pa +import pyarrow.parquet as pq import pytest from services.retention.workflow_run.archive_paid_plan_workflow_run import ( ArchiveSummary, WorkflowRunArchiver, ) -from services.retention.workflow_run.constants import ARCHIVE_SCHEMA_VERSION +from services.retention.workflow_run.constants import ARCHIVE_BUNDLE_FORMAT, ARCHIVE_BUNDLE_SCHEMA_VERSION class TestWorkflowRunArchiverInit: @@ -39,6 +39,22 @@ class TestWorkflowRunArchiverInit: with pytest.raises(ValueError, match="workers must be at least 1"): WorkflowRunArchiver(workers=0) + def test_run_shard_index_without_total_raises(self): + with pytest.raises(ValueError, match="run_shard_index and run_shard_total must be provided together"): + WorkflowRunArchiver(run_shard_index=0) + + def test_run_shard_total_without_index_raises(self): + with pytest.raises(ValueError, match="run_shard_index and run_shard_total must be provided together"): + WorkflowRunArchiver(run_shard_total=4) + + def test_run_shard_total_above_supported_range_raises(self): + with pytest.raises(ValueError, match="run_shard_total must be between 1 and 16"): + WorkflowRunArchiver(run_shard_index=0, run_shard_total=17) + + def test_run_shard_index_must_be_less_than_total(self): + with pytest.raises(ValueError, match="run_shard_index must be between 0 and run_shard_total - 1"): + WorkflowRunArchiver(run_shard_index=4, run_shard_total=4) + def test_valid_init_defaults(self): archiver = WorkflowRunArchiver(days=30, batch_size=50) assert archiver.days == 30 @@ -55,29 +71,93 @@ class TestWorkflowRunArchiverInit: assert archiver.end_before is not None assert archiver.workers == 2 + def test_delete_after_archive_is_not_supported_for_bundle_archive(self): + with pytest.raises(ValueError, match="delete_after_archive is not supported by bundle archive"): + WorkflowRunArchiver(delete_after_archive=True) + + def test_get_runs_batch_passes_shard_options(self): + repo = MagicMock() + repo.get_runs_batch_by_time_range.return_value = [] + archiver = WorkflowRunArchiver( + tenant_prefixes=["0", "a"], + run_shard_index=1, + run_shard_total=4, + workflow_run_repo=repo, + ) + + archiver._get_runs_batch(None) + + repo.get_runs_batch_by_time_range.assert_called_once() + assert repo.get_runs_batch_by_time_range.call_args.kwargs["tenant_prefixes"] == ["0", "a"] + assert repo.get_runs_batch_by_time_range.call_args.kwargs["run_shard_index"] == 1 + assert repo.get_runs_batch_by_time_range.call_args.kwargs["run_shard_total"] == 4 + + def test_get_runs_batch_prefers_planned_tenant_ids_over_prefix_filter(self): + repo = MagicMock() + repo.get_runs_batch_by_time_range.return_value = [] + archiver = WorkflowRunArchiver( + tenant_ids=["0tenant"], + tenant_prefixes=["0"], + paid_tenant_ids=["0tenant"], + workflow_run_repo=repo, + ) + + archiver._get_runs_batch(None) + + repo.get_runs_batch_by_time_range.assert_called_once() + assert repo.get_runs_batch_by_time_range.call_args.kwargs["tenant_ids"] == ["0tenant"] + assert repo.get_runs_batch_by_time_range.call_args.kwargs["tenant_prefixes"] is None + + def test_get_runs_batch_uses_current_tenant_scan_scope(self): + repo = MagicMock() + repo.get_runs_batch_by_time_range.return_value = [] + archiver = WorkflowRunArchiver( + tenant_ids=["tenant-a", "tenant-b"], + workflow_run_repo=repo, + ) + + archiver._get_runs_batch(None, tenant_scope=["tenant-b"]) + + repo.get_runs_batch_by_time_range.assert_called_once() + assert repo.get_runs_batch_by_time_range.call_args.kwargs["tenant_ids"] == ["tenant-b"] + + def test_start_message_includes_shard(self): + archiver = WorkflowRunArchiver(tenant_prefixes=["0"], run_shard_index=1, run_shard_total=4) + + message = archiver._build_start_message() + + assert "tenant_prefixes=0" in message + assert "run_shard=1/4" in message + + def test_start_message_summarizes_large_planned_tenant_list(self): + tenant_ids = [f"tenant-{index}" for index in range(11)] + archiver = WorkflowRunArchiver(tenant_ids=tenant_ids, tenant_prefixes=["0"]) + + message = archiver._build_start_message() + + assert "tenant_ids=11 planned tenants" in message + assert "tenant-10" not in message + class TestBuildArchiveBundle: - def test_bundle_contains_manifest_and_all_tables(self): + def test_bundle_contains_manifest_and_all_table_objects(self): archiver = WorkflowRunArchiver(days=90) + run = MagicMock() + run.id = str(uuid.uuid4()) + run.tenant_id = str(uuid.uuid4()) + run.created_at = datetime.datetime(2025, 3, 15, 10, 0, 0) + identity = archiver._build_bundle_identity([run]) + table_data = {"workflow_runs": [{"id": run.id, "tenant_id": run.tenant_id}]} - manifest_data = json.dumps({"schema_version": ARCHIVE_SCHEMA_VERSION}).encode("utf-8") - table_payloads = dict.fromkeys(archiver.ARCHIVED_TABLES, b"") + table_stats, table_payloads, manifest_data = archiver._build_archive_payload(identity, [run], table_data) + manifest = json.loads(manifest_data) - bundle_bytes = archiver._build_archive_bundle(manifest_data, table_payloads) - - with zipfile.ZipFile(io.BytesIO(bundle_bytes), "r") as zf: - names = set(zf.namelist()) - assert "manifest.json" in names - for table in archiver.ARCHIVED_TABLES: - assert f"{table}.jsonl" in names, f"Missing {table}.jsonl in bundle" - - def test_bundle_missing_table_payload_raises(self): - archiver = WorkflowRunArchiver(days=90) - manifest_data = b"{}" - incomplete_payloads = {archiver.ARCHIVED_TABLES[0]: b"data"} - - with pytest.raises(ValueError, match="Missing archive payload"): - archiver._build_archive_bundle(manifest_data, incomplete_payloads) + assert manifest["schema_version"] == ARCHIVE_BUNDLE_SCHEMA_VERSION + assert manifest["archive_format"] == ARCHIVE_BUNDLE_FORMAT + assert manifest["object_prefix"] == identity.object_prefix + assert set(table_payloads) == set(archiver.ARCHIVED_TABLES) + assert {stat.table_name for stat in table_stats} == set(archiver.ARCHIVED_TABLES) + assert pq.read_table(pa.BufferReader(table_payloads["workflow_runs"])).num_rows == 1 class TestGenerateManifest: @@ -88,25 +168,39 @@ class TestGenerateManifest: run = MagicMock() run.id = str(uuid.uuid4()) run.tenant_id = str(uuid.uuid4()) - run.app_id = str(uuid.uuid4()) - run.workflow_id = str(uuid.uuid4()) run.created_at = datetime.datetime(2025, 3, 15, 10, 0, 0) + identity = archiver._build_bundle_identity([run]) stats = [ - TableStats(table_name="workflow_runs", row_count=1, checksum="abc123", size_bytes=512), - TableStats(table_name="workflow_app_logs", row_count=2, checksum="def456", size_bytes=1024), + TableStats( + table_name="workflow_runs", + row_count=1, + checksum="abc123", + size_bytes=512, + object_key="workflow_runs.parquet", + ), + TableStats( + table_name="workflow_node_executions", + row_count=2, + checksum="def456", + size_bytes=1024, + object_key="workflow_node_executions.parquet", + ), ] - manifest = archiver._generate_manifest(run, stats) + manifest = archiver._generate_manifest(identity, [run], stats) - assert manifest["schema_version"] == ARCHIVE_SCHEMA_VERSION - assert manifest["workflow_run_id"] == run.id + assert manifest["schema_version"] == ARCHIVE_BUNDLE_SCHEMA_VERSION + assert manifest["archive_format"] == ARCHIVE_BUNDLE_FORMAT + assert manifest["bundle_id"] == identity.bundle_id assert manifest["tenant_id"] == run.tenant_id - assert manifest["app_id"] == run.app_id + assert manifest["workflow_run_count"] == 1 + assert manifest["workflow_node_execution_count"] == 2 + assert manifest["run_ids"] == [run.id] assert "tables" in manifest assert manifest["tables"]["workflow_runs"]["row_count"] == 1 assert manifest["tables"]["workflow_runs"]["checksum"] == "abc123" - assert manifest["tables"]["workflow_app_logs"]["row_count"] == 2 + assert manifest["tables"]["workflow_node_executions"]["row_count"] == 2 class TestFilterPaidTenants: @@ -163,6 +257,19 @@ class TestFilterPaidTenants: assert result == set() + def test_planned_paid_tenants_skip_billing_lookup(self): + archiver = WorkflowRunArchiver(days=90, paid_tenant_ids=["t1", "t3"]) + + with ( + patch("services.retention.workflow_run.archive_paid_plan_workflow_run.dify_config") as cfg, + patch("services.retention.workflow_run.archive_paid_plan_workflow_run.BillingService") as billing, + ): + cfg.BILLING_ENABLED = True + result = archiver._filter_paid_tenants({"t1", "t2", "t3"}) + + billing.get_plan_bulk_with_cache.assert_not_called() + assert result == {"t1", "t3"} + class TestDryRunArchive: @patch("services.retention.workflow_run.archive_paid_plan_workflow_run.get_archive_storage") @@ -175,3 +282,81 @@ class TestDryRunArchive: mock_get_storage.assert_not_called() assert isinstance(summary, ArchiveSummary) assert summary.runs_failed == 0 + + def test_dry_run_estimates_table_and_object_sizes(self): + archiver = WorkflowRunArchiver(days=90, dry_run=True) + run = MagicMock() + run.id = "run-1" + run.tenant_id = "tenant-1" + run.app_id = "app-1" + run.workflow_id = "workflow-1" + run.created_at = datetime.datetime(2025, 3, 15, 10, 0, 0) + table_data = { + "workflow_runs": [{"id": "run-1", "tenant_id": "tenant-1"}], + "workflow_app_logs": [{"id": "log-1", "workflow_run_id": "run-1"}], + } + + with patch.object(archiver, "_extract_bundle_data", return_value=table_data): + result = archiver._archive_bundle(MagicMock(), None, [run]) + + stats_by_table = {stat.table_name: stat for stat in result.tables} + assert result.success is True + assert result.object_size_bytes > 0 + assert stats_by_table["workflow_runs"].row_count == 1 + assert stats_by_table["workflow_runs"].size_bytes > 0 + assert stats_by_table["workflow_app_logs"].row_count == 1 + assert stats_by_table["workflow_app_logs"].size_bytes > 0 + assert stats_by_table["workflow_node_executions"].row_count == 0 + assert stats_by_table["workflow_node_executions"].size_bytes > 0 + + def test_summary_merges_dry_run_estimates(self): + summary = ArchiveSummary() + result = MagicMock() + result.object_size_bytes = 128 + result.tables = [ + MagicMock(table_name="workflow_runs", row_count=1, size_bytes=64), + MagicMock(table_name="workflow_app_logs", row_count=2, size_bytes=32), + ] + + WorkflowRunArchiver._merge_result_stats(summary, result) + + assert summary.total_object_size_bytes == 128 + assert summary.table_stats["workflow_runs"].row_count == 1 + assert summary.table_stats["workflow_runs"].size_bytes == 64 + assert summary.table_stats["workflow_app_logs"].row_count == 2 + assert summary.table_stats["workflow_app_logs"].size_bytes == 32 + + +class TestArchiveRunIdempotency: + def test_locked_bundle_is_skipped(self): + archiver = WorkflowRunArchiver(days=90) + run = MagicMock() + run.id = "run-1" + run.tenant_id = "tenant-1" + run.created_at = datetime.datetime(2025, 3, 15, 10, 0, 0) + + with ( + patch.object(archiver, "_lock_runs_for_archive", return_value=[]), + ): + storage = MagicMock() + storage.object_exists.return_value = False + result = archiver._archive_bundle(MagicMock(), storage, [run]) + + assert result.success is True + assert result.skipped is True + assert result.error == "one or more runs locked or deleted by another archiver" + + def test_already_archived_bundle_is_skipped(self): + archiver = WorkflowRunArchiver(days=90) + run = MagicMock() + run.id = "run-1" + run.tenant_id = "tenant-1" + run.created_at = datetime.datetime(2025, 3, 15, 10, 0, 0) + storage = MagicMock() + storage.object_exists.return_value = True + + result = archiver._archive_bundle(MagicMock(), storage, [run]) + + assert result.success is True + assert result.skipped is True + assert result.error == "bundle already archived" diff --git a/api/tests/test_containers_integration_tests/services/test_delete_archived_workflow_run.py b/api/tests/test_containers_integration_tests/services/test_delete_archived_workflow_run.py index 69c39b8bfbb..62d6dd489a5 100644 --- a/api/tests/test_containers_integration_tests/services/test_delete_archived_workflow_run.py +++ b/api/tests/test_containers_integration_tests/services/test_delete_archived_workflow_run.py @@ -2,17 +2,44 @@ Testcontainers integration tests for archived workflow run deletion service. """ +import io +import json +import zipfile from datetime import UTC, datetime, timedelta +from unittest.mock import MagicMock, patch from uuid import uuid4 from sqlalchemy import select from sqlalchemy.orm import Session from graphon.enums import WorkflowExecutionStatus +from libs.archive_storage import ArchiveStorage from models.enums import CreatorUserRole, WorkflowRunTriggeredFrom from models.workflow import WorkflowArchiveLog, WorkflowRun +from services.retention.workflow_run.constants import ARCHIVE_BUNDLE_NAME, ARCHIVE_SCHEMA_VERSION from services.retention.workflow_run.delete_archived_workflow_run import ArchivedWorkflowRunDeletion +ARCHIVED_TABLES = [ + "workflow_runs", + "workflow_app_logs", + "workflow_node_executions", + "workflow_node_execution_offload", + "workflow_pauses", + "workflow_pause_reasons", + "workflow_trigger_logs", +] + + +class FakeArchiveStorage: + def __init__(self, objects: dict[str, bytes]): + self.objects = objects + + def object_exists(self, key: str) -> bool: + return key in self.objects + + def get_object(self, key: str) -> bytes: + return self.objects[key] + class TestArchivedWorkflowRunDeletion: def _create_workflow_run( @@ -47,7 +74,7 @@ class TestArchivedWorkflowRunDeletion: db_session_with_containers.commit() return run - def _create_archive_log(self, db_session_with_containers: Session, *, run: WorkflowRun) -> None: + def _create_archive_log(self, db_session_with_containers: Session, *, run: WorkflowRun) -> WorkflowArchiveLog: archive_log = WorkflowArchiveLog( tenant_id=run.tenant_id, app_id=run.app_id, @@ -72,6 +99,59 @@ class TestArchivedWorkflowRunDeletion: ) db_session_with_containers.add(archive_log) db_session_with_containers.commit() + return archive_log + + def _archive_key(self, run: WorkflowRun) -> str: + return ( + f"{run.tenant_id}/app_id={run.app_id}/year={run.created_at.strftime('%Y')}/" + f"month={run.created_at.strftime('%m')}/workflow_run_id={run.id}/{ARCHIVE_BUNDLE_NAME}" + ) + + def _archive_bundle(self, run: WorkflowRun, *, workflow_run_rows: int = 1) -> bytes: + table_payloads: dict[str, bytes] = {} + table_counts = { + "workflow_runs": workflow_run_rows, + "workflow_app_logs": 0, + "workflow_node_executions": 0, + "workflow_node_execution_offload": 0, + "workflow_pauses": 0, + "workflow_pause_reasons": 0, + "workflow_trigger_logs": 0, + } + for table_name in ARCHIVED_TABLES: + records = [{"id": run.id}] if table_name == "workflow_runs" and workflow_run_rows else [] + table_payloads[table_name] = ArchiveStorage.serialize_to_jsonl(records) + + manifest = { + "schema_version": ARCHIVE_SCHEMA_VERSION, + "workflow_run_id": run.id, + "tenant_id": run.tenant_id, + "app_id": run.app_id, + "workflow_id": run.workflow_id, + "created_at": run.created_at.isoformat(), + "archived_at": datetime.now(UTC).isoformat(), + "tables": { + table_name: { + "row_count": table_counts[table_name], + "checksum": ArchiveStorage.compute_checksum(payload), + "size_bytes": len(payload), + } + for table_name, payload in table_payloads.items() + }, + } + buffer = io.BytesIO() + with zipfile.ZipFile(buffer, mode="w", compression=zipfile.ZIP_DEFLATED) as archive: + archive.writestr("manifest.json", json.dumps(manifest).encode("utf-8")) + for table_name, payload in table_payloads.items(): + archive.writestr(f"{table_name}.jsonl", payload) + return buffer.getvalue() + + def _patch_storage(self, run: WorkflowRun): + storage = FakeArchiveStorage({self._archive_key(run): self._archive_bundle(run)}) + return patch( + "services.retention.workflow_run.delete_archived_workflow_run.get_archive_storage", + return_value=storage, + ) def test_delete_by_run_id_returns_error_when_run_missing(self, db_session_with_containers: Session): deleter = ArchivedWorkflowRunDeletion() @@ -109,13 +189,23 @@ class TestArchivedWorkflowRunDeletion: self._create_archive_log(db_session_with_containers, run=run2) run_ids = [run1.id, run2.id] - deleter = ArchivedWorkflowRunDeletion() - results = deleter.delete_batch( - tenant_ids=[tenant_id], - start_date=base_time - timedelta(minutes=1), - end_date=base_time + timedelta(minutes=1), - limit=2, + storage = FakeArchiveStorage( + { + self._archive_key(run1): self._archive_bundle(run1), + self._archive_key(run2): self._archive_bundle(run2), + } ) + deleter = ArchivedWorkflowRunDeletion() + with patch( + "services.retention.workflow_run.delete_archived_workflow_run.get_archive_storage", + return_value=storage, + ): + results = deleter.delete_batch( + tenant_ids=[tenant_id], + start_date=base_time - timedelta(minutes=1), + end_date=base_time + timedelta(minutes=1), + limit=2, + ) assert len(results) == 2 assert all(result.success for result in results) @@ -133,9 +223,11 @@ class TestArchivedWorkflowRunDeletion: created_at=datetime.now(UTC), ) run_id = run.id + archive_log = self._create_archive_log(db_session_with_containers, run=run) deleter = ArchivedWorkflowRunDeletion() - result = deleter._delete_run(run) + with self._patch_storage(run): + result = deleter._delete_run(run, archive_log) assert result.success is True assert result.deleted_counts["runs"] == 1 @@ -152,9 +244,11 @@ class TestArchivedWorkflowRunDeletion: created_at=datetime.now(UTC), ) run_id = run.id + archive_log = self._create_archive_log(db_session_with_containers, run=run) deleter = ArchivedWorkflowRunDeletion(dry_run=True) - result = deleter._delete_run(run) + with self._patch_storage(run): + result = deleter._delete_run(run, archive_log) assert result.success is True assert result.run_id == run_id @@ -164,22 +258,33 @@ class TestArchivedWorkflowRunDeletion: def test_delete_run_exception_returns_error(self, db_session_with_containers: Session): """Exception during deletion should return failure result.""" - from unittest.mock import MagicMock, patch - tenant_id = str(uuid4()) run = self._create_workflow_run( db_session_with_containers, tenant_id=tenant_id, created_at=datetime.now(UTC), ) + archive_log = self._create_archive_log(db_session_with_containers, run=run) deleter = ArchivedWorkflowRunDeletion(dry_run=False) + expected_counts = { + "runs": 1, + "node_executions": 0, + "offloads": 0, + "app_logs": 0, + "trigger_logs": 0, + "pauses": 0, + "pause_reasons": 0, + } with patch.object(deleter, "_get_workflow_run_repo") as mock_get_repo: mock_repo = MagicMock() mock_get_repo.return_value = mock_repo + mock_repo.get_archived_log_by_run_id.return_value = archive_log + mock_repo.count_runs_with_related.return_value = expected_counts mock_repo.delete_runs_with_related.side_effect = Exception("Database error") - result = deleter._delete_run(run) + with self._patch_storage(run): + result = deleter._delete_run(run, archive_log) assert result.success is False assert result.error == "Database error" @@ -197,7 +302,8 @@ class TestArchivedWorkflowRunDeletion: run_id = run.id deleter = ArchivedWorkflowRunDeletion() - result = deleter.delete_by_run_id(run_id) + with self._patch_storage(run): + result = deleter.delete_by_run_id(run_id) assert result.success is True db_session_with_containers.expunge_all() @@ -212,3 +318,48 @@ class TestArchivedWorkflowRunDeletion: assert repo1 is repo2 assert deleter.workflow_run_repo is repo1 + + def test_delete_run_fails_when_archive_object_missing(self, db_session_with_containers: Session): + tenant_id = str(uuid4()) + run = self._create_workflow_run( + db_session_with_containers, + tenant_id=tenant_id, + created_at=datetime.now(UTC), + ) + archive_log = self._create_archive_log(db_session_with_containers, run=run) + deleter = ArchivedWorkflowRunDeletion() + storage = FakeArchiveStorage({}) + + with patch( + "services.retention.workflow_run.delete_archived_workflow_run.get_archive_storage", + return_value=storage, + ): + result = deleter._delete_run(run, archive_log) + + assert result.success is False + assert result.error == f"Archive bundle not found: {self._archive_key(run)}" + db_session_with_containers.expire_all() + assert db_session_with_containers.get(WorkflowRun, run.id) is not None + + def test_delete_run_fails_when_manifest_count_differs_from_live_rows(self, db_session_with_containers: Session): + tenant_id = str(uuid4()) + run = self._create_workflow_run( + db_session_with_containers, + tenant_id=tenant_id, + created_at=datetime.now(UTC), + ) + archive_log = self._create_archive_log(db_session_with_containers, run=run) + bundle = self._archive_bundle(run, workflow_run_rows=0) + storage = FakeArchiveStorage({self._archive_key(run): bundle}) + deleter = ArchivedWorkflowRunDeletion() + + with patch( + "services.retention.workflow_run.delete_archived_workflow_run.get_archive_storage", + return_value=storage, + ): + result = deleter._delete_run(run, archive_log) + + assert result.success is False + assert "Archive row count mismatch before delete" in str(result.error) + db_session_with_containers.expire_all() + assert db_session_with_containers.get(WorkflowRun, run.id) is not None diff --git a/api/tests/unit_tests/services/retention/workflow_run/test_delete_archived_workflow_run.py b/api/tests/unit_tests/services/retention/workflow_run/test_delete_archived_workflow_run.py new file mode 100644 index 00000000000..be1b94b2fad --- /dev/null +++ b/api/tests/unit_tests/services/retention/workflow_run/test_delete_archived_workflow_run.py @@ -0,0 +1,96 @@ +import io +import json +import zipfile +from datetime import UTC, datetime + +import pytest + +from libs.archive_storage import ArchiveStorage +from services.retention.workflow_run.constants import ARCHIVE_SCHEMA_VERSION +from services.retention.workflow_run.delete_archived_workflow_run import ArchivedWorkflowRunDeletion + +ARCHIVED_TABLES = [ + "workflow_runs", + "workflow_app_logs", + "workflow_node_executions", + "workflow_node_execution_offload", + "workflow_pauses", + "workflow_pause_reasons", + "workflow_trigger_logs", +] + + +def _build_archive_bundle( + *, + run_id: str = "run-1", + tenant_id: str = "tenant-1", + app_id: str = "app-1", + workflow_id: str = "workflow-1", + corrupt_checksum_for: str | None = None, +) -> bytes: + table_payloads: dict[str, bytes] = {} + for table_name in ARCHIVED_TABLES: + records = [{"id": run_id}] if table_name == "workflow_runs" else [] + table_payloads[table_name] = ArchiveStorage.serialize_to_jsonl(records) + + manifest = { + "schema_version": ARCHIVE_SCHEMA_VERSION, + "workflow_run_id": run_id, + "tenant_id": tenant_id, + "app_id": app_id, + "workflow_id": workflow_id, + "created_at": datetime.now(UTC).isoformat(), + "archived_at": datetime.now(UTC).isoformat(), + "tables": { + table_name: { + "row_count": 1 if table_name == "workflow_runs" else 0, + "checksum": ArchiveStorage.compute_checksum(payload), + "size_bytes": len(payload), + } + for table_name, payload in table_payloads.items() + }, + } + if corrupt_checksum_for: + manifest["tables"][corrupt_checksum_for]["checksum"] = "bad-checksum" + + buffer = io.BytesIO() + with zipfile.ZipFile(buffer, mode="w", compression=zipfile.ZIP_DEFLATED) as archive: + archive.writestr("manifest.json", json.dumps(manifest).encode("utf-8")) + for table_name, payload in table_payloads.items(): + archive.writestr(f"{table_name}.jsonl", payload) + return buffer.getvalue() + + +def test_validate_archive_bundle_accepts_valid_archive() -> None: + manifest = ArchivedWorkflowRunDeletion._validate_archive_bundle( + _build_archive_bundle(), + run_id="run-1", + tenant_id="tenant-1", + app_id="app-1", + workflow_id="workflow-1", + ) + + assert manifest["schema_version"] == ARCHIVE_SCHEMA_VERSION + assert manifest["tables"]["workflow_runs"]["row_count"] == 1 + + +def test_validate_archive_bundle_rejects_checksum_mismatch() -> None: + with pytest.raises(ValueError, match="archive member checksum mismatch"): + ArchivedWorkflowRunDeletion._validate_archive_bundle( + _build_archive_bundle(corrupt_checksum_for="workflow_runs"), + run_id="run-1", + tenant_id="tenant-1", + app_id="app-1", + workflow_id="workflow-1", + ) + + +def test_validate_archive_bundle_rejects_manifest_target_mismatch() -> None: + with pytest.raises(ValueError, match="manifest tenant_id does not match delete target"): + ArchivedWorkflowRunDeletion._validate_archive_bundle( + _build_archive_bundle(), + run_id="run-1", + tenant_id="different-tenant", + app_id="app-1", + workflow_id="workflow-1", + ) diff --git a/api/tests/unit_tests/services/test_archive_workflow_run_logs.py b/api/tests/unit_tests/services/test_archive_workflow_run_logs.py index bb9e5a8deab..a21f1de769c 100644 --- a/api/tests/unit_tests/services/test_archive_workflow_run_logs.py +++ b/api/tests/unit_tests/services/test_archive_workflow_run_logs.py @@ -9,8 +9,6 @@ This module contains tests for: from datetime import datetime from unittest.mock import MagicMock, patch -from services.retention.workflow_run.constants import ARCHIVE_BUNDLE_NAME - class TestWorkflowRunArchiver: """Tests for the WorkflowRunArchiver class.""" @@ -37,18 +35,20 @@ class TestWorkflowRunArchiver: assert archiver.limit == 50 assert archiver.dry_run is True - def test_get_archive_key(self): - """Test archive key generation.""" + def test_get_bundle_manifest_key(self): + """Test V2 bundle manifest key generation.""" from services.retention.workflow_run.archive_paid_plan_workflow_run import WorkflowRunArchiver - archiver = WorkflowRunArchiver.__new__(WorkflowRunArchiver) + archiver = WorkflowRunArchiver(run_shard_index=1, run_shard_total=4) mock_run = MagicMock() - mock_run.tenant_id = "tenant-123" - mock_run.app_id = "app-999" + mock_run.tenant_id = "9enant-123" mock_run.id = "run-456" mock_run.created_at = datetime(2024, 1, 15, 12, 0, 0) - key = archiver._get_archive_key(mock_run) + identity = archiver._build_bundle_identity([mock_run]) + key = archiver._get_manifest_object_key(identity) - assert key == f"tenant-123/app_id=app-999/year=2024/month=01/workflow_run_id=run-456/{ARCHIVE_BUNDLE_NAME}" + assert key.endswith("/manifest.json") + assert "workflow-runs/v2/tenant_prefix=9/tenant_id=9enant-123/year=2024/month=01" in key + assert "/shard=01-of-04/" in key