debug webhook node

This commit is contained in:
hjlarry 2025-09-29 09:28:01 +08:00
parent 6b677c16ce
commit 6e6198c64e
11 changed files with 620 additions and 159 deletions

View File

@ -35,7 +35,8 @@ from models.workflow import Workflow
from services.app_generate_service import AppGenerateService
from services.errors.app import WorkflowHashNotEqualError
from services.errors.llm import InvokeRateLimitError
from services.trigger_debug_service import TriggerDebugService
from services.trigger_debug_service import TriggerDebugService, WebhookDebugService
from services.webhook_service import WebhookService
from services.workflow_service import DraftWorkflowDeletionError, WorkflowInUseError, WorkflowService
logger = logging.getLogger(__name__)
@ -1152,6 +1153,90 @@ class DraftWorkflowTriggerRunApi(Resource):
), 500
@console_ns.route("/apps/<uuid:app_id>/workflows/draft/trigger/webhook/run")
class DraftWorkflowTriggerWebhookRunApi(Resource):
"""
Full workflow debug when the start node is a webhook trigger
Path: /apps/<uuid:app_id>/workflows/draft/trigger/webhook/run
"""
@api.doc("draft_workflow_trigger_webhook_run")
@api.doc(description="Full workflow debug when the start node is a webhook trigger")
@api.doc(params={"app_id": "Application ID"})
@api.expect(
api.model(
"DraftWorkflowTriggerWebhookRunRequest",
{
"node_id": fields.String(required=True, description="Node ID"),
}
)
)
@api.response(200, "Workflow executed successfully")
@api.response(403, "Permission denied")
@api.response(500, "Internal server error")
@setup_required
@login_required
@account_initialization_required
@get_app_model(mode=[AppMode.WORKFLOW])
def post(self, app_model: App):
"""
Full workflow debug when the start node is a webhook trigger
"""
if not isinstance(current_user, Account) or not current_user.has_edit_permission:
raise Forbidden()
parser = reqparse.RequestParser()
parser.add_argument("node_id", type=str, required=True, location="json", nullable=False)
args = parser.parse_args()
node_id = args["node_id"]
event = WebhookDebugService.poll_event(
tenant_id=app_model.tenant_id,
user_id=current_user.id,
app_id=app_model.id,
node_id=node_id,
)
if not event:
return jsonable_encoder({"status": "waiting", "retry_in": 2000})
payload = event.payload or {}
workflow_inputs = payload.get("inputs")
if workflow_inputs is None:
webhook_data = payload.get("webhook_data", {})
workflow_inputs = WebhookService.build_workflow_inputs(webhook_data)
workflow_args = {
"inputs": workflow_inputs or {},
"query": "",
"files": [],
}
external_trace_id = get_external_trace_id(request)
if external_trace_id:
workflow_args["external_trace_id"] = external_trace_id
try:
response = AppGenerateService.generate(
app_model=app_model,
user=current_user,
args=workflow_args,
invoke_from=InvokeFrom.DEBUGGER,
streaming=True,
root_node_id=node_id,
)
return helper.compact_generate_response(response)
except InvokeRateLimitError as ex:
raise InvokeRateLimitHttpError(ex.description)
except Exception:
logger.exception("Error running draft workflow trigger webhook run")
return jsonable_encoder(
{
"status": "error",
}
), 500
@console_ns.route("/apps/<uuid:app_id>/workflows/draft/trigger/schedule/run")
class DraftWorkflowTriggerScheduleRunApi(Resource):
"""

View File

@ -1,16 +1,28 @@
import logging
import time
from flask import jsonify
from werkzeug.exceptions import NotFound, RequestEntityTooLarge
from controllers.trigger import bp
from services.trigger_debug_service import WebhookDebugService
from services.webhook_service import WebhookService
logger = logging.getLogger(__name__)
def _prepare_webhook_execution(webhook_id: str):
"""Fetch trigger context, extract request data, and validate payload."""
webhook_trigger, workflow, node_config = WebhookService.get_webhook_trigger_and_workflow(webhook_id)
webhook_data = WebhookService.extract_webhook_data(webhook_trigger)
validation_result = WebhookService.validate_webhook_request(webhook_data, node_config)
if not validation_result["valid"]:
return webhook_trigger, workflow, node_config, webhook_data, validation_result["error"]
return webhook_trigger, workflow, node_config, webhook_data, None
@bp.route("/webhook/<string:webhook_id>", methods=["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"])
@bp.route("/webhook-debug/<string:webhook_id>", methods=["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"])
def handle_webhook(webhook_id: str):
"""
Handle webhook trigger calls.
@ -19,16 +31,9 @@ def handle_webhook(webhook_id: str):
configured webhook trigger settings.
"""
try:
# Get webhook trigger, workflow, and node configuration
webhook_trigger, workflow, node_config = WebhookService.get_webhook_trigger_and_workflow(webhook_id)
# Extract request data
webhook_data = WebhookService.extract_webhook_data(webhook_trigger)
# Validate request against node configuration
validation_result = WebhookService.validate_webhook_request(webhook_data, node_config)
if not validation_result["valid"]:
return jsonify({"error": "Bad Request", "message": validation_result["error"]}), 400
webhook_trigger, workflow, node_config, webhook_data, error = _prepare_webhook_execution(webhook_id)
if error:
return jsonify({"error": "Bad Request", "message": error}), 400
# Process webhook call (send to Celery)
WebhookService.trigger_workflow_execution(webhook_trigger, webhook_data, workflow)
@ -44,3 +49,36 @@ def handle_webhook(webhook_id: str):
except Exception as e:
logger.exception("Webhook processing failed for %s", webhook_id)
return jsonify({"error": "Internal server error", "message": str(e)}), 500
@bp.route("/webhook-debug/<string:webhook_id>", methods=["GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OPTIONS"])
def handle_webhook_debug(webhook_id: str):
"""Handle webhook debug calls without triggering production workflow execution."""
try:
webhook_trigger, workflow, node_config, webhook_data, error = _prepare_webhook_execution(webhook_id)
if error:
return jsonify({"error": "Bad Request", "message": error}), 400
workflow_inputs = WebhookService.build_workflow_inputs(webhook_data)
WebhookDebugService.dispatch_event(
tenant_id=webhook_trigger.tenant_id,
app_id=webhook_trigger.app_id,
node_id=webhook_trigger.node_id,
request_id=f"webhook_debug_{webhook_trigger.webhook_id}_{int(time.time() * 1000)}",
timestamp=int(time.time()),
payload={
"inputs": workflow_inputs,
"webhook_data": webhook_data,
"method": webhook_data.get("method"),
},
)
response_data, status_code = WebhookService.generate_webhook_response(node_config)
return jsonify(response_data), status_code
except ValueError as e:
raise NotFound(str(e))
except RequestEntityTooLarge:
raise
except Exception as e:
logger.exception("Webhook debug processing failed for %s", webhook_id)
return jsonify({"error": "Internal server error", "message": str(e)}), 500

View File

@ -1,10 +1,10 @@
"""Trigger debug service for webhook debugging in draft workflows."""
"""Trigger debug service supporting plugin and webhook debugging in draft workflows."""
import hashlib
import logging
from typing import Optional
from typing import Any, Optional
from pydantic import BaseModel
from pydantic import BaseModel, Field
from redis import RedisError
from extensions.ext_redis import redis_client
@ -20,6 +20,18 @@ class TriggerDebugEvent(BaseModel):
timestamp: int
class WebhookDebugEvent(BaseModel):
request_id: str
timestamp: int
node_id: str
payload: dict[str, Any] = Field(default_factory=dict)
def _address(tenant_id: str, user_id: str, app_id: str, node_id: str) -> str:
address_id = hashlib.sha1(f"{user_id}|{app_id}|{node_id}".encode()).hexdigest()
return f"trigger_debug_inbox:{{{tenant_id}}}:{address_id}"
class TriggerDebugService:
"""
Redis-based trigger debug service with polling support.
@ -54,11 +66,6 @@ class TriggerDebugService:
"return #a"
)
@classmethod
def address(cls, tenant_id: str, user_id: str, app_id: str, node_id: str) -> str:
address_id = hashlib.sha1(f"{user_id}|{app_id}|{node_id}".encode()).hexdigest()
return f"trigger_debug_inbox:{{{tenant_id}}}:{address_id}"
@classmethod
def waiting_pool(cls, tenant_id: str, subscription_id: str, trigger_name: str) -> str:
return f"trigger_debug_waiting_pool:{{{tenant_id}}}:{subscription_id}:{trigger_name}"
@ -116,7 +123,7 @@ class TriggerDebugService:
event = redis_client.eval(
cls.LUA_SELECT,
2,
cls.address(tenant_id, user_id, app_id, node_id),
_address(tenant_id, user_id, app_id, node_id),
cls.waiting_pool(tenant_id, subscription_id, trigger_name),
address_id,
)
@ -124,3 +131,63 @@ class TriggerDebugService:
except RedisError:
logger.exception("Failed to poll debug event")
return None
class WebhookDebugService:
"""Debug helpers dedicated to webhook triggers."""
@staticmethod
def waiting_pool(tenant_id: str, app_id: str, node_id: str) -> str:
return f"trigger_debug_waiting_pool:{{{tenant_id}}}:{app_id}:{node_id}"
@classmethod
def dispatch_event(
cls,
tenant_id: str,
app_id: str,
node_id: str,
request_id: str,
timestamp: int,
payload: dict[str, Any],
) -> int:
event_json = WebhookDebugEvent(
request_id=request_id,
timestamp=timestamp,
node_id=node_id,
payload=payload,
).model_dump_json()
try:
return redis_client.eval(
TriggerDebugService.LUA_DISPATCH,
1,
cls.waiting_pool(tenant_id, app_id, node_id),
tenant_id,
event_json,
)
except RedisError:
logger.exception("Failed to dispatch webhook debug event")
return 0
@classmethod
def poll_event(
cls,
tenant_id: str,
user_id: str,
app_id: str,
node_id: str,
) -> Optional[WebhookDebugEvent]:
address_id = hashlib.sha1(f"{user_id}|{app_id}|{node_id}".encode()).hexdigest()
try:
event = redis_client.eval(
TriggerDebugService.LUA_SELECT,
2,
_address(tenant_id, user_id, app_id, node_id),
cls.waiting_pool(tenant_id, app_id, node_id),
address_id,
)
return WebhookDebugEvent.model_validate_json(event) if event else None
except RedisError:
logger.exception("Failed to poll webhook debug event")
return None

View File

@ -522,6 +522,16 @@ class WebhookService:
except ValueError:
return False
@classmethod
def build_workflow_inputs(cls, webhook_data: dict[str, Any]) -> dict[str, Any]:
"""Construct workflow inputs payload from webhook data."""
return {
"webhook_data": webhook_data,
"webhook_headers": webhook_data.get("headers", {}),
"webhook_query_params": webhook_data.get("query_params", {}),
"webhook_body": webhook_data.get("body", {}),
}
@classmethod
def trigger_workflow_execution(
cls, webhook_trigger: WorkflowWebhookTrigger, webhook_data: dict[str, Any], workflow: Workflow
@ -545,12 +555,7 @@ class WebhookService:
# Prepare inputs for the webhook node
# The webhook node expects webhook_data in the inputs
workflow_inputs = {
"webhook_data": webhook_data,
"webhook_headers": webhook_data.get("headers", {}),
"webhook_query_params": webhook_data.get("query_params", {}),
"webhook_body": webhook_data.get("body", {}),
}
workflow_inputs = cls.build_workflow_inputs(webhook_data)
# Create trigger data
trigger_data = TriggerData(

View File

@ -67,6 +67,7 @@ const WorkflowMain = ({
handleWorkflowStartRunInChatflow,
handleWorkflowStartRunInWorkflow,
handleWorkflowTriggerScheduleRunInWorkflow,
handleWorkflowTriggerWebhookRunInWorkflow,
} = useWorkflowStartRun()
const availableNodesMetaData = useAvailableNodesMetaData()
const { getWorkflowRunAndTraceUrl } = useGetRunAndTraceUrl()
@ -110,6 +111,7 @@ const WorkflowMain = ({
handleWorkflowStartRunInChatflow,
handleWorkflowStartRunInWorkflow,
handleWorkflowTriggerScheduleRunInWorkflow,
handleWorkflowTriggerWebhookRunInWorkflow,
availableNodesMetaData,
getWorkflowRunAndTraceUrl,
exportCheck,
@ -144,6 +146,7 @@ const WorkflowMain = ({
handleWorkflowStartRunInChatflow,
handleWorkflowStartRunInWorkflow,
handleWorkflowTriggerScheduleRunInWorkflow,
handleWorkflowTriggerWebhookRunInWorkflow,
availableNodesMetaData,
getWorkflowRunAndTraceUrl,
exportCheck,

View File

@ -1,4 +1,4 @@
import { useCallback } from 'react'
import { useCallback, useRef } from 'react'
import {
useReactFlow,
useStoreApi,
@ -12,7 +12,8 @@ import { useWorkflowUpdate } from '@/app/components/workflow/hooks/use-workflow-
import { useWorkflowRunEvent } from '@/app/components/workflow/hooks/use-workflow-run-event/use-workflow-run-event'
import { useStore as useAppStore } from '@/app/components/app/store'
import type { IOtherOptions } from '@/service/base'
import { ssePost } from '@/service/base'
import Toast from '@/app/components/base/toast'
import { handleStream, ssePost } from '@/service/base'
import { stopWorkflowRun } from '@/service/workflow'
import { useFeaturesStore } from '@/app/components/base/features/hooks'
import { AudioPlayerManager } from '@/app/components/base/audio-btn/audio.player.manager'
@ -22,12 +23,15 @@ import { useNodesSyncDraft } from './use-nodes-sync-draft'
import { useInvalidAllLastRun } from '@/service/use-workflow'
import { useSetWorkflowVarsWithValue } from '../../workflow/hooks/use-fetch-workflow-inspect-vars'
import { useConfigsMap } from './use-configs-map'
import { API_PREFIX } from '@/config'
import { ContentType, getAccessToken, getBaseOptions } from '@/service/fetch'
type HandleRunMode = 'default' | 'schedule'
type HandleRunMode = 'default' | 'schedule' | 'webhook'
type HandleRunOptions = {
mode?: HandleRunMode
scheduleNodeId?: string
webhookNodeId?: string
}
export const useWorkflowRun = () => {
@ -46,6 +50,8 @@ export const useWorkflowRun = () => {
...configsMap,
})
const abortControllerRef = useRef<AbortController | null>(null)
const {
handleWorkflowStarted,
handleWorkflowFinished,
@ -149,6 +155,7 @@ export const useWorkflowRun = () => {
onNodeRetry,
onAgentLog,
onError,
onCompleted,
...restCallback
} = callback || {}
workflowStore.setState({ historyWorkflowData: undefined })
@ -170,6 +177,13 @@ export const useWorkflowRun = () => {
}
url = `/apps/${appDetail.id}/workflows/draft/trigger/schedule/run`
}
else if (runMode === 'webhook') {
if (!appDetail?.id) {
console.error('handleRun: missing app id for webhook trigger run')
return
}
url = `/apps/${appDetail.id}/workflows/draft/trigger/webhook/run`
}
else if (appDetail?.mode === 'advanced-chat') {
url = `/apps/${appDetail.id}/advanced-chat/workflows/draft/run`
}
@ -179,7 +193,9 @@ export const useWorkflowRun = () => {
const requestBody = runMode === 'schedule'
? { node_id: options?.scheduleNodeId }
: resolvedParams
: runMode === 'webhook'
? { node_id: options?.webhookNodeId }
: resolvedParams
if (!url)
return
@ -189,16 +205,36 @@ export const useWorkflowRun = () => {
return
}
if (runMode === 'webhook' && !options?.webhookNodeId) {
console.error('handleRun: webhook trigger run requires node id')
return
}
abortControllerRef.current?.abort()
abortControllerRef.current = null
const {
setWorkflowRunningData,
} = workflowStore.getState()
setWorkflowRunningData({
result: {
status: WorkflowRunningStatus.Running,
},
tracing: [],
resultText: '',
})
if (runMode === 'webhook') {
setWorkflowRunningData({
result: {
status: WorkflowRunningStatus.Waiting,
},
tracing: [],
resultText: 'Waiting for webhook call...',
})
}
else {
setWorkflowRunningData({
result: {
status: WorkflowRunningStatus.Running,
},
tracing: [],
resultText: '',
})
}
let ttsUrl = ''
let ttsIsPublic = false
@ -214,138 +250,309 @@ export const useWorkflowRun = () => {
}
const player = AudioPlayerManager.getInstance().getAudioPlayer(ttsUrl, ttsIsPublic, uuidV4(), 'none', 'none', noop)
const clearAbortController = () => {
abortControllerRef.current = null
}
const wrappedOnError = (params: any) => {
clearAbortController()
handleWorkflowFailed()
if (onError)
onError(params)
}
const wrappedOnCompleted: IOtherOptions['onCompleted'] = async (hasError?: boolean, errorMessage?: string) => {
clearAbortController()
if (onCompleted)
onCompleted(hasError, errorMessage)
}
const baseSseOptions: IOtherOptions = {
...restCallback,
onWorkflowStarted: (params) => {
const state = workflowStore.getState()
if (state.workflowRunningData) {
state.setWorkflowRunningData(produce(state.workflowRunningData, (draft) => {
draft.resultText = ''
}))
}
handleWorkflowStarted(params)
if (onWorkflowStarted)
onWorkflowStarted(params)
},
onWorkflowFinished: (params) => {
handleWorkflowFinished(params)
if (onWorkflowFinished)
onWorkflowFinished(params)
if (isInWorkflowDebug) {
fetchInspectVars({})
invalidAllLastRun()
}
},
onNodeStarted: (params) => {
handleWorkflowNodeStarted(
params,
{
clientWidth,
clientHeight,
},
)
if (onNodeStarted)
onNodeStarted(params)
},
onNodeFinished: (params) => {
handleWorkflowNodeFinished(params)
if (onNodeFinished)
onNodeFinished(params)
},
onIterationStart: (params) => {
handleWorkflowNodeIterationStarted(
params,
{
clientWidth,
clientHeight,
},
)
if (onIterationStart)
onIterationStart(params)
},
onIterationNext: (params) => {
handleWorkflowNodeIterationNext(params)
if (onIterationNext)
onIterationNext(params)
},
onIterationFinish: (params) => {
handleWorkflowNodeIterationFinished(params)
if (onIterationFinish)
onIterationFinish(params)
},
onLoopStart: (params) => {
handleWorkflowNodeLoopStarted(
params,
{
clientWidth,
clientHeight,
},
)
if (onLoopStart)
onLoopStart(params)
},
onLoopNext: (params) => {
handleWorkflowNodeLoopNext(params)
if (onLoopNext)
onLoopNext(params)
},
onLoopFinish: (params) => {
handleWorkflowNodeLoopFinished(params)
if (onLoopFinish)
onLoopFinish(params)
},
onNodeRetry: (params) => {
handleWorkflowNodeRetry(params)
if (onNodeRetry)
onNodeRetry(params)
},
onAgentLog: (params) => {
handleWorkflowAgentLog(params)
if (onAgentLog)
onAgentLog(params)
},
onTextChunk: (params) => {
handleWorkflowTextChunk(params)
},
onTextReplace: (params) => {
handleWorkflowTextReplace(params)
},
onTTSChunk: (messageId: string, audio: string) => {
if (!audio || audio === '')
return
player.playAudioWithAudio(audio, true)
AudioPlayerManager.getInstance().resetMsgId(messageId)
},
onTTSEnd: (messageId: string, audio: string) => {
player.playAudioWithAudio(audio, false)
},
onError: wrappedOnError,
onCompleted: wrappedOnCompleted,
}
const waitWithAbort = (signal: AbortSignal, delay: number) => new Promise<void>((resolve) => {
const timer = window.setTimeout(resolve, delay)
signal.addEventListener('abort', () => {
clearTimeout(timer)
resolve()
}, { once: true })
})
const runWebhookDebug = async () => {
const urlWithPrefix = (url.startsWith('http://') || url.startsWith('https://'))
? url
: `${API_PREFIX}${url.startsWith('/') ? url : `/${url}`}`
const poll = async (): Promise<void> => {
const controller = new AbortController()
abortControllerRef.current = controller
try {
const baseOptions = getBaseOptions()
const headers = new Headers(baseOptions.headers as Headers)
headers.set('Content-Type', ContentType.json)
const accessToken = await getAccessToken()
headers.set('Authorization', `Bearer ${accessToken}`)
const response = await fetch(urlWithPrefix, {
...baseOptions,
method: 'POST',
headers,
body: JSON.stringify(requestBody),
signal: controller.signal,
})
if (controller.signal.aborted)
return
if (!response.ok) {
const message = `Webhook debug request failed (${response.status})`
Toast.notify({ type: 'error', message })
clearAbortController()
return
}
const contentType = response.headers.get('Content-Type')?.toLowerCase() || ''
if (contentType.includes('application/json')) {
const data = await response.json()
if (controller.signal.aborted)
return
if (data.status === 'waiting') {
const delay = Number(data.retry_in) || 2000
await waitWithAbort(controller.signal, delay)
if (controller.signal.aborted)
return
await poll()
return
}
const errorMessage = data.message || 'Webhook debug failed'
Toast.notify({ type: 'error', message: errorMessage })
clearAbortController()
setWorkflowRunningData({
result: {
status: WorkflowRunningStatus.Failed,
error: errorMessage,
inputs_truncated: false,
process_data_truncated: false,
outputs_truncated: false,
},
tracing: [],
})
return
}
handleStream(
response,
baseSseOptions.onData ?? noop,
baseSseOptions.onCompleted,
baseSseOptions.onThought,
baseSseOptions.onMessageEnd,
baseSseOptions.onMessageReplace,
baseSseOptions.onFile,
baseSseOptions.onWorkflowStarted,
baseSseOptions.onWorkflowFinished,
baseSseOptions.onNodeStarted,
baseSseOptions.onNodeFinished,
baseSseOptions.onIterationStart,
baseSseOptions.onIterationNext,
baseSseOptions.onIterationFinish,
baseSseOptions.onLoopStart,
baseSseOptions.onLoopNext,
baseSseOptions.onLoopFinish,
baseSseOptions.onNodeRetry,
baseSseOptions.onParallelBranchStarted,
baseSseOptions.onParallelBranchFinished,
baseSseOptions.onTextChunk,
baseSseOptions.onTTSChunk,
baseSseOptions.onTTSEnd,
baseSseOptions.onTextReplace,
baseSseOptions.onAgentLog,
baseSseOptions.onDataSourceNodeProcessing,
baseSseOptions.onDataSourceNodeCompleted,
baseSseOptions.onDataSourceNodeError,
)
}
catch (error) {
if (controller.signal.aborted)
return
console.error('handleRun: webhook debug polling error', error)
Toast.notify({ type: 'error', message: 'Webhook debug request failed' })
clearAbortController()
setWorkflowRunningData({
result: {
status: WorkflowRunningStatus.Failed,
error: 'Webhook debug request failed',
inputs_truncated: false,
process_data_truncated: false,
outputs_truncated: false,
},
tracing: [],
})
}
}
await poll()
}
if (runMode === 'webhook') {
await runWebhookDebug()
return
}
ssePost(
url,
{
body: requestBody,
},
{
onWorkflowStarted: (params) => {
handleWorkflowStarted(params)
if (onWorkflowStarted)
onWorkflowStarted(params)
...baseSseOptions,
getAbortController: (controller: AbortController) => {
abortControllerRef.current = controller
},
onWorkflowFinished: (params) => {
handleWorkflowFinished(params)
if (onWorkflowFinished)
onWorkflowFinished(params)
if (isInWorkflowDebug) {
fetchInspectVars({})
invalidAllLastRun()
}
},
onError: (params) => {
handleWorkflowFailed()
if (onError)
onError(params)
},
onNodeStarted: (params) => {
handleWorkflowNodeStarted(
params,
{
clientWidth,
clientHeight,
},
)
if (onNodeStarted)
onNodeStarted(params)
},
onNodeFinished: (params) => {
handleWorkflowNodeFinished(params)
if (onNodeFinished)
onNodeFinished(params)
},
onIterationStart: (params) => {
handleWorkflowNodeIterationStarted(
params,
{
clientWidth,
clientHeight,
},
)
if (onIterationStart)
onIterationStart(params)
},
onIterationNext: (params) => {
handleWorkflowNodeIterationNext(params)
if (onIterationNext)
onIterationNext(params)
},
onIterationFinish: (params) => {
handleWorkflowNodeIterationFinished(params)
if (onIterationFinish)
onIterationFinish(params)
},
onLoopStart: (params) => {
handleWorkflowNodeLoopStarted(
params,
{
clientWidth,
clientHeight,
},
)
if (onLoopStart)
onLoopStart(params)
},
onLoopNext: (params) => {
handleWorkflowNodeLoopNext(params)
if (onLoopNext)
onLoopNext(params)
},
onLoopFinish: (params) => {
handleWorkflowNodeLoopFinished(params)
if (onLoopFinish)
onLoopFinish(params)
},
onNodeRetry: (params) => {
handleWorkflowNodeRetry(params)
if (onNodeRetry)
onNodeRetry(params)
},
onAgentLog: (params) => {
handleWorkflowAgentLog(params)
if (onAgentLog)
onAgentLog(params)
},
onTextChunk: (params) => {
handleWorkflowTextChunk(params)
},
onTextReplace: (params) => {
handleWorkflowTextReplace(params)
},
onTTSChunk: (messageId: string, audio: string) => {
if (!audio || audio === '')
return
player.playAudioWithAudio(audio, true)
AudioPlayerManager.getInstance().resetMsgId(messageId)
},
onTTSEnd: (messageId: string, audio: string) => {
player.playAudioWithAudio(audio, false)
},
...restCallback,
},
)
}, [store, doSyncWorkflowDraft, workflowStore, pathname, handleWorkflowStarted, handleWorkflowFinished, fetchInspectVars, invalidAllLastRun, handleWorkflowFailed, handleWorkflowNodeStarted, handleWorkflowNodeFinished, handleWorkflowNodeIterationStarted, handleWorkflowNodeIterationNext, handleWorkflowNodeIterationFinished, handleWorkflowNodeLoopStarted, handleWorkflowNodeLoopNext, handleWorkflowNodeLoopFinished, handleWorkflowNodeRetry, handleWorkflowAgentLog, handleWorkflowTextChunk, handleWorkflowTextReplace],
)
const handleStopRun = useCallback((taskId: string) => {
const appId = useAppStore.getState().appDetail?.id
if (taskId) {
const appId = useAppStore.getState().appDetail?.id
stopWorkflowRun(`/apps/${appId}/workflow-runs/tasks/${taskId}/stop`)
return
}
stopWorkflowRun(`/apps/${appId}/workflow-runs/tasks/${taskId}/stop`)
}, [])
abortControllerRef.current?.abort()
abortControllerRef.current = null
const { setWorkflowRunningData } = workflowStore.getState()
setWorkflowRunningData({
result: {
status: WorkflowRunningStatus.Stopped,
},
tracing: [],
resultText: '',
})
}, [workflowStore])
const handleRestoreFromPublishedWorkflow = useCallback((publishedWorkflow: VersionHistory) => {
const nodes = publishedWorkflow.graph.nodes.map(node => ({ ...node, selected: false, data: { ...node.data, selected: false } }))

View File

@ -105,6 +105,48 @@ export const useWorkflowStartRun = () => {
setShowInputsPanel(false)
}, [store, workflowStore, handleCancelDebugAndPreviewPanel, handleRun, doSyncWorkflowDraft])
const handleWorkflowTriggerWebhookRunInWorkflow = useCallback(async ({ nodeId }: { nodeId: string }) => {
if (!nodeId)
return
const {
workflowRunningData,
showDebugAndPreviewPanel,
setShowDebugAndPreviewPanel,
setShowInputsPanel,
setShowEnvPanel,
} = workflowStore.getState()
if (workflowRunningData?.result.status === WorkflowRunningStatus.Running)
return
const { getNodes } = store.getState()
const nodes = getNodes()
const webhookNode = nodes.find(node => node.id === nodeId && node.data.type === BlockEnum.TriggerWebhook)
if (!webhookNode) {
console.warn('handleWorkflowTriggerWebhookRunInWorkflow: webhook node not found', nodeId)
return
}
setShowEnvPanel(false)
if (!showDebugAndPreviewPanel)
setShowDebugAndPreviewPanel(true)
setShowInputsPanel(false)
await doSyncWorkflowDraft()
handleRun(
{ node_id: nodeId },
undefined,
{
mode: 'webhook',
webhookNodeId: nodeId,
},
)
}, [store, workflowStore, handleRun, doSyncWorkflowDraft])
const handleWorkflowStartRunInChatflow = useCallback(async () => {
const {
showDebugAndPreviewPanel,
@ -137,5 +179,6 @@ export const useWorkflowStartRun = () => {
handleWorkflowStartRunInWorkflow,
handleWorkflowStartRunInChatflow,
handleWorkflowTriggerScheduleRunInWorkflow,
handleWorkflowTriggerWebhookRunInWorkflow,
}
}

View File

@ -23,12 +23,15 @@ const RunMode = ({
const {
handleWorkflowStartRunInWorkflow,
handleWorkflowTriggerScheduleRunInWorkflow,
handleWorkflowTriggerWebhookRunInWorkflow,
} = useWorkflowStartRun()
const { handleStopRun } = useWorkflowRun()
const { validateBeforeRun } = useWorkflowRunValidation()
const workflowRunningData = useStore(s => s.workflowRunningData)
const isRunning = workflowRunningData?.result.status === WorkflowRunningStatus.Running
const status = workflowRunningData?.result.status
const isWaiting = status === WorkflowRunningStatus.Waiting
const isRunning = status === WorkflowRunningStatus.Running || isWaiting
const dynamicOptions = useDynamicTestRunOptions()
const testRunMenuRef = useRef<TestRunMenuRef>(null)
@ -59,6 +62,10 @@ const RunMode = ({
else if (option.type === 'schedule') {
handleWorkflowTriggerScheduleRunInWorkflow(option.nodeId)
}
else if (option.type === 'webhook') {
if (option.nodeId)
handleWorkflowTriggerWebhookRunInWorkflow({ nodeId: option.nodeId, debugUrl: option.debugUrl })
}
else {
// Placeholder for trigger-specific execution logic for schedule, webhook, plugin types
console.log('TODO: Handle trigger execution for type:', option.type, 'nodeId:', option.nodeId)
@ -67,6 +74,7 @@ const RunMode = ({
validateBeforeRun,
handleWorkflowStartRunInWorkflow,
handleWorkflowTriggerScheduleRunInWorkflow,
handleWorkflowTriggerWebhookRunInWorkflow,
])
const { eventEmitter } = useEventEmitterContextContext()
@ -88,7 +96,7 @@ const RunMode = ({
disabled={true}
>
<RiLoader2Line className='mr-1 size-4 animate-spin' />
{t('workflow.common.running')}
{isWaiting ? t('workflow.common.waiting', { defaultValue: 'Waiting' }) : t('workflow.common.running')}
</button>
)
: (

View File

@ -46,6 +46,7 @@ export type CommonHooksFnMap = {
handleWorkflowStartRunInWorkflow: () => void
handleWorkflowStartRunInChatflow: () => void
handleWorkflowTriggerScheduleRunInWorkflow: (nodeId?: string) => void
handleWorkflowTriggerWebhookRunInWorkflow: (params: { nodeId: string }) => void
availableNodesMetaData?: AvailableNodesMetaData
getWorkflowRunAndTraceUrl: (runId?: string) => { runUrl: string; traceUrl: string }
exportCheck?: () => Promise<void>
@ -89,6 +90,7 @@ export const createHooksStore = ({
handleWorkflowStartRunInWorkflow = noop,
handleWorkflowStartRunInChatflow = noop,
handleWorkflowTriggerScheduleRunInWorkflow = noop,
handleWorkflowTriggerWebhookRunInWorkflow = noop,
availableNodesMetaData = {
nodes: [],
},
@ -128,6 +130,7 @@ export const createHooksStore = ({
handleWorkflowStartRunInWorkflow,
handleWorkflowStartRunInChatflow,
handleWorkflowTriggerScheduleRunInWorkflow,
handleWorkflowTriggerWebhookRunInWorkflow,
availableNodesMetaData,
getWorkflowRunAndTraceUrl,
exportCheck,

View File

@ -5,11 +5,13 @@ export const useWorkflowStartRun = () => {
const handleWorkflowStartRunInWorkflow = useHooksStore(s => s.handleWorkflowStartRunInWorkflow)
const handleWorkflowStartRunInChatflow = useHooksStore(s => s.handleWorkflowStartRunInChatflow)
const handleWorkflowTriggerScheduleRunInWorkflow = useHooksStore(s => s.handleWorkflowTriggerScheduleRunInWorkflow)
const handleWorkflowTriggerWebhookRunInWorkflow = useHooksStore(s => s.handleWorkflowTriggerWebhookRunInWorkflow)
return {
handleStartWorkflowRun,
handleWorkflowStartRunInWorkflow,
handleWorkflowStartRunInChatflow,
handleWorkflowTriggerScheduleRunInWorkflow,
handleWorkflowTriggerWebhookRunInWorkflow,
}
}

View File

@ -140,7 +140,7 @@ export function format(text: string) {
return res.replaceAll('\n', '<br/>').replaceAll('```', '')
}
const handleStream = (
export const handleStream = (
response: Response,
onData: IOnData,
onCompleted?: IOnCompleted,