From 008a5f361dba0893cccb6823cd5c85cf6c5f1342 Mon Sep 17 00:00:00 2001
From: FFXN
Date: Tue, 20 Jan 2026 11:53:16 +0800
Subject: [PATCH] fix: fix summary index bug.

---
 .../console/datasets/datasets_document.py    | 29 ++++-----
 .../knowledge_index/knowledge_index_node.py  | 60 ++++++++++++-------
 api/fields/document_fields.py                |  9 ++-
 api/services/summary_index_service.py        | 11 ++--
 4 files changed, 63 insertions(+), 46 deletions(-)

diff --git a/api/controllers/console/datasets/datasets_document.py b/api/controllers/console/datasets/datasets_document.py
index 1ca9a615e3..85c2f33222 100644
--- a/api/controllers/console/datasets/datasets_document.py
+++ b/api/controllers/console/datasets/datasets_document.py
@@ -361,8 +361,8 @@ class DatasetDocumentListApi(Resource):
         for doc_id in document_ids_need_summary:
             segment_ids = document_segments_map.get(doc_id, [])
             if not segment_ids:
-                # No segments, status is "GENERATING" (waiting to generate)
-                summary_status_map[doc_id] = "GENERATING"
+                # No segments, status is None (not started)
+                summary_status_map[doc_id] = None
                 continue
 
             # Count summary statuses for this document's segments
@@ -374,28 +374,23 @@
                 else:
                     status_counts["not_started"] += 1
 
-            total_segments = len(segment_ids)
-            completed_count = status_counts["completed"]
             generating_count = status_counts["generating"]
-            error_count = status_counts["error"]
 
-            # Determine overall status (only three states: GENERATING, COMPLETED, ERROR)
-            if completed_count == total_segments:
-                summary_status_map[doc_id] = "COMPLETED"
-            elif error_count > 0:
-                # Has errors (even if some are completed or generating)
-                summary_status_map[doc_id] = "ERROR"
-            elif generating_count > 0 or status_counts["not_started"] > 0:
-                # Still generating or not started
-                summary_status_map[doc_id] = "GENERATING"
+            # Determine overall status:
+            # - "SUMMARIZING" only when the task is queued and at least one summary is generating
+            # - None (empty) for all other cases (not queued, or all summaries completed/error)
+            if generating_count > 0:
+                # Task is queued and at least one summary is still generating
+                summary_status_map[doc_id] = "SUMMARIZING"
             else:
-                # Default to generating
-                summary_status_map[doc_id] = "GENERATING"
+                # Task not queued yet, or all summaries are completed/error (task finished)
+                summary_status_map[doc_id] = None
 
         # Add summary_index_status to each document
         for document in documents:
             if has_summary_index and document.need_summary is True:
-                document.summary_index_status = summary_status_map.get(str(document.id), "GENERATING")
+                # Get status from map, default to None (not queued yet)
+                document.summary_index_status = summary_status_map.get(str(document.id))
             else:
                 # Return null if summary index is not enabled or document doesn't need summary
                 document.summary_index_status = None
diff --git a/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py b/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py
index d14bdee1fd..366fe24f60 100644
--- a/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py
+++ b/api/core/workflow/nodes/knowledge_index/knowledge_index_node.py
@@ -356,19 +356,9 @@ class KnowledgeIndexNode(Node[KnowledgeIndexNodeData]):
         def generate_summary_for_chunk(preview_item: dict) -> None:
             """Generate summary for a single chunk."""
             if "content" in preview_item:
-                try:
-                    # Set Flask application context in worker thread
-                    if flask_app:
-                        with flask_app.app_context():
-                            summary = ParagraphIndexProcessor.generate_summary(
-                                tenant_id=dataset.tenant_id,
-                                text=preview_item["content"],
-                                summary_index_setting=summary_index_setting,
-                            )
-                            if summary:
-                                preview_item["summary"] = summary
-                    else:
-                        # Fallback: try without app context (may fail)
+                # Set Flask application context in worker thread
+                if flask_app:
+                    with flask_app.app_context():
                         summary = ParagraphIndexProcessor.generate_summary(
                             tenant_id=dataset.tenant_id,
                             text=preview_item["content"],
@@ -376,13 +366,21 @@
                         )
                         if summary:
                             preview_item["summary"] = summary
-                except Exception:
-                    logger.exception("Failed to generate summary for chunk")
-                    # Don't fail the entire preview if summary generation fails
+                else:
+                    # Fallback: try without app context (may fail)
+                    summary = ParagraphIndexProcessor.generate_summary(
+                        tenant_id=dataset.tenant_id,
+                        text=preview_item["content"],
+                        summary_index_setting=summary_index_setting,
+                    )
+                    if summary:
+                        preview_item["summary"] = summary
 
         # Generate summaries concurrently using ThreadPoolExecutor
         # Set a reasonable timeout to prevent hanging (60 seconds per chunk, max 5 minutes total)
         timeout_seconds = min(300, 60 * len(preview_output["preview"]))
