diff --git a/web/app/components/base/chat/chat-with-history/chat-wrapper.tsx b/web/app/components/base/chat/chat-with-history/chat-wrapper.tsx index 2222d7adf0..66ad3e7c5a 100644 --- a/web/app/components/base/chat/chat-with-history/chat-wrapper.tsx +++ b/web/app/components/base/chat/chat-with-history/chat-wrapper.tsx @@ -2,9 +2,10 @@ import type { FileEntity } from '../../file-uploader/types' import type { ChatConfig, ChatItem, + ChatItemInTree, OnSend, } from '../types' -import { useCallback, useEffect, useMemo, useState } from 'react' +import { useCallback, useEffect, useMemo, useRef, useState } from 'react' import AnswerIcon from '@/app/components/base/answer-icon' import AppIcon from '@/app/components/base/app-icon' import InputsForm from '@/app/components/base/chat/chat-with-history/inputs-form' @@ -72,6 +73,7 @@ const ChatWrapper = () => { setTargetMessageId, handleSend, handleStop, + handleResume, isResponding: respondingState, suggestedQuestions, } = useChat( @@ -118,8 +120,11 @@ const ChatWrapper = () => { if (fileIsUploading) return true + + if (chatList.some(item => item.isAnswer && item.humanInputFormDataList && item.humanInputFormDataList.length > 0)) + return true return false - }, [inputsFormValue, inputsForms, allInputsHidden]) + }, [allInputsHidden, inputsForms, chatList, inputsFormValue]) useEffect(() => { if (currentChatInstanceRef.current) @@ -130,6 +135,36 @@ const ChatWrapper = () => { setIsResponding(respondingState) }, [respondingState, setIsResponding]) + // Resume paused workflows when chat history is loaded + const resumedWorkflowsRef = useRef>(new Set()) + useEffect(() => { + if (!appPrevChatTree || appPrevChatTree.length === 0) + return + + // Find all answer items with workflow_run_id that need resumption + const checkForPausedWorkflows = (nodes: ChatItemInTree[]) => { + nodes.forEach((node) => { + if (node.isAnswer && node.workflow_run_id && node.humanInputFormDataList && node.humanInputFormDataList.length > 0) { + // This is a paused workflow waiting for human input + const workflowKey = `${node.workflow_run_id}-${node.id}` + if (!resumedWorkflowsRef.current.has(workflowKey)) { + resumedWorkflowsRef.current.add(workflowKey) + // Re-subscribe to workflow events + handleResume( + node.id, + node.workflow_run_id, + !isInstalledApp, + ) + } + } + if (node.children && node.children.length > 0) + checkForPausedWorkflows(node.children) + }) + } + + checkForPausedWorkflows(appPrevChatTree) + }, []) + const doSend: OnSend = useCallback((message, files, isRegenerate = false, parentAnswer: ChatItem | null = null) => { const data: any = { query: message, diff --git a/web/app/components/base/chat/chat-with-history/hooks.tsx b/web/app/components/base/chat/chat-with-history/hooks.tsx index 03536a388b..0ef7aeb5b4 100644 --- a/web/app/components/base/chat/chat-with-history/hooks.tsx +++ b/web/app/components/base/chat/chat-with-history/hooks.tsx @@ -60,10 +60,12 @@ function getFormattedChatList(messages: any[]) { const answerFiles = item.message_files?.filter((file: any) => file.belongs_to === 'assistant') || [] const humanInputFormDataList: HumanInputFormData[] = [] const humanInputFilledFormDataList: HumanInputFilledFormData[] = [] + let workflowRunId = '' if (item.status === 'paused') { item.extra_contents?.forEach((content: ExtraContent) => { if (content.type === 'human_input' && !content.submitted) { humanInputFormDataList.push(content.form_definition) + workflowRunId = content.workflow_run_id } }) } @@ -85,6 +87,7 @@ function getFormattedChatList(messages: any[]) { parentMessageId: `question-${item.id}`, humanInputFormDataList, humanInputFilledFormDataList, + workflow_run_id: workflowRunId, }) }) return newChatList diff --git a/web/app/components/base/chat/chat/hooks.ts b/web/app/components/base/chat/chat/hooks.ts index 047770ea27..bfcbdcfc60 100644 --- a/web/app/components/base/chat/chat/hooks.ts +++ b/web/app/components/base/chat/chat/hooks.ts @@ -73,6 +73,7 @@ export const useChat = ( const [suggestedQuestions, setSuggestQuestions] = useState([]) const conversationMessagesAbortControllerRef = useRef(null) const suggestedQuestionsAbortControllerRef = useRef(null) + const workflowEventsAbortControllerRef = useRef(null) const params = useParams() const pathname = usePathname() @@ -170,6 +171,8 @@ export const useChat = ( conversationMessagesAbortControllerRef.current.abort() if (suggestedQuestionsAbortControllerRef.current) suggestedQuestionsAbortControllerRef.current.abort() + if (workflowEventsAbortControllerRef.current) + workflowEventsAbortControllerRef.current.abort() }, [stopChat, handleResponding]) const handleRestart = useCallback((cb?: any) => { @@ -181,6 +184,325 @@ export const useChat = ( cb?.() }, [handleStop]) + const createAudioPlayerManager = useCallback(() => { + let ttsUrl = '' + let ttsIsPublic = false + if (params.token) { + ttsUrl = '/text-to-audio' + ttsIsPublic = true + } + else if (params.appId) { + if (pathname.search('explore/installed') > -1) + ttsUrl = `/installed-apps/${params.appId}/text-to-audio` + else + ttsUrl = `/apps/${params.appId}/text-to-audio` + } + + let player: AudioPlayer | null = null + const getOrCreatePlayer = () => { + if (!player) + player = AudioPlayerManager.getInstance().getAudioPlayer(ttsUrl, ttsIsPublic, uuidV4(), 'none', 'none', noop) + + return player + } + + return getOrCreatePlayer + }, [params.token, params.appId, pathname]) + + const handleResume = useCallback(( + messageId: string, + workflowRunId: string, + isPublicAPI?: boolean, + ) => { + const getOrCreatePlayer = createAudioPlayerManager() + // Re-subscribe to workflow events for the specific message + const url = `/workflow/${workflowRunId}/events` + + const otherOptions: IOtherOptions = { + isPublicAPI, + getAbortController: (abortController) => { + workflowEventsAbortControllerRef.current = abortController + }, + onData: (message: string, isFirstMessage: boolean, { conversationId: newConversationId, taskId }: any) => { + updateChatTreeNode(messageId, (responseItem) => { + const isAgentMode = responseItem.agent_thoughts && responseItem.agent_thoughts.length > 0 + if (!isAgentMode) { + responseItem.content = responseItem.content + message + } + else { + const lastThought = responseItem.agent_thoughts?.[responseItem.agent_thoughts?.length - 1] + if (lastThought) + lastThought.thought = lastThought.thought + message + } + }) + + if (isFirstMessage && newConversationId) + conversationId.current = newConversationId + + if (taskId) + taskIdRef.current = taskId + }, + async onCompleted() { + handleResponding(false) + }, + onFile(file) { + updateChatTreeNode(messageId, (responseItem) => { + const lastThought = responseItem.agent_thoughts?.[responseItem.agent_thoughts?.length - 1] + if (lastThought) + responseItem.agent_thoughts![responseItem.agent_thoughts!.length - 1].message_files = [...(lastThought as any).message_files, file] + }) + }, + onThought(thought) { + updateChatTreeNode(messageId, (responseItem) => { + if (thought.message_id) + responseItem.id = thought.message_id + if (thought.conversation_id) + responseItem.conversationId = thought.conversation_id + + if (!responseItem.agent_thoughts) + responseItem.agent_thoughts = [] + + if (responseItem.agent_thoughts.length === 0) { + responseItem.agent_thoughts.push(thought) + } + else { + const lastThought = responseItem.agent_thoughts[responseItem.agent_thoughts.length - 1] + if (lastThought.id === thought.id) { + thought.thought = lastThought.thought + thought.message_files = lastThought.message_files + responseItem.agent_thoughts[responseItem.agent_thoughts.length - 1] = thought + } + else { + responseItem.agent_thoughts.push(thought) + } + } + }) + }, + onMessageEnd: (messageEnd) => { + updateChatTreeNode(messageId, (responseItem) => { + if (messageEnd.metadata?.annotation_reply) { + responseItem.annotation = ({ + id: messageEnd.metadata.annotation_reply.id, + authorName: messageEnd.metadata.annotation_reply.account.name, + }) + return + } + responseItem.citation = messageEnd.metadata?.retriever_resources || [] + const processedFilesFromResponse = getProcessedFilesFromResponse(messageEnd.files || []) + responseItem.allFiles = uniqBy([...(responseItem.allFiles || []), ...(processedFilesFromResponse || [])], 'id') + }) + }, + onMessageReplace: (messageReplace) => { + updateChatTreeNode(messageId, (responseItem) => { + responseItem.content = messageReplace.answer + }) + }, + onError() { + handleResponding(false) + }, + onWorkflowStarted: ({ workflow_run_id, task_id, data: { is_resumption } }) => { + handleResponding(true) + hasStopResponded.current = false + updateChatTreeNode(messageId, (responseItem) => { + if (is_resumption) { + if (responseItem.workflowProcess) { + responseItem.workflowProcess.status = WorkflowRunningStatus.Running + } + else { + responseItem.workflowProcess = { + status: WorkflowRunningStatus.Running, + tracing: [], + } + } + } + else { + taskIdRef.current = task_id + responseItem.workflow_run_id = workflow_run_id + responseItem.workflowProcess = { + status: WorkflowRunningStatus.Running, + tracing: [], + } + } + }) + }, + onWorkflowFinished: ({ data: workflowFinishedData }) => { + updateChatTreeNode(messageId, (responseItem) => { + if (responseItem.workflowProcess) + responseItem.workflowProcess.status = workflowFinishedData.status as WorkflowRunningStatus + }) + }, + onIterationStart: ({ data: iterationStartedData }) => { + updateChatTreeNode(messageId, (responseItem) => { + if (!responseItem.workflowProcess) + return + if (!responseItem.workflowProcess.tracing) + responseItem.workflowProcess.tracing = [] + responseItem.workflowProcess.tracing.push({ + ...iterationStartedData, + status: WorkflowRunningStatus.Running, + }) + }) + }, + onIterationFinish: ({ data: iterationFinishedData }) => { + updateChatTreeNode(messageId, (responseItem) => { + if (!responseItem.workflowProcess?.tracing) + return + const tracing = responseItem.workflowProcess.tracing + const iterationIndex = tracing.findIndex(item => item.node_id === iterationFinishedData.node_id + && (item.execution_metadata?.parallel_id === iterationFinishedData.execution_metadata?.parallel_id || item.parallel_id === iterationFinishedData.execution_metadata?.parallel_id))! + if (iterationIndex > -1) { + tracing[iterationIndex] = { + ...tracing[iterationIndex], + ...iterationFinishedData, + status: WorkflowRunningStatus.Succeeded, + } + } + }) + }, + onNodeStarted: ({ data: nodeStartedData }) => { + updateChatTreeNode(messageId, (responseItem) => { + if (!responseItem.workflowProcess) + return + if (!responseItem.workflowProcess.tracing) + responseItem.workflowProcess.tracing = [] + + const { is_resumption } = nodeStartedData + if (is_resumption) { + const currentIndex = responseItem.workflowProcess.tracing.findIndex(item => item.node_id === nodeStartedData.node_id) + if (currentIndex > -1) { + responseItem.workflowProcess.tracing[currentIndex] = { + ...nodeStartedData, + status: NodeRunningStatus.Running, + } + } + } + else { + if (nodeStartedData.iteration_id) + return + + responseItem.workflowProcess.tracing.push({ + ...nodeStartedData, + status: WorkflowRunningStatus.Running, + }) + } + }) + }, + onNodeFinished: ({ data: nodeFinishedData }) => { + updateChatTreeNode(messageId, (responseItem) => { + if (!responseItem.workflowProcess?.tracing) + return + + if (nodeFinishedData.iteration_id) + return + + const currentIndex = responseItem.workflowProcess.tracing.findIndex((item) => { + if (!item.execution_metadata?.parallel_id) + return item.id === nodeFinishedData.id + + return item.id === nodeFinishedData.id && (item.execution_metadata?.parallel_id === nodeFinishedData.execution_metadata?.parallel_id) + }) + if (currentIndex > -1) + responseItem.workflowProcess.tracing[currentIndex] = nodeFinishedData as any + }) + }, + onTTSChunk: (messageId: string, audio: string) => { + if (!audio || audio === '') + return + const audioPlayer = getOrCreatePlayer() + if (audioPlayer) { + audioPlayer.playAudioWithAudio(audio, true) + AudioPlayerManager.getInstance().resetMsgId(messageId) + } + }, + onTTSEnd: (messageId: string, audio: string) => { + const audioPlayer = getOrCreatePlayer() + if (audioPlayer) + audioPlayer.playAudioWithAudio(audio, false) + }, + onLoopStart: ({ data: loopStartedData }) => { + updateChatTreeNode(messageId, (responseItem) => { + if (!responseItem.workflowProcess) + return + if (!responseItem.workflowProcess.tracing) + responseItem.workflowProcess.tracing = [] + responseItem.workflowProcess.tracing.push({ + ...loopStartedData, + status: WorkflowRunningStatus.Running, + }) + }) + }, + onLoopFinish: ({ data: loopFinishedData }) => { + updateChatTreeNode(messageId, (responseItem) => { + if (!responseItem.workflowProcess?.tracing) + return + const tracing = responseItem.workflowProcess.tracing + const loopIndex = tracing.findIndex(item => item.node_id === loopFinishedData.node_id + && (item.execution_metadata?.parallel_id === loopFinishedData.execution_metadata?.parallel_id || item.parallel_id === loopFinishedData.execution_metadata?.parallel_id))! + if (loopIndex > -1) { + tracing[loopIndex] = { + ...tracing[loopIndex], + ...loopFinishedData, + status: WorkflowRunningStatus.Succeeded, + } + } + }) + }, + onHumanInputRequired: ({ data: humanInputRequiredData }) => { + updateChatTreeNode(messageId, (responseItem) => { + if (!responseItem.humanInputFormDataList) { + responseItem.humanInputFormDataList = [humanInputRequiredData] + } + else { + const currentFormIndex = responseItem.humanInputFormDataList.findIndex(item => item.node_id === humanInputRequiredData.node_id) + if (currentFormIndex > -1) { + responseItem.humanInputFormDataList[currentFormIndex] = humanInputRequiredData + } + else { + responseItem.humanInputFormDataList.push(humanInputRequiredData) + } + } + if (responseItem.workflowProcess?.tracing) { + const currentTracingIndex = responseItem.workflowProcess.tracing.findIndex(item => item.node_id === humanInputRequiredData.node_id) + if (currentTracingIndex > -1) + responseItem.workflowProcess.tracing[currentTracingIndex].status = NodeRunningStatus.Paused + } + }) + }, + onHumanInputFormFilled: ({ data: humanInputFilledFormData }) => { + updateChatTreeNode(messageId, (responseItem) => { + if (responseItem.humanInputFormDataList?.length) { + const currentFormIndex = responseItem.humanInputFormDataList.findIndex(item => item.node_id === humanInputFilledFormData.node_id) + if (currentFormIndex > -1) + responseItem.humanInputFormDataList.splice(currentFormIndex, 1) + } + if (!responseItem.humanInputFilledFormDataList) { + responseItem.humanInputFilledFormDataList = [humanInputFilledFormData] + } + else { + responseItem.humanInputFilledFormDataList.push(humanInputFilledFormData) + } + }) + }, + onWorkflowPaused: ({ data: workflowPausedData }) => { + const resumeUrl = `/workflow/${workflowPausedData.workflow_run_id}/events` + sseGet( + resumeUrl, + {}, + otherOptions, + ) + updateChatTreeNode(messageId, (responseItem) => { + responseItem.workflowProcess!.status = WorkflowRunningStatus.Paused + }) + }, + } + + sseGet( + url, + {}, + otherOptions, + ) + }, [updateChatTreeNode, handleResponding, createAudioPlayerManager]) + const updateCurrentQAOnTree = useCallback(({ parentId, responseItem, @@ -303,30 +625,13 @@ export const useChat = ( let isAgentMode = false let hasSetResponseId = false - let ttsUrl = '' - let ttsIsPublic = false - if (params.token) { - ttsUrl = '/text-to-audio' - ttsIsPublic = true - } - else if (params.appId) { - if (pathname.search('explore/installed') > -1) - ttsUrl = `/installed-apps/${params.appId}/text-to-audio` - else - ttsUrl = `/apps/${params.appId}/text-to-audio` - } - // Lazy initialization: Only create AudioPlayer when TTS is actually needed - // This prevents opening audio channel unnecessarily - let player: AudioPlayer | null = null - const getOrCreatePlayer = () => { - if (!player) - player = AudioPlayerManager.getInstance().getAudioPlayer(ttsUrl, ttsIsPublic, uuidV4(), 'none', 'none', noop) - - return player - } + const getOrCreatePlayer = createAudioPlayerManager() const otherOptions: IOtherOptions = { isPublicAPI, + getAbortController: (abortController) => { + workflowEventsAbortControllerRef.current = abortController + }, onData: (message: string, isFirstMessage: boolean, { conversationId: newConversationId, messageId, taskId }: any) => { if (!isAgentMode) { responseItem.content = responseItem.content + message @@ -728,9 +1033,7 @@ export const useChat = ( notify, handleResponding, formatTime, - params.token, - params.appId, - pathname, + createAudioPlayerManager, formSettings, ]) @@ -798,6 +1101,7 @@ export const useChat = ( isResponding, setIsResponding, handleSend, + handleResume, suggestedQuestions, handleRestart, handleStop, diff --git a/web/app/components/base/chat/chat/type.ts b/web/app/components/base/chat/chat/type.ts index 185bdb0553..7bd4de5b05 100644 --- a/web/app/components/base/chat/chat/type.ts +++ b/web/app/components/base/chat/chat/type.ts @@ -73,6 +73,7 @@ export type ExtraContent type: 'human_input' submitted: false form_definition: HumanInputFormData + workflow_run_id: string } | { type: 'human_input'