diff --git a/api/.env.example b/api/.env.example index 8bd2c706c1..dec002cadc 100644 --- a/api/.env.example +++ b/api/.env.example @@ -715,5 +715,6 @@ ANNOTATION_IMPORT_MAX_CONCURRENT=5 # Sandbox expired records clean configuration SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD=21 SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE=1000 +SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL=200 SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS=30 SANDBOX_EXPIRED_RECORDS_CLEAN_TASK_LOCK_TTL=90000 diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py index d97e9a0440..d064cfa2d9 100644 --- a/api/configs/feature/__init__.py +++ b/api/configs/feature/__init__.py @@ -1309,6 +1309,10 @@ class SandboxExpiredRecordsCleanConfig(BaseSettings): description="Maximum number of records to process in each batch", default=1000, ) + SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL: PositiveInt = Field( + description="Maximum interval in milliseconds between batches", + default=200, + ) SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS: PositiveInt = Field( description="Retention days for sandbox expired workflow_run records and message records", default=30, diff --git a/api/services/retention/conversation/messages_clean_service.py b/api/services/retention/conversation/messages_clean_service.py index d8b91234c9..93a3cff93d 100644 --- a/api/services/retention/conversation/messages_clean_service.py +++ b/api/services/retention/conversation/messages_clean_service.py @@ -1,5 +1,6 @@ import datetime import logging +import os import random import time from collections.abc import Sequence @@ -194,6 +195,8 @@ class MessagesCleanService: self._end_before, ) + max_batch_interval_ms = int(os.environ.get("SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL", 200)) + while True: stats["batches"] += 1 batch_start = time.monotonic() @@ -319,6 +322,11 @@ class MessagesCleanService: commit_ms, int((time.monotonic() - batch_start) * 1000), ) + + # Random sleep between batches to avoid overwhelming the database + sleep_ms = random.uniform(0, max_batch_interval_ms) # noqa: S311 + logger.info("clean_messages (batch %s): sleeping for %.2fms", stats["batches"], sleep_ms) + time.sleep(sleep_ms / 1000) else: # Log random sample of message IDs that would be deleted (up to 10) sample_size = min(10, len(message_ids_to_delete)) diff --git a/docker/.env.example b/docker/.env.example index 41a0205bf5..3015c91aed 100644 --- a/docker/.env.example +++ b/docker/.env.example @@ -1518,5 +1518,6 @@ AMPLITUDE_API_KEY= # Sandbox expired records clean configuration SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD=21 SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE=1000 +SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL=200 SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS=30 SANDBOX_EXPIRED_RECORDS_CLEAN_TASK_LOCK_TTL=90000 diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 1886f848e0..9c62c09573 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -682,6 +682,7 @@ x-shared-env: &shared-api-worker-env AMPLITUDE_API_KEY: ${AMPLITUDE_API_KEY:-} SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD: ${SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD:-21} SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE: ${SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE:-1000} + SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL: ${SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL:-200} SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS: ${SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS:-30} SANDBOX_EXPIRED_RECORDS_CLEAN_TASK_LOCK_TTL: ${SANDBOX_EXPIRED_RECORDS_CLEAN_TASK_LOCK_TTL:-90000}