feat(datasource): change datasource result type to event-stream

This commit is contained in:
Dongyu Li 2025-06-18 16:27:10 +08:00
parent 8aca70cd50
commit 43e5798e13
1 changed files with 0 additions and 61 deletions

View File

@ -423,67 +423,6 @@ class RagPipelineService:
return workflow_node_execution
# def run_datasource_workflow_node_status(
# self, pipeline: Pipeline, node_id: str, job_id: str, account: Account,
# datasource_type: str, is_published: bool
# ) -> dict:
# """
# Run published workflow datasource
# """
# if is_published:
# # fetch published workflow by app_model
# workflow = self.get_published_workflow(pipeline=pipeline)
# else:
# workflow = self.get_draft_workflow(pipeline=pipeline)
# if not workflow:
# raise ValueError("Workflow not initialized")
#
# # run draft workflow node
# datasource_node_data = None
# start_at = time.perf_counter()
# datasource_nodes = workflow.graph_dict.get("nodes", [])
# for datasource_node in datasource_nodes:
# if datasource_node.get("id") == node_id:
# datasource_node_data = datasource_node.get("data", {})
# break
# if not datasource_node_data:
# raise ValueError("Datasource node data not found")
#
# from core.datasource.datasource_manager import DatasourceManager
#
# datasource_runtime = DatasourceManager.get_datasource_runtime(
# provider_id=f"{datasource_node_data.get('plugin_id')}/{datasource_node_data.get('provider_name')}",
# datasource_name=datasource_node_data.get("datasource_name"),
# tenant_id=pipeline.tenant_id,
# datasource_type=DatasourceProviderType(datasource_type),
# )
# datasource_provider_service = DatasourceProviderService()
# credentials = datasource_provider_service.get_real_datasource_credentials(
# tenant_id=pipeline.tenant_id,
# provider=datasource_node_data.get('provider_name'),
# plugin_id=datasource_node_data.get('plugin_id'),
# )
# if credentials:
# datasource_runtime.runtime.credentials = credentials[0].get("credentials")
# match datasource_type:
#
# case DatasourceProviderType.WEBSITE_CRAWL:
# datasource_runtime = cast(WebsiteCrawlDatasourcePlugin, datasource_runtime)
# website_crawl_results: list[WebsiteCrawlMessage] = []
# for website_message in datasource_runtime.get_website_crawl(
# user_id=account.id,
# datasource_parameters={"job_id": job_id},
# provider_type=datasource_runtime.datasource_provider_type(),
# ):
# website_crawl_results.append(website_message)
# return {
# "result": [result for result in website_crawl_results.result],
# "status": website_crawl_results.result.status,
# "provider_type": datasource_node_data.get("provider_type"),
# }
# case _:
# raise ValueError(f"Unsupported datasource provider: {datasource_runtime.datasource_provider_type}")
def run_datasource_workflow_node(
self, pipeline: Pipeline, node_id: str, user_inputs: dict, account: Account, datasource_type: str,
is_published: bool