diff --git a/api/.env.example b/api/.env.example index 99cd2ba558..bf10aa7979 100644 --- a/api/.env.example +++ b/api/.env.example @@ -574,6 +574,7 @@ ENABLE_CLEAN_UNUSED_DATASETS_TASK=false ENABLE_CREATE_TIDB_SERVERLESS_TASK=false ENABLE_UPDATE_TIDB_SERVERLESS_STATUS_TASK=false ENABLE_CLEAN_MESSAGES=false +ENABLE_WORKFLOW_RUN_CLEANUP_TASK=false ENABLE_MAIL_CLEAN_DOCUMENT_NOTIFY_TASK=false ENABLE_DATASETS_QUEUE_MONITOR=false ENABLE_CHECK_UPGRADABLE_PLUGIN_TASK=true diff --git a/api/commands.py b/api/commands.py index a8d89ac200..d12941068e 100644 --- a/api/commands.py +++ b/api/commands.py @@ -1,4 +1,5 @@ import base64 +import datetime import json import logging import secrets @@ -41,6 +42,7 @@ from models.provider_ids import DatasourceProviderID, ToolProviderID from models.source import DataSourceApiKeyAuthBinding, DataSourceOauthBinding from models.tools import ToolOAuthSystemClient from services.account_service import AccountService, RegisterService, TenantService +from services.clear_free_plan_expired_workflow_run_logs import WorkflowRunCleanup from services.clear_free_plan_tenant_expired_logs import ClearFreePlanTenantExpiredLogs from services.plugin.data_migration import PluginDataMigration from services.plugin.plugin_migration import PluginMigration @@ -852,6 +854,61 @@ def clear_free_plan_tenant_expired_logs(days: int, batch: int, tenant_ids: list[ click.echo(click.style("Clear free plan tenant expired logs completed.", fg="green")) +@click.command("clean-workflow-runs", help="Clean expired workflow runs and related data for free tenants.") +@click.option("--days", default=30, show_default=True, help="Delete workflow runs created before N days ago.") +@click.option("--batch-size", default=200, show_default=True, help="Batch size for selecting workflow runs.") +@click.option( + "--start-after", + type=click.DateTime(formats=["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"]), + default=None, + help="Optional lower bound (inclusive) for created_at; must be paired with --end-before.", +) +@click.option( + "--end-before", + type=click.DateTime(formats=["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"]), + default=None, + help="Optional upper bound (exclusive) for created_at; must be paired with --start-after.", +) +@click.option( + "--dry-run", + is_flag=True, + help="Preview cleanup results without deleting any workflow run data.", +) +def clean_workflow_runs( + days: int, + batch_size: int, + start_after: datetime.datetime | None, + end_before: datetime.datetime | None, + dry_run: bool, +): + """ + Clean workflow runs and related workflow data for free tenants. + """ + if (start_after is None) ^ (end_before is None): + raise click.UsageError("--start-after and --end-before must be provided together.") + + start_time = datetime.datetime.now(datetime.UTC) + click.echo(click.style(f"Starting workflow run cleanup at {start_time.isoformat()}.", fg="white")) + + WorkflowRunCleanup( + days=days, + batch_size=batch_size, + start_after=start_after, + end_before=end_before, + dry_run=dry_run, + ).run() + + end_time = datetime.datetime.now(datetime.UTC) + elapsed = end_time - start_time + click.echo( + click.style( + f"Workflow run cleanup completed. start={start_time.isoformat()} " + f"end={end_time.isoformat()} duration={elapsed}", + fg="green", + ) + ) + + @click.option("-f", "--force", is_flag=True, help="Skip user confirmation and force the command to execute.") @click.command("clear-orphaned-file-records", help="Clear orphaned file records.") def clear_orphaned_file_records(force: bool): diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py index 43dddbd011..b854293367 100644 --- a/api/configs/feature/__init__.py +++ b/api/configs/feature/__init__.py @@ -1096,6 +1096,10 @@ class CeleryScheduleTasksConfig(BaseSettings): description="Enable clean messages task", default=False, ) + ENABLE_WORKFLOW_RUN_CLEANUP_TASK: bool = Field( + description="Enable scheduled workflow run cleanup task", + default=False, + ) ENABLE_MAIL_CLEAN_DOCUMENT_NOTIFY_TASK: bool = Field( description="Enable mail clean document notify task", default=False, diff --git a/api/extensions/ext_celery.py b/api/extensions/ext_celery.py index 5cf4984709..c19763372d 100644 --- a/api/extensions/ext_celery.py +++ b/api/extensions/ext_celery.py @@ -160,6 +160,13 @@ def init_app(app: DifyApp) -> Celery: "task": "schedule.clean_workflow_runlogs_precise.clean_workflow_runlogs_precise", "schedule": crontab(minute="0", hour="2"), } + if dify_config.ENABLE_WORKFLOW_RUN_CLEANUP_TASK: + # for saas only + imports.append("schedule.clean_workflow_runs_task") + beat_schedule["clean_workflow_runs_task"] = { + "task": "schedule.clean_workflow_runs_task.clean_workflow_runs_task", + "schedule": crontab(minute="0", hour="0"), + } if dify_config.ENABLE_WORKFLOW_SCHEDULE_POLLER_TASK: imports.append("schedule.workflow_schedule_task") beat_schedule["workflow_schedule_task"] = { diff --git a/api/extensions/ext_commands.py b/api/extensions/ext_commands.py index 71a63168a5..6f6322827c 100644 --- a/api/extensions/ext_commands.py +++ b/api/extensions/ext_commands.py @@ -4,6 +4,7 @@ from dify_app import DifyApp def init_app(app: DifyApp): from commands import ( add_qdrant_index, + clean_workflow_runs, cleanup_orphaned_draft_variables, clear_free_plan_tenant_expired_logs, clear_orphaned_file_records, @@ -54,6 +55,7 @@ def init_app(app: DifyApp): setup_datasource_oauth_client, transform_datasource_credentials, install_rag_pipeline_plugins, + clean_workflow_runs, ] for cmd in cmds_to_register: app.cli.add_command(cmd) diff --git a/api/migrations/versions/2025_12_18_1630-905527cc8fd3_.py b/api/migrations/versions/2025_12_18_1630-905527cc8fd3_.py new file mode 100644 index 0000000000..e8b20674e8 --- /dev/null +++ b/api/migrations/versions/2025_12_18_1630-905527cc8fd3_.py @@ -0,0 +1,32 @@ +"""add workflow_run_created_at_id_idx + +Revision ID: 905527cc8fd3 +Revises: 03ea244985ce +Create Date: 2025-12-18 16:30:02.462084 + +""" +from alembic import op +import models as models +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = '905527cc8fd3' +down_revision = '03ea244985ce' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('workflow_runs', schema=None) as batch_op: + batch_op.create_index('workflow_run_created_at_id_idx', ['created_at', 'id'], unique=False) + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('workflow_runs', schema=None) as batch_op: + batch_op.drop_index('workflow_run_created_at_id_idx') + # ### end Alembic commands ### diff --git a/api/models/workflow.py b/api/models/workflow.py index 853d5afefc..90c99d49d6 100644 --- a/api/models/workflow.py +++ b/api/models/workflow.py @@ -595,6 +595,7 @@ class WorkflowRun(Base): __table_args__ = ( sa.PrimaryKeyConstraint("id", name="workflow_run_pkey"), sa.Index("workflow_run_triggerd_from_idx", "tenant_id", "app_id", "triggered_from"), + sa.Index("workflow_run_created_at_id_idx", "created_at", "id"), ) id: Mapped[str] = mapped_column(StringUUID, default=lambda: str(uuid4())) diff --git a/api/repositories/api_workflow_run_repository.py b/api/repositories/api_workflow_run_repository.py index fd547c78ba..22c3ea7130 100644 --- a/api/repositories/api_workflow_run_repository.py +++ b/api/repositories/api_workflow_run_repository.py @@ -34,10 +34,12 @@ Example: ``` """ -from collections.abc import Sequence +from collections.abc import Callable, Sequence from datetime import datetime from typing import Protocol +from sqlalchemy.orm import Session + from core.workflow.entities.pause_reason import PauseReason from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from libs.infinite_scroll_pagination import InfiniteScrollPagination @@ -253,6 +255,30 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol): """ ... + def get_runs_batch_by_time_range( + self, + start_after: datetime | None, + end_before: datetime, + last_seen: tuple[datetime, str] | None, + batch_size: int, + ) -> Sequence[WorkflowRun]: + """ + Fetch a batch of workflow runs within a time window using keyset pagination. + """ + ... + + def delete_runs_with_related( + self, + runs: Sequence[WorkflowRun], + delete_node_executions: Callable[[Session, Sequence[WorkflowRun]], tuple[int, int]] | None = None, + delete_trigger_logs: Callable[[Session, Sequence[str]], int] | None = None, + ) -> dict[str, int]: + """ + Delete workflow runs and their related records (node executions, offloads, app logs, + trigger logs, pauses, pause reasons). + """ + ... + def create_workflow_pause( self, workflow_run_id: str, diff --git a/api/repositories/sqlalchemy_api_workflow_node_execution_repository.py b/api/repositories/sqlalchemy_api_workflow_node_execution_repository.py index 7e2173acdd..004bd7cad9 100644 --- a/api/repositories/sqlalchemy_api_workflow_node_execution_repository.py +++ b/api/repositories/sqlalchemy_api_workflow_node_execution_repository.py @@ -7,13 +7,13 @@ using SQLAlchemy 2.0 style queries for WorkflowNodeExecutionModel operations. from collections.abc import Sequence from datetime import datetime -from typing import cast +from typing import TypedDict, cast -from sqlalchemy import asc, delete, desc, select +from sqlalchemy import asc, delete, desc, select, tuple_ from sqlalchemy.engine import CursorResult from sqlalchemy.orm import Session, sessionmaker -from models.workflow import WorkflowNodeExecutionModel +from models.workflow import WorkflowNodeExecutionModel, WorkflowNodeExecutionOffload from repositories.api_workflow_node_execution_repository import DifyAPIWorkflowNodeExecutionRepository @@ -290,3 +290,64 @@ class DifyAPISQLAlchemyWorkflowNodeExecutionRepository(DifyAPIWorkflowNodeExecut result = cast(CursorResult, session.execute(stmt)) session.commit() return result.rowcount + + class RunContext(TypedDict): + run_id: str + tenant_id: str + app_id: str + workflow_id: str + triggered_from: str + + @staticmethod + def delete_by_runs(session: Session, runs: Sequence[RunContext]) -> tuple[int, int]: + """ + Delete node executions (and offloads) for the given workflow runs using indexed columns. + + Uses the composite index on (tenant_id, app_id, workflow_id, triggered_from, workflow_run_id) + by filtering on those columns with tuple IN. + """ + if not runs: + return 0, 0 + + tuple_values = [ + (run["tenant_id"], run["app_id"], run["workflow_id"], run["triggered_from"], run["run_id"]) for run in runs + ] + + node_execution_ids = session.scalars( + select(WorkflowNodeExecutionModel.id).where( + tuple_( + WorkflowNodeExecutionModel.tenant_id, + WorkflowNodeExecutionModel.app_id, + WorkflowNodeExecutionModel.workflow_id, + WorkflowNodeExecutionModel.triggered_from, + WorkflowNodeExecutionModel.workflow_run_id, + ).in_(tuple_values) + ) + ).all() + + if not node_execution_ids: + return 0, 0 + + offloads_deleted = ( + cast( + CursorResult, + session.execute( + delete(WorkflowNodeExecutionOffload).where( + WorkflowNodeExecutionOffload.node_execution_id.in_(node_execution_ids) + ) + ), + ).rowcount + or 0 + ) + + node_executions_deleted = ( + cast( + CursorResult, + session.execute( + delete(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id.in_(node_execution_ids)) + ), + ).rowcount + or 0 + ) + + return node_executions_deleted, offloads_deleted diff --git a/api/repositories/sqlalchemy_api_workflow_run_repository.py b/api/repositories/sqlalchemy_api_workflow_run_repository.py index b172c6a3ac..f081124a80 100644 --- a/api/repositories/sqlalchemy_api_workflow_run_repository.py +++ b/api/repositories/sqlalchemy_api_workflow_run_repository.py @@ -21,7 +21,7 @@ Implementation Notes: import logging import uuid -from collections.abc import Sequence +from collections.abc import Callable, Sequence from datetime import datetime from decimal import Decimal from typing import Any, cast @@ -40,8 +40,14 @@ from libs.infinite_scroll_pagination import InfiniteScrollPagination from libs.time_parser import get_time_threshold from libs.uuid_utils import uuidv7 from models.enums import WorkflowRunTriggeredFrom -from models.workflow import WorkflowPause as WorkflowPauseModel -from models.workflow import WorkflowPauseReason, WorkflowRun +from models.workflow import ( + WorkflowAppLog, + WorkflowPauseReason, + WorkflowRun, +) +from models.workflow import ( + WorkflowPause as WorkflowPauseModel, +) from repositories.api_workflow_run_repository import APIWorkflowRunRepository from repositories.entities.workflow_pause import WorkflowPauseEntity from repositories.types import ( @@ -314,6 +320,102 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository): logger.info("Total deleted %s workflow runs for app %s", total_deleted, app_id) return total_deleted + def get_runs_batch_by_time_range( + self, + start_after: datetime | None, + end_before: datetime, + last_seen: tuple[datetime, str] | None, + batch_size: int, + ) -> Sequence[WorkflowRun]: + with self._session_maker() as session: + stmt = ( + select(WorkflowRun) + .where( + WorkflowRun.created_at < end_before, + WorkflowRun.status.in_( + [ + WorkflowExecutionStatus.SUCCEEDED.value, + WorkflowExecutionStatus.FAILED.value, + WorkflowExecutionStatus.STOPPED.value, + WorkflowExecutionStatus.PARTIAL_SUCCEEDED.value, + ] + ), + ) + .order_by(WorkflowRun.created_at.asc(), WorkflowRun.id.asc()) + .limit(batch_size) + ) + + if start_after: + stmt = stmt.where(WorkflowRun.created_at >= start_after) + + if last_seen: + stmt = stmt.where( + or_( + WorkflowRun.created_at > last_seen[0], + and_(WorkflowRun.created_at == last_seen[0], WorkflowRun.id > last_seen[1]), + ) + ) + + return session.scalars(stmt).all() + + def delete_runs_with_related( + self, + runs: Sequence[WorkflowRun], + delete_node_executions: Callable[[Session, Sequence[WorkflowRun]], tuple[int, int]] | None = None, + delete_trigger_logs: Callable[[Session, Sequence[str]], int] | None = None, + ) -> dict[str, int]: + if not runs: + return { + "runs": 0, + "node_executions": 0, + "offloads": 0, + "app_logs": 0, + "trigger_logs": 0, + "pauses": 0, + "pause_reasons": 0, + } + + with self._session_maker() as session: + run_ids = [run.id for run in runs] + if delete_node_executions: + node_executions_deleted, offloads_deleted = delete_node_executions(session, runs) + else: + node_executions_deleted, offloads_deleted = 0, 0 + + app_logs_result = session.execute(delete(WorkflowAppLog).where(WorkflowAppLog.workflow_run_id.in_(run_ids))) + app_logs_deleted = cast(CursorResult, app_logs_result).rowcount or 0 + + pause_ids = session.scalars( + select(WorkflowPauseModel.id).where(WorkflowPauseModel.workflow_run_id.in_(run_ids)) + ).all() + pause_reasons_deleted = 0 + pauses_deleted = 0 + + if pause_ids: + pause_reasons_result = session.execute( + delete(WorkflowPauseReason).where(WorkflowPauseReason.pause_id.in_(pause_ids)) + ) + pause_reasons_deleted = cast(CursorResult, pause_reasons_result).rowcount or 0 + pauses_result = session.execute(delete(WorkflowPauseModel).where(WorkflowPauseModel.id.in_(pause_ids))) + pauses_deleted = cast(CursorResult, pauses_result).rowcount or 0 + + trigger_logs_deleted = delete_trigger_logs(session, run_ids) if delete_trigger_logs else 0 + + runs_result = session.execute(delete(WorkflowRun).where(WorkflowRun.id.in_(run_ids))) + runs_deleted = cast(CursorResult, runs_result).rowcount or 0 + + session.commit() + + return { + "runs": runs_deleted, + "node_executions": node_executions_deleted, + "offloads": offloads_deleted, + "app_logs": app_logs_deleted, + "trigger_logs": trigger_logs_deleted, + "pauses": pauses_deleted, + "pause_reasons": pause_reasons_deleted, + } + def create_workflow_pause( self, workflow_run_id: str, diff --git a/api/repositories/sqlalchemy_workflow_trigger_log_repository.py b/api/repositories/sqlalchemy_workflow_trigger_log_repository.py index 0d67e286b0..d01c35e5ab 100644 --- a/api/repositories/sqlalchemy_workflow_trigger_log_repository.py +++ b/api/repositories/sqlalchemy_workflow_trigger_log_repository.py @@ -4,8 +4,10 @@ SQLAlchemy implementation of WorkflowTriggerLogRepository. from collections.abc import Sequence from datetime import UTC, datetime, timedelta +from typing import cast -from sqlalchemy import and_, select +from sqlalchemy import and_, delete, select +from sqlalchemy.engine import CursorResult from sqlalchemy.orm import Session from models.enums import WorkflowTriggerStatus @@ -84,3 +86,19 @@ class SQLAlchemyWorkflowTriggerLogRepository(WorkflowTriggerLogRepository): ) return list(self.session.scalars(query).all()) + + def delete_by_run_ids(self, run_ids: Sequence[str]) -> int: + """ + Delete trigger logs associated with the given workflow run ids. + + Args: + run_ids: Collection of workflow run identifiers. + + Returns: + Number of rows deleted. + """ + if not run_ids: + return 0 + + result = self.session.execute(delete(WorkflowTriggerLog).where(WorkflowTriggerLog.workflow_run_id.in_(run_ids))) + return cast(CursorResult, result).rowcount or 0 diff --git a/api/schedule/clean_workflow_runs_task.py b/api/schedule/clean_workflow_runs_task.py new file mode 100644 index 0000000000..540a46f33d --- /dev/null +++ b/api/schedule/clean_workflow_runs_task.py @@ -0,0 +1,43 @@ +from datetime import UTC, datetime + +import click + +import app +from configs import dify_config +from services.clear_free_plan_expired_workflow_run_logs import WorkflowRunCleanup + + +@app.celery.task(queue="retention") +def clean_workflow_runs_task() -> None: + """ + Scheduled cleanup for workflow runs and related records (sandbox tenants only). + """ + click.echo( + click.style( + ( + "Scheduled workflow run cleanup starting: " + f"cutoff={dify_config.SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS} days, " + f"batch={dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE}" + ), + fg="green", + ) + ) + + start_time = datetime.now(UTC) + + WorkflowRunCleanup( + days=dify_config.SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS, + batch_size=dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE, + start_after=None, + end_before=None, + ).run() + + end_time = datetime.now(UTC) + elapsed = end_time - start_time + click.echo( + click.style( + f"Scheduled workflow run cleanup finished. start={start_time.isoformat()} " + f"end={end_time.isoformat()} duration={elapsed}", + fg="green", + ) + ) diff --git a/api/services/clear_free_plan_expired_workflow_run_logs.py b/api/services/clear_free_plan_expired_workflow_run_logs.py new file mode 100644 index 0000000000..5b01a9caf6 --- /dev/null +++ b/api/services/clear_free_plan_expired_workflow_run_logs.py @@ -0,0 +1,263 @@ +import datetime +import logging +from collections.abc import Iterable, Sequence + +import click +from sqlalchemy.orm import Session, sessionmaker + +from configs import dify_config +from enums.cloud_plan import CloudPlan +from extensions.ext_database import db +from models.workflow import WorkflowRun +from repositories.api_workflow_run_repository import APIWorkflowRunRepository +from repositories.sqlalchemy_api_workflow_node_execution_repository import ( + DifyAPISQLAlchemyWorkflowNodeExecutionRepository, +) +from repositories.sqlalchemy_workflow_trigger_log_repository import SQLAlchemyWorkflowTriggerLogRepository +from services.billing_service import BillingService, SubscriptionPlan + +logger = logging.getLogger(__name__) + + +class WorkflowRunCleanup: + def __init__( + self, + days: int, + batch_size: int, + start_after: datetime.datetime | None = None, + end_before: datetime.datetime | None = None, + workflow_run_repo: APIWorkflowRunRepository | None = None, + dry_run: bool = False, + ): + if (start_after is None) ^ (end_before is None): + raise ValueError("start_after and end_before must be both set or both omitted.") + + computed_cutoff = datetime.datetime.now() - datetime.timedelta(days=days) + self.window_start = start_after + self.window_end = end_before or computed_cutoff + + if self.window_start and self.window_end <= self.window_start: + raise ValueError("end_before must be greater than start_after.") + + if batch_size <= 0: + raise ValueError("batch_size must be greater than 0.") + + self.batch_size = batch_size + self.billing_cache: dict[str, SubscriptionPlan | None] = {} + self._cleanup_whitelist: set[str] | None = None + self.dry_run = dry_run + self.free_plan_grace_period_days = dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD + self.workflow_run_repo: APIWorkflowRunRepository + if workflow_run_repo: + self.workflow_run_repo = workflow_run_repo + else: + # Lazy import to avoid circular dependencies during module import + from repositories.factory import DifyAPIRepositoryFactory + + session_maker = sessionmaker(bind=db.engine, expire_on_commit=False) + self.workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker) + + def run(self) -> None: + click.echo( + click.style( + f"{'Inspecting' if self.dry_run else 'Cleaning'} workflow runs " + f"{'between ' + self.window_start.isoformat() + ' and ' if self.window_start else 'before '}" + f"{self.window_end.isoformat()} (batch={self.batch_size})", + fg="white", + ) + ) + if self.dry_run: + click.echo(click.style("Dry run mode enabled. No data will be deleted.", fg="yellow")) + + total_runs_deleted = 0 + total_runs_targeted = 0 + batch_index = 0 + last_seen: tuple[datetime.datetime, str] | None = None + + while True: + run_rows = self.workflow_run_repo.get_runs_batch_by_time_range( + start_after=self.window_start, + end_before=self.window_end, + last_seen=last_seen, + batch_size=self.batch_size, + ) + if not run_rows: + break + + batch_index += 1 + last_seen = (run_rows[-1].created_at, run_rows[-1].id) + tenant_ids = {row.tenant_id for row in run_rows} + free_tenants = self._filter_free_tenants(tenant_ids) + free_runs = [row for row in run_rows if row.tenant_id in free_tenants] + paid_or_skipped = len(run_rows) - len(free_runs) + + if not free_runs: + click.echo( + click.style( + f"[batch #{batch_index}] skipped (no sandbox runs in batch, {paid_or_skipped} paid/unknown)", + fg="yellow", + ) + ) + continue + + total_runs_targeted += len(free_runs) + + if self.dry_run: + sample_ids = ", ".join(run.id for run in free_runs[:5]) + click.echo( + click.style( + f"[batch #{batch_index}] would delete {len(free_runs)} runs " + f"(sample ids: {sample_ids}) and skip {paid_or_skipped} paid/unknown", + fg="yellow", + ) + ) + continue + + try: + counts = self.workflow_run_repo.delete_runs_with_related( + free_runs, + delete_node_executions=self._delete_node_executions, + delete_trigger_logs=self._delete_trigger_logs, + ) + except Exception: + logger.exception("Failed to delete workflow runs batch ending at %s", last_seen[0]) + raise + + total_runs_deleted += counts["runs"] + click.echo( + click.style( + f"[batch #{batch_index}] deleted runs: {counts['runs']} " + f"(nodes {counts['node_executions']}, offloads {counts['offloads']}, " + f"app_logs {counts['app_logs']}, trigger_logs {counts['trigger_logs']}, " + f"pauses {counts['pauses']}, pause_reasons {counts['pause_reasons']}); " + f"skipped {paid_or_skipped} paid/unknown", + fg="green", + ) + ) + + if self.dry_run: + if self.window_start: + summary_message = ( + f"Dry run complete. Would delete {total_runs_targeted} workflow runs " + f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}" + ) + else: + summary_message = ( + f"Dry run complete. Would delete {total_runs_targeted} workflow runs " + f"before {self.window_end.isoformat()}" + ) + summary_color = "yellow" + else: + if self.window_start: + summary_message = ( + f"Cleanup complete. Deleted {total_runs_deleted} workflow runs " + f"between {self.window_start.isoformat()} and {self.window_end.isoformat()}" + ) + else: + summary_message = ( + f"Cleanup complete. Deleted {total_runs_deleted} workflow runs before {self.window_end.isoformat()}" + ) + summary_color = "white" + + click.echo(click.style(summary_message, fg=summary_color)) + + def _filter_free_tenants(self, tenant_ids: Iterable[str]) -> set[str]: + tenant_id_list = list(tenant_ids) + + if not dify_config.BILLING_ENABLED: + return set(tenant_id_list) + + if not tenant_id_list: + return set() + + cleanup_whitelist = self._get_cleanup_whitelist() + + uncached_tenants = [tenant_id for tenant_id in tenant_id_list if tenant_id not in self.billing_cache] + + if uncached_tenants: + try: + bulk_info = BillingService.get_plan_bulk(uncached_tenants) + except Exception: + bulk_info = {} + logger.exception("Failed to fetch billing plans in bulk for tenants: %s", uncached_tenants) + + for tenant_id in uncached_tenants: + info = bulk_info.get(tenant_id) + if info is None: + logger.warning("Missing billing info for tenant %s in bulk resp; treating as non-free", tenant_id) + self.billing_cache[tenant_id] = info + + eligible_free_tenants: set[str] = set() + for tenant_id in tenant_id_list: + if tenant_id in cleanup_whitelist: + continue + + info = self.billing_cache.get(tenant_id) + if not info: + continue + + if info.get("plan") != CloudPlan.SANDBOX: + continue + + if self._is_within_grace_period(tenant_id, info): + continue + + eligible_free_tenants.add(tenant_id) + + return eligible_free_tenants + + def _expiration_datetime(self, tenant_id: str, expiration_value: int) -> datetime.datetime | None: + if expiration_value < 0: + return None + + try: + return datetime.datetime.fromtimestamp(expiration_value, datetime.UTC) + except (OverflowError, OSError, ValueError): + logger.exception("Failed to parse expiration timestamp for tenant %s", tenant_id) + return None + + def _is_within_grace_period(self, tenant_id: str, info: SubscriptionPlan) -> bool: + if self.free_plan_grace_period_days <= 0: + return False + + expiration_value = info.get("expiration_date", -1) + expiration_at = self._expiration_datetime(tenant_id, expiration_value) + if expiration_at is None: + return False + + grace_deadline = expiration_at + datetime.timedelta(days=self.free_plan_grace_period_days) + return datetime.datetime.now(datetime.UTC) < grace_deadline + + def _get_cleanup_whitelist(self) -> set[str]: + if self._cleanup_whitelist is not None: + return self._cleanup_whitelist + + if not dify_config.BILLING_ENABLED: + self._cleanup_whitelist = set() + return self._cleanup_whitelist + + try: + whitelist_ids = BillingService.get_expired_subscription_cleanup_whitelist() + except Exception: + logger.exception("Failed to fetch cleanup whitelist from billing service") + whitelist_ids = [] + + self._cleanup_whitelist = set(whitelist_ids) + return self._cleanup_whitelist + + def _delete_trigger_logs(self, session: Session, run_ids: Sequence[str]) -> int: + trigger_repo = SQLAlchemyWorkflowTriggerLogRepository(session) + return trigger_repo.delete_by_run_ids(run_ids) + + def _delete_node_executions(self, session: Session, runs: Sequence[WorkflowRun]) -> tuple[int, int]: + run_contexts: list[DifyAPISQLAlchemyWorkflowNodeExecutionRepository.RunContext] = [ + { + "run_id": run.id, + "tenant_id": run.tenant_id, + "app_id": run.app_id, + "workflow_id": run.workflow_id, + "triggered_from": run.triggered_from, + } + for run in runs + ] + return DifyAPISQLAlchemyWorkflowNodeExecutionRepository.delete_by_runs(session, run_contexts) diff --git a/api/tests/unit_tests/repositories/test_sqlalchemy_api_workflow_run_repository.py b/api/tests/unit_tests/repositories/test_sqlalchemy_api_workflow_run_repository.py index 0c34676252..ef3ac29519 100644 --- a/api/tests/unit_tests/repositories/test_sqlalchemy_api_workflow_run_repository.py +++ b/api/tests/unit_tests/repositories/test_sqlalchemy_api_workflow_run_repository.py @@ -4,6 +4,7 @@ from datetime import UTC, datetime from unittest.mock import Mock, patch import pytest +from sqlalchemy.dialects import postgresql from sqlalchemy.orm import Session, sessionmaker from core.workflow.enums import WorkflowExecutionStatus @@ -104,6 +105,42 @@ class TestDifyAPISQLAlchemyWorkflowRunRepository: return pause +class TestGetRunsBatchByTimeRange(TestDifyAPISQLAlchemyWorkflowRunRepository): + def test_get_runs_batch_by_time_range_filters_terminal_statuses( + self, repository: DifyAPISQLAlchemyWorkflowRunRepository, mock_session: Mock + ): + scalar_result = Mock() + scalar_result.all.return_value = [] + mock_session.scalars.return_value = scalar_result + + repository.get_runs_batch_by_time_range( + start_after=None, + end_before=datetime(2024, 1, 1), + last_seen=None, + batch_size=50, + ) + + stmt = mock_session.scalars.call_args[0][0] + compiled_sql = str( + stmt.compile( + dialect=postgresql.dialect(), + compile_kwargs={"literal_binds": True}, + ) + ) + + assert "workflow_runs.status" in compiled_sql + for status in ( + WorkflowExecutionStatus.SUCCEEDED, + WorkflowExecutionStatus.FAILED, + WorkflowExecutionStatus.STOPPED, + WorkflowExecutionStatus.PARTIAL_SUCCEEDED, + ): + assert f"'{status.value}'" in compiled_sql + + assert "'running'" not in compiled_sql + assert "'paused'" not in compiled_sql + + class TestCreateWorkflowPause(TestDifyAPISQLAlchemyWorkflowRunRepository): """Test create_workflow_pause method.""" @@ -181,6 +218,34 @@ class TestCreateWorkflowPause(TestDifyAPISQLAlchemyWorkflowRunRepository): ) +class TestDeleteRunsWithRelated(TestDifyAPISQLAlchemyWorkflowRunRepository): + def test_uses_trigger_log_repository(self, repository: DifyAPISQLAlchemyWorkflowRunRepository, mock_session: Mock): + node_ids_result = Mock() + node_ids_result.all.return_value = [] + pause_ids_result = Mock() + pause_ids_result.all.return_value = [] + mock_session.scalars.side_effect = [node_ids_result, pause_ids_result] + + # app_logs delete, runs delete + mock_session.execute.side_effect = [Mock(rowcount=0), Mock(rowcount=1)] + + fake_trigger_repo = Mock() + fake_trigger_repo.delete_by_run_ids.return_value = 3 + + run = Mock(id="run-1", tenant_id="t1", app_id="a1", workflow_id="w1", triggered_from="tf") + counts = repository.delete_runs_with_related( + [run], + delete_node_executions=lambda session, runs: (2, 1), + delete_trigger_logs=lambda session, run_ids: fake_trigger_repo.delete_by_run_ids(run_ids), + ) + + fake_trigger_repo.delete_by_run_ids.assert_called_once_with(["run-1"]) + assert counts["node_executions"] == 2 + assert counts["offloads"] == 1 + assert counts["trigger_logs"] == 3 + assert counts["runs"] == 1 + + class TestResumeWorkflowPause(TestDifyAPISQLAlchemyWorkflowRunRepository): """Test resume_workflow_pause method.""" diff --git a/api/tests/unit_tests/repositories/test_sqlalchemy_workflow_trigger_log_repository.py b/api/tests/unit_tests/repositories/test_sqlalchemy_workflow_trigger_log_repository.py new file mode 100644 index 0000000000..d409618211 --- /dev/null +++ b/api/tests/unit_tests/repositories/test_sqlalchemy_workflow_trigger_log_repository.py @@ -0,0 +1,31 @@ +from unittest.mock import Mock + +from sqlalchemy.dialects import postgresql +from sqlalchemy.orm import Session + +from repositories.sqlalchemy_workflow_trigger_log_repository import SQLAlchemyWorkflowTriggerLogRepository + + +def test_delete_by_run_ids_executes_delete(): + session = Mock(spec=Session) + session.execute.return_value = Mock(rowcount=2) + repo = SQLAlchemyWorkflowTriggerLogRepository(session) + + deleted = repo.delete_by_run_ids(["run-1", "run-2"]) + + stmt = session.execute.call_args[0][0] + compiled_sql = str(stmt.compile(dialect=postgresql.dialect(), compile_kwargs={"literal_binds": True})) + assert "workflow_trigger_logs" in compiled_sql + assert "'run-1'" in compiled_sql + assert "'run-2'" in compiled_sql + assert deleted == 2 + + +def test_delete_by_run_ids_empty_short_circuits(): + session = Mock(spec=Session) + repo = SQLAlchemyWorkflowTriggerLogRepository(session) + + deleted = repo.delete_by_run_ids([]) + + session.execute.assert_not_called() + assert deleted == 0 diff --git a/api/tests/unit_tests/services/test_clear_free_plan_expired_workflow_run_logs.py b/api/tests/unit_tests/services/test_clear_free_plan_expired_workflow_run_logs.py new file mode 100644 index 0000000000..d0921c1b5d --- /dev/null +++ b/api/tests/unit_tests/services/test_clear_free_plan_expired_workflow_run_logs.py @@ -0,0 +1,275 @@ +import datetime +from typing import Any + +import pytest + +from services import clear_free_plan_expired_workflow_run_logs as cleanup_module +from services.billing_service import SubscriptionPlan +from services.clear_free_plan_expired_workflow_run_logs import WorkflowRunCleanup + + +class FakeRun: + def __init__( + self, + run_id: str, + tenant_id: str, + created_at: datetime.datetime, + app_id: str = "app-1", + workflow_id: str = "wf-1", + triggered_from: str = "workflow-run", + ) -> None: + self.id = run_id + self.tenant_id = tenant_id + self.app_id = app_id + self.workflow_id = workflow_id + self.triggered_from = triggered_from + self.created_at = created_at + + +class FakeRepo: + def __init__(self, batches: list[list[FakeRun]], delete_result: dict[str, int] | None = None) -> None: + self.batches = batches + self.call_idx = 0 + self.deleted: list[list[str]] = [] + self.delete_result = delete_result or { + "runs": 0, + "node_executions": 0, + "offloads": 0, + "app_logs": 0, + "trigger_logs": 0, + "pauses": 0, + "pause_reasons": 0, + } + + def get_runs_batch_by_time_range( + self, + start_after: datetime.datetime | None, + end_before: datetime.datetime, + last_seen: tuple[datetime.datetime, str] | None, + batch_size: int, + ) -> list[FakeRun]: + if self.call_idx >= len(self.batches): + return [] + batch = self.batches[self.call_idx] + self.call_idx += 1 + return batch + + def delete_runs_with_related( + self, runs: list[FakeRun], delete_node_executions=None, delete_trigger_logs=None + ) -> dict[str, int]: + self.deleted.append([run.id for run in runs]) + result = self.delete_result.copy() + result["runs"] = len(runs) + return result + + +def plan_info(plan: str, expiration: int) -> SubscriptionPlan: + return SubscriptionPlan(plan=plan, expiration_date=expiration) + + +def create_cleanup( + monkeypatch: pytest.MonkeyPatch, + repo: FakeRepo, + *, + grace_period_days: int = 0, + whitelist: set[str] | None = None, + **kwargs: Any, +) -> WorkflowRunCleanup: + monkeypatch.setattr( + cleanup_module.dify_config, + "SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD", + grace_period_days, + ) + monkeypatch.setattr( + cleanup_module.WorkflowRunCleanup, + "_get_cleanup_whitelist", + lambda self: whitelist or set(), + ) + return WorkflowRunCleanup(workflow_run_repo=repo, **kwargs) + + +def test_filter_free_tenants_billing_disabled(monkeypatch: pytest.MonkeyPatch) -> None: + cleanup = create_cleanup(monkeypatch, repo=FakeRepo([]), days=30, batch_size=10) + + monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", False) + + def fail_bulk(_: list[str]) -> dict[str, SubscriptionPlan]: + raise RuntimeError("should not call") + + monkeypatch.setattr(cleanup_module.BillingService, "get_plan_bulk", staticmethod(fail_bulk)) + + tenants = {"t1", "t2"} + free = cleanup._filter_free_tenants(tenants) + + assert free == tenants + + +def test_filter_free_tenants_bulk_mixed(monkeypatch: pytest.MonkeyPatch) -> None: + cleanup = create_cleanup(monkeypatch, repo=FakeRepo([]), days=30, batch_size=10) + + monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", True) + cleanup.billing_cache["t_free"] = plan_info("sandbox", -1) + cleanup.billing_cache["t_paid"] = plan_info("team", -1) + monkeypatch.setattr( + cleanup_module.BillingService, + "get_plan_bulk", + staticmethod(lambda tenant_ids: {tenant_id: plan_info("sandbox", -1) for tenant_id in tenant_ids}), + ) + + free = cleanup._filter_free_tenants({"t_free", "t_paid", "t_missing"}) + + assert free == {"t_free", "t_missing"} + + +def test_filter_free_tenants_respects_grace_period(monkeypatch: pytest.MonkeyPatch) -> None: + cleanup = create_cleanup(monkeypatch, repo=FakeRepo([]), days=30, batch_size=10, grace_period_days=45) + + monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", True) + now = datetime.datetime.now(datetime.UTC) + within_grace_ts = int((now - datetime.timedelta(days=10)).timestamp()) + outside_grace_ts = int((now - datetime.timedelta(days=90)).timestamp()) + + def fake_bulk(_: list[str]) -> dict[str, SubscriptionPlan]: + return { + "recently_downgraded": plan_info("sandbox", within_grace_ts), + "long_sandbox": plan_info("sandbox", outside_grace_ts), + } + + monkeypatch.setattr(cleanup_module.BillingService, "get_plan_bulk", staticmethod(fake_bulk)) + + free = cleanup._filter_free_tenants({"recently_downgraded", "long_sandbox"}) + + assert free == {"long_sandbox"} + + +def test_filter_free_tenants_skips_cleanup_whitelist(monkeypatch: pytest.MonkeyPatch) -> None: + cleanup = create_cleanup( + monkeypatch, + repo=FakeRepo([]), + days=30, + batch_size=10, + whitelist={"tenant_whitelist"}, + ) + + monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", True) + cleanup.billing_cache["tenant_whitelist"] = plan_info("sandbox", -1) + monkeypatch.setattr( + cleanup_module.BillingService, + "get_plan_bulk", + staticmethod(lambda tenant_ids: {tenant_id: plan_info("sandbox", -1) for tenant_id in tenant_ids}), + ) + + tenants = {"tenant_whitelist", "tenant_regular"} + free = cleanup._filter_free_tenants(tenants) + + assert free == {"tenant_regular"} + + +def test_filter_free_tenants_bulk_failure(monkeypatch: pytest.MonkeyPatch) -> None: + cleanup = create_cleanup(monkeypatch, repo=FakeRepo([]), days=30, batch_size=10) + + monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", True) + monkeypatch.setattr( + cleanup_module.BillingService, + "get_plan_bulk", + staticmethod(lambda tenant_ids: (_ for _ in ()).throw(RuntimeError("boom"))), + ) + + free = cleanup._filter_free_tenants({"t1", "t2"}) + + assert free == set() + + +def test_run_deletes_only_free_tenants(monkeypatch: pytest.MonkeyPatch) -> None: + cutoff = datetime.datetime.now() + repo = FakeRepo( + batches=[ + [ + FakeRun("run-free", "t_free", cutoff), + FakeRun("run-paid", "t_paid", cutoff), + ] + ] + ) + cleanup = create_cleanup(monkeypatch, repo=repo, days=30, batch_size=10) + + monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", True) + cleanup.billing_cache["t_free"] = plan_info("sandbox", -1) + cleanup.billing_cache["t_paid"] = plan_info("team", -1) + monkeypatch.setattr( + cleanup_module.BillingService, + "get_plan_bulk", + staticmethod(lambda tenant_ids: {tenant_id: plan_info("sandbox", -1) for tenant_id in tenant_ids}), + ) + + cleanup.run() + + assert repo.deleted == [["run-free"]] + + +def test_run_skips_when_no_free_tenants(monkeypatch: pytest.MonkeyPatch) -> None: + cutoff = datetime.datetime.now() + repo = FakeRepo(batches=[[FakeRun("run-paid", "t_paid", cutoff)]]) + cleanup = create_cleanup(monkeypatch, repo=repo, days=30, batch_size=10) + + monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", True) + monkeypatch.setattr( + cleanup_module.BillingService, + "get_plan_bulk", + staticmethod(lambda tenant_ids: {tenant_id: plan_info("team", 1893456000) for tenant_id in tenant_ids}), + ) + + cleanup.run() + + assert repo.deleted == [] + + +def test_run_exits_on_empty_batch(monkeypatch: pytest.MonkeyPatch) -> None: + cleanup = create_cleanup(monkeypatch, repo=FakeRepo([]), days=30, batch_size=10) + + cleanup.run() + + +def test_run_dry_run_skips_deletions(monkeypatch: pytest.MonkeyPatch, capsys: pytest.CaptureFixture[str]) -> None: + cutoff = datetime.datetime.now() + repo = FakeRepo(batches=[[FakeRun("run-free", "t_free", cutoff)]]) + cleanup = create_cleanup(monkeypatch, repo=repo, days=30, batch_size=10, dry_run=True) + + monkeypatch.setattr(cleanup_module.dify_config, "BILLING_ENABLED", False) + + cleanup.run() + + assert repo.deleted == [] + captured = capsys.readouterr().out + assert "Dry run mode enabled" in captured + assert "would delete 1 runs" in captured + + +def test_between_sets_window_bounds(monkeypatch: pytest.MonkeyPatch) -> None: + start_after = datetime.datetime(2024, 5, 1, 0, 0, 0) + end_before = datetime.datetime(2024, 6, 1, 0, 0, 0) + cleanup = create_cleanup( + monkeypatch, repo=FakeRepo([]), days=30, batch_size=10, start_after=start_after, end_before=end_before + ) + + assert cleanup.window_start == start_after + assert cleanup.window_end == end_before + + +def test_between_requires_both_boundaries(monkeypatch: pytest.MonkeyPatch) -> None: + with pytest.raises(ValueError): + create_cleanup( + monkeypatch, repo=FakeRepo([]), days=30, batch_size=10, start_after=datetime.datetime.now(), end_before=None + ) + with pytest.raises(ValueError): + create_cleanup( + monkeypatch, repo=FakeRepo([]), days=30, batch_size=10, start_after=None, end_before=datetime.datetime.now() + ) + + +def test_between_requires_end_after_start(monkeypatch: pytest.MonkeyPatch) -> None: + start_after = datetime.datetime(2024, 6, 1, 0, 0, 0) + end_before = datetime.datetime(2024, 5, 1, 0, 0, 0) + with pytest.raises(ValueError): + create_cleanup( + monkeypatch, repo=FakeRepo([]), days=30, batch_size=10, start_after=start_after, end_before=end_before + ) diff --git a/docker/.env.example b/docker/.env.example index 0e09d6869d..372a5de842 100644 --- a/docker/.env.example +++ b/docker/.env.example @@ -1463,6 +1463,7 @@ ENABLE_CLEAN_UNUSED_DATASETS_TASK=false ENABLE_CREATE_TIDB_SERVERLESS_TASK=false ENABLE_UPDATE_TIDB_SERVERLESS_STATUS_TASK=false ENABLE_CLEAN_MESSAGES=false +ENABLE_WORKFLOW_RUN_CLEANUP_TASK=false ENABLE_MAIL_CLEAN_DOCUMENT_NOTIFY_TASK=false ENABLE_DATASETS_QUEUE_MONITOR=false ENABLE_CHECK_UPGRADABLE_PLUGIN_TASK=true diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 1c8d8d03e3..6bf92d2d2a 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -653,6 +653,7 @@ x-shared-env: &shared-api-worker-env ENABLE_CREATE_TIDB_SERVERLESS_TASK: ${ENABLE_CREATE_TIDB_SERVERLESS_TASK:-false} ENABLE_UPDATE_TIDB_SERVERLESS_STATUS_TASK: ${ENABLE_UPDATE_TIDB_SERVERLESS_STATUS_TASK:-false} ENABLE_CLEAN_MESSAGES: ${ENABLE_CLEAN_MESSAGES:-false} + ENABLE_WORKFLOW_RUN_CLEANUP_TASK: ${ENABLE_WORKFLOW_RUN_CLEANUP_TASK:-false} ENABLE_MAIL_CLEAN_DOCUMENT_NOTIFY_TASK: ${ENABLE_MAIL_CLEAN_DOCUMENT_NOTIFY_TASK:-false} ENABLE_DATASETS_QUEUE_MONITOR: ${ENABLE_DATASETS_QUEUE_MONITOR:-false} ENABLE_CHECK_UPGRADABLE_PLUGIN_TASK: ${ENABLE_CHECK_UPGRADABLE_PLUGIN_TASK:-true}