diff --git a/api/tests/test_containers_integration_tests/tasks/test_document_indexing_sync_task.py b/api/tests/test_containers_integration_tests/tasks/test_document_indexing_sync_task.py new file mode 100644 index 0000000000..0b9e29fde9 --- /dev/null +++ b/api/tests/test_containers_integration_tests/tasks/test_document_indexing_sync_task.py @@ -0,0 +1,464 @@ +""" +Integration tests for document_indexing_sync_task using testcontainers. + +This module validates SQL-backed behavior for document sync flows: +- Notion sync precondition checks +- Segment cleanup and document state updates +- Credential and indexing error handling +""" + +import json +from unittest.mock import Mock, patch +from uuid import uuid4 + +import pytest +from psycopg2.extensions import register_adapter +from psycopg2.extras import Json + +from core.indexing_runner import DocumentIsPausedError, IndexingRunner +from models import Account, Tenant, TenantAccountJoin, TenantAccountRole +from models.dataset import Dataset, Document, DocumentSegment +from tasks.document_indexing_sync_task import document_indexing_sync_task + + +@pytest.fixture(autouse=True) +def _register_dict_adapter_for_psycopg2(): + """Align test DB adapter behavior with dict payloads used in task update flow.""" + register_adapter(dict, Json) + + +class DocumentIndexingSyncTaskTestDataFactory: + """Create real DB entities for document indexing sync integration tests.""" + + @staticmethod + def create_account_with_tenant(db_session_with_containers) -> tuple[Account, Tenant]: + account = Account( + email=f"{uuid4()}@example.com", + name=f"user-{uuid4()}", + interface_language="en-US", + status="active", + ) + db_session_with_containers.add(account) + db_session_with_containers.flush() + + tenant = Tenant(name=f"tenant-{account.id}", status="normal") + db_session_with_containers.add(tenant) + db_session_with_containers.flush() + + join = TenantAccountJoin( + tenant_id=tenant.id, + account_id=account.id, + role=TenantAccountRole.OWNER, + current=True, + ) + db_session_with_containers.add(join) + db_session_with_containers.commit() + + return account, tenant + + @staticmethod + def create_dataset(db_session_with_containers, tenant_id: str, created_by: str) -> Dataset: + dataset = Dataset( + tenant_id=tenant_id, + name=f"dataset-{uuid4()}", + description="sync test dataset", + data_source_type="notion_import", + indexing_technique="high_quality", + created_by=created_by, + ) + db_session_with_containers.add(dataset) + db_session_with_containers.commit() + return dataset + + @staticmethod + def create_document( + db_session_with_containers, + *, + tenant_id: str, + dataset_id: str, + created_by: str, + data_source_info: dict | None, + indexing_status: str = "completed", + ) -> Document: + document = Document( + tenant_id=tenant_id, + dataset_id=dataset_id, + position=0, + data_source_type="notion_import", + data_source_info=json.dumps(data_source_info) if data_source_info is not None else None, + batch="test-batch", + name=f"doc-{uuid4()}", + created_from="notion_import", + created_by=created_by, + indexing_status=indexing_status, + enabled=True, + doc_form="text_model", + doc_language="en", + ) + db_session_with_containers.add(document) + db_session_with_containers.commit() + return document + + @staticmethod + def create_segments( + db_session_with_containers, + *, + tenant_id: str, + dataset_id: str, + document_id: str, + created_by: str, + count: int = 3, + ) -> list[DocumentSegment]: + segments: list[DocumentSegment] = [] + for i in range(count): + segment = DocumentSegment( + tenant_id=tenant_id, + dataset_id=dataset_id, + document_id=document_id, + position=i, + content=f"segment-{i}", + answer=None, + word_count=10, + tokens=5, + index_node_id=f"node-{document_id}-{i}", + status="completed", + created_by=created_by, + ) + db_session_with_containers.add(segment) + segments.append(segment) + db_session_with_containers.commit() + return segments + + +class TestDocumentIndexingSyncTask: + """Integration tests for document_indexing_sync_task with real database assertions.""" + + @pytest.fixture + def mock_external_dependencies(self): + """Patch only external collaborators; keep DB access real.""" + with ( + patch("tasks.document_indexing_sync_task.DatasourceProviderService") as mock_datasource_service_class, + patch("tasks.document_indexing_sync_task.NotionExtractor") as mock_notion_extractor_class, + patch("tasks.document_indexing_sync_task.IndexProcessorFactory") as mock_index_processor_factory, + patch("tasks.document_indexing_sync_task.IndexingRunner") as mock_indexing_runner_class, + ): + datasource_service = Mock() + datasource_service.get_datasource_credentials.return_value = {"integration_secret": "test_token"} + mock_datasource_service_class.return_value = datasource_service + + notion_extractor = Mock() + notion_extractor.get_notion_last_edited_time.return_value = "2024-01-02T00:00:00Z" + mock_notion_extractor_class.return_value = notion_extractor + + index_processor = Mock() + index_processor.clean = Mock() + mock_index_processor_factory.return_value.init_index_processor.return_value = index_processor + + indexing_runner = Mock(spec=IndexingRunner) + indexing_runner.run = Mock() + mock_indexing_runner_class.return_value = indexing_runner + + yield { + "datasource_service": datasource_service, + "notion_extractor": notion_extractor, + "notion_extractor_class": mock_notion_extractor_class, + "index_processor": index_processor, + "index_processor_factory": mock_index_processor_factory, + "indexing_runner": indexing_runner, + } + + def _create_notion_sync_context(self, db_session_with_containers, *, data_source_info: dict | None = None): + account, tenant = DocumentIndexingSyncTaskTestDataFactory.create_account_with_tenant(db_session_with_containers) + dataset = DocumentIndexingSyncTaskTestDataFactory.create_dataset( + db_session_with_containers, + tenant_id=tenant.id, + created_by=account.id, + ) + + notion_info = data_source_info or { + "notion_workspace_id": str(uuid4()), + "notion_page_id": str(uuid4()), + "type": "page", + "last_edited_time": "2024-01-01T00:00:00Z", + "credential_id": str(uuid4()), + } + + document = DocumentIndexingSyncTaskTestDataFactory.create_document( + db_session_with_containers, + tenant_id=tenant.id, + dataset_id=dataset.id, + created_by=account.id, + data_source_info=notion_info, + indexing_status="completed", + ) + + segments = DocumentIndexingSyncTaskTestDataFactory.create_segments( + db_session_with_containers, + tenant_id=tenant.id, + dataset_id=dataset.id, + document_id=document.id, + created_by=account.id, + count=3, + ) + + return { + "account": account, + "tenant": tenant, + "dataset": dataset, + "document": document, + "segments": segments, + "node_ids": [segment.index_node_id for segment in segments], + "notion_info": notion_info, + } + + def test_document_not_found(self, db_session_with_containers, mock_external_dependencies): + """Test that task handles missing document gracefully.""" + # Arrange + dataset_id = str(uuid4()) + document_id = str(uuid4()) + + # Act + document_indexing_sync_task(dataset_id, document_id) + + # Assert + mock_external_dependencies["datasource_service"].get_datasource_credentials.assert_not_called() + mock_external_dependencies["indexing_runner"].run.assert_not_called() + + def test_missing_notion_workspace_id(self, db_session_with_containers, mock_external_dependencies): + """Test that task raises error when notion_workspace_id is missing.""" + # Arrange + context = self._create_notion_sync_context( + db_session_with_containers, + data_source_info={ + "notion_page_id": str(uuid4()), + "type": "page", + "last_edited_time": "2024-01-01T00:00:00Z", + }, + ) + + # Act & Assert + with pytest.raises(ValueError, match="no notion page found"): + document_indexing_sync_task(context["dataset"].id, context["document"].id) + + def test_missing_notion_page_id(self, db_session_with_containers, mock_external_dependencies): + """Test that task raises error when notion_page_id is missing.""" + # Arrange + context = self._create_notion_sync_context( + db_session_with_containers, + data_source_info={ + "notion_workspace_id": str(uuid4()), + "type": "page", + "last_edited_time": "2024-01-01T00:00:00Z", + }, + ) + + # Act & Assert + with pytest.raises(ValueError, match="no notion page found"): + document_indexing_sync_task(context["dataset"].id, context["document"].id) + + def test_empty_data_source_info(self, db_session_with_containers, mock_external_dependencies): + """Test that task raises error when data_source_info is empty.""" + # Arrange + context = self._create_notion_sync_context(db_session_with_containers, data_source_info=None) + db_session_with_containers.query(Document).where(Document.id == context["document"].id).update( + {"data_source_info": None} + ) + db_session_with_containers.commit() + + # Act & Assert + with pytest.raises(ValueError, match="no notion page found"): + document_indexing_sync_task(context["dataset"].id, context["document"].id) + + def test_credential_not_found(self, db_session_with_containers, mock_external_dependencies): + """Test that task sets document error state when credential is missing.""" + # Arrange + context = self._create_notion_sync_context(db_session_with_containers) + mock_external_dependencies["datasource_service"].get_datasource_credentials.return_value = None + + # Act + document_indexing_sync_task(context["dataset"].id, context["document"].id) + + # Assert + db_session_with_containers.expire_all() + updated_document = ( + db_session_with_containers.query(Document).where(Document.id == context["document"].id).first() + ) + assert updated_document is not None + assert updated_document.indexing_status == "error" + assert "Datasource credential not found" in updated_document.error + assert updated_document.stopped_at is not None + mock_external_dependencies["indexing_runner"].run.assert_not_called() + + def test_page_not_updated(self, db_session_with_containers, mock_external_dependencies): + """Test that task exits early when notion page is unchanged.""" + # Arrange + context = self._create_notion_sync_context(db_session_with_containers) + mock_external_dependencies["notion_extractor"].get_notion_last_edited_time.return_value = "2024-01-01T00:00:00Z" + + # Act + document_indexing_sync_task(context["dataset"].id, context["document"].id) + + # Assert + db_session_with_containers.expire_all() + updated_document = ( + db_session_with_containers.query(Document).where(Document.id == context["document"].id).first() + ) + remaining_segments = ( + db_session_with_containers.query(DocumentSegment) + .where(DocumentSegment.document_id == context["document"].id) + .count() + ) + assert updated_document is not None + assert updated_document.indexing_status == "completed" + assert updated_document.processing_started_at is None + assert remaining_segments == 3 + mock_external_dependencies["index_processor"].clean.assert_not_called() + mock_external_dependencies["indexing_runner"].run.assert_not_called() + + def test_successful_sync_when_page_updated(self, db_session_with_containers, mock_external_dependencies): + """Test full successful sync flow with SQL state updates and side effects.""" + # Arrange + context = self._create_notion_sync_context(db_session_with_containers) + + # Act + document_indexing_sync_task(context["dataset"].id, context["document"].id) + + # Assert + db_session_with_containers.expire_all() + updated_document = ( + db_session_with_containers.query(Document).where(Document.id == context["document"].id).first() + ) + remaining_segments = ( + db_session_with_containers.query(DocumentSegment) + .where(DocumentSegment.document_id == context["document"].id) + .count() + ) + + assert updated_document is not None + assert updated_document.indexing_status == "parsing" + assert updated_document.processing_started_at is not None + assert updated_document.data_source_info_dict.get("last_edited_time") == "2024-01-02T00:00:00Z" + assert remaining_segments == 0 + + clean_call_args = mock_external_dependencies["index_processor"].clean.call_args + assert clean_call_args is not None + clean_args, clean_kwargs = clean_call_args + assert getattr(clean_args[0], "id", None) == context["dataset"].id + assert set(clean_args[1]) == set(context["node_ids"]) + assert clean_kwargs.get("with_keywords") is True + assert clean_kwargs.get("delete_child_chunks") is True + + run_call_args = mock_external_dependencies["indexing_runner"].run.call_args + assert run_call_args is not None + run_documents = run_call_args[0][0] + assert len(run_documents) == 1 + assert getattr(run_documents[0], "id", None) == context["document"].id + + def test_dataset_not_found_during_cleaning(self, db_session_with_containers, mock_external_dependencies): + """Test that task still updates document and reindexes if dataset vanishes before clean.""" + # Arrange + context = self._create_notion_sync_context(db_session_with_containers) + + def _delete_dataset_before_clean() -> str: + db_session_with_containers.query(Dataset).where(Dataset.id == context["dataset"].id).delete() + db_session_with_containers.commit() + return "2024-01-02T00:00:00Z" + + mock_external_dependencies[ + "notion_extractor" + ].get_notion_last_edited_time.side_effect = _delete_dataset_before_clean + + # Act + document_indexing_sync_task(context["dataset"].id, context["document"].id) + + # Assert + db_session_with_containers.expire_all() + updated_document = ( + db_session_with_containers.query(Document).where(Document.id == context["document"].id).first() + ) + assert updated_document is not None + assert updated_document.indexing_status == "parsing" + mock_external_dependencies["index_processor"].clean.assert_not_called() + mock_external_dependencies["indexing_runner"].run.assert_called_once() + + def test_cleaning_error_continues_to_indexing(self, db_session_with_containers, mock_external_dependencies): + """Test that indexing continues when index cleanup fails.""" + # Arrange + context = self._create_notion_sync_context(db_session_with_containers) + mock_external_dependencies["index_processor"].clean.side_effect = Exception("Cleaning error") + + # Act + document_indexing_sync_task(context["dataset"].id, context["document"].id) + + # Assert + db_session_with_containers.expire_all() + updated_document = ( + db_session_with_containers.query(Document).where(Document.id == context["document"].id).first() + ) + remaining_segments = ( + db_session_with_containers.query(DocumentSegment) + .where(DocumentSegment.document_id == context["document"].id) + .count() + ) + assert updated_document is not None + assert updated_document.indexing_status == "parsing" + assert remaining_segments == 0 + mock_external_dependencies["indexing_runner"].run.assert_called_once() + + def test_indexing_runner_document_paused_error(self, db_session_with_containers, mock_external_dependencies): + """Test that DocumentIsPausedError does not flip document into error state.""" + # Arrange + context = self._create_notion_sync_context(db_session_with_containers) + mock_external_dependencies["indexing_runner"].run.side_effect = DocumentIsPausedError("Document paused") + + # Act + document_indexing_sync_task(context["dataset"].id, context["document"].id) + + # Assert + db_session_with_containers.expire_all() + updated_document = ( + db_session_with_containers.query(Document).where(Document.id == context["document"].id).first() + ) + assert updated_document is not None + assert updated_document.indexing_status == "parsing" + assert updated_document.error is None + + def test_indexing_runner_general_error(self, db_session_with_containers, mock_external_dependencies): + """Test that indexing errors are persisted to document state.""" + # Arrange + context = self._create_notion_sync_context(db_session_with_containers) + mock_external_dependencies["indexing_runner"].run.side_effect = Exception("Indexing error") + + # Act + document_indexing_sync_task(context["dataset"].id, context["document"].id) + + # Assert + db_session_with_containers.expire_all() + updated_document = ( + db_session_with_containers.query(Document).where(Document.id == context["document"].id).first() + ) + assert updated_document is not None + assert updated_document.indexing_status == "error" + assert "Indexing error" in updated_document.error + assert updated_document.stopped_at is not None + + def test_index_processor_clean_called_with_correct_params( + self, + db_session_with_containers, + mock_external_dependencies, + ): + """Test that clean is called with dataset instance and collected node ids.""" + # Arrange + context = self._create_notion_sync_context(db_session_with_containers) + + # Act + document_indexing_sync_task(context["dataset"].id, context["document"].id) + + # Assert + clean_call_args = mock_external_dependencies["index_processor"].clean.call_args + assert clean_call_args is not None + clean_args, clean_kwargs = clean_call_args + assert getattr(clean_args[0], "id", None) == context["dataset"].id + assert set(clean_args[1]) == set(context["node_ids"]) + assert clean_kwargs.get("with_keywords") is True + assert clean_kwargs.get("delete_child_chunks") is True diff --git a/api/tests/unit_tests/tasks/test_document_indexing_sync_task.py b/api/tests/unit_tests/tasks/test_document_indexing_sync_task.py index 549f2c6c9b..1a4cedc60a 100644 --- a/api/tests/unit_tests/tasks/test_document_indexing_sync_task.py +++ b/api/tests/unit_tests/tasks/test_document_indexing_sync_task.py @@ -1,12 +1,8 @@ """ -Unit tests for document indexing sync task. +Unit tests for collaborator parameter wiring in document_indexing_sync_task. -This module tests the document indexing sync task functionality including: -- Syncing Notion documents when updated -- Validating document and data source existence -- Credential validation and retrieval -- Cleaning old segments before re-indexing -- Error handling and edge cases +These tests intentionally stay in unit scope because they validate call arguments +for external collaborators rather than SQL-backed state transitions. """ import uuid @@ -14,187 +10,92 @@ from unittest.mock import MagicMock, Mock, patch import pytest -from core.indexing_runner import DocumentIsPausedError, IndexingRunner -from models.dataset import Dataset, Document, DocumentSegment +from models.dataset import Dataset, Document from tasks.document_indexing_sync_task import document_indexing_sync_task -# ============================================================================ -# Fixtures -# ============================================================================ - @pytest.fixture -def tenant_id(): - """Generate a unique tenant ID for testing.""" +def dataset_id() -> str: + """Generate a dataset id.""" return str(uuid.uuid4()) @pytest.fixture -def dataset_id(): - """Generate a unique dataset ID for testing.""" +def document_id() -> str: + """Generate a document id.""" return str(uuid.uuid4()) @pytest.fixture -def document_id(): - """Generate a unique document ID for testing.""" +def notion_workspace_id() -> str: + """Generate a notion workspace id.""" return str(uuid.uuid4()) @pytest.fixture -def notion_workspace_id(): - """Generate a Notion workspace ID for testing.""" +def notion_page_id() -> str: + """Generate a notion page id.""" return str(uuid.uuid4()) @pytest.fixture -def notion_page_id(): - """Generate a Notion page ID for testing.""" +def credential_id() -> str: + """Generate a credential id.""" return str(uuid.uuid4()) @pytest.fixture -def credential_id(): - """Generate a credential ID for testing.""" - return str(uuid.uuid4()) - - -@pytest.fixture -def mock_dataset(dataset_id, tenant_id): - """Create a mock Dataset object.""" +def mock_dataset(dataset_id): + """Create a minimal dataset mock used by the task pre-check.""" dataset = Mock(spec=Dataset) dataset.id = dataset_id - dataset.tenant_id = tenant_id - dataset.indexing_technique = "high_quality" - dataset.embedding_model_provider = "openai" - dataset.embedding_model = "text-embedding-ada-002" return dataset @pytest.fixture -def mock_document(document_id, dataset_id, tenant_id, notion_workspace_id, notion_page_id, credential_id): - """Create a mock Document object with Notion data source.""" - doc = Mock(spec=Document) - doc.id = document_id - doc.dataset_id = dataset_id - doc.tenant_id = tenant_id - doc.data_source_type = "notion_import" - doc.indexing_status = "completed" - doc.error = None - doc.stopped_at = None - doc.processing_started_at = None - doc.doc_form = "text_model" - doc.data_source_info_dict = { +def mock_document(document_id, dataset_id, notion_workspace_id, notion_page_id, credential_id): + """Create a minimal notion document mock for collaborator parameter assertions.""" + document = Mock(spec=Document) + document.id = document_id + document.dataset_id = dataset_id + document.tenant_id = str(uuid.uuid4()) + document.data_source_type = "notion_import" + document.indexing_status = "completed" + document.doc_form = "text_model" + document.data_source_info_dict = { "notion_workspace_id": notion_workspace_id, "notion_page_id": notion_page_id, "type": "page", "last_edited_time": "2024-01-01T00:00:00Z", "credential_id": credential_id, } - return doc + return document @pytest.fixture -def mock_document_segments(document_id): - """Create mock DocumentSegment objects.""" - segments = [] - for i in range(3): - segment = Mock(spec=DocumentSegment) - segment.id = str(uuid.uuid4()) - segment.document_id = document_id - segment.index_node_id = f"node-{document_id}-{i}" - segments.append(segment) - return segments +def mock_db_session(mock_document, mock_dataset): + """Mock session_factory.create_session to drive deterministic read-only task flow.""" + with patch("tasks.document_indexing_sync_task.session_factory") as mock_session_factory: + session = MagicMock() + session.scalars.return_value.all.return_value = [] + session.query.return_value.where.return_value.first.side_effect = [mock_document, mock_dataset] + begin_cm = MagicMock() + begin_cm.__enter__.return_value = session + begin_cm.__exit__.return_value = False + session.begin.return_value = begin_cm -@pytest.fixture -def mock_db_session(): - """Mock database session via session_factory.create_session(). + session_cm = MagicMock() + session_cm.__enter__.return_value = session + session_cm.__exit__.return_value = False - After session split refactor, the code calls create_session() multiple times. - This fixture creates shared query mocks so all sessions use the same - query configuration, simulating database persistence across sessions. - - The fixture automatically converts side_effect to cycle to prevent StopIteration. - Tests configure mocks the same way as before, but behind the scenes the values - are cycled infinitely for all sessions. - """ - from itertools import cycle - - with patch("tasks.document_indexing_sync_task.session_factory") as mock_sf: - sessions = [] - - # Shared query mocks - all sessions use these - shared_query = MagicMock() - shared_filter_by = MagicMock() - shared_scalars_result = MagicMock() - - # Create custom first mock that auto-cycles side_effect - class CyclicMock(MagicMock): - def __setattr__(self, name, value): - if name == "side_effect" and value is not None: - # Convert list/tuple to infinite cycle - if isinstance(value, (list, tuple)): - value = cycle(value) - super().__setattr__(name, value) - - shared_query.where.return_value.first = CyclicMock() - shared_filter_by.first = CyclicMock() - - def _create_session(): - """Create a new mock session for each create_session() call.""" - session = MagicMock() - session.close = MagicMock() - session.commit = MagicMock() - - # Mock session.begin() context manager - begin_cm = MagicMock() - begin_cm.__enter__.return_value = session - - def _begin_exit_side_effect(exc_type, exc, tb): - # commit on success - if exc_type is None: - session.commit() - # return False to propagate exceptions - return False - - begin_cm.__exit__.side_effect = _begin_exit_side_effect - session.begin.return_value = begin_cm - - # Mock create_session() context manager - cm = MagicMock() - cm.__enter__.return_value = session - - def _exit_side_effect(exc_type, exc, tb): - session.close() - return False - - cm.__exit__.side_effect = _exit_side_effect - - # All sessions use the same shared query mocks - session.query.return_value = shared_query - shared_query.where.return_value = shared_query - shared_query.filter_by.return_value = shared_filter_by - session.scalars.return_value = shared_scalars_result - - sessions.append(session) - # Attach helpers on the first created session for assertions across all sessions - if len(sessions) == 1: - session.get_all_sessions = lambda: sessions - session.any_close_called = lambda: any(s.close.called for s in sessions) - session.any_commit_called = lambda: any(s.commit.called for s in sessions) - return cm - - mock_sf.create_session.side_effect = _create_session - - # Create first session and return it - _create_session() - yield sessions[0] + mock_session_factory.create_session.return_value = session_cm + yield session @pytest.fixture def mock_datasource_provider_service(): - """Mock DatasourceProviderService.""" + """Mock datasource credential provider.""" with patch("tasks.document_indexing_sync_task.DatasourceProviderService") as mock_service_class: mock_service = MagicMock() mock_service.get_datasource_credentials.return_value = {"integration_secret": "test_token"} @@ -204,314 +105,16 @@ def mock_datasource_provider_service(): @pytest.fixture def mock_notion_extractor(): - """Mock NotionExtractor.""" + """Mock notion extractor class and instance.""" with patch("tasks.document_indexing_sync_task.NotionExtractor") as mock_extractor_class: mock_extractor = MagicMock() - mock_extractor.get_notion_last_edited_time.return_value = "2024-01-02T00:00:00Z" # Updated time + mock_extractor.get_notion_last_edited_time.return_value = "2024-01-01T00:00:00Z" mock_extractor_class.return_value = mock_extractor - yield mock_extractor + yield {"class": mock_extractor_class, "instance": mock_extractor} -@pytest.fixture -def mock_index_processor_factory(): - """Mock IndexProcessorFactory.""" - with patch("tasks.document_indexing_sync_task.IndexProcessorFactory") as mock_factory: - mock_processor = MagicMock() - mock_processor.clean = Mock() - mock_factory.return_value.init_index_processor.return_value = mock_processor - yield mock_factory - - -@pytest.fixture -def mock_indexing_runner(): - """Mock IndexingRunner.""" - with patch("tasks.document_indexing_sync_task.IndexingRunner") as mock_runner_class: - mock_runner = MagicMock(spec=IndexingRunner) - mock_runner.run = Mock() - mock_runner_class.return_value = mock_runner - yield mock_runner - - -# ============================================================================ -# Tests for document_indexing_sync_task -# ============================================================================ - - -class TestDocumentIndexingSyncTask: - """Tests for the document_indexing_sync_task function.""" - - def test_document_not_found(self, mock_db_session, dataset_id, document_id): - """Test that task handles document not found gracefully.""" - # Arrange - mock_db_session.query.return_value.where.return_value.first.return_value = None - - # Act - document_indexing_sync_task(dataset_id, document_id) - - # Assert - at least one session should have been closed - assert mock_db_session.any_close_called() - - def test_missing_notion_workspace_id(self, mock_db_session, mock_document, dataset_id, document_id): - """Test that task raises error when notion_workspace_id is missing.""" - # Arrange - mock_document.data_source_info_dict = {"notion_page_id": "page123", "type": "page"} - mock_db_session.query.return_value.where.return_value.first.return_value = mock_document - - # Act & Assert - with pytest.raises(ValueError, match="no notion page found"): - document_indexing_sync_task(dataset_id, document_id) - - def test_missing_notion_page_id(self, mock_db_session, mock_document, dataset_id, document_id): - """Test that task raises error when notion_page_id is missing.""" - # Arrange - mock_document.data_source_info_dict = {"notion_workspace_id": "ws123", "type": "page"} - mock_db_session.query.return_value.where.return_value.first.return_value = mock_document - - # Act & Assert - with pytest.raises(ValueError, match="no notion page found"): - document_indexing_sync_task(dataset_id, document_id) - - def test_empty_data_source_info(self, mock_db_session, mock_document, dataset_id, document_id): - """Test that task raises error when data_source_info is empty.""" - # Arrange - mock_document.data_source_info_dict = None - mock_db_session.query.return_value.where.return_value.first.return_value = mock_document - - # Act & Assert - with pytest.raises(ValueError, match="no notion page found"): - document_indexing_sync_task(dataset_id, document_id) - - def test_credential_not_found( - self, - mock_db_session, - mock_datasource_provider_service, - mock_document, - dataset_id, - document_id, - ): - """Test that task handles missing credentials by updating document status.""" - # Arrange - mock_db_session.query.return_value.where.return_value.first.return_value = mock_document - mock_db_session.query.return_value.filter_by.return_value.first.return_value = mock_document - mock_datasource_provider_service.get_datasource_credentials.return_value = None - - # Act - document_indexing_sync_task(dataset_id, document_id) - - # Assert - assert mock_document.indexing_status == "error" - assert "Datasource credential not found" in mock_document.error - assert mock_document.stopped_at is not None - assert mock_db_session.any_commit_called() - assert mock_db_session.any_close_called() - - def test_page_not_updated( - self, - mock_db_session, - mock_datasource_provider_service, - mock_notion_extractor, - mock_document, - dataset_id, - document_id, - ): - """Test that task does nothing when page has not been updated.""" - # Arrange - mock_db_session.query.return_value.where.return_value.first.return_value = mock_document - mock_db_session.query.return_value.filter_by.return_value.first.return_value = mock_document - # Return same time as stored in document - mock_notion_extractor.get_notion_last_edited_time.return_value = "2024-01-01T00:00:00Z" - - # Act - document_indexing_sync_task(dataset_id, document_id) - - # Assert - # Document status should remain unchanged - assert mock_document.indexing_status == "completed" - # At least one session should have been closed via context manager teardown - assert mock_db_session.any_close_called() - - def test_successful_sync_when_page_updated( - self, - mock_db_session, - mock_datasource_provider_service, - mock_notion_extractor, - mock_index_processor_factory, - mock_indexing_runner, - mock_dataset, - mock_document, - mock_document_segments, - dataset_id, - document_id, - ): - """Test successful sync flow when Notion page has been updated.""" - # Arrange - # Set exact sequence of returns across calls to `.first()`: - # 1) document (initial fetch) - # 2) dataset (pre-check) - # 3) dataset (cleaning phase) - # 4) document (pre-indexing update) - # 5) document (indexing runner fetch) - mock_db_session.query.return_value.where.return_value.first.side_effect = [ - mock_document, - mock_dataset, - mock_dataset, - mock_document, - mock_document, - ] - mock_db_session.query.return_value.filter_by.return_value.first.return_value = mock_document - mock_db_session.scalars.return_value.all.return_value = mock_document_segments - # NotionExtractor returns updated time - mock_notion_extractor.get_notion_last_edited_time.return_value = "2024-01-02T00:00:00Z" - - # Act - document_indexing_sync_task(dataset_id, document_id) - - # Assert - # Verify document status was updated to parsing - assert mock_document.indexing_status == "parsing" - assert mock_document.processing_started_at is not None - - # Verify segments were cleaned - mock_processor = mock_index_processor_factory.return_value.init_index_processor.return_value - mock_processor.clean.assert_called_once() - - # Verify segments were deleted from database in batch (DELETE FROM document_segments) - # Aggregate execute calls across all created sessions - execute_sqls = [] - for s in mock_db_session.get_all_sessions(): - execute_sqls.extend([" ".join(str(c[0][0]).split()) for c in s.execute.call_args_list]) - assert any("DELETE FROM document_segments" in sql for sql in execute_sqls) - - # Verify indexing runner was called - mock_indexing_runner.run.assert_called_once_with([mock_document]) - - # Verify session operations (across any created session) - assert mock_db_session.any_commit_called() - assert mock_db_session.any_close_called() - - def test_dataset_not_found_during_cleaning( - self, - mock_db_session, - mock_datasource_provider_service, - mock_notion_extractor, - mock_indexing_runner, - mock_document, - dataset_id, - document_id, - ): - """Test that task handles dataset not found during cleaning phase.""" - # Arrange - # Sequence: document (initial), dataset (pre-check), None (cleaning), document (update), document (indexing) - mock_db_session.query.return_value.where.return_value.first.side_effect = [ - mock_document, - mock_dataset, - None, - mock_document, - mock_document, - ] - mock_db_session.query.return_value.filter_by.return_value.first.return_value = mock_document - mock_notion_extractor.get_notion_last_edited_time.return_value = "2024-01-02T00:00:00Z" - - # Act - document_indexing_sync_task(dataset_id, document_id) - - # Assert - # Document should still be set to parsing - assert mock_document.indexing_status == "parsing" - # At least one session should be closed after error - assert mock_db_session.any_close_called() - - def test_cleaning_error_continues_to_indexing( - self, - mock_db_session, - mock_datasource_provider_service, - mock_notion_extractor, - mock_index_processor_factory, - mock_indexing_runner, - mock_dataset, - mock_document, - dataset_id, - document_id, - ): - """Test that indexing continues even if cleaning fails.""" - # Arrange - from itertools import cycle - - mock_db_session.query.return_value.where.return_value.first.side_effect = cycle([mock_document, mock_dataset]) - mock_db_session.query.return_value.filter_by.return_value.first.return_value = mock_document - # Make the cleaning step fail but not the segment fetch - processor = mock_index_processor_factory.return_value.init_index_processor.return_value - processor.clean.side_effect = Exception("Cleaning error") - mock_db_session.scalars.return_value.all.return_value = [] - mock_notion_extractor.get_notion_last_edited_time.return_value = "2024-01-02T00:00:00Z" - - # Act - document_indexing_sync_task(dataset_id, document_id) - - # Assert - # Indexing should still be attempted despite cleaning error - mock_indexing_runner.run.assert_called_once_with([mock_document]) - assert mock_db_session.any_close_called() - - def test_indexing_runner_document_paused_error( - self, - mock_db_session, - mock_datasource_provider_service, - mock_notion_extractor, - mock_index_processor_factory, - mock_indexing_runner, - mock_dataset, - mock_document, - mock_document_segments, - dataset_id, - document_id, - ): - """Test that DocumentIsPausedError is handled gracefully.""" - # Arrange - from itertools import cycle - - mock_db_session.query.return_value.where.return_value.first.side_effect = cycle([mock_document, mock_dataset]) - mock_db_session.query.return_value.filter_by.return_value.first.return_value = mock_document - mock_db_session.scalars.return_value.all.return_value = mock_document_segments - mock_notion_extractor.get_notion_last_edited_time.return_value = "2024-01-02T00:00:00Z" - mock_indexing_runner.run.side_effect = DocumentIsPausedError("Document paused") - - # Act - document_indexing_sync_task(dataset_id, document_id) - - # Assert - # Session should be closed after handling error - assert mock_db_session.any_close_called() - - def test_indexing_runner_general_error( - self, - mock_db_session, - mock_datasource_provider_service, - mock_notion_extractor, - mock_index_processor_factory, - mock_indexing_runner, - mock_dataset, - mock_document, - mock_document_segments, - dataset_id, - document_id, - ): - """Test that general exceptions during indexing are handled.""" - # Arrange - from itertools import cycle - - mock_db_session.query.return_value.where.return_value.first.side_effect = cycle([mock_document, mock_dataset]) - mock_db_session.query.return_value.filter_by.return_value.first.return_value = mock_document - mock_db_session.scalars.return_value.all.return_value = mock_document_segments - mock_notion_extractor.get_notion_last_edited_time.return_value = "2024-01-02T00:00:00Z" - mock_indexing_runner.run.side_effect = Exception("Indexing error") - - # Act - document_indexing_sync_task(dataset_id, document_id) - - # Assert - # Session should be closed after error - assert mock_db_session.any_close_called() +class TestDocumentIndexingSyncTaskCollaboratorParams: + """Unit tests for collaborator parameter passing in document_indexing_sync_task.""" def test_notion_extractor_initialized_with_correct_params( self, @@ -524,27 +127,21 @@ class TestDocumentIndexingSyncTask: notion_workspace_id, notion_page_id, ): - """Test that NotionExtractor is initialized with correct parameters.""" + """Test that NotionExtractor is initialized with expected arguments.""" # Arrange - mock_db_session.query.return_value.where.return_value.first.return_value = mock_document - mock_notion_extractor.get_notion_last_edited_time.return_value = "2024-01-01T00:00:00Z" # No update + expected_token = "test_token" # Act - with patch("tasks.document_indexing_sync_task.NotionExtractor") as mock_extractor_class: - mock_extractor = MagicMock() - mock_extractor.get_notion_last_edited_time.return_value = "2024-01-01T00:00:00Z" - mock_extractor_class.return_value = mock_extractor + document_indexing_sync_task(dataset_id, document_id) - document_indexing_sync_task(dataset_id, document_id) - - # Assert - mock_extractor_class.assert_called_once_with( - notion_workspace_id=notion_workspace_id, - notion_obj_id=notion_page_id, - notion_page_type="page", - notion_access_token="test_token", - tenant_id=mock_document.tenant_id, - ) + # Assert + mock_notion_extractor["class"].assert_called_once_with( + notion_workspace_id=notion_workspace_id, + notion_obj_id=notion_page_id, + notion_page_type="page", + notion_access_token=expected_token, + tenant_id=mock_document.tenant_id, + ) def test_datasource_credentials_requested_correctly( self, @@ -556,17 +153,16 @@ class TestDocumentIndexingSyncTask: document_id, credential_id, ): - """Test that datasource credentials are requested with correct parameters.""" + """Test that datasource credentials are requested with expected identifiers.""" # Arrange - mock_db_session.query.return_value.where.return_value.first.return_value = mock_document - mock_notion_extractor.get_notion_last_edited_time.return_value = "2024-01-01T00:00:00Z" + expected_tenant_id = mock_document.tenant_id # Act document_indexing_sync_task(dataset_id, document_id) # Assert mock_datasource_provider_service.get_datasource_credentials.assert_called_once_with( - tenant_id=mock_document.tenant_id, + tenant_id=expected_tenant_id, credential_id=credential_id, provider="notion_datasource", plugin_id="langgenius/notion_datasource", @@ -581,16 +177,14 @@ class TestDocumentIndexingSyncTask: dataset_id, document_id, ): - """Test that task handles missing credential_id by passing None.""" + """Test that missing credential_id is forwarded as None.""" # Arrange mock_document.data_source_info_dict = { - "notion_workspace_id": "ws123", - "notion_page_id": "page123", + "notion_workspace_id": "workspace-id", + "notion_page_id": "page-id", "type": "page", "last_edited_time": "2024-01-01T00:00:00Z", } - mock_db_session.query.return_value.where.return_value.first.return_value = mock_document - mock_notion_extractor.get_notion_last_edited_time.return_value = "2024-01-01T00:00:00Z" # Act document_indexing_sync_task(dataset_id, document_id) @@ -602,39 +196,3 @@ class TestDocumentIndexingSyncTask: provider="notion_datasource", plugin_id="langgenius/notion_datasource", ) - - def test_index_processor_clean_called_with_correct_params( - self, - mock_db_session, - mock_datasource_provider_service, - mock_notion_extractor, - mock_index_processor_factory, - mock_indexing_runner, - mock_dataset, - mock_document, - mock_document_segments, - dataset_id, - document_id, - ): - """Test that index processor clean is called with correct parameters.""" - # Arrange - # Sequence: document (initial), dataset (pre-check), dataset (cleaning), document (update), document (indexing) - mock_db_session.query.return_value.where.return_value.first.side_effect = [ - mock_document, - mock_dataset, - mock_dataset, - mock_document, - mock_document, - ] - mock_db_session.scalars.return_value.all.return_value = mock_document_segments - mock_notion_extractor.get_notion_last_edited_time.return_value = "2024-01-02T00:00:00Z" - - # Act - document_indexing_sync_task(dataset_id, document_id) - - # Assert - mock_processor = mock_index_processor_factory.return_value.init_index_processor.return_value - expected_node_ids = [seg.index_node_id for seg in mock_document_segments] - mock_processor.clean.assert_called_once_with( - mock_dataset, expected_node_ids, with_keywords=True, delete_child_chunks=True - )