From 3a3b60bab5e1f03aa93588f9303acc4a997149c5 Mon Sep 17 00:00:00 2001 From: jyong <718720800@qq.com> Date: Tue, 15 Jul 2025 15:00:38 +0800 Subject: [PATCH] r2 --- api/services/rag_pipeline/rag_pipeline.py | 36 ++++++++++++++++++++--- 1 file changed, 32 insertions(+), 4 deletions(-) diff --git a/api/services/rag_pipeline/rag_pipeline.py b/api/services/rag_pipeline/rag_pipeline.py index a227d04a1a..d083afcb0c 100644 --- a/api/services/rag_pipeline/rag_pipeline.py +++ b/api/services/rag_pipeline/rag_pipeline.py @@ -72,7 +72,7 @@ from services.entities.knowledge_entities.rag_pipeline_entities import ( ) from services.errors.app import WorkflowHashNotEqualError from services.rag_pipeline.pipeline_template.pipeline_template_factory import PipelineTemplateRetrievalFactory -from services.workflow_draft_variable_service import DraftVarLoader +from services.workflow_draft_variable_service import DraftVarLoader, DraftVariableSaver logger = logging.getLogger(__name__) @@ -389,7 +389,7 @@ class RagPipelineService: def run_draft_workflow_node( self, pipeline: Pipeline, node_id: str, user_inputs: dict, account: Account - ) -> WorkflowNodeExecution: + ) -> WorkflowNodeExecutionModel: """ Run draft workflow node """ @@ -400,6 +400,13 @@ class RagPipelineService: # run draft workflow node start_at = time.perf_counter() + node_config = draft_workflow.get_node_config_by_id(node_id) + + eclosing_node_type_and_id = draft_workflow.get_enclosing_node_type_and_id(node_config) + if eclosing_node_type_and_id: + _, enclosing_node_id = eclosing_node_type_and_id + else: + enclosing_node_id = None workflow_node_execution = self._handle_node_run_result( getter=lambda: WorkflowEntry.single_step_run( @@ -426,9 +433,30 @@ class RagPipelineService: ) workflow_node_execution.workflow_id = draft_workflow.id - db.session.add(workflow_node_execution) - db.session.commit() + # Create repository and save the node execution + repository = SQLAlchemyWorkflowNodeExecutionRepository( + session_factory=db.engine, + user=account, + app_id=pipeline.id, + triggered_from=WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP, + ) + repository.save(workflow_node_execution) + # Convert node_execution to WorkflowNodeExecution after save + workflow_node_execution = repository.to_db_model(workflow_node_execution) + + with Session(bind=db.engine) as session, session.begin(): + draft_var_saver = DraftVariableSaver( + session=session, + app_id=pipeline.id, + node_id=workflow_node_execution.node_id, + node_type=NodeType(workflow_node_execution.node_type), + enclosing_node_id=enclosing_node_id, + node_execution_id=workflow_node_execution.id, + ) + draft_var_saver.save(process_data=workflow_node_execution.process_data, + outputs=workflow_node_execution.outputs) + session.commit() return workflow_node_execution def run_datasource_workflow_node(