test: migrate test_dataset_service_batch_update_document_status SQL tests to testcontainers (#32537)

Co-authored-by: KinomotoMio <200703522+KinomotoMio@users.noreply.github.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
木之本澪 2026-03-03 19:29:58 +08:00 committed by GitHub
parent 1a90c4d81b
commit 65bf632ec0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 662 additions and 702 deletions

View File

@@ -0,0 +1,660 @@
"""Integration tests for DocumentService.batch_update_document_status.
This suite validates SQL-backed batch status updates with testcontainers.
It keeps database access real and only patches non-DB side effects.
"""
import datetime
import json
from dataclasses import dataclass
from unittest.mock import call, patch
from uuid import uuid4
import pytest
from extensions.ext_database import db
from models.dataset import Dataset, Document
from services.dataset_service import DocumentService
from services.errors.document import DocumentIndexingError
# Deterministic timestamp injected via the patched naive_utc_now fixture so
# that updated_at / disabled_at / archived_at assertions can compare exact values.
FIXED_TIME = datetime.datetime(2023, 1, 1, 12, 0, 0)
@dataclass
class UserDouble:
    """Minimal user object for batch update operations."""

    # The service under test only reads the user's id (recorded in
    # disabled_by / archived_by), so no other user attributes are needed.
    id: str
class DocumentBatchUpdateIntegrationDataFactory:
    """Factory for creating persisted entities used in integration tests."""

    @staticmethod
    def create_dataset(
        dataset_id: str | None = None,
        tenant_id: str | None = None,
        name: str = "Test Dataset",
        created_by: str | None = None,
    ) -> Dataset:
        """Create and persist a dataset.

        Args:
            dataset_id: Explicit primary key; left to the DB default when omitted.
            tenant_id: Owning tenant id; a random UUID is generated when omitted.
            name: Human-readable dataset name.
            created_by: Creator user id; a random UUID is generated when omitted.

        Returns:
            The committed ``Dataset`` row.
        """
        record = Dataset(
            tenant_id=tenant_id if tenant_id else str(uuid4()),
            name=name,
            data_source_type="upload_file",
            created_by=created_by if created_by else str(uuid4()),
        )
        # Pin the primary key only when the caller requests a specific one.
        if dataset_id:
            record.id = dataset_id
        db.session.add(record)
        db.session.commit()
        return record

    @staticmethod
    def create_document(
        dataset: Dataset,
        document_id: str | None = None,
        name: str = "test_document.pdf",
        enabled: bool = True,
        archived: bool = False,
        indexing_status: str = "completed",
        completed_at: datetime.datetime | None = None,
        position: int = 1,
        created_by: str | None = None,
        commit: bool = True,
        **kwargs,
    ) -> Document:
        """Create a document bound to the given dataset and persist it.

        Args:
            dataset: Parent dataset supplying tenant and dataset ids.
            document_id: Explicit id; a random UUID is generated when omitted.
            name: Document file name.
            enabled: Initial enabled flag.
            archived: Initial archived flag.
            indexing_status: Indexing state string (e.g. "completed", "indexing").
            completed_at: Completion timestamp; defaults to FIXED_TIME when the
                document is "completed", otherwise None.
            position: Ordering position inside the dataset.
            created_by: Creator user id; a random UUID is generated when omitted.
            commit: When False, the row is only added to the session so callers
                can batch several inserts into one commit.
            **kwargs: Extra attributes set verbatim on the model.

        Returns:
            The (possibly not yet committed) ``Document`` row.
        """
        doc = Document(
            tenant_id=dataset.tenant_id,
            dataset_id=dataset.id,
            position=position,
            data_source_type="upload_file",
            data_source_info=json.dumps({"upload_file_id": str(uuid4())}),
            batch=f"batch-{uuid4()}",
            name=name,
            created_from="web",
            created_by=created_by if created_by else str(uuid4()),
            doc_form="text_model",
        )
        doc.id = document_id if document_id else str(uuid4())
        doc.enabled = enabled
        doc.archived = archived
        doc.indexing_status = indexing_status
        # A "completed" document gets the frozen timestamp unless the caller
        # supplies an explicit completion time.
        if completed_at is not None:
            doc.completed_at = completed_at
        elif indexing_status == "completed":
            doc.completed_at = FIXED_TIME
        else:
            doc.completed_at = None
        for attr_name, attr_value in kwargs.items():
            setattr(doc, attr_name, attr_value)
        db.session.add(doc)
        if commit:
            db.session.commit()
        return doc

    @staticmethod
    def create_multiple_documents(
        dataset: Dataset,
        document_ids: list[str],
        enabled: bool = True,
        archived: bool = False,
        indexing_status: str = "completed",
    ) -> list[Document]:
        """Create and persist multiple documents for one dataset in a single transaction."""
        # Defer the commit until every row is staged, then flush once.
        created = [
            DocumentBatchUpdateIntegrationDataFactory.create_document(
                dataset=dataset,
                document_id=doc_id,
                name=f"document_{doc_id}.pdf",
                enabled=enabled,
                archived=archived,
                indexing_status=indexing_status,
                position=ordinal,
                commit=False,
            )
            for ordinal, doc_id in enumerate(document_ids, start=1)
        ]
        db.session.commit()
        return created

    @staticmethod
    def create_user(user_id: str | None = None) -> UserDouble:
        """Create a lightweight user for update metadata fields."""
        return UserDouble(id=user_id if user_id else str(uuid4()))
class TestDatasetServiceBatchUpdateDocumentStatus:
    """Integration coverage for batch document status updates.

    Each test persists real rows through SQLAlchemy (via the
    ``db_session_with_containers`` fixture) and only patches non-DB
    collaborators: Redis, the async index tasks, and the clock.
    """

    @pytest.fixture
    def patched_dependencies(self):
        """Patch non-DB collaborators only.

        Yields a dict of the active mocks so tests can assert on Redis cache
        operations and on the Celery task ``delay`` calls.
        """
        with (
            patch("services.dataset_service.redis_client") as redis_client,
            patch("services.dataset_service.add_document_to_index_task") as add_task,
            patch("services.dataset_service.remove_document_from_index_task") as remove_task,
            patch("services.dataset_service.naive_utc_now") as naive_utc_now,
        ):
            # Freeze "now" so timestamp assertions can compare exact values.
            naive_utc_now.return_value = FIXED_TIME
            # No cached indexing flag: documents are not currently being indexed.
            redis_client.get.return_value = None
            yield {
                "redis_client": redis_client,
                "add_task": add_task,
                "remove_task": remove_task,
                "naive_utc_now": naive_utc_now,
            }

    def _assert_document_enabled(self, document: Document, current_time: datetime.datetime) -> None:
        """Verify enabled-state fields after action=enable."""
        assert document.enabled is True
        assert document.disabled_at is None
        assert document.disabled_by is None
        assert document.updated_at == current_time

    def _assert_document_disabled(self, document: Document, user_id: str, current_time: datetime.datetime) -> None:
        """Verify disabled-state fields after action=disable."""
        assert document.enabled is False
        assert document.disabled_at == current_time
        assert document.disabled_by == user_id
        assert document.updated_at == current_time

    def _assert_document_archived(self, document: Document, user_id: str, current_time: datetime.datetime) -> None:
        """Verify archived-state fields after action=archive."""
        assert document.archived is True
        assert document.archived_at == current_time
        assert document.archived_by == user_id
        assert document.updated_at == current_time

    def _assert_document_unarchived(self, document: Document) -> None:
        """Verify unarchived-state fields after action=un_archive."""
        assert document.archived is False
        assert document.archived_at is None
        assert document.archived_by is None

    def test_batch_update_enable_documents_success(self, db_session_with_containers, patched_dependencies):
        """Enable disabled documents and trigger indexing side effects."""
        # Arrange
        dataset = DocumentBatchUpdateIntegrationDataFactory.create_dataset()
        user = DocumentBatchUpdateIntegrationDataFactory.create_user()
        document_ids = [str(uuid4()), str(uuid4())]
        disabled_docs = DocumentBatchUpdateIntegrationDataFactory.create_multiple_documents(
            dataset=dataset,
            document_ids=document_ids,
            enabled=False,
        )
        # Act
        DocumentService.batch_update_document_status(
            dataset=dataset, document_ids=document_ids, action="enable", user=user
        )
        # Assert: reload rows from the DB, then check per-document state.
        for document in disabled_docs:
            db.session.refresh(document)
            self._assert_document_enabled(document, FIXED_TIME)
        # Each enabled document should set a 600s indexing guard in Redis and
        # schedule an add-to-index task.
        expected_get_calls = [call(f"document_{doc_id}_indexing") for doc_id in document_ids]
        expected_setex_calls = [call(f"document_{doc_id}_indexing", 600, 1) for doc_id in document_ids]
        expected_add_calls = [call(doc_id) for doc_id in document_ids]
        patched_dependencies["redis_client"].get.assert_has_calls(expected_get_calls)
        patched_dependencies["redis_client"].setex.assert_has_calls(expected_setex_calls)
        patched_dependencies["add_task"].delay.assert_has_calls(expected_add_calls)

    def test_batch_update_enable_already_enabled_document_skipped(
        self, db_session_with_containers, patched_dependencies
    ):
        """Skip enable operation for already-enabled documents."""
        # Arrange
        dataset = DocumentBatchUpdateIntegrationDataFactory.create_dataset()
        user = DocumentBatchUpdateIntegrationDataFactory.create_user()
        document = DocumentBatchUpdateIntegrationDataFactory.create_document(dataset=dataset, enabled=True)
        # Act
        DocumentService.batch_update_document_status(
            dataset=dataset,
            document_ids=[document.id],
            action="enable",
            user=user,
        )
        # Assert: state unchanged and no cache/task side effects.
        db.session.refresh(document)
        assert document.enabled is True
        patched_dependencies["redis_client"].setex.assert_not_called()
        patched_dependencies["add_task"].delay.assert_not_called()

    def test_batch_update_disable_documents_success(self, db_session_with_containers, patched_dependencies):
        """Disable completed documents and trigger remove-index tasks."""
        # Arrange
        dataset = DocumentBatchUpdateIntegrationDataFactory.create_dataset()
        user = DocumentBatchUpdateIntegrationDataFactory.create_user()
        document_ids = [str(uuid4()), str(uuid4())]
        enabled_docs = DocumentBatchUpdateIntegrationDataFactory.create_multiple_documents(
            dataset=dataset,
            document_ids=document_ids,
            enabled=True,
            indexing_status="completed",
        )
        # Act
        DocumentService.batch_update_document_status(
            dataset=dataset,
            document_ids=document_ids,
            action="disable",
            user=user,
        )
        # Assert
        for document in enabled_docs:
            db.session.refresh(document)
            self._assert_document_disabled(document, user.id, FIXED_TIME)
        # Disabling sets the same indexing guard but schedules removal instead.
        expected_get_calls = [call(f"document_{doc_id}_indexing") for doc_id in document_ids]
        expected_setex_calls = [call(f"document_{doc_id}_indexing", 600, 1) for doc_id in document_ids]
        expected_remove_calls = [call(doc_id) for doc_id in document_ids]
        patched_dependencies["redis_client"].get.assert_has_calls(expected_get_calls)
        patched_dependencies["redis_client"].setex.assert_has_calls(expected_setex_calls)
        patched_dependencies["remove_task"].delay.assert_has_calls(expected_remove_calls)

    def test_batch_update_disable_already_disabled_document_skipped(
        self, db_session_with_containers, patched_dependencies
    ):
        """Skip disable operation for already-disabled documents."""
        # Arrange
        dataset = DocumentBatchUpdateIntegrationDataFactory.create_dataset()
        user = DocumentBatchUpdateIntegrationDataFactory.create_user()
        disabled_doc = DocumentBatchUpdateIntegrationDataFactory.create_document(
            dataset=dataset,
            enabled=False,
            indexing_status="completed",
            completed_at=FIXED_TIME,
        )
        # Act
        DocumentService.batch_update_document_status(
            dataset=dataset,
            document_ids=[disabled_doc.id],
            action="disable",
            user=user,
        )
        # Assert: still disabled, no side effects fired.
        db.session.refresh(disabled_doc)
        assert disabled_doc.enabled is False
        patched_dependencies["redis_client"].setex.assert_not_called()
        patched_dependencies["remove_task"].delay.assert_not_called()

    def test_batch_update_disable_non_completed_document_error(self, db_session_with_containers, patched_dependencies):
        """Raise error when disabling a non-completed document."""
        # Arrange
        dataset = DocumentBatchUpdateIntegrationDataFactory.create_dataset()
        user = DocumentBatchUpdateIntegrationDataFactory.create_user()
        non_completed_doc = DocumentBatchUpdateIntegrationDataFactory.create_document(
            dataset=dataset,
            enabled=True,
            indexing_status="indexing",
            completed_at=None,
        )
        # Act / Assert: only completed documents may be disabled.
        with pytest.raises(DocumentIndexingError, match="is not completed"):
            DocumentService.batch_update_document_status(
                dataset=dataset,
                document_ids=[non_completed_doc.id],
                action="disable",
                user=user,
            )

    def test_batch_update_archive_documents_success(self, db_session_with_containers, patched_dependencies):
        """Archive enabled documents and trigger remove-index task."""
        # Arrange
        dataset = DocumentBatchUpdateIntegrationDataFactory.create_dataset()
        user = DocumentBatchUpdateIntegrationDataFactory.create_user()
        document = DocumentBatchUpdateIntegrationDataFactory.create_document(
            dataset=dataset, enabled=True, archived=False
        )
        # Act
        DocumentService.batch_update_document_status(
            dataset=dataset,
            document_ids=[document.id],
            action="archive",
            user=user,
        )
        # Assert: archiving an enabled document also removes it from the index.
        db.session.refresh(document)
        self._assert_document_archived(document, user.id, FIXED_TIME)
        patched_dependencies["redis_client"].get.assert_called_once_with(f"document_{document.id}_indexing")
        patched_dependencies["redis_client"].setex.assert_called_once_with(f"document_{document.id}_indexing", 600, 1)
        patched_dependencies["remove_task"].delay.assert_called_once_with(document.id)

    def test_batch_update_archive_already_archived_document_skipped(
        self, db_session_with_containers, patched_dependencies
    ):
        """Skip archive operation for already-archived documents."""
        # Arrange
        dataset = DocumentBatchUpdateIntegrationDataFactory.create_dataset()
        user = DocumentBatchUpdateIntegrationDataFactory.create_user()
        document = DocumentBatchUpdateIntegrationDataFactory.create_document(
            dataset=dataset, enabled=True, archived=True
        )
        # Act
        DocumentService.batch_update_document_status(
            dataset=dataset,
            document_ids=[document.id],
            action="archive",
            user=user,
        )
        # Assert
        db.session.refresh(document)
        assert document.archived is True
        patched_dependencies["redis_client"].setex.assert_not_called()
        patched_dependencies["remove_task"].delay.assert_not_called()

    def test_batch_update_archive_disabled_document_no_index_removal(
        self, db_session_with_containers, patched_dependencies
    ):
        """Archive disabled document without index-removal side effects."""
        # Arrange
        dataset = DocumentBatchUpdateIntegrationDataFactory.create_dataset()
        user = DocumentBatchUpdateIntegrationDataFactory.create_user()
        document = DocumentBatchUpdateIntegrationDataFactory.create_document(
            dataset=dataset, enabled=False, archived=False
        )
        # Act
        DocumentService.batch_update_document_status(
            dataset=dataset,
            document_ids=[document.id],
            action="archive",
            user=user,
        )
        # Assert: DB state updated, but a disabled document is not in the
        # index, so no removal task or cache guard is expected.
        db.session.refresh(document)
        self._assert_document_archived(document, user.id, FIXED_TIME)
        patched_dependencies["redis_client"].setex.assert_not_called()
        patched_dependencies["remove_task"].delay.assert_not_called()

    def test_batch_update_unarchive_documents_success(self, db_session_with_containers, patched_dependencies):
        """Unarchive enabled documents and trigger add-index task."""
        # Arrange
        dataset = DocumentBatchUpdateIntegrationDataFactory.create_dataset()
        user = DocumentBatchUpdateIntegrationDataFactory.create_user()
        document = DocumentBatchUpdateIntegrationDataFactory.create_document(
            dataset=dataset, enabled=True, archived=True
        )
        # Act
        DocumentService.batch_update_document_status(
            dataset=dataset,
            document_ids=[document.id],
            action="un_archive",
            user=user,
        )
        # Assert: unarchiving an enabled document re-adds it to the index.
        db.session.refresh(document)
        self._assert_document_unarchived(document)
        assert document.updated_at == FIXED_TIME
        patched_dependencies["redis_client"].get.assert_called_once_with(f"document_{document.id}_indexing")
        patched_dependencies["redis_client"].setex.assert_called_once_with(f"document_{document.id}_indexing", 600, 1)
        patched_dependencies["add_task"].delay.assert_called_once_with(document.id)

    def test_batch_update_unarchive_already_unarchived_document_skipped(
        self, db_session_with_containers, patched_dependencies
    ):
        """Skip unarchive operation for already-unarchived documents."""
        # Arrange
        dataset = DocumentBatchUpdateIntegrationDataFactory.create_dataset()
        user = DocumentBatchUpdateIntegrationDataFactory.create_user()
        document = DocumentBatchUpdateIntegrationDataFactory.create_document(
            dataset=dataset, enabled=True, archived=False
        )
        # Act
        DocumentService.batch_update_document_status(
            dataset=dataset,
            document_ids=[document.id],
            action="un_archive",
            user=user,
        )
        # Assert
        db.session.refresh(document)
        assert document.archived is False
        patched_dependencies["redis_client"].setex.assert_not_called()
        patched_dependencies["add_task"].delay.assert_not_called()

    def test_batch_update_unarchive_disabled_document_no_index_addition(
        self, db_session_with_containers, patched_dependencies
    ):
        """Unarchive disabled document without index-add side effects."""
        # Arrange
        dataset = DocumentBatchUpdateIntegrationDataFactory.create_dataset()
        user = DocumentBatchUpdateIntegrationDataFactory.create_user()
        document = DocumentBatchUpdateIntegrationDataFactory.create_document(
            dataset=dataset, enabled=False, archived=True
        )
        # Act
        DocumentService.batch_update_document_status(
            dataset=dataset,
            document_ids=[document.id],
            action="un_archive",
            user=user,
        )
        # Assert: state updated, but a disabled document must not be re-indexed.
        db.session.refresh(document)
        self._assert_document_unarchived(document)
        assert document.updated_at == FIXED_TIME
        patched_dependencies["redis_client"].setex.assert_not_called()
        patched_dependencies["add_task"].delay.assert_not_called()

    def test_batch_update_document_indexing_error_redis_cache_hit(
        self, db_session_with_containers, patched_dependencies
    ):
        """Raise DocumentIndexingError when redis indicates active indexing."""
        # Arrange
        dataset = DocumentBatchUpdateIntegrationDataFactory.create_dataset()
        user = DocumentBatchUpdateIntegrationDataFactory.create_user()
        document = DocumentBatchUpdateIntegrationDataFactory.create_document(
            dataset=dataset,
            name="test_document.pdf",
            enabled=True,
        )
        # Simulate an in-flight indexing job via the Redis guard key.
        patched_dependencies["redis_client"].get.return_value = "indexing"
        # Act / Assert
        with pytest.raises(DocumentIndexingError, match="is being indexed") as exc_info:
            DocumentService.batch_update_document_status(
                dataset=dataset,
                document_ids=[document.id],
                action="enable",
                user=user,
            )
        # The error message should identify the offending document by name.
        assert "test_document.pdf" in str(exc_info.value)
        patched_dependencies["redis_client"].get.assert_called_once_with(f"document_{document.id}_indexing")

    def test_batch_update_async_task_error_handling(self, db_session_with_containers, patched_dependencies):
        """Persist DB update, then propagate async task error."""
        # Arrange
        dataset = DocumentBatchUpdateIntegrationDataFactory.create_dataset()
        user = DocumentBatchUpdateIntegrationDataFactory.create_user()
        document = DocumentBatchUpdateIntegrationDataFactory.create_document(dataset=dataset, enabled=False)
        patched_dependencies["add_task"].delay.side_effect = Exception("Celery task error")
        # Act / Assert: the task failure bubbles up to the caller…
        with pytest.raises(Exception, match="Celery task error"):
            DocumentService.batch_update_document_status(
                dataset=dataset,
                document_ids=[document.id],
                action="enable",
                user=user,
            )
        # …but the DB commit and Redis guard happened before the task was queued.
        db.session.refresh(document)
        self._assert_document_enabled(document, FIXED_TIME)
        patched_dependencies["redis_client"].setex.assert_called_once_with(f"document_{document.id}_indexing", 600, 1)

    def test_batch_update_empty_document_list(self, db_session_with_containers, patched_dependencies):
        """Return early when document_ids is empty."""
        # Arrange
        dataset = DocumentBatchUpdateIntegrationDataFactory.create_dataset()
        user = DocumentBatchUpdateIntegrationDataFactory.create_user()
        # Act
        result = DocumentService.batch_update_document_status(
            dataset=dataset, document_ids=[], action="enable", user=user
        )
        # Assert
        assert result is None
        patched_dependencies["redis_client"].get.assert_not_called()
        patched_dependencies["redis_client"].setex.assert_not_called()

    def test_batch_update_document_not_found_skipped(self, db_session_with_containers, patched_dependencies):
        """Skip IDs that do not map to existing dataset documents."""
        # Arrange
        dataset = DocumentBatchUpdateIntegrationDataFactory.create_dataset()
        user = DocumentBatchUpdateIntegrationDataFactory.create_user()
        missing_document_id = str(uuid4())
        # Act
        DocumentService.batch_update_document_status(
            dataset=dataset,
            document_ids=[missing_document_id],
            action="enable",
            user=user,
        )
        # Assert: an unknown id produces no side effects at all.
        patched_dependencies["redis_client"].get.assert_not_called()
        patched_dependencies["redis_client"].setex.assert_not_called()
        patched_dependencies["add_task"].delay.assert_not_called()

    def test_batch_update_mixed_document_states_and_actions(self, db_session_with_containers, patched_dependencies):
        """Process only the applicable document in a mixed-state enable batch."""
        # Arrange: one disabled (eligible), one enabled, one enabled+archived.
        dataset = DocumentBatchUpdateIntegrationDataFactory.create_dataset()
        user = DocumentBatchUpdateIntegrationDataFactory.create_user()
        disabled_doc = DocumentBatchUpdateIntegrationDataFactory.create_document(dataset=dataset, enabled=False)
        enabled_doc = DocumentBatchUpdateIntegrationDataFactory.create_document(
            dataset=dataset,
            enabled=True,
            position=2,
        )
        archived_doc = DocumentBatchUpdateIntegrationDataFactory.create_document(
            dataset=dataset,
            enabled=True,
            archived=True,
            position=3,
        )
        document_ids = [disabled_doc.id, enabled_doc.id, archived_doc.id]
        # Act
        DocumentService.batch_update_document_status(
            dataset=dataset,
            document_ids=document_ids,
            action="enable",
            user=user,
        )
        # Assert: only the previously-disabled document triggered side effects.
        db.session.refresh(disabled_doc)
        db.session.refresh(enabled_doc)
        db.session.refresh(archived_doc)
        self._assert_document_enabled(disabled_doc, FIXED_TIME)
        assert enabled_doc.enabled is True
        assert archived_doc.enabled is True
        patched_dependencies["redis_client"].setex.assert_called_once_with(
            f"document_{disabled_doc.id}_indexing",
            600,
            1,
        )
        patched_dependencies["add_task"].delay.assert_called_once_with(disabled_doc.id)

    def test_batch_update_large_document_list_performance(self, db_session_with_containers, patched_dependencies):
        """Handle large document lists with consistent updates and side effects."""
        # Arrange: 100 disabled documents in one dataset.
        dataset = DocumentBatchUpdateIntegrationDataFactory.create_dataset()
        user = DocumentBatchUpdateIntegrationDataFactory.create_user()
        document_ids = [str(uuid4()) for _ in range(100)]
        documents = DocumentBatchUpdateIntegrationDataFactory.create_multiple_documents(
            dataset=dataset,
            document_ids=document_ids,
            enabled=False,
        )
        # Act
        DocumentService.batch_update_document_status(
            dataset=dataset,
            document_ids=document_ids,
            action="enable",
            user=user,
        )
        # Assert: every document was enabled and produced exactly one guard
        # write and one indexing task.
        for document in documents:
            db.session.refresh(document)
            self._assert_document_enabled(document, FIXED_TIME)
        assert patched_dependencies["redis_client"].setex.call_count == len(document_ids)
        assert patched_dependencies["add_task"].delay.call_count == len(document_ids)
        expected_setex_calls = [call(f"document_{doc_id}_indexing", 600, 1) for doc_id in document_ids]
        expected_task_calls = [call(doc_id) for doc_id in document_ids]
        patched_dependencies["redis_client"].setex.assert_has_calls(expected_setex_calls)
        patched_dependencies["add_task"].delay.assert_has_calls(expected_task_calls)

    def test_batch_update_mixed_document_states_complex_scenario(
        self, db_session_with_containers, patched_dependencies
    ):
        """Process a complex mixed-state batch and update only eligible records."""
        # Arrange: one disabled (eligible), three enabled, one archived, one missing id.
        dataset = DocumentBatchUpdateIntegrationDataFactory.create_dataset()
        user = DocumentBatchUpdateIntegrationDataFactory.create_user()
        doc1 = DocumentBatchUpdateIntegrationDataFactory.create_document(dataset=dataset, enabled=False)
        doc2 = DocumentBatchUpdateIntegrationDataFactory.create_document(dataset=dataset, enabled=True, position=2)
        doc3 = DocumentBatchUpdateIntegrationDataFactory.create_document(dataset=dataset, enabled=True, position=3)
        doc4 = DocumentBatchUpdateIntegrationDataFactory.create_document(dataset=dataset, enabled=True, position=4)
        doc5 = DocumentBatchUpdateIntegrationDataFactory.create_document(
            dataset=dataset,
            enabled=True,
            archived=True,
            position=5,
        )
        missing_id = str(uuid4())
        document_ids = [doc1.id, doc2.id, doc3.id, doc4.id, doc5.id, missing_id]
        # Act
        DocumentService.batch_update_document_status(
            dataset=dataset,
            document_ids=document_ids,
            action="enable",
            user=user,
        )
        # Assert: only doc1 was actually toggled; the rest were no-ops.
        db.session.refresh(doc1)
        db.session.refresh(doc2)
        db.session.refresh(doc3)
        db.session.refresh(doc4)
        db.session.refresh(doc5)
        self._assert_document_enabled(doc1, FIXED_TIME)
        assert doc2.enabled is True
        assert doc3.enabled is True
        assert doc4.enabled is True
        assert doc5.enabled is True
        patched_dependencies["redis_client"].setex.assert_called_once_with(f"document_{doc1.id}_indexing", 600, 1)
        patched_dependencies["add_task"].delay.assert_called_once_with(doc1.id)

View File

@@ -1,13 +1,10 @@
import datetime
# Mock redis_client before importing dataset_service
from unittest.mock import Mock, call, patch
from unittest.mock import Mock, patch
import pytest
from models.dataset import Dataset, Document
from services.dataset_service import DocumentService
from services.errors.document import DocumentIndexingError
from tests.unit_tests.conftest import redis_mock
@@ -48,7 +45,6 @@ class DocumentBatchUpdateTestDataFactory:
document.indexing_status = indexing_status
document.completed_at = completed_at or datetime.datetime.now()
# Set default values for optional fields
document.disabled_at = None
document.disabled_by = None
document.archived_at = None
@@ -59,32 +55,9 @@ class DocumentBatchUpdateTestDataFactory:
setattr(document, key, value)
return document
@staticmethod
def create_multiple_documents(
document_ids: list[str], enabled: bool = True, archived: bool = False, indexing_status: str = "completed"
) -> list[Mock]:
"""Create multiple mock documents with specified attributes."""
documents = []
for doc_id in document_ids:
doc = DocumentBatchUpdateTestDataFactory.create_document_mock(
document_id=doc_id,
name=f"document_{doc_id}.pdf",
enabled=enabled,
archived=archived,
indexing_status=indexing_status,
)
documents.append(doc)
return documents
class TestDatasetServiceBatchUpdateDocumentStatus:
"""
Comprehensive unit tests for DocumentService.batch_update_document_status method.
This test suite covers all supported actions (enable, disable, archive, un_archive),
error conditions, edge cases, and validates proper interaction with Redis cache,
database operations, and async task triggers.
"""
"""Unit tests for non-SQL path in DocumentService.batch_update_document_status."""
@pytest.fixture
def mock_document_service_dependencies(self):
@@ -104,697 +77,24 @@ class TestDatasetServiceBatchUpdateDocumentStatus:
"current_time": current_time,
}
@pytest.fixture
def mock_async_task_dependencies(self):
"""Mock setup for async task dependencies."""
with (
patch("services.dataset_service.add_document_to_index_task") as mock_add_task,
patch("services.dataset_service.remove_document_from_index_task") as mock_remove_task,
):
yield {"add_task": mock_add_task, "remove_task": mock_remove_task}
def _assert_document_enabled(self, document: Mock, user_id: str, current_time: datetime.datetime):
"""Helper method to verify document was enabled correctly."""
assert document.enabled == True
assert document.disabled_at is None
assert document.disabled_by is None
assert document.updated_at == current_time
def _assert_document_disabled(self, document: Mock, user_id: str, current_time: datetime.datetime):
"""Helper method to verify document was disabled correctly."""
assert document.enabled == False
assert document.disabled_at == current_time
assert document.disabled_by == user_id
assert document.updated_at == current_time
def _assert_document_archived(self, document: Mock, user_id: str, current_time: datetime.datetime):
"""Helper method to verify document was archived correctly."""
assert document.archived == True
assert document.archived_at == current_time
assert document.archived_by == user_id
assert document.updated_at == current_time
def _assert_document_unarchived(self, document: Mock):
"""Helper method to verify document was unarchived correctly."""
assert document.archived == False
assert document.archived_at is None
assert document.archived_by is None
def _assert_redis_cache_operations(self, document_ids: list[str], action: str = "setex"):
"""Helper method to verify Redis cache operations."""
if action == "setex":
expected_calls = [call(f"document_{doc_id}_indexing", 600, 1) for doc_id in document_ids]
redis_mock.setex.assert_has_calls(expected_calls)
elif action == "get":
expected_calls = [call(f"document_{doc_id}_indexing") for doc_id in document_ids]
redis_mock.get.assert_has_calls(expected_calls)
def _assert_async_task_calls(self, mock_task, document_ids: list[str], task_type: str):
"""Helper method to verify async task calls."""
expected_calls = [call(doc_id) for doc_id in document_ids]
if task_type in {"add", "remove"}:
mock_task.delay.assert_has_calls(expected_calls)
# ==================== Enable Document Tests ====================
def test_batch_update_enable_documents_success(
self, mock_document_service_dependencies, mock_async_task_dependencies
):
"""Test successful enabling of disabled documents."""
dataset = DocumentBatchUpdateTestDataFactory.create_dataset_mock()
user = DocumentBatchUpdateTestDataFactory.create_user_mock()
# Create disabled documents
disabled_docs = DocumentBatchUpdateTestDataFactory.create_multiple_documents(["doc-1", "doc-2"], enabled=False)
mock_document_service_dependencies["get_document"].side_effect = disabled_docs
# Reset module-level Redis mock
redis_mock.reset_mock()
redis_mock.get.return_value = None
# Call the method to enable documents
DocumentService.batch_update_document_status(
dataset=dataset, document_ids=["doc-1", "doc-2"], action="enable", user=user
)
# Verify document attributes were updated correctly
for doc in disabled_docs:
self._assert_document_enabled(doc, user.id, mock_document_service_dependencies["current_time"])
# Verify Redis cache operations
self._assert_redis_cache_operations(["doc-1", "doc-2"], "get")
self._assert_redis_cache_operations(["doc-1", "doc-2"], "setex")
# Verify async tasks were triggered for indexing
self._assert_async_task_calls(mock_async_task_dependencies["add_task"], ["doc-1", "doc-2"], "add")
# Verify database operations
mock_db = mock_document_service_dependencies["db_session"]
assert mock_db.add.call_count == 2
assert mock_db.commit.call_count == 1
def test_batch_update_enable_already_enabled_document_skipped(self, mock_document_service_dependencies):
"""Test enabling documents that are already enabled."""
dataset = DocumentBatchUpdateTestDataFactory.create_dataset_mock()
user = DocumentBatchUpdateTestDataFactory.create_user_mock()
# Create already enabled document
enabled_doc = DocumentBatchUpdateTestDataFactory.create_document_mock(enabled=True)
mock_document_service_dependencies["get_document"].return_value = enabled_doc
# Reset module-level Redis mock
redis_mock.reset_mock()
redis_mock.get.return_value = None
# Attempt to enable already enabled document
DocumentService.batch_update_document_status(
dataset=dataset, document_ids=["doc-1"], action="enable", user=user
)
# Verify no database operations occurred (document was skipped)
mock_db = mock_document_service_dependencies["db_session"]
mock_db.commit.assert_not_called()
# Verify no Redis setex operations occurred (document was skipped)
redis_mock.setex.assert_not_called()
# ==================== Disable Document Tests ====================
def test_batch_update_disable_documents_success(
self, mock_document_service_dependencies, mock_async_task_dependencies
):
"""Test successful disabling of enabled and completed documents."""
dataset = DocumentBatchUpdateTestDataFactory.create_dataset_mock()
user = DocumentBatchUpdateTestDataFactory.create_user_mock()
# Create enabled documents
enabled_docs = DocumentBatchUpdateTestDataFactory.create_multiple_documents(["doc-1", "doc-2"], enabled=True)
mock_document_service_dependencies["get_document"].side_effect = enabled_docs
# Reset module-level Redis mock
redis_mock.reset_mock()
redis_mock.get.return_value = None
# Call the method to disable documents
DocumentService.batch_update_document_status(
dataset=dataset, document_ids=["doc-1", "doc-2"], action="disable", user=user
)
# Verify document attributes were updated correctly
for doc in enabled_docs:
self._assert_document_disabled(doc, user.id, mock_document_service_dependencies["current_time"])
# Verify Redis cache operations for indexing prevention
self._assert_redis_cache_operations(["doc-1", "doc-2"], "setex")
# Verify async tasks were triggered to remove from index
self._assert_async_task_calls(mock_async_task_dependencies["remove_task"], ["doc-1", "doc-2"], "remove")
# Verify database operations
mock_db = mock_document_service_dependencies["db_session"]
assert mock_db.add.call_count == 2
assert mock_db.commit.call_count == 1
def test_batch_update_disable_already_disabled_document_skipped(
self, mock_document_service_dependencies, mock_async_task_dependencies
):
"""Test disabling documents that are already disabled."""
dataset = DocumentBatchUpdateTestDataFactory.create_dataset_mock()
user = DocumentBatchUpdateTestDataFactory.create_user_mock()
# Create already disabled document
disabled_doc = DocumentBatchUpdateTestDataFactory.create_document_mock(enabled=False)
mock_document_service_dependencies["get_document"].return_value = disabled_doc
# Reset module-level Redis mock
redis_mock.reset_mock()
redis_mock.get.return_value = None
# Attempt to disable already disabled document
DocumentService.batch_update_document_status(
dataset=dataset, document_ids=["doc-1"], action="disable", user=user
)
# Verify no database operations occurred (document was skipped)
mock_db = mock_document_service_dependencies["db_session"]
mock_db.commit.assert_not_called()
# Verify no Redis setex operations occurred (document was skipped)
redis_mock.setex.assert_not_called()
# Verify no async tasks were triggered (document was skipped)
mock_async_task_dependencies["add_task"].delay.assert_not_called()
def test_batch_update_disable_non_completed_document_error(self, mock_document_service_dependencies):
"""Test that DocumentIndexingError is raised when trying to disable non-completed documents."""
dataset = DocumentBatchUpdateTestDataFactory.create_dataset_mock()
user = DocumentBatchUpdateTestDataFactory.create_user_mock()
# Create a document that's not completed
non_completed_doc = DocumentBatchUpdateTestDataFactory.create_document_mock(
enabled=True,
indexing_status="indexing", # Not completed
completed_at=None, # Not completed
)
mock_document_service_dependencies["get_document"].return_value = non_completed_doc
# Verify that DocumentIndexingError is raised
with pytest.raises(DocumentIndexingError) as exc_info:
DocumentService.batch_update_document_status(
dataset=dataset, document_ids=["doc-1"], action="disable", user=user
)
# Verify error message indicates document is not completed
assert "is not completed" in str(exc_info.value)
# ==================== Archive Document Tests ====================
def test_batch_update_archive_documents_success(
self, mock_document_service_dependencies, mock_async_task_dependencies
):
"""Test successful archiving of unarchived documents."""
dataset = DocumentBatchUpdateTestDataFactory.create_dataset_mock()
user = DocumentBatchUpdateTestDataFactory.create_user_mock()
# Create unarchived enabled document
unarchived_doc = DocumentBatchUpdateTestDataFactory.create_document_mock(enabled=True, archived=False)
mock_document_service_dependencies["get_document"].return_value = unarchived_doc
# Reset module-level Redis mock
redis_mock.reset_mock()
redis_mock.get.return_value = None
# Call the method to archive documents
DocumentService.batch_update_document_status(
dataset=dataset, document_ids=["doc-1"], action="archive", user=user
)
# Verify document attributes were updated correctly
self._assert_document_archived(unarchived_doc, user.id, mock_document_service_dependencies["current_time"])
# Verify Redis cache was set (because document was enabled)
redis_mock.setex.assert_called_once_with("document_doc-1_indexing", 600, 1)
# Verify async task was triggered to remove from index (because enabled)
mock_async_task_dependencies["remove_task"].delay.assert_called_once_with("doc-1")
# Verify database operations
mock_db = mock_document_service_dependencies["db_session"]
mock_db.add.assert_called_once()
mock_db.commit.assert_called_once()
def test_batch_update_archive_already_archived_document_skipped(self, mock_document_service_dependencies):
"""Test archiving documents that are already archived."""
dataset = DocumentBatchUpdateTestDataFactory.create_dataset_mock()
user = DocumentBatchUpdateTestDataFactory.create_user_mock()
# Create already archived document
archived_doc = DocumentBatchUpdateTestDataFactory.create_document_mock(enabled=True, archived=True)
mock_document_service_dependencies["get_document"].return_value = archived_doc
# Reset module-level Redis mock
redis_mock.reset_mock()
redis_mock.get.return_value = None
# Attempt to archive already archived document
DocumentService.batch_update_document_status(
dataset=dataset, document_ids=["doc-3"], action="archive", user=user
)
# Verify no database operations occurred (document was skipped)
mock_db = mock_document_service_dependencies["db_session"]
mock_db.commit.assert_not_called()
# Verify no Redis setex operations occurred (document was skipped)
redis_mock.setex.assert_not_called()
def test_batch_update_archive_disabled_document_no_index_removal(
self, mock_document_service_dependencies, mock_async_task_dependencies
):
"""Test archiving disabled documents (should not trigger index removal)."""
dataset = DocumentBatchUpdateTestDataFactory.create_dataset_mock()
user = DocumentBatchUpdateTestDataFactory.create_user_mock()
# Set up disabled, unarchived document
disabled_unarchived_doc = DocumentBatchUpdateTestDataFactory.create_document_mock(enabled=False, archived=False)
mock_document_service_dependencies["get_document"].return_value = disabled_unarchived_doc
# Reset module-level Redis mock
redis_mock.reset_mock()
redis_mock.get.return_value = None
# Archive the disabled document
DocumentService.batch_update_document_status(
dataset=dataset, document_ids=["doc-1"], action="archive", user=user
)
# Verify document was archived
self._assert_document_archived(
disabled_unarchived_doc, user.id, mock_document_service_dependencies["current_time"]
)
# Verify no Redis cache was set (document is disabled)
redis_mock.setex.assert_not_called()
# Verify no index removal task was triggered (document is disabled)
mock_async_task_dependencies["remove_task"].delay.assert_not_called()
# Verify database operations still occurred
mock_db = mock_document_service_dependencies["db_session"]
mock_db.add.assert_called_once()
mock_db.commit.assert_called_once()
# ==================== Unarchive Document Tests ====================
def test_batch_update_unarchive_documents_success(
self, mock_document_service_dependencies, mock_async_task_dependencies
):
"""Test successful unarchiving of archived documents."""
dataset = DocumentBatchUpdateTestDataFactory.create_dataset_mock()
user = DocumentBatchUpdateTestDataFactory.create_user_mock()
# Create mock archived document
archived_doc = DocumentBatchUpdateTestDataFactory.create_document_mock(enabled=True, archived=True)
mock_document_service_dependencies["get_document"].return_value = archived_doc
# Reset module-level Redis mock
redis_mock.reset_mock()
redis_mock.get.return_value = None
# Call the method to unarchive documents
DocumentService.batch_update_document_status(
dataset=dataset, document_ids=["doc-1"], action="un_archive", user=user
)
# Verify document attributes were updated correctly
self._assert_document_unarchived(archived_doc)
assert archived_doc.updated_at == mock_document_service_dependencies["current_time"]
# Verify Redis cache was set (because document is enabled)
redis_mock.setex.assert_called_once_with("document_doc-1_indexing", 600, 1)
# Verify async task was triggered to add back to index (because enabled)
mock_async_task_dependencies["add_task"].delay.assert_called_once_with("doc-1")
# Verify database operations
mock_db = mock_document_service_dependencies["db_session"]
mock_db.add.assert_called_once()
mock_db.commit.assert_called_once()
def test_batch_update_unarchive_already_unarchived_document_skipped(
self, mock_document_service_dependencies, mock_async_task_dependencies
):
"""Test unarchiving documents that are already unarchived."""
dataset = DocumentBatchUpdateTestDataFactory.create_dataset_mock()
user = DocumentBatchUpdateTestDataFactory.create_user_mock()
# Create already unarchived document
unarchived_doc = DocumentBatchUpdateTestDataFactory.create_document_mock(enabled=True, archived=False)
mock_document_service_dependencies["get_document"].return_value = unarchived_doc
# Reset module-level Redis mock
redis_mock.reset_mock()
redis_mock.get.return_value = None
# Attempt to unarchive already unarchived document
DocumentService.batch_update_document_status(
dataset=dataset, document_ids=["doc-1"], action="un_archive", user=user
)
# Verify no database operations occurred (document was skipped)
mock_db = mock_document_service_dependencies["db_session"]
mock_db.commit.assert_not_called()
# Verify no Redis setex operations occurred (document was skipped)
redis_mock.setex.assert_not_called()
# Verify no async tasks were triggered (document was skipped)
mock_async_task_dependencies["add_task"].delay.assert_not_called()
def test_batch_update_unarchive_disabled_document_no_index_addition(
self, mock_document_service_dependencies, mock_async_task_dependencies
):
"""Test unarchiving disabled documents (should not trigger index addition)."""
dataset = DocumentBatchUpdateTestDataFactory.create_dataset_mock()
user = DocumentBatchUpdateTestDataFactory.create_user_mock()
# Create mock archived but disabled document
archived_disabled_doc = DocumentBatchUpdateTestDataFactory.create_document_mock(enabled=False, archived=True)
mock_document_service_dependencies["get_document"].return_value = archived_disabled_doc
# Reset module-level Redis mock
redis_mock.reset_mock()
redis_mock.get.return_value = None
# Unarchive the disabled document
DocumentService.batch_update_document_status(
dataset=dataset, document_ids=["doc-1"], action="un_archive", user=user
)
# Verify document was unarchived
self._assert_document_unarchived(archived_disabled_doc)
assert archived_disabled_doc.updated_at == mock_document_service_dependencies["current_time"]
# Verify no Redis cache was set (document is disabled)
redis_mock.setex.assert_not_called()
# Verify no index addition task was triggered (document is disabled)
mock_async_task_dependencies["add_task"].delay.assert_not_called()
# Verify database operations still occurred
mock_db = mock_document_service_dependencies["db_session"]
mock_db.add.assert_called_once()
mock_db.commit.assert_called_once()
# ==================== Error Handling Tests ====================
def test_batch_update_document_indexing_error_redis_cache_hit(self, mock_document_service_dependencies):
"""Test that DocumentIndexingError is raised when documents are currently being indexed."""
dataset = DocumentBatchUpdateTestDataFactory.create_dataset_mock()
user = DocumentBatchUpdateTestDataFactory.create_user_mock()
# Create mock enabled document
enabled_doc = DocumentBatchUpdateTestDataFactory.create_document_mock(enabled=True)
mock_document_service_dependencies["get_document"].return_value = enabled_doc
# Set up mock to indicate document is being indexed
redis_mock.reset_mock()
redis_mock.get.return_value = "indexing"
# Verify that DocumentIndexingError is raised
with pytest.raises(DocumentIndexingError) as exc_info:
DocumentService.batch_update_document_status(
dataset=dataset, document_ids=["doc-1"], action="enable", user=user
)
# Verify error message contains document name
assert "test_document.pdf" in str(exc_info.value)
assert "is being indexed" in str(exc_info.value)
# Verify Redis cache was checked
redis_mock.get.assert_called_once_with("document_doc-1_indexing")
def test_batch_update_invalid_action_error(self, mock_document_service_dependencies):
"""Test that ValueError is raised when an invalid action is provided."""
dataset = DocumentBatchUpdateTestDataFactory.create_dataset_mock()
user = DocumentBatchUpdateTestDataFactory.create_user_mock()
# Create mock document
doc = DocumentBatchUpdateTestDataFactory.create_document_mock(enabled=True)
mock_document_service_dependencies["get_document"].return_value = doc
# Reset module-level Redis mock
redis_mock.reset_mock()
redis_mock.get.return_value = None
# Test with invalid action
invalid_action = "invalid_action"
with pytest.raises(ValueError) as exc_info:
DocumentService.batch_update_document_status(
dataset=dataset, document_ids=["doc-1"], action=invalid_action, user=user
)
# Verify error message contains the invalid action
assert invalid_action in str(exc_info.value)
assert "Invalid action" in str(exc_info.value)
# Verify no Redis operations occurred
redis_mock.setex.assert_not_called()
def test_batch_update_async_task_error_handling(
self, mock_document_service_dependencies, mock_async_task_dependencies
):
"""Test handling of async task errors during batch operations."""
dataset = DocumentBatchUpdateTestDataFactory.create_dataset_mock()
user = DocumentBatchUpdateTestDataFactory.create_user_mock()
# Create mock disabled document
disabled_doc = DocumentBatchUpdateTestDataFactory.create_document_mock(enabled=False)
mock_document_service_dependencies["get_document"].return_value = disabled_doc
# Mock async task to raise an exception
mock_async_task_dependencies["add_task"].delay.side_effect = Exception("Celery task error")
# Reset module-level Redis mock
redis_mock.reset_mock()
redis_mock.get.return_value = None
# Verify that async task error is propagated
with pytest.raises(Exception) as exc_info:
DocumentService.batch_update_document_status(
dataset=dataset, document_ids=["doc-1"], action="enable", user=user
)
# Verify error message
assert "Celery task error" in str(exc_info.value)
# Verify database operations completed successfully
mock_db = mock_document_service_dependencies["db_session"]
mock_db.add.assert_called_once()
mock_db.commit.assert_called_once()
# Verify Redis cache was set successfully
redis_mock.setex.assert_called_once_with("document_doc-1_indexing", 600, 1)
# Verify document was updated
self._assert_document_enabled(disabled_doc, user.id, mock_document_service_dependencies["current_time"])
# ==================== Edge Case Tests ====================
def test_batch_update_empty_document_list(self, mock_document_service_dependencies):
"""Test batch operations with an empty document ID list."""
dataset = DocumentBatchUpdateTestDataFactory.create_dataset_mock()
user = DocumentBatchUpdateTestDataFactory.create_user_mock()
# Call method with empty document list
result = DocumentService.batch_update_document_status(
dataset=dataset, document_ids=[], action="enable", user=user
)
# Verify no document lookups were performed
mock_document_service_dependencies["get_document"].assert_not_called()
# Verify method returns None (early return)
assert result is None
def test_batch_update_document_not_found_skipped(self, mock_document_service_dependencies):
"""Test behavior when some documents don't exist in the database."""
dataset = DocumentBatchUpdateTestDataFactory.create_dataset_mock()
user = DocumentBatchUpdateTestDataFactory.create_user_mock()
# Mock document service to return None (document not found)
mock_document_service_dependencies["get_document"].return_value = None
# Call method with non-existent document ID
# This should not raise an error, just skip the missing document
try:
DocumentService.batch_update_document_status(
dataset=dataset, document_ids=["non-existent-doc"], action="enable", user=user
)
except Exception as e:
pytest.fail(f"Method should not raise exception for missing documents: {e}")
# Verify document lookup was attempted
mock_document_service_dependencies["get_document"].assert_called_once_with(dataset.id, "non-existent-doc")
def test_batch_update_mixed_document_states_and_actions(
self, mock_document_service_dependencies, mock_async_task_dependencies
):
"""Test batch operations on documents with mixed states and various scenarios."""
dataset = DocumentBatchUpdateTestDataFactory.create_dataset_mock()
user = DocumentBatchUpdateTestDataFactory.create_user_mock()
# Create documents in various states
disabled_doc = DocumentBatchUpdateTestDataFactory.create_document_mock("doc-1", enabled=False)
enabled_doc = DocumentBatchUpdateTestDataFactory.create_document_mock("doc-2", enabled=True)
archived_doc = DocumentBatchUpdateTestDataFactory.create_document_mock("doc-3", enabled=True, archived=True)
# Mix of different document states
documents = [disabled_doc, enabled_doc, archived_doc]
mock_document_service_dependencies["get_document"].side_effect = documents
# Reset module-level Redis mock
redis_mock.reset_mock()
redis_mock.get.return_value = None
# Perform enable operation on mixed state documents
DocumentService.batch_update_document_status(
dataset=dataset, document_ids=["doc-1", "doc-2", "doc-3"], action="enable", user=user
)
# Verify only the disabled document was processed
# (enabled and archived documents should be skipped for enable action)
# Only one add should occur (for the disabled document that was enabled)
mock_db = mock_document_service_dependencies["db_session"]
mock_db.add.assert_called_once()
# Only one commit should occur
mock_db.commit.assert_called_once()
# Only one Redis setex should occur (for the document that was enabled)
redis_mock.setex.assert_called_once_with("document_doc-1_indexing", 600, 1)
# Only one async task should be triggered (for the document that was enabled)
mock_async_task_dependencies["add_task"].delay.assert_called_once_with("doc-1")
# ==================== Performance Tests ====================
def test_batch_update_large_document_list_performance(
self, mock_document_service_dependencies, mock_async_task_dependencies
):
"""Test batch operations with a large number of documents."""
dataset = DocumentBatchUpdateTestDataFactory.create_dataset_mock()
user = DocumentBatchUpdateTestDataFactory.create_user_mock()
# Create large list of document IDs
document_ids = [f"doc-{i}" for i in range(1, 101)] # 100 documents
# Create mock documents
mock_documents = DocumentBatchUpdateTestDataFactory.create_multiple_documents(
document_ids,
enabled=False, # All disabled, will be enabled
)
mock_document_service_dependencies["get_document"].side_effect = mock_documents
# Reset module-level Redis mock
redis_mock.reset_mock()
redis_mock.get.return_value = None
# Perform batch enable operation
DocumentService.batch_update_document_status(
dataset=dataset, document_ids=document_ids, action="enable", user=user
)
# Verify all documents were processed
assert mock_document_service_dependencies["get_document"].call_count == 100
# Verify all documents were updated
for mock_doc in mock_documents:
self._assert_document_enabled(mock_doc, user.id, mock_document_service_dependencies["current_time"])
# Verify database operations
mock_db = mock_document_service_dependencies["db_session"]
assert mock_db.add.call_count == 100
assert mock_db.commit.call_count == 1
# Verify Redis cache operations occurred for each document
assert redis_mock.setex.call_count == 100
# Verify async tasks were triggered for each document
assert mock_async_task_dependencies["add_task"].delay.call_count == 100
# Verify correct Redis cache keys were set
expected_redis_calls = [call(f"document_doc-{i}_indexing", 600, 1) for i in range(1, 101)]
redis_mock.setex.assert_has_calls(expected_redis_calls)
# Verify correct async task calls
expected_task_calls = [call(f"doc-{i}") for i in range(1, 101)]
mock_async_task_dependencies["add_task"].delay.assert_has_calls(expected_task_calls)
def test_batch_update_mixed_document_states_complex_scenario(
self, mock_document_service_dependencies, mock_async_task_dependencies
):
"""Test complex batch operations with documents in various states."""
dataset = DocumentBatchUpdateTestDataFactory.create_dataset_mock()
user = DocumentBatchUpdateTestDataFactory.create_user_mock()
# Create documents in various states
doc1 = DocumentBatchUpdateTestDataFactory.create_document_mock("doc-1", enabled=False) # Will be enabled
doc2 = DocumentBatchUpdateTestDataFactory.create_document_mock(
"doc-2", enabled=True
) # Already enabled, will be skipped
doc3 = DocumentBatchUpdateTestDataFactory.create_document_mock(
"doc-3", enabled=True
) # Already enabled, will be skipped
doc4 = DocumentBatchUpdateTestDataFactory.create_document_mock(
"doc-4", enabled=True
) # Not affected by enable action
doc5 = DocumentBatchUpdateTestDataFactory.create_document_mock(
"doc-5", enabled=True, archived=True
) # Not affected by enable action
doc6 = None # Non-existent, will be skipped
mock_document_service_dependencies["get_document"].side_effect = [doc1, doc2, doc3, doc4, doc5, doc6]
# Reset module-level Redis mock
redis_mock.reset_mock()
redis_mock.get.return_value = None
# Perform mixed batch operations
DocumentService.batch_update_document_status(
dataset=dataset,
document_ids=["doc-1", "doc-2", "doc-3", "doc-4", "doc-5", "doc-6"],
action="enable", # This will only affect doc1
user=user,
)
# Verify document 1 was enabled
self._assert_document_enabled(doc1, user.id, mock_document_service_dependencies["current_time"])
# Verify other documents were skipped appropriately
assert doc2.enabled == True # No change
assert doc3.enabled == True # No change
assert doc4.enabled == True # No change
assert doc5.enabled == True # No change
# Verify database commits occurred for processed documents
# Only doc1 should be added (others were skipped, doc6 doesn't exist)
mock_db = mock_document_service_dependencies["db_session"]
assert mock_db.add.call_count == 1
assert mock_db.commit.call_count == 1
# Verify Redis cache operations occurred for processed documents
# Only doc1 should have Redis operations
assert redis_mock.setex.call_count == 1
# Verify async tasks were triggered for processed documents
# Only doc1 should trigger tasks
assert mock_async_task_dependencies["add_task"].delay.call_count == 1
# Verify correct Redis cache keys were set
expected_redis_calls = [call("document_doc-1_indexing", 600, 1)]
redis_mock.setex.assert_has_calls(expected_redis_calls)
# Verify correct async task calls
expected_task_calls = [call("doc-1")]
mock_async_task_dependencies["add_task"].delay.assert_has_calls(expected_task_calls)