From ad2b910d73653c5095443afec453a392ec3f227b Mon Sep 17 00:00:00 2001 From: Harry Date: Wed, 15 Oct 2025 17:05:21 +0800 Subject: [PATCH] refactor(trigger): Enhance error handling and parameter resolution in trigger workflows - Improved error handling in `DraftWorkflowTriggerRunApi`, `DraftWorkflowTriggerNodeApi`, and `DraftWorkflowTriggerRunAllApi` to raise exceptions directly, providing clearer error messages. - Introduced `get_event_parameters` method in `PluginTriggerProviderController` to retrieve event parameters for triggers. - Updated `PluginTriggerNodeData` to include a new method for resolving parameters based on event schemas, ensuring better validation and handling of trigger inputs. - Refactored `TriggerService` to utilize the new parameter resolution method, enhancing the clarity and reliability of trigger invocations. --- api/controllers/console/app/workflow.py | 68 ++++++++----------- api/core/trigger/provider.py | 7 ++ .../workflow/nodes/trigger_plugin/entities.py | 63 ++++++++++++++++- api/core/workflow/nodes/trigger_plugin/exc.py | 10 +++ api/services/trigger/trigger_service.py | 14 +++- 5 files changed, 117 insertions(+), 45 deletions(-) create mode 100644 api/core/workflow/nodes/trigger_plugin/exc.py diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index d23ff49aa9..4d525a0107 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -27,7 +27,6 @@ from core.trigger.debug.event_selectors import ( create_event_poller, select_trigger_debug_events, ) -from core.trigger.errors import TriggerPluginInvokeError from core.workflow.enums import NodeType from core.workflow.graph_engine.manager import GraphEngineManager from extensions.ext_database import db @@ -1068,22 +1067,13 @@ class DraftWorkflowTriggerRunApi(Resource): ) except InvokeRateLimitError as ex: raise InvokeRateLimitHttpError(ex.description) - except TriggerPluginInvokeError as e: - logger.exception("Error invoking trigger event") - return jsonable_encoder( - { - "status": "error", - "error": e.get_error_message(), - "error_type": e.get_error_type(), - } - ), 500 - except Exception: - logger.exception("Error running draft workflow trigger run") - return jsonable_encoder( - { - "status": "error", - } - ), 500 + except ValueError as e: + raise e + except PluginInvokeError as e: + raise e + except Exception as e: + logger.exception("Error polling trigger debug event") + raise e @console_ns.route("/apps//workflows/draft/nodes//trigger/run") @@ -1128,15 +1118,23 @@ class DraftWorkflowTriggerNodeApi(Resource): ) # for other trigger types, poll for the event else: - poller: TriggerDebugEventPoller = create_event_poller( - draft_workflow=draft_workflow, - tenant_id=app_model.tenant_id, - user_id=current_user.id, - app_id=app_model.id, - node_id=node_id, - ) - event = poller.poll() - + try: + poller: TriggerDebugEventPoller = create_event_poller( + draft_workflow=draft_workflow, + tenant_id=app_model.tenant_id, + user_id=current_user.id, + app_id=app_model.id, + node_id=node_id, + ) + event = poller.poll() + except ValueError as e: + logger.exception("Error polling trigger debug event") + raise e + except PluginInvokeError as e: + raise e + except Exception as e: + logger.exception("Error polling trigger debug event") + raise e if not event: return jsonable_encoder({"status": "waiting", "retry_in": LISTENING_RETRY_IN}) try: @@ -1205,23 +1203,13 @@ class DraftWorkflowTriggerRunAllApi(Resource): trigger_debug_event: TriggerDebugEvent | None = select_trigger_debug_events( draft_workflow=draft_workflow, app_model=app_model, user_id=current_user.id, node_ids=node_ids ) + except ValueError as e: + raise e except PluginInvokeError as e: - logger.exception("Error selecting trigger debug event") - return jsonable_encoder( - { - "status": "error", - "error": e.get_error_message(), - "error_type": e.get_error_type(), - } - ), 500 + raise e except Exception as e: logger.exception("Error polling trigger debug event") - return jsonable_encoder( - { - "status": "error", - } - ), 500 - + raise e if trigger_debug_event is None: return jsonable_encoder({"status": "waiting", "retry_in": LISTENING_RETRY_IN}) diff --git a/api/core/trigger/provider.py b/api/core/trigger/provider.py index 467a012de8..18f8484734 100644 --- a/api/core/trigger/provider.py +++ b/api/core/trigger/provider.py @@ -19,6 +19,7 @@ from core.plugin.impl.trigger import PluginTriggerManager from core.trigger.entities.api_entities import EventApiEntity, TriggerProviderApiEntity from core.trigger.entities.entities import ( EventEntity, + EventParameter, ProviderConfig, Subscription, SubscriptionConstructor, @@ -255,6 +256,12 @@ class PluginTriggerProviderController: else [] ) + def get_event_parameters(self, event_name: str) -> Mapping[str, EventParameter]: + """ + Get event parameters for this provider + """ + return {parameter.name: parameter for parameter in self.get_event(event_name).parameters} + def dispatch( self, user_id: str, diff --git a/api/core/workflow/nodes/trigger_plugin/entities.py b/api/core/workflow/nodes/trigger_plugin/entities.py index b3d8f38895..3f3134a849 100644 --- a/api/core/workflow/nodes/trigger_plugin/entities.py +++ b/api/core/workflow/nodes/trigger_plugin/entities.py @@ -1,14 +1,42 @@ -from typing import Any, Optional +from collections.abc import Mapping +from typing import Any, Literal, Optional, Union -from pydantic import Field +from pydantic import BaseModel, Field, ValidationInfo, field_validator +from core.trigger.entities.entities import EventParameter from core.workflow.enums import ErrorStrategy from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig +from core.workflow.nodes.trigger_plugin.exc import TriggerEventParameterError class PluginTriggerNodeData(BaseNodeData): """Plugin trigger node data""" + class PluginTriggerInput(BaseModel): + value: Union[Any, list[str]] + type: Literal["mixed", "variable", "constant"] + + @field_validator("type", mode="before") + @classmethod + def check_type(cls, value, validation_info: ValidationInfo): + typ = value + value = validation_info.data.get("value") + + if value is None: + return typ + + if typ == "mixed" and not isinstance(value, str): + raise ValueError("value must be a string") + elif typ == "variable": + if not isinstance(value, list): + raise ValueError("value must be a list") + for val in value: + if not isinstance(val, str): + raise ValueError("value must be a list of strings") + elif typ == "constant" and not isinstance(value, str | int | float | bool | dict): + raise ValueError("value must be a string, int, float, bool or dict") + return typ + title: str desc: Optional[str] = None plugin_id: str = Field(..., description="Plugin ID") @@ -16,10 +44,39 @@ class PluginTriggerNodeData(BaseNodeData): event_name: str = Field(..., description="Event name") subscription_id: str = Field(..., description="Subscription ID") plugin_unique_identifier: str = Field(..., description="Plugin unique identifier") - parameters: dict[str, Any] = Field(default_factory=dict, description="Trigger parameters") + parameters: Mapping[str, PluginTriggerInput] = Field(default_factory=dict, description="Trigger parameters") # Error handling error_strategy: Optional[ErrorStrategy] = Field( default=ErrorStrategy.FAIL_BRANCH, description="Error handling strategy" ) retry_config: RetryConfig = Field(default_factory=lambda: RetryConfig(), description="Retry configuration") + + def resolve_parameters( + self, + *, + parameter_schemas: Mapping[str, EventParameter], + ) -> Mapping[str, Any]: + """ + Generate parameters based on the given plugin trigger parameters. + + Args: + parameter_schemas (Mapping[str, EventParameter]): The mapping of parameter schemas. + + Returns: + Mapping[str, Any]: A dictionary containing the generated parameters. + + """ + result: Mapping[str, Any] = {} + for parameter_name in self.parameters: + parameter: EventParameter | None = parameter_schemas.get(parameter_name) + if not parameter: + result[parameter_name] = None + continue + event_input = self.parameters[parameter_name] + + # trigger node only supports constant input + if event_input.type != "constant": + raise TriggerEventParameterError(f"Unknown plugin trigger input type '{event_input.type}'") + result[parameter_name] = event_input.value + return result diff --git a/api/core/workflow/nodes/trigger_plugin/exc.py b/api/core/workflow/nodes/trigger_plugin/exc.py new file mode 100644 index 0000000000..c9d900eaeb --- /dev/null +++ b/api/core/workflow/nodes/trigger_plugin/exc.py @@ -0,0 +1,10 @@ +class TriggerEventNodeError(ValueError): + """Base exception for plugin trigger node errors.""" + + pass + + +class TriggerEventParameterError(TriggerEventNodeError): + """Exception raised for errors in plugin trigger parameters.""" + + pass \ No newline at end of file diff --git a/api/services/trigger/trigger_service.py b/api/services/trigger/trigger_service.py index 8dc5312e2f..99e7cce112 100644 --- a/api/services/trigger/trigger_service.py +++ b/api/services/trigger/trigger_service.py @@ -59,12 +59,17 @@ class TriggerService: if not request: raise ValueError("Request not found") # invoke triger + provider_controller: PluginTriggerProviderController = TriggerManager.get_trigger_provider( + tenant_id, TriggerProviderID(subscription.provider_id) + ) return TriggerManager.invoke_trigger_event( tenant_id=tenant_id, user_id=user_id, provider_id=TriggerProviderID(event.provider_id), event_name=event.name, - parameters=node_data.parameters, + parameters=node_data.resolve_parameters( + parameter_schemas=provider_controller.get_event_parameters(event_name=event.name) + ), credentials=subscription.credentials, credential_type=CredentialType.of(subscription.credential_type), subscription=subscription.to_entity(), @@ -133,6 +138,9 @@ class TriggerService: return 0 dispatched_count = 0 + provider_controller: PluginTriggerProviderController = TriggerManager.get_trigger_provider( + tenant_id=subscription.tenant_id, provider_id=TriggerProviderID(subscription.provider_id) + ) with Session(db.engine) as session: tenant_owner = cls._get_tenant_owner(session, subscription.tenant_id) workflows = cls._get_latest_workflows_by_app_ids(session, subscribers) @@ -164,7 +172,9 @@ class TriggerService: user_id=subscription.user_id, provider_id=TriggerProviderID(subscription.provider_id), event_name=event.identity.name, - parameters=node_data.parameters, + parameters=node_data.resolve_parameters( + parameter_schemas=provider_controller.get_event_parameters(event_name=event.identity.name) + ), credentials=subscription.credentials, credential_type=CredentialType.of(subscription.credential_type), subscription=subscription.to_entity(),