+        errors: list[Exception] = []
+
         with concurrent.futures.ThreadPoolExecutor(max_workers=min(10, len(preview_output["preview"]))) as executor:
             futures = [
                 executor.submit(generate_summary_for_chunk, preview_item)
@@ -393,17 +391,37 @@
 
             # Cancel tasks that didn't complete in time
             if not_done:
-                logger.warning(
-                    "Summary generation timeout: %s chunks did not complete within %ss. "
-                    "Cancelling remaining tasks...",
-                    len(not_done),
-                    timeout_seconds,
+                timeout_error_msg = (
+                    f"Summary generation timeout: {len(not_done)} chunks did not complete within {timeout_seconds}s"
                 )
+                logger.warning("%s. Cancelling remaining tasks...", timeout_error_msg)
+                # In preview mode, timeout is also an error
+                errors.append(TimeoutError(timeout_error_msg))
                 for future in not_done:
                     future.cancel()
                 # Wait a bit for cancellation to take effect
                 concurrent.futures.wait(not_done, timeout=5)
 
+            # Collect exceptions from completed futures
+            for future in done:
+                try:
+                    future.result()  # This will raise any exception that occurred
+                except Exception as e:
+                    logger.exception("Error in summary generation future")
+                    errors.append(e)
+
+            # In preview mode, if there are any errors, fail the request
+            if errors:
+                error_messages = [str(e) for e in errors]
+                error_summary = (
+                    f"Failed to generate summaries for {len(errors)} chunk(s). "
" + f"Errors: {'; '.join(error_messages[:3])}" # Show first 3 errors + ) + if len(errors) > 3: + error_summary += f" (and {len(errors) - 3} more)" + logger.error("Summary generation failed in preview mode: %s", error_summary) + raise KnowledgeIndexNodeError(error_summary) + completed_count = sum(1 for item in preview_output["preview"] if item.get("summary") is not None) logger.info( "Completed summary generation for preview chunks: %s/%s succeeded", diff --git a/api/fields/document_fields.py b/api/fields/document_fields.py index 875726d31d..35a2a04f3e 100644 --- a/api/fields/document_fields.py +++ b/api/fields/document_fields.py @@ -33,9 +33,11 @@ document_fields = { "hit_count": fields.Integer, "doc_form": fields.String, "doc_metadata": fields.List(fields.Nested(document_metadata_fields), attribute="doc_metadata_details"), - # Summary index generation status: "GENERATING", "COMPLETED", "ERROR", or null if not enabled + # Summary index generation status: + # "SUMMARIZING" (when task is queued and generating) "summary_index_status": fields.String, - "need_summary": fields.Boolean, # Whether this document needs summary index generation + # Whether this document needs summary index generation + "need_summary": fields.Boolean, } document_with_segments_fields = { @@ -63,7 +65,8 @@ document_with_segments_fields = { "completed_segments": fields.Integer, "total_segments": fields.Integer, "doc_metadata": fields.List(fields.Nested(document_metadata_fields), attribute="doc_metadata_details"), - # Summary index generation status: "GENERATING", "COMPLETED", "ERROR", or null if not enabled + # Summary index generation status: + # "SUMMARIZING" (when task is queued and generating) "summary_index_status": fields.String, "need_summary": fields.Boolean, # Whether this document needs summary index generation } diff --git a/api/services/summary_index_service.py b/api/services/summary_index_service.py index d2cf23cb1c..1ab2ac510f 100644 --- a/api/services/summary_index_service.py +++ b/api/services/summary_index_service.py @@ -437,6 +437,11 @@ class SummaryIndexService: """ Enable summary records and re-add vectors to vector database for segments. + Note: This method enables summaries based on chunk status, not summary_index_setting.enable. + The summary_index_setting.enable flag only controls automatic generation, + not whether existing summaries can be used. + Summary.enabled should always be kept in sync with chunk.enabled. + Args: dataset: Dataset containing the segments segment_ids: List of segment IDs to enable summaries for. If None, enable all. @@ -445,11 +450,6 @@ class SummaryIndexService: if dataset.indexing_technique != "high_quality": return - # Check if summary index is enabled - summary_index_setting = dataset.summary_index_setting - if not summary_index_setting or not summary_index_setting.get("enable"): - return - query = db.session.query(DocumentSegmentSummary).filter_by( dataset_id=dataset.id, enabled=False, # Only enable disabled summaries @@ -483,6 +483,7 @@ class SummaryIndexService: .first() ) + # Summary.enabled stays in sync with chunk.enabled, only enable summary if the associated chunk is enabled. if not segment or not segment.enabled or segment.status != "completed": continue