From 7eb65b07c8535370fea9297e6cb33ce3110093aa Mon Sep 17 00:00:00 2001
From: FFXN
Date: Wed, 14 Jan 2026 17:52:27 +0800
Subject: [PATCH] feat: Make summary index support vision, and make the code more standardized.

---
 .../console/datasets/datasets_document.py     | 136 +++++-----
 .../console/datasets/datasets_segments.py     |   6 +-
 .../console/datasets/hit_testing.py           |  14 +-
 api/core/indexing_runner.py                   |   4 +-
 api/core/llm_generator/prompts.py             |   4 +-
 api/core/rag/datasource/retrieval_service.py  |   6 +-
 .../index_processor/index_processor_base.py   |   4 +-
 .../processor/paragraph_index_processor.py    | 244 ++++++++++++++++--
 .../workflow/nodes/document_extractor/node.py |  15 +-
 .../knowledge_index/knowledge_index_node.py   | 116 +++++----
 api/fields/document_fields.py                 |   6 +-
 api/models/dataset.py                         |   5 +-
 api/services/dataset_service.py               |  35 +--
 api/services/summary_index_service.py         | 171 ++++++------
 api/tasks/add_document_to_index_task.py       |   3 +-
 api/tasks/disable_segment_from_index_task.py  |   3 +-
 api/tasks/disable_segments_from_index_task.py |   3 +-
 api/tasks/document_indexing_task.py           |  43 +--
 api/tasks/enable_segment_to_index_task.py     |   3 +-
 api/tasks/enable_segments_to_index_task.py    |   3 +-
 api/tasks/generate_summary_index_task.py      |   5 +-
 api/tasks/regenerate_summary_index_task.py    |  40 +--
 api/tasks/remove_document_from_index_task.py  |   7 +-
 23 files changed, 569 insertions(+), 307 deletions(-)

diff --git a/api/controllers/console/datasets/datasets_document.py b/api/controllers/console/datasets/datasets_document.py
index 5d3a11d200..1ca9a615e3 100644
--- a/api/controllers/console/datasets/datasets_document.py
+++ b/api/controllers/console/datasets/datasets_document.py
@@ -107,7 +107,8 @@ class DocumentRenamePayload(BaseModel):
 
 class GenerateSummaryPayload(BaseModel):
     document_list: list[str]
-    
+
+
 class DocumentDatasetListParam(BaseModel):
     page: int = Field(1, title="Page", description="Page number.")
     limit: int = Field(20, title="Limit", description="Page size.")
@@ -311,17 +312,14 @@ class DatasetDocumentListApi(Resource):
 
         paginated_documents = db.paginate(select=query, page=page, per_page=limit, max_per_page=100, error_out=False)
         documents = paginated_documents.items
-        
+
         # Check if dataset has summary index enabled
-        has_summary_index = (
-            dataset.summary_index_setting
-            and dataset.summary_index_setting.get("enable") is True
-        )
-        
+        has_summary_index = dataset.summary_index_setting and dataset.summary_index_setting.get("enable") is True
+
         # Filter documents that need summary calculation
         documents_need_summary = [doc for doc in documents if doc.need_summary is True]
         document_ids_need_summary = [str(doc.id) for doc in documents_need_summary]
-        
+
         # Calculate summary_index_status for documents that need summary (only if dataset summary index is enabled)
         summary_status_map = {}
         if has_summary_index and document_ids_need_summary:
@@ -335,7 +333,7 @@ class DatasetDocumentListApi(Resource):
                 )
                 .all()
             )
-            
+
             # Group segments by document_id
             document_segments_map = {}
             for segment in segments:
@@ -343,7 +341,7 @@ class DatasetDocumentListApi(Resource):
                 if doc_id not in document_segments_map:
                     document_segments_map[doc_id] = []
                 document_segments_map[doc_id].append(segment.id)
-            
+
             # Get all summary records for these segments
             all_segment_ids = [seg.id for seg in segments]
             summaries = {}
@@ -358,7 +356,7 @@ class DatasetDocumentListApi(Resource):
                     )
                     .all()
                 )
                 summaries = {summary.chunk_id: summary.status for summary in summary_records}
-            
+
             # Calculate summary_index_status for each document
             for doc_id in
document_ids_need_summary: segment_ids = document_segments_map.get(doc_id, []) @@ -366,7 +364,7 @@ class DatasetDocumentListApi(Resource): # No segments, status is "GENERATING" (waiting to generate) summary_status_map[doc_id] = "GENERATING" continue - + # Count summary statuses for this document's segments status_counts = {"completed": 0, "generating": 0, "error": 0, "not_started": 0} for segment_id in segment_ids: @@ -375,12 +373,12 @@ class DatasetDocumentListApi(Resource): status_counts[status] += 1 else: status_counts["not_started"] += 1 - + total_segments = len(segment_ids) completed_count = status_counts["completed"] generating_count = status_counts["generating"] error_count = status_counts["error"] - + # Determine overall status (only three states: GENERATING, COMPLETED, ERROR) if completed_count == total_segments: summary_status_map[doc_id] = "COMPLETED" @@ -393,7 +391,7 @@ class DatasetDocumentListApi(Resource): else: # Default to generating summary_status_map[doc_id] = "GENERATING" - + # Add summary_index_status to each document for document in documents: if has_summary_index and document.need_summary is True: @@ -401,7 +399,7 @@ class DatasetDocumentListApi(Resource): else: # Return null if summary index is not enabled or document doesn't need summary document.summary_index_status = None - + if fetch: for document in documents: completed_segments = ( @@ -500,7 +498,6 @@ class DatasetDocumentListApi(Resource): return {"result": "success"}, 204 - @console_ns.route("/datasets/init") class DatasetInitApi(Resource): @console_ns.doc("init_dataset") @@ -1311,49 +1308,46 @@ class DocumentGenerateSummaryApi(Resource): def post(self, dataset_id): """ Generate summary index for specified documents. - + This endpoint checks if the dataset configuration supports summary generation (indexing_technique must be 'high_quality' and summary_index_setting.enable must be true), then asynchronously generates summary indexes for the provided documents. """ current_user, _ = current_account_with_tenant() dataset_id = str(dataset_id) - + # Get dataset dataset = DatasetService.get_dataset(dataset_id) if not dataset: raise NotFound("Dataset not found.") - + # Check permissions if not current_user.is_dataset_editor: raise Forbidden() - + try: DatasetService.check_dataset_permission(dataset, current_user) except services.errors.account.NoPermissionError as e: raise Forbidden(str(e)) - + # Validate request payload payload = GenerateSummaryPayload.model_validate(console_ns.payload or {}) document_list = payload.document_list - + if not document_list: raise ValueError("document_list cannot be empty.") - + # Check if dataset configuration supports summary generation if dataset.indexing_technique != "high_quality": raise ValueError( f"Summary generation is only available for 'high_quality' indexing technique. " f"Current indexing technique: {dataset.indexing_technique}" ) - + summary_index_setting = dataset.summary_index_setting if not summary_index_setting or not summary_index_setting.get("enable"): - raise ValueError( - "Summary index is not enabled for this dataset. " - "Please enable it in the dataset settings." - ) - + raise ValueError("Summary index is not enabled for this dataset. 
Please enable it in the dataset settings.") + # Verify all documents exist and belong to the dataset documents = ( db.session.query(Document) @@ -1363,27 +1357,27 @@ class DocumentGenerateSummaryApi(Resource): ) .all() ) - + if len(documents) != len(document_list): found_ids = {doc.id for doc in documents} missing_ids = set(document_list) - found_ids raise NotFound(f"Some documents not found: {list(missing_ids)}") - + # Dispatch async tasks for each document for document in documents: # Skip qa_model documents as they don't generate summaries if document.doc_form == "qa_model": - logger.info( - f"Skipping summary generation for qa_model document {document.id}" - ) + logger.info("Skipping summary generation for qa_model document %s", document.id) continue - + # Dispatch async task generate_summary_index_task(dataset_id, document.id) logger.info( - f"Dispatched summary generation task for document {document.id} in dataset {dataset_id}" + "Dispatched summary generation task for document %s in dataset %s", + document.id, + dataset_id, ) - + return {"result": "success"}, 200 @@ -1400,7 +1394,7 @@ class DocumentSummaryStatusApi(DocumentResource): def get(self, dataset_id, document_id): """ Get summary index generation status for a document. - + Returns: - total_segments: Total number of segments in the document - summary_status: Dictionary with status counts @@ -1413,21 +1407,21 @@ class DocumentSummaryStatusApi(DocumentResource): current_user, _ = current_account_with_tenant() dataset_id = str(dataset_id) document_id = str(document_id) - + # Get document document = self.get_document(dataset_id, document_id) - + # Get dataset dataset = DatasetService.get_dataset(dataset_id) if not dataset: raise NotFound("Dataset not found.") - + # Check permissions try: DatasetService.check_dataset_permission(dataset, current_user) except services.errors.account.NoPermissionError as e: raise Forbidden(str(e)) - + # Get all segments for this document segments = ( db.session.query(DocumentSegment) @@ -1439,9 +1433,9 @@ class DocumentSummaryStatusApi(DocumentResource): ) .all() ) - + total_segments = len(segments) - + # Get all summary records for these segments segment_ids = [segment.id for segment in segments] summaries = [] @@ -1456,10 +1450,10 @@ class DocumentSummaryStatusApi(DocumentResource): ) .all() ) - + # Create a mapping of chunk_id to summary summary_map = {summary.chunk_id: summary for summary in summaries} - + # Count statuses status_counts = { "completed": 0, @@ -1467,34 +1461,42 @@ class DocumentSummaryStatusApi(DocumentResource): "error": 0, "not_started": 0, } - + summary_list = [] for segment in segments: summary = summary_map.get(segment.id) if summary: status = summary.status status_counts[status] = status_counts.get(status, 0) + 1 - summary_list.append({ - "segment_id": segment.id, - "segment_position": segment.position, - "status": summary.status, - "summary_preview": summary.summary_content[:100] + "..." if summary.summary_content and len(summary.summary_content) > 100 else summary.summary_content, - "error": summary.error, - "created_at": int(summary.created_at.timestamp()) if summary.created_at else None, - "updated_at": int(summary.updated_at.timestamp()) if summary.updated_at else None, - }) + summary_list.append( + { + "segment_id": segment.id, + "segment_position": segment.position, + "status": summary.status, + "summary_preview": ( + summary.summary_content[:100] + "..." 
+ if summary.summary_content and len(summary.summary_content) > 100 + else summary.summary_content + ), + "error": summary.error, + "created_at": int(summary.created_at.timestamp()) if summary.created_at else None, + "updated_at": int(summary.updated_at.timestamp()) if summary.updated_at else None, + } + ) else: status_counts["not_started"] += 1 - summary_list.append({ - "segment_id": segment.id, - "segment_position": segment.position, - "status": "not_started", - "summary_preview": None, - "error": None, - "created_at": None, - "updated_at": None, - }) - + summary_list.append( + { + "segment_id": segment.id, + "segment_position": segment.position, + "status": "not_started", + "summary_preview": None, + "error": None, + "created_at": None, + "updated_at": None, + } + ) + return { "total_segments": total_segments, "summary_status": status_counts, diff --git a/api/controllers/console/datasets/datasets_segments.py b/api/controllers/console/datasets/datasets_segments.py index 423462f966..8a9bc6a201 100644 --- a/api/controllers/console/datasets/datasets_segments.py +++ b/api/controllers/console/datasets/datasets_segments.py @@ -212,9 +212,7 @@ class DatasetDocumentSegmentListApi(Resource): ) # Only include enabled summaries summaries = { - summary.chunk_id: summary.summary_content - for summary in summary_records - if summary.enabled is True + summary.chunk_id: summary.summary_content for summary in summary_records if summary.enabled is True } # Add summary to each segment @@ -433,7 +431,7 @@ class DatasetDocumentSegmentUpdateApi(Resource): payload = SegmentUpdatePayload.model_validate(console_ns.payload or {}) payload_dict = payload.model_dump(exclude_none=True) SegmentService.segment_create_args_validate(payload_dict, document) - + # Update segment (summary update with change detection is handled in SegmentService.update_segment) segment = SegmentService.update_segment( SegmentUpdateArgs.model_validate(payload.model_dump(exclude_none=True)), segment, document, dataset diff --git a/api/controllers/console/datasets/hit_testing.py b/api/controllers/console/datasets/hit_testing.py index c947132070..e62be13c2f 100644 --- a/api/controllers/console/datasets/hit_testing.py +++ b/api/controllers/console/datasets/hit_testing.py @@ -1,6 +1,13 @@ from flask_restx import Resource, fields from controllers.common.schema import register_schema_model +from fields.hit_testing_fields import ( + child_chunk_fields, + document_fields, + files_fields, + hit_testing_record_fields, + segment_fields, +) from libs.login import login_required from .. 
import console_ns
@@ -10,13 +17,6 @@ from ..wraps import (
     cloud_edition_billing_rate_limit_check,
     setup_required,
 )
