diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index bc910ce5cb..ab087bdf44 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -22,6 +22,7 @@ from controllers.web.error import InvokeRateLimitError as InvokeRateLimitHttpErr from core.app.app_config.features.file_upload.manager import FileUploadConfigManager from core.app.apps.base_app_queue_manager import AppQueueManager from core.app.entities.app_invoke_entities import InvokeFrom +from core.app.entities.task_entities import ErrorStreamResponse, TriggerNodeFinishedResponse, TriggerTriggeredResponse from core.file.models import File from core.helper.trace_id_helper import get_external_trace_id from extensions.ext_database import db @@ -827,8 +828,7 @@ class DraftWorkflowTriggerNodeApi(Resource): parser.add_argument("timeout", type=int, default=300, location="json") args = parser.parse_args() - from core.trigger.entities.entities import TriggerDebugEventData, TriggerInputs - from services.trigger.trigger_debug_events import TriggerDebugEventGenerator + from core.trigger.entities.entities import TriggerEventData, TriggerInputs from services.trigger_debug_service import TriggerDebugService from services.workflow_service import WorkflowService @@ -841,16 +841,16 @@ class DraftWorkflowTriggerNodeApi(Resource): user_id=current_user.id, timeout=args.get("timeout", 300), ): - yield event # Pass through all listening events + yield event.to_dict() # Pass through all listening events # Check if we received the trigger - if isinstance(event, dict) and event.get("event") == "trigger_debug_received": + if isinstance(event, TriggerTriggeredResponse): # Save trigger data and exit listening loop - trigger_data = TriggerDebugEventData( - subscription_id=event["subscription_id"], - triggers=event["triggers"], - request_id=event["request_id"], - timestamp=event["timestamp"], + trigger_data = TriggerEventData( + subscription_id=event.subscription_id, + triggers=event.triggers, + request_id=event.request_id, + timestamp=event.timestamp, ) break @@ -858,8 +858,6 @@ class DraftWorkflowTriggerNodeApi(Resource): if trigger_data: # Create trigger inputs trigger_inputs = TriggerInputs.from_trigger_data(trigger_data) - event_generator = TriggerDebugEventGenerator() - try: # Get workflow and execute node workflow_service = WorkflowService() @@ -878,10 +876,20 @@ class DraftWorkflowTriggerNodeApi(Resource): ) # Generate node finished event - yield event_generator.generate_node_finished(node_execution).to_dict() + yield TriggerNodeFinishedResponse( + task_id="", + id=node_execution.id, + node_id=node_execution.node_id, + node_type=node_execution.node_type, + status=node_execution.status, + outputs=node_execution.outputs_dict, + error=node_execution.error, + elapsed_time=node_execution.elapsed_time, + execution_metadata=node_execution.execution_metadata_dict, + ).to_dict() except Exception as e: - yield event_generator.generate_error(str(e)).to_dict() + yield ErrorStreamResponse(task_id="", err=e).to_dict() # Use standard response format from core.app.apps.base_app_generator import BaseAppGenerator @@ -912,9 +920,8 @@ class DraftWorkflowTriggerRunApi(Resource): args = parser.parse_args() from core.app.entities.app_invoke_entities import InvokeFrom - from core.trigger.entities.entities import TriggerDebugEventData, TriggerInputs + from core.trigger.entities.entities import TriggerEventData, TriggerInputs from services.app_generate_service import AppGenerateService - from services.trigger.trigger_debug_events import TriggerDebugEventGenerator from services.trigger_debug_service import TriggerDebugService def generate(current_user: Account): @@ -926,26 +933,21 @@ class DraftWorkflowTriggerRunApi(Resource): user_id=current_user.id, timeout=args.get("timeout", 300), ): - yield event # Pass through all listening events + yield event.to_dict() # Check if we received the trigger - if isinstance(event, dict) and event.get("event") == "trigger_debug_received": + if isinstance(event, TriggerTriggeredResponse): # Save trigger data and exit listening loop - trigger_data = TriggerDebugEventData( - subscription_id=event["subscription_id"], - triggers=event["triggers"], - request_id=event["request_id"], - timestamp=event["timestamp"], + trigger_data = TriggerEventData( + subscription_id=event.subscription_id, + triggers=event.triggers, + request_id=event.request_id, + timestamp=event.timestamp, ) break # Phase 2: Execute workflow if trigger was received if trigger_data: - event_generator = TriggerDebugEventGenerator() - - # Yield workflow started event - yield event_generator.generate_workflow_started(trigger_data).to_dict() - # Create trigger inputs and convert to workflow args trigger_inputs = TriggerInputs.from_trigger_data(trigger_data) combined_args = trigger_inputs.to_workflow_args() @@ -964,7 +966,7 @@ class DraftWorkflowTriggerRunApi(Resource): yield from workflow_response except Exception as e: - yield event_generator.generate_error(str(e)).to_dict() + yield ErrorStreamResponse(task_id="", err=e).to_dict() # Use standard response format from core.app.apps.base_app_generator import BaseAppGenerator diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py index 583463ffbb..730f71beda 100644 --- a/api/core/app/entities/queue_entities.py +++ b/api/core/app/entities/queue_entities.py @@ -51,12 +51,10 @@ class QueueEvent(StrEnum): PING = "ping" STOP = "stop" RETRY = "retry" - # Trigger debug events - TRIGGER_DEBUG_LISTENING_STARTED = "trigger_debug_listening_started" - TRIGGER_DEBUG_RECEIVED = "trigger_debug_received" - TRIGGER_DEBUG_NODE_FINISHED = "trigger_debug_node_finished" - TRIGGER_DEBUG_WORKFLOW_STARTED = "trigger_debug_workflow_started" - TRIGGER_DEBUG_TIMEOUT = "trigger_debug_timeout" + TRIGGER_LISTENING_STARTED = "trigger_listening_started" + TRIGGER_TRIGGERED = "trigger_triggered" + TRIGGER_NODE_FINISHED = "trigger_node_finished" + TRIGGER_LISTENING_TIMEOUT = "trigger_listening_timeout" class AppQueueEvent(BaseModel): @@ -724,35 +722,35 @@ class QueueParallelBranchRunSucceededEvent(AppQueueEvent): """loop id if node is in loop""" -class QueueTriggerDebugListeningStartedEvent(AppQueueEvent): +class QueueTriggerListeningStartedEvent(AppQueueEvent): """ - QueueTriggerDebugListeningStartedEvent entity + QueueTriggerListeningStartedEvent entity """ - event: QueueEvent = QueueEvent.TRIGGER_DEBUG_LISTENING_STARTED + event: QueueEvent = QueueEvent.TRIGGER_LISTENING_STARTED session_id: str webhook_url: str timeout: int -class QueueTriggerDebugReceivedEvent(AppQueueEvent): +class QueueTriggerTriggeredEvent(AppQueueEvent): """ - QueueTriggerDebugReceivedEvent entity + QueueTriggerTriggeredEvent entity """ - event: QueueEvent = QueueEvent.TRIGGER_DEBUG_RECEIVED + event: QueueEvent = QueueEvent.TRIGGER_TRIGGERED subscription_id: str triggers: list[str] request_id: str timestamp: float -class QueueTriggerDebugNodeFinishedEvent(AppQueueEvent): +class QueueTriggerNodeFinishedEvent(AppQueueEvent): """ - QueueTriggerDebugNodeFinishedEvent entity + QueueTriggerNodeFinishedEvent entity """ - event: QueueEvent = QueueEvent.TRIGGER_DEBUG_NODE_FINISHED + event: QueueEvent = QueueEvent.TRIGGER_NODE_FINISHED id: str node_id: str node_type: str @@ -763,23 +761,12 @@ class QueueTriggerDebugNodeFinishedEvent(AppQueueEvent): execution_metadata: Optional[Mapping[str, Any]] = None -class QueueTriggerDebugWorkflowStartedEvent(AppQueueEvent): +class QueueTriggerListeningTimeoutEvent(AppQueueEvent): """ - QueueTriggerDebugWorkflowStartedEvent entity + QueueTriggerListeningTimeoutEvent entity """ - event: QueueEvent = QueueEvent.TRIGGER_DEBUG_WORKFLOW_STARTED - subscription_id: str - triggers: list[str] - request_id: str - - -class QueueTriggerDebugTimeoutEvent(AppQueueEvent): - """ - QueueTriggerDebugTimeoutEvent entity - """ - - event: QueueEvent = QueueEvent.TRIGGER_DEBUG_TIMEOUT + event: QueueEvent = QueueEvent.TRIGGER_LISTENING_TIMEOUT error: str = "Timeout waiting for trigger" diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index e21f84e77c..3921e5dbde 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -83,11 +83,10 @@ class StreamEvent(Enum): TEXT_REPLACE = "text_replace" AGENT_LOG = "agent_log" # Trigger debug events - TRIGGER_DEBUG_LISTENING_STARTED = "trigger_debug_listening_started" - TRIGGER_DEBUG_RECEIVED = "trigger_debug_received" - TRIGGER_DEBUG_NODE_FINISHED = "trigger_debug_node_finished" - TRIGGER_DEBUG_WORKFLOW_STARTED = "trigger_debug_workflow_started" - TRIGGER_DEBUG_TIMEOUT = "trigger_debug_timeout" + TRIGGER_LISTENING_STARTED = "trigger_listening_started" + TRIGGER_LISTENING_TIMEOUT = "trigger_listening_timeout" + TRIGGER_TRIGGERED = "trigger_triggered" + TRIGGER_NODE_FINISHED = "trigger_node_finished" class StreamResponse(BaseModel): @@ -846,35 +845,35 @@ class AgentLogStreamResponse(StreamResponse): # Trigger Debug Stream Responses -class TriggerDebugListeningStartedResponse(StreamResponse): +class TriggerListeningStartedResponse(StreamResponse): """ - TriggerDebugListeningStartedResponse entity + TriggerListeningStartedResponse entity """ - event: StreamEvent = StreamEvent.TRIGGER_DEBUG_LISTENING_STARTED + event: StreamEvent = StreamEvent.TRIGGER_LISTENING_STARTED session_id: str webhook_url: str timeout: int -class TriggerDebugReceivedResponse(StreamResponse): +class TriggerTriggeredResponse(StreamResponse): """ - TriggerDebugReceivedResponse entity + TriggerTriggeredResponse entity """ - event: StreamEvent = StreamEvent.TRIGGER_DEBUG_RECEIVED + event: StreamEvent = StreamEvent.TRIGGER_TRIGGERED subscription_id: str triggers: list[str] request_id: str timestamp: float -class TriggerDebugNodeFinishedResponse(StreamResponse): +class TriggerNodeFinishedResponse(StreamResponse): """ - TriggerDebugNodeFinishedResponse entity + TriggerNodeFinishedResponse entity """ - event: StreamEvent = StreamEvent.TRIGGER_DEBUG_NODE_FINISHED + event: StreamEvent = StreamEvent.TRIGGER_NODE_FINISHED id: str node_id: str node_type: str @@ -885,21 +884,10 @@ class TriggerDebugNodeFinishedResponse(StreamResponse): execution_metadata: Optional[Mapping[str, Any]] = None -class TriggerDebugWorkflowStartedResponse(StreamResponse): +class TriggerListeningTimeoutResponse(StreamResponse): """ - TriggerDebugWorkflowStartedResponse entity + TriggerListeningTimeoutResponse entity """ - event: StreamEvent = StreamEvent.TRIGGER_DEBUG_WORKFLOW_STARTED - subscription_id: str - triggers: list[str] - request_id: str - - -class TriggerDebugTimeoutResponse(StreamResponse): - """ - TriggerDebugTimeoutResponse entity - """ - - event: StreamEvent = StreamEvent.TRIGGER_DEBUG_TIMEOUT + event: StreamEvent = StreamEvent.TRIGGER_LISTENING_TIMEOUT error: str = "Timeout waiting for trigger" diff --git a/api/core/trigger/entities/entities.py b/api/core/trigger/entities/entities.py index c54bf6ebaa..6010028553 100644 --- a/api/core/trigger/entities/entities.py +++ b/api/core/trigger/entities/entities.py @@ -251,8 +251,8 @@ class SubscriptionBuilderUpdater(BaseModel): subscription_builder.expires_at = self.expires_at -class TriggerDebugEventData(BaseModel): - """Debug event data dispatched to debug sessions.""" +class TriggerEventData(BaseModel): + """Event data dispatched to trigger sessions.""" subscription_id: str triggers: list[str] @@ -270,7 +270,7 @@ class TriggerInputs(BaseModel): subscription_id: str @classmethod - def from_trigger_data(cls, trigger_data: TriggerDebugEventData) -> "TriggerInputs": + def from_trigger_data(cls, trigger_data: TriggerEventData) -> "TriggerInputs": """Create from debug event data.""" return cls( request_id=trigger_data.request_id, @@ -298,9 +298,9 @@ __all__ = [ "RequestLog", "Subscription", "SubscriptionBuilder", - "TriggerDebugEventData", "TriggerDescription", "TriggerEntity", + "TriggerEventData", "TriggerIdentity", "TriggerInputs", "TriggerParameter", diff --git a/api/models/trigger.py b/api/models/trigger.py index d03a8c91a7..08dc53d82f 100644 --- a/api/models/trigger.py +++ b/api/models/trigger.py @@ -84,6 +84,7 @@ class TriggerSubscription(Base): properties=self.properties, credential_type=CredentialType(self.credential_type), credentials=self.credentials, + workflows_in_use=-1, ) diff --git a/api/services/trigger/trigger_debug_events.py b/api/services/trigger/trigger_debug_events.py deleted file mode 100644 index d5d083f9e5..0000000000 --- a/api/services/trigger/trigger_debug_events.py +++ /dev/null @@ -1,48 +0,0 @@ -""" -Event generator for trigger debug operations. - -Provides structured event generation for trigger debug SSE streams. -""" - -from core.app.entities.task_entities import ( - ErrorStreamResponse, - StreamResponse, - TriggerDebugNodeFinishedResponse, - TriggerDebugWorkflowStartedResponse, -) -from core.trigger.entities.entities import TriggerDebugEventData -from models.workflow import WorkflowNodeExecutionModel - - -class TriggerDebugEventGenerator: - """Generator for trigger debug events.""" - - @staticmethod - def generate_node_finished(node_execution: WorkflowNodeExecutionModel) -> StreamResponse: - """Generate node finished event.""" - return TriggerDebugNodeFinishedResponse( - task_id="", - id=node_execution.id, - node_id=node_execution.node_id, - node_type=node_execution.node_type, - status=node_execution.status, - outputs=node_execution.outputs_dict, - error=node_execution.error, - elapsed_time=node_execution.elapsed_time, - execution_metadata=node_execution.execution_metadata_dict, - ) - - @staticmethod - def generate_workflow_started(event_data: TriggerDebugEventData) -> StreamResponse: - """Generate workflow started event.""" - return TriggerDebugWorkflowStartedResponse( - task_id="", - subscription_id=event_data.subscription_id, - triggers=event_data.triggers, - request_id=event_data.request_id, - ) - - @staticmethod - def generate_error(error: str) -> StreamResponse: - """Generate error event.""" - return ErrorStreamResponse(task_id="", err=Exception(error)) diff --git a/api/services/trigger/trigger_provider_service.py b/api/services/trigger/trigger_provider_service.py index c0cdcba204..92733b1274 100644 --- a/api/services/trigger/trigger_provider_service.py +++ b/api/services/trigger/trigger_provider_service.py @@ -64,25 +64,22 @@ class TriggerProviderService: .all() ) subscriptions = [subscription.to_api_entity() for subscription in subscriptions_db] - - # Get distinct app count for each subscription - if subscriptions: - usage_counts = ( - session.query( - WorkflowPluginTrigger.subscription_id, - func.count(func.distinct(WorkflowPluginTrigger.app_id)).label("app_count"), - ) - .filter( - WorkflowPluginTrigger.tenant_id == tenant_id, - WorkflowPluginTrigger.subscription_id.in_([s.id for s in subscriptions]), - ) - .group_by(WorkflowPluginTrigger.subscription_id) - .all() + if not subscriptions: + return [] + usage_counts = ( + session.query( + WorkflowPluginTrigger.subscription_id, + func.count(func.distinct(WorkflowPluginTrigger.app_id)).label("app_count"), ) - # Convert query result to dictionary: subscription_id -> distinct app count - workflows_in_use_map = {str(row.subscription_id): int(row.app_count) for row in usage_counts} + .filter( + WorkflowPluginTrigger.tenant_id == tenant_id, + WorkflowPluginTrigger.subscription_id.in_([s.id for s in subscriptions]), + ) + .group_by(WorkflowPluginTrigger.subscription_id) + .all() + ) + workflows_in_use_map = {str(row.subscription_id): int(row.app_count) for row in usage_counts} - # Process subscriptions and mask credentials provider_controller = TriggerManager.get_trigger_provider(tenant_id, provider_id) for subscription in subscriptions: encrypter, _ = create_trigger_provider_encrypter_for_subscription( @@ -91,7 +88,6 @@ class TriggerProviderService: subscription=subscription, ) subscription.credentials = encrypter.mask_credentials(subscription.credentials) - # Set workflows_in_use count, default to 0 if not found count = workflows_in_use_map.get(subscription.id) subscription.workflows_in_use = count if count is not None else 0 diff --git a/api/services/trigger_debug_service.py b/api/services/trigger_debug_service.py index 6902ca5fb4..c2a10b7e96 100644 --- a/api/services/trigger_debug_service.py +++ b/api/services/trigger_debug_service.py @@ -11,7 +11,6 @@ import time import uuid from collections.abc import Generator from dataclasses import dataclass -from typing import Any from flask import request from werkzeug.exceptions import NotFound @@ -19,11 +18,12 @@ from werkzeug.exceptions import NotFound from core.app.entities.task_entities import ( ErrorStreamResponse, PingStreamResponse, - TriggerDebugListeningStartedResponse, - TriggerDebugReceivedResponse, - TriggerDebugTimeoutResponse, + StreamResponse, + TriggerListeningStartedResponse, + TriggerListeningTimeoutResponse, + TriggerTriggeredResponse, ) -from core.trigger.entities.entities import TriggerDebugEventData +from core.trigger.entities.entities import TriggerEventData from extensions.ext_redis import redis_client from models.model import App from services.trigger.trigger_provider_service import TriggerProviderService @@ -92,7 +92,7 @@ class TriggerDebugService: if not node_config: raise NotFound(f"Node {node_id} not found") - if node_config.get("data", {}).get("type") != "plugin": + if node_config.get("data", {}).get("type") != "trigger-plugin": raise ValueError("Node is not a trigger plugin node") subscription_id = node_config.get("data", {}).get("subscription_id") @@ -134,7 +134,7 @@ class TriggerDebugService: node_id: str, user_id: str, timeout: int = __DEFAULT_LISTEN_TIMEOUT__, - ) -> Generator[dict[str, Any], None, None]: + ) -> Generator[StreamResponse, None, None]: """ Listen for trigger events only. @@ -165,7 +165,7 @@ class TriggerDebugService: yield event # If we received a trigger, listening is complete - if isinstance(event, dict) and event.get("event") == "trigger_debug_received": + if isinstance(event, dict) and event.get("event") == "trigger_triggered": break @classmethod @@ -209,7 +209,7 @@ class TriggerDebugService: @classmethod def listen_for_events( cls, session_id: str, webhook_url: str, timeout: int = __DEFAULT_LISTEN_TIMEOUT__ - ) -> Generator[dict[str, Any], None, None]: + ) -> Generator[StreamResponse, None, None]: """ Listen for events using Redis Pub/Sub and generate structured events. @@ -222,12 +222,12 @@ class TriggerDebugService: Structured AppQueueEvent objects """ # Send initial listening started event - yield TriggerDebugListeningStartedResponse( + yield TriggerListeningStartedResponse( task_id="", # Will be set by the caller if needed session_id=session_id, webhook_url=webhook_url, timeout=timeout, - ).to_dict() + ) pubsub = redis_client.pubsub() channel = f"{cls.PUBSUB_CHANNEL_PREFIX}{session_id}" @@ -251,39 +251,39 @@ class TriggerDebugService: logger.info("Received trigger event for session %s", session_id) # Create structured trigger received event - trigger_data = TriggerDebugEventData( + trigger_data = TriggerEventData( subscription_id=event_data["subscription_id"], triggers=event_data["triggers"], request_id=event_data["request_id"], timestamp=event_data.get("timestamp", time.time()), ) - yield TriggerDebugReceivedResponse( + yield TriggerTriggeredResponse( task_id="", subscription_id=trigger_data.subscription_id, triggers=trigger_data.triggers, request_id=trigger_data.request_id, timestamp=trigger_data.timestamp, - ).to_dict() + ) break # End listening after receiving event except (json.JSONDecodeError, KeyError) as e: logger.exception("Failed to parse trigger event") yield ErrorStreamResponse( task_id="", err=Exception(f"Failed to parse trigger event: {str(e)}") - ).to_dict() + ) break # Send periodic heartbeat if time.time() - last_heartbeat > 5: - yield PingStreamResponse(task_id="").to_dict() + yield PingStreamResponse(task_id="") last_heartbeat = time.time() # Timeout if time.time() - start_time >= timeout: - yield TriggerDebugTimeoutResponse(task_id="").to_dict() + yield TriggerListeningTimeoutResponse(task_id="") except Exception as e: logger.exception("Error in listen_for_events", exc_info=e) - yield ErrorStreamResponse(task_id="", err=e).to_dict() + yield ErrorStreamResponse(task_id="", err=e) finally: # Clean up resources @@ -320,7 +320,7 @@ class TriggerDebugService: logger.exception("Error closing session %s", session_id, exc_info=e) @classmethod - def dispatch_to_debug_sessions(cls, subscription_id: str, event_data: TriggerDebugEventData) -> int: + def dispatch_to_debug_sessions(cls, subscription_id: str, event_data: TriggerEventData) -> int: """ Dispatch events to debug sessions using Pub/Sub only. diff --git a/api/services/trigger_service.py b/api/services/trigger_service.py index b0c781fbc5..a44dcbb5bf 100644 --- a/api/services/trigger_service.py +++ b/api/services/trigger_service.py @@ -8,7 +8,7 @@ from sqlalchemy.orm import Session from core.plugin.entities.plugin import TriggerProviderID from core.plugin.utils.http_parser import serialize_request -from core.trigger.entities.entities import TriggerDebugEventData, TriggerEntity, TriggerInputs +from core.trigger.entities.entities import TriggerEntity, TriggerEventData, TriggerInputs from core.trigger.trigger_manager import TriggerManager from extensions.ext_database import db from extensions.ext_storage import storage @@ -133,7 +133,7 @@ class TriggerService: """ try: # Prepare streamlined event data using Pydantic model - debug_data = TriggerDebugEventData( + debug_data = TriggerEventData( subscription_id=subscription_id, triggers=triggers, request_id=request_id,