From 57c0bc9fb6f51af828d25d826ef9bbb9902440ae Mon Sep 17 00:00:00 2001 From: Harry Date: Thu, 11 Sep 2025 16:55:38 +0800 Subject: [PATCH] feat(trigger): refactor trigger debug event handling and improve response structures - Renamed and refactored trigger debug event classes to enhance clarity and consistency, including changes from `TriggerDebugEventData` to `TriggerEventData` and related response classes. - Updated `DraftWorkflowTriggerNodeApi` and `DraftWorkflowTriggerRunApi` to utilize the new event structures, improving the handling of trigger events. - Removed the `TriggerDebugEventGenerator` class, consolidating event generation directly within the API logic for streamlined processing. - Enhanced error handling and response formatting for trigger events, ensuring structured outputs for better integration and debugging. This refactor improves the overall architecture of trigger debugging, making it more intuitive and maintainable. --- api/controllers/console/app/workflow.py | 58 ++++++++++--------- api/core/app/entities/queue_entities.py | 45 +++++--------- api/core/app/entities/task_entities.py | 44 +++++--------- api/core/trigger/entities/entities.py | 8 +-- api/models/trigger.py | 1 + api/services/trigger/trigger_debug_events.py | 48 --------------- .../trigger/trigger_provider_service.py | 32 +++++----- api/services/trigger_debug_service.py | 38 ++++++------ api/services/trigger_service.py | 4 +- 9 files changed, 102 insertions(+), 176 deletions(-) delete mode 100644 api/services/trigger/trigger_debug_events.py diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index bc910ce5cb..ab087bdf44 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -22,6 +22,7 @@ from controllers.web.error import InvokeRateLimitError as InvokeRateLimitHttpErr from core.app.app_config.features.file_upload.manager import FileUploadConfigManager from core.app.apps.base_app_queue_manager import AppQueueManager from core.app.entities.app_invoke_entities import InvokeFrom +from core.app.entities.task_entities import ErrorStreamResponse, TriggerNodeFinishedResponse, TriggerTriggeredResponse from core.file.models import File from core.helper.trace_id_helper import get_external_trace_id from extensions.ext_database import db @@ -827,8 +828,7 @@ class DraftWorkflowTriggerNodeApi(Resource): parser.add_argument("timeout", type=int, default=300, location="json") args = parser.parse_args() - from core.trigger.entities.entities import TriggerDebugEventData, TriggerInputs - from services.trigger.trigger_debug_events import TriggerDebugEventGenerator + from core.trigger.entities.entities import TriggerEventData, TriggerInputs from services.trigger_debug_service import TriggerDebugService from services.workflow_service import WorkflowService @@ -841,16 +841,16 @@ class DraftWorkflowTriggerNodeApi(Resource): user_id=current_user.id, timeout=args.get("timeout", 300), ): - yield event # Pass through all listening events + yield event.to_dict() # Pass through all listening events # Check if we received the trigger - if isinstance(event, dict) and event.get("event") == "trigger_debug_received": + if isinstance(event, TriggerTriggeredResponse): # Save trigger data and exit listening loop - trigger_data = TriggerDebugEventData( - subscription_id=event["subscription_id"], - triggers=event["triggers"], - request_id=event["request_id"], - timestamp=event["timestamp"], + trigger_data = TriggerEventData( + subscription_id=event.subscription_id, + triggers=event.triggers, + request_id=event.request_id, + timestamp=event.timestamp, ) break @@ -858,8 +858,6 @@ class DraftWorkflowTriggerNodeApi(Resource): if trigger_data: # Create trigger inputs trigger_inputs = TriggerInputs.from_trigger_data(trigger_data) - event_generator = TriggerDebugEventGenerator() - try: # Get workflow and execute node workflow_service = WorkflowService() @@ -878,10 +876,20 @@ class DraftWorkflowTriggerNodeApi(Resource): ) # Generate node finished event - yield event_generator.generate_node_finished(node_execution).to_dict() + yield TriggerNodeFinishedResponse( + task_id="", + id=node_execution.id, + node_id=node_execution.node_id, + node_type=node_execution.node_type, + status=node_execution.status, + outputs=node_execution.outputs_dict, + error=node_execution.error, + elapsed_time=node_execution.elapsed_time, + execution_metadata=node_execution.execution_metadata_dict, + ).to_dict() except Exception as e: - yield event_generator.generate_error(str(e)).to_dict() + yield ErrorStreamResponse(task_id="", err=e).to_dict() # Use standard response format from core.app.apps.base_app_generator import BaseAppGenerator @@ -912,9 +920,8 @@ class DraftWorkflowTriggerRunApi(Resource): args = parser.parse_args() from core.app.entities.app_invoke_entities import InvokeFrom - from core.trigger.entities.entities import TriggerDebugEventData, TriggerInputs + from core.trigger.entities.entities import TriggerEventData, TriggerInputs from services.app_generate_service import AppGenerateService - from services.trigger.trigger_debug_events import TriggerDebugEventGenerator from services.trigger_debug_service import TriggerDebugService def generate(current_user: Account): @@ -926,26 +933,21 @@ class DraftWorkflowTriggerRunApi(Resource): user_id=current_user.id, timeout=args.get("timeout", 300), ): - yield event # Pass through all listening events + yield event.to_dict() # Check if we received the trigger - if isinstance(event, dict) and event.get("event") == "trigger_debug_received": + if isinstance(event, TriggerTriggeredResponse): # Save trigger data and exit listening loop - trigger_data = TriggerDebugEventData( - subscription_id=event["subscription_id"], - triggers=event["triggers"], - request_id=event["request_id"], - timestamp=event["timestamp"], + trigger_data = TriggerEventData( + subscription_id=event.subscription_id, + triggers=event.triggers, + request_id=event.request_id, + timestamp=event.timestamp, ) break # Phase 2: Execute workflow if trigger was received if trigger_data: - event_generator = TriggerDebugEventGenerator() - - # Yield workflow started event - yield event_generator.generate_workflow_started(trigger_data).to_dict() - # Create trigger inputs and convert to workflow args trigger_inputs = TriggerInputs.from_trigger_data(trigger_data) combined_args = trigger_inputs.to_workflow_args() @@ -964,7 +966,7 @@ class DraftWorkflowTriggerRunApi(Resource): yield from workflow_response except Exception as e: - yield event_generator.generate_error(str(e)).to_dict() + yield ErrorStreamResponse(task_id="", err=e).to_dict() # Use standard response format from core.app.apps.base_app_generator import BaseAppGenerator diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py index 583463ffbb..730f71beda 100644 --- a/api/core/app/entities/queue_entities.py +++ b/api/core/app/entities/queue_entities.py @@ -51,12 +51,10 @@ class QueueEvent(StrEnum): PING = "ping" STOP = "stop" RETRY = "retry" - # Trigger debug events - TRIGGER_DEBUG_LISTENING_STARTED = "trigger_debug_listening_started" - TRIGGER_DEBUG_RECEIVED = "trigger_debug_received" - TRIGGER_DEBUG_NODE_FINISHED = "trigger_debug_node_finished" - TRIGGER_DEBUG_WORKFLOW_STARTED = "trigger_debug_workflow_started" - TRIGGER_DEBUG_TIMEOUT = "trigger_debug_timeout" + TRIGGER_LISTENING_STARTED = "trigger_listening_started" + TRIGGER_TRIGGERED = "trigger_triggered" + TRIGGER_NODE_FINISHED = "trigger_node_finished" + TRIGGER_LISTENING_TIMEOUT = "trigger_listening_timeout" class AppQueueEvent(BaseModel): @@ -724,35 +722,35 @@ class QueueParallelBranchRunSucceededEvent(AppQueueEvent): """loop id if node is in loop""" -class QueueTriggerDebugListeningStartedEvent(AppQueueEvent): +class QueueTriggerListeningStartedEvent(AppQueueEvent): """ - QueueTriggerDebugListeningStartedEvent entity + QueueTriggerListeningStartedEvent entity """ - event: QueueEvent = QueueEvent.TRIGGER_DEBUG_LISTENING_STARTED + event: QueueEvent = QueueEvent.TRIGGER_LISTENING_STARTED session_id: str webhook_url: str timeout: int -class QueueTriggerDebugReceivedEvent(AppQueueEvent): +class QueueTriggerTriggeredEvent(AppQueueEvent): """ - QueueTriggerDebugReceivedEvent entity + QueueTriggerTriggeredEvent entity """ - event: QueueEvent = QueueEvent.TRIGGER_DEBUG_RECEIVED + event: QueueEvent = QueueEvent.TRIGGER_TRIGGERED subscription_id: str triggers: list[str] request_id: str timestamp: float -class QueueTriggerDebugNodeFinishedEvent(AppQueueEvent): +class QueueTriggerNodeFinishedEvent(AppQueueEvent): """ - QueueTriggerDebugNodeFinishedEvent entity + QueueTriggerNodeFinishedEvent entity """ - event: QueueEvent = QueueEvent.TRIGGER_DEBUG_NODE_FINISHED + event: QueueEvent = QueueEvent.TRIGGER_NODE_FINISHED id: str node_id: str node_type: str @@ -763,23 +761,12 @@ class QueueTriggerDebugNodeFinishedEvent(AppQueueEvent): execution_metadata: Optional[Mapping[str, Any]] = None -class QueueTriggerDebugWorkflowStartedEvent(AppQueueEvent): +class QueueTriggerListeningTimeoutEvent(AppQueueEvent): """ - QueueTriggerDebugWorkflowStartedEvent entity + QueueTriggerListeningTimeoutEvent entity """ - event: QueueEvent = QueueEvent.TRIGGER_DEBUG_WORKFLOW_STARTED - subscription_id: str - triggers: list[str] - request_id: str - - -class QueueTriggerDebugTimeoutEvent(AppQueueEvent): - """ - QueueTriggerDebugTimeoutEvent entity - """ - - event: QueueEvent = QueueEvent.TRIGGER_DEBUG_TIMEOUT + event: QueueEvent = QueueEvent.TRIGGER_LISTENING_TIMEOUT error: str = "Timeout waiting for trigger" diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index e21f84e77c..3921e5dbde 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -83,11 +83,10 @@ class StreamEvent(Enum): TEXT_REPLACE = "text_replace" AGENT_LOG = "agent_log" # Trigger debug events - TRIGGER_DEBUG_LISTENING_STARTED = "trigger_debug_listening_started" - TRIGGER_DEBUG_RECEIVED = "trigger_debug_received" - TRIGGER_DEBUG_NODE_FINISHED = "trigger_debug_node_finished" - TRIGGER_DEBUG_WORKFLOW_STARTED = "trigger_debug_workflow_started" - TRIGGER_DEBUG_TIMEOUT = "trigger_debug_timeout" + TRIGGER_LISTENING_STARTED = "trigger_listening_started" + TRIGGER_LISTENING_TIMEOUT = "trigger_listening_timeout" + TRIGGER_TRIGGERED = "trigger_triggered" + TRIGGER_NODE_FINISHED = "trigger_node_finished" class StreamResponse(BaseModel): @@ -846,35 +845,35 @@ class AgentLogStreamResponse(StreamResponse): # Trigger Debug Stream Responses -class TriggerDebugListeningStartedResponse(StreamResponse): +class TriggerListeningStartedResponse(StreamResponse): """ - TriggerDebugListeningStartedResponse entity + TriggerListeningStartedResponse entity """ - event: StreamEvent = StreamEvent.TRIGGER_DEBUG_LISTENING_STARTED + event: StreamEvent = StreamEvent.TRIGGER_LISTENING_STARTED session_id: str webhook_url: str timeout: int -class TriggerDebugReceivedResponse(StreamResponse): +class TriggerTriggeredResponse(StreamResponse): """ - TriggerDebugReceivedResponse entity + TriggerTriggeredResponse entity """ - event: StreamEvent = StreamEvent.TRIGGER_DEBUG_RECEIVED + event: StreamEvent = StreamEvent.TRIGGER_TRIGGERED subscription_id: str triggers: list[str] request_id: str timestamp: float -class TriggerDebugNodeFinishedResponse(StreamResponse): +class TriggerNodeFinishedResponse(StreamResponse): """ - TriggerDebugNodeFinishedResponse entity + TriggerNodeFinishedResponse entity """ - event: StreamEvent = StreamEvent.TRIGGER_DEBUG_NODE_FINISHED + event: StreamEvent = StreamEvent.TRIGGER_NODE_FINISHED id: str node_id: str node_type: str @@ -885,21 +884,10 @@ class TriggerDebugNodeFinishedResponse(StreamResponse): execution_metadata: Optional[Mapping[str, Any]] = None -class TriggerDebugWorkflowStartedResponse(StreamResponse): +class TriggerListeningTimeoutResponse(StreamResponse): """ - TriggerDebugWorkflowStartedResponse entity + TriggerListeningTimeoutResponse entity """ - event: StreamEvent = StreamEvent.TRIGGER_DEBUG_WORKFLOW_STARTED - subscription_id: str - triggers: list[str] - request_id: str - - -class TriggerDebugTimeoutResponse(StreamResponse): - """ - TriggerDebugTimeoutResponse entity - """ - - event: StreamEvent = StreamEvent.TRIGGER_DEBUG_TIMEOUT + event: StreamEvent = StreamEvent.TRIGGER_LISTENING_TIMEOUT error: str = "Timeout waiting for trigger" diff --git a/api/core/trigger/entities/entities.py b/api/core/trigger/entities/entities.py index c54bf6ebaa..6010028553 100644 --- a/api/core/trigger/entities/entities.py +++ b/api/core/trigger/entities/entities.py @@ -251,8 +251,8 @@ class SubscriptionBuilderUpdater(BaseModel): subscription_builder.expires_at = self.expires_at -class TriggerDebugEventData(BaseModel): - """Debug event data dispatched to debug sessions.""" +class TriggerEventData(BaseModel): + """Event data dispatched to trigger sessions.""" subscription_id: str triggers: list[str] @@ -270,7 +270,7 @@ class TriggerInputs(BaseModel): subscription_id: str @classmethod - def from_trigger_data(cls, trigger_data: TriggerDebugEventData) -> "TriggerInputs": + def from_trigger_data(cls, trigger_data: TriggerEventData) -> "TriggerInputs": """Create from debug event data.""" return cls( request_id=trigger_data.request_id, @@ -298,9 +298,9 @@ __all__ = [ "RequestLog", "Subscription", "SubscriptionBuilder", - "TriggerDebugEventData", "TriggerDescription", "TriggerEntity", + "TriggerEventData", "TriggerIdentity", "TriggerInputs", "TriggerParameter", diff --git a/api/models/trigger.py b/api/models/trigger.py index d03a8c91a7..08dc53d82f 100644 --- a/api/models/trigger.py +++ b/api/models/trigger.py @@ -84,6 +84,7 @@ class TriggerSubscription(Base): properties=self.properties, credential_type=CredentialType(self.credential_type), credentials=self.credentials, + workflows_in_use=-1, ) diff --git a/api/services/trigger/trigger_debug_events.py b/api/services/trigger/trigger_debug_events.py deleted file mode 100644 index d5d083f9e5..0000000000 --- a/api/services/trigger/trigger_debug_events.py +++ /dev/null @@ -1,48 +0,0 @@ -""" -Event generator for trigger debug operations. - -Provides structured event generation for trigger debug SSE streams. -""" - -from core.app.entities.task_entities import ( - ErrorStreamResponse, - StreamResponse, - TriggerDebugNodeFinishedResponse, - TriggerDebugWorkflowStartedResponse, -) -from core.trigger.entities.entities import TriggerDebugEventData -from models.workflow import WorkflowNodeExecutionModel - - -class TriggerDebugEventGenerator: - """Generator for trigger debug events.""" - - @staticmethod - def generate_node_finished(node_execution: WorkflowNodeExecutionModel) -> StreamResponse: - """Generate node finished event.""" - return TriggerDebugNodeFinishedResponse( - task_id="", - id=node_execution.id, - node_id=node_execution.node_id, - node_type=node_execution.node_type, - status=node_execution.status, - outputs=node_execution.outputs_dict, - error=node_execution.error, - elapsed_time=node_execution.elapsed_time, - execution_metadata=node_execution.execution_metadata_dict, - ) - - @staticmethod - def generate_workflow_started(event_data: TriggerDebugEventData) -> StreamResponse: - """Generate workflow started event.""" - return TriggerDebugWorkflowStartedResponse( - task_id="", - subscription_id=event_data.subscription_id, - triggers=event_data.triggers, - request_id=event_data.request_id, - ) - - @staticmethod - def generate_error(error: str) -> StreamResponse: - """Generate error event.""" - return ErrorStreamResponse(task_id="", err=Exception(error)) diff --git a/api/services/trigger/trigger_provider_service.py b/api/services/trigger/trigger_provider_service.py index c0cdcba204..92733b1274 100644 --- a/api/services/trigger/trigger_provider_service.py +++ b/api/services/trigger/trigger_provider_service.py @@ -64,25 +64,22 @@ class TriggerProviderService: .all() ) subscriptions = [subscription.to_api_entity() for subscription in subscriptions_db] - - # Get distinct app count for each subscription - if subscriptions: - usage_counts = ( - session.query( - WorkflowPluginTrigger.subscription_id, - func.count(func.distinct(WorkflowPluginTrigger.app_id)).label("app_count"), - ) - .filter( - WorkflowPluginTrigger.tenant_id == tenant_id, - WorkflowPluginTrigger.subscription_id.in_([s.id for s in subscriptions]), - ) - .group_by(WorkflowPluginTrigger.subscription_id) - .all() + if not subscriptions: + return [] + usage_counts = ( + session.query( + WorkflowPluginTrigger.subscription_id, + func.count(func.distinct(WorkflowPluginTrigger.app_id)).label("app_count"), ) - # Convert query result to dictionary: subscription_id -> distinct app count - workflows_in_use_map = {str(row.subscription_id): int(row.app_count) for row in usage_counts} + .filter( + WorkflowPluginTrigger.tenant_id == tenant_id, + WorkflowPluginTrigger.subscription_id.in_([s.id for s in subscriptions]), + ) + .group_by(WorkflowPluginTrigger.subscription_id) + .all() + ) + workflows_in_use_map = {str(row.subscription_id): int(row.app_count) for row in usage_counts} - # Process subscriptions and mask credentials provider_controller = TriggerManager.get_trigger_provider(tenant_id, provider_id) for subscription in subscriptions: encrypter, _ = create_trigger_provider_encrypter_for_subscription( @@ -91,7 +88,6 @@ class TriggerProviderService: subscription=subscription, ) subscription.credentials = encrypter.mask_credentials(subscription.credentials) - # Set workflows_in_use count, default to 0 if not found count = workflows_in_use_map.get(subscription.id) subscription.workflows_in_use = count if count is not None else 0 diff --git a/api/services/trigger_debug_service.py b/api/services/trigger_debug_service.py index 6902ca5fb4..c2a10b7e96 100644 --- a/api/services/trigger_debug_service.py +++ b/api/services/trigger_debug_service.py @@ -11,7 +11,6 @@ import time import uuid from collections.abc import Generator from dataclasses import dataclass -from typing import Any from flask import request from werkzeug.exceptions import NotFound @@ -19,11 +18,12 @@ from werkzeug.exceptions import NotFound from core.app.entities.task_entities import ( ErrorStreamResponse, PingStreamResponse, - TriggerDebugListeningStartedResponse, - TriggerDebugReceivedResponse, - TriggerDebugTimeoutResponse, + StreamResponse, + TriggerListeningStartedResponse, + TriggerListeningTimeoutResponse, + TriggerTriggeredResponse, ) -from core.trigger.entities.entities import TriggerDebugEventData +from core.trigger.entities.entities import TriggerEventData from extensions.ext_redis import redis_client from models.model import App from services.trigger.trigger_provider_service import TriggerProviderService @@ -92,7 +92,7 @@ class TriggerDebugService: if not node_config: raise NotFound(f"Node {node_id} not found") - if node_config.get("data", {}).get("type") != "plugin": + if node_config.get("data", {}).get("type") != "trigger-plugin": raise ValueError("Node is not a trigger plugin node") subscription_id = node_config.get("data", {}).get("subscription_id") @@ -134,7 +134,7 @@ class TriggerDebugService: node_id: str, user_id: str, timeout: int = __DEFAULT_LISTEN_TIMEOUT__, - ) -> Generator[dict[str, Any], None, None]: + ) -> Generator[StreamResponse, None, None]: """ Listen for trigger events only. @@ -165,7 +165,7 @@ class TriggerDebugService: yield event # If we received a trigger, listening is complete - if isinstance(event, dict) and event.get("event") == "trigger_debug_received": + if isinstance(event, dict) and event.get("event") == "trigger_triggered": break @classmethod @@ -209,7 +209,7 @@ class TriggerDebugService: @classmethod def listen_for_events( cls, session_id: str, webhook_url: str, timeout: int = __DEFAULT_LISTEN_TIMEOUT__ - ) -> Generator[dict[str, Any], None, None]: + ) -> Generator[StreamResponse, None, None]: """ Listen for events using Redis Pub/Sub and generate structured events. @@ -222,12 +222,12 @@ class TriggerDebugService: Structured AppQueueEvent objects """ # Send initial listening started event - yield TriggerDebugListeningStartedResponse( + yield TriggerListeningStartedResponse( task_id="", # Will be set by the caller if needed session_id=session_id, webhook_url=webhook_url, timeout=timeout, - ).to_dict() + ) pubsub = redis_client.pubsub() channel = f"{cls.PUBSUB_CHANNEL_PREFIX}{session_id}" @@ -251,39 +251,39 @@ class TriggerDebugService: logger.info("Received trigger event for session %s", session_id) # Create structured trigger received event - trigger_data = TriggerDebugEventData( + trigger_data = TriggerEventData( subscription_id=event_data["subscription_id"], triggers=event_data["triggers"], request_id=event_data["request_id"], timestamp=event_data.get("timestamp", time.time()), ) - yield TriggerDebugReceivedResponse( + yield TriggerTriggeredResponse( task_id="", subscription_id=trigger_data.subscription_id, triggers=trigger_data.triggers, request_id=trigger_data.request_id, timestamp=trigger_data.timestamp, - ).to_dict() + ) break # End listening after receiving event except (json.JSONDecodeError, KeyError) as e: logger.exception("Failed to parse trigger event") yield ErrorStreamResponse( task_id="", err=Exception(f"Failed to parse trigger event: {str(e)}") - ).to_dict() + ) break # Send periodic heartbeat if time.time() - last_heartbeat > 5: - yield PingStreamResponse(task_id="").to_dict() + yield PingStreamResponse(task_id="") last_heartbeat = time.time() # Timeout if time.time() - start_time >= timeout: - yield TriggerDebugTimeoutResponse(task_id="").to_dict() + yield TriggerListeningTimeoutResponse(task_id="") except Exception as e: logger.exception("Error in listen_for_events", exc_info=e) - yield ErrorStreamResponse(task_id="", err=e).to_dict() + yield ErrorStreamResponse(task_id="", err=e) finally: # Clean up resources @@ -320,7 +320,7 @@ class TriggerDebugService: logger.exception("Error closing session %s", session_id, exc_info=e) @classmethod - def dispatch_to_debug_sessions(cls, subscription_id: str, event_data: TriggerDebugEventData) -> int: + def dispatch_to_debug_sessions(cls, subscription_id: str, event_data: TriggerEventData) -> int: """ Dispatch events to debug sessions using Pub/Sub only. diff --git a/api/services/trigger_service.py b/api/services/trigger_service.py index b0c781fbc5..a44dcbb5bf 100644 --- a/api/services/trigger_service.py +++ b/api/services/trigger_service.py @@ -8,7 +8,7 @@ from sqlalchemy.orm import Session from core.plugin.entities.plugin import TriggerProviderID from core.plugin.utils.http_parser import serialize_request -from core.trigger.entities.entities import TriggerDebugEventData, TriggerEntity, TriggerInputs +from core.trigger.entities.entities import TriggerEntity, TriggerEventData, TriggerInputs from core.trigger.trigger_manager import TriggerManager from extensions.ext_database import db from extensions.ext_storage import storage @@ -133,7 +133,7 @@ class TriggerService: """ try: # Prepare streamlined event data using Pydantic model - debug_data = TriggerDebugEventData( + debug_data = TriggerEventData( subscription_id=subscription_id, triggers=triggers, request_id=request_id,