diff --git a/api/controllers/console/datasets/rag_pipeline/rag_pipeline_workflow.py b/api/controllers/console/datasets/rag_pipeline/rag_pipeline_workflow.py index f1a1f5f2b8..f4031ec5a9 100644 --- a/api/controllers/console/datasets/rag_pipeline/rag_pipeline_workflow.py +++ b/api/controllers/console/datasets/rag_pipeline/rag_pipeline_workflow.py @@ -1010,14 +1010,6 @@ api.add_resource( RagPipelinePublishedDatasourceNodeRunApi, "/rag/pipelines//workflows/published/datasource/nodes//run", ) -# api.add_resource( -# RagPipelinePublishedDatasourceNodeRunStatusApi, -# "/rag/pipelines//workflows/published/datasource/nodes//run-status", -# ) -# api.add_resource( -# RagPipelineDraftDatasourceNodeRunStatusApi, -# "/rag/pipelines//workflows/draft/datasource/nodes//run-status", -# ) api.add_resource( RagPipelineDraftDatasourceNodeRunApi, diff --git a/api/core/app/apps/common/workflow_response_converter.py b/api/core/app/apps/common/workflow_response_converter.py index b3a94e6d9f..16944d8670 100644 --- a/api/core/app/apps/common/workflow_response_converter.py +++ b/api/core/app/apps/common/workflow_response_converter.py @@ -179,11 +179,10 @@ class WorkflowResponseConverter: provider_id=event.provider_id, ) elif event.node_type == NodeType.DATASOURCE: - node_data = cast(DatasourceNodeData, event.node_data) manager = PluginDatasourceManager() provider_entity = manager.fetch_datasource_provider( self._application_generate_entity.app_config.tenant_id, - f"{node_data.plugin_id}/{node_data.provider_name}", + event.provider_id, ) response.data.extras["icon"] = provider_entity.declaration.identity.icon diff --git a/api/core/app/apps/pipeline/pipeline_runner.py b/api/core/app/apps/pipeline/pipeline_runner.py index bc2e5c1bce..abd9d26052 100644 --- a/api/core/app/apps/pipeline/pipeline_runner.py +++ b/api/core/app/apps/pipeline/pipeline_runner.py @@ -217,26 +217,27 @@ class PipelineRunner(WorkflowBasedAppRunner): if not isinstance(graph_config.get("edges"), list): raise ValueError("edges in workflow graph must be a list") - nodes = graph_config.get("nodes", []) - edges = graph_config.get("edges", []) - real_run_nodes = [] - real_edges = [] - exclude_node_ids = [] - for node in nodes: - node_id = node.get("id") - node_type = node.get("data", {}).get("type", "") - if node_type == "datasource": - if start_node_id != node_id: - exclude_node_ids.append(node_id) - continue - real_run_nodes.append(node) - for edge in edges: - if edge.get("source") in exclude_node_ids: - continue - real_edges.append(edge) - graph_config = dict(graph_config) - graph_config["nodes"] = real_run_nodes - graph_config["edges"] = real_edges + # nodes = graph_config.get("nodes", []) + # edges = graph_config.get("edges", []) + # real_run_nodes = [] + # real_edges = [] + # exclude_node_ids = [] + # for node in nodes: + # node_id = node.get("id") + # node_type = node.get("data", {}).get("type", "") + # if node_type == "datasource": + # if start_node_id != node_id: + # exclude_node_ids.append(node_id) + # continue + # real_run_nodes.append(node) + + # for edge in edges: + # if edge.get("source") in exclude_node_ids: + # continue + # real_edges.append(edge) + # graph_config = dict(graph_config) + # graph_config["nodes"] = real_run_nodes + # graph_config["edges"] = real_edges # init graph # Create required parameters for Graph.init graph_init_params = GraphInitParams( diff --git a/api/core/workflow/nodes/base/node.py b/api/core/workflow/nodes/base/node.py index 8816e22a85..c888884b21 100644 --- a/api/core/workflow/nodes/base/node.py +++ b/api/core/workflow/nodes/base/node.py @@ -117,6 +117,12 @@ class Node: if isinstance(self, ToolNode): start_event.provider_id = getattr(self.get_base_node_data(), "provider_id", "") start_event.provider_type = getattr(self.get_base_node_data(), "provider_type", "") + + from core.workflow.nodes.datasource.datasource_node import DatasourceNode + + if isinstance(self, DatasourceNode): + start_event.provider_id = f"{getattr(self.get_base_node_data(), 'plugin_id', '')}/{getattr(self.get_base_node_data(), 'provider_name', '')}" + start_event.provider_type = getattr(self.get_base_node_data(), "provider_type", "") from typing import cast diff --git a/api/core/workflow/nodes/datasource/datasource_node.py b/api/core/workflow/nodes/datasource/datasource_node.py index 1ccb062cd5..d294a9840a 100644 --- a/api/core/workflow/nodes/datasource/datasource_node.py +++ b/api/core/workflow/nodes/datasource/datasource_node.py @@ -314,7 +314,7 @@ class DatasourceNode(Node): *, graph_config: Mapping[str, Any], node_id: str, - node_data: DatasourceNodeData, + node_data: Mapping[str, Any], ) -> Mapping[str, Sequence[str]]: """ Extract variable selector to variable mapping @@ -323,10 +323,11 @@ class DatasourceNode(Node): :param node_data: node data :return: """ + typed_node_data = DatasourceNodeData.model_validate(node_data) result = {} - if node_data.datasource_parameters: - for parameter_name in node_data.datasource_parameters: - input = node_data.datasource_parameters[parameter_name] + if typed_node_data.datasource_parameters: + for parameter_name in typed_node_data.datasource_parameters: + input = typed_node_data.datasource_parameters[parameter_name] if input.type == "mixed": assert isinstance(input.value, str) selectors = VariableTemplateParser(input.value).extract_variable_selectors() @@ -430,7 +431,7 @@ class DatasourceNode(Node): ) elif message.type == DatasourceMessage.MessageType.JSON: assert isinstance(message.message, DatasourceMessage.JsonMessage) - if self._node_type == NodeType.AGENT: + if self.node_type == NodeType.AGENT: msg_metadata = message.message.json_object.pop("execution_metadata", {}) agent_execution_metadata = { key: value