diff --git a/api/core/workflow/nodes/datasource/datasource_node.py b/api/core/workflow/nodes/datasource/datasource_node.py index d294a9840a..d9c2750ec5 100644 --- a/api/core/workflow/nodes/datasource/datasource_node.py +++ b/api/core/workflow/nodes/datasource/datasource_node.py @@ -478,7 +478,7 @@ class DatasourceNode(Node): yield StreamCompletedEvent( node_run_result=NodeRunResult( status=WorkflowNodeExecutionStatus.SUCCEEDED, - outputs={"json": json, "files": files, **variables, "text": text}, + outputs={**variables}, metadata={ WorkflowNodeExecutionMetadataKey.DATASOURCE_INFO: datasource_info, }, diff --git a/api/services/rag_pipeline/rag_pipeline.py b/api/services/rag_pipeline/rag_pipeline.py index 05d74f3692..6ae9d68e5f 100644 --- a/api/services/rag_pipeline/rag_pipeline.py +++ b/api/services/rag_pipeline/rag_pipeline.py @@ -1147,30 +1147,21 @@ class RagPipelineService: return node_exec def set_datasource_variables(self, pipeline: Pipeline, args: dict, current_user: Account | EndUser): + """ + Set datasource variables + """ + # fetch draft workflow by app_model draft_workflow = self.get_draft_workflow(pipeline=pipeline) if not draft_workflow: raise ValueError("Workflow not initialized") - workflow_node_execution = WorkflowNodeExecution( - id=str(uuid4()), - workflow_id=draft_workflow.id, - index=1, - node_id=args.get("start_node_id", ""), - node_type=NodeType.DATASOURCE, - title=args.get("start_node_title", "Datasource"), - elapsed_time=0, - finished_at=datetime.now(UTC).replace(tzinfo=None), - created_at=datetime.now(UTC).replace(tzinfo=None), - status=WorkflowNodeExecutionStatus.SUCCEEDED, - inputs=None, - metadata=None, - ) - outputs = { - **args.get("datasource_info", {}), - "datasource_type": args.get("datasource_type", ""), - } - workflow_node_execution.outputs = outputs - node_config = draft_workflow.get_node_config_by_id(args.get("start_node_id", "")) + + # run draft workflow node + start_at = time.perf_counter() + node_id = args.get("start_node_id") + if not node_id: + raise ValueError("Node id is required") + node_config = draft_workflow.get_node_config_by_id(node_id) eclosing_node_type_and_id = draft_workflow.get_enclosing_node_type_and_id(node_config) if eclosing_node_type_and_id: @@ -1178,6 +1169,36 @@ class RagPipelineService: else: enclosing_node_id = None + system_inputs = SystemVariable( + datasource_type=args.get("datasource_type", "online_document"), + datasource_info=args.get("datasource_info", {}), + ) + + workflow_node_execution = self._handle_node_run_result( + getter=lambda: WorkflowEntry.single_step_run( + workflow=draft_workflow, + node_id=node_id, + user_inputs={}, + user_id=current_user.id, + variable_pool=VariablePool( + system_variables=system_inputs, + user_inputs={}, + environment_variables=[], + conversation_variables=[], + rag_pipeline_variables=[], + ), + variable_loader=DraftVarLoader( + engine=db.engine, + app_id=pipeline.id, + tenant_id=pipeline.tenant_id, + ), + ), + start_at=start_at, + tenant_id=pipeline.tenant_id, + node_id=node_id, + ) + workflow_node_execution.workflow_id = draft_workflow.id + # Create repository and save the node execution repository = SQLAlchemyWorkflowNodeExecutionRepository( session_factory=db.engine,