feat(trigger): enhance trigger debugging with polling API and new subscription retrieval

- Refactored DraftWorkflowTriggerNodeApi and DraftWorkflowTriggerRunApi to implement polling for trigger events instead of listening, improving responsiveness and reliability.
- Introduced TriggerSubscriptionBuilderGetApi to retrieve subscription instances for trigger providers, enhancing the API's capabilities.
- Removed deprecated trigger event classes and streamlined event handling in TriggerDebugService, ensuring a cleaner architecture.
- Updated Queue and Stream entities to reflect the changes in trigger event handling, improving overall clarity and maintainability.

These enhancements significantly improve the trigger debugging experience and API usability.
This commit is contained in:
Harry 2025-09-14 19:12:20 +08:00
parent 91e5e33440
commit 76850749e4
11 changed files with 233 additions and 618 deletions

View File

@ -22,9 +22,9 @@ from controllers.web.error import InvokeRateLimitError as InvokeRateLimitHttpErr
from core.app.app_config.features.file_upload.manager import FileUploadConfigManager
from core.app.apps.base_app_queue_manager import AppQueueManager
from core.app.entities.app_invoke_entities import InvokeFrom
from core.app.entities.task_entities import ErrorStreamResponse, TriggerNodeFinishedResponse, TriggerTriggeredResponse
from core.file.models import File
from core.helper.trace_id_helper import get_external_trace_id
from core.model_runtime.utils.encoders import jsonable_encoder
from extensions.ext_database import db
from factories import file_factory, variable_factory
from fields.workflow_fields import workflow_fields, workflow_pagination_fields
@ -39,6 +39,7 @@ from models.workflow import Workflow
from services.app_generate_service import AppGenerateService
from services.errors.app import WorkflowHashNotEqualError
from services.errors.llm import InvokeRateLimitError
from services.trigger_debug_service import TriggerDebugService
from services.workflow_service import DraftWorkflowDeletionError, WorkflowInUseError, WorkflowService
logger = logging.getLogger(__name__)
@ -809,7 +810,7 @@ class DraftWorkflowNodeLastRunApi(Resource):
class DraftWorkflowTriggerNodeApi(Resource):
"""
Single node debug - Listen for trigger events and execute single Trigger node
Single node debug - Polling API for trigger events
Path: /apps/<uuid:app_id>/workflows/draft/nodes/<string:node_id>/trigger
"""
@ -819,88 +820,57 @@ class DraftWorkflowTriggerNodeApi(Resource):
@get_app_model(mode=[AppMode.WORKFLOW])
def post(self, app_model: App, node_id: str):
"""
Debug trigger node by listening for events and executing the node
Poll for trigger events and execute single node when event arrives
"""
if not isinstance(current_user, Account) or not current_user.is_editor:
raise Forbidden()
parser = reqparse.RequestParser()
parser.add_argument("timeout", type=int, default=300, location="json")
parser.add_argument("trigger_name", type=str, required=True, location="json")
parser.add_argument("subscription_id", type=str, required=True, location="json")
args = parser.parse_args()
trigger_name = args["trigger_name"]
subscription_id = args["subscription_id"]
event = TriggerDebugService.poll_event(
tenant_id=app_model.tenant_id,
user_id=current_user.id,
app_id=app_model.id,
subscription_id=subscription_id,
node_id=node_id,
trigger_name=trigger_name,
)
if not event:
return jsonable_encoder({"status": "waiting"})
from core.trigger.entities.entities import TriggerEventData, TriggerInputs
from services.trigger_debug_service import TriggerDebugService
from services.workflow_service import WorkflowService
try:
workflow_service = WorkflowService()
draft_workflow = workflow_service.get_draft_workflow(app_model)
if not draft_workflow:
raise ValueError("Workflow not found")
def generate(current_user: Account):
# Phase 1: Listen for trigger events
trigger_data = None
for event in TriggerDebugService.waiting_for_triggered(
user_inputs = event.model_dump()
node_execution = workflow_service.run_draft_workflow_node(
app_model=app_model,
draft_workflow=draft_workflow,
node_id=node_id,
user_id=current_user.id,
timeout=args.get("timeout", 300),
):
yield event.to_dict() # Pass through all listening events
# Check if we received the trigger
if isinstance(event, TriggerTriggeredResponse):
# Save trigger data and exit listening loop
trigger_data = TriggerEventData(
subscription_id=event.subscription_id,
triggers=event.triggers,
request_id=event.request_id,
timestamp=event.timestamp,
)
break
# Phase 2: Execute node if trigger was received
if trigger_data:
# Create trigger inputs
trigger_inputs = TriggerInputs.from_trigger_data(trigger_data)
try:
# Get workflow and execute node
workflow_service = WorkflowService()
draft_workflow = workflow_service.get_draft_workflow(app_model)
if not draft_workflow:
raise ValueError("Workflow not found")
node_execution = workflow_service.run_draft_workflow_node(
app_model=app_model,
draft_workflow=draft_workflow,
node_id=node_id,
user_inputs=trigger_inputs.to_dict(),
account=current_user,
query="",
files=[],
)
# Generate node finished event
yield TriggerNodeFinishedResponse(
task_id="",
id=node_execution.id,
node_id=node_execution.node_id,
node_type=node_execution.node_type,
status=node_execution.status,
outputs=node_execution.outputs_dict,
error=node_execution.error,
elapsed_time=node_execution.elapsed_time,
execution_metadata=node_execution.execution_metadata_dict,
).to_dict()
except Exception as e:
yield ErrorStreamResponse(task_id="", err=e).to_dict()
# Use standard response format
from core.app.apps.base_app_generator import BaseAppGenerator
response = BaseAppGenerator.convert_to_event_stream(generate(current_user))
return helper.compact_generate_response(response)
user_inputs=user_inputs,
account=current_user,
query="",
files=[],
)
return jsonable_encoder(node_execution)
except Exception:
logger.exception("Error running draft workflow trigger node")
return jsonable_encoder(
{
"status": "error",
}
), 500
class DraftWorkflowTriggerRunApi(Resource):
"""
Full workflow debug - Listen for trigger events and execute complete workflow
Full workflow debug - Polling API for trigger events
Path: /apps/<uuid:app_id>/workflows/draft/trigger/run
"""
@ -910,69 +880,58 @@ class DraftWorkflowTriggerRunApi(Resource):
@get_app_model(mode=[AppMode.WORKFLOW])
def post(self, app_model: App):
"""
Debug trigger workflow by listening for events and running full workflow
Poll for trigger events and execute full workflow when event arrives
"""
if not isinstance(current_user, Account) or not current_user.is_editor:
raise Forbidden()
parser = reqparse.RequestParser()
parser.add_argument("node_id", type=str, required=True, location="json")
parser.add_argument("timeout", type=int, default=300, location="json")
parser.add_argument("trigger_name", type=str, required=True, location="json")
parser.add_argument("subscription_id", type=str, required=True, location="json")
args = parser.parse_args()
node_id = args["node_id"]
trigger_name = args["trigger_name"]
subscription_id = args["subscription_id"]
from core.app.entities.app_invoke_entities import InvokeFrom
from core.trigger.entities.entities import TriggerEventData, TriggerInputs
from services.app_generate_service import AppGenerateService
from services.trigger_debug_service import TriggerDebugService
event = TriggerDebugService.poll_event(
tenant_id=app_model.tenant_id,
user_id=current_user.id,
app_id=app_model.id,
subscription_id=subscription_id,
node_id=node_id,
trigger_name=trigger_name,
)
if not event:
return jsonable_encoder({"status": "waiting"})
def generate(current_user: Account):
# Phase 1: Listen for trigger events
trigger_data = None
for event in TriggerDebugService.waiting_for_triggered(
workflow_args = {
"inputs": event.model_dump(),
"query": "",
"files": [],
}
external_trace_id = get_external_trace_id(request)
if external_trace_id:
workflow_args["external_trace_id"] = external_trace_id
try:
response = AppGenerateService.generate(
app_model=app_model,
node_id=args["node_id"],
user_id=current_user.id,
timeout=args.get("timeout", 300),
):
yield event.to_dict()
# Check if we received the trigger
if isinstance(event, TriggerTriggeredResponse):
# Save trigger data and exit listening loop
trigger_data = TriggerEventData(
subscription_id=event.subscription_id,
triggers=event.triggers,
request_id=event.request_id,
timestamp=event.timestamp,
)
break
# Phase 2: Execute workflow if trigger was received
if trigger_data:
# Create trigger inputs and convert to workflow args
trigger_inputs = TriggerInputs.from_trigger_data(trigger_data)
combined_args = trigger_inputs.to_workflow_args()
try:
# Execute workflow
workflow_response = AppGenerateService.generate(
app_model=app_model,
user=current_user,
args=combined_args,
invoke_from=InvokeFrom.DEBUGGER,
streaming=True,
)
# Pass through workflow's standard event stream
yield from workflow_response
except Exception as e:
yield ErrorStreamResponse(task_id="", err=e).to_dict()
# Use standard response format
from core.app.apps.base_app_generator import BaseAppGenerator
response = BaseAppGenerator.convert_to_event_stream(generate(current_user))
return helper.compact_generate_response(response)
user=current_user,
args=workflow_args,
invoke_from=InvokeFrom.DEBUGGER,
streaming=True,
)
return helper.compact_generate_response(response)
except InvokeRateLimitError as ex:
raise InvokeRateLimitHttpError(ex.description)
except Exception:
logger.exception("Error running draft workflow trigger run")
return jsonable_encoder(
{
"status": "error",
}
), 500
api.add_resource(

View File

@ -92,6 +92,17 @@ class TriggerSubscriptionBuilderCreateApi(Resource):
raise
class TriggerSubscriptionBuilderGetApi(Resource):
    @setup_required
    @login_required
    @account_initialization_required
    def get(self, provider, subscription_builder_id):
        """Return the subscription builder instance for a trigger provider.

        The `provider` path segment is required by the route but the lookup
        itself is keyed solely on `subscription_builder_id`.
        """
        builder = TriggerSubscriptionBuilderService.get_subscription_builder_by_id(subscription_builder_id)
        return jsonable_encoder(builder)
class TriggerSubscriptionBuilderVerifyApi(Resource):
@setup_required
@login_required
@ -332,6 +343,7 @@ class TriggerOAuthAuthorizeApi(Resource):
{
"authorization_url": authorization_url_response.authorization_url,
"subscription_builder_id": subscription_builder.id,
"subscription_builder": subscription_builder,
}
)
)
@ -532,6 +544,10 @@ api.add_resource(
TriggerSubscriptionBuilderCreateApi,
"/workspaces/current/trigger-provider/<path:provider>/subscriptions/builder/create",
)
api.add_resource(
TriggerSubscriptionBuilderGetApi,
"/workspaces/current/trigger-provider/<path:provider>/subscriptions/builder/<path:subscription_builder_id>",
)
api.add_resource(
TriggerSubscriptionBuilderUpdateApi,
"/workspaces/current/trigger-provider/<path:provider>/subscriptions/builder/update/<path:subscription_builder_id>",

View File

@ -51,11 +51,6 @@ class QueueEvent(StrEnum):
PING = "ping"
STOP = "stop"
RETRY = "retry"
TRIGGER_LISTENING_STARTED = "trigger_listening_started"
TRIGGER_TRIGGERED = "trigger_triggered"
TRIGGER_NODE_FINISHED = "trigger_node_finished"
TRIGGER_LISTENING_TIMEOUT = "trigger_listening_timeout"
class AppQueueEvent(BaseModel):
"""
@ -722,54 +717,6 @@ class QueueParallelBranchRunSucceededEvent(AppQueueEvent):
"""loop id if node is in loop"""
class QueueTriggerListeningStartedEvent(AppQueueEvent):
"""
QueueTriggerListeningStartedEvent entity
"""
event: QueueEvent = QueueEvent.TRIGGER_LISTENING_STARTED
session_id: str
webhook_url: str
timeout: int
class QueueTriggerTriggeredEvent(AppQueueEvent):
"""
QueueTriggerTriggeredEvent entity
"""
event: QueueEvent = QueueEvent.TRIGGER_TRIGGERED
subscription_id: str
triggers: list[str]
request_id: str
timestamp: float
class QueueTriggerNodeFinishedEvent(AppQueueEvent):
"""
QueueTriggerNodeFinishedEvent entity
"""
event: QueueEvent = QueueEvent.TRIGGER_NODE_FINISHED
id: str
node_id: str
node_type: str
status: str
outputs: Optional[Mapping[str, Any]] = None
error: Optional[str] = None
elapsed_time: Optional[float] = None
execution_metadata: Optional[Mapping[str, Any]] = None
class QueueTriggerListeningTimeoutEvent(AppQueueEvent):
"""
QueueTriggerListeningTimeoutEvent entity
"""
event: QueueEvent = QueueEvent.TRIGGER_LISTENING_TIMEOUT
error: str = "Timeout waiting for trigger"
class QueueParallelBranchRunFailedEvent(AppQueueEvent):
"""
QueueParallelBranchRunFailedEvent entity

View File

@ -82,11 +82,6 @@ class StreamEvent(Enum):
TEXT_CHUNK = "text_chunk"
TEXT_REPLACE = "text_replace"
AGENT_LOG = "agent_log"
# Trigger debug events
TRIGGER_LISTENING_STARTED = "trigger_listening_started"
TRIGGER_LISTENING_TIMEOUT = "trigger_listening_timeout"
TRIGGER_TRIGGERED = "trigger_triggered"
TRIGGER_NODE_FINISHED = "trigger_node_finished"
class StreamResponse(BaseModel):
@ -842,52 +837,3 @@ class AgentLogStreamResponse(StreamResponse):
event: StreamEvent = StreamEvent.AGENT_LOG
data: Data
# Trigger Debug Stream Responses
class TriggerListeningStartedResponse(StreamResponse):
"""
TriggerListeningStartedResponse entity
"""
event: StreamEvent = StreamEvent.TRIGGER_LISTENING_STARTED
session_id: str
webhook_url: str
timeout: int
class TriggerTriggeredResponse(StreamResponse):
"""
TriggerTriggeredResponse entity
"""
event: StreamEvent = StreamEvent.TRIGGER_TRIGGERED
subscription_id: str
triggers: list[str]
request_id: str
timestamp: float
class TriggerNodeFinishedResponse(StreamResponse):
"""
TriggerNodeFinishedResponse entity
"""
event: StreamEvent = StreamEvent.TRIGGER_NODE_FINISHED
id: str
node_id: str
node_type: str
status: str
outputs: Optional[Mapping[str, Any]] = None
error: Optional[str] = None
elapsed_time: float
execution_metadata: Optional[Mapping[str, Any]] = None
class TriggerListeningTimeoutResponse(StreamResponse):
"""
TriggerListeningTimeoutResponse entity
"""
event: StreamEvent = StreamEvent.TRIGGER_LISTENING_TIMEOUT
error: str = "Timeout waiting for trigger"

View File

@ -269,15 +269,6 @@ class TriggerInputs(BaseModel):
trigger_name: str
subscription_id: str
@classmethod
def from_trigger_data(cls, trigger_data: TriggerEventData) -> "TriggerInputs":
"""Create from debug event data."""
return cls(
request_id=trigger_data.request_id,
trigger_name=trigger_data.triggers[0] if trigger_data.triggers else "",
subscription_id=trigger_data.subscription_id,
)
@classmethod
def from_trigger_entity(cls, request_id: str, subscription_id: str, trigger: TriggerEntity) -> "TriggerInputs":
"""Create from trigger entity (for production)."""

View File

@ -29,6 +29,15 @@ class NodeType(StrEnum):
TRIGGER_SCHEDULE = "trigger-schedule"
TRIGGER_PLUGIN = "trigger-plugin"
@property
def is_start_node(self) -> bool:
    """Whether this node type can begin a workflow: START or any trigger node."""
    start_types = (
        NodeType.START,
        NodeType.TRIGGER_WEBHOOK,
        NodeType.TRIGGER_SCHEDULE,
        NodeType.TRIGGER_PLUGIN,
    )
    return self in start_types
class ErrorStrategy(StrEnum):
FAIL_BRANCH = "fail-branch"

View File

@ -204,6 +204,7 @@ class TriggerSubscriptionBuilderService:
def builder_to_api_entity(
cls, controller: PluginTriggerProviderController, entity: SubscriptionBuilder
) -> SubscriptionBuilderApiEntity:
credential_type = CredentialType.of(entity.credential_type or CredentialType.UNAUTHORIZED.value)
return SubscriptionBuilderApiEntity(
id=entity.id,
name=entity.name or "",
@ -211,9 +212,9 @@ class TriggerSubscriptionBuilderService:
endpoint=parse_endpoint_id(entity.endpoint_id),
parameters=entity.parameters,
properties=entity.properties,
credential_type=CredentialType(entity.credential_type),
credential_type=credential_type,
credentials=masked_credentials(
schemas=controller.get_credentials_schema(CredentialType(entity.credential_type)),
schemas=controller.get_credentials_schema(credential_type),
credentials=entity.credentials,
),
)
@ -301,3 +302,16 @@ class TriggerSubscriptionBuilderService:
# append the request log
cls.append_log(endpoint_id, request, response.response)
return response.response
@classmethod
def get_subscription_builder_by_id(cls, subscription_builder_id: str) -> SubscriptionBuilderApiEntity:
    """Look up a trigger subscription builder and convert it to its API entity.

    Raises:
        ValueError: if no builder exists for the given id.
    """
    builder = cls.get_subscription_builder(subscription_builder_id)
    if not builder:
        raise ValueError(f"Subscription builder {subscription_builder_id} not found")
    # Resolve the provider controller for this builder's tenant before conversion.
    provider_id = TriggerProviderID(builder.provider_id)
    controller = TriggerManager.get_trigger_provider(builder.tenant_id, provider_id)
    return cls.builder_to_api_entity(controller=controller, entity=builder)

View File

@ -1,372 +1,125 @@
"""
Trigger debug service for webhook debugging in draft workflows.
"""Trigger debug service for webhook debugging in draft workflows."""
This service provides debugging capabilities for trigger nodes by using
Redis Pub/Sub to enable real-time event forwarding across distributed instances.
"""
import json
import hashlib
import logging
import time
import uuid
from collections.abc import Generator
from dataclasses import dataclass
from typing import Optional
from flask import request
from werkzeug.exceptions import NotFound
from pydantic import BaseModel
from redis import RedisError
from core.app.entities.task_entities import (
ErrorStreamResponse,
PingStreamResponse,
StreamResponse,
TriggerListeningStartedResponse,
TriggerListeningTimeoutResponse,
TriggerTriggeredResponse,
)
from core.trigger.entities.entities import TriggerEventData
from extensions.ext_redis import redis_client
from models.model import App
from services.trigger.trigger_provider_service import TriggerProviderService
from services.workflow_service import WorkflowService
logger = logging.getLogger(__name__)
TRIGGER_DEBUG_EVENT_TTL = 300
@dataclass
class TriggerDebuggingContext:
"""Context for trigger debugging session."""
session_id: str
class TriggerDebugEvent(BaseModel):
subscription_id: str
webhook_url: str
node_id: str
app_id: str
user_id: str
timeout: int
request_id: str
timestamp: int
class TriggerDebugService:
"""
Trigger debug service - supports distributed environments.
Cleans up resources on disconnect, no reconnection handling.
Redis-based trigger debug service with polling support.
Uses {tenant_id} hash tags for Redis Cluster compatibility.
"""
SESSION_PREFIX = "trigger_debug_session:"
SUBSCRIPTION_DEBUG_PREFIX = "trigger_debug_subscription:"
PUBSUB_CHANNEL_PREFIX = "trigger_debug_channel:"
# LUA_SELECT: Atomic poll or register for event
# KEYS[1] = trigger_debug_inbox:{tenant_id}:{address_id}
# KEYS[2] = trigger_debug_waiting_pool:{tenant_id}:{subscription_id}:{trigger}
# ARGV[1] = address_id
# compressed lua code, you can use LLM to uncompress it
LUA_SELECT = (
"local v=redis.call('GET',KEYS[1]);"
"if v then redis.call('DEL',KEYS[1]);return v end;"
"redis.call('SADD',KEYS[2],ARGV[1]);"
f"redis.call('EXPIRE',KEYS[2],{TRIGGER_DEBUG_EVENT_TTL});"
"return false"
)
__DEFAULT_LISTEN_TIMEOUT__ = 300
# LUA_DISPATCH: Dispatch event to all waiting addresses
# KEYS[1] = trigger_debug_waiting_pool:{tenant_id}:{subscription_id}:{trigger}
# ARGV[1] = tenant_id
# ARGV[2] = event_json
# compressed lua code, you can use LLM to uncompress it
LUA_DISPATCH = (
"local a=redis.call('SMEMBERS',KEYS[1]);"
"if #a==0 then return 0 end;"
"redis.call('DEL',KEYS[1]);"
"for i=1,#a do "
f"redis.call('SET','trigger_debug_inbox:{{'..ARGV[1]..'}}'..':'..a[i],ARGV[2],'EX',{TRIGGER_DEBUG_EVENT_TTL});"
"end;"
"return #a"
)
@classmethod
def build_debugging_context(
def address(cls, tenant_id: str, user_id: str, app_id: str, node_id: str) -> str:
address_id = hashlib.sha1(f"{user_id}|{app_id}|{node_id}".encode()).hexdigest()
return f"trigger_debug_inbox:{{{tenant_id}}}:{address_id}"
@classmethod
def waiting_pool(cls, tenant_id: str, subscription_id: str, trigger_name: str) -> str:
    """Redis key for the pool of pollers waiting on one (subscription, trigger) pair.

    The tenant id is wrapped in braces so Redis Cluster hashes all keys for a
    tenant to the same slot.
    """
    return "trigger_debug_waiting_pool:{" + tenant_id + "}:" + subscription_id + ":" + trigger_name
@classmethod
def dispatch_debug_event(
cls,
app_model: App,
node_id: str,
user_id: str,
timeout: int = __DEFAULT_LISTEN_TIMEOUT__,
) -> TriggerDebuggingContext:
"""
Build debugging context for trigger node.
Args:
app_model: Application model
node_id: Node ID to debug
user_id: User ID creating the session
timeout: Session timeout in seconds
Returns:
TriggerDebuggingContext with all debugging information
Raises:
NotFound: If workflow or node not found
ValueError: If node is not a trigger plugin or has no subscription
"""
# Get and validate workflow
workflow_service = WorkflowService()
draft_workflow = workflow_service.get_draft_workflow(app_model)
if not draft_workflow:
raise NotFound("Workflow not found")
# Get and validate node
node_config = draft_workflow.get_node_config_by_id(node_id)
if not node_config:
raise NotFound(f"Node {node_id} not found")
if node_config.get("data", {}).get("type") != "trigger-plugin":
raise ValueError("Node is not a trigger plugin node")
subscription_id = node_config.get("data", {}).get("subscription_id")
if not subscription_id:
raise ValueError("No subscription configured for this trigger node")
# Create debug session
app_id = str(app_model.id)
session_id = cls.create_debug_session(
app_id=app_id,
node_id=node_id,
tenant_id: str,
subscription_id: str,
triggers: list[str],
request_id: str,
timestamp: int,
) -> int:
event_json = TriggerDebugEvent(
subscription_id=subscription_id,
user_id=user_id,
timeout=timeout,
)
request_id=request_id,
timestamp=timestamp,
).model_dump_json()
# Get webhook URL
subscription = TriggerProviderService.get_subscription_by_id(
tenant_id=app_model.tenant_id, subscription_id=subscription_id
)
webhook_url = (
f"{request.host_url.rstrip('/')}/trigger/plugin/{subscription.endpoint}" if subscription else "Unknown"
)
dispatched = 0
if len(triggers) > 10:
logger.warning(
"Too many triggers to dispatch at once: %d triggers tenant: %s subscription: %s",
len(triggers),
tenant_id,
subscription_id,
)
return TriggerDebuggingContext(
session_id=session_id,
subscription_id=subscription_id,
webhook_url=webhook_url,
node_id=node_id,
app_id=app_id,
user_id=user_id,
timeout=timeout,
)
for trigger_name in triggers:
try:
dispatched += redis_client.eval(
cls.LUA_DISPATCH,
1,
cls.waiting_pool(tenant_id, subscription_id, trigger_name),
tenant_id,
event_json,
)
except RedisError:
logger.exception("Failed to dispatch for trigger: %s", trigger_name)
return dispatched
@classmethod
def waiting_for_triggered(
def poll_event(
cls,
app_model: App,
node_id: str,
tenant_id: str,
user_id: str,
timeout: int = __DEFAULT_LISTEN_TIMEOUT__,
) -> Generator[StreamResponse, None, None]:
"""
Listen for trigger events only.
This method sets up a debug session and listens for incoming trigger events.
It yields events as they occur and returns when a trigger is received or timeout occurs.
Args:
app_model: Application model
node_id: Node ID to debug
user_id: User ID creating the session
timeout: Timeout in seconds
Yields:
Event dictionaries including:
- listening_started: Initial event with webhook URL
- ping: Periodic heartbeat events
- trigger_debug_received: When trigger is received
- timeout: When timeout occurs
- error: On any errors
"""
# Build debugging context
context = cls.build_debugging_context(app_model, node_id, user_id, timeout)
# Listen for events and pass them through
for event in cls.listen_for_events(
session_id=context.session_id, webhook_url=context.webhook_url, timeout=context.timeout
):
yield event
# If we received a trigger, listening is complete
if isinstance(event, dict) and event.get("event") == "trigger_triggered":
break
@classmethod
def create_debug_session(
cls, app_id: str, node_id: str, subscription_id: str, user_id: str, timeout: int = __DEFAULT_LISTEN_TIMEOUT__
) -> str:
"""
Create a debug session.
Args:
app_id: Application ID
node_id: Node ID being debugged
subscription_id: Subscription ID to monitor
user_id: User ID creating the session
timeout: Session timeout in seconds
Returns:
Session ID
"""
session_id = str(uuid.uuid4())
session_data = {
"session_id": session_id,
"app_id": app_id,
"node_id": node_id,
"subscription_id": subscription_id,
"user_id": user_id,
"created_at": time.time(),
}
# 1. Save session info
redis_client.setex(f"{cls.SESSION_PREFIX}{session_id}", timeout, json.dumps(session_data))
# 2. Register to subscription's debug session set
redis_client.sadd(f"{cls.SUBSCRIPTION_DEBUG_PREFIX}{subscription_id}", session_id)
redis_client.expire(f"{cls.SUBSCRIPTION_DEBUG_PREFIX}{subscription_id}", timeout)
logger.info("Created debug session %s for subscription %s", session_id, subscription_id)
return session_id
@classmethod
def listen_for_events(
cls, session_id: str, webhook_url: str, timeout: int = __DEFAULT_LISTEN_TIMEOUT__
) -> Generator[StreamResponse, None, None]:
"""
Listen for events using Redis Pub/Sub and generate structured events.
Args:
session_id: Debug session ID
webhook_url: Webhook URL for the trigger
timeout: Timeout in seconds
Yields:
Structured AppQueueEvent objects
"""
# Send initial listening started event
yield TriggerListeningStartedResponse(
task_id="", # Will be set by the caller if needed
session_id=session_id,
webhook_url=webhook_url,
timeout=timeout,
)
pubsub = redis_client.pubsub()
channel = f"{cls.PUBSUB_CHANNEL_PREFIX}{session_id}"
app_id: str,
subscription_id: str,
node_id: str,
trigger_name: str,
) -> Optional[TriggerDebugEvent]:
address_id = hashlib.sha1(f"{user_id}|{app_id}|{node_id}".encode()).hexdigest()
try:
# Subscribe to channel
pubsub.subscribe(channel)
logger.info("Listening on channel: %s", channel)
start_time = time.time()
last_heartbeat = time.time()
# Real-time listening
while time.time() - start_time < timeout:
# Non-blocking message retrieval with 1 second timeout
message = pubsub.get_message(timeout=1.0)
if message and message["type"] == "message":
# Received trigger event - parse and create structured event
try:
event_data = json.loads(message["data"])
logger.info("Received trigger event for session %s", session_id)
# Create structured trigger received event
trigger_data = TriggerEventData(
subscription_id=event_data["subscription_id"],
triggers=event_data["triggers"],
request_id=event_data["request_id"],
timestamp=event_data.get("timestamp", time.time()),
)
yield TriggerTriggeredResponse(
task_id="",
subscription_id=trigger_data.subscription_id,
triggers=trigger_data.triggers,
request_id=trigger_data.request_id,
timestamp=trigger_data.timestamp,
)
break # End listening after receiving event
except (json.JSONDecodeError, KeyError) as e:
logger.exception("Failed to parse trigger event")
yield ErrorStreamResponse(
task_id="", err=Exception(f"Failed to parse trigger event: {str(e)}")
)
break
# Send periodic heartbeat
if time.time() - last_heartbeat > 5:
yield PingStreamResponse(task_id="")
last_heartbeat = time.time()
# Timeout
if time.time() - start_time >= timeout:
yield TriggerListeningTimeoutResponse(task_id="")
except Exception as e:
logger.exception("Error in listen_for_events", exc_info=e)
yield ErrorStreamResponse(task_id="", err=e)
finally:
# Clean up resources
cls.close_session(session_id)
pubsub.unsubscribe(channel)
pubsub.close()
logger.info("Closed listening for session %s", session_id)
@classmethod
def close_session(cls, session_id: str):
"""
Close and clean up debug session.
Args:
session_id: Session ID to close
"""
try:
# Get session info
session_data = redis_client.get(f"{cls.SESSION_PREFIX}{session_id}")
if session_data:
session = json.loads(session_data)
subscription_id = session.get("subscription_id")
# Remove from subscription set
if subscription_id:
redis_client.srem(f"{cls.SUBSCRIPTION_DEBUG_PREFIX}{subscription_id}", session_id)
logger.info("Removed session %s from subscription %s", session_id, subscription_id)
# Delete session info
redis_client.delete(f"{cls.SESSION_PREFIX}{session_id}")
logger.info("Cleaned up session %s", session_id)
except Exception as e:
logger.exception("Error closing session %s", session_id, exc_info=e)
@classmethod
def dispatch_to_debug_sessions(cls, subscription_id: str, event_data: TriggerEventData) -> int:
"""
Dispatch events to debug sessions using Pub/Sub only.
Args:
subscription_id: Subscription ID
event_data: Event data to dispatch
Returns:
Number of active debug sessions
"""
try:
# Get all listening debug sessions
debug_sessions = redis_client.smembers(f"{cls.SUBSCRIPTION_DEBUG_PREFIX}{subscription_id}")
if not debug_sessions:
return 0
active_sessions = 0
for session_id_bytes in debug_sessions:
if isinstance(session_id_bytes, bytes):
session_id = session_id_bytes.decode("utf-8")
else:
session_id = session_id_bytes
# Verify session is valid
if not redis_client.exists(f"{cls.SESSION_PREFIX}{session_id}"):
# Clean up invalid session
redis_client.srem(f"{cls.SUBSCRIPTION_DEBUG_PREFIX}{subscription_id}", session_id)
continue
# Publish event via Pub/Sub
channel = f"{cls.PUBSUB_CHANNEL_PREFIX}{session_id}"
subscriber_count = redis_client.publish(channel, json.dumps(event_data.model_dump()))
if subscriber_count > 0:
active_sessions += 1
logger.info("Published event to %d subscribers on channel %s", subscriber_count, channel)
else:
# No subscribers, clean up session
logger.info("No subscribers for session %s, cleaning up", session_id)
cls.close_session(session_id)
if active_sessions > 0:
logger.info("Dispatched event to %d active debug sessions", active_sessions)
return active_sessions
except Exception as e:
logger.exception("Failed to dispatch to debug sessions", exc_info=e)
return 0
event = redis_client.eval(
cls.LUA_SELECT,
2,
cls.address(tenant_id, user_id, app_id, node_id),
cls.waiting_pool(tenant_id, subscription_id, trigger_name),
address_id,
)
return TriggerDebugEvent.model_validate_json(event) if event else None
except RedisError:
logger.exception("Failed to poll debug event")
return None

View File

@ -8,7 +8,7 @@ from sqlalchemy.orm import Session
from core.plugin.entities.plugin import TriggerProviderID
from core.plugin.utils.http_parser import serialize_request
from core.trigger.entities.entities import TriggerEntity, TriggerEventData, TriggerInputs
from core.trigger.entities.entities import TriggerEntity, TriggerInputs
from core.trigger.trigger_manager import TriggerManager
from extensions.ext_database import db
from extensions.ext_storage import storage
@ -18,7 +18,6 @@ from models.trigger import TriggerSubscription
from models.workflow import Workflow, WorkflowPluginTrigger
from services.async_workflow_service import AsyncWorkflowService
from services.trigger.trigger_provider_service import TriggerProviderService
from services.trigger_debug_service import TriggerDebugService
from services.workflow.entities import PluginTriggerData
logger = logging.getLogger(__name__)
@ -121,33 +120,6 @@ class TriggerService:
return dispatched_count
@classmethod
def dispatch_debugging_sessions(cls, subscription_id: str, triggers: list[str], request_id: str) -> int:
"""
Dispatch to debug sessions - simplified version.
Args:
subscription_id: Subscription ID
triggers: List of trigger names
request_id: Request ID for storage reference
"""
try:
# Prepare streamlined event data using Pydantic model
debug_data = TriggerEventData(
subscription_id=subscription_id,
triggers=triggers,
request_id=request_id,
timestamp=time.time(),
)
return TriggerDebugService.dispatch_to_debug_sessions(
subscription_id=subscription_id, event_data=debug_data
)
except Exception as e:
# Silent failure, don't affect production
logger.exception("Debug dispatch failed", exc_info=e)
return 0
@classmethod
def process_endpoint(cls, endpoint_id: str, request: Request) -> Response | None:
"""
@ -157,6 +129,7 @@ class TriggerService:
endpoint_id: Endpoint ID
request: Request
"""
timestamp = int(time.time())
subscription = TriggerProviderService.get_subscription_by_endpoint(endpoint_id)
if not subscription:
return None
@ -175,12 +148,14 @@ class TriggerService:
serialized_request = serialize_request(request)
storage.save(f"triggers/{request_id}", serialized_request)
# Production dispatch
from tasks.trigger_processing_tasks import dispatch_triggered_workflows_async
dispatch_triggered_workflows_async(
endpoint_id=endpoint_id,
provider_id=subscription.provider_id,
subscription_id=subscription.id,
timestamp=timestamp,
triggers=list(dispatch_response.triggers),
request_id=request_id,
)
@ -191,7 +166,6 @@ class TriggerService:
endpoint_id,
request_id,
)
return dispatch_response.response
@classmethod

