"""Summary index service for generating and managing document segment summaries.""" import logging import time import uuid from core.rag.datasource.vdb.vector_factory import Vector from core.rag.index_processor.constant.doc_type import DocType from core.rag.models.document import Document from extensions.ext_database import db from libs import helper from models.dataset import Dataset, DocumentSegment, DocumentSegmentSummary from models.dataset import Document as DatasetDocument logger = logging.getLogger(__name__) class SummaryIndexService: """Service for generating and managing summary indexes.""" @staticmethod def generate_summary_for_segment( segment: DocumentSegment, dataset: Dataset, summary_index_setting: dict, ) -> str: """ Generate summary for a single segment. Args: segment: DocumentSegment to generate summary for dataset: Dataset containing the segment summary_index_setting: Summary index configuration Returns: Generated summary text Raises: ValueError: If summary_index_setting is invalid or generation fails """ # Reuse the existing generate_summary method from ParagraphIndexProcessor # Use lazy import to avoid circular import from core.rag.index_processor.processor.paragraph_index_processor import ParagraphIndexProcessor summary_content = ParagraphIndexProcessor.generate_summary( tenant_id=dataset.tenant_id, text=segment.content, summary_index_setting=summary_index_setting, segment_id=segment.id, ) if not summary_content: raise ValueError("Generated summary is empty") return summary_content @staticmethod def create_summary_record( segment: DocumentSegment, dataset: Dataset, summary_content: str, status: str = "generating", ) -> DocumentSegmentSummary: """ Create or update a DocumentSegmentSummary record. If a summary record already exists for this segment, it will be updated instead of creating a new one. Args: segment: DocumentSegment to create summary for dataset: Dataset containing the segment summary_content: Generated summary content status: Summary status (default: "generating") Returns: Created or updated DocumentSegmentSummary instance """ # Check if summary record already exists existing_summary = ( db.session.query(DocumentSegmentSummary).filter_by(chunk_id=segment.id, dataset_id=dataset.id).first() ) if existing_summary: # Update existing record existing_summary.summary_content = summary_content existing_summary.status = status existing_summary.error = None # Clear any previous errors # Re-enable if it was disabled if not existing_summary.enabled: existing_summary.enabled = True existing_summary.disabled_at = None existing_summary.disabled_by = None db.session.add(existing_summary) db.session.flush() return existing_summary else: # Create new record (enabled by default) summary_record = DocumentSegmentSummary( dataset_id=dataset.id, document_id=segment.document_id, chunk_id=segment.id, summary_content=summary_content, status=status, enabled=True, # Explicitly set enabled to True ) db.session.add(summary_record) db.session.flush() return summary_record @staticmethod def vectorize_summary( summary_record: DocumentSegmentSummary, segment: DocumentSegment, dataset: Dataset, ) -> None: """ Vectorize summary and store in vector database. 
    @staticmethod
    def vectorize_summary(
        summary_record: DocumentSegmentSummary,
        segment: DocumentSegment,
        dataset: Dataset,
    ) -> None:
        """
        Vectorize summary and store it in the vector database.

        Args:
            summary_record: DocumentSegmentSummary record
            segment: Original DocumentSegment
            dataset: Dataset containing the segment
        """
        if dataset.indexing_technique != "high_quality":
            logger.warning(
                "Summary vectorization skipped for dataset %s: indexing_technique is not high_quality",
                dataset.id,
            )
            return

        # Reuse the existing index_node_id if available (mirroring segment behavior),
        # otherwise generate a new one.
        old_summary_node_id = summary_record.summary_index_node_id
        if old_summary_node_id:
            summary_index_node_id = old_summary_node_id
        else:
            # Generate a new index node ID only for new summaries
            summary_index_node_id = str(uuid.uuid4())

        # Always regenerate the hash (the summary content may have changed)
        summary_hash = helper.generate_text_hash(summary_record.summary_content)

        # Delete the old vector only when reusing the same index_node_id (to overwrite it).
        # If the index_node_id changed, the old vector should have been deleted elsewhere.
        if old_summary_node_id and old_summary_node_id == summary_index_node_id:
            try:
                vector = Vector(dataset)
                vector.delete_by_ids([old_summary_node_id])
            except Exception as e:
                logger.warning(
                    "Failed to delete old summary vector for segment %s: %s. Continuing with new vectorization.",
                    segment.id,
                    str(e),
                )

        # Create a document with the summary content and metadata
        summary_document = Document(
            page_content=summary_record.summary_content,
            metadata={
                "doc_id": summary_index_node_id,
                "doc_hash": summary_hash,
                "dataset_id": dataset.id,
                "document_id": segment.document_id,
                "original_chunk_id": segment.id,  # Key: link to the original chunk
                "doc_type": DocType.TEXT,
                "is_summary": True,  # Identifier for summary documents
            },
        )

        # Vectorize and store, retrying on transient connection errors
        max_retries = 3
        retry_delay = 2.0

        for attempt in range(max_retries):
            try:
                vector = Vector(dataset)
                vector.add_texts([summary_document], duplicate_check=True)

                # Success - update the summary record with the index node info
                summary_record.summary_index_node_id = summary_index_node_id
                summary_record.summary_index_node_hash = summary_hash
                summary_record.status = "completed"
                db.session.add(summary_record)
                db.session.flush()
                return  # Success, exit function
            except Exception as e:
                error_str = str(e).lower()
                # Check whether this looks like a transient connection-related error
                is_connection_error = any(
                    keyword in error_str
                    for keyword in [
                        "connection",
                        "disconnected",
                        "timeout",
                        "network",
                        "could not connect",
                        "server disconnected",
                        "weaviate",
                    ]
                )

                if is_connection_error and attempt < max_retries - 1:
                    # Retry connection errors with exponential backoff
                    wait_time = retry_delay * (2**attempt)
                    logger.warning(
                        "Vectorization attempt %s/%s failed for segment %s: %s. Retrying in %.1f seconds...",
                        attempt + 1,
                        max_retries,
                        segment.id,
                        str(e),
                        wait_time,
                    )
                    time.sleep(wait_time)
                    continue
                else:
                    # Final attempt failed or non-connection error - log and update status
                    logger.error(
                        "Failed to vectorize summary for segment %s after %s attempts: %s",
                        segment.id,
                        attempt + 1,
                        str(e),
                        exc_info=True,
                    )
                    summary_record.status = "error"
                    summary_record.error = f"Vectorization failed: {str(e)}"
                    db.session.add(summary_record)
                    db.session.flush()
                    raise
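    # Retry timing for vectorize_summary with the constants above (max_retries=3, retry_delay=2.0):
    #     attempt 1 fails -> wait 2.0 * 2**0 = 2.0s
    #     attempt 2 fails -> wait 2.0 * 2**1 = 4.0s
    #     attempt 3 fails -> no wait; status becomes "error" and the exception is re-raised
    # Only errors whose message matches one of the connection keywords are retried.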
    @staticmethod
    def generate_and_vectorize_summary(
        segment: DocumentSegment,
        dataset: Dataset,
        summary_index_setting: dict,
    ) -> DocumentSegmentSummary:
        """
        Generate a summary for a segment and vectorize it.

        Args:
            segment: DocumentSegment to generate summary for
            dataset: Dataset containing the segment
            summary_index_setting: Summary index configuration

        Returns:
            Created DocumentSegmentSummary instance

        Raises:
            ValueError: If summary generation fails
        """
        try:
            # Generate summary
            summary_content = SummaryIndexService.generate_summary_for_segment(segment, dataset, summary_index_setting)

            # Create or update the summary record (handles overwrite internally)
            summary_record = SummaryIndexService.create_summary_record(
                segment, dataset, summary_content, status="generating"
            )

            # Vectorize the summary (deletes any old vector before creating the new one)
            SummaryIndexService.vectorize_summary(summary_record, segment, dataset)

            db.session.commit()

            logger.info("Successfully generated and vectorized summary for segment %s", segment.id)
            return summary_record
        except Exception as e:
            logger.exception("Failed to generate summary for segment %s", segment.id)
            # Update the summary record with error status if it exists
            summary_record = (
                db.session.query(DocumentSegmentSummary).filter_by(chunk_id=segment.id, dataset_id=dataset.id).first()
            )
            if summary_record:
                summary_record.status = "error"
                summary_record.error = str(e)
                db.session.add(summary_record)
                db.session.commit()
            raise
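    # Usage sketch (hypothetical caller): generate_and_vectorize_summary is the single-segment
    # entry point. It commits on success and re-raises on failure after recording the error:
    #
    #     try:
    #         SummaryIndexService.generate_and_vectorize_summary(segment, dataset, dataset.summary_index_setting)
    #     except Exception:
    #         ...  # the summary record, if one exists, now has status "error"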
    @staticmethod
    def generate_summaries_for_document(
        dataset: Dataset,
        document: DatasetDocument,
        summary_index_setting: dict,
        segment_ids: list[str] | None = None,
        only_parent_chunks: bool = False,
    ) -> list[DocumentSegmentSummary]:
        """
        Generate summaries for all segments in a document, including vectorization.

        Args:
            dataset: Dataset containing the document
            document: DatasetDocument to generate summaries for
            summary_index_setting: Summary index configuration
            segment_ids: Optional list of specific segment IDs to process
            only_parent_chunks: If True, only process parent chunks (for parent-child mode)

        Returns:
            List of created DocumentSegmentSummary instances
        """
        # Only generate a summary index for the high_quality indexing technique
        if dataset.indexing_technique != "high_quality":
            logger.info(
                "Skipping summary generation for dataset %s: indexing_technique is %s, not 'high_quality'",
                dataset.id,
                dataset.indexing_technique,
            )
            return []

        if not summary_index_setting or not summary_index_setting.get("enable"):
            logger.info("Summary index is disabled for dataset %s", dataset.id)
            return []

        # Skip qa_model documents
        if document.doc_form == "qa_model":
            logger.info("Skipping summary generation for qa_model document %s", document.id)
            return []

        logger.info(
            "Starting summary generation for document %s in dataset %s, segment_ids: %s, only_parent_chunks: %s",
            document.id,
            dataset.id,
            len(segment_ids) if segment_ids else "all",
            only_parent_chunks,
        )

        # Query segments (only enabled, completed segments)
        query = db.session.query(DocumentSegment).filter_by(
            dataset_id=dataset.id,
            document_id=document.id,
            status="completed",
            enabled=True,  # Only generate summaries for enabled segments
        )

        if segment_ids:
            query = query.filter(DocumentSegment.id.in_(segment_ids))

        segments = query.all()

        if not segments:
            logger.info("No segments found for document %s", document.id)
            return []

        summary_records = []
        for segment in segments:
            # In parent-child mode, every DocumentSegment is a parent chunk; child chunks live in
            # the ChildChunk table and never appear in this query. The only_parent_chunks flag is
            # therefore satisfied by processing every segment here and is kept for clarity and
            # future-proofing.
            try:
                summary_record = SummaryIndexService.generate_and_vectorize_summary(
                    segment, dataset, summary_index_setting
                )
                summary_records.append(summary_record)
            except Exception:
                logger.exception("Failed to generate summary for segment %s", segment.id)
                # Continue with the remaining segments
                continue

        logger.info(
            "Completed summary generation for document %s: %s summaries generated and vectorized",
            document.id,
            len(summary_records),
        )

        return summary_records

    @staticmethod
    def disable_summaries_for_segments(
        dataset: Dataset,
        segment_ids: list[str] | None = None,
        disabled_by: str | None = None,
    ) -> None:
        """
        Disable summary records and remove their vectors from the vector database.

        Unlike delete, this preserves the summary records and only marks them as disabled.

        Args:
            dataset: Dataset containing the segments
            segment_ids: List of segment IDs to disable summaries for. If None, disable all.
            disabled_by: User ID who disabled the summaries
        """
        from libs.datetime_utils import naive_utc_now

        query = db.session.query(DocumentSegmentSummary).filter_by(
            dataset_id=dataset.id,
            enabled=True,  # Only disable enabled summaries
        )
        if segment_ids:
            query = query.filter(DocumentSegmentSummary.chunk_id.in_(segment_ids))

        summaries = query.all()
        if not summaries:
            return

        logger.info(
            "Disabling %s summary records for dataset %s, segment_ids: %s",
            len(summaries),
            dataset.id,
            len(segment_ids) if segment_ids else "all",
        )

        # Remove vectors from the vector database (but keep the records)
        if dataset.indexing_technique == "high_quality":
            summary_node_ids = [s.summary_index_node_id for s in summaries if s.summary_index_node_id]
            if summary_node_ids:
                try:
                    vector = Vector(dataset)
                    vector.delete_by_ids(summary_node_ids)
                except Exception as e:
                    logger.warning("Failed to remove summary vectors: %s", str(e))

        # Disable the summary records (do not delete them)
        now = naive_utc_now()
        for summary in summaries:
            summary.enabled = False
            summary.disabled_at = now
            summary.disabled_by = disabled_by
            db.session.add(summary)

        db.session.commit()
        logger.info("Disabled %s summary records for dataset %s", len(summaries), dataset.id)
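    # Disable/enable sketch (hypothetical caller): disabling removes vectors but keeps the rows,
    # so a later enable can re-vectorize from the stored summary_content without regenerating it:
    #
    #     SummaryIndexService.disable_summaries_for_segments(dataset, segment_ids, disabled_by=user_id)
    #     SummaryIndexService.enable_summaries_for_segments(dataset, segment_ids)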
""" # Only enable summary index for high_quality indexing technique if dataset.indexing_technique != "high_quality": return # Check if summary index is enabled summary_index_setting = dataset.summary_index_setting if not summary_index_setting or not summary_index_setting.get("enable"): return query = db.session.query(DocumentSegmentSummary).filter_by( dataset_id=dataset.id, enabled=False, # Only enable disabled summaries ) if segment_ids: query = query.filter(DocumentSegmentSummary.chunk_id.in_(segment_ids)) summaries = query.all() if not summaries: return logger.info( "Enabling %s summary records for dataset %s, segment_ids: %s", len(summaries), dataset.id, len(segment_ids) if segment_ids else "all", ) # Re-vectorize and re-add to vector database enabled_count = 0 for summary in summaries: # Get the original segment segment = ( db.session.query(DocumentSegment) .filter_by( id=summary.chunk_id, dataset_id=dataset.id, ) .first() ) if not segment or not segment.enabled or segment.status != "completed": continue if not summary.summary_content: continue try: # Re-vectorize summary SummaryIndexService.vectorize_summary(summary, segment, dataset) # Enable summary record summary.enabled = True summary.disabled_at = None summary.disabled_by = None db.session.add(summary) enabled_count += 1 except Exception: logger.exception("Failed to re-vectorize summary %s", summary.id) # Keep it disabled if vectorization fails continue db.session.commit() logger.info("Enabled %s summary records for dataset %s", enabled_count, dataset.id) @staticmethod def delete_summaries_for_segments( dataset: Dataset, segment_ids: list[str] | None = None, ) -> None: """ Delete summary records and vectors for segments (used only for actual deletion scenarios). For disable/enable operations, use disable_summaries_for_segments/enable_summaries_for_segments. Args: dataset: Dataset containing the segments segment_ids: List of segment IDs to delete summaries for. If None, delete all. """ query = db.session.query(DocumentSegmentSummary).filter_by(dataset_id=dataset.id) if segment_ids: query = query.filter(DocumentSegmentSummary.chunk_id.in_(segment_ids)) summaries = query.all() if not summaries: return # Delete from vector database if dataset.indexing_technique == "high_quality": summary_node_ids = [s.summary_index_node_id for s in summaries if s.summary_index_node_id] if summary_node_ids: vector = Vector(dataset) vector.delete_by_ids(summary_node_ids) # Delete summary records for summary in summaries: db.session.delete(summary) db.session.commit() logger.info("Deleted %s summary records for dataset %s", len(summaries), dataset.id) @staticmethod def update_summary_for_segment( segment: DocumentSegment, dataset: Dataset, summary_content: str, ) -> DocumentSegmentSummary | None: """ Update summary for a segment and re-vectorize it. 
    @staticmethod
    def update_summary_for_segment(
        segment: DocumentSegment,
        dataset: Dataset,
        summary_content: str,
    ) -> DocumentSegmentSummary | None:
        """
        Update the summary for a segment and re-vectorize it.

        Args:
            segment: DocumentSegment to update summary for
            dataset: Dataset containing the segment
            summary_content: New summary content

        Returns:
            Updated DocumentSegmentSummary instance, or None if the summary index is not enabled
        """
        # Only update the summary index for the high_quality indexing technique
        if dataset.indexing_technique != "high_quality":
            return None

        # Check whether the summary index is enabled
        summary_index_setting = dataset.summary_index_setting
        if not summary_index_setting or not summary_index_setting.get("enable"):
            return None

        # Skip qa_model documents
        if segment.document and segment.document.doc_form == "qa_model":
            return None

        try:
            # Find the existing summary record
            summary_record = (
                db.session.query(DocumentSegmentSummary).filter_by(chunk_id=segment.id, dataset_id=dataset.id).first()
            )

            if summary_record:
                # Update the existing summary
                old_summary_node_id = summary_record.summary_index_node_id

                # Update the summary content
                summary_record.summary_content = summary_content
                summary_record.status = "generating"
                db.session.add(summary_record)
                db.session.flush()

                # Delete the old vector if it exists
                if old_summary_node_id:
                    vector = Vector(dataset)
                    vector.delete_by_ids([old_summary_node_id])

                # Re-vectorize the summary
                SummaryIndexService.vectorize_summary(summary_record, segment, dataset)

                db.session.commit()
                logger.info("Successfully updated and re-vectorized summary for segment %s", segment.id)
                return summary_record
            else:
                # Create a new summary record if one does not exist
                summary_record = SummaryIndexService.create_summary_record(
                    segment, dataset, summary_content, status="generating"
                )
                SummaryIndexService.vectorize_summary(summary_record, segment, dataset)
                db.session.commit()
                logger.info("Successfully created and vectorized summary for segment %s", segment.id)
                return summary_record
        except Exception as e:
            logger.exception("Failed to update summary for segment %s", segment.id)
            # Update the summary record with error status if it exists
            summary_record = (
                db.session.query(DocumentSegmentSummary).filter_by(chunk_id=segment.id, dataset_id=dataset.id).first()
            )
            if summary_record:
                summary_record.status = "error"
                summary_record.error = str(e)
                db.session.add(summary_record)
                db.session.commit()
            raise
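    # Manual-edit sketch (hypothetical caller): when a user edits a summary directly,
    # update_summary_for_segment overwrites the stored content and re-vectorizes it in place:
    #
    #     SummaryIndexService.update_summary_for_segment(segment, dataset, "edited summary text")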