diff --git a/api/.env.example b/api/.env.example index 15981c14b8..c3b1474549 100644 --- a/api/.env.example +++ b/api/.env.example @@ -715,4 +715,5 @@ ANNOTATION_IMPORT_MAX_CONCURRENT=5 SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD=21 SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE=1000 SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS=30 +SANDBOX_EXPIRED_RECORDS_CLEAN_TASK_LOCK_TTL=90000 diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py index cf71a33fa8..03aff7e6b5 100644 --- a/api/configs/feature/__init__.py +++ b/api/configs/feature/__init__.py @@ -1298,6 +1298,10 @@ class SandboxExpiredRecordsCleanConfig(BaseSettings): description="Retention days for sandbox expired workflow_run records and message records", default=30, ) + SANDBOX_EXPIRED_RECORDS_CLEAN_TASK_LOCK_TTL: PositiveInt = Field( + description="Lock TTL for sandbox expired records clean task in seconds", + default=90000, + ) class FeatureConfig( diff --git a/api/schedule/clean_messages.py b/api/schedule/clean_messages.py index e85bba8823..be5f483b95 100644 --- a/api/schedule/clean_messages.py +++ b/api/schedule/clean_messages.py @@ -2,9 +2,11 @@ import logging import time import click +from redis.exceptions import LockError import app from configs import dify_config +from extensions.ext_redis import redis_client from services.retention.conversation.messages_clean_policy import create_message_clean_policy from services.retention.conversation.messages_clean_service import MessagesCleanService @@ -31,12 +33,16 @@ def clean_messages(): ) # Create and run the cleanup service - service = MessagesCleanService.from_days( - policy=policy, - days=dify_config.SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS, - batch_size=dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE, - ) - stats = service.run() + # lock the task to avoid concurrent execution in case of the future data volume growth + with redis_client.lock( + "retention:clean_messages", timeout=dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_TASK_LOCK_TTL, blocking=False + ): + service = MessagesCleanService.from_days( + policy=policy, + days=dify_config.SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS, + batch_size=dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE, + ) + stats = service.run() end_at = time.perf_counter() click.echo( @@ -50,6 +56,16 @@ def clean_messages(): fg="green", ) ) + except LockError: + end_at = time.perf_counter() + logger.exception("clean_messages: acquire task lock failed, skip current execution") + click.echo( + click.style( + f"clean_messages: skipped (lock already held) - latency: {end_at - start_at:.2f}s", + fg="yellow", + ) + ) + raise except Exception as e: end_at = time.perf_counter() logger.exception("clean_messages failed") diff --git a/api/schedule/clean_workflow_runs_task.py b/api/schedule/clean_workflow_runs_task.py index 9f5bf8e150..ff45a3ddf2 100644 --- a/api/schedule/clean_workflow_runs_task.py +++ b/api/schedule/clean_workflow_runs_task.py @@ -1,11 +1,16 @@ +import logging from datetime import UTC, datetime import click +from redis.exceptions import LockError import app from configs import dify_config +from extensions.ext_redis import redis_client from services.retention.workflow_run.clear_free_plan_expired_workflow_run_logs import WorkflowRunCleanup +logger = logging.getLogger(__name__) + @app.celery.task(queue="retention") def clean_workflow_runs_task() -> None: @@ -25,19 +30,50 @@ def clean_workflow_runs_task() -> None: start_time = datetime.now(UTC) - WorkflowRunCleanup( - days=dify_config.SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS, - batch_size=dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE, - start_from=None, - end_before=None, - ).run() + try: + # lock the task to avoid concurrent execution in case of the future data volume growth + with redis_client.lock( + "retention:clean_workflow_runs_task", + timeout=dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_TASK_LOCK_TTL, + blocking=False, + ): + WorkflowRunCleanup( + days=dify_config.SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS, + batch_size=dify_config.SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE, + start_from=None, + end_before=None, + ).run() - end_time = datetime.now(UTC) - elapsed = end_time - start_time - click.echo( - click.style( - f"Scheduled workflow run cleanup finished. start={start_time.isoformat()} " - f"end={end_time.isoformat()} duration={elapsed}", - fg="green", + end_time = datetime.now(UTC) + elapsed = end_time - start_time + click.echo( + click.style( + f"Scheduled workflow run cleanup finished. start={start_time.isoformat()} " + f"end={end_time.isoformat()} duration={elapsed}", + fg="green", + ) ) - ) + except LockError: + end_time = datetime.now(UTC) + elapsed = end_time - start_time + logger.exception("clean_workflow_runs_task: acquire task lock failed, skip current execution") + click.echo( + click.style( + f"Scheduled workflow run cleanup skipped (lock already held). " + f"start={start_time.isoformat()} end={end_time.isoformat()} duration={elapsed}", + fg="yellow", + ) + ) + raise + except Exception as e: + end_time = datetime.now(UTC) + elapsed = end_time - start_time + logger.exception("clean_workflow_runs_task failed") + click.echo( + click.style( + f"Scheduled workflow run cleanup failed. start={start_time.isoformat()} " + f"end={end_time.isoformat()} duration={elapsed} - {str(e)}", + fg="red", + ) + ) + raise diff --git a/docker/.env.example b/docker/.env.example index 627a3a23da..c7246ae11f 100644 --- a/docker/.env.example +++ b/docker/.env.example @@ -1518,3 +1518,4 @@ AMPLITUDE_API_KEY= SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD=21 SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE=1000 SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS=30 +SANDBOX_EXPIRED_RECORDS_CLEAN_TASK_LOCK_TTL=90000 diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 429667e75f..902ca3103c 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -682,6 +682,7 @@ x-shared-env: &shared-api-worker-env SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD: ${SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD:-21} SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE: ${SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE:-1000} SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS: ${SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS:-30} + SANDBOX_EXPIRED_RECORDS_CLEAN_TASK_LOCK_TTL: ${SANDBOX_EXPIRED_RECORDS_CLEAN_TASK_LOCK_TTL:-90000} services: # Init container to fix permissions