diff --git a/api/core/app/task_pipeline/workflow_cycle_manage.py b/api/core/app/task_pipeline/workflow_cycle_manage.py index 8e7619adfd..89da11b76c 100644 --- a/api/core/app/task_pipeline/workflow_cycle_manage.py +++ b/api/core/app/task_pipeline/workflow_cycle_manage.py @@ -24,6 +24,7 @@ from core.app.entities.task_entities import ( from core.file.file_obj import FileVar from core.model_runtime.utils.encoders import jsonable_encoder from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeType, SystemVariable +from core.workflow.workflow_engine_manager import WorkflowEngineManager from extensions.ext_database import db from models.account import Account from models.model import EndUser @@ -66,6 +67,11 @@ class WorkflowCycleManage: .scalar() or 0 new_sequence_number = max_sequence + 1 + inputs = {**user_inputs} + for key, value in (system_inputs or {}).items(): + inputs[f'sys.{key.value}'] = value + inputs = WorkflowEngineManager.handle_special_values(inputs) + # init workflow run workflow_run = WorkflowRun( tenant_id=workflow.tenant_id, @@ -76,7 +82,7 @@ class WorkflowCycleManage: triggered_from=triggered_from.value, version=workflow.version, graph=workflow.graph, - inputs=json.dumps({**user_inputs, **jsonable_encoder(system_inputs)}), + inputs=json.dumps(inputs), status=WorkflowRunStatus.RUNNING.value, created_by_role=(CreatedByRole.ACCOUNT.value if isinstance(user, Account) else CreatedByRole.END_USER.value), @@ -202,6 +208,9 @@ class WorkflowCycleManage: :param execution_metadata: execution metadata :return: """ + inputs = WorkflowEngineManager.handle_special_values(inputs) + outputs = WorkflowEngineManager.handle_special_values(outputs) + workflow_node_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED.value workflow_node_execution.elapsed_time = time.perf_counter() - start_at workflow_node_execution.inputs = json.dumps(inputs) if inputs else None @@ -231,6 +240,9 @@ class WorkflowCycleManage: :param error: error message :return: """ + inputs = WorkflowEngineManager.handle_special_values(inputs) + outputs = WorkflowEngineManager.handle_special_values(outputs) + workflow_node_execution.status = WorkflowNodeExecutionStatus.FAILED.value workflow_node_execution.error = error workflow_node_execution.elapsed_time = time.perf_counter() - start_at diff --git a/api/core/workflow/nodes/end/end_node.py b/api/core/workflow/nodes/end/end_node.py index 3241860c29..d968321c30 100644 --- a/api/core/workflow/nodes/end/end_node.py +++ b/api/core/workflow/nodes/end/end_node.py @@ -24,10 +24,11 @@ class EndNode(BaseNode): outputs = {} for variable_selector in output_variables: - variable_value = variable_pool.get_variable_value( + value = variable_pool.get_variable_value( variable_selector=variable_selector.value_selector ) - outputs[variable_selector.variable] = variable_value + + outputs[variable_selector.variable] = value return NodeRunResult( status=WorkflowNodeExecutionStatus.SUCCEEDED, diff --git a/api/core/workflow/workflow_engine_manager.py b/api/core/workflow/workflow_engine_manager.py index 9eb7b3af0b..be5bd1c17a 100644 --- a/api/core/workflow/workflow_engine_manager.py +++ b/api/core/workflow/workflow_engine_manager.py @@ -3,6 +3,7 @@ import time from typing import Optional from core.app.apps.base_app_queue_manager import GenerateTaskStoppedException +from core.file.file_obj import FileVar from core.workflow.callbacks.base_workflow_callback import BaseWorkflowCallback from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult, NodeType from core.workflow.entities.variable_pool import VariablePool, VariableValue @@ -494,3 +495,30 @@ class WorkflowEngineManager: variable_key_list=new_key_list, variable_value=value ) + + @classmethod + def handle_special_values(cls, value: Optional[dict]) -> Optional[dict]: + """ + Handle special values + :param value: value + :return: + """ + if not value: + return None + + new_value = value.copy() + if isinstance(new_value, dict): + for key, val in new_value.items(): + if isinstance(val, FileVar): + new_value[key] = val.to_dict() + elif isinstance(val, list): + new_val = [] + for v in val: + if isinstance(v, FileVar): + new_val.append(v.to_dict()) + else: + new_val.append(v) + + new_value[key] = new_val + + return new_value