diff --git a/api/core/rag/index_processor/processor/paragraph_index_processor.py b/api/core/rag/index_processor/processor/paragraph_index_processor.py
index 930abd6bc6..0bf1b1e30a 100644
--- a/api/core/rag/index_processor/processor/paragraph_index_processor.py
+++ b/api/core/rag/index_processor/processor/paragraph_index_processor.py
@@ -277,6 +277,7 @@ class ParagraphIndexProcessor(BaseIndexProcessor):
         """
         For each segment, concurrently call generate_summary to generate a summary
         and write it to the summary attribute of PreviewDetail.
+        In preview mode (indexing-estimate), if any summary generation fails, the method will raise an exception.
         """
         import concurrent.futures
 
@@ -291,23 +292,63 @@ class ParagraphIndexProcessor(BaseIndexProcessor):
 
         def process(preview: PreviewDetail) -> None:
             """Generate summary for a single preview item."""
-            try:
-                if flask_app:
-                    # Ensure Flask app context in worker thread
-                    with flask_app.app_context():
-                        summary = self.generate_summary(tenant_id, preview.content, summary_index_setting)
-                        preview.summary = summary
-                else:
-                    # Fallback: try without app context (may fail)
+            if flask_app:
+                # Ensure Flask app context in worker thread
+                with flask_app.app_context():
                     summary = self.generate_summary(tenant_id, preview.content, summary_index_setting)
                     preview.summary = summary
-            except Exception:
-                logger.exception("Failed to generate summary for preview")
-                # Don't fail the entire preview if summary generation fails
-                preview.summary = None
+            else:
+                # Fallback: try without app context (may fail)
+                summary = self.generate_summary(tenant_id, preview.content, summary_index_setting)
+                preview.summary = summary
+
+        # Generate summaries concurrently using ThreadPoolExecutor
+        # Set a reasonable timeout to prevent hanging (60 seconds per chunk, max 5 minutes total)
+        timeout_seconds = min(300, 60 * len(preview_texts))
+        # ThreadPoolExecutor rejects max_workers=0, so keep at least one worker for an empty preview list
+        max_workers = max(1, min(10, len(preview_texts)))
+        errors: list[Exception] = []
+
+        # The executor is managed manually: a `with` block calls shutdown(wait=True) on exit,
+        # which would block on still-running workers and defeat the timeout below.
+        executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
+        try:
+            futures = [executor.submit(process, preview) for preview in preview_texts]
+            # Wait for all tasks to complete with timeout
+            done, not_done = concurrent.futures.wait(futures, timeout=timeout_seconds)
+        finally:
+            # Drop queued tasks and return without joining workers; a task that is already
+            # running cannot be interrupted and finishes in the background.
+            executor.shutdown(wait=False, cancel_futures=True)
+
+        # In preview mode, timeout is also an error
+        if not_done:
+            timeout_error_msg = (
+                f"Summary generation timeout: {len(not_done)} chunks did not complete within {timeout_seconds}s"
+            )
+            logger.warning("%s. Cancelling remaining tasks...", timeout_error_msg)
+            errors.append(TimeoutError(timeout_error_msg))
+
+        # Collect exceptions from completed futures
+        for future in done:
+            try:
+                future.result()  # This will raise any exception that occurred
+            except Exception as e:
+                logger.exception("Error in summary generation future")
+                errors.append(e)
+
+        # In preview mode (indexing-estimate), if there are any errors, fail the request
+        if errors:
+            error_messages = [str(e) for e in errors]
+            error_summary = (
+                f"Failed to generate summaries for {len(errors)} chunk(s). "
+                f"Errors: {'; '.join(error_messages[:3])}"  # Show first 3 errors
+            )
+            if len(errors) > 3:
+                error_summary += f" (and {len(errors) - 3} more)"
+            logger.error("Summary generation failed in preview mode: %s", error_summary)
+            raise ValueError(error_summary) from errors[0]
 
-        with concurrent.futures.ThreadPoolExecutor() as executor:
-            list(executor.map(process, preview_texts))
         return preview_texts
 
     @staticmethod
diff --git a/api/core/rag/index_processor/processor/parent_child_index_processor.py b/api/core/rag/index_processor/processor/parent_child_index_processor.py
index ccb1c55b72..8c803621b8 100644
--- a/api/core/rag/index_processor/processor/parent_child_index_processor.py
+++ b/api/core/rag/index_processor/processor/parent_child_index_processor.py
@@ -361,6 +361,7 @@ class ParentChildIndexProcessor(BaseIndexProcessor):
         """
         For each parent chunk in preview_texts, concurrently call generate_summary to generate a summary
         and write it to the summary attribute of PreviewDetail.
+        In preview mode (indexing-estimate), if any summary generation fails, the method will raise an exception.
 
         Note: For parent-child structure, we only generate summaries for parent chunks.
         """
