diff --git a/api/core/app/apps/pipeline/pipeline_runner.py b/api/core/app/apps/pipeline/pipeline_runner.py index 51c4865ad1..89ada53831 100644 --- a/api/core/app/apps/pipeline/pipeline_runner.py +++ b/api/core/app/apps/pipeline/pipeline_runner.py @@ -109,19 +109,19 @@ class PipelineRunner(WorkflowBasedAppRunner): files = self.application_generate_entity.files # Create a variable pool. - system_inputs = { - SystemVariableKey.FILES: files, - SystemVariableKey.USER_ID: user_id, - SystemVariableKey.APP_ID: app_config.app_id, - SystemVariableKey.WORKFLOW_ID: app_config.workflow_id, - SystemVariableKey.WORKFLOW_EXECUTION_ID: self.application_generate_entity.workflow_execution_id, - SystemVariableKey.DOCUMENT_ID: self.application_generate_entity.document_id, - SystemVariableKey.BATCH: self.application_generate_entity.batch, - SystemVariableKey.DATASET_ID: self.application_generate_entity.dataset_id, - SystemVariableKey.DATASOURCE_TYPE: self.application_generate_entity.datasource_type, - SystemVariableKey.DATASOURCE_INFO: self.application_generate_entity.datasource_info, - SystemVariableKey.INVOKE_FROM: self.application_generate_entity.invoke_from.value, - } + system_inputs = SystemVariable( + files=files, + user_id=user_id, + app_id=app_config.app_id, + workflow_id=app_config.workflow_id, + workflow_execution_id=self.application_generate_entity.workflow_execution_id, + document_id=self.application_generate_entity.document_id, + batch=self.application_generate_entity.batch, + dataset_id=self.application_generate_entity.dataset_id, + datasource_type=self.application_generate_entity.datasource_type, + datasource_info=self.application_generate_entity.datasource_info, + invoke_from=self.application_generate_entity.invoke_from.value, + ) rag_pipeline_variables = [] if workflow.rag_pipeline_variables: for v in workflow.rag_pipeline_variables: @@ -138,7 +138,7 @@ class PipelineRunner(WorkflowBasedAppRunner): ) variable_pool = VariablePool( - system_variables=SystemVariable(**system_inputs), + system_variables=system_inputs, user_inputs=inputs, environment_variables=workflow.environment_variables, conversation_variables=[], diff --git a/api/core/workflow/entities/variable_pool.py b/api/core/workflow/entities/variable_pool.py index de1eb5311e..e433cdb98b 100644 --- a/api/core/workflow/entities/variable_pool.py +++ b/api/core/workflow/entities/variable_pool.py @@ -67,7 +67,9 @@ class VariablePool(BaseModel): self.add((CONVERSATION_VARIABLE_NODE_ID, var.name), var) # Add rag pipeline variables to the variable pool for var in self.rag_pipeline_variables: - self.add((RAG_PIPELINE_VARIABLE_NODE_ID, var.variable.belong_to_node_id, var.variable.variable), var.value) + # Combine belong_to_node_id and variable into a single variable name + variable_name = f"{var.variable.belong_to_node_id}.{var.variable.variable}" + self.add((RAG_PIPELINE_VARIABLE_NODE_ID, variable_name), var.value) def add(self, selector: Sequence[str], value: Any, /) -> None: """ diff --git a/api/core/workflow/nodes/datasource/datasource_node.py b/api/core/workflow/nodes/datasource/datasource_node.py index 4ae0c5652e..dca523de81 100644 --- a/api/core/workflow/nodes/datasource/datasource_node.py +++ b/api/core/workflow/nodes/datasource/datasource_node.py @@ -204,16 +204,7 @@ class DatasourceNode(BaseNode): size=upload_file.size, storage_key=upload_file.key, ) - variable_pool.add([self.node_id, "file"], [file_info]) - for key, value in datasource_info.items(): - # construct new key list - new_key_list = ["file", key] - self._append_variables_recursively( - variable_pool=variable_pool, - node_id=self.node_id, - variable_key_list=new_key_list, - variable_value=value, - ) + variable_pool.add([self.node_id, "file"], file_info.to_dict()) yield RunCompletedEvent( run_result=NodeRunResult( status=WorkflowNodeExecutionStatus.SUCCEEDED, @@ -307,7 +298,7 @@ class DatasourceNode(BaseNode): :param variable_value: variable value :return: """ - variable_pool.add([node_id] + variable_key_list, variable_value) + variable_pool.add([node_id] + [".".join(variable_key_list)], variable_value) # if variable_value is a dict, then recursively append variables if isinstance(variable_value, dict): @@ -533,16 +524,8 @@ class DatasourceNode(BaseNode): mapping=mapping, tenant_id=self.tenant_id, ) - variable_pool.add([self.node_id, "file"], [file]) - for key, value in datasource_info.items(): - # construct new key list - new_key_list = ["file", key] - self._append_variables_recursively( - variable_pool=variable_pool, - node_id=self.node_id, - variable_key_list=new_key_list, - variable_value=value, - ) + if file: + variable_pool.add([self.node_id, "file"], file.to_dict()) yield RunCompletedEvent( run_result=NodeRunResult( status=WorkflowNodeExecutionStatus.SUCCEEDED,