From c463f31f560abc76420af9684549dd6b6ba24298 Mon Sep 17 00:00:00 2001 From: jyong <718720800@qq.com> Date: Tue, 16 Sep 2025 14:52:33 +0800 Subject: [PATCH] fix document retry --- api/core/app/apps/pipeline/pipeline_generator.py | 10 +++++++--- api/services/rag_pipeline/rag_pipeline.py | 1 + 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/api/core/app/apps/pipeline/pipeline_generator.py b/api/core/app/apps/pipeline/pipeline_generator.py index 8751197767..0e13599c30 100644 --- a/api/core/app/apps/pipeline/pipeline_generator.py +++ b/api/core/app/apps/pipeline/pipeline_generator.py @@ -72,6 +72,7 @@ class PipelineGenerator(BaseAppGenerator): call_depth: int, workflow_thread_pool_id: Optional[str], is_retry: bool = False, + document_id: Optional[str] = None, ) -> Mapping[str, Any] | Generator[Mapping | str, None, None] | None: ... @overload @@ -87,6 +88,7 @@ class PipelineGenerator(BaseAppGenerator): call_depth: int, workflow_thread_pool_id: Optional[str], is_retry: bool = False, + document_id: Optional[str] = None, ) -> Mapping[str, Any]: ... @overload @@ -102,6 +104,7 @@ class PipelineGenerator(BaseAppGenerator): call_depth: int, workflow_thread_pool_id: Optional[str], is_retry: bool = False, + document_id: Optional[str] = None, ) -> Union[Mapping[str, Any], Generator[Mapping | str, None, None]]: ... def generate( @@ -116,6 +119,7 @@ class PipelineGenerator(BaseAppGenerator): call_depth: int = 0, workflow_thread_pool_id: Optional[str] = None, is_retry: bool = False, + documents: list[Document] = [], ) -> Union[Mapping[str, Any], Generator[Mapping | str, None, None], None]: # Add null check for dataset @@ -134,7 +138,6 @@ class PipelineGenerator(BaseAppGenerator): pipeline_config = PipelineConfigManager.get_pipeline_config( pipeline=pipeline, workflow=workflow, start_node_id=start_node_id ) - documents = [] if invoke_from == InvokeFrom.PUBLISHED and not is_retry: from services.dataset_service import DocumentService for datasource_info in datasource_info_list: @@ -160,8 +163,9 @@ class PipelineGenerator(BaseAppGenerator): for i, datasource_info in enumerate(datasource_info_list): workflow_run_id = str(uuid.uuid4()) document_id = None - if invoke_from == InvokeFrom.PUBLISHED and not is_retry: + if documents: document_id = documents[i].id + if invoke_from == InvokeFrom.PUBLISHED and not is_retry: document_pipeline_execution_log = DocumentPipelineExecutionLog( document_id=document_id, datasource_type=datasource_type, @@ -218,7 +222,7 @@ class PipelineGenerator(BaseAppGenerator): app_id=application_generate_entity.app_config.app_id, triggered_from=WorkflowNodeExecutionTriggeredFrom.RAG_PIPELINE_RUN, ) - if invoke_from == InvokeFrom.DEBUGGER: + if invoke_from == InvokeFrom.DEBUGGER or is_retry: return self._generate( flask_app=current_app._get_current_object(), # type: ignore context=contextvars.copy_context(), diff --git a/api/services/rag_pipeline/rag_pipeline.py b/api/services/rag_pipeline/rag_pipeline.py index a9aca31439..f9ef050c52 100644 --- a/api/services/rag_pipeline/rag_pipeline.py +++ b/api/services/rag_pipeline/rag_pipeline.py @@ -1346,4 +1346,5 @@ class RagPipelineService: call_depth=0, workflow_thread_pool_id=None, is_retry=True, + documents=[document], )