From 4971e11734e668800a7f5c9c1b879fee22061e28 Mon Sep 17 00:00:00 2001 From: QuantumGhost Date: Fri, 6 Feb 2026 15:12:32 +0800 Subject: [PATCH] perf: use batch delete method instead of single delete (#32036) Co-authored-by: fatelei Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: FFXN --- api/tasks/batch_clean_document_task.py | 211 ++++++++++++++---- api/tasks/delete_segment_from_index_task.py | 11 +- api/tasks/document_indexing_sync_task.py | 4 +- .../tasks/test_document_indexing_sync_task.py | 15 ++ 4 files changed, 190 insertions(+), 51 deletions(-) diff --git a/api/tasks/batch_clean_document_task.py b/api/tasks/batch_clean_document_task.py index d388284980..747106d373 100644 --- a/api/tasks/batch_clean_document_task.py +++ b/api/tasks/batch_clean_document_task.py @@ -14,6 +14,9 @@ from models.model import UploadFile logger = logging.getLogger(__name__) +# Batch size for database operations to keep transactions short +BATCH_SIZE = 1000 + @shared_task(queue="dataset") def batch_clean_document_task(document_ids: list[str], dataset_id: str, doc_form: str | None, file_ids: list[str]): @@ -31,63 +34,179 @@ def batch_clean_document_task(document_ids: list[str], dataset_id: str, doc_form if not doc_form: raise ValueError("doc_form is required") - with session_factory.create_session() as session: - try: - dataset = session.query(Dataset).where(Dataset.id == dataset_id).first() - - if not dataset: - raise Exception("Document has no dataset") - - session.query(DatasetMetadataBinding).where( - DatasetMetadataBinding.dataset_id == dataset_id, - DatasetMetadataBinding.document_id.in_(document_ids), - ).delete(synchronize_session=False) + storage_keys_to_delete: list[str] = [] + index_node_ids: list[str] = [] + segment_ids: list[str] = [] + total_image_upload_file_ids: list[str] = [] + try: + # ============ Step 1: Query segment and file data (short read-only transaction) ============ + with session_factory.create_session() as session: + # Get segments info segments = session.scalars( select(DocumentSegment).where(DocumentSegment.document_id.in_(document_ids)) ).all() - # check segment is exist + if segments: index_node_ids = [segment.index_node_id for segment in segments] - index_processor = IndexProcessorFactory(doc_form).init_index_processor() - index_processor.clean( - dataset, index_node_ids, with_keywords=True, delete_child_chunks=True, delete_summaries=True - ) + segment_ids = [segment.id for segment in segments] + # Collect image file IDs from segment content for segment in segments: image_upload_file_ids = get_image_upload_file_ids(segment.content) - image_files = session.query(UploadFile).where(UploadFile.id.in_(image_upload_file_ids)).all() - for image_file in image_files: - try: - if image_file and image_file.key: - storage.delete(image_file.key) - except Exception: - logger.exception( - "Delete image_files failed when storage deleted, \ - image_upload_file_is: %s", - image_file.id, - ) - stmt = delete(UploadFile).where(UploadFile.id.in_(image_upload_file_ids)) - session.execute(stmt) - session.delete(segment) + total_image_upload_file_ids.extend(image_upload_file_ids) + + # Query storage keys for image files + if total_image_upload_file_ids: + image_files = session.scalars( + select(UploadFile).where(UploadFile.id.in_(total_image_upload_file_ids)) + ).all() + storage_keys_to_delete.extend([f.key for f in image_files if f and f.key]) + + # Query storage keys for document files if file_ids: files = 
session.scalars(select(UploadFile).where(UploadFile.id.in_(file_ids))).all() - for file in files: - try: - storage.delete(file.key) - except Exception: - logger.exception("Delete file failed when document deleted, file_id: %s", file.id) - stmt = delete(UploadFile).where(UploadFile.id.in_(file_ids)) - session.execute(stmt) + storage_keys_to_delete.extend([f.key for f in files if f and f.key]) - session.commit() - - end_at = time.perf_counter() - logger.info( - click.style( - f"Cleaned documents when documents deleted latency: {end_at - start_at}", - fg="green", + # ============ Step 2: Clean vector index (external service, fresh session for dataset) ============ + if index_node_ids: + try: + # Fetch dataset in a fresh session to avoid DetachedInstanceError + with session_factory.create_session() as session: + dataset = session.query(Dataset).where(Dataset.id == dataset_id).first() + if not dataset: + logger.warning("Dataset not found for vector index cleanup, dataset_id: %s", dataset_id) + else: + index_processor = IndexProcessorFactory(doc_form).init_index_processor() + index_processor.clean( + dataset, index_node_ids, with_keywords=True, delete_child_chunks=True, delete_summaries=True + ) + except Exception: + logger.exception( + "Failed to clean vector index for dataset_id: %s, document_ids: %s, index_node_ids count: %d", + dataset_id, + document_ids, + len(index_node_ids), ) - ) + + # ============ Step 3: Delete metadata binding (separate short transaction) ============ + try: + with session_factory.create_session() as session: + deleted_count = ( + session.query(DatasetMetadataBinding) + .where( + DatasetMetadataBinding.dataset_id == dataset_id, + DatasetMetadataBinding.document_id.in_(document_ids), + ) + .delete(synchronize_session=False) + ) + session.commit() + logger.debug("Deleted %d metadata bindings for dataset_id: %s", deleted_count, dataset_id) except Exception: - logger.exception("Cleaned documents when documents deleted failed") + logger.exception( + "Failed to delete metadata bindings for dataset_id: %s, document_ids: %s", + dataset_id, + document_ids, + ) + + # ============ Step 4: Batch delete UploadFile records (multiple short transactions) ============ + if total_image_upload_file_ids: + failed_batches = 0 + total_batches = (len(total_image_upload_file_ids) + BATCH_SIZE - 1) // BATCH_SIZE + for i in range(0, len(total_image_upload_file_ids), BATCH_SIZE): + batch = total_image_upload_file_ids[i : i + BATCH_SIZE] + try: + with session_factory.create_session() as session: + stmt = delete(UploadFile).where(UploadFile.id.in_(batch)) + session.execute(stmt) + session.commit() + except Exception: + failed_batches += 1 + logger.exception( + "Failed to delete image UploadFile batch %d-%d for dataset_id: %s", + i, + i + len(batch), + dataset_id, + ) + if failed_batches > 0: + logger.warning( + "Image UploadFile deletion: %d/%d batches failed for dataset_id: %s", + failed_batches, + total_batches, + dataset_id, + ) + + # ============ Step 5: Batch delete DocumentSegment records (multiple short transactions) ============ + if segment_ids: + failed_batches = 0 + total_batches = (len(segment_ids) + BATCH_SIZE - 1) // BATCH_SIZE + for i in range(0, len(segment_ids), BATCH_SIZE): + batch = segment_ids[i : i + BATCH_SIZE] + try: + with session_factory.create_session() as session: + segment_delete_stmt = delete(DocumentSegment).where(DocumentSegment.id.in_(batch)) + session.execute(segment_delete_stmt) + session.commit() + except Exception: + failed_batches += 1 + logger.exception( + 
"Failed to delete DocumentSegment batch %d-%d for dataset_id: %s, document_ids: %s", + i, + i + len(batch), + dataset_id, + document_ids, + ) + if failed_batches > 0: + logger.warning( + "DocumentSegment deletion: %d/%d batches failed, document_ids: %s", + failed_batches, + total_batches, + document_ids, + ) + + # ============ Step 6: Delete document-associated files (separate short transaction) ============ + if file_ids: + try: + with session_factory.create_session() as session: + stmt = delete(UploadFile).where(UploadFile.id.in_(file_ids)) + session.execute(stmt) + session.commit() + except Exception: + logger.exception( + "Failed to delete document UploadFile records for dataset_id: %s, file_ids: %s", + dataset_id, + file_ids, + ) + + # ============ Step 7: Delete storage files (I/O operations, no DB transaction) ============ + storage_delete_failures = 0 + for storage_key in storage_keys_to_delete: + try: + storage.delete(storage_key) + except Exception: + storage_delete_failures += 1 + logger.exception("Failed to delete file from storage, key: %s", storage_key) + if storage_delete_failures > 0: + logger.warning( + "Storage file deletion completed with %d failures out of %d total files for dataset_id: %s", + storage_delete_failures, + len(storage_keys_to_delete), + dataset_id, + ) + + end_at = time.perf_counter() + logger.info( + click.style( + f"Cleaned documents when documents deleted latency: {end_at - start_at:.2f}s, " + f"dataset_id: {dataset_id}, document_ids: {document_ids}, " + f"segments: {len(segment_ids)}, image_files: {len(total_image_upload_file_ids)}, " + f"storage_files: {len(storage_keys_to_delete)}", + fg="green", + ) + ) + except Exception: + logger.exception( + "Batch clean documents failed for dataset_id: %s, document_ids: %s", + dataset_id, + document_ids, + ) diff --git a/api/tasks/delete_segment_from_index_task.py b/api/tasks/delete_segment_from_index_task.py index 764c635d83..a6a2dcebc8 100644 --- a/api/tasks/delete_segment_from_index_task.py +++ b/api/tasks/delete_segment_from_index_task.py @@ -3,6 +3,7 @@ import time import click from celery import shared_task +from sqlalchemy import delete from core.db.session_factory import session_factory from core.rag.index_processor.index_processor_factory import IndexProcessorFactory @@ -67,8 +68,14 @@ def delete_segment_from_index_task( if segment_attachment_bindings: attachment_ids = [binding.attachment_id for binding in segment_attachment_bindings] index_processor.clean(dataset=dataset, node_ids=attachment_ids, with_keywords=False) - for binding in segment_attachment_bindings: - session.delete(binding) + segment_attachment_bind_ids = [i.id for i in segment_attachment_bindings] + + for i in range(0, len(segment_attachment_bind_ids), 1000): + segment_attachment_bind_delete_stmt = delete(SegmentAttachmentBinding).where( + SegmentAttachmentBinding.id.in_(segment_attachment_bind_ids[i : i + 1000]) + ) + session.execute(segment_attachment_bind_delete_stmt) + # delete upload file session.query(UploadFile).where(UploadFile.id.in_(attachment_ids)).delete(synchronize_session=False) session.commit() diff --git a/api/tasks/document_indexing_sync_task.py b/api/tasks/document_indexing_sync_task.py index 149185f6e2..8fa5faa796 100644 --- a/api/tasks/document_indexing_sync_task.py +++ b/api/tasks/document_indexing_sync_task.py @@ -28,7 +28,7 @@ def document_indexing_sync_task(dataset_id: str, document_id: str): logger.info(click.style(f"Start sync document: {document_id}", fg="green")) start_at = time.perf_counter() - with 
session_factory.create_session() as session: + with session_factory.create_session() as session, session.begin(): document = session.query(Document).where(Document.id == document_id, Document.dataset_id == dataset_id).first() if not document: @@ -68,7 +68,6 @@ def document_indexing_sync_task(dataset_id: str, document_id: str): document.indexing_status = "error" document.error = "Datasource credential not found. Please reconnect your Notion workspace." document.stopped_at = naive_utc_now() - session.commit() return loader = NotionExtractor( @@ -85,7 +84,6 @@ def document_indexing_sync_task(dataset_id: str, document_id: str): if last_edited_time != page_edited_time: document.indexing_status = "parsing" document.processing_started_at = naive_utc_now() - session.commit() # delete all document segment and index try: diff --git a/api/tests/unit_tests/tasks/test_document_indexing_sync_task.py b/api/tests/unit_tests/tasks/test_document_indexing_sync_task.py index fa33034f40..24e0bc76cf 100644 --- a/api/tests/unit_tests/tasks/test_document_indexing_sync_task.py +++ b/api/tests/unit_tests/tasks/test_document_indexing_sync_task.py @@ -114,6 +114,21 @@ def mock_db_session(): session = MagicMock() # Ensure tests can observe session.close() via context manager teardown session.close = MagicMock() + session.commit = MagicMock() + + # Mock session.begin() context manager to auto-commit on exit + begin_cm = MagicMock() + begin_cm.__enter__.return_value = session + + def _begin_exit_side_effect(*args, **kwargs): + # session.begin().__exit__() should commit if no exception + if args[0] is None: # No exception + session.commit() + + begin_cm.__exit__.side_effect = _begin_exit_side_effect + session.begin.return_value = begin_cm + + # Mock create_session() context manager cm = MagicMock() cm.__enter__.return_value = session
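
The pattern applied in Steps 4-6 of batch_clean_document_task (deleting rows by primary key in fixed-size batches, each inside its own short transaction) can be summarised in a small standalone sketch. This is illustrative only and not part of the patch: batched_delete, its model and ids parameters, and its return value are hypothetical, while session_factory, delete, and the 1000-row batch size mirror what the patch itself uses.

    from sqlalchemy import delete

    from core.db.session_factory import session_factory

    # Mirrors the BATCH_SIZE constant introduced by the patch.
    BATCH_SIZE = 1000


    def batched_delete(model, ids: list[str]) -> int:
        """Delete rows by primary key in BATCH_SIZE chunks, one short transaction per chunk.

        Hypothetical helper illustrating the pattern used in batch_clean_document_task;
        it is not defined anywhere in the patch.
        """
        failed_batches = 0
        for i in range(0, len(ids), BATCH_SIZE):
            batch = ids[i : i + BATCH_SIZE]
            try:
                # A fresh session per batch keeps every transaction short and avoids
                # holding row locks for the duration of the whole cleanup.
                with session_factory.create_session() as session:
                    session.execute(delete(model).where(model.id.in_(batch)))
                    session.commit()
            except Exception:
                # A failed batch is skipped so the remaining batches still run,
                # matching the failed_batches accounting in the task.
                failed_batches += 1
        return failed_batches

Keeping each batch in its own session is what lets a partially failed cleanup keep making progress instead of rolling back everything, which is the behaviour the per-step logging in the task reports.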
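
The document_indexing_sync_task hunk replaces scattered session.commit() calls with a single session.begin() block, which commits when the block exits normally and rolls back on an exception; that is also what the new test fixture emulates with its mocked __exit__. Below is a minimal sketch of that transaction style, assuming Document is importable from models.dataset (the import is not shown in the diff) and using a hypothetical mark_document_parsing helper.

    from core.db.session_factory import session_factory
    from models.dataset import Document  # assumed import path; not shown in the diff


    def mark_document_parsing(dataset_id: str, document_id: str) -> None:
        # session.begin() opens a transaction that is committed automatically when
        # the block exits without an exception, so no explicit commit is needed for
        # either the early-return path or the status update below.
        with session_factory.create_session() as session, session.begin():
            document = (
                session.query(Document)
                .where(Document.id == document_id, Document.dataset_id == dataset_id)
                .first()
            )
            if not document:
                return
            document.indexing_status = "parsing"

An uncaught exception inside the block rolls the transaction back instead, which is why the explicit session.commit() calls removed by the patch are safe to drop.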