diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index 524119c0bd..f804710094 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -35,7 +35,8 @@ from models.workflow import Workflow from services.app_generate_service import AppGenerateService from services.errors.app import WorkflowHashNotEqualError from services.errors.llm import InvokeRateLimitError -from services.trigger_debug_service import TriggerDebugService +from services.trigger_debug_service import TriggerDebugService, WebhookDebugService +from services.webhook_service import WebhookService from services.workflow_service import DraftWorkflowDeletionError, WorkflowInUseError, WorkflowService logger = logging.getLogger(__name__) @@ -1152,6 +1153,90 @@ class DraftWorkflowTriggerRunApi(Resource): ), 500 +@console_ns.route("/apps//workflows/draft/trigger/webhook/run") +class DraftWorkflowTriggerWebhookRunApi(Resource): + """ + Full workflow debug when the start node is a webhook trigger + Path: /apps//workflows/draft/trigger/webhook/run + """ + + @api.doc("draft_workflow_trigger_webhook_run") + @api.doc(description="Full workflow debug when the start node is a webhook trigger") + @api.doc(params={"app_id": "Application ID"}) + @api.expect( + api.model( + "DraftWorkflowTriggerWebhookRunRequest", + { + "node_id": fields.String(required=True, description="Node ID"), + } + ) + ) + @api.response(200, "Workflow executed successfully") + @api.response(403, "Permission denied") + @api.response(500, "Internal server error") + @setup_required + @login_required + @account_initialization_required + @get_app_model(mode=[AppMode.WORKFLOW]) + def post(self, app_model: App): + """ + Full workflow debug when the start node is a webhook trigger + """ + if not isinstance(current_user, Account) or not current_user.has_edit_permission: + raise Forbidden() + + parser = reqparse.RequestParser() + parser.add_argument("node_id", type=str, required=True, location="json", nullable=False) + args = parser.parse_args() + node_id = args["node_id"] + + event = WebhookDebugService.poll_event( + tenant_id=app_model.tenant_id, + user_id=current_user.id, + app_id=app_model.id, + node_id=node_id, + ) + + if not event: + return jsonable_encoder({"status": "waiting", "retry_in": 2000}) + + payload = event.payload or {} + workflow_inputs = payload.get("inputs") + if workflow_inputs is None: + webhook_data = payload.get("webhook_data", {}) + workflow_inputs = WebhookService.build_workflow_inputs(webhook_data) + + workflow_args = { + "inputs": workflow_inputs or {}, + "query": "", + "files": [], + } + + external_trace_id = get_external_trace_id(request) + if external_trace_id: + workflow_args["external_trace_id"] = external_trace_id + + try: + response = AppGenerateService.generate( + app_model=app_model, + user=current_user, + args=workflow_args, + invoke_from=InvokeFrom.DEBUGGER, + streaming=True, + root_node_id=node_id, + ) + return helper.compact_generate_response(response) + except InvokeRateLimitError as ex: + raise InvokeRateLimitHttpError(ex.description) + except Exception: + logger.exception("Error running draft workflow trigger webhook run") + return jsonable_encoder( + { + "status": "error", + } + ), 500 + + @console_ns.route("/apps//workflows/draft/trigger/schedule/run") class DraftWorkflowTriggerScheduleRunApi(Resource): """ diff --git a/api/controllers/trigger/webhook.py b/api/controllers/trigger/webhook.py index 04f2d6483d..15adb567ad 100644 --- a/api/controllers/trigger/webhook.py +++ b/api/controllers/trigger/webhook.py @@ -1,16 +1,28 @@ import logging +import time from flask import jsonify from werkzeug.exceptions import NotFound, RequestEntityTooLarge from controllers.trigger import bp +from services.trigger_debug_service import WebhookDebugService from services.webhook_service import WebhookService logger = logging.getLogger(__name__) +def _prepare_webhook_execution(webhook_id: str): + """Fetch trigger context, extract request data, and validate payload.""" + webhook_trigger, workflow, node_config = WebhookService.get_webhook_trigger_and_workflow(webhook_id) + webhook_data = WebhookService.extract_webhook_data(webhook_trigger) + validation_result = WebhookService.validate_webhook_request(webhook_data, node_config) + if not validation_result["valid"]: + return webhook_trigger, workflow, node_config, webhook_data, validation_result["error"] + + return webhook_trigger, workflow, node_config, webhook_data, None + + @bp.route("/webhook/", methods=["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"]) -@bp.route("/webhook-debug/", methods=["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"]) def handle_webhook(webhook_id: str): """ Handle webhook trigger calls. @@ -19,16 +31,9 @@ def handle_webhook(webhook_id: str): configured webhook trigger settings. """ try: - # Get webhook trigger, workflow, and node configuration - webhook_trigger, workflow, node_config = WebhookService.get_webhook_trigger_and_workflow(webhook_id) - - # Extract request data - webhook_data = WebhookService.extract_webhook_data(webhook_trigger) - - # Validate request against node configuration - validation_result = WebhookService.validate_webhook_request(webhook_data, node_config) - if not validation_result["valid"]: - return jsonify({"error": "Bad Request", "message": validation_result["error"]}), 400 + webhook_trigger, workflow, node_config, webhook_data, error = _prepare_webhook_execution(webhook_id) + if error: + return jsonify({"error": "Bad Request", "message": error}), 400 # Process webhook call (send to Celery) WebhookService.trigger_workflow_execution(webhook_trigger, webhook_data, workflow) @@ -44,3 +49,36 @@ def handle_webhook(webhook_id: str): except Exception as e: logger.exception("Webhook processing failed for %s", webhook_id) return jsonify({"error": "Internal server error", "message": str(e)}), 500 + + +@bp.route("/webhook-debug/", methods=["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"]) +def handle_webhook_debug(webhook_id: str): + """Handle webhook debug calls without triggering production workflow execution.""" + try: + webhook_trigger, workflow, node_config, webhook_data, error = _prepare_webhook_execution(webhook_id) + if error: + return jsonify({"error": "Bad Request", "message": error}), 400 + + workflow_inputs = WebhookService.build_workflow_inputs(webhook_data) + WebhookDebugService.dispatch_event( + tenant_id=webhook_trigger.tenant_id, + app_id=webhook_trigger.app_id, + node_id=webhook_trigger.node_id, + request_id=f"webhook_debug_{webhook_trigger.webhook_id}_{int(time.time() * 1000)}", + timestamp=int(time.time()), + payload={ + "inputs": workflow_inputs, + "webhook_data": webhook_data, + "method": webhook_data.get("method"), + }, + ) + response_data, status_code = WebhookService.generate_webhook_response(node_config) + return jsonify(response_data), status_code + + except ValueError as e: + raise NotFound(str(e)) + except RequestEntityTooLarge: + raise + except Exception as e: + logger.exception("Webhook debug processing failed for %s", webhook_id) + return jsonify({"error": "Internal server error", "message": str(e)}), 500 diff --git a/api/services/trigger_debug_service.py b/api/services/trigger_debug_service.py index 956ce7132f..fad06d9db5 100644 --- a/api/services/trigger_debug_service.py +++ b/api/services/trigger_debug_service.py @@ -1,10 +1,10 @@ -"""Trigger debug service for webhook debugging in draft workflows.""" +"""Trigger debug service supporting plugin and webhook debugging in draft workflows.""" import hashlib import logging -from typing import Optional +from typing import Any, Optional -from pydantic import BaseModel +from pydantic import BaseModel, Field from redis import RedisError from extensions.ext_redis import redis_client @@ -20,6 +20,18 @@ class TriggerDebugEvent(BaseModel): timestamp: int +class WebhookDebugEvent(BaseModel): + request_id: str + timestamp: int + node_id: str + payload: dict[str, Any] = Field(default_factory=dict) + + +def _address(tenant_id: str, user_id: str, app_id: str, node_id: str) -> str: + address_id = hashlib.sha1(f"{user_id}|{app_id}|{node_id}".encode()).hexdigest() + return f"trigger_debug_inbox:{{{tenant_id}}}:{address_id}" + + class TriggerDebugService: """ Redis-based trigger debug service with polling support. @@ -54,11 +66,6 @@ class TriggerDebugService: "return #a" ) - @classmethod - def address(cls, tenant_id: str, user_id: str, app_id: str, node_id: str) -> str: - address_id = hashlib.sha1(f"{user_id}|{app_id}|{node_id}".encode()).hexdigest() - return f"trigger_debug_inbox:{{{tenant_id}}}:{address_id}" - @classmethod def waiting_pool(cls, tenant_id: str, subscription_id: str, trigger_name: str) -> str: return f"trigger_debug_waiting_pool:{{{tenant_id}}}:{subscription_id}:{trigger_name}" @@ -116,7 +123,7 @@ class TriggerDebugService: event = redis_client.eval( cls.LUA_SELECT, 2, - cls.address(tenant_id, user_id, app_id, node_id), + _address(tenant_id, user_id, app_id, node_id), cls.waiting_pool(tenant_id, subscription_id, trigger_name), address_id, ) @@ -124,3 +131,63 @@ class TriggerDebugService: except RedisError: logger.exception("Failed to poll debug event") return None + + +class WebhookDebugService: + """Debug helpers dedicated to webhook triggers.""" + + @staticmethod + def waiting_pool(tenant_id: str, app_id: str, node_id: str) -> str: + return f"trigger_debug_waiting_pool:{{{tenant_id}}}:{app_id}:{node_id}" + + @classmethod + def dispatch_event( + cls, + tenant_id: str, + app_id: str, + node_id: str, + request_id: str, + timestamp: int, + payload: dict[str, Any], + ) -> int: + event_json = WebhookDebugEvent( + request_id=request_id, + timestamp=timestamp, + node_id=node_id, + payload=payload, + ).model_dump_json() + + try: + return redis_client.eval( + TriggerDebugService.LUA_DISPATCH, + 1, + cls.waiting_pool(tenant_id, app_id, node_id), + tenant_id, + event_json, + ) + except RedisError: + logger.exception("Failed to dispatch webhook debug event") + return 0 + + @classmethod + def poll_event( + cls, + tenant_id: str, + user_id: str, + app_id: str, + node_id: str, + ) -> Optional[WebhookDebugEvent]: + address_id = hashlib.sha1(f"{user_id}|{app_id}|{node_id}".encode()).hexdigest() + + try: + event = redis_client.eval( + TriggerDebugService.LUA_SELECT, + 2, + _address(tenant_id, user_id, app_id, node_id), + cls.waiting_pool(tenant_id, app_id, node_id), + address_id, + ) + return WebhookDebugEvent.model_validate_json(event) if event else None + except RedisError: + logger.exception("Failed to poll webhook debug event") + return None diff --git a/api/services/webhook_service.py b/api/services/webhook_service.py index bf699c58ac..f2e1f89f04 100644 --- a/api/services/webhook_service.py +++ b/api/services/webhook_service.py @@ -522,6 +522,16 @@ class WebhookService: except ValueError: return False + @classmethod + def build_workflow_inputs(cls, webhook_data: dict[str, Any]) -> dict[str, Any]: + """Construct workflow inputs payload from webhook data.""" + return { + "webhook_data": webhook_data, + "webhook_headers": webhook_data.get("headers", {}), + "webhook_query_params": webhook_data.get("query_params", {}), + "webhook_body": webhook_data.get("body", {}), + } + @classmethod def trigger_workflow_execution( cls, webhook_trigger: WorkflowWebhookTrigger, webhook_data: dict[str, Any], workflow: Workflow @@ -545,12 +555,7 @@ class WebhookService: # Prepare inputs for the webhook node # The webhook node expects webhook_data in the inputs - workflow_inputs = { - "webhook_data": webhook_data, - "webhook_headers": webhook_data.get("headers", {}), - "webhook_query_params": webhook_data.get("query_params", {}), - "webhook_body": webhook_data.get("body", {}), - } + workflow_inputs = cls.build_workflow_inputs(webhook_data) # Create trigger data trigger_data = TriggerData( diff --git a/web/app/components/workflow-app/components/workflow-main.tsx b/web/app/components/workflow-app/components/workflow-main.tsx index 7b6b803685..14a2156d3c 100644 --- a/web/app/components/workflow-app/components/workflow-main.tsx +++ b/web/app/components/workflow-app/components/workflow-main.tsx @@ -67,6 +67,7 @@ const WorkflowMain = ({ handleWorkflowStartRunInChatflow, handleWorkflowStartRunInWorkflow, handleWorkflowTriggerScheduleRunInWorkflow, + handleWorkflowTriggerWebhookRunInWorkflow, } = useWorkflowStartRun() const availableNodesMetaData = useAvailableNodesMetaData() const { getWorkflowRunAndTraceUrl } = useGetRunAndTraceUrl() @@ -110,6 +111,7 @@ const WorkflowMain = ({ handleWorkflowStartRunInChatflow, handleWorkflowStartRunInWorkflow, handleWorkflowTriggerScheduleRunInWorkflow, + handleWorkflowTriggerWebhookRunInWorkflow, availableNodesMetaData, getWorkflowRunAndTraceUrl, exportCheck, @@ -144,6 +146,7 @@ const WorkflowMain = ({ handleWorkflowStartRunInChatflow, handleWorkflowStartRunInWorkflow, handleWorkflowTriggerScheduleRunInWorkflow, + handleWorkflowTriggerWebhookRunInWorkflow, availableNodesMetaData, getWorkflowRunAndTraceUrl, exportCheck, diff --git a/web/app/components/workflow-app/hooks/use-workflow-run.ts b/web/app/components/workflow-app/hooks/use-workflow-run.ts index e11bb9c30d..a5bb173c1e 100644 --- a/web/app/components/workflow-app/hooks/use-workflow-run.ts +++ b/web/app/components/workflow-app/hooks/use-workflow-run.ts @@ -1,4 +1,4 @@ -import { useCallback } from 'react' +import { useCallback, useRef } from 'react' import { useReactFlow, useStoreApi, @@ -12,7 +12,8 @@ import { useWorkflowUpdate } from '@/app/components/workflow/hooks/use-workflow- import { useWorkflowRunEvent } from '@/app/components/workflow/hooks/use-workflow-run-event/use-workflow-run-event' import { useStore as useAppStore } from '@/app/components/app/store' import type { IOtherOptions } from '@/service/base' -import { ssePost } from '@/service/base' +import Toast from '@/app/components/base/toast' +import { handleStream, ssePost } from '@/service/base' import { stopWorkflowRun } from '@/service/workflow' import { useFeaturesStore } from '@/app/components/base/features/hooks' import { AudioPlayerManager } from '@/app/components/base/audio-btn/audio.player.manager' @@ -22,12 +23,15 @@ import { useNodesSyncDraft } from './use-nodes-sync-draft' import { useInvalidAllLastRun } from '@/service/use-workflow' import { useSetWorkflowVarsWithValue } from '../../workflow/hooks/use-fetch-workflow-inspect-vars' import { useConfigsMap } from './use-configs-map' +import { API_PREFIX } from '@/config' +import { ContentType, getAccessToken, getBaseOptions } from '@/service/fetch' -type HandleRunMode = 'default' | 'schedule' +type HandleRunMode = 'default' | 'schedule' | 'webhook' type HandleRunOptions = { mode?: HandleRunMode scheduleNodeId?: string + webhookNodeId?: string } export const useWorkflowRun = () => { @@ -46,6 +50,8 @@ export const useWorkflowRun = () => { ...configsMap, }) + const abortControllerRef = useRef(null) + const { handleWorkflowStarted, handleWorkflowFinished, @@ -149,6 +155,7 @@ export const useWorkflowRun = () => { onNodeRetry, onAgentLog, onError, + onCompleted, ...restCallback } = callback || {} workflowStore.setState({ historyWorkflowData: undefined }) @@ -170,6 +177,13 @@ export const useWorkflowRun = () => { } url = `/apps/${appDetail.id}/workflows/draft/trigger/schedule/run` } + else if (runMode === 'webhook') { + if (!appDetail?.id) { + console.error('handleRun: missing app id for webhook trigger run') + return + } + url = `/apps/${appDetail.id}/workflows/draft/trigger/webhook/run` + } else if (appDetail?.mode === 'advanced-chat') { url = `/apps/${appDetail.id}/advanced-chat/workflows/draft/run` } @@ -179,7 +193,9 @@ export const useWorkflowRun = () => { const requestBody = runMode === 'schedule' ? { node_id: options?.scheduleNodeId } - : resolvedParams + : runMode === 'webhook' + ? { node_id: options?.webhookNodeId } + : resolvedParams if (!url) return @@ -189,16 +205,36 @@ export const useWorkflowRun = () => { return } + if (runMode === 'webhook' && !options?.webhookNodeId) { + console.error('handleRun: webhook trigger run requires node id') + return + } + + abortControllerRef.current?.abort() + abortControllerRef.current = null + const { setWorkflowRunningData, } = workflowStore.getState() - setWorkflowRunningData({ - result: { - status: WorkflowRunningStatus.Running, - }, - tracing: [], - resultText: '', - }) + + if (runMode === 'webhook') { + setWorkflowRunningData({ + result: { + status: WorkflowRunningStatus.Waiting, + }, + tracing: [], + resultText: 'Waiting for webhook call...', + }) + } + else { + setWorkflowRunningData({ + result: { + status: WorkflowRunningStatus.Running, + }, + tracing: [], + resultText: '', + }) + } let ttsUrl = '' let ttsIsPublic = false @@ -214,138 +250,309 @@ export const useWorkflowRun = () => { } const player = AudioPlayerManager.getInstance().getAudioPlayer(ttsUrl, ttsIsPublic, uuidV4(), 'none', 'none', noop) + const clearAbortController = () => { + abortControllerRef.current = null + } + + const wrappedOnError = (params: any) => { + clearAbortController() + handleWorkflowFailed() + + if (onError) + onError(params) + } + + const wrappedOnCompleted: IOtherOptions['onCompleted'] = async (hasError?: boolean, errorMessage?: string) => { + clearAbortController() + if (onCompleted) + onCompleted(hasError, errorMessage) + } + + const baseSseOptions: IOtherOptions = { + ...restCallback, + onWorkflowStarted: (params) => { + const state = workflowStore.getState() + if (state.workflowRunningData) { + state.setWorkflowRunningData(produce(state.workflowRunningData, (draft) => { + draft.resultText = '' + })) + } + handleWorkflowStarted(params) + + if (onWorkflowStarted) + onWorkflowStarted(params) + }, + onWorkflowFinished: (params) => { + handleWorkflowFinished(params) + + if (onWorkflowFinished) + onWorkflowFinished(params) + if (isInWorkflowDebug) { + fetchInspectVars({}) + invalidAllLastRun() + } + }, + onNodeStarted: (params) => { + handleWorkflowNodeStarted( + params, + { + clientWidth, + clientHeight, + }, + ) + + if (onNodeStarted) + onNodeStarted(params) + }, + onNodeFinished: (params) => { + handleWorkflowNodeFinished(params) + + if (onNodeFinished) + onNodeFinished(params) + }, + onIterationStart: (params) => { + handleWorkflowNodeIterationStarted( + params, + { + clientWidth, + clientHeight, + }, + ) + + if (onIterationStart) + onIterationStart(params) + }, + onIterationNext: (params) => { + handleWorkflowNodeIterationNext(params) + + if (onIterationNext) + onIterationNext(params) + }, + onIterationFinish: (params) => { + handleWorkflowNodeIterationFinished(params) + + if (onIterationFinish) + onIterationFinish(params) + }, + onLoopStart: (params) => { + handleWorkflowNodeLoopStarted( + params, + { + clientWidth, + clientHeight, + }, + ) + + if (onLoopStart) + onLoopStart(params) + }, + onLoopNext: (params) => { + handleWorkflowNodeLoopNext(params) + + if (onLoopNext) + onLoopNext(params) + }, + onLoopFinish: (params) => { + handleWorkflowNodeLoopFinished(params) + + if (onLoopFinish) + onLoopFinish(params) + }, + onNodeRetry: (params) => { + handleWorkflowNodeRetry(params) + + if (onNodeRetry) + onNodeRetry(params) + }, + onAgentLog: (params) => { + handleWorkflowAgentLog(params) + + if (onAgentLog) + onAgentLog(params) + }, + onTextChunk: (params) => { + handleWorkflowTextChunk(params) + }, + onTextReplace: (params) => { + handleWorkflowTextReplace(params) + }, + onTTSChunk: (messageId: string, audio: string) => { + if (!audio || audio === '') + return + player.playAudioWithAudio(audio, true) + AudioPlayerManager.getInstance().resetMsgId(messageId) + }, + onTTSEnd: (messageId: string, audio: string) => { + player.playAudioWithAudio(audio, false) + }, + onError: wrappedOnError, + onCompleted: wrappedOnCompleted, + } + + const waitWithAbort = (signal: AbortSignal, delay: number) => new Promise((resolve) => { + const timer = window.setTimeout(resolve, delay) + signal.addEventListener('abort', () => { + clearTimeout(timer) + resolve() + }, { once: true }) + }) + + const runWebhookDebug = async () => { + const urlWithPrefix = (url.startsWith('http://') || url.startsWith('https://')) + ? url + : `${API_PREFIX}${url.startsWith('/') ? url : `/${url}`}` + + const poll = async (): Promise => { + const controller = new AbortController() + abortControllerRef.current = controller + + try { + const baseOptions = getBaseOptions() + const headers = new Headers(baseOptions.headers as Headers) + headers.set('Content-Type', ContentType.json) + const accessToken = await getAccessToken() + headers.set('Authorization', `Bearer ${accessToken}`) + + const response = await fetch(urlWithPrefix, { + ...baseOptions, + method: 'POST', + headers, + body: JSON.stringify(requestBody), + signal: controller.signal, + }) + + if (controller.signal.aborted) + return + + if (!response.ok) { + const message = `Webhook debug request failed (${response.status})` + Toast.notify({ type: 'error', message }) + clearAbortController() + return + } + + const contentType = response.headers.get('Content-Type')?.toLowerCase() || '' + if (contentType.includes('application/json')) { + const data = await response.json() + if (controller.signal.aborted) + return + + if (data.status === 'waiting') { + const delay = Number(data.retry_in) || 2000 + await waitWithAbort(controller.signal, delay) + if (controller.signal.aborted) + return + await poll() + return + } + + const errorMessage = data.message || 'Webhook debug failed' + Toast.notify({ type: 'error', message: errorMessage }) + clearAbortController() + setWorkflowRunningData({ + result: { + status: WorkflowRunningStatus.Failed, + error: errorMessage, + inputs_truncated: false, + process_data_truncated: false, + outputs_truncated: false, + }, + tracing: [], + }) + return + } + + handleStream( + response, + baseSseOptions.onData ?? noop, + baseSseOptions.onCompleted, + baseSseOptions.onThought, + baseSseOptions.onMessageEnd, + baseSseOptions.onMessageReplace, + baseSseOptions.onFile, + baseSseOptions.onWorkflowStarted, + baseSseOptions.onWorkflowFinished, + baseSseOptions.onNodeStarted, + baseSseOptions.onNodeFinished, + baseSseOptions.onIterationStart, + baseSseOptions.onIterationNext, + baseSseOptions.onIterationFinish, + baseSseOptions.onLoopStart, + baseSseOptions.onLoopNext, + baseSseOptions.onLoopFinish, + baseSseOptions.onNodeRetry, + baseSseOptions.onParallelBranchStarted, + baseSseOptions.onParallelBranchFinished, + baseSseOptions.onTextChunk, + baseSseOptions.onTTSChunk, + baseSseOptions.onTTSEnd, + baseSseOptions.onTextReplace, + baseSseOptions.onAgentLog, + baseSseOptions.onDataSourceNodeProcessing, + baseSseOptions.onDataSourceNodeCompleted, + baseSseOptions.onDataSourceNodeError, + ) + } + catch (error) { + if (controller.signal.aborted) + return + console.error('handleRun: webhook debug polling error', error) + Toast.notify({ type: 'error', message: 'Webhook debug request failed' }) + clearAbortController() + setWorkflowRunningData({ + result: { + status: WorkflowRunningStatus.Failed, + error: 'Webhook debug request failed', + inputs_truncated: false, + process_data_truncated: false, + outputs_truncated: false, + }, + tracing: [], + }) + } + } + + await poll() + } + + if (runMode === 'webhook') { + await runWebhookDebug() + return + } + ssePost( url, { body: requestBody, }, { - onWorkflowStarted: (params) => { - handleWorkflowStarted(params) - - if (onWorkflowStarted) - onWorkflowStarted(params) + ...baseSseOptions, + getAbortController: (controller: AbortController) => { + abortControllerRef.current = controller }, - onWorkflowFinished: (params) => { - handleWorkflowFinished(params) - - if (onWorkflowFinished) - onWorkflowFinished(params) - if (isInWorkflowDebug) { - fetchInspectVars({}) - invalidAllLastRun() - } - }, - onError: (params) => { - handleWorkflowFailed() - - if (onError) - onError(params) - }, - onNodeStarted: (params) => { - handleWorkflowNodeStarted( - params, - { - clientWidth, - clientHeight, - }, - ) - - if (onNodeStarted) - onNodeStarted(params) - }, - onNodeFinished: (params) => { - handleWorkflowNodeFinished(params) - - if (onNodeFinished) - onNodeFinished(params) - }, - onIterationStart: (params) => { - handleWorkflowNodeIterationStarted( - params, - { - clientWidth, - clientHeight, - }, - ) - - if (onIterationStart) - onIterationStart(params) - }, - onIterationNext: (params) => { - handleWorkflowNodeIterationNext(params) - - if (onIterationNext) - onIterationNext(params) - }, - onIterationFinish: (params) => { - handleWorkflowNodeIterationFinished(params) - - if (onIterationFinish) - onIterationFinish(params) - }, - onLoopStart: (params) => { - handleWorkflowNodeLoopStarted( - params, - { - clientWidth, - clientHeight, - }, - ) - - if (onLoopStart) - onLoopStart(params) - }, - onLoopNext: (params) => { - handleWorkflowNodeLoopNext(params) - - if (onLoopNext) - onLoopNext(params) - }, - onLoopFinish: (params) => { - handleWorkflowNodeLoopFinished(params) - - if (onLoopFinish) - onLoopFinish(params) - }, - onNodeRetry: (params) => { - handleWorkflowNodeRetry(params) - - if (onNodeRetry) - onNodeRetry(params) - }, - onAgentLog: (params) => { - handleWorkflowAgentLog(params) - - if (onAgentLog) - onAgentLog(params) - }, - onTextChunk: (params) => { - handleWorkflowTextChunk(params) - }, - onTextReplace: (params) => { - handleWorkflowTextReplace(params) - }, - onTTSChunk: (messageId: string, audio: string) => { - if (!audio || audio === '') - return - player.playAudioWithAudio(audio, true) - AudioPlayerManager.getInstance().resetMsgId(messageId) - }, - onTTSEnd: (messageId: string, audio: string) => { - player.playAudioWithAudio(audio, false) - }, - ...restCallback, }, ) }, [store, doSyncWorkflowDraft, workflowStore, pathname, handleWorkflowStarted, handleWorkflowFinished, fetchInspectVars, invalidAllLastRun, handleWorkflowFailed, handleWorkflowNodeStarted, handleWorkflowNodeFinished, handleWorkflowNodeIterationStarted, handleWorkflowNodeIterationNext, handleWorkflowNodeIterationFinished, handleWorkflowNodeLoopStarted, handleWorkflowNodeLoopNext, handleWorkflowNodeLoopFinished, handleWorkflowNodeRetry, handleWorkflowAgentLog, handleWorkflowTextChunk, handleWorkflowTextReplace], ) const handleStopRun = useCallback((taskId: string) => { - const appId = useAppStore.getState().appDetail?.id + if (taskId) { + const appId = useAppStore.getState().appDetail?.id + stopWorkflowRun(`/apps/${appId}/workflow-runs/tasks/${taskId}/stop`) + return + } - stopWorkflowRun(`/apps/${appId}/workflow-runs/tasks/${taskId}/stop`) - }, []) + abortControllerRef.current?.abort() + abortControllerRef.current = null + const { setWorkflowRunningData } = workflowStore.getState() + setWorkflowRunningData({ + result: { + status: WorkflowRunningStatus.Stopped, + }, + tracing: [], + resultText: '', + }) + }, [workflowStore]) const handleRestoreFromPublishedWorkflow = useCallback((publishedWorkflow: VersionHistory) => { const nodes = publishedWorkflow.graph.nodes.map(node => ({ ...node, selected: false, data: { ...node.data, selected: false } })) diff --git a/web/app/components/workflow-app/hooks/use-workflow-start-run.tsx b/web/app/components/workflow-app/hooks/use-workflow-start-run.tsx index f16d817359..05a67194ff 100644 --- a/web/app/components/workflow-app/hooks/use-workflow-start-run.tsx +++ b/web/app/components/workflow-app/hooks/use-workflow-start-run.tsx @@ -105,6 +105,48 @@ export const useWorkflowStartRun = () => { setShowInputsPanel(false) }, [store, workflowStore, handleCancelDebugAndPreviewPanel, handleRun, doSyncWorkflowDraft]) + const handleWorkflowTriggerWebhookRunInWorkflow = useCallback(async ({ nodeId }: { nodeId: string }) => { + if (!nodeId) + return + + const { + workflowRunningData, + showDebugAndPreviewPanel, + setShowDebugAndPreviewPanel, + setShowInputsPanel, + setShowEnvPanel, + } = workflowStore.getState() + + if (workflowRunningData?.result.status === WorkflowRunningStatus.Running) + return + + const { getNodes } = store.getState() + const nodes = getNodes() + const webhookNode = nodes.find(node => node.id === nodeId && node.data.type === BlockEnum.TriggerWebhook) + + if (!webhookNode) { + console.warn('handleWorkflowTriggerWebhookRunInWorkflow: webhook node not found', nodeId) + return + } + + setShowEnvPanel(false) + + if (!showDebugAndPreviewPanel) + setShowDebugAndPreviewPanel(true) + + setShowInputsPanel(false) + + await doSyncWorkflowDraft() + handleRun( + { node_id: nodeId }, + undefined, + { + mode: 'webhook', + webhookNodeId: nodeId, + }, + ) + }, [store, workflowStore, handleRun, doSyncWorkflowDraft]) + const handleWorkflowStartRunInChatflow = useCallback(async () => { const { showDebugAndPreviewPanel, @@ -137,5 +179,6 @@ export const useWorkflowStartRun = () => { handleWorkflowStartRunInWorkflow, handleWorkflowStartRunInChatflow, handleWorkflowTriggerScheduleRunInWorkflow, + handleWorkflowTriggerWebhookRunInWorkflow, } } diff --git a/web/app/components/workflow/header/run-mode.tsx b/web/app/components/workflow/header/run-mode.tsx index afb7dbd86a..5f6dbd820c 100644 --- a/web/app/components/workflow/header/run-mode.tsx +++ b/web/app/components/workflow/header/run-mode.tsx @@ -23,12 +23,15 @@ const RunMode = ({ const { handleWorkflowStartRunInWorkflow, handleWorkflowTriggerScheduleRunInWorkflow, + handleWorkflowTriggerWebhookRunInWorkflow, } = useWorkflowStartRun() const { handleStopRun } = useWorkflowRun() const { validateBeforeRun } = useWorkflowRunValidation() const workflowRunningData = useStore(s => s.workflowRunningData) - const isRunning = workflowRunningData?.result.status === WorkflowRunningStatus.Running + const status = workflowRunningData?.result.status + const isWaiting = status === WorkflowRunningStatus.Waiting + const isRunning = status === WorkflowRunningStatus.Running || isWaiting const dynamicOptions = useDynamicTestRunOptions() const testRunMenuRef = useRef(null) @@ -59,6 +62,10 @@ const RunMode = ({ else if (option.type === 'schedule') { handleWorkflowTriggerScheduleRunInWorkflow(option.nodeId) } + else if (option.type === 'webhook') { + if (option.nodeId) + handleWorkflowTriggerWebhookRunInWorkflow({ nodeId: option.nodeId, debugUrl: option.debugUrl }) + } else { // Placeholder for trigger-specific execution logic for schedule, webhook, plugin types console.log('TODO: Handle trigger execution for type:', option.type, 'nodeId:', option.nodeId) @@ -67,6 +74,7 @@ const RunMode = ({ validateBeforeRun, handleWorkflowStartRunInWorkflow, handleWorkflowTriggerScheduleRunInWorkflow, + handleWorkflowTriggerWebhookRunInWorkflow, ]) const { eventEmitter } = useEventEmitterContextContext() @@ -88,7 +96,7 @@ const RunMode = ({ disabled={true} > - {t('workflow.common.running')} + {isWaiting ? t('workflow.common.waiting', { defaultValue: 'Waiting' }) : t('workflow.common.running')} ) : ( diff --git a/web/app/components/workflow/hooks-store/store.ts b/web/app/components/workflow/hooks-store/store.ts index 8ae3739f8a..f863223fd6 100644 --- a/web/app/components/workflow/hooks-store/store.ts +++ b/web/app/components/workflow/hooks-store/store.ts @@ -46,6 +46,7 @@ export type CommonHooksFnMap = { handleWorkflowStartRunInWorkflow: () => void handleWorkflowStartRunInChatflow: () => void handleWorkflowTriggerScheduleRunInWorkflow: (nodeId?: string) => void + handleWorkflowTriggerWebhookRunInWorkflow: (params: { nodeId: string }) => void availableNodesMetaData?: AvailableNodesMetaData getWorkflowRunAndTraceUrl: (runId?: string) => { runUrl: string; traceUrl: string } exportCheck?: () => Promise @@ -89,6 +90,7 @@ export const createHooksStore = ({ handleWorkflowStartRunInWorkflow = noop, handleWorkflowStartRunInChatflow = noop, handleWorkflowTriggerScheduleRunInWorkflow = noop, + handleWorkflowTriggerWebhookRunInWorkflow = noop, availableNodesMetaData = { nodes: [], }, @@ -128,6 +130,7 @@ export const createHooksStore = ({ handleWorkflowStartRunInWorkflow, handleWorkflowStartRunInChatflow, handleWorkflowTriggerScheduleRunInWorkflow, + handleWorkflowTriggerWebhookRunInWorkflow, availableNodesMetaData, getWorkflowRunAndTraceUrl, exportCheck, diff --git a/web/app/components/workflow/hooks/use-workflow-start-run.tsx b/web/app/components/workflow/hooks/use-workflow-start-run.tsx index 3e07a9aae4..0713786fab 100644 --- a/web/app/components/workflow/hooks/use-workflow-start-run.tsx +++ b/web/app/components/workflow/hooks/use-workflow-start-run.tsx @@ -5,11 +5,13 @@ export const useWorkflowStartRun = () => { const handleWorkflowStartRunInWorkflow = useHooksStore(s => s.handleWorkflowStartRunInWorkflow) const handleWorkflowStartRunInChatflow = useHooksStore(s => s.handleWorkflowStartRunInChatflow) const handleWorkflowTriggerScheduleRunInWorkflow = useHooksStore(s => s.handleWorkflowTriggerScheduleRunInWorkflow) + const handleWorkflowTriggerWebhookRunInWorkflow = useHooksStore(s => s.handleWorkflowTriggerWebhookRunInWorkflow) return { handleStartWorkflowRun, handleWorkflowStartRunInWorkflow, handleWorkflowStartRunInChatflow, handleWorkflowTriggerScheduleRunInWorkflow, + handleWorkflowTriggerWebhookRunInWorkflow, } } diff --git a/web/service/base.ts b/web/service/base.ts index 526c8d75d2..b0a2332e56 100644 --- a/web/service/base.ts +++ b/web/service/base.ts @@ -140,7 +140,7 @@ export function format(text: string) { return res.replaceAll('\n', '
').replaceAll('```', '') } -const handleStream = ( +export const handleStream = ( response: Response, onData: IOnData, onCompleted?: IOnCompleted,