feat(trigger): enhance trigger plugin data structure and error handling

- Added `plugin_unique_identifier` to `PluginTriggerData` and `TriggerProviderApiEntity` so trigger plugins can be identified unambiguously.
- Introduced `PluginTriggerDispatchData` so the dispatch payload handed to Celery tasks is serialized and validated through a single typed model (sketched below).
- Updated `dispatch_triggered_workflows_async` to accept the new dispatch data structure as one serialized dict instead of individual arguments, with improved error handling and logging for trigger invocations.
- Enhanced metadata handling in `TriggerPluginNode` to attach `TRIGGER_INFO` to failed node results (also sketched below), aiding debugging and tracking.

These changes improve the robustness and maintainability of trigger plugin interactions within the workflow system.
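
A minimal sketch of the new dispatch flow, condensed from the diffs below. The `PluginTriggerDispatchData` fields, the queue name, and the `model_dump(mode="json")` / `model_validate` round trip are taken from the changed files; the single-module layout and the placeholder values are illustrative only.

```python
# Minimal sketch of the new dispatch flow (condensed from the diffs below).
# Field names and the queue name come from the changed files; the placeholder
# values and the single-module layout are illustrative only.
from celery import shared_task
from pydantic import BaseModel

TRIGGER_QUEUE = "triggered_workflow_dispatcher"


class PluginTriggerDispatchData(BaseModel):
    """Plugin trigger dispatch data for Celery tasks."""

    endpoint_id: str
    provider_id: str
    subscription_id: str
    timestamp: int
    triggers: list[str]
    request_id: str


@shared_task(queue=TRIGGER_QUEUE)
def dispatch_triggered_workflows_async(dispatch_data: dict) -> dict:
    # Re-validate the plain dict delivered by Celery into a typed model.
    params = PluginTriggerDispatchData.model_validate(dispatch_data)
    return {"status": "success", "dispatched": len(params.triggers)}


# Producer side: serialize to a JSON-safe dict before enqueueing.
dispatch = PluginTriggerDispatchData(
    endpoint_id="ep_123",           # illustrative value
    provider_id="provider/foo",     # illustrative value
    subscription_id="sub_456",      # illustrative value
    timestamp=1726472380,
    triggers=["on_issue_created"],  # illustrative value
    request_id="req_789",           # illustrative value
)
dispatch_triggered_workflows_async.delay(dispatch.model_dump(mode="json"))
```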
Harry 2025-09-16 15:39:40 +08:00
parent ee68a685a7
commit 0cb0cea167
9 changed files with 81 additions and 25 deletions
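
For the metadata change, a condensed sketch of how the new `TRIGGER_INFO` entry is assembled. The enum value and the dictionary shape mirror the `TriggerPluginNode` diff below; the `_NodeData` stand-in and the helper function name are illustrative, not part of the actual change.

```python
# Condensed sketch of the new TRIGGER_INFO metadata (mirrors the TriggerPluginNode
# diff below). The enum value and dictionary shape come from the changed files;
# the _NodeData stand-in and helper name are illustrative only.
from dataclasses import dataclass
from enum import StrEnum


class WorkflowNodeExecutionMetadataKey(StrEnum):
    TRIGGER_INFO = "trigger_info"


@dataclass
class _NodeData:  # stand-in for PluginTriggerData
    provider_id: str
    trigger_name: str
    plugin_unique_identifier: str


def build_trigger_metadata(trigger_inputs: dict, node_data: _NodeData) -> dict:
    # Merged trigger inputs plus identifying fields, so the originating trigger
    # is traceable from the node execution record.
    return {
        WorkflowNodeExecutionMetadataKey.TRIGGER_INFO: {
            **trigger_inputs,
            "provider_id": node_data.provider_id,
            "trigger_name": node_data.trigger_name,
            "plugin_unique_identifier": node_data.plugin_unique_identifier,
        },
    }
```

In the real node, this mapping is attached as `metadata` on each failed `NodeRunResult`, as the hunks below show.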

View File

@@ -25,6 +25,7 @@ class WorkflowNodeExecutionMetadataKey(StrEnum):
TOTAL_PRICE = "total_price"
CURRENCY = "currency"
TOOL_INFO = "tool_info"
TRIGGER_INFO = "trigger_info"
AGENT_LOG = "agent_log"
ITERATION_ID = "iteration_id"
ITERATION_INDEX = "iteration_index"

View File

@@ -15,6 +15,7 @@ class PluginTriggerData(BaseNodeData):
provider_id: str = Field(..., description="Provider ID")
trigger_name: str = Field(..., description="Trigger name")
subscription_id: str = Field(..., description="Subscription ID")
plugin_unique_identifier: str = Field(..., description="Plugin unique identifier")
parameters: dict[str, Any] = Field(default_factory=dict, description="Trigger parameters")
# Error handling

View File

@@ -2,11 +2,12 @@ from collections.abc import Mapping
from typing import Any, Optional
from core.plugin.entities.plugin import TriggerProviderID
from core.plugin.impl.exc import PluginDaemonClientSideError, PluginInvokeError
from core.plugin.utils.http_parser import deserialize_request
from core.trigger.entities.api_entities import TriggerProviderSubscriptionApiEntity
from core.trigger.trigger_manager import TriggerManager
from core.workflow.entities.node_entities import NodeRunResult
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from core.workflow.nodes.base import BaseNode
from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig
from core.workflow.nodes.enums import ErrorStrategy, NodeType
@@ -69,6 +70,14 @@ class TriggerPluginNode(BaseNode):
# Get trigger data passed when workflow was triggered
trigger_inputs = dict(self.graph_runtime_state.variable_pool.user_inputs)
metadata = {
WorkflowNodeExecutionMetadataKey.TRIGGER_INFO: {
**trigger_inputs,
"provider_id": self._node_data.provider_id,
"trigger_name": self._node_data.trigger_name,
"plugin_unique_identifier": self._node_data.plugin_unique_identifier,
},
}
request_id = trigger_inputs.get("request_id")
trigger_name = trigger_inputs.get("trigger_name", "")
@@ -80,14 +89,24 @@
inputs=trigger_inputs,
outputs={"error": "No request ID or subscription ID available"},
)
try:
subscription: TriggerProviderSubscriptionApiEntity | None = TriggerProviderService.get_subscription_by_id(
tenant_id=self.tenant_id, subscription_id=subscription_id
)
if not subscription:
raise ValueError(f"Subscription {subscription_id} not found")
return NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
inputs=trigger_inputs,
outputs={"error": f"Invalid subscription {subscription_id} not found"},
)
except Exception as e:
return NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
inputs=trigger_inputs,
outputs={"error": f"Failed to get subscription: {str(e)}"},
)
try:
request = deserialize_request(storage.load_once(f"triggers/{request_id}"))
parameters = self._node_data.parameters if hasattr(self, "_node_data") and self._node_data else {}
invoke_response = TriggerManager.invoke_trigger(
@@ -102,9 +121,31 @@
)
outputs = invoke_response.event.variables or {}
return NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED, inputs=trigger_inputs, outputs=outputs)
except PluginInvokeError as e:
return NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
inputs=trigger_inputs,
metadata=metadata,
error="An error occurred in the plugin, "
f"please contact the author of {subscription.provider} for help, "
f"error type: {e.get_error_type()}, "
f"error details: {e.get_error_message()}",
error_type=type(e).__name__,
)
except PluginDaemonClientSideError as e:
return NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
inputs=trigger_inputs,
metadata=metadata,
error=f"Failed to invoke trigger, error: {e.description}",
error_type=type(e).__name__,
)
except Exception as e:
return NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
inputs=trigger_inputs,
outputs={"error": f"Failed to invoke trigger: {str(e)}", "request_id": request_id},
metadata=metadata,
error=f"Failed to invoke trigger: {str(e)}",
error_type=type(e).__name__,
)

View File

@@ -18,7 +18,7 @@ from models.trigger import TriggerSubscription
from models.workflow import Workflow, WorkflowPluginTrigger
from services.async_workflow_service import AsyncWorkflowService
from services.trigger.trigger_provider_service import TriggerProviderService
from services.workflow.entities import PluginTriggerData
from services.workflow.entities import PluginTriggerData, PluginTriggerDispatchData
logger = logging.getLogger(__name__)
@@ -150,8 +150,7 @@ class TriggerService:
# Production dispatch
from tasks.trigger_processing_tasks import dispatch_triggered_workflows_async
dispatch_triggered_workflows_async(
plugin_trigger_dispatch_data = PluginTriggerDispatchData(
endpoint_id=endpoint_id,
provider_id=subscription.provider_id,
subscription_id=subscription.id,
@@ -159,6 +158,8 @@
triggers=list(dispatch_response.triggers),
request_id=request_id,
)
dispatch_data = plugin_trigger_dispatch_data.model_dump(mode="json")
dispatch_triggered_workflows_async.delay(dispatch_data)
logger.info(
"Queued async dispatching for %d triggers on endpoint %s with request_id %s",

View File

@@ -57,6 +57,15 @@ class PluginTriggerData(TriggerData):
plugin_id: str
endpoint_id: str
class PluginTriggerDispatchData(BaseModel):
"""Plugin trigger dispatch data for Celery tasks"""
endpoint_id: str
provider_id: str
subscription_id: str
timestamp: int
triggers: list[str]
request_id: str
class WorkflowTaskData(BaseModel):
"""Lightweight data structure for Celery workflow tasks"""

View File

@@ -17,6 +17,7 @@ from extensions.ext_storage import storage
from models.trigger import TriggerSubscription
from services.trigger_debug_service import TriggerDebugService
from services.trigger_service import TriggerService
from services.workflow.entities import PluginTriggerDispatchData
logger = logging.getLogger(__name__)
@@ -24,15 +25,9 @@ logger = logging.getLogger(__name__)
TRIGGER_QUEUE = "triggered_workflow_dispatcher"
@shared_task(queue=TRIGGER_QUEUE, bind=True, max_retries=3)
@shared_task(queue=TRIGGER_QUEUE)
def dispatch_triggered_workflows_async(
self,
endpoint_id: str,
provider_id: str,
subscription_id: str,
timestamp: int,
triggers: list[str],
request_id: str,
dispatch_data: dict,
) -> dict:
"""
Dispatch triggers asynchronously.
@@ -48,6 +43,16 @@ def dispatch_triggered_workflows_async(
Returns:
dict: Execution result with status and dispatched trigger count
"""
dispatch_params: PluginTriggerDispatchData = PluginTriggerDispatchData.model_validate(
dispatch_data
)
endpoint_id = dispatch_params.endpoint_id
provider_id = dispatch_params.provider_id
subscription_id = dispatch_params.subscription_id
timestamp = dispatch_params.timestamp
triggers = dispatch_params.triggers
request_id = dispatch_params.request_id
try:
logger.info(
"Starting async trigger dispatching for endpoint=%s, triggers=%s, request_id=%s, timestamp=%s",
@@ -141,17 +146,11 @@ def dispatch_triggered_workflows_async(
except Exception as e:
logger.exception(
"Error in async trigger dispatching for endpoint %s",
"Error in async trigger dispatching for endpoint %s data %s",
endpoint_id,
dispatch_data,
)
# Retry the task if not exceeded max retries
if self.request.retries < self.max_retries:
raise self.retry(exc=e, countdown=60 * (self.request.retries + 1))
# Note: Stored request is not deleted even on failure. See comment above for cleanup strategy.
return {
"status": "failed",
"error": str(e),
"retries": self.request.retries,
}

View File

@@ -66,6 +66,7 @@ const TriggerPluginActionItem: FC<Props> = ({
trigger_name: payload.name,
trigger_label: payload.label[language],
trigger_description: payload.description[language],
plugin_unique_identifier: provider.plugin_unique_identifier,
title: payload.label[language],
is_team_authorization: provider.is_team_authorization,
output_schema: payload.output_schema || {},

View File

@@ -35,11 +35,12 @@ export type TriggerDefaultValue = PluginDefaultValue & {
trigger_label: string
trigger_description: string
title: string
plugin_unique_identifier: string
is_team_authorization: boolean
params: Record<string, any>
paramSchemas: Record<string, any>[]
output_schema: Record<string, any>
credential_id?: string
subscription_id?: string
meta?: PluginMeta
}
@@ -149,7 +150,7 @@ export type TriggerProviderApiEntity = {
icon_dark?: string
tags: string[]
plugin_id?: string
plugin_unique_identifier?: string
plugin_unique_identifier: string
credentials_schema: TriggerCredentialField[]
oauth_client_schema: TriggerCredentialField[]
subscription_schema: TriggerSubscriptionSchema
@@ -160,6 +161,7 @@
export type TriggerWithProvider = Collection & {
triggers: Trigger[]
meta: PluginMeta
plugin_unique_identifier: string
credentials_schema?: TriggerCredentialField[]
oauth_client_schema?: TriggerCredentialField[]
subscription_schema?: TriggerSubscriptionSchema

View File

@@ -32,6 +32,7 @@ const convertToTriggerWithProvider = (provider: TriggerProviderApiEntity): Trigg
allow_delete: false,
labels: provider.tags || [],
plugin_id: provider.plugin_id,
plugin_unique_identifier: provider.plugin_unique_identifier || '',
triggers: provider.triggers.map(trigger => ({
name: trigger.name,
author: provider.author,