refactor: migrate session.query to select API in summary and remove document tasks (#34650)

This commit is contained in:
Renzo 2026-04-07 00:55:31 -05:00 committed by GitHub
parent 84d8940dbf
commit 173e818a62
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 21 additions and 19 deletions

View File

@@ -5,6 +5,7 @@ import time
import click
from celery import shared_task
from sqlalchemy import select, update
from core.db.session_factory import session_factory
from core.rag.index_processor.constant.index_type import IndexTechniqueType
@@ -39,12 +40,12 @@ def generate_summary_index_task(dataset_id: str, document_id: str, segment_ids:
try:
with session_factory.create_session() as session:
dataset = session.query(Dataset).where(Dataset.id == dataset_id).first()
dataset = session.scalar(select(Dataset).where(Dataset.id == dataset_id).limit(1))
if not dataset:
logger.error(click.style(f"Dataset not found: {dataset_id}", fg="red"))
return
document = session.query(DatasetDocument).where(DatasetDocument.id == document_id).first()
document = session.scalar(select(DatasetDocument).where(DatasetDocument.id == document_id).limit(1))
if not document:
logger.error(click.style(f"Document not found: {document_id}", fg="red"))
return
@@ -108,13 +109,12 @@ def generate_summary_index_task(dataset_id: str, document_id: str, segment_ids:
if segment_ids:
error_message = f"Summary generation failed: {str(e)}"
with session_factory.create_session() as session:
session.query(DocumentSegment).filter(
DocumentSegment.id.in_(segment_ids),
DocumentSegment.dataset_id == dataset_id,
).update(
{
DocumentSegment.error: error_message,
},
synchronize_session=False,
session.execute(
update(DocumentSegment)
.where(
DocumentSegment.id.in_(segment_ids),
DocumentSegment.dataset_id == dataset_id,
)
.values(error=error_message)
)
session.commit()

View File

@@ -3,7 +3,7 @@ import time
import click
from celery import shared_task
from sqlalchemy import select
from sqlalchemy import select, update
from core.db.session_factory import session_factory
from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
@@ -26,7 +26,7 @@ def remove_document_from_index_task(document_id: str):
start_at = time.perf_counter()
with session_factory.create_session() as session:
document = session.query(Document).where(Document.id == document_id).first()
document = session.scalar(select(Document).where(Document.id == document_id).limit(1))
if not document:
logger.info(click.style(f"Document not found: {document_id}", fg="red"))
return
@@ -68,13 +68,15 @@ def remove_document_from_index_task(document_id: str):
except Exception:
logger.exception("clean dataset %s from index failed", dataset.id)
# update segment to disable
session.query(DocumentSegment).where(DocumentSegment.document_id == document.id).update(
{
DocumentSegment.enabled: False,
DocumentSegment.disabled_at: naive_utc_now(),
DocumentSegment.disabled_by: document.disabled_by,
DocumentSegment.updated_at: naive_utc_now(),
}
session.execute(
update(DocumentSegment)
.where(DocumentSegment.document_id == document.id)
.values(
enabled=False,
disabled_at=naive_utc_now(),
disabled_by=document.disabled_by,
updated_at=naive_utc_now(),
)
)
session.commit()