View File

@ -347,7 +347,7 @@ class WorkflowService:
node_config = draft_workflow.get_node_config_by_id(node_id)
node_type = Workflow.get_node_type_from_node_config(node_config)
node_data = node_config.get("data", {})
if node_type == NodeType.START:
if node_type.is_start_node:
with Session(bind=db.engine) as session, session.begin():
draft_var_srv = WorkflowDraftVariableService(session)
conversation_id = draft_var_srv.get_or_create_conversation(

View File

@ -15,6 +15,7 @@ from core.trigger.trigger_manager import TriggerManager
from extensions.ext_database import db
from extensions.ext_storage import storage
from models.trigger import TriggerSubscription
from services.trigger_debug_service import TriggerDebugService
from services.trigger_service import TriggerService
logger = logging.getLogger(__name__)
@ -29,6 +30,7 @@ def dispatch_triggered_workflows_async(
endpoint_id: str,
provider_id: str,
subscription_id: str,
timestamp: int,
triggers: list[str],
request_id: str,
) -> dict:
@ -39,6 +41,7 @@ def dispatch_triggered_workflows_async(
endpoint_id: Endpoint ID
provider_id: Provider ID
subscription_id: Subscription ID
timestamp: Timestamp of the event
triggers: List of triggers to dispatch
request_id: Unique ID of the stored request
@ -47,10 +50,11 @@ def dispatch_triggered_workflows_async(
"""
try:
logger.info(
"Starting async trigger dispatching for endpoint=%s, triggers=%s, request_id=%s",
"Starting async trigger dispatching for endpoint=%s, triggers=%s, request_id=%s, timestamp=%s",
endpoint_id,
triggers,
request_id,
timestamp,
)
# Verify request exists in storage
@ -106,9 +110,11 @@ def dispatch_triggered_workflows_async(
# Dispatch to debug sessions after processing all triggers
try:
debug_dispatched = TriggerService.dispatch_debugging_sessions(
debug_dispatched = TriggerDebugService.dispatch_debug_event(
tenant_id=subscription.tenant_id,
subscription_id=subscription_id,
triggers=triggers,
timestamp=timestamp,
request_id=request_id,
)
except Exception: