diff --git a/api/.env.example b/api/.env.example
index fcadfa1c3b..554b1624ec 100644
--- a/api/.env.example
+++ b/api/.env.example
@@ -715,6 +715,7 @@ ANNOTATION_IMPORT_MAX_CONCURRENT=5
 # Sandbox expired records clean configuration
 SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD=21
 SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE=1000
+SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL=200
 SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS=30
 SANDBOX_EXPIRED_RECORDS_CLEAN_TASK_LOCK_TTL=90000
 
diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py
index ded5cf03ab..46dad6fc05 100644
--- a/api/configs/feature/__init__.py
+++ b/api/configs/feature/__init__.py
@@ -1344,6 +1344,10 @@ class SandboxExpiredRecordsCleanConfig(BaseSettings):
         description="Maximum number of records to process in each batch",
         default=1000,
     )
+    SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL: PositiveInt = Field(
+        description="Maximum interval in milliseconds between batches",
+        default=200,
+    )
     SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS: PositiveInt = Field(
         description="Retention days for sandbox expired workflow_run records and message records",
         default=30,
diff --git a/api/migrations/versions/2026_02_11_1549-fce013ca180e_fix_index_to_optimize_message_clean_job_.py b/api/migrations/versions/2026_02_11_1549-fce013ca180e_fix_index_to_optimize_message_clean_job_.py
new file mode 100644
index 0000000000..ed482fbd6d
--- /dev/null
+++ b/api/migrations/versions/2026_02_11_1549-fce013ca180e_fix_index_to_optimize_message_clean_job_.py
@@ -0,0 +1,39 @@
+"""fix index to optimize message clean job performance
+
+Revision ID: fce013ca180e
+Revises: f55813ffe2c8
+Create Date: 2026-02-11 15:49:17.603638
+
+"""
+from alembic import op
+import models as models
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision = 'fce013ca180e'
+down_revision = 'f55813ffe2c8'
+branch_labels = None
+depends_on = None
+
+
+def upgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    with op.batch_alter_table('messages', schema=None) as batch_op:
+        batch_op.drop_index(batch_op.f('message_created_at_idx'))
+
+    with op.batch_alter_table('saved_messages', schema=None) as batch_op:
+        batch_op.create_index('saved_message_message_id_idx', ['message_id'], unique=False)
+
+    # ### end Alembic commands ###
+
+
+def downgrade():
+    # ### commands auto generated by Alembic - please adjust! ###
+    with op.batch_alter_table('saved_messages', schema=None) as batch_op:
+        batch_op.drop_index('saved_message_message_id_idx')
+
+    with op.batch_alter_table('messages', schema=None) as batch_op:
+        batch_op.create_index(batch_op.f('message_created_at_idx'), ['created_at'], unique=False)
+
+    # ### end Alembic commands ###
diff --git a/api/models/model.py b/api/models/model.py
index be4e5b819a..e2a9bb70cf 100644
--- a/api/models/model.py
+++ b/api/models/model.py
@@ -1040,7 +1040,6 @@ class Message(Base):
         Index("message_end_user_idx", "app_id", "from_source", "from_end_user_id"),
         Index("message_account_idx", "app_id", "from_source", "from_account_id"),
         Index("message_workflow_run_id_idx", "conversation_id", "workflow_run_id"),
-        Index("message_created_at_idx", "created_at"),
         Index("message_app_mode_idx", "app_mode"),
         Index("message_created_at_id_idx", "created_at", "id"),
     )
diff --git a/api/models/web.py b/api/models/web.py
index b2832aa163..5f6a7b40bf 100644
--- a/api/models/web.py
+++ b/api/models/web.py
@@ -16,6 +16,7 @@ class SavedMessage(TypeBase):
     __table_args__ = (
         sa.PrimaryKeyConstraint("id", name="saved_message_pkey"),
         sa.Index("saved_message_message_idx", "app_id", "message_id", "created_by_role", "created_by"),
+        sa.Index("saved_message_message_id_idx", "message_id"),
     )
 
     id: Mapped[str] = mapped_column(
diff --git a/api/services/retention/conversation/messages_clean_service.py b/api/services/retention/conversation/messages_clean_service.py
index 3ca5d82860..f7836a2b14 100644
--- a/api/services/retention/conversation/messages_clean_service.py
+++ b/api/services/retention/conversation/messages_clean_service.py
@@ -1,10 +1,13 @@
 import datetime
 import logging
+import os
 import random
+import time
 from collections.abc import Sequence
 from typing import cast
 
-from sqlalchemy import delete, select
+import sqlalchemy as sa
+from sqlalchemy import delete, select, tuple_
 from sqlalchemy.engine import CursorResult
 from sqlalchemy.orm import Session
 
@@ -193,11 +196,15 @@ class MessagesCleanService:
             self._end_before,
         )
 
+        max_batch_interval_ms = int(os.environ.get("SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL", 200))
+
         while True:
            stats["batches"] += 1
+            batch_start = time.monotonic()
 
             # Step 1: Fetch a batch of messages using cursor
             with Session(db.engine, expire_on_commit=False) as session:
+                fetch_messages_start = time.monotonic()
                 msg_stmt = (
                     select(Message.id, Message.app_id, Message.created_at)
                     .where(Message.created_at < self._end_before)
@@ -209,13 +216,13 @@ class MessagesCleanService:
                     msg_stmt = msg_stmt.where(Message.created_at >= self._start_from)
 
                 # Apply cursor condition: (created_at, id) > (last_created_at, last_message_id)
-                # This translates to:
-                #     created_at > last_created_at OR (created_at = last_created_at AND id > last_message_id)
                 if _cursor:
-                    # Continuing from previous batch
                     msg_stmt = msg_stmt.where(
-                        (Message.created_at > _cursor[0])
-                        | ((Message.created_at == _cursor[0]) & (Message.id > _cursor[1]))
+                        tuple_(Message.created_at, Message.id)
+                        > tuple_(
+                            sa.literal(_cursor[0], type_=sa.DateTime()),
+                            sa.literal(_cursor[1], type_=Message.id.type),
+                        )
                     )
 
                 raw_messages = list(session.execute(msg_stmt).all())
@@ -223,6 +230,12 @@ class MessagesCleanService:
                     SimpleMessage(id=msg_id, app_id=app_id, created_at=msg_created_at)
                     for msg_id, app_id, msg_created_at in raw_messages
                 ]
+                logger.info(
+                    "clean_messages (batch %s): fetched %s messages in %sms",
+                    stats["batches"],
+                    len(messages),
+                    int((time.monotonic() - fetch_messages_start) * 1000),
+                )
 
                 # Track total messages fetched across all batches
                 stats["total_messages"] += len(messages)
@@ -241,8 +254,16 @@ class MessagesCleanService:
                     logger.info("clean_messages (batch %s): no app_ids found, skip", stats["batches"])
                     continue
 
+                fetch_apps_start = time.monotonic()
                 app_stmt = select(App.id, App.tenant_id).where(App.id.in_(app_ids))
                 apps = list(session.execute(app_stmt).all())
+                logger.info(
+                    "clean_messages (batch %s): fetched %s apps for %s app_ids in %sms",
+                    stats["batches"],
+                    len(apps),
+                    len(app_ids),
+                    int((time.monotonic() - fetch_apps_start) * 1000),
+                )
 
                 if not apps:
                     logger.info("clean_messages (batch %s): no apps found, skip", stats["batches"])
@@ -252,7 +273,15 @@ class MessagesCleanService:
                 app_to_tenant: dict[str, str] = {app.id: app.tenant_id for app in apps}
 
                 # Step 3: Delegate to policy to determine which messages to delete
+                policy_start = time.monotonic()
                 message_ids_to_delete = self._policy.filter_message_ids(messages, app_to_tenant)
+                logger.info(
+                    "clean_messages (batch %s): policy selected %s/%s messages in %sms",
+                    stats["batches"],
+                    len(message_ids_to_delete),
+                    len(messages),
+                    int((time.monotonic() - policy_start) * 1000),
+                )
 
                 if not message_ids_to_delete:
                     logger.info("clean_messages (batch %s): no messages to delete, skip", stats["batches"])
@@ -263,14 +292,20 @@ class MessagesCleanService:
             # Step 4: Batch delete messages and their relations
             if not self._dry_run:
                 with Session(db.engine, expire_on_commit=False) as session:
+                    delete_relations_start = time.monotonic()
                     # Delete related records first
                     self._batch_delete_message_relations(session, message_ids_to_delete)
+                    delete_relations_ms = int((time.monotonic() - delete_relations_start) * 1000)
 
                     # Delete messages
+                    delete_messages_start = time.monotonic()
                     delete_stmt = delete(Message).where(Message.id.in_(message_ids_to_delete))
                     delete_result = cast(CursorResult, session.execute(delete_stmt))
                     messages_deleted = delete_result.rowcount
+                    delete_messages_ms = int((time.monotonic() - delete_messages_start) * 1000)
 
+                    commit_start = time.monotonic()
                     session.commit()
+                    commit_ms = int((time.monotonic() - commit_start) * 1000)
 
                     stats["total_deleted"] += messages_deleted
@@ -280,6 +315,19 @@ class MessagesCleanService:
                     len(messages),
                     messages_deleted,
                 )
+                logger.info(
+                    "clean_messages (batch %s): relations %sms, messages %sms, commit %sms, batch total %sms",
+                    stats["batches"],
+                    delete_relations_ms,
+                    delete_messages_ms,
+                    commit_ms,
+                    int((time.monotonic() - batch_start) * 1000),
+                )
+
+                # Random sleep between batches to avoid overwhelming the database
+                sleep_ms = random.uniform(0, max_batch_interval_ms)  # noqa: S311
+                logger.info("clean_messages (batch %s): sleeping for %.2fms", stats["batches"], sleep_ms)
+                time.sleep(sleep_ms / 1000)
             else:
                 # Log random sample of message IDs that would be deleted (up to 10)
                 sample_size = min(10, len(message_ids_to_delete))
diff --git a/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py b/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py
index c3e0dce399..2c94cb5324 100644
--- a/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py
+++ b/api/services/retention/workflow_run/clear_free_plan_expired_workflow_run_logs.py
@@ -1,5 +1,8 @@
 import datetime
 import logging
+import os
+import random
+import time
 from collections.abc import Iterable, Sequence
 
 import click
@@ -72,7 +75,12 @@ class WorkflowRunCleanup:
         batch_index = 0
         last_seen: tuple[datetime.datetime, str] | None = None
 
+        max_batch_interval_ms = int(os.environ.get("SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL", 200))
+
         while True:
+            batch_start = time.monotonic()
+
+            fetch_start = time.monotonic()
             run_rows = self.workflow_run_repo.get_runs_batch_by_time_range(
                 start_from=self.window_start,
                 end_before=self.window_end,
@@ -80,12 +88,30 @@ class WorkflowRunCleanup:
                 batch_size=self.batch_size,
             )
             if not run_rows:
+                logger.info("workflow_run_cleanup (batch #%s): no more rows to process", batch_index + 1)
                 break
 
             batch_index += 1
             last_seen = (run_rows[-1].created_at, run_rows[-1].id)
+            logger.info(
+                "workflow_run_cleanup (batch #%s): fetched %s rows in %sms",
+                batch_index,
+                len(run_rows),
+                int((time.monotonic() - fetch_start) * 1000),
+            )
+
             tenant_ids = {row.tenant_id for row in run_rows}
+
+            filter_start = time.monotonic()
             free_tenants = self._filter_free_tenants(tenant_ids)
+            logger.info(
+                "workflow_run_cleanup (batch #%s): filtered %s free tenants from %s tenants in %sms",
+                batch_index,
+                len(free_tenants),
+                len(tenant_ids),
+                int((time.monotonic() - filter_start) * 1000),
+            )
+
             free_runs = [row for row in run_rows if row.tenant_id in free_tenants]
             paid_or_skipped = len(run_rows) - len(free_runs)
 
@@ -104,11 +130,17 @@ class WorkflowRunCleanup:
             total_runs_targeted += len(free_runs)
 
             if self.dry_run:
+                count_start = time.monotonic()
                 batch_counts = self.workflow_run_repo.count_runs_with_related(
                     free_runs,
                     count_node_executions=self._count_node_executions,
                     count_trigger_logs=self._count_trigger_logs,
                 )
+                logger.info(
+                    "workflow_run_cleanup (batch #%s, dry_run): counted related records in %sms",
+                    batch_index,
+                    int((time.monotonic() - count_start) * 1000),
+                )
                 if related_totals is not None:
                     for key in related_totals:
                         related_totals[key] += batch_counts.get(key, 0)
@@ -120,14 +152,21 @@ class WorkflowRunCleanup:
                         fg="yellow",
                     )
                 )
+                logger.info(
+                    "workflow_run_cleanup (batch #%s, dry_run): batch total %sms",
+                    batch_index,
+                    int((time.monotonic() - batch_start) * 1000),
+                )
                 continue
 
             try:
+                delete_start = time.monotonic()
                 counts = self.workflow_run_repo.delete_runs_with_related(
                     free_runs,
                     delete_node_executions=self._delete_node_executions,
                     delete_trigger_logs=self._delete_trigger_logs,
                 )
+                delete_ms = int((time.monotonic() - delete_start) * 1000)
             except Exception:
                 logger.exception("Failed to delete workflow runs batch ending at %s", last_seen[0])
                 raise
@@ -143,6 +182,17 @@ class WorkflowRunCleanup:
                     fg="green",
                 )
             )
+            logger.info(
+                "workflow_run_cleanup (batch #%s): delete %sms, batch total %sms",
+                batch_index,
+                delete_ms,
+                int((time.monotonic() - batch_start) * 1000),
+            )
+
+            # Random sleep between batches to avoid overwhelming the database
+            sleep_ms = random.uniform(0, max_batch_interval_ms)  # noqa: S311
+            logger.info("workflow_run_cleanup (batch #%s): sleeping for %.2fms", batch_index, sleep_ms)
+            time.sleep(sleep_ms / 1000)
 
         if self.dry_run:
             if self.window_start:
diff --git a/docker/.env.example b/docker/.env.example
index c8db23b9ed..4a141e37d4 100644
--- a/docker/.env.example
+++ b/docker/.env.example
@@ -1523,6 +1523,7 @@ AMPLITUDE_API_KEY=
 # Sandbox expired records clean configuration
 SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD=21
 SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE=1000
+SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL=200
 SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS=30
 
 
diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml
index afd64963c4..1f38512661 100644
--- a/docker/docker-compose.yaml
+++ b/docker/docker-compose.yaml
@@ -684,6 +684,7 @@ x-shared-env: &shared-api-worker-env
   AMPLITUDE_API_KEY: ${AMPLITUDE_API_KEY:-}
   SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD: ${SANDBOX_EXPIRED_RECORDS_CLEAN_GRACEFUL_PERIOD:-21}
   SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE: ${SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_SIZE:-1000}
+  SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL: ${SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL:-200}
   SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS: ${SANDBOX_EXPIRED_RECORDS_RETENTION_DAYS:-30}
   PUBSUB_REDIS_URL: ${PUBSUB_REDIS_URL:-}
   PUBSUB_REDIS_CHANNEL_TYPE: ${PUBSUB_REDIS_CHANNEL_TYPE:-pubsub}
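
Two patterns in this patch are worth pulling out for reviewers. First, the cursor rewrite in messages_clean_service.py: below is a minimal standalone sketch (not part of the patch) of the row-value keyset pagination that `tuple_(...) > tuple_(...)` produces. The table definition and cursor values are illustrative stand-ins, not Dify's real schema.

```python
# Standalone sketch: how the tuple_ cursor compiles. Assumes a toy "messages"
# table with only the two columns that matter for the cursor.
import datetime

import sqlalchemy as sa
from sqlalchemy import tuple_
from sqlalchemy.dialects import postgresql

metadata = sa.MetaData()
messages = sa.Table(
    "messages",
    metadata,
    sa.Column("id", sa.String, primary_key=True),
    sa.Column("created_at", sa.DateTime),
)

# (created_at, id) of the last row seen in the previous batch (illustrative values)
cursor = (datetime.datetime(2026, 1, 1), "00000000-0000-0000-0000-000000000000")

stmt = (
    sa.select(messages.c.id, messages.c.created_at)
    .where(
        # Row-value comparison: rows strictly after the cursor in (created_at, id) order.
        # Typed literals keep the driver from guessing bind-parameter types.
        tuple_(messages.c.created_at, messages.c.id)
        > tuple_(
            sa.literal(cursor[0], type_=sa.DateTime()),
            sa.literal(cursor[1], type_=messages.c.id.type),
        )
    )
    .order_by(messages.c.created_at, messages.c.id)
    .limit(1000)
)

# Renders: WHERE (messages.created_at, messages.id) > (%(param_1)s, %(param_2)s)
print(stmt.compile(dialect=postgresql.dialect()))
```

PostgreSQL can satisfy the single row-value predicate with one range scan over the composite (created_at, id) index (message_created_at_id_idx in the model), unlike the OR-form predicate it replaces, which is presumably also why the migration drops the now-redundant single-column message_created_at_idx.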
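Second, the inter-batch throttle both cleanup jobs add. A sketch of the shared pattern under the same env var and default; process_one_batch is a hypothetical stand-in for the per-batch delete work, not a function in the patch:

```python
# Standalone sketch (not part of the patch) of the shared throttle pattern.
import os
import random
import time

# Same env var and 200ms default the patch wires through config and compose.
max_batch_interval_ms = int(os.environ.get("SANDBOX_EXPIRED_RECORDS_CLEAN_BATCH_MAX_INTERVAL", 200))


def run(batches, process_one_batch):
    """Run batches with uniform jitter in [0, max_batch_interval_ms) ms between them."""
    for batch in batches:
        process_one_batch(batch)
        # Jitter spaces out delete bursts and keeps concurrent workers from
        # hitting the database in lock-step.
        sleep_ms = random.uniform(0, max_batch_interval_ms)
        time.sleep(sleep_ms / 1000)
```

Using random.uniform(0, max) rather than a fixed sleep is the choice both services make; the # noqa: S311 in the diff marks it as a non-cryptographic use of the random module.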