diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index e36f308bd4..cdb0d1b5c0 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -806,6 +806,72 @@ class DraftWorkflowNodeLastRunApi(Resource): return node_exec +class DraftWorkflowTriggerNodeApi(Resource): + @setup_required + @login_required + @account_initialization_required + @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) + def post(self, app_model: App, node_id: str): + """ + Debug trigger node by creating a debug session and listening for events. + """ + srv = WorkflowService() + workflow = srv.get_draft_workflow(app_model) + if not workflow: + raise NotFound("Workflow not found") + + # Get node configuration + node_config = workflow.get_node_config_by_id(node_id) + if not node_config: + raise NotFound(f"Node {node_id} not found in workflow") + + # Validate it's a trigger plugin node + if node_config.get("data", {}).get("type") != "plugin": + raise ValueError("Node is not a trigger plugin node") + + # Get subscription ID from node config + subscription_id = node_config.get("data", {}).get("subscription_id") + if not subscription_id: + raise ValueError("No subscription ID configured for this trigger node") + + # Create debug session + from services.trigger_debug_service import TriggerDebugService + + assert isinstance(current_user, Account) + session_id = TriggerDebugService.create_debug_session( + app_id=str(app_model.id), + node_id=node_id, + subscription_id=subscription_id, + user_id=current_user.id, + timeout=300, + ) + + # Stream events to client + def generate(): + for event_data in TriggerDebugService.listen_for_events(session_id): + if isinstance(event_data, dict): + if event_data.get("type") == "heartbeat": + yield f"event: heartbeat\ndata: {json.dumps(event_data)}\n\n" + elif event_data.get("type") == "timeout": + yield f"event: timeout\ndata: {json.dumps({'message': 'Session timed out'})}\n\n" + break + elif event_data.get("type") == "error": + yield f"event: error\ndata: {json.dumps(event_data)}\n\n" + break + else: + # Trigger event - prepare for workflow execution + yield f"event: trigger\ndata: {json.dumps(event_data)}\n\n" + + # TODO: Execute workflow with trigger data if needed + # This would involve extracting trigger data and running the workflow + # For now, just send the trigger event + break + + from flask import Response + + return Response(generate(), mimetype="text/event-stream") + + api.add_resource( DraftWorkflowApi, "/apps//workflows/draft", @@ -830,6 +896,10 @@ api.add_resource( DraftWorkflowNodeRunApi, "/apps//workflows/draft/nodes//run", ) +api.add_resource( + DraftWorkflowTriggerNodeApi, + "/apps//workflows/draft/nodes//trigger", +) api.add_resource( AdvancedChatDraftRunIterationNodeApi, "/apps//advanced-chat/workflows/draft/iteration/nodes//run", diff --git a/api/core/plugin/entities/request.py b/api/core/plugin/entities/request.py index 4c271912f1..d2bd4001c0 100644 --- a/api/core/plugin/entities/request.py +++ b/api/core/plugin/entities/request.py @@ -1,3 +1,4 @@ +from collections.abc import Mapping from typing import Any, Literal, Optional from flask import Response @@ -239,9 +240,11 @@ class RequestFetchAppInfo(BaseModel): app_id: str +class Event(BaseModel): + variables: Mapping[str, Any] class TriggerInvokeResponse(BaseModel): - event: dict[str, Any] + event: Event class PluginTriggerDispatchResponse(BaseModel): diff --git a/api/core/trigger/entities/entities.py b/api/core/trigger/entities/entities.py index 49b4945ea1..93f6daf8b7 100644 --- a/api/core/trigger/entities/entities.py +++ b/api/core/trigger/entities/entities.py @@ -3,7 +3,7 @@ from datetime import datetime from enum import StrEnum from typing import Any, Optional, Union -from pydantic import BaseModel, Field +from pydantic import BaseModel, ConfigDict, Field from core.entities.provider_entities import ProviderConfig from core.plugin.entities.parameters import PluginParameterAutoGenerate, PluginParameterOption, PluginParameterTemplate @@ -251,12 +251,24 @@ class SubscriptionBuilderUpdater(BaseModel): subscription_builder.expires_at = self.expires_at +class TriggerDebugEventData(BaseModel): + """Debug event data dispatched to debug sessions.""" + + subscription_id: str + triggers: list[str] + request_id: str + timestamp: float + + model_config = ConfigDict(arbitrary_types_allowed=True) + + # Export all entities __all__ = [ "OAuthSchema", "RequestLog", "Subscription", "SubscriptionBuilder", + "TriggerDebugEventData", "TriggerDescription", "TriggerEntity", "TriggerIdentity", diff --git a/api/core/workflow/nodes/trigger_plugin/trigger_plugin_node.py b/api/core/workflow/nodes/trigger_plugin/trigger_plugin_node.py index a003cce903..bc5211c6e3 100644 --- a/api/core/workflow/nodes/trigger_plugin/trigger_plugin_node.py +++ b/api/core/workflow/nodes/trigger_plugin/trigger_plugin_node.py @@ -1,11 +1,17 @@ from collections.abc import Mapping from typing import Any, Optional +from core.plugin.entities.plugin import TriggerProviderID +from core.plugin.utils.http_parser import deserialize_request +from core.trigger.entities.api_entities import TriggerProviderSubscriptionApiEntity +from core.trigger.trigger_manager import TriggerManager from core.workflow.entities.node_entities import NodeRunResult from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus from core.workflow.nodes.base import BaseNode from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig from core.workflow.nodes.enums import ErrorStrategy, NodeType +from extensions.ext_storage import storage +from services.trigger.trigger_provider_service import TriggerProviderService from .entities import PluginTriggerData @@ -56,10 +62,50 @@ class TriggerPluginNode(BaseNode): def _run(self) -> NodeRunResult: """ Run the plugin trigger node. - """ - node_data = self._node_data - return NodeRunResult( - status=WorkflowNodeExecutionStatus.SUCCEEDED, - outputs={}, - ) + This node invokes the trigger to convert request data into events + and makes them available to downstream nodes. + """ + + + # Get trigger data passed when workflow was triggered + trigger_inputs = dict(self.graph_runtime_state.variable_pool.user_inputs) + + request_id = trigger_inputs.get("request_id") + trigger_name = trigger_inputs.get("trigger_name", "") + subscription_id = trigger_inputs.get("subscription_id", "") + + if not request_id or not subscription_id: + return NodeRunResult( + status=WorkflowNodeExecutionStatus.FAILED, + inputs=trigger_inputs, + outputs={"error": "No request ID or subscription ID available"}, + ) + + try: + subscription: TriggerProviderSubscriptionApiEntity | None = TriggerProviderService.get_subscription_by_id( + tenant_id=self.tenant_id, subscription_id=subscription_id + ) + if not subscription: + raise ValueError(f"Subscription {subscription_id} not found") + + request = deserialize_request(storage.load_once(f"triggers/{request_id}")) + parameters = self._node_data.parameters if hasattr(self, "_node_data") and self._node_data else {} + invoke_response = TriggerManager.invoke_trigger( + tenant_id=self.tenant_id, + user_id=self.user_id, + provider_id=TriggerProviderID(subscription.provider), + trigger_name=trigger_name, + parameters=parameters, + credentials=subscription.credentials, + credential_type=subscription.credential_type, + request=request, + ) + outputs = invoke_response.event.variables or {} + return NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED, inputs=trigger_inputs, outputs=outputs) + except Exception as e: + return NodeRunResult( + status=WorkflowNodeExecutionStatus.FAILED, + inputs=trigger_inputs, + outputs={"error": f"Failed to invoke trigger: {str(e)}", "request_id": request_id}, + ) diff --git a/api/services/trigger_debug_service.py b/api/services/trigger_debug_service.py new file mode 100644 index 0000000000..da4d566620 --- /dev/null +++ b/api/services/trigger_debug_service.py @@ -0,0 +1,200 @@ +""" +Trigger debug service for webhook debugging in draft workflows. + +This service provides debugging capabilities for trigger nodes by using +Redis Pub/Sub to enable real-time event forwarding across distributed instances. +""" + +import json +import logging +import time +import uuid +from collections.abc import Generator + +from core.trigger.entities.entities import TriggerDebugEventData +from extensions.ext_redis import redis_client + +logger = logging.getLogger(__name__) + + +class TriggerDebugService: + """ + Trigger debug service - supports distributed environments. + Cleans up resources on disconnect, no reconnection handling. + """ + + SESSION_PREFIX = "trigger_debug_session:" + SUBSCRIPTION_DEBUG_PREFIX = "trigger_debug_subscription:" + PUBSUB_CHANNEL_PREFIX = "trigger_debug_channel:" + + @classmethod + def create_debug_session( + cls, app_id: str, node_id: str, subscription_id: str, user_id: str, timeout: int = 300 + ) -> str: + """ + Create a debug session. + + Args: + app_id: Application ID + node_id: Node ID being debugged + subscription_id: Subscription ID to monitor + user_id: User ID creating the session + timeout: Session timeout in seconds + + Returns: + Session ID + """ + session_id = str(uuid.uuid4()) + + session_data = { + "session_id": session_id, + "app_id": app_id, + "node_id": node_id, + "subscription_id": subscription_id, + "user_id": user_id, + "created_at": time.time(), + } + + # 1. Save session info + redis_client.setex(f"{cls.SESSION_PREFIX}{session_id}", timeout, json.dumps(session_data)) + + # 2. Register to subscription's debug session set + redis_client.sadd(f"{cls.SUBSCRIPTION_DEBUG_PREFIX}{subscription_id}", session_id) + redis_client.expire(f"{cls.SUBSCRIPTION_DEBUG_PREFIX}{subscription_id}", timeout) + + logger.info("Created debug session %s for subscription %s", session_id, subscription_id) + return session_id + + @classmethod + def listen_for_events(cls, session_id: str, timeout: int = 300) -> Generator: + """ + Listen for events using Redis Pub/Sub. + + Args: + session_id: Debug session ID + timeout: Timeout in seconds + + Yields: + Event data or heartbeat messages + """ + pubsub = redis_client.pubsub() + channel = f"{cls.PUBSUB_CHANNEL_PREFIX}{session_id}" + + try: + # Subscribe to channel + pubsub.subscribe(channel) + logger.info("Listening on channel: %s", channel) + + start_time = time.time() + last_heartbeat = time.time() + + # Real-time listening + while time.time() - start_time < timeout: + # Non-blocking message retrieval with 1 second timeout + message = pubsub.get_message(timeout=1.0) + + if message and message["type"] == "message": + # Received trigger event + event_data = json.loads(message["data"]) + logger.info("Received trigger event for session %s", session_id) + yield event_data + break # End listening after receiving event + + # Send periodic heartbeat + if time.time() - last_heartbeat > 5: + yield {"type": "heartbeat", "remaining": int(timeout - (time.time() - start_time))} + last_heartbeat = time.time() + + # Timeout + if time.time() - start_time >= timeout: + yield {"type": "timeout"} + + except Exception as e: + logger.exception("Error in listen_for_events", exc_info=e) + yield {"type": "error", "message": str(e)} + + finally: + # Clean up resources + cls.close_session(session_id) + pubsub.unsubscribe(channel) + pubsub.close() + logger.info("Closed listening for session %s", session_id) + + @classmethod + def close_session(cls, session_id: str): + """ + Close and clean up debug session. + + Args: + session_id: Session ID to close + """ + try: + # Get session info + session_data = redis_client.get(f"{cls.SESSION_PREFIX}{session_id}") + if session_data: + session = json.loads(session_data) + subscription_id = session.get("subscription_id") + + # Remove from subscription set + if subscription_id: + redis_client.srem(f"{cls.SUBSCRIPTION_DEBUG_PREFIX}{subscription_id}", session_id) + logger.info("Removed session %s from subscription %s", session_id, subscription_id) + + # Delete session info + redis_client.delete(f"{cls.SESSION_PREFIX}{session_id}") + logger.info("Cleaned up session %s", session_id) + + except Exception as e: + logger.exception("Error closing session %s", session_id, exc_info=e) + + @classmethod + def dispatch_to_debug_sessions(cls, subscription_id: str, event_data: TriggerDebugEventData) -> int: + """ + Dispatch events to debug sessions using Pub/Sub only. + + Args: + subscription_id: Subscription ID + event_data: Event data to dispatch + + Returns: + Number of active debug sessions + """ + try: + # Get all listening debug sessions + debug_sessions = redis_client.smembers(f"{cls.SUBSCRIPTION_DEBUG_PREFIX}{subscription_id}") + + if not debug_sessions: + return 0 + + active_sessions = 0 + for session_id_bytes in debug_sessions: + if isinstance(session_id_bytes, bytes): + session_id = session_id_bytes.decode("utf-8") + else: + session_id = session_id_bytes + + # Verify session is valid + if not redis_client.exists(f"{cls.SESSION_PREFIX}{session_id}"): + # Clean up invalid session + redis_client.srem(f"{cls.SUBSCRIPTION_DEBUG_PREFIX}{subscription_id}", session_id) + continue + + # Publish event via Pub/Sub + channel = f"{cls.PUBSUB_CHANNEL_PREFIX}{session_id}" + subscriber_count = redis_client.publish(channel, json.dumps(event_data)) + + if subscriber_count > 0: + active_sessions += 1 + logger.info("Published event to %d subscribers on channel %s", subscriber_count, channel) + else: + # No subscribers, clean up session + logger.info("No subscribers for session %s, cleaning up", session_id) + cls.close_session(session_id) + + if active_sessions > 0: + logger.info("Dispatched event to %d active debug sessions", active_sessions) + + return active_sessions + except Exception as e: + logger.exception("Failed to dispatch to debug sessions", exc_info=e) + return 0 \ No newline at end of file diff --git a/api/services/trigger_service.py b/api/services/trigger_service.py index 2876ee53cd..3e6e6b9248 100644 --- a/api/services/trigger_service.py +++ b/api/services/trigger_service.py @@ -1,5 +1,5 @@ -import json import logging +import time import uuid from flask import Request, Response @@ -8,10 +8,9 @@ from sqlalchemy.orm import Session from core.plugin.entities.plugin import TriggerProviderID from core.plugin.utils.http_parser import serialize_request -from core.trigger.entities.entities import TriggerEntity +from core.trigger.entities.entities import TriggerDebugEventData, TriggerEntity from core.trigger.trigger_manager import TriggerManager from extensions.ext_database import db -from extensions.ext_redis import redis_client from extensions.ext_storage import storage from models.account import Account, TenantAccountJoin, TenantAccountRole from models.enums import WorkflowRunTriggeredFrom @@ -19,6 +18,7 @@ from models.trigger import TriggerSubscription from models.workflow import Workflow, WorkflowPluginTrigger from services.async_workflow_service import AsyncWorkflowService from services.trigger.trigger_provider_service import TriggerProviderService +from services.trigger_debug_service import TriggerDebugService from services.workflow.entities import PluginTriggerData logger = logging.getLogger(__name__) @@ -30,19 +30,27 @@ class TriggerService: __ENDPOINT_REQUEST_CACHE_EXPIRE_MS__ = 5 * 60 * 1000 @classmethod - def process_triggered_workflows( - cls, subscription: TriggerSubscription, trigger: TriggerEntity, request: Request - ) -> None: - """Process triggered workflows.""" + def dispatch_triggered_workflows( + cls, subscription: TriggerSubscription, trigger: TriggerEntity, request_id: str + ) -> int: + """Process triggered workflows. - subscribers = cls._get_subscriber_triggers(subscription=subscription, trigger=trigger) + Args: + subscription: The trigger subscription + trigger: The trigger entity that was activated + request_id: The ID of the stored request in storage system + """ + + subscribers = cls.get_subscriber_triggers( + tenant_id=subscription.tenant_id, subscription_id=subscription.id, trigger_name=trigger.identity.name + ) if not subscribers: logger.warning( "No workflows found for trigger '%s' in subscription '%s'", trigger.identity.name, subscription.id, ) - return + return 0 with Session(db.engine) as session: # Get tenant owner for workflow execution @@ -57,10 +65,10 @@ class TriggerService: if not tenant_owner: logger.error("Tenant owner not found for tenant %s", subscription.tenant_id) - return - + return 0 + dispatched_count = 0 for plugin_trigger in subscribers: - # 2. Get workflow + # Get workflow workflow = session.scalar( select(Workflow) .where( @@ -77,14 +85,7 @@ class TriggerService: ) continue - # Get trigger parameters from node configuration - node_config = workflow.get_node_config_by_id(plugin_trigger.node_id) - parameters = node_config.get("data", {}).get("parameters", {}) if node_config else {} - - # 3. Store trigger data - storage_key = cls._store_trigger_data(request, subscription, trigger, parameters) - - # 4. Create trigger data for async execution + # Create trigger data for async execution trigger_data = PluginTriggerData( app_id=plugin_trigger.app_id, tenant_id=subscription.tenant_id, @@ -92,13 +93,18 @@ class TriggerService: root_node_id=plugin_trigger.node_id, trigger_type=WorkflowRunTriggeredFrom.PLUGIN, plugin_id=subscription.provider_id, - webhook_url=f"trigger/endpoint/{subscription.endpoint_id}", # For tracking - inputs={"storage_key": storage_key}, # Pass storage key to async task + endpoint_id=subscription.endpoint_id, + inputs={ + "request_id": request_id, + "trigger_name": trigger.identity.name, + "subscription_id": subscription.id, + }, ) - # 5. Trigger async workflow + # Trigger async workflow try: AsyncWorkflowService.trigger_workflow_async(session, tenant_owner, trigger_data) + dispatched_count += 1 logger.info( "Triggered workflow for app %s with trigger %s", plugin_trigger.app_id, @@ -110,21 +116,37 @@ class TriggerService: plugin_trigger.app_id, ) + return dispatched_count + @classmethod - def select_triggers(cls, controller, dispatch_response, provider_id, subscription) -> list[TriggerEntity]: - triggers = [] - for trigger_name in dispatch_response.triggers: - trigger = controller.get_trigger(trigger_name) - if trigger is None: - logger.error( - "Trigger '%s' not found in provider '%s' for tenant '%s'", - trigger_name, - provider_id, - subscription.tenant_id, - ) - raise ValueError(f"Trigger '{trigger_name}' not found") - triggers.append(trigger) - return triggers + def dispatch_debugging_sessions( + cls, subscription_id: str, request: Request, triggers: list[str], request_id: str + ) -> int: + """ + Dispatch to debug sessions - simplified version. + + Args: + subscription_id: Subscription ID + request: Original request + triggers: List of trigger names + request_id: Request ID for storage reference + """ + try: + # Prepare streamlined event data using Pydantic model + debug_data = TriggerDebugEventData( + subscription_id=subscription_id, + triggers=triggers, + request_id=request_id, + timestamp=time.time(), + ) + return TriggerDebugService.dispatch_to_debug_sessions( + subscription_id=subscription_id, event_data=debug_data + ) + + except Exception as e: + # Silent failure, don't affect production + logger.exception("Debug dispatch failed", exc_info=e) + return 0 @classmethod def process_endpoint(cls, endpoint_id: str, request: Request) -> Response | None: @@ -167,53 +189,16 @@ class TriggerService: return dispatch_response.response @classmethod - def _get_subscriber_triggers( - cls, subscription: TriggerSubscription, trigger: TriggerEntity + def get_subscriber_triggers( + cls, tenant_id: str, subscription_id: str, trigger_name: str ) -> list[WorkflowPluginTrigger]: """Get WorkflowPluginTriggers for a subscription and trigger.""" with Session(db.engine, expire_on_commit=False) as session: subscribers = session.scalars( select(WorkflowPluginTrigger).where( - WorkflowPluginTrigger.tenant_id == subscription.tenant_id, - WorkflowPluginTrigger.subscription_id == subscription.id, - WorkflowPluginTrigger.trigger_name == trigger.identity.name, + WorkflowPluginTrigger.tenant_id == tenant_id, + WorkflowPluginTrigger.subscription_id == subscription_id, + WorkflowPluginTrigger.trigger_name == trigger_name, ) ).all() return list(subscribers) - - @classmethod - def _store_trigger_data( - cls, - request: Request, - subscription: TriggerSubscription, - trigger: TriggerEntity, - parameters: dict, - ) -> str: - """Store trigger data in storage and return key.""" - storage_key = f"trigger_data_{uuid.uuid4().hex}" - - # Prepare data to store - trigger_data = { - "request": { - "method": request.method, - "headers": dict(request.headers), - "query_params": dict(request.args), - "body": request.get_data(as_text=True), - }, - "subscription": { - "id": subscription.id, - "provider_id": subscription.provider_id, - "credentials": subscription.credentials, - "credential_type": subscription.credential_type, - }, - "trigger": { - "name": trigger.identity.name, - "parameters": parameters, - }, - "user_id": subscription.user_id, - } - - # Store with 1 hour TTL using Redis - redis_client.setex(storage_key, 3600, json.dumps(trigger_data)) - - return storage_key diff --git a/api/services/workflow/entities.py b/api/services/workflow/entities.py index 6c6da42bb9..2a199642b2 100644 --- a/api/services/workflow/entities.py +++ b/api/services/workflow/entities.py @@ -55,7 +55,7 @@ class PluginTriggerData(TriggerData): trigger_type: WorkflowRunTriggeredFrom = WorkflowRunTriggeredFrom.PLUGIN plugin_id: str - webhook_url: str + endpoint_id: str class WorkflowTaskData(BaseModel): diff --git a/api/tasks/trigger_processing_tasks.py b/api/tasks/trigger_processing_tasks.py index 4b5add56e0..d248f9eb3a 100644 --- a/api/tasks/trigger_processing_tasks.py +++ b/api/tasks/trigger_processing_tasks.py @@ -88,12 +88,11 @@ def dispatch_triggered_workflows_async( ) continue - TriggerService.process_triggered_workflows( + dispatched_count += TriggerService.dispatch_triggered_workflows( subscription=subscription, trigger=trigger, - request=request, + request_id=request_id, ) - dispatched_count += 1 except Exception: logger.exception( @@ -104,6 +103,18 @@ def dispatch_triggered_workflows_async( # Continue processing other triggers even if one fails continue + # Dispatch to debug sessions after processing all triggers + try: + debug_dispatched = TriggerService.dispatch_debugging_sessions( + subscription_id=subscription_id, + request=request, + triggers=triggers, + request_id=request_id, + ) + except Exception: + # Silent failure for debug dispatch + logger.exception("Failed to dispatch to debug sessions") + logger.info( "Completed async trigger dispatching: processed %d/%d triggers", dispatched_count, @@ -117,8 +128,9 @@ def dispatch_triggered_workflows_async( return { "status": "completed", - "dispatched_count": dispatched_count, "total_count": len(triggers), + "dispatched_count": dispatched_count, + "debug_dispatched_count": debug_dispatched, } except Exception as e: