Merge branch 'main' into docker-env

commit 02eee92f03 by -LAN-, 2025-09-08 00:04:55 +08:00, committed by GitHub
GPG Key ID: B5690EEEBB952194 (no known key found for this signature in database)
22 changed files with 890 additions and 381 deletions

.gitignore vendored (7 changes)
View File

@ -215,6 +215,13 @@ mise.toml
# Next.js build output
.next/
# PWA generated files
web/public/sw.js
web/public/sw.js.map
web/public/workbox-*.js
web/public/workbox-*.js.map
web/public/fallback-*.js
# AI Assistant
.roo/
api/.env.backup

View File

@ -300,8 +300,7 @@ class DatasetQueueMonitorConfig(BaseSettings):
class MiddlewareConfig(
# place the configs in alphabet order
CeleryConfig,
DatabaseConfig,
CeleryConfig, # Note: CeleryConfig already inherits from DatabaseConfig
KeywordStoreConfig,
RedisConfig,
# configs of storage and storage providers
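A brief note on the MiddlewareConfig change above: since CeleryConfig already inherits from DatabaseConfig, listing DatabaseConfig again in the composite class is redundant. A minimal sketch of that inheritance pattern, using illustrative field names rather than the real Dify settings:

```python
# Minimal sketch, not the actual Dify classes: a composite settings class only
# needs each base once, because CeleryConfig's MRO already includes DatabaseConfig.
from pydantic import Field
from pydantic_settings import BaseSettings


class DatabaseConfig(BaseSettings):
    DB_HOST: str = Field(default="localhost")  # illustrative field


class CeleryConfig(DatabaseConfig):
    CELERY_BROKER_URL: str = Field(default="redis://localhost:6379/1")  # illustrative field


class MiddlewareConfig(CeleryConfig):
    pass


print(MiddlewareConfig().DB_HOST)                      # DatabaseConfig fields still resolve
print([c.__name__ for c in MiddlewareConfig.__mro__])  # DatabaseConfig appears exactly once
```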

View File

@ -1,9 +1,10 @@
from typing import Optional
from pydantic import BaseModel, Field
from pydantic import Field
from pydantic_settings import BaseSettings
class ClickzettaConfig(BaseModel):
class ClickzettaConfig(BaseSettings):
"""
Clickzetta Lakehouse vector database configuration
"""

View File

@ -1,7 +1,8 @@
from pydantic import BaseModel, Field
from pydantic import Field
from pydantic_settings import BaseSettings
class MatrixoneConfig(BaseModel):
class MatrixoneConfig(BaseSettings):
"""Matrixone vector database configuration."""
MATRIXONE_HOST: str = Field(default="localhost", description="Host address of the Matrixone server")
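Both ClickzettaConfig and MatrixoneConfig switch from BaseModel to BaseSettings here. A small, hypothetical sketch of what that buys: BaseSettings fields can be populated from environment variables, which BaseModel does not do on its own.

```python
# Hypothetical example (not the real config class): BaseSettings reads values
# from the environment, so deployments can override defaults without code changes.
import os

from pydantic import Field
from pydantic_settings import BaseSettings


class ExampleVectorConfig(BaseSettings):
    EXAMPLE_HOST: str = Field(default="localhost", description="Host of the vector store")


os.environ["EXAMPLE_HOST"] = "vector.internal"
print(ExampleVectorConfig().EXAMPLE_HOST)  # "vector.internal", picked up from the environment
```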

View File

@ -1,6 +1,6 @@
from pydantic import Field
from configs.packaging.pyproject import PyProjectConfig, PyProjectTomlConfig
from configs.packaging.pyproject import PyProjectTomlConfig
class PackagingInfo(PyProjectTomlConfig):

View File

@ -4,8 +4,9 @@ import logging
import os
import threading
import time
from collections.abc import Mapping
from collections.abc import Callable, Mapping
from pathlib import Path
from typing import Any
from .python_3x import http_request, makedirs_wrapper
from .utils import (
@ -25,13 +26,13 @@ logger = logging.getLogger(__name__)
class ApolloClient:
def __init__(
self,
config_url,
app_id,
cluster="default",
secret="",
start_hot_update=True,
change_listener=None,
_notification_map=None,
config_url: str,
app_id: str,
cluster: str = "default",
secret: str = "",
start_hot_update: bool = True,
change_listener: Callable[[str, str, str, Any], None] | None = None,
_notification_map: dict[str, int] | None = None,
):
# Core routing parameters
self.config_url = config_url
@ -47,17 +48,17 @@ class ApolloClient:
# Private control variables
self._cycle_time = 5
self._stopping = False
self._cache = {}
self._no_key = {}
self._hash = {}
self._cache: dict[str, dict[str, Any]] = {}
self._no_key: dict[str, str] = {}
self._hash: dict[str, str] = {}
self._pull_timeout = 75
self._cache_file_path = os.path.expanduser("~") + "/.dify/config/remote-settings/apollo/cache/"
self._long_poll_thread = None
self._long_poll_thread: threading.Thread | None = None
self._change_listener = change_listener # "add" "delete" "update"
if _notification_map is None:
_notification_map = {"application": -1}
self._notification_map = _notification_map
self.last_release_key = None
self.last_release_key: str | None = None
# Private startup method
self._path_checker()
if start_hot_update:
@ -68,7 +69,7 @@ class ApolloClient:
heartbeat.daemon = True
heartbeat.start()
def get_json_from_net(self, namespace="application"):
def get_json_from_net(self, namespace: str = "application") -> dict[str, Any] | None:
url = "{}/configs/{}/{}/{}?releaseKey={}&ip={}".format(
self.config_url, self.app_id, self.cluster, namespace, "", self.ip
)
@ -88,7 +89,7 @@ class ApolloClient:
logger.exception("an error occurred in get_json_from_net")
return None
def get_value(self, key, default_val=None, namespace="application"):
def get_value(self, key: str, default_val: Any = None, namespace: str = "application") -> Any:
try:
# read memory configuration
namespace_cache = self._cache.get(namespace)
@ -104,7 +105,8 @@ class ApolloClient:
namespace_data = self.get_json_from_net(namespace)
val = get_value_from_dict(namespace_data, key)
if val is not None:
self._update_cache_and_file(namespace_data, namespace)
if namespace_data is not None:
self._update_cache_and_file(namespace_data, namespace)
return val
# read the file configuration
@ -126,23 +128,23 @@ class ApolloClient:
# to ensure the real-time correctness of the function call.
# If the user does not have the same default val twice
# and the default val is used here, there may be a problem.
def _set_local_cache_none(self, namespace, key):
def _set_local_cache_none(self, namespace: str, key: str) -> None:
no_key = no_key_cache_key(namespace, key)
self._no_key[no_key] = key
def _start_hot_update(self):
def _start_hot_update(self) -> None:
self._long_poll_thread = threading.Thread(target=self._listener)
# The long-poll thread runs as a daemon thread, so it exits automatically
# when the main thread exits.
self._long_poll_thread.daemon = True
self._long_poll_thread.start()
def stop(self):
def stop(self) -> None:
self._stopping = True
logger.info("Stopping listener...")
# Call the configured change listener; if it raises, the exception is caught and logged
def _call_listener(self, namespace, old_kv, new_kv):
def _call_listener(self, namespace: str, old_kv: dict[str, Any] | None, new_kv: dict[str, Any] | None) -> None:
if self._change_listener is None:
return
if old_kv is None:
@ -168,12 +170,12 @@ class ApolloClient:
except BaseException as e:
logger.warning(str(e))
def _path_checker(self):
def _path_checker(self) -> None:
if not os.path.isdir(self._cache_file_path):
makedirs_wrapper(self._cache_file_path)
# update the local cache and file cache
def _update_cache_and_file(self, namespace_data, namespace="application"):
def _update_cache_and_file(self, namespace_data: dict[str, Any], namespace: str = "application") -> None:
# update the local cache
self._cache[namespace] = namespace_data
# update the file cache
@ -187,7 +189,7 @@ class ApolloClient:
self._hash[namespace] = new_hash
# get the configuration from the local file
def _get_local_cache(self, namespace="application"):
def _get_local_cache(self, namespace: str = "application") -> dict[str, Any]:
cache_file_path = os.path.join(self._cache_file_path, f"{self.app_id}_configuration_{namespace}.txt")
if os.path.isfile(cache_file_path):
with open(cache_file_path) as f:
@ -195,8 +197,8 @@ class ApolloClient:
return result
return {}
def _long_poll(self):
notifications = []
def _long_poll(self) -> None:
notifications: list[dict[str, Any]] = []
for key in self._cache:
namespace_data = self._cache[key]
notification_id = -1
@ -236,7 +238,7 @@ class ApolloClient:
except Exception as e:
logger.warning(str(e))
def _get_net_and_set_local(self, namespace, n_id, call_change=False):
def _get_net_and_set_local(self, namespace: str, n_id: int, call_change: bool = False) -> None:
namespace_data = self.get_json_from_net(namespace)
if not namespace_data:
return
@ -248,7 +250,7 @@ class ApolloClient:
new_kv = namespace_data.get(CONFIGURATIONS)
self._call_listener(namespace, old_kv, new_kv)
def _listener(self):
def _listener(self) -> None:
logger.info("start long_poll")
while not self._stopping:
self._long_poll()
@ -266,13 +268,13 @@ class ApolloClient:
headers["Timestamp"] = time_unix_now
return headers
def _heart_beat(self):
def _heart_beat(self) -> None:
while not self._stopping:
for namespace in self._notification_map:
self._do_heart_beat(namespace)
time.sleep(60 * 10) # 10 minutes
def _do_heart_beat(self, namespace):
def _do_heart_beat(self, namespace: str) -> None:
url = f"{self.config_url}/configs/{self.app_id}/{self.cluster}/{namespace}?ip={self.ip}"
try:
code, body = http_request(url, timeout=3, headers=self._sign_headers(url))
@ -292,7 +294,7 @@ class ApolloClient:
logger.exception("an error occurred in _do_heart_beat")
return None
def get_all_dicts(self, namespace):
def get_all_dicts(self, namespace: str) -> dict[str, Any] | None:
namespace_data = self._cache.get(namespace)
if namespace_data is None:
net_namespace_data = self.get_json_from_net(namespace)
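Most of the ApolloClient changes above are type annotations, presumably to satisfy the stricter pyright settings adjusted later in this diff. Below is a sketch of the listener shape the new Callable[[str, str, str, Any], None] annotation implies; the argument order is an assumption, not taken from the Apollo client itself:

```python
# Assumed listener shape: (action, namespace, key, value) -> None, where action is
# one of "add", "delete", "update" as the inline comment in the client suggests.
from collections.abc import Callable
from typing import Any

ChangeListener = Callable[[str, str, str, Any], None]


def log_change(action: str, namespace: str, key: str, value: Any) -> None:
    print(f"{action} {namespace}:{key} -> {value!r}")


listener: ChangeListener | None = log_change
if listener is not None:
    listener("update", "application", "feature_flag", "true")
```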

View File

@ -2,6 +2,8 @@ import logging
import os
import ssl
import urllib.request
from collections.abc import Mapping
from typing import Any
from urllib import parse
from urllib.error import HTTPError
@ -19,9 +21,9 @@ urllib.request.install_opener(opener)
logger = logging.getLogger(__name__)
def http_request(url, timeout, headers={}):
def http_request(url: str, timeout: int | float, headers: Mapping[str, str] = {}) -> tuple[int, str | None]:
try:
request = urllib.request.Request(url, headers=headers)
request = urllib.request.Request(url, headers=dict(headers))
res = urllib.request.urlopen(request, timeout=timeout)
body = res.read().decode("utf-8")
return res.code, body
@ -33,9 +35,9 @@ def http_request(url, timeout, headers={}):
raise e
def url_encode(params):
def url_encode(params: dict[str, Any]) -> str:
return parse.urlencode(params)
def makedirs_wrapper(path):
def makedirs_wrapper(path: str) -> None:
os.makedirs(path, exist_ok=True)
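The http_request signature now accepts Mapping[str, str] and copies it with dict(headers) before handing it to urllib. A small sketch of the assumed reasoning: a Mapping may be read-only, while urllib's Request works with a plain dict.

```python
# Hedged illustration: accept any read-only mapping from callers, convert to a
# plain dict before passing it to an API that expects (and may mutate) a dict.
from collections.abc import Mapping
from types import MappingProxyType


def build_headers(headers: Mapping[str, str]) -> dict[str, str]:
    return dict(headers)  # defensive copy; the caller's mapping stays untouched


read_only = MappingProxyType({"Authorization": "Bearer token"})
print(build_headers(read_only))  # {'Authorization': 'Bearer token'}
```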

View File

@ -1,5 +1,6 @@
import hashlib
import socket
from typing import Any
from .python_3x import url_encode
@ -10,7 +11,7 @@ NAMESPACE_NAME = "namespaceName"
# add timestamps uris and keys
def signature(timestamp, uri, secret):
def signature(timestamp: str, uri: str, secret: str) -> str:
import base64
import hmac
@ -19,16 +20,16 @@ def signature(timestamp, uri, secret):
return base64.b64encode(hmac_code).decode()
def url_encode_wrapper(params):
def url_encode_wrapper(params: dict[str, Any]) -> str:
return url_encode(params)
def no_key_cache_key(namespace, key):
def no_key_cache_key(namespace: str, key: str) -> str:
return f"{namespace}{len(namespace)}{key}"
# Return the value for the key if it can be obtained, or None if it cannot
def get_value_from_dict(namespace_cache, key):
def get_value_from_dict(namespace_cache: dict[str, Any] | None, key: str) -> Any | None:
if namespace_cache:
kv_data = namespace_cache.get(CONFIGURATIONS)
if kv_data is None:
@ -38,7 +39,7 @@ def get_value_from_dict(namespace_cache, key):
return None
def init_ip():
def init_ip() -> str:
ip = ""
s = None
try:
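For the signature helper annotated above, here is a hedged sketch of the HMAC-SHA1 plus base64 scheme its imports and return statement point to; the exact composition of the signed string is an assumption:

```python
# Hedged sketch: sign "<timestamp>\n<uri>" with the secret using HMAC-SHA1 and
# base64-encode the digest. The signed-string format is assumed, not verified.
import base64
import hashlib
import hmac


def signature(timestamp: str, uri: str, secret: str) -> str:
    string_to_sign = f"{timestamp}\n{uri}"
    hmac_code = hmac.new(secret.encode(), string_to_sign.encode(), hashlib.sha1).digest()
    return base64.b64encode(hmac_code).decode()


print(signature("1700000000000", "/configs/app/default/application", "s3cret"))
```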

View File

@ -11,16 +11,16 @@ logger = logging.getLogger(__name__)
from configs.remote_settings_sources.base import RemoteSettingsSource
from .utils import _parse_config
from .utils import parse_config
class NacosSettingsSource(RemoteSettingsSource):
def __init__(self, configs: Mapping[str, Any]):
self.configs = configs
self.remote_configs: dict[str, Any] = {}
self.remote_configs: dict[str, str] = {}
self.async_init()
def async_init(self):
def async_init(self) -> None:
data_id = os.getenv("DIFY_ENV_NACOS_DATA_ID", "dify-api-env.properties")
group = os.getenv("DIFY_ENV_NACOS_GROUP", "nacos-dify")
tenant = os.getenv("DIFY_ENV_NACOS_NAMESPACE", "")
@ -33,18 +33,15 @@ class NacosSettingsSource(RemoteSettingsSource):
logger.exception("[get-access-token] exception occurred")
raise
def _parse_config(self, content: str):
def _parse_config(self, content: str) -> dict[str, str]:
if not content:
return {}
try:
return _parse_config(self, content)
return parse_config(content)
except Exception as e:
raise RuntimeError(f"Failed to parse config: {e}")
def get_field_value(self, field: FieldInfo, field_name: str) -> tuple[Any, str, bool]:
if not isinstance(self.remote_configs, dict):
raise ValueError(f"remote configs is not dict, but {type(self.remote_configs)}")
field_value = self.remote_configs.get(field_name)
if field_value is None:
return None, field_name, False
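The get_field_value method above follows the usual settings-source contract of returning a (value, key, value_is_complex) tuple. A stripped-down sketch of that lookup, with made-up config contents:

```python
# Simplified sketch of the lookup contract: return (value, key, value_is_complex),
# with value None when the key is absent. The config data here is illustrative.
from typing import Any

remote_configs: dict[str, str] = {"DB_HOST": "db.internal"}


def get_field_value(field_name: str) -> tuple[Any, str, bool]:
    return remote_configs.get(field_name), field_name, False


print(get_field_value("DB_HOST"))   # ('db.internal', 'DB_HOST', False)
print(get_field_value("MISSING"))   # (None, 'MISSING', False)
```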

View File

@ -17,11 +17,17 @@ class NacosHttpClient:
self.ak = os.getenv("DIFY_ENV_NACOS_ACCESS_KEY")
self.sk = os.getenv("DIFY_ENV_NACOS_SECRET_KEY")
self.server = os.getenv("DIFY_ENV_NACOS_SERVER_ADDR", "localhost:8848")
self.token = None
self.token: str | None = None
self.token_ttl = 18000
self.token_expire_time: float = 0
def http_request(self, url, method="GET", headers=None, params=None):
def http_request(
self, url: str, method: str = "GET", headers: dict[str, str] | None = None, params: dict[str, str] | None = None
) -> str:
if headers is None:
headers = {}
if params is None:
params = {}
try:
self._inject_auth_info(headers, params)
response = requests.request(method, url="http://" + self.server + url, headers=headers, params=params)
@ -30,7 +36,7 @@ class NacosHttpClient:
except requests.RequestException as e:
return f"Request to Nacos failed: {e}"
def _inject_auth_info(self, headers, params, module="config"):
def _inject_auth_info(self, headers: dict[str, str], params: dict[str, str], module: str = "config") -> None:
headers.update({"User-Agent": "Nacos-Http-Client-In-Dify:v0.0.1"})
if module == "login":
@ -45,16 +51,17 @@ class NacosHttpClient:
headers["timeStamp"] = ts
if self.username and self.password:
self.get_access_token(force_refresh=False)
params["accessToken"] = self.token
if self.token is not None:
params["accessToken"] = self.token
def __do_sign(self, sign_str, sk):
def __do_sign(self, sign_str: str, sk: str) -> str:
return (
base64.encodebytes(hmac.new(sk.encode(), sign_str.encode(), digestmod=hashlib.sha1).digest())
.decode()
.strip()
)
def get_sign_str(self, group, tenant, ts):
def get_sign_str(self, group: str, tenant: str, ts: str) -> str:
sign_str = ""
if tenant:
sign_str = tenant + "+"
@ -63,7 +70,7 @@ class NacosHttpClient:
sign_str += ts # Directly concatenate ts without conditional checks, because the nacos auth header forced it.
return sign_str
def get_access_token(self, force_refresh=False):
def get_access_token(self, force_refresh: bool = False) -> str | None:
current_time = time.time()
if self.token and not force_refresh and self.token_expire_time > current_time:
return self.token
@ -77,6 +84,7 @@ class NacosHttpClient:
self.token = response_data.get("accessToken")
self.token_ttl = response_data.get("tokenTtl", 18000)
self.token_expire_time = current_time + self.token_ttl - 10
return self.token
except Exception:
logger.exception("[get-access-token] exception occur")
raise
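get_access_token now returns the cached token explicitly. A hedged sketch of the caching pattern the method appears to implement, with the login call stubbed out:

```python
# Hedged sketch of token caching: reuse the token until shortly before its TTL
# expires, then refresh. The login step is a stand-in; a real client would call Nacos.
import time


class TokenCache:
    def __init__(self) -> None:
        self.token: str | None = None
        self.token_ttl = 18000
        self.token_expire_time: float = 0.0

    def get_access_token(self, force_refresh: bool = False) -> str | None:
        now = time.time()
        if self.token and not force_refresh and self.token_expire_time > now:
            return self.token
        self.token = "fresh-token"  # stand-in for the real login response
        self.token_expire_time = now + self.token_ttl - 10
        return self.token


cache = TokenCache()
print(cache.get_access_token())  # performs the "login"
print(cache.get_access_token())  # served from cache
```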

View File

@ -1,4 +1,4 @@
def _parse_config(self, content: str) -> dict[str, str]:
def parse_config(content: str) -> dict[str, str]:
config: dict[str, str] = {}
if not content:
return config
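parse_config is now a module-level function taking only the content string. A hypothetical sketch of a .properties-style parser with that signature; the parsing rules shown are illustrative, not the exact upstream logic:

```python
# Hypothetical parser with the corrected signature: module-level, no `self`.
def parse_config(content: str) -> dict[str, str]:
    config: dict[str, str] = {}
    if not content:
        return config
    for line in content.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blanks and comments
        key, _, value = line.partition("=")
        config[key.strip()] = value.strip()
    return config


print(parse_config("DB_HOST=localhost\n# comment\nDB_PORT=5432"))
```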

View File

@ -99,9 +99,9 @@ class MatrixoneVector(BaseVector):
return client
try:
client.create_full_text_index()
redis_client.set(collection_exist_cache_key, 1, ex=3600)
except Exception:
logger.exception("Failed to create full text index")
redis_client.set(collection_exist_cache_key, 1, ex=3600)
return client
def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs):
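The MatrixoneVector change moves the Redis cache write inside the try block, so the "collection exists" flag is only recorded when index creation succeeds. A small hedged sketch of that ordering with an in-memory cache standing in for Redis:

```python
# Hedged sketch: record the success flag only after the index was actually created,
# so a failed attempt is retried on the next call instead of being cached as done.
import logging

logger = logging.getLogger(__name__)


def ensure_full_text_index(client, cache: dict[str, int], cache_key: str) -> None:
    if cache.get(cache_key):
        return
    try:
        client.create_full_text_index()
        cache[cache_key] = 1  # set only on success
    except Exception:
        logger.exception("Failed to create full text index")


class _StubClient:
    def create_full_text_index(self) -> None:
        raise RuntimeError("index backend unavailable")


cache: dict[str, int] = {}
ensure_full_text_index(_StubClient(), cache, "vector_indexing_demo")
print(cache)  # {} because creation failed, so the flag was not cached
```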

View File

@ -11,7 +11,7 @@ from typing import TYPE_CHECKING, Any, Optional, cast
from flask import Flask, current_app
from configs import dify_config
from core.variables import ArrayVariable, IntegerVariable, NoneVariable
from core.variables import IntegerVariable, NoneSegment
from core.variables.segments import ArrayAnySegment, ArraySegment
from core.workflow.entities.node_entities import (
NodeRunResult,
@ -112,10 +112,10 @@ class IterationNode(BaseNode):
if not variable:
raise IteratorVariableNotFoundError(f"iterator variable {self._node_data.iterator_selector} not found")
if not isinstance(variable, ArrayVariable) and not isinstance(variable, NoneVariable):
if not isinstance(variable, ArraySegment) and not isinstance(variable, NoneSegment):
raise InvalidIteratorValueError(f"invalid iterator value: {variable}, please provide a list.")
if isinstance(variable, NoneVariable) or len(variable.value) == 0:
if isinstance(variable, NoneSegment) or len(variable.value) == 0:
# Try our best to preserve the type information.
if isinstance(variable, ArraySegment):
output = variable.model_copy(update={"value": []})

View File

@ -1,5 +1,7 @@
{
"include": ["."],
"include": [
"."
],
"exclude": [
"tests/",
"migrations/",
@ -19,10 +21,9 @@
"events/",
"contexts/",
"constants/",
"configs/",
"commands.py"
],
"typeCheckingMode": "strict",
"pythonVersion": "3.11",
"pythonPlatform": "All"
}
}

View File

@ -0,0 +1,720 @@
"""
Integration tests for batch_clean_document_task using testcontainers.
This module tests the batch document cleaning functionality with real database
and storage containers to ensure proper cleanup of documents, segments, and files.
"""
import json
import uuid
from unittest.mock import Mock, patch
import pytest
from faker import Faker
from extensions.ext_database import db
from models.account import Account, Tenant, TenantAccountJoin, TenantAccountRole
from models.dataset import Dataset, Document, DocumentSegment
from models.model import UploadFile
from tasks.batch_clean_document_task import batch_clean_document_task
class TestBatchCleanDocumentTask:
"""Integration tests for batch_clean_document_task using testcontainers."""
@pytest.fixture
def mock_external_service_dependencies(self):
"""Mock setup for external service dependencies."""
with (
patch("extensions.ext_storage.storage") as mock_storage,
patch("core.rag.index_processor.index_processor_factory.IndexProcessorFactory") as mock_index_factory,
patch("core.tools.utils.web_reader_tool.get_image_upload_file_ids") as mock_get_image_ids,
):
# Setup default mock returns
mock_storage.delete.return_value = None
# Mock index processor
mock_index_processor = Mock()
mock_index_processor.clean.return_value = None
mock_index_factory.return_value.init_index_processor.return_value = mock_index_processor
# Mock image file ID extraction
mock_get_image_ids.return_value = []
yield {
"storage": mock_storage,
"index_factory": mock_index_factory,
"index_processor": mock_index_processor,
"get_image_ids": mock_get_image_ids,
}
def _create_test_account(self, db_session_with_containers):
"""
Helper method to create a test account for testing.
Args:
db_session_with_containers: Database session from testcontainers infrastructure
Returns:
Account: Created account instance
"""
fake = Faker()
# Create account
account = Account(
email=fake.email(),
name=fake.name(),
interface_language="en-US",
status="active",
)
db.session.add(account)
db.session.commit()
# Create tenant for the account
tenant = Tenant(
name=fake.company(),
status="normal",
)
db.session.add(tenant)
db.session.commit()
# Create tenant-account join
join = TenantAccountJoin(
tenant_id=tenant.id,
account_id=account.id,
role=TenantAccountRole.OWNER.value,
current=True,
)
db.session.add(join)
db.session.commit()
# Set current tenant for account
account.current_tenant = tenant
return account
def _create_test_dataset(self, db_session_with_containers, account):
"""
Helper method to create a test dataset for testing.
Args:
db_session_with_containers: Database session from testcontainers infrastructure
account: Account instance
Returns:
Dataset: Created dataset instance
"""
fake = Faker()
dataset = Dataset(
id=str(uuid.uuid4()),
tenant_id=account.current_tenant.id,
name=fake.word(),
description=fake.sentence(),
data_source_type="upload_file",
created_by=account.id,
embedding_model="text-embedding-ada-002",
embedding_model_provider="openai",
)
db.session.add(dataset)
db.session.commit()
return dataset
def _create_test_document(self, db_session_with_containers, dataset, account):
"""
Helper method to create a test document for testing.
Args:
db_session_with_containers: Database session from testcontainers infrastructure
dataset: Dataset instance
account: Account instance
Returns:
Document: Created document instance
"""
fake = Faker()
document = Document(
id=str(uuid.uuid4()),
tenant_id=account.current_tenant.id,
dataset_id=dataset.id,
position=0,
name=fake.word(),
data_source_type="upload_file",
data_source_info=json.dumps({"upload_file_id": str(uuid.uuid4())}),
batch="test_batch",
created_from="test",
created_by=account.id,
indexing_status="completed",
doc_form="text_model",
)
db.session.add(document)
db.session.commit()
return document
def _create_test_document_segment(self, db_session_with_containers, document, account):
"""
Helper method to create a test document segment for testing.
Args:
db_session_with_containers: Database session from testcontainers infrastructure
document: Document instance
account: Account instance
Returns:
DocumentSegment: Created document segment instance
"""
fake = Faker()
segment = DocumentSegment(
id=str(uuid.uuid4()),
tenant_id=account.current_tenant.id,
dataset_id=document.dataset_id,
document_id=document.id,
position=0,
content=fake.text(),
word_count=100,
tokens=50,
index_node_id=str(uuid.uuid4()),
created_by=account.id,
status="completed",
)
db.session.add(segment)
db.session.commit()
return segment
def _create_test_upload_file(self, db_session_with_containers, account):
"""
Helper method to create a test upload file for testing.
Args:
db_session_with_containers: Database session from testcontainers infrastructure
account: Account instance
Returns:
UploadFile: Created upload file instance
"""
fake = Faker()
from datetime import datetime
from models.enums import CreatorUserRole
upload_file = UploadFile(
tenant_id=account.current_tenant.id,
storage_type="local",
key=f"test_files/{fake.file_name()}",
name=fake.file_name(),
size=1024,
extension="txt",
mime_type="text/plain",
created_by_role=CreatorUserRole.ACCOUNT,
created_by=account.id,
created_at=datetime.utcnow(),
used=False,
)
db.session.add(upload_file)
db.session.commit()
return upload_file
def test_batch_clean_document_task_successful_cleanup(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test successful cleanup of documents with segments and files.
This test verifies that the task properly cleans up:
- Document segments from the index
- Associated image files from storage
- Upload files from storage and database
"""
# Create test data
account = self._create_test_account(db_session_with_containers)
dataset = self._create_test_dataset(db_session_with_containers, account)
document = self._create_test_document(db_session_with_containers, dataset, account)
segment = self._create_test_document_segment(db_session_with_containers, document, account)
upload_file = self._create_test_upload_file(db_session_with_containers, account)
# Update document to reference the upload file
document.data_source_info = json.dumps({"upload_file_id": upload_file.id})
db.session.commit()
# Store original IDs for verification
document_id = document.id
segment_id = segment.id
file_id = upload_file.id
# Execute the task
batch_clean_document_task(
document_ids=[document_id], dataset_id=dataset.id, doc_form=dataset.doc_form, file_ids=[file_id]
)
# Verify that the task completed successfully
# The task should have processed the segment and cleaned up the database
# Verify database cleanup
db.session.commit() # Ensure all changes are committed
# Check that segment is deleted
deleted_segment = db.session.query(DocumentSegment).filter_by(id=segment_id).first()
assert deleted_segment is None
# Check that upload file is deleted
deleted_file = db.session.query(UploadFile).filter_by(id=file_id).first()
assert deleted_file is None
def test_batch_clean_document_task_with_image_files(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test cleanup of documents containing image references.
This test verifies that the task properly handles documents with
image content and cleans up associated segments.
"""
# Create test data
account = self._create_test_account(db_session_with_containers)
dataset = self._create_test_dataset(db_session_with_containers, account)
document = self._create_test_document(db_session_with_containers, dataset, account)
# Create segment with simple content (no image references)
segment = DocumentSegment(
id=str(uuid.uuid4()),
tenant_id=account.current_tenant.id,
dataset_id=document.dataset_id,
document_id=document.id,
position=0,
content="Simple text content without images",
word_count=100,
tokens=50,
index_node_id=str(uuid.uuid4()),
created_by=account.id,
status="completed",
)
db.session.add(segment)
db.session.commit()
# Store original IDs for verification
segment_id = segment.id
document_id = document.id
# Execute the task
batch_clean_document_task(
document_ids=[document_id], dataset_id=dataset.id, doc_form=dataset.doc_form, file_ids=[]
)
# Verify database cleanup
db.session.commit()
# Check that segment is deleted
deleted_segment = db.session.query(DocumentSegment).filter_by(id=segment_id).first()
assert deleted_segment is None
# Verify that the task completed successfully by checking the log output
# The task should have processed the segment and cleaned up the database
def test_batch_clean_document_task_no_segments(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test cleanup when document has no segments.
This test verifies that the task handles documents without segments
gracefully and still cleans up associated files.
"""
# Create test data without segments
account = self._create_test_account(db_session_with_containers)
dataset = self._create_test_dataset(db_session_with_containers, account)
document = self._create_test_document(db_session_with_containers, dataset, account)
upload_file = self._create_test_upload_file(db_session_with_containers, account)
# Update document to reference the upload file
document.data_source_info = json.dumps({"upload_file_id": upload_file.id})
db.session.commit()
# Store original IDs for verification
document_id = document.id
file_id = upload_file.id
# Execute the task
batch_clean_document_task(
document_ids=[document_id], dataset_id=dataset.id, doc_form=dataset.doc_form, file_ids=[file_id]
)
# Verify that the task completed successfully
# Since there are no segments, the task should handle this gracefully
# Verify database cleanup
db.session.commit()
# Check that upload file is deleted
deleted_file = db.session.query(UploadFile).filter_by(id=file_id).first()
assert deleted_file is None
def test_batch_clean_document_task_dataset_not_found(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test cleanup when dataset is not found.
This test verifies that the task properly handles the case where
the specified dataset does not exist in the database.
"""
# Create test data
account = self._create_test_account(db_session_with_containers)
dataset = self._create_test_dataset(db_session_with_containers, account)
document = self._create_test_document(db_session_with_containers, dataset, account)
# Store original IDs for verification
document_id = document.id
dataset_id = dataset.id
# Delete the dataset to simulate not found scenario
db.session.delete(dataset)
db.session.commit()
# Execute the task with non-existent dataset
batch_clean_document_task(document_ids=[document_id], dataset_id=dataset_id, doc_form="text_model", file_ids=[])
# Verify that no index processing occurred
mock_external_service_dependencies["index_processor"].clean.assert_not_called()
# Verify that no storage operations occurred
mock_external_service_dependencies["storage"].delete.assert_not_called()
# Verify that no database cleanup occurred
db.session.commit()
# Document should still exist since cleanup failed
existing_document = db.session.query(Document).filter_by(id=document_id).first()
assert existing_document is not None
def test_batch_clean_document_task_storage_cleanup_failure(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test cleanup when storage operations fail.
This test verifies that the task continues processing even when
storage cleanup operations fail, ensuring database cleanup still occurs.
"""
# Create test data
account = self._create_test_account(db_session_with_containers)
dataset = self._create_test_dataset(db_session_with_containers, account)
document = self._create_test_document(db_session_with_containers, dataset, account)
segment = self._create_test_document_segment(db_session_with_containers, document, account)
upload_file = self._create_test_upload_file(db_session_with_containers, account)
# Update document to reference the upload file
document.data_source_info = json.dumps({"upload_file_id": upload_file.id})
db.session.commit()
# Store original IDs for verification
document_id = document.id
segment_id = segment.id
file_id = upload_file.id
# Mock storage.delete to raise an exception
mock_external_service_dependencies["storage"].delete.side_effect = Exception("Storage error")
# Execute the task
batch_clean_document_task(
document_ids=[document_id], dataset_id=dataset.id, doc_form=dataset.doc_form, file_ids=[file_id]
)
# Verify that the task completed successfully despite storage failure
# The task should continue processing even when storage operations fail
# Verify database cleanup still occurred despite storage failure
db.session.commit()
# Check that segment is deleted from database
deleted_segment = db.session.query(DocumentSegment).filter_by(id=segment_id).first()
assert deleted_segment is None
# Check that upload file is deleted from database
deleted_file = db.session.query(UploadFile).filter_by(id=file_id).first()
assert deleted_file is None
def test_batch_clean_document_task_multiple_documents(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test cleanup of multiple documents in a single batch operation.
This test verifies that the task can handle multiple documents
efficiently and cleans up all associated resources.
"""
# Create test data for multiple documents
account = self._create_test_account(db_session_with_containers)
dataset = self._create_test_dataset(db_session_with_containers, account)
documents = []
segments = []
upload_files = []
# Create 3 documents with segments and files
for i in range(3):
document = self._create_test_document(db_session_with_containers, dataset, account)
segment = self._create_test_document_segment(db_session_with_containers, document, account)
upload_file = self._create_test_upload_file(db_session_with_containers, account)
# Update document to reference the upload file
document.data_source_info = json.dumps({"upload_file_id": upload_file.id})
documents.append(document)
segments.append(segment)
upload_files.append(upload_file)
db.session.commit()
# Store original IDs for verification
document_ids = [doc.id for doc in documents]
segment_ids = [seg.id for seg in segments]
file_ids = [file.id for file in upload_files]
# Execute the task with multiple documents
batch_clean_document_task(
document_ids=document_ids, dataset_id=dataset.id, doc_form=dataset.doc_form, file_ids=file_ids
)
# Verify that the task completed successfully for all documents
# The task should process all documents and clean up all associated resources
# Verify database cleanup for all resources
db.session.commit()
# Check that all segments are deleted
for segment_id in segment_ids:
deleted_segment = db.session.query(DocumentSegment).filter_by(id=segment_id).first()
assert deleted_segment is None
# Check that all upload files are deleted
for file_id in file_ids:
deleted_file = db.session.query(UploadFile).filter_by(id=file_id).first()
assert deleted_file is None
def test_batch_clean_document_task_different_doc_forms(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test cleanup with different document form types.
This test verifies that the task properly handles different
document form types and creates the appropriate index processor.
"""
# Create test data
account = self._create_test_account(db_session_with_containers)
# Test different doc_form types
doc_forms = ["text_model", "qa_model", "hierarchical_model"]
for doc_form in doc_forms:
dataset = self._create_test_dataset(db_session_with_containers, account)
db.session.commit()
document = self._create_test_document(db_session_with_containers, dataset, account)
# Update document doc_form
document.doc_form = doc_form
db.session.commit()
segment = self._create_test_document_segment(db_session_with_containers, document, account)
# Store the ID before the object is deleted
segment_id = segment.id
try:
# Execute the task
batch_clean_document_task(
document_ids=[document.id], dataset_id=dataset.id, doc_form=doc_form, file_ids=[]
)
# Verify that the task completed successfully for this doc_form
# The task should handle different document forms correctly
# Verify database cleanup
db.session.commit()
# Check that segment is deleted
deleted_segment = db.session.query(DocumentSegment).filter_by(id=segment_id).first()
assert deleted_segment is None
except Exception as e:
# If the task fails due to external service issues (e.g., plugin daemon),
# we should still verify that the database state is consistent
# This is a common scenario in test environments where external services may not be available
db.session.commit()
# Check if the segment still exists (task may have failed before deletion)
existing_segment = db.session.query(DocumentSegment).filter_by(id=segment_id).first()
if existing_segment is not None:
# If segment still exists, the task failed before deletion
# This is acceptable in test environments with external service issues
pass
else:
# If segment was deleted, the task succeeded
pass
def test_batch_clean_document_task_large_batch_performance(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test cleanup performance with a large batch of documents.
This test verifies that the task can handle large batches efficiently
and maintains performance characteristics.
"""
import time
# Create test data for large batch
account = self._create_test_account(db_session_with_containers)
dataset = self._create_test_dataset(db_session_with_containers, account)
documents = []
segments = []
upload_files = []
# Create 10 documents with segments and files (larger batch)
batch_size = 10
for i in range(batch_size):
document = self._create_test_document(db_session_with_containers, dataset, account)
segment = self._create_test_document_segment(db_session_with_containers, document, account)
upload_file = self._create_test_upload_file(db_session_with_containers, account)
# Update document to reference the upload file
document.data_source_info = json.dumps({"upload_file_id": upload_file.id})
documents.append(document)
segments.append(segment)
upload_files.append(upload_file)
db.session.commit()
# Store original IDs for verification
document_ids = [doc.id for doc in documents]
segment_ids = [seg.id for seg in segments]
file_ids = [file.id for file in upload_files]
# Measure execution time
start_time = time.perf_counter()
# Execute the task with large batch
batch_clean_document_task(
document_ids=document_ids, dataset_id=dataset.id, doc_form=dataset.doc_form, file_ids=file_ids
)
end_time = time.perf_counter()
execution_time = end_time - start_time
# Verify performance characteristics (should complete within reasonable time)
assert execution_time < 5.0 # Should complete within 5 seconds
# Verify that the task completed successfully for the large batch
# The task should handle large batches efficiently
# Verify database cleanup for all resources
db.session.commit()
# Check that all segments are deleted
for segment_id in segment_ids:
deleted_segment = db.session.query(DocumentSegment).filter_by(id=segment_id).first()
assert deleted_segment is None
# Check that all upload files are deleted
for file_id in file_ids:
deleted_file = db.session.query(UploadFile).filter_by(id=file_id).first()
assert deleted_file is None
def test_batch_clean_document_task_integration_with_real_database(
self, db_session_with_containers, mock_external_service_dependencies
):
"""
Test full integration with real database operations.
This test verifies that the task integrates properly with the
actual database and maintains data consistency throughout the process.
"""
# Create test data
account = self._create_test_account(db_session_with_containers)
dataset = self._create_test_dataset(db_session_with_containers, account)
# Create document with complex structure
document = self._create_test_document(db_session_with_containers, dataset, account)
# Create multiple segments for the document
segments = []
for i in range(3):
segment = DocumentSegment(
id=str(uuid.uuid4()),
tenant_id=account.current_tenant.id,
dataset_id=document.dataset_id,
document_id=document.id,
position=i,
content=f"Segment content {i} with some text",
word_count=50 + i * 10,
tokens=25 + i * 5,
index_node_id=str(uuid.uuid4()),
created_by=account.id,
status="completed",
)
segments.append(segment)
# Create upload file
upload_file = self._create_test_upload_file(db_session_with_containers, account)
# Update document to reference the upload file
document.data_source_info = json.dumps({"upload_file_id": upload_file.id})
# Add all to database
for segment in segments:
db.session.add(segment)
db.session.commit()
# Verify initial state
assert db.session.query(DocumentSegment).filter_by(document_id=document.id).count() == 3
assert db.session.query(UploadFile).filter_by(id=upload_file.id).first() is not None
# Store original IDs for verification
document_id = document.id
segment_ids = [seg.id for seg in segments]
file_id = upload_file.id
# Execute the task
batch_clean_document_task(
document_ids=[document_id], dataset_id=dataset.id, doc_form=dataset.doc_form, file_ids=[file_id]
)
# Verify that the task completed successfully
# The task should process all segments and clean up all associated resources
# Verify database cleanup
db.session.commit()
# Check that all segments are deleted
for segment_id in segment_ids:
deleted_segment = db.session.query(DocumentSegment).filter_by(id=segment_id).first()
assert deleted_segment is None
# Check that upload file is deleted
deleted_file = db.session.query(UploadFile).filter_by(id=file_id).first()
assert deleted_file is None
# Verify final database state
assert db.session.query(DocumentSegment).filter_by(document_id=document_id).count() == 0
assert db.session.query(UploadFile).filter_by(id=file_id).first() is None

View File

@ -850,84 +850,83 @@ const Configuration: FC = () => {
<Loading type='area' />
</div>
}
const value = {
appId,
isAPIKeySet,
isTrailFinished: false,
mode,
modelModeType,
promptMode,
isAdvancedMode,
isAgent,
isOpenAI,
isFunctionCall,
collectionList,
setPromptMode,
canReturnToSimpleMode,
setCanReturnToSimpleMode,
chatPromptConfig,
completionPromptConfig,
currentAdvancedPrompt,
setCurrentAdvancedPrompt,
conversationHistoriesRole: completionPromptConfig.conversation_histories_role,
showHistoryModal,
setConversationHistoriesRole,
hasSetBlockStatus,
conversationId,
introduction,
setIntroduction,
suggestedQuestions,
setSuggestedQuestions,
setConversationId,
controlClearChatMessage,
setControlClearChatMessage,
prevPromptConfig,
setPrevPromptConfig,
moreLikeThisConfig,
setMoreLikeThisConfig,
suggestedQuestionsAfterAnswerConfig,
setSuggestedQuestionsAfterAnswerConfig,
speechToTextConfig,
setSpeechToTextConfig,
textToSpeechConfig,
setTextToSpeechConfig,
citationConfig,
setCitationConfig,
annotationConfig,
setAnnotationConfig,
moderationConfig,
setModerationConfig,
externalDataToolsConfig,
setExternalDataToolsConfig,
formattingChanged,
setFormattingChanged,
inputs,
setInputs,
query,
setQuery,
completionParams,
setCompletionParams,
modelConfig,
setModelConfig,
showSelectDataSet,
dataSets,
setDataSets,
datasetConfigs,
datasetConfigsRef,
setDatasetConfigs,
hasSetContextVar,
isShowVisionConfig,
visionConfig,
setVisionConfig: handleSetVisionConfig,
isAllowVideoUpload,
isShowDocumentConfig,
isShowAudioConfig,
rerankSettingModalOpen,
setRerankSettingModalOpen,
}
return (
<ConfigContext.Provider value={{
appId,
isAPIKeySet,
isTrailFinished: false,
mode,
modelModeType,
promptMode,
isAdvancedMode,
isAgent,
isOpenAI,
isFunctionCall,
collectionList,
setPromptMode,
canReturnToSimpleMode,
setCanReturnToSimpleMode,
chatPromptConfig,
completionPromptConfig,
currentAdvancedPrompt,
setCurrentAdvancedPrompt,
conversationHistoriesRole: completionPromptConfig.conversation_histories_role,
showHistoryModal,
setConversationHistoriesRole,
hasSetBlockStatus,
conversationId,
introduction,
setIntroduction,
suggestedQuestions,
setSuggestedQuestions,
setConversationId,
controlClearChatMessage,
setControlClearChatMessage,
prevPromptConfig,
setPrevPromptConfig,
moreLikeThisConfig,
setMoreLikeThisConfig,
suggestedQuestionsAfterAnswerConfig,
setSuggestedQuestionsAfterAnswerConfig,
speechToTextConfig,
setSpeechToTextConfig,
textToSpeechConfig,
setTextToSpeechConfig,
citationConfig,
setCitationConfig,
annotationConfig,
setAnnotationConfig,
moderationConfig,
setModerationConfig,
externalDataToolsConfig,
setExternalDataToolsConfig,
formattingChanged,
setFormattingChanged,
inputs,
setInputs,
query,
setQuery,
completionParams,
setCompletionParams,
modelConfig,
setModelConfig,
showSelectDataSet,
dataSets,
setDataSets,
datasetConfigs,
datasetConfigsRef,
setDatasetConfigs,
hasSetContextVar,
isShowVisionConfig,
visionConfig,
setVisionConfig: handleSetVisionConfig,
isAllowVideoUpload,
isShowDocumentConfig,
isShowAudioConfig,
rerankSettingModalOpen,
setRerankSettingModalOpen,
}}
>
<ConfigContext.Provider value={value}>
<FeaturesProvider features={featuresData}>
<MittProvider>
<div className="flex h-full flex-col">

View File

@ -740,84 +740,6 @@ Workflow applications offers non-session support and is ideal for translation, a
---
<Heading
url='/files/:file_id/preview'
method='GET'
title='File Preview'
name='#file-preview'
/>
<Row>
<Col>
Preview or download uploaded files. This endpoint allows you to access files that have been previously uploaded via the File Upload API.
<i>Files can only be accessed if they belong to messages within the requesting application.</i>
### Path Parameters
- `file_id` (string) Required
The unique identifier of the file to preview, obtained from the File Upload API response.
### Query Parameters
- `as_attachment` (boolean) Optional
Whether to force download the file as an attachment. Default is `false` (preview in browser).
### Response
Returns the file content with appropriate headers for browser display or download.
- `Content-Type` Set based on file mime type
- `Content-Length` File size in bytes (if available)
- `Content-Disposition` Set to "attachment" if `as_attachment=true`
- `Cache-Control` Caching headers for performance
- `Accept-Ranges` Set to "bytes" for audio/video files
### Errors
- 400, `invalid_param`, abnormal parameter input
- 403, `file_access_denied`, file access denied or file does not belong to current application
- 404, `file_not_found`, file not found or has been deleted
- 500, internal server error
</Col>
<Col sticky>
### Request Example
<CodeGroup
title="Request"
tag="GET"
label="/files/:file_id/preview"
targetCode={`curl -X GET '${props.appDetail.api_base_url}/files/72fa9618-8f89-4a37-9b33-7e1178a24a67/preview' \\
--header 'Authorization: Bearer {api_key}'`}
/>
### Download as Attachment
<CodeGroup
title="Download Request"
tag="GET"
label="/files/:file_id/preview?as_attachment=true"
targetCode={`curl -X GET '${props.appDetail.api_base_url}/files/72fa9618-8f89-4a37-9b33-7e1178a24a67/preview?as_attachment=true' \\
--header 'Authorization: Bearer {api_key}' \\
--output downloaded_file.png`}
/>
### Response Headers Example
<CodeGroup title="Response Headers">
```http {{ title: 'Headers - Image Preview' }}
Content-Type: image/png
Content-Length: 1024
Cache-Control: public, max-age=3600
```
</CodeGroup>
### Download Response Headers
<CodeGroup title="Download Response Headers">
```http {{ title: 'Headers - File Download' }}
Content-Type: image/png
Content-Length: 1024
Content-Disposition: attachment; filename*=UTF-8''example.png
Cache-Control: public, max-age=3600
```
</CodeGroup>
</Col>
</Row>
---
<Heading
url='/workflows/logs'
method='GET'

View File

@ -736,84 +736,6 @@ import { Row, Col, Properties, Property, Heading, SubProperty, Paragraph } from
---
<Heading
url='/files/:file_id/preview'
method='GET'
title='ファイルプレビュー'
name='#file-preview'
/>
<Row>
<Col>
アップロードされたファイルをプレビューまたはダウンロードします。このエンドポイントを使用すると、以前にファイルアップロード API でアップロードされたファイルにアクセスできます。
<i>ファイルは、リクエストしているアプリケーションのメッセージ範囲内にある場合のみアクセス可能です。</i>
### パスパラメータ
- `file_id` (string) 必須
プレビューするファイルの一意識別子。ファイルアップロード API レスポンスから取得します。
### クエリパラメータ
- `as_attachment` (boolean) オプション
ファイルを添付ファイルとして強制ダウンロードするかどうか。デフォルトは `false`(ブラウザでプレビュー)。
### レスポンス
ブラウザ表示またはダウンロード用の適切なヘッダー付きでファイル内容を返します。
- `Content-Type` ファイル MIME タイプに基づいて設定
- `Content-Length` ファイルサイズ(バイト、利用可能な場合)
- `Content-Disposition` `as_attachment=true` の場合は "attachment" に設定
- `Cache-Control` パフォーマンス向上のためのキャッシュヘッダー
- `Accept-Ranges` 音声/動画ファイルの場合は "bytes" に設定
### エラー
- 400, `invalid_param`, パラメータ入力異常
- 403, `file_access_denied`, ファイルアクセス拒否またはファイルが現在のアプリケーションに属していません
- 404, `file_not_found`, ファイルが見つからないか削除されています
- 500, サーバー内部エラー
</Col>
<Col sticky>
### リクエスト例
<CodeGroup
title="Request"
tag="GET"
label="/files/:file_id/preview"
targetCode={`curl -X GET '${props.appDetail.api_base_url}/files/72fa9618-8f89-4a37-9b33-7e1178a24a67/preview' \\
--header 'Authorization: Bearer {api_key}'`}
/>
### 添付ファイルとしてダウンロード
<CodeGroup
title="Download Request"
tag="GET"
label="/files/:file_id/preview?as_attachment=true"
targetCode={`curl -X GET '${props.appDetail.api_base_url}/files/72fa9618-8f89-4a37-9b33-7e1178a24a67/preview?as_attachment=true' \\
--header 'Authorization: Bearer {api_key}' \\
--output downloaded_file.png`}
/>
### レスポンスヘッダー例
<CodeGroup title="Response Headers">
```http {{ title: 'ヘッダー - 画像プレビュー' }}
Content-Type: image/png
Content-Length: 1024
Cache-Control: public, max-age=3600
```
</CodeGroup>
### ダウンロードレスポンスヘッダー
<CodeGroup title="Download Response Headers">
```http {{ title: 'ヘッダー - ファイルダウンロード' }}
Content-Type: image/png
Content-Length: 1024
Content-Disposition: attachment; filename*=UTF-8''example.png
Cache-Control: public, max-age=3600
```
</CodeGroup>
</Col>
</Row>
---
<Heading
url='/workflows/logs'
method='GET'

View File

@ -727,83 +727,6 @@ Workflow 应用无会话支持,适合用于翻译/文章写作/总结 AI 等
</Row>
---
<Heading
url='/files/:file_id/preview'
method='GET'
title='文件预览'
name='#file-preview'
/>
<Row>
<Col>
预览或下载已上传的文件。此端点允许您访问先前通过文件上传 API 上传的文件。
<i>文件只能在属于请求应用程序的消息范围内访问。</i>
### 路径参数
- `file_id` (string) 必需
要预览的文件的唯一标识符,从文件上传 API 响应中获得。
### 查询参数
- `as_attachment` (boolean) 可选
是否强制将文件作为附件下载。默认为 `false`(在浏览器中预览)。
### 响应
返回带有适当浏览器显示或下载标头的文件内容。
- `Content-Type` 根据文件 MIME 类型设置
- `Content-Length` 文件大小(以字节为单位,如果可用)
- `Content-Disposition` 如果 `as_attachment=true` 则设置为 "attachment"
- `Cache-Control` 用于性能的缓存标头
- `Accept-Ranges` 对于音频/视频文件设置为 "bytes"
### 错误
- 400, `invalid_param`, 参数输入异常
- 403, `file_access_denied`, 文件访问被拒绝或文件不属于当前应用程序
- 404, `file_not_found`, 文件未找到或已被删除
- 500, 服务内部错误
</Col>
<Col sticky>
### 请求示例
<CodeGroup
title="Request"
tag="GET"
label="/files/:file_id/preview"
targetCode={`curl -X GET '${props.appDetail.api_base_url}/files/72fa9618-8f89-4a37-9b33-7e1178a24a67/preview' \\
--header 'Authorization: Bearer {api_key}'`}
/>
### 作为附件下载
<CodeGroup
title="Request"
tag="GET"
label="/files/:file_id/preview?as_attachment=true"
targetCode={`curl -X GET '${props.appDetail.api_base_url}/files/72fa9618-8f89-4a37-9b33-7e1178a24a67/preview?as_attachment=true' \\
--header 'Authorization: Bearer {api_key}' \\
--output downloaded_file.png`}
/>
### 响应标头示例
<CodeGroup title="Response Headers">
```http {{ title: 'Headers - 图片预览' }}
Content-Type: image/png
Content-Length: 1024
Cache-Control: public, max-age=3600
```
</CodeGroup>
### 文件下载响应标头
<CodeGroup title="Download Response Headers">
```http {{ title: 'Headers - 文件下载' }}
Content-Type: image/png
Content-Length: 1024
Content-Disposition: attachment; filename*=UTF-8''example.png
Cache-Control: public, max-age=3600
```
</CodeGroup>
</Col>
</Row>
---
<Heading
url='/workflows/logs'
method='GET'

View File

@ -461,6 +461,12 @@ const translation = {
contextTooltip: 'Anda dapat mengimpor Pengetahuan sebagai konteks',
notSetContextInPromptTip: 'Untuk mengaktifkan fitur konteks, silakan isi variabel konteks di PROMPT.',
context: 'konteks',
reasoningFormat: {
tagged: 'Tetap pikirkan tag',
title: 'Aktifkan pemisahan tag penalaran',
separated: 'Pisahkan tag pemikiran',
tooltip: 'Ekstrak konten dari tag pikir dan simpan di field reasoning_content.',
},
},
knowledgeRetrieval: {
outputVars: {

View File

@ -1 +0,0 @@
(()=>{"use strict";self.fallback=async e=>"document"===e.destination?caches.match("/_offline.html",{ignoreSearch:!0}):Response.error()})();

File diff suppressed because one or more lines are too long