feat: enhance pipeline settings with execution log and processing capabilities

twwu 2025-06-19 15:57:49 +08:00
parent 55516c4e57
commit 335e1e3602
7 changed files with 193 additions and 55 deletions

View File

@@ -147,9 +147,6 @@ const ChunkPreview = ({
{!isPending && currentDocForm === ChunkingMode.parentChild && estimateData?.preview && (
estimateData?.preview?.map((item, index) => {
const indexForLabel = index + 1
// const childChunks = parentChildConfig.chunkForContext === 'full-doc'
// ? item.child_chunks.slice(0, FULL_DOC_PREVIEW_LENGTH)
// : item.child_chunks
return (
<ChunkContainer
key={item.content}

View File

@@ -1,15 +1,19 @@
import { useCallback, useRef, useState } from 'react'
import type { CrawlResultItem, DocumentItem, FileIndexingEstimateResponse } from '@/models/datasets'
import { useCallback, useMemo, useRef, useState } from 'react'
import type { CrawlResultItem, CustomFile, FileIndexingEstimateResponse } from '@/models/datasets'
import type { NotionPage } from '@/models/common'
import { useTranslation } from 'react-i18next'
import { useDatasetDetailContextWithSelector } from '@/context/dataset-detail'
import { useDocumentDetail } from '@/service/knowledge/use-document'
import AppUnavailable from '@/app/components/base/app-unavailable'
import ChunkPreview from '../../../create-from-pipeline/preview/chunk-preview'
import Loading from '@/app/components/base/loading'
import type { DatasourceType } from '@/models/pipeline'
import ProcessDocuments from './process-documents'
import LeftHeader from './left-header'
import { usePipelineExecutionLog, useRunPublishedPipeline } from '@/service/use-pipeline'
import type { PublishedPipelineRunPreviewResponse } from '@/models/pipeline'
import { DatasourceType } from '@/models/pipeline'
import { noop } from 'lodash-es'
import { useDatasetDetailContextWithSelector } from '@/context/dataset-detail'
import { useRouter } from 'next/navigation'
import { useInvalidDocumentList } from '@/service/knowledge/use-document'
type PipelineSettingsProps = {
datasetId: string
@@ -21,25 +25,100 @@ const PipelineSettings = ({
documentId,
}: PipelineSettingsProps) => {
const { t } = useTranslation()
const pipelineId = useDatasetDetailContextWithSelector(s => s.dataset?.pipeline_id)
const { push } = useRouter()
const [estimateData, setEstimateData] = useState<FileIndexingEstimateResponse | undefined>(undefined)
const pipelineId = useDatasetDetailContextWithSelector(state => state.dataset?.pipeline_id)
const isPreview = useRef(false)
const formRef = useRef<any>(null)
const { data: documentDetail, error, isFetching: isFetchingDocumentDetail } = useDocumentDetail({
datasetId,
documentId,
params: { metadata: 'without' },
const { data: lastRunData, isFetching: isFetchingLastRunData, isError } = usePipelineExecutionLog({
dataset_id: datasetId,
document_id: documentId,
})
const handlePreviewChunks = useCallback(async (data: Record<string, any>) => {
// todo: Preview
}, [])
const files = useMemo(() => {
const files: CustomFile[] = []
if (lastRunData?.datasource_type === DatasourceType.localFile) {
const { related_id, name, extension } = lastRunData.datasource_info
files.push({
id: related_id,
name,
extension,
} as CustomFile)
}
return files
}, [lastRunData])
const websitePages = useMemo(() => {
const websitePages: CrawlResultItem[] = []
if (lastRunData?.datasource_type === DatasourceType.websiteCrawl) {
const { content, description, source_url, title } = lastRunData.datasource_info
websitePages.push({
markdown: content,
description,
source_url,
title,
})
}
return websitePages
}, [lastRunData])
const onlineDocuments = useMemo(() => {
const onlineDocuments: NotionPage[] = []
if (lastRunData?.datasource_type === DatasourceType.onlineDocument) {
const { workspace_id, page } = lastRunData.datasource_info
onlineDocuments.push({
workspace_id,
...page,
})
}
return onlineDocuments
}, [lastRunData])
const { mutateAsync: runPublishedPipeline, isIdle, isPending } = useRunPublishedPipeline()
const handlePreviewChunks = useCallback(async (data: Record<string, any>) => {
if (!lastRunData)
return
const datasourceInfoList: Record<string, any>[] = []
const documentInfo = lastRunData.datasource_info
datasourceInfoList.push(documentInfo)
await runPublishedPipeline({
pipeline_id: pipelineId!,
inputs: data,
start_node_id: lastRunData.datasource_node_id,
datasource_type: lastRunData.datasource_type,
datasource_info_list: datasourceInfoList,
is_preview: true,
}, {
onSuccess: (res) => {
setEstimateData((res as PublishedPipelineRunPreviewResponse).data.outputs)
},
})
}, [lastRunData, pipelineId, runPublishedPipeline])
const invalidDocumentList = useInvalidDocumentList(datasetId)
const handleProcess = useCallback(async (data: Record<string, any>) => {
// todo: Process
}, [])
if (!lastRunData)
return
const datasourceInfoList: Record<string, any>[] = []
const documentInfo = lastRunData.datasource_info
datasourceInfoList.push(documentInfo)
await runPublishedPipeline({
pipeline_id: pipelineId!,
inputs: data,
start_node_id: lastRunData.datasource_node_id,
datasource_type: lastRunData.datasource_type,
datasource_info_list: datasourceInfoList,
is_preview: false,
}, {
onSuccess: () => {
invalidDocumentList()
push(`/datasets/${datasetId}/documents/${documentId}`)
},
})
}, [datasetId, documentId, invalidDocumentList, lastRunData, pipelineId, push, runPublishedPipeline])
const onClickProcess = useCallback(() => {
isPreview.current = false
@@ -55,25 +134,13 @@ const PipelineSettings = ({
isPreview.current ? handlePreviewChunks(data) : handleProcess(data)
}, [handlePreviewChunks, handleProcess])
const handlePreviewFileChange = useCallback((file: DocumentItem) => {
onClickPreview()
}, [onClickPreview])
const handlePreviewOnlineDocumentChange = useCallback((page: NotionPage) => {
onClickPreview()
}, [onClickPreview])
const handlePreviewWebsiteChange = useCallback((website: CrawlResultItem) => {
onClickPreview()
}, [onClickPreview])
if (isFetchingDocumentDetail) {
if (isFetchingLastRunData) {
return (
<Loading type='app' />
)
}
if (error)
if (isError)
return <AppUnavailable code={500} unknownReason={t('datasetCreation.error.unavailable') as string} />
return (
@@ -85,7 +152,8 @@ const PipelineSettings = ({
<div className='grow overflow-y-auto'>
<ProcessDocuments
ref={formRef}
documentId={documentId}
lastRunInputData={lastRunData!.input_data}
datasourceNodeId={lastRunData!.datasource_node_id}
onProcess={onClickProcess}
onPreview={onClickPreview}
onSubmit={handleSubmit}
@@ -95,22 +163,17 @@ const PipelineSettings = ({
{/* Preview */}
<div className='flex h-full flex-1 pl-2 pt-2'>
<ChunkPreview
dataSourceType={documentDetail!.data_source_type as DatasourceType}
// @ts-expect-error mock data // todo: remove mock data
files={[{
id: '12345678',
name: 'test-file',
extension: 'txt',
}]}
onlineDocuments={[]}
websitePages={[]}
isIdle={true}
isPending={true}
dataSourceType={lastRunData!.datasource_type}
files={files}
onlineDocuments={onlineDocuments}
websitePages={websitePages}
isIdle={isIdle}
isPending={isPending && isPreview.current}
estimateData={estimateData}
onPreview={onClickPreview}
handlePreviewFileChange={handlePreviewFileChange}
handlePreviewOnlineDocumentChange={handlePreviewOnlineDocumentChange}
handlePreviewWebsitePageChange={handlePreviewWebsiteChange}
handlePreviewFileChange={noop}
handlePreviewOnlineDocumentChange={noop}
handlePreviewWebsitePageChange={noop}
/>
</div>
</div>
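Side note on the two handlers above: `handlePreviewChunks` and `handleProcess` are identical except for the `is_preview` flag and the success callback. A hypothetical shared helper (not part of this commit) makes the symmetry explicit:

const runWithLastDatasource = async (inputs: Record<string, any>, preview: boolean) => {
  if (!lastRunData)
    return
  await runPublishedPipeline({
    pipeline_id: pipelineId!,
    inputs,
    start_node_id: lastRunData.datasource_node_id,
    datasource_type: lastRunData.datasource_type,
    datasource_info_list: [lastRunData.datasource_info],
    is_preview: preview,
  }, {
    onSuccess: (res) => {
      if (preview) {
        setEstimateData((res as PublishedPipelineRunPreviewResponse).data.outputs)
      }
      else {
        invalidDocumentList()
        push(`/datasets/${datasetId}/documents/${documentId}`)
      }
    },
  })
}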

View File

@@ -3,11 +3,13 @@ import Button from '@/app/components/base/button'
import { useTranslation } from 'react-i18next'
type ActionsProps = {
runDisabled?: boolean
onProcess: () => void
}
const Actions = ({
onProcess,
runDisabled,
}: ActionsProps) => {
const { t } = useTranslation()
@@ -16,6 +18,7 @@ const Actions = ({
<Button
variant='primary'
onClick={onProcess}
disabled={runDisabled}
>
{t('datasetPipeline.operations.saveAndProcess')}
</Button>

View File

@@ -1,10 +1,58 @@
import type { BaseConfiguration } from '@/app/components/base/form/form-scenarios/base/types'
import { useMemo } from 'react'
import { useDatasetDetailContextWithSelector } from '@/context/dataset-detail'
import { usePublishedPipelineProcessingParams } from '@/service/use-pipeline'
import { VAR_TYPE_MAP } from '@/models/pipeline'
import { BaseFieldType } from '@/app/components/base/form/form-scenarios/base/types'
export const useConfigurations = (documentId: string) => {
const initialData: Record<string, any> = {}
const configurations: BaseConfiguration[] = []
export const useConfigurations = (lastRunInputData: Record<string, any>, datasourceNodeId: string) => {
const pipelineId = useDatasetDetailContextWithSelector(state => state.dataset?.pipeline_id)
const { data: paramsConfig, isFetching: isFetchingParams } = usePublishedPipelineProcessingParams({
pipeline_id: pipelineId!,
node_id: datasourceNodeId,
})
const initialData = useMemo(() => {
const variables = paramsConfig?.variables || []
return variables.reduce((acc, item) => {
const type = VAR_TYPE_MAP[item.type]
const variableName = item.variable
if ([BaseFieldType.textInput, BaseFieldType.paragraph, BaseFieldType.select].includes(type))
acc[item.variable] = lastRunInputData[variableName] ?? ''
if (type === BaseFieldType.numberInput)
acc[item.variable] = lastRunInputData[variableName] ?? 0
if (type === BaseFieldType.checkbox)
acc[item.variable] = lastRunInputData[variableName]
if ([BaseFieldType.file, BaseFieldType.fileList].includes(type))
acc[item.variable] = lastRunInputData[variableName]
return acc
}, {} as Record<string, any>)
}, [lastRunInputData, paramsConfig?.variables])
const configurations = useMemo(() => {
const variables = paramsConfig?.variables || []
const configs = variables.map(item => ({
type: VAR_TYPE_MAP[item.type],
variable: item.variable,
label: item.label,
required: item.required,
maxLength: item.max_length,
options: item.options?.map(option => ({
label: option,
value: option,
})),
showConditions: [],
placeholder: item.placeholder,
tooltip: item.tooltips,
unit: item.unit,
allowedFileTypes: item.allowed_file_types,
allowedFileExtensions: item.allowed_file_extensions,
allowedFileUploadMethods: item.allowed_file_upload_methods,
}))
return configs
}, [paramsConfig])
return {
isFetchingParams,
initialData,
configurations,
}
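The fallback logic in `initialData` is the heart of this hook: text-like fields default to an empty string, numeric fields to 0, and checkbox/file fields pass through as-is (possibly undefined). A standalone restatement of the same reduction, with hypothetical field kinds rather than the codebase's `BaseFieldType` values:

type SeedVar = { variable: string; kind: 'text' | 'number' | 'other' }

const seedInitialData = (vars: SeedVar[], lastRun: Record<string, any>) =>
  vars.reduce((acc, v) => {
    if (v.kind === 'text')
      acc[v.variable] = lastRun[v.variable] ?? '' // string fallback
    else if (v.kind === 'number')
      acc[v.variable] = lastRun[v.variable] ?? 0 // numeric fallback
    else
      acc[v.variable] = lastRun[v.variable] // checkbox/file: pass through
    return acc
  }, {} as Record<string, any>)

// seedInitialData(
//   [{ variable: 'separator', kind: 'text' }, { variable: 'max_tokens', kind: 'number' }],
//   { separator: '\n\n' },
// ) => { separator: '\n\n', max_tokens: 0 }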

View File

@@ -4,7 +4,8 @@ import Actions from './actions'
import Form from '../../../../create-from-pipeline/process-documents/form'
type ProcessDocumentsProps = {
documentId: string
datasourceNodeId: string
lastRunInputData: Record<string, any>
ref: React.RefObject<any>
onProcess: () => void
onPreview: () => void
@@ -12,13 +13,14 @@ type ProcessDocumentsProps = {
}
const ProcessDocuments = ({
documentId,
datasourceNodeId,
lastRunInputData,
onProcess,
onPreview,
onSubmit,
ref,
}: ProcessDocumentsProps) => {
const { initialData, configurations } = useConfigurations(documentId)
const { isFetchingParams, initialData, configurations } = useConfigurations(lastRunInputData, datasourceNodeId)
const schema = generateZodSchema(configurations)
return (
@@ -31,7 +33,7 @@ const ProcessDocuments = ({
onSubmit={onSubmit}
onPreview={onPreview}
/>
<Actions onProcess={onProcess} />
<Actions runDisabled={isFetchingParams} onProcess={onProcess} />
</div>
)
}

View File

@@ -242,3 +242,15 @@ export type InitialDocumentDetail = {
name: string
position: number
}
export type PipelineExecutionLogRequest = {
dataset_id: string
document_id: string
}
export type PipelineExecutionLogResponse = {
datasource_info: Record<string, any>
datasource_type: DatasourceType
input_data: Record<string, any>
datasource_node_id: string
}
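`datasource_info` is typed loosely because its shape depends on `datasource_type`. Judging from how the settings component destructures it above, the variants look roughly like this (illustrative values only; field names are taken from the destructuring in pipeline-settings, values are invented):

const localFileLog: PipelineExecutionLogResponse = {
  datasource_type: DatasourceType.localFile,
  datasource_info: { related_id: 'file-1', name: 'report', extension: 'pdf' },
  input_data: { separator: '\n\n' },
  datasource_node_id: 'datasource-node',
}

const websiteCrawlLog: PipelineExecutionLogResponse = {
  datasource_type: DatasourceType.websiteCrawl,
  datasource_info: {
    content: '# Page markdown',
    description: 'Example page',
    source_url: 'https://example.com',
    title: 'Example',
  },
  input_data: {},
  datasource_node_id: 'datasource-node',
}

// For DatasourceType.onlineDocument, datasource_info carries
// { workspace_id, page }, where page spreads into a NotionPage.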

View File

@@ -8,6 +8,8 @@ import type {
ImportPipelineDSLRequest,
ImportPipelineDSLResponse,
PipelineCheckDependenciesResponse,
PipelineExecutionLogRequest,
PipelineExecutionLogResponse,
PipelinePreProcessingParamsRequest,
PipelinePreProcessingParamsResponse,
PipelineProcessingParamsRequest,
@@ -311,3 +313,14 @@ export const usePublishAsCustomizedPipeline = () => {
},
})
}
export const usePipelineExecutionLog = (params: PipelineExecutionLogRequest) => {
const { dataset_id, document_id } = params
return useQuery<PipelineExecutionLogResponse>({
queryKey: [NAME_SPACE, 'pipeline-execution-log', dataset_id, document_id],
queryFn: () => {
return get<PipelineExecutionLogResponse>(`/datasets/${dataset_id}/documents/${document_id}/pipeline-execution-log`)
},
staleTime: 0,
})
}
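`staleTime: 0` makes React Query treat the cached log as immediately stale, so reopening the settings page always refetches the latest run. If a mutation ever needs to force a refresh explicitly, the usual pattern would be a small invalidation hook over the same query key (a sketch, not part of this commit):

import { useQueryClient } from '@tanstack/react-query'

export const useInvalidPipelineExecutionLog = () => {
  const queryClient = useQueryClient()
  return (dataset_id: string, document_id: string) =>
    queryClient.invalidateQueries({
      // Same key layout as usePipelineExecutionLog above
      queryKey: [NAME_SPACE, 'pipeline-execution-log', dataset_id, document_id],
    })
}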