diff --git a/web/app/components/rag-pipeline/components/rag-pipeline-main.tsx b/web/app/components/rag-pipeline/components/rag-pipeline-main.tsx index 2c28f7ef24..a96acbc19c 100644 --- a/web/app/components/rag-pipeline/components/rag-pipeline-main.tsx +++ b/web/app/components/rag-pipeline/components/rag-pipeline-main.tsx @@ -6,6 +6,10 @@ import type { WorkflowProps } from '@/app/components/workflow' import RagPipelineChildren from './rag-pipeline-children' import { useAvailableNodesMetaData, + useNodesSyncDraft, + useWorkflowRefreshDraft, + useWorkflowRun, + useWorkflowStartRun, } from '../hooks' type RagPipelineMainProps = Pick @@ -14,14 +18,50 @@ const RagPipelineMain = ({ edges, viewport, }: RagPipelineMainProps) => { + const { + doSyncWorkflowDraft, + syncWorkflowDraftWhenPageClose, + } = useNodesSyncDraft() + const { handleRefreshWorkflowDraft } = useWorkflowRefreshDraft() + const { + handleBackupDraft, + handleLoadBackupDraft, + handleRestoreFromPublishedWorkflow, + handleRun, + handleStopRun, + } = useWorkflowRun() + const { + handleStartWorkflowRun, + handleWorkflowStartRunInWorkflow, + } = useWorkflowStartRun() const availableNodesMetaData = useAvailableNodesMetaData() const hooksStore = useMemo(() => { return { availableNodesMetaData, + syncWorkflowDraftWhenPageClose, + doSyncWorkflowDraft, + handleRefreshWorkflowDraft, + handleBackupDraft, + handleLoadBackupDraft, + handleRestoreFromPublishedWorkflow, + handleRun, + handleStopRun, + handleStartWorkflowRun, + handleWorkflowStartRunInWorkflow, } }, [ availableNodesMetaData, + syncWorkflowDraftWhenPageClose, + doSyncWorkflowDraft, + handleRefreshWorkflowDraft, + handleBackupDraft, + handleLoadBackupDraft, + handleRestoreFromPublishedWorkflow, + handleRun, + handleStopRun, + handleStartWorkflowRun, + handleWorkflowStartRunInWorkflow, ]) return ( diff --git a/web/app/components/rag-pipeline/hooks/index.ts b/web/app/components/rag-pipeline/hooks/index.ts index ed5ceca87d..f4e5fa491b 100644 --- a/web/app/components/rag-pipeline/hooks/index.ts +++ b/web/app/components/rag-pipeline/hooks/index.ts @@ -1 +1,5 @@ export * from './use-available-nodes-meta-data' +export * from './use-workflow-refresh-draft' +export * from './use-nodes-sync-draft' +export * from './use-workflow-run' +export * from './use-workflow-start-run' diff --git a/web/app/components/rag-pipeline/hooks/use-nodes-sync-draft.ts b/web/app/components/rag-pipeline/hooks/use-nodes-sync-draft.ts new file mode 100644 index 0000000000..230e7c038b --- /dev/null +++ b/web/app/components/rag-pipeline/hooks/use-nodes-sync-draft.ts @@ -0,0 +1,84 @@ +import { useCallback } from 'react' +import produce from 'immer' +import { useStoreApi } from 'reactflow' +import { + useWorkflowStore, +} from '@/app/components/workflow/store' +import { + useNodesReadOnly, +} from '@/app/components/workflow/hooks/use-workflow' + +export const useNodesSyncDraft = () => { + const store = useStoreApi() + const workflowStore = useWorkflowStore() + const { getNodesReadOnly } = useNodesReadOnly() + + const getPostParams = useCallback(() => { + const { + getNodes, + edges, + transform, + } = store.getState() + const [x, y, zoom] = transform + const { + pipelineId, + environmentVariables, + syncWorkflowDraftHash, + } = workflowStore.getState() + + if (pipelineId) { + const nodes = getNodes() + + const producedNodes = produce(nodes, (draft) => { + draft.forEach((node) => { + Object.keys(node.data).forEach((key) => { + if (key.startsWith('_')) + delete node.data[key] + }) + }) + }) + const producedEdges = produce(edges, (draft) => { + draft.forEach((edge) => { + Object.keys(edge.data).forEach((key) => { + if (key.startsWith('_')) + delete edge.data[key] + }) + }) + }) + return { + url: `/datasets/${pipelineId}/workflows/draft`, + params: { + graph: { + nodes: producedNodes, + edges: producedEdges, + viewport: { + x, + y, + zoom, + }, + }, + environment_variables: environmentVariables, + hash: syncWorkflowDraftHash, + }, + } + } + }, [store, workflowStore]) + + const syncWorkflowDraftWhenPageClose = useCallback(() => { + return true + }, []) + + const doSyncWorkflowDraft = useCallback(async () => { + if (getNodesReadOnly()) + return + const postParams = getPostParams() + + if (postParams) + return true + }, [getPostParams, getNodesReadOnly]) + + return { + doSyncWorkflowDraft, + syncWorkflowDraftWhenPageClose, + } +} diff --git a/web/app/components/rag-pipeline/hooks/use-workflow-refresh-draft.ts b/web/app/components/rag-pipeline/hooks/use-workflow-refresh-draft.ts new file mode 100644 index 0000000000..e87c5b049c --- /dev/null +++ b/web/app/components/rag-pipeline/hooks/use-workflow-refresh-draft.ts @@ -0,0 +1,11 @@ +import { useCallback } from 'react' + +export const useWorkflowRefreshDraft = () => { + const handleRefreshWorkflowDraft = useCallback(() => { + return true + }, []) + + return { + handleRefreshWorkflowDraft, + } +} diff --git a/web/app/components/rag-pipeline/hooks/use-workflow-run.ts b/web/app/components/rag-pipeline/hooks/use-workflow-run.ts new file mode 100644 index 0000000000..fc3fc3ecb2 --- /dev/null +++ b/web/app/components/rag-pipeline/hooks/use-workflow-run.ts @@ -0,0 +1,305 @@ +import { useCallback } from 'react' +import { + useReactFlow, + useStoreApi, +} from 'reactflow' +import produce from 'immer' +import { useWorkflowStore } from '@/app/components/workflow/store' +import { WorkflowRunningStatus } from '@/app/components/workflow/types' +import { useWorkflowUpdate } from '@/app/components/workflow/hooks/use-workflow-interactions' +import { useWorkflowRunEvent } from '@/app/components/workflow/hooks/use-workflow-run-event/use-workflow-run-event' +import type { IOtherOptions } from '@/service/base' +import { ssePost } from '@/service/base' +import { stopWorkflowRun } from '@/service/workflow' +import type { VersionHistory } from '@/types/workflow' +import { useNodesSyncDraft } from './use-nodes-sync-draft' + +export const useWorkflowRun = () => { + const store = useStoreApi() + const workflowStore = useWorkflowStore() + const reactflow = useReactFlow() + const { doSyncWorkflowDraft } = useNodesSyncDraft() + const { handleUpdateWorkflowCanvas } = useWorkflowUpdate() + + const { + handleWorkflowStarted, + handleWorkflowFinished, + handleWorkflowFailed, + handleWorkflowNodeStarted, + handleWorkflowNodeFinished, + handleWorkflowNodeIterationStarted, + handleWorkflowNodeIterationNext, + handleWorkflowNodeIterationFinished, + handleWorkflowNodeLoopStarted, + handleWorkflowNodeLoopNext, + handleWorkflowNodeLoopFinished, + handleWorkflowNodeRetry, + handleWorkflowAgentLog, + handleWorkflowTextChunk, + handleWorkflowTextReplace, + } = useWorkflowRunEvent() + + const handleBackupDraft = useCallback(() => { + const { + getNodes, + edges, + } = store.getState() + const { getViewport } = reactflow + const { + backupDraft, + setBackupDraft, + environmentVariables, + } = workflowStore.getState() + + if (!backupDraft) { + setBackupDraft({ + nodes: getNodes(), + edges, + viewport: getViewport(), + environmentVariables, + }) + doSyncWorkflowDraft() + } + }, [reactflow, workflowStore, store, doSyncWorkflowDraft]) + + const handleLoadBackupDraft = useCallback(() => { + const { + backupDraft, + setBackupDraft, + setEnvironmentVariables, + } = workflowStore.getState() + + if (backupDraft) { + const { + nodes, + edges, + viewport, + environmentVariables, + } = backupDraft + handleUpdateWorkflowCanvas({ + nodes, + edges, + viewport, + }) + setEnvironmentVariables(environmentVariables) + setBackupDraft(undefined) + } + }, [handleUpdateWorkflowCanvas, workflowStore]) + + const handleRun = useCallback(async ( + params: any, + callback?: IOtherOptions, + ) => { + const { + getNodes, + setNodes, + } = store.getState() + const newNodes = produce(getNodes(), (draft) => { + draft.forEach((node) => { + node.data.selected = false + node.data._runningStatus = undefined + }) + }) + setNodes(newNodes) + await doSyncWorkflowDraft() + + const { + onWorkflowStarted, + onWorkflowFinished, + onNodeStarted, + onNodeFinished, + onIterationStart, + onIterationNext, + onIterationFinish, + onLoopStart, + onLoopNext, + onLoopFinish, + onNodeRetry, + onAgentLog, + onError, + ...restCallback + } = callback || {} + const { pipelineId } = workflowStore.getState() + workflowStore.setState({ historyWorkflowData: undefined }) + const workflowContainer = document.getElementById('workflow-container') + + const { + clientWidth, + clientHeight, + } = workflowContainer! + + const url = `/rag/pipeline/${pipelineId}/workflows/draft/run` + + const { + setWorkflowRunningData, + } = workflowStore.getState() + setWorkflowRunningData({ + result: { + status: WorkflowRunningStatus.Running, + }, + tracing: [], + resultText: '', + }) + + return true + + ssePost( + url, + { + body: params, + }, + { + onWorkflowStarted: (params) => { + handleWorkflowStarted(params) + + if (onWorkflowStarted) + onWorkflowStarted(params) + }, + onWorkflowFinished: (params) => { + handleWorkflowFinished(params) + + if (onWorkflowFinished) + onWorkflowFinished(params) + }, + onError: (params) => { + handleWorkflowFailed() + + if (onError) + onError(params) + }, + onNodeStarted: (params) => { + handleWorkflowNodeStarted( + params, + { + clientWidth, + clientHeight, + }, + ) + + if (onNodeStarted) + onNodeStarted(params) + }, + onNodeFinished: (params) => { + handleWorkflowNodeFinished(params) + + if (onNodeFinished) + onNodeFinished(params) + }, + onIterationStart: (params) => { + handleWorkflowNodeIterationStarted( + params, + { + clientWidth, + clientHeight, + }, + ) + + if (onIterationStart) + onIterationStart(params) + }, + onIterationNext: (params) => { + handleWorkflowNodeIterationNext(params) + + if (onIterationNext) + onIterationNext(params) + }, + onIterationFinish: (params) => { + handleWorkflowNodeIterationFinished(params) + + if (onIterationFinish) + onIterationFinish(params) + }, + onLoopStart: (params) => { + handleWorkflowNodeLoopStarted( + params, + { + clientWidth, + clientHeight, + }, + ) + + if (onLoopStart) + onLoopStart(params) + }, + onLoopNext: (params) => { + handleWorkflowNodeLoopNext(params) + + if (onLoopNext) + onLoopNext(params) + }, + onLoopFinish: (params) => { + handleWorkflowNodeLoopFinished(params) + + if (onLoopFinish) + onLoopFinish(params) + }, + onNodeRetry: (params) => { + handleWorkflowNodeRetry(params) + + if (onNodeRetry) + onNodeRetry(params) + }, + onAgentLog: (params) => { + handleWorkflowAgentLog(params) + + if (onAgentLog) + onAgentLog(params) + }, + onTextChunk: (params) => { + handleWorkflowTextChunk(params) + }, + onTextReplace: (params) => { + handleWorkflowTextReplace(params) + }, + ...restCallback, + }, + ) + }, [ + store, + workflowStore, + doSyncWorkflowDraft, + handleWorkflowStarted, + handleWorkflowFinished, + handleWorkflowFailed, + handleWorkflowNodeStarted, + handleWorkflowNodeFinished, + handleWorkflowNodeIterationStarted, + handleWorkflowNodeIterationNext, + handleWorkflowNodeIterationFinished, + handleWorkflowNodeLoopStarted, + handleWorkflowNodeLoopNext, + handleWorkflowNodeLoopFinished, + handleWorkflowNodeRetry, + handleWorkflowTextChunk, + handleWorkflowTextReplace, + handleWorkflowAgentLog, + ], + ) + + const handleStopRun = useCallback((taskId: string) => { + const { pipelineId } = workflowStore.getState() + + stopWorkflowRun(`/rag/pipeline/${pipelineId}/workflow-runs/tasks/${taskId}/stop`) + }, [workflowStore]) + + const handleRestoreFromPublishedWorkflow = useCallback((publishedWorkflow: VersionHistory) => { + const nodes = publishedWorkflow.graph.nodes.map(node => ({ ...node, selected: false, data: { ...node.data, selected: false } })) + const edges = publishedWorkflow.graph.edges + const viewport = publishedWorkflow.graph.viewport! + handleUpdateWorkflowCanvas({ + nodes, + edges, + viewport, + }) + + workflowStore.getState().setEnvironmentVariables(publishedWorkflow.environment_variables || []) + }, [handleUpdateWorkflowCanvas, workflowStore]) + + return { + handleBackupDraft, + handleLoadBackupDraft, + handleRun, + handleStopRun, + handleRestoreFromPublishedWorkflow, + } +} diff --git a/web/app/components/rag-pipeline/hooks/use-workflow-start-run.tsx b/web/app/components/rag-pipeline/hooks/use-workflow-start-run.tsx new file mode 100644 index 0000000000..bf411774b5 --- /dev/null +++ b/web/app/components/rag-pipeline/hooks/use-workflow-start-run.tsx @@ -0,0 +1,68 @@ +import { useCallback } from 'react' +import { useStoreApi } from 'reactflow' +import { useWorkflowStore } from '@/app/components/workflow/store' +import { + BlockEnum, + WorkflowRunningStatus, +} from '@/app/components/workflow/types' +import { useWorkflowInteractions } from '@/app/components/workflow/hooks' +import { + useNodesSyncDraft, + useWorkflowRun, +} from '.' + +export const useWorkflowStartRun = () => { + const store = useStoreApi() + const workflowStore = useWorkflowStore() + const { handleCancelDebugAndPreviewPanel } = useWorkflowInteractions() + const { handleRun } = useWorkflowRun() + const { doSyncWorkflowDraft } = useNodesSyncDraft() + + const handleWorkflowStartRunInWorkflow = useCallback(async () => { + const { + workflowRunningData, + } = workflowStore.getState() + + if (workflowRunningData?.result.status === WorkflowRunningStatus.Running) + return + + const { getNodes } = store.getState() + const nodes = getNodes() + const startNode = nodes.find(node => node.data.type === BlockEnum.Start) + const startVariables = startNode?.data.variables || [] + const { + showTestRunPanel, + setShowInputsPanel, + setShowEnvPanel, + setShowTestRunPanel, + } = workflowStore.getState() + + setShowEnvPanel(false) + + if (showTestRunPanel) { + setShowTestRunPanel?.(false) + handleCancelDebugAndPreviewPanel() + return + } + + if (!startVariables.length) { + await doSyncWorkflowDraft() + handleRun({ inputs: {}, files: [] }) + setShowTestRunPanel?.(true) + setShowInputsPanel(false) + } + else { + setShowTestRunPanel?.(true) + setShowInputsPanel(true) + } + }, [store, workflowStore, handleCancelDebugAndPreviewPanel, handleRun, doSyncWorkflowDraft]) + + const handleStartWorkflowRun = useCallback(() => { + handleWorkflowStartRunInWorkflow() + }, [handleWorkflowStartRunInWorkflow]) + + return { + handleStartWorkflowRun, + handleWorkflowStartRunInWorkflow, + } +} diff --git a/web/app/components/rag-pipeline/store/index.ts b/web/app/components/rag-pipeline/store/index.ts index 261e89b00f..80c5027d72 100644 --- a/web/app/components/rag-pipeline/store/index.ts +++ b/web/app/components/rag-pipeline/store/index.ts @@ -1,6 +1,7 @@ import type { StateCreator } from 'zustand' export type RagPipelineSliceShape = { + pipelineId: string showInputFieldDialog: boolean setShowInputFieldDialog: (showInputFieldPanel: boolean) => void nodesDefaultConfigs: Record @@ -11,6 +12,7 @@ export type RagPipelineSliceShape = { export type CreateRagPipelineSliceSlice = StateCreator export const createRagPipelineSliceSlice: StateCreator = set => ({ + pipelineId: '', showInputFieldDialog: false, setShowInputFieldDialog: showInputFieldDialog => set(() => ({ showInputFieldDialog })), nodesDefaultConfigs: {}, diff --git a/web/app/components/workflow/store/workflow/workflow-draft-slice.ts b/web/app/components/workflow/store/workflow/workflow-draft-slice.ts index ec28debee2..a4048a9455 100644 --- a/web/app/components/workflow/store/workflow/workflow-draft-slice.ts +++ b/web/app/components/workflow/store/workflow/workflow-draft-slice.ts @@ -12,7 +12,7 @@ export type WorkflowDraftSliceShape = { nodes: Node[] edges: Edge[] viewport: Viewport - features: Record + features?: Record environmentVariables: EnvironmentVariable[] } setBackupDraft: (backupDraft?: WorkflowDraftSliceShape['backupDraft']) => void diff --git a/web/i18n/en-US/workflow.ts b/web/i18n/en-US/workflow.ts index d23305f6ee..7b235a2723 100644 --- a/web/i18n/en-US/workflow.ts +++ b/web/i18n/en-US/workflow.ts @@ -254,7 +254,7 @@ const translation = { 'loop-start': 'Loop Start', 'loop': 'Loop', 'loop-end': 'Exit Loop', - 'knowledge-base': 'Knowledge Base', + 'knowledge-index': 'Knowledge Base', }, blocksAbout: { 'start': 'Define the initial parameters for launching a workflow', @@ -277,7 +277,7 @@ const translation = { 'document-extractor': 'Used to parse uploaded documents into text content that is easily understandable by LLM.', 'list-operator': 'Used to filter or sort array content.', 'agent': 'Invoking large language models to answer questions or process natural language', - 'knowledge-base': 'Knowledge Base About', + 'knowledge-index': 'Knowledge Base About', }, operator: { zoomIn: 'Zoom In', diff --git a/web/i18n/zh-Hans/workflow.ts b/web/i18n/zh-Hans/workflow.ts index c2f14e3403..ce40ca2c33 100644 --- a/web/i18n/zh-Hans/workflow.ts +++ b/web/i18n/zh-Hans/workflow.ts @@ -255,7 +255,7 @@ const translation = { 'loop-start': '循环开始', 'loop': '循环', 'loop-end': '退出循环', - 'knowledge-base': '知识库', + 'knowledge-index': '知识库', }, blocksAbout: { 'start': '定义一个 workflow 流程启动的初始参数', @@ -278,7 +278,7 @@ const translation = { 'document-extractor': '用于将用户上传的文档解析为 LLM 便于理解的文本内容。', 'list-operator': '用于过滤或排序数组内容。', 'agent': '调用大型语言模型回答问题或处理自然语言', - 'knowledge-base': '知识库节点', + 'knowledge-index': '知识库节点', }, operator: { zoomIn: '放大',