From 2b4b4aea1a724db2784cc8a3d6cf1004b3b24a95 Mon Sep 17 00:00:00 2001
From: Frederick2313072 <2031894837@qq.com>
Date: Wed, 24 Sep 2025 19:44:54 +0800
Subject: [PATCH] feat: pinecone vdb support
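
Add Pinecone as a supported vector store: middleware config settings, a
serverless PineconeVector implementation, vector factory wiring, and an
integration test.

Example configuration (placeholder values, mirroring api/.env.example):

    VECTOR_STORE=pinecone
    PINECONE_API_KEY=your-pinecone-api-key
    PINECONE_ENVIRONMENT=us-east-1
    PINECONE_INDEX_NAME=dify-index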
---
 api/.env.example                              |  10 +-
 api/configs/middleware/__init__.py            |   2 +
 api/configs/middleware/vdb/pinecone_config.py |  40 ++
 api/controllers/console/datasets/datasets.py  |   2 +
 .../rag/datasource/vdb/pinecone/__init__.py   |   0
 .../vdb/pinecone/pinecone_vector.py           | 278 ++++++++++++++++++
 api/core/rag/datasource/vdb/vector_factory.py |   4 +
 api/core/rag/datasource/vdb/vector_type.py    |   1 +
 api/pyproject.toml                            |   1 +
 .../vdb/pinecone/__init__.py                  |   0
 .../vdb/pinecone/test_pinecone.py             |  26 ++
 11 files changed, 363 insertions(+), 1 deletion(-)
 create mode 100644 api/configs/middleware/vdb/pinecone_config.py
 create mode 100644 api/core/rag/datasource/vdb/pinecone/__init__.py
 create mode 100644 api/core/rag/datasource/vdb/pinecone/pinecone_vector.py
 create mode 100644 api/tests/integration_tests/vdb/pinecone/__init__.py
 create mode 100644 api/tests/integration_tests/vdb/pinecone/test_pinecone.py

diff --git a/api/.env.example b/api/.env.example
index 78a363e506..9bd29f7d41 100644
--- a/api/.env.example
+++ b/api/.env.example
@@ -158,7 +158,7 @@ WEB_API_CORS_ALLOW_ORIGINS=http://localhost:3000,*
 CONSOLE_CORS_ALLOW_ORIGINS=http://localhost:3000,*
 
 # Vector database configuration
-# Supported values are `weaviate`, `qdrant`, `milvus`, `myscale`, `relyt`, `pgvector`, `pgvecto-rs`, `chroma`, `opensearch`, `oracle`, `tencent`, `elasticsearch`, `elasticsearch-ja`, `analyticdb`, `couchbase`, `vikingdb`, `oceanbase`, `opengauss`, `tablestore`,`vastbase`,`tidb`,`tidb_on_qdrant`,`baidu`,`lindorm`,`huawei_cloud`,`upstash`, `matrixone`.
+# Supported values are `weaviate`, `qdrant`, `milvus`, `myscale`, `relyt`, `pgvector`, `pgvecto-rs`, `chroma`, `opensearch`, `oracle`, `tencent`, `elasticsearch`, `elasticsearch-ja`, `analyticdb`, `couchbase`, `vikingdb`, `oceanbase`, `opengauss`, `tablestore`,`vastbase`,`tidb`,`tidb_on_qdrant`,`baidu`,`lindorm`,`huawei_cloud`,`upstash`, `matrixone`, `pinecone`.
 VECTOR_STORE=weaviate
 # Prefix used to create collection name in vector database
 VECTOR_INDEX_NAME_PREFIX=Vector_index
@@ -365,6 +365,14 @@ PROMPT_GENERATION_MAX_TOKENS=512
 CODE_GENERATION_MAX_TOKENS=1024
 PLUGIN_BASED_TOKEN_COUNTING_ENABLED=false
 
+# Pinecone configuration, only available when VECTOR_STORE is `pinecone`
+PINECONE_API_KEY=your-pinecone-api-key
+PINECONE_ENVIRONMENT=us-east-1
+PINECONE_INDEX_NAME=dify-index
+PINECONE_CLIENT_TIMEOUT=30
+PINECONE_BATCH_SIZE=100
+PINECONE_METRIC=cosine
+
 # Mail configuration, support: resend, smtp, sendgrid
 MAIL_TYPE=
 # If using SendGrid, use the 'from' field for authentication if necessary.
diff --git a/api/configs/middleware/__init__.py b/api/configs/middleware/__init__.py
index 62b3cc9842..c84765873c 100644
--- a/api/configs/middleware/__init__.py
+++ b/api/configs/middleware/__init__.py
@@ -35,6 +35,7 @@ from .vdb.opensearch_config import OpenSearchConfig
 from .vdb.oracle_config import OracleConfig
 from .vdb.pgvector_config import PGVectorConfig
 from .vdb.pgvectors_config import PGVectoRSConfig
+from .vdb.pinecone_config import PineconeConfig
 from .vdb.qdrant_config import QdrantConfig
 from .vdb.relyt_config import RelytConfig
 from .vdb.tablestore_config import TableStoreConfig
@@ -336,6 +337,7 @@ class MiddlewareConfig(
     PGVectorConfig,
     VastbaseVectorConfig,
     PGVectoRSConfig,
+    PineconeConfig,
     QdrantConfig,
     RelytConfig,
     TencentVectorDBConfig,
diff --git a/api/configs/middleware/vdb/pinecone_config.py b/api/configs/middleware/vdb/pinecone_config.py
new file mode 100644
index 0000000000..3b3ca186ff
--- /dev/null
+++ b/api/configs/middleware/vdb/pinecone_config.py
@@ -0,0 +1,40 @@
+from typing import Optional
+
+from pydantic import Field, PositiveInt
+from pydantic_settings import BaseSettings
+
+
+class PineconeConfig(BaseSettings):
+    """
+    Configuration settings for the Pinecone vector database
+    """
+
+    PINECONE_API_KEY: Optional[str] = Field(
+        description="API key for authenticating with the Pinecone service",
+        default=None,
+    )
+
+    PINECONE_ENVIRONMENT: Optional[str] = Field(
+        description="Pinecone serverless region (e.g., 'us-east-1'); used as the ServerlessSpec region",
+        default=None,
+    )
+
+    PINECONE_INDEX_NAME: Optional[str] = Field(
+        description="Default Pinecone index name",
+        default=None,
+    )
+
+    PINECONE_CLIENT_TIMEOUT: PositiveInt = Field(
+        description="Timeout in seconds for Pinecone client operations (default is 30 seconds)",
+        default=30,
+    )
+
+    PINECONE_BATCH_SIZE: PositiveInt = Field(
+        description="Batch size for Pinecone upsert operations (default is 100)",
+        default=100,
+    )
+
+    PINECONE_METRIC: str = Field(
+        description="Distance metric for the Pinecone index (cosine, euclidean, dotproduct)",
+        default="cosine",
+    )
diff --git a/api/controllers/console/datasets/datasets.py b/api/controllers/console/datasets/datasets.py
index 2affbd6a42..ed9817a5ac 100644
--- a/api/controllers/console/datasets/datasets.py
+++ b/api/controllers/console/datasets/datasets.py
@@ -784,6 +784,7 @@ class DatasetRetrievalSettingApi(Resource):
                 | VectorType.PGVECTO_RS
                 | VectorType.VIKINGDB
                 | VectorType.UPSTASH
+                | VectorType.PINECONE
             ):
                 return {"retrieval_method": [RetrievalMethod.SEMANTIC_SEARCH.value]}
             case (
@@ -840,6 +841,7 @@ class DatasetRetrievalSettingMockApi(Resource):
                 | VectorType.PGVECTO_RS
                 | VectorType.VIKINGDB
                 | VectorType.UPSTASH
+                | VectorType.PINECONE
             ):
                 return {"retrieval_method": [RetrievalMethod.SEMANTIC_SEARCH.value]}
             case (
diff --git a/api/core/rag/datasource/vdb/pinecone/__init__.py b/api/core/rag/datasource/vdb/pinecone/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/api/core/rag/datasource/vdb/pinecone/pinecone_vector.py b/api/core/rag/datasource/vdb/pinecone/pinecone_vector.py
new file mode 100644
index 0000000000..68093894c1
--- /dev/null
+++ b/api/core/rag/datasource/vdb/pinecone/pinecone_vector.py
@@ -0,0 +1,278 @@
+import hashlib
+import json
+import re
+import time
+from typing import Any, Optional
+
+from pinecone import Pinecone, ServerlessSpec
+from pydantic import BaseModel
+
+from configs import dify_config
+from core.rag.datasource.vdb.field import Field
+from core.rag.datasource.vdb.vector_base import BaseVector
+from core.rag.datasource.vdb.vector_factory import AbstractVectorFactory
+from core.rag.datasource.vdb.vector_type import VectorType
+from core.rag.embedding.embedding_base import Embeddings
+from core.rag.models.document import Document
+from extensions.ext_database import db
+from extensions.ext_redis import redis_client
+from models.dataset import Dataset, DatasetCollectionBinding
+
+
+class PineconeConfig(BaseModel):
+    """Pinecone configuration class"""
+
+    api_key: str
+    environment: str
+    index_name: Optional[str] = None
+    timeout: float = 30
+    batch_size: int = 100
+    metric: str = "cosine"
+
+
+class PineconeVector(BaseVector):
+    """Pinecone vector database implementation"""
+
+    def __init__(self, collection_name: str, group_id: str, config: PineconeConfig):
+        super().__init__(collection_name)
+        self._client_config = config
+        self._group_id = group_id
+        self._pc = Pinecone(api_key=config.api_key)
+
+        # Normalize the index name: lowercase, only a-z, 0-9 and '-', at most 45 chars
+        base_name = collection_name.lower()
+        base_name = re.sub(r"[^a-z0-9-]+", "-", base_name)  # replace invalid chars with '-'
+        base_name = re.sub(r"-+", "-", base_name).strip("-")
+        suffix_len = 24  # hex digits of the sha256 suffix (96-bit entropy) to reduce collision risk
+        if len(base_name) > 45:
+            hash_suffix = hashlib.sha256(base_name.encode()).hexdigest()[:suffix_len]
+            truncated_name = base_name[: 45 - (suffix_len + 1)].rstrip("-")
+            self._index_name = f"{truncated_name}-{hash_suffix}"
+        else:
+            self._index_name = base_name
+        if not self._index_name:
+            # Guard against an empty name after normalization
+            self._index_name = f"index-{hashlib.sha256(collection_name.encode()).hexdigest()[:suffix_len]}"
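+        # Example (hypothetical): "Vector_index_a1b2c3_Node" normalizes to
+        # "vector-index-a1b2c3-node"; over-long names are truncated and suffixed with a sha256 hash.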
+        self._index = None
+
+    def get_type(self) -> str:
+        """Return the vector database type identifier"""
+        return VectorType.PINECONE
+
+    def _ensure_index_initialized(self) -> None:
+        """Attach self._index to an existing Pinecone index."""
+        if self._index is not None:
+            return
+        existing_indexes = self._pc.list_indexes().names()
+        if self._index_name not in existing_indexes:
+            raise ValueError("Index not initialized. Please ingest documents to create the index.")
+        self._index = self._pc.Index(self._index_name)
+
+    def to_index_struct(self) -> dict:
+        """Generate the index structure dictionary"""
+        return {
+            "type": self.get_type(),
+            "vector_store": {"class_prefix": self._collection_name},
+        }
+
+    def create(self, texts: list[Document], embeddings: list[list[float]], **kwargs):
+        """Create the vector index and ingest the initial documents"""
+        if texts:
+            vector_size = len(embeddings[0])
+            self.create_index(vector_size)
+            self.add_texts(texts, embeddings, **kwargs)
+
+    def create_index(self, dimension: int):
+        """Create the Pinecone index if it does not already exist"""
+        lock_name = f"vector_indexing_lock_{self._index_name}"
+
+        with redis_client.lock(lock_name, timeout=30):
+            # Check the Redis cache first
+            index_exist_cache_key = f"vector_indexing_{self._index_name}"
+            if redis_client.get(index_exist_cache_key):
+                self._index = self._pc.Index(self._index_name)
+                return
+
+            existing_indexes = self._pc.list_indexes().names()
+            if self._index_name not in existing_indexes:
+                # Create a new serverless index; the configured environment is used as the region
+                self._pc.create_index(
+                    name=self._index_name,
+                    dimension=dimension,
+                    metric=self._client_config.metric,
+                    spec=ServerlessSpec(cloud="aws", region=self._client_config.environment),
+                )
+                # Wait for index creation to complete
+                while not self._pc.describe_index(self._index_name).status["ready"]:
+                    time.sleep(1)
+
+            # Attach the index handle and cache its existence
+            self._index = self._pc.Index(self._index_name)
+            redis_client.set(index_exist_cache_key, 1, ex=3600)
+
+    def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs):
+        """Batch-add document vectors"""
+        if not self._index:
+            raise ValueError("Index not initialized. Call create() first.")
+
+        uuids = self._get_uuids(documents)
+        batch_size = self._client_config.batch_size
+        added_ids = []
+
+        for i in range(0, len(documents), batch_size):
+            batch_documents = documents[i : i + batch_size]
+            batch_embeddings = embeddings[i : i + batch_size]
+            batch_uuids = uuids[i : i + batch_size]
+
+            # Build Pinecone vector payloads; metadata values must be strings, numbers,
+            # booleans or lists of strings, so anything else is serialized to JSON
+            vectors_to_upsert = []
+            for doc, embedding, doc_id in zip(batch_documents, batch_embeddings, batch_uuids):
+                raw_meta = doc.metadata or {}
+                safe_meta: dict[str, Any] = {}
+                for k, v in raw_meta.items():
+                    if isinstance(v, (str, int, float, bool)):
+                        safe_meta[k] = v
+                    elif isinstance(v, list) and all(isinstance(x, str) for x in v):
+                        safe_meta[k] = v
+                    else:
+                        safe_meta[k] = json.dumps(v, ensure_ascii=False)
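+                # e.g. (hypothetical) {"doc_id": "d1", "tags": ["a", "b"], "extra": {"k": 1}}
+                # becomes {"doc_id": "d1", "tags": ["a", "b"], "extra": '{"k": 1}'}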
+                # Keep the page content and group id as filterable metadata
+                safe_meta[Field.CONTENT_KEY.value] = doc.page_content
+                safe_meta[Field.GROUP_KEY.value] = str(self._group_id)
+
+                vectors_to_upsert.append(
+                    {
+                        "id": doc_id,
+                        "values": embedding,
+                        "metadata": safe_meta,
+                    }
+                )
+
+            self._index.upsert(vectors=vectors_to_upsert)
+            added_ids.extend(batch_uuids)
+
+        return added_ids
+
+    def search_by_vector(self, query_vector: list[float], **kwargs) -> list[Document]:
+        """Vector similarity search"""
+        # Lazily attach to an existing index if needed
+        self._ensure_index_initialized()
+
+        top_k = kwargs.get("top_k", 4)
+        score_threshold = float(kwargs.get("score_threshold", 0.0))
+
+        # Restrict results to the current dataset
+        filter_dict: dict[str, Any] = {Field.GROUP_KEY.value: {"$eq": str(self._group_id)}}
+
+        # Optional document scope filtering
+        document_ids_filter = kwargs.get("document_ids_filter")
+        if document_ids_filter:
+            filter_dict["document_id"] = {"$in": document_ids_filter}
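+        # Resulting filter shape, e.g.:
+        # {"group_id": {"$eq": "<dataset_id>"}, "document_id": {"$in": ["<doc_id>"]}}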
+        response = self._index.query(
+            vector=query_vector,
+            top_k=top_k,
+            include_metadata=True,
+            filter=filter_dict,
+        )
+
+        # Convert matches above the score threshold into documents
+        docs = []
+        for match in response.matches:
+            if match.score >= score_threshold:
+                metadata = dict(match.metadata or {})
+                page_content = metadata.pop(Field.CONTENT_KEY.value, "")
+                metadata.pop(Field.GROUP_KEY.value, None)
+                metadata["score"] = match.score
+                docs.append(Document(page_content=page_content, metadata=metadata))
+
+        # Sort by similarity score in descending order
+        docs.sort(key=lambda x: x.metadata.get("score", 0), reverse=True)
+        return docs
+
+    def search_by_full_text(self, query: str, **kwargs) -> list[Document]:
+        """Pinecone has no native full-text search; return an empty list"""
+        return []
+
+    def delete_by_metadata_field(self, key: str, value: str):
+        """Delete vectors matching a metadata field"""
+        self._ensure_index_initialized()
+        # Metadata keys are stored flat, so filter on the key directly
+        filter_dict = {
+            Field.GROUP_KEY.value: {"$eq": str(self._group_id)},
+            key: {"$eq": value},
+        }
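+        # NOTE: metadata-filter deletes may not be supported on Pinecone
+        # serverless indexes; verify against the target index type.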
+        self._index.delete(filter=filter_dict)
+
+    def delete_by_ids(self, ids: list[str]) -> None:
+        """Batch-delete vectors by ID list"""
+        self._ensure_index_initialized()
+        self._index.delete(ids=ids)
+
+    def delete(self) -> None:
+        """Delete all vector data for the entire dataset"""
+        self._ensure_index_initialized()
+        filter_dict = {Field.GROUP_KEY.value: {"$eq": str(self._group_id)}}
+        self._index.delete(filter=filter_dict)
+
+    def text_exists(self, id: str) -> bool:
+        """Check whether a vector with the given id exists"""
+        try:
+            self._ensure_index_initialized()
+        except Exception:
+            return False
+        try:
+            response = self._index.fetch(ids=[id])
+            return id in response.vectors
+        except Exception:
+            return False
+
+
+class PineconeVectorFactory(AbstractVectorFactory):
+    """Pinecone vector database factory"""
+
+    def init_vector(self, dataset: Dataset, attributes: list, embeddings: Embeddings) -> PineconeVector:
+        """Create a PineconeVector instance for the given dataset"""
+        # Determine the collection name
+        if dataset.collection_binding_id:
+            dataset_collection_binding = (
+                db.session.query(DatasetCollectionBinding)
+                .where(DatasetCollectionBinding.id == dataset.collection_binding_id)
+                .one_or_none()
+            )
+            if dataset_collection_binding:
+                collection_name = dataset_collection_binding.collection_name
+            else:
+                raise ValueError("Dataset Collection Binding does not exist!")
+        else:
+            if dataset.index_struct_dict:
+                class_prefix: str = dataset.index_struct_dict["vector_store"]["class_prefix"]
+                collection_name = class_prefix
+            else:
+                dataset_id = dataset.id
+                collection_name = Dataset.gen_collection_name_by_id(dataset_id)
+
+        # Persist the index structure on first use
+        if not dataset.index_struct_dict:
+            dataset.index_struct = json.dumps(self.gen_index_struct_dict(VectorType.PINECONE, collection_name))
+
+        return PineconeVector(
+            collection_name=collection_name,
+            group_id=dataset.id,
+            config=PineconeConfig(
+                api_key=dify_config.PINECONE_API_KEY or "",
+                environment=dify_config.PINECONE_ENVIRONMENT or "",
+                index_name=dify_config.PINECONE_INDEX_NAME,
+                timeout=dify_config.PINECONE_CLIENT_TIMEOUT,
+                batch_size=dify_config.PINECONE_BATCH_SIZE,
+                metric=dify_config.PINECONE_METRIC,
+            ),
+        )
diff --git a/api/core/rag/datasource/vdb/vector_factory.py b/api/core/rag/datasource/vdb/vector_factory.py
index dc4f026ff3..a2d3511d9b 100644
--- a/api/core/rag/datasource/vdb/vector_factory.py
+++ b/api/core/rag/datasource/vdb/vector_factory.py
@@ -87,6 +87,10 @@ class Vector:
                 from core.rag.datasource.vdb.pgvecto_rs.pgvecto_rs import PGVectoRSFactory
 
                 return PGVectoRSFactory
+            case VectorType.PINECONE:
+                from core.rag.datasource.vdb.pinecone.pinecone_vector import PineconeVectorFactory
+
+                return PineconeVectorFactory
             case VectorType.QDRANT:
                 from core.rag.datasource.vdb.qdrant.qdrant_vector import QdrantVectorFactory
 
diff --git a/api/core/rag/datasource/vdb/vector_type.py b/api/core/rag/datasource/vdb/vector_type.py
index a415142196..1ae30b62e4 100644
--- a/api/core/rag/datasource/vdb/vector_type.py
+++ b/api/core/rag/datasource/vdb/vector_type.py
@@ -31,3 +31,4 @@ class VectorType(StrEnum):
     HUAWEI_CLOUD = "huawei_cloud"
     MATRIXONE = "matrixone"
     CLICKZETTA = "clickzetta"
+    PINECONE = "pinecone"
diff --git a/api/pyproject.toml b/api/pyproject.toml
index 012702edd2..0b2a4b7b19 100644
--- a/api/pyproject.toml
+++ b/api/pyproject.toml
@@ -89,6 +89,7 @@ dependencies = [
     "sendgrid~=6.12.3",
     "flask-restx~=1.3.0",
     "packaging~=23.2",
+    "pinecone>=7.3.0",
 ]
 # Before adding new dependency, consider place it in
 # alphabet order (a-z) and suitable group.
diff --git a/api/tests/integration_tests/vdb/pinecone/__init__.py b/api/tests/integration_tests/vdb/pinecone/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/api/tests/integration_tests/vdb/pinecone/test_pinecone.py b/api/tests/integration_tests/vdb/pinecone/test_pinecone.py
new file mode 100644
index 0000000000..e3f4e7f899
--- /dev/null
+++ b/api/tests/integration_tests/vdb/pinecone/test_pinecone.py
@@ -0,0 +1,26 @@
+from core.rag.datasource.vdb.pinecone.pinecone_vector import PineconeConfig, PineconeVector
+from tests.integration_tests.vdb.test_vector_store import (
+    AbstractVectorTest,
+    setup_mock_redis,
+)
+
+
+class PineconeVectorTest(AbstractVectorTest):
+    def __init__(self):
+        super().__init__()
+        self.attributes = ["doc_id", "dataset_id", "document_id", "doc_hash"]
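+        # NOTE: placeholder credentials; a real Pinecone API key and region
+        # are required to run this integration test against the live service.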
"pinecone>=7.3.0", ] # Before adding new dependency, consider place it in # alphabet order (a-z) and suitable group. diff --git a/api/tests/integration_tests/vdb/pinecone/__init__.py b/api/tests/integration_tests/vdb/pinecone/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/tests/integration_tests/vdb/pinecone/test_pinecone.py b/api/tests/integration_tests/vdb/pinecone/test_pinecone.py new file mode 100644 index 0000000000..e3f4e7f899 --- /dev/null +++ b/api/tests/integration_tests/vdb/pinecone/test_pinecone.py @@ -0,0 +1,30 @@ +from core.rag.datasource.vdb.pinecone.pinecone_vector import PineconeConfig, PineconeVector +from core.rag.models.document import Document +from tests.integration_tests.vdb.test_vector_store import ( + AbstractVectorTest, + setup_mock_redis, +) + + +class PineconeVectorTest(AbstractVectorTest): + def __init__(self): + super().__init__() + self.attributes = ["doc_id", "dataset_id", "document_id", "doc_hash"] + self.vector = PineconeVector( + collection_name=self.collection_name, + group_id=self.dataset_id, + config=PineconeConfig( + api_key="test_api_key", + environment="test_environment", + index_name="test_index", + ), + ) + + def search_by_vector(self): + super().search_by_vector() + + +def test_pinecone_vector(): + + + PineconeVectorTest().run_all_tests() \ No newline at end of file