transform document

jyong 2025-07-29 15:23:11 +08:00
parent e89398f415
commit 9cfb531e3b


@@ -1,6 +1,7 @@
import json
from datetime import UTC, datetime
from pathlib import Path
from uuid import uuid4
import yaml
@@ -13,8 +14,10 @@ from core.plugin.impl.plugin import PluginInstaller
from core.tools.tool_manager import ToolManager
from extensions.ext_database import db
from factories import variable_factory
-from models.dataset import Dataset, Pipeline
+from models.dataset import Dataset, Document, DocumentPipelineExecutionLog, Pipeline
from models.model import UploadFile
from models.workflow import Workflow, WorkflowType
from services.auth import firecrawl
from services.entities.knowledge_entities.rag_pipeline_entities import KnowledgeConfiguration, RetrievalSetting
from services.plugin.plugin_migration import PluginMigration
@@ -78,6 +81,9 @@ class RagPipelineTransformService:
dataset.runtime_mode = "rag_pipeline"
dataset.pipeline_id = pipeline.id
        # migrate existing document data to the new pipeline datasource format
self._deal_document_data(dataset)
db.session.commit()
return {
"pipeline_id": pipeline.id,
@@ -238,12 +244,8 @@ class RagPipelineTransformService:
installer_manager = PluginInstaller()
installed_plugins = installer_manager.list_plugins(tenant_id)
datasource_manager = PluginDatasourceManager()
plugin_migration = PluginMigration()
tool_manager = ToolManager()
installed_plugins_ids = [plugin.plugin_id for plugin in installed_plugins]
dependencies = pipeline_yaml.get("dependencies", [])
need_install_plugin_unique_identifiers = []
@@ -289,3 +291,97 @@ class RagPipelineTransformService:
"dataset_id": dataset.id,
"status": "success",
}
def _deal_document_data(self, dataset: Dataset):
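        """Migrate a legacy dataset's documents to the pipeline datasource format.

        For each document, rewrite data_source_type/data_source_info to the new
        schema and record a DocumentPipelineExecutionLog entry so the ingestion
        remains traceable through the pipeline. The hard-coded IDs below are
        assumed to be the datasource node IDs of the built-in pipeline template
        nodes (local file, Notion, Jina Reader, Firecrawl).
        """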
file_node_id = "1752479895761"
notion_node_id = "1752489759475"
jina_node_id = "1752491761974"
firecrawl_node_id = "1752565402678"
documents = db.session.query(Document).filter(Document.dataset_id == dataset.id).all()
for document in documents:
data_source_info_dict = document.data_source_info_dict
if not data_source_info_dict:
continue
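            # Local uploads: rename "upload_file" to the new "local_file" datasource type.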
if document.data_source_type == "upload_file":
document.data_source_type = "local_file"
file_id = data_source_info_dict.get("upload_file_id")
if file_id:
file = db.session.query(UploadFile).filter(UploadFile.id == file_id).first()
if file:
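                        # Preserve the upload's metadata; "real_file_id" links back to the UploadFile row.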
data_source_info = json.dumps({
"real_file_id": file_id,
"name": file.name,
"size": file.size,
"extension": file.extension,
"mime_type": file.mime_type,
"url": "",
"transfer_method": "local_file",
})
document.data_source_info = data_source_info
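                        # Log the migration so the pipeline run can be traced back to this document.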
document_pipeline_execution_log = DocumentPipelineExecutionLog(
document_id=document.id,
pipeline_id=dataset.pipeline_id,
datasource_type="local_file",
datasource_info=data_source_info,
input_data={},
created_by=document.created_by,
created_at=document.created_at,
datasource_node_id=file_node_id,
)
db.session.add(document)
db.session.add(document_pipeline_execution_log)
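            # Notion imports become "online_document"; page metadata is re-keyed to the new schema.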
elif document.data_source_type == "notion_import":
document.data_source_type = "online_document"
data_source_info = json.dumps({
"workspace_id": data_source_info_dict.get("notion_workspace_id"),
"page": {
"page_id": data_source_info_dict.get("notion_page_id"),
"page_name": document.name,
"page_icon": data_source_info_dict.get("notion_page_icon"),
"type": data_source_info_dict.get("type"),
"last_edited_time": data_source_info_dict.get("last_edited_time"),
"parent_id": None,
},
})
document.data_source_info = data_source_info
document_pipeline_execution_log = DocumentPipelineExecutionLog(
document_id=document.id,
pipeline_id=dataset.pipeline_id,
datasource_type="online_document",
datasource_info=data_source_info,
input_data={},
created_by=document.created_by,
created_at=document.created_at,
datasource_node_id=notion_node_id,
)
db.session.add(document)
db.session.add(document_pipeline_execution_log)
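            # Website crawls keep their type name; only data_source_info is rebuilt around the source URL.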
elif document.data_source_type == "website_crawl":
data_source_info = json.dumps({
"source_url": data_source_info_dict.get("url"),
"content": "",
"title": document.name,
"description": "",
})
document.data_source_info = data_source_info
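                # Map the crawl provider to its datasource node; unknown providers are skipped (no execution log is recorded).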
if data_source_info_dict.get("provider") == "firecrawl":
datasource_node_id = firecrawl_node_id
elif data_source_info_dict.get("provider") == "jinareader":
datasource_node_id = jina_node_id
else:
continue
document_pipeline_execution_log = DocumentPipelineExecutionLog(
document_id=document.id,
pipeline_id=dataset.pipeline_id,
datasource_type="website_crawl",
datasource_info=data_source_info,
input_data={},
created_by=document.created_by,
created_at=document.created_at,
datasource_node_id=datasource_node_id,
)
db.session.add(document)
db.session.add(document_pipeline_execution_log)