diff --git a/api/core/indexing_runner.py b/api/core/indexing_runner.py
index 7822ed4268..c430fba0b9 100644
--- a/api/core/indexing_runner.py
+++ b/api/core/indexing_runner.py
@@ -49,62 +49,80 @@ class IndexingRunner:
         self.storage = storage
         self.model_manager = ModelManager()
 
+    def _handle_indexing_error(self, document_id: str, error: Exception) -> None:
+        """Handle indexing errors by updating document status."""
+        logger.exception("consume document failed")
+        document = db.session.get(DatasetDocument, document_id)
+        if document:
+            document.indexing_status = "error"
+            error_message = getattr(error, "description", str(error))
+            document.error = str(error_message)
+            document.stopped_at = naive_utc_now()
+            db.session.commit()
+
     def run(self, dataset_documents: list[DatasetDocument]):
         """Run the indexing process."""
         for dataset_document in dataset_documents:
+            document_id = dataset_document.id
             try:
+                # Re-query the document to ensure it's bound to the current session
+                requeried_document = db.session.get(DatasetDocument, document_id)
+                if not requeried_document:
+                    logger.warning("Document not found, skipping document id: %s", document_id)
+                    continue
+
                 # get dataset
-                dataset = db.session.query(Dataset).filter_by(id=dataset_document.dataset_id).first()
+                dataset = db.session.query(Dataset).filter_by(id=requeried_document.dataset_id).first()
 
                 if not dataset:
                     raise ValueError("no dataset found")
 
                 # get the process rule
                 stmt = select(DatasetProcessRule).where(
-                    DatasetProcessRule.id == dataset_document.dataset_process_rule_id
+                    DatasetProcessRule.id == requeried_document.dataset_process_rule_id
                 )
                 processing_rule = db.session.scalar(stmt)
                 if not processing_rule:
                     raise ValueError("no process rule found")
-                index_type = dataset_document.doc_form
+                index_type = requeried_document.doc_form
                 index_processor = IndexProcessorFactory(index_type).init_index_processor()
 
                 # extract
-                text_docs = self._extract(index_processor, dataset_document, processing_rule.to_dict())
+                text_docs = self._extract(index_processor, requeried_document, processing_rule.to_dict())
 
                 # transform
                 documents = self._transform(
-                    index_processor, dataset, text_docs, dataset_document.doc_language, processing_rule.to_dict()
+                    index_processor, dataset, text_docs, requeried_document.doc_language, processing_rule.to_dict()
                 )
                 # save segment
-                self._load_segments(dataset, dataset_document, documents)
+                self._load_segments(dataset, requeried_document, documents)
 
                 # load
                 self._load(
                     index_processor=index_processor,
                     dataset=dataset,
-                    dataset_document=dataset_document,
+                    dataset_document=requeried_document,
                     documents=documents,
                 )
             except DocumentIsPausedError:
-                raise DocumentIsPausedError(f"Document paused, document id: {dataset_document.id}")
+                raise DocumentIsPausedError(f"Document paused, document id: {document_id}")
             except ProviderTokenNotInitError as e:
-                dataset_document.indexing_status = "error"
-                dataset_document.error = str(e.description)
-                dataset_document.stopped_at = naive_utc_now()
-                db.session.commit()
+                self._handle_indexing_error(document_id, e)
             except ObjectDeletedError:
-                logger.warning("Document deleted, document id: %s", dataset_document.id)
+                logger.warning("Document deleted, document id: %s", document_id)
             except Exception as e:
-                logger.exception("consume document failed")
-                dataset_document.indexing_status = "error"
-                dataset_document.error = str(e)
-                dataset_document.stopped_at = naive_utc_now()
-                db.session.commit()
+                self._handle_indexing_error(document_id, e)
 
     def run_in_splitting_status(self, dataset_document: DatasetDocument):
         """Run the indexing process when the index_status is splitting."""
+        document_id = dataset_document.id
         try:
+            # Re-query the document to ensure it's bound to the current session
+            requeried_document = db.session.get(DatasetDocument, document_id)
+            if not requeried_document:
+                logger.warning("Document not found: %s", document_id)
+                return
+
             # get dataset
-            dataset = db.session.query(Dataset).filter_by(id=dataset_document.dataset_id).first()
+            dataset = db.session.query(Dataset).filter_by(id=requeried_document.dataset_id).first()
 
             if not dataset:
                 raise ValueError("no dataset found")
 
@@ -112,57 +130,60 @@ class IndexingRunner:
             # get exist document_segment list and delete
             document_segments = (
                 db.session.query(DocumentSegment)
-                .filter_by(dataset_id=dataset.id, document_id=dataset_document.id)
+                .filter_by(dataset_id=dataset.id, document_id=requeried_document.id)
                 .all()
            )
 
             for document_segment in document_segments:
                 db.session.delete(document_segment)
-                if dataset_document.doc_form == IndexType.PARENT_CHILD_INDEX:
+                if requeried_document.doc_form == IndexType.PARENT_CHILD_INDEX:
                     # delete child chunks
                     db.session.query(ChildChunk).where(ChildChunk.segment_id == document_segment.id).delete()
             db.session.commit()
             # get the process rule
-            stmt = select(DatasetProcessRule).where(DatasetProcessRule.id == dataset_document.dataset_process_rule_id)
+            stmt = select(DatasetProcessRule).where(DatasetProcessRule.id == requeried_document.dataset_process_rule_id)
             processing_rule = db.session.scalar(stmt)
             if not processing_rule:
                 raise ValueError("no process rule found")
-            index_type = dataset_document.doc_form
+            index_type = requeried_document.doc_form
             index_processor = IndexProcessorFactory(index_type).init_index_processor()
 
             # extract
-            text_docs = self._extract(index_processor, dataset_document, processing_rule.to_dict())
+            text_docs = self._extract(index_processor, requeried_document, processing_rule.to_dict())
 
             # transform
             documents = self._transform(
-                index_processor, dataset, text_docs, dataset_document.doc_language, processing_rule.to_dict()
+                index_processor, dataset, text_docs, requeried_document.doc_language, processing_rule.to_dict()
             )
             # save segment
-            self._load_segments(dataset, dataset_document, documents)
+            self._load_segments(dataset, requeried_document, documents)
 
             # load
             self._load(
-                index_processor=index_processor, dataset=dataset, dataset_document=dataset_document, documents=documents
+                index_processor=index_processor,
+                dataset=dataset,
+                dataset_document=requeried_document,
+                documents=documents,
             )
         except DocumentIsPausedError:
-            raise DocumentIsPausedError(f"Document paused, document id: {dataset_document.id}")
+            raise DocumentIsPausedError(f"Document paused, document id: {document_id}")
         except ProviderTokenNotInitError as e:
-            dataset_document.indexing_status = "error"
-            dataset_document.error = str(e.description)
-            dataset_document.stopped_at = naive_utc_now()
-            db.session.commit()
+            self._handle_indexing_error(document_id, e)
         except Exception as e:
-            logger.exception("consume document failed")
-            dataset_document.indexing_status = "error"
-            dataset_document.error = str(e)
-            dataset_document.stopped_at = naive_utc_now()
-            db.session.commit()
+            self._handle_indexing_error(document_id, e)
 
     def run_in_indexing_status(self, dataset_document: DatasetDocument):
         """Run the indexing process when the index_status is indexing."""
+        document_id = dataset_document.id
         try:
+            # Re-query the document to ensure it's bound to the current session
+            requeried_document = db.session.get(DatasetDocument, document_id)
+            if not requeried_document:
+                logger.warning("Document not found: %s", document_id)
+                return
+
             # get dataset
-            dataset = db.session.query(Dataset).filter_by(id=dataset_document.dataset_id).first()
+            dataset = db.session.query(Dataset).filter_by(id=requeried_document.dataset_id).first()
 
             if not dataset:
                 raise ValueError("no dataset found")
 
@@ -170,7 +191,7 @@ class IndexingRunner:
             # get exist document_segment list and delete
             document_segments = (
                 db.session.query(DocumentSegment)
-                .filter_by(dataset_id=dataset.id, document_id=dataset_document.id)
+                .filter_by(dataset_id=dataset.id, document_id=requeried_document.id)
                 .all()
             )
 
@@ -188,7 +209,7 @@ class IndexingRunner:
                             "dataset_id": document_segment.dataset_id,
                         },
                     )
-                    if dataset_document.doc_form == IndexType.PARENT_CHILD_INDEX:
+                    if requeried_document.doc_form == IndexType.PARENT_CHILD_INDEX:
                         child_chunks = document_segment.get_child_chunks()
                         if child_chunks:
                             child_documents = []
@@ -206,24 +227,20 @@ class IndexingRunner:
                             document.children = child_documents
                 documents.append(document)
             # build index
-            index_type = dataset_document.doc_form
+            index_type = requeried_document.doc_form
             index_processor = IndexProcessorFactory(index_type).init_index_processor()
             self._load(
-                index_processor=index_processor, dataset=dataset, dataset_document=dataset_document, documents=documents
+                index_processor=index_processor,
+                dataset=dataset,
+                dataset_document=requeried_document,
+                documents=documents,
             )
         except DocumentIsPausedError:
-            raise DocumentIsPausedError(f"Document paused, document id: {dataset_document.id}")
+            raise DocumentIsPausedError(f"Document paused, document id: {document_id}")
         except ProviderTokenNotInitError as e:
-            dataset_document.indexing_status = "error"
-            dataset_document.error = str(e.description)
-            dataset_document.stopped_at = naive_utc_now()
-            db.session.commit()
+            self._handle_indexing_error(document_id, e)
         except Exception as e:
-            logger.exception("consume document failed")
-            dataset_document.indexing_status = "error"
-            dataset_document.error = str(e)
-            dataset_document.stopped_at = naive_utc_now()
-            db.session.commit()
+            self._handle_indexing_error(document_id, e)
 
     def indexing_estimate(
         self,
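Note on the re-query pattern used in each entry point above: the diff's own comment says the document is re-fetched so the instance is "bound to the current session", which matters when the DatasetDocument passed in was loaded elsewhere and may be stale or detached by the time the worker runs. The snippet below is only a minimal, self-contained sketch of that pattern in plain SQLAlchemy; the Document model, in-memory SQLite engine, and "doc-1" id are illustrative stand-ins, not the Dify DatasetDocument model or its Flask-SQLAlchemy session.

# Sketch (assumed standalone setup): re-fetch a row by primary key inside the
# worker's own session instead of reusing an instance from another session.
from sqlalchemy import String, create_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class Document(Base):
    __tablename__ = "documents"
    id: Mapped[str] = mapped_column(String, primary_key=True)
    indexing_status: Mapped[str] = mapped_column(String, default="waiting")


engine = create_engine("sqlite+pysqlite:///:memory:")
Base.metadata.create_all(engine)

# Seed a row in one session (stands in for the web request that created the document).
with Session(engine) as setup_session:
    setup_session.add(Document(id="doc-1"))
    setup_session.commit()

# Worker side: keep only the id, then re-query inside the worker's session.
# A missing row is handled by skipping, mirroring the early-return/continue in the diff.
with Session(engine) as session:
    document = session.get(Document, "doc-1")
    if document is None:
        print("Document not found, skipping")
    else:
        document.indexing_status = "indexing"
        session.commit()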