diff --git a/api/core/rag/datasource/vdb/vector_factory.py b/api/core/rag/datasource/vdb/vector_factory.py
index 59d7f3c3c4..9575377174 100644
--- a/api/core/rag/datasource/vdb/vector_factory.py
+++ b/api/core/rag/datasource/vdb/vector_factory.py
@@ -39,6 +39,58 @@ class AbstractVectorFactory(ABC):
         return index_struct_dict
 
 
+class _LazyEmbeddings(Embeddings):
+    """Lazy proxy that defers materializing the real embedding model.
+
+    Constructing the real embeddings (via ``ModelManager.get_model_instance``)
+    transitively calls ``FeatureService.get_features``, which issues HTTP GETs
+    to ``BillingService`` (see ``provider_manager.py``). Cleanup paths
+    (``delete_by_ids`` / ``delete`` / ``text_exists``) do not need embeddings
+    at all, so deferring this until an ``embed_*`` method is actually invoked
+    keeps cleanup tasks resilient to transient billing-API failures and avoids
+    leaving ``document_segments`` / ``child_chunks`` stranded whenever the
+    billing API hiccups.
+
+    Existing callers that perform create / search operations are unaffected:
+    the first ``embed_*`` call materializes the underlying model, and the
+    behavior is identical from that point on.
+    """
+
+    def __init__(self, dataset: Dataset):
+        self._dataset = dataset
+        self._real: Embeddings | None = None
+
+    def _ensure(self) -> Embeddings:
+        if self._real is None:
+            model_manager = ModelManager.for_tenant(tenant_id=self._dataset.tenant_id)
+            embedding_model = model_manager.get_model_instance(
+                tenant_id=self._dataset.tenant_id,
+                provider=self._dataset.embedding_model_provider,
+                model_type=ModelType.TEXT_EMBEDDING,
+                model=self._dataset.embedding_model,
+            )
+            self._real = CacheEmbedding(embedding_model)
+        return self._real
+
+    def embed_documents(self, texts: list[str]) -> list[list[float]]:
+        return self._ensure().embed_documents(texts)
+
+    def embed_multimodal_documents(self, multimodel_documents: list[dict[str, Any]]) -> list[list[float]]:
+        return self._ensure().embed_multimodal_documents(multimodel_documents)
+
+    def embed_query(self, text: str) -> list[float]:
+        return self._ensure().embed_query(text)
+
+    def embed_multimodal_query(self, multimodel_document: dict[str, Any]) -> list[float]:
+        return self._ensure().embed_multimodal_query(multimodel_document)
+
+    async def aembed_documents(self, texts: list[str]) -> list[list[float]]:
+        return await self._ensure().aembed_documents(texts)
+
+    async def aembed_query(self, text: str) -> list[float]:
+        return await self._ensure().aembed_query(text)
+
+
 class Vector:
     def __init__(self, dataset: Dataset, attributes: list | None = None):
         if attributes is None:
@@ -60,7 +112,11 @@ class Vector:
             "original_chunk_id",
         ]
         self._dataset = dataset
-        self._embeddings = self._get_embeddings()
+        # Use a lazy proxy so cleanup paths (delete_by_ids / delete / text_exists)
+        # never transitively trigger billing API calls during ``Vector(dataset)``
+        # construction. The real embedding model is materialized only when an
+        # ``embed_*`` method is actually invoked (i.e. create / search paths).
+        self._embeddings: Embeddings = _LazyEmbeddings(dataset)
         self._attributes = attributes
         self._vector_processor = self._init_vector()
 
