From ddad2460f3ee4906df6819c48babaeab62221b43 Mon Sep 17 00:00:00 2001 From: Gritty_dev <101377478+codomposer@users.noreply.github.com> Date: Fri, 28 Nov 2025 08:31:03 -0500 Subject: [PATCH] feat: complete test script of dataset indexing task (#28897) Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- .../tasks/test_dataset_indexing_task.py | 1913 +++++++++++++++++ 1 file changed, 1913 insertions(+) create mode 100644 api/tests/unit_tests/tasks/test_dataset_indexing_task.py diff --git a/api/tests/unit_tests/tasks/test_dataset_indexing_task.py b/api/tests/unit_tests/tasks/test_dataset_indexing_task.py new file mode 100644 index 0000000000..b3b29fbe45 --- /dev/null +++ b/api/tests/unit_tests/tasks/test_dataset_indexing_task.py @@ -0,0 +1,1913 @@ +""" +Unit tests for dataset indexing tasks. + +This module tests the document indexing task functionality including: +- Task enqueuing to different queues (normal, priority, tenant-isolated) +- Batch processing of multiple documents +- Progress tracking through task lifecycle +- Error handling and retry mechanisms +- Task cancellation and cleanup +""" + +import uuid +from unittest.mock import MagicMock, Mock, patch + +import pytest + +from core.indexing_runner import DocumentIsPausedError, IndexingRunner +from core.rag.pipeline.queue import TenantIsolatedTaskQueue +from enums.cloud_plan import CloudPlan +from extensions.ext_redis import redis_client +from models.dataset import Dataset, Document +from services.document_indexing_task_proxy import DocumentIndexingTaskProxy +from tasks.document_indexing_task import ( + _document_indexing, + _document_indexing_with_tenant_queue, + document_indexing_task, + normal_document_indexing_task, + priority_document_indexing_task, +) + +# ============================================================================ +# Fixtures +# ============================================================================ + + +@pytest.fixture +def tenant_id(): + """Generate a unique tenant ID for testing.""" + return str(uuid.uuid4()) + + +@pytest.fixture +def dataset_id(): + """Generate a unique dataset ID for testing.""" + return str(uuid.uuid4()) + + +@pytest.fixture +def document_ids(): + """Generate a list of document IDs for testing.""" + return [str(uuid.uuid4()) for _ in range(3)] + + +@pytest.fixture +def mock_dataset(dataset_id, tenant_id): + """Create a mock Dataset object.""" + dataset = Mock(spec=Dataset) + dataset.id = dataset_id + dataset.tenant_id = tenant_id + dataset.indexing_technique = "high_quality" + dataset.embedding_model_provider = "openai" + dataset.embedding_model = "text-embedding-ada-002" + return dataset + + +@pytest.fixture +def mock_documents(document_ids, dataset_id): + """Create mock Document objects.""" + documents = [] + for doc_id in document_ids: + doc = Mock(spec=Document) + doc.id = doc_id + doc.dataset_id = dataset_id + doc.indexing_status = "waiting" + doc.error = None + doc.stopped_at = None + doc.processing_started_at = None + documents.append(doc) + return documents + + +@pytest.fixture +def mock_db_session(): + """Mock database session.""" + with patch("tasks.document_indexing_task.db.session") as mock_session: + mock_query = MagicMock() + mock_session.query.return_value = mock_query + mock_query.where.return_value = mock_query + yield mock_session + + +@pytest.fixture +def mock_indexing_runner(): + """Mock IndexingRunner.""" + with patch("tasks.document_indexing_task.IndexingRunner") as mock_runner_class: + mock_runner = 
MagicMock(spec=IndexingRunner) + mock_runner_class.return_value = mock_runner + yield mock_runner + + +@pytest.fixture +def mock_feature_service(): + """Mock FeatureService for billing and feature checks.""" + with patch("tasks.document_indexing_task.FeatureService") as mock_service: + yield mock_service + + +@pytest.fixture +def mock_redis(): + """Mock Redis client operations.""" + # Redis is already mocked globally in conftest.py + # Reset it for each test + redis_client.reset_mock() + redis_client.get.return_value = None + redis_client.setex.return_value = True + redis_client.delete.return_value = True + redis_client.lpush.return_value = 1 + redis_client.rpop.return_value = None + return redis_client + + +# ============================================================================ +# Test Task Enqueuing +# ============================================================================ + + +class TestTaskEnqueuing: + """Test cases for task enqueuing to different queues.""" + + def test_enqueue_to_priority_direct_queue_for_self_hosted(self, tenant_id, dataset_id, document_ids, mock_redis): + """ + Test enqueuing to priority direct queue for self-hosted deployments. + + When billing is disabled (self-hosted), tasks should go directly to + the priority queue without tenant isolation. + """ + # Arrange + with patch.object(DocumentIndexingTaskProxy, "features") as mock_features: + mock_features.billing.enabled = False + + with patch("services.document_indexing_task_proxy.priority_document_indexing_task") as mock_task: + proxy = DocumentIndexingTaskProxy(tenant_id, dataset_id, document_ids) + + # Act + proxy.delay() + + # Assert + mock_task.delay.assert_called_once_with( + tenant_id=tenant_id, dataset_id=dataset_id, document_ids=document_ids + ) + + def test_enqueue_to_normal_tenant_queue_for_sandbox_plan(self, tenant_id, dataset_id, document_ids, mock_redis): + """ + Test enqueuing to normal tenant queue for sandbox plan. + + Sandbox plan users should have their tasks queued with tenant isolation + in the normal priority queue. + """ + # Arrange + mock_redis.get.return_value = None # No existing task + + with patch.object(DocumentIndexingTaskProxy, "features") as mock_features: + mock_features.billing.enabled = True + mock_features.billing.subscription.plan = CloudPlan.SANDBOX + + with patch("services.document_indexing_task_proxy.normal_document_indexing_task") as mock_task: + proxy = DocumentIndexingTaskProxy(tenant_id, dataset_id, document_ids) + + # Act + proxy.delay() + + # Assert - Should set task key and call delay + assert mock_redis.setex.called + mock_task.delay.assert_called_once() + + def test_enqueue_to_priority_tenant_queue_for_paid_plan(self, tenant_id, dataset_id, document_ids, mock_redis): + """ + Test enqueuing to priority tenant queue for paid plans. + + Paid plan users should have their tasks queued with tenant isolation + in the priority queue. 
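+
+        Here the paid plan is represented by CloudPlan.PROFESSIONAL; dispatch is
+        observed through the patched priority_document_indexing_task and the
+        tenant task key written via setex.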
+ """ + # Arrange + mock_redis.get.return_value = None # No existing task + + with patch.object(DocumentIndexingTaskProxy, "features") as mock_features: + mock_features.billing.enabled = True + mock_features.billing.subscription.plan = CloudPlan.PROFESSIONAL + + with patch("services.document_indexing_task_proxy.priority_document_indexing_task") as mock_task: + proxy = DocumentIndexingTaskProxy(tenant_id, dataset_id, document_ids) + + # Act + proxy.delay() + + # Assert + assert mock_redis.setex.called + mock_task.delay.assert_called_once() + + def test_enqueue_adds_to_waiting_queue_when_task_running(self, tenant_id, dataset_id, document_ids, mock_redis): + """ + Test that new tasks are added to waiting queue when a task is already running. + + If a task is already running for the tenant (task key exists), + new tasks should be pushed to the waiting queue. + """ + # Arrange + mock_redis.get.return_value = b"1" # Task already running + + with patch.object(DocumentIndexingTaskProxy, "features") as mock_features: + mock_features.billing.enabled = True + mock_features.billing.subscription.plan = CloudPlan.PROFESSIONAL + + with patch("services.document_indexing_task_proxy.priority_document_indexing_task") as mock_task: + proxy = DocumentIndexingTaskProxy(tenant_id, dataset_id, document_ids) + + # Act + proxy.delay() + + # Assert - Should push to queue, not call delay + assert mock_redis.lpush.called + mock_task.delay.assert_not_called() + + def test_legacy_document_indexing_task_still_works( + self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_documents, mock_indexing_runner + ): + """ + Test that the legacy document_indexing_task function still works. + + This ensures backward compatibility for existing code that may still + use the deprecated function. + """ + # Arrange + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + def mock_query_side_effect(*args): + mock_query = MagicMock() + if args[0] == Dataset: + mock_query.where.return_value.first.return_value = mock_dataset + elif args[0] == Document: + # Return documents one by one for each call + mock_query.where.return_value.first.side_effect = mock_documents + return mock_query + + mock_db_session.query.side_effect = mock_query_side_effect + + with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: + mock_features.return_value.billing.enabled = False + + # Act + document_indexing_task(dataset_id, document_ids) + + # Assert + mock_indexing_runner.run.assert_called_once() + + +# ============================================================================ +# Test Batch Processing +# ============================================================================ + + +class TestBatchProcessing: + """Test cases for batch processing of multiple documents.""" + + def test_batch_processing_multiple_documents( + self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_indexing_runner + ): + """ + Test batch processing of multiple documents. + + All documents in the batch should be processed together and their + status should be updated to 'parsing'. 
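+
+        Document lookups are served one at a time from a mocked db.session.query
+        side effect, so IndexingRunner.run should receive every document in the
+        batch in a single call.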
+ """ + # Arrange - Create actual document objects that can be modified + mock_documents = [] + for doc_id in document_ids: + doc = MagicMock(spec=Document) + doc.id = doc_id + doc.dataset_id = dataset_id + doc.indexing_status = "waiting" + doc.error = None + doc.stopped_at = None + doc.processing_started_at = None + mock_documents.append(doc) + + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + # Create an iterator for documents + doc_iter = iter(mock_documents) + + def mock_query_side_effect(*args): + mock_query = MagicMock() + if args[0] == Dataset: + mock_query.where.return_value.first.return_value = mock_dataset + elif args[0] == Document: + # Return documents one by one for each call + mock_query.where.return_value.first = lambda: next(doc_iter, None) + return mock_query + + mock_db_session.query.side_effect = mock_query_side_effect + + with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: + mock_features.return_value.billing.enabled = False + + # Act + _document_indexing(dataset_id, document_ids) + + # Assert - All documents should be set to 'parsing' status + for doc in mock_documents: + assert doc.indexing_status == "parsing" + assert doc.processing_started_at is not None + + # IndexingRunner should be called with all documents + mock_indexing_runner.run.assert_called_once() + call_args = mock_indexing_runner.run.call_args[0][0] + assert len(call_args) == len(document_ids) + + def test_batch_processing_with_limit_check(self, dataset_id, mock_db_session, mock_dataset, mock_feature_service): + """ + Test batch processing respects upload limits. + + When the number of documents exceeds the batch upload limit, + an error should be raised and all documents should be marked as error. + """ + # Arrange + batch_limit = 10 + document_ids = [str(uuid.uuid4()) for _ in range(batch_limit + 1)] + + mock_documents = [] + for doc_id in document_ids: + doc = MagicMock(spec=Document) + doc.id = doc_id + doc.dataset_id = dataset_id + doc.indexing_status = "waiting" + doc.error = None + doc.stopped_at = None + mock_documents.append(doc) + + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + doc_iter = iter(mock_documents) + + def mock_query_side_effect(*args): + mock_query = MagicMock() + if args[0] == Dataset: + mock_query.where.return_value.first.return_value = mock_dataset + elif args[0] == Document: + mock_query.where.return_value.first = lambda: next(doc_iter, None) + return mock_query + + mock_db_session.query.side_effect = mock_query_side_effect + + mock_feature_service.get_features.return_value.billing.enabled = True + mock_feature_service.get_features.return_value.billing.subscription.plan = CloudPlan.PROFESSIONAL + mock_feature_service.get_features.return_value.vector_space.limit = 1000 + mock_feature_service.get_features.return_value.vector_space.size = 0 + + with patch("tasks.document_indexing_task.dify_config.BATCH_UPLOAD_LIMIT", str(batch_limit)): + # Act + _document_indexing(dataset_id, document_ids) + + # Assert - All documents should have error status + for doc in mock_documents: + assert doc.indexing_status == "error" + assert doc.error is not None + assert "batch upload limit" in doc.error + + def test_batch_processing_sandbox_plan_single_document_only( + self, dataset_id, mock_db_session, mock_dataset, mock_feature_service + ): + """ + Test that sandbox plan only allows single document upload. + + Sandbox plan should reject batch uploads (more than 1 document). 
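+
+        With two documents and a SANDBOX subscription, every document is expected
+        to end up in 'error' status with a "does not support batch upload" message.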
+ """ + # Arrange + document_ids = [str(uuid.uuid4()) for _ in range(2)] + + mock_documents = [] + for doc_id in document_ids: + doc = MagicMock(spec=Document) + doc.id = doc_id + doc.dataset_id = dataset_id + doc.indexing_status = "waiting" + doc.error = None + doc.stopped_at = None + mock_documents.append(doc) + + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + doc_iter = iter(mock_documents) + + def mock_query_side_effect(*args): + mock_query = MagicMock() + if args[0] == Dataset: + mock_query.where.return_value.first.return_value = mock_dataset + elif args[0] == Document: + mock_query.where.return_value.first = lambda: next(doc_iter, None) + return mock_query + + mock_db_session.query.side_effect = mock_query_side_effect + + mock_feature_service.get_features.return_value.billing.enabled = True + mock_feature_service.get_features.return_value.billing.subscription.plan = CloudPlan.SANDBOX + mock_feature_service.get_features.return_value.vector_space.limit = 1000 + mock_feature_service.get_features.return_value.vector_space.size = 0 + + # Act + _document_indexing(dataset_id, document_ids) + + # Assert - All documents should have error status + for doc in mock_documents: + assert doc.indexing_status == "error" + assert "does not support batch upload" in doc.error + + def test_batch_processing_empty_document_list( + self, dataset_id, mock_db_session, mock_dataset, mock_indexing_runner + ): + """ + Test batch processing with empty document list. + + Should handle empty list gracefully without errors. + """ + # Arrange + document_ids = [] + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: + mock_features.return_value.billing.enabled = False + + # Act + _document_indexing(dataset_id, document_ids) + + # Assert - IndexingRunner should still be called with empty list + mock_indexing_runner.run.assert_called_once_with([]) + + +# ============================================================================ +# Test Progress Tracking +# ============================================================================ + + +class TestProgressTracking: + """Test cases for progress tracking through task lifecycle.""" + + def test_document_status_progression( + self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_indexing_runner + ): + """ + Test document status progresses correctly through lifecycle. + + Documents should transition from 'waiting' -> 'parsing' -> processed. 
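+
+        Because IndexingRunner is mocked here, only the 'waiting' -> 'parsing' step
+        is actually observed; the later transitions are left to the runner itself.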
+ """ + # Arrange - Create actual document objects + mock_documents = [] + for doc_id in document_ids: + doc = MagicMock(spec=Document) + doc.id = doc_id + doc.dataset_id = dataset_id + doc.indexing_status = "waiting" + doc.processing_started_at = None + mock_documents.append(doc) + + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + doc_iter = iter(mock_documents) + + def mock_query_side_effect(*args): + mock_query = MagicMock() + if args[0] == Dataset: + mock_query.where.return_value.first.return_value = mock_dataset + elif args[0] == Document: + mock_query.where.return_value.first = lambda: next(doc_iter, None) + return mock_query + + mock_db_session.query.side_effect = mock_query_side_effect + + with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: + mock_features.return_value.billing.enabled = False + + # Act + _document_indexing(dataset_id, document_ids) + + # Assert - Status should be 'parsing' + for doc in mock_documents: + assert doc.indexing_status == "parsing" + assert doc.processing_started_at is not None + + # Verify commit was called to persist status + assert mock_db_session.commit.called + + def test_processing_started_timestamp_set( + self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_indexing_runner + ): + """ + Test that processing_started_at timestamp is set correctly. + + When documents start processing, the timestamp should be recorded. + """ + # Arrange - Create actual document objects + mock_documents = [] + for doc_id in document_ids: + doc = MagicMock(spec=Document) + doc.id = doc_id + doc.dataset_id = dataset_id + doc.indexing_status = "waiting" + doc.processing_started_at = None + mock_documents.append(doc) + + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + doc_iter = iter(mock_documents) + + def mock_query_side_effect(*args): + mock_query = MagicMock() + if args[0] == Dataset: + mock_query.where.return_value.first.return_value = mock_dataset + elif args[0] == Document: + mock_query.where.return_value.first = lambda: next(doc_iter, None) + return mock_query + + mock_db_session.query.side_effect = mock_query_side_effect + + with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: + mock_features.return_value.billing.enabled = False + + # Act + _document_indexing(dataset_id, document_ids) + + # Assert + for doc in mock_documents: + assert doc.processing_started_at is not None + + def test_tenant_queue_processes_next_task_after_completion( + self, tenant_id, dataset_id, document_ids, mock_redis, mock_db_session, mock_dataset, mock_indexing_runner + ): + """ + Test that tenant queue processes next waiting task after completion. + + After a task completes, the system should check for waiting tasks + and process the next one. 
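+
+        The waiting task is simulated by having redis rpop return a serialized
+        TaskWrapper payload; completion should re-dispatch it via the task's
+        delay() and refresh the tenant task key.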
+ """ + # Arrange + next_task_data = {"tenant_id": tenant_id, "dataset_id": dataset_id, "document_ids": ["next_doc_id"]} + + # Simulate next task in queue + from core.rag.pipeline.queue import TaskWrapper + + wrapper = TaskWrapper(data=next_task_data) + mock_redis.rpop.return_value = wrapper.serialize() + + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: + mock_features.return_value.billing.enabled = False + + with patch("tasks.document_indexing_task.normal_document_indexing_task") as mock_task: + # Act + _document_indexing_with_tenant_queue(tenant_id, dataset_id, document_ids, mock_task) + + # Assert - Next task should be enqueued + mock_task.delay.assert_called() + # Task key should be set for next task + assert mock_redis.setex.called + + def test_tenant_queue_clears_flag_when_no_more_tasks( + self, tenant_id, dataset_id, document_ids, mock_redis, mock_db_session, mock_dataset, mock_indexing_runner + ): + """ + Test that tenant queue clears flag when no more tasks are waiting. + + When there are no more tasks in the queue, the task key should be deleted. + """ + # Arrange + mock_redis.rpop.return_value = None # No more tasks + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: + mock_features.return_value.billing.enabled = False + + with patch("tasks.document_indexing_task.normal_document_indexing_task") as mock_task: + # Act + _document_indexing_with_tenant_queue(tenant_id, dataset_id, document_ids, mock_task) + + # Assert - Task key should be deleted + assert mock_redis.delete.called + + +# ============================================================================ +# Test Error Handling and Retries +# ============================================================================ + + +class TestErrorHandling: + """Test cases for error handling and retry mechanisms.""" + + def test_error_handling_sets_document_error_status( + self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_feature_service + ): + """ + Test that errors during validation set document error status. + + When validation fails (e.g., limit exceeded), documents should be + marked with error status and error message. 
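+
+        The failure is triggered by setting vector_space.size equal to
+        vector_space.limit, which should yield an "over the limit" error and a
+        stopped_at timestamp on every document.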
+ """ + # Arrange - Create actual document objects + mock_documents = [] + for doc_id in document_ids: + doc = MagicMock(spec=Document) + doc.id = doc_id + doc.dataset_id = dataset_id + doc.indexing_status = "waiting" + doc.error = None + doc.stopped_at = None + mock_documents.append(doc) + + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + doc_iter = iter(mock_documents) + + def mock_query_side_effect(*args): + mock_query = MagicMock() + if args[0] == Dataset: + mock_query.where.return_value.first.return_value = mock_dataset + elif args[0] == Document: + mock_query.where.return_value.first = lambda: next(doc_iter, None) + return mock_query + + mock_db_session.query.side_effect = mock_query_side_effect + + # Set up to trigger vector space limit error + mock_feature_service.get_features.return_value.billing.enabled = True + mock_feature_service.get_features.return_value.billing.subscription.plan = CloudPlan.PROFESSIONAL + mock_feature_service.get_features.return_value.vector_space.limit = 100 + mock_feature_service.get_features.return_value.vector_space.size = 100 # At limit + + # Act + _document_indexing(dataset_id, document_ids) + + # Assert + for doc in mock_documents: + assert doc.indexing_status == "error" + assert doc.error is not None + assert "over the limit" in doc.error + assert doc.stopped_at is not None + + def test_error_handling_during_indexing_runner( + self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_documents, mock_indexing_runner + ): + """ + Test error handling when IndexingRunner raises an exception. + + Errors during indexing should be caught and logged, but not crash the task. + """ + # Arrange + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + def mock_query_side_effect(*args): + mock_query = MagicMock() + if args[0] == Dataset: + mock_query.where.return_value.first.return_value = mock_dataset + elif args[0] == Document: + mock_query.where.return_value.first.side_effect = mock_documents + return mock_query + + mock_db_session.query.side_effect = mock_query_side_effect + + # Make IndexingRunner raise an exception + mock_indexing_runner.run.side_effect = Exception("Indexing failed") + + with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: + mock_features.return_value.billing.enabled = False + + # Act - Should not raise exception + _document_indexing(dataset_id, document_ids) + + # Assert - Session should be closed even after error + assert mock_db_session.close.called + + def test_document_paused_error_handling( + self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_documents, mock_indexing_runner + ): + """ + Test handling of DocumentIsPausedError. + + When a document is paused, the error should be caught and logged + but not treated as a failure. 
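+
+        DocumentIsPausedError is raised from the mocked IndexingRunner.run; the
+        only observable assertion is that the session is still closed and no
+        exception escapes the task.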
+ """ + # Arrange + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + def mock_query_side_effect(*args): + mock_query = MagicMock() + if args[0] == Dataset: + mock_query.where.return_value.first.return_value = mock_dataset + elif args[0] == Document: + mock_query.where.return_value.first.side_effect = mock_documents + return mock_query + + mock_db_session.query.side_effect = mock_query_side_effect + + # Make IndexingRunner raise DocumentIsPausedError + mock_indexing_runner.run.side_effect = DocumentIsPausedError("Document is paused") + + with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: + mock_features.return_value.billing.enabled = False + + # Act - Should not raise exception + _document_indexing(dataset_id, document_ids) + + # Assert - Session should be closed + assert mock_db_session.close.called + + def test_dataset_not_found_error_handling(self, dataset_id, document_ids, mock_db_session): + """ + Test handling when dataset is not found. + + If the dataset doesn't exist, the task should exit gracefully. + """ + # Arrange + mock_db_session.query.return_value.where.return_value.first.return_value = None + + # Act + _document_indexing(dataset_id, document_ids) + + # Assert - Session should be closed + assert mock_db_session.close.called + + def test_tenant_queue_error_handling_still_processes_next_task( + self, tenant_id, dataset_id, document_ids, mock_redis, mock_db_session, mock_dataset, mock_indexing_runner + ): + """ + Test that errors don't prevent processing next task in tenant queue. + + Even if the current task fails, the next task should still be processed. + """ + # Arrange + next_task_data = {"tenant_id": tenant_id, "dataset_id": dataset_id, "document_ids": ["next_doc_id"]} + + from core.rag.pipeline.queue import TaskWrapper + + wrapper = TaskWrapper(data=next_task_data) + # Set up rpop to return task once for concurrency check + mock_redis.rpop.side_effect = [wrapper.serialize(), None] + + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + # Make _document_indexing raise an error + with patch("tasks.document_indexing_task._document_indexing") as mock_indexing: + mock_indexing.side_effect = Exception("Processing failed") + + # Patch logger to avoid format string issue in actual code + with patch("tasks.document_indexing_task.logger"): + with patch("tasks.document_indexing_task.normal_document_indexing_task") as mock_task: + # Act + _document_indexing_with_tenant_queue(tenant_id, dataset_id, document_ids, mock_task) + + # Assert - Next task should still be enqueued despite error + mock_task.delay.assert_called() + + def test_concurrent_task_limit_respected( + self, tenant_id, dataset_id, document_ids, mock_redis, mock_db_session, mock_dataset + ): + """ + Test that tenant isolated task concurrency limit is respected. + + Should pull only TENANT_ISOLATED_TASK_CONCURRENCY tasks at a time. 
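+
+        The limit is patched to 2 and redis rpop is stubbed to hand back two
+        serialized tasks followed by None, so exactly two delay() calls are
+        expected.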
+ """ + # Arrange + concurrency_limit = 2 + + # Create multiple tasks in queue + tasks = [] + for i in range(5): + task_data = {"tenant_id": tenant_id, "dataset_id": dataset_id, "document_ids": [f"doc_{i}"]} + from core.rag.pipeline.queue import TaskWrapper + + wrapper = TaskWrapper(data=task_data) + tasks.append(wrapper.serialize()) + + # Mock rpop to return tasks one by one + mock_redis.rpop.side_effect = tasks[:concurrency_limit] + [None] + + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + with patch("tasks.document_indexing_task.dify_config.TENANT_ISOLATED_TASK_CONCURRENCY", concurrency_limit): + with patch("tasks.document_indexing_task.normal_document_indexing_task") as mock_task: + # Act + _document_indexing_with_tenant_queue(tenant_id, dataset_id, document_ids, mock_task) + + # Assert - Should call delay exactly concurrency_limit times + assert mock_task.delay.call_count == concurrency_limit + + +# ============================================================================ +# Test Task Cancellation +# ============================================================================ + + +class TestTaskCancellation: + """Test cases for task cancellation and cleanup.""" + + def test_task_key_deleted_when_queue_empty( + self, tenant_id, dataset_id, document_ids, mock_redis, mock_db_session, mock_dataset + ): + """ + Test that task key is deleted when queue becomes empty. + + When no more tasks are waiting, the tenant task key should be removed. + """ + # Arrange + mock_redis.rpop.return_value = None # Empty queue + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + with patch("tasks.document_indexing_task.normal_document_indexing_task") as mock_task: + # Act + _document_indexing_with_tenant_queue(tenant_id, dataset_id, document_ids, mock_task) + + # Assert + assert mock_redis.delete.called + # Verify the correct key was deleted + delete_call_args = mock_redis.delete.call_args[0][0] + assert tenant_id in delete_call_args + assert "document_indexing" in delete_call_args + + def test_session_cleanup_on_success( + self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_documents, mock_indexing_runner + ): + """ + Test that database session is properly closed on success. + + Session cleanup should happen in finally block. + """ + # Arrange + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + def mock_query_side_effect(*args): + mock_query = MagicMock() + if args[0] == Dataset: + mock_query.where.return_value.first.return_value = mock_dataset + elif args[0] == Document: + mock_query.where.return_value.first.side_effect = mock_documents + return mock_query + + mock_db_session.query.side_effect = mock_query_side_effect + + with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: + mock_features.return_value.billing.enabled = False + + # Act + _document_indexing(dataset_id, document_ids) + + # Assert + assert mock_db_session.close.called + + def test_session_cleanup_on_error( + self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_documents, mock_indexing_runner + ): + """ + Test that database session is properly closed on error. + + Session cleanup should happen even when errors occur. 
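+
+        The mocked IndexingRunner.run raises a generic exception, and the test
+        only checks that db.session.close() is still invoked afterwards.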
+ """ + # Arrange + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + def mock_query_side_effect(*args): + mock_query = MagicMock() + if args[0] == Dataset: + mock_query.where.return_value.first.return_value = mock_dataset + elif args[0] == Document: + mock_query.where.return_value.first.side_effect = mock_documents + return mock_query + + mock_db_session.query.side_effect = mock_query_side_effect + + # Make IndexingRunner raise an exception + mock_indexing_runner.run.side_effect = Exception("Test error") + + with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: + mock_features.return_value.billing.enabled = False + + # Act + _document_indexing(dataset_id, document_ids) + + # Assert + assert mock_db_session.close.called + + def test_task_isolation_between_tenants(self, mock_redis): + """ + Test that tasks are properly isolated between different tenants. + + Each tenant should have their own queue and task key. + """ + # Arrange + tenant_1 = str(uuid.uuid4()) + tenant_2 = str(uuid.uuid4()) + dataset_id = str(uuid.uuid4()) + document_ids = [str(uuid.uuid4())] + + # Act + queue_1 = TenantIsolatedTaskQueue(tenant_1, "document_indexing") + queue_2 = TenantIsolatedTaskQueue(tenant_2, "document_indexing") + + # Assert - Different tenants should have different queue keys + assert queue_1._queue != queue_2._queue + assert queue_1._task_key != queue_2._task_key + assert tenant_1 in queue_1._queue + assert tenant_2 in queue_2._queue + + +# ============================================================================ +# Integration Tests +# ============================================================================ + + +class TestAdvancedScenarios: + """Advanced test scenarios for edge cases and complex workflows.""" + + def test_multiple_documents_with_mixed_success_and_failure( + self, dataset_id, mock_db_session, mock_dataset, mock_indexing_runner + ): + """ + Test handling of mixed success and failure scenarios in batch processing. + + When processing multiple documents, some may succeed while others fail. + This tests that the system handles partial failures gracefully. 
+ + Scenario: + - Process 3 documents in a batch + - First document succeeds + - Second document is not found (skipped) + - Third document succeeds + + Expected behavior: + - Only found documents are processed + - Missing documents are skipped without crashing + - IndexingRunner receives only valid documents + """ + # Arrange - Create document IDs with one missing + document_ids = [str(uuid.uuid4()) for _ in range(3)] + + # Create only 2 documents (simulate one missing) + mock_documents = [] + for i, doc_id in enumerate([document_ids[0], document_ids[2]]): # Skip middle one + doc = MagicMock(spec=Document) + doc.id = doc_id + doc.dataset_id = dataset_id + doc.indexing_status = "waiting" + doc.processing_started_at = None + mock_documents.append(doc) + + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + # Create iterator that returns None for missing document + doc_responses = [mock_documents[0], None, mock_documents[1]] + doc_iter = iter(doc_responses) + + def mock_query_side_effect(*args): + mock_query = MagicMock() + if args[0] == Dataset: + mock_query.where.return_value.first.return_value = mock_dataset + elif args[0] == Document: + mock_query.where.return_value.first = lambda: next(doc_iter, None) + return mock_query + + mock_db_session.query.side_effect = mock_query_side_effect + + with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: + mock_features.return_value.billing.enabled = False + + # Act + _document_indexing(dataset_id, document_ids) + + # Assert - Only 2 documents should be processed (missing one skipped) + mock_indexing_runner.run.assert_called_once() + call_args = mock_indexing_runner.run.call_args[0][0] + assert len(call_args) == 2 # Only found documents + + def test_tenant_queue_with_multiple_concurrent_tasks( + self, tenant_id, dataset_id, mock_redis, mock_db_session, mock_dataset + ): + """ + Test concurrent task processing with tenant isolation. + + This tests the scenario where multiple tasks are queued for the same tenant + and need to be processed respecting the concurrency limit. 
+ + Scenario: + - 5 tasks are waiting in the queue + - Concurrency limit is 2 + - After current task completes, pull and enqueue next 2 tasks + + Expected behavior: + - Exactly 2 tasks are pulled from queue (respecting concurrency) + - Each task is enqueued with correct parameters + - Task waiting time is set for each new task + """ + # Arrange + concurrency_limit = 2 + document_ids = [str(uuid.uuid4())] + + # Create multiple waiting tasks + waiting_tasks = [] + for i in range(5): + task_data = {"tenant_id": tenant_id, "dataset_id": dataset_id, "document_ids": [f"doc_{i}"]} + from core.rag.pipeline.queue import TaskWrapper + + wrapper = TaskWrapper(data=task_data) + waiting_tasks.append(wrapper.serialize()) + + # Mock rpop to return tasks up to concurrency limit + mock_redis.rpop.side_effect = waiting_tasks[:concurrency_limit] + [None] + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + with patch("tasks.document_indexing_task.dify_config.TENANT_ISOLATED_TASK_CONCURRENCY", concurrency_limit): + with patch("tasks.document_indexing_task.normal_document_indexing_task") as mock_task: + # Act + _document_indexing_with_tenant_queue(tenant_id, dataset_id, document_ids, mock_task) + + # Assert + # Should call delay exactly concurrency_limit times + assert mock_task.delay.call_count == concurrency_limit + + # Verify task waiting time was set for each task + assert mock_redis.setex.call_count >= concurrency_limit + + def test_vector_space_limit_edge_case_at_exact_limit( + self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_feature_service + ): + """ + Test vector space limit validation at exact boundary. + + Edge case: When vector space is exactly at the limit (not over), + the upload should still be rejected. + + Scenario: + - Vector space limit: 100 + - Current size: 100 (exactly at limit) + - Try to upload 3 documents + + Expected behavior: + - Upload is rejected with appropriate error message + - All documents are marked with error status + """ + # Arrange + mock_documents = [] + for doc_id in document_ids: + doc = MagicMock(spec=Document) + doc.id = doc_id + doc.dataset_id = dataset_id + doc.indexing_status = "waiting" + doc.error = None + doc.stopped_at = None + mock_documents.append(doc) + + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + doc_iter = iter(mock_documents) + + def mock_query_side_effect(*args): + mock_query = MagicMock() + if args[0] == Dataset: + mock_query.where.return_value.first.return_value = mock_dataset + elif args[0] == Document: + mock_query.where.return_value.first = lambda: next(doc_iter, None) + return mock_query + + mock_db_session.query.side_effect = mock_query_side_effect + + # Set vector space exactly at limit + mock_feature_service.get_features.return_value.billing.enabled = True + mock_feature_service.get_features.return_value.billing.subscription.plan = CloudPlan.PROFESSIONAL + mock_feature_service.get_features.return_value.vector_space.limit = 100 + mock_feature_service.get_features.return_value.vector_space.size = 100 # Exactly at limit + + # Act + _document_indexing(dataset_id, document_ids) + + # Assert - All documents should have error status + for doc in mock_documents: + assert doc.indexing_status == "error" + assert "over the limit" in doc.error + + def test_task_queue_fifo_ordering(self, tenant_id, dataset_id, mock_redis, mock_db_session, mock_dataset): + """ + Test that tasks are processed in FIFO (First-In-First-Out) order. 
+ + The tenant isolated queue should maintain task order, ensuring + that tasks are processed in the sequence they were added. + + Scenario: + - Task A added first + - Task B added second + - Task C added third + - When pulling tasks, should get A, then B, then C + + Expected behavior: + - Tasks are retrieved in the order they were added + - FIFO ordering is maintained throughout processing + """ + # Arrange + document_ids = [str(uuid.uuid4())] + + # Create tasks with identifiable document IDs to track order + task_order = ["task_A", "task_B", "task_C"] + tasks = [] + for task_name in task_order: + task_data = {"tenant_id": tenant_id, "dataset_id": dataset_id, "document_ids": [task_name]} + from core.rag.pipeline.queue import TaskWrapper + + wrapper = TaskWrapper(data=task_data) + tasks.append(wrapper.serialize()) + + # Mock rpop to return tasks in FIFO order + mock_redis.rpop.side_effect = tasks + [None] + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + with patch("tasks.document_indexing_task.dify_config.TENANT_ISOLATED_TASK_CONCURRENCY", 3): + with patch("tasks.document_indexing_task.normal_document_indexing_task") as mock_task: + # Act + _document_indexing_with_tenant_queue(tenant_id, dataset_id, document_ids, mock_task) + + # Assert - Verify tasks were enqueued in correct order + assert mock_task.delay.call_count == 3 + + # Check that document_ids in calls match expected order + for i, call_obj in enumerate(mock_task.delay.call_args_list): + called_doc_ids = call_obj[1]["document_ids"] + assert called_doc_ids == [task_order[i]] + + def test_empty_queue_after_task_completion_cleans_up( + self, tenant_id, dataset_id, document_ids, mock_redis, mock_db_session, mock_dataset + ): + """ + Test cleanup behavior when queue becomes empty after task completion. + + After processing the last task in the queue, the system should: + 1. Detect that no more tasks are waiting + 2. Delete the task key to indicate tenant is idle + 3. Allow new tasks to start fresh processing + + Scenario: + - Process a task + - Check queue for next tasks + - Queue is empty + - Task key should be deleted + + Expected behavior: + - Task key is deleted when queue is empty + - Tenant is marked as idle (no active tasks) + """ + # Arrange + mock_redis.rpop.return_value = None # Empty queue + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + with patch("tasks.document_indexing_task.normal_document_indexing_task") as mock_task: + # Act + _document_indexing_with_tenant_queue(tenant_id, dataset_id, document_ids, mock_task) + + # Assert + # Verify delete was called to clean up task key + mock_redis.delete.assert_called_once() + + # Verify the correct key was deleted (contains tenant_id and "document_indexing") + delete_call_args = mock_redis.delete.call_args[0][0] + assert tenant_id in delete_call_args + assert "document_indexing" in delete_call_args + + def test_billing_disabled_skips_limit_checks( + self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_indexing_runner, mock_feature_service + ): + """ + Test that billing limit checks are skipped when billing is disabled. + + For self-hosted or enterprise deployments where billing is disabled, + the system should not enforce vector space or batch upload limits. 
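+        BATCH_UPLOAD_LIMIT is left unpatched in this test; the expectation is that
+        billing.enabled = False bypasses the batch size check along with the
+        vector space check.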
+ + Scenario: + - Billing is disabled + - Upload 100 documents (would normally exceed limits) + - No limit checks should be performed + + Expected behavior: + - Documents are processed without limit validation + - No errors related to limits + - All documents proceed to indexing + """ + # Arrange - Create many documents + large_batch_ids = [str(uuid.uuid4()) for _ in range(100)] + + mock_documents = [] + for doc_id in large_batch_ids: + doc = MagicMock(spec=Document) + doc.id = doc_id + doc.dataset_id = dataset_id + doc.indexing_status = "waiting" + doc.processing_started_at = None + mock_documents.append(doc) + + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + doc_iter = iter(mock_documents) + + def mock_query_side_effect(*args): + mock_query = MagicMock() + if args[0] == Dataset: + mock_query.where.return_value.first.return_value = mock_dataset + elif args[0] == Document: + mock_query.where.return_value.first = lambda: next(doc_iter, None) + return mock_query + + mock_db_session.query.side_effect = mock_query_side_effect + + # Billing disabled - limits should not be checked + mock_feature_service.get_features.return_value.billing.enabled = False + + # Act + _document_indexing(dataset_id, large_batch_ids) + + # Assert + # All documents should be set to parsing (no limit errors) + for doc in mock_documents: + assert doc.indexing_status == "parsing" + + # IndexingRunner should be called with all documents + mock_indexing_runner.run.assert_called_once() + call_args = mock_indexing_runner.run.call_args[0][0] + assert len(call_args) == 100 + + +class TestIntegration: + """Integration tests for complete task workflows.""" + + def test_complete_workflow_normal_task( + self, tenant_id, dataset_id, document_ids, mock_redis, mock_db_session, mock_dataset, mock_indexing_runner + ): + """ + Test complete workflow for normal document indexing task. + + This tests the full flow from task receipt to completion. + """ + # Arrange - Create actual document objects + mock_documents = [] + for doc_id in document_ids: + doc = MagicMock(spec=Document) + doc.id = doc_id + doc.dataset_id = dataset_id + doc.indexing_status = "waiting" + doc.processing_started_at = None + mock_documents.append(doc) + + # Set up rpop to return None for concurrency check (no more tasks) + mock_redis.rpop.side_effect = [None] + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + doc_iter = iter(mock_documents) + + def mock_query_side_effect(*args): + mock_query = MagicMock() + if args[0] == Dataset: + mock_query.where.return_value.first.return_value = mock_dataset + elif args[0] == Document: + mock_query.where.return_value.first = lambda: next(doc_iter, None) + return mock_query + + mock_db_session.query.side_effect = mock_query_side_effect + + with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: + mock_features.return_value.billing.enabled = False + + # Act + normal_document_indexing_task(tenant_id, dataset_id, document_ids) + + # Assert + # Documents should be processed + mock_indexing_runner.run.assert_called_once() + # Session should be closed + assert mock_db_session.close.called + # Task key should be deleted (no more tasks) + assert mock_redis.delete.called + + def test_complete_workflow_priority_task( + self, tenant_id, dataset_id, document_ids, mock_redis, mock_db_session, mock_dataset, mock_indexing_runner + ): + """ + Test complete workflow for priority document indexing task. 
+ + Priority tasks should follow the same flow as normal tasks. + """ + # Arrange - Create actual document objects + mock_documents = [] + for doc_id in document_ids: + doc = MagicMock(spec=Document) + doc.id = doc_id + doc.dataset_id = dataset_id + doc.indexing_status = "waiting" + doc.processing_started_at = None + mock_documents.append(doc) + + # Set up rpop to return None for concurrency check (no more tasks) + mock_redis.rpop.side_effect = [None] + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + doc_iter = iter(mock_documents) + + def mock_query_side_effect(*args): + mock_query = MagicMock() + if args[0] == Dataset: + mock_query.where.return_value.first.return_value = mock_dataset + elif args[0] == Document: + mock_query.where.return_value.first = lambda: next(doc_iter, None) + return mock_query + + mock_db_session.query.side_effect = mock_query_side_effect + + with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: + mock_features.return_value.billing.enabled = False + + # Act + priority_document_indexing_task(tenant_id, dataset_id, document_ids) + + # Assert + mock_indexing_runner.run.assert_called_once() + assert mock_db_session.close.called + assert mock_redis.delete.called + + def test_queue_chain_processing( + self, tenant_id, dataset_id, mock_redis, mock_db_session, mock_dataset, mock_indexing_runner + ): + """ + Test that multiple tasks in queue are processed in sequence. + + When tasks are queued, they should be processed one after another. + """ + # Arrange + task_1_docs = [str(uuid.uuid4())] + task_2_docs = [str(uuid.uuid4())] + + task_2_data = {"tenant_id": tenant_id, "dataset_id": dataset_id, "document_ids": task_2_docs} + + from core.rag.pipeline.queue import TaskWrapper + + wrapper = TaskWrapper(data=task_2_data) + + # First call returns task 2, second call returns None + mock_redis.rpop.side_effect = [wrapper.serialize(), None] + + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: + mock_features.return_value.billing.enabled = False + + with patch("tasks.document_indexing_task.normal_document_indexing_task") as mock_task: + # Act - Process first task + _document_indexing_with_tenant_queue(tenant_id, dataset_id, task_1_docs, mock_task) + + # Assert - Second task should be enqueued + assert mock_task.delay.called + call_args = mock_task.delay.call_args + assert call_args[1]["document_ids"] == task_2_docs + + +# ============================================================================ +# Additional Edge Case Tests +# ============================================================================ + + +class TestEdgeCases: + """Test edge cases and boundary conditions.""" + + def test_single_document_processing(self, dataset_id, mock_db_session, mock_dataset, mock_indexing_runner): + """ + Test processing a single document (minimum batch size). + + Single document processing is a common case and should work + without any special handling or errors. 
+ + Scenario: + - Process exactly 1 document + - Document exists and is valid + + Expected behavior: + - Document is processed successfully + - Status is updated to 'parsing' + - IndexingRunner is called with single document + """ + # Arrange + document_ids = [str(uuid.uuid4())] + + mock_document = MagicMock(spec=Document) + mock_document.id = document_ids[0] + mock_document.dataset_id = dataset_id + mock_document.indexing_status = "waiting" + mock_document.processing_started_at = None + + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + def mock_query_side_effect(*args): + mock_query = MagicMock() + if args[0] == Dataset: + mock_query.where.return_value.first.return_value = mock_dataset + elif args[0] == Document: + mock_query.where.return_value.first = lambda: mock_document + return mock_query + + mock_db_session.query.side_effect = mock_query_side_effect + + with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: + mock_features.return_value.billing.enabled = False + + # Act + _document_indexing(dataset_id, document_ids) + + # Assert + assert mock_document.indexing_status == "parsing" + mock_indexing_runner.run.assert_called_once() + call_args = mock_indexing_runner.run.call_args[0][0] + assert len(call_args) == 1 + + def test_document_with_special_characters_in_id( + self, dataset_id, mock_db_session, mock_dataset, mock_indexing_runner + ): + """ + Test handling documents with special characters in IDs. + + Document IDs might contain special characters or unusual formats. + The system should handle these without errors. + + Scenario: + - Document ID contains hyphens, underscores + - Standard UUID format + + Expected behavior: + - Document is processed normally + - No parsing or encoding errors + """ + # Arrange - UUID format with standard characters + document_ids = [str(uuid.uuid4())] + + mock_document = MagicMock(spec=Document) + mock_document.id = document_ids[0] + mock_document.dataset_id = dataset_id + mock_document.indexing_status = "waiting" + mock_document.processing_started_at = None + + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + def mock_query_side_effect(*args): + mock_query = MagicMock() + if args[0] == Dataset: + mock_query.where.return_value.first.return_value = mock_dataset + elif args[0] == Document: + mock_query.where.return_value.first = lambda: mock_document + return mock_query + + mock_db_session.query.side_effect = mock_query_side_effect + + with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: + mock_features.return_value.billing.enabled = False + + # Act - Should not raise any exceptions + _document_indexing(dataset_id, document_ids) + + # Assert + assert mock_document.indexing_status == "parsing" + mock_indexing_runner.run.assert_called_once() + + def test_rapid_successive_task_enqueuing(self, tenant_id, dataset_id, mock_redis): + """ + Test rapid successive task enqueuing to the same tenant queue. + + When multiple tasks are enqueued rapidly for the same tenant, + the system should queue them properly without race conditions. 
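+        The running-task state is simulated by redis get returning b"1", so every
+        proxy.delay() call is expected to fall through to lpush instead of
+        dispatching immediately.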
+ + Scenario: + - First task starts processing (task key exists) + - Multiple tasks enqueued rapidly while first is running + - All should be added to waiting queue + + Expected behavior: + - All tasks are queued (not executed immediately) + - No tasks are lost + - Queue maintains all tasks + """ + # Arrange + document_ids_list = [[str(uuid.uuid4())] for _ in range(5)] + + # Simulate task already running + mock_redis.get.return_value = b"1" + + with patch.object(DocumentIndexingTaskProxy, "features") as mock_features: + mock_features.billing.enabled = True + mock_features.billing.subscription.plan = CloudPlan.PROFESSIONAL + + with patch("services.document_indexing_task_proxy.priority_document_indexing_task") as mock_task: + # Act - Enqueue multiple tasks rapidly + for doc_ids in document_ids_list: + proxy = DocumentIndexingTaskProxy(tenant_id, dataset_id, doc_ids) + proxy.delay() + + # Assert - All tasks should be pushed to queue, none executed + assert mock_redis.lpush.call_count == 5 + mock_task.delay.assert_not_called() + + def test_zero_vector_space_limit_allows_unlimited( + self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_indexing_runner, mock_feature_service + ): + """ + Test that zero vector space limit means unlimited. + + When vector_space.limit is 0, it indicates no limit is enforced, + allowing unlimited document uploads. + + Scenario: + - Vector space limit: 0 (unlimited) + - Current size: 1000 (any number) + - Upload 3 documents + + Expected behavior: + - Upload is allowed + - No limit errors + - Documents are processed normally + """ + # Arrange + mock_documents = [] + for doc_id in document_ids: + doc = MagicMock(spec=Document) + doc.id = doc_id + doc.dataset_id = dataset_id + doc.indexing_status = "waiting" + doc.processing_started_at = None + mock_documents.append(doc) + + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + doc_iter = iter(mock_documents) + + def mock_query_side_effect(*args): + mock_query = MagicMock() + if args[0] == Dataset: + mock_query.where.return_value.first.return_value = mock_dataset + elif args[0] == Document: + mock_query.where.return_value.first = lambda: next(doc_iter, None) + return mock_query + + mock_db_session.query.side_effect = mock_query_side_effect + + # Set vector space limit to 0 (unlimited) + mock_feature_service.get_features.return_value.billing.enabled = True + mock_feature_service.get_features.return_value.billing.subscription.plan = CloudPlan.PROFESSIONAL + mock_feature_service.get_features.return_value.vector_space.limit = 0 # Unlimited + mock_feature_service.get_features.return_value.vector_space.size = 1000 + + # Act + _document_indexing(dataset_id, document_ids) + + # Assert - All documents should be processed (no limit error) + for doc in mock_documents: + assert doc.indexing_status == "parsing" + + mock_indexing_runner.run.assert_called_once() + + def test_negative_vector_space_values_handled_gracefully( + self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_indexing_runner, mock_feature_service + ): + """ + Test handling of negative vector space values. + + Negative values in vector space configuration should be treated + as unlimited or invalid, not causing crashes. 
+ + Scenario: + - Vector space limit: -1 (invalid/unlimited indicator) + - Current size: 100 + - Upload 3 documents + + Expected behavior: + - Upload is allowed (negative treated as no limit) + - No crashes or validation errors + """ + # Arrange + mock_documents = [] + for doc_id in document_ids: + doc = MagicMock(spec=Document) + doc.id = doc_id + doc.dataset_id = dataset_id + doc.indexing_status = "waiting" + doc.processing_started_at = None + mock_documents.append(doc) + + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + doc_iter = iter(mock_documents) + + def mock_query_side_effect(*args): + mock_query = MagicMock() + if args[0] == Dataset: + mock_query.where.return_value.first.return_value = mock_dataset + elif args[0] == Document: + mock_query.where.return_value.first = lambda: next(doc_iter, None) + return mock_query + + mock_db_session.query.side_effect = mock_query_side_effect + + # Set negative vector space limit + mock_feature_service.get_features.return_value.billing.enabled = True + mock_feature_service.get_features.return_value.billing.subscription.plan = CloudPlan.PROFESSIONAL + mock_feature_service.get_features.return_value.vector_space.limit = -1 # Negative + mock_feature_service.get_features.return_value.vector_space.size = 100 + + # Act + _document_indexing(dataset_id, document_ids) + + # Assert - Should process normally (negative treated as unlimited) + for doc in mock_documents: + assert doc.indexing_status == "parsing" + + +class TestPerformanceScenarios: + """Test performance-related scenarios and optimizations.""" + + def test_large_document_batch_processing( + self, dataset_id, mock_db_session, mock_dataset, mock_indexing_runner, mock_feature_service + ): + """ + Test processing a large batch of documents at batch limit. + + When processing the maximum allowed batch size, the system + should handle it efficiently without errors. 
+ + Scenario: + - Process exactly batch_upload_limit documents (e.g., 50) + - All documents are valid + - Billing is enabled + + Expected behavior: + - All documents are processed successfully + - No timeout or memory issues + - Batch limit is not exceeded + """ + # Arrange + batch_limit = 50 + document_ids = [str(uuid.uuid4()) for _ in range(batch_limit)] + + mock_documents = [] + for doc_id in document_ids: + doc = MagicMock(spec=Document) + doc.id = doc_id + doc.dataset_id = dataset_id + doc.indexing_status = "waiting" + doc.processing_started_at = None + mock_documents.append(doc) + + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + doc_iter = iter(mock_documents) + + def mock_query_side_effect(*args): + mock_query = MagicMock() + if args[0] == Dataset: + mock_query.where.return_value.first.return_value = mock_dataset + elif args[0] == Document: + mock_query.where.return_value.first = lambda: next(doc_iter, None) + return mock_query + + mock_db_session.query.side_effect = mock_query_side_effect + + # Configure billing with sufficient limits + mock_feature_service.get_features.return_value.billing.enabled = True + mock_feature_service.get_features.return_value.billing.subscription.plan = CloudPlan.PROFESSIONAL + mock_feature_service.get_features.return_value.vector_space.limit = 10000 + mock_feature_service.get_features.return_value.vector_space.size = 0 + + with patch("tasks.document_indexing_task.dify_config.BATCH_UPLOAD_LIMIT", str(batch_limit)): + # Act + _document_indexing(dataset_id, document_ids) + + # Assert + for doc in mock_documents: + assert doc.indexing_status == "parsing" + + mock_indexing_runner.run.assert_called_once() + call_args = mock_indexing_runner.run.call_args[0][0] + assert len(call_args) == batch_limit + + def test_tenant_queue_handles_burst_traffic(self, tenant_id, dataset_id, mock_redis, mock_db_session, mock_dataset): + """ + Test tenant queue handling burst traffic scenarios. + + When many tasks arrive in a burst for the same tenant, + the queue should handle them efficiently without dropping tasks. + + Scenario: + - 20 tasks arrive rapidly + - Concurrency limit is 3 + - Tasks should be queued and processed in batches + + Expected behavior: + - First 3 tasks are processed immediately + - Remaining tasks wait in queue + - No tasks are lost + """ + # Arrange + num_tasks = 20 + concurrency_limit = 3 + document_ids = [str(uuid.uuid4())] + + # Create waiting tasks + waiting_tasks = [] + for i in range(num_tasks): + task_data = { + "tenant_id": tenant_id, + "dataset_id": dataset_id, + "document_ids": [f"doc_{i}"], + } + from core.rag.pipeline.queue import TaskWrapper + + wrapper = TaskWrapper(data=task_data) + waiting_tasks.append(wrapper.serialize()) + + # Mock rpop to return tasks up to concurrency limit + mock_redis.rpop.side_effect = waiting_tasks[:concurrency_limit] + [None] + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + with patch("tasks.document_indexing_task.dify_config.TENANT_ISOLATED_TASK_CONCURRENCY", concurrency_limit): + with patch("tasks.document_indexing_task.normal_document_indexing_task") as mock_task: + # Act + _document_indexing_with_tenant_queue(tenant_id, dataset_id, document_ids, mock_task) + + # Assert - Should process exactly concurrency_limit tasks + assert mock_task.delay.call_count == concurrency_limit + + def test_multiple_tenants_isolated_processing(self, mock_redis): + """ + Test that multiple tenants process tasks in isolation. 
+ + When multiple tenants have tasks running simultaneously, + they should not interfere with each other. + + Scenario: + - Tenant A has tasks in queue + - Tenant B has tasks in queue + - Both process independently + + Expected behavior: + - Each tenant has separate queue + - Each tenant has separate task key + - No cross-tenant interference + """ + # Arrange + tenant_a = str(uuid.uuid4()) + tenant_b = str(uuid.uuid4()) + dataset_id = str(uuid.uuid4()) + document_ids = [str(uuid.uuid4())] + + # Create queues for both tenants + queue_a = TenantIsolatedTaskQueue(tenant_a, "document_indexing") + queue_b = TenantIsolatedTaskQueue(tenant_b, "document_indexing") + + # Act - Set task keys for both tenants + queue_a.set_task_waiting_time() + queue_b.set_task_waiting_time() + + # Assert - Each tenant has independent queue and key + assert queue_a._queue != queue_b._queue + assert queue_a._task_key != queue_b._task_key + assert tenant_a in queue_a._queue + assert tenant_b in queue_b._queue + assert tenant_a in queue_a._task_key + assert tenant_b in queue_b._task_key + + +class TestRobustness: + """Test system robustness and resilience.""" + + def test_indexing_runner_exception_does_not_crash_task( + self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_indexing_runner + ): + """ + Test that IndexingRunner exceptions are handled gracefully. + + When IndexingRunner raises an unexpected exception during processing, + the task should catch it, log it, and clean up properly. + + Scenario: + - Documents are prepared for indexing + - IndexingRunner.run() raises RuntimeError + - Task should not crash + + Expected behavior: + - Exception is caught and logged + - Database session is closed + - Task completes (doesn't hang) + """ + # Arrange + mock_documents = [] + for doc_id in document_ids: + doc = MagicMock(spec=Document) + doc.id = doc_id + doc.dataset_id = dataset_id + doc.indexing_status = "waiting" + doc.processing_started_at = None + mock_documents.append(doc) + + mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset + + doc_iter = iter(mock_documents) + + def mock_query_side_effect(*args): + mock_query = MagicMock() + if args[0] == Dataset: + mock_query.where.return_value.first.return_value = mock_dataset + elif args[0] == Document: + mock_query.where.return_value.first = lambda: next(doc_iter, None) + return mock_query + + mock_db_session.query.side_effect = mock_query_side_effect + + # Make IndexingRunner raise an exception + mock_indexing_runner.run.side_effect = RuntimeError("Unexpected indexing error") + + with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features: + mock_features.return_value.billing.enabled = False + + # Act - Should not raise exception + _document_indexing(dataset_id, document_ids) + + # Assert - Session should be closed even after error + assert mock_db_session.close.called + + def test_database_session_always_closed_on_success( + self, dataset_id, document_ids, mock_db_session, mock_dataset, mock_indexing_runner + ): + """ + Test that database session is always closed on successful completion. + + Proper resource cleanup is critical. The database session must + be closed in the finally block to prevent connection leaks. 
+
+        Scenario:
+        - Task processes successfully
+        - No exceptions occur
+
+        Expected behavior:
+        - Database session is closed
+        - No connection leaks
+        """
+        # Arrange
+        mock_documents = []
+        for doc_id in document_ids:
+            doc = MagicMock(spec=Document)
+            doc.id = doc_id
+            doc.dataset_id = dataset_id
+            doc.indexing_status = "waiting"
+            doc.processing_started_at = None
+            mock_documents.append(doc)
+
+        mock_db_session.query.return_value.where.return_value.first.return_value = mock_dataset
+
+        doc_iter = iter(mock_documents)
+
+        def mock_query_side_effect(*args):
+            mock_query = MagicMock()
+            if args[0] == Dataset:
+                mock_query.where.return_value.first.return_value = mock_dataset
+            elif args[0] == Document:
+                mock_query.where.return_value.first = lambda: next(doc_iter, None)
+            return mock_query
+
+        mock_db_session.query.side_effect = mock_query_side_effect
+
+        with patch("tasks.document_indexing_task.FeatureService.get_features") as mock_features:
+            mock_features.return_value.billing.enabled = False
+
+            # Act
+            _document_indexing(dataset_id, document_ids)
+
+        # Assert
+        assert mock_db_session.close.called
+        # Verify close is called exactly once
+        assert mock_db_session.close.call_count == 1
+
+    def test_task_proxy_handles_feature_service_failure(self, tenant_id, dataset_id, document_ids, mock_redis):
+        """
+        Test that task proxy handles FeatureService failures gracefully.
+
+        If FeatureService fails to retrieve features, the system should
+        have a fallback or handle the error appropriately.
+
+        Scenario:
+        - FeatureService.get_features() raises an exception during dispatch
+        - Task enqueuing should handle the error
+
+        Expected behavior:
+        - Exception is raised when trying to dispatch
+        - System doesn't crash unexpectedly
+        - Error is propagated appropriately
+        """
+        # Arrange
+        with patch("services.document_indexing_task_proxy.FeatureService.get_features") as mock_get_features:
+            # Simulate FeatureService failure
+            mock_get_features.side_effect = Exception("Feature service unavailable")
+
+            # Create proxy instance
+            proxy = DocumentIndexingTaskProxy(tenant_id, dataset_id, document_ids)
+
+            # Act & Assert - Should raise exception when trying to delay (which accesses features)
+            with pytest.raises(Exception) as exc_info:
+                proxy.delay()
+
+            # Verify the original error message is propagated
+            assert "Feature service unavailable" in str(exc_info.value)
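+
+    # NOTE: The Dataset/Document query stub above is repeated in several tests in this
+    # module. A small factory along the lines of this hypothetical helper could
+    # de-duplicate it; the name and placement are suggestions only and nothing above
+    # calls it.
+    @staticmethod
+    def _make_query_side_effect(mock_dataset, mock_documents):
+        """Build a db.session.query side effect: Dataset queries return the mock
+        dataset, Document queries return the mock documents one at a time, then None."""
+        doc_iter = iter(mock_documents)
+
+        def _side_effect(*args):
+            mock_query = MagicMock()
+            if args[0] == Dataset:
+                mock_query.where.return_value.first.return_value = mock_dataset
+            elif args[0] == Document:
+                mock_query.where.return_value.first = lambda: next(doc_iter, None)
+            return mock_query
+
+        return _side_effect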
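+
+    # NOTE: The burst-traffic test in TestPerformanceScenarios builds its serialized
+    # queue entries inline. This hypothetical helper collects the same
+    # TaskWrapper(data=...).serialize() construction in one place; the name and
+    # placement are suggestions only and no existing test uses it.
+    @staticmethod
+    def _make_serialized_task_entries(tenant_id, dataset_id, count):
+        """Serialize `count` TaskWrapper payloads, one document ID per task."""
+        from core.rag.pipeline.queue import TaskWrapper
+
+        entries = []
+        for i in range(count):
+            wrapper = TaskWrapper(
+                data={
+                    "tenant_id": tenant_id,
+                    "dataset_id": dataset_id,
+                    "document_ids": [f"doc_{i}"],
+                }
+            )
+            entries.append(wrapper.serialize())
+        return entries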