Restore trigger provider metadata on start events

This commit is contained in:
-LAN- 2026-03-14 18:38:25 +08:00
parent 9e8a4c8a71
commit 2fd4e9e259
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF
9 changed files with 122 additions and 10 deletions

View File

@ -462,31 +462,33 @@ class WorkflowResponseConverter:
agent_strategy=event.agent_strategy, agent_strategy=event.agent_strategy,
), ),
) )
response.data.extras.update(event.extras)
provider_id = str(response.data.extras.get("provider_id") or event.provider_id)
try: try:
if event.node_type == NodeType.TOOL: if event.node_type == NodeType.TOOL:
response.data.extras["icon"] = ToolManager.get_tool_icon( response.data.extras["icon"] = ToolManager.get_tool_icon(
tenant_id=self._application_generate_entity.app_config.tenant_id, tenant_id=self._application_generate_entity.app_config.tenant_id,
provider_type=ToolProviderType(event.provider_type), provider_type=ToolProviderType(event.provider_type),
provider_id=event.provider_id, provider_id=provider_id,
) )
elif event.node_type == NodeType.DATASOURCE: elif event.node_type == NodeType.DATASOURCE:
manager = PluginDatasourceManager() manager = PluginDatasourceManager()
provider_entity = manager.fetch_datasource_provider( provider_entity = manager.fetch_datasource_provider(
self._application_generate_entity.app_config.tenant_id, self._application_generate_entity.app_config.tenant_id,
event.provider_id, provider_id,
) )
response.data.extras["icon"] = provider_entity.declaration.identity.generate_datasource_icon_url( response.data.extras["icon"] = provider_entity.declaration.identity.generate_datasource_icon_url(
self._application_generate_entity.app_config.tenant_id self._application_generate_entity.app_config.tenant_id
) )
elif event.node_type == NodeType.TRIGGER_PLUGIN: elif event.node_type == NodeType.TRIGGER_PLUGIN and provider_id:
response.data.extras["icon"] = TriggerManager.get_trigger_plugin_icon( response.data.extras["icon"] = TriggerManager.get_trigger_plugin_icon(
self._application_generate_entity.app_config.tenant_id, self._application_generate_entity.app_config.tenant_id,
event.provider_id, provider_id,
) )
except Exception: except Exception:
# metadata fetch may fail, for example, the plugin daemon is down or plugin is uninstalled. # metadata fetch may fail, for example, the plugin daemon is down or plugin is uninstalled.
logger.warning("failed to fetch icon for %s", event.provider_id) logger.warning("failed to fetch icon for %s", provider_id)
return response return response
@ -595,6 +597,7 @@ class WorkflowResponseConverter:
iteration_id=event.in_iteration_id, iteration_id=event.in_iteration_id,
loop_id=event.in_loop_id, loop_id=event.in_loop_id,
retry_index=event.retry_index, retry_index=event.retry_index,
extras=dict(event.extras),
), ),
) )

View File

@ -392,6 +392,7 @@ class WorkflowBasedAppRunner:
process_data = node_run_result.process_data process_data = node_run_result.process_data
outputs = node_run_result.outputs outputs = node_run_result.outputs
execution_metadata = node_run_result.metadata execution_metadata = node_run_result.metadata
provider_id = event.provider_id or str(event.extras.get("provider_id", ""))
self._publish_event( self._publish_event(
QueueNodeRetryEvent( QueueNodeRetryEvent(
node_execution_id=event.id, node_execution_id=event.id,
@ -407,11 +408,13 @@ class WorkflowBasedAppRunner:
error=event.error, error=event.error,
execution_metadata=execution_metadata, execution_metadata=execution_metadata,
retry_index=event.retry_index, retry_index=event.retry_index,
extras=dict(event.extras),
provider_type=event.provider_type, provider_type=event.provider_type,
provider_id=event.provider_id, provider_id=provider_id,
) )
) )
elif isinstance(event, NodeRunStartedEvent): elif isinstance(event, NodeRunStartedEvent):
provider_id = event.provider_id or str(event.extras.get("provider_id", ""))
self._publish_event( self._publish_event(
QueueNodeStartedEvent( QueueNodeStartedEvent(
node_execution_id=event.id, node_execution_id=event.id,
@ -422,8 +425,9 @@ class WorkflowBasedAppRunner:
in_iteration_id=event.in_iteration_id, in_iteration_id=event.in_iteration_id,
in_loop_id=event.in_loop_id, in_loop_id=event.in_loop_id,
agent_strategy=event.agent_strategy, agent_strategy=event.agent_strategy,
extras=dict(event.extras),
provider_type=event.provider_type, provider_type=event.provider_type,
provider_id=event.provider_id, provider_id=provider_id,
) )
) )
elif isinstance(event, NodeRunSucceededEvent): elif isinstance(event, NodeRunSucceededEvent):

