refactor(trigger): Enhance error handling and parameter resolution in trigger workflows

- Improved error handling in `DraftWorkflowTriggerRunApi`, `DraftWorkflowTriggerNodeApi`, and `DraftWorkflowTriggerRunAllApi` to raise exceptions directly, providing clearer error messages.
- Introduced `get_event_parameters` method in `PluginTriggerProviderController` to retrieve event parameters for triggers.
- Updated `PluginTriggerNodeData` to include a new method for resolving parameters based on event schemas, ensuring better validation and handling of trigger inputs.
- Refactored `TriggerService` to utilize the new parameter resolution method, enhancing the clarity and reliability of trigger invocations.
This commit is contained in:
Harry 2025-10-15 17:05:21 +08:00
parent f28a7218cd
commit ad2b910d73
5 changed files with 117 additions and 45 deletions

View File

@ -27,7 +27,6 @@ from core.trigger.debug.event_selectors import (
create_event_poller,
select_trigger_debug_events,
)
from core.trigger.errors import TriggerPluginInvokeError
from core.workflow.enums import NodeType
from core.workflow.graph_engine.manager import GraphEngineManager
from extensions.ext_database import db
@ -1068,22 +1067,13 @@ class DraftWorkflowTriggerRunApi(Resource):
)
except InvokeRateLimitError as ex:
raise InvokeRateLimitHttpError(ex.description)
except TriggerPluginInvokeError as e:
logger.exception("Error invoking trigger event")
return jsonable_encoder(
{
"status": "error",
"error": e.get_error_message(),
"error_type": e.get_error_type(),
}
), 500
except Exception:
logger.exception("Error running draft workflow trigger run")
return jsonable_encoder(
{
"status": "error",
}
), 500
except ValueError as e:
raise e
except PluginInvokeError as e:
raise e
except Exception as e:
logger.exception("Error polling trigger debug event")
raise e
@console_ns.route("/apps/<uuid:app_id>/workflows/draft/nodes/<string:node_id>/trigger/run")
@ -1128,15 +1118,23 @@ class DraftWorkflowTriggerNodeApi(Resource):
)
# for other trigger types, poll for the event
else:
poller: TriggerDebugEventPoller = create_event_poller(
draft_workflow=draft_workflow,
tenant_id=app_model.tenant_id,
user_id=current_user.id,
app_id=app_model.id,
node_id=node_id,
)
event = poller.poll()
try:
poller: TriggerDebugEventPoller = create_event_poller(
draft_workflow=draft_workflow,
tenant_id=app_model.tenant_id,
user_id=current_user.id,
app_id=app_model.id,
node_id=node_id,
)
event = poller.poll()
except ValueError as e:
logger.exception("Error polling trigger debug event")
raise e
except PluginInvokeError as e:
raise e
except Exception as e:
logger.exception("Error polling trigger debug event")
raise e
if not event:
return jsonable_encoder({"status": "waiting", "retry_in": LISTENING_RETRY_IN})
try:
@ -1205,23 +1203,13 @@ class DraftWorkflowTriggerRunAllApi(Resource):
trigger_debug_event: TriggerDebugEvent | None = select_trigger_debug_events(
draft_workflow=draft_workflow, app_model=app_model, user_id=current_user.id, node_ids=node_ids
)
except ValueError as e:
raise e
except PluginInvokeError as e:
logger.exception("Error selecting trigger debug event")
return jsonable_encoder(
{
"status": "error",
"error": e.get_error_message(),
"error_type": e.get_error_type(),
}
), 500
raise e
except Exception as e:
logger.exception("Error polling trigger debug event")
return jsonable_encoder(
{
"status": "error",
}
), 500
raise e
if trigger_debug_event is None:
return jsonable_encoder({"status": "waiting", "retry_in": LISTENING_RETRY_IN})

View File

