diff --git a/api/core/workflow/entities/workflow_node_execution.py b/api/core/workflow/entities/workflow_node_execution.py index 09a408f4d7..dc0b53d546 100644 --- a/api/core/workflow/entities/workflow_node_execution.py +++ b/api/core/workflow/entities/workflow_node_execution.py @@ -25,6 +25,7 @@ class WorkflowNodeExecutionMetadataKey(StrEnum): TOTAL_PRICE = "total_price" CURRENCY = "currency" TOOL_INFO = "tool_info" + TRIGGER_INFO = "trigger_info" AGENT_LOG = "agent_log" ITERATION_ID = "iteration_id" ITERATION_INDEX = "iteration_index" diff --git a/api/core/workflow/nodes/trigger_plugin/entities.py b/api/core/workflow/nodes/trigger_plugin/entities.py index 0ecda92bbb..afe37eb83f 100644 --- a/api/core/workflow/nodes/trigger_plugin/entities.py +++ b/api/core/workflow/nodes/trigger_plugin/entities.py @@ -15,6 +15,7 @@ class PluginTriggerData(BaseNodeData): provider_id: str = Field(..., description="Provider ID") trigger_name: str = Field(..., description="Trigger name") subscription_id: str = Field(..., description="Subscription ID") + plugin_unique_identifier: str = Field(..., description="Plugin unique identifier") parameters: dict[str, Any] = Field(default_factory=dict, description="Trigger parameters") # Error handling diff --git a/api/core/workflow/nodes/trigger_plugin/trigger_plugin_node.py b/api/core/workflow/nodes/trigger_plugin/trigger_plugin_node.py index 37341401ce..32d22add7c 100644 --- a/api/core/workflow/nodes/trigger_plugin/trigger_plugin_node.py +++ b/api/core/workflow/nodes/trigger_plugin/trigger_plugin_node.py @@ -2,11 +2,12 @@ from collections.abc import Mapping from typing import Any, Optional from core.plugin.entities.plugin import TriggerProviderID +from core.plugin.impl.exc import PluginDaemonClientSideError, PluginInvokeError from core.plugin.utils.http_parser import deserialize_request from core.trigger.entities.api_entities import TriggerProviderSubscriptionApiEntity from core.trigger.trigger_manager import TriggerManager from core.workflow.entities.node_entities import NodeRunResult -from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus +from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus from core.workflow.nodes.base import BaseNode from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig from core.workflow.nodes.enums import ErrorStrategy, NodeType @@ -69,6 +70,14 @@ class TriggerPluginNode(BaseNode): # Get trigger data passed when workflow was triggered trigger_inputs = dict(self.graph_runtime_state.variable_pool.user_inputs) + metadata = { + WorkflowNodeExecutionMetadataKey.TRIGGER_INFO: { + **trigger_inputs, + "provider_id": self._node_data.provider_id, + "trigger_name": self._node_data.trigger_name, + "plugin_unique_identifier": self._node_data.plugin_unique_identifier, + }, + } request_id = trigger_inputs.get("request_id") trigger_name = trigger_inputs.get("trigger_name", "") @@ -80,14 +89,24 @@ class TriggerPluginNode(BaseNode): inputs=trigger_inputs, outputs={"error": "No request ID or subscription ID available"}, ) - try: subscription: TriggerProviderSubscriptionApiEntity | None = TriggerProviderService.get_subscription_by_id( tenant_id=self.tenant_id, subscription_id=subscription_id ) if not subscription: - raise ValueError(f"Subscription {subscription_id} not found") + return NodeRunResult( + status=WorkflowNodeExecutionStatus.FAILED, + inputs=trigger_inputs, + outputs={"error": f"Invalid subscription {subscription_id} not found"}, + ) + except Exception as e: + return NodeRunResult( + status=WorkflowNodeExecutionStatus.FAILED, + inputs=trigger_inputs, + outputs={"error": f"Failed to get subscription: {str(e)}"}, + ) + try: request = deserialize_request(storage.load_once(f"triggers/{request_id}")) parameters = self._node_data.parameters if hasattr(self, "_node_data") and self._node_data else {} invoke_response = TriggerManager.invoke_trigger( @@ -102,9 +121,31 @@ class TriggerPluginNode(BaseNode): ) outputs = invoke_response.event.variables or {} return NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED, inputs=trigger_inputs, outputs=outputs) + except PluginInvokeError as e: + return NodeRunResult( + status=WorkflowNodeExecutionStatus.FAILED, + inputs=trigger_inputs, + metadata=metadata, + error="An error occurred in the plugin, " + f"please contact the author of {subscription.provider} for help, " + f"error type: {e.get_error_type()}, " + f"error details: {e.get_error_message()}", + error_type=type(e).__name__, + ) + except PluginDaemonClientSideError as e: + return NodeRunResult( + status=WorkflowNodeExecutionStatus.FAILED, + inputs=trigger_inputs, + metadata=metadata, + error=f"Failed to invoke trigger, error: {e.description}", + error_type=type(e).__name__, + ) + except Exception as e: return NodeRunResult( status=WorkflowNodeExecutionStatus.FAILED, inputs=trigger_inputs, - outputs={"error": f"Failed to invoke trigger: {str(e)}", "request_id": request_id}, + metadata=metadata, + error=f"Failed to invoke trigger: {str(e)}", + error_type=type(e).__name__, ) diff --git a/api/services/trigger_service.py b/api/services/trigger_service.py index 488ec14805..094d05f40f 100644 --- a/api/services/trigger_service.py +++ b/api/services/trigger_service.py @@ -18,7 +18,7 @@ from models.trigger import TriggerSubscription from models.workflow import Workflow, WorkflowPluginTrigger from services.async_workflow_service import AsyncWorkflowService from services.trigger.trigger_provider_service import TriggerProviderService -from services.workflow.entities import PluginTriggerData +from services.workflow.entities import PluginTriggerData, PluginTriggerDispatchData logger = logging.getLogger(__name__) @@ -150,8 +150,7 @@ class TriggerService: # Production dispatch from tasks.trigger_processing_tasks import dispatch_triggered_workflows_async - - dispatch_triggered_workflows_async( + plugin_trigger_dispatch_data = PluginTriggerDispatchData( endpoint_id=endpoint_id, provider_id=subscription.provider_id, subscription_id=subscription.id, @@ -159,6 +158,8 @@ class TriggerService: triggers=list(dispatch_response.triggers), request_id=request_id, ) + dispatch_data = plugin_trigger_dispatch_data.model_dump(mode="json") + dispatch_triggered_workflows_async.delay(dispatch_data) logger.info( "Queued async dispatching for %d triggers on endpoint %s with request_id %s", diff --git a/api/services/workflow/entities.py b/api/services/workflow/entities.py index 2a199642b2..f2f440bf38 100644 --- a/api/services/workflow/entities.py +++ b/api/services/workflow/entities.py @@ -57,6 +57,15 @@ class PluginTriggerData(TriggerData): plugin_id: str endpoint_id: str +class PluginTriggerDispatchData(BaseModel): + """Plugin trigger dispatch data for Celery tasks""" + + endpoint_id: str + provider_id: str + subscription_id: str + timestamp: int + triggers: list[str] + request_id: str class WorkflowTaskData(BaseModel): """Lightweight data structure for Celery workflow tasks""" diff --git a/api/tasks/trigger_processing_tasks.py b/api/tasks/trigger_processing_tasks.py index 798c29685e..73b5ca944b 100644 --- a/api/tasks/trigger_processing_tasks.py +++ b/api/tasks/trigger_processing_tasks.py @@ -17,6 +17,7 @@ from extensions.ext_storage import storage from models.trigger import TriggerSubscription from services.trigger_debug_service import TriggerDebugService from services.trigger_service import TriggerService +from services.workflow.entities import PluginTriggerDispatchData logger = logging.getLogger(__name__) @@ -24,15 +25,9 @@ logger = logging.getLogger(__name__) TRIGGER_QUEUE = "triggered_workflow_dispatcher" -@shared_task(queue=TRIGGER_QUEUE, bind=True, max_retries=3) +@shared_task(queue=TRIGGER_QUEUE) def dispatch_triggered_workflows_async( - self, - endpoint_id: str, - provider_id: str, - subscription_id: str, - timestamp: int, - triggers: list[str], - request_id: str, + dispatch_data: dict, ) -> dict: """ Dispatch triggers asynchronously. @@ -48,6 +43,16 @@ def dispatch_triggered_workflows_async( Returns: dict: Execution result with status and dispatched trigger count """ + dispatch_params: PluginTriggerDispatchData = PluginTriggerDispatchData.model_validate( + dispatch_data + ) + endpoint_id = dispatch_params.endpoint_id + provider_id = dispatch_params.provider_id + subscription_id = dispatch_params.subscription_id + timestamp = dispatch_params.timestamp + triggers = dispatch_params.triggers + request_id = dispatch_params.request_id + try: logger.info( "Starting async trigger dispatching for endpoint=%s, triggers=%s, request_id=%s, timestamp=%s", @@ -141,17 +146,11 @@ def dispatch_triggered_workflows_async( except Exception as e: logger.exception( - "Error in async trigger dispatching for endpoint %s", + "Error in async trigger dispatching for endpoint %s data %s", endpoint_id, + dispatch_data, ) - # Retry the task if not exceeded max retries - if self.request.retries < self.max_retries: - raise self.retry(exc=e, countdown=60 * (self.request.retries + 1)) - - # Note: Stored request is not deleted even on failure. See comment above for cleanup strategy. - return { "status": "failed", "error": str(e), - "retries": self.request.retries, } diff --git a/web/app/components/workflow/block-selector/trigger-plugin/action-item.tsx b/web/app/components/workflow/block-selector/trigger-plugin/action-item.tsx index 81e63e271e..92306c7d8e 100644 --- a/web/app/components/workflow/block-selector/trigger-plugin/action-item.tsx +++ b/web/app/components/workflow/block-selector/trigger-plugin/action-item.tsx @@ -66,6 +66,7 @@ const TriggerPluginActionItem: FC = ({ trigger_name: payload.name, trigger_label: payload.label[language], trigger_description: payload.description[language], + plugin_unique_identifier: provider.plugin_unique_identifier, title: payload.label[language], is_team_authorization: provider.is_team_authorization, output_schema: payload.output_schema || {}, diff --git a/web/app/components/workflow/block-selector/types.ts b/web/app/components/workflow/block-selector/types.ts index 873e4335e3..bdbd24f7b3 100644 --- a/web/app/components/workflow/block-selector/types.ts +++ b/web/app/components/workflow/block-selector/types.ts @@ -35,11 +35,12 @@ export type TriggerDefaultValue = PluginDefaultValue & { trigger_label: string trigger_description: string title: string + plugin_unique_identifier: string is_team_authorization: boolean params: Record paramSchemas: Record[] output_schema: Record - credential_id?: string + subscription_id?: string meta?: PluginMeta } @@ -149,7 +150,7 @@ export type TriggerProviderApiEntity = { icon_dark?: string tags: string[] plugin_id?: string - plugin_unique_identifier?: string + plugin_unique_identifier: string credentials_schema: TriggerCredentialField[] oauth_client_schema: TriggerCredentialField[] subscription_schema: TriggerSubscriptionSchema @@ -160,6 +161,7 @@ export type TriggerProviderApiEntity = { export type TriggerWithProvider = Collection & { triggers: Trigger[] meta: PluginMeta + plugin_unique_identifier: string credentials_schema?: TriggerCredentialField[] oauth_client_schema?: TriggerCredentialField[] subscription_schema?: TriggerSubscriptionSchema diff --git a/web/service/use-triggers.ts b/web/service/use-triggers.ts index dc0d72bc96..c4193ae650 100644 --- a/web/service/use-triggers.ts +++ b/web/service/use-triggers.ts @@ -32,6 +32,7 @@ const convertToTriggerWithProvider = (provider: TriggerProviderApiEntity): Trigg allow_delete: false, labels: provider.tags || [], plugin_id: provider.plugin_id, + plugin_unique_identifier: provider.plugin_unique_identifier || '', triggers: provider.triggers.map(trigger => ({ name: trigger.name, author: provider.author,