From 3e995e6a6df3a275b1b45c574d63424f360f403c Mon Sep 17 00:00:00 2001 From: Renzo <170978465+RenzoMXD@users.noreply.github.com> Date: Tue, 7 Apr 2026 00:53:21 -0500 Subject: [PATCH] refactor: migrate session.query to select API in document task files (#34646) --- api/tasks/clean_notion_document_task.py | 4 ++-- api/tasks/document_indexing_update_task.py | 6 ++++-- api/tasks/retry_document_indexing_task.py | 16 +++++++--------- 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/api/tasks/clean_notion_document_task.py b/api/tasks/clean_notion_document_task.py index c22ee761d8..e3be24ac74 100644 --- a/api/tasks/clean_notion_document_task.py +++ b/api/tasks/clean_notion_document_task.py @@ -26,7 +26,7 @@ def clean_notion_document_task(document_ids: list[str], dataset_id: str): total_index_node_ids = [] with session_factory.create_session() as session: - dataset = session.query(Dataset).where(Dataset.id == dataset_id).first() + dataset = session.scalar(select(Dataset).where(Dataset.id == dataset_id).limit(1)) if not dataset: raise Exception("Document has no dataset") @@ -41,7 +41,7 @@ def clean_notion_document_task(document_ids: list[str], dataset_id: str): total_index_node_ids.extend([segment.index_node_id for segment in segments]) with session_factory.create_session() as session: - dataset = session.query(Dataset).where(Dataset.id == dataset_id).first() + dataset = session.scalar(select(Dataset).where(Dataset.id == dataset_id).limit(1)) if dataset: index_processor.clean( dataset, total_index_node_ids, with_keywords=True, delete_child_chunks=True, delete_summaries=True diff --git a/api/tasks/document_indexing_update_task.py b/api/tasks/document_indexing_update_task.py index 62bce24de4..15f0e0162b 100644 --- a/api/tasks/document_indexing_update_task.py +++ b/api/tasks/document_indexing_update_task.py @@ -28,7 +28,9 @@ def document_indexing_update_task(dataset_id: str, document_id: str): start_at = time.perf_counter() with session_factory.create_session() as session, session.begin(): - document = session.query(Document).where(Document.id == document_id, Document.dataset_id == dataset_id).first() + document = session.scalar( + select(Document).where(Document.id == document_id, Document.dataset_id == dataset_id).limit(1) + ) if not document: logger.info(click.style(f"Document not found: {document_id}", fg="red")) @@ -37,7 +39,7 @@ def document_indexing_update_task(dataset_id: str, document_id: str): document.indexing_status = IndexingStatus.PARSING document.processing_started_at = naive_utc_now() - dataset = session.query(Dataset).where(Dataset.id == dataset_id).first() + dataset = session.scalar(select(Dataset).where(Dataset.id == dataset_id).limit(1)) if not dataset: return diff --git a/api/tasks/retry_document_indexing_task.py b/api/tasks/retry_document_indexing_task.py index 4fcb0cf804..7cc28d5226 100644 --- a/api/tasks/retry_document_indexing_task.py +++ b/api/tasks/retry_document_indexing_task.py @@ -32,15 +32,15 @@ def retry_document_indexing_task(dataset_id: str, document_ids: list[str], user_ start_at = time.perf_counter() with session_factory.create_session() as session: try: - dataset = session.query(Dataset).where(Dataset.id == dataset_id).first() + dataset = session.scalar(select(Dataset).where(Dataset.id == dataset_id).limit(1)) if not dataset: logger.info(click.style(f"Dataset not found: {dataset_id}", fg="red")) return - user = session.query(Account).where(Account.id == user_id).first() + user = session.scalar(select(Account).where(Account.id == user_id).limit(1)) if not user: logger.info(click.style(f"User not found: {user_id}", fg="red")) return - tenant = session.query(Tenant).where(Tenant.id == dataset.tenant_id).first() + tenant = session.scalar(select(Tenant).where(Tenant.id == dataset.tenant_id).limit(1)) if not tenant: raise ValueError("Tenant not found") user.current_tenant = tenant @@ -58,10 +58,8 @@ def retry_document_indexing_task(dataset_id: str, document_ids: list[str], user_ "your subscription." ) except Exception as e: - document = ( - session.query(Document) - .where(Document.id == document_id, Document.dataset_id == dataset_id) - .first() + document = session.scalar( + select(Document).where(Document.id == document_id, Document.dataset_id == dataset_id).limit(1) ) if document: document.indexing_status = IndexingStatus.ERROR @@ -73,8 +71,8 @@ def retry_document_indexing_task(dataset_id: str, document_ids: list[str], user_ return logger.info(click.style(f"Start retry document: {document_id}", fg="green")) - document = ( - session.query(Document).where(Document.id == document_id, Document.dataset_id == dataset_id).first() + document = session.scalar( + select(Document).where(Document.id == document_id, Document.dataset_id == dataset_id).limit(1) ) if not document: logger.info(click.style(f"Document not found: {document_id}", fg="yellow"))