"""Async task for generating summary indexes."""

import logging
import time

import click
from celery import shared_task

from extensions.ext_database import db
from models.dataset import Dataset, DocumentSegment
from models.dataset import Document as DatasetDocument
from services.summary_index_service import SummaryIndexService

logger = logging.getLogger(__name__)


@shared_task(queue="dataset")
def generate_summary_index_task(dataset_id: str, document_id: str, segment_ids: list[str] | None = None):
    """
    Asynchronously generate a summary index for document segments.

    Args:
        dataset_id: Dataset ID
        document_id: Document ID
        segment_ids: Optional list of specific segment IDs to process. If None, all segments are processed.

    Usage:
        generate_summary_index_task.delay(dataset_id, document_id)
        generate_summary_index_task.delay(dataset_id, document_id, segment_ids)
    """
    logger.info(
        click.style(
            f"Start generating summary index for document {document_id} in dataset {dataset_id}",
            fg="green",
        )
    )
    start_at = time.perf_counter()

    try:
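        # Look up the dataset and document; either may have been deleted
        # between enqueueing and execution, so bail out gracefully.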
        dataset = db.session.query(Dataset).where(Dataset.id == dataset_id).first()
        if not dataset:
            logger.error(click.style(f"Dataset not found: {dataset_id}", fg="red"))
            db.session.close()
            return

        document = db.session.query(DatasetDocument).where(DatasetDocument.id == document_id).first()
        if not document:
            logger.error(click.style(f"Document not found: {document_id}", fg="red"))
            db.session.close()
            return

        # Only generate summary index for high_quality indexing technique
        if dataset.indexing_technique != "high_quality":
            logger.info(
                click.style(
                    f"Skipping summary generation for dataset {dataset_id}: "
                    f"indexing_technique is {dataset.indexing_technique}, not 'high_quality'",
                    fg="cyan",
                )
            )
            db.session.close()
            return

        # Check if summary index is enabled
        summary_index_setting = dataset.summary_index_setting
        if not summary_index_setting or not summary_index_setting.get("enable"):
            logger.info(
                click.style(
                    f"Summary index is disabled for dataset {dataset_id}",
                    fg="cyan",
                )
            )
            db.session.close()
            return

        # Determine if only parent chunks should be processed
        only_parent_chunks = dataset.chunk_structure == "parent_child_index"

        # Generate summaries
        summary_records = SummaryIndexService.generate_summaries_for_document(
            dataset=dataset,
            document=document,
            summary_index_setting=summary_index_setting,
            segment_ids=segment_ids,
            only_parent_chunks=only_parent_chunks,
        )

        end_at = time.perf_counter()
        logger.info(
            click.style(
                f"Summary index generation completed for document {document_id}: "
                f"{len(summary_records)} summaries generated, latency: {end_at - start_at}",
                fg="green",
            )
        )
    except Exception as e:
        logger.exception(f"Failed to generate summary index for document {document_id}: {str(e)}")
        # Update document segments with error status if needed
        if segment_ids:
            db.session.query(DocumentSegment).filter(
                DocumentSegment.id.in_(segment_ids),
                DocumentSegment.dataset_id == dataset_id,
            ).update(
                {
                    DocumentSegment.error: f"Summary generation failed: {str(e)}",
                },
                # Bulk update; skip syncing the in-memory session since it is closed below.
                synchronize_session=False,
            )
            db.session.commit()
    finally:
        db.session.close()