mirror of https://github.com/langgenius/dify.git
refactor(trigger): unify debug event handling and improve polling mechanism
- Introduced a base class for debug events to streamline event handling. - Refactored `TriggerDebugService` to support multiple event types through a generic dispatch/poll interface. - Updated webhook and plugin trigger debug services to utilize the new event structure. - Enhanced the dispatch logic in `dispatch_triggered_workflows_async` to accommodate the new event model.
This commit is contained in:
parent
02222752f0
commit
a33d04d1ac
|
|
@ -35,7 +35,11 @@ from models.workflow import NodeType, Workflow
|
|||
from services.app_generate_service import AppGenerateService
|
||||
from services.errors.app import WorkflowHashNotEqualError
|
||||
from services.errors.llm import InvokeRateLimitError
|
||||
from services.trigger.trigger_debug_service import TriggerDebugService, WebhookDebugService
|
||||
from services.trigger.trigger_debug_service import (
|
||||
PluginTriggerDebugEvent,
|
||||
TriggerDebugService,
|
||||
WebhookDebugEvent,
|
||||
)
|
||||
from services.trigger.webhook_service import WebhookService
|
||||
from services.workflow_service import DraftWorkflowDeletionError, WorkflowInUseError, WorkflowService
|
||||
|
||||
|
|
@ -1030,13 +1034,18 @@ class DraftWorkflowTriggerNodeApi(Resource):
|
|||
trigger_name = args["trigger_name"]
|
||||
subscription_id = args["subscription_id"]
|
||||
|
||||
event = TriggerDebugService.poll_event(
|
||||
pool_key = PluginTriggerDebugEvent.build_pool_key(
|
||||
tenant_id=app_model.tenant_id,
|
||||
subscription_id=subscription_id,
|
||||
trigger_name=trigger_name,
|
||||
)
|
||||
event: PluginTriggerDebugEvent | None = TriggerDebugService.poll(
|
||||
event_type=PluginTriggerDebugEvent,
|
||||
pool_key=pool_key,
|
||||
tenant_id=app_model.tenant_id,
|
||||
user_id=current_user.id,
|
||||
app_id=app_model.id,
|
||||
subscription_id=subscription_id,
|
||||
node_id=node_id,
|
||||
trigger_name=trigger_name,
|
||||
)
|
||||
if not event:
|
||||
return jsonable_encoder({"status": "waiting"})
|
||||
|
|
@ -1110,13 +1119,18 @@ class DraftWorkflowTriggerRunApi(Resource):
|
|||
trigger_name = args["trigger_name"]
|
||||
subscription_id = args["subscription_id"]
|
||||
|
||||
event = TriggerDebugService.poll_event(
|
||||
pool_key = PluginTriggerDebugEvent.build_pool_key(
|
||||
tenant_id=app_model.tenant_id,
|
||||
subscription_id=subscription_id,
|
||||
trigger_name=trigger_name,
|
||||
)
|
||||
event: PluginTriggerDebugEvent | None = TriggerDebugService.poll(
|
||||
event_type=PluginTriggerDebugEvent,
|
||||
pool_key=pool_key,
|
||||
tenant_id=app_model.tenant_id,
|
||||
user_id=current_user.id,
|
||||
app_id=app_model.id,
|
||||
subscription_id=subscription_id,
|
||||
node_id=node_id,
|
||||
trigger_name=trigger_name,
|
||||
)
|
||||
if not event:
|
||||
return jsonable_encoder({"status": "waiting"})
|
||||
|
|
@ -1187,7 +1201,14 @@ class DraftWorkflowTriggerWebhookRunApi(Resource):
|
|||
args = parser.parse_args()
|
||||
node_id = args["node_id"]
|
||||
|
||||
event = WebhookDebugService.poll_event(
|
||||
pool_key = WebhookDebugEvent.build_pool_key(
|
||||
tenant_id=app_model.tenant_id,
|
||||
app_id=app_model.id,
|
||||
node_id=node_id,
|
||||
)
|
||||
event: WebhookDebugEvent | None = TriggerDebugService.poll(
|
||||
event_type=WebhookDebugEvent,
|
||||
pool_key=pool_key,
|
||||
tenant_id=app_model.tenant_id,
|
||||
user_id=current_user.id,
|
||||
app_id=app_model.id,
|
||||
|
|
@ -1253,7 +1274,14 @@ class DraftWorkflowNodeWebhookDebugRunApi(Resource):
|
|||
if not isinstance(current_user, Account) or not current_user.has_edit_permission:
|
||||
raise Forbidden()
|
||||
|
||||
event = WebhookDebugService.poll_event(
|
||||
pool_key = WebhookDebugEvent.build_pool_key(
|
||||
tenant_id=app_model.tenant_id,
|
||||
app_id=app_model.id,
|
||||
node_id=node_id,
|
||||
)
|
||||
event: WebhookDebugEvent | None = TriggerDebugService.poll(
|
||||
event_type=WebhookDebugEvent,
|
||||
pool_key=pool_key,
|
||||
tenant_id=app_model.tenant_id,
|
||||
user_id=current_user.id,
|
||||
app_id=app_model.id,
|
||||
|
|
@ -1272,10 +1300,12 @@ class DraftWorkflowNodeWebhookDebugRunApi(Resource):
|
|||
node_config = draft_workflow.get_node_config_by_id(node_id)
|
||||
node_type = Workflow.get_node_type_from_node_config(node_config)
|
||||
if node_type != NodeType.TRIGGER_WEBHOOK:
|
||||
return jsonable_encoder({
|
||||
"status": "error",
|
||||
"message": "node is not webhook trigger",
|
||||
}), 400
|
||||
return jsonable_encoder(
|
||||
{
|
||||
"status": "error",
|
||||
"message": "node is not webhook trigger",
|
||||
}
|
||||
), 400
|
||||
|
||||
payload = event.payload or {}
|
||||
workflow_inputs = payload.get("inputs")
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ from flask import jsonify
|
|||
from werkzeug.exceptions import NotFound, RequestEntityTooLarge
|
||||
|
||||
from controllers.trigger import bp
|
||||
from services.trigger.trigger_debug_service import WebhookDebugService
|
||||
from services.trigger.trigger_debug_service import TriggerDebugService, WebhookDebugEvent
|
||||
from services.trigger.webhook_service import WebhookService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -70,18 +70,28 @@ def handle_webhook_debug(webhook_id: str):
|
|||
return jsonify({"error": "Bad Request", "message": error}), 400
|
||||
|
||||
workflow_inputs = WebhookService.build_workflow_inputs(webhook_data)
|
||||
WebhookDebugService.dispatch_event(
|
||||
|
||||
# Generate pool key and dispatch debug event
|
||||
pool_key: str = WebhookDebugEvent.build_pool_key(
|
||||
tenant_id=webhook_trigger.tenant_id,
|
||||
app_id=webhook_trigger.app_id,
|
||||
node_id=webhook_trigger.node_id,
|
||||
)
|
||||
event = WebhookDebugEvent(
|
||||
request_id=f"webhook_debug_{webhook_trigger.webhook_id}_{int(time.time() * 1000)}",
|
||||
timestamp=int(time.time()),
|
||||
node_id=webhook_trigger.node_id,
|
||||
payload={
|
||||
"inputs": workflow_inputs,
|
||||
"webhook_data": webhook_data,
|
||||
"method": webhook_data.get("method"),
|
||||
},
|
||||
)
|
||||
TriggerDebugService.dispatch(
|
||||
tenant_id=webhook_trigger.tenant_id,
|
||||
event=event,
|
||||
pool_key=pool_key,
|
||||
)
|
||||
response_data, status_code = WebhookService.generate_webhook_response(node_config)
|
||||
return jsonify(response_data), status_code
|
||||
|
||||
|
|
|
|||
|
|
@ -2,7 +2,8 @@
|
|||
|
||||
import hashlib
|
||||
import logging
|
||||
from typing import Any, Optional
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Any, Optional, TypeVar
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
from redis import RedisError
|
||||
|
|
@ -13,36 +14,84 @@ logger = logging.getLogger(__name__)
|
|||
|
||||
TRIGGER_DEBUG_EVENT_TTL = 300
|
||||
|
||||
TEvent = TypeVar("TEvent", bound="BaseDebugEvent")
|
||||
|
||||
class TriggerDebugEvent(BaseModel):
|
||||
|
||||
class BaseDebugEvent(ABC, BaseModel):
|
||||
"""Base class for all debug events."""
|
||||
|
||||
timestamp: int
|
||||
|
||||
@classmethod
|
||||
@abstractmethod
|
||||
def build_pool_key(cls, **kwargs: Any) -> str:
|
||||
"""
|
||||
Generate the waiting pool key for this event type.
|
||||
|
||||
Each subclass implements its own pool key strategy based on routing parameters.
|
||||
|
||||
Returns:
|
||||
Redis key for the waiting pool
|
||||
"""
|
||||
raise NotImplementedError("Subclasses must implement build_pool_key")
|
||||
|
||||
|
||||
class PluginTriggerDebugEvent(BaseDebugEvent):
|
||||
"""Debug event for plugin triggers."""
|
||||
|
||||
request_id: str
|
||||
subscription_id: str
|
||||
request_id: str
|
||||
timestamp: int
|
||||
event_name: str
|
||||
|
||||
@classmethod
|
||||
def build_pool_key(cls, **kwargs: Any) -> str:
|
||||
"""Generate pool key for plugin trigger events.
|
||||
|
||||
Args:
|
||||
tenant_id: Tenant ID
|
||||
subscription_id: Subscription ID
|
||||
trigger_name: Trigger name
|
||||
"""
|
||||
tenant_id = kwargs["tenant_id"]
|
||||
subscription_id = kwargs["subscription_id"]
|
||||
trigger_name = kwargs["trigger_name"]
|
||||
return f"trigger_debug_waiting_pool:{{{tenant_id}}}:{subscription_id}:{trigger_name}"
|
||||
|
||||
|
||||
class WebhookDebugEvent(BaseModel):
|
||||
class WebhookDebugEvent(BaseDebugEvent):
|
||||
"""Debug event for webhook triggers."""
|
||||
|
||||
request_id: str
|
||||
timestamp: int
|
||||
node_id: str
|
||||
payload: dict[str, Any] = Field(default_factory=dict)
|
||||
|
||||
@classmethod
|
||||
def build_pool_key(cls, **kwargs: Any) -> str:
|
||||
"""Generate pool key for webhook events.
|
||||
|
||||
def _address(tenant_id: str, user_id: str, app_id: str, node_id: str) -> str:
|
||||
address_id = hashlib.sha1(f"{user_id}|{app_id}|{node_id}".encode()).hexdigest()
|
||||
return f"trigger_debug_inbox:{{{tenant_id}}}:{address_id}"
|
||||
Args:
|
||||
tenant_id: Tenant ID
|
||||
app_id: App ID
|
||||
node_id: Node ID
|
||||
"""
|
||||
tenant_id = kwargs["tenant_id"]
|
||||
app_id = kwargs["app_id"]
|
||||
node_id = kwargs["node_id"]
|
||||
return f"trigger_debug_waiting_pool:{{{tenant_id}}}:{app_id}:{node_id}"
|
||||
|
||||
|
||||
class TriggerDebugService:
|
||||
"""
|
||||
Redis-based trigger debug service with polling support.
|
||||
Unified Redis-based trigger debug service with polling support.
|
||||
|
||||
Uses {tenant_id} hash tags for Redis Cluster compatibility.
|
||||
Supports multiple event types through a generic dispatch/poll interface.
|
||||
"""
|
||||
|
||||
# LUA_SELECT: Atomic poll or register for event
|
||||
# KEYS[1] = trigger_debug_inbox:{tenant_id}:{address_id}
|
||||
# KEYS[2] = trigger_debug_waiting_pool:{tenant_id}:{subscription_id}:{trigger}
|
||||
# KEYS[2] = trigger_debug_waiting_pool:{tenant_id}:...
|
||||
# ARGV[1] = address_id
|
||||
# compressed lua code, you can use LLM to uncompress it
|
||||
LUA_SELECT = (
|
||||
"local v=redis.call('GET',KEYS[1]);"
|
||||
"if v then redis.call('DEL',KEYS[1]);return v end;"
|
||||
|
|
@ -52,10 +101,9 @@ class TriggerDebugService:
|
|||
)
|
||||
|
||||
# LUA_DISPATCH: Dispatch event to all waiting addresses
|
||||
# KEYS[1] = trigger_debug_waiting_pool:{tenant_id}:{subscription_id}:{trigger}
|
||||
# KEYS[1] = trigger_debug_waiting_pool:{tenant_id}:...
|
||||
# ARGV[1] = tenant_id
|
||||
# ARGV[2] = event_json
|
||||
# compressed lua code, you can use LLM to uncompress it
|
||||
LUA_DISPATCH = (
|
||||
"local a=redis.call('SMEMBERS',KEYS[1]);"
|
||||
"if #a==0 then return 0 end;"
|
||||
|
|
@ -67,127 +115,76 @@ class TriggerDebugService:
|
|||
)
|
||||
|
||||
@classmethod
|
||||
def waiting_pool(cls, tenant_id: str, subscription_id: str, trigger_name: str) -> str:
|
||||
return f"trigger_debug_waiting_pool:{{{tenant_id}}}:{subscription_id}:{trigger_name}"
|
||||
|
||||
@classmethod
|
||||
def dispatch_debug_event(
|
||||
def dispatch(
|
||||
cls,
|
||||
tenant_id: str,
|
||||
subscription_id: str,
|
||||
events: list[str],
|
||||
request_id: str,
|
||||
timestamp: int,
|
||||
event: BaseDebugEvent,
|
||||
pool_key: str,
|
||||
) -> int:
|
||||
event_json = TriggerDebugEvent(
|
||||
subscription_id=subscription_id,
|
||||
request_id=request_id,
|
||||
timestamp=timestamp,
|
||||
).model_dump_json()
|
||||
"""
|
||||
Dispatch event to all waiting addresses in the pool.
|
||||
|
||||
dispatched = 0
|
||||
if len(events) > 10:
|
||||
logger.warning(
|
||||
"Too many events to dispatch at once: %d events tenant: %s subscription: %s",
|
||||
len(events),
|
||||
tenant_id,
|
||||
subscription_id,
|
||||
)
|
||||
|
||||
for trigger_name in events:
|
||||
try:
|
||||
dispatched += redis_client.eval(
|
||||
cls.LUA_DISPATCH,
|
||||
1,
|
||||
cls.waiting_pool(tenant_id, subscription_id, trigger_name),
|
||||
tenant_id,
|
||||
event_json,
|
||||
)
|
||||
except RedisError:
|
||||
logger.exception("Failed to dispatch for trigger: %s", trigger_name)
|
||||
return dispatched
|
||||
|
||||
@classmethod
|
||||
def poll_event(
|
||||
cls,
|
||||
tenant_id: str,
|
||||
user_id: str,
|
||||
app_id: str,
|
||||
subscription_id: str,
|
||||
node_id: str,
|
||||
trigger_name: str,
|
||||
) -> Optional[TriggerDebugEvent]:
|
||||
address_id = hashlib.sha1(f"{user_id}|{app_id}|{node_id}".encode()).hexdigest()
|
||||
Args:
|
||||
tenant_id: Tenant ID for hash tag
|
||||
event: Event object to dispatch
|
||||
pool_key: Pool key (generate using event_class.build_pool_key(...))
|
||||
|
||||
Returns:
|
||||
Number of addresses the event was dispatched to
|
||||
"""
|
||||
event_json = event.model_dump_json()
|
||||
try:
|
||||
event = redis_client.eval(
|
||||
cls.LUA_SELECT,
|
||||
2,
|
||||
_address(tenant_id, user_id, app_id, node_id),
|
||||
cls.waiting_pool(tenant_id, subscription_id, trigger_name),
|
||||
address_id,
|
||||
)
|
||||
return TriggerDebugEvent.model_validate_json(event) if event else None
|
||||
except RedisError:
|
||||
logger.exception("Failed to poll debug event")
|
||||
return None
|
||||
|
||||
|
||||
class WebhookDebugService:
|
||||
"""Debug helpers dedicated to webhook triggers."""
|
||||
|
||||
@staticmethod
|
||||
def waiting_pool(tenant_id: str, app_id: str, node_id: str) -> str:
|
||||
return f"trigger_debug_waiting_pool:{{{tenant_id}}}:{app_id}:{node_id}"
|
||||
|
||||
@classmethod
|
||||
def dispatch_event(
|
||||
cls,
|
||||
tenant_id: str,
|
||||
app_id: str,
|
||||
node_id: str,
|
||||
request_id: str,
|
||||
timestamp: int,
|
||||
payload: dict[str, Any],
|
||||
) -> int:
|
||||
event_json = WebhookDebugEvent(
|
||||
request_id=request_id,
|
||||
timestamp=timestamp,
|
||||
node_id=node_id,
|
||||
payload=payload,
|
||||
).model_dump_json()
|
||||
|
||||
try:
|
||||
return redis_client.eval(
|
||||
TriggerDebugService.LUA_DISPATCH,
|
||||
result = redis_client.eval(
|
||||
cls.LUA_DISPATCH,
|
||||
1,
|
||||
cls.waiting_pool(tenant_id, app_id, node_id),
|
||||
pool_key,
|
||||
tenant_id,
|
||||
event_json,
|
||||
)
|
||||
return int(result)
|
||||
except RedisError:
|
||||
logger.exception("Failed to dispatch webhook debug event")
|
||||
logger.exception("Failed to dispatch event to pool: %s", pool_key)
|
||||
return 0
|
||||
|
||||
@classmethod
|
||||
def poll_event(
|
||||
def poll(
|
||||
cls,
|
||||
event_type: type[TEvent],
|
||||
pool_key: str,
|
||||
tenant_id: str,
|
||||
user_id: str,
|
||||
app_id: str,
|
||||
node_id: str,
|
||||
) -> Optional[WebhookDebugEvent]:
|
||||
address_id = hashlib.sha1(f"{user_id}|{app_id}|{node_id}".encode()).hexdigest()
|
||||
) -> Optional[TEvent]:
|
||||
"""
|
||||
Poll for an event or register to the waiting pool.
|
||||
|
||||
If an event is available in the inbox, return it immediately.
|
||||
Otherwise, register the address to the waiting pool for future dispatch.
|
||||
|
||||
Args:
|
||||
event_class: Event class for deserialization and type safety
|
||||
pool_key: Pool key (generate using event_class.build_pool_key(...))
|
||||
tenant_id: Tenant ID
|
||||
user_id: User ID for address calculation
|
||||
app_id: App ID for address calculation
|
||||
node_id: Node ID for address calculation
|
||||
|
||||
Returns:
|
||||
Event object if available, None otherwise
|
||||
"""
|
||||
address_id: str = hashlib.sha1(f"{user_id}|{app_id}|{node_id}".encode()).hexdigest()
|
||||
address: str = f"trigger_debug_inbox:{{{tenant_id}}}:{address_id}"
|
||||
|
||||
try:
|
||||
event = redis_client.eval(
|
||||
TriggerDebugService.LUA_SELECT,
|
||||
event_data = redis_client.eval(
|
||||
cls.LUA_SELECT,
|
||||
2,
|
||||
_address(tenant_id, user_id, app_id, node_id),
|
||||
cls.waiting_pool(tenant_id, app_id, node_id),
|
||||
address,
|
||||
pool_key,
|
||||
address_id,
|
||||
)
|
||||
return WebhookDebugEvent.model_validate_json(event) if event else None
|
||||
return event_type.model_validate_json(json_data=event_data) if event_data else None
|
||||
except RedisError:
|
||||
logger.exception("Failed to poll webhook debug event")
|
||||
logger.exception("Failed to poll event from pool: %s", pool_key)
|
||||
return None
|
||||
|
|
|
|||
|
|
@ -44,13 +44,13 @@ class WebhookService:
|
|||
Args:
|
||||
webhook_id: The webhook ID to look up
|
||||
is_debug: If True, use the draft workflow graph and skip the trigger enabled status check
|
||||
|
||||
|
||||
Returns:
|
||||
A tuple containing:
|
||||
- WorkflowWebhookTrigger: The webhook trigger object
|
||||
- Workflow: The associated workflow object
|
||||
- Mapping[str, Any]: The node configuration data
|
||||
|
||||
|
||||
Raises:
|
||||
ValueError: If webhook not found, app trigger not found, trigger disabled, or workflow not found
|
||||
"""
|
||||
|
|
@ -61,7 +61,7 @@ class WebhookService:
|
|||
)
|
||||
if not webhook_trigger:
|
||||
raise ValueError(f"Webhook not found: {webhook_id}")
|
||||
|
||||
|
||||
if is_debug:
|
||||
workflow = (
|
||||
session.query(Workflow)
|
||||
|
|
@ -113,14 +113,14 @@ class WebhookService:
|
|||
cls, webhook_trigger: WorkflowWebhookTrigger, node_config: Mapping[str, Any]
|
||||
) -> dict[str, Any]:
|
||||
"""Extract and validate webhook data in a single unified process.
|
||||
|
||||
|
||||
Args:
|
||||
webhook_trigger: The webhook trigger object containing metadata
|
||||
node_config: The node configuration containing validation rules
|
||||
|
||||
|
||||
Returns:
|
||||
dict[str, Any]: Processed and validated webhook data with correct types
|
||||
|
||||
|
||||
Raises:
|
||||
ValueError: If validation fails (HTTP method mismatch, missing required fields, type errors)
|
||||
"""
|
||||
|
|
@ -141,10 +141,10 @@ class WebhookService:
|
|||
@classmethod
|
||||
def extract_webhook_data(cls, webhook_trigger: WorkflowWebhookTrigger) -> dict[str, Any]:
|
||||
"""Extract raw data from incoming webhook request without type conversion.
|
||||
|
||||
|
||||
Args:
|
||||
webhook_trigger: The webhook trigger object for file processing context
|
||||
|
||||
|
||||
Returns:
|
||||
dict[str, Any]: Raw webhook data containing:
|
||||
- method: HTTP method
|
||||
|
|
@ -191,14 +191,14 @@ class WebhookService:
|
|||
@classmethod
|
||||
def _process_and_validate_data(cls, raw_data: dict[str, Any], node_data: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Process and validate webhook data according to node configuration.
|
||||
|
||||
|
||||
Args:
|
||||
raw_data: Raw webhook data from extraction
|
||||
node_data: Node configuration containing validation and type rules
|
||||
|
||||
|
||||
Returns:
|
||||
dict[str, Any]: Processed data with validated types
|
||||
|
||||
|
||||
Raises:
|
||||
ValueError: If validation fails or required fields are missing
|
||||
"""
|
||||
|
|
@ -233,7 +233,7 @@ class WebhookService:
|
|||
@classmethod
|
||||
def _extract_json_body(cls) -> tuple[dict[str, Any], dict[str, Any]]:
|
||||
"""Extract JSON body from request.
|
||||
|
||||
|
||||
Returns:
|
||||
tuple: (body_data, files_data) where:
|
||||
- body_data: Parsed JSON content or empty dict if parsing fails
|
||||
|
|
@ -249,7 +249,7 @@ class WebhookService:
|
|||
@classmethod
|
||||
def _extract_form_body(cls) -> tuple[dict[str, Any], dict[str, Any]]:
|
||||
"""Extract form-urlencoded body from request.
|
||||
|
||||
|
||||
Returns:
|
||||
tuple: (body_data, files_data) where:
|
||||
- body_data: Form data as key-value pairs
|
||||
|
|
@ -260,10 +260,10 @@ class WebhookService:
|
|||
@classmethod
|
||||
def _extract_multipart_body(cls, webhook_trigger: WorkflowWebhookTrigger) -> tuple[dict[str, Any], dict[str, Any]]:
|
||||
"""Extract multipart/form-data body and files from request.
|
||||
|
||||
|
||||
Args:
|
||||
webhook_trigger: Webhook trigger for file processing context
|
||||
|
||||
|
||||
Returns:
|
||||
tuple: (body_data, files_data) where:
|
||||
- body_data: Form data as key-value pairs
|
||||
|
|
@ -278,10 +278,10 @@ class WebhookService:
|
|||
cls, webhook_trigger: WorkflowWebhookTrigger
|
||||
) -> tuple[dict[str, Any], dict[str, Any]]:
|
||||
"""Extract binary data as file from request.
|
||||
|
||||
|
||||
Args:
|
||||
webhook_trigger: Webhook trigger for file processing context
|
||||
|
||||
|
||||
Returns:
|
||||
tuple: (body_data, files_data) where:
|
||||
- body_data: Dict with 'raw' key containing file object or None
|
||||
|
|
@ -301,7 +301,7 @@ class WebhookService:
|
|||
@classmethod
|
||||
def _extract_text_body(cls) -> tuple[dict[str, Any], dict[str, Any]]:
|
||||
"""Extract text/plain body from request.
|
||||
|
||||
|
||||
Returns:
|
||||
tuple: (body_data, files_data) where:
|
||||
- body_data: Dict with 'raw' key containing text content
|
||||
|
|
@ -317,11 +317,11 @@ class WebhookService:
|
|||
@classmethod
|
||||
def _process_file_uploads(cls, files, webhook_trigger: WorkflowWebhookTrigger) -> dict[str, Any]:
|
||||
"""Process file uploads using ToolFileManager.
|
||||
|
||||
|
||||
Args:
|
||||
files: Flask request files object containing uploaded files
|
||||
webhook_trigger: Webhook trigger for tenant and user context
|
||||
|
||||
|
||||
Returns:
|
||||
dict[str, Any]: Processed file objects indexed by field name
|
||||
"""
|
||||
|
|
@ -345,12 +345,12 @@ class WebhookService:
|
|||
cls, file_content: bytes, mimetype: str, webhook_trigger: WorkflowWebhookTrigger
|
||||
) -> Any:
|
||||
"""Create a file object from binary content using ToolFileManager.
|
||||
|
||||
|
||||
Args:
|
||||
file_content: The binary content of the file
|
||||
mimetype: The MIME type of the file
|
||||
webhook_trigger: Webhook trigger for tenant and user context
|
||||
|
||||
|
||||
Returns:
|
||||
Any: A file object built from the binary content
|
||||
"""
|
||||
|
|
@ -380,15 +380,15 @@ class WebhookService:
|
|||
cls, raw_params: dict[str, str], param_configs: list, is_form_data: bool = False
|
||||
) -> dict[str, Any]:
|
||||
"""Process parameters with unified validation and type conversion.
|
||||
|
||||
|
||||
Args:
|
||||
raw_params: Raw parameter values as strings
|
||||
param_configs: List of parameter configuration dictionaries
|
||||
is_form_data: Whether the parameters are from form data (requiring string conversion)
|
||||
|
||||
|
||||
Returns:
|
||||
dict[str, Any]: Processed parameters with validated types
|
||||
|
||||
|
||||
Raises:
|
||||
ValueError: If required parameters are missing or validation fails
|
||||
"""
|
||||
|
|
@ -421,15 +421,15 @@ class WebhookService:
|
|||
cls, raw_body: dict[str, Any], body_configs: list, content_type: str
|
||||
) -> dict[str, Any]:
|
||||
"""Process body parameters based on content type and configuration.
|
||||
|
||||
|
||||
Args:
|
||||
raw_body: Raw body data from request
|
||||
body_configs: List of body parameter configuration dictionaries
|
||||
content_type: The request content type
|
||||
|
||||
|
||||
Returns:
|
||||
dict[str, Any]: Processed body parameters with validated types
|
||||
|
||||
|
||||
Raises:
|
||||
ValueError: If required body parameters are missing or validation fails
|
||||
"""
|
||||
|
|
@ -474,16 +474,16 @@ class WebhookService:
|
|||
@classmethod
|
||||
def _validate_and_convert_value(cls, param_name: str, value: Any, param_type: str, is_form_data: bool) -> Any:
|
||||
"""Unified validation and type conversion for parameter values.
|
||||
|
||||
|
||||
Args:
|
||||
param_name: Name of the parameter for error reporting
|
||||
value: The value to validate and convert
|
||||
param_type: The expected parameter type (SegmentType)
|
||||
is_form_data: Whether the value is from form data (requiring string conversion)
|
||||
|
||||
|
||||
Returns:
|
||||
Any: The validated and converted value
|
||||
|
||||
|
||||
Raises:
|
||||
ValueError: If validation or conversion fails
|
||||
"""
|
||||
|
|
@ -500,15 +500,15 @@ class WebhookService:
|
|||
@classmethod
|
||||
def _convert_form_value(cls, param_name: str, value: str, param_type: str) -> Any:
|
||||
"""Convert form data string values to specified types.
|
||||
|
||||
|
||||
Args:
|
||||
param_name: Name of the parameter for error reporting
|
||||
value: The string value to convert
|
||||
param_type: The target type to convert to (SegmentType)
|
||||
|
||||
|
||||
Returns:
|
||||
Any: The converted value in the appropriate type
|
||||
|
||||
|
||||
Raises:
|
||||
ValueError: If the value cannot be converted to the specified type
|
||||
"""
|
||||
|
|
@ -531,15 +531,15 @@ class WebhookService:
|
|||
@classmethod
|
||||
def _validate_json_value(cls, param_name: str, value: Any, param_type: str) -> Any:
|
||||
"""Validate JSON values against expected types.
|
||||
|
||||
|
||||
Args:
|
||||
param_name: Name of the parameter for error reporting
|
||||
value: The value to validate
|
||||
param_type: The expected parameter type (SegmentType)
|
||||
|
||||
|
||||
Returns:
|
||||
Any: The validated value (unchanged if valid)
|
||||
|
||||
|
||||
Raises:
|
||||
ValueError: If the value type doesn't match the expected type
|
||||
"""
|
||||
|
|
@ -581,11 +581,11 @@ class WebhookService:
|
|||
@classmethod
|
||||
def _validate_required_headers(cls, headers: dict[str, Any], header_configs: list) -> None:
|
||||
"""Validate required headers are present.
|
||||
|
||||
|
||||
Args:
|
||||
headers: Request headers dictionary
|
||||
header_configs: List of header configuration dictionaries
|
||||
|
||||
|
||||
Raises:
|
||||
ValueError: If required headers are missing
|
||||
"""
|
||||
|
|
@ -599,11 +599,11 @@ class WebhookService:
|
|||
@classmethod
|
||||
def _validate_http_metadata(cls, webhook_data: dict[str, Any], node_data: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Validate HTTP method and content-type.
|
||||
|
||||
|
||||
Args:
|
||||
webhook_data: Extracted webhook data containing method and headers
|
||||
node_data: Node configuration containing expected method and content-type
|
||||
|
||||
|
||||
Returns:
|
||||
dict[str, Any]: Validation result with 'valid' key and optional 'error' key
|
||||
"""
|
||||
|
|
@ -627,10 +627,10 @@ class WebhookService:
|
|||
@classmethod
|
||||
def _extract_content_type(cls, headers: dict[str, Any]) -> str:
|
||||
"""Extract and normalize content-type from headers.
|
||||
|
||||
|
||||
Args:
|
||||
headers: Request headers dictionary
|
||||
|
||||
|
||||
Returns:
|
||||
str: Normalized content-type (main type without parameters)
|
||||
"""
|
||||
|
|
@ -643,10 +643,10 @@ class WebhookService:
|
|||
@classmethod
|
||||
def _validation_error(cls, error_message: str) -> dict[str, Any]:
|
||||
"""Create a standard validation error response.
|
||||
|
||||
|
||||
Args:
|
||||
error_message: The error message to include
|
||||
|
||||
|
||||
Returns:
|
||||
dict[str, Any]: Validation error response with 'valid' and 'error' keys
|
||||
"""
|
||||
|
|
@ -664,10 +664,10 @@ class WebhookService:
|
|||
@classmethod
|
||||
def build_workflow_inputs(cls, webhook_data: dict[str, Any]) -> dict[str, Any]:
|
||||
"""Construct workflow inputs payload from webhook data.
|
||||
|
||||
|
||||
Args:
|
||||
webhook_data: Processed webhook data containing headers, query params, and body
|
||||
|
||||
|
||||
Returns:
|
||||
dict[str, Any]: Workflow inputs formatted for execution
|
||||
"""
|
||||
|
|
@ -683,12 +683,12 @@ class WebhookService:
|
|||
cls, webhook_trigger: WorkflowWebhookTrigger, webhook_data: dict[str, Any], workflow: Workflow
|
||||
) -> None:
|
||||
"""Trigger workflow execution via AsyncWorkflowService.
|
||||
|
||||
|
||||
Args:
|
||||
webhook_trigger: The webhook trigger object
|
||||
webhook_data: Processed webhook data for workflow inputs
|
||||
workflow: The workflow to execute
|
||||
|
||||
|
||||
Raises:
|
||||
ValueError: If tenant owner is not found
|
||||
Exception: If workflow execution fails
|
||||
|
|
@ -737,10 +737,10 @@ class WebhookService:
|
|||
@classmethod
|
||||
def generate_webhook_response(cls, node_config: Mapping[str, Any]) -> tuple[dict[str, Any], int]:
|
||||
"""Generate HTTP response based on node configuration.
|
||||
|
||||
|
||||
Args:
|
||||
node_config: Node configuration containing response settings
|
||||
|
||||
|
||||
Returns:
|
||||
tuple[dict[str, Any], int]: Response data and HTTP status code
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -15,7 +15,7 @@ from extensions.ext_database import db
|
|||
from extensions.ext_storage import storage
|
||||
from models.provider_ids import TriggerProviderID
|
||||
from models.trigger import TriggerSubscription
|
||||
from services.trigger.trigger_debug_service import TriggerDebugService
|
||||
from services.trigger.trigger_debug_service import PluginTriggerDebugEvent, TriggerDebugService
|
||||
from services.trigger.trigger_service import TriggerService
|
||||
from services.workflow.entities import PluginTriggerDispatchData
|
||||
|
||||
|
|
@ -114,13 +114,23 @@ def dispatch_triggered_workflows_async(
|
|||
# Dispatch to debug sessions after processing all triggers
|
||||
debug_dispatched = 0
|
||||
try:
|
||||
debug_dispatched = TriggerDebugService.dispatch_debug_event(
|
||||
tenant_id=subscription.tenant_id,
|
||||
subscription_id=subscription_id,
|
||||
events=events,
|
||||
timestamp=timestamp,
|
||||
request_id=request_id,
|
||||
)
|
||||
for event_name in events:
|
||||
pool_key: str = PluginTriggerDebugEvent.build_pool_key(
|
||||
tenant_id=subscription.tenant_id,
|
||||
subscription_id=subscription_id,
|
||||
trigger_name=event_name,
|
||||
)
|
||||
event = PluginTriggerDebugEvent(
|
||||
subscription_id=subscription_id,
|
||||
request_id=request_id,
|
||||
timestamp=timestamp,
|
||||
event_name=event_name,
|
||||
)
|
||||
debug_dispatched += TriggerDebugService.dispatch(
|
||||
tenant_id=subscription.tenant_id,
|
||||
event=event,
|
||||
pool_key=pool_key,
|
||||
)
|
||||
except Exception:
|
||||
# Silent failure for debug dispatch
|
||||
logger.exception("Failed to dispatch to debug sessions")
|
||||
|
|
|
|||
Loading…
Reference in New Issue