feat(trigger): system variables for trigger nodes

Added a timestamp field to the SystemVariable model and updated the WorkflowAppRunner to include the current timestamp during execution. Enhanced node type checks to recognize trigger nodes in various services, ensuring proper handling of system variables and node outputs in TriggerEventNode and TriggerScheduleNode. This improves the overall workflow execution context and maintains consistency across node types.
This commit is contained in:
Harry 2025-10-29 18:06:41 +08:00
parent db2c6678e4
commit fb12f31df2
8 changed files with 42 additions and 11 deletions

View File

@ -19,6 +19,7 @@ from core.workflow.system_variable import SystemVariable
from core.workflow.variable_loader import VariableLoader
from core.workflow.workflow_entry import WorkflowEntry
from extensions.ext_redis import redis_client
from libs.datetime_utils import naive_utc_now
from models.enums import UserFrom
from models.workflow import Workflow
@ -67,6 +68,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
files=self.application_generate_entity.files,
user_id=self._sys_user_id,
app_id=app_config.app_id,
timestamp=int(naive_utc_now().timestamp()),
workflow_id=app_config.workflow_id,
workflow_execution_id=self.application_generate_entity.workflow_execution_id,
)

View File

@ -22,6 +22,7 @@ class SystemVariableKey(StrEnum):
APP_ID = "app_id"
WORKFLOW_ID = "workflow_id"
WORKFLOW_EXECUTION_ID = "workflow_run_id"
TIMESTAMP = "timestamp"
# RAG Pipeline
DOCUMENT_ID = "document_id"
ORIGINAL_DOCUMENT_ID = "original_document_id"
@ -63,11 +64,21 @@ class NodeType(StrEnum):
TRIGGER_PLUGIN = "trigger-plugin"
HUMAN_INPUT = "human-input"
@property
def is_trigger_node(self) -> bool:
"""Check if this node type is a trigger node."""
return self in [
NodeType.TRIGGER_WEBHOOK,
NodeType.TRIGGER_SCHEDULE,
NodeType.TRIGGER_PLUGIN,
]
@property
def is_start_node(self) -> bool:
"""Check if this node type can serve as a workflow entry point."""
return self in [
NodeType.START,
NodeType.DATASOURCE,
NodeType.TRIGGER_WEBHOOK,
NodeType.TRIGGER_SCHEDULE,
NodeType.TRIGGER_PLUGIN,

View File

@ -117,7 +117,7 @@ class Graph:
node_type = node_data.get("type")
if not isinstance(node_type, str):
continue
if node_type in [NodeType.START, NodeType.DATASOURCE]:
if NodeType(node_type).is_start_node:
start_node_id = nid
break

View File

@ -1,7 +1,7 @@
from collections.abc import Mapping
from copy import deepcopy
from typing import Any, Optional
from core.workflow.constants import SYSTEM_VARIABLE_NODE_ID
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from core.workflow.enums import ErrorStrategy, NodeExecutionType, NodeType
from core.workflow.node_events import NodeRunResult
@ -66,7 +66,6 @@ class TriggerEventNode(Node):
"""
# Get trigger data passed when workflow was triggered
inputs = deepcopy(self.graph_runtime_state.variable_pool.user_inputs)
metadata = {
WorkflowNodeExecutionMetadataKey.TRIGGER_INFO: {
"provider_id": self._node_data.provider_id,
@ -74,9 +73,17 @@ class TriggerEventNode(Node):
"plugin_unique_identifier": self._node_data.plugin_unique_identifier,
},
}
node_inputs = dict(self.graph_runtime_state.variable_pool.user_inputs)
system_inputs = self.graph_runtime_state.variable_pool.system_variables.to_dict()
# TODO: System variables should be directly accessible, no need for special handling
# Set system variables as node outputs.
for var in system_inputs:
node_inputs[SYSTEM_VARIABLE_NODE_ID + "." + var] = system_inputs[var]
outputs = dict(node_inputs)
return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
inputs={},
outputs=inputs,
inputs=node_inputs,
outputs=outputs,
metadata=metadata,
)

View File

@ -1,7 +1,7 @@
from collections.abc import Mapping
from datetime import UTC, datetime
from typing import Any, Optional
from core.workflow.constants import SYSTEM_VARIABLE_NODE_ID
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from core.workflow.enums import ErrorStrategy, NodeExecutionType, NodeType
from core.workflow.node_events import NodeRunResult
@ -54,10 +54,16 @@ class TriggerScheduleNode(Node):
}
def _run(self) -> NodeRunResult:
current_time = datetime.now(UTC)
node_outputs = {"current_time": current_time.isoformat()}
node_inputs = dict(self.graph_runtime_state.variable_pool.user_inputs)
system_inputs = self.graph_runtime_state.variable_pool.system_variables.to_dict()
# TODO: System variables should be directly accessible, no need for special handling
# Set system variables as node outputs.
for var in system_inputs:
node_inputs[SYSTEM_VARIABLE_NODE_ID + "." + var] = system_inputs[var]
outputs = dict(node_inputs)
return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
outputs=node_outputs,
inputs=node_inputs,
outputs=outputs,
)

View File

@ -28,6 +28,8 @@ class SystemVariable(BaseModel):
app_id: str | None = None
workflow_id: str | None = None
timestamp: int | None = None
files: Sequence[File] = Field(default_factory=list)
# NOTE: The `workflow_execution_id` field was previously named `workflow_run_id`.
@ -107,4 +109,6 @@ class SystemVariable(BaseModel):
d[SystemVariableKey.DATASOURCE_INFO] = self.datasource_info
if self.invoke_from is not None:
d[SystemVariableKey.INVOKE_FROM] = self.invoke_from
if self.timestamp is not None:
d[SystemVariableKey.TIMESTAMP] = self.timestamp
return d

View File

@ -1026,7 +1026,7 @@ class DraftVariableSaver:
return
if self._node_type == NodeType.VARIABLE_ASSIGNER:
draft_vars = self._build_from_variable_assigner_mapping(process_data=process_data)
elif self._node_type == NodeType.START:
elif self._node_type == NodeType.START or self._node_type.is_trigger_node:
draft_vars = self._build_variables_from_start_mapping(outputs)
else:
draft_vars = self._build_variables_from_mapping(outputs)

View File

@ -1007,10 +1007,11 @@ def _setup_variable_pool(
conversation_variables: list[Variable],
):
# Only inject system variables for START node type.
if node_type == NodeType.START:
if node_type == NodeType.START or node_type.is_trigger_node:
system_variable = SystemVariable(
user_id=user_id,
app_id=workflow.app_id,
timestamp=int(naive_utc_now().timestamp()),
workflow_id=workflow.id,
files=files or [],
workflow_execution_id=str(uuid.uuid4()),