From 626e71cb3b82d43b1c40d64b5341124b0c1e92e6 Mon Sep 17 00:00:00 2001
From: Frederick2313072 <2031894837@qq.com>
Date: Sat, 20 Sep 2025 06:28:14 +0800
Subject: [PATCH] feat: implement content-based deduplication for document
 segments

- Add database index on (dataset_id, index_node_hash) for efficient
  deduplication queries
- Add deduplication check in SegmentService.create_segment and
  multi_create_segment methods
- Add deduplication check in DatasetDocumentStore.add_documents method to
  prevent duplicate embedding processing
- Skip creating segments with identical content hashes across the entire
  dataset

This prevents duplicate content from being re-processed and re-embedded
when uploading documents with repeated content, improving efficiency and
reducing unnecessary compute costs.
---
 api/core/rag/docstore/dataset_docstore.py | 11 +++++++++++
 api/models/dataset.py                     |  1 +
 api/services/dataset_service.py           | 26 ++++++++++++++++++++++++++
 3 files changed, 38 insertions(+)

diff --git a/api/core/rag/docstore/dataset_docstore.py b/api/core/rag/docstore/dataset_docstore.py
index 74a2653e9d..3fe4243fed 100644
--- a/api/core/rag/docstore/dataset_docstore.py
+++ b/api/core/rag/docstore/dataset_docstore.py
@@ -93,6 +93,17 @@ class DatasetDocumentStore:
 
             segment_document = self.get_document_segment(doc_id=doc.metadata["doc_id"])
 
+            # Check if a segment with the same content hash already exists in the dataset
+            existing_segment_by_hash = db.session.query(DocumentSegment).filter_by(
+                dataset_id=self._dataset.id,
+                index_node_hash=doc.metadata["doc_hash"],
+                enabled=True
+            ).first()
+
+            if existing_segment_by_hash:
+                # Skip creating duplicate segment with same content hash
+                continue
+
             # NOTE: doc could already exist in the store, but we overwrite it
             if not allow_update and segment_document:
                 raise ValueError(
diff --git a/api/models/dataset.py b/api/models/dataset.py
index 2c4059f800..8d0d2a1357 100644
--- a/api/models/dataset.py
+++ b/api/models/dataset.py
@@ -689,6 +689,7 @@ class DocumentSegment(Base):
         sa.Index("document_segment_tenant_document_idx", "document_id", "tenant_id"),
         sa.Index("document_segment_node_dataset_idx", "index_node_id", "dataset_id"),
         sa.Index("document_segment_tenant_idx", "tenant_id"),
+        sa.Index("document_segment_dataset_hash_idx", "dataset_id", "index_node_hash"),
     )
 
     # initial fields
diff --git a/api/services/dataset_service.py b/api/services/dataset_service.py
index 8b3720026d..1d2395be14 100644
--- a/api/services/dataset_service.py
+++ b/api/services/dataset_service.py
@@ -2623,8 +2623,19 @@ class SegmentService:
             tokens = embedding_model.get_text_embedding_num_tokens(texts=[content])[0]
         lock_name = f"add_segment_lock_document_id_{document.id}"
         with redis_client.lock(lock_name, timeout=600):
+            # Check if a segment with the same content hash already exists
+            existing_segment = db.session.query(DocumentSegment).filter_by(
+                dataset_id=document.dataset_id,
+                index_node_hash=segment_hash,
+                enabled=True
+            ).first()
+
+            if existing_segment:
+                logger.info(f"Segment with same content hash already exists: {segment_hash}")
+                return existing_segment
+
             max_position = (
                 db.session.query(func.max(DocumentSegment.position))
                 .where(DocumentSegment.document_id == document.id)
                 .scalar()
             )
@@ -2689,6 +2700,15 @@ class SegmentService:
                 .where(DocumentSegment.document_id == document.id)
                 .scalar()
             )
+            # Batch query existing hashes before the loop
+            segment_hashes = [helper.generate_text_hash(seg["content"]) for seg in segments]
+            existing_segments = db.session.query(DocumentSegment.index_node_hash).filter(
+                DocumentSegment.dataset_id == document.dataset_id,
+                DocumentSegment.index_node_hash.in_(segment_hashes),
+                DocumentSegment.enabled == True
+            ).all()
+            existing_hashes = {seg.index_node_hash for seg in existing_segments}
+
             pre_segment_data_list = []
             segment_data_list = []
             keywords_list = []
@@ -2697,6 +2717,12 @@
                 content = segment_item["content"]
                 doc_id = str(uuid.uuid4())
                 segment_hash = helper.generate_text_hash(content)
+
+                # Skip existing segments
+                if segment_hash in existing_hashes:
+                    logger.info(f"Skipping duplicate segment with hash: {segment_hash}")
+                    continue
+
                 tokens = 0
                 if dataset.indexing_technique == "high_quality" and embedding_model:
                     # calc embedding use tokens
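
Note: the patch declares the composite index only on the SQLAlchemy model, so existing
deployments would also need a schema migration before the (dataset_id, index_node_hash)
lookups can use it. Below is a minimal sketch of such a revision, assuming the project's
usual Flask-Migrate/Alembic layout and that DocumentSegment maps to the "document_segments"
table; the revision identifiers are placeholders, not values from the repository.

    # Hypothetical Alembic revision (sketch only); identifiers are placeholders.
    from alembic import op

    revision = "add_segment_dataset_hash_idx"  # placeholder revision id
    down_revision = None  # placeholder; would point at the current head revision
    branch_labels = None
    depends_on = None


    def upgrade():
        # Composite index backing the (dataset_id, index_node_hash) dedup lookups.
        op.create_index(
            "document_segment_dataset_hash_idx",
            "document_segments",
            ["dataset_id", "index_node_hash"],
        )


    def downgrade():
        op.drop_index("document_segment_dataset_hash_idx", table_name="document_segments")

If the project uses Flask-Migrate's CLI, running `flask db migrate` against the updated
model should generate an equivalent revision automatically.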