fix document retry

This commit is contained in:
jyong 2025-09-16 14:52:33 +08:00
parent c4ddc6420a
commit c463f31f56
2 changed files with 8 additions and 3 deletions

View File

@ -72,6 +72,7 @@ class PipelineGenerator(BaseAppGenerator):
call_depth: int,
workflow_thread_pool_id: Optional[str],
is_retry: bool = False,
document_id: Optional[str] = None,
) -> Mapping[str, Any] | Generator[Mapping | str, None, None] | None: ...
@overload
@ -87,6 +88,7 @@ class PipelineGenerator(BaseAppGenerator):
call_depth: int,
workflow_thread_pool_id: Optional[str],
is_retry: bool = False,
document_id: Optional[str] = None,
) -> Mapping[str, Any]: ...
@overload
@ -102,6 +104,7 @@ class PipelineGenerator(BaseAppGenerator):
call_depth: int,
workflow_thread_pool_id: Optional[str],
is_retry: bool = False,
document_id: Optional[str] = None,
) -> Union[Mapping[str, Any], Generator[Mapping | str, None, None]]: ...
def generate(
@ -116,6 +119,7 @@ class PipelineGenerator(BaseAppGenerator):
call_depth: int = 0,
workflow_thread_pool_id: Optional[str] = None,
is_retry: bool = False,
documents: list[Document] = [],
) -> Union[Mapping[str, Any], Generator[Mapping | str, None, None], None]:
# Add null check for dataset
@ -134,7 +138,6 @@ class PipelineGenerator(BaseAppGenerator):
pipeline_config = PipelineConfigManager.get_pipeline_config(
pipeline=pipeline, workflow=workflow, start_node_id=start_node_id
)
documents = []
if invoke_from == InvokeFrom.PUBLISHED and not is_retry:
from services.dataset_service import DocumentService
for datasource_info in datasource_info_list:
@ -160,8 +163,9 @@ class PipelineGenerator(BaseAppGenerator):
for i, datasource_info in enumerate(datasource_info_list):
workflow_run_id = str(uuid.uuid4())
document_id = None
if invoke_from == InvokeFrom.PUBLISHED and not is_retry:
if documents:
document_id = documents[i].id
if invoke_from == InvokeFrom.PUBLISHED and not is_retry:
document_pipeline_execution_log = DocumentPipelineExecutionLog(
document_id=document_id,
datasource_type=datasource_type,
@ -218,7 +222,7 @@ class PipelineGenerator(BaseAppGenerator):
app_id=application_generate_entity.app_config.app_id,
triggered_from=WorkflowNodeExecutionTriggeredFrom.RAG_PIPELINE_RUN,
)
if invoke_from == InvokeFrom.DEBUGGER:
if invoke_from == InvokeFrom.DEBUGGER or is_retry:
return self._generate(
flask_app=current_app._get_current_object(), # type: ignore
context=contextvars.copy_context(),

View File

@ -1346,4 +1346,5 @@ class RagPipelineService:
call_depth=0,
workflow_thread_pool_id=None,
is_retry=True,
documents=[document],
)