From a33d04d1ac876023c3014c80c6d353f0cebd9da8 Mon Sep 17 00:00:00 2001 From: Harry Date: Wed, 8 Oct 2025 17:31:16 +0800 Subject: [PATCH] refactor(trigger): unify debug event handling and improve polling mechanism - Introduced a base class for debug events to streamline event handling. - Refactored `TriggerDebugService` to support multiple event types through a generic dispatch/poll interface. - Updated webhook and plugin trigger debug services to utilize the new event structure. - Enhanced the dispatch logic in `dispatch_triggered_workflows_async` to accommodate the new event model. --- api/controllers/console/app/workflow.py | 56 +++-- api/controllers/trigger/webhook.py | 14 +- api/services/trigger/trigger_debug_service.py | 221 +++++++++--------- api/services/trigger/webhook_service.py | 102 ++++---- api/tasks/trigger_processing_tasks.py | 26 ++- 5 files changed, 233 insertions(+), 186 deletions(-) diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index 3ea0ca8ab8..14fa92407a 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -35,7 +35,11 @@ from models.workflow import NodeType, Workflow from services.app_generate_service import AppGenerateService from services.errors.app import WorkflowHashNotEqualError from services.errors.llm import InvokeRateLimitError -from services.trigger.trigger_debug_service import TriggerDebugService, WebhookDebugService +from services.trigger.trigger_debug_service import ( + PluginTriggerDebugEvent, + TriggerDebugService, + WebhookDebugEvent, +) from services.trigger.webhook_service import WebhookService from services.workflow_service import DraftWorkflowDeletionError, WorkflowInUseError, WorkflowService @@ -1030,13 +1034,18 @@ class DraftWorkflowTriggerNodeApi(Resource): trigger_name = args["trigger_name"] subscription_id = args["subscription_id"] - event = TriggerDebugService.poll_event( + pool_key = PluginTriggerDebugEvent.build_pool_key( + tenant_id=app_model.tenant_id, + subscription_id=subscription_id, + trigger_name=trigger_name, + ) + event: PluginTriggerDebugEvent | None = TriggerDebugService.poll( + event_type=PluginTriggerDebugEvent, + pool_key=pool_key, tenant_id=app_model.tenant_id, user_id=current_user.id, app_id=app_model.id, - subscription_id=subscription_id, node_id=node_id, - trigger_name=trigger_name, ) if not event: return jsonable_encoder({"status": "waiting"}) @@ -1110,13 +1119,18 @@ class DraftWorkflowTriggerRunApi(Resource): trigger_name = args["trigger_name"] subscription_id = args["subscription_id"] - event = TriggerDebugService.poll_event( + pool_key = PluginTriggerDebugEvent.build_pool_key( + tenant_id=app_model.tenant_id, + subscription_id=subscription_id, + trigger_name=trigger_name, + ) + event: PluginTriggerDebugEvent | None = TriggerDebugService.poll( + event_type=PluginTriggerDebugEvent, + pool_key=pool_key, tenant_id=app_model.tenant_id, user_id=current_user.id, app_id=app_model.id, - subscription_id=subscription_id, node_id=node_id, - trigger_name=trigger_name, ) if not event: return jsonable_encoder({"status": "waiting"}) @@ -1187,7 +1201,14 @@ class DraftWorkflowTriggerWebhookRunApi(Resource): args = parser.parse_args() node_id = args["node_id"] - event = WebhookDebugService.poll_event( + pool_key = WebhookDebugEvent.build_pool_key( + tenant_id=app_model.tenant_id, + app_id=app_model.id, + node_id=node_id, + ) + event: WebhookDebugEvent | None = TriggerDebugService.poll( + event_type=WebhookDebugEvent, + pool_key=pool_key, tenant_id=app_model.tenant_id, user_id=current_user.id, app_id=app_model.id, @@ -1253,7 +1274,14 @@ class DraftWorkflowNodeWebhookDebugRunApi(Resource): if not isinstance(current_user, Account) or not current_user.has_edit_permission: raise Forbidden() - event = WebhookDebugService.poll_event( + pool_key = WebhookDebugEvent.build_pool_key( + tenant_id=app_model.tenant_id, + app_id=app_model.id, + node_id=node_id, + ) + event: WebhookDebugEvent | None = TriggerDebugService.poll( + event_type=WebhookDebugEvent, + pool_key=pool_key, tenant_id=app_model.tenant_id, user_id=current_user.id, app_id=app_model.id, @@ -1272,10 +1300,12 @@ class DraftWorkflowNodeWebhookDebugRunApi(Resource): node_config = draft_workflow.get_node_config_by_id(node_id) node_type = Workflow.get_node_type_from_node_config(node_config) if node_type != NodeType.TRIGGER_WEBHOOK: - return jsonable_encoder({ - "status": "error", - "message": "node is not webhook trigger", - }), 400 + return jsonable_encoder( + { + "status": "error", + "message": "node is not webhook trigger", + } + ), 400 payload = event.payload or {} workflow_inputs = payload.get("inputs") diff --git a/api/controllers/trigger/webhook.py b/api/controllers/trigger/webhook.py index 3945921ba1..bd354cd9b7 100644 --- a/api/controllers/trigger/webhook.py +++ b/api/controllers/trigger/webhook.py @@ -5,7 +5,7 @@ from flask import jsonify from werkzeug.exceptions import NotFound, RequestEntityTooLarge from controllers.trigger import bp -from services.trigger.trigger_debug_service import WebhookDebugService +from services.trigger.trigger_debug_service import TriggerDebugService, WebhookDebugEvent from services.trigger.webhook_service import WebhookService logger = logging.getLogger(__name__) @@ -70,18 +70,28 @@ def handle_webhook_debug(webhook_id: str): return jsonify({"error": "Bad Request", "message": error}), 400 workflow_inputs = WebhookService.build_workflow_inputs(webhook_data) - WebhookDebugService.dispatch_event( + + # Generate pool key and dispatch debug event + pool_key: str = WebhookDebugEvent.build_pool_key( tenant_id=webhook_trigger.tenant_id, app_id=webhook_trigger.app_id, node_id=webhook_trigger.node_id, + ) + event = WebhookDebugEvent( request_id=f"webhook_debug_{webhook_trigger.webhook_id}_{int(time.time() * 1000)}", timestamp=int(time.time()), + node_id=webhook_trigger.node_id, payload={ "inputs": workflow_inputs, "webhook_data": webhook_data, "method": webhook_data.get("method"), }, ) + TriggerDebugService.dispatch( + tenant_id=webhook_trigger.tenant_id, + event=event, + pool_key=pool_key, + ) response_data, status_code = WebhookService.generate_webhook_response(node_config) return jsonify(response_data), status_code diff --git a/api/services/trigger/trigger_debug_service.py b/api/services/trigger/trigger_debug_service.py index b373144c1d..6eaa923be8 100644 --- a/api/services/trigger/trigger_debug_service.py +++ b/api/services/trigger/trigger_debug_service.py @@ -2,7 +2,8 @@ import hashlib import logging -from typing import Any, Optional +from abc import ABC, abstractmethod +from typing import Any, Optional, TypeVar from pydantic import BaseModel, Field from redis import RedisError @@ -13,36 +14,84 @@ logger = logging.getLogger(__name__) TRIGGER_DEBUG_EVENT_TTL = 300 +TEvent = TypeVar("TEvent", bound="BaseDebugEvent") -class TriggerDebugEvent(BaseModel): + +class BaseDebugEvent(ABC, BaseModel): + """Base class for all debug events.""" + + timestamp: int + + @classmethod + @abstractmethod + def build_pool_key(cls, **kwargs: Any) -> str: + """ + Generate the waiting pool key for this event type. + + Each subclass implements its own pool key strategy based on routing parameters. + + Returns: + Redis key for the waiting pool + """ + raise NotImplementedError("Subclasses must implement build_pool_key") + + +class PluginTriggerDebugEvent(BaseDebugEvent): + """Debug event for plugin triggers.""" + + request_id: str subscription_id: str - request_id: str - timestamp: int + event_name: str + + @classmethod + def build_pool_key(cls, **kwargs: Any) -> str: + """Generate pool key for plugin trigger events. + + Args: + tenant_id: Tenant ID + subscription_id: Subscription ID + trigger_name: Trigger name + """ + tenant_id = kwargs["tenant_id"] + subscription_id = kwargs["subscription_id"] + trigger_name = kwargs["trigger_name"] + return f"trigger_debug_waiting_pool:{{{tenant_id}}}:{subscription_id}:{trigger_name}" -class WebhookDebugEvent(BaseModel): +class WebhookDebugEvent(BaseDebugEvent): + """Debug event for webhook triggers.""" + request_id: str - timestamp: int node_id: str payload: dict[str, Any] = Field(default_factory=dict) + @classmethod + def build_pool_key(cls, **kwargs: Any) -> str: + """Generate pool key for webhook events. -def _address(tenant_id: str, user_id: str, app_id: str, node_id: str) -> str: - address_id = hashlib.sha1(f"{user_id}|{app_id}|{node_id}".encode()).hexdigest() - return f"trigger_debug_inbox:{{{tenant_id}}}:{address_id}" + Args: + tenant_id: Tenant ID + app_id: App ID + node_id: Node ID + """ + tenant_id = kwargs["tenant_id"] + app_id = kwargs["app_id"] + node_id = kwargs["node_id"] + return f"trigger_debug_waiting_pool:{{{tenant_id}}}:{app_id}:{node_id}" class TriggerDebugService: """ - Redis-based trigger debug service with polling support. + Unified Redis-based trigger debug service with polling support. + Uses {tenant_id} hash tags for Redis Cluster compatibility. + Supports multiple event types through a generic dispatch/poll interface. """ # LUA_SELECT: Atomic poll or register for event # KEYS[1] = trigger_debug_inbox:{tenant_id}:{address_id} - # KEYS[2] = trigger_debug_waiting_pool:{tenant_id}:{subscription_id}:{trigger} + # KEYS[2] = trigger_debug_waiting_pool:{tenant_id}:... # ARGV[1] = address_id - # compressed lua code, you can use LLM to uncompress it LUA_SELECT = ( "local v=redis.call('GET',KEYS[1]);" "if v then redis.call('DEL',KEYS[1]);return v end;" @@ -52,10 +101,9 @@ class TriggerDebugService: ) # LUA_DISPATCH: Dispatch event to all waiting addresses - # KEYS[1] = trigger_debug_waiting_pool:{tenant_id}:{subscription_id}:{trigger} + # KEYS[1] = trigger_debug_waiting_pool:{tenant_id}:... # ARGV[1] = tenant_id # ARGV[2] = event_json - # compressed lua code, you can use LLM to uncompress it LUA_DISPATCH = ( "local a=redis.call('SMEMBERS',KEYS[1]);" "if #a==0 then return 0 end;" @@ -67,127 +115,76 @@ class TriggerDebugService: ) @classmethod - def waiting_pool(cls, tenant_id: str, subscription_id: str, trigger_name: str) -> str: - return f"trigger_debug_waiting_pool:{{{tenant_id}}}:{subscription_id}:{trigger_name}" - - @classmethod - def dispatch_debug_event( + def dispatch( cls, tenant_id: str, - subscription_id: str, - events: list[str], - request_id: str, - timestamp: int, + event: BaseDebugEvent, + pool_key: str, ) -> int: - event_json = TriggerDebugEvent( - subscription_id=subscription_id, - request_id=request_id, - timestamp=timestamp, - ).model_dump_json() + """ + Dispatch event to all waiting addresses in the pool. - dispatched = 0 - if len(events) > 10: - logger.warning( - "Too many events to dispatch at once: %d events tenant: %s subscription: %s", - len(events), - tenant_id, - subscription_id, - ) - - for trigger_name in events: - try: - dispatched += redis_client.eval( - cls.LUA_DISPATCH, - 1, - cls.waiting_pool(tenant_id, subscription_id, trigger_name), - tenant_id, - event_json, - ) - except RedisError: - logger.exception("Failed to dispatch for trigger: %s", trigger_name) - return dispatched - - @classmethod - def poll_event( - cls, - tenant_id: str, - user_id: str, - app_id: str, - subscription_id: str, - node_id: str, - trigger_name: str, - ) -> Optional[TriggerDebugEvent]: - address_id = hashlib.sha1(f"{user_id}|{app_id}|{node_id}".encode()).hexdigest() + Args: + tenant_id: Tenant ID for hash tag + event: Event object to dispatch + pool_key: Pool key (generate using event_class.build_pool_key(...)) + Returns: + Number of addresses the event was dispatched to + """ + event_json = event.model_dump_json() try: - event = redis_client.eval( - cls.LUA_SELECT, - 2, - _address(tenant_id, user_id, app_id, node_id), - cls.waiting_pool(tenant_id, subscription_id, trigger_name), - address_id, - ) - return TriggerDebugEvent.model_validate_json(event) if event else None - except RedisError: - logger.exception("Failed to poll debug event") - return None - - -class WebhookDebugService: - """Debug helpers dedicated to webhook triggers.""" - - @staticmethod - def waiting_pool(tenant_id: str, app_id: str, node_id: str) -> str: - return f"trigger_debug_waiting_pool:{{{tenant_id}}}:{app_id}:{node_id}" - - @classmethod - def dispatch_event( - cls, - tenant_id: str, - app_id: str, - node_id: str, - request_id: str, - timestamp: int, - payload: dict[str, Any], - ) -> int: - event_json = WebhookDebugEvent( - request_id=request_id, - timestamp=timestamp, - node_id=node_id, - payload=payload, - ).model_dump_json() - - try: - return redis_client.eval( - TriggerDebugService.LUA_DISPATCH, + result = redis_client.eval( + cls.LUA_DISPATCH, 1, - cls.waiting_pool(tenant_id, app_id, node_id), + pool_key, tenant_id, event_json, ) + return int(result) except RedisError: - logger.exception("Failed to dispatch webhook debug event") + logger.exception("Failed to dispatch event to pool: %s", pool_key) return 0 @classmethod - def poll_event( + def poll( cls, + event_type: type[TEvent], + pool_key: str, tenant_id: str, user_id: str, app_id: str, node_id: str, - ) -> Optional[WebhookDebugEvent]: - address_id = hashlib.sha1(f"{user_id}|{app_id}|{node_id}".encode()).hexdigest() + ) -> Optional[TEvent]: + """ + Poll for an event or register to the waiting pool. + + If an event is available in the inbox, return it immediately. + Otherwise, register the address to the waiting pool for future dispatch. + + Args: + event_class: Event class for deserialization and type safety + pool_key: Pool key (generate using event_class.build_pool_key(...)) + tenant_id: Tenant ID + user_id: User ID for address calculation + app_id: App ID for address calculation + node_id: Node ID for address calculation + + Returns: + Event object if available, None otherwise + """ + address_id: str = hashlib.sha1(f"{user_id}|{app_id}|{node_id}".encode()).hexdigest() + address: str = f"trigger_debug_inbox:{{{tenant_id}}}:{address_id}" try: - event = redis_client.eval( - TriggerDebugService.LUA_SELECT, + event_data = redis_client.eval( + cls.LUA_SELECT, 2, - _address(tenant_id, user_id, app_id, node_id), - cls.waiting_pool(tenant_id, app_id, node_id), + address, + pool_key, address_id, ) - return WebhookDebugEvent.model_validate_json(event) if event else None + return event_type.model_validate_json(json_data=event_data) if event_data else None except RedisError: - logger.exception("Failed to poll webhook debug event") + logger.exception("Failed to poll event from pool: %s", pool_key) return None diff --git a/api/services/trigger/webhook_service.py b/api/services/trigger/webhook_service.py index 2afb3eca59..4038107899 100644 --- a/api/services/trigger/webhook_service.py +++ b/api/services/trigger/webhook_service.py @@ -44,13 +44,13 @@ class WebhookService: Args: webhook_id: The webhook ID to look up is_debug: If True, use the draft workflow graph and skip the trigger enabled status check - + Returns: A tuple containing: - WorkflowWebhookTrigger: The webhook trigger object - Workflow: The associated workflow object - Mapping[str, Any]: The node configuration data - + Raises: ValueError: If webhook not found, app trigger not found, trigger disabled, or workflow not found """ @@ -61,7 +61,7 @@ class WebhookService: ) if not webhook_trigger: raise ValueError(f"Webhook not found: {webhook_id}") - + if is_debug: workflow = ( session.query(Workflow) @@ -113,14 +113,14 @@ class WebhookService: cls, webhook_trigger: WorkflowWebhookTrigger, node_config: Mapping[str, Any] ) -> dict[str, Any]: """Extract and validate webhook data in a single unified process. - + Args: webhook_trigger: The webhook trigger object containing metadata node_config: The node configuration containing validation rules - + Returns: dict[str, Any]: Processed and validated webhook data with correct types - + Raises: ValueError: If validation fails (HTTP method mismatch, missing required fields, type errors) """ @@ -141,10 +141,10 @@ class WebhookService: @classmethod def extract_webhook_data(cls, webhook_trigger: WorkflowWebhookTrigger) -> dict[str, Any]: """Extract raw data from incoming webhook request without type conversion. - + Args: webhook_trigger: The webhook trigger object for file processing context - + Returns: dict[str, Any]: Raw webhook data containing: - method: HTTP method @@ -191,14 +191,14 @@ class WebhookService: @classmethod def _process_and_validate_data(cls, raw_data: dict[str, Any], node_data: dict[str, Any]) -> dict[str, Any]: """Process and validate webhook data according to node configuration. - + Args: raw_data: Raw webhook data from extraction node_data: Node configuration containing validation and type rules - + Returns: dict[str, Any]: Processed data with validated types - + Raises: ValueError: If validation fails or required fields are missing """ @@ -233,7 +233,7 @@ class WebhookService: @classmethod def _extract_json_body(cls) -> tuple[dict[str, Any], dict[str, Any]]: """Extract JSON body from request. - + Returns: tuple: (body_data, files_data) where: - body_data: Parsed JSON content or empty dict if parsing fails @@ -249,7 +249,7 @@ class WebhookService: @classmethod def _extract_form_body(cls) -> tuple[dict[str, Any], dict[str, Any]]: """Extract form-urlencoded body from request. - + Returns: tuple: (body_data, files_data) where: - body_data: Form data as key-value pairs @@ -260,10 +260,10 @@ class WebhookService: @classmethod def _extract_multipart_body(cls, webhook_trigger: WorkflowWebhookTrigger) -> tuple[dict[str, Any], dict[str, Any]]: """Extract multipart/form-data body and files from request. - + Args: webhook_trigger: Webhook trigger for file processing context - + Returns: tuple: (body_data, files_data) where: - body_data: Form data as key-value pairs @@ -278,10 +278,10 @@ class WebhookService: cls, webhook_trigger: WorkflowWebhookTrigger ) -> tuple[dict[str, Any], dict[str, Any]]: """Extract binary data as file from request. - + Args: webhook_trigger: Webhook trigger for file processing context - + Returns: tuple: (body_data, files_data) where: - body_data: Dict with 'raw' key containing file object or None @@ -301,7 +301,7 @@ class WebhookService: @classmethod def _extract_text_body(cls) -> tuple[dict[str, Any], dict[str, Any]]: """Extract text/plain body from request. - + Returns: tuple: (body_data, files_data) where: - body_data: Dict with 'raw' key containing text content @@ -317,11 +317,11 @@ class WebhookService: @classmethod def _process_file_uploads(cls, files, webhook_trigger: WorkflowWebhookTrigger) -> dict[str, Any]: """Process file uploads using ToolFileManager. - + Args: files: Flask request files object containing uploaded files webhook_trigger: Webhook trigger for tenant and user context - + Returns: dict[str, Any]: Processed file objects indexed by field name """ @@ -345,12 +345,12 @@ class WebhookService: cls, file_content: bytes, mimetype: str, webhook_trigger: WorkflowWebhookTrigger ) -> Any: """Create a file object from binary content using ToolFileManager. - + Args: file_content: The binary content of the file mimetype: The MIME type of the file webhook_trigger: Webhook trigger for tenant and user context - + Returns: Any: A file object built from the binary content """ @@ -380,15 +380,15 @@ class WebhookService: cls, raw_params: dict[str, str], param_configs: list, is_form_data: bool = False ) -> dict[str, Any]: """Process parameters with unified validation and type conversion. - + Args: raw_params: Raw parameter values as strings param_configs: List of parameter configuration dictionaries is_form_data: Whether the parameters are from form data (requiring string conversion) - + Returns: dict[str, Any]: Processed parameters with validated types - + Raises: ValueError: If required parameters are missing or validation fails """ @@ -421,15 +421,15 @@ class WebhookService: cls, raw_body: dict[str, Any], body_configs: list, content_type: str ) -> dict[str, Any]: """Process body parameters based on content type and configuration. - + Args: raw_body: Raw body data from request body_configs: List of body parameter configuration dictionaries content_type: The request content type - + Returns: dict[str, Any]: Processed body parameters with validated types - + Raises: ValueError: If required body parameters are missing or validation fails """ @@ -474,16 +474,16 @@ class WebhookService: @classmethod def _validate_and_convert_value(cls, param_name: str, value: Any, param_type: str, is_form_data: bool) -> Any: """Unified validation and type conversion for parameter values. - + Args: param_name: Name of the parameter for error reporting value: The value to validate and convert param_type: The expected parameter type (SegmentType) is_form_data: Whether the value is from form data (requiring string conversion) - + Returns: Any: The validated and converted value - + Raises: ValueError: If validation or conversion fails """ @@ -500,15 +500,15 @@ class WebhookService: @classmethod def _convert_form_value(cls, param_name: str, value: str, param_type: str) -> Any: """Convert form data string values to specified types. - + Args: param_name: Name of the parameter for error reporting value: The string value to convert param_type: The target type to convert to (SegmentType) - + Returns: Any: The converted value in the appropriate type - + Raises: ValueError: If the value cannot be converted to the specified type """ @@ -531,15 +531,15 @@ class WebhookService: @classmethod def _validate_json_value(cls, param_name: str, value: Any, param_type: str) -> Any: """Validate JSON values against expected types. - + Args: param_name: Name of the parameter for error reporting value: The value to validate param_type: The expected parameter type (SegmentType) - + Returns: Any: The validated value (unchanged if valid) - + Raises: ValueError: If the value type doesn't match the expected type """ @@ -581,11 +581,11 @@ class WebhookService: @classmethod def _validate_required_headers(cls, headers: dict[str, Any], header_configs: list) -> None: """Validate required headers are present. - + Args: headers: Request headers dictionary header_configs: List of header configuration dictionaries - + Raises: ValueError: If required headers are missing """ @@ -599,11 +599,11 @@ class WebhookService: @classmethod def _validate_http_metadata(cls, webhook_data: dict[str, Any], node_data: dict[str, Any]) -> dict[str, Any]: """Validate HTTP method and content-type. - + Args: webhook_data: Extracted webhook data containing method and headers node_data: Node configuration containing expected method and content-type - + Returns: dict[str, Any]: Validation result with 'valid' key and optional 'error' key """ @@ -627,10 +627,10 @@ class WebhookService: @classmethod def _extract_content_type(cls, headers: dict[str, Any]) -> str: """Extract and normalize content-type from headers. - + Args: headers: Request headers dictionary - + Returns: str: Normalized content-type (main type without parameters) """ @@ -643,10 +643,10 @@ class WebhookService: @classmethod def _validation_error(cls, error_message: str) -> dict[str, Any]: """Create a standard validation error response. - + Args: error_message: The error message to include - + Returns: dict[str, Any]: Validation error response with 'valid' and 'error' keys """ @@ -664,10 +664,10 @@ class WebhookService: @classmethod def build_workflow_inputs(cls, webhook_data: dict[str, Any]) -> dict[str, Any]: """Construct workflow inputs payload from webhook data. - + Args: webhook_data: Processed webhook data containing headers, query params, and body - + Returns: dict[str, Any]: Workflow inputs formatted for execution """ @@ -683,12 +683,12 @@ class WebhookService: cls, webhook_trigger: WorkflowWebhookTrigger, webhook_data: dict[str, Any], workflow: Workflow ) -> None: """Trigger workflow execution via AsyncWorkflowService. - + Args: webhook_trigger: The webhook trigger object webhook_data: Processed webhook data for workflow inputs workflow: The workflow to execute - + Raises: ValueError: If tenant owner is not found Exception: If workflow execution fails @@ -737,10 +737,10 @@ class WebhookService: @classmethod def generate_webhook_response(cls, node_config: Mapping[str, Any]) -> tuple[dict[str, Any], int]: """Generate HTTP response based on node configuration. - + Args: node_config: Node configuration containing response settings - + Returns: tuple[dict[str, Any], int]: Response data and HTTP status code """ diff --git a/api/tasks/trigger_processing_tasks.py b/api/tasks/trigger_processing_tasks.py index 989265db7b..3611f295cd 100644 --- a/api/tasks/trigger_processing_tasks.py +++ b/api/tasks/trigger_processing_tasks.py @@ -15,7 +15,7 @@ from extensions.ext_database import db from extensions.ext_storage import storage from models.provider_ids import TriggerProviderID from models.trigger import TriggerSubscription -from services.trigger.trigger_debug_service import TriggerDebugService +from services.trigger.trigger_debug_service import PluginTriggerDebugEvent, TriggerDebugService from services.trigger.trigger_service import TriggerService from services.workflow.entities import PluginTriggerDispatchData @@ -114,13 +114,23 @@ def dispatch_triggered_workflows_async( # Dispatch to debug sessions after processing all triggers debug_dispatched = 0 try: - debug_dispatched = TriggerDebugService.dispatch_debug_event( - tenant_id=subscription.tenant_id, - subscription_id=subscription_id, - events=events, - timestamp=timestamp, - request_id=request_id, - ) + for event_name in events: + pool_key: str = PluginTriggerDebugEvent.build_pool_key( + tenant_id=subscription.tenant_id, + subscription_id=subscription_id, + trigger_name=event_name, + ) + event = PluginTriggerDebugEvent( + subscription_id=subscription_id, + request_id=request_id, + timestamp=timestamp, + event_name=event_name, + ) + debug_dispatched += TriggerDebugService.dispatch( + tenant_id=subscription.tenant_id, + event=event, + pool_key=pool_key, + ) except Exception: # Silent failure for debug dispatch logger.exception("Failed to dispatch to debug sessions")