From 3e96c0c4689e8dc8c2a75b56b0e082af86e6fae7 Mon Sep 17 00:00:00 2001 From: Jacky Wu Date: Mon, 14 Jul 2025 11:16:10 +0400 Subject: [PATCH] fix: close session before doing long latency operation (#22306) --- api/core/rag/datasource/retrieval_service.py | 5 +++-- api/core/rag/retrieval/dataset_retrieval.py | 4 +++- .../nodes/knowledge_retrieval/knowledge_retrieval_node.py | 5 +++++ 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/api/core/rag/datasource/retrieval_service.py b/api/core/rag/datasource/retrieval_service.py index 2c5178241c..5a6903d3d5 100644 --- a/api/core/rag/datasource/retrieval_service.py +++ b/api/core/rag/datasource/retrieval_service.py @@ -3,7 +3,7 @@ from concurrent.futures import ThreadPoolExecutor from typing import Optional from flask import Flask, current_app -from sqlalchemy.orm import load_only +from sqlalchemy.orm import Session, load_only from configs import dify_config from core.rag.data_post_processor.data_post_processor import DataPostProcessor @@ -144,7 +144,8 @@ class RetrievalService: @classmethod def _get_dataset(cls, dataset_id: str) -> Optional[Dataset]: - return db.session.query(Dataset).filter(Dataset.id == dataset_id).first() + with Session(db.engine) as session: + return session.query(Dataset).filter(Dataset.id == dataset_id).first() @classmethod def keyword_search( diff --git a/api/core/rag/retrieval/dataset_retrieval.py b/api/core/rag/retrieval/dataset_retrieval.py index 3fca48be22..5c0360b064 100644 --- a/api/core/rag/retrieval/dataset_retrieval.py +++ b/api/core/rag/retrieval/dataset_retrieval.py @@ -9,6 +9,7 @@ from typing import Any, Optional, Union, cast from flask import Flask, current_app from sqlalchemy import Float, and_, or_, text from sqlalchemy import cast as sqlalchemy_cast +from sqlalchemy.orm import Session from core.app.app_config.entities import ( DatasetEntity, @@ -598,7 +599,8 @@ class DatasetRetrieval: metadata_condition: Optional[MetadataCondition] = None, ): with flask_app.app_context(): - dataset = db.session.query(Dataset).filter(Dataset.id == dataset_id).first() + with Session(db.engine) as session: + dataset = session.query(Dataset).filter(Dataset.id == dataset_id).first() if not dataset: return [] diff --git a/api/core/workflow/nodes/knowledge_retrieval/knowledge_retrieval_node.py b/api/core/workflow/nodes/knowledge_retrieval/knowledge_retrieval_node.py index b34d62d669..f05d93d83e 100644 --- a/api/core/workflow/nodes/knowledge_retrieval/knowledge_retrieval_node.py +++ b/api/core/workflow/nodes/knowledge_retrieval/knowledge_retrieval_node.py @@ -144,6 +144,8 @@ class KnowledgeRetrievalNode(LLMNode): error=str(e), error_type=type(e).__name__, ) + finally: + db.session.close() def _fetch_dataset_retriever(self, node_data: KnowledgeRetrievalNodeData, query: str) -> list[dict[str, Any]]: available_datasets = [] @@ -171,6 +173,9 @@ class KnowledgeRetrievalNode(LLMNode): .all() ) + # avoid blocking at retrieval + db.session.close() + for dataset in results: # pass if dataset is not available if not dataset: