diff --git a/api/core/rag/datasource/vdb/tidb_on_qdrant/tidb_service.py b/api/core/rag/datasource/vdb/tidb_on_qdrant/tidb_service.py index 118bd0b84e..8b425164e4 100644 --- a/api/core/rag/datasource/vdb/tidb_on_qdrant/tidb_service.py +++ b/api/core/rag/datasource/vdb/tidb_on_qdrant/tidb_service.py @@ -27,21 +27,29 @@ class TidbService: def fetch_qdrant_endpoint(api_url: str, public_key: str, private_key: str, cluster_id: str) -> str | None: """Fetch the qdrant endpoint for a cluster by calling the Get Cluster API. - The Get Cluster response contains ``status.connection_strings.standard.host`` - (e.g. ``gateway01.xx.tidbcloud.com``). We prepend ``qdrant-`` and wrap it - as an ``https://`` URL. + The v1beta1 serverless Get Cluster response contains + ``endpoints.public.host`` (e.g. ``gateway01.xx.tidbcloud.com``). + We prepend ``qdrant-`` and wrap it as an ``https://`` URL. """ try: + logger.info("Fetching qdrant endpoint for cluster %s", cluster_id) cluster_response = TidbService.get_tidb_serverless_cluster(api_url, public_key, private_key, cluster_id) if not cluster_response: + logger.warning("Empty response from Get Cluster API for cluster %s", cluster_id) return None - # v1beta: status.connection_strings.standard.host - status = cluster_response.get("status") or {} - connection_strings = status.get("connection_strings") or {} - standard = connection_strings.get("standard") or {} - host = standard.get("host") + # v1beta1 serverless: endpoints.public.host + endpoints = cluster_response.get("endpoints") or {} + public = endpoints.get("public") or {} + host = public.get("host") if host: - return f"https://qdrant-{host}" + qdrant_url = f"https://qdrant-{host}" + logger.info("Resolved qdrant endpoint for cluster %s: %s", cluster_id, qdrant_url) + return qdrant_url + logger.warning( + "No endpoints.public.host found for cluster %s, response keys: %s", + cluster_id, + list(cluster_response.keys()), + ) except Exception: logger.exception("Failed to fetch qdrant endpoint for cluster %s", cluster_id) return None @@ -83,6 +91,7 @@ class TidbService: "rootPassword": password, } + logger.info("Creating TiDB serverless cluster: display_name=%s, region=%s", display_name, region) response = _tidb_http_client.post( f"{api_url}/clusters", json=cluster_data, auth=DigestAuth(public_key, private_key) ) @@ -90,6 +99,7 @@ class TidbService: if response.status_code == 200: response_data = response.json() cluster_id = response_data["clusterId"] + logger.info("Cluster created, cluster_id=%s, waiting for ACTIVE state", cluster_id) retry_count = 0 max_retries = 30 while retry_count < max_retries: @@ -97,6 +107,10 @@ class TidbService: if cluster_response["state"] == "ACTIVE": user_prefix = cluster_response["userPrefix"] qdrant_endpoint = TidbService.fetch_qdrant_endpoint(api_url, public_key, private_key, cluster_id) + logger.info( + "Cluster %s is ACTIVE, user_prefix=%s, qdrant_endpoint=%s", + cluster_id, user_prefix, qdrant_endpoint, + ) return { "cluster_id": cluster_id, "cluster_name": display_name, @@ -104,9 +118,12 @@ class TidbService: "password": password, "qdrant_endpoint": qdrant_endpoint, } - time.sleep(30) # wait 30 seconds before retrying + logger.info("Cluster %s state=%s, retry %d/%d", cluster_id, cluster_response["state"], retry_count + 1, max_retries) + time.sleep(30) retry_count += 1 + logger.error("Cluster %s did not become ACTIVE after %d retries", cluster_id, max_retries) else: + logger.error("Failed to create cluster: status=%d, body=%s", response.status_code, response.text) response.raise_for_status() @staticmethod @@ -271,22 +288,30 @@ class TidbService: if response.status_code == 200: response_data = response.json() cluster_infos = [] + logger.info("Batch created %d clusters", len(response_data.get("clusters", []))) for item in response_data["clusters"]: cache_key = f"tidb_serverless_cluster_password:{item['displayName']}" cached_password = redis_client.get(cache_key) if not cached_password: + logger.warning("No cached password for cluster %s, skipping", item["displayName"]) continue + qdrant_endpoint = TidbService.fetch_qdrant_endpoint( + api_url, public_key, private_key, item["clusterId"] + ) + logger.info( + "Batch cluster %s: qdrant_endpoint=%s", + item["clusterId"], qdrant_endpoint, + ) cluster_info = { "cluster_id": item["clusterId"], "cluster_name": item["displayName"], "account": "root", "password": cached_password.decode("utf-8"), - "qdrant_endpoint": TidbService.fetch_qdrant_endpoint( - api_url, public_key, private_key, item["clusterId"] - ), + "qdrant_endpoint": qdrant_endpoint, } cluster_infos.append(cluster_info) return cluster_infos else: + logger.error("Batch create failed: status=%d, body=%s", response.status_code, response.text) response.raise_for_status() return [] diff --git a/api/providers/vdb/vdb-tidb-on-qdrant/src/dify_vdb_tidb_on_qdrant/tidb_on_qdrant_vector.py b/api/providers/vdb/vdb-tidb-on-qdrant/src/dify_vdb_tidb_on_qdrant/tidb_on_qdrant_vector.py index 4a5cfc0387..a7b14b84e7 100644 --- a/api/providers/vdb/vdb-tidb-on-qdrant/src/dify_vdb_tidb_on_qdrant/tidb_on_qdrant_vector.py +++ b/api/providers/vdb/vdb-tidb-on-qdrant/src/dify_vdb_tidb_on_qdrant/tidb_on_qdrant_vector.py @@ -1,4 +1,5 @@ import json +import logging import os import uuid from collections.abc import Generator, Iterable, Sequence @@ -7,6 +8,8 @@ from typing import TYPE_CHECKING, Any import httpx import qdrant_client + +logger = logging.getLogger(__name__) from flask import current_app from httpx import DigestAuth from pydantic import BaseModel @@ -421,13 +424,16 @@ class TidbOnQdrantVector(BaseVector): class TidbOnQdrantVectorFactory(AbstractVectorFactory): def init_vector(self, dataset: Dataset, attributes: list, embeddings: Embeddings) -> TidbOnQdrantVector: + logger.info("init_vector: tenant_id=%s, dataset_id=%s", dataset.tenant_id, dataset.id) stmt = select(TidbAuthBinding).where(TidbAuthBinding.tenant_id == dataset.tenant_id) tidb_auth_binding = db.session.scalars(stmt).one_or_none() if not tidb_auth_binding: + logger.info("No existing TidbAuthBinding for tenant %s, acquiring lock", dataset.tenant_id) with redis_client.lock("create_tidb_serverless_cluster_lock", timeout=900): stmt = select(TidbAuthBinding).where(TidbAuthBinding.tenant_id == dataset.tenant_id) tidb_auth_binding = db.session.scalars(stmt).one_or_none() if tidb_auth_binding: + logger.info("Found binding after lock: cluster_id=%s", tidb_auth_binding.cluster_id) TIDB_ON_QDRANT_API_KEY = f"{tidb_auth_binding.account}:{tidb_auth_binding.password}" else: @@ -437,12 +443,17 @@ class TidbOnQdrantVectorFactory(AbstractVectorFactory): .limit(1) ) if idle_tidb_auth_binding: + logger.info( + "Assigning idle cluster %s to tenant %s", + idle_tidb_auth_binding.cluster_id, dataset.tenant_id, + ) idle_tidb_auth_binding.active = True idle_tidb_auth_binding.tenant_id = dataset.tenant_id db.session.commit() tidb_auth_binding = idle_tidb_auth_binding TIDB_ON_QDRANT_API_KEY = f"{idle_tidb_auth_binding.account}:{idle_tidb_auth_binding.password}" else: + logger.info("No idle clusters available, creating new cluster for tenant %s", dataset.tenant_id) new_cluster = TidbService.create_tidb_serverless_cluster( dify_config.TIDB_PROJECT_ID or "", dify_config.TIDB_API_URL or "", @@ -451,6 +462,10 @@ class TidbOnQdrantVectorFactory(AbstractVectorFactory): dify_config.TIDB_PRIVATE_KEY or "", dify_config.TIDB_REGION or "", ) + logger.info( + "New cluster created: cluster_id=%s, qdrant_endpoint=%s", + new_cluster["cluster_id"], new_cluster.get("qdrant_endpoint"), + ) new_tidb_auth_binding = TidbAuthBinding( cluster_id=new_cluster["cluster_id"], cluster_name=new_cluster["cluster_name"], @@ -466,11 +481,18 @@ class TidbOnQdrantVectorFactory(AbstractVectorFactory): tidb_auth_binding = new_tidb_auth_binding TIDB_ON_QDRANT_API_KEY = f"{new_tidb_auth_binding.account}:{new_tidb_auth_binding.password}" else: + logger.info("Existing binding found: cluster_id=%s", tidb_auth_binding.cluster_id) TIDB_ON_QDRANT_API_KEY = f"{tidb_auth_binding.account}:{tidb_auth_binding.password}" qdrant_url = ( (tidb_auth_binding.qdrant_endpoint if tidb_auth_binding else None) or dify_config.TIDB_ON_QDRANT_URL or "" ) + logger.info( + "Using qdrant endpoint: %s (from_binding=%s, fallback_global=%s)", + qdrant_url, + tidb_auth_binding.qdrant_endpoint if tidb_auth_binding else None, + dify_config.TIDB_ON_QDRANT_URL, + ) if dataset.index_struct_dict: class_prefix: str = dataset.index_struct_dict["vector_store"]["class_prefix"]