"""Summary index service for generating and managing document segment summaries.""" import logging import time import uuid from typing import Any from core.rag.datasource.vdb.vector_factory import Vector from core.rag.index_processor.constant.doc_type import DocType from core.rag.models.document import Document from extensions.ext_database import db from libs import helper from models.dataset import Dataset, DocumentSegment, DocumentSegmentSummary from models.dataset import Document as DatasetDocument logger = logging.getLogger(__name__) class SummaryIndexService: """Service for generating and managing summary indexes.""" @staticmethod def generate_summary_for_segment( segment: DocumentSegment, dataset: Dataset, summary_index_setting: dict, ) -> str: """ Generate summary for a single segment. Args: segment: DocumentSegment to generate summary for dataset: Dataset containing the segment summary_index_setting: Summary index configuration Returns: Generated summary text Raises: ValueError: If summary_index_setting is invalid or generation fails """ # Reuse the existing generate_summary method from ParagraphIndexProcessor # Use lazy import to avoid circular import from core.rag.index_processor.processor.paragraph_index_processor import ParagraphIndexProcessor summary_content = ParagraphIndexProcessor.generate_summary( tenant_id=dataset.tenant_id, text=segment.content, summary_index_setting=summary_index_setting, ) if not summary_content: raise ValueError("Generated summary is empty") return summary_content @staticmethod def create_summary_record( segment: DocumentSegment, dataset: Dataset, summary_content: str, status: str = "generating", ) -> DocumentSegmentSummary: """ Create or update a DocumentSegmentSummary record. If a summary record already exists for this segment, it will be updated instead of creating a new one. Args: segment: DocumentSegment to create summary for dataset: Dataset containing the segment summary_content: Generated summary content status: Summary status (default: "generating") Returns: Created or updated DocumentSegmentSummary instance """ # Check if summary record already exists existing_summary = ( db.session.query(DocumentSegmentSummary) .filter_by(chunk_id=segment.id, dataset_id=dataset.id) .first() ) if existing_summary: # Update existing record existing_summary.summary_content = summary_content existing_summary.status = status existing_summary.error = None # Clear any previous errors # Re-enable if it was disabled if not existing_summary.enabled: existing_summary.enabled = True existing_summary.disabled_at = None existing_summary.disabled_by = None db.session.add(existing_summary) db.session.flush() return existing_summary else: # Create new record (enabled by default) summary_record = DocumentSegmentSummary( dataset_id=dataset.id, document_id=segment.document_id, chunk_id=segment.id, summary_content=summary_content, status=status, enabled=True, # Explicitly set enabled to True ) db.session.add(summary_record) db.session.flush() return summary_record @staticmethod def vectorize_summary( summary_record: DocumentSegmentSummary, segment: DocumentSegment, dataset: Dataset, ) -> None: """ Vectorize summary and store in vector database. 
    @staticmethod
    def vectorize_summary(
        summary_record: DocumentSegmentSummary,
        segment: DocumentSegment,
        dataset: Dataset,
    ) -> None:
        """
        Vectorize summary and store it in the vector database.

        Args:
            summary_record: DocumentSegmentSummary record
            segment: Original DocumentSegment
            dataset: Dataset containing the segment
        """
        if dataset.indexing_technique != "high_quality":
            logger.warning(
                f"Summary vectorization skipped for dataset {dataset.id}: "
                "indexing_technique is not high_quality"
            )
            return

        # Reuse the existing index_node_id if available (like segments do), otherwise generate a new one
        old_summary_node_id = summary_record.summary_index_node_id
        if old_summary_node_id:
            # Reuse existing index_node_id (same behavior as segments)
            summary_index_node_id = old_summary_node_id
        else:
            # Generate a new index node ID only for new summaries
            summary_index_node_id = str(uuid.uuid4())

        # Always regenerate the hash (in case summary content changed)
        summary_hash = helper.generate_text_hash(summary_record.summary_content)

        # Delete the old vector only if we're reusing the same index_node_id (to overwrite it).
        # If the index_node_id changed, the old vector should have been deleted elsewhere.
        if old_summary_node_id and old_summary_node_id == summary_index_node_id:
            try:
                vector = Vector(dataset)
                vector.delete_by_ids([old_summary_node_id])
            except Exception as e:
                logger.warning(
                    f"Failed to delete old summary vector for segment {segment.id}: {str(e)}. "
                    "Continuing with new vectorization."
                )

        # Create a document with the summary content and metadata
        summary_document = Document(
            page_content=summary_record.summary_content,
            metadata={
                "doc_id": summary_index_node_id,
                "doc_hash": summary_hash,
                "dataset_id": dataset.id,
                "document_id": segment.document_id,
                "original_chunk_id": segment.id,  # Key: link back to the original chunk
                "doc_type": DocType.TEXT,
                "is_summary": True,  # Identifier for summary documents
            },
        )

        # Vectorize and store, retrying on transient connection errors
        max_retries = 3
        retry_delay = 2.0

        for attempt in range(max_retries):
            try:
                vector = Vector(dataset)
                vector.add_texts([summary_document], duplicate_check=True)

                # Success - update the summary record with index node info
                summary_record.summary_index_node_id = summary_index_node_id
                summary_record.summary_index_node_hash = summary_hash
                summary_record.status = "completed"
                db.session.add(summary_record)
                db.session.flush()
                return  # Success, exit function
            except Exception as e:
                error_str = str(e).lower()
                # Check whether it's a connection-related error that might be transient
                is_connection_error = any(
                    keyword in error_str
                    for keyword in [
                        "connection",
                        "disconnected",
                        "timeout",
                        "network",
                        "could not connect",
                        "server disconnected",
                        "weaviate",
                    ]
                )

                if is_connection_error and attempt < max_retries - 1:
                    # Retry connection errors with exponential backoff
                    wait_time = retry_delay * (2**attempt)
                    logger.warning(
                        f"Vectorization attempt {attempt + 1}/{max_retries} failed for segment {segment.id}: "
                        f"{str(e)}. Retrying in {wait_time:.1f} seconds..."
                    )
                    time.sleep(wait_time)
                    continue
                else:
                    # Final attempt failed or non-connection error - log and update status
                    logger.error(
                        f"Failed to vectorize summary for segment {segment.id} after {attempt + 1} attempts: {str(e)}",
                        exc_info=True,
                    )
                    summary_record.status = "error"
                    summary_record.error = f"Vectorization failed: {str(e)}"
                    db.session.add(summary_record)
                    db.session.flush()
                    raise
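    # Retrieval-side note (illustrative sketch, not used by this service): a summary hit
    # can be mapped back to its source chunk through the "original_chunk_id" metadata
    # written above. Assuming `doc` is a retrieved Document whose metadata marks it with
    # is_summary=True, a caller might resolve the original segment like this:
    #
    #     if doc.metadata.get("is_summary"):
    #         source_segment = (
    #             db.session.query(DocumentSegment)
    #             .filter_by(id=doc.metadata["original_chunk_id"])
    #             .first()
    #         )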
    @staticmethod
    def generate_and_vectorize_summary(
        segment: DocumentSegment,
        dataset: Dataset,
        summary_index_setting: dict,
    ) -> DocumentSegmentSummary:
        """
        Generate a summary for a segment and vectorize it.

        Args:
            segment: DocumentSegment to generate summary for
            dataset: Dataset containing the segment
            summary_index_setting: Summary index configuration

        Returns:
            Created DocumentSegmentSummary instance

        Raises:
            ValueError: If summary generation fails
        """
        try:
            # Generate summary
            summary_content = SummaryIndexService.generate_summary_for_segment(
                segment, dataset, summary_index_setting
            )

            # Create or update the summary record (overwrite is handled internally)
            summary_record = SummaryIndexService.create_summary_record(
                segment, dataset, summary_content, status="generating"
            )

            # Vectorize summary (deletes any old vector before creating the new one)
            SummaryIndexService.vectorize_summary(summary_record, segment, dataset)

            db.session.commit()
            logger.info(f"Successfully generated and vectorized summary for segment {segment.id}")
            return summary_record
        except Exception as e:
            logger.exception(f"Failed to generate summary for segment {segment.id}: {str(e)}")
            # Update the summary record with error status if it exists
            summary_record = (
                db.session.query(DocumentSegmentSummary)
                .filter_by(chunk_id=segment.id, dataset_id=dataset.id)
                .first()
            )
            if summary_record:
                summary_record.status = "error"
                summary_record.error = str(e)
                db.session.add(summary_record)
                db.session.commit()
            raise

    @staticmethod
    def generate_summaries_for_document(
        dataset: Dataset,
        document: DatasetDocument,
        summary_index_setting: dict,
        segment_ids: list[str] | None = None,
        only_parent_chunks: bool = False,
    ) -> list[DocumentSegmentSummary]:
        """
        Generate summaries for all segments in a document, including vectorization.

        Args:
            dataset: Dataset containing the document
            document: DatasetDocument to generate summaries for
            summary_index_setting: Summary index configuration
            segment_ids: Optional list of specific segment IDs to process
            only_parent_chunks: If True, only process parent chunks (for parent-child mode)

        Returns:
            List of created DocumentSegmentSummary instances
        """
        # Only generate a summary index for the high_quality indexing technique
        if dataset.indexing_technique != "high_quality":
            logger.info(
                f"Skipping summary generation for dataset {dataset.id}: "
                f"indexing_technique is {dataset.indexing_technique}, not 'high_quality'"
            )
            return []

        if not summary_index_setting or not summary_index_setting.get("enable"):
            logger.info(f"Summary index is disabled for dataset {dataset.id}")
            return []

        # Skip qa_model documents
        if document.doc_form == "qa_model":
            logger.info(f"Skipping summary generation for qa_model document {document.id}")
            return []

        logger.info(
            f"Starting summary generation for document {document.id} in dataset {dataset.id}, "
            f"segment_ids: {len(segment_ids) if segment_ids else 'all'}, "
            f"only_parent_chunks: {only_parent_chunks}"
        )

        # Query segments (enabled segments only)
        query = db.session.query(DocumentSegment).filter_by(
            dataset_id=dataset.id,
            document_id=document.id,
            status="completed",
            enabled=True,  # Only generate summaries for enabled segments
        )
        if segment_ids:
            query = query.filter(DocumentSegment.id.in_(segment_ids))

        segments = query.all()
        if not segments:
            logger.info(f"No segments found for document {document.id}")
            return []

        summary_records = []
        for segment in segments:
            # Parent-child mode: only parent chunks need summaries. All DocumentSegments
            # are parent chunks; child chunks live in the ChildChunk table and never
            # appear in this query, so every segment here can be processed. The
            # only_parent_chunks flag is kept for clarity and future-proofing.
            try:
                summary_record = SummaryIndexService.generate_and_vectorize_summary(
                    segment, dataset, summary_index_setting
                )
                summary_records.append(summary_record)
            except Exception as e:
                logger.error(f"Failed to generate summary for segment {segment.id}: {str(e)}")
                # Continue with other segments
                continue

        logger.info(
            f"Completed summary generation for document {document.id}: "
            f"{len(summary_records)} summaries generated and vectorized"
        )
        return summary_records
    @staticmethod
    def disable_summaries_for_segments(
        dataset: Dataset,
        segment_ids: list[str] | None = None,
        disabled_by: str | None = None,
    ) -> None:
        """
        Disable summary records and remove their vectors from the vector database.

        Unlike delete, this preserves the summary records but marks them as disabled.

        Args:
            dataset: Dataset containing the segments
            segment_ids: List of segment IDs to disable summaries for. If None, disable all.
            disabled_by: User ID who disabled the summaries
        """
        from libs.datetime_utils import naive_utc_now

        query = db.session.query(DocumentSegmentSummary).filter_by(
            dataset_id=dataset.id,
            enabled=True,  # Only disable enabled summaries
        )
        if segment_ids:
            query = query.filter(DocumentSegmentSummary.chunk_id.in_(segment_ids))

        summaries = query.all()
        if not summaries:
            return

        logger.info(
            f"Disabling {len(summaries)} summary records for dataset {dataset.id}, "
            f"segment_ids: {len(segment_ids) if segment_ids else 'all'}"
        )

        # Remove from the vector database (but keep the records)
        if dataset.indexing_technique == "high_quality":
            summary_node_ids = [s.summary_index_node_id for s in summaries if s.summary_index_node_id]
            if summary_node_ids:
                try:
                    vector = Vector(dataset)
                    vector.delete_by_ids(summary_node_ids)
                except Exception as e:
                    logger.warning(f"Failed to remove summary vectors: {str(e)}")

        # Disable the summary records (don't delete them)
        now = naive_utc_now()
        for summary in summaries:
            summary.enabled = False
            summary.disabled_at = now
            summary.disabled_by = disabled_by
            db.session.add(summary)

        db.session.commit()
        logger.info(f"Disabled {len(summaries)} summary records for dataset {dataset.id}")
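    # Illustrative caller-side round trip (a sketch; `user_id` and `segment_ids` are
    # assumed to come from the caller and are not defined in this module):
    #
    #     SummaryIndexService.disable_summaries_for_segments(dataset, segment_ids, disabled_by=user_id)
    #     # ...later, to restore the vectors and re-enable the records:
    #     SummaryIndexService.enable_summaries_for_segments(dataset, segment_ids)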
""" # Only enable summary index for high_quality indexing technique if dataset.indexing_technique != "high_quality": return # Check if summary index is enabled summary_index_setting = dataset.summary_index_setting if not summary_index_setting or not summary_index_setting.get("enable"): return query = db.session.query(DocumentSegmentSummary).filter_by( dataset_id=dataset.id, enabled=False, # Only enable disabled summaries ) if segment_ids: query = query.filter(DocumentSegmentSummary.chunk_id.in_(segment_ids)) summaries = query.all() if not summaries: return logger.info( f"Enabling {len(summaries)} summary records for dataset {dataset.id}, " f"segment_ids: {len(segment_ids) if segment_ids else 'all'}" ) # Re-vectorize and re-add to vector database enabled_count = 0 for summary in summaries: # Get the original segment segment = db.session.query(DocumentSegment).filter_by( id=summary.chunk_id, dataset_id=dataset.id, ).first() if not segment or not segment.enabled or segment.status != "completed": continue if not summary.summary_content: continue try: # Re-vectorize summary SummaryIndexService.vectorize_summary(summary, segment, dataset) # Enable summary record summary.enabled = True summary.disabled_at = None summary.disabled_by = None db.session.add(summary) enabled_count += 1 except Exception as e: logger.error(f"Failed to re-vectorize summary {summary.id}: {str(e)}") # Keep it disabled if vectorization fails continue db.session.commit() logger.info(f"Enabled {enabled_count} summary records for dataset {dataset.id}") @staticmethod def delete_summaries_for_segments( dataset: Dataset, segment_ids: list[str] | None = None, ) -> None: """ Delete summary records and vectors for segments (used only for actual deletion scenarios). For disable/enable operations, use disable_summaries_for_segments/enable_summaries_for_segments. Args: dataset: Dataset containing the segments segment_ids: List of segment IDs to delete summaries for. If None, delete all. """ query = db.session.query(DocumentSegmentSummary).filter_by(dataset_id=dataset.id) if segment_ids: query = query.filter(DocumentSegmentSummary.chunk_id.in_(segment_ids)) summaries = query.all() if not summaries: return # Delete from vector database if dataset.indexing_technique == "high_quality": summary_node_ids = [ s.summary_index_node_id for s in summaries if s.summary_index_node_id ] if summary_node_ids: vector = Vector(dataset) vector.delete_by_ids(summary_node_ids) # Delete summary records for summary in summaries: db.session.delete(summary) db.session.commit() logger.info(f"Deleted {len(summaries)} summary records for dataset {dataset.id}") @staticmethod def update_summary_for_segment( segment: DocumentSegment, dataset: Dataset, summary_content: str, ) -> DocumentSegmentSummary | None: """ Update summary for a segment and re-vectorize it. 
    @staticmethod
    def update_summary_for_segment(
        segment: DocumentSegment,
        dataset: Dataset,
        summary_content: str,
    ) -> DocumentSegmentSummary | None:
        """
        Update the summary for a segment and re-vectorize it.

        Args:
            segment: DocumentSegment to update summary for
            dataset: Dataset containing the segment
            summary_content: New summary content

        Returns:
            Updated DocumentSegmentSummary instance, or None if summary index is not enabled
        """
        # Only update the summary index for the high_quality indexing technique
        if dataset.indexing_technique != "high_quality":
            return None

        # Check if the summary index is enabled
        summary_index_setting = dataset.summary_index_setting
        if not summary_index_setting or not summary_index_setting.get("enable"):
            return None

        # Skip qa_model documents
        if segment.document and segment.document.doc_form == "qa_model":
            return None

        try:
            # Find the existing summary record
            summary_record = (
                db.session.query(DocumentSegmentSummary)
                .filter_by(chunk_id=segment.id, dataset_id=dataset.id)
                .first()
            )

            if summary_record:
                # Update the existing summary
                old_summary_node_id = summary_record.summary_index_node_id

                # Update summary content
                summary_record.summary_content = summary_content
                summary_record.status = "generating"
                db.session.add(summary_record)
                db.session.flush()

                # Delete the old vector if it exists
                if old_summary_node_id:
                    vector = Vector(dataset)
                    vector.delete_by_ids([old_summary_node_id])

                # Re-vectorize summary
                SummaryIndexService.vectorize_summary(summary_record, segment, dataset)

                db.session.commit()
                logger.info(f"Successfully updated and re-vectorized summary for segment {segment.id}")
                return summary_record
            else:
                # Create a new summary record if one doesn't exist
                summary_record = SummaryIndexService.create_summary_record(
                    segment, dataset, summary_content, status="generating"
                )
                SummaryIndexService.vectorize_summary(summary_record, segment, dataset)
                db.session.commit()
                logger.info(f"Successfully created and vectorized summary for segment {segment.id}")
                return summary_record
        except Exception as e:
            logger.exception(f"Failed to update summary for segment {segment.id}: {str(e)}")
            # Update the summary record with error status if it exists
            summary_record = (
                db.session.query(DocumentSegmentSummary)
                .filter_by(chunk_id=segment.id, dataset_id=dataset.id)
                .first()
            )
            if summary_record:
                summary_record.status = "error"
                summary_record.error = str(e)
                db.session.add(summary_record)
                db.session.commit()
            raise
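
# Illustrative usage (a minimal sketch, assuming it runs inside an application context
# where `dataset_id` and `document_id` are already known; this caller is not part of
# the module):
#
#     dataset = db.session.query(Dataset).filter_by(id=dataset_id).first()
#     document = db.session.query(DatasetDocument).filter_by(id=document_id).first()
#     if dataset and document:
#         SummaryIndexService.generate_summaries_for_document(
#             dataset=dataset,
#             document=document,
#             summary_index_setting=dataset.summary_index_setting or {},
#         )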