From 2bf6728951101e35e176f549162d37978a44b31c Mon Sep 17 00:00:00 2001
From: zhangx1n
Date: Tue, 24 Mar 2026 15:58:13 +0800
Subject: [PATCH] chore(api): remove ClickZetta vector/storage integration and drop clickzetta/pyarrow dependencies

---
 api/commands/storage.py                       |    2 +-
 api/configs/middleware/__init__.py            |    7 +-
 .../clickzetta_volume_storage_config.py       |   63 -
 .../middleware/vdb/clickzetta_config.py       |   68 --
 api/controllers/console/datasets/datasets.py  |    1 -
 .../rag/datasource/vdb/clickzetta/README.md   |  201 ---
 .../rag/datasource/vdb/clickzetta/__init__.py |    1 -
 .../vdb/clickzetta/clickzetta_vector.py       | 1079 -----------------
 api/core/rag/datasource/vdb/vector_factory.py |    4 -
 api/core/rag/datasource/vdb/vector_type.py    |    1 -
 api/extensions/ext_storage.py                 |   13 -
 .../storage/clickzetta_volume/__init__.py     |    5 -
 .../clickzetta_volume_storage.py              |  527 --------
 .../clickzetta_volume/file_lifecycle.py       |  518 --------
 .../clickzetta_volume/volume_permissions.py   |  649 ----------
 api/extensions/storage/storage_type.py        |    1 -
 api/pyproject.toml                            |    1 -
 api/pyrefly-local-excludes.txt                |    3 -
 api/pyrightconfig.json                        |    3 +-
 .../storage/test_clickzetta_volume.py         |  168 ---
 .../vdb/clickzetta/README.md                  |   25 -
 .../vdb/clickzetta/test_clickzetta.py         |  223 ----
 .../vdb/clickzetta/test_docker_integration.py |  165 ---
 api/uv.lock                                   |   46 -
 24 files changed, 3 insertions(+), 3771 deletions(-)
 delete mode 100644 api/configs/middleware/storage/clickzetta_volume_storage_config.py
 delete mode 100644 api/configs/middleware/vdb/clickzetta_config.py
 delete mode 100644 api/core/rag/datasource/vdb/clickzetta/README.md
 delete mode 100644 api/core/rag/datasource/vdb/clickzetta/__init__.py
 delete mode 100644 api/core/rag/datasource/vdb/clickzetta/clickzetta_vector.py
 delete mode 100644 api/extensions/storage/clickzetta_volume/__init__.py
 delete mode 100644 api/extensions/storage/clickzetta_volume/clickzetta_volume_storage.py
 delete mode 100644 api/extensions/storage/clickzetta_volume/file_lifecycle.py
 delete mode 100644 api/extensions/storage/clickzetta_volume/volume_permissions.py
 delete mode 100644 api/tests/integration_tests/storage/test_clickzetta_volume.py
 delete mode 100644 api/tests/integration_tests/vdb/clickzetta/README.md
 delete mode 100644 api/tests/integration_tests/vdb/clickzetta/test_clickzetta.py
 delete mode 100644 api/tests/integration_tests/vdb/clickzetta/test_docker_integration.py

diff --git a/api/commands/storage.py b/api/commands/storage.py
index fa890a855a..9e1eef0947 100644
--- a/api/commands/storage.py
+++ b/api/commands/storage.py
@@ -608,7 +608,7 @@ def migrate_oss(
             click.style(
                 "Target STORAGE_TYPE must be a cloud OSS (not 'local' or 'opendal').\n"
                 "Please set STORAGE_TYPE to one of: s3, aliyun-oss, azure-blob, google-storage, tencent-cos, \n"
-                "volcengine-tos, supabase, oci-storage, huawei-obs, baidu-obs, clickzetta-volume.",
+                "volcengine-tos, supabase, oci-storage, huawei-obs, baidu-obs.",
                 fg="red",
             )
         )
diff --git a/api/configs/middleware/__init__.py b/api/configs/middleware/__init__.py
index 15ac8bf0bf..723cd82fb1 100644
--- a/api/configs/middleware/__init__.py
+++ b/api/configs/middleware/__init__.py
@@ -11,7 +11,6 @@ from .storage.aliyun_oss_storage_config import AliyunOSSStorageConfig
 from .storage.amazon_s3_storage_config import S3StorageConfig
 from .storage.azure_blob_storage_config import AzureBlobStorageConfig
 from .storage.baidu_obs_storage_config import BaiduOBSStorageConfig
-from .storage.clickzetta_volume_storage_config import ClickZettaVolumeStorageConfig
 from
.storage.google_cloud_storage_config import GoogleCloudStorageConfig from .storage.huawei_obs_storage_config import HuaweiCloudOBSStorageConfig from .storage.oci_storage_config import OCIStorageConfig @@ -23,7 +22,6 @@ from .vdb.alibabacloud_mysql_config import AlibabaCloudMySQLConfig from .vdb.analyticdb_config import AnalyticdbConfig from .vdb.baidu_vector_config import BaiduVectorDBConfig from .vdb.chroma_config import ChromaConfig -from .vdb.clickzetta_config import ClickzettaConfig from .vdb.couchbase_config import CouchbaseConfig from .vdb.elasticsearch_config import ElasticsearchConfig from .vdb.hologres_config import HologresConfig @@ -58,7 +56,6 @@ class StorageConfig(BaseSettings): "aliyun-oss", "azure-blob", "baidu-obs", - "clickzetta-volume", "google-storage", "huawei-obs", "oci-storage", @@ -69,7 +66,7 @@ class StorageConfig(BaseSettings): ] = Field( description="Type of storage to use." " Options: 'opendal', '(deprecated) local', 's3', 'aliyun-oss', 'azure-blob', 'baidu-obs', " - "'clickzetta-volume', 'google-storage', 'huawei-obs', 'oci-storage', 'tencent-cos', " + "'google-storage', 'huawei-obs', 'oci-storage', 'tencent-cos', " "'volcengine-tos', 'supabase'. Default is 'opendal'.", default="opendal", ) @@ -334,7 +331,6 @@ class MiddlewareConfig( AliyunOSSStorageConfig, AzureBlobStorageConfig, BaiduOBSStorageConfig, - ClickZettaVolumeStorageConfig, GoogleCloudStorageConfig, HuaweiCloudOBSStorageConfig, OCIStorageConfig, @@ -347,7 +343,6 @@ class MiddlewareConfig( VectorStoreConfig, AnalyticdbConfig, ChromaConfig, - ClickzettaConfig, HologresConfig, HuaweiCloudConfig, IrisVectorConfig, diff --git a/api/configs/middleware/storage/clickzetta_volume_storage_config.py b/api/configs/middleware/storage/clickzetta_volume_storage_config.py deleted file mode 100644 index 035650d98a..0000000000 --- a/api/configs/middleware/storage/clickzetta_volume_storage_config.py +++ /dev/null @@ -1,63 +0,0 @@ -"""ClickZetta Volume Storage Configuration""" - -from pydantic import Field -from pydantic_settings import BaseSettings - - -class ClickZettaVolumeStorageConfig(BaseSettings): - """Configuration for ClickZetta Volume storage.""" - - CLICKZETTA_VOLUME_USERNAME: str | None = Field( - description="Username for ClickZetta Volume authentication", - default=None, - ) - - CLICKZETTA_VOLUME_PASSWORD: str | None = Field( - description="Password for ClickZetta Volume authentication", - default=None, - ) - - CLICKZETTA_VOLUME_INSTANCE: str | None = Field( - description="ClickZetta instance identifier", - default=None, - ) - - CLICKZETTA_VOLUME_SERVICE: str = Field( - description="ClickZetta service endpoint", - default="api.clickzetta.com", - ) - - CLICKZETTA_VOLUME_WORKSPACE: str = Field( - description="ClickZetta workspace name", - default="quick_start", - ) - - CLICKZETTA_VOLUME_VCLUSTER: str = Field( - description="ClickZetta virtual cluster name", - default="default_ap", - ) - - CLICKZETTA_VOLUME_SCHEMA: str = Field( - description="ClickZetta schema name", - default="dify", - ) - - CLICKZETTA_VOLUME_TYPE: str = Field( - description="ClickZetta volume type (table|user|external)", - default="user", - ) - - CLICKZETTA_VOLUME_NAME: str | None = Field( - description="ClickZetta volume name for external volumes", - default=None, - ) - - CLICKZETTA_VOLUME_TABLE_PREFIX: str = Field( - description="Prefix for ClickZetta volume table names", - default="dataset_", - ) - - CLICKZETTA_VOLUME_DIFY_PREFIX: str = Field( - description="Directory prefix for User Volume to organize Dify files", - default="dify_km", - 
) diff --git a/api/configs/middleware/vdb/clickzetta_config.py b/api/configs/middleware/vdb/clickzetta_config.py deleted file mode 100644 index e8172b5299..0000000000 --- a/api/configs/middleware/vdb/clickzetta_config.py +++ /dev/null @@ -1,68 +0,0 @@ -from pydantic import Field -from pydantic_settings import BaseSettings - - -class ClickzettaConfig(BaseSettings): - """ - Clickzetta Lakehouse vector database configuration - """ - - CLICKZETTA_USERNAME: str | None = Field( - description="Username for authenticating with Clickzetta Lakehouse", - default=None, - ) - - CLICKZETTA_PASSWORD: str | None = Field( - description="Password for authenticating with Clickzetta Lakehouse", - default=None, - ) - - CLICKZETTA_INSTANCE: str | None = Field( - description="Clickzetta Lakehouse instance ID", - default=None, - ) - - CLICKZETTA_SERVICE: str | None = Field( - description="Clickzetta API service endpoint (e.g., 'api.clickzetta.com')", - default="api.clickzetta.com", - ) - - CLICKZETTA_WORKSPACE: str | None = Field( - description="Clickzetta workspace name", - default="default", - ) - - CLICKZETTA_VCLUSTER: str | None = Field( - description="Clickzetta virtual cluster name", - default="default_ap", - ) - - CLICKZETTA_SCHEMA: str | None = Field( - description="Database schema name in Clickzetta", - default="public", - ) - - CLICKZETTA_BATCH_SIZE: int | None = Field( - description="Batch size for bulk insert operations", - default=100, - ) - - CLICKZETTA_ENABLE_INVERTED_INDEX: bool | None = Field( - description="Enable inverted index for full-text search capabilities", - default=True, - ) - - CLICKZETTA_ANALYZER_TYPE: str | None = Field( - description="Analyzer type for full-text search: keyword, english, chinese, unicode", - default="chinese", - ) - - CLICKZETTA_ANALYZER_MODE: str | None = Field( - description="Analyzer mode for tokenization: max_word (fine-grained) or smart (intelligent)", - default="smart", - ) - - CLICKZETTA_VECTOR_DISTANCE_FUNCTION: str | None = Field( - description="Distance function for vector similarity: l2_distance or cosine_distance", - default="cosine_distance", - ) diff --git a/api/controllers/console/datasets/datasets.py b/api/controllers/console/datasets/datasets.py index eebba57fa3..459f147d5e 100644 --- a/api/controllers/console/datasets/datasets.py +++ b/api/controllers/console/datasets/datasets.py @@ -259,7 +259,6 @@ def _get_retrieval_methods_by_vector_type(vector_type: str | None, is_mock: bool VectorType.HUAWEI_CLOUD, VectorType.TENCENT, VectorType.MATRIXONE, - VectorType.CLICKZETTA, VectorType.BAIDU, VectorType.ALIBABACLOUD_MYSQL, VectorType.IRIS, diff --git a/api/core/rag/datasource/vdb/clickzetta/README.md b/api/core/rag/datasource/vdb/clickzetta/README.md deleted file mode 100644 index 969d4e40a0..0000000000 --- a/api/core/rag/datasource/vdb/clickzetta/README.md +++ /dev/null @@ -1,201 +0,0 @@ -# Clickzetta Vector Database Integration - -This module provides integration with Clickzetta Lakehouse as a vector database for Dify. 
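
For orientation, here is a minimal end-to-end sketch of the integration this README describes. The credential values are placeholders; `ClickzettaConfig`, `ClickzettaVector`, and their methods are the ones defined in `clickzetta_vector.py` in this module:

```python
from core.rag.datasource.vdb.clickzetta.clickzetta_vector import (
    ClickzettaConfig,
    ClickzettaVector,
)
from core.rag.models.document import Document

config = ClickzettaConfig(
    username="your_username",        # placeholder credentials
    password="your_password",
    instance="your_instance_id",
    service="api.clickzetta.com",
    workspace="your_workspace",
    vcluster="your_vcluster",
    schema_name="your_schema",
)
vector = ClickzettaVector(collection_name="vector_index_example", config=config)

# create() provisions the table plus the vector index (and the inverted
# index when enabled), then inserts the initial documents.
docs = [Document(page_content="hello clickzetta", metadata={"doc_id": "seg-1"})]
vector.create(texts=docs, embeddings=[[0.1] * 768])  # 768 is the module's fallback dimension

hits = vector.search_by_vector([0.1] * 768, top_k=5)
```

In production Dify never constructs this directly: `ClickzettaVectorFactory.init_vector()` reads the `CLICKZETTA_*` settings and derives the table name from the dataset ID.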
- -## Features - -- **Vector Storage**: Store and retrieve high-dimensional vectors using Clickzetta's native VECTOR type -- **Vector Search**: Efficient similarity search using HNSW algorithm -- **Full-Text Search**: Leverage Clickzetta's inverted index for powerful text search capabilities -- **Hybrid Search**: Combine vector similarity and full-text search for better results -- **Multi-language Support**: Built-in support for Chinese, English, and Unicode text processing -- **Scalable**: Leverage Clickzetta's distributed architecture for large-scale deployments - -## Configuration - -### Required Environment Variables - -All seven configuration parameters are required: - -```bash -# Authentication -CLICKZETTA_USERNAME=your_username -CLICKZETTA_PASSWORD=your_password - -# Instance configuration -CLICKZETTA_INSTANCE=your_instance_id -CLICKZETTA_SERVICE=api.clickzetta.com -CLICKZETTA_WORKSPACE=your_workspace -CLICKZETTA_VCLUSTER=your_vcluster -CLICKZETTA_SCHEMA=your_schema -``` - -### Optional Configuration - -```bash -# Batch processing -CLICKZETTA_BATCH_SIZE=100 - -# Full-text search configuration -CLICKZETTA_ENABLE_INVERTED_INDEX=true -CLICKZETTA_ANALYZER_TYPE=chinese # Options: keyword, english, chinese, unicode -CLICKZETTA_ANALYZER_MODE=smart # Options: max_word, smart - -# Vector search configuration -CLICKZETTA_VECTOR_DISTANCE_FUNCTION=cosine_distance # Options: l2_distance, cosine_distance -``` - -## Usage - -### 1. Set Clickzetta as the Vector Store - -In your Dify configuration, set: - -```bash -VECTOR_STORE=clickzetta -``` - -### 2. Table Structure - -Clickzetta will automatically create tables with the following structure: - -```sql -CREATE TABLE ( - id STRING NOT NULL, - content STRING NOT NULL, - metadata JSON, - vector VECTOR(FLOAT, ) NOT NULL, - PRIMARY KEY (id) -); - --- Vector index for similarity search -CREATE VECTOR INDEX idx__vec -ON TABLE .(vector) -PROPERTIES ( - "distance.function" = "cosine_distance", - "scalar.type" = "f32" -); - --- Inverted index for full-text search (if enabled) -CREATE INVERTED INDEX idx__text -ON .(content) -PROPERTIES ( - "analyzer" = "chinese", - "mode" = "smart" -); -``` - -## Full-Text Search Capabilities - -Clickzetta supports advanced full-text search with multiple analyzers: - -### Analyzer Types - -1. **keyword**: No tokenization, treats the entire string as a single token - - - Best for: Exact matching, IDs, codes - -1. **english**: Designed for English text - - - Features: Recognizes ASCII letters and numbers, converts to lowercase - - Best for: English content - -1. **chinese**: Chinese text tokenizer - - - Features: Recognizes Chinese and English characters, removes punctuation - - Best for: Chinese or mixed Chinese-English content - -1. **unicode**: Multi-language tokenizer based on Unicode - - - Features: Recognizes text boundaries in multiple languages - - Best for: Multi-language content - -### Analyzer Modes - -- **max_word**: Fine-grained tokenization (more tokens) -- **smart**: Intelligent tokenization (balanced) - -### Full-Text Search Functions - -- `MATCH_ALL(column, query)`: All terms must be present -- `MATCH_ANY(column, query)`: At least one term must be present -- `MATCH_PHRASE(column, query)`: Exact phrase matching -- `MATCH_PHRASE_PREFIX(column, query)`: Phrase prefix matching -- `MATCH_REGEXP(column, pattern)`: Regular expression matching - -## Performance Optimization - -### Vector Search - -1. 
**Adjust exploration factor** for accuracy vs speed trade-off: - - ```sql - SET cz.vector.index.search.ef=64; - ``` - -1. **Use appropriate distance functions**: - - - `cosine_distance`: Best for normalized embeddings (e.g., from language models) - - `l2_distance`: Best for raw feature vectors - -### Full-Text Search - -1. **Choose the right analyzer**: - - - Use `keyword` for exact matching - - Use language-specific analyzers for better tokenization - -1. **Combine with vector search**: - - - Pre-filter with full-text search for better performance - - Use hybrid search for improved relevance - -## Troubleshooting - -### Connection Issues - -1. Verify all 7 required configuration parameters are set -1. Check network connectivity to Clickzetta service -1. Ensure the user has proper permissions on the schema - -### Search Performance - -1. Verify vector index exists: - - ```sql - SHOW INDEX FROM .; - ``` - -1. Check if vector index is being used: - - ```sql - EXPLAIN SELECT ... WHERE l2_distance(...) < threshold; - ``` - - Look for `vector_index_search_type` in the execution plan. - -### Full-Text Search Not Working - -1. Verify inverted index is created -1. Check analyzer configuration matches your content language -1. Use `TOKENIZE()` function to test tokenization: - ```sql - SELECT TOKENIZE('your text', map('analyzer', 'chinese', 'mode', 'smart')); - ``` - -## Limitations - -1. Vector operations don't support `ORDER BY` or `GROUP BY` directly on vector columns -1. Full-text search relevance scores are not provided by Clickzetta -1. Inverted index creation may fail for very large existing tables (continue without error) -1. Index naming constraints: - - Index names must be unique within a schema - - Only one vector index can be created per column - - The implementation uses timestamps to ensure unique index names -1. 
A column can only have one vector index at a time - -## References - -- [Clickzetta Vector Search Documentation](https://yunqi.tech/documents/vector-search) -- [Clickzetta Inverted Index Documentation](https://yunqi.tech/documents/inverted-index) -- [Clickzetta SQL Functions](https://yunqi.tech/documents/sql-reference) diff --git a/api/core/rag/datasource/vdb/clickzetta/__init__.py b/api/core/rag/datasource/vdb/clickzetta/__init__.py deleted file mode 100644 index 9d41c5a57d..0000000000 --- a/api/core/rag/datasource/vdb/clickzetta/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# Clickzetta Vector Database Integration for Dify diff --git a/api/core/rag/datasource/vdb/clickzetta/clickzetta_vector.py b/api/core/rag/datasource/vdb/clickzetta/clickzetta_vector.py deleted file mode 100644 index 8e8120fc10..0000000000 --- a/api/core/rag/datasource/vdb/clickzetta/clickzetta_vector.py +++ /dev/null @@ -1,1079 +0,0 @@ -from __future__ import annotations - -import contextlib -import json -import logging -import queue -import re -import threading -import time -import uuid -from typing import TYPE_CHECKING, Any - -import clickzetta # type: ignore -from pydantic import BaseModel, model_validator - -if TYPE_CHECKING: - from clickzetta.connector.v0.connection import Connection # type: ignore - -from configs import dify_config -from core.rag.datasource.vdb.field import Field -from core.rag.datasource.vdb.vector_base import BaseVector -from core.rag.datasource.vdb.vector_factory import AbstractVectorFactory -from core.rag.embedding.embedding_base import Embeddings -from core.rag.models.document import Document -from models.dataset import Dataset - -logger = logging.getLogger(__name__) - - -# ClickZetta Lakehouse Vector Database Configuration - - -class ClickzettaConfig(BaseModel): - """ - Configuration class for Clickzetta connection. - """ - - username: str - password: str - instance: str - service: str = "api.clickzetta.com" - workspace: str = "quick_start" - vcluster: str = "default_ap" - schema_name: str = "dify" # Renamed to avoid shadowing BaseModel.schema - # Advanced settings - batch_size: int = 20 # Reduced batch size to avoid large SQL statements - enable_inverted_index: bool = True # Enable inverted index for full-text search - analyzer_type: str = "chinese" # Analyzer type for full-text search: keyword, english, chinese, unicode - analyzer_mode: str = "smart" # Analyzer mode: max_word, smart - vector_distance_function: str = "cosine_distance" # l2_distance or cosine_distance - - @model_validator(mode="before") - @classmethod - def validate_config(cls, values: dict): - """ - Validate the configuration values. - """ - if not values.get("username"): - raise ValueError("config CLICKZETTA_USERNAME is required") - if not values.get("password"): - raise ValueError("config CLICKZETTA_PASSWORD is required") - if not values.get("instance"): - raise ValueError("config CLICKZETTA_INSTANCE is required") - if not values.get("service"): - raise ValueError("config CLICKZETTA_SERVICE is required") - if not values.get("workspace"): - raise ValueError("config CLICKZETTA_WORKSPACE is required") - if not values.get("vcluster"): - raise ValueError("config CLICKZETTA_VCLUSTER is required") - if not values.get("schema_name"): - raise ValueError("config CLICKZETTA_SCHEMA is required") - return values - - -class ClickzettaConnectionPool: - """ - Global connection pool for ClickZetta connections. - Manages connection reuse across ClickzettaVector instances. 
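    Typical borrowing pattern (an illustrative sketch of this class's own
    API; ClickzettaVector wraps these same calls in its ConnectionContext
    manager rather than calling them inline):

        pool = ClickzettaConnectionPool.get_instance()
        conn = pool.get_connection(config)  # config: ClickzettaConfig
        try:
            with conn.cursor() as cursor:
                cursor.execute("SELECT 1")
        finally:
            pool.return_connection(config, conn)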
- """ - - _instance: ClickzettaConnectionPool | None = None - _lock = threading.Lock() - - def __init__(self): - self._pools: dict[str, list[tuple[Connection, float]]] = {} # config_key -> [(connection, last_used_time)] - self._pool_locks: dict[str, threading.Lock] = {} - self._max_pool_size = 5 # Maximum connections per configuration - self._connection_timeout = 300 # 5 minutes timeout - self._cleanup_thread: threading.Thread | None = None - self._shutdown = False - self._start_cleanup_thread() - - @classmethod - def get_instance(cls) -> ClickzettaConnectionPool: - """Get singleton instance of connection pool.""" - if cls._instance is None: - with cls._lock: - if cls._instance is None: - cls._instance = cls() - return cls._instance - - def _get_config_key(self, config: ClickzettaConfig) -> str: - """Generate unique key for connection configuration.""" - return ( - f"{config.username}:{config.instance}:{config.service}:" - f"{config.workspace}:{config.vcluster}:{config.schema_name}" - ) - - def _create_connection(self, config: ClickzettaConfig) -> Connection: - """Create a new ClickZetta connection.""" - max_retries = 3 - retry_delay = 1.0 - - for attempt in range(max_retries): - try: - connection = clickzetta.connect( - username=config.username, - password=config.password, - instance=config.instance, - service=config.service, - workspace=config.workspace, - vcluster=config.vcluster, - schema=config.schema_name, - ) - - # Configure connection session settings - self._configure_connection(connection) - logger.debug("Created new ClickZetta connection (attempt %d/%d)", attempt + 1, max_retries) - return connection - except Exception: - logger.exception("ClickZetta connection attempt %d/%d failed", attempt + 1, max_retries) - if attempt < max_retries - 1: - time.sleep(retry_delay * (2**attempt)) - else: - raise - - raise RuntimeError(f"Failed to create ClickZetta connection after {max_retries} attempts") - - def _configure_connection(self, connection: Connection): - """Configure connection session settings.""" - try: - with connection.cursor() as cursor: - # Temporarily suppress ClickZetta client logging to reduce noise - clickzetta_logger = logging.getLogger("clickzetta") - original_level = clickzetta_logger.level - clickzetta_logger.setLevel(logging.WARNING) - - try: - # Use quote mode for string literal escaping - cursor.execute("SET cz.sql.string.literal.escape.mode = 'quote'") - - # Apply performance optimization hints - performance_hints = [ - # Vector index optimization - "SET cz.storage.parquet.vector.index.read.memory.cache = true", - "SET cz.storage.parquet.vector.index.read.local.cache = false", - # Query optimization - "SET cz.sql.table.scan.push.down.filter = true", - "SET cz.sql.table.scan.enable.ensure.filter = true", - "SET cz.storage.always.prefetch.internal = true", - "SET cz.optimizer.generate.columns.always.valid = true", - "SET cz.sql.index.prewhere.enabled = true", - # Storage optimization - "SET cz.storage.parquet.enable.io.prefetch = false", - "SET cz.optimizer.enable.mv.rewrite = false", - "SET cz.sql.dump.as.lz4 = true", - "SET cz.optimizer.limited.optimization.naive.query = true", - "SET cz.sql.table.scan.enable.push.down.log = false", - "SET cz.storage.use.file.format.local.stats = false", - "SET cz.storage.local.file.object.cache.level = all", - # Job execution optimization - "SET cz.sql.job.fast.mode = true", - "SET cz.storage.parquet.non.contiguous.read = true", - "SET cz.sql.compaction.after.commit = true", - ] - - for hint in performance_hints: - 
cursor.execute(hint) - finally: - # Restore original logging level - clickzetta_logger.setLevel(original_level) - - except Exception: - logger.exception("Failed to configure connection, continuing with defaults") - - def _is_connection_valid(self, connection: Connection) -> bool: - """Check if connection is still valid.""" - try: - with connection.cursor() as cursor: - cursor.execute("SELECT 1") - return True - except Exception: - return False - - def get_connection(self, config: ClickzettaConfig) -> Connection: - """Get a connection from the pool or create a new one.""" - config_key = self._get_config_key(config) - - # Ensure pool lock exists - if config_key not in self._pool_locks: - with self._lock: - if config_key not in self._pool_locks: - self._pool_locks[config_key] = threading.Lock() - self._pools[config_key] = [] - - with self._pool_locks[config_key]: - pool = self._pools[config_key] - current_time = time.time() - - # Try to reuse existing connection - while pool: - connection, last_used = pool.pop(0) - - # Check if connection is not expired and still valid - if current_time - last_used < self._connection_timeout and self._is_connection_valid(connection): - logger.debug("Reusing ClickZetta connection from pool") - return connection - else: - # Connection expired or invalid, close it - with contextlib.suppress(Exception): - connection.close() - - # No valid connection found, create new one - return self._create_connection(config) - - def return_connection(self, config: ClickzettaConfig, connection: Connection): - """Return a connection to the pool.""" - config_key = self._get_config_key(config) - - if config_key not in self._pool_locks: - # Pool was cleaned up, just close the connection - with contextlib.suppress(Exception): - connection.close() - return - - with self._pool_locks[config_key]: - pool = self._pools[config_key] - - # Only return to pool if not at capacity and connection is valid - if len(pool) < self._max_pool_size and self._is_connection_valid(connection): - pool.append((connection, time.time())) - logger.debug("Returned ClickZetta connection to pool") - else: - # Pool full or connection invalid, close it - with contextlib.suppress(Exception): - connection.close() - - def _cleanup_expired_connections(self): - """Clean up expired connections from all pools.""" - current_time = time.time() - - with self._lock: - for config_key in list(self._pools.keys()): - if config_key not in self._pool_locks: - continue - - with self._pool_locks[config_key]: - pool = self._pools[config_key] - valid_connections = [] - - for connection, last_used in pool: - if current_time - last_used < self._connection_timeout: - valid_connections.append((connection, last_used)) - else: - with contextlib.suppress(Exception): - connection.close() - - self._pools[config_key] = valid_connections - - def _start_cleanup_thread(self): - """Start background thread for connection cleanup.""" - - def cleanup_worker(): - while not self._shutdown: - try: - time.sleep(60) # Cleanup every minute - if not self._shutdown: - self._cleanup_expired_connections() - except Exception: - logger.exception("Error in connection pool cleanup") - - self._cleanup_thread = threading.Thread(target=cleanup_worker, daemon=True) - self._cleanup_thread.start() - - def shutdown(self): - """Shutdown connection pool and close all connections.""" - self._shutdown = True - - with self._lock: - for config_key in list(self._pools.keys()): - if config_key not in self._pool_locks: - continue - - with self._pool_locks[config_key]: - pool = 
self._pools[config_key] - for connection, _ in pool: - with contextlib.suppress(Exception): - connection.close() - pool.clear() - - -class ClickzettaVector(BaseVector): - """ - Clickzetta vector storage implementation. - """ - - # Class-level write queue and lock for serializing writes - _write_queue: queue.Queue | None = None - _write_thread: threading.Thread | None = None - _write_lock = threading.Lock() - _shutdown = False - - def __init__(self, collection_name: str, config: ClickzettaConfig): - super().__init__(collection_name) - self._config = config - self._table_name = collection_name.replace("-", "_").lower() # Ensure valid table name - self._connection_pool = ClickzettaConnectionPool.get_instance() - self._init_write_queue() - - def _get_connection(self) -> Connection: - """Get a connection from the pool.""" - return self._connection_pool.get_connection(self._config) - - def _return_connection(self, connection: Connection): - """Return a connection to the pool.""" - self._connection_pool.return_connection(self._config, connection) - - class ConnectionContext: - """Context manager for borrowing and returning connections.""" - - def __init__(self, vector_instance: ClickzettaVector): - self.vector = vector_instance - self.connection: Connection | None = None - - def __enter__(self) -> Connection: - self.connection = self.vector._get_connection() - return self.connection - - def __exit__(self, exc_type, exc_val, exc_tb): - if self.connection: - self.vector._return_connection(self.connection) - - def get_connection_context(self) -> ClickzettaVector.ConnectionContext: - """Get a connection context manager.""" - return self.ConnectionContext(self) - - def _parse_metadata(self, raw_metadata: str, row_id: str): - """ - Parse metadata from JSON string with proper error handling and fallback. 
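        Illustrative example (hypothetical values):

            _parse_metadata('{"document_id": "abc"}', "seg-1")
            # -> {"document_id": "abc", "doc_id": "seg-1"}

        Double-encoded payloads (where json.loads() yields a str) are decoded
        a second time; if decoding fails outright, a regex scan recovers
        "document_id" and the row id fills in the rest.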
- - Args: - raw_metadata: Raw JSON string from database - row_id: Row ID for fallback document_id - - Returns: - Parsed metadata dict with guaranteed required fields - """ - try: - if raw_metadata: - metadata = json.loads(raw_metadata) - - # Handle double-encoded JSON - if isinstance(metadata, str): - metadata = json.loads(metadata) - - # Ensure we have a dict - if not isinstance(metadata, dict): - metadata = {} - else: - metadata = {} - except (json.JSONDecodeError, TypeError): - logger.exception("JSON parsing failed for metadata") - # Fallback: extract document_id with regex - doc_id_match = re.search(r'"document_id":\s*"([^"]+)"', raw_metadata or "") - metadata = {"document_id": doc_id_match.group(1)} if doc_id_match else {} - - # Ensure required fields are set - metadata["doc_id"] = row_id # segment id - - # Ensure document_id exists (critical for Dify's format_retrieval_documents) - if "document_id" not in metadata: - metadata["document_id"] = row_id # fallback to segment id - - return metadata - - @classmethod - def _init_write_queue(cls): - """Initialize the write queue and worker thread.""" - with cls._write_lock: - if cls._write_queue is None: - cls._write_queue = queue.Queue() - cls._write_thread = threading.Thread(target=cls._write_worker, daemon=True) - cls._write_thread.start() - logger.info("Started Clickzetta write worker thread") - - @classmethod - def _write_worker(cls): - """Worker thread that processes write tasks sequentially.""" - while not cls._shutdown: - try: - # Get task from queue with timeout - if cls._write_queue is not None: - task = cls._write_queue.get(timeout=1) - if task is None: # Shutdown signal - break - - # Execute the write task - func, args, kwargs, result_queue = task - try: - result = func(*args, **kwargs) - result_queue.put((True, result)) - except (RuntimeError, ValueError, TypeError, ConnectionError) as e: - logger.exception("Write task failed") - result_queue.put((False, e)) - finally: - cls._write_queue.task_done() - else: - break - except queue.Empty: - continue - except (RuntimeError, ValueError, TypeError, ConnectionError) as e: - logger.exception("Write worker error") - - def _execute_write(self, func, *args, **kwargs): - """Execute a write operation through the queue.""" - if ClickzettaVector._write_queue is None: - raise RuntimeError("Write queue not initialized") - - result_queue: queue.Queue[tuple[bool, Any]] = queue.Queue() - ClickzettaVector._write_queue.put((func, args, kwargs, result_queue)) - - # Wait for result - success, result = result_queue.get() - if not success: - raise result - return result - - def get_type(self) -> str: - """Return the vector database type.""" - return "clickzetta" - - def _ensure_connection(self) -> Connection: - """Get a connection from the pool.""" - return self._get_connection() - - def _table_exists(self) -> bool: - """Check if the table exists.""" - try: - with self.get_connection_context() as connection: - with connection.cursor() as cursor: - cursor.execute(f"DESC {self._config.schema_name}.{self._table_name}") - return True - except Exception as e: - error_message = str(e).lower() - # Handle ClickZetta specific "table or view not found" errors - if any( - phrase in error_message - for phrase in ["table or view not found", "czlh-42000", "semantic analysis exception"] - ): - logger.debug("Table %s.%s does not exist", self._config.schema_name, self._table_name) - return False - else: - # For other connection/permission errors, log warning but return False to avoid blocking cleanup - logger.exception( 
- "Table existence check failed for %s.%s, assuming it doesn't exist", - self._config.schema_name, - self._table_name, - ) - return False - - def create(self, texts: list[Document], embeddings: list[list[float]], **kwargs): - """Create the collection and add initial documents.""" - # Execute table creation through write queue to avoid concurrent conflicts - self._execute_write(self._create_table_and_indexes, embeddings) - - # Add initial texts - if texts: - self.add_texts(texts, embeddings, **kwargs) - - def _create_table_and_indexes(self, embeddings: list[list[float]]): - """Create table and indexes (executed in write worker thread).""" - # Check if table already exists to avoid unnecessary index creation - if self._table_exists(): - logger.info("Table %s.%s already exists, skipping creation", self._config.schema_name, self._table_name) - return - - # Create table with vector and metadata columns - dimension = len(embeddings[0]) if embeddings else 768 - - create_table_sql = f""" - CREATE TABLE IF NOT EXISTS {self._config.schema_name}.{self._table_name} ( - id STRING NOT NULL COMMENT 'Unique document identifier', - {Field.CONTENT_KEY} STRING NOT NULL COMMENT 'Document text content for search and retrieval', - {Field.METADATA_KEY} JSON COMMENT 'Document metadata including source, type, and other attributes', - {Field.VECTOR} VECTOR(FLOAT, {dimension}) NOT NULL COMMENT - 'High-dimensional embedding vector for semantic similarity search', - PRIMARY KEY (id) - ) COMMENT 'Dify RAG knowledge base vector storage table for document embeddings and content' - """ - - with self.get_connection_context() as connection: - with connection.cursor() as cursor: - cursor.execute(create_table_sql) - logger.info("Created table %s.%s", self._config.schema_name, self._table_name) - - # Create vector index - self._create_vector_index(cursor) - - # Create inverted index for full-text search if enabled - if self._config.enable_inverted_index: - self._create_inverted_index(cursor) - - def _create_vector_index(self, cursor): - """Create HNSW vector index for similarity search.""" - # Use a fixed index name based on table and column name - index_name = f"idx_{self._table_name}_vector" - - # First check if an index already exists on this column - try: - cursor.execute(f"SHOW INDEX FROM {self._config.schema_name}.{self._table_name}") - existing_indexes = cursor.fetchall() - for idx in existing_indexes: - # Check if vector index already exists on the embedding column - if Field.VECTOR in str(idx).lower(): - logger.info("Vector index already exists on column %s", Field.VECTOR) - return - except (RuntimeError, ValueError) as e: - logger.warning("Failed to check existing indexes: %s", e) - - index_sql = f""" - CREATE VECTOR INDEX IF NOT EXISTS {index_name} - ON TABLE {self._config.schema_name}.{self._table_name}({Field.VECTOR}) - PROPERTIES ( - "distance.function" = "{self._config.vector_distance_function}", - "scalar.type" = "f32", - "m" = "16", - "ef.construction" = "128" - ) - """ - try: - cursor.execute(index_sql) - logger.info("Created vector index: %s", index_name) - except (RuntimeError, ValueError) as e: - error_msg = str(e).lower() - if "already exists" in error_msg or "already has index" in error_msg or "with the same type" in error_msg: - logger.info("Vector index already exists: %s", e) - else: - logger.exception("Failed to create vector index") - raise - - def _create_inverted_index(self, cursor): - """Create inverted index for full-text search.""" - # Use a fixed index name based on table name to avoid 
duplicates - index_name = f"idx_{self._table_name}_text" - - # Check if an inverted index already exists on this column - try: - cursor.execute(f"SHOW INDEX FROM {self._config.schema_name}.{self._table_name}") - existing_indexes = cursor.fetchall() - for idx in existing_indexes: - idx_str = str(idx).lower() - # More precise check: look for inverted index specifically on the content column - if ( - "inverted" in idx_str - and Field.CONTENT_KEY.lower() in idx_str - and (index_name.lower() in idx_str or f"idx_{self._table_name}_text" in idx_str) - ): - logger.info("Inverted index already exists on column %s: %s", Field.CONTENT_KEY, idx) - return - except (RuntimeError, ValueError) as e: - logger.warning("Failed to check existing indexes: %s", e) - - index_sql = f""" - CREATE INVERTED INDEX IF NOT EXISTS {index_name} - ON TABLE {self._config.schema_name}.{self._table_name} ({Field.CONTENT_KEY}) - PROPERTIES ( - "analyzer" = "{self._config.analyzer_type}", - "mode" = "{self._config.analyzer_mode}" - ) - """ - try: - cursor.execute(index_sql) - logger.info("Created inverted index: %s", index_name) - except (RuntimeError, ValueError) as e: - error_msg = str(e).lower() - # Handle ClickZetta specific error messages - if ( - "already exists" in error_msg - or "already has index" in error_msg - or "with the same type" in error_msg - or "cannot create inverted index" in error_msg - ) and "already has index" in error_msg: - logger.info("Inverted index already exists on column %s", Field.CONTENT_KEY) - # Try to get the existing index name for logging - try: - cursor.execute(f"SHOW INDEX FROM {self._config.schema_name}.{self._table_name}") - existing_indexes = cursor.fetchall() - for idx in existing_indexes: - if "inverted" in str(idx).lower() and Field.CONTENT_KEY.lower() in str(idx).lower(): - logger.info("Found existing inverted index: %s", idx) - break - except (RuntimeError, ValueError): - pass - else: - logger.warning("Failed to create inverted index: %s", e) - # Continue without inverted index - full-text search will fall back to LIKE - - def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs) -> list[str]: - """Add documents with embeddings to the collection.""" - if not documents: - return [] - - batch_size = self._config.batch_size - total_batches = (len(documents) + batch_size - 1) // batch_size - added_ids = [] - - for i in range(0, len(documents), batch_size): - batch_docs = documents[i : i + batch_size] - batch_embeddings = embeddings[i : i + batch_size] - batch_doc_ids = [] - for doc in batch_docs: - metadata = doc.metadata if isinstance(doc.metadata, dict) else {} - batch_doc_ids.append(self._safe_doc_id(metadata.get("doc_id", str(uuid.uuid4())))) - added_ids.extend(batch_doc_ids) - - # Execute batch insert through write queue - self._execute_write( - self._insert_batch, batch_docs, batch_embeddings, batch_doc_ids, i, batch_size, total_batches - ) - - return added_ids - - def _insert_batch( - self, - batch_docs: list[Document], - batch_embeddings: list[list[float]], - batch_doc_ids: list[str], - batch_index: int, - batch_size: int, - total_batches: int, - ): - """Insert a batch of documents using parameterized queries (executed in write worker thread).""" - if not batch_docs or not batch_embeddings: - logger.warning("Empty batch provided, skipping insertion") - return - - if len(batch_docs) != len(batch_embeddings): - logger.error("Mismatch between docs (%d) and embeddings (%d)", len(batch_docs), len(batch_embeddings)) - return - - # Prepare data for 
parameterized insertion - data_rows = [] - vector_dimension = len(batch_embeddings[0]) if batch_embeddings and batch_embeddings[0] else 768 - - for doc, embedding, doc_id in zip(batch_docs, batch_embeddings, batch_doc_ids): - # Optimized: minimal checks for common case, fallback for edge cases - metadata = doc.metadata if isinstance(doc.metadata, dict) else {} - - # Fast path for JSON serialization - try: - metadata_json = json.dumps(metadata, ensure_ascii=True) - except (TypeError, ValueError): - logger.warning("JSON serialization failed, using empty dict") - metadata_json = "{}" - - content = doc.page_content or "" - - # According to ClickZetta docs, vector should be formatted as array string - # for external systems: '[1.0, 2.0, 3.0]' - vector_str = "[" + ",".join(map(str, embedding)) + "]" - data_rows.append([doc_id, content, metadata_json, vector_str]) - - # Check if we have any valid data to insert - if not data_rows: - logger.warning("No valid documents to insert in batch %d/%d", batch_index // batch_size + 1, total_batches) - return - - # Use parameterized INSERT with executemany for better performance and security - # Cast JSON and VECTOR in SQL, pass raw data as parameters - columns = f"id, {Field.CONTENT_KEY}, {Field.METADATA_KEY}, {Field.VECTOR}" - insert_sql = ( - f"INSERT INTO {self._config.schema_name}.{self._table_name} ({columns}) " - f"VALUES (?, ?, CAST(? AS JSON), CAST(? AS VECTOR({vector_dimension})))" - ) - - with self.get_connection_context() as connection: - with connection.cursor() as cursor: - try: - # Set session-level hints for batch insert operations - # Note: executemany doesn't support hints parameter, so we set them as session variables - # Temporarily suppress ClickZetta client logging to reduce noise - clickzetta_logger = logging.getLogger("clickzetta") - original_level = clickzetta_logger.level - clickzetta_logger.setLevel(logging.WARNING) - - try: - cursor.execute("SET cz.sql.job.fast.mode = true") - cursor.execute("SET cz.sql.compaction.after.commit = true") - cursor.execute("SET cz.storage.always.prefetch.internal = true") - finally: - # Restore original logging level - clickzetta_logger.setLevel(original_level) - - cursor.executemany(insert_sql, data_rows) - logger.info( - "Inserted batch %d/%d (%d valid docs using parameterized query with VECTOR(%d) cast)", - batch_index // batch_size + 1, - total_batches, - len(data_rows), - vector_dimension, - ) - except (RuntimeError, ValueError, TypeError, ConnectionError): - logger.exception("Parameterized SQL execution failed for %d documents", len(data_rows)) - logger.exception("SQL template: %s", insert_sql) - logger.exception("Sample data row: %s", data_rows[0] if data_rows else "None") - raise - - def text_exists(self, id: str) -> bool: - """Check if a document exists by ID.""" - # Check if table exists first - if not self._table_exists(): - return False - - safe_id = self._safe_doc_id(id) - with self.get_connection_context() as connection: - with connection.cursor() as cursor: - cursor.execute( - f"SELECT COUNT(*) FROM {self._config.schema_name}.{self._table_name} WHERE id = ?", - binding_params=[safe_id], - ) - result = cursor.fetchone() - return result[0] > 0 if result else False - - def delete_by_ids(self, ids: list[str]): - """Delete documents by IDs.""" - if not ids: - return - - # Check if table exists before attempting delete - if not self._table_exists(): - logger.warning("Table %s.%s does not exist, skipping delete", self._config.schema_name, self._table_name) - return - - # Execute delete through 
write queue - self._execute_write(self._delete_by_ids_impl, ids) - - def _delete_by_ids_impl(self, ids: list[str]): - """Implementation of delete by IDs (executed in write worker thread).""" - safe_ids = [self._safe_doc_id(id) for id in ids] - - # Use parameterized query to prevent SQL injection - placeholders = ",".join("?" for _ in safe_ids) - sql = f"DELETE FROM {self._config.schema_name}.{self._table_name} WHERE id IN ({placeholders})" - - with self.get_connection_context() as connection: - with connection.cursor() as cursor: - cursor.execute(sql, binding_params=safe_ids) - - def delete_by_metadata_field(self, key: str, value: str): - """Delete documents by metadata field.""" - # Check if table exists before attempting delete - if not self._table_exists(): - logger.warning("Table %s.%s does not exist, skipping delete", self._config.schema_name, self._table_name) - return - - # Execute delete through write queue - self._execute_write(self._delete_by_metadata_field_impl, key, value) - - def _delete_by_metadata_field_impl(self, key: str, value: str): - """Implementation of delete by metadata field (executed in write worker thread).""" - with self.get_connection_context() as connection: - with connection.cursor() as cursor: - # Using JSON path to filter with parameterized query - # Note: JSON path requires literal key name, cannot be parameterized - # Use json_extract_string function for ClickZetta compatibility - sql = ( - f"DELETE FROM {self._config.schema_name}.{self._table_name} " - f"WHERE json_extract_string({Field.METADATA_KEY}, '$.{key}') = ?" - ) - cursor.execute(sql, binding_params=[value]) - - def search_by_vector(self, query_vector: list[float], **kwargs: Any) -> list[Document]: - """Search for documents by vector similarity.""" - # Check if table exists first - if not self._table_exists(): - logger.warning( - "Table %s.%s does not exist, returning empty results", - self._config.schema_name, - self._table_name, - ) - return [] - - top_k = kwargs.get("top_k", 10) - score_threshold = kwargs.get("score_threshold", 0.0) - document_ids_filter = kwargs.get("document_ids_filter") - - # Handle filter parameter from canvas (workflow) - _ = kwargs.get("filter", {}) - - # Build filter clause - filter_clauses = [] - if document_ids_filter: - safe_doc_ids = [str(id).replace("'", "''") for id in document_ids_filter] - doc_ids_str = ",".join(f"'{id}'" for id in safe_doc_ids) - # Use json_extract_string function for ClickZetta compatibility - filter_clauses.append(f"json_extract_string({Field.METADATA_KEY}, '$.document_id') IN ({doc_ids_str})") - - # No need for dataset_id filter since each dataset has its own table - - # Add distance threshold based on distance function - vector_dimension = len(query_vector) - if self._config.vector_distance_function == "cosine_distance": - # For cosine distance, smaller is better (0 = identical, 2 = opposite) - distance_func = "COSINE_DISTANCE" - if score_threshold > 0: - query_vector_str = f"CAST('[{self._format_vector_simple(query_vector)}]' AS VECTOR({vector_dimension}))" - filter_clauses.append(f"{distance_func}({Field.VECTOR}, {query_vector_str}) < {2 - score_threshold}") - else: - # For L2 distance, smaller is better - distance_func = "L2_DISTANCE" - if score_threshold > 0: - query_vector_str = f"CAST('[{self._format_vector_simple(query_vector)}]' AS VECTOR({vector_dimension}))" - filter_clauses.append(f"{distance_func}({Field.VECTOR}, {query_vector_str}) < {score_threshold}") - - where_clause = " AND ".join(filter_clauses) if filter_clauses else "1=1" 
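        # Illustration of the predicate assembled above (hypothetical numbers):
        # with cosine_distance and score_threshold=0.5 the filter becomes
        #     COSINE_DISTANCE(<vector>, CAST('[...]' AS VECTOR(n))) < 1.5
        # whereas l2_distance compares the raw distance to the threshold:
        #     L2_DISTANCE(<vector>, CAST('[...]' AS VECTOR(n))) < 0.5
        # Smaller distances rank better in both cases; the similarity score
        # surfaced to callers is recomputed from the distance further below.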
- - # Execute vector search query - query_vector_str = f"CAST('[{self._format_vector_simple(query_vector)}]' AS VECTOR({vector_dimension}))" - search_sql = f""" - SELECT id, {Field.CONTENT_KEY}, {Field.METADATA_KEY}, - {distance_func}({Field.VECTOR}, {query_vector_str}) AS distance - FROM {self._config.schema_name}.{self._table_name} - WHERE {where_clause} - ORDER BY distance - LIMIT {top_k} - """ - - documents = [] - with self.get_connection_context() as connection: - with connection.cursor() as cursor: - # Use hints parameter for vector search optimization - search_hints = { - "hints": { - "sdk.job.timeout": 60, # Increase timeout for vector search - "cz.sql.job.fast.mode": True, - "cz.storage.parquet.vector.index.read.memory.cache": True, - } - } - cursor.execute(search_sql, search_hints) - results = cursor.fetchall() - - for row in results: - # Parse metadata using centralized method - metadata = self._parse_metadata(row[2], row[0]) - - # Add score based on distance - if self._config.vector_distance_function == "cosine_distance": - metadata["score"] = 1 - (row[3] / 2) - else: - metadata["score"] = 1 / (1 + row[3]) - - doc = Document(page_content=row[1], metadata=metadata) - documents.append(doc) - - return documents - - def search_by_full_text(self, query: str, **kwargs: Any) -> list[Document]: - """Search for documents using full-text search with inverted index.""" - if not self._config.enable_inverted_index: - logger.warning("Full-text search is not enabled. Enable inverted index in config.") - return [] - - # Check if table exists first - if not self._table_exists(): - logger.warning( - "Table %s.%s does not exist, returning empty results", - self._config.schema_name, - self._table_name, - ) - return [] - - top_k = kwargs.get("top_k", 10) - document_ids_filter = kwargs.get("document_ids_filter") - - # Handle filter parameter from canvas (workflow) - _ = kwargs.get("filter", {}) - - # Build filter clause - filter_clauses = [] - if document_ids_filter: - safe_doc_ids = [str(id).replace("'", "''") for id in document_ids_filter] - doc_ids_str = ",".join(f"'{id}'" for id in safe_doc_ids) - # Use json_extract_string function for ClickZetta compatibility - filter_clauses.append(f"json_extract_string({Field.METADATA_KEY}, '$.document_id') IN ({doc_ids_str})") - - # No need for dataset_id filter since each dataset has its own table - - # Use match_all function for full-text search - # match_all requires all terms to be present - # Use simple quote escaping for MATCH_ALL since it needs to be in the WHERE clause - escaped_query = query.replace("'", "''") - filter_clauses.append(f"MATCH_ALL({Field.CONTENT_KEY}, '{escaped_query}')") - - where_clause = " AND ".join(filter_clauses) - - # Execute full-text search query - search_sql = f""" - SELECT id, {Field.CONTENT_KEY}, {Field.METADATA_KEY} - FROM {self._config.schema_name}.{self._table_name} - WHERE {where_clause} - LIMIT {top_k} - """ - - documents = [] - with self.get_connection_context() as connection: - with connection.cursor() as cursor: - try: - # Use hints parameter for full-text search optimization - fulltext_hints = { - "hints": { - "sdk.job.timeout": 30, # Timeout for full-text search - "cz.sql.job.fast.mode": True, - "cz.sql.index.prewhere.enabled": True, - } - } - cursor.execute(search_sql, fulltext_hints) - results = cursor.fetchall() - - for row in results: - # Parse metadata from JSON string (may be double-encoded) - try: - if row[2]: - metadata = json.loads(row[2]) - - # If result is a string, it's double-encoded JSON - parse 
again - if isinstance(metadata, str): - metadata = json.loads(metadata) - - if not isinstance(metadata, dict): - metadata = {} - else: - metadata = {} - except (json.JSONDecodeError, TypeError): - logger.exception("JSON parsing failed") - # Fallback: extract document_id with regex - - doc_id_match = re.search(r'"document_id":\s*"([^"]+)"', str(row[2] or "")) - metadata = {"document_id": doc_id_match.group(1)} if doc_id_match else {} - - # Ensure required fields are set - metadata["doc_id"] = row[0] # segment id - - # Ensure document_id exists (critical for Dify's format_retrieval_documents) - if "document_id" not in metadata: - metadata["document_id"] = row[0] # fallback to segment id - - # Add a relevance score for full-text search - metadata["score"] = 1.0 # Clickzetta doesn't provide relevance scores - doc = Document(page_content=row[1], metadata=metadata) - documents.append(doc) - except (RuntimeError, ValueError, TypeError, ConnectionError): - logger.exception("Full-text search failed") - # Fallback to LIKE search if full-text search fails - return self._search_by_like(query, **kwargs) - - return documents - - def _search_by_like(self, query: str, **kwargs: Any) -> list[Document]: - """Fallback search using LIKE operator.""" - # Check if table exists first - if not self._table_exists(): - logger.warning( - "Table %s.%s does not exist, returning empty results", - self._config.schema_name, - self._table_name, - ) - return [] - - top_k = kwargs.get("top_k", 10) - document_ids_filter = kwargs.get("document_ids_filter") - - # Handle filter parameter from canvas (workflow) - _ = kwargs.get("filter", {}) - - # Build filter clause - filter_clauses = [] - if document_ids_filter: - safe_doc_ids = [str(id).replace("'", "''") for id in document_ids_filter] - doc_ids_str = ",".join(f"'{id}'" for id in safe_doc_ids) - # Use json_extract_string function for ClickZetta compatibility - filter_clauses.append(f"json_extract_string({Field.METADATA_KEY}, '$.document_id') IN ({doc_ids_str})") - - # No need for dataset_id filter since each dataset has its own table - - # Escape special characters for LIKE clause to prevent SQL injection - from libs.helper import escape_like_pattern - - escaped_query = escape_like_pattern(query).replace("'", "''") - filter_clauses.append(f"{Field.CONTENT_KEY} LIKE '%{escaped_query}%' ESCAPE '\\\\'") - where_clause = " AND ".join(filter_clauses) - - search_sql = f""" - SELECT id, {Field.CONTENT_KEY}, {Field.METADATA_KEY} - FROM {self._config.schema_name}.{self._table_name} - WHERE {where_clause} - LIMIT {top_k} - """ - - documents = [] - with self.get_connection_context() as connection: - with connection.cursor() as cursor: - # Use hints parameter for LIKE search optimization - like_hints = { - "hints": { - "sdk.job.timeout": 20, # Timeout for LIKE search - "cz.sql.job.fast.mode": True, - } - } - cursor.execute(search_sql, like_hints) - results = cursor.fetchall() - - for row in results: - # Parse metadata using centralized method - metadata = self._parse_metadata(row[2], row[0]) - - metadata["score"] = 0.5 # Lower score for LIKE search - doc = Document(page_content=row[1], metadata=metadata) - documents.append(doc) - - return documents - - def delete(self): - """Delete the entire collection.""" - with self.get_connection_context() as connection: - with connection.cursor() as cursor: - cursor.execute(f"DROP TABLE IF EXISTS {self._config.schema_name}.{self._table_name}") - - def _format_vector_simple(self, vector: list[float]) -> str: - """Simple vector formatting for SQL 
queries.""" - return ",".join(map(str, vector)) - - def _safe_doc_id(self, doc_id: str) -> str: - """Ensure doc_id is safe for SQL and doesn't contain special characters.""" - if not doc_id: - return str(uuid.uuid4()) - # Remove or replace potentially problematic characters - safe_id = str(doc_id) - # Only allow alphanumeric, hyphens, underscores - safe_id = "".join(c for c in safe_id if c.isalnum() or c in "-_") - if not safe_id: # If all characters were removed - return str(uuid.uuid4()) - return safe_id[:255] # Limit length - - -class ClickzettaVectorFactory(AbstractVectorFactory): - """Factory for creating Clickzetta vector instances.""" - - def init_vector(self, dataset: Dataset, attributes: list, embeddings: Embeddings) -> BaseVector: - """Initialize a Clickzetta vector instance.""" - # Get configuration from environment variables or dataset config - config = ClickzettaConfig( - username=dify_config.CLICKZETTA_USERNAME or "", - password=dify_config.CLICKZETTA_PASSWORD or "", - instance=dify_config.CLICKZETTA_INSTANCE or "", - service=dify_config.CLICKZETTA_SERVICE or "api.clickzetta.com", - workspace=dify_config.CLICKZETTA_WORKSPACE or "quick_start", - vcluster=dify_config.CLICKZETTA_VCLUSTER or "default_ap", - schema_name=dify_config.CLICKZETTA_SCHEMA or "dify", - batch_size=dify_config.CLICKZETTA_BATCH_SIZE or 100, - enable_inverted_index=dify_config.CLICKZETTA_ENABLE_INVERTED_INDEX or True, - analyzer_type=dify_config.CLICKZETTA_ANALYZER_TYPE or "chinese", - analyzer_mode=dify_config.CLICKZETTA_ANALYZER_MODE or "smart", - vector_distance_function=dify_config.CLICKZETTA_VECTOR_DISTANCE_FUNCTION or "cosine_distance", - ) - - # Use dataset collection name as table name - collection_name = Dataset.gen_collection_name_by_id(dataset.id).lower() - - return ClickzettaVector(collection_name=collection_name, config=config) diff --git a/api/core/rag/datasource/vdb/vector_factory.py b/api/core/rag/datasource/vdb/vector_factory.py index cd12cd3fae..c48437e4a7 100644 --- a/api/core/rag/datasource/vdb/vector_factory.py +++ b/api/core/rag/datasource/vdb/vector_factory.py @@ -183,10 +183,6 @@ class Vector: from core.rag.datasource.vdb.matrixone.matrixone_vector import MatrixoneVectorFactory return MatrixoneVectorFactory - case VectorType.CLICKZETTA: - from core.rag.datasource.vdb.clickzetta.clickzetta_vector import ClickzettaVectorFactory - - return ClickzettaVectorFactory case VectorType.IRIS: from core.rag.datasource.vdb.iris.iris_vector import IrisVectorFactory diff --git a/api/core/rag/datasource/vdb/vector_type.py b/api/core/rag/datasource/vdb/vector_type.py index 9cce8e4c32..8872d4ca5b 100644 --- a/api/core/rag/datasource/vdb/vector_type.py +++ b/api/core/rag/datasource/vdb/vector_type.py @@ -32,6 +32,5 @@ class VectorType(StrEnum): TABLESTORE = "tablestore" HUAWEI_CLOUD = "huawei_cloud" MATRIXONE = "matrixone" - CLICKZETTA = "clickzetta" IRIS = "iris" HOLOGRES = "hologres" diff --git a/api/extensions/ext_storage.py b/api/extensions/ext_storage.py index db5a6e4812..cde7b62515 100644 --- a/api/extensions/ext_storage.py +++ b/api/extensions/ext_storage.py @@ -69,19 +69,6 @@ class Storage: from extensions.storage.supabase_storage import SupabaseStorage return SupabaseStorage - case StorageType.CLICKZETTA_VOLUME: - from extensions.storage.clickzetta_volume.clickzetta_volume_storage import ( - ClickZettaVolumeConfig, - ClickZettaVolumeStorage, - ) - - def create_clickzetta_volume_storage(): - # ClickZettaVolumeConfig will automatically read from environment variables - # and fallback to 
CLICKZETTA_* config if CLICKZETTA_VOLUME_* is not set - volume_config = ClickZettaVolumeConfig() - return ClickZettaVolumeStorage(volume_config) - - return create_clickzetta_volume_storage case _: raise ValueError(f"unsupported storage type {storage_type}") diff --git a/api/extensions/storage/clickzetta_volume/__init__.py b/api/extensions/storage/clickzetta_volume/__init__.py deleted file mode 100644 index 8a1588034b..0000000000 --- a/api/extensions/storage/clickzetta_volume/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -"""ClickZetta Volume storage implementation.""" - -from .clickzetta_volume_storage import ClickZettaVolumeStorage - -__all__ = ["ClickZettaVolumeStorage"] diff --git a/api/extensions/storage/clickzetta_volume/clickzetta_volume_storage.py b/api/extensions/storage/clickzetta_volume/clickzetta_volume_storage.py deleted file mode 100644 index 18eed4e481..0000000000 --- a/api/extensions/storage/clickzetta_volume/clickzetta_volume_storage.py +++ /dev/null @@ -1,527 +0,0 @@ -"""ClickZetta Volume Storage Implementation - -This module provides storage backend using ClickZetta Volume functionality. -Supports Table Volume, User Volume, and External Volume types. -""" - -import logging -import os -import tempfile -from collections.abc import Generator -from io import BytesIO -from pathlib import Path - -import clickzetta -from pydantic import BaseModel, model_validator - -from extensions.storage.base_storage import BaseStorage - -from .volume_permissions import VolumePermissionManager, check_volume_permission - -logger = logging.getLogger(__name__) - - -class ClickZettaVolumeConfig(BaseModel): - """Configuration for ClickZetta Volume storage.""" - - username: str = "" - password: str = "" - instance: str = "" - service: str = "api.clickzetta.com" - workspace: str = "quick_start" - vcluster: str = "default_ap" - schema_name: str = "dify" - volume_type: str = "table" # table|user|external - volume_name: str | None = None # For external volumes - table_prefix: str = "dataset_" # Prefix for table volume names - dify_prefix: str = "dify_km" # Directory prefix for User Volume - permission_check: bool = True # Enable/disable permission checking - - @model_validator(mode="before") - @classmethod - def validate_config(cls, values: dict): - """Validate the configuration values. - - This method will first try to use CLICKZETTA_VOLUME_* environment variables, - then fall back to CLICKZETTA_* environment variables (for vector DB config). 
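        Illustrative resolution order for a single field such as "username":

            1. an explicit "username" key already present in values
            2. the CLICKZETTA_VOLUME_USERNAME environment variable
            3. the CLICKZETTA_USERNAME environment variable (vector DB config)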
- """ - - # Helper function to get environment variable with fallback - def get_env_with_fallback(volume_key: str, fallback_key: str, default: str | None = None) -> str: - # First try CLICKZETTA_VOLUME_* specific config - volume_value = values.get(volume_key.lower().replace("clickzetta_volume_", "")) - if volume_value: - return str(volume_value) - - # Then try environment variables - volume_env = os.getenv(volume_key) - if volume_env: - return volume_env - - # Fall back to existing CLICKZETTA_* config - fallback_env = os.getenv(fallback_key) - if fallback_env: - return fallback_env - - return default or "" - - # Apply environment variables with fallback to existing CLICKZETTA_* config - values.setdefault("username", get_env_with_fallback("CLICKZETTA_VOLUME_USERNAME", "CLICKZETTA_USERNAME")) - values.setdefault("password", get_env_with_fallback("CLICKZETTA_VOLUME_PASSWORD", "CLICKZETTA_PASSWORD")) - values.setdefault("instance", get_env_with_fallback("CLICKZETTA_VOLUME_INSTANCE", "CLICKZETTA_INSTANCE")) - values.setdefault( - "service", get_env_with_fallback("CLICKZETTA_VOLUME_SERVICE", "CLICKZETTA_SERVICE", "api.clickzetta.com") - ) - values.setdefault( - "workspace", get_env_with_fallback("CLICKZETTA_VOLUME_WORKSPACE", "CLICKZETTA_WORKSPACE", "quick_start") - ) - values.setdefault( - "vcluster", get_env_with_fallback("CLICKZETTA_VOLUME_VCLUSTER", "CLICKZETTA_VCLUSTER", "default_ap") - ) - values.setdefault("schema_name", get_env_with_fallback("CLICKZETTA_VOLUME_SCHEMA", "CLICKZETTA_SCHEMA", "dify")) - - # Volume-specific configurations (no fallback to vector DB config) - values.setdefault("volume_type", os.getenv("CLICKZETTA_VOLUME_TYPE", "table")) - values.setdefault("volume_name", os.getenv("CLICKZETTA_VOLUME_NAME")) - values.setdefault("table_prefix", os.getenv("CLICKZETTA_VOLUME_TABLE_PREFIX", "dataset_")) - values.setdefault("dify_prefix", os.getenv("CLICKZETTA_VOLUME_DIFY_PREFIX", "dify_km")) - # Temporarily disable permission check feature, set directly to false - values.setdefault("permission_check", False) - - # Validate required fields - if not values.get("username"): - raise ValueError("CLICKZETTA_VOLUME_USERNAME or CLICKZETTA_USERNAME is required") - if not values.get("password"): - raise ValueError("CLICKZETTA_VOLUME_PASSWORD or CLICKZETTA_PASSWORD is required") - if not values.get("instance"): - raise ValueError("CLICKZETTA_VOLUME_INSTANCE or CLICKZETTA_INSTANCE is required") - - # Validate volume type - volume_type = values["volume_type"] - if volume_type not in ["table", "user", "external"]: - raise ValueError("CLICKZETTA_VOLUME_TYPE must be one of: table, user, external") - - if volume_type == "external" and not values.get("volume_name"): - raise ValueError("CLICKZETTA_VOLUME_NAME is required for external volume type") - - return values - - -class ClickZettaVolumeStorage(BaseStorage): - """ClickZetta Volume storage implementation.""" - - def __init__(self, config: ClickZettaVolumeConfig): - """Initialize ClickZetta Volume storage. 
- - Args: - config: ClickZetta Volume configuration - """ - self._config = config - self._connection = None - self._permission_manager: VolumePermissionManager | None = None - self._init_connection() - self._init_permission_manager() - - logger.info("ClickZetta Volume storage initialized with type: %s", config.volume_type) - - def _init_connection(self): - """Initialize ClickZetta connection.""" - try: - self._connection = clickzetta.connect( - username=self._config.username, - password=self._config.password, - instance=self._config.instance, - service=self._config.service, - workspace=self._config.workspace, - vcluster=self._config.vcluster, - schema=self._config.schema_name, - ) - logger.debug("ClickZetta connection established") - except Exception: - logger.exception("Failed to connect to ClickZetta") - raise - - def _init_permission_manager(self): - """Initialize permission manager.""" - try: - self._permission_manager = VolumePermissionManager( - self._connection, self._config.volume_type, self._config.volume_name - ) - logger.debug("Permission manager initialized") - except Exception: - logger.exception("Failed to initialize permission manager") - raise - - def _get_volume_path(self, filename: str, dataset_id: str | None = None) -> str: - """Get the appropriate volume path based on volume type.""" - if self._config.volume_type == "user": - # Add dify prefix for User Volume to organize files - return f"{self._config.dify_prefix}/{filename}" - elif self._config.volume_type == "table": - # Check if this should use User Volume (special directories) - if dataset_id in ["upload_files", "temp", "cache", "tools", "website_files", "privkeys"]: - # Use User Volume with dify prefix for special directories - return f"{self._config.dify_prefix}/{filename}" - - if dataset_id: - return f"{self._config.table_prefix}{dataset_id}/{filename}" - else: - # Extract dataset_id from filename if not provided - # Format: dataset_id/filename - if "/" in filename: - return filename - else: - raise ValueError("dataset_id is required for table volume or filename must include dataset_id/") - elif self._config.volume_type == "external": - return filename - else: - raise ValueError(f"Unsupported volume type: {self._config.volume_type}") - - def _get_volume_sql_prefix(self, dataset_id: str | None = None) -> str: - """Get SQL prefix for volume operations.""" - if self._config.volume_type == "user": - return "USER VOLUME" - elif self._config.volume_type == "table": - # For Dify's current file storage pattern, most files are stored in - # paths like "upload_files/tenant_id/uuid.ext", "tools/tenant_id/uuid.ext" - # These should use USER VOLUME for better compatibility - if dataset_id in ["upload_files", "temp", "cache", "tools", "website_files", "privkeys"]: - return "USER VOLUME" - - # Only use TABLE VOLUME for actual dataset-specific paths - # like "dataset_12345/file.pdf" or paths with dataset_ prefix - if dataset_id: - table_name = f"{self._config.table_prefix}{dataset_id}" - else: - # Default table name for generic operations - table_name = "default_dataset" - return f"TABLE VOLUME {table_name}" - elif self._config.volume_type == "external": - return f"VOLUME {self._config.volume_name}" - else: - raise ValueError(f"Unsupported volume type: {self._config.volume_type}") - - def _execute_sql(self, sql: str, fetch: bool = False): - """Execute SQL command.""" - try: - if self._connection is None: - raise RuntimeError("Connection not initialized") - with self._connection.cursor() as cursor: - cursor.execute(sql) - if 
fetch: - return cursor.fetchall() - return None - except Exception: - logger.exception("SQL execution failed: %s", sql) - raise - - def _ensure_table_volume_exists(self, dataset_id: str): - """Ensure table volume exists for the given dataset_id.""" - if self._config.volume_type != "table" or not dataset_id: - return - - # Skip for upload_files and other special directories that use USER VOLUME - if dataset_id in ["upload_files", "temp", "cache", "tools", "website_files", "privkeys"]: - return - - table_name = f"{self._config.table_prefix}{dataset_id}" - - try: - # Check if table exists - check_sql = f"SHOW TABLES LIKE '{table_name}'" - result = self._execute_sql(check_sql, fetch=True) - - if not result: - # Create table with volume - create_sql = f""" - CREATE TABLE {table_name} ( - id INT PRIMARY KEY AUTO_INCREMENT, - filename VARCHAR(255) NOT NULL, - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, - INDEX idx_filename (filename) - ) WITH VOLUME - """ - self._execute_sql(create_sql) - logger.info("Created table volume: %s", table_name) - - except Exception as e: - logger.warning("Failed to create table volume %s: %s", table_name, e) - # Don't raise exception, let the operation continue - # The table might exist but not be visible due to permissions - - def save(self, filename: str, data: bytes): - """Save data to ClickZetta Volume. - - Args: - filename: File path in volume - data: File content as bytes - """ - # Extract dataset_id from filename if present - dataset_id = None - if "/" in filename and self._config.volume_type == "table": - parts = filename.split("/", 1) - if parts[0].startswith(self._config.table_prefix): - dataset_id = parts[0][len(self._config.table_prefix) :] - filename = parts[1] - else: - dataset_id = parts[0] - filename = parts[1] - - # Ensure table volume exists (for table volumes) - if dataset_id: - self._ensure_table_volume_exists(dataset_id) - - # Check permissions (if enabled) - if self._config.permission_check: - # Skip permission check for special directories that use USER VOLUME - if dataset_id not in ["upload_files", "temp", "cache", "tools", "website_files", "privkeys"]: - if self._permission_manager is not None: - check_volume_permission(self._permission_manager, "save", dataset_id) - - # Write data to temporary file - with tempfile.NamedTemporaryFile(delete=False) as temp_file: - temp_file.write(data) - temp_file_path = temp_file.name - - try: - # Upload to volume - volume_prefix = self._get_volume_sql_prefix(dataset_id) - - # Get the actual volume path (may include dify_km prefix) - volume_path = self._get_volume_path(filename, dataset_id) - - # For User Volume, use the full path with dify_km prefix - if volume_prefix == "USER VOLUME": - sql = f"PUT '{temp_file_path}' TO {volume_prefix} FILE '{volume_path}'" - else: - sql = f"PUT '{temp_file_path}' TO {volume_prefix} FILE '{filename}'" - - self._execute_sql(sql) - logger.debug("File %s saved to ClickZetta Volume at path %s", filename, volume_path) - finally: - # Clean up temporary file - Path(temp_file_path).unlink(missing_ok=True) - - def load_once(self, filename: str) -> bytes: - """Load file content from ClickZetta Volume. 
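To make the SQL mapping concrete, here is the round-trip for a User Volume file as a sketch (file names invented; `storage` is the instance sketched earlier; the statements mirror the `PUT`/`GET` strings built in `save()` and `load_once()`):

```python
# save() stages the bytes in a temp file, then uploads it, roughly:
#   PUT '/tmp/tmpXXXX' TO USER VOLUME FILE 'dify_km/upload_files/abc.txt'
storage.save("upload_files/abc.txt", b"hello")

# load_once() downloads into a temp directory and reads the bytes back:
#   GET USER VOLUME FILE 'dify_km/upload_files/abc.txt' TO '/tmp/tmpYYYY'
assert storage.load_once("upload_files/abc.txt") == b"hello"
```

Note that `load_stream()` below is chunked only at the Python level: it calls `load_once()` first, so the whole file is buffered in memory before the 4 KiB chunks are yielded.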
- - Args: - filename: File path in volume - - Returns: - File content as bytes - """ - # Extract dataset_id from filename if present - dataset_id = None - if "/" in filename and self._config.volume_type == "table": - parts = filename.split("/", 1) - if parts[0].startswith(self._config.table_prefix): - dataset_id = parts[0][len(self._config.table_prefix) :] - filename = parts[1] - else: - dataset_id = parts[0] - filename = parts[1] - - # Check permissions (if enabled) - if self._config.permission_check: - # Skip permission check for special directories that use USER VOLUME - if dataset_id not in ["upload_files", "temp", "cache", "tools", "website_files", "privkeys"]: - if self._permission_manager is not None: - check_volume_permission(self._permission_manager, "load_once", dataset_id) - - # Download to temporary directory - with tempfile.TemporaryDirectory() as temp_dir: - volume_prefix = self._get_volume_sql_prefix(dataset_id) - - # Get the actual volume path (may include dify_km prefix) - volume_path = self._get_volume_path(filename, dataset_id) - - # For User Volume, use the full path with dify_km prefix - if volume_prefix == "USER VOLUME": - sql = f"GET {volume_prefix} FILE '{volume_path}' TO '{temp_dir}'" - else: - sql = f"GET {volume_prefix} FILE '{filename}' TO '{temp_dir}'" - - self._execute_sql(sql) - - # Find the downloaded file (may be in subdirectories) - downloaded_file = None - for root, _, files in os.walk(temp_dir): - for file in files: - if file == filename or file == os.path.basename(filename): - downloaded_file = Path(root) / file - break - if downloaded_file: - break - - if not downloaded_file or not downloaded_file.exists(): - raise FileNotFoundError(f"Downloaded file not found: {filename}") - - content = downloaded_file.read_bytes() - - logger.debug("File %s loaded from ClickZetta Volume", filename) - return content - - def load_stream(self, filename: str) -> Generator: - """Load file as stream from ClickZetta Volume. - - Args: - filename: File path in volume - - Yields: - File content chunks - """ - content = self.load_once(filename) - batch_size = 4096 - stream = BytesIO(content) - - while chunk := stream.read(batch_size): - yield chunk - - logger.debug("File %s loaded as stream from ClickZetta Volume", filename) - - def download(self, filename: str, target_filepath: str): - """Download file from ClickZetta Volume to local path. - - Args: - filename: File path in volume - target_filepath: Local target file path - """ - content = self.load_once(filename) - - Path(target_filepath).write_bytes(content) - - logger.debug("File %s downloaded from ClickZetta Volume to %s", filename, target_filepath) - - def exists(self, filename: str) -> bool: - """Check if file exists in ClickZetta Volume. 
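The existence probe issues a `LIST ... REGEXP` anchored to the exact name. A sketch with invented names (observe that the file name is interpolated unescaped, so regex metacharacters such as the `.` in an extension can in principle over-match):

```python
# For a table volume, exists("dataset_123/report.pdf") boils down to:
#   LIST TABLE VOLUME dataset_123 REGEXP = '^report.pdf$'
# and is True iff the result set is non-empty.
if storage.exists("dataset_123/report.pdf"):
    data = storage.load_once("dataset_123/report.pdf")
```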
- - Args: - filename: File path in volume - - Returns: - True if file exists, False otherwise - """ - try: - # Extract dataset_id from filename if present - dataset_id = None - if "/" in filename and self._config.volume_type == "table": - parts = filename.split("/", 1) - if parts[0].startswith(self._config.table_prefix): - dataset_id = parts[0][len(self._config.table_prefix) :] - filename = parts[1] - else: - dataset_id = parts[0] - filename = parts[1] - - volume_prefix = self._get_volume_sql_prefix(dataset_id) - - # Get the actual volume path (may include dify_km prefix) - volume_path = self._get_volume_path(filename, dataset_id) - - # For User Volume, use the full path with dify_km prefix - if volume_prefix == "USER VOLUME": - sql = f"LIST {volume_prefix} REGEXP = '^{volume_path}$'" - else: - sql = f"LIST {volume_prefix} REGEXP = '^{filename}$'" - - rows = self._execute_sql(sql, fetch=True) - - exists = len(rows) > 0 if rows else False - logger.debug("File %s exists check: %s", filename, exists) - return exists - except Exception as e: - logger.warning("Error checking file existence for %s: %s", filename, e) - return False - - def delete(self, filename: str): - """Delete file from ClickZetta Volume. - - Args: - filename: File path in volume - """ - if not self.exists(filename): - logger.debug("File %s not found, skip delete", filename) - return - - # Extract dataset_id from filename if present - dataset_id = None - if "/" in filename and self._config.volume_type == "table": - parts = filename.split("/", 1) - if parts[0].startswith(self._config.table_prefix): - dataset_id = parts[0][len(self._config.table_prefix) :] - filename = parts[1] - else: - dataset_id = parts[0] - filename = parts[1] - - volume_prefix = self._get_volume_sql_prefix(dataset_id) - - # Get the actual volume path (may include dify_km prefix) - volume_path = self._get_volume_path(filename, dataset_id) - - # For User Volume, use the full path with dify_km prefix - if volume_prefix == "USER VOLUME": - sql = f"REMOVE {volume_prefix} FILE '{volume_path}'" - else: - sql = f"REMOVE {volume_prefix} FILE '{filename}'" - - self._execute_sql(sql) - - logger.debug("File %s deleted from ClickZetta Volume", filename) - - def scan(self, path: str, files: bool = True, directories: bool = False) -> list[str]: - """Scan files and directories in ClickZetta Volume. 
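Usage sketch for the listing API (identifiers invented; `storage` as above): for table volumes the `path` argument is the dataset id, while User Volume listings get the `dify_km/` prefix added before the query and stripped from every returned path:

```python
# Runs LIST TABLE VOLUME dataset_123 under the hood:
names = storage.scan("123", files=True, directories=False)

# Special directories such as "upload_files" route to the User Volume:
#   LIST USER VOLUME SUBDIRECTORY 'dify_km'
uploads = storage.scan("upload_files", files=True)
```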
- - Args: - path: Path to scan (dataset_id for table volumes) - files: Include files in results - directories: Include directories in results - - Returns: - List of file/directory paths - """ - try: - # For table volumes, path is treated as dataset_id - dataset_id = None - if self._config.volume_type == "table": - dataset_id = path - path = "" # Root of the table volume - - volume_prefix = self._get_volume_sql_prefix(dataset_id) - - # For User Volume, add dify prefix to path - if volume_prefix == "USER VOLUME": - if path: - scan_path = f"{self._config.dify_prefix}/{path}" - sql = f"LIST {volume_prefix} SUBDIRECTORY '{scan_path}'" - else: - sql = f"LIST {volume_prefix} SUBDIRECTORY '{self._config.dify_prefix}'" - else: - if path: - sql = f"LIST {volume_prefix} SUBDIRECTORY '{path}'" - else: - sql = f"LIST {volume_prefix}" - - rows = self._execute_sql(sql, fetch=True) - - result = [] - if rows: - for row in rows: - file_path = row[0] # relative_path column - - # For User Volume, remove dify prefix from results - dify_prefix_with_slash = f"{self._config.dify_prefix}/" - if volume_prefix == "USER VOLUME" and file_path.startswith(dify_prefix_with_slash): - file_path = file_path[len(dify_prefix_with_slash) :] # Remove prefix - - if (files and not file_path.endswith("/")) or (directories and file_path.endswith("/")): - result.append(file_path) - - logger.debug("Scanned %d items in path %s", len(result), path) - return result - - except Exception: - logger.exception("Error scanning path %s", path) - return [] diff --git a/api/extensions/storage/clickzetta_volume/file_lifecycle.py b/api/extensions/storage/clickzetta_volume/file_lifecycle.py deleted file mode 100644 index 1d9911465b..0000000000 --- a/api/extensions/storage/clickzetta_volume/file_lifecycle.py +++ /dev/null @@ -1,518 +0,0 @@ -"""ClickZetta Volume file lifecycle management - -This module provides file lifecycle management for knowledge base files, -including version control, automatic cleanup, and backup/restore.
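A typical call sequence, sketched with invented data (`storage` is a ClickZettaVolumeStorage instance; note that in the implementation below, `cleanup_old_versions()` enforces only `max_versions` and the `max_age_days` argument is accepted but never applied):

```python
manager = FileLifecycleManager(storage, dataset_id="123")

meta = manager.save_with_lifecycle("doc.txt", b"v2", tags={"source": "upload"})
print(meta.version)                # 2 if "doc.txt" already had a version 1
manager.archive_file("doc.txt")    # flips the status to ARCHIVED in the metadata file
manager.cleanup_old_versions(max_versions=5, max_age_days=30)
```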
-""" - -from __future__ import annotations - -import json -import logging -import operator -from dataclasses import asdict, dataclass -from datetime import datetime -from enum import StrEnum, auto -from typing import Any - -logger = logging.getLogger(__name__) - - -class FileStatus(StrEnum): - """File status enumeration""" - - ACTIVE = auto() # Active status - ARCHIVED = auto() # Archived - DELETED = auto() # Deleted (soft delete) - BACKUP = auto() # Backup file - - -@dataclass -class FileMetadata: - """File metadata""" - - filename: str - size: int | None - created_at: datetime - modified_at: datetime - version: int | None - status: FileStatus - checksum: str | None = None - tags: dict[str, str] | None = None - parent_version: int | None = None - - def to_dict(self): - """Convert to dictionary format""" - data = asdict(self) - data["created_at"] = self.created_at.isoformat() - data["modified_at"] = self.modified_at.isoformat() - data["status"] = self.status.value - return data - - @classmethod - def from_dict(cls, data: dict) -> FileMetadata: - """Create instance from dictionary""" - data = data.copy() - data["created_at"] = datetime.fromisoformat(data["created_at"]) - data["modified_at"] = datetime.fromisoformat(data["modified_at"]) - data["status"] = FileStatus(data["status"]) - return cls(**data) - - -class FileLifecycleManager: - """File lifecycle manager""" - - def __init__(self, storage, dataset_id: str | None = None): - """Initialize lifecycle manager - - Args: - storage: ClickZetta Volume storage instance - dataset_id: Dataset ID (for Table Volume) - """ - self._storage = storage - self._dataset_id = dataset_id - self._metadata_file = ".dify_file_metadata.json" - self._version_prefix = ".versions/" - self._backup_prefix = ".backups/" - self._deleted_prefix = ".deleted/" - - # Get permission manager (if exists) - self._permission_manager: Any | None = getattr(storage, "_permission_manager", None) - - def save_with_lifecycle(self, filename: str, data: bytes, tags: dict[str, str] | None = None) -> FileMetadata: - """Save file and manage lifecycle - - Args: - filename: File name - data: File content - tags: File tags - - Returns: - File metadata - """ - # Permission check - if not self._check_permission(filename, "save"): - from .volume_permissions import VolumePermissionError - - raise VolumePermissionError( - f"Permission denied for lifecycle save operation on file: {filename}", - operation="save", - volume_type=getattr(self._storage, "_config", {}).get("volume_type", "unknown"), - dataset_id=self._dataset_id, - ) - - try: - # 1. Check if old version exists - metadata_dict = self._load_metadata() - current_metadata = metadata_dict.get(filename) - - # 2. If old version exists, create version backup - if current_metadata: - self._create_version_backup(filename, current_metadata) - - # 3. Calculate file information - now = datetime.now() - checksum = self._calculate_checksum(data) - new_version = (current_metadata["version"] + 1) if current_metadata else 1 - - # 4. Save new file - self._storage.save(filename, data) - - # 5. 
Create metadata - created_at = now - parent_version = None - - if current_metadata: - # If created_at is a string, convert it to datetime - if isinstance(current_metadata["created_at"], str): - created_at = datetime.fromisoformat(current_metadata["created_at"]) - else: - created_at = current_metadata["created_at"] - parent_version = current_metadata["version"] - - file_metadata = FileMetadata( - filename=filename, - size=len(data), - created_at=created_at, - modified_at=now, - version=new_version, - status=FileStatus.ACTIVE, - checksum=checksum, - tags=tags or {}, - parent_version=parent_version, - ) - - # 6. Update metadata - metadata_dict[filename] = file_metadata.to_dict() - self._save_metadata(metadata_dict) - - logger.info("File %s saved with lifecycle management, version %s", filename, new_version) - return file_metadata - - except Exception: - logger.exception("Failed to save file with lifecycle") - raise - - def get_file_metadata(self, filename: str) -> FileMetadata | None: - """Get file metadata - - Args: - filename: File name - - Returns: - File metadata, or None if the file does not exist - """ - try: - metadata_dict = self._load_metadata() - if filename in metadata_dict: - return FileMetadata.from_dict(metadata_dict[filename]) - return None - except Exception: - logger.exception("Failed to get file metadata for %s", filename) - return None - - def list_file_versions(self, filename: str) -> list[FileMetadata]: - """List all versions of a file - - Args: - filename: File name - - Returns: - List of file versions, sorted by version number (newest first) - """ - try: - versions = [] - - # Get current version - current_metadata = self.get_file_metadata(filename) - if current_metadata: - versions.append(current_metadata) - - # Get historical versions - try: - version_files = self._storage.scan(self._dataset_id or "", files=True) - for file_path in version_files: - if file_path.startswith(f"{self._version_prefix}{filename}.v"): - # Parse version number - version_str = file_path.split(".v")[-1].split(".")[0] - try: - _ = int(version_str) - # Simplified handling: the version file's metadata should really be - # loaded here; for now only the version suffix is validated - except ValueError: - continue - except Exception: - # If version files cannot be scanned, return only the current version - logger.exception("Failed to scan version files for %s", filename) - - return sorted(versions, key=lambda x: x.version or 0, reverse=True) - - except Exception: - logger.exception("Failed to list file versions for %s", filename) - return [] - - def restore_version(self, filename: str, version: int) -> bool: - """Restore a file to the specified version - - Args: - filename: File name - version: Version number to restore - - Returns: - True if the restore succeeded, False otherwise - """ - try: - version_filename = f"{self._version_prefix}{filename}.v{version}" - - # Check if the version file exists - if not self._storage.exists(version_filename): - logger.warning("Version %s of %s not found", version, filename) - return False - - # Read the version file content - version_data = self._storage.load_once(version_filename) - - # Save the current version as a backup - current_metadata = self.get_file_metadata(filename) - if current_metadata: - self._create_version_backup(filename, current_metadata.to_dict()) - - # Restore the file - self.save_with_lifecycle(filename, version_data, {"restored_from": str(version)}) - return True - - except Exception: - logger.exception("Failed to restore %s to version %s", filename, version) - return False - - def archive_file(self, filename: str) -> bool: -
"""Archive file - - Args: - filename: File name - - Returns: - Whether archive succeeded - """ - # Permission check - if not self._check_permission(filename, "archive"): - logger.warning("Permission denied for archive operation on file: %s", filename) - return False - - try: - # Update file status to archived - metadata_dict = self._load_metadata() - if filename not in metadata_dict: - logger.warning("File %s not found in metadata", filename) - return False - - metadata_dict[filename]["status"] = FileStatus.ARCHIVED - metadata_dict[filename]["modified_at"] = datetime.now().isoformat() - - self._save_metadata(metadata_dict) - - logger.info("File %s archived successfully", filename) - return True - - except Exception: - logger.exception("Failed to archive file %s", filename) - return False - - def soft_delete_file(self, filename: str) -> bool: - """Soft delete file (move to deleted directory) - - Args: - filename: File name - - Returns: - Whether delete succeeded - """ - # Permission check - if not self._check_permission(filename, "delete"): - logger.warning("Permission denied for soft delete operation on file: %s", filename) - return False - - try: - # Check if file exists - if not self._storage.exists(filename): - logger.warning("File %s not found", filename) - return False - - # Read file content - file_data = self._storage.load_once(filename) - - # Move to deleted directory - deleted_filename = f"{self._deleted_prefix}{filename}.{datetime.now().strftime('%Y%m%d_%H%M%S')}" - self._storage.save(deleted_filename, file_data) - - # Delete original file - self._storage.delete(filename) - - # Update metadata - metadata_dict = self._load_metadata() - if filename in metadata_dict: - metadata_dict[filename]["status"] = FileStatus.DELETED - metadata_dict[filename]["modified_at"] = datetime.now().isoformat() - self._save_metadata(metadata_dict) - - logger.info("File %s soft deleted successfully", filename) - return True - - except Exception: - logger.exception("Failed to soft delete file %s", filename) - return False - - def cleanup_old_versions(self, max_versions: int = 5, max_age_days: int = 30) -> int: - """Cleanup old version files - - Args: - max_versions: Maximum number of versions to keep - max_age_days: Maximum retention days for version files - - Returns: - Number of files cleaned - """ - try: - cleaned_count = 0 - - # Get all version files - try: - all_files = self._storage.scan(self._dataset_id or "", files=True) - version_files = [f for f in all_files if f.startswith(self._version_prefix)] - - # Group by file - file_versions: dict[str, list[tuple[int, str]]] = {} - for version_file in version_files: - # Parse filename and version - parts = version_file[len(self._version_prefix) :].split(".v") - if len(parts) >= 2: - base_filename = parts[0] - version_part = parts[1].split(".")[0] - try: - version_num = int(version_part) - if base_filename not in file_versions: - file_versions[base_filename] = [] - file_versions[base_filename].append((version_num, version_file)) - except ValueError: - continue - - # Cleanup old versions for each file - for base_filename, versions in file_versions.items(): - # Sort by version number - versions.sort(key=operator.itemgetter(0), reverse=True) - - # Keep the newest max_versions versions, delete the rest - if len(versions) > max_versions: - to_delete = versions[max_versions:] - for version_num, version_file in to_delete: - self._storage.delete(version_file) - cleaned_count += 1 - logger.debug("Cleaned old version: %s", version_file) - - logger.info("Cleaned %d 
old version files", cleaned_count) - - except Exception as e: - logger.warning("Could not scan for version files: %s", e) - - return cleaned_count - - except Exception: - logger.exception("Failed to cleanup old versions") - return 0 - - def get_storage_statistics(self) -> dict[str, Any]: - """Get storage statistics - - Returns: - Storage statistics dictionary - """ - try: - metadata_dict = self._load_metadata() - - stats: dict[str, Any] = { - "total_files": len(metadata_dict), - "active_files": 0, - "archived_files": 0, - "deleted_files": 0, - "total_size": 0, - "versions_count": 0, - "oldest_file": None, - "newest_file": None, - } - - oldest_date = None - newest_date = None - - for filename, metadata in metadata_dict.items(): - file_meta = FileMetadata.from_dict(metadata) - - # Count file status - if file_meta.status == FileStatus.ACTIVE: - stats["active_files"] = (stats["active_files"] or 0) + 1 - elif file_meta.status == FileStatus.ARCHIVED: - stats["archived_files"] = (stats["archived_files"] or 0) + 1 - elif file_meta.status == FileStatus.DELETED: - stats["deleted_files"] = (stats["deleted_files"] or 0) + 1 - - # Count size - stats["total_size"] = (stats["total_size"] or 0) + (file_meta.size or 0) - - # Count versions - stats["versions_count"] = (stats["versions_count"] or 0) + (file_meta.version or 0) - - # Find newest and oldest files - if oldest_date is None or file_meta.created_at < oldest_date: - oldest_date = file_meta.created_at - stats["oldest_file"] = filename - - if newest_date is None or file_meta.modified_at > newest_date: - newest_date = file_meta.modified_at - stats["newest_file"] = filename - - return stats - - except Exception: - logger.exception("Failed to get storage statistics") - return {} - - def _create_version_backup(self, filename: str, metadata: dict): - """Create version backup""" - try: - # Read current file content - current_data = self._storage.load_once(filename) - - # Save as version file - version_filename = f"{self._version_prefix}{filename}.v{metadata['version']}" - self._storage.save(version_filename, current_data) - - logger.debug("Created version backup: %s", version_filename) - - except Exception as e: - logger.warning("Failed to create version backup for %s: %s", filename, e) - - def _load_metadata(self) -> dict[str, Any]: - """Load metadata file""" - try: - if self._storage.exists(self._metadata_file): - metadata_content = self._storage.load_once(self._metadata_file) - result = json.loads(metadata_content.decode("utf-8")) - return dict(result) if result else {} - else: - return {} - except Exception as e: - logger.warning("Failed to load metadata: %s", e) - return {} - - def _save_metadata(self, metadata_dict: dict): - """Save metadata file""" - try: - metadata_content = json.dumps(metadata_dict, indent=2, ensure_ascii=False) - self._storage.save(self._metadata_file, metadata_content.encode("utf-8")) - logger.debug("Metadata saved successfully") - except Exception: - logger.exception("Failed to save metadata") - raise - - def _calculate_checksum(self, data: bytes) -> str: - """Calculate file checksum""" - import hashlib - - return hashlib.md5(data).hexdigest() - - def _check_permission(self, filename: str, operation: str) -> bool: - """Check file operation permission - - Args: - filename: File name - operation: Operation type - - Returns: - True if permission granted, False otherwise - """ - # If no permission manager, allow by default - if not self._permission_manager: - return True - - try: - # Map operation type to permission - 
operation_mapping = { - "save": "save", - "load": "load_once", - "delete": "delete", - "archive": "delete", # Archive requires delete permission - "restore": "save", # Restore requires write permission - "cleanup": "delete", # Cleanup requires delete permission - "read": "load_once", - "write": "save", - } - - mapped_operation = operation_mapping.get(operation, operation) - - # Check permission - result = self._permission_manager.validate_operation(mapped_operation, self._dataset_id) - return bool(result) - - except Exception: - logger.exception("Permission check failed for %s operation %s", filename, operation) - # Safe default: deny access when permission check fails - return False diff --git a/api/extensions/storage/clickzetta_volume/volume_permissions.py b/api/extensions/storage/clickzetta_volume/volume_permissions.py deleted file mode 100644 index 9d4ca689d8..0000000000 --- a/api/extensions/storage/clickzetta_volume/volume_permissions.py +++ /dev/null @@ -1,649 +0,0 @@ -"""ClickZetta Volume permission management mechanism - -This module provides Volume permission checking, validation and management features. -According to ClickZetta's permission model, different Volume types have different permission requirements. -""" - -import logging -from enum import StrEnum - -logger = logging.getLogger(__name__) - - -class VolumePermission(StrEnum): - """Volume permission type enumeration""" - - READ = "SELECT" # Corresponds to ClickZetta's SELECT permission - WRITE = "INSERT,UPDATE,DELETE" # Corresponds to ClickZetta's write permissions - LIST = "SELECT" # Listing files requires SELECT permission - DELETE = "INSERT,UPDATE,DELETE" # Deleting files requires write permissions - USAGE = "USAGE" # Basic permission required for External Volume - - -class VolumePermissionManager: - """Volume permission manager""" - - def __init__(self, connection_or_config, volume_type: str | None = None, volume_name: str | None = None): - """Initialize permission manager - - Args: - connection_or_config: ClickZetta connection object or configuration dictionary - volume_type: Volume type (user|table|external) - volume_name: Volume name (for external volume) - """ - # Support two initialization methods: connection object or configuration dictionary - if isinstance(connection_or_config, dict): - # Create connection from configuration dictionary - import clickzetta - - config = connection_or_config - self._connection = clickzetta.connect( - username=config.get("username"), - password=config.get("password"), - instance=config.get("instance"), - service=config.get("service"), - workspace=config.get("workspace"), - vcluster=config.get("vcluster"), - schema=config.get("schema") or config.get("database"), - ) - self._volume_type = config.get("volume_type", volume_type) - self._volume_name = config.get("volume_name", volume_name) - else: - # Use connection object directly - self._connection = connection_or_config - self._volume_type = volume_type - self._volume_name = volume_name - - if not self._connection: - raise ValueError("Valid connection or config is required") - if not self._volume_type: - raise ValueError("volume_type is required") - - self._permission_cache: dict[str, set[str]] = {} - self._current_username = None # Will get current username from connection - - def check_permission(self, operation: VolumePermission, dataset_id: str | None = None) -> bool: - """Check if user has permission to perform specific operation - - Args: - operation: Type of operation to perform - dataset_id: Dataset ID (for table volume) - - 
Returns: - True if user has permission, False otherwise - """ - try: - if self._volume_type == "user": - return self._check_user_volume_permission(operation) - elif self._volume_type == "table": - return self._check_table_volume_permission(operation, dataset_id) - elif self._volume_type == "external": - return self._check_external_volume_permission(operation) - else: - logger.warning("Unknown volume type: %s", self._volume_type) - return False - - except Exception: - logger.exception("Permission check failed") - return False - - def _check_user_volume_permission(self, operation: VolumePermission) -> bool: - """Check User Volume permission - - User Volume permission rules: - - User has full permissions on their own User Volume - - As long as user can connect to ClickZetta, they have basic User Volume permissions by default - - Focus more on connection authentication rather than complex permission checking - """ - try: - # Get current username - current_user = self._get_current_username() - - # Check basic connection status - with self._connection.cursor() as cursor: - # Simple connection test, if query can be executed user has basic permissions - cursor.execute("SELECT 1") - result = cursor.fetchone() - - if result: - logger.debug( - "User Volume permission check for %s, operation %s: granted (basic connection verified)", - current_user, - operation.name, - ) - return True - else: - logger.warning( - "User Volume permission check failed: cannot verify basic connection for %s", current_user - ) - return False - - except Exception: - logger.exception("User Volume permission check failed") - # For User Volume, if permission check fails, it might be a configuration issue, - # provide friendlier error message - logger.info("User Volume permission check failed, but permission checking is disabled in this version") - return False - - def _check_table_volume_permission(self, operation: VolumePermission, dataset_id: str | None) -> bool: - """Check Table Volume permission - - Table Volume permission rules: - - Table Volume permissions inherit from corresponding table permissions - - SELECT permission -> can READ/LIST files - - INSERT,UPDATE,DELETE permissions -> can WRITE/DELETE files - """ - if not dataset_id: - logger.warning("dataset_id is required for table volume permission check") - return False - - table_name = f"dataset_{dataset_id}" if not dataset_id.startswith("dataset_") else dataset_id - - try: - # Check table permissions - permissions = self._get_table_permissions(table_name) - required_permissions = set(operation.value.split(",")) - - # Check if has all required permissions - has_permission = required_permissions.issubset(permissions) - - logger.debug( - "Table Volume permission check for %s, operation %s: required=%s, has=%s, granted=%s", - table_name, - operation.name, - required_permissions, - permissions, - has_permission, - ) - - return has_permission - - except Exception: - logger.exception("Table volume permission check failed for %s", table_name) - return False - - def _check_external_volume_permission(self, operation: VolumePermission) -> bool: - """Check External Volume permission - - External Volume permission rules: - - Try to get permissions for External Volume - - If permission check fails, perform fallback verification - - For development environment, provide more lenient permission checking - """ - if not self._volume_name: - logger.warning("volume_name is required for external volume permission check") - return False - - try: - # Check External Volume permissions - 
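# (READ and LIST need {"read"}; WRITE and DELETE need {"write"}; see the mapping below.) -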
permissions = self._get_external_volume_permissions(self._volume_name) - - # External Volume permission mapping: determine required permissions based on operation type - required_permissions = set() - - if operation in [VolumePermission.READ, VolumePermission.LIST]: - required_permissions.add("read") - elif operation in [VolumePermission.WRITE, VolumePermission.DELETE]: - required_permissions.add("write") - - # Check if has all required permissions - has_permission = required_permissions.issubset(permissions) - - logger.debug( - "External Volume permission check for %s, operation %s: required=%s, has=%s, granted=%s", - self._volume_name, - operation.name, - required_permissions, - permissions, - has_permission, - ) - - # If permission check fails, try fallback verification - if not has_permission: - logger.info("Direct permission check failed for %s, trying fallback verification", self._volume_name) - - # Fallback verification: try listing Volume to verify basic access permissions - try: - with self._connection.cursor() as cursor: - cursor.execute("SHOW VOLUMES") - volumes = cursor.fetchall() - for volume in volumes: - if len(volume) > 0 and volume[0] == self._volume_name: - logger.info("Fallback verification successful for %s", self._volume_name) - return True - except Exception as fallback_e: - logger.warning("Fallback verification failed for %s: %s", self._volume_name, fallback_e) - - return has_permission - - except Exception: - logger.exception("External volume permission check failed for %s", self._volume_name) - logger.info("External Volume permission check failed, but permission checking is disabled in this version") - return False - - def _get_table_permissions(self, table_name: str) -> set[str]: - """Get user permissions for specified table - - Args: - table_name: Table name - - Returns: - Set of user permissions for this table - """ - cache_key = f"table:{table_name}" - - if cache_key in self._permission_cache: - return self._permission_cache[cache_key] - - permissions = set() - - try: - with self._connection.cursor() as cursor: - # Use correct ClickZetta syntax to check current user permissions - cursor.execute("SHOW GRANTS") - grants = cursor.fetchall() - - # Parse permission results, find permissions for this table - for grant in grants: - if len(grant) >= 3: # Typical format: (privilege, object_type, object_name, ...) 
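- # e.g. ("SELECT", "TABLE", "dataset_123", ...) or ("ALL", "SCHEMA", "dify", ...); - # hypothetical rows: "ALL" expands to all four DML privileges below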
- privilege = grant[0].upper() - object_type = grant[1].upper() if len(grant) > 1 else "" - object_name = grant[2] if len(grant) > 2 else "" - - # Check if it's permission for this table - if ( - object_type == "TABLE" - and object_name == table_name - or object_type == "SCHEMA" - and object_name in table_name - ): - if privilege in ["SELECT", "INSERT", "UPDATE", "DELETE", "ALL"]: - if privilege == "ALL": - permissions.update(["SELECT", "INSERT", "UPDATE", "DELETE"]) - else: - permissions.add(privilege) - - # If no explicit permissions found, try executing a simple query to verify permissions - if not permissions: - try: - cursor.execute(f"SELECT COUNT(*) FROM {table_name} LIMIT 1") - permissions.add("SELECT") - except Exception: - logger.debug("Cannot query table %s, no SELECT permission", table_name) - - except Exception as e: - logger.warning("Could not check table permissions for %s: %s", table_name, e) - # Safe default: deny access when permission check fails - pass - - # Cache permission information - self._permission_cache[cache_key] = permissions - return permissions - - def _get_current_username(self) -> str: - """Get current username""" - if self._current_username: - return self._current_username - - try: - with self._connection.cursor() as cursor: - cursor.execute("SELECT CURRENT_USER()") - result = cursor.fetchone() - if result: - self._current_username = result[0] - return str(self._current_username) - except Exception: - logger.exception("Failed to get current username") - - return "unknown" - - def _get_user_permissions(self, username: str) -> set[str]: - """Get user's basic permission set""" - cache_key = f"user_permissions:{username}" - - if cache_key in self._permission_cache: - return self._permission_cache[cache_key] - - permissions = set() - - try: - with self._connection.cursor() as cursor: - # Use correct ClickZetta syntax to check current user permissions - cursor.execute("SHOW GRANTS") - grants = cursor.fetchall() - - # Parse permission results, find user's basic permissions - for grant in grants: - if len(grant) >= 3: # Typical format: (privilege, object_type, object_name, ...) 
- privilege = grant[0].upper() - _ = grant[1].upper() if len(grant) > 1 else "" - - # Collect all relevant permissions - if privilege in ["SELECT", "INSERT", "UPDATE", "DELETE", "ALL"]: - if privilege == "ALL": - permissions.update(["SELECT", "INSERT", "UPDATE", "DELETE"]) - else: - permissions.add(privilege) - - except Exception as e: - logger.warning("Could not check user permissions for %s: %s", username, e) - # Safe default: deny access when permission check fails - pass - - # Cache permission information - self._permission_cache[cache_key] = permissions - return permissions - - def _get_external_volume_permissions(self, volume_name: str) -> set[str]: - """Get user permissions for specified External Volume - - Args: - volume_name: External Volume name - - Returns: - Set of user permissions for this Volume - """ - cache_key = f"external_volume:{volume_name}" - - if cache_key in self._permission_cache: - return self._permission_cache[cache_key] - - permissions = set() - - try: - with self._connection.cursor() as cursor: - # Use correct ClickZetta syntax to check Volume permissions - logger.info("Checking permissions for volume: %s", volume_name) - cursor.execute(f"SHOW GRANTS ON VOLUME {volume_name}") - grants = cursor.fetchall() - - logger.info("Raw grants result for %s: %s", volume_name, grants) - - # Parse permission results - # Format: (granted_type, privilege, conditions, granted_on, object_name, granted_to, - # grantee_name, grantor_name, grant_option, granted_time) - for grant in grants: - logger.info("Processing grant: %s", grant) - if len(grant) >= 5: - granted_type = grant[0] - privilege = grant[1].upper() - granted_on = grant[3] - object_name = grant[4] - - logger.info( - "Grant details - type: %s, privilege: %s, granted_on: %s, object_name: %s", - granted_type, - privilege, - granted_on, - object_name, - ) - - # Check if it's permission for this Volume or hierarchical permission - if ( - granted_type == "PRIVILEGE" and granted_on == "VOLUME" and object_name.endswith(volume_name) - ) or (granted_type == "OBJECT_HIERARCHY" and granted_on == "VOLUME"): - logger.info("Matching grant found for %s", volume_name) - - if "READ" in privilege: - permissions.add("read") - logger.info("Added READ permission for %s", volume_name) - if "WRITE" in privilege: - permissions.add("write") - logger.info("Added WRITE permission for %s", volume_name) - if "ALTER" in privilege: - permissions.add("alter") - logger.info("Added ALTER permission for %s", volume_name) - if privilege == "ALL": - permissions.update(["read", "write", "alter"]) - logger.info("Added ALL permissions for %s", volume_name) - - logger.info("Final permissions for %s: %s", volume_name, permissions) - - # If no explicit permissions found, try viewing Volume list to verify basic permissions - if not permissions: - try: - cursor.execute("SHOW VOLUMES") - volumes = cursor.fetchall() - for volume in volumes: - if len(volume) > 0 and volume[0] == volume_name: - permissions.add("read") # At least has read permission - logger.debug("Volume %s found in SHOW VOLUMES, assuming read permission", volume_name) - break - except Exception: - logger.debug("Cannot access volume %s, no basic permission", volume_name) - - except Exception as e: - logger.warning("Could not check external volume permissions for %s: %s", volume_name, e) - # When permission check fails, try basic Volume access verification - try: - with self._connection.cursor() as cursor: - cursor.execute("SHOW VOLUMES") - volumes = cursor.fetchall() - for volume in volumes: - if 
len(volume) > 0 and volume[0] == volume_name: - logger.info("Basic volume access verified for %s", volume_name) - permissions.add("read") - permissions.add("write") # Assume has write permission - break - except Exception as basic_e: - logger.warning("Basic volume access check failed for %s: %s", volume_name, basic_e) - # Last fallback: assume basic permissions - permissions.add("read") - - # Cache permission information - self._permission_cache[cache_key] = permissions - return permissions - - def clear_permission_cache(self): - """Clear permission cache""" - self._permission_cache.clear() - logger.debug("Permission cache cleared") - - @property - def volume_type(self) -> str | None: - """Get the volume type.""" - return self._volume_type - - def get_permission_summary(self, dataset_id: str | None = None) -> dict[str, bool]: - """Get permission summary - - Args: - dataset_id: Dataset ID (for table volume) - - Returns: - Permission summary dictionary - """ - summary = {} - - for operation in VolumePermission: - summary[operation.name.lower()] = self.check_permission(operation, dataset_id) - - return summary - - def check_inherited_permission(self, file_path: str, operation: VolumePermission) -> bool: - """Check permission inheritance for file path - - Args: - file_path: File path - operation: Operation to perform - - Returns: - True if user has permission, False otherwise - """ - try: - # Parse file path - path_parts = file_path.strip("/").split("/") - - if not path_parts: - logger.warning("Invalid file path for permission inheritance check") - return False - - # For Table Volume, first layer is dataset_id - if self._volume_type == "table": - if len(path_parts) < 1: - return False - - dataset_id = path_parts[0] - - # Check permissions for dataset - has_dataset_permission = self.check_permission(operation, dataset_id) - - if not has_dataset_permission: - logger.debug("Permission denied for dataset %s", dataset_id) - return False - - # Check path traversal attack - if self._contains_path_traversal(file_path): - logger.warning("Path traversal attack detected: %s", file_path) - return False - - # Check if accessing sensitive directory - if self._is_sensitive_path(file_path): - logger.warning("Access to sensitive path denied: %s", file_path) - return False - - logger.debug("Permission inherited for path %s", file_path) - return True - - elif self._volume_type == "user": - # User Volume permission inheritance - current_user = self._get_current_username() - - # Check if attempting to access other user's directory - if len(path_parts) > 1 and path_parts[0] != current_user: - logger.warning("User %s attempted to access %s's directory", current_user, path_parts[0]) - return False - - # Check basic permissions - return self.check_permission(operation) - - elif self._volume_type == "external": - # External Volume permission inheritance - # Check permissions for External Volume - return self.check_permission(operation) - - else: - logger.warning("Unknown volume type for permission inheritance: %s", self._volume_type) - return False - - except Exception: - logger.exception("Permission inheritance check failed") - return False - - def _contains_path_traversal(self, file_path: str) -> bool: - """Check if path contains path traversal attack""" - # Check common path traversal patterns - traversal_patterns = [ - "../", - "..\\", - "..%2f", - "..%2F", - "..%5c", - "..%5C", - "%2e%2e%2f", - "%2e%2e%5c", - "....//", - "....\\\\", - ] - - file_path_lower = file_path.lower() - - for pattern in traversal_patterns: 
- if pattern in file_path_lower: - return True - - # Check absolute path - if file_path.startswith("/") or file_path.startswith("\\"): - return True - - # Check Windows drive path - if len(file_path) >= 2 and file_path[1] == ":": - return True - - return False - - def _is_sensitive_path(self, file_path: str) -> bool: - """Check if path is sensitive path""" - sensitive_patterns = [ - "passwd", - "shadow", - "hosts", - "config", - "secrets", - "private", - "key", - "certificate", - "cert", - "ssl", - "database", - "backup", - "dump", - "log", - "tmp", - ] - - file_path_lower = file_path.lower() - - return any(pattern in file_path_lower for pattern in sensitive_patterns) - - def validate_operation(self, operation: str, dataset_id: str | None = None) -> bool: - """Validate operation permission - - Args: - operation: Operation name (save|load|exists|delete|scan) - dataset_id: Dataset ID - - Returns: - True if operation is allowed, False otherwise - """ - operation_mapping = { - "save": VolumePermission.WRITE, - "load": VolumePermission.READ, - "load_once": VolumePermission.READ, - "load_stream": VolumePermission.READ, - "download": VolumePermission.READ, - "exists": VolumePermission.READ, - "delete": VolumePermission.DELETE, - "scan": VolumePermission.LIST, - } - - if operation not in operation_mapping: - logger.warning("Unknown operation: %s", operation) - return False - - volume_permission = operation_mapping[operation] - return self.check_permission(volume_permission, dataset_id) - - -class VolumePermissionError(Exception): - """Volume permission error exception""" - - def __init__(self, message: str, operation: str, volume_type: str, dataset_id: str | None = None): - self.operation = operation - self.volume_type = volume_type - self.dataset_id = dataset_id - super().__init__(message) - - -def check_volume_permission(permission_manager: VolumePermissionManager, operation: str, dataset_id: str | None = None): - """Permission check decorator function - - Args: - permission_manager: Permission manager - operation: Operation name - dataset_id: Dataset ID - - Raises: - VolumePermissionError: If no permission - """ - if not permission_manager.validate_operation(operation, dataset_id): - error_message = f"Permission denied for operation '{operation}' on {permission_manager.volume_type} volume" - if dataset_id: - error_message += f" (dataset: {dataset_id})" - - raise VolumePermissionError( - error_message, - operation=operation, - volume_type=permission_manager.volume_type or "unknown", - dataset_id=dataset_id, - ) diff --git a/api/extensions/storage/storage_type.py b/api/extensions/storage/storage_type.py index baffa423b6..9bde601093 100644 --- a/api/extensions/storage/storage_type.py +++ b/api/extensions/storage/storage_type.py @@ -5,7 +5,6 @@ class StorageType(StrEnum): ALIYUN_OSS = "aliyun-oss" AZURE_BLOB = "azure-blob" BAIDU_OBS = "baidu-obs" - CLICKZETTA_VOLUME = "clickzetta-volume" GOOGLE_STORAGE = "google-storage" HUAWEI_OBS = "huawei-obs" LOCAL = "local" diff --git a/api/pyproject.toml b/api/pyproject.toml index f824fe7c23..c2db52d5d6 100644 --- a/api/pyproject.toml +++ b/api/pyproject.toml @@ -206,7 +206,6 @@ vdb = [ "alibabacloud_tea_openapi~=0.4.3", "chromadb==0.5.20", "clickhouse-connect~=0.14.1", - "clickzetta-connector-python>=0.8.102", "couchbase~=4.5.0", "elasticsearch==8.14.0", "opensearch-py==3.1.0", diff --git a/api/pyrefly-local-excludes.txt b/api/pyrefly-local-excludes.txt index 9a76de1927..25ba34e28f 100644 --- a/api/pyrefly-local-excludes.txt +++ 
b/api/pyrefly-local-excludes.txt @@ -49,7 +49,6 @@ core/rag/datasource/vdb/analyticdb/analyticdb_vector.py core/rag/datasource/vdb/analyticdb/analyticdb_vector_openapi.py core/rag/datasource/vdb/baidu/baidu_vector.py core/rag/datasource/vdb/chroma/chroma_vector.py -core/rag/datasource/vdb/clickzetta/clickzetta_vector.py core/rag/datasource/vdb/couchbase/couchbase_vector.py core/rag/datasource/vdb/elasticsearch/elasticsearch_vector.py core/rag/datasource/vdb/huawei/huawei_cloud_vector.py @@ -144,8 +143,6 @@ extensions/storage/aliyun_oss_storage.py extensions/storage/aws_s3_storage.py extensions/storage/azure_blob_storage.py extensions/storage/baidu_obs_storage.py -extensions/storage/clickzetta_volume/clickzetta_volume_storage.py -extensions/storage/clickzetta_volume/file_lifecycle.py extensions/storage/google_cloud_storage.py extensions/storage/huawei_obs_storage.py extensions/storage/opendal_storage.py diff --git a/api/pyrightconfig.json b/api/pyrightconfig.json index 48271aab61..0ad484b48b 100644 --- a/api/pyrightconfig.json +++ b/api/pyrightconfig.json @@ -28,7 +28,6 @@ "baidubce.auth.bce_credentials", "baidubce.bce_client_configuration", "baidubce.services.bos.bos_client", - "clickzetta", "google.cloud", "obs", "qcloud_cos", @@ -52,4 +51,4 @@ "reportAttributeAccessIssue": "hint", "pythonVersion": "3.11", "pythonPlatform": "All" -} \ No newline at end of file +} diff --git a/api/tests/integration_tests/storage/test_clickzetta_volume.py b/api/tests/integration_tests/storage/test_clickzetta_volume.py deleted file mode 100644 index 7e60f60adc..0000000000 --- a/api/tests/integration_tests/storage/test_clickzetta_volume.py +++ /dev/null @@ -1,168 +0,0 @@ -"""Integration tests for ClickZetta Volume Storage.""" - -import os -import tempfile -import unittest -from pathlib import Path - -import pytest - -from extensions.storage.clickzetta_volume.clickzetta_volume_storage import ( - ClickZettaVolumeConfig, - ClickZettaVolumeStorage, -) - - -class TestClickZettaVolumeStorage(unittest.TestCase): - """Test cases for ClickZetta Volume Storage.""" - - def setUp(self): - """Set up test environment.""" - self.config = ClickZettaVolumeConfig( - username=os.getenv("CLICKZETTA_USERNAME", "test_user"), - password=os.getenv("CLICKZETTA_PASSWORD", "test_pass"), - instance=os.getenv("CLICKZETTA_INSTANCE", "test_instance"), - service=os.getenv("CLICKZETTA_SERVICE", "uat-api.clickzetta.com"), - workspace=os.getenv("CLICKZETTA_WORKSPACE", "quick_start"), - vcluster=os.getenv("CLICKZETTA_VCLUSTER", "default_ap"), - schema_name=os.getenv("CLICKZETTA_SCHEMA", "dify"), - volume_type="table", - table_prefix="test_dataset_", - ) - - @pytest.mark.skipif(not os.getenv("CLICKZETTA_USERNAME"), reason="ClickZetta credentials not provided") - def test_user_volume_operations(self): - """Test basic operations with User Volume.""" - config = self.config - config.volume_type = "user" - - storage = ClickZettaVolumeStorage(config) - - # Test file operations - test_filename = "test_file.txt" - test_content = b"Hello, ClickZetta Volume!" 
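- # The block below exercises the full cycle: save -> exists -> load_once -> - # load_stream -> download -> scan -> delete.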
- - # Save file - storage.save(test_filename, test_content) - - # Check if file exists - assert storage.exists(test_filename) - - # Load file - loaded_content = storage.load_once(test_filename) - assert loaded_content == test_content - - # Test streaming - stream_content = b"" - for chunk in storage.load_stream(test_filename): - stream_content += chunk - assert stream_content == test_content - - # Test download - with tempfile.NamedTemporaryFile() as temp_file: - storage.download(test_filename, temp_file.name) - downloaded_content = Path(temp_file.name).read_bytes() - assert downloaded_content == test_content - - # Test scan - files = storage.scan("", files=True, directories=False) - assert test_filename in files - - # Delete file - storage.delete(test_filename) - assert not storage.exists(test_filename) - - @pytest.mark.skipif(not os.getenv("CLICKZETTA_USERNAME"), reason="ClickZetta credentials not provided") - def test_table_volume_operations(self): - """Test basic operations with Table Volume.""" - config = self.config - config.volume_type = "table" - - storage = ClickZettaVolumeStorage(config) - - # Test file operations with dataset_id - dataset_id = "12345" - test_filename = f"{dataset_id}/test_file.txt" - test_content = b"Hello, Table Volume!" - - # Save file - storage.save(test_filename, test_content) - - # Check if file exists - assert storage.exists(test_filename) - - # Load file - loaded_content = storage.load_once(test_filename) - assert loaded_content == test_content - - # Test scan for dataset - files = storage.scan(dataset_id, files=True, directories=False) - assert "test_file.txt" in files - - # Delete file - storage.delete(test_filename) - assert not storage.exists(test_filename) - - def test_config_validation(self): - """Test configuration validation.""" - # Test missing required fields - with pytest.raises(ValueError): - ClickZettaVolumeConfig( - username="", # Empty username should fail - password="pass", - instance="instance", - ) - - # Test invalid volume type - with pytest.raises(ValueError): - ClickZettaVolumeConfig(username="user", password="pass", instance="instance", volume_type="invalid_type") - - # Test external volume without volume_name - with pytest.raises(ValueError): - ClickZettaVolumeConfig( - username="user", - password="pass", - instance="instance", - volume_type="external", - # Missing volume_name - ) - - def test_volume_path_generation(self): - """Test volume path generation for different types.""" - storage = ClickZettaVolumeStorage(self.config) - - # Test table volume path - path = storage._get_volume_path("test.txt", "12345") - assert path == "test_dataset_12345/test.txt" - - # Test path with existing dataset_id prefix - path = storage._get_volume_path("12345/test.txt") - assert path == "12345/test.txt" - - # Test user volume - storage._config.volume_type = "user" - path = storage._get_volume_path("test.txt") - assert path == "test.txt" - - def test_sql_prefix_generation(self): - """Test SQL prefix generation for different volume types.""" - storage = ClickZettaVolumeStorage(self.config) - - # Test table volume SQL prefix - prefix = storage._get_volume_sql_prefix("12345") - assert prefix == "TABLE VOLUME test_dataset_12345" - - # Test user volume SQL prefix - storage._config.volume_type = "user" - prefix = storage._get_volume_sql_prefix() - assert prefix == "USER VOLUME" - - # Test external volume SQL prefix - storage._config.volume_type = "external" - storage._config.volume_name = "my_external_volume" - prefix = storage._get_volume_sql_prefix() - 
assert prefix == "VOLUME my_external_volume" - - -if __name__ == "__main__": - unittest.main() diff --git a/api/tests/integration_tests/vdb/clickzetta/README.md b/api/tests/integration_tests/vdb/clickzetta/README.md deleted file mode 100644 index c16dca8018..0000000000 --- a/api/tests/integration_tests/vdb/clickzetta/README.md +++ /dev/null @@ -1,25 +0,0 @@ -# Clickzetta Integration Tests - -## Running Tests - -To run the Clickzetta integration tests, you need to set the following environment variables: - -```bash -export CLICKZETTA_USERNAME=your_username -export CLICKZETTA_PASSWORD=your_password -export CLICKZETTA_INSTANCE=your_instance -export CLICKZETTA_SERVICE=api.clickzetta.com -export CLICKZETTA_WORKSPACE=your_workspace -export CLICKZETTA_VCLUSTER=your_vcluster -export CLICKZETTA_SCHEMA=dify -``` - -Then run the tests: - -```bash -pytest api/tests/integration_tests/vdb/clickzetta/ -``` - -## Security Note - -Never commit credentials to the repository. Always use environment variables or secure credential management systems. diff --git a/api/tests/integration_tests/vdb/clickzetta/test_clickzetta.py b/api/tests/integration_tests/vdb/clickzetta/test_clickzetta.py deleted file mode 100644 index 21de8be6e3..0000000000 --- a/api/tests/integration_tests/vdb/clickzetta/test_clickzetta.py +++ /dev/null @@ -1,223 +0,0 @@ -import contextlib -import os - -import pytest - -from core.rag.datasource.vdb.clickzetta.clickzetta_vector import ClickzettaConfig, ClickzettaVector -from core.rag.models.document import Document -from tests.integration_tests.vdb.test_vector_store import AbstractVectorTest, get_example_text, setup_mock_redis - - -class TestClickzettaVector(AbstractVectorTest): - """ - Test cases for Clickzetta vector database integration. - """ - - @pytest.fixture - def vector_store(self): - """Create a Clickzetta vector store instance for testing.""" - # Skip test if Clickzetta credentials are not configured - if not os.getenv("CLICKZETTA_USERNAME"): - pytest.skip("CLICKZETTA_USERNAME is not configured") - if not os.getenv("CLICKZETTA_PASSWORD"): - pytest.skip("CLICKZETTA_PASSWORD is not configured") - if not os.getenv("CLICKZETTA_INSTANCE"): - pytest.skip("CLICKZETTA_INSTANCE is not configured") - - config = ClickzettaConfig( - username=os.getenv("CLICKZETTA_USERNAME", ""), - password=os.getenv("CLICKZETTA_PASSWORD", ""), - instance=os.getenv("CLICKZETTA_INSTANCE", ""), - service=os.getenv("CLICKZETTA_SERVICE", "api.clickzetta.com"), - workspace=os.getenv("CLICKZETTA_WORKSPACE", "quick_start"), - vcluster=os.getenv("CLICKZETTA_VCLUSTER", "default_ap"), - schema=os.getenv("CLICKZETTA_SCHEMA", "dify_test"), - batch_size=10, # Small batch size for testing - enable_inverted_index=True, - analyzer_type="chinese", - analyzer_mode="smart", - vector_distance_function="cosine_distance", - ) - - with setup_mock_redis(): - vector = ClickzettaVector(collection_name="test_collection_" + str(os.getpid()), config=config) - - yield vector - - # Cleanup: delete the test collection - with contextlib.suppress(Exception): - vector.delete() - - def test_clickzetta_vector_basic_operations(self, vector_store): - """Test basic CRUD operations on Clickzetta vector store.""" - # Prepare test data - texts = [ - "这是第一个测试文档,包含一些中文内容。", - "This is the second test document with English content.", - "第三个文档混合了English和中文内容。", - ] - embeddings = [ - [0.1, 0.2, 0.3, 0.4], - [0.5, 0.6, 0.7, 0.8], - [0.9, 1.0, 1.1, 1.2], - ] - documents = [ - Document(page_content=text, metadata={"doc_id": f"doc_{i}", "source": "test"}) - for 
i, text in enumerate(texts) - ] - - # Test create (initial insert) - vector_store.create(texts=documents, embeddings=embeddings) - - # Test text_exists - assert vector_store.text_exists("doc_0") - assert not vector_store.text_exists("doc_999") - - # Test search_by_vector - query_vector = [0.1, 0.2, 0.3, 0.4] - results = vector_store.search_by_vector(query_vector, top_k=2) - assert len(results) > 0 - assert results[0].page_content == texts[0] # Should match the first document - - # Test search_by_full_text (Chinese) - results = vector_store.search_by_full_text("中文", top_k=3) - assert len(results) >= 2 # Should find documents with Chinese content - - # Test search_by_full_text (English) - results = vector_store.search_by_full_text("English", top_k=3) - assert len(results) >= 2 # Should find documents with English content - - # Test delete_by_ids - vector_store.delete_by_ids(["doc_0"]) - assert not vector_store.text_exists("doc_0") - assert vector_store.text_exists("doc_1") - - # Test delete_by_metadata_field - vector_store.delete_by_metadata_field("source", "test") - assert not vector_store.text_exists("doc_1") - assert not vector_store.text_exists("doc_2") - - def test_clickzetta_vector_advanced_search(self, vector_store): - """Test advanced search features of Clickzetta vector store.""" - # Prepare test data with more complex metadata - documents = [] - embeddings = [] - for i in range(10): - doc = Document( - page_content=f"Document {i}: " + get_example_text(), - metadata={ - "doc_id": f"adv_doc_{i}", - "category": "technical" if i % 2 == 0 else "general", - "document_id": f"doc_{i // 3}", # Group documents - "importance": i, - }, - ) - documents.append(doc) - # Create varied embeddings - embeddings.append([0.1 * i, 0.2 * i, 0.3 * i, 0.4 * i]) - - vector_store.create(texts=documents, embeddings=embeddings) - - # Test vector search with document filter - query_vector = [0.5, 1.0, 1.5, 2.0] - results = vector_store.search_by_vector(query_vector, top_k=5, document_ids_filter=["doc_0", "doc_1"]) - assert len(results) > 0 - # All results should belong to doc_0 or doc_1 groups - for result in results: - assert result.metadata["document_id"] in ["doc_0", "doc_1"] - - # Test score threshold - results = vector_store.search_by_vector(query_vector, top_k=10, score_threshold=0.5) - # Check that all results have a score above threshold - for result in results: - assert result.metadata.get("score", 0) >= 0.5 - - def test_clickzetta_batch_operations(self, vector_store): - """Test batch insertion operations.""" - # Prepare large batch of documents - batch_size = 25 - documents = [] - embeddings = [] - - for i in range(batch_size): - doc = Document( - page_content=f"Batch document {i}: This is a test document for batch processing.", - metadata={"doc_id": f"batch_doc_{i}", "batch": "test_batch"}, - ) - documents.append(doc) - embeddings.append([0.1 * (i % 10), 0.2 * (i % 10), 0.3 * (i % 10), 0.4 * (i % 10)]) - - # Test batch insert - vector_store.add_texts(documents=documents, embeddings=embeddings) - - # Verify all documents were inserted - for i in range(batch_size): - assert vector_store.text_exists(f"batch_doc_{i}") - - # Clean up - vector_store.delete_by_metadata_field("batch", "test_batch") - - def test_clickzetta_edge_cases(self, vector_store): - """Test edge cases and error handling.""" - # Test empty operations - vector_store.create(texts=[], embeddings=[]) - vector_store.add_texts(documents=[], embeddings=[]) - vector_store.delete_by_ids([]) - - # Test special characters in content - special_doc 
= Document( - page_content="Special chars: 'quotes', \"double\", \\backslash, \n newline", - metadata={"doc_id": "special_doc", "test": "edge_case"}, - ) - embeddings = [[0.1, 0.2, 0.3, 0.4]] - - vector_store.add_texts(documents=[special_doc], embeddings=embeddings) - assert vector_store.text_exists("special_doc") - - # Test search with special characters - results = vector_store.search_by_full_text("quotes", top_k=1) - if results: # Full-text search might not be available - assert len(results) > 0 - - # Clean up - vector_store.delete_by_ids(["special_doc"]) - - def test_clickzetta_full_text_search_modes(self, vector_store): - """Test different full-text search capabilities.""" - # Prepare documents with various language content - documents = [ - Document( - page_content="云器科技提供强大的Lakehouse解决方案", metadata={"doc_id": "cn_doc_1", "lang": "chinese"} - ), - Document( - page_content="Clickzetta provides powerful Lakehouse solutions", - metadata={"doc_id": "en_doc_1", "lang": "english"}, - ), - Document( - page_content="Lakehouse是现代数据架构的重要组成部分", metadata={"doc_id": "cn_doc_2", "lang": "chinese"} - ), - Document( - page_content="Modern data architecture includes Lakehouse technology", - metadata={"doc_id": "en_doc_2", "lang": "english"}, - ), - ] - - embeddings = [[0.1, 0.2, 0.3, 0.4] for _ in documents] - - vector_store.create(texts=documents, embeddings=embeddings) - - # Test Chinese full-text search - results = vector_store.search_by_full_text("Lakehouse", top_k=4) - assert len(results) >= 2 # Should find at least documents with "Lakehouse" - - # Test English full-text search - results = vector_store.search_by_full_text("solutions", top_k=2) - assert len(results) >= 1 # Should find English documents with "solutions" - - # Test mixed search - results = vector_store.search_by_full_text("数据架构", top_k=2) - assert len(results) >= 1 # Should find Chinese documents with this phrase - - # Clean up - vector_store.delete_by_metadata_field("lang", "chinese") - vector_store.delete_by_metadata_field("lang", "english") diff --git a/api/tests/integration_tests/vdb/clickzetta/test_docker_integration.py b/api/tests/integration_tests/vdb/clickzetta/test_docker_integration.py deleted file mode 100644 index 60e3f30f26..0000000000 --- a/api/tests/integration_tests/vdb/clickzetta/test_docker_integration.py +++ /dev/null @@ -1,165 +0,0 @@ -#!/usr/bin/env python3 -""" -Test Clickzetta integration in Docker environment -""" - -import os -import time - -import httpx -from clickzetta import connect - - -def test_clickzetta_connection(): - """Test direct connection to Clickzetta""" - print("=== Testing direct Clickzetta connection ===") - try: - conn = connect( - username=os.getenv("CLICKZETTA_USERNAME", "test_user"), - password=os.getenv("CLICKZETTA_PASSWORD", "test_password"), - instance=os.getenv("CLICKZETTA_INSTANCE", "test_instance"), - service=os.getenv("CLICKZETTA_SERVICE", "api.clickzetta.com"), - workspace=os.getenv("CLICKZETTA_WORKSPACE", "test_workspace"), - vcluster=os.getenv("CLICKZETTA_VCLUSTER", "default"), - database=os.getenv("CLICKZETTA_SCHEMA", "dify"), - ) - - with conn.cursor() as cursor: - # Test basic connectivity - cursor.execute("SELECT 1 as test") - result = cursor.fetchone() - print(f"✓ Connection test: {result}") - - # Check if our test table exists - cursor.execute("SHOW TABLES IN dify") - tables = cursor.fetchall() - print(f"✓ Existing tables: {[t[1] for t in tables if t[0] == 'dify']}") - - # Check if test collection exists - test_collection = "collection_test_dataset" - if test_collection 
in [t[1] for t in tables if t[0] == "dify"]:
-                cursor.execute(f"DESCRIBE dify.{test_collection}")
-                columns = cursor.fetchall()
-                print(f"✓ Table structure for {test_collection}:")
-                for col in columns:
-                    print(f"  - {col[0]}: {col[1]}")
-
-                # Check for indexes
-                cursor.execute(f"SHOW INDEXES IN dify.{test_collection}")
-                indexes = cursor.fetchall()
-                print(f"✓ Indexes on {test_collection}:")
-                for idx in indexes:
-                    print(f"  - {idx}")
-
-        return True
-    except Exception as e:
-        print(f"✗ Connection test failed: {e}")
-        return False
-
-
-def test_dify_api():
-    """Test Dify API with Clickzetta backend"""
-    print("\n=== Testing Dify API ===")
-    base_url = "http://localhost:5001"
-
-    # Wait for API to be ready
-    max_retries = 30
-    for i in range(max_retries):
-        try:
-            response = httpx.get(f"{base_url}/console/api/health")
-            if response.status_code == 200:
-                print("✓ Dify API is ready")
-                break
-        except Exception:
-            if i == max_retries - 1:
-                print("✗ Dify API is not responding")
-                return False
-            time.sleep(2)
-
-    # Check vector store configuration
-    try:
-        # This is a simplified check - in production, you'd use proper auth
-        print("✓ Dify is configured to use Clickzetta as vector store")
-        return True
-    except Exception as e:
-        print(f"✗ API test failed: {e}")
-        return False
-
-
-def verify_table_structure():
-    """Verify the table structure meets Dify requirements"""
-    print("\n=== Verifying Table Structure ===")
-
-    expected_columns = {
-        "id": "VARCHAR",
-        "page_content": "VARCHAR",
-        "metadata": "VARCHAR",  # JSON stored as VARCHAR in Clickzetta
-        "vector": "ARRAY",
-    }
-
-    expected_metadata_fields = ["doc_id", "doc_hash", "document_id", "dataset_id"]
-
-    print("✓ Expected table structure:")
-    for col, dtype in expected_columns.items():
-        print(f"  - {col}: {dtype}")
-
-    print("\n✓ Required metadata fields:")
-    for field in expected_metadata_fields:
-        print(f"  - {field}")
-
-    print("\n✓ Index requirements:")
-    print("  - Vector index (HNSW) on 'vector' column")
-    print("  - Full-text index on 'page_content' (optional)")
-    print("  - Functional index on metadata->>'$.doc_id' (recommended)")
-    print("  - Functional index on metadata->>'$.document_id' (recommended)")
-
-    return True
-
-
-def main():
-    """Run all tests"""
-    print("Starting Clickzetta integration tests for Dify Docker\n")
-
-    tests = [
-        ("Direct Clickzetta Connection", test_clickzetta_connection),
-        ("Dify API Status", test_dify_api),
-        ("Table Structure Verification", verify_table_structure),
-    ]
-
-    results = []
-    for test_name, test_func in tests:
-        try:
-            success = test_func()
-            results.append((test_name, success))
-        except Exception as e:
-            print(f"\n✗ {test_name} crashed: {e}")
-            results.append((test_name, False))
-
-    # Summary
-    print("\n" + "=" * 50)
-    print("Test Summary:")
-    print("=" * 50)
-
-    passed = sum(1 for _, success in results if success)
-    total = len(results)
-
-    for test_name, success in results:
-        status = "✅ PASSED" if success else "❌ FAILED"
-        print(f"{test_name}: {status}")
-
-    print(f"\nTotal: {passed}/{total} tests passed")
-
-    if passed == total:
-        print("\n🎉 All tests passed! Clickzetta is ready for Dify Docker deployment.")
-        print("\nNext steps:")
-        print("1. Run: cd docker && docker-compose -f docker-compose.yaml -f docker-compose.clickzetta.yaml up -d")
-        print("2. Access Dify at http://localhost:3000")
-        print("3. Create a dataset and test vector storage with Clickzetta")
-        return 0
-    else:
-        print("\n⚠️ Some tests failed. 
Please check the errors above.") - return 1 - - -if __name__ == "__main__": - exit(main()) diff --git a/api/uv.lock b/api/uv.lock index 8d7b38ecd0..1610d504c1 100644 --- a/api/uv.lock +++ b/api/uv.lock @@ -1167,25 +1167,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/01/c7/7b55d346952fcd8f0f491faca4449f607a04764fd23cada846dc93facb9e/clickhouse_connect-0.14.1-cp312-cp312-win_amd64.whl", hash = "sha256:c6bb2cce37041c90f8a3b1b380665acbaf252f125e401c13ce8f8df105378f69", size = 269353 }, ] -[[package]] -name = "clickzetta-connector-python" -version = "0.8.106" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "future" }, - { name = "numpy" }, - { name = "packaging" }, - { name = "pandas" }, - { name = "pyarrow" }, - { name = "python-dateutil" }, - { name = "requests" }, - { name = "sqlalchemy" }, - { name = "urllib3" }, -] -wheels = [ - { url = "https://files.pythonhosted.org/packages/23/38/749c708619f402d4d582dfa73fbeb64ade77b1f250a93bd064d2a1aa3776/clickzetta_connector_python-0.8.106-py3-none-any.whl", hash = "sha256:120d6700051d97609dbd6655c002ab3bc260b7c8e67d39dfc7191e749563f7b4", size = 78121 }, -] - [[package]] name = "cloudpathlib" version = "0.23.0" @@ -1712,7 +1693,6 @@ vdb = [ { name = "alibabacloud-tea-openapi" }, { name = "chromadb" }, { name = "clickhouse-connect" }, - { name = "clickzetta-connector-python" }, { name = "couchbase" }, { name = "elasticsearch" }, { name = "holo-search-sdk" }, @@ -1914,7 +1894,6 @@ vdb = [ { name = "alibabacloud-tea-openapi", specifier = "~=0.4.3" }, { name = "chromadb", specifier = "==0.5.20" }, { name = "clickhouse-connect", specifier = "~=0.14.1" }, - { name = "clickzetta-connector-python", specifier = ">=0.8.102" }, { name = "couchbase", specifier = "~=4.5.0" }, { name = "elasticsearch", specifier = "==8.14.0" }, { name = "holo-search-sdk", specifier = ">=0.4.1" }, @@ -5127,31 +5106,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/e0/a9/023730ba63db1e494a271cb018dcd361bd2c917ba7004c3e49d5daf795a2/py_cpuinfo-9.0.0-py3-none-any.whl", hash = "sha256:859625bc251f64e21f077d099d4162689c762b5d6a4c3c97553d56241c9674d5", size = 22335 }, ] -[[package]] -name = "pyarrow" -version = "14.0.2" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "numpy" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/d7/8b/d18b7eb6fb22e5ed6ffcbc073c85dae635778dbd1270a6cf5d750b031e84/pyarrow-14.0.2.tar.gz", hash = "sha256:36cef6ba12b499d864d1def3e990f97949e0b79400d08b7cf74504ffbd3eb025", size = 1063645 } -wheels = [ - { url = "https://files.pythonhosted.org/packages/94/8a/411ef0b05483076b7f548c74ccaa0f90c1e60d3875db71a821f6ffa8cf42/pyarrow-14.0.2-cp311-cp311-macosx_10_14_x86_64.whl", hash = "sha256:87482af32e5a0c0cce2d12eb3c039dd1d853bd905b04f3f953f147c7a196915b", size = 26904455 }, - { url = "https://files.pythonhosted.org/packages/6c/6c/882a57798877e3a49ba54d8e0540bea24aed78fb42e1d860f08c3449c75e/pyarrow-14.0.2-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:059bd8f12a70519e46cd64e1ba40e97eae55e0cbe1695edd95384653d7626b23", size = 23997116 }, - { url = "https://files.pythonhosted.org/packages/ec/3f/ef47fe6192ce4d82803a073db449b5292135406c364a7fc49dfbcd34c987/pyarrow-14.0.2-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:3f16111f9ab27e60b391c5f6d197510e3ad6654e73857b4e394861fc79c37200", size = 35944575 }, - { url = 
"https://files.pythonhosted.org/packages/1a/90/2021e529d7f234a3909f419d4341d53382541ef77d957fa274a99c533b18/pyarrow-14.0.2-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:06ff1264fe4448e8d02073f5ce45a9f934c0f3db0a04460d0b01ff28befc3696", size = 38079719 }, - { url = "https://files.pythonhosted.org/packages/30/a9/474caf5fd54a6d5315aaf9284c6e8f5d071ca825325ad64c53137b646e1f/pyarrow-14.0.2-cp311-cp311-manylinux_2_28_aarch64.whl", hash = "sha256:6dd4f4b472ccf4042f1eab77e6c8bce574543f54d2135c7e396f413046397d5a", size = 35429706 }, - { url = "https://files.pythonhosted.org/packages/d9/f8/cfba56f5353e51c19b0c240380ce39483f4c76e5c4aee5a000f3d75b72da/pyarrow-14.0.2-cp311-cp311-manylinux_2_28_x86_64.whl", hash = "sha256:32356bfb58b36059773f49e4e214996888eeea3a08893e7dbde44753799b2a02", size = 38001476 }, - { url = "https://files.pythonhosted.org/packages/43/3f/7bdf7dc3b3b0cfdcc60760e7880954ba99ccd0bc1e0df806f3dd61bc01cd/pyarrow-14.0.2-cp311-cp311-win_amd64.whl", hash = "sha256:52809ee69d4dbf2241c0e4366d949ba035cbcf48409bf404f071f624ed313a2b", size = 24576230 }, - { url = "https://files.pythonhosted.org/packages/69/5b/d8ab6c20c43b598228710e4e4a6cba03a01f6faa3d08afff9ce76fd0fd47/pyarrow-14.0.2-cp312-cp312-macosx_10_14_x86_64.whl", hash = "sha256:c87824a5ac52be210d32906c715f4ed7053d0180c1060ae3ff9b7e560f53f944", size = 26819585 }, - { url = "https://files.pythonhosted.org/packages/2d/29/bed2643d0dd5e9570405244a61f6db66c7f4704a6e9ce313f84fa5a3675a/pyarrow-14.0.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:a25eb2421a58e861f6ca91f43339d215476f4fe159eca603c55950c14f378cc5", size = 23965222 }, - { url = "https://files.pythonhosted.org/packages/2a/34/da464632e59a8cdd083370d69e6c14eae30221acb284f671c6bc9273fadd/pyarrow-14.0.2-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5c1da70d668af5620b8ba0a23f229030a4cd6c5f24a616a146f30d2386fec422", size = 35942036 }, - { url = "https://files.pythonhosted.org/packages/a8/ff/cbed4836d543b29f00d2355af67575c934999ff1d43e3f438ab0b1b394f1/pyarrow-14.0.2-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2cc61593c8e66194c7cdfae594503e91b926a228fba40b5cf25cc593563bcd07", size = 38089266 }, - { url = "https://files.pythonhosted.org/packages/38/41/345011cb831d3dbb2dab762fc244c745a5df94b199223a99af52a5f7dff6/pyarrow-14.0.2-cp312-cp312-manylinux_2_28_aarch64.whl", hash = "sha256:78ea56f62fb7c0ae8ecb9afdd7893e3a7dbeb0b04106f5c08dbb23f9c0157591", size = 35404468 }, - { url = "https://files.pythonhosted.org/packages/fd/af/2fc23ca2068ff02068d8dabf0fb85b6185df40ec825973470e613dbd8790/pyarrow-14.0.2-cp312-cp312-manylinux_2_28_x86_64.whl", hash = "sha256:37c233ddbce0c67a76c0985612fef27c0c92aef9413cf5aa56952f359fcb7379", size = 38003134 }, - { url = "https://files.pythonhosted.org/packages/95/1f/9d912f66a87e3864f694e000977a6a70a644ea560289eac1d733983f215d/pyarrow-14.0.2-cp312-cp312-win_amd64.whl", hash = "sha256:e4b123ad0f6add92de898214d404e488167b87b5dd86e9a434126bc2b7a5578d", size = 25043754 }, -] - [[package]] name = "pyasn1" version = "0.6.3"