refactor(web): streamline data source before run form and enhance run handling logic

This commit is contained in:
twwu 2025-08-27 16:56:33 +08:00
parent bd294ffe0d
commit 367b2d0320
9 changed files with 278 additions and 30 deletions

View File

@ -163,7 +163,7 @@ const Preparation = () => {
{datasourceType === DatasourceType.localFile && (
<LocalFile
allowedExtensions={datasource!.nodeData.fileExtensions || []}
notSupportBatchUpload={false} // only support single file upload in test run
notSupportBatchUpload // only support single file upload in test run
/>
)}
{datasourceType === DatasourceType.onlineDocument && (

View File

@ -74,6 +74,7 @@ import type { CustomRunFormProps } from '@/app/components/workflow/nodes/data-so
import { DataSourceClassification } from '@/app/components/workflow/nodes/data-source/types'
import { useModalContext } from '@/context/modal-context'
import DataSourceBeforeRunForm from '@/app/components/workflow/nodes/data-source/before-run-form'
import useInspectVarsCrud from '@/app/components/workflow/hooks/use-inspect-vars-crud'
const getCustomRunForm = (params: CustomRunFormProps): React.JSX.Element => {
const nodeType = params.payload.type
@ -222,10 +223,12 @@ const BasePanel: FC<BasePanelProps> = ({
runInputData,
runInputDataRef,
runResult,
setRunResult,
getInputVars,
toVarInputs,
tabType,
isRunAfterSingleRun,
setIsRunAfterSingleRun,
setTabType,
handleAfterCustomSingleRun,
singleRunParams,
@ -281,6 +284,10 @@ const BasePanel: FC<BasePanelProps> = ({
setShowAccountSettingModal({ payload: 'data-source' })
}, [setShowAccountSettingModal])
const {
appendNodeInspectVars,
} = useInspectVarsCrud()
if (logParams.showSpecialResultPanel) {
return (
<div className={cn(
@ -309,9 +316,16 @@ const BasePanel: FC<BasePanelProps> = ({
if (isShowSingleRun) {
const form = getCustomRunForm({
nodeId: id,
flowId: configsMap?.flowId || '',
flowType: configsMap?.flowType || FlowType.appFlow,
payload: data,
setRunResult,
setIsRunAfterSingleRun,
isPaused,
isRunAfterSingleRun,
onSuccess: handleAfterCustomSingleRun,
onCancel: hideSingleRun,
appendNodeInspectVars,
})
return (

View File

@ -174,7 +174,7 @@ const useLastRun = <T>({
})
const toSubmitData = useCallback((data: Record<string, any>) => {
if(!isIterationNode && !isLoopNode)
if (!isIterationNode && !isLoopNode)
return data
const allVarObject = singleRunParams?.allVarObject || {}
@ -183,7 +183,7 @@ const useLastRun = <T>({
const [varSectorStr, nodeId] = key.split(DELIMITER)
formattedData[`${nodeId}.${allVarObject[key].inSingleRunPassedKey}`] = data[varSectorStr]
})
if(isIterationNode) {
if (isIterationNode) {
const iteratorInputKey = `${id}.input_selector`
formattedData[iteratorInputKey] = data[iteratorInputKey]
}
@ -203,7 +203,7 @@ const useLastRun = <T>({
const initShowLastRunTab = useStore(s => s.initShowLastRunTab)
const [tabType, setTabType] = useState<TabType>(initShowLastRunTab ? TabType.lastRun : TabType.settings)
useEffect(() => {
if(initShowLastRunTab)
if (initShowLastRunTab)
setTabType(TabType.lastRun)
setInitShowLastRunTab(false)
@ -212,7 +212,7 @@ const useLastRun = <T>({
const handleRunWithParams = async (data: Record<string, any>) => {
const { isValid } = checkValid()
if(!isValid)
if (!isValid)
return
setNodeRunning()
setIsRunAfterSingleRun(true)
@ -236,14 +236,14 @@ const useLastRun = <T>({
const values: Record<string, boolean> = {}
form.inputs.forEach(({ variable, getVarValueFromDependent }) => {
const isGetValueFromDependent = getVarValueFromDependent || !variable.includes('.')
if(isGetValueFromDependent && !singleRunParams?.getDependentVar)
if (isGetValueFromDependent && !singleRunParams?.getDependentVar)
return
const selector = isGetValueFromDependent ? (singleRunParams?.getDependentVar(variable) || []) : variable.slice(1, -1).split('.')
if(!selector || selector.length === 0)
if (!selector || selector.length === 0)
return
const [nodeId, varName] = selector.slice(0, 2)
if(!isStartNode && nodeId === id) { // inner vars like loop vars
if (!isStartNode && nodeId === id) { // inner vars like loop vars
values[variable] = true
return
}
@ -257,7 +257,7 @@ const useLastRun = <T>({
}
const isAllVarsHasValue = (vars?: ValueSelector[]) => {
if(!vars || vars.length === 0)
if (!vars || vars.length === 0)
return true
return vars.every((varItem) => {
const [nodeId, varName] = varItem.slice(0, 2)
@ -267,7 +267,7 @@ const useLastRun = <T>({
}
const isSomeVarsHasValue = (vars?: ValueSelector[]) => {
if(!vars || vars.length === 0)
if (!vars || vars.length === 0)
return true
return vars.some((varItem) => {
const [nodeId, varName] = varItem.slice(0, 2)
@ -294,7 +294,7 @@ const useLastRun = <T>({
}
const checkAggregatorVarsSet = (vars: ValueSelector[][]) => {
if(!vars || vars.length === 0)
if (!vars || vars.length === 0)
return true
// in each group, at last one set is ok
return vars.every((varItem) => {
@ -310,9 +310,9 @@ const useLastRun = <T>({
const handleSingleRun = () => {
const { isValid } = checkValid()
if(!isValid)
if (!isValid)
return
if(isCustomRunNode) {
if (isCustomRunNode) {
showSingleRun()
return
}
@ -335,6 +335,7 @@ const useLastRun = <T>({
...oneStepRunRes,
tabType,
isRunAfterSingleRun,
setIsRunAfterSingleRun,
setTabType: handleTabClicked,
handleAfterCustomSingleRun,
singleRunParams,

View File

@ -663,6 +663,7 @@ const useOneStepRun = <T>({
runInputDataRef,
setRunInputData: handleSetRunInputData,
runResult,
setRunResult: doSetRunResult,
iterationRunResult,
loopRunResult,
setNodeRunning,

View File

@ -1,7 +1,7 @@
'use client'
import type { FC } from 'react'
import React, { useCallback } from 'react'
import type { CustomRunFormProps, DataSourceNodeType } from './types'
import type { CustomRunFormProps } from './types'
import { DatasourceType } from '@/models/pipeline'
import LocalFile from '@/app/components/datasets/documents/create-from-pipeline/data-source/local-file'
import OnlineDocuments from '@/app/components/datasets/documents/create-from-pipeline/data-source/online-documents'
@ -13,18 +13,24 @@ import Button from '@/app/components/base/button'
import { useTranslation } from 'react-i18next'
import DataSourceProvider from '@/app/components/datasets/documents/create-from-pipeline/data-source/store/provider'
import PanelWrap from '../_base/components/before-run-form/panel-wrap'
import useBeforeRunForm from './hooks/use-before-run-form'
const BeforeRunForm: FC<CustomRunFormProps> = ({
nodeId,
payload,
onSuccess,
onCancel,
}) => {
const BeforeRunForm: FC<CustomRunFormProps> = (props) => {
const {
nodeId,
payload,
onCancel,
} = props
const { t } = useTranslation()
const datasourceType = payload.provider_type
const datasourceNodeData = payload as DataSourceNodeType
const dataSourceStore = useDataSourceStore()
const {
isPending,
handleRunWithSyncDraft,
datasourceType,
datasourceNodeData,
} = useBeforeRunForm(props)
const { clearOnlineDocumentData } = useOnlineDocument()
const { clearWebsiteCrawlData } = useWebsiteCrawl()
const { clearOnlineDriveData } = useOnlineDrive()
@ -44,10 +50,6 @@ const BeforeRunForm: FC<CustomRunFormProps> = ({
setCurrentCredentialId(credentialId)
}, [dataSourceStore])
const handleRun = useCallback(() => {
onSuccess()
}, [onSuccess])
return (
<PanelWrap
nodeName={payload.title}
@ -57,13 +59,14 @@ const BeforeRunForm: FC<CustomRunFormProps> = ({
{datasourceType === DatasourceType.localFile && (
<LocalFile
allowedExtensions={datasourceNodeData.fileExtensions || []}
notSupportBatchUpload={false}
notSupportBatchUpload
/>
)}
{datasourceType === DatasourceType.onlineDocument && (
<OnlineDocuments
nodeId={nodeId}
nodeData={datasourceNodeData}
isInPipeline
onCredentialChange={handleCredentialChange}
/>
)}
@ -71,6 +74,7 @@ const BeforeRunForm: FC<CustomRunFormProps> = ({
<WebsiteCrawl
nodeId={nodeId}
nodeData={datasourceNodeData}
isInPipeline
onCredentialChange={handleCredentialChange}
/>
)}
@ -78,12 +82,22 @@ const BeforeRunForm: FC<CustomRunFormProps> = ({
<OnlineDrive
nodeId={nodeId}
nodeData={datasourceNodeData}
isInPipeline
onCredentialChange={handleCredentialChange}
/>
)}
<div className='flex justify-end gap-x-2'>
<Button onClick={onCancel}>{t('common.operation.cancel')}</Button>
<Button onClick={handleRun} variant='primary'>{t('workflow.singleRun.startRun')}</Button>
<Button onClick={onCancel}>
{t('common.operation.cancel')}
</Button>
<Button
onClick={handleRunWithSyncDraft}
variant='primary'
loading={isPending}
disabled={isPending}
>
{t('workflow.singleRun.startRun')}
</Button>
</div>
</div>
</PanelWrap>

View File

@ -0,0 +1,180 @@
import { useStoreApi } from 'reactflow'
import type { CustomRunFormProps, DataSourceNodeType } from '../types'
import { useEffect, useRef } from 'react'
import { useNodeDataUpdate, useNodesSyncDraft } from '../../../hooks'
import { NodeRunningStatus } from '../../../types'
import { useInvalidLastRun } from '@/service/use-workflow'
import type { NodeRunResult } from '@/types/workflow'
import { fetchNodeInspectVars } from '@/service/workflow'
import { FlowType } from '@/types/common'
import { useDatasourceSingleRun } from '@/service/use-pipeline'
import { useDataSourceStore } from '@/app/components/datasets/documents/create-from-pipeline/data-source/store'
import { DatasourceType } from '@/models/pipeline'
import { TransferMethod } from '@/types/app'
const useBeforeRunForm = ({
nodeId,
flowId,
flowType,
payload,
setRunResult,
isPaused,
isRunAfterSingleRun,
setIsRunAfterSingleRun,
onSuccess,
appendNodeInspectVars,
}: CustomRunFormProps) => {
const store = useStoreApi()
const dataSourceStore = useDataSourceStore()
const isPausedRef = useRef(isPaused)
const { handleNodeDataUpdate } = useNodeDataUpdate()
const datasourceType = payload.provider_type as DatasourceType
const datasourceNodeData = payload as DataSourceNodeType
useEffect(() => {
isPausedRef.current = isPaused
}, [isPaused])
const runningStatus = payload._singleRunningStatus || NodeRunningStatus.NotStart
const setNodeRunning = () => {
handleNodeDataUpdate({
id: nodeId,
data: {
...payload,
_singleRunningStatus: NodeRunningStatus.Running,
},
})
}
const invalidLastRun = useInvalidLastRun(flowType, flowId, nodeId)
const updateRunResult = async (data: NodeRunResult) => {
const isPaused = isPausedRef.current
// The backend don't support pause the single run, so the frontend handle the pause state.
if (isPaused)
return
const canRunLastRun = !isRunAfterSingleRun || runningStatus === NodeRunningStatus.Succeeded
if (!canRunLastRun) {
setRunResult(data)
return
}
// run fail may also update the inspect vars when the node set the error default output.
const vars = await fetchNodeInspectVars(FlowType.ragPipeline, flowId, nodeId)
const { getNodes } = store.getState()
const nodes = getNodes()
appendNodeInspectVars(nodeId, vars, nodes)
if (data?.status === NodeRunningStatus.Succeeded)
onSuccess()
}
const { mutateAsync: handleDatasourceSingleRun, isPending } = useDatasourceSingleRun()
const handleRun = () => {
let datasourceInfo: Record<string, any> = {}
const { currentCredentialId: credentialId } = dataSourceStore.getState()
if (datasourceType === DatasourceType.localFile) {
const { localFileList } = dataSourceStore.getState()
const { id, name, type, size, extension, mime_type } = localFileList[0].file
const documentInfo = {
related_id: id,
name,
type,
size,
extension,
mime_type,
url: '',
transfer_method: TransferMethod.local_file,
}
datasourceInfo = documentInfo
}
if (datasourceType === DatasourceType.onlineDocument) {
const { onlineDocuments } = dataSourceStore.getState()
const { workspace_id, ...rest } = onlineDocuments[0]
const documentInfo = {
workspace_id,
page: rest,
credential_id: credentialId,
}
datasourceInfo = documentInfo
}
if (datasourceType === DatasourceType.websiteCrawl) {
const { websitePages } = dataSourceStore.getState()
datasourceInfo = {
...websitePages[0],
credential_id: credentialId,
}
}
if (datasourceType === DatasourceType.onlineDrive) {
const { bucket, fileList, selectedFileIds } = dataSourceStore.getState()
const file = fileList.find(file => file.id === selectedFileIds[0])
datasourceInfo = {
bucket,
id: file?.id,
type: file?.type,
credential_id: credentialId,
}
}
let hasError = false
handleDatasourceSingleRun({
pipeline_id: flowId,
start_node_id: nodeId,
start_node_title: datasourceNodeData.title,
datasource_type: datasourceType,
datasource_info: datasourceInfo,
}, {
onError: () => {
hasError = true
invalidLastRun()
if (isPausedRef.current)
return
handleNodeDataUpdate({
id: nodeId,
data: {
...payload,
_isSingleRun: false,
_singleRunningStatus: NodeRunningStatus.Failed,
},
})
},
onSettled: (data) => {
updateRunResult(data!)
if (!hasError && !isPausedRef.current) {
handleNodeDataUpdate({
id: nodeId,
data: {
...payload,
_isSingleRun: false,
_singleRunningStatus: NodeRunningStatus.Succeeded,
},
})
}
},
})
}
const { handleSyncWorkflowDraft } = useNodesSyncDraft()
const handleRunWithSyncDraft = () => {
setNodeRunning()
setIsRunAfterSingleRun(true)
handleSyncWorkflowDraft(true, true, {
onSuccess() {
handleRun()
},
})
}
return {
isPending,
handleRunWithSyncDraft,
datasourceType,
datasourceNodeData,
}
}
export default useBeforeRunForm

View File

@ -1,4 +1,7 @@
import type { CommonNodeType, ValueSelector } from '@/app/components/workflow/types'
import type { CommonNodeType, Node, ValueSelector } from '@/app/components/workflow/types'
import type { FlowType } from '@/types/common'
import type { NodeRunResult, VarInInspect } from '@/types/workflow'
import type { Dispatch, SetStateAction } from 'react'
export enum VarType {
variable = 'variable',
@ -31,7 +34,14 @@ export type DataSourceNodeType = CommonNodeType & {
export type CustomRunFormProps = {
nodeId: string
flowId: string
flowType: FlowType
payload: CommonNodeType
setRunResult: Dispatch<SetStateAction<NodeRunResult | null>>
setIsRunAfterSingleRun: Dispatch<SetStateAction<boolean>>
isPaused: boolean
isRunAfterSingleRun: boolean
onSuccess: () => void
onCancel: () => void
appendNodeInspectVars: (nodeId: string, vars: VarInInspect[], nodes: Node[]) => void
}

View File

@ -6,6 +6,7 @@ import type { AppIconSelection } from '@/app/components/base/app-icon-picker'
import type { Viewport } from 'reactflow'
import type { TransferMethod } from '@/types/app'
import { BaseFieldType } from '@/app/components/base/form/form-scenarios/base/types'
import type { NodeRunResult } from '@/types/workflow'
export enum DatasourceType {
localFile = 'local_file',
@ -287,3 +288,13 @@ export type OnlineDriveFile = {
size?: number
type: OnlineDriveFileType
}
export type DatasourceNodeSingleRunRequest = {
pipeline_id: string
start_node_id: string
start_node_title: string
datasource_type: DatasourceType
datasource_info: Record<string, any>
}
export type DatasourceNodeSingleRunResponse = NodeRunResult

View File

@ -4,6 +4,8 @@ import { del, get, patch, post } from './base'
import { DatasourceType } from '@/models/pipeline'
import type {
ConversionResponse,
DatasourceNodeSingleRunRequest,
DatasourceNodeSingleRunResponse,
DeleteTemplateResponse,
ExportTemplateDSLResponse,
ImportPipelineDSLConfirmResponse,
@ -367,3 +369,18 @@ export const useConvertDatasetToPipeline = () => {
},
})
}
export const useDatasourceSingleRun = (
mutationOptions: MutationOptions<DatasourceNodeSingleRunResponse, Error, DatasourceNodeSingleRunRequest> = {},
) => {
return useMutation({
mutationKey: [NAME_SPACE, 'datasource-node-single-run'],
mutationFn: (params: DatasourceNodeSingleRunRequest) => {
const { pipeline_id: pipelineId, ...rest } = params
return post<DatasourceNodeSingleRunResponse>(`/rag/pipelines/${pipelineId}/workflows/draft/datasource/variables-inspect`, {
body: rest,
})
},
...mutationOptions,
})
}