fix: fix summary index bug.

This commit is contained in:
FFXN 2026-01-20 11:53:16 +08:00
parent 4fb08ae7d2
commit 008a5f361d
4 changed files with 63 additions and 46 deletions

View File

@ -361,8 +361,8 @@ class DatasetDocumentListApi(Resource):
for doc_id in document_ids_need_summary:
segment_ids = document_segments_map.get(doc_id, [])
if not segment_ids:
# No segments, status is "GENERATING" (waiting to generate)
summary_status_map[doc_id] = "GENERATING"
# No segments, status is None (not started)
summary_status_map[doc_id] = None
continue
# Count summary statuses for this document's segments
@ -374,28 +374,23 @@ class DatasetDocumentListApi(Resource):
else:
status_counts["not_started"] += 1
total_segments = len(segment_ids)
completed_count = status_counts["completed"]
generating_count = status_counts["generating"]
error_count = status_counts["error"]
# Determine overall status (only three states: GENERATING, COMPLETED, ERROR)
if completed_count == total_segments:
summary_status_map[doc_id] = "COMPLETED"
elif error_count > 0:
# Has errors (even if some are completed or generating)
summary_status_map[doc_id] = "ERROR"
elif generating_count > 0 or status_counts["not_started"] > 0:
# Still generating or not started
summary_status_map[doc_id] = "GENERATING"
# Determine overall status:
# - "SUMMARIZING" only when task is queued and at least one summary is generating
# - None (empty) for all other cases (not queued, all completed/error)
if generating_count > 0:
# Task is queued and at least one summary is still generating
summary_status_map[doc_id] = "SUMMARIZING"
else:
# Default to generating
summary_status_map[doc_id] = "GENERATING"
# Task not queued yet, or all summaries are completed/error (task finished)
summary_status_map[doc_id] = None
# Add summary_index_status to each document
for document in documents:
if has_summary_index and document.need_summary is True:
document.summary_index_status = summary_status_map.get(str(document.id), "GENERATING")
# Get status from map, default to None (not queued yet)
document.summary_index_status = summary_status_map.get(str(document.id))
else:
# Return null if summary index is not enabled or document doesn't need summary
document.summary_index_status = None

View File

@ -356,19 +356,9 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]):
def generate_summary_for_chunk(preview_item: dict) -> None:
"""Generate summary for a single chunk."""
if "content" in preview_item:
try:
# Set Flask application context in worker thread
if flask_app:
with flask_app.app_context():
summary = ParagraphIndexProcessor.generate_summary(
tenant_id=dataset.tenant_id,
text=preview_item["content"],
summary_index_setting=summary_index_setting,
)
if summary:
preview_item["summary"] = summary
else:
# Fallback: try without app context (may fail)
# Set Flask application context in worker thread
if flask_app:
with flask_app.app_context():
summary = ParagraphIndexProcessor.generate_summary(
tenant_id=dataset.tenant_id,
text=preview_item["content"],
@ -376,13 +366,21 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]):
)
if summary:
preview_item["summary"] = summary
except Exception:
logger.exception("Failed to generate summary for chunk")
# Don't fail the entire preview if summary generation fails
else:
# Fallback: try without app context (may fail)
summary = ParagraphIndexProcessor.generate_summary(
tenant_id=dataset.tenant_id,
text=preview_item["content"],
summary_index_setting=summary_index_setting,
)
if summary:
preview_item["summary"] = summary
# Generate summaries concurrently using ThreadPoolExecutor
# Set a reasonable timeout to prevent hanging (60 seconds per chunk, max 5 minutes total)
timeout_seconds = min(300, 60 * len(preview_output["preview"]))
errors: list[Exception] = []
with concurrent.futures.ThreadPoolExecutor(max_workers=min(10, len(preview_output["preview"]))) as executor:
futures = [
executor.submit(generate_summary_for_chunk, preview_item)
@ -393,17 +391,37 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]):
# Cancel tasks that didn't complete in time
if not_done:
logger.warning(
"Summary generation timeout: %s chunks did not complete within %ss. "
"Cancelling remaining tasks...",
len(not_done),
timeout_seconds,
timeout_error_msg = (
f"Summary generation timeout: {len(not_done)} chunks did not complete within {timeout_seconds}s"
)
logger.warning("%s. Cancelling remaining tasks...", timeout_error_msg)
# In preview mode, timeout is also an error
errors.append(TimeoutError(timeout_error_msg))
for future in not_done:
future.cancel()
# Wait a bit for cancellation to take effect
concurrent.futures.wait(not_done, timeout=5)
# Collect exceptions from completed futures
for future in done:
try:
future.result() # This will raise any exception that occurred
except Exception as e:
logger.exception("Error in summary generation future")
errors.append(e)
# In preview mode, if there are any errors, fail the request
if errors:
error_messages = [str(e) for e in errors]
error_summary = (
f"Failed to generate summaries for {len(errors)} chunk(s). "
f"Errors: {'; '.join(error_messages[:3])}" # Show first 3 errors
)
if len(errors) > 3:
error_summary += f" (and {len(errors) - 3} more)"
logger.error("Summary generation failed in preview mode: %s", error_summary)
raise KnowledgeIndexNodeError(error_summary)
completed_count = sum(1 for item in preview_output["preview"] if item.get("summary") is not None)
logger.info(
"Completed summary generation for preview chunks: %s/%s succeeded",

View File

@ -33,9 +33,11 @@ document_fields = {
"hit_count": fields.Integer,
"doc_form": fields.String,
"doc_metadata": fields.List(fields.Nested(document_metadata_fields), attribute="doc_metadata_details"),
# Summary index generation status: "GENERATING", "COMPLETED", "ERROR", or null if not enabled
# Summary index generation status:
# "SUMMARIZING" (when task is queued and generating)
"summary_index_status": fields.String,
"need_summary": fields.Boolean, # Whether this document needs summary index generation
# Whether this document needs summary index generation
"need_summary": fields.Boolean,
}
document_with_segments_fields = {
@ -63,7 +65,8 @@ document_with_segments_fields = {
"completed_segments": fields.Integer,
"total_segments": fields.Integer,
"doc_metadata": fields.List(fields.Nested(document_metadata_fields), attribute="doc_metadata_details"),
# Summary index generation status: "GENERATING", "COMPLETED", "ERROR", or null if not enabled
# Summary index generation status:
# "SUMMARIZING" (when task is queued and generating)
"summary_index_status": fields.String,
"need_summary": fields.Boolean, # Whether this document needs summary index generation
}

View File

@ -437,6 +437,11 @@ class SummaryIndexService:
"""
Enable summary records and re-add vectors to vector database for segments.
Note: This method enables summaries based on chunk status, not summary_index_setting.enable.
The summary_index_setting.enable flag only controls automatic generation,
not whether existing summaries can be used.
Summary.enabled should always be kept in sync with chunk.enabled.
Args:
dataset: Dataset containing the segments
segment_ids: List of segment IDs to enable summaries for. If None, enable all.
@ -445,11 +450,6 @@ class SummaryIndexService:
if dataset.indexing_technique != "high_quality":
return
# Check if summary index is enabled
summary_index_setting = dataset.summary_index_setting
if not summary_index_setting or not summary_index_setting.get("enable"):
return
query = db.session.query(DocumentSegmentSummary).filter_by(
dataset_id=dataset.id,
enabled=False, # Only enable disabled summaries
@ -483,6 +483,7 @@ class SummaryIndexService:
.first()
)
# Summary.enabled stays in sync with chunk.enabled, only enable summary if the associated chunk is enabled.
if not segment or not segment.enabled or segment.status != "completed":
continue