From fb12f31df2940970cb3cbdd72272ccc26a139d52 Mon Sep 17 00:00:00 2001 From: Harry Date: Wed, 29 Oct 2025 18:06:41 +0800 Subject: [PATCH] feat(trigger): system variables for trigger nodes Added a timestamp field to the SystemVariable model and updated the WorkflowAppRunner to include the current timestamp during execution. Enhanced node type checks to recognize trigger nodes in various services, ensuring proper handling of system variables and node outputs in TriggerEventNode and TriggerScheduleNode. This improves the overall workflow execution context and maintains consistency across node types. --- api/core/app/apps/workflow/app_runner.py | 2 ++ api/core/workflow/enums.py | 11 +++++++++++ api/core/workflow/graph/graph.py | 2 +- .../nodes/trigger_plugin/trigger_event_node.py | 15 +++++++++++---- .../trigger_schedule/trigger_schedule_node.py | 14 ++++++++++---- api/core/workflow/system_variable.py | 4 ++++ api/services/workflow_draft_variable_service.py | 2 +- api/services/workflow_service.py | 3 ++- 8 files changed, 42 insertions(+), 11 deletions(-) diff --git a/api/core/app/apps/workflow/app_runner.py b/api/core/app/apps/workflow/app_runner.py index 439ecb2491..f05eb0e596 100644 --- a/api/core/app/apps/workflow/app_runner.py +++ b/api/core/app/apps/workflow/app_runner.py @@ -19,6 +19,7 @@ from core.workflow.system_variable import SystemVariable from core.workflow.variable_loader import VariableLoader from core.workflow.workflow_entry import WorkflowEntry from extensions.ext_redis import redis_client +from libs.datetime_utils import naive_utc_now from models.enums import UserFrom from models.workflow import Workflow @@ -67,6 +68,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner): files=self.application_generate_entity.files, user_id=self._sys_user_id, app_id=app_config.app_id, + timestamp=int(naive_utc_now().timestamp()), workflow_id=app_config.workflow_id, workflow_execution_id=self.application_generate_entity.workflow_execution_id, ) diff --git a/api/core/workflow/enums.py b/api/core/workflow/enums.py index 4e9a30dd6e..882d7762f2 100644 --- a/api/core/workflow/enums.py +++ b/api/core/workflow/enums.py @@ -22,6 +22,7 @@ class SystemVariableKey(StrEnum): APP_ID = "app_id" WORKFLOW_ID = "workflow_id" WORKFLOW_EXECUTION_ID = "workflow_run_id" + TIMESTAMP = "timestamp" # RAG Pipeline DOCUMENT_ID = "document_id" ORIGINAL_DOCUMENT_ID = "original_document_id" @@ -63,11 +64,21 @@ class NodeType(StrEnum): TRIGGER_PLUGIN = "trigger-plugin" HUMAN_INPUT = "human-input" + @property + def is_trigger_node(self) -> bool: + """Check if this node type is a trigger node.""" + return self in [ + NodeType.TRIGGER_WEBHOOK, + NodeType.TRIGGER_SCHEDULE, + NodeType.TRIGGER_PLUGIN, + ] + @property def is_start_node(self) -> bool: """Check if this node type can serve as a workflow entry point.""" return self in [ NodeType.START, + NodeType.DATASOURCE, NodeType.TRIGGER_WEBHOOK, NodeType.TRIGGER_SCHEDULE, NodeType.TRIGGER_PLUGIN, diff --git a/api/core/workflow/graph/graph.py b/api/core/workflow/graph/graph.py index d04724425c..ba5a01fc94 100644 --- a/api/core/workflow/graph/graph.py +++ b/api/core/workflow/graph/graph.py @@ -117,7 +117,7 @@ class Graph: node_type = node_data.get("type") if not isinstance(node_type, str): continue - if node_type in [NodeType.START, NodeType.DATASOURCE]: + if NodeType(node_type).is_start_node: start_node_id = nid break diff --git a/api/core/workflow/nodes/trigger_plugin/trigger_event_node.py b/api/core/workflow/nodes/trigger_plugin/trigger_event_node.py index 367c6852ff..0a70aa8727 100644 --- a/api/core/workflow/nodes/trigger_plugin/trigger_event_node.py +++ b/api/core/workflow/nodes/trigger_plugin/trigger_event_node.py @@ -1,7 +1,7 @@ from collections.abc import Mapping -from copy import deepcopy from typing import Any, Optional +from core.workflow.constants import SYSTEM_VARIABLE_NODE_ID from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus from core.workflow.enums import ErrorStrategy, NodeExecutionType, NodeType from core.workflow.node_events import NodeRunResult @@ -66,7 +66,6 @@ class TriggerEventNode(Node): """ # Get trigger data passed when workflow was triggered - inputs = deepcopy(self.graph_runtime_state.variable_pool.user_inputs) metadata = { WorkflowNodeExecutionMetadataKey.TRIGGER_INFO: { "provider_id": self._node_data.provider_id, @@ -74,9 +73,17 @@ class TriggerEventNode(Node): "plugin_unique_identifier": self._node_data.plugin_unique_identifier, }, } + node_inputs = dict(self.graph_runtime_state.variable_pool.user_inputs) + system_inputs = self.graph_runtime_state.variable_pool.system_variables.to_dict() + + # TODO: System variables should be directly accessible, no need for special handling + # Set system variables as node outputs. + for var in system_inputs: + node_inputs[SYSTEM_VARIABLE_NODE_ID + "." + var] = system_inputs[var] + outputs = dict(node_inputs) return NodeRunResult( status=WorkflowNodeExecutionStatus.SUCCEEDED, - inputs={}, - outputs=inputs, + inputs=node_inputs, + outputs=outputs, metadata=metadata, ) diff --git a/api/core/workflow/nodes/trigger_schedule/trigger_schedule_node.py b/api/core/workflow/nodes/trigger_schedule/trigger_schedule_node.py index 4fa50f1ead..55d66ac907 100644 --- a/api/core/workflow/nodes/trigger_schedule/trigger_schedule_node.py +++ b/api/core/workflow/nodes/trigger_schedule/trigger_schedule_node.py @@ -1,7 +1,7 @@ from collections.abc import Mapping -from datetime import UTC, datetime from typing import Any, Optional +from core.workflow.constants import SYSTEM_VARIABLE_NODE_ID from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.enums import ErrorStrategy, NodeExecutionType, NodeType from core.workflow.node_events import NodeRunResult @@ -54,10 +54,16 @@ class TriggerScheduleNode(Node): } def _run(self) -> NodeRunResult: - current_time = datetime.now(UTC) - node_outputs = {"current_time": current_time.isoformat()} + node_inputs = dict(self.graph_runtime_state.variable_pool.user_inputs) + system_inputs = self.graph_runtime_state.variable_pool.system_variables.to_dict() + # TODO: System variables should be directly accessible, no need for special handling + # Set system variables as node outputs. + for var in system_inputs: + node_inputs[SYSTEM_VARIABLE_NODE_ID + "." + var] = system_inputs[var] + outputs = dict(node_inputs) return NodeRunResult( status=WorkflowNodeExecutionStatus.SUCCEEDED, - outputs=node_outputs, + inputs=node_inputs, + outputs=outputs, ) diff --git a/api/core/workflow/system_variable.py b/api/core/workflow/system_variable.py index 6716e745cd..d35d6db7ec 100644 --- a/api/core/workflow/system_variable.py +++ b/api/core/workflow/system_variable.py @@ -28,6 +28,8 @@ class SystemVariable(BaseModel): app_id: str | None = None workflow_id: str | None = None + timestamp: int | None = None + files: Sequence[File] = Field(default_factory=list) # NOTE: The `workflow_execution_id` field was previously named `workflow_run_id`. @@ -107,4 +109,6 @@ class SystemVariable(BaseModel): d[SystemVariableKey.DATASOURCE_INFO] = self.datasource_info if self.invoke_from is not None: d[SystemVariableKey.INVOKE_FROM] = self.invoke_from + if self.timestamp is not None: + d[SystemVariableKey.TIMESTAMP] = self.timestamp return d diff --git a/api/services/workflow_draft_variable_service.py b/api/services/workflow_draft_variable_service.py index 5e63a83bb1..2690b55dbc 100644 --- a/api/services/workflow_draft_variable_service.py +++ b/api/services/workflow_draft_variable_service.py @@ -1026,7 +1026,7 @@ class DraftVariableSaver: return if self._node_type == NodeType.VARIABLE_ASSIGNER: draft_vars = self._build_from_variable_assigner_mapping(process_data=process_data) - elif self._node_type == NodeType.START: + elif self._node_type == NodeType.START or self._node_type.is_trigger_node: draft_vars = self._build_variables_from_start_mapping(outputs) else: draft_vars = self._build_variables_from_mapping(outputs) diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index aa53e27ece..c61e76402f 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -1007,10 +1007,11 @@ def _setup_variable_pool( conversation_variables: list[Variable], ): # Only inject system variables for START node type. - if node_type == NodeType.START: + if node_type == NodeType.START or node_type.is_trigger_node: system_variable = SystemVariable( user_id=user_id, app_id=workflow.app_id, + timestamp=int(naive_utc_now().timestamp()), workflow_id=workflow.id, files=files or [], workflow_execution_id=str(uuid.uuid4()),