From 9cfb531e3b26ac26e3f1280f104414a2947ba633 Mon Sep 17 00:00:00 2001 From: jyong <718720800@qq.com> Date: Tue, 29 Jul 2025 15:23:11 +0800 Subject: [PATCH] transform document --- .../rag_pipeline_transform_service.py | 106 +++++++++++++++++- 1 file changed, 101 insertions(+), 5 deletions(-) diff --git a/api/services/rag_pipeline/rag_pipeline_transform_service.py b/api/services/rag_pipeline/rag_pipeline_transform_service.py index f964af5ae2..025aaa68aa 100644 --- a/api/services/rag_pipeline/rag_pipeline_transform_service.py +++ b/api/services/rag_pipeline/rag_pipeline_transform_service.py @@ -1,6 +1,7 @@ import json from datetime import UTC, datetime from pathlib import Path +from pydoc import doc from uuid import uuid4 import yaml @@ -13,8 +14,10 @@ from core.plugin.impl.plugin import PluginInstaller from core.tools.tool_manager import ToolManager from extensions.ext_database import db from factories import variable_factory -from models.dataset import Dataset, Pipeline +from models.dataset import Dataset, Document, DocumentPipelineExecutionLog, Pipeline +from models.model import UploadFile from models.workflow import Workflow, WorkflowType +from services.auth import firecrawl from services.entities.knowledge_entities.rag_pipeline_entities import KnowledgeConfiguration, RetrievalSetting from services.plugin.plugin_migration import PluginMigration @@ -78,6 +81,9 @@ class RagPipelineTransformService: dataset.runtime_mode = "rag_pipeline" dataset.pipeline_id = pipeline.id + # deal document data + self._deal_document_data(dataset) + db.session.commit() return { "pipeline_id": pipeline.id, @@ -238,12 +244,8 @@ class RagPipelineTransformService: installer_manager = PluginInstaller() installed_plugins = installer_manager.list_plugins(tenant_id) - datasource_manager = PluginDatasourceManager() - plugin_migration = PluginMigration() - tool_manager = ToolManager() - installed_plugins_ids = [plugin.plugin_id for plugin in installed_plugins] dependencies = pipeline_yaml.get("dependencies", []) need_install_plugin_unique_identifiers = [] @@ -289,3 +291,97 @@ class RagPipelineTransformService: "dataset_id": dataset.id, "status": "success", } + + def _deal_document_data(self, dataset: Dataset): + file_node_id = "1752479895761" + notion_node_id = "1752489759475" + jina_node_id = "1752491761974" + firecrawl_node_id = "1752565402678" + + documents = db.session.query(Document).filter(Document.dataset_id == dataset.id).all() + + for document in documents: + data_source_info_dict = document.data_source_info_dict + if not data_source_info_dict: + continue + if document.data_source_type == "upload_file": + document.data_source_type = "local_file" + file_id = data_source_info_dict.get("upload_file_id") + if file_id: + file = db.session.query(UploadFile).filter(UploadFile.id == file_id).first() + if file: + data_source_info = json.dumps({ + "real_file_id": file_id, + "name": file.name, + "size": file.size, + "extension": file.extension, + "mime_type": file.mime_type, + "url": "", + "transfer_method": "local_file", + }) + document.data_source_info = data_source_info + document_pipeline_execution_log = DocumentPipelineExecutionLog( + document_id=document.id, + pipeline_id=dataset.pipeline_id, + datasource_type="local_file", + datasource_info=data_source_info, + input_data={}, + created_by=document.created_by, + created_at=document.created_at, + datasource_node_id=file_node_id, + ) + db.session.add(document) + db.session.add(document_pipeline_execution_log) + elif document.data_source_type == "notion_import": + document.data_source_type = "online_document" + data_source_info = json.dumps({ + "workspace_id": data_source_info_dict.get("notion_workspace_id"), + "page": { + "page_id": data_source_info_dict.get("notion_page_id"), + "page_name": document.name, + "page_icon": data_source_info_dict.get("notion_page_icon"), + "type": data_source_info_dict.get("type"), + "last_edited_time": data_source_info_dict.get("last_edited_time"), + "parent_id": None, + }, + }) + document.data_source_info = data_source_info + document_pipeline_execution_log = DocumentPipelineExecutionLog( + document_id=document.id, + pipeline_id=dataset.pipeline_id, + datasource_type="online_document", + datasource_info=data_source_info, + input_data={}, + created_by=document.created_by, + created_at=document.created_at, + datasource_node_id=notion_node_id, + ) + db.session.add(document) + db.session.add(document_pipeline_execution_log) + elif document.data_source_type == "website_crawl": + document.data_source_type = "website_crawl" + data_source_info = json.dumps({ + "source_url": data_source_info_dict.get("url"), + "content": "", + "title": document.name, + "description": "", + }) + document.data_source_info = data_source_info + if data_source_info_dict.get("provider") == "firecrawl": + datasource_node_id = firecrawl_node_id + elif data_source_info_dict.get("provider") == "jinareader": + datasource_node_id = jina_node_id + else: + continue + document_pipeline_execution_log = DocumentPipelineExecutionLog( + document_id=document.id, + pipeline_id=dataset.pipeline_id, + datasource_type="website_crawl", + datasource_info=data_source_info, + input_data={}, + created_by=document.created_by, + created_at=document.created_at, + datasource_node_id=datasource_node_id, + ) + db.session.add(document) + db.session.add(document_pipeline_execution_log) \ No newline at end of file