rag pipeline

This commit is contained in:
zxhlyh 2025-05-07 16:30:24 +08:00
parent 3f52f491d7
commit fa8ab4ea04
10 changed files with 519 additions and 5 deletions

View File

@ -6,6 +6,10 @@ import type { WorkflowProps } from '@/app/components/workflow'
import RagPipelineChildren from './rag-pipeline-children'
import {
useAvailableNodesMetaData,
useNodesSyncDraft,
useWorkflowRefreshDraft,
useWorkflowRun,
useWorkflowStartRun,
} from '../hooks'
type RagPipelineMainProps = Pick<WorkflowProps, 'nodes' | 'edges' | 'viewport'>
@ -14,14 +18,50 @@ const RagPipelineMain = ({
edges,
viewport,
}: RagPipelineMainProps) => {
const {
doSyncWorkflowDraft,
syncWorkflowDraftWhenPageClose,
} = useNodesSyncDraft()
const { handleRefreshWorkflowDraft } = useWorkflowRefreshDraft()
const {
handleBackupDraft,
handleLoadBackupDraft,
handleRestoreFromPublishedWorkflow,
handleRun,
handleStopRun,
} = useWorkflowRun()
const {
handleStartWorkflowRun,
handleWorkflowStartRunInWorkflow,
} = useWorkflowStartRun()
const availableNodesMetaData = useAvailableNodesMetaData()
const hooksStore = useMemo(() => {
return {
availableNodesMetaData,
syncWorkflowDraftWhenPageClose,
doSyncWorkflowDraft,
handleRefreshWorkflowDraft,
handleBackupDraft,
handleLoadBackupDraft,
handleRestoreFromPublishedWorkflow,
handleRun,
handleStopRun,
handleStartWorkflowRun,
handleWorkflowStartRunInWorkflow,
}
}, [
availableNodesMetaData,
syncWorkflowDraftWhenPageClose,
doSyncWorkflowDraft,
handleRefreshWorkflowDraft,
handleBackupDraft,
handleLoadBackupDraft,
handleRestoreFromPublishedWorkflow,
handleRun,
handleStopRun,
handleStartWorkflowRun,
handleWorkflowStartRunInWorkflow,
])
return (

View File

@ -1 +1,5 @@
export * from './use-available-nodes-meta-data'
export * from './use-workflow-refresh-draft'
export * from './use-nodes-sync-draft'
export * from './use-workflow-run'
export * from './use-workflow-start-run'

View File

@ -0,0 +1,84 @@
import { useCallback } from 'react'
import produce from 'immer'
import { useStoreApi } from 'reactflow'
import {
useWorkflowStore,
} from '@/app/components/workflow/store'
import {
useNodesReadOnly,
} from '@/app/components/workflow/hooks/use-workflow'
export const useNodesSyncDraft = () => {
const store = useStoreApi()
const workflowStore = useWorkflowStore()
const { getNodesReadOnly } = useNodesReadOnly()
const getPostParams = useCallback(() => {
const {
getNodes,
edges,
transform,
} = store.getState()
const [x, y, zoom] = transform
const {
pipelineId,
environmentVariables,
syncWorkflowDraftHash,
} = workflowStore.getState()
if (pipelineId) {
const nodes = getNodes()
const producedNodes = produce(nodes, (draft) => {
draft.forEach((node) => {
Object.keys(node.data).forEach((key) => {
if (key.startsWith('_'))
delete node.data[key]
})
})
})
const producedEdges = produce(edges, (draft) => {
draft.forEach((edge) => {
Object.keys(edge.data).forEach((key) => {
if (key.startsWith('_'))
delete edge.data[key]
})
})
})
return {
url: `/datasets/${pipelineId}/workflows/draft`,
params: {
graph: {
nodes: producedNodes,
edges: producedEdges,
viewport: {
x,
y,
zoom,
},
},
environment_variables: environmentVariables,
hash: syncWorkflowDraftHash,
},
}
}
}, [store, workflowStore])
const syncWorkflowDraftWhenPageClose = useCallback(() => {
return true
}, [])
const doSyncWorkflowDraft = useCallback(async () => {
if (getNodesReadOnly())
return
const postParams = getPostParams()
if (postParams)
return true
}, [getPostParams, getNodesReadOnly])
return {
doSyncWorkflowDraft,
syncWorkflowDraftWhenPageClose,
}
}

View File

@ -0,0 +1,11 @@
import { useCallback } from 'react'
export const useWorkflowRefreshDraft = () => {
const handleRefreshWorkflowDraft = useCallback(() => {
return true
}, [])
return {
handleRefreshWorkflowDraft,
}
}

View File

@ -0,0 +1,305 @@
import { useCallback } from 'react'
import {
useReactFlow,
useStoreApi,
} from 'reactflow'
import produce from 'immer'
import { useWorkflowStore } from '@/app/components/workflow/store'
import { WorkflowRunningStatus } from '@/app/components/workflow/types'
import { useWorkflowUpdate } from '@/app/components/workflow/hooks/use-workflow-interactions'
import { useWorkflowRunEvent } from '@/app/components/workflow/hooks/use-workflow-run-event/use-workflow-run-event'
import type { IOtherOptions } from '@/service/base'
import { ssePost } from '@/service/base'
import { stopWorkflowRun } from '@/service/workflow'
import type { VersionHistory } from '@/types/workflow'
import { useNodesSyncDraft } from './use-nodes-sync-draft'
export const useWorkflowRun = () => {
const store = useStoreApi()
const workflowStore = useWorkflowStore()
const reactflow = useReactFlow()
const { doSyncWorkflowDraft } = useNodesSyncDraft()
const { handleUpdateWorkflowCanvas } = useWorkflowUpdate()
const {
handleWorkflowStarted,
handleWorkflowFinished,
handleWorkflowFailed,
handleWorkflowNodeStarted,
handleWorkflowNodeFinished,
handleWorkflowNodeIterationStarted,
handleWorkflowNodeIterationNext,
handleWorkflowNodeIterationFinished,
handleWorkflowNodeLoopStarted,
handleWorkflowNodeLoopNext,
handleWorkflowNodeLoopFinished,
handleWorkflowNodeRetry,
handleWorkflowAgentLog,
handleWorkflowTextChunk,
handleWorkflowTextReplace,
} = useWorkflowRunEvent()
const handleBackupDraft = useCallback(() => {
const {
getNodes,
edges,
} = store.getState()
const { getViewport } = reactflow
const {
backupDraft,
setBackupDraft,
environmentVariables,
} = workflowStore.getState()
if (!backupDraft) {
setBackupDraft({
nodes: getNodes(),
edges,
viewport: getViewport(),
environmentVariables,
})
doSyncWorkflowDraft()
}
}, [reactflow, workflowStore, store, doSyncWorkflowDraft])
const handleLoadBackupDraft = useCallback(() => {
const {
backupDraft,
setBackupDraft,
setEnvironmentVariables,
} = workflowStore.getState()
if (backupDraft) {
const {
nodes,
edges,
viewport,
environmentVariables,
} = backupDraft
handleUpdateWorkflowCanvas({
nodes,
edges,
viewport,
})
setEnvironmentVariables(environmentVariables)
setBackupDraft(undefined)
}
}, [handleUpdateWorkflowCanvas, workflowStore])
const handleRun = useCallback(async (
params: any,
callback?: IOtherOptions,
) => {
const {
getNodes,
setNodes,
} = store.getState()
const newNodes = produce(getNodes(), (draft) => {
draft.forEach((node) => {
node.data.selected = false
node.data._runningStatus = undefined
})
})
setNodes(newNodes)
await doSyncWorkflowDraft()
const {
onWorkflowStarted,
onWorkflowFinished,
onNodeStarted,
onNodeFinished,
onIterationStart,
onIterationNext,
onIterationFinish,
onLoopStart,
onLoopNext,
onLoopFinish,
onNodeRetry,
onAgentLog,
onError,
...restCallback
} = callback || {}
const { pipelineId } = workflowStore.getState()
workflowStore.setState({ historyWorkflowData: undefined })
const workflowContainer = document.getElementById('workflow-container')
const {
clientWidth,
clientHeight,
} = workflowContainer!
const url = `/rag/pipeline/${pipelineId}/workflows/draft/run`
const {
setWorkflowRunningData,
} = workflowStore.getState()
setWorkflowRunningData({
result: {
status: WorkflowRunningStatus.Running,
},
tracing: [],
resultText: '',
})
return true
ssePost(
url,
{
body: params,
},
{
onWorkflowStarted: (params) => {
handleWorkflowStarted(params)
if (onWorkflowStarted)
onWorkflowStarted(params)
},
onWorkflowFinished: (params) => {
handleWorkflowFinished(params)
if (onWorkflowFinished)
onWorkflowFinished(params)
},
onError: (params) => {
handleWorkflowFailed()
if (onError)
onError(params)
},
onNodeStarted: (params) => {
handleWorkflowNodeStarted(
params,
{
clientWidth,
clientHeight,
},
)
if (onNodeStarted)
onNodeStarted(params)
},
onNodeFinished: (params) => {
handleWorkflowNodeFinished(params)
if (onNodeFinished)
onNodeFinished(params)
},
onIterationStart: (params) => {
handleWorkflowNodeIterationStarted(
params,
{
clientWidth,
clientHeight,
},
)
if (onIterationStart)
onIterationStart(params)
},
onIterationNext: (params) => {
handleWorkflowNodeIterationNext(params)
if (onIterationNext)
onIterationNext(params)
},
onIterationFinish: (params) => {
handleWorkflowNodeIterationFinished(params)
if (onIterationFinish)
onIterationFinish(params)
},
onLoopStart: (params) => {
handleWorkflowNodeLoopStarted(
params,
{
clientWidth,
clientHeight,
},
)
if (onLoopStart)
onLoopStart(params)
},
onLoopNext: (params) => {
handleWorkflowNodeLoopNext(params)
if (onLoopNext)
onLoopNext(params)
},
onLoopFinish: (params) => {
handleWorkflowNodeLoopFinished(params)
if (onLoopFinish)
onLoopFinish(params)
},
onNodeRetry: (params) => {
handleWorkflowNodeRetry(params)
if (onNodeRetry)
onNodeRetry(params)
},
onAgentLog: (params) => {
handleWorkflowAgentLog(params)
if (onAgentLog)
onAgentLog(params)
},
onTextChunk: (params) => {
handleWorkflowTextChunk(params)
},
onTextReplace: (params) => {
handleWorkflowTextReplace(params)
},
...restCallback,
},
)
}, [
store,
workflowStore,
doSyncWorkflowDraft,
handleWorkflowStarted,
handleWorkflowFinished,
handleWorkflowFailed,
handleWorkflowNodeStarted,
handleWorkflowNodeFinished,
handleWorkflowNodeIterationStarted,
handleWorkflowNodeIterationNext,
handleWorkflowNodeIterationFinished,
handleWorkflowNodeLoopStarted,
handleWorkflowNodeLoopNext,
handleWorkflowNodeLoopFinished,
handleWorkflowNodeRetry,
handleWorkflowTextChunk,
handleWorkflowTextReplace,
handleWorkflowAgentLog,
],
)
const handleStopRun = useCallback((taskId: string) => {
const { pipelineId } = workflowStore.getState()
stopWorkflowRun(`/rag/pipeline/${pipelineId}/workflow-runs/tasks/${taskId}/stop`)
}, [workflowStore])
const handleRestoreFromPublishedWorkflow = useCallback((publishedWorkflow: VersionHistory) => {
const nodes = publishedWorkflow.graph.nodes.map(node => ({ ...node, selected: false, data: { ...node.data, selected: false } }))
const edges = publishedWorkflow.graph.edges
const viewport = publishedWorkflow.graph.viewport!
handleUpdateWorkflowCanvas({
nodes,
edges,
viewport,
})
workflowStore.getState().setEnvironmentVariables(publishedWorkflow.environment_variables || [])
}, [handleUpdateWorkflowCanvas, workflowStore])
return {
handleBackupDraft,
handleLoadBackupDraft,
handleRun,
handleStopRun,
handleRestoreFromPublishedWorkflow,
}
}

View File

@ -0,0 +1,68 @@
import { useCallback } from 'react'
import { useStoreApi } from 'reactflow'
import { useWorkflowStore } from '@/app/components/workflow/store'
import {
BlockEnum,
WorkflowRunningStatus,
} from '@/app/components/workflow/types'
import { useWorkflowInteractions } from '@/app/components/workflow/hooks'
import {
useNodesSyncDraft,
useWorkflowRun,
} from '.'
export const useWorkflowStartRun = () => {
const store = useStoreApi()
const workflowStore = useWorkflowStore()
const { handleCancelDebugAndPreviewPanel } = useWorkflowInteractions()
const { handleRun } = useWorkflowRun()
const { doSyncWorkflowDraft } = useNodesSyncDraft()
const handleWorkflowStartRunInWorkflow = useCallback(async () => {
const {
workflowRunningData,
} = workflowStore.getState()
if (workflowRunningData?.result.status === WorkflowRunningStatus.Running)
return
const { getNodes } = store.getState()
const nodes = getNodes()
const startNode = nodes.find(node => node.data.type === BlockEnum.Start)
const startVariables = startNode?.data.variables || []
const {
showTestRunPanel,
setShowInputsPanel,
setShowEnvPanel,
setShowTestRunPanel,
} = workflowStore.getState()
setShowEnvPanel(false)
if (showTestRunPanel) {
setShowTestRunPanel?.(false)
handleCancelDebugAndPreviewPanel()
return
}
if (!startVariables.length) {
await doSyncWorkflowDraft()
handleRun({ inputs: {}, files: [] })
setShowTestRunPanel?.(true)
setShowInputsPanel(false)
}
else {
setShowTestRunPanel?.(true)
setShowInputsPanel(true)
}
}, [store, workflowStore, handleCancelDebugAndPreviewPanel, handleRun, doSyncWorkflowDraft])
const handleStartWorkflowRun = useCallback(() => {
handleWorkflowStartRunInWorkflow()
}, [handleWorkflowStartRunInWorkflow])
return {
handleStartWorkflowRun,
handleWorkflowStartRunInWorkflow,
}
}

View File

@ -1,6 +1,7 @@
import type { StateCreator } from 'zustand'
export type RagPipelineSliceShape = {
pipelineId: string
showInputFieldDialog: boolean
setShowInputFieldDialog: (showInputFieldPanel: boolean) => void
nodesDefaultConfigs: Record<string, any>
@ -11,6 +12,7 @@ export type RagPipelineSliceShape = {
export type CreateRagPipelineSliceSlice = StateCreator<RagPipelineSliceShape>
export const createRagPipelineSliceSlice: StateCreator<RagPipelineSliceShape> = set => ({
pipelineId: '',
showInputFieldDialog: false,
setShowInputFieldDialog: showInputFieldDialog => set(() => ({ showInputFieldDialog })),
nodesDefaultConfigs: {},

View File

@ -12,7 +12,7 @@ export type WorkflowDraftSliceShape = {
nodes: Node[]
edges: Edge[]
viewport: Viewport
features: Record<string, any>
features?: Record<string, any>
environmentVariables: EnvironmentVariable[]
}
setBackupDraft: (backupDraft?: WorkflowDraftSliceShape['backupDraft']) => void

View File

@ -254,7 +254,7 @@ const translation = {
'loop-start': 'Loop Start',
'loop': 'Loop',
'loop-end': 'Exit Loop',
'knowledge-base': 'Knowledge Base',
'knowledge-index': 'Knowledge Base',
},
blocksAbout: {
'start': 'Define the initial parameters for launching a workflow',
@ -277,7 +277,7 @@ const translation = {
'document-extractor': 'Used to parse uploaded documents into text content that is easily understandable by LLM.',
'list-operator': 'Used to filter or sort array content.',
'agent': 'Invoking large language models to answer questions or process natural language',
'knowledge-base': 'Knowledge Base About',
'knowledge-index': 'Knowledge Base About',
},
operator: {
zoomIn: 'Zoom In',

View File

@ -255,7 +255,7 @@ const translation = {
'loop-start': '循环开始',
'loop': '循环',
'loop-end': '退出循环',
'knowledge-base': '知识库',
'knowledge-index': '知识库',
},
blocksAbout: {
'start': '定义一个 workflow 流程启动的初始参数',
@ -278,7 +278,7 @@ const translation = {
'document-extractor': '用于将用户上传的文档解析为 LLM 便于理解的文本内容。',
'list-operator': '用于过滤或排序数组内容。',
'agent': '调用大型语言模型回答问题或处理自然语言',
'knowledge-base': '知识库节点',
'knowledge-index': '知识库节点',
},
operator: {
zoomIn: '放大',