diff --git a/web/app/components/workflow/panel/debug-and-preview/hooks.ts b/web/app/components/workflow/panel/debug-and-preview/hooks.ts index 4f8512ea4c..a9e961874e 100644 --- a/web/app/components/workflow/panel/debug-and-preview/hooks.ts +++ b/web/app/components/workflow/panel/debug-and-preview/hooks.ts @@ -43,6 +43,11 @@ import { useHooksStore } from '../../hooks-store' import { useWorkflowStore } from '../../store' import { NodeRunningStatus, WorkflowRunningStatus } from '../../types' import { upsertTopLevelTracingNodeOnStart } from '../../utils/top-level-tracing' +import { + findTracingIndexByExecutionOrUniqueNodeId, + mergeTracingNodePreservingExecutionMetadata, + upsertTracingNodeOnResumeStart, +} from '../../utils/tracing-execution' type GetAbortController = (abortController: AbortController) => void type SendCallback = { @@ -470,8 +475,7 @@ export const useChat = ( const currentTracingIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.id === data.id) if (currentTracingIndex > -1) { responseItem.workflowProcess!.tracing[currentTracingIndex] = { - ...responseItem.workflowProcess!.tracing[currentTracingIndex], - ...data, + ...mergeTracingNodePreservingExecutionMetadata(responseItem.workflowProcess!.tracing[currentTracingIndex], data), } updateCurrentQAOnTree({ placeholderQuestionId, @@ -497,8 +501,7 @@ export const useChat = ( const currentTracingIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.id === data.id) if (currentTracingIndex > -1) { responseItem.workflowProcess!.tracing[currentTracingIndex] = { - ...responseItem.workflowProcess!.tracing[currentTracingIndex], - ...data, + ...mergeTracingNodePreservingExecutionMetadata(responseItem.workflowProcess!.tracing[currentTracingIndex], data), } updateCurrentQAOnTree({ placeholderQuestionId, @@ -540,8 +543,7 @@ export const useChat = ( const currentTracingIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.id === data.id) if (currentTracingIndex > -1) { responseItem.workflowProcess!.tracing[currentTracingIndex] = { - ...responseItem.workflowProcess!.tracing[currentTracingIndex], - ...data, + ...mergeTracingNodePreservingExecutionMetadata(responseItem.workflowProcess!.tracing[currentTracingIndex], data), } updateCurrentQAOnTree({ placeholderQuestionId, @@ -552,7 +554,10 @@ export const useChat = ( } }, onAgentLog: ({ data }) => { - const currentNodeIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.id === data.node_execution_id) + const currentNodeIndex = findTracingIndexByExecutionOrUniqueNodeId(responseItem.workflowProcess!.tracing!, { + executionId: data.node_execution_id, + nodeId: data.node_id, + }) if (currentNodeIndex > -1) { const current = responseItem.workflowProcess!.tracing![currentNodeIndex] @@ -767,7 +772,8 @@ export const useChat = ( return if (!responseItem.workflowProcess.tracing) responseItem.workflowProcess.tracing = [] - responseItem.workflowProcess.tracing.push({ + + upsertTracingNodeOnResumeStart(responseItem.workflowProcess.tracing, { ...iterationStartedData, status: WorkflowRunningStatus.Running, }) @@ -778,11 +784,14 @@ export const useChat = ( if (!responseItem.workflowProcess?.tracing) return const tracing = responseItem.workflowProcess.tracing - const iterationIndex = tracing.findIndex(item => item.id === iterationFinishedData.id)! + const iterationIndex = findTracingIndexByExecutionOrUniqueNodeId(tracing, { + executionId: iterationFinishedData.id, + nodeId: iterationFinishedData.node_id, + parallelId: iterationFinishedData.execution_metadata?.parallel_id, + }) if (iterationIndex > -1) { tracing[iterationIndex] = { - ...tracing[iterationIndex], - ...iterationFinishedData, + ...mergeTracingNodePreservingExecutionMetadata(tracing[iterationIndex], iterationFinishedData), status: WorkflowRunningStatus.Succeeded, } } @@ -812,14 +821,16 @@ export const useChat = ( if (nodeFinishedData.loop_id) return - const currentIndex = responseItem.workflowProcess.tracing.findIndex((item) => { - if (!item.execution_metadata?.parallel_id) - return item.id === nodeFinishedData.id - - return item.id === nodeFinishedData.id && (item.execution_metadata?.parallel_id === nodeFinishedData.execution_metadata?.parallel_id) + const currentIndex = findTracingIndexByExecutionOrUniqueNodeId(responseItem.workflowProcess.tracing, { + executionId: nodeFinishedData.id, + nodeId: nodeFinishedData.node_id, + parallelId: nodeFinishedData.execution_metadata?.parallel_id, }) - if (currentIndex > -1) - responseItem.workflowProcess.tracing[currentIndex] = nodeFinishedData as any + if (currentIndex > -1) { + responseItem.workflowProcess.tracing[currentIndex] = { + ...mergeTracingNodePreservingExecutionMetadata(responseItem.workflowProcess.tracing[currentIndex], nodeFinishedData), + } as any + } }) }, onLoopStart: ({ data: loopStartedData }) => { @@ -828,7 +839,8 @@ export const useChat = ( return if (!responseItem.workflowProcess.tracing) responseItem.workflowProcess.tracing = [] - responseItem.workflowProcess.tracing.push({ + + upsertTracingNodeOnResumeStart(responseItem.workflowProcess.tracing, { ...loopStartedData, status: WorkflowRunningStatus.Running, }) @@ -839,11 +851,14 @@ export const useChat = ( if (!responseItem.workflowProcess?.tracing) return const tracing = responseItem.workflowProcess.tracing - const loopIndex = tracing.findIndex(item => item.id === loopFinishedData.id)! + const loopIndex = findTracingIndexByExecutionOrUniqueNodeId(tracing, { + executionId: loopFinishedData.id, + nodeId: loopFinishedData.node_id, + parallelId: loopFinishedData.execution_metadata?.parallel_id, + }) if (loopIndex > -1) { tracing[loopIndex] = { - ...tracing[loopIndex], - ...loopFinishedData, + ...mergeTracingNodePreservingExecutionMetadata(tracing[loopIndex], loopFinishedData), status: WorkflowRunningStatus.Succeeded, } } diff --git a/web/app/components/workflow/utils/tracing-execution.spec.ts b/web/app/components/workflow/utils/tracing-execution.spec.ts new file mode 100644 index 0000000000..8e66146b53 --- /dev/null +++ b/web/app/components/workflow/utils/tracing-execution.spec.ts @@ -0,0 +1,136 @@ +import type { AgentLogItem, NodeTracing } from '@/types/workflow' +import { + findTracingIndexByExecutionOrUniqueNodeId, + mergeTracingNodePreservingExecutionMetadata, + upsertTracingNodeOnResumeStart, +} from './tracing-execution' + +const createTrace = (overrides: Partial = {}): NodeTracing => ({ + id: 'trace-1', + index: 0, + predecessor_node_id: '', + node_id: 'node-1', + node_type: 'llm' as NodeTracing['node_type'], + title: 'Node 1', + inputs: {}, + inputs_truncated: false, + process_data: {}, + process_data_truncated: false, + outputs: {}, + outputs_truncated: false, + status: 'succeeded' as NodeTracing['status'], + elapsed_time: 0, + metadata: { + iterator_length: 0, + iterator_index: 0, + loop_length: 0, + loop_index: 0, + }, + created_at: 0, + created_by: { + id: 'user-1', + name: 'User', + email: 'user@example.com', + }, + finished_at: 0, + ...overrides, +}) + +describe('tracing-execution utils', () => { + beforeEach(() => { + vi.clearAllMocks() + }) + + it('should prefer the exact execution id when the same node ran multiple times', () => { + const tracing = [ + createTrace({ id: 'trace-1', node_id: 'node-1' }), + createTrace({ id: 'trace-2', node_id: 'node-1' }), + ] + + expect(findTracingIndexByExecutionOrUniqueNodeId(tracing, { + executionId: 'trace-2', + nodeId: 'node-1', + })).toBe(1) + }) + + it('should fall back to a unique node id when the execution id is missing', () => { + const tracing = [ + createTrace({ id: 'trace-1', node_id: 'node-1' }), + ] + + expect(findTracingIndexByExecutionOrUniqueNodeId(tracing, { + executionId: 'missing-trace', + nodeId: 'node-1', + })).toBe(0) + }) + + it('should not fall back to node id when multiple executions exist', () => { + const tracing = [ + createTrace({ id: 'trace-1', node_id: 'node-1' }), + createTrace({ id: 'trace-2', node_id: 'node-1' }), + ] + + expect(findTracingIndexByExecutionOrUniqueNodeId(tracing, { + executionId: 'missing-trace', + nodeId: 'node-1', + })).toBe(-1) + }) + + it('should merge into an existing resume trace instead of appending a duplicate', () => { + const tracing: NodeTracing[] = [ + createTrace({ id: 'trace-1', node_id: 'node-1', title: 'old title' }), + ] + + upsertTracingNodeOnResumeStart(tracing, createTrace({ node_id: 'node-1', title: 'new title' })) + + expect(tracing).toHaveLength(1) + expect(tracing[0].id).toBe('trace-1') + expect(tracing[0].title).toBe('new title') + }) + + it('should append a new trace when a new execution id appears', () => { + const tracing: NodeTracing[] = [ + createTrace({ id: 'trace-1', node_id: 'node-1' }), + ] + + upsertTracingNodeOnResumeStart(tracing, createTrace({ id: 'trace-2', node_id: 'node-1', title: 'second run' })) + + expect(tracing).toHaveLength(2) + expect(tracing[1].id).toBe('trace-2') + }) + + it('should preserve agent logs when merging finish metadata', () => { + const agentLogItem: AgentLogItem = { + node_execution_id: 'trace-1', + message_id: 'm-1', + node_id: 'node-1', + label: 'tool', + data: {}, + status: 'success', + } + + const currentNode = createTrace({ + execution_metadata: { + total_tokens: 1, + total_price: 0, + currency: 'USD', + agent_log: [agentLogItem], + parallel_id: 'p-1', + }, + }) + + const mergedNode = mergeTracingNodePreservingExecutionMetadata(currentNode, { + status: 'succeeded' as NodeTracing['status'], + execution_metadata: { + total_tokens: 2, + total_price: 1, + currency: 'USD', + parallel_id: 'p-1', + extra: 'value', + } as NodeTracing['execution_metadata'], + }) + + expect(mergedNode.execution_metadata?.agent_log).toEqual([agentLogItem]) + expect((mergedNode.execution_metadata as Record).extra).toBe('value') + }) +}) diff --git a/web/app/components/workflow/utils/tracing-execution.ts b/web/app/components/workflow/utils/tracing-execution.ts new file mode 100644 index 0000000000..84098d4d55 --- /dev/null +++ b/web/app/components/workflow/utils/tracing-execution.ts @@ -0,0 +1,76 @@ +import type { NodeTracing } from '@/types/workflow' + +type TracingLookup = { + executionId?: string + nodeId?: string + parallelId?: string + allowNodeIdFallbackWhenExecutionIdMissing?: boolean +} + +const getParallelId = (trace: Partial) => { + return trace.execution_metadata?.parallel_id || trace.parallel_id +} + +export const findTracingIndexByExecutionOrUniqueNodeId = ( + tracing: Partial[], + { executionId, nodeId, parallelId, allowNodeIdFallbackWhenExecutionIdMissing = true }: TracingLookup, +) => { + if (executionId) { + const exactIndex = tracing.findIndex(item => item.id === executionId) + if (exactIndex > -1) + return exactIndex + + if (!allowNodeIdFallbackWhenExecutionIdMissing) + return -1 + } + + if (!nodeId) + return -1 + + const candidates = tracing + .map((item, index) => ({ item, index })) + .filter(({ item }) => item.node_id === nodeId) + .filter(({ item }) => !parallelId || getParallelId(item) === parallelId) + + return candidates.length === 1 ? candidates[0].index : -1 +} + +export const upsertTracingNodeOnResumeStart = ( + tracing: NodeTracing[], + startedNode: NodeTracing, +) => { + const currentIndex = findTracingIndexByExecutionOrUniqueNodeId(tracing, { + executionId: startedNode.id, + nodeId: startedNode.node_id, + parallelId: getParallelId(startedNode), + allowNodeIdFallbackWhenExecutionIdMissing: false, + }) + + if (currentIndex > -1) { + tracing[currentIndex] = { + ...tracing[currentIndex], + ...startedNode, + } + return currentIndex + } + + tracing.push(startedNode) + return tracing.length - 1 +} + +export const mergeTracingNodePreservingExecutionMetadata = ( + currentNode: NodeTracing, + incomingNode: Partial, +): NodeTracing => { + return { + ...currentNode, + ...incomingNode, + execution_metadata: incomingNode.execution_metadata + ? { + ...currentNode.execution_metadata, + ...incomingNode.execution_metadata, + agent_log: incomingNode.execution_metadata.agent_log ?? currentNode.execution_metadata?.agent_log, + } + : currentNode.execution_metadata, + } +}