@@ -377,34 +378,70 @@ class ParentChildIndexProcessor(BaseIndexProcessor):
 
         def process(preview: PreviewDetail) -> None:
             """Generate summary for a single preview item (parent chunk)."""
-            try:
-                if flask_app:
-                    # Ensure Flask app context in worker thread
-                    with flask_app.app_context():
-                        # Use ParagraphIndexProcessor's generate_summary method
-                        from core.rag.index_processor.processor.paragraph_index_processor import ParagraphIndexProcessor
-                        summary = ParagraphIndexProcessor.generate_summary(
-                            tenant_id=tenant_id,
-                            text=preview.content,
-                            summary_index_setting=summary_index_setting,
-                        )
-                        if summary:
-                            preview.summary = summary
-                else:
-                    # Fallback: try without app context (may fail)
-                    from core.rag.index_processor.processor.paragraph_index_processor import ParagraphIndexProcessor
+            from core.rag.index_processor.processor.paragraph_index_processor import ParagraphIndexProcessor
+            if flask_app:
+                # Ensure Flask app context in worker thread
+                with flask_app.app_context():
                     summary = ParagraphIndexProcessor.generate_summary(
                         tenant_id=tenant_id,
                         text=preview.content,
                         summary_index_setting=summary_index_setting,
                     )
-                    if summary:
-                        preview.summary = summary
-            except Exception:
-                logger.exception("Failed to generate summary for preview")
-                # Don't fail the entire preview if summary generation fails
-                preview.summary = None
+                    preview.summary = summary
+            else:
+                # Fallback: try without app context (may fail)
+                summary = ParagraphIndexProcessor.generate_summary(
+                    tenant_id=tenant_id,
+                    text=preview.content,
+                    summary_index_setting=summary_index_setting,
+                )
+                preview.summary = summary
+
+        # Generate summaries concurrently using ThreadPoolExecutor
+        # Set a reasonable timeout to prevent hanging (60 seconds per chunk, max 5 minutes total)
+        timeout_seconds = min(300, 60 * len(preview_texts))
+        # ThreadPoolExecutor rejects max_workers=0, so keep at least one worker for an empty preview list
+        max_workers = max(1, min(10, len(preview_texts)))
+        errors: list[Exception] = []
+
+        # The executor is managed manually: a `with` block calls shutdown(wait=True) on exit,
+        # which would block on still-running workers and defeat the timeout below.
+        executor = concurrent.futures.ThreadPoolExecutor(max_workers=max_workers)
+        try:
+            futures = [executor.submit(process, preview) for preview in preview_texts]
+            # Wait for all tasks to complete with timeout
+            done, not_done = concurrent.futures.wait(futures, timeout=timeout_seconds)
+        finally:
+            # Drop queued tasks and return without joining workers; a task that is already
+            # running cannot be interrupted and finishes in the background.
+            executor.shutdown(wait=False, cancel_futures=True)
+
+        # In preview mode, timeout is also an error
+        if not_done:
+            timeout_error_msg = (
+                f"Summary generation timeout: {len(not_done)} chunks did not complete within {timeout_seconds}s"
+            )
+            logger.warning("%s. Cancelling remaining tasks...", timeout_error_msg)
+            errors.append(TimeoutError(timeout_error_msg))
+
+        # Collect exceptions from completed futures
+        for future in done:
+            try:
+                future.result()  # This will raise any exception that occurred
+            except Exception as e:
+                logger.exception("Error in summary generation future")
+                errors.append(e)
+
+        # In preview mode (indexing-estimate), if there are any errors, fail the request
+        if errors:
+            error_messages = [str(e) for e in errors]
+            error_summary = (
+                f"Failed to generate summaries for {len(errors)} chunk(s). "
+                f"Errors: {'; '.join(error_messages[:3])}"  # Show first 3 errors
+            )
+            if len(errors) > 3:
+                error_summary += f" (and {len(errors) - 3} more)"
+            logger.error("Summary generation failed in preview mode: %s", error_summary)
+            raise ValueError(error_summary) from errors[0]
 
-        with concurrent.futures.ThreadPoolExecutor() as executor:
-            list(executor.map(process, preview_texts))
         return preview_texts