@ -19,6 +19,7 @@ from core.plugin.impl.trigger import PluginTriggerManager
from core.trigger.entities.api_entities import EventApiEntity, TriggerProviderApiEntity
from core.trigger.entities.entities import (
EventEntity,
EventParameter,
ProviderConfig,
Subscription,
SubscriptionConstructor,
@ -255,6 +256,12 @@ class PluginTriggerProviderController:
else []
)
def get_event_parameters(self, event_name: str) -> Mapping[str, EventParameter]:
"""
Get event parameters for this provider
"""
return {parameter.name: parameter for parameter in self.get_event(event_name).parameters}
def dispatch(
self,
user_id: str,

View File

@ -1,14 +1,42 @@
from typing import Any, Optional
from collections.abc import Mapping
from typing import Any, Literal, Optional, Union
from pydantic import Field
from pydantic import BaseModel, Field, ValidationInfo, field_validator
from core.trigger.entities.entities import EventParameter
from core.workflow.enums import ErrorStrategy
from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig
from core.workflow.nodes.trigger_plugin.exc import TriggerEventParameterError
class PluginTriggerNodeData(BaseNodeData):
"""Plugin trigger node data"""
class PluginTriggerInput(BaseModel):
value: Union[Any, list[str]]
type: Literal["mixed", "variable", "constant"]
@field_validator("type", mode="before")
@classmethod
def check_type(cls, value, validation_info: ValidationInfo):
typ = value
value = validation_info.data.get("value")
if value is None:
return typ
if typ == "mixed" and not isinstance(value, str):
raise ValueError("value must be a string")
elif typ == "variable":
if not isinstance(value, list):
raise ValueError("value must be a list")
for val in value:
if not isinstance(val, str):
raise ValueError("value must be a list of strings")
elif typ == "constant" and not isinstance(value, str | int | float | bool | dict):
raise ValueError("value must be a string, int, float, bool or dict")
return typ
title: str
desc: Optional[str] = None
plugin_id: str = Field(..., description="Plugin ID")
@ -16,10 +44,39 @@ class PluginTriggerNodeData(BaseNodeData):
event_name: str = Field(..., description="Event name")
subscription_id: str = Field(..., description="Subscription ID")
plugin_unique_identifier: str = Field(..., description="Plugin unique identifier")
parameters: dict[str, Any] = Field(default_factory=dict, description="Trigger parameters")
parameters: Mapping[str, PluginTriggerInput] = Field(default_factory=dict, description="Trigger parameters")
# Error handling
error_strategy: Optional[ErrorStrategy] = Field(
default=ErrorStrategy.FAIL_BRANCH, description="Error handling strategy"
)
retry_config: RetryConfig = Field(default_factory=lambda: RetryConfig(), description="Retry configuration")
def resolve_parameters(
self,
*,
parameter_schemas: Mapping[str, EventParameter],
) -> Mapping[str, Any]:
"""
Generate parameters based on the given plugin trigger parameters.
Args:
parameter_schemas (Mapping[str, EventParameter]): The mapping of parameter schemas.
Returns:
Mapping[str, Any]: A dictionary containing the generated parameters.
"""
result: Mapping[str, Any] = {}
for parameter_name in self.parameters:
parameter: EventParameter | None = parameter_schemas.get(parameter_name)
if not parameter:
result[parameter_name] = None
continue
event_input = self.parameters[parameter_name]
# trigger node only supports constant input
if event_input.type != "constant":
raise TriggerEventParameterError(f"Unknown plugin trigger input type '{event_input.type}'")
result[parameter_name] = event_input.value
return result

View File

@ -0,0 +1,10 @@
class TriggerEventNodeError(ValueError):
"""Base exception for plugin trigger node errors."""
pass
class TriggerEventParameterError(TriggerEventNodeError):
"""Exception raised for errors in plugin trigger parameters."""
pass

View File

@ -59,12 +59,17 @@ class TriggerService:
if not request:
raise ValueError("Request not found")
# invoke triger
provider_controller: PluginTriggerProviderController = TriggerManager.get_trigger_provider(
tenant_id, TriggerProviderID(subscription.provider_id)
)
return TriggerManager.invoke_trigger_event(
tenant_id=tenant_id,
user_id=user_id,
provider_id=TriggerProviderID(event.provider_id),
event_name=event.name,
parameters=node_data.parameters,
parameters=node_data.resolve_parameters(
parameter_schemas=provider_controller.get_event_parameters(event_name=event.name)
),
credentials=subscription.credentials,
credential_type=CredentialType.of(subscription.credential_type),
subscription=subscription.to_entity(),
@ -133,6 +138,9 @@ class TriggerService:
return 0
dispatched_count = 0
provider_controller: PluginTriggerProviderController = TriggerManager.get_trigger_provider(
tenant_id=subscription.tenant_id, provider_id=TriggerProviderID(subscription.provider_id)
)
with Session(db.engine) as session:
tenant_owner = cls._get_tenant_owner(session, subscription.tenant_id)
workflows = cls._get_latest_workflows_by_app_ids(session, subscribers)
@ -164,7 +172,9 @@ class TriggerService:
user_id=subscription.user_id,
provider_id=TriggerProviderID(subscription.provider_id),
event_name=event.identity.name,
parameters=node_data.parameters,
parameters=node_data.resolve_parameters(
parameter_schemas=provider_controller.get_event_parameters(event_name=event.identity.name)
),
credentials=subscription.credentials,
credential_type=CredentialType.of(subscription.credential_type),
subscription=subscription.to_entity(),