-from fields.hit_testing_fields import (
-    child_chunk_fields,
-    document_fields,
-    files_fields,
-    hit_testing_record_fields,
-    segment_fields,
-)
 
 register_schema_model(console_ns, HitTestingPayload)
 
diff --git a/api/core/indexing_runner.py b/api/core/indexing_runner.py
index 599a655ab9..e172e88298 100644
--- a/api/core/indexing_runner.py
+++ b/api/core/indexing_runner.py
@@ -367,8 +367,8 @@ class IndexingRunner:
             return IndexingEstimate(total_segments=total_segments * 20, qa_preview=qa_preview_texts, preview=[])
 
         # Generate summary preview
-        summary_index_setting = tmp_processing_rule["summary_index_setting"] if "summary_index_setting" in tmp_processing_rule else None
-        if summary_index_setting and summary_index_setting.get('enable') and preview_texts:
+        summary_index_setting = tmp_processing_rule.get("summary_index_setting")
+        if summary_index_setting and summary_index_setting.get("enable") and preview_texts:
             preview_texts = index_processor.generate_summary_preview(tenant_id, preview_texts, summary_index_setting)
 
         return IndexingEstimate(total_segments=total_segments, preview=preview_texts)
diff --git a/api/core/llm_generator/prompts.py b/api/core/llm_generator/prompts.py
index 1fbf279309..af7995f3bd 100644
--- a/api/core/llm_generator/prompts.py
+++ b/api/core/llm_generator/prompts.py
@@ -436,4 +436,6 @@ You should edit the prompt according to the IDEAL OUTPUT."""
 INSTRUCTION_GENERATE_TEMPLATE_CODE = """Please fix the errors in the {{#error_message#}}."""
 
 DEFAULT_GENERATOR_SUMMARY_PROMPT = """
-You are a helpful assistant that summarizes long pieces of text into concise summaries. Given the following text, generate a brief summary that captures the main points and key information. The summary should be clear, concise, and written in complete sentences. """
+You are a helpful assistant that summarizes long pieces of text into concise summaries.
+Given the following text, generate a brief summary that captures the main points and key information.
+The summary should be clear, concise, and written in complete sentences. """
diff --git a/api/core/rag/datasource/retrieval_service.py b/api/core/rag/datasource/retrieval_service.py
index 6deb967e0a..372cbe5032 100644
--- a/api/core/rag/datasource/retrieval_service.py
+++ b/api/core/rag/datasource/retrieval_service.py
@@ -395,7 +395,7 @@ class RetrievalService:
         index_node_ids = []
         doc_to_document_map = {}
         summary_segment_ids = set()  # Track segments retrieved via summary
-        
+
         # First pass: collect all document IDs and identify summary documents
         for document in documents:
             document_id = document.metadata.get("document_id")
@@ -455,7 +455,7 @@
                         doc_segment_map[attachment["segment_id"]].append(attachment["attachment_id"])
                     else:
                         doc_segment_map[attachment["segment_id"]] = [attachment["attachment_id"]]
-            
+
             child_chunk_stmt = select(ChildChunk).where(ChildChunk.index_node_id.in_(child_index_node_ids))
             child_index_nodes = session.execute(child_chunk_stmt).scalars().all()
 
@@ -479,7 +479,7 @@
             index_node_segments = session.execute(document_segment_stmt).scalars().all()  # type: ignore
             for index_node_segment in index_node_segments:
                 doc_segment_map[index_node_segment.id] = [index_node_segment.index_node_id]
-            
+
             if segment_ids:
                 document_segment_stmt = select(DocumentSegment).where(
                     DocumentSegment.enabled == True,
diff --git a/api/core/rag/index_processor/index_processor_base.py b/api/core/rag/index_processor/index_processor_base.py
index 8bbdf8ba39..151a3de7d9 100644
--- a/api/core/rag/index_processor/index_processor_base.py
+++ b/api/core/rag/index_processor/index_processor_base.py
@@ -47,7 +47,9 @@ class BaseIndexProcessor(ABC):
         raise NotImplementedError
 
     @abstractmethod
-    def generate_summary_preview(self, tenant_id: str, preview_texts: list[PreviewDetail], summary_index_setting: dict) -> list[PreviewDetail]:
+    def generate_summary_preview(
+        self, tenant_id: str, preview_texts: list[PreviewDetail], summary_index_setting: dict
+    ) -> list[PreviewDetail]:
         """
         For each segment in preview_texts, generate a summary using LLM and attach it to the segment.
         The summary can be stored in a new attribute, e.g., summary.
diff --git a/api/core/rag/index_processor/processor/paragraph_index_processor.py b/api/core/rag/index_processor/processor/paragraph_index_processor.py
index 89a6d80306..a6f2f4e820 100644
--- a/api/core/rag/index_processor/processor/paragraph_index_processor.py
+++ b/api/core/rag/index_processor/processor/paragraph_index_processor.py
@@ -1,6 +1,7 @@
 """Paragraph index processor."""
 
 import logging
+import re
 import uuid
 from collections.abc import Mapping
 from typing import Any
@@ -8,6 +9,17 @@ from typing import Any
 logger = logging.getLogger(__name__)
 
 from core.entities.knowledge_entities import PreviewDetail
+from core.file import File, FileTransferMethod, FileType, file_manager
+from core.llm_generator.prompts import DEFAULT_GENERATOR_SUMMARY_PROMPT
+from core.model_manager import ModelInstance
+from core.model_runtime.entities.message_entities import (
+    ImagePromptMessageContent,
+    PromptMessageContentUnionTypes,
+    TextPromptMessageContent,
+    UserPromptMessage,
+)
+from core.model_runtime.entities.model_entities import ModelFeature, ModelType
+from core.provider_manager import ProviderManager
 from core.rag.cleaner.clean_processor import CleanProcessor
 from core.rag.datasource.keyword.keyword_factory import Keyword
 from core.rag.datasource.retrieval_service import RetrievalService
@@ -22,18 +34,15 @@ from core.rag.models.document import AttachmentDocument, Document, MultimodalGen
 from core.rag.retrieval.retrieval_methods import RetrievalMethod
 from core.tools.utils.text_processing_utils import remove_leading_symbols
 from extensions.ext_database import db
+from factories.file_factory import build_from_mapping
 from libs import helper
+from models import UploadFile
 from models.account import Account
-from models.dataset import Dataset, DatasetProcessRule, DocumentSegment
+from models.dataset import Dataset, DatasetProcessRule, DocumentSegment, SegmentAttachmentBinding
 from models.dataset import Document as DatasetDocument
 from services.account_service import AccountService
 from services.entities.knowledge_entities.knowledge_entities import Rule
 from services.summary_index_service import SummaryIndexService
-from core.llm_generator.prompts import DEFAULT_GENERATOR_SUMMARY_PROMPT
-from core.model_runtime.entities.message_entities import UserPromptMessage
-from core.model_runtime.entities.model_entities import ModelType
-from core.provider_manager import ProviderManager
-from core.model_manager import ModelInstance
 
 
 class ParagraphIndexProcessor(BaseIndexProcessor):
@@ -262,12 +271,15 @@ class ParagraphIndexProcessor(BaseIndexProcessor):
         else:
             raise ValueError("Chunks is not a list")
 
-    def generate_summary_preview(self, tenant_id: str, preview_texts: list[PreviewDetail], summary_index_setting: dict) -> list[PreviewDetail]:
+    def generate_summary_preview(
+        self, tenant_id: str, preview_texts: list[PreviewDetail], summary_index_setting: dict
+    ) -> list[PreviewDetail]:
         """
         For each segment, concurrently call generate_summary to generate a summary
         and write it to the summary attribute of PreviewDetail.
""" import concurrent.futures + from flask import current_app # Capture Flask app context for worker threads @@ -289,8 +301,8 @@ class ParagraphIndexProcessor(BaseIndexProcessor): # Fallback: try without app context (may fail) summary = self.generate_summary(tenant_id, preview.content, summary_index_setting) preview.summary = summary - except Exception as e: - logger.error(f"Failed to generate summary for preview: {str(e)}") + except Exception: + logger.exception("Failed to generate summary for preview") # Don't fail the entire preview if summary generation fails preview.summary = None @@ -299,9 +311,21 @@ class ParagraphIndexProcessor(BaseIndexProcessor): return preview_texts @staticmethod - def generate_summary(tenant_id: str, text: str, summary_index_setting: dict = None) -> str: + def generate_summary( + tenant_id: str, + text: str, + summary_index_setting: dict | None = None, + segment_id: str | None = None, + ) -> str: """ - Generate summary for the given text using ModelInstance.invoke_llm and the default or custom summary prompt. + Generate summary for the given text using ModelInstance.invoke_llm and the default or custom summary prompt, + and supports vision models by including images from the segment attachments or text content. + + Args: + tenant_id: Tenant ID + text: Text content to summarize + summary_index_setting: Summary index configuration + segment_id: Optional segment ID to fetch attachments from SegmentAttachmentBinding table """ if not summary_index_setting or not summary_index_setting.get("enable"): raise ValueError("summary_index_setting is required and must be enabled to generate summary.") @@ -314,17 +338,195 @@ class ParagraphIndexProcessor(BaseIndexProcessor): if not summary_prompt: summary_prompt = DEFAULT_GENERATOR_SUMMARY_PROMPT - prompt = f"{summary_prompt}\n{text}" - provider_manager = ProviderManager() - provider_model_bundle = provider_manager.get_provider_model_bundle(tenant_id, model_provider_name, ModelType.LLM) - model_instance = ModelInstance(provider_model_bundle, model_name) - prompt_messages = [UserPromptMessage(content=prompt)] - - result = model_instance.invoke_llm( - prompt_messages=prompt_messages, - model_parameters={}, - stream=False + provider_model_bundle = provider_manager.get_provider_model_bundle( + tenant_id, model_provider_name, ModelType.LLM ) + model_instance = ModelInstance(provider_model_bundle, model_name) + + # Get model schema to check if vision is supported + model_schema = model_instance.get_model_schema(model_name, provider_model_bundle.credentials) + supports_vision = model_schema and model_schema.features and ModelFeature.VISION in model_schema.features + + # Extract images if model supports vision + image_files = [] + if supports_vision: + # First, try to get images from SegmentAttachmentBinding (preferred method) + if segment_id: + image_files = ParagraphIndexProcessor._extract_images_from_segment_attachments(tenant_id, segment_id) + + # If no images from attachments, fall back to extracting from text + if not image_files: + image_files = ParagraphIndexProcessor._extract_images_from_text(tenant_id, text) + + # Build prompt messages + prompt_messages = [] + + if image_files: + # If we have images, create a UserPromptMessage with both text and images + prompt_message_contents: list[PromptMessageContentUnionTypes] = [] + + # Add images first + for file in image_files: + try: + file_content = file_manager.to_prompt_message_content( + file, image_detail_config=ImagePromptMessageContent.DETAIL.LOW + ) + 
prompt_message_contents.append(file_content) + except Exception as e: + logger.warning("Failed to convert image file to prompt message content: %s", str(e)) + continue + + # Add text content + if prompt_message_contents: # Only add text if we successfully added images + prompt_message_contents.append(TextPromptMessageContent(data=f"{summary_prompt}\n{text}")) + prompt_messages.append(UserPromptMessage(content=prompt_message_contents)) + else: + # If image conversion failed, fall back to text-only + prompt = f"{summary_prompt}\n{text}" + prompt_messages.append(UserPromptMessage(content=prompt)) + else: + # No images, use simple text prompt + prompt = f"{summary_prompt}\n{text}" + prompt_messages.append(UserPromptMessage(content=prompt)) + + result = model_instance.invoke_llm(prompt_messages=prompt_messages, model_parameters={}, stream=False) return getattr(result.message, "content", "") + + @staticmethod + def _extract_images_from_text(tenant_id: str, text: str) -> list[File]: + """ + Extract images from markdown text and convert them to File objects. + + Args: + tenant_id: Tenant ID + text: Text content that may contain markdown image links + + Returns: + List of File objects representing images found in the text + """ + # Extract markdown images using regex pattern + pattern = r"!\[.*?\]\((.*?)\)" + images = re.findall(pattern, text) + + if not images: + return [] + + upload_file_id_list = [] + + for image in images: + # For data before v0.10.0 + pattern = r"/files/([a-f0-9\-]+)/image-preview(?:\?.*?)?" + match = re.search(pattern, image) + if match: + upload_file_id = match.group(1) + upload_file_id_list.append(upload_file_id) + continue + + # For data after v0.10.0 + pattern = r"/files/([a-f0-9\-]+)/file-preview(?:\?.*?)?" + match = re.search(pattern, image) + if match: + upload_file_id = match.group(1) + upload_file_id_list.append(upload_file_id) + continue + + # For tools directory - direct file formats (e.g., .png, .jpg, etc.) + pattern = r"/files/tools/([a-f0-9\-]+)\.([a-zA-Z0-9]+)(?:\?[^\s\)\"\']*)?" + match = re.search(pattern, image) + if match: + # Tool files are handled differently, skip for now + continue + + if not upload_file_id_list: + return [] + + # Get unique IDs for database query + unique_upload_file_ids = list(set(upload_file_id_list)) + upload_files = ( + db.session.query(UploadFile) + .where(UploadFile.id.in_(unique_upload_file_ids), UploadFile.tenant_id == tenant_id) + .all() + ) + + # Create File objects from UploadFile records + file_objects = [] + for upload_file in upload_files: + # Only process image files + if not upload_file.mime_type or "image" not in upload_file.mime_type: + continue + + mapping = { + "upload_file_id": upload_file.id, + "transfer_method": FileTransferMethod.LOCAL_FILE.value, + "type": FileType.IMAGE.value, + } + + try: + file_obj = build_from_mapping( + mapping=mapping, + tenant_id=tenant_id, + ) + file_objects.append(file_obj) + except Exception as e: + logger.warning("Failed to create File object from UploadFile %s: %s", upload_file.id, str(e)) + continue + + return file_objects + + @staticmethod + def _extract_images_from_segment_attachments(tenant_id: str, segment_id: str) -> list[File]: + """ + Extract images from SegmentAttachmentBinding table (preferred method). + This matches how DatasetRetrieval gets segment attachments. 
+ + Args: + tenant_id: Tenant ID + segment_id: Segment ID to fetch attachments for + + Returns: + List of File objects representing images found in segment attachments + """ + from sqlalchemy import select + + # Query attachments from SegmentAttachmentBinding table + attachments_with_bindings = db.session.execute( + select(SegmentAttachmentBinding, UploadFile) + .join(UploadFile, UploadFile.id == SegmentAttachmentBinding.attachment_id) + .where( + SegmentAttachmentBinding.segment_id == segment_id, + SegmentAttachmentBinding.tenant_id == tenant_id, + ) + ).all() + + if not attachments_with_bindings: + return [] + + file_objects = [] + for _, upload_file in attachments_with_bindings: + # Only process image files + if not upload_file.mime_type or "image" not in upload_file.mime_type: + continue + + try: + # Create File object directly (similar to DatasetRetrieval) + file_obj = File( + id=upload_file.id, + filename=upload_file.name, + extension="." + upload_file.extension, + mime_type=upload_file.mime_type, + tenant_id=tenant_id, + type=FileType.IMAGE, + transfer_method=FileTransferMethod.LOCAL_FILE, + remote_url=upload_file.source_url, + related_id=upload_file.id, + size=upload_file.size, + storage_key=upload_file.key, + ) + file_objects.append(file_obj) + except Exception as e: + logger.warning("Failed to create File object from UploadFile %s: %s", upload_file.id, str(e)) + continue + + return file_objects diff --git a/api/core/workflow/nodes/document_extractor/node.py b/api/core/workflow/nodes/document_extractor/node.py index 2cbd7952ba..25dd98f48a 100644 --- a/api/core/workflow/nodes/document_extractor/node.py +++ b/api/core/workflow/nodes/document_extractor/node.py @@ -65,14 +65,14 @@ class DocumentExtractorNode(Node[DocumentExtractorNodeData]): # Ensure storage_key is loaded for File objects files_to_check = value if isinstance(value, list) else [value] files_needing_storage_key = [ - f for f in files_to_check - if isinstance(f, File) and not f.storage_key and f.related_id + f for f in files_to_check if isinstance(f, File) and not f.storage_key and f.related_id ] if files_needing_storage_key: - from factories.file_factory import StorageKeyLoader - from extensions.ext_database import db from sqlalchemy.orm import Session - + + from extensions.ext_database import db + from factories.file_factory import StorageKeyLoader + with Session(bind=db.engine) as session: storage_key_loader = StorageKeyLoader(session, tenant_id=self.tenant_id) storage_key_loader.load_storage_keys(files_needing_storage_key) @@ -433,12 +433,13 @@ def _download_file_content(file: File) -> bytes: # Check if storage_key is set if not file.storage_key: raise FileDownloadError(f"File storage_key is missing for file: {file.filename}") - + # Check if file exists before downloading from extensions.ext_storage import storage + if not storage.exists(file.storage_key): raise FileDownloadError(f"File not found in storage: {file.storage_key}") - + return file_manager.download(file) except Exception as e: raise FileDownloadError(f"Error downloading file: {str(e)}") from e diff --git a/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py b/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py index 4d264683d0..d14bdee1fd 100644 --- a/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py +++ b/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py @@ -77,11 +77,13 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]): # or fallback to dataset if not available in node_data indexing_technique 
= node_data.indexing_technique or dataset.indexing_technique summary_index_setting = node_data.summary_index_setting or dataset.summary_index_setting - + outputs = self._get_preview_output_with_summaries( - node_data.chunk_structure, chunks, dataset=dataset, + node_data.chunk_structure, + chunks, + dataset=dataset, indexing_technique=indexing_technique, - summary_index_setting=summary_index_setting + summary_index_setting=summary_index_setting, ) return NodeRunResult( status=WorkflowNodeExecutionStatus.SUCCEEDED, @@ -237,7 +239,7 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]): segments = query.all() if not segments: - logger.info(f"No segments found for document {document.id}") + logger.info("No segments found for document %s", document.id) return # Filter segments based on mode @@ -256,7 +258,7 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]): segments_to_process.append(segment) if not segments_to_process: - logger.info(f"No segments need summary generation for document {document.id}") + logger.info("No segments need summary generation for document %s", document.id) return # Use ThreadPoolExecutor for concurrent generation @@ -267,46 +269,55 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]): """Process a single segment in a thread with Flask app context.""" with flask_app.app_context(): try: - SummaryIndexService.generate_and_vectorize_summary( - segment, dataset, summary_index_setting + SummaryIndexService.generate_and_vectorize_summary(segment, dataset, summary_index_setting) + except Exception: + logger.exception( + "Failed to generate summary for segment %s", + segment.id, ) - except Exception as e: - logger.error(f"Failed to generate summary for segment {segment.id}: {str(e)}") # Continue processing other segments with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor: - futures = [ - executor.submit(process_segment, segment) for segment in segments_to_process - ] + futures = [executor.submit(process_segment, segment) for segment in segments_to_process] # Wait for all tasks to complete concurrent.futures.wait(futures) logger.info( - f"Successfully generated summary index for {len(segments_to_process)} segments " - f"in document {document.id}" + "Successfully generated summary index for %s segments in document %s", + len(segments_to_process), + document.id, ) - except Exception as e: - logger.exception(f"Failed to generate summary index for document {document.id}: {str(e)}") + except Exception: + logger.exception("Failed to generate summary index for document %s", document.id) # Don't fail the entire indexing process if summary generation fails else: # Production mode: asynchronous generation - logger.info(f"Queuing summary index generation task for document {document.id} (production mode)") + logger.info( + "Queuing summary index generation task for document %s (production mode)", + document.id, + ) try: generate_summary_index_task.delay(dataset.id, document.id, None) - logger.info(f"Summary index generation task queued for document {document.id}") - except Exception as e: - logger.exception(f"Failed to queue summary index generation task for document {document.id}: {str(e)}") + logger.info("Summary index generation task queued for document %s", document.id) + except Exception: + logger.exception( + "Failed to queue summary index generation task for document %s", + document.id, + ) # Don't fail the entire indexing process if task queuing fails def _get_preview_output_with_summaries( - self, chunk_structure: str, chunks: Any, 
dataset: Dataset, + self, + chunk_structure: str, + chunks: Any, + dataset: Dataset, indexing_technique: str | None = None, - summary_index_setting: dict | None = None + summary_index_setting: dict | None = None, ) -> Mapping[str, Any]: """ Generate preview output with summaries for chunks in preview mode. This method generates summaries on-the-fly without saving to database. - + Args: chunk_structure: Chunk structure type chunks: Chunks to generate preview for @@ -316,31 +327,32 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]): """ index_processor = IndexProcessorFactory(chunk_structure).init_index_processor() preview_output = index_processor.format_preview(chunks) - + # Check if summary index is enabled if indexing_technique != "high_quality": return preview_output - + if not summary_index_setting or not summary_index_setting.get("enable"): return preview_output - + # Generate summaries for chunks if "preview" in preview_output and isinstance(preview_output["preview"], list): chunk_count = len(preview_output["preview"]) logger.info( - f"Generating summaries for {chunk_count} chunks in preview mode " - f"(dataset: {dataset.id})" + "Generating summaries for %s chunks in preview mode (dataset: %s)", + chunk_count, + dataset.id, ) # Use ParagraphIndexProcessor's generate_summary method from core.rag.index_processor.processor.paragraph_index_processor import ParagraphIndexProcessor - + # Get Flask app for application context in worker threads flask_app = None try: flask_app = current_app._get_current_object() # type: ignore except RuntimeError: logger.warning("No Flask application context available, summary generation may fail") - + def generate_summary_for_chunk(preview_item: dict) -> None: """Generate summary for a single chunk.""" if "content" in preview_item: @@ -364,10 +376,10 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]): ) if summary: preview_item["summary"] = summary - except Exception as e: - logger.error(f"Failed to generate summary for chunk: {str(e)}") + except Exception: + logger.exception("Failed to generate summary for chunk") # Don't fail the entire preview if summary generation fails - + # Generate summaries concurrently using ThreadPoolExecutor # Set a reasonable timeout to prevent hanging (60 seconds per chunk, max 5 minutes total) timeout_seconds = min(300, 60 * len(preview_output["preview"])) @@ -378,31 +390,39 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]): ] # Wait for all tasks to complete with timeout done, not_done = concurrent.futures.wait(futures, timeout=timeout_seconds) - + # Cancel tasks that didn't complete in time if not_done: logger.warning( - f"Summary generation timeout: {len(not_done)} chunks did not complete within {timeout_seconds}s. " - "Cancelling remaining tasks..." + "Summary generation timeout: %s chunks did not complete within %ss. 
" + "Cancelling remaining tasks...", + len(not_done), + timeout_seconds, ) for future in not_done: future.cancel() # Wait a bit for cancellation to take effect concurrent.futures.wait(not_done, timeout=5) - + completed_count = sum(1 for item in preview_output["preview"] if item.get("summary") is not None) logger.info( - f"Completed summary generation for preview chunks: {completed_count}/{len(preview_output['preview'])} succeeded" + "Completed summary generation for preview chunks: %s/%s succeeded", + completed_count, + len(preview_output["preview"]), ) - + return preview_output def _get_preview_output( - self, chunk_structure: str, chunks: Any, dataset: Dataset | None = None, variable_pool: VariablePool | None = None + self, + chunk_structure: str, + chunks: Any, + dataset: Dataset | None = None, + variable_pool: VariablePool | None = None, ) -> Mapping[str, Any]: index_processor = IndexProcessorFactory(chunk_structure).init_index_processor() preview_output = index_processor.format_preview(chunks) - + # If dataset is provided, try to enrich preview with summaries if dataset and variable_pool: document_id = variable_pool.get(["sys", SystemVariableKey.DOCUMENT_ID]) @@ -420,7 +440,7 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]): ) .all() ) - + if summaries: # Create a map of segment content to summary for matching # Use content matching as chunks in preview might not be indexed yet @@ -435,7 +455,7 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]): # Normalize content for matching (strip whitespace) normalized_content = segment.content.strip() summary_by_content[normalized_content] = summary.summary_content - + # Enrich preview with summaries by content matching if "preview" in preview_output and isinstance(preview_output["preview"], list): matched_count = 0 @@ -446,13 +466,15 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]): if normalized_chunk_content in summary_by_content: preview_item["summary"] = summary_by_content[normalized_chunk_content] matched_count += 1 - + if matched_count > 0: logger.info( - f"Enriched preview with {matched_count} existing summaries " - f"(dataset: {dataset.id}, document: {document.id})" + "Enriched preview with %s existing summaries (dataset: %s, document: %s)", + matched_count, + dataset.id, + document.id, ) - + return preview_output @classmethod diff --git a/api/fields/document_fields.py b/api/fields/document_fields.py index 62f5e19e25..875726d31d 100644 --- a/api/fields/document_fields.py +++ b/api/fields/document_fields.py @@ -33,7 +33,8 @@ document_fields = { "hit_count": fields.Integer, "doc_form": fields.String, "doc_metadata": fields.List(fields.Nested(document_metadata_fields), attribute="doc_metadata_details"), - "summary_index_status": fields.String, # Summary index generation status: "waiting", "generating", "completed", "partial_error", or null if not enabled + # Summary index generation status: "GENERATING", "COMPLETED", "ERROR", or null if not enabled + "summary_index_status": fields.String, "need_summary": fields.Boolean, # Whether this document needs summary index generation } @@ -62,7 +63,8 @@ document_with_segments_fields = { "completed_segments": fields.Integer, "total_segments": fields.Integer, "doc_metadata": fields.List(fields.Nested(document_metadata_fields), attribute="doc_metadata_details"), - "summary_index_status": fields.String, # Summary index generation status: "waiting", "generating", "completed", "partial_error", or null if not enabled + # Summary index generation status: "GENERATING", "COMPLETED", 
"ERROR", or null if not enabled + "summary_index_status": fields.String, "need_summary": fields.Boolean, # Whether this document needs summary index generation } diff --git a/api/models/dataset.py b/api/models/dataset.py index 6497c0efc0..f207c1d2d8 100644 --- a/api/models/dataset.py +++ b/api/models/dataset.py @@ -1595,8 +1595,9 @@ class DocumentSegmentSummary(Base): disabled_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True) disabled_by = mapped_column(StringUUID, nullable=True) created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp()) - updated_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()) + updated_at: Mapped[datetime] = mapped_column( + DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp() + ) def __repr__(self): return f"" - diff --git a/api/services/dataset_service.py b/api/services/dataset_service.py index 82e9770286..2bff0e1524 100644 --- a/api/services/dataset_service.py +++ b/api/services/dataset_service.py @@ -87,9 +87,9 @@ from tasks.disable_segments_from_index_task import disable_segments_from_index_t from tasks.document_indexing_update_task import document_indexing_update_task from tasks.enable_segments_to_index_task import enable_segments_to_index_task from tasks.recover_document_indexing_task import recover_document_indexing_task +from tasks.regenerate_summary_index_task import regenerate_summary_index_task from tasks.remove_document_from_index_task import remove_document_from_index_task from tasks.retry_document_indexing_task import retry_document_indexing_task -from tasks.regenerate_summary_index_task import regenerate_summary_index_task from tasks.sync_website_document_indexing_task import sync_website_document_indexing_task logger = logging.getLogger(__name__) @@ -563,9 +563,7 @@ class DatasetService: action = DatasetService._handle_indexing_technique_change(dataset, data, filtered_data) # Check if summary_index_setting model changed (before updating database) - summary_model_changed = DatasetService._check_summary_index_setting_model_changed( - dataset, data - ) + summary_model_changed = DatasetService._check_summary_index_setting_model_changed(dataset, data) # Add metadata fields filtered_data["updated_by"] = user.id @@ -921,8 +919,12 @@ class DatasetService: # Check if model changed if old_model_name != new_model_name or old_model_provider != new_model_provider: logger.info( - f"Summary index setting model changed for dataset {dataset.id}: " - f"old={old_model_provider}/{old_model_name}, new={new_model_provider}/{new_model_name}" + "Summary index setting model changed for dataset %s: old=%s/%s, new=%s/%s", + dataset.id, + old_model_provider, + old_model_name, + new_model_provider, + new_model_name, ) return True @@ -2208,12 +2210,9 @@ class DocumentService: ): # Set need_summary based on dataset's summary_index_setting need_summary = False - if ( - dataset.summary_index_setting - and dataset.summary_index_setting.get("enable") is True - ): + if dataset.summary_index_setting and dataset.summary_index_setting.get("enable") is True: need_summary = True - + document = Document( tenant_id=dataset.tenant_id, dataset_id=dataset.id, @@ -3118,10 +3117,11 @@ class SegmentService: and dataset.summary_index_setting and dataset.summary_index_setting.get("enable") is True ) - + if has_summary_index: # Query existing summary from database from models.dataset import 
DocumentSegmentSummary + existing_summary = ( db.session.query(DocumentSegmentSummary) .where( @@ -3130,16 +3130,17 @@ class SegmentService: ) .first() ) - + # Check if summary has changed existing_summary_content = existing_summary.summary_content if existing_summary else None if existing_summary_content != args.summary: # Summary has changed, update it from services.summary_index_service import SummaryIndexService + try: SummaryIndexService.update_summary_for_segment(segment, dataset, args.summary) - except Exception as e: - logger.exception(f"Failed to update summary for segment {segment.id}: {str(e)}") + except Exception: + logger.exception("Failed to update summary for segment %s", segment.id) # Don't fail the entire update if summary update fails else: segment_hash = helper.generate_text_hash(content) @@ -3221,8 +3222,8 @@ class SegmentService: try: SummaryIndexService.update_summary_for_segment(segment, dataset, args.summary) - except Exception as e: - logger.exception(f"Failed to update summary for segment {segment.id}: {str(e)}") + except Exception: + logger.exception("Failed to update summary for segment %s", segment.id) # Don't fail the entire update if summary update fails # update multimodel vector index VectorService.update_multimodel_vector(segment, args.attachment_ids or [], dataset) diff --git a/api/services/summary_index_service.py b/api/services/summary_index_service.py index 1d5c51aad8..d2cf23cb1c 100644 --- a/api/services/summary_index_service.py +++ b/api/services/summary_index_service.py @@ -3,7 +3,6 @@ import logging import time import uuid -from typing import Any from core.rag.datasource.vdb.vector_factory import Vector from core.rag.index_processor.constant.doc_type import DocType @@ -47,6 +46,7 @@ class SummaryIndexService: tenant_id=dataset.tenant_id, text=segment.content, summary_index_setting=summary_index_setting, + segment_id=segment.id, ) if not summary_content: @@ -76,11 +76,9 @@ class SummaryIndexService: """ # Check if summary record already exists existing_summary = ( - db.session.query(DocumentSegmentSummary) - .filter_by(chunk_id=segment.id, dataset_id=dataset.id) - .first() + db.session.query(DocumentSegmentSummary).filter_by(chunk_id=segment.id, dataset_id=dataset.id).first() ) - + if existing_summary: # Update existing record existing_summary.summary_content = summary_content @@ -124,8 +122,8 @@ class SummaryIndexService: """ if dataset.indexing_technique != "high_quality": logger.warning( - f"Summary vectorization skipped for dataset {dataset.id}: " - "indexing_technique is not high_quality" + "Summary vectorization skipped for dataset %s: indexing_technique is not high_quality", + dataset.id, ) return @@ -137,10 +135,10 @@ class SummaryIndexService: else: # Generate new index node ID only for new summaries summary_index_node_id = str(uuid.uuid4()) - + # Always regenerate hash (in case summary content changed) summary_hash = helper.generate_text_hash(summary_record.summary_content) - + # Delete old vector only if we're reusing the same index_node_id (to overwrite) # If index_node_id changed, the old vector should have been deleted elsewhere if old_summary_node_id and old_summary_node_id == summary_index_node_id: @@ -149,8 +147,9 @@ class SummaryIndexService: vector.delete_by_ids([old_summary_node_id]) except Exception as e: logger.warning( - f"Failed to delete old summary vector for segment {segment.id}: {str(e)}. " - "Continuing with new vectorization." + "Failed to delete old summary vector for segment %s: %s. 
Continuing with new vectorization.", + segment.id, + str(e), ) # Create document with summary content and metadata @@ -170,12 +169,12 @@ class SummaryIndexService: # Vectorize and store with retry mechanism for connection errors max_retries = 3 retry_delay = 2.0 - + for attempt in range(max_retries): try: vector = Vector(dataset) vector.add_texts([summary_document], duplicate_check=True) - + # Success - update summary record with index node info summary_record.summary_index_node_id = summary_index_node_id summary_record.summary_index_node_hash = summary_hash @@ -183,29 +182,44 @@ class SummaryIndexService: db.session.add(summary_record) db.session.flush() return # Success, exit function - + except (ConnectionError, Exception) as e: error_str = str(e).lower() # Check if it's a connection-related error that might be transient - is_connection_error = any(keyword in error_str for keyword in [ - "connection", "disconnected", "timeout", "network", - "could not connect", "server disconnected", "weaviate" - ]) - + is_connection_error = any( + keyword in error_str + for keyword in [ + "connection", + "disconnected", + "timeout", + "network", + "could not connect", + "server disconnected", + "weaviate", + ] + ) + if is_connection_error and attempt < max_retries - 1: # Retry for connection errors - wait_time = retry_delay * (2 ** attempt) # Exponential backoff + wait_time = retry_delay * (2**attempt) # Exponential backoff logger.warning( - f"Vectorization attempt {attempt + 1}/{max_retries} failed for segment {segment.id}: {str(e)}. " - f"Retrying in {wait_time:.1f} seconds..." + "Vectorization attempt %s/%s failed for segment %s: %s. Retrying in %.1f seconds...", + attempt + 1, + max_retries, + segment.id, + str(e), + wait_time, ) time.sleep(wait_time) continue else: # Final attempt failed or non-connection error - log and update status logger.error( - f"Failed to vectorize summary for segment {segment.id} after {attempt + 1} attempts: {str(e)}", - exc_info=True + "Failed to vectorize summary for segment %s after %s attempts: %s", + segment.id, + attempt + 1, + str(e), + exc_info=True, ) summary_record.status = "error" summary_record.error = f"Vectorization failed: {str(e)}" @@ -235,9 +249,7 @@ class SummaryIndexService: """ try: # Generate summary - summary_content = SummaryIndexService.generate_summary_for_segment( - segment, dataset, summary_index_setting - ) + summary_content = SummaryIndexService.generate_summary_for_segment(segment, dataset, summary_index_setting) # Create or update summary record (will handle overwrite internally) summary_record = SummaryIndexService.create_summary_record( @@ -248,16 +260,14 @@ class SummaryIndexService: SummaryIndexService.vectorize_summary(summary_record, segment, dataset) db.session.commit() - logger.info(f"Successfully generated and vectorized summary for segment {segment.id}") + logger.info("Successfully generated and vectorized summary for segment %s", segment.id) return summary_record - except Exception as e: - logger.exception(f"Failed to generate summary for segment {segment.id}: {str(e)}") + except Exception: + logger.exception("Failed to generate summary for segment %s", segment.id) # Update summary record with error status if it exists summary_record = ( - db.session.query(DocumentSegmentSummary) - .filter_by(chunk_id=segment.id, dataset_id=dataset.id) - .first() + db.session.query(DocumentSegmentSummary).filter_by(chunk_id=segment.id, dataset_id=dataset.id).first() ) if summary_record: summary_record.status = "error" @@ -290,24 +300,27 @@ class 
SummaryIndexService: # Only generate summary index for high_quality indexing technique if dataset.indexing_technique != "high_quality": logger.info( - f"Skipping summary generation for dataset {dataset.id}: " - f"indexing_technique is {dataset.indexing_technique}, not 'high_quality'" + "Skipping summary generation for dataset %s: indexing_technique is %s, not 'high_quality'", + dataset.id, + dataset.indexing_technique, ) return [] if not summary_index_setting or not summary_index_setting.get("enable"): - logger.info(f"Summary index is disabled for dataset {dataset.id}") + logger.info("Summary index is disabled for dataset %s", dataset.id) return [] # Skip qa_model documents if document.doc_form == "qa_model": - logger.info(f"Skipping summary generation for qa_model document {document.id}") + logger.info("Skipping summary generation for qa_model document %s", document.id) return [] logger.info( - f"Starting summary generation for document {document.id} in dataset {dataset.id}, " - f"segment_ids: {len(segment_ids) if segment_ids else 'all'}, " - f"only_parent_chunks: {only_parent_chunks}" + "Starting summary generation for document %s in dataset %s, segment_ids: %s, only_parent_chunks: %s", + document.id, + dataset.id, + len(segment_ids) if segment_ids else "all", + only_parent_chunks, ) # Query segments (only enabled segments) @@ -324,7 +337,7 @@ class SummaryIndexService: segments = query.all() if not segments: - logger.info(f"No segments found for document {document.id}") + logger.info("No segments found for document %s", document.id) return [] summary_records = [] @@ -346,14 +359,15 @@ class SummaryIndexService: segment, dataset, summary_index_setting ) summary_records.append(summary_record) - except Exception as e: - logger.error(f"Failed to generate summary for segment {segment.id}: {str(e)}") + except Exception: + logger.exception("Failed to generate summary for segment %s", segment.id) # Continue with other segments continue logger.info( - f"Completed summary generation for document {document.id}: " - f"{len(summary_records)} summaries generated and vectorized" + "Completed summary generation for document %s: %s summaries generated and vectorized", + document.id, + len(summary_records), ) return summary_records @@ -373,7 +387,7 @@ class SummaryIndexService: disabled_by: User ID who disabled the summaries """ from libs.datetime_utils import naive_utc_now - + query = db.session.query(DocumentSegmentSummary).filter_by( dataset_id=dataset.id, enabled=True, # Only disable enabled summaries @@ -388,21 +402,21 @@ class SummaryIndexService: return logger.info( - f"Disabling {len(summaries)} summary records for dataset {dataset.id}, " - f"segment_ids: {len(segment_ids) if segment_ids else 'all'}" + "Disabling %s summary records for dataset %s, segment_ids: %s", + len(summaries), + dataset.id, + len(segment_ids) if segment_ids else "all", ) # Remove from vector database (but keep records) if dataset.indexing_technique == "high_quality": - summary_node_ids = [ - s.summary_index_node_id for s in summaries if s.summary_index_node_id - ] + summary_node_ids = [s.summary_index_node_id for s in summaries if s.summary_index_node_id] if summary_node_ids: try: vector = Vector(dataset) vector.delete_by_ids(summary_node_ids) except Exception as e: - logger.warning(f"Failed to remove summary vectors: {str(e)}") + logger.warning("Failed to remove summary vectors: %s", str(e)) # Disable summary records (don't delete) now = naive_utc_now() @@ -413,7 +427,7 @@ class SummaryIndexService: db.session.add(summary) 
db.session.commit() - logger.info(f"Disabled {len(summaries)} summary records for dataset {dataset.id}") + logger.info("Disabled %s summary records for dataset %s", len(summaries), dataset.id) @staticmethod def enable_summaries_for_segments( @@ -450,19 +464,25 @@ class SummaryIndexService: return logger.info( - f"Enabling {len(summaries)} summary records for dataset {dataset.id}, " - f"segment_ids: {len(segment_ids) if segment_ids else 'all'}" + "Enabling %s summary records for dataset %s, segment_ids: %s", + len(summaries), + dataset.id, + len(segment_ids) if segment_ids else "all", ) # Re-vectorize and re-add to vector database enabled_count = 0 for summary in summaries: # Get the original segment - segment = db.session.query(DocumentSegment).filter_by( - id=summary.chunk_id, - dataset_id=dataset.id, - ).first() - + segment = ( + db.session.query(DocumentSegment) + .filter_by( + id=summary.chunk_id, + dataset_id=dataset.id, + ) + .first() + ) + if not segment or not segment.enabled or segment.status != "completed": continue @@ -472,20 +492,20 @@ class SummaryIndexService: try: # Re-vectorize summary SummaryIndexService.vectorize_summary(summary, segment, dataset) - + # Enable summary record summary.enabled = True summary.disabled_at = None summary.disabled_by = None db.session.add(summary) enabled_count += 1 - except Exception as e: - logger.error(f"Failed to re-vectorize summary {summary.id}: {str(e)}") + except Exception: + logger.exception("Failed to re-vectorize summary %s", summary.id) # Keep it disabled if vectorization fails continue db.session.commit() - logger.info(f"Enabled {enabled_count} summary records for dataset {dataset.id}") + logger.info("Enabled %s summary records for dataset %s", enabled_count, dataset.id) @staticmethod def delete_summaries_for_segments( @@ -512,9 +532,7 @@ class SummaryIndexService: # Delete from vector database if dataset.indexing_technique == "high_quality": - summary_node_ids = [ - s.summary_index_node_id for s in summaries if s.summary_index_node_id - ] + summary_node_ids = [s.summary_index_node_id for s in summaries if s.summary_index_node_id] if summary_node_ids: vector = Vector(dataset) vector.delete_by_ids(summary_node_ids) @@ -524,7 +542,7 @@ class SummaryIndexService: db.session.delete(summary) db.session.commit() - logger.info(f"Deleted {len(summaries)} summary records for dataset {dataset.id}") + logger.info("Deleted %s summary records for dataset %s", len(summaries), dataset.id) @staticmethod def update_summary_for_segment( @@ -559,9 +577,7 @@ class SummaryIndexService: try: # Find existing summary record summary_record = ( - db.session.query(DocumentSegmentSummary) - .filter_by(chunk_id=segment.id, dataset_id=dataset.id) - .first() + db.session.query(DocumentSegmentSummary).filter_by(chunk_id=segment.id, dataset_id=dataset.id).first() ) if summary_record: @@ -583,7 +599,7 @@ class SummaryIndexService: SummaryIndexService.vectorize_summary(summary_record, segment, dataset) db.session.commit() - logger.info(f"Successfully updated and re-vectorized summary for segment {segment.id}") + logger.info("Successfully updated and re-vectorized summary for segment %s", segment.id) return summary_record else: # Create new summary record if doesn't exist @@ -592,16 +608,14 @@ class SummaryIndexService: ) SummaryIndexService.vectorize_summary(summary_record, segment, dataset) db.session.commit() - logger.info(f"Successfully created and vectorized summary for segment {segment.id}") + logger.info("Successfully created and vectorized summary for segment 
%s", segment.id) return summary_record - except Exception as e: - logger.exception(f"Failed to update summary for segment {segment.id}: {str(e)}") + except Exception: + logger.exception("Failed to update summary for segment %s", segment.id) # Update summary record with error status if it exists summary_record = ( - db.session.query(DocumentSegmentSummary) - .filter_by(chunk_id=segment.id, dataset_id=dataset.id) - .first() + db.session.query(DocumentSegmentSummary).filter_by(chunk_id=segment.id, dataset_id=dataset.id).first() ) if summary_record: summary_record.status = "error" @@ -609,4 +623,3 @@ class SummaryIndexService: db.session.add(summary_record) db.session.commit() raise - diff --git a/api/tasks/add_document_to_index_task.py b/api/tasks/add_document_to_index_task.py index da6f468edd..c6cf8cc10d 100644 --- a/api/tasks/add_document_to_index_task.py +++ b/api/tasks/add_document_to_index_task.py @@ -119,6 +119,7 @@ def add_document_to_index_task(dataset_document_id: str): # Enable summary indexes for all segments in this document from services.summary_index_service import SummaryIndexService + segment_ids_list = [segment.id for segment in segments] if segment_ids_list: try: @@ -127,7 +128,7 @@ def add_document_to_index_task(dataset_document_id: str): segment_ids=segment_ids_list, ) except Exception as e: - logger.warning(f"Failed to enable summaries for document {dataset_document.id}: {str(e)}") + logger.warning("Failed to enable summaries for document %s: %s", dataset_document.id, str(e)) end_at = time.perf_counter() logger.info( diff --git a/api/tasks/disable_segment_from_index_task.py b/api/tasks/disable_segment_from_index_task.py index 67c2867edd..335de86ec0 100644 --- a/api/tasks/disable_segment_from_index_task.py +++ b/api/tasks/disable_segment_from_index_task.py @@ -55,6 +55,7 @@ def disable_segment_from_index_task(segment_id: str): # Disable summary index for this segment from services.summary_index_service import SummaryIndexService + try: SummaryIndexService.disable_summaries_for_segments( dataset=dataset, @@ -62,7 +63,7 @@ def disable_segment_from_index_task(segment_id: str): disabled_by=segment.disabled_by, ) except Exception as e: - logger.warning(f"Failed to disable summary for segment {segment.id}: {str(e)}") + logger.warning("Failed to disable summary for segment %s: %s", segment.id, str(e)) index_type = dataset_document.doc_form index_processor = IndexProcessorFactory(index_type).init_index_processor() diff --git a/api/tasks/disable_segments_from_index_task.py b/api/tasks/disable_segments_from_index_task.py index b6a534bacf..43cd466e4b 100644 --- a/api/tasks/disable_segments_from_index_task.py +++ b/api/tasks/disable_segments_from_index_task.py @@ -60,6 +60,7 @@ def disable_segments_from_index_task(segment_ids: list, dataset_id: str, documen try: # Disable summary indexes for these segments from services.summary_index_service import SummaryIndexService + segment_ids_list = [segment.id for segment in segments] try: # Get disabled_by from first segment (they should all have the same disabled_by) @@ -70,7 +71,7 @@ def disable_segments_from_index_task(segment_ids: list, dataset_id: str, documen disabled_by=disabled_by, ) except Exception as e: - logger.warning(f"Failed to disable summaries for segments: {str(e)}") + logger.warning("Failed to disable summaries for segments: %s", str(e)) index_node_ids = [segment.index_node_id for segment in segments] if dataset.is_multimodal: diff --git a/api/tasks/document_indexing_task.py b/api/tasks/document_indexing_task.py index 
319837ceaf..4c65ed6ab3 100644 --- a/api/tasks/document_indexing_task.py +++ b/api/tasks/document_indexing_task.py @@ -8,13 +8,13 @@ from celery import shared_task from configs import dify_config from core.entities.document_task import DocumentTask from core.indexing_runner import DocumentIsPausedError, IndexingRunner -from tasks.generate_summary_index_task import generate_summary_index_task from core.rag.pipeline.queue import TenantIsolatedTaskQueue from enums.cloud_plan import CloudPlan from extensions.ext_database import db from libs.datetime_utils import naive_utc_now from models.dataset import Dataset, Document from services.feature_service import FeatureService +from tasks.generate_summary_index_task import generate_summary_index_task logger = logging.getLogger(__name__) @@ -101,15 +101,15 @@ def _document_indexing(dataset_id: str, document_ids: Sequence[str]): indexing_runner.run(documents) end_at = time.perf_counter() logger.info(click.style(f"Processed dataset: {dataset_id} latency: {end_at - start_at}", fg="green")) - + # Trigger summary index generation for completed documents if enabled # Only generate for high_quality indexing technique and when summary_index_setting is enabled # Re-query dataset to get latest summary_index_setting (in case it was updated) dataset = db.session.query(Dataset).where(Dataset.id == dataset_id).first() if not dataset: - logger.warning(f"Dataset {dataset_id} not found after indexing") + logger.warning("Dataset %s not found after indexing", dataset_id) return - + if dataset.indexing_technique == "high_quality": summary_index_setting = dataset.summary_index_setting if summary_index_setting and summary_index_setting.get("enable"): @@ -123,37 +123,46 @@ def _document_indexing(dataset_id: str, document_ids: Sequence[str]): ) if document: logger.info( - f"Checking document {document_id} for summary generation: " - f"status={document.indexing_status}, doc_form={document.doc_form}" + "Checking document %s for summary generation: status=%s, doc_form=%s", + document_id, + document.indexing_status, + document.doc_form, ) if document.indexing_status == "completed" and document.doc_form != "qa_model": try: generate_summary_index_task.delay(dataset.id, document_id, None) logger.info( - f"Queued summary index generation task for document {document_id} " - f"in dataset {dataset.id} after indexing completed" + "Queued summary index generation task for document %s in dataset %s " + "after indexing completed", + document_id, + dataset.id, ) - except Exception as e: + except Exception: logger.exception( - f"Failed to queue summary index generation task for document {document_id}: {str(e)}" + "Failed to queue summary index generation task for document %s", + document_id, ) # Don't fail the entire indexing process if summary task queuing fails else: logger.info( - f"Skipping summary generation for document {document_id}: " - f"status={document.indexing_status}, doc_form={document.doc_form}" + "Skipping summary generation for document %s: status=%s, doc_form=%s", + document_id, + document.indexing_status, + document.doc_form, ) else: - logger.warning(f"Document {document_id} not found after indexing") + logger.warning("Document %s not found after indexing", document_id) else: logger.info( - f"Summary index generation skipped for dataset {dataset.id}: " - f"summary_index_setting.enable={summary_index_setting.get('enable') if summary_index_setting else None}" + "Summary index generation skipped for dataset %s: summary_index_setting.enable=%s", + dataset.id, + 
summary_index_setting.get("enable") if summary_index_setting else None, ) else: logger.info( - f"Summary index generation skipped for dataset {dataset.id}: " - f"indexing_technique={dataset.indexing_technique} (not 'high_quality')" + "Summary index generation skipped for dataset %s: indexing_technique=%s (not 'high_quality')", + dataset.id, + dataset.indexing_technique, ) except DocumentIsPausedError as ex: logger.info(click.style(str(ex), fg="yellow")) diff --git a/api/tasks/enable_segment_to_index_task.py b/api/tasks/enable_segment_to_index_task.py index 113e19871e..0b16dfd56e 100644 --- a/api/tasks/enable_segment_to_index_task.py +++ b/api/tasks/enable_segment_to_index_task.py @@ -105,13 +105,14 @@ def enable_segment_to_index_task(segment_id: str): # Enable summary index for this segment from services.summary_index_service import SummaryIndexService + try: SummaryIndexService.enable_summaries_for_segments( dataset=dataset, segment_ids=[segment.id], ) except Exception as e: - logger.warning(f"Failed to enable summary for segment {segment.id}: {str(e)}") + logger.warning("Failed to enable summary for segment %s: %s", segment.id, str(e)) end_at = time.perf_counter() logger.info(click.style(f"Segment enabled to index: {segment.id} latency: {end_at - start_at}", fg="green")) diff --git a/api/tasks/enable_segments_to_index_task.py b/api/tasks/enable_segments_to_index_task.py index 0c419ca2f0..8b9ae5c10b 100644 --- a/api/tasks/enable_segments_to_index_task.py +++ b/api/tasks/enable_segments_to_index_task.py @@ -110,6 +110,7 @@ def enable_segments_to_index_task(segment_ids: list, dataset_id: str, document_i # Enable summary indexes for these segments from services.summary_index_service import SummaryIndexService + segment_ids_list = [segment.id for segment in segments] try: SummaryIndexService.enable_summaries_for_segments( @@ -117,7 +118,7 @@ def enable_segments_to_index_task(segment_ids: list, dataset_id: str, document_i segment_ids=segment_ids_list, ) except Exception as e: - logger.warning(f"Failed to enable summaries for segments: {str(e)}") + logger.warning("Failed to enable summaries for segments: %s", str(e)) end_at = time.perf_counter() logger.info(click.style(f"Segments enabled to index latency: {end_at - start_at}", fg="green")) diff --git a/api/tasks/generate_summary_index_task.py b/api/tasks/generate_summary_index_task.py index 2850658ce4..99f957abaa 100644 --- a/api/tasks/generate_summary_index_task.py +++ b/api/tasks/generate_summary_index_task.py @@ -94,8 +94,8 @@ def generate_summary_index_task(dataset_id: str, document_id: str, segment_ids: ) ) - except Exception as e: - logger.exception(f"Failed to generate summary index for document {document_id}: {str(e)}") + except Exception: + logger.exception("Failed to generate summary index for document %s", document_id) # Update document segments with error status if needed if segment_ids: db.session.query(DocumentSegment).filter( @@ -110,4 +110,3 @@ def generate_summary_index_task(dataset_id: str, document_id: str, segment_ids: db.session.commit() finally: db.session.close() - diff --git a/api/tasks/regenerate_summary_index_task.py b/api/tasks/regenerate_summary_index_task.py index ddc48f9d99..f24b7bf368 100644 --- a/api/tasks/regenerate_summary_index_task.py +++ b/api/tasks/regenerate_summary_index_task.py @@ -2,7 +2,6 @@ import logging import time -from typing import Any import click from celery import shared_task @@ -24,13 +23,13 @@ def regenerate_summary_index_task( ): """ Regenerate summary indexes for all documents in a 
dataset. - + This task is triggered when: 1. summary_index_setting model changes (regenerate_reason="summary_model_changed") - Regenerates summary content and vectors for all existing summaries 2. embedding_model changes (regenerate_reason="embedding_model_changed") - Only regenerates vectors for existing summaries (keeps summary content) - + Args: dataset_id: Dataset ID regenerate_reason: Reason for regeneration ("summary_model_changed" or "embedding_model_changed") @@ -96,7 +95,9 @@ def regenerate_summary_index_task( return logger.info( - f"Found {len(dataset_documents)} documents for summary regeneration in dataset {dataset_id}" + "Found %s documents for summary regeneration in dataset %s", + len(dataset_documents), + dataset_id, ) total_segments_processed = 0 @@ -130,7 +131,9 @@ def regenerate_summary_index_task( continue logger.info( - f"Regenerating summaries for {len(segments)} segments in document {dataset_document.id}" + "Regenerating summaries for %s segments in document %s", + len(segments), + dataset_document.id, ) for segment in segments: @@ -146,9 +149,7 @@ def regenerate_summary_index_task( ) if not summary_record: - logger.warning( - f"Summary record not found for segment {segment.id}, skipping" - ) + logger.warning("Summary record not found for segment %s, skipping", segment.id) continue if regenerate_vectors_only: @@ -162,26 +163,26 @@ def regenerate_summary_index_task( vector.delete_by_ids([summary_record.summary_index_node_id]) except Exception as e: logger.warning( - f"Failed to delete old summary vector for segment {segment.id}: {str(e)}" + "Failed to delete old summary vector for segment %s: %s", + segment.id, + str(e), ) # Re-vectorize with new embedding model - SummaryIndexService.vectorize_summary( - summary_record, segment, dataset - ) + SummaryIndexService.vectorize_summary(summary_record, segment, dataset) db.session.commit() else: # Regenerate both summary content and vectors (for summary_model change) - SummaryIndexService.generate_and_vectorize_summary( - segment, dataset, summary_index_setting - ) + SummaryIndexService.generate_and_vectorize_summary(segment, dataset, summary_index_setting) db.session.commit() total_segments_processed += 1 except Exception as e: logger.error( - f"Failed to regenerate summary for segment {segment.id}: {str(e)}", + "Failed to regenerate summary for segment %s: %s", + segment.id, + str(e), exc_info=True, ) total_segments_failed += 1 @@ -195,7 +196,9 @@ def regenerate_summary_index_task( except Exception as e: logger.error( - f"Failed to process document {dataset_document.id} for summary regeneration: {str(e)}", + "Failed to process document %s for summary regeneration: %s", + dataset_document.id, + str(e), exc_info=True, ) continue @@ -213,7 +216,6 @@ def regenerate_summary_index_task( ) except Exception: - logger.exception(f"Regenerate summary index failed for dataset {dataset_id}") + logger.exception("Regenerate summary index failed for dataset %s", dataset_id) finally: db.session.close() - diff --git a/api/tasks/remove_document_from_index_task.py b/api/tasks/remove_document_from_index_task.py index 7d191f00c0..5c8f1ff993 100644 --- a/api/tasks/remove_document_from_index_task.py +++ b/api/tasks/remove_document_from_index_task.py @@ -47,9 +47,10 @@ def remove_document_from_index_task(document_id: str): index_processor = IndexProcessorFactory(document.doc_form).init_index_processor() segments = db.session.scalars(select(DocumentSegment).where(DocumentSegment.document_id == document.id)).all() - + # Disable summary indexes for 
all segments in this document from services.summary_index_service import SummaryIndexService + segment_ids_list = [segment.id for segment in segments] if segment_ids_list: try: @@ -59,8 +60,8 @@ def remove_document_from_index_task(document_id: str): disabled_by=document.disabled_by, ) except Exception as e: - logger.warning(f"Failed to disable summaries for document {document.id}: {str(e)}") - + logger.warning("Failed to disable summaries for document %s: %s", document.id, str(e)) + index_node_ids = [segment.index_node_id for segment in segments] if index_node_ids: try: