From 9fa50774b431fb6e6837532ea53fbf7228170f44 Mon Sep 17 00:00:00 2001 From: bohdansolovie <153934212+bohdansolovie@users.noreply.github.com> Date: Thu, 16 Apr 2026 00:36:33 -0400 Subject: [PATCH] test: migrate duplicate and vector index task integration tests to SQLAlchemy 2.0 APIs (#35292) Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- .../test_deal_dataset_vector_index_task.py | 57 +++++++++++++------ .../test_duplicate_document_indexing_task.py | 25 ++++---- 2 files changed, 52 insertions(+), 30 deletions(-) diff --git a/api/tests/test_containers_integration_tests/tasks/test_deal_dataset_vector_index_task.py b/api/tests/test_containers_integration_tests/tasks/test_deal_dataset_vector_index_task.py index d457b59d58..48fec441c5 100644 --- a/api/tests/test_containers_integration_tests/tasks/test_deal_dataset_vector_index_task.py +++ b/api/tests/test_containers_integration_tests/tasks/test_deal_dataset_vector_index_task.py @@ -11,6 +11,7 @@ from unittest.mock import ANY, Mock, patch import pytest from faker import Faker +from sqlalchemy import select from core.rag.index_processor.constant.index_type import IndexStructureType from models.dataset import Dataset, Document, DocumentSegment @@ -221,7 +222,9 @@ class TestDealDatasetVectorIndexTask: deal_dataset_vector_index_task(dataset.id, "add") # Verify document status was updated to indexing then completed - updated_document = db_session_with_containers.query(Document).filter_by(id=document.id).first() + updated_document = db_session_with_containers.scalar( + select(Document).where(Document.id == document.id).limit(1) + ) assert updated_document.indexing_status == IndexingStatus.COMPLETED # Verify index processor load method was called @@ -322,7 +325,9 @@ class TestDealDatasetVectorIndexTask: deal_dataset_vector_index_task(dataset.id, "update") # Verify document status was updated to indexing then completed - updated_document = db_session_with_containers.query(Document).filter_by(id=document.id).first() + updated_document = db_session_with_containers.scalar( + select(Document).where(Document.id == document.id).limit(1) + ) assert updated_document.indexing_status == IndexingStatus.COMPLETED # Verify index processor clean and load methods were called @@ -431,7 +436,9 @@ class TestDealDatasetVectorIndexTask: deal_dataset_vector_index_task(dataset.id, "add") # Verify document status was updated to indexing then completed - updated_document = db_session_with_containers.query(Document).filter_by(id=document.id).first() + updated_document = db_session_with_containers.scalar( + select(Document).where(Document.id == document.id).limit(1) + ) assert updated_document.indexing_status == IndexingStatus.COMPLETED # Verify that no index processor load was called since no segments exist @@ -564,7 +571,9 @@ class TestDealDatasetVectorIndexTask: deal_dataset_vector_index_task(dataset.id, "add") # Verify document status was updated to error - updated_document = db_session_with_containers.query(Document).filter_by(id=document.id).first() + updated_document = db_session_with_containers.scalar( + select(Document).where(Document.id == document.id).limit(1) + ) assert updated_document.indexing_status == IndexingStatus.ERROR assert "Test exception during indexing" in updated_document.error @@ -635,7 +644,9 @@ class TestDealDatasetVectorIndexTask: deal_dataset_vector_index_task(dataset.id, "add") # Verify document status was updated to indexing then completed - updated_document = db_session_with_containers.query(Document).filter_by(id=document.id).first() + updated_document = db_session_with_containers.scalar( + select(Document).where(Document.id == document.id).limit(1) + ) assert updated_document.indexing_status == IndexingStatus.COMPLETED # Verify index processor was initialized with custom index type @@ -711,7 +722,9 @@ class TestDealDatasetVectorIndexTask: deal_dataset_vector_index_task(dataset.id, "add") # Verify document status was updated to indexing then completed - updated_document = db_session_with_containers.query(Document).filter_by(id=document.id).first() + updated_document = db_session_with_containers.scalar( + select(Document).where(Document.id == document.id).limit(1) + ) assert updated_document.indexing_status == IndexingStatus.COMPLETED # Verify index processor was initialized with the document's index type @@ -815,7 +828,9 @@ class TestDealDatasetVectorIndexTask: # Verify all documents were processed for document in documents: - updated_document = db_session_with_containers.query(Document).filter_by(id=document.id).first() + updated_document = db_session_with_containers.scalar( + select(Document).where(Document.id == document.id).limit(1) + ) assert updated_document.indexing_status == IndexingStatus.COMPLETED # Verify index processor load was called multiple times @@ -917,7 +932,9 @@ class TestDealDatasetVectorIndexTask: deal_dataset_vector_index_task(dataset.id, "add") # Verify final document status - updated_document = db_session_with_containers.query(Document).filter_by(id=document.id).first() + updated_document = db_session_with_containers.scalar( + select(Document).where(Document.id == document.id).limit(1) + ) assert updated_document.indexing_status == IndexingStatus.COMPLETED def test_deal_dataset_vector_index_task_with_disabled_documents( @@ -1027,12 +1044,14 @@ class TestDealDatasetVectorIndexTask: deal_dataset_vector_index_task(dataset.id, "add") # Verify only enabled document was processed - updated_enabled_document = db_session_with_containers.query(Document).filter_by(id=enabled_document.id).first() + updated_enabled_document = db_session_with_containers.scalar( + select(Document).where(Document.id == enabled_document.id).limit(1) + ) assert updated_enabled_document.indexing_status == IndexingStatus.COMPLETED # Verify disabled document status remains unchanged - updated_disabled_document = ( - db_session_with_containers.query(Document).filter_by(id=disabled_document.id).first() + updated_disabled_document = db_session_with_containers.scalar( + select(Document).where(Document.id == disabled_document.id).limit(1) ) assert updated_disabled_document.indexing_status == IndexingStatus.COMPLETED # Should not change @@ -1148,12 +1167,14 @@ class TestDealDatasetVectorIndexTask: deal_dataset_vector_index_task(dataset.id, "add") # Verify only active document was processed - updated_active_document = db_session_with_containers.query(Document).filter_by(id=active_document.id).first() + updated_active_document = db_session_with_containers.scalar( + select(Document).where(Document.id == active_document.id).limit(1) + ) assert updated_active_document.indexing_status == IndexingStatus.COMPLETED # Verify archived document status remains unchanged - updated_archived_document = ( - db_session_with_containers.query(Document).filter_by(id=archived_document.id).first() + updated_archived_document = db_session_with_containers.scalar( + select(Document).where(Document.id == archived_document.id).limit(1) ) assert updated_archived_document.indexing_status == IndexingStatus.COMPLETED # Should not change @@ -1269,14 +1290,14 @@ class TestDealDatasetVectorIndexTask: deal_dataset_vector_index_task(dataset.id, "add") # Verify only completed document was processed - updated_completed_document = ( - db_session_with_containers.query(Document).filter_by(id=completed_document.id).first() + updated_completed_document = db_session_with_containers.scalar( + select(Document).where(Document.id == completed_document.id).limit(1) ) assert updated_completed_document.indexing_status == IndexingStatus.COMPLETED # Verify incomplete document status remains unchanged - updated_incomplete_document = ( - db_session_with_containers.query(Document).filter_by(id=incomplete_document.id).first() + updated_incomplete_document = db_session_with_containers.scalar( + select(Document).where(Document.id == incomplete_document.id).limit(1) ) assert updated_incomplete_document.indexing_status == IndexingStatus.INDEXING # Should not change diff --git a/api/tests/test_containers_integration_tests/tasks/test_duplicate_document_indexing_task.py b/api/tests/test_containers_integration_tests/tasks/test_duplicate_document_indexing_task.py index 6a8e186958..39c58987fd 100644 --- a/api/tests/test_containers_integration_tests/tasks/test_duplicate_document_indexing_task.py +++ b/api/tests/test_containers_integration_tests/tasks/test_duplicate_document_indexing_task.py @@ -2,6 +2,7 @@ from unittest.mock import MagicMock, patch import pytest from faker import Faker +from sqlalchemy import select from core.indexing_runner import DocumentIsPausedError from core.rag.index_processor.constant.index_type import IndexStructureType, IndexTechniqueType @@ -317,7 +318,7 @@ class TestDuplicateDocumentIndexingTasks: # Verify documents were updated to parsing status # Re-query documents from database since _duplicate_document_indexing_task uses a different session for doc_id in document_ids: - updated_document = db_session_with_containers.query(Document).where(Document.id == doc_id).first() + updated_document = db_session_with_containers.scalar(select(Document).where(Document.id == doc_id).limit(1)) assert updated_document.indexing_status == IndexingStatus.PARSING assert updated_document.processing_started_at is not None @@ -362,14 +363,14 @@ class TestDuplicateDocumentIndexingTasks: # Verify segments were deleted from database # Re-query segments from database using captured IDs to avoid stale ORM instances for seg_id in segment_ids: - deleted_segment = ( - db_session_with_containers.query(DocumentSegment).where(DocumentSegment.id == seg_id).first() + deleted_segment = db_session_with_containers.scalar( + select(DocumentSegment).where(DocumentSegment.id == seg_id).limit(1) ) assert deleted_segment is None # Verify documents were updated to parsing status for doc_id in document_ids: - updated_document = db_session_with_containers.query(Document).where(Document.id == doc_id).first() + updated_document = db_session_with_containers.scalar(select(Document).where(Document.id == doc_id).limit(1)) assert updated_document.indexing_status == IndexingStatus.PARSING assert updated_document.processing_started_at is not None @@ -438,7 +439,7 @@ class TestDuplicateDocumentIndexingTasks: # Verify only existing documents were updated # Re-query documents from database since _duplicate_document_indexing_task uses a different session for doc_id in existing_document_ids: - updated_document = db_session_with_containers.query(Document).where(Document.id == doc_id).first() + updated_document = db_session_with_containers.scalar(select(Document).where(Document.id == doc_id).limit(1)) assert updated_document.indexing_status == IndexingStatus.PARSING assert updated_document.processing_started_at is not None @@ -485,7 +486,7 @@ class TestDuplicateDocumentIndexingTasks: # Verify documents were still updated to parsing status before the exception # Re-query documents from database since _duplicate_document_indexing_task close the session for doc_id in document_ids: - updated_document = db_session_with_containers.query(Document).where(Document.id == doc_id).first() + updated_document = db_session_with_containers.scalar(select(Document).where(Document.id == doc_id).limit(1)) assert updated_document.indexing_status == IndexingStatus.PARSING assert updated_document.processing_started_at is not None @@ -543,7 +544,7 @@ class TestDuplicateDocumentIndexingTasks: # Assert: Verify error handling # Re-query documents from database since _duplicate_document_indexing_task uses a different session for doc_id in document_ids: - updated_document = db_session_with_containers.query(Document).where(Document.id == doc_id).first() + updated_document = db_session_with_containers.scalar(select(Document).where(Document.id == doc_id).limit(1)) assert updated_document.indexing_status == IndexingStatus.ERROR assert updated_document.error is not None assert "batch upload" in updated_document.error.lower() @@ -585,7 +586,7 @@ class TestDuplicateDocumentIndexingTasks: # Assert: Verify error handling # Re-query documents from database since _duplicate_document_indexing_task uses a different session for doc_id in document_ids: - updated_document = db_session_with_containers.query(Document).where(Document.id == doc_id).first() + updated_document = db_session_with_containers.scalar(select(Document).where(Document.id == doc_id).limit(1)) assert updated_document.indexing_status == IndexingStatus.ERROR assert updated_document.error is not None assert "limit" in updated_document.error.lower() @@ -649,7 +650,7 @@ class TestDuplicateDocumentIndexingTasks: # Verify documents were processed for doc_id in document_ids: - updated_document = db_session_with_containers.query(Document).where(Document.id == doc_id).first() + updated_document = db_session_with_containers.scalar(select(Document).where(Document.id == doc_id).limit(1)) assert updated_document.indexing_status == IndexingStatus.PARSING @patch("tasks.duplicate_document_indexing_task.TenantIsolatedTaskQueue", autospec=True) @@ -692,7 +693,7 @@ class TestDuplicateDocumentIndexingTasks: # Verify documents were processed for doc_id in document_ids: - updated_document = db_session_with_containers.query(Document).where(Document.id == doc_id).first() + updated_document = db_session_with_containers.scalar(select(Document).where(Document.id == doc_id).limit(1)) assert updated_document.indexing_status == IndexingStatus.PARSING @patch("tasks.duplicate_document_indexing_task.TenantIsolatedTaskQueue", autospec=True) @@ -736,7 +737,7 @@ class TestDuplicateDocumentIndexingTasks: # Verify documents were processed for doc_id in document_ids: - updated_document = db_session_with_containers.query(Document).where(Document.id == doc_id).first() + updated_document = db_session_with_containers.scalar(select(Document).where(Document.id == doc_id).limit(1)) assert updated_document.indexing_status == IndexingStatus.PARSING @patch("tasks.duplicate_document_indexing_task.TenantIsolatedTaskQueue", autospec=True) @@ -851,7 +852,7 @@ class TestDuplicateDocumentIndexingTasks: # Assert for doc_id in document_ids: - updated_document = db_session_with_containers.query(Document).where(Document.id == doc_id).first() + updated_document = db_session_with_containers.scalar(select(Document).where(Document.id == doc_id).limit(1)) assert updated_document.is_paused is True assert updated_document.indexing_status == IndexingStatus.PARSING assert updated_document.display_status == "paused"