From 43e5798e13a2ec763774d871778749b449667e6b Mon Sep 17 00:00:00 2001 From: Dongyu Li <544104925@qq.com> Date: Wed, 18 Jun 2025 16:27:10 +0800 Subject: [PATCH] feat(datasource): change datasource result type to event-stream --- api/services/rag_pipeline/rag_pipeline.py | 61 ----------------------- 1 file changed, 61 deletions(-) diff --git a/api/services/rag_pipeline/rag_pipeline.py b/api/services/rag_pipeline/rag_pipeline.py index 9ddbb7c083..909df456d4 100644 --- a/api/services/rag_pipeline/rag_pipeline.py +++ b/api/services/rag_pipeline/rag_pipeline.py @@ -423,67 +423,6 @@ class RagPipelineService: return workflow_node_execution - # def run_datasource_workflow_node_status( - # self, pipeline: Pipeline, node_id: str, job_id: str, account: Account, - # datasource_type: str, is_published: bool - # ) -> dict: - # """ - # Run published workflow datasource - # """ - # if is_published: - # # fetch published workflow by app_model - # workflow = self.get_published_workflow(pipeline=pipeline) - # else: - # workflow = self.get_draft_workflow(pipeline=pipeline) - # if not workflow: - # raise ValueError("Workflow not initialized") - # - # # run draft workflow node - # datasource_node_data = None - # start_at = time.perf_counter() - # datasource_nodes = workflow.graph_dict.get("nodes", []) - # for datasource_node in datasource_nodes: - # if datasource_node.get("id") == node_id: - # datasource_node_data = datasource_node.get("data", {}) - # break - # if not datasource_node_data: - # raise ValueError("Datasource node data not found") - # - # from core.datasource.datasource_manager import DatasourceManager - # - # datasource_runtime = DatasourceManager.get_datasource_runtime( - # provider_id=f"{datasource_node_data.get('plugin_id')}/{datasource_node_data.get('provider_name')}", - # datasource_name=datasource_node_data.get("datasource_name"), - # tenant_id=pipeline.tenant_id, - # datasource_type=DatasourceProviderType(datasource_type), - # ) - # datasource_provider_service = DatasourceProviderService() - # credentials = datasource_provider_service.get_real_datasource_credentials( - # tenant_id=pipeline.tenant_id, - # provider=datasource_node_data.get('provider_name'), - # plugin_id=datasource_node_data.get('plugin_id'), - # ) - # if credentials: - # datasource_runtime.runtime.credentials = credentials[0].get("credentials") - # match datasource_type: - # - # case DatasourceProviderType.WEBSITE_CRAWL: - # datasource_runtime = cast(WebsiteCrawlDatasourcePlugin, datasource_runtime) - # website_crawl_results: list[WebsiteCrawlMessage] = [] - # for website_message in datasource_runtime.get_website_crawl( - # user_id=account.id, - # datasource_parameters={"job_id": job_id}, - # provider_type=datasource_runtime.datasource_provider_type(), - # ): - # website_crawl_results.append(website_message) - # return { - # "result": [result for result in website_crawl_results.result], - # "status": website_crawl_results.result.status, - # "provider_type": datasource_node_data.get("provider_type"), - # } - # case _: - # raise ValueError(f"Unsupported datasource provider: {datasource_runtime.datasource_provider_type}") - def run_datasource_workflow_node( self, pipeline: Pipeline, node_id: str, user_inputs: dict, account: Account, datasource_type: str, is_published: bool