diff --git a/web/app/components/rag-pipeline/components/panel/test-run/data-source/online-documents/online-document-selector.tsx b/web/app/components/rag-pipeline/components/panel/test-run/data-source/online-documents/online-document-selector.tsx index 5ad26034b1..d89d214fb9 100644 --- a/web/app/components/rag-pipeline/components/panel/test-run/data-source/online-documents/online-document-selector.tsx +++ b/web/app/components/rag-pipeline/components/panel/test-run/data-source/online-documents/online-document-selector.tsx @@ -1,12 +1,14 @@ -import { useCallback, useEffect, useMemo, useRef, useState } from 'react' +import { useCallback, useEffect, useMemo, useState } from 'react' import WorkspaceSelector from '@/app/components/base/notion-page-selector/workspace-selector' import SearchInput from '@/app/components/base/notion-page-selector/search-input' import PageSelector from '@/app/components/base/notion-page-selector/page-selector' import type { DataSourceNotionPageMap, DataSourceNotionWorkspace, NotionPage } from '@/models/common' import Header from '@/app/components/datasets/create/website/base/header' import { useDatasetDetailContextWithSelector } from '@/context/dataset-detail' -import { useDraftDatasourceNodeRun, usePublishedDatasourceNodeRun } from '@/service/use-pipeline' import { DatasourceType } from '@/models/pipeline' +import { ssePost } from '@/service/base' +import Toast from '@/app/components/base/toast' +import type { DataSourceNodeCompletedResponse } from '@/types/pipeline' type OnlineDocumentSelectorProps = { value?: string[] @@ -33,28 +35,37 @@ const OnlineDocumentSelector = ({ nodeId, headerInfo, }: OnlineDocumentSelectorProps) => { - const pipeline_id = useDatasetDetailContextWithSelector(s => s.dataset?.pipeline_id) + const pipelineId = useDatasetDetailContextWithSelector(s => s.dataset?.pipeline_id) const [documentsData, setDocumentsData] = useState([]) const [searchValue, setSearchValue] = useState('') const [currentWorkspaceId, setCurrentWorkspaceId] = useState('') - const useDatasourceNodeRun = useRef(!isInPipeline ? usePublishedDatasourceNodeRun : useDraftDatasourceNodeRun) - const { mutateAsync: crawlOnlineDocuments } = useDatasourceNodeRun.current() + const datasourceNodeRunURL = !isInPipeline + ? `/rag/pipelines/${pipelineId}/workflows/published/datasource/nodes/${nodeId}/run` + : `/rag/pipelines/${pipelineId}/workflows/draft/datasource/nodes/${nodeId}/run` const getOnlineDocuments = useCallback(async () => { - if (pipeline_id) { - await crawlOnlineDocuments({ - pipeline_id, - node_id: nodeId, - inputs: {}, - datasource_type: DatasourceType.onlineDocument, - }, { - onSuccess(documentsData) { - setDocumentsData(documentsData.result as DataSourceNotionWorkspace[]) + ssePost( + datasourceNodeRunURL, + { + body: { + inputs: {}, + datasource_type: DatasourceType.onlineDocument, }, - }) - } - }, [crawlOnlineDocuments, nodeId, pipeline_id]) + }, + { + onDataSourceNodeCompleted: (documentsData: DataSourceNodeCompletedResponse) => { + setDocumentsData(documentsData.data as DataSourceNotionWorkspace[]) + }, + onError: (message: string) => { + Toast.notify({ + type: 'error', + message, + }) + }, + }, + ) + }, [datasourceNodeRunURL]) useEffect(() => { getOnlineDocuments() diff --git a/web/app/components/rag-pipeline/components/panel/test-run/data-source/website-crawl/base/crawler.tsx b/web/app/components/rag-pipeline/components/panel/test-run/data-source/website-crawl/base/crawler.tsx index f85208dbc5..c65f13c78b 100644 --- a/web/app/components/rag-pipeline/components/panel/test-run/data-source/website-crawl/base/crawler.tsx +++ b/web/app/components/rag-pipeline/components/panel/test-run/data-source/website-crawl/base/crawler.tsx @@ -8,16 +8,16 @@ import Crawling from './crawling' import ErrorMessage from './error-message' import CrawledResult from './crawled-result' import { - useDraftDatasourceNodeRun, - useDraftDatasourceNodeRunStatus, useDraftPipelinePreProcessingParams, - usePublishedDatasourceNodeRun, - usePublishedDatasourceNodeRunStatus, usePublishedPipelinePreProcessingParams, } from '@/service/use-pipeline' import { useDatasetDetailContextWithSelector } from '@/context/dataset-detail' import { DatasourceType } from '@/models/pipeline' -import { sleep } from '@/utils' +import { ssePost } from '@/service/base' +import type { + DataSourceNodeCompletedResponse, + DataSourceNodeProcessingResponse, +} from '@/types/pipeline' const I18N_PREFIX = 'datasetCreation.stepOne.website' @@ -51,6 +51,8 @@ const Crawler = ({ const { t } = useTranslation() const [step, setStep] = useState(Step.init) const [controlFoldOptions, setControlFoldOptions] = useState(0) + const [totalNum, setTotalNum] = useState(0) + const [crawledNum, setCrawledNum] = useState(0) const pipelineId = useDatasetDetailContextWithSelector(s => s.dataset?.pipeline_id) const usePreProcessingParams = useRef(!isInPipeline ? usePublishedPipelinePreProcessingParams : useDraftPipelinePreProcessingParams) @@ -68,66 +70,49 @@ const Crawler = ({ const isCrawlFinished = step === Step.finished const isRunning = step === Step.running const [crawlResult, setCrawlResult] = useState<{ - result: CrawlResultItem[] + data: CrawlResultItem[] time_consuming: number | string } | undefined>(undefined) const [crawlErrorMessage, setCrawlErrorMessage] = useState('') const showError = isCrawlFinished && crawlErrorMessage - const useDatasourceNodeRun = useRef(!isInPipeline ? usePublishedDatasourceNodeRun : useDraftDatasourceNodeRun) - const useDatasourceNodeRunStatus = useRef(!isInPipeline ? usePublishedDatasourceNodeRunStatus : useDraftDatasourceNodeRunStatus) - const { mutateAsync: runDatasourceNode } = useDatasourceNodeRun.current() - const { mutateAsync: getDatasourceNodeRunStatus } = useDatasourceNodeRunStatus.current() - - const checkCrawlStatus = useCallback(async (jobId: string) => { - const res = await getDatasourceNodeRunStatus({ - node_id: nodeId, - pipeline_id: pipelineId!, - job_id: jobId, - datasource_type: DatasourceType.websiteCrawl, - }, { - onError: async (error: any) => { - const message = await error.json() - setCrawlErrorMessage(message || t(`${I18N_PREFIX}.unknownError`)) - }, - }) as any - if (res.status === 'completed') { - setCrawlResult(res) - onCheckedCrawlResultChange(res.result || []) // default select the crawl result - setCrawlErrorMessage('') - setStep(Step.finished) - } - else if (res.status === 'processing') { - await sleep(2500) - await checkCrawlStatus(jobId) - } - }, [getDatasourceNodeRunStatus, nodeId, pipelineId, t, onCheckedCrawlResultChange]) + const datasourceNodeRunURL = !isInPipeline + ? `/rag/pipelines/${pipelineId}/workflows/published/datasource/nodes/${nodeId}/run` + : `/rag/pipelines/${pipelineId}/workflows/draft/datasource/nodes/${nodeId}/run` const handleRun = useCallback(async (value: Record) => { setStep(Step.running) - const res = await runDatasourceNode({ - node_id: nodeId, - pipeline_id: pipelineId!, - inputs: value, - datasource_type: DatasourceType.websiteCrawl, - }, { - onError: async (error: any) => { - const message = await error.json() - setCrawlErrorMessage(message || t(`${I18N_PREFIX}.unknownError`)) - setStep(Step.finished) + ssePost( + datasourceNodeRunURL, + { + body: { + inputs: value, + datasource_type: DatasourceType.websiteCrawl, + response_mode: 'streaming', + }, }, - }) as any - const jobId = res.job_id - if (!jobId && res.status === 'completed') { - setCrawlResult(res) - onCheckedCrawlResultChange(res.result || []) // default select the crawl result - setStep(Step.finished) - } - else if (jobId) { - await checkCrawlStatus(jobId) - } - setCrawlErrorMessage('') - }, [runDatasourceNode, nodeId, pipelineId, onCheckedCrawlResultChange, checkCrawlStatus, t]) + { + onDataSourceNodeProcessing: (data: DataSourceNodeProcessingResponse) => { + setTotalNum(data.total ?? 0) + setCrawledNum(data.completed ?? 0) + }, + onDataSourceNodeCompleted: (data: DataSourceNodeCompletedResponse) => { + const { data: crawlData, time_consuming } = data + setCrawlResult({ + data: crawlData as CrawlResultItem[], + time_consuming: time_consuming ?? 0, + }) + onCheckedCrawlResultChange(crawlData || []) // default select the crawl result + setCrawlErrorMessage('') + setStep(Step.finished) + }, + onError: (message: string) => { + setCrawlErrorMessage(message || t(`${I18N_PREFIX}.unknownError`)) + setStep(Step.finished) + }, + }, + ) + }, [datasourceNodeRunURL, onCheckedCrawlResultChange, t]) const handleSubmit = useCallback((value: Record) => { handleRun(value) @@ -152,8 +137,8 @@ const Crawler = ({
{isRunning && ( )} {showError && ( @@ -166,7 +151,7 @@ const Crawler = ({ {isCrawlFinished && !showError && ( - datasource_type: DatasourceType -} - -export type PipelineDatasourceNodeRunResponse = { - job_id?: string - status: 'processing' | 'completed' - result: any - provider_type: DatasourceType -} - -export type PipelineDatasourceNodeRunStatusRequest = { - pipeline_id: string - node_id: string - job_id: string - datasource_type: DatasourceType -} - -export type PipelineDatasourceNodeRunStatusResponse = { - provider_type: DatasourceType - result: Record - status: 'processing' | 'completed' - job_id: string -} - export type PublishedPipelineInfoResponse = { id: string graph: { diff --git a/web/service/base.ts b/web/service/base.ts index ba398c07a6..80ce3b801d 100644 --- a/web/service/base.ts +++ b/web/service/base.ts @@ -25,6 +25,10 @@ import { removeAccessToken } from '@/app/components/share/utils' import type { FetchOptionType, ResponseError } from './fetch' import { ContentType, base, baseOptions, getAccessToken } from './fetch' import { asyncRunSafe } from '@/utils' +import type { + DataSourceNodeCompletedResponse, + DataSourceNodeProcessingResponse, +} from '@/types/pipeline' const TIME_OUT = 100000 export type IOnDataMoreInfo = { @@ -63,6 +67,9 @@ export type IOnLoopNext = (workflowStarted: LoopNextResponse) => void export type IOnLoopFinished = (workflowFinished: LoopFinishedResponse) => void export type IOnAgentLog = (agentLog: AgentLogResponse) => void +export type IOnDataSourceNodeProcessing = (dataSourceNodeProcessing: DataSourceNodeProcessingResponse) => void +export type IOnDataSourceNodeCompleted = (dataSourceNodeCompleted: DataSourceNodeCompletedResponse) => void + export type IOtherOptions = { isPublicAPI?: boolean isMarketplaceAPI?: boolean @@ -97,6 +104,10 @@ export type IOtherOptions = { onLoopNext?: IOnLoopNext onLoopFinish?: IOnLoopFinished onAgentLog?: IOnAgentLog + + // Pipeline data source node run + onDataSourceNodeProcessing?: IOnDataSourceNodeProcessing + onDataSourceNodeCompleted?: IOnDataSourceNodeCompleted } function unicodeToChar(text: string) { @@ -152,6 +163,8 @@ const handleStream = ( onTTSEnd?: IOnTTSEnd, onTextReplace?: IOnTextReplace, onAgentLog?: IOnAgentLog, + onDataSourceNodeProcessing?: IOnDataSourceNodeProcessing, + onDataSourceNodeCompleted?: IOnDataSourceNodeCompleted, ) => { if (!response.ok) throw new Error('Network response was not ok') @@ -270,6 +283,15 @@ const handleStream = ( else if (bufferObj.event === 'tts_message_end') { onTTSEnd?.(bufferObj.message_id, bufferObj.audio) } + else if (bufferObj.event === 'datasource_processing') { + onDataSourceNodeProcessing?.(bufferObj as DataSourceNodeProcessingResponse) + } + else if (bufferObj.event === 'datasource_completed') { + onDataSourceNodeCompleted?.(bufferObj as DataSourceNodeCompletedResponse) + } + else { + console.warn(`Unknown event: ${bufferObj.event}`, bufferObj) + } } }) buffer = lines[lines.length - 1] @@ -363,6 +385,8 @@ export const ssePost = async ( onLoopStart, onLoopNext, onLoopFinish, + onDataSourceNodeProcessing, + onDataSourceNodeCompleted, } = otherOptions const abortController = new AbortController() @@ -460,6 +484,8 @@ export const ssePost = async ( onTTSEnd, onTextReplace, onAgentLog, + onDataSourceNodeProcessing, + onDataSourceNodeCompleted, ) }).catch((e) => { if (e.toString() !== 'AbortError: The user aborted a request.' && !e.toString().errorMessage.includes('TypeError: Cannot assign to read only property')) diff --git a/web/service/use-pipeline.ts b/web/service/use-pipeline.ts index 10b76d1a54..f8c047000c 100644 --- a/web/service/use-pipeline.ts +++ b/web/service/use-pipeline.ts @@ -8,10 +8,6 @@ import type { ImportPipelineDSLRequest, ImportPipelineDSLResponse, PipelineCheckDependenciesResponse, - PipelineDatasourceNodeRunRequest, - PipelineDatasourceNodeRunResponse, - PipelineDatasourceNodeRunStatusRequest, - PipelineDatasourceNodeRunStatusResponse, PipelinePreProcessingParamsRequest, PipelinePreProcessingParamsResponse, PipelineProcessingParamsRequest, @@ -133,66 +129,6 @@ export const useCheckPipelineDependencies = ( }) } -export const useDraftDatasourceNodeRun = ( - mutationOptions: MutationOptions = {}, -) => { - return useMutation({ - mutationKey: [NAME_SPACE, 'draft-datasource-node-run'], - mutationFn: (request: PipelineDatasourceNodeRunRequest) => { - const { pipeline_id, node_id, ...rest } = request - return post(`/rag/pipelines/${pipeline_id}/workflows/draft/datasource/nodes/${node_id}/run`, { - body: rest, - }) - }, - ...mutationOptions, - }) -} - -export const usePublishedDatasourceNodeRun = ( - mutationOptions: MutationOptions = {}, -) => { - return useMutation({ - mutationKey: [NAME_SPACE, 'published-datasource-node-run'], - mutationFn: (request: PipelineDatasourceNodeRunRequest) => { - const { pipeline_id, node_id, ...rest } = request - return post(`/rag/pipelines/${pipeline_id}/workflows/published/datasource/nodes/${node_id}/run`, { - body: rest, - }) - }, - ...mutationOptions, - }) -} - -export const useDraftDatasourceNodeRunStatus = ( - mutationOptions: MutationOptions = {}, -) => { - return useMutation({ - mutationKey: [NAME_SPACE, 'draft-datasource-node-run-status'], - mutationFn: (request: PipelineDatasourceNodeRunStatusRequest) => { - const { pipeline_id, node_id, ...rest } = request - return post(`/rag/pipelines/${pipeline_id}/workflows/draft/datasource/nodes/${node_id}/run`, { - body: rest, - }) - }, - ...mutationOptions, - }) -} - -export const usePublishedDatasourceNodeRunStatus = ( - mutationOptions: MutationOptions = {}, -) => { - return useMutation({ - mutationKey: [NAME_SPACE, 'published-datasource-node-run-status'], - mutationFn: (request: PipelineDatasourceNodeRunStatusRequest) => { - const { pipeline_id, node_id, ...rest } = request - return post(`/rag/pipelines/${pipeline_id}/workflows/published/datasource/nodes/${node_id}/run`, { - body: rest, - }) - }, - ...mutationOptions, - }) -} - export const useDraftPipelineProcessingParams = (params: PipelineProcessingParamsRequest, enabled = true) => { const { pipeline_id, node_id } = params return useQuery({ diff --git a/web/types/pipeline.tsx b/web/types/pipeline.tsx new file mode 100644 index 0000000000..bd26ea0bb1 --- /dev/null +++ b/web/types/pipeline.tsx @@ -0,0 +1,17 @@ +export type DataSourceNodeProcessingResponse = { + event: 'datasource_processing' + total: number + completed: number +} + +export type DataSourceNodeError = { + event: 'datasource_error' + message: string + code?: string +} + +export type DataSourceNodeCompletedResponse = { + event: 'datasource_completed' + data: any + time_consuming?: number +}