From 30a341331fdf72083265becab593bb31c8f48ad0 Mon Sep 17 00:00:00 2001 From: Yeuoly Date: Sat, 18 Oct 2025 19:29:00 +0800 Subject: [PATCH] chore: unify request handling --- .../trigger/trigger_request_service.py | 65 +++++++++++++++++++ api/services/trigger/trigger_service.py | 19 ++---- api/tasks/trigger_processing_tasks.py | 15 ++--- 3 files changed, 76 insertions(+), 23 deletions(-) create mode 100644 api/services/trigger/trigger_request_service.py diff --git a/api/services/trigger/trigger_request_service.py b/api/services/trigger/trigger_request_service.py new file mode 100644 index 0000000000..ad02b43e01 --- /dev/null +++ b/api/services/trigger/trigger_request_service.py @@ -0,0 +1,65 @@ +from collections.abc import Mapping +from typing import Any + +from flask import Request +from pydantic import TypeAdapter + +from core.plugin.utils.http_parser import deserialize_request, serialize_request +from extensions.ext_storage import storage + + +class TriggerRequestService: + """ + Service for handling trigger requests. + """ + + _TRIGGER_STORAGE_PATH = "triggers" + + @classmethod + def get_request(cls, request_id: str) -> Request: + """ + Get the request object from the storage. + + Args: + request_id: The ID of the request. + + Returns: + The request object. + """ + return deserialize_request(storage.load_once(f"{cls._TRIGGER_STORAGE_PATH}/{request_id}.raw")) + + @classmethod + def get_payload(cls, request_id: str) -> Mapping[str, Any]: + """ + Get the payload from the storage. + + Args: + request_id: The ID of the request. + + Returns: + The payload. + """ + return TypeAdapter(Mapping[str, Any]).validate_json( + storage.load_once(f"{cls._TRIGGER_STORAGE_PATH}/{request_id}.payload") + ) + + @classmethod + def persist_request(cls, request_id: str, request: Request) -> None: + """ + Persist the request in the storage. + + Args: + request_id: The ID of the request. + request: The request object. + """ + storage.save(f"{cls._TRIGGER_STORAGE_PATH}/{request_id}.raw", serialize_request(request)) + + @classmethod + def persist_payload(cls, request_id: str, payload: Mapping[str, Any]) -> None: + """ + Persist the payload in the storage. + """ + storage.save( + f"{cls._TRIGGER_STORAGE_PATH}/{request_id}.payload", + TypeAdapter(Mapping[str, Any]).dump_json(payload), + ) diff --git a/api/services/trigger/trigger_service.py b/api/services/trigger/trigger_service.py index 9531992007..5bfd4a13d4 100644 --- a/api/services/trigger/trigger_service.py +++ b/api/services/trigger/trigger_service.py @@ -5,14 +5,13 @@ from collections.abc import Mapping from typing import Any from flask import Request, Response -from pydantic import BaseModel, TypeAdapter +from pydantic import BaseModel from sqlalchemy import and_, select from sqlalchemy.orm import Session from core.plugin.entities.plugin_daemon import CredentialType from core.plugin.entities.request import TriggerDispatchResponse, TriggerInvokeEventResponse from core.plugin.impl.exc import PluginNotFoundError -from core.plugin.utils.http_parser import deserialize_request, serialize_request from core.trigger.debug.events import PluginTriggerDebugEvent from core.trigger.provider import PluginTriggerProviderController from core.trigger.trigger_manager import TriggerManager @@ -21,12 +20,12 @@ from core.workflow.enums import NodeType from core.workflow.nodes.trigger_plugin.entities import TriggerEventNodeData from extensions.ext_database import db from extensions.ext_redis import redis_client -from extensions.ext_storage import storage from models.model import App from models.provider_ids import TriggerProviderID from models.trigger import TriggerSubscription from models.workflow import AppTrigger, AppTriggerStatus, Workflow, WorkflowPluginTrigger from services.trigger.trigger_provider_service import TriggerProviderService +from services.trigger.trigger_request_service import TriggerRequestService from services.workflow.entities import PluginTriggerDispatchData from tasks.trigger_processing_tasks import dispatch_triggered_workflows_async @@ -52,9 +51,8 @@ class TriggerService: if not subscription: raise ValueError("Subscription not found") node_data: TriggerEventNodeData = TriggerEventNodeData.model_validate(node_config.get("data", {})) - request = deserialize_request(storage.load_once(f"triggers/{event.request_id}")) - if not request: - raise ValueError("Request not found") + request = TriggerRequestService.get_request(event.request_id) + payload = TriggerRequestService.get_payload(event.request_id) # invoke triger provider_controller: PluginTriggerProviderController = TriggerManager.get_trigger_provider( tenant_id, TriggerProviderID(subscription.provider_id) @@ -71,6 +69,7 @@ class TriggerService: credential_type=CredentialType.of(subscription.credential_type), subscription=subscription.to_entity(), request=request, + payload=payload, ) @classmethod @@ -112,14 +111,10 @@ class TriggerService: if dispatch_response.events: request_id = f"trigger_request_{timestamp}_{secrets.token_hex(6)}" - serialized_request = serialize_request(request) # save the request and payload to storage as persistent data - storage.save(f"triggers/{request_id}.raw", serialized_request) - storage.save( - f"triggers/{request_id}.payload", - TypeAdapter(Mapping[str, Any]).dump_json(dispatch_response.payload), - ) + TriggerRequestService.persist_request(request_id, request) + TriggerRequestService.persist_payload(request_id, dispatch_response.payload) # Validate event names for event_name in dispatch_response.events: diff --git a/api/tasks/trigger_processing_tasks.py b/api/tasks/trigger_processing_tasks.py index 5add4e76da..9667c1eed5 100644 --- a/api/tasks/trigger_processing_tasks.py +++ b/api/tasks/trigger_processing_tasks.py @@ -10,14 +10,12 @@ from collections.abc import Mapping, Sequence from typing import Any from celery import shared_task -from pydantic import TypeAdapter from sqlalchemy import func, select from sqlalchemy.orm import Session from core.app.entities.app_invoke_entities import InvokeFrom from core.plugin.entities.plugin_daemon import CredentialType from core.plugin.entities.request import TriggerInvokeEventResponse -from core.plugin.utils.http_parser import deserialize_request from core.trigger.debug.event_bus import TriggerDebugEventBus from core.trigger.debug.events import PluginTriggerDebugEvent from core.trigger.provider import PluginTriggerProviderController @@ -25,7 +23,6 @@ from core.trigger.trigger_manager import TriggerManager from core.workflow.enums import NodeType from core.workflow.nodes.trigger_plugin.entities import TriggerEventNodeData from extensions.ext_database import db -from extensions.ext_storage import storage from models.enums import WorkflowRunTriggeredFrom from models.model import EndUser from models.provider_ids import TriggerProviderID @@ -34,6 +31,7 @@ from models.workflow import Workflow, WorkflowPluginTrigger from services.async_workflow_service import AsyncWorkflowService from services.end_user_service import EndUserService from services.trigger.trigger_provider_service import TriggerProviderService +from services.trigger.trigger_request_service import TriggerRequestService from services.workflow.entities import PluginTriggerData, PluginTriggerDispatchData logger = logging.getLogger(__name__) @@ -120,16 +118,11 @@ def dispatch_triggered_workflow( event: The trigger entity that was activated request_id: The ID of the stored request in storage system """ - request = deserialize_request(storage.load_once(f"triggers/{request_id}.raw")) - if not request: - logger.error("Request not found for request_id %s", request_id) - return 0 - payload = TypeAdapter(Mapping[str, Any]).validate_json(storage.load_once(f"triggers/{request_id}.payload")) - if not payload: - logger.error("Payload not found for request_id %s", request_id) - return 0 + request = TriggerRequestService.get_request(request_id) + payload = TriggerRequestService.get_payload(request_id) from services.trigger.trigger_service import TriggerService + # FIXME: we should avoid import modules inside methods subscribers: list[WorkflowPluginTrigger] = TriggerService.get_subscriber_triggers( tenant_id=subscription.tenant_id, subscription_id=subscription.id, event_name=event_name