diff --git a/api/commands.py b/api/commands.py
index e223df74d4..aa7b731a27 100644
--- a/api/commands.py
+++ b/api/commands.py
@@ -862,8 +862,27 @@ def clear_free_plan_tenant_expired_logs(days: int, batch: int, tenant_ids: list[
 
 @click.command("clean-workflow-runs", help="Clean expired workflow runs and related data for free tenants.")
-@click.option("--days", default=30, show_default=True, help="Delete workflow runs created before N days ago.")
+@click.option(
+    "--before-days",
+    "--days",
+    default=30,
+    show_default=True,
+    type=click.IntRange(min=0),
+    help="Delete workflow runs created before N days ago.",
+)
 @click.option("--batch-size", default=200, show_default=True, help="Batch size for selecting workflow runs.")
+@click.option(
+    "--from-days-ago",
+    default=None,
+    type=click.IntRange(min=0),
+    help="Lower bound in days ago (older). Must be paired with --to-days-ago.",
+)
+@click.option(
+    "--to-days-ago",
+    default=None,
+    type=click.IntRange(min=0),
+    help="Upper bound in days ago (newer). Must be paired with --from-days-ago.",
+)
 @click.option(
     "--start-from",
     type=click.DateTime(formats=["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"]),
@@ -882,8 +901,10 @@ def clear_free_plan_tenant_expired_logs(days: int, batch: int, tenant_ids: list[
     help="Preview cleanup results without deleting any workflow run data.",
 )
 def clean_workflow_runs(
-    days: int,
+    before_days: int,
     batch_size: int,
+    from_days_ago: int | None,
+    to_days_ago: int | None,
     start_from: datetime.datetime | None,
     end_before: datetime.datetime | None,
     dry_run: bool,
@@ -894,11 +915,24 @@ def clean_workflow_runs(
     if (start_from is None) ^ (end_before is None):
         raise click.UsageError("--start-from and --end-before must be provided together.")
 
+    if (from_days_ago is None) ^ (to_days_ago is None):
+        raise click.UsageError("--from-days-ago and --to-days-ago must be provided together.")
+
+    if from_days_ago is not None and to_days_ago is not None:
+        if start_from or end_before:
+            raise click.UsageError("Choose either day offsets or explicit dates, not both.")
+        if from_days_ago <= to_days_ago:
+            raise click.UsageError("--from-days-ago must be greater than --to-days-ago.")
+        now = datetime.datetime.now()
+        start_from = now - datetime.timedelta(days=from_days_ago)
+        end_before = now - datetime.timedelta(days=to_days_ago)
+        before_days = 0
+
     start_time = datetime.datetime.now(datetime.UTC)
     click.echo(click.style(f"Starting workflow run cleanup at {start_time.isoformat()}.", fg="white"))
     WorkflowRunCleanup(
-        days=days,
+        days=before_days,
         batch_size=batch_size,
         start_from=start_from,
         end_before=end_before,
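Review note: the new offset options resolve to the same explicit window that --start-from/--end-before accept. A minimal sketch of the equivalent computation (resolve_window is an illustrative helper, not part of this diff; like the command body it uses naive local datetimes, matching what click.DateTime produces):

```python
import datetime


def resolve_window(from_days_ago: int, to_days_ago: int) -> tuple[datetime.datetime, datetime.datetime]:
    # --from-days-ago is the older bound, so it must be strictly larger.
    if from_days_ago <= to_days_ago:
        raise ValueError("--from-days-ago must be greater than --to-days-ago")
    now = datetime.datetime.now()
    start_from = now - datetime.timedelta(days=from_days_ago)  # older bound (start_from)
    end_before = now - datetime.timedelta(days=to_days_ago)  # newer bound (end_before)
    return start_from, end_before


# e.g. --from-days-ago 90 --to-days-ago 30 targets runs created between 90 and 30 days ago
start_from, end_before = resolve_window(90, 30)
assert start_from < end_before
```

Setting before_days = 0 afterwards neutralizes the default cutoff so the explicit window is the only time filter passed to WorkflowRunCleanup.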
+revision = "288345cd01d1" +down_revision = "3334862ee907" +branch_labels = None +depends_on = None + + +def upgrade(): + with op.batch_alter_table("workflow_node_executions", schema=None) as batch_op: + batch_op.drop_index("workflow_node_execution_workflow_run_idx") + batch_op.create_index( + "workflow_node_execution_workflow_run_id_idx", + ["workflow_run_id"], + unique=False, + ) + + +def downgrade(): + with op.batch_alter_table("workflow_node_executions", schema=None) as batch_op: + batch_op.drop_index("workflow_node_execution_workflow_run_id_idx") + batch_op.create_index( + "workflow_node_execution_workflow_run_idx", + ["tenant_id", "app_id", "workflow_id", "triggered_from", "workflow_run_id"], + unique=False, + ) diff --git a/api/models/workflow.py b/api/models/workflow.py index 5d92da3fa1..2ff47e87b9 100644 --- a/api/models/workflow.py +++ b/api/models/workflow.py @@ -781,11 +781,7 @@ class WorkflowNodeExecutionModel(Base): # This model is expected to have `offlo return ( PrimaryKeyConstraint("id", name="workflow_node_execution_pkey"), Index( - "workflow_node_execution_workflow_run_idx", - "tenant_id", - "app_id", - "workflow_id", - "triggered_from", + "workflow_node_execution_workflow_run_id_idx", "workflow_run_id", ), Index( diff --git a/api/repositories/api_workflow_node_execution_repository.py b/api/repositories/api_workflow_node_execution_repository.py index fa2c94b623..479eb1ff54 100644 --- a/api/repositories/api_workflow_node_execution_repository.py +++ b/api/repositories/api_workflow_node_execution_repository.py @@ -13,6 +13,8 @@ from collections.abc import Sequence from datetime import datetime from typing import Protocol +from sqlalchemy.orm import Session + from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository from models.workflow import WorkflowNodeExecutionModel @@ -130,6 +132,18 @@ class DifyAPIWorkflowNodeExecutionRepository(WorkflowNodeExecutionRepository, Pr """ ... + def count_by_runs(self, session: Session, run_ids: Sequence[str]) -> tuple[int, int]: + """ + Count node executions and offloads for the given workflow run ids. + """ + ... + + def delete_by_runs(self, session: Session, run_ids: Sequence[str]) -> tuple[int, int]: + """ + Delete node executions and offloads for the given workflow run ids. + """ + ... + def delete_executions_by_app( self, tenant_id: str, diff --git a/api/repositories/sqlalchemy_api_workflow_node_execution_repository.py b/api/repositories/sqlalchemy_api_workflow_node_execution_repository.py index 2de3a15d65..4a7c975d2c 100644 --- a/api/repositories/sqlalchemy_api_workflow_node_execution_repository.py +++ b/api/repositories/sqlalchemy_api_workflow_node_execution_repository.py @@ -7,17 +7,15 @@ using SQLAlchemy 2.0 style queries for WorkflowNodeExecutionModel operations. 
diff --git a/api/repositories/sqlalchemy_api_workflow_node_execution_repository.py b/api/repositories/sqlalchemy_api_workflow_node_execution_repository.py
index 2de3a15d65..4a7c975d2c 100644
--- a/api/repositories/sqlalchemy_api_workflow_node_execution_repository.py
+++ b/api/repositories/sqlalchemy_api_workflow_node_execution_repository.py
@@ -7,17 +7,15 @@ using SQLAlchemy 2.0 style queries for WorkflowNodeExecutionModel operations.
 
 from collections.abc import Sequence
 from datetime import datetime
-from typing import TypedDict, cast
+from typing import cast
 
-from sqlalchemy import asc, delete, desc, func, select, tuple_
+from sqlalchemy import asc, delete, desc, func, select
 from sqlalchemy.engine import CursorResult
 from sqlalchemy.orm import Session, sessionmaker
 
-from models.enums import WorkflowRunTriggeredFrom
 from models.workflow import (
     WorkflowNodeExecutionModel,
     WorkflowNodeExecutionOffload,
-    WorkflowNodeExecutionTriggeredFrom,
 )
 from repositories.api_workflow_node_execution_repository import DifyAPIWorkflowNodeExecutionRepository
 
@@ -49,26 +47,6 @@ class DifyAPISQLAlchemyWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecut
         """
         self._session_maker = session_maker
 
-    @staticmethod
-    def _map_run_triggered_from_to_node_triggered_from(triggered_from: str) -> str:
-        """
-        Map workflow run triggered_from values to workflow node execution triggered_from values.
-        """
-        if triggered_from in {
-            WorkflowRunTriggeredFrom.APP_RUN.value,
-            WorkflowRunTriggeredFrom.DEBUGGING.value,
-            WorkflowRunTriggeredFrom.SCHEDULE.value,
-            WorkflowRunTriggeredFrom.PLUGIN.value,
-            WorkflowRunTriggeredFrom.WEBHOOK.value,
-        }:
-            return WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value
-        if triggered_from in {
-            WorkflowRunTriggeredFrom.RAG_PIPELINE_RUN.value,
-            WorkflowRunTriggeredFrom.RAG_PIPELINE_DEBUGGING.value,
-        }:
-            return WorkflowNodeExecutionTriggeredFrom.RAG_PIPELINE_RUN.value
-        return ""
-
     def get_node_last_execution(
         self,
         tenant_id: str,
@@ -316,51 +294,16 @@ class DifyAPISQLAlchemyWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecut
             session.commit()
             return result.rowcount
 
-    class RunContext(TypedDict):
-        run_id: str
-        tenant_id: str
-        app_id: str
-        workflow_id: str
-        triggered_from: str
-
-    @staticmethod
-    def delete_by_runs(session: Session, runs: Sequence[RunContext]) -> tuple[int, int]:
+    def delete_by_runs(self, session: Session, run_ids: Sequence[str]) -> tuple[int, int]:
         """
-        Delete node executions (and offloads) for the given workflow runs using indexed columns.
-
-        Uses the composite index on (tenant_id, app_id, workflow_id, triggered_from, workflow_run_id)
-        by filtering on those columns with tuple IN.
+        Delete node executions (and offloads) for the given workflow runs using workflow_run_id.
         """
-        if not runs:
+        if not run_ids:
            return 0, 0
 
-        tuple_values = [
-            (
-                run["tenant_id"],
-                run["app_id"],
-                run["workflow_id"],
-                DifyAPISQLAlchemyWorkflowNodeExecutionRepository._map_run_triggered_from_to_node_triggered_from(
-                    run["triggered_from"]
-                ),
-                run["run_id"],
-            )
-            for run in runs
-        ]
-
-        node_execution_ids = session.scalars(
-            select(WorkflowNodeExecutionModel.id).where(
-                tuple_(
-                    WorkflowNodeExecutionModel.tenant_id,
-                    WorkflowNodeExecutionModel.app_id,
-                    WorkflowNodeExecutionModel.workflow_id,
-                    WorkflowNodeExecutionModel.triggered_from,
-                    WorkflowNodeExecutionModel.workflow_run_id,
-                ).in_(tuple_values)
-            )
-        ).all()
-
-        if not node_execution_ids:
-            return 0, 0
+        run_ids = list(run_ids)
+        run_id_filter = WorkflowNodeExecutionModel.workflow_run_id.in_(run_ids)
+        node_execution_ids = select(WorkflowNodeExecutionModel.id).where(run_id_filter)
 
         offloads_deleted = (
             cast(
@@ -377,55 +320,32 @@ class DifyAPISQLAlchemyWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecut
         node_executions_deleted = (
             cast(
                 CursorResult,
-                session.execute(
-                    delete(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id.in_(node_execution_ids))
-                ),
+                session.execute(delete(WorkflowNodeExecutionModel).where(run_id_filter)),
             ).rowcount
             or 0
         )
 
         return node_executions_deleted, offloads_deleted
 
-    @staticmethod
-    def count_by_runs(session: Session, runs: Sequence[RunContext]) -> tuple[int, int]:
+    def count_by_runs(self, session: Session, run_ids: Sequence[str]) -> tuple[int, int]:
         """
-        Count node executions (and offloads) for the given workflow runs using indexed columns.
+        Count node executions (and offloads) for the given workflow runs using workflow_run_id.
         """
-        if not runs:
+        if not run_ids:
             return 0, 0
 
-        tuple_values = [
-            (
-                run["tenant_id"],
-                run["app_id"],
-                run["workflow_id"],
-                DifyAPISQLAlchemyWorkflowNodeExecutionRepository._map_run_triggered_from_to_node_triggered_from(
-                    run["triggered_from"]
-                ),
-                run["run_id"],
-            )
-            for run in runs
-        ]
-        tuple_filter = tuple_(
-            WorkflowNodeExecutionModel.tenant_id,
-            WorkflowNodeExecutionModel.app_id,
-            WorkflowNodeExecutionModel.workflow_id,
-            WorkflowNodeExecutionModel.triggered_from,
-            WorkflowNodeExecutionModel.workflow_run_id,
-        ).in_(tuple_values)
+        run_ids = list(run_ids)
+        run_id_filter = WorkflowNodeExecutionModel.workflow_run_id.in_(run_ids)
 
         node_executions_count = (
-            session.scalar(select(func.count()).select_from(WorkflowNodeExecutionModel).where(tuple_filter)) or 0
+            session.scalar(select(func.count()).select_from(WorkflowNodeExecutionModel).where(run_id_filter)) or 0
         )
 
+        node_execution_ids = select(WorkflowNodeExecutionModel.id).where(run_id_filter)
         offloads_count = (
             session.scalar(
                 select(func.count())
                 .select_from(WorkflowNodeExecutionOffload)
-                .join(
-                    WorkflowNodeExecutionModel,
-                    WorkflowNodeExecutionOffload.node_execution_id == WorkflowNodeExecutionModel.id,
-                )
-                .where(tuple_filter)
+                .where(WorkflowNodeExecutionOffload.node_execution_id.in_(node_execution_ids))
             )
             or 0
        )
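With the tuple_ filter and triggered_from mapping gone, both queries reduce to a single workflow_run_id IN (...) predicate, which the new single-column index covers directly; offloads are reached through an uncorrelated IN subquery instead of materializing ids first. If you want to double-check the plan on PostgreSQL, something like this works (hypothetical sanity check, not part of the change; assumes a psycopg driver that adapts Python lists to arrays):

```python
from sqlalchemy import text
from sqlalchemy.orm import Session


def assert_uses_new_index(session: Session, run_ids: list[str]) -> None:
    # PostgreSQL-only: inspect the plan for the new single-column index.
    # Note the planner may still prefer a seq scan on very small tables.
    plan_lines = session.execute(
        text("EXPLAIN SELECT id FROM workflow_node_executions WHERE workflow_run_id = ANY(:run_ids)"),
        {"run_ids": run_ids},
    ).scalars().all()
    assert any("workflow_node_execution_workflow_run_id_idx" in line for line in plan_lines)
```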
""" - if not runs: + if not run_ids: return 0, 0 - tuple_values = [ - ( - run["tenant_id"], - run["app_id"], - run["workflow_id"], - DifyAPISQLAlchemyWorkflowNodeExecutionRepository._map_run_triggered_from_to_node_triggered_from( - run["triggered_from"] - ), - run["run_id"], - ) - for run in runs - ] - - node_execution_ids = session.scalars( - select(WorkflowNodeExecutionModel.id).where( - tuple_( - WorkflowNodeExecutionModel.tenant_id, - WorkflowNodeExecutionModel.app_id, - WorkflowNodeExecutionModel.workflow_id, - WorkflowNodeExecutionModel.triggered_from, - WorkflowNodeExecutionModel.workflow_run_id, - ).in_(tuple_values) - ) - ).all() - - if not node_execution_ids: - return 0, 0 + run_ids = list(run_ids) + run_id_filter = WorkflowNodeExecutionModel.workflow_run_id.in_(run_ids) + node_execution_ids = select(WorkflowNodeExecutionModel.id).where(run_id_filter) offloads_deleted = ( cast( @@ -377,55 +320,32 @@ class DifyAPISQLAlchemyWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecut node_executions_deleted = ( cast( CursorResult, - session.execute( - delete(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id.in_(node_execution_ids)) - ), + session.execute(delete(WorkflowNodeExecutionModel).where(run_id_filter)), ).rowcount or 0 ) return node_executions_deleted, offloads_deleted - @staticmethod - def count_by_runs(session: Session, runs: Sequence[RunContext]) -> tuple[int, int]: + def count_by_runs(self, session: Session, run_ids: Sequence[str]) -> tuple[int, int]: """ - Count node executions (and offloads) for the given workflow runs using indexed columns. + Count node executions (and offloads) for the given workflow runs using workflow_run_id. """ - if not runs: + if not run_ids: return 0, 0 - tuple_values = [ - ( - run["tenant_id"], - run["app_id"], - run["workflow_id"], - DifyAPISQLAlchemyWorkflowNodeExecutionRepository._map_run_triggered_from_to_node_triggered_from( - run["triggered_from"] - ), - run["run_id"], - ) - for run in runs - ] - tuple_filter = tuple_( - WorkflowNodeExecutionModel.tenant_id, - WorkflowNodeExecutionModel.app_id, - WorkflowNodeExecutionModel.workflow_id, - WorkflowNodeExecutionModel.triggered_from, - WorkflowNodeExecutionModel.workflow_run_id, - ).in_(tuple_values) + run_ids = list(run_ids) + run_id_filter = WorkflowNodeExecutionModel.workflow_run_id.in_(run_ids) node_executions_count = ( - session.scalar(select(func.count()).select_from(WorkflowNodeExecutionModel).where(tuple_filter)) or 0 + session.scalar(select(func.count()).select_from(WorkflowNodeExecutionModel).where(run_id_filter)) or 0 ) + node_execution_ids = select(WorkflowNodeExecutionModel.id).where(run_id_filter) offloads_count = ( session.scalar( select(func.count()) .select_from(WorkflowNodeExecutionOffload) - .join( - WorkflowNodeExecutionModel, - WorkflowNodeExecutionOffload.node_execution_id == WorkflowNodeExecutionModel.id, - ) - .where(tuple_filter) + .where(WorkflowNodeExecutionOffload.node_execution_id.in_(node_execution_ids)) ) or 0 ) diff --git a/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py b/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py index 2213169510..c3e0dce399 100644 --- a/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py +++ b/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py @@ -10,9 +10,7 @@ from enums.cloud_plan import CloudPlan from extensions.ext_database import db from models.workflow import WorkflowRun from 
@@ -92,9 +90,12 @@ class WorkflowRunCleanup:
         paid_or_skipped = len(run_rows) - len(free_runs)
 
         if not free_runs:
+            skipped_message = (
+                f"[batch #{batch_index}] skipped (no sandbox runs in batch, {paid_or_skipped} paid/unknown)"
+            )
             click.echo(
                 click.style(
-                    f"[batch #{batch_index}] skipped (no sandbox runs in batch, {paid_or_skipped} paid/unknown)",
+                    skipped_message,
                     fg="yellow",
                 )
             )
@@ -255,21 +256,6 @@ class WorkflowRunCleanup:
             trigger_repo = SQLAlchemyWorkflowTriggerLogRepository(session)
             return trigger_repo.count_by_run_ids(run_ids)
 
-    @staticmethod
-    def _build_run_contexts(
-        runs: Sequence[WorkflowRun],
-    ) -> list[DifyAPISQLAlchemyWorkflowNodeExecutionRepository.RunContext]:
-        return [
-            {
-                "run_id": run.id,
-                "tenant_id": run.tenant_id,
-                "app_id": run.app_id,
-                "workflow_id": run.workflow_id,
-                "triggered_from": run.triggered_from,
-            }
-            for run in runs
-        ]
-
     @staticmethod
     def _empty_related_counts() -> dict[str, int]:
         return {
@@ -293,9 +279,15 @@ class WorkflowRunCleanup:
         )
 
     def _count_node_executions(self, session: Session, runs: Sequence[WorkflowRun]) -> tuple[int, int]:
-        run_contexts = self._build_run_contexts(runs)
-        return DifyAPISQLAlchemyWorkflowNodeExecutionRepository.count_by_runs(session, run_contexts)
+        run_ids = [run.id for run in runs]
+        repo = DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository(
+            session_maker=sessionmaker(bind=session.get_bind(), expire_on_commit=False)
+        )
+        return repo.count_by_runs(session, run_ids)
 
     def _delete_node_executions(self, session: Session, runs: Sequence[WorkflowRun]) -> tuple[int, int]:
-        run_contexts = self._build_run_contexts(runs)
-        return DifyAPISQLAlchemyWorkflowNodeExecutionRepository.delete_by_runs(session, run_contexts)
+        run_ids = [run.id for run in runs]
+        repo = DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository(
+            session_maker=sessionmaker(bind=session.get_bind(), expire_on_commit=False)
+        )
+        return repo.delete_by_runs(session, run_ids)
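End to end, the new flags can be exercised with Click's test runner (illustrative; assumes the command module is importable as commands in the api package, that the dry-run flag is spelled --dry-run, and that whatever app context the command needs is available):

```python
from click.testing import CliRunner

from commands import clean_workflow_runs

runner = CliRunner()

# Preview a 90..30-days-ago window without deleting anything.
result = runner.invoke(clean_workflow_runs, ["--from-days-ago", "90", "--to-days-ago", "30", "--dry-run"])
print(result.output)

# Mixing day offsets with explicit dates is rejected up front.
result = runner.invoke(
    clean_workflow_runs,
    ["--from-days-ago", "90", "--to-days-ago", "30", "--start-from", "2026-01-01", "--end-before", "2026-01-10"],
)
assert result.exit_code != 0
```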