From df3c66a8acbfaae37f0697c6294b5b5f3f6ffe04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=A8=E4=B9=8B=E6=9C=AC=E6=BE=AA?= Date: Thu, 5 Mar 2026 01:35:24 +0800 Subject: [PATCH] test: migrate duplicate_document_indexing_task SQL tests to testcontainers (#32540) Co-authored-by: KinomotoMio <200703522+KinomotoMio@users.noreply.github.com> --- .../test_duplicate_document_indexing_task.py | 100 ++++- .../test_duplicate_document_indexing_task.py | 393 +----------------- 2 files changed, 97 insertions(+), 396 deletions(-) diff --git a/api/tests/test_containers_integration_tests/tasks/test_duplicate_document_indexing_task.py b/api/tests/test_containers_integration_tests/tasks/test_duplicate_document_indexing_task.py index b2e1ce3b89..c61e37b1e9 100644 --- a/api/tests/test_containers_integration_tests/tasks/test_duplicate_document_indexing_task.py +++ b/api/tests/test_containers_integration_tests/tasks/test_duplicate_document_indexing_task.py @@ -3,6 +3,7 @@ from unittest.mock import MagicMock, patch import pytest from faker import Faker +from core.indexing_runner import DocumentIsPausedError from enums.cloud_plan import CloudPlan from models import Account, Tenant, TenantAccountJoin, TenantAccountRole from models.dataset import Dataset, Document, DocumentSegment @@ -282,7 +283,7 @@ class TestDuplicateDocumentIndexingTasks: return dataset, documents - def test_duplicate_document_indexing_task_success( + def _test_duplicate_document_indexing_task_success( self, db_session_with_containers, mock_external_service_dependencies ): """ @@ -324,7 +325,7 @@ class TestDuplicateDocumentIndexingTasks: processed_documents = call_args[0][0] # First argument should be documents list assert len(processed_documents) == 3 - def test_duplicate_document_indexing_task_with_segment_cleanup( + def _test_duplicate_document_indexing_task_with_segment_cleanup( self, db_session_with_containers, mock_external_service_dependencies ): """ @@ -374,7 +375,7 @@ class TestDuplicateDocumentIndexingTasks: mock_external_service_dependencies["indexing_runner"].assert_called_once() mock_external_service_dependencies["indexing_runner_instance"].run.assert_called_once() - def test_duplicate_document_indexing_task_dataset_not_found( + def _test_duplicate_document_indexing_task_dataset_not_found( self, db_session_with_containers, mock_external_service_dependencies ): """ @@ -445,7 +446,7 @@ class TestDuplicateDocumentIndexingTasks: processed_documents = call_args[0][0] # First argument should be documents list assert len(processed_documents) == 2 # Only existing documents - def test_duplicate_document_indexing_task_indexing_runner_exception( + def _test_duplicate_document_indexing_task_indexing_runner_exception( self, db_session_with_containers, mock_external_service_dependencies ): """ @@ -486,7 +487,7 @@ class TestDuplicateDocumentIndexingTasks: assert updated_document.indexing_status == "parsing" assert updated_document.processing_started_at is not None - def test_duplicate_document_indexing_task_billing_sandbox_plan_batch_limit( + def _test_duplicate_document_indexing_task_billing_sandbox_plan_batch_limit( self, db_session_with_containers, mock_external_service_dependencies ): """ @@ -549,7 +550,7 @@ class TestDuplicateDocumentIndexingTasks: # Verify indexing runner was not called due to early validation error mock_external_service_dependencies["indexing_runner_instance"].run.assert_not_called() - def test_duplicate_document_indexing_task_billing_vector_space_limit_exceeded( + def _test_duplicate_document_indexing_task_billing_vector_space_limit_exceeded( self, db_session_with_containers, mock_external_service_dependencies ): """ @@ -783,3 +784,90 @@ class TestDuplicateDocumentIndexingTasks: document_ids=document_ids, ) mock_queue.delete_task_key.assert_not_called() + + def test_successful_duplicate_document_indexing( + self, db_session_with_containers, mock_external_service_dependencies + ): + """Test successful duplicate document indexing flow.""" + self._test_duplicate_document_indexing_task_success( + db_session_with_containers, mock_external_service_dependencies + ) + + def test_duplicate_document_indexing_dataset_not_found( + self, db_session_with_containers, mock_external_service_dependencies + ): + """Test duplicate document indexing when dataset is not found.""" + self._test_duplicate_document_indexing_task_dataset_not_found( + db_session_with_containers, mock_external_service_dependencies + ) + + def test_duplicate_document_indexing_with_billing_enabled_sandbox_plan( + self, db_session_with_containers, mock_external_service_dependencies + ): + """Test duplicate document indexing with billing enabled and sandbox plan.""" + self._test_duplicate_document_indexing_task_billing_sandbox_plan_batch_limit( + db_session_with_containers, mock_external_service_dependencies + ) + + def test_duplicate_document_indexing_with_billing_limit_exceeded( + self, db_session_with_containers, mock_external_service_dependencies + ): + """Test duplicate document indexing when billing limit is exceeded.""" + self._test_duplicate_document_indexing_task_billing_vector_space_limit_exceeded( + db_session_with_containers, mock_external_service_dependencies + ) + + def test_duplicate_document_indexing_runner_error( + self, db_session_with_containers, mock_external_service_dependencies + ): + """Test duplicate document indexing when IndexingRunner raises an error.""" + self._test_duplicate_document_indexing_task_indexing_runner_exception( + db_session_with_containers, mock_external_service_dependencies + ) + + def _test_duplicate_document_indexing_task_document_is_paused( + self, db_session_with_containers, mock_external_service_dependencies + ): + """Test duplicate document indexing when document is paused.""" + # Arrange + dataset, documents = self._create_test_dataset_and_documents( + db_session_with_containers, mock_external_service_dependencies, document_count=2 + ) + for document in documents: + document.is_paused = True + db_session_with_containers.add(document) + db_session_with_containers.commit() + + document_ids = [doc.id for doc in documents] + mock_external_service_dependencies["indexing_runner_instance"].run.side_effect = DocumentIsPausedError( + "Document paused" + ) + + # Act + _duplicate_document_indexing_task(dataset.id, document_ids) + db_session_with_containers.expire_all() + + # Assert + for doc_id in document_ids: + updated_document = db_session_with_containers.query(Document).where(Document.id == doc_id).first() + assert updated_document.is_paused is True + assert updated_document.indexing_status == "parsing" + assert updated_document.display_status == "paused" + assert updated_document.processing_started_at is not None + mock_external_service_dependencies["indexing_runner_instance"].run.assert_called_once() + + def test_duplicate_document_indexing_document_is_paused( + self, db_session_with_containers, mock_external_service_dependencies + ): + """Test duplicate document indexing when document is paused.""" + self._test_duplicate_document_indexing_task_document_is_paused( + db_session_with_containers, mock_external_service_dependencies + ) + + def test_duplicate_document_indexing_cleans_old_segments( + self, db_session_with_containers, mock_external_service_dependencies + ): + """Test that duplicate document indexing cleans old segments.""" + self._test_duplicate_document_indexing_task_with_segment_cleanup( + db_session_with_containers, mock_external_service_dependencies + ) diff --git a/api/tests/unit_tests/tasks/test_duplicate_document_indexing_task.py b/api/tests/unit_tests/tasks/test_duplicate_document_indexing_task.py index 68fb8b748f..f6dbc4275b 100644 --- a/api/tests/unit_tests/tasks/test_duplicate_document_indexing_task.py +++ b/api/tests/unit_tests/tasks/test_duplicate_document_indexing_task.py @@ -1,158 +1,38 @@ -""" -Unit tests for duplicate document indexing tasks. - -This module tests the duplicate document indexing task functionality including: -- Task enqueuing to different queues (normal, priority, tenant-isolated) -- Batch processing of multiple duplicate documents -- Progress tracking through task lifecycle -- Error handling and retry mechanisms -- Cleanup of old document data before re-indexing -""" +"""Unit tests for queue/wrapper behaviors in duplicate document indexing tasks (non-database logic).""" import uuid -from unittest.mock import MagicMock, Mock, patch +from unittest.mock import Mock, patch import pytest -from core.indexing_runner import DocumentIsPausedError, IndexingRunner from core.rag.pipeline.queue import TenantIsolatedTaskQueue -from enums.cloud_plan import CloudPlan -from models.dataset import Dataset, Document, DocumentSegment from tasks.duplicate_document_indexing_task import ( - _duplicate_document_indexing_task, _duplicate_document_indexing_task_with_tenant_queue, duplicate_document_indexing_task, normal_duplicate_document_indexing_task, priority_duplicate_document_indexing_task, ) -# ============================================================================ -# Fixtures -# ============================================================================ - @pytest.fixture def tenant_id(): - """Generate a unique tenant ID for testing.""" return str(uuid.uuid4()) @pytest.fixture def dataset_id(): - """Generate a unique dataset ID for testing.""" return str(uuid.uuid4()) @pytest.fixture def document_ids(): - """Generate a list of document IDs for testing.""" return [str(uuid.uuid4()) for _ in range(3)] -@pytest.fixture -def mock_dataset(dataset_id, tenant_id): - """Create a mock Dataset object.""" - dataset = Mock(spec=Dataset) - dataset.id = dataset_id - dataset.tenant_id = tenant_id - dataset.indexing_technique = "high_quality" - dataset.embedding_model_provider = "openai" - dataset.embedding_model = "text-embedding-ada-002" - return dataset - - -@pytest.fixture -def mock_documents(document_ids, dataset_id): - """Create mock Document objects.""" - documents = [] - for doc_id in document_ids: - doc = Mock(spec=Document) - doc.id = doc_id - doc.dataset_id = dataset_id - doc.indexing_status = "waiting" - doc.error = None - doc.stopped_at = None - doc.processing_started_at = None - doc.doc_form = "text_model" - documents.append(doc) - return documents - - -@pytest.fixture -def mock_document_segments(document_ids): - """Create mock DocumentSegment objects.""" - segments = [] - for doc_id in document_ids: - for i in range(3): - segment = Mock(spec=DocumentSegment) - segment.id = str(uuid.uuid4()) - segment.document_id = doc_id - segment.index_node_id = f"node-{doc_id}-{i}" - segments.append(segment) - return segments - - -@pytest.fixture -def mock_db_session(): - """Mock database session via session_factory.create_session().""" - with patch("tasks.duplicate_document_indexing_task.session_factory", autospec=True) as mock_sf: - session = MagicMock() - # Allow tests to observe session.close() via context manager teardown - session.close = MagicMock() - cm = MagicMock() - cm.__enter__.return_value = session - - def _exit_side_effect(*args, **kwargs): - session.close() - - cm.__exit__.side_effect = _exit_side_effect - mock_sf.create_session.return_value = cm - - query = MagicMock() - session.query.return_value = query - query.where.return_value = query - session.scalars.return_value = MagicMock() - yield session - - -@pytest.fixture -def mock_indexing_runner(): - """Mock IndexingRunner.""" - with patch("tasks.duplicate_document_indexing_task.IndexingRunner", autospec=True) as mock_runner_class: - mock_runner = MagicMock(spec=IndexingRunner) - mock_runner_class.return_value = mock_runner - yield mock_runner - - -@pytest.fixture -def mock_feature_service(): - """Mock FeatureService.""" - with patch("tasks.duplicate_document_indexing_task.FeatureService", autospec=True) as mock_service: - mock_features = Mock() - mock_features.billing = Mock() - mock_features.billing.enabled = False - mock_features.vector_space = Mock() - mock_features.vector_space.size = 0 - mock_features.vector_space.limit = 1000 - mock_service.get_features.return_value = mock_features - yield mock_service - - -@pytest.fixture -def mock_index_processor_factory(): - """Mock IndexProcessorFactory.""" - with patch("tasks.duplicate_document_indexing_task.IndexProcessorFactory", autospec=True) as mock_factory: - mock_processor = MagicMock() - mock_processor.clean = Mock() - mock_factory.return_value.init_index_processor.return_value = mock_processor - yield mock_factory - - @pytest.fixture def mock_tenant_isolated_queue(): - """Mock TenantIsolatedTaskQueue.""" with patch("tasks.duplicate_document_indexing_task.TenantIsolatedTaskQueue", autospec=True) as mock_queue_class: - mock_queue = MagicMock(spec=TenantIsolatedTaskQueue) + mock_queue = Mock(spec=TenantIsolatedTaskQueue) mock_queue.pull_tasks.return_value = [] mock_queue.delete_task_key = Mock() mock_queue.set_task_waiting_time = Mock() @@ -160,11 +40,6 @@ def mock_tenant_isolated_queue(): yield mock_queue -# ============================================================================ -# Tests for deprecated duplicate_document_indexing_task -# ============================================================================ - - class TestDuplicateDocumentIndexingTask: """Tests for the deprecated duplicate_document_indexing_task function.""" @@ -190,258 +65,6 @@ class TestDuplicateDocumentIndexingTask: mock_core_func.assert_called_once_with(dataset_id, document_ids) -# ============================================================================ -# Tests for _duplicate_document_indexing_task core function -# ============================================================================ - - -class TestDuplicateDocumentIndexingTaskCore: - """Tests for the _duplicate_document_indexing_task core function.""" - - def test_successful_duplicate_document_indexing( - self, - mock_db_session, - mock_indexing_runner, - mock_feature_service, - mock_index_processor_factory, - mock_dataset, - mock_documents, - mock_document_segments, - dataset_id, - document_ids, - ): - """Test successful duplicate document indexing flow.""" - # Arrange - # Dataset via query.first() - mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset - # scalars() call sequence: - # 1) documents list - # 2..N) segments per document - - def _scalars_side_effect(*args, **kwargs): - m = MagicMock() - # First call returns documents; subsequent calls return segments - if not hasattr(_scalars_side_effect, "_calls"): - _scalars_side_effect._calls = 0 - if _scalars_side_effect._calls == 0: - m.all.return_value = mock_documents - else: - m.all.return_value = mock_document_segments - _scalars_side_effect._calls += 1 - return m - - mock_db_session.scalars.side_effect = _scalars_side_effect - - # Act - _duplicate_document_indexing_task(dataset_id, document_ids) - - # Assert - # Verify IndexingRunner was called - mock_indexing_runner.run.assert_called_once() - - # Verify all documents were set to parsing status - for doc in mock_documents: - assert doc.indexing_status == "parsing" - assert doc.processing_started_at is not None - - # Verify session operations - assert mock_db_session.commit.called - assert mock_db_session.close.called - - def test_duplicate_document_indexing_dataset_not_found(self, mock_db_session, dataset_id, document_ids): - """Test duplicate document indexing when dataset is not found.""" - # Arrange - mock_db_session.query.return_value.where.return_value.first.return_value = None - - # Act - _duplicate_document_indexing_task(dataset_id, document_ids) - - # Assert - # Should close the session at least once - assert mock_db_session.close.called - - def test_duplicate_document_indexing_with_billing_enabled_sandbox_plan( - self, - mock_db_session, - mock_feature_service, - mock_dataset, - dataset_id, - document_ids, - ): - """Test duplicate document indexing with billing enabled and sandbox plan.""" - # Arrange - mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset - mock_features = mock_feature_service.get_features.return_value - mock_features.billing.enabled = True - mock_features.billing.subscription.plan = CloudPlan.SANDBOX - - # Act - _duplicate_document_indexing_task(dataset_id, document_ids) - - # Assert - # For sandbox plan with multiple documents, should fail - mock_db_session.commit.assert_called() - - def test_duplicate_document_indexing_with_billing_limit_exceeded( - self, - mock_db_session, - mock_feature_service, - mock_dataset, - mock_documents, - dataset_id, - document_ids, - ): - """Test duplicate document indexing when billing limit is exceeded.""" - # Arrange - mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset - # First scalars() -> documents; subsequent -> empty segments - - def _scalars_side_effect(*args, **kwargs): - m = MagicMock() - if not hasattr(_scalars_side_effect, "_calls"): - _scalars_side_effect._calls = 0 - if _scalars_side_effect._calls == 0: - m.all.return_value = mock_documents - else: - m.all.return_value = [] - _scalars_side_effect._calls += 1 - return m - - mock_db_session.scalars.side_effect = _scalars_side_effect - mock_features = mock_feature_service.get_features.return_value - mock_features.billing.enabled = True - mock_features.billing.subscription.plan = CloudPlan.TEAM - mock_features.vector_space.size = 990 - mock_features.vector_space.limit = 1000 - - # Act - _duplicate_document_indexing_task(dataset_id, document_ids) - - # Assert - # Should commit the session - assert mock_db_session.commit.called - # Should close the session - assert mock_db_session.close.called - - def test_duplicate_document_indexing_runner_error( - self, - mock_db_session, - mock_indexing_runner, - mock_feature_service, - mock_index_processor_factory, - mock_dataset, - mock_documents, - dataset_id, - document_ids, - ): - """Test duplicate document indexing when IndexingRunner raises an error.""" - # Arrange - mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset - - def _scalars_side_effect(*args, **kwargs): - m = MagicMock() - if not hasattr(_scalars_side_effect, "_calls"): - _scalars_side_effect._calls = 0 - if _scalars_side_effect._calls == 0: - m.all.return_value = mock_documents - else: - m.all.return_value = [] - _scalars_side_effect._calls += 1 - return m - - mock_db_session.scalars.side_effect = _scalars_side_effect - mock_indexing_runner.run.side_effect = Exception("Indexing error") - - # Act - _duplicate_document_indexing_task(dataset_id, document_ids) - - # Assert - # Should close the session even after error - mock_db_session.close.assert_called_once() - - def test_duplicate_document_indexing_document_is_paused( - self, - mock_db_session, - mock_indexing_runner, - mock_feature_service, - mock_index_processor_factory, - mock_dataset, - mock_documents, - dataset_id, - document_ids, - ): - """Test duplicate document indexing when document is paused.""" - # Arrange - mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset - - def _scalars_side_effect(*args, **kwargs): - m = MagicMock() - if not hasattr(_scalars_side_effect, "_calls"): - _scalars_side_effect._calls = 0 - if _scalars_side_effect._calls == 0: - m.all.return_value = mock_documents - else: - m.all.return_value = [] - _scalars_side_effect._calls += 1 - return m - - mock_db_session.scalars.side_effect = _scalars_side_effect - mock_indexing_runner.run.side_effect = DocumentIsPausedError("Document paused") - - # Act - _duplicate_document_indexing_task(dataset_id, document_ids) - - # Assert - # Should handle DocumentIsPausedError gracefully - mock_db_session.close.assert_called_once() - - def test_duplicate_document_indexing_cleans_old_segments( - self, - mock_db_session, - mock_indexing_runner, - mock_feature_service, - mock_index_processor_factory, - mock_dataset, - mock_documents, - mock_document_segments, - dataset_id, - document_ids, - ): - """Test that duplicate document indexing cleans old segments.""" - # Arrange - mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset - - def _scalars_side_effect(*args, **kwargs): - m = MagicMock() - if not hasattr(_scalars_side_effect, "_calls"): - _scalars_side_effect._calls = 0 - if _scalars_side_effect._calls == 0: - m.all.return_value = mock_documents - else: - m.all.return_value = mock_document_segments - _scalars_side_effect._calls += 1 - return m - - mock_db_session.scalars.side_effect = _scalars_side_effect - mock_processor = mock_index_processor_factory.return_value.init_index_processor.return_value - - # Act - _duplicate_document_indexing_task(dataset_id, document_ids) - - # Assert - # Verify clean was called for each document - assert mock_processor.clean.call_count == len(mock_documents) - - # Verify segments were deleted in batch (DELETE FROM document_segments) - execute_sqls = [" ".join(str(c[0][0]).split()) for c in mock_db_session.execute.call_args_list] - assert any("DELETE FROM document_segments" in sql for sql in execute_sqls) - - -# ============================================================================ -# Tests for tenant queue wrapper function -# ============================================================================ - - class TestDuplicateDocumentIndexingTaskWithTenantQueue: """Tests for _duplicate_document_indexing_task_with_tenant_queue function.""" @@ -536,11 +159,6 @@ class TestDuplicateDocumentIndexingTaskWithTenantQueue: mock_tenant_isolated_queue.pull_tasks.assert_called_once() -# ============================================================================ -# Tests for normal_duplicate_document_indexing_task -# ============================================================================ - - class TestNormalDuplicateDocumentIndexingTask: """Tests for normal_duplicate_document_indexing_task function.""" @@ -581,11 +199,6 @@ class TestNormalDuplicateDocumentIndexingTask: ) -# ============================================================================ -# Tests for priority_duplicate_document_indexing_task -# ============================================================================ - - class TestPriorityDuplicateDocumentIndexingTask: """Tests for priority_duplicate_document_indexing_task function."""