diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index ab087bdf44..f20f074c6c 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -22,9 +22,9 @@ from controllers.web.error import InvokeRateLimitError as InvokeRateLimitHttpErr from core.app.app_config.features.file_upload.manager import FileUploadConfigManager from core.app.apps.base_app_queue_manager import AppQueueManager from core.app.entities.app_invoke_entities import InvokeFrom -from core.app.entities.task_entities import ErrorStreamResponse, TriggerNodeFinishedResponse, TriggerTriggeredResponse from core.file.models import File from core.helper.trace_id_helper import get_external_trace_id +from core.model_runtime.utils.encoders import jsonable_encoder from extensions.ext_database import db from factories import file_factory, variable_factory from fields.workflow_fields import workflow_fields, workflow_pagination_fields @@ -39,6 +39,7 @@ from models.workflow import Workflow from services.app_generate_service import AppGenerateService from services.errors.app import WorkflowHashNotEqualError from services.errors.llm import InvokeRateLimitError +from services.trigger_debug_service import TriggerDebugService from services.workflow_service import DraftWorkflowDeletionError, WorkflowInUseError, WorkflowService logger = logging.getLogger(__name__) @@ -809,7 +810,7 @@ class DraftWorkflowNodeLastRunApi(Resource): class DraftWorkflowTriggerNodeApi(Resource): """ - Single node debug - Listen for trigger events and execute single Trigger node + Single node debug - Polling API for trigger events Path: /apps//workflows/draft/nodes//trigger """ @@ -819,88 +820,57 @@ class DraftWorkflowTriggerNodeApi(Resource): @get_app_model(mode=[AppMode.WORKFLOW]) def post(self, app_model: App, node_id: str): """ - Debug trigger node by listening for events and executing the node + Poll for trigger events and execute single node when event arrives """ if not isinstance(current_user, Account) or not current_user.is_editor: raise Forbidden() parser = reqparse.RequestParser() - parser.add_argument("timeout", type=int, default=300, location="json") + parser.add_argument("trigger_name", type=str, required=True, location="json") + parser.add_argument("subscription_id", type=str, required=True, location="json") args = parser.parse_args() + trigger_name = args["trigger_name"] + subscription_id = args["subscription_id"] + event = TriggerDebugService.poll_event( + tenant_id=app_model.tenant_id, + user_id=current_user.id, + app_id=app_model.id, + subscription_id=subscription_id, + node_id=node_id, + trigger_name=trigger_name, + ) + if not event: + return jsonable_encoder({"status": "waiting"}) - from core.trigger.entities.entities import TriggerEventData, TriggerInputs - from services.trigger_debug_service import TriggerDebugService - from services.workflow_service import WorkflowService + try: + workflow_service = WorkflowService() + draft_workflow = workflow_service.get_draft_workflow(app_model) + if not draft_workflow: + raise ValueError("Workflow not found") - def generate(current_user: Account): - # Phase 1: Listen for trigger events - trigger_data = None - for event in TriggerDebugService.waiting_for_triggered( + user_inputs = event.model_dump() + node_execution = workflow_service.run_draft_workflow_node( app_model=app_model, + draft_workflow=draft_workflow, node_id=node_id, - user_id=current_user.id, - timeout=args.get("timeout", 300), - ): - yield event.to_dict() # Pass through all listening events - - # Check if we received the trigger - if isinstance(event, TriggerTriggeredResponse): - # Save trigger data and exit listening loop - trigger_data = TriggerEventData( - subscription_id=event.subscription_id, - triggers=event.triggers, - request_id=event.request_id, - timestamp=event.timestamp, - ) - break - - # Phase 2: Execute node if trigger was received - if trigger_data: - # Create trigger inputs - trigger_inputs = TriggerInputs.from_trigger_data(trigger_data) - try: - # Get workflow and execute node - workflow_service = WorkflowService() - draft_workflow = workflow_service.get_draft_workflow(app_model) - if not draft_workflow: - raise ValueError("Workflow not found") - - node_execution = workflow_service.run_draft_workflow_node( - app_model=app_model, - draft_workflow=draft_workflow, - node_id=node_id, - user_inputs=trigger_inputs.to_dict(), - account=current_user, - query="", - files=[], - ) - - # Generate node finished event - yield TriggerNodeFinishedResponse( - task_id="", - id=node_execution.id, - node_id=node_execution.node_id, - node_type=node_execution.node_type, - status=node_execution.status, - outputs=node_execution.outputs_dict, - error=node_execution.error, - elapsed_time=node_execution.elapsed_time, - execution_metadata=node_execution.execution_metadata_dict, - ).to_dict() - - except Exception as e: - yield ErrorStreamResponse(task_id="", err=e).to_dict() - - # Use standard response format - from core.app.apps.base_app_generator import BaseAppGenerator - - response = BaseAppGenerator.convert_to_event_stream(generate(current_user)) - return helper.compact_generate_response(response) + user_inputs=user_inputs, + account=current_user, + query="", + files=[], + ) + return jsonable_encoder(node_execution) + except Exception: + logger.exception("Error running draft workflow trigger node") + return jsonable_encoder( + { + "status": "error", + } + ), 500 class DraftWorkflowTriggerRunApi(Resource): """ - Full workflow debug - Listen for trigger events and execute complete workflow + Full workflow debug - Polling API for trigger events Path: /apps//workflows/draft/trigger/run """ @@ -910,69 +880,58 @@ class DraftWorkflowTriggerRunApi(Resource): @get_app_model(mode=[AppMode.WORKFLOW]) def post(self, app_model: App): """ - Debug trigger workflow by listening for events and running full workflow + Poll for trigger events and execute full workflow when event arrives """ if not isinstance(current_user, Account) or not current_user.is_editor: raise Forbidden() + parser = reqparse.RequestParser() parser.add_argument("node_id", type=str, required=True, location="json") - parser.add_argument("timeout", type=int, default=300, location="json") + parser.add_argument("trigger_name", type=str, required=True, location="json") + parser.add_argument("subscription_id", type=str, required=True, location="json") args = parser.parse_args() + node_id = args["node_id"] + trigger_name = args["trigger_name"] + subscription_id = args["subscription_id"] - from core.app.entities.app_invoke_entities import InvokeFrom - from core.trigger.entities.entities import TriggerEventData, TriggerInputs - from services.app_generate_service import AppGenerateService - from services.trigger_debug_service import TriggerDebugService + event = TriggerDebugService.poll_event( + tenant_id=app_model.tenant_id, + user_id=current_user.id, + app_id=app_model.id, + subscription_id=subscription_id, + node_id=node_id, + trigger_name=trigger_name, + ) + if not event: + return jsonable_encoder({"status": "waiting"}) - def generate(current_user: Account): - # Phase 1: Listen for trigger events - trigger_data = None - for event in TriggerDebugService.waiting_for_triggered( + workflow_args = { + "inputs": event.model_dump(), + "query": "", + "files": [], + } + external_trace_id = get_external_trace_id(request) + if external_trace_id: + workflow_args["external_trace_id"] = external_trace_id + + try: + response = AppGenerateService.generate( app_model=app_model, - node_id=args["node_id"], - user_id=current_user.id, - timeout=args.get("timeout", 300), - ): - yield event.to_dict() - - # Check if we received the trigger - if isinstance(event, TriggerTriggeredResponse): - # Save trigger data and exit listening loop - trigger_data = TriggerEventData( - subscription_id=event.subscription_id, - triggers=event.triggers, - request_id=event.request_id, - timestamp=event.timestamp, - ) - break - - # Phase 2: Execute workflow if trigger was received - if trigger_data: - # Create trigger inputs and convert to workflow args - trigger_inputs = TriggerInputs.from_trigger_data(trigger_data) - combined_args = trigger_inputs.to_workflow_args() - - try: - # Execute workflow - workflow_response = AppGenerateService.generate( - app_model=app_model, - user=current_user, - args=combined_args, - invoke_from=InvokeFrom.DEBUGGER, - streaming=True, - ) - - # Pass through workflow's standard event stream - yield from workflow_response - - except Exception as e: - yield ErrorStreamResponse(task_id="", err=e).to_dict() - - # Use standard response format - from core.app.apps.base_app_generator import BaseAppGenerator - - response = BaseAppGenerator.convert_to_event_stream(generate(current_user)) - return helper.compact_generate_response(response) + user=current_user, + args=workflow_args, + invoke_from=InvokeFrom.DEBUGGER, + streaming=True, + ) + return helper.compact_generate_response(response) + except InvokeRateLimitError as ex: + raise InvokeRateLimitHttpError(ex.description) + except Exception: + logger.exception("Error running draft workflow trigger run") + return jsonable_encoder( + { + "status": "error", + } + ), 500 api.add_resource( diff --git a/api/controllers/console/workspace/trigger_providers.py b/api/controllers/console/workspace/trigger_providers.py index 3fdadea8ac..95828bc32b 100644 --- a/api/controllers/console/workspace/trigger_providers.py +++ b/api/controllers/console/workspace/trigger_providers.py @@ -92,6 +92,17 @@ class TriggerSubscriptionBuilderCreateApi(Resource): raise +class TriggerSubscriptionBuilderGetApi(Resource): + @setup_required + @login_required + @account_initialization_required + def get(self, provider, subscription_builder_id): + """Get a subscription instance for a trigger provider""" + return jsonable_encoder( + TriggerSubscriptionBuilderService.get_subscription_builder_by_id(subscription_builder_id) + ) + + class TriggerSubscriptionBuilderVerifyApi(Resource): @setup_required @login_required @@ -332,6 +343,7 @@ class TriggerOAuthAuthorizeApi(Resource): { "authorization_url": authorization_url_response.authorization_url, "subscription_builder_id": subscription_builder.id, + "subscription_builder": subscription_builder, } ) ) @@ -532,6 +544,10 @@ api.add_resource( TriggerSubscriptionBuilderCreateApi, "/workspaces/current/trigger-provider//subscriptions/builder/create", ) +api.add_resource( + TriggerSubscriptionBuilderGetApi, + "/workspaces/current/trigger-provider//subscriptions/builder/", +) api.add_resource( TriggerSubscriptionBuilderUpdateApi, "/workspaces/current/trigger-provider//subscriptions/builder/update/", diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py index 730f71beda..4cb3d219e4 100644 --- a/api/core/app/entities/queue_entities.py +++ b/api/core/app/entities/queue_entities.py @@ -51,11 +51,6 @@ class QueueEvent(StrEnum): PING = "ping" STOP = "stop" RETRY = "retry" - TRIGGER_LISTENING_STARTED = "trigger_listening_started" - TRIGGER_TRIGGERED = "trigger_triggered" - TRIGGER_NODE_FINISHED = "trigger_node_finished" - TRIGGER_LISTENING_TIMEOUT = "trigger_listening_timeout" - class AppQueueEvent(BaseModel): """ @@ -722,54 +717,6 @@ class QueueParallelBranchRunSucceededEvent(AppQueueEvent): """loop id if node is in loop""" -class QueueTriggerListeningStartedEvent(AppQueueEvent): - """ - QueueTriggerListeningStartedEvent entity - """ - - event: QueueEvent = QueueEvent.TRIGGER_LISTENING_STARTED - session_id: str - webhook_url: str - timeout: int - - -class QueueTriggerTriggeredEvent(AppQueueEvent): - """ - QueueTriggerTriggeredEvent entity - """ - - event: QueueEvent = QueueEvent.TRIGGER_TRIGGERED - subscription_id: str - triggers: list[str] - request_id: str - timestamp: float - - -class QueueTriggerNodeFinishedEvent(AppQueueEvent): - """ - QueueTriggerNodeFinishedEvent entity - """ - - event: QueueEvent = QueueEvent.TRIGGER_NODE_FINISHED - id: str - node_id: str - node_type: str - status: str - outputs: Optional[Mapping[str, Any]] = None - error: Optional[str] = None - elapsed_time: Optional[float] = None - execution_metadata: Optional[Mapping[str, Any]] = None - - -class QueueTriggerListeningTimeoutEvent(AppQueueEvent): - """ - QueueTriggerListeningTimeoutEvent entity - """ - - event: QueueEvent = QueueEvent.TRIGGER_LISTENING_TIMEOUT - error: str = "Timeout waiting for trigger" - - class QueueParallelBranchRunFailedEvent(AppQueueEvent): """ QueueParallelBranchRunFailedEvent entity diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index 3921e5dbde..a1c0368354 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -82,11 +82,6 @@ class StreamEvent(Enum): TEXT_CHUNK = "text_chunk" TEXT_REPLACE = "text_replace" AGENT_LOG = "agent_log" - # Trigger debug events - TRIGGER_LISTENING_STARTED = "trigger_listening_started" - TRIGGER_LISTENING_TIMEOUT = "trigger_listening_timeout" - TRIGGER_TRIGGERED = "trigger_triggered" - TRIGGER_NODE_FINISHED = "trigger_node_finished" class StreamResponse(BaseModel): @@ -842,52 +837,3 @@ class AgentLogStreamResponse(StreamResponse): event: StreamEvent = StreamEvent.AGENT_LOG data: Data - - -# Trigger Debug Stream Responses -class TriggerListeningStartedResponse(StreamResponse): - """ - TriggerListeningStartedResponse entity - """ - - event: StreamEvent = StreamEvent.TRIGGER_LISTENING_STARTED - session_id: str - webhook_url: str - timeout: int - - -class TriggerTriggeredResponse(StreamResponse): - """ - TriggerTriggeredResponse entity - """ - - event: StreamEvent = StreamEvent.TRIGGER_TRIGGERED - subscription_id: str - triggers: list[str] - request_id: str - timestamp: float - - -class TriggerNodeFinishedResponse(StreamResponse): - """ - TriggerNodeFinishedResponse entity - """ - - event: StreamEvent = StreamEvent.TRIGGER_NODE_FINISHED - id: str - node_id: str - node_type: str - status: str - outputs: Optional[Mapping[str, Any]] = None - error: Optional[str] = None - elapsed_time: float - execution_metadata: Optional[Mapping[str, Any]] = None - - -class TriggerListeningTimeoutResponse(StreamResponse): - """ - TriggerListeningTimeoutResponse entity - """ - - event: StreamEvent = StreamEvent.TRIGGER_LISTENING_TIMEOUT - error: str = "Timeout waiting for trigger" diff --git a/api/core/trigger/entities/entities.py b/api/core/trigger/entities/entities.py index 6010028553..1e8cbae240 100644 --- a/api/core/trigger/entities/entities.py +++ b/api/core/trigger/entities/entities.py @@ -269,15 +269,6 @@ class TriggerInputs(BaseModel): trigger_name: str subscription_id: str - @classmethod - def from_trigger_data(cls, trigger_data: TriggerEventData) -> "TriggerInputs": - """Create from debug event data.""" - return cls( - request_id=trigger_data.request_id, - trigger_name=trigger_data.triggers[0] if trigger_data.triggers else "", - subscription_id=trigger_data.subscription_id, - ) - @classmethod def from_trigger_entity(cls, request_id: str, subscription_id: str, trigger: TriggerEntity) -> "TriggerInputs": """Create from trigger entity (for production).""" diff --git a/api/core/workflow/nodes/enums.py b/api/core/workflow/nodes/enums.py index a09816f4fe..f87cb6b042 100644 --- a/api/core/workflow/nodes/enums.py +++ b/api/core/workflow/nodes/enums.py @@ -29,6 +29,15 @@ class NodeType(StrEnum): TRIGGER_SCHEDULE = "trigger-schedule" TRIGGER_PLUGIN = "trigger-plugin" + @property + def is_start_node(self) -> bool: + return self in [ + NodeType.START, + NodeType.TRIGGER_WEBHOOK, + NodeType.TRIGGER_SCHEDULE, + NodeType.TRIGGER_PLUGIN, + ] + class ErrorStrategy(StrEnum): FAIL_BRANCH = "fail-branch" diff --git a/api/services/trigger/trigger_subscription_builder_service.py b/api/services/trigger/trigger_subscription_builder_service.py index 9a4211847c..ccde48a71d 100644 --- a/api/services/trigger/trigger_subscription_builder_service.py +++ b/api/services/trigger/trigger_subscription_builder_service.py @@ -204,6 +204,7 @@ class TriggerSubscriptionBuilderService: def builder_to_api_entity( cls, controller: PluginTriggerProviderController, entity: SubscriptionBuilder ) -> SubscriptionBuilderApiEntity: + credential_type = CredentialType.of(entity.credential_type or CredentialType.UNAUTHORIZED.value) return SubscriptionBuilderApiEntity( id=entity.id, name=entity.name or "", @@ -211,9 +212,9 @@ class TriggerSubscriptionBuilderService: endpoint=parse_endpoint_id(entity.endpoint_id), parameters=entity.parameters, properties=entity.properties, - credential_type=CredentialType(entity.credential_type), + credential_type=credential_type, credentials=masked_credentials( - schemas=controller.get_credentials_schema(CredentialType(entity.credential_type)), + schemas=controller.get_credentials_schema(credential_type), credentials=entity.credentials, ), ) @@ -301,3 +302,16 @@ class TriggerSubscriptionBuilderService: # append the request log cls.append_log(endpoint_id, request, response.response) return response.response + + @classmethod + def get_subscription_builder_by_id(cls, subscription_builder_id: str) -> SubscriptionBuilderApiEntity: + """Get a trigger subscription builder API entity.""" + subscription_builder = cls.get_subscription_builder(subscription_builder_id) + if not subscription_builder: + raise ValueError(f"Subscription builder {subscription_builder_id} not found") + return cls.builder_to_api_entity( + controller=TriggerManager.get_trigger_provider( + subscription_builder.tenant_id, TriggerProviderID(subscription_builder.provider_id) + ), + entity=subscription_builder, + ) diff --git a/api/services/trigger_debug_service.py b/api/services/trigger_debug_service.py index c2a10b7e96..707e1a78e9 100644 --- a/api/services/trigger_debug_service.py +++ b/api/services/trigger_debug_service.py @@ -1,372 +1,125 @@ -""" -Trigger debug service for webhook debugging in draft workflows. +"""Trigger debug service for webhook debugging in draft workflows.""" -This service provides debugging capabilities for trigger nodes by using -Redis Pub/Sub to enable real-time event forwarding across distributed instances. -""" - -import json +import hashlib import logging -import time -import uuid -from collections.abc import Generator -from dataclasses import dataclass +from typing import Optional -from flask import request -from werkzeug.exceptions import NotFound +from pydantic import BaseModel +from redis import RedisError -from core.app.entities.task_entities import ( - ErrorStreamResponse, - PingStreamResponse, - StreamResponse, - TriggerListeningStartedResponse, - TriggerListeningTimeoutResponse, - TriggerTriggeredResponse, -) -from core.trigger.entities.entities import TriggerEventData from extensions.ext_redis import redis_client -from models.model import App -from services.trigger.trigger_provider_service import TriggerProviderService -from services.workflow_service import WorkflowService logger = logging.getLogger(__name__) +TRIGGER_DEBUG_EVENT_TTL = 300 -@dataclass -class TriggerDebuggingContext: - """Context for trigger debugging session.""" - - session_id: str +class TriggerDebugEvent(BaseModel): subscription_id: str - webhook_url: str - node_id: str - app_id: str - user_id: str - timeout: int + request_id: str + timestamp: int class TriggerDebugService: """ - Trigger debug service - supports distributed environments. - Cleans up resources on disconnect, no reconnection handling. + Redis-based trigger debug service with polling support. + Uses {tenant_id} hash tags for Redis Cluster compatibility. """ - SESSION_PREFIX = "trigger_debug_session:" - SUBSCRIPTION_DEBUG_PREFIX = "trigger_debug_subscription:" - PUBSUB_CHANNEL_PREFIX = "trigger_debug_channel:" + # LUA_SELECT: Atomic poll or register for event + # KEYS[1] = trigger_debug_inbox:{tenant_id}:{address_id} + # KEYS[2] = trigger_debug_waiting_pool:{tenant_id}:{subscription_id}:{trigger} + # ARGV[1] = address_id + # compressed lua code, you can use LLM to uncompress it + LUA_SELECT = ( + "local v=redis.call('GET',KEYS[1]);" + "if v then redis.call('DEL',KEYS[1]);return v end;" + "redis.call('SADD',KEYS[2],ARGV[1]);" + f"redis.call('EXPIRE',KEYS[2],{TRIGGER_DEBUG_EVENT_TTL});" + "return false" + ) - __DEFAULT_LISTEN_TIMEOUT__ = 300 + # LUA_DISPATCH: Dispatch event to all waiting addresses + # KEYS[1] = trigger_debug_waiting_pool:{tenant_id}:{subscription_id}:{trigger} + # ARGV[1] = tenant_id + # ARGV[2] = event_json + # compressed lua code, you can use LLM to uncompress it + LUA_DISPATCH = ( + "local a=redis.call('SMEMBERS',KEYS[1]);" + "if #a==0 then return 0 end;" + "redis.call('DEL',KEYS[1]);" + "for i=1,#a do " + f"redis.call('SET','trigger_debug_inbox:{{'..ARGV[1]..'}}'..':'..a[i],ARGV[2],'EX',{TRIGGER_DEBUG_EVENT_TTL});" + "end;" + "return #a" + ) @classmethod - def build_debugging_context( + def address(cls, tenant_id: str, user_id: str, app_id: str, node_id: str) -> str: + address_id = hashlib.sha1(f"{user_id}|{app_id}|{node_id}".encode()).hexdigest() + return f"trigger_debug_inbox:{{{tenant_id}}}:{address_id}" + + @classmethod + def waiting_pool(cls, tenant_id: str, subscription_id: str, trigger_name: str) -> str: + return f"trigger_debug_waiting_pool:{{{tenant_id}}}:{subscription_id}:{trigger_name}" + + @classmethod + def dispatch_debug_event( cls, - app_model: App, - node_id: str, - user_id: str, - timeout: int = __DEFAULT_LISTEN_TIMEOUT__, - ) -> TriggerDebuggingContext: - """ - Build debugging context for trigger node. - - Args: - app_model: Application model - node_id: Node ID to debug - user_id: User ID creating the session - timeout: Session timeout in seconds - - Returns: - TriggerDebuggingContext with all debugging information - - Raises: - NotFound: If workflow or node not found - ValueError: If node is not a trigger plugin or has no subscription - """ - # Get and validate workflow - workflow_service = WorkflowService() - draft_workflow = workflow_service.get_draft_workflow(app_model) - if not draft_workflow: - raise NotFound("Workflow not found") - - # Get and validate node - node_config = draft_workflow.get_node_config_by_id(node_id) - if not node_config: - raise NotFound(f"Node {node_id} not found") - - if node_config.get("data", {}).get("type") != "trigger-plugin": - raise ValueError("Node is not a trigger plugin node") - - subscription_id = node_config.get("data", {}).get("subscription_id") - if not subscription_id: - raise ValueError("No subscription configured for this trigger node") - - # Create debug session - app_id = str(app_model.id) - session_id = cls.create_debug_session( - app_id=app_id, - node_id=node_id, + tenant_id: str, + subscription_id: str, + triggers: list[str], + request_id: str, + timestamp: int, + ) -> int: + event_json = TriggerDebugEvent( subscription_id=subscription_id, - user_id=user_id, - timeout=timeout, - ) + request_id=request_id, + timestamp=timestamp, + ).model_dump_json() - # Get webhook URL - subscription = TriggerProviderService.get_subscription_by_id( - tenant_id=app_model.tenant_id, subscription_id=subscription_id - ) - webhook_url = ( - f"{request.host_url.rstrip('/')}/trigger/plugin/{subscription.endpoint}" if subscription else "Unknown" - ) + dispatched = 0 + if len(triggers) > 10: + logger.warning( + "Too many triggers to dispatch at once: %d triggers tenant: %s subscription: %s", + len(triggers), + tenant_id, + subscription_id, + ) - return TriggerDebuggingContext( - session_id=session_id, - subscription_id=subscription_id, - webhook_url=webhook_url, - node_id=node_id, - app_id=app_id, - user_id=user_id, - timeout=timeout, - ) + for trigger_name in triggers: + try: + dispatched += redis_client.eval( + cls.LUA_DISPATCH, + 1, + cls.waiting_pool(tenant_id, subscription_id, trigger_name), + tenant_id, + event_json, + ) + except RedisError: + logger.exception("Failed to dispatch for trigger: %s", trigger_name) + return dispatched @classmethod - def waiting_for_triggered( + def poll_event( cls, - app_model: App, - node_id: str, + tenant_id: str, user_id: str, - timeout: int = __DEFAULT_LISTEN_TIMEOUT__, - ) -> Generator[StreamResponse, None, None]: - """ - Listen for trigger events only. - - This method sets up a debug session and listens for incoming trigger events. - It yields events as they occur and returns when a trigger is received or timeout occurs. - - Args: - app_model: Application model - node_id: Node ID to debug - user_id: User ID creating the session - timeout: Timeout in seconds - - Yields: - Event dictionaries including: - - listening_started: Initial event with webhook URL - - ping: Periodic heartbeat events - - trigger_debug_received: When trigger is received - - timeout: When timeout occurs - - error: On any errors - """ - # Build debugging context - context = cls.build_debugging_context(app_model, node_id, user_id, timeout) - - # Listen for events and pass them through - for event in cls.listen_for_events( - session_id=context.session_id, webhook_url=context.webhook_url, timeout=context.timeout - ): - yield event - - # If we received a trigger, listening is complete - if isinstance(event, dict) and event.get("event") == "trigger_triggered": - break - - @classmethod - def create_debug_session( - cls, app_id: str, node_id: str, subscription_id: str, user_id: str, timeout: int = __DEFAULT_LISTEN_TIMEOUT__ - ) -> str: - """ - Create a debug session. - - Args: - app_id: Application ID - node_id: Node ID being debugged - subscription_id: Subscription ID to monitor - user_id: User ID creating the session - timeout: Session timeout in seconds - - Returns: - Session ID - """ - session_id = str(uuid.uuid4()) - - session_data = { - "session_id": session_id, - "app_id": app_id, - "node_id": node_id, - "subscription_id": subscription_id, - "user_id": user_id, - "created_at": time.time(), - } - - # 1. Save session info - redis_client.setex(f"{cls.SESSION_PREFIX}{session_id}", timeout, json.dumps(session_data)) - - # 2. Register to subscription's debug session set - redis_client.sadd(f"{cls.SUBSCRIPTION_DEBUG_PREFIX}{subscription_id}", session_id) - redis_client.expire(f"{cls.SUBSCRIPTION_DEBUG_PREFIX}{subscription_id}", timeout) - - logger.info("Created debug session %s for subscription %s", session_id, subscription_id) - return session_id - - @classmethod - def listen_for_events( - cls, session_id: str, webhook_url: str, timeout: int = __DEFAULT_LISTEN_TIMEOUT__ - ) -> Generator[StreamResponse, None, None]: - """ - Listen for events using Redis Pub/Sub and generate structured events. - - Args: - session_id: Debug session ID - webhook_url: Webhook URL for the trigger - timeout: Timeout in seconds - - Yields: - Structured AppQueueEvent objects - """ - # Send initial listening started event - yield TriggerListeningStartedResponse( - task_id="", # Will be set by the caller if needed - session_id=session_id, - webhook_url=webhook_url, - timeout=timeout, - ) - pubsub = redis_client.pubsub() - channel = f"{cls.PUBSUB_CHANNEL_PREFIX}{session_id}" + app_id: str, + subscription_id: str, + node_id: str, + trigger_name: str, + ) -> Optional[TriggerDebugEvent]: + address_id = hashlib.sha1(f"{user_id}|{app_id}|{node_id}".encode()).hexdigest() try: - # Subscribe to channel - pubsub.subscribe(channel) - logger.info("Listening on channel: %s", channel) - - start_time = time.time() - last_heartbeat = time.time() - - # Real-time listening - while time.time() - start_time < timeout: - # Non-blocking message retrieval with 1 second timeout - message = pubsub.get_message(timeout=1.0) - - if message and message["type"] == "message": - # Received trigger event - parse and create structured event - try: - event_data = json.loads(message["data"]) - logger.info("Received trigger event for session %s", session_id) - - # Create structured trigger received event - trigger_data = TriggerEventData( - subscription_id=event_data["subscription_id"], - triggers=event_data["triggers"], - request_id=event_data["request_id"], - timestamp=event_data.get("timestamp", time.time()), - ) - yield TriggerTriggeredResponse( - task_id="", - subscription_id=trigger_data.subscription_id, - triggers=trigger_data.triggers, - request_id=trigger_data.request_id, - timestamp=trigger_data.timestamp, - ) - break # End listening after receiving event - except (json.JSONDecodeError, KeyError) as e: - logger.exception("Failed to parse trigger event") - yield ErrorStreamResponse( - task_id="", err=Exception(f"Failed to parse trigger event: {str(e)}") - ) - break - - # Send periodic heartbeat - if time.time() - last_heartbeat > 5: - yield PingStreamResponse(task_id="") - last_heartbeat = time.time() - - # Timeout - if time.time() - start_time >= timeout: - yield TriggerListeningTimeoutResponse(task_id="") - - except Exception as e: - logger.exception("Error in listen_for_events", exc_info=e) - yield ErrorStreamResponse(task_id="", err=e) - - finally: - # Clean up resources - cls.close_session(session_id) - pubsub.unsubscribe(channel) - pubsub.close() - logger.info("Closed listening for session %s", session_id) - - @classmethod - def close_session(cls, session_id: str): - """ - Close and clean up debug session. - - Args: - session_id: Session ID to close - """ - try: - # Get session info - session_data = redis_client.get(f"{cls.SESSION_PREFIX}{session_id}") - if session_data: - session = json.loads(session_data) - subscription_id = session.get("subscription_id") - - # Remove from subscription set - if subscription_id: - redis_client.srem(f"{cls.SUBSCRIPTION_DEBUG_PREFIX}{subscription_id}", session_id) - logger.info("Removed session %s from subscription %s", session_id, subscription_id) - - # Delete session info - redis_client.delete(f"{cls.SESSION_PREFIX}{session_id}") - logger.info("Cleaned up session %s", session_id) - - except Exception as e: - logger.exception("Error closing session %s", session_id, exc_info=e) - - @classmethod - def dispatch_to_debug_sessions(cls, subscription_id: str, event_data: TriggerEventData) -> int: - """ - Dispatch events to debug sessions using Pub/Sub only. - - Args: - subscription_id: Subscription ID - event_data: Event data to dispatch - - Returns: - Number of active debug sessions - """ - try: - # Get all listening debug sessions - debug_sessions = redis_client.smembers(f"{cls.SUBSCRIPTION_DEBUG_PREFIX}{subscription_id}") - - if not debug_sessions: - return 0 - - active_sessions = 0 - for session_id_bytes in debug_sessions: - if isinstance(session_id_bytes, bytes): - session_id = session_id_bytes.decode("utf-8") - else: - session_id = session_id_bytes - - # Verify session is valid - if not redis_client.exists(f"{cls.SESSION_PREFIX}{session_id}"): - # Clean up invalid session - redis_client.srem(f"{cls.SUBSCRIPTION_DEBUG_PREFIX}{subscription_id}", session_id) - continue - - # Publish event via Pub/Sub - channel = f"{cls.PUBSUB_CHANNEL_PREFIX}{session_id}" - subscriber_count = redis_client.publish(channel, json.dumps(event_data.model_dump())) - - if subscriber_count > 0: - active_sessions += 1 - logger.info("Published event to %d subscribers on channel %s", subscriber_count, channel) - else: - # No subscribers, clean up session - logger.info("No subscribers for session %s, cleaning up", session_id) - cls.close_session(session_id) - - if active_sessions > 0: - logger.info("Dispatched event to %d active debug sessions", active_sessions) - - return active_sessions - except Exception as e: - logger.exception("Failed to dispatch to debug sessions", exc_info=e) - return 0 + event = redis_client.eval( + cls.LUA_SELECT, + 2, + cls.address(tenant_id, user_id, app_id, node_id), + cls.waiting_pool(tenant_id, subscription_id, trigger_name), + address_id, + ) + return TriggerDebugEvent.model_validate_json(event) if event else None + except RedisError: + logger.exception("Failed to poll debug event") + return None diff --git a/api/services/trigger_service.py b/api/services/trigger_service.py index a44dcbb5bf..488ec14805 100644 --- a/api/services/trigger_service.py +++ b/api/services/trigger_service.py @@ -8,7 +8,7 @@ from sqlalchemy.orm import Session from core.plugin.entities.plugin import TriggerProviderID from core.plugin.utils.http_parser import serialize_request -from core.trigger.entities.entities import TriggerEntity, TriggerEventData, TriggerInputs +from core.trigger.entities.entities import TriggerEntity, TriggerInputs from core.trigger.trigger_manager import TriggerManager from extensions.ext_database import db from extensions.ext_storage import storage @@ -18,7 +18,6 @@ from models.trigger import TriggerSubscription from models.workflow import Workflow, WorkflowPluginTrigger from services.async_workflow_service import AsyncWorkflowService from services.trigger.trigger_provider_service import TriggerProviderService -from services.trigger_debug_service import TriggerDebugService from services.workflow.entities import PluginTriggerData logger = logging.getLogger(__name__) @@ -121,33 +120,6 @@ class TriggerService: return dispatched_count - @classmethod - def dispatch_debugging_sessions(cls, subscription_id: str, triggers: list[str], request_id: str) -> int: - """ - Dispatch to debug sessions - simplified version. - - Args: - subscription_id: Subscription ID - triggers: List of trigger names - request_id: Request ID for storage reference - """ - try: - # Prepare streamlined event data using Pydantic model - debug_data = TriggerEventData( - subscription_id=subscription_id, - triggers=triggers, - request_id=request_id, - timestamp=time.time(), - ) - return TriggerDebugService.dispatch_to_debug_sessions( - subscription_id=subscription_id, event_data=debug_data - ) - - except Exception as e: - # Silent failure, don't affect production - logger.exception("Debug dispatch failed", exc_info=e) - return 0 - @classmethod def process_endpoint(cls, endpoint_id: str, request: Request) -> Response | None: """ @@ -157,6 +129,7 @@ class TriggerService: endpoint_id: Endpoint ID request: Request """ + timestamp = int(time.time()) subscription = TriggerProviderService.get_subscription_by_endpoint(endpoint_id) if not subscription: return None @@ -175,12 +148,14 @@ class TriggerService: serialized_request = serialize_request(request) storage.save(f"triggers/{request_id}", serialized_request) + # Production dispatch from tasks.trigger_processing_tasks import dispatch_triggered_workflows_async dispatch_triggered_workflows_async( endpoint_id=endpoint_id, provider_id=subscription.provider_id, subscription_id=subscription.id, + timestamp=timestamp, triggers=list(dispatch_response.triggers), request_id=request_id, ) @@ -191,7 +166,6 @@ class TriggerService: endpoint_id, request_id, ) - return dispatch_response.response @classmethod diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index 3a68379789..09bc536f5e 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -347,7 +347,7 @@ class WorkflowService: node_config = draft_workflow.get_node_config_by_id(node_id) node_type = Workflow.get_node_type_from_node_config(node_config) node_data = node_config.get("data", {}) - if node_type == NodeType.START: + if node_type.is_start_node: with Session(bind=db.engine) as session, session.begin(): draft_var_srv = WorkflowDraftVariableService(session) conversation_id = draft_var_srv.get_or_create_conversation( diff --git a/api/tasks/trigger_processing_tasks.py b/api/tasks/trigger_processing_tasks.py index b13d8a2999..798c29685e 100644 --- a/api/tasks/trigger_processing_tasks.py +++ b/api/tasks/trigger_processing_tasks.py @@ -15,6 +15,7 @@ from core.trigger.trigger_manager import TriggerManager from extensions.ext_database import db from extensions.ext_storage import storage from models.trigger import TriggerSubscription +from services.trigger_debug_service import TriggerDebugService from services.trigger_service import TriggerService logger = logging.getLogger(__name__) @@ -29,6 +30,7 @@ def dispatch_triggered_workflows_async( endpoint_id: str, provider_id: str, subscription_id: str, + timestamp: int, triggers: list[str], request_id: str, ) -> dict: @@ -39,6 +41,7 @@ def dispatch_triggered_workflows_async( endpoint_id: Endpoint ID provider_id: Provider ID subscription_id: Subscription ID + timestamp: Timestamp of the event triggers: List of triggers to dispatch request_id: Unique ID of the stored request @@ -47,10 +50,11 @@ def dispatch_triggered_workflows_async( """ try: logger.info( - "Starting async trigger dispatching for endpoint=%s, triggers=%s, request_id=%s", + "Starting async trigger dispatching for endpoint=%s, triggers=%s, request_id=%s, timestamp=%s", endpoint_id, triggers, request_id, + timestamp, ) # Verify request exists in storage @@ -106,9 +110,11 @@ def dispatch_triggered_workflows_async( # Dispatch to debug sessions after processing all triggers try: - debug_dispatched = TriggerService.dispatch_debugging_sessions( + debug_dispatched = TriggerDebugService.dispatch_debug_event( + tenant_id=subscription.tenant_id, subscription_id=subscription_id, triggers=triggers, + timestamp=timestamp, request_id=request_id, ) except Exception: