mirror of
https://github.com/langgenius/dify.git
synced 2026-06-23 20:41:17 +08:00
fix(cli): apply --think filtering to workflow app outputs (#37736)
This commit is contained in:
parent
8f6b57fe24
commit
7aa20d6d94
@ -1,14 +1,14 @@
|
||||
import type { RunContext, RunStrategy } from './index'
|
||||
import type { SseEvent } from '@/http/sse'
|
||||
import { buildRunBody } from '@/api/app-run'
|
||||
import { CHAT_MODES, chatConversationHint, newAppRunObject } from '@/commands/run/app/handlers'
|
||||
import { CHAT_MODES, chatConversationHint, newAppRunObject, RUN_MODES } from '@/commands/run/app/handlers'
|
||||
import { renderHitlHint, renderHitlOutput } from '@/commands/run/app/hitl-render'
|
||||
import { collect, HitlPauseError } from '@/commands/run/app/sse-collector'
|
||||
import { formatted, stringifyOutput } from '@/framework/output'
|
||||
import { handle, unhandle } from '@/sys/index'
|
||||
import { colorEnabled, colorScheme } from '@/sys/io/color'
|
||||
import { startSpinner } from '@/sys/io/spinner'
|
||||
import { extractThinkBlocks, stripThinkBlocks } from '@/sys/io/think-filter'
|
||||
import { extractThinkBlocks, filterThinkInOutputs, stripThinkBlocks } from '@/sys/io/think-filter'
|
||||
|
||||
async function* captureTaskId(
|
||||
iter: AsyncIterable<SseEvent>,
|
||||
@ -86,6 +86,18 @@ export class StreamingStructuredStrategy implements RunStrategy {
|
||||
processedResp = { ...processedResp, answer: stripThinkBlocks(processedResp.answer) }
|
||||
}
|
||||
}
|
||||
else if (mode === RUN_MODES.Workflow) {
|
||||
const data = processedResp.data
|
||||
if (data !== null && typeof data === 'object' && 'outputs' in data) {
|
||||
const raw = (data as { outputs: unknown }).outputs
|
||||
if (raw !== null && typeof raw === 'object' && !Array.isArray(raw)) {
|
||||
const { outputs, thinking } = filterThinkInOutputs(raw as Record<string, unknown>, ctx.think)
|
||||
if (ctx.think && thinking !== '')
|
||||
deps.io.err.write(`${thinking}\n`)
|
||||
processedResp = { ...processedResp, data: { ...(data as Record<string, unknown>), outputs } }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const respMode = typeof processedResp.mode === 'string' && processedResp.mode !== '' ? processedResp.mode : mode
|
||||
deps.io.out.write(stringifyOutput(formatted({ format, data: newAppRunObject(respMode, processedResp) })))
|
||||
|
||||
@ -165,6 +165,43 @@ describe('runApp', () => {
|
||||
expect(parsed.data.status).toBe('succeeded')
|
||||
})
|
||||
|
||||
it('workflow: strips <think> from outputs by default', async () => {
|
||||
mock.setScenario('workflow-think')
|
||||
const io = bufferStreams()
|
||||
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
await runApp(
|
||||
{ appId: 'app-2', inputs: { x: '1' } },
|
||||
{ active: active(), http: testHttpClient(mock.url, 'dfoa_test'), host: mock.url, io, cache },
|
||||
)
|
||||
expect(io.outBuf()).toBe('final answer\n')
|
||||
expect(io.errBuf()).not.toContain('secret reasoning')
|
||||
})
|
||||
|
||||
it('workflow --think: routes <think> to stderr, clean stdout', async () => {
|
||||
mock.setScenario('workflow-think')
|
||||
const io = bufferStreams()
|
||||
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
await runApp(
|
||||
{ appId: 'app-2', inputs: { x: '1' }, think: true },
|
||||
{ active: active(), http: testHttpClient(mock.url, 'dfoa_test'), host: mock.url, io, cache },
|
||||
)
|
||||
expect(io.outBuf()).toBe('final answer\n')
|
||||
expect(io.errBuf()).toContain('secret reasoning')
|
||||
})
|
||||
|
||||
it('--stream workflow -o json --think: strips outputs and routes thinking to stderr', async () => {
|
||||
mock.setScenario('workflow-think')
|
||||
const io = bufferStreams()
|
||||
const cache = await loadAppInfoCache({ store: getCache(CACHE_APP_INFO) })
|
||||
await runApp(
|
||||
{ appId: 'app-2', inputs: { x: '1' }, stream: true, format: 'json', think: true },
|
||||
{ active: active(), http: testHttpClient(mock.url, 'dfoa_test'), host: mock.url, io, cache },
|
||||
)
|
||||
const parsed = JSON.parse(io.outBuf()) as { data: { outputs: { result: string } } }
|
||||
expect(parsed.data.outputs.result).toBe('final answer')
|
||||
expect(io.errBuf()).toContain('secret reasoning')
|
||||
})
|
||||
|
||||
it('stream-error scenario: error event surfaces typed BaseError', async () => {
|
||||
mock.setScenario('stream-error')
|
||||
const io = bufferStreams()
|
||||
|
||||
@ -74,6 +74,37 @@ describe('streamPrinterFor — workflow', () => {
|
||||
})
|
||||
})
|
||||
|
||||
describe('streamPrinterFor — workflow think filtering', () => {
|
||||
it('think: false (default) strips <think> from string outputs, nothing to stderr', () => {
|
||||
const sp = streamPrinterFor('workflow')
|
||||
const cap = captures()
|
||||
sp.onEvent(cap.out, cap.err, ev('workflow_finished', { data: { outputs: { text: '<think>hidden</think>\nresult' } } }))
|
||||
sp.onEnd(cap.out, cap.err)
|
||||
const parsed = JSON.parse(cap.outBuf().trim()) as { text: string }
|
||||
expect(parsed.text).toBe('result')
|
||||
expect(cap.errBuf()).toBe('')
|
||||
})
|
||||
|
||||
it('think: true strips <think> from string outputs and routes thinking to stderr', () => {
|
||||
const sp = streamPrinterFor('workflow', true)
|
||||
const cap = captures()
|
||||
sp.onEvent(cap.out, cap.err, ev('workflow_finished', { data: { outputs: { text: '<think>reasoning</think>\nresult' } } }))
|
||||
sp.onEnd(cap.out, cap.err)
|
||||
const parsed = JSON.parse(cap.outBuf().trim()) as { text: string }
|
||||
expect(parsed.text).toBe('result')
|
||||
expect(cap.errBuf()).toContain('<think>')
|
||||
expect(cap.errBuf()).toContain('reasoning')
|
||||
})
|
||||
|
||||
it('array outputs pass through unchanged (not reshaped into an object)', () => {
|
||||
const sp = streamPrinterFor('workflow', true)
|
||||
const cap = captures()
|
||||
sp.onEvent(cap.out, cap.err, ev('workflow_finished', { data: { outputs: ['a', 'b'] } }))
|
||||
sp.onEnd(cap.out, cap.err)
|
||||
expect(cap.outBuf().trim()).toBe('["a","b"]')
|
||||
})
|
||||
})
|
||||
|
||||
describe('streamPrinterFor — unknown mode', () => {
|
||||
it('throws', () => {
|
||||
expect(() => streamPrinterFor('whatever')).toThrow()
|
||||
|
||||
@ -4,7 +4,7 @@ import type { SseEvent } from '@/http/sse'
|
||||
import { newError } from '@/errors/base'
|
||||
import { ErrorCode } from '@/errors/codes'
|
||||
import { colorEnabled, colorScheme } from '@/sys/io/color'
|
||||
import { ThinkChunkFilter } from '@/sys/io/think-filter'
|
||||
import { filterThinkInOutputs, ThinkChunkFilter } from '@/sys/io/think-filter'
|
||||
import { RUN_MODES } from './handlers'
|
||||
import { HitlPauseError } from './sse-collector'
|
||||
|
||||
@ -106,6 +106,11 @@ class CompletionStreamPrinter implements StreamPrinter {
|
||||
|
||||
class WorkflowStreamPrinter implements StreamPrinter {
|
||||
private final: Record<string, unknown> | undefined
|
||||
private readonly think: boolean
|
||||
constructor(think: boolean) {
|
||||
this.think = think
|
||||
}
|
||||
|
||||
onEvent(_out: NodeJS.WritableStream, errOut: NodeJS.WritableStream, ev: SseEvent): void {
|
||||
if (handleCommonEvents(ev))
|
||||
return
|
||||
@ -132,12 +137,20 @@ class WorkflowStreamPrinter implements StreamPrinter {
|
||||
}
|
||||
}
|
||||
|
||||
onEnd(out: NodeJS.WritableStream): void {
|
||||
onEnd(out: NodeJS.WritableStream, errOut: NodeJS.WritableStream): void {
|
||||
if (this.final === undefined)
|
||||
return
|
||||
const data = this.final.data
|
||||
if (data !== null && typeof data === 'object' && 'outputs' in data) {
|
||||
out.write(`${JSON.stringify((data as { outputs: unknown }).outputs)}\n`)
|
||||
const raw = (data as { outputs: unknown }).outputs
|
||||
if (raw !== null && typeof raw === 'object' && !Array.isArray(raw)) {
|
||||
const { outputs, thinking } = filterThinkInOutputs(raw as Record<string, unknown>, this.think)
|
||||
if (this.think && thinking !== '')
|
||||
errOut.write(`${thinking}\n`)
|
||||
out.write(`${JSON.stringify(outputs)}\n`)
|
||||
return
|
||||
}
|
||||
out.write(`${JSON.stringify(raw)}\n`)
|
||||
return
|
||||
}
|
||||
out.write(`${JSON.stringify(this.final)}\n`)
|
||||
@ -149,7 +162,7 @@ const FACTORIES: Record<string, (think: boolean, isTTY: boolean) => StreamPrinte
|
||||
[RUN_MODES.AdvancedChat]: (think, isTTY) => new ChatStreamPrinter(think, isTTY),
|
||||
[RUN_MODES.AgentChat]: (think, isTTY) => new ChatStreamPrinter(think, isTTY),
|
||||
[RUN_MODES.Completion]: (think, _isTTY) => new CompletionStreamPrinter(think),
|
||||
[RUN_MODES.Workflow]: (_think, _isTTY) => new WorkflowStreamPrinter(),
|
||||
[RUN_MODES.Workflow]: (think, _isTTY) => new WorkflowStreamPrinter(think),
|
||||
}
|
||||
|
||||
export function streamPrinterFor(mode: string, think = false, isTTY = false): StreamPrinter {
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
import { Buffer } from 'node:buffer'
|
||||
import { PassThrough } from 'node:stream'
|
||||
import { describe, expect, it } from 'vitest'
|
||||
import { extractThinkBlocks, stripThinkBlocks, ThinkChunkFilter } from './think-filter'
|
||||
import { extractThinkBlocks, filterThinkInOutputs, stripThinkBlocks, ThinkChunkFilter } from './think-filter'
|
||||
|
||||
function captures() {
|
||||
const out = new PassThrough()
|
||||
@ -63,6 +63,50 @@ describe('extractThinkBlocks', () => {
|
||||
})
|
||||
})
|
||||
|
||||
// --- workflow outputs helper ---
|
||||
|
||||
describe('filterThinkInOutputs', () => {
|
||||
it('no think block — outputs unchanged, thinking empty', () => {
|
||||
const r = filterThinkInOutputs({ text: 'hello' }, true)
|
||||
expect(r.outputs).toEqual({ text: 'hello' })
|
||||
expect(r.thinking).toBe('')
|
||||
})
|
||||
|
||||
it('showThink: false — strips from string field, thinking empty', () => {
|
||||
const r = filterThinkInOutputs({ text: '<think>reasoning</think>\nanswer' }, false)
|
||||
expect(r.outputs).toEqual({ text: 'answer' })
|
||||
expect(r.thinking).toBe('')
|
||||
})
|
||||
|
||||
it('showThink: true — strips from string field, captures thinking', () => {
|
||||
const r = filterThinkInOutputs({ text: '<think>step 1</think>\nfinal' }, true)
|
||||
expect(r.outputs).toEqual({ text: 'final' })
|
||||
expect(r.thinking).toBe('<think>\nstep 1\n</think>')
|
||||
})
|
||||
|
||||
it('multiple string fields — thinking joined with separator', () => {
|
||||
const r = filterThinkInOutputs(
|
||||
{ a: '<think>x</think>\nfoo', b: '<think>y</think>\nbar' },
|
||||
true,
|
||||
)
|
||||
expect(r.outputs).toEqual({ a: 'foo', b: 'bar' })
|
||||
expect(r.thinking).toBe('<think>\nx\n</think>\n---\n<think>\ny\n</think>')
|
||||
})
|
||||
|
||||
it('non-string values pass through untouched', () => {
|
||||
const outputs = { n: 42, flag: true, nested: { k: '<think>v</think>\nx' }, arr: ['a'], nil: null }
|
||||
const r = filterThinkInOutputs(outputs, true)
|
||||
expect(r.outputs).toEqual(outputs)
|
||||
expect(r.thinking).toBe('')
|
||||
})
|
||||
|
||||
it('empty outputs — empty result', () => {
|
||||
const r = filterThinkInOutputs({}, true)
|
||||
expect(r.outputs).toEqual({})
|
||||
expect(r.thinking).toBe('')
|
||||
})
|
||||
})
|
||||
|
||||
// --- streaming chunk filter ---
|
||||
|
||||
describe('ThinkChunkFilter — showThink: false (strip)', () => {
|
||||
|
||||
@ -14,6 +14,28 @@ export function extractThinkBlocks(s: string): { clean: string, thinking: string
|
||||
return { clean, thinking: parts.join('\n---\n') }
|
||||
}
|
||||
|
||||
// Workflow outputs carry their answer text in top-level string fields rather than
|
||||
// a single `answer`, so think filtering navigates the outputs object. Nested
|
||||
// strings (inside arrays/objects) are left untouched.
|
||||
export function filterThinkInOutputs(
|
||||
outputs: Record<string, unknown>,
|
||||
showThink: boolean,
|
||||
): { outputs: Record<string, unknown>, thinking: string } {
|
||||
const thoughts: string[] = []
|
||||
const clean: Record<string, unknown> = {}
|
||||
for (const [key, value] of Object.entries(outputs)) {
|
||||
if (typeof value !== 'string') {
|
||||
clean[key] = value
|
||||
continue
|
||||
}
|
||||
const extracted = extractThinkBlocks(value)
|
||||
clean[key] = extracted.clean
|
||||
if (showThink && extracted.thinking !== '')
|
||||
thoughts.push(extracted.thinking)
|
||||
}
|
||||
return { outputs: clean, thinking: thoughts.join('\n---\n') }
|
||||
}
|
||||
|
||||
function splitAtPotentialTag(s: string, tag: string): [string, string] {
|
||||
const maxHold = tag.length - 1
|
||||
for (let len = Math.min(maxHold, s.length); len > 0; len--) {
|
||||
|
||||
1
cli/test/fixtures/dify-mock/scenarios.ts
vendored
1
cli/test/fixtures/dify-mock/scenarios.ts
vendored
@ -14,6 +14,7 @@ export type Scenario
|
||||
| 'server-version-empty'
|
||||
| 'server-version-unsupported'
|
||||
| 'run-422-stale'
|
||||
| 'workflow-think'
|
||||
| 'import-pending'
|
||||
| 'import-failed'
|
||||
|
||||
|
||||
7
cli/test/fixtures/dify-mock/server.ts
vendored
7
cli/test/fixtures/dify-mock/server.ts
vendored
@ -337,6 +337,13 @@ export function buildApp(getScenario: () => Scenario, state?: MockState): Hono {
|
||||
if (scenario === 'hitl-pause') {
|
||||
return new Response(hitlPauseResponse(), { status: 200, headers: { 'content-type': 'text/event-stream' } })
|
||||
}
|
||||
if (scenario === 'workflow-think') {
|
||||
const thinkSse = sseChunks([
|
||||
{ event: 'workflow_started', data: { id: 'wf-run-1', workflow_id: 'wf-1' } },
|
||||
{ event: 'workflow_finished', data: { id: 'wf-run-1', workflow_id: 'wf-1', data: { id: 'wf-run-1', status: 'succeeded', outputs: { result: '<think>secret reasoning</think>\nfinal answer' } } } },
|
||||
])
|
||||
return new Response(thinkSse, { status: 200, headers: { 'content-type': 'text/event-stream' } })
|
||||
}
|
||||
const sse = streamingRunResponse(app.mode, query, isAgent)
|
||||
return new Response(sse, { status: 200, headers: { 'content-type': 'text/event-stream' } })
|
||||
})
|
||||
|
||||
Loading…
Reference in New Issue
Block a user