From 224111081bb2217162418162b1940715023eaa74 Mon Sep 17 00:00:00 2001 From: Dongyu Li <544104925@qq.com> Date: Wed, 18 Jun 2025 16:04:40 +0800 Subject: [PATCH] feat(datasource): change datasource result type to event-stream --- .../rag_pipeline/rag_pipeline_workflow.py | 42 +++++++++---------- .../app/apps/pipeline/pipeline_generator.py | 6 +-- api/core/rag/entities/event.py | 6 +-- api/services/rag_pipeline/rag_pipeline.py | 33 ++++++++------- 4 files changed, 45 insertions(+), 42 deletions(-) diff --git a/api/controllers/console/datasets/rag_pipeline/rag_pipeline_workflow.py b/api/controllers/console/datasets/rag_pipeline/rag_pipeline_workflow.py index 28ab4b1635..b040e70b92 100644 --- a/api/controllers/console/datasets/rag_pipeline/rag_pipeline_workflow.py +++ b/api/controllers/console/datasets/rag_pipeline/rag_pipeline_workflow.py @@ -414,16 +414,19 @@ class RagPipelinePublishedDatasourceNodeRunApi(Resource): raise ValueError("missing datasource_type") rag_pipeline_service = RagPipelineService() - result = rag_pipeline_service.run_datasource_workflow_node( - pipeline=pipeline, - node_id=node_id, - user_inputs=inputs, - account=current_user, - datasource_type=datasource_type, - is_published=True, + return helper.compact_generate_response( + PipelineGenerator.convert_to_event_stream( + rag_pipeline_service.run_datasource_workflow_node( + pipeline=pipeline, + node_id=node_id, + user_inputs=inputs, + account=current_user, + datasource_type=datasource_type, + is_published=True, + ) + ) ) - return result class RagPipelineDraftDatasourceNodeRunApi(Resource): @@ -455,21 +458,18 @@ class RagPipelineDraftDatasourceNodeRunApi(Resource): raise ValueError("missing datasource_type") rag_pipeline_service = RagPipelineService() - try: - return helper.compact_generate_response( - PipelineGenerator.convert_to_event_stream( - rag_pipeline_service.run_datasource_workflow_node( - pipeline=pipeline, - node_id=node_id, - user_inputs=inputs, - account=current_user, -
datasource_type=datasource_type, - is_published=False, - ) + return helper.compact_generate_response( + PipelineGenerator.convert_to_event_stream( + rag_pipeline_service.run_datasource_workflow_node( + pipeline=pipeline, + node_id=node_id, + user_inputs=inputs, + account=current_user, + datasource_type=datasource_type, + is_published=False, ) ) - except Exception as e: - print(e) + ) class RagPipelinePublishedNodeRunApi(Resource): diff --git a/api/core/app/apps/pipeline/pipeline_generator.py b/api/core/app/apps/pipeline/pipeline_generator.py index e726ad4841..f0921c3442 100644 --- a/api/core/app/apps/pipeline/pipeline_generator.py +++ b/api/core/app/apps/pipeline/pipeline_generator.py @@ -2,14 +2,14 @@ import contextvars import datetime import json import logging -import random +import secrets import threading import time import uuid from collections.abc import Generator, Mapping from typing import Any, Literal, Optional, Union, overload -from flask import Flask, copy_current_request_context, current_app, has_request_context +from flask import Flask, current_app from pydantic import ValidationError from sqlalchemy.orm import sessionmaker @@ -110,7 +110,7 @@ class PipelineGenerator(BaseAppGenerator): start_node_id: str = args["start_node_id"] datasource_type: str = args["datasource_type"] datasource_info_list: list[Mapping[str, Any]] = args["datasource_info_list"] - batch = time.strftime("%Y%m%d%H%M%S") + str(random.randint(100000, 999999)) + batch = time.strftime("%Y%m%d%H%M%S") + str(secrets.randbelow(900000) + 100000) documents = [] if invoke_from == InvokeFrom.PUBLISHED: for datasource_info in datasource_info_list: diff --git a/api/core/rag/entities/event.py b/api/core/rag/entities/event.py index 4921c94557..59a470c35c 100644 --- a/api/core/rag/entities/event.py +++ b/api/core/rag/entities/event.py @@ -19,9 +19,9 @@ class BaseDatasourceEvent(BaseModel): class DatasourceCompletedEvent(BaseDatasourceEvent): event: str = DatasourceStreamEvent.COMPLETED.value data: 
Mapping[str,Any] | list = Field(..., description="result") - total: Optional[int] = Field(..., description="total") - completed: Optional[int] = Field(..., description="completed") - time_consuming: Optional[float] = Field(..., description="time consuming") + total: Optional[int] = Field(default=0, description="total") + completed: Optional[int] = Field(default=0, description="completed") + time_consuming: Optional[float] = Field(default=0.0, description="time consuming") class DatasourceProcessingEvent(BaseDatasourceEvent): event: str = DatasourceStreamEvent.PROCESSING.value diff --git a/api/services/rag_pipeline/rag_pipeline.py b/api/services/rag_pipeline/rag_pipeline.py index 26036dc2c5..9ddbb7c083 100644 --- a/api/services/rag_pipeline/rag_pipeline.py +++ b/api/services/rag_pipeline/rag_pipeline.py @@ -558,21 +558,24 @@ class RagPipelineService: provider_type=datasource_runtime.datasource_provider_type(), ) start_time = time.time() - for message in website_crawl_result: - end_time = time.time() - if message.result.status == "completed": - crawl_event = DatasourceCompletedEvent( - data=message.result.web_info_list, - total=message.result.total, - completed=message.result.completed, - time_consuming=round(end_time - start_time, 2) - ) - else: - crawl_event = DatasourceProcessingEvent( - total=message.result.total, - completed=message.result.completed, - ) - yield crawl_event.model_dump() + try: + for message in website_crawl_result: + end_time = time.time() + if message.result.status == "completed": + crawl_event = DatasourceCompletedEvent( + data=message.result.web_info_list, + total=message.result.total, + completed=message.result.completed, + time_consuming=round(end_time - start_time, 2) + ) + else: + crawl_event = DatasourceProcessingEvent( + total=message.result.total, + completed=message.result.completed, + ) + yield crawl_event.model_dump() + except Exception as e: + print(str(e)) case _: raise ValueError(f"Unsupported datasource provider: 
{datasource_runtime.datasource_provider_type}")