From dd0da3218cd1a111a3d32660492c39bbeac662d2 Mon Sep 17 00:00:00 2001 From: Yeuoly Date: Sat, 18 Oct 2025 19:15:46 +0800 Subject: [PATCH] feat: introduce payload field to plugin trigger processing --- api/core/trigger/provider.py | 15 +++++++++------ api/core/trigger/trigger_manager.py | 19 +++++++++---------- api/services/trigger/trigger_service.py | 14 ++++++++++---- api/tasks/trigger_processing_tasks.py | 18 ++++++++++++------ 4 files changed, 40 insertions(+), 26 deletions(-) diff --git a/api/core/trigger/provider.py b/api/core/trigger/provider.py index 7adf815333..510cf34d88 100644 --- a/api/core/trigger/provider.py +++ b/api/core/trigger/provider.py @@ -26,7 +26,7 @@ from core.trigger.entities.entities import ( TriggerCreationMethod, TriggerProviderEntity, TriggerProviderIdentity, - Unsubscription, + UnsubscribeResult, ) from core.trigger.errors import TriggerProviderCredentialValidationError from models.provider_ids import TriggerProviderID @@ -190,7 +190,7 @@ class PluginTriggerProviderController: :return: List of supported credential types """ - types = [] + types: list[CredentialType] = [] subscription_constructor = self.entity.subscription_constructor if subscription_constructor and subscription_constructor.oauth_schema: types.append(CredentialType.OAUTH2) @@ -208,7 +208,7 @@ class PluginTriggerProviderController: subscription_constructor = self.entity.subscription_constructor if not subscription_constructor: return [] - credential_type = CredentialType.of(credential_type) if isinstance(credential_type, str) else credential_type + credential_type = CredentialType.of(credential_type) if credential_type == CredentialType.OAUTH2: return ( subscription_constructor.oauth_schema.credentials_schema.copy() @@ -304,6 +304,7 @@ class PluginTriggerProviderController: credential_type: CredentialType, subscription: Subscription, request: Request, + payload: Mapping[str, Any], ) -> TriggerInvokeEventResponse: """ Execute a trigger through plugin runtime @@ -314,6 +315,7 @@ class PluginTriggerProviderController: :param credentials: Provider credentials :param credential_type: Credential type :param request: Request + :param payload: Payload :return: Trigger execution result """ manager = PluginTriggerManager() @@ -329,6 +331,7 @@ class PluginTriggerProviderController: request=request, parameters=parameters, subscription=subscription, + payload=payload, ) def subscribe_trigger( @@ -366,7 +369,7 @@ class PluginTriggerProviderController: def unsubscribe_trigger( self, user_id: str, subscription: Subscription, credentials: Mapping[str, str], credential_type: CredentialType - ) -> Unsubscription: + ) -> UnsubscribeResult: """ Unsubscribe from a trigger through plugin runtime @@ -374,7 +377,7 @@ class PluginTriggerProviderController: :param subscription: Subscription metadata :param credentials: Provider credentials :param credential_type: Credential type - :return: Unsubscription result + :return: Unsubscribe result """ manager = PluginTriggerManager() provider_id: TriggerProviderID = self.get_provider_id() @@ -388,7 +391,7 @@ class PluginTriggerProviderController: credential_type=credential_type, ) - return Unsubscription.model_validate(response.subscription) + return UnsubscribeResult.model_validate(response.subscription) def refresh_trigger( self, subscription: Subscription, credentials: Mapping[str, str], credential_type: CredentialType diff --git a/api/core/trigger/trigger_manager.py b/api/core/trigger/trigger_manager.py index 2dc3abf44a..b24c240cb4 100644 --- a/api/core/trigger/trigger_manager.py +++ b/api/core/trigger/trigger_manager.py @@ -18,9 +18,9 @@ from core.plugin.impl.trigger import PluginTriggerManager from core.trigger.entities.entities import ( EventEntity, Subscription, - Unsubscription, + UnsubscribeResult, ) -from core.trigger.errors import TriggerPluginInvokeError +from core.trigger.errors import EventIgnoreError, TriggerPluginInvokeError from core.trigger.provider import PluginTriggerProviderController from models.provider_ids import TriggerProviderID @@ -56,7 +56,7 @@ class TriggerManager: manager = PluginTriggerManager() provider_entities = manager.fetch_trigger_providers(tenant_id) - controllers = [] + controllers: list[PluginTriggerProviderController] = [] for provider in provider_entities: try: controller = PluginTriggerProviderController( @@ -158,6 +158,7 @@ class TriggerManager: credential_type: CredentialType, subscription: Subscription, request: Request, + payload: Mapping[str, Any], ) -> TriggerInvokeEventResponse: """ Execute a trigger @@ -171,6 +172,7 @@ class TriggerManager: :param credential_type: Credential type :param subscription: Subscription :param request: Request + :param payload: Payload :return: Trigger execution result """ provider: PluginTriggerProviderController = cls.get_trigger_provider( @@ -185,10 +187,11 @@ class TriggerManager: credential_type=credential_type, subscription=subscription, request=request, + payload=payload, ) + except EventIgnoreError as e: + return TriggerInvokeEventResponse(variables={}, cancelled=True) except PluginInvokeError as e: - if e.get_error_type() == "EventIgnoreError": - return TriggerInvokeEventResponse(variables={}, cancelled=True) logger.exception("Failed to invoke trigger event") raise TriggerPluginInvokeError( description=e.to_user_friendly_error(plugin_name=provider.entity.identity.name) @@ -237,7 +240,7 @@ class TriggerManager: subscription: Subscription, credentials: Mapping[str, str], credential_type: CredentialType, - ) -> Unsubscription: + ) -> UnsubscribeResult: """ Unsubscribe from a trigger @@ -283,7 +286,3 @@ class TriggerManager: return cls.get_trigger_provider(tenant_id=tenant_id, provider_id=provider_id).refresh_trigger( subscription=subscription, credentials=credentials, credential_type=credential_type ) - - -# Export -__all__ = ["TriggerManager"] diff --git a/api/services/trigger/trigger_service.py b/api/services/trigger/trigger_service.py index f196108820..9531992007 100644 --- a/api/services/trigger/trigger_service.py +++ b/api/services/trigger/trigger_service.py @@ -1,11 +1,11 @@ import logging +import secrets import time -import uuid from collections.abc import Mapping from typing import Any from flask import Request, Response -from pydantic import BaseModel +from pydantic import BaseModel, TypeAdapter from sqlalchemy import and_, select from sqlalchemy.orm import Session @@ -111,9 +111,15 @@ class TriggerService: ) if dispatch_response.events: - request_id = f"trigger_request_{uuid.uuid4().hex}" + request_id = f"trigger_request_{timestamp}_{secrets.token_hex(6)}" serialized_request = serialize_request(request) - storage.save(f"triggers/{request_id}", serialized_request) + + # save the request and payload to storage as persistent data + storage.save(f"triggers/{request_id}.raw", serialized_request) + storage.save( + f"triggers/{request_id}.payload", + TypeAdapter(Mapping[str, Any]).dump_json(dispatch_response.payload), + ) # Validate event names for event_name in dispatch_response.events: diff --git a/api/tasks/trigger_processing_tasks.py b/api/tasks/trigger_processing_tasks.py index d3fa387bd4..5add4e76da 100644 --- a/api/tasks/trigger_processing_tasks.py +++ b/api/tasks/trigger_processing_tasks.py @@ -7,8 +7,10 @@ to avoid blocking the main request thread. import logging from collections.abc import Mapping, Sequence +from typing import Any from celery import shared_task +from pydantic import TypeAdapter from sqlalchemy import func, select from sqlalchemy.orm import Session @@ -30,6 +32,7 @@ from models.provider_ids import TriggerProviderID from models.trigger import TriggerSubscription from models.workflow import Workflow, WorkflowPluginTrigger from services.async_workflow_service import AsyncWorkflowService +from services.end_user_service import EndUserService from services.trigger.trigger_provider_service import TriggerProviderService from services.workflow.entities import PluginTriggerData, PluginTriggerDispatchData @@ -117,10 +120,14 @@ def dispatch_triggered_workflow( event: The trigger entity that was activated request_id: The ID of the stored request in storage system """ - request = deserialize_request(storage.load_once(f"triggers/{request_id}")) + request = deserialize_request(storage.load_once(f"triggers/{request_id}.raw")) if not request: logger.error("Request not found for request_id %s", request_id) return 0 + payload = TypeAdapter(Mapping[str, Any]).validate_json(storage.load_once(f"triggers/{request_id}.payload")) + if not payload: + logger.error("Payload not found for request_id %s", request_id) + return 0 from services.trigger.trigger_service import TriggerService @@ -141,10 +148,8 @@ def dispatch_triggered_workflow( ) with Session(db.engine) as session: workflows: Mapping[str, Workflow] = _get_latest_workflows_by_app_ids(session, subscribers) - # Lazy import to avoid circular import during app initialization - from controllers.service_api.wraps import create_end_user_batch - end_users: Mapping[str, EndUser] = create_end_user_batch( + end_users: Mapping[str, EndUser] = EndUserService.create_end_user_batch( type=InvokeFrom.TRIGGER, tenant_id=subscription.tenant_id, app_ids=[plugin_trigger.app_id for plugin_trigger in subscribers], @@ -185,6 +190,7 @@ def dispatch_triggered_workflow( credential_type=CredentialType.of(subscription.credential_type), subscription=subscription.to_entity(), request=request, + payload=payload, ) if invoke_response.cancelled: logger.info( @@ -265,8 +271,8 @@ def dispatch_triggered_workflows( @shared_task(queue=TRIGGER_QUEUE) def dispatch_triggered_workflows_async( - dispatch_data: dict, -) -> dict: + dispatch_data: Mapping[str, Any], +) -> Mapping[str, Any]: """ Dispatch triggers asynchronously.