feat: surface separated-mode LLM reasoning in CLI and workflow run preview (#37828)

This commit is contained in:
L1nSn0w 2026-06-24 16:02:28 +08:00 committed by GitHub
parent 32dc9ff2d9
commit 94d365ea5e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
35 changed files with 1198 additions and 11 deletions

View File

@ -31,6 +31,7 @@ from core.app.entities.queue_entities import (
QueueNodeStartedEvent,
QueueNodeSucceededEvent,
QueuePingEvent,
QueueReasoningChunkEvent,
QueueStopEvent,
QueueTextChunkEvent,
QueueWorkflowFailedEvent,
@ -47,6 +48,7 @@ from core.app.entities.task_entities import (
MessageAudioEndStreamResponse,
MessageAudioStreamResponse,
PingStreamResponse,
ReasoningChunkStreamResponse,
StreamResponse,
TextChunkStreamResponse,
WorkflowAppBlockingResponse,
@ -571,6 +573,22 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
yield self._text_chunk_to_stream_response(delta_text, from_variable_selector=event.from_variable_selector)
def _handle_reasoning_chunk_event(
self, event: QueueReasoningChunkEvent, **kwargs
) -> Generator[StreamResponse, None, None]:
"""Handle reasoning chunk events."""
# is_final with empty reasoning is still forwarded as the "thinking finished" signal
if not event.reasoning and not event.is_final:
return
yield ReasoningChunkStreamResponse(
task_id=self._application_generate_entity.task_id,
data=ReasoningChunkStreamResponse.Data(
reasoning=event.reasoning,
node_id=event.from_node_id,
is_final=event.is_final,
),
)
def _handle_agent_log_event(self, event: QueueAgentLogEvent, **kwargs) -> Generator[StreamResponse, None, None]:
"""Handle agent log events."""
yield self._workflow_response_converter.handle_agent_log(
@ -600,6 +618,7 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
QueuePingEvent: self._handle_ping_event,
QueueErrorEvent: self._handle_error_event,
QueueTextChunkEvent: self._handle_text_chunk_event,
QueueReasoningChunkEvent: self._handle_reasoning_chunk_event,
# Workflow events
QueueWorkflowStartedEvent: self._handle_workflow_started_event,
QueueWorkflowSucceededEvent: self._handle_workflow_succeeded_event,

View File

@ -743,7 +743,8 @@ class ReasoningChunkStreamResponse(StreamResponse):
Data entity
"""
message_id: str
# chat apps set this; workflow runs have no message
message_id: str | None = None
reasoning: str
node_id: str | None = None
is_final: bool = False

View File

@ -29,6 +29,7 @@ from core.app.entities.queue_entities import (
QueueNodeExceptionEvent,
QueueNodeFailedEvent,
QueuePingEvent,
QueueReasoningChunkEvent,
QueueRetrieverResourcesEvent,
QueueStopEvent,
QueueTextChunkEvent,
@ -46,6 +47,7 @@ from core.app.entities.task_entities import (
MessageAudioStreamResponse,
MessageEndStreamResponse,
PingStreamResponse,
ReasoningChunkStreamResponse,
)
from core.base.tts.app_generator_tts_publisher import AudioTrunk
from core.workflow.system_variables import build_system_variables
@ -196,6 +198,42 @@ class TestAdvancedChatGenerateTaskPipeline:
assert pipeline._task_state.answer == "hi"
assert responses
def test_handle_reasoning_chunk_event_emits_on_nonempty(self):
pipeline = _make_pipeline()
event = QueueReasoningChunkEvent(reasoning="pondering", from_node_id="llm-1", is_final=False)
responses = list(pipeline._handle_reasoning_chunk_event(event))
assert len(responses) == 1
response = responses[0]
assert isinstance(response, ReasoningChunkStreamResponse)
assert response.data.message_id == pipeline._message_id
assert response.data.reasoning == "pondering"
assert response.data.node_id == "llm-1"
assert response.data.is_final is False
# reasoning never touches the answer stream
assert pipeline._task_state.answer == ""
def test_handle_reasoning_chunk_event_drops_empty_nonfinal(self):
pipeline = _make_pipeline()
event = QueueReasoningChunkEvent(reasoning="", from_node_id="llm-1", is_final=False)
responses = list(pipeline._handle_reasoning_chunk_event(event))
assert responses == []
def test_handle_reasoning_chunk_event_emits_empty_final_marker(self):
pipeline = _make_pipeline()
event = QueueReasoningChunkEvent(reasoning="", from_node_id="llm-1", is_final=True)
responses = list(pipeline._handle_reasoning_chunk_event(event))
assert len(responses) == 1
response = responses[0]
assert isinstance(response, ReasoningChunkStreamResponse)
assert response.data.reasoning == ""
assert response.data.is_final is True
def test_listen_audio_msg_returns_audio_stream(self):
pipeline = _make_pipeline()
publisher = SimpleNamespace(check_and_get_audio=lambda: AudioTrunk(status="stream", audio="data"))
@ -319,6 +357,43 @@ class TestAdvancedChatGenerateTaskPipeline:
assert responses == ["done"]
assert pipeline._recorded_files
def test_handle_node_succeeded_event_records_llm_reasoning(self):
pipeline = _make_pipeline()
pipeline._workflow_response_converter.fetch_files_from_node_outputs = lambda outputs: []
pipeline._workflow_response_converter.workflow_node_finish_to_stream_response = lambda **kwargs: "done"
pipeline._save_output_for_event = lambda event, node_execution_id: None
event = SimpleNamespace(
node_type=BuiltinNodeTypes.LLM,
outputs={"reasoning_content": "first pass "},
node_execution_id="exec",
node_id="llm-1",
)
list(pipeline._handle_node_succeeded_event(event))
assert pipeline._task_state.metadata.reasoning == {"llm-1": "first pass "}
def test_handle_node_succeeded_event_accumulates_reasoning_across_passes(self):
pipeline = _make_pipeline()
pipeline._workflow_response_converter.fetch_files_from_node_outputs = lambda outputs: []
pipeline._workflow_response_converter.workflow_node_finish_to_stream_response = lambda **kwargs: "done"
pipeline._save_output_for_event = lambda event, node_execution_id: None
def _llm_event(reasoning: str):
return SimpleNamespace(
node_type=BuiltinNodeTypes.LLM,
outputs={"reasoning_content": reasoning},
node_execution_id="exec",
node_id="llm-1",
)
# Same node id across iteration/loop passes must accumulate, not overwrite.
list(pipeline._handle_node_succeeded_event(_llm_event("pass one ")))
list(pipeline._handle_node_succeeded_event(_llm_event("pass two")))
assert pipeline._task_state.metadata.reasoning == {"llm-1": "pass one pass two"}
def test_iteration_and_loop_handlers(self):
pipeline = _make_pipeline()
pipeline._workflow_run_id = "run-id"

View File

@ -16,6 +16,7 @@ from core.app.entities.queue_entities import (
QueueNodeFailedEvent,
QueueNodeRetryEvent,
QueueNodeSucceededEvent,
QueueReasoningChunkEvent,
QueueTextChunkEvent,
QueueWorkflowPausedEvent,
QueueWorkflowStartedEvent,
@ -34,6 +35,7 @@ from graphon.graph_events import (
NodeRunHumanInputFormFilledEvent,
NodeRunIterationSucceededEvent,
NodeRunLoopFailedEvent,
NodeRunReasoningChunkEvent,
NodeRunRetryEvent,
NodeRunStartedEvent,
NodeRunStreamChunkEvent,
@ -395,6 +397,17 @@ class TestWorkflowBasedAppRunner:
is_final=False,
),
)
runner._handle_event(
workflow_entry,
NodeRunReasoningChunkEvent(
id="exec",
node_id="node",
node_type=BuiltinNodeTypes.LLM,
selector=["node", "reasoning_content"],
chunk="thinking",
is_final=False,
),
)
runner._handle_event(
workflow_entry,
NodeRunAgentLogEvent(
@ -442,6 +455,7 @@ class TestWorkflowBasedAppRunner:
)
assert any(isinstance(event, QueueTextChunkEvent) for event in published)
assert any(isinstance(event, QueueReasoningChunkEvent) for event in published)
assert any(isinstance(event, QueueAgentLogEvent) for event in published)
assert any(isinstance(event, QueueIterationCompletedEvent) for event in published)
assert any(isinstance(event, QueueLoopCompletedEvent) for event in published)

View File

@ -26,6 +26,7 @@ from core.app.entities.queue_entities import (
QueueNodeStartedEvent,
QueueNodeSucceededEvent,
QueuePingEvent,
QueueReasoningChunkEvent,
QueueStopEvent,
QueueTextChunkEvent,
QueueWorkflowFailedEvent,
@ -40,6 +41,7 @@ from core.app.entities.task_entities import (
MessageAudioEndStreamResponse,
MessageAudioStreamResponse,
PingStreamResponse,
ReasoningChunkStreamResponse,
WorkflowAppPausedBlockingResponse,
WorkflowFinishStreamResponse,
WorkflowStartStreamResponse,
@ -265,6 +267,41 @@ class TestWorkflowGenerateTaskPipeline:
assert responses[0].data.text == "hi"
assert published == [queue_message]
def test_handle_reasoning_chunk_event_emits_on_nonempty(self):
pipeline = _make_pipeline()
event = QueueReasoningChunkEvent(reasoning="pondering", from_node_id="llm-1", is_final=False)
responses = list(pipeline._handle_reasoning_chunk_event(event))
assert len(responses) == 1
response = responses[0]
assert isinstance(response, ReasoningChunkStreamResponse)
# workflow runs have no message, so the id is omitted
assert response.data.message_id is None
assert response.data.reasoning == "pondering"
assert response.data.node_id == "llm-1"
assert response.data.is_final is False
def test_handle_reasoning_chunk_event_drops_empty_nonfinal(self):
pipeline = _make_pipeline()
event = QueueReasoningChunkEvent(reasoning="", from_node_id="llm-1", is_final=False)
responses = list(pipeline._handle_reasoning_chunk_event(event))
assert responses == []
def test_handle_reasoning_chunk_event_emits_empty_final_marker(self):
pipeline = _make_pipeline()
event = QueueReasoningChunkEvent(reasoning="", from_node_id="llm-1", is_final=True)
responses = list(pipeline._handle_reasoning_chunk_event(event))
assert len(responses) == 1
response = responses[0]
assert isinstance(response, ReasoningChunkStreamResponse)
assert response.data.reasoning == ""
assert response.data.is_final is True
def test_dispatch_event_handles_node_failed(self):
pipeline = _make_pipeline()
pipeline._workflow_response_converter.workflow_node_finish_to_stream_response = lambda **kwargs: "done"

View File

@ -29,7 +29,7 @@ export default class ResumeApp extends DifyCommand {
'workspace': Flags.string({ description: 'workspace id override' }),
'with-history': Flags.boolean({ description: 'Replay executed-node history before attaching to live stream.', default: false }),
'stream': Flags.boolean({ description: 'Print output live as tokens/events arrive. Default: collect and print at end.', default: false }),
'think': Flags.boolean({ description: 'Show model thinking/reasoning when available. Strips <think>...</think> blocks silently by default; with --think, thinking is printed to stderr.', default: false }),
'think': Flags.boolean({ description: 'Show model thinking/reasoning when available — both inline <think>...</think> blocks and separated reasoning streams. Hidden by default; with --think, thinking is printed to stderr.', default: false }),
'output': Flags.outputFormat({ options: [OutputFormat.JSON, OutputFormat.YAML, OutputFormat.TEXT], default: '' }),
'http-retry': httpRetryFlag,
}

View File

@ -7,6 +7,7 @@ import { collect, HitlPauseError } from '@/commands/run/app/sse-collector'
import { formatted, stringifyOutput } from '@/framework/output'
import { handle, unhandle } from '@/sys/index'
import { colorEnabled, colorScheme } from '@/sys/io/color'
import { reasoningBlocksFromMetadata } from '@/sys/io/reasoning'
import { startSpinner } from '@/sys/io/spinner'
import { extractThinkBlocks, filterThinkInOutputs, stripThinkBlocks } from '@/sys/io/think-filter'
@ -99,6 +100,13 @@ export class StreamingStructuredStrategy implements RunStrategy {
}
}
// Surface separated-mode reasoning (carried in message_end metadata) to stderr under --think.
if (ctx.think) {
const reasoningBlocks = reasoningBlocksFromMetadata(processedResp.metadata)
if (reasoningBlocks !== '')
deps.io.err.write(`${reasoningBlocks}\n`)
}
const respMode = typeof processedResp.mode === 'string' && processedResp.mode !== '' ? processedResp.mode : mode
deps.io.out.write(stringifyOutput(formatted({ format, data: newAppRunObject(respMode, processedResp) })))
if (isText && CHAT_MODES.has(respMode)) {

View File

@ -35,7 +35,7 @@ export default class RunApp extends DifyCommand {
'workflow-id': Flags.string({ description: 'Pin to a specific published workflow version' }),
'workspace': Flags.string({ description: 'Workspace id (overrides DIFY_WORKSPACE_ID and stored default)' }),
'stream': Flags.boolean({ description: 'Print output live as tokens/events arrive (default: collect and print at end)', default: false }),
'think': Flags.boolean({ description: 'Show model thinking/reasoning when available. Strips <think>...</think> blocks silently by default; with --think, thinking is printed to stderr.', default: false }),
'think': Flags.boolean({ description: 'Show model thinking/reasoning when available — both inline <think>...</think> blocks and separated reasoning streams. Hidden by default; with --think, thinking is printed to stderr.', default: false }),
'retry-on-limit': Flags.boolean({ description: 'On a 429 rate limit, wait and retry this POST (bounded) instead of failing immediately. Off by default since running an app is not idempotent.', default: false }),
'http-retry': httpRetryFlag,
'output': Flags.outputFormat({ options: [OutputFormat.JSON, OutputFormat.YAML, OutputFormat.TEXT], default: '' }),

View File

@ -203,6 +203,85 @@ describe('runApp', () => {
expect(io.errBuf()).toContain('secret reasoning')
})
it('--stream chat --think: routes separated reasoning to stderr, clean answer to stdout', async () => {
mock.setScenario('chat-reasoning')
const io = bufferStreams()
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
await runApp(
{ appId: 'app-1', message: 'hi', stream: true, think: true },
{ active: active(), http: testHttpClient(mock.url, 'dfoa_test'), host: mock.url, io, cache },
)
expect(io.outBuf()).toContain('final answer')
expect(io.outBuf()).not.toContain('secret reasoning')
expect(io.errBuf()).toContain('<think>')
expect(io.errBuf()).toContain('secret reasoning')
})
it('--stream chat without --think: separated reasoning stays hidden', async () => {
mock.setScenario('chat-reasoning')
const io = bufferStreams()
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
await runApp(
{ appId: 'app-1', message: 'hi', stream: true },
{ active: active(), http: testHttpClient(mock.url, 'dfoa_test'), host: mock.url, io, cache },
)
expect(io.outBuf()).toContain('final answer')
expect(io.errBuf()).not.toContain('secret reasoning')
})
it('chat -o json --think: echoes separated reasoning to stderr, persists it in metadata', async () => {
mock.setScenario('chat-reasoning')
const io = bufferStreams()
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
await runApp(
{ appId: 'app-1', message: 'hi', format: 'json', think: true },
{ active: active(), http: testHttpClient(mock.url, 'dfoa_test'), host: mock.url, io, cache },
)
expect(io.errBuf()).toContain('secret reasoning')
const parsed = JSON.parse(io.outBuf()) as { answer: string, metadata: { reasoning: Record<string, string> } }
expect(parsed.answer).toBe('final answer')
expect(parsed.metadata.reasoning).toEqual({ 'llm-1': 'secret reasoning' })
})
it('--stream workflow --think: routes separated reasoning to stderr, clean outputs to stdout', async () => {
mock.setScenario('workflow-reasoning')
const io = bufferStreams()
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
await runApp(
{ appId: 'app-2', inputs: { x: '1' }, stream: true, think: true },
{ active: active(), http: testHttpClient(mock.url, 'dfoa_test'), host: mock.url, io, cache },
)
expect(io.errBuf()).toContain('<think>')
expect(io.errBuf()).toContain('secret reasoning')
expect(io.outBuf()).toContain('final answer')
expect(io.outBuf()).not.toContain('secret reasoning')
})
it('workflow -o json --think: echoes reasoning to stderr, accumulates into metadata.reasoning', async () => {
mock.setScenario('workflow-reasoning')
const io = bufferStreams()
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
await runApp(
{ appId: 'app-2', inputs: { x: '1' }, format: 'json', think: true },
{ active: active(), http: testHttpClient(mock.url, 'dfoa_test'), host: mock.url, io, cache },
)
expect(io.errBuf()).toContain('secret reasoning')
const parsed = JSON.parse(io.outBuf()) as { metadata: { reasoning: Record<string, string> } }
expect(parsed.metadata.reasoning).toEqual({ 'llm-1': 'secret reasoning' })
})
it('--stream workflow without --think: reasoning stays hidden', async () => {
mock.setScenario('workflow-reasoning')
const io = bufferStreams()
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
await runApp(
{ appId: 'app-2', inputs: { x: '1' }, stream: true },
{ active: active(), http: testHttpClient(mock.url, 'dfoa_test'), host: mock.url, io, cache },
)
expect(io.outBuf()).toContain('final answer')
expect(io.errBuf()).not.toContain('secret reasoning')
})
it('stream-error scenario: error event surfaces typed BaseError', async () => {
mock.setScenario('stream-error')
const io = bufferStreams()

View File

@ -59,6 +59,41 @@ describe('collect — chat', () => {
})
})
describe('collect — chat separated reasoning', () => {
function reasoningEvent(reasoning: string, isFinal: boolean) {
return ev('reasoning_chunk', { data: { message_id: 'm1', reasoning, node_id: 'llm-1', is_final: isFinal } })
}
it('backfills metadata.reasoning from live deltas when the server omits it', async () => {
const got = await collect(iterOf(
reasoningEvent('pon', false),
reasoningEvent('dering', true),
ev('message', { message_id: 'm1', answer: 'answer' }),
ev('message_end', { metadata: { usage: { tokens: 3 } } }),
), 'advanced-chat')
expect(got.answer).toBe('answer')
expect((got.metadata as { reasoning?: unknown }).reasoning).toEqual({ 'llm-1': 'pondering' })
expect((got.metadata as { usage?: unknown }).usage).toEqual({ tokens: 3 })
})
it('keeps the server-persisted reasoning over live deltas', async () => {
const got = await collect(iterOf(
reasoningEvent('live', true),
ev('message', { answer: 'a' }),
ev('message_end', { metadata: { reasoning: { 'llm-1': 'persisted' } } }),
), 'advanced-chat')
expect((got.metadata as { reasoning?: unknown }).reasoning).toEqual({ 'llm-1': 'persisted' })
})
it('leaves metadata untouched when there is no reasoning at all', async () => {
const got = await collect(iterOf(
ev('message', { answer: 'a' }),
ev('message_end', { metadata: { usage: { tokens: 1 } } }),
), 'advanced-chat')
expect((got.metadata as { reasoning?: unknown }).reasoning).toBeUndefined()
})
})
describe('collect — agent-chat', () => {
it('captures agent_thoughts', async () => {
const got = await collect(iterOf(
@ -97,6 +132,39 @@ describe('collect — workflow', () => {
})
})
describe('collect — workflow separated reasoning', () => {
function wfReasoning(reasoning: string, nodeId: string, isFinal: boolean) {
return ev('reasoning_chunk', { data: { reasoning, node_id: nodeId, is_final: isFinal } })
}
it('accumulates reasoning_chunk per node into metadata.reasoning', async () => {
const got = await collect(iterOf(
ev('node_started', { id: 'llm-1' }),
wfReasoning('pon', 'llm-1', false),
wfReasoning('dering', 'llm-1', true),
ev('workflow_finished', { data: { status: 'succeeded', outputs: { result: 'clean' } } }),
), 'workflow')
expect((got.data as { outputs: { result: string } }).outputs.result).toBe('clean')
expect((got.metadata as { reasoning?: unknown }).reasoning).toEqual({ 'llm-1': 'pondering' })
})
it('keys reasoning by node, leaves metadata absent when there is none', async () => {
const got = await collect(iterOf(
ev('workflow_finished', { data: { status: 'succeeded', outputs: { result: 'clean' } } }),
), 'workflow')
expect((got.metadata as { reasoning?: unknown } | undefined)?.reasoning).toBeUndefined()
})
it('merges reasoning into metadata already carried by workflow_finished', async () => {
const got = await collect(iterOf(
wfReasoning('think', 'llm-1', true),
ev('workflow_finished', { data: { status: 'succeeded' }, metadata: { usage: { tokens: 7 } } }),
), 'workflow')
expect((got.metadata as { reasoning?: unknown }).reasoning).toEqual({ 'llm-1': 'think' })
expect((got.metadata as { usage?: unknown }).usage).toEqual({ tokens: 7 })
})
})
describe('collect — error event', () => {
it('throws BaseError when error event arrives', async () => {
await expect(collect(iterOf(

View File

@ -2,6 +2,7 @@ import type { BaseError } from '@/errors/base'
import type { SseEvent } from '@/http/sse'
import { HttpClientError, newError } from '@/errors/base'
import { ErrorCode } from '@/errors/codes'
import { accumulateReasoning, parseReasoningChunk } from '@/sys/io/reasoning'
import { RUN_MODES } from './handlers'
export type HitlPauseData = {
@ -67,6 +68,7 @@ class ChatCollector implements Collector {
private base: Record<string, unknown> = {}
private metadata: Record<string, unknown> | undefined
private thoughts: unknown[] = []
private readonly reasoning: Record<string, string> = {}
private readonly mode: string
private readonly isAgent: boolean
constructor(mode: string, isAgent: boolean) {
@ -84,6 +86,13 @@ class ChatCollector implements Collector {
copyScalar(this.base, c, ['id', 'conversation_id', 'message_id', 'task_id', 'created_at'])
return
}
// Accumulate separated-mode reasoning deltas per LLM node.
case 'reasoning_chunk': {
const chunk = parseReasoningChunk(c)
if (chunk !== undefined)
accumulateReasoning(this.reasoning, chunk)
return
}
case 'agent_thought':
this.thoughts.push(c)
return
@ -98,12 +107,23 @@ class ChatCollector implements Collector {
const out: Record<string, unknown> = { mode: this.mode, answer: this.answer, ...this.base }
if (this.metadata !== undefined)
out.metadata = this.metadata
// Fall back to live deltas only when the server didn't persist reasoning in metadata.
if (Object.keys(this.reasoning).length > 0 && !hasReasoning(this.metadata))
out.metadata = { ...(this.metadata ?? {}), reasoning: this.reasoning }
if (this.isAgent || this.thoughts.length > 0)
out.agent_thoughts = this.thoughts
return out
}
}
function hasReasoning(metadata: Record<string, unknown> | undefined): boolean {
const reasoning = metadata?.reasoning
return reasoning !== null
&& typeof reasoning === 'object'
&& !Array.isArray(reasoning)
&& Object.keys(reasoning as object).length > 0
}
class CompletionCollector implements Collector {
private answer = ''
private base: Record<string, unknown> = {}
@ -133,14 +153,29 @@ class CompletionCollector implements Collector {
class WorkflowCollector implements Collector {
private final: Record<string, unknown> | undefined
private readonly reasoning: Record<string, string> = {}
consume(ev: SseEvent): void {
if (ev.name === 'reasoning_chunk') {
const chunk = parseReasoningChunk(parseJson(ev.data))
if (chunk !== undefined)
accumulateReasoning(this.reasoning, chunk)
return
}
if (ev.name !== 'workflow_finished')
return
this.final = parseJson(ev.data)
}
finalize(): Record<string, unknown> {
return { mode: RUN_MODES.Workflow, ...(this.final ?? {}) }
const out: Record<string, unknown> = { mode: RUN_MODES.Workflow, ...(this.final ?? {}) }
// Workflow runs don't persist reasoning; surface live deltas under metadata.reasoning.
if (Object.keys(this.reasoning).length > 0) {
const existing = (out.metadata !== null && typeof out.metadata === 'object' && !Array.isArray(out.metadata))
? out.metadata as Record<string, unknown>
: undefined
out.metadata = { ...(existing ?? {}), reasoning: this.reasoning }
}
return out
}
}

View File

@ -37,6 +37,42 @@ describe('streamPrinterFor — chat', () => {
})
})
function reasoningEvent(reasoning: string, isFinal: boolean) {
return ev('reasoning_chunk', { data: { message_id: 'm1', reasoning, node_id: 'llm-1', is_final: isFinal } })
}
describe('streamPrinterFor — chat separated reasoning', () => {
it('think: true frames reasoning_chunk deltas to stderr, answer stays clean on stdout', () => {
const sp = streamPrinterFor('advanced-chat', true)
const cap = captures()
sp.onEvent(cap.out, cap.err, reasoningEvent('pon', false))
sp.onEvent(cap.out, cap.err, reasoningEvent('dering', false))
sp.onEvent(cap.out, cap.err, reasoningEvent('', true))
sp.onEvent(cap.out, cap.err, ev('message', { conversation_id: 'c1', answer: 'final answer' }))
sp.onEnd(cap.out, cap.err)
expect(cap.errBuf()).toContain('<think> [llm-1]\npondering</think>')
expect(cap.outBuf()).toBe('final answer\n')
})
it('think: false ignores reasoning_chunk entirely', () => {
const sp = streamPrinterFor('advanced-chat', false)
const cap = captures()
sp.onEvent(cap.out, cap.err, reasoningEvent('secret', true))
sp.onEvent(cap.out, cap.err, ev('message', { answer: 'hi' }))
sp.onEnd(cap.out, cap.err)
expect(cap.errBuf()).not.toContain('secret')
expect(cap.outBuf()).toBe('hi\n')
})
it('closes an unterminated reasoning block on stream end', () => {
const sp = streamPrinterFor('advanced-chat', true)
const cap = captures()
sp.onEvent(cap.out, cap.err, reasoningEvent('thinking', false))
sp.onEnd(cap.out, cap.err)
expect(cap.errBuf()).toContain('<think> [llm-1]\nthinking</think>')
})
})
describe('streamPrinterFor — agent-chat', () => {
it('writes agent_thought to stderr', () => {
const sp = streamPrinterFor('agent-chat')
@ -105,6 +141,62 @@ describe('streamPrinterFor — workflow think filtering', () => {
})
})
// Workflow reasoning_chunk events carry no message_id (workflow runs have no message).
function wfReasoning(reasoning: string, nodeId: string, isFinal: boolean) {
return ev('reasoning_chunk', { data: { reasoning, node_id: nodeId, is_final: isFinal } })
}
describe('streamPrinterFor — workflow separated reasoning', () => {
it('think: true frames reasoning_chunk to stderr, outputs stay clean on stdout', () => {
const sp = streamPrinterFor('workflow', true)
const cap = captures()
sp.onEvent(cap.out, cap.err, ev('node_started', { id: 'llm-1', title: 'LLM' }))
sp.onEvent(cap.out, cap.err, wfReasoning('pon', 'llm-1', false))
sp.onEvent(cap.out, cap.err, wfReasoning('dering', 'llm-1', false))
sp.onEvent(cap.out, cap.err, wfReasoning('', 'llm-1', true))
sp.onEvent(cap.out, cap.err, ev('workflow_finished', { data: { outputs: { result: 'clean answer' } } }))
sp.onEnd(cap.out, cap.err)
// node title precedes the reasoning block for attribution
expect(cap.errBuf()).toContain('→ LLM')
expect(cap.errBuf()).toContain('<think> [llm-1]\npondering</think>')
const parsed = JSON.parse(cap.outBuf().trim()) as { result: string }
expect(parsed.result).toBe('clean answer')
})
it('think: false drops reasoning_chunk entirely', () => {
const sp = streamPrinterFor('workflow', false)
const cap = captures()
sp.onEvent(cap.out, cap.err, wfReasoning('secret', 'llm-1', true))
sp.onEvent(cap.out, cap.err, ev('workflow_finished', { data: { outputs: { result: 'ok' } } }))
sp.onEnd(cap.out, cap.err)
expect(cap.errBuf()).not.toContain('secret')
const parsed = JSON.parse(cap.outBuf().trim()) as { result: string }
expect(parsed.result).toBe('ok')
})
it('closes an unterminated reasoning block on stream end', () => {
const sp = streamPrinterFor('workflow', true)
const cap = captures()
sp.onEvent(cap.out, cap.err, wfReasoning('thinking', 'llm-1', false))
sp.onEnd(cap.out, cap.err)
expect(cap.errBuf()).toContain('<think> [llm-1]\nthinking</think>')
})
it('keeps interleaved parallel-node reasoning in separate node-tagged blocks', () => {
const sp = streamPrinterFor('workflow', true)
const cap = captures()
sp.onEvent(cap.out, cap.err, wfReasoning('a1', 'llm-1', false))
sp.onEvent(cap.out, cap.err, wfReasoning('b1', 'llm-2', false))
sp.onEvent(cap.out, cap.err, wfReasoning('a2', 'llm-1', true))
sp.onEvent(cap.out, cap.err, wfReasoning('b2', 'llm-2', true))
sp.onEvent(cap.out, cap.err, ev('workflow_finished', { data: { outputs: { result: 'ok' } } }))
sp.onEnd(cap.out, cap.err)
expect(cap.errBuf()).toBe(
'<think> [llm-1]\na1</think>\n<think> [llm-2]\nb1</think>\n<think> [llm-1]\na2</think>\n<think> [llm-2]\nb2</think>\n',
)
})
})
describe('streamPrinterFor — unknown mode', () => {
it('throws', () => {
expect(() => streamPrinterFor('whatever')).toThrow()

View File

@ -4,6 +4,7 @@ import type { SseEvent } from '@/http/sse'
import { newError } from '@/errors/base'
import { ErrorCode } from '@/errors/codes'
import { colorEnabled, colorScheme } from '@/sys/io/color'
import { parseReasoningChunk, ReasoningChunkRenderer } from '@/sys/io/reasoning'
import { filterThinkInOutputs, ThinkChunkFilter } from '@/sys/io/think-filter'
import { RUN_MODES } from './handlers'
import { HitlPauseError } from './sse-collector'
@ -43,9 +44,12 @@ function handleCommonEvents(ev: SseEvent): boolean {
class ChatStreamPrinter implements StreamPrinter {
private convoId = ''
private readonly filter: ThinkChunkFilter
private readonly reasoning = new ReasoningChunkRenderer()
private readonly think: boolean
private readonly isTTY: boolean
constructor(think: boolean, isTTY = false) {
this.filter = new ThinkChunkFilter(think)
this.think = think
this.isTTY = isTTY
}
@ -62,6 +66,15 @@ class ChatStreamPrinter implements StreamPrinter {
this.convoId = c.conversation_id
return
}
// Stream separated-mode reasoning to stderr under --think.
case 'reasoning_chunk': {
if (!this.think)
return
const chunk = parseReasoningChunk(c)
if (chunk !== undefined)
this.reasoning.push(chunk, errOut)
return
}
case 'agent_thought':
if (typeof c.thought === 'string' && c.thought !== '')
errOut.write(`thought: ${c.thought}\n`)
@ -73,6 +86,7 @@ class ChatStreamPrinter implements StreamPrinter {
}
onEnd(out: NodeJS.WritableStream, errOut: NodeJS.WritableStream): void {
this.reasoning.flush(errOut)
this.filter.flush(out, errOut)
out.write('\n')
if (this.convoId !== '') {
@ -106,6 +120,7 @@ class CompletionStreamPrinter implements StreamPrinter {
class WorkflowStreamPrinter implements StreamPrinter {
private final: Record<string, unknown> | undefined
private readonly reasoning = new ReasoningChunkRenderer()
private readonly think: boolean
constructor(think: boolean) {
this.think = think
@ -124,6 +139,15 @@ class WorkflowStreamPrinter implements StreamPrinter {
errOut.write(`${title}\n`)
return
}
// Stream separated-mode reasoning to stderr under --think; the prior → title attributes the node.
case 'reasoning_chunk': {
if (!this.think)
return
const chunk = parseReasoningChunk(c)
if (chunk !== undefined)
this.reasoning.push(chunk, errOut)
return
}
case 'node_finished': {
const status = typeof c.status === 'string' ? c.status : ''
if (status !== '' && status !== 'succeeded') {
@ -138,6 +162,7 @@ class WorkflowStreamPrinter implements StreamPrinter {
}
onEnd(out: NodeJS.WritableStream, errOut: NodeJS.WritableStream): void {
this.reasoning.flush(errOut)
if (this.final === undefined)
return
const data = this.final.data

View File

@ -0,0 +1,128 @@
import { Buffer } from 'node:buffer'
import { PassThrough } from 'node:stream'
import { describe, expect, it } from 'vitest'
import {
accumulateReasoning,
formatReasoningBlocks,
parseReasoningChunk,
reasoningBlocksFromMetadata,
ReasoningChunkRenderer,
} from './reasoning'
function capture(): { err: PassThrough, errBuf: () => string } {
const err = new PassThrough()
const ec: Buffer[] = []
err.on('data', d => ec.push(d as Buffer))
return { err, errBuf: () => Buffer.concat(ec).toString('utf-8') }
}
describe('parseReasoningChunk', () => {
it('reads the payload nested under data', () => {
expect(parseReasoningChunk({ data: { reasoning: 'hi', node_id: 'llm-1', is_final: true } }))
.toEqual({ reasoning: 'hi', nodeId: 'llm-1', isFinal: true })
})
it('defaults missing/wrong-typed fields', () => {
expect(parseReasoningChunk({ data: {} })).toEqual({ reasoning: '', nodeId: '', isFinal: false })
})
it('returns undefined when data is absent or not an object', () => {
expect(parseReasoningChunk({})).toBeUndefined()
expect(parseReasoningChunk({ data: null })).toBeUndefined()
expect(parseReasoningChunk({ data: ['x'] })).toBeUndefined()
})
})
describe('ReasoningChunkRenderer', () => {
it('frames streamed deltas with a node-tagged <think> open/close on the terminal marker', () => {
const cap = capture()
const r = new ReasoningChunkRenderer()
r.push({ reasoning: 'pon', nodeId: 'llm-1', isFinal: false }, cap.err)
r.push({ reasoning: 'dering', nodeId: 'llm-1', isFinal: false }, cap.err)
r.push({ reasoning: '', nodeId: 'llm-1', isFinal: true }, cap.err)
expect(cap.errBuf()).toBe('<think> [llm-1]\npondering</think>\n')
})
it('emits separate node-tagged blocks per node', () => {
const cap = capture()
const r = new ReasoningChunkRenderer()
r.push({ reasoning: 'a', nodeId: 'n1', isFinal: true }, cap.err)
r.push({ reasoning: 'b', nodeId: 'n2', isFinal: true }, cap.err)
expect(cap.errBuf()).toBe('<think> [n1]\na</think>\n<think> [n2]\nb</think>\n')
})
it('tags each block with its node id so interleaved fragments stay distinguishable', () => {
const cap = capture()
const r = new ReasoningChunkRenderer()
r.push({ reasoning: 'a1', nodeId: 'n1', isFinal: false }, cap.err)
r.push({ reasoning: 'b1', nodeId: 'n2', isFinal: false }, cap.err)
r.push({ reasoning: 'a2', nodeId: 'n1', isFinal: true }, cap.err)
r.push({ reasoning: 'b2', nodeId: 'n2', isFinal: true }, cap.err)
expect(cap.errBuf()).toBe(
'<think> [n1]\na1</think>\n<think> [n2]\nb1</think>\n<think> [n1]\na2</think>\n<think> [n2]\nb2</think>\n',
)
})
it('omits the tag when the chunk carries no node id', () => {
const cap = capture()
const r = new ReasoningChunkRenderer()
r.push({ reasoning: 'plain', nodeId: '', isFinal: true }, cap.err)
expect(cap.errBuf()).toBe('<think>\nplain</think>\n')
})
it('flush closes a block left open by a truncated stream', () => {
const cap = capture()
const r = new ReasoningChunkRenderer()
r.push({ reasoning: 'half', nodeId: 'n1', isFinal: false }, cap.err)
r.flush(cap.err)
expect(cap.errBuf()).toBe('<think> [n1]\nhalf</think>\n')
})
it('a lone terminal marker with no reasoning emits nothing', () => {
const cap = capture()
const r = new ReasoningChunkRenderer()
r.push({ reasoning: '', nodeId: 'n1', isFinal: true }, cap.err)
expect(cap.errBuf()).toBe('')
})
})
describe('accumulateReasoning', () => {
it('appends deltas per node, falling back to "_" for a missing nodeId', () => {
const acc: Record<string, string> = {}
accumulateReasoning(acc, { reasoning: 'a', nodeId: 'n1', isFinal: false })
accumulateReasoning(acc, { reasoning: 'b', nodeId: 'n1', isFinal: false })
accumulateReasoning(acc, { reasoning: 'x', nodeId: '', isFinal: false })
expect(acc).toEqual({ n1: 'ab', _: 'x' })
})
it('ignores empty reasoning', () => {
const acc: Record<string, string> = {}
accumulateReasoning(acc, { reasoning: '', nodeId: 'n1', isFinal: true })
expect(acc).toEqual({})
})
})
describe('formatReasoningBlocks', () => {
it('frames and trims each node, joined by a separator', () => {
expect(formatReasoningBlocks({ n1: ' one ', n2: 'two' }))
.toBe('<think>\none\n</think>\n---\n<think>\ntwo\n</think>')
})
it('skips empty entries and returns empty for no reasoning', () => {
expect(formatReasoningBlocks({ n1: ' ' })).toBe('')
expect(formatReasoningBlocks({})).toBe('')
})
})
describe('reasoningBlocksFromMetadata', () => {
it('extracts reasoning from a metadata object', () => {
expect(reasoningBlocksFromMetadata({ reasoning: { n1: 'why' } }))
.toBe('<think>\nwhy\n</think>')
})
it('returns empty for tagged mode (empty reasoning) and malformed input', () => {
expect(reasoningBlocksFromMetadata({ reasoning: {} })).toBe('')
expect(reasoningBlocksFromMetadata(undefined)).toBe('')
expect(reasoningBlocksFromMetadata({ usage: { tokens: 1 } })).toBe('')
})
})

View File

@ -0,0 +1,99 @@
// Renders "separated"-mode reasoning (streamed on its own `reasoning_chunk` SSE
// channel) to stderr, so --think matches inline <think> (see think-filter.ts).
const THINK_OPEN = '<think>'
const THINK_CLOSE = '</think>'
export type ReasoningChunk = {
reasoning: string
nodeId: string
isFinal: boolean
}
// reasoning_chunk nests its payload under `data` (not top-level like `message`).
export function parseReasoningChunk(parsed: Record<string, unknown>): ReasoningChunk | undefined {
const data = parsed.data
if (data === null || typeof data !== 'object' || Array.isArray(data))
return undefined
const rec = data as Record<string, unknown>
return {
reasoning: typeof rec.reasoning === 'string' ? rec.reasoning : '',
nodeId: typeof rec.node_id === 'string' ? rec.node_id : '',
isFinal: rec.is_final === true,
}
}
// Bucket key for a chunk; falls back to a single bucket so live rendering and
// buffered collection key reasoning the same way.
export function reasoningKey(chunk: ReasoningChunk): string {
return chunk.nodeId !== '' ? chunk.nodeId : '_'
}
// Appends a reasoning delta to a per-node accumulator.
export function accumulateReasoning(acc: Record<string, string>, chunk: ReasoningChunk): void {
if (chunk.reasoning === '')
return
const key = reasoningKey(chunk)
acc[key] = (acc[key] ?? '') + chunk.reasoning
}
// Frames a live reasoning stream into stderr: <think> on the first delta,
// raw deltas thereafter, </think> on is_final. Parallel branches can interleave
// chunks from different nodes on one stream, so it keeps at most one block open,
// switches blocks on node change, and tags each block with its node id so the
// interleaved fragments stay distinguishable.
export class ReasoningChunkRenderer {
private openNode: string | undefined
push(chunk: ReasoningChunk, errOut: NodeJS.WritableStream): void {
const key = reasoningKey(chunk)
if (chunk.reasoning !== '') {
if (this.openNode !== key) {
this.closeActive(errOut)
errOut.write(chunk.nodeId !== '' ? `${THINK_OPEN} [${chunk.nodeId}]\n` : `${THINK_OPEN}\n`)
this.openNode = key
}
errOut.write(chunk.reasoning)
}
if (chunk.isFinal && this.openNode === key)
this.closeActive(errOut)
}
// Close a block left open by a truncated stream.
flush(errOut: NodeJS.WritableStream): void {
this.closeActive(errOut)
}
private closeActive(errOut: NodeJS.WritableStream): void {
if (this.openNode === undefined)
return
errOut.write(`${THINK_CLOSE}\n`)
this.openNode = undefined
}
}
// Frames fully-buffered reasoning (one entry per LLM node id) into <think> blocks.
export function formatReasoningBlocks(reasoning: Record<string, string>): string {
const blocks: string[] = []
for (const text of Object.values(reasoning)) {
const trimmed = text.trim()
if (trimmed !== '')
blocks.push(`${THINK_OPEN}\n${trimmed}\n${THINK_CLOSE}`)
}
return blocks.join('\n---\n')
}
// Frames per-node reasoning from a message_end `metadata` object; '' when absent.
export function reasoningBlocksFromMetadata(metadata: unknown): string {
if (metadata === null || typeof metadata !== 'object' || Array.isArray(metadata))
return ''
const reasoning = (metadata as Record<string, unknown>).reasoning
if (reasoning === null || typeof reasoning !== 'object' || Array.isArray(reasoning))
return ''
const map: Record<string, string> = {}
for (const [key, value] of Object.entries(reasoning as Record<string, unknown>)) {
if (typeof value === 'string')
map[key] = value
}
return formatReasoningBlocks(map)
}

View File

@ -67,3 +67,15 @@ DIFY_E2E_PASSWORD=
# DIFY_E2E_HITL_SINGLE_ACTION_APP_ID=
# DIFY_E2E_HITL_MULTI_NODE_APP_ID=
# DIFY_E2E_WS2_APP_ID=
# ── Separated-mode reasoning suite (opt-in) ─────────────────────────────────
# run-app-reasoning.e2e.ts is skipped unless DIFY_E2E_REASONING_APP_ID resolves.
# It needs a chatflow whose LLM node uses reasoning_format=separated AND a
# workspace with a default chat model configured.
#
# Either point at an existing app:
# DIFY_E2E_REASONING_APP_ID=
#
# …or auto-provision reasoning-chat.yml (→ app name "reasoning-bot"). Off by
# default so the shared bootstrap stays free of any model dependency.
# DIFY_E2E_REASONING_PROVISION=1

View File

@ -39,11 +39,12 @@ test/e2e/
│ ├── describe-app.e2e.ts — describe app
│ └── get-app-all-workspaces.e2e.ts — get app -A ([EE] multi-workspace cases)
└── run/
├── run-app-basic.e2e.ts — basic run, -o json, --inputs, streaming,
│ conversation, CI mode
├── run-app-streaming.e2e.ts — Ctrl+C / error-event / chunk timing
├── run-app-file.e2e.ts — --file upload (local + remote URL)
└── run-app-hitl.e2e.ts — HITL pause + resume
├── run-app-basic.e2e.ts — basic run, -o json, --inputs, streaming,
│ conversation, CI mode
├── run-app-streaming.e2e.ts — Ctrl+C / error-event / chunk timing
├── run-app-file.e2e.ts — --file upload (local + remote URL)
├── run-app-reasoning.e2e.ts — separated-mode reasoning (--think); opt-in
└── run-app-hitl.e2e.ts — HITL pause + resume
```
## Edition support
@ -137,6 +138,24 @@ global-setup will:
| `DIFY_E2E_HITL_SINGLE_ACTION_APP_ID` | |
| `DIFY_E2E_HITL_MULTI_NODE_APP_ID` | |
| `DIFY_E2E_WS2_APP_ID` | Override secondary workspace app ID (EE) |
| `DIFY_E2E_REASONING_APP_ID` | separated-reasoning chatflow app ID (opt-in) |
| `DIFY_E2E_REASONING_PROVISION` | `1` → auto-provision `reasoning-chat.yml` |
### Separated-mode reasoning suite (opt-in)
`run-app-reasoning.e2e.ts` verifies the out-of-band `reasoning_chunk` channel
(PR #37460): `--think` surfaces the chain-of-thought to stderr framed as
`<think>…</think>`, the answer stays clean, and `-o json` persists it under
`metadata.reasoning`. It is **skipped** unless `DIFY_E2E_REASONING_APP_ID`
resolves, because it runs a real LLM node and needs:
1. a chatflow whose LLM node uses `reasoning_format: separated`, and
1. a workspace with a default chat model configured.
Point `DIFY_E2E_REASONING_APP_ID` at such an app, or set
`DIFY_E2E_REASONING_PROVISION=1` to import the `reasoning-chat.yml` fixture
(its system prompt forces a `<think>` block, so any chat model triggers the
separated path — no dedicated reasoning model required).
## Running tests

View File

@ -0,0 +1,120 @@
# Chatflow that exercises separated-mode reasoning (PR #37460): the LLM node sets
# reasoning_format=separated, so the server strips <think>...</think> from the
# answer and streams the chain-of-thought on the out-of-band `reasoning_chunk`
# channel instead. The system prompt forces a <think> block, so the separated
# path triggers with any chat model — no dedicated reasoning model required.
#
# NOTE: the LLM node leaves model.provider/name empty and relies on the target
# workspace's configured default chat model. The run-app-reasoning E2E suite is
# gated on DIFY_E2E_REASONING_APP_ID, so it is skipped unless a server with a
# working model is wired up.
app:
description: e2e-test reasoning (separated mode)
icon: 🧠
icon_background: '#FFEAD5'
icon_type: emoji
mode: advanced-chat
name: reasoning-bot
use_icon_as_answer_icon: false
dependencies: []
kind: app
version: 0.6.0
workflow:
conversation_variables: []
environment_variables: []
features:
file_upload: {}
opening_statement: ''
retriever_resource:
enabled: true
sensitive_word_avoidance:
enabled: false
speech_to_text:
enabled: false
suggested_questions: []
suggested_questions_after_answer:
enabled: false
text_to_speech:
enabled: false
language: ''
voice: ''
graph:
edges:
- id: start-llm
source: '1755189262236'
sourceHandle: source
target: llm
targetHandle: target
- id: llm-answer
source: llm
sourceHandle: source
target: answer
targetHandle: target
nodes:
- data:
desc: ''
title: Start
type: start
variables: []
id: '1755189262236'
position:
x: 80
y: 282
sourcePosition: right
targetPosition: left
type: custom
- data:
context:
enabled: false
variable_selector: []
desc: ''
memory:
query_prompt_template: '{{#sys.query#}}'
window:
enabled: false
size: 10
model:
completion_params:
temperature: 0.7
mode: chat
name: ''
provider: ''
prompt_template:
- role: system
text: >-
You are a helpful assistant. Always reason step by step INSIDE a
single <think>...</think> block first, then write the final
answer AFTER the closing </think> tag. The final answer must not
contain any <think> tags.
reasoning_format: separated
selected: false
title: LLM
type: llm
variables: []
vision:
enabled: false
id: llm
position:
x: 380
y: 282
sourcePosition: right
targetPosition: left
type: custom
- data:
answer: '{{#llm.text#}}'
desc: ''
title: Answer
type: answer
variables: []
id: answer
position:
x: 680
y: 282
sourcePosition: right
targetPosition: left
type: custom
viewport:
x: 0
y: 0
zoom: 1
rag_pipeline_variables: []

View File

@ -37,6 +37,9 @@
* DIFY_E2E_HITL_EXTERNAL_APP_ID
* DIFY_E2E_HITL_SINGLE_ACTION_APP_ID
* DIFY_E2E_HITL_MULTI_NODE_APP_ID
* DIFY_E2E_REASONING_APP_ID Override separated-reasoning chatflow app ID
* DIFY_E2E_REASONING_PROVISION=1 Opt in to auto-provisioning reasoning-chat.yml
* (needs a workspace default chat model)
*/
/** Supported edition values. */
@ -74,6 +77,12 @@ export type E2EEnv = {
fileAppId: string
/** Chat app (advanced-chat) with a file input variable */
fileChatAppId: string
/**
* Chatflow whose LLM node uses reasoning_format=separated. Empty unless
* DIFY_E2E_REASONING_APP_ID is set or the fixture is auto-provisioned; the
* run-app-reasoning suite is skipped when empty.
*/
reasoningAppId: string
/**
* Secondary workspace ID EE only ("auto_test1").
* Empty in CE mode (CE has a single workspace).
@ -118,6 +127,7 @@ export type E2ECapabilities = {
workflowAppId: string
fileAppId: string
fileChatAppId: string
reasoningAppId: string
hitlAppId: string
hitlExternalAppId: string
hitlSingleActionAppId: string
@ -171,6 +181,7 @@ export function loadE2EEnv(): E2EEnv {
hitlMultiNodeAppId: process.env.DIFY_E2E_HITL_MULTI_NODE_APP_ID ?? '',
fileAppId: process.env.DIFY_E2E_FILE_APP_ID ?? '',
fileChatAppId: process.env.DIFY_E2E_FILE_CHAT_APP_ID ?? '',
reasoningAppId: process.env.DIFY_E2E_REASONING_APP_ID ?? '',
ws2Id: process.env.DIFY_E2E_WS2_ID ?? '',
ws2AppId: process.env.DIFY_E2E_WS2_APP_ID ?? '',
email: process.env.DIFY_E2E_EMAIL!,
@ -206,6 +217,7 @@ export function resolveEnv(caps: E2ECapabilities | undefined): E2EEnv {
workflowAppId: caps.workflowAppId || env.workflowAppId,
fileAppId: caps.fileAppId || env.fileAppId,
fileChatAppId: caps.fileChatAppId || env.fileChatAppId,
reasoningAppId: caps.reasoningAppId || env.reasoningAppId,
hitlAppId: caps.hitlAppId || env.hitlAppId,
hitlExternalAppId: caps.hitlExternalAppId || env.hitlExternalAppId,
hitlSingleActionAppId: caps.hitlSingleActionAppId || env.hitlSingleActionAppId,

View File

@ -182,6 +182,7 @@ export async function setup(project: TestProject): Promise<void> {
workflowAppId: '',
fileAppId: '',
fileChatAppId: '',
reasoningAppId: '',
hitlAppId: '',
hitlExternalAppId: '',
hitlSingleActionAppId: '',
@ -288,6 +289,7 @@ export async function setup(project: TestProject): Promise<void> {
workflowAppId: provisionedIds.DIFY_E2E_WORKFLOW_APP_ID || E.workflowAppId,
fileAppId: provisionedIds.DIFY_E2E_FILE_APP_ID || E.fileAppId,
fileChatAppId: provisionedIds.DIFY_E2E_FILE_CHAT_APP_ID || E.fileChatAppId,
reasoningAppId: provisionedIds.DIFY_E2E_REASONING_APP_ID || E.reasoningAppId,
hitlAppId: provisionedIds.DIFY_E2E_HITL_APP_ID || E.hitlAppId,
hitlExternalAppId: provisionedIds.DIFY_E2E_HITL_EXTERNAL_APP_ID || E.hitlExternalAppId,
hitlSingleActionAppId: provisionedIds.DIFY_E2E_HITL_SINGLE_ACTION_APP_ID || E.hitlSingleActionAppId,
@ -503,6 +505,12 @@ async function provisionApps(
['hitl-single-action.yml', 'DIFY_E2E_HITL_SINGLE_ACTION_APP_ID', primaryWsId],
['hitl-multi-node.yml', 'DIFY_E2E_HITL_MULTI_NODE_APP_ID', primaryWsId],
['file-chat.yml', 'DIFY_E2E_FILE_CHAT_APP_ID', primaryWsId],
// reasoning-chat.yml runs a real LLM node, so it is opt-in: provisioning it
// requires the workspace to have a default chat model configured. Off by
// default to keep the shared bootstrap free of any model dependency.
...(process.env.DIFY_E2E_REASONING_PROVISION === '1'
? [['reasoning-chat.yml', 'DIFY_E2E_REASONING_APP_ID', primaryWsId] as [string, string, string]]
: []),
...(edition === 'ee'
? [['ws2-workflow.yml', 'DIFY_E2E_WS2_APP_ID', secondaryWsId] as [string, string, string]]
: []),

View File

@ -0,0 +1,91 @@
/**
* E2E: difyctl run app separated-mode reasoning (PR #37460)
*
* Exercises the out-of-band `reasoning_chunk` SSE channel against a real server.
* Requires a chatflow whose LLM node uses reasoning_format=separated AND a
* workspace with a configured chat model. The whole suite is skipped unless
* DIFY_E2E_REASONING_APP_ID resolves (set it directly, or provision the
* reasoning-chat.yml fixture with DIFY_E2E_REASONING_PROVISION=1).
*
* Verifies the client adaptation:
* - --think surfaces the separated reasoning to stderr, framed as <think></think>
* - the answer (stdout) stays free of <think>
* - -o json persists the reasoning under metadata.reasoning
* - without --think, reasoning stays hidden
*/
import type { AuthFixture } from '../../helpers/cli.js'
import { afterEach, beforeEach, describe, expect, inject } from 'vitest'
import { assertExitCode, assertJson, assertStderrContains } from '../../helpers/assert.js'
import { registerConversation } from '../../helpers/cleanup-registry.js'
import { withAuthFixture } from '../../helpers/cli.js'
import { withRetry } from '../../helpers/retry.js'
import { optionalIt } from '../../helpers/skip.js'
import { resolveEnv } from '../../setup/env.js'
// @ts-expect-error — see test/e2e/helpers/vitest-context.ts for explanation
const caps = inject('e2eCapabilities') as import('../../setup/env.js').E2ECapabilities
const E = resolveEnv(caps)
// Skipped unless a separated-reasoning chatflow is wired up (needs a real model).
const reasoningIt = optionalIt(Boolean(E.reasoningAppId))
const QUERY = 'In one short sentence, why is the sky blue?'
describe('E2E / difyctl run app — separated reasoning', () => {
let fx: AuthFixture
beforeEach(async () => {
fx = await withAuthFixture(E)
})
afterEach(async () => {
await fx.cleanup()
})
reasoningIt('[P1] --think --stream surfaces reasoning on stderr, clean answer on stdout', async () => {
const result = await withRetry(
() => fx.r(['run', 'app', E.reasoningAppId, QUERY, '--think', '--stream']),
{ attempts: 3, delayMs: 1000 },
)
assertExitCode(result, 0)
expect(result.stdout.trim().length).toBeGreaterThan(0)
// Separated mode keeps the answer free of <think>; reasoning is framed on stderr.
expect(result.stdout).not.toContain('<think>')
assertStderrContains(result, '<think>')
})
reasoningIt('[P1] --think -o json persists reasoning under metadata.reasoning', async () => {
const result = await withRetry(
() => fx.r(['run', 'app', E.reasoningAppId, QUERY, '--think', '-o', 'json']),
{ attempts: 3, delayMs: 1000 },
)
assertExitCode(result, 0)
const parsed = assertJson<{
conversation_id?: string
answer: string
metadata?: { reasoning?: Record<string, string> }
}>(result)
if (parsed.conversation_id)
registerConversation(E.host, E.token, E.reasoningAppId, parsed.conversation_id)
const reasoning = parsed.metadata?.reasoning ?? {}
expect(Object.keys(reasoning).length).toBeGreaterThan(0)
expect(Object.values(reasoning).join('').length).toBeGreaterThan(0)
// --think also echoes the separated reasoning to stderr.
assertStderrContains(result, '<think>')
})
reasoningIt('[P1] without --think, reasoning stays hidden', async () => {
const result = await withRetry(
() => fx.r(['run', 'app', E.reasoningAppId, QUERY, '--stream']),
{ attempts: 3, delayMs: 1000 },
)
assertExitCode(result, 0)
expect(result.stdout.trim().length).toBeGreaterThan(0)
expect(result.stderr).not.toContain('<think>')
})
})

View File

@ -15,6 +15,8 @@ export type Scenario
| 'server-version-unsupported'
| 'run-422-stale'
| 'workflow-think'
| 'chat-reasoning'
| 'workflow-reasoning'
| 'import-pending'
| 'import-failed'

View File

@ -370,6 +370,32 @@ export function buildApp(getScenario: () => Scenario, state?: MockState): Hono {
])
return new Response(thinkSse, { status: 200, headers: { 'content-type': 'text/event-stream' } })
}
if (scenario === 'chat-reasoning') {
// Separated mode: reasoning streams out-of-band on `reasoning_chunk` (nested
// under `data`), the answer stays free of <think>, and the terminal reasoning
// is persisted into message_end metadata.
const reasoningSse = sseChunks([
{ event: 'reasoning_chunk', data: { data: { message_id: 'msg-1', reasoning: 'secret reasoning', node_id: 'llm-1', is_final: false } } },
{ event: 'reasoning_chunk', data: { data: { message_id: 'msg-1', reasoning: '', node_id: 'llm-1', is_final: true } } },
{ event: 'message', data: { message_id: 'msg-1', conversation_id: 'conv-1', mode: app.mode, answer: 'final answer' } },
{ event: 'message_end', data: { message_id: 'msg-1', conversation_id: 'conv-1', task_id: 'task-1', metadata: { reasoning: { 'llm-1': 'secret reasoning' } } } },
])
return new Response(reasoningSse, { status: 200, headers: { 'content-type': 'text/event-stream' } })
}
if (scenario === 'workflow-reasoning') {
// Separated mode in a workflow: reasoning streams out-of-band on
// `reasoning_chunk` (no message_id), outputs stay clean, and there is NO
// persisted metadata — the live deltas are the only source.
const wfReasoningSse = sseChunks([
{ event: 'workflow_started', data: { id: 'wf-run-1', workflow_id: 'wf-1' } },
{ event: 'node_started', data: { id: 'llm-1', title: 'LLM' } },
{ event: 'reasoning_chunk', data: { data: { reasoning: 'secret reasoning', node_id: 'llm-1', is_final: false } } },
{ event: 'reasoning_chunk', data: { data: { reasoning: '', node_id: 'llm-1', is_final: true } } },
{ event: 'node_finished', data: { id: 'llm-1', status: 'succeeded' } },
{ event: 'workflow_finished', data: { id: 'wf-run-1', workflow_id: 'wf-1', data: { id: 'wf-run-1', status: 'succeeded', outputs: { result: 'final answer' } } } },
])
return new Response(wfReasoningSse, { status: 200, headers: { 'content-type': 'text/event-stream' } })
}
const sse = streamingRunResponse(app.mode, query, isAgent)
return new Response(sse, { status: 200, headers: { 'content-type': 'text/event-stream' } })
})

View File

@ -40,6 +40,7 @@ const createHandlers = () => ({
handleWorkflowAgentLog: vi.fn(),
handleWorkflowTextChunk: vi.fn(),
handleWorkflowTextReplace: vi.fn(),
handleWorkflowReasoning: vi.fn(),
handleWorkflowPaused: vi.fn(),
})
@ -252,6 +253,7 @@ describe('useWorkflowRun callbacks helpers', () => {
callbacks.onAgentLog?.({ node_id: 'node-1' } as never)
callbacks.onTextChunk?.({ data: 'chunk' } as never)
callbacks.onTextReplace?.({ text: 'replacement' } as never)
callbacks.onReasoning?.({ data: { reasoning: 'thinking', node_id: 'node-1' } } as never)
callbacks.onHumanInputRequired?.({ node_id: 'node-1' } as never)
callbacks.onHumanInputFormFilled?.({ node_id: 'node-1' } as never)
callbacks.onHumanInputFormTimeout?.({ node_id: 'node-1' } as never)
@ -295,6 +297,7 @@ describe('useWorkflowRun callbacks helpers', () => {
expect(userCallbacks.onAgentLog).toHaveBeenCalled()
expect(handlers.handleWorkflowTextChunk).toHaveBeenCalled()
expect(handlers.handleWorkflowTextReplace).toHaveBeenCalled()
expect(handlers.handleWorkflowReasoning).toHaveBeenCalled()
expect(handlers.handleWorkflowNodeHumanInputRequired).toHaveBeenCalled()
expect(userCallbacks.onHumanInputRequired).toHaveBeenCalled()
expect(handlers.handleWorkflowNodeHumanInputFormFilled).toHaveBeenCalled()
@ -423,6 +426,7 @@ describe('useWorkflowRun callbacks helpers', () => {
finalCallbacks.onAgentLog?.({ node_id: 'node-1' } as never)
finalCallbacks.onTextChunk?.({ data: 'chunk' } as never)
finalCallbacks.onTextReplace?.({ text: 'replacement' } as never)
finalCallbacks.onReasoning?.({ data: { reasoning: 'thinking', node_id: 'node-1' } } as never)
finalCallbacks.onHumanInputRequired?.({ node_id: 'node-1' } as never)
finalCallbacks.onHumanInputFormFilled?.({ node_id: 'node-1' } as never)
finalCallbacks.onHumanInputFormTimeout?.({ node_id: 'node-1' } as never)
@ -461,6 +465,7 @@ describe('useWorkflowRun callbacks helpers', () => {
expect(userCallbacks.onAgentLog).toHaveBeenCalled()
expect(handlers.handleWorkflowTextChunk).toHaveBeenCalled()
expect(handlers.handleWorkflowTextReplace).toHaveBeenCalled()
expect(handlers.handleWorkflowReasoning).toHaveBeenCalled()
expect(handlers.handleWorkflowNodeHumanInputRequired).toHaveBeenCalled()
expect(userCallbacks.onHumanInputRequired).toHaveBeenCalled()
expect(handlers.handleWorkflowNodeHumanInputFormFilled).toHaveBeenCalled()

View File

@ -27,6 +27,7 @@ type WorkflowRunEventHandlers = {
handleWorkflowAgentLog: NonNullable<IOtherOptions['onAgentLog']>
handleWorkflowTextChunk: NonNullable<IOtherOptions['onTextChunk']>
handleWorkflowTextReplace: NonNullable<IOtherOptions['onTextReplace']>
handleWorkflowReasoning: NonNullable<IOtherOptions['onReasoning']>
handleWorkflowPaused: () => void
}
@ -114,6 +115,7 @@ export const createBaseWorkflowRunCallbacks = ({
handleWorkflowAgentLog,
handleWorkflowTextChunk,
handleWorkflowTextReplace,
handleWorkflowReasoning,
handleWorkflowPaused,
} = handlers
const {
@ -244,6 +246,9 @@ export const createBaseWorkflowRunCallbacks = ({
onTextReplace: (params) => {
handleWorkflowTextReplace(params)
},
onReasoning: (params) => {
handleWorkflowReasoning(params)
},
onTTSChunk: (messageId: string, audio: string) => {
if (!audio || audio === '')
return
@ -325,6 +330,7 @@ export const createFinalWorkflowRunCallbacks = ({
handleWorkflowAgentLog,
handleWorkflowTextChunk,
handleWorkflowTextReplace,
handleWorkflowReasoning,
handleWorkflowPaused,
} = handlers
const {
@ -439,6 +445,9 @@ export const createFinalWorkflowRunCallbacks = ({
onTextReplace: (params) => {
handleWorkflowTextReplace(params)
},
onReasoning: (params) => {
handleWorkflowReasoning(params)
},
onTTSChunk: (messageId: string, audio: string) => {
if (!audio || audio === '')
return

View File

@ -78,6 +78,8 @@ export const createRunningWorkflowState = () => {
},
tracing: [],
resultText: '',
reasoningContent: {},
reasoningFinished: false,
}
}

View File

@ -138,6 +138,7 @@ const useWorkflowRunBase = (doSyncWorkflowDraft: DoSyncWorkflowDraft) => {
handleWorkflowAgentLog,
handleWorkflowTextChunk,
handleWorkflowTextReplace,
handleWorkflowReasoning,
handleWorkflowPaused,
} = useWorkflowRunEvent()
@ -326,6 +327,7 @@ const useWorkflowRunBase = (doSyncWorkflowDraft: DoSyncWorkflowDraft) => {
handleWorkflowAgentLog,
handleWorkflowTextChunk,
handleWorkflowTextReplace,
handleWorkflowReasoning,
handleWorkflowPaused,
}
const userCallbacks = {
@ -443,7 +445,7 @@ const useWorkflowRunBase = (doSyncWorkflowDraft: DoSyncWorkflowDraft) => {
},
finalCallbacks,
)
}, [store, doSyncWorkflowDraft, workflowStore, pathname, handleWorkflowFailed, flowId, handleWorkflowStarted, handleWorkflowFinished, fetchInspectVars, invalidAllLastRun, invalidateRunHistory, handleWorkflowNodeStarted, handleWorkflowNodeFinished, handleWorkflowNodeIterationStarted, handleWorkflowNodeIterationNext, handleWorkflowNodeIterationFinished, handleWorkflowNodeLoopStarted, handleWorkflowNodeLoopNext, handleWorkflowNodeLoopFinished, handleWorkflowNodeRetry, handleWorkflowAgentLog, handleWorkflowTextChunk, handleWorkflowTextReplace, handleWorkflowPaused, handleWorkflowNodeHumanInputRequired, handleWorkflowNodeHumanInputFormFilled, handleWorkflowNodeHumanInputFormTimeout])
}, [store, doSyncWorkflowDraft, workflowStore, pathname, handleWorkflowFailed, flowId, handleWorkflowStarted, handleWorkflowFinished, fetchInspectVars, invalidAllLastRun, invalidateRunHistory, handleWorkflowNodeStarted, handleWorkflowNodeFinished, handleWorkflowNodeIterationStarted, handleWorkflowNodeIterationNext, handleWorkflowNodeIterationFinished, handleWorkflowNodeLoopStarted, handleWorkflowNodeLoopNext, handleWorkflowNodeLoopFinished, handleWorkflowNodeRetry, handleWorkflowAgentLog, handleWorkflowTextChunk, handleWorkflowTextReplace, handleWorkflowReasoning, handleWorkflowPaused, handleWorkflowNodeHumanInputRequired, handleWorkflowNodeHumanInputFormFilled, handleWorkflowNodeHumanInputFormTimeout])
const handleStopRun = useCallback((taskId: string) => {
const setStoppedState = () => {

View File

@ -0,0 +1,61 @@
import type { ReasoningChunkResponse } from '@/types/workflow'
import { baseRunningData, renderWorkflowHook } from '../../../__tests__/workflow-test-env'
import { useWorkflowReasoning } from '../use-workflow-reasoning'
const reasoningChunk = (data: Partial<ReasoningChunkResponse['data']>): ReasoningChunkResponse => ({
task_id: 'task-1',
event: 'reasoning_chunk',
data: { message_id: '', reasoning: '', ...data },
})
describe('useWorkflowReasoning', () => {
it('accumulates reasoning deltas per LLM node id', () => {
const { result, store } = renderWorkflowHook(() => useWorkflowReasoning(), {
initialStoreState: {
workflowRunningData: baseRunningData({ resultText: '' }),
},
})
result.current.handleWorkflowReasoning(reasoningChunk({ reasoning: 'let me ', node_id: 'llm' }))
result.current.handleWorkflowReasoning(reasoningChunk({ reasoning: 'think', node_id: 'llm' }))
const state = store.getState().workflowRunningData!
expect(state.reasoningContent).toEqual({ llm: 'let me think' })
expect(state.reasoningFinished).toBeFalsy()
})
it('keeps reasoning from multiple LLM nodes in separate buckets', () => {
const { result, store } = renderWorkflowHook(() => useWorkflowReasoning(), {
initialStoreState: { workflowRunningData: baseRunningData({ resultText: '' }) },
})
result.current.handleWorkflowReasoning(reasoningChunk({ reasoning: 'a', node_id: 'llm-1' }))
result.current.handleWorkflowReasoning(reasoningChunk({ reasoning: 'b', node_id: 'llm-2' }))
expect(store.getState().workflowRunningData!.reasoningContent).toEqual({ 'llm-1': 'a', 'llm-2': 'b' })
})
it('falls back to "_" when the chunk carries no node id', () => {
const { result, store } = renderWorkflowHook(() => useWorkflowReasoning(), {
initialStoreState: { workflowRunningData: baseRunningData({ resultText: '' }) },
})
result.current.handleWorkflowReasoning(reasoningChunk({ reasoning: 'x' }))
expect(store.getState().workflowRunningData!.reasoningContent).toEqual({ _: 'x' })
})
it('marks reasoning finished on the terminal marker without appending empty text', () => {
const { result, store } = renderWorkflowHook(() => useWorkflowReasoning(), {
initialStoreState: {
workflowRunningData: baseRunningData({ resultText: '', reasoningContent: { llm: 'done' } }),
},
})
result.current.handleWorkflowReasoning(reasoningChunk({ reasoning: '', node_id: 'llm', is_final: true }))
const state = store.getState().workflowRunningData!
expect(state.reasoningContent).toEqual({ llm: 'done' })
expect(state.reasoningFinished).toBe(true)
})
})

View File

@ -16,6 +16,7 @@ const handlers = vi.hoisted(() => ({
handleWorkflowNodeRetry: vi.fn(),
handleWorkflowTextChunk: vi.fn(),
handleWorkflowTextReplace: vi.fn(),
handleWorkflowReasoning: vi.fn(),
handleWorkflowAgentLog: vi.fn(),
handleWorkflowPaused: vi.fn(),
handleWorkflowNodeHumanInputRequired: vi.fn(),
@ -45,6 +46,10 @@ vi.mock('..', () => ({
useWorkflowNodeHumanInputFormTimeout: () => ({ handleWorkflowNodeHumanInputFormTimeout: handlers.handleWorkflowNodeHumanInputFormTimeout }),
}))
vi.mock('../use-workflow-reasoning', () => ({
useWorkflowReasoning: () => ({ handleWorkflowReasoning: handlers.handleWorkflowReasoning }),
}))
describe('useWorkflowRunEvent', () => {
it('returns the composed handlers from all workflow event hooks', () => {
const { result } = renderHook(() => useWorkflowRunEvent())

View File

@ -0,0 +1,30 @@
import type { ReasoningChunkResponse } from '@/types/workflow'
import { produce } from 'immer'
import { useCallback } from 'react'
import { useWorkflowStore } from '@/app/components/workflow/store'
export const useWorkflowReasoning = () => {
const workflowStore = useWorkflowStore()
const handleWorkflowReasoning = useCallback((params: ReasoningChunkResponse) => {
const { data: { reasoning, node_id, is_final } } = params
const {
workflowRunningData,
setWorkflowRunningData,
} = workflowStore.getState()
setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
const reasoningContent = (draft.reasoningContent ||= {})
// key by LLM node so multiple nodes' reasoning stays separated
const key = node_id || '_'
if (reasoning)
reasoningContent[key] = (reasoningContent[key] || '') + reasoning
if (is_final)
draft.reasoningFinished = true
}))
}, [workflowStore])
return {
handleWorkflowReasoning,
}
}

View File

@ -19,6 +19,7 @@ import {
useWorkflowTextChunk,
useWorkflowTextReplace,
} from '.'
import { useWorkflowReasoning } from './use-workflow-reasoning'
export const useWorkflowRunEvent = () => {
const { handleWorkflowStarted } = useWorkflowStarted()
@ -35,6 +36,7 @@ export const useWorkflowRunEvent = () => {
const { handleWorkflowNodeRetry } = useWorkflowNodeRetry()
const { handleWorkflowTextChunk } = useWorkflowTextChunk()
const { handleWorkflowTextReplace } = useWorkflowTextReplace()
const { handleWorkflowReasoning } = useWorkflowReasoning()
const { handleWorkflowAgentLog } = useWorkflowAgentLog()
const { handleWorkflowPaused } = useWorkflowPaused()
const { handleWorkflowNodeHumanInputRequired } = useWorkflowNodeHumanInputRequired()
@ -56,6 +58,7 @@ export const useWorkflowRunEvent = () => {
handleWorkflowNodeRetry,
handleWorkflowTextChunk,
handleWorkflowTextReplace,
handleWorkflowReasoning,
handleWorkflowAgentLog,
handleWorkflowPaused,
handleWorkflowNodeHumanInputFormFilled,

View File

@ -71,6 +71,12 @@ vi.mock('@/app/components/workflow/run/tracing-panel', () => ({
default: ({ list }: { list: unknown[] }) => <div data-testid="tracing-panel">{list.length}</div>,
}))
vi.mock('@/app/components/base/chat/chat/answer/reasoning-panel', () => ({
default: ({ content, done }: { content: Record<string, string>, done: boolean }) => (
<div data-testid="reasoning-panel" data-done={String(done)}>{Object.keys(content).join(',')}</div>
),
}))
vi.mock('@/app/components/workflow/panel/inputs-panel', () => ({
default: ({ onRun }: { onRun: () => void }) => (
<button type="button" onClick={onRun}>
@ -341,6 +347,80 @@ describe('WorkflowPreview', () => {
expect(screen.getByTestId('result-panel')).toBeInTheDocument()
})
it('should render a single merged reasoning panel above the result on the result tab', async () => {
const user = userEvent.setup()
renderWorkflowComponent(
<WorkflowPreview />,
{
initialStoreState: {
workflowRunningData: {
...createWorkflowRunningData({
result: createWorkflowResult({ status: WorkflowRunningStatus.Running }),
}),
resultText: '',
reasoningContent: { 'llm-1': 'thinking a', 'llm-2': 'thinking b' },
} as NonNullable<Shape['workflowRunningData']>,
},
},
)
await user.click(screen.getByText('runLog.result'))
// one panel that carries both nodes' reasoning; still running → timer keeps ticking
const panels = screen.getAllByTestId('reasoning-panel')
expect(panels).toHaveLength(1)
expect(panels[0]).toHaveTextContent('llm-1,llm-2')
expect(panels[0]).toHaveAttribute('data-done', 'false')
})
it('should mark reasoning done once the answer starts streaming while still running', async () => {
const user = userEvent.setup()
renderWorkflowComponent(
<WorkflowPreview />,
{
initialStoreState: {
workflowRunningData: {
...createWorkflowRunningData({
result: createWorkflowResult({ status: WorkflowRunningStatus.Running }),
}),
resultText: 'the answer',
reasoningContent: { 'llm-1': 'thinking a' },
} as NonNullable<Shape['workflowRunningData']>,
},
},
)
await user.click(screen.getByText('runLog.result'))
// answer-started (resultText non-empty) freezes the timer even though the run is still Running
expect(screen.getByTestId('reasoning-panel')).toHaveAttribute('data-done', 'true')
})
it('should not render a reasoning panel when there is no reasoning content', async () => {
const user = userEvent.setup()
renderWorkflowComponent(
<WorkflowPreview />,
{
initialStoreState: {
workflowRunningData: {
...createWorkflowRunningData({
result: createWorkflowResult({ status: WorkflowRunningStatus.Running }),
}),
resultText: '',
reasoningContent: { llm: '' },
} as NonNullable<Shape['workflowRunningData']>,
},
},
)
await user.click(screen.getByText('runLog.result'))
expect(screen.queryByTestId('reasoning-panel')).not.toBeInTheDocument()
})
it('should switch to the tracing tab when result panel requests it', async () => {
const user = userEvent.setup()

View File

@ -10,6 +10,7 @@ import {
useState,
} from 'react'
import { useTranslation } from 'react-i18next'
import ReasoningPanel from '@/app/components/base/chat/chat/answer/reasoning-panel'
import Loading from '@/app/components/base/loading'
import { submitHumanInputForm } from '@/service/workflow'
import {
@ -203,6 +204,17 @@ const WorkflowPreview = () => {
humanInputFilledFormDataList={humanInputFilledFormDataList}
/>
)}
{workflowRunningData?.reasoningContent && Object.values(workflowRunningData.reasoningContent).some(Boolean) && (
<ReasoningPanel
content={workflowRunningData.reasoningContent}
// freeze the timer once the answer starts streaming — reasoningFinished and status only flip at run end
done={
!!workflowRunningData?.resultText?.trim()
|| !!workflowRunningData?.reasoningFinished
|| workflowRunningData?.result?.status !== WorkflowRunningStatus.Running
}
/>
)}
<ResultText
isRunning={workflowRunningData?.result?.status === WorkflowRunningStatus.Running || !workflowRunningData?.result}
isPaused={workflowRunningData?.result?.status === WorkflowRunningStatus.Paused}

View File

@ -1,6 +1,7 @@
'use client'
import type { FC } from 'react'
import { useTranslation } from 'react-i18next'
import { ChatContextProvider } from '@/app/components/base/chat/chat/context-provider'
import LoadingAnim from '@/app/components/base/chat/chat/loading-anim'
import { FileList } from '@/app/components/base/file-uploader'
import { ImageIndentLeft } from '@/app/components/base/icons/src/vender/line/editor'
@ -60,7 +61,10 @@ const ResultText: FC<ResultTextProps> = ({
<>
{outputs && (
<div className="px-4 py-2">
<Markdown content={outputs} />
{/* ThinkBlock's timer reads isResponding from ChatContext, which the run panel otherwise lacks. */}
<ChatContextProvider chatList={[]} isResponding={!!isRunning}>
<Markdown content={outputs} />
</ChatContextProvider>
</div>
)}
{!!allFiles?.length && allFiles.map(item => (

View File

@ -11,6 +11,10 @@ type PreviewRunningData = WorkflowRunningData & {
resultTabActive?: boolean
resultText?: string
resultTextSelectorKey?: string
// separated-mode reasoning deltas per LLM node id (live preview only)
reasoningContent?: Record<string, string>
// true once the terminal reasoning marker arrived
reasoningFinished?: boolean
// human input form schema or data cached when node is in 'Paused' status
extraContentAndFormData?: Record<string, unknown>
}