From dab4e521af2d17c9dacf3bdac21ceee5ecbcc9cd Mon Sep 17 00:00:00 2001 From: Harry Date: Wed, 15 Oct 2025 01:18:06 +0800 Subject: [PATCH] feat(trigger): enhance trigger event handling and introduce new debug event polling - Refactored the `DraftWorkflowTriggerNodeApi` and related services to utilize the new `TriggerService` for polling debug events, improving modularity and clarity. - Added `poll_debug_event` methods in `TriggerService`, `ScheduleService`, and `WebhookService` to streamline event handling for different trigger types. - Introduced `ScheduleDebugEvent` and updated `PluginTriggerDebugEvent` to include a more structured approach for event data. - Enhanced the `invoke_trigger_event` method to improve error handling and data validation during trigger invocations. - Updated frontend API calls to align with the new event structure, removing deprecated parameters for cleaner integration. --- api/controllers/console/app/workflow.py | 228 +++++++++--------- api/core/workflow/nodes/base/node.py | 2 +- .../trigger_plugin/trigger_plugin_node.py | 3 +- .../trigger_schedule/trigger_schedule_node.py | 3 +- .../workflow/nodes/trigger_webhook/node.py | 3 +- .../plugin/plugin_parameter_service.py | 10 +- api/services/trigger/schedule_service.py | 17 ++ api/services/trigger/trigger_debug_service.py | 33 ++- .../trigger/trigger_provider_service.py | 12 +- api/services/trigger/trigger_service.py | 90 ++++++- api/services/trigger/webhook_service.py | 49 ++++ api/tasks/trigger_processing_tasks.py | 5 +- api/tasks/workflow_schedule_tasks.py | 35 +++ .../nodes/_base/hooks/use-one-step-run.ts | 8 - 14 files changed, 359 insertions(+), 139 deletions(-) diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index c685b73ab9..e57b5e7361 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -20,6 +20,7 @@ from core.app.entities.app_invoke_entities import InvokeFrom from core.file.models import File from core.helper.trace_id_helper import get_external_trace_id from core.model_runtime.utils.encoders import jsonable_encoder +from core.plugin.entities.request import TriggerInvokeEventResponse from core.workflow.graph_engine.manager import GraphEngineManager from extensions.ext_database import db from factories import file_factory, variable_factory @@ -32,16 +33,18 @@ from libs.login import current_user, login_required from models import App from models.account import Account from models.model import AppMode -from models.provider_ids import TriggerProviderID from models.workflow import NodeType, Workflow from services.app_generate_service import AppGenerateService from services.errors.app import WorkflowHashNotEqualError from services.errors.llm import InvokeRateLimitError +from services.trigger.schedule_service import ScheduleService from services.trigger.trigger_debug_service import ( PluginTriggerDebugEvent, + ScheduleDebugEvent, TriggerDebugService, WebhookDebugEvent, ) +from services.trigger.trigger_service import TriggerService from services.trigger.webhook_service import WebhookService from services.workflow_service import DraftWorkflowDeletionError, WorkflowInUseError, WorkflowService @@ -1010,16 +1013,6 @@ class DraftWorkflowTriggerNodeApi(Resource): @api.doc("poll_draft_workflow_trigger_node") @api.doc(description="Poll for trigger events and execute single node when event arrives") @api.doc(params={"app_id": "Application ID", "node_id": "Node ID"}) - @api.expect( - api.model( - "DraftWorkflowTriggerNodeRequest", - { - "event_name": fields.String(required=True, description="Event name"), - "subscription_id": fields.String(required=True, description="Subscription ID"), - "provider_id": fields.String(required=True, description="Provider ID"), - }, - ) - ) @api.response(200, "Trigger event received and node executed successfully") @api.response(403, "Permission denied") @api.response(500, "Internal server error") @@ -1034,28 +1027,8 @@ class DraftWorkflowTriggerNodeApi(Resource): if not isinstance(current_user, Account) or not current_user.has_edit_permission: raise Forbidden() - parser = reqparse.RequestParser() - parser.add_argument("event_name", type=str, required=True, location="json", nullable=False) - parser.add_argument("subscription_id", type=str, required=True, location="json", nullable=False) - parser.add_argument("provider_id", type=str, required=True, location="json", nullable=False) - args = parser.parse_args() - event_name = args["event_name"] - subscription_id = args["subscription_id"] - provider_id = args["provider_id"] - - pool_key = PluginTriggerDebugEvent.build_pool_key( - tenant_id=app_model.tenant_id, - provider_id=provider_id, - subscription_id=subscription_id, - event_name=event_name, - ) - event: PluginTriggerDebugEvent | None = TriggerDebugService.poll( - event_type=PluginTriggerDebugEvent, - pool_key=pool_key, - tenant_id=app_model.tenant_id, - user_id=current_user.id, - app_id=app_model.id, - node_id=node_id, + event: PluginTriggerDebugEvent | None = TriggerService.poll_debug_event( + app_model=app_model, user_id=current_user.id, node_id=node_id ) if not event: return jsonable_encoder({"status": "waiting"}) @@ -1066,22 +1039,31 @@ class DraftWorkflowTriggerNodeApi(Resource): if not draft_workflow: raise ValueError("Workflow not found") - user_inputs = event.model_dump() + response: TriggerInvokeEventResponse = TriggerService.invoke_trigger_event( + event=event, + user_id=current_user.id, + tenant_id=app_model.tenant_id, + node_config=draft_workflow.get_node_config_by_id(node_id=node_id), + ) + if response.cancelled: + return jsonable_encoder({"status": "cancelled"}) + node_execution = workflow_service.run_draft_workflow_node( app_model=app_model, draft_workflow=draft_workflow, node_id=node_id, - user_inputs=user_inputs, + user_inputs=response.variables, account=current_user, query="", files=[], ) return jsonable_encoder(node_execution) - except Exception: + except Exception as e: logger.exception("Error running draft workflow trigger node") return jsonable_encoder( { "status": "error", + "error": str(e), } ), 500 @@ -1122,56 +1104,18 @@ class DraftWorkflowTriggerRunApi(Resource): parser.add_argument("node_id", type=str, required=True, location="json", nullable=False) args = parser.parse_args() node_id = args["node_id"] - workflow_service = WorkflowService() - workflow: Workflow | None = workflow_service.get_draft_workflow( - app_model=app_model, - workflow_id=None, - ) - if not workflow: - return jsonable_encoder({"status": "error", "message": "Workflow not found"}), 404 - - node_data = workflow.get_node_config_by_id(node_id=node_id).get("data") - if not node_data: - return jsonable_encoder({"status": "error", "message": "Node config not found"}), 404 - - event_name = node_data.get("event_name") - subscription_id = node_data.get("subscription_id") - if not subscription_id: - return jsonable_encoder({"status": "error", "message": "Subscription ID not found"}), 404 - - provider_id = TriggerProviderID(node_data.get("provider_id")) - pool_key: str = PluginTriggerDebugEvent.build_pool_key( - tenant_id=app_model.tenant_id, - provider_id=provider_id, - subscription_id=subscription_id, - event_name=event_name, - ) - event: PluginTriggerDebugEvent | None = TriggerDebugService.poll( - event_type=PluginTriggerDebugEvent, - pool_key=pool_key, - tenant_id=app_model.tenant_id, - user_id=current_user.id, - app_id=app_model.id, - node_id=node_id, + event: PluginTriggerDebugEvent | None = TriggerService.poll_debug_event( + app_model=app_model, user_id=current_user.id, node_id=node_id ) if not event: return jsonable_encoder({"status": "waiting", "retry_in": 2000}) - workflow_args = { - "inputs": event.model_dump(), - "query": "", - "files": [], - } - external_trace_id = get_external_trace_id(request) - if external_trace_id: - workflow_args["external_trace_id"] = external_trace_id - try: response = AppGenerateService.generate( app_model=app_model, user=current_user, - args=workflow_args, + args=TriggerService.build_workflow_args(event), invoke_from=InvokeFrom.DEBUGGER, streaming=True, root_node_id=node_id, @@ -1219,50 +1163,21 @@ class DraftWorkflowTriggerWebhookRunApi(Resource): """ if not isinstance(current_user, Account) or not current_user.has_edit_permission: raise Forbidden() - parser = reqparse.RequestParser() parser.add_argument("node_id", type=str, required=True, location="json", nullable=False) args = parser.parse_args() node_id = args["node_id"] - - pool_key = WebhookDebugEvent.build_pool_key( - tenant_id=app_model.tenant_id, - app_id=app_model.id, - node_id=node_id, + event: WebhookDebugEvent | None = WebhookService.poll_debug_event( + app_model=app_model, user_id=current_user.id, node_id=node_id ) - event: WebhookDebugEvent | None = TriggerDebugService.poll( - event_type=WebhookDebugEvent, - pool_key=pool_key, - tenant_id=app_model.tenant_id, - user_id=current_user.id, - app_id=app_model.id, - node_id=node_id, - ) - if not event: return jsonable_encoder({"status": "waiting", "retry_in": 2000}) - payload = event.payload or {} - workflow_inputs = payload.get("inputs") - if workflow_inputs is None: - webhook_data = payload.get("webhook_data", {}) - workflow_inputs = WebhookService.build_workflow_inputs(webhook_data) - - workflow_args = { - "inputs": workflow_inputs or {}, - "query": "", - "files": [], - } - - external_trace_id = get_external_trace_id(request) - if external_trace_id: - workflow_args["external_trace_id"] = external_trace_id - try: response = AppGenerateService.generate( app_model=app_model, user=current_user, - args=workflow_args, + args=WebhookService.build_workflow_args(event), invoke_from=InvokeFrom.DEBUGGER, streaming=True, root_node_id=node_id, @@ -1412,3 +1327,98 @@ class DraftWorkflowTriggerScheduleRunApi(Resource): "status": "error", } ), 500 + + +@console_ns.route("/apps//workflows/draft/trigger/run-all") +class DraftWorkflowTriggerRunAllApi(Resource): + """ + Full workflow debug - Polling API for trigger events + Path: /apps//workflows/draft/trigger/run-all + """ + + @api.doc("draft_workflow_trigger_run_all") + @api.doc(description="Full workflow debug when the start node is a trigger") + @api.doc(params={"app_id": "Application ID"}) + @api.expect( + api.model( + "DraftWorkflowTriggerRunAllRequest", + { + "node_ids": fields.List(fields.String, required=True, description="Node IDs"), + }, + ) + ) + @api.response(200, "Workflow executed successfully") + @api.response(403, "Permission denied") + @api.response(500, "Internal server error") + @setup_required + @login_required + @account_initialization_required + @get_app_model(mode=[AppMode.WORKFLOW]) + def post(self, app_model: App): + """ + Full workflow debug when the start node is a trigger + """ + if not isinstance(current_user, Account) or not current_user.has_edit_permission: + raise Forbidden() + + parser = reqparse.RequestParser() + parser.add_argument("node_ids", type=list, required=True, location="json", nullable=False) + args = parser.parse_args() + node_ids = args["node_ids"] + workflow_service = WorkflowService() + draft_workflow = workflow_service.get_draft_workflow(app_model) + if not draft_workflow: + raise ValueError("Workflow not found") + workflow_args = None + for node_id in node_ids: + node_config = draft_workflow.get_node_config_by_id(node_id=node_id) + if not node_config: + raise ValueError("Node data not found for node %s", node_id) + node_type = draft_workflow.get_node_type_from_node_config(node_config) + if node_type == NodeType.TRIGGER_PLUGIN: + plugin_trigger_event: PluginTriggerDebugEvent | None = TriggerService.poll_debug_event( + app_model=app_model, user_id=current_user.id, node_id=node_id + ) + workflow_args = ( + TriggerService.build_workflow_args(plugin_trigger_event) if plugin_trigger_event else None + ) + elif node_type == NodeType.TRIGGER_WEBHOOK: + webhook_event: WebhookDebugEvent | None = WebhookService.poll_debug_event( + app_model=app_model, user_id=current_user.id, node_id=node_id + ) + workflow_args = WebhookService.build_workflow_args(webhook_event) if webhook_event else None + elif node_type == NodeType.TRIGGER_SCHEDULE: + schedule_event: ScheduleDebugEvent | None = ScheduleService.poll_debug_event( + app_model=app_model, user_id=current_user.id, node_id=node_id + ) + workflow_args = ( + { + "inputs": schedule_event.inputs, + } + if schedule_event + else None + ) + else: + raise ValueError("Invalid node type %s", node_type) + if workflow_args is None: + return jsonable_encoder({"status": "waiting", "retry_in": 2000}) + + try: + response = AppGenerateService.generate( + app_model=app_model, + user=current_user, + args=workflow_args, + invoke_from=InvokeFrom.DEBUGGER, + streaming=True, + root_node_id=node_id, + ) + return helper.compact_generate_response(response) + except InvokeRateLimitError as ex: + raise InvokeRateLimitHttpError(ex.description) + except Exception: + logger.exception("Error running draft workflow trigger webhook run") + return jsonable_encoder( + { + "status": "error", + } + ), 500 diff --git a/api/core/workflow/nodes/base/node.py b/api/core/workflow/nodes/base/node.py index 15186e69a7..b86f36ccfe 100644 --- a/api/core/workflow/nodes/base/node.py +++ b/api/core/workflow/nodes/base/node.py @@ -122,7 +122,7 @@ class Node: start_event.provider_id = f"{plugin_id}/{provider_name}" start_event.provider_type = getattr(self.get_base_node_data(), "provider_type", "") - + from core.workflow.nodes.trigger_plugin.trigger_plugin_node import TriggerPluginNode if isinstance(self, TriggerPluginNode): diff --git a/api/core/workflow/nodes/trigger_plugin/trigger_plugin_node.py b/api/core/workflow/nodes/trigger_plugin/trigger_plugin_node.py index cfe1b9d728..f54f69831f 100644 --- a/api/core/workflow/nodes/trigger_plugin/trigger_plugin_node.py +++ b/api/core/workflow/nodes/trigger_plugin/trigger_plugin_node.py @@ -2,7 +2,7 @@ from collections.abc import Mapping from typing import Any, Optional from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus -from core.workflow.enums import ErrorStrategy, NodeType +from core.workflow.enums import ErrorStrategy, NodeExecutionType, NodeType from core.workflow.node_events import NodeRunResult from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig from core.workflow.nodes.base.node import Node @@ -12,6 +12,7 @@ from .entities import PluginTriggerData class TriggerPluginNode(Node): node_type = NodeType.TRIGGER_PLUGIN + execution_type = NodeExecutionType.ROOT _node_data: PluginTriggerData diff --git a/api/core/workflow/nodes/trigger_schedule/trigger_schedule_node.py b/api/core/workflow/nodes/trigger_schedule/trigger_schedule_node.py index 720da2894e..4fa50f1ead 100644 --- a/api/core/workflow/nodes/trigger_schedule/trigger_schedule_node.py +++ b/api/core/workflow/nodes/trigger_schedule/trigger_schedule_node.py @@ -3,7 +3,7 @@ from datetime import UTC, datetime from typing import Any, Optional from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus -from core.workflow.enums import ErrorStrategy, NodeType +from core.workflow.enums import ErrorStrategy, NodeExecutionType, NodeType from core.workflow.node_events import NodeRunResult from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig from core.workflow.nodes.base.node import Node @@ -12,6 +12,7 @@ from core.workflow.nodes.trigger_schedule.entities import TriggerScheduleNodeDat class TriggerScheduleNode(Node): node_type = NodeType.TRIGGER_SCHEDULE + execution_type = NodeExecutionType.ROOT _node_data: TriggerScheduleNodeData diff --git a/api/core/workflow/nodes/trigger_webhook/node.py b/api/core/workflow/nodes/trigger_webhook/node.py index 546222aeca..31c478f7d9 100644 --- a/api/core/workflow/nodes/trigger_webhook/node.py +++ b/api/core/workflow/nodes/trigger_webhook/node.py @@ -2,7 +2,7 @@ from collections.abc import Mapping from typing import Any, Optional from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus -from core.workflow.enums import ErrorStrategy, NodeType +from core.workflow.enums import ErrorStrategy, NodeExecutionType, NodeType from core.workflow.node_events import NodeRunResult from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig from core.workflow.nodes.base.node import Node @@ -12,6 +12,7 @@ from .entities import ContentType, WebhookData class TriggerWebhookNode(Node): node_type = NodeType.TRIGGER_WEBHOOK + execution_type = NodeExecutionType.ROOT _node_data: WebhookData diff --git a/api/services/plugin/plugin_parameter_service.py b/api/services/plugin/plugin_parameter_service.py index b1820e074c..f08bbaf5b8 100644 --- a/api/services/plugin/plugin_parameter_service.py +++ b/api/services/plugin/plugin_parameter_service.py @@ -87,11 +87,13 @@ class PluginParameterService: provider_controller = TriggerManager.get_trigger_provider(tenant_id, TriggerProviderID(provider)) subscription: TriggerProviderSubscriptionApiEntity | SubscriptionBuilder | None if credential_id: - subscription = TriggerSubscriptionBuilderService.get_subscription_builder( - credential_id - ) or TriggerProviderService.get_subscription_by_id(tenant_id, credential_id) + subscription = TriggerSubscriptionBuilderService.get_subscription_builder(credential_id) + if not subscription: + trigger_subscription = TriggerProviderService.get_subscription_by_id(tenant_id, credential_id) + subscription = trigger_subscription.to_api_entity() if trigger_subscription else None else: - subscription = TriggerProviderService.get_subscription_by_id(tenant_id) + trigger_subscription = TriggerProviderService.get_subscription_by_id(tenant_id) + subscription = trigger_subscription.to_api_entity() if trigger_subscription else None if subscription is None: raise ValueError(f"Subscription {credential_id} not found") diff --git a/api/services/trigger/schedule_service.py b/api/services/trigger/schedule_service.py index 333eeb2cc4..c153096505 100644 --- a/api/services/trigger/schedule_service.py +++ b/api/services/trigger/schedule_service.py @@ -11,12 +11,29 @@ from core.workflow.nodes.trigger_schedule.entities import ScheduleConfig, Schedu from core.workflow.nodes.trigger_schedule.exc import ScheduleConfigError, ScheduleNotFoundError from libs.schedule_utils import calculate_next_run_at, convert_12h_to_24h from models.account import Account, TenantAccountJoin +from models.model import App from models.workflow import Workflow, WorkflowSchedulePlan +from services.trigger.trigger_debug_service import ScheduleDebugEvent, TriggerDebugService logger = logging.getLogger(__name__) class ScheduleService: + @classmethod + def poll_debug_event(cls, app_model: App, user_id: str, node_id: str) -> ScheduleDebugEvent | None: + """Poll a debug event for a schedule trigger.""" + pool_key = ScheduleDebugEvent.build_pool_key( + tenant_id=app_model.tenant_id, app_id=app_model.id, node_id=node_id + ) + return TriggerDebugService.poll( + event_type=ScheduleDebugEvent, + pool_key=pool_key, + tenant_id=app_model.tenant_id, + user_id=user_id, + app_id=app_model.id, + node_id=node_id, + ) + @staticmethod def create_schedule( session: Session, diff --git a/api/services/trigger/trigger_debug_service.py b/api/services/trigger/trigger_debug_service.py index cd786bd6d1..c3ab856684 100644 --- a/api/services/trigger/trigger_debug_service.py +++ b/api/services/trigger/trigger_debug_service.py @@ -3,7 +3,8 @@ import hashlib import logging from abc import ABC, abstractmethod -from typing import Any, Optional, TypeVar +from collections.abc import Mapping +from typing import Any, TypeVar from pydantic import BaseModel, Field from redis import RedisError @@ -39,24 +40,25 @@ class BaseDebugEvent(ABC, BaseModel): class PluginTriggerDebugEvent(BaseDebugEvent): """Debug event for plugin triggers.""" + name: str request_id: str subscription_id: str - event_name: str + provider_id: str @classmethod def build_pool_key(cls, **kwargs: Any) -> str: """Generate pool key for plugin trigger events. Args: + name: Event name tenant_id: Tenant ID provider_id: Provider ID subscription_id: Subscription ID - event_name: Event name """ tenant_id = kwargs["tenant_id"] provider_id = kwargs["provider_id"] subscription_id = kwargs["subscription_id"] - event_name = kwargs["event_name"] + event_name = kwargs["name"] return f"plugin_trigger_debug_waiting_pool:{tenant_id}:{str(provider_id)}:{subscription_id}:{event_name}" @@ -82,6 +84,27 @@ class WebhookDebugEvent(BaseDebugEvent): return f"webhook_trigger_debug_waiting_pool:{tenant_id}:{app_id}:{node_id}" +class ScheduleDebugEvent(BaseDebugEvent): + """Debug event for schedule triggers.""" + + node_id: str + inputs: Mapping[str, Any] + + @classmethod + def build_pool_key(cls, **kwargs: Any) -> str: + """Generate pool key for schedule events. + + Args: + tenant_id: Tenant ID + app_id: App ID + node_id: Node ID + """ + tenant_id = kwargs["tenant_id"] + app_id = kwargs["app_id"] + node_id = kwargs["node_id"] + return f"schedule_trigger_debug_waiting_pool:{tenant_id}:{app_id}:{node_id}" + + class TriggerDebugService: """ Unified Redis-based trigger debug service with polling support. @@ -157,7 +180,7 @@ class TriggerDebugService: user_id: str, app_id: str, node_id: str, - ) -> Optional[TEvent]: + ) -> TEvent | None: """ Poll for an event or register to the waiting pool. diff --git a/api/services/trigger/trigger_provider_service.py b/api/services/trigger/trigger_provider_service.py index bac0d77a4e..aeb4d93106 100644 --- a/api/services/trigger/trigger_provider_service.py +++ b/api/services/trigger/trigger_provider_service.py @@ -192,9 +192,7 @@ class TriggerProviderService: raise ValueError(str(e)) @classmethod - def get_subscription_by_id( - cls, tenant_id: str, subscription_id: str | None = None - ) -> TriggerProviderSubscriptionApiEntity | None: + def get_subscription_by_id(cls, tenant_id: str, subscription_id: str | None = None) -> TriggerSubscription | None: """ Get a trigger subscription by the ID. """ @@ -216,7 +214,13 @@ class TriggerProviderService: subscription=subscription, ) subscription.credentials = encrypter.decrypt(subscription.credentials) - return subscription.to_api_entity() + properties_encrypter, _ = create_trigger_provider_encrypter_for_properties( + tenant_id=subscription.tenant_id, + controller=provider_controller, + subscription=subscription, + ) + subscription.properties = properties_encrypter.decrypt(subscription.properties) + return subscription return None @classmethod diff --git a/api/services/trigger/trigger_service.py b/api/services/trigger/trigger_service.py index 4eb068f50e..c08bbeccf6 100644 --- a/api/services/trigger/trigger_service.py +++ b/api/services/trigger/trigger_service.py @@ -2,11 +2,13 @@ import logging import time import uuid from collections.abc import Mapping, Sequence +from typing import Any -from flask import Request, Response +from flask import Request, Response, request from sqlalchemy import and_, func, select from sqlalchemy.orm import Session +from core.helper.trace_id_helper import get_external_trace_id from core.plugin.entities.plugin_daemon import CredentialType from core.plugin.entities.request import TriggerDispatchResponse, TriggerInvokeEventResponse from core.plugin.utils.http_parser import deserialize_request, serialize_request @@ -20,12 +22,15 @@ from extensions.ext_database import db from extensions.ext_storage import storage from models.account import Account, TenantAccountJoin, TenantAccountRole from models.enums import WorkflowRunTriggeredFrom +from models.model import App from models.provider_ids import TriggerProviderID from models.trigger import TriggerSubscription from models.workflow import AppTrigger, AppTriggerStatus, Workflow, WorkflowPluginTrigger from services.async_workflow_service import AsyncWorkflowService +from services.trigger.trigger_debug_service import PluginTriggerDebugEvent, TriggerDebugService from services.trigger.trigger_provider_service import TriggerProviderService from services.workflow.entities import PluginTriggerData, PluginTriggerDispatchData +from services.workflow_service import WorkflowService logger = logging.getLogger(__name__) @@ -35,7 +40,86 @@ class TriggerService: __ENDPOINT_REQUEST_CACHE_COUNT__ = 10 __ENDPOINT_REQUEST_CACHE_EXPIRE_MS__ = 5 * 60 * 1000 - __WEBHOOK_NODE_CACHE_KEY__ = "webhook_nodes" + @classmethod + def invoke_trigger_event( + cls, tenant_id: str, user_id: str, node_config: Mapping[str, Any], event: PluginTriggerDebugEvent + ) -> TriggerInvokeEventResponse: + """Invoke a trigger event.""" + subscription: TriggerSubscription | None = TriggerProviderService.get_subscription_by_id( + tenant_id=tenant_id, + subscription_id=event.subscription_id, + ) + if not subscription: + raise ValueError("Subscription not found") + node_data = node_config.get("data") + if not node_data: + raise ValueError("Node data not found") + request = deserialize_request(storage.load_once(f"triggers/{event.request_id}")) + if not request: + raise ValueError("Request not found") + # invoke triger + return TriggerManager.invoke_trigger_event( + tenant_id=tenant_id, + user_id=user_id, + provider_id=TriggerProviderID(event.provider_id), + event_name=event.name, + parameters=node_data.get("parameters", {}), + credentials=subscription.credentials, + credential_type=CredentialType.of(subscription.credential_type), + subscription=subscription.to_entity(), + request=request, + ) + + @classmethod + def build_workflow_args(cls, event: PluginTriggerDebugEvent) -> Mapping[str, Any]: + """Build workflow args from plugin trigger debug event.""" + workflow_args = { + "inputs": event.model_dump(), + "query": "", + "files": [], + } + external_trace_id = get_external_trace_id(request) + if external_trace_id: + workflow_args["external_trace_id"] = external_trace_id + + return workflow_args + + @classmethod + def poll_debug_event(cls, app_model: App, user_id: str, node_id: str) -> PluginTriggerDebugEvent | None: + """Poll webhook debug event for a given node ID.""" + workflow_service = WorkflowService() + workflow: Workflow | None = workflow_service.get_draft_workflow( + app_model=app_model, + workflow_id=None, + ) + + if not workflow: + raise ValueError("Workflow not found") + + node_data = workflow.get_node_config_by_id(node_id=node_id).get("data") + if not node_data: + raise ValueError("Node config not found") + + event_name = node_data.get("event_name") + subscription_id = node_data.get("subscription_id") + if not subscription_id: + raise ValueError("Subscription ID not found") + + provider_id = TriggerProviderID(node_data.get("provider_id")) + pool_key: str = PluginTriggerDebugEvent.build_pool_key( + name=event_name, + provider_id=provider_id, + tenant_id=app_model.tenant_id, + subscription_id=subscription_id, + ) + return TriggerDebugService.poll( + event_type=PluginTriggerDebugEvent, + pool_key=pool_key, + tenant_id=app_model.tenant_id, + user_id=user_id, + app_id=app_model.id, + node_id=node_id, + ) @classmethod def _get_latest_workflows_by_app_ids( @@ -129,7 +213,7 @@ class TriggerService: user_id=subscription.user_id, provider_id=TriggerProviderID(subscription.provider_id), event_name=event.identity.name, - parameters=event_node.get("config", {}), + parameters=event_node.get("config", {}).get("parameters", {}), credentials=subscription.credentials, credential_type=CredentialType.of(subscription.credential_type), subscription=subscription.to_entity(), diff --git a/api/services/trigger/webhook_service.py b/api/services/trigger/webhook_service.py index 4038107899..1ffc1c4d80 100644 --- a/api/services/trigger/webhook_service.py +++ b/api/services/trigger/webhook_service.py @@ -13,6 +13,7 @@ from werkzeug.exceptions import RequestEntityTooLarge from configs import dify_config from core.file.models import FileTransferMethod +from core.helper.trace_id_helper import get_external_trace_id from core.tools.tool_file_manager import ToolFileManager from core.variables.types import SegmentType from core.workflow.enums import NodeType @@ -24,6 +25,7 @@ from models.enums import WorkflowRunTriggeredFrom from models.model import App from models.workflow import AppTrigger, AppTriggerStatus, AppTriggerType, Workflow, WorkflowWebhookTrigger from services.async_workflow_service import AsyncWorkflowService +from services.trigger.trigger_debug_service import TriggerDebugService, WebhookDebugEvent from services.workflow.entities import TriggerData logger = logging.getLogger(__name__) @@ -35,6 +37,53 @@ class WebhookService: __WEBHOOK_NODE_CACHE_KEY__ = "webhook_nodes" MAX_WEBHOOK_NODES_PER_WORKFLOW = 5 # Maximum allowed webhook nodes per workflow + @classmethod + def build_workflow_args(cls, event: WebhookDebugEvent) -> Mapping[str, Any]: + """Build workflow args from webhook debug event.""" + payload = event.payload or {} + workflow_inputs = payload.get("inputs") + if workflow_inputs is None: + webhook_data = payload.get("webhook_data", {}) + workflow_inputs = WebhookService.build_workflow_inputs(webhook_data) + + workflow_args = { + "inputs": workflow_inputs or {}, + "query": "", + "files": [], + } + + external_trace_id = get_external_trace_id(request) + if external_trace_id: + workflow_args["external_trace_id"] = external_trace_id + + return workflow_args + + @classmethod + def poll_debug_event(cls, app_model: App, user_id: str, node_id: str) -> WebhookDebugEvent | None: + """Poll webhook debug event for a given node ID. + + Args: + app_model: The app model + user_id: The user ID + node_id: The node ID to poll for + + Returns: + WebhookDebugEvent | None: The webhook debug event if available, None otherwise + """ + pool_key = WebhookDebugEvent.build_pool_key( + tenant_id=app_model.tenant_id, + app_id=app_model.id, + node_id=node_id, + ) + return TriggerDebugService.poll( + event_type=WebhookDebugEvent, + pool_key=pool_key, + tenant_id=app_model.tenant_id, + user_id=user_id, + app_id=app_model.id, + node_id=node_id, + ) + @classmethod def get_webhook_trigger_and_workflow( cls, webhook_id: str, is_debug: bool = False diff --git a/api/tasks/trigger_processing_tasks.py b/api/tasks/trigger_processing_tasks.py index cee064f88a..1b21e6ad4f 100644 --- a/api/tasks/trigger_processing_tasks.py +++ b/api/tasks/trigger_processing_tasks.py @@ -140,16 +140,17 @@ def dispatch_triggered_workflows_async( try: for event_name in events: pool_key: str = PluginTriggerDebugEvent.build_pool_key( + name=event_name, tenant_id=subscription.tenant_id, subscription_id=subscription_id, - event_name=event_name, provider_id=provider_id, ) event = PluginTriggerDebugEvent( + provider_id=provider_id, subscription_id=subscription_id, request_id=request_id, timestamp=timestamp, - event_name=event_name, + name=event_name, ) debug_dispatched += TriggerDebugService.dispatch( tenant_id=subscription.tenant_id, diff --git a/api/tasks/workflow_schedule_tasks.py b/api/tasks/workflow_schedule_tasks.py index da8dbfe3fa..1c06649eae 100644 --- a/api/tasks/workflow_schedule_tasks.py +++ b/api/tasks/workflow_schedule_tasks.py @@ -1,4 +1,5 @@ import logging +import time from datetime import UTC, datetime from zoneinfo import ZoneInfo @@ -15,6 +16,7 @@ from models.enums import WorkflowRunTriggeredFrom from models.workflow import WorkflowSchedulePlan from services.async_workflow_service import AsyncWorkflowService from services.trigger.schedule_service import ScheduleService +from services.trigger.trigger_debug_service import ScheduleDebugEvent, TriggerDebugService from services.workflow.entities import TriggerData logger = logging.getLogger(__name__) @@ -50,6 +52,7 @@ def run_schedule_trigger(schedule_id: str) -> None: current_in_tz = current_utc.astimezone(schedule_tz) inputs = {"current_time": current_in_tz.isoformat()} + # Production dispatch: Trigger the workflow normally response = AsyncWorkflowService.trigger_workflow_async( session=session, user=tenant_owner, @@ -63,6 +66,38 @@ def run_schedule_trigger(schedule_id: str) -> None: ) logger.info("Schedule %s triggered workflow: %s", schedule_id, response.workflow_trigger_log_id) + # Debug dispatch: Send event to waiting debug listeners (if any) + try: + event = ScheduleDebugEvent( + timestamp=int(time.time()), + node_id=schedule.node_id, + inputs=inputs, + ) + pool_key = ScheduleDebugEvent.build_pool_key( + tenant_id=schedule.tenant_id, + app_id=schedule.app_id, + node_id=schedule.node_id, + ) + dispatched_count = TriggerDebugService.dispatch( + tenant_id=schedule.tenant_id, + event=event, + pool_key=pool_key, + ) + if dispatched_count > 0: + logger.debug( + "Dispatched schedule debug event to %d listener(s) for schedule %s", + dispatched_count, + schedule_id, + ) + except Exception as debug_error: + # Debug dispatch failure should not affect production workflow execution + logger.warning( + "Failed to dispatch debug event for schedule %s: %s", + schedule_id, + str(debug_error), + exc_info=True, + ) + except Exception as e: raise ScheduleExecutionError( f"Failed to trigger workflow for schedule {schedule_id}, app {schedule.app_id}" diff --git a/web/app/components/workflow/nodes/_base/hooks/use-one-step-run.ts b/web/app/components/workflow/nodes/_base/hooks/use-one-step-run.ts index df7a256871..dd1b39f6d8 100644 --- a/web/app/components/workflow/nodes/_base/hooks/use-one-step-run.ts +++ b/web/app/components/workflow/nodes/_base/hooks/use-one-step-run.ts @@ -381,18 +381,10 @@ const useOneStepRun = ({ headers.set('Authorization', `Bearer ${accessToken}`) headers.set('Content-Type', 'application/json') - // Reason: Plugin trigger requires event_name, subscription_id, provider_id from node data - const requestBody = { - event_name: (data as any).event_name, - subscription_id: (data as any).subscription_id, - provider_id: (data as any).provider_id, - } - const response = await fetch(urlWithPrefix, { ...baseOptions, method: 'POST', headers, - body: JSON.stringify(requestBody), signal: controller.signal, })