mirror of https://github.com/langgenius/dify.git
feat(trigger): implement trigger debugging functionality
- Added DraftWorkflowTriggerNodeApi and DraftWorkflowTriggerRunApi for debugging trigger nodes and workflows.
- Enhanced TriggerDebugService to manage trigger debugging sessions and event listening.
- Introduced structured event responses for trigger debugging, including listening started, received, node finished, and workflow started events.
- Updated Queue and Stream entities to support the new trigger debug events.
- Refactored trigger input handling to streamline creating inputs from trigger data.

This implementation improves the debugging capabilities for trigger nodes and workflows, providing clearer event handling and structured responses.
parent e9e843b27d
commit 1562d00037
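For orientation, here is a minimal client-side sketch of how the new full-workflow debug endpoint could be consumed. It assumes a locally served console API, a valid bearer token, and the standard "data: {...}" SSE framing that convert_to_event_stream produces; the URL, app ID, node ID, and token are placeholders, and the event names follow the StreamEvent values added in this commit.

import json

import requests

CONSOLE_URL = "http://localhost:5001/console/api"  # assumed deployment URL
APP_ID = "your-app-uuid"                           # placeholder
NODE_ID = "your-trigger-node-id"                   # placeholder

resp = requests.post(
    f"{CONSOLE_URL}/apps/{APP_ID}/workflows/draft/trigger/run",
    headers={"Authorization": "Bearer <token>"},
    json={"node_id": NODE_ID, "timeout": 120},
    stream=True,
)
for raw in resp.iter_lines():
    # Keep only SSE data lines; skip blank keep-alive lines
    if not raw or not raw.startswith(b"data:"):
        continue
    event = json.loads(raw[len(b"data:"):].strip())
    print(event.get("event"), event)
    # "error" is assumed to be the standard error event name
    if event.get("event") in ("trigger_debug_timeout", "error"):
        break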
@@ -807,69 +807,170 @@ class DraftWorkflowNodeLastRunApi(Resource):


class DraftWorkflowTriggerNodeApi(Resource):
    """
    Single node debug - listen for trigger events and execute a single trigger node.
    Path: /apps/<uuid:app_id>/workflows/draft/nodes/<string:node_id>/trigger
    """

    @setup_required
    @login_required
    @account_initialization_required
    @get_app_model(mode=[AppMode.WORKFLOW])
    def post(self, app_model: App, node_id: str):
        """
        Debug a trigger node by listening for events and executing the node.
        """
        from core.trigger.entities.entities import TriggerDebugEventData, TriggerInputs
        from services.trigger.trigger_debug_events import TriggerDebugEventGenerator
        from services.trigger_debug_service import TriggerDebugService
        from services.workflow_service import WorkflowService

        srv = WorkflowService()
        workflow = srv.get_draft_workflow(app_model)
        if not workflow:
            raise NotFound("Workflow not found")
        if not isinstance(current_user, Account) or not current_user.is_editor:
            raise Forbidden()

        # Get node configuration
        node_config = workflow.get_node_config_by_id(node_id)
        if not node_config:
            raise NotFound(f"Node {node_id} not found in workflow")

        parser = reqparse.RequestParser()
        parser.add_argument("timeout", type=int, default=300, location="json")
        args = parser.parse_args()

        # Validate that this is a trigger plugin node
        if node_config.get("data", {}).get("type") != "plugin":
            raise ValueError("Node is not a trigger plugin node")

        # Get the subscription ID from the node config
        subscription_id = node_config.get("data", {}).get("subscription_id")
        if not subscription_id:
            raise ValueError("No subscription ID configured for this trigger node")

        def generate(current_user: Account):
            # Phase 1: Listen for trigger events
            trigger_data = None
            for event in TriggerDebugService.waiting_for_triggered(
                app_model=app_model,
                node_id=node_id,
                user_id=current_user.id,
                timeout=args.get("timeout", 300),
            ):
                yield event  # Pass through all listening events

                # Check whether we received the trigger
                if isinstance(event, dict) and event.get("event") == "trigger_debug_received":
                    # Save the trigger data and exit the listening loop
                    trigger_data = TriggerDebugEventData(
                        subscription_id=event["subscription_id"],
                        triggers=event["triggers"],
                        request_id=event["request_id"],
                        timestamp=event["timestamp"],
                    )
                    break

            # Phase 2: Execute the node if a trigger was received
            if trigger_data:
                # Create trigger inputs
                trigger_inputs = TriggerInputs.from_trigger_data(trigger_data)
                event_generator = TriggerDebugEventGenerator()

                try:
                    # Get the workflow and execute the node
                    workflow_service = WorkflowService()
                    draft_workflow = workflow_service.get_draft_workflow(app_model)
                    if not draft_workflow:
                        raise ValueError("Workflow not found")

                    node_execution = workflow_service.run_draft_workflow_node(
                        app_model=app_model,
                        draft_workflow=draft_workflow,
                        node_id=node_id,
                        user_inputs=trigger_inputs.to_dict(),
                        account=current_user,
                        query="",
                        files=[],
                    )

                    # Generate the node finished event
                    yield event_generator.generate_node_finished(node_execution).to_dict()

                except Exception as e:
                    yield event_generator.generate_error(str(e)).to_dict()

        # Use the standard response format
        from core.app.apps.base_app_generator import BaseAppGenerator

        response = BaseAppGenerator.convert_to_event_stream(generate(current_user))
        return helper.compact_generate_response(response)


class DraftWorkflowTriggerRunApi(Resource):
    """
    Full workflow debug - listen for trigger events and execute the complete workflow.
    Path: /apps/<uuid:app_id>/workflows/draft/trigger/run
    """

    @setup_required
    @login_required
    @account_initialization_required
    @get_app_model(mode=[AppMode.WORKFLOW])
    def post(self, app_model: App):
        """
        Debug a trigger workflow by listening for events and running the full workflow.
        """
        if not isinstance(current_user, Account) or not current_user.is_editor:
            raise Forbidden()

        parser = reqparse.RequestParser()
        parser.add_argument("node_id", type=str, required=True, location="json")
        parser.add_argument("timeout", type=int, default=300, location="json")
        args = parser.parse_args()

        from core.app.entities.app_invoke_entities import InvokeFrom
        from core.trigger.entities.entities import TriggerDebugEventData, TriggerInputs
        from services.app_generate_service import AppGenerateService
        from services.trigger.trigger_debug_events import TriggerDebugEventGenerator
        from services.trigger_debug_service import TriggerDebugService

        def generate(current_user: Account):
            # Phase 1: Listen for trigger events
            trigger_data = None
            for event in TriggerDebugService.waiting_for_triggered(
                app_model=app_model,
                node_id=args["node_id"],
                user_id=current_user.id,
                timeout=args.get("timeout", 300),
            ):
                yield event  # Pass through all listening events

                # Check whether we received the trigger
                if isinstance(event, dict) and event.get("event") == "trigger_debug_received":
                    # Save the trigger data and exit the listening loop
                    trigger_data = TriggerDebugEventData(
                        subscription_id=event["subscription_id"],
                        triggers=event["triggers"],
                        request_id=event["request_id"],
                        timestamp=event["timestamp"],
                    )
                    break

            # Phase 2: Execute the workflow if a trigger was received
            if trigger_data:
                event_generator = TriggerDebugEventGenerator()

                # Yield the workflow started event
                yield event_generator.generate_workflow_started(trigger_data).to_dict()

                # Create trigger inputs and convert them to workflow args
                trigger_inputs = TriggerInputs.from_trigger_data(trigger_data)
                combined_args = trigger_inputs.to_workflow_args()

                try:
                    # Execute the workflow
                    workflow_response = AppGenerateService.generate(
                        app_model=app_model,
                        user=current_user,
                        args=combined_args,
                        invoke_from=InvokeFrom.DEBUGGER,
                        streaming=True,
                    )

                    # Pass through the workflow's standard event stream
                    yield from workflow_response

                except Exception as e:
                    yield event_generator.generate_error(str(e)).to_dict()

        # Use the standard response format
        from core.app.apps.base_app_generator import BaseAppGenerator

        response = BaseAppGenerator.convert_to_event_stream(generate(current_user))
        return helper.compact_generate_response(response)


api.add_resource(
@@ -900,6 +1001,10 @@ api.add_resource(
    DraftWorkflowTriggerNodeApi,
    "/apps/<uuid:app_id>/workflows/draft/nodes/<string:node_id>/trigger",
)
api.add_resource(
    DraftWorkflowTriggerRunApi,
    "/apps/<uuid:app_id>/workflows/draft/trigger/run",
)
api.add_resource(
    AdvancedChatDraftRunIterationNodeApi,
    "/apps/<uuid:app_id>/advanced-chat/workflows/draft/iteration/nodes/<string:node_id>/run",
@@ -447,13 +447,14 @@ class TriggerOAuthClientManageApi(Resource):
            provider_id=provider_id,
        )
        provider_controller = TriggerManager.get_trigger_provider(user.current_tenant_id, provider_id)
        redirect_uri = f"{dify_config.CONSOLE_API_URL}/console/api/oauth/plugin/{provider}/trigger/callback"
        return jsonable_encoder(
            {
                "configured": bool(custom_params or system_client),
                "oauth_client_schema": provider_controller.get_oauth_client_schema(),
                "custom_configured": bool(custom_params),
                "custom_enabled": is_custom_enabled,
                "redirect_uri": redirect_uri,
                "params": custom_params if custom_params else {},
            }
        )
@@ -51,6 +51,12 @@ class QueueEvent(StrEnum):
    PING = "ping"
    STOP = "stop"
    RETRY = "retry"
    # Trigger debug events
    TRIGGER_DEBUG_LISTENING_STARTED = "trigger_debug_listening_started"
    TRIGGER_DEBUG_RECEIVED = "trigger_debug_received"
    TRIGGER_DEBUG_NODE_FINISHED = "trigger_debug_node_finished"
    TRIGGER_DEBUG_WORKFLOW_STARTED = "trigger_debug_workflow_started"
    TRIGGER_DEBUG_TIMEOUT = "trigger_debug_timeout"


class AppQueueEvent(BaseModel):

@@ -718,6 +724,65 @@ class QueueParallelBranchRunSucceededEvent(AppQueueEvent):
    """loop id if node is in loop"""


class QueueTriggerDebugListeningStartedEvent(AppQueueEvent):
    """
    QueueTriggerDebugListeningStartedEvent entity
    """

    event: QueueEvent = QueueEvent.TRIGGER_DEBUG_LISTENING_STARTED
    session_id: str
    webhook_url: str
    timeout: int


class QueueTriggerDebugReceivedEvent(AppQueueEvent):
    """
    QueueTriggerDebugReceivedEvent entity
    """

    event: QueueEvent = QueueEvent.TRIGGER_DEBUG_RECEIVED
    subscription_id: str
    triggers: list[str]
    request_id: str
    timestamp: float


class QueueTriggerDebugNodeFinishedEvent(AppQueueEvent):
    """
    QueueTriggerDebugNodeFinishedEvent entity
    """

    event: QueueEvent = QueueEvent.TRIGGER_DEBUG_NODE_FINISHED
    id: str
    node_id: str
    node_type: str
    status: str
    outputs: Optional[Mapping[str, Any]] = None
    error: Optional[str] = None
    elapsed_time: Optional[float] = None
    execution_metadata: Optional[Mapping[str, Any]] = None


class QueueTriggerDebugWorkflowStartedEvent(AppQueueEvent):
    """
    QueueTriggerDebugWorkflowStartedEvent entity
    """

    event: QueueEvent = QueueEvent.TRIGGER_DEBUG_WORKFLOW_STARTED
    subscription_id: str
    triggers: list[str]
    request_id: str


class QueueTriggerDebugTimeoutEvent(AppQueueEvent):
    """
    QueueTriggerDebugTimeoutEvent entity
    """

    event: QueueEvent = QueueEvent.TRIGGER_DEBUG_TIMEOUT
    error: str = "Timeout waiting for trigger"


class QueueParallelBranchRunFailedEvent(AppQueueEvent):
    """
    QueueParallelBranchRunFailedEvent entity
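The new queue events are plain pydantic models, so they construct and serialize like any other AppQueueEvent. A small sketch, assuming the standard pydantic v2 conventions this file already relies on (the import path mirrors where these entities live in the diff):

import time

from core.app.entities.queue_entities import QueueEvent, QueueTriggerDebugReceivedEvent

event = QueueTriggerDebugReceivedEvent(
    subscription_id="sub_123",   # placeholder values
    triggers=["issue_opened"],
    request_id="req_456",
    timestamp=time.time(),
)
assert event.event == QueueEvent.TRIGGER_DEBUG_RECEIVED  # defaulted discriminator
payload = event.model_dump(mode="json")  # JSON-safe dict for queue transport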
@@ -82,6 +82,12 @@ class StreamEvent(Enum):
    TEXT_CHUNK = "text_chunk"
    TEXT_REPLACE = "text_replace"
    AGENT_LOG = "agent_log"
    # Trigger debug events
    TRIGGER_DEBUG_LISTENING_STARTED = "trigger_debug_listening_started"
    TRIGGER_DEBUG_RECEIVED = "trigger_debug_received"
    TRIGGER_DEBUG_NODE_FINISHED = "trigger_debug_node_finished"
    TRIGGER_DEBUG_WORKFLOW_STARTED = "trigger_debug_workflow_started"
    TRIGGER_DEBUG_TIMEOUT = "trigger_debug_timeout"


class StreamResponse(BaseModel):

@@ -837,3 +843,63 @@ class AgentLogStreamResponse(StreamResponse):

    event: StreamEvent = StreamEvent.AGENT_LOG
    data: Data


# Trigger Debug Stream Responses
class TriggerDebugListeningStartedResponse(StreamResponse):
    """
    TriggerDebugListeningStartedResponse entity
    """

    event: StreamEvent = StreamEvent.TRIGGER_DEBUG_LISTENING_STARTED
    session_id: str
    webhook_url: str
    timeout: int


class TriggerDebugReceivedResponse(StreamResponse):
    """
    TriggerDebugReceivedResponse entity
    """

    event: StreamEvent = StreamEvent.TRIGGER_DEBUG_RECEIVED
    subscription_id: str
    triggers: list[str]
    request_id: str
    timestamp: float


class TriggerDebugNodeFinishedResponse(StreamResponse):
    """
    TriggerDebugNodeFinishedResponse entity
    """

    event: StreamEvent = StreamEvent.TRIGGER_DEBUG_NODE_FINISHED
    id: str
    node_id: str
    node_type: str
    status: str
    outputs: Optional[Mapping[str, Any]] = None
    error: Optional[str] = None
    elapsed_time: float
    execution_metadata: Optional[Mapping[str, Any]] = None


class TriggerDebugWorkflowStartedResponse(StreamResponse):
    """
    TriggerDebugWorkflowStartedResponse entity
    """

    event: StreamEvent = StreamEvent.TRIGGER_DEBUG_WORKFLOW_STARTED
    subscription_id: str
    triggers: list[str]
    request_id: str


class TriggerDebugTimeoutResponse(StreamResponse):
    """
    TriggerDebugTimeoutResponse entity
    """

    event: StreamEvent = StreamEvent.TRIGGER_DEBUG_TIMEOUT
    error: str = "Timeout waiting for trigger"
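On the consumer side, each response above serializes to a dict whose "event" field carries the StreamEvent value, so a client can dispatch on it. A hypothetical handler sketch; the field names match the response classes above, while the handling itself is purely illustrative:

def handle_debug_event(evt: dict) -> None:
    # evt is a StreamResponse serialized via .to_dict()
    match evt.get("event"):
        case "trigger_debug_listening_started":
            print(f"listening on {evt['webhook_url']} for {evt['timeout']}s")
        case "trigger_debug_received":
            print(f"received {evt['triggers']} (request {evt['request_id']})")
        case "trigger_debug_node_finished":
            print(f"node {evt['node_id']} finished with status {evt['status']}")
        case "trigger_debug_timeout":
            print("timed out waiting for trigger")
        case _:
            pass  # ping, workflow events, errors, etc.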
@@ -262,6 +262,36 @@ class TriggerDebugEventData(BaseModel):
    model_config = ConfigDict(arbitrary_types_allowed=True)


class TriggerInputs(BaseModel):
    """Standard inputs for trigger nodes."""

    request_id: str
    trigger_name: str
    subscription_id: str

    @classmethod
    def from_trigger_data(cls, trigger_data: TriggerDebugEventData) -> "TriggerInputs":
        """Create from debug event data."""
        return cls(
            request_id=trigger_data.request_id,
            trigger_name=trigger_data.triggers[0] if trigger_data.triggers else "",
            subscription_id=trigger_data.subscription_id,
        )

    @classmethod
    def from_trigger_entity(cls, request_id: str, subscription_id: str, trigger: TriggerEntity) -> "TriggerInputs":
        """Create from a trigger entity (for production)."""
        return cls(request_id=request_id, trigger_name=trigger.identity.name, subscription_id=subscription_id)

    def to_workflow_args(self) -> dict[str, Any]:
        """Convert to the workflow arguments format."""
        return {"inputs": self.model_dump(), "files": []}

    def to_dict(self) -> dict[str, Any]:
        """Convert to a dict (alias for model_dump)."""
        return self.model_dump()


# Export all entities
__all__ = [
    "OAuthSchema",

@@ -272,6 +302,7 @@ __all__ = [
    "TriggerDescription",
    "TriggerEntity",
    "TriggerIdentity",
    "TriggerInputs",
    "TriggerParameter",
    "TriggerParameterType",
    "TriggerProviderEntity",
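A usage sketch for the new model. Constructing TriggerDebugEventData with exactly these four fields is an assumption based on how the debug flow populates it; the literal values are placeholders:

from core.trigger.entities.entities import TriggerDebugEventData, TriggerInputs

data = TriggerDebugEventData(
    subscription_id="sub_123",
    triggers=["issue_opened", "issue_closed"],
    request_id="req_456",
    timestamp=1700000000.0,
)
inputs = TriggerInputs.from_trigger_data(data)
assert inputs.trigger_name == "issue_opened"  # the first trigger wins
print(inputs.to_workflow_args())
# {'inputs': {'request_id': 'req_456', 'trigger_name': 'issue_opened',
#             'subscription_id': 'sub_123'}, 'files': []}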
@@ -0,0 +1,48 @@
"""
Event generator for trigger debug operations.

Provides structured event generation for trigger debug SSE streams.
"""

from core.app.entities.task_entities import (
    ErrorStreamResponse,
    StreamResponse,
    TriggerDebugNodeFinishedResponse,
    TriggerDebugWorkflowStartedResponse,
)
from core.trigger.entities.entities import TriggerDebugEventData
from models.workflow import WorkflowNodeExecutionModel


class TriggerDebugEventGenerator:
    """Generator for trigger debug events."""

    @staticmethod
    def generate_node_finished(node_execution: WorkflowNodeExecutionModel) -> StreamResponse:
        """Generate a node finished event."""
        return TriggerDebugNodeFinishedResponse(
            task_id="",
            id=node_execution.id,
            node_id=node_execution.node_id,
            node_type=node_execution.node_type,
            status=node_execution.status,
            outputs=node_execution.outputs_dict,
            error=node_execution.error,
            elapsed_time=node_execution.elapsed_time,
            execution_metadata=node_execution.execution_metadata_dict,
        )

    @staticmethod
    def generate_workflow_started(event_data: TriggerDebugEventData) -> StreamResponse:
        """Generate a workflow started event."""
        return TriggerDebugWorkflowStartedResponse(
            task_id="",
            subscription_id=event_data.subscription_id,
            triggers=event_data.triggers,
            request_id=event_data.request_id,
        )

    @staticmethod
    def generate_error(error: str) -> StreamResponse:
        """Generate an error event."""
        return ErrorStreamResponse(task_id="", err=Exception(error))
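A quick usage sketch; the exact dict shape produced by ErrorStreamResponse.to_dict() is not shown in this diff, so treat the trailing comment as indicative only:

from services.trigger.trigger_debug_events import TriggerDebugEventGenerator

gen = TriggerDebugEventGenerator()
error_event = gen.generate_error("subscription expired")
payload = error_event.to_dict()  # serialized error event, ready for the SSE stream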
@@ -10,13 +10,41 @@
import logging
import time
import uuid
from collections.abc import Generator
from dataclasses import dataclass
from typing import Any

from flask import request
from werkzeug.exceptions import NotFound

from core.app.entities.task_entities import (
    ErrorStreamResponse,
    PingStreamResponse,
    TriggerDebugListeningStartedResponse,
    TriggerDebugReceivedResponse,
    TriggerDebugTimeoutResponse,
)
from core.trigger.entities.entities import TriggerDebugEventData
from extensions.ext_redis import redis_client
from models.model import App
from services.trigger.trigger_provider_service import TriggerProviderService
from services.workflow_service import WorkflowService

logger = logging.getLogger(__name__)


@dataclass
class TriggerDebuggingContext:
    """Context for a trigger debugging session."""

    session_id: str
    subscription_id: str
    webhook_url: str
    node_id: str
    app_id: str
    user_id: str
    timeout: int


class TriggerDebugService:
    """
    Trigger debug service - supports distributed environments.
@@ -27,9 +55,122 @@ class TriggerDebugService:
    SUBSCRIPTION_DEBUG_PREFIX = "trigger_debug_subscription:"
    PUBSUB_CHANNEL_PREFIX = "trigger_debug_channel:"

    __DEFAULT_LISTEN_TIMEOUT__ = 300

    @classmethod
    def build_debugging_context(
        cls,
        app_model: App,
        node_id: str,
        user_id: str,
        timeout: int = __DEFAULT_LISTEN_TIMEOUT__,
    ) -> TriggerDebuggingContext:
        """
        Build the debugging context for a trigger node.

        Args:
            app_model: Application model
            node_id: Node ID to debug
            user_id: User ID creating the session
            timeout: Session timeout in seconds

        Returns:
            TriggerDebuggingContext with all debugging information

        Raises:
            NotFound: If the workflow or node is not found
            ValueError: If the node is not a trigger plugin or has no subscription
        """
        # Get and validate the workflow
        workflow_service = WorkflowService()
        draft_workflow = workflow_service.get_draft_workflow(app_model)
        if not draft_workflow:
            raise NotFound("Workflow not found")

        # Get and validate the node
        node_config = draft_workflow.get_node_config_by_id(node_id)
        if not node_config:
            raise NotFound(f"Node {node_id} not found")

        if node_config.get("data", {}).get("type") != "plugin":
            raise ValueError("Node is not a trigger plugin node")

        subscription_id = node_config.get("data", {}).get("subscription_id")
        if not subscription_id:
            raise ValueError("No subscription configured for this trigger node")

        # Create the debug session
        app_id = str(app_model.id)
        session_id = cls.create_debug_session(
            app_id=app_id,
            node_id=node_id,
            subscription_id=subscription_id,
            user_id=user_id,
            timeout=timeout,
        )

        # Get the webhook URL
        subscription = TriggerProviderService.get_subscription_by_id(
            tenant_id=app_model.tenant_id, subscription_id=subscription_id
        )
        webhook_url = (
            f"{request.host_url.rstrip('/')}/trigger/plugin/{subscription.endpoint}" if subscription else "Unknown"
        )

        return TriggerDebuggingContext(
            session_id=session_id,
            subscription_id=subscription_id,
            webhook_url=webhook_url,
            node_id=node_id,
            app_id=app_id,
            user_id=user_id,
            timeout=timeout,
        )

    @classmethod
    def waiting_for_triggered(
        cls,
        app_model: App,
        node_id: str,
        user_id: str,
        timeout: int = __DEFAULT_LISTEN_TIMEOUT__,
    ) -> Generator[dict[str, Any], None, None]:
        """
        Listen for trigger events only.

        This method sets up a debug session and listens for incoming trigger events.
        It yields events as they occur and returns when a trigger is received or the timeout expires.

        Args:
            app_model: Application model
            node_id: Node ID to debug
            user_id: User ID creating the session
            timeout: Timeout in seconds

        Yields:
            Event dictionaries, including:
            - listening_started: Initial event with the webhook URL
            - ping: Periodic heartbeat events
            - trigger_debug_received: When a trigger is received
            - timeout: When the timeout expires
            - error: On any error
        """
        # Build the debugging context
        context = cls.build_debugging_context(app_model, node_id, user_id, timeout)

        # Listen for events and pass them through
        for event in cls.listen_for_events(
            session_id=context.session_id, webhook_url=context.webhook_url, timeout=context.timeout
        ):
            yield event

            # If we received a trigger, listening is complete
            if isinstance(event, dict) and event.get("event") == "trigger_debug_received":
                break

    @classmethod
    def create_debug_session(
        cls, app_id: str, node_id: str, subscription_id: str, user_id: str, timeout: int = __DEFAULT_LISTEN_TIMEOUT__
    ) -> str:
        """
        Create a debug session.
@@ -66,17 +207,27 @@ class TriggerDebugService:
        return session_id

    @classmethod
    def listen_for_events(
        cls, session_id: str, webhook_url: str, timeout: int = __DEFAULT_LISTEN_TIMEOUT__
    ) -> Generator[dict[str, Any], None, None]:
        """
        Listen for events using Redis Pub/Sub and generate structured events.

        Args:
            session_id: Debug session ID
            webhook_url: Webhook URL for the trigger
            timeout: Timeout in seconds

        Yields:
            Structured event dictionaries (serialized stream responses)
        """
        # Send the initial listening started event
        yield TriggerDebugListeningStartedResponse(
            task_id="",  # Will be set by the caller if needed
            session_id=session_id,
            webhook_url=webhook_url,
            timeout=timeout,
        ).to_dict()
        pubsub = redis_client.pubsub()
        channel = f"{cls.PUBSUB_CHANNEL_PREFIX}{session_id}"
@@ -94,24 +245,45 @@ class TriggerDebugService:
                message = pubsub.get_message(timeout=1.0)

                if message and message["type"] == "message":
                    # Received a trigger event - parse it and create a structured event
                    try:
                        event_data = json.loads(message["data"])
                        logger.info("Received trigger event for session %s", session_id)

                        # Create the structured trigger received event
                        trigger_data = TriggerDebugEventData(
                            subscription_id=event_data["subscription_id"],
                            triggers=event_data["triggers"],
                            request_id=event_data["request_id"],
                            timestamp=event_data.get("timestamp", time.time()),
                        )
                        yield TriggerDebugReceivedResponse(
                            task_id="",
                            subscription_id=trigger_data.subscription_id,
                            triggers=trigger_data.triggers,
                            request_id=trigger_data.request_id,
                            timestamp=trigger_data.timestamp,
                        ).to_dict()
                        break  # End listening after receiving the event
                    except (json.JSONDecodeError, KeyError) as e:
                        logger.exception("Failed to parse trigger event")
                        yield ErrorStreamResponse(
                            task_id="", err=Exception(f"Failed to parse trigger event: {str(e)}")
                        ).to_dict()
                        break

                # Send a periodic heartbeat
                if time.time() - last_heartbeat > 5:
                    yield PingStreamResponse(task_id="").to_dict()
                    last_heartbeat = time.time()

                # Timeout
                if time.time() - start_time >= timeout:
                    yield TriggerDebugTimeoutResponse(task_id="").to_dict()

        except Exception as e:
            logger.exception("Error in listen_for_events", exc_info=e)
            yield ErrorStreamResponse(task_id="", err=e).to_dict()

        finally:
            # Clean up resources
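For exercising listen_for_events end to end, publishing a well-formed payload on the session channel is what produces a trigger_debug_received event. A test-only sketch, assuming direct access to the same Redis instance; the session ID and payload values are placeholders, and the channel format comes from PUBSUB_CHANNEL_PREFIX above:

import json
import time

from extensions.ext_redis import redis_client

session_id = "debug-session-uuid"  # placeholder
channel = f"trigger_debug_channel:{session_id}"  # PUBSUB_CHANNEL_PREFIX + session_id
redis_client.publish(
    channel,
    json.dumps(
        {
            # Keys mirror what listen_for_events parses
            "subscription_id": "sub_123",
            "triggers": ["issue_opened"],
            "request_id": "req_456",
            "timestamp": time.time(),
        }
    ),
)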
@@ -8,7 +8,7 @@ from sqlalchemy.orm import Session

from core.plugin.entities.plugin import TriggerProviderID
from core.plugin.utils.http_parser import serialize_request
from core.trigger.entities.entities import TriggerDebugEventData, TriggerEntity, TriggerInputs
from core.trigger.trigger_manager import TriggerManager
from extensions.ext_database import db
from extensions.ext_storage import storage

@@ -87,6 +87,11 @@ class TriggerService:
                )
                continue

            # Create trigger inputs using the new structure
            trigger_inputs = TriggerInputs.from_trigger_entity(
                request_id=request_id, subscription_id=subscription.id, trigger=trigger
            )

            # Create trigger data for async execution
            trigger_data = PluginTriggerData(
                app_id=plugin_trigger.app_id,

@@ -96,11 +101,7 @@ class TriggerService:
                trigger_type=WorkflowRunTriggeredFrom.PLUGIN,
                plugin_id=subscription.provider_id,
                endpoint_id=subscription.endpoint_id,
                inputs=trigger_inputs.to_dict(),
            )

            # Trigger the async workflow

@@ -121,15 +122,12 @@ class TriggerService:
        return dispatched_count

    @classmethod
    def dispatch_debugging_sessions(cls, subscription_id: str, triggers: list[str], request_id: str) -> int:
        """
        Dispatch to debug sessions - simplified version.

        Args:
            subscription_id: Subscription ID
            triggers: List of trigger names
            request_id: Request ID for storage reference
        """
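The refactor above swaps the old inline inputs dict for TriggerInputs; for any given trigger the two forms are equivalent. A sketch, where the trigger argument stands in for any TriggerEntity:

from core.trigger.entities.entities import TriggerEntity, TriggerInputs

def build_inputs(request_id: str, subscription_id: str, trigger: TriggerEntity) -> dict:
    new_style = TriggerInputs.from_trigger_entity(
        request_id=request_id, subscription_id=subscription_id, trigger=trigger
    ).to_dict()
    # The dict the old code built inline
    old_style = {
        "request_id": request_id,
        "trigger_name": trigger.identity.name,
        "subscription_id": subscription_id,
    }
    assert new_style == old_style
    return new_style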
@@ -11,7 +11,6 @@ from celery import shared_task
from sqlalchemy.orm import Session

from core.plugin.entities.plugin import TriggerProviderID
from core.trigger.trigger_manager import TriggerManager
from extensions.ext_database import db
from extensions.ext_storage import storage

@@ -54,10 +53,12 @@ def dispatch_triggered_workflows_async(
        request_id,
    )

    # Verify the request exists in storage
    try:
        serialized_request = storage.load_once(f"triggers/{request_id}")
        # Just verify it exists; we don't need to deserialize it here
        if not serialized_request:
            raise ValueError("Request not found in storage")
    except Exception as e:
        logger.exception("Failed to load request %s", request_id, exc_info=e)
        return {"status": "failed", "error": f"Failed to load request: {str(e)}"}

@@ -107,7 +108,6 @@ def dispatch_triggered_workflows_async(
    try:
        debug_dispatched = TriggerService.dispatch_debugging_sessions(
            subscription_id=subscription_id,
            triggers=triggers,
            request_id=request_id,
        )
@@ -32,7 +32,7 @@ const useConfig = (id: string, payload: ToolNodeType) => {
   * tool_parameters: tool dynamic setting (form type = llm)
   * output_schema: tool dynamic output
   */
  const { provider_id, provider_type, tool_name, tool_configurations, output_schema, tool_parameters } = inputs
  const isBuiltIn = provider_type === CollectionType.builtIn
  const buildInTools = useStore(s => s.buildInTools)
  const customTools = useStore(s => s.customTools)