This commit is contained in:
goingforstudying-ctrl 2026-06-25 22:17:39 +00:00 committed by GitHub
commit da681b3447
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
31 changed files with 165 additions and 699 deletions

View File

@ -87,7 +87,7 @@ from models import Account, Conversation, EndUser, Message, MessageFile
from models.enums import CreatorUserRole, MessageFileBelongsTo, MessageStatus
from models.execution_extra_content import HumanInputContent
from models.model import AppMode
from models.workflow import Workflow
from models.workflow import Workflow, WorkflowRun
logger = logging.getLogger(__name__)
@ -774,6 +774,13 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
with self._database_session() as session:
# Save message
self._save_message(session=session, graph_runtime_state=resolved_state)
# Update workflow run status to STOPPED so it does not remain RUNNING in logs
workflow_run = session.scalar(select(WorkflowRun).where(WorkflowRun.id == self._workflow_run_id))
if workflow_run is not None:
workflow_run.status = WorkflowExecutionStatus.STOPPED
workflow_run.error = event.get_stop_reason()
workflow_run.finished_at = naive_utc_now()
session.commit()
yield workflow_finish_resp
elif event.stopped_by in (

View File

@ -4,6 +4,7 @@ from collections.abc import Callable, Generator
from contextlib import contextmanager
from typing import Union
from sqlalchemy import select
from sqlalchemy.orm import Session, sessionmaker
from constants.tts_auto_play_timeout import TTS_AUTO_PLAY_TIMEOUT, TTS_AUTO_PLAY_YIELD_CPU_TIME
@ -31,7 +32,6 @@ from core.app.entities.queue_entities import (
QueueNodeStartedEvent,
QueueNodeSucceededEvent,
QueuePingEvent,
QueueReasoningChunkEvent,
QueueStopEvent,
QueueTextChunkEvent,
QueueWorkflowFailedEvent,
@ -48,7 +48,6 @@ from core.app.entities.task_entities import (
MessageAudioEndStreamResponse,
MessageAudioStreamResponse,
PingStreamResponse,
ReasoningChunkStreamResponse,
StreamResponse,
TextChunkStreamResponse,
WorkflowAppBlockingResponse,
@ -66,10 +65,11 @@ from extensions.ext_database import db
from graphon.entities import WorkflowStartReason
from graphon.enums import WorkflowExecutionStatus
from graphon.runtime import GraphRuntimeState
from libs.datetime_utils import naive_utc_now
from models import Account
from models.enums import CreatorUserRole
from models.model import EndUser
from models.workflow import Workflow, WorkflowAppLog, WorkflowAppLogCreatedFrom
from models.workflow import Workflow, WorkflowAppLog, WorkflowAppLogCreatedFrom, WorkflowRun
logger = logging.getLogger(__name__)
@ -552,6 +552,17 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
error=error,
exceptions_count=exceptions_count,
)
# Persist STOPPED status to the database so the workflow run does not remain RUNNING in logs
if isinstance(event, QueueStopEvent) and self._workflow_execution_id:
with self._database_session() as session:
workflow_run = session.scalar(select(WorkflowRun).where(WorkflowRun.id == self._workflow_execution_id))
if workflow_run is not None:
workflow_run.status = WorkflowExecutionStatus.STOPPED
workflow_run.error = error
workflow_run.finished_at = naive_utc_now()
session.commit()
yield workflow_finish_resp
def _handle_text_chunk_event(
@ -573,22 +584,6 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
yield self._text_chunk_to_stream_response(delta_text, from_variable_selector=event.from_variable_selector)
def _handle_reasoning_chunk_event(
    self, event: QueueReasoningChunkEvent, **kwargs
) -> Generator[StreamResponse, None, None]:
    """Convert a queued reasoning chunk into a reasoning stream response.

    Yields one ``ReasoningChunkStreamResponse`` when the event carries
    reasoning text or is flagged final; yields nothing otherwise.
    """
    # An empty chunk is only worth forwarding when it is the final one:
    # it then acts as the "thinking finished" signal.
    if event.reasoning or event.is_final:
        task_id = self._application_generate_entity.task_id
        chunk_data = ReasoningChunkStreamResponse.Data(
            reasoning=event.reasoning,
            node_id=event.from_node_id,
            is_final=event.is_final,
        )
        yield ReasoningChunkStreamResponse(task_id=task_id, data=chunk_data)
def _handle_agent_log_event(self, event: QueueAgentLogEvent, **kwargs) -> Generator[StreamResponse, None, None]:
"""Handle agent log events."""
yield self._workflow_response_converter.handle_agent_log(
@ -618,7 +613,6 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
QueuePingEvent: self._handle_ping_event,
QueueErrorEvent: self._handle_error_event,
QueueTextChunkEvent: self._handle_text_chunk_event,
QueueReasoningChunkEvent: self._handle_reasoning_chunk_event,
# Workflow events
QueueWorkflowStartedEvent: self._handle_workflow_started_event,
QueueWorkflowSucceededEvent: self._handle_workflow_succeeded_event,

View File

@ -743,8 +743,7 @@ class ReasoningChunkStreamResponse(StreamResponse):
Data entity
"""
# chat apps set this; workflow runs have no message
message_id: str | None = None
message_id: str
reasoning: str
node_id: str | None = None
is_final: bool = False

View File

@ -29,7 +29,6 @@ from core.app.entities.queue_entities import (
QueueNodeExceptionEvent,
QueueNodeFailedEvent,
QueuePingEvent,
QueueReasoningChunkEvent,
QueueRetrieverResourcesEvent,
QueueStopEvent,
QueueTextChunkEvent,
@ -47,12 +46,11 @@ from core.app.entities.task_entities import (
MessageAudioStreamResponse,
MessageEndStreamResponse,
PingStreamResponse,
ReasoningChunkStreamResponse,
)
from core.base.tts.app_generator_tts_publisher import AudioTrunk
from core.workflow.system_variables import build_system_variables
from graphon.entities.pause_reason import PauseReasonType
from graphon.enums import BuiltinNodeTypes
from graphon.enums import BuiltinNodeTypes, WorkflowExecutionStatus
from graphon.nodes.human_input.entities import UserActionConfig
from graphon.runtime import GraphRuntimeState, VariablePool
from libs.datetime_utils import naive_utc_now
@ -198,42 +196,6 @@ class TestAdvancedChatGenerateTaskPipeline:
assert pipeline._task_state.answer == "hi"
assert responses
def test_handle_reasoning_chunk_event_emits_on_nonempty(self):
    """A non-empty reasoning chunk yields exactly one reasoning response."""
    pipeline = _make_pipeline()
    chunk_event = QueueReasoningChunkEvent(reasoning="pondering", from_node_id="llm-1", is_final=False)

    emitted = list(pipeline._handle_reasoning_chunk_event(chunk_event))

    assert len(emitted) == 1
    (only,) = emitted
    assert isinstance(only, ReasoningChunkStreamResponse)
    payload = only.data
    assert payload.message_id == pipeline._message_id
    assert payload.reasoning == "pondering"
    assert payload.node_id == "llm-1"
    assert payload.is_final is False
    # reasoning never touches the answer stream
    assert pipeline._task_state.answer == ""
def test_handle_reasoning_chunk_event_drops_empty_nonfinal(self):
    """An empty, non-final reasoning chunk produces no stream responses."""
    pipeline = _make_pipeline()
    empty_chunk = QueueReasoningChunkEvent(reasoning="", from_node_id="llm-1", is_final=False)

    emitted = list(pipeline._handle_reasoning_chunk_event(empty_chunk))

    assert emitted == []
def test_handle_reasoning_chunk_event_emits_empty_final_marker(self):
    """An empty chunk flagged final is still emitted as a single response."""
    pipeline = _make_pipeline()
    final_marker = QueueReasoningChunkEvent(reasoning="", from_node_id="llm-1", is_final=True)

    emitted = list(pipeline._handle_reasoning_chunk_event(final_marker))

    assert len(emitted) == 1
    (only,) = emitted
    assert isinstance(only, ReasoningChunkStreamResponse)
    assert only.data.reasoning == ""
    assert only.data.is_final is True
def test_listen_audio_msg_returns_audio_stream(self):
pipeline = _make_pipeline()
publisher = SimpleNamespace(check_and_get_audio=lambda: AudioTrunk(status="stream", audio="data"))
@ -357,43 +319,6 @@ class TestAdvancedChatGenerateTaskPipeline:
assert responses == ["done"]
assert pipeline._recorded_files
def test_handle_node_succeeded_event_records_llm_reasoning(self):
    """An LLM node's ``reasoning_content`` output is stored in task metadata by node id."""
    pipeline = _make_pipeline()
    # Stub the converter and persistence hooks so only the reasoning bookkeeping is exercised.
    converter = pipeline._workflow_response_converter
    converter.fetch_files_from_node_outputs = lambda outputs: []
    converter.workflow_node_finish_to_stream_response = lambda **kwargs: "done"
    pipeline._save_output_for_event = lambda event, node_execution_id: None

    llm_succeeded = SimpleNamespace(
        node_type=BuiltinNodeTypes.LLM,
        outputs={"reasoning_content": "first pass "},
        node_execution_id="exec",
        node_id="llm-1",
    )

    list(pipeline._handle_node_succeeded_event(llm_succeeded))

    assert pipeline._task_state.metadata.reasoning == {"llm-1": "first pass "}
def test_handle_node_succeeded_event_accumulates_reasoning_across_passes(self):
    """Reasoning from repeated successes of one node id is concatenated in order."""
    pipeline = _make_pipeline()
    converter = pipeline._workflow_response_converter
    converter.fetch_files_from_node_outputs = lambda outputs: []
    converter.workflow_node_finish_to_stream_response = lambda **kwargs: "done"
    pipeline._save_output_for_event = lambda event, node_execution_id: None

    def _llm_event(reasoning: str):
        """Build a stand-in LLM node-succeeded event carrying ``reasoning``."""
        return SimpleNamespace(
            node_type=BuiltinNodeTypes.LLM,
            outputs={"reasoning_content": reasoning},
            node_execution_id="exec",
            node_id="llm-1",
        )

    # Same node id across iteration/loop passes must accumulate, not overwrite.
    for pass_text in ("pass one ", "pass two"):
        list(pipeline._handle_node_succeeded_event(_llm_event(pass_text)))

    assert pipeline._task_state.metadata.reasoning == {"llm-1": "pass one pass two"}
def test_iteration_and_loop_handlers(self):
pipeline = _make_pipeline()
pipeline._workflow_run_id = "run-id"
@ -715,6 +640,66 @@ class TestAdvancedChatGenerateTaskPipeline:
assert responses == ["end"]
assert saved == ["saved"]
def test_handle_stop_event_updates_workflow_run_status_for_user_manual_stop(self, monkeypatch: pytest.MonkeyPatch):
    """A manual user stop marks the looked-up workflow run as STOPPED and commits once."""
    pipeline = _make_pipeline()
    pipeline._message_end_to_stream_response = lambda: "end"
    pipeline._workflow_response_converter = SimpleNamespace(
        workflow_finish_to_stream_response=lambda **kwargs: "finish"
    )
    pipeline._workflow_run_id = "run-id"
    pipeline._workflow_id = "workflow-id"
    system_variables = build_system_variables(workflow_execution_id="run-id")
    pipeline._graph_runtime_state = GraphRuntimeState(
        variable_pool=VariablePool.from_bootstrap(system_variables=system_variables),
        start_at=0.0,
    )

    saved: list[str] = []
    pipeline._save_message = lambda **kwargs: saved.append("saved")

    # Stand-in for the row returned by session.scalar(); the handler mutates it in place.
    fake_run = SimpleNamespace(status=None, error=None, finished_at=None)
    # One snapshot of fake_run is recorded per commit() call.
    status_updates: list[dict] = []

    class FakeSession:
        def scalar(self, stmt):
            return fake_run

        def commit(self):
            snapshot = {
                "status": fake_run.status,
                "error": fake_run.error,
                "finished_at": fake_run.finished_at,
            }
            status_updates.append(snapshot)

    @contextmanager
    def _fake_session():
        yield FakeSession()

    monkeypatch.setattr(pipeline, "_database_session", _fake_session)

    stop_event = QueueStopEvent(stopped_by=QueueStopEvent.StopBy.USER_MANUAL)
    responses = list(pipeline._handle_stop_event(stop_event))

    assert responses == ["finish", "end"]
    assert saved == ["saved"]
    assert len(status_updates) == 1
    committed = status_updates[0]
    assert committed["status"] == WorkflowExecutionStatus.STOPPED
    assert committed["error"] == "Stopped by user."
    assert committed["finished_at"] is not None
def test_handle_message_end_event_applies_output_moderation(self, monkeypatch: pytest.MonkeyPatch):
pipeline = _make_pipeline()
pipeline._graph_runtime_state = GraphRuntimeState(

View File

@ -16,7 +16,6 @@ from core.app.entities.queue_entities import (
QueueNodeFailedEvent,
QueueNodeRetryEvent,
QueueNodeSucceededEvent,
QueueReasoningChunkEvent,
QueueTextChunkEvent,
QueueWorkflowPausedEvent,
QueueWorkflowStartedEvent,
@ -35,7 +34,6 @@ from graphon.graph_events import (
NodeRunHumanInputFormFilledEvent,
NodeRunIterationSucceededEvent,
NodeRunLoopFailedEvent,
NodeRunReasoningChunkEvent,
NodeRunRetryEvent,
NodeRunStartedEvent,
NodeRunStreamChunkEvent,
@ -397,17 +395,6 @@ class TestWorkflowBasedAppRunner:
is_final=False,
),
)
runner._handle_event(
workflow_entry,
NodeRunReasoningChunkEvent(
id="exec",
node_id="node",
node_type=BuiltinNodeTypes.LLM,
selector=["node", "reasoning_content"],
chunk="thinking",
is_final=False,
),
)
runner._handle_event(
workflow_entry,
NodeRunAgentLogEvent(
@ -455,7 +442,6 @@ class TestWorkflowBasedAppRunner:
)
assert any(isinstance(event, QueueTextChunkEvent) for event in published)
assert any(isinstance(event, QueueReasoningChunkEvent) for event in published)
assert any(isinstance(event, QueueAgentLogEvent) for event in published)
assert any(isinstance(event, QueueIterationCompletedEvent) for event in published)
assert any(isinstance(event, QueueLoopCompletedEvent) for event in published)

View File

@ -27,7 +27,6 @@ from core.app.entities.queue_entities import (
QueueNodeStartedEvent,
QueueNodeSucceededEvent,
QueuePingEvent,
QueueReasoningChunkEvent,
QueueStopEvent,
QueueTextChunkEvent,
QueueWorkflowFailedEvent,
@ -42,7 +41,6 @@ from core.app.entities.task_entities import (
MessageAudioEndStreamResponse,
MessageAudioStreamResponse,
PingStreamResponse,
ReasoningChunkStreamResponse,
WorkflowAppPausedBlockingResponse,
WorkflowFinishStreamResponse,
WorkflowStartStreamResponse,
@ -268,41 +266,6 @@ class TestWorkflowGenerateTaskPipeline:
assert responses[0].data.text == "hi"
assert published == [queue_message]
def test_handle_reasoning_chunk_event_emits_on_nonempty(self):
    """A non-empty reasoning chunk yields exactly one reasoning response."""
    pipeline = _make_pipeline()
    chunk_event = QueueReasoningChunkEvent(reasoning="pondering", from_node_id="llm-1", is_final=False)

    emitted = list(pipeline._handle_reasoning_chunk_event(chunk_event))

    assert len(emitted) == 1
    (only,) = emitted
    assert isinstance(only, ReasoningChunkStreamResponse)
    payload = only.data
    # workflow runs have no message, so the id is omitted
    assert payload.message_id is None
    assert payload.reasoning == "pondering"
    assert payload.node_id == "llm-1"
    assert payload.is_final is False
def test_handle_reasoning_chunk_event_drops_empty_nonfinal(self):
    """An empty, non-final reasoning chunk produces no stream responses."""
    pipeline = _make_pipeline()
    empty_chunk = QueueReasoningChunkEvent(reasoning="", from_node_id="llm-1", is_final=False)

    emitted = list(pipeline._handle_reasoning_chunk_event(empty_chunk))

    assert emitted == []
def test_handle_reasoning_chunk_event_emits_empty_final_marker(self):
    """An empty chunk flagged final is still emitted as a single response."""
    pipeline = _make_pipeline()
    final_marker = QueueReasoningChunkEvent(reasoning="", from_node_id="llm-1", is_final=True)

    emitted = list(pipeline._handle_reasoning_chunk_event(final_marker))

    assert len(emitted) == 1
    (only,) = emitted
    assert isinstance(only, ReasoningChunkStreamResponse)
    assert only.data.reasoning == ""
    assert only.data.is_final is True
def test_dispatch_event_handles_node_failed(self):
pipeline = _make_pipeline()
pipeline._workflow_response_converter.workflow_node_finish_to_stream_response = lambda **kwargs: "done"
@ -320,7 +283,7 @@ class TestWorkflowGenerateTaskPipeline:
assert list(pipeline._dispatch_event(event)) == ["done"]
def test_handle_stop_event_yields_finish(self):
def test_handle_stop_event_yields_finish(self, monkeypatch: pytest.MonkeyPatch):
pipeline = _make_pipeline()
pipeline._workflow_execution_id = "run-id"
pipeline._graph_runtime_state = GraphRuntimeState(
@ -331,6 +294,19 @@ class TestWorkflowGenerateTaskPipeline:
)
pipeline._workflow_response_converter.workflow_finish_to_stream_response = lambda **kwargs: "finish"
@contextmanager
def _fake_session():
class FakeSession:
def scalar(self, stmt):
return None
def commit(self):
pass
yield FakeSession()
monkeypatch.setattr(pipeline, "_database_session", _fake_session)
responses = list(
pipeline._handle_workflow_failed_and_stop_events(
QueueStopEvent(stopped_by=QueueStopEvent.StopBy.USER_MANUAL)
@ -339,6 +315,58 @@ class TestWorkflowGenerateTaskPipeline:
assert responses == ["finish"]
def test_handle_stop_event_updates_workflow_run_status(self, monkeypatch: pytest.MonkeyPatch):
    """A manual user stop marks the looked-up workflow run as STOPPED and commits once."""
    pipeline = _make_pipeline()
    pipeline._workflow_execution_id = "run-id"
    system_variables = build_system_variables(workflow_execution_id="run-id")
    pipeline._graph_runtime_state = GraphRuntimeState(
        variable_pool=VariablePool.from_bootstrap(system_variables=system_variables),
        start_at=0.0,
    )
    pipeline._workflow_response_converter.workflow_finish_to_stream_response = lambda **kwargs: "finish"

    # One snapshot of the fake run is recorded per commit() call.
    status_updates: list[dict] = []

    class FakeWorkflowRun:
        """Stand-in for the row returned by session.scalar(); mutated by the handler."""

        def __init__(self):
            self.status = self.error = self.finished_at = None

    fake_run = FakeWorkflowRun()

    class FakeSession:
        def scalar(self, stmt):
            return fake_run

        def commit(self):
            snapshot = {
                "status": fake_run.status,
                "error": fake_run.error,
                "finished_at": fake_run.finished_at,
            }
            status_updates.append(snapshot)

    @contextmanager
    def _fake_session():
        yield FakeSession()

    monkeypatch.setattr(pipeline, "_database_session", _fake_session)

    stop_event = QueueStopEvent(stopped_by=QueueStopEvent.StopBy.USER_MANUAL)
    responses = list(pipeline._handle_workflow_failed_and_stop_events(stop_event))

    assert responses == ["finish"]
    assert len(status_updates) == 1
    committed = status_updates[0]
    assert committed["status"] == WorkflowExecutionStatus.STOPPED
    assert committed["error"] == "Stopped by user."
    assert committed["finished_at"] is not None
def test_save_workflow_app_log_created_from(self):
pipeline = _make_pipeline()
pipeline._application_generate_entity.invoke_from = InvokeFrom.SERVICE_API

View File

@ -29,7 +29,7 @@ export default class ResumeApp extends DifyCommand {
'workspace': Flags.string({ description: 'workspace id override' }),
'with-history': Flags.boolean({ description: 'Replay executed-node history before attaching to live stream.', default: false }),
'stream': Flags.boolean({ description: 'Print output live as tokens/events arrive. Default: collect and print at end.', default: false }),
'think': Flags.boolean({ description: 'Show model thinking/reasoning when available — both inline <think>...</think> blocks and separated reasoning streams. Hidden by default; with --think, thinking is printed to stderr.', default: false }),
'think': Flags.boolean({ description: 'Show model thinking/reasoning when available. Strips <think>...</think> blocks silently by default; with --think, thinking is printed to stderr.', default: false }),
'output': Flags.outputFormat({ options: [OutputFormat.JSON, OutputFormat.YAML, OutputFormat.TEXT], default: '' }),
'http-retry': httpRetryFlag,
}

View File

@ -7,7 +7,6 @@ import { collect, HitlPauseError } from '@/commands/run/app/sse-collector'
import { formatted, stringifyOutput } from '@/framework/output'
import { handle, unhandle } from '@/sys/index'
import { colorEnabled, colorScheme } from '@/sys/io/color'
import { reasoningBlocksFromMetadata } from '@/sys/io/reasoning'
import { startSpinner } from '@/sys/io/spinner'
import { extractThinkBlocks, filterThinkInOutputs, stripThinkBlocks } from '@/sys/io/think-filter'
@ -100,13 +99,6 @@ export class StreamingStructuredStrategy implements RunStrategy {
}
}
// Surface separated-mode reasoning (carried in message_end metadata) to stderr under --think.
if (ctx.think) {
const reasoningBlocks = reasoningBlocksFromMetadata(processedResp.metadata)
if (reasoningBlocks !== '')
deps.io.err.write(`${reasoningBlocks}\n`)
}
const respMode = typeof processedResp.mode === 'string' && processedResp.mode !== '' ? processedResp.mode : mode
deps.io.out.write(stringifyOutput(formatted({ format, data: newAppRunObject(respMode, processedResp) })))
if (isText && CHAT_MODES.has(respMode)) {

View File

@ -35,7 +35,7 @@ export default class RunApp extends DifyCommand {
'workflow-id': Flags.string({ description: 'Pin to a specific published workflow version' }),
'workspace': Flags.string({ description: 'Workspace id (overrides DIFY_WORKSPACE_ID and stored default)' }),
'stream': Flags.boolean({ description: 'Print output live as tokens/events arrive (default: collect and print at end)', default: false }),
'think': Flags.boolean({ description: 'Show model thinking/reasoning when available — both inline <think>...</think> blocks and separated reasoning streams. Hidden by default; with --think, thinking is printed to stderr.', default: false }),
'think': Flags.boolean({ description: 'Show model thinking/reasoning when available. Strips <think>...</think> blocks silently by default; with --think, thinking is printed to stderr.', default: false }),
'retry-on-limit': Flags.boolean({ description: 'On a 429 rate limit, wait and retry this POST (bounded) instead of failing immediately. Off by default since running an app is not idempotent.', default: false }),
'http-retry': httpRetryFlag,
'output': Flags.outputFormat({ options: [OutputFormat.JSON, OutputFormat.YAML, OutputFormat.TEXT], default: '' }),

View File

@ -203,85 +203,6 @@ describe('runApp', () => {
expect(io.errBuf()).toContain('secret reasoning')
})
it('--stream chat --think: routes separated reasoning to stderr, clean answer to stdout', async () => {
  mock.setScenario('chat-reasoning')
  const streams = bufferStreams()
  const appInfoCache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })

  await runApp(
    { appId: 'app-1', message: 'hi', stream: true, think: true },
    { active: active(), http: testHttpClient(mock.url, 'dfoa_test'), host: mock.url, io: streams, cache: appInfoCache },
  )

  // The answer must appear on stdout only; the framed reasoning must appear on stderr.
  const stdout = streams.outBuf()
  const stderr = streams.errBuf()
  expect(stdout).toContain('final answer')
  expect(stdout).not.toContain('secret reasoning')
  expect(stderr).toContain('<think>')
  expect(stderr).toContain('secret reasoning')
})
it('--stream chat without --think: separated reasoning stays hidden', async () => {
  mock.setScenario('chat-reasoning')
  const streams = bufferStreams()
  const appInfoCache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })

  await runApp(
    { appId: 'app-1', message: 'hi', stream: true },
    { active: active(), http: testHttpClient(mock.url, 'dfoa_test'), host: mock.url, io: streams, cache: appInfoCache },
  )

  // With the flag off, the answer is printed and no reasoning reaches stderr.
  expect(streams.outBuf()).toContain('final answer')
  expect(streams.errBuf()).not.toContain('secret reasoning')
})
it('chat -o json --think: echoes separated reasoning to stderr, persists it in metadata', async () => {
  mock.setScenario('chat-reasoning')
  const streams = bufferStreams()
  const appInfoCache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })

  await runApp(
    { appId: 'app-1', message: 'hi', format: 'json', think: true },
    { active: active(), http: testHttpClient(mock.url, 'dfoa_test'), host: mock.url, io: streams, cache: appInfoCache },
  )

  expect(streams.errBuf()).toContain('secret reasoning')
  // stdout carries the JSON document; reasoning is expected under metadata, keyed by node id.
  type ChatJson = { answer: string, metadata: { reasoning: Record<string, string> } }
  const { answer, metadata } = JSON.parse(streams.outBuf()) as ChatJson
  expect(answer).toBe('final answer')
  expect(metadata.reasoning).toEqual({ 'llm-1': 'secret reasoning' })
})
it('--stream workflow --think: routes separated reasoning to stderr, clean outputs to stdout', async () => {
  mock.setScenario('workflow-reasoning')
  const streams = bufferStreams()
  const appInfoCache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })

  await runApp(
    { appId: 'app-2', inputs: { x: '1' }, stream: true, think: true },
    { active: active(), http: testHttpClient(mock.url, 'dfoa_test'), host: mock.url, io: streams, cache: appInfoCache },
  )

  // Framed reasoning goes to stderr; stdout holds the answer without any reasoning text.
  const stderr = streams.errBuf()
  const stdout = streams.outBuf()
  expect(stderr).toContain('<think>')
  expect(stderr).toContain('secret reasoning')
  expect(stdout).toContain('final answer')
  expect(stdout).not.toContain('secret reasoning')
})
it('workflow -o json --think: echoes reasoning to stderr, accumulates into metadata.reasoning', async () => {
  mock.setScenario('workflow-reasoning')
  const streams = bufferStreams()
  const appInfoCache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })

  await runApp(
    { appId: 'app-2', inputs: { x: '1' }, format: 'json', think: true },
    { active: active(), http: testHttpClient(mock.url, 'dfoa_test'), host: mock.url, io: streams, cache: appInfoCache },
  )

  expect(streams.errBuf()).toContain('secret reasoning')
  // stdout carries the JSON document; reasoning is expected under metadata, keyed by node id.
  type WorkflowJson = { metadata: { reasoning: Record<string, string> } }
  const { metadata } = JSON.parse(streams.outBuf()) as WorkflowJson
  expect(metadata.reasoning).toEqual({ 'llm-1': 'secret reasoning' })
})
it('--stream workflow without --think: reasoning stays hidden', async () => {
  mock.setScenario('workflow-reasoning')
  const streams = bufferStreams()
  const appInfoCache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })

  await runApp(
    { appId: 'app-2', inputs: { x: '1' }, stream: true },
    { active: active(), http: testHttpClient(mock.url, 'dfoa_test'), host: mock.url, io: streams, cache: appInfoCache },
  )

  // With the flag off, the answer is printed and no reasoning reaches stderr.
  expect(streams.outBuf()).toContain('final answer')
  expect(streams.errBuf()).not.toContain('secret reasoning')
})
it('stream-error scenario: error event surfaces typed BaseError', async () => {
mock.setScenario('stream-error')
const io = bufferStreams()

View File

@ -59,41 +59,6 @@ describe('collect — chat', () => {
})
})
describe('collect — chat separated reasoning', () => {
  // Build a reasoning_chunk event for node llm-1 on message m1.
  const reasoningEvent = (reasoning: string, isFinal: boolean) =>
    ev('reasoning_chunk', { data: { message_id: 'm1', reasoning, node_id: 'llm-1', is_final: isFinal } })

  type ChatMetadata = { reasoning?: unknown, usage?: unknown }

  it('backfills metadata.reasoning from live deltas when the server omits it', async () => {
    const events = iterOf(
      reasoningEvent('pon', false),
      reasoningEvent('dering', true),
      ev('message', { message_id: 'm1', answer: 'answer' }),
      ev('message_end', { metadata: { usage: { tokens: 3 } } }),
    )
    const result = await collect(events, 'advanced-chat')
    const metadata = result.metadata as ChatMetadata
    expect(result.answer).toBe('answer')
    // The two deltas are joined per node, and the server-sent usage is kept alongside.
    expect(metadata.reasoning).toEqual({ 'llm-1': 'pondering' })
    expect(metadata.usage).toEqual({ tokens: 3 })
  })

  it('keeps the server-persisted reasoning over live deltas', async () => {
    const events = iterOf(
      reasoningEvent('live', true),
      ev('message', { answer: 'a' }),
      ev('message_end', { metadata: { reasoning: { 'llm-1': 'persisted' } } }),
    )
    const result = await collect(events, 'advanced-chat')
    const metadata = result.metadata as ChatMetadata
    expect(metadata.reasoning).toEqual({ 'llm-1': 'persisted' })
  })

  it('leaves metadata untouched when there is no reasoning at all', async () => {
    const events = iterOf(
      ev('message', { answer: 'a' }),
      ev('message_end', { metadata: { usage: { tokens: 1 } } }),
    )
    const result = await collect(events, 'advanced-chat')
    const metadata = result.metadata as ChatMetadata
    expect(metadata.reasoning).toBeUndefined()
  })
})
describe('collect — agent-chat', () => {
it('captures agent_thoughts', async () => {
const got = await collect(iterOf(
@ -132,39 +97,6 @@ describe('collect — workflow', () => {
})
})
describe('collect — workflow separated reasoning', () => {
  // Build a reasoning_chunk event without a message_id, attributed to the given node.
  const wfReasoning = (reasoning: string, nodeId: string, isFinal: boolean) =>
    ev('reasoning_chunk', { data: { reasoning, node_id: nodeId, is_final: isFinal } })

  type WorkflowMetadata = { reasoning?: unknown, usage?: unknown }

  it('accumulates reasoning_chunk per node into metadata.reasoning', async () => {
    const events = iterOf(
      ev('node_started', { id: 'llm-1' }),
      wfReasoning('pon', 'llm-1', false),
      wfReasoning('dering', 'llm-1', true),
      ev('workflow_finished', { data: { status: 'succeeded', outputs: { result: 'clean' } } }),
    )
    const result = await collect(events, 'workflow')
    const data = result.data as { outputs: { result: string } }
    expect(data.outputs.result).toBe('clean')
    expect((result.metadata as WorkflowMetadata).reasoning).toEqual({ 'llm-1': 'pondering' })
  })

  it('keys reasoning by node, leaves metadata absent when there is none', async () => {
    const events = iterOf(
      ev('workflow_finished', { data: { status: 'succeeded', outputs: { result: 'clean' } } }),
    )
    const result = await collect(events, 'workflow')
    // metadata itself may be missing here, hence the optional access.
    expect((result.metadata as WorkflowMetadata | undefined)?.reasoning).toBeUndefined()
  })

  it('merges reasoning into metadata already carried by workflow_finished', async () => {
    const events = iterOf(
      wfReasoning('think', 'llm-1', true),
      ev('workflow_finished', { data: { status: 'succeeded' }, metadata: { usage: { tokens: 7 } } }),
    )
    const result = await collect(events, 'workflow')
    const metadata = result.metadata as WorkflowMetadata
    expect(metadata.reasoning).toEqual({ 'llm-1': 'think' })
    expect(metadata.usage).toEqual({ tokens: 7 })
  })
})
describe('collect — error event', () => {
it('throws BaseError when error event arrives', async () => {
await expect(collect(iterOf(

View File

@ -2,7 +2,6 @@ import type { BaseError } from '@/errors/base'
import type { SseEvent } from '@/http/sse'
import { HttpClientError, newError } from '@/errors/base'
import { ErrorCode } from '@/errors/codes'
import { accumulateReasoning, parseReasoningChunk } from '@/sys/io/reasoning'
import { RUN_MODES } from './handlers'
export type HitlPauseData = {
@ -68,7 +67,6 @@ class ChatCollector implements Collector {
private base: Record<string, unknown> = {}
private metadata: Record<string, unknown> | undefined
private thoughts: unknown[] = []
private readonly reasoning: Record<string, string> = {}
private readonly mode: string
private readonly isAgent: boolean
constructor(mode: string, isAgent: boolean) {
@ -86,13 +84,6 @@ class ChatCollector implements Collector {
copyScalar(this.base, c, ['id', 'conversation_id', 'message_id', 'task_id', 'created_at'])
return
}
// Accumulate separated-mode reasoning deltas per LLM node.
case 'reasoning_chunk': {
const chunk = parseReasoningChunk(c)
if (chunk !== undefined)
accumulateReasoning(this.reasoning, chunk)
return
}
case 'agent_thought':
this.thoughts.push(c)
return
@ -107,23 +98,12 @@ class ChatCollector implements Collector {
const out: Record<string, unknown> = { mode: this.mode, answer: this.answer, ...this.base }
if (this.metadata !== undefined)
out.metadata = this.metadata
// Fall back to live deltas only when the server didn't persist reasoning in metadata.
if (Object.keys(this.reasoning).length > 0 && !hasReasoning(this.metadata))
out.metadata = { ...(this.metadata ?? {}), reasoning: this.reasoning }
if (this.isAgent || this.thoughts.length > 0)
out.agent_thoughts = this.thoughts
return out
}
}
/**
 * Report whether `metadata.reasoning` is a plain (non-array) object with at
 * least one own enumerable key. Missing metadata, null, primitives, arrays
 * and empty objects all yield false.
 */
function hasReasoning(metadata: Record<string, unknown> | undefined): boolean {
  const candidate = metadata?.reasoning
  // typeof null is 'object', so null has to be excluded explicitly.
  if (candidate === null || typeof candidate !== 'object')
    return false
  if (Array.isArray(candidate))
    return false
  return Object.keys(candidate as object).length > 0
}
class CompletionCollector implements Collector {
private answer = ''
private base: Record<string, unknown> = {}
@ -153,29 +133,14 @@ class CompletionCollector implements Collector {
class WorkflowCollector implements Collector {
private final: Record<string, unknown> | undefined
private readonly reasoning: Record<string, string> = {}
consume(ev: SseEvent): void {
if (ev.name === 'reasoning_chunk') {
const chunk = parseReasoningChunk(parseJson(ev.data))
if (chunk !== undefined)
accumulateReasoning(this.reasoning, chunk)
return
}
if (ev.name !== 'workflow_finished')
return
this.final = parseJson(ev.data)
}
finalize(): Record<string, unknown> {
const out: Record<string, unknown> = { mode: RUN_MODES.Workflow, ...(this.final ?? {}) }
// Workflow runs don't persist reasoning; surface live deltas under metadata.reasoning.
if (Object.keys(this.reasoning).length > 0) {
const existing = (out.metadata !== null && typeof out.metadata === 'object' && !Array.isArray(out.metadata))
? out.metadata as Record<string, unknown>
: undefined
out.metadata = { ...(existing ?? {}), reasoning: this.reasoning }
}
return out
return { mode: RUN_MODES.Workflow, ...(this.final ?? {}) }
}
}

View File

@ -37,42 +37,6 @@ describe('streamPrinterFor — chat', () => {
})
})
// Build a reasoning_chunk event for node llm-1 on message m1.
function reasoningEvent(reasoning: string, isFinal: boolean) {
  const data = { message_id: 'm1', reasoning, node_id: 'llm-1', is_final: isFinal }
  return ev('reasoning_chunk', { data })
}
describe('streamPrinterFor — chat separated reasoning', () => {
  it('think: true frames reasoning_chunk deltas to stderr, answer stays clean on stdout', () => {
    const printer = streamPrinterFor('advanced-chat', true)
    const cap = captures()
    const events = [
      reasoningEvent('pon', false),
      reasoningEvent('dering', false),
      reasoningEvent('', true),
      ev('message', { conversation_id: 'c1', answer: 'final answer' }),
    ]
    for (const event of events)
      printer.onEvent(cap.out, cap.err, event)
    printer.onEnd(cap.out, cap.err)

    // The deltas are joined inside one framed block tagged with the node id.
    expect(cap.errBuf()).toContain('<think> [llm-1]\npondering</think>')
    expect(cap.outBuf()).toBe('final answer\n')
  })

  it('think: false ignores reasoning_chunk entirely', () => {
    const printer = streamPrinterFor('advanced-chat', false)
    const cap = captures()
    const events = [
      reasoningEvent('secret', true),
      ev('message', { answer: 'hi' }),
    ]
    for (const event of events)
      printer.onEvent(cap.out, cap.err, event)
    printer.onEnd(cap.out, cap.err)

    expect(cap.errBuf()).not.toContain('secret')
    expect(cap.outBuf()).toBe('hi\n')
  })

  it('closes an unterminated reasoning block on stream end', () => {
    const printer = streamPrinterFor('advanced-chat', true)
    const cap = captures()
    // No final chunk is sent; onEnd is expected to emit the closing tag.
    printer.onEvent(cap.out, cap.err, reasoningEvent('thinking', false))
    printer.onEnd(cap.out, cap.err)

    expect(cap.errBuf()).toContain('<think> [llm-1]\nthinking</think>')
  })
})
describe('streamPrinterFor — agent-chat', () => {
it('writes agent_thought to stderr', () => {
const sp = streamPrinterFor('agent-chat')
@ -141,62 +105,6 @@ describe('streamPrinterFor — workflow think filtering', () => {
})
})
// Workflow runs have no message, so their reasoning_chunk payload carries no message_id.
/** Build a workflow-mode `reasoning_chunk` event attributed to `nodeId`. */
function wfReasoning(reasoning: string, nodeId: string, isFinal: boolean) {
  const data = { reasoning, node_id: nodeId, is_final: isFinal }
  return ev('reasoning_chunk', { data })
}
// Separated-mode reasoning for workflow apps: reasoning_chunk deltas are framed
// on stderr (only under think=true); the JSON outputs on stdout stay clean.
describe('streamPrinterFor — workflow separated reasoning', () => {
  it('think: true frames reasoning_chunk to stderr, outputs stay clean on stdout', () => {
    const printer = streamPrinterFor('workflow', true)
    const io = captures()
    const events = [
      ev('node_started', { id: 'llm-1', title: 'LLM' }),
      wfReasoning('pon', 'llm-1', false),
      wfReasoning('dering', 'llm-1', false),
      wfReasoning('', 'llm-1', true),
      ev('workflow_finished', { data: { outputs: { result: 'clean answer' } } }),
    ]
    for (const event of events)
      printer.onEvent(io.out, io.err, event)
    printer.onEnd(io.out, io.err)

    const stderrText = io.errBuf()
    // node title precedes the reasoning block for attribution
    expect(stderrText).toContain('→ LLM')
    expect(stderrText).toContain('<think> [llm-1]\npondering</think>')
    const { result } = JSON.parse(io.outBuf().trim()) as { result: string }
    expect(result).toBe('clean answer')
  })

  it('think: false drops reasoning_chunk entirely', () => {
    const printer = streamPrinterFor('workflow', false)
    const io = captures()
    const events = [
      wfReasoning('secret', 'llm-1', true),
      ev('workflow_finished', { data: { outputs: { result: 'ok' } } }),
    ]
    for (const event of events)
      printer.onEvent(io.out, io.err, event)
    printer.onEnd(io.out, io.err)

    expect(io.errBuf()).not.toContain('secret')
    const { result } = JSON.parse(io.outBuf().trim()) as { result: string }
    expect(result).toBe('ok')
  })

  it('closes an unterminated reasoning block on stream end', () => {
    const printer = streamPrinterFor('workflow', true)
    const io = captures()
    // no is_final chunk is ever sent; onEnd is expected to close the block
    printer.onEvent(io.out, io.err, wfReasoning('thinking', 'llm-1', false))
    printer.onEnd(io.out, io.err)

    expect(io.errBuf()).toContain('<think> [llm-1]\nthinking</think>')
  })

  it('keeps interleaved parallel-node reasoning in separate node-tagged blocks', () => {
    const printer = streamPrinterFor('workflow', true)
    const io = captures()
    // two nodes alternate chunks; each switch must start a fresh tagged block
    const events = [
      wfReasoning('a1', 'llm-1', false),
      wfReasoning('b1', 'llm-2', false),
      wfReasoning('a2', 'llm-1', true),
      wfReasoning('b2', 'llm-2', true),
      ev('workflow_finished', { data: { outputs: { result: 'ok' } } }),
    ]
    for (const event of events)
      printer.onEvent(io.out, io.err, event)
    printer.onEnd(io.out, io.err)

    expect(io.errBuf()).toBe(
      '<think> [llm-1]\na1</think>\n<think> [llm-2]\nb1</think>\n<think> [llm-1]\na2</think>\n<think> [llm-2]\nb2</think>\n',
    )
  })
})
describe('streamPrinterFor — unknown mode', () => {
it('throws', () => {
expect(() => streamPrinterFor('whatever')).toThrow()

View File

@ -4,7 +4,6 @@ import type { SseEvent } from '@/http/sse'
import { newError } from '@/errors/base'
import { ErrorCode } from '@/errors/codes'
import { colorEnabled, colorScheme } from '@/sys/io/color'
import { parseReasoningChunk, ReasoningChunkRenderer } from '@/sys/io/reasoning'
import { filterThinkInOutputs, ThinkChunkFilter } from '@/sys/io/think-filter'
import { RUN_MODES } from './handlers'
import { HitlPauseError } from './sse-collector'
@ -44,12 +43,9 @@ function handleCommonEvents(ev: SseEvent): boolean {
class ChatStreamPrinter implements StreamPrinter {
private convoId = ''
private readonly filter: ThinkChunkFilter
private readonly reasoning = new ReasoningChunkRenderer()
private readonly think: boolean
private readonly isTTY: boolean
constructor(think: boolean, isTTY = false) {
this.filter = new ThinkChunkFilter(think)
this.think = think
this.isTTY = isTTY
}
@ -66,15 +62,6 @@ class ChatStreamPrinter implements StreamPrinter {
this.convoId = c.conversation_id
return
}
// Stream separated-mode reasoning to stderr under --think.
case 'reasoning_chunk': {
if (!this.think)
return
const chunk = parseReasoningChunk(c)
if (chunk !== undefined)
this.reasoning.push(chunk, errOut)
return
}
case 'agent_thought':
if (typeof c.thought === 'string' && c.thought !== '')
errOut.write(`thought: ${c.thought}\n`)
@ -86,7 +73,6 @@ class ChatStreamPrinter implements StreamPrinter {
}
onEnd(out: NodeJS.WritableStream, errOut: NodeJS.WritableStream): void {
this.reasoning.flush(errOut)
this.filter.flush(out, errOut)
out.write('\n')
if (this.convoId !== '') {
@ -120,7 +106,6 @@ class CompletionStreamPrinter implements StreamPrinter {
class WorkflowStreamPrinter implements StreamPrinter {
private final: Record<string, unknown> | undefined
private readonly reasoning = new ReasoningChunkRenderer()
private readonly think: boolean
constructor(think: boolean) {
this.think = think
@ -139,15 +124,6 @@ class WorkflowStreamPrinter implements StreamPrinter {
errOut.write(`${title}\n`)
return
}
// Stream separated-mode reasoning to stderr under --think; the prior → title attributes the node.
case 'reasoning_chunk': {
if (!this.think)
return
const chunk = parseReasoningChunk(c)
if (chunk !== undefined)
this.reasoning.push(chunk, errOut)
return
}
case 'node_finished': {
const status = typeof c.status === 'string' ? c.status : ''
if (status !== '' && status !== 'succeeded') {
@ -162,7 +138,6 @@ class WorkflowStreamPrinter implements StreamPrinter {
}
onEnd(out: NodeJS.WritableStream, errOut: NodeJS.WritableStream): void {
this.reasoning.flush(errOut)
if (this.final === undefined)
return
const data = this.final.data

View File

@ -67,15 +67,3 @@ DIFY_E2E_PASSWORD=
# DIFY_E2E_HITL_SINGLE_ACTION_APP_ID=
# DIFY_E2E_HITL_MULTI_NODE_APP_ID=
# DIFY_E2E_WS2_APP_ID=
# ── Separated-mode reasoning suite (opt-in) ─────────────────────────────────
# run-app-reasoning.e2e.ts is skipped unless DIFY_E2E_REASONING_APP_ID resolves.
# It needs a chatflow whose LLM node uses reasoning_format=separated AND a
# workspace with a default chat model configured.
#
# Either point at an existing app:
# DIFY_E2E_REASONING_APP_ID=
#
# …or auto-provision reasoning-chat.yml (→ app name "reasoning-bot"). Off by
# default so the shared bootstrap stays free of any model dependency.
# DIFY_E2E_REASONING_PROVISION=1

View File

@ -39,12 +39,11 @@ test/e2e/
│ ├── describe-app.e2e.ts — describe app
│ └── get-app-all-workspaces.e2e.ts — get app -A ([EE] multi-workspace cases)
└── run/
├── run-app-basic.e2e.ts — basic run, -o json, --inputs, streaming,
│ conversation, CI mode
├── run-app-streaming.e2e.ts — Ctrl+C / error-event / chunk timing
├── run-app-file.e2e.ts — --file upload (local + remote URL)
├── run-app-reasoning.e2e.ts — separated-mode reasoning (--think); opt-in
└── run-app-hitl.e2e.ts — HITL pause + resume
├── run-app-basic.e2e.ts — basic run, -o json, --inputs, streaming,
│ conversation, CI mode
├── run-app-streaming.e2e.ts — Ctrl+C / error-event / chunk timing
├── run-app-file.e2e.ts — --file upload (local + remote URL)
└── run-app-hitl.e2e.ts — HITL pause + resume
```
## Edition support
@ -138,24 +137,6 @@ global-setup will:
| `DIFY_E2E_HITL_SINGLE_ACTION_APP_ID` | |
| `DIFY_E2E_HITL_MULTI_NODE_APP_ID` | |
| `DIFY_E2E_WS2_APP_ID` | Override secondary workspace app ID (EE) |
| `DIFY_E2E_REASONING_APP_ID` | separated-reasoning chatflow app ID (opt-in) |
| `DIFY_E2E_REASONING_PROVISION` | `1` → auto-provision `reasoning-chat.yml` |
### Separated-mode reasoning suite (opt-in)
`run-app-reasoning.e2e.ts` verifies the out-of-band `reasoning_chunk` channel
(PR #37460): `--think` surfaces the chain-of-thought to stderr framed as
`<think>…</think>`, the answer stays clean, and `-o json` persists it under
`metadata.reasoning`. It is **skipped** unless `DIFY_E2E_REASONING_APP_ID`
resolves, because it runs a real LLM node and needs:
1. a chatflow whose LLM node uses `reasoning_format: separated`, and
1. a workspace with a default chat model configured.
Point `DIFY_E2E_REASONING_APP_ID` at such an app, or set
`DIFY_E2E_REASONING_PROVISION=1` to import the `reasoning-chat.yml` fixture
(its system prompt forces a `<think>` block, so any chat model triggers the
separated path — no dedicated reasoning model required).
## Running tests

View File

@ -37,9 +37,6 @@
* DIFY_E2E_HITL_EXTERNAL_APP_ID
* DIFY_E2E_HITL_SINGLE_ACTION_APP_ID
* DIFY_E2E_HITL_MULTI_NODE_APP_ID
* DIFY_E2E_REASONING_APP_ID Override separated-reasoning chatflow app ID
* DIFY_E2E_REASONING_PROVISION=1 Opt in to auto-provisioning reasoning-chat.yml
* (needs a workspace default chat model)
*/
/** Supported edition values. */
@ -77,11 +74,7 @@ export type E2EEnv = {
fileAppId: string
/** Chat app (advanced-chat) with a file input variable */
fileChatAppId: string
/**
* Chatflow whose LLM node uses reasoning_format=separated. Empty unless
* DIFY_E2E_REASONING_APP_ID is set or the fixture is auto-provisioned; the
* run-app-reasoning suite is skipped when empty.
*/
/** Chat app with separated reasoning (needs real model) */
reasoningAppId: string
/**
* Secondary workspace ID EE only ("auto_test1").
@ -127,11 +120,11 @@ export type E2ECapabilities = {
workflowAppId: string
fileAppId: string
fileChatAppId: string
reasoningAppId: string
hitlAppId: string
hitlExternalAppId: string
hitlSingleActionAppId: string
hitlMultiNodeAppId: string
reasoningAppId: string
ws2AppId: string
}
@ -217,11 +210,11 @@ export function resolveEnv(caps: E2ECapabilities | undefined): E2EEnv {
workflowAppId: caps.workflowAppId || env.workflowAppId,
fileAppId: caps.fileAppId || env.fileAppId,
fileChatAppId: caps.fileChatAppId || env.fileChatAppId,
reasoningAppId: caps.reasoningAppId || env.reasoningAppId,
hitlAppId: caps.hitlAppId || env.hitlAppId,
hitlExternalAppId: caps.hitlExternalAppId || env.hitlExternalAppId,
hitlSingleActionAppId: caps.hitlSingleActionAppId || env.hitlSingleActionAppId,
hitlMultiNodeAppId: caps.hitlMultiNodeAppId || env.hitlMultiNodeAppId,
reasoningAppId: caps.reasoningAppId || env.reasoningAppId,
ws2AppId: caps.ws2AppId || env.ws2AppId,
}
}

View File

@ -182,11 +182,11 @@ export async function setup(project: TestProject): Promise<void> {
workflowAppId: '',
fileAppId: '',
fileChatAppId: '',
reasoningAppId: '',
hitlAppId: '',
hitlExternalAppId: '',
hitlSingleActionAppId: '',
hitlMultiNodeAppId: '',
reasoningAppId: '',
ws2AppId: '',
} satisfies E2ECapabilities)
return
@ -289,11 +289,11 @@ export async function setup(project: TestProject): Promise<void> {
workflowAppId: provisionedIds.DIFY_E2E_WORKFLOW_APP_ID || E.workflowAppId,
fileAppId: provisionedIds.DIFY_E2E_FILE_APP_ID || E.fileAppId,
fileChatAppId: provisionedIds.DIFY_E2E_FILE_CHAT_APP_ID || E.fileChatAppId,
reasoningAppId: provisionedIds.DIFY_E2E_REASONING_APP_ID || E.reasoningAppId,
hitlAppId: provisionedIds.DIFY_E2E_HITL_APP_ID || E.hitlAppId,
hitlExternalAppId: provisionedIds.DIFY_E2E_HITL_EXTERNAL_APP_ID || E.hitlExternalAppId,
hitlSingleActionAppId: provisionedIds.DIFY_E2E_HITL_SINGLE_ACTION_APP_ID || E.hitlSingleActionAppId,
hitlMultiNodeAppId: provisionedIds.DIFY_E2E_HITL_MULTI_NODE_APP_ID || E.hitlMultiNodeAppId,
reasoningAppId: provisionedIds.DIFY_E2E_REASONING_APP_ID || E.reasoningAppId,
ws2AppId: provisionedIds.DIFY_E2E_WS2_APP_ID || E.ws2AppId,
}
@ -505,12 +505,6 @@ async function provisionApps(
['hitl-single-action.yml', 'DIFY_E2E_HITL_SINGLE_ACTION_APP_ID', primaryWsId],
['hitl-multi-node.yml', 'DIFY_E2E_HITL_MULTI_NODE_APP_ID', primaryWsId],
['file-chat.yml', 'DIFY_E2E_FILE_CHAT_APP_ID', primaryWsId],
// reasoning-chat.yml runs a real LLM node, so it is opt-in: provisioning it
// requires the workspace to have a default chat model configured. Off by
// default to keep the shared bootstrap free of any model dependency.
...(process.env.DIFY_E2E_REASONING_PROVISION === '1'
? [['reasoning-chat.yml', 'DIFY_E2E_REASONING_APP_ID', primaryWsId] as [string, string, string]]
: []),
...(edition === 'ee'
? [['ws2-workflow.yml', 'DIFY_E2E_WS2_APP_ID', secondaryWsId] as [string, string, string]]
: []),

View File

@ -15,8 +15,6 @@ export type Scenario
| 'server-version-unsupported'
| 'run-422-stale'
| 'workflow-think'
| 'chat-reasoning'
| 'workflow-reasoning'
| 'import-pending'
| 'import-failed'

View File

@ -370,32 +370,6 @@ export function buildApp(getScenario: () => Scenario, state?: MockState): Hono {
])
return new Response(thinkSse, { status: 200, headers: { 'content-type': 'text/event-stream' } })
}
if (scenario === 'chat-reasoning') {
// Separated mode: reasoning streams out-of-band on `reasoning_chunk` (nested
// under `data`), the answer stays free of <think>, and the terminal reasoning
// is persisted into message_end metadata.
const reasoningSse = sseChunks([
{ event: 'reasoning_chunk', data: { data: { message_id: 'msg-1', reasoning: 'secret reasoning', node_id: 'llm-1', is_final: false } } },
{ event: 'reasoning_chunk', data: { data: { message_id: 'msg-1', reasoning: '', node_id: 'llm-1', is_final: true } } },
{ event: 'message', data: { message_id: 'msg-1', conversation_id: 'conv-1', mode: app.mode, answer: 'final answer' } },
{ event: 'message_end', data: { message_id: 'msg-1', conversation_id: 'conv-1', task_id: 'task-1', metadata: { reasoning: { 'llm-1': 'secret reasoning' } } } },
])
return new Response(reasoningSse, { status: 200, headers: { 'content-type': 'text/event-stream' } })
}
if (scenario === 'workflow-reasoning') {
// Separated mode in a workflow: reasoning streams out-of-band on
// `reasoning_chunk` (no message_id), outputs stay clean, and there is NO
// persisted metadata — the live deltas are the only source.
const wfReasoningSse = sseChunks([
{ event: 'workflow_started', data: { id: 'wf-run-1', workflow_id: 'wf-1' } },
{ event: 'node_started', data: { id: 'llm-1', title: 'LLM' } },
{ event: 'reasoning_chunk', data: { data: { reasoning: 'secret reasoning', node_id: 'llm-1', is_final: false } } },
{ event: 'reasoning_chunk', data: { data: { reasoning: '', node_id: 'llm-1', is_final: true } } },
{ event: 'node_finished', data: { id: 'llm-1', status: 'succeeded' } },
{ event: 'workflow_finished', data: { id: 'wf-run-1', workflow_id: 'wf-1', data: { id: 'wf-run-1', status: 'succeeded', outputs: { result: 'final answer' } } } },
])
return new Response(wfReasoningSse, { status: 200, headers: { 'content-type': 'text/event-stream' } })
}
const sse = streamingRunResponse(app.mode, query, isAgent)
return new Response(sse, { status: 200, headers: { 'content-type': 'text/event-stream' } })
})

View File

@ -40,7 +40,6 @@ const createHandlers = () => ({
handleWorkflowAgentLog: vi.fn(),
handleWorkflowTextChunk: vi.fn(),
handleWorkflowTextReplace: vi.fn(),
handleWorkflowReasoning: vi.fn(),
handleWorkflowPaused: vi.fn(),
})
@ -253,7 +252,6 @@ describe('useWorkflowRun callbacks helpers', () => {
callbacks.onAgentLog?.({ node_id: 'node-1' } as never)
callbacks.onTextChunk?.({ data: 'chunk' } as never)
callbacks.onTextReplace?.({ text: 'replacement' } as never)
callbacks.onReasoning?.({ data: { reasoning: 'thinking', node_id: 'node-1' } } as never)
callbacks.onHumanInputRequired?.({ node_id: 'node-1' } as never)
callbacks.onHumanInputFormFilled?.({ node_id: 'node-1' } as never)
callbacks.onHumanInputFormTimeout?.({ node_id: 'node-1' } as never)
@ -297,7 +295,6 @@ describe('useWorkflowRun callbacks helpers', () => {
expect(userCallbacks.onAgentLog).toHaveBeenCalled()
expect(handlers.handleWorkflowTextChunk).toHaveBeenCalled()
expect(handlers.handleWorkflowTextReplace).toHaveBeenCalled()
expect(handlers.handleWorkflowReasoning).toHaveBeenCalled()
expect(handlers.handleWorkflowNodeHumanInputRequired).toHaveBeenCalled()
expect(userCallbacks.onHumanInputRequired).toHaveBeenCalled()
expect(handlers.handleWorkflowNodeHumanInputFormFilled).toHaveBeenCalled()
@ -426,7 +423,6 @@ describe('useWorkflowRun callbacks helpers', () => {
finalCallbacks.onAgentLog?.({ node_id: 'node-1' } as never)
finalCallbacks.onTextChunk?.({ data: 'chunk' } as never)
finalCallbacks.onTextReplace?.({ text: 'replacement' } as never)
finalCallbacks.onReasoning?.({ data: { reasoning: 'thinking', node_id: 'node-1' } } as never)
finalCallbacks.onHumanInputRequired?.({ node_id: 'node-1' } as never)
finalCallbacks.onHumanInputFormFilled?.({ node_id: 'node-1' } as never)
finalCallbacks.onHumanInputFormTimeout?.({ node_id: 'node-1' } as never)
@ -465,7 +461,6 @@ describe('useWorkflowRun callbacks helpers', () => {
expect(userCallbacks.onAgentLog).toHaveBeenCalled()
expect(handlers.handleWorkflowTextChunk).toHaveBeenCalled()
expect(handlers.handleWorkflowTextReplace).toHaveBeenCalled()
expect(handlers.handleWorkflowReasoning).toHaveBeenCalled()
expect(handlers.handleWorkflowNodeHumanInputRequired).toHaveBeenCalled()
expect(userCallbacks.onHumanInputRequired).toHaveBeenCalled()
expect(handlers.handleWorkflowNodeHumanInputFormFilled).toHaveBeenCalled()

View File

@ -27,7 +27,6 @@ type WorkflowRunEventHandlers = {
handleWorkflowAgentLog: NonNullable<IOtherOptions['onAgentLog']>
handleWorkflowTextChunk: NonNullable<IOtherOptions['onTextChunk']>
handleWorkflowTextReplace: NonNullable<IOtherOptions['onTextReplace']>
handleWorkflowReasoning: NonNullable<IOtherOptions['onReasoning']>
handleWorkflowPaused: () => void
}
@ -115,7 +114,6 @@ export const createBaseWorkflowRunCallbacks = ({
handleWorkflowAgentLog,
handleWorkflowTextChunk,
handleWorkflowTextReplace,
handleWorkflowReasoning,
handleWorkflowPaused,
} = handlers
const {
@ -246,9 +244,6 @@ export const createBaseWorkflowRunCallbacks = ({
onTextReplace: (params) => {
handleWorkflowTextReplace(params)
},
onReasoning: (params) => {
handleWorkflowReasoning(params)
},
onTTSChunk: (messageId: string, audio: string) => {
if (!audio || audio === '')
return
@ -330,7 +325,6 @@ export const createFinalWorkflowRunCallbacks = ({
handleWorkflowAgentLog,
handleWorkflowTextChunk,
handleWorkflowTextReplace,
handleWorkflowReasoning,
handleWorkflowPaused,
} = handlers
const {
@ -445,9 +439,6 @@ export const createFinalWorkflowRunCallbacks = ({
onTextReplace: (params) => {
handleWorkflowTextReplace(params)
},
onReasoning: (params) => {
handleWorkflowReasoning(params)
},
onTTSChunk: (messageId: string, audio: string) => {
if (!audio || audio === '')
return

View File

@ -78,8 +78,6 @@ export const createRunningWorkflowState = () => {
},
tracing: [],
resultText: '',
reasoningContent: {},
reasoningFinished: false,
}
}

View File

@ -138,7 +138,6 @@ const useWorkflowRunBase = (doSyncWorkflowDraft: DoSyncWorkflowDraft) => {
handleWorkflowAgentLog,
handleWorkflowTextChunk,
handleWorkflowTextReplace,
handleWorkflowReasoning,
handleWorkflowPaused,
} = useWorkflowRunEvent()
@ -327,7 +326,6 @@ const useWorkflowRunBase = (doSyncWorkflowDraft: DoSyncWorkflowDraft) => {
handleWorkflowAgentLog,
handleWorkflowTextChunk,
handleWorkflowTextReplace,
handleWorkflowReasoning,
handleWorkflowPaused,
}
const userCallbacks = {
@ -445,7 +443,7 @@ const useWorkflowRunBase = (doSyncWorkflowDraft: DoSyncWorkflowDraft) => {
},
finalCallbacks,
)
}, [store, doSyncWorkflowDraft, workflowStore, pathname, handleWorkflowFailed, flowId, handleWorkflowStarted, handleWorkflowFinished, fetchInspectVars, invalidAllLastRun, invalidateRunHistory, handleWorkflowNodeStarted, handleWorkflowNodeFinished, handleWorkflowNodeIterationStarted, handleWorkflowNodeIterationNext, handleWorkflowNodeIterationFinished, handleWorkflowNodeLoopStarted, handleWorkflowNodeLoopNext, handleWorkflowNodeLoopFinished, handleWorkflowNodeRetry, handleWorkflowAgentLog, handleWorkflowTextChunk, handleWorkflowTextReplace, handleWorkflowReasoning, handleWorkflowPaused, handleWorkflowNodeHumanInputRequired, handleWorkflowNodeHumanInputFormFilled, handleWorkflowNodeHumanInputFormTimeout])
}, [store, doSyncWorkflowDraft, workflowStore, pathname, handleWorkflowFailed, flowId, handleWorkflowStarted, handleWorkflowFinished, fetchInspectVars, invalidAllLastRun, invalidateRunHistory, handleWorkflowNodeStarted, handleWorkflowNodeFinished, handleWorkflowNodeIterationStarted, handleWorkflowNodeIterationNext, handleWorkflowNodeIterationFinished, handleWorkflowNodeLoopStarted, handleWorkflowNodeLoopNext, handleWorkflowNodeLoopFinished, handleWorkflowNodeRetry, handleWorkflowAgentLog, handleWorkflowTextChunk, handleWorkflowTextReplace, handleWorkflowPaused, handleWorkflowNodeHumanInputRequired, handleWorkflowNodeHumanInputFormFilled, handleWorkflowNodeHumanInputFormTimeout])
const handleStopRun = useCallback((taskId: string) => {
const setStoppedState = () => {

View File

@ -16,7 +16,6 @@ const handlers = vi.hoisted(() => ({
handleWorkflowNodeRetry: vi.fn(),
handleWorkflowTextChunk: vi.fn(),
handleWorkflowTextReplace: vi.fn(),
handleWorkflowReasoning: vi.fn(),
handleWorkflowAgentLog: vi.fn(),
handleWorkflowPaused: vi.fn(),
handleWorkflowNodeHumanInputRequired: vi.fn(),
@ -46,10 +45,6 @@ vi.mock('..', () => ({
useWorkflowNodeHumanInputFormTimeout: () => ({ handleWorkflowNodeHumanInputFormTimeout: handlers.handleWorkflowNodeHumanInputFormTimeout }),
}))
vi.mock('../use-workflow-reasoning', () => ({
useWorkflowReasoning: () => ({ handleWorkflowReasoning: handlers.handleWorkflowReasoning }),
}))
describe('useWorkflowRunEvent', () => {
it('returns the composed handlers from all workflow event hooks', () => {
const { result } = renderHook(() => useWorkflowRunEvent())

View File

@ -1,30 +0,0 @@
import type { ReasoningChunkResponse } from '@/types/workflow'
import { produce } from 'immer'
import { useCallback } from 'react'
import { useWorkflowStore } from '@/app/components/workflow/store'
/**
 * Hook returning the handler for `reasoning_chunk` events of a workflow run.
 *
 * The handler appends each reasoning delta to
 * `workflowRunningData.reasoningContent`, keyed by the emitting node id
 * (falling back to `'_'` when the id is empty), and sets
 * `workflowRunningData.reasoningFinished` once a chunk flagged `is_final`
 * arrives.
 */
export const useWorkflowReasoning = () => {
  const workflowStore = useWorkflowStore()

  const handleWorkflowReasoning = useCallback((params: ReasoningChunkResponse) => {
    const { data: { reasoning, node_id, is_final } } = params
    const {
      workflowRunningData,
      setWorkflowRunningData,
    } = workflowStore.getState()

    // Without running data there is nothing to attach the chunk to. Passing
    // `undefined` to produce() would hand the recipe an undefined draft and
    // throw on the first property access, so drop the chunk instead.
    if (!workflowRunningData)
      return

    setWorkflowRunningData(produce(workflowRunningData, (draft) => {
      const reasoningContent = (draft.reasoningContent ||= {})
      // key by LLM node so multiple nodes' reasoning stays separated
      const key = node_id || '_'
      // empty deltas (e.g. the terminal marker) add no text
      if (reasoning)
        reasoningContent[key] = (reasoningContent[key] || '') + reasoning
      if (is_final)
        draft.reasoningFinished = true
    }))
  }, [workflowStore])

  return {
    handleWorkflowReasoning,
  }
}

View File

@ -19,7 +19,6 @@ import {
useWorkflowTextChunk,
useWorkflowTextReplace,
} from '.'
import { useWorkflowReasoning } from './use-workflow-reasoning'
export const useWorkflowRunEvent = () => {
const { handleWorkflowStarted } = useWorkflowStarted()
@ -36,7 +35,6 @@ export const useWorkflowRunEvent = () => {
const { handleWorkflowNodeRetry } = useWorkflowNodeRetry()
const { handleWorkflowTextChunk } = useWorkflowTextChunk()
const { handleWorkflowTextReplace } = useWorkflowTextReplace()
const { handleWorkflowReasoning } = useWorkflowReasoning()
const { handleWorkflowAgentLog } = useWorkflowAgentLog()
const { handleWorkflowPaused } = useWorkflowPaused()
const { handleWorkflowNodeHumanInputRequired } = useWorkflowNodeHumanInputRequired()
@ -58,7 +56,6 @@ export const useWorkflowRunEvent = () => {
handleWorkflowNodeRetry,
handleWorkflowTextChunk,
handleWorkflowTextReplace,
handleWorkflowReasoning,
handleWorkflowAgentLog,
handleWorkflowPaused,
handleWorkflowNodeHumanInputFormFilled,

View File

@ -71,12 +71,6 @@ vi.mock('@/app/components/workflow/run/tracing-panel', () => ({
default: ({ list }: { list: unknown[] }) => <div data-testid="tracing-panel">{list.length}</div>,
}))
vi.mock('@/app/components/base/chat/chat/answer/reasoning-panel', () => ({
default: ({ content, done }: { content: Record<string, string>, done: boolean }) => (
<div data-testid="reasoning-panel" data-done={String(done)}>{Object.keys(content).join(',')}</div>
),
}))
vi.mock('@/app/components/workflow/panel/inputs-panel', () => ({
default: ({ onRun }: { onRun: () => void }) => (
<button type="button" onClick={onRun}>
@ -347,80 +341,6 @@ describe('WorkflowPreview', () => {
expect(screen.getByTestId('result-panel')).toBeInTheDocument()
})
it('should render a single merged reasoning panel above the result on the result tab', async () => {
const user = userEvent.setup()
renderWorkflowComponent(
<WorkflowPreview />,
{
initialStoreState: {
workflowRunningData: {
...createWorkflowRunningData({
result: createWorkflowResult({ status: WorkflowRunningStatus.Running }),
}),
resultText: '',
reasoningContent: { 'llm-1': 'thinking a', 'llm-2': 'thinking b' },
} as NonNullable<Shape['workflowRunningData']>,
},
},
)
await user.click(screen.getByText('runLog.result'))
// one panel that carries both nodes' reasoning; still running → timer keeps ticking
const panels = screen.getAllByTestId('reasoning-panel')
expect(panels).toHaveLength(1)
expect(panels[0]).toHaveTextContent('llm-1,llm-2')
expect(panels[0]).toHaveAttribute('data-done', 'false')
})
it('should mark reasoning done once the answer starts streaming while still running', async () => {
const user = userEvent.setup()
renderWorkflowComponent(
<WorkflowPreview />,
{
initialStoreState: {
workflowRunningData: {
...createWorkflowRunningData({
result: createWorkflowResult({ status: WorkflowRunningStatus.Running }),
}),
resultText: 'the answer',
reasoningContent: { 'llm-1': 'thinking a' },
} as NonNullable<Shape['workflowRunningData']>,
},
},
)
await user.click(screen.getByText('runLog.result'))
// answer-started (resultText non-empty) freezes the timer even though the run is still Running
expect(screen.getByTestId('reasoning-panel')).toHaveAttribute('data-done', 'true')
})
it('should not render a reasoning panel when there is no reasoning content', async () => {
const user = userEvent.setup()
renderWorkflowComponent(
<WorkflowPreview />,
{
initialStoreState: {
workflowRunningData: {
...createWorkflowRunningData({
result: createWorkflowResult({ status: WorkflowRunningStatus.Running }),
}),
resultText: '',
reasoningContent: { llm: '' },
} as NonNullable<Shape['workflowRunningData']>,
},
},
)
await user.click(screen.getByText('runLog.result'))
expect(screen.queryByTestId('reasoning-panel')).not.toBeInTheDocument()
})
it('should switch to the tracing tab when result panel requests it', async () => {
const user = userEvent.setup()

View File

@ -10,7 +10,6 @@ import {
useState,
} from 'react'
import { useTranslation } from 'react-i18next'
import ReasoningPanel from '@/app/components/base/chat/chat/answer/reasoning-panel'
import Loading from '@/app/components/base/loading'
import { submitHumanInputForm } from '@/service/workflow'
import {
@ -204,17 +203,6 @@ const WorkflowPreview = () => {
humanInputFilledFormDataList={humanInputFilledFormDataList}
/>
)}
{workflowRunningData?.reasoningContent && Object.values(workflowRunningData.reasoningContent).some(Boolean) && (
<ReasoningPanel
content={workflowRunningData.reasoningContent}
// freeze the timer once the answer starts streaming — reasoningFinished and status only flip at run end
done={
!!workflowRunningData?.resultText?.trim()
|| !!workflowRunningData?.reasoningFinished
|| workflowRunningData?.result?.status !== WorkflowRunningStatus.Running
}
/>
)}
<ResultText
isRunning={workflowRunningData?.result?.status === WorkflowRunningStatus.Running || !workflowRunningData?.result}
isPaused={workflowRunningData?.result?.status === WorkflowRunningStatus.Paused}

View File

@ -1,7 +1,6 @@
'use client'
import type { FC } from 'react'
import { useTranslation } from 'react-i18next'
import { ChatContextProvider } from '@/app/components/base/chat/chat/context-provider'
import LoadingAnim from '@/app/components/base/chat/chat/loading-anim'
import { FileList } from '@/app/components/base/file-uploader'
import { ImageIndentLeft } from '@/app/components/base/icons/src/vender/line/editor'
@ -61,10 +60,7 @@ const ResultText: FC<ResultTextProps> = ({
<>
{outputs && (
<div className="px-4 py-2">
{/* ThinkBlock's timer reads isResponding from ChatContext, which the run panel otherwise lacks. */}
<ChatContextProvider chatList={[]} isResponding={!!isRunning}>
<Markdown content={outputs} />
</ChatContextProvider>
<Markdown content={outputs} />
</div>
)}
{!!allFiles?.length && allFiles.map(item => (

View File

@ -11,9 +11,7 @@ type PreviewRunningData = WorkflowRunningData & {
resultTabActive?: boolean
resultText?: string
resultTextSelectorKey?: string
// separated-mode reasoning deltas per LLM node id (live preview only)
reasoningContent?: Record<string, string>
// true once the terminal reasoning marker arrived
reasoningFinished?: boolean
// human input form schema or data cached when node is in 'Paused' status
extraContentAndFormData?: Record<string, unknown>