diff --git a/api/tasks/clean_document_task.py b/api/tasks/clean_document_task.py
index a657cd553a..c8d0e31c06 100644
--- a/api/tasks/clean_document_task.py
+++ b/api/tasks/clean_document_task.py
@@ -61,13 +61,31 @@ def clean_document_task(document_id: str, dataset_id: str, doc_form: str, file_i
 
         # check segment is exist
         if index_node_ids:
-            index_processor = IndexProcessorFactory(doc_form).init_index_processor()
-            with session_factory.create_session() as session:
-                dataset = session.scalar(select(Dataset).where(Dataset.id == dataset_id).limit(1))
-                if dataset:
-                    index_processor.clean(
-                        dataset, index_node_ids, with_keywords=True, delete_child_chunks=True, delete_summaries=True
-                    )
+            # Wrap vector / keyword index cleanup in try/except so that a transient
+            # failure here (e.g. a billing API hiccup propagated via FeatureService
+            # when ModelManager is initialized inside ``Vector(dataset)``) does not
+            # abort the entire task and leave document_segments / child_chunks /
+            # image_files / metadata bindings stranded in PG. Mirrors the pattern
+            # already used in ``clean_dataset_task``, so the document row's hard
+            # delete (already committed by the caller) does not produce orphan PG
+            # rows just because the vector backend or one of its dependencies failed.
+            try:
+                index_processor = IndexProcessorFactory(doc_form).init_index_processor()
+                with session_factory.create_session() as session:
+                    dataset = session.scalar(select(Dataset).where(Dataset.id == dataset_id).limit(1))
+                    if dataset:
+                        index_processor.clean(
+                            dataset, index_node_ids, with_keywords=True, delete_child_chunks=True, delete_summaries=True
+                        )
+            except Exception:
+                logger.exception(
+                    "Failed to clean vector / keyword index in clean_document_task, "
+                    "document_id=%s, dataset_id=%s, index_node_ids_count=%d. "
+                    "Continuing with PG / storage cleanup; vector orphans can be reaped later.",
+                    document_id,
+                    dataset_id,
+                    len(index_node_ids),
+                )
 
         total_image_files = []
         with session_factory.create_session() as session, session.begin():
diff --git a/api/tasks/clean_notion_document_task.py b/api/tasks/clean_notion_document_task.py
index e3be24ac74..017d60efac 100644
--- a/api/tasks/clean_notion_document_task.py
+++ b/api/tasks/clean_notion_document_task.py
@@ -40,12 +40,29 @@ def clean_notion_document_task(document_ids: list[str], dataset_id: str):
             segments = session.scalars(select(DocumentSegment).where(DocumentSegment.document_id == document_id)).all()
             total_index_node_ids.extend([segment.index_node_id for segment in segments])
 
-    with session_factory.create_session() as session:
-        dataset = session.scalar(select(Dataset).where(Dataset.id == dataset_id).limit(1))
-        if dataset:
-            index_processor.clean(
-                dataset, total_index_node_ids, with_keywords=True, delete_child_chunks=True, delete_summaries=True
-            )
+    # Wrap vector / keyword index cleanup in try/except so that a transient
+    # failure here (e.g. a billing API hiccup propagated via FeatureService when
+    # ``ModelManager`` is initialized inside ``Vector(dataset)``) does not abort
+    # the task and leave the already-deleted documents' segments stranded in PG.
+    # The Document rows are hard-deleted in the previous session block, so any
+    # exception escaping this task would leave orphans that no later request
+    # can reach. Mirrors the pattern in ``clean_dataset_task``.
+    try:
+        with session_factory.create_session() as session:
+            dataset = session.scalar(select(Dataset).where(Dataset.id == dataset_id).limit(1))
+            if dataset:
+                index_processor.clean(
+                    dataset, total_index_node_ids, with_keywords=True, delete_child_chunks=True, delete_summaries=True
+                )
+    except Exception:
+        logger.exception(
+            "Failed to clean vector / keyword index in clean_notion_document_task, "
+            "dataset_id=%s, document_ids=%s, index_node_ids_count=%d. "
+            "Continuing with segment deletion; vector orphans can be reaped later.",
+            dataset_id,
+            document_ids,
+            len(total_index_node_ids),
+        )
 
     with session_factory.create_session() as session, session.begin():
         segment_delete_stmt = delete(DocumentSegment).where(DocumentSegment.document_id.in_(document_ids))
diff --git a/api/tests/test_containers_integration_tests/tasks/test_clean_notion_document_task.py b/api/tests/test_containers_integration_tests/tasks/test_clean_notion_document_task.py
index fa3ac12cf0..7e5c374b5d 100644
--- a/api/tests/test_containers_integration_tests/tasks/test_clean_notion_document_task.py
+++ b/api/tests/test_containers_integration_tests/tasks/test_clean_notion_document_task.py
@@ -602,14 +602,25 @@ class TestCleanNotionDocumentTask:
         # Note: This test successfully verifies database operations.
         # IndexProcessor verification would require more sophisticated mocking.
 
-    def test_clean_notion_document_task_database_transaction_rollback(
+    def test_clean_notion_document_task_continues_when_index_processor_fails(
         self, db_session_with_containers, mock_index_processor_factory, mock_external_service_dependencies
     ):
         """
-        Test cleanup task behavior when database operations fail.
+        Index processor failure (e.g. a transient billing API error propagated via
+        ``FeatureService`` when ``Vector(dataset)`` lazily resolves the embedding
+        model) must NOT abort the cleanup task. The Document rows have already
+        been hard-deleted in the first session block before vector cleanup runs,
+        so any uncaught exception escaping the task would strand
+        ``DocumentSegment`` rows in PG with no parent ``Document``.
 
-        This test verifies that the task properly handles database errors
-        and maintains data consistency.
+        Contract: the task swallows the index_processor exception, logs it, and
+        proceeds to delete the segments, leaving PG consistent. (Vector orphans,
+        if any, can be reaped later by an offline scanner.)
+
+        Regression guard for the production incident where ``clean_document_task``
+        / ``clean_notion_document_task`` failed with
+        ``ValueError("Unable to retrieve billing information...")`` and left
+        tens of thousands of orphan segments per affected tenant.
         """
         fake = Faker()
 
@@ -672,17 +683,28 @@ class TestCleanNotionDocumentTask:
         db_session_with_containers.add(segment)
         db_session_with_containers.commit()
 
-        # Mock index processor to raise an exception
+        # Simulate the production failure mode: index_processor.clean() raises a
+        # ValueError mirroring ``BillingService._send_request`` returning non-200.
         mock_index_processor = mock_index_processor_factory.return_value.init_index_processor.return_value
-        mock_index_processor.clean.side_effect = Exception("Index processor error")
+        mock_index_processor.clean.side_effect = ValueError(
+            "Unable to retrieve billing information. Please try again later or contact support."
+        )
 
-        # Execute cleanup task - current implementation propagates the exception
-        with pytest.raises(Exception, match="Index processor error"):
-            clean_notion_document_task([document.id], dataset.id)
+        # Execute the cleanup task: it must NOT raise even though clean() raises.
+        # Before the safety-net wrapper this would have re-raised the ValueError,
+        # aborting the task and leaving ``DocumentSegment`` rows stranded in PG.
+        clean_notion_document_task([document.id], dataset.id)
 
-        # Note: This test demonstrates the task's error handling capability.
-        # Even with external service errors, the database operations complete successfully.
-        # In a production environment, proper error handling would determine transaction rollback behavior.
+        # Vector cleanup was attempted exactly once.
+        mock_index_processor.clean.assert_called_once()
+
+        # The crucial assertion: despite the index processor failure, the
+        # final session block (the ``DELETE FROM document_segments`` statement)
+        # still ran and committed. This is what the wrapper buys us; without
+        # it, the production incident left tens of thousands of orphan segments
+        # per affected tenant. Aligns with the assertion shape used by the
+        # happy-path test (``test_clean_notion_document_task_success``).
+        assert _count_segments(db_session_with_containers, DocumentSegment.document_id == document.id) == 0
 
     def test_clean_notion_document_task_with_large_number_of_documents(
         self, db_session_with_containers, mock_index_processor_factory, mock_external_service_dependencies
diff --git a/api/tests/unit_tests/core/rag/datasource/vdb/test_vector_factory.py b/api/tests/unit_tests/core/rag/datasource/vdb/test_vector_factory.py
index 9de04c80ba..f84ce2771f 100644
--- a/api/tests/unit_tests/core/rag/datasource/vdb/test_vector_factory.py
+++ b/api/tests/unit_tests/core/rag/datasource/vdb/test_vector_factory.py
@@ -146,10 +146,7 @@ def test_get_vector_factory_entry_point_overrides_builtin(vector_factory_module,
 
 def test_vector_init_uses_default_and_custom_attributes(vector_factory_module):
     dataset = SimpleNamespace(id="dataset-1")
-    with (
-        patch.object(vector_factory_module.Vector, "_get_embeddings", return_value="embeddings"),
-        patch.object(vector_factory_module.Vector, "_init_vector", return_value="processor"),
-    ):
+    with patch.object(vector_factory_module.Vector, "_init_vector", return_value="processor"):
         default_vector = vector_factory_module.Vector(dataset)
         custom_vector = vector_factory_module.Vector(dataset, attributes=["doc_id"])
 
@@ -166,10 +163,57 @@
         "original_chunk_id",
     ]
     assert custom_vector._attributes == ["doc_id"]
-    assert default_vector._embeddings == "embeddings"
+    # ``_embeddings`` is now a lazy proxy that defers materializing the real
+    # embedding model until ``embed_*`` is invoked, so cleanup paths never
+    # trigger billing/feature-service calls during ``Vector(dataset)``
+    # construction. See ``_LazyEmbeddings``.
+    assert isinstance(default_vector._embeddings, vector_factory_module._LazyEmbeddings)
     assert default_vector._vector_processor == "processor"
 
 
+def test_lazy_embeddings_defer_real_load_until_first_embed_call(vector_factory_module, monkeypatch):
+    """``Vector(dataset)`` must not transitively call ``ModelManager`` during
+    construction. The real embedding model should only be materialized on the
+    first ``embed_*`` call (i.e. create / search paths), so that cleanup paths
+    (``delete_by_ids`` / ``delete``) remain resilient to billing-API failures.
+ """ + for_tenant_mock = MagicMock(side_effect=AssertionError("ModelManager.for_tenant must not be called eagerly")) + monkeypatch.setattr(vector_factory_module.ModelManager, "for_tenant", for_tenant_mock) + + dataset = SimpleNamespace( + tenant_id="tenant-1", + embedding_model_provider="openai", + embedding_model="text-embedding-3-small", + ) + + proxy = vector_factory_module._LazyEmbeddings(dataset) + + # Construction alone does not trigger ModelManager / FeatureService / BillingService. + for_tenant_mock.assert_not_called() + + # Exercising an embed_* method materializes the real model exactly once. + inner_model = MagicMock() + inner_model.embed_documents.return_value = [[0.1, 0.2]] + cached_embedding_mock = MagicMock(return_value=inner_model) + real_for_tenant = MagicMock() + real_for_tenant.get_model_instance.return_value = "embedding-model-instance" + monkeypatch.setattr(vector_factory_module.ModelManager, "for_tenant", MagicMock(return_value=real_for_tenant)) + monkeypatch.setattr(vector_factory_module, "CacheEmbedding", cached_embedding_mock) + + result = proxy.embed_documents(["hello"]) + + assert result == [[0.1, 0.2]] + cached_embedding_mock.assert_called_once_with("embedding-model-instance") + inner_model.embed_documents.assert_called_once_with(["hello"]) + + # Subsequent calls reuse the materialized model (no re-resolution). + inner_model.embed_documents.reset_mock() + cached_embedding_mock.reset_mock() + proxy.embed_documents(["world"]) + cached_embedding_mock.assert_not_called() + inner_model.embed_documents.assert_called_once_with(["world"]) + + def test_init_vector_prefers_dataset_index_struct(vector_factory_module, monkeypatch): calls = {"vector_type": None, "init_args": None} diff --git a/api/tests/unit_tests/tasks/test_clean_document_task.py b/api/tests/unit_tests/tasks/test_clean_document_task.py new file mode 100644 index 0000000000..26d7b3e3b6 --- /dev/null +++ b/api/tests/unit_tests/tasks/test_clean_document_task.py @@ -0,0 +1,291 @@ +""" +Unit tests for clean_document_task. + +Focuses on the resilience contract added by the billing-failure fix: +``index_processor.clean()`` is wrapped in ``try/except`` so that a transient +failure inside the vector / keyword cleanup (e.g. ``ValueError("Unable to +retrieve billing information...")`` raised by ``BillingService._send_request`` +when ``Vector(dataset)`` transitively triggers ``FeatureService.get_features``) +does not abort the entire task and leave PG with stranded ``DocumentSegment`` +/ ``ChildChunk`` / ``UploadFile`` / ``DatasetMetadataBinding`` rows. +""" + +import uuid +from unittest.mock import MagicMock, patch + +import pytest + +from tasks.clean_document_task import clean_document_task + + +@pytest.fixture +def document_id(): + return str(uuid.uuid4()) + + +@pytest.fixture +def dataset_id(): + return str(uuid.uuid4()) + + +@pytest.fixture +def tenant_id(): + return str(uuid.uuid4()) + + +@pytest.fixture +def mock_session_factory(): + """Patch ``session_factory.create_session`` to return per-call mock sessions. + + Each call to ``create_session()`` yields a fresh ``MagicMock`` session so we + can assert ``execute()`` calls across the multiple short-lived transactions + used by ``clean_document_task``. 
+ """ + with patch("tasks.clean_document_task.session_factory", autospec=True) as mock_sf: + sessions: list[MagicMock] = [] + + def _create_session(): + session = MagicMock() + session.scalars.return_value.all.return_value = [] + session.execute.return_value.all.return_value = [] + session.scalar.return_value = None + cm = MagicMock() + cm.__enter__.return_value = session + cm.__exit__.return_value = None + sessions.append(session) + return cm + + mock_sf.create_session.side_effect = _create_session + yield mock_sf, sessions + + +@pytest.fixture +def mock_storage(): + with patch("tasks.clean_document_task.storage", autospec=True) as mock: + mock.delete.return_value = None + yield mock + + +@pytest.fixture +def mock_index_processor_factory(): + """Mock ``IndexProcessorFactory`` so we can inject behavior into ``clean``.""" + with patch("tasks.clean_document_task.IndexProcessorFactory", autospec=True) as factory_cls: + processor = MagicMock() + processor.clean.return_value = None + factory_instance = MagicMock() + factory_instance.init_index_processor.return_value = processor + factory_cls.return_value = factory_instance + + yield { + "factory_cls": factory_cls, + "factory_instance": factory_instance, + "processor": processor, + } + + +def _build_segment(segment_id: str, content: str = "segment content") -> MagicMock: + seg = MagicMock() + seg.id = segment_id + seg.index_node_id = f"node-{segment_id}" + seg.content = content + return seg + + +def _build_dataset(dataset_id: str, tenant_id: str) -> MagicMock: + ds = MagicMock() + ds.id = dataset_id + ds.tenant_id = tenant_id + return ds + + +class TestVectorCleanupResilience: + """Vector / keyword cleanup must not abort the task on transient failure.""" + + def test_billing_failure_during_vector_cleanup_does_not_skip_pg_cleanup( + self, + document_id, + dataset_id, + tenant_id, + mock_session_factory, + mock_storage, + mock_index_processor_factory, + ): + """Reproduces the production incident: + + ``Vector(dataset)`` transitively calls ``FeatureService.get_features`` + which calls ``BillingService._send_request("GET", ...)``. When billing + returns non-200 it raises ``ValueError("Unable to retrieve billing + information...")``. Before the fix this propagated out of + ``clean_document_task`` and left ``DocumentSegment`` / ``ChildChunk`` / + ``UploadFile`` / ``DatasetMetadataBinding`` rows orphaned because the + already-deleted ``Document`` row had been hard-committed by the caller + (``dataset_service.delete_document``) before ``.delay()`` was invoked. + + Contract: a billing failure inside ``index_processor.clean()`` must be + caught, logged, and the rest of the task must continue so PG ends up + consistent with the deleted ``Document`` even if Qdrant retains + orphan vectors that can be reaped later. + """ + mock_sf, sessions = mock_session_factory + + # First create_session(): Step 1 (load segments + attachments). + step1_session = MagicMock() + step1_session.scalars.return_value.all.return_value = [ + _build_segment("seg-1"), + _build_segment("seg-2"), + ] + step1_session.execute.return_value.all.return_value = [] + step1_session.scalar.return_value = _build_dataset(dataset_id, tenant_id) + # Second create_session(): Step 2 (vector cleanup). Returns dataset. 
+        step2_session = MagicMock()
+        step2_session.scalar.return_value = _build_dataset(dataset_id, tenant_id)
+        step2_session.scalars.return_value.all.return_value = []
+        step2_session.execute.return_value.all.return_value = []
+        # Subsequent sessions: Step 3+ (image / segment / file / metadata cleanup).
+        # The default fixture returns empty results, which is fine for these short txns.
+        cm1, cm2 = MagicMock(), MagicMock()
+        cm1.__enter__.return_value = step1_session
+        cm1.__exit__.return_value = None
+        cm2.__enter__.return_value = step2_session
+        cm2.__exit__.return_value = None
+
+        def _default_cm():
+            session = MagicMock()
+            session.scalars.return_value.all.return_value = []
+            session.execute.return_value.all.return_value = []
+            session.scalar.return_value = None
+            cm = MagicMock()
+            cm.__enter__.return_value = session
+            cm.__exit__.return_value = None
+            sessions.append(session)
+            return cm
+
+        mock_sf.create_session.side_effect = [cm1, cm2] + [_default_cm() for _ in range(10)]
+
+        # Simulate the production failure: index_processor.clean() raises a ValueError
+        # mirroring BillingService._send_request when billing returns non-200.
+        mock_index_processor_factory["processor"].clean.side_effect = ValueError(
+            "Unable to retrieve billing information. Please try again later or contact support."
+        )
+
+        # Act: must not raise out of the task even though clean() raises.
+        clean_document_task(
+            document_id=document_id,
+            dataset_id=dataset_id,
+            doc_form="paragraph",
+            file_id=None,
+        )
+
+        # Assert
+        # 1. Vector cleanup was attempted.
+        mock_index_processor_factory["processor"].clean.assert_called_once()
+        # 2. Despite the failure the task continued: at least one DocumentSegment
+        #    delete was issued. We use the count of session.execute calls across
+        #    later short transactions as a proxy for "Step 3+ executed".
+        execute_calls = sum(s.execute.call_count for s in sessions)
+        assert execute_calls > 0, (
+            "Step 3+ DB cleanup did not run after vector cleanup failure; "
+            "this regression would re-introduce the orphan-segment bug."
+        )
+
+    def test_vector_cleanup_success_path_remains_unaffected(
+        self,
+        document_id,
+        dataset_id,
+        tenant_id,
+        mock_session_factory,
+        mock_storage,
+        mock_index_processor_factory,
+    ):
+        """Backward-compat: the happy path must still call ``clean()`` exactly
+        once with the expected arguments and complete without errors.
+ """ + mock_sf, sessions = mock_session_factory + + step1_session = MagicMock() + step1_session.scalars.return_value.all.return_value = [_build_segment("seg-1")] + step1_session.execute.return_value.all.return_value = [] + step1_session.scalar.return_value = _build_dataset(dataset_id, tenant_id) + step2_session = MagicMock() + step2_session.scalar.return_value = _build_dataset(dataset_id, tenant_id) + step2_session.scalars.return_value.all.return_value = [] + step2_session.execute.return_value.all.return_value = [] + cm1, cm2 = MagicMock(), MagicMock() + cm1.__enter__.return_value = step1_session + cm1.__exit__.return_value = None + cm2.__enter__.return_value = step2_session + cm2.__exit__.return_value = None + + def _default_cm(): + session = MagicMock() + session.scalars.return_value.all.return_value = [] + session.execute.return_value.all.return_value = [] + session.scalar.return_value = None + cm = MagicMock() + cm.__enter__.return_value = session + cm.__exit__.return_value = None + sessions.append(session) + return cm + + mock_sf.create_session.side_effect = [cm1, cm2] + [_default_cm() for _ in range(10)] + + clean_document_task( + document_id=document_id, + dataset_id=dataset_id, + doc_form="paragraph", + file_id=None, + ) + + assert mock_index_processor_factory["processor"].clean.call_count == 1 + # Index cleanup invoked with the expected delete_summaries / delete_child_chunks flags. + _, kwargs = mock_index_processor_factory["processor"].clean.call_args + assert kwargs.get("with_keywords") is True + assert kwargs.get("delete_child_chunks") is True + assert kwargs.get("delete_summaries") is True + + def test_no_segments_skips_vector_cleanup( + self, + document_id, + dataset_id, + tenant_id, + mock_session_factory, + mock_storage, + mock_index_processor_factory, + ): + """When the document has no segments (e.g. indexing failed before + producing any), vector cleanup must not be attempted — and therefore + the new try/except wrapper does not change behavior here. + """ + mock_sf, sessions = mock_session_factory + + step1_session = MagicMock() + step1_session.scalars.return_value.all.return_value = [] # no segments + step1_session.execute.return_value.all.return_value = [] + step1_session.scalar.return_value = _build_dataset(dataset_id, tenant_id) + cm1 = MagicMock() + cm1.__enter__.return_value = step1_session + cm1.__exit__.return_value = None + + def _default_cm(): + session = MagicMock() + session.scalars.return_value.all.return_value = [] + session.execute.return_value.all.return_value = [] + session.scalar.return_value = None + cm = MagicMock() + cm.__enter__.return_value = session + cm.__exit__.return_value = None + sessions.append(session) + return cm + + mock_sf.create_session.side_effect = [cm1] + [_default_cm() for _ in range(10)] + + clean_document_task( + document_id=document_id, + dataset_id=dataset_id, + doc_form="paragraph", + file_id=None, + ) + + # Vector cleanup is gated on ``index_node_ids``; when there are no + # segments the IndexProcessorFactory path is never entered. + mock_index_processor_factory["factory_cls"].assert_not_called()