fix: keep cleanup tasks resilient to billing API failures (#35600)

zyssyz123 2026-04-27 16:51:09 +08:00 committed by fatelei
parent e22b03797c
commit e73f720505
6 changed files with 479 additions and 31 deletions

View File

@@ -39,6 +39,58 @@ class AbstractVectorFactory(ABC):
return index_struct_dict
class _LazyEmbeddings(Embeddings):
"""Lazy proxy that defers materializing the real embedding model.
Constructing the real embeddings (via ``ModelManager.get_model_instance``)
transitively calls ``FeatureService.get_features``, which issues
``BillingService`` HTTP GETs (see ``provider_manager.py``). Cleanup paths
(``delete_by_ids`` / ``delete`` / ``text_exists``) do not need embeddings
at all, so deferring this until an ``embed_*`` method is actually invoked
keeps cleanup tasks resilient to transient billing-API failures and avoids
leaving stranded ``document_segments`` / ``child_chunks`` whenever the
billing API hiccups.
Existing callers that perform create / search operations are unaffected:
the first ``embed_*`` call materializes the underlying model and the
behavior is identical from that point on.
"""
def __init__(self, dataset: Dataset):
self._dataset = dataset
self._real: Embeddings | None = None
def _ensure(self) -> Embeddings:
if self._real is None:
model_manager = ModelManager.for_tenant(tenant_id=self._dataset.tenant_id)
embedding_model = model_manager.get_model_instance(
tenant_id=self._dataset.tenant_id,
provider=self._dataset.embedding_model_provider,
model_type=ModelType.TEXT_EMBEDDING,
model=self._dataset.embedding_model,
)
self._real = CacheEmbedding(embedding_model)
return self._real
def embed_documents(self, texts: list[str]) -> list[list[float]]:
return self._ensure().embed_documents(texts)
def embed_multimodal_documents(self, multimodel_documents: list[dict[str, Any]]) -> list[list[float]]:
return self._ensure().embed_multimodal_documents(multimodel_documents)
def embed_query(self, text: str) -> list[float]:
return self._ensure().embed_query(text)
def embed_multimodal_query(self, multimodel_document: dict[str, Any]) -> list[float]:
return self._ensure().embed_multimodal_query(multimodel_document)
async def aembed_documents(self, texts: list[str]) -> list[list[float]]:
return await self._ensure().aembed_documents(texts)
async def aembed_query(self, text: str) -> list[float]:
return await self._ensure().aembed_query(text)
class Vector:
def __init__(self, dataset: Dataset, attributes: list | None = None):
if attributes is None:
@@ -60,7 +112,11 @@ class Vector:
"original_chunk_id",
]
self._dataset = dataset
- self._embeddings = self._get_embeddings()
+ # Use a lazy proxy so cleanup paths (delete_by_ids / delete / text_exists)
+ # never transitively trigger billing API calls during ``Vector(dataset)``
+ # construction. The real embedding model is materialized only when an
+ # ``embed_*`` method is actually invoked (i.e. create / search paths).
+ self._embeddings: Embeddings = _LazyEmbeddings(dataset)
self._attributes = attributes
self._vector_processor = self._init_vector()
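To make the laziness contract concrete, here is a minimal, self-contained editorial sketch (not part of this diff); ``resolve_embeddings`` is a hypothetical stand-in for the billing-backed ``ModelManager.get_model_instance`` path:

# Editorial sketch: the lazy-materialization contract in isolation.
from collections.abc import Callable

def resolve_embeddings() -> Callable[[str], list[float]]:
    # Stand-in for the production step that can raise
    # ValueError("Unable to retrieve billing information...").
    raise ValueError("Unable to retrieve billing information.")

class LazyEmbeddingsSketch:
    def __init__(self, resolver: Callable[[], Callable[[str], list[float]]]) -> None:
        self._resolver = resolver
        self._real: Callable[[str], list[float]] | None = None

    def embed_query(self, text: str) -> list[float]:
        if self._real is None:
            self._real = self._resolver()  # materialized on the first embed_* call only
        return self._real(text)

proxy = LazyEmbeddingsSketch(resolve_embeddings)  # no billing call yet, never raises
# A cleanup path that merely constructs the proxy survives a billing outage;
# proxy.embed_query("q") would be the first call to surface the ValueError.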

View File

@@ -61,13 +61,31 @@ def clean_document_task(document_id: str, dataset_id: str, doc_form: str, file_i
# check whether segments exist
if index_node_ids:
- index_processor = IndexProcessorFactory(doc_form).init_index_processor()
- with session_factory.create_session() as session:
- dataset = session.scalar(select(Dataset).where(Dataset.id == dataset_id).limit(1))
- if dataset:
- index_processor.clean(
- dataset, index_node_ids, with_keywords=True, delete_child_chunks=True, delete_summaries=True
- )
+ # Wrap vector / keyword index cleanup in try/except so that a transient
+ # failure here (e.g. a billing API hiccup propagated via FeatureService when
+ # ModelManager is initialized inside ``Vector(dataset)``) does not abort
+ # the entire task and leave document_segments / child_chunks / image_files
+ # / metadata bindings stranded in PG. Mirrors the pattern already used in
+ # ``clean_dataset_task``, so the document row's hard delete (already
+ # committed by the caller) does not produce orphan PG rows just because
+ # the vector backend or one of its transitive dependencies failed transiently.
+ try:
+ index_processor = IndexProcessorFactory(doc_form).init_index_processor()
+ with session_factory.create_session() as session:
+ dataset = session.scalar(select(Dataset).where(Dataset.id == dataset_id).limit(1))
+ if dataset:
+ index_processor.clean(
+ dataset, index_node_ids, with_keywords=True, delete_child_chunks=True, delete_summaries=True
+ )
+ except Exception:
+ logger.exception(
+ "Failed to clean vector / keyword index in clean_document_task, "
+ "document_id=%s, dataset_id=%s, index_node_ids_count=%d. "
+ "Continuing with PG / storage cleanup; vector orphans can be reaped later.",
+ document_id,
+ dataset_id,
+ len(index_node_ids),
+ )
total_image_files = []
with session_factory.create_session() as session, session.begin():
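The wrapper above is the task-local instance of a general best-effort shape. A minimal sketch with a hypothetical ``best_effort`` helper (the tasks in this diff inline the try/except rather than factoring it out):

import logging
from collections.abc import Callable

logger = logging.getLogger(__name__)

def best_effort(step_name: str, step: Callable[[], None]) -> None:
    # Run a non-critical cleanup step; on failure, log with traceback and
    # return normally so the remaining PG / storage cleanup still executes.
    try:
        step()
    except Exception:
        logger.exception("best-effort cleanup step %r failed; continuing", step_name)

def _failing_vector_clean() -> None:
    raise ValueError("Unable to retrieve billing information.")

best_effort("vector-index-clean", _failing_vector_clean)  # logs, does not raise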

View File

@@ -40,12 +40,29 @@ def clean_notion_document_task(document_ids: list[str], dataset_id: str):
segments = session.scalars(select(DocumentSegment).where(DocumentSegment.document_id == document_id)).all()
total_index_node_ids.extend([segment.index_node_id for segment in segments])
- with session_factory.create_session() as session:
- dataset = session.scalar(select(Dataset).where(Dataset.id == dataset_id).limit(1))
- if dataset:
- index_processor.clean(
- dataset, total_index_node_ids, with_keywords=True, delete_child_chunks=True, delete_summaries=True
- )
+ # Wrap vector / keyword index cleanup in try/except so that a transient
+ # failure here (e.g. a billing API hiccup propagated via FeatureService when
+ # ``ModelManager`` is initialized inside ``Vector(dataset)``) does not abort
+ # the task and leave the already-deleted documents' segments stranded in PG.
+ # The Document rows are hard-deleted in the previous session block, so any
+ # exception escaping this task would produce orphans that no later request
+ # can clean up. Mirrors the pattern in ``clean_dataset_task``.
+ try:
+ with session_factory.create_session() as session:
+ dataset = session.scalar(select(Dataset).where(Dataset.id == dataset_id).limit(1))
+ if dataset:
+ index_processor.clean(
+ dataset, total_index_node_ids, with_keywords=True, delete_child_chunks=True, delete_summaries=True
+ )
+ except Exception:
+ logger.exception(
+ "Failed to clean vector / keyword index in clean_notion_document_task, "
+ "dataset_id=%s, document_ids=%s, index_node_ids_count=%d. "
+ "Continuing with segment deletion; vector orphans can be reaped later.",
+ dataset_id,
+ document_ids,
+ len(total_index_node_ids),
+ )
with session_factory.create_session() as session, session.begin():
segment_delete_stmt = delete(DocumentSegment).where(DocumentSegment.document_id.in_(document_ids))
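Both log messages defer orphaned vectors to a later reap, but no reaper ships in this change. A hypothetical sketch of that scanner; ``list_index_node_ids`` is an assumed enumeration API (only ``delete_by_ids`` appears in this diff):

def reap_orphan_vectors(vector, known_index_node_ids: set[str]) -> int:
    # Hypothetical offline scanner: delete vectors whose index_node_id no
    # longer has a backing DocumentSegment row in PG.
    # `vector.list_index_node_ids()` is an assumption, not an API in this diff;
    # `vector.delete_by_ids()` mirrors the real Vector method.
    orphans = [nid for nid in vector.list_index_node_ids() if nid not in known_index_node_ids]
    if orphans:
        vector.delete_by_ids(orphans)
    return len(orphans)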

View File

@@ -602,14 +602,25 @@ class TestCleanNotionDocumentTask:
# Note: This test successfully verifies database operations.
# IndexProcessor verification would require more sophisticated mocking.
- def test_clean_notion_document_task_database_transaction_rollback(
+ def test_clean_notion_document_task_continues_when_index_processor_fails(
self, db_session_with_containers, mock_index_processor_factory, mock_external_service_dependencies
):
"""
- Test cleanup task behavior when database operations fail.
+ Index processor failure (e.g. a transient billing API error propagated via
+ ``FeatureService`` when ``Vector(dataset)`` lazily resolves the embedding
+ model) must NOT abort the cleanup task. The Document rows have already
+ been hard-deleted in the first session block before vector cleanup runs,
+ so any uncaught exception escaping the task would strand
+ ``DocumentSegment`` rows in PG with no parent ``Document``.
- This test verifies that the task properly handles database errors
- and maintains data consistency.
+ Contract: the task swallows the index_processor exception, logs it, and
+ proceeds to delete the segments, leaving PG consistent. (Vector orphans,
+ if any, can be reaped later by an offline scanner.)
+ Regression guard for the production incident where ``clean_document_task``
+ / ``clean_notion_document_task`` failed with
+ ``ValueError("Unable to retrieve billing information...")`` and left
+ tens of thousands of orphan segments per affected tenant.
"""
fake = Faker()
@@ -672,17 +683,28 @@ class TestCleanNotionDocumentTask:
db_session_with_containers.add(segment)
db_session_with_containers.commit()
- # Mock index processor to raise an exception
+ # Simulate the production failure mode: index_processor.clean() raises a
+ # ValueError mirroring ``BillingService._send_request`` returning non-200.
mock_index_processor = mock_index_processor_factory.return_value.init_index_processor.return_value
- mock_index_processor.clean.side_effect = Exception("Index processor error")
+ mock_index_processor.clean.side_effect = ValueError(
+ "Unable to retrieve billing information. Please try again later or contact support."
+ )
- # Execute cleanup task - current implementation propagates the exception
- with pytest.raises(Exception, match="Index processor error"):
- clean_notion_document_task([document.id], dataset.id)
+ # Execute cleanup task: it must NOT raise even though clean() raises.
+ # Before the safety-net wrapper this would have re-raised the ValueError,
+ # aborting the task and leaving DocumentSegment rows stranded in PG.
+ clean_notion_document_task([document.id], dataset.id)
- # Note: This test demonstrates the task's error handling capability.
- # Even with external service errors, the database operations complete successfully.
- # In a production environment, proper error handling would determine transaction rollback behavior.
+ # Vector cleanup was attempted exactly once.
+ mock_index_processor.clean.assert_called_once()
+ # The crucial assertion: despite the index processor failure, the
+ # final session block (line 51-52, ``DELETE FROM document_segments``)
+ # still ran and committed. This is what the wrapper buys us; without
+ # it, the production incident left tens of thousands of orphan segments
+ # per affected tenant. Aligns with the assertion shape used by the
+ # happy-path test (``test_clean_notion_document_task_success``).
+ assert _count_segments(db_session_with_containers, DocumentSegment.document_id == document.id) == 0
def test_clean_notion_document_task_with_large_number_of_documents(
self, db_session_with_containers, mock_index_processor_factory, mock_external_service_dependencies

View File

@@ -146,10 +146,7 @@ def test_get_vector_factory_entry_point_overrides_builtin(vector_factory_module,
def test_vector_init_uses_default_and_custom_attributes(vector_factory_module):
dataset = SimpleNamespace(id="dataset-1")
- with (
- patch.object(vector_factory_module.Vector, "_get_embeddings", return_value="embeddings"),
- patch.object(vector_factory_module.Vector, "_init_vector", return_value="processor"),
- ):
+ with patch.object(vector_factory_module.Vector, "_init_vector", return_value="processor"):
default_vector = vector_factory_module.Vector(dataset)
custom_vector = vector_factory_module.Vector(dataset, attributes=["doc_id"])
@@ -166,10 +163,57 @@ def test_vector_init_uses_default_and_custom_attributes(vector_factory_module):
"original_chunk_id",
]
assert custom_vector._attributes == ["doc_id"]
- assert default_vector._embeddings == "embeddings"
+ # ``_embeddings`` is now a lazy proxy that defers materializing the real
+ # embedding model until ``embed_*`` is invoked, so cleanup paths never
+ # trigger billing/feature-service calls during ``Vector(dataset)``
+ # construction. See ``_LazyEmbeddings``.
+ assert isinstance(default_vector._embeddings, vector_factory_module._LazyEmbeddings)
assert default_vector._vector_processor == "processor"
def test_lazy_embeddings_defer_real_load_until_first_embed_call(vector_factory_module, monkeypatch):
"""``Vector(dataset)`` must not transitively call ``ModelManager`` during
construction. The real embedding model should only be materialized on the
first ``embed_*`` call (i.e. create / search paths) so cleanup paths
(``delete_by_ids`` / ``delete``) remain resilient to billing-API failures.
"""
for_tenant_mock = MagicMock(side_effect=AssertionError("ModelManager.for_tenant must not be called eagerly"))
monkeypatch.setattr(vector_factory_module.ModelManager, "for_tenant", for_tenant_mock)
dataset = SimpleNamespace(
tenant_id="tenant-1",
embedding_model_provider="openai",
embedding_model="text-embedding-3-small",
)
proxy = vector_factory_module._LazyEmbeddings(dataset)
# Construction alone does not trigger ModelManager / FeatureService / BillingService.
for_tenant_mock.assert_not_called()
# Exercising an embed_* method materializes the real model exactly once.
inner_model = MagicMock()
inner_model.embed_documents.return_value = [[0.1, 0.2]]
cached_embedding_mock = MagicMock(return_value=inner_model)
real_for_tenant = MagicMock()
real_for_tenant.get_model_instance.return_value = "embedding-model-instance"
monkeypatch.setattr(vector_factory_module.ModelManager, "for_tenant", MagicMock(return_value=real_for_tenant))
monkeypatch.setattr(vector_factory_module, "CacheEmbedding", cached_embedding_mock)
result = proxy.embed_documents(["hello"])
assert result == [[0.1, 0.2]]
cached_embedding_mock.assert_called_once_with("embedding-model-instance")
inner_model.embed_documents.assert_called_once_with(["hello"])
# Subsequent calls reuse the materialized model (no re-resolution).
inner_model.embed_documents.reset_mock()
cached_embedding_mock.reset_mock()
proxy.embed_documents(["world"])
cached_embedding_mock.assert_not_called()
inner_model.embed_documents.assert_called_once_with(["world"])
def test_init_vector_prefers_dataset_index_struct(vector_factory_module, monkeypatch):
calls = {"vector_type": None, "init_args": None}

View File

@@ -0,0 +1,291 @@
"""
Unit tests for clean_document_task.
Focuses on the resilience contract added by the billing-failure fix:
``index_processor.clean()`` is wrapped in ``try/except`` so that a transient
failure inside the vector / keyword cleanup (e.g. ``ValueError("Unable to
retrieve billing information...")`` raised by ``BillingService._send_request``
when ``Vector(dataset)`` transitively triggers ``FeatureService.get_features``)
does not abort the entire task and leave PG with stranded ``DocumentSegment``
/ ``ChildChunk`` / ``UploadFile`` / ``DatasetMetadataBinding`` rows.
"""
import uuid
from unittest.mock import MagicMock, patch
import pytest
from tasks.clean_document_task import clean_document_task
@pytest.fixture
def document_id():
return str(uuid.uuid4())
@pytest.fixture
def dataset_id():
return str(uuid.uuid4())
@pytest.fixture
def tenant_id():
return str(uuid.uuid4())
@pytest.fixture
def mock_session_factory():
"""Patch ``session_factory.create_session`` to return per-call mock sessions.
Each call to ``create_session()`` yields a fresh ``MagicMock`` session so we
can assert ``execute()`` calls across the multiple short-lived transactions
used by ``clean_document_task``.
"""
with patch("tasks.clean_document_task.session_factory", autospec=True) as mock_sf:
sessions: list[MagicMock] = []
def _create_session():
session = MagicMock()
session.scalars.return_value.all.return_value = []
session.execute.return_value.all.return_value = []
session.scalar.return_value = None
cm = MagicMock()
cm.__enter__.return_value = session
cm.__exit__.return_value = None
sessions.append(session)
return cm
mock_sf.create_session.side_effect = _create_session
yield mock_sf, sessions
@pytest.fixture
def mock_storage():
with patch("tasks.clean_document_task.storage", autospec=True) as mock:
mock.delete.return_value = None
yield mock
@pytest.fixture
def mock_index_processor_factory():
"""Mock ``IndexProcessorFactory`` so we can inject behavior into ``clean``."""
with patch("tasks.clean_document_task.IndexProcessorFactory", autospec=True) as factory_cls:
processor = MagicMock()
processor.clean.return_value = None
factory_instance = MagicMock()
factory_instance.init_index_processor.return_value = processor
factory_cls.return_value = factory_instance
yield {
"factory_cls": factory_cls,
"factory_instance": factory_instance,
"processor": processor,
}
def _build_segment(segment_id: str, content: str = "segment content") -> MagicMock:
seg = MagicMock()
seg.id = segment_id
seg.index_node_id = f"node-{segment_id}"
seg.content = content
return seg
def _build_dataset(dataset_id: str, tenant_id: str) -> MagicMock:
ds = MagicMock()
ds.id = dataset_id
ds.tenant_id = tenant_id
return ds
class TestVectorCleanupResilience:
"""Vector / keyword cleanup must not abort the task on transient failure."""
def test_billing_failure_during_vector_cleanup_does_not_skip_pg_cleanup(
self,
document_id,
dataset_id,
tenant_id,
mock_session_factory,
mock_storage,
mock_index_processor_factory,
):
"""Reproduces the production incident:
``Vector(dataset)`` transitively calls ``FeatureService.get_features``
which calls ``BillingService._send_request("GET", ...)``. When billing
returns non-200 it raises ``ValueError("Unable to retrieve billing
information...")``. Before the fix this propagated out of
``clean_document_task`` and left ``DocumentSegment`` / ``ChildChunk`` /
``UploadFile`` / ``DatasetMetadataBinding`` rows orphaned because the
``Document`` row had already been hard-deleted and committed by the caller
(``dataset_service.delete_document``) before ``.delay()`` was invoked.
Contract: a billing failure inside ``index_processor.clean()`` must be
caught, logged, and the rest of the task must continue so PG ends up
consistent with the deleted ``Document`` even if Qdrant retains
orphan vectors that can be reaped later.
"""
mock_sf, sessions = mock_session_factory
# First create_session(): Step 1 (load segments + attachments).
step1_session = MagicMock()
step1_session.scalars.return_value.all.return_value = [
_build_segment("seg-1"),
_build_segment("seg-2"),
]
step1_session.execute.return_value.all.return_value = []
step1_session.scalar.return_value = _build_dataset(dataset_id, tenant_id)
# Second create_session(): Step 2 (vector cleanup). Returns dataset.
step2_session = MagicMock()
step2_session.scalar.return_value = _build_dataset(dataset_id, tenant_id)
step2_session.scalars.return_value.all.return_value = []
step2_session.execute.return_value.all.return_value = []
# Subsequent sessions: Step 3+ (image / segment / file / metadata cleanup).
# Default fixture returns empty results which is fine for these short txns.
cm1, cm2 = MagicMock(), MagicMock()
cm1.__enter__.return_value = step1_session
cm1.__exit__.return_value = None
cm2.__enter__.return_value = step2_session
cm2.__exit__.return_value = None
def _default_cm():
session = MagicMock()
session.scalars.return_value.all.return_value = []
session.execute.return_value.all.return_value = []
session.scalar.return_value = None
cm = MagicMock()
cm.__enter__.return_value = session
cm.__exit__.return_value = None
sessions.append(session)
return cm
mock_sf.create_session.side_effect = [cm1, cm2] + [_default_cm() for _ in range(10)]
# Simulate the production failure: index_processor.clean() raises ValueError
# mirroring BillingService._send_request when billing returns non-200.
mock_index_processor_factory["processor"].clean.side_effect = ValueError(
"Unable to retrieve billing information. Please try again later or contact support."
)
# Act — must not raise out of the task even though clean() raises.
clean_document_task(
document_id=document_id,
dataset_id=dataset_id,
doc_form="paragraph",
file_id=None,
)
# Assert
# 1. Vector cleanup was attempted.
mock_index_processor_factory["processor"].clean.assert_called_once()
# 2. Despite the failure the task continued: at least one DocumentSegment
# delete was issued. We use the count of session.execute calls across
# later short transactions as a proxy for "Step 3+ executed".
execute_calls = sum(s.execute.call_count for s in sessions)
assert execute_calls > 0, (
"Step 3+ DB cleanup did not run after vector cleanup failure; "
"this regression would re-introduce the orphan-segment bug."
)
def test_vector_cleanup_success_path_remains_unaffected(
self,
document_id,
dataset_id,
tenant_id,
mock_session_factory,
mock_storage,
mock_index_processor_factory,
):
"""Backward-compat: the happy path must still call ``clean()`` exactly
once with the expected arguments and complete without errors.
"""
mock_sf, sessions = mock_session_factory
step1_session = MagicMock()
step1_session.scalars.return_value.all.return_value = [_build_segment("seg-1")]
step1_session.execute.return_value.all.return_value = []
step1_session.scalar.return_value = _build_dataset(dataset_id, tenant_id)
step2_session = MagicMock()
step2_session.scalar.return_value = _build_dataset(dataset_id, tenant_id)
step2_session.scalars.return_value.all.return_value = []
step2_session.execute.return_value.all.return_value = []
cm1, cm2 = MagicMock(), MagicMock()
cm1.__enter__.return_value = step1_session
cm1.__exit__.return_value = None
cm2.__enter__.return_value = step2_session
cm2.__exit__.return_value = None
def _default_cm():
session = MagicMock()
session.scalars.return_value.all.return_value = []
session.execute.return_value.all.return_value = []
session.scalar.return_value = None
cm = MagicMock()
cm.__enter__.return_value = session
cm.__exit__.return_value = None
sessions.append(session)
return cm
mock_sf.create_session.side_effect = [cm1, cm2] + [_default_cm() for _ in range(10)]
clean_document_task(
document_id=document_id,
dataset_id=dataset_id,
doc_form="paragraph",
file_id=None,
)
assert mock_index_processor_factory["processor"].clean.call_count == 1
# Index cleanup invoked with the expected delete_summaries / delete_child_chunks flags.
_, kwargs = mock_index_processor_factory["processor"].clean.call_args
assert kwargs.get("with_keywords") is True
assert kwargs.get("delete_child_chunks") is True
assert kwargs.get("delete_summaries") is True
def test_no_segments_skips_vector_cleanup(
self,
document_id,
dataset_id,
tenant_id,
mock_session_factory,
mock_storage,
mock_index_processor_factory,
):
"""When the document has no segments (e.g. indexing failed before
producing any), vector cleanup must not be attempted and therefore
the new try/except wrapper does not change behavior here.
"""
mock_sf, sessions = mock_session_factory
step1_session = MagicMock()
step1_session.scalars.return_value.all.return_value = [] # no segments
step1_session.execute.return_value.all.return_value = []
step1_session.scalar.return_value = _build_dataset(dataset_id, tenant_id)
cm1 = MagicMock()
cm1.__enter__.return_value = step1_session
cm1.__exit__.return_value = None
def _default_cm():
session = MagicMock()
session.scalars.return_value.all.return_value = []
session.execute.return_value.all.return_value = []
session.scalar.return_value = None
cm = MagicMock()
cm.__enter__.return_value = session
cm.__exit__.return_value = None
sessions.append(session)
return cm
mock_sf.create_session.side_effect = [cm1] + [_default_cm() for _ in range(10)]
clean_document_task(
document_id=document_id,
dataset_id=dataset_id,
doc_form="paragraph",
file_id=None,
)
# Vector cleanup is gated on ``index_node_ids``; when there are no
# segments the IndexProcessorFactory path is never entered.
mock_index_processor_factory["factory_cls"].assert_not_called()