mirror of
https://github.com/langgenius/dify.git
synced 2026-05-10 05:56:31 +08:00
fix: preserve workflow tracing across repeated runs
This commit is contained in:
parent
f659eb48c6
commit
226cf788d1
@ -43,6 +43,11 @@ import { useHooksStore } from '../../hooks-store'
|
||||
import { useWorkflowStore } from '../../store'
|
||||
import { NodeRunningStatus, WorkflowRunningStatus } from '../../types'
|
||||
import { upsertTopLevelTracingNodeOnStart } from '../../utils/top-level-tracing'
|
||||
import {
|
||||
findTracingIndexByExecutionOrUniqueNodeId,
|
||||
mergeTracingNodePreservingExecutionMetadata,
|
||||
upsertTracingNodeOnResumeStart,
|
||||
} from '../../utils/tracing-execution'
|
||||
|
||||
type GetAbortController = (abortController: AbortController) => void
|
||||
type SendCallback = {
|
||||
@ -470,8 +475,7 @@ export const useChat = (
|
||||
const currentTracingIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.id === data.id)
|
||||
if (currentTracingIndex > -1) {
|
||||
responseItem.workflowProcess!.tracing[currentTracingIndex] = {
|
||||
...responseItem.workflowProcess!.tracing[currentTracingIndex],
|
||||
...data,
|
||||
...mergeTracingNodePreservingExecutionMetadata(responseItem.workflowProcess!.tracing[currentTracingIndex], data),
|
||||
}
|
||||
updateCurrentQAOnTree({
|
||||
placeholderQuestionId,
|
||||
@ -497,8 +501,7 @@ export const useChat = (
|
||||
const currentTracingIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.id === data.id)
|
||||
if (currentTracingIndex > -1) {
|
||||
responseItem.workflowProcess!.tracing[currentTracingIndex] = {
|
||||
...responseItem.workflowProcess!.tracing[currentTracingIndex],
|
||||
...data,
|
||||
...mergeTracingNodePreservingExecutionMetadata(responseItem.workflowProcess!.tracing[currentTracingIndex], data),
|
||||
}
|
||||
updateCurrentQAOnTree({
|
||||
placeholderQuestionId,
|
||||
@ -540,8 +543,7 @@ export const useChat = (
|
||||
const currentTracingIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.id === data.id)
|
||||
if (currentTracingIndex > -1) {
|
||||
responseItem.workflowProcess!.tracing[currentTracingIndex] = {
|
||||
...responseItem.workflowProcess!.tracing[currentTracingIndex],
|
||||
...data,
|
||||
...mergeTracingNodePreservingExecutionMetadata(responseItem.workflowProcess!.tracing[currentTracingIndex], data),
|
||||
}
|
||||
updateCurrentQAOnTree({
|
||||
placeholderQuestionId,
|
||||
@ -552,7 +554,10 @@ export const useChat = (
|
||||
}
|
||||
},
|
||||
onAgentLog: ({ data }) => {
|
||||
const currentNodeIndex = responseItem.workflowProcess!.tracing!.findIndex(item => item.id === data.node_execution_id)
|
||||
const currentNodeIndex = findTracingIndexByExecutionOrUniqueNodeId(responseItem.workflowProcess!.tracing!, {
|
||||
executionId: data.node_execution_id,
|
||||
nodeId: data.node_id,
|
||||
})
|
||||
if (currentNodeIndex > -1) {
|
||||
const current = responseItem.workflowProcess!.tracing![currentNodeIndex]
|
||||
|
||||
@ -767,7 +772,8 @@ export const useChat = (
|
||||
return
|
||||
if (!responseItem.workflowProcess.tracing)
|
||||
responseItem.workflowProcess.tracing = []
|
||||
responseItem.workflowProcess.tracing.push({
|
||||
|
||||
upsertTracingNodeOnResumeStart(responseItem.workflowProcess.tracing, {
|
||||
...iterationStartedData,
|
||||
status: WorkflowRunningStatus.Running,
|
||||
})
|
||||
@ -778,11 +784,14 @@ export const useChat = (
|
||||
if (!responseItem.workflowProcess?.tracing)
|
||||
return
|
||||
const tracing = responseItem.workflowProcess.tracing
|
||||
const iterationIndex = tracing.findIndex(item => item.id === iterationFinishedData.id)!
|
||||
const iterationIndex = findTracingIndexByExecutionOrUniqueNodeId(tracing, {
|
||||
executionId: iterationFinishedData.id,
|
||||
nodeId: iterationFinishedData.node_id,
|
||||
parallelId: iterationFinishedData.execution_metadata?.parallel_id,
|
||||
})
|
||||
if (iterationIndex > -1) {
|
||||
tracing[iterationIndex] = {
|
||||
...tracing[iterationIndex],
|
||||
...iterationFinishedData,
|
||||
...mergeTracingNodePreservingExecutionMetadata(tracing[iterationIndex], iterationFinishedData),
|
||||
status: WorkflowRunningStatus.Succeeded,
|
||||
}
|
||||
}
|
||||
@ -812,14 +821,16 @@ export const useChat = (
|
||||
if (nodeFinishedData.loop_id)
|
||||
return
|
||||
|
||||
const currentIndex = responseItem.workflowProcess.tracing.findIndex((item) => {
|
||||
if (!item.execution_metadata?.parallel_id)
|
||||
return item.id === nodeFinishedData.id
|
||||
|
||||
return item.id === nodeFinishedData.id && (item.execution_metadata?.parallel_id === nodeFinishedData.execution_metadata?.parallel_id)
|
||||
const currentIndex = findTracingIndexByExecutionOrUniqueNodeId(responseItem.workflowProcess.tracing, {
|
||||
executionId: nodeFinishedData.id,
|
||||
nodeId: nodeFinishedData.node_id,
|
||||
parallelId: nodeFinishedData.execution_metadata?.parallel_id,
|
||||
})
|
||||
if (currentIndex > -1)
|
||||
responseItem.workflowProcess.tracing[currentIndex] = nodeFinishedData as any
|
||||
if (currentIndex > -1) {
|
||||
responseItem.workflowProcess.tracing[currentIndex] = {
|
||||
...mergeTracingNodePreservingExecutionMetadata(responseItem.workflowProcess.tracing[currentIndex], nodeFinishedData),
|
||||
} as any
|
||||
}
|
||||
})
|
||||
},
|
||||
onLoopStart: ({ data: loopStartedData }) => {
|
||||
@ -828,7 +839,8 @@ export const useChat = (
|
||||
return
|
||||
if (!responseItem.workflowProcess.tracing)
|
||||
responseItem.workflowProcess.tracing = []
|
||||
responseItem.workflowProcess.tracing.push({
|
||||
|
||||
upsertTracingNodeOnResumeStart(responseItem.workflowProcess.tracing, {
|
||||
...loopStartedData,
|
||||
status: WorkflowRunningStatus.Running,
|
||||
})
|
||||
@ -839,11 +851,14 @@ export const useChat = (
|
||||
if (!responseItem.workflowProcess?.tracing)
|
||||
return
|
||||
const tracing = responseItem.workflowProcess.tracing
|
||||
const loopIndex = tracing.findIndex(item => item.id === loopFinishedData.id)!
|
||||
const loopIndex = findTracingIndexByExecutionOrUniqueNodeId(tracing, {
|
||||
executionId: loopFinishedData.id,
|
||||
nodeId: loopFinishedData.node_id,
|
||||
parallelId: loopFinishedData.execution_metadata?.parallel_id,
|
||||
})
|
||||
if (loopIndex > -1) {
|
||||
tracing[loopIndex] = {
|
||||
...tracing[loopIndex],
|
||||
...loopFinishedData,
|
||||
...mergeTracingNodePreservingExecutionMetadata(tracing[loopIndex], loopFinishedData),
|
||||
status: WorkflowRunningStatus.Succeeded,
|
||||
}
|
||||
}
|
||||
|
||||
136
web/app/components/workflow/utils/tracing-execution.spec.ts
Normal file
136
web/app/components/workflow/utils/tracing-execution.spec.ts
Normal file
@ -0,0 +1,136 @@
|
||||
import type { AgentLogItem, NodeTracing } from '@/types/workflow'
|
||||
import {
|
||||
findTracingIndexByExecutionOrUniqueNodeId,
|
||||
mergeTracingNodePreservingExecutionMetadata,
|
||||
upsertTracingNodeOnResumeStart,
|
||||
} from './tracing-execution'
|
||||
|
||||
const createTrace = (overrides: Partial<NodeTracing> = {}): NodeTracing => ({
|
||||
id: 'trace-1',
|
||||
index: 0,
|
||||
predecessor_node_id: '',
|
||||
node_id: 'node-1',
|
||||
node_type: 'llm' as NodeTracing['node_type'],
|
||||
title: 'Node 1',
|
||||
inputs: {},
|
||||
inputs_truncated: false,
|
||||
process_data: {},
|
||||
process_data_truncated: false,
|
||||
outputs: {},
|
||||
outputs_truncated: false,
|
||||
status: 'succeeded' as NodeTracing['status'],
|
||||
elapsed_time: 0,
|
||||
metadata: {
|
||||
iterator_length: 0,
|
||||
iterator_index: 0,
|
||||
loop_length: 0,
|
||||
loop_index: 0,
|
||||
},
|
||||
created_at: 0,
|
||||
created_by: {
|
||||
id: 'user-1',
|
||||
name: 'User',
|
||||
email: 'user@example.com',
|
||||
},
|
||||
finished_at: 0,
|
||||
...overrides,
|
||||
})
|
||||
|
||||
describe('tracing-execution utils', () => {
|
||||
beforeEach(() => {
|
||||
vi.clearAllMocks()
|
||||
})
|
||||
|
||||
it('should prefer the exact execution id when the same node ran multiple times', () => {
|
||||
const tracing = [
|
||||
createTrace({ id: 'trace-1', node_id: 'node-1' }),
|
||||
createTrace({ id: 'trace-2', node_id: 'node-1' }),
|
||||
]
|
||||
|
||||
expect(findTracingIndexByExecutionOrUniqueNodeId(tracing, {
|
||||
executionId: 'trace-2',
|
||||
nodeId: 'node-1',
|
||||
})).toBe(1)
|
||||
})
|
||||
|
||||
it('should fall back to a unique node id when the execution id is missing', () => {
|
||||
const tracing = [
|
||||
createTrace({ id: 'trace-1', node_id: 'node-1' }),
|
||||
]
|
||||
|
||||
expect(findTracingIndexByExecutionOrUniqueNodeId(tracing, {
|
||||
executionId: 'missing-trace',
|
||||
nodeId: 'node-1',
|
||||
})).toBe(0)
|
||||
})
|
||||
|
||||
it('should not fall back to node id when multiple executions exist', () => {
|
||||
const tracing = [
|
||||
createTrace({ id: 'trace-1', node_id: 'node-1' }),
|
||||
createTrace({ id: 'trace-2', node_id: 'node-1' }),
|
||||
]
|
||||
|
||||
expect(findTracingIndexByExecutionOrUniqueNodeId(tracing, {
|
||||
executionId: 'missing-trace',
|
||||
nodeId: 'node-1',
|
||||
})).toBe(-1)
|
||||
})
|
||||
|
||||
it('should merge into an existing resume trace instead of appending a duplicate', () => {
|
||||
const tracing: NodeTracing[] = [
|
||||
createTrace({ id: 'trace-1', node_id: 'node-1', title: 'old title' }),
|
||||
]
|
||||
|
||||
upsertTracingNodeOnResumeStart(tracing, createTrace({ node_id: 'node-1', title: 'new title' }))
|
||||
|
||||
expect(tracing).toHaveLength(1)
|
||||
expect(tracing[0].id).toBe('trace-1')
|
||||
expect(tracing[0].title).toBe('new title')
|
||||
})
|
||||
|
||||
it('should append a new trace when a new execution id appears', () => {
|
||||
const tracing: NodeTracing[] = [
|
||||
createTrace({ id: 'trace-1', node_id: 'node-1' }),
|
||||
]
|
||||
|
||||
upsertTracingNodeOnResumeStart(tracing, createTrace({ id: 'trace-2', node_id: 'node-1', title: 'second run' }))
|
||||
|
||||
expect(tracing).toHaveLength(2)
|
||||
expect(tracing[1].id).toBe('trace-2')
|
||||
})
|
||||
|
||||
it('should preserve agent logs when merging finish metadata', () => {
|
||||
const agentLogItem: AgentLogItem = {
|
||||
node_execution_id: 'trace-1',
|
||||
message_id: 'm-1',
|
||||
node_id: 'node-1',
|
||||
label: 'tool',
|
||||
data: {},
|
||||
status: 'success',
|
||||
}
|
||||
|
||||
const currentNode = createTrace({
|
||||
execution_metadata: {
|
||||
total_tokens: 1,
|
||||
total_price: 0,
|
||||
currency: 'USD',
|
||||
agent_log: [agentLogItem],
|
||||
parallel_id: 'p-1',
|
||||
},
|
||||
})
|
||||
|
||||
const mergedNode = mergeTracingNodePreservingExecutionMetadata(currentNode, {
|
||||
status: 'succeeded' as NodeTracing['status'],
|
||||
execution_metadata: {
|
||||
total_tokens: 2,
|
||||
total_price: 1,
|
||||
currency: 'USD',
|
||||
parallel_id: 'p-1',
|
||||
extra: 'value',
|
||||
} as NodeTracing['execution_metadata'],
|
||||
})
|
||||
|
||||
expect(mergedNode.execution_metadata?.agent_log).toEqual([agentLogItem])
|
||||
expect((mergedNode.execution_metadata as Record<string, unknown>).extra).toBe('value')
|
||||
})
|
||||
})
|
||||
76
web/app/components/workflow/utils/tracing-execution.ts
Normal file
76
web/app/components/workflow/utils/tracing-execution.ts
Normal file
@ -0,0 +1,76 @@
|
||||
import type { NodeTracing } from '@/types/workflow'
|
||||
|
||||
type TracingLookup = {
|
||||
executionId?: string
|
||||
nodeId?: string
|
||||
parallelId?: string
|
||||
allowNodeIdFallbackWhenExecutionIdMissing?: boolean
|
||||
}
|
||||
|
||||
const getParallelId = (trace: Partial<NodeTracing>) => {
|
||||
return trace.execution_metadata?.parallel_id || trace.parallel_id
|
||||
}
|
||||
|
||||
export const findTracingIndexByExecutionOrUniqueNodeId = (
|
||||
tracing: Partial<NodeTracing>[],
|
||||
{ executionId, nodeId, parallelId, allowNodeIdFallbackWhenExecutionIdMissing = true }: TracingLookup,
|
||||
) => {
|
||||
if (executionId) {
|
||||
const exactIndex = tracing.findIndex(item => item.id === executionId)
|
||||
if (exactIndex > -1)
|
||||
return exactIndex
|
||||
|
||||
if (!allowNodeIdFallbackWhenExecutionIdMissing)
|
||||
return -1
|
||||
}
|
||||
|
||||
if (!nodeId)
|
||||
return -1
|
||||
|
||||
const candidates = tracing
|
||||
.map((item, index) => ({ item, index }))
|
||||
.filter(({ item }) => item.node_id === nodeId)
|
||||
.filter(({ item }) => !parallelId || getParallelId(item) === parallelId)
|
||||
|
||||
return candidates.length === 1 ? candidates[0].index : -1
|
||||
}
|
||||
|
||||
export const upsertTracingNodeOnResumeStart = (
|
||||
tracing: NodeTracing[],
|
||||
startedNode: NodeTracing,
|
||||
) => {
|
||||
const currentIndex = findTracingIndexByExecutionOrUniqueNodeId(tracing, {
|
||||
executionId: startedNode.id,
|
||||
nodeId: startedNode.node_id,
|
||||
parallelId: getParallelId(startedNode),
|
||||
allowNodeIdFallbackWhenExecutionIdMissing: false,
|
||||
})
|
||||
|
||||
if (currentIndex > -1) {
|
||||
tracing[currentIndex] = {
|
||||
...tracing[currentIndex],
|
||||
...startedNode,
|
||||
}
|
||||
return currentIndex
|
||||
}
|
||||
|
||||
tracing.push(startedNode)
|
||||
return tracing.length - 1
|
||||
}
|
||||
|
||||
export const mergeTracingNodePreservingExecutionMetadata = (
|
||||
currentNode: NodeTracing,
|
||||
incomingNode: Partial<NodeTracing>,
|
||||
): NodeTracing => {
|
||||
return {
|
||||
...currentNode,
|
||||
...incomingNode,
|
||||
execution_metadata: incomingNode.execution_metadata
|
||||
? {
|
||||
...currentNode.execution_metadata,
|
||||
...incomingNode.execution_metadata,
|
||||
agent_log: incomingNode.execution_metadata.agent_log ?? currentNode.execution_metadata?.agent_log,
|
||||
}
|
||||
: currentNode.execution_metadata,
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user