feat(chat): implement workflow event handling and audio player management in chat hooks

This commit is contained in:
twwu 2026-01-09 17:05:38 +08:00
parent b479a36273
commit a280df2c07
4 changed files with 369 additions and 26 deletions

View File

@ -2,9 +2,10 @@ import type { FileEntity } from '../../file-uploader/types'
import type {
ChatConfig,
ChatItem,
ChatItemInTree,
OnSend,
} from '../types'
import { useCallback, useEffect, useMemo, useState } from 'react'
import { useCallback, useEffect, useMemo, useRef, useState } from 'react'
import AnswerIcon from '@/app/components/base/answer-icon'
import AppIcon from '@/app/components/base/app-icon'
import InputsForm from '@/app/components/base/chat/chat-with-history/inputs-form'
@ -72,6 +73,7 @@ const ChatWrapper = () => {
setTargetMessageId,
handleSend,
handleStop,
handleResume,
isResponding: respondingState,
suggestedQuestions,
} = useChat(
@ -118,8 +120,11 @@ const ChatWrapper = () => {
if (fileIsUploading)
return true
if (chatList.some(item => item.isAnswer && item.humanInputFormDataList && item.humanInputFormDataList.length > 0))
return true
return false
}, [inputsFormValue, inputsForms, allInputsHidden])
}, [allInputsHidden, inputsForms, chatList, inputsFormValue])
useEffect(() => {
if (currentChatInstanceRef.current)
@ -130,6 +135,36 @@ const ChatWrapper = () => {
setIsResponding(respondingState)
}, [respondingState, setIsResponding])
// Resume paused workflows when chat history is loaded.
// Tracks which workflow runs have already been re-subscribed so the effect
// can safely re-run (e.g. when the history tree arrives asynchronously)
// without resuming the same workflow twice.
const resumedWorkflowsRef = useRef<Set<string>>(new Set())
useEffect(() => {
  if (!appPrevChatTree || appPrevChatTree.length === 0)
    return
  // Find all answer items with workflow_run_id that need resumption.
  // An answer node carrying a workflow_run_id plus at least one pending
  // human-input form is a paused workflow waiting for user input.
  const checkForPausedWorkflows = (nodes: ChatItemInTree[]) => {
    nodes.forEach((node) => {
      if (node.isAnswer && node.workflow_run_id && node.humanInputFormDataList && node.humanInputFormDataList.length > 0) {
        // This is a paused workflow waiting for human input
        const workflowKey = `${node.workflow_run_id}-${node.id}`
        if (!resumedWorkflowsRef.current.has(workflowKey)) {
          resumedWorkflowsRef.current.add(workflowKey)
          // Re-subscribe to workflow events
          handleResume(
            node.id,
            node.workflow_run_id,
            !isInstalledApp,
          )
        }
      }
      if (node.children && node.children.length > 0)
        checkForPausedWorkflows(node.children)
    })
  }
  checkForPausedWorkflows(appPrevChatTree)
  // FIX: was `[]`, which ran the effect only once on mount — a chat tree
  // loaded asynchronously afterwards would never have its paused workflows
  // resumed, and `handleResume`/`isInstalledApp` were stale closures.
  // The ref-based dedupe above keeps re-runs from resuming a run twice.
}, [appPrevChatTree, handleResume, isInstalledApp])
const doSend: OnSend = useCallback((message, files, isRegenerate = false, parentAnswer: ChatItem | null = null) => {
const data: any = {
query: message,

View File

@ -60,10 +60,12 @@ function getFormattedChatList(messages: any[]) {
const answerFiles = item.message_files?.filter((file: any) => file.belongs_to === 'assistant') || []
const humanInputFormDataList: HumanInputFormData[] = []
const humanInputFilledFormDataList: HumanInputFilledFormData[] = []
let workflowRunId = ''
if (item.status === 'paused') {
item.extra_contents?.forEach((content: ExtraContent) => {
if (content.type === 'human_input' && !content.submitted) {
humanInputFormDataList.push(content.form_definition)
workflowRunId = content.workflow_run_id
}
})
}
@ -85,6 +87,7 @@ function getFormattedChatList(messages: any[]) {
parentMessageId: `question-${item.id}`,
humanInputFormDataList,
humanInputFilledFormDataList,
workflow_run_id: workflowRunId,
})
})
return newChatList

View File

@ -73,6 +73,7 @@ export const useChat = (
const [suggestedQuestions, setSuggestQuestions] = useState<string[]>([])
const conversationMessagesAbortControllerRef = useRef<AbortController | null>(null)
const suggestedQuestionsAbortControllerRef = useRef<AbortController | null>(null)
const workflowEventsAbortControllerRef = useRef<AbortController | null>(null)
const params = useParams()
const pathname = usePathname()
@ -170,6 +171,8 @@ export const useChat = (
conversationMessagesAbortControllerRef.current.abort()
if (suggestedQuestionsAbortControllerRef.current)
suggestedQuestionsAbortControllerRef.current.abort()
if (workflowEventsAbortControllerRef.current)
workflowEventsAbortControllerRef.current.abort()
}, [stopChat, handleResponding])
const handleRestart = useCallback((cb?: any) => {
@ -181,6 +184,325 @@ export const useChat = (
cb?.()
}, [handleStop])
// Builds a lazy factory for the shared TTS AudioPlayer.
// The text-to-audio endpoint is resolved from the current route:
//   - token route (public web app)  -> '/text-to-audio' (public API)
//   - appId route                   -> installed-app or console variant
// The player itself is only created on first use, so no audio channel is
// opened unless TTS audio actually streams.
const createAudioPlayerManager = useCallback(() => {
  let ttsUrl = ''
  let ttsIsPublic = false
  if (params.token) {
    ttsUrl = '/text-to-audio'
    ttsIsPublic = true
  }
  else if (params.appId) {
    // `includes` expresses the plain substring check directly; the previous
    // `pathname.search('explore/installed') > -1` coerced the string into a
    // RegExp (String#search semantics), which was accidental here.
    if (pathname.includes('explore/installed'))
      ttsUrl = `/installed-apps/${params.appId}/text-to-audio`
    else
      ttsUrl = `/apps/${params.appId}/text-to-audio`
  }
  // Lazy initialization: only create the AudioPlayer when TTS is needed.
  let player: AudioPlayer | null = null
  const getOrCreatePlayer = () => {
    if (!player)
      player = AudioPlayerManager.getInstance().getAudioPlayer(ttsUrl, ttsIsPublic, uuidV4(), 'none', 'none', noop)
    return player
  }
  return getOrCreatePlayer
}, [params.token, params.appId, pathname])
// Re-subscribes to the SSE event stream of an existing workflow run so a
// paused answer message keeps receiving updates (text deltas, node tracing,
// TTS audio, human-input forms) after e.g. a chat-history reload.
// Every handler funnels its mutation through updateChatTreeNode against the
// chat-tree node identified by `messageId`.
//
// messageId     - id of the answer node in the chat tree to stream updates into
// workflowRunId - server-side run whose `/workflow/{id}/events` stream to join
// isPublicAPI   - forwarded to sseGet to select the public vs. console API base
const handleResume = useCallback((
  messageId: string,
  workflowRunId: string,
  isPublicAPI?: boolean,
) => {
  // Lazily-created shared TTS player: no audio channel until the first chunk.
  const getOrCreatePlayer = createAudioPlayerManager()
  // Re-subscribe to workflow events for the specific message
  const url = `/workflow/${workflowRunId}/events`
  const otherOptions: IOtherOptions = {
    isPublicAPI,
    // Keep the controller so handleStop can abort this stream.
    getAbortController: (abortController) => {
      workflowEventsAbortControllerRef.current = abortController
    },
    // Streaming text delta: append to plain content, or — when the message
    // already has agent thoughts — to the last agent thought instead.
    onData: (message: string, isFirstMessage: boolean, { conversationId: newConversationId, taskId }: any) => {
      updateChatTreeNode(messageId, (responseItem) => {
        const isAgentMode = responseItem.agent_thoughts && responseItem.agent_thoughts.length > 0
        if (!isAgentMode) {
          responseItem.content = responseItem.content + message
        }
        else {
          const lastThought = responseItem.agent_thoughts?.[responseItem.agent_thoughts?.length - 1]
          if (lastThought)
            lastThought.thought = lastThought.thought + message
        }
      })
      if (isFirstMessage && newConversationId)
        conversationId.current = newConversationId
      if (taskId)
        taskIdRef.current = taskId
    },
    // Stream closed normally: clear the responding flag.
    async onCompleted() {
      handleResponding(false)
    },
    // A file produced during an agent thought: append it to the last
    // thought's message_files list.
    onFile(file) {
      updateChatTreeNode(messageId, (responseItem) => {
        const lastThought = responseItem.agent_thoughts?.[responseItem.agent_thoughts?.length - 1]
        if (lastThought)
          responseItem.agent_thoughts![responseItem.agent_thoughts!.length - 1].message_files = [...(lastThought as any).message_files, file]
      })
    },
    // Agent thought event: either replaces the last thought (same id,
    // preserving already-streamed text/files) or appends a new one.
    onThought(thought) {
      updateChatTreeNode(messageId, (responseItem) => {
        if (thought.message_id)
          responseItem.id = thought.message_id
        if (thought.conversation_id)
          responseItem.conversationId = thought.conversation_id
        if (!responseItem.agent_thoughts)
          responseItem.agent_thoughts = []
        if (responseItem.agent_thoughts.length === 0) {
          responseItem.agent_thoughts.push(thought)
        }
        else {
          const lastThought = responseItem.agent_thoughts[responseItem.agent_thoughts.length - 1]
          if (lastThought.id === thought.id) {
            // Same thought updated: keep the text/files accumulated so far.
            thought.thought = lastThought.thought
            thought.message_files = lastThought.message_files
            responseItem.agent_thoughts[responseItem.agent_thoughts.length - 1] = thought
          }
          else {
            responseItem.agent_thoughts.push(thought)
          }
        }
      })
    },
    // End of message: an annotation reply short-circuits; otherwise record
    // citations and merge any response files (deduped by id).
    onMessageEnd: (messageEnd) => {
      updateChatTreeNode(messageId, (responseItem) => {
        if (messageEnd.metadata?.annotation_reply) {
          responseItem.annotation = ({
            id: messageEnd.metadata.annotation_reply.id,
            authorName: messageEnd.metadata.annotation_reply.account.name,
          })
          return
        }
        responseItem.citation = messageEnd.metadata?.retriever_resources || []
        const processedFilesFromResponse = getProcessedFilesFromResponse(messageEnd.files || [])
        responseItem.allFiles = uniqBy([...(responseItem.allFiles || []), ...(processedFilesFromResponse || [])], 'id')
      })
    },
    // Server replaced the whole answer (e.g. moderation): overwrite content.
    onMessageReplace: (messageReplace) => {
      updateChatTreeNode(messageId, (responseItem) => {
        responseItem.content = messageReplace.answer
      })
    },
    onError() {
      handleResponding(false)
    },
    // Workflow (re)started. On resumption the existing workflowProcess is
    // flipped back to Running (keeping its tracing); on a fresh start the
    // task id and run id are recorded and tracing starts empty.
    onWorkflowStarted: ({ workflow_run_id, task_id, data: { is_resumption } }) => {
      handleResponding(true)
      hasStopResponded.current = false
      updateChatTreeNode(messageId, (responseItem) => {
        if (is_resumption) {
          if (responseItem.workflowProcess) {
            responseItem.workflowProcess.status = WorkflowRunningStatus.Running
          }
          else {
            responseItem.workflowProcess = {
              status: WorkflowRunningStatus.Running,
              tracing: [],
            }
          }
        }
        else {
          taskIdRef.current = task_id
          responseItem.workflow_run_id = workflow_run_id
          responseItem.workflowProcess = {
            status: WorkflowRunningStatus.Running,
            tracing: [],
          }
        }
      })
    },
    onWorkflowFinished: ({ data: workflowFinishedData }) => {
      updateChatTreeNode(messageId, (responseItem) => {
        if (responseItem.workflowProcess)
          responseItem.workflowProcess.status = workflowFinishedData.status as WorkflowRunningStatus
      })
    },
    // Iteration node started: append a Running tracing entry.
    onIterationStart: ({ data: iterationStartedData }) => {
      updateChatTreeNode(messageId, (responseItem) => {
        if (!responseItem.workflowProcess)
          return
        if (!responseItem.workflowProcess.tracing)
          responseItem.workflowProcess.tracing = []
        responseItem.workflowProcess.tracing.push({
          ...iterationStartedData,
          status: WorkflowRunningStatus.Running,
        })
      })
    },
    // Iteration finished: match the tracing entry by node_id + parallel_id
    // and merge the final data over it.
    onIterationFinish: ({ data: iterationFinishedData }) => {
      updateChatTreeNode(messageId, (responseItem) => {
        if (!responseItem.workflowProcess?.tracing)
          return
        const tracing = responseItem.workflowProcess.tracing
        // NOTE(review): the trailing `!` is a no-op — findIndex always
        // returns a number (-1 when not found).
        const iterationIndex = tracing.findIndex(item => item.node_id === iterationFinishedData.node_id
          && (item.execution_metadata?.parallel_id === iterationFinishedData.execution_metadata?.parallel_id || item.parallel_id === iterationFinishedData.execution_metadata?.parallel_id))!
        if (iterationIndex > -1) {
          tracing[iterationIndex] = {
            ...tracing[iterationIndex],
            ...iterationFinishedData,
            status: WorkflowRunningStatus.Succeeded,
          }
        }
      })
    },
    // Node started. On resumption the node's existing tracing entry (if any)
    // is replaced in place; on a fresh start, nodes inside an iteration are
    // skipped (they are traced by the iteration), others are appended.
    onNodeStarted: ({ data: nodeStartedData }) => {
      updateChatTreeNode(messageId, (responseItem) => {
        if (!responseItem.workflowProcess)
          return
        if (!responseItem.workflowProcess.tracing)
          responseItem.workflowProcess.tracing = []
        const { is_resumption } = nodeStartedData
        if (is_resumption) {
          const currentIndex = responseItem.workflowProcess.tracing.findIndex(item => item.node_id === nodeStartedData.node_id)
          if (currentIndex > -1) {
            responseItem.workflowProcess.tracing[currentIndex] = {
              ...nodeStartedData,
              status: NodeRunningStatus.Running,
            }
          }
        }
        else {
          if (nodeStartedData.iteration_id)
            return
          responseItem.workflowProcess.tracing.push({
            ...nodeStartedData,
            status: WorkflowRunningStatus.Running,
          })
        }
      })
    },
    // Node finished: locate its tracing entry by id (and parallel_id when
    // present) and replace it with the final payload.
    onNodeFinished: ({ data: nodeFinishedData }) => {
      updateChatTreeNode(messageId, (responseItem) => {
        if (!responseItem.workflowProcess?.tracing)
          return
        if (nodeFinishedData.iteration_id)
          return
        const currentIndex = responseItem.workflowProcess.tracing.findIndex((item) => {
          if (!item.execution_metadata?.parallel_id)
            return item.id === nodeFinishedData.id
          return item.id === nodeFinishedData.id && (item.execution_metadata?.parallel_id === nodeFinishedData.execution_metadata?.parallel_id)
        })
        if (currentIndex > -1)
          responseItem.workflowProcess.tracing[currentIndex] = nodeFinishedData as any
      })
    },
    // NOTE(review): this `messageId` parameter shadows the outer handleResume
    // `messageId` argument — here it is the TTS message id carried by the
    // stream, not the chat-tree node id. Confirm the shadowing is intended.
    onTTSChunk: (messageId: string, audio: string) => {
      if (!audio || audio === '')
        return
      const audioPlayer = getOrCreatePlayer()
      if (audioPlayer) {
        audioPlayer.playAudioWithAudio(audio, true)
        AudioPlayerManager.getInstance().resetMsgId(messageId)
      }
    },
    // Same shadowing note as onTTSChunk applies to this `messageId`.
    onTTSEnd: (messageId: string, audio: string) => {
      const audioPlayer = getOrCreatePlayer()
      if (audioPlayer)
        audioPlayer.playAudioWithAudio(audio, false)
    },
    // Loop node started: append a Running tracing entry.
    onLoopStart: ({ data: loopStartedData }) => {
      updateChatTreeNode(messageId, (responseItem) => {
        if (!responseItem.workflowProcess)
          return
        if (!responseItem.workflowProcess.tracing)
          responseItem.workflowProcess.tracing = []
        responseItem.workflowProcess.tracing.push({
          ...loopStartedData,
          status: WorkflowRunningStatus.Running,
        })
      })
    },
    // Loop finished: mirror of onIterationFinish for loop nodes.
    onLoopFinish: ({ data: loopFinishedData }) => {
      updateChatTreeNode(messageId, (responseItem) => {
        if (!responseItem.workflowProcess?.tracing)
          return
        const tracing = responseItem.workflowProcess.tracing
        // NOTE(review): trailing `!` is a no-op, as in onIterationFinish.
        const loopIndex = tracing.findIndex(item => item.node_id === loopFinishedData.node_id
          && (item.execution_metadata?.parallel_id === loopFinishedData.execution_metadata?.parallel_id || item.parallel_id === loopFinishedData.execution_metadata?.parallel_id))!
        if (loopIndex > -1) {
          tracing[loopIndex] = {
            ...tracing[loopIndex],
            ...loopFinishedData,
            status: WorkflowRunningStatus.Succeeded,
          }
        }
      })
    },
    // A node is waiting for human input: upsert its form definition (keyed by
    // node_id) and mark the matching tracing entry Paused.
    onHumanInputRequired: ({ data: humanInputRequiredData }) => {
      updateChatTreeNode(messageId, (responseItem) => {
        if (!responseItem.humanInputFormDataList) {
          responseItem.humanInputFormDataList = [humanInputRequiredData]
        }
        else {
          const currentFormIndex = responseItem.humanInputFormDataList.findIndex(item => item.node_id === humanInputRequiredData.node_id)
          if (currentFormIndex > -1) {
            responseItem.humanInputFormDataList[currentFormIndex] = humanInputRequiredData
          }
          else {
            responseItem.humanInputFormDataList.push(humanInputRequiredData)
          }
        }
        if (responseItem.workflowProcess?.tracing) {
          const currentTracingIndex = responseItem.workflowProcess.tracing.findIndex(item => item.node_id === humanInputRequiredData.node_id)
          if (currentTracingIndex > -1)
            responseItem.workflowProcess.tracing[currentTracingIndex].status = NodeRunningStatus.Paused
        }
      })
    },
    // A human-input form was submitted: move it from the pending list to the
    // filled list.
    onHumanInputFormFilled: ({ data: humanInputFilledFormData }) => {
      updateChatTreeNode(messageId, (responseItem) => {
        if (responseItem.humanInputFormDataList?.length) {
          const currentFormIndex = responseItem.humanInputFormDataList.findIndex(item => item.node_id === humanInputFilledFormData.node_id)
          if (currentFormIndex > -1)
            responseItem.humanInputFormDataList.splice(currentFormIndex, 1)
        }
        if (!responseItem.humanInputFilledFormDataList) {
          responseItem.humanInputFilledFormDataList = [humanInputFilledFormData]
        }
        else {
          responseItem.humanInputFilledFormDataList.push(humanInputFilledFormData)
        }
      })
    },
    // Workflow paused: immediately reconnect to the (possibly new) run's
    // events endpoint with the same handlers so the stream survives the
    // pause, then mark the process Paused.
    // NOTE(review): `workflowProcess!` assumes the process exists here —
    // confirm a pause event cannot arrive before onWorkflowStarted.
    onWorkflowPaused: ({ data: workflowPausedData }) => {
      const resumeUrl = `/workflow/${workflowPausedData.workflow_run_id}/events`
      sseGet(
        resumeUrl,
        {},
        otherOptions,
      )
      updateChatTreeNode(messageId, (responseItem) => {
        responseItem.workflowProcess!.status = WorkflowRunningStatus.Paused
      })
    },
  }
  sseGet(
    url,
    {},
    otherOptions,
  )
}, [updateChatTreeNode, handleResponding, createAudioPlayerManager])
const updateCurrentQAOnTree = useCallback(({
parentId,
responseItem,
@ -303,30 +625,13 @@ export const useChat = (
let isAgentMode = false
let hasSetResponseId = false
let ttsUrl = ''
let ttsIsPublic = false
if (params.token) {
ttsUrl = '/text-to-audio'
ttsIsPublic = true
}
else if (params.appId) {
if (pathname.search('explore/installed') > -1)
ttsUrl = `/installed-apps/${params.appId}/text-to-audio`
else
ttsUrl = `/apps/${params.appId}/text-to-audio`
}
// Lazy initialization: Only create AudioPlayer when TTS is actually needed
// This prevents opening audio channel unnecessarily
let player: AudioPlayer | null = null
const getOrCreatePlayer = () => {
if (!player)
player = AudioPlayerManager.getInstance().getAudioPlayer(ttsUrl, ttsIsPublic, uuidV4(), 'none', 'none', noop)
return player
}
const getOrCreatePlayer = createAudioPlayerManager()
const otherOptions: IOtherOptions = {
isPublicAPI,
getAbortController: (abortController) => {
workflowEventsAbortControllerRef.current = abortController
},
onData: (message: string, isFirstMessage: boolean, { conversationId: newConversationId, messageId, taskId }: any) => {
if (!isAgentMode) {
responseItem.content = responseItem.content + message
@ -728,9 +1033,7 @@ export const useChat = (
notify,
handleResponding,
formatTime,
params.token,
params.appId,
pathname,
createAudioPlayerManager,
formSettings,
])
@ -798,6 +1101,7 @@ export const useChat = (
isResponding,
setIsResponding,
handleSend,
handleResume,
suggestedQuestions,
handleRestart,
handleStop,

View File

@ -73,6 +73,7 @@ export type ExtraContent
type: 'human_input'
submitted: false
form_definition: HumanInputFormData
workflow_run_id: string
}
| {
type: 'human_input'