View File

@ -315,8 +315,9 @@ class QueueNodeStartedEvent(AppQueueEvent):
in_loop_id: str | None = None in_loop_id: str | None = None
start_at: datetime start_at: datetime
agent_strategy: AgentNodeStrategyInit | None = None agent_strategy: AgentNodeStrategyInit | None = None
extras: dict[str, object] = Field(default_factory=dict)
# FIXME(-LAN-): only for ToolNode, need to refactor # Legacy provider fields kept for existing start-event consumers.
provider_type: str # should be a core.tools.entities.tool_entities.ToolProviderType provider_type: str # should be a core.tools.entities.tool_entities.ToolProviderType
provider_id: str provider_id: str

View File

@ -472,6 +472,7 @@ class NodeRetryStreamResponse(StreamResponse):
iteration_id: str | None = None iteration_id: str | None = None
loop_id: str | None = None loop_id: str | None = None
retry_index: int = 0 retry_index: int = 0
extras: dict[str, object] = Field(default_factory=dict)
event: StreamEvent = StreamEvent.NODE_RETRY event: StreamEvent = StreamEvent.NODE_RETRY
workflow_run_id: str workflow_run_id: str
@ -502,6 +503,7 @@ class NodeRetryStreamResponse(StreamResponse):
"iteration_id": self.data.iteration_id, "iteration_id": self.data.iteration_id,
"loop_id": self.data.loop_id, "loop_id": self.data.loop_id,
"retry_index": self.data.retry_index, "retry_index": self.data.retry_index,
"extras": {},
}, },
} }

View File

@ -3,6 +3,7 @@ from collections.abc import Mapping
from dify_graph.constants import SYSTEM_VARIABLE_NODE_ID from dify_graph.constants import SYSTEM_VARIABLE_NODE_ID
from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus from dify_graph.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from dify_graph.enums import NodeExecutionType, NodeType from dify_graph.enums import NodeExecutionType, NodeType
from dify_graph.graph_events import NodeRunStartedEvent
from dify_graph.node_events import NodeRunResult from dify_graph.node_events import NodeRunResult
from dify_graph.nodes.base.node import Node from dify_graph.nodes.base.node import Node
@ -32,6 +33,11 @@ class TriggerEventNode(Node[TriggerEventNodeData]):
def version(cls) -> str: def version(cls) -> str:
return "1" return "1"
def customize_start_event(self, event: NodeRunStartedEvent) -> None:
provider_id = self.node_data.provider_id
event.provider_id = provider_id
event.extras["provider_id"] = provider_id
def _run(self) -> NodeRunResult: def _run(self) -> NodeRunResult:
""" """
Run the plugin trigger node. Run the plugin trigger node.

View File

@ -15,8 +15,9 @@ class NodeRunStartedEvent(GraphNodeEventBase):
predecessor_node_id: str | None = None predecessor_node_id: str | None = None
agent_strategy: AgentNodeStrategyInit | None = None agent_strategy: AgentNodeStrategyInit | None = None
start_at: datetime = Field(..., description="node start time") start_at: datetime = Field(..., description="node start time")
extras: dict[str, object] = Field(default_factory=dict)
# FIXME(-LAN-): only for ToolNode # Legacy provider fields kept for existing start-event consumers.
provider_type: str = "" provider_type: str = ""
provider_id: str = "" provider_id: str = ""

View File

@ -273,6 +273,10 @@ class Node(Generic[NodeDataT]):
"""Optional hook for subclasses requiring extra initialization.""" """Optional hook for subclasses requiring extra initialization."""
return return
def customize_start_event(self, event: NodeRunStartedEvent) -> None:
"""Optional hook for subclasses to attach start-event metadata or extras."""
return
@property @property
def graph_init_params(self) -> GraphInitParams: def graph_init_params(self) -> GraphInitParams:
return self._graph_init_params return self._graph_init_params
@ -388,6 +392,8 @@ class Node(Generic[NodeDataT]):
icon=self.agent_strategy_icon, icon=self.agent_strategy_icon,
) )
self.customize_start_event(start_event)
# === # ===
yield start_event yield start_event

