From 654adccfbf6c3f9b3402c18d65802484e9e1ee8b Mon Sep 17 00:00:00 2001 From: zhsama Date: Mon, 13 Oct 2025 17:02:30 +0800 Subject: [PATCH] fix(trigger): implement plugin single run functionality and update node status handling --- .../nodes/_base/hooks/use-one-step-run.ts | 131 +++++++++++++++++- web/app/components/workflow/utils/workflow.ts | 1 + 2 files changed, 129 insertions(+), 3 deletions(-) diff --git a/web/app/components/workflow/nodes/_base/hooks/use-one-step-run.ts b/web/app/components/workflow/nodes/_base/hooks/use-one-step-run.ts index e3d1bd4bbc..df7a256871 100644 --- a/web/app/components/workflow/nodes/_base/hooks/use-one-step-run.ts +++ b/web/app/components/workflow/nodes/_base/hooks/use-one-step-run.ts @@ -362,6 +362,114 @@ const useOneStepRun = ({ return null }, [flowId, id, data, handleNodeDataUpdate, cancelWebhookSingleRun]) + + const runPluginSingleRun = useCallback(async (): Promise => { + const urlPath = `/apps/${flowId}/workflows/draft/nodes/${id}/trigger` + const urlWithPrefix = `${API_PREFIX}${urlPath.startsWith('/') ? urlPath : `/${urlPath}`}` + + webhookSingleRunActiveRef.current = true + const token = ++webhookSingleRunTokenRef.current + + while (webhookSingleRunActiveRef.current && token === webhookSingleRunTokenRef.current) { + const controller = new AbortController() + webhookSingleRunAbortRef.current = controller + + try { + const baseOptions = getBaseOptions() + const headers = new Headers(baseOptions.headers as Headers) + const accessToken = await getAccessToken() + headers.set('Authorization', `Bearer ${accessToken}`) + headers.set('Content-Type', 'application/json') + + // Reason: Plugin trigger requires event_name, subscription_id, provider_id from node data + const requestBody = { + event_name: (data as any).event_name, + subscription_id: (data as any).subscription_id, + provider_id: (data as any).provider_id, + } + + const response = await fetch(urlWithPrefix, { + ...baseOptions, + method: 'POST', + headers, + body: JSON.stringify(requestBody), + signal: controller.signal, + }) + + if (!webhookSingleRunActiveRef.current || token !== webhookSingleRunTokenRef.current) + return null + + const contentType = response.headers.get('Content-Type')?.toLowerCase() || '' + const responseData = contentType.includes('application/json') ? await response.json() : undefined + + if (!response.ok) { + const message = responseData?.message || 'Plugin debug failed' + Toast.notify({ type: 'error', message }) + cancelWebhookSingleRun() + throw new Error(message) + } + + if (responseData?.status === 'waiting') { + const delay = Number(responseData.retry_in) || 2000 + webhookSingleRunAbortRef.current = null + if (!webhookSingleRunActiveRef.current || token !== webhookSingleRunTokenRef.current) + return null + + await new Promise((resolve) => { + const timeoutId = window.setTimeout(resolve, delay) + webhookSingleRunTimeoutRef.current = timeoutId + webhookSingleRunDelayResolveRef.current = resolve + controller.signal.addEventListener('abort', () => { + window.clearTimeout(timeoutId) + resolve() + }, { once: true }) + }) + + webhookSingleRunTimeoutRef.current = undefined + webhookSingleRunDelayResolveRef.current = null + continue + } + + if (responseData?.status === 'error') { + const message = responseData.message || 'Plugin debug failed' + Toast.notify({ type: 'error', message }) + cancelWebhookSingleRun() + throw new Error(message) + } + + handleNodeDataUpdate({ + id, + data: { + ...data, + _isSingleRun: false, + _singleRunningStatus: NodeRunningStatus.Running, + }, + }) + + cancelWebhookSingleRun() + return responseData + } + catch (error) { + if (controller.signal.aborted && (!webhookSingleRunActiveRef.current || token !== webhookSingleRunTokenRef.current)) + return null + if (controller.signal.aborted) + return null + + console.error('handleRun: plugin debug polling error', error) + Toast.notify({ type: 'error', message: 'Plugin debug request failed' }) + cancelWebhookSingleRun() + if (error instanceof Error) + throw error + throw new Error(String(error)) + } + finally { + webhookSingleRunAbortRef.current = null + } + } + + return null + }, [flowId, id, data, handleNodeDataUpdate, cancelWebhookSingleRun]) + const checkValidWrap = () => { if (!checkValid) return { isValid: true, errorMessage: '' } @@ -424,7 +532,10 @@ const useOneStepRun = ({ const handleRun = async (submitData: Record) => { const isWebhookNode = data.type === BlockEnum.TriggerWebhook - if (isWebhookNode) + const isPluginNode = data.type === BlockEnum.TriggerPlugin + const isTriggerNode = isWebhookNode || isPluginNode + + if (isTriggerNode) cancelWebhookSingleRun() handleNodeDataUpdate({ @@ -432,7 +543,7 @@ const useOneStepRun = ({ data: { ...data, _isSingleRun: false, - _singleRunningStatus: isWebhookNode ? NodeRunningStatus.Waiting : NodeRunningStatus.Running, + _singleRunningStatus: isTriggerNode ? NodeRunningStatus.Waiting : NodeRunningStatus.Running, }, }) let res: any @@ -453,6 +564,20 @@ const useOneStepRun = ({ return false } } + else if (isPluginNode) { + res = await runPluginSingleRun() + if (!res) { + handleNodeDataUpdate({ + id, + data: { + ...data, + _isSingleRun: false, + _singleRunningStatus: NodeRunningStatus.NotStart, + }, + }) + return false + } + } else { const isStartNode = data.type === BlockEnum.Start const postData: Record = {} @@ -700,7 +825,7 @@ const useOneStepRun = ({ } } finally { - if (data.type === BlockEnum.TriggerWebhook) + if (isTriggerNode) cancelWebhookSingleRun() if (!isPausedRef.current && !isIteration && !isLoop && res) { setRunResult({ diff --git a/web/app/components/workflow/utils/workflow.ts b/web/app/components/workflow/utils/workflow.ts index 37534a1f5a..4aaf6f1011 100644 --- a/web/app/components/workflow/utils/workflow.ts +++ b/web/app/components/workflow/utils/workflow.ts @@ -35,6 +35,7 @@ export const canRunBySingle = (nodeType: BlockEnum, isChildNode: boolean) => { || nodeType === BlockEnum.Assigner || nodeType === BlockEnum.DataSource || nodeType === BlockEnum.TriggerWebhook + || nodeType === BlockEnum.TriggerPlugin } export const isSupportCustomRunForm = (nodeType: BlockEnum) => {