From 12de554313a348124f2297c5939850763b33dc4c Mon Sep 17 00:00:00 2001
From: Frederick2313072 <2031894837@qq.com>
Date: Tue, 23 Sep 2025 16:41:46 +0800
Subject: [PATCH] fix: add index initialization checks, improve batch vector
 operations and search, ensure robust exception handling.

---
 .../vdb/pinecone/pinecone_vector.py | 153 ++++++++++--------
 1 file changed, 86 insertions(+), 67 deletions(-)

diff --git a/api/core/rag/datasource/vdb/pinecone/pinecone_vector.py b/api/core/rag/datasource/vdb/pinecone/pinecone_vector.py
index 2df002464a..ef6edf2bb8 100644
--- a/api/core/rag/datasource/vdb/pinecone/pinecone_vector.py
+++ b/api/core/rag/datasource/vdb/pinecone/pinecone_vector.py
@@ -1,7 +1,6 @@
 import json
 import time
-import uuid
-from typing import Any, Optional, Union
+from typing import Any, Optional
 
 from pinecone import Pinecone, ServerlessSpec
 from pydantic import BaseModel
@@ -30,7 +29,7 @@ class PineconeConfig(BaseModel):
 
 class PineconeVector(BaseVector):
     """Pinecone vector database concrete implementation class"""
-    
+
     def __init__(self, collection_name: str, group_id: str, config: PineconeConfig):
         super().__init__(collection_name)
         self._client_config = config
@@ -45,8 +44,6 @@ class PineconeVector(BaseVector):
             )
         except Exception as e:
             # Fallback to basic initialization if SSL config fails
-            import logging
-            logging.warning(f"Failed to initialize Pinecone with SSL config: {e}, using basic config")
             self._pc = Pinecone(api_key=config.api_key)
 
         # Normalize index name: lowercase, only a-z0-9- and <=45 chars
@@ -69,6 +66,19 @@ class PineconeVector(BaseVector):
         """Return vector database type identifier"""
         return "pinecone"
 
+    def _ensure_index_initialized(self) -> None:
+        """Lazily attach self._index to an existing Pinecone index."""
+        if self._index is not None:
+            return
+        try:
+            existing_indexes = self._pc.list_indexes().names()
+        except Exception as e:
+            raise ValueError(f"Failed to list Pinecone indexes: {e}") from e
+        if self._index_name not in existing_indexes:
+            raise ValueError("Index not initialized. Please ingest documents to create the index first.")
+        # Attach to the existing index
+        self._index = self._pc.Index(self._index_name)
+
     def to_index_struct(self) -> dict:
         """Generate index structure dictionary"""
         return {
@@ -90,21 +100,18 @@ class PineconeVector(BaseVector):
 
     def create_index(self, dimension: int):
         """Create Pinecone index"""
-        # Debug: Log the index name being used
-        import logging
-        logging.warning(f"Pinecone: Creating index with name: {self._index_name} (length: {len(self._index_name)})")
         lock_name = f"vector_indexing_lock_{self._index_name}"
-        
+
         with redis_client.lock(lock_name, timeout=30):
             # Check Redis cache
             index_exist_cache_key = f"vector_indexing_{self._index_name}"
             if redis_client.get(index_exist_cache_key):
                 self._index = self._pc.Index(self._index_name)
                 return
-            
+
             # Check if index already exists
             existing_indexes = self._pc.list_indexes().names()
-            
+
             if self._index_name not in existing_indexes:
                 # Create new index using ServerlessSpec
                 self._pc.create_index(
                     name=self._index_name,
                     dimension=dimension,
                     metric="cosine",
                     spec=ServerlessSpec(
                         cloud="aws",
                         region=self._client_config.environment
                     )
                 )
-                
+
@@ -116,14 +123,14 @@
                 # Wait for index creation to complete
                 while not self._pc.describe_index(self._index_name).status['ready']:
                     time.sleep(1)
-            
+
             # Get index instance
             self._index = self._pc.Index(self._index_name)
-            
+
             # Set cache
             redis_client.set(index_exist_cache_key, 1, ex=3600)
@@ -131,17 +138,21 @@
     def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs):
         """Batch add document vectors"""
         if not self._index:
             raise ValueError("Index not initialized. Call create() first.")
-        
+
+        total_docs = len(documents)
+
         uuids = self._get_uuids(documents)
         batch_size = self._client_config.batch_size
         added_ids = []
-        
+
         # Batch processing
-        for i in range(0, len(documents), batch_size):
+        total_batches = (total_docs + batch_size - 1) // batch_size  # Ceiling division
+        for batch_idx, i in enumerate(range(0, total_docs, batch_size), 1):
             batch_documents = documents[i:i + batch_size]
             batch_embeddings = embeddings[i:i + batch_size]
             batch_uuids = uuids[i:i + batch_size]
-            
+
             # Build Pinecone vector data (metadata must be primitives or list[str])
             vectors_to_upsert = []
             for doc, embedding, doc_id in zip(batch_documents, batch_embeddings, batch_uuids):
@@ -166,52 +177,62 @@
                     "values": embedding,
                     "metadata": safe_meta
                 })
-            
+
             # Batch insert to Pinecone
-            self._index.upsert(vectors=vectors_to_upsert)
-            added_ids.extend(batch_uuids)
-        
+            try:
+                self._index.upsert(vectors=vectors_to_upsert)
+                added_ids.extend(batch_uuids)
+            except Exception as e:
+                raise RuntimeError(f"Failed to upsert batch {batch_idx}/{total_batches}: {e}") from e
+
         return added_ids
 
     def search_by_vector(self, query_vector: list[float], **kwargs) -> list[Document]:
         """Vector similarity search"""
-        if not self._index:
-            raise ValueError("Index not initialized.")
-        
+        # Lazily attach to an existing index if needed
+        self._ensure_index_initialized()
+
         top_k = kwargs.get("top_k", 4)
         score_threshold = float(kwargs.get("score_threshold", 0.0))
-        
+
         # Build filter conditions
-        filter_dict = {Field.GROUP_KEY.value: {"$eq": self._group_id}}
-        
+        filter_dict = {Field.GROUP_KEY.value: {"$eq": str(self._group_id)}}
+
         # Document scope filtering
         document_ids_filter = kwargs.get("document_ids_filter")
         if document_ids_filter:
-            filter_dict[f"{Field.METADATA_KEY.value}.document_id"] = {"$in": document_ids_filter}
-        
+            # Metadata is stored flat, so filter on the plain key
+            filter_dict["document_id"] = {"$in": document_ids_filter}
+
         # Execute search
-        response = self._index.query(
-            vector=query_vector,
-            top_k=top_k,
-            include_metadata=True,
-            filter=filter_dict
-        )
-        
+        try:
+            response = self._index.query(
+                vector=query_vector,
+                top_k=top_k,
+                include_metadata=True,
+                filter=filter_dict
+            )
+        except Exception as e:
+            raise RuntimeError(f"Pinecone query failed: {e}") from e
+
         # Convert results
         docs = []
         for match in response.matches:
             if match.score >= score_threshold:
-                metadata = match.metadata.get(Field.METADATA_KEY.value, {})
+                metadata = dict(match.metadata or {})
+                page_content = metadata.pop(Field.CONTENT_KEY.value, "")
+                metadata.pop(Field.GROUP_KEY.value, None)
                 metadata["score"] = match.score
-                
-                doc = Document(
-                    page_content=match.metadata.get(Field.CONTENT_KEY.value, ""),
-                    metadata=metadata,
-                )
+
+                doc = Document(page_content=page_content, metadata=metadata)
                 docs.append(doc)
-            
+
         # Sort by similarity score in descending order
         docs.sort(key=lambda x: x.metadata.get("score", 0), reverse=True)
+
         return docs
 
     def search_by_full_text(self, query: str, **kwargs) -> list[Document]:
@@ -220,57 +241,55 @@
         """Pinecone doesn't support full-text search, return empty list"""
         return []
 
     def delete_by_metadata_field(self, key: str, value: str):
         """Delete by metadata field"""
-        if not self._index:
-            return
-        
+        self._ensure_index_initialized()
+
         try:
             # Build filter conditions
             filter_dict = {
                 Field.GROUP_KEY.value: {"$eq": self._group_id},
                 f"{Field.METADATA_KEY.value}.{key}": {"$eq": value}
             }
-            
+
             # Pinecone delete operation
             self._index.delete(filter=filter_dict)
         except Exception:
             # Ignore delete errors
             pass
 
     def delete_by_ids(self, ids: list[str]) -> None:
         """Batch delete by ID list"""
-        if not self._index:
-            return
-        
+        self._ensure_index_initialized()
+
         try:
             # Pinecone delete by ID
             self._index.delete(ids=ids)
-        except Exception:
-            # Ignore delete errors
-            pass
+        except Exception as e:
+            raise RuntimeError(f"Failed to delete vectors by ids: {e}") from e
 
     def delete(self) -> None:
         """Delete all vector data for the entire dataset"""
-        if not self._index:
-            return
-        
+        self._ensure_index_initialized()
+
         try:
             # Delete all vectors by group_id
             filter_dict = {Field.GROUP_KEY.value: {"$eq": self._group_id}}
             self._index.delete(filter=filter_dict)
-        except Exception:
-            # Ignore delete errors
-            pass
+        except Exception as e:
+            raise RuntimeError(f"Failed to delete vectors for group {self._group_id}: {e}") from e
 
     def text_exists(self, id: str) -> bool:
         """Check if document exists"""
-        if not self._index:
+        try:
+            self._ensure_index_initialized()
+        except Exception:
             return False
-        
+
         try:
             # Check if vector exists through query
             response = self._index.fetch(ids=[id])
             return id in response.vectors
         except Exception:
             return False
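
Reviewer note (not part of the patch): a minimal usage sketch of the call
sequence the changed code assumes. The config values, the 1024 dimension, the
sample document, and the Document import path are illustrative assumptions;
the class, method, and parameter names are taken from the code above.

    from core.rag.models.document import Document  # import path assumed

    config = PineconeConfig(api_key="pc-...", environment="us-east-1", batch_size=100)
    vector = PineconeVector(collection_name="vector-index-abc123", group_id="dataset-1", config=config)

    # Ingestion path: create_index() attaches self._index under a Redis lock,
    # so the strict "Index not initialized" check in add_texts() then passes.
    vector.create_index(dimension=1024)
    docs_in = [Document(page_content="hello pinecone", metadata={"document_id": "doc-1"})]
    vector.add_texts(docs_in, embeddings=[[0.1] * 1024])

    # Query path in a fresh process: self._index starts as None, so
    # search_by_vector() lazily attaches via _ensure_index_initialized()
    # before filtering by group id and the optional document_ids_filter.
    results = vector.search_by_vector([0.1] * 1024, top_k=4, score_threshold=0.2,
                                      document_ids_filter=["doc-1"])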