feat: Add vision support to the summary index and standardize the code.

FFXN 2026-01-14 17:52:27 +08:00
parent 9b7e807690
commit 7eb65b07c8
23 changed files with 569 additions and 307 deletions

View File

@ -107,7 +107,8 @@ class DocumentRenamePayload(BaseModel):
class GenerateSummaryPayload(BaseModel):
document_list: list[str]
class DocumentDatasetListParam(BaseModel):
page: int = Field(1, title="Page", description="Page number.")
limit: int = Field(20, title="Limit", description="Page size.")
@ -311,17 +312,14 @@ class DatasetDocumentListApi(Resource):
paginated_documents = db.paginate(select=query, page=page, per_page=limit, max_per_page=100, error_out=False)
documents = paginated_documents.items
# Check if dataset has summary index enabled
has_summary_index = (
dataset.summary_index_setting
and dataset.summary_index_setting.get("enable") is True
)
has_summary_index = dataset.summary_index_setting and dataset.summary_index_setting.get("enable") is True
# Filter documents that need summary calculation
documents_need_summary = [doc for doc in documents if doc.need_summary is True]
document_ids_need_summary = [str(doc.id) for doc in documents_need_summary]
# Calculate summary_index_status for documents that need summary (only if dataset summary index is enabled)
summary_status_map = {}
if has_summary_index and document_ids_need_summary:
@ -335,7 +333,7 @@ class DatasetDocumentListApi(Resource):
)
.all()
)
# Group segments by document_id
document_segments_map = {}
for segment in segments:
@ -343,7 +341,7 @@ class DatasetDocumentListApi(Resource):
if doc_id not in document_segments_map:
document_segments_map[doc_id] = []
document_segments_map[doc_id].append(segment.id)
# Get all summary records for these segments
all_segment_ids = [seg.id for seg in segments]
summaries = {}
@ -358,7 +356,7 @@ class DatasetDocumentListApi(Resource):
.all()
)
summaries = {summary.chunk_id: summary.status for summary in summary_records}
# Calculate summary_index_status for each document
for doc_id in document_ids_need_summary:
segment_ids = document_segments_map.get(doc_id, [])
@ -366,7 +364,7 @@ class DatasetDocumentListApi(Resource):
# No segments, status is "GENERATING" (waiting to generate)
summary_status_map[doc_id] = "GENERATING"
continue
# Count summary statuses for this document's segments
status_counts = {"completed": 0, "generating": 0, "error": 0, "not_started": 0}
for segment_id in segment_ids:
@ -375,12 +373,12 @@ class DatasetDocumentListApi(Resource):
status_counts[status] += 1
else:
status_counts["not_started"] += 1
total_segments = len(segment_ids)
completed_count = status_counts["completed"]
generating_count = status_counts["generating"]
error_count = status_counts["error"]
# Determine overall status (only three states: GENERATING, COMPLETED, ERROR)
if completed_count == total_segments:
summary_status_map[doc_id] = "COMPLETED"
@ -393,7 +391,7 @@ class DatasetDocumentListApi(Resource):
else:
# Default to generating
summary_status_map[doc_id] = "GENERATING"
# Add summary_index_status to each document
for document in documents:
if has_summary_index and document.need_summary is True:
@ -401,7 +399,7 @@ class DatasetDocumentListApi(Resource):
else:
# Return null if summary index is not enabled or document doesn't need summary
document.summary_index_status = None
if fetch:
for document in documents:
completed_segments = (
@ -500,7 +498,6 @@ class DatasetDocumentListApi(Resource):
return {"result": "success"}, 204
@console_ns.route("/datasets/init")
class DatasetInitApi(Resource):
@console_ns.doc("init_dataset")
@ -1311,49 +1308,46 @@ class DocumentGenerateSummaryApi(Resource):
def post(self, dataset_id):
"""
Generate summary index for specified documents.
This endpoint checks if the dataset configuration supports summary generation
(indexing_technique must be 'high_quality' and summary_index_setting.enable must be true),
then asynchronously generates summary indexes for the provided documents.
"""
current_user, _ = current_account_with_tenant()
dataset_id = str(dataset_id)
# Get dataset
dataset = DatasetService.get_dataset(dataset_id)
if not dataset:
raise NotFound("Dataset not found.")
# Check permissions
if not current_user.is_dataset_editor:
raise Forbidden()
try:
DatasetService.check_dataset_permission(dataset, current_user)
except services.errors.account.NoPermissionError as e:
raise Forbidden(str(e))
# Validate request payload
payload = GenerateSummaryPayload.model_validate(console_ns.payload or {})
document_list = payload.document_list
if not document_list:
raise ValueError("document_list cannot be empty.")
# Check if dataset configuration supports summary generation
if dataset.indexing_technique != "high_quality":
raise ValueError(
f"Summary generation is only available for 'high_quality' indexing technique. "
f"Current indexing technique: {dataset.indexing_technique}"
)
summary_index_setting = dataset.summary_index_setting
if not summary_index_setting or not summary_index_setting.get("enable"):
raise ValueError(
"Summary index is not enabled for this dataset. "
"Please enable it in the dataset settings."
)
raise ValueError("Summary index is not enabled for this dataset. Please enable it in the dataset settings.")
# Verify all documents exist and belong to the dataset
documents = (
db.session.query(Document)
@ -1363,27 +1357,27 @@ class DocumentGenerateSummaryApi(Resource):
)
.all()
)
if len(documents) != len(document_list):
found_ids = {doc.id for doc in documents}
missing_ids = set(document_list) - found_ids
raise NotFound(f"Some documents not found: {list(missing_ids)}")
# Dispatch async tasks for each document
for document in documents:
# Skip qa_model documents as they don't generate summaries
if document.doc_form == "qa_model":
logger.info(
f"Skipping summary generation for qa_model document {document.id}"
)
logger.info("Skipping summary generation for qa_model document %s", document.id)
continue
# Dispatch async task
generate_summary_index_task(dataset_id, document.id)
logger.info(
f"Dispatched summary generation task for document {document.id} in dataset {dataset_id}"
"Dispatched summary generation task for document %s in dataset %s",
document.id,
dataset_id,
)
return {"result": "success"}, 200
@ -1400,7 +1394,7 @@ class DocumentSummaryStatusApi(DocumentResource):
def get(self, dataset_id, document_id):
"""
Get summary index generation status for a document.
Returns:
- total_segments: Total number of segments in the document
- summary_status: Dictionary with status counts
@ -1413,21 +1407,21 @@ class DocumentSummaryStatusApi(DocumentResource):
current_user, _ = current_account_with_tenant()
dataset_id = str(dataset_id)
document_id = str(document_id)
# Get document
document = self.get_document(dataset_id, document_id)
# Get dataset
dataset = DatasetService.get_dataset(dataset_id)
if not dataset:
raise NotFound("Dataset not found.")
# Check permissions
try:
DatasetService.check_dataset_permission(dataset, current_user)
except services.errors.account.NoPermissionError as e:
raise Forbidden(str(e))
# Get all segments for this document
segments = (
db.session.query(DocumentSegment)
@ -1439,9 +1433,9 @@ class DocumentSummaryStatusApi(DocumentResource):
)
.all()
)
total_segments = len(segments)
# Get all summary records for these segments
segment_ids = [segment.id for segment in segments]
summaries = []
@ -1456,10 +1450,10 @@ class DocumentSummaryStatusApi(DocumentResource):
)
.all()
)
# Create a mapping of chunk_id to summary
summary_map = {summary.chunk_id: summary for summary in summaries}
# Count statuses
status_counts = {
"completed": 0,
@ -1467,34 +1461,42 @@ class DocumentSummaryStatusApi(DocumentResource):
"error": 0,
"not_started": 0,
}
summary_list = []
for segment in segments:
summary = summary_map.get(segment.id)
if summary:
status = summary.status
status_counts[status] = status_counts.get(status, 0) + 1
summary_list.append({
"segment_id": segment.id,
"segment_position": segment.position,
"status": summary.status,
"summary_preview": summary.summary_content[:100] + "..." if summary.summary_content and len(summary.summary_content) > 100 else summary.summary_content,
"error": summary.error,
"created_at": int(summary.created_at.timestamp()) if summary.created_at else None,
"updated_at": int(summary.updated_at.timestamp()) if summary.updated_at else None,
})
summary_list.append(
{
"segment_id": segment.id,
"segment_position": segment.position,
"status": summary.status,
"summary_preview": (
summary.summary_content[:100] + "..."
if summary.summary_content and len(summary.summary_content) > 100
else summary.summary_content
),
"error": summary.error,
"created_at": int(summary.created_at.timestamp()) if summary.created_at else None,
"updated_at": int(summary.updated_at.timestamp()) if summary.updated_at else None,
}
)
else:
status_counts["not_started"] += 1
summary_list.append({
"segment_id": segment.id,
"segment_position": segment.position,
"status": "not_started",
"summary_preview": None,
"error": None,
"created_at": None,
"updated_at": None,
})
summary_list.append(
{
"segment_id": segment.id,
"segment_position": segment.position,
"status": "not_started",
"summary_preview": None,
"error": None,
"created_at": None,
"updated_at": None,
}
)
return {
"total_segments": total_segments,
"summary_status": status_counts,

View File

@ -212,9 +212,7 @@ class DatasetDocumentSegmentListApi(Resource):
)
# Only include enabled summaries
summaries = {
summary.chunk_id: summary.summary_content
for summary in summary_records
if summary.enabled is True
summary.chunk_id: summary.summary_content for summary in summary_records if summary.enabled is True
}
# Add summary to each segment
@ -433,7 +431,7 @@ class DatasetDocumentSegmentUpdateApi(Resource):
payload = SegmentUpdatePayload.model_validate(console_ns.payload or {})
payload_dict = payload.model_dump(exclude_none=True)
SegmentService.segment_create_args_validate(payload_dict, document)
# Update segment (summary update with change detection is handled in SegmentService.update_segment)
segment = SegmentService.update_segment(
SegmentUpdateArgs.model_validate(payload.model_dump(exclude_none=True)), segment, document, dataset

View File

@ -1,6 +1,13 @@
from flask_restx import Resource, fields
from controllers.common.schema import register_schema_model
from fields.hit_testing_fields import (
child_chunk_fields,
document_fields,
files_fields,
hit_testing_record_fields,
segment_fields,
)
from libs.login import login_required
from .. import console_ns
@ -10,13 +17,6 @@ from ..wraps import (
cloud_edition_billing_rate_limit_check,
setup_required,
)
from fields.hit_testing_fields import (
child_chunk_fields,
document_fields,
files_fields,
hit_testing_record_fields,
segment_fields,
)
register_schema_model(console_ns, HitTestingPayload)

View File

@ -367,8 +367,8 @@ class IndexingRunner:
return IndexingEstimate(total_segments=total_segments * 20, qa_preview=qa_preview_texts, preview=[])
# Generate summary preview
summary_index_setting = tmp_processing_rule["summary_index_setting"] if "summary_index_setting" in tmp_processing_rule else None
if summary_index_setting and summary_index_setting.get('enable') and preview_texts:
summary_index_setting = tmp_processing_rule.get("summary_index_setting")
if summary_index_setting and summary_index_setting.get("enable") and preview_texts:
preview_texts = index_processor.generate_summary_preview(tenant_id, preview_texts, summary_index_setting)
return IndexingEstimate(total_segments=total_segments, preview=preview_texts)

View File

@ -436,4 +436,6 @@ You should edit the prompt according to the IDEAL OUTPUT."""
INSTRUCTION_GENERATE_TEMPLATE_CODE = """Please fix the errors in the {{#error_message#}}."""
DEFAULT_GENERATOR_SUMMARY_PROMPT = """
You are a helpful assistant that summarizes long pieces of text into concise summaries. Given the following text, generate a brief summary that captures the main points and key information. The summary should be clear, concise, and written in complete sentences. """
You are a helpful assistant that summarizes long pieces of text into concise summaries.
Given the following text, generate a brief summary that captures the main points and key information.
The summary should be clear, concise, and written in complete sentences. """

View File

@ -395,7 +395,7 @@ class RetrievalService:
index_node_ids = []
doc_to_document_map = {}
summary_segment_ids = set() # Track segments retrieved via summary
# First pass: collect all document IDs and identify summary documents
for document in documents:
document_id = document.metadata.get("document_id")
@ -455,7 +455,7 @@ class RetrievalService:
doc_segment_map[attachment["segment_id"]].append(attachment["attachment_id"])
else:
doc_segment_map[attachment["segment_id"]] = [attachment["attachment_id"]]
child_chunk_stmt = select(ChildChunk).where(ChildChunk.index_node_id.in_(child_index_node_ids))
child_index_nodes = session.execute(child_chunk_stmt).scalars().all()
@ -479,7 +479,7 @@ class RetrievalService:
index_node_segments = session.execute(document_segment_stmt).scalars().all() # type: ignore
for index_node_segment in index_node_segments:
doc_segment_map[index_node_segment.id] = [index_node_segment.index_node_id]
if segment_ids:
document_segment_stmt = select(DocumentSegment).where(
DocumentSegment.enabled == True,

View File

@ -47,7 +47,9 @@ class BaseIndexProcessor(ABC):
raise NotImplementedError
@abstractmethod
def generate_summary_preview(self, tenant_id: str, preview_texts: list[PreviewDetail], summary_index_setting: dict) -> list[PreviewDetail]:
def generate_summary_preview(
self, tenant_id: str, preview_texts: list[PreviewDetail], summary_index_setting: dict
) -> list[PreviewDetail]:
"""
For each segment in preview_texts, generate a summary using LLM and attach it to the segment.
The summary can be stored in a new attribute, e.g., summary.
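To illustrate the contract this abstract method describes, here is a dependency-free sketch: a stand-in preview type and an implementation that enriches each item in place. The class, field, and function names are placeholders, not the real PreviewDetail or BaseIndexProcessor types, and the naive truncation merely stands in for the LLM call.

from dataclasses import dataclass


@dataclass
class PreviewItem:
    """Stand-in for PreviewDetail: a content field plus a summary slot."""
    content: str
    summary: str | None = None


def generate_summary_preview_stub(items: list[PreviewItem]) -> list[PreviewItem]:
    """Attach a summary to each preview item, as the abstract method requires."""
    for item in items:
        # A real implementation would invoke an LLM here.
        item.summary = item.content.split(".")[0][:100]
    return items


previews = [PreviewItem(content="Dify supports summary indexes. They speed up retrieval.")]
print(generate_summary_preview_stub(previews)[0].summary)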

View File

@ -1,6 +1,7 @@
"""Paragraph index processor."""
import logging
import re
import uuid
from collections.abc import Mapping
from typing import Any
@ -8,6 +9,17 @@ from typing import Any
logger = logging.getLogger(__name__)
from core.entities.knowledge_entities import PreviewDetail
from core.file import File, FileTransferMethod, FileType, file_manager
from core.llm_generator.prompts import DEFAULT_GENERATOR_SUMMARY_PROMPT
from core.model_manager import ModelInstance
from core.model_runtime.entities.message_entities import (
ImagePromptMessageContent,
PromptMessageContentUnionTypes,
TextPromptMessageContent,
UserPromptMessage,
)
from core.model_runtime.entities.model_entities import ModelFeature, ModelType
from core.provider_manager import ProviderManager
from core.rag.cleaner.clean_processor import CleanProcessor
from core.rag.datasource.keyword.keyword_factory import Keyword
from core.rag.datasource.retrieval_service import RetrievalService
@ -22,18 +34,15 @@ from core.rag.models.document import AttachmentDocument, Document, MultimodalGen
from core.rag.retrieval.retrieval_methods import RetrievalMethod
from core.tools.utils.text_processing_utils import remove_leading_symbols
from extensions.ext_database import db
from factories.file_factory import build_from_mapping
from libs import helper
from models import UploadFile
from models.account import Account
from models.dataset import Dataset, DatasetProcessRule, DocumentSegment
from models.dataset import Dataset, DatasetProcessRule, DocumentSegment, SegmentAttachmentBinding
from models.dataset import Document as DatasetDocument
from services.account_service import AccountService
from services.entities.knowledge_entities.knowledge_entities import Rule
from services.summary_index_service import SummaryIndexService
from core.llm_generator.prompts import DEFAULT_GENERATOR_SUMMARY_PROMPT
from core.model_runtime.entities.message_entities import UserPromptMessage
from core.model_runtime.entities.model_entities import ModelType
from core.provider_manager import ProviderManager
from core.model_manager import ModelInstance
class ParagraphIndexProcessor(BaseIndexProcessor):
@ -262,12 +271,15 @@ class ParagraphIndexProcessor(BaseIndexProcessor):
else:
raise ValueError("Chunks is not a list")
def generate_summary_preview(self, tenant_id: str, preview_texts: list[PreviewDetail], summary_index_setting: dict) -> list[PreviewDetail]:
def generate_summary_preview(
self, tenant_id: str, preview_texts: list[PreviewDetail], summary_index_setting: dict
) -> list[PreviewDetail]:
"""
For each segment, concurrently call generate_summary to generate a summary
and write it to the summary attribute of PreviewDetail.
"""
import concurrent.futures
from flask import current_app
# Capture Flask app context for worker threads
@ -289,8 +301,8 @@ class ParagraphIndexProcessor(BaseIndexProcessor):
# Fallback: try without app context (may fail)
summary = self.generate_summary(tenant_id, preview.content, summary_index_setting)
preview.summary = summary
except Exception as e:
logger.error(f"Failed to generate summary for preview: {str(e)}")
except Exception:
logger.exception("Failed to generate summary for preview")
# Don't fail the entire preview if summary generation fails
preview.summary = None
@ -299,9 +311,21 @@ class ParagraphIndexProcessor(BaseIndexProcessor):
return preview_texts
@staticmethod
def generate_summary(tenant_id: str, text: str, summary_index_setting: dict = None) -> str:
def generate_summary(
tenant_id: str,
text: str,
summary_index_setting: dict | None = None,
segment_id: str | None = None,
) -> str:
"""
Generate summary for the given text using ModelInstance.invoke_llm and the default or custom summary prompt.
Generate summary for the given text using ModelInstance.invoke_llm and the default or custom summary prompt,
and supports vision models by including images from the segment attachments or text content.
Args:
tenant_id: Tenant ID
text: Text content to summarize
summary_index_setting: Summary index configuration
segment_id: Optional segment ID to fetch attachments from SegmentAttachmentBinding table
"""
if not summary_index_setting or not summary_index_setting.get("enable"):
raise ValueError("summary_index_setting is required and must be enabled to generate summary.")
@ -314,17 +338,195 @@ class ParagraphIndexProcessor(BaseIndexProcessor):
if not summary_prompt:
summary_prompt = DEFAULT_GENERATOR_SUMMARY_PROMPT
prompt = f"{summary_prompt}\n{text}"
provider_manager = ProviderManager()
provider_model_bundle = provider_manager.get_provider_model_bundle(tenant_id, model_provider_name, ModelType.LLM)
model_instance = ModelInstance(provider_model_bundle, model_name)
prompt_messages = [UserPromptMessage(content=prompt)]
result = model_instance.invoke_llm(
prompt_messages=prompt_messages,
model_parameters={},
stream=False
provider_model_bundle = provider_manager.get_provider_model_bundle(
tenant_id, model_provider_name, ModelType.LLM
)
model_instance = ModelInstance(provider_model_bundle, model_name)
# Get model schema to check if vision is supported
model_schema = model_instance.get_model_schema(model_name, provider_model_bundle.credentials)
supports_vision = model_schema and model_schema.features and ModelFeature.VISION in model_schema.features
# Extract images if model supports vision
image_files = []
if supports_vision:
# First, try to get images from SegmentAttachmentBinding (preferred method)
if segment_id:
image_files = ParagraphIndexProcessor._extract_images_from_segment_attachments(tenant_id, segment_id)
# If no images from attachments, fall back to extracting from text
if not image_files:
image_files = ParagraphIndexProcessor._extract_images_from_text(tenant_id, text)
# Build prompt messages
prompt_messages = []
if image_files:
# If we have images, create a UserPromptMessage with both text and images
prompt_message_contents: list[PromptMessageContentUnionTypes] = []
# Add images first
for file in image_files:
try:
file_content = file_manager.to_prompt_message_content(
file, image_detail_config=ImagePromptMessageContent.DETAIL.LOW
)
prompt_message_contents.append(file_content)
except Exception as e:
logger.warning("Failed to convert image file to prompt message content: %s", str(e))
continue
# Add text content
if prompt_message_contents: # Only add text if we successfully added images
prompt_message_contents.append(TextPromptMessageContent(data=f"{summary_prompt}\n{text}"))
prompt_messages.append(UserPromptMessage(content=prompt_message_contents))
else:
# If image conversion failed, fall back to text-only
prompt = f"{summary_prompt}\n{text}"
prompt_messages.append(UserPromptMessage(content=prompt))
else:
# No images, use simple text prompt
prompt = f"{summary_prompt}\n{text}"
prompt_messages.append(UserPromptMessage(content=prompt))
result = model_instance.invoke_llm(prompt_messages=prompt_messages, model_parameters={}, stream=False)
return getattr(result.message, "content", "")
@staticmethod
def _extract_images_from_text(tenant_id: str, text: str) -> list[File]:
"""
Extract images from markdown text and convert them to File objects.
Args:
tenant_id: Tenant ID
text: Text content that may contain markdown image links
Returns:
List of File objects representing images found in the text
"""
# Extract markdown images using regex pattern
pattern = r"!\[.*?\]\((.*?)\)"
images = re.findall(pattern, text)
if not images:
return []
upload_file_id_list = []
for image in images:
# For data before v0.10.0
pattern = r"/files/([a-f0-9\-]+)/image-preview(?:\?.*?)?"
match = re.search(pattern, image)
if match:
upload_file_id = match.group(1)
upload_file_id_list.append(upload_file_id)
continue
# For data after v0.10.0
pattern = r"/files/([a-f0-9\-]+)/file-preview(?:\?.*?)?"
match = re.search(pattern, image)
if match:
upload_file_id = match.group(1)
upload_file_id_list.append(upload_file_id)
continue
# For tools directory - direct file formats (e.g., .png, .jpg, etc.)
pattern = r"/files/tools/([a-f0-9\-]+)\.([a-zA-Z0-9]+)(?:\?[^\s\)\"\']*)?"
match = re.search(pattern, image)
if match:
# Tool files are handled differently, skip for now
continue
if not upload_file_id_list:
return []
# Get unique IDs for database query
unique_upload_file_ids = list(set(upload_file_id_list))
upload_files = (
db.session.query(UploadFile)
.where(UploadFile.id.in_(unique_upload_file_ids), UploadFile.tenant_id == tenant_id)
.all()
)
# Create File objects from UploadFile records
file_objects = []
for upload_file in upload_files:
# Only process image files
if not upload_file.mime_type or "image" not in upload_file.mime_type:
continue
mapping = {
"upload_file_id": upload_file.id,
"transfer_method": FileTransferMethod.LOCAL_FILE.value,
"type": FileType.IMAGE.value,
}
try:
file_obj = build_from_mapping(
mapping=mapping,
tenant_id=tenant_id,
)
file_objects.append(file_obj)
except Exception as e:
logger.warning("Failed to create File object from UploadFile %s: %s", upload_file.id, str(e))
continue
return file_objects
@staticmethod
def _extract_images_from_segment_attachments(tenant_id: str, segment_id: str) -> list[File]:
"""
Extract images from SegmentAttachmentBinding table (preferred method).
This matches how DatasetRetrieval gets segment attachments.
Args:
tenant_id: Tenant ID
segment_id: Segment ID to fetch attachments for
Returns:
List of File objects representing images found in segment attachments
"""
from sqlalchemy import select
# Query attachments from SegmentAttachmentBinding table
attachments_with_bindings = db.session.execute(
select(SegmentAttachmentBinding, UploadFile)
.join(UploadFile, UploadFile.id == SegmentAttachmentBinding.attachment_id)
.where(
SegmentAttachmentBinding.segment_id == segment_id,
SegmentAttachmentBinding.tenant_id == tenant_id,
)
).all()
if not attachments_with_bindings:
return []
file_objects = []
for _, upload_file in attachments_with_bindings:
# Only process image files
if not upload_file.mime_type or "image" not in upload_file.mime_type:
continue
try:
# Create File object directly (similar to DatasetRetrieval)
file_obj = File(
id=upload_file.id,
filename=upload_file.name,
extension="." + upload_file.extension,
mime_type=upload_file.mime_type,
tenant_id=tenant_id,
type=FileType.IMAGE,
transfer_method=FileTransferMethod.LOCAL_FILE,
remote_url=upload_file.source_url,
related_id=upload_file.id,
size=upload_file.size,
storage_key=upload_file.key,
)
file_objects.append(file_obj)
except Exception as e:
logger.warning("Failed to create File object from UploadFile %s: %s", upload_file.id, str(e))
continue
return file_objects
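The markdown fallback above recognizes /files/<id>/image-preview links (data before v0.10.0) and /files/<id>/file-preview links (data after v0.10.0) inside ![...](...) images. A self-contained sketch of just the ID-extraction step, without the UploadFile database lookup or File construction, might look like this; the sample text is made up.

import re

IMAGE_MD_PATTERN = r"!\[.*?\]\((.*?)\)"
UPLOAD_ID_PATTERNS = [
    r"/files/([a-f0-9\-]+)/image-preview(?:\?.*?)?",  # data before v0.10.0
    r"/files/([a-f0-9\-]+)/file-preview(?:\?.*?)?",   # data after v0.10.0
]


def extract_upload_file_ids(text: str) -> list[str]:
    """Return de-duplicated upload-file IDs referenced by markdown images."""
    ids: list[str] = []
    for url in re.findall(IMAGE_MD_PATTERN, text):
        for pattern in UPLOAD_ID_PATTERNS:
            match = re.search(pattern, url)
            if match:
                ids.append(match.group(1))
                break
    return list(dict.fromkeys(ids))  # keep order, drop duplicates


sample = "Intro ![chart](/files/0a1b2c3d-4e5f-6789-abcd-ef0123456789/file-preview?t=1) end"
print(extract_upload_file_ids(sample))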

View File

@ -65,14 +65,14 @@ class DocumentExtractorNode(Node[DocumentExtractorNodeData]):
# Ensure storage_key is loaded for File objects
files_to_check = value if isinstance(value, list) else [value]
files_needing_storage_key = [
f for f in files_to_check
if isinstance(f, File) and not f.storage_key and f.related_id
f for f in files_to_check if isinstance(f, File) and not f.storage_key and f.related_id
]
if files_needing_storage_key:
from factories.file_factory import StorageKeyLoader
from extensions.ext_database import db
from sqlalchemy.orm import Session
from extensions.ext_database import db
from factories.file_factory import StorageKeyLoader
with Session(bind=db.engine) as session:
storage_key_loader = StorageKeyLoader(session, tenant_id=self.tenant_id)
storage_key_loader.load_storage_keys(files_needing_storage_key)
@ -433,12 +433,13 @@ def _download_file_content(file: File) -> bytes:
# Check if storage_key is set
if not file.storage_key:
raise FileDownloadError(f"File storage_key is missing for file: {file.filename}")
# Check if file exists before downloading
from extensions.ext_storage import storage
if not storage.exists(file.storage_key):
raise FileDownloadError(f"File not found in storage: {file.storage_key}")
return file_manager.download(file)
except Exception as e:
raise FileDownloadError(f"Error downloading file: {str(e)}") from e

View File

@ -77,11 +77,13 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]):
# or fallback to dataset if not available in node_data
indexing_technique = node_data.indexing_technique or dataset.indexing_technique
summary_index_setting = node_data.summary_index_setting or dataset.summary_index_setting
outputs = self._get_preview_output_with_summaries(
node_data.chunk_structure, chunks, dataset=dataset,
node_data.chunk_structure,
chunks,
dataset=dataset,
indexing_technique=indexing_technique,
summary_index_setting=summary_index_setting
summary_index_setting=summary_index_setting,
)
return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
@ -237,7 +239,7 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]):
segments = query.all()
if not segments:
logger.info(f"No segments found for document {document.id}")
logger.info("No segments found for document %s", document.id)
return
# Filter segments based on mode
@ -256,7 +258,7 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]):
segments_to_process.append(segment)
if not segments_to_process:
logger.info(f"No segments need summary generation for document {document.id}")
logger.info("No segments need summary generation for document %s", document.id)
return
# Use ThreadPoolExecutor for concurrent generation
@ -267,46 +269,55 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]):
"""Process a single segment in a thread with Flask app context."""
with flask_app.app_context():
try:
SummaryIndexService.generate_and_vectorize_summary(
segment, dataset, summary_index_setting
SummaryIndexService.generate_and_vectorize_summary(segment, dataset, summary_index_setting)
except Exception:
logger.exception(
"Failed to generate summary for segment %s",
segment.id,
)
except Exception as e:
logger.error(f"Failed to generate summary for segment {segment.id}: {str(e)}")
# Continue processing other segments
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = [
executor.submit(process_segment, segment) for segment in segments_to_process
]
futures = [executor.submit(process_segment, segment) for segment in segments_to_process]
# Wait for all tasks to complete
concurrent.futures.wait(futures)
logger.info(
f"Successfully generated summary index for {len(segments_to_process)} segments "
f"in document {document.id}"
"Successfully generated summary index for %s segments in document %s",
len(segments_to_process),
document.id,
)
except Exception as e:
logger.exception(f"Failed to generate summary index for document {document.id}: {str(e)}")
except Exception:
logger.exception("Failed to generate summary index for document %s", document.id)
# Don't fail the entire indexing process if summary generation fails
else:
# Production mode: asynchronous generation
logger.info(f"Queuing summary index generation task for document {document.id} (production mode)")
logger.info(
"Queuing summary index generation task for document %s (production mode)",
document.id,
)
try:
generate_summary_index_task.delay(dataset.id, document.id, None)
logger.info(f"Summary index generation task queued for document {document.id}")
except Exception as e:
logger.exception(f"Failed to queue summary index generation task for document {document.id}: {str(e)}")
logger.info("Summary index generation task queued for document %s", document.id)
except Exception:
logger.exception(
"Failed to queue summary index generation task for document %s",
document.id,
)
# Don't fail the entire indexing process if task queuing fails
def _get_preview_output_with_summaries(
self, chunk_structure: str, chunks: Any, dataset: Dataset,
self,
chunk_structure: str,
chunks: Any,
dataset: Dataset,
indexing_technique: str | None = None,
summary_index_setting: dict | None = None
summary_index_setting: dict | None = None,
) -> Mapping[str, Any]:
"""
Generate preview output with summaries for chunks in preview mode.
This method generates summaries on-the-fly without saving to database.
Args:
chunk_structure: Chunk structure type
chunks: Chunks to generate preview for
@ -316,31 +327,32 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]):
"""
index_processor = IndexProcessorFactory(chunk_structure).init_index_processor()
preview_output = index_processor.format_preview(chunks)
# Check if summary index is enabled
if indexing_technique != "high_quality":
return preview_output
if not summary_index_setting or not summary_index_setting.get("enable"):
return preview_output
# Generate summaries for chunks
if "preview" in preview_output and isinstance(preview_output["preview"], list):
chunk_count = len(preview_output["preview"])
logger.info(
f"Generating summaries for {chunk_count} chunks in preview mode "
f"(dataset: {dataset.id})"
"Generating summaries for %s chunks in preview mode (dataset: %s)",
chunk_count,
dataset.id,
)
# Use ParagraphIndexProcessor's generate_summary method
from core.rag.index_processor.processor.paragraph_index_processor import ParagraphIndexProcessor
# Get Flask app for application context in worker threads
flask_app = None
try:
flask_app = current_app._get_current_object() # type: ignore
except RuntimeError:
logger.warning("No Flask application context available, summary generation may fail")
def generate_summary_for_chunk(preview_item: dict) -> None:
"""Generate summary for a single chunk."""
if "content" in preview_item:
@ -364,10 +376,10 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]):
)
if summary:
preview_item["summary"] = summary
except Exception as e:
logger.error(f"Failed to generate summary for chunk: {str(e)}")
except Exception:
logger.exception("Failed to generate summary for chunk")
# Don't fail the entire preview if summary generation fails
# Generate summaries concurrently using ThreadPoolExecutor
# Set a reasonable timeout to prevent hanging (60 seconds per chunk, max 5 minutes total)
timeout_seconds = min(300, 60 * len(preview_output["preview"]))
@ -378,31 +390,39 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]):
]
# Wait for all tasks to complete with timeout
done, not_done = concurrent.futures.wait(futures, timeout=timeout_seconds)
# Cancel tasks that didn't complete in time
if not_done:
logger.warning(
f"Summary generation timeout: {len(not_done)} chunks did not complete within {timeout_seconds}s. "
"Cancelling remaining tasks..."
"Summary generation timeout: %s chunks did not complete within %ss. "
"Cancelling remaining tasks...",
len(not_done),
timeout_seconds,
)
for future in not_done:
future.cancel()
# Wait a bit for cancellation to take effect
concurrent.futures.wait(not_done, timeout=5)
completed_count = sum(1 for item in preview_output["preview"] if item.get("summary") is not None)
logger.info(
f"Completed summary generation for preview chunks: {completed_count}/{len(preview_output['preview'])} succeeded"
"Completed summary generation for preview chunks: %s/%s succeeded",
completed_count,
len(preview_output["preview"]),
)
return preview_output
def _get_preview_output(
self, chunk_structure: str, chunks: Any, dataset: Dataset | None = None, variable_pool: VariablePool | None = None
self,
chunk_structure: str,
chunks: Any,
dataset: Dataset | None = None,
variable_pool: VariablePool | None = None,
) -> Mapping[str, Any]:
index_processor = IndexProcessorFactory(chunk_structure).init_index_processor()
preview_output = index_processor.format_preview(chunks)
# If dataset is provided, try to enrich preview with summaries
if dataset and variable_pool:
document_id = variable_pool.get(["sys", SystemVariableKey.DOCUMENT_ID])
@ -420,7 +440,7 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]):
)
.all()
)
if summaries:
# Create a map of segment content to summary for matching
# Use content matching as chunks in preview might not be indexed yet
@ -435,7 +455,7 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]):
# Normalize content for matching (strip whitespace)
normalized_content = segment.content.strip()
summary_by_content[normalized_content] = summary.summary_content
# Enrich preview with summaries by content matching
if "preview" in preview_output and isinstance(preview_output["preview"], list):
matched_count = 0
@ -446,13 +466,15 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]):
if normalized_chunk_content in summary_by_content:
preview_item["summary"] = summary_by_content[normalized_chunk_content]
matched_count += 1
if matched_count > 0:
logger.info(
f"Enriched preview with {matched_count} existing summaries "
f"(dataset: {dataset.id}, document: {document.id})"
"Enriched preview with %s existing summaries (dataset: %s, document: %s)",
matched_count,
dataset.id,
document.id,
)
return preview_output
@classmethod
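The preview path above fans summary generation out over a thread pool and bounds the total wait (60 seconds per chunk, capped at five minutes), cancelling whatever has not finished. Stripped of the Flask app context and LLM specifics, the timeout-and-cancel pattern looks roughly like the sketch below; the per-chunk work function is a stand-in.

import concurrent.futures
import time


def summarize_chunk(chunk: dict) -> None:
    """Stand-in for the per-chunk LLM call; writes the result in place."""
    time.sleep(0.1)
    chunk["summary"] = chunk["content"][:50]


chunks = [{"content": f"chunk {i} " * 20, "summary": None} for i in range(8)]
timeout_seconds = min(300, 60 * len(chunks))  # 60s per chunk, 5 minute cap

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(summarize_chunk, chunk) for chunk in chunks]
    done, not_done = concurrent.futures.wait(futures, timeout=timeout_seconds)
    for future in not_done:
        # Best effort: queued tasks are cancelled, running ones finish on their own.
        future.cancel()

completed = sum(1 for chunk in chunks if chunk["summary"] is not None)
print(f"{completed}/{len(chunks)} summaries generated")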

View File

@ -33,7 +33,8 @@ document_fields = {
"hit_count": fields.Integer,
"doc_form": fields.String,
"doc_metadata": fields.List(fields.Nested(document_metadata_fields), attribute="doc_metadata_details"),
"summary_index_status": fields.String, # Summary index generation status: "waiting", "generating", "completed", "partial_error", or null if not enabled
# Summary index generation status: "GENERATING", "COMPLETED", "ERROR", or null if not enabled
"summary_index_status": fields.String,
"need_summary": fields.Boolean, # Whether this document needs summary index generation
}
@ -62,7 +63,8 @@ document_with_segments_fields = {
"completed_segments": fields.Integer,
"total_segments": fields.Integer,
"doc_metadata": fields.List(fields.Nested(document_metadata_fields), attribute="doc_metadata_details"),
"summary_index_status": fields.String, # Summary index generation status: "waiting", "generating", "completed", "partial_error", or null if not enabled
# Summary index generation status: "GENERATING", "COMPLETED", "ERROR", or null if not enabled
"summary_index_status": fields.String,
"need_summary": fields.Boolean, # Whether this document needs summary index generation
}

View File

@ -1595,8 +1595,9 @@ class DocumentSegmentSummary(Base):
disabled_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
disabled_by = mapped_column(StringUUID, nullable=True)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
updated_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp())
updated_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
)
def __repr__(self):
return f"<DocumentSegmentSummary id={self.id} chunk_id={self.chunk_id} status={self.status}>"

View File

@ -87,9 +87,9 @@ from tasks.disable_segments_from_index_task import disable_segments_from_index_t
from tasks.document_indexing_update_task import document_indexing_update_task
from tasks.enable_segments_to_index_task import enable_segments_to_index_task
from tasks.recover_document_indexing_task import recover_document_indexing_task
from tasks.regenerate_summary_index_task import regenerate_summary_index_task
from tasks.remove_document_from_index_task import remove_document_from_index_task
from tasks.retry_document_indexing_task import retry_document_indexing_task
from tasks.regenerate_summary_index_task import regenerate_summary_index_task
from tasks.sync_website_document_indexing_task import sync_website_document_indexing_task
logger = logging.getLogger(__name__)
@ -563,9 +563,7 @@ class DatasetService:
action = DatasetService._handle_indexing_technique_change(dataset, data, filtered_data)
# Check if summary_index_setting model changed (before updating database)
summary_model_changed = DatasetService._check_summary_index_setting_model_changed(
dataset, data
)
summary_model_changed = DatasetService._check_summary_index_setting_model_changed(dataset, data)
# Add metadata fields
filtered_data["updated_by"] = user.id
@ -921,8 +919,12 @@ class DatasetService:
# Check if model changed
if old_model_name != new_model_name or old_model_provider != new_model_provider:
logger.info(
f"Summary index setting model changed for dataset {dataset.id}: "
f"old={old_model_provider}/{old_model_name}, new={new_model_provider}/{new_model_name}"
"Summary index setting model changed for dataset %s: old=%s/%s, new=%s/%s",
dataset.id,
old_model_provider,
old_model_name,
new_model_provider,
new_model_name,
)
return True
@ -2208,12 +2210,9 @@ class DocumentService:
):
# Set need_summary based on dataset's summary_index_setting
need_summary = False
if (
dataset.summary_index_setting
and dataset.summary_index_setting.get("enable") is True
):
if dataset.summary_index_setting and dataset.summary_index_setting.get("enable") is True:
need_summary = True
document = Document(
tenant_id=dataset.tenant_id,
dataset_id=dataset.id,
@ -3118,10 +3117,11 @@ class SegmentService:
and dataset.summary_index_setting
and dataset.summary_index_setting.get("enable") is True
)
if has_summary_index:
# Query existing summary from database
from models.dataset import DocumentSegmentSummary
existing_summary = (
db.session.query(DocumentSegmentSummary)
.where(
@ -3130,16 +3130,17 @@ class SegmentService:
)
.first()
)
# Check if summary has changed
existing_summary_content = existing_summary.summary_content if existing_summary else None
if existing_summary_content != args.summary:
# Summary has changed, update it
from services.summary_index_service import SummaryIndexService
try:
SummaryIndexService.update_summary_for_segment(segment, dataset, args.summary)
except Exception as e:
logger.exception(f"Failed to update summary for segment {segment.id}: {str(e)}")
except Exception:
logger.exception("Failed to update summary for segment %s", segment.id)
# Don't fail the entire update if summary update fails
else:
segment_hash = helper.generate_text_hash(content)
@ -3221,8 +3222,8 @@ class SegmentService:
try:
SummaryIndexService.update_summary_for_segment(segment, dataset, args.summary)
except Exception as e:
logger.exception(f"Failed to update summary for segment {segment.id}: {str(e)}")
except Exception:
logger.exception("Failed to update summary for segment %s", segment.id)
# Don't fail the entire update if summary update fails
# update multimodel vector index
VectorService.update_multimodel_vector(segment, args.attachment_ids or [], dataset)
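The segment-update path above only touches the summary index when the dataset has it enabled and the submitted summary actually differs from the stored content. A compact sketch of that decision, with invented names and plain arguments in place of the ORM objects:

def should_update_summary(
    summary_index_enabled: bool,
    existing_summary_content: str | None,
    submitted_summary: str | None,
) -> bool:
    """Mirror the change detection above: update only when something changed."""
    if not summary_index_enabled:
        return False
    return existing_summary_content != submitted_summary


print(should_update_summary(True, "Old summary", "Old summary"))   # False: nothing to do
print(should_update_summary(True, "Old summary", "New summary"))   # True: update and re-vectorize
print(should_update_summary(False, "Old summary", "New summary"))  # False: summary index disabled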

View File

@ -3,7 +3,6 @@
import logging
import time
import uuid
from typing import Any
from core.rag.datasource.vdb.vector_factory import Vector
from core.rag.index_processor.constant.doc_type import DocType
@ -47,6 +46,7 @@ class SummaryIndexService:
tenant_id=dataset.tenant_id,
text=segment.content,
summary_index_setting=summary_index_setting,
segment_id=segment.id,
)
if not summary_content:
@ -76,11 +76,9 @@ class SummaryIndexService:
"""
# Check if summary record already exists
existing_summary = (
db.session.query(DocumentSegmentSummary)
.filter_by(chunk_id=segment.id, dataset_id=dataset.id)
.first()
db.session.query(DocumentSegmentSummary).filter_by(chunk_id=segment.id, dataset_id=dataset.id).first()
)
if existing_summary:
# Update existing record
existing_summary.summary_content = summary_content
@ -124,8 +122,8 @@ class SummaryIndexService:
"""
if dataset.indexing_technique != "high_quality":
logger.warning(
f"Summary vectorization skipped for dataset {dataset.id}: "
"indexing_technique is not high_quality"
"Summary vectorization skipped for dataset %s: indexing_technique is not high_quality",
dataset.id,
)
return
@ -137,10 +135,10 @@ class SummaryIndexService:
else:
# Generate new index node ID only for new summaries
summary_index_node_id = str(uuid.uuid4())
# Always regenerate hash (in case summary content changed)
summary_hash = helper.generate_text_hash(summary_record.summary_content)
# Delete old vector only if we're reusing the same index_node_id (to overwrite)
# If index_node_id changed, the old vector should have been deleted elsewhere
if old_summary_node_id and old_summary_node_id == summary_index_node_id:
@ -149,8 +147,9 @@ class SummaryIndexService:
vector.delete_by_ids([old_summary_node_id])
except Exception as e:
logger.warning(
f"Failed to delete old summary vector for segment {segment.id}: {str(e)}. "
"Continuing with new vectorization."
"Failed to delete old summary vector for segment %s: %s. Continuing with new vectorization.",
segment.id,
str(e),
)
# Create document with summary content and metadata
@ -170,12 +169,12 @@ class SummaryIndexService:
# Vectorize and store with retry mechanism for connection errors
max_retries = 3
retry_delay = 2.0
for attempt in range(max_retries):
try:
vector = Vector(dataset)
vector.add_texts([summary_document], duplicate_check=True)
# Success - update summary record with index node info
summary_record.summary_index_node_id = summary_index_node_id
summary_record.summary_index_node_hash = summary_hash
@ -183,29 +182,44 @@ class SummaryIndexService:
db.session.add(summary_record)
db.session.flush()
return # Success, exit function
except (ConnectionError, Exception) as e:
error_str = str(e).lower()
# Check if it's a connection-related error that might be transient
is_connection_error = any(keyword in error_str for keyword in [
"connection", "disconnected", "timeout", "network",
"could not connect", "server disconnected", "weaviate"
])
is_connection_error = any(
keyword in error_str
for keyword in [
"connection",
"disconnected",
"timeout",
"network",
"could not connect",
"server disconnected",
"weaviate",
]
)
if is_connection_error and attempt < max_retries - 1:
# Retry for connection errors
wait_time = retry_delay * (2 ** attempt) # Exponential backoff
wait_time = retry_delay * (2**attempt) # Exponential backoff
logger.warning(
f"Vectorization attempt {attempt + 1}/{max_retries} failed for segment {segment.id}: {str(e)}. "
f"Retrying in {wait_time:.1f} seconds..."
"Vectorization attempt %s/%s failed for segment %s: %s. Retrying in %.1f seconds...",
attempt + 1,
max_retries,
segment.id,
str(e),
wait_time,
)
time.sleep(wait_time)
continue
else:
# Final attempt failed or non-connection error - log and update status
logger.error(
f"Failed to vectorize summary for segment {segment.id} after {attempt + 1} attempts: {str(e)}",
exc_info=True
"Failed to vectorize summary for segment %s after %s attempts: %s",
segment.id,
attempt + 1,
str(e),
exc_info=True,
)
summary_record.status = "error"
summary_record.error = f"Vectorization failed: {str(e)}"
@ -235,9 +249,7 @@ class SummaryIndexService:
"""
try:
# Generate summary
summary_content = SummaryIndexService.generate_summary_for_segment(
segment, dataset, summary_index_setting
)
summary_content = SummaryIndexService.generate_summary_for_segment(segment, dataset, summary_index_setting)
# Create or update summary record (will handle overwrite internally)
summary_record = SummaryIndexService.create_summary_record(
@ -248,16 +260,14 @@ class SummaryIndexService:
SummaryIndexService.vectorize_summary(summary_record, segment, dataset)
db.session.commit()
logger.info(f"Successfully generated and vectorized summary for segment {segment.id}")
logger.info("Successfully generated and vectorized summary for segment %s", segment.id)
return summary_record
except Exception as e:
logger.exception(f"Failed to generate summary for segment {segment.id}: {str(e)}")
except Exception:
logger.exception("Failed to generate summary for segment %s", segment.id)
# Update summary record with error status if it exists
summary_record = (
db.session.query(DocumentSegmentSummary)
.filter_by(chunk_id=segment.id, dataset_id=dataset.id)
.first()
db.session.query(DocumentSegmentSummary).filter_by(chunk_id=segment.id, dataset_id=dataset.id).first()
)
if summary_record:
summary_record.status = "error"
@ -290,24 +300,27 @@ class SummaryIndexService:
# Only generate summary index for high_quality indexing technique
if dataset.indexing_technique != "high_quality":
logger.info(
f"Skipping summary generation for dataset {dataset.id}: "
f"indexing_technique is {dataset.indexing_technique}, not 'high_quality'"
"Skipping summary generation for dataset %s: indexing_technique is %s, not 'high_quality'",
dataset.id,
dataset.indexing_technique,
)
return []
if not summary_index_setting or not summary_index_setting.get("enable"):
logger.info(f"Summary index is disabled for dataset {dataset.id}")
logger.info("Summary index is disabled for dataset %s", dataset.id)
return []
# Skip qa_model documents
if document.doc_form == "qa_model":
logger.info(f"Skipping summary generation for qa_model document {document.id}")
logger.info("Skipping summary generation for qa_model document %s", document.id)
return []
logger.info(
f"Starting summary generation for document {document.id} in dataset {dataset.id}, "
f"segment_ids: {len(segment_ids) if segment_ids else 'all'}, "
f"only_parent_chunks: {only_parent_chunks}"
"Starting summary generation for document %s in dataset %s, segment_ids: %s, only_parent_chunks: %s",
document.id,
dataset.id,
len(segment_ids) if segment_ids else "all",
only_parent_chunks,
)
# Query segments (only enabled segments)
@ -324,7 +337,7 @@ class SummaryIndexService:
segments = query.all()
if not segments:
logger.info(f"No segments found for document {document.id}")
logger.info("No segments found for document %s", document.id)
return []
summary_records = []
@ -346,14 +359,15 @@ class SummaryIndexService:
segment, dataset, summary_index_setting
)
summary_records.append(summary_record)
except Exception as e:
logger.error(f"Failed to generate summary for segment {segment.id}: {str(e)}")
except Exception:
logger.exception("Failed to generate summary for segment %s", segment.id)
# Continue with other segments
continue
logger.info(
f"Completed summary generation for document {document.id}: "
f"{len(summary_records)} summaries generated and vectorized"
"Completed summary generation for document %s: %s summaries generated and vectorized",
document.id,
len(summary_records),
)
return summary_records
@ -373,7 +387,7 @@ class SummaryIndexService:
disabled_by: User ID who disabled the summaries
"""
from libs.datetime_utils import naive_utc_now
query = db.session.query(DocumentSegmentSummary).filter_by(
dataset_id=dataset.id,
enabled=True, # Only disable enabled summaries
@ -388,21 +402,21 @@ class SummaryIndexService:
return
logger.info(
f"Disabling {len(summaries)} summary records for dataset {dataset.id}, "
f"segment_ids: {len(segment_ids) if segment_ids else 'all'}"
"Disabling %s summary records for dataset %s, segment_ids: %s",
len(summaries),
dataset.id,
len(segment_ids) if segment_ids else "all",
)
# Remove from vector database (but keep records)
if dataset.indexing_technique == "high_quality":
summary_node_ids = [
s.summary_index_node_id for s in summaries if s.summary_index_node_id
]
summary_node_ids = [s.summary_index_node_id for s in summaries if s.summary_index_node_id]
if summary_node_ids:
try:
vector = Vector(dataset)
vector.delete_by_ids(summary_node_ids)
except Exception as e:
logger.warning(f"Failed to remove summary vectors: {str(e)}")
logger.warning("Failed to remove summary vectors: %s", str(e))
# Disable summary records (don't delete)
now = naive_utc_now()
@ -413,7 +427,7 @@ class SummaryIndexService:
db.session.add(summary)
db.session.commit()
logger.info(f"Disabled {len(summaries)} summary records for dataset {dataset.id}")
logger.info("Disabled %s summary records for dataset %s", len(summaries), dataset.id)
@staticmethod
def enable_summaries_for_segments(
@ -450,19 +464,25 @@ class SummaryIndexService:
return
logger.info(
f"Enabling {len(summaries)} summary records for dataset {dataset.id}, "
f"segment_ids: {len(segment_ids) if segment_ids else 'all'}"
"Enabling %s summary records for dataset %s, segment_ids: %s",
len(summaries),
dataset.id,
len(segment_ids) if segment_ids else "all",
)
# Re-vectorize and re-add to vector database
enabled_count = 0
for summary in summaries:
# Get the original segment
segment = db.session.query(DocumentSegment).filter_by(
id=summary.chunk_id,
dataset_id=dataset.id,
).first()
segment = (
db.session.query(DocumentSegment)
.filter_by(
id=summary.chunk_id,
dataset_id=dataset.id,
)
.first()
)
if not segment or not segment.enabled or segment.status != "completed":
continue
@ -472,20 +492,20 @@ class SummaryIndexService:
try:
# Re-vectorize summary
SummaryIndexService.vectorize_summary(summary, segment, dataset)
# Enable summary record
summary.enabled = True
summary.disabled_at = None
summary.disabled_by = None
db.session.add(summary)
enabled_count += 1
except Exception as e:
logger.error(f"Failed to re-vectorize summary {summary.id}: {str(e)}")
except Exception:
logger.exception("Failed to re-vectorize summary %s", summary.id)
# Keep it disabled if vectorization fails
continue
db.session.commit()
logger.info(f"Enabled {enabled_count} summary records for dataset {dataset.id}")
logger.info("Enabled %s summary records for dataset %s", enabled_count, dataset.id)
@staticmethod
def delete_summaries_for_segments(
@ -512,9 +532,7 @@ class SummaryIndexService:
# Delete from vector database
if dataset.indexing_technique == "high_quality":
summary_node_ids = [
s.summary_index_node_id for s in summaries if s.summary_index_node_id
]
summary_node_ids = [s.summary_index_node_id for s in summaries if s.summary_index_node_id]
if summary_node_ids:
vector = Vector(dataset)
vector.delete_by_ids(summary_node_ids)
@ -524,7 +542,7 @@ class SummaryIndexService:
db.session.delete(summary)
db.session.commit()
logger.info(f"Deleted {len(summaries)} summary records for dataset {dataset.id}")
logger.info("Deleted %s summary records for dataset %s", len(summaries), dataset.id)
@staticmethod
def update_summary_for_segment(
@ -559,9 +577,7 @@ class SummaryIndexService:
try:
# Find existing summary record
summary_record = (
db.session.query(DocumentSegmentSummary)
.filter_by(chunk_id=segment.id, dataset_id=dataset.id)
.first()
db.session.query(DocumentSegmentSummary).filter_by(chunk_id=segment.id, dataset_id=dataset.id).first()
)
if summary_record:
@ -583,7 +599,7 @@ class SummaryIndexService:
SummaryIndexService.vectorize_summary(summary_record, segment, dataset)
db.session.commit()
logger.info(f"Successfully updated and re-vectorized summary for segment {segment.id}")
logger.info("Successfully updated and re-vectorized summary for segment %s", segment.id)
return summary_record
else:
# Create new summary record if doesn't exist
@ -592,16 +608,14 @@ class SummaryIndexService:
)
SummaryIndexService.vectorize_summary(summary_record, segment, dataset)
db.session.commit()
logger.info(f"Successfully created and vectorized summary for segment {segment.id}")
logger.info("Successfully created and vectorized summary for segment %s", segment.id)
return summary_record
except Exception as e:
logger.exception(f"Failed to update summary for segment {segment.id}: {str(e)}")
except Exception:
logger.exception("Failed to update summary for segment %s", segment.id)
# Update summary record with error status if it exists
summary_record = (
db.session.query(DocumentSegmentSummary)
.filter_by(chunk_id=segment.id, dataset_id=dataset.id)
.first()
db.session.query(DocumentSegmentSummary).filter_by(chunk_id=segment.id, dataset_id=dataset.id).first()
)
if summary_record:
summary_record.status = "error"
@ -609,4 +623,3 @@ class SummaryIndexService:
db.session.add(summary_record)
db.session.commit()
raise
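vectorize_summary above retries transient vector-store failures up to three times with exponential backoff, classifying errors by keyword before deciding whether to retry. Detached from the Vector/Weaviate specifics, the retry skeleton is roughly as follows; the do_upsert callable and the flaky test double are stand-ins.

import time

CONNECTION_KEYWORDS = (
    "connection", "disconnected", "timeout", "network",
    "could not connect", "server disconnected", "weaviate",
)


def upsert_with_retry(do_upsert, max_retries: int = 3, retry_delay: float = 2.0) -> None:
    """Call do_upsert(), retrying only keyword-matched connection errors.

    Backoff doubles each attempt; any other error, or the final failed attempt,
    is re-raised so the caller can mark the summary record as errored.
    """
    for attempt in range(max_retries):
        try:
            do_upsert()
            return
        except Exception as e:
            is_connection_error = any(k in str(e).lower() for k in CONNECTION_KEYWORDS)
            if is_connection_error and attempt < max_retries - 1:
                time.sleep(retry_delay * (2 ** attempt))
                continue
            raise


attempts = {"n": 0}


def flaky_upsert() -> None:
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise ConnectionError("server disconnected")


upsert_with_retry(flaky_upsert, retry_delay=0.01)
print(f"succeeded after {attempts['n']} attempts")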

View File

@ -119,6 +119,7 @@ def add_document_to_index_task(dataset_document_id: str):
# Enable summary indexes for all segments in this document
from services.summary_index_service import SummaryIndexService
segment_ids_list = [segment.id for segment in segments]
if segment_ids_list:
try:
@ -127,7 +128,7 @@ def add_document_to_index_task(dataset_document_id: str):
segment_ids=segment_ids_list,
)
except Exception as e:
logger.warning(f"Failed to enable summaries for document {dataset_document.id}: {str(e)}")
logger.warning("Failed to enable summaries for document %s: %s", dataset_document.id, str(e))
end_at = time.perf_counter()
logger.info(

View File

@ -55,6 +55,7 @@ def disable_segment_from_index_task(segment_id: str):
# Disable summary index for this segment
from services.summary_index_service import SummaryIndexService
try:
SummaryIndexService.disable_summaries_for_segments(
dataset=dataset,
@ -62,7 +63,7 @@ def disable_segment_from_index_task(segment_id: str):
disabled_by=segment.disabled_by,
)
except Exception as e:
logger.warning(f"Failed to disable summary for segment {segment.id}: {str(e)}")
logger.warning("Failed to disable summary for segment %s: %s", segment.id, str(e))
index_type = dataset_document.doc_form
index_processor = IndexProcessorFactory(index_type).init_index_processor()

View File

@ -60,6 +60,7 @@ def disable_segments_from_index_task(segment_ids: list, dataset_id: str, documen
try:
# Disable summary indexes for these segments
from services.summary_index_service import SummaryIndexService
segment_ids_list = [segment.id for segment in segments]
try:
# Get disabled_by from first segment (they should all have the same disabled_by)
@ -70,7 +71,7 @@ def disable_segments_from_index_task(segment_ids: list, dataset_id: str, documen
disabled_by=disabled_by,
)
except Exception as e:
logger.warning(f"Failed to disable summaries for segments: {str(e)}")
logger.warning("Failed to disable summaries for segments: %s", str(e))
index_node_ids = [segment.index_node_id for segment in segments]
if dataset.is_multimodal:

View File

@ -8,13 +8,13 @@ from celery import shared_task
from configs import dify_config
from core.entities.document_task import DocumentTask
from core.indexing_runner import DocumentIsPausedError, IndexingRunner
from tasks.generate_summary_index_task import generate_summary_index_task
from core.rag.pipeline.queue import TenantIsolatedTaskQueue
from enums.cloud_plan import CloudPlan
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
from models.dataset import Dataset, Document
from services.feature_service import FeatureService
from tasks.generate_summary_index_task import generate_summary_index_task
logger = logging.getLogger(__name__)
@ -101,15 +101,15 @@ def _document_indexing(dataset_id: str, document_ids: Sequence[str]):
indexing_runner.run(documents)
end_at = time.perf_counter()
logger.info(click.style(f"Processed dataset: {dataset_id} latency: {end_at - start_at}", fg="green"))
# Trigger summary index generation for completed documents if enabled
# Only generate for high_quality indexing technique and when summary_index_setting is enabled
# Re-query dataset to get latest summary_index_setting (in case it was updated)
dataset = db.session.query(Dataset).where(Dataset.id == dataset_id).first()
if not dataset:
logger.warning(f"Dataset {dataset_id} not found after indexing")
logger.warning("Dataset %s not found after indexing", dataset_id)
return
if dataset.indexing_technique == "high_quality":
summary_index_setting = dataset.summary_index_setting
if summary_index_setting and summary_index_setting.get("enable"):
@ -123,37 +123,46 @@ def _document_indexing(dataset_id: str, document_ids: Sequence[str]):
)
if document:
logger.info(
f"Checking document {document_id} for summary generation: "
f"status={document.indexing_status}, doc_form={document.doc_form}"
"Checking document %s for summary generation: status=%s, doc_form=%s",
document_id,
document.indexing_status,
document.doc_form,
)
if document.indexing_status == "completed" and document.doc_form != "qa_model":
try:
generate_summary_index_task.delay(dataset.id, document_id, None)
logger.info(
f"Queued summary index generation task for document {document_id} "
f"in dataset {dataset.id} after indexing completed"
"Queued summary index generation task for document %s in dataset %s "
"after indexing completed",
document_id,
dataset.id,
)
except Exception as e:
except Exception:
logger.exception(
f"Failed to queue summary index generation task for document {document_id}: {str(e)}"
"Failed to queue summary index generation task for document %s",
document_id,
)
# Don't fail the entire indexing process if summary task queuing fails
else:
logger.info(
f"Skipping summary generation for document {document_id}: "
f"status={document.indexing_status}, doc_form={document.doc_form}"
"Skipping summary generation for document %s: status=%s, doc_form=%s",
document_id,
document.indexing_status,
document.doc_form,
)
else:
logger.warning(f"Document {document_id} not found after indexing")
logger.warning("Document %s not found after indexing", document_id)
else:
logger.info(
f"Summary index generation skipped for dataset {dataset.id}: "
f"summary_index_setting.enable={summary_index_setting.get('enable') if summary_index_setting else None}"
"Summary index generation skipped for dataset %s: summary_index_setting.enable=%s",
dataset.id,
summary_index_setting.get("enable") if summary_index_setting else None,
)
else:
logger.info(
f"Summary index generation skipped for dataset {dataset.id}: "
f"indexing_technique={dataset.indexing_technique} (not 'high_quality')"
"Summary index generation skipped for dataset %s: indexing_technique=%s (not 'high_quality')",
dataset.id,
dataset.indexing_technique,
)
except DocumentIsPausedError as ex:
logger.info(click.style(str(ex), fg="yellow"))

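For readers following the control flow, the gate that decides whether a summary task gets queued after indexing condenses to a small predicate. The sketch below only restates the conditions visible in the hunk above; the function itself is illustrative, not part of the code.

def should_generate_summary(dataset, document) -> bool:
    """Condensed from the checks in _document_indexing (illustrative only)."""
    summary_index_setting = dataset.summary_index_setting
    return (
        dataset.indexing_technique == "high_quality"
        and bool(summary_index_setting and summary_index_setting.get("enable"))
        and document.indexing_status == "completed"
        and document.doc_form != "qa_model"
    )

# When the predicate holds, the Celery task is queued without failing the run:
#     generate_summary_index_task.delay(dataset.id, document_id, None)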
View File

@ -105,13 +105,14 @@ def enable_segment_to_index_task(segment_id: str):
# Enable summary index for this segment
from services.summary_index_service import SummaryIndexService
try:
SummaryIndexService.enable_summaries_for_segments(
dataset=dataset,
segment_ids=[segment.id],
)
except Exception as e:
logger.warning(f"Failed to enable summary for segment {segment.id}: {str(e)}")
logger.warning("Failed to enable summary for segment %s: %s", segment.id, str(e))
end_at = time.perf_counter()
logger.info(click.style(f"Segment enabled to index: {segment.id} latency: {end_at - start_at}", fg="green"))

View File

@ -110,6 +110,7 @@ def enable_segments_to_index_task(segment_ids: list, dataset_id: str, document_i
# Enable summary indexes for these segments
from services.summary_index_service import SummaryIndexService
segment_ids_list = [segment.id for segment in segments]
try:
SummaryIndexService.enable_summaries_for_segments(
@ -117,7 +118,7 @@ def enable_segments_to_index_task(segment_ids: list, dataset_id: str, document_i
segment_ids=segment_ids_list,
)
except Exception as e:
logger.warning(f"Failed to enable summaries for segments: {str(e)}")
logger.warning("Failed to enable summaries for segments: %s", str(e))
end_at = time.perf_counter()
logger.info(click.style(f"Segments enabled to index latency: {end_at - start_at}", fg="green"))

View File

@ -94,8 +94,8 @@ def generate_summary_index_task(dataset_id: str, document_id: str, segment_ids:
)
)
except Exception as e:
logger.exception(f"Failed to generate summary index for document {document_id}: {str(e)}")
except Exception:
logger.exception("Failed to generate summary index for document %s", document_id)
# Update document segments with error status if needed
if segment_ids:
db.session.query(DocumentSegment).filter(
@ -110,4 +110,3 @@ def generate_summary_index_task(dataset_id: str, document_id: str, segment_ids:
db.session.commit()
finally:
db.session.close()

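The switch from "except Exception as e" to a bare "except Exception" here works because logger.exception already records the active exception and its traceback at ERROR level; binding e only to interpolate str(e) into the message duplicates what the traceback carries. A minimal sketch:

import logging

logger = logging.getLogger(__name__)

def demo(document_id: str) -> None:
    try:
        raise RuntimeError("boom")  # stand-in for the real summary generation
    except Exception:
        # Inside an except block, exception() logs at ERROR level and appends
        # the current traceback, so the exception object does not need a name.
        logger.exception("Failed to generate summary index for document %s", document_id)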
View File

@ -2,7 +2,6 @@
import logging
import time
from typing import Any
import click
from celery import shared_task
@ -24,13 +23,13 @@ def regenerate_summary_index_task(
):
"""
Regenerate summary indexes for all documents in a dataset.
This task is triggered when:
1. summary_index_setting model changes (regenerate_reason="summary_model_changed")
- Regenerates summary content and vectors for all existing summaries
2. embedding_model changes (regenerate_reason="embedding_model_changed")
- Only regenerates vectors for existing summaries (keeps summary content)
Args:
dataset_id: Dataset ID
regenerate_reason: Reason for regeneration ("summary_model_changed" or "embedding_model_changed")
@ -96,7 +95,9 @@ def regenerate_summary_index_task(
return
logger.info(
f"Found {len(dataset_documents)} documents for summary regeneration in dataset {dataset_id}"
"Found %s documents for summary regeneration in dataset %s",
len(dataset_documents),
dataset_id,
)
total_segments_processed = 0
@ -130,7 +131,9 @@ def regenerate_summary_index_task(
continue
logger.info(
f"Regenerating summaries for {len(segments)} segments in document {dataset_document.id}"
"Regenerating summaries for %s segments in document %s",
len(segments),
dataset_document.id,
)
for segment in segments:
@ -146,9 +149,7 @@ def regenerate_summary_index_task(
)
if not summary_record:
logger.warning(
f"Summary record not found for segment {segment.id}, skipping"
)
logger.warning("Summary record not found for segment %s, skipping", segment.id)
continue
if regenerate_vectors_only:
@ -162,26 +163,26 @@ def regenerate_summary_index_task(
vector.delete_by_ids([summary_record.summary_index_node_id])
except Exception as e:
logger.warning(
f"Failed to delete old summary vector for segment {segment.id}: {str(e)}"
"Failed to delete old summary vector for segment %s: %s",
segment.id,
str(e),
)
# Re-vectorize with new embedding model
SummaryIndexService.vectorize_summary(
summary_record, segment, dataset
)
SummaryIndexService.vectorize_summary(summary_record, segment, dataset)
db.session.commit()
else:
# Regenerate both summary content and vectors (for summary_model change)
SummaryIndexService.generate_and_vectorize_summary(
segment, dataset, summary_index_setting
)
SummaryIndexService.generate_and_vectorize_summary(segment, dataset, summary_index_setting)
db.session.commit()
total_segments_processed += 1
except Exception as e:
logger.error(
f"Failed to regenerate summary for segment {segment.id}: {str(e)}",
"Failed to regenerate summary for segment %s: %s",
segment.id,
str(e),
exc_info=True,
)
total_segments_failed += 1
@ -195,7 +196,9 @@ def regenerate_summary_index_task(
except Exception as e:
logger.error(
f"Failed to process document {dataset_document.id} for summary regeneration: {str(e)}",
"Failed to process document %s for summary regeneration: %s",
dataset_document.id,
str(e),
exc_info=True,
)
continue
@ -213,7 +216,6 @@ def regenerate_summary_index_task(
)
except Exception:
logger.exception(f"Regenerate summary index failed for dataset {dataset_id}")
logger.exception("Regenerate summary index failed for dataset %s", dataset_id)
finally:
db.session.close()

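The per-segment branch above reduces to the two modes described in the task's docstring. This sketch reuses the service calls exactly as they appear in the hunk; the wrapper function itself is illustrative.

from extensions.ext_database import db
from services.summary_index_service import SummaryIndexService

def regenerate_segment(summary_record, segment, dataset, summary_index_setting,
                       regenerate_vectors_only: bool) -> None:
    if regenerate_vectors_only:
        # embedding_model_changed: keep the summary text, rebuild only its vector.
        SummaryIndexService.vectorize_summary(summary_record, segment, dataset)
    else:
        # summary_model_changed: regenerate the summary text, then re-vectorize it.
        SummaryIndexService.generate_and_vectorize_summary(segment, dataset, summary_index_setting)
    db.session.commit()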
View File

@ -47,9 +47,10 @@ def remove_document_from_index_task(document_id: str):
index_processor = IndexProcessorFactory(document.doc_form).init_index_processor()
segments = db.session.scalars(select(DocumentSegment).where(DocumentSegment.document_id == document.id)).all()
# Disable summary indexes for all segments in this document
from services.summary_index_service import SummaryIndexService
segment_ids_list = [segment.id for segment in segments]
if segment_ids_list:
try:
@ -59,8 +60,8 @@ def remove_document_from_index_task(document_id: str):
disabled_by=document.disabled_by,
)
except Exception as e:
logger.warning(f"Failed to disable summaries for document {document.id}: {str(e)}")
logger.warning("Failed to disable summaries for document %s: %s", document.id, str(e))
index_node_ids = [segment.index_node_id for segment in segments]
if index_node_ids:
try: