perf: optimize save_document_with_dataset_id (#29550)

Authored by wangxiaolei on 2025-12-12 13:14:45 +08:00, committed by GitHub
parent 12e39365fa
commit ac40309850
1 changed file with 70 additions and 54 deletions


@@ -1636,6 +1636,20 @@ class DocumentService:
                     return [], ""
                 db.session.add(dataset_process_rule)
                 db.session.flush()
+            else:
+                # Fallback when no process_rule provided in knowledge_config:
+                # 1) reuse dataset.latest_process_rule if present
+                # 2) otherwise create an automatic rule
+                dataset_process_rule = getattr(dataset, "latest_process_rule", None)
+                if not dataset_process_rule:
+                    dataset_process_rule = DatasetProcessRule(
+                        dataset_id=dataset.id,
+                        mode="automatic",
+                        rules=json.dumps(DatasetProcessRule.AUTOMATIC_RULES),
+                        created_by=account.id,
+                    )
+                    db.session.add(dataset_process_rule)
+                    db.session.flush()
         lock_name = f"add_document_lock_dataset_id_{dataset.id}"
         try:
             with redis_client.lock(lock_name, timeout=600):
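Alongside the batched queries in the next hunk, this first hunk adds a fallback so that dataset_process_rule is always bound before the locked section runs: reuse dataset.latest_process_rule if present, otherwise create an automatic rule. A minimal standalone sketch of that fallback chain, with hypothetical stand-in types (Rule, Dataset, and resolve_process_rule here are illustrative names, not Dify's):

    from dataclasses import dataclass


    @dataclass
    class Rule:
        mode: str


    @dataclass
    class Dataset:
        latest_process_rule: Rule | None = None


    def resolve_process_rule(dataset: Dataset, requested: Rule | None) -> Rule:
        # 1) an explicitly requested rule always wins
        if requested is not None:
            return requested
        # 2) otherwise reuse the dataset's most recent rule if one exists
        if dataset.latest_process_rule is not None:
            return dataset.latest_process_rule
        # 3) last resort: create an automatic rule, as the diff does
        return Rule(mode="automatic")


    assert resolve_process_rule(Dataset(), None).mode == "automatic"
    assert resolve_process_rule(Dataset(latest_process_rule=Rule("custom")), None).mode == "custom"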
@@ -1647,65 +1661,67 @@ class DocumentService:
                     if not knowledge_config.data_source.info_list.file_info_list:
                         raise ValueError("File source info is required")
                     upload_file_list = knowledge_config.data_source.info_list.file_info_list.file_ids
-                    for file_id in upload_file_list:
-                        file = (
-                            db.session.query(UploadFile)
-                            .where(UploadFile.tenant_id == dataset.tenant_id, UploadFile.id == file_id)
-                            .first()
-                        )
-                        # raise error if file not found
-                        if not file:
-                            raise FileNotExistsError()
-                        file_name = file.name
+                    files = (
+                        db.session.query(UploadFile)
+                        .where(
+                            UploadFile.tenant_id == dataset.tenant_id,
+                            UploadFile.id.in_(upload_file_list),
+                        )
+                        .all()
+                    )
+                    if len(files) != len(set(upload_file_list)):
+                        raise FileNotExistsError("One or more files not found.")
+                    file_names = [file.name for file in files]
+                    db_documents = (
+                        db.session.query(Document)
+                        .where(
+                            Document.dataset_id == dataset.id,
+                            Document.tenant_id == current_user.current_tenant_id,
+                            Document.data_source_type == "upload_file",
+                            Document.enabled == True,
+                            Document.name.in_(file_names),
+                        )
+                        .all()
+                    )
+                    documents_map = {document.name: document for document in db_documents}
+                    for file in files:
                         data_source_info: dict[str, str | bool] = {
-                            "upload_file_id": file_id,
+                            "upload_file_id": file.id,
                         }
                         # check duplicate
-                        if knowledge_config.duplicate:
-                            document = (
-                                db.session.query(Document)
-                                .filter_by(
-                                    dataset_id=dataset.id,
-                                    tenant_id=current_user.current_tenant_id,
-                                    data_source_type="upload_file",
-                                    enabled=True,
-                                    name=file_name,
-                                )
-                                .first()
-                            )
-                            if document:
-                                document.dataset_process_rule_id = dataset_process_rule.id
-                                document.updated_at = naive_utc_now()
-                                document.created_from = created_from
-                                document.doc_form = knowledge_config.doc_form
-                                document.doc_language = knowledge_config.doc_language
-                                document.data_source_info = json.dumps(data_source_info)
-                                document.batch = batch
-                                document.indexing_status = "waiting"
-                                db.session.add(document)
-                                documents.append(document)
-                                duplicate_document_ids.append(document.id)
-                                continue
-                        document = DocumentService.build_document(
-                            dataset,
-                            dataset_process_rule.id,
-                            knowledge_config.data_source.info_list.data_source_type,
-                            knowledge_config.doc_form,
-                            knowledge_config.doc_language,
-                            data_source_info,
-                            created_from,
-                            position,
-                            account,
-                            file_name,
-                            batch,
-                        )
-                        db.session.add(document)
-                        db.session.flush()
-                        document_ids.append(document.id)
-                        documents.append(document)
-                        position += 1
+                        document = documents_map.get(file.name)
+                        if knowledge_config.duplicate and document:
+                            document.dataset_process_rule_id = dataset_process_rule.id
+                            document.updated_at = naive_utc_now()
+                            document.created_from = created_from
+                            document.doc_form = knowledge_config.doc_form
+                            document.doc_language = knowledge_config.doc_language
+                            document.data_source_info = json.dumps(data_source_info)
+                            document.batch = batch
+                            document.indexing_status = "waiting"
+                            db.session.add(document)
+                            documents.append(document)
+                            duplicate_document_ids.append(document.id)
+                            continue
+                        else:
+                            document = DocumentService.build_document(
+                                dataset,
+                                dataset_process_rule.id,
+                                knowledge_config.data_source.info_list.data_source_type,
+                                knowledge_config.doc_form,
+                                knowledge_config.doc_language,
+                                data_source_info,
+                                created_from,
+                                position,
+                                account,
+                                file.name,
+                                batch,
+                            )
+                            db.session.add(document)
+                            db.session.flush()
+                            document_ids.append(document.id)
+                            documents.append(document)
+                            position += 1
                 elif knowledge_config.data_source.info_list.data_source_type == "notion_import":
                     notion_info_list = knowledge_config.data_source.info_list.notion_info_list  # type: ignore
                     if not notion_info_list:
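This second hunk is the performance change itself: the old loop issued up to two queries per uploaded file (an UploadFile lookup plus a duplicate-Document check), so N files cost O(N) database round trips, while the new code fetches all files in one IN query, pre-loads candidate duplicates in a second query, and turns the per-file duplicate check into a dict lookup. A self-contained sketch of that pattern, assuming SQLAlchemy 2.0 and an in-memory SQLite database (the slimmed-down UploadFile model below is illustrative, not Dify's schema):

    from sqlalchemy import String, create_engine, select
    from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


    class Base(DeclarativeBase):
        pass


    class UploadFile(Base):
        __tablename__ = "upload_files"
        id: Mapped[str] = mapped_column(String, primary_key=True)
        name: Mapped[str] = mapped_column(String)


    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)

    with Session(engine) as session:
        session.add_all([UploadFile(id=str(i), name=f"doc-{i}.txt") for i in range(3)])
        session.commit()

        file_ids = ["0", "2"]

        # Before: one round trip per id -- N queries for N files.
        per_id = [session.get(UploadFile, fid) for fid in file_ids]
        assert all(f is not None for f in per_id)

        # After: a single IN query, then validate and index in memory.
        files = session.scalars(select(UploadFile).where(UploadFile.id.in_(file_ids))).all()
        if len(files) != len(set(file_ids)):
            raise ValueError("One or more files not found.")
        by_name = {f.name: f for f in files}  # mirrors documents_map in the diff
        assert by_name["doc-2.txt"].id == "2"

Inside the loop the same trade applies: documents_map.get(file.name) replaces a filter_by(...).first() round trip, so the batch runs with a constant number of reads regardless of how many files it contains. Note that, as in the diff, duplicates are matched by document name, which relies on names being unique within a dataset.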