From 06c91fbcbd95eb66c7d82a8365df655cc7137deb Mon Sep 17 00:00:00 2001 From: Harry Date: Wed, 15 Oct 2025 14:41:53 +0800 Subject: [PATCH] refactor(trigger): Unify the Trigger Debug interface and event handling and enhance error management - Updated `DraftWorkflowTriggerNodeApi` to utilize the new `TriggerDebugEvent` and `TriggerDebugEventPoller` for improved event polling. - Removed deprecated `poll_debug_event` methods from `TriggerService`, `ScheduleService`, and `WebhookService`, consolidating functionality into the new event structure. - Enhanced error handling in `invoke_trigger_event` to utilize `TriggerPluginInvokeError` for better clarity on invocation issues. - Updated frontend API routes to reflect changes in trigger event handling, ensuring consistency across the application. --- .gitignore | 3 + api/controllers/console/app/workflow.py | 370 +++++------------- .../console/workspace/trigger_providers.py | 4 +- api/controllers/trigger/webhook.py | 5 +- .../trigger/debug/event_bus.py} | 103 +---- api/core/trigger/debug/event_selectors.py | 180 +++++++++ api/core/trigger/debug/events.py | 92 +++++ api/core/trigger/errors.py | 7 + api/core/trigger/provider.py | 9 +- api/core/trigger/trigger_manager.py | 24 +- .../workflow/nodes/trigger_plugin/entities.py | 2 +- .../trigger_plugin/trigger_plugin_node.py | 6 +- .../sync_plugin_trigger_when_app_created.py | 4 +- api/services/trigger/schedule_service.py | 17 - api/services/trigger/trigger_service.py | 259 +++++++++--- .../trigger_subscription_builder_service.py | 3 +- api/services/trigger/webhook_service.py | 49 --- .../workflow_plugin_trigger_service.py | 204 ---------- api/services/workflow_service.py | 4 +- api/tasks/trigger_processing_tasks.py | 5 +- api/tasks/workflow_schedule_tasks.py | 5 +- .../workflow-app/hooks/use-workflow-run.ts | 16 +- .../nodes/_base/hooks/use-one-step-run.ts | 4 +- 23 files changed, 622 insertions(+), 753 deletions(-) rename api/{services/trigger/trigger_debug_service.py => core/trigger/debug/event_bus.py} (56%) create mode 100644 api/core/trigger/debug/event_selectors.py create mode 100644 api/core/trigger/debug/events.py delete mode 100644 api/services/trigger/workflow_plugin_trigger_service.py diff --git a/.gitignore b/.gitignore index de31bf9a11..32ec6914cd 100644 --- a/.gitignore +++ b/.gitignore @@ -6,6 +6,9 @@ __pycache__/ # C extensions *.so +# *db files +*.db + # Distribution / packaging .Python build/ diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index e57b5e7361..6e0ddfa43e 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -20,7 +20,15 @@ from core.app.entities.app_invoke_entities import InvokeFrom from core.file.models import File from core.helper.trace_id_helper import get_external_trace_id from core.model_runtime.utils.encoders import jsonable_encoder -from core.plugin.entities.request import TriggerInvokeEventResponse +from core.plugin.impl.exc import PluginInvokeError +from core.trigger.debug.event_selectors import ( + TriggerDebugEvent, + TriggerDebugEventPoller, + create_event_poller, + select_trigger_debug_events, +) +from core.trigger.errors import TriggerPluginInvokeError +from core.workflow.enums import NodeType from core.workflow.graph_engine.manager import GraphEngineManager from extensions.ext_database import db from factories import file_factory, variable_factory @@ -33,22 +41,14 @@ from libs.login import current_user, login_required from models import App from models.account import Account from models.model import AppMode -from models.workflow import NodeType, Workflow +from models.workflow import Workflow from services.app_generate_service import AppGenerateService from services.errors.app import WorkflowHashNotEqualError from services.errors.llm import InvokeRateLimitError -from services.trigger.schedule_service import ScheduleService -from services.trigger.trigger_debug_service import ( - PluginTriggerDebugEvent, - ScheduleDebugEvent, - TriggerDebugService, - WebhookDebugEvent, -) -from services.trigger.trigger_service import TriggerService -from services.trigger.webhook_service import WebhookService from services.workflow_service import DraftWorkflowDeletionError, WorkflowInUseError, WorkflowService logger = logging.getLogger(__name__) +LISTENING_RETRY_IN = 2000 # TODO(QuantumGhost): Refactor existing node run API to handle file parameter parsing @@ -1003,11 +1003,11 @@ class DraftWorkflowNodeLastRunApi(Resource): return node_exec -@console_ns.route("/apps//workflows/draft/nodes//trigger") +@console_ns.route("/apps//workflows/draft/nodes//trigger/run") class DraftWorkflowTriggerNodeApi(Resource): """ Single node debug - Polling API for trigger events - Path: /apps//workflows/draft/nodes//trigger + Path: /apps//workflows/draft/nodes//trigger/run """ @api.doc("poll_draft_workflow_trigger_node") @@ -1027,32 +1027,41 @@ class DraftWorkflowTriggerNodeApi(Resource): if not isinstance(current_user, Account) or not current_user.has_edit_permission: raise Forbidden() - event: PluginTriggerDebugEvent | None = TriggerService.poll_debug_event( - app_model=app_model, user_id=current_user.id, node_id=node_id - ) - if not event: - return jsonable_encoder({"status": "waiting"}) + workflow_service = WorkflowService() + draft_workflow = workflow_service.get_draft_workflow(app_model) + if not draft_workflow: + raise ValueError("Workflow not found") - try: - workflow_service = WorkflowService() - draft_workflow = workflow_service.get_draft_workflow(app_model) - if not draft_workflow: - raise ValueError("Workflow not found") - - response: TriggerInvokeEventResponse = TriggerService.invoke_trigger_event( - event=event, - user_id=current_user.id, - tenant_id=app_model.tenant_id, - node_config=draft_workflow.get_node_config_by_id(node_id=node_id), + node_config = draft_workflow.get_node_config_by_id(node_id=node_id) + if not node_config: + raise ValueError("Node data not found for node %s", node_id) + node_type: NodeType = draft_workflow.get_node_type_from_node_config(node_config) + event: TriggerDebugEvent | None = None + # for schedule trigger, when run single node, just execute directly + if node_type == NodeType.TRIGGER_SCHEDULE: + event = TriggerDebugEvent( + workflow_args={}, + node_id=node_id, ) - if response.cancelled: - return jsonable_encoder({"status": "cancelled"}) + # for other trigger types, poll for the event + else: + poller: TriggerDebugEventPoller = create_event_poller( + draft_workflow=draft_workflow, + tenant_id=app_model.tenant_id, + user_id=current_user.id, + app_id=app_model.id, + node_id=node_id, + ) + event = poller.poll() + if not event: + return jsonable_encoder({"status": "waiting", "retry_in": LISTENING_RETRY_IN}) + try: node_execution = workflow_service.run_draft_workflow_node( app_model=app_model, draft_workflow=draft_workflow, node_id=node_id, - user_inputs=response.variables, + user_inputs=event.workflow_args, account=current_user, query="", files=[], @@ -1104,224 +1113,46 @@ class DraftWorkflowTriggerRunApi(Resource): parser.add_argument("node_id", type=str, required=True, location="json", nullable=False) args = parser.parse_args() node_id = args["node_id"] + workflow_service = WorkflowService() + draft_workflow = workflow_service.get_draft_workflow(app_model) + if not draft_workflow: + raise ValueError("Workflow not found") - event: PluginTriggerDebugEvent | None = TriggerService.poll_debug_event( - app_model=app_model, user_id=current_user.id, node_id=node_id - ) - if not event: - return jsonable_encoder({"status": "waiting", "retry_in": 2000}) - - try: - response = AppGenerateService.generate( - app_model=app_model, - user=current_user, - args=TriggerService.build_workflow_args(event), - invoke_from=InvokeFrom.DEBUGGER, - streaming=True, - root_node_id=node_id, - ) - return helper.compact_generate_response(response) - except InvokeRateLimitError as ex: - raise InvokeRateLimitHttpError(ex.description) - except Exception: - logger.exception("Error running draft workflow trigger run") - return jsonable_encoder( - { - "status": "error", - } - ), 500 - - -@console_ns.route("/apps//workflows/draft/trigger/webhook/run") -class DraftWorkflowTriggerWebhookRunApi(Resource): - """ - Full workflow debug when the start node is a webhook trigger - Path: /apps//workflows/draft/trigger/webhook/run - """ - - @api.doc("draft_workflow_trigger_webhook_run") - @api.doc(description="Full workflow debug when the start node is a webhook trigger") - @api.doc(params={"app_id": "Application ID"}) - @api.expect( - api.model( - "DraftWorkflowTriggerWebhookRunRequest", - { - "node_id": fields.String(required=True, description="Node ID"), - }, - ) - ) - @api.response(200, "Workflow executed successfully") - @api.response(403, "Permission denied") - @api.response(500, "Internal server error") - @setup_required - @login_required - @account_initialization_required - @get_app_model(mode=[AppMode.WORKFLOW]) - def post(self, app_model: App): - """ - Full workflow debug when the start node is a webhook trigger - """ - if not isinstance(current_user, Account) or not current_user.has_edit_permission: - raise Forbidden() - parser = reqparse.RequestParser() - parser.add_argument("node_id", type=str, required=True, location="json", nullable=False) - args = parser.parse_args() - node_id = args["node_id"] - event: WebhookDebugEvent | None = WebhookService.poll_debug_event( - app_model=app_model, user_id=current_user.id, node_id=node_id - ) - if not event: - return jsonable_encoder({"status": "waiting", "retry_in": 2000}) - - try: - response = AppGenerateService.generate( - app_model=app_model, - user=current_user, - args=WebhookService.build_workflow_args(event), - invoke_from=InvokeFrom.DEBUGGER, - streaming=True, - root_node_id=node_id, - ) - return helper.compact_generate_response(response) - except InvokeRateLimitError as ex: - raise InvokeRateLimitHttpError(ex.description) - except Exception: - logger.exception("Error running draft workflow trigger webhook run") - return jsonable_encoder( - { - "status": "error", - } - ), 500 - - -@console_ns.route("/apps//workflows/draft/nodes//debug/webhook/run") -class DraftWorkflowNodeWebhookDebugRunApi(Resource): - """Single node debug when the node is a webhook trigger.""" - - @api.doc("draft_workflow_node_webhook_debug_run") - @api.doc(description="Poll for webhook debug payload and execute single node when event arrives") - @api.doc(params={"app_id": "Application ID", "node_id": "Node ID"}) - @api.response(200, "Node executed successfully") - @api.response(403, "Permission denied") - @api.response(400, "Invalid node type") - @api.response(500, "Internal server error") - @setup_required - @login_required - @account_initialization_required - @get_app_model(mode=[AppMode.WORKFLOW]) - def post(self, app_model: App, node_id: str): - if not isinstance(current_user, Account) or not current_user.has_edit_permission: - raise Forbidden() - - pool_key = WebhookDebugEvent.build_pool_key( - tenant_id=app_model.tenant_id, - app_id=app_model.id, - node_id=node_id, - ) - event: WebhookDebugEvent | None = TriggerDebugService.poll( - event_type=WebhookDebugEvent, - pool_key=pool_key, + poller: TriggerDebugEventPoller = create_event_poller( + draft_workflow=draft_workflow, tenant_id=app_model.tenant_id, user_id=current_user.id, app_id=app_model.id, node_id=node_id, ) - - if not event: - return jsonable_encoder({"status": "waiting", "retry_in": 2000}) - - workflow_service = WorkflowService() - draft_workflow = workflow_service.get_draft_workflow(app_model=app_model) - - if not draft_workflow: - raise DraftWorkflowNotExist() - - node_config = draft_workflow.get_node_config_by_id(node_id) - node_type = Workflow.get_node_type_from_node_config(node_config) - if node_type != NodeType.TRIGGER_WEBHOOK: + event: TriggerDebugEvent | None = None + try: + event = poller.poll() + if not event: + return jsonable_encoder({"status": "waiting", "retry_in": LISTENING_RETRY_IN}) + return helper.compact_generate_response( + AppGenerateService.generate( + app_model=app_model, + user=current_user, + args=event.workflow_args, + invoke_from=InvokeFrom.DEBUGGER, + streaming=True, + root_node_id=node_id, + ) + ) + except InvokeRateLimitError as ex: + raise InvokeRateLimitHttpError(ex.description) + except TriggerPluginInvokeError as e: + logger.exception("Error invoking trigger event") return jsonable_encoder( { "status": "error", - "message": "node is not webhook trigger", + "error": e.get_error_message(), + "error_type": e.get_error_type(), } - ), 400 - - payload = event.payload or {} - workflow_inputs = payload.get("inputs") - if workflow_inputs is None: - webhook_data = payload.get("webhook_data", {}) - workflow_inputs = WebhookService.build_workflow_inputs(webhook_data) - - workflow_node_execution = workflow_service.run_draft_workflow_node( - app_model=app_model, - draft_workflow=draft_workflow, - node_id=node_id, - user_inputs=workflow_inputs or {}, - account=current_user, - query="", - files=[], - ) - - return jsonable_encoder(workflow_node_execution) - - -@console_ns.route("/apps//workflows/draft/trigger/schedule/run") -class DraftWorkflowTriggerScheduleRunApi(Resource): - """ - Full workflow debug when the start node is a schedule trigger - Path: /apps//workflows/draft/trigger/schedule/run - """ - - @api.doc("draft_workflow_trigger_schedule_run") - @api.doc(description="Full workflow debug when the start node is a schedule trigger") - @api.doc(params={"app_id": "Application ID"}) - @api.expect( - api.model( - "DraftWorkflowTriggerScheduleRunRequest", - { - "node_id": fields.String(required=True, description="Node ID"), - }, - ) - ) - @api.response(200, "Workflow executed successfully") - @api.response(403, "Permission denied") - @api.response(500, "Internal server error") - @setup_required - @login_required - @account_initialization_required - @get_app_model(mode=[AppMode.WORKFLOW]) - def post(self, app_model: App): - """ - Full workflow debug when the start node is a schedule trigger - """ - if not isinstance(current_user, Account) or not current_user.has_edit_permission: - raise Forbidden() - - parser = reqparse.RequestParser() - parser.add_argument("node_id", type=str, required=True, location="json", nullable=False) - args = parser.parse_args() - node_id = args["node_id"] - - workflow_args = { - "inputs": {}, - "query": "", - "files": [], - } - - try: - response = AppGenerateService.generate( - app_model=app_model, - user=current_user, - args=workflow_args, - invoke_from=InvokeFrom.DEBUGGER, - streaming=True, - root_node_id=node_id, - ) - return helper.compact_generate_response(response) - except InvokeRateLimitError as ex: - raise InvokeRateLimitHttpError(ex.description) + ), 500 except Exception: - logger.exception("Error running draft workflow trigger schedule run") + logger.exception("Error running draft workflow trigger run") return jsonable_encoder( { "status": "error", @@ -1369,48 +1200,39 @@ class DraftWorkflowTriggerRunAllApi(Resource): draft_workflow = workflow_service.get_draft_workflow(app_model) if not draft_workflow: raise ValueError("Workflow not found") - workflow_args = None - for node_id in node_ids: - node_config = draft_workflow.get_node_config_by_id(node_id=node_id) - if not node_config: - raise ValueError("Node data not found for node %s", node_id) - node_type = draft_workflow.get_node_type_from_node_config(node_config) - if node_type == NodeType.TRIGGER_PLUGIN: - plugin_trigger_event: PluginTriggerDebugEvent | None = TriggerService.poll_debug_event( - app_model=app_model, user_id=current_user.id, node_id=node_id - ) - workflow_args = ( - TriggerService.build_workflow_args(plugin_trigger_event) if plugin_trigger_event else None - ) - elif node_type == NodeType.TRIGGER_WEBHOOK: - webhook_event: WebhookDebugEvent | None = WebhookService.poll_debug_event( - app_model=app_model, user_id=current_user.id, node_id=node_id - ) - workflow_args = WebhookService.build_workflow_args(webhook_event) if webhook_event else None - elif node_type == NodeType.TRIGGER_SCHEDULE: - schedule_event: ScheduleDebugEvent | None = ScheduleService.poll_debug_event( - app_model=app_model, user_id=current_user.id, node_id=node_id - ) - workflow_args = ( - { - "inputs": schedule_event.inputs, - } - if schedule_event - else None - ) - else: - raise ValueError("Invalid node type %s", node_type) - if workflow_args is None: - return jsonable_encoder({"status": "waiting", "retry_in": 2000}) + + try: + trigger_debug_event: TriggerDebugEvent | None = select_trigger_debug_events( + draft_workflow=draft_workflow, app_model=app_model, user_id=current_user.id, node_ids=node_ids + ) + except PluginInvokeError as e: + logger.exception("Error selecting trigger debug event") + return jsonable_encoder( + { + "status": "error", + "error": e.get_error_message(), + "error_type": e.get_error_type(), + } + ), 500 + except Exception as e: + logger.exception("Error polling trigger debug event") + return jsonable_encoder( + { + "status": "error", + } + ), 500 + + if trigger_debug_event is None: + return jsonable_encoder({"status": "waiting", "retry_in": LISTENING_RETRY_IN}) try: response = AppGenerateService.generate( app_model=app_model, user=current_user, - args=workflow_args, + args=trigger_debug_event.workflow_args, invoke_from=InvokeFrom.DEBUGGER, streaming=True, - root_node_id=node_id, + root_node_id=trigger_debug_event.node_id, ) return helper.compact_generate_response(response) except InvokeRateLimitError as ex: diff --git a/api/controllers/console/workspace/trigger_providers.py b/api/controllers/console/workspace/trigger_providers.py index 6544071076..6e8e6c97f3 100644 --- a/api/controllers/console/workspace/trigger_providers.py +++ b/api/controllers/console/workspace/trigger_providers.py @@ -19,8 +19,8 @@ from models.account import Account from models.provider_ids import TriggerProviderID from services.plugin.oauth_service import OAuthProxyService from services.trigger.trigger_provider_service import TriggerProviderService +from services.trigger.trigger_service import TriggerService from services.trigger.trigger_subscription_builder_service import TriggerSubscriptionBuilderService -from services.trigger.workflow_plugin_trigger_service import WorkflowPluginTriggerService logger = logging.getLogger(__name__) @@ -279,7 +279,7 @@ class TriggerSubscriptionDeleteApi(Resource): subscription_id=subscription_id, ) # Delete plugin triggers - WorkflowPluginTriggerService.delete_plugin_trigger_by_subscription( + TriggerService.delete_plugin_trigger_by_subscription( session=session, tenant_id=user.current_tenant_id, subscription_id=subscription_id, diff --git a/api/controllers/trigger/webhook.py b/api/controllers/trigger/webhook.py index bd354cd9b7..35f97daa0f 100644 --- a/api/controllers/trigger/webhook.py +++ b/api/controllers/trigger/webhook.py @@ -5,7 +5,8 @@ from flask import jsonify from werkzeug.exceptions import NotFound, RequestEntityTooLarge from controllers.trigger import bp -from services.trigger.trigger_debug_service import TriggerDebugService, WebhookDebugEvent +from core.trigger.debug.event_bus import TriggerDebugEventBus +from core.trigger.debug.events import WebhookDebugEvent from services.trigger.webhook_service import WebhookService logger = logging.getLogger(__name__) @@ -87,7 +88,7 @@ def handle_webhook_debug(webhook_id: str): "method": webhook_data.get("method"), }, ) - TriggerDebugService.dispatch( + TriggerDebugEventBus.dispatch( tenant_id=webhook_trigger.tenant_id, event=event, pool_key=pool_key, diff --git a/api/services/trigger/trigger_debug_service.py b/api/core/trigger/debug/event_bus.py similarity index 56% rename from api/services/trigger/trigger_debug_service.py rename to api/core/trigger/debug/event_bus.py index c3ab856684..ffb9dabc92 100644 --- a/api/services/trigger/trigger_debug_service.py +++ b/api/core/trigger/debug/event_bus.py @@ -1,111 +1,20 @@ -"""Trigger debug service supporting plugin and webhook debugging in draft workflows.""" - import hashlib import logging -from abc import ABC, abstractmethod -from collections.abc import Mapping -from typing import Any, TypeVar +from typing import TypeVar -from pydantic import BaseModel, Field from redis import RedisError +from core.trigger.debug.events import BaseDebugEvent from extensions.ext_redis import redis_client logger = logging.getLogger(__name__) TRIGGER_DEBUG_EVENT_TTL = 300 -TEvent = TypeVar("TEvent", bound="BaseDebugEvent") +TTriggerDebugEvent = TypeVar("TTriggerDebugEvent", bound="BaseDebugEvent") -class BaseDebugEvent(ABC, BaseModel): - """Base class for all debug events.""" - - timestamp: int - - @classmethod - @abstractmethod - def build_pool_key(cls, **kwargs: Any) -> str: - """ - Generate the waiting pool key for this event type. - - Each subclass implements its own pool key strategy based on routing parameters. - - Returns: - Redis key for the waiting pool - """ - raise NotImplementedError("Subclasses must implement build_pool_key") - - -class PluginTriggerDebugEvent(BaseDebugEvent): - """Debug event for plugin triggers.""" - - name: str - request_id: str - subscription_id: str - provider_id: str - - @classmethod - def build_pool_key(cls, **kwargs: Any) -> str: - """Generate pool key for plugin trigger events. - - Args: - name: Event name - tenant_id: Tenant ID - provider_id: Provider ID - subscription_id: Subscription ID - """ - tenant_id = kwargs["tenant_id"] - provider_id = kwargs["provider_id"] - subscription_id = kwargs["subscription_id"] - event_name = kwargs["name"] - return f"plugin_trigger_debug_waiting_pool:{tenant_id}:{str(provider_id)}:{subscription_id}:{event_name}" - - -class WebhookDebugEvent(BaseDebugEvent): - """Debug event for webhook triggers.""" - - request_id: str - node_id: str - payload: dict[str, Any] = Field(default_factory=dict) - - @classmethod - def build_pool_key(cls, **kwargs: Any) -> str: - """Generate pool key for webhook events. - - Args: - tenant_id: Tenant ID - app_id: App ID - node_id: Node ID - """ - tenant_id = kwargs["tenant_id"] - app_id = kwargs["app_id"] - node_id = kwargs["node_id"] - return f"webhook_trigger_debug_waiting_pool:{tenant_id}:{app_id}:{node_id}" - - -class ScheduleDebugEvent(BaseDebugEvent): - """Debug event for schedule triggers.""" - - node_id: str - inputs: Mapping[str, Any] - - @classmethod - def build_pool_key(cls, **kwargs: Any) -> str: - """Generate pool key for schedule events. - - Args: - tenant_id: Tenant ID - app_id: App ID - node_id: Node ID - """ - tenant_id = kwargs["tenant_id"] - app_id = kwargs["app_id"] - node_id = kwargs["node_id"] - return f"schedule_trigger_debug_waiting_pool:{tenant_id}:{app_id}:{node_id}" - - -class TriggerDebugService: +class TriggerDebugEventBus: """ Unified Redis-based trigger debug service with polling support. @@ -174,13 +83,13 @@ class TriggerDebugService: @classmethod def poll( cls, - event_type: type[TEvent], + event_type: type[TTriggerDebugEvent], pool_key: str, tenant_id: str, user_id: str, app_id: str, node_id: str, - ) -> TEvent | None: + ) -> TTriggerDebugEvent | None: """ Poll for an event or register to the waiting pool. diff --git a/api/core/trigger/debug/event_selectors.py b/api/core/trigger/debug/event_selectors.py new file mode 100644 index 0000000000..9c355feb3b --- /dev/null +++ b/api/core/trigger/debug/event_selectors.py @@ -0,0 +1,180 @@ +"""Trigger debug service supporting plugin and webhook debugging in draft workflows.""" + +import logging +from abc import ABC, abstractmethod +from collections.abc import Mapping +from typing import Any + +from pydantic import BaseModel + +from core.plugin.entities.request import TriggerInvokeEventResponse +from core.trigger.debug.event_bus import TriggerDebugEventBus +from core.trigger.debug.events import PluginTriggerDebugEvent, ScheduleDebugEvent, WebhookDebugEvent +from core.workflow.enums import NodeType +from core.workflow.nodes.trigger_plugin.entities import PluginTriggerNodeData +from models.model import App +from models.provider_ids import TriggerProviderID +from models.workflow import Workflow +from services.trigger.trigger_service import TriggerService +from services.trigger.webhook_service import WebhookService + +logger = logging.getLogger(__name__) + + +class TriggerDebugEvent(BaseModel): + workflow_args: Mapping[str, Any] + node_id: str + + +class TriggerDebugEventPoller(ABC): + app_id: str + user_id: str + tenant_id: str + node_config: Mapping[str, Any] + node_id: str + + def __init__(self, tenant_id: str, user_id: str, app_id: str, node_config: Mapping[str, Any], node_id: str): + self.tenant_id = tenant_id + self.user_id = user_id + self.app_id = app_id + self.node_config = node_config + self.node_id = node_id + + @abstractmethod + def poll(self) -> TriggerDebugEvent | None: + raise NotImplementedError + + +class PluginTriggerDebugEventPoller(TriggerDebugEventPoller): + def poll(self) -> TriggerDebugEvent | None: + plugin_trigger_data = PluginTriggerNodeData.model_validate(self.node_config.get("data", {})) + provider_id = TriggerProviderID(plugin_trigger_data.provider_id) + pool_key: str = PluginTriggerDebugEvent.build_pool_key( + name=plugin_trigger_data.event_name, + provider_id=provider_id, + tenant_id=self.tenant_id, + subscription_id=plugin_trigger_data.subscription_id, + ) + plugin_trigger_event: PluginTriggerDebugEvent | None = TriggerDebugEventBus.poll( + event_type=PluginTriggerDebugEvent, + pool_key=pool_key, + tenant_id=self.tenant_id, + user_id=self.user_id, + app_id=self.app_id, + node_id=self.node_id, + ) + if not plugin_trigger_event: + return None + trigger_event_response: TriggerInvokeEventResponse = TriggerService.invoke_trigger_event( + event=plugin_trigger_event, + user_id=self.user_id, + tenant_id=self.tenant_id, + node_config=self.node_config, + ) + + if trigger_event_response.cancelled: + return None + + return TriggerDebugEvent( + workflow_args={ + "inputs": trigger_event_response.variables, + "query": "", + "files": [], + }, + node_id=self.node_id, + ) + + +class WebhookTriggerDebugEventPoller(TriggerDebugEventPoller): + def poll(self) -> TriggerDebugEvent | None: + pool_key = WebhookDebugEvent.build_pool_key( + tenant_id=self.tenant_id, + app_id=self.app_id, + node_id=self.node_id, + ) + webhook_event: WebhookDebugEvent | None = TriggerDebugEventBus.poll( + event_type=WebhookDebugEvent, + pool_key=pool_key, + tenant_id=self.tenant_id, + user_id=self.user_id, + app_id=self.app_id, + node_id=self.node_id, + ) + if not webhook_event: + return None + + payload = webhook_event.payload or {} + workflow_inputs = payload.get("inputs") + if workflow_inputs is None: + webhook_data = payload.get("webhook_data", {}) + workflow_inputs = WebhookService.build_workflow_inputs(webhook_data) + + workflow_args = { + "inputs": workflow_inputs or {}, + "query": "", + "files": [], + } + return TriggerDebugEvent(workflow_args=workflow_args, node_id=self.node_id) + + +class ScheduleTriggerDebugEventPoller(TriggerDebugEventPoller): + def poll(self) -> TriggerDebugEvent | None: + pool_key: str = ScheduleDebugEvent.build_pool_key( + tenant_id=self.tenant_id, app_id=self.app_id, node_id=self.node_id + ) + schedule_event: ScheduleDebugEvent | None = TriggerDebugEventBus.poll( + event_type=ScheduleDebugEvent, + pool_key=pool_key, + tenant_id=self.tenant_id, + user_id=self.user_id, + app_id=self.app_id, + node_id=self.node_id, + ) + if not schedule_event: + return None + return TriggerDebugEvent(workflow_args=schedule_event.inputs, node_id=self.node_id) + + +def create_event_poller( + draft_workflow: Workflow, tenant_id: str, user_id: str, app_id: str, node_id: str +) -> TriggerDebugEventPoller: + node_config = draft_workflow.get_node_config_by_id(node_id=node_id) + if not node_config: + raise ValueError("Node data not found for node %s", node_id) + node_type = draft_workflow.get_node_type_from_node_config(node_config) + match node_type: + case NodeType.TRIGGER_PLUGIN: + return PluginTriggerDebugEventPoller( + tenant_id=tenant_id, user_id=user_id, app_id=app_id, node_config=node_config, node_id=node_id + ) + case NodeType.TRIGGER_WEBHOOK: + return WebhookTriggerDebugEventPoller( + tenant_id=tenant_id, user_id=user_id, app_id=app_id, node_config=node_config, node_id=node_id + ) + case NodeType.TRIGGER_SCHEDULE: + return ScheduleTriggerDebugEventPoller( + tenant_id=tenant_id, user_id=user_id, app_id=app_id, node_config=node_config, node_id=node_id + ) + case _: + raise ValueError("unable to create event poller for node type %s", node_type) + + +def select_trigger_debug_events( + draft_workflow: Workflow, app_model: App, user_id: str, node_ids: list[str] +) -> TriggerDebugEvent | None: + event: TriggerDebugEvent | None = None + for node_id in node_ids: + node_config = draft_workflow.get_node_config_by_id(node_id=node_id) + if not node_config: + raise ValueError("Node data not found for node %s", node_id) + poller: TriggerDebugEventPoller = create_event_poller( + draft_workflow=draft_workflow, + tenant_id=app_model.tenant_id, + user_id=user_id, + app_id=app_model.id, + node_id=node_id, + ) + event = poller.poll() + if event is not None: + return event + return None diff --git a/api/core/trigger/debug/events.py b/api/core/trigger/debug/events.py new file mode 100644 index 0000000000..06ce534eb8 --- /dev/null +++ b/api/core/trigger/debug/events.py @@ -0,0 +1,92 @@ +from abc import ABC, abstractmethod +from collections.abc import Mapping +from typing import Any + +from pydantic import BaseModel, Field + + +class BaseDebugEvent(ABC, BaseModel): + """Base class for all debug events.""" + + timestamp: int + + @classmethod + @abstractmethod + def build_pool_key(cls, **kwargs: Any) -> str: + """ + Generate the waiting pool key for this event type. + + Each subclass implements its own pool key strategy based on routing parameters. + + Returns: + Redis key for the waiting pool + """ + raise NotImplementedError("Subclasses must implement build_pool_key") + + +class ScheduleDebugEvent(BaseDebugEvent): + """Debug event for schedule triggers.""" + + node_id: str + inputs: Mapping[str, Any] + + @classmethod + def build_pool_key(cls, **kwargs: Any) -> str: + """Generate pool key for schedule events. + + Args: + tenant_id: Tenant ID + app_id: App ID + node_id: Node ID + """ + tenant_id = kwargs["tenant_id"] + app_id = kwargs["app_id"] + node_id = kwargs["node_id"] + return f"schedule_trigger_debug_waiting_pool:{tenant_id}:{app_id}:{node_id}" + + +class WebhookDebugEvent(BaseDebugEvent): + """Debug event for webhook triggers.""" + + request_id: str + node_id: str + payload: dict[str, Any] = Field(default_factory=dict) + + @classmethod + def build_pool_key(cls, **kwargs: Any) -> str: + """Generate pool key for webhook events. + + Args: + tenant_id: Tenant ID + app_id: App ID + node_id: Node ID + """ + tenant_id = kwargs["tenant_id"] + app_id = kwargs["app_id"] + node_id = kwargs["node_id"] + return f"webhook_trigger_debug_waiting_pool:{tenant_id}:{app_id}:{node_id}" + + +class PluginTriggerDebugEvent(BaseDebugEvent): + """Debug event for plugin triggers.""" + + name: str + request_id: str + subscription_id: str + provider_id: str + + @classmethod + def build_pool_key(cls, **kwargs: Any) -> str: + """Generate pool key for plugin trigger events. + + Args: + name: Event name + tenant_id: Tenant ID + provider_id: Provider ID + subscription_id: Subscription ID + """ + tenant_id = kwargs["tenant_id"] + provider_id = kwargs["provider_id"] + subscription_id = kwargs["subscription_id"] + event_name = kwargs["name"] + return f"plugin_trigger_debug_waiting_pool:{tenant_id}:{str(provider_id)}:{subscription_id}:{event_name}" diff --git a/api/core/trigger/errors.py b/api/core/trigger/errors.py index 9e35d54785..6a2140feb8 100644 --- a/api/core/trigger/errors.py +++ b/api/core/trigger/errors.py @@ -1,7 +1,14 @@ +from core.plugin.impl.exc import PluginInvokeError + + class TriggerProviderCredentialValidationError(ValueError): pass +class TriggerPluginInvokeError(PluginInvokeError): + pass + + class TriggerInvokeError(Exception): pass diff --git a/api/core/trigger/provider.py b/api/core/trigger/provider.py index 56c95fd21a..467a012de8 100644 --- a/api/core/trigger/provider.py +++ b/api/core/trigger/provider.py @@ -146,7 +146,7 @@ class PluginTriggerProviderController: """ return {prop.name: prop.default for prop in self.entity.subscription_schema if prop.default} - def get_subscription_constructor(self) -> SubscriptionConstructor: + def get_subscription_constructor(self) -> SubscriptionConstructor | None: """ Get subscription constructor for this provider @@ -162,7 +162,10 @@ class PluginTriggerProviderController: :return: Validation response """ # First validate against schema - for config in self.entity.subscription_constructor.credentials_schema or []: + subscription_constructor: SubscriptionConstructor | None = self.entity.subscription_constructor + if not subscription_constructor: + raise ValueError("Subscription constructor not found") + for config in subscription_constructor.credentials_schema or []: if config.required and config.name not in credentials: raise TriggerProviderCredentialValidationError(f"Missing required credential field: {config.name}") @@ -202,6 +205,8 @@ class PluginTriggerProviderController: :return: List of provider config schemas """ subscription_constructor = self.entity.subscription_constructor + if not subscription_constructor: + raise ValueError("Subscription constructor not found") credential_type = CredentialType.of(credential_type) if isinstance(credential_type, str) else credential_type if credential_type == CredentialType.OAUTH2: return ( diff --git a/api/core/trigger/trigger_manager.py b/api/core/trigger/trigger_manager.py index 6dacfa4a55..43b2e40de4 100644 --- a/api/core/trigger/trigger_manager.py +++ b/api/core/trigger/trigger_manager.py @@ -8,19 +8,19 @@ from threading import Lock from typing import Any from flask import Request -from yarl import URL import contexts from configs import dify_config -from core.plugin.entities.plugin_daemon import CredentialType, PluginDaemonError, PluginTriggerProviderEntity +from core.plugin.entities.plugin_daemon import CredentialType, PluginTriggerProviderEntity from core.plugin.entities.request import TriggerInvokeEventResponse -from core.plugin.impl.exc import PluginInvokeError +from core.plugin.impl.exc import PluginDaemonError, PluginInvokeError from core.plugin.impl.trigger import PluginTriggerManager from core.trigger.entities.entities import ( EventEntity, Subscription, Unsubscription, ) +from core.trigger.errors import TriggerPluginInvokeError from core.trigger.provider import PluginTriggerProviderController from models.provider_ids import TriggerProviderID @@ -41,16 +41,9 @@ class TriggerManager: provider: PluginTriggerProviderEntity = manager.fetch_trigger_provider( tenant_id=tenant_id, provider_id=TriggerProviderID(provider_id) ) - return str( - URL(dify_config.CONSOLE_API_URL or "/") - / "console" - / "api" - / "workspaces" - / "current" - / "plugin" - / "icon" - % {"tenant_id": tenant_id, "filename": provider.declaration.identity.icon} - ) + filename = provider.declaration.identity.icon + base_url = f"{dify_config.CONSOLE_API_URL}/console/api/workspaces/current/plugin/icon" + return f"{base_url}?tenant_id={tenant_id}&filename={filename}" @classmethod def list_plugin_trigger_providers(cls, tenant_id: str) -> list[PluginTriggerProviderController]: @@ -194,9 +187,8 @@ class TriggerManager: except PluginInvokeError as e: if e.get_error_type() == "TriggerIgnoreEventError": return TriggerInvokeEventResponse(variables={}, cancelled=True) - else: - logger.exception("Failed to invoke trigger event") - raise + logger.exception("Failed to invoke trigger event") + raise TriggerPluginInvokeError(description=e.get_error_message()) from e @classmethod def subscribe_trigger( diff --git a/api/core/workflow/nodes/trigger_plugin/entities.py b/api/core/workflow/nodes/trigger_plugin/entities.py index b76aa731c0..b3d8f38895 100644 --- a/api/core/workflow/nodes/trigger_plugin/entities.py +++ b/api/core/workflow/nodes/trigger_plugin/entities.py @@ -6,7 +6,7 @@ from core.workflow.enums import ErrorStrategy from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig -class PluginTriggerData(BaseNodeData): +class PluginTriggerNodeData(BaseNodeData): """Plugin trigger node data""" title: str diff --git a/api/core/workflow/nodes/trigger_plugin/trigger_plugin_node.py b/api/core/workflow/nodes/trigger_plugin/trigger_plugin_node.py index f54f69831f..ae4b3f5377 100644 --- a/api/core/workflow/nodes/trigger_plugin/trigger_plugin_node.py +++ b/api/core/workflow/nodes/trigger_plugin/trigger_plugin_node.py @@ -7,17 +7,17 @@ from core.workflow.node_events import NodeRunResult from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig from core.workflow.nodes.base.node import Node -from .entities import PluginTriggerData +from .entities import PluginTriggerNodeData class TriggerPluginNode(Node): node_type = NodeType.TRIGGER_PLUGIN execution_type = NodeExecutionType.ROOT - _node_data: PluginTriggerData + _node_data: PluginTriggerNodeData def init_node_data(self, data: Mapping[str, Any]) -> None: - self._node_data = PluginTriggerData.model_validate(data) + self._node_data = PluginTriggerNodeData.model_validate(data) def _get_error_strategy(self) -> Optional[ErrorStrategy]: return self._node_data.error_strategy diff --git a/api/events/event_handlers/sync_plugin_trigger_when_app_created.py b/api/events/event_handlers/sync_plugin_trigger_when_app_created.py index b24e3a8bae..68be37dfdb 100644 --- a/api/events/event_handlers/sync_plugin_trigger_when_app_created.py +++ b/api/events/event_handlers/sync_plugin_trigger_when_app_created.py @@ -3,7 +3,7 @@ import logging from events.app_event import app_draft_workflow_was_synced from models.model import App, AppMode from models.workflow import Workflow -from services.trigger.workflow_plugin_trigger_service import WorkflowPluginTriggerService +from services.trigger.trigger_service import TriggerService logger = logging.getLogger(__name__) @@ -19,4 +19,4 @@ def handle(sender, synced_draft_workflow: Workflow, **kwargs): # only handle workflow app, chatflow is not supported yet return - WorkflowPluginTriggerService.sync_plugin_trigger_relationships(app, synced_draft_workflow) + TriggerService.sync_plugin_trigger_relationships(app, synced_draft_workflow) diff --git a/api/services/trigger/schedule_service.py b/api/services/trigger/schedule_service.py index c153096505..333eeb2cc4 100644 --- a/api/services/trigger/schedule_service.py +++ b/api/services/trigger/schedule_service.py @@ -11,29 +11,12 @@ from core.workflow.nodes.trigger_schedule.entities import ScheduleConfig, Schedu from core.workflow.nodes.trigger_schedule.exc import ScheduleConfigError, ScheduleNotFoundError from libs.schedule_utils import calculate_next_run_at, convert_12h_to_24h from models.account import Account, TenantAccountJoin -from models.model import App from models.workflow import Workflow, WorkflowSchedulePlan -from services.trigger.trigger_debug_service import ScheduleDebugEvent, TriggerDebugService logger = logging.getLogger(__name__) class ScheduleService: - @classmethod - def poll_debug_event(cls, app_model: App, user_id: str, node_id: str) -> ScheduleDebugEvent | None: - """Poll a debug event for a schedule trigger.""" - pool_key = ScheduleDebugEvent.build_pool_key( - tenant_id=app_model.tenant_id, app_id=app_model.id, node_id=node_id - ) - return TriggerDebugService.poll( - event_type=ScheduleDebugEvent, - pool_key=pool_key, - tenant_id=app_model.tenant_id, - user_id=user_id, - app_id=app_model.id, - node_id=node_id, - ) - @staticmethod def create_schedule( session: Session, diff --git a/api/services/trigger/trigger_service.py b/api/services/trigger/trigger_service.py index c08bbeccf6..8dc5312e2f 100644 --- a/api/services/trigger/trigger_service.py +++ b/api/services/trigger/trigger_service.py @@ -4,21 +4,24 @@ import uuid from collections.abc import Mapping, Sequence from typing import Any -from flask import Request, Response, request +from flask import Request, Response +from pydantic import BaseModel from sqlalchemy import and_, func, select from sqlalchemy.orm import Session -from core.helper.trace_id_helper import get_external_trace_id from core.plugin.entities.plugin_daemon import CredentialType from core.plugin.entities.request import TriggerDispatchResponse, TriggerInvokeEventResponse from core.plugin.utils.http_parser import deserialize_request, serialize_request +from core.trigger.debug.events import PluginTriggerDebugEvent from core.trigger.entities.entities import EventEntity from core.trigger.provider import PluginTriggerProviderController from core.trigger.trigger_manager import TriggerManager from core.trigger.utils.encryption import create_trigger_provider_encrypter_for_subscription from core.workflow.enums import NodeType +from core.workflow.nodes.trigger_plugin.entities import PluginTriggerNodeData from core.workflow.nodes.trigger_schedule.exc import TenantOwnerNotFoundError from extensions.ext_database import db +from extensions.ext_redis import redis_client from extensions.ext_storage import storage from models.account import Account, TenantAccountJoin, TenantAccountRole from models.enums import WorkflowRunTriggeredFrom @@ -27,10 +30,8 @@ from models.provider_ids import TriggerProviderID from models.trigger import TriggerSubscription from models.workflow import AppTrigger, AppTriggerStatus, Workflow, WorkflowPluginTrigger from services.async_workflow_service import AsyncWorkflowService -from services.trigger.trigger_debug_service import PluginTriggerDebugEvent, TriggerDebugService from services.trigger.trigger_provider_service import TriggerProviderService from services.workflow.entities import PluginTriggerData, PluginTriggerDispatchData -from services.workflow_service import WorkflowService logger = logging.getLogger(__name__) @@ -39,6 +40,8 @@ class TriggerService: __TEMPORARY_ENDPOINT_EXPIRE_MS__ = 5 * 60 * 1000 __ENDPOINT_REQUEST_CACHE_COUNT__ = 10 __ENDPOINT_REQUEST_CACHE_EXPIRE_MS__ = 5 * 60 * 1000 + __PLUGIN_TRIGGER_NODE_CACHE_KEY__ = "plugin_trigger_nodes" + MAX_PLUGIN_TRIGGER_NODES_PER_WORKFLOW = 5 # Maximum allowed plugin trigger nodes per workflow @classmethod def invoke_trigger_event( @@ -51,9 +54,7 @@ class TriggerService: ) if not subscription: raise ValueError("Subscription not found") - node_data = node_config.get("data") - if not node_data: - raise ValueError("Node data not found") + node_data: PluginTriggerNodeData = PluginTriggerNodeData.model_validate(node_config.get("data", {})) request = deserialize_request(storage.load_once(f"triggers/{event.request_id}")) if not request: raise ValueError("Request not found") @@ -63,64 +64,13 @@ class TriggerService: user_id=user_id, provider_id=TriggerProviderID(event.provider_id), event_name=event.name, - parameters=node_data.get("parameters", {}), + parameters=node_data.parameters, credentials=subscription.credentials, credential_type=CredentialType.of(subscription.credential_type), subscription=subscription.to_entity(), request=request, ) - @classmethod - def build_workflow_args(cls, event: PluginTriggerDebugEvent) -> Mapping[str, Any]: - """Build workflow args from plugin trigger debug event.""" - workflow_args = { - "inputs": event.model_dump(), - "query": "", - "files": [], - } - external_trace_id = get_external_trace_id(request) - if external_trace_id: - workflow_args["external_trace_id"] = external_trace_id - - return workflow_args - - @classmethod - def poll_debug_event(cls, app_model: App, user_id: str, node_id: str) -> PluginTriggerDebugEvent | None: - """Poll webhook debug event for a given node ID.""" - workflow_service = WorkflowService() - workflow: Workflow | None = workflow_service.get_draft_workflow( - app_model=app_model, - workflow_id=None, - ) - - if not workflow: - raise ValueError("Workflow not found") - - node_data = workflow.get_node_config_by_id(node_id=node_id).get("data") - if not node_data: - raise ValueError("Node config not found") - - event_name = node_data.get("event_name") - subscription_id = node_data.get("subscription_id") - if not subscription_id: - raise ValueError("Subscription ID not found") - - provider_id = TriggerProviderID(node_data.get("provider_id")) - pool_key: str = PluginTriggerDebugEvent.build_pool_key( - name=event_name, - provider_id=provider_id, - tenant_id=app_model.tenant_id, - subscription_id=subscription_id, - ) - return TriggerDebugService.poll( - event_type=PluginTriggerDebugEvent, - pool_key=pool_key, - tenant_id=app_model.tenant_id, - user_id=user_id, - app_id=app_model.id, - node_id=node_id, - ) - @classmethod def _get_latest_workflows_by_app_ids( cls, session: Session, subscribers: Sequence[WorkflowPluginTrigger] @@ -208,12 +158,13 @@ class TriggerService: continue # invoke triger + node_data: PluginTriggerNodeData = PluginTriggerNodeData.model_validate(event_node.get("data", {})) invoke_response: TriggerInvokeEventResponse = TriggerManager.invoke_trigger_event( tenant_id=subscription.tenant_id, user_id=subscription.user_id, provider_id=TriggerProviderID(subscription.provider_id), event_name=event.identity.name, - parameters=event_node.get("config", {}).get("parameters", {}), + parameters=node_data.parameters, credentials=subscription.credentials, credential_type=CredentialType.of(subscription.credential_type), subscription=subscription.to_entity(), @@ -348,3 +299,191 @@ class TriggerService: ) ).all() return list(subscribers) + + @classmethod + def delete_plugin_trigger_by_subscription( + cls, + session: Session, + tenant_id: str, + subscription_id: str, + ) -> None: + """Delete a plugin trigger by tenant_id and subscription_id within an existing session + + Args: + session: Database session + tenant_id: The tenant ID + subscription_id: The subscription ID + + Raises: + NotFound: If plugin trigger not found + """ + # Find plugin trigger using indexed columns + plugin_trigger = session.scalar( + select(WorkflowPluginTrigger).where( + WorkflowPluginTrigger.tenant_id == tenant_id, + WorkflowPluginTrigger.subscription_id == subscription_id, + ) + ) + + if not plugin_trigger: + return + + session.delete(plugin_trigger) + + @classmethod + def sync_plugin_trigger_relationships(cls, app: App, workflow: Workflow): + """ + Sync plugin trigger relationships in DB. + + 1. Check if the workflow has any plugin trigger nodes + 2. Fetch the nodes from DB, see if there were any plugin trigger records already + 3. Diff the nodes and the plugin trigger records, create/update/delete the records as needed + + Approach: + Frequent DB operations may cause performance issues, using Redis to cache it instead. + If any record exists, cache it. + + Limits: + - Maximum 5 plugin trigger nodes per workflow + """ + + class Cache(BaseModel): + """ + Cache model for plugin trigger nodes + """ + + record_id: str + node_id: str + provider_id: str + event_name: str + subscription_id: str + + # Walk nodes to find plugin triggers + nodes_in_graph = [] + for node_id, node_config in workflow.walk_nodes(NodeType.TRIGGER_PLUGIN): + # Extract plugin trigger configuration from node + plugin_id = node_config.get("plugin_id", "") + provider_id = node_config.get("provider_id", "") + event_name = node_config.get("event_name", "") + subscription_id = node_config.get("subscription_id", "") + + if not subscription_id: + continue + + nodes_in_graph.append( + { + "node_id": node_id, + "plugin_id": plugin_id, + "provider_id": provider_id, + "event_name": event_name, + "subscription_id": subscription_id, + } + ) + + # Check plugin trigger node limit + if len(nodes_in_graph) > cls.MAX_PLUGIN_TRIGGER_NODES_PER_WORKFLOW: + raise ValueError( + f"Workflow exceeds maximum plugin trigger node limit. " + f"Found {len(nodes_in_graph)} plugin trigger nodes, " + f"maximum allowed is {cls.MAX_PLUGIN_TRIGGER_NODES_PER_WORKFLOW}" + ) + + not_found_in_cache: list[dict] = [] + for node_info in nodes_in_graph: + node_id = node_info["node_id"] + # firstly check if the node exists in cache + if not redis_client.get(f"{cls.__PLUGIN_TRIGGER_NODE_CACHE_KEY__}:{node_id}"): + not_found_in_cache.append(node_info) + continue + + with Session(db.engine) as session: + try: + # lock the concurrent plugin trigger creation + redis_client.lock(f"{cls.__PLUGIN_TRIGGER_NODE_CACHE_KEY__}:apps:{app.id}:lock", timeout=10) + # fetch the non-cached nodes from DB + all_records = session.scalars( + select(WorkflowPluginTrigger).where( + WorkflowPluginTrigger.app_id == app.id, + WorkflowPluginTrigger.tenant_id == app.tenant_id, + ) + ).all() + + nodes_id_in_db = {node.node_id: node for node in all_records} + nodes_id_in_graph = {node["node_id"] for node in nodes_in_graph} + + # get the nodes not found both in cache and DB + nodes_not_found = [ + node_info for node_info in not_found_in_cache if node_info["node_id"] not in nodes_id_in_db + ] + + # create new plugin trigger records + for node_info in nodes_not_found: + plugin_trigger = WorkflowPluginTrigger( + app_id=app.id, + tenant_id=app.tenant_id, + node_id=node_info["node_id"], + provider_id=node_info["provider_id"], + event_name=node_info["event_name"], + subscription_id=node_info["subscription_id"], + ) + session.add(plugin_trigger) + session.flush() # Get the ID for caching + + cache = Cache( + record_id=plugin_trigger.id, + node_id=node_info["node_id"], + provider_id=node_info["provider_id"], + event_name=node_info["event_name"], + subscription_id=node_info["subscription_id"], + ) + redis_client.set( + f"{cls.__PLUGIN_TRIGGER_NODE_CACHE_KEY__}:{node_info['node_id']}", + cache.model_dump_json(), + ex=60 * 60, + ) + session.commit() + + # Update existing records if subscription_id changed + for node_info in nodes_in_graph: + node_id = node_info["node_id"] + if node_id in nodes_id_in_db: + existing_record = nodes_id_in_db[node_id] + if ( + existing_record.subscription_id != node_info["subscription_id"] + or existing_record.provider_id != node_info["provider_id"] + or existing_record.event_name != node_info["event_name"] + ): + existing_record.subscription_id = node_info["subscription_id"] + existing_record.provider_id = node_info["provider_id"] + existing_record.event_name = node_info["event_name"] + session.add(existing_record) + + # Update cache + cache = Cache( + record_id=existing_record.id, + node_id=node_id, + provider_id=node_info["provider_id"], + event_name=node_info["event_name"], + subscription_id=node_info["subscription_id"], + ) + redis_client.set( + f"{cls.__PLUGIN_TRIGGER_NODE_CACHE_KEY__}:{node_id}", + cache.model_dump_json(), + ex=60 * 60, + ) + session.commit() + + # delete the nodes not found in the graph + for node_id in nodes_id_in_db: + if node_id not in nodes_id_in_graph: + session.delete(nodes_id_in_db[node_id]) + redis_client.delete(f"{cls.__PLUGIN_TRIGGER_NODE_CACHE_KEY__}:{node_id}") + session.commit() + except Exception: + import logging + + logger = logging.getLogger(__name__) + logger.exception("Failed to sync plugin trigger relationships for app %s", app.id) + raise + finally: + redis_client.delete(f"{cls.__PLUGIN_TRIGGER_NODE_CACHE_KEY__}:apps:{app.id}:lock") diff --git a/api/services/trigger/trigger_subscription_builder_service.py b/api/services/trigger/trigger_subscription_builder_service.py index 6e4a3208cf..861bd8e927 100644 --- a/api/services/trigger/trigger_subscription_builder_service.py +++ b/api/services/trigger/trigger_subscription_builder_service.py @@ -17,6 +17,7 @@ from core.trigger.entities.entities import ( Subscription, SubscriptionBuilder, SubscriptionBuilderUpdater, + SubscriptionConstructor, ) from core.trigger.provider import PluginTriggerProviderController from core.trigger.trigger_manager import TriggerManager @@ -183,7 +184,7 @@ class TriggerSubscriptionBuilderService: if not provider_controller: raise ValueError(f"Provider {provider_id} not found") - subscription_constructor = provider_controller.get_subscription_constructor() + subscription_constructor: SubscriptionConstructor | None = provider_controller.get_subscription_constructor() subscription_id = str(uuid.uuid4()) subscription_builder = SubscriptionBuilder( id=subscription_id, diff --git a/api/services/trigger/webhook_service.py b/api/services/trigger/webhook_service.py index 1ffc1c4d80..4038107899 100644 --- a/api/services/trigger/webhook_service.py +++ b/api/services/trigger/webhook_service.py @@ -13,7 +13,6 @@ from werkzeug.exceptions import RequestEntityTooLarge from configs import dify_config from core.file.models import FileTransferMethod -from core.helper.trace_id_helper import get_external_trace_id from core.tools.tool_file_manager import ToolFileManager from core.variables.types import SegmentType from core.workflow.enums import NodeType @@ -25,7 +24,6 @@ from models.enums import WorkflowRunTriggeredFrom from models.model import App from models.workflow import AppTrigger, AppTriggerStatus, AppTriggerType, Workflow, WorkflowWebhookTrigger from services.async_workflow_service import AsyncWorkflowService -from services.trigger.trigger_debug_service import TriggerDebugService, WebhookDebugEvent from services.workflow.entities import TriggerData logger = logging.getLogger(__name__) @@ -37,53 +35,6 @@ class WebhookService: __WEBHOOK_NODE_CACHE_KEY__ = "webhook_nodes" MAX_WEBHOOK_NODES_PER_WORKFLOW = 5 # Maximum allowed webhook nodes per workflow - @classmethod - def build_workflow_args(cls, event: WebhookDebugEvent) -> Mapping[str, Any]: - """Build workflow args from webhook debug event.""" - payload = event.payload or {} - workflow_inputs = payload.get("inputs") - if workflow_inputs is None: - webhook_data = payload.get("webhook_data", {}) - workflow_inputs = WebhookService.build_workflow_inputs(webhook_data) - - workflow_args = { - "inputs": workflow_inputs or {}, - "query": "", - "files": [], - } - - external_trace_id = get_external_trace_id(request) - if external_trace_id: - workflow_args["external_trace_id"] = external_trace_id - - return workflow_args - - @classmethod - def poll_debug_event(cls, app_model: App, user_id: str, node_id: str) -> WebhookDebugEvent | None: - """Poll webhook debug event for a given node ID. - - Args: - app_model: The app model - user_id: The user ID - node_id: The node ID to poll for - - Returns: - WebhookDebugEvent | None: The webhook debug event if available, None otherwise - """ - pool_key = WebhookDebugEvent.build_pool_key( - tenant_id=app_model.tenant_id, - app_id=app_model.id, - node_id=node_id, - ) - return TriggerDebugService.poll( - event_type=WebhookDebugEvent, - pool_key=pool_key, - tenant_id=app_model.tenant_id, - user_id=user_id, - app_id=app_model.id, - node_id=node_id, - ) - @classmethod def get_webhook_trigger_and_workflow( cls, webhook_id: str, is_debug: bool = False diff --git a/api/services/trigger/workflow_plugin_trigger_service.py b/api/services/trigger/workflow_plugin_trigger_service.py deleted file mode 100644 index e8bfa9c06f..0000000000 --- a/api/services/trigger/workflow_plugin_trigger_service.py +++ /dev/null @@ -1,204 +0,0 @@ -from pydantic import BaseModel -from sqlalchemy import select -from sqlalchemy.orm import Session - -from core.workflow.enums import NodeType -from extensions.ext_database import db -from extensions.ext_redis import redis_client -from models.model import App -from models.workflow import Workflow, WorkflowPluginTrigger - - -class WorkflowPluginTriggerService: - """Service for managing workflow plugin triggers""" - - __PLUGIN_TRIGGER_NODE_CACHE_KEY__ = "plugin_trigger_nodes" - MAX_PLUGIN_TRIGGER_NODES_PER_WORKFLOW = 5 # Maximum allowed plugin trigger nodes per workflow - - @classmethod - def delete_plugin_trigger_by_subscription( - cls, - session: Session, - tenant_id: str, - subscription_id: str, - ) -> None: - """Delete a plugin trigger by tenant_id and subscription_id within an existing session - - Args: - session: Database session - tenant_id: The tenant ID - subscription_id: The subscription ID - - Raises: - NotFound: If plugin trigger not found - """ - # Find plugin trigger using indexed columns - plugin_trigger = session.scalar( - select(WorkflowPluginTrigger).where( - WorkflowPluginTrigger.tenant_id == tenant_id, - WorkflowPluginTrigger.subscription_id == subscription_id, - ) - ) - - if not plugin_trigger: - return - - session.delete(plugin_trigger) - - @classmethod - def sync_plugin_trigger_relationships(cls, app: App, workflow: Workflow): - """ - Sync plugin trigger relationships in DB. - - 1. Check if the workflow has any plugin trigger nodes - 2. Fetch the nodes from DB, see if there were any plugin trigger records already - 3. Diff the nodes and the plugin trigger records, create/update/delete the records as needed - - Approach: - Frequent DB operations may cause performance issues, using Redis to cache it instead. - If any record exists, cache it. - - Limits: - - Maximum 5 plugin trigger nodes per workflow - """ - - class Cache(BaseModel): - """ - Cache model for plugin trigger nodes - """ - - record_id: str - node_id: str - provider_id: str - event_name: str - subscription_id: str - - # Walk nodes to find plugin triggers - nodes_in_graph = [] - for node_id, node_config in workflow.walk_nodes(NodeType.TRIGGER_PLUGIN): - # Extract plugin trigger configuration from node - plugin_id = node_config.get("plugin_id", "") - provider_id = node_config.get("provider_id", "") - event_name = node_config.get("event_name", "") - subscription_id = node_config.get("subscription_id", "") - - if not subscription_id: - continue - - nodes_in_graph.append( - { - "node_id": node_id, - "plugin_id": plugin_id, - "provider_id": provider_id, - "event_name": event_name, - "subscription_id": subscription_id, - } - ) - - # Check plugin trigger node limit - if len(nodes_in_graph) > cls.MAX_PLUGIN_TRIGGER_NODES_PER_WORKFLOW: - raise ValueError( - f"Workflow exceeds maximum plugin trigger node limit. " - f"Found {len(nodes_in_graph)} plugin trigger nodes, " - f"maximum allowed is {cls.MAX_PLUGIN_TRIGGER_NODES_PER_WORKFLOW}" - ) - - not_found_in_cache: list[dict] = [] - for node_info in nodes_in_graph: - node_id = node_info["node_id"] - # firstly check if the node exists in cache - if not redis_client.get(f"{cls.__PLUGIN_TRIGGER_NODE_CACHE_KEY__}:{node_id}"): - not_found_in_cache.append(node_info) - continue - - with Session(db.engine) as session: - try: - # lock the concurrent plugin trigger creation - redis_client.lock(f"{cls.__PLUGIN_TRIGGER_NODE_CACHE_KEY__}:apps:{app.id}:lock", timeout=10) - # fetch the non-cached nodes from DB - all_records = session.scalars( - select(WorkflowPluginTrigger).where( - WorkflowPluginTrigger.app_id == app.id, - WorkflowPluginTrigger.tenant_id == app.tenant_id, - ) - ).all() - - nodes_id_in_db = {node.node_id: node for node in all_records} - nodes_id_in_graph = {node["node_id"] for node in nodes_in_graph} - - # get the nodes not found both in cache and DB - nodes_not_found = [ - node_info for node_info in not_found_in_cache if node_info["node_id"] not in nodes_id_in_db - ] - - # create new plugin trigger records - for node_info in nodes_not_found: - plugin_trigger = WorkflowPluginTrigger( - app_id=app.id, - tenant_id=app.tenant_id, - node_id=node_info["node_id"], - provider_id=node_info["provider_id"], - event_name=node_info["event_name"], - subscription_id=node_info["subscription_id"], - ) - session.add(plugin_trigger) - session.flush() # Get the ID for caching - - cache = Cache( - record_id=plugin_trigger.id, - node_id=node_info["node_id"], - provider_id=node_info["provider_id"], - event_name=node_info["event_name"], - subscription_id=node_info["subscription_id"], - ) - redis_client.set( - f"{cls.__PLUGIN_TRIGGER_NODE_CACHE_KEY__}:{node_info['node_id']}", - cache.model_dump_json(), - ex=60 * 60, - ) - session.commit() - - # Update existing records if subscription_id changed - for node_info in nodes_in_graph: - node_id = node_info["node_id"] - if node_id in nodes_id_in_db: - existing_record = nodes_id_in_db[node_id] - if ( - existing_record.subscription_id != node_info["subscription_id"] - or existing_record.provider_id != node_info["provider_id"] - or existing_record.event_name != node_info["event_name"] - ): - existing_record.subscription_id = node_info["subscription_id"] - existing_record.provider_id = node_info["provider_id"] - existing_record.event_name = node_info["event_name"] - session.add(existing_record) - - # Update cache - cache = Cache( - record_id=existing_record.id, - node_id=node_id, - provider_id=node_info["provider_id"], - event_name=node_info["event_name"], - subscription_id=node_info["subscription_id"], - ) - redis_client.set( - f"{cls.__PLUGIN_TRIGGER_NODE_CACHE_KEY__}:{node_id}", - cache.model_dump_json(), - ex=60 * 60, - ) - session.commit() - - # delete the nodes not found in the graph - for node_id in nodes_id_in_db: - if node_id not in nodes_id_in_graph: - session.delete(nodes_id_in_db[node_id]) - redis_client.delete(f"{cls.__PLUGIN_TRIGGER_NODE_CACHE_KEY__}:{node_id}") - session.commit() - except Exception: - import logging - - logger = logging.getLogger(__name__) - logger.exception("Failed to sync plugin trigger relationships for app %s", app.id) - raise - finally: - redis_client.delete(f"{cls.__PLUGIN_TRIGGER_NODE_CACHE_KEY__}:apps:{app.id}:lock") diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index a5e3581215..7b30f36cbe 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -23,7 +23,7 @@ from core.workflow.nodes import NodeType from core.workflow.nodes.base.node import Node from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING from core.workflow.nodes.start.entities import StartNodeData -from core.workflow.nodes.trigger_plugin.entities import PluginTriggerData +from core.workflow.nodes.trigger_plugin.entities import PluginTriggerNodeData from core.workflow.nodes.trigger_schedule.entities import TriggerScheduleNodeData from core.workflow.nodes.trigger_webhook.entities import WebhookData from core.workflow.system_variable import SystemVariable @@ -636,7 +636,7 @@ class WorkflowService: if node_type == NodeType.TRIGGER_WEBHOOK: start_data = WebhookData.model_validate(node_data) elif node_type == NodeType.TRIGGER_PLUGIN: - start_data = PluginTriggerData.model_validate(node_data) + start_data = PluginTriggerNodeData.model_validate(node_data) elif node_type == NodeType.TRIGGER_SCHEDULE: start_data = TriggerScheduleNodeData.model_validate(node_data) else: diff --git a/api/tasks/trigger_processing_tasks.py b/api/tasks/trigger_processing_tasks.py index 1b21e6ad4f..53f6a8769c 100644 --- a/api/tasks/trigger_processing_tasks.py +++ b/api/tasks/trigger_processing_tasks.py @@ -10,6 +10,8 @@ import logging from celery import shared_task from sqlalchemy.orm import Session +from core.trigger.debug.event_bus import TriggerDebugEventBus +from core.trigger.debug.events import PluginTriggerDebugEvent from core.trigger.provider import PluginTriggerProviderController from core.trigger.trigger_manager import TriggerManager from core.trigger.utils.encryption import ( @@ -20,7 +22,6 @@ from extensions.ext_database import db from extensions.ext_storage import storage from models.provider_ids import TriggerProviderID from models.trigger import TriggerSubscription -from services.trigger.trigger_debug_service import PluginTriggerDebugEvent, TriggerDebugService from services.trigger.trigger_service import TriggerService from services.workflow.entities import PluginTriggerDispatchData @@ -152,7 +153,7 @@ def dispatch_triggered_workflows_async( timestamp=timestamp, name=event_name, ) - debug_dispatched += TriggerDebugService.dispatch( + debug_dispatched += TriggerDebugEventBus.dispatch( tenant_id=subscription.tenant_id, event=event, pool_key=pool_key, diff --git a/api/tasks/workflow_schedule_tasks.py b/api/tasks/workflow_schedule_tasks.py index 1c06649eae..95095a69bd 100644 --- a/api/tasks/workflow_schedule_tasks.py +++ b/api/tasks/workflow_schedule_tasks.py @@ -6,6 +6,8 @@ from zoneinfo import ZoneInfo from celery import shared_task from sqlalchemy.orm import sessionmaker +from core.trigger.debug.event_bus import TriggerDebugEventBus +from core.trigger.debug.events import ScheduleDebugEvent from core.workflow.nodes.trigger_schedule.exc import ( ScheduleExecutionError, ScheduleNotFoundError, @@ -16,7 +18,6 @@ from models.enums import WorkflowRunTriggeredFrom from models.workflow import WorkflowSchedulePlan from services.async_workflow_service import AsyncWorkflowService from services.trigger.schedule_service import ScheduleService -from services.trigger.trigger_debug_service import ScheduleDebugEvent, TriggerDebugService from services.workflow.entities import TriggerData logger = logging.getLogger(__name__) @@ -78,7 +79,7 @@ def run_schedule_trigger(schedule_id: str) -> None: app_id=schedule.app_id, node_id=schedule.node_id, ) - dispatched_count = TriggerDebugService.dispatch( + dispatched_count = TriggerDebugEventBus.dispatch( tenant_id=schedule.tenant_id, event=event, pool_key=pool_key, diff --git a/web/app/components/workflow-app/hooks/use-workflow-run.ts b/web/app/components/workflow-app/hooks/use-workflow-run.ts index b3602f60bf..f0ff480d7b 100644 --- a/web/app/components/workflow-app/hooks/use-workflow-run.ts +++ b/web/app/components/workflow-app/hooks/use-workflow-run.ts @@ -170,21 +170,7 @@ export const useWorkflowRun = () => { const isInWorkflowDebug = appDetail?.mode === 'workflow' let url = '' - if (runMode === 'schedule') { - if (!appDetail?.id) { - console.error('handleRun: missing app id for schedule trigger run') - return - } - url = `/apps/${appDetail.id}/workflows/draft/trigger/schedule/run` - } - else if (runMode === 'webhook') { - if (!appDetail?.id) { - console.error('handleRun: missing app id for webhook trigger run') - return - } - url = `/apps/${appDetail.id}/workflows/draft/trigger/webhook/run` - } - else if (runMode === 'plugin') { + if (runMode === 'plugin' || runMode === 'webhook' || runMode === 'schedule') { if (!appDetail?.id) { console.error('handleRun: missing app id for trigger plugin run') return diff --git a/web/app/components/workflow/nodes/_base/hooks/use-one-step-run.ts b/web/app/components/workflow/nodes/_base/hooks/use-one-step-run.ts index dd1b39f6d8..f87b2ed42d 100644 --- a/web/app/components/workflow/nodes/_base/hooks/use-one-step-run.ts +++ b/web/app/components/workflow/nodes/_base/hooks/use-one-step-run.ts @@ -264,7 +264,7 @@ const useOneStepRun = ({ }, []) const runWebhookSingleRun = useCallback(async (): Promise => { - const urlPath = `/apps/${flowId}/workflows/draft/nodes/${id}/debug/webhook/run` + const urlPath = `/apps/${flowId}/workflows/draft/nodes/${id}/trigger/run` const urlWithPrefix = `${API_PREFIX}${urlPath.startsWith('/') ? urlPath : `/${urlPath}`}` webhookSingleRunActiveRef.current = true @@ -364,7 +364,7 @@ const useOneStepRun = ({ }, [flowId, id, data, handleNodeDataUpdate, cancelWebhookSingleRun]) const runPluginSingleRun = useCallback(async (): Promise => { - const urlPath = `/apps/${flowId}/workflows/draft/nodes/${id}/trigger` + const urlPath = `/apps/${flowId}/workflows/draft/nodes/${id}/trigger/run` const urlWithPrefix = `${API_PREFIX}${urlPath.startsWith('/') ? urlPath : `/${urlPath}`}` webhookSingleRunActiveRef.current = true