diff --git a/api/models/web.py b/api/models/web.py index b2832aa163..5f6a7b40bf 100644 --- a/api/models/web.py +++ b/api/models/web.py @@ -16,6 +16,7 @@ class SavedMessage(TypeBase): __table_args__ = ( sa.PrimaryKeyConstraint("id", name="saved_message_pkey"), sa.Index("saved_message_message_idx", "app_id", "message_id", "created_by_role", "created_by"), + sa.Index("saved_message_message_id_idx", "message_id"), ) id: Mapped[str] = mapped_column( diff --git a/api/services/retention/conversation/messages_clean_service.py b/api/services/retention/conversation/messages_clean_service.py index 3ca5d82860..d8b91234c9 100644 --- a/api/services/retention/conversation/messages_clean_service.py +++ b/api/services/retention/conversation/messages_clean_service.py @@ -1,6 +1,7 @@ import datetime import logging import random +import time from collections.abc import Sequence from typing import cast @@ -195,9 +196,11 @@ class MessagesCleanService: while True: stats["batches"] += 1 + batch_start = time.monotonic() # Step 1: Fetch a batch of messages using cursor with Session(db.engine, expire_on_commit=False) as session: + fetch_messages_start = time.monotonic() msg_stmt = ( select(Message.id, Message.app_id, Message.created_at) .where(Message.created_at < self._end_before) @@ -223,6 +226,12 @@ class MessagesCleanService: SimpleMessage(id=msg_id, app_id=app_id, created_at=msg_created_at) for msg_id, app_id, msg_created_at in raw_messages ] + logger.info( + "clean_messages (batch %s): fetched %s messages in %sms", + stats["batches"], + len(messages), + int((time.monotonic() - fetch_messages_start) * 1000), + ) # Track total messages fetched across all batches stats["total_messages"] += len(messages) @@ -241,8 +250,16 @@ class MessagesCleanService: logger.info("clean_messages (batch %s): no app_ids found, skip", stats["batches"]) continue + fetch_apps_start = time.monotonic() app_stmt = select(App.id, App.tenant_id).where(App.id.in_(app_ids)) apps = list(session.execute(app_stmt).all()) + logger.info( + "clean_messages (batch %s): fetched %s apps for %s app_ids in %sms", + stats["batches"], + len(apps), + len(app_ids), + int((time.monotonic() - fetch_apps_start) * 1000), + ) if not apps: logger.info("clean_messages (batch %s): no apps found, skip", stats["batches"]) @@ -252,7 +269,15 @@ class MessagesCleanService: app_to_tenant: dict[str, str] = {app.id: app.tenant_id for app in apps} # Step 3: Delegate to policy to determine which messages to delete + policy_start = time.monotonic() message_ids_to_delete = self._policy.filter_message_ids(messages, app_to_tenant) + logger.info( + "clean_messages (batch %s): policy selected %s/%s messages in %sms", + stats["batches"], + len(message_ids_to_delete), + len(messages), + int((time.monotonic() - policy_start) * 1000), + ) if not message_ids_to_delete: logger.info("clean_messages (batch %s): no messages to delete, skip", stats["batches"]) @@ -263,14 +288,20 @@ class MessagesCleanService: # Step 4: Batch delete messages and their relations if not self._dry_run: with Session(db.engine, expire_on_commit=False) as session: + delete_relations_start = time.monotonic() # Delete related records first self._batch_delete_message_relations(session, message_ids_to_delete) + delete_relations_ms = int((time.monotonic() - delete_relations_start) * 1000) # Delete messages + delete_messages_start = time.monotonic() delete_stmt = delete(Message).where(Message.id.in_(message_ids_to_delete)) delete_result = cast(CursorResult, session.execute(delete_stmt)) messages_deleted = delete_result.rowcount + delete_messages_ms = int((time.monotonic() - delete_messages_start) * 1000) + commit_start = time.monotonic() session.commit() + commit_ms = int((time.monotonic() - commit_start) * 1000) stats["total_deleted"] += messages_deleted @@ -280,6 +311,14 @@ class MessagesCleanService: len(messages), messages_deleted, ) + logger.info( + "clean_messages (batch %s): relations %sms, messages %sms, commit %sms, batch total %sms", + stats["batches"], + delete_relations_ms, + delete_messages_ms, + commit_ms, + int((time.monotonic() - batch_start) * 1000), + ) else: # Log random sample of message IDs that would be deleted (up to 10) sample_size = min(10, len(message_ids_to_delete))