From d78d7a3b8e583784c3d15fcfb92d7e54b56e2d57 Mon Sep 17 00:00:00 2001 From: "autofix-ci[bot]" <114827586+autofix-ci[bot]@users.noreply.github.com> Date: Wed, 3 Jun 2026 20:03:08 +0000 Subject: [PATCH 1/8] [autofix.ci] apply automated fixes --- .../advanced_chat/generate_task_pipeline.py | 9 +- .../apps/workflow/generate_task_pipeline.py | 34 ++--- api/core/app/entities/task_entities.py | 3 +- .../test_generate_task_pipeline_core.py | 137 ++++++++---------- .../app/apps/test_workflow_app_runner_core.py | 14 -- .../test_generate_task_pipeline_core.py | 104 ++++++++----- cli/src/commands/resume/app/index.ts | 2 +- .../app/_strategies/streaming-structured.ts | 8 - cli/src/commands/run/app/index.ts | 2 +- cli/src/commands/run/app/run.test.ts | 79 ---------- .../commands/run/app/sse-collector.test.ts | 68 --------- cli/src/commands/run/app/sse-collector.ts | 37 +---- .../commands/run/app/stream-handlers.test.ts | 92 ------------ cli/src/commands/run/app/stream-handlers.ts | 25 ---- cli/test/e2e/.env.e2e.example | 12 -- cli/test/e2e/README.md | 29 +--- cli/test/e2e/setup/env.ts | 12 -- cli/test/e2e/setup/global-setup.ts | 8 - cli/test/fixtures/dify-mock/scenarios.ts | 2 - cli/test/fixtures/dify-mock/server.ts | 26 ---- .../use-workflow-run-callbacks.spec.ts | 5 - .../hooks/use-workflow-run-callbacks.ts | 9 -- .../hooks/use-workflow-run-utils.ts | 2 - .../workflow-app/hooks/use-workflow-run.ts | 4 +- .../__tests__/use-workflow-run-event.spec.ts | 5 - .../use-workflow-run-event.ts | 3 - .../panel/__tests__/workflow-preview.spec.tsx | 80 ---------- .../workflow/panel/workflow-preview.tsx | 12 -- .../components/workflow/run/result-text.tsx | 6 +- .../workflow/store/workflow/workflow-slice.ts | 4 - 30 files changed, 160 insertions(+), 673 deletions(-) diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index c0d6952871e..a9e251a5d41 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -87,7 +87,7 @@ from models import Account, Conversation, EndUser, Message, MessageFile from models.enums import CreatorUserRole, MessageFileBelongsTo, MessageStatus from models.execution_extra_content import HumanInputContent from models.model import AppMode -from models.workflow import Workflow +from models.workflow import Workflow, WorkflowRun logger = logging.getLogger(__name__) @@ -774,6 +774,13 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): with self._database_session() as session: # Save message self._save_message(session=session, graph_runtime_state=resolved_state) + # Update workflow run status to STOPPED so it does not remain RUNNING in logs + workflow_run = session.scalar(select(WorkflowRun).where(WorkflowRun.id == self._workflow_run_id)) + if workflow_run is not None: + workflow_run.status = WorkflowExecutionStatus.STOPPED + workflow_run.error = event.get_stop_reason() + workflow_run.finished_at = naive_utc_now() + session.commit() yield workflow_finish_resp elif event.stopped_by in ( diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index 6ca4053d7de..8182a2e77b9 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -4,6 +4,7 @@ from collections.abc import Callable, Generator from contextlib import contextmanager from typing import Union +from sqlalchemy import select from sqlalchemy.orm import Session, sessionmaker from constants.tts_auto_play_timeout import TTS_AUTO_PLAY_TIMEOUT, TTS_AUTO_PLAY_YIELD_CPU_TIME @@ -31,7 +32,6 @@ from core.app.entities.queue_entities import ( QueueNodeStartedEvent, QueueNodeSucceededEvent, QueuePingEvent, - QueueReasoningChunkEvent, QueueStopEvent, QueueTextChunkEvent, QueueWorkflowFailedEvent, @@ -48,7 +48,6 @@ from core.app.entities.task_entities import ( MessageAudioEndStreamResponse, MessageAudioStreamResponse, PingStreamResponse, - ReasoningChunkStreamResponse, StreamResponse, TextChunkStreamResponse, WorkflowAppBlockingResponse, @@ -66,10 +65,11 @@ from extensions.ext_database import db from graphon.entities import WorkflowStartReason from graphon.enums import WorkflowExecutionStatus from graphon.runtime import GraphRuntimeState +from libs.datetime_utils import naive_utc_now from models import Account from models.enums import CreatorUserRole from models.model import EndUser -from models.workflow import Workflow, WorkflowAppLog, WorkflowAppLogCreatedFrom +from models.workflow import Workflow, WorkflowAppLog, WorkflowAppLogCreatedFrom, WorkflowRun logger = logging.getLogger(__name__) @@ -552,6 +552,17 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport): error=error, exceptions_count=exceptions_count, ) + + # Persist STOPPED status to the database so the workflow run does not remain RUNNING in logs + if isinstance(event, QueueStopEvent) and self._workflow_execution_id: + with self._database_session() as session: + workflow_run = session.scalar(select(WorkflowRun).where(WorkflowRun.id == self._workflow_execution_id)) + if workflow_run is not None: + workflow_run.status = WorkflowExecutionStatus.STOPPED + workflow_run.error = error + workflow_run.finished_at = naive_utc_now() + session.commit() + yield workflow_finish_resp def _handle_text_chunk_event( @@ -573,22 +584,6 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport): yield self._text_chunk_to_stream_response(delta_text, from_variable_selector=event.from_variable_selector) - def _handle_reasoning_chunk_event( - self, event: QueueReasoningChunkEvent, **kwargs - ) -> Generator[StreamResponse, None, None]: - """Handle reasoning chunk events.""" - # is_final with empty reasoning is still forwarded as the "thinking finished" signal - if not event.reasoning and not event.is_final: - return - yield ReasoningChunkStreamResponse( - task_id=self._application_generate_entity.task_id, - data=ReasoningChunkStreamResponse.Data( - reasoning=event.reasoning, - node_id=event.from_node_id, - is_final=event.is_final, - ), - ) - def _handle_agent_log_event(self, event: QueueAgentLogEvent, **kwargs) -> Generator[StreamResponse, None, None]: """Handle agent log events.""" yield self._workflow_response_converter.handle_agent_log( @@ -618,7 +613,6 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport): QueuePingEvent: self._handle_ping_event, QueueErrorEvent: self._handle_error_event, QueueTextChunkEvent: self._handle_text_chunk_event, - QueueReasoningChunkEvent: self._handle_reasoning_chunk_event, # Workflow events QueueWorkflowStartedEvent: self._handle_workflow_started_event, QueueWorkflowSucceededEvent: self._handle_workflow_succeeded_event, diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py index ca5a26db55b..f98fe6fb0be 100644 --- a/api/core/app/entities/task_entities.py +++ b/api/core/app/entities/task_entities.py @@ -743,8 +743,7 @@ class ReasoningChunkStreamResponse(StreamResponse): Data entity """ - # chat apps set this; workflow runs have no message - message_id: str | None = None + message_id: str reasoning: str node_id: str | None = None is_final: bool = False diff --git a/api/tests/unit_tests/core/app/apps/advanced_chat/test_generate_task_pipeline_core.py b/api/tests/unit_tests/core/app/apps/advanced_chat/test_generate_task_pipeline_core.py index 4285fe088c6..3af7fad6773 100644 --- a/api/tests/unit_tests/core/app/apps/advanced_chat/test_generate_task_pipeline_core.py +++ b/api/tests/unit_tests/core/app/apps/advanced_chat/test_generate_task_pipeline_core.py @@ -29,7 +29,6 @@ from core.app.entities.queue_entities import ( QueueNodeExceptionEvent, QueueNodeFailedEvent, QueuePingEvent, - QueueReasoningChunkEvent, QueueRetrieverResourcesEvent, QueueStopEvent, QueueTextChunkEvent, @@ -47,12 +46,11 @@ from core.app.entities.task_entities import ( MessageAudioStreamResponse, MessageEndStreamResponse, PingStreamResponse, - ReasoningChunkStreamResponse, ) from core.base.tts.app_generator_tts_publisher import AudioTrunk from core.workflow.system_variables import build_system_variables from graphon.entities.pause_reason import PauseReasonType -from graphon.enums import BuiltinNodeTypes +from graphon.enums import BuiltinNodeTypes, WorkflowExecutionStatus from graphon.nodes.human_input.entities import UserActionConfig from graphon.runtime import GraphRuntimeState, VariablePool from libs.datetime_utils import naive_utc_now @@ -198,42 +196,6 @@ class TestAdvancedChatGenerateTaskPipeline: assert pipeline._task_state.answer == "hi" assert responses - def test_handle_reasoning_chunk_event_emits_on_nonempty(self): - pipeline = _make_pipeline() - event = QueueReasoningChunkEvent(reasoning="pondering", from_node_id="llm-1", is_final=False) - - responses = list(pipeline._handle_reasoning_chunk_event(event)) - - assert len(responses) == 1 - response = responses[0] - assert isinstance(response, ReasoningChunkStreamResponse) - assert response.data.message_id == pipeline._message_id - assert response.data.reasoning == "pondering" - assert response.data.node_id == "llm-1" - assert response.data.is_final is False - # reasoning never touches the answer stream - assert pipeline._task_state.answer == "" - - def test_handle_reasoning_chunk_event_drops_empty_nonfinal(self): - pipeline = _make_pipeline() - event = QueueReasoningChunkEvent(reasoning="", from_node_id="llm-1", is_final=False) - - responses = list(pipeline._handle_reasoning_chunk_event(event)) - - assert responses == [] - - def test_handle_reasoning_chunk_event_emits_empty_final_marker(self): - pipeline = _make_pipeline() - event = QueueReasoningChunkEvent(reasoning="", from_node_id="llm-1", is_final=True) - - responses = list(pipeline._handle_reasoning_chunk_event(event)) - - assert len(responses) == 1 - response = responses[0] - assert isinstance(response, ReasoningChunkStreamResponse) - assert response.data.reasoning == "" - assert response.data.is_final is True - def test_listen_audio_msg_returns_audio_stream(self): pipeline = _make_pipeline() publisher = SimpleNamespace(check_and_get_audio=lambda: AudioTrunk(status="stream", audio="data")) @@ -357,43 +319,6 @@ class TestAdvancedChatGenerateTaskPipeline: assert responses == ["done"] assert pipeline._recorded_files - def test_handle_node_succeeded_event_records_llm_reasoning(self): - pipeline = _make_pipeline() - pipeline._workflow_response_converter.fetch_files_from_node_outputs = lambda outputs: [] - pipeline._workflow_response_converter.workflow_node_finish_to_stream_response = lambda **kwargs: "done" - pipeline._save_output_for_event = lambda event, node_execution_id: None - - event = SimpleNamespace( - node_type=BuiltinNodeTypes.LLM, - outputs={"reasoning_content": "first pass "}, - node_execution_id="exec", - node_id="llm-1", - ) - - list(pipeline._handle_node_succeeded_event(event)) - - assert pipeline._task_state.metadata.reasoning == {"llm-1": "first pass "} - - def test_handle_node_succeeded_event_accumulates_reasoning_across_passes(self): - pipeline = _make_pipeline() - pipeline._workflow_response_converter.fetch_files_from_node_outputs = lambda outputs: [] - pipeline._workflow_response_converter.workflow_node_finish_to_stream_response = lambda **kwargs: "done" - pipeline._save_output_for_event = lambda event, node_execution_id: None - - def _llm_event(reasoning: str): - return SimpleNamespace( - node_type=BuiltinNodeTypes.LLM, - outputs={"reasoning_content": reasoning}, - node_execution_id="exec", - node_id="llm-1", - ) - - # Same node id across iteration/loop passes must accumulate, not overwrite. - list(pipeline._handle_node_succeeded_event(_llm_event("pass one "))) - list(pipeline._handle_node_succeeded_event(_llm_event("pass two"))) - - assert pipeline._task_state.metadata.reasoning == {"llm-1": "pass one pass two"} - def test_iteration_and_loop_handlers(self): pipeline = _make_pipeline() pipeline._workflow_run_id = "run-id" @@ -715,6 +640,66 @@ class TestAdvancedChatGenerateTaskPipeline: assert responses == ["end"] assert saved == ["saved"] + def test_handle_stop_event_updates_workflow_run_status_for_user_manual_stop(self, monkeypatch: pytest.MonkeyPatch): + pipeline = _make_pipeline() + pipeline._message_end_to_stream_response = lambda: "end" + pipeline._workflow_response_converter = SimpleNamespace( + workflow_finish_to_stream_response=lambda **kwargs: "finish" + ) + pipeline._workflow_run_id = "run-id" + pipeline._workflow_id = "workflow-id" + pipeline._graph_runtime_state = GraphRuntimeState( + variable_pool=VariablePool.from_bootstrap( + system_variables=build_system_variables(workflow_execution_id="run-id") + ), + start_at=0.0, + ) + saved: list[str] = [] + + def _save_message(**kwargs): + saved.append("saved") + + pipeline._save_message = _save_message + + # Track whether workflow_run status was updated + status_updates: list[dict] = [] + + class FakeWorkflowRun: + def __init__(self): + self.status = None + self.error = None + self.finished_at = None + + fake_run = FakeWorkflowRun() + + class FakeSession: + def scalar(self, stmt): + return fake_run + + def commit(self): + status_updates.append( + { + "status": fake_run.status, + "error": fake_run.error, + "finished_at": fake_run.finished_at, + } + ) + + @contextmanager + def _fake_session(): + yield FakeSession() + + monkeypatch.setattr(pipeline, "_database_session", _fake_session) + + responses = list(pipeline._handle_stop_event(QueueStopEvent(stopped_by=QueueStopEvent.StopBy.USER_MANUAL))) + + assert responses == ["finish", "end"] + assert saved == ["saved"] + assert len(status_updates) == 1 + assert status_updates[0]["status"] == WorkflowExecutionStatus.STOPPED + assert status_updates[0]["error"] == "Stopped by user." + assert status_updates[0]["finished_at"] is not None + def test_handle_message_end_event_applies_output_moderation(self, monkeypatch: pytest.MonkeyPatch): pipeline = _make_pipeline() pipeline._graph_runtime_state = GraphRuntimeState( diff --git a/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_core.py b/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_core.py index 69ed5919d27..c463c155a52 100644 --- a/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_core.py +++ b/api/tests/unit_tests/core/app/apps/test_workflow_app_runner_core.py @@ -16,7 +16,6 @@ from core.app.entities.queue_entities import ( QueueNodeFailedEvent, QueueNodeRetryEvent, QueueNodeSucceededEvent, - QueueReasoningChunkEvent, QueueTextChunkEvent, QueueWorkflowPausedEvent, QueueWorkflowStartedEvent, @@ -35,7 +34,6 @@ from graphon.graph_events import ( NodeRunHumanInputFormFilledEvent, NodeRunIterationSucceededEvent, NodeRunLoopFailedEvent, - NodeRunReasoningChunkEvent, NodeRunRetryEvent, NodeRunStartedEvent, NodeRunStreamChunkEvent, @@ -397,17 +395,6 @@ class TestWorkflowBasedAppRunner: is_final=False, ), ) - runner._handle_event( - workflow_entry, - NodeRunReasoningChunkEvent( - id="exec", - node_id="node", - node_type=BuiltinNodeTypes.LLM, - selector=["node", "reasoning_content"], - chunk="thinking", - is_final=False, - ), - ) runner._handle_event( workflow_entry, NodeRunAgentLogEvent( @@ -455,7 +442,6 @@ class TestWorkflowBasedAppRunner: ) assert any(isinstance(event, QueueTextChunkEvent) for event in published) - assert any(isinstance(event, QueueReasoningChunkEvent) for event in published) assert any(isinstance(event, QueueAgentLogEvent) for event in published) assert any(isinstance(event, QueueIterationCompletedEvent) for event in published) assert any(isinstance(event, QueueLoopCompletedEvent) for event in published) diff --git a/api/tests/unit_tests/core/app/apps/workflow/test_generate_task_pipeline_core.py b/api/tests/unit_tests/core/app/apps/workflow/test_generate_task_pipeline_core.py index 04fe7a2ebed..21fb6d462ee 100644 --- a/api/tests/unit_tests/core/app/apps/workflow/test_generate_task_pipeline_core.py +++ b/api/tests/unit_tests/core/app/apps/workflow/test_generate_task_pipeline_core.py @@ -27,7 +27,6 @@ from core.app.entities.queue_entities import ( QueueNodeStartedEvent, QueueNodeSucceededEvent, QueuePingEvent, - QueueReasoningChunkEvent, QueueStopEvent, QueueTextChunkEvent, QueueWorkflowFailedEvent, @@ -42,7 +41,6 @@ from core.app.entities.task_entities import ( MessageAudioEndStreamResponse, MessageAudioStreamResponse, PingStreamResponse, - ReasoningChunkStreamResponse, WorkflowAppPausedBlockingResponse, WorkflowFinishStreamResponse, WorkflowStartStreamResponse, @@ -268,41 +266,6 @@ class TestWorkflowGenerateTaskPipeline: assert responses[0].data.text == "hi" assert published == [queue_message] - def test_handle_reasoning_chunk_event_emits_on_nonempty(self): - pipeline = _make_pipeline() - event = QueueReasoningChunkEvent(reasoning="pondering", from_node_id="llm-1", is_final=False) - - responses = list(pipeline._handle_reasoning_chunk_event(event)) - - assert len(responses) == 1 - response = responses[0] - assert isinstance(response, ReasoningChunkStreamResponse) - # workflow runs have no message, so the id is omitted - assert response.data.message_id is None - assert response.data.reasoning == "pondering" - assert response.data.node_id == "llm-1" - assert response.data.is_final is False - - def test_handle_reasoning_chunk_event_drops_empty_nonfinal(self): - pipeline = _make_pipeline() - event = QueueReasoningChunkEvent(reasoning="", from_node_id="llm-1", is_final=False) - - responses = list(pipeline._handle_reasoning_chunk_event(event)) - - assert responses == [] - - def test_handle_reasoning_chunk_event_emits_empty_final_marker(self): - pipeline = _make_pipeline() - event = QueueReasoningChunkEvent(reasoning="", from_node_id="llm-1", is_final=True) - - responses = list(pipeline._handle_reasoning_chunk_event(event)) - - assert len(responses) == 1 - response = responses[0] - assert isinstance(response, ReasoningChunkStreamResponse) - assert response.data.reasoning == "" - assert response.data.is_final is True - def test_dispatch_event_handles_node_failed(self): pipeline = _make_pipeline() pipeline._workflow_response_converter.workflow_node_finish_to_stream_response = lambda **kwargs: "done" @@ -320,7 +283,7 @@ class TestWorkflowGenerateTaskPipeline: assert list(pipeline._dispatch_event(event)) == ["done"] - def test_handle_stop_event_yields_finish(self): + def test_handle_stop_event_yields_finish(self, monkeypatch: pytest.MonkeyPatch): pipeline = _make_pipeline() pipeline._workflow_execution_id = "run-id" pipeline._graph_runtime_state = GraphRuntimeState( @@ -331,6 +294,19 @@ class TestWorkflowGenerateTaskPipeline: ) pipeline._workflow_response_converter.workflow_finish_to_stream_response = lambda **kwargs: "finish" + @contextmanager + def _fake_session(): + class FakeSession: + def scalar(self, stmt): + return None + + def commit(self): + pass + + yield FakeSession() + + monkeypatch.setattr(pipeline, "_database_session", _fake_session) + responses = list( pipeline._handle_workflow_failed_and_stop_events( QueueStopEvent(stopped_by=QueueStopEvent.StopBy.USER_MANUAL) @@ -339,6 +315,58 @@ class TestWorkflowGenerateTaskPipeline: assert responses == ["finish"] + def test_handle_stop_event_updates_workflow_run_status(self, monkeypatch: pytest.MonkeyPatch): + pipeline = _make_pipeline() + pipeline._workflow_execution_id = "run-id" + pipeline._graph_runtime_state = GraphRuntimeState( + variable_pool=VariablePool.from_bootstrap( + system_variables=build_system_variables(workflow_execution_id="run-id") + ), + start_at=0.0, + ) + pipeline._workflow_response_converter.workflow_finish_to_stream_response = lambda **kwargs: "finish" + + status_updates: list[dict] = [] + + class FakeWorkflowRun: + def __init__(self): + self.status = None + self.error = None + self.finished_at = None + + fake_run = FakeWorkflowRun() + + class FakeSession: + def scalar(self, stmt): + return fake_run + + def commit(self): + status_updates.append( + { + "status": fake_run.status, + "error": fake_run.error, + "finished_at": fake_run.finished_at, + } + ) + + @contextmanager + def _fake_session(): + yield FakeSession() + + monkeypatch.setattr(pipeline, "_database_session", _fake_session) + + responses = list( + pipeline._handle_workflow_failed_and_stop_events( + QueueStopEvent(stopped_by=QueueStopEvent.StopBy.USER_MANUAL) + ) + ) + + assert responses == ["finish"] + assert len(status_updates) == 1 + assert status_updates[0]["status"] == WorkflowExecutionStatus.STOPPED + assert status_updates[0]["error"] == "Stopped by user." + assert status_updates[0]["finished_at"] is not None + def test_save_workflow_app_log_created_from(self): pipeline = _make_pipeline() pipeline._application_generate_entity.invoke_from = InvokeFrom.SERVICE_API diff --git a/cli/src/commands/resume/app/index.ts b/cli/src/commands/resume/app/index.ts index c4bbfbf2f56..596654181b5 100644 --- a/cli/src/commands/resume/app/index.ts +++ b/cli/src/commands/resume/app/index.ts @@ -29,7 +29,7 @@ export default class ResumeApp extends DifyCommand { 'workspace': Flags.string({ description: 'workspace id override' }), 'with-history': Flags.boolean({ description: 'Replay executed-node history before attaching to live stream.', default: false }), 'stream': Flags.boolean({ description: 'Print output live as tokens/events arrive. Default: collect and print at end.', default: false }), - 'think': Flags.boolean({ description: 'Show model thinking/reasoning when available — both inline ... blocks and separated reasoning streams. Hidden by default; with --think, thinking is printed to stderr.', default: false }), + 'think': Flags.boolean({ description: 'Show model thinking/reasoning when available. Strips ... blocks silently by default; with --think, thinking is printed to stderr.', default: false }), 'output': Flags.outputFormat({ options: [OutputFormat.JSON, OutputFormat.YAML, OutputFormat.TEXT], default: '' }), 'http-retry': httpRetryFlag, } diff --git a/cli/src/commands/run/app/_strategies/streaming-structured.ts b/cli/src/commands/run/app/_strategies/streaming-structured.ts index de39bfda2e5..3ed602a3410 100644 --- a/cli/src/commands/run/app/_strategies/streaming-structured.ts +++ b/cli/src/commands/run/app/_strategies/streaming-structured.ts @@ -7,7 +7,6 @@ import { collect, HitlPauseError } from '@/commands/run/app/sse-collector' import { formatted, stringifyOutput } from '@/framework/output' import { handle, unhandle } from '@/sys/index' import { colorEnabled, colorScheme } from '@/sys/io/color' -import { reasoningBlocksFromMetadata } from '@/sys/io/reasoning' import { startSpinner } from '@/sys/io/spinner' import { extractThinkBlocks, filterThinkInOutputs, stripThinkBlocks } from '@/sys/io/think-filter' @@ -100,13 +99,6 @@ export class StreamingStructuredStrategy implements RunStrategy { } } - // Surface separated-mode reasoning (carried in message_end metadata) to stderr under --think. - if (ctx.think) { - const reasoningBlocks = reasoningBlocksFromMetadata(processedResp.metadata) - if (reasoningBlocks !== '') - deps.io.err.write(`${reasoningBlocks}\n`) - } - const respMode = typeof processedResp.mode === 'string' && processedResp.mode !== '' ? processedResp.mode : mode deps.io.out.write(stringifyOutput(formatted({ format, data: newAppRunObject(respMode, processedResp) }))) if (isText && CHAT_MODES.has(respMode)) { diff --git a/cli/src/commands/run/app/index.ts b/cli/src/commands/run/app/index.ts index 6b63e17e81e..815d708d9d8 100644 --- a/cli/src/commands/run/app/index.ts +++ b/cli/src/commands/run/app/index.ts @@ -35,7 +35,7 @@ export default class RunApp extends DifyCommand { 'workflow-id': Flags.string({ description: 'Pin to a specific published workflow version' }), 'workspace': Flags.string({ description: 'Workspace id (overrides DIFY_WORKSPACE_ID and stored default)' }), 'stream': Flags.boolean({ description: 'Print output live as tokens/events arrive (default: collect and print at end)', default: false }), - 'think': Flags.boolean({ description: 'Show model thinking/reasoning when available — both inline ... blocks and separated reasoning streams. Hidden by default; with --think, thinking is printed to stderr.', default: false }), + 'think': Flags.boolean({ description: 'Show model thinking/reasoning when available. Strips ... blocks silently by default; with --think, thinking is printed to stderr.', default: false }), 'retry-on-limit': Flags.boolean({ description: 'On a 429 rate limit, wait and retry this POST (bounded) instead of failing immediately. Off by default since running an app is not idempotent.', default: false }), 'http-retry': httpRetryFlag, 'output': Flags.outputFormat({ options: [OutputFormat.JSON, OutputFormat.YAML, OutputFormat.TEXT], default: '' }), diff --git a/cli/src/commands/run/app/run.test.ts b/cli/src/commands/run/app/run.test.ts index a4e201c8ee1..57b02aeb47d 100644 --- a/cli/src/commands/run/app/run.test.ts +++ b/cli/src/commands/run/app/run.test.ts @@ -203,85 +203,6 @@ describe('runApp', () => { expect(io.errBuf()).toContain('secret reasoning') }) - it('--stream chat --think: routes separated reasoning to stderr, clean answer to stdout', async () => { - mock.setScenario('chat-reasoning') - const io = bufferStreams() - const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) }) - await runApp( - { appId: 'app-1', message: 'hi', stream: true, think: true }, - { active: active(), http: testHttpClient(mock.url, 'dfoa_test'), host: mock.url, io, cache }, - ) - expect(io.outBuf()).toContain('final answer') - expect(io.outBuf()).not.toContain('secret reasoning') - expect(io.errBuf()).toContain('') - expect(io.errBuf()).toContain('secret reasoning') - }) - - it('--stream chat without --think: separated reasoning stays hidden', async () => { - mock.setScenario('chat-reasoning') - const io = bufferStreams() - const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) }) - await runApp( - { appId: 'app-1', message: 'hi', stream: true }, - { active: active(), http: testHttpClient(mock.url, 'dfoa_test'), host: mock.url, io, cache }, - ) - expect(io.outBuf()).toContain('final answer') - expect(io.errBuf()).not.toContain('secret reasoning') - }) - - it('chat -o json --think: echoes separated reasoning to stderr, persists it in metadata', async () => { - mock.setScenario('chat-reasoning') - const io = bufferStreams() - const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) }) - await runApp( - { appId: 'app-1', message: 'hi', format: 'json', think: true }, - { active: active(), http: testHttpClient(mock.url, 'dfoa_test'), host: mock.url, io, cache }, - ) - expect(io.errBuf()).toContain('secret reasoning') - const parsed = JSON.parse(io.outBuf()) as { answer: string, metadata: { reasoning: Record } } - expect(parsed.answer).toBe('final answer') - expect(parsed.metadata.reasoning).toEqual({ 'llm-1': 'secret reasoning' }) - }) - - it('--stream workflow --think: routes separated reasoning to stderr, clean outputs to stdout', async () => { - mock.setScenario('workflow-reasoning') - const io = bufferStreams() - const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) }) - await runApp( - { appId: 'app-2', inputs: { x: '1' }, stream: true, think: true }, - { active: active(), http: testHttpClient(mock.url, 'dfoa_test'), host: mock.url, io, cache }, - ) - expect(io.errBuf()).toContain('') - expect(io.errBuf()).toContain('secret reasoning') - expect(io.outBuf()).toContain('final answer') - expect(io.outBuf()).not.toContain('secret reasoning') - }) - - it('workflow -o json --think: echoes reasoning to stderr, accumulates into metadata.reasoning', async () => { - mock.setScenario('workflow-reasoning') - const io = bufferStreams() - const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) }) - await runApp( - { appId: 'app-2', inputs: { x: '1' }, format: 'json', think: true }, - { active: active(), http: testHttpClient(mock.url, 'dfoa_test'), host: mock.url, io, cache }, - ) - expect(io.errBuf()).toContain('secret reasoning') - const parsed = JSON.parse(io.outBuf()) as { metadata: { reasoning: Record } } - expect(parsed.metadata.reasoning).toEqual({ 'llm-1': 'secret reasoning' }) - }) - - it('--stream workflow without --think: reasoning stays hidden', async () => { - mock.setScenario('workflow-reasoning') - const io = bufferStreams() - const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) }) - await runApp( - { appId: 'app-2', inputs: { x: '1' }, stream: true }, - { active: active(), http: testHttpClient(mock.url, 'dfoa_test'), host: mock.url, io, cache }, - ) - expect(io.outBuf()).toContain('final answer') - expect(io.errBuf()).not.toContain('secret reasoning') - }) - it('stream-error scenario: error event surfaces typed BaseError', async () => { mock.setScenario('stream-error') const io = bufferStreams() diff --git a/cli/src/commands/run/app/sse-collector.test.ts b/cli/src/commands/run/app/sse-collector.test.ts index d26abb5608b..e92d3694d3f 100644 --- a/cli/src/commands/run/app/sse-collector.test.ts +++ b/cli/src/commands/run/app/sse-collector.test.ts @@ -59,41 +59,6 @@ describe('collect — chat', () => { }) }) -describe('collect — chat separated reasoning', () => { - function reasoningEvent(reasoning: string, isFinal: boolean) { - return ev('reasoning_chunk', { data: { message_id: 'm1', reasoning, node_id: 'llm-1', is_final: isFinal } }) - } - - it('backfills metadata.reasoning from live deltas when the server omits it', async () => { - const got = await collect(iterOf( - reasoningEvent('pon', false), - reasoningEvent('dering', true), - ev('message', { message_id: 'm1', answer: 'answer' }), - ev('message_end', { metadata: { usage: { tokens: 3 } } }), - ), 'advanced-chat') - expect(got.answer).toBe('answer') - expect((got.metadata as { reasoning?: unknown }).reasoning).toEqual({ 'llm-1': 'pondering' }) - expect((got.metadata as { usage?: unknown }).usage).toEqual({ tokens: 3 }) - }) - - it('keeps the server-persisted reasoning over live deltas', async () => { - const got = await collect(iterOf( - reasoningEvent('live', true), - ev('message', { answer: 'a' }), - ev('message_end', { metadata: { reasoning: { 'llm-1': 'persisted' } } }), - ), 'advanced-chat') - expect((got.metadata as { reasoning?: unknown }).reasoning).toEqual({ 'llm-1': 'persisted' }) - }) - - it('leaves metadata untouched when there is no reasoning at all', async () => { - const got = await collect(iterOf( - ev('message', { answer: 'a' }), - ev('message_end', { metadata: { usage: { tokens: 1 } } }), - ), 'advanced-chat') - expect((got.metadata as { reasoning?: unknown }).reasoning).toBeUndefined() - }) -}) - describe('collect — agent-chat', () => { it('captures agent_thoughts', async () => { const got = await collect(iterOf( @@ -132,39 +97,6 @@ describe('collect — workflow', () => { }) }) -describe('collect — workflow separated reasoning', () => { - function wfReasoning(reasoning: string, nodeId: string, isFinal: boolean) { - return ev('reasoning_chunk', { data: { reasoning, node_id: nodeId, is_final: isFinal } }) - } - - it('accumulates reasoning_chunk per node into metadata.reasoning', async () => { - const got = await collect(iterOf( - ev('node_started', { id: 'llm-1' }), - wfReasoning('pon', 'llm-1', false), - wfReasoning('dering', 'llm-1', true), - ev('workflow_finished', { data: { status: 'succeeded', outputs: { result: 'clean' } } }), - ), 'workflow') - expect((got.data as { outputs: { result: string } }).outputs.result).toBe('clean') - expect((got.metadata as { reasoning?: unknown }).reasoning).toEqual({ 'llm-1': 'pondering' }) - }) - - it('keys reasoning by node, leaves metadata absent when there is none', async () => { - const got = await collect(iterOf( - ev('workflow_finished', { data: { status: 'succeeded', outputs: { result: 'clean' } } }), - ), 'workflow') - expect((got.metadata as { reasoning?: unknown } | undefined)?.reasoning).toBeUndefined() - }) - - it('merges reasoning into metadata already carried by workflow_finished', async () => { - const got = await collect(iterOf( - wfReasoning('think', 'llm-1', true), - ev('workflow_finished', { data: { status: 'succeeded' }, metadata: { usage: { tokens: 7 } } }), - ), 'workflow') - expect((got.metadata as { reasoning?: unknown }).reasoning).toEqual({ 'llm-1': 'think' }) - expect((got.metadata as { usage?: unknown }).usage).toEqual({ tokens: 7 }) - }) -}) - describe('collect — error event', () => { it('throws BaseError when error event arrives', async () => { await expect(collect(iterOf( diff --git a/cli/src/commands/run/app/sse-collector.ts b/cli/src/commands/run/app/sse-collector.ts index b3b3a33b06c..043a9690d5f 100644 --- a/cli/src/commands/run/app/sse-collector.ts +++ b/cli/src/commands/run/app/sse-collector.ts @@ -2,7 +2,6 @@ import type { BaseError } from '@/errors/base' import type { SseEvent } from '@/http/sse' import { HttpClientError, newError } from '@/errors/base' import { ErrorCode } from '@/errors/codes' -import { accumulateReasoning, parseReasoningChunk } from '@/sys/io/reasoning' import { RUN_MODES } from './handlers' export type HitlPauseData = { @@ -68,7 +67,6 @@ class ChatCollector implements Collector { private base: Record = {} private metadata: Record | undefined private thoughts: unknown[] = [] - private readonly reasoning: Record = {} private readonly mode: string private readonly isAgent: boolean constructor(mode: string, isAgent: boolean) { @@ -86,13 +84,6 @@ class ChatCollector implements Collector { copyScalar(this.base, c, ['id', 'conversation_id', 'message_id', 'task_id', 'created_at']) return } - // Accumulate separated-mode reasoning deltas per LLM node. - case 'reasoning_chunk': { - const chunk = parseReasoningChunk(c) - if (chunk !== undefined) - accumulateReasoning(this.reasoning, chunk) - return - } case 'agent_thought': this.thoughts.push(c) return @@ -107,23 +98,12 @@ class ChatCollector implements Collector { const out: Record = { mode: this.mode, answer: this.answer, ...this.base } if (this.metadata !== undefined) out.metadata = this.metadata - // Fall back to live deltas only when the server didn't persist reasoning in metadata. - if (Object.keys(this.reasoning).length > 0 && !hasReasoning(this.metadata)) - out.metadata = { ...(this.metadata ?? {}), reasoning: this.reasoning } if (this.isAgent || this.thoughts.length > 0) out.agent_thoughts = this.thoughts return out } } -function hasReasoning(metadata: Record | undefined): boolean { - const reasoning = metadata?.reasoning - return reasoning !== null - && typeof reasoning === 'object' - && !Array.isArray(reasoning) - && Object.keys(reasoning as object).length > 0 -} - class CompletionCollector implements Collector { private answer = '' private base: Record = {} @@ -153,29 +133,14 @@ class CompletionCollector implements Collector { class WorkflowCollector implements Collector { private final: Record | undefined - private readonly reasoning: Record = {} consume(ev: SseEvent): void { - if (ev.name === 'reasoning_chunk') { - const chunk = parseReasoningChunk(parseJson(ev.data)) - if (chunk !== undefined) - accumulateReasoning(this.reasoning, chunk) - return - } if (ev.name !== 'workflow_finished') return this.final = parseJson(ev.data) } finalize(): Record { - const out: Record = { mode: RUN_MODES.Workflow, ...(this.final ?? {}) } - // Workflow runs don't persist reasoning; surface live deltas under metadata.reasoning. - if (Object.keys(this.reasoning).length > 0) { - const existing = (out.metadata !== null && typeof out.metadata === 'object' && !Array.isArray(out.metadata)) - ? out.metadata as Record - : undefined - out.metadata = { ...(existing ?? {}), reasoning: this.reasoning } - } - return out + return { mode: RUN_MODES.Workflow, ...(this.final ?? {}) } } } diff --git a/cli/src/commands/run/app/stream-handlers.test.ts b/cli/src/commands/run/app/stream-handlers.test.ts index ec9cd957bb5..bab45144a01 100644 --- a/cli/src/commands/run/app/stream-handlers.test.ts +++ b/cli/src/commands/run/app/stream-handlers.test.ts @@ -37,42 +37,6 @@ describe('streamPrinterFor — chat', () => { }) }) -function reasoningEvent(reasoning: string, isFinal: boolean) { - return ev('reasoning_chunk', { data: { message_id: 'm1', reasoning, node_id: 'llm-1', is_final: isFinal } }) -} - -describe('streamPrinterFor — chat separated reasoning', () => { - it('think: true frames reasoning_chunk deltas to stderr, answer stays clean on stdout', () => { - const sp = streamPrinterFor('advanced-chat', true) - const cap = captures() - sp.onEvent(cap.out, cap.err, reasoningEvent('pon', false)) - sp.onEvent(cap.out, cap.err, reasoningEvent('dering', false)) - sp.onEvent(cap.out, cap.err, reasoningEvent('', true)) - sp.onEvent(cap.out, cap.err, ev('message', { conversation_id: 'c1', answer: 'final answer' })) - sp.onEnd(cap.out, cap.err) - expect(cap.errBuf()).toContain(' [llm-1]\npondering') - expect(cap.outBuf()).toBe('final answer\n') - }) - - it('think: false ignores reasoning_chunk entirely', () => { - const sp = streamPrinterFor('advanced-chat', false) - const cap = captures() - sp.onEvent(cap.out, cap.err, reasoningEvent('secret', true)) - sp.onEvent(cap.out, cap.err, ev('message', { answer: 'hi' })) - sp.onEnd(cap.out, cap.err) - expect(cap.errBuf()).not.toContain('secret') - expect(cap.outBuf()).toBe('hi\n') - }) - - it('closes an unterminated reasoning block on stream end', () => { - const sp = streamPrinterFor('advanced-chat', true) - const cap = captures() - sp.onEvent(cap.out, cap.err, reasoningEvent('thinking', false)) - sp.onEnd(cap.out, cap.err) - expect(cap.errBuf()).toContain(' [llm-1]\nthinking') - }) -}) - describe('streamPrinterFor — agent-chat', () => { it('writes agent_thought to stderr', () => { const sp = streamPrinterFor('agent-chat') @@ -141,62 +105,6 @@ describe('streamPrinterFor — workflow think filtering', () => { }) }) -// Workflow reasoning_chunk events carry no message_id (workflow runs have no message). -function wfReasoning(reasoning: string, nodeId: string, isFinal: boolean) { - return ev('reasoning_chunk', { data: { reasoning, node_id: nodeId, is_final: isFinal } }) -} - -describe('streamPrinterFor — workflow separated reasoning', () => { - it('think: true frames reasoning_chunk to stderr, outputs stay clean on stdout', () => { - const sp = streamPrinterFor('workflow', true) - const cap = captures() - sp.onEvent(cap.out, cap.err, ev('node_started', { id: 'llm-1', title: 'LLM' })) - sp.onEvent(cap.out, cap.err, wfReasoning('pon', 'llm-1', false)) - sp.onEvent(cap.out, cap.err, wfReasoning('dering', 'llm-1', false)) - sp.onEvent(cap.out, cap.err, wfReasoning('', 'llm-1', true)) - sp.onEvent(cap.out, cap.err, ev('workflow_finished', { data: { outputs: { result: 'clean answer' } } })) - sp.onEnd(cap.out, cap.err) - // node title precedes the reasoning block for attribution - expect(cap.errBuf()).toContain('→ LLM') - expect(cap.errBuf()).toContain(' [llm-1]\npondering') - const parsed = JSON.parse(cap.outBuf().trim()) as { result: string } - expect(parsed.result).toBe('clean answer') - }) - - it('think: false drops reasoning_chunk entirely', () => { - const sp = streamPrinterFor('workflow', false) - const cap = captures() - sp.onEvent(cap.out, cap.err, wfReasoning('secret', 'llm-1', true)) - sp.onEvent(cap.out, cap.err, ev('workflow_finished', { data: { outputs: { result: 'ok' } } })) - sp.onEnd(cap.out, cap.err) - expect(cap.errBuf()).not.toContain('secret') - const parsed = JSON.parse(cap.outBuf().trim()) as { result: string } - expect(parsed.result).toBe('ok') - }) - - it('closes an unterminated reasoning block on stream end', () => { - const sp = streamPrinterFor('workflow', true) - const cap = captures() - sp.onEvent(cap.out, cap.err, wfReasoning('thinking', 'llm-1', false)) - sp.onEnd(cap.out, cap.err) - expect(cap.errBuf()).toContain(' [llm-1]\nthinking') - }) - - it('keeps interleaved parallel-node reasoning in separate node-tagged blocks', () => { - const sp = streamPrinterFor('workflow', true) - const cap = captures() - sp.onEvent(cap.out, cap.err, wfReasoning('a1', 'llm-1', false)) - sp.onEvent(cap.out, cap.err, wfReasoning('b1', 'llm-2', false)) - sp.onEvent(cap.out, cap.err, wfReasoning('a2', 'llm-1', true)) - sp.onEvent(cap.out, cap.err, wfReasoning('b2', 'llm-2', true)) - sp.onEvent(cap.out, cap.err, ev('workflow_finished', { data: { outputs: { result: 'ok' } } })) - sp.onEnd(cap.out, cap.err) - expect(cap.errBuf()).toBe( - ' [llm-1]\na1\n [llm-2]\nb1\n [llm-1]\na2\n [llm-2]\nb2\n', - ) - }) -}) - describe('streamPrinterFor — unknown mode', () => { it('throws', () => { expect(() => streamPrinterFor('whatever')).toThrow() diff --git a/cli/src/commands/run/app/stream-handlers.ts b/cli/src/commands/run/app/stream-handlers.ts index 7ce96cf53c6..1955d5996bf 100644 --- a/cli/src/commands/run/app/stream-handlers.ts +++ b/cli/src/commands/run/app/stream-handlers.ts @@ -4,7 +4,6 @@ import type { SseEvent } from '@/http/sse' import { newError } from '@/errors/base' import { ErrorCode } from '@/errors/codes' import { colorEnabled, colorScheme } from '@/sys/io/color' -import { parseReasoningChunk, ReasoningChunkRenderer } from '@/sys/io/reasoning' import { filterThinkInOutputs, ThinkChunkFilter } from '@/sys/io/think-filter' import { RUN_MODES } from './handlers' import { HitlPauseError } from './sse-collector' @@ -44,12 +43,9 @@ function handleCommonEvents(ev: SseEvent): boolean { class ChatStreamPrinter implements StreamPrinter { private convoId = '' private readonly filter: ThinkChunkFilter - private readonly reasoning = new ReasoningChunkRenderer() - private readonly think: boolean private readonly isTTY: boolean constructor(think: boolean, isTTY = false) { this.filter = new ThinkChunkFilter(think) - this.think = think this.isTTY = isTTY } @@ -66,15 +62,6 @@ class ChatStreamPrinter implements StreamPrinter { this.convoId = c.conversation_id return } - // Stream separated-mode reasoning to stderr under --think. - case 'reasoning_chunk': { - if (!this.think) - return - const chunk = parseReasoningChunk(c) - if (chunk !== undefined) - this.reasoning.push(chunk, errOut) - return - } case 'agent_thought': if (typeof c.thought === 'string' && c.thought !== '') errOut.write(`thought: ${c.thought}\n`) @@ -86,7 +73,6 @@ class ChatStreamPrinter implements StreamPrinter { } onEnd(out: NodeJS.WritableStream, errOut: NodeJS.WritableStream): void { - this.reasoning.flush(errOut) this.filter.flush(out, errOut) out.write('\n') if (this.convoId !== '') { @@ -120,7 +106,6 @@ class CompletionStreamPrinter implements StreamPrinter { class WorkflowStreamPrinter implements StreamPrinter { private final: Record | undefined - private readonly reasoning = new ReasoningChunkRenderer() private readonly think: boolean constructor(think: boolean) { this.think = think @@ -139,15 +124,6 @@ class WorkflowStreamPrinter implements StreamPrinter { errOut.write(`→ ${title}\n`) return } - // Stream separated-mode reasoning to stderr under --think; the prior → title attributes the node. - case 'reasoning_chunk': { - if (!this.think) - return - const chunk = parseReasoningChunk(c) - if (chunk !== undefined) - this.reasoning.push(chunk, errOut) - return - } case 'node_finished': { const status = typeof c.status === 'string' ? c.status : '' if (status !== '' && status !== 'succeeded') { @@ -162,7 +138,6 @@ class WorkflowStreamPrinter implements StreamPrinter { } onEnd(out: NodeJS.WritableStream, errOut: NodeJS.WritableStream): void { - this.reasoning.flush(errOut) if (this.final === undefined) return const data = this.final.data diff --git a/cli/test/e2e/.env.e2e.example b/cli/test/e2e/.env.e2e.example index 85f2219daeb..53fbedadf31 100644 --- a/cli/test/e2e/.env.e2e.example +++ b/cli/test/e2e/.env.e2e.example @@ -67,15 +67,3 @@ DIFY_E2E_PASSWORD= # DIFY_E2E_HITL_SINGLE_ACTION_APP_ID= # DIFY_E2E_HITL_MULTI_NODE_APP_ID= # DIFY_E2E_WS2_APP_ID= - -# ── Separated-mode reasoning suite (opt-in) ───────────────────────────────── -# run-app-reasoning.e2e.ts is skipped unless DIFY_E2E_REASONING_APP_ID resolves. -# It needs a chatflow whose LLM node uses reasoning_format=separated AND a -# workspace with a default chat model configured. -# -# Either point at an existing app: -# DIFY_E2E_REASONING_APP_ID= -# -# …or auto-provision reasoning-chat.yml (→ app name "reasoning-bot"). Off by -# default so the shared bootstrap stays free of any model dependency. -# DIFY_E2E_REASONING_PROVISION=1 diff --git a/cli/test/e2e/README.md b/cli/test/e2e/README.md index 21635b0f6f9..fd3e9182507 100644 --- a/cli/test/e2e/README.md +++ b/cli/test/e2e/README.md @@ -39,12 +39,11 @@ test/e2e/ │ ├── describe-app.e2e.ts — describe app │ └── get-app-all-workspaces.e2e.ts — get app -A ([EE] multi-workspace cases) └── run/ - ├── run-app-basic.e2e.ts — basic run, -o json, --inputs, streaming, - │ conversation, CI mode - ├── run-app-streaming.e2e.ts — Ctrl+C / error-event / chunk timing - ├── run-app-file.e2e.ts — --file upload (local + remote URL) - ├── run-app-reasoning.e2e.ts — separated-mode reasoning (--think); opt-in - └── run-app-hitl.e2e.ts — HITL pause + resume + ├── run-app-basic.e2e.ts — basic run, -o json, --inputs, streaming, + │ conversation, CI mode + ├── run-app-streaming.e2e.ts — Ctrl+C / error-event / chunk timing + ├── run-app-file.e2e.ts — --file upload (local + remote URL) + └── run-app-hitl.e2e.ts — HITL pause + resume ``` ## Edition support @@ -138,24 +137,6 @@ global-setup will: | `DIFY_E2E_HITL_SINGLE_ACTION_APP_ID` | | | `DIFY_E2E_HITL_MULTI_NODE_APP_ID` | | | `DIFY_E2E_WS2_APP_ID` | Override secondary workspace app ID (EE) | -| `DIFY_E2E_REASONING_APP_ID` | separated-reasoning chatflow app ID (opt-in) | -| `DIFY_E2E_REASONING_PROVISION` | `1` → auto-provision `reasoning-chat.yml` | - -### Separated-mode reasoning suite (opt-in) - -`run-app-reasoning.e2e.ts` verifies the out-of-band `reasoning_chunk` channel -(PR #37460): `--think` surfaces the chain-of-thought to stderr framed as -``, the answer stays clean, and `-o json` persists it under -`metadata.reasoning`. It is **skipped** unless `DIFY_E2E_REASONING_APP_ID` -resolves, because it runs a real LLM node and needs: - -1. a chatflow whose LLM node uses `reasoning_format: separated`, and -1. a workspace with a default chat model configured. - -Point `DIFY_E2E_REASONING_APP_ID` at such an app, or set -`DIFY_E2E_REASONING_PROVISION=1` to import the `reasoning-chat.yml` fixture -(its system prompt forces a `` block, so any chat model triggers the -separated path — no dedicated reasoning model required). ## Running tests diff --git a/cli/test/e2e/setup/env.ts b/cli/test/e2e/setup/env.ts index 33f8361d46b..3f71b514426 100644 --- a/cli/test/e2e/setup/env.ts +++ b/cli/test/e2e/setup/env.ts @@ -37,9 +37,6 @@ * DIFY_E2E_HITL_EXTERNAL_APP_ID * DIFY_E2E_HITL_SINGLE_ACTION_APP_ID * DIFY_E2E_HITL_MULTI_NODE_APP_ID - * DIFY_E2E_REASONING_APP_ID Override separated-reasoning chatflow app ID - * DIFY_E2E_REASONING_PROVISION=1 Opt in to auto-provisioning reasoning-chat.yml - * (needs a workspace default chat model) */ /** Supported edition values. */ @@ -77,12 +74,6 @@ export type E2EEnv = { fileAppId: string /** Chat app (advanced-chat) with a file input variable */ fileChatAppId: string - /** - * Chatflow whose LLM node uses reasoning_format=separated. Empty unless - * DIFY_E2E_REASONING_APP_ID is set or the fixture is auto-provisioned; the - * run-app-reasoning suite is skipped when empty. - */ - reasoningAppId: string /** * Secondary workspace ID — EE only ("auto_test1"). * Empty in CE mode (CE has a single workspace). @@ -127,7 +118,6 @@ export type E2ECapabilities = { workflowAppId: string fileAppId: string fileChatAppId: string - reasoningAppId: string hitlAppId: string hitlExternalAppId: string hitlSingleActionAppId: string @@ -181,7 +171,6 @@ export function loadE2EEnv(): E2EEnv { hitlMultiNodeAppId: process.env.DIFY_E2E_HITL_MULTI_NODE_APP_ID ?? '', fileAppId: process.env.DIFY_E2E_FILE_APP_ID ?? '', fileChatAppId: process.env.DIFY_E2E_FILE_CHAT_APP_ID ?? '', - reasoningAppId: process.env.DIFY_E2E_REASONING_APP_ID ?? '', ws2Id: process.env.DIFY_E2E_WS2_ID ?? '', ws2AppId: process.env.DIFY_E2E_WS2_APP_ID ?? '', email: process.env.DIFY_E2E_EMAIL!, @@ -217,7 +206,6 @@ export function resolveEnv(caps: E2ECapabilities | undefined): E2EEnv { workflowAppId: caps.workflowAppId || env.workflowAppId, fileAppId: caps.fileAppId || env.fileAppId, fileChatAppId: caps.fileChatAppId || env.fileChatAppId, - reasoningAppId: caps.reasoningAppId || env.reasoningAppId, hitlAppId: caps.hitlAppId || env.hitlAppId, hitlExternalAppId: caps.hitlExternalAppId || env.hitlExternalAppId, hitlSingleActionAppId: caps.hitlSingleActionAppId || env.hitlSingleActionAppId, diff --git a/cli/test/e2e/setup/global-setup.ts b/cli/test/e2e/setup/global-setup.ts index 92cc29fc1f2..20b23295acb 100644 --- a/cli/test/e2e/setup/global-setup.ts +++ b/cli/test/e2e/setup/global-setup.ts @@ -182,7 +182,6 @@ export async function setup(project: TestProject): Promise { workflowAppId: '', fileAppId: '', fileChatAppId: '', - reasoningAppId: '', hitlAppId: '', hitlExternalAppId: '', hitlSingleActionAppId: '', @@ -289,7 +288,6 @@ export async function setup(project: TestProject): Promise { workflowAppId: provisionedIds.DIFY_E2E_WORKFLOW_APP_ID || E.workflowAppId, fileAppId: provisionedIds.DIFY_E2E_FILE_APP_ID || E.fileAppId, fileChatAppId: provisionedIds.DIFY_E2E_FILE_CHAT_APP_ID || E.fileChatAppId, - reasoningAppId: provisionedIds.DIFY_E2E_REASONING_APP_ID || E.reasoningAppId, hitlAppId: provisionedIds.DIFY_E2E_HITL_APP_ID || E.hitlAppId, hitlExternalAppId: provisionedIds.DIFY_E2E_HITL_EXTERNAL_APP_ID || E.hitlExternalAppId, hitlSingleActionAppId: provisionedIds.DIFY_E2E_HITL_SINGLE_ACTION_APP_ID || E.hitlSingleActionAppId, @@ -505,12 +503,6 @@ async function provisionApps( ['hitl-single-action.yml', 'DIFY_E2E_HITL_SINGLE_ACTION_APP_ID', primaryWsId], ['hitl-multi-node.yml', 'DIFY_E2E_HITL_MULTI_NODE_APP_ID', primaryWsId], ['file-chat.yml', 'DIFY_E2E_FILE_CHAT_APP_ID', primaryWsId], - // reasoning-chat.yml runs a real LLM node, so it is opt-in: provisioning it - // requires the workspace to have a default chat model configured. Off by - // default to keep the shared bootstrap free of any model dependency. - ...(process.env.DIFY_E2E_REASONING_PROVISION === '1' - ? [['reasoning-chat.yml', 'DIFY_E2E_REASONING_APP_ID', primaryWsId] as [string, string, string]] - : []), ...(edition === 'ee' ? [['ws2-workflow.yml', 'DIFY_E2E_WS2_APP_ID', secondaryWsId] as [string, string, string]] : []), diff --git a/cli/test/fixtures/dify-mock/scenarios.ts b/cli/test/fixtures/dify-mock/scenarios.ts index 2e0b74d3d54..221ccbb6b81 100644 --- a/cli/test/fixtures/dify-mock/scenarios.ts +++ b/cli/test/fixtures/dify-mock/scenarios.ts @@ -15,8 +15,6 @@ export type Scenario | 'server-version-unsupported' | 'run-422-stale' | 'workflow-think' - | 'chat-reasoning' - | 'workflow-reasoning' | 'import-pending' | 'import-failed' diff --git a/cli/test/fixtures/dify-mock/server.ts b/cli/test/fixtures/dify-mock/server.ts index 766963cd0d5..4b119286dbc 100644 --- a/cli/test/fixtures/dify-mock/server.ts +++ b/cli/test/fixtures/dify-mock/server.ts @@ -370,32 +370,6 @@ export function buildApp(getScenario: () => Scenario, state?: MockState): Hono { ]) return new Response(thinkSse, { status: 200, headers: { 'content-type': 'text/event-stream' } }) } - if (scenario === 'chat-reasoning') { - // Separated mode: reasoning streams out-of-band on `reasoning_chunk` (nested - // under `data`), the answer stays free of , and the terminal reasoning - // is persisted into message_end metadata. - const reasoningSse = sseChunks([ - { event: 'reasoning_chunk', data: { data: { message_id: 'msg-1', reasoning: 'secret reasoning', node_id: 'llm-1', is_final: false } } }, - { event: 'reasoning_chunk', data: { data: { message_id: 'msg-1', reasoning: '', node_id: 'llm-1', is_final: true } } }, - { event: 'message', data: { message_id: 'msg-1', conversation_id: 'conv-1', mode: app.mode, answer: 'final answer' } }, - { event: 'message_end', data: { message_id: 'msg-1', conversation_id: 'conv-1', task_id: 'task-1', metadata: { reasoning: { 'llm-1': 'secret reasoning' } } } }, - ]) - return new Response(reasoningSse, { status: 200, headers: { 'content-type': 'text/event-stream' } }) - } - if (scenario === 'workflow-reasoning') { - // Separated mode in a workflow: reasoning streams out-of-band on - // `reasoning_chunk` (no message_id), outputs stay clean, and there is NO - // persisted metadata — the live deltas are the only source. - const wfReasoningSse = sseChunks([ - { event: 'workflow_started', data: { id: 'wf-run-1', workflow_id: 'wf-1' } }, - { event: 'node_started', data: { id: 'llm-1', title: 'LLM' } }, - { event: 'reasoning_chunk', data: { data: { reasoning: 'secret reasoning', node_id: 'llm-1', is_final: false } } }, - { event: 'reasoning_chunk', data: { data: { reasoning: '', node_id: 'llm-1', is_final: true } } }, - { event: 'node_finished', data: { id: 'llm-1', status: 'succeeded' } }, - { event: 'workflow_finished', data: { id: 'wf-run-1', workflow_id: 'wf-1', data: { id: 'wf-run-1', status: 'succeeded', outputs: { result: 'final answer' } } } }, - ]) - return new Response(wfReasoningSse, { status: 200, headers: { 'content-type': 'text/event-stream' } }) - } const sse = streamingRunResponse(app.mode, query, isAgent) return new Response(sse, { status: 200, headers: { 'content-type': 'text/event-stream' } }) }) diff --git a/web/app/components/workflow-app/hooks/__tests__/use-workflow-run-callbacks.spec.ts b/web/app/components/workflow-app/hooks/__tests__/use-workflow-run-callbacks.spec.ts index 000c8fa532f..3756381822e 100644 --- a/web/app/components/workflow-app/hooks/__tests__/use-workflow-run-callbacks.spec.ts +++ b/web/app/components/workflow-app/hooks/__tests__/use-workflow-run-callbacks.spec.ts @@ -40,7 +40,6 @@ const createHandlers = () => ({ handleWorkflowAgentLog: vi.fn(), handleWorkflowTextChunk: vi.fn(), handleWorkflowTextReplace: vi.fn(), - handleWorkflowReasoning: vi.fn(), handleWorkflowPaused: vi.fn(), }) @@ -253,7 +252,6 @@ describe('useWorkflowRun callbacks helpers', () => { callbacks.onAgentLog?.({ node_id: 'node-1' } as never) callbacks.onTextChunk?.({ data: 'chunk' } as never) callbacks.onTextReplace?.({ text: 'replacement' } as never) - callbacks.onReasoning?.({ data: { reasoning: 'thinking', node_id: 'node-1' } } as never) callbacks.onHumanInputRequired?.({ node_id: 'node-1' } as never) callbacks.onHumanInputFormFilled?.({ node_id: 'node-1' } as never) callbacks.onHumanInputFormTimeout?.({ node_id: 'node-1' } as never) @@ -297,7 +295,6 @@ describe('useWorkflowRun callbacks helpers', () => { expect(userCallbacks.onAgentLog).toHaveBeenCalled() expect(handlers.handleWorkflowTextChunk).toHaveBeenCalled() expect(handlers.handleWorkflowTextReplace).toHaveBeenCalled() - expect(handlers.handleWorkflowReasoning).toHaveBeenCalled() expect(handlers.handleWorkflowNodeHumanInputRequired).toHaveBeenCalled() expect(userCallbacks.onHumanInputRequired).toHaveBeenCalled() expect(handlers.handleWorkflowNodeHumanInputFormFilled).toHaveBeenCalled() @@ -426,7 +423,6 @@ describe('useWorkflowRun callbacks helpers', () => { finalCallbacks.onAgentLog?.({ node_id: 'node-1' } as never) finalCallbacks.onTextChunk?.({ data: 'chunk' } as never) finalCallbacks.onTextReplace?.({ text: 'replacement' } as never) - finalCallbacks.onReasoning?.({ data: { reasoning: 'thinking', node_id: 'node-1' } } as never) finalCallbacks.onHumanInputRequired?.({ node_id: 'node-1' } as never) finalCallbacks.onHumanInputFormFilled?.({ node_id: 'node-1' } as never) finalCallbacks.onHumanInputFormTimeout?.({ node_id: 'node-1' } as never) @@ -465,7 +461,6 @@ describe('useWorkflowRun callbacks helpers', () => { expect(userCallbacks.onAgentLog).toHaveBeenCalled() expect(handlers.handleWorkflowTextChunk).toHaveBeenCalled() expect(handlers.handleWorkflowTextReplace).toHaveBeenCalled() - expect(handlers.handleWorkflowReasoning).toHaveBeenCalled() expect(handlers.handleWorkflowNodeHumanInputRequired).toHaveBeenCalled() expect(userCallbacks.onHumanInputRequired).toHaveBeenCalled() expect(handlers.handleWorkflowNodeHumanInputFormFilled).toHaveBeenCalled() diff --git a/web/app/components/workflow-app/hooks/use-workflow-run-callbacks.ts b/web/app/components/workflow-app/hooks/use-workflow-run-callbacks.ts index 3b1c30b385c..e8820f755c5 100644 --- a/web/app/components/workflow-app/hooks/use-workflow-run-callbacks.ts +++ b/web/app/components/workflow-app/hooks/use-workflow-run-callbacks.ts @@ -27,7 +27,6 @@ type WorkflowRunEventHandlers = { handleWorkflowAgentLog: NonNullable handleWorkflowTextChunk: NonNullable handleWorkflowTextReplace: NonNullable - handleWorkflowReasoning: NonNullable handleWorkflowPaused: () => void } @@ -115,7 +114,6 @@ export const createBaseWorkflowRunCallbacks = ({ handleWorkflowAgentLog, handleWorkflowTextChunk, handleWorkflowTextReplace, - handleWorkflowReasoning, handleWorkflowPaused, } = handlers const { @@ -246,9 +244,6 @@ export const createBaseWorkflowRunCallbacks = ({ onTextReplace: (params) => { handleWorkflowTextReplace(params) }, - onReasoning: (params) => { - handleWorkflowReasoning(params) - }, onTTSChunk: (messageId: string, audio: string) => { if (!audio || audio === '') return @@ -330,7 +325,6 @@ export const createFinalWorkflowRunCallbacks = ({ handleWorkflowAgentLog, handleWorkflowTextChunk, handleWorkflowTextReplace, - handleWorkflowReasoning, handleWorkflowPaused, } = handlers const { @@ -445,9 +439,6 @@ export const createFinalWorkflowRunCallbacks = ({ onTextReplace: (params) => { handleWorkflowTextReplace(params) }, - onReasoning: (params) => { - handleWorkflowReasoning(params) - }, onTTSChunk: (messageId: string, audio: string) => { if (!audio || audio === '') return diff --git a/web/app/components/workflow-app/hooks/use-workflow-run-utils.ts b/web/app/components/workflow-app/hooks/use-workflow-run-utils.ts index 80a09f1c67c..eae758cf2f8 100644 --- a/web/app/components/workflow-app/hooks/use-workflow-run-utils.ts +++ b/web/app/components/workflow-app/hooks/use-workflow-run-utils.ts @@ -78,8 +78,6 @@ export const createRunningWorkflowState = () => { }, tracing: [], resultText: '', - reasoningContent: {}, - reasoningFinished: false, } } diff --git a/web/app/components/workflow-app/hooks/use-workflow-run.ts b/web/app/components/workflow-app/hooks/use-workflow-run.ts index 7254c646ee9..d7a90e36605 100644 --- a/web/app/components/workflow-app/hooks/use-workflow-run.ts +++ b/web/app/components/workflow-app/hooks/use-workflow-run.ts @@ -138,7 +138,6 @@ const useWorkflowRunBase = (doSyncWorkflowDraft: DoSyncWorkflowDraft) => { handleWorkflowAgentLog, handleWorkflowTextChunk, handleWorkflowTextReplace, - handleWorkflowReasoning, handleWorkflowPaused, } = useWorkflowRunEvent() @@ -327,7 +326,6 @@ const useWorkflowRunBase = (doSyncWorkflowDraft: DoSyncWorkflowDraft) => { handleWorkflowAgentLog, handleWorkflowTextChunk, handleWorkflowTextReplace, - handleWorkflowReasoning, handleWorkflowPaused, } const userCallbacks = { @@ -445,7 +443,7 @@ const useWorkflowRunBase = (doSyncWorkflowDraft: DoSyncWorkflowDraft) => { }, finalCallbacks, ) - }, [store, doSyncWorkflowDraft, workflowStore, pathname, handleWorkflowFailed, flowId, handleWorkflowStarted, handleWorkflowFinished, fetchInspectVars, invalidAllLastRun, invalidateRunHistory, handleWorkflowNodeStarted, handleWorkflowNodeFinished, handleWorkflowNodeIterationStarted, handleWorkflowNodeIterationNext, handleWorkflowNodeIterationFinished, handleWorkflowNodeLoopStarted, handleWorkflowNodeLoopNext, handleWorkflowNodeLoopFinished, handleWorkflowNodeRetry, handleWorkflowAgentLog, handleWorkflowTextChunk, handleWorkflowTextReplace, handleWorkflowReasoning, handleWorkflowPaused, handleWorkflowNodeHumanInputRequired, handleWorkflowNodeHumanInputFormFilled, handleWorkflowNodeHumanInputFormTimeout]) + }, [store, doSyncWorkflowDraft, workflowStore, pathname, handleWorkflowFailed, flowId, handleWorkflowStarted, handleWorkflowFinished, fetchInspectVars, invalidAllLastRun, invalidateRunHistory, handleWorkflowNodeStarted, handleWorkflowNodeFinished, handleWorkflowNodeIterationStarted, handleWorkflowNodeIterationNext, handleWorkflowNodeIterationFinished, handleWorkflowNodeLoopStarted, handleWorkflowNodeLoopNext, handleWorkflowNodeLoopFinished, handleWorkflowNodeRetry, handleWorkflowAgentLog, handleWorkflowTextChunk, handleWorkflowTextReplace, handleWorkflowPaused, handleWorkflowNodeHumanInputRequired, handleWorkflowNodeHumanInputFormFilled, handleWorkflowNodeHumanInputFormTimeout]) const handleStopRun = useCallback((taskId: string) => { const setStoppedState = () => { diff --git a/web/app/components/workflow/hooks/use-workflow-run-event/__tests__/use-workflow-run-event.spec.ts b/web/app/components/workflow/hooks/use-workflow-run-event/__tests__/use-workflow-run-event.spec.ts index 6010c791702..fb8ea51638a 100644 --- a/web/app/components/workflow/hooks/use-workflow-run-event/__tests__/use-workflow-run-event.spec.ts +++ b/web/app/components/workflow/hooks/use-workflow-run-event/__tests__/use-workflow-run-event.spec.ts @@ -16,7 +16,6 @@ const handlers = vi.hoisted(() => ({ handleWorkflowNodeRetry: vi.fn(), handleWorkflowTextChunk: vi.fn(), handleWorkflowTextReplace: vi.fn(), - handleWorkflowReasoning: vi.fn(), handleWorkflowAgentLog: vi.fn(), handleWorkflowPaused: vi.fn(), handleWorkflowNodeHumanInputRequired: vi.fn(), @@ -46,10 +45,6 @@ vi.mock('..', () => ({ useWorkflowNodeHumanInputFormTimeout: () => ({ handleWorkflowNodeHumanInputFormTimeout: handlers.handleWorkflowNodeHumanInputFormTimeout }), })) -vi.mock('../use-workflow-reasoning', () => ({ - useWorkflowReasoning: () => ({ handleWorkflowReasoning: handlers.handleWorkflowReasoning }), -})) - describe('useWorkflowRunEvent', () => { it('returns the composed handlers from all workflow event hooks', () => { const { result } = renderHook(() => useWorkflowRunEvent()) diff --git a/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-run-event.ts b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-run-event.ts index 2366fdd9684..bf8fd319a2d 100644 --- a/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-run-event.ts +++ b/web/app/components/workflow/hooks/use-workflow-run-event/use-workflow-run-event.ts @@ -19,7 +19,6 @@ import { useWorkflowTextChunk, useWorkflowTextReplace, } from '.' -import { useWorkflowReasoning } from './use-workflow-reasoning' export const useWorkflowRunEvent = () => { const { handleWorkflowStarted } = useWorkflowStarted() @@ -36,7 +35,6 @@ export const useWorkflowRunEvent = () => { const { handleWorkflowNodeRetry } = useWorkflowNodeRetry() const { handleWorkflowTextChunk } = useWorkflowTextChunk() const { handleWorkflowTextReplace } = useWorkflowTextReplace() - const { handleWorkflowReasoning } = useWorkflowReasoning() const { handleWorkflowAgentLog } = useWorkflowAgentLog() const { handleWorkflowPaused } = useWorkflowPaused() const { handleWorkflowNodeHumanInputRequired } = useWorkflowNodeHumanInputRequired() @@ -58,7 +56,6 @@ export const useWorkflowRunEvent = () => { handleWorkflowNodeRetry, handleWorkflowTextChunk, handleWorkflowTextReplace, - handleWorkflowReasoning, handleWorkflowAgentLog, handleWorkflowPaused, handleWorkflowNodeHumanInputFormFilled, diff --git a/web/app/components/workflow/panel/__tests__/workflow-preview.spec.tsx b/web/app/components/workflow/panel/__tests__/workflow-preview.spec.tsx index 93d51acd78d..a3d629d1708 100644 --- a/web/app/components/workflow/panel/__tests__/workflow-preview.spec.tsx +++ b/web/app/components/workflow/panel/__tests__/workflow-preview.spec.tsx @@ -71,12 +71,6 @@ vi.mock('@/app/components/workflow/run/tracing-panel', () => ({ default: ({ list }: { list: unknown[] }) =>
{list.length}
, })) -vi.mock('@/app/components/base/chat/chat/answer/reasoning-panel', () => ({ - default: ({ content, done }: { content: Record, done: boolean }) => ( -
{Object.keys(content).join(',')}
- ), -})) - vi.mock('@/app/components/workflow/panel/inputs-panel', () => ({ default: ({ onRun }: { onRun: () => void }) => (