feat: implement Summary Index feature.

This commit is contained in:
FFXN 2026-01-12 16:52:21 +08:00
parent f4a7efde3d
commit 25bfc1cc3b
36 changed files with 2290 additions and 32 deletions

View File

@ -146,6 +146,7 @@ class DatasetUpdatePayload(BaseModel):
embedding_model: str | None = None
embedding_model_provider: str | None = None
retrieval_model: dict[str, Any] | None = None
summary_index_setting: dict[str, Any] | None = None
partial_member_list: list[str] | None = None
external_retrieval_model: dict[str, Any] | None = None
external_knowledge_id: str | None = None

View File

@ -39,9 +39,10 @@ from fields.document_fields import (
from libs.datetime_utils import naive_utc_now
from libs.login import current_account_with_tenant, login_required
from models import DatasetProcessRule, Document, DocumentSegment, UploadFile
from models.dataset import DocumentPipelineExecutionLog
from models.dataset import DocumentPipelineExecutionLog, DocumentSegmentSummary
from services.dataset_service import DatasetService, DocumentService
from services.entities.knowledge_entities.knowledge_entities import KnowledgeConfig, ProcessRule, RetrievalModel
from tasks.generate_summary_index_task import generate_summary_index_task
from ..app.error import (
ProviderModelCurrentlyNotSupportError,
@ -104,6 +105,10 @@ class DocumentRenamePayload(BaseModel):
name: str
class GenerateSummaryPayload(BaseModel):
document_list: list[str]
register_schema_models(
console_ns,
KnowledgeConfig,
@ -111,6 +116,7 @@ register_schema_models(
RetrievalModel,
DocumentRetryPayload,
DocumentRenamePayload,
GenerateSummaryPayload,
)
@ -295,6 +301,97 @@ class DatasetDocumentListApi(Resource):
paginated_documents = db.paginate(select=query, page=page, per_page=limit, max_per_page=100, error_out=False)
documents = paginated_documents.items
# Check if dataset has summary index enabled
has_summary_index = (
dataset.summary_index_setting
and dataset.summary_index_setting.get("enable") is True
)
# Filter documents that need summary calculation
documents_need_summary = [doc for doc in documents if doc.need_summary is True]
document_ids_need_summary = [str(doc.id) for doc in documents_need_summary]
# Calculate summary_index_status for documents that need summary (only if dataset summary index is enabled)
summary_status_map = {}
if has_summary_index and document_ids_need_summary:
# Get all segments for these documents (excluding qa_model and re_segment)
segments = (
db.session.query(DocumentSegment.id, DocumentSegment.document_id)
.where(
DocumentSegment.document_id.in_(document_ids_need_summary),
DocumentSegment.status != "re_segment",
DocumentSegment.tenant_id == current_tenant_id,
)
.all()
)
# Group segments by document_id
document_segments_map = {}
for segment in segments:
doc_id = str(segment.document_id)
if doc_id not in document_segments_map:
document_segments_map[doc_id] = []
document_segments_map[doc_id].append(segment.id)
# Get all summary records for these segments
all_segment_ids = [seg.id for seg in segments]
summaries = {}
if all_segment_ids:
summary_records = (
db.session.query(DocumentSegmentSummary)
.where(
DocumentSegmentSummary.chunk_id.in_(all_segment_ids),
DocumentSegmentSummary.dataset_id == dataset_id,
DocumentSegmentSummary.enabled == True, # Only count enabled summaries
)
.all()
)
summaries = {summary.chunk_id: summary.status for summary in summary_records}
# Calculate summary_index_status for each document
for doc_id in document_ids_need_summary:
segment_ids = document_segments_map.get(doc_id, [])
if not segment_ids:
# No segments, status is "GENERATING" (waiting to generate)
summary_status_map[doc_id] = "GENERATING"
continue
# Count summary statuses for this document's segments
status_counts = {"completed": 0, "generating": 0, "error": 0, "not_started": 0}
for segment_id in segment_ids:
status = summaries.get(segment_id, "not_started")
if status in status_counts:
status_counts[status] += 1
else:
status_counts["not_started"] += 1
total_segments = len(segment_ids)
completed_count = status_counts["completed"]
generating_count = status_counts["generating"]
error_count = status_counts["error"]
# Determine overall status (only three states: GENERATING, COMPLETED, ERROR)
if completed_count == total_segments:
summary_status_map[doc_id] = "COMPLETED"
elif error_count > 0:
# Has errors (even if some are completed or generating)
summary_status_map[doc_id] = "ERROR"
elif generating_count > 0 or status_counts["not_started"] > 0:
# Still generating or not started
summary_status_map[doc_id] = "GENERATING"
else:
# Default to generating
summary_status_map[doc_id] = "GENERATING"
# Add summary_index_status to each document
for document in documents:
if has_summary_index and document.need_summary is True:
document.summary_index_status = summary_status_map.get(str(document.id), "GENERATING")
else:
# Return null if summary index is not enabled or document doesn't need summary
document.summary_index_status = None
if fetch:
for document in documents:
completed_segments = (
@ -393,6 +490,7 @@ class DatasetDocumentListApi(Resource):
return {"result": "success"}, 204
@console_ns.route("/datasets/init")
class DatasetInitApi(Resource):
@console_ns.doc("init_dataset")
@ -780,6 +878,7 @@ class DocumentApi(DocumentResource):
"display_status": document.display_status,
"doc_form": document.doc_form,
"doc_language": document.doc_language,
"need_summary": document.need_summary if document.need_summary is not None else False,
}
else:
dataset_process_rules = DatasetService.get_process_rules(dataset_id)
@ -815,6 +914,7 @@ class DocumentApi(DocumentResource):
"display_status": document.display_status,
"doc_form": document.doc_form,
"doc_language": document.doc_language,
"need_summary": document.need_summary if document.need_summary is not None else False,
}
return response, 200
@ -1182,3 +1282,211 @@ class DocumentPipelineExecutionLogApi(DocumentResource):
"input_data": log.input_data,
"datasource_node_id": log.datasource_node_id,
}, 200
@console_ns.route("/datasets/<uuid:dataset_id>/documents/generate-summary")
class DocumentGenerateSummaryApi(Resource):
@console_ns.doc("generate_summary_for_documents")
@console_ns.doc(description="Generate summary index for documents")
@console_ns.doc(params={"dataset_id": "Dataset ID"})
@console_ns.expect(console_ns.models[GenerateSummaryPayload.__name__])
@console_ns.response(200, "Summary generation started successfully")
@console_ns.response(400, "Invalid request or dataset configuration")
@console_ns.response(403, "Permission denied")
@console_ns.response(404, "Dataset not found")
@setup_required
@login_required
@account_initialization_required
@cloud_edition_billing_rate_limit_check("knowledge")
def post(self, dataset_id):
"""
Generate summary index for specified documents.
This endpoint checks if the dataset configuration supports summary generation
(indexing_technique must be 'high_quality' and summary_index_setting.enable must be true),
then asynchronously generates summary indexes for the provided documents.
"""
current_user, _ = current_account_with_tenant()
dataset_id = str(dataset_id)
# Get dataset
dataset = DatasetService.get_dataset(dataset_id)
if not dataset:
raise NotFound("Dataset not found.")
# Check permissions
if not current_user.is_dataset_editor:
raise Forbidden()
try:
DatasetService.check_dataset_permission(dataset, current_user)
except services.errors.account.NoPermissionError as e:
raise Forbidden(str(e))
# Validate request payload
payload = GenerateSummaryPayload.model_validate(console_ns.payload or {})
document_list = payload.document_list
if not document_list:
raise ValueError("document_list cannot be empty.")
# Check if dataset configuration supports summary generation
if dataset.indexing_technique != "high_quality":
raise ValueError(
f"Summary generation is only available for 'high_quality' indexing technique. "
f"Current indexing technique: {dataset.indexing_technique}"
)
summary_index_setting = dataset.summary_index_setting
if not summary_index_setting or not summary_index_setting.get("enable"):
raise ValueError(
"Summary index is not enabled for this dataset. "
"Please enable it in the dataset settings."
)
# Verify all documents exist and belong to the dataset
documents = (
db.session.query(Document)
.filter(
Document.id.in_(document_list),
Document.dataset_id == dataset_id,
)
.all()
)
if len(documents) != len(document_list):
found_ids = {doc.id for doc in documents}
missing_ids = set(document_list) - found_ids
raise NotFound(f"Some documents not found: {list(missing_ids)}")
# Dispatch async tasks for each document
for document in documents:
# Skip qa_model documents as they don't generate summaries
if document.doc_form == "qa_model":
logger.info(
f"Skipping summary generation for qa_model document {document.id}"
)
continue
# Dispatch async task
generate_summary_index_task(dataset_id, document.id)
logger.info(
f"Dispatched summary generation task for document {document.id} in dataset {dataset_id}"
)
return {"result": "success"}, 200
@console_ns.route("/datasets/<uuid:dataset_id>/documents/<uuid:document_id>/summary-status")
class DocumentSummaryStatusApi(DocumentResource):
@console_ns.doc("get_document_summary_status")
@console_ns.doc(description="Get summary index generation status for a document")
@console_ns.doc(params={"dataset_id": "Dataset ID", "document_id": "Document ID"})
@console_ns.response(200, "Summary status retrieved successfully")
@console_ns.response(404, "Document not found")
@setup_required
@login_required
@account_initialization_required
def get(self, dataset_id, document_id):
"""
Get summary index generation status for a document.
Returns:
- total_segments: Total number of segments in the document
- summary_status: Dictionary with status counts
- completed: Number of summaries completed
- generating: Number of summaries being generated
- error: Number of summaries with errors
- not_started: Number of segments without summary records
- summaries: List of summary records with status and content preview
"""
current_user, _ = current_account_with_tenant()
dataset_id = str(dataset_id)
document_id = str(document_id)
# Get document
document = self.get_document(dataset_id, document_id)
# Get dataset
dataset = DatasetService.get_dataset(dataset_id)
if not dataset:
raise NotFound("Dataset not found.")
# Check permissions
try:
DatasetService.check_dataset_permission(dataset, current_user)
except services.errors.account.NoPermissionError as e:
raise Forbidden(str(e))
# Get all segments for this document
segments = (
db.session.query(DocumentSegment)
.filter(
DocumentSegment.document_id == document_id,
DocumentSegment.dataset_id == dataset_id,
DocumentSegment.status == "completed",
DocumentSegment.enabled == True,
)
.all()
)
total_segments = len(segments)
# Get all summary records for these segments
segment_ids = [segment.id for segment in segments]
summaries = []
if segment_ids:
summaries = (
db.session.query(DocumentSegmentSummary)
.filter(
DocumentSegmentSummary.document_id == document_id,
DocumentSegmentSummary.dataset_id == dataset_id,
DocumentSegmentSummary.chunk_id.in_(segment_ids),
DocumentSegmentSummary.enabled == True, # Only return enabled summaries
)
.all()
)
# Create a mapping of chunk_id to summary
summary_map = {summary.chunk_id: summary for summary in summaries}
# Count statuses
status_counts = {
"completed": 0,
"generating": 0,
"error": 0,
"not_started": 0,
}
summary_list = []
for segment in segments:
summary = summary_map.get(segment.id)
if summary:
status = summary.status
status_counts[status] = status_counts.get(status, 0) + 1
summary_list.append({
"segment_id": segment.id,
"segment_position": segment.position,
"status": summary.status,
"summary_preview": summary.summary_content[:100] + "..." if summary.summary_content and len(summary.summary_content) > 100 else summary.summary_content,
"error": summary.error,
"created_at": int(summary.created_at.timestamp()) if summary.created_at else None,
"updated_at": int(summary.updated_at.timestamp()) if summary.updated_at else None,
})
else:
status_counts["not_started"] += 1
summary_list.append({
"segment_id": segment.id,
"segment_position": segment.position,
"status": "not_started",
"summary_preview": None,
"error": None,
"created_at": None,
"updated_at": None,
})
return {
"total_segments": total_segments,
"summary_status": status_counts,
"summaries": summary_list,
}, 200

View File

@ -29,7 +29,7 @@ from extensions.ext_database import db
from extensions.ext_redis import redis_client
from fields.segment_fields import child_chunk_fields, segment_fields
from libs.login import current_account_with_tenant, login_required
from models.dataset import ChildChunk, DocumentSegment
from models.dataset import ChildChunk, DocumentSegment, DocumentSegmentSummary
from models.model import UploadFile
from services.dataset_service import DatasetService, DocumentService, SegmentService
from services.entities.knowledge_entities.knowledge_entities import ChildChunkUpdateArgs, SegmentUpdateArgs
@ -38,6 +38,23 @@ from services.errors.chunk import ChildChunkIndexingError as ChildChunkIndexingS
from tasks.batch_create_segment_to_index_task import batch_create_segment_to_index_task
def _get_segment_with_summary(segment, dataset_id):
"""Helper function to marshal segment and add summary information."""
segment_dict = marshal(segment, segment_fields)
# Query summary for this segment (only enabled summaries)
summary = (
db.session.query(DocumentSegmentSummary)
.where(
DocumentSegmentSummary.chunk_id == segment.id,
DocumentSegmentSummary.dataset_id == dataset_id,
DocumentSegmentSummary.enabled == True, # Only return enabled summaries
)
.first()
)
segment_dict["summary"] = summary.summary_content if summary else None
return segment_dict
class SegmentListQuery(BaseModel):
limit: int = Field(default=20, ge=1, le=100)
status: list[str] = Field(default_factory=list)
@ -60,6 +77,7 @@ class SegmentUpdatePayload(BaseModel):
keywords: list[str] | None = None
regenerate_child_chunks: bool = False
attachment_ids: list[str] | None = None
summary: str | None = None # Summary content for summary index
class BatchImportPayload(BaseModel):
@ -153,8 +171,34 @@ class DatasetDocumentSegmentListApi(Resource):
segments = db.paginate(select=query, page=page, per_page=limit, max_per_page=100, error_out=False)
# Query summaries for all segments in this page (batch query for efficiency)
segment_ids = [segment.id for segment in segments.items]
summaries = {}
if segment_ids:
summary_records = (
db.session.query(DocumentSegmentSummary)
.where(
DocumentSegmentSummary.chunk_id.in_(segment_ids),
DocumentSegmentSummary.dataset_id == dataset_id,
)
.all()
)
# Only include enabled summaries
summaries = {
summary.chunk_id: summary.summary_content
for summary in summary_records
if summary.enabled is True
}
# Add summary to each segment
segments_with_summary = []
for segment in segments.items:
segment_dict = marshal(segment, segment_fields)
segment_dict["summary"] = summaries.get(segment.id)
segments_with_summary.append(segment_dict)
response = {
"data": marshal(segments.items, segment_fields),
"data": segments_with_summary,
"limit": limit,
"total": segments.total,
"total_pages": segments.pages,
@ -300,7 +344,7 @@ class DatasetDocumentSegmentAddApi(Resource):
payload_dict = payload.model_dump(exclude_none=True)
SegmentService.segment_create_args_validate(payload_dict, document)
segment = SegmentService.create_segment(payload_dict, document, dataset)
return {"data": marshal(segment, segment_fields), "doc_form": document.doc_form}, 200
return {"data": _get_segment_with_summary(segment, dataset_id), "doc_form": document.doc_form}, 200
@console_ns.route("/datasets/<uuid:dataset_id>/documents/<uuid:document_id>/segments/<uuid:segment_id>")
@ -362,10 +406,12 @@ class DatasetDocumentSegmentUpdateApi(Resource):
payload = SegmentUpdatePayload.model_validate(console_ns.payload or {})
payload_dict = payload.model_dump(exclude_none=True)
SegmentService.segment_create_args_validate(payload_dict, document)
# Update segment (summary update with change detection is handled in SegmentService.update_segment)
segment = SegmentService.update_segment(
SegmentUpdateArgs.model_validate(payload.model_dump(exclude_none=True)), segment, document, dataset
)
return {"data": marshal(segment, segment_fields), "doc_form": document.doc_form}, 200
return {"data": _get_segment_with_summary(segment, dataset_id), "doc_form": document.doc_form}, 200
@setup_required
@login_required

View File

@ -1,4 +1,4 @@
from flask_restx import Resource
from flask_restx import Resource, fields
from controllers.common.schema import register_schema_model
from libs.login import login_required
@ -10,17 +10,56 @@ from ..wraps import (
cloud_edition_billing_rate_limit_check,
setup_required,
)
from fields.hit_testing_fields import (
child_chunk_fields,
document_fields,
files_fields,
hit_testing_record_fields,
segment_fields,
)
register_schema_model(console_ns, HitTestingPayload)
def _get_or_create_model(model_name: str, field_def):
"""Get or create a flask_restx model to avoid dict type issues in Swagger."""
existing = console_ns.models.get(model_name)
if existing is None:
existing = console_ns.model(model_name, field_def)
return existing
# Register models for flask_restx to avoid dict type issues in Swagger
document_model = _get_or_create_model("HitTestingDocument", document_fields)
segment_fields_copy = segment_fields.copy()
segment_fields_copy["document"] = fields.Nested(document_model)
segment_model = _get_or_create_model("HitTestingSegment", segment_fields_copy)
child_chunk_model = _get_or_create_model("HitTestingChildChunk", child_chunk_fields)
files_model = _get_or_create_model("HitTestingFile", files_fields)
hit_testing_record_fields_copy = hit_testing_record_fields.copy()
hit_testing_record_fields_copy["segment"] = fields.Nested(segment_model)
hit_testing_record_fields_copy["child_chunks"] = fields.List(fields.Nested(child_chunk_model))
hit_testing_record_fields_copy["files"] = fields.List(fields.Nested(files_model))
hit_testing_record_model = _get_or_create_model("HitTestingRecord", hit_testing_record_fields_copy)
# Response model for hit testing API
hit_testing_response_fields = {
"query": fields.String,
"records": fields.List(fields.Nested(hit_testing_record_model)),
}
hit_testing_response_model = _get_or_create_model("HitTestingResponse", hit_testing_response_fields)
@console_ns.route("/datasets/<uuid:dataset_id>/hit-testing")
class HitTestingApi(Resource, DatasetsHitTestingBase):
@console_ns.doc("test_dataset_retrieval")
@console_ns.doc(description="Test dataset knowledge retrieval")
@console_ns.doc(params={"dataset_id": "Dataset ID"})
@console_ns.expect(console_ns.models[HitTestingPayload.__name__])
@console_ns.response(200, "Hit testing completed successfully")
@console_ns.response(200, "Hit testing completed successfully", model=hit_testing_response_model)
@console_ns.response(404, "Dataset not found")
@console_ns.response(400, "Invalid parameters")
@setup_required

View File

@ -3,6 +3,7 @@ from pydantic import BaseModel, Field, field_validator
class PreviewDetail(BaseModel):
content: str
summary: str | None = None
child_chunks: list[str] | None = None

View File

@ -311,14 +311,18 @@ class IndexingRunner:
qa_preview_texts: list[QAPreviewDetail] = []
total_segments = 0
# doc_form represents the segmentation method (general, parent-child, QA)
index_type = doc_form
index_processor = IndexProcessorFactory(index_type).init_index_processor()
# one extract_setting is one source document
for extract_setting in extract_settings:
# extract
processing_rule = DatasetProcessRule(
mode=tmp_processing_rule["mode"], rules=json.dumps(tmp_processing_rule["rules"])
)
# Extract document content
text_docs = index_processor.extract(extract_setting, process_rule_mode=tmp_processing_rule["mode"])
# Cleaning and segmentation
documents = index_processor.transform(
text_docs,
current_user=None,
@ -361,6 +365,12 @@ class IndexingRunner:
if doc_form and doc_form == "qa_model":
return IndexingEstimate(total_segments=total_segments * 20, qa_preview=qa_preview_texts, preview=[])
# Generate summary preview
summary_index_setting = tmp_processing_rule["summary_index_setting"] if "summary_index_setting" in tmp_processing_rule else None
if summary_index_setting and summary_index_setting.get('enable') and preview_texts:
preview_texts = index_processor.generate_summary_preview(tenant_id, preview_texts, summary_index_setting)
return IndexingEstimate(total_segments=total_segments, preview=preview_texts)
def _extract(

View File

@ -434,3 +434,6 @@ INSTRUCTION_GENERATE_TEMPLATE_PROMPT = """The output of this prompt is not as ex
You should edit the prompt according to the IDEAL OUTPUT."""
INSTRUCTION_GENERATE_TEMPLATE_CODE = """Please fix the errors in the {{#error_message#}}."""
DEFAULT_GENERATOR_SUMMARY_PROMPT = """
You are a helpful assistant that summarizes long pieces of text into concise summaries. Given the following text, generate a brief summary that captures the main points and key information. The summary should be clear, concise, and written in complete sentences. """

View File

@ -371,6 +371,8 @@ class RetrievalService:
include_segment_ids = set()
segment_child_map = {}
segment_file_map = {}
segment_summary_map = {} # Map segment_id to summary content
summary_segment_ids = set() # Track segments retrieved via summary
with Session(bind=db.engine, expire_on_commit=False) as session:
# Process documents
for document in documents:
@ -398,13 +400,25 @@ class RetrievalService:
attachment_info = attachment_info_dict["attachment_info"]
segment_id = attachment_info_dict["segment_id"]
else:
child_index_node_id = document.metadata.get("doc_id")
child_chunk_stmt = select(ChildChunk).where(ChildChunk.index_node_id == child_index_node_id)
child_chunk = session.scalar(child_chunk_stmt)
# Check if this is a summary document
is_summary = document.metadata.get("is_summary", False)
if is_summary:
# For summary documents, find the original chunk via original_chunk_id
original_chunk_id = document.metadata.get("original_chunk_id")
if not original_chunk_id:
continue
segment_id = original_chunk_id
# Track that this segment was retrieved via summary
summary_segment_ids.add(segment_id)
else:
# For normal documents, find by child chunk index_node_id
child_index_node_id = document.metadata.get("doc_id")
child_chunk_stmt = select(ChildChunk).where(ChildChunk.index_node_id == child_index_node_id)
child_chunk = session.scalar(child_chunk_stmt)
if not child_chunk:
continue
segment_id = child_chunk.segment_id
if not child_chunk:
continue
segment_id = child_chunk.segment_id
if not segment_id:
continue
@ -489,16 +503,34 @@ class RetrievalService:
if segment:
segment_file_map[segment.id] = [attachment_info]
else:
index_node_id = document.metadata.get("doc_id")
if not index_node_id:
continue
document_segment_stmt = select(DocumentSegment).where(
DocumentSegment.dataset_id == dataset_document.dataset_id,
DocumentSegment.enabled == True,
DocumentSegment.status == "completed",
DocumentSegment.index_node_id == index_node_id,
)
segment = session.scalar(document_segment_stmt)
# Check if this is a summary document
is_summary = document.metadata.get("is_summary", False)
if is_summary:
# For summary documents, find the original chunk via original_chunk_id
original_chunk_id = document.metadata.get("original_chunk_id")
if not original_chunk_id:
continue
# Track that this segment was retrieved via summary
summary_segment_ids.add(original_chunk_id)
document_segment_stmt = select(DocumentSegment).where(
DocumentSegment.dataset_id == dataset_document.dataset_id,
DocumentSegment.enabled == True,
DocumentSegment.status == "completed",
DocumentSegment.id == original_chunk_id,
)
segment = session.scalar(document_segment_stmt)
else:
# For normal documents, find by index_node_id
index_node_id = document.metadata.get("doc_id")
if not index_node_id:
continue
document_segment_stmt = select(DocumentSegment).where(
DocumentSegment.dataset_id == dataset_document.dataset_id,
DocumentSegment.enabled == True,
DocumentSegment.status == "completed",
DocumentSegment.index_node_id == index_node_id,
)
segment = session.scalar(document_segment_stmt)
if not segment:
continue
@ -526,6 +558,23 @@ class RetrievalService:
if record["segment"].id in segment_file_map:
record["files"] = segment_file_map[record["segment"].id] # type: ignore[assignment]
# Batch query summaries for segments retrieved via summary (only enabled summaries)
if summary_segment_ids:
from models.dataset import DocumentSegmentSummary
summaries = (
session.query(DocumentSegmentSummary)
.filter(
DocumentSegmentSummary.chunk_id.in_(summary_segment_ids),
DocumentSegmentSummary.status == "completed",
DocumentSegmentSummary.enabled == True, # Only retrieve enabled summaries
)
.all()
)
for summary in summaries:
if summary.summary_content:
segment_summary_map[summary.chunk_id] = summary.summary_content
result = []
for record in records:
# Extract segment
@ -549,9 +598,16 @@ class RetrievalService:
else None
)
# Extract summary if this segment was retrieved via summary
summary_content = segment_summary_map.get(segment.id)
# Create RetrievalSegments object
retrieval_segment = RetrievalSegments(
segment=segment, child_chunks=child_chunks, score=score, files=files
segment=segment,
child_chunks=child_chunks,
score=score,
files=files,
summary=summary_content,
)
result.append(retrieval_segment)

View File

@ -20,3 +20,4 @@ class RetrievalSegments(BaseModel):
child_chunks: list[RetrievalChildChunk] | None = None
score: float | None = None
files: list[dict[str, str | int]] | None = None
summary: str | None = None # Summary content if retrieved via summary index

View File

@ -13,6 +13,7 @@ from urllib.parse import unquote, urlparse
import httpx
from configs import dify_config
from core.entities.knowledge_entities import PreviewDetail
from core.helper import ssrf_proxy
from core.rag.extractor.entity.extract_setting import ExtractSetting
from core.rag.index_processor.constant.doc_type import DocType
@ -45,6 +46,15 @@ class BaseIndexProcessor(ABC):
def transform(self, documents: list[Document], current_user: Account | None = None, **kwargs) -> list[Document]:
raise NotImplementedError
@abstractmethod
def generate_summary_preview(self, tenant_id: str, preview_texts: list[PreviewDetail], summary_index_setting: dict) -> list[PreviewDetail]:
"""
For each segment in preview_texts, generate a summary using LLM and attach it to the segment.
The summary can be stored in a new attribute, e.g., summary.
This method should be implemented by subclasses.
"""
raise NotImplementedError
@abstractmethod
def load(
self,

View File

@ -1,9 +1,13 @@
"""Paragraph index processor."""
import logging
import uuid
from collections.abc import Mapping
from typing import Any
logger = logging.getLogger(__name__)
from core.entities.knowledge_entities import PreviewDetail
from core.rag.cleaner.clean_processor import CleanProcessor
from core.rag.datasource.keyword.keyword_factory import Keyword
from core.rag.datasource.retrieval_service import RetrievalService
@ -17,12 +21,19 @@ from core.rag.index_processor.index_processor_base import BaseIndexProcessor
from core.rag.models.document import AttachmentDocument, Document, MultimodalGeneralStructureChunk
from core.rag.retrieval.retrieval_methods import RetrievalMethod
from core.tools.utils.text_processing_utils import remove_leading_symbols
from extensions.ext_database import db
from libs import helper
from models.account import Account
from models.dataset import Dataset, DatasetProcessRule
from models.dataset import Dataset, DatasetProcessRule, DocumentSegment
from models.dataset import Document as DatasetDocument
from services.account_service import AccountService
from services.entities.knowledge_entities.knowledge_entities import Rule
from services.summary_index_service import SummaryIndexService
from core.llm_generator.prompts import DEFAULT_GENERATOR_SUMMARY_PROMPT
from core.model_runtime.entities.message_entities import UserPromptMessage
from core.model_runtime.entities.model_entities import ModelType
from core.provider_manager import ProviderManager
from core.model_manager import ModelInstance
class ParagraphIndexProcessor(BaseIndexProcessor):
@ -108,6 +119,29 @@ class ParagraphIndexProcessor(BaseIndexProcessor):
keyword.add_texts(documents)
def clean(self, dataset: Dataset, node_ids: list[str] | None, with_keywords: bool = True, **kwargs):
# Note: Summary indexes are now disabled (not deleted) when segments are disabled.
# This method is called for actual deletion scenarios (e.g., when segment is deleted).
# For disable operations, disable_summaries_for_segments is called directly in the task.
# Only delete summaries if explicitly requested (e.g., when segment is actually deleted)
delete_summaries = kwargs.get("delete_summaries", False)
if delete_summaries:
if node_ids:
# Find segments by index_node_id
segments = (
db.session.query(DocumentSegment)
.filter(
DocumentSegment.dataset_id == dataset.id,
DocumentSegment.index_node_id.in_(node_ids),
)
.all()
)
segment_ids = [segment.id for segment in segments]
if segment_ids:
SummaryIndexService.delete_summaries_for_segments(dataset, segment_ids)
else:
# Delete all summaries for the dataset
SummaryIndexService.delete_summaries_for_segments(dataset, None)
if dataset.indexing_technique == "high_quality":
vector = Vector(dataset)
if node_ids:
@ -227,3 +261,70 @@ class ParagraphIndexProcessor(BaseIndexProcessor):
}
else:
raise ValueError("Chunks is not a list")
def generate_summary_preview(self, tenant_id: str, preview_texts: list[PreviewDetail], summary_index_setting: dict) -> list[PreviewDetail]:
"""
For each segment, concurrently call generate_summary to generate a summary
and write it to the summary attribute of PreviewDetail.
"""
import concurrent.futures
from flask import current_app
# Capture Flask app context for worker threads
flask_app = None
try:
flask_app = current_app._get_current_object() # type: ignore
except RuntimeError:
logger.warning("No Flask application context available, summary generation may fail")
def process(preview: PreviewDetail) -> None:
"""Generate summary for a single preview item."""
try:
if flask_app:
# Ensure Flask app context in worker thread
with flask_app.app_context():
summary = self.generate_summary(tenant_id, preview.content, summary_index_setting)
preview.summary = summary
else:
# Fallback: try without app context (may fail)
summary = self.generate_summary(tenant_id, preview.content, summary_index_setting)
preview.summary = summary
except Exception as e:
logger.error(f"Failed to generate summary for preview: {str(e)}")
# Don't fail the entire preview if summary generation fails
preview.summary = None
with concurrent.futures.ThreadPoolExecutor() as executor:
list(executor.map(process, preview_texts))
return preview_texts
@staticmethod
def generate_summary(tenant_id: str, text: str, summary_index_setting: dict = None) -> str:
"""
Generate summary for the given text using ModelInstance.invoke_llm and the default or custom summary prompt.
"""
if not summary_index_setting or not summary_index_setting.get("enable"):
raise ValueError("summary_index_setting is required and must be enabled to generate summary.")
model_name = summary_index_setting.get("model_name")
model_provider_name = summary_index_setting.get("model_provider_name")
summary_prompt = summary_index_setting.get("summary_prompt")
# Import default summary prompt
if not summary_prompt:
summary_prompt = DEFAULT_GENERATOR_SUMMARY_PROMPT
prompt = f"{summary_prompt}\n{text}"
provider_manager = ProviderManager()
provider_model_bundle = provider_manager.get_provider_model_bundle(tenant_id, model_provider_name, ModelType.LLM)
model_instance = ModelInstance(provider_model_bundle, model_name)
prompt_messages = [UserPromptMessage(content=prompt)]
result = model_instance.invoke_llm(
prompt_messages=prompt_messages,
model_parameters={},
stream=False
)
return getattr(result.message, "content", "")

View File

@ -25,6 +25,7 @@ from models.dataset import ChildChunk, Dataset, DatasetProcessRule, DocumentSegm
from models.dataset import Document as DatasetDocument
from services.account_service import AccountService
from services.entities.knowledge_entities.knowledge_entities import ParentMode, Rule
from services.summary_index_service import SummaryIndexService
class ParentChildIndexProcessor(BaseIndexProcessor):
@ -135,6 +136,29 @@ class ParentChildIndexProcessor(BaseIndexProcessor):
def clean(self, dataset: Dataset, node_ids: list[str] | None, with_keywords: bool = True, **kwargs):
# node_ids is segment's node_ids
# Note: Summary indexes are now disabled (not deleted) when segments are disabled.
# This method is called for actual deletion scenarios (e.g., when segment is deleted).
# For disable operations, disable_summaries_for_segments is called directly in the task.
# Only delete summaries if explicitly requested (e.g., when segment is actually deleted)
delete_summaries = kwargs.get("delete_summaries", False)
if delete_summaries:
if node_ids:
# Find segments by index_node_id
segments = (
db.session.query(DocumentSegment)
.filter(
DocumentSegment.dataset_id == dataset.id,
DocumentSegment.index_node_id.in_(node_ids),
)
.all()
)
segment_ids = [segment.id for segment in segments]
if segment_ids:
SummaryIndexService.delete_summaries_for_segments(dataset, segment_ids)
else:
# Delete all summaries for the dataset
SummaryIndexService.delete_summaries_for_segments(dataset, None)
if dataset.indexing_technique == "high_quality":
delete_child_chunks = kwargs.get("delete_child_chunks") or False
precomputed_child_node_ids = kwargs.get("precomputed_child_node_ids")

View File

@ -25,9 +25,10 @@ from core.rag.retrieval.retrieval_methods import RetrievalMethod
from core.tools.utils.text_processing_utils import remove_leading_symbols
from libs import helper
from models.account import Account
from models.dataset import Dataset
from models.dataset import Dataset, DocumentSegment
from models.dataset import Document as DatasetDocument
from services.entities.knowledge_entities.knowledge_entities import Rule
from services.summary_index_service import SummaryIndexService
logger = logging.getLogger(__name__)
@ -144,6 +145,30 @@ class QAIndexProcessor(BaseIndexProcessor):
vector.create_multimodal(multimodal_documents)
def clean(self, dataset: Dataset, node_ids: list[str] | None, with_keywords: bool = True, **kwargs):
# Note: Summary indexes are now disabled (not deleted) when segments are disabled.
# This method is called for actual deletion scenarios (e.g., when segment is deleted).
# For disable operations, disable_summaries_for_segments is called directly in the task.
# Note: qa_model doesn't generate summaries, but we clean them for completeness
# Only delete summaries if explicitly requested (e.g., when segment is actually deleted)
delete_summaries = kwargs.get("delete_summaries", False)
if delete_summaries:
if node_ids:
# Find segments by index_node_id
segments = (
db.session.query(DocumentSegment)
.filter(
DocumentSegment.dataset_id == dataset.id,
DocumentSegment.index_node_id.in_(node_ids),
)
.all()
)
segment_ids = [segment.id for segment in segments]
if segment_ids:
SummaryIndexService.delete_summaries_for_segments(dataset, segment_ids)
else:
# Delete all summaries for the dataset
SummaryIndexService.delete_summaries_for_segments(dataset, None)
vector = Vector(dataset)
if node_ids:
vector.delete_by_ids(node_ids)

View File

@ -62,6 +62,21 @@ class DocumentExtractorNode(Node[DocumentExtractorNodeData]):
inputs = {"variable_selector": variable_selector}
process_data = {"documents": value if isinstance(value, list) else [value]}
# Ensure storage_key is loaded for File objects
files_to_check = value if isinstance(value, list) else [value]
files_needing_storage_key = [
f for f in files_to_check
if isinstance(f, File) and not f.storage_key and f.related_id
]
if files_needing_storage_key:
from factories.file_factory import StorageKeyLoader
from extensions.ext_database import db
from sqlalchemy.orm import Session
with Session(bind=db.engine) as session:
storage_key_loader = StorageKeyLoader(session, tenant_id=self.tenant_id)
storage_key_loader.load_storage_keys(files_needing_storage_key)
try:
if isinstance(value, list):
extracted_text_list = list(map(_extract_text_from_file, value))
@ -415,6 +430,15 @@ def _download_file_content(file: File) -> bytes:
response.raise_for_status()
return response.content
else:
# Check if storage_key is set
if not file.storage_key:
raise FileDownloadError(f"File storage_key is missing for file: {file.filename}")
# Check if file exists before downloading
from extensions.ext_storage import storage
if not storage.exists(file.storage_key):
raise FileDownloadError(f"File not found in storage: {file.storage_key}")
return file_manager.download(file)
except Exception as e:
raise FileDownloadError(f"Error downloading file: {str(e)}") from e

View File

@ -158,3 +158,5 @@ class KnowledgeIndexNodeData(BaseNodeData):
type: str = "knowledge-index"
chunk_structure: str
index_chunk_variable_selector: list[str]
indexing_technique: str | None = None
summary_index_setting: dict | None = None

View File

@ -1,9 +1,11 @@
import concurrent.futures
import datetime
import logging
import time
from collections.abc import Mapping
from typing import Any
from flask import current_app
from sqlalchemy import func, select
from core.app.entities.app_invoke_entities import InvokeFrom
@ -16,7 +18,9 @@ from core.workflow.nodes.base.node import Node
from core.workflow.nodes.base.template import Template
from core.workflow.runtime import VariablePool
from extensions.ext_database import db
from models.dataset import Dataset, Document, DocumentSegment
from models.dataset import Dataset, Document, DocumentSegment, DocumentSegmentSummary
from services.summary_index_service import SummaryIndexService
from tasks.generate_summary_index_task import generate_summary_index_task
from .entities import KnowledgeIndexNodeData
from .exc import (
@ -67,7 +71,18 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]):
# index knowledge
try:
if is_preview:
outputs = self._get_preview_output(node_data.chunk_structure, chunks)
# Preview mode: generate summaries for chunks directly without saving to database
# Format preview and generate summaries on-the-fly
# Get indexing_technique and summary_index_setting from node_data (workflow graph config)
# or fallback to dataset if not available in node_data
indexing_technique = node_data.indexing_technique or dataset.indexing_technique
summary_index_setting = node_data.summary_index_setting or dataset.summary_index_setting
outputs = self._get_preview_output_with_summaries(
node_data.chunk_structure, chunks, dataset=dataset,
indexing_technique=indexing_technique,
summary_index_setting=summary_index_setting
)
return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
inputs=variables,
@ -163,6 +178,9 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]):
db.session.commit()
# Generate summary index if enabled
self._handle_summary_index_generation(dataset, document, variable_pool)
return {
"dataset_id": ds_id_value,
"dataset_name": dataset_name_value,
@ -173,9 +191,269 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]):
"display_status": "completed",
}
def _get_preview_output(self, chunk_structure: str, chunks: Any) -> Mapping[str, Any]:
def _handle_summary_index_generation(
self,
dataset: Dataset,
document: Document,
variable_pool: VariablePool,
) -> None:
"""
Handle summary index generation based on mode (debug/preview or production).
Args:
dataset: Dataset containing the document
document: Document to generate summaries for
variable_pool: Variable pool to check invoke_from
"""
# Only generate summary index for high_quality indexing technique
if dataset.indexing_technique != "high_quality":
return
# Check if summary index is enabled
summary_index_setting = dataset.summary_index_setting
if not summary_index_setting or not summary_index_setting.get("enable"):
return
# Skip qa_model documents
if document.doc_form == "qa_model":
return
# Determine if in preview/debug mode
invoke_from = variable_pool.get(["sys", SystemVariableKey.INVOKE_FROM])
is_preview = invoke_from and invoke_from.value == InvokeFrom.DEBUGGER
# Determine if only parent chunks should be processed
only_parent_chunks = dataset.chunk_structure == "parent_child_index"
if is_preview:
try:
# Query segments that need summary generation
query = db.session.query(DocumentSegment).filter_by(
dataset_id=dataset.id,
document_id=document.id,
status="completed",
enabled=True,
)
segments = query.all()
if not segments:
logger.info(f"No segments found for document {document.id}")
return
# Filter segments based on mode
segments_to_process = []
for segment in segments:
# Skip if summary already exists
existing_summary = (
db.session.query(DocumentSegmentSummary)
.filter_by(chunk_id=segment.id, dataset_id=dataset.id, status="completed")
.first()
)
if existing_summary:
continue
# For parent-child mode, all segments are parent chunks, so process all
segments_to_process.append(segment)
if not segments_to_process:
logger.info(f"No segments need summary generation for document {document.id}")
return
# Use ThreadPoolExecutor for concurrent generation
flask_app = current_app._get_current_object() # type: ignore
max_workers = min(10, len(segments_to_process)) # Limit to 10 workers
def process_segment(segment: DocumentSegment) -> None:
"""Process a single segment in a thread with Flask app context."""
with flask_app.app_context():
try:
SummaryIndexService.generate_and_vectorize_summary(
segment, dataset, summary_index_setting
)
except Exception as e:
logger.error(f"Failed to generate summary for segment {segment.id}: {str(e)}")
# Continue processing other segments
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [
executor.submit(process_segment, segment) for segment in segments_to_process
]
# Wait for all tasks to complete
concurrent.futures.wait(futures)
logger.info(
f"Successfully generated summary index for {len(segments_to_process)} segments "
f"in document {document.id}"
)
except Exception as e:
logger.exception(f"Failed to generate summary index for document {document.id}: {str(e)}")
# Don't fail the entire indexing process if summary generation fails
else:
# Production mode: asynchronous generation
logger.info(f"Queuing summary index generation task for document {document.id} (production mode)")
try:
generate_summary_index_task.delay(dataset.id, document.id, None)
logger.info(f"Summary index generation task queued for document {document.id}")
except Exception as e:
logger.exception(f"Failed to queue summary index generation task for document {document.id}: {str(e)}")
# Don't fail the entire indexing process if task queuing fails
def _get_preview_output_with_summaries(
self, chunk_structure: str, chunks: Any, dataset: Dataset,
indexing_technique: str | None = None,
summary_index_setting: dict | None = None
) -> Mapping[str, Any]:
"""
Generate preview output with summaries for chunks in preview mode.
This method generates summaries on-the-fly without saving to database.
Args:
chunk_structure: Chunk structure type
chunks: Chunks to generate preview for
dataset: Dataset object (for tenant_id)
indexing_technique: Indexing technique from node config or dataset
summary_index_setting: Summary index setting from node config or dataset
"""
index_processor = IndexProcessorFactory(chunk_structure).init_index_processor()
return index_processor.format_preview(chunks)
preview_output = index_processor.format_preview(chunks)
# Check if summary index is enabled
if indexing_technique != "high_quality":
return preview_output
if not summary_index_setting or not summary_index_setting.get("enable"):
return preview_output
# Generate summaries for chunks
if "preview" in preview_output and isinstance(preview_output["preview"], list):
chunk_count = len(preview_output["preview"])
logger.info(
f"Generating summaries for {chunk_count} chunks in preview mode "
f"(dataset: {dataset.id})"
)
# Use ParagraphIndexProcessor's generate_summary method
from core.rag.index_processor.processor.paragraph_index_processor import ParagraphIndexProcessor
# Get Flask app for application context in worker threads
flask_app = None
try:
flask_app = current_app._get_current_object() # type: ignore
except RuntimeError:
logger.warning("No Flask application context available, summary generation may fail")
def generate_summary_for_chunk(preview_item: dict) -> None:
"""Generate summary for a single chunk."""
if "content" in preview_item:
try:
# Set Flask application context in worker thread
if flask_app:
with flask_app.app_context():
summary = ParagraphIndexProcessor.generate_summary(
tenant_id=dataset.tenant_id,
text=preview_item["content"],
summary_index_setting=summary_index_setting,
)
if summary:
preview_item["summary"] = summary
else:
# Fallback: try without app context (may fail)
summary = ParagraphIndexProcessor.generate_summary(
tenant_id=dataset.tenant_id,
text=preview_item["content"],
summary_index_setting=summary_index_setting,
)
if summary:
preview_item["summary"] = summary
except Exception as e:
logger.error(f"Failed to generate summary for chunk: {str(e)}")
# Don't fail the entire preview if summary generation fails
# Generate summaries concurrently using ThreadPoolExecutor
# Set a reasonable timeout to prevent hanging (60 seconds per chunk, max 5 minutes total)
timeout_seconds = min(300, 60 * len(preview_output["preview"]))
with concurrent.futures.ThreadPoolExecutor(max_workers=min(10, len(preview_output["preview"]))) as executor:
futures = [
executor.submit(generate_summary_for_chunk, preview_item)
for preview_item in preview_output["preview"]
]
# Wait for all tasks to complete with timeout
done, not_done = concurrent.futures.wait(futures, timeout=timeout_seconds)
# Cancel tasks that didn't complete in time
if not_done:
logger.warning(
f"Summary generation timeout: {len(not_done)} chunks did not complete within {timeout_seconds}s. "
"Cancelling remaining tasks..."
)
for future in not_done:
future.cancel()
# Wait a bit for cancellation to take effect
concurrent.futures.wait(not_done, timeout=5)
completed_count = sum(1 for item in preview_output["preview"] if item.get("summary") is not None)
logger.info(
f"Completed summary generation for preview chunks: {completed_count}/{len(preview_output['preview'])} succeeded"
)
return preview_output
def _get_preview_output(
self, chunk_structure: str, chunks: Any, dataset: Dataset | None = None, variable_pool: VariablePool | None = None
) -> Mapping[str, Any]:
index_processor = IndexProcessorFactory(chunk_structure).init_index_processor()
preview_output = index_processor.format_preview(chunks)
# If dataset is provided, try to enrich preview with summaries
if dataset and variable_pool:
document_id = variable_pool.get(["sys", SystemVariableKey.DOCUMENT_ID])
if document_id:
document = db.session.query(Document).filter_by(id=document_id.value).first()
if document:
# Query summaries for this document
summaries = (
db.session.query(DocumentSegmentSummary)
.filter_by(
dataset_id=dataset.id,
document_id=document.id,
status="completed",
enabled=True,
)
.all()
)
if summaries:
# Create a map of segment content to summary for matching
# Use content matching as chunks in preview might not be indexed yet
summary_by_content = {}
for summary in summaries:
segment = (
db.session.query(DocumentSegment)
.filter_by(id=summary.chunk_id, dataset_id=dataset.id)
.first()
)
if segment:
# Normalize content for matching (strip whitespace)
normalized_content = segment.content.strip()
summary_by_content[normalized_content] = summary.summary_content
# Enrich preview with summaries by content matching
if "preview" in preview_output and isinstance(preview_output["preview"], list):
matched_count = 0
for preview_item in preview_output["preview"]:
if "content" in preview_item:
# Normalize content for matching
normalized_chunk_content = preview_item["content"].strip()
if normalized_chunk_content in summary_by_content:
preview_item["summary"] = summary_by_content[normalized_chunk_content]
matched_count += 1
if matched_count > 0:
logger.info(
f"Enriched preview with {matched_count} existing summaries "
f"(dataset: {dataset.id}, document: {document.id})"
)
return preview_output
@classmethod
def version(cls) -> str:

