diff --git a/api/core/app/apps/common/workflow_response_converter.py b/api/core/app/apps/common/workflow_response_converter.py index 67dc9909a1..72103ae9eb 100644 --- a/api/core/app/apps/common/workflow_response_converter.py +++ b/api/core/app/apps/common/workflow_response_converter.py @@ -462,31 +462,33 @@ class WorkflowResponseConverter: agent_strategy=event.agent_strategy, ), ) + response.data.extras.update(event.extras) + provider_id = str(response.data.extras.get("provider_id") or event.provider_id) try: if event.node_type == NodeType.TOOL: response.data.extras["icon"] = ToolManager.get_tool_icon( tenant_id=self._application_generate_entity.app_config.tenant_id, provider_type=ToolProviderType(event.provider_type), - provider_id=event.provider_id, + provider_id=provider_id, ) elif event.node_type == NodeType.DATASOURCE: manager = PluginDatasourceManager() provider_entity = manager.fetch_datasource_provider( self._application_generate_entity.app_config.tenant_id, - event.provider_id, + provider_id, ) response.data.extras["icon"] = provider_entity.declaration.identity.generate_datasource_icon_url( self._application_generate_entity.app_config.tenant_id ) - elif event.node_type == NodeType.TRIGGER_PLUGIN: + elif event.node_type == NodeType.TRIGGER_PLUGIN and provider_id: response.data.extras["icon"] = TriggerManager.get_trigger_plugin_icon( self._application_generate_entity.app_config.tenant_id, - event.provider_id, + provider_id, ) except Exception: # metadata fetch may fail, for example, the plugin daemon is down or plugin is uninstalled. - logger.warning("failed to fetch icon for %s", event.provider_id) + logger.warning("failed to fetch icon for %s", provider_id) return response @@ -595,6 +597,7 @@ class WorkflowResponseConverter: iteration_id=event.in_iteration_id, loop_id=event.in_loop_id, retry_index=event.retry_index, + extras=dict(event.extras), ), ) diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index 252faf7e3e..28dab3e400 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -392,6 +392,7 @@ class WorkflowBasedAppRunner: process_data = node_run_result.process_data outputs = node_run_result.outputs execution_metadata = node_run_result.metadata + provider_id = event.provider_id or str(event.extras.get("provider_id", "")) self._publish_event( QueueNodeRetryEvent( node_execution_id=event.id, @@ -407,11 +408,13 @@ class WorkflowBasedAppRunner: error=event.error, execution_metadata=execution_metadata, retry_index=event.retry_index, + extras=dict(event.extras), provider_type=event.provider_type, - provider_id=event.provider_id, + provider_id=provider_id, ) ) elif isinstance(event, NodeRunStartedEvent): + provider_id = event.provider_id or str(event.extras.get("provider_id", "")) self._publish_event( QueueNodeStartedEvent( node_execution_id=event.id, @@ -422,8 +425,9 @@ class WorkflowBasedAppRunner: in_iteration_id=event.in_iteration_id, in_loop_id=event.in_loop_id, agent_strategy=event.agent_strategy, + extras=dict(event.extras), provider_type=event.provider_type, - provider_id=event.provider_id, + provider_id=provider_id, ) ) elif isinstance(event, NodeRunSucceededEvent): diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py index d42df0d1bf..daaf2a6a39 100644 --- a/api/core/app/entities/queue_entities.py +++ b/api/core/app/entities/queue_entities.py @@ -315,8 +315,9 @@ class QueueNodeStartedEvent(AppQueueEvent): in_loop_id: str | None = None start_at: datetime agent_strategy: AgentNodeStrategyInit | None = None + extras: dict[str, object] = Field(default_factory=dict) - # FIXME(-LAN-): only for ToolNode, need to refactor + # Legacy provider fields kept for existing start-event consumers. provider_type: str # should be a core.tools.entities.tool_entities.ToolProviderType provider_id: str diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index b58dae0ff2..9d3d63d712 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -472,6 +472,7 @@ class NodeRetryStreamResponse(StreamResponse): iteration_id: str | None = None loop_id: str | None = None retry_index: int = 0 + extras: dict[str, object] = Field(default_factory=dict) event: StreamEvent = StreamEvent.NODE_RETRY workflow_run_id: str @@ -502,6 +503,7 @@ class NodeRetryStreamResponse(StreamResponse): "iteration_id": self.data.iteration_id, "loop_id": self.data.loop_id, "retry_index": self.data.retry_index, + "extras": {}, }, } diff --git a/api/core/workflow/nodes/trigger_plugin/trigger_event_node.py b/api/core/workflow/nodes/trigger_plugin/trigger_event_node.py index b4f1116f7e..fad4a7c527 100644 --- a/api/core/workflow/nodes/trigger_plugin/trigger_event_node.py +++ b/api/core/workflow/nodes/trigger_plugin/trigger_event_node.py @@ -3,6 +3,7 @@ from collections.abc import Mapping from dify_graph.constants import SYSTEM_VARIABLE_NODE_ID from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus from dify_graph.enums import NodeExecutionType, NodeType +from dify_graph.graph_events import NodeRunStartedEvent from dify_graph.node_events import NodeRunResult from dify_graph.nodes.base.node import Node @@ -32,6 +33,11 @@ class TriggerEventNode(Node[TriggerEventNodeData]): def version(cls) -> str: return "1" + def customize_start_event(self, event: NodeRunStartedEvent) -> None: + provider_id = self.node_data.provider_id + event.provider_id = provider_id + event.extras["provider_id"] = provider_id + def _run(self) -> NodeRunResult: """ Run the plugin trigger node. diff --git a/api/dify_graph/graph_events/node.py b/api/dify_graph/graph_events/node.py index 21ddf80b64..ce2a9730cb 100644 --- a/api/dify_graph/graph_events/node.py +++ b/api/dify_graph/graph_events/node.py @@ -15,8 +15,9 @@ class NodeRunStartedEvent(GraphNodeEventBase): predecessor_node_id: str | None = None agent_strategy: AgentNodeStrategyInit | None = None start_at: datetime = Field(..., description="node start time") + extras: dict[str, object] = Field(default_factory=dict) - # FIXME(-LAN-): only for ToolNode + # Legacy provider fields kept for existing start-event consumers. provider_type: str = "" provider_id: str = "" diff --git a/api/dify_graph/nodes/base/node.py b/api/dify_graph/nodes/base/node.py index e944851d21..648ef9accd 100644 --- a/api/dify_graph/nodes/base/node.py +++ b/api/dify_graph/nodes/base/node.py @@ -273,6 +273,10 @@ class Node(Generic[NodeDataT]): """Optional hook for subclasses requiring extra initialization.""" return + def customize_start_event(self, event: NodeRunStartedEvent) -> None: + """Optional hook for subclasses to attach start-event metadata or extras.""" + return + @property def graph_init_params(self) -> GraphInitParams: return self._graph_init_params @@ -388,6 +392,8 @@ class Node(Generic[NodeDataT]): icon=self.agent_strategy_icon, ) + self.customize_start_event(start_event) + # === yield start_event diff --git a/api/tests/unit_tests/core/app/apps/common/test_workflow_response_converter_truncation.py b/api/tests/unit_tests/core/app/apps/common/test_workflow_response_converter_truncation.py index 69d476bd13..27cecca092 100644 --- a/api/tests/unit_tests/core/app/apps/common/test_workflow_response_converter_truncation.py +++ b/api/tests/unit_tests/core/app/apps/common/test_workflow_response_converter_truncation.py @@ -6,7 +6,7 @@ import uuid from collections.abc import Mapping from dataclasses import dataclass from typing import Any -from unittest.mock import Mock +from unittest.mock import Mock, patch import pytest @@ -737,6 +737,33 @@ class TestWorkflowResponseConverterServiceApiTruncation: assert not response.data.inputs_truncated assert not response.data.process_data_truncated assert not response.data.outputs_truncated + assert response.data.extras == {} + + def test_trigger_plugin_start_event_uses_extras_provider_id_for_icon(self): + converter = self.create_test_converter(InvokeFrom.WEB_APP) + event = QueueNodeStartedEvent( + node_execution_id=str(uuid.uuid4()), + node_id="trigger-node", + node_title="Trigger Node", + node_type=NodeType.TRIGGER_PLUGIN, + start_at=naive_utc_now(), + in_iteration_id=None, + in_loop_id=None, + provider_type="", + provider_id="", + extras={"provider_id": "provider-1"}, + ) + + with patch( + "core.app.apps.common.workflow_response_converter.TriggerManager.get_trigger_plugin_icon", + return_value="https://example.com/icon.png", + ) as get_trigger_plugin_icon: + response = converter.workflow_node_start_to_stream_response(event=event, task_id="task-1") + + assert response is not None + assert response.data.extras["provider_id"] == "provider-1" + assert response.data.extras["icon"] == "https://example.com/icon.png" + get_trigger_plugin_icon.assert_called_once_with("test_tenant", "provider-1") def test_service_api_iteration_events_no_truncation(self): """Test that Service API doesn't truncate iteration events.""" diff --git a/api/tests/unit_tests/core/workflow/nodes/trigger_plugin/test_trigger_event_node.py b/api/tests/unit_tests/core/workflow/nodes/trigger_plugin/test_trigger_event_node.py new file mode 100644 index 0000000000..282c231344 --- /dev/null +++ b/api/tests/unit_tests/core/workflow/nodes/trigger_plugin/test_trigger_event_node.py @@ -0,0 +1,62 @@ +from core.app.entities.app_invoke_entities import InvokeFrom, UserFrom +from core.workflow.nodes.trigger_plugin.entities import TriggerEventNodeData +from core.workflow.nodes.trigger_plugin.trigger_event_node import TriggerEventNode +from dify_graph.entities.graph_init_params import DIFY_RUN_CONTEXT_KEY, GraphInitParams +from dify_graph.graph_events import NodeRunStartedEvent +from dify_graph.runtime.graph_runtime_state import GraphRuntimeState +from dify_graph.runtime.variable_pool import VariablePool +from dify_graph.system_variable import SystemVariable + + +def create_trigger_event_node(node_data: TriggerEventNodeData) -> TriggerEventNode: + node_config = { + "id": "trigger-node", + "data": node_data.model_dump(), + } + graph_init_params = GraphInitParams( + workflow_id="workflow-1", + graph_config={}, + run_context={ + DIFY_RUN_CONTEXT_KEY: { + "tenant_id": "tenant-1", + "app_id": "app-1", + "user_id": "user-1", + "user_from": UserFrom.ACCOUNT, + "invoke_from": InvokeFrom.SERVICE_API, + } + }, + call_depth=0, + ) + runtime_state = GraphRuntimeState( + variable_pool=VariablePool( + system_variables=SystemVariable.default(), + user_inputs={}, + ), + start_at=0, + ) + return TriggerEventNode( + id="trigger-node", + config=node_config, + graph_init_params=graph_init_params, + graph_runtime_state=runtime_state, + ) + + +def test_trigger_event_start_event_carries_provider_metadata() -> None: + node = create_trigger_event_node( + TriggerEventNodeData( + title="Plugin Trigger", + provider_id="provider-1", + plugin_id="plugin-1", + event_name="event.created", + subscription_id="subscription-1", + plugin_unique_identifier="plugin/provider", + event_parameters={}, + ) + ) + + start_event = next(node.run()) + + assert isinstance(start_event, NodeRunStartedEvent) + assert start_event.provider_id == "provider-1" + assert start_event.extras == {"provider_id": "provider-1"}