From 94d365ea5e9dc4d461b692d7f6fe8707c3705369 Mon Sep 17 00:00:00 2001 From: L1nSn0w Date: Wed, 24 Jun 2026 16:02:28 +0800 Subject: [PATCH] feat: surface separated-mode LLM reasoning in CLI and workflow run preview (#37828) --- .../apps/workflow/generate_task_pipeline.py | 19 +++ api/core/app/entities/task_entities.py | 3 +- .../test_generate_task_pipeline_core.py | 75 ++++++++++ .../app/apps/test_workflow_app_runner_core.py | 14 ++ .../test_generate_task_pipeline_core.py | 37 +++++ cli/src/commands/resume/app/index.ts | 2 +- .../app/_strategies/streaming-structured.ts | 8 ++ cli/src/commands/run/app/index.ts | 2 +- cli/src/commands/run/app/run.test.ts | 79 +++++++++++ .../commands/run/app/sse-collector.test.ts | 68 ++++++++++ cli/src/commands/run/app/sse-collector.ts | 37 ++++- .../commands/run/app/stream-handlers.test.ts | 92 +++++++++++++ cli/src/commands/run/app/stream-handlers.ts | 25 ++++ cli/src/sys/io/reasoning.test.ts | 128 ++++++++++++++++++ cli/src/sys/io/reasoning.ts | 99 ++++++++++++++ cli/test/e2e/.env.e2e.example | 12 ++ cli/test/e2e/README.md | 29 +++- cli/test/e2e/fixtures/apps/reasoning-chat.yml | 120 ++++++++++++++++ cli/test/e2e/setup/env.ts | 12 ++ cli/test/e2e/setup/global-setup.ts | 8 ++ .../e2e/suites/run/run-app-reasoning.e2e.ts | 91 +++++++++++++ cli/test/fixtures/dify-mock/scenarios.ts | 2 + cli/test/fixtures/dify-mock/server.ts | 26 ++++ .../use-workflow-run-callbacks.spec.ts | 5 + .../hooks/use-workflow-run-callbacks.ts | 9 ++ .../hooks/use-workflow-run-utils.ts | 2 + .../workflow-app/hooks/use-workflow-run.ts | 4 +- .../__tests__/use-workflow-reasoning.spec.ts | 61 +++++++++ .../__tests__/use-workflow-run-event.spec.ts | 5 + .../use-workflow-reasoning.ts | 30 ++++ .../use-workflow-run-event.ts | 3 + .../panel/__tests__/workflow-preview.spec.tsx | 80 +++++++++++ .../workflow/panel/workflow-preview.tsx | 12 ++ .../components/workflow/run/result-text.tsx | 6 +- .../workflow/store/workflow/workflow-slice.ts | 4 + 35 files changed, 1198 insertions(+), 11 deletions(-) create mode 100644 cli/src/sys/io/reasoning.test.ts create mode 100644 cli/src/sys/io/reasoning.ts create mode 100644 cli/test/e2e/fixtures/apps/reasoning-chat.yml create mode 100644 cli/test/e2e/suites/run/run-app-reasoning.e2e.ts create mode 100644 web/app/components/workflow/hooks/use-workflow-run-event/__tests__/use-workflow-reasoning.spec.ts create mode 100644 web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-reasoning.ts diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index e52e1e9c9da..6ca4053d7de 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -31,6 +31,7 @@ from core.app.entities.queue_entities import ( QueueNodeStartedEvent, QueueNodeSucceededEvent, QueuePingEvent, + QueueReasoningChunkEvent, QueueStopEvent, QueueTextChunkEvent, QueueWorkflowFailedEvent, @@ -47,6 +48,7 @@ from core.app.entities.task_entities import ( MessageAudioEndStreamResponse, MessageAudioStreamResponse, PingStreamResponse, + ReasoningChunkStreamResponse, StreamResponse, TextChunkStreamResponse, WorkflowAppBlockingResponse, @@ -571,6 +573,22 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport): yield self._text_chunk_to_stream_response(delta_text, from_variable_selector=event.from_variable_selector) + def _handle_reasoning_chunk_event( + self, event: QueueReasoningChunkEvent, **kwargs + ) -> Generator[StreamResponse, None, None]: + """Handle reasoning chunk events.""" + # is_final with empty reasoning is still forwarded as the "thinking finished" signal + if not event.reasoning and not event.is_final: + return + yield ReasoningChunkStreamResponse( + task_id=self._application_generate_entity.task_id, + data=ReasoningChunkStreamResponse.Data( + reasoning=event.reasoning, + node_id=event.from_node_id, + is_final=event.is_final, + ), + ) + def _handle_agent_log_event(self, event: QueueAgentLogEvent, **kwargs) -> Generator[StreamResponse, None, None]: """Handle agent log events.""" yield self._workflow_response_converter.handle_agent_log( @@ -600,6 +618,7 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport): QueuePingEvent: self._handle_ping_event, QueueErrorEvent: self._handle_error_event, QueueTextChunkEvent: self._handle_text_chunk_event, + QueueReasoningChunkEvent: self._handle_reasoning_chunk_event, # Workflow events QueueWorkflowStartedEvent: self._handle_workflow_started_event, QueueWorkflowSucceededEvent: self._handle_workflow_succeeded_event, diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index f98fe6fb0be..ca5a26db55b 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -743,7 +743,8 @@ class ReasoningChunkStreamResponse(StreamResponse): Data entity """ - message_id: str + # chat apps set this; workflow runs have no message + message_id: str | None = None reasoning: str node_id: str | None = None is_final: bool = False diff --git a/api/tests/unit_tests/core/app/apps/advanced_chat/test_generate_task_pipeline_core.py b/api/tests/unit_tests/core/app/apps/advanced_chat/test_generate_task_pipeline_core.py index 28f416ac27f..4285fe088c6 100644 --- a/api/tests/unit_tests/core/app/apps/advanced_chat/test_generate_task_pipeline_core.py +++ b/api/tests/unit_tests/core/app/apps/advanced_chat/test_generate_task_pipeline_core.py @@ -29,6 +29,7 @@ from core.app.entities.queue_entities import ( QueueNodeExceptionEvent, QueueNodeFailedEvent, QueuePingEvent, + QueueReasoningChunkEvent, QueueRetrieverResourcesEvent, QueueStopEvent, QueueTextChunkEvent, @@ -46,6 +47,7 @@ from core.app.entities.task_entities import ( MessageAudioStreamResponse, MessageEndStreamResponse, PingStreamResponse, + ReasoningChunkStreamResponse, ) from core.base.tts.app_generator_tts_publisher import AudioTrunk from core.workflow.system_variables import build_system_variables @@ -196,6 +198,42 @@ class TestAdvancedChatGenerateTaskPipeline: assert pipeline._task_state.answer == "hi" assert responses + def test_handle_reasoning_chunk_event_emits_on_nonempty(self): + pipeline = _make_pipeline() + event = QueueReasoningChunkEvent(reasoning="pondering", from_node_id="llm-1", is_final=False) + + responses = list(pipeline._handle_reasoning_chunk_event(event)) + + assert len(responses) == 1 + response = responses[0] + assert isinstance(response, ReasoningChunkStreamResponse) + assert response.data.message_id == pipeline._message_id + assert response.data.reasoning == "pondering" + assert response.data.node_id == "llm-1" + assert response.data.is_final is False + # reasoning never touches the answer stream + assert pipeline._task_state.answer == "" + + def test_handle_reasoning_chunk_event_drops_empty_nonfinal(self): + pipeline = _make_pipeline() + event = QueueReasoningChunkEvent(reasoning="", from_node_id="llm-1", is_final=False) + + responses = list(pipeline._handle_reasoning_chunk_event(event)) + + assert responses == [] + + def test_handle_reasoning_chunk_event_emits_empty_final_marker(self): + pipeline = _make_pipeline() + event = QueueReasoningChunkEvent(reasoning="", from_node_id="llm-1", is_final=True) + + responses = list(pipeline._handle_reasoning_chunk_event(event)) + + assert len(responses) == 1 + response = responses[0] + assert isinstance(response, ReasoningChunkStreamResponse) + assert response.data.reasoning == "" + assert response.data.is_final is True + def test_listen_audio_msg_returns_audio_stream(self): pipeline = _make_pipeline() publisher = SimpleNamespace(check_and_get_audio=lambda: AudioTrunk(status="stream", audio="data")) @@ -319,6 +357,43 @@ class TestAdvancedChatGenerateTaskPipeline: assert responses == ["done"] assert pipeline._recorded_files + def test_handle_node_succeeded_event_records_llm_reasoning(self): + pipeline = _make_pipeline() + pipeline._workflow_response_converter.fetch_files_from_node_outputs = lambda outputs: [] + pipeline._workflow_response_converter.workflow_node_finish_to_stream_response = lambda **kwargs: "done" + pipeline._save_output_for_event = lambda event, node_execution_id: None + + event = SimpleNamespace( + node_type=BuiltinNodeTypes.LLM, + outputs={"reasoning_content": "first pass "}, + node_execution_id="exec", + node_id="llm-1", + ) + + list(pipeline._handle_node_succeeded_event(event)) + + assert pipeline._task_state.metadata.reasoning == {"llm-1": "first pass "} + + def test_handle_node_succeeded_event_accumulates_reasoning_across_passes(self): + pipeline = _make_pipeline() + pipeline._workflow_response_converter.fetch_files_from_node_outputs = lambda outputs: [] + pipeline._workflow_response_converter.workflow_node_finish_to_stream_response = lambda **kwargs: "done" + pipeline._save_output_for_event = lambda event, node_execution_id: None + + def _llm_event(reasoning: str): + return SimpleNamespace( + node_type=BuiltinNodeTypes.LLM, + outputs={"reasoning_content": reasoning}, + node_execution_id="exec", + node_id="llm-1", + ) + + # Same node id across iteration/loop passes must accumulate, not overwrite. + list(pipeline._handle_node_succeeded_event(_llm_event("pass one "))) + list(pipeline._handle_node_succeeded_event(_llm_event("pass two"))) + + assert pipeline._task_state.metadata.reasoning == {"llm-1": "pass one pass two"} + def test_iteration_and_loop_handlers(self): pipeline = _make_pipeline() pipeline._workflow_run_id = "run-id" diff --git a/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_core.py b/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_core.py index c463c155a52..69ed5919d27 100644 --- a/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_core.py +++ b/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_core.py @@ -16,6 +16,7 @@ from core.app.entities.queue_entities import ( QueueNodeFailedEvent, QueueNodeRetryEvent, QueueNodeSucceededEvent, + QueueReasoningChunkEvent, QueueTextChunkEvent, QueueWorkflowPausedEvent, QueueWorkflowStartedEvent, @@ -34,6 +35,7 @@ from graphon.graph_events import ( NodeRunHumanInputFormFilledEvent, NodeRunIterationSucceededEvent, NodeRunLoopFailedEvent, + NodeRunReasoningChunkEvent, NodeRunRetryEvent, NodeRunStartedEvent, NodeRunStreamChunkEvent, @@ -395,6 +397,17 @@ class TestWorkflowBasedAppRunner: is_final=False, ), ) + runner._handle_event( + workflow_entry, + NodeRunReasoningChunkEvent( + id="exec", + node_id="node", + node_type=BuiltinNodeTypes.LLM, + selector=["node", "reasoning_content"], + chunk="thinking", + is_final=False, + ), + ) runner._handle_event( workflow_entry, NodeRunAgentLogEvent( @@ -442,6 +455,7 @@ class TestWorkflowBasedAppRunner: ) assert any(isinstance(event, QueueTextChunkEvent) for event in published) + assert any(isinstance(event, QueueReasoningChunkEvent) for event in published) assert any(isinstance(event, QueueAgentLogEvent) for event in published) assert any(isinstance(event, QueueIterationCompletedEvent) for event in published) assert any(isinstance(event, QueueLoopCompletedEvent) for event in published) diff --git a/api/tests/unit_tests/core/app/apps/workflow/test_generate_task_pipeline_core.py b/api/tests/unit_tests/core/app/apps/workflow/test_generate_task_pipeline_core.py index 0aaee900e37..9a04014d620 100644 --- a/api/tests/unit_tests/core/app/apps/workflow/test_generate_task_pipeline_core.py +++ b/api/tests/unit_tests/core/app/apps/workflow/test_generate_task_pipeline_core.py @@ -26,6 +26,7 @@ from core.app.entities.queue_entities import ( QueueNodeStartedEvent, QueueNodeSucceededEvent, QueuePingEvent, + QueueReasoningChunkEvent, QueueStopEvent, QueueTextChunkEvent, QueueWorkflowFailedEvent, @@ -40,6 +41,7 @@ from core.app.entities.task_entities import ( MessageAudioEndStreamResponse, MessageAudioStreamResponse, PingStreamResponse, + ReasoningChunkStreamResponse, WorkflowAppPausedBlockingResponse, WorkflowFinishStreamResponse, WorkflowStartStreamResponse, @@ -265,6 +267,41 @@ class TestWorkflowGenerateTaskPipeline: assert responses[0].data.text == "hi" assert published == [queue_message] + def test_handle_reasoning_chunk_event_emits_on_nonempty(self): + pipeline = _make_pipeline() + event = QueueReasoningChunkEvent(reasoning="pondering", from_node_id="llm-1", is_final=False) + + responses = list(pipeline._handle_reasoning_chunk_event(event)) + + assert len(responses) == 1 + response = responses[0] + assert isinstance(response, ReasoningChunkStreamResponse) + # workflow runs have no message, so the id is omitted + assert response.data.message_id is None + assert response.data.reasoning == "pondering" + assert response.data.node_id == "llm-1" + assert response.data.is_final is False + + def test_handle_reasoning_chunk_event_drops_empty_nonfinal(self): + pipeline = _make_pipeline() + event = QueueReasoningChunkEvent(reasoning="", from_node_id="llm-1", is_final=False) + + responses = list(pipeline._handle_reasoning_chunk_event(event)) + + assert responses == [] + + def test_handle_reasoning_chunk_event_emits_empty_final_marker(self): + pipeline = _make_pipeline() + event = QueueReasoningChunkEvent(reasoning="", from_node_id="llm-1", is_final=True) + + responses = list(pipeline._handle_reasoning_chunk_event(event)) + + assert len(responses) == 1 + response = responses[0] + assert isinstance(response, ReasoningChunkStreamResponse) + assert response.data.reasoning == "" + assert response.data.is_final is True + def test_dispatch_event_handles_node_failed(self): pipeline = _make_pipeline() pipeline._workflow_response_converter.workflow_node_finish_to_stream_response = lambda **kwargs: "done" diff --git a/cli/src/commands/resume/app/index.ts b/cli/src/commands/resume/app/index.ts index 596654181b5..c4bbfbf2f56 100644 --- a/cli/src/commands/resume/app/index.ts +++ b/cli/src/commands/resume/app/index.ts @@ -29,7 +29,7 @@ export default class ResumeApp extends DifyCommand { 'workspace': Flags.string({ description: 'workspace id override' }), 'with-history': Flags.boolean({ description: 'Replay executed-node history before attaching to live stream.', default: false }), 'stream': Flags.boolean({ description: 'Print output live as tokens/events arrive. Default: collect and print at end.', default: false }), - 'think': Flags.boolean({ description: 'Show model thinking/reasoning when available. Strips ... blocks silently by default; with --think, thinking is printed to stderr.', default: false }), + 'think': Flags.boolean({ description: 'Show model thinking/reasoning when available — both inline ... blocks and separated reasoning streams. Hidden by default; with --think, thinking is printed to stderr.', default: false }), 'output': Flags.outputFormat({ options: [OutputFormat.JSON, OutputFormat.YAML, OutputFormat.TEXT], default: '' }), 'http-retry': httpRetryFlag, } diff --git a/cli/src/commands/run/app/_strategies/streaming-structured.ts b/cli/src/commands/run/app/_strategies/streaming-structured.ts index 3ed602a3410..de39bfda2e5 100644 --- a/cli/src/commands/run/app/_strategies/streaming-structured.ts +++ b/cli/src/commands/run/app/_strategies/streaming-structured.ts @@ -7,6 +7,7 @@ import { collect, HitlPauseError } from '@/commands/run/app/sse-collector' import { formatted, stringifyOutput } from '@/framework/output' import { handle, unhandle } from '@/sys/index' import { colorEnabled, colorScheme } from '@/sys/io/color' +import { reasoningBlocksFromMetadata } from '@/sys/io/reasoning' import { startSpinner } from '@/sys/io/spinner' import { extractThinkBlocks, filterThinkInOutputs, stripThinkBlocks } from '@/sys/io/think-filter' @@ -99,6 +100,13 @@ export class StreamingStructuredStrategy implements RunStrategy { } } + // Surface separated-mode reasoning (carried in message_end metadata) to stderr under --think. + if (ctx.think) { + const reasoningBlocks = reasoningBlocksFromMetadata(processedResp.metadata) + if (reasoningBlocks !== '') + deps.io.err.write(`${reasoningBlocks}\n`) + } + const respMode = typeof processedResp.mode === 'string' && processedResp.mode !== '' ? processedResp.mode : mode deps.io.out.write(stringifyOutput(formatted({ format, data: newAppRunObject(respMode, processedResp) }))) if (isText && CHAT_MODES.has(respMode)) { diff --git a/cli/src/commands/run/app/index.ts b/cli/src/commands/run/app/index.ts index 815d708d9d8..6b63e17e81e 100644 --- a/cli/src/commands/run/app/index.ts +++ b/cli/src/commands/run/app/index.ts @@ -35,7 +35,7 @@ export default class RunApp extends DifyCommand { 'workflow-id': Flags.string({ description: 'Pin to a specific published workflow version' }), 'workspace': Flags.string({ description: 'Workspace id (overrides DIFY_WORKSPACE_ID and stored default)' }), 'stream': Flags.boolean({ description: 'Print output live as tokens/events arrive (default: collect and print at end)', default: false }), - 'think': Flags.boolean({ description: 'Show model thinking/reasoning when available. Strips ... blocks silently by default; with --think, thinking is printed to stderr.', default: false }), + 'think': Flags.boolean({ description: 'Show model thinking/reasoning when available — both inline ... blocks and separated reasoning streams. Hidden by default; with --think, thinking is printed to stderr.', default: false }), 'retry-on-limit': Flags.boolean({ description: 'On a 429 rate limit, wait and retry this POST (bounded) instead of failing immediately. Off by default since running an app is not idempotent.', default: false }), 'http-retry': httpRetryFlag, 'output': Flags.outputFormat({ options: [OutputFormat.JSON, OutputFormat.YAML, OutputFormat.TEXT], default: '' }), diff --git a/cli/src/commands/run/app/run.test.ts b/cli/src/commands/run/app/run.test.ts index 57b02aeb47d..a4e201c8ee1 100644 --- a/cli/src/commands/run/app/run.test.ts +++ b/cli/src/commands/run/app/run.test.ts @@ -203,6 +203,85 @@ describe('runApp', () => { expect(io.errBuf()).toContain('secret reasoning') }) + it('--stream chat --think: routes separated reasoning to stderr, clean answer to stdout', async () => { + mock.setScenario('chat-reasoning') + const io = bufferStreams() + const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) }) + await runApp( + { appId: 'app-1', message: 'hi', stream: true, think: true }, + { active: active(), http: testHttpClient(mock.url, 'dfoa_test'), host: mock.url, io, cache }, + ) + expect(io.outBuf()).toContain('final answer') + expect(io.outBuf()).not.toContain('secret reasoning') + expect(io.errBuf()).toContain('') + expect(io.errBuf()).toContain('secret reasoning') + }) + + it('--stream chat without --think: separated reasoning stays hidden', async () => { + mock.setScenario('chat-reasoning') + const io = bufferStreams() + const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) }) + await runApp( + { appId: 'app-1', message: 'hi', stream: true }, + { active: active(), http: testHttpClient(mock.url, 'dfoa_test'), host: mock.url, io, cache }, + ) + expect(io.outBuf()).toContain('final answer') + expect(io.errBuf()).not.toContain('secret reasoning') + }) + + it('chat -o json --think: echoes separated reasoning to stderr, persists it in metadata', async () => { + mock.setScenario('chat-reasoning') + const io = bufferStreams() + const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) }) + await runApp( + { appId: 'app-1', message: 'hi', format: 'json', think: true }, + { active: active(), http: testHttpClient(mock.url, 'dfoa_test'), host: mock.url, io, cache }, + ) + expect(io.errBuf()).toContain('secret reasoning') + const parsed = JSON.parse(io.outBuf()) as { answer: string, metadata: { reasoning: Record } } + expect(parsed.answer).toBe('final answer') + expect(parsed.metadata.reasoning).toEqual({ 'llm-1': 'secret reasoning' }) + }) + + it('--stream workflow --think: routes separated reasoning to stderr, clean outputs to stdout', async () => { + mock.setScenario('workflow-reasoning') + const io = bufferStreams() + const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) }) + await runApp( + { appId: 'app-2', inputs: { x: '1' }, stream: true, think: true }, + { active: active(), http: testHttpClient(mock.url, 'dfoa_test'), host: mock.url, io, cache }, + ) + expect(io.errBuf()).toContain('') + expect(io.errBuf()).toContain('secret reasoning') + expect(io.outBuf()).toContain('final answer') + expect(io.outBuf()).not.toContain('secret reasoning') + }) + + it('workflow -o json --think: echoes reasoning to stderr, accumulates into metadata.reasoning', async () => { + mock.setScenario('workflow-reasoning') + const io = bufferStreams() + const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) }) + await runApp( + { appId: 'app-2', inputs: { x: '1' }, format: 'json', think: true }, + { active: active(), http: testHttpClient(mock.url, 'dfoa_test'), host: mock.url, io, cache }, + ) + expect(io.errBuf()).toContain('secret reasoning') + const parsed = JSON.parse(io.outBuf()) as { metadata: { reasoning: Record } } + expect(parsed.metadata.reasoning).toEqual({ 'llm-1': 'secret reasoning' }) + }) + + it('--stream workflow without --think: reasoning stays hidden', async () => { + mock.setScenario('workflow-reasoning') + const io = bufferStreams() + const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) }) + await runApp( + { appId: 'app-2', inputs: { x: '1' }, stream: true }, + { active: active(), http: testHttpClient(mock.url, 'dfoa_test'), host: mock.url, io, cache }, + ) + expect(io.outBuf()).toContain('final answer') + expect(io.errBuf()).not.toContain('secret reasoning') + }) + it('stream-error scenario: error event surfaces typed BaseError', async () => { mock.setScenario('stream-error') const io = bufferStreams() diff --git a/cli/src/commands/run/app/sse-collector.test.ts b/cli/src/commands/run/app/sse-collector.test.ts index e92d3694d3f..d26abb5608b 100644 --- a/cli/src/commands/run/app/sse-collector.test.ts +++ b/cli/src/commands/run/app/sse-collector.test.ts @@ -59,6 +59,41 @@ describe('collect — chat', () => { }) }) +describe('collect — chat separated reasoning', () => { + function reasoningEvent(reasoning: string, isFinal: boolean) { + return ev('reasoning_chunk', { data: { message_id: 'm1', reasoning, node_id: 'llm-1', is_final: isFinal } }) + } + + it('backfills metadata.reasoning from live deltas when the server omits it', async () => { + const got = await collect(iterOf( + reasoningEvent('pon', false), + reasoningEvent('dering', true), + ev('message', { message_id: 'm1', answer: 'answer' }), + ev('message_end', { metadata: { usage: { tokens: 3 } } }), + ), 'advanced-chat') + expect(got.answer).toBe('answer') + expect((got.metadata as { reasoning?: unknown }).reasoning).toEqual({ 'llm-1': 'pondering' }) + expect((got.metadata as { usage?: unknown }).usage).toEqual({ tokens: 3 }) + }) + + it('keeps the server-persisted reasoning over live deltas', async () => { + const got = await collect(iterOf( + reasoningEvent('live', true), + ev('message', { answer: 'a' }), + ev('message_end', { metadata: { reasoning: { 'llm-1': 'persisted' } } }), + ), 'advanced-chat') + expect((got.metadata as { reasoning?: unknown }).reasoning).toEqual({ 'llm-1': 'persisted' }) + }) + + it('leaves metadata untouched when there is no reasoning at all', async () => { + const got = await collect(iterOf( + ev('message', { answer: 'a' }), + ev('message_end', { metadata: { usage: { tokens: 1 } } }), + ), 'advanced-chat') + expect((got.metadata as { reasoning?: unknown }).reasoning).toBeUndefined() + }) +}) + describe('collect — agent-chat', () => { it('captures agent_thoughts', async () => { const got = await collect(iterOf( @@ -97,6 +132,39 @@ describe('collect — workflow', () => { }) }) +describe('collect — workflow separated reasoning', () => { + function wfReasoning(reasoning: string, nodeId: string, isFinal: boolean) { + return ev('reasoning_chunk', { data: { reasoning, node_id: nodeId, is_final: isFinal } }) + } + + it('accumulates reasoning_chunk per node into metadata.reasoning', async () => { + const got = await collect(iterOf( + ev('node_started', { id: 'llm-1' }), + wfReasoning('pon', 'llm-1', false), + wfReasoning('dering', 'llm-1', true), + ev('workflow_finished', { data: { status: 'succeeded', outputs: { result: 'clean' } } }), + ), 'workflow') + expect((got.data as { outputs: { result: string } }).outputs.result).toBe('clean') + expect((got.metadata as { reasoning?: unknown }).reasoning).toEqual({ 'llm-1': 'pondering' }) + }) + + it('keys reasoning by node, leaves metadata absent when there is none', async () => { + const got = await collect(iterOf( + ev('workflow_finished', { data: { status: 'succeeded', outputs: { result: 'clean' } } }), + ), 'workflow') + expect((got.metadata as { reasoning?: unknown } | undefined)?.reasoning).toBeUndefined() + }) + + it('merges reasoning into metadata already carried by workflow_finished', async () => { + const got = await collect(iterOf( + wfReasoning('think', 'llm-1', true), + ev('workflow_finished', { data: { status: 'succeeded' }, metadata: { usage: { tokens: 7 } } }), + ), 'workflow') + expect((got.metadata as { reasoning?: unknown }).reasoning).toEqual({ 'llm-1': 'think' }) + expect((got.metadata as { usage?: unknown }).usage).toEqual({ tokens: 7 }) + }) +}) + describe('collect — error event', () => { it('throws BaseError when error event arrives', async () => { await expect(collect(iterOf( diff --git a/cli/src/commands/run/app/sse-collector.ts b/cli/src/commands/run/app/sse-collector.ts index 043a9690d5f..b3b3a33b06c 100644 --- a/cli/src/commands/run/app/sse-collector.ts +++ b/cli/src/commands/run/app/sse-collector.ts @@ -2,6 +2,7 @@ import type { BaseError } from '@/errors/base' import type { SseEvent } from '@/http/sse' import { HttpClientError, newError } from '@/errors/base' import { ErrorCode } from '@/errors/codes' +import { accumulateReasoning, parseReasoningChunk } from '@/sys/io/reasoning' import { RUN_MODES } from './handlers' export type HitlPauseData = { @@ -67,6 +68,7 @@ class ChatCollector implements Collector { private base: Record = {} private metadata: Record | undefined private thoughts: unknown[] = [] + private readonly reasoning: Record = {} private readonly mode: string private readonly isAgent: boolean constructor(mode: string, isAgent: boolean) { @@ -84,6 +86,13 @@ class ChatCollector implements Collector { copyScalar(this.base, c, ['id', 'conversation_id', 'message_id', 'task_id', 'created_at']) return } + // Accumulate separated-mode reasoning deltas per LLM node. + case 'reasoning_chunk': { + const chunk = parseReasoningChunk(c) + if (chunk !== undefined) + accumulateReasoning(this.reasoning, chunk) + return + } case 'agent_thought': this.thoughts.push(c) return @@ -98,12 +107,23 @@ class ChatCollector implements Collector { const out: Record = { mode: this.mode, answer: this.answer, ...this.base } if (this.metadata !== undefined) out.metadata = this.metadata + // Fall back to live deltas only when the server didn't persist reasoning in metadata. + if (Object.keys(this.reasoning).length > 0 && !hasReasoning(this.metadata)) + out.metadata = { ...(this.metadata ?? {}), reasoning: this.reasoning } if (this.isAgent || this.thoughts.length > 0) out.agent_thoughts = this.thoughts return out } } +function hasReasoning(metadata: Record | undefined): boolean { + const reasoning = metadata?.reasoning + return reasoning !== null + && typeof reasoning === 'object' + && !Array.isArray(reasoning) + && Object.keys(reasoning as object).length > 0 +} + class CompletionCollector implements Collector { private answer = '' private base: Record = {} @@ -133,14 +153,29 @@ class CompletionCollector implements Collector { class WorkflowCollector implements Collector { private final: Record | undefined + private readonly reasoning: Record = {} consume(ev: SseEvent): void { + if (ev.name === 'reasoning_chunk') { + const chunk = parseReasoningChunk(parseJson(ev.data)) + if (chunk !== undefined) + accumulateReasoning(this.reasoning, chunk) + return + } if (ev.name !== 'workflow_finished') return this.final = parseJson(ev.data) } finalize(): Record { - return { mode: RUN_MODES.Workflow, ...(this.final ?? {}) } + const out: Record = { mode: RUN_MODES.Workflow, ...(this.final ?? {}) } + // Workflow runs don't persist reasoning; surface live deltas under metadata.reasoning. + if (Object.keys(this.reasoning).length > 0) { + const existing = (out.metadata !== null && typeof out.metadata === 'object' && !Array.isArray(out.metadata)) + ? out.metadata as Record + : undefined + out.metadata = { ...(existing ?? {}), reasoning: this.reasoning } + } + return out } } diff --git a/cli/src/commands/run/app/stream-handlers.test.ts b/cli/src/commands/run/app/stream-handlers.test.ts index bab45144a01..ec9cd957bb5 100644 --- a/cli/src/commands/run/app/stream-handlers.test.ts +++ b/cli/src/commands/run/app/stream-handlers.test.ts @@ -37,6 +37,42 @@ describe('streamPrinterFor — chat', () => { }) }) +function reasoningEvent(reasoning: string, isFinal: boolean) { + return ev('reasoning_chunk', { data: { message_id: 'm1', reasoning, node_id: 'llm-1', is_final: isFinal } }) +} + +describe('streamPrinterFor — chat separated reasoning', () => { + it('think: true frames reasoning_chunk deltas to stderr, answer stays clean on stdout', () => { + const sp = streamPrinterFor('advanced-chat', true) + const cap = captures() + sp.onEvent(cap.out, cap.err, reasoningEvent('pon', false)) + sp.onEvent(cap.out, cap.err, reasoningEvent('dering', false)) + sp.onEvent(cap.out, cap.err, reasoningEvent('', true)) + sp.onEvent(cap.out, cap.err, ev('message', { conversation_id: 'c1', answer: 'final answer' })) + sp.onEnd(cap.out, cap.err) + expect(cap.errBuf()).toContain(' [llm-1]\npondering') + expect(cap.outBuf()).toBe('final answer\n') + }) + + it('think: false ignores reasoning_chunk entirely', () => { + const sp = streamPrinterFor('advanced-chat', false) + const cap = captures() + sp.onEvent(cap.out, cap.err, reasoningEvent('secret', true)) + sp.onEvent(cap.out, cap.err, ev('message', { answer: 'hi' })) + sp.onEnd(cap.out, cap.err) + expect(cap.errBuf()).not.toContain('secret') + expect(cap.outBuf()).toBe('hi\n') + }) + + it('closes an unterminated reasoning block on stream end', () => { + const sp = streamPrinterFor('advanced-chat', true) + const cap = captures() + sp.onEvent(cap.out, cap.err, reasoningEvent('thinking', false)) + sp.onEnd(cap.out, cap.err) + expect(cap.errBuf()).toContain(' [llm-1]\nthinking') + }) +}) + describe('streamPrinterFor — agent-chat', () => { it('writes agent_thought to stderr', () => { const sp = streamPrinterFor('agent-chat') @@ -105,6 +141,62 @@ describe('streamPrinterFor — workflow think filtering', () => { }) }) +// Workflow reasoning_chunk events carry no message_id (workflow runs have no message). +function wfReasoning(reasoning: string, nodeId: string, isFinal: boolean) { + return ev('reasoning_chunk', { data: { reasoning, node_id: nodeId, is_final: isFinal } }) +} + +describe('streamPrinterFor — workflow separated reasoning', () => { + it('think: true frames reasoning_chunk to stderr, outputs stay clean on stdout', () => { + const sp = streamPrinterFor('workflow', true) + const cap = captures() + sp.onEvent(cap.out, cap.err, ev('node_started', { id: 'llm-1', title: 'LLM' })) + sp.onEvent(cap.out, cap.err, wfReasoning('pon', 'llm-1', false)) + sp.onEvent(cap.out, cap.err, wfReasoning('dering', 'llm-1', false)) + sp.onEvent(cap.out, cap.err, wfReasoning('', 'llm-1', true)) + sp.onEvent(cap.out, cap.err, ev('workflow_finished', { data: { outputs: { result: 'clean answer' } } })) + sp.onEnd(cap.out, cap.err) + // node title precedes the reasoning block for attribution + expect(cap.errBuf()).toContain('→ LLM') + expect(cap.errBuf()).toContain(' [llm-1]\npondering') + const parsed = JSON.parse(cap.outBuf().trim()) as { result: string } + expect(parsed.result).toBe('clean answer') + }) + + it('think: false drops reasoning_chunk entirely', () => { + const sp = streamPrinterFor('workflow', false) + const cap = captures() + sp.onEvent(cap.out, cap.err, wfReasoning('secret', 'llm-1', true)) + sp.onEvent(cap.out, cap.err, ev('workflow_finished', { data: { outputs: { result: 'ok' } } })) + sp.onEnd(cap.out, cap.err) + expect(cap.errBuf()).not.toContain('secret') + const parsed = JSON.parse(cap.outBuf().trim()) as { result: string } + expect(parsed.result).toBe('ok') + }) + + it('closes an unterminated reasoning block on stream end', () => { + const sp = streamPrinterFor('workflow', true) + const cap = captures() + sp.onEvent(cap.out, cap.err, wfReasoning('thinking', 'llm-1', false)) + sp.onEnd(cap.out, cap.err) + expect(cap.errBuf()).toContain(' [llm-1]\nthinking') + }) + + it('keeps interleaved parallel-node reasoning in separate node-tagged blocks', () => { + const sp = streamPrinterFor('workflow', true) + const cap = captures() + sp.onEvent(cap.out, cap.err, wfReasoning('a1', 'llm-1', false)) + sp.onEvent(cap.out, cap.err, wfReasoning('b1', 'llm-2', false)) + sp.onEvent(cap.out, cap.err, wfReasoning('a2', 'llm-1', true)) + sp.onEvent(cap.out, cap.err, wfReasoning('b2', 'llm-2', true)) + sp.onEvent(cap.out, cap.err, ev('workflow_finished', { data: { outputs: { result: 'ok' } } })) + sp.onEnd(cap.out, cap.err) + expect(cap.errBuf()).toBe( + ' [llm-1]\na1\n [llm-2]\nb1\n [llm-1]\na2\n [llm-2]\nb2\n', + ) + }) +}) + describe('streamPrinterFor — unknown mode', () => { it('throws', () => { expect(() => streamPrinterFor('whatever')).toThrow() diff --git a/cli/src/commands/run/app/stream-handlers.ts b/cli/src/commands/run/app/stream-handlers.ts index 1955d5996bf..7ce96cf53c6 100644 --- a/cli/src/commands/run/app/stream-handlers.ts +++ b/cli/src/commands/run/app/stream-handlers.ts @@ -4,6 +4,7 @@ import type { SseEvent } from '@/http/sse' import { newError } from '@/errors/base' import { ErrorCode } from '@/errors/codes' import { colorEnabled, colorScheme } from '@/sys/io/color' +import { parseReasoningChunk, ReasoningChunkRenderer } from '@/sys/io/reasoning' import { filterThinkInOutputs, ThinkChunkFilter } from '@/sys/io/think-filter' import { RUN_MODES } from './handlers' import { HitlPauseError } from './sse-collector' @@ -43,9 +44,12 @@ function handleCommonEvents(ev: SseEvent): boolean { class ChatStreamPrinter implements StreamPrinter { private convoId = '' private readonly filter: ThinkChunkFilter + private readonly reasoning = new ReasoningChunkRenderer() + private readonly think: boolean private readonly isTTY: boolean constructor(think: boolean, isTTY = false) { this.filter = new ThinkChunkFilter(think) + this.think = think this.isTTY = isTTY } @@ -62,6 +66,15 @@ class ChatStreamPrinter implements StreamPrinter { this.convoId = c.conversation_id return } + // Stream separated-mode reasoning to stderr under --think. + case 'reasoning_chunk': { + if (!this.think) + return + const chunk = parseReasoningChunk(c) + if (chunk !== undefined) + this.reasoning.push(chunk, errOut) + return + } case 'agent_thought': if (typeof c.thought === 'string' && c.thought !== '') errOut.write(`thought: ${c.thought}\n`) @@ -73,6 +86,7 @@ class ChatStreamPrinter implements StreamPrinter { } onEnd(out: NodeJS.WritableStream, errOut: NodeJS.WritableStream): void { + this.reasoning.flush(errOut) this.filter.flush(out, errOut) out.write('\n') if (this.convoId !== '') { @@ -106,6 +120,7 @@ class CompletionStreamPrinter implements StreamPrinter { class WorkflowStreamPrinter implements StreamPrinter { private final: Record | undefined + private readonly reasoning = new ReasoningChunkRenderer() private readonly think: boolean constructor(think: boolean) { this.think = think @@ -124,6 +139,15 @@ class WorkflowStreamPrinter implements StreamPrinter { errOut.write(`→ ${title}\n`) return } + // Stream separated-mode reasoning to stderr under --think; the prior → title attributes the node. + case 'reasoning_chunk': { + if (!this.think) + return + const chunk = parseReasoningChunk(c) + if (chunk !== undefined) + this.reasoning.push(chunk, errOut) + return + } case 'node_finished': { const status = typeof c.status === 'string' ? c.status : '' if (status !== '' && status !== 'succeeded') { @@ -138,6 +162,7 @@ class WorkflowStreamPrinter implements StreamPrinter { } onEnd(out: NodeJS.WritableStream, errOut: NodeJS.WritableStream): void { + this.reasoning.flush(errOut) if (this.final === undefined) return const data = this.final.data diff --git a/cli/src/sys/io/reasoning.test.ts b/cli/src/sys/io/reasoning.test.ts new file mode 100644 index 00000000000..1f6887a6108 --- /dev/null +++ b/cli/src/sys/io/reasoning.test.ts @@ -0,0 +1,128 @@ +import { Buffer } from 'node:buffer' +import { PassThrough } from 'node:stream' +import { describe, expect, it } from 'vitest' +import { + accumulateReasoning, + formatReasoningBlocks, + parseReasoningChunk, + reasoningBlocksFromMetadata, + ReasoningChunkRenderer, +} from './reasoning' + +function capture(): { err: PassThrough, errBuf: () => string } { + const err = new PassThrough() + const ec: Buffer[] = [] + err.on('data', d => ec.push(d as Buffer)) + return { err, errBuf: () => Buffer.concat(ec).toString('utf-8') } +} + +describe('parseReasoningChunk', () => { + it('reads the payload nested under data', () => { + expect(parseReasoningChunk({ data: { reasoning: 'hi', node_id: 'llm-1', is_final: true } })) + .toEqual({ reasoning: 'hi', nodeId: 'llm-1', isFinal: true }) + }) + + it('defaults missing/wrong-typed fields', () => { + expect(parseReasoningChunk({ data: {} })).toEqual({ reasoning: '', nodeId: '', isFinal: false }) + }) + + it('returns undefined when data is absent or not an object', () => { + expect(parseReasoningChunk({})).toBeUndefined() + expect(parseReasoningChunk({ data: null })).toBeUndefined() + expect(parseReasoningChunk({ data: ['x'] })).toBeUndefined() + }) +}) + +describe('ReasoningChunkRenderer', () => { + it('frames streamed deltas with a node-tagged open/close on the terminal marker', () => { + const cap = capture() + const r = new ReasoningChunkRenderer() + r.push({ reasoning: 'pon', nodeId: 'llm-1', isFinal: false }, cap.err) + r.push({ reasoning: 'dering', nodeId: 'llm-1', isFinal: false }, cap.err) + r.push({ reasoning: '', nodeId: 'llm-1', isFinal: true }, cap.err) + expect(cap.errBuf()).toBe(' [llm-1]\npondering\n') + }) + + it('emits separate node-tagged blocks per node', () => { + const cap = capture() + const r = new ReasoningChunkRenderer() + r.push({ reasoning: 'a', nodeId: 'n1', isFinal: true }, cap.err) + r.push({ reasoning: 'b', nodeId: 'n2', isFinal: true }, cap.err) + expect(cap.errBuf()).toBe(' [n1]\na\n [n2]\nb\n') + }) + + it('tags each block with its node id so interleaved fragments stay distinguishable', () => { + const cap = capture() + const r = new ReasoningChunkRenderer() + r.push({ reasoning: 'a1', nodeId: 'n1', isFinal: false }, cap.err) + r.push({ reasoning: 'b1', nodeId: 'n2', isFinal: false }, cap.err) + r.push({ reasoning: 'a2', nodeId: 'n1', isFinal: true }, cap.err) + r.push({ reasoning: 'b2', nodeId: 'n2', isFinal: true }, cap.err) + expect(cap.errBuf()).toBe( + ' [n1]\na1\n [n2]\nb1\n [n1]\na2\n [n2]\nb2\n', + ) + }) + + it('omits the tag when the chunk carries no node id', () => { + const cap = capture() + const r = new ReasoningChunkRenderer() + r.push({ reasoning: 'plain', nodeId: '', isFinal: true }, cap.err) + expect(cap.errBuf()).toBe('\nplain\n') + }) + + it('flush closes a block left open by a truncated stream', () => { + const cap = capture() + const r = new ReasoningChunkRenderer() + r.push({ reasoning: 'half', nodeId: 'n1', isFinal: false }, cap.err) + r.flush(cap.err) + expect(cap.errBuf()).toBe(' [n1]\nhalf\n') + }) + + it('a lone terminal marker with no reasoning emits nothing', () => { + const cap = capture() + const r = new ReasoningChunkRenderer() + r.push({ reasoning: '', nodeId: 'n1', isFinal: true }, cap.err) + expect(cap.errBuf()).toBe('') + }) +}) + +describe('accumulateReasoning', () => { + it('appends deltas per node, falling back to "_" for a missing nodeId', () => { + const acc: Record = {} + accumulateReasoning(acc, { reasoning: 'a', nodeId: 'n1', isFinal: false }) + accumulateReasoning(acc, { reasoning: 'b', nodeId: 'n1', isFinal: false }) + accumulateReasoning(acc, { reasoning: 'x', nodeId: '', isFinal: false }) + expect(acc).toEqual({ n1: 'ab', _: 'x' }) + }) + + it('ignores empty reasoning', () => { + const acc: Record = {} + accumulateReasoning(acc, { reasoning: '', nodeId: 'n1', isFinal: true }) + expect(acc).toEqual({}) + }) +}) + +describe('formatReasoningBlocks', () => { + it('frames and trims each node, joined by a separator', () => { + expect(formatReasoningBlocks({ n1: ' one ', n2: 'two' })) + .toBe('\none\n\n---\n\ntwo\n') + }) + + it('skips empty entries and returns empty for no reasoning', () => { + expect(formatReasoningBlocks({ n1: ' ' })).toBe('') + expect(formatReasoningBlocks({})).toBe('') + }) +}) + +describe('reasoningBlocksFromMetadata', () => { + it('extracts reasoning from a metadata object', () => { + expect(reasoningBlocksFromMetadata({ reasoning: { n1: 'why' } })) + .toBe('\nwhy\n') + }) + + it('returns empty for tagged mode (empty reasoning) and malformed input', () => { + expect(reasoningBlocksFromMetadata({ reasoning: {} })).toBe('') + expect(reasoningBlocksFromMetadata(undefined)).toBe('') + expect(reasoningBlocksFromMetadata({ usage: { tokens: 1 } })).toBe('') + }) +}) diff --git a/cli/src/sys/io/reasoning.ts b/cli/src/sys/io/reasoning.ts new file mode 100644 index 00000000000..2a88e3ae32b --- /dev/null +++ b/cli/src/sys/io/reasoning.ts @@ -0,0 +1,99 @@ +// Renders "separated"-mode reasoning (streamed on its own `reasoning_chunk` SSE +// channel) to stderr, so --think matches inline (see think-filter.ts). + +const THINK_OPEN = '' +const THINK_CLOSE = '' + +export type ReasoningChunk = { + reasoning: string + nodeId: string + isFinal: boolean +} + +// reasoning_chunk nests its payload under `data` (not top-level like `message`). +export function parseReasoningChunk(parsed: Record): ReasoningChunk | undefined { + const data = parsed.data + if (data === null || typeof data !== 'object' || Array.isArray(data)) + return undefined + const rec = data as Record + return { + reasoning: typeof rec.reasoning === 'string' ? rec.reasoning : '', + nodeId: typeof rec.node_id === 'string' ? rec.node_id : '', + isFinal: rec.is_final === true, + } +} + +// Bucket key for a chunk; falls back to a single bucket so live rendering and +// buffered collection key reasoning the same way. +export function reasoningKey(chunk: ReasoningChunk): string { + return chunk.nodeId !== '' ? chunk.nodeId : '_' +} + +// Appends a reasoning delta to a per-node accumulator. +export function accumulateReasoning(acc: Record, chunk: ReasoningChunk): void { + if (chunk.reasoning === '') + return + const key = reasoningKey(chunk) + acc[key] = (acc[key] ?? '') + chunk.reasoning +} + +// Frames a live reasoning stream into stderr: on the first delta, +// raw deltas thereafter, on is_final. Parallel branches can interleave +// chunks from different nodes on one stream, so it keeps at most one block open, +// switches blocks on node change, and tags each block with its node id so the +// interleaved fragments stay distinguishable. +export class ReasoningChunkRenderer { + private openNode: string | undefined + + push(chunk: ReasoningChunk, errOut: NodeJS.WritableStream): void { + const key = reasoningKey(chunk) + if (chunk.reasoning !== '') { + if (this.openNode !== key) { + this.closeActive(errOut) + errOut.write(chunk.nodeId !== '' ? `${THINK_OPEN} [${chunk.nodeId}]\n` : `${THINK_OPEN}\n`) + this.openNode = key + } + errOut.write(chunk.reasoning) + } + if (chunk.isFinal && this.openNode === key) + this.closeActive(errOut) + } + + // Close a block left open by a truncated stream. + flush(errOut: NodeJS.WritableStream): void { + this.closeActive(errOut) + } + + private closeActive(errOut: NodeJS.WritableStream): void { + if (this.openNode === undefined) + return + errOut.write(`${THINK_CLOSE}\n`) + this.openNode = undefined + } +} + +// Frames fully-buffered reasoning (one entry per LLM node id) into blocks. +export function formatReasoningBlocks(reasoning: Record): string { + const blocks: string[] = [] + for (const text of Object.values(reasoning)) { + const trimmed = text.trim() + if (trimmed !== '') + blocks.push(`${THINK_OPEN}\n${trimmed}\n${THINK_CLOSE}`) + } + return blocks.join('\n---\n') +} + +// Frames per-node reasoning from a message_end `metadata` object; '' when absent. +export function reasoningBlocksFromMetadata(metadata: unknown): string { + if (metadata === null || typeof metadata !== 'object' || Array.isArray(metadata)) + return '' + const reasoning = (metadata as Record).reasoning + if (reasoning === null || typeof reasoning !== 'object' || Array.isArray(reasoning)) + return '' + const map: Record = {} + for (const [key, value] of Object.entries(reasoning as Record)) { + if (typeof value === 'string') + map[key] = value + } + return formatReasoningBlocks(map) +} diff --git a/cli/test/e2e/.env.e2e.example b/cli/test/e2e/.env.e2e.example index 53fbedadf31..85f2219daeb 100644 --- a/cli/test/e2e/.env.e2e.example +++ b/cli/test/e2e/.env.e2e.example @@ -67,3 +67,15 @@ DIFY_E2E_PASSWORD= # DIFY_E2E_HITL_SINGLE_ACTION_APP_ID= # DIFY_E2E_HITL_MULTI_NODE_APP_ID= # DIFY_E2E_WS2_APP_ID= + +# ── Separated-mode reasoning suite (opt-in) ───────────────────────────────── +# run-app-reasoning.e2e.ts is skipped unless DIFY_E2E_REASONING_APP_ID resolves. +# It needs a chatflow whose LLM node uses reasoning_format=separated AND a +# workspace with a default chat model configured. +# +# Either point at an existing app: +# DIFY_E2E_REASONING_APP_ID= +# +# …or auto-provision reasoning-chat.yml (→ app name "reasoning-bot"). Off by +# default so the shared bootstrap stays free of any model dependency. +# DIFY_E2E_REASONING_PROVISION=1 diff --git a/cli/test/e2e/README.md b/cli/test/e2e/README.md index fd3e9182507..21635b0f6f9 100644 --- a/cli/test/e2e/README.md +++ b/cli/test/e2e/README.md @@ -39,11 +39,12 @@ test/e2e/ │ ├── describe-app.e2e.ts — describe app │ └── get-app-all-workspaces.e2e.ts — get app -A ([EE] multi-workspace cases) └── run/ - ├── run-app-basic.e2e.ts — basic run, -o json, --inputs, streaming, - │ conversation, CI mode - ├── run-app-streaming.e2e.ts — Ctrl+C / error-event / chunk timing - ├── run-app-file.e2e.ts — --file upload (local + remote URL) - └── run-app-hitl.e2e.ts — HITL pause + resume + ├── run-app-basic.e2e.ts — basic run, -o json, --inputs, streaming, + │ conversation, CI mode + ├── run-app-streaming.e2e.ts — Ctrl+C / error-event / chunk timing + ├── run-app-file.e2e.ts — --file upload (local + remote URL) + ├── run-app-reasoning.e2e.ts — separated-mode reasoning (--think); opt-in + └── run-app-hitl.e2e.ts — HITL pause + resume ``` ## Edition support @@ -137,6 +138,24 @@ global-setup will: | `DIFY_E2E_HITL_SINGLE_ACTION_APP_ID` | | | `DIFY_E2E_HITL_MULTI_NODE_APP_ID` | | | `DIFY_E2E_WS2_APP_ID` | Override secondary workspace app ID (EE) | +| `DIFY_E2E_REASONING_APP_ID` | separated-reasoning chatflow app ID (opt-in) | +| `DIFY_E2E_REASONING_PROVISION` | `1` → auto-provision `reasoning-chat.yml` | + +### Separated-mode reasoning suite (opt-in) + +`run-app-reasoning.e2e.ts` verifies the out-of-band `reasoning_chunk` channel +(PR #37460): `--think` surfaces the chain-of-thought to stderr framed as +``, the answer stays clean, and `-o json` persists it under +`metadata.reasoning`. It is **skipped** unless `DIFY_E2E_REASONING_APP_ID` +resolves, because it runs a real LLM node and needs: + +1. a chatflow whose LLM node uses `reasoning_format: separated`, and +1. a workspace with a default chat model configured. + +Point `DIFY_E2E_REASONING_APP_ID` at such an app, or set +`DIFY_E2E_REASONING_PROVISION=1` to import the `reasoning-chat.yml` fixture +(its system prompt forces a `` block, so any chat model triggers the +separated path — no dedicated reasoning model required). ## Running tests diff --git a/cli/test/e2e/fixtures/apps/reasoning-chat.yml b/cli/test/e2e/fixtures/apps/reasoning-chat.yml new file mode 100644 index 00000000000..ba24be57ef5 --- /dev/null +++ b/cli/test/e2e/fixtures/apps/reasoning-chat.yml @@ -0,0 +1,120 @@ +# Chatflow that exercises separated-mode reasoning (PR #37460): the LLM node sets +# reasoning_format=separated, so the server strips ... from the +# answer and streams the chain-of-thought on the out-of-band `reasoning_chunk` +# channel instead. The system prompt forces a block, so the separated +# path triggers with any chat model — no dedicated reasoning model required. +# +# NOTE: the LLM node leaves model.provider/name empty and relies on the target +# workspace's configured default chat model. The run-app-reasoning E2E suite is +# gated on DIFY_E2E_REASONING_APP_ID, so it is skipped unless a server with a +# working model is wired up. +app: + description: e2e-test reasoning (separated mode) + icon: 🧠 + icon_background: '#FFEAD5' + icon_type: emoji + mode: advanced-chat + name: reasoning-bot + use_icon_as_answer_icon: false +dependencies: [] +kind: app +version: 0.6.0 +workflow: + conversation_variables: [] + environment_variables: [] + features: + file_upload: {} + opening_statement: '' + retriever_resource: + enabled: true + sensitive_word_avoidance: + enabled: false + speech_to_text: + enabled: false + suggested_questions: [] + suggested_questions_after_answer: + enabled: false + text_to_speech: + enabled: false + language: '' + voice: '' + graph: + edges: + - id: start-llm + source: '1755189262236' + sourceHandle: source + target: llm + targetHandle: target + - id: llm-answer + source: llm + sourceHandle: source + target: answer + targetHandle: target + nodes: + - data: + desc: '' + title: Start + type: start + variables: [] + id: '1755189262236' + position: + x: 80 + y: 282 + sourcePosition: right + targetPosition: left + type: custom + - data: + context: + enabled: false + variable_selector: [] + desc: '' + memory: + query_prompt_template: '{{#sys.query#}}' + window: + enabled: false + size: 10 + model: + completion_params: + temperature: 0.7 + mode: chat + name: '' + provider: '' + prompt_template: + - role: system + text: >- + You are a helpful assistant. Always reason step by step INSIDE a + single ... block first, then write the final + answer AFTER the closing tag. The final answer must not + contain any tags. + reasoning_format: separated + selected: false + title: LLM + type: llm + variables: [] + vision: + enabled: false + id: llm + position: + x: 380 + y: 282 + sourcePosition: right + targetPosition: left + type: custom + - data: + answer: '{{#llm.text#}}' + desc: '' + title: Answer + type: answer + variables: [] + id: answer + position: + x: 680 + y: 282 + sourcePosition: right + targetPosition: left + type: custom + viewport: + x: 0 + y: 0 + zoom: 1 + rag_pipeline_variables: [] diff --git a/cli/test/e2e/setup/env.ts b/cli/test/e2e/setup/env.ts index 3f71b514426..33f8361d46b 100644 --- a/cli/test/e2e/setup/env.ts +++ b/cli/test/e2e/setup/env.ts @@ -37,6 +37,9 @@ * DIFY_E2E_HITL_EXTERNAL_APP_ID * DIFY_E2E_HITL_SINGLE_ACTION_APP_ID * DIFY_E2E_HITL_MULTI_NODE_APP_ID + * DIFY_E2E_REASONING_APP_ID Override separated-reasoning chatflow app ID + * DIFY_E2E_REASONING_PROVISION=1 Opt in to auto-provisioning reasoning-chat.yml + * (needs a workspace default chat model) */ /** Supported edition values. */ @@ -74,6 +77,12 @@ export type E2EEnv = { fileAppId: string /** Chat app (advanced-chat) with a file input variable */ fileChatAppId: string + /** + * Chatflow whose LLM node uses reasoning_format=separated. Empty unless + * DIFY_E2E_REASONING_APP_ID is set or the fixture is auto-provisioned; the + * run-app-reasoning suite is skipped when empty. + */ + reasoningAppId: string /** * Secondary workspace ID — EE only ("auto_test1"). * Empty in CE mode (CE has a single workspace). @@ -118,6 +127,7 @@ export type E2ECapabilities = { workflowAppId: string fileAppId: string fileChatAppId: string + reasoningAppId: string hitlAppId: string hitlExternalAppId: string hitlSingleActionAppId: string @@ -171,6 +181,7 @@ export function loadE2EEnv(): E2EEnv { hitlMultiNodeAppId: process.env.DIFY_E2E_HITL_MULTI_NODE_APP_ID ?? '', fileAppId: process.env.DIFY_E2E_FILE_APP_ID ?? '', fileChatAppId: process.env.DIFY_E2E_FILE_CHAT_APP_ID ?? '', + reasoningAppId: process.env.DIFY_E2E_REASONING_APP_ID ?? '', ws2Id: process.env.DIFY_E2E_WS2_ID ?? '', ws2AppId: process.env.DIFY_E2E_WS2_APP_ID ?? '', email: process.env.DIFY_E2E_EMAIL!, @@ -206,6 +217,7 @@ export function resolveEnv(caps: E2ECapabilities | undefined): E2EEnv { workflowAppId: caps.workflowAppId || env.workflowAppId, fileAppId: caps.fileAppId || env.fileAppId, fileChatAppId: caps.fileChatAppId || env.fileChatAppId, + reasoningAppId: caps.reasoningAppId || env.reasoningAppId, hitlAppId: caps.hitlAppId || env.hitlAppId, hitlExternalAppId: caps.hitlExternalAppId || env.hitlExternalAppId, hitlSingleActionAppId: caps.hitlSingleActionAppId || env.hitlSingleActionAppId, diff --git a/cli/test/e2e/setup/global-setup.ts b/cli/test/e2e/setup/global-setup.ts index 20b23295acb..92cc29fc1f2 100644 --- a/cli/test/e2e/setup/global-setup.ts +++ b/cli/test/e2e/setup/global-setup.ts @@ -182,6 +182,7 @@ export async function setup(project: TestProject): Promise { workflowAppId: '', fileAppId: '', fileChatAppId: '', + reasoningAppId: '', hitlAppId: '', hitlExternalAppId: '', hitlSingleActionAppId: '', @@ -288,6 +289,7 @@ export async function setup(project: TestProject): Promise { workflowAppId: provisionedIds.DIFY_E2E_WORKFLOW_APP_ID || E.workflowAppId, fileAppId: provisionedIds.DIFY_E2E_FILE_APP_ID || E.fileAppId, fileChatAppId: provisionedIds.DIFY_E2E_FILE_CHAT_APP_ID || E.fileChatAppId, + reasoningAppId: provisionedIds.DIFY_E2E_REASONING_APP_ID || E.reasoningAppId, hitlAppId: provisionedIds.DIFY_E2E_HITL_APP_ID || E.hitlAppId, hitlExternalAppId: provisionedIds.DIFY_E2E_HITL_EXTERNAL_APP_ID || E.hitlExternalAppId, hitlSingleActionAppId: provisionedIds.DIFY_E2E_HITL_SINGLE_ACTION_APP_ID || E.hitlSingleActionAppId, @@ -503,6 +505,12 @@ async function provisionApps( ['hitl-single-action.yml', 'DIFY_E2E_HITL_SINGLE_ACTION_APP_ID', primaryWsId], ['hitl-multi-node.yml', 'DIFY_E2E_HITL_MULTI_NODE_APP_ID', primaryWsId], ['file-chat.yml', 'DIFY_E2E_FILE_CHAT_APP_ID', primaryWsId], + // reasoning-chat.yml runs a real LLM node, so it is opt-in: provisioning it + // requires the workspace to have a default chat model configured. Off by + // default to keep the shared bootstrap free of any model dependency. + ...(process.env.DIFY_E2E_REASONING_PROVISION === '1' + ? [['reasoning-chat.yml', 'DIFY_E2E_REASONING_APP_ID', primaryWsId] as [string, string, string]] + : []), ...(edition === 'ee' ? [['ws2-workflow.yml', 'DIFY_E2E_WS2_APP_ID', secondaryWsId] as [string, string, string]] : []), diff --git a/cli/test/e2e/suites/run/run-app-reasoning.e2e.ts b/cli/test/e2e/suites/run/run-app-reasoning.e2e.ts new file mode 100644 index 00000000000..fd5f12e0b57 --- /dev/null +++ b/cli/test/e2e/suites/run/run-app-reasoning.e2e.ts @@ -0,0 +1,91 @@ +/** + * E2E: difyctl run app — separated-mode reasoning (PR #37460) + * + * Exercises the out-of-band `reasoning_chunk` SSE channel against a real server. + * Requires a chatflow whose LLM node uses reasoning_format=separated AND a + * workspace with a configured chat model. The whole suite is skipped unless + * DIFY_E2E_REASONING_APP_ID resolves (set it directly, or provision the + * reasoning-chat.yml fixture with DIFY_E2E_REASONING_PROVISION=1). + * + * Verifies the client adaptation: + * - --think surfaces the separated reasoning to stderr, framed as + * - the answer (stdout) stays free of + * - -o json persists the reasoning under metadata.reasoning + * - without --think, reasoning stays hidden + */ + +import type { AuthFixture } from '../../helpers/cli.js' +import { afterEach, beforeEach, describe, expect, inject } from 'vitest' +import { assertExitCode, assertJson, assertStderrContains } from '../../helpers/assert.js' +import { registerConversation } from '../../helpers/cleanup-registry.js' +import { withAuthFixture } from '../../helpers/cli.js' +import { withRetry } from '../../helpers/retry.js' +import { optionalIt } from '../../helpers/skip.js' +import { resolveEnv } from '../../setup/env.js' + +// @ts-expect-error — see test/e2e/helpers/vitest-context.ts for explanation +const caps = inject('e2eCapabilities') as import('../../setup/env.js').E2ECapabilities +const E = resolveEnv(caps) + +// Skipped unless a separated-reasoning chatflow is wired up (needs a real model). +const reasoningIt = optionalIt(Boolean(E.reasoningAppId)) + +const QUERY = 'In one short sentence, why is the sky blue?' + +describe('E2E / difyctl run app — separated reasoning', () => { + let fx: AuthFixture + + beforeEach(async () => { + fx = await withAuthFixture(E) + }) + afterEach(async () => { + await fx.cleanup() + }) + + reasoningIt('[P1] --think --stream surfaces reasoning on stderr, clean answer on stdout', async () => { + const result = await withRetry( + () => fx.r(['run', 'app', E.reasoningAppId, QUERY, '--think', '--stream']), + { attempts: 3, delayMs: 1000 }, + ) + + assertExitCode(result, 0) + expect(result.stdout.trim().length).toBeGreaterThan(0) + // Separated mode keeps the answer free of ; reasoning is framed on stderr. + expect(result.stdout).not.toContain('') + assertStderrContains(result, '') + }) + + reasoningIt('[P1] --think -o json persists reasoning under metadata.reasoning', async () => { + const result = await withRetry( + () => fx.r(['run', 'app', E.reasoningAppId, QUERY, '--think', '-o', 'json']), + { attempts: 3, delayMs: 1000 }, + ) + + assertExitCode(result, 0) + const parsed = assertJson<{ + conversation_id?: string + answer: string + metadata?: { reasoning?: Record } + }>(result) + + if (parsed.conversation_id) + registerConversation(E.host, E.token, E.reasoningAppId, parsed.conversation_id) + + const reasoning = parsed.metadata?.reasoning ?? {} + expect(Object.keys(reasoning).length).toBeGreaterThan(0) + expect(Object.values(reasoning).join('').length).toBeGreaterThan(0) + // --think also echoes the separated reasoning to stderr. + assertStderrContains(result, '') + }) + + reasoningIt('[P1] without --think, reasoning stays hidden', async () => { + const result = await withRetry( + () => fx.r(['run', 'app', E.reasoningAppId, QUERY, '--stream']), + { attempts: 3, delayMs: 1000 }, + ) + + assertExitCode(result, 0) + expect(result.stdout.trim().length).toBeGreaterThan(0) + expect(result.stderr).not.toContain('') + }) +}) diff --git a/cli/test/fixtures/dify-mock/scenarios.ts b/cli/test/fixtures/dify-mock/scenarios.ts index 221ccbb6b81..2e0b74d3d54 100644 --- a/cli/test/fixtures/dify-mock/scenarios.ts +++ b/cli/test/fixtures/dify-mock/scenarios.ts @@ -15,6 +15,8 @@ export type Scenario | 'server-version-unsupported' | 'run-422-stale' | 'workflow-think' + | 'chat-reasoning' + | 'workflow-reasoning' | 'import-pending' | 'import-failed' diff --git a/cli/test/fixtures/dify-mock/server.ts b/cli/test/fixtures/dify-mock/server.ts index 4b119286dbc..766963cd0d5 100644 --- a/cli/test/fixtures/dify-mock/server.ts +++ b/cli/test/fixtures/dify-mock/server.ts @@ -370,6 +370,32 @@ export function buildApp(getScenario: () => Scenario, state?: MockState): Hono { ]) return new Response(thinkSse, { status: 200, headers: { 'content-type': 'text/event-stream' } }) } + if (scenario === 'chat-reasoning') { + // Separated mode: reasoning streams out-of-band on `reasoning_chunk` (nested + // under `data`), the answer stays free of , and the terminal reasoning + // is persisted into message_end metadata. + const reasoningSse = sseChunks([ + { event: 'reasoning_chunk', data: { data: { message_id: 'msg-1', reasoning: 'secret reasoning', node_id: 'llm-1', is_final: false } } }, + { event: 'reasoning_chunk', data: { data: { message_id: 'msg-1', reasoning: '', node_id: 'llm-1', is_final: true } } }, + { event: 'message', data: { message_id: 'msg-1', conversation_id: 'conv-1', mode: app.mode, answer: 'final answer' } }, + { event: 'message_end', data: { message_id: 'msg-1', conversation_id: 'conv-1', task_id: 'task-1', metadata: { reasoning: { 'llm-1': 'secret reasoning' } } } }, + ]) + return new Response(reasoningSse, { status: 200, headers: { 'content-type': 'text/event-stream' } }) + } + if (scenario === 'workflow-reasoning') { + // Separated mode in a workflow: reasoning streams out-of-band on + // `reasoning_chunk` (no message_id), outputs stay clean, and there is NO + // persisted metadata — the live deltas are the only source. + const wfReasoningSse = sseChunks([ + { event: 'workflow_started', data: { id: 'wf-run-1', workflow_id: 'wf-1' } }, + { event: 'node_started', data: { id: 'llm-1', title: 'LLM' } }, + { event: 'reasoning_chunk', data: { data: { reasoning: 'secret reasoning', node_id: 'llm-1', is_final: false } } }, + { event: 'reasoning_chunk', data: { data: { reasoning: '', node_id: 'llm-1', is_final: true } } }, + { event: 'node_finished', data: { id: 'llm-1', status: 'succeeded' } }, + { event: 'workflow_finished', data: { id: 'wf-run-1', workflow_id: 'wf-1', data: { id: 'wf-run-1', status: 'succeeded', outputs: { result: 'final answer' } } } }, + ]) + return new Response(wfReasoningSse, { status: 200, headers: { 'content-type': 'text/event-stream' } }) + } const sse = streamingRunResponse(app.mode, query, isAgent) return new Response(sse, { status: 200, headers: { 'content-type': 'text/event-stream' } }) }) diff --git a/web/app/components/workflow-app/hooks/__tests__/use-workflow-run-callbacks.spec.ts b/web/app/components/workflow-app/hooks/__tests__/use-workflow-run-callbacks.spec.ts index 3756381822e..000c8fa532f 100644 --- a/web/app/components/workflow-app/hooks/__tests__/use-workflow-run-callbacks.spec.ts +++ b/web/app/components/workflow-app/hooks/__tests__/use-workflow-run-callbacks.spec.ts @@ -40,6 +40,7 @@ const createHandlers = () => ({ handleWorkflowAgentLog: vi.fn(), handleWorkflowTextChunk: vi.fn(), handleWorkflowTextReplace: vi.fn(), + handleWorkflowReasoning: vi.fn(), handleWorkflowPaused: vi.fn(), }) @@ -252,6 +253,7 @@ describe('useWorkflowRun callbacks helpers', () => { callbacks.onAgentLog?.({ node_id: 'node-1' } as never) callbacks.onTextChunk?.({ data: 'chunk' } as never) callbacks.onTextReplace?.({ text: 'replacement' } as never) + callbacks.onReasoning?.({ data: { reasoning: 'thinking', node_id: 'node-1' } } as never) callbacks.onHumanInputRequired?.({ node_id: 'node-1' } as never) callbacks.onHumanInputFormFilled?.({ node_id: 'node-1' } as never) callbacks.onHumanInputFormTimeout?.({ node_id: 'node-1' } as never) @@ -295,6 +297,7 @@ describe('useWorkflowRun callbacks helpers', () => { expect(userCallbacks.onAgentLog).toHaveBeenCalled() expect(handlers.handleWorkflowTextChunk).toHaveBeenCalled() expect(handlers.handleWorkflowTextReplace).toHaveBeenCalled() + expect(handlers.handleWorkflowReasoning).toHaveBeenCalled() expect(handlers.handleWorkflowNodeHumanInputRequired).toHaveBeenCalled() expect(userCallbacks.onHumanInputRequired).toHaveBeenCalled() expect(handlers.handleWorkflowNodeHumanInputFormFilled).toHaveBeenCalled() @@ -423,6 +426,7 @@ describe('useWorkflowRun callbacks helpers', () => { finalCallbacks.onAgentLog?.({ node_id: 'node-1' } as never) finalCallbacks.onTextChunk?.({ data: 'chunk' } as never) finalCallbacks.onTextReplace?.({ text: 'replacement' } as never) + finalCallbacks.onReasoning?.({ data: { reasoning: 'thinking', node_id: 'node-1' } } as never) finalCallbacks.onHumanInputRequired?.({ node_id: 'node-1' } as never) finalCallbacks.onHumanInputFormFilled?.({ node_id: 'node-1' } as never) finalCallbacks.onHumanInputFormTimeout?.({ node_id: 'node-1' } as never) @@ -461,6 +465,7 @@ describe('useWorkflowRun callbacks helpers', () => { expect(userCallbacks.onAgentLog).toHaveBeenCalled() expect(handlers.handleWorkflowTextChunk).toHaveBeenCalled() expect(handlers.handleWorkflowTextReplace).toHaveBeenCalled() + expect(handlers.handleWorkflowReasoning).toHaveBeenCalled() expect(handlers.handleWorkflowNodeHumanInputRequired).toHaveBeenCalled() expect(userCallbacks.onHumanInputRequired).toHaveBeenCalled() expect(handlers.handleWorkflowNodeHumanInputFormFilled).toHaveBeenCalled() diff --git a/web/app/components/workflow-app/hooks/use-workflow-run-callbacks.ts b/web/app/components/workflow-app/hooks/use-workflow-run-callbacks.ts index e8820f755c5..3b1c30b385c 100644 --- a/web/app/components/workflow-app/hooks/use-workflow-run-callbacks.ts +++ b/web/app/components/workflow-app/hooks/use-workflow-run-callbacks.ts @@ -27,6 +27,7 @@ type WorkflowRunEventHandlers = { handleWorkflowAgentLog: NonNullable handleWorkflowTextChunk: NonNullable handleWorkflowTextReplace: NonNullable + handleWorkflowReasoning: NonNullable handleWorkflowPaused: () => void } @@ -114,6 +115,7 @@ export const createBaseWorkflowRunCallbacks = ({ handleWorkflowAgentLog, handleWorkflowTextChunk, handleWorkflowTextReplace, + handleWorkflowReasoning, handleWorkflowPaused, } = handlers const { @@ -244,6 +246,9 @@ export const createBaseWorkflowRunCallbacks = ({ onTextReplace: (params) => { handleWorkflowTextReplace(params) }, + onReasoning: (params) => { + handleWorkflowReasoning(params) + }, onTTSChunk: (messageId: string, audio: string) => { if (!audio || audio === '') return @@ -325,6 +330,7 @@ export const createFinalWorkflowRunCallbacks = ({ handleWorkflowAgentLog, handleWorkflowTextChunk, handleWorkflowTextReplace, + handleWorkflowReasoning, handleWorkflowPaused, } = handlers const { @@ -439,6 +445,9 @@ export const createFinalWorkflowRunCallbacks = ({ onTextReplace: (params) => { handleWorkflowTextReplace(params) }, + onReasoning: (params) => { + handleWorkflowReasoning(params) + }, onTTSChunk: (messageId: string, audio: string) => { if (!audio || audio === '') return diff --git a/web/app/components/workflow-app/hooks/use-workflow-run-utils.ts b/web/app/components/workflow-app/hooks/use-workflow-run-utils.ts index eae758cf2f8..80a09f1c67c 100644 --- a/web/app/components/workflow-app/hooks/use-workflow-run-utils.ts +++ b/web/app/components/workflow-app/hooks/use-workflow-run-utils.ts @@ -78,6 +78,8 @@ export const createRunningWorkflowState = () => { }, tracing: [], resultText: '', + reasoningContent: {}, + reasoningFinished: false, } } diff --git a/web/app/components/workflow-app/hooks/use-workflow-run.ts b/web/app/components/workflow-app/hooks/use-workflow-run.ts index d7a90e36605..7254c646ee9 100644 --- a/web/app/components/workflow-app/hooks/use-workflow-run.ts +++ b/web/app/components/workflow-app/hooks/use-workflow-run.ts @@ -138,6 +138,7 @@ const useWorkflowRunBase = (doSyncWorkflowDraft: DoSyncWorkflowDraft) => { handleWorkflowAgentLog, handleWorkflowTextChunk, handleWorkflowTextReplace, + handleWorkflowReasoning, handleWorkflowPaused, } = useWorkflowRunEvent() @@ -326,6 +327,7 @@ const useWorkflowRunBase = (doSyncWorkflowDraft: DoSyncWorkflowDraft) => { handleWorkflowAgentLog, handleWorkflowTextChunk, handleWorkflowTextReplace, + handleWorkflowReasoning, handleWorkflowPaused, } const userCallbacks = { @@ -443,7 +445,7 @@ const useWorkflowRunBase = (doSyncWorkflowDraft: DoSyncWorkflowDraft) => { }, finalCallbacks, ) - }, [store, doSyncWorkflowDraft, workflowStore, pathname, handleWorkflowFailed, flowId, handleWorkflowStarted, handleWorkflowFinished, fetchInspectVars, invalidAllLastRun, invalidateRunHistory, handleWorkflowNodeStarted, handleWorkflowNodeFinished, handleWorkflowNodeIterationStarted, handleWorkflowNodeIterationNext, handleWorkflowNodeIterationFinished, handleWorkflowNodeLoopStarted, handleWorkflowNodeLoopNext, handleWorkflowNodeLoopFinished, handleWorkflowNodeRetry, handleWorkflowAgentLog, handleWorkflowTextChunk, handleWorkflowTextReplace, handleWorkflowPaused, handleWorkflowNodeHumanInputRequired, handleWorkflowNodeHumanInputFormFilled, handleWorkflowNodeHumanInputFormTimeout]) + }, [store, doSyncWorkflowDraft, workflowStore, pathname, handleWorkflowFailed, flowId, handleWorkflowStarted, handleWorkflowFinished, fetchInspectVars, invalidAllLastRun, invalidateRunHistory, handleWorkflowNodeStarted, handleWorkflowNodeFinished, handleWorkflowNodeIterationStarted, handleWorkflowNodeIterationNext, handleWorkflowNodeIterationFinished, handleWorkflowNodeLoopStarted, handleWorkflowNodeLoopNext, handleWorkflowNodeLoopFinished, handleWorkflowNodeRetry, handleWorkflowAgentLog, handleWorkflowTextChunk, handleWorkflowTextReplace, handleWorkflowReasoning, handleWorkflowPaused, handleWorkflowNodeHumanInputRequired, handleWorkflowNodeHumanInputFormFilled, handleWorkflowNodeHumanInputFormTimeout]) const handleStopRun = useCallback((taskId: string) => { const setStoppedState = () => { diff --git a/web/app/components/workflow/hooks/use-workflow-run-event/__tests__/use-workflow-reasoning.spec.ts b/web/app/components/workflow/hooks/use-workflow-run-event/__tests__/use-workflow-reasoning.spec.ts new file mode 100644 index 00000000000..ebbb547f706 --- /dev/null +++ b/web/app/components/workflow/hooks/use-workflow-run-event/__tests__/use-workflow-reasoning.spec.ts @@ -0,0 +1,61 @@ +import type { ReasoningChunkResponse } from '@/types/workflow' +import { baseRunningData, renderWorkflowHook } from '../../../__tests__/workflow-test-env' +import { useWorkflowReasoning } from '../use-workflow-reasoning' + +const reasoningChunk = (data: Partial): ReasoningChunkResponse => ({ + task_id: 'task-1', + event: 'reasoning_chunk', + data: { message_id: '', reasoning: '', ...data }, +}) + +describe('useWorkflowReasoning', () => { + it('accumulates reasoning deltas per LLM node id', () => { + const { result, store } = renderWorkflowHook(() => useWorkflowReasoning(), { + initialStoreState: { + workflowRunningData: baseRunningData({ resultText: '' }), + }, + }) + + result.current.handleWorkflowReasoning(reasoningChunk({ reasoning: 'let me ', node_id: 'llm' })) + result.current.handleWorkflowReasoning(reasoningChunk({ reasoning: 'think', node_id: 'llm' })) + + const state = store.getState().workflowRunningData! + expect(state.reasoningContent).toEqual({ llm: 'let me think' }) + expect(state.reasoningFinished).toBeFalsy() + }) + + it('keeps reasoning from multiple LLM nodes in separate buckets', () => { + const { result, store } = renderWorkflowHook(() => useWorkflowReasoning(), { + initialStoreState: { workflowRunningData: baseRunningData({ resultText: '' }) }, + }) + + result.current.handleWorkflowReasoning(reasoningChunk({ reasoning: 'a', node_id: 'llm-1' })) + result.current.handleWorkflowReasoning(reasoningChunk({ reasoning: 'b', node_id: 'llm-2' })) + + expect(store.getState().workflowRunningData!.reasoningContent).toEqual({ 'llm-1': 'a', 'llm-2': 'b' }) + }) + + it('falls back to "_" when the chunk carries no node id', () => { + const { result, store } = renderWorkflowHook(() => useWorkflowReasoning(), { + initialStoreState: { workflowRunningData: baseRunningData({ resultText: '' }) }, + }) + + result.current.handleWorkflowReasoning(reasoningChunk({ reasoning: 'x' })) + + expect(store.getState().workflowRunningData!.reasoningContent).toEqual({ _: 'x' }) + }) + + it('marks reasoning finished on the terminal marker without appending empty text', () => { + const { result, store } = renderWorkflowHook(() => useWorkflowReasoning(), { + initialStoreState: { + workflowRunningData: baseRunningData({ resultText: '', reasoningContent: { llm: 'done' } }), + }, + }) + + result.current.handleWorkflowReasoning(reasoningChunk({ reasoning: '', node_id: 'llm', is_final: true })) + + const state = store.getState().workflowRunningData! + expect(state.reasoningContent).toEqual({ llm: 'done' }) + expect(state.reasoningFinished).toBe(true) + }) +}) diff --git a/web/app/components/workflow/hooks/use-workflow-run-event/__tests__/use-workflow-run-event.spec.ts b/web/app/components/workflow/hooks/use-workflow-run-event/__tests__/use-workflow-run-event.spec.ts index fb8ea51638a..6010c791702 100644 --- a/web/app/components/workflow/hooks/use-workflow-run-event/__tests__/use-workflow-run-event.spec.ts +++ b/web/app/components/workflow/hooks/use-workflow-run-event/__tests__/use-workflow-run-event.spec.ts @@ -16,6 +16,7 @@ const handlers = vi.hoisted(() => ({ handleWorkflowNodeRetry: vi.fn(), handleWorkflowTextChunk: vi.fn(), handleWorkflowTextReplace: vi.fn(), + handleWorkflowReasoning: vi.fn(), handleWorkflowAgentLog: vi.fn(), handleWorkflowPaused: vi.fn(), handleWorkflowNodeHumanInputRequired: vi.fn(), @@ -45,6 +46,10 @@ vi.mock('..', () => ({ useWorkflowNodeHumanInputFormTimeout: () => ({ handleWorkflowNodeHumanInputFormTimeout: handlers.handleWorkflowNodeHumanInputFormTimeout }), })) +vi.mock('../use-workflow-reasoning', () => ({ + useWorkflowReasoning: () => ({ handleWorkflowReasoning: handlers.handleWorkflowReasoning }), +})) + describe('useWorkflowRunEvent', () => { it('returns the composed handlers from all workflow event hooks', () => { const { result } = renderHook(() => useWorkflowRunEvent()) diff --git a/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-reasoning.ts b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-reasoning.ts new file mode 100644 index 00000000000..9abfeca0383 --- /dev/null +++ b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-reasoning.ts @@ -0,0 +1,30 @@ +import type { ReasoningChunkResponse } from '@/types/workflow' +import { produce } from 'immer' +import { useCallback } from 'react' +import { useWorkflowStore } from '@/app/components/workflow/store' + +export const useWorkflowReasoning = () => { + const workflowStore = useWorkflowStore() + + const handleWorkflowReasoning = useCallback((params: ReasoningChunkResponse) => { + const { data: { reasoning, node_id, is_final } } = params + const { + workflowRunningData, + setWorkflowRunningData, + } = workflowStore.getState() + + setWorkflowRunningData(produce(workflowRunningData!, (draft) => { + const reasoningContent = (draft.reasoningContent ||= {}) + // key by LLM node so multiple nodes' reasoning stays separated + const key = node_id || '_' + if (reasoning) + reasoningContent[key] = (reasoningContent[key] || '') + reasoning + if (is_final) + draft.reasoningFinished = true + })) + }, [workflowStore]) + + return { + handleWorkflowReasoning, + } +} diff --git a/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-run-event.ts b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-run-event.ts index bf8fd319a2d..2366fdd9684 100644 --- a/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-run-event.ts +++ b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-run-event.ts @@ -19,6 +19,7 @@ import { useWorkflowTextChunk, useWorkflowTextReplace, } from '.' +import { useWorkflowReasoning } from './use-workflow-reasoning' export const useWorkflowRunEvent = () => { const { handleWorkflowStarted } = useWorkflowStarted() @@ -35,6 +36,7 @@ export const useWorkflowRunEvent = () => { const { handleWorkflowNodeRetry } = useWorkflowNodeRetry() const { handleWorkflowTextChunk } = useWorkflowTextChunk() const { handleWorkflowTextReplace } = useWorkflowTextReplace() + const { handleWorkflowReasoning } = useWorkflowReasoning() const { handleWorkflowAgentLog } = useWorkflowAgentLog() const { handleWorkflowPaused } = useWorkflowPaused() const { handleWorkflowNodeHumanInputRequired } = useWorkflowNodeHumanInputRequired() @@ -56,6 +58,7 @@ export const useWorkflowRunEvent = () => { handleWorkflowNodeRetry, handleWorkflowTextChunk, handleWorkflowTextReplace, + handleWorkflowReasoning, handleWorkflowAgentLog, handleWorkflowPaused, handleWorkflowNodeHumanInputFormFilled, diff --git a/web/app/components/workflow/panel/__tests__/workflow-preview.spec.tsx b/web/app/components/workflow/panel/__tests__/workflow-preview.spec.tsx index a3d629d1708..93d51acd78d 100644 --- a/web/app/components/workflow/panel/__tests__/workflow-preview.spec.tsx +++ b/web/app/components/workflow/panel/__tests__/workflow-preview.spec.tsx @@ -71,6 +71,12 @@ vi.mock('@/app/components/workflow/run/tracing-panel', () => ({ default: ({ list }: { list: unknown[] }) =>
{list.length}
, })) +vi.mock('@/app/components/base/chat/chat/answer/reasoning-panel', () => ({ + default: ({ content, done }: { content: Record, done: boolean }) => ( +
{Object.keys(content).join(',')}
+ ), +})) + vi.mock('@/app/components/workflow/panel/inputs-panel', () => ({ default: ({ onRun }: { onRun: () => void }) => (