diff --git a/api/.env.example b/api/.env.example
index 554b1624ec..38a096da0a 100644
--- a/api/.env.example
+++ b/api/.env.example
@@ -553,6 +553,8 @@ WORKFLOW_LOG_CLEANUP_ENABLED=false
 WORKFLOW_LOG_RETENTION_DAYS=30
 # Batch size for workflow log cleanup operations (default: 100)
 WORKFLOW_LOG_CLEANUP_BATCH_SIZE=100
+# Comma-separated list of workflow IDs to clean logs for (empty = all workflows)
+WORKFLOW_LOG_CLEANUP_SPECIFIC_WORKFLOW_IDS=
 
 # App configuration
 APP_MAX_EXECUTION_TIME=1200
diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py
index 46dad6fc05..3fe9031dff 100644
--- a/api/configs/feature/__init__.py
+++ b/api/configs/feature/__init__.py
@@ -1314,6 +1314,9 @@ class WorkflowLogConfig(BaseSettings):
     WORKFLOW_LOG_CLEANUP_BATCH_SIZE: int = Field(
         default=100, description="Batch size for workflow run log cleanup operations"
     )
+    WORKFLOW_LOG_CLEANUP_SPECIFIC_WORKFLOW_IDS: str = Field(
+        default="", description="Comma-separated list of workflow IDs to clean logs for (empty = all workflows)"
+    )
 
 
 class SwaggerUIConfig(BaseSettings):
diff --git a/api/repositories/api_workflow_run_repository.py b/api/repositories/api_workflow_run_repository.py
index 17e01a6e18..ffa87b209f 100644
--- a/api/repositories/api_workflow_run_repository.py
+++ b/api/repositories/api_workflow_run_repository.py
@@ -264,9 +264,15 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol):
         batch_size: int,
         run_types: Sequence[WorkflowType] | None = None,
         tenant_ids: Sequence[str] | None = None,
+        workflow_ids: Sequence[str] | None = None,
     ) -> Sequence[WorkflowRun]:
         """
         Fetch ended workflow runs in a time window for archival and clean batching.
+
+        Optional filters:
+        - run_types
+        - tenant_ids
+        - workflow_ids
         """
         ...
diff --git a/api/repositories/sqlalchemy_api_workflow_run_repository.py b/api/repositories/sqlalchemy_api_workflow_run_repository.py
index 00cb979e17..7935dfb225 100644
--- a/api/repositories/sqlalchemy_api_workflow_run_repository.py
+++ b/api/repositories/sqlalchemy_api_workflow_run_repository.py
@@ -386,6 +386,7 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
         batch_size: int,
         run_types: Sequence[WorkflowType] | None = None,
         tenant_ids: Sequence[str] | None = None,
+        workflow_ids: Sequence[str] | None = None,
     ) -> Sequence[WorkflowRun]:
         """
         Fetch ended workflow runs in a time window for archival and clean batching.
@@ -394,7 +395,7 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
         - created_at in [start_from, end_before)
         - type in run_types (when provided)
         - status is an ended state
-        - optional tenant_id filter and cursor (last_seen) for pagination
+        - optional tenant_id and workflow_id filters, plus a cursor (last_seen) for pagination
         """
         with self._session_maker() as session:
             stmt = (
@@ -417,6 +418,9 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
             if tenant_ids:
                 stmt = stmt.where(WorkflowRun.tenant_id.in_(tenant_ids))
 
+            if workflow_ids:
+                stmt = stmt.where(WorkflowRun.workflow_id.in_(workflow_ids))
+
             if last_seen:
                 stmt = stmt.where(
                     or_(
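Note: the new `workflow_ids` predicate simply composes with the existing `last_seen` keyset cursor over `(created_at, id)`. For reference, a minimal sketch of that cursor, assuming the statement is ordered by `(created_at, id)`; the helper name is illustrative, not part of this PR:

```python
from sqlalchemy import and_, or_

from models.workflow import WorkflowRun


def apply_last_seen_cursor(stmt, last_seen: tuple | None):
    """Resume strictly after the last row of the previous batch."""
    if last_seen is None:
        return stmt
    seen_at, seen_id = last_seen
    # Row-value comparison (created_at, id) > (seen_at, seen_id), expanded
    # with or_/and_ the same way the repository's if last_seen: branch does.
    return stmt.where(
        or_(
            WorkflowRun.created_at > seen_at,
            and_(WorkflowRun.created_at == seen_at, WorkflowRun.id > seen_id),
        )
    )
```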
diff --git a/api/schedule/clean_workflow_runlogs_precise.py b/api/schedule/clean_workflow_runlogs_precise.py
index db4198720d..ebb8d52924 100644
--- a/api/schedule/clean_workflow_runlogs_precise.py
+++ b/api/schedule/clean_workflow_runlogs_precise.py
@@ -4,7 +4,6 @@ import time
 from collections.abc import Sequence
 
 import click
-from sqlalchemy import select
 from sqlalchemy.orm import Session, sessionmaker
 
 import app
@@ -13,6 +12,7 @@ from extensions.ext_database import db
 from models.model import (
     AppAnnotationHitHistory,
     Conversation,
+    DatasetRetrieverResource,
     Message,
     MessageAgentThought,
     MessageAnnotation,
@@ -20,7 +20,10 @@ from models.model import (
     MessageFeedback,
     MessageFile,
 )
-from models.workflow import ConversationVariable, WorkflowAppLog, WorkflowNodeExecutionModel, WorkflowRun
+from models.web import SavedMessage
+from models.workflow import ConversationVariable, WorkflowRun
+from repositories.factory import DifyAPIRepositoryFactory
+from repositories.sqlalchemy_workflow_trigger_log_repository import SQLAlchemyWorkflowTriggerLogRepository
 
 logger = logging.getLogger(__name__)
 
@@ -29,8 +32,15 @@ MAX_RETRIES = 3
 BATCH_SIZE = dify_config.WORKFLOW_LOG_CLEANUP_BATCH_SIZE
 
 
-@app.celery.task(queue="dataset")
-def clean_workflow_runlogs_precise():
+def _get_specific_workflow_ids() -> list[str]:
+    workflow_ids_str = dify_config.WORKFLOW_LOG_CLEANUP_SPECIFIC_WORKFLOW_IDS.strip()
+    if not workflow_ids_str:
+        return []
+    return [wid.strip() for wid in workflow_ids_str.split(",") if wid.strip()]
+
+
+@app.celery.task(queue="retention")
+def clean_workflow_runlogs_precise() -> None:
     """Clean expired workflow run logs with retry mechanism and complete message cascade"""
     click.echo(click.style("Start clean workflow run logs (precise mode with complete cascade).", fg="green"))
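The parsing rule above is easiest to verify in isolation. A standalone sketch of the same logic (the function name and sample IDs are made up):

```python
def parse_ids(raw: str) -> list[str]:
    # Same rule as _get_specific_workflow_ids: trim, split on commas, drop blanks.
    raw = raw.strip()
    if not raw:
        return []
    return [wid.strip() for wid in raw.split(",") if wid.strip()]


assert parse_ids("") == []                              # unset -> no filter
assert parse_ids(" wf-1 , ,wf-2,") == ["wf-1", "wf-2"]  # whitespace and blanks tolerated
```

An empty result makes the task pass `workflow_ids=None` downstream, preserving the previous clean-everything behaviour.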
@@ -39,48 +49,48 @@ def clean_workflow_runlogs_precise():
     retention_days = dify_config.WORKFLOW_LOG_RETENTION_DAYS
     cutoff_date = datetime.datetime.now() - datetime.timedelta(days=retention_days)
     session_factory = sessionmaker(db.engine, expire_on_commit=False)
+    workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_factory)
+    workflow_ids = _get_specific_workflow_ids()
+    workflow_ids_filter = workflow_ids or None
 
     try:
-        with session_factory.begin() as session:
-            total_workflow_runs = session.query(WorkflowRun).where(WorkflowRun.created_at < cutoff_date).count()
-            if total_workflow_runs == 0:
-                logger.info("No expired workflow run logs found")
-                return
-            logger.info("Found %s expired workflow run logs to clean", total_workflow_runs)
-
         total_deleted = 0
         failed_batches = 0
         batch_count = 0
+        last_seen: tuple[datetime.datetime, str] | None = None
 
         while True:
+            run_rows = workflow_run_repo.get_runs_batch_by_time_range(
+                start_from=None,
+                end_before=cutoff_date,
+                last_seen=last_seen,
+                batch_size=BATCH_SIZE,
+                workflow_ids=workflow_ids_filter,
+            )
+
+            if not run_rows:
+                if batch_count == 0:
+                    logger.info("No expired workflow run logs found")
+                break
+
+            last_seen = (run_rows[-1].created_at, run_rows[-1].id)
+            batch_count += 1
             with session_factory.begin() as session:
-                workflow_run_ids = session.scalars(
-                    select(WorkflowRun.id)
-                    .where(WorkflowRun.created_at < cutoff_date)
-                    .order_by(WorkflowRun.created_at, WorkflowRun.id)
-                    .limit(BATCH_SIZE)
-                ).all()
+                success = _delete_batch(session, workflow_run_repo, run_rows, failed_batches)
 
-                if not workflow_run_ids:
+            if success:
+                total_deleted += len(run_rows)
+                failed_batches = 0
+            else:
+                failed_batches += 1
+                if failed_batches >= MAX_RETRIES:
+                    logger.error("Failed to delete batch after %s retries, aborting cleanup for today", MAX_RETRIES)
                     break
-
-                batch_count += 1
-
-                success = _delete_batch(session, workflow_run_ids, failed_batches)
-
-                if success:
-                    total_deleted += len(workflow_run_ids)
-                    failed_batches = 0
                 else:
-                    failed_batches += 1
-                    if failed_batches >= MAX_RETRIES:
-                        logger.error("Failed to delete batch after %s retries, aborting cleanup for today", MAX_RETRIES)
-                        break
-                    else:
-                        # Calculate incremental delay times: 5, 10, 15 minutes
-                        retry_delay_minutes = failed_batches * 5
-                        logger.warning("Batch deletion failed, retrying in %s minutes...", retry_delay_minutes)
-                        time.sleep(retry_delay_minutes * 60)
-                        continue
+                    # Calculate incremental delay times: 5, 10, 15 minutes
+                    retry_delay_minutes = failed_batches * 5
+                    logger.warning("Batch deletion failed, retrying in %s minutes...", retry_delay_minutes)
+                    time.sleep(retry_delay_minutes * 60)
+                    continue
 
         logger.info("Cleanup completed: %s expired workflow run logs deleted", total_deleted)
@@ -93,10 +103,16 @@ def clean_workflow_runlogs_precise():
     click.echo(click.style(f"Cleaned workflow run logs from db success latency: {execution_time:.2f}s", fg="green"))
 
 
-def _delete_batch(session: Session, workflow_run_ids: Sequence[str], attempt_count: int) -> bool:
+def _delete_batch(
+    session: Session,
+    workflow_run_repo,
+    workflow_runs: Sequence[WorkflowRun],
+    attempt_count: int,
+) -> bool:
     """Delete a single batch of workflow runs and all related data within a nested transaction."""
     try:
         with session.begin_nested():
+            workflow_run_ids = [run.id for run in workflow_runs]
             message_data = (
                 session.query(Message.id, Message.conversation_id)
                 .where(Message.workflow_run_id.in_(workflow_run_ids))
@@ -107,11 +123,13 @@ def _delete_batch(session: Session, workflow_run_ids: Sequence[str], attempt_cou
             if message_id_list:
                 message_related_models = [
                     AppAnnotationHitHistory,
+                    DatasetRetrieverResource,
                     MessageAgentThought,
                     MessageChain,
                     MessageFile,
                     MessageAnnotation,
                     MessageFeedback,
+                    SavedMessage,
                 ]
                 for model in message_related_models:
                     session.query(model).where(model.message_id.in_(message_id_list)).delete(synchronize_session=False)  # type: ignore
@@ -122,14 +140,6 @@ def _delete_batch(session: Session, workflow_run_ids: Sequence[str], attempt_cou
                 synchronize_session=False
             )
 
-            session.query(WorkflowAppLog).where(WorkflowAppLog.workflow_run_id.in_(workflow_run_ids)).delete(
-                synchronize_session=False
-            )
-
-            session.query(WorkflowNodeExecutionModel).where(
-                WorkflowNodeExecutionModel.workflow_run_id.in_(workflow_run_ids)
-            ).delete(synchronize_session=False)
-
             if conversation_id_list:
                 session.query(ConversationVariable).where(
                     ConversationVariable.conversation_id.in_(conversation_id_list)
@@ -139,7 +149,22 @@ def _delete_batch(session: Session, workflow_run_ids: Sequence[str], attempt_cou
                 synchronize_session=False
             )
 
-            session.query(WorkflowRun).where(WorkflowRun.id.in_(workflow_run_ids)).delete(synchronize_session=False)
+            def _delete_node_executions(active_session: Session, runs: Sequence[WorkflowRun]) -> tuple[int, int]:
+                run_ids = [run.id for run in runs]
+                repo = DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository(
+                    session_maker=sessionmaker(bind=active_session.get_bind(), expire_on_commit=False)
+                )
+                return repo.delete_by_runs(active_session, run_ids)
+
+            def _delete_trigger_logs(active_session: Session, run_ids: Sequence[str]) -> int:
+                trigger_repo = SQLAlchemyWorkflowTriggerLogRepository(active_session)
+                return trigger_repo.delete_by_run_ids(run_ids)
+
+            workflow_run_repo.delete_runs_with_related(
+                workflow_runs,
+                delete_node_executions=_delete_node_executions,
+                delete_trigger_logs=_delete_trigger_logs,
+            )
 
         return True
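With the inline `WorkflowAppLog` and `WorkflowNodeExecutionModel` deletes removed, those rows are now expected to be cleaned inside `delete_runs_with_related`, driven by the two callbacks above. A rough sketch of that contract, assuming the repository hands the active session to each callback (the ordering, the trailing steps, and the function name here are assumptions, not the repository's actual code):

```python
from collections.abc import Callable, Sequence
from typing import Any


def delete_runs_with_related_sketch(
    session: Any,  # the Session owned by _delete_batch's begin_nested()
    runs: Sequence[Any],
    delete_node_executions: Callable[[Any, Sequence[Any]], tuple[int, int]],
    delete_trigger_logs: Callable[[Any, Sequence[str]], int],
) -> None:
    run_ids = [run.id for run in runs]
    delete_node_executions(session, runs)   # node executions for these runs
    delete_trigger_logs(session, run_ids)   # trigger logs keyed by run id
    # ...then, presumably, the app logs and the WorkflowRun rows themselves.
```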
diff --git a/api/tests/unit_tests/services/test_clear_free_plan_expired_workflow_run_logs.py b/api/tests/unit_tests/services/test_clear_free_plan_expired_workflow_run_logs.py
index 8c80e2b4ad..50826d6798 100644
--- a/api/tests/unit_tests/services/test_clear_free_plan_expired_workflow_run_logs.py
+++ b/api/tests/unit_tests/services/test_clear_free_plan_expired_workflow_run_logs.py
@@ -62,6 +62,9 @@ class FakeRepo:
         end_before: datetime.datetime,
         last_seen: tuple[datetime.datetime, str] | None,
         batch_size: int,
+        run_types=None,
+        tenant_ids=None,
+        workflow_ids=None,
     ) -> list[FakeRun]:
         if self.call_idx >= len(self.batches):
             return []
diff --git a/docker/.env.example b/docker/.env.example
index 4a141e37d4..3d0009711d 100644
--- a/docker/.env.example
+++ b/docker/.env.example
@@ -1073,6 +1073,8 @@ WORKFLOW_LOG_CLEANUP_ENABLED=false
 WORKFLOW_LOG_RETENTION_DAYS=30
 # Batch size for workflow log cleanup operations (default: 100)
 WORKFLOW_LOG_CLEANUP_BATCH_SIZE=100
+# Comma-separated list of workflow IDs to clean logs for (empty = all workflows)
+WORKFLOW_LOG_CLEANUP_SPECIFIC_WORKFLOW_IDS=
 
 # Aliyun SLS Logstore Configuration
 # Aliyun Access Key ID
diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml
index 6340dd290e..003ecf8497 100644
--- a/docker/docker-compose.yaml
+++ b/docker/docker-compose.yaml
@@ -470,6 +470,7 @@ x-shared-env: &shared-api-worker-env
   WORKFLOW_LOG_CLEANUP_ENABLED: ${WORKFLOW_LOG_CLEANUP_ENABLED:-false}
   WORKFLOW_LOG_RETENTION_DAYS: ${WORKFLOW_LOG_RETENTION_DAYS:-30}
   WORKFLOW_LOG_CLEANUP_BATCH_SIZE: ${WORKFLOW_LOG_CLEANUP_BATCH_SIZE:-100}
+  WORKFLOW_LOG_CLEANUP_SPECIFIC_WORKFLOW_IDS: ${WORKFLOW_LOG_CLEANUP_SPECIFIC_WORKFLOW_IDS:-}
   ALIYUN_SLS_ACCESS_KEY_ID: ${ALIYUN_SLS_ACCESS_KEY_ID:-}
   ALIYUN_SLS_ACCESS_KEY_SECRET: ${ALIYUN_SLS_ACCESS_KEY_SECRET:-}
   ALIYUN_SLS_ENDPOINT: ${ALIYUN_SLS_ENDPOINT:-}
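For completeness, the new parsing helper could also get direct unit coverage alongside the `FakeRepo` change above. A minimal sketch (the test name and monkeypatch approach are illustrative, not part of this PR):

```python
from configs import dify_config
from schedule.clean_workflow_runlogs_precise import _get_specific_workflow_ids


def test_get_specific_workflow_ids(monkeypatch):
    # Messy input: surrounding whitespace and an empty segment are dropped.
    monkeypatch.setattr(dify_config, "WORKFLOW_LOG_CLEANUP_SPECIFIC_WORKFLOW_IDS", " wf-1, ,wf-2 ")
    assert _get_specific_workflow_ids() == ["wf-1", "wf-2"]

    # Unset value yields an empty list, i.e. no workflow filter.
    monkeypatch.setattr(dify_config, "WORKFLOW_LOG_CLEANUP_SPECIFIC_WORKFLOW_IDS", "")
    assert _get_specific_workflow_ids() == []
```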