feat(chatflow): stream LLM reasoning to a live thinking panel (#37460)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
L1nSn0w 2026-06-23 17:49:01 +08:00 committed by GitHub
parent c2a554da93
commit 725e4da29d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
35 changed files with 909 additions and 115 deletions

View File

@ -27,7 +27,11 @@ from controllers.console.wraps import with_current_user
from core.app.entities.app_invoke_entities import InvokeFrom
from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
from fields.conversation_fields import ResultResponse
from fields.message_fields import MessageInfiniteScrollPagination, MessageListItem, SuggestedQuestionsResponse
from fields.message_fields import (
ExploreMessageInfiniteScrollPagination,
ExploreMessageListItem,
SuggestedQuestionsResponse,
)
from graphon.model_runtime.errors.invoke import InvokeError
from libs import helper
from models import Account
@ -56,7 +60,7 @@ register_schema_models(console_ns, MessageListQuery, MessageFeedbackPayload, Mor
register_response_schema_models(
console_ns,
GeneratedAppResponse,
MessageInfiniteScrollPagination,
ExploreMessageInfiniteScrollPagination,
ResultResponse,
SuggestedQuestionsResponse,
)
@ -68,7 +72,7 @@ register_response_schema_models(
)
class MessageListApi(InstalledAppResource):
@console_ns.doc(params=query_params_from_model(MessageListQuery))
@console_ns.response(200, "Success", console_ns.models[MessageInfiniteScrollPagination.__name__])
@console_ns.response(200, "Success", console_ns.models[ExploreMessageInfiniteScrollPagination.__name__])
@with_current_user
def get(self, current_user: Account, installed_app: InstalledApp):
app_model = installed_app.app
@ -88,9 +92,9 @@ class MessageListApi(InstalledAppResource):
str(args.first_id) if args.first_id else None,
args.limit,
)
adapter = TypeAdapter(MessageListItem)
adapter = TypeAdapter(ExploreMessageListItem)
items = [adapter.validate_python(message, from_attributes=True) for message in pagination.data]
return MessageInfiniteScrollPagination(
return ExploreMessageInfiniteScrollPagination(
limit=pagination.limit,
has_more=pagination.has_more,
data=items,

View File

@ -41,6 +41,7 @@ from core.app.entities.queue_entities import (
QueueNodeStartedEvent,
QueueNodeSucceededEvent,
QueuePingEvent,
QueueReasoningChunkEvent,
QueueRetrieverResourcesEvent,
QueueStopEvent,
QueueTextChunkEvent,
@ -62,6 +63,7 @@ from core.app.entities.task_entities import (
MessageAudioStreamResponse,
MessageEndStreamResponse,
PingStreamResponse,
ReasoningChunkStreamResponse,
StreamResponse,
WorkflowPauseStreamResponse,
WorkflowTaskState,
@ -473,6 +475,17 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
self._workflow_response_converter.fetch_files_from_node_outputs(event.outputs or {})
)
# Collect terminal reasoning (separated mode) per LLM node id for persistence. This is the
# authoritative source (outputs.reasoning_content), decoupled from the live delta stream.
# Accumulate across iteration/loop passes (same node_id) to match the live stream, which
# appends every pass under the same key — overwriting would keep only the last pass.
if event.node_type == BuiltinNodeTypes.LLM:
reasoning_content = (event.outputs or {}).get("reasoning_content")
if isinstance(reasoning_content, str) and reasoning_content:
self._task_state.metadata.reasoning[event.node_id] = (
self._task_state.metadata.reasoning.get(event.node_id, "") + reasoning_content
)
node_finish_resp = self._workflow_response_converter.workflow_node_finish_to_stream_response(
event=event,
task_id=self._application_generate_entity.task_id,
@ -535,6 +548,27 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
answer=delta_text, message_id=self._message_id, from_variable_selector=event.from_variable_selector
)
def _handle_reasoning_chunk_event(
    self, event: QueueReasoningChunkEvent, **kwargs
) -> Generator[StreamResponse, None, None]:
    """Forward a reasoning delta to the client on its dedicated stream channel.

    Reasoning is only emitted here; it is never written into the answer. A chunk
    whose reasoning is empty is dropped unless it is flagged ``is_final``, in
    which case it is still forwarded as the "thinking finished" signal.
    """
    # Emit when there is text OR this is the terminal marker; empty non-terminal
    # chunks produce nothing.
    if event.reasoning or event.is_final:
        task_id = self._application_generate_entity.task_id
        chunk_data = ReasoningChunkStreamResponse.Data(
            message_id=self._message_id,
            reasoning=event.reasoning,
            node_id=event.from_node_id,
            is_final=event.is_final,
        )
        yield ReasoningChunkStreamResponse(task_id=task_id, data=chunk_data)
def _handle_iteration_start_event(
self, event: QueueIterationStartEvent, **kwargs
) -> Generator[StreamResponse, None, None]:
@ -872,6 +906,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
QueuePingEvent: self._handle_ping_event,
QueueErrorEvent: self._handle_error_event,
QueueTextChunkEvent: self._handle_text_chunk_event,
QueueReasoningChunkEvent: self._handle_reasoning_chunk_event,
# Workflow events
QueueWorkflowStartedEvent: self._handle_workflow_started_event,
QueueWorkflowSucceededEvent: self._handle_workflow_succeeded_event,

View File

@ -24,6 +24,7 @@ from core.app.entities.queue_entities import (
QueueNodeRetryEvent,
QueueNodeStartedEvent,
QueueNodeSucceededEvent,
QueueReasoningChunkEvent,
QueueRetrieverResourcesEvent,
QueueTextChunkEvent,
QueueWorkflowFailedEvent,
@ -74,6 +75,7 @@ from graphon.graph_events import (
NodeRunLoopNextEvent,
NodeRunLoopStartedEvent,
NodeRunLoopSucceededEvent,
NodeRunReasoningChunkEvent,
NodeRunRetrieverResourceEvent,
NodeRunRetryEvent,
NodeRunStartedEvent,
@ -576,6 +578,16 @@ class WorkflowBasedAppRunner:
in_loop_id=event.in_loop_id,
)
)
case NodeRunReasoningChunkEvent():
self._publish_event(
QueueReasoningChunkEvent(
reasoning=event.chunk,
from_node_id=event.node_id,
is_final=event.is_final,
in_iteration_id=event.in_iteration_id,
in_loop_id=event.in_loop_id,
)
)
case NodeRunRetrieverResourceEvent():
self._publish_event(
QueueRetrieverResourcesEvent(

View File

@ -40,6 +40,7 @@ class QueueEvent(StrEnum):
NODE_FAILED = "node_failed"
NODE_EXCEPTION = "node_exception"
RETRIEVER_RESOURCES = "retriever_resources"
REASONING_CHUNK = "reasoning_chunk"
ANNOTATION_REPLY = "annotation_reply"
AGENT_THOUGHT = "agent_thought"
MESSAGE_FILE = "message_file"
@ -197,6 +198,26 @@ class QueueTextChunkEvent(AppQueueEvent):
"""loop id if node is in loop"""
class QueueReasoningChunkEvent(AppQueueEvent):
    """
    QueueReasoningChunkEvent entity
    Out-of-band reasoning (chain-of-thought) delta from an LLM node in "separated"
    mode. It never touches the answer; it is emitted on a dedicated channel.
    """

    # Queue event type tag; defaults to REASONING_CHUNK.
    event: QueueEvent = QueueEvent.REASONING_CHUNK
    # Reasoning text carried by this chunk. Required, but an empty string is
    # accepted (used by the terminal marker, see `is_final`).
    reasoning: str
    from_node_id: str | None = None
    """id of the LLM node that produced this reasoning"""
    is_final: bool = False
    """marks the terminal reasoning chunk for the node run (may carry empty reasoning)"""
    in_iteration_id: str | None = None
    """iteration id if node is in iteration"""
    in_loop_id: str | None = None
    """loop id if node is in loop"""
class QueueAgentMessageEvent(AppQueueEvent):
"""
QueueMessageEvent entity

View File

@ -27,6 +27,9 @@ class TaskStateMetadata(BaseModel):
annotation_reply: AnnotationReply | None = None
retriever_resources: Sequence[RetrievalSourceMetadata] = Field(default_factory=list)
usage: LLMUsage | None = None
reasoning: dict[str, str] = Field(default_factory=dict)
"""reasoning_content per LLM node id (separated mode), accumulated across iteration/loop
passes for that node; persisted to message_metadata"""
class TaskState(BaseModel):
@ -85,6 +88,7 @@ class StreamEvent(StrEnum):
LOOP_COMPLETED = "loop_completed"
TEXT_CHUNK = "text_chunk"
TEXT_REPLACE = "text_replace"
REASONING_CHUNK = "reasoning_chunk"
AGENT_LOG = "agent_log"
HUMAN_INPUT_REQUIRED = "human_input_required"
HUMAN_INPUT_FORM_FILLED = "human_input_form_filled"
@ -726,6 +730,28 @@ class TextChunkStreamResponse(StreamResponse):
data: Data
class ReasoningChunkStreamResponse(StreamResponse):
    """
    ReasoningChunkStreamResponse entity
    Out-of-band reasoning (chain-of-thought) delta, parallel to text_chunk. Only
    emitted in "separated" mode; the answer/message stream stays free of <think>.
    """

    class Data(BaseModel):
        """
        Data entity
        """

        # Id of the message this reasoning chunk belongs to.
        message_id: str
        # Reasoning text of this chunk; required, but may be an empty string.
        reasoning: str
        # Id of the node that produced the reasoning; None when not provided.
        node_id: str | None = None
        # True on the terminal reasoning chunk for a node run.
        is_final: bool = False

    # Stream event type tag; defaults to REASONING_CHUNK.
    event: StreamEvent = StreamEvent.REASONING_CHUNK
    # Payload described by the nested Data model above.
    data: Data
class TextReplaceStreamResponse(StreamResponse):
"""
TextReplaceStreamResponse entity

View File

@ -77,6 +77,13 @@ class WebMessageListItem(MessageListItem):
)
class ExploreMessageListItem(MessageListItem):
    # Message list item variant that adds a `metadata` field on top of the base
    # MessageListItem fields.
    # NOTE(review): documented with `#` comments on purpose; a class docstring
    # would presumably surface as the schema description in generated API docs.

    # Read from `message_metadata_dict` on the validated input (validation
    # alias); falls back to None when the input does not provide it.
    metadata: JSONValueType | None = Field(
        default=None,
        validation_alias="message_metadata_dict",
    )
class MessageInfiniteScrollPagination(ResponseModel):
limit: int
has_more: bool
@ -89,6 +96,12 @@ class WebMessageInfiniteScrollPagination(ResponseModel):
data: list[WebMessageListItem]
class ExploreMessageInfiniteScrollPagination(ResponseModel):
    # Paginated response envelope whose items are ExploreMessageListItem
    # (i.e. items that carry `metadata`).

    # Page size reported for this page.
    limit: int
    # Pagination flag reported alongside the page.
    # NOTE(review): presumably "more messages exist beyond this page" — confirm.
    has_more: bool
    # Messages of the current page.
    data: list[ExploreMessageListItem]
class SavedMessageItem(ResponseModel):
id: str
inputs: dict[str, JSONValueType]

View File

@ -6574,7 +6574,7 @@ Request body:
| Code | Description | Schema |
| ---- | ----------- | ------ |
| 200 | Success | **application/json**: [MessageInfiniteScrollPagination](#messageinfinitescrollpagination)<br> |
| 200 | Success | **application/json**: [ExploreMessageInfiniteScrollPagination](#exploremessageinfinitescrollpagination)<br> |
### [POST] /installed-apps/{installed_app_id}/messages/{message_id}/feedbacks
#### Parameters
@ -15996,6 +15996,34 @@ Request payload for bulk downloading documents as a zip archive.
| ---- | ---- | ----------- | -------- |
| tool_icons | object | | No |
#### ExploreMessageInfiniteScrollPagination
| Name | Type | Description | Required |
| ---- | ---- | ----------- | -------- |
| data | [ [ExploreMessageListItem](#exploremessagelistitem) ] | | Yes |
| has_more | boolean | | Yes |
| limit | integer | | Yes |
#### ExploreMessageListItem
| Name | Type | Description | Required |
| ---- | ---- | ----------- | -------- |
| agent_thoughts | [ [AgentThought](#agentthought) ] | | Yes |
| answer | string | | Yes |
| conversation_id | string | | Yes |
| created_at | integer | | No |
| error | string | | No |
| extra_contents | [ [HumanInputContent](#humaninputcontent) ] | | Yes |
| feedback | [SimpleFeedback](#simplefeedback) | | No |
| id | string | | Yes |
| inputs | object | | Yes |
| message_files | [ [MessageFile](#messagefile) ] | | Yes |
| metadata | [JSONValueType](#jsonvaluetype) | | No |
| parent_message_id | string | | No |
| query | string | | Yes |
| retriever_resources | [ [RetrieverResource](#retrieverresource) ] | | Yes |
| status | string | | Yes |
#### ExternalApiTemplateListQuery
| Name | Type | Description | Required |
@ -17070,14 +17098,6 @@ Enum class for large language model mode.
| upload_file_id | string | | No |
| url | string | | No |
#### MessageInfiniteScrollPagination
| Name | Type | Description | Required |
| ---- | ---- | ----------- | -------- |
| data | [ [MessageListItem](#messagelistitem) ] | | Yes |
| has_more | boolean | | Yes |
| limit | integer | | Yes |
#### MessageInfiniteScrollPaginationResponse
| Name | Type | Description | Required |
@ -17086,25 +17106,6 @@ Enum class for large language model mode.
| has_more | boolean | | Yes |
| limit | integer | | Yes |
#### MessageListItem
| Name | Type | Description | Required |
| ---- | ---- | ----------- | -------- |
| agent_thoughts | [ [AgentThought](#agentthought) ] | | Yes |
| answer | string | | Yes |
| conversation_id | string | | Yes |
| created_at | integer | | No |
| error | string | | No |
| extra_contents | [ [HumanInputContent](#humaninputcontent) ] | | Yes |
| feedback | [SimpleFeedback](#simplefeedback) | | No |
| id | string | | Yes |
| inputs | object | | Yes |
| message_files | [ [MessageFile](#messagefile) ] | | Yes |
| parent_message_id | string | | No |
| query | string | | Yes |
| retriever_resources | [ [RetrieverResource](#retrieverresource) ] | | Yes |
| status | string | | Yes |
#### MessageListQuery
| Name | Type | Description | Required |

View File

@ -44,7 +44,7 @@ dependencies = [
"resend>=2.27.0,<3.0.0",
# Emerging: newer and fast-moving, use compatible pins
"fastopenapi[flask]==0.7.0",
"graphon==0.5.2",
"graphon==0.5.3",
"httpx-sse==0.4.3",
"json-repair==0.59.4",
]

View File

@ -1,4 +1,4 @@
from core.app.entities.queue_entities import QueueStopEvent
from core.app.entities.queue_entities import QueueEvent, QueueReasoningChunkEvent, QueueStopEvent
class TestQueueEntities:
@ -10,3 +10,17 @@ class TestQueueEntities:
event = QueueStopEvent(stopped_by=QueueStopEvent.StopBy.USER_MANUAL)
event.stopped_by = "unknown"
assert event.get_stop_reason() == "Stopped by unknown reason."
def test_reasoning_chunk_event_defaults(self):
    """A chunk built from only text and node id keeps them and gets the defaults."""
    chunk = QueueReasoningChunkEvent(reasoning="thinking", from_node_id="llm")

    # Event tag is filled in by the model itself.
    assert chunk.event == QueueEvent.REASONING_CHUNK
    # Explicitly supplied values are preserved.
    assert chunk.reasoning == "thinking"
    assert chunk.from_node_id == "llm"
    # Everything else falls back to its declared default.
    assert chunk.is_final is False
    assert chunk.in_iteration_id is None
    assert chunk.in_loop_id is None
def test_reasoning_chunk_event_terminal_marker_allows_empty_reasoning(self):
    """The terminal marker may be constructed with an empty reasoning string."""
    terminal = QueueReasoningChunkEvent(
        reasoning="",
        from_node_id="llm",
        is_final=True,
    )

    assert terminal.reasoning == ""
    assert terminal.is_final is True

View File

@ -1,10 +1,15 @@
import json
from core.app.entities.task_entities import (
NodeFinishStreamResponse,
NodeRetryStreamResponse,
NodeStartStreamResponse,
ReasoningChunkStreamResponse,
StreamEvent,
TaskStateMetadata,
)
from graphon.enums import WorkflowNodeExecutionStatus
from graphon.model_runtime.utils.encoders import jsonable_encoder
class TestTaskEntities:
@ -76,3 +81,41 @@ class TestTaskEntities:
assert payload["event"] == StreamEvent.NODE_RETRY.value
assert payload["data"]["retry_index"] == 2
assert payload["data"]["outputs"] is None
def test_reasoning_chunk_stream_response_shape(self):
    """model_dump() exposes the event tag, task id and the nested data fields."""
    chunk_data = ReasoningChunkStreamResponse.Data(
        message_id="msg-1",
        reasoning="let me think",
        node_id="llm",
        is_final=False,
    )
    response = ReasoningChunkStreamResponse(task_id="task-1", data=chunk_data)

    payload = response.model_dump()

    # Envelope fields.
    assert payload["event"] == StreamEvent.REASONING_CHUNK
    assert payload["task_id"] == "task-1"
    # Nested data fields, dumped under the "data" key.
    assert payload["data"]["message_id"] == "msg-1"
    assert payload["data"]["reasoning"] == "let me think"
    assert payload["data"]["node_id"] == "llm"
    assert payload["data"]["is_final"] is False
def test_task_state_metadata_reasoning_round_trips(self):
    """Per-node reasoning survives the dump -> encode -> JSON -> load round trip."""
    # Mirrors the persistence path: the whole metadata is serialized to
    # message_metadata via model_dump -> jsonable_encoder -> json.dumps and is
    # later read back with json.loads.
    state_metadata = TaskStateMetadata()
    state_metadata.reasoning["llm"] = "first"
    state_metadata.reasoning["llm2"] = "second"

    encoded = jsonable_encoder(state_metadata.model_dump())
    round_tripped = json.loads(json.dumps(encoded))

    assert round_tripped["reasoning"] == {"llm": "first", "llm2": "second"}
def test_task_state_metadata_reasoning_defaults_empty(self):
    """Metadata without any reasoning serializes `reasoning` as {} rather than null."""
    # Covers old rows / runs that never recorded reasoning.
    untouched = TaskStateMetadata()

    encoded = jsonable_encoder(untouched.model_dump())
    round_tripped = json.loads(json.dumps(encoded))

    assert round_tripped["reasoning"] == {}

View File

@ -47,7 +47,12 @@ from graphon.model_runtime.entities.model_entities import (
ParameterType,
)
from graphon.model_runtime.model_providers.model_provider_factory import ModelProviderFactory
from graphon.node_events import ModelInvokeCompletedEvent, RunRetrieverResourceEvent, StreamChunkEvent
from graphon.node_events import (
ModelInvokeCompletedEvent,
RunRetrieverResourceEvent,
StreamChunkEvent,
StreamReasoningEvent,
)
from graphon.nodes.base.entities import VariableSelector
from graphon.nodes.llm import llm_utils
from graphon.nodes.llm.entities import (
@ -1576,9 +1581,13 @@ def test_handle_invoke_result_streaming_collects_text_metrics_and_structured_out
assert events[0] == first_chunk
assert events[1] == StreamChunkEvent(selector=["node-1", "text"], chunk="answer", is_final=False)
assert events[1] == StreamReasoningEvent(selector=["node-1", "reasoning_content"], chunk="plan", is_final=False)
completed = events[2]
assert events[2] == StreamChunkEvent(selector=["node-1", "text"], chunk="answer", is_final=False)
assert events[3] == StreamReasoningEvent(selector=["node-1", "reasoning_content"], chunk="", is_final=True)
completed = events[4]
assert isinstance(completed, ModelInvokeCompletedEvent)
assert completed.text == "answer"
assert completed.reasoning_content == "plan"

View File

@ -0,0 +1,36 @@
from fields.message_fields import ExploreMessageListItem, MessageListItem
def _base_kwargs():
return {
"id": "m1",
"conversation_id": "c1",
"inputs": {},
"query": "hi",
"answer": "answer",
"retriever_resources": [],
"agent_thoughts": [],
"message_files": [],
"status": "normal",
"extra_contents": [],
}
class TestExploreMessageListItem:
    """Serialization contract of ExploreMessageListItem versus the base MessageListItem."""

    def test_exposes_metadata_for_history_rehydration(self):
        # The Explore/installed-app surface must surface message_metadata (incl. reasoning)
        # so the chat-with-history client can rehydrate the thinking panel on reload.
        explore_item = ExploreMessageListItem(
            **_base_kwargs(),
            metadata={"reasoning": {"llm": "thinking..."}},
        )

        dumped = explore_item.model_dump(mode="json")

        assert dumped["metadata"] == {"reasoning": {"llm": "thinking..."}}

    def test_metadata_defaults_to_none(self):
        # Without an explicit value the field is still present in the dump, as None.
        explore_item = ExploreMessageListItem(**_base_kwargs())

        dumped = explore_item.model_dump(mode="json")

        assert dumped["metadata"] is None

    def test_base_message_list_item_has_no_metadata(self):
        # Guard the public service-API contract: the base item must not leak metadata.
        base_item = MessageListItem(**_base_kwargs())

        dumped = base_item.model_dump(mode="json")

        assert "metadata" not in dumped

8
api/uv.lock generated
View File

@ -1636,7 +1636,7 @@ requires-dist = [
{ name = "gmpy2", specifier = ">=2.3.0,<3.0.0" },
{ name = "google-api-python-client", specifier = ">=2.196.0,<3.0.0" },
{ name = "google-cloud-aiplatform", specifier = ">=1.151.0,<2.0.0" },
{ name = "graphon", specifier = "==0.5.2" },
{ name = "graphon", specifier = "==0.5.3" },
{ name = "gunicorn", specifier = ">=26.0.0,<27.0.0" },
{ name = "httpx", extras = ["socks"], specifier = "==0.28.1" },
{ name = "httpx-sse", specifier = "==0.4.3" },
@ -2987,7 +2987,7 @@ httpx = [
[[package]]
name = "graphon"
version = "0.5.2"
version = "0.5.3"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "charset-normalizer" },
@ -3008,9 +3008,9 @@ dependencies = [
{ name = "unstructured", extra = ["docx", "epub", "md", "ppt", "pptx"] },
{ name = "webvtt-py" },
]
sdist = { url = "https://files.pythonhosted.org/packages/c2/16/f183da187414c335be67f52f6a1b7c2a33bf0b1d5090eda7e6c92d42d94a/graphon-0.5.2.tar.gz", hash = "sha256:d66a9edcd883766bd50e94f84a691c92ce536ea60e721552089e83ac8e94bf68", size = 269773, upload-time = "2026-06-16T04:06:22.074Z" }
sdist = { url = "https://files.pythonhosted.org/packages/50/02/75c8cc2f946c8b6debe4f71a8a0f41a69cd499073368a8735ca424c6551f/graphon-0.5.3.tar.gz", hash = "sha256:eaa87d5e664acdf14c80e38afce6bc0f14644961de7ce7b059266fe61bc30e0b", size = 271204, upload-time = "2026-06-23T08:13:32.46Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/2f/e6/36a3981cd44e7a40a7cd7d374e26f01e02dd49410c5fbbd7df248750d5fb/graphon-0.5.2-py3-none-any.whl", hash = "sha256:11f89399e67ed1ddd2ce1c336accd9c4ad5b8fe2741f9167e6085af0b325cd14", size = 381908, upload-time = "2026-06-16T04:06:20.453Z" },
{ url = "https://files.pythonhosted.org/packages/84/fb/616f8ecbd184af57dca8380877b149198d944f4a6658cceb353ae02ace92/graphon-0.5.3-py3-none-any.whl", hash = "sha256:a7f070d1e5eef13d25b97cce6d23675b228c1d38f3c656e3dcacaa6be9ccada4", size = 383359, upload-time = "2026-06-23T08:13:31.075Z" },
]
[[package]]

View File

@ -2133,9 +2133,6 @@
}
},
"web/app/components/base/markdown-blocks/think-block.tsx": {
"react/set-state-in-effect": {
"count": 1
},
"ts/no-explicit-any": {
"count": 4
}

View File

@ -98,8 +98,8 @@ export type ResultResponse = {
result: string
}
export type MessageInfiniteScrollPagination = {
data: Array<MessageListItem>
export type ExploreMessageInfiniteScrollPagination = {
data: Array<ExploreMessageListItem>
has_more: boolean
limit: number
}
@ -187,7 +187,7 @@ export type JsonValue
| Array<unknown>
| null
export type MessageListItem = {
export type ExploreMessageListItem = {
agent_thoughts: Array<AgentThought>
answer: string
conversation_id: string
@ -200,6 +200,7 @@ export type MessageListItem = {
[key: string]: JsonValueType
}
message_files: Array<MessageFile>
metadata?: JsonValueType | null
parent_message_id?: string | null
query: string
retriever_resources: Array<RetrieverResource>
@ -644,7 +645,7 @@ export type GetInstalledAppsByInstalledAppIdMessagesData = {
}
export type GetInstalledAppsByInstalledAppIdMessagesResponses = {
200: MessageInfiniteScrollPagination
200: ExploreMessageInfiniteScrollPagination
}
export type GetInstalledAppsByInstalledAppIdMessagesResponse

View File

@ -503,9 +503,9 @@ export const zHumanInputContent = z.object({
})
/**
* MessageListItem
* ExploreMessageListItem
*/
export const zMessageListItem = z.object({
export const zExploreMessageListItem = z.object({
agent_thoughts: z.array(zAgentThought),
answer: z.string(),
conversation_id: z.string(),
@ -516,6 +516,7 @@ export const zMessageListItem = z.object({
id: z.string(),
inputs: z.record(z.string(), zJsonValueType),
message_files: z.array(zMessageFile),
metadata: zJsonValueType.nullish(),
parent_message_id: z.string().nullish(),
query: z.string(),
retriever_resources: z.array(zRetrieverResource),
@ -523,10 +524,10 @@ export const zMessageListItem = z.object({
})
/**
* MessageInfiniteScrollPagination
* ExploreMessageInfiniteScrollPagination
*/
export const zMessageInfiniteScrollPagination = z.object({
data: z.array(zMessageListItem),
export const zExploreMessageInfiniteScrollPagination = z.object({
data: z.array(zExploreMessageListItem),
has_more: z.boolean(),
limit: z.int(),
})
@ -693,7 +694,8 @@ export const zGetInstalledAppsByInstalledAppIdMessagesQuery = z.object({
/**
* Success
*/
export const zGetInstalledAppsByInstalledAppIdMessagesResponse = zMessageInfiniteScrollPagination
export const zGetInstalledAppsByInstalledAppIdMessagesResponse
= zExploreMessageInfiniteScrollPagination
export const zPostInstalledAppsByInstalledAppIdMessagesByMessageIdFeedbacksBody
= zMessageFeedbackPayload

View File

@ -67,6 +67,8 @@ function getFormattedChatList(messages: any[]) {
feedback: item.feedback,
isAnswer: true,
citation: item.retriever_resources,
reasoningContent: item.metadata?.reasoning,
reasoningFinished: true,
message_files: getProcessedFilesFromResponse(answerFiles.map((item: any) => ({ ...item, related_id: item.id, upload_file_id: item.upload_file_id }))),
parentMessageId: `question-${item.id}`,
humanInputFormDataList,

View File

@ -68,6 +68,7 @@ type HookCallbacks = {
onWorkflowPaused: (workflowPaused: Record<string, unknown>) => void
onTTSChunk: (messageId: string, audio: string) => void
onTTSEnd: (messageId: string, audio: string) => void
onReasoning: (chunk: { data: { message_id?: string, reasoning: string, node_id?: string, is_final?: boolean } }) => void
}
type UseChatFormSettings = NonNullable<Parameters<typeof useChat>[1]>
@ -2410,4 +2411,98 @@ describe('useChat', () => {
})
expect(result.current.chatList[1]!.annotation?.id).toBe('')
})
describe('reasoning (separated mode)', () => {
it('accumulates reasoning deltas per node and marks finished on is_final (handleSend)', () => {
let callbacks: HookCallbacks
vi.mocked(ssePost).mockImplementation(async (_url, _params, options) => {
callbacks = options as HookCallbacks
})
const { result } = renderHook(() => useChat())
act(() => {
result.current.handleSend('test-url', { query: 'hi' }, {})
})
act(() => {
callbacks.onData('answer', true, { messageId: 'm-1', conversationId: 'c-1', taskId: 't-1' })
})
act(() => {
callbacks.onReasoning({ data: { message_id: 'm-1', reasoning: 'let me ', node_id: 'llm' } })
callbacks.onReasoning({ data: { message_id: 'm-1', reasoning: 'think', node_id: 'llm' } })
})
const responseItem = result.current.chatList[1]!
expect(responseItem.reasoningContent).toEqual({ llm: 'let me think' })
expect(responseItem.reasoningFinished).toBeUndefined()
// answer stays clean — reasoning never leaks into content
expect(responseItem.content).toBe('answer')
act(() => {
callbacks.onReasoning({ data: { message_id: 'm-1', reasoning: '', node_id: 'llm', is_final: true } })
})
expect(result.current.chatList[1]!.reasoningContent).toEqual({ llm: 'let me think' })
expect(result.current.chatList[1]!.reasoningFinished).toBe(true)
})
it('keys reasoning by node and falls back to "_" when node_id is absent (handleSend)', () => {
let callbacks: HookCallbacks
vi.mocked(ssePost).mockImplementation(async (_url, _params, options) => {
callbacks = options as HookCallbacks
})
const { result } = renderHook(() => useChat())
act(() => {
result.current.handleSend('test-url', { query: 'hi' }, {})
})
act(() => {
callbacks.onData('answer', true, { messageId: 'm-1', conversationId: 'c-1', taskId: 't-1' })
})
act(() => {
callbacks.onReasoning({ data: { message_id: 'm-1', reasoning: 'a', node_id: 'llm-1' } })
callbacks.onReasoning({ data: { message_id: 'm-1', reasoning: 'b', node_id: 'llm-2' } })
callbacks.onReasoning({ data: { message_id: 'm-1', reasoning: 'c' } })
})
expect(result.current.chatList[1]!.reasoningContent).toEqual({ 'llm-1': 'a', 'llm-2': 'b', '_': 'c' })
})
it('accumulates reasoning onto an existing answer node on resume (handleResume / sseGet)', () => {
let callbacks: HookCallbacks
vi.mocked(sseGet).mockImplementation(async (_url, _params, options) => {
callbacks = options as HookCallbacks
})
const prevChatTree = [{
id: 'q-1',
content: 'query',
isAnswer: false,
children: [{
id: 'm-1',
content: 'initial',
isAnswer: true,
message_files: [],
siblingIndex: 0,
}],
}]
const { result } = renderHook(() => useChat(undefined, undefined, prevChatTree as ChatItemInTree[]))
act(() => {
result.current.handleResume('m-1', 'wr-1', { isPublicAPI: true })
})
act(() => {
callbacks.onReasoning({ data: { message_id: 'm-1', reasoning: 'resumed ', node_id: 'llm' } })
callbacks.onReasoning({ data: { message_id: 'm-1', reasoning: 'thought', node_id: 'llm', is_final: true } })
})
const responseItem = result.current.chatList.find(item => item.id === 'm-1')!
expect(responseItem.reasoningContent).toEqual({ llm: 'resumed thought' })
expect(responseItem.reasoningFinished).toBe(true)
})
})
})

View File

@ -150,6 +150,92 @@ describe('Answer Component', () => {
})
})
// Reasoning panel slot (separated-mode chain-of-thought) in both layouts.
// The panel body renders through the async dynamic Markdown, so assertions
// target the synchronously-rendered "Thinking…/Thought" summary label.
describe('Reasoning Panel', () => {
it('should render the reasoning panel in the normal layout while thinking', () => {
render(
<Answer
{...defaultProps}
responding={true}
item={{
...defaultProps.item,
// Thinking ⇒ the answer has not started yet, so content must be empty.
content: '',
reasoningContent: { llm: 'deep thought' },
} as unknown as ChatItem}
/>,
)
expect(screen.getByText(/chat\.thinking/)).toBeInTheDocument()
})
it('should render the reasoning panel in the thought state once finished', () => {
render(
<Answer
{...defaultProps}
item={{
...defaultProps.item,
reasoningContent: { llm: 'recalled reasoning' },
reasoningFinished: true,
} as unknown as ChatItem}
/>,
)
expect(screen.getByText(/chat\.thought/)).toBeInTheDocument()
})
it('should render the reasoning panel within the human-input layout', () => {
render(
<Answer
{...defaultProps}
item={{
...defaultProps.item,
reasoningContent: { llm: 'human-input reasoning' },
humanInputFormDataList: [{ id: 'form1' }],
} as unknown as ChatItem}
/>,
)
// hasHumanInputs is true, so this can only come from the human-input slot
expect(screen.getByText(/chat\.(thinking|thought)/)).toBeInTheDocument()
})
it('should render the reasoning panel in the human-input layout when the answer is empty (history reload)', () => {
// Regression: the human-input slot outer guard must include hasReasoning, otherwise a
// rehydrated message with forms + reasoning but an empty answer drops the panel entirely.
render(
<Answer
{...defaultProps}
item={{
...defaultProps.item,
content: '',
reasoningContent: { llm: 'reload reasoning' },
reasoningFinished: true,
humanInputFilledFormDataList: [{ id: 'form1' }],
} as unknown as ChatItem}
/>,
)
expect(screen.getByText(/chat\.thought/)).toBeInTheDocument()
})
it('should not render the reasoning panel when reasoningContent is absent', () => {
render(<Answer {...defaultProps} />)
expect(screen.queryByText(/chat\.(thinking|thought)/)).not.toBeInTheDocument()
})
it('should not render the reasoning panel for an empty reasoningContent map (rehydrated, no reasoning)', () => {
render(
<Answer
{...defaultProps}
item={{
...defaultProps.item,
reasoningContent: {},
} as unknown as ChatItem}
/>,
)
expect(screen.queryByText(/chat\.(thinking|thought)/)).not.toBeInTheDocument()
})
})
describe('Interactions', () => {
it('should handle switch sibling', () => {
const mockSwitchSibling = vi.fn()

View File

@ -0,0 +1,89 @@
import { act, render, screen } from '@testing-library/react'
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest'
import ReasoningPanel from '../reasoning-panel'
// Mock react-i18next so the reused chat.thinking/chat.thought labels resolve.
vi.mock('react-i18next', () => ({
useTranslation: () => ({
t: (key: string) => {
const translations: Record<string, string> = {
'chat.thinking': 'Thinking...',
'chat.thought': 'Thought',
}
return translations[key] || key
},
}),
}))
// Mock the heavy Markdown renderer to a simple passthrough.
vi.mock('@/app/components/base/markdown', () => ({
Markdown: ({ content }: { content: string }) => <div data-testid="reasoning-markdown">{content}</div>,
}))
describe('ReasoningPanel', () => {
beforeEach(() => {
vi.clearAllMocks()
vi.useFakeTimers()
})
afterEach(() => {
vi.useRealTimers()
})
it('renders nothing when there is no reasoning text', () => {
const { container } = render(<ReasoningPanel content={{}} done={false} />)
expect(container).toBeEmptyDOMElement()
})
it('shows the thinking state while not done', () => {
render(<ReasoningPanel content={{ llm: 'let me think' }} done={false} />)
expect(screen.getByText(/Thinking\.\.\./)).toBeInTheDocument()
expect(screen.getByText('let me think')).toBeInTheDocument()
})
it('shows the thought state once done (answer started / terminal / history)', () => {
render(<ReasoningPanel content={{ llm: 'done thinking' }} done />)
expect(screen.getByText(/Thought/)).toBeInTheDocument()
})
it('counts elapsed time up while thinking', () => {
render(<ReasoningPanel content={{ llm: 'thinking' }} done={false} />)
expect(screen.getByText(/\(0\.0s\)/)).toBeInTheDocument()
act(() => {
vi.advanceTimersByTime(500)
})
expect(screen.getByText(/\(0\.5s\)/)).toBeInTheDocument()
})
it('freezes the timer once done (latched), even if it flips back', () => {
const { rerender } = render(<ReasoningPanel content={{ llm: 'thinking' }} done={false} />)
act(() => {
vi.advanceTimersByTime(700)
})
// Answer starts → done latches; timer must stop at 0.7s.
rerender(<ReasoningPanel content={{ llm: 'thinking' }} done />)
act(() => {
vi.advanceTimersByTime(1000)
})
expect(screen.getByText(/Thought\(0\.7s\)/)).toBeInTheDocument()
})
it('concatenates reasoning from multiple LLM nodes', () => {
render(<ReasoningPanel content={{ llm1: 'first', llm2: 'second' }} done={false} />)
expect(screen.getByTestId('reasoning-markdown')).toHaveTextContent('first second')
})
it('reflects in-place mutation of the same content object (streaming)', () => {
// The live stream mutates the same reasoningContent object under a stable reference,
// then re-renders. The panel must reflect the appended delta, not a stale snapshot.
const content: Record<string, string> = { llm: 'first' }
const { rerender } = render(<ReasoningPanel content={content} done={false} />)
expect(screen.getByTestId('reasoning-markdown')).toHaveTextContent('first')
content.llm = 'first second'
rerender(<ReasoningPanel content={content} done={false} />)
expect(screen.getByTestId('reasoning-markdown')).toHaveTextContent('first second')
})
})

View File

@ -24,6 +24,7 @@ import HumanInputFilledFormList from './human-input-filled-form-list'
import HumanInputFormList from './human-input-form-list'
import More from './more'
import Operation from './operation'
import ReasoningPanel from './reasoning-panel'
import SuggestedQuestions from './suggested-questions'
import WorkflowProcessItem from './workflow-process'
@ -74,6 +75,9 @@ const Answer: FC<AnswerProps> = ({
} = item
const hasAgentThoughts = !!agent_thoughts?.length
const hasHumanInputs = !!humanInputFormDataList?.length || !!humanInputFilledFormDataList?.length
// Truthy only when there is real reasoning text. Rehydrated messages carry an empty
// `{}` (the field is always persisted), and `!!{}` would otherwise be truthy.
const hasReasoning = !!item.reasoningContent && Object.values(item.reasoningContent).some(Boolean)
const [containerWidth, setContainerWidth] = useState(0)
const [contentWidth, setContentWidth] = useState(0)
@ -140,6 +144,15 @@ const Answer: FC<AnswerProps> = ({
}, [switchSibling, item.prevSibling, item.nextSibling])
const contentIsEmpty = typeof content === 'string' && content.trim() === ''
// Reasoning is "done" — freeze the elapsed timer and collapse the panel — as soon as ANY of:
// ① the answer has begun streaming (first text delta): the only signal that fires
// mid-node, so it drives the normal think→answer handoff;
// ② the reasoning stream's terminal marker arrived (a reasoning node that finishes
// before a separate answer node starts);
// ③ the response is no longer active — explicitly false, not merely absent (history / abnormal end).
// graphon's is_final (on BOTH the text and reasoning channels) is a node-terminal marker
// that trails the whole answer, so it can't drive ①; the answer-started signal must.
const reasoningDone = !contentIsEmpty || !!item.reasoningFinished || responding === false
return (
<div className="mb-2 flex last:mb-0">
@ -223,7 +236,7 @@ const Answer: FC<AnswerProps> = ({
)}
{/* Block 2: Response Content (when human inputs exist) */}
{hasHumanInputs && (responding || !contentIsEmpty || hasAgentThoughts) && (
{hasHumanInputs && (responding || !contentIsEmpty || hasAgentThoughts || hasReasoning) && (
<div className={cn('group relative mt-2 pr-10', chatAnswerContainerInner)}>
<div className="absolute -top-2 left-6 h-3 w-0.5 bg-chat-answer-human-input-form-divider-bg" />
<div
@ -245,7 +258,15 @@ const Answer: FC<AnswerProps> = ({
)
}
{
responding && contentIsEmpty && !hasAgentThoughts && (
hasReasoning && (
<ReasoningPanel
content={item.reasoningContent ?? {}}
done={reasoningDone}
/>
)
}
{
responding && contentIsEmpty && !hasAgentThoughts && !hasReasoning && (
<div className="flex h-5 w-6 items-center justify-center">
<LoadingAnim type="text" />
</div>
@ -351,7 +372,15 @@ const Answer: FC<AnswerProps> = ({
)
}
{
responding && contentIsEmpty && !hasAgentThoughts && (
hasReasoning && (
<ReasoningPanel
content={item.reasoningContent ?? {}}
done={reasoningDone}
/>
)
}
{
responding && contentIsEmpty && !hasAgentThoughts && !hasReasoning && (
<div className="flex h-5 w-6 items-center justify-center">
<LoadingAnim type="text" />
</div>

View File

@ -0,0 +1,31 @@
import type { FC } from 'react'
import { Markdown } from '@/app/components/base/markdown'
import ThinkingDetails from '@/app/components/base/markdown-blocks/thinking-details'
import { useElapsedTimer } from '@/app/components/base/markdown-blocks/use-elapsed-timer'
type ReasoningPanelProps = {
// reasoning (chain-of-thought) deltas accumulated per LLM node id
content: Record<string, string>
// true once reasoning is over (answer started / terminal marker / response ended);
// latches the elapsed timer and collapses the panel. Computed by the caller.
done: boolean
}
/**
 * Collapsible "thinking" panel driven by the separated reasoning stream.
 * Renders nothing when no node has produced any reasoning text.
 */
const ReasoningPanel: FC<ReasoningPanelProps> = (props) => {
  const { content, done } = props
  // The timer hook is called before the early return so it runs on every render.
  const timer = useElapsedTimer(done)
  // Deliberately recomputed on each render instead of memoized: the live stream
  // mutates `content` in place under a stable reference, so a memo keyed on
  // [content] would never observe new deltas. This first version shows a single
  // panel per run, so reasoning from several LLM nodes is joined with blank lines.
  const nonEmptySegments = Object.values(content).filter(Boolean)
  const text = nonEmptySegments.join('\n\n')
  if (!text)
    return null

  return (
    <ThinkingDetails
      className="my-2"
      isComplete={timer.isComplete}
      elapsedTime={timer.elapsedTime}
    >
      <Markdown content={text} />
    </ThinkingDetails>
  )
}
export default ReasoningPanel

View File

@ -12,6 +12,7 @@ import type {
IOnDataMoreInfo,
IOtherOptions,
} from '@/service/base'
import type { ReasoningChunkResponse } from '@/types/workflow'
import { toast } from '@langgenius/dify-ui/toast'
import { uniqBy } from 'es-toolkit/compat'
import { noop } from 'es-toolkit/function'
@ -283,6 +284,17 @@ export const useChat = (
if (taskId)
taskIdRef.current = taskId
},
// Handles one streamed reasoning chunk: appends the delta to the per-node reasoning
// text of the chat item identified by `message_id`, and records when the chunk is
// flagged as final.
onReasoning: ({ data: reasoningData }: ReasoningChunkResponse) => {
const { message_id, reasoning, node_id, is_final } = reasoningData
updateChatTreeNode(message_id, (responseItem) => {
// Create the per-node map lazily the first time a chunk arrives.
const reasoningContent = responseItem.reasoningContent || (responseItem.reasoningContent = {})
// Chunks that carry no node id are accumulated under the shared '_' key.
const key = node_id || '_'
// An empty delta is skipped, so it never creates or alters an entry.
if (reasoning)
reasoningContent[key] = (reasoningContent[key] || '') + reasoning
if (is_final)
responseItem.reasoningFinished = true
})
},
async onCompleted(hasError?: boolean) {
handleResponding(false)
@ -757,6 +769,22 @@ export const useChat = (
parentId: data.parent_message_id,
})
},
// Handles one streamed reasoning chunk for the in-flight answer: accumulates the
// delta on `responseItem` in place, then pushes the updated Q/A pair to the chat tree.
onReasoning: ({ data: reasoningData }: ReasoningChunkResponse) => {
const { reasoning, node_id, is_final } = reasoningData
// Create the per-node map lazily the first time a chunk arrives.
const reasoningContent = responseItem.reasoningContent || (responseItem.reasoningContent = {})
// Chunks that carry no node id are accumulated under the shared '_' key.
const key = node_id || '_'
// An empty delta is skipped, so it never creates or alters an entry.
if (reasoning)
reasoningContent[key] = (reasoningContent[key] || '') + reasoning
if (is_final)
responseItem.reasoningFinished = true
// Runs for every chunk, including ones that only carried the final flag.
updateCurrentQAOnTree({
placeholderQuestionId,
questionItem,
responseItem,
parentId: data.parent_message_id,
})
},
async onCompleted(hasError?: boolean) {
handleResponding(false)

View File

@ -113,6 +113,9 @@ export type IChatItem = {
suggestedQuestions?: string[]
log?: { role: string, text: string, files?: FileEntity[] }[]
agent_thoughts?: ThoughtItem[]
// for LLM reasoning (chain-of-thought) in "separated" mode, keyed by LLM node id
reasoningContent?: Record<string, string>
reasoningFinished?: boolean
message_files?: FileEntity[]
workflow_run_id?: string
// for agent log

View File

@ -39,6 +39,8 @@ function getFormattedChatList(messages: any[]) {
feedback: item.feedback,
isAnswer: true,
citation: item.retriever_resources,
reasoningContent: item.metadata?.reasoning,
reasoningFinished: true,
message_files: getProcessedFilesFromResponse(answerFiles.map((item: any) => ({ ...item, related_id: item.id }))),
parentMessageId: `question-${item.id}`,
})

View File

@ -1,8 +1,7 @@
import { cn } from '@langgenius/dify-ui/cn'
import * as React from 'react'
import { useEffect, useRef, useState } from 'react'
import { useTranslation } from 'react-i18next'
import { useChatContext } from '../chat/chat/context'
import ThinkingDetails from './thinking-details'
import { useElapsedTimer } from './use-elapsed-timer'
const hasEndThink = (children: any): boolean => {
if (typeof children === 'string')
@ -40,34 +39,9 @@ const removeEndThink = (children: any): any => {
const useThinkTimer = (children: any) => {
const { isResponding } = useChatContext()
const endThinkDetected = hasEndThink(children)
const [startTime] = useState(() => Date.now())
const [elapsedTime, setElapsedTime] = useState(0)
const [isComplete, setIsComplete] = useState(() => endThinkDetected)
const timerRef = useRef<NodeJS.Timeout | null>(null)
useEffect(() => {
if (isComplete)
return
timerRef.current = setInterval(() => {
setElapsedTime(Math.floor((Date.now() - startTime) / 100) / 10)
}, 100)
return () => {
if (timerRef.current)
clearInterval(timerRef.current)
}
}, [startTime, isComplete])
useEffect(() => {
// Stop timer when:
// 1. Content has [ENDTHINKFLAG] marker (normal completion)
// 2. isResponding is not true (false = user clicked stop, undefined = historical conversation)
if (endThinkDetected || !isResponding)
setIsComplete(true)
}, [endThinkDetected, isResponding])
return { elapsedTime, isComplete }
// Stop when the marker arrives (normal completion) or the response is no longer
// active (false = user stopped, undefined = historical conversation).
return useElapsedTimer(endThinkDetected || !isResponding)
}
type ThinkBlockProps = React.ComponentProps<'details'> & {
@ -77,41 +51,22 @@ type ThinkBlockProps = React.ComponentProps<'details'> & {
const ThinkBlock = ({ children, ...props }: ThinkBlockProps) => {
const { elapsedTime, isComplete } = useThinkTimer(children)
const displayContent = removeEndThink(children)
const { t } = useTranslation()
const { 'data-think': isThink = false, className, open, ...rest } = props
if (!isThink)
return (<details {...props}>{children}</details>)
return (
<details
<ThinkingDetails
{...rest}
data-think={isThink}
className={cn('group', className)}
open={isComplete ? open : true}
className={className}
open={open}
isComplete={isComplete}
elapsedTime={elapsedTime}
>
<summary className="flex cursor-pointer list-none items-center pl-2 font-bold whitespace-nowrap text-text-secondary select-none">
<div className="flex shrink-0 items-center">
<svg
className="mr-2 size-3 transition-transform duration-500 group-open:rotate-90"
fill="none"
stroke="currentColor"
viewBox="0 0 24 24"
>
<path
strokeLinecap="round"
strokeLinejoin="round"
strokeWidth={2}
d="M9 5l7 7-7 7"
/>
</svg>
{isComplete ? `${t('chat.thought', { ns: 'common' })}(${elapsedTime.toFixed(1)}s)` : `${t('chat.thinking', { ns: 'common' })}(${elapsedTime.toFixed(1)}s)`}
</div>
</summary>
<div className="ml-2 border-l border-components-panel-border bg-components-panel-bg-alt p-3 text-text-secondary">
{displayContent}
</div>
</details>
{displayContent}
</ThinkingDetails>
)
}

View File

@ -0,0 +1,49 @@
import type { ComponentProps } from 'react'
import { cn } from '@langgenius/dify-ui/cn'
import { useTranslation } from 'react-i18next'
type ThinkingDetailsProps = ComponentProps<'details'> & {
isComplete: boolean
elapsedTime: number
}
/**
 * Presentational, collapsible "thinking" shell built on a `<details>` element.
 * The summary row shows a chevron and the translated thinking/thought label with
 * the elapsed seconds to one decimal; the bordered body below holds `children`.
 *
 * The component keeps no timing state of its own: callers compute `isComplete`
 * and `elapsedTime`. All remaining props are forwarded to `<details>`.
 */
const ThinkingDetails = (props: ThinkingDetailsProps) => {
  const { isComplete, elapsedTime, className, open, children, ...rest } = props
  const { t } = useTranslation()

  // While thinking is in progress the panel is forced open; once complete the
  // caller-supplied `open` value is used as-is.
  const isOpen = isComplete ? open : true
  const label = isComplete
    ? t('chat.thought', { ns: 'common' })
    : t('chat.thinking', { ns: 'common' })
  const summaryText = `${label}(${elapsedTime.toFixed(1)}s)`

  return (
    <details
      {...rest}
      className={cn('group', className)}
      open={isOpen}
    >
      <summary className="flex cursor-pointer list-none items-center pl-2 font-bold whitespace-nowrap text-text-secondary select-none">
        <div className="flex shrink-0 items-center">
          <svg
            className="mr-2 size-3 transition-transform duration-500 group-open:rotate-90"
            fill="none"
            stroke="currentColor"
            viewBox="0 0 24 24"
          >
            <path
              strokeLinecap="round"
              strokeLinejoin="round"
              strokeWidth={2}
              d="M9 5l7 7-7 7"
            />
          </svg>
          {summaryText}
        </div>
      </summary>
      <div className="ml-2 border-l border-components-panel-border bg-components-panel-bg-alt p-3 text-text-secondary">
        {children}
      </div>
    </details>
  )
}
export default ThinkingDetails

View File

@ -0,0 +1,34 @@
import { useEffect, useRef, useState } from 'react'
/**
 * Elapsed-time timer shared by the tagged-markdown `ThinkBlock` and the
 * stream-driven `ReasoningPanel`.
 *
 * Starting from the first render, the elapsed time is refreshed every 100ms and
 * truncated to one decimal of a second. Once `complete` has been true for any
 * render the timer stops and the last value is kept; a later flip back to
 * `false` does not restart it. When `complete` is already true on the first
 * render (e.g. historical conversations) the timer never starts.
 *
 * @param complete - whether the timed activity is over, as computed by the caller.
 * @returns `elapsedTime` in seconds and the latched `isComplete` flag.
 */
export const useElapsedTimer = (complete: boolean) => {
  const [startTime] = useState(() => Date.now())
  const [elapsedTime, setElapsedTime] = useState(0)
  // Latch completion in state instead of writing a ref during render. A ref write
  // made by a render that React later discards would still leave the latch set,
  // whereas a state update issued during render only persists if the retried
  // render commits.
  const [latched, setLatched] = useState(complete)
  if (complete && !latched)
    setLatched(true)
  // `complete` is OR-ed in so the value is already correct in the very render
  // that requests the latch, before the state update has been applied.
  const isComplete = latched || complete
  const timerRef = useRef<NodeJS.Timeout | null>(null)

  useEffect(() => {
    // Nothing to tick once completion is latched; the cleanup of the previous
    // effect run has already cleared any running interval.
    if (isComplete)
      return

    timerRef.current = setInterval(() => {
      // Truncate (not round) to tenths of a second.
      setElapsedTime(Math.floor((Date.now() - startTime) / 100) / 10)
    }, 100)

    return () => {
      if (timerRef.current)
        clearInterval(timerRef.current)
    }
  }, [startTime, isComplete])

  return { elapsedTime, isComplete }
}

View File

@ -39,6 +39,8 @@ function getFormattedChatList(messages: any[]) {
feedback: item.feedback,
isAnswer: true,
citation: item.metadata?.retriever_resources,
reasoningContent: item.metadata?.reasoning,
reasoningFinished: true,
message_files: getProcessedFilesFromResponse(answerFiles.map((item: any) => ({ ...item, related_id: item.id }))),
workflow_run_id: item.workflow_run_id,
parentMessageId: `question-${item.id}`,

View File

@ -295,6 +295,35 @@ describe('useChat handleResume', () => {
})
})
// Covers the onReasoning callback captured from the resume request: chunks are
// delivered with an explicit message_id and must land on the matching answer item.
describe('onReasoning', () => {
it('should accumulate reasoning per node onto the resumed answer', async () => {
const { result } = await setupResumeWithTree()
act(() => {
capturedResumeOptions.onReasoning({ data: { message_id: 'msg-resume', reasoning: 'resumed ', node_id: 'llm' } })
// The second chunk also carries is_final, which should mark reasoning as finished.
capturedResumeOptions.onReasoning({ data: { message_id: 'msg-resume', reasoning: 'thought', node_id: 'llm', is_final: true } })
})
const answer = result.current.chatList.find(item => item.id === 'msg-resume')
expect(answer!.reasoningContent).toEqual({ llm: 'resumed thought' })
expect(answer!.reasoningFinished).toBe(true)
})
it('should ignore empty reasoning and fall back to "_" when node_id is absent', async () => {
const { result } = await setupResumeWithTree()
act(() => {
// An empty delta must not create an entry; chunks without node_id share the '_' key.
capturedResumeOptions.onReasoning({ data: { message_id: 'msg-resume', reasoning: '' } })
capturedResumeOptions.onReasoning({ data: { message_id: 'msg-resume', reasoning: 'a' } })
capturedResumeOptions.onReasoning({ data: { message_id: 'msg-resume', reasoning: 'b' } })
})
const answer = result.current.chatList.find(item => item.id === 'msg-resume')
expect(answer!.reasoningContent).toEqual({ _: 'ab' })
// No chunk was flagged final, so the finished marker stays unset.
expect(answer!.reasoningFinished).toBeUndefined()
})
})
describe('onCompleted', () => {
it('should set isResponding to false', async () => {
const { result } = await setupResumeWithTree()

View File

@ -216,6 +216,50 @@ describe('useChat handleSend SSE callbacks', () => {
})
})
// Covers the onReasoning callback captured from the send request: reasoning deltas
// are accumulated per node on the in-flight answer and kept apart from its content.
describe('onReasoning', () => {
// Picks the answer item from the chat list, excluding the opening statement.
const findAnswer = (result: any) =>
result.current.chatList.find((item: any) => item.isAnswer && !item.isOpeningStatement)
it('should accumulate reasoning per node without leaking into content', () => {
const { result } = setupAndSend()
act(() => {
capturedCallbacks.onReasoning({ data: { message_id: 'm-1', reasoning: 'let me ', node_id: 'llm' } })
capturedCallbacks.onReasoning({ data: { message_id: 'm-1', reasoning: 'think', node_id: 'llm' } })
})
const answer = findAnswer(result)
expect(answer!.reasoningContent).toEqual({ llm: 'let me think' })
// No chunk was flagged final, and the answer text must remain empty.
expect(answer!.reasoningFinished).toBeUndefined()
expect(answer!.content).toBe('')
})
it('should key reasoning by node and fall back to "_" when node_id is absent', () => {
const { result } = setupAndSend()
act(() => {
capturedCallbacks.onReasoning({ data: { message_id: 'm-1', reasoning: 'a', node_id: 'llm-1' } })
capturedCallbacks.onReasoning({ data: { message_id: 'm-1', reasoning: 'b', node_id: 'llm-2' } })
// This chunk has no node_id, so it is expected under the '_' key.
capturedCallbacks.onReasoning({ data: { message_id: 'm-1', reasoning: 'c' } })
})
expect(findAnswer(result)!.reasoningContent).toEqual({ 'llm-1': 'a', 'llm-2': 'b', '_': 'c' })
})
it('should ignore empty reasoning and mark finished when is_final is set', () => {
const { result } = setupAndSend()
act(() => {
capturedCallbacks.onReasoning({ data: { message_id: 'm-1', reasoning: 'done', node_id: 'llm' } })
// A final chunk with an empty delta must set the flag without touching the text.
capturedCallbacks.onReasoning({ data: { message_id: 'm-1', reasoning: '', node_id: 'llm', is_final: true } })
})
const answer = findAnswer(result)
expect(answer!.reasoningContent).toEqual({ llm: 'done' })
expect(answer!.reasoningFinished).toBe(true)
})
})
describe('onCompleted', () => {
it('should set isResponding to false', async () => {
const { result } = setupAndSend()

View File

@ -7,6 +7,7 @@ import type {
} from '@/app/components/base/chat/types'
import type { FileEntity } from '@/app/components/base/file-uploader/types'
import type { IOtherOptions } from '@/service/base'
import type { ReasoningChunkResponse } from '@/types/workflow'
import { toast } from '@langgenius/dify-ui/toast'
import { uniqBy } from 'es-toolkit/compat'
import { produce, setAutoFreeze } from 'immer'
@ -365,6 +366,22 @@ export const useChat = (
parentId: params.parent_message_id,
})
},
// Handles one streamed reasoning chunk for the in-flight answer: accumulates the
// delta on `responseItem` in place, then pushes the updated Q/A pair to the chat tree.
onReasoning: ({ data: reasoningData }: ReasoningChunkResponse) => {
const { reasoning, node_id, is_final } = reasoningData
// Create the per-node map lazily the first time a chunk arrives.
const reasoningContent = responseItem.reasoningContent || (responseItem.reasoningContent = {})
// Chunks that carry no node id are accumulated under the shared '_' key.
const key = node_id || '_'
// An empty delta is skipped, so it never creates or alters an entry.
if (reasoning)
reasoningContent[key] = (reasoningContent[key] || '') + reasoning
if (is_final)
responseItem.reasoningFinished = true
// Runs for every chunk, including ones that only carried the final flag.
updateCurrentQAOnTree({
placeholderQuestionId,
questionItem,
responseItem,
parentId: params.parent_message_id,
})
},
async onCompleted(hasError?: boolean, errorMessage?: string) {
const { workflowRunningData } = workflowStore.getState()
handleResponding(false)
@ -711,6 +728,18 @@ export const useChat = (
if (taskId)
taskIdRef.current = taskId
},
// Handles one streamed reasoning chunk: appends the delta to the per-node reasoning
// text of the chat item identified by `message_id`, and records when the chunk is
// flagged as final.
onReasoning: ({ data: reasoningData }: ReasoningChunkResponse) => {
const { message_id, reasoning, node_id, is_final } = reasoningData
updateChatTreeNode(message_id, (responseItem) => {
// Create the per-node map lazily the first time a chunk arrives.
const reasoningContent = responseItem.reasoningContent || (responseItem.reasoningContent = {})
// Chunks that carry no node id are accumulated under the shared '_' key.
const key = node_id || '_'
// An empty delta is skipped, so it never creates or alters an entry.
if (reasoning)
reasoningContent[key] = (reasoningContent[key] || '') + reasoning
if (is_final)
responseItem.reasoningFinished = true
})
},
async onCompleted(hasError?: boolean) {
const { workflowRunningData } = workflowStore.getState()
handleResponding(false)

View File

@ -217,6 +217,57 @@ describe('handleStream', () => {
expect(onCompleted).toHaveBeenCalled()
})
// Verifies that a `reasoning_chunk` SSE line is routed to onReasoning only,
// and that the parsed event object is forwarded unchanged.
it('should dispatch reasoning_chunk events to onReasoning', async () => {
// Arrange
const onData = vi.fn()
const onCompleted = vi.fn()
const onReasoning = vi.fn()
const reasoningEvent = {
event: 'reasoning_chunk',
task_id: 'task-1',
data: { message_id: 'm-1', reasoning: 'let me think', node_id: 'llm', is_final: false },
}
// Fake stream reader: yields one encoded `data: …` line, then signals end of stream.
const mockReader = {
read: vi.fn()
.mockResolvedValueOnce({
done: false,
value: new TextEncoder().encode(`data: ${JSON.stringify(reasoningEvent)}\n`),
})
.mockResolvedValueOnce({
done: true,
value: undefined,
}),
}
const mockResponse = {
ok: true,
body: {
getReader: () => mockReader,
},
} as unknown as Response
// onReasoning is the last positional handler; fill the unused intervening slots.
// NOTE(review): the count of 29 must track handleStream's parameter list — confirm when handlers are added.
const interveningNoops = Array.from({ length: 29 }, () => undefined)
// Act
;(handleStream as (...args: unknown[]) => void)(
mockResponse,
onData,
onCompleted,
...interveningNoops,
onReasoning,
)
// Wait for the stream to be processed
await new Promise(resolve => setTimeout(resolve, 50))
// Assert - the full event object is forwarded to onReasoning, answer stays untouched
expect(onReasoning).toHaveBeenCalledWith(reasoningEvent)
expect(onData).not.toHaveBeenCalled()
})
it('should throw error when response is not ok', () => {
// Arrange
const onData = vi.fn()

View File

@ -21,6 +21,7 @@ import type {
NodeStartedResponse,
ParallelBranchFinishedResponse,
ParallelBranchStartedResponse,
ReasoningChunkResponse,
TextChunkResponse,
TextReplaceResponse,
WorkflowFinishedResponse,
@ -66,6 +67,7 @@ type IOnIterationFinished = (workflowFinished: IterationFinishedResponse) => voi
type IOnParallelBranchStarted = (parallelBranchStarted: ParallelBranchStartedResponse) => void
type IOnParallelBranchFinished = (parallelBranchFinished: ParallelBranchFinishedResponse) => void
type IOnTextChunk = (textChunk: TextChunkResponse) => void
type IOnReasoning = (reasoningChunk: ReasoningChunkResponse) => void
type IOnTTSChunk = (messageId: string, audioStr: string, audioType?: string) => void
type IOnTTSEnd = (messageId: string, audioStr: string, audioType?: string) => void
type IOnTextReplace = (textReplace: TextReplaceResponse) => void
@ -95,6 +97,7 @@ export type IOtherOptions = {
request?: Request
onData?: IOnData // for stream
onReasoning?: IOnReasoning
onThought?: IOnThought
onFile?: IOnFile
onMessageEnd?: IOnMessageEnd
@ -223,6 +226,7 @@ export const handleStream = (
onDataSourceNodeProcessing?: IOnDataSourceNodeProcessing,
onDataSourceNodeCompleted?: IOnDataSourceNodeCompleted,
onDataSourceNodeError?: IOnDataSourceNodeError,
onReasoning?: IOnReasoning,
) => {
if (!response.ok)
throw new Error('Network response was not ok')
@ -340,6 +344,9 @@ export const handleStream = (
else if (bufferObj.event === 'text_chunk') {
onTextChunk?.(bufferObj as TextChunkResponse)
}
else if (bufferObj.event === 'reasoning_chunk') {
onReasoning?.(bufferObj as ReasoningChunkResponse)
}
else if (bufferObj.event === 'text_replace') {
onTextReplace?.(bufferObj as TextReplaceResponse)
}
@ -461,6 +468,7 @@ export const ssePost = async (
const {
isPublicAPI = false,
onData,
onReasoning,
onCompleted,
onThought,
onFile,
@ -599,6 +607,7 @@ export const ssePost = async (
onDataSourceNodeProcessing,
onDataSourceNodeCompleted,
onDataSourceNodeError,
onReasoning,
)
})
.catch((e) => {
@ -616,6 +625,7 @@ export const sseGet = async (
const {
isPublicAPI = false,
onData,
onReasoning,
onCompleted,
onThought,
onFile,
@ -747,6 +757,7 @@ export const sseGet = async (
onDataSourceNodeProcessing,
onDataSourceNodeCompleted,
onDataSourceNodeError,
onReasoning,
)
})
.catch((e) => {

View File

@ -309,6 +309,17 @@ export type TextChunkResponse = {
}
}
// Stream event carrying one piece of LLM reasoning. `handleStream` forwards events
// whose `event` is 'reasoning_chunk' to the `onReasoning` handler with this shape.
export type ReasoningChunkResponse = {
task_id: string
event: string
data: {
// id of the message the reasoning belongs to
message_id: string
// reasoning text delta; handlers append it to the text already accumulated
reasoning: string
// id of the emitting node; handlers fall back to the '_' key when it is absent
node_id?: string
// when true, handlers mark the message's reasoning as finished
is_final?: boolean
}
}
export type TextReplaceResponse = {
task_id: string
workflow_run_id: string