From 0bf5f4df3b1806d89e8cc6b34c12cf0726915213 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=9C=A8=E4=B9=8B=E6=9C=AC=E6=BE=AA?= Date: Fri, 27 Feb 2026 05:06:42 +0800 Subject: [PATCH] test: migrate dataset_indexing_task SQL tests to testcontainers (#32531) Co-authored-by: KinomotoMio <200703522+KinomotoMio@users.noreply.github.com> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- .../tasks/test_dataset_indexing_task.py | 704 ++++++++ .../tasks/test_dataset_indexing_task.py | 1502 +---------------- 2 files changed, 705 insertions(+), 1501 deletions(-) create mode 100644 api/tests/test_containers_integration_tests/tasks/test_dataset_indexing_task.py diff --git a/api/tests/test_containers_integration_tests/tasks/test_dataset_indexing_task.py b/api/tests/test_containers_integration_tests/tasks/test_dataset_indexing_task.py new file mode 100644 index 0000000000..c3ad18ecec --- /dev/null +++ b/api/tests/test_containers_integration_tests/tasks/test_dataset_indexing_task.py @@ -0,0 +1,704 @@ +"""Integration tests for dataset indexing task SQL behaviors using testcontainers.""" + +import uuid +from collections.abc import Sequence +from unittest.mock import MagicMock, patch + +import pytest +from faker import Faker + +from core.indexing_runner import DocumentIsPausedError +from enums.cloud_plan import CloudPlan +from models import Account, Tenant, TenantAccountJoin, TenantAccountRole +from models.dataset import Dataset, Document +from tasks.document_indexing_task import ( + _document_indexing, + _document_indexing_with_tenant_queue, + document_indexing_task, + normal_document_indexing_task, + priority_document_indexing_task, +) + + +class _TrackedSessionContext: + def __init__(self, original_context_manager, opened_sessions: list, closed_sessions: list): + self._original_context_manager = original_context_manager + self._opened_sessions = opened_sessions + self._closed_sessions = closed_sessions + self._close_patcher = None + self._session = None + + def __enter__(self): + self._session = self._original_context_manager.__enter__() + self._opened_sessions.append(self._session) + original_close = self._session.close + + def _tracked_close(*args, **kwargs): + self._closed_sessions.append(self._session) + return original_close(*args, **kwargs) + + self._close_patcher = patch.object(self._session, "close", side_effect=_tracked_close) + self._close_patcher.start() + return self._session + + def __exit__(self, exc_type, exc_val, exc_tb): + try: + return self._original_context_manager.__exit__(exc_type, exc_val, exc_tb) + finally: + if self._close_patcher is not None: + self._close_patcher.stop() + + +@pytest.fixture(autouse=True) +def _ensure_testcontainers_db(db_session_with_containers): + """Ensure this suite always runs on testcontainers infrastructure.""" + return db_session_with_containers + + +@pytest.fixture +def session_close_tracker(): + """Track all sessions opened by session_factory and which were closed.""" + opened_sessions = [] + closed_sessions = [] + + from tasks import document_indexing_task as task_module + + original_create_session = task_module.session_factory.create_session + + def _tracked_create_session(*args, **kwargs): + original_context_manager = original_create_session(*args, **kwargs) + return _TrackedSessionContext(original_context_manager, opened_sessions, closed_sessions) + + with patch.object(task_module.session_factory, "create_session", side_effect=_tracked_create_session): + yield {"opened_sessions": opened_sessions, "closed_sessions": closed_sessions} + + +@pytest.fixture +def patched_external_dependencies(): + """Patch non-DB collaborators while keeping database behavior real.""" + with ( + patch("tasks.document_indexing_task.IndexingRunner") as mock_indexing_runner, + patch("tasks.document_indexing_task.FeatureService") as mock_feature_service, + patch("tasks.document_indexing_task.generate_summary_index_task") as mock_summary_task, + ): + mock_runner_instance = MagicMock() + mock_indexing_runner.return_value = mock_runner_instance + + mock_features = MagicMock() + mock_features.billing.enabled = False + mock_features.billing.subscription.plan = CloudPlan.PROFESSIONAL + mock_features.vector_space.limit = 100 + mock_features.vector_space.size = 0 + mock_feature_service.get_features.return_value = mock_features + + yield { + "indexing_runner": mock_indexing_runner, + "indexing_runner_instance": mock_runner_instance, + "feature_service": mock_feature_service, + "features": mock_features, + "summary_task": mock_summary_task, + } + + +class TestDatasetIndexingTaskIntegration: + """1:1 SQL test migration from unit tests to testcontainers integration tests.""" + + def _create_test_dataset_and_documents( + self, + db_session_with_containers, + *, + document_count: int = 3, + document_ids: Sequence[str] | None = None, + ) -> tuple[Dataset, list[Document]]: + """Create a tenant dataset and waiting documents used by indexing tests.""" + fake = Faker() + + account = Account( + email=fake.email(), + name=fake.name(), + interface_language="en-US", + status="active", + ) + db_session_with_containers.add(account) + db_session_with_containers.flush() + + tenant = Tenant(name=fake.company(), status="normal") + db_session_with_containers.add(tenant) + db_session_with_containers.flush() + + join = TenantAccountJoin( + tenant_id=tenant.id, + account_id=account.id, + role=TenantAccountRole.OWNER, + current=True, + ) + db_session_with_containers.add(join) + + dataset = Dataset( + id=fake.uuid4(), + tenant_id=tenant.id, + name=fake.company(), + description=fake.text(max_nb_chars=100), + data_source_type="upload_file", + indexing_technique="high_quality", + created_by=account.id, + ) + db_session_with_containers.add(dataset) + + if document_ids is None: + document_ids = [str(uuid.uuid4()) for _ in range(document_count)] + + documents = [] + for position, document_id in enumerate(document_ids): + document = Document( + id=document_id, + tenant_id=tenant.id, + dataset_id=dataset.id, + position=position, + data_source_type="upload_file", + batch="test_batch", + name=f"doc-{position}.txt", + created_from="upload_file", + created_by=account.id, + indexing_status="waiting", + enabled=True, + ) + db_session_with_containers.add(document) + documents.append(document) + + db_session_with_containers.commit() + db_session_with_containers.refresh(dataset) + + return dataset, documents + + def _query_document(self, db_session_with_containers, document_id: str) -> Document | None: + """Return the latest persisted document state.""" + return db_session_with_containers.query(Document).where(Document.id == document_id).first() + + def _assert_documents_parsing(self, db_session_with_containers, document_ids: Sequence[str]) -> None: + """Assert all target documents are persisted in parsing status.""" + db_session_with_containers.expire_all() + for document_id in document_ids: + updated = self._query_document(db_session_with_containers, document_id) + assert updated is not None + assert updated.indexing_status == "parsing" + assert updated.processing_started_at is not None + + def _assert_documents_error_contains( + self, + db_session_with_containers, + document_ids: Sequence[str], + expected_error_substring: str, + ) -> None: + """Assert all target documents are persisted in error status with message.""" + db_session_with_containers.expire_all() + for document_id in document_ids: + updated = self._query_document(db_session_with_containers, document_id) + assert updated is not None + assert updated.indexing_status == "error" + assert updated.error is not None + assert expected_error_substring in updated.error + assert updated.stopped_at is not None + + def _assert_all_opened_sessions_closed(self, session_close_tracker: dict) -> None: + """Assert that every opened session is eventually closed.""" + opened = session_close_tracker["opened_sessions"] + closed = session_close_tracker["closed_sessions"] + opened_ids = {id(session) for session in opened} + closed_ids = {id(session) for session in closed} + assert len(opened) >= 2 + assert opened_ids <= closed_ids + + def test_legacy_document_indexing_task_still_works(self, db_session_with_containers, patched_external_dependencies): + """Ensure the legacy task entrypoint still updates parsing status.""" + # Arrange + dataset, documents = self._create_test_dataset_and_documents(db_session_with_containers, document_count=2) + document_ids = [doc.id for doc in documents] + + # Act + document_indexing_task(dataset.id, document_ids) + + # Assert + patched_external_dependencies["indexing_runner_instance"].run.assert_called_once() + self._assert_documents_parsing(db_session_with_containers, document_ids) + + def test_batch_processing_multiple_documents(self, db_session_with_containers, patched_external_dependencies): + """Process multiple documents in one batch.""" + # Arrange + dataset, documents = self._create_test_dataset_and_documents(db_session_with_containers, document_count=3) + document_ids = [doc.id for doc in documents] + + # Act + _document_indexing(dataset.id, document_ids) + + # Assert + patched_external_dependencies["indexing_runner_instance"].run.assert_called_once() + run_args = patched_external_dependencies["indexing_runner_instance"].run.call_args[0][0] + assert len(run_args) == len(document_ids) + self._assert_documents_parsing(db_session_with_containers, document_ids) + + def test_batch_processing_with_limit_check(self, db_session_with_containers, patched_external_dependencies): + """Reject batches larger than configured upload limit. + + This test patches config only to force a deterministic limit branch while keeping SQL writes real. + """ + # Arrange + dataset, documents = self._create_test_dataset_and_documents(db_session_with_containers, document_count=3) + document_ids = [doc.id for doc in documents] + features = patched_external_dependencies["features"] + features.billing.enabled = True + features.billing.subscription.plan = CloudPlan.PROFESSIONAL + features.vector_space.limit = 100 + features.vector_space.size = 50 + + # Act + with patch("tasks.document_indexing_task.dify_config.BATCH_UPLOAD_LIMIT", "2"): + _document_indexing(dataset.id, document_ids) + + # Assert + patched_external_dependencies["indexing_runner_instance"].run.assert_not_called() + self._assert_documents_error_contains(db_session_with_containers, document_ids, "batch upload limit") + + def test_batch_processing_sandbox_plan_single_document_only( + self, db_session_with_containers, patched_external_dependencies + ): + """Reject multi-document upload under sandbox plan.""" + # Arrange + dataset, documents = self._create_test_dataset_and_documents(db_session_with_containers, document_count=2) + document_ids = [doc.id for doc in documents] + features = patched_external_dependencies["features"] + features.billing.enabled = True + features.billing.subscription.plan = CloudPlan.SANDBOX + + # Act + _document_indexing(dataset.id, document_ids) + + # Assert + patched_external_dependencies["indexing_runner_instance"].run.assert_not_called() + self._assert_documents_error_contains(db_session_with_containers, document_ids, "does not support batch upload") + + def test_batch_processing_empty_document_list(self, db_session_with_containers, patched_external_dependencies): + """Handle empty list input without failing.""" + # Arrange + dataset, _ = self._create_test_dataset_and_documents(db_session_with_containers, document_count=0) + + # Act + _document_indexing(dataset.id, []) + + # Assert + patched_external_dependencies["indexing_runner_instance"].run.assert_called_once_with([]) + + def test_tenant_queue_dispatches_next_task_after_completion( + self, db_session_with_containers, patched_external_dependencies + ): + """Dispatch the next queued task after current tenant task completes. + + Queue APIs are patched to isolate dispatch side effects while preserving DB assertions. + """ + # Arrange + dataset, documents = self._create_test_dataset_and_documents(db_session_with_containers, document_count=1) + document_ids = [doc.id for doc in documents] + next_task = { + "tenant_id": dataset.tenant_id, + "dataset_id": dataset.id, + "document_ids": [str(uuid.uuid4())], + } + task_dispatch_spy = MagicMock() + + # Act + with ( + patch("tasks.document_indexing_task.TenantIsolatedTaskQueue.pull_tasks", return_value=[next_task]), + patch("tasks.document_indexing_task.TenantIsolatedTaskQueue.set_task_waiting_time") as set_waiting_spy, + patch("tasks.document_indexing_task.TenantIsolatedTaskQueue.delete_task_key") as delete_key_spy, + ): + _document_indexing_with_tenant_queue(dataset.tenant_id, dataset.id, document_ids, task_dispatch_spy) + + # Assert + task_dispatch_spy.delay.assert_called_once_with( + tenant_id=next_task["tenant_id"], + dataset_id=next_task["dataset_id"], + document_ids=next_task["document_ids"], + ) + set_waiting_spy.assert_called_once() + delete_key_spy.assert_not_called() + + def test_tenant_queue_deletes_running_key_when_no_follow_up_tasks( + self, db_session_with_containers, patched_external_dependencies + ): + """Delete tenant running flag when queue has no pending tasks. + + Queue APIs are patched to isolate dispatch side effects while preserving DB assertions. + """ + # Arrange + dataset, documents = self._create_test_dataset_and_documents(db_session_with_containers, document_count=1) + document_ids = [doc.id for doc in documents] + task_dispatch_spy = MagicMock() + + # Act + with ( + patch("tasks.document_indexing_task.TenantIsolatedTaskQueue.pull_tasks", return_value=[]), + patch("tasks.document_indexing_task.TenantIsolatedTaskQueue.delete_task_key") as delete_key_spy, + ): + _document_indexing_with_tenant_queue(dataset.tenant_id, dataset.id, document_ids, task_dispatch_spy) + + # Assert + task_dispatch_spy.delay.assert_not_called() + delete_key_spy.assert_called_once() + + def test_validation_failure_sets_error_status_when_vector_space_at_limit( + self, db_session_with_containers, patched_external_dependencies + ): + """Set error status when vector space validation fails before runner phase.""" + # Arrange + dataset, documents = self._create_test_dataset_and_documents(db_session_with_containers, document_count=3) + document_ids = [doc.id for doc in documents] + features = patched_external_dependencies["features"] + features.billing.enabled = True + features.billing.subscription.plan = CloudPlan.PROFESSIONAL + features.vector_space.limit = 100 + features.vector_space.size = 100 + + # Act + _document_indexing(dataset.id, document_ids) + + # Assert + patched_external_dependencies["indexing_runner_instance"].run.assert_not_called() + self._assert_documents_error_contains(db_session_with_containers, document_ids, "over the limit") + + def test_runner_exception_does_not_crash_indexing_task( + self, db_session_with_containers, patched_external_dependencies + ): + """Catch generic runner exceptions without crashing the task.""" + # Arrange + dataset, documents = self._create_test_dataset_and_documents(db_session_with_containers, document_count=2) + document_ids = [doc.id for doc in documents] + patched_external_dependencies["indexing_runner_instance"].run.side_effect = Exception("runner failed") + + # Act + _document_indexing(dataset.id, document_ids) + + # Assert + patched_external_dependencies["indexing_runner_instance"].run.assert_called_once() + self._assert_documents_parsing(db_session_with_containers, document_ids) + + def test_document_paused_error_handling(self, db_session_with_containers, patched_external_dependencies): + """Handle DocumentIsPausedError and keep persisted state consistent.""" + # Arrange + dataset, documents = self._create_test_dataset_and_documents(db_session_with_containers, document_count=2) + document_ids = [doc.id for doc in documents] + patched_external_dependencies["indexing_runner_instance"].run.side_effect = DocumentIsPausedError("paused") + + # Act + _document_indexing(dataset.id, document_ids) + + # Assert + patched_external_dependencies["indexing_runner_instance"].run.assert_called_once() + self._assert_documents_parsing(db_session_with_containers, document_ids) + + def test_dataset_not_found_error_handling(self, patched_external_dependencies): + """Exit gracefully when dataset does not exist.""" + # Arrange + missing_dataset_id = str(uuid.uuid4()) + missing_document_id = str(uuid.uuid4()) + + # Act + _document_indexing(missing_dataset_id, [missing_document_id]) + + # Assert + patched_external_dependencies["indexing_runner_instance"].run.assert_not_called() + + def test_tenant_queue_error_handling_still_processes_next_task( + self, db_session_with_containers, patched_external_dependencies + ): + """Even on current task failure, enqueue the next waiting tenant task. + + Queue APIs are patched to isolate dispatch side effects while preserving DB assertions. + """ + # Arrange + dataset, documents = self._create_test_dataset_and_documents(db_session_with_containers, document_count=1) + document_ids = [doc.id for doc in documents] + next_task = { + "tenant_id": dataset.tenant_id, + "dataset_id": dataset.id, + "document_ids": [str(uuid.uuid4())], + } + task_dispatch_spy = MagicMock() + + # Act + with ( + patch("tasks.document_indexing_task._document_indexing", side_effect=Exception("failed")), + patch("tasks.document_indexing_task.TenantIsolatedTaskQueue.pull_tasks", return_value=[next_task]), + patch("tasks.document_indexing_task.TenantIsolatedTaskQueue.set_task_waiting_time"), + ): + _document_indexing_with_tenant_queue(dataset.tenant_id, dataset.id, document_ids, task_dispatch_spy) + + # Assert + task_dispatch_spy.delay.assert_called_once() + + def test_sessions_close_on_successful_indexing( + self, + db_session_with_containers, + patched_external_dependencies, + session_close_tracker, + ): + """Close all opened sessions in successful indexing path.""" + # Arrange + dataset, documents = self._create_test_dataset_and_documents(db_session_with_containers, document_count=2) + document_ids = [doc.id for doc in documents] + + # Act + _document_indexing(dataset.id, document_ids) + + # Assert + self._assert_all_opened_sessions_closed(session_close_tracker) + + def test_sessions_close_when_runner_raises( + self, + db_session_with_containers, + patched_external_dependencies, + session_close_tracker, + ): + """Close opened sessions even when runner fails.""" + # Arrange + dataset, documents = self._create_test_dataset_and_documents(db_session_with_containers, document_count=2) + document_ids = [doc.id for doc in documents] + patched_external_dependencies["indexing_runner_instance"].run.side_effect = Exception("boom") + + # Act + _document_indexing(dataset.id, document_ids) + + # Assert + self._assert_all_opened_sessions_closed(session_close_tracker) + + def test_multiple_documents_with_mixed_success_and_failure( + self, db_session_with_containers, patched_external_dependencies + ): + """Process only existing documents when request includes missing ids.""" + # Arrange + dataset, documents = self._create_test_dataset_and_documents(db_session_with_containers, document_count=2) + existing_ids = [doc.id for doc in documents] + mixed_ids = [existing_ids[0], str(uuid.uuid4()), existing_ids[1]] + + # Act + _document_indexing(dataset.id, mixed_ids) + + # Assert + run_args = patched_external_dependencies["indexing_runner_instance"].run.call_args[0][0] + assert len(run_args) == 2 + self._assert_documents_parsing(db_session_with_containers, existing_ids) + + def test_tenant_queue_dispatches_up_to_concurrency_limit( + self, db_session_with_containers, patched_external_dependencies + ): + """Dispatch only up to configured concurrency under queued backlog burst. + + Queue APIs are patched to isolate dispatch side effects while preserving DB assertions. + """ + # Arrange + dataset, documents = self._create_test_dataset_and_documents(db_session_with_containers, document_count=1) + document_ids = [doc.id for doc in documents] + concurrency_limit = 3 + backlog_size = 20 + pending_tasks = [ + {"tenant_id": dataset.tenant_id, "dataset_id": dataset.id, "document_ids": [f"doc_{idx}"]} + for idx in range(backlog_size) + ] + task_dispatch_spy = MagicMock() + + # Act + with ( + patch("tasks.document_indexing_task.dify_config.TENANT_ISOLATED_TASK_CONCURRENCY", concurrency_limit), + patch( + "tasks.document_indexing_task.TenantIsolatedTaskQueue.pull_tasks", + return_value=pending_tasks[:concurrency_limit], + ), + patch("tasks.document_indexing_task.TenantIsolatedTaskQueue.set_task_waiting_time") as set_waiting_spy, + ): + _document_indexing_with_tenant_queue(dataset.tenant_id, dataset.id, document_ids, task_dispatch_spy) + + # Assert + assert task_dispatch_spy.delay.call_count == concurrency_limit + assert set_waiting_spy.call_count == concurrency_limit + + def test_task_queue_fifo_ordering(self, db_session_with_containers, patched_external_dependencies): + """Keep FIFO ordering when dispatching next queued tasks. + + Queue APIs are patched to isolate dispatch side effects while preserving DB assertions. + """ + # Arrange + dataset, documents = self._create_test_dataset_and_documents(db_session_with_containers, document_count=1) + document_ids = [doc.id for doc in documents] + ordered_tasks = [ + {"tenant_id": dataset.tenant_id, "dataset_id": dataset.id, "document_ids": ["task_A"]}, + {"tenant_id": dataset.tenant_id, "dataset_id": dataset.id, "document_ids": ["task_B"]}, + {"tenant_id": dataset.tenant_id, "dataset_id": dataset.id, "document_ids": ["task_C"]}, + ] + task_dispatch_spy = MagicMock() + + # Act + with ( + patch("tasks.document_indexing_task.dify_config.TENANT_ISOLATED_TASK_CONCURRENCY", 3), + patch("tasks.document_indexing_task.TenantIsolatedTaskQueue.pull_tasks", return_value=ordered_tasks), + patch("tasks.document_indexing_task.TenantIsolatedTaskQueue.set_task_waiting_time"), + ): + _document_indexing_with_tenant_queue(dataset.tenant_id, dataset.id, document_ids, task_dispatch_spy) + + # Assert + assert task_dispatch_spy.delay.call_count == 3 + for index, expected_task in enumerate(ordered_tasks): + assert task_dispatch_spy.delay.call_args_list[index].kwargs["document_ids"] == expected_task["document_ids"] + + def test_billing_disabled_skips_limit_checks(self, db_session_with_containers, patched_external_dependencies): + """Skip limit checks when billing feature is disabled.""" + # Arrange + large_document_ids = [str(uuid.uuid4()) for _ in range(100)] + dataset, _ = self._create_test_dataset_and_documents( + db_session_with_containers, + document_ids=large_document_ids, + ) + features = patched_external_dependencies["features"] + features.billing.enabled = False + + # Act + _document_indexing(dataset.id, large_document_ids) + + # Assert + run_args = patched_external_dependencies["indexing_runner_instance"].run.call_args[0][0] + assert len(run_args) == 100 + self._assert_documents_parsing(db_session_with_containers, large_document_ids) + + def test_complete_workflow_normal_task(self, db_session_with_containers, patched_external_dependencies): + """Run end-to-end normal queue workflow with tenant queue cleanup. + + Queue APIs are patched to isolate dispatch side effects while preserving DB assertions. + """ + # Arrange + dataset, documents = self._create_test_dataset_and_documents(db_session_with_containers, document_count=2) + document_ids = [doc.id for doc in documents] + + # Act + with ( + patch("tasks.document_indexing_task.TenantIsolatedTaskQueue.pull_tasks", return_value=[]), + patch("tasks.document_indexing_task.TenantIsolatedTaskQueue.delete_task_key") as delete_key_spy, + ): + normal_document_indexing_task(dataset.tenant_id, dataset.id, document_ids) + + # Assert + patched_external_dependencies["indexing_runner_instance"].run.assert_called_once() + self._assert_documents_parsing(db_session_with_containers, document_ids) + delete_key_spy.assert_called_once() + + def test_complete_workflow_priority_task(self, db_session_with_containers, patched_external_dependencies): + """Run end-to-end priority queue workflow with tenant queue cleanup. + + Queue APIs are patched to isolate dispatch side effects while preserving DB assertions. + """ + # Arrange + dataset, documents = self._create_test_dataset_and_documents(db_session_with_containers, document_count=2) + document_ids = [doc.id for doc in documents] + + # Act + with ( + patch("tasks.document_indexing_task.TenantIsolatedTaskQueue.pull_tasks", return_value=[]), + patch("tasks.document_indexing_task.TenantIsolatedTaskQueue.delete_task_key") as delete_key_spy, + ): + priority_document_indexing_task(dataset.tenant_id, dataset.id, document_ids) + + # Assert + patched_external_dependencies["indexing_runner_instance"].run.assert_called_once() + self._assert_documents_parsing(db_session_with_containers, document_ids) + delete_key_spy.assert_called_once() + + def test_single_document_processing(self, db_session_with_containers, patched_external_dependencies): + """Process the minimum batch size (single document).""" + # Arrange + dataset, documents = self._create_test_dataset_and_documents(db_session_with_containers, document_count=1) + document_id = documents[0].id + + # Act + _document_indexing(dataset.id, [document_id]) + + # Assert + run_args = patched_external_dependencies["indexing_runner_instance"].run.call_args[0][0] + assert len(run_args) == 1 + self._assert_documents_parsing(db_session_with_containers, [document_id]) + + def test_document_with_special_characters_in_id(self, db_session_with_containers, patched_external_dependencies): + """Handle standard UUID ids with hyphen characters safely.""" + # Arrange + special_document_id = str(uuid.uuid4()) + dataset, _ = self._create_test_dataset_and_documents( + db_session_with_containers, + document_ids=[special_document_id], + ) + + # Act + _document_indexing(dataset.id, [special_document_id]) + + # Assert + self._assert_documents_parsing(db_session_with_containers, [special_document_id]) + + def test_zero_vector_space_limit_allows_unlimited(self, db_session_with_containers, patched_external_dependencies): + """Treat vector limit 0 as unlimited and continue indexing.""" + # Arrange + dataset, documents = self._create_test_dataset_and_documents(db_session_with_containers, document_count=3) + document_ids = [doc.id for doc in documents] + features = patched_external_dependencies["features"] + features.billing.enabled = True + features.billing.subscription.plan = CloudPlan.PROFESSIONAL + features.vector_space.limit = 0 + features.vector_space.size = 1000 + + # Act + _document_indexing(dataset.id, document_ids) + + # Assert + patched_external_dependencies["indexing_runner_instance"].run.assert_called_once() + self._assert_documents_parsing(db_session_with_containers, document_ids) + + def test_negative_vector_space_values_handled_gracefully( + self, db_session_with_containers, patched_external_dependencies + ): + """Treat negative vector limits as non-blocking and continue indexing.""" + # Arrange + dataset, documents = self._create_test_dataset_and_documents(db_session_with_containers, document_count=3) + document_ids = [doc.id for doc in documents] + features = patched_external_dependencies["features"] + features.billing.enabled = True + features.billing.subscription.plan = CloudPlan.PROFESSIONAL + features.vector_space.limit = -1 + features.vector_space.size = 100 + + # Act + _document_indexing(dataset.id, document_ids) + + # Assert + patched_external_dependencies["indexing_runner_instance"].run.assert_called_once() + self._assert_documents_parsing(db_session_with_containers, document_ids) + + def test_large_document_batch_processing(self, db_session_with_containers, patched_external_dependencies): + """Process a batch exactly at configured upload limit. + + This test patches config only to force a deterministic limit branch while keeping SQL writes real. + """ + # Arrange + batch_limit = 50 + document_ids = [str(uuid.uuid4()) for _ in range(batch_limit)] + dataset, _ = self._create_test_dataset_and_documents( + db_session_with_containers, + document_ids=document_ids, + ) + features = patched_external_dependencies["features"] + features.billing.enabled = True + features.billing.subscription.plan = CloudPlan.PROFESSIONAL + features.vector_space.limit = 10000 + features.vector_space.size = 0 + + # Act + with patch("tasks.document_indexing_task.dify_config.BATCH_UPLOAD_LIMIT", str(batch_limit)): + _document_indexing(dataset.id, document_ids) + + # Assert + run_args = patched_external_dependencies["indexing_runner_instance"].run.call_args[0][0] + assert len(run_args) == batch_limit + self._assert_documents_parsing(db_session_with_containers, document_ids) diff --git a/api/tests/unit_tests/tasks/test_dataset_indexing_task.py b/api/tests/unit_tests/tasks/test_dataset_indexing_task.py index 8d8e2b0db0..11b4663187 100644 --- a/api/tests/unit_tests/tasks/test_dataset_indexing_task.py +++ b/api/tests/unit_tests/tasks/test_dataset_indexing_task.py @@ -10,23 +10,14 @@ This module tests the document indexing task functionality including: """ import uuid -from unittest.mock import MagicMock, Mock, patch +from unittest.mock import Mock, patch import pytest -from core.indexing_runner import DocumentIsPausedError, IndexingRunner from core.rag.pipeline.queue import TenantIsolatedTaskQueue from enums.cloud_plan import CloudPlan from extensions.ext_redis import redis_client -from models.dataset import Dataset, Document from services.document_indexing_proxy.document_indexing_task_proxy import DocumentIndexingTaskProxy -from tasks.document_indexing_task import ( - _document_indexing, - _document_indexing_with_tenant_queue, - document_indexing_task, - normal_document_indexing_task, - priority_document_indexing_task, -) # ============================================================================ # Fixtures @@ -51,177 +42,6 @@ def document_ids(): return [str(uuid.uuid4()) for _ in range(3)] -@pytest.fixture -def mock_dataset(dataset_id, tenant_id): - """Create a mock Dataset object.""" - dataset = Mock(spec=Dataset) - dataset.id = dataset_id - dataset.tenant_id = tenant_id - dataset.indexing_technique = "high_quality" - dataset.embedding_model_provider = "openai" - dataset.embedding_model = "text-embedding-ada-002" - return dataset - - -@pytest.fixture -def mock_documents(document_ids, dataset_id): - """Create mock Document objects.""" - documents = [] - for doc_id in document_ids: - doc = Mock(spec=Document) - doc.id = doc_id - doc.dataset_id = dataset_id - doc.indexing_status = "waiting" - doc.error = None - doc.stopped_at = None - doc.processing_started_at = None - documents.append(doc) - return documents - - -@pytest.fixture -def mock_db_session(): - """Mock database session via session_factory.create_session().""" - with patch("tasks.document_indexing_task.session_factory") as mock_sf: - sessions = [] # Track all created sessions - # Shared mock data that all sessions will access - shared_mock_data = {"dataset": None, "documents": None, "doc_iter": None} - - def create_session_side_effect(): - session = MagicMock() - session.close = MagicMock() - - # Track commit calls - commit_mock = MagicMock() - session.commit = commit_mock - cm = MagicMock() - cm.__enter__.return_value = session - - def _exit_side_effect(*args, **kwargs): - session.close() - - cm.__exit__.side_effect = _exit_side_effect - - # Support session.begin() for transactions - begin_cm = MagicMock() - begin_cm.__enter__.return_value = session - - def begin_exit_side_effect(*args, **kwargs): - # Auto-commit on transaction exit (like SQLAlchemy) - session.commit() - # Also mark wrapper's commit as called - if sessions: - sessions[0].commit() - - begin_cm.__exit__ = MagicMock(side_effect=begin_exit_side_effect) - session.begin = MagicMock(return_value=begin_cm) - - sessions.append(session) - - # Setup query with side_effect to handle both Dataset and Document queries - def query_side_effect(*args): - query = MagicMock() - if args and args[0] == Dataset and shared_mock_data["dataset"] is not None: - where_result = MagicMock() - where_result.first.return_value = shared_mock_data["dataset"] - query.where = MagicMock(return_value=where_result) - elif args and args[0] == Document and shared_mock_data["documents"] is not None: - # Support both .first() and .all() calls with chaining - where_result = MagicMock() - where_result.where = MagicMock(return_value=where_result) - - # Create an iterator for .first() calls if not exists - if shared_mock_data["doc_iter"] is None: - docs = shared_mock_data["documents"] or [None] - shared_mock_data["doc_iter"] = iter(docs) - - where_result.first = lambda: next(shared_mock_data["doc_iter"], None) - docs_or_empty = shared_mock_data["documents"] or [] - where_result.all = MagicMock(return_value=docs_or_empty) - query.where = MagicMock(return_value=where_result) - else: - query.where = MagicMock(return_value=query) - return query - - session.query = MagicMock(side_effect=query_side_effect) - return cm - - mock_sf.create_session.side_effect = create_session_side_effect - - # Create a wrapper that behaves like the first session but has access to all sessions - class SessionWrapper: - def __init__(self): - self._sessions = sessions - self._shared_data = shared_mock_data - # Create a default session for setup phase - self._default_session = MagicMock() - self._default_session.close = MagicMock() - self._default_session.commit = MagicMock() - - # Support session.begin() for default session too - begin_cm = MagicMock() - begin_cm.__enter__.return_value = self._default_session - - def default_begin_exit_side_effect(*args, **kwargs): - self._default_session.commit() - - begin_cm.__exit__ = MagicMock(side_effect=default_begin_exit_side_effect) - self._default_session.begin = MagicMock(return_value=begin_cm) - - def default_query_side_effect(*args): - query = MagicMock() - if args and args[0] == Dataset and shared_mock_data["dataset"] is not None: - where_result = MagicMock() - where_result.first.return_value = shared_mock_data["dataset"] - query.where = MagicMock(return_value=where_result) - elif args and args[0] == Document and shared_mock_data["documents"] is not None: - where_result = MagicMock() - where_result.where = MagicMock(return_value=where_result) - - if shared_mock_data["doc_iter"] is None: - docs = shared_mock_data["documents"] or [None] - shared_mock_data["doc_iter"] = iter(docs) - - where_result.first = lambda: next(shared_mock_data["doc_iter"], None) - docs_or_empty = shared_mock_data["documents"] or [] - where_result.all = MagicMock(return_value=docs_or_empty) - query.where = MagicMock(return_value=where_result) - else: - query.where = MagicMock(return_value=query) - return query - - self._default_session.query = MagicMock(side_effect=default_query_side_effect) - - def __getattr__(self, name): - # Forward all attribute access to the first session, or default if none created yet - target_session = self._sessions[0] if self._sessions else self._default_session - return getattr(target_session, name) - - @property - def all_sessions(self): - """Access all created sessions for testing.""" - return self._sessions - - wrapper = SessionWrapper() - yield wrapper - - -@pytest.fixture -def mock_indexing_runner(): - """Mock IndexingRunner.""" - with patch("tasks.document_indexing_task.IndexingRunner") as mock_runner_class: - mock_runner = MagicMock(spec=IndexingRunner) - mock_runner_class.return_value = mock_runner - yield mock_runner - - -@pytest.fixture -def mock_feature_service(): - """Mock FeatureService for billing and feature checks.""" - with patch("tasks.document_indexing_task.FeatureService") as mock_service: - yield mock_service - - @pytest.fixture def mock_redis(): """Mock Redis client operations.""" @@ -346,492 +166,6 @@ class TestTaskEnqueuing: assert mock_redis.lpush.called mock_task.delay.assert_not_called() - def test_legacy_document_indexing_task_still_works( - self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_documents, mock_indexing_runner - ): - """ - Test that the legacy document_indexing_task function still works. - - This ensures backward compatibility for existing code that may still - use the deprecated function. - """ - # Arrange - # Set shared mock data so all sessions can access it - mock_db_session._shared_data["dataset"] = mock_dataset - mock_db_session._shared_data["documents"] = mock_documents - - with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: - mock_features.return_value.billing.enabled = False - - # Act - document_indexing_task(dataset_id, document_ids) - - # Assert - mock_indexing_runner.run.assert_called_once() - - -# ============================================================================ -# Test Batch Processing -# ============================================================================ - - -class TestBatchProcessing: - """Test cases for batch processing of multiple documents.""" - - def test_batch_processing_multiple_documents( - self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_indexing_runner - ): - """ - Test batch processing of multiple documents. - - All documents in the batch should be processed together and their - status should be updated to 'parsing'. - """ - # Arrange - Create actual document objects that can be modified - mock_documents = [] - for doc_id in document_ids: - doc = MagicMock(spec=Document) - doc.id = doc_id - doc.dataset_id = dataset_id - doc.indexing_status = "waiting" - doc.error = None - doc.stopped_at = None - doc.processing_started_at = None - mock_documents.append(doc) - - # Set shared mock data so all sessions can access it - mock_db_session._shared_data["dataset"] = mock_dataset - mock_db_session._shared_data["documents"] = mock_documents - - with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: - mock_features.return_value.billing.enabled = False - - # Act - _document_indexing(dataset_id, document_ids) - - # Assert - All documents should be set to 'parsing' status - for doc in mock_documents: - assert doc.indexing_status == "parsing" - assert doc.processing_started_at is not None - - # IndexingRunner should be called with all documents - mock_indexing_runner.run.assert_called_once() - call_args = mock_indexing_runner.run.call_args[0][0] - assert len(call_args) == len(document_ids) - - def test_batch_processing_with_limit_check(self, dataset_id, mock_db_session, mock_dataset, mock_feature_service): - """ - Test batch processing respects upload limits. - - When the number of documents exceeds the batch upload limit, - an error should be raised and all documents should be marked as error. - """ - # Arrange - batch_limit = 10 - document_ids = [str(uuid.uuid4()) for _ in range(batch_limit + 1)] - - mock_documents = [] - for doc_id in document_ids: - doc = MagicMock(spec=Document) - doc.id = doc_id - doc.dataset_id = dataset_id - doc.indexing_status = "waiting" - doc.error = None - doc.stopped_at = None - mock_documents.append(doc) - - # Set shared mock data so all sessions can access it - mock_db_session._shared_data["dataset"] = mock_dataset - mock_db_session._shared_data["documents"] = mock_documents - - mock_feature_service.get_features.return_value.billing.enabled = True - mock_feature_service.get_features.return_value.billing.subscription.plan = CloudPlan.PROFESSIONAL - mock_feature_service.get_features.return_value.vector_space.limit = 1000 - mock_feature_service.get_features.return_value.vector_space.size = 0 - - with patch("tasks.document_indexing_task.dify_config.BATCH_UPLOAD_LIMIT", str(batch_limit)): - # Act - _document_indexing(dataset_id, document_ids) - - # Assert - All documents should have error status - for doc in mock_documents: - assert doc.indexing_status == "error" - assert doc.error is not None - assert "batch upload limit" in doc.error - - def test_batch_processing_sandbox_plan_single_document_only( - self, dataset_id, mock_db_session, mock_dataset, mock_feature_service - ): - """ - Test that sandbox plan only allows single document upload. - - Sandbox plan should reject batch uploads (more than 1 document). - """ - # Arrange - document_ids = [str(uuid.uuid4()) for _ in range(2)] - - mock_documents = [] - for doc_id in document_ids: - doc = MagicMock(spec=Document) - doc.id = doc_id - doc.dataset_id = dataset_id - doc.indexing_status = "waiting" - doc.error = None - doc.stopped_at = None - mock_documents.append(doc) - - # Set shared mock data so all sessions can access it - mock_db_session._shared_data["dataset"] = mock_dataset - mock_db_session._shared_data["documents"] = mock_documents - - mock_feature_service.get_features.return_value.billing.enabled = True - mock_feature_service.get_features.return_value.billing.subscription.plan = CloudPlan.SANDBOX - mock_feature_service.get_features.return_value.vector_space.limit = 1000 - mock_feature_service.get_features.return_value.vector_space.size = 0 - - # Act - _document_indexing(dataset_id, document_ids) - - # Assert - All documents should have error status - for doc in mock_documents: - assert doc.indexing_status == "error" - assert "does not support batch upload" in doc.error - - def test_batch_processing_empty_document_list( - self, dataset_id, mock_db_session, mock_dataset, mock_indexing_runner - ): - """ - Test batch processing with empty document list. - - Should handle empty list gracefully without errors. - """ - # Arrange - document_ids = [] - - # Set shared mock data with empty documents list - mock_db_session._shared_data["dataset"] = mock_dataset - mock_db_session._shared_data["documents"] = [] - - with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: - mock_features.return_value.billing.enabled = False - - # Act - _document_indexing(dataset_id, document_ids) - - # Assert - IndexingRunner should still be called with empty list - mock_indexing_runner.run.assert_called_once_with([]) - - -# ============================================================================ -# Test Progress Tracking -# ============================================================================ - - -class TestProgressTracking: - """Test cases for progress tracking through task lifecycle.""" - - def test_document_status_progression( - self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_indexing_runner - ): - """ - Test document status progresses correctly through lifecycle. - - Documents should transition from 'waiting' -> 'parsing' -> processed. - """ - # Arrange - Create actual document objects - mock_documents = [] - for doc_id in document_ids: - doc = MagicMock(spec=Document) - doc.id = doc_id - doc.dataset_id = dataset_id - doc.indexing_status = "waiting" - doc.processing_started_at = None - mock_documents.append(doc) - - # Set shared mock data so all sessions can access it - mock_db_session._shared_data["dataset"] = mock_dataset - mock_db_session._shared_data["documents"] = mock_documents - - with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: - mock_features.return_value.billing.enabled = False - - # Act - _document_indexing(dataset_id, document_ids) - - # Assert - Status should be 'parsing' - for doc in mock_documents: - assert doc.indexing_status == "parsing" - assert doc.processing_started_at is not None - - # Verify commit was called to persist status - assert mock_db_session.commit.called - - def test_processing_started_timestamp_set( - self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_indexing_runner - ): - """ - Test that processing_started_at timestamp is set correctly. - - When documents start processing, the timestamp should be recorded. - """ - # Arrange - Create actual document objects - mock_documents = [] - for doc_id in document_ids: - doc = MagicMock(spec=Document) - doc.id = doc_id - doc.dataset_id = dataset_id - doc.indexing_status = "waiting" - doc.processing_started_at = None - mock_documents.append(doc) - - # Set shared mock data so all sessions can access it - mock_db_session._shared_data["dataset"] = mock_dataset - mock_db_session._shared_data["documents"] = mock_documents - - with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: - mock_features.return_value.billing.enabled = False - - # Act - _document_indexing(dataset_id, document_ids) - - # Assert - for doc in mock_documents: - assert doc.processing_started_at is not None - - def test_tenant_queue_processes_next_task_after_completion( - self, tenant_id, dataset_id, document_ids, mock_redis, mock_db_session, mock_dataset, mock_indexing_runner - ): - """ - Test that tenant queue processes next waiting task after completion. - - After a task completes, the system should check for waiting tasks - and process the next one. - """ - # Arrange - next_task_data = {"tenant_id": tenant_id, "dataset_id": dataset_id, "document_ids": ["next_doc_id"]} - - # Simulate next task in queue - from core.rag.pipeline.queue import TaskWrapper - - wrapper = TaskWrapper(data=next_task_data) - mock_redis.rpop.return_value = wrapper.serialize() - - mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset - - with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: - mock_features.return_value.billing.enabled = False - - with patch("tasks.document_indexing_task.normal_document_indexing_task") as mock_task: - # Act - _document_indexing_with_tenant_queue(tenant_id, dataset_id, document_ids, mock_task) - - # Assert - Next task should be enqueued - mock_task.delay.assert_called() - # Task key should be set for next task - assert mock_redis.setex.called - - def test_tenant_queue_clears_flag_when_no_more_tasks( - self, tenant_id, dataset_id, document_ids, mock_redis, mock_db_session, mock_dataset, mock_indexing_runner - ): - """ - Test that tenant queue clears flag when no more tasks are waiting. - - When there are no more tasks in the queue, the task key should be deleted. - """ - # Arrange - mock_redis.rpop.return_value = None # No more tasks - mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset - - with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: - mock_features.return_value.billing.enabled = False - - with patch("tasks.document_indexing_task.normal_document_indexing_task") as mock_task: - # Act - _document_indexing_with_tenant_queue(tenant_id, dataset_id, document_ids, mock_task) - - # Assert - Task key should be deleted - assert mock_redis.delete.called - - -# ============================================================================ -# Test Error Handling and Retries -# ============================================================================ - - -class TestErrorHandling: - """Test cases for error handling and retry mechanisms.""" - - def test_error_handling_sets_document_error_status( - self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_feature_service - ): - """ - Test that errors during validation set document error status. - - When validation fails (e.g., limit exceeded), documents should be - marked with error status and error message. - """ - # Arrange - Create actual document objects - mock_documents = [] - for doc_id in document_ids: - doc = MagicMock(spec=Document) - doc.id = doc_id - doc.dataset_id = dataset_id - doc.indexing_status = "waiting" - doc.error = None - doc.stopped_at = None - mock_documents.append(doc) - - # Set shared mock data so all sessions can access it - mock_db_session._shared_data["dataset"] = mock_dataset - mock_db_session._shared_data["documents"] = mock_documents - - # Set up to trigger vector space limit error - mock_feature_service.get_features.return_value.billing.enabled = True - mock_feature_service.get_features.return_value.billing.subscription.plan = CloudPlan.PROFESSIONAL - mock_feature_service.get_features.return_value.vector_space.limit = 100 - mock_feature_service.get_features.return_value.vector_space.size = 100 # At limit - - # Act - _document_indexing(dataset_id, document_ids) - - # Assert - for doc in mock_documents: - assert doc.indexing_status == "error" - assert doc.error is not None - assert "over the limit" in doc.error - assert doc.stopped_at is not None - - def test_error_handling_during_indexing_runner( - self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_documents, mock_indexing_runner - ): - """ - Test error handling when IndexingRunner raises an exception. - - Errors during indexing should be caught and logged, but not crash the task. - """ - # Arrange - # Set shared mock data so all sessions can access it - mock_db_session._shared_data["dataset"] = mock_dataset - mock_db_session._shared_data["documents"] = mock_documents - - # Make IndexingRunner raise an exception - mock_indexing_runner.run.side_effect = Exception("Indexing failed") - - with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: - mock_features.return_value.billing.enabled = False - - # Act - Should not raise exception - _document_indexing(dataset_id, document_ids) - - # Assert - Session should be closed even after error - assert mock_db_session.close.called - - def test_document_paused_error_handling( - self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_documents, mock_indexing_runner - ): - """ - Test handling of DocumentIsPausedError. - - When a document is paused, the error should be caught and logged - but not treated as a failure. - """ - # Arrange - # Set shared mock data so all sessions can access it - mock_db_session._shared_data["dataset"] = mock_dataset - mock_db_session._shared_data["documents"] = mock_documents - - # Make IndexingRunner raise DocumentIsPausedError - mock_indexing_runner.run.side_effect = DocumentIsPausedError("Document is paused") - - with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: - mock_features.return_value.billing.enabled = False - - # Act - Should not raise exception - _document_indexing(dataset_id, document_ids) - - # Assert - Session should be closed - assert mock_db_session.close.called - - def test_dataset_not_found_error_handling(self, dataset_id, document_ids, mock_db_session): - """ - Test handling when dataset is not found. - - If the dataset doesn't exist, the task should exit gracefully. - """ - # Arrange - mock_db_session.query.return_value.where.return_value.first.return_value = None - - # Act - _document_indexing(dataset_id, document_ids) - - # Assert - Session should be closed - assert mock_db_session.close.called - - def test_tenant_queue_error_handling_still_processes_next_task( - self, tenant_id, dataset_id, document_ids, mock_redis, mock_db_session, mock_dataset, mock_indexing_runner - ): - """ - Test that errors don't prevent processing next task in tenant queue. - - Even if the current task fails, the next task should still be processed. - """ - # Arrange - next_task_data = {"tenant_id": tenant_id, "dataset_id": dataset_id, "document_ids": ["next_doc_id"]} - - from core.rag.pipeline.queue import TaskWrapper - - wrapper = TaskWrapper(data=next_task_data) - # Set up rpop to return task once for concurrency check - mock_redis.rpop.side_effect = [wrapper.serialize(), None] - - mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset - - # Make _document_indexing raise an error - with patch("tasks.document_indexing_task._document_indexing") as mock_indexing: - mock_indexing.side_effect = Exception("Processing failed") - - # Patch logger to avoid format string issue in actual code - with patch("tasks.document_indexing_task.logger"): - with patch("tasks.document_indexing_task.normal_document_indexing_task") as mock_task: - # Act - _document_indexing_with_tenant_queue(tenant_id, dataset_id, document_ids, mock_task) - - # Assert - Next task should still be enqueued despite error - mock_task.delay.assert_called() - - def test_concurrent_task_limit_respected( - self, tenant_id, dataset_id, document_ids, mock_redis, mock_db_session, mock_dataset - ): - """ - Test that tenant isolated task concurrency limit is respected. - - Should pull only TENANT_ISOLATED_TASK_CONCURRENCY tasks at a time. - """ - # Arrange - concurrency_limit = 2 - - # Create multiple tasks in queue - tasks = [] - for i in range(5): - task_data = {"tenant_id": tenant_id, "dataset_id": dataset_id, "document_ids": [f"doc_{i}"]} - from core.rag.pipeline.queue import TaskWrapper - - wrapper = TaskWrapper(data=task_data) - tasks.append(wrapper.serialize()) - - # Mock rpop to return tasks one by one - mock_redis.rpop.side_effect = tasks[:concurrency_limit] + [None] - - mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset - - with patch("tasks.document_indexing_task.dify_config.TENANT_ISOLATED_TASK_CONCURRENCY", concurrency_limit): - with patch("tasks.document_indexing_task.normal_document_indexing_task") as mock_task: - # Act - _document_indexing_with_tenant_queue(tenant_id, dataset_id, document_ids, mock_task) - - # Assert - Should call delay exactly concurrency_limit times - assert mock_task.delay.call_count == concurrency_limit - # ============================================================================ # Test Task Cancellation @@ -841,76 +175,6 @@ class TestErrorHandling: class TestTaskCancellation: """Test cases for task cancellation and cleanup.""" - def test_task_key_deleted_when_queue_empty( - self, tenant_id, dataset_id, document_ids, mock_redis, mock_db_session, mock_dataset - ): - """ - Test that task key is deleted when queue becomes empty. - - When no more tasks are waiting, the tenant task key should be removed. - """ - # Arrange - mock_redis.rpop.return_value = None # Empty queue - mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset - - with patch("tasks.document_indexing_task.normal_document_indexing_task") as mock_task: - # Act - _document_indexing_with_tenant_queue(tenant_id, dataset_id, document_ids, mock_task) - - # Assert - assert mock_redis.delete.called - # Verify the correct key was deleted - delete_call_args = mock_redis.delete.call_args[0][0] - assert tenant_id in delete_call_args - assert "document_indexing" in delete_call_args - - def test_session_cleanup_on_success( - self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_documents, mock_indexing_runner - ): - """ - Test that database session is properly closed on success. - - Session cleanup should happen in finally block. - """ - # Arrange - # Set shared mock data so all sessions can access it - mock_db_session._shared_data["dataset"] = mock_dataset - mock_db_session._shared_data["documents"] = mock_documents - - with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: - mock_features.return_value.billing.enabled = False - - # Act - _document_indexing(dataset_id, document_ids) - - # Assert - assert mock_db_session.close.called - - def test_session_cleanup_on_error( - self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_documents, mock_indexing_runner - ): - """ - Test that database session is properly closed on error. - - Session cleanup should happen even when errors occur. - """ - # Arrange - # Set shared mock data so all sessions can access it - mock_db_session._shared_data["dataset"] = mock_dataset - mock_db_session._shared_data["documents"] = mock_documents - - # Make IndexingRunner raise an exception - mock_indexing_runner.run.side_effect = Exception("Test error") - - with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: - mock_features.return_value.billing.enabled = False - - # Act - _document_indexing(dataset_id, document_ids) - - # Assert - assert mock_db_session.close.called - def test_task_isolation_between_tenants(self, mock_redis): """ Test that tasks are properly isolated between different tenants. @@ -934,407 +198,6 @@ class TestTaskCancellation: assert tenant_2 in queue_2._queue -# ============================================================================ -# Integration Tests -# ============================================================================ - - -class TestAdvancedScenarios: - """Advanced test scenarios for edge cases and complex workflows.""" - - def test_multiple_documents_with_mixed_success_and_failure( - self, dataset_id, mock_db_session, mock_dataset, mock_indexing_runner - ): - """ - Test handling of mixed success and failure scenarios in batch processing. - - When processing multiple documents, some may succeed while others fail. - This tests that the system handles partial failures gracefully. - - Scenario: - - Process 3 documents in a batch - - First document succeeds - - Second document is not found (skipped) - - Third document succeeds - - Expected behavior: - - Only found documents are processed - - Missing documents are skipped without crashing - - IndexingRunner receives only valid documents - """ - # Arrange - Create document IDs with one missing - document_ids = [str(uuid.uuid4()) for _ in range(3)] - - # Create only 2 documents (simulate one missing) - # The new code uses .all() which will only return existing documents - mock_documents = [] - for i, doc_id in enumerate([document_ids[0], document_ids[2]]): # Skip middle one - doc = MagicMock(spec=Document) - doc.id = doc_id - doc.dataset_id = dataset_id - doc.indexing_status = "waiting" - doc.processing_started_at = None - mock_documents.append(doc) - - # Set shared mock data - .all() will only return existing documents - mock_db_session._shared_data["dataset"] = mock_dataset - mock_db_session._shared_data["documents"] = mock_documents - - with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: - mock_features.return_value.billing.enabled = False - - # Act - _document_indexing(dataset_id, document_ids) - - # Assert - Only 2 documents should be processed (missing one skipped) - mock_indexing_runner.run.assert_called_once() - call_args = mock_indexing_runner.run.call_args[0][0] - assert len(call_args) == 2 # Only found documents - - def test_tenant_queue_with_multiple_concurrent_tasks( - self, tenant_id, dataset_id, mock_redis, mock_db_session, mock_dataset - ): - """ - Test concurrent task processing with tenant isolation. - - This tests the scenario where multiple tasks are queued for the same tenant - and need to be processed respecting the concurrency limit. - - Scenario: - - 5 tasks are waiting in the queue - - Concurrency limit is 2 - - After current task completes, pull and enqueue next 2 tasks - - Expected behavior: - - Exactly 2 tasks are pulled from queue (respecting concurrency) - - Each task is enqueued with correct parameters - - Task waiting time is set for each new task - """ - # Arrange - concurrency_limit = 2 - document_ids = [str(uuid.uuid4())] - - # Create multiple waiting tasks - waiting_tasks = [] - for i in range(5): - task_data = {"tenant_id": tenant_id, "dataset_id": dataset_id, "document_ids": [f"doc_{i}"]} - from core.rag.pipeline.queue import TaskWrapper - - wrapper = TaskWrapper(data=task_data) - waiting_tasks.append(wrapper.serialize()) - - # Mock rpop to return tasks up to concurrency limit - mock_redis.rpop.side_effect = waiting_tasks[:concurrency_limit] + [None] - mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset - - with patch("tasks.document_indexing_task.dify_config.TENANT_ISOLATED_TASK_CONCURRENCY", concurrency_limit): - with patch("tasks.document_indexing_task.normal_document_indexing_task") as mock_task: - # Act - _document_indexing_with_tenant_queue(tenant_id, dataset_id, document_ids, mock_task) - - # Assert - # Should call delay exactly concurrency_limit times - assert mock_task.delay.call_count == concurrency_limit - - # Verify task waiting time was set for each task - assert mock_redis.setex.call_count >= concurrency_limit - - def test_vector_space_limit_edge_case_at_exact_limit( - self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_feature_service - ): - """ - Test vector space limit validation at exact boundary. - - Edge case: When vector space is exactly at the limit (not over), - the upload should still be rejected. - - Scenario: - - Vector space limit: 100 - - Current size: 100 (exactly at limit) - - Try to upload 3 documents - - Expected behavior: - - Upload is rejected with appropriate error message - - All documents are marked with error status - """ - # Arrange - mock_documents = [] - for doc_id in document_ids: - doc = MagicMock(spec=Document) - doc.id = doc_id - doc.dataset_id = dataset_id - doc.indexing_status = "waiting" - doc.error = None - doc.stopped_at = None - mock_documents.append(doc) - - # Set shared mock data so all sessions can access it - mock_db_session._shared_data["dataset"] = mock_dataset - mock_db_session._shared_data["documents"] = mock_documents - - # Set vector space exactly at limit - mock_feature_service.get_features.return_value.billing.enabled = True - mock_feature_service.get_features.return_value.billing.subscription.plan = CloudPlan.PROFESSIONAL - mock_feature_service.get_features.return_value.vector_space.limit = 100 - mock_feature_service.get_features.return_value.vector_space.size = 100 # Exactly at limit - - # Act - _document_indexing(dataset_id, document_ids) - - # Assert - All documents should have error status - for doc in mock_documents: - assert doc.indexing_status == "error" - assert "over the limit" in doc.error - - def test_task_queue_fifo_ordering(self, tenant_id, dataset_id, mock_redis, mock_db_session, mock_dataset): - """ - Test that tasks are processed in FIFO (First-In-First-Out) order. - - The tenant isolated queue should maintain task order, ensuring - that tasks are processed in the sequence they were added. - - Scenario: - - Task A added first - - Task B added second - - Task C added third - - When pulling tasks, should get A, then B, then C - - Expected behavior: - - Tasks are retrieved in the order they were added - - FIFO ordering is maintained throughout processing - """ - # Arrange - document_ids = [str(uuid.uuid4())] - - # Create tasks with identifiable document IDs to track order - task_order = ["task_A", "task_B", "task_C"] - tasks = [] - for task_name in task_order: - task_data = {"tenant_id": tenant_id, "dataset_id": dataset_id, "document_ids": [task_name]} - from core.rag.pipeline.queue import TaskWrapper - - wrapper = TaskWrapper(data=task_data) - tasks.append(wrapper.serialize()) - - # Mock rpop to return tasks in FIFO order - mock_redis.rpop.side_effect = tasks + [None] - mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset - - with patch("tasks.document_indexing_task.dify_config.TENANT_ISOLATED_TASK_CONCURRENCY", 3): - with patch("tasks.document_indexing_task.normal_document_indexing_task") as mock_task: - # Act - _document_indexing_with_tenant_queue(tenant_id, dataset_id, document_ids, mock_task) - - # Assert - Verify tasks were enqueued in correct order - assert mock_task.delay.call_count == 3 - - # Check that document_ids in calls match expected order - for i, call_obj in enumerate(mock_task.delay.call_args_list): - called_doc_ids = call_obj[1]["document_ids"] - assert called_doc_ids == [task_order[i]] - - def test_empty_queue_after_task_completion_cleans_up( - self, tenant_id, dataset_id, document_ids, mock_redis, mock_db_session, mock_dataset - ): - """ - Test cleanup behavior when queue becomes empty after task completion. - - After processing the last task in the queue, the system should: - 1. Detect that no more tasks are waiting - 2. Delete the task key to indicate tenant is idle - 3. Allow new tasks to start fresh processing - - Scenario: - - Process a task - - Check queue for next tasks - - Queue is empty - - Task key should be deleted - - Expected behavior: - - Task key is deleted when queue is empty - - Tenant is marked as idle (no active tasks) - """ - # Arrange - mock_redis.rpop.return_value = None # Empty queue - mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset - - with patch("tasks.document_indexing_task.normal_document_indexing_task") as mock_task: - # Act - _document_indexing_with_tenant_queue(tenant_id, dataset_id, document_ids, mock_task) - - # Assert - # Verify delete was called to clean up task key - mock_redis.delete.assert_called_once() - - # Verify the correct key was deleted (contains tenant_id and "document_indexing") - delete_call_args = mock_redis.delete.call_args[0][0] - assert tenant_id in delete_call_args - assert "document_indexing" in delete_call_args - - def test_billing_disabled_skips_limit_checks( - self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_indexing_runner, mock_feature_service - ): - """ - Test that billing limit checks are skipped when billing is disabled. - - For self-hosted or enterprise deployments where billing is disabled, - the system should not enforce vector space or batch upload limits. - - Scenario: - - Billing is disabled - - Upload 100 documents (would normally exceed limits) - - No limit checks should be performed - - Expected behavior: - - Documents are processed without limit validation - - No errors related to limits - - All documents proceed to indexing - """ - # Arrange - Create many documents - large_batch_ids = [str(uuid.uuid4()) for _ in range(100)] - - mock_documents = [] - for doc_id in large_batch_ids: - doc = MagicMock(spec=Document) - doc.id = doc_id - doc.dataset_id = dataset_id - doc.indexing_status = "waiting" - doc.processing_started_at = None - mock_documents.append(doc) - - # Set shared mock data so all sessions can access it - mock_db_session._shared_data["dataset"] = mock_dataset - mock_db_session._shared_data["documents"] = mock_documents - - # Billing disabled - limits should not be checked - mock_feature_service.get_features.return_value.billing.enabled = False - - # Act - _document_indexing(dataset_id, large_batch_ids) - - # Assert - # All documents should be set to parsing (no limit errors) - for doc in mock_documents: - assert doc.indexing_status == "parsing" - - # IndexingRunner should be called with all documents - mock_indexing_runner.run.assert_called_once() - call_args = mock_indexing_runner.run.call_args[0][0] - assert len(call_args) == 100 - - -class TestIntegration: - """Integration tests for complete task workflows.""" - - def test_complete_workflow_normal_task( - self, tenant_id, dataset_id, document_ids, mock_redis, mock_db_session, mock_dataset, mock_indexing_runner - ): - """ - Test complete workflow for normal document indexing task. - - This tests the full flow from task receipt to completion. - """ - # Arrange - Create actual document objects - mock_documents = [] - for doc_id in document_ids: - doc = MagicMock(spec=Document) - doc.id = doc_id - doc.dataset_id = dataset_id - doc.indexing_status = "waiting" - doc.processing_started_at = None - mock_documents.append(doc) - - # Set up rpop to return None for concurrency check (no more tasks) - mock_redis.rpop.side_effect = [None] - # Set shared mock data so all sessions can access it - mock_db_session._shared_data["dataset"] = mock_dataset - mock_db_session._shared_data["documents"] = mock_documents - - with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: - mock_features.return_value.billing.enabled = False - - # Act - normal_document_indexing_task(tenant_id, dataset_id, document_ids) - - # Assert - # Documents should be processed - mock_indexing_runner.run.assert_called_once() - # Session should be closed - assert mock_db_session.close.called - # Task key should be deleted (no more tasks) - assert mock_redis.delete.called - - def test_complete_workflow_priority_task( - self, tenant_id, dataset_id, document_ids, mock_redis, mock_db_session, mock_dataset, mock_indexing_runner - ): - """ - Test complete workflow for priority document indexing task. - - Priority tasks should follow the same flow as normal tasks. - """ - # Arrange - Create actual document objects - mock_documents = [] - for doc_id in document_ids: - doc = MagicMock(spec=Document) - doc.id = doc_id - doc.dataset_id = dataset_id - doc.indexing_status = "waiting" - doc.processing_started_at = None - mock_documents.append(doc) - - # Set up rpop to return None for concurrency check (no more tasks) - mock_redis.rpop.side_effect = [None] - # Set shared mock data so all sessions can access it - mock_db_session._shared_data["dataset"] = mock_dataset - mock_db_session._shared_data["documents"] = mock_documents - - with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: - mock_features.return_value.billing.enabled = False - - # Act - priority_document_indexing_task(tenant_id, dataset_id, document_ids) - - # Assert - mock_indexing_runner.run.assert_called_once() - assert mock_db_session.close.called - assert mock_redis.delete.called - - def test_queue_chain_processing( - self, tenant_id, dataset_id, mock_redis, mock_db_session, mock_dataset, mock_indexing_runner - ): - """ - Test that multiple tasks in queue are processed in sequence. - - When tasks are queued, they should be processed one after another. - """ - # Arrange - task_1_docs = [str(uuid.uuid4())] - task_2_docs = [str(uuid.uuid4())] - - task_2_data = {"tenant_id": tenant_id, "dataset_id": dataset_id, "document_ids": task_2_docs} - - from core.rag.pipeline.queue import TaskWrapper - - wrapper = TaskWrapper(data=task_2_data) - - # First call returns task 2, second call returns None - mock_redis.rpop.side_effect = [wrapper.serialize(), None] - - mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset - - with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: - mock_features.return_value.billing.enabled = False - - with patch("tasks.document_indexing_task.normal_document_indexing_task") as mock_task: - # Act - Process first task - _document_indexing_with_tenant_queue(tenant_id, dataset_id, task_1_docs, mock_task) - - # Assert - Second task should be enqueued - assert mock_task.delay.called - call_args = mock_task.delay.call_args - assert call_args[1]["document_ids"] == task_2_docs - - # ============================================================================ # Additional Edge Case Tests # ============================================================================ @@ -1343,87 +206,6 @@ class TestIntegration: class TestEdgeCases: """Test edge cases and boundary conditions.""" - def test_single_document_processing(self, dataset_id, mock_db_session, mock_dataset, mock_indexing_runner): - """ - Test processing a single document (minimum batch size). - - Single document processing is a common case and should work - without any special handling or errors. - - Scenario: - - Process exactly 1 document - - Document exists and is valid - - Expected behavior: - - Document is processed successfully - - Status is updated to 'parsing' - - IndexingRunner is called with single document - """ - # Arrange - document_ids = [str(uuid.uuid4())] - - mock_document = MagicMock(spec=Document) - mock_document.id = document_ids[0] - mock_document.dataset_id = dataset_id - mock_document.indexing_status = "waiting" - mock_document.processing_started_at = None - - # Set shared mock data so all sessions can access it - mock_db_session._shared_data["dataset"] = mock_dataset - mock_db_session._shared_data["documents"] = [mock_document] - - with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: - mock_features.return_value.billing.enabled = False - - # Act - _document_indexing(dataset_id, document_ids) - - # Assert - assert mock_document.indexing_status == "parsing" - mock_indexing_runner.run.assert_called_once() - call_args = mock_indexing_runner.run.call_args[0][0] - assert len(call_args) == 1 - - def test_document_with_special_characters_in_id( - self, dataset_id, mock_db_session, mock_dataset, mock_indexing_runner - ): - """ - Test handling documents with special characters in IDs. - - Document IDs might contain special characters or unusual formats. - The system should handle these without errors. - - Scenario: - - Document ID contains hyphens, underscores - - Standard UUID format - - Expected behavior: - - Document is processed normally - - No parsing or encoding errors - """ - # Arrange - UUID format with standard characters - document_ids = [str(uuid.uuid4())] - - mock_document = MagicMock(spec=Document) - mock_document.id = document_ids[0] - mock_document.dataset_id = dataset_id - mock_document.indexing_status = "waiting" - mock_document.processing_started_at = None - - # Set shared mock data so all sessions can access it - mock_db_session._shared_data["dataset"] = mock_dataset - mock_db_session._shared_data["documents"] = [mock_document] - - with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: - mock_features.return_value.billing.enabled = False - - # Act - Should not raise any exceptions - _document_indexing(dataset_id, document_ids) - - # Assert - assert mock_document.indexing_status == "parsing" - mock_indexing_runner.run.assert_called_once() - def test_rapid_successive_task_enqueuing(self, tenant_id, dataset_id, mock_redis): """ Test rapid successive task enqueuing to the same tenant queue. @@ -1463,204 +245,10 @@ class TestEdgeCases: assert mock_redis.lpush.call_count == 5 mock_task.delay.assert_not_called() - def test_zero_vector_space_limit_allows_unlimited( - self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_indexing_runner, mock_feature_service - ): - """ - Test that zero vector space limit means unlimited. - - When vector_space.limit is 0, it indicates no limit is enforced, - allowing unlimited document uploads. - - Scenario: - - Vector space limit: 0 (unlimited) - - Current size: 1000 (any number) - - Upload 3 documents - - Expected behavior: - - Upload is allowed - - No limit errors - - Documents are processed normally - """ - # Arrange - mock_documents = [] - for doc_id in document_ids: - doc = MagicMock(spec=Document) - doc.id = doc_id - doc.dataset_id = dataset_id - doc.indexing_status = "waiting" - doc.processing_started_at = None - mock_documents.append(doc) - - # Set shared mock data so all sessions can access it - mock_db_session._shared_data["dataset"] = mock_dataset - mock_db_session._shared_data["documents"] = mock_documents - - # Set vector space limit to 0 (unlimited) - mock_feature_service.get_features.return_value.billing.enabled = True - mock_feature_service.get_features.return_value.billing.subscription.plan = CloudPlan.PROFESSIONAL - mock_feature_service.get_features.return_value.vector_space.limit = 0 # Unlimited - mock_feature_service.get_features.return_value.vector_space.size = 1000 - - # Act - _document_indexing(dataset_id, document_ids) - - # Assert - All documents should be processed (no limit error) - for doc in mock_documents: - assert doc.indexing_status == "parsing" - - mock_indexing_runner.run.assert_called_once() - - def test_negative_vector_space_values_handled_gracefully( - self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_indexing_runner, mock_feature_service - ): - """ - Test handling of negative vector space values. - - Negative values in vector space configuration should be treated - as unlimited or invalid, not causing crashes. - - Scenario: - - Vector space limit: -1 (invalid/unlimited indicator) - - Current size: 100 - - Upload 3 documents - - Expected behavior: - - Upload is allowed (negative treated as no limit) - - No crashes or validation errors - """ - # Arrange - mock_documents = [] - for doc_id in document_ids: - doc = MagicMock(spec=Document) - doc.id = doc_id - doc.dataset_id = dataset_id - doc.indexing_status = "waiting" - doc.processing_started_at = None - mock_documents.append(doc) - - # Set shared mock data so all sessions can access it - mock_db_session._shared_data["dataset"] = mock_dataset - mock_db_session._shared_data["documents"] = mock_documents - - # Set negative vector space limit - mock_feature_service.get_features.return_value.billing.enabled = True - mock_feature_service.get_features.return_value.billing.subscription.plan = CloudPlan.PROFESSIONAL - mock_feature_service.get_features.return_value.vector_space.limit = -1 # Negative - mock_feature_service.get_features.return_value.vector_space.size = 100 - - # Act - _document_indexing(dataset_id, document_ids) - - # Assert - Should process normally (negative treated as unlimited) - for doc in mock_documents: - assert doc.indexing_status == "parsing" - class TestPerformanceScenarios: """Test performance-related scenarios and optimizations.""" - def test_large_document_batch_processing( - self, dataset_id, mock_db_session, mock_dataset, mock_indexing_runner, mock_feature_service - ): - """ - Test processing a large batch of documents at batch limit. - - When processing the maximum allowed batch size, the system - should handle it efficiently without errors. - - Scenario: - - Process exactly batch_upload_limit documents (e.g., 50) - - All documents are valid - - Billing is enabled - - Expected behavior: - - All documents are processed successfully - - No timeout or memory issues - - Batch limit is not exceeded - """ - # Arrange - batch_limit = 50 - document_ids = [str(uuid.uuid4()) for _ in range(batch_limit)] - - mock_documents = [] - for doc_id in document_ids: - doc = MagicMock(spec=Document) - doc.id = doc_id - doc.dataset_id = dataset_id - doc.indexing_status = "waiting" - doc.processing_started_at = None - mock_documents.append(doc) - - # Set shared mock data so all sessions can access it - mock_db_session._shared_data["dataset"] = mock_dataset - mock_db_session._shared_data["documents"] = mock_documents - - # Configure billing with sufficient limits - mock_feature_service.get_features.return_value.billing.enabled = True - mock_feature_service.get_features.return_value.billing.subscription.plan = CloudPlan.PROFESSIONAL - mock_feature_service.get_features.return_value.vector_space.limit = 10000 - mock_feature_service.get_features.return_value.vector_space.size = 0 - - with patch("tasks.document_indexing_task.dify_config.BATCH_UPLOAD_LIMIT", str(batch_limit)): - # Act - _document_indexing(dataset_id, document_ids) - - # Assert - for doc in mock_documents: - assert doc.indexing_status == "parsing" - - mock_indexing_runner.run.assert_called_once() - call_args = mock_indexing_runner.run.call_args[0][0] - assert len(call_args) == batch_limit - - def test_tenant_queue_handles_burst_traffic(self, tenant_id, dataset_id, mock_redis, mock_db_session, mock_dataset): - """ - Test tenant queue handling burst traffic scenarios. - - When many tasks arrive in a burst for the same tenant, - the queue should handle them efficiently without dropping tasks. - - Scenario: - - 20 tasks arrive rapidly - - Concurrency limit is 3 - - Tasks should be queued and processed in batches - - Expected behavior: - - First 3 tasks are processed immediately - - Remaining tasks wait in queue - - No tasks are lost - """ - # Arrange - num_tasks = 20 - concurrency_limit = 3 - document_ids = [str(uuid.uuid4())] - - # Create waiting tasks - waiting_tasks = [] - for i in range(num_tasks): - task_data = { - "tenant_id": tenant_id, - "dataset_id": dataset_id, - "document_ids": [f"doc_{i}"], - } - from core.rag.pipeline.queue import TaskWrapper - - wrapper = TaskWrapper(data=task_data) - waiting_tasks.append(wrapper.serialize()) - - # Mock rpop to return tasks up to concurrency limit - mock_redis.rpop.side_effect = waiting_tasks[:concurrency_limit] + [None] - mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset - - with patch("tasks.document_indexing_task.dify_config.TENANT_ISOLATED_TASK_CONCURRENCY", concurrency_limit): - with patch("tasks.document_indexing_task.normal_document_indexing_task") as mock_task: - # Act - _document_indexing_with_tenant_queue(tenant_id, dataset_id, document_ids, mock_task) - - # Assert - Should process exactly concurrency_limit tasks - assert mock_task.delay.call_count == concurrency_limit - def test_multiple_tenants_isolated_processing(self, mock_redis): """ Test that multiple tenants process tasks in isolation. @@ -1704,94 +292,6 @@ class TestPerformanceScenarios: class TestRobustness: """Test system robustness and resilience.""" - def test_indexing_runner_exception_does_not_crash_task( - self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_indexing_runner - ): - """ - Test that IndexingRunner exceptions are handled gracefully. - - When IndexingRunner raises an unexpected exception during processing, - the task should catch it, log it, and clean up properly. - - Scenario: - - Documents are prepared for indexing - - IndexingRunner.run() raises RuntimeError - - Task should not crash - - Expected behavior: - - Exception is caught and logged - - Database session is closed - - Task completes (doesn't hang) - """ - # Arrange - mock_documents = [] - for doc_id in document_ids: - doc = MagicMock(spec=Document) - doc.id = doc_id - doc.dataset_id = dataset_id - doc.indexing_status = "waiting" - doc.processing_started_at = None - mock_documents.append(doc) - - # Set shared mock data so all sessions can access it - mock_db_session._shared_data["dataset"] = mock_dataset - mock_db_session._shared_data["documents"] = mock_documents - - # Make IndexingRunner raise an exception - mock_indexing_runner.run.side_effect = RuntimeError("Unexpected indexing error") - - with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: - mock_features.return_value.billing.enabled = False - - # Act - Should not raise exception - _document_indexing(dataset_id, document_ids) - - # Assert - Session should be closed even after error - assert mock_db_session.close.called - - def test_database_session_always_closed_on_success( - self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_indexing_runner - ): - """ - Test that database session is always closed on successful completion. - - Proper resource cleanup is critical. The database session must - be closed in the finally block to prevent connection leaks. - - Scenario: - - Task processes successfully - - No exceptions occur - - Expected behavior: - - All database sessions are closed - - No connection leaks - """ - # Arrange - mock_documents = [] - for doc_id in document_ids: - doc = MagicMock(spec=Document) - doc.id = doc_id - doc.dataset_id = dataset_id - doc.indexing_status = "waiting" - doc.processing_started_at = None - mock_documents.append(doc) - - # Set shared mock data so all sessions can access it - mock_db_session._shared_data["dataset"] = mock_dataset - mock_db_session._shared_data["documents"] = mock_documents - - with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: - mock_features.return_value.billing.enabled = False - - # Act - _document_indexing(dataset_id, document_ids) - - # Assert - All created sessions should be closed - # The code creates multiple sessions: validation, Phase 1 (parsing), Phase 3 (summary) - assert len(mock_db_session.all_sessions) >= 1 - for session in mock_db_session.all_sessions: - assert session.close.called, "All sessions should be closed" - def test_task_proxy_handles_feature_service_failure(self, tenant_id, dataset_id, document_ids, mock_redis): """ Test that task proxy handles FeatureService failures gracefully.