diff --git a/web/app/components/workflow/nodes/_base/hooks/use-one-step-run.ts b/web/app/components/workflow/nodes/_base/hooks/use-one-step-run.ts index d7bac6d3ca..deb082b0d9 100644 --- a/web/app/components/workflow/nodes/_base/hooks/use-one-step-run.ts +++ b/web/app/components/workflow/nodes/_base/hooks/use-one-step-run.ts @@ -10,7 +10,14 @@ import { import { getNodeInfoById, isConversationVar, isENV, isSystemVar, toNodeOutputVars } from '@/app/components/workflow/nodes/_base/components/variable/utils' import type { CommonNodeType, InputVar, ValueSelector, Var, Variable } from '@/app/components/workflow/types' -import { BlockEnum, InputVarType, NodeRunningStatus, VarType } from '@/app/components/workflow/types' +import { + BlockEnum, + InputVarType, + NodeRunningStatus, + VarType, +} from '@/app/components/workflow/types' +import type { TriggerNodeType } from '@/app/components/workflow/types' +import { EVENT_WORKFLOW_STOP } from '@/app/components/workflow/variable-inspect/types' import { useStore, useWorkflowStore } from '@/app/components/workflow/store' import { fetchNodeInspectVars, getIterationSingleNodeRunUrl, getLoopSingleNodeRunUrl, singleNodeRun } from '@/service/workflow' import Toast from '@/app/components/base/toast' @@ -50,9 +57,10 @@ import { useStoreApi, } from 'reactflow' import { useInvalidLastRun } from '@/service/use-workflow' -import useInspectVarsCrud from '../../../hooks/use-inspect-vars-crud' +import useInspectVarsCrud from '@/app/components/workflow/hooks/use-inspect-vars-crud' import type { FlowType } from '@/types/common' import useMatchSchemaType from '../components/variable/use-match-schema-type' +import { useEventEmitterContextContext } from '@/context/event-emitter' // eslint-disable-next-line ts/no-unsafe-function-type const checkValidFns: Record = { [BlockEnum.LLM]: checkLLMValid, @@ -189,6 +197,12 @@ const useOneStepRun = ({ const store = useStoreApi() const { setShowSingleRunPanel, + setIsListening, + setListeningTriggerType, + setListeningTriggerNodeId, + setListeningTriggerNodeIds, + setListeningTriggerIsAll, + setShowVariableInspectPanel, } = workflowStore.getState() const invalidLastRun = useInvalidLastRun(flowType, flowId!, id) const [runResult, doSetRunResult] = useState(null) @@ -212,6 +226,7 @@ const useOneStepRun = ({ useEffect(() => { isPausedRef.current = isPaused }, [isPaused]) + const { eventEmitter } = useEventEmitterContextContext() const setRunResult = useCallback(async (data: NodeRunResult | null) => { const isPaused = isPausedRef.current @@ -282,6 +297,50 @@ const useOneStepRun = ({ } }, []) + const isWebhookTriggerNode = data.type === BlockEnum.TriggerWebhook + const isPluginTriggerNode = data.type === BlockEnum.TriggerPlugin + const isTriggerNode = isWebhookTriggerNode || isPluginTriggerNode + + const startTriggerListening = useCallback(() => { + if (!isTriggerNode) + return + + setIsListening(true) + setShowVariableInspectPanel(true) + setListeningTriggerType(data.type as TriggerNodeType) + setListeningTriggerNodeId(id) + setListeningTriggerNodeIds([id]) + setListeningTriggerIsAll(false) + }, [ + isTriggerNode, + setIsListening, + setShowVariableInspectPanel, + setListeningTriggerType, + data.type, + setListeningTriggerNodeId, + id, + setListeningTriggerNodeIds, + setListeningTriggerIsAll, + ]) + + const stopTriggerListening = useCallback(() => { + if (!isTriggerNode) + return + + setIsListening(false) + setListeningTriggerType(null) + setListeningTriggerNodeId(null) + setListeningTriggerNodeIds([]) + setListeningTriggerIsAll(false) + }, [ + isTriggerNode, + setIsListening, + setListeningTriggerType, + setListeningTriggerNodeId, + setListeningTriggerNodeIds, + setListeningTriggerIsAll, + ]) + const runWebhookSingleRun = useCallback(async (): Promise => { const urlPath = `/apps/${flowId}/workflows/draft/nodes/${id}/trigger/run` @@ -517,15 +576,16 @@ const useOneStepRun = ({ const isCompleted = runningStatus === NodeRunningStatus.Succeeded || runningStatus === NodeRunningStatus.Failed const handleRun = async (submitData: Record) => { - const isWebhookNode = data.type === BlockEnum.TriggerWebhook - const isPluginNode = data.type === BlockEnum.TriggerPlugin - const isTriggerNode = isWebhookNode || isPluginNode - - if (isWebhookNode) + if (isWebhookTriggerNode) cancelWebhookSingleRun() - if (isPluginNode) + if (isPluginTriggerNode) cancelPluginSingleRun() + if (isTriggerNode) + startTriggerListening() + else + stopTriggerListening() + handleNodeDataUpdate({ id, data: { @@ -538,7 +598,7 @@ const useOneStepRun = ({ let hasError = false try { if (!isIteration && !isLoop) { - if (isWebhookNode) { + if (isWebhookTriggerNode) { res = await runWebhookSingleRun() if (!res) { if (webhookSingleRunActiveRef.current) { @@ -554,7 +614,7 @@ const useOneStepRun = ({ return false } } - else if (isPluginNode) { + else if (isPluginTriggerNode) { res = await runPluginSingleRun() if (!res) { if (pluginSingleRunActiveRef.current) { @@ -817,10 +877,12 @@ const useOneStepRun = ({ } } finally { - if (isWebhookNode) + if (isWebhookTriggerNode) cancelWebhookSingleRun() - if (isPluginNode) + if (isPluginTriggerNode) cancelPluginSingleRun() + if (isTriggerNode) + stopTriggerListening() if (!isPausedRef.current && !isIteration && !isLoop && res) { setRunResult({ ...res, @@ -846,18 +908,37 @@ const useOneStepRun = ({ } } - const handleStop = () => { + const handleStop = useCallback(() => { + if (isTriggerNode) { + const isTriggerActive = runningStatus === NodeRunningStatus.Listening + || webhookSingleRunActiveRef.current + || pluginSingleRunActiveRef.current + if (!isTriggerActive) + return + } + else if (runningStatus !== NodeRunningStatus.Running) { + return + } + cancelWebhookSingleRun() cancelPluginSingleRun() handleNodeDataUpdate({ id, data: { - ...data, _isSingleRun: false, _singleRunningStatus: NodeRunningStatus.Stopped, }, }) - } + stopTriggerListening() + }, [ + isTriggerNode, + runningStatus, + cancelWebhookSingleRun, + cancelPluginSingleRun, + handleNodeDataUpdate, + id, + stopTriggerListening, + ]) const toVarInputs = (variables: Variable[]): InputVar[] => { if (!variables) @@ -920,6 +1001,11 @@ const useOneStepRun = ({ }) } + eventEmitter?.useSubscription((v: any) => { + if (v.type === EVENT_WORKFLOW_STOP) + handleStop() + }) + return { isShowSingleRun, hideSingleRun,