mirror of https://github.com/langgenius/dify.git
chore: unify request handling
This commit is contained in:
parent
31cf4b6619
commit
30a341331f
|
|
@ -0,0 +1,65 @@
|
|||
from collections.abc import Mapping
|
||||
from typing import Any
|
||||
|
||||
from flask import Request
|
||||
from pydantic import TypeAdapter
|
||||
|
||||
from core.plugin.utils.http_parser import deserialize_request, serialize_request
|
||||
from extensions.ext_storage import storage
|
||||
|
||||
|
||||
class TriggerRequestService:
|
||||
"""
|
||||
Service for handling trigger requests.
|
||||
"""
|
||||
|
||||
_TRIGGER_STORAGE_PATH = "triggers"
|
||||
|
||||
@classmethod
|
||||
def get_request(cls, request_id: str) -> Request:
|
||||
"""
|
||||
Get the request object from the storage.
|
||||
|
||||
Args:
|
||||
request_id: The ID of the request.
|
||||
|
||||
Returns:
|
||||
The request object.
|
||||
"""
|
||||
return deserialize_request(storage.load_once(f"{cls._TRIGGER_STORAGE_PATH}/{request_id}.raw"))
|
||||
|
||||
@classmethod
|
||||
def get_payload(cls, request_id: str) -> Mapping[str, Any]:
|
||||
"""
|
||||
Get the payload from the storage.
|
||||
|
||||
Args:
|
||||
request_id: The ID of the request.
|
||||
|
||||
Returns:
|
||||
The payload.
|
||||
"""
|
||||
return TypeAdapter(Mapping[str, Any]).validate_json(
|
||||
storage.load_once(f"{cls._TRIGGER_STORAGE_PATH}/{request_id}.payload")
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def persist_request(cls, request_id: str, request: Request) -> None:
|
||||
"""
|
||||
Persist the request in the storage.
|
||||
|
||||
Args:
|
||||
request_id: The ID of the request.
|
||||
request: The request object.
|
||||
"""
|
||||
storage.save(f"{cls._TRIGGER_STORAGE_PATH}/{request_id}.raw", serialize_request(request))
|
||||
|
||||
@classmethod
|
||||
def persist_payload(cls, request_id: str, payload: Mapping[str, Any]) -> None:
|
||||
"""
|
||||
Persist the payload in the storage.
|
||||
"""
|
||||
storage.save(
|
||||
f"{cls._TRIGGER_STORAGE_PATH}/{request_id}.payload",
|
||||
TypeAdapter(Mapping[str, Any]).dump_json(payload),
|
||||
)
|
||||
|
|
@ -5,14 +5,13 @@ from collections.abc import Mapping
|
|||
from typing import Any
|
||||
|
||||
from flask import Request, Response
|
||||
from pydantic import BaseModel, TypeAdapter
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy import and_, select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from core.plugin.entities.plugin_daemon import CredentialType
|
||||
from core.plugin.entities.request import TriggerDispatchResponse, TriggerInvokeEventResponse
|
||||
from core.plugin.impl.exc import PluginNotFoundError
|
||||
from core.plugin.utils.http_parser import deserialize_request, serialize_request
|
||||
from core.trigger.debug.events import PluginTriggerDebugEvent
|
||||
from core.trigger.provider import PluginTriggerProviderController
|
||||
from core.trigger.trigger_manager import TriggerManager
|
||||
|
|
@ -21,12 +20,12 @@ from core.workflow.enums import NodeType
|
|||
from core.workflow.nodes.trigger_plugin.entities import TriggerEventNodeData
|
||||
from extensions.ext_database import db
|
||||
from extensions.ext_redis import redis_client
|
||||
from extensions.ext_storage import storage
|
||||
from models.model import App
|
||||
from models.provider_ids import TriggerProviderID
|
||||
from models.trigger import TriggerSubscription
|
||||
from models.workflow import AppTrigger, AppTriggerStatus, Workflow, WorkflowPluginTrigger
|
||||
from services.trigger.trigger_provider_service import TriggerProviderService
|
||||
from services.trigger.trigger_request_service import TriggerRequestService
|
||||
from services.workflow.entities import PluginTriggerDispatchData
|
||||
from tasks.trigger_processing_tasks import dispatch_triggered_workflows_async
|
||||
|
||||
|
|
@ -52,9 +51,8 @@ class TriggerService:
|
|||
if not subscription:
|
||||
raise ValueError("Subscription not found")
|
||||
node_data: TriggerEventNodeData = TriggerEventNodeData.model_validate(node_config.get("data", {}))
|
||||
request = deserialize_request(storage.load_once(f"triggers/{event.request_id}"))
|
||||
if not request:
|
||||
raise ValueError("Request not found")
|
||||
request = TriggerRequestService.get_request(event.request_id)
|
||||
payload = TriggerRequestService.get_payload(event.request_id)
|
||||
# invoke triger
|
||||
provider_controller: PluginTriggerProviderController = TriggerManager.get_trigger_provider(
|
||||
tenant_id, TriggerProviderID(subscription.provider_id)
|
||||
|
|
@ -71,6 +69,7 @@ class TriggerService:
|
|||
credential_type=CredentialType.of(subscription.credential_type),
|
||||
subscription=subscription.to_entity(),
|
||||
request=request,
|
||||
payload=payload,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
|
|
@ -112,14 +111,10 @@ class TriggerService:
|
|||
|
||||
if dispatch_response.events:
|
||||
request_id = f"trigger_request_{timestamp}_{secrets.token_hex(6)}"
|
||||
serialized_request = serialize_request(request)
|
||||
|
||||
# save the request and payload to storage as persistent data
|
||||
storage.save(f"triggers/{request_id}.raw", serialized_request)
|
||||
storage.save(
|
||||
f"triggers/{request_id}.payload",
|
||||
TypeAdapter(Mapping[str, Any]).dump_json(dispatch_response.payload),
|
||||
)
|
||||
TriggerRequestService.persist_request(request_id, request)
|
||||
TriggerRequestService.persist_payload(request_id, dispatch_response.payload)
|
||||
|
||||
# Validate event names
|
||||
for event_name in dispatch_response.events:
|
||||
|
|
|
|||
|
|
@ -10,14 +10,12 @@ from collections.abc import Mapping, Sequence
|
|||
from typing import Any
|
||||
|
||||
from celery import shared_task
|
||||
from pydantic import TypeAdapter
|
||||
from sqlalchemy import func, select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom
|
||||
from core.plugin.entities.plugin_daemon import CredentialType
|
||||
from core.plugin.entities.request import TriggerInvokeEventResponse
|
||||
from core.plugin.utils.http_parser import deserialize_request
|
||||
from core.trigger.debug.event_bus import TriggerDebugEventBus
|
||||
from core.trigger.debug.events import PluginTriggerDebugEvent
|
||||
from core.trigger.provider import PluginTriggerProviderController
|
||||
|
|
@ -25,7 +23,6 @@ from core.trigger.trigger_manager import TriggerManager
|
|||
from core.workflow.enums import NodeType
|
||||
from core.workflow.nodes.trigger_plugin.entities import TriggerEventNodeData
|
||||
from extensions.ext_database import db
|
||||
from extensions.ext_storage import storage
|
||||
from models.enums import WorkflowRunTriggeredFrom
|
||||
from models.model import EndUser
|
||||
from models.provider_ids import TriggerProviderID
|
||||
|
|
@ -34,6 +31,7 @@ from models.workflow import Workflow, WorkflowPluginTrigger
|
|||
from services.async_workflow_service import AsyncWorkflowService
|
||||
from services.end_user_service import EndUserService
|
||||
from services.trigger.trigger_provider_service import TriggerProviderService
|
||||
from services.trigger.trigger_request_service import TriggerRequestService
|
||||
from services.workflow.entities import PluginTriggerData, PluginTriggerDispatchData
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -120,16 +118,11 @@ def dispatch_triggered_workflow(
|
|||
event: The trigger entity that was activated
|
||||
request_id: The ID of the stored request in storage system
|
||||
"""
|
||||
request = deserialize_request(storage.load_once(f"triggers/{request_id}.raw"))
|
||||
if not request:
|
||||
logger.error("Request not found for request_id %s", request_id)
|
||||
return 0
|
||||
payload = TypeAdapter(Mapping[str, Any]).validate_json(storage.load_once(f"triggers/{request_id}.payload"))
|
||||
if not payload:
|
||||
logger.error("Payload not found for request_id %s", request_id)
|
||||
return 0
|
||||
request = TriggerRequestService.get_request(request_id)
|
||||
payload = TriggerRequestService.get_payload(request_id)
|
||||
|
||||
from services.trigger.trigger_service import TriggerService
|
||||
# FIXME: we should avoid import modules inside methods
|
||||
|
||||
subscribers: list[WorkflowPluginTrigger] = TriggerService.get_subscriber_triggers(
|
||||
tenant_id=subscription.tenant_id, subscription_id=subscription.id, event_name=event_name
|
||||
|
|
|
|||
Loading…
Reference in New Issue