From 52ce46c3644e415435eaee2c6e836b18fdfd1de9 Mon Sep 17 00:00:00 2001 From: twwu Date: Wed, 31 Dec 2025 16:44:43 +0800 Subject: [PATCH] refactor: optimize workflow variable handling and enhance node state management for resumption scenarios --- .../hooks/use-fetch-workflow-inspect-vars.ts | 28 ++++--- .../use-workflow-node-started.ts | 26 +++++-- .../workflow/panel/debug-and-preview/hooks.ts | 75 +++++++++++-------- web/types/workflow.ts | 1 + 4 files changed, 82 insertions(+), 48 deletions(-) diff --git a/web/app/components/workflow/hooks/use-fetch-workflow-inspect-vars.ts b/web/app/components/workflow/hooks/use-fetch-workflow-inspect-vars.ts index 54c2c77d0d..f013402cd3 100644 --- a/web/app/components/workflow/hooks/use-fetch-workflow-inspect-vars.ts +++ b/web/app/components/workflow/hooks/use-fetch-workflow-inspect-vars.ts @@ -2,7 +2,7 @@ import type { Node, ToolWithProvider } from '@/app/components/workflow/types' import type { SchemaTypeDefinition } from '@/service/use-common' import type { FlowType } from '@/types/common' import type { NodeWithVar, VarInInspect } from '@/types/workflow' -import { useCallback } from 'react' +import { useCallback, useMemo } from 'react' import { useStoreApi } from 'reactflow' import { useNodesInteractionsWithoutSync } from '@/app/components/workflow/hooks/use-nodes-interactions-without-sync' import { useStore, useWorkflowStore } from '@/app/components/workflow/store' @@ -14,7 +14,7 @@ import { } from '@/service/use-tools' import { useInvalidateConversationVarValues, useInvalidateSysVarValues } from '@/service/use-workflow' import { fetchAllInspectVars } from '@/service/workflow' -import useMatchSchemaType, { getMatchedSchemaType } from '../nodes/_base/components/variable/use-match-schema-type' +import useMatchSchemaType from '../nodes/_base/components/variable/use-match-schema-type' import { toNodeOutputVars } from '../nodes/_base/components/variable/utils' type Params = { @@ -37,15 +37,18 @@ export const useSetWorkflowVarsWithValue = ({ const { data: workflowTools } = useAllWorkflowTools() const { data: mcpTools } = useAllMCPTools() const dataSourceList = useStore(s => s.dataSourceList) - const allPluginInfoList = { - buildInTools: buildInTools || [], - customTools: customTools || [], - workflowTools: workflowTools || [], - mcpTools: mcpTools || [], - dataSourceList: dataSourceList || [], - } - const setInspectVarsToStore = (inspectVars: VarInInspect[], passedInAllPluginInfoList?: Record, passedInSchemaTypeDefinitions?: SchemaTypeDefinition[]) => { + const allPluginInfoList = useMemo(() => { + return { + buildInTools: buildInTools || [], + customTools: customTools || [], + workflowTools: workflowTools || [], + mcpTools: mcpTools || [], + dataSourceList: dataSourceList || [], + } + }, [buildInTools, customTools, workflowTools, mcpTools, dataSourceList]) + + const setInspectVarsToStore = useCallback((inspectVars: VarInInspect[], passedInAllPluginInfoList?: Record, passedInSchemaTypeDefinitions?: SchemaTypeDefinition[]) => { const { setNodesWithInspectVars } = workflowStore.getState() const { getNodes } = store.getState() @@ -95,7 +98,7 @@ export const useSetWorkflowVarsWithValue = ({ return nodeWithVar }) setNodesWithInspectVars(res) - } + }, [workflowStore, store, allPluginInfoList, schemaTypeDefinitions]) const fetchInspectVars = useCallback(async (params: { passInVars?: boolean @@ -109,7 +112,8 @@ export const useSetWorkflowVarsWithValue = ({ const data = passInVars ? vars! : await fetchAllInspectVars(flowType, flowId) setInspectVarsToStore(data, passedInAllPluginInfoList, passedInSchemaTypeDefinitions) handleCancelAllNodeSuccessStatus() // to make sure clear node output show the unset status - }, [invalidateConversationVarValues, invalidateSysVarValues, flowType, flowId, setInspectVarsToStore, handleCancelAllNodeSuccessStatus, schemaTypeDefinitions, getMatchedSchemaType]) + }, [invalidateConversationVarValues, invalidateSysVarValues, flowType, flowId, setInspectVarsToStore, handleCancelAllNodeSuccessStatus]) + return { fetchInspectVars, } diff --git a/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-started.ts b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-started.ts index 03c7387d38..3c21fde0eb 100644 --- a/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-started.ts +++ b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-node-started.ts @@ -21,6 +21,7 @@ export const useWorkflowNodeStarted = () => { }, ) => { const { data } = params + const { is_resumption } = data const { workflowRunningData, setWorkflowRunningData, @@ -33,12 +34,25 @@ export const useWorkflowNodeStarted = () => { transform, } = store.getState() const nodes = getNodes() - setWorkflowRunningData(produce(workflowRunningData!, (draft) => { - draft.tracing!.push({ - ...data, - status: NodeRunningStatus.Running, - }) - })) + if (is_resumption) { + const currentIndex = workflowRunningData?.tracing?.findIndex(item => item.node_id === data.node_id) + if (currentIndex && currentIndex > -1) { + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + draft.tracing![currentIndex] = { + ...data, + status: NodeRunningStatus.Running, + } + })) + } + } + else { + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + draft.tracing!.push({ + ...data, + status: NodeRunningStatus.Running, + }) + })) + } const { setViewport, diff --git a/web/app/components/workflow/panel/debug-and-preview/hooks.ts b/web/app/components/workflow/panel/debug-and-preview/hooks.ts index 34533a9117..eb0062fb5e 100644 --- a/web/app/components/workflow/panel/debug-and-preview/hooks.ts +++ b/web/app/components/workflow/panel/debug-and-preview/hooks.ts @@ -301,35 +301,38 @@ export const useChat = ( }) }, async onCompleted(hasError?: boolean, errorMessage?: string) { + const { workflowRunningData } = workflowStore.getState() handleResponding(false) - fetchInspectVars({}) - invalidAllLastRun() + if (workflowRunningData?.result.status !== WorkflowRunningStatus.Paused) { + fetchInspectVars({}) + invalidAllLastRun() - if (hasError) { - if (errorMessage) { - responseItem.content = errorMessage - responseItem.isError = true - updateCurrentQAOnTree({ - placeholderQuestionId, - questionItem, - responseItem, - parentId: params.parent_message_id, - }) + if (hasError) { + if (errorMessage) { + responseItem.content = errorMessage + responseItem.isError = true + updateCurrentQAOnTree({ + placeholderQuestionId, + questionItem, + responseItem, + parentId: params.parent_message_id, + }) + } + return } - return - } - if (config?.suggested_questions_after_answer?.enabled && !hasStopResponded.current && onGetSuggestedQuestions) { - try { - const { data }: any = await onGetSuggestedQuestions( - responseItem.id, - newAbortController => suggestedQuestionsAbortControllerRef.current = newAbortController, - ) - setSuggestQuestions(data) - } - // eslint-disable-next-line unused-imports/no-unused-vars - catch (error) { - setSuggestQuestions([]) + if (config?.suggested_questions_after_answer?.enabled && !hasStopResponded.current && onGetSuggestedQuestions) { + try { + const { data }: any = await onGetSuggestedQuestions( + responseItem.id, + newAbortController => suggestedQuestionsAbortControllerRef.current = newAbortController, + ) + setSuggestQuestions(data) + } + // eslint-disable-next-line unused-imports/no-unused-vars + catch (error) { + setSuggestQuestions([]) + } } } }, @@ -353,6 +356,7 @@ export const useChat = ( }, onWorkflowStarted: ({ workflow_run_id, task_id, data: { is_resumption } }) => { if (is_resumption) { + handleResponding(true) responseItem.workflowProcess!.status = WorkflowRunningStatus.Running } else { @@ -434,10 +438,22 @@ export const useChat = ( } }, onNodeStarted: ({ data }) => { - responseItem.workflowProcess!.tracing!.push({ - ...data, - status: NodeRunningStatus.Running, - } as any) + const { is_resumption } = data + if (is_resumption) { + const currentIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.node_id === data.node_id) + if (currentIndex > -1) { + responseItem.workflowProcess!.tracing![currentIndex] = { + ...data, + status: NodeRunningStatus.Running, + } + } + } + else { + responseItem.workflowProcess!.tracing!.push({ + ...data, + status: NodeRunningStatus.Running, + }) + } updateCurrentQAOnTree({ placeholderQuestionId, questionItem, @@ -538,7 +554,6 @@ export const useChat = ( const handleSubmitHumanInputForm = async (formID: string, formData: any) => { await submitHumanInputForm(formID, formData) - // TODO deal with success } const getHumanInputNodeData = (nodeID: string) => { diff --git a/web/types/workflow.ts b/web/types/workflow.ts index 418b913f27..b74cb1ffb2 100644 --- a/web/types/workflow.ts +++ b/web/types/workflow.ts @@ -105,6 +105,7 @@ export type NodeTracing = { parent_parallel_id?: string parent_parallel_start_node_id?: string agentLog?: AgentLogItemWithChildren[] // agent log + is_resumption?: boolean // for human input node } export type FetchWorkflowDraftResponse = {