feat(trigger): refactor trigger debug event handling and improve response structures

- Renamed and refactored trigger debug event classes to enhance clarity and consistency, including renaming `TriggerDebugEventData` to `TriggerEventData` and updating the related queue-event and stream-response classes accordingly.
- Updated `DraftWorkflowTriggerNodeApi` and `DraftWorkflowTriggerRunApi` to utilize the new event structures, improving the handling of trigger events.
- Removed the `TriggerDebugEventGenerator` class, consolidating event generation directly within the API logic for streamlined processing.
- Enhanced error handling and response formatting for trigger events, yielding structured stream-response objects (e.g. `ErrorStreamResponse`, `TriggerNodeFinishedResponse`) instead of ad-hoc dicts for better integration and debugging.

This refactor improves the overall architecture of trigger debugging, making it more intuitive and maintainable.
This commit is contained in:
Harry 2025-09-11 16:55:38 +08:00
parent c3ebb22a4b
commit 57c0bc9fb6
9 changed files with 102 additions and 176 deletions

View File

@ -22,6 +22,7 @@ from controllers.web.error import InvokeRateLimitError as InvokeRateLimitHttpErr
from core.app.app_config.features.file_upload.manager import FileUploadConfigManager
from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.entities.task_entities import ErrorStreamResponse, TriggerNodeFinishedResponse, TriggerTriggeredResponse
from core.file.models import File
from core.helper.trace_id_helper import get_external_trace_id
from extensions.ext_database import db
@ -827,8 +828,7 @@ class DraftWorkflowTriggerNodeApi(Resource):
parser.add_argument("timeout", type=int, default=300, location="json")
args = parser.parse_args()
from core.trigger.entities.entities import TriggerDebugEventData, TriggerInputs
from services.trigger.trigger_debug_events import TriggerDebugEventGenerator
from core.trigger.entities.entities import TriggerEventData, TriggerInputs
from services.trigger_debug_service import TriggerDebugService
from services.workflow_service import WorkflowService
@ -841,16 +841,16 @@ class DraftWorkflowTriggerNodeApi(Resource):
user_id=current_user.id,
timeout=args.get("timeout", 300),
):
yield event # Pass through all listening events
yield event.to_dict() # Pass through all listening events
# Check if we received the trigger
if isinstance(event, dict) and event.get("event") == "trigger_debug_received":
if isinstance(event, TriggerTriggeredResponse):
# Save trigger data and exit listening loop
trigger_data = TriggerDebugEventData(
subscription_id=event["subscription_id"],
triggers=event["triggers"],
request_id=event["request_id"],
timestamp=event["timestamp"],
trigger_data = TriggerEventData(
subscription_id=event.subscription_id,
triggers=event.triggers,
request_id=event.request_id,
timestamp=event.timestamp,
)
break
@ -858,8 +858,6 @@ class DraftWorkflowTriggerNodeApi(Resource):
if trigger_data:
# Create trigger inputs
trigger_inputs = TriggerInputs.from_trigger_data(trigger_data)
event_generator = TriggerDebugEventGenerator()
try:
# Get workflow and execute node
workflow_service = WorkflowService()
@ -878,10 +876,20 @@ class DraftWorkflowTriggerNodeApi(Resource):
)
# Generate node finished event
yield event_generator.generate_node_finished(node_execution).to_dict()
yield TriggerNodeFinishedResponse(
task_id="",
id=node_execution.id,
node_id=node_execution.node_id,
node_type=node_execution.node_type,
status=node_execution.status,
outputs=node_execution.outputs_dict,
error=node_execution.error,
elapsed_time=node_execution.elapsed_time,
execution_metadata=node_execution.execution_metadata_dict,
).to_dict()
except Exception as e:
yield event_generator.generate_error(str(e)).to_dict()
yield ErrorStreamResponse(task_id="", err=e).to_dict()
# Use standard response format
from core.app.apps.base_app_generator import BaseAppGenerator
@ -912,9 +920,8 @@ class DraftWorkflowTriggerRunApi(Resource):
args = parser.parse_args()
from core.app.entities.app_invoke_entities import InvokeFrom
from core.trigger.entities.entities import TriggerDebugEventData, TriggerInputs
from core.trigger.entities.entities import TriggerEventData, TriggerInputs
from services.app_generate_service import AppGenerateService
from services.trigger.trigger_debug_events import TriggerDebugEventGenerator
from services.trigger_debug_service import TriggerDebugService
def generate(current_user: Account):
@ -926,26 +933,21 @@ class DraftWorkflowTriggerRunApi(Resource):
user_id=current_user.id,
timeout=args.get("timeout", 300),
):
yield event # Pass through all listening events
yield event.to_dict()
# Check if we received the trigger
if isinstance(event, dict) and event.get("event") == "trigger_debug_received":
if isinstance(event, TriggerTriggeredResponse):
# Save trigger data and exit listening loop
trigger_data = TriggerDebugEventData(
subscription_id=event["subscription_id"],
triggers=event["triggers"],
request_id=event["request_id"],
timestamp=event["timestamp"],
trigger_data = TriggerEventData(
subscription_id=event.subscription_id,
triggers=event.triggers,
request_id=event.request_id,
timestamp=event.timestamp,
)
break
# Phase 2: Execute workflow if trigger was received
if trigger_data:
event_generator = TriggerDebugEventGenerator()
# Yield workflow started event
yield event_generator.generate_workflow_started(trigger_data).to_dict()
# Create trigger inputs and convert to workflow args
trigger_inputs = TriggerInputs.from_trigger_data(trigger_data)
combined_args = trigger_inputs.to_workflow_args()
@ -964,7 +966,7 @@ class DraftWorkflowTriggerRunApi(Resource):
yield from workflow_response
except Exception as e:
yield event_generator.generate_error(str(e)).to_dict()
yield ErrorStreamResponse(task_id="", err=e).to_dict()
# Use standard response format
from core.app.apps.base_app_generator import BaseAppGenerator

View File

@ -51,12 +51,10 @@ class QueueEvent(StrEnum):
PING = "ping"
STOP = "stop"
RETRY = "retry"
# Trigger debug events
TRIGGER_DEBUG_LISTENING_STARTED = "trigger_debug_listening_started"
TRIGGER_DEBUG_RECEIVED = "trigger_debug_received"
TRIGGER_DEBUG_NODE_FINISHED = "trigger_debug_node_finished"
TRIGGER_DEBUG_WORKFLOW_STARTED = "trigger_debug_workflow_started"
TRIGGER_DEBUG_TIMEOUT = "trigger_debug_timeout"
TRIGGER_LISTENING_STARTED = "trigger_listening_started"
TRIGGER_TRIGGERED = "trigger_triggered"
TRIGGER_NODE_FINISHED = "trigger_node_finished"
TRIGGER_LISTENING_TIMEOUT = "trigger_listening_timeout"
class AppQueueEvent(BaseModel):
@ -724,35 +722,35 @@ class QueueParallelBranchRunSucceededEvent(AppQueueEvent):
"""loop id if node is in loop"""
class QueueTriggerDebugListeningStartedEvent(AppQueueEvent):
class QueueTriggerListeningStartedEvent(AppQueueEvent):
"""
QueueTriggerDebugListeningStartedEvent entity
QueueTriggerListeningStartedEvent entity
"""
event: QueueEvent = QueueEvent.TRIGGER_DEBUG_LISTENING_STARTED
event: QueueEvent = QueueEvent.TRIGGER_LISTENING_STARTED
session_id: str
webhook_url: str
timeout: int
class QueueTriggerDebugReceivedEvent(AppQueueEvent):
class QueueTriggerTriggeredEvent(AppQueueEvent):
"""
QueueTriggerDebugReceivedEvent entity
QueueTriggerTriggeredEvent entity
"""
event: QueueEvent = QueueEvent.TRIGGER_DEBUG_RECEIVED
event: QueueEvent = QueueEvent.TRIGGER_TRIGGERED
subscription_id: str
triggers: list[str]
request_id: str
timestamp: float
class QueueTriggerDebugNodeFinishedEvent(AppQueueEvent):
class QueueTriggerNodeFinishedEvent(AppQueueEvent):
"""
QueueTriggerDebugNodeFinishedEvent entity
QueueTriggerNodeFinishedEvent entity
"""
event: QueueEvent = QueueEvent.TRIGGER_DEBUG_NODE_FINISHED
event: QueueEvent = QueueEvent.TRIGGER_NODE_FINISHED
id: str
node_id: str
node_type: str
@ -763,23 +761,12 @@ class QueueTriggerDebugNodeFinishedEvent(AppQueueEvent):
execution_metadata: Optional[Mapping[str, Any]] = None
class QueueTriggerDebugWorkflowStartedEvent(AppQueueEvent):
class QueueTriggerListeningTimeoutEvent(AppQueueEvent):
"""
QueueTriggerDebugWorkflowStartedEvent entity
QueueTriggerListeningTimeoutEvent entity
"""
event: QueueEvent = QueueEvent.TRIGGER_DEBUG_WORKFLOW_STARTED
subscription_id: str
triggers: list[str]
request_id: str
class QueueTriggerDebugTimeoutEvent(AppQueueEvent):
"""
QueueTriggerDebugTimeoutEvent entity
"""
event: QueueEvent = QueueEvent.TRIGGER_DEBUG_TIMEOUT
event: QueueEvent = QueueEvent.TRIGGER_LISTENING_TIMEOUT
error: str = "Timeout waiting for trigger"

View File

@ -83,11 +83,10 @@ class StreamEvent(Enum):
TEXT_REPLACE = "text_replace"
AGENT_LOG = "agent_log"
# Trigger debug events
TRIGGER_DEBUG_LISTENING_STARTED = "trigger_debug_listening_started"
TRIGGER_DEBUG_RECEIVED = "trigger_debug_received"
TRIGGER_DEBUG_NODE_FINISHED = "trigger_debug_node_finished"
TRIGGER_DEBUG_WORKFLOW_STARTED = "trigger_debug_workflow_started"
TRIGGER_DEBUG_TIMEOUT = "trigger_debug_timeout"
TRIGGER_LISTENING_STARTED = "trigger_listening_started"
TRIGGER_LISTENING_TIMEOUT = "trigger_listening_timeout"
TRIGGER_TRIGGERED = "trigger_triggered"
TRIGGER_NODE_FINISHED = "trigger_node_finished"
class StreamResponse(BaseModel):
@ -846,35 +845,35 @@ class AgentLogStreamResponse(StreamResponse):
# Trigger Debug Stream Responses
class TriggerDebugListeningStartedResponse(StreamResponse):
class TriggerListeningStartedResponse(StreamResponse):
"""
TriggerDebugListeningStartedResponse entity
TriggerListeningStartedResponse entity
"""
event: StreamEvent = StreamEvent.TRIGGER_DEBUG_LISTENING_STARTED
event: StreamEvent = StreamEvent.TRIGGER_LISTENING_STARTED
session_id: str
webhook_url: str
timeout: int
class TriggerDebugReceivedResponse(StreamResponse):
class TriggerTriggeredResponse(StreamResponse):
"""
TriggerDebugReceivedResponse entity
TriggerTriggeredResponse entity
"""
event: StreamEvent = StreamEvent.TRIGGER_DEBUG_RECEIVED
event: StreamEvent = StreamEvent.TRIGGER_TRIGGERED
subscription_id: str
triggers: list[str]
request_id: str
timestamp: float
class TriggerDebugNodeFinishedResponse(StreamResponse):
class TriggerNodeFinishedResponse(StreamResponse):
"""
TriggerDebugNodeFinishedResponse entity
TriggerNodeFinishedResponse entity
"""
event: StreamEvent = StreamEvent.TRIGGER_DEBUG_NODE_FINISHED
event: StreamEvent = StreamEvent.TRIGGER_NODE_FINISHED
id: str
node_id: str
node_type: str
@ -885,21 +884,10 @@ class TriggerDebugNodeFinishedResponse(StreamResponse):
execution_metadata: Optional[Mapping[str, Any]] = None
class TriggerDebugWorkflowStartedResponse(StreamResponse):
class TriggerListeningTimeoutResponse(StreamResponse):
"""
TriggerDebugWorkflowStartedResponse entity
TriggerListeningTimeoutResponse entity
"""
event: StreamEvent = StreamEvent.TRIGGER_DEBUG_WORKFLOW_STARTED
subscription_id: str
triggers: list[str]
request_id: str
class TriggerDebugTimeoutResponse(StreamResponse):
"""
TriggerDebugTimeoutResponse entity
"""
event: StreamEvent = StreamEvent.TRIGGER_DEBUG_TIMEOUT
event: StreamEvent = StreamEvent.TRIGGER_LISTENING_TIMEOUT
error: str = "Timeout waiting for trigger"

View File

@ -251,8 +251,8 @@ class SubscriptionBuilderUpdater(BaseModel):
subscription_builder.expires_at = self.expires_at
class TriggerDebugEventData(BaseModel):
"""Debug event data dispatched to debug sessions."""
class TriggerEventData(BaseModel):
"""Event data dispatched to trigger sessions."""
subscription_id: str
triggers: list[str]
@ -270,7 +270,7 @@ class TriggerInputs(BaseModel):
subscription_id: str
@classmethod
def from_trigger_data(cls, trigger_data: TriggerDebugEventData) -> "TriggerInputs":
def from_trigger_data(cls, trigger_data: TriggerEventData) -> "TriggerInputs":
"""Create from debug event data."""
return cls(
request_id=trigger_data.request_id,
@ -298,9 +298,9 @@ __all__ = [
"RequestLog",
"Subscription",
"SubscriptionBuilder",
"TriggerDebugEventData",
"TriggerDescription",
"TriggerEntity",
"TriggerEventData",
"TriggerIdentity",
"TriggerInputs",
"TriggerParameter",

View File

@ -84,6 +84,7 @@ class TriggerSubscription(Base):
properties=self.properties,
credential_type=CredentialType(self.credential_type),
credentials=self.credentials,
workflows_in_use=-1,
)

View File

@ -1,48 +0,0 @@
"""
Event generator for trigger debug operations.
Provides structured event generation for trigger debug SSE streams.
"""
from core.app.entities.task_entities import (
ErrorStreamResponse,
StreamResponse,
TriggerDebugNodeFinishedResponse,
TriggerDebugWorkflowStartedResponse,
)
from core.trigger.entities.entities import TriggerDebugEventData
from models.workflow import WorkflowNodeExecutionModel
class TriggerDebugEventGenerator:
"""Generator for trigger debug events."""
@staticmethod
def generate_node_finished(node_execution: WorkflowNodeExecutionModel) -> StreamResponse:
"""Generate node finished event."""
return TriggerDebugNodeFinishedResponse(
task_id="",
id=node_execution.id,
node_id=node_execution.node_id,
node_type=node_execution.node_type,
status=node_execution.status,
outputs=node_execution.outputs_dict,
error=node_execution.error,
elapsed_time=node_execution.elapsed_time,
execution_metadata=node_execution.execution_metadata_dict,
)
@staticmethod
def generate_workflow_started(event_data: TriggerDebugEventData) -> StreamResponse:
"""Generate workflow started event."""
return TriggerDebugWorkflowStartedResponse(
task_id="",
subscription_id=event_data.subscription_id,
triggers=event_data.triggers,
request_id=event_data.request_id,
)
@staticmethod
def generate_error(error: str) -> StreamResponse:
"""Generate error event."""
return ErrorStreamResponse(task_id="", err=Exception(error))

View File

@ -64,25 +64,22 @@ class TriggerProviderService:
.all()
)
subscriptions = [subscription.to_api_entity() for subscription in subscriptions_db]
# Get distinct app count for each subscription
if subscriptions:
usage_counts = (
session.query(
WorkflowPluginTrigger.subscription_id,
func.count(func.distinct(WorkflowPluginTrigger.app_id)).label("app_count"),
)
.filter(
WorkflowPluginTrigger.tenant_id == tenant_id,
WorkflowPluginTrigger.subscription_id.in_([s.id for s in subscriptions]),
)
.group_by(WorkflowPluginTrigger.subscription_id)
.all()
if not subscriptions:
return []
usage_counts = (
session.query(
WorkflowPluginTrigger.subscription_id,
func.count(func.distinct(WorkflowPluginTrigger.app_id)).label("app_count"),
)
# Convert query result to dictionary: subscription_id -> distinct app count
workflows_in_use_map = {str(row.subscription_id): int(row.app_count) for row in usage_counts}
.filter(
WorkflowPluginTrigger.tenant_id == tenant_id,
WorkflowPluginTrigger.subscription_id.in_([s.id for s in subscriptions]),
)
.group_by(WorkflowPluginTrigger.subscription_id)
.all()
)
workflows_in_use_map = {str(row.subscription_id): int(row.app_count) for row in usage_counts}
# Process subscriptions and mask credentials
provider_controller = TriggerManager.get_trigger_provider(tenant_id, provider_id)
for subscription in subscriptions:
encrypter, _ = create_trigger_provider_encrypter_for_subscription(
@ -91,7 +88,6 @@ class TriggerProviderService:
subscription=subscription,
)
subscription.credentials = encrypter.mask_credentials(subscription.credentials)
# Set workflows_in_use count, default to 0 if not found
count = workflows_in_use_map.get(subscription.id)
subscription.workflows_in_use = count if count is not None else 0

View File

@ -11,7 +11,6 @@ import time
import uuid
from collections.abc import Generator
from dataclasses import dataclass
from typing import Any
from flask import request
from werkzeug.exceptions import NotFound
@ -19,11 +18,12 @@ from werkzeug.exceptions import NotFound
from core.app.entities.task_entities import (
ErrorStreamResponse,
PingStreamResponse,
TriggerDebugListeningStartedResponse,
TriggerDebugReceivedResponse,
TriggerDebugTimeoutResponse,
StreamResponse,
TriggerListeningStartedResponse,
TriggerListeningTimeoutResponse,
TriggerTriggeredResponse,
)
from core.trigger.entities.entities import TriggerDebugEventData
from core.trigger.entities.entities import TriggerEventData
from extensions.ext_redis import redis_client
from models.model import App
from services.trigger.trigger_provider_service import TriggerProviderService
@ -92,7 +92,7 @@ class TriggerDebugService:
if not node_config:
raise NotFound(f"Node {node_id} not found")
if node_config.get("data", {}).get("type") != "plugin":
if node_config.get("data", {}).get("type") != "trigger-plugin":
raise ValueError("Node is not a trigger plugin node")
subscription_id = node_config.get("data", {}).get("subscription_id")
@ -134,7 +134,7 @@ class TriggerDebugService:
node_id: str,
user_id: str,
timeout: int = __DEFAULT_LISTEN_TIMEOUT__,
) -> Generator[dict[str, Any], None, None]:
) -> Generator[StreamResponse, None, None]:
"""
Listen for trigger events only.
@ -165,7 +165,7 @@ class TriggerDebugService:
yield event
# If we received a trigger, listening is complete
if isinstance(event, dict) and event.get("event") == "trigger_debug_received":
if isinstance(event, dict) and event.get("event") == "trigger_triggered":
break
@classmethod
@ -209,7 +209,7 @@ class TriggerDebugService:
@classmethod
def listen_for_events(
cls, session_id: str, webhook_url: str, timeout: int = __DEFAULT_LISTEN_TIMEOUT__
) -> Generator[dict[str, Any], None, None]:
) -> Generator[StreamResponse, None, None]:
"""
Listen for events using Redis Pub/Sub and generate structured events.
@ -222,12 +222,12 @@ class TriggerDebugService:
Structured AppQueueEvent objects
"""
# Send initial listening started event
yield TriggerDebugListeningStartedResponse(
yield TriggerListeningStartedResponse(
task_id="", # Will be set by the caller if needed
session_id=session_id,
webhook_url=webhook_url,
timeout=timeout,
).to_dict()
)
pubsub = redis_client.pubsub()
channel = f"{cls.PUBSUB_CHANNEL_PREFIX}{session_id}"
@ -251,39 +251,39 @@ class TriggerDebugService:
logger.info("Received trigger event for session %s", session_id)
# Create structured trigger received event
trigger_data = TriggerDebugEventData(
trigger_data = TriggerEventData(
subscription_id=event_data["subscription_id"],
triggers=event_data["triggers"],
request_id=event_data["request_id"],
timestamp=event_data.get("timestamp", time.time()),
)
yield TriggerDebugReceivedResponse(
yield TriggerTriggeredResponse(
task_id="",
subscription_id=trigger_data.subscription_id,
triggers=trigger_data.triggers,
request_id=trigger_data.request_id,
timestamp=trigger_data.timestamp,
).to_dict()
)
break # End listening after receiving event
except (json.JSONDecodeError, KeyError) as e:
logger.exception("Failed to parse trigger event")
yield ErrorStreamResponse(
task_id="", err=Exception(f"Failed to parse trigger event: {str(e)}")
).to_dict()
)
break
# Send periodic heartbeat
if time.time() - last_heartbeat > 5:
yield PingStreamResponse(task_id="").to_dict()
yield PingStreamResponse(task_id="")
last_heartbeat = time.time()
# Timeout
if time.time() - start_time >= timeout:
yield TriggerDebugTimeoutResponse(task_id="").to_dict()
yield TriggerListeningTimeoutResponse(task_id="")
except Exception as e:
logger.exception("Error in listen_for_events", exc_info=e)
yield ErrorStreamResponse(task_id="", err=e).to_dict()
yield ErrorStreamResponse(task_id="", err=e)
finally:
# Clean up resources
@ -320,7 +320,7 @@ class TriggerDebugService:
logger.exception("Error closing session %s", session_id, exc_info=e)
@classmethod
def dispatch_to_debug_sessions(cls, subscription_id: str, event_data: TriggerDebugEventData) -> int:
def dispatch_to_debug_sessions(cls, subscription_id: str, event_data: TriggerEventData) -> int:
"""
Dispatch events to debug sessions using Pub/Sub only.

View File

@ -8,7 +8,7 @@ from sqlalchemy.orm import Session
from core.plugin.entities.plugin import TriggerProviderID
from core.plugin.utils.http_parser import serialize_request
from core.trigger.entities.entities import TriggerDebugEventData, TriggerEntity, TriggerInputs
from core.trigger.entities.entities import TriggerEntity, TriggerEventData, TriggerInputs
from core.trigger.trigger_manager import TriggerManager
from extensions.ext_database import db
from extensions.ext_storage import storage
@ -133,7 +133,7 @@ class TriggerService:
"""
try:
# Prepare streamlined event data using Pydantic model
debug_data = TriggerDebugEventData(
debug_data = TriggerEventData(
subscription_id=subscription_id,
triggers=triggers,
request_id=request_id,