fix variable_pool

This commit is contained in:
jyong 2025-08-13 17:38:14 +08:00
parent 3797416fe0
commit 72ea3b4d01
3 changed files with 21 additions and 36 deletions

View File

@ -109,19 +109,19 @@ class PipelineRunner(WorkflowBasedAppRunner):
files = self.application_generate_entity.files
# Create a variable pool.
system_inputs = {
SystemVariableKey.FILES: files,
SystemVariableKey.USER_ID: user_id,
SystemVariableKey.APP_ID: app_config.app_id,
SystemVariableKey.WORKFLOW_ID: app_config.workflow_id,
SystemVariableKey.WORKFLOW_EXECUTION_ID: self.application_generate_entity.workflow_execution_id,
SystemVariableKey.DOCUMENT_ID: self.application_generate_entity.document_id,
SystemVariableKey.BATCH: self.application_generate_entity.batch,
SystemVariableKey.DATASET_ID: self.application_generate_entity.dataset_id,
SystemVariableKey.DATASOURCE_TYPE: self.application_generate_entity.datasource_type,
SystemVariableKey.DATASOURCE_INFO: self.application_generate_entity.datasource_info,
SystemVariableKey.INVOKE_FROM: self.application_generate_entity.invoke_from.value,
}
system_inputs = SystemVariable(
files=files,
user_id=user_id,
app_id=app_config.app_id,
workflow_id=app_config.workflow_id,
workflow_execution_id=self.application_generate_entity.workflow_execution_id,
document_id=self.application_generate_entity.document_id,
batch=self.application_generate_entity.batch,
dataset_id=self.application_generate_entity.dataset_id,
datasource_type=self.application_generate_entity.datasource_type,
datasource_info=self.application_generate_entity.datasource_info,
invoke_from=self.application_generate_entity.invoke_from.value,
)
rag_pipeline_variables = []
if workflow.rag_pipeline_variables:
for v in workflow.rag_pipeline_variables:
@ -138,7 +138,7 @@ class PipelineRunner(WorkflowBasedAppRunner):
)
variable_pool = VariablePool(
system_variables=SystemVariable(**system_inputs),
system_variables=system_inputs,
user_inputs=inputs,
environment_variables=workflow.environment_variables,
conversation_variables=[],

View File

@ -67,7 +67,9 @@ class VariablePool(BaseModel):
self.add((CONVERSATION_VARIABLE_NODE_ID, var.name), var)
# Add rag pipeline variables to the variable pool
for var in self.rag_pipeline_variables:
self.add((RAG_PIPELINE_VARIABLE_NODE_ID, var.variable.belong_to_node_id, var.variable.variable), var.value)
# Combine belong_to_node_id and variable into a single variable name
variable_name = f"{var.variable.belong_to_node_id}.{var.variable.variable}"
self.add((RAG_PIPELINE_VARIABLE_NODE_ID, variable_name), var.value)
def add(self, selector: Sequence[str], value: Any, /) -> None:
"""

View File

@ -204,16 +204,7 @@ class DatasourceNode(BaseNode):
size=upload_file.size,
storage_key=upload_file.key,
)
variable_pool.add([self.node_id, "file"], [file_info])
for key, value in datasource_info.items():
# construct new key list
new_key_list = ["file", key]
self._append_variables_recursively(
variable_pool=variable_pool,
node_id=self.node_id,
variable_key_list=new_key_list,
variable_value=value,
)
variable_pool.add([self.node_id, "file"], file_info.to_dict())
yield RunCompletedEvent(
run_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
@ -307,7 +298,7 @@ class DatasourceNode(BaseNode):
:param variable_value: variable value
:return:
"""
variable_pool.add([node_id] + variable_key_list, variable_value)
variable_pool.add([node_id] + [".".join(variable_key_list)], variable_value)
# if variable_value is a dict, then recursively append variables
if isinstance(variable_value, dict):
@ -533,16 +524,8 @@ class DatasourceNode(BaseNode):
mapping=mapping,
tenant_id=self.tenant_id,
)
variable_pool.add([self.node_id, "file"], [file])
for key, value in datasource_info.items():
# construct new key list
new_key_list = ["file", key]
self._append_variables_recursively(
variable_pool=variable_pool,
node_id=self.node_id,
variable_key_list=new_key_list,
variable_value=value,
)
if file:
variable_pool.add([self.node_id, "file"], file.to_dict())
yield RunCompletedEvent(
run_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,