View File

@ -6,7 +6,7 @@ import uuid
from collections.abc import Mapping from collections.abc import Mapping
from dataclasses import dataclass from dataclasses import dataclass
from typing import Any from typing import Any
from unittest.mock import Mock from unittest.mock import Mock, patch
import pytest import pytest
@ -737,6 +737,33 @@ class TestWorkflowResponseConverterServiceApiTruncation:
assert not response.data.inputs_truncated assert not response.data.inputs_truncated
assert not response.data.process_data_truncated assert not response.data.process_data_truncated
assert not response.data.outputs_truncated assert not response.data.outputs_truncated
assert response.data.extras == {}
def test_trigger_plugin_start_event_uses_extras_provider_id_for_icon(self):
converter = self.create_test_converter(InvokeFrom.WEB_APP)
event = QueueNodeStartedEvent(
node_execution_id=str(uuid.uuid4()),
node_id="trigger-node",
node_title="Trigger Node",
node_type=NodeType.TRIGGER_PLUGIN,
start_at=naive_utc_now(),
in_iteration_id=None,
in_loop_id=None,
provider_type="",
provider_id="",
extras={"provider_id": "provider-1"},
)
with patch(
"core.app.apps.common.workflow_response_converter.TriggerManager.get_trigger_plugin_icon",
return_value="https://example.com/icon.png",
) as get_trigger_plugin_icon:
response = converter.workflow_node_start_to_stream_response(event=event, task_id="task-1")
assert response is not None
assert response.data.extras["provider_id"] == "provider-1"
assert response.data.extras["icon"] == "https://example.com/icon.png"
get_trigger_plugin_icon.assert_called_once_with("test_tenant", "provider-1")
def test_service_api_iteration_events_no_truncation(self): def test_service_api_iteration_events_no_truncation(self):
"""Test that Service API doesn't truncate iteration events.""" """Test that Service API doesn't truncate iteration events."""

View File

@ -0,0 +1,62 @@
from core.app.entities.app_invoke_entities import InvokeFrom, UserFrom
from core.workflow.nodes.trigger_plugin.entities import TriggerEventNodeData
from core.workflow.nodes.trigger_plugin.trigger_event_node import TriggerEventNode
from dify_graph.entities.graph_init_params import DIFY_RUN_CONTEXT_KEY, GraphInitParams
from dify_graph.graph_events import NodeRunStartedEvent
from dify_graph.runtime.graph_runtime_state import GraphRuntimeState
from dify_graph.runtime.variable_pool import VariablePool
from dify_graph.system_variable import SystemVariable
def create_trigger_event_node(node_data: TriggerEventNodeData) -> TriggerEventNode:
node_config = {
"id": "trigger-node",
"data": node_data.model_dump(),
}
graph_init_params = GraphInitParams(
workflow_id="workflow-1",
graph_config={},
run_context={
DIFY_RUN_CONTEXT_KEY: {
"tenant_id": "tenant-1",
"app_id": "app-1",
"user_id": "user-1",
"user_from": UserFrom.ACCOUNT,
"invoke_from": InvokeFrom.SERVICE_API,
}
},
call_depth=0,
)
runtime_state = GraphRuntimeState(
variable_pool=VariablePool(
system_variables=SystemVariable.default(),
user_inputs={},
),
start_at=0,
)
return TriggerEventNode(
id="trigger-node",
config=node_config,
graph_init_params=graph_init_params,
graph_runtime_state=runtime_state,
)
def test_trigger_event_start_event_carries_provider_metadata() -> None:
node = create_trigger_event_node(
TriggerEventNodeData(
title="Plugin Trigger",
provider_id="provider-1",
plugin_id="plugin-1",
event_name="event.created",
subscription_id="subscription-1",
plugin_unique_identifier="plugin/provider",
event_parameters={},
)
)
start_event = next(node.run())
assert isinstance(start_event, NodeRunStartedEvent)
assert start_event.provider_id == "provider-1"
assert start_event.extras == {"provider_id": "provider-1"}