diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index cdb0d1b5c0..bc910ce5cb 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -807,69 +807,170 @@ class DraftWorkflowNodeLastRunApi(Resource): class DraftWorkflowTriggerNodeApi(Resource): + """ + Single node debug - Listen for trigger events and execute single Trigger node + Path: /apps//workflows/draft/nodes//trigger + """ + @setup_required @login_required @account_initialization_required - @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) + @get_app_model(mode=[AppMode.WORKFLOW]) def post(self, app_model: App, node_id: str): """ - Debug trigger node by creating a debug session and listening for events. + Debug trigger node by listening for events and executing the node """ - srv = WorkflowService() - workflow = srv.get_draft_workflow(app_model) - if not workflow: - raise NotFound("Workflow not found") + if not isinstance(current_user, Account) or not current_user.is_editor: + raise Forbidden() - # Get node configuration - node_config = workflow.get_node_config_by_id(node_id) - if not node_config: - raise NotFound(f"Node {node_id} not found in workflow") + parser = reqparse.RequestParser() + parser.add_argument("timeout", type=int, default=300, location="json") + args = parser.parse_args() - # Validate it's a trigger plugin node - if node_config.get("data", {}).get("type") != "plugin": - raise ValueError("Node is not a trigger plugin node") + from core.trigger.entities.entities import TriggerDebugEventData, TriggerInputs + from services.trigger.trigger_debug_events import TriggerDebugEventGenerator + from services.trigger_debug_service import TriggerDebugService + from services.workflow_service import WorkflowService - # Get subscription ID from node config - subscription_id = node_config.get("data", {}).get("subscription_id") - if not subscription_id: - raise ValueError("No subscription ID configured for this trigger node") + def generate(current_user: Account): + # Phase 1: Listen for trigger events + trigger_data = None + for event in TriggerDebugService.waiting_for_triggered( + app_model=app_model, + node_id=node_id, + user_id=current_user.id, + timeout=args.get("timeout", 300), + ): + yield event # Pass through all listening events - # Create debug session + # Check if we received the trigger + if isinstance(event, dict) and event.get("event") == "trigger_debug_received": + # Save trigger data and exit listening loop + trigger_data = TriggerDebugEventData( + subscription_id=event["subscription_id"], + triggers=event["triggers"], + request_id=event["request_id"], + timestamp=event["timestamp"], + ) + break + + # Phase 2: Execute node if trigger was received + if trigger_data: + # Create trigger inputs + trigger_inputs = TriggerInputs.from_trigger_data(trigger_data) + event_generator = TriggerDebugEventGenerator() + + try: + # Get workflow and execute node + workflow_service = WorkflowService() + draft_workflow = workflow_service.get_draft_workflow(app_model) + if not draft_workflow: + raise ValueError("Workflow not found") + + node_execution = workflow_service.run_draft_workflow_node( + app_model=app_model, + draft_workflow=draft_workflow, + node_id=node_id, + user_inputs=trigger_inputs.to_dict(), + account=current_user, + query="", + files=[], + ) + + # Generate node finished event + yield event_generator.generate_node_finished(node_execution).to_dict() + + except Exception as e: + yield event_generator.generate_error(str(e)).to_dict() + + # Use standard response format + from core.app.apps.base_app_generator import BaseAppGenerator + + response = BaseAppGenerator.convert_to_event_stream(generate(current_user)) + return helper.compact_generate_response(response) + + +class DraftWorkflowTriggerRunApi(Resource): + """ + Full workflow debug - Listen for trigger events and execute complete workflow + Path: /apps//workflows/draft/trigger/run + """ + + @setup_required + @login_required + @account_initialization_required + @get_app_model(mode=[AppMode.WORKFLOW]) + def post(self, app_model: App): + """ + Debug trigger workflow by listening for events and running full workflow + """ + if not isinstance(current_user, Account) or not current_user.is_editor: + raise Forbidden() + parser = reqparse.RequestParser() + parser.add_argument("node_id", type=str, required=True, location="json") + parser.add_argument("timeout", type=int, default=300, location="json") + args = parser.parse_args() + + from core.app.entities.app_invoke_entities import InvokeFrom + from core.trigger.entities.entities import TriggerDebugEventData, TriggerInputs + from services.app_generate_service import AppGenerateService + from services.trigger.trigger_debug_events import TriggerDebugEventGenerator from services.trigger_debug_service import TriggerDebugService - assert isinstance(current_user, Account) - session_id = TriggerDebugService.create_debug_session( - app_id=str(app_model.id), - node_id=node_id, - subscription_id=subscription_id, - user_id=current_user.id, - timeout=300, - ) + def generate(current_user: Account): + # Phase 1: Listen for trigger events + trigger_data = None + for event in TriggerDebugService.waiting_for_triggered( + app_model=app_model, + node_id=args["node_id"], + user_id=current_user.id, + timeout=args.get("timeout", 300), + ): + yield event # Pass through all listening events - # Stream events to client - def generate(): - for event_data in TriggerDebugService.listen_for_events(session_id): - if isinstance(event_data, dict): - if event_data.get("type") == "heartbeat": - yield f"event: heartbeat\ndata: {json.dumps(event_data)}\n\n" - elif event_data.get("type") == "timeout": - yield f"event: timeout\ndata: {json.dumps({'message': 'Session timed out'})}\n\n" - break - elif event_data.get("type") == "error": - yield f"event: error\ndata: {json.dumps(event_data)}\n\n" - break - else: - # Trigger event - prepare for workflow execution - yield f"event: trigger\ndata: {json.dumps(event_data)}\n\n" + # Check if we received the trigger + if isinstance(event, dict) and event.get("event") == "trigger_debug_received": + # Save trigger data and exit listening loop + trigger_data = TriggerDebugEventData( + subscription_id=event["subscription_id"], + triggers=event["triggers"], + request_id=event["request_id"], + timestamp=event["timestamp"], + ) + break - # TODO: Execute workflow with trigger data if needed - # This would involve extracting trigger data and running the workflow - # For now, just send the trigger event - break + # Phase 2: Execute workflow if trigger was received + if trigger_data: + event_generator = TriggerDebugEventGenerator() - from flask import Response + # Yield workflow started event + yield event_generator.generate_workflow_started(trigger_data).to_dict() - return Response(generate(), mimetype="text/event-stream") + # Create trigger inputs and convert to workflow args + trigger_inputs = TriggerInputs.from_trigger_data(trigger_data) + combined_args = trigger_inputs.to_workflow_args() + + try: + # Execute workflow + workflow_response = AppGenerateService.generate( + app_model=app_model, + user=current_user, + args=combined_args, + invoke_from=InvokeFrom.DEBUGGER, + streaming=True, + ) + + # Pass through workflow's standard event stream + yield from workflow_response + + except Exception as e: + yield event_generator.generate_error(str(e)).to_dict() + + # Use standard response format + from core.app.apps.base_app_generator import BaseAppGenerator + + response = BaseAppGenerator.convert_to_event_stream(generate(current_user)) + return helper.compact_generate_response(response) api.add_resource( @@ -900,6 +1001,10 @@ api.add_resource( DraftWorkflowTriggerNodeApi, "/apps//workflows/draft/nodes//trigger", ) +api.add_resource( + DraftWorkflowTriggerRunApi, + "/apps//workflows/draft/trigger/run", +) api.add_resource( AdvancedChatDraftRunIterationNodeApi, "/apps//advanced-chat/workflows/draft/iteration/nodes//run", diff --git a/api/controllers/console/workspace/trigger_providers.py b/api/controllers/console/workspace/trigger_providers.py index 8da0fa6dbd..3fdadea8ac 100644 --- a/api/controllers/console/workspace/trigger_providers.py +++ b/api/controllers/console/workspace/trigger_providers.py @@ -447,13 +447,14 @@ class TriggerOAuthClientManageApi(Resource): provider_id=provider_id, ) provider_controller = TriggerManager.get_trigger_provider(user.current_tenant_id, provider_id) + redirect_uri = f"{dify_config.CONSOLE_API_URL}/console/api/oauth/plugin/{provider}/trigger/callback" return jsonable_encoder( { "configured": bool(custom_params or system_client), "oauth_client_schema": provider_controller.get_oauth_client_schema(), "custom_configured": bool(custom_params), "custom_enabled": is_custom_enabled, - "redirect_uri": f"{dify_config.CONSOLE_API_URL}/console/api/oauth/plugin/{provider}/trigger/callback", + "redirect_uri": redirect_uri, "params": custom_params if custom_params else {}, } ) diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py index db0297c352..583463ffbb 100644 --- a/api/core/app/entities/queue_entities.py +++ b/api/core/app/entities/queue_entities.py @@ -51,6 +51,12 @@ class QueueEvent(StrEnum): PING = "ping" STOP = "stop" RETRY = "retry" + # Trigger debug events + TRIGGER_DEBUG_LISTENING_STARTED = "trigger_debug_listening_started" + TRIGGER_DEBUG_RECEIVED = "trigger_debug_received" + TRIGGER_DEBUG_NODE_FINISHED = "trigger_debug_node_finished" + TRIGGER_DEBUG_WORKFLOW_STARTED = "trigger_debug_workflow_started" + TRIGGER_DEBUG_TIMEOUT = "trigger_debug_timeout" class AppQueueEvent(BaseModel): @@ -718,6 +724,65 @@ class QueueParallelBranchRunSucceededEvent(AppQueueEvent): """loop id if node is in loop""" +class QueueTriggerDebugListeningStartedEvent(AppQueueEvent): + """ + QueueTriggerDebugListeningStartedEvent entity + """ + + event: QueueEvent = QueueEvent.TRIGGER_DEBUG_LISTENING_STARTED + session_id: str + webhook_url: str + timeout: int + + +class QueueTriggerDebugReceivedEvent(AppQueueEvent): + """ + QueueTriggerDebugReceivedEvent entity + """ + + event: QueueEvent = QueueEvent.TRIGGER_DEBUG_RECEIVED + subscription_id: str + triggers: list[str] + request_id: str + timestamp: float + + +class QueueTriggerDebugNodeFinishedEvent(AppQueueEvent): + """ + QueueTriggerDebugNodeFinishedEvent entity + """ + + event: QueueEvent = QueueEvent.TRIGGER_DEBUG_NODE_FINISHED + id: str + node_id: str + node_type: str + status: str + outputs: Optional[Mapping[str, Any]] = None + error: Optional[str] = None + elapsed_time: Optional[float] = None + execution_metadata: Optional[Mapping[str, Any]] = None + + +class QueueTriggerDebugWorkflowStartedEvent(AppQueueEvent): + """ + QueueTriggerDebugWorkflowStartedEvent entity + """ + + event: QueueEvent = QueueEvent.TRIGGER_DEBUG_WORKFLOW_STARTED + subscription_id: str + triggers: list[str] + request_id: str + + +class QueueTriggerDebugTimeoutEvent(AppQueueEvent): + """ + QueueTriggerDebugTimeoutEvent entity + """ + + event: QueueEvent = QueueEvent.TRIGGER_DEBUG_TIMEOUT + error: str = "Timeout waiting for trigger" + + class QueueParallelBranchRunFailedEvent(AppQueueEvent): """ QueueParallelBranchRunFailedEvent entity diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index a1c0368354..e21f84e77c 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -82,6 +82,12 @@ class StreamEvent(Enum): TEXT_CHUNK = "text_chunk" TEXT_REPLACE = "text_replace" AGENT_LOG = "agent_log" + # Trigger debug events + TRIGGER_DEBUG_LISTENING_STARTED = "trigger_debug_listening_started" + TRIGGER_DEBUG_RECEIVED = "trigger_debug_received" + TRIGGER_DEBUG_NODE_FINISHED = "trigger_debug_node_finished" + TRIGGER_DEBUG_WORKFLOW_STARTED = "trigger_debug_workflow_started" + TRIGGER_DEBUG_TIMEOUT = "trigger_debug_timeout" class StreamResponse(BaseModel): @@ -837,3 +843,63 @@ class AgentLogStreamResponse(StreamResponse): event: StreamEvent = StreamEvent.AGENT_LOG data: Data + + +# Trigger Debug Stream Responses +class TriggerDebugListeningStartedResponse(StreamResponse): + """ + TriggerDebugListeningStartedResponse entity + """ + + event: StreamEvent = StreamEvent.TRIGGER_DEBUG_LISTENING_STARTED + session_id: str + webhook_url: str + timeout: int + + +class TriggerDebugReceivedResponse(StreamResponse): + """ + TriggerDebugReceivedResponse entity + """ + + event: StreamEvent = StreamEvent.TRIGGER_DEBUG_RECEIVED + subscription_id: str + triggers: list[str] + request_id: str + timestamp: float + + +class TriggerDebugNodeFinishedResponse(StreamResponse): + """ + TriggerDebugNodeFinishedResponse entity + """ + + event: StreamEvent = StreamEvent.TRIGGER_DEBUG_NODE_FINISHED + id: str + node_id: str + node_type: str + status: str + outputs: Optional[Mapping[str, Any]] = None + error: Optional[str] = None + elapsed_time: float + execution_metadata: Optional[Mapping[str, Any]] = None + + +class TriggerDebugWorkflowStartedResponse(StreamResponse): + """ + TriggerDebugWorkflowStartedResponse entity + """ + + event: StreamEvent = StreamEvent.TRIGGER_DEBUG_WORKFLOW_STARTED + subscription_id: str + triggers: list[str] + request_id: str + + +class TriggerDebugTimeoutResponse(StreamResponse): + """ + TriggerDebugTimeoutResponse entity + """ + + event: StreamEvent = StreamEvent.TRIGGER_DEBUG_TIMEOUT + error: str = "Timeout waiting for trigger" diff --git a/api/core/trigger/entities/entities.py b/api/core/trigger/entities/entities.py index 93f6daf8b7..c54bf6ebaa 100644 --- a/api/core/trigger/entities/entities.py +++ b/api/core/trigger/entities/entities.py @@ -262,6 +262,36 @@ class TriggerDebugEventData(BaseModel): model_config = ConfigDict(arbitrary_types_allowed=True) +class TriggerInputs(BaseModel): + """Standard inputs for trigger nodes.""" + + request_id: str + trigger_name: str + subscription_id: str + + @classmethod + def from_trigger_data(cls, trigger_data: TriggerDebugEventData) -> "TriggerInputs": + """Create from debug event data.""" + return cls( + request_id=trigger_data.request_id, + trigger_name=trigger_data.triggers[0] if trigger_data.triggers else "", + subscription_id=trigger_data.subscription_id, + ) + + @classmethod + def from_trigger_entity(cls, request_id: str, subscription_id: str, trigger: TriggerEntity) -> "TriggerInputs": + """Create from trigger entity (for production).""" + return cls(request_id=request_id, trigger_name=trigger.identity.name, subscription_id=subscription_id) + + def to_workflow_args(self) -> dict[str, Any]: + """Convert to workflow arguments format.""" + return {"inputs": self.model_dump(), "files": []} + + def to_dict(self) -> dict[str, Any]: + """Convert to dict (alias for model_dump).""" + return self.model_dump() + + # Export all entities __all__ = [ "OAuthSchema", @@ -272,6 +302,7 @@ __all__ = [ "TriggerDescription", "TriggerEntity", "TriggerIdentity", + "TriggerInputs", "TriggerParameter", "TriggerParameterType", "TriggerProviderEntity", diff --git a/api/services/trigger/trigger_debug_events.py b/api/services/trigger/trigger_debug_events.py new file mode 100644 index 0000000000..d5d083f9e5 --- /dev/null +++ b/api/services/trigger/trigger_debug_events.py @@ -0,0 +1,48 @@ +""" +Event generator for trigger debug operations. + +Provides structured event generation for trigger debug SSE streams. +""" + +from core.app.entities.task_entities import ( + ErrorStreamResponse, + StreamResponse, + TriggerDebugNodeFinishedResponse, + TriggerDebugWorkflowStartedResponse, +) +from core.trigger.entities.entities import TriggerDebugEventData +from models.workflow import WorkflowNodeExecutionModel + + +class TriggerDebugEventGenerator: + """Generator for trigger debug events.""" + + @staticmethod + def generate_node_finished(node_execution: WorkflowNodeExecutionModel) -> StreamResponse: + """Generate node finished event.""" + return TriggerDebugNodeFinishedResponse( + task_id="", + id=node_execution.id, + node_id=node_execution.node_id, + node_type=node_execution.node_type, + status=node_execution.status, + outputs=node_execution.outputs_dict, + error=node_execution.error, + elapsed_time=node_execution.elapsed_time, + execution_metadata=node_execution.execution_metadata_dict, + ) + + @staticmethod + def generate_workflow_started(event_data: TriggerDebugEventData) -> StreamResponse: + """Generate workflow started event.""" + return TriggerDebugWorkflowStartedResponse( + task_id="", + subscription_id=event_data.subscription_id, + triggers=event_data.triggers, + request_id=event_data.request_id, + ) + + @staticmethod + def generate_error(error: str) -> StreamResponse: + """Generate error event.""" + return ErrorStreamResponse(task_id="", err=Exception(error)) diff --git a/api/services/trigger_debug_service.py b/api/services/trigger_debug_service.py index 35993ebf68..6902ca5fb4 100644 --- a/api/services/trigger_debug_service.py +++ b/api/services/trigger_debug_service.py @@ -10,13 +10,41 @@ import logging import time import uuid from collections.abc import Generator +from dataclasses import dataclass +from typing import Any +from flask import request +from werkzeug.exceptions import NotFound + +from core.app.entities.task_entities import ( + ErrorStreamResponse, + PingStreamResponse, + TriggerDebugListeningStartedResponse, + TriggerDebugReceivedResponse, + TriggerDebugTimeoutResponse, +) from core.trigger.entities.entities import TriggerDebugEventData from extensions.ext_redis import redis_client +from models.model import App +from services.trigger.trigger_provider_service import TriggerProviderService +from services.workflow_service import WorkflowService logger = logging.getLogger(__name__) +@dataclass +class TriggerDebuggingContext: + """Context for trigger debugging session.""" + + session_id: str + subscription_id: str + webhook_url: str + node_id: str + app_id: str + user_id: str + timeout: int + + class TriggerDebugService: """ Trigger debug service - supports distributed environments. @@ -27,9 +55,122 @@ class TriggerDebugService: SUBSCRIPTION_DEBUG_PREFIX = "trigger_debug_subscription:" PUBSUB_CHANNEL_PREFIX = "trigger_debug_channel:" + __DEFAULT_LISTEN_TIMEOUT__ = 300 + + @classmethod + def build_debugging_context( + cls, + app_model: App, + node_id: str, + user_id: str, + timeout: int = __DEFAULT_LISTEN_TIMEOUT__, + ) -> TriggerDebuggingContext: + """ + Build debugging context for trigger node. + + Args: + app_model: Application model + node_id: Node ID to debug + user_id: User ID creating the session + timeout: Session timeout in seconds + + Returns: + TriggerDebuggingContext with all debugging information + + Raises: + NotFound: If workflow or node not found + ValueError: If node is not a trigger plugin or has no subscription + """ + # Get and validate workflow + workflow_service = WorkflowService() + draft_workflow = workflow_service.get_draft_workflow(app_model) + if not draft_workflow: + raise NotFound("Workflow not found") + + # Get and validate node + node_config = draft_workflow.get_node_config_by_id(node_id) + if not node_config: + raise NotFound(f"Node {node_id} not found") + + if node_config.get("data", {}).get("type") != "plugin": + raise ValueError("Node is not a trigger plugin node") + + subscription_id = node_config.get("data", {}).get("subscription_id") + if not subscription_id: + raise ValueError("No subscription configured for this trigger node") + + # Create debug session + app_id = str(app_model.id) + session_id = cls.create_debug_session( + app_id=app_id, + node_id=node_id, + subscription_id=subscription_id, + user_id=user_id, + timeout=timeout, + ) + + # Get webhook URL + subscription = TriggerProviderService.get_subscription_by_id( + tenant_id=app_model.tenant_id, subscription_id=subscription_id + ) + webhook_url = ( + f"{request.host_url.rstrip('/')}/trigger/plugin/{subscription.endpoint}" if subscription else "Unknown" + ) + + return TriggerDebuggingContext( + session_id=session_id, + subscription_id=subscription_id, + webhook_url=webhook_url, + node_id=node_id, + app_id=app_id, + user_id=user_id, + timeout=timeout, + ) + + @classmethod + def waiting_for_triggered( + cls, + app_model: App, + node_id: str, + user_id: str, + timeout: int = __DEFAULT_LISTEN_TIMEOUT__, + ) -> Generator[dict[str, Any], None, None]: + """ + Listen for trigger events only. + + This method sets up a debug session and listens for incoming trigger events. + It yields events as they occur and returns when a trigger is received or timeout occurs. + + Args: + app_model: Application model + node_id: Node ID to debug + user_id: User ID creating the session + timeout: Timeout in seconds + + Yields: + Event dictionaries including: + - listening_started: Initial event with webhook URL + - ping: Periodic heartbeat events + - trigger_debug_received: When trigger is received + - timeout: When timeout occurs + - error: On any errors + """ + # Build debugging context + context = cls.build_debugging_context(app_model, node_id, user_id, timeout) + + # Listen for events and pass them through + for event in cls.listen_for_events( + session_id=context.session_id, webhook_url=context.webhook_url, timeout=context.timeout + ): + yield event + + # If we received a trigger, listening is complete + if isinstance(event, dict) and event.get("event") == "trigger_debug_received": + break + @classmethod def create_debug_session( - cls, app_id: str, node_id: str, subscription_id: str, user_id: str, timeout: int = 300 + cls, app_id: str, node_id: str, subscription_id: str, user_id: str, timeout: int = __DEFAULT_LISTEN_TIMEOUT__ ) -> str: """ Create a debug session. @@ -66,17 +207,27 @@ class TriggerDebugService: return session_id @classmethod - def listen_for_events(cls, session_id: str, timeout: int = 300) -> Generator: + def listen_for_events( + cls, session_id: str, webhook_url: str, timeout: int = __DEFAULT_LISTEN_TIMEOUT__ + ) -> Generator[dict[str, Any], None, None]: """ - Listen for events using Redis Pub/Sub. + Listen for events using Redis Pub/Sub and generate structured events. Args: session_id: Debug session ID + webhook_url: Webhook URL for the trigger timeout: Timeout in seconds Yields: - Event data or heartbeat messages + Structured AppQueueEvent objects """ + # Send initial listening started event + yield TriggerDebugListeningStartedResponse( + task_id="", # Will be set by the caller if needed + session_id=session_id, + webhook_url=webhook_url, + timeout=timeout, + ).to_dict() pubsub = redis_client.pubsub() channel = f"{cls.PUBSUB_CHANNEL_PREFIX}{session_id}" @@ -94,24 +245,45 @@ class TriggerDebugService: message = pubsub.get_message(timeout=1.0) if message and message["type"] == "message": - # Received trigger event - event_data = json.loads(message["data"]) - logger.info("Received trigger event for session %s", session_id) - yield event_data - break # End listening after receiving event + # Received trigger event - parse and create structured event + try: + event_data = json.loads(message["data"]) + logger.info("Received trigger event for session %s", session_id) + + # Create structured trigger received event + trigger_data = TriggerDebugEventData( + subscription_id=event_data["subscription_id"], + triggers=event_data["triggers"], + request_id=event_data["request_id"], + timestamp=event_data.get("timestamp", time.time()), + ) + yield TriggerDebugReceivedResponse( + task_id="", + subscription_id=trigger_data.subscription_id, + triggers=trigger_data.triggers, + request_id=trigger_data.request_id, + timestamp=trigger_data.timestamp, + ).to_dict() + break # End listening after receiving event + except (json.JSONDecodeError, KeyError) as e: + logger.exception("Failed to parse trigger event") + yield ErrorStreamResponse( + task_id="", err=Exception(f"Failed to parse trigger event: {str(e)}") + ).to_dict() + break # Send periodic heartbeat if time.time() - last_heartbeat > 5: - yield {"type": "heartbeat", "remaining": int(timeout - (time.time() - start_time))} + yield PingStreamResponse(task_id="").to_dict() last_heartbeat = time.time() # Timeout if time.time() - start_time >= timeout: - yield {"type": "timeout"} + yield TriggerDebugTimeoutResponse(task_id="").to_dict() except Exception as e: logger.exception("Error in listen_for_events", exc_info=e) - yield {"type": "error", "message": str(e)} + yield ErrorStreamResponse(task_id="", err=e).to_dict() finally: # Clean up resources diff --git a/api/services/trigger_service.py b/api/services/trigger_service.py index 13fc7340f0..b0c781fbc5 100644 --- a/api/services/trigger_service.py +++ b/api/services/trigger_service.py @@ -8,7 +8,7 @@ from sqlalchemy.orm import Session from core.plugin.entities.plugin import TriggerProviderID from core.plugin.utils.http_parser import serialize_request -from core.trigger.entities.entities import TriggerDebugEventData, TriggerEntity +from core.trigger.entities.entities import TriggerDebugEventData, TriggerEntity, TriggerInputs from core.trigger.trigger_manager import TriggerManager from extensions.ext_database import db from extensions.ext_storage import storage @@ -87,6 +87,11 @@ class TriggerService: ) continue + # Create trigger inputs using new structure + trigger_inputs = TriggerInputs.from_trigger_entity( + request_id=request_id, subscription_id=subscription.id, trigger=trigger + ) + # Create trigger data for async execution trigger_data = PluginTriggerData( app_id=plugin_trigger.app_id, @@ -96,11 +101,7 @@ class TriggerService: trigger_type=WorkflowRunTriggeredFrom.PLUGIN, plugin_id=subscription.provider_id, endpoint_id=subscription.endpoint_id, - inputs={ - "request_id": request_id, - "trigger_name": trigger.identity.name, - "subscription_id": subscription.id, - }, + inputs=trigger_inputs.to_dict(), ) # Trigger async workflow @@ -121,15 +122,12 @@ class TriggerService: return dispatched_count @classmethod - def dispatch_debugging_sessions( - cls, subscription_id: str, request: Request, triggers: list[str], request_id: str - ) -> int: + def dispatch_debugging_sessions(cls, subscription_id: str, triggers: list[str], request_id: str) -> int: """ Dispatch to debug sessions - simplified version. Args: subscription_id: Subscription ID - request: Original request triggers: List of trigger names request_id: Request ID for storage reference """ diff --git a/api/tasks/trigger_processing_tasks.py b/api/tasks/trigger_processing_tasks.py index d248f9eb3a..b13d8a2999 100644 --- a/api/tasks/trigger_processing_tasks.py +++ b/api/tasks/trigger_processing_tasks.py @@ -11,7 +11,6 @@ from celery import shared_task from sqlalchemy.orm import Session from core.plugin.entities.plugin import TriggerProviderID -from core.plugin.utils.http_parser import deserialize_request from core.trigger.trigger_manager import TriggerManager from extensions.ext_database import db from extensions.ext_storage import storage @@ -54,10 +53,12 @@ def dispatch_triggered_workflows_async( request_id, ) - # Load request from storage + # Verify request exists in storage try: serialized_request = storage.load_once(f"triggers/{request_id}") - request = deserialize_request(serialized_request) + # Just verify it exists, we don't need to deserialize it here + if not serialized_request: + raise ValueError("Request not found in storage") except Exception as e: logger.exception("Failed to load request %s", request_id, exc_info=e) return {"status": "failed", "error": f"Failed to load request: {str(e)}"} @@ -107,7 +108,6 @@ def dispatch_triggered_workflows_async( try: debug_dispatched = TriggerService.dispatch_debugging_sessions( subscription_id=subscription_id, - request=request, triggers=triggers, request_id=request_id, ) diff --git a/web/app/components/workflow/nodes/tool/use-config.ts b/web/app/components/workflow/nodes/tool/use-config.ts index f93d938f1f..c169ae71a4 100644 --- a/web/app/components/workflow/nodes/tool/use-config.ts +++ b/web/app/components/workflow/nodes/tool/use-config.ts @@ -32,7 +32,7 @@ const useConfig = (id: string, payload: ToolNodeType) => { * tool_parameters: tool dynamic setting(form type = llm) * output_schema: tool dynamic output */ - const { provider_id, provider_type, trigger_name: tool_name, tool_configurations, output_schema, tool_parameters } = inputs + const { provider_id, provider_type, tool_name, tool_configurations, output_schema, tool_parameters } = inputs const isBuiltIn = provider_type === CollectionType.builtIn const buildInTools = useStore(s => s.buildInTools) const customTools = useStore(s => s.customTools)