diff --git a/api/commands.py b/api/commands.py index 9a990459c0..944b79d494 100644 --- a/api/commands.py +++ b/api/commands.py @@ -4,6 +4,7 @@ import json import logging import secrets from typing import Any +from uuid import uuid4 import click import sqlalchemy as sa @@ -41,6 +42,14 @@ from models.provider import Provider, ProviderModel from models.provider_ids import DatasourceProviderID, ToolProviderID from models.source import DataSourceApiKeyAuthBinding, DataSourceOauthBinding from models.tools import ToolOAuthSystemClient +from models.trigger import AppTriggerType, WorkflowTriggerLog, WorkflowTriggerStatus +from models.workflow import ( + WorkflowAppLog, + WorkflowNodeExecutionModel, + WorkflowNodeExecutionTriggeredFrom, + WorkflowPause, + WorkflowRun, +) from services.account_service import AccountService, RegisterService, TenantService from services.clear_free_plan_expired_workflow_run_logs import WorkflowRunCleanup from services.clear_free_plan_tenant_expired_logs import ClearFreePlanTenantExpiredLogs @@ -893,6 +902,131 @@ def clean_workflow_runs( click.echo(click.style("Workflow run cleanup completed.", fg="green")) +@click.command( + "seed-expired-workflow-runs", + help="Test-only: create synthetic expired workflow runs (with related records) for cleanup validation.", +) +@click.option("--days-ago", default=40, show_default=True, help="Backdate created_at to N days ago.") +@click.option("--limit", default=5, show_default=True, help="Number of workflow runs to create.") +@click.option( + "--tenant-id", + default=None, + help="Optional tenant id to use; if omitted, a random tenant id is generated.", +) +def seed_expired_workflow_runs(days_ago: int, limit: int, tenant_id: str | None) -> None: + base_time = datetime.datetime.now() - datetime.timedelta(days=days_ago) + tenant = tenant_id or str(uuid4()) + + with sessionmaker(db.engine, expire_on_commit=False).begin() as session: + for i in range(limit): + run_id = str(uuid4()) + app_id = str(uuid4()) + workflow_id = str(uuid4()) + creator_id = str(uuid4()) + created_at = base_time - datetime.timedelta(minutes=i) + + run = WorkflowRun( + id=run_id, + tenant_id=tenant, + app_id=app_id, + workflow_id=workflow_id, + type="workflow", + triggered_from="app-run", + version="v1", + graph="{}", + inputs="{}", + status="succeeded", + outputs="{}", + error=None, + elapsed_time=0.1, + total_tokens=0, + total_steps=0, + created_by_role="account", + created_by=creator_id, + created_at=created_at, + finished_at=created_at + datetime.timedelta(seconds=1), + exceptions_count=0, + ) + + node_execution = WorkflowNodeExecutionModel( + id=str(uuid4()), + tenant_id=tenant, + app_id=app_id, + workflow_id=workflow_id, + triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value, + workflow_run_id=run_id, + index=0, + predecessor_node_id=None, + node_execution_id=None, + node_id="start", + node_type="start", + title="Seeded node", + inputs="{}", + process_data="{}", + outputs="{}", + status="succeeded", + error=None, + elapsed_time=0.1, + execution_metadata="{}", + created_by_role="account", + created_by=creator_id, + created_at=created_at, + finished_at=created_at + datetime.timedelta(seconds=1), + ) + + app_log = WorkflowAppLog( + tenant_id=tenant, + app_id=app_id, + workflow_id=workflow_id, + workflow_run_id=run_id, + created_from="service-api", + created_by_role="account", + created_by=creator_id + ) + + trigger_log = WorkflowTriggerLog( + tenant_id=tenant, + app_id=app_id, + workflow_id=workflow_id, + workflow_run_id=run_id, + root_node_id=None, + trigger_metadata="{}", + trigger_type=AppTriggerType.TRIGGER_WEBHOOK, + trigger_data="{}", + inputs="{}", + outputs="{}", + status=WorkflowTriggerStatus.SUCCEEDED, + error=None, + queue_name="seed", + celery_task_id=None, + created_by_role="account", + created_by=creator_id, + retry_count=0, + elapsed_time=0.1, + total_tokens=0, + triggered_at=created_at, + finished_at=created_at + datetime.timedelta(seconds=1), + ) + + pause = WorkflowPause( + workflow_id=workflow_id, + workflow_run_id=run_id, + resumed_at=None, + state_object_key="seeded-state", + created_at=created_at, + updated_at=created_at, + ) + + session.add_all([run, node_execution, app_log, trigger_log, pause]) + + click.echo( + click.style( + f"Created {limit} synthetic workflow runs (tenant={tenant}) backdated ~{days_ago} days.", + fg="green", + ) + ) + + @click.option("-f", "--force", is_flag=True, help="Skip user confirmation and force the command to execute.") @click.command("clear-orphaned-file-records", help="Clear orphaned file records.") def clear_orphaned_file_records(force: bool): diff --git a/api/extensions/ext_commands.py b/api/extensions/ext_commands.py index 6f6322827c..611cc0c874 100644 --- a/api/extensions/ext_commands.py +++ b/api/extensions/ext_commands.py @@ -22,6 +22,7 @@ def init_app(app: DifyApp): reset_email, reset_encrypt_key_pair, reset_password, + seed_expired_workflow_runs, setup_datasource_oauth_client, setup_system_tool_oauth_client, setup_system_trigger_oauth_client, @@ -56,6 +57,7 @@ def init_app(app: DifyApp): transform_datasource_credentials, install_rag_pipeline_plugins, clean_workflow_runs, + seed_expired_workflow_runs, ] for cmd in cmds_to_register: app.cli.add_command(cmd)