test: migrate duplicate and vector index task integration tests to SQLAlchemy 2.0 APIs (#35292)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
bohdansolovie 2026-04-16 00:36:33 -04:00 committed by GitHub
parent 731414a44f
commit 9fa50774b4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 52 additions and 30 deletions

View File

@ -11,6 +11,7 @@ from unittest.mock import ANY, Mock, patch
import pytest
from faker import Faker
from sqlalchemy import select
from core.rag.index_processor.constant.index_type import IndexStructureType
from models.dataset import Dataset, Document, DocumentSegment
@ -221,7 +222,9 @@ class TestDealDatasetVectorIndexTask:
deal_dataset_vector_index_task(dataset.id, "add")
# Verify document status was updated to indexing then completed
updated_document = db_session_with_containers.query(Document).filter_by(id=document.id).first()
updated_document = db_session_with_containers.scalar(
select(Document).where(Document.id == document.id).limit(1)
)
assert updated_document.indexing_status == IndexingStatus.COMPLETED
# Verify index processor load method was called
@ -322,7 +325,9 @@ class TestDealDatasetVectorIndexTask:
deal_dataset_vector_index_task(dataset.id, "update")
# Verify document status was updated to indexing then completed
updated_document = db_session_with_containers.query(Document).filter_by(id=document.id).first()
updated_document = db_session_with_containers.scalar(
select(Document).where(Document.id == document.id).limit(1)
)
assert updated_document.indexing_status == IndexingStatus.COMPLETED
# Verify index processor clean and load methods were called
@ -431,7 +436,9 @@ class TestDealDatasetVectorIndexTask:
deal_dataset_vector_index_task(dataset.id, "add")
# Verify document status was updated to indexing then completed
updated_document = db_session_with_containers.query(Document).filter_by(id=document.id).first()
updated_document = db_session_with_containers.scalar(
select(Document).where(Document.id == document.id).limit(1)
)
assert updated_document.indexing_status == IndexingStatus.COMPLETED
# Verify that no index processor load was called since no segments exist
@ -564,7 +571,9 @@ class TestDealDatasetVectorIndexTask:
deal_dataset_vector_index_task(dataset.id, "add")
# Verify document status was updated to error
updated_document = db_session_with_containers.query(Document).filter_by(id=document.id).first()
updated_document = db_session_with_containers.scalar(
select(Document).where(Document.id == document.id).limit(1)
)
assert updated_document.indexing_status == IndexingStatus.ERROR
assert "Test exception during indexing" in updated_document.error
@ -635,7 +644,9 @@ class TestDealDatasetVectorIndexTask:
deal_dataset_vector_index_task(dataset.id, "add")
# Verify document status was updated to indexing then completed
updated_document = db_session_with_containers.query(Document).filter_by(id=document.id).first()
updated_document = db_session_with_containers.scalar(
select(Document).where(Document.id == document.id).limit(1)
)
assert updated_document.indexing_status == IndexingStatus.COMPLETED
# Verify index processor was initialized with custom index type
@ -711,7 +722,9 @@ class TestDealDatasetVectorIndexTask:
deal_dataset_vector_index_task(dataset.id, "add")
# Verify document status was updated to indexing then completed
updated_document = db_session_with_containers.query(Document).filter_by(id=document.id).first()
updated_document = db_session_with_containers.scalar(
select(Document).where(Document.id == document.id).limit(1)
)
assert updated_document.indexing_status == IndexingStatus.COMPLETED
# Verify index processor was initialized with the document's index type
@ -815,7 +828,9 @@ class TestDealDatasetVectorIndexTask:
# Verify all documents were processed
for document in documents:
updated_document = db_session_with_containers.query(Document).filter_by(id=document.id).first()
updated_document = db_session_with_containers.scalar(
select(Document).where(Document.id == document.id).limit(1)
)
assert updated_document.indexing_status == IndexingStatus.COMPLETED
# Verify index processor load was called multiple times
@ -917,7 +932,9 @@ class TestDealDatasetVectorIndexTask:
deal_dataset_vector_index_task(dataset.id, "add")
# Verify final document status
updated_document = db_session_with_containers.query(Document).filter_by(id=document.id).first()
updated_document = db_session_with_containers.scalar(
select(Document).where(Document.id == document.id).limit(1)
)
assert updated_document.indexing_status == IndexingStatus.COMPLETED
def test_deal_dataset_vector_index_task_with_disabled_documents(
@ -1027,12 +1044,14 @@ class TestDealDatasetVectorIndexTask:
deal_dataset_vector_index_task(dataset.id, "add")
# Verify only enabled document was processed
updated_enabled_document = db_session_with_containers.query(Document).filter_by(id=enabled_document.id).first()
updated_enabled_document = db_session_with_containers.scalar(
select(Document).where(Document.id == enabled_document.id).limit(1)
)
assert updated_enabled_document.indexing_status == IndexingStatus.COMPLETED
# Verify disabled document status remains unchanged
updated_disabled_document = (
db_session_with_containers.query(Document).filter_by(id=disabled_document.id).first()
updated_disabled_document = db_session_with_containers.scalar(
select(Document).where(Document.id == disabled_document.id).limit(1)
)
assert updated_disabled_document.indexing_status == IndexingStatus.COMPLETED # Should not change
@ -1148,12 +1167,14 @@ class TestDealDatasetVectorIndexTask:
deal_dataset_vector_index_task(dataset.id, "add")
# Verify only active document was processed
updated_active_document = db_session_with_containers.query(Document).filter_by(id=active_document.id).first()
updated_active_document = db_session_with_containers.scalar(
select(Document).where(Document.id == active_document.id).limit(1)
)
assert updated_active_document.indexing_status == IndexingStatus.COMPLETED
# Verify archived document status remains unchanged
updated_archived_document = (
db_session_with_containers.query(Document).filter_by(id=archived_document.id).first()
updated_archived_document = db_session_with_containers.scalar(
select(Document).where(Document.id == archived_document.id).limit(1)
)
assert updated_archived_document.indexing_status == IndexingStatus.COMPLETED # Should not change
@ -1269,14 +1290,14 @@ class TestDealDatasetVectorIndexTask:
deal_dataset_vector_index_task(dataset.id, "add")
# Verify only completed document was processed
updated_completed_document = (
db_session_with_containers.query(Document).filter_by(id=completed_document.id).first()
updated_completed_document = db_session_with_containers.scalar(
select(Document).where(Document.id == completed_document.id).limit(1)
)
assert updated_completed_document.indexing_status == IndexingStatus.COMPLETED
# Verify incomplete document status remains unchanged
updated_incomplete_document = (
db_session_with_containers.query(Document).filter_by(id=incomplete_document.id).first()
updated_incomplete_document = db_session_with_containers.scalar(
select(Document).where(Document.id == incomplete_document.id).limit(1)
)
assert updated_incomplete_document.indexing_status == IndexingStatus.INDEXING # Should not change

View File

@ -2,6 +2,7 @@ from unittest.mock import MagicMock, patch
import pytest
from faker import Faker
from sqlalchemy import select
from core.indexing_runner import DocumentIsPausedError
from core.rag.index_processor.constant.index_type import IndexStructureType, IndexTechniqueType
@ -317,7 +318,7 @@ class TestDuplicateDocumentIndexingTasks:
# Verify documents were updated to parsing status
# Re-query documents from database since _duplicate_document_indexing_task uses a different session
for doc_id in document_ids:
updated_document = db_session_with_containers.query(Document).where(Document.id == doc_id).first()
updated_document = db_session_with_containers.scalar(select(Document).where(Document.id == doc_id).limit(1))
assert updated_document.indexing_status == IndexingStatus.PARSING
assert updated_document.processing_started_at is not None
@ -362,14 +363,14 @@ class TestDuplicateDocumentIndexingTasks:
# Verify segments were deleted from database
# Re-query segments from database using captured IDs to avoid stale ORM instances
for seg_id in segment_ids:
deleted_segment = (
db_session_with_containers.query(DocumentSegment).where(DocumentSegment.id == seg_id).first()
deleted_segment = db_session_with_containers.scalar(
select(DocumentSegment).where(DocumentSegment.id == seg_id).limit(1)
)
assert deleted_segment is None
# Verify documents were updated to parsing status
for doc_id in document_ids:
updated_document = db_session_with_containers.query(Document).where(Document.id == doc_id).first()
updated_document = db_session_with_containers.scalar(select(Document).where(Document.id == doc_id).limit(1))
assert updated_document.indexing_status == IndexingStatus.PARSING
assert updated_document.processing_started_at is not None
@ -438,7 +439,7 @@ class TestDuplicateDocumentIndexingTasks:
# Verify only existing documents were updated
# Re-query documents from database since _duplicate_document_indexing_task uses a different session
for doc_id in existing_document_ids:
updated_document = db_session_with_containers.query(Document).where(Document.id == doc_id).first()
updated_document = db_session_with_containers.scalar(select(Document).where(Document.id == doc_id).limit(1))
assert updated_document.indexing_status == IndexingStatus.PARSING
assert updated_document.processing_started_at is not None
@ -485,7 +486,7 @@ class TestDuplicateDocumentIndexingTasks:
# Verify documents were still updated to parsing status before the exception
# Re-query documents from database since _duplicate_document_indexing_task closes the session
for doc_id in document_ids:
updated_document = db_session_with_containers.query(Document).where(Document.id == doc_id).first()
updated_document = db_session_with_containers.scalar(select(Document).where(Document.id == doc_id).limit(1))
assert updated_document.indexing_status == IndexingStatus.PARSING
assert updated_document.processing_started_at is not None
@ -543,7 +544,7 @@ class TestDuplicateDocumentIndexingTasks:
# Assert: Verify error handling
# Re-query documents from database since _duplicate_document_indexing_task uses a different session
for doc_id in document_ids:
updated_document = db_session_with_containers.query(Document).where(Document.id == doc_id).first()
updated_document = db_session_with_containers.scalar(select(Document).where(Document.id == doc_id).limit(1))
assert updated_document.indexing_status == IndexingStatus.ERROR
assert updated_document.error is not None
assert "batch upload" in updated_document.error.lower()
@ -585,7 +586,7 @@ class TestDuplicateDocumentIndexingTasks:
# Assert: Verify error handling
# Re-query documents from database since _duplicate_document_indexing_task uses a different session
for doc_id in document_ids:
updated_document = db_session_with_containers.query(Document).where(Document.id == doc_id).first()
updated_document = db_session_with_containers.scalar(select(Document).where(Document.id == doc_id).limit(1))
assert updated_document.indexing_status == IndexingStatus.ERROR
assert updated_document.error is not None
assert "limit" in updated_document.error.lower()
@ -649,7 +650,7 @@ class TestDuplicateDocumentIndexingTasks:
# Verify documents were processed
for doc_id in document_ids:
updated_document = db_session_with_containers.query(Document).where(Document.id == doc_id).first()
updated_document = db_session_with_containers.scalar(select(Document).where(Document.id == doc_id).limit(1))
assert updated_document.indexing_status == IndexingStatus.PARSING
@patch("tasks.duplicate_document_indexing_task.TenantIsolatedTaskQueue", autospec=True)
@ -692,7 +693,7 @@ class TestDuplicateDocumentIndexingTasks:
# Verify documents were processed
for doc_id in document_ids:
updated_document = db_session_with_containers.query(Document).where(Document.id == doc_id).first()
updated_document = db_session_with_containers.scalar(select(Document).where(Document.id == doc_id).limit(1))
assert updated_document.indexing_status == IndexingStatus.PARSING
@patch("tasks.duplicate_document_indexing_task.TenantIsolatedTaskQueue", autospec=True)
@ -736,7 +737,7 @@ class TestDuplicateDocumentIndexingTasks:
# Verify documents were processed
for doc_id in document_ids:
updated_document = db_session_with_containers.query(Document).where(Document.id == doc_id).first()
updated_document = db_session_with_containers.scalar(select(Document).where(Document.id == doc_id).limit(1))
assert updated_document.indexing_status == IndexingStatus.PARSING
@patch("tasks.duplicate_document_indexing_task.TenantIsolatedTaskQueue", autospec=True)
@ -851,7 +852,7 @@ class TestDuplicateDocumentIndexingTasks:
# Assert
for doc_id in document_ids:
updated_document = db_session_with_containers.query(Document).where(Document.id == doc_id).first()
updated_document = db_session_with_containers.scalar(select(Document).where(Document.id == doc_id).limit(1))
assert updated_document.is_paused is True
assert updated_document.indexing_status == IndexingStatus.PARSING
assert updated_document.display_status == "paused"