Feat/merge main (#25785)

QuantumGhost 2025-09-16 19:33:22 +08:00 committed by GitHub
commit 3947945a6f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 236 additions and 394 deletions


```diff
@@ -1 +0,0 @@
-CLAUDE.md
```

AGENTS.md (new file, 87 lines)

@@ -0,0 +1,87 @@
# AGENTS.md
## Project Overview
Dify is an open-source platform for developing LLM applications with an intuitive interface combining agentic AI workflows, RAG pipelines, agent capabilities, and model management.
The codebase consists of:
- **Backend API** (`/api`): Python Flask application with Domain-Driven Design architecture
- **Frontend Web** (`/web`): Next.js 15 application with TypeScript and React 19
- **Docker deployment** (`/docker`): Containerized deployment configurations
## Development Commands
### Backend (API)
All Python commands must be prefixed with `uv run --project api`:
```bash
# Start development servers
./dev/start-api # Start API server
./dev/start-worker # Start Celery worker
# Run tests
uv run --project api pytest # Run all tests
uv run --project api pytest tests/unit_tests/ # Unit tests only
uv run --project api pytest tests/integration_tests/ # Integration tests
# Code quality
./dev/reformat # Run all formatters and linters
uv run --project api ruff check --fix ./ # Fix linting issues
uv run --project api ruff format ./ # Format code
uv run --directory api basedpyright # Type checking
```
### Frontend (Web)
```bash
cd web
pnpm lint # Run ESLint
pnpm eslint-fix # Fix ESLint issues
pnpm test # Run Jest tests
```
## Testing Guidelines
### Backend Testing
- Use `pytest` for all backend tests
- Write tests first (TDD approach)
- Test structure: Arrange-Act-Assert
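A minimal sketch of the Arrange-Act-Assert shape (the function under test is hypothetical, not from the Dify codebase):

```python
def normalize_tag(raw: str) -> str:
    """Hypothetical function under test."""
    return raw.strip().lower()

def test_normalize_tag_strips_and_lowercases():
    # Arrange
    raw = "  RAG Pipeline  "
    # Act
    result = normalize_tag(raw)
    # Assert
    assert result == "rag pipeline"
```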
## Code Style Requirements
### Python
- Use type hints for all functions and class attributes
- No `Any` types unless absolutely necessary
- Implement special methods (`__repr__`, `__str__`) appropriately
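An illustrative example of these rules (hypothetical class, not from the codebase): fully type-hinted fields, no `Any`, and a concise `__repr__` for debugging:

```python
from dataclasses import dataclass

@dataclass
class DocumentChunk:
    document_id: str
    position: int
    content: str

    def __repr__(self) -> str:
        # A user-defined __repr__ takes precedence over the dataclass-generated one
        return f"DocumentChunk(document_id={self.document_id!r}, position={self.position})"
```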
### TypeScript/JavaScript
- Strict TypeScript configuration
- ESLint with Prettier integration
- Avoid `any` type
## Important Notes
- **Environment Variables**: Always use UV for Python commands: `uv run --project api <command>`
- **Comments**: Only write meaningful comments that explain "why", not "what"
- **File Creation**: Always prefer editing existing files over creating new ones
- **Documentation**: Don't create documentation files unless explicitly requested
- **Code Quality**: Always run `./dev/reformat` before committing backend changes
## Common Development Tasks
### Adding a New API Endpoint
1. Create controller in `/api/controllers/`
2. Add service logic in `/api/services/`
3. Update routes in controller's `__init__.py`
4. Write tests in `/api/tests/`
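The controller/service split above can be sketched in plain Python (names like `fetch_app_summary` are hypothetical, and the Flask-RESTX wiring used by the real `/api` codebase is omitted):

```python
from __future__ import annotations

# services/ layer: business logic, no HTTP concerns
def fetch_app_summary(app_id: str) -> dict[str, str]:
    return {"id": app_id, "status": "running"}

# controllers/ layer: translates HTTP input/output, delegates to the service
def get_app_summary_endpoint(app_id: str) -> tuple[dict[str, str], int]:
    return fetch_app_summary(app_id), 200
```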
## Project-Specific Conventions
- All async tasks use Celery with Redis as broker
- **Internationalization**: Frontend supports multiple languages with English (`web/i18n/en-US/`) as the source. All user-facing text must use i18n keys, no hardcoded strings. Edit corresponding module files in `en-US/` directory for translations.


@@ -1,89 +0,0 @@
# CLAUDE.md
This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.
(The rest of the deleted CLAUDE.md content is identical to the AGENTS.md file added above.)

CLAUDE.md (symbolic link, 1 line)

@@ -0,0 +1 @@
AGENTS.md


```diff
@@ -328,7 +328,7 @@ MATRIXONE_DATABASE=dify
 LINDORM_URL=http://ld-*******************-proxy-search-pub.lindorm.aliyuncs.com:30070
 LINDORM_USERNAME=admin
 LINDORM_PASSWORD=admin
-USING_UGC_INDEX=False
+LINDORM_USING_UGC=True
 LINDORM_QUERY_TIMEOUT=1
 # OceanBase Vector configuration
```


```diff
@@ -19,15 +19,15 @@ class LindormConfig(BaseSettings):
         description="Lindorm password",
         default=None,
     )
-    DEFAULT_INDEX_TYPE: str | None = Field(
+    LINDORM_INDEX_TYPE: str | None = Field(
         description="Lindorm Vector Index Type, hnsw or flat is available in dify",
         default="hnsw",
     )
-    DEFAULT_DISTANCE_TYPE: str | None = Field(
+    LINDORM_DISTANCE_TYPE: str | None = Field(
         description="Vector Distance Type, support l2, cosinesimil, innerproduct", default="l2"
     )
-    USING_UGC_INDEX: bool | None = Field(
-        description="Using UGC index will store the same type of Index in a single index but can retrieve separately.",
-        default=False,
+    LINDORM_USING_UGC: bool | None = Field(
+        description="Using UGC index will store indexes with the same IndexType/Dimension in a single big index.",
+        default=True,
     )
     LINDORM_QUERY_TIMEOUT: float | None = Field(description="The lindorm search request timeout (s)", default=2.0)
```
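For illustration, the renamed `LINDORM_USING_UGC` setting could be parsed from the environment roughly like this (a stdlib-only sketch; the real config uses pydantic-settings `Field` declarations as shown in the hunk, with `True` as the new default):

```python
from __future__ import annotations

def read_lindorm_using_ugc(environ: dict[str, str]) -> bool:
    # Default mirrors the new Field default (True); accepts common truthy spellings
    raw = environ.get("LINDORM_USING_UGC", "True")
    return raw.strip().lower() in ("1", "true", "yes")
```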


```diff
@@ -514,7 +514,7 @@ class DraftWorkflowRunApi(Resource):
         raise InvokeRateLimitHttpError(ex.description)

-@console_ns.route("/apps/<uuid:app_id>/workflows/tasks/<string:task_id>/stop")
+@console_ns.route("/apps/<uuid:app_id>/workflow-runs/tasks/<string:task_id>/stop")
 class WorkflowTaskStopApi(Resource):
     @api.doc("stop_workflow_task")
     @api.doc(description="Stop running workflow task")
@@ -688,7 +688,7 @@ class PublishedWorkflowApi(Resource):
         }

-@console_ns.route("/apps/<uuid:app_id>/workflows/default-block-configs")
+@console_ns.route("/apps/<uuid:app_id>/workflows/default-workflow-block-configs")
 class DefaultBlockConfigsApi(Resource):
     @api.doc("get_default_block_configs")
     @api.doc(description="Get default block configurations for workflow")
@@ -714,7 +714,7 @@ class DefaultBlockConfigsApi(Resource):
         return workflow_service.get_default_block_configs()

-@console_ns.route("/apps/<uuid:app_id>/workflows/default-block-configs/<string:block_type>")
+@console_ns.route("/apps/<uuid:app_id>/workflows/default-workflow-block-configs/<string:block_type>")
 class DefaultBlockConfigApi(Resource):
     @api.doc("get_default_block_config")
     @api.doc(description="Get default block configuration by type")
@@ -797,7 +797,7 @@ class ConvertToWorkflowApi(Resource):
         }

-@console_ns.route("/apps/<uuid:app_id>/workflows/config")
+@console_ns.route("/apps/<uuid:app_id>/workflows/draft/config")
 class WorkflowConfigApi(Resource):
     """Resource for workflow configuration."""
@@ -815,7 +815,7 @@ class WorkflowConfigApi(Resource):
         }

-@console_ns.route("/apps/<uuid:app_id>/workflows/published")
+@console_ns.route("/apps/<uuid:app_id>/workflows")
 class PublishedAllWorkflowApi(Resource):
     @api.doc("get_all_published_workflows")
     @api.doc(description="Get all published workflows for an application")
@@ -871,7 +871,7 @@ class PublishedAllWorkflowApi(Resource):
         }

-@console_ns.route("/apps/<uuid:app_id>/workflows/<uuid:workflow_id>")
+@console_ns.route("/apps/<uuid:app_id>/workflows/<string:workflow_id>")
 class WorkflowByIdApi(Resource):
     @api.doc("update_workflow_by_id")
     @api.doc(description="Update workflow by ID")
```
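For clients tracking these console API changes, the renames in this file can be collected as an old-path-to-new-path map (purely illustrative; paths are quoted with the literal Flask converters used in the decorators):

```python
CONSOLE_ROUTE_RENAMES = {
    "/apps/<uuid:app_id>/workflows/tasks/<string:task_id>/stop":
        "/apps/<uuid:app_id>/workflow-runs/tasks/<string:task_id>/stop",
    "/apps/<uuid:app_id>/workflows/default-block-configs":
        "/apps/<uuid:app_id>/workflows/default-workflow-block-configs",
    "/apps/<uuid:app_id>/workflows/config":
        "/apps/<uuid:app_id>/workflows/draft/config",
    "/apps/<uuid:app_id>/workflows/published":
        "/apps/<uuid:app_id>/workflows",
}
```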


```diff
@@ -501,6 +501,7 @@ class DocumentIndexingEstimateApi(DocumentResource):
         return response, 200

+@console_ns.route("/datasets/<uuid:dataset_id>/batch/<string:batch>/indexing-estimate")
 class DocumentBatchIndexingEstimateApi(DocumentResource):
     @setup_required
     @login_required
@@ -594,6 +595,7 @@ class DocumentBatchIndexingEstimateApi(DocumentResource):
             raise IndexingEstimateError(str(e))

+@console_ns.route("/datasets/<uuid:dataset_id>/batch/<string:batch>/indexing-status")
 class DocumentBatchIndexingStatusApi(DocumentResource):
     @setup_required
     @login_required
@@ -913,6 +915,7 @@ class DocumentMetadataApi(DocumentResource):
         return {"result": "success", "message": "Document metadata updated."}, 200

+@console_ns.route("/datasets/<uuid:dataset_id>/documents/status/<string:action>/batch")
 class DocumentStatusApi(DocumentResource):
     @setup_required
     @login_required
@@ -949,6 +952,7 @@ class DocumentStatusApi(DocumentResource):
         return {"result": "success"}, 200

+@console_ns.route("/datasets/<uuid:dataset_id>/documents/<uuid:document_id>/processing/pause")
 class DocumentPauseApi(DocumentResource):
     @setup_required
     @login_required
@@ -982,6 +986,7 @@ class DocumentPauseApi(DocumentResource):
         return {"result": "success"}, 204

+@console_ns.route("/datasets/<uuid:dataset_id>/documents/<uuid:document_id>/processing/resume")
 class DocumentRecoverApi(DocumentResource):
     @setup_required
     @login_required
@@ -1012,6 +1017,7 @@ class DocumentRecoverApi(DocumentResource):
         return {"result": "success"}, 204

+@console_ns.route("/datasets/<uuid:dataset_id>/retry")
 class DocumentRetryApi(DocumentResource):
     @setup_required
     @login_required
@@ -1055,6 +1061,7 @@ class DocumentRetryApi(DocumentResource):
         return {"result": "success"}, 204

+@console_ns.route("/datasets/<uuid:dataset_id>/documents/<uuid:document_id>/rename")
 class DocumentRenameApi(DocumentResource):
     @setup_required
     @login_required
@@ -1078,6 +1085,7 @@ class DocumentRenameApi(DocumentResource):
         return document

+@console_ns.route("/datasets/<uuid:dataset_id>/documents/<uuid:document_id>/website-sync")
 class WebsiteDocumentSyncApi(DocumentResource):
     @setup_required
     @login_required
```
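The added decorators attach URL rules to the resource classes. A minimal stand-in shows what such a `route` decorator accomplishes (a simplified sketch, not the flask-restx implementation, and the resource class here is a bare placeholder):

```python
class _Namespace:
    """Toy namespace: route() registers a resource class under a URL rule."""

    def __init__(self) -> None:
        self.routes: dict[str, type] = {}

    def route(self, rule: str):
        def register(resource_cls: type) -> type:
            self.routes[rule] = resource_cls
            return resource_cls
        return register

console_ns = _Namespace()

@console_ns.route("/datasets/<uuid:dataset_id>/retry")
class DocumentRetryApi:  # simplified; the real class subclasses DocumentResource
    pass
```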


```diff
@@ -1,4 +1,3 @@
-import copy
 import json
 import logging
 import time
@@ -28,7 +27,7 @@ UGC_INDEX_PREFIX = "ugc_index"
 class LindormVectorStoreConfig(BaseModel):
-    hosts: str
+    hosts: str | None
     username: str | None = None
     password: str | None = None
     using_ugc: bool | None = False
@@ -46,7 +45,12 @@ class LindormVectorStoreConfig(BaseModel):
         return values

     def to_opensearch_params(self) -> dict[str, Any]:
-        params: dict[str, Any] = {"hosts": self.hosts}
+        params: dict[str, Any] = {
+            "hosts": self.hosts,
+            "use_ssl": False,
+            "pool_maxsize": 128,
+            "timeout": 30,
+        }
         if self.username and self.password:
             params["http_auth"] = (self.username, self.password)
         return params
```
```diff
@@ -54,18 +58,13 @@
 class LindormVectorStore(BaseVector):
     def __init__(self, collection_name: str, config: LindormVectorStoreConfig, using_ugc: bool, **kwargs):
-        self._routing = None
-        self._routing_field = None
+        self._routing: str | None = None
         if using_ugc:
             routing_value: str | None = kwargs.get("routing_value")
             if routing_value is None:
                 raise ValueError("UGC index should init vector with valid 'routing_value' parameter value")
             self._routing = routing_value.lower()
-            self._routing_field = ROUTING_FIELD
-            ugc_index_name = collection_name
-            super().__init__(ugc_index_name.lower())
-        else:
-            super().__init__(collection_name.lower())
+        super().__init__(collection_name.lower())
         self._client_config = config
         self._client = OpenSearch(**config.to_opensearch_params())
         self._using_ugc = using_ugc
```
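The routing rule in the new constructor can be isolated as a small function (an illustrative sketch: when UGC indexing is on, a `routing_value` is mandatory and is lowercased):

```python
from __future__ import annotations

def resolve_routing(using_ugc: bool, routing_value: str | None) -> str | None:
    if not using_ugc:
        return None
    if routing_value is None:
        raise ValueError("UGC index should init vector with valid 'routing_value' parameter value")
    return routing_value.lower()
```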
```diff
@@ -75,7 +74,8 @@ class LindormVectorStore(BaseVector):
         return VectorType.LINDORM

     def create(self, texts: list[Document], embeddings: list[list[float]], **kwargs):
-        self.create_collection(len(embeddings[0]), **kwargs)
+        metadatas = [d.metadata if d.metadata is not None else {} for d in texts]
+        self.create_collection(embeddings, metadatas)
         self.add_texts(texts, embeddings)

     def refresh(self):
```
```diff
@@ -120,7 +120,7 @@ class LindormVectorStore(BaseVector):
             for i in range(start_idx, end_idx):
                 action_header = {
                     "index": {
-                        "_index": self.collection_name.lower(),
+                        "_index": self.collection_name,
                         "_id": uuids[i],
                     }
                 }
```
```diff
@@ -131,14 +131,11 @@ class LindormVectorStore(BaseVector):
                 }
                 if self._using_ugc:
                     action_header["index"]["routing"] = self._routing
-                    if self._routing_field is not None:
-                        action_values[self._routing_field] = self._routing
+                    action_values[ROUTING_FIELD] = self._routing
                 actions.append(action_header)
                 actions.append(action_values)
-            # logger.info(f"Processing batch {batch_num + 1}/{num_batches} (documents {start_idx + 1} to {end_idx})")
             try:
                 _bulk_with_retry(actions)
                 # logger.info(f"Successfully processed batch {batch_num + 1}")
```
```diff
@@ -155,7 +152,7 @@ class LindormVectorStore(BaseVector):
             "query": {"bool": {"must": [{"term": {f"{Field.METADATA_KEY.value}.{key}.keyword": value}}]}}
         }
         if self._using_ugc:
-            query["query"]["bool"]["must"].append({"term": {f"{self._routing_field}.keyword": self._routing}})
+            query["query"]["bool"]["must"].append({"term": {f"{ROUTING_FIELD}.keyword": self._routing}})
         response = self._client.search(index=self._collection_name, body=query)
         if response["hits"]["hits"]:
             return [hit["_id"] for hit in response["hits"]["hits"]]
@@ -216,7 +213,7 @@ class LindormVectorStore(BaseVector):
     def delete(self):
         if self._using_ugc:
             routing_filter_query = {
-                "query": {"bool": {"must": [{"term": {f"{self._routing_field}.keyword": self._routing}}]}}
+                "query": {"bool": {"must": [{"term": {f"{ROUTING_FIELD}.keyword": self._routing}}]}}
             }
             self._client.delete_by_query(self._collection_name, body=routing_filter_query)
             self.refresh()
@@ -229,7 +226,7 @@ class LindormVectorStore(BaseVector):
     def text_exists(self, id: str) -> bool:
         try:
-            params = {}
+            params: dict[str, Any] = {}
             if self._using_ugc:
                 params["routing"] = self._routing
             self._client.get(index=self._collection_name, id=id, params=params)
```
```diff
@@ -244,20 +241,37 @@ class LindormVectorStore(BaseVector):
         if not all(isinstance(x, float) for x in query_vector):
             raise ValueError("All elements in query_vector should be floats")

-        top_k = kwargs.get("top_k", 3)
-        document_ids_filter = kwargs.get("document_ids_filter")
         filters = []
+        document_ids_filter = kwargs.get("document_ids_filter")
         if document_ids_filter:
             filters.append({"terms": {"metadata.document_id.keyword": document_ids_filter}})
-        query = default_vector_search_query(query_vector=query_vector, k=top_k, filters=filters, **kwargs)
+        if self._using_ugc:
+            filters.append({"term": {f"{ROUTING_FIELD}.keyword": self._routing}})
+
+        top_k = kwargs.get("top_k", 5)
+        search_query: dict[str, Any] = {
+            "size": top_k,
+            "_source": True,
+            "query": {"knn": {Field.VECTOR.value: {"vector": query_vector, "k": top_k}}},
+        }
+        final_ext: dict[str, Any] = {"lvector": {}}
+        if filters is not None and len(filters) > 0:
+            # when using filter, transform filter from List[Dict] to Dict as valid format
+            filter_dict = {"bool": {"must": filters}} if len(filters) > 1 else filters[0]
+            search_query["query"]["knn"][Field.VECTOR.value]["filter"] = filter_dict  # filter should be Dict
+            final_ext["lvector"]["filter_type"] = "pre_filter"
+        if final_ext != {"lvector": {}}:
+            search_query["ext"] = final_ext
+
         try:
             params = {"timeout": self._client_config.request_timeout}
             if self._using_ugc:
                 params["routing"] = self._routing  # type: ignore
-            response = self._client.search(index=self._collection_name, body=query, params=params)
+            response = self._client.search(index=self._collection_name, body=search_query, params=params)
         except Exception:
-            logger.exception("Error executing vector search, query: %s", query)
+            logger.exception("Error executing vector search, query: %s", search_query)
             raise

         docs_and_scores = []
```
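The inlined knn request body can be rebuilt in isolation (a sketch of the logic in this hunk: filters become a knn pre-filter via Lindorm's `lvector` ext; the field name `"vector"` is assumed here, where the real code uses `Field.VECTOR.value`):

```python
from __future__ import annotations
from typing import Any

def build_knn_query(query_vector: list[float], top_k: int, filters: list[dict]) -> dict[str, Any]:
    search_query: dict[str, Any] = {
        "size": top_k,
        "_source": True,
        "query": {"knn": {"vector": {"vector": query_vector, "k": top_k}}},
    }
    if filters:
        # single filter stays a dict; multiple filters are wrapped in a bool/must
        filter_dict = {"bool": {"must": filters}} if len(filters) > 1 else filters[0]
        search_query["query"]["knn"]["vector"]["filter"] = filter_dict
        search_query["ext"] = {"lvector": {"filter_type": "pre_filter"}}
    return search_query
```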
@ -283,283 +297,85 @@ class LindormVectorStore(BaseVector):
return docs return docs
def search_by_full_text(self, query: str, **kwargs: Any) -> list[Document]: def search_by_full_text(self, query: str, **kwargs: Any) -> list[Document]:
must = kwargs.get("must") full_text_query = {"query": {"bool": {"must": [{"match": {Field.CONTENT_KEY.value: query}}]}}}
must_not = kwargs.get("must_not") filters = []
should = kwargs.get("should")
minimum_should_match = kwargs.get("minimum_should_match", 0)
top_k = kwargs.get("top_k", 3)
filters = kwargs.get("filter", [])
document_ids_filter = kwargs.get("document_ids_filter") document_ids_filter = kwargs.get("document_ids_filter")
if document_ids_filter: if document_ids_filter:
filters.append({"terms": {"metadata.document_id.keyword": document_ids_filter}}) filters.append({"terms": {"metadata.document_id.keyword": document_ids_filter}})
routing = self._routing if self._using_ugc:
full_text_query = default_text_search_query( filters.append({"term": {f"{ROUTING_FIELD}.keyword": self._routing}})
query_text=query, if filters:
k=top_k, full_text_query["query"]["bool"]["filter"] = filters
text_field=Field.CONTENT_KEY.value,
must=must, try:
must_not=must_not, params: dict[str, Any] = {"timeout": self._client_config.request_timeout}
should=should, if self._using_ugc:
minimum_should_match=minimum_should_match, params["routing"] = self._routing
filters=filters, response = self._client.search(index=self._collection_name, body=full_text_query, params=params)
routing=routing, except Exception:
routing_field=self._routing_field, logger.exception("Error executing vector search, query: %s", full_text_query)
) raise
params = {"timeout": self._client_config.request_timeout}
response = self._client.search(index=self._collection_name, body=full_text_query, params=params)
docs = [] docs = []
for hit in response["hits"]["hits"]: for hit in response["hits"]["hits"]:
docs.append( metadata = hit["_source"].get(Field.METADATA_KEY.value)
Document( vector = hit["_source"].get(Field.VECTOR.value)
page_content=hit["_source"][Field.CONTENT_KEY.value], page_content = hit["_source"].get(Field.CONTENT_KEY.value)
vector=hit["_source"][Field.VECTOR.value], doc = Document(page_content=page_content, vector=vector, metadata=metadata)
metadata=hit["_source"][Field.METADATA_KEY.value], docs.append(doc)
)
)
return docs return docs
def create_collection(self, dimension: int, **kwargs): def create_collection(
self, embeddings: list, metadatas: list[dict] | None = None, index_params: dict | None = None
):
if not embeddings:
raise ValueError(f"Embeddings list cannot be empty for collection create '{self._collection_name}'")
lock_name = f"vector_indexing_lock_{self._collection_name}" lock_name = f"vector_indexing_lock_{self._collection_name}"
with redis_client.lock(lock_name, timeout=20): with redis_client.lock(lock_name, timeout=20):
collection_exist_cache_key = f"vector_indexing_{self._collection_name}" collection_exist_cache_key = f"vector_indexing_{self._collection_name}"
if redis_client.get(collection_exist_cache_key): if redis_client.get(collection_exist_cache_key):
logger.info("Collection %s already exists.", self._collection_name) logger.info("Collection %s already exists.", self._collection_name)
return return
if self._client.indices.exists(index=self._collection_name): if not self._client.indices.exists(index=self._collection_name):
logger.info("%s already exists.", self._collection_name.lower()) index_body = {
redis_client.set(collection_exist_cache_key, 1, ex=3600) "settings": {"index": {"knn": True, "knn_routing": self._using_ugc}},
return "mappings": {
if len(self.kwargs) == 0 and len(kwargs) != 0: "properties": {
self.kwargs = copy.deepcopy(kwargs) Field.CONTENT_KEY.value: {"type": "text"},
vector_field = kwargs.pop("vector_field", Field.VECTOR.value) Field.VECTOR.value: {
shards = kwargs.pop("shards", 4) "type": "knn_vector",
"dimension": len(embeddings[0]), # Make sure the dimension is correct here
engine = kwargs.pop("engine", "lvector") "method": {
method_name = kwargs.pop("method_name", dify_config.DEFAULT_INDEX_TYPE) "name": index_params.get("index_type", "hnsw")
space_type = kwargs.pop("space_type", dify_config.DEFAULT_DISTANCE_TYPE) if index_params
data_type = kwargs.pop("data_type", "float") else dify_config.LINDORM_INDEX_TYPE,
"space_type": index_params.get("space_type", "l2")
hnsw_m = kwargs.pop("hnsw_m", 24) if index_params
hnsw_ef_construction = kwargs.pop("hnsw_ef_construction", 500) else dify_config.LINDORM_DISTANCE_TYPE,
ivfpq_m = kwargs.pop("ivfpq_m", dimension) "engine": "lvector",
nlist = kwargs.pop("nlist", 1000) },
centroids_use_hnsw = kwargs.pop("centroids_use_hnsw", nlist >= 5000) },
centroids_hnsw_m = kwargs.pop("centroids_hnsw_m", 24) }
centroids_hnsw_ef_construct = kwargs.pop("centroids_hnsw_ef_construct", 500)
centroids_hnsw_ef_search = kwargs.pop("centroids_hnsw_ef_search", 100)
mapping = default_text_mapping(
dimension,
method_name,
space_type=space_type,
shards=shards,
engine=engine,
data_type=data_type,
vector_field=vector_field,
hnsw_m=hnsw_m,
hnsw_ef_construction=hnsw_ef_construction,
nlist=nlist,
ivfpq_m=ivfpq_m,
centroids_use_hnsw=centroids_use_hnsw,
centroids_hnsw_m=centroids_hnsw_m,
centroids_hnsw_ef_construct=centroids_hnsw_ef_construct,
centroids_hnsw_ef_search=centroids_hnsw_ef_search,
using_ugc=self._using_ugc,
**kwargs,
)
self._client.indices.create(index=self._collection_name.lower(), body=mapping)
redis_client.set(collection_exist_cache_key, 1, ex=3600)
# logger.info(f"create index success: {self._collection_name}")
def default_text_mapping(dimension: int, method_name: str, **kwargs: Any):
excludes_from_source = kwargs.get("excludes_from_source", False)
analyzer = kwargs.get("analyzer", "ik_max_word")
text_field = kwargs.get("text_field", Field.CONTENT_KEY.value)
engine = kwargs["engine"]
shard = kwargs["shards"]
space_type = kwargs.get("space_type")
if space_type is None:
if method_name == "hnsw":
space_type = "l2"
else:
space_type = "cosine"
data_type = kwargs["data_type"]
vector_field = kwargs.get("vector_field", Field.VECTOR.value)
using_ugc = kwargs.get("using_ugc", False)
if method_name == "ivfpq":
ivfpq_m = kwargs["ivfpq_m"]
nlist = kwargs["nlist"]
centroids_use_hnsw = nlist > 10000
centroids_hnsw_m = 24
centroids_hnsw_ef_construct = 500
centroids_hnsw_ef_search = 100
parameters = {
"m": ivfpq_m,
"nlist": nlist,
"centroids_use_hnsw": centroids_use_hnsw,
"centroids_hnsw_m": centroids_hnsw_m,
"centroids_hnsw_ef_construct": centroids_hnsw_ef_construct,
"centroids_hnsw_ef_search": centroids_hnsw_ef_search,
}
elif method_name == "hnsw":
neighbor = kwargs["hnsw_m"]
ef_construction = kwargs["hnsw_ef_construction"]
parameters = {"m": neighbor, "ef_construction": ef_construction}
elif method_name == "flat":
parameters = {}
else:
raise RuntimeError(f"unexpected method_name: {method_name}")
mapping = {
"settings": {"index": {"number_of_shards": shard, "knn": True}},
"mappings": {
"properties": {
vector_field: {
"type": "knn_vector",
"dimension": dimension,
"data_type": data_type,
"method": {
"engine": engine,
"name": method_name,
"space_type": space_type,
"parameters": parameters,
}, },
}, }
text_field: {"type": "text", "analyzer": analyzer}, logger.info("Creating Lindorm Search index %s", self._collection_name)
} self._client.indices.create(index=self._collection_name, body=index_body)
}, redis_client.set(collection_exist_cache_key, 1, ex=3600)
}
if excludes_from_source:
# e.g. {"excludes": ["vector_field"]}
mapping["mappings"]["_source"] = {"excludes": [vector_field]}
if using_ugc and method_name == "ivfpq":
mapping["settings"]["index"]["knn_routing"] = True
mapping["settings"]["index"]["knn.offline.construction"] = True
elif (using_ugc and method_name == "hnsw") or (using_ugc and method_name == "flat"):
mapping["settings"]["index"]["knn_routing"] = True
return mapping
def default_text_search_query(
query_text: str,
k: int = 4,
text_field: str = Field.CONTENT_KEY.value,
must: list[dict] | None = None,
must_not: list[dict] | None = None,
should: list[dict] | None = None,
minimum_should_match: int = 0,
filters: list[dict] | None = None,
routing: str | None = None,
routing_field: str | None = None,
**kwargs,
):
query_clause: dict[str, Any] = {}
if routing is not None:
query_clause = {
"bool": {"must": [{"match": {text_field: query_text}}, {"term": {f"{routing_field}.keyword": routing}}]}
}
else:
query_clause = {"match": {text_field: query_text}}
# build the simplest search_query when only query_text is specified
if not must and not must_not and not should and not filters:
search_query = {"size": k, "query": query_clause}
return search_query
# build complex search_query when either of must/must_not/should/filter is specified
if must:
if not isinstance(must, list):
raise RuntimeError(f"unexpected [must] clause with {type(filters)}")
if query_clause not in must:
must.append(query_clause)
else:
must = [query_clause]
boolean_query: dict[str, Any] = {"must": must}
if must_not:
if not isinstance(must_not, list):
raise RuntimeError(f"unexpected [must_not] clause with {type(filters)}")
boolean_query["must_not"] = must_not
if should:
if not isinstance(should, list):
raise RuntimeError(f"unexpected [should] clause with {type(filters)}")
boolean_query["should"] = should
if minimum_should_match != 0:
boolean_query["minimum_should_match"] = minimum_should_match
if filters:
if not isinstance(filters, list):
raise RuntimeError(f"unexpected [filter] clause with {type(filters)}")
boolean_query["filter"] = filters
search_query = {"size": k, "query": {"bool": boolean_query}}
return search_query

def default_vector_search_query(
    query_vector: list[float],
    k: int = 4,
    min_score: str = "0.0",
    ef_search: str | None = None,  # only for hnsw
    nprobe: str | None = None,  # "2000"
    reorder_factor: str | None = None,  # "20"
    client_refactor: str | None = None,  # "true"
    vector_field: str = Field.VECTOR.value,
    filters: list[dict] | None = None,
    filter_type: str | None = None,
    **kwargs,
):
    if filters is not None:
        filter_type = "pre_filter" if filter_type is None else filter_type
        if not isinstance(filters, list):
            raise RuntimeError(f"unexpected filter with {type(filters)}")

    final_ext: dict[str, Any] = {"lvector": {}}
    if min_score != "0.0":
        final_ext["lvector"]["min_score"] = min_score
    if ef_search:
        final_ext["lvector"]["ef_search"] = ef_search
    if nprobe:
        final_ext["lvector"]["nprobe"] = nprobe
    if reorder_factor:
        final_ext["lvector"]["reorder_factor"] = reorder_factor
    if client_refactor:
        final_ext["lvector"]["client_refactor"] = client_refactor

    search_query: dict[str, Any] = {
        "size": k,
        "_source": True,  # force return '_source'
        "query": {"knn": {vector_field: {"vector": query_vector, "k": k}}},
    }

    if filters is not None and len(filters) > 0:
        # when using filter, transform filter from List[Dict] to Dict as valid format
        filter_dict = {"bool": {"must": filters}} if len(filters) > 1 else filters[0]
        search_query["query"]["knn"][vector_field]["filter"] = filter_dict  # filter should be Dict
        if filter_type:
            final_ext["lvector"]["filter_type"] = filter_type

    if final_ext != {"lvector": {}}:
        search_query["ext"] = final_ext
    return search_query
 class LindormVectorStoreFactory(AbstractVectorFactory):
     def init_vector(self, dataset: Dataset, attributes: list, embeddings: Embeddings) -> LindormVectorStore:
         lindorm_config = LindormVectorStoreConfig(
-            hosts=dify_config.LINDORM_URL or "",
+            hosts=dify_config.LINDORM_URL,
             username=dify_config.LINDORM_USERNAME,
             password=dify_config.LINDORM_PASSWORD,
-            using_ugc=dify_config.USING_UGC_INDEX,
+            using_ugc=dify_config.LINDORM_USING_UGC,
             request_timeout=dify_config.LINDORM_QUERY_TIMEOUT,
         )
-        using_ugc = dify_config.USING_UGC_INDEX
+        using_ugc = dify_config.LINDORM_USING_UGC
         if using_ugc is None:
-            raise ValueError("USING_UGC_INDEX is not set")
+            raise ValueError("LINDORM_USING_UGC is not set")
         routing_value = None
         if dataset.index_struct:
             # if an existed record's index_struct_dict doesn't contain using_ugc field,
@@ -571,27 +387,27 @@ class LindormVectorStoreFactory(AbstractVectorFactory):
                 index_type = dataset.index_struct_dict["index_type"]
                 distance_type = dataset.index_struct_dict["distance_type"]
                 routing_value = dataset.index_struct_dict["vector_store"]["class_prefix"]
-                index_name = f"{UGC_INDEX_PREFIX}_{dimension}_{index_type}_{distance_type}"
+                index_name = f"{UGC_INDEX_PREFIX}_{dimension}_{index_type}_{distance_type}".lower()
             else:
-                index_name = dataset.index_struct_dict["vector_store"]["class_prefix"]
+                index_name = dataset.index_struct_dict["vector_store"]["class_prefix"].lower()
         else:
             embedding_vector = embeddings.embed_query("hello word")
             dimension = len(embedding_vector)
-            index_type = dify_config.DEFAULT_INDEX_TYPE
-            distance_type = dify_config.DEFAULT_DISTANCE_TYPE
             class_prefix = Dataset.gen_collection_name_by_id(dataset.id)
             index_struct_dict = {
                 "type": VectorType.LINDORM,
                 "vector_store": {"class_prefix": class_prefix},
-                "index_type": index_type,
+                "index_type": dify_config.LINDORM_INDEX_TYPE,
                 "dimension": dimension,
-                "distance_type": distance_type,
+                "distance_type": dify_config.LINDORM_DISTANCE_TYPE,
                 "using_ugc": using_ugc,
             }
             dataset.index_struct = json.dumps(index_struct_dict)
             if using_ugc:
-                index_name = f"{UGC_INDEX_PREFIX}_{dimension}_{index_type}_{distance_type}"
-                routing_value = class_prefix
+                index_type = dify_config.LINDORM_INDEX_TYPE
+                distance_type = dify_config.LINDORM_DISTANCE_TYPE
+                index_name = f"{UGC_INDEX_PREFIX}_{dimension}_{index_type}_{distance_type}".lower()
+                routing_value = class_prefix.lower()
             else:
-                index_name = class_prefix
+                index_name = class_prefix.lower()
         return LindormVectorStore(index_name, lindorm_config, routing_value=routing_value, using_ugc=using_ugc)
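When UGC indexing is on, the factory derives a shared index name from the embedding dimension plus the configured index and distance types, and isolates each dataset by routing value. A hypothetical example of that naming scheme (the `UGC_INDEX_PREFIX` value and the concrete type strings are assumptions for illustration):

```python
UGC_INDEX_PREFIX = "ugc_index"  # assumed value, for illustration only

dimension = 1024        # measured from a probe embedding, as in the factory
index_type = "hnsw"     # e.g. LINDORM_INDEX_TYPE
distance_type = "l2"    # e.g. LINDORM_DISTANCE_TYPE

# all datasets with the same dimension/index/distance share this physical index
index_name = f"{UGC_INDEX_PREFIX}_{dimension}_{index_type}_{distance_type}".lower()

# each dataset is then isolated inside it by its routing value
routing_value = "Vector_index_abc123_Node".lower()
```

The `.lower()` calls added in this commit keep index names and routing values case-consistent, which matters because index names in OpenSearch-compatible stores must be lowercase.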
@@ -373,13 +373,14 @@ class WorkflowService:
     def _validate_llm_model_config(self, tenant_id: str, provider: str, model_name: str) -> None:
         """
-        Validate that an LLM model configuration can fetch valid credentials.
+        Validate that an LLM model configuration can fetch valid credentials and has active status.
 
         This method attempts to get the model instance and validates that:
         1. The provider exists and is configured
         2. The model exists in the provider
         3. Credentials can be fetched for the model
         4. The credentials pass policy compliance checks
+        5. The model status is ACTIVE (not NO_CONFIGURE, DISABLED, etc.)
 
         :param tenant_id: The tenant ID
         :param provider: The provider name
@@ -389,6 +390,7 @@ class WorkflowService:
         try:
             from core.model_manager import ModelManager
             from core.model_runtime.entities.model_entities import ModelType
+            from core.provider_manager import ProviderManager
 
             # Get model instance to validate provider+model combination
             model_manager = ModelManager()
@@ -400,6 +402,22 @@ class WorkflowService:
             # via ProviderConfiguration.get_current_credentials() -> _check_credential_policy_compliance()
             # If it fails, an exception will be raised
 
+            # Additionally, check the model status to ensure it's ACTIVE
+            provider_manager = ProviderManager()
+            provider_configurations = provider_manager.get_configurations(tenant_id)
+            models = provider_configurations.get_models(provider=provider, model_type=ModelType.LLM)
+
+            target_model = None
+            for model in models:
+                if model.model == model_name and model.provider.provider == provider:
+                    target_model = model
+                    break
+
+            if target_model:
+                target_model.raise_for_status()
+            else:
+                raise ValueError(f"Model {model_name} not found for provider {provider}")
+
         except Exception as e:
             raise ValueError(
                 f"Failed to validate LLM model configuration (provider: {provider}, model: {model_name}): {str(e)}"
@@ -643,9 +643,10 @@ VIKINGDB_CONNECTION_TIMEOUT=30
 VIKINGDB_SOCKET_TIMEOUT=30
 
 # Lindorm configuration, only available when VECTOR_STORE is `lindorm`
-LINDORM_URL=http://lindorm:30070
-LINDORM_USERNAME=lindorm
-LINDORM_PASSWORD=lindorm
+LINDORM_URL=http://localhost:30070
+LINDORM_USERNAME=admin
+LINDORM_PASSWORD=admin
+LINDORM_USING_UGC=True
 LINDORM_QUERY_TIMEOUT=1
 
 # OceanBase Vector configuration, only available when VECTOR_STORE is `oceanbase`
@@ -292,9 +292,10 @@ x-shared-env: &shared-api-worker-env
   VIKINGDB_SCHEMA: ${VIKINGDB_SCHEMA:-http}
   VIKINGDB_CONNECTION_TIMEOUT: ${VIKINGDB_CONNECTION_TIMEOUT:-30}
   VIKINGDB_SOCKET_TIMEOUT: ${VIKINGDB_SOCKET_TIMEOUT:-30}
-  LINDORM_URL: ${LINDORM_URL:-http://lindorm:30070}
-  LINDORM_USERNAME: ${LINDORM_USERNAME:-lindorm}
-  LINDORM_PASSWORD: ${LINDORM_PASSWORD:-lindorm}
+  LINDORM_URL: ${LINDORM_URL:-http://localhost:30070}
+  LINDORM_USERNAME: ${LINDORM_USERNAME:-admin}
+  LINDORM_PASSWORD: ${LINDORM_PASSWORD:-admin}
+  LINDORM_USING_UGC: ${LINDORM_USING_UGC:-True}
   LINDORM_QUERY_TIMEOUT: ${LINDORM_QUERY_TIMEOUT:-1}
   OCEANBASE_VECTOR_HOST: ${OCEANBASE_VECTOR_HOST:-oceanbase}
   OCEANBASE_VECTOR_PORT: ${OCEANBASE_VECTOR_PORT:-2881}