View File

@ -99,6 +99,8 @@ def init_app(app: DifyApp) -> Celery:
imports = [
"tasks.async_workflow_tasks", # trigger workers
"tasks.trigger_processing_tasks", # async trigger processing
"tasks.generate_summary_index_task", # summary index generation
"tasks.regenerate_summary_index_task", # summary index regeneration
]
day = dify_config.CELERY_BEAT_SCHEDULER_TIME

View File

@ -39,6 +39,14 @@ dataset_retrieval_model_fields = {
"score_threshold_enabled": fields.Boolean,
"score_threshold": fields.Float,
}
dataset_summary_index_fields = {
"enable": fields.Boolean,
"model_name": fields.String,
"model_provider_name": fields.String,
"summary_prompt": fields.String,
}
external_retrieval_model_fields = {
"top_k": fields.Integer,
"score_threshold": fields.Float,
@ -83,6 +91,7 @@ dataset_detail_fields = {
"embedding_model_provider": fields.String,
"embedding_available": fields.Boolean,
"retrieval_model_dict": fields.Nested(dataset_retrieval_model_fields),
"summary_index_setting": fields.Nested(dataset_summary_index_fields),
"tags": fields.List(fields.Nested(tag_fields)),
"doc_form": fields.String,
"external_knowledge_info": fields.Nested(external_knowledge_info_fields),

View File

@ -33,6 +33,8 @@ document_fields = {
"hit_count": fields.Integer,
"doc_form": fields.String,
"doc_metadata": fields.List(fields.Nested(document_metadata_fields), attribute="doc_metadata_details"),
"summary_index_status": fields.String, # Summary index generation status: "waiting", "generating", "completed", "partial_error", or null if not enabled
"need_summary": fields.Boolean, # Whether this document needs summary index generation
}
document_with_segments_fields = {
@ -60,6 +62,8 @@ document_with_segments_fields = {
"completed_segments": fields.Integer,
"total_segments": fields.Integer,
"doc_metadata": fields.List(fields.Nested(document_metadata_fields), attribute="doc_metadata_details"),
"summary_index_status": fields.String, # Summary index generation status: "waiting", "generating", "completed", "partial_error", or null if not enabled
"need_summary": fields.Boolean, # Whether this document needs summary index generation
}
dataset_and_document_fields = {

View File

@ -58,4 +58,5 @@ hit_testing_record_fields = {
"score": fields.Float,
"tsne_position": fields.Raw,
"files": fields.List(fields.Nested(files_fields)),
"summary": fields.String, # Summary content if retrieved via summary index
}

View File

@ -49,4 +49,5 @@ segment_fields = {
"stopped_at": TimestampField,
"child_chunks": fields.List(fields.Nested(child_chunk_fields)),
"attachments": fields.List(fields.Nested(attachment_fields)),
"summary": fields.String, # Summary content for the segment
}

View File

@ -0,0 +1,69 @@
"""add SummaryIndex feature
Revision ID: 562dcce7d77c
Revises: 03ea244985ce
Create Date: 2026-01-12 13:58:40.584802
"""
from alembic import op
import models as models
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '562dcce7d77c'
down_revision = '03ea244985ce'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('document_segment_summary',
sa.Column('id', models.types.StringUUID(), nullable=False),
sa.Column('dataset_id', models.types.StringUUID(), nullable=False),
sa.Column('document_id', models.types.StringUUID(), nullable=False),
sa.Column('chunk_id', models.types.StringUUID(), nullable=False),
sa.Column('summary_content', models.types.LongText(), nullable=True),
sa.Column('summary_index_node_id', sa.String(length=255), nullable=True),
sa.Column('summary_index_node_hash', sa.String(length=255), nullable=True),
sa.Column('status', sa.String(length=32), server_default=sa.text("'generating'"), nullable=False),
sa.Column('error', models.types.LongText(), nullable=True),
sa.Column('enabled', sa.Boolean(), server_default=sa.text('true'), nullable=False),
sa.Column('disabled_at', sa.DateTime(), nullable=True),
sa.Column('disabled_by', models.types.StringUUID(), nullable=True),
sa.Column('created_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
sa.Column('updated_at', sa.DateTime(), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False),
sa.PrimaryKeyConstraint('id', name='document_segment_summary_pkey')
)
with op.batch_alter_table('document_segment_summary', schema=None) as batch_op:
batch_op.create_index('document_segment_summary_chunk_id_idx', ['chunk_id'], unique=False)
batch_op.create_index('document_segment_summary_dataset_id_idx', ['dataset_id'], unique=False)
batch_op.create_index('document_segment_summary_document_id_idx', ['document_id'], unique=False)
batch_op.create_index('document_segment_summary_status_idx', ['status'], unique=False)
with op.batch_alter_table('datasets', schema=None) as batch_op:
batch_op.add_column(sa.Column('summary_index_setting', models.types.AdjustedJSON(), nullable=True))
with op.batch_alter_table('documents', schema=None) as batch_op:
batch_op.add_column(sa.Column('need_summary', sa.Boolean(), server_default=sa.text('false'), nullable=True))
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('documents', schema=None) as batch_op:
batch_op.drop_column('need_summary')
with op.batch_alter_table('datasets', schema=None) as batch_op:
batch_op.drop_column('summary_index_setting')
with op.batch_alter_table('document_segment_summary', schema=None) as batch_op:
batch_op.drop_index('document_segment_summary_status_idx')
batch_op.drop_index('document_segment_summary_document_id_idx')
batch_op.drop_index('document_segment_summary_dataset_id_idx')
batch_op.drop_index('document_segment_summary_chunk_id_idx')
op.drop_table('document_segment_summary')
# ### end Alembic commands ###

View File

@ -72,6 +72,7 @@ class Dataset(Base):
keyword_number = mapped_column(sa.Integer, nullable=True, server_default=sa.text("10"))
collection_binding_id = mapped_column(StringUUID, nullable=True)
retrieval_model = mapped_column(AdjustedJSON, nullable=True)
summary_index_setting = mapped_column(AdjustedJSON, nullable=True)
built_in_field_enabled = mapped_column(sa.Boolean, nullable=False, server_default=sa.text("false"))
icon_info = mapped_column(AdjustedJSON, nullable=True)
runtime_mode = mapped_column(sa.String(255), nullable=True, server_default=sa.text("'general'"))
@ -419,6 +420,7 @@ class Document(Base):
doc_metadata = mapped_column(AdjustedJSON, nullable=True)
doc_form = mapped_column(String(255), nullable=False, server_default=sa.text("'text_model'"))
doc_language = mapped_column(String(255), nullable=True)
need_summary: Mapped[bool | None] = mapped_column(sa.Boolean, nullable=True, server_default=sa.text("false"))
DATA_SOURCES = ["upload_file", "notion_import", "website_crawl"]
@ -1567,3 +1569,34 @@ class SegmentAttachmentBinding(Base):
segment_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
attachment_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
created_at: Mapped[datetime] = mapped_column(sa.DateTime, nullable=False, server_default=func.current_timestamp())
class DocumentSegmentSummary(Base):
__tablename__ = "document_segment_summary"
__table_args__ = (
sa.PrimaryKeyConstraint("id", name="document_segment_summary_pkey"),
sa.Index("document_segment_summary_dataset_id_idx", "dataset_id"),
sa.Index("document_segment_summary_document_id_idx", "document_id"),
sa.Index("document_segment_summary_chunk_id_idx", "chunk_id"),
sa.Index("document_segment_summary_status_idx", "status"),
)
id: Mapped[str] = mapped_column(StringUUID, nullable=False, default=lambda: str(uuid4()))
dataset_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
document_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
# corresponds to DocumentSegment.id or parent chunk id
chunk_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
summary_content: Mapped[str] = mapped_column(LongText, nullable=True)
summary_index_node_id: Mapped[str] = mapped_column(String(255), nullable=True)
summary_index_node_hash: Mapped[str] = mapped_column(String(255), nullable=True)
status: Mapped[str] = mapped_column(String(32), nullable=False, server_default=sa.text("'generating'"))
error: Mapped[str] = mapped_column(LongText, nullable=True)
enabled: Mapped[bool] = mapped_column(sa.Boolean, nullable=False, server_default=sa.text("true"))
disabled_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
disabled_by = mapped_column(StringUUID, nullable=True)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
updated_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp())
def __repr__(self):
return f"<DocumentSegmentSummary id={self.id} chunk_id={self.chunk_id} status={self.status}>"

View File

@ -89,6 +89,7 @@ from tasks.enable_segments_to_index_task import enable_segments_to_index_task
from tasks.recover_document_indexing_task import recover_document_indexing_task
from tasks.remove_document_from_index_task import remove_document_from_index_task
from tasks.retry_document_indexing_task import retry_document_indexing_task
from tasks.regenerate_summary_index_task import regenerate_summary_index_task
from tasks.sync_website_document_indexing_task import sync_website_document_indexing_task
logger = logging.getLogger(__name__)
@ -473,6 +474,11 @@ class DatasetService:
if external_retrieval_model:
dataset.retrieval_model = external_retrieval_model
# Update summary index setting if provided
summary_index_setting = data.get("summary_index_setting", None)
if summary_index_setting is not None:
dataset.summary_index_setting = summary_index_setting
# Update basic dataset properties
dataset.name = data.get("name", dataset.name)
dataset.description = data.get("description", dataset.description)
@ -555,12 +561,20 @@ class DatasetService:
# Handle indexing technique changes and embedding model updates
action = DatasetService._handle_indexing_technique_change(dataset, data, filtered_data)
# Check if summary_index_setting model changed (before updating database)
summary_model_changed = DatasetService._check_summary_index_setting_model_changed(
dataset, data
)
# Add metadata fields
filtered_data["updated_by"] = user.id
filtered_data["updated_at"] = naive_utc_now()
# update Retrieval model
if data.get("retrieval_model"):
filtered_data["retrieval_model"] = data["retrieval_model"]
# update summary index setting
if data.get("summary_index_setting"):
filtered_data["summary_index_setting"] = data.get("summary_index_setting")
# update icon info
if data.get("icon_info"):
filtered_data["icon_info"] = data.get("icon_info")
@ -569,12 +583,30 @@ class DatasetService:
db.session.query(Dataset).filter_by(id=dataset.id).update(filtered_data)
db.session.commit()
# Reload dataset to get updated values
db.session.refresh(dataset)
# update pipeline knowledge base node data
DatasetService._update_pipeline_knowledge_base_node_data(dataset, user.id)
# Trigger vector index task if indexing technique changed
if action:
deal_dataset_vector_index_task.delay(dataset.id, action)
# If embedding_model changed, also regenerate summary vectors
if action == "update":
regenerate_summary_index_task.delay(
dataset.id,
regenerate_reason="embedding_model_changed",
regenerate_vectors_only=True,
)
# Trigger summary index regeneration if summary model changed
if summary_model_changed:
regenerate_summary_index_task.delay(
dataset.id,
regenerate_reason="summary_model_changed",
regenerate_vectors_only=False,
)
return dataset
@ -613,6 +645,7 @@ class DatasetService:
knowledge_index_node_data["chunk_structure"] = dataset.chunk_structure
knowledge_index_node_data["indexing_technique"] = dataset.indexing_technique # pyright: ignore[reportAttributeAccessIssue]
knowledge_index_node_data["keyword_number"] = dataset.keyword_number
knowledge_index_node_data["summary_index_setting"] = dataset.summary_index_setting
node["data"] = knowledge_index_node_data
updated = True
except Exception:
@ -851,6 +884,49 @@ class DatasetService:
)
filtered_data["collection_binding_id"] = dataset_collection_binding.id
@staticmethod
def _check_summary_index_setting_model_changed(dataset: Dataset, data: dict[str, Any]) -> bool:
"""
Check if summary_index_setting model (model_name or model_provider_name) has changed.
Args:
dataset: Current dataset object
data: Update data dictionary
Returns:
bool: True if summary model changed, False otherwise
"""
# Check if summary_index_setting is being updated
if "summary_index_setting" not in data or data.get("summary_index_setting") is None:
return False
new_summary_setting = data.get("summary_index_setting")
old_summary_setting = dataset.summary_index_setting
# If old setting doesn't exist or is disabled, no need to regenerate
if not old_summary_setting or not old_summary_setting.get("enable"):
return False
# If new setting is disabled, no need to regenerate
if not new_summary_setting or not new_summary_setting.get("enable"):
return False
# Compare model_name and model_provider_name
old_model_name = old_summary_setting.get("model_name")
old_model_provider = old_summary_setting.get("model_provider_name")
new_model_name = new_summary_setting.get("model_name")
new_model_provider = new_summary_setting.get("model_provider_name")
# Check if model changed
if old_model_name != new_model_name or old_model_provider != new_model_provider:
logger.info(
f"Summary index setting model changed for dataset {dataset.id}: "
f"old={old_model_provider}/{old_model_name}, new={new_model_provider}/{new_model_name}"
)
return True
return False
@staticmethod
def update_rag_pipeline_dataset_settings(
session: Session, dataset: Dataset, knowledge_configuration: KnowledgeConfiguration, has_published: bool = False
@ -1823,6 +1899,8 @@ class DocumentService:
DuplicateDocumentIndexingTaskProxy(
dataset.tenant_id, dataset.id, duplicate_document_ids
).delay()
# Note: Summary index generation is triggered in document_indexing_task after indexing completes
# to ensure segments are available. See tasks/document_indexing_task.py
except LockNotOwnedError:
pass
@ -2127,6 +2205,14 @@ class DocumentService:
name: str,
batch: str,
):
# Set need_summary based on dataset's summary_index_setting
need_summary = False
if (
dataset.summary_index_setting
and dataset.summary_index_setting.get("enable") is True
):
need_summary = True
document = Document(
tenant_id=dataset.tenant_id,
dataset_id=dataset.id,
@ -2140,6 +2226,7 @@ class DocumentService:
created_by=account.id,
doc_form=document_form,
doc_language=document_language,
need_summary=need_summary,
)
doc_metadata = {}
if dataset.built_in_field_enabled:
@ -2364,6 +2451,7 @@ class DocumentService:
embedding_model_provider=knowledge_config.embedding_model_provider,
collection_binding_id=dataset_collection_binding_id,
retrieval_model=retrieval_model.model_dump() if retrieval_model else None,
summary_index_setting=knowledge_config.summary_index_setting,
is_multimodal=knowledge_config.is_multimodal,
)
@ -2545,6 +2633,14 @@ class DocumentService:
if not isinstance(args["process_rule"]["rules"]["segmentation"]["max_tokens"], int):
raise ValueError("Process rule segmentation max_tokens is invalid")
# valid summary index setting
if args["process_rule"]["summary_index_setting"] and args["process_rule"]["summary_index_setting"]["enable"]:
summary_index_setting = args["process_rule"]["summary_index_setting"]
if "model_name" not in summary_index_setting or not summary_index_setting["model_name"]:
raise ValueError("Summary index model name is required")
if "model_provider_name" not in summary_index_setting or not summary_index_setting["model_provider_name"]:
raise ValueError("Summary index model provider name is required")
@staticmethod
def batch_update_document_status(
dataset: Dataset, document_ids: list[str], action: Literal["enable", "disable", "archive", "un_archive"], user
@ -3013,6 +3109,37 @@ class SegmentService:
if args.enabled or keyword_changed:
# update segment vector index
VectorService.update_segment_vector(args.keywords, segment, dataset)
# update summary index if summary is provided and has changed
if args.summary is not None:
# Check if summary index is enabled
has_summary_index = (
dataset.indexing_technique == "high_quality"
and dataset.summary_index_setting
and dataset.summary_index_setting.get("enable") is True
)
if has_summary_index:
# Query existing summary from database
from models.dataset import DocumentSegmentSummary
existing_summary = (
db.session.query(DocumentSegmentSummary)
.where(
DocumentSegmentSummary.chunk_id == segment.id,
DocumentSegmentSummary.dataset_id == dataset.id,
)
.first()
)
# Check if summary has changed
existing_summary_content = existing_summary.summary_content if existing_summary else None
if existing_summary_content != args.summary:
# Summary has changed, update it
from services.summary_index_service import SummaryIndexService
try:
SummaryIndexService.update_summary_for_segment(segment, dataset, args.summary)
except Exception as e:
logger.exception(f"Failed to update summary for segment {segment.id}: {str(e)}")
# Don't fail the entire update if summary update fails
else:
segment_hash = helper.generate_text_hash(content)
tokens = 0
@ -3087,6 +3214,15 @@ class SegmentService:
elif document.doc_form in (IndexStructureType.PARAGRAPH_INDEX, IndexStructureType.QA_INDEX):
# update segment vector index
VectorService.update_segment_vector(args.keywords, segment, dataset)
# update summary index if summary is provided
if args.summary is not None:
from services.summary_index_service import SummaryIndexService
try:
SummaryIndexService.update_summary_for_segment(segment, dataset, args.summary)
except Exception as e:
logger.exception(f"Failed to update summary for segment {segment.id}: {str(e)}")
# Don't fail the entire update if summary update fails
# update multimodel vector index
VectorService.update_multimodel_vector(segment, args.attachment_ids or [], dataset)
except Exception as e:

View File

@ -119,6 +119,7 @@ class KnowledgeConfig(BaseModel):
data_source: DataSource | None = None
process_rule: ProcessRule | None = None
retrieval_model: RetrievalModel | None = None
summary_index_setting: dict | None = None
doc_form: str = "text_model"
doc_language: str = "English"
embedding_model: str | None = None
@ -141,6 +142,7 @@ class SegmentUpdateArgs(BaseModel):
regenerate_child_chunks: bool = False
enabled: bool | None = None
attachment_ids: list[str] | None = None
summary: str | None = None # Summary content for summary index
class ChildChunkUpdateArgs(BaseModel):

View File

@ -0,0 +1,612 @@
"""Summary index service for generating and managing document segment summaries."""
import logging
import time
import uuid
from typing import Any
from core.rag.datasource.vdb.vector_factory import Vector
from core.rag.index_processor.constant.doc_type import DocType
from core.rag.models.document import Document
from extensions.ext_database import db
from libs import helper
from models.dataset import Dataset, DocumentSegment, DocumentSegmentSummary
from models.dataset import Document as DatasetDocument
logger = logging.getLogger(__name__)
class SummaryIndexService:
"""Service for generating and managing summary indexes."""
@staticmethod
def generate_summary_for_segment(
segment: DocumentSegment,
dataset: Dataset,
summary_index_setting: dict,
) -> str:
"""
Generate summary for a single segment.
Args:
segment: DocumentSegment to generate summary for
dataset: Dataset containing the segment
summary_index_setting: Summary index configuration
Returns:
Generated summary text
Raises:
ValueError: If summary_index_setting is invalid or generation fails
"""
# Reuse the existing generate_summary method from ParagraphIndexProcessor
# Use lazy import to avoid circular import
from core.rag.index_processor.processor.paragraph_index_processor import ParagraphIndexProcessor
summary_content = ParagraphIndexProcessor.generate_summary(
tenant_id=dataset.tenant_id,
text=segment.content,
summary_index_setting=summary_index_setting,
)
if not summary_content:
raise ValueError("Generated summary is empty")
return summary_content
@staticmethod
def create_summary_record(
segment: DocumentSegment,
dataset: Dataset,
summary_content: str,
status: str = "generating",
) -> DocumentSegmentSummary:
"""
Create or update a DocumentSegmentSummary record.
If a summary record already exists for this segment, it will be updated instead of creating a new one.
Args:
segment: DocumentSegment to create summary for
dataset: Dataset containing the segment
summary_content: Generated summary content
status: Summary status (default: "generating")
Returns:
Created or updated DocumentSegmentSummary instance
"""
# Check if summary record already exists
existing_summary = (
db.session.query(DocumentSegmentSummary)
.filter_by(chunk_id=segment.id, dataset_id=dataset.id)
.first()
)
if existing_summary:
# Update existing record
existing_summary.summary_content = summary_content
existing_summary.status = status
existing_summary.error = None # Clear any previous errors
# Re-enable if it was disabled
if not existing_summary.enabled:
existing_summary.enabled = True
existing_summary.disabled_at = None
existing_summary.disabled_by = None
db.session.add(existing_summary)
db.session.flush()
return existing_summary
else:
# Create new record (enabled by default)
summary_record = DocumentSegmentSummary(
dataset_id=dataset.id,
document_id=segment.document_id,
chunk_id=segment.id,
summary_content=summary_content,
status=status,
enabled=True, # Explicitly set enabled to True
)
db.session.add(summary_record)
db.session.flush()
return summary_record
@staticmethod
def vectorize_summary(
summary_record: DocumentSegmentSummary,
segment: DocumentSegment,
dataset: Dataset,
) -> None:
"""
Vectorize summary and store in vector database.
Args:
summary_record: DocumentSegmentSummary record
segment: Original DocumentSegment
dataset: Dataset containing the segment
"""
if dataset.indexing_technique != "high_quality":
logger.warning(
f"Summary vectorization skipped for dataset {dataset.id}: "
"indexing_technique is not high_quality"
)
return
# Reuse existing index_node_id if available (like segment does), otherwise generate new one
old_summary_node_id = summary_record.summary_index_node_id
if old_summary_node_id:
# Reuse existing index_node_id (like segment behavior)
summary_index_node_id = old_summary_node_id
else:
# Generate new index node ID only for new summaries
summary_index_node_id = str(uuid.uuid4())
# Always regenerate hash (in case summary content changed)
summary_hash = helper.generate_text_hash(summary_record.summary_content)
# Delete old vector only if we're reusing the same index_node_id (to overwrite)
# If index_node_id changed, the old vector should have been deleted elsewhere
if old_summary_node_id and old_summary_node_id == summary_index_node_id:
try:
vector = Vector(dataset)
vector.delete_by_ids([old_summary_node_id])
except Exception as e:
logger.warning(
f"Failed to delete old summary vector for segment {segment.id}: {str(e)}. "
"Continuing with new vectorization."
)
# Create document with summary content and metadata
summary_document = Document(
page_content=summary_record.summary_content,
metadata={
"doc_id": summary_index_node_id,
"doc_hash": summary_hash,
"dataset_id": dataset.id,
"document_id": segment.document_id,
"original_chunk_id": segment.id, # Key: link to original chunk
"doc_type": DocType.TEXT,
"is_summary": True, # Identifier for summary documents
},
)
# Vectorize and store with retry mechanism for connection errors
max_retries = 3
retry_delay = 2.0
for attempt in range(max_retries):
try:
vector = Vector(dataset)
vector.add_texts([summary_document], duplicate_check=True)
# Success - update summary record with index node info
summary_record.summary_index_node_id = summary_index_node_id
summary_record.summary_index_node_hash = summary_hash
summary_record.status = "completed"
db.session.add(summary_record)
db.session.flush()
return # Success, exit function
except (ConnectionError, Exception) as e:
error_str = str(e).lower()
# Check if it's a connection-related error that might be transient
is_connection_error = any(keyword in error_str for keyword in [
"connection", "disconnected", "timeout", "network",
"could not connect", "server disconnected", "weaviate"
])
if is_connection_error and attempt < max_retries - 1:
# Retry for connection errors
wait_time = retry_delay * (2 ** attempt) # Exponential backoff
logger.warning(
f"Vectorization attempt {attempt + 1}/{max_retries} failed for segment {segment.id}: {str(e)}. "
f"Retrying in {wait_time:.1f} seconds..."
)
time.sleep(wait_time)
continue
else:
# Final attempt failed or non-connection error - log and update status
logger.error(
f"Failed to vectorize summary for segment {segment.id} after {attempt + 1} attempts: {str(e)}",
exc_info=True
)
summary_record.status = "error"
summary_record.error = f"Vectorization failed: {str(e)}"
db.session.add(summary_record)
db.session.flush()
raise
@staticmethod
def generate_and_vectorize_summary(
segment: DocumentSegment,
dataset: Dataset,
summary_index_setting: dict,
) -> DocumentSegmentSummary:
"""
Generate summary for a segment and vectorize it.
Args:
segment: DocumentSegment to generate summary for
dataset: Dataset containing the segment
summary_index_setting: Summary index configuration
Returns:
Created DocumentSegmentSummary instance
Raises:
ValueError: If summary generation fails
"""
try:
# Generate summary
summary_content = SummaryIndexService.generate_summary_for_segment(
segment, dataset, summary_index_setting
)
# Create or update summary record (will handle overwrite internally)
summary_record = SummaryIndexService.create_summary_record(
segment, dataset, summary_content, status="generating"
)
# Vectorize summary (will delete old vector if exists before creating new one)
SummaryIndexService.vectorize_summary(summary_record, segment, dataset)
db.session.commit()
logger.info(f"Successfully generated and vectorized summary for segment {segment.id}")
return summary_record
except Exception as e:
logger.exception(f"Failed to generate summary for segment {segment.id}: {str(e)}")
# Update summary record with error status if it exists
summary_record = (
db.session.query(DocumentSegmentSummary)
.filter_by(chunk_id=segment.id, dataset_id=dataset.id)
.first()
)
if summary_record:
summary_record.status = "error"
summary_record.error = str(e)
db.session.add(summary_record)
db.session.commit()
raise
@staticmethod
def generate_summaries_for_document(
dataset: Dataset,
document: DatasetDocument,
summary_index_setting: dict,
segment_ids: list[str] | None = None,
only_parent_chunks: bool = False,
) -> list[DocumentSegmentSummary]:
"""
Generate summaries for all segments in a document including vectorization.
Args:
dataset: Dataset containing the document
document: DatasetDocument to generate summaries for
summary_index_setting: Summary index configuration
segment_ids: Optional list of specific segment IDs to process
only_parent_chunks: If True, only process parent chunks (for parent-child mode)
Returns:
List of created DocumentSegmentSummary instances
"""
# Only generate summary index for high_quality indexing technique
if dataset.indexing_technique != "high_quality":
logger.info(
f"Skipping summary generation for dataset {dataset.id}: "
f"indexing_technique is {dataset.indexing_technique}, not 'high_quality'"
)
return []
if not summary_index_setting or not summary_index_setting.get("enable"):
logger.info(f"Summary index is disabled for dataset {dataset.id}")
return []
# Skip qa_model documents
if document.doc_form == "qa_model":
logger.info(f"Skipping summary generation for qa_model document {document.id}")
return []
logger.info(
f"Starting summary generation for document {document.id} in dataset {dataset.id}, "
f"segment_ids: {len(segment_ids) if segment_ids else 'all'}, "
f"only_parent_chunks: {only_parent_chunks}"
)
# Query segments (only enabled segments)
query = db.session.query(DocumentSegment).filter_by(
dataset_id=dataset.id,
document_id=document.id,
status="completed",
enabled=True, # Only generate summaries for enabled segments
)
if segment_ids:
query = query.filter(DocumentSegment.id.in_(segment_ids))
segments = query.all()
if not segments:
logger.info(f"No segments found for document {document.id}")
return []
summary_records = []
for segment in segments:
# For parent-child mode, only process parent chunks
# In parent-child mode, all DocumentSegments are parent chunks,
# so we process all of them. Child chunks are stored in ChildChunk table
# and are not DocumentSegments, so they won't be in the segments list.
# This check is mainly for clarity and future-proofing.
if only_parent_chunks:
# In parent-child mode, all segments in the query are parent chunks
# Child chunks are not DocumentSegments, so they won't appear here
# We can process all segments
pass
try:
summary_record = SummaryIndexService.generate_and_vectorize_summary(
segment, dataset, summary_index_setting
)
summary_records.append(summary_record)
except Exception as e:
logger.error(f"Failed to generate summary for segment {segment.id}: {str(e)}")
# Continue with other segments
continue
logger.info(
f"Completed summary generation for document {document.id}: "
f"{len(summary_records)} summaries generated and vectorized"
)
return summary_records
@staticmethod
def disable_summaries_for_segments(
dataset: Dataset,
segment_ids: list[str] | None = None,
disabled_by: str | None = None,
) -> None:
"""
Disable summary records and remove vectors from vector database for segments.
Unlike delete, this preserves the summary records but marks them as disabled.
Args:
dataset: Dataset containing the segments
segment_ids: List of segment IDs to disable summaries for. If None, disable all.
disabled_by: User ID who disabled the summaries
"""
from libs.datetime_utils import naive_utc_now
query = db.session.query(DocumentSegmentSummary).filter_by(
dataset_id=dataset.id,
enabled=True, # Only disable enabled summaries
)
if segment_ids:
query = query.filter(DocumentSegmentSummary.chunk_id.in_(segment_ids))
summaries = query.all()
if not summaries:
return
logger.info(
f"Disabling {len(summaries)} summary records for dataset {dataset.id}, "
f"segment_ids: {len(segment_ids) if segment_ids else 'all'}"
)
# Remove from vector database (but keep records)
if dataset.indexing_technique == "high_quality":
summary_node_ids = [
s.summary_index_node_id for s in summaries if s.summary_index_node_id
]
if summary_node_ids:
try:
vector = Vector(dataset)
vector.delete_by_ids(summary_node_ids)
except Exception as e:
logger.warning(f"Failed to remove summary vectors: {str(e)}")
# Disable summary records (don't delete)
now = naive_utc_now()
for summary in summaries:
summary.enabled = False
summary.disabled_at = now
summary.disabled_by = disabled_by
db.session.add(summary)
db.session.commit()
logger.info(f"Disabled {len(summaries)} summary records for dataset {dataset.id}")
@staticmethod
def enable_summaries_for_segments(
dataset: Dataset,
segment_ids: list[str] | None = None,
) -> None:
"""
Enable summary records and re-add vectors to vector database for segments.
Args:
dataset: Dataset containing the segments
segment_ids: List of segment IDs to enable summaries for. If None, enable all.
"""
# Only enable summary index for high_quality indexing technique
if dataset.indexing_technique != "high_quality":
return
# Check if summary index is enabled
summary_index_setting = dataset.summary_index_setting
if not summary_index_setting or not summary_index_setting.get("enable"):
return
query = db.session.query(DocumentSegmentSummary).filter_by(
dataset_id=dataset.id,
enabled=False, # Only enable disabled summaries
)
if segment_ids:
query = query.filter(DocumentSegmentSummary.chunk_id.in_(segment_ids))
summaries = query.all()
if not summaries:
return
logger.info(
f"Enabling {len(summaries)} summary records for dataset {dataset.id}, "
f"segment_ids: {len(segment_ids) if segment_ids else 'all'}"
)
# Re-vectorize and re-add to vector database
enabled_count = 0
for summary in summaries:
# Get the original segment
segment = db.session.query(DocumentSegment).filter_by(
id=summary.chunk_id,
dataset_id=dataset.id,
).first()
if not segment or not segment.enabled or segment.status != "completed":
continue
if not summary.summary_content:
continue
try:
# Re-vectorize summary
SummaryIndexService.vectorize_summary(summary, segment, dataset)
# Enable summary record
summary.enabled = True
summary.disabled_at = None
summary.disabled_by = None
db.session.add(summary)
enabled_count += 1
except Exception as e:
logger.error(f"Failed to re-vectorize summary {summary.id}: {str(e)}")
# Keep it disabled if vectorization fails
continue
db.session.commit()
logger.info(f"Enabled {enabled_count} summary records for dataset {dataset.id}")
@staticmethod
def delete_summaries_for_segments(
dataset: Dataset,
segment_ids: list[str] | None = None,
) -> None:
"""
Delete summary records and vectors for segments (used only for actual deletion scenarios).
For disable/enable operations, use disable_summaries_for_segments/enable_summaries_for_segments.
Args:
dataset: Dataset containing the segments
segment_ids: List of segment IDs to delete summaries for. If None, delete all.
"""
query = db.session.query(DocumentSegmentSummary).filter_by(dataset_id=dataset.id)
if segment_ids:
query = query.filter(DocumentSegmentSummary.chunk_id.in_(segment_ids))
summaries = query.all()
if not summaries:
return
# Delete from vector database
if dataset.indexing_technique == "high_quality":
summary_node_ids = [
s.summary_index_node_id for s in summaries if s.summary_index_node_id
]
if summary_node_ids:
vector = Vector(dataset)
vector.delete_by_ids(summary_node_ids)
# Delete summary records
for summary in summaries:
db.session.delete(summary)
db.session.commit()
logger.info(f"Deleted {len(summaries)} summary records for dataset {dataset.id}")
@staticmethod
def update_summary_for_segment(
segment: DocumentSegment,
dataset: Dataset,
summary_content: str,
) -> DocumentSegmentSummary | None:
"""
Update summary for a segment and re-vectorize it.
Args:
segment: DocumentSegment to update summary for
dataset: Dataset containing the segment
summary_content: New summary content
Returns:
Updated DocumentSegmentSummary instance, or None if summary index is not enabled
"""
# Only update summary index for high_quality indexing technique
if dataset.indexing_technique != "high_quality":
return None
# Check if summary index is enabled
summary_index_setting = dataset.summary_index_setting
if not summary_index_setting or not summary_index_setting.get("enable"):
return None
# Skip qa_model documents
if segment.document and segment.document.doc_form == "qa_model":
return None
try:
# Find existing summary record
summary_record = (
db.session.query(DocumentSegmentSummary)
.filter_by(chunk_id=segment.id, dataset_id=dataset.id)
.first()
)
if summary_record:
# Update existing summary
old_summary_node_id = summary_record.summary_index_node_id
# Update summary content
summary_record.summary_content = summary_content
summary_record.status = "generating"
db.session.add(summary_record)
db.session.flush()
# Delete old vector if exists
if old_summary_node_id:
vector = Vector(dataset)
vector.delete_by_ids([old_summary_node_id])
# Re-vectorize summary
SummaryIndexService.vectorize_summary(summary_record, segment, dataset)
db.session.commit()
logger.info(f"Successfully updated and re-vectorized summary for segment {segment.id}")
return summary_record
else:
# Create new summary record if doesn't exist
summary_record = SummaryIndexService.create_summary_record(
segment, dataset, summary_content, status="generating"
)
SummaryIndexService.vectorize_summary(summary_record, segment, dataset)
db.session.commit()
logger.info(f"Successfully created and vectorized summary for segment {segment.id}")
return summary_record
except Exception as e:
logger.exception(f"Failed to update summary for segment {segment.id}: {str(e)}")
# Update summary record with error status if it exists
summary_record = (
db.session.query(DocumentSegmentSummary)
.filter_by(chunk_id=segment.id, dataset_id=dataset.id)
.first()
)
if summary_record:
summary_record.status = "error"
summary_record.error = str(e)
db.session.add(summary_record)
db.session.commit()
raise

View File

@ -117,6 +117,18 @@ def add_document_to_index_task(dataset_document_id: str):
)
db.session.commit()
# Enable summary indexes for all segments in this document
from services.summary_index_service import SummaryIndexService
segment_ids_list = [segment.id for segment in segments]
if segment_ids_list:
try:
SummaryIndexService.enable_summaries_for_segments(
dataset=dataset,
segment_ids=segment_ids_list,
)
except Exception as e:
logger.warning(f"Failed to enable summaries for document {dataset_document.id}: {str(e)}")
end_at = time.perf_counter()
logger.info(
click.style(f"Document added to index: {dataset_document.id} latency: {end_at - start_at}", fg="green")

View File

@ -42,6 +42,7 @@ def delete_segment_from_index_task(
doc_form = dataset_document.doc_form
# Proceed with index cleanup using the index_node_ids directly
# For actual deletion, we should delete summaries (not just disable them)
index_processor = IndexProcessorFactory(doc_form).init_index_processor()
index_processor.clean(
dataset,
@ -49,6 +50,7 @@ def delete_segment_from_index_task(
with_keywords=True,
delete_child_chunks=True,
precomputed_child_node_ids=child_node_ids,
delete_summaries=True, # Actually delete summaries when segment is deleted
)
if dataset.is_multimodal:
# delete segment attachment binding

View File

@ -53,6 +53,17 @@ def disable_segment_from_index_task(segment_id: str):
logger.info(click.style(f"Segment {segment.id} document status is invalid, pass.", fg="cyan"))
return
# Disable summary index for this segment
from services.summary_index_service import SummaryIndexService
try:
SummaryIndexService.disable_summaries_for_segments(
dataset=dataset,
segment_ids=[segment.id],
disabled_by=segment.disabled_by,
)
except Exception as e:
logger.warning(f"Failed to disable summary for segment {segment.id}: {str(e)}")
index_type = dataset_document.doc_form
index_processor = IndexProcessorFactory(index_type).init_index_processor()
index_processor.clean(dataset, [segment.index_node_id])

View File

@ -58,12 +58,25 @@ def disable_segments_from_index_task(segment_ids: list, dataset_id: str, documen
return
try:
# Disable summary indexes for these segments
from services.summary_index_service import SummaryIndexService
segment_ids_list = [segment.id for segment in segments]
try:
# Get disabled_by from first segment (they should all have the same disabled_by)
disabled_by = segments[0].disabled_by if segments else None
SummaryIndexService.disable_summaries_for_segments(
dataset=dataset,
segment_ids=segment_ids_list,
disabled_by=disabled_by,
)
except Exception as e:
logger.warning(f"Failed to disable summaries for segments: {str(e)}")
index_node_ids = [segment.index_node_id for segment in segments]
if dataset.is_multimodal:
segment_ids = [segment.id for segment in segments]
segment_attachment_bindings = (
db.session.query(SegmentAttachmentBinding)
.where(SegmentAttachmentBinding.segment_id.in_(segment_ids))
.where(SegmentAttachmentBinding.segment_id.in_(segment_ids_list))
.all()
)
if segment_attachment_bindings:

View File

@ -8,6 +8,7 @@ from celery import shared_task
from configs import dify_config
from core.entities.document_task import DocumentTask
from core.indexing_runner import DocumentIsPausedError, IndexingRunner
from tasks.generate_summary_index_task import generate_summary_index_task
from core.rag.pipeline.queue import TenantIsolatedTaskQueue
from enums.cloud_plan import CloudPlan
from extensions.ext_database import db
@ -100,6 +101,60 @@ def _document_indexing(dataset_id: str, document_ids: Sequence[str]):
indexing_runner.run(documents)
end_at = time.perf_counter()
logger.info(click.style(f"Processed dataset: {dataset_id} latency: {end_at - start_at}", fg="green"))
# Trigger summary index generation for completed documents if enabled
# Only generate for high_quality indexing technique and when summary_index_setting is enabled
# Re-query dataset to get latest summary_index_setting (in case it was updated)
dataset = db.session.query(Dataset).where(Dataset.id == dataset_id).first()
if not dataset:
logger.warning(f"Dataset {dataset_id} not found after indexing")
return
if dataset.indexing_technique == "high_quality":
summary_index_setting = dataset.summary_index_setting
if summary_index_setting and summary_index_setting.get("enable"):
# Check each document's indexing status and trigger summary generation if completed
for document_id in document_ids:
# Re-query document to get latest status (IndexingRunner may have updated it)
document = (
db.session.query(Document)
.where(Document.id == document_id, Document.dataset_id == dataset_id)
.first()
)
if document:
logger.info(
f"Checking document {document_id} for summary generation: "
f"status={document.indexing_status}, doc_form={document.doc_form}"
)
if document.indexing_status == "completed" and document.doc_form != "qa_model":
try:
generate_summary_index_task.delay(dataset.id, document_id, None)
logger.info(
f"Queued summary index generation task for document {document_id} "
f"in dataset {dataset.id} after indexing completed"
)
except Exception as e:
logger.exception(
f"Failed to queue summary index generation task for document {document_id}: {str(e)}"
)
# Don't fail the entire indexing process if summary task queuing fails
else:
logger.info(
f"Skipping summary generation for document {document_id}: "
f"status={document.indexing_status}, doc_form={document.doc_form}"
)
else:
logger.warning(f"Document {document_id} not found after indexing")
else:
logger.info(
f"Summary index generation skipped for dataset {dataset.id}: "
f"summary_index_setting.enable={summary_index_setting.get('enable') if summary_index_setting else None}"
)
else:
logger.info(
f"Summary index generation skipped for dataset {dataset.id}: "
f"indexing_technique={dataset.indexing_technique} (not 'high_quality')"
)
except DocumentIsPausedError as ex:
logger.info(click.style(str(ex), fg="yellow"))
except Exception:

View File

@ -103,6 +103,16 @@ def enable_segment_to_index_task(segment_id: str):
# save vector index
index_processor.load(dataset, [document], multimodal_documents=multimodel_documents)
# Enable summary index for this segment
from services.summary_index_service import SummaryIndexService
try:
SummaryIndexService.enable_summaries_for_segments(
dataset=dataset,
segment_ids=[segment.id],
)
except Exception as e:
logger.warning(f"Failed to enable summary for segment {segment.id}: {str(e)}")
end_at = time.perf_counter()
logger.info(click.style(f"Segment enabled to index: {segment.id} latency: {end_at - start_at}", fg="green"))
except Exception as e:

View File

@ -108,6 +108,17 @@ def enable_segments_to_index_task(segment_ids: list, dataset_id: str, document_i
# save vector index
index_processor.load(dataset, documents, multimodal_documents=multimodal_documents)
# Enable summary indexes for these segments
from services.summary_index_service import SummaryIndexService
segment_ids_list = [segment.id for segment in segments]
try:
SummaryIndexService.enable_summaries_for_segments(
dataset=dataset,
segment_ids=segment_ids_list,
)
except Exception as e:
logger.warning(f"Failed to enable summaries for segments: {str(e)}")
end_at = time.perf_counter()
logger.info(click.style(f"Segments enabled to index latency: {end_at - start_at}", fg="green"))
except Exception as e:

View File

@ -0,0 +1,113 @@
"""Async task for generating summary indexes."""
import logging
import time
import click
from celery import shared_task
from extensions.ext_database import db
from models.dataset import Dataset, DocumentSegment
from models.dataset import Document as DatasetDocument
from services.summary_index_service import SummaryIndexService
logger = logging.getLogger(__name__)
@shared_task(queue="dataset")
def generate_summary_index_task(dataset_id: str, document_id: str, segment_ids: list[str] | None = None):
"""
Async generate summary index for document segments.
Args:
dataset_id: Dataset ID
document_id: Document ID
segment_ids: Optional list of specific segment IDs to process. If None, process all segments.
Usage:
generate_summary_index_task.delay(dataset_id, document_id)
generate_summary_index_task.delay(dataset_id, document_id, segment_ids)
"""
logger.info(
click.style(
f"Start generating summary index for document {document_id} in dataset {dataset_id}",
fg="green",
)
)
start_at = time.perf_counter()
try:
dataset = db.session.query(Dataset).where(Dataset.id == dataset_id).first()
if not dataset:
logger.error(click.style(f"Dataset not found: {dataset_id}", fg="red"))
db.session.close()
return
document = db.session.query(DatasetDocument).where(DatasetDocument.id == document_id).first()
if not document:
logger.error(click.style(f"Document not found: {document_id}", fg="red"))
db.session.close()
return
# Only generate summary index for high_quality indexing technique
if dataset.indexing_technique != "high_quality":
logger.info(
click.style(
f"Skipping summary generation for dataset {dataset_id}: "
f"indexing_technique is {dataset.indexing_technique}, not 'high_quality'",
fg="cyan",
)
)
db.session.close()
return
# Check if summary index is enabled
summary_index_setting = dataset.summary_index_setting
if not summary_index_setting or not summary_index_setting.get("enable"):
logger.info(
click.style(
f"Summary index is disabled for dataset {dataset_id}",
fg="cyan",
)
)
db.session.close()
return
# Determine if only parent chunks should be processed
only_parent_chunks = dataset.chunk_structure == "parent_child_index"
# Generate summaries
summary_records = SummaryIndexService.generate_summaries_for_document(
dataset=dataset,
document=document,
summary_index_setting=summary_index_setting,
segment_ids=segment_ids,
only_parent_chunks=only_parent_chunks,
)
end_at = time.perf_counter()
logger.info(
click.style(
f"Summary index generation completed for document {document_id}: "
f"{len(summary_records)} summaries generated, latency: {end_at - start_at}",
fg="green",
)
)
except Exception as e:
logger.exception(f"Failed to generate summary index for document {document_id}: {str(e)}")
# Update document segments with error status if needed
if segment_ids:
db.session.query(DocumentSegment).filter(
DocumentSegment.id.in_(segment_ids),
DocumentSegment.dataset_id == dataset_id,
).update(
{
DocumentSegment.error: f"Summary generation failed: {str(e)}",
},
synchronize_session=False,
)
db.session.commit()
finally:
db.session.close()

View File

@ -0,0 +1,219 @@
"""Task for regenerating summary indexes when dataset settings change."""
import logging
import time
from typing import Any
import click
from celery import shared_task
from sqlalchemy import select
from extensions.ext_database import db
from models.dataset import Dataset, DocumentSegment, DocumentSegmentSummary
from models.dataset import Document as DatasetDocument
from services.summary_index_service import SummaryIndexService
logger = logging.getLogger(__name__)
@shared_task(queue="dataset")
def regenerate_summary_index_task(
dataset_id: str,
regenerate_reason: str = "summary_model_changed",
regenerate_vectors_only: bool = False,
):
"""
Regenerate summary indexes for all documents in a dataset.
This task is triggered when:
1. summary_index_setting model changes (regenerate_reason="summary_model_changed")
- Regenerates summary content and vectors for all existing summaries
2. embedding_model changes (regenerate_reason="embedding_model_changed")
- Only regenerates vectors for existing summaries (keeps summary content)
Args:
dataset_id: Dataset ID
regenerate_reason: Reason for regeneration ("summary_model_changed" or "embedding_model_changed")
regenerate_vectors_only: If True, only regenerate vectors without regenerating summary content
"""
logger.info(
click.style(
f"Start regenerate summary index for dataset {dataset_id}, reason: {regenerate_reason}",
fg="green",
)
)
start_at = time.perf_counter()
try:
dataset = db.session.query(Dataset).filter_by(id=dataset_id).first()
if not dataset:
logger.error(click.style(f"Dataset not found: {dataset_id}", fg="red"))
db.session.close()
return
# Only regenerate summary index for high_quality indexing technique
if dataset.indexing_technique != "high_quality":
logger.info(
click.style(
f"Skipping summary regeneration for dataset {dataset_id}: "
f"indexing_technique is {dataset.indexing_technique}, not 'high_quality'",
fg="cyan",
)
)
db.session.close()
return
# Check if summary index is enabled
summary_index_setting = dataset.summary_index_setting
if not summary_index_setting or not summary_index_setting.get("enable"):
logger.info(
click.style(
f"Summary index is disabled for dataset {dataset_id}",
fg="cyan",
)
)
db.session.close()
return
# Get all documents with completed indexing status
dataset_documents = db.session.scalars(
select(DatasetDocument).where(
DatasetDocument.dataset_id == dataset_id,
DatasetDocument.indexing_status == "completed",
DatasetDocument.enabled == True,
DatasetDocument.archived == False,
)
).all()
if not dataset_documents:
logger.info(
click.style(
f"No documents found for summary regeneration in dataset {dataset_id}",
fg="cyan",
)
)
db.session.close()
return
logger.info(
f"Found {len(dataset_documents)} documents for summary regeneration in dataset {dataset_id}"
)
total_segments_processed = 0
total_segments_failed = 0
for dataset_document in dataset_documents:
# Skip qa_model documents
if dataset_document.doc_form == "qa_model":
continue
try:
# Get all segments with existing summaries
segments = (
db.session.query(DocumentSegment)
.join(
DocumentSegmentSummary,
DocumentSegment.id == DocumentSegmentSummary.chunk_id,
)
.where(
DocumentSegment.document_id == dataset_document.id,
DocumentSegment.dataset_id == dataset_id,
DocumentSegment.status == "completed",
DocumentSegment.enabled == True,
DocumentSegmentSummary.dataset_id == dataset_id,
)
.order_by(DocumentSegment.position.asc())
.all()
)
if not segments:
continue
logger.info(
f"Regenerating summaries for {len(segments)} segments in document {dataset_document.id}"
)
for segment in segments:
try:
# Get existing summary record
summary_record = (
db.session.query(DocumentSegmentSummary)
.filter_by(
chunk_id=segment.id,
dataset_id=dataset_id,
)
.first()
)
if not summary_record:
logger.warning(
f"Summary record not found for segment {segment.id}, skipping"
)
continue
if regenerate_vectors_only:
# Only regenerate vectors (for embedding_model change)
# Delete old vector
if summary_record.summary_index_node_id:
try:
from core.rag.datasource.vdb.vector_factory import Vector
vector = Vector(dataset)
vector.delete_by_ids([summary_record.summary_index_node_id])
except Exception as e:
logger.warning(
f"Failed to delete old summary vector for segment {segment.id}: {str(e)}"
)
# Re-vectorize with new embedding model
SummaryIndexService.vectorize_summary(
summary_record, segment, dataset
)
db.session.commit()
else:
# Regenerate both summary content and vectors (for summary_model change)
SummaryIndexService.generate_and_vectorize_summary(
segment, dataset, summary_index_setting
)
db.session.commit()
total_segments_processed += 1
except Exception as e:
logger.error(
f"Failed to regenerate summary for segment {segment.id}: {str(e)}",
exc_info=True,
)
total_segments_failed += 1
# Update summary record with error status
if summary_record:
summary_record.status = "error"
summary_record.error = f"Regeneration failed: {str(e)}"
db.session.add(summary_record)
db.session.commit()
continue
except Exception as e:
logger.error(
f"Failed to process document {dataset_document.id} for summary regeneration: {str(e)}",
exc_info=True,
)
continue
end_at = time.perf_counter()
logger.info(
click.style(
f"Summary index regeneration completed for dataset {dataset_id}: "
f"{total_segments_processed} segments processed successfully, "
f"{total_segments_failed} segments failed, "
f"total documents: {len(dataset_documents)}, "
f"latency: {end_at - start_at:.2f}s",
fg="green",
)
)
except Exception:
logger.exception(f"Regenerate summary index failed for dataset {dataset_id}")
finally:
db.session.close()

View File

@ -47,6 +47,20 @@ def remove_document_from_index_task(document_id: str):
index_processor = IndexProcessorFactory(document.doc_form).init_index_processor()
segments = db.session.scalars(select(DocumentSegment).where(DocumentSegment.document_id == document.id)).all()
# Disable summary indexes for all segments in this document
from services.summary_index_service import SummaryIndexService
segment_ids_list = [segment.id for segment in segments]
if segment_ids_list:
try:
SummaryIndexService.disable_summaries_for_segments(
dataset=dataset,
segment_ids=segment_ids_list,
disabled_by=document.disabled_by,
)
except Exception as e:
logger.warning(f"Failed to disable summaries for document {document.id}: {str(e)}")
index_node_ids = [segment.index_node_id for segment in segments]
if index_node_ids:
try: