diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index e1a6533655..3ea0ca8ab8 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -31,7 +31,7 @@ from libs.login import current_user, login_required from models import App from models.account import Account from models.model import AppMode -from models.workflow import Workflow +from models.workflow import NodeType, Workflow from services.app_generate_service import AppGenerateService from services.errors.app import WorkflowHashNotEqualError from services.errors.llm import InvokeRateLimitError @@ -1234,6 +1234,68 @@ class DraftWorkflowTriggerWebhookRunApi(Resource): ), 500 +@console_ns.route("/apps//workflows/draft/nodes//debug/webhook/run") +class DraftWorkflowNodeWebhookDebugRunApi(Resource): + """Single node debug when the node is a webhook trigger.""" + + @api.doc("draft_workflow_node_webhook_debug_run") + @api.doc(description="Poll for webhook debug payload and execute single node when event arrives") + @api.doc(params={"app_id": "Application ID", "node_id": "Node ID"}) + @api.response(200, "Node executed successfully") + @api.response(403, "Permission denied") + @api.response(400, "Invalid node type") + @api.response(500, "Internal server error") + @setup_required + @login_required + @account_initialization_required + @get_app_model(mode=[AppMode.WORKFLOW]) + def post(self, app_model: App, node_id: str): + if not isinstance(current_user, Account) or not current_user.has_edit_permission: + raise Forbidden() + + event = WebhookDebugService.poll_event( + tenant_id=app_model.tenant_id, + user_id=current_user.id, + app_id=app_model.id, + node_id=node_id, + ) + + if not event: + return jsonable_encoder({"status": "waiting", "retry_in": 2000}) + + workflow_service = WorkflowService() + draft_workflow = workflow_service.get_draft_workflow(app_model=app_model) + + if not draft_workflow: + raise DraftWorkflowNotExist() + + node_config = draft_workflow.get_node_config_by_id(node_id) + node_type = Workflow.get_node_type_from_node_config(node_config) + if node_type != NodeType.TRIGGER_WEBHOOK: + return jsonable_encoder({ + "status": "error", + "message": "node is not webhook trigger", + }), 400 + + payload = event.payload or {} + workflow_inputs = payload.get("inputs") + if workflow_inputs is None: + webhook_data = payload.get("webhook_data", {}) + workflow_inputs = WebhookService.build_workflow_inputs(webhook_data) + + workflow_node_execution = workflow_service.run_draft_workflow_node( + app_model=app_model, + draft_workflow=draft_workflow, + node_id=node_id, + user_inputs=workflow_inputs or {}, + account=current_user, + query="", + files=[], + ) + + return jsonable_encoder(workflow_node_execution) + + @console_ns.route("/apps//workflows/draft/trigger/schedule/run") class DraftWorkflowTriggerScheduleRunApi(Resource): """ diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index 23ccb9be35..27aff60e27 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -23,6 +23,7 @@ from core.workflow.nodes import NodeType from core.workflow.nodes.base.node import Node from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING from core.workflow.nodes.start.entities import StartNodeData +from core.workflow.nodes.trigger_webhook.entities import WebhookData from core.workflow.system_variable import SystemVariable from core.workflow.workflow_entry import WorkflowEntry from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated @@ -630,10 +631,13 @@ class WorkflowService: app=app_model, workflow=draft_workflow, ) - start_data = StartNodeData.model_validate(node_data) - user_inputs = _rebuild_file_for_user_inputs_in_start_node( - tenant_id=draft_workflow.tenant_id, start_node_data=start_data, user_inputs=user_inputs - ) + if node_type == NodeType.TRIGGER_WEBHOOK: + start_data = WebhookData.model_validate(node_data) + else: + start_data = StartNodeData.model_validate(node_data) + user_inputs = _rebuild_file_for_user_inputs_in_start_node( + tenant_id=draft_workflow.tenant_id, start_node_data=start_data, user_inputs=user_inputs + ) # init variable pool variable_pool = _setup_variable_pool( query=query, diff --git a/web/app/components/workflow/nodes/_base/components/workflow-panel/last-run/use-last-run.ts b/web/app/components/workflow/nodes/_base/components/workflow-panel/last-run/use-last-run.ts index 21462de939..6d81949cda 100644 --- a/web/app/components/workflow/nodes/_base/components/workflow-panel/last-run/use-last-run.ts +++ b/web/app/components/workflow/nodes/_base/components/workflow-panel/last-run/use-last-run.ts @@ -62,6 +62,9 @@ const singleRunFormParamsHooks: Record = { [BlockEnum.LoopEnd]: undefined, [BlockEnum.DataSource]: undefined, [BlockEnum.DataSourceEmpty]: undefined, + [BlockEnum.TriggerWebhook]: undefined, + [BlockEnum.TriggerSchedule]: undefined, + [BlockEnum.TriggerPlugin]: undefined, } const useSingleRunFormParamsHooks = (nodeType: BlockEnum) => { @@ -97,6 +100,9 @@ const getDataForCheckMoreHooks: Record = { [BlockEnum.DataSource]: undefined, [BlockEnum.DataSourceEmpty]: undefined, [BlockEnum.KnowledgeBase]: undefined, + [BlockEnum.TriggerWebhook]: undefined, + [BlockEnum.TriggerSchedule]: undefined, + [BlockEnum.TriggerPlugin]: undefined, } const useGetDataForCheckMoreHooks = (nodeType: BlockEnum) => { diff --git a/web/app/components/workflow/nodes/_base/hooks/use-one-step-run.ts b/web/app/components/workflow/nodes/_base/hooks/use-one-step-run.ts index 46a5a82839..e3d1bd4bbc 100644 --- a/web/app/components/workflow/nodes/_base/hooks/use-one-step-run.ts +++ b/web/app/components/workflow/nodes/_base/hooks/use-one-step-run.ts @@ -53,6 +53,8 @@ import { useInvalidLastRun } from '@/service/use-workflow' import useInspectVarsCrud from '../../../hooks/use-inspect-vars-crud' import type { FlowType } from '@/types/common' import useMatchSchemaType from '../components/variable/use-match-schema-type' +import { API_PREFIX } from '@/config' +import { getAccessToken, getBaseOptions } from '@/service/fetch' // eslint-disable-next-line ts/no-unsafe-function-type const checkValidFns: Record = { [BlockEnum.LLM]: checkLLMValid, @@ -198,6 +200,11 @@ const useOneStepRun = ({ invalidateConversationVarValues, } = useInspectVarsCrud() const runningStatus = data._singleRunningStatus || NodeRunningStatus.NotStart + const webhookSingleRunActiveRef = useRef(false) + const webhookSingleRunAbortRef = useRef(null) + const webhookSingleRunTimeoutRef = useRef(undefined) + const webhookSingleRunTokenRef = useRef(0) + const webhookSingleRunDelayResolveRef = useRef<(() => void) | null>(null) const isPausedRef = useRef(isPaused) useEffect(() => { isPausedRef.current = isPaused @@ -239,6 +246,122 @@ const useOneStepRun = ({ }, }) } + + const cancelWebhookSingleRun = useCallback(() => { + webhookSingleRunActiveRef.current = false + webhookSingleRunTokenRef.current += 1 + if (webhookSingleRunAbortRef.current) + webhookSingleRunAbortRef.current.abort() + webhookSingleRunAbortRef.current = null + if (webhookSingleRunTimeoutRef.current !== undefined) { + window.clearTimeout(webhookSingleRunTimeoutRef.current) + webhookSingleRunTimeoutRef.current = undefined + } + if (webhookSingleRunDelayResolveRef.current) { + webhookSingleRunDelayResolveRef.current() + webhookSingleRunDelayResolveRef.current = null + } + }, []) + + const runWebhookSingleRun = useCallback(async (): Promise => { + const urlPath = `/apps/${flowId}/workflows/draft/nodes/${id}/debug/webhook/run` + const urlWithPrefix = `${API_PREFIX}${urlPath.startsWith('/') ? urlPath : `/${urlPath}`}` + + webhookSingleRunActiveRef.current = true + const token = ++webhookSingleRunTokenRef.current + + while (webhookSingleRunActiveRef.current && token === webhookSingleRunTokenRef.current) { + const controller = new AbortController() + webhookSingleRunAbortRef.current = controller + + try { + const baseOptions = getBaseOptions() + const headers = new Headers(baseOptions.headers as Headers) + const accessToken = await getAccessToken() + headers.set('Authorization', `Bearer ${accessToken}`) + headers.set('Content-Type', 'application/json') + + const response = await fetch(urlWithPrefix, { + ...baseOptions, + method: 'POST', + headers, + body: JSON.stringify({}), + signal: controller.signal, + }) + + if (!webhookSingleRunActiveRef.current || token !== webhookSingleRunTokenRef.current) + return null + + const contentType = response.headers.get('Content-Type')?.toLowerCase() || '' + const responseData = contentType.includes('application/json') ? await response.json() : undefined + + if (!response.ok) { + const message = responseData?.message || 'Webhook debug failed' + Toast.notify({ type: 'error', message }) + cancelWebhookSingleRun() + throw new Error(message) + } + + if (responseData?.status === 'waiting') { + const delay = Number(responseData.retry_in) || 2000 + webhookSingleRunAbortRef.current = null + if (!webhookSingleRunActiveRef.current || token !== webhookSingleRunTokenRef.current) + return null + + await new Promise((resolve) => { + const timeoutId = window.setTimeout(resolve, delay) + webhookSingleRunTimeoutRef.current = timeoutId + webhookSingleRunDelayResolveRef.current = resolve + controller.signal.addEventListener('abort', () => { + window.clearTimeout(timeoutId) + resolve() + }, { once: true }) + }) + + webhookSingleRunTimeoutRef.current = undefined + webhookSingleRunDelayResolveRef.current = null + continue + } + + if (responseData?.status === 'error') { + const message = responseData.message || 'Webhook debug failed' + Toast.notify({ type: 'error', message }) + cancelWebhookSingleRun() + throw new Error(message) + } + + handleNodeDataUpdate({ + id, + data: { + ...data, + _isSingleRun: false, + _singleRunningStatus: NodeRunningStatus.Running, + }, + }) + + cancelWebhookSingleRun() + return responseData + } + catch (error) { + if (controller.signal.aborted && (!webhookSingleRunActiveRef.current || token !== webhookSingleRunTokenRef.current)) + return null + if (controller.signal.aborted) + return null + + console.error('handleRun: webhook debug polling error', error) + Toast.notify({ type: 'error', message: 'Webhook debug request failed' }) + cancelWebhookSingleRun() + if (error instanceof Error) + throw error + throw new Error(String(error)) + } + finally { + webhookSingleRunAbortRef.current = null + } + } + + return null + }, [flowId, id, data, handleNodeDataUpdate, cancelWebhookSingleRun]) const checkValidWrap = () => { if (!checkValid) return { isValid: true, errorMessage: '' } @@ -300,33 +423,53 @@ const useOneStepRun = ({ const isCompleted = runningStatus === NodeRunningStatus.Succeeded || runningStatus === NodeRunningStatus.Failed const handleRun = async (submitData: Record) => { + const isWebhookNode = data.type === BlockEnum.TriggerWebhook + if (isWebhookNode) + cancelWebhookSingleRun() + handleNodeDataUpdate({ id, data: { ...data, _isSingleRun: false, - _singleRunningStatus: NodeRunningStatus.Running, + _singleRunningStatus: isWebhookNode ? NodeRunningStatus.Waiting : NodeRunningStatus.Running, }, }) let res: any let hasError = false try { if (!isIteration && !isLoop) { - const isStartNode = data.type === BlockEnum.Start - const postData: Record = {} - if (isStartNode) { - const { '#sys.query#': query, '#sys.files#': files, ...inputs } = submitData - if (isChatMode) - postData.conversation_id = '' - - postData.inputs = inputs - postData.query = query - postData.files = files || [] + if (isWebhookNode) { + res = await runWebhookSingleRun() + if (!res) { + handleNodeDataUpdate({ + id, + data: { + ...data, + _isSingleRun: false, + _singleRunningStatus: NodeRunningStatus.NotStart, + }, + }) + return false + } } else { - postData.inputs = submitData + const isStartNode = data.type === BlockEnum.Start + const postData: Record = {} + if (isStartNode) { + const { '#sys.query#': query, '#sys.files#': files, ...inputs } = submitData + if (isChatMode) + postData.conversation_id = '' + + postData.inputs = inputs + postData.query = query + postData.files = files || [] + } + else { + postData.inputs = submitData + } + res = await singleNodeRun(flowType, flowId!, id, postData) as any } - res = await singleNodeRun(flowType, flowId!, id, postData) as any } else if (isIteration) { setIterationRunResult([]) @@ -557,6 +700,8 @@ const useOneStepRun = ({ } } finally { + if (data.type === BlockEnum.TriggerWebhook) + cancelWebhookSingleRun() if (!isPausedRef.current && !isIteration && !isLoop && res) { setRunResult({ ...res, @@ -583,6 +728,7 @@ const useOneStepRun = ({ } const handleStop = () => { + cancelWebhookSingleRun() handleNodeDataUpdate({ id, data: { diff --git a/web/app/components/workflow/utils/workflow.ts b/web/app/components/workflow/utils/workflow.ts index 0e42cd3ec5..37534a1f5a 100644 --- a/web/app/components/workflow/utils/workflow.ts +++ b/web/app/components/workflow/utils/workflow.ts @@ -34,6 +34,7 @@ export const canRunBySingle = (nodeType: BlockEnum, isChildNode: boolean) => { || nodeType === BlockEnum.VariableAggregator || nodeType === BlockEnum.Assigner || nodeType === BlockEnum.DataSource + || nodeType === BlockEnum.TriggerWebhook } export const isSupportCustomRunForm = (nodeType: BlockEnum) => {