refactor: optimize workflow variable handling and enhance node state management for resumption scenarios

This commit is contained in:
twwu 2025-12-31 16:44:43 +08:00
parent f0f1ae0b49
commit 52ce46c364
4 changed files with 82 additions and 48 deletions

View File

@ -2,7 +2,7 @@ import type { Node, ToolWithProvider } from '@/app/components/workflow/types'
import type { SchemaTypeDefinition } from '@/service/use-common'
import type { FlowType } from '@/types/common'
import type { NodeWithVar, VarInInspect } from '@/types/workflow'
import { useCallback } from 'react'
import { useCallback, useMemo } from 'react'
import { useStoreApi } from 'reactflow'
import { useNodesInteractionsWithoutSync } from '@/app/components/workflow/hooks/use-nodes-interactions-without-sync'
import { useStore, useWorkflowStore } from '@/app/components/workflow/store'
@ -14,7 +14,7 @@ import {
} from '@/service/use-tools'
import { useInvalidateConversationVarValues, useInvalidateSysVarValues } from '@/service/use-workflow'
import { fetchAllInspectVars } from '@/service/workflow'
import useMatchSchemaType, { getMatchedSchemaType } from '../nodes/_base/components/variable/use-match-schema-type'
import useMatchSchemaType from '../nodes/_base/components/variable/use-match-schema-type'
import { toNodeOutputVars } from '../nodes/_base/components/variable/utils'
type Params = {
@ -37,15 +37,18 @@ export const useSetWorkflowVarsWithValue = ({
const { data: workflowTools } = useAllWorkflowTools()
const { data: mcpTools } = useAllMCPTools()
const dataSourceList = useStore(s => s.dataSourceList)
const allPluginInfoList = {
buildInTools: buildInTools || [],
customTools: customTools || [],
workflowTools: workflowTools || [],
mcpTools: mcpTools || [],
dataSourceList: dataSourceList || [],
}
const setInspectVarsToStore = (inspectVars: VarInInspect[], passedInAllPluginInfoList?: Record<string, ToolWithProvider[]>, passedInSchemaTypeDefinitions?: SchemaTypeDefinition[]) => {
const allPluginInfoList = useMemo(() => {
return {
buildInTools: buildInTools || [],
customTools: customTools || [],
workflowTools: workflowTools || [],
mcpTools: mcpTools || [],
dataSourceList: dataSourceList || [],
}
}, [buildInTools, customTools, workflowTools, mcpTools, dataSourceList])
const setInspectVarsToStore = useCallback((inspectVars: VarInInspect[], passedInAllPluginInfoList?: Record<string, ToolWithProvider[]>, passedInSchemaTypeDefinitions?: SchemaTypeDefinition[]) => {
const { setNodesWithInspectVars } = workflowStore.getState()
const { getNodes } = store.getState()
@ -95,7 +98,7 @@ export const useSetWorkflowVarsWithValue = ({
return nodeWithVar
})
setNodesWithInspectVars(res)
}
}, [workflowStore, store, allPluginInfoList, schemaTypeDefinitions])
const fetchInspectVars = useCallback(async (params: {
passInVars?: boolean
@ -109,7 +112,8 @@ export const useSetWorkflowVarsWithValue = ({
const data = passInVars ? vars! : await fetchAllInspectVars(flowType, flowId)
setInspectVarsToStore(data, passedInAllPluginInfoList, passedInSchemaTypeDefinitions)
handleCancelAllNodeSuccessStatus() // to make sure clear node output show the unset status
}, [invalidateConversationVarValues, invalidateSysVarValues, flowType, flowId, setInspectVarsToStore, handleCancelAllNodeSuccessStatus, schemaTypeDefinitions, getMatchedSchemaType])
}, [invalidateConversationVarValues, invalidateSysVarValues, flowType, flowId, setInspectVarsToStore, handleCancelAllNodeSuccessStatus])
return {
fetchInspectVars,
}

View File

@ -21,6 +21,7 @@ export const useWorkflowNodeStarted = () => {
},
) => {
const { data } = params
const { is_resumption } = data
const {
workflowRunningData,
setWorkflowRunningData,
@ -33,12 +34,25 @@ export const useWorkflowNodeStarted = () => {
transform,
} = store.getState()
const nodes = getNodes()
setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
draft.tracing!.push({
...data,
status: NodeRunningStatus.Running,
})
}))
if (is_resumption) {
const currentIndex = workflowRunningData?.tracing?.findIndex(item => item.node_id === data.node_id)
if (currentIndex && currentIndex > -1) {
setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
draft.tracing![currentIndex] = {
...data,
status: NodeRunningStatus.Running,
}
}))
}
}
else {
setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
draft.tracing!.push({
...data,
status: NodeRunningStatus.Running,
})
}))
}
const {
setViewport,

View File

@ -301,35 +301,38 @@ export const useChat = (
})
},
async onCompleted(hasError?: boolean, errorMessage?: string) {
const { workflowRunningData } = workflowStore.getState()
handleResponding(false)
fetchInspectVars({})
invalidAllLastRun()
if (workflowRunningData?.result.status !== WorkflowRunningStatus.Paused) {
fetchInspectVars({})
invalidAllLastRun()
if (hasError) {
if (errorMessage) {
responseItem.content = errorMessage
responseItem.isError = true
updateCurrentQAOnTree({
placeholderQuestionId,
questionItem,
responseItem,
parentId: params.parent_message_id,
})
if (hasError) {
if (errorMessage) {
responseItem.content = errorMessage
responseItem.isError = true
updateCurrentQAOnTree({
placeholderQuestionId,
questionItem,
responseItem,
parentId: params.parent_message_id,
})
}
return
}
return
}
if (config?.suggested_questions_after_answer?.enabled && !hasStopResponded.current && onGetSuggestedQuestions) {
try {
const { data }: any = await onGetSuggestedQuestions(
responseItem.id,
newAbortController => suggestedQuestionsAbortControllerRef.current = newAbortController,
)
setSuggestQuestions(data)
}
// eslint-disable-next-line unused-imports/no-unused-vars
catch (error) {
setSuggestQuestions([])
if (config?.suggested_questions_after_answer?.enabled && !hasStopResponded.current && onGetSuggestedQuestions) {
try {
const { data }: any = await onGetSuggestedQuestions(
responseItem.id,
newAbortController => suggestedQuestionsAbortControllerRef.current = newAbortController,
)
setSuggestQuestions(data)
}
// eslint-disable-next-line unused-imports/no-unused-vars
catch (error) {
setSuggestQuestions([])
}
}
}
},
@ -353,6 +356,7 @@ export const useChat = (
},
onWorkflowStarted: ({ workflow_run_id, task_id, data: { is_resumption } }) => {
if (is_resumption) {
handleResponding(true)
responseItem.workflowProcess!.status = WorkflowRunningStatus.Running
}
else {
@ -434,10 +438,22 @@ export const useChat = (
}
},
onNodeStarted: ({ data }) => {
responseItem.workflowProcess!.tracing!.push({
...data,
status: NodeRunningStatus.Running,
} as any)
const { is_resumption } = data
if (is_resumption) {
const currentIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.node_id === data.node_id)
if (currentIndex > -1) {
responseItem.workflowProcess!.tracing![currentIndex] = {
...data,
status: NodeRunningStatus.Running,
}
}
}
else {
responseItem.workflowProcess!.tracing!.push({
...data,
status: NodeRunningStatus.Running,
})
}
updateCurrentQAOnTree({
placeholderQuestionId,
questionItem,
@ -538,7 +554,6 @@ export const useChat = (
const handleSubmitHumanInputForm = async (formID: string, formData: any) => {
await submitHumanInputForm(formID, formData)
// TODO deal with success
}
const getHumanInputNodeData = (nodeID: string) => {

View File

@ -105,6 +105,7 @@ export type NodeTracing = {
parent_parallel_id?: string
parent_parallel_start_node_id?: string
agentLog?: AgentLogItemWithChildren[] // agent log
is_resumption?: boolean // for human input node
}
export type FetchWorkflowDraftResponse = {