From acef56d7fd6d913272666b1dfe2a5b2a981f2fcd Mon Sep 17 00:00:00 2001
From: Jyong <76649700+JohnJyong@users.noreply.github.com>
Date: Mon, 15 Dec 2025 12:00:03 +0800
Subject: [PATCH] fix: pipeline and workflow are not deleted when a knowledge
pipeline dataset is deleted (#29591)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
---
.../clean_when_dataset_deleted.py | 1 +
api/tasks/clean_dataset_task.py | 12 +
.../tasks/test_clean_dataset_task.py | 1232 +++++++++++++++++
3 files changed, 1245 insertions(+)
create mode 100644 api/tests/unit_tests/tasks/test_clean_dataset_task.py
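
Notes: deleting a dataset that was created by a knowledge pipeline previously
left the Pipeline row and its RAG-pipeline Workflow rows behind. The
dataset-deleted event handler now forwards dataset.pipeline_id to
clean_dataset_task, which deletes both records during cleanup (sketched after
each hunk below); the new test module covers this path alongside the task's
existing behavior.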
diff --git a/api/events/event_handlers/clean_when_dataset_deleted.py b/api/events/event_handlers/clean_when_dataset_deleted.py
index 1666e2e29f..d6007662d8 100644
--- a/api/events/event_handlers/clean_when_dataset_deleted.py
+++ b/api/events/event_handlers/clean_when_dataset_deleted.py
@@ -15,4 +15,5 @@ def handle(sender: Dataset, **kwargs):
dataset.index_struct,
dataset.collection_binding_id,
dataset.doc_form,
+ dataset.pipeline_id,
)
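
For context, a minimal sketch of the full handler call after this change. The
leading arguments are inferred from the task signature in the next file rather
than shown in this hunk, and the .delay dispatch is assumed from the async-task
convention in api/tasks:

    clean_dataset_task.delay(
        dataset.id,
        dataset.tenant_id,
        dataset.indexing_technique,
        dataset.index_struct,
        dataset.collection_binding_id,
        dataset.doc_form,
        dataset.pipeline_id,  # newly forwarded; None for datasets without a pipeline
    )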
diff --git a/api/tasks/clean_dataset_task.py b/api/tasks/clean_dataset_task.py
index 8608df6b8e..b4d82a150d 100644
--- a/api/tasks/clean_dataset_task.py
+++ b/api/tasks/clean_dataset_task.py
@@ -9,6 +9,7 @@ from core.rag.index_processor.index_processor_factory import IndexProcessorFacto
from core.tools.utils.web_reader_tool import get_image_upload_file_ids
from extensions.ext_database import db
from extensions.ext_storage import storage
+from models import WorkflowType
from models.dataset import (
AppDatasetJoin,
Dataset,
@@ -18,9 +19,11 @@ from models.dataset import (
DatasetQuery,
Document,
DocumentSegment,
+ Pipeline,
SegmentAttachmentBinding,
)
from models.model import UploadFile
+from models.workflow import Workflow

logger = logging.getLogger(__name__)

@@ -34,6 +37,7 @@ def clean_dataset_task(
index_struct: str,
collection_binding_id: str,
doc_form: str,
+ pipeline_id: str | None = None,
):
"""
Clean dataset when dataset deleted.
@@ -135,6 +139,14 @@ def clean_dataset_task(
# delete dataset metadata
db.session.query(DatasetMetadata).where(DatasetMetadata.dataset_id == dataset_id).delete()
db.session.query(DatasetMetadataBinding).where(DatasetMetadataBinding.dataset_id == dataset_id).delete()
+ # delete pipeline and workflow
+ if pipeline_id:
+ db.session.query(Pipeline).where(Pipeline.id == pipeline_id).delete()
+ db.session.query(Workflow).where(
+ Workflow.tenant_id == tenant_id,
+ Workflow.app_id == pipeline_id,
+ Workflow.type == WorkflowType.RAG_PIPELINE,
+ ).delete()
# delete files
if documents:
for document in documents:
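
The workflow deletion keys on Workflow.app_id because, as the filter above
implies, a knowledge pipeline's workflow stores the pipeline id in its app_id
column; the WorkflowType.RAG_PIPELINE filter keeps ordinary app workflows
untouched. A minimal post-condition sketch of what the block guarantees (a
hypothetical helper for illustration, not part of the patch):

    from models import WorkflowType
    from models.dataset import Pipeline
    from models.workflow import Workflow

    def assert_pipeline_cleaned(db, tenant_id: str, pipeline_id: str) -> None:
        # After clean_dataset_task runs with a pipeline_id, neither the Pipeline
        # row nor its RAG-pipeline workflow rows should remain.
        assert db.session.query(Pipeline).where(Pipeline.id == pipeline_id).first() is None
        assert (
            db.session.query(Workflow)
            .where(
                Workflow.tenant_id == tenant_id,
                Workflow.app_id == pipeline_id,
                Workflow.type == WorkflowType.RAG_PIPELINE,
            )
            .first()
            is None
        )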
diff --git a/api/tests/unit_tests/tasks/test_clean_dataset_task.py b/api/tests/unit_tests/tasks/test_clean_dataset_task.py
new file mode 100644
index 0000000000..bace66bec4
--- /dev/null
+++ b/api/tests/unit_tests/tasks/test_clean_dataset_task.py
@@ -0,0 +1,1232 @@
+"""
+Unit tests for clean_dataset_task.
+
+This module tests the dataset cleanup task functionality including:
+- Basic cleanup of documents and segments
+- Vector database cleanup with IndexProcessorFactory
+- Storage file deletion
+- Invalid doc_form handling with default fallback
+- Error handling and database session rollback
+- Pipeline and workflow deletion
+- Segment attachment cleanup
+"""
+
+import uuid
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from tasks.clean_dataset_task import clean_dataset_task
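+
+# Note: every external dependency the task touches (the db session, the storage
+# client, IndexProcessorFactory, and get_image_upload_file_ids) is patched by
+# the fixtures below, so these tests need no real database or storage backend.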
+
+# ============================================================================
+# Fixtures
+# ============================================================================
+
+
+@pytest.fixture
+def tenant_id():
+ """Generate a unique tenant ID for testing."""
+ return str(uuid.uuid4())
+
+
+@pytest.fixture
+def dataset_id():
+ """Generate a unique dataset ID for testing."""
+ return str(uuid.uuid4())
+
+
+@pytest.fixture
+def collection_binding_id():
+ """Generate a unique collection binding ID for testing."""
+ return str(uuid.uuid4())
+
+
+@pytest.fixture
+def pipeline_id():
+ """Generate a unique pipeline ID for testing."""
+ return str(uuid.uuid4())
+
+
+@pytest.fixture
+def mock_db_session():
+ """Mock database session with query capabilities."""
+ with patch("tasks.clean_dataset_task.db") as mock_db:
+ mock_session = MagicMock()
+ mock_db.session = mock_session
+
+ # Setup query chain
+ mock_query = MagicMock()
+ mock_session.query.return_value = mock_query
+ mock_query.where.return_value = mock_query
+ mock_query.delete.return_value = 0
+
+ # Setup scalars for select queries
+ mock_session.scalars.return_value.all.return_value = []
+
+ # Setup execute for JOIN queries
+ mock_session.execute.return_value.all.return_value = []
+
+ yield mock_db
+
+
+@pytest.fixture
+def mock_storage():
+ """Mock storage client."""
+ with patch("tasks.clean_dataset_task.storage") as mock_storage:
+ mock_storage.delete.return_value = None
+ yield mock_storage
+
+
+@pytest.fixture
+def mock_index_processor_factory():
+ """Mock IndexProcessorFactory."""
+ with patch("tasks.clean_dataset_task.IndexProcessorFactory") as mock_factory:
+ mock_processor = MagicMock()
+ mock_processor.clean.return_value = None
+ mock_factory_instance = MagicMock()
+ mock_factory_instance.init_index_processor.return_value = mock_processor
+ mock_factory.return_value = mock_factory_instance
+
+ yield {
+ "factory": mock_factory,
+ "factory_instance": mock_factory_instance,
+ "processor": mock_processor,
+ }
+
+
+@pytest.fixture
+def mock_get_image_upload_file_ids():
+ """Mock get_image_upload_file_ids function."""
+ with patch("tasks.clean_dataset_task.get_image_upload_file_ids") as mock_func:
+ mock_func.return_value = []
+ yield mock_func
+
+
+@pytest.fixture
+def mock_document():
+ """Create a mock Document object."""
+ doc = MagicMock()
+ doc.id = str(uuid.uuid4())
+ doc.tenant_id = str(uuid.uuid4())
+ doc.dataset_id = str(uuid.uuid4())
+ doc.data_source_type = "upload_file"
+ doc.data_source_info = '{"upload_file_id": "test-file-id"}'
+ doc.data_source_info_dict = {"upload_file_id": "test-file-id"}
+ return doc
+
+
+@pytest.fixture
+def mock_segment():
+ """Create a mock DocumentSegment object."""
+ segment = MagicMock()
+ segment.id = str(uuid.uuid4())
+ segment.content = "Test segment content"
+ return segment
+
+
+@pytest.fixture
+def mock_upload_file():
+ """Create a mock UploadFile object."""
+ upload_file = MagicMock()
+ upload_file.id = str(uuid.uuid4())
+ upload_file.key = f"test_files/{uuid.uuid4()}.txt"
+ return upload_file
+
+
+# ============================================================================
+# Test Basic Cleanup
+# ============================================================================
+
+
+class TestBasicCleanup:
+ """Test cases for basic dataset cleanup functionality."""
+
+ def test_clean_dataset_task_empty_dataset(
+ self,
+ dataset_id,
+ tenant_id,
+ collection_binding_id,
+ mock_db_session,
+ mock_storage,
+ mock_index_processor_factory,
+ mock_get_image_upload_file_ids,
+ ):
+ """
+ Test cleanup of an empty dataset with no documents or segments.
+
+ Scenario:
+ - Dataset has no documents or segments
+ - Should still clean vector database and delete related records
+
+ Expected behavior:
+ - IndexProcessorFactory is called to clean vector database
+ - No storage deletions occur
+ - Related records (DatasetProcessRule, etc.) are deleted
+ - Session is committed and closed
+ """
+ # Arrange
+ mock_db_session.session.scalars.return_value.all.return_value = []
+
+ # Act
+ clean_dataset_task(
+ dataset_id=dataset_id,
+ tenant_id=tenant_id,
+ indexing_technique="high_quality",
+ index_struct='{"type": "paragraph"}',
+ collection_binding_id=collection_binding_id,
+ doc_form="paragraph_index",
+ )
+
+ # Assert
+ mock_index_processor_factory["factory"].assert_called_once_with("paragraph_index")
+ mock_index_processor_factory["processor"].clean.assert_called_once()
+ mock_storage.delete.assert_not_called()
+ mock_db_session.session.commit.assert_called_once()
+ mock_db_session.session.close.assert_called_once()
+
+ def test_clean_dataset_task_with_documents_and_segments(
+ self,
+ dataset_id,
+ tenant_id,
+ collection_binding_id,
+ mock_db_session,
+ mock_storage,
+ mock_index_processor_factory,
+ mock_get_image_upload_file_ids,
+ mock_document,
+ mock_segment,
+ ):
+ """
+ Test cleanup of dataset with documents and segments.
+
+ Scenario:
+ - Dataset has one document and one segment
+ - No image files in segment content
+
+ Expected behavior:
+ - Documents and segments are deleted
+ - Vector database is cleaned
+ - Session is committed
+ """
+ # Arrange
+ mock_db_session.session.scalars.return_value.all.side_effect = [
+ [mock_document], # documents
+ [mock_segment], # segments
+ ]
+ mock_get_image_upload_file_ids.return_value = []
+
+ # Act
+ clean_dataset_task(
+ dataset_id=dataset_id,
+ tenant_id=tenant_id,
+ indexing_technique="high_quality",
+ index_struct='{"type": "paragraph"}',
+ collection_binding_id=collection_binding_id,
+ doc_form="paragraph_index",
+ )
+
+ # Assert
+ mock_db_session.session.delete.assert_any_call(mock_document)
+ mock_db_session.session.delete.assert_any_call(mock_segment)
+ mock_db_session.session.commit.assert_called_once()
+
+ def test_clean_dataset_task_deletes_related_records(
+ self,
+ dataset_id,
+ tenant_id,
+ collection_binding_id,
+ mock_db_session,
+ mock_storage,
+ mock_index_processor_factory,
+ mock_get_image_upload_file_ids,
+ ):
+ """
+ Test that all related records are deleted.
+
+ Expected behavior:
+ - DatasetProcessRule records are deleted
+ - DatasetQuery records are deleted
+ - AppDatasetJoin records are deleted
+ - DatasetMetadata records are deleted
+ - DatasetMetadataBinding records are deleted
+ """
+ # Arrange
+ mock_query = mock_db_session.session.query.return_value
+ mock_query.where.return_value = mock_query
+ mock_query.delete.return_value = 1
+
+ # Act
+ clean_dataset_task(
+ dataset_id=dataset_id,
+ tenant_id=tenant_id,
+ indexing_technique="high_quality",
+ index_struct='{"type": "paragraph"}',
+ collection_binding_id=collection_binding_id,
+ doc_form="paragraph_index",
+ )
+
+ # Assert - verify query.where.delete was called multiple times
+ # for different models (DatasetProcessRule, DatasetQuery, etc.)
+ assert mock_query.delete.call_count >= 5
+
+
+# ============================================================================
+# Test Doc Form Validation
+# ============================================================================
+
+
+class TestDocFormValidation:
+ """Test cases for doc_form validation and default fallback."""
+
+ @pytest.mark.parametrize(
+ "invalid_doc_form",
+ [
+ None,
+ "",
+ " ",
+ "\t",
+ "\n",
+ " \t\n ",
+ ],
+ )
+ def test_clean_dataset_task_invalid_doc_form_uses_default(
+ self,
+ invalid_doc_form,
+ dataset_id,
+ tenant_id,
+ collection_binding_id,
+ mock_db_session,
+ mock_storage,
+ mock_index_processor_factory,
+ mock_get_image_upload_file_ids,
+ ):
+ """
+ Test that invalid doc_form values use default paragraph index type.
+
+ Scenario:
+ - doc_form is None, empty, or whitespace-only
+ - Should use default IndexStructureType.PARAGRAPH_INDEX
+
+ Expected behavior:
+ - Default index type is used for cleanup
+ - No errors are raised
+ - Cleanup proceeds normally
+ """
+ # Arrange - import to verify the default value
+ from core.rag.index_processor.constant.index_type import IndexStructureType
+
+ # Act
+ clean_dataset_task(
+ dataset_id=dataset_id,
+ tenant_id=tenant_id,
+ indexing_technique="high_quality",
+ index_struct='{"type": "paragraph"}',
+ collection_binding_id=collection_binding_id,
+ doc_form=invalid_doc_form,
+ )
+
+ # Assert - IndexProcessorFactory should be called with default type
+ mock_index_processor_factory["factory"].assert_called_once_with(IndexStructureType.PARAGRAPH_INDEX)
+ mock_index_processor_factory["processor"].clean.assert_called_once()
+
+ def test_clean_dataset_task_valid_doc_form_used_directly(
+ self,
+ dataset_id,
+ tenant_id,
+ collection_binding_id,
+ mock_db_session,
+ mock_storage,
+ mock_index_processor_factory,
+ mock_get_image_upload_file_ids,
+ ):
+ """
+ Test that valid doc_form values are used directly.
+
+ Expected behavior:
+ - Provided doc_form is passed to IndexProcessorFactory
+ """
+ # Arrange
+ valid_doc_form = "qa_index"
+
+ # Act
+ clean_dataset_task(
+ dataset_id=dataset_id,
+ tenant_id=tenant_id,
+ indexing_technique="high_quality",
+ index_struct='{"type": "paragraph"}',
+ collection_binding_id=collection_binding_id,
+ doc_form=valid_doc_form,
+ )
+
+ # Assert
+ mock_index_processor_factory["factory"].assert_called_once_with(valid_doc_form)
+
+
+# ============================================================================
+# Test Error Handling
+# ============================================================================
+
+
+class TestErrorHandling:
+ """Test cases for error handling and recovery."""
+
+ def test_clean_dataset_task_vector_cleanup_failure_continues(
+ self,
+ dataset_id,
+ tenant_id,
+ collection_binding_id,
+ mock_db_session,
+ mock_storage,
+ mock_index_processor_factory,
+ mock_get_image_upload_file_ids,
+ mock_document,
+ mock_segment,
+ ):
+ """
+ Test that document cleanup continues even if vector cleanup fails.
+
+ Scenario:
+ - IndexProcessor.clean() raises an exception
+ - Document and segment deletion should still proceed
+
+ Expected behavior:
+ - Exception is caught and logged
+ - Documents and segments are still deleted
+ - Session is committed
+ """
+ # Arrange
+ mock_db_session.session.scalars.return_value.all.side_effect = [
+ [mock_document], # documents
+ [mock_segment], # segments
+ ]
+ mock_index_processor_factory["processor"].clean.side_effect = Exception("Vector database error")
+
+ # Act
+ clean_dataset_task(
+ dataset_id=dataset_id,
+ tenant_id=tenant_id,
+ indexing_technique="high_quality",
+ index_struct='{"type": "paragraph"}',
+ collection_binding_id=collection_binding_id,
+ doc_form="paragraph_index",
+ )
+
+ # Assert - documents and segments should still be deleted
+ mock_db_session.session.delete.assert_any_call(mock_document)
+ mock_db_session.session.delete.assert_any_call(mock_segment)
+ mock_db_session.session.commit.assert_called_once()
+
+ def test_clean_dataset_task_storage_delete_failure_continues(
+ self,
+ dataset_id,
+ tenant_id,
+ collection_binding_id,
+ mock_db_session,
+ mock_storage,
+ mock_index_processor_factory,
+ mock_get_image_upload_file_ids,
+ ):
+ """
+ Test that cleanup continues even if storage deletion fails.
+
+ Scenario:
+ - Segment contains image file references
+ - Storage.delete() raises an exception
+ - Cleanup should continue
+
+ Expected behavior:
+ - Exception is caught and logged
+ - Image file record is still deleted from database
+ - Other cleanup operations proceed
+ """
+ # Arrange
+ # Need at least one document; segment cleanup runs inside the task's else branch
+ mock_document = MagicMock()
+ mock_document.id = str(uuid.uuid4())
+ mock_document.tenant_id = tenant_id
+ mock_document.data_source_type = "website" # Non-upload type to avoid file deletion
+
+ mock_segment = MagicMock()
+ mock_segment.id = str(uuid.uuid4())
+ mock_segment.content = "Test content with image"
+
+ mock_upload_file = MagicMock()
+ mock_upload_file.id = str(uuid.uuid4())
+ mock_upload_file.key = "images/test-image.jpg"
+
+ image_file_id = mock_upload_file.id
+
+ mock_db_session.session.scalars.return_value.all.side_effect = [
+ [mock_document], # documents - need at least one for segment processing
+ [mock_segment], # segments
+ ]
+ mock_get_image_upload_file_ids.return_value = [image_file_id]
+ mock_db_session.session.query.return_value.where.return_value.first.return_value = mock_upload_file
+ mock_storage.delete.side_effect = Exception("Storage service unavailable")
+
+ # Act
+ clean_dataset_task(
+ dataset_id=dataset_id,
+ tenant_id=tenant_id,
+ indexing_technique="high_quality",
+ index_struct='{"type": "paragraph"}',
+ collection_binding_id=collection_binding_id,
+ doc_form="paragraph_index",
+ )
+
+ # Assert - storage delete was attempted for image file
+ mock_storage.delete.assert_called_with(mock_upload_file.key)
+ # Image file should still be deleted from database
+ mock_db_session.session.delete.assert_any_call(mock_upload_file)
+
+ def test_clean_dataset_task_database_error_rollback(
+ self,
+ dataset_id,
+ tenant_id,
+ collection_binding_id,
+ mock_db_session,
+ mock_storage,
+ mock_index_processor_factory,
+ mock_get_image_upload_file_ids,
+ ):
+ """
+ Test that database session is rolled back on error.
+
+ Scenario:
+ - Database operation raises an exception
+ - Session should be rolled back to prevent dirty state
+
+ Expected behavior:
+ - Session.rollback() is called
+ - Session.close() is called in finally block
+ """
+ # Arrange
+ mock_db_session.session.commit.side_effect = Exception("Database commit failed")
+
+ # Act
+ clean_dataset_task(
+ dataset_id=dataset_id,
+ tenant_id=tenant_id,
+ indexing_technique="high_quality",
+ index_struct='{"type": "paragraph"}',
+ collection_binding_id=collection_binding_id,
+ doc_form="paragraph_index",
+ )
+
+ # Assert
+ mock_db_session.session.rollback.assert_called_once()
+ mock_db_session.session.close.assert_called_once()
+
+ def test_clean_dataset_task_rollback_failure_still_closes_session(
+ self,
+ dataset_id,
+ tenant_id,
+ collection_binding_id,
+ mock_db_session,
+ mock_storage,
+ mock_index_processor_factory,
+ mock_get_image_upload_file_ids,
+ ):
+ """
+ Test that session is closed even if rollback fails.
+
+ Scenario:
+ - Database commit fails
+ - Rollback also fails
+ - Session should still be closed
+
+ Expected behavior:
+ - Session.close() is called regardless of rollback failure
+ """
+ # Arrange
+ mock_db_session.session.commit.side_effect = Exception("Commit failed")
+ mock_db_session.session.rollback.side_effect = Exception("Rollback failed")
+
+ # Act
+ clean_dataset_task(
+ dataset_id=dataset_id,
+ tenant_id=tenant_id,
+ indexing_technique="high_quality",
+ index_struct='{"type": "paragraph"}',
+ collection_binding_id=collection_binding_id,
+ doc_form="paragraph_index",
+ )
+
+ # Assert
+ mock_db_session.session.close.assert_called_once()
+
+
+# ============================================================================
+# Test Pipeline and Workflow Deletion
+# ============================================================================
+
+
+class TestPipelineAndWorkflowDeletion:
+ """Test cases for pipeline and workflow deletion."""
+
+ def test_clean_dataset_task_with_pipeline_id(
+ self,
+ dataset_id,
+ tenant_id,
+ collection_binding_id,
+ pipeline_id,
+ mock_db_session,
+ mock_storage,
+ mock_index_processor_factory,
+ mock_get_image_upload_file_ids,
+ ):
+ """
+ Test that pipeline and workflow are deleted when pipeline_id is provided.
+
+ Expected behavior:
+ - Pipeline record is deleted
+ - Related workflow record is deleted
+ """
+ # Arrange
+ mock_query = mock_db_session.session.query.return_value
+ mock_query.where.return_value = mock_query
+ mock_query.delete.return_value = 1
+
+ # Act
+ clean_dataset_task(
+ dataset_id=dataset_id,
+ tenant_id=tenant_id,
+ indexing_technique="high_quality",
+ index_struct='{"type": "paragraph"}',
+ collection_binding_id=collection_binding_id,
+ doc_form="paragraph_index",
+ pipeline_id=pipeline_id,
+ )
+
+ # Assert - verify delete was called for pipeline-related queries
+ # The actual count depends on total queries, but pipeline deletion should add 2 more
+ assert mock_query.delete.call_count >= 7 # 5 base + 2 pipeline/workflow
+
+ def test_clean_dataset_task_without_pipeline_id(
+ self,
+ dataset_id,
+ tenant_id,
+ collection_binding_id,
+ mock_db_session,
+ mock_storage,
+ mock_index_processor_factory,
+ mock_get_image_upload_file_ids,
+ ):
+ """
+ Test that pipeline/workflow deletion is skipped when pipeline_id is None.
+
+ Expected behavior:
+ - Pipeline and workflow deletion queries are not executed
+ """
+ # Arrange
+ mock_query = mock_db_session.session.query.return_value
+ mock_query.where.return_value = mock_query
+ mock_query.delete.return_value = 1
+
+ # Act
+ clean_dataset_task(
+ dataset_id=dataset_id,
+ tenant_id=tenant_id,
+ indexing_technique="high_quality",
+ index_struct='{"type": "paragraph"}',
+ collection_binding_id=collection_binding_id,
+ doc_form="paragraph_index",
+ pipeline_id=None,
+ )
+
+ # Assert - verify delete was called only for base queries (5 times)
+ assert mock_query.delete.call_count == 5
+
+
+# ============================================================================
+# Test Segment Attachment Cleanup
+# ============================================================================
+
+
+class TestSegmentAttachmentCleanup:
+ """Test cases for segment attachment cleanup."""
+
+ def test_clean_dataset_task_with_attachments(
+ self,
+ dataset_id,
+ tenant_id,
+ collection_binding_id,
+ mock_db_session,
+ mock_storage,
+ mock_index_processor_factory,
+ mock_get_image_upload_file_ids,
+ ):
+ """
+ Test that segment attachments are cleaned up properly.
+
+ Scenario:
+ - Dataset has segment attachments with associated files
+ - Both binding and file records should be deleted
+
+ Expected behavior:
+ - Storage.delete() is called for each attachment file
+ - Attachment file records are deleted from database
+ - Binding records are deleted from database
+ """
+ # Arrange
+ mock_binding = MagicMock()
+ mock_binding.attachment_id = str(uuid.uuid4())
+
+ mock_attachment_file = MagicMock()
+ mock_attachment_file.id = mock_binding.attachment_id
+ mock_attachment_file.key = f"attachments/{uuid.uuid4()}.pdf"
+
+ # Setup execute to return attachment with binding
+ mock_db_session.session.execute.return_value.all.return_value = [(mock_binding, mock_attachment_file)]
+
+ # Act
+ clean_dataset_task(
+ dataset_id=dataset_id,
+ tenant_id=tenant_id,
+ indexing_technique="high_quality",
+ index_struct='{"type": "paragraph"}',
+ collection_binding_id=collection_binding_id,
+ doc_form="paragraph_index",
+ )
+
+ # Assert
+ mock_storage.delete.assert_called_with(mock_attachment_file.key)
+ mock_db_session.session.delete.assert_any_call(mock_attachment_file)
+ mock_db_session.session.delete.assert_any_call(mock_binding)
+
+ def test_clean_dataset_task_attachment_storage_failure(
+ self,
+ dataset_id,
+ tenant_id,
+ collection_binding_id,
+ mock_db_session,
+ mock_storage,
+ mock_index_processor_factory,
+ mock_get_image_upload_file_ids,
+ ):
+ """
+ Test that cleanup continues even if attachment storage deletion fails.
+
+ Expected behavior:
+ - Exception is caught and logged
+ - Attachment file and binding are still deleted from database
+ """
+ # Arrange
+ mock_binding = MagicMock()
+ mock_binding.attachment_id = str(uuid.uuid4())
+
+ mock_attachment_file = MagicMock()
+ mock_attachment_file.id = mock_binding.attachment_id
+ mock_attachment_file.key = f"attachments/{uuid.uuid4()}.pdf"
+
+ mock_db_session.session.execute.return_value.all.return_value = [(mock_binding, mock_attachment_file)]
+ mock_storage.delete.side_effect = Exception("Storage error")
+
+ # Act
+ clean_dataset_task(
+ dataset_id=dataset_id,
+ tenant_id=tenant_id,
+ indexing_technique="high_quality",
+ index_struct='{"type": "paragraph"}',
+ collection_binding_id=collection_binding_id,
+ doc_form="paragraph_index",
+ )
+
+ # Assert - storage delete was attempted
+ mock_storage.delete.assert_called_once()
+ # Records should still be deleted from database
+ mock_db_session.session.delete.assert_any_call(mock_attachment_file)
+ mock_db_session.session.delete.assert_any_call(mock_binding)
+
+
+# ============================================================================
+# Test Upload File Cleanup
+# ============================================================================
+
+
+class TestUploadFileCleanup:
+ """Test cases for upload file cleanup."""
+
+ def test_clean_dataset_task_deletes_document_upload_files(
+ self,
+ dataset_id,
+ tenant_id,
+ collection_binding_id,
+ mock_db_session,
+ mock_storage,
+ mock_index_processor_factory,
+ mock_get_image_upload_file_ids,
+ ):
+ """
+ Test that document upload files are deleted.
+
+ Scenario:
+ - Document has data_source_type = "upload_file"
+ - data_source_info contains upload_file_id
+
+ Expected behavior:
+ - Upload file is deleted from storage
+ - Upload file record is deleted from database
+ """
+ # Arrange
+ mock_document = MagicMock()
+ mock_document.id = str(uuid.uuid4())
+ mock_document.tenant_id = tenant_id
+ mock_document.data_source_type = "upload_file"
+ mock_document.data_source_info = '{"upload_file_id": "test-file-id"}'
+ mock_document.data_source_info_dict = {"upload_file_id": "test-file-id"}
+
+ mock_upload_file = MagicMock()
+ mock_upload_file.id = "test-file-id"
+ mock_upload_file.key = "uploads/test-file.txt"
+
+ mock_db_session.session.scalars.return_value.all.side_effect = [
+ [mock_document], # documents
+ [], # segments
+ ]
+ mock_db_session.session.query.return_value.where.return_value.first.return_value = mock_upload_file
+
+ # Act
+ clean_dataset_task(
+ dataset_id=dataset_id,
+ tenant_id=tenant_id,
+ indexing_technique="high_quality",
+ index_struct='{"type": "paragraph"}',
+ collection_binding_id=collection_binding_id,
+ doc_form="paragraph_index",
+ )
+
+ # Assert
+ mock_storage.delete.assert_called_with(mock_upload_file.key)
+ mock_db_session.session.delete.assert_any_call(mock_upload_file)
+
+ def test_clean_dataset_task_handles_missing_upload_file(
+ self,
+ dataset_id,
+ tenant_id,
+ collection_binding_id,
+ mock_db_session,
+ mock_storage,
+ mock_index_processor_factory,
+ mock_get_image_upload_file_ids,
+ ):
+ """
+ Test that missing upload files are handled gracefully.
+
+ Scenario:
+ - Document references an upload_file_id that doesn't exist
+
+ Expected behavior:
+ - No error is raised
+ - Cleanup continues normally
+ """
+ # Arrange
+ mock_document = MagicMock()
+ mock_document.id = str(uuid.uuid4())
+ mock_document.tenant_id = tenant_id
+ mock_document.data_source_type = "upload_file"
+ mock_document.data_source_info = '{"upload_file_id": "nonexistent-file"}'
+ mock_document.data_source_info_dict = {"upload_file_id": "nonexistent-file"}
+
+ mock_db_session.session.scalars.return_value.all.side_effect = [
+ [mock_document], # documents
+ [], # segments
+ ]
+ mock_db_session.session.query.return_value.where.return_value.first.return_value = None
+
+ # Act - should not raise exception
+ clean_dataset_task(
+ dataset_id=dataset_id,
+ tenant_id=tenant_id,
+ indexing_technique="high_quality",
+ index_struct='{"type": "paragraph"}',
+ collection_binding_id=collection_binding_id,
+ doc_form="paragraph_index",
+ )
+
+ # Assert
+ mock_storage.delete.assert_not_called()
+ mock_db_session.session.commit.assert_called_once()
+
+ def test_clean_dataset_task_handles_non_upload_file_data_source(
+ self,
+ dataset_id,
+ tenant_id,
+ collection_binding_id,
+ mock_db_session,
+ mock_storage,
+ mock_index_processor_factory,
+ mock_get_image_upload_file_ids,
+ ):
+ """
+ Test that non-upload_file data sources are skipped.
+
+ Scenario:
+ - Document has data_source_type = "website"
+
+ Expected behavior:
+ - No file deletion is attempted
+ """
+ # Arrange
+ mock_document = MagicMock()
+ mock_document.id = str(uuid.uuid4())
+ mock_document.tenant_id = tenant_id
+ mock_document.data_source_type = "website"
+ mock_document.data_source_info = None
+
+ mock_db_session.session.scalars.return_value.all.side_effect = [
+ [mock_document], # documents
+ [], # segments
+ ]
+
+ # Act
+ clean_dataset_task(
+ dataset_id=dataset_id,
+ tenant_id=tenant_id,
+ indexing_technique="high_quality",
+ index_struct='{"type": "paragraph"}',
+ collection_binding_id=collection_binding_id,
+ doc_form="paragraph_index",
+ )
+
+ # Assert - storage delete should not be called for document files
+ # (only for image files in segments, which are empty here)
+ mock_storage.delete.assert_not_called()
+
+
+# ============================================================================
+# Test Image File Cleanup
+# ============================================================================
+
+
+class TestImageFileCleanup:
+ """Test cases for image file cleanup in segments."""
+
+ def test_clean_dataset_task_deletes_image_files_in_segments(
+ self,
+ dataset_id,
+ tenant_id,
+ collection_binding_id,
+ mock_db_session,
+ mock_storage,
+ mock_index_processor_factory,
+ mock_get_image_upload_file_ids,
+ ):
+ """
+ Test that image files referenced in segment content are deleted.
+
+ Scenario:
+ - Segment content contains image file references
+ - get_image_upload_file_ids returns file IDs
+
+ Expected behavior:
+ - Each image file is deleted from storage
+ - Each image file record is deleted from database
+ """
+ # Arrange
+ # Need at least one document; segment cleanup runs inside the task's else branch
+ mock_document = MagicMock()
+ mock_document.id = str(uuid.uuid4())
+ mock_document.tenant_id = tenant_id
+ mock_document.data_source_type = "website" # Non-upload type
+
+ mock_segment = MagicMock()
+ mock_segment.id = str(uuid.uuid4())
+ # The literal content is irrelevant: get_image_upload_file_ids is mocked below
+ mock_segment.content = "segment content with embedded image references"
+
+ image_file_ids = ["image-1", "image-2"]
+ mock_get_image_upload_file_ids.return_value = image_file_ids
+
+ mock_image_files = []
+ for file_id in image_file_ids:
+ mock_file = MagicMock()
+ mock_file.id = file_id
+ mock_file.key = f"images/{file_id}.jpg"
+ mock_image_files.append(mock_file)
+
+ mock_db_session.session.scalars.return_value.all.side_effect = [
+ [mock_document], # documents - need at least one for segment processing
+ [mock_segment], # segments
+ ]
+
+ # Setup a mock query chain that returns files in sequence
+ mock_query = MagicMock()
+ mock_where = MagicMock()
+ mock_query.where.return_value = mock_where
+ mock_where.first.side_effect = mock_image_files
+ mock_db_session.session.query.return_value = mock_query
+
+ # Act
+ clean_dataset_task(
+ dataset_id=dataset_id,
+ tenant_id=tenant_id,
+ indexing_technique="high_quality",
+ index_struct='{"type": "paragraph"}',
+ collection_binding_id=collection_binding_id,
+ doc_form="paragraph_index",
+ )
+
+ # Assert
+ assert mock_storage.delete.call_count == 2
+ mock_storage.delete.assert_any_call("images/image-1.jpg")
+ mock_storage.delete.assert_any_call("images/image-2.jpg")
+
+ def test_clean_dataset_task_handles_missing_image_file(
+ self,
+ dataset_id,
+ tenant_id,
+ collection_binding_id,
+ mock_db_session,
+ mock_storage,
+ mock_index_processor_factory,
+ mock_get_image_upload_file_ids,
+ ):
+ """
+ Test that missing image files are handled gracefully.
+
+ Scenario:
+ - Segment references image file ID that doesn't exist in database
+
+ Expected behavior:
+ - No error is raised
+ - Cleanup continues
+ """
+ # Arrange
+ # Need at least one document; segment cleanup runs inside the task's else branch
+ mock_document = MagicMock()
+ mock_document.id = str(uuid.uuid4())
+ mock_document.tenant_id = tenant_id
+ mock_document.data_source_type = "website" # Non-upload type
+
+ mock_segment = MagicMock()
+ mock_segment.id = str(uuid.uuid4())
+ # The literal content is irrelevant: get_image_upload_file_ids is mocked below
+ mock_segment.content = "segment content referencing a missing image"
+
+ mock_get_image_upload_file_ids.return_value = ["nonexistent-image"]
+
+ mock_db_session.session.scalars.return_value.all.side_effect = [
+ [mock_document], # documents - need at least one for segment processing
+ [mock_segment], # segments
+ ]
+
+ # Image file not found
+ mock_db_session.session.query.return_value.where.return_value.first.return_value = None
+
+ # Act - should not raise exception
+ clean_dataset_task(
+ dataset_id=dataset_id,
+ tenant_id=tenant_id,
+ indexing_technique="high_quality",
+ index_struct='{"type": "paragraph"}',
+ collection_binding_id=collection_binding_id,
+ doc_form="paragraph_index",
+ )
+
+ # Assert
+ mock_storage.delete.assert_not_called()
+ mock_db_session.session.commit.assert_called_once()
+
+
+# ============================================================================
+# Test Edge Cases
+# ============================================================================
+
+
+class TestEdgeCases:
+ """Test edge cases and boundary conditions."""
+
+ def test_clean_dataset_task_multiple_documents_and_segments(
+ self,
+ dataset_id,
+ tenant_id,
+ collection_binding_id,
+ mock_db_session,
+ mock_storage,
+ mock_index_processor_factory,
+ mock_get_image_upload_file_ids,
+ ):
+ """
+ Test cleanup of multiple documents and segments.
+
+ Scenario:
+ - Dataset has 5 documents and 10 segments
+
+ Expected behavior:
+ - All documents and segments are deleted
+ """
+ # Arrange
+ mock_documents = []
+ for i in range(5):
+ doc = MagicMock()
+ doc.id = str(uuid.uuid4())
+ doc.tenant_id = tenant_id
+ doc.data_source_type = "website" # Non-upload type
+ mock_documents.append(doc)
+
+ mock_segments = []
+ for i in range(10):
+ seg = MagicMock()
+ seg.id = str(uuid.uuid4())
+ seg.content = f"Segment content {i}"
+ mock_segments.append(seg)
+
+ mock_db_session.session.scalars.return_value.all.side_effect = [
+ mock_documents,
+ mock_segments,
+ ]
+ mock_get_image_upload_file_ids.return_value = []
+
+ # Act
+ clean_dataset_task(
+ dataset_id=dataset_id,
+ tenant_id=tenant_id,
+ indexing_technique="high_quality",
+ index_struct='{"type": "paragraph"}',
+ collection_binding_id=collection_binding_id,
+ doc_form="paragraph_index",
+ )
+
+ # Assert - all documents and segments should be deleted
+ delete_calls = mock_db_session.session.delete.call_args_list
+ deleted_items = [call[0][0] for call in delete_calls]
+
+ for doc in mock_documents:
+ assert doc in deleted_items
+ for seg in mock_segments:
+ assert seg in deleted_items
+
+ def test_clean_dataset_task_document_with_empty_data_source_info(
+ self,
+ dataset_id,
+ tenant_id,
+ collection_binding_id,
+ mock_db_session,
+ mock_storage,
+ mock_index_processor_factory,
+ mock_get_image_upload_file_ids,
+ ):
+ """
+ Test handling of document with empty data_source_info.
+
+ Scenario:
+ - Document has data_source_type = "upload_file"
+ - data_source_info is None or empty
+
+ Expected behavior:
+ - No error is raised
+ - File deletion is skipped
+ """
+ # Arrange
+ mock_document = MagicMock()
+ mock_document.id = str(uuid.uuid4())
+ mock_document.tenant_id = tenant_id
+ mock_document.data_source_type = "upload_file"
+ mock_document.data_source_info = None
+
+ mock_db_session.session.scalars.return_value.all.side_effect = [
+ [mock_document], # documents
+ [], # segments
+ ]
+
+ # Act - should not raise exception
+ clean_dataset_task(
+ dataset_id=dataset_id,
+ tenant_id=tenant_id,
+ indexing_technique="high_quality",
+ index_struct='{"type": "paragraph"}',
+ collection_binding_id=collection_binding_id,
+ doc_form="paragraph_index",
+ )
+
+ # Assert
+ mock_storage.delete.assert_not_called()
+ mock_db_session.session.commit.assert_called_once()
+
+ def test_clean_dataset_task_session_always_closed(
+ self,
+ dataset_id,
+ tenant_id,
+ collection_binding_id,
+ mock_db_session,
+ mock_storage,
+ mock_index_processor_factory,
+ mock_get_image_upload_file_ids,
+ ):
+ """
+ Test that database session is always closed regardless of success or failure.
+
+ Expected behavior:
+ - Session.close() is called in finally block
+ """
+ # Act
+ clean_dataset_task(
+ dataset_id=dataset_id,
+ tenant_id=tenant_id,
+ indexing_technique="high_quality",
+ index_struct='{"type": "paragraph"}',
+ collection_binding_id=collection_binding_id,
+ doc_form="paragraph_index",
+ )
+
+ # Assert
+ mock_db_session.session.close.assert_called_once()
+
+
+# ============================================================================
+# Test IndexProcessor Parameters
+# ============================================================================
+
+
+class TestIndexProcessorParameters:
+ """Test cases for IndexProcessor clean method parameters."""
+
+ def test_clean_dataset_task_passes_correct_parameters_to_index_processor(
+ self,
+ dataset_id,
+ tenant_id,
+ collection_binding_id,
+ mock_db_session,
+ mock_storage,
+ mock_index_processor_factory,
+ mock_get_image_upload_file_ids,
+ ):
+ """
+ Test that correct parameters are passed to IndexProcessor.clean().
+
+ Expected behavior:
+ - with_keywords=True is passed
+ - delete_child_chunks=True is passed
+ - Dataset object with correct attributes is passed
+ """
+ # Arrange
+ indexing_technique = "high_quality"
+ index_struct = '{"type": "paragraph"}'
+
+ # Act
+ clean_dataset_task(
+ dataset_id=dataset_id,
+ tenant_id=tenant_id,
+ indexing_technique=indexing_technique,
+ index_struct=index_struct,
+ collection_binding_id=collection_binding_id,
+ doc_form="paragraph_index",
+ )
+
+ # Assert
+ mock_index_processor_factory["processor"].clean.assert_called_once()
+ call_args = mock_index_processor_factory["processor"].clean.call_args
+
+ # Verify positional arguments
+ dataset_arg = call_args[0][0]
+ assert dataset_arg.id == dataset_id
+ assert dataset_arg.tenant_id == tenant_id
+ assert dataset_arg.indexing_technique == indexing_technique
+ assert dataset_arg.index_struct == index_struct
+ assert dataset_arg.collection_binding_id == collection_binding_id
+
+ # Verify None is passed as second argument
+ assert call_args[0][1] is None
+
+ # Verify keyword arguments
+ assert call_args[1]["with_keywords"] is True
+ assert call_args[1]["delete_child_chunks"] is True