feat(workflow): enhance workflow run callbacks with additional data tracking (#36149)

Co-authored-by: CodingOnStar <hanxujiang@dify.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
Coding On Star 2026-05-14 14:20:12 +08:00 committed by GitHub
parent 55f95dbc36
commit 7066372892
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 225 additions and 27 deletions

View File

@ -2962,11 +2962,6 @@
"count": 2
}
},
"web/app/components/workflow-app/hooks/use-workflow-run.ts": {
"ts/no-explicit-any": {
"count": 5
}
},
"web/app/components/workflow-app/hooks/use-workflow-template.ts": {
"ts/no-explicit-any": {
"count": 2

View File

@ -64,6 +64,12 @@ const createUserCallbacks = () => ({
onCompleted: vi.fn(),
})
const createWorkflowData = () => ({
result: { status: 'running' },
tracing: [{ node_id: 'node-1', status: 'running' }],
resultText: 'partial result',
})
describe('useWorkflowRun callbacks helpers', () => {
beforeEach(() => {
vi.clearAllMocks()
@ -77,6 +83,8 @@ describe('useWorkflowRun callbacks helpers', () => {
const fetchInspectVars = vi.fn()
const invalidAllLastRun = vi.fn()
const trackWorkflowRunFailed = vi.fn()
const workflowData = createWorkflowData()
const getWorkflowRunningData = vi.fn(() => workflowData)
const userOnWorkflowFinished = vi.fn()
const userOnError = vi.fn()
const userOnWorkflowPaused = vi.fn()
@ -95,6 +103,7 @@ describe('useWorkflowRun callbacks helpers', () => {
invalidateRunHistory,
clearAbortController,
clearListeningState,
getWorkflowRunningData,
trackWorkflowRunFailed,
handlers,
callbacks: {
@ -118,7 +127,8 @@ describe('useWorkflowRun callbacks helpers', () => {
expect(clearAbortController).toHaveBeenCalled()
expect(handlers.handleWorkflowFailed).toHaveBeenCalled()
expect(userOnError).toHaveBeenCalled()
expect(trackWorkflowRunFailed).toHaveBeenCalledWith({ error: 'failed', node_type: 'llm' })
expect(getWorkflowRunningData).toHaveBeenCalled()
expect(trackWorkflowRunFailed).toHaveBeenCalledWith({ error: 'failed', node_type: 'llm' }, workflowData)
callbacks.onTTSChunk?.('message-1', 'audio-chunk')
expect(getOrCreatePlayer).toHaveBeenCalled()
@ -149,6 +159,7 @@ describe('useWorkflowRun callbacks helpers', () => {
invalidateRunHistory: vi.fn(),
clearAbortController: vi.fn(),
clearListeningState: vi.fn(),
getWorkflowRunningData: vi.fn(() => createWorkflowData()),
trackWorkflowRunFailed: vi.fn(),
handlers,
callbacks: {},
@ -166,6 +177,7 @@ describe('useWorkflowRun callbacks helpers', () => {
invalidateRunHistory: vi.fn(),
clearAbortController: vi.fn(),
clearListeningState: vi.fn(),
getWorkflowRunningData: vi.fn(() => createWorkflowData()),
trackWorkflowRunFailed: vi.fn(),
handlers,
callbacks: {},
@ -188,6 +200,10 @@ describe('useWorkflowRun callbacks helpers', () => {
finalCallbacks.onTTSChunk?.('message-2', 'audio-chunk')
expect(player.playAudioWithAudio).toHaveBeenCalledWith('audio-chunk', true)
expect(mockResetMsgId).toHaveBeenCalledWith('message-2')
finalCallbacks.onTTSChunk?.('message-3', '')
expect(player.playAudioWithAudio).toHaveBeenCalledTimes(1)
expect(mockResetMsgId).toHaveBeenCalledTimes(1)
})
it('should route base workflow events through handlers, user callbacks, and pause continuation with the same callback object', async () => {
@ -199,6 +215,8 @@ describe('useWorkflowRun callbacks helpers', () => {
const fetchInspectVars = vi.fn()
const invalidAllLastRun = vi.fn()
const trackWorkflowRunFailed = vi.fn()
const workflowData = createWorkflowData()
const getWorkflowRunningData = vi.fn(() => workflowData)
const player = {
playAudioWithAudio: vi.fn(),
} as unknown as AudioPlayer
@ -213,6 +231,7 @@ describe('useWorkflowRun callbacks helpers', () => {
invalidateRunHistory,
clearAbortController,
clearListeningState,
getWorkflowRunningData,
trackWorkflowRunFailed,
handlers,
callbacks: userCallbacks,
@ -297,7 +316,8 @@ describe('useWorkflowRun callbacks helpers', () => {
expect(clearAbortController).toHaveBeenCalled()
expect(handlers.handleWorkflowFailed).toHaveBeenCalled()
expect(userCallbacks.onError).toHaveBeenCalledWith({ error: 'failed', node_type: 'llm' }, '500')
expect(trackWorkflowRunFailed).toHaveBeenCalledWith({ error: 'failed', node_type: 'llm' })
expect(getWorkflowRunningData).toHaveBeenCalled()
expect(trackWorkflowRunFailed).toHaveBeenCalledWith({ error: 'failed', node_type: 'llm' }, workflowData)
expect(invalidateRunHistory).toHaveBeenCalledWith('/apps/app-1/workflow-runs')
})
@ -317,6 +337,7 @@ describe('useWorkflowRun callbacks helpers', () => {
invalidateRunHistory: vi.fn(),
clearAbortController: vi.fn(),
clearListeningState: vi.fn(),
getWorkflowRunningData: vi.fn(() => createWorkflowData()),
trackWorkflowRunFailed: vi.fn(),
handlers,
callbacks: {},
@ -340,6 +361,11 @@ describe('useWorkflowRun callbacks helpers', () => {
const fetchInspectVars = vi.fn()
const invalidAllLastRun = vi.fn()
const invalidateRunHistory = vi.fn()
const clearAbortController = vi.fn()
const clearListeningState = vi.fn()
const trackWorkflowRunFailed = vi.fn()
const workflowData = createWorkflowData()
const getWorkflowRunningData = vi.fn(() => workflowData)
const setAbortController = vi.fn()
const player = {
playAudioWithAudio: vi.fn(),
@ -355,6 +381,7 @@ describe('useWorkflowRun callbacks helpers', () => {
invalidateRunHistory: vi.fn(),
clearAbortController: vi.fn(),
clearListeningState: vi.fn(),
getWorkflowRunningData: vi.fn(() => createWorkflowData()),
trackWorkflowRunFailed: vi.fn(),
handlers,
callbacks: {},
@ -370,9 +397,10 @@ describe('useWorkflowRun callbacks helpers', () => {
fetchInspectVars,
invalidAllLastRun,
invalidateRunHistory,
clearAbortController: vi.fn(),
clearListeningState: vi.fn(),
trackWorkflowRunFailed: vi.fn(),
clearAbortController,
clearListeningState,
getWorkflowRunningData,
trackWorkflowRunFailed,
handlers,
callbacks: userCallbacks,
restCallback: {},
@ -444,8 +472,12 @@ describe('useWorkflowRun callbacks helpers', () => {
expect(mockSseGet).toHaveBeenCalledWith('/workflow/run-2/events', {}, finalCallbacks)
expect(player.playAudioWithAudio).toHaveBeenCalledWith('audio-chunk', true)
expect(player.playAudioWithAudio).toHaveBeenCalledWith('audio-finished', false)
expect(clearAbortController).toHaveBeenCalled()
expect(handlers.handleWorkflowFailed).toHaveBeenCalled()
expect(clearListeningState).toHaveBeenCalled()
expect(userCallbacks.onError).toHaveBeenCalledWith({ error: 'failed' }, '500')
expect(getWorkflowRunningData).toHaveBeenCalled()
expect(trackWorkflowRunFailed).toHaveBeenCalledWith({ error: 'failed' }, workflowData)
expect(invalidateRunHistory).toHaveBeenCalledWith('/apps/app-1/workflow-runs')
})
})

View File

@ -17,6 +17,7 @@ type DebugControllerWindow = Window & {
type WorkflowStoreState = {
backupDraft?: unknown
environmentVariables?: unknown
workflowRunningData?: unknown
setBackupDraft?: (value: unknown) => void
setEnvironmentVariables?: (value: unknown) => void
setWorkflowRunningData?: (value: unknown) => void
@ -219,13 +220,16 @@ vi.mock('../use-workflow-run-callbacks', async (importOriginal) => {
const createWorkflowStoreState = () => ({
backupDraft: undefined,
environmentVariables: [{ id: 'env-current', value: 'secret' }],
workflowRunningData: undefined,
setBackupDraft: vi.fn((value: unknown) => {
mocks.workflowStoreState.backupDraft = value
}),
setEnvironmentVariables: vi.fn((value: unknown) => {
mocks.workflowStoreState.environmentVariables = value
}),
setWorkflowRunningData: vi.fn(),
setWorkflowRunningData: vi.fn((value: unknown) => {
mocks.workflowStoreState.workflowRunningData = value
}),
setIsListening: vi.fn(),
setShowVariableInspectPanel: vi.fn(),
setListeningTriggerType: vi.fn(),
@ -253,6 +257,15 @@ describe('useWorkflowRun', () => {
mocks.mockGetAudioPlayer.mockReturnValue({
playAudioWithAudio: vi.fn(),
})
mocks.runEventHandlers.handleWorkflowFailed.mockImplementation(() => {
const workflowRunningData = mocks.workflowStoreState.workflowRunningData
if (typeof workflowRunningData !== 'object' || workflowRunningData === null)
return
const result = (workflowRunningData as { result?: { status?: string } }).result
if (result)
result.status = WorkflowRunningStatus.Failed
})
mocks.workflowStoreState.backupDraft = undefined
Object.assign(mocks.workflowStoreState, createWorkflowStoreState())
mocks.workflowStoreSetState.mockImplementation((partial: Record<string, unknown>) => {
@ -415,15 +428,88 @@ describe('useWorkflowRun', () => {
})
const baseCallbackFactoryContext = mocks.mockCreateBaseWorkflowRunCallbacks.mock.calls.at(-1)?.[0] as {
trackWorkflowRunFailed: (params: { error?: string, node_type?: string }) => void
getWorkflowRunningData: () => unknown
trackWorkflowRunFailed: (params: unknown, workflowData: unknown) => void
}
const workflowData = {
result: { status: WorkflowRunningStatus.Running },
tracing: [{ node_id: 'node-1', status: 'running' }],
}
baseCallbackFactoryContext.trackWorkflowRunFailed({ error: 'failed', node_type: 'llm' })
baseCallbackFactoryContext.trackWorkflowRunFailed({ error: 'failed', node_type: 'llm' }, workflowData)
expect(mocks.mockTrackEvent).toHaveBeenCalledWith('workflow_run_failed', {
workflow_id: 'flow-1',
reason: 'failed',
node_type: 'llm',
data: {
workflow_status: WorkflowRunningStatus.Running,
workflow_tracing_count: 1,
workflow_data: workflowData,
workflow_data_json: JSON.stringify(workflowData),
},
})
mocks.mockTrackEvent.mockClear()
baseCallbackFactoryContext.trackWorkflowRunFailed('Server Error', workflowData)
expect(mocks.mockTrackEvent).toHaveBeenCalledWith('workflow_run_failed', {
workflow_id: 'flow-1',
reason: 'Server Error',
node_type: undefined,
data: {
workflow_status: WorkflowRunningStatus.Running,
workflow_tracing_count: 1,
workflow_data: workflowData,
workflow_data_json: JSON.stringify(workflowData),
},
})
})
it('should track workflow failures when the error or workflow data is malformed', async () => {
const { result } = renderHook(() => useWorkflowRun())
await act(async () => {
await result.current.handleRun({ inputs: { query: 'hello' } })
})
const baseCallbackFactoryContext = mocks.mockCreateBaseWorkflowRunCallbacks.mock.calls.at(-1)?.[0] as {
trackWorkflowRunFailed: (params: unknown, workflowData: unknown) => void
}
baseCallbackFactoryContext.trackWorkflowRunFailed(new Error('network down'), undefined)
expect(mocks.mockTrackEvent).toHaveBeenCalledWith('workflow_run_failed', {
workflow_id: 'flow-1',
reason: 'network down',
node_type: undefined,
data: {
workflow_status: undefined,
workflow_tracing_count: undefined,
workflow_data: undefined,
workflow_data_json: undefined,
},
})
mocks.mockTrackEvent.mockClear()
const circularWorkflowData: Record<string, unknown> = {
result: null,
tracing: 'not-a-list',
}
circularWorkflowData.self = circularWorkflowData
baseCallbackFactoryContext.trackWorkflowRunFailed({ message: 'missing error' }, circularWorkflowData)
expect(mocks.mockTrackEvent).toHaveBeenCalledWith('workflow_run_failed', {
workflow_id: 'flow-1',
reason: undefined,
node_type: undefined,
data: {
workflow_status: undefined,
workflow_tracing_count: undefined,
workflow_data: circularWorkflowData,
workflow_data_json: undefined,
},
})
})

View File

@ -61,7 +61,8 @@ type CallbackContext = {
invalidateRunHistory: (url: string) => void
clearAbortController: () => void
clearListeningState: () => void
trackWorkflowRunFailed: (params: unknown) => void
getWorkflowRunningData: () => unknown
trackWorkflowRunFailed: (params: unknown, workflowData: unknown) => void
handlers: WorkflowRunEventHandlers
callbacks: UserCallbackHandlers
restCallback: IOtherOptions
@ -87,6 +88,7 @@ export const createBaseWorkflowRunCallbacks = ({
invalidateRunHistory,
clearAbortController,
clearListeningState,
getWorkflowRunningData,
trackWorkflowRunFailed,
handlers,
callbacks,
@ -138,13 +140,14 @@ export const createBaseWorkflowRunCallbacks = ({
const wrappedOnError: IOtherOptions['onError'] = (params, code) => {
clearAbortController()
handleWorkflowFailed()
const workflowData = getWorkflowRunningData()
invalidateRunHistory(runHistoryUrl)
clearListeningState()
if (onError)
onError(params, code)
trackWorkflowRunFailed(params)
trackWorkflowRunFailed(params, workflowData)
}
const wrappedOnCompleted: IOtherOptions['onCompleted'] = async (hasError, errorMessage) => {
@ -293,9 +296,10 @@ export const createFinalWorkflowRunCallbacks = ({
fetchInspectVars,
invalidAllLastRun,
invalidateRunHistory,
clearAbortController: _clearAbortController,
clearListeningState: _clearListeningState,
trackWorkflowRunFailed: _trackWorkflowRunFailed,
clearAbortController,
clearListeningState,
getWorkflowRunningData,
trackWorkflowRunFailed,
handlers,
callbacks,
restCallback,
@ -359,11 +363,15 @@ export const createFinalWorkflowRunCallbacks = ({
}
},
onError: (params, code) => {
clearAbortController()
handleWorkflowFailed()
const workflowData = getWorkflowRunningData()
invalidateRunHistory(runHistoryUrl)
clearListeningState()
if (onError)
onError(params, code)
trackWorkflowRunFailed(params, workflowData)
},
onNodeStarted: (params) => {
handleWorkflowNodeStarted(params, { clientWidth, clientHeight })

View File

@ -48,6 +48,54 @@ import {
validateWorkflowRunRequest,
} from './use-workflow-run-utils'
type WorkflowRunParams = Record<string, unknown> & {
token?: string
appId?: string
}
type DebugAbortController = {
abort: () => void
}
type WorkflowDebugWindow = Window & {
__webhookDebugAbortController?: DebugAbortController
__pluginDebugAbortController?: DebugAbortController
__scheduleDebugAbortController?: DebugAbortController
__allTriggersDebugAbortController?: DebugAbortController
}
const stringifyWorkflowData = (workflowData: unknown) => {
if (!workflowData)
return undefined
try {
return JSON.stringify(workflowData)
}
catch {
return undefined
}
}
const getWorkflowStatus = (workflowData: unknown) => {
if (typeof workflowData !== 'object' || workflowData === null)
return undefined
const result = (workflowData as Record<string, unknown>).result
if (typeof result !== 'object' || result === null)
return undefined
const status = (result as Record<string, unknown>).status
return typeof status === 'string' ? status : undefined
}
const getWorkflowTracingCount = (workflowData: unknown) => {
if (typeof workflowData !== 'object' || workflowData === null)
return undefined
const tracing = (workflowData as Record<string, unknown>).tracing
return Array.isArray(tracing) ? tracing.length : undefined
}
export const useWorkflowRun = () => {
const store = useStoreApi()
const workflowStore = useWorkflowStore()
@ -141,12 +189,12 @@ export const useWorkflowRun = () => {
}, [handleUpdateWorkflowCanvas, workflowStore, featuresStore])
const handleRun = useCallback(async (
params: any,
params: WorkflowRunParams | null | undefined,
callback?: IOtherOptions,
options?: HandleRunOptions,
) => {
const runMode = options?.mode ?? TriggerType.UserInput
const resolvedParams = params ?? {}
const resolvedParams: WorkflowRunParams = params ?? {}
const {
getNodes,
setNodes,
@ -297,9 +345,34 @@ export const useWorkflowRun = () => {
onCompleted,
}
const trackWorkflowRunFailed = (eventParams: unknown) => {
const payload = eventParams as { error?: string, node_type?: string }
trackEvent('workflow_run_failed', { workflow_id: flowId, reason: payload?.error, node_type: payload?.node_type })
const getWorkflowRunningData = () => workflowStore.getState().workflowRunningData
const trackWorkflowRunFailed = (eventParams: unknown, workflowData: unknown) => {
const payload = typeof eventParams === 'object' && eventParams !== null
? eventParams as Record<string, unknown>
: undefined
const reason = typeof eventParams === 'string'
? eventParams
: eventParams instanceof Error
? eventParams.message
: typeof payload?.error === 'string'
? payload.error
: undefined
const nodeType = typeof payload?.node_type === 'string'
? payload.node_type
: undefined
trackEvent('workflow_run_failed', {
workflow_id: flowId,
reason,
node_type: nodeType,
data: {
workflow_status: getWorkflowStatus(workflowData),
workflow_tracing_count: getWorkflowTracingCount(workflowData),
workflow_data: workflowData,
workflow_data_json: stringifyWorkflowData(workflowData),
},
})
}
const baseSseOptions = createBaseWorkflowRunCallbacks({
@ -312,6 +385,7 @@ export const useWorkflowRun = () => {
invalidateRunHistory,
clearAbortController,
clearListeningState: clearListeningStateInStore,
getWorkflowRunningData,
trackWorkflowRunFailed,
handlers: workflowRunEventHandlers,
callbacks: userCallbacks,
@ -346,6 +420,7 @@ export const useWorkflowRun = () => {
invalidateRunHistory,
clearAbortController,
clearListeningState: clearListeningStateInStore,
getWorkflowRunningData,
trackWorkflowRunFailed,
handlers: workflowRunEventHandlers,
callbacks: userCallbacks,
@ -393,19 +468,21 @@ export const useWorkflowRun = () => {
}
// Try webhook debug controller from global variable first
const webhookController = (window as any).__webhookDebugAbortController
const debugWindow = window as WorkflowDebugWindow
const webhookController = debugWindow.__webhookDebugAbortController
if (webhookController)
webhookController.abort()
const pluginController = (window as any).__pluginDebugAbortController
const pluginController = debugWindow.__pluginDebugAbortController
if (pluginController)
pluginController.abort()
const scheduleController = (window as any).__scheduleDebugAbortController
const scheduleController = debugWindow.__scheduleDebugAbortController
if (scheduleController)
scheduleController.abort()
const allTriggerController = (window as any).__allTriggersDebugAbortController
const allTriggerController = debugWindow.__allTriggersDebugAbortController
if (allTriggerController)
allTriggerController.abort()