feat(workflow): enhance trigger node handling with event listening and state management

This commit is contained in:
zhsama 2025-10-23 18:21:22 +08:00
parent 4ae23ed0f9
commit 9aa43c9165
1 changed files with 101 additions and 15 deletions

View File

@ -10,7 +10,14 @@ import {
import { getNodeInfoById, isConversationVar, isENV, isSystemVar, toNodeOutputVars } from '@/app/components/workflow/nodes/_base/components/variable/utils'
import type { CommonNodeType, InputVar, ValueSelector, Var, Variable } from '@/app/components/workflow/types'
import { BlockEnum, InputVarType, NodeRunningStatus, VarType } from '@/app/components/workflow/types'
import {
BlockEnum,
InputVarType,
NodeRunningStatus,
VarType,
} from '@/app/components/workflow/types'
import type { TriggerNodeType } from '@/app/components/workflow/types'
import { EVENT_WORKFLOW_STOP } from '@/app/components/workflow/variable-inspect/types'
import { useStore, useWorkflowStore } from '@/app/components/workflow/store'
import { fetchNodeInspectVars, getIterationSingleNodeRunUrl, getLoopSingleNodeRunUrl, singleNodeRun } from '@/service/workflow'
import Toast from '@/app/components/base/toast'
@ -50,9 +57,10 @@ import {
useStoreApi,
} from 'reactflow'
import { useInvalidLastRun } from '@/service/use-workflow'
import useInspectVarsCrud from '../../../hooks/use-inspect-vars-crud'
import useInspectVarsCrud from '@/app/components/workflow/hooks/use-inspect-vars-crud'
import type { FlowType } from '@/types/common'
import useMatchSchemaType from '../components/variable/use-match-schema-type'
import { useEventEmitterContextContext } from '@/context/event-emitter'
// eslint-disable-next-line ts/no-unsafe-function-type
const checkValidFns: Record<BlockEnum, Function> = {
[BlockEnum.LLM]: checkLLMValid,
@ -189,6 +197,12 @@ const useOneStepRun = <T>({
const store = useStoreApi()
const {
setShowSingleRunPanel,
setIsListening,
setListeningTriggerType,
setListeningTriggerNodeId,
setListeningTriggerNodeIds,
setListeningTriggerIsAll,
setShowVariableInspectPanel,
} = workflowStore.getState()
const invalidLastRun = useInvalidLastRun(flowType, flowId!, id)
const [runResult, doSetRunResult] = useState<NodeRunResult | null>(null)
@ -212,6 +226,7 @@ const useOneStepRun = <T>({
useEffect(() => {
isPausedRef.current = isPaused
}, [isPaused])
const { eventEmitter } = useEventEmitterContextContext()
const setRunResult = useCallback(async (data: NodeRunResult | null) => {
const isPaused = isPausedRef.current
@ -282,6 +297,50 @@ const useOneStepRun = <T>({
}
}, [])
const isWebhookTriggerNode = data.type === BlockEnum.TriggerWebhook
const isPluginTriggerNode = data.type === BlockEnum.TriggerPlugin
const isTriggerNode = isWebhookTriggerNode || isPluginTriggerNode
const startTriggerListening = useCallback(() => {
if (!isTriggerNode)
return
setIsListening(true)
setShowVariableInspectPanel(true)
setListeningTriggerType(data.type as TriggerNodeType)
setListeningTriggerNodeId(id)
setListeningTriggerNodeIds([id])
setListeningTriggerIsAll(false)
}, [
isTriggerNode,
setIsListening,
setShowVariableInspectPanel,
setListeningTriggerType,
data.type,
setListeningTriggerNodeId,
id,
setListeningTriggerNodeIds,
setListeningTriggerIsAll,
])
const stopTriggerListening = useCallback(() => {
if (!isTriggerNode)
return
setIsListening(false)
setListeningTriggerType(null)
setListeningTriggerNodeId(null)
setListeningTriggerNodeIds([])
setListeningTriggerIsAll(false)
}, [
isTriggerNode,
setIsListening,
setListeningTriggerType,
setListeningTriggerNodeId,
setListeningTriggerNodeIds,
setListeningTriggerIsAll,
])
const runWebhookSingleRun = useCallback(async (): Promise<any | null> => {
const urlPath = `/apps/${flowId}/workflows/draft/nodes/${id}/trigger/run`
@ -517,15 +576,16 @@ const useOneStepRun = <T>({
const isCompleted = runningStatus === NodeRunningStatus.Succeeded || runningStatus === NodeRunningStatus.Failed
const handleRun = async (submitData: Record<string, any>) => {
const isWebhookNode = data.type === BlockEnum.TriggerWebhook
const isPluginNode = data.type === BlockEnum.TriggerPlugin
const isTriggerNode = isWebhookNode || isPluginNode
if (isWebhookNode)
if (isWebhookTriggerNode)
cancelWebhookSingleRun()
if (isPluginNode)
if (isPluginTriggerNode)
cancelPluginSingleRun()
if (isTriggerNode)
startTriggerListening()
else
stopTriggerListening()
handleNodeDataUpdate({
id,
data: {
@ -538,7 +598,7 @@ const useOneStepRun = <T>({
let hasError = false
try {
if (!isIteration && !isLoop) {
if (isWebhookNode) {
if (isWebhookTriggerNode) {
res = await runWebhookSingleRun()
if (!res) {
if (webhookSingleRunActiveRef.current) {
@ -554,7 +614,7 @@ const useOneStepRun = <T>({
return false
}
}
else if (isPluginNode) {
else if (isPluginTriggerNode) {
res = await runPluginSingleRun()
if (!res) {
if (pluginSingleRunActiveRef.current) {
@ -817,10 +877,12 @@ const useOneStepRun = <T>({
}
}
finally {
if (isWebhookNode)
if (isWebhookTriggerNode)
cancelWebhookSingleRun()
if (isPluginNode)
if (isPluginTriggerNode)
cancelPluginSingleRun()
if (isTriggerNode)
stopTriggerListening()
if (!isPausedRef.current && !isIteration && !isLoop && res) {
setRunResult({
...res,
@ -846,18 +908,37 @@ const useOneStepRun = <T>({
}
}
const handleStop = () => {
const handleStop = useCallback(() => {
if (isTriggerNode) {
const isTriggerActive = runningStatus === NodeRunningStatus.Listening
|| webhookSingleRunActiveRef.current
|| pluginSingleRunActiveRef.current
if (!isTriggerActive)
return
}
else if (runningStatus !== NodeRunningStatus.Running) {
return
}
cancelWebhookSingleRun()
cancelPluginSingleRun()
handleNodeDataUpdate({
id,
data: {
...data,
_isSingleRun: false,
_singleRunningStatus: NodeRunningStatus.Stopped,
},
})
}
stopTriggerListening()
}, [
isTriggerNode,
runningStatus,
cancelWebhookSingleRun,
cancelPluginSingleRun,
handleNodeDataUpdate,
id,
stopTriggerListening,
])
const toVarInputs = (variables: Variable[]): InputVar[] => {
if (!variables)
@ -920,6 +1001,11 @@ const useOneStepRun = <T>({
})
}
eventEmitter?.useSubscription((v: any) => {
if (v.type === EVENT_WORKFLOW_STOP)
handleStop()
})
return {
isShowSingleRun,
hideSingleRun,