feat(trigger): implement debug session capabilities for trigger nodes

- Added `DraftWorkflowTriggerNodeApi` to handle debugging of trigger nodes, allowing for real-time event listening and session management.
- Introduced `TriggerDebugService` for managing debug sessions and event dispatching using Redis Pub/Sub.
- Updated `TriggerService` to dispatch events to debug sessions; renamed `process_triggered_workflows` to `dispatch_triggered_workflows`, which now returns the number of workflows dispatched.
- Extended the data structures in `request.py` and `entities.py` with a typed `Event` model and the new `TriggerDebugEventData` entity for debug event payloads.

Together, these changes give trigger nodes in draft workflows an end-to-end debug path: a developer opens a debug session, fires the external webhook, and watches the resulting events stream back in real time.
Harry 2025-09-09 21:27:31 +08:00
parent e8403977b9
commit 5a15419baf
8 changed files with 420 additions and 92 deletions

View File

@@ -806,6 +806,72 @@ class DraftWorkflowNodeLastRunApi(Resource):
         return node_exec


+class DraftWorkflowTriggerNodeApi(Resource):
+    @setup_required
+    @login_required
+    @account_initialization_required
+    @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
+    def post(self, app_model: App, node_id: str):
+        """
+        Debug trigger node by creating a debug session and listening for events.
+        """
+        srv = WorkflowService()
+        workflow = srv.get_draft_workflow(app_model)
+        if not workflow:
+            raise NotFound("Workflow not found")
+
+        # Get node configuration
+        node_config = workflow.get_node_config_by_id(node_id)
+        if not node_config:
+            raise NotFound(f"Node {node_id} not found in workflow")
+
+        # Validate it's a trigger plugin node
+        if node_config.get("data", {}).get("type") != "plugin":
+            raise ValueError("Node is not a trigger plugin node")
+
+        # Get subscription ID from node config
+        subscription_id = node_config.get("data", {}).get("subscription_id")
+        if not subscription_id:
+            raise ValueError("No subscription ID configured for this trigger node")
+
+        # Create debug session
+        from services.trigger_debug_service import TriggerDebugService
+
+        assert isinstance(current_user, Account)
+        session_id = TriggerDebugService.create_debug_session(
+            app_id=str(app_model.id),
+            node_id=node_id,
+            subscription_id=subscription_id,
+            user_id=current_user.id,
+            timeout=300,
+        )
+
+        # Stream events to the client as server-sent events
+        def generate():
+            for event_data in TriggerDebugService.listen_for_events(session_id):
+                if isinstance(event_data, dict):
+                    if event_data.get("type") == "heartbeat":
+                        yield f"event: heartbeat\ndata: {json.dumps(event_data)}\n\n"
+                    elif event_data.get("type") == "timeout":
+                        yield f"event: timeout\ndata: {json.dumps({'message': 'Session timed out'})}\n\n"
+                        break
+                    elif event_data.get("type") == "error":
+                        yield f"event: error\ndata: {json.dumps(event_data)}\n\n"
+                        break
+                    else:
+                        # Trigger event - prepare for workflow execution
+                        yield f"event: trigger\ndata: {json.dumps(event_data)}\n\n"
+                        # TODO: Execute workflow with trigger data if needed
+                        # This would involve extracting trigger data and running the workflow
+                        # For now, just send the trigger event
+                        break
+
+        from flask import Response
+
+        return Response(generate(), mimetype="text/event-stream")
+
+
 api.add_resource(
     DraftWorkflowApi,
     "/apps/<uuid:app_id>/workflows/draft",
@@ -830,6 +896,10 @@ api.add_resource(
     DraftWorkflowNodeRunApi,
     "/apps/<uuid:app_id>/workflows/draft/nodes/<string:node_id>/run",
 )
+api.add_resource(
+    DraftWorkflowTriggerNodeApi,
+    "/apps/<uuid:app_id>/workflows/draft/nodes/<string:node_id>/trigger",
+)
 api.add_resource(
     AdvancedChatDraftRunIterationNodeApi,
     "/apps/<uuid:app_id>/advanced-chat/workflows/draft/iteration/nodes/<string:node_id>/run",

View File

@@ -1,3 +1,4 @@
+from collections.abc import Mapping
 from typing import Any, Literal, Optional

 from flask import Response
@@ -239,9 +240,11 @@ class RequestFetchAppInfo(BaseModel):
     app_id: str


+class Event(BaseModel):
+    variables: Mapping[str, Any]
+
+
 class TriggerInvokeResponse(BaseModel):
-    event: dict[str, Any]
+    event: Event


 class PluginTriggerDispatchResponse(BaseModel):
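
The effect of the change above is that the trigger event payload is now validated rather than passed around as an untyped dict; a standalone sketch mirroring the two models, with illustrative values:

from collections.abc import Mapping
from typing import Any

from pydantic import BaseModel


class Event(BaseModel):
    variables: Mapping[str, Any]


class TriggerInvokeResponse(BaseModel):
    event: Event


resp = TriggerInvokeResponse.model_validate({"event": {"variables": {"issue_id": 42}}})
assert resp.event.variables["issue_id"] == 42  # downstream code can rely on .variables existing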

View File

@@ -3,7 +3,7 @@ from datetime import datetime
 from enum import StrEnum
 from typing import Any, Optional, Union

-from pydantic import BaseModel, Field
+from pydantic import BaseModel, ConfigDict, Field

 from core.entities.provider_entities import ProviderConfig
 from core.plugin.entities.parameters import PluginParameterAutoGenerate, PluginParameterOption, PluginParameterTemplate
@@ -251,12 +251,24 @@ class SubscriptionBuilderUpdater(BaseModel):
         subscription_builder.expires_at = self.expires_at


+class TriggerDebugEventData(BaseModel):
+    """Debug event data dispatched to debug sessions."""
+
+    subscription_id: str
+    triggers: list[str]
+    request_id: str
+    timestamp: float
+
+    model_config = ConfigDict(arbitrary_types_allowed=True)
+
+
 # Export all entities
 __all__ = [
     "OAuthSchema",
     "RequestLog",
     "Subscription",
     "SubscriptionBuilder",
+    "TriggerDebugEventData",
     "TriggerDescription",
     "TriggerEntity",
     "TriggerIdentity",

View File

@@ -1,11 +1,17 @@
 from collections.abc import Mapping
 from typing import Any, Optional

+from core.plugin.entities.plugin import TriggerProviderID
+from core.plugin.utils.http_parser import deserialize_request
+from core.trigger.entities.api_entities import TriggerProviderSubscriptionApiEntity
+from core.trigger.trigger_manager import TriggerManager
 from core.workflow.entities.node_entities import NodeRunResult
 from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus
 from core.workflow.nodes.base import BaseNode
 from core.workflow.nodes.base.entities import BaseNodeData, RetryConfig
 from core.workflow.nodes.enums import ErrorStrategy, NodeType
+from extensions.ext_storage import storage
+from services.trigger.trigger_provider_service import TriggerProviderService

 from .entities import PluginTriggerData
@@ -56,10 +62,50 @@ class TriggerPluginNode(BaseNode):
     def _run(self) -> NodeRunResult:
         """
         Run the plugin trigger node.
-        """
-        node_data = self._node_data
-        return NodeRunResult(
-            status=WorkflowNodeExecutionStatus.SUCCEEDED,
-            outputs={},
-        )
+
+        This node invokes the trigger to convert request data into events
+        and makes them available to downstream nodes.
+        """
+        # Get trigger data passed when the workflow was triggered
+        trigger_inputs = dict(self.graph_runtime_state.variable_pool.user_inputs)
+        request_id = trigger_inputs.get("request_id")
+        trigger_name = trigger_inputs.get("trigger_name", "")
+        subscription_id = trigger_inputs.get("subscription_id", "")
+        if not request_id or not subscription_id:
+            return NodeRunResult(
+                status=WorkflowNodeExecutionStatus.FAILED,
+                inputs=trigger_inputs,
+                outputs={"error": "No request ID or subscription ID available"},
+            )
+
+        try:
+            subscription: TriggerProviderSubscriptionApiEntity | None = TriggerProviderService.get_subscription_by_id(
+                tenant_id=self.tenant_id, subscription_id=subscription_id
+            )
+            if not subscription:
+                raise ValueError(f"Subscription {subscription_id} not found")
+
+            request = deserialize_request(storage.load_once(f"triggers/{request_id}"))
+            parameters = self._node_data.parameters if hasattr(self, "_node_data") and self._node_data else {}
+            invoke_response = TriggerManager.invoke_trigger(
+                tenant_id=self.tenant_id,
+                user_id=self.user_id,
+                provider_id=TriggerProviderID(subscription.provider),
+                trigger_name=trigger_name,
+                parameters=parameters,
+                credentials=subscription.credentials,
+                credential_type=subscription.credential_type,
+                request=request,
+            )
+            outputs = invoke_response.event.variables or {}
+            return NodeRunResult(status=WorkflowNodeExecutionStatus.SUCCEEDED, inputs=trigger_inputs, outputs=outputs)
+        except Exception as e:
+            return NodeRunResult(
+                status=WorkflowNodeExecutionStatus.FAILED,
+                inputs=trigger_inputs,
+                outputs={"error": f"Failed to invoke trigger: {str(e)}", "request_id": request_id},
+            )
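
The node's contract with the dispatcher is the trio of keys placed into the workflow's user inputs by `TriggerService.dispatch_triggered_workflows` (see the service diff below); for illustration, with hypothetical values:

# Keys the node reads from the variable pool's user inputs:
trigger_inputs = {
    "request_id": "8f14e45fceea167a",  # storage key suffix for the serialized HTTP request
    "trigger_name": "issue_created",   # which trigger on the subscription fired
    "subscription_id": "sub-123",      # used to load credentials for invocation
}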

View File

@@ -0,0 +1,200 @@
"""
Trigger debug service for webhook debugging in draft workflows.

This service provides debugging capabilities for trigger nodes by using
Redis Pub/Sub to enable real-time event forwarding across distributed instances.
"""

import json
import logging
import time
import uuid
from collections.abc import Generator

from core.trigger.entities.entities import TriggerDebugEventData
from extensions.ext_redis import redis_client

logger = logging.getLogger(__name__)


class TriggerDebugService:
    """
    Trigger debug service - supports distributed environments.
    Cleans up resources on disconnect; no reconnection handling.
    """

    SESSION_PREFIX = "trigger_debug_session:"
    SUBSCRIPTION_DEBUG_PREFIX = "trigger_debug_subscription:"
    PUBSUB_CHANNEL_PREFIX = "trigger_debug_channel:"

    @classmethod
    def create_debug_session(
        cls, app_id: str, node_id: str, subscription_id: str, user_id: str, timeout: int = 300
    ) -> str:
        """
        Create a debug session.

        Args:
            app_id: Application ID
            node_id: Node ID being debugged
            subscription_id: Subscription ID to monitor
            user_id: User ID creating the session
            timeout: Session timeout in seconds

        Returns:
            Session ID
        """
        session_id = str(uuid.uuid4())
        session_data = {
            "session_id": session_id,
            "app_id": app_id,
            "node_id": node_id,
            "subscription_id": subscription_id,
            "user_id": user_id,
            "created_at": time.time(),
        }

        # 1. Save session info
        redis_client.setex(f"{cls.SESSION_PREFIX}{session_id}", timeout, json.dumps(session_data))

        # 2. Register in the subscription's debug session set
        redis_client.sadd(f"{cls.SUBSCRIPTION_DEBUG_PREFIX}{subscription_id}", session_id)
        redis_client.expire(f"{cls.SUBSCRIPTION_DEBUG_PREFIX}{subscription_id}", timeout)

        logger.info("Created debug session %s for subscription %s", session_id, subscription_id)
        return session_id

    @classmethod
    def listen_for_events(cls, session_id: str, timeout: int = 300) -> Generator:
        """
        Listen for events using Redis Pub/Sub.

        Args:
            session_id: Debug session ID
            timeout: Timeout in seconds

        Yields:
            Event data or heartbeat messages
        """
        pubsub = redis_client.pubsub()
        channel = f"{cls.PUBSUB_CHANNEL_PREFIX}{session_id}"
        try:
            # Subscribe to the session's channel
            pubsub.subscribe(channel)
            logger.info("Listening on channel: %s", channel)

            start_time = time.time()
            last_heartbeat = time.time()

            # Real-time listening
            while time.time() - start_time < timeout:
                # Non-blocking message retrieval with a 1-second timeout
                message = pubsub.get_message(timeout=1.0)
                if message and message["type"] == "message":
                    # Received a trigger event
                    event_data = json.loads(message["data"])
                    logger.info("Received trigger event for session %s", session_id)
                    yield event_data
                    break  # End listening after receiving an event

                # Send a periodic heartbeat
                if time.time() - last_heartbeat > 5:
                    yield {"type": "heartbeat", "remaining": int(timeout - (time.time() - start_time))}
                    last_heartbeat = time.time()

            # Timed out without receiving an event
            if time.time() - start_time >= timeout:
                yield {"type": "timeout"}
        except Exception as e:
            logger.exception("Error in listen_for_events", exc_info=e)
            yield {"type": "error", "message": str(e)}
        finally:
            # Clean up resources
            cls.close_session(session_id)
            pubsub.unsubscribe(channel)
            pubsub.close()
            logger.info("Closed listening for session %s", session_id)

    @classmethod
    def close_session(cls, session_id: str):
        """
        Close and clean up a debug session.

        Args:
            session_id: Session ID to close
        """
        try:
            # Get session info
            session_data = redis_client.get(f"{cls.SESSION_PREFIX}{session_id}")
            if session_data:
                session = json.loads(session_data)
                subscription_id = session.get("subscription_id")

                # Remove from the subscription's session set
                if subscription_id:
                    redis_client.srem(f"{cls.SUBSCRIPTION_DEBUG_PREFIX}{subscription_id}", session_id)
                    logger.info("Removed session %s from subscription %s", session_id, subscription_id)

            # Delete session info
            redis_client.delete(f"{cls.SESSION_PREFIX}{session_id}")
            logger.info("Cleaned up session %s", session_id)
        except Exception as e:
            logger.exception("Error closing session %s", session_id, exc_info=e)

    @classmethod
    def dispatch_to_debug_sessions(cls, subscription_id: str, event_data: TriggerDebugEventData) -> int:
        """
        Dispatch events to debug sessions using Pub/Sub only.

        Args:
            subscription_id: Subscription ID
            event_data: Event data to dispatch

        Returns:
            Number of active debug sessions
        """
        try:
            # Get all listening debug sessions
            debug_sessions = redis_client.smembers(f"{cls.SUBSCRIPTION_DEBUG_PREFIX}{subscription_id}")
            if not debug_sessions:
                return 0

            # Serialize once; a Pydantic model is not json.dumps()-serializable directly
            payload = event_data.model_dump_json()

            active_sessions = 0
            for session_id_bytes in debug_sessions:
                if isinstance(session_id_bytes, bytes):
                    session_id = session_id_bytes.decode("utf-8")
                else:
                    session_id = session_id_bytes

                # Verify the session is still valid
                if not redis_client.exists(f"{cls.SESSION_PREFIX}{session_id}"):
                    # Clean up the stale registration
                    redis_client.srem(f"{cls.SUBSCRIPTION_DEBUG_PREFIX}{subscription_id}", session_id)
                    continue

                # Publish the event via Pub/Sub
                channel = f"{cls.PUBSUB_CHANNEL_PREFIX}{session_id}"
                subscriber_count = redis_client.publish(channel, payload)
                if subscriber_count > 0:
                    active_sessions += 1
                    logger.info("Published event to %d subscribers on channel %s", subscriber_count, channel)
                else:
                    # No subscribers; clean up the session
                    logger.info("No subscribers for session %s, cleaning up", session_id)
                    cls.close_session(session_id)

            if active_sessions > 0:
                logger.info("Dispatched event to %d active debug sessions", active_sessions)
            return active_sessions
        except Exception as e:
            logger.exception("Failed to dispatch to debug sessions", exc_info=e)
            return 0
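
A sketch of the intended round trip. In production the listener runs in the console API process and the dispatch happens in the webhook worker; here a thread stands in for the second process, and all IDs are hypothetical:

import threading
import time

from core.trigger.entities.entities import TriggerDebugEventData
from services.trigger_debug_service import TriggerDebugService

session_id = TriggerDebugService.create_debug_session(
    app_id="app-1", node_id="node-1", subscription_id="sub-1", user_id="user-1", timeout=60
)

def listen() -> None:
    # Blocks, yielding heartbeats until a trigger event, an error, or the timeout.
    for event in TriggerDebugService.listen_for_events(session_id, timeout=60):
        print(event)

listener = threading.Thread(target=listen)
listener.start()
time.sleep(1)  # let the listener subscribe first; Pub/Sub delivery is fire-and-forget

TriggerDebugService.dispatch_to_debug_sessions(
    subscription_id="sub-1",
    event_data=TriggerDebugEventData(
        subscription_id="sub-1", triggers=["issue_created"], request_id="req-1", timestamp=time.time()
    ),
)
listener.join()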

View File

@@ -1,5 +1,5 @@
-import json
 import logging
+import time
 import uuid

 from flask import Request, Response
@@ -8,10 +8,9 @@ from sqlalchemy.orm import Session
 from core.plugin.entities.plugin import TriggerProviderID
 from core.plugin.utils.http_parser import serialize_request
-from core.trigger.entities.entities import TriggerEntity
+from core.trigger.entities.entities import TriggerDebugEventData, TriggerEntity
 from core.trigger.trigger_manager import TriggerManager
 from extensions.ext_database import db
-from extensions.ext_redis import redis_client
 from extensions.ext_storage import storage
 from models.account import Account, TenantAccountJoin, TenantAccountRole
 from models.enums import WorkflowRunTriggeredFrom
@@ -19,6 +18,7 @@ from models.trigger import TriggerSubscription
 from models.workflow import Workflow, WorkflowPluginTrigger
 from services.async_workflow_service import AsyncWorkflowService
 from services.trigger.trigger_provider_service import TriggerProviderService
+from services.trigger_debug_service import TriggerDebugService
 from services.workflow.entities import PluginTriggerData

 logger = logging.getLogger(__name__)
@@ -30,19 +30,27 @@ class TriggerService:
     __ENDPOINT_REQUEST_CACHE_EXPIRE_MS__ = 5 * 60 * 1000

     @classmethod
-    def process_triggered_workflows(
-        cls, subscription: TriggerSubscription, trigger: TriggerEntity, request: Request
-    ) -> None:
-        """Process triggered workflows."""
-        subscribers = cls._get_subscriber_triggers(subscription=subscription, trigger=trigger)
+    def dispatch_triggered_workflows(
+        cls, subscription: TriggerSubscription, trigger: TriggerEntity, request_id: str
+    ) -> int:
+        """Process triggered workflows.
+
+        Args:
+            subscription: The trigger subscription
+            trigger: The trigger entity that was activated
+            request_id: The ID of the stored request in the storage system
+        """
+        subscribers = cls.get_subscriber_triggers(
+            tenant_id=subscription.tenant_id, subscription_id=subscription.id, trigger_name=trigger.identity.name
+        )
         if not subscribers:
             logger.warning(
                 "No workflows found for trigger '%s' in subscription '%s'",
                 trigger.identity.name,
                 subscription.id,
             )
-            return
+            return 0

         with Session(db.engine) as session:
             # Get tenant owner for workflow execution
@@ -57,10 +65,10 @@ class TriggerService:
             if not tenant_owner:
                 logger.error("Tenant owner not found for tenant %s", subscription.tenant_id)
-                return
+                return 0

+            dispatched_count = 0
             for plugin_trigger in subscribers:
-                # 2. Get workflow
+                # Get workflow
                 workflow = session.scalar(
                     select(Workflow)
                     .where(
@@ -77,14 +85,7 @@ class TriggerService:
                     )
                     continue

-                # Get trigger parameters from node configuration
-                node_config = workflow.get_node_config_by_id(plugin_trigger.node_id)
-                parameters = node_config.get("data", {}).get("parameters", {}) if node_config else {}
-
-                # 3. Store trigger data
-                storage_key = cls._store_trigger_data(request, subscription, trigger, parameters)
-
-                # 4. Create trigger data for async execution
+                # Create trigger data for async execution
                 trigger_data = PluginTriggerData(
                     app_id=plugin_trigger.app_id,
                     tenant_id=subscription.tenant_id,
@@ -92,13 +93,18 @@ class TriggerService:
                     root_node_id=plugin_trigger.node_id,
                     trigger_type=WorkflowRunTriggeredFrom.PLUGIN,
                     plugin_id=subscription.provider_id,
-                    webhook_url=f"trigger/endpoint/{subscription.endpoint_id}",  # For tracking
-                    inputs={"storage_key": storage_key},  # Pass storage key to async task
+                    endpoint_id=subscription.endpoint_id,
+                    inputs={
+                        "request_id": request_id,
+                        "trigger_name": trigger.identity.name,
+                        "subscription_id": subscription.id,
+                    },
                 )

-                # 5. Trigger async workflow
+                # Trigger async workflow
                 try:
                     AsyncWorkflowService.trigger_workflow_async(session, tenant_owner, trigger_data)
+                    dispatched_count += 1
                     logger.info(
                         "Triggered workflow for app %s with trigger %s",
                         plugin_trigger.app_id,
@@ -110,21 +116,37 @@ class TriggerService:
                         plugin_trigger.app_id,
                     )

+        return dispatched_count
+
     @classmethod
-    def select_triggers(cls, controller, dispatch_response, provider_id, subscription) -> list[TriggerEntity]:
-        triggers = []
-        for trigger_name in dispatch_response.triggers:
-            trigger = controller.get_trigger(trigger_name)
-            if trigger is None:
-                logger.error(
-                    "Trigger '%s' not found in provider '%s' for tenant '%s'",
-                    trigger_name,
-                    provider_id,
-                    subscription.tenant_id,
-                )
-                raise ValueError(f"Trigger '{trigger_name}' not found")
-            triggers.append(trigger)
-        return triggers
+    def dispatch_debugging_sessions(
+        cls, subscription_id: str, request: Request, triggers: list[str], request_id: str
+    ) -> int:
+        """
+        Dispatch to debug sessions - simplified version.
+
+        Args:
+            subscription_id: Subscription ID
+            request: Original request
+            triggers: List of trigger names
+            request_id: Request ID for storage reference
+        """
+        try:
+            # Prepare streamlined event data using the Pydantic model
+            debug_data = TriggerDebugEventData(
+                subscription_id=subscription_id,
+                triggers=triggers,
+                request_id=request_id,
+                timestamp=time.time(),
+            )
+            return TriggerDebugService.dispatch_to_debug_sessions(
+                subscription_id=subscription_id, event_data=debug_data
+            )
+        except Exception as e:
+            # Silent failure, don't affect production
+            logger.exception("Debug dispatch failed", exc_info=e)
+            return 0

     @classmethod
     def process_endpoint(cls, endpoint_id: str, request: Request) -> Response | None:
@@ -167,53 +189,16 @@ class TriggerService:
         return dispatch_response.response

     @classmethod
-    def _get_subscriber_triggers(
-        cls, subscription: TriggerSubscription, trigger: TriggerEntity
+    def get_subscriber_triggers(
+        cls, tenant_id: str, subscription_id: str, trigger_name: str
     ) -> list[WorkflowPluginTrigger]:
         """Get WorkflowPluginTriggers for a subscription and trigger."""
         with Session(db.engine, expire_on_commit=False) as session:
             subscribers = session.scalars(
                 select(WorkflowPluginTrigger).where(
-                    WorkflowPluginTrigger.tenant_id == subscription.tenant_id,
-                    WorkflowPluginTrigger.subscription_id == subscription.id,
-                    WorkflowPluginTrigger.trigger_name == trigger.identity.name,
+                    WorkflowPluginTrigger.tenant_id == tenant_id,
+                    WorkflowPluginTrigger.subscription_id == subscription_id,
+                    WorkflowPluginTrigger.trigger_name == trigger_name,
                 )
             ).all()
             return list(subscribers)
-
-    @classmethod
-    def _store_trigger_data(
-        cls,
-        request: Request,
-        subscription: TriggerSubscription,
-        trigger: TriggerEntity,
-        parameters: dict,
-    ) -> str:
-        """Store trigger data in storage and return key."""
-        storage_key = f"trigger_data_{uuid.uuid4().hex}"
-
-        # Prepare data to store
-        trigger_data = {
-            "request": {
-                "method": request.method,
-                "headers": dict(request.headers),
-                "query_params": dict(request.args),
-                "body": request.get_data(as_text=True),
-            },
-            "subscription": {
-                "id": subscription.id,
-                "provider_id": subscription.provider_id,
-                "credentials": subscription.credentials,
-                "credential_type": subscription.credential_type,
-            },
-            "trigger": {
-                "name": trigger.identity.name,
-                "parameters": parameters,
-            },
-            "user_id": subscription.user_id,
-        }
-
-        # Store with 1 hour TTL using Redis
-        redis_client.setex(storage_key, 3600, json.dumps(trigger_data))
-        return storage_key
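
One payoff of making the helper public and ID-based is that callers such as the debug path can look up subscribers without loaded ORM objects; a small usage sketch (import path and IDs are assumptions):

from services.trigger_service import TriggerService  # import path assumed

subscribers = TriggerService.get_subscriber_triggers(
    tenant_id="tenant-1",
    subscription_id="sub-1",
    trigger_name="issue_created",
)
for plugin_trigger in subscribers:
    print(plugin_trigger.app_id, plugin_trigger.node_id)  # workflows wired to this trigger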

View File

@@ -55,7 +55,7 @@ class PluginTriggerData(TriggerData):
     trigger_type: WorkflowRunTriggeredFrom = WorkflowRunTriggeredFrom.PLUGIN
     plugin_id: str
-    webhook_url: str
+    endpoint_id: str


 class WorkflowTaskData(BaseModel):

View File

@@ -88,12 +88,11 @@ def dispatch_triggered_workflows_async(
                     )
                     continue

-                TriggerService.process_triggered_workflows(
+                dispatched_count += TriggerService.dispatch_triggered_workflows(
                     subscription=subscription,
                     trigger=trigger,
-                    request=request,
+                    request_id=request_id,
                 )
-                dispatched_count += 1
             except Exception:
                 logger.exception(
@@ -104,6 +103,18 @@ def dispatch_triggered_workflows_async(
                 # Continue processing other triggers even if one fails
                 continue

+        # Dispatch to debug sessions after processing all triggers
+        debug_dispatched = 0  # default so the summary below cannot hit a NameError if dispatch fails
+        try:
+            debug_dispatched = TriggerService.dispatch_debugging_sessions(
+                subscription_id=subscription_id,
+                request=request,
+                triggers=triggers,
+                request_id=request_id,
+            )
+        except Exception:
+            # Silent failure for debug dispatch
+            logger.exception("Failed to dispatch to debug sessions")
+
         logger.info(
             "Completed async trigger dispatching: processed %d/%d triggers",
             dispatched_count,
@@ -117,8 +128,9 @@ def dispatch_triggered_workflows_async(

         return {
             "status": "completed",
-            "dispatched_count": dispatched_count,
             "total_count": len(triggers),
+            "dispatched_count": dispatched_count,
+            "debug_dispatched_count": debug_dispatched,
         }
     except Exception as e: