fix file bugs

This commit is contained in:
takatost 2024-03-18 23:15:33 +08:00
parent 5a5beb5b59
commit 2da7cc6928
3 changed files with 44 additions and 3 deletions

View File

@ -24,6 +24,7 @@ from core.app.entities.task_entities import (
from core.file.file_obj import FileVar
from core.model_runtime.utils.encoders import jsonable_encoder
from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeType, SystemVariable
from core.workflow.workflow_engine_manager import WorkflowEngineManager
from extensions.ext_database import db
from models.account import Account
from models.model import EndUser
@ -66,6 +67,11 @@ class WorkflowCycleManage:
.scalar() or 0
new_sequence_number = max_sequence + 1
inputs = {**user_inputs}
for key, value in (system_inputs or {}).items():
inputs[f'sys.{key.value}'] = value
inputs = WorkflowEngineManager.handle_special_values(inputs)
# init workflow run
workflow_run = WorkflowRun(
tenant_id=workflow.tenant_id,
@ -76,7 +82,7 @@ class WorkflowCycleManage:
triggered_from=triggered_from.value,
version=workflow.version,
graph=workflow.graph,
inputs=json.dumps({**user_inputs, **jsonable_encoder(system_inputs)}),
inputs=json.dumps(inputs),
status=WorkflowRunStatus.RUNNING.value,
created_by_role=(CreatedByRole.ACCOUNT.value
if isinstance(user, Account) else CreatedByRole.END_USER.value),
@ -202,6 +208,9 @@ class WorkflowCycleManage:
:param execution_metadata: execution metadata
:return:
"""
inputs = WorkflowEngineManager.handle_special_values(inputs)
outputs = WorkflowEngineManager.handle_special_values(outputs)
workflow_node_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED.value
workflow_node_execution.elapsed_time = time.perf_counter() - start_at
workflow_node_execution.inputs = json.dumps(inputs) if inputs else None
@ -231,6 +240,9 @@ class WorkflowCycleManage:
:param error: error message
:return:
"""
inputs = WorkflowEngineManager.handle_special_values(inputs)
outputs = WorkflowEngineManager.handle_special_values(outputs)
workflow_node_execution.status = WorkflowNodeExecutionStatus.FAILED.value
workflow_node_execution.error = error
workflow_node_execution.elapsed_time = time.perf_counter() - start_at

View File

@ -24,10 +24,11 @@ class EndNode(BaseNode):
outputs = {}
for variable_selector in output_variables:
variable_value = variable_pool.get_variable_value(
value = variable_pool.get_variable_value(
variable_selector=variable_selector.value_selector
)
outputs[variable_selector.variable] = variable_value
outputs[variable_selector.variable] = value
return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,

View File

@ -3,6 +3,7 @@ import time
from typing import Optional
from core.app.apps.base_app_queue_manager import GenerateTaskStoppedException
from core.file.file_obj import FileVar
from core.workflow.callbacks.base_workflow_callback import BaseWorkflowCallback
from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult, NodeType
from core.workflow.entities.variable_pool import VariablePool, VariableValue
@ -494,3 +495,30 @@ class WorkflowEngineManager:
variable_key_list=new_key_list,
variable_value=value
)
@classmethod
def handle_special_values(cls, value: Optional[dict]) -> Optional[dict]:
"""
Handle special values
:param value: value
:return:
"""
if not value:
return None
new_value = value.copy()
if isinstance(new_value, dict):
for key, val in new_value.items():
if isinstance(val, FileVar):
new_value[key] = val.to_dict()
elif isinstance(val, list):
new_val = []
for v in val:
if isinstance(v, FileVar):
new_val.append(v.to_dict())
else:
new_val.append(v)
new_value[key] = new_val
return new_value