From 367b2d0320b0d42ceff4c4e831bbe7a75f7810b6 Mon Sep 17 00:00:00 2001 From: twwu Date: Wed, 27 Aug 2025 16:56:33 +0800 Subject: [PATCH] refactor(web): streamline data source before run form and enhance run handling logic --- .../panel/test-run/preparation/index.tsx | 2 +- .../_base/components/workflow-panel/index.tsx | 14 ++ .../workflow-panel/last-run/use-last-run.ts | 25 +-- .../nodes/_base/hooks/use-one-step-run.ts | 1 + .../nodes/data-source/before-run-form.tsx | 46 +++-- .../data-source/hooks/use-before-run-form.ts | 180 ++++++++++++++++++ .../workflow/nodes/data-source/types.ts | 12 +- web/models/pipeline.ts | 11 ++ web/service/use-pipeline.ts | 17 ++ 9 files changed, 278 insertions(+), 30 deletions(-) create mode 100644 web/app/components/workflow/nodes/data-source/hooks/use-before-run-form.ts diff --git a/web/app/components/rag-pipeline/components/panel/test-run/preparation/index.tsx b/web/app/components/rag-pipeline/components/panel/test-run/preparation/index.tsx index 7dc5e760c9..b4dc32c122 100644 --- a/web/app/components/rag-pipeline/components/panel/test-run/preparation/index.tsx +++ b/web/app/components/rag-pipeline/components/panel/test-run/preparation/index.tsx @@ -163,7 +163,7 @@ const Preparation = () => { {datasourceType === DatasourceType.localFile && ( )} {datasourceType === DatasourceType.onlineDocument && ( diff --git a/web/app/components/workflow/nodes/_base/components/workflow-panel/index.tsx b/web/app/components/workflow/nodes/_base/components/workflow-panel/index.tsx index 88cf57b853..a528d00929 100644 --- a/web/app/components/workflow/nodes/_base/components/workflow-panel/index.tsx +++ b/web/app/components/workflow/nodes/_base/components/workflow-panel/index.tsx @@ -74,6 +74,7 @@ import type { CustomRunFormProps } from '@/app/components/workflow/nodes/data-so import { DataSourceClassification } from '@/app/components/workflow/nodes/data-source/types' import { useModalContext } from '@/context/modal-context' import DataSourceBeforeRunForm from '@/app/components/workflow/nodes/data-source/before-run-form' +import useInspectVarsCrud from '@/app/components/workflow/hooks/use-inspect-vars-crud' const getCustomRunForm = (params: CustomRunFormProps): React.JSX.Element => { const nodeType = params.payload.type @@ -222,10 +223,12 @@ const BasePanel: FC = ({ runInputData, runInputDataRef, runResult, + setRunResult, getInputVars, toVarInputs, tabType, isRunAfterSingleRun, + setIsRunAfterSingleRun, setTabType, handleAfterCustomSingleRun, singleRunParams, @@ -281,6 +284,10 @@ const BasePanel: FC = ({ setShowAccountSettingModal({ payload: 'data-source' }) }, [setShowAccountSettingModal]) + const { + appendNodeInspectVars, + } = useInspectVarsCrud() + if (logParams.showSpecialResultPanel) { return (
= ({ if (isShowSingleRun) { const form = getCustomRunForm({ nodeId: id, + flowId: configsMap?.flowId || '', + flowType: configsMap?.flowType || FlowType.appFlow, payload: data, + setRunResult, + setIsRunAfterSingleRun, + isPaused, + isRunAfterSingleRun, onSuccess: handleAfterCustomSingleRun, onCancel: hideSingleRun, + appendNodeInspectVars, }) return ( diff --git a/web/app/components/workflow/nodes/_base/components/workflow-panel/last-run/use-last-run.ts b/web/app/components/workflow/nodes/_base/components/workflow-panel/last-run/use-last-run.ts index becef81252..21462de939 100644 --- a/web/app/components/workflow/nodes/_base/components/workflow-panel/last-run/use-last-run.ts +++ b/web/app/components/workflow/nodes/_base/components/workflow-panel/last-run/use-last-run.ts @@ -174,7 +174,7 @@ const useLastRun = ({ }) const toSubmitData = useCallback((data: Record) => { - if(!isIterationNode && !isLoopNode) + if (!isIterationNode && !isLoopNode) return data const allVarObject = singleRunParams?.allVarObject || {} @@ -183,7 +183,7 @@ const useLastRun = ({ const [varSectorStr, nodeId] = key.split(DELIMITER) formattedData[`${nodeId}.${allVarObject[key].inSingleRunPassedKey}`] = data[varSectorStr] }) - if(isIterationNode) { + if (isIterationNode) { const iteratorInputKey = `${id}.input_selector` formattedData[iteratorInputKey] = data[iteratorInputKey] } @@ -203,7 +203,7 @@ const useLastRun = ({ const initShowLastRunTab = useStore(s => s.initShowLastRunTab) const [tabType, setTabType] = useState(initShowLastRunTab ? TabType.lastRun : TabType.settings) useEffect(() => { - if(initShowLastRunTab) + if (initShowLastRunTab) setTabType(TabType.lastRun) setInitShowLastRunTab(false) @@ -212,7 +212,7 @@ const useLastRun = ({ const handleRunWithParams = async (data: Record) => { const { isValid } = checkValid() - if(!isValid) + if (!isValid) return setNodeRunning() setIsRunAfterSingleRun(true) @@ -236,14 +236,14 @@ const useLastRun = ({ const values: Record = {} form.inputs.forEach(({ variable, getVarValueFromDependent }) => { const isGetValueFromDependent = getVarValueFromDependent || !variable.includes('.') - if(isGetValueFromDependent && !singleRunParams?.getDependentVar) + if (isGetValueFromDependent && !singleRunParams?.getDependentVar) return const selector = isGetValueFromDependent ? (singleRunParams?.getDependentVar(variable) || []) : variable.slice(1, -1).split('.') - if(!selector || selector.length === 0) + if (!selector || selector.length === 0) return const [nodeId, varName] = selector.slice(0, 2) - if(!isStartNode && nodeId === id) { // inner vars like loop vars + if (!isStartNode && nodeId === id) { // inner vars like loop vars values[variable] = true return } @@ -257,7 +257,7 @@ const useLastRun = ({ } const isAllVarsHasValue = (vars?: ValueSelector[]) => { - if(!vars || vars.length === 0) + if (!vars || vars.length === 0) return true return vars.every((varItem) => { const [nodeId, varName] = varItem.slice(0, 2) @@ -267,7 +267,7 @@ const useLastRun = ({ } const isSomeVarsHasValue = (vars?: ValueSelector[]) => { - if(!vars || vars.length === 0) + if (!vars || vars.length === 0) return true return vars.some((varItem) => { const [nodeId, varName] = varItem.slice(0, 2) @@ -294,7 +294,7 @@ const useLastRun = ({ } const checkAggregatorVarsSet = (vars: ValueSelector[][]) => { - if(!vars || vars.length === 0) + if (!vars || vars.length === 0) return true // in each group, at last one set is ok return vars.every((varItem) => { @@ -310,9 +310,9 @@ const useLastRun = ({ const handleSingleRun = () => { const { isValid } = checkValid() - if(!isValid) + if (!isValid) return - if(isCustomRunNode) { + if (isCustomRunNode) { showSingleRun() return } @@ -335,6 +335,7 @@ const useLastRun = ({ ...oneStepRunRes, tabType, isRunAfterSingleRun, + setIsRunAfterSingleRun, setTabType: handleTabClicked, handleAfterCustomSingleRun, singleRunParams, diff --git a/web/app/components/workflow/nodes/_base/hooks/use-one-step-run.ts b/web/app/components/workflow/nodes/_base/hooks/use-one-step-run.ts index 1582e8d36d..4970d99c40 100644 --- a/web/app/components/workflow/nodes/_base/hooks/use-one-step-run.ts +++ b/web/app/components/workflow/nodes/_base/hooks/use-one-step-run.ts @@ -663,6 +663,7 @@ const useOneStepRun = ({ runInputDataRef, setRunInputData: handleSetRunInputData, runResult, + setRunResult: doSetRunResult, iterationRunResult, loopRunResult, setNodeRunning, diff --git a/web/app/components/workflow/nodes/data-source/before-run-form.tsx b/web/app/components/workflow/nodes/data-source/before-run-form.tsx index daf451e54c..e34984faa1 100644 --- a/web/app/components/workflow/nodes/data-source/before-run-form.tsx +++ b/web/app/components/workflow/nodes/data-source/before-run-form.tsx @@ -1,7 +1,7 @@ 'use client' import type { FC } from 'react' import React, { useCallback } from 'react' -import type { CustomRunFormProps, DataSourceNodeType } from './types' +import type { CustomRunFormProps } from './types' import { DatasourceType } from '@/models/pipeline' import LocalFile from '@/app/components/datasets/documents/create-from-pipeline/data-source/local-file' import OnlineDocuments from '@/app/components/datasets/documents/create-from-pipeline/data-source/online-documents' @@ -13,18 +13,24 @@ import Button from '@/app/components/base/button' import { useTranslation } from 'react-i18next' import DataSourceProvider from '@/app/components/datasets/documents/create-from-pipeline/data-source/store/provider' import PanelWrap from '../_base/components/before-run-form/panel-wrap' +import useBeforeRunForm from './hooks/use-before-run-form' -const BeforeRunForm: FC = ({ - nodeId, - payload, - onSuccess, - onCancel, -}) => { +const BeforeRunForm: FC = (props) => { + const { + nodeId, + payload, + onCancel, + } = props const { t } = useTranslation() - const datasourceType = payload.provider_type - const datasourceNodeData = payload as DataSourceNodeType const dataSourceStore = useDataSourceStore() + const { + isPending, + handleRunWithSyncDraft, + datasourceType, + datasourceNodeData, + } = useBeforeRunForm(props) + const { clearOnlineDocumentData } = useOnlineDocument() const { clearWebsiteCrawlData } = useWebsiteCrawl() const { clearOnlineDriveData } = useOnlineDrive() @@ -44,10 +50,6 @@ const BeforeRunForm: FC = ({ setCurrentCredentialId(credentialId) }, [dataSourceStore]) - const handleRun = useCallback(() => { - onSuccess() - }, [onSuccess]) - return ( = ({ {datasourceType === DatasourceType.localFile && ( )} {datasourceType === DatasourceType.onlineDocument && ( )} @@ -71,6 +74,7 @@ const BeforeRunForm: FC = ({ )} @@ -78,12 +82,22 @@ const BeforeRunForm: FC = ({ )}
- - + +
diff --git a/web/app/components/workflow/nodes/data-source/hooks/use-before-run-form.ts b/web/app/components/workflow/nodes/data-source/hooks/use-before-run-form.ts new file mode 100644 index 0000000000..36540d6217 --- /dev/null +++ b/web/app/components/workflow/nodes/data-source/hooks/use-before-run-form.ts @@ -0,0 +1,180 @@ +import { useStoreApi } from 'reactflow' +import type { CustomRunFormProps, DataSourceNodeType } from '../types' +import { useEffect, useRef } from 'react' +import { useNodeDataUpdate, useNodesSyncDraft } from '../../../hooks' +import { NodeRunningStatus } from '../../../types' +import { useInvalidLastRun } from '@/service/use-workflow' +import type { NodeRunResult } from '@/types/workflow' +import { fetchNodeInspectVars } from '@/service/workflow' +import { FlowType } from '@/types/common' +import { useDatasourceSingleRun } from '@/service/use-pipeline' +import { useDataSourceStore } from '@/app/components/datasets/documents/create-from-pipeline/data-source/store' +import { DatasourceType } from '@/models/pipeline' +import { TransferMethod } from '@/types/app' + +const useBeforeRunForm = ({ + nodeId, + flowId, + flowType, + payload, + setRunResult, + isPaused, + isRunAfterSingleRun, + setIsRunAfterSingleRun, + onSuccess, + appendNodeInspectVars, +}: CustomRunFormProps) => { + const store = useStoreApi() + const dataSourceStore = useDataSourceStore() + const isPausedRef = useRef(isPaused) + const { handleNodeDataUpdate } = useNodeDataUpdate() + + const datasourceType = payload.provider_type as DatasourceType + const datasourceNodeData = payload as DataSourceNodeType + + useEffect(() => { + isPausedRef.current = isPaused + }, [isPaused]) + + const runningStatus = payload._singleRunningStatus || NodeRunningStatus.NotStart + + const setNodeRunning = () => { + handleNodeDataUpdate({ + id: nodeId, + data: { + ...payload, + _singleRunningStatus: NodeRunningStatus.Running, + }, + }) + } + + const invalidLastRun = useInvalidLastRun(flowType, flowId, nodeId) + + const updateRunResult = async (data: NodeRunResult) => { + const isPaused = isPausedRef.current + + // The backend don't support pause the single run, so the frontend handle the pause state. + if (isPaused) + return + + const canRunLastRun = !isRunAfterSingleRun || runningStatus === NodeRunningStatus.Succeeded + if (!canRunLastRun) { + setRunResult(data) + return + } + + // run fail may also update the inspect vars when the node set the error default output. + const vars = await fetchNodeInspectVars(FlowType.ragPipeline, flowId, nodeId) + const { getNodes } = store.getState() + const nodes = getNodes() + appendNodeInspectVars(nodeId, vars, nodes) + if (data?.status === NodeRunningStatus.Succeeded) + onSuccess() + } + + const { mutateAsync: handleDatasourceSingleRun, isPending } = useDatasourceSingleRun() + + const handleRun = () => { + let datasourceInfo: Record = {} + const { currentCredentialId: credentialId } = dataSourceStore.getState() + if (datasourceType === DatasourceType.localFile) { + const { localFileList } = dataSourceStore.getState() + const { id, name, type, size, extension, mime_type } = localFileList[0].file + const documentInfo = { + related_id: id, + name, + type, + size, + extension, + mime_type, + url: '', + transfer_method: TransferMethod.local_file, + } + datasourceInfo = documentInfo + } + if (datasourceType === DatasourceType.onlineDocument) { + const { onlineDocuments } = dataSourceStore.getState() + const { workspace_id, ...rest } = onlineDocuments[0] + const documentInfo = { + workspace_id, + page: rest, + credential_id: credentialId, + } + datasourceInfo = documentInfo + } + if (datasourceType === DatasourceType.websiteCrawl) { + const { websitePages } = dataSourceStore.getState() + datasourceInfo = { + ...websitePages[0], + credential_id: credentialId, + } + } + if (datasourceType === DatasourceType.onlineDrive) { + const { bucket, fileList, selectedFileIds } = dataSourceStore.getState() + const file = fileList.find(file => file.id === selectedFileIds[0]) + datasourceInfo = { + bucket, + id: file?.id, + type: file?.type, + credential_id: credentialId, + } + } + let hasError = false + handleDatasourceSingleRun({ + pipeline_id: flowId, + start_node_id: nodeId, + start_node_title: datasourceNodeData.title, + datasource_type: datasourceType, + datasource_info: datasourceInfo, + }, { + onError: () => { + hasError = true + invalidLastRun() + if (isPausedRef.current) + return + handleNodeDataUpdate({ + id: nodeId, + data: { + ...payload, + _isSingleRun: false, + _singleRunningStatus: NodeRunningStatus.Failed, + }, + }) + }, + onSettled: (data) => { + updateRunResult(data!) + if (!hasError && !isPausedRef.current) { + handleNodeDataUpdate({ + id: nodeId, + data: { + ...payload, + _isSingleRun: false, + _singleRunningStatus: NodeRunningStatus.Succeeded, + }, + }) + } + }, + }) + } + + const { handleSyncWorkflowDraft } = useNodesSyncDraft() + + const handleRunWithSyncDraft = () => { + setNodeRunning() + setIsRunAfterSingleRun(true) + handleSyncWorkflowDraft(true, true, { + onSuccess() { + handleRun() + }, + }) + } + + return { + isPending, + handleRunWithSyncDraft, + datasourceType, + datasourceNodeData, + } +} + +export default useBeforeRunForm diff --git a/web/app/components/workflow/nodes/data-source/types.ts b/web/app/components/workflow/nodes/data-source/types.ts index 3e6e8e6c64..da887244b8 100644 --- a/web/app/components/workflow/nodes/data-source/types.ts +++ b/web/app/components/workflow/nodes/data-source/types.ts @@ -1,4 +1,7 @@ -import type { CommonNodeType, ValueSelector } from '@/app/components/workflow/types' +import type { CommonNodeType, Node, ValueSelector } from '@/app/components/workflow/types' +import type { FlowType } from '@/types/common' +import type { NodeRunResult, VarInInspect } from '@/types/workflow' +import type { Dispatch, SetStateAction } from 'react' export enum VarType { variable = 'variable', @@ -31,7 +34,14 @@ export type DataSourceNodeType = CommonNodeType & { export type CustomRunFormProps = { nodeId: string + flowId: string + flowType: FlowType payload: CommonNodeType + setRunResult: Dispatch> + setIsRunAfterSingleRun: Dispatch> + isPaused: boolean + isRunAfterSingleRun: boolean onSuccess: () => void onCancel: () => void + appendNodeInspectVars: (nodeId: string, vars: VarInInspect[], nodes: Node[]) => void } diff --git a/web/models/pipeline.ts b/web/models/pipeline.ts index 6d6835ee2c..c1a9b42462 100644 --- a/web/models/pipeline.ts +++ b/web/models/pipeline.ts @@ -6,6 +6,7 @@ import type { AppIconSelection } from '@/app/components/base/app-icon-picker' import type { Viewport } from 'reactflow' import type { TransferMethod } from '@/types/app' import { BaseFieldType } from '@/app/components/base/form/form-scenarios/base/types' +import type { NodeRunResult } from '@/types/workflow' export enum DatasourceType { localFile = 'local_file', @@ -287,3 +288,13 @@ export type OnlineDriveFile = { size?: number type: OnlineDriveFileType } + +export type DatasourceNodeSingleRunRequest = { + pipeline_id: string + start_node_id: string + start_node_title: string + datasource_type: DatasourceType + datasource_info: Record +} + +export type DatasourceNodeSingleRunResponse = NodeRunResult diff --git a/web/service/use-pipeline.ts b/web/service/use-pipeline.ts index 2b46af9226..9938bee9d0 100644 --- a/web/service/use-pipeline.ts +++ b/web/service/use-pipeline.ts @@ -4,6 +4,8 @@ import { del, get, patch, post } from './base' import { DatasourceType } from '@/models/pipeline' import type { ConversionResponse, + DatasourceNodeSingleRunRequest, + DatasourceNodeSingleRunResponse, DeleteTemplateResponse, ExportTemplateDSLResponse, ImportPipelineDSLConfirmResponse, @@ -367,3 +369,18 @@ export const useConvertDatasetToPipeline = () => { }, }) } + +export const useDatasourceSingleRun = ( + mutationOptions: MutationOptions = {}, +) => { + return useMutation({ + mutationKey: [NAME_SPACE, 'datasource-node-single-run'], + mutationFn: (params: DatasourceNodeSingleRunRequest) => { + const { pipeline_id: pipelineId, ...rest } = params + return post(`/rag/pipelines/${pipelineId}/workflows/draft/datasource/variables-inspect`, { + body: rest, + }) + }, + ...mutationOptions, + }) +}