feat: introduce payload field to plugin trigger processing

This commit is contained in:
Yeuoly 2025-10-18 19:15:46 +08:00
parent 11c9219848
commit dd0da3218c
4 changed files with 40 additions and 26 deletions

View File

@ -26,7 +26,7 @@ from core.trigger.entities.entities import (
TriggerCreationMethod,
TriggerProviderEntity,
TriggerProviderIdentity,
Unsubscription,
UnsubscribeResult,
)
from core.trigger.errors import TriggerProviderCredentialValidationError
from models.provider_ids import TriggerProviderID
@ -190,7 +190,7 @@ class PluginTriggerProviderController:
:return: List of supported credential types
"""
types = []
types: list[CredentialType] = []
subscription_constructor = self.entity.subscription_constructor
if subscription_constructor and subscription_constructor.oauth_schema:
types.append(CredentialType.OAUTH2)
@ -208,7 +208,7 @@ class PluginTriggerProviderController:
subscription_constructor = self.entity.subscription_constructor
if not subscription_constructor:
return []
credential_type = CredentialType.of(credential_type) if isinstance(credential_type, str) else credential_type
credential_type = CredentialType.of(credential_type)
if credential_type == CredentialType.OAUTH2:
return (
subscription_constructor.oauth_schema.credentials_schema.copy()
@ -304,6 +304,7 @@ class PluginTriggerProviderController:
credential_type: CredentialType,
subscription: Subscription,
request: Request,
payload: Mapping[str, Any],
) -> TriggerInvokeEventResponse:
"""
Execute a trigger through plugin runtime
@ -314,6 +315,7 @@ class PluginTriggerProviderController:
:param credentials: Provider credentials
:param credential_type: Credential type
:param request: Request
:param payload: Payload
:return: Trigger execution result
"""
manager = PluginTriggerManager()
@ -329,6 +331,7 @@ class PluginTriggerProviderController:
request=request,
parameters=parameters,
subscription=subscription,
payload=payload,
)
def subscribe_trigger(
@ -366,7 +369,7 @@ class PluginTriggerProviderController:
def unsubscribe_trigger(
self, user_id: str, subscription: Subscription, credentials: Mapping[str, str], credential_type: CredentialType
) -> Unsubscription:
) -> UnsubscribeResult:
"""
Unsubscribe from a trigger through plugin runtime
@ -374,7 +377,7 @@ class PluginTriggerProviderController:
:param subscription: Subscription metadata
:param credentials: Provider credentials
:param credential_type: Credential type
:return: Unsubscription result
:return: Unsubscribe result
"""
manager = PluginTriggerManager()
provider_id: TriggerProviderID = self.get_provider_id()
@ -388,7 +391,7 @@ class PluginTriggerProviderController:
credential_type=credential_type,
)
return Unsubscription.model_validate(response.subscription)
return UnsubscribeResult.model_validate(response.subscription)
def refresh_trigger(
self, subscription: Subscription, credentials: Mapping[str, str], credential_type: CredentialType

View File

@ -18,9 +18,9 @@ from core.plugin.impl.trigger import PluginTriggerManager
from core.trigger.entities.entities import (
EventEntity,
Subscription,
Unsubscription,
UnsubscribeResult,
)
from core.trigger.errors import TriggerPluginInvokeError
from core.trigger.errors import EventIgnoreError, TriggerPluginInvokeError
from core.trigger.provider import PluginTriggerProviderController
from models.provider_ids import TriggerProviderID
@ -56,7 +56,7 @@ class TriggerManager:
manager = PluginTriggerManager()
provider_entities = manager.fetch_trigger_providers(tenant_id)
controllers = []
controllers: list[PluginTriggerProviderController] = []
for provider in provider_entities:
try:
controller = PluginTriggerProviderController(
@ -158,6 +158,7 @@ class TriggerManager:
credential_type: CredentialType,
subscription: Subscription,
request: Request,
payload: Mapping[str, Any],
) -> TriggerInvokeEventResponse:
"""
Execute a trigger
@ -171,6 +172,7 @@ class TriggerManager:
:param credential_type: Credential type
:param subscription: Subscription
:param request: Request
:param payload: Payload
:return: Trigger execution result
"""
provider: PluginTriggerProviderController = cls.get_trigger_provider(
@ -185,10 +187,11 @@ class TriggerManager:
credential_type=credential_type,
subscription=subscription,
request=request,
payload=payload,
)
except EventIgnoreError as e:
return TriggerInvokeEventResponse(variables={}, cancelled=True)
except PluginInvokeError as e:
if e.get_error_type() == "EventIgnoreError":
return TriggerInvokeEventResponse(variables={}, cancelled=True)
logger.exception("Failed to invoke trigger event")
raise TriggerPluginInvokeError(
description=e.to_user_friendly_error(plugin_name=provider.entity.identity.name)
@ -237,7 +240,7 @@ class TriggerManager:
subscription: Subscription,
credentials: Mapping[str, str],
credential_type: CredentialType,
) -> Unsubscription:
) -> UnsubscribeResult:
"""
Unsubscribe from a trigger
@ -283,7 +286,3 @@ class TriggerManager:
return cls.get_trigger_provider(tenant_id=tenant_id, provider_id=provider_id).refresh_trigger(
subscription=subscription, credentials=credentials, credential_type=credential_type
)
# Export
__all__ = ["TriggerManager"]

View File

@ -1,11 +1,11 @@
import logging
import secrets
import time
import uuid
from collections.abc import Mapping
from typing import Any
from flask import Request, Response
from pydantic import BaseModel
from pydantic import BaseModel, TypeAdapter
from sqlalchemy import and_, select
from sqlalchemy.orm import Session
@ -111,9 +111,15 @@ class TriggerService:
)
if dispatch_response.events:
request_id = f"trigger_request_{uuid.uuid4().hex}"
request_id = f"trigger_request_{timestamp}_{secrets.token_hex(6)}"
serialized_request = serialize_request(request)
storage.save(f"triggers/{request_id}", serialized_request)
# save the request and payload to storage as persistent data
storage.save(f"triggers/{request_id}.raw", serialized_request)
storage.save(
f"triggers/{request_id}.payload",
TypeAdapter(Mapping[str, Any]).dump_json(dispatch_response.payload),
)
# Validate event names
for event_name in dispatch_response.events:

View File

@ -7,8 +7,10 @@ to avoid blocking the main request thread.
import logging
from collections.abc import Mapping, Sequence
from typing import Any
from celery import shared_task
from pydantic import TypeAdapter
from sqlalchemy import func, select
from sqlalchemy.orm import Session
@ -30,6 +32,7 @@ from models.provider_ids import TriggerProviderID
from models.trigger import TriggerSubscription
from models.workflow import Workflow, WorkflowPluginTrigger
from services.async_workflow_service import AsyncWorkflowService
from services.end_user_service import EndUserService
from services.trigger.trigger_provider_service import TriggerProviderService
from services.workflow.entities import PluginTriggerData, PluginTriggerDispatchData
@ -117,10 +120,14 @@ def dispatch_triggered_workflow(
event: The trigger entity that was activated
request_id: The ID of the stored request in storage system
"""
request = deserialize_request(storage.load_once(f"triggers/{request_id}"))
request = deserialize_request(storage.load_once(f"triggers/{request_id}.raw"))
if not request:
logger.error("Request not found for request_id %s", request_id)
return 0
payload = TypeAdapter(Mapping[str, Any]).validate_json(storage.load_once(f"triggers/{request_id}.payload"))
if not payload:
logger.error("Payload not found for request_id %s", request_id)
return 0
from services.trigger.trigger_service import TriggerService
@ -141,10 +148,8 @@ def dispatch_triggered_workflow(
)
with Session(db.engine) as session:
workflows: Mapping[str, Workflow] = _get_latest_workflows_by_app_ids(session, subscribers)
# Lazy import to avoid circular import during app initialization
from controllers.service_api.wraps import create_end_user_batch
end_users: Mapping[str, EndUser] = create_end_user_batch(
end_users: Mapping[str, EndUser] = EndUserService.create_end_user_batch(
type=InvokeFrom.TRIGGER,
tenant_id=subscription.tenant_id,
app_ids=[plugin_trigger.app_id for plugin_trigger in subscribers],
@ -185,6 +190,7 @@ def dispatch_triggered_workflow(
credential_type=CredentialType.of(subscription.credential_type),
subscription=subscription.to_entity(),
request=request,
payload=payload,
)
if invoke_response.cancelled:
logger.info(
@ -265,8 +271,8 @@ def dispatch_triggered_workflows(
@shared_task(queue=TRIGGER_QUEUE)
def dispatch_triggered_workflows_async(
dispatch_data: dict,
) -> dict:
dispatch_data: Mapping[str, Any],
) -> Mapping[str, Any]:
"""
Dispatch triggers asynchronously.