diff --git a/api/core/app/apps/workflow/app_runner.py b/api/core/app/apps/workflow/app_runner.py index 439ecb2491..f05eb0e596 100644 --- a/api/core/app/apps/workflow/app_runner.py +++ b/api/core/app/apps/workflow/app_runner.py @@ -19,6 +19,7 @@ from core.workflow.system_variable import SystemVariable from core.workflow.variable_loader import VariableLoader from core.workflow.workflow_entry import WorkflowEntry from extensions.ext_redis import redis_client +from libs.datetime_utils import naive_utc_now from models.enums import UserFrom from models.workflow import Workflow @@ -67,6 +68,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner): files=self.application_generate_entity.files, user_id=self._sys_user_id, app_id=app_config.app_id, + timestamp=int(naive_utc_now().timestamp()), workflow_id=app_config.workflow_id, workflow_execution_id=self.application_generate_entity.workflow_execution_id, ) diff --git a/api/core/workflow/enums.py b/api/core/workflow/enums.py index 4e9a30dd6e..882d7762f2 100644 --- a/api/core/workflow/enums.py +++ b/api/core/workflow/enums.py @@ -22,6 +22,7 @@ class SystemVariableKey(StrEnum): APP_ID = "app_id" WORKFLOW_ID = "workflow_id" WORKFLOW_EXECUTION_ID = "workflow_run_id" + TIMESTAMP = "timestamp" # RAG Pipeline DOCUMENT_ID = "document_id" ORIGINAL_DOCUMENT_ID = "original_document_id" @@ -63,11 +64,21 @@ class NodeType(StrEnum): TRIGGER_PLUGIN = "trigger-plugin" HUMAN_INPUT = "human-input" + @property + def is_trigger_node(self) -> bool: + """Check if this node type is a trigger node.""" + return self in [ + NodeType.TRIGGER_WEBHOOK, + NodeType.TRIGGER_SCHEDULE, + NodeType.TRIGGER_PLUGIN, + ] + @property def is_start_node(self) -> bool: """Check if this node type can serve as a workflow entry point.""" return self in [ NodeType.START, + NodeType.DATASOURCE, NodeType.TRIGGER_WEBHOOK, NodeType.TRIGGER_SCHEDULE, NodeType.TRIGGER_PLUGIN, diff --git a/api/core/workflow/graph/graph.py b/api/core/workflow/graph/graph.py index d04724425c..ba5a01fc94 100644 --- a/api/core/workflow/graph/graph.py +++ b/api/core/workflow/graph/graph.py @@ -117,7 +117,7 @@ class Graph: node_type = node_data.get("type") if not isinstance(node_type, str): continue - if node_type in [NodeType.START, NodeType.DATASOURCE]: + if NodeType(node_type).is_start_node: start_node_id = nid break diff --git a/api/core/workflow/nodes/trigger_plugin/trigger_event_node.py b/api/core/workflow/nodes/trigger_plugin/trigger_event_node.py index 367c6852ff..0a70aa8727 100644 --- a/api/core/workflow/nodes/trigger_plugin/trigger_event_node.py +++ b/api/core/workflow/nodes/trigger_plugin/trigger_event_node.py @@ -1,7 +1,7 @@ from collections.abc import Mapping -from copy import deepcopy from typing import Any, Optional +from core.workflow.constants import SYSTEM_VARIABLE_NODE_ID from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus from core.workflow.enums import ErrorStrategy, NodeExecutionType, NodeType from core.workflow.node_events import NodeRunResult @@ -66,7 +66,6 @@ class TriggerEventNode(Node): """ # Get trigger data passed when workflow was triggered - inputs = deepcopy(self.graph_runtime_state.variable_pool.user_inputs) metadata = { WorkflowNodeExecutionMetadataKey.TRIGGER_INFO: { "provider_id": self._node_data.provider_id, @@ -74,9 +73,17 @@ class TriggerEventNode(Node): "plugin_unique_identifier": self._node_data.plugin_unique_identifier, }, } + node_inputs = dict(self.graph_runtime_state.variable_pool.user_inputs) + system_inputs = self.graph_runtime_state.variable_pool.system_variables.to_dict() + + # TODO: System variables should be directly accessible, no need for special handling + # Set system variables as node outputs. + for var in system_inputs: + node_inputs[SYSTEM_VARIABLE_NODE_ID + "." + var] = system_inputs[var] + outputs = dict(node_inputs) return NodeRunResult( status=WorkflowNodeExecutionStatus.SUCCEEDED, - inputs={}, - outputs=inputs, + inputs=node_inputs, + outputs=outputs, metadata=metadata, ) diff --git a/api/core/workflow/nodes/trigger_schedule/trigger_schedule_node.py b/api/core/workflow/nodes/trigger_schedule/trigger_schedule_node.py index 4fa50f1ead..55d66ac907 100644 --- a/api/core/workflow/nodes/trigger_schedule/trigger_schedule_node.py +++ b/api/core/workflow/nodes/trigger_schedule/trigger_schedule_node.py @@ -1,7 +1,7 @@ from collections.abc import Mapping -from datetime import UTC, datetime from typing import Any, Optional +from core.workflow.constants import SYSTEM_VARIABLE_NODE_ID from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.enums import ErrorStrategy, NodeExecutionType, NodeType from core.workflow.node_events import NodeRunResult @@ -54,10 +54,16 @@ class TriggerScheduleNode(Node): } def _run(self) -> NodeRunResult: - current_time = datetime.now(UTC) - node_outputs = {"current_time": current_time.isoformat()} + node_inputs = dict(self.graph_runtime_state.variable_pool.user_inputs) + system_inputs = self.graph_runtime_state.variable_pool.system_variables.to_dict() + # TODO: System variables should be directly accessible, no need for special handling + # Set system variables as node outputs. + for var in system_inputs: + node_inputs[SYSTEM_VARIABLE_NODE_ID + "." + var] = system_inputs[var] + outputs = dict(node_inputs) return NodeRunResult( status=WorkflowNodeExecutionStatus.SUCCEEDED, - outputs=node_outputs, + inputs=node_inputs, + outputs=outputs, ) diff --git a/api/core/workflow/system_variable.py b/api/core/workflow/system_variable.py index 6716e745cd..d35d6db7ec 100644 --- a/api/core/workflow/system_variable.py +++ b/api/core/workflow/system_variable.py @@ -28,6 +28,8 @@ class SystemVariable(BaseModel): app_id: str | None = None workflow_id: str | None = None + timestamp: int | None = None + files: Sequence[File] = Field(default_factory=list) # NOTE: The `workflow_execution_id` field was previously named `workflow_run_id`. @@ -107,4 +109,6 @@ class SystemVariable(BaseModel): d[SystemVariableKey.DATASOURCE_INFO] = self.datasource_info if self.invoke_from is not None: d[SystemVariableKey.INVOKE_FROM] = self.invoke_from + if self.timestamp is not None: + d[SystemVariableKey.TIMESTAMP] = self.timestamp return d diff --git a/api/services/workflow_draft_variable_service.py b/api/services/workflow_draft_variable_service.py index 5e63a83bb1..2690b55dbc 100644 --- a/api/services/workflow_draft_variable_service.py +++ b/api/services/workflow_draft_variable_service.py @@ -1026,7 +1026,7 @@ class DraftVariableSaver: return if self._node_type == NodeType.VARIABLE_ASSIGNER: draft_vars = self._build_from_variable_assigner_mapping(process_data=process_data) - elif self._node_type == NodeType.START: + elif self._node_type == NodeType.START or self._node_type.is_trigger_node: draft_vars = self._build_variables_from_start_mapping(outputs) else: draft_vars = self._build_variables_from_mapping(outputs) diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index aa53e27ece..c61e76402f 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -1007,10 +1007,11 @@ def _setup_variable_pool( conversation_variables: list[Variable], ): # Only inject system variables for START node type. - if node_type == NodeType.START: + if node_type == NodeType.START or node_type.is_trigger_node: system_variable = SystemVariable( user_id=user_id, app_id=workflow.app_id, + timestamp=int(naive_utc_now().timestamp()), workflow_id=workflow.id, files=files or [], workflow_execution_id=str(uuid.uuid4()),