Merge branch 'main' into feat/rag-pipeline

This commit is contained in:
zxhlyh 2025-04-18 14:00:58 +08:00
commit d238da9826
220 changed files with 10814 additions and 13035 deletions

View File

@ -2,10 +2,10 @@
npm add -g pnpm@10.8.0
cd web && pnpm install
pipx install poetry
pipx install uv
echo 'alias start-api="cd /workspaces/dify/api && poetry run python -m flask run --host 0.0.0.0 --port=5001 --debug"' >> ~/.bashrc
echo 'alias start-worker="cd /workspaces/dify/api && poetry run python -m celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace,app_deletion"' >> ~/.bashrc
echo 'alias start-api="cd /workspaces/dify/api && uv run python -m flask run --host 0.0.0.0 --port=5001 --debug"' >> ~/.bashrc
echo 'alias start-worker="cd /workspaces/dify/api && uv run python -m celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace,app_deletion"' >> ~/.bashrc
echo 'alias start-web="cd /workspaces/dify/web && pnpm dev"' >> ~/.bashrc
echo 'alias start-containers="cd /workspaces/dify/docker && docker-compose -f docker-compose.middleware.yaml -p dify --env-file middleware.env up -d"' >> ~/.bashrc
echo 'alias stop-containers="cd /workspaces/dify/docker && docker-compose -f docker-compose.middleware.yaml -p dify --env-file middleware.env down"' >> ~/.bashrc

View File

@ -1,3 +1,3 @@
#!/bin/bash
cd api && poetry install
cd api && uv sync

View File

@ -1,36 +0,0 @@
name: Setup Poetry and Python
inputs:
python-version:
description: Python version to use and the Poetry installed with
required: true
default: '3.11'
poetry-version:
description: Poetry version to set up
required: true
default: '2.0.1'
poetry-lockfile:
description: Path to the Poetry lockfile to restore cache from
required: true
default: ''
runs:
using: composite
steps:
- name: Set up Python ${{ inputs.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ inputs.python-version }}
cache: pip
- name: Install Poetry
shell: bash
run: pip install poetry==${{ inputs.poetry-version }}
- name: Restore Poetry cache
if: ${{ inputs.poetry-lockfile != '' }}
uses: actions/setup-python@v5
with:
python-version: ${{ inputs.python-version }}
cache: poetry
cache-dependency-path: ${{ inputs.poetry-lockfile }}

34
.github/actions/setup-uv/action.yml vendored Normal file
View File

@ -0,0 +1,34 @@
name: Setup UV and Python
inputs:
python-version:
description: Python version to use and the UV installed with
required: true
default: '3.12'
uv-version:
description: UV version to set up
required: true
default: '0.6.14'
uv-lockfile:
description: Path to the UV lockfile to restore cache from
required: true
default: ''
enable-cache:
required: true
default: true
runs:
using: composite
steps:
- name: Set up Python ${{ inputs.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ inputs.python-version }}
- name: Install uv
uses: astral-sh/setup-uv@v5
with:
version: ${{ inputs.uv-version }}
python-version: ${{ inputs.python-version }}
enable-cache: ${{ inputs.enable-cache }}
cache-dependency-glob: ${{ inputs.uv-lockfile }}

View File

@ -17,6 +17,9 @@ jobs:
test:
name: API Tests
runs-on: ubuntu-latest
defaults:
run:
shell: bash
strategy:
matrix:
python-version:
@ -27,40 +30,44 @@ jobs:
- name: Checkout code
uses: actions/checkout@v4
with:
fetch-depth: 0
persist-credentials: false
- name: Setup Poetry and Python ${{ matrix.python-version }}
uses: ./.github/actions/setup-poetry
- name: Setup UV and Python
uses: ./.github/actions/setup-uv
with:
python-version: ${{ matrix.python-version }}
poetry-lockfile: api/poetry.lock
uv-lockfile: api/uv.lock
- name: Check Poetry lockfile
run: |
poetry check -C api --lock
poetry show -C api
- name: Check UV lockfile
run: uv lock --project api --check
- name: Install dependencies
run: poetry install -C api --with dev
- name: Check dependencies in pyproject.toml
run: poetry run -P api bash dev/pytest/pytest_artifacts.sh
run: uv sync --project api --dev
- name: Run Unit tests
run: poetry run -P api bash dev/pytest/pytest_unit_tests.sh
run: |
uv run --project api bash dev/pytest/pytest_unit_tests.sh
# Extract coverage percentage and create a summary
TOTAL_COVERAGE=$(python -c 'import json; print(json.load(open("coverage.json"))["totals"]["percent_covered_display"])')
# Create a detailed coverage summary
echo "### Test Coverage Summary :test_tube:" >> $GITHUB_STEP_SUMMARY
echo "Total Coverage: ${TOTAL_COVERAGE}%" >> $GITHUB_STEP_SUMMARY
echo "\`\`\`" >> $GITHUB_STEP_SUMMARY
uv run --project api coverage report >> $GITHUB_STEP_SUMMARY
echo "\`\`\`" >> $GITHUB_STEP_SUMMARY
- name: Run dify config tests
run: poetry run -P api python dev/pytest/pytest_config_tests.py
run: uv run --project api dev/pytest/pytest_config_tests.py
- name: Cache MyPy
- name: MyPy Cache
uses: actions/cache@v4
with:
path: api/.mypy_cache
key: mypy-${{ matrix.python-version }}-${{ runner.os }}-${{ hashFiles('api/poetry.lock') }}
key: mypy-${{ matrix.python-version }}-${{ runner.os }}-${{ hashFiles('api/uv.lock') }}
- name: Run mypy
run: dev/run-mypy
- name: Run MyPy Checks
run: dev/mypy-check
- name: Set up dotenvs
run: |
@ -80,4 +87,4 @@ jobs:
ssrf_proxy
- name: Run Workflow
run: poetry run -P api bash dev/pytest/pytest_workflow.sh
run: uv run --project api bash dev/pytest/pytest_workflow.sh

View File

@ -24,13 +24,13 @@ jobs:
fetch-depth: 0
persist-credentials: false
- name: Setup Poetry and Python
uses: ./.github/actions/setup-poetry
- name: Setup UV and Python
uses: ./.github/actions/setup-uv
with:
poetry-lockfile: api/poetry.lock
uv-lockfile: api/uv.lock
- name: Install dependencies
run: poetry install -C api
run: uv sync --project api
- name: Prepare middleware env
run: |
@ -54,6 +54,4 @@ jobs:
- name: Run DB Migration
env:
DEBUG: true
run: |
cd api
poetry run python -m flask upgrade-db
run: uv run --directory api flask upgrade-db

View File

@ -42,6 +42,7 @@ jobs:
with:
push: false
context: "{{defaultContext}}:${{ matrix.context }}"
file: "${{ matrix.file }}"
platforms: ${{ matrix.platform }}
cache-from: type=gha
cache-to: type=gha,mode=max

View File

@ -18,7 +18,6 @@ jobs:
- name: Checkout code
uses: actions/checkout@v4
with:
fetch-depth: 0
persist-credentials: false
- name: Check changed files
@ -29,24 +28,27 @@ jobs:
api/**
.github/workflows/style.yml
- name: Setup Poetry and Python
- name: Setup UV and Python
if: steps.changed-files.outputs.any_changed == 'true'
uses: ./.github/actions/setup-poetry
uses: ./.github/actions/setup-uv
with:
uv-lockfile: api/uv.lock
enable-cache: false
- name: Install dependencies
if: steps.changed-files.outputs.any_changed == 'true'
run: poetry install -C api --only lint
run: uv sync --project api --dev
- name: Ruff check
if: steps.changed-files.outputs.any_changed == 'true'
run: |
poetry run -C api ruff --version
poetry run -C api ruff check ./
poetry run -C api ruff format --check ./
uv run --directory api ruff --version
uv run --directory api ruff check ./
uv run --directory api ruff format --check ./
- name: Dotenv check
if: steps.changed-files.outputs.any_changed == 'true'
run: poetry run -P api dotenv-linter ./api/.env.example ./web/.env.example
run: uv run --project api dotenv-linter ./api/.env.example ./web/.env.example
- name: Lint hints
if: failure()
@ -63,7 +65,6 @@ jobs:
- name: Checkout code
uses: actions/checkout@v4
with:
fetch-depth: 0
persist-credentials: false
- name: Check changed files
@ -102,7 +103,6 @@ jobs:
- name: Checkout code
uses: actions/checkout@v4
with:
fetch-depth: 0
persist-credentials: false
- name: Check changed files
@ -133,7 +133,6 @@ jobs:
- name: Checkout code
uses: actions/checkout@v4
with:
fetch-depth: 0
persist-credentials: false
- name: Check changed files

View File

@ -27,7 +27,6 @@ jobs:
steps:
- uses: actions/checkout@v4
with:
fetch-depth: 0
persist-credentials: false
- name: Use Node.js ${{ matrix.node-version }}

View File

@ -8,7 +8,7 @@ on:
- api/core/rag/datasource/**
- docker/**
- .github/workflows/vdb-tests.yml
- api/poetry.lock
- api/uv.lock
- api/pyproject.toml
concurrency:
@ -29,22 +29,19 @@ jobs:
- name: Checkout code
uses: actions/checkout@v4
with:
fetch-depth: 0
persist-credentials: false
- name: Setup Poetry and Python ${{ matrix.python-version }}
uses: ./.github/actions/setup-poetry
- name: Setup UV and Python
uses: ./.github/actions/setup-uv
with:
python-version: ${{ matrix.python-version }}
poetry-lockfile: api/poetry.lock
uv-lockfile: api/uv.lock
- name: Check Poetry lockfile
run: |
poetry check -C api --lock
poetry show -C api
- name: Check UV lockfile
run: uv lock --project api --check
- name: Install dependencies
run: poetry install -C api --with dev
run: uv sync --project api --dev
- name: Set up dotenvs
run: |
@ -80,7 +77,7 @@ jobs:
elasticsearch
- name: Check TiDB Ready
run: poetry run -P api python api/tests/integration_tests/vdb/tidb_vector/check_tiflash_ready.py
run: uv run --project api python api/tests/integration_tests/vdb/tidb_vector/check_tiflash_ready.py
- name: Test Vector Stores
run: poetry run -P api bash dev/pytest/pytest_vdb.sh
run: uv run --project api bash dev/pytest/pytest_vdb.sh

View File

@ -23,7 +23,6 @@ jobs:
- name: Checkout code
uses: actions/checkout@v4
with:
fetch-depth: 0
persist-credentials: false
- name: Check changed files

1
.gitignore vendored
View File

@ -46,6 +46,7 @@ htmlcov/
.cache
nosetests.xml
coverage.xml
coverage.json
*.cover
*.py,cover
.hypothesis/

View File

@ -165,6 +165,7 @@ MILVUS_URI=http://127.0.0.1:19530
MILVUS_TOKEN=
MILVUS_USER=root
MILVUS_PASSWORD=Milvus
MILVUS_ANALYZER_PARAMS=
# MyScale configuration
MYSCALE_HOST=127.0.0.1
@ -423,6 +424,12 @@ WORKFLOW_CALL_MAX_DEPTH=5
WORKFLOW_PARALLEL_DEPTH_LIMIT=3
MAX_VARIABLE_SIZE=204800
# Workflow storage configuration
# Options: rdbms, hybrid
# rdbms: Use only the relational database (default)
# hybrid: Save new data to object storage, read from both object storage and RDBMS
WORKFLOW_NODE_EXECUTION_STORAGE=rdbms
# App configuration
APP_MAX_EXECUTION_TIME=1200
APP_MAX_ACTIVE_REQUESTS=0

View File

@ -3,20 +3,11 @@ FROM python:3.12-slim-bookworm AS base
WORKDIR /app/api
# Install Poetry
ENV POETRY_VERSION=2.0.1
# Install uv
ENV UV_VERSION=0.6.14
# if you located in China, you can use aliyun mirror to speed up
# RUN pip install --no-cache-dir poetry==${POETRY_VERSION} -i https://mirrors.aliyun.com/pypi/simple/
RUN pip install --no-cache-dir uv==${UV_VERSION}
RUN pip install --no-cache-dir poetry==${POETRY_VERSION}
# Configure Poetry
ENV POETRY_CACHE_DIR=/tmp/poetry_cache
ENV POETRY_NO_INTERACTION=1
ENV POETRY_VIRTUALENVS_IN_PROJECT=true
ENV POETRY_VIRTUALENVS_CREATE=true
ENV POETRY_REQUESTS_TIMEOUT=15
FROM base AS packages
@ -27,8 +18,8 @@ RUN apt-get update \
&& apt-get install -y --no-install-recommends gcc g++ libc-dev libffi-dev libgmp-dev libmpfr-dev libmpc-dev
# Install Python dependencies
COPY pyproject.toml poetry.lock ./
RUN poetry install --sync --no-cache --no-root
COPY pyproject.toml uv.lock ./
RUN uv sync --locked
# production stage
FROM base AS production

View File

@ -3,7 +3,10 @@
## Usage
> [!IMPORTANT]
> In the v0.6.12 release, we deprecated `pip` as the package management tool for Dify API Backend service and replaced it with `poetry`.
>
> In the v1.3.0 release, `poetry` has been replaced with
> [`uv`](https://docs.astral.sh/uv/) as the package manager
> for Dify API backend service.
1. Start the docker-compose stack
@ -37,19 +40,19 @@
4. Create environment.
Dify API service uses [Poetry](https://python-poetry.org/docs/) to manage dependencies. First, you need to add the poetry shell plugin, if you don't have it already, in order to run in a virtual environment. [Note: Poetry shell is no longer a native command so you need to install the poetry plugin beforehand]
Dify API service uses [UV](https://docs.astral.sh/uv/) to manage dependencies.
First, you need to add the uv package manager, if you don't have it already.
```bash
poetry self add poetry-plugin-shell
pip install uv
# Or on macOS
brew install uv
```
Then, You can execute `poetry shell` to activate the environment.
5. Install dependencies
```bash
poetry env use 3.12
poetry install
uv sync --dev
```
6. Run migrate
@ -57,21 +60,21 @@
Before the first launch, migrate the database to the latest version.
```bash
poetry run python -m flask db upgrade
uv run flask db upgrade
```
7. Start backend
```bash
poetry run python -m flask run --host 0.0.0.0 --port=5001 --debug
uv run flask run --host 0.0.0.0 --port=5001 --debug
```
8. Start Dify [web](../web) service.
9. Setup your application by visiting `http://localhost:3000`...
9. Setup your application by visiting `http://localhost:3000`.
10. If you need to handle and debug the async tasks (e.g. dataset importing and documents indexing), please start the worker service.
```bash
poetry run python -m celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace,app_deletion
uv run celery -A app.celery worker -P gevent -c 1 --loglevel INFO -Q dataset,generation,mail,ops_trace,app_deletion
```
## Testing
@ -79,11 +82,11 @@
1. Install dependencies for both the backend and the test environment
```bash
poetry install -C api --with dev
uv sync --dev
```
2. Run the tests locally with mocked system environment variables in `tool.pytest_env` section in `pyproject.toml`
```bash
poetry run -P api bash dev/pytest/pytest_all_tests.sh
uv run -P api bash dev/pytest/pytest_all_tests.sh
```

View File

@ -54,6 +54,7 @@ def initialize_extensions(app: DifyApp):
ext_otel,
ext_proxy_fix,
ext_redis,
ext_repositories,
ext_sentry,
ext_set_secretkey,
ext_storage,
@ -74,6 +75,7 @@ def initialize_extensions(app: DifyApp):
ext_migrate,
ext_redis,
ext_storage,
ext_repositories,
ext_celery,
ext_login,
ext_mail,

View File

@ -12,7 +12,7 @@ from pydantic import (
)
from pydantic_settings import BaseSettings
from configs.feature.hosted_service import HostedServiceConfig
from .hosted_service import HostedServiceConfig
class SecurityConfig(BaseSettings):
@ -519,6 +519,11 @@ class WorkflowNodeExecutionConfig(BaseSettings):
default=100,
)
WORKFLOW_NODE_EXECUTION_STORAGE: str = Field(
default="rdbms",
description="Storage backend for WorkflowNodeExecution. Options: 'rdbms', 'hybrid'",
)
class AuthConfig(BaseSettings):
"""

View File

@ -39,3 +39,8 @@ class MilvusConfig(BaseSettings):
"older versions",
default=True,
)
MILVUS_ANALYZER_PARAMS: Optional[str] = Field(
description='Milvus text analyzer parameters, e.g., {"type": "chinese"} for Chinese segmentation support.',
default=None,
)

View File

@ -4,14 +4,10 @@ import platform
import re
import urllib.parse
import warnings
from collections.abc import Mapping
from typing import Any
from uuid import uuid4
import httpx
from constants import DEFAULT_FILE_NUMBER_LIMITS
try:
import magic
except ImportError:
@ -31,8 +27,6 @@ except ImportError:
from pydantic import BaseModel
from configs import dify_config
class FileInfo(BaseModel):
filename: str
@ -89,38 +83,3 @@ def guess_file_info_from_response(response: httpx.Response):
mimetype=mimetype,
size=int(response.headers.get("Content-Length", -1)),
)
def get_parameters_from_feature_dict(*, features_dict: Mapping[str, Any], user_input_form: list[dict[str, Any]]):
return {
"opening_statement": features_dict.get("opening_statement"),
"suggested_questions": features_dict.get("suggested_questions", []),
"suggested_questions_after_answer": features_dict.get("suggested_questions_after_answer", {"enabled": False}),
"speech_to_text": features_dict.get("speech_to_text", {"enabled": False}),
"text_to_speech": features_dict.get("text_to_speech", {"enabled": False}),
"retriever_resource": features_dict.get("retriever_resource", {"enabled": False}),
"annotation_reply": features_dict.get("annotation_reply", {"enabled": False}),
"more_like_this": features_dict.get("more_like_this", {"enabled": False}),
"user_input_form": user_input_form,
"sensitive_word_avoidance": features_dict.get(
"sensitive_word_avoidance", {"enabled": False, "type": "", "configs": []}
),
"file_upload": features_dict.get(
"file_upload",
{
"image": {
"enabled": False,
"number_limits": DEFAULT_FILE_NUMBER_LIMITS,
"detail": "high",
"transfer_methods": ["remote_url", "local_file"],
}
},
),
"system_parameters": {
"image_file_size_limit": dify_config.UPLOAD_IMAGE_FILE_SIZE_LIMIT,
"video_file_size_limit": dify_config.UPLOAD_VIDEO_FILE_SIZE_LIMIT,
"audio_file_size_limit": dify_config.UPLOAD_AUDIO_FILE_SIZE_LIMIT,
"file_size_limit": dify_config.UPLOAD_FILE_SIZE_LIMIT,
"workflow_file_upload_limit": dify_config.WORKFLOW_FILE_UPLOAD_LIMIT,
},
}

View File

@ -1,5 +1,4 @@
from datetime import datetime
from dateutil.parser import isoparse
from flask_restful import Resource, marshal_with, reqparse # type: ignore
from flask_restful.inputs import int_range # type: ignore
from sqlalchemy.orm import Session
@ -41,10 +40,10 @@ class WorkflowAppLogApi(Resource):
args.status = WorkflowRunStatus(args.status) if args.status else None
if args.created_at__before:
args.created_at__before = datetime.fromisoformat(args.created_at__before.replace("Z", "+00:00"))
args.created_at__before = isoparse(args.created_at__before)
if args.created_at__after:
args.created_at__after = datetime.fromisoformat(args.created_at__after.replace("Z", "+00:00"))
args.created_at__after = isoparse(args.created_at__after)
# get paginate workflow app logs
workflow_app_service = WorkflowAppService()

View File

@ -74,7 +74,9 @@ class OAuthDataSourceBinding(Resource):
if not oauth_provider:
return {"error": "Invalid provider"}, 400
if "code" in request.args:
code = request.args.get("code")
code = request.args.get("code", "")
if not code:
return {"error": "Invalid code"}, 400
try:
oauth_provider.get_access_token(code)
except requests.exceptions.HTTPError as e:

View File

@ -1,10 +1,10 @@
from flask_restful import marshal_with # type: ignore
from controllers.common import fields
from controllers.common import helpers as controller_helpers
from controllers.console import api
from controllers.console.app.error import AppUnavailableError
from controllers.console.explore.wraps import InstalledAppResource
from core.app.app_config.common.parameters_mapping import get_parameters_from_feature_dict
from models.model import AppMode, InstalledApp
from services.app_service import AppService
@ -36,9 +36,7 @@ class AppParameterApi(InstalledAppResource):
user_input_form = features_dict.get("user_input_form", [])
return controller_helpers.get_parameters_from_feature_dict(
features_dict=features_dict, user_input_form=user_input_form
)
return get_parameters_from_feature_dict(features_dict=features_dict, user_input_form=user_input_form)
class ExploreAppMetaApi(InstalledAppResource):

View File

@ -13,6 +13,7 @@ from core.plugin.backwards_invocation.model import PluginModelBackwardsInvocatio
from core.plugin.backwards_invocation.node import PluginNodeBackwardsInvocation
from core.plugin.backwards_invocation.tool import PluginToolBackwardsInvocation
from core.plugin.entities.request import (
RequestFetchAppInfo,
RequestInvokeApp,
RequestInvokeEncrypt,
RequestInvokeLLM,
@ -278,6 +279,17 @@ class PluginUploadFileRequestApi(Resource):
return BaseBackwardsInvocationResponse(data={"url": url}).model_dump()
class PluginFetchAppInfoApi(Resource):
@setup_required
@plugin_inner_api_only
@get_user_tenant
@plugin_data(payload_type=RequestFetchAppInfo)
def post(self, user_model: Account | EndUser, tenant_model: Tenant, payload: RequestFetchAppInfo):
return BaseBackwardsInvocationResponse(
data=PluginAppBackwardsInvocation.fetch_app_info(payload.app_id, tenant_model.id)
).model_dump()
api.add_resource(PluginInvokeLLMApi, "/invoke/llm")
api.add_resource(PluginInvokeTextEmbeddingApi, "/invoke/text-embedding")
api.add_resource(PluginInvokeRerankApi, "/invoke/rerank")
@ -291,3 +303,4 @@ api.add_resource(PluginInvokeAppApi, "/invoke/app")
api.add_resource(PluginInvokeEncryptApi, "/invoke/encrypt")
api.add_resource(PluginInvokeSummaryApi, "/invoke/summary")
api.add_resource(PluginUploadFileRequestApi, "/upload/file/request")
api.add_resource(PluginFetchAppInfoApi, "/fetch/app/info")

View File

@ -1,10 +1,10 @@
from flask_restful import Resource, marshal_with # type: ignore
from controllers.common import fields
from controllers.common import helpers as controller_helpers
from controllers.service_api import api
from controllers.service_api.app.error import AppUnavailableError
from controllers.service_api.wraps import validate_app_token
from core.app.app_config.common.parameters_mapping import get_parameters_from_feature_dict
from models.model import App, AppMode
from services.app_service import AppService
@ -32,9 +32,7 @@ class AppParameterApi(Resource):
user_input_form = features_dict.get("user_input_form", [])
return controller_helpers.get_parameters_from_feature_dict(
features_dict=features_dict, user_input_form=user_input_form
)
return get_parameters_from_feature_dict(features_dict=features_dict, user_input_form=user_input_form)
class AppMetaApi(Resource):

View File

@ -1,6 +1,6 @@
import logging
from datetime import datetime
from dateutil.parser import isoparse
from flask_restful import Resource, fields, marshal_with, reqparse # type: ignore
from flask_restful.inputs import int_range # type: ignore
from sqlalchemy.orm import Session
@ -140,10 +140,10 @@ class WorkflowAppLogApi(Resource):
args.status = WorkflowRunStatus(args.status) if args.status else None
if args.created_at__before:
args.created_at__before = datetime.fromisoformat(args.created_at__before.replace("Z", "+00:00"))
args.created_at__before = isoparse(args.created_at__before)
if args.created_at__after:
args.created_at__after = datetime.fromisoformat(args.created_at__after.replace("Z", "+00:00"))
args.created_at__after = isoparse(args.created_at__after)
# get paginate workflow app logs
workflow_app_service = WorkflowAppService()

View File

@ -139,7 +139,9 @@ class DatasetListApi(DatasetApiResource):
external_knowledge_id=args["external_knowledge_id"],
embedding_model_provider=args["embedding_model_provider"],
embedding_model_name=args["embedding_model"],
retrieval_model=RetrievalModel(**args["retrieval_model"]),
retrieval_model=RetrievalModel(**args["retrieval_model"])
if args["retrieval_model"] is not None
else None,
)
except services.errors.dataset.DatasetNameDuplicateError:
raise DatasetNameDuplicateError()

View File

@ -122,6 +122,8 @@ class SegmentApi(DatasetApiResource):
tenant_id=current_user.current_tenant_id,
status_list=args["status"],
keyword=args["keyword"],
page=page,
limit=limit,
)
response = {

View File

@ -1,10 +1,10 @@
from flask_restful import marshal_with # type: ignore
from controllers.common import fields
from controllers.common import helpers as controller_helpers
from controllers.web import api
from controllers.web.error import AppUnavailableError
from controllers.web.wraps import WebApiResource
from core.app.app_config.common.parameters_mapping import get_parameters_from_feature_dict
from models.model import App, AppMode
from services.app_service import AppService
@ -31,9 +31,7 @@ class AppParameterApi(WebApiResource):
user_input_form = features_dict.get("user_input_form", [])
return controller_helpers.get_parameters_from_feature_dict(
features_dict=features_dict, user_input_form=user_input_form
)
return get_parameters_from_feature_dict(features_dict=features_dict, user_input_form=user_input_form)
class AppMeta(WebApiResource):

View File

@ -46,6 +46,7 @@ class MessageListApi(WebApiResource):
"retriever_resources": fields.List(fields.Nested(retriever_resource_fields)),
"created_at": TimestampField,
"agent_thoughts": fields.List(fields.Nested(agent_thought_fields)),
"metadata": fields.Raw(attribute="message_metadata_dict"),
"status": fields.String,
"error": fields.String,
}

View File

@ -191,7 +191,7 @@ class CotAgentRunner(BaseAgentRunner, ABC):
# action is final answer, return final answer directly
try:
if isinstance(scratchpad.action.action_input, dict):
final_answer = json.dumps(scratchpad.action.action_input)
final_answer = json.dumps(scratchpad.action.action_input, ensure_ascii=False)
elif isinstance(scratchpad.action.action_input, str):
final_answer = scratchpad.action.action_input
else:

View File

@ -52,6 +52,7 @@ class AgentStrategyParameter(PluginParameter):
return cast_parameter_value(self, value)
type: AgentStrategyParameterType = Field(..., description="The type of the parameter")
help: Optional[I18nObject] = None
def init_frontend_parameter(self, value: Any):
return init_frontend_parameter(self, self.type, value)

View File

@ -0,0 +1,45 @@
from collections.abc import Mapping
from typing import Any
from configs import dify_config
from constants import DEFAULT_FILE_NUMBER_LIMITS
def get_parameters_from_feature_dict(
*, features_dict: Mapping[str, Any], user_input_form: list[dict[str, Any]]
) -> Mapping[str, Any]:
"""
Mapping from feature dict to webapp parameters
"""
return {
"opening_statement": features_dict.get("opening_statement"),
"suggested_questions": features_dict.get("suggested_questions", []),
"suggested_questions_after_answer": features_dict.get("suggested_questions_after_answer", {"enabled": False}),
"speech_to_text": features_dict.get("speech_to_text", {"enabled": False}),
"text_to_speech": features_dict.get("text_to_speech", {"enabled": False}),
"retriever_resource": features_dict.get("retriever_resource", {"enabled": False}),
"annotation_reply": features_dict.get("annotation_reply", {"enabled": False}),
"more_like_this": features_dict.get("more_like_this", {"enabled": False}),
"user_input_form": user_input_form,
"sensitive_word_avoidance": features_dict.get(
"sensitive_word_avoidance", {"enabled": False, "type": "", "configs": []}
),
"file_upload": features_dict.get(
"file_upload",
{
"image": {
"enabled": False,
"number_limits": DEFAULT_FILE_NUMBER_LIMITS,
"detail": "high",
"transfer_methods": ["remote_url", "local_file"],
}
},
),
"system_parameters": {
"image_file_size_limit": dify_config.UPLOAD_IMAGE_FILE_SIZE_LIMIT,
"video_file_size_limit": dify_config.UPLOAD_VIDEO_FILE_SIZE_LIMIT,
"audio_file_size_limit": dify_config.UPLOAD_AUDIO_FILE_SIZE_LIMIT,
"file_size_limit": dify_config.UPLOAD_FILE_SIZE_LIMIT,
"workflow_file_upload_limit": dify_config.WORKFLOW_FILE_UPLOAD_LIMIT,
},
}

View File

@ -320,10 +320,9 @@ class AdvancedChatAppGenerateTaskPipeline:
session=session, workflow_run_id=self._workflow_run_id
)
workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_retried(
session=session, workflow_run=workflow_run, event=event
workflow_run=workflow_run, event=event
)
node_retry_resp = self._workflow_cycle_manager._workflow_node_retry_to_stream_response(
session=session,
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
@ -341,11 +340,10 @@ class AdvancedChatAppGenerateTaskPipeline:
session=session, workflow_run_id=self._workflow_run_id
)
workflow_node_execution = self._workflow_cycle_manager._handle_node_execution_start(
session=session, workflow_run=workflow_run, event=event
workflow_run=workflow_run, event=event
)
node_start_resp = self._workflow_cycle_manager._workflow_node_start_to_stream_response(
session=session,
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
@ -363,11 +361,10 @@ class AdvancedChatAppGenerateTaskPipeline:
with Session(db.engine, expire_on_commit=False) as session:
workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_success(
session=session, event=event
event=event
)
node_finish_resp = self._workflow_cycle_manager._workflow_node_finish_to_stream_response(
session=session,
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
@ -383,18 +380,15 @@ class AdvancedChatAppGenerateTaskPipeline:
| QueueNodeInLoopFailedEvent
| QueueNodeExceptionEvent,
):
with Session(db.engine, expire_on_commit=False) as session:
workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_failed(
session=session, event=event
)
workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_failed(
event=event
)
node_finish_resp = self._workflow_cycle_manager._workflow_node_finish_to_stream_response(
session=session,
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
)
session.commit()
node_finish_resp = self._workflow_cycle_manager._workflow_node_finish_to_stream_response(
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
)
if node_finish_resp:
yield node_finish_resp

View File

@ -17,6 +17,7 @@ class BaseAppGenerator:
user_inputs: Optional[Mapping[str, Any]],
variables: Sequence["VariableEntity"],
tenant_id: str,
strict_type_validation: bool = False,
) -> Mapping[str, Any]:
user_inputs = user_inputs or {}
# Filter input variables from form configuration, handle required fields, default values, and option values
@ -37,6 +38,7 @@ class BaseAppGenerator:
allowed_file_extensions=entity_dictionary[k].allowed_file_extensions,
allowed_file_upload_methods=entity_dictionary[k].allowed_file_upload_methods,
),
strict_type_validation=strict_type_validation,
)
for k, v in user_inputs.items()
if isinstance(v, dict) and entity_dictionary[k].type == VariableEntityType.FILE

View File

@ -92,6 +92,7 @@ class WorkflowAppGenerator(BaseAppGenerator):
mappings=files,
tenant_id=app_model.tenant_id,
config=file_extra_config,
strict_type_validation=True if invoke_from == InvokeFrom.SERVICE_API else False,
)
# convert to app config
@ -114,7 +115,10 @@ class WorkflowAppGenerator(BaseAppGenerator):
app_config=app_config,
file_upload_config=file_extra_config,
inputs=self._prepare_user_inputs(
user_inputs=inputs, variables=app_config.variables, tenant_id=app_model.tenant_id
user_inputs=inputs,
variables=app_config.variables,
tenant_id=app_model.tenant_id,
strict_type_validation=True if invoke_from == InvokeFrom.SERVICE_API else False,
),
files=list(system_files),
user_id=user.id,

View File

@ -279,10 +279,9 @@ class WorkflowAppGenerateTaskPipeline:
session=session, workflow_run_id=self._workflow_run_id
)
workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_retried(
session=session, workflow_run=workflow_run, event=event
workflow_run=workflow_run, event=event
)
response = self._workflow_cycle_manager._workflow_node_retry_to_stream_response(
session=session,
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
@ -300,10 +299,9 @@ class WorkflowAppGenerateTaskPipeline:
session=session, workflow_run_id=self._workflow_run_id
)
workflow_node_execution = self._workflow_cycle_manager._handle_node_execution_start(
session=session, workflow_run=workflow_run, event=event
workflow_run=workflow_run, event=event
)
node_start_response = self._workflow_cycle_manager._workflow_node_start_to_stream_response(
session=session,
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
@ -313,17 +311,14 @@ class WorkflowAppGenerateTaskPipeline:
if node_start_response:
yield node_start_response
elif isinstance(event, QueueNodeSucceededEvent):
with Session(db.engine, expire_on_commit=False) as session:
workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_success(
session=session, event=event
)
node_success_response = self._workflow_cycle_manager._workflow_node_finish_to_stream_response(
session=session,
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
)
session.commit()
workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_success(
event=event
)
node_success_response = self._workflow_cycle_manager._workflow_node_finish_to_stream_response(
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
)
if node_success_response:
yield node_success_response
@ -334,18 +329,14 @@ class WorkflowAppGenerateTaskPipeline:
| QueueNodeInLoopFailedEvent
| QueueNodeExceptionEvent,
):
with Session(db.engine, expire_on_commit=False) as session:
workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_failed(
session=session,
event=event,
)
node_failed_response = self._workflow_cycle_manager._workflow_node_finish_to_stream_response(
session=session,
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
)
session.commit()
workflow_node_execution = self._workflow_cycle_manager._handle_workflow_node_execution_failed(
event=event,
)
node_failed_response = self._workflow_cycle_manager._workflow_node_finish_to_stream_response(
event=event,
task_id=self._application_generate_entity.task_id,
workflow_node_execution=workflow_node_execution,
)
if node_failed_response:
yield node_failed_response
@ -627,6 +618,7 @@ class WorkflowAppGenerateTaskPipeline:
workflow_app_log.created_by = self._user_id
session.add(workflow_app_log)
session.commit()
def _text_chunk_to_stream_response(
self, text: str, from_variable_selector: Optional[list[str]] = None

View File

@ -6,7 +6,7 @@ from typing import Any, Optional, Union, cast
from uuid import uuid4
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from sqlalchemy.orm import Session, sessionmaker
from core.app.entities.app_invoke_entities import AdvancedChatAppGenerateEntity, InvokeFrom, WorkflowAppGenerateEntity
from core.app.entities.queue_entities import (
@ -49,12 +49,14 @@ from core.file import FILE_MODEL_IDENTITY, File
from core.model_runtime.utils.encoders import jsonable_encoder
from core.ops.entities.trace_entity import TraceTaskName
from core.ops.ops_trace_manager import TraceQueueManager, TraceTask
from core.repository import RepositoryFactory
from core.tools.tool_manager import ToolManager
from core.workflow.entities.node_entities import NodeRunMetadataKey
from core.workflow.enums import SystemVariableKey
from core.workflow.nodes import NodeType
from core.workflow.nodes.tool.entities import ToolNodeData
from core.workflow.workflow_entry import WorkflowEntry
from extensions.ext_database import db
from models.account import Account
from models.enums import CreatedByRole, WorkflowRunTriggeredFrom
from models.model import EndUser
@ -80,6 +82,21 @@ class WorkflowCycleManage:
self._application_generate_entity = application_generate_entity
self._workflow_system_variables = workflow_system_variables
# Initialize the session factory and repository
# We use the global db engine instead of the session passed to methods
# Disable expire_on_commit to avoid the need for merging objects
self._session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
self._workflow_node_execution_repository = RepositoryFactory.create_workflow_node_execution_repository(
params={
"tenant_id": self._application_generate_entity.app_config.tenant_id,
"app_id": self._application_generate_entity.app_config.app_id,
"session_factory": self._session_factory,
}
)
# We'll still keep the cache for backward compatibility and performance
# but use the repository for database operations
def _handle_workflow_run_start(
self,
*,
@ -254,19 +271,15 @@ class WorkflowCycleManage:
workflow_run.finished_at = datetime.now(UTC).replace(tzinfo=None)
workflow_run.exceptions_count = exceptions_count
stmt = select(WorkflowNodeExecution.node_execution_id).where(
WorkflowNodeExecution.tenant_id == workflow_run.tenant_id,
WorkflowNodeExecution.app_id == workflow_run.app_id,
WorkflowNodeExecution.workflow_id == workflow_run.workflow_id,
WorkflowNodeExecution.triggered_from == WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value,
WorkflowNodeExecution.workflow_run_id == workflow_run.id,
WorkflowNodeExecution.status == WorkflowNodeExecutionStatus.RUNNING.value,
# Use the instance repository to find running executions for a workflow run
running_workflow_node_executions = self._workflow_node_execution_repository.get_running_executions(
workflow_run_id=workflow_run.id
)
ids = session.scalars(stmt).all()
# Use self._get_workflow_node_execution here to make sure the cache is updated
running_workflow_node_executions = [
self._get_workflow_node_execution(session=session, node_execution_id=id) for id in ids if id
]
# Update the cache with the retrieved executions
for execution in running_workflow_node_executions:
if execution.node_execution_id:
self._workflow_node_executions[execution.node_execution_id] = execution
for workflow_node_execution in running_workflow_node_executions:
now = datetime.now(UTC).replace(tzinfo=None)
@ -288,7 +301,7 @@ class WorkflowCycleManage:
return workflow_run
def _handle_node_execution_start(
self, *, session: Session, workflow_run: WorkflowRun, event: QueueNodeStartedEvent
self, *, workflow_run: WorkflowRun, event: QueueNodeStartedEvent
) -> WorkflowNodeExecution:
workflow_node_execution = WorkflowNodeExecution()
workflow_node_execution.id = str(uuid4())
@ -315,17 +328,14 @@ class WorkflowCycleManage:
)
workflow_node_execution.created_at = datetime.now(UTC).replace(tzinfo=None)
session.add(workflow_node_execution)
# Use the instance repository to save the workflow node execution
self._workflow_node_execution_repository.save(workflow_node_execution)
self._workflow_node_executions[event.node_execution_id] = workflow_node_execution
return workflow_node_execution
def _handle_workflow_node_execution_success(
self, *, session: Session, event: QueueNodeSucceededEvent
) -> WorkflowNodeExecution:
workflow_node_execution = self._get_workflow_node_execution(
session=session, node_execution_id=event.node_execution_id
)
def _handle_workflow_node_execution_success(self, *, event: QueueNodeSucceededEvent) -> WorkflowNodeExecution:
workflow_node_execution = self._get_workflow_node_execution(node_execution_id=event.node_execution_id)
inputs = WorkflowEntry.handle_special_values(event.inputs)
process_data = WorkflowEntry.handle_special_values(event.process_data)
outputs = WorkflowEntry.handle_special_values(event.outputs)
@ -344,13 +354,13 @@ class WorkflowCycleManage:
workflow_node_execution.finished_at = finished_at
workflow_node_execution.elapsed_time = elapsed_time
workflow_node_execution = session.merge(workflow_node_execution)
# Use the instance repository to update the workflow node execution
self._workflow_node_execution_repository.update(workflow_node_execution)
return workflow_node_execution
def _handle_workflow_node_execution_failed(
self,
*,
session: Session,
event: QueueNodeFailedEvent
| QueueNodeInIterationFailedEvent
| QueueNodeInLoopFailedEvent
@ -361,9 +371,7 @@ class WorkflowCycleManage:
:param event: queue node failed event
:return:
"""
workflow_node_execution = self._get_workflow_node_execution(
session=session, node_execution_id=event.node_execution_id
)
workflow_node_execution = self._get_workflow_node_execution(node_execution_id=event.node_execution_id)
inputs = WorkflowEntry.handle_special_values(event.inputs)
process_data = WorkflowEntry.handle_special_values(event.process_data)
@ -387,14 +395,14 @@ class WorkflowCycleManage:
workflow_node_execution.elapsed_time = elapsed_time
workflow_node_execution.execution_metadata = execution_metadata
workflow_node_execution = session.merge(workflow_node_execution)
return workflow_node_execution
def _handle_workflow_node_execution_retried(
self, *, session: Session, workflow_run: WorkflowRun, event: QueueNodeRetryEvent
self, *, workflow_run: WorkflowRun, event: QueueNodeRetryEvent
) -> WorkflowNodeExecution:
"""
Workflow node execution failed
:param workflow_run: workflow run
:param event: queue node failed event
:return:
"""
@ -439,15 +447,12 @@ class WorkflowCycleManage:
workflow_node_execution.execution_metadata = execution_metadata
workflow_node_execution.index = event.node_run_index
session.add(workflow_node_execution)
# Use the instance repository to save the workflow node execution
self._workflow_node_execution_repository.save(workflow_node_execution)
self._workflow_node_executions[event.node_execution_id] = workflow_node_execution
return workflow_node_execution
#################################################
# to stream responses #
#################################################
def _workflow_start_to_stream_response(
self,
*,
@ -455,7 +460,6 @@ class WorkflowCycleManage:
task_id: str,
workflow_run: WorkflowRun,
) -> WorkflowStartStreamResponse:
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
_ = session
return WorkflowStartStreamResponse(
task_id=task_id,
@ -521,14 +525,10 @@ class WorkflowCycleManage:
def _workflow_node_start_to_stream_response(
self,
*,
session: Session,
event: QueueNodeStartedEvent,
task_id: str,
workflow_node_execution: WorkflowNodeExecution,
) -> Optional[NodeStartStreamResponse]:
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
_ = session
if workflow_node_execution.node_type in {NodeType.ITERATION.value, NodeType.LOOP.value}:
return None
if not workflow_node_execution.workflow_run_id:
@ -571,7 +571,6 @@ class WorkflowCycleManage:
def _workflow_node_finish_to_stream_response(
self,
*,
session: Session,
event: QueueNodeSucceededEvent
| QueueNodeFailedEvent
| QueueNodeInIterationFailedEvent
@ -580,8 +579,6 @@ class WorkflowCycleManage:
task_id: str,
workflow_node_execution: WorkflowNodeExecution,
) -> Optional[NodeFinishStreamResponse]:
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
_ = session
if workflow_node_execution.node_type in {NodeType.ITERATION.value, NodeType.LOOP.value}:
return None
if not workflow_node_execution.workflow_run_id:
@ -621,13 +618,10 @@ class WorkflowCycleManage:
def _workflow_node_retry_to_stream_response(
self,
*,
session: Session,
event: QueueNodeRetryEvent,
task_id: str,
workflow_node_execution: WorkflowNodeExecution,
) -> Optional[Union[NodeRetryStreamResponse, NodeFinishStreamResponse]]:
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
_ = session
if workflow_node_execution.node_type in {NodeType.ITERATION.value, NodeType.LOOP.value}:
return None
if not workflow_node_execution.workflow_run_id:
@ -668,7 +662,6 @@ class WorkflowCycleManage:
def _workflow_parallel_branch_start_to_stream_response(
self, *, session: Session, task_id: str, workflow_run: WorkflowRun, event: QueueParallelBranchRunStartedEvent
) -> ParallelBranchStartStreamResponse:
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
_ = session
return ParallelBranchStartStreamResponse(
task_id=task_id,
@ -692,7 +685,6 @@ class WorkflowCycleManage:
workflow_run: WorkflowRun,
event: QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent,
) -> ParallelBranchFinishedStreamResponse:
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
_ = session
return ParallelBranchFinishedStreamResponse(
task_id=task_id,
@ -713,7 +705,6 @@ class WorkflowCycleManage:
def _workflow_iteration_start_to_stream_response(
self, *, session: Session, task_id: str, workflow_run: WorkflowRun, event: QueueIterationStartEvent
) -> IterationNodeStartStreamResponse:
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
_ = session
return IterationNodeStartStreamResponse(
task_id=task_id,
@ -735,7 +726,6 @@ class WorkflowCycleManage:
def _workflow_iteration_next_to_stream_response(
self, *, session: Session, task_id: str, workflow_run: WorkflowRun, event: QueueIterationNextEvent
) -> IterationNodeNextStreamResponse:
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
_ = session
return IterationNodeNextStreamResponse(
task_id=task_id,
@ -759,7 +749,6 @@ class WorkflowCycleManage:
def _workflow_iteration_completed_to_stream_response(
self, *, session: Session, task_id: str, workflow_run: WorkflowRun, event: QueueIterationCompletedEvent
) -> IterationNodeCompletedStreamResponse:
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
_ = session
return IterationNodeCompletedStreamResponse(
task_id=task_id,
@ -790,7 +779,6 @@ class WorkflowCycleManage:
def _workflow_loop_start_to_stream_response(
self, *, session: Session, task_id: str, workflow_run: WorkflowRun, event: QueueLoopStartEvent
) -> LoopNodeStartStreamResponse:
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
_ = session
return LoopNodeStartStreamResponse(
task_id=task_id,
@ -812,7 +800,6 @@ class WorkflowCycleManage:
def _workflow_loop_next_to_stream_response(
self, *, session: Session, task_id: str, workflow_run: WorkflowRun, event: QueueLoopNextEvent
) -> LoopNodeNextStreamResponse:
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
_ = session
return LoopNodeNextStreamResponse(
task_id=task_id,
@ -836,7 +823,6 @@ class WorkflowCycleManage:
def _workflow_loop_completed_to_stream_response(
self, *, session: Session, task_id: str, workflow_run: WorkflowRun, event: QueueLoopCompletedEvent
) -> LoopNodeCompletedStreamResponse:
# receive session to make sure the workflow_run won't be expired, need a more elegant way to handle this
_ = session
return LoopNodeCompletedStreamResponse(
task_id=task_id,
@ -934,11 +920,22 @@ class WorkflowCycleManage:
return workflow_run
def _get_workflow_node_execution(self, session: Session, node_execution_id: str) -> WorkflowNodeExecution:
if node_execution_id not in self._workflow_node_executions:
def _get_workflow_node_execution(self, node_execution_id: str) -> WorkflowNodeExecution:
# First check the cache for performance
if node_execution_id in self._workflow_node_executions:
cached_execution = self._workflow_node_executions[node_execution_id]
# No need to merge with session since expire_on_commit=False
return cached_execution
# If not in cache, use the instance repository to get by node_execution_id
execution = self._workflow_node_execution_repository.get_by_node_execution_id(node_execution_id)
if not execution:
raise ValueError(f"Workflow node execution not found: {node_execution_id}")
cached_workflow_node_execution = self._workflow_node_executions[node_execution_id]
return session.merge(cached_workflow_node_execution)
# Update cache
self._workflow_node_executions[node_execution_id] = execution
return execution
def _handle_agent_log(self, task_id: str, event: QueueAgentLogEvent) -> AgentLogStreamResponse:
"""

View File

@ -6,7 +6,6 @@ from core.rag.models.document import Document
from extensions.ext_database import db
from models.dataset import ChildChunk, DatasetQuery, DocumentSegment
from models.dataset import Document as DatasetDocument
from models.model import DatasetRetrieverResource
class DatasetIndexToolCallbackHandler:
@ -71,29 +70,6 @@ class DatasetIndexToolCallbackHandler:
def return_retriever_resource_info(self, resource: list):
"""Handle return_retriever_resource_info."""
if resource and len(resource) > 0:
for item in resource:
dataset_retriever_resource = DatasetRetrieverResource(
message_id=self._message_id,
position=item.get("position") or 0,
dataset_id=item.get("dataset_id"),
dataset_name=item.get("dataset_name"),
document_id=item.get("document_id"),
document_name=item.get("document_name"),
data_source_type=item.get("data_source_type"),
segment_id=item.get("segment_id"),
score=item.get("score") if "score" in item else None,
hit_count=item.get("hit_count") if "hit_count" in item else None,
word_count=item.get("word_count") if "word_count" in item else None,
segment_position=item.get("segment_position") if "segment_position" in item else None,
index_node_hash=item.get("index_node_hash") if "index_node_hash" in item else None,
content=item.get("content"),
retriever_from=item.get("retriever_from"),
created_by=self._user_id,
)
db.session.add(dataset_retriever_resource)
db.session.commit()
self._queue_manager.publish(
QueueRetrieverResourcesEvent(retriever_resources=resource), PublishFrom.APPLICATION_MANAGER
)

View File

@ -48,25 +48,26 @@ def make_request(method, url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs):
write=dify_config.SSRF_DEFAULT_WRITE_TIME_OUT,
)
if "ssl_verify" not in kwargs:
kwargs["ssl_verify"] = HTTP_REQUEST_NODE_SSL_VERIFY
ssl_verify = kwargs.pop("ssl_verify")
retries = 0
while retries <= max_retries:
try:
if dify_config.SSRF_PROXY_ALL_URL:
with httpx.Client(proxy=dify_config.SSRF_PROXY_ALL_URL, verify=HTTP_REQUEST_NODE_SSL_VERIFY) as client:
with httpx.Client(proxy=dify_config.SSRF_PROXY_ALL_URL, verify=ssl_verify) as client:
response = client.request(method=method, url=url, **kwargs)
elif dify_config.SSRF_PROXY_HTTP_URL and dify_config.SSRF_PROXY_HTTPS_URL:
proxy_mounts = {
"http://": httpx.HTTPTransport(
proxy=dify_config.SSRF_PROXY_HTTP_URL, verify=HTTP_REQUEST_NODE_SSL_VERIFY
),
"https://": httpx.HTTPTransport(
proxy=dify_config.SSRF_PROXY_HTTPS_URL, verify=HTTP_REQUEST_NODE_SSL_VERIFY
),
"http://": httpx.HTTPTransport(proxy=dify_config.SSRF_PROXY_HTTP_URL, verify=ssl_verify),
"https://": httpx.HTTPTransport(proxy=dify_config.SSRF_PROXY_HTTPS_URL, verify=ssl_verify),
}
with httpx.Client(mounts=proxy_mounts, verify=HTTP_REQUEST_NODE_SSL_VERIFY) as client:
with httpx.Client(mounts=proxy_mounts, verify=ssl_verify) as client:
response = client.request(method=method, url=url, **kwargs)
else:
with httpx.Client(verify=HTTP_REQUEST_NODE_SSL_VERIFY) as client:
with httpx.Client(verify=ssl_verify) as client:
response = client.request(method=method, url=url, **kwargs)
if response.status_code not in STATUS_FORCELIST:

View File

@ -44,6 +44,7 @@ class TokenBufferMemory:
Message.created_at,
Message.workflow_run_id,
Message.parent_message_id,
Message.answer_tokens,
)
.filter(
Message.conversation_id == self.conversation.id,
@ -63,7 +64,7 @@ class TokenBufferMemory:
thread_messages = extract_thread_messages(messages)
# for newly created message, its answer is temporarily empty, we don't need to add it to memory
if thread_messages and not thread_messages[0].answer:
if thread_messages and not thread_messages[0].answer and thread_messages[0].answer_tokens == 0:
thread_messages.pop(0)
messages = list(reversed(thread_messages))

View File

@ -1,5 +1,6 @@
import logging
import time
import uuid
from collections.abc import Generator, Sequence
from typing import Optional, Union
@ -24,6 +25,58 @@ from core.plugin.manager.model import PluginModelManager
logger = logging.getLogger(__name__)
def _gen_tool_call_id() -> str:
return f"chatcmpl-tool-{str(uuid.uuid4().hex)}"
def _increase_tool_call(
new_tool_calls: list[AssistantPromptMessage.ToolCall], existing_tools_calls: list[AssistantPromptMessage.ToolCall]
):
"""
Merge incremental tool call updates into existing tool calls.
:param new_tool_calls: List of new tool call deltas to be merged.
:param existing_tools_calls: List of existing tool calls to be modified IN-PLACE.
"""
def get_tool_call(tool_call_id: str):
"""
Get or create a tool call by ID
:param tool_call_id: tool call ID
:return: existing or new tool call
"""
if not tool_call_id:
return existing_tools_calls[-1]
_tool_call = next((_tool_call for _tool_call in existing_tools_calls if _tool_call.id == tool_call_id), None)
if _tool_call is None:
_tool_call = AssistantPromptMessage.ToolCall(
id=tool_call_id,
type="function",
function=AssistantPromptMessage.ToolCall.ToolCallFunction(name="", arguments=""),
)
existing_tools_calls.append(_tool_call)
return _tool_call
for new_tool_call in new_tool_calls:
# generate ID for tool calls with function name but no ID to track them
if new_tool_call.function.name and not new_tool_call.id:
new_tool_call.id = _gen_tool_call_id()
# get tool call
tool_call = get_tool_call(new_tool_call.id)
# update tool call
if new_tool_call.id:
tool_call.id = new_tool_call.id
if new_tool_call.type:
tool_call.type = new_tool_call.type
if new_tool_call.function.name:
tool_call.function.name = new_tool_call.function.name
if new_tool_call.function.arguments:
tool_call.function.arguments += new_tool_call.function.arguments
class LargeLanguageModel(AIModel):
"""
Model class for large language model.
@ -109,44 +162,13 @@ class LargeLanguageModel(AIModel):
system_fingerprint = None
tools_calls: list[AssistantPromptMessage.ToolCall] = []
def increase_tool_call(new_tool_calls: list[AssistantPromptMessage.ToolCall]):
def get_tool_call(tool_name: str):
if not tool_name:
return tools_calls[-1]
tool_call = next(
(tool_call for tool_call in tools_calls if tool_call.function.name == tool_name), None
)
if tool_call is None:
tool_call = AssistantPromptMessage.ToolCall(
id="",
type="",
function=AssistantPromptMessage.ToolCall.ToolCallFunction(name=tool_name, arguments=""),
)
tools_calls.append(tool_call)
return tool_call
for new_tool_call in new_tool_calls:
# get tool call
tool_call = get_tool_call(new_tool_call.function.name)
# update tool call
if new_tool_call.id:
tool_call.id = new_tool_call.id
if new_tool_call.type:
tool_call.type = new_tool_call.type
if new_tool_call.function.name:
tool_call.function.name = new_tool_call.function.name
if new_tool_call.function.arguments:
tool_call.function.arguments += new_tool_call.function.arguments
for chunk in result:
if isinstance(chunk.delta.message.content, str):
content += chunk.delta.message.content
elif isinstance(chunk.delta.message.content, list):
content_list.extend(chunk.delta.message.content)
if chunk.delta.message.tool_calls:
increase_tool_call(chunk.delta.message.tool_calls)
_increase_tool_call(chunk.delta.message.tool_calls, tools_calls)
usage = chunk.delta.usage or LLMUsage.empty_usage()
system_fingerprint = chunk.system_fingerprint

View File

@ -5,6 +5,7 @@ from datetime import datetime, timedelta
from typing import Optional
from langfuse import Langfuse # type: ignore
from sqlalchemy.orm import sessionmaker
from core.ops.base_trace_instance import BaseTraceInstance
from core.ops.entities.config_entity import LangfuseConfig
@ -28,9 +29,9 @@ from core.ops.langfuse_trace.entities.langfuse_trace_entity import (
UnitEnum,
)
from core.ops.utils import filter_none_values
from core.repository.repository_factory import RepositoryFactory
from extensions.ext_database import db
from models.model import EndUser
from models.workflow import WorkflowNodeExecution
logger = logging.getLogger(__name__)
@ -110,36 +111,18 @@ class LangFuseDataTrace(BaseTraceInstance):
)
self.add_trace(langfuse_trace_data=trace_data)
# through workflow_run_id get all_nodes_execution
workflow_nodes_execution_id_records = (
db.session.query(WorkflowNodeExecution.id)
.filter(WorkflowNodeExecution.workflow_run_id == trace_info.workflow_run_id)
.all()
# through workflow_run_id get all_nodes_execution using repository
session_factory = sessionmaker(bind=db.engine)
workflow_node_execution_repository = RepositoryFactory.create_workflow_node_execution_repository(
params={"tenant_id": trace_info.tenant_id, "session_factory": session_factory},
)
for node_execution_id_record in workflow_nodes_execution_id_records:
node_execution = (
db.session.query(
WorkflowNodeExecution.id,
WorkflowNodeExecution.tenant_id,
WorkflowNodeExecution.app_id,
WorkflowNodeExecution.title,
WorkflowNodeExecution.node_type,
WorkflowNodeExecution.status,
WorkflowNodeExecution.inputs,
WorkflowNodeExecution.outputs,
WorkflowNodeExecution.created_at,
WorkflowNodeExecution.elapsed_time,
WorkflowNodeExecution.process_data,
WorkflowNodeExecution.execution_metadata,
)
.filter(WorkflowNodeExecution.id == node_execution_id_record.id)
.first()
)
if not node_execution:
continue
# Get all executions for this workflow run
workflow_node_executions = workflow_node_execution_repository.get_by_workflow_run(
workflow_run_id=trace_info.workflow_run_id
)
for node_execution in workflow_node_executions:
node_execution_id = node_execution.id
tenant_id = node_execution.tenant_id
app_id = node_execution.app_id

View File

@ -7,6 +7,7 @@ from typing import Optional, cast
from langsmith import Client
from langsmith.schemas import RunBase
from sqlalchemy.orm import sessionmaker
from core.ops.base_trace_instance import BaseTraceInstance
from core.ops.entities.config_entity import LangSmithConfig
@ -27,9 +28,9 @@ from core.ops.langsmith_trace.entities.langsmith_trace_entity import (
LangSmithRunUpdateModel,
)
from core.ops.utils import filter_none_values, generate_dotted_order
from core.repository.repository_factory import RepositoryFactory
from extensions.ext_database import db
from models.model import EndUser, MessageFile
from models.workflow import WorkflowNodeExecution
logger = logging.getLogger(__name__)
@ -134,36 +135,22 @@ class LangSmithDataTrace(BaseTraceInstance):
self.add_run(langsmith_run)
# through workflow_run_id get all_nodes_execution
workflow_nodes_execution_id_records = (
db.session.query(WorkflowNodeExecution.id)
.filter(WorkflowNodeExecution.workflow_run_id == trace_info.workflow_run_id)
.all()
# through workflow_run_id get all_nodes_execution using repository
session_factory = sessionmaker(bind=db.engine)
workflow_node_execution_repository = RepositoryFactory.create_workflow_node_execution_repository(
params={
"tenant_id": trace_info.tenant_id,
"app_id": trace_info.metadata.get("app_id"),
"session_factory": session_factory,
},
)
for node_execution_id_record in workflow_nodes_execution_id_records:
node_execution = (
db.session.query(
WorkflowNodeExecution.id,
WorkflowNodeExecution.tenant_id,
WorkflowNodeExecution.app_id,
WorkflowNodeExecution.title,
WorkflowNodeExecution.node_type,
WorkflowNodeExecution.status,
WorkflowNodeExecution.inputs,
WorkflowNodeExecution.outputs,
WorkflowNodeExecution.created_at,
WorkflowNodeExecution.elapsed_time,
WorkflowNodeExecution.process_data,
WorkflowNodeExecution.execution_metadata,
)
.filter(WorkflowNodeExecution.id == node_execution_id_record.id)
.first()
)
if not node_execution:
continue
# Get all executions for this workflow run
workflow_node_executions = workflow_node_execution_repository.get_by_workflow_run(
workflow_run_id=trace_info.workflow_run_id
)
for node_execution in workflow_node_executions:
node_execution_id = node_execution.id
tenant_id = node_execution.tenant_id
app_id = node_execution.app_id

View File

@ -7,6 +7,7 @@ from typing import Optional, cast
from opik import Opik, Trace
from opik.id_helpers import uuid4_to_uuid7
from sqlalchemy.orm import sessionmaker
from core.ops.base_trace_instance import BaseTraceInstance
from core.ops.entities.config_entity import OpikConfig
@ -21,9 +22,9 @@ from core.ops.entities.trace_entity import (
TraceTaskName,
WorkflowTraceInfo,
)
from core.repository.repository_factory import RepositoryFactory
from extensions.ext_database import db
from models.model import EndUser, MessageFile
from models.workflow import WorkflowNodeExecution
logger = logging.getLogger(__name__)
@ -147,36 +148,22 @@ class OpikDataTrace(BaseTraceInstance):
}
self.add_trace(trace_data)
# through workflow_run_id get all_nodes_execution
workflow_nodes_execution_id_records = (
db.session.query(WorkflowNodeExecution.id)
.filter(WorkflowNodeExecution.workflow_run_id == trace_info.workflow_run_id)
.all()
# through workflow_run_id get all_nodes_execution using repository
session_factory = sessionmaker(bind=db.engine)
workflow_node_execution_repository = RepositoryFactory.create_workflow_node_execution_repository(
params={
"tenant_id": trace_info.tenant_id,
"app_id": trace_info.metadata.get("app_id"),
"session_factory": session_factory,
},
)
for node_execution_id_record in workflow_nodes_execution_id_records:
node_execution = (
db.session.query(
WorkflowNodeExecution.id,
WorkflowNodeExecution.tenant_id,
WorkflowNodeExecution.app_id,
WorkflowNodeExecution.title,
WorkflowNodeExecution.node_type,
WorkflowNodeExecution.status,
WorkflowNodeExecution.inputs,
WorkflowNodeExecution.outputs,
WorkflowNodeExecution.created_at,
WorkflowNodeExecution.elapsed_time,
WorkflowNodeExecution.process_data,
WorkflowNodeExecution.execution_metadata,
)
.filter(WorkflowNodeExecution.id == node_execution_id_record.id)
.first()
)
if not node_execution:
continue
# Get all executions for this workflow run
workflow_node_executions = workflow_node_execution_repository.get_by_workflow_run(
workflow_run_id=trace_info.workflow_run_id
)
for node_execution in workflow_node_executions:
node_execution_id = node_execution.id
tenant_id = node_execution.tenant_id
app_id = node_execution.app_id

View File

@ -2,6 +2,7 @@ from collections.abc import Generator, Mapping
from typing import Optional, Union
from controllers.service_api.wraps import create_or_update_end_user_for_user_id
from core.app.app_config.common.parameters_mapping import get_parameters_from_feature_dict
from core.app.apps.advanced_chat.app_generator import AdvancedChatAppGenerator
from core.app.apps.agent_chat.app_generator import AgentChatAppGenerator
from core.app.apps.chat.app_generator import ChatAppGenerator
@ -15,6 +16,34 @@ from models.model import App, AppMode, EndUser
class PluginAppBackwardsInvocation(BaseBackwardsInvocation):
@classmethod
def fetch_app_info(cls, app_id: str, tenant_id: str) -> Mapping:
"""
Fetch app info
"""
app = cls._get_app(app_id, tenant_id)
"""Retrieve app parameters."""
if app.mode in {AppMode.ADVANCED_CHAT.value, AppMode.WORKFLOW.value}:
workflow = app.workflow
if workflow is None:
raise ValueError("unexpected app type")
features_dict = workflow.features_dict
user_input_form = workflow.user_input_form(to_old_structure=True)
else:
app_model_config = app.app_model_config
if app_model_config is None:
raise ValueError("unexpected app type")
features_dict = app_model_config.to_dict()
user_input_form = features_dict.get("user_input_form", [])
return {
"data": get_parameters_from_feature_dict(features_dict=features_dict, user_input_form=user_input_form),
}
@classmethod
def invoke_app(
cls,

View File

@ -131,7 +131,7 @@ def cast_parameter_value(typ: enum.StrEnum, value: Any, /):
raise ValueError("The selector must be a dictionary.")
return value
case PluginParameterType.TOOLS_SELECTOR:
if not isinstance(value, list):
if value and not isinstance(value, list):
raise ValueError("The tools selector must be a list.")
return value
case _:
@ -147,7 +147,7 @@ def init_frontend_parameter(rule: PluginParameter, type: enum.StrEnum, value: An
init frontend parameter by rule
"""
parameter_value = value
if not parameter_value and parameter_value != 0 and type != PluginParameterType.TOOLS_SELECTOR:
if not parameter_value and parameter_value != 0:
# get default value
parameter_value = rule.default
if not parameter_value and rule.required:

View File

@ -204,3 +204,11 @@ class RequestRequestUploadFile(BaseModel):
filename: str
mimetype: str
class RequestFetchAppInfo(BaseModel):
"""
Request to fetch app info
"""
app_id: str

View File

@ -82,7 +82,7 @@ class BasePluginManager:
Make a stream request to the plugin daemon inner API
"""
response = self._request(method, path, headers, data, params, files, stream=True)
for line in response.iter_lines():
for line in response.iter_lines(chunk_size=1024 * 8):
line = line.decode("utf-8").strip()
if line.startswith("data:"):
line = line[5:].strip()

View File

@ -110,7 +110,62 @@ class PluginToolManager(BasePluginManager):
"Content-Type": "application/json",
},
)
return response
class FileChunk:
"""
Only used for internal processing.
"""
bytes_written: int
total_length: int
data: bytearray
def __init__(self, total_length: int):
self.bytes_written = 0
self.total_length = total_length
self.data = bytearray(total_length)
files: dict[str, FileChunk] = {}
for resp in response:
if resp.type == ToolInvokeMessage.MessageType.BLOB_CHUNK:
assert isinstance(resp.message, ToolInvokeMessage.BlobChunkMessage)
# Get blob chunk information
chunk_id = resp.message.id
total_length = resp.message.total_length
blob_data = resp.message.blob
is_end = resp.message.end
# Initialize buffer for this file if it doesn't exist
if chunk_id not in files:
files[chunk_id] = FileChunk(total_length)
# If this is the final chunk, yield a complete blob message
if is_end:
yield ToolInvokeMessage(
type=ToolInvokeMessage.MessageType.BLOB,
message=ToolInvokeMessage.BlobMessage(blob=files[chunk_id].data),
meta=resp.meta,
)
else:
# Check if file is too large (30MB limit)
if files[chunk_id].bytes_written + len(blob_data) > 30 * 1024 * 1024:
# Delete the file if it's too large
del files[chunk_id]
# Skip yielding this message
raise ValueError("File is too large which reached the limit of 30MB")
# Check if single chunk is too large (8KB limit)
if len(blob_data) > 8192:
# Skip yielding this message
raise ValueError("File chunk is too large which reached the limit of 8KB")
# Append the blob data to the buffer
files[chunk_id].data[
files[chunk_id].bytes_written : files[chunk_id].bytes_written + len(blob_data)
] = blob_data
files[chunk_id].bytes_written += len(blob_data)
else:
yield resp
def validate_provider_credentials(
self, tenant_id: str, user_id: str, provider: str, credentials: dict[str, Any]

View File

@ -124,6 +124,15 @@ class ProviderManager:
# Get All preferred provider types of the workspace
provider_name_to_preferred_model_provider_records_dict = self._get_all_preferred_model_providers(tenant_id)
# Ensure that both the original provider name and its ModelProviderID string representation
# are present in the dictionary to handle cases where either form might be used
for provider_name in list(provider_name_to_preferred_model_provider_records_dict.keys()):
provider_id = ModelProviderID(provider_name)
if str(provider_id) not in provider_name_to_preferred_model_provider_records_dict:
# Add the ModelProviderID string representation if it's not already present
provider_name_to_preferred_model_provider_records_dict[str(provider_id)] = (
provider_name_to_preferred_model_provider_records_dict[provider_name]
)
# Get All provider model settings
provider_name_to_provider_model_settings_dict = self._get_all_provider_model_settings(tenant_id)
@ -497,8 +506,8 @@ class ProviderManager:
@staticmethod
def _init_trial_provider_records(
tenant_id: str, provider_name_to_provider_records_dict: dict[str, list]
) -> dict[str, list]:
tenant_id: str, provider_name_to_provider_records_dict: dict[str, list[Provider]]
) -> dict[str, list[Provider]]:
"""
Initialize trial provider records if not exists.
@ -532,7 +541,7 @@ class ProviderManager:
if ProviderQuotaType.TRIAL not in provider_quota_to_provider_record_dict:
try:
# FIXME ignore the type errork, onyl TrialHostingQuota has limit need to change the logic
provider_record = Provider(
new_provider_record = Provider(
tenant_id=tenant_id,
# TODO: Use provider name with prefix after the data migration.
provider_name=ModelProviderID(provider_name).provider_name,
@ -542,11 +551,12 @@ class ProviderManager:
quota_used=0,
is_valid=True,
)
db.session.add(provider_record)
db.session.add(new_provider_record)
db.session.commit()
provider_name_to_provider_records_dict[provider_name].append(new_provider_record)
except IntegrityError:
db.session.rollback()
provider_record = (
existed_provider_record = (
db.session.query(Provider)
.filter(
Provider.tenant_id == tenant_id,
@ -556,11 +566,14 @@ class ProviderManager:
)
.first()
)
if provider_record and not provider_record.is_valid:
provider_record.is_valid = True
if not existed_provider_record:
continue
if not existed_provider_record.is_valid:
existed_provider_record.is_valid = True
db.session.commit()
provider_name_to_provider_records_dict[provider_name].append(provider_record)
provider_name_to_provider_records_dict[provider_name].append(existed_provider_record)
return provider_name_to_provider_records_dict

View File

@ -139,13 +139,17 @@ class AnalyticdbVectorBySql:
)
if embedding_dimension is not None:
index_name = f"{self._collection_name}_embedding_idx"
cur.execute(f"ALTER TABLE {self.table_name} ALTER COLUMN vector SET STORAGE PLAIN")
cur.execute(
f"CREATE INDEX {index_name} ON {self.table_name} USING ann(vector) "
f"WITH(dim='{embedding_dimension}', distancemeasure='{self.config.metrics}', "
f"pq_enable=0, external_storage=0)"
)
cur.execute(f"CREATE INDEX ON {self.table_name} USING gin(to_tsvector)")
try:
cur.execute(f"ALTER TABLE {self.table_name} ALTER COLUMN vector SET STORAGE PLAIN")
cur.execute(
f"CREATE INDEX {index_name} ON {self.table_name} USING ann(vector) "
f"WITH(dim='{embedding_dimension}', distancemeasure='{self.config.metrics}', "
f"pq_enable=0, external_storage=0)"
)
cur.execute(f"CREATE INDEX ON {self.table_name} USING gin(to_tsvector)")
except Exception as e:
if "already exists" not in str(e):
raise e
redis_client.set(collection_exist_cache_key, 1, ex=3600)
def add_texts(self, documents: list[Document], embeddings: list[list[float]], **kwargs):
@ -177,9 +181,11 @@ class AnalyticdbVectorBySql:
return cur.fetchone() is not None
def delete_by_ids(self, ids: list[str]) -> None:
if not ids:
return
with self._get_cursor() as cur:
try:
cur.execute(f"DELETE FROM {self.table_name} WHERE ref_doc_id IN %s", (tuple(ids),))
cur.execute(f"DELETE FROM {self.table_name} WHERE ref_doc_id = ANY(%s)", (ids,))
except Exception as e:
if "does not exist" not in str(e):
raise e
@ -240,7 +246,7 @@ class AnalyticdbVectorBySql:
ts_rank(to_tsvector, to_tsquery_from_text(%s, 'zh_cn'), 32) AS score
FROM {self.table_name}
WHERE to_tsvector@@to_tsquery_from_text(%s, 'zh_cn') {where_clause}
ORDER BY score DESC
ORDER BY score DESC, id DESC
LIMIT {top_k}""",
(f"'{query}'", f"'{query}'"),
)

View File

@ -32,6 +32,7 @@ class MilvusConfig(BaseModel):
batch_size: int = 100 # Batch size for operations
database: str = "default" # Database name
enable_hybrid_search: bool = False # Flag to enable hybrid search
analyzer_params: Optional[str] = None # Analyzer params
@model_validator(mode="before")
@classmethod
@ -58,6 +59,7 @@ class MilvusConfig(BaseModel):
"user": self.user,
"password": self.password,
"db_name": self.database,
"analyzer_params": self.analyzer_params,
}
@ -300,14 +302,19 @@ class MilvusVector(BaseVector):
# Create the text field, enable_analyzer will be set True to support milvus automatically
# transfer text to sparse_vector, reference: https://milvus.io/docs/full-text-search.md
fields.append(
FieldSchema(
Field.CONTENT_KEY.value,
DataType.VARCHAR,
max_length=65_535,
enable_analyzer=self._hybrid_search_enabled,
)
)
content_field_kwargs: dict[str, Any] = {
"max_length": 65_535,
"enable_analyzer": self._hybrid_search_enabled,
}
if (
self._hybrid_search_enabled
and self._client_config.analyzer_params is not None
and self._client_config.analyzer_params.strip()
):
content_field_kwargs["analyzer_params"] = self._client_config.analyzer_params
fields.append(FieldSchema(Field.CONTENT_KEY.value, DataType.VARCHAR, **content_field_kwargs))
# Create the primary key field
fields.append(FieldSchema(Field.PRIMARY_KEY.value, DataType.INT64, is_primary=True, auto_id=True))
# Create the vector field, supports binary or float vectors
@ -383,5 +390,6 @@ class MilvusVectorFactory(AbstractVectorFactory):
password=dify_config.MILVUS_PASSWORD or "",
database=dify_config.MILVUS_DATABASE or "",
enable_hybrid_search=dify_config.MILVUS_ENABLE_HYBRID_SEARCH or False,
analyzer_params=dify_config.MILVUS_ANALYZER_PARAMS or "",
),
)

View File

@ -39,6 +39,12 @@ class EnhanceRecursiveCharacterTextSplitter(RecursiveCharacterTextSplitter):
else:
return [GPT2Tokenizer.get_num_tokens(text) for text in texts]
def _character_encoder(texts: list[str]) -> list[int]:
if not texts:
return []
return [len(text) for text in texts]
if issubclass(cls, TokenTextSplitter):
extra_kwargs = {
"model_name": embedding_model_instance.model if embedding_model_instance else "gpt2",
@ -47,7 +53,7 @@ class EnhanceRecursiveCharacterTextSplitter(RecursiveCharacterTextSplitter):
}
kwargs = {**kwargs, **extra_kwargs}
return cls(length_function=_token_encoder, **kwargs)
return cls(length_function=_character_encoder, **kwargs)
class FixedRecursiveCharacterTextSplitter(EnhanceRecursiveCharacterTextSplitter):
@ -103,7 +109,7 @@ class FixedRecursiveCharacterTextSplitter(EnhanceRecursiveCharacterTextSplitter)
_good_splits_lengths = [] # cache the lengths of the splits
_separator = "" if self._keep_separator else separator
s_lens = self._length_function(splits)
if _separator != "":
if separator != "":
for s, s_len in zip(splits, s_lens):
if s_len < self._chunk_size:
_good_splits.append(s)

View File

@ -0,0 +1,15 @@
"""
Repository interfaces for data access.
This package contains repository interfaces that define the contract
for accessing and manipulating data, regardless of the underlying
storage mechanism.
"""
from core.repository.repository_factory import RepositoryFactory
from core.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository
__all__ = [
"RepositoryFactory",
"WorkflowNodeExecutionRepository",
]

View File

@ -0,0 +1,97 @@
"""
Repository factory for creating repository instances.
This module provides a simple factory interface for creating repository instances.
It does not contain any implementation details or dependencies on specific repositories.
"""
from collections.abc import Callable, Mapping
from typing import Any, Literal, Optional, cast
from core.repository.workflow_node_execution_repository import WorkflowNodeExecutionRepository
# Type for factory functions - takes a dict of parameters and returns any repository type
RepositoryFactoryFunc = Callable[[Mapping[str, Any]], Any]
# Type for workflow node execution factory function
WorkflowNodeExecutionFactoryFunc = Callable[[Mapping[str, Any]], WorkflowNodeExecutionRepository]
# Repository type literals
_RepositoryType = Literal["workflow_node_execution"]
class RepositoryFactory:
"""
Factory class for creating repository instances.
This factory delegates the actual repository creation to implementation-specific
factory functions that are registered with the factory at runtime.
"""
# Dictionary to store factory functions
_factory_functions: dict[str, RepositoryFactoryFunc] = {}
@classmethod
def _register_factory(cls, repository_type: _RepositoryType, factory_func: RepositoryFactoryFunc) -> None:
"""
Register a factory function for a specific repository type.
This is a private method and should not be called directly.
Args:
repository_type: The type of repository (e.g., 'workflow_node_execution')
factory_func: A function that takes parameters and returns a repository instance
"""
cls._factory_functions[repository_type] = factory_func
@classmethod
def _create_repository(cls, repository_type: _RepositoryType, params: Optional[Mapping[str, Any]] = None) -> Any:
"""
Create a new repository instance with the provided parameters.
This is a private method and should not be called directly.
Args:
repository_type: The type of repository to create
params: A dictionary of parameters to pass to the factory function
Returns:
A new instance of the requested repository
Raises:
ValueError: If no factory function is registered for the repository type
"""
if repository_type not in cls._factory_functions:
raise ValueError(f"No factory function registered for repository type '{repository_type}'")
# Use empty dict if params is None
params = params or {}
return cls._factory_functions[repository_type](params)
@classmethod
def register_workflow_node_execution_factory(cls, factory_func: WorkflowNodeExecutionFactoryFunc) -> None:
"""
Register a factory function for the workflow node execution repository.
Args:
factory_func: A function that takes parameters and returns a WorkflowNodeExecutionRepository instance
"""
cls._register_factory("workflow_node_execution", factory_func)
@classmethod
def create_workflow_node_execution_repository(
cls, params: Optional[Mapping[str, Any]] = None
) -> WorkflowNodeExecutionRepository:
"""
Create a new WorkflowNodeExecutionRepository instance with the provided parameters.
Args:
params: A dictionary of parameters to pass to the factory function
Returns:
A new instance of the WorkflowNodeExecutionRepository
Raises:
ValueError: If no factory function is registered for the workflow_node_execution repository type
"""
# We can safely cast here because we've registered a WorkflowNodeExecutionFactoryFunc
return cast(WorkflowNodeExecutionRepository, cls._create_repository("workflow_node_execution", params))

View File

@ -0,0 +1,88 @@
from collections.abc import Sequence
from dataclasses import dataclass
from typing import Literal, Optional, Protocol
from models.workflow import WorkflowNodeExecution
@dataclass
class OrderConfig:
"""Configuration for ordering WorkflowNodeExecution instances."""
order_by: list[str]
order_direction: Optional[Literal["asc", "desc"]] = None
class WorkflowNodeExecutionRepository(Protocol):
"""
Repository interface for WorkflowNodeExecution.
This interface defines the contract for accessing and manipulating
WorkflowNodeExecution data, regardless of the underlying storage mechanism.
Note: Domain-specific concepts like multi-tenancy (tenant_id), application context (app_id),
and trigger sources (triggered_from) should be handled at the implementation level, not in
the core interface. This keeps the core domain model clean and independent of specific
application domains or deployment scenarios.
"""
def save(self, execution: WorkflowNodeExecution) -> None:
"""
Save a WorkflowNodeExecution instance.
Args:
execution: The WorkflowNodeExecution instance to save
"""
...
def get_by_node_execution_id(self, node_execution_id: str) -> Optional[WorkflowNodeExecution]:
"""
Retrieve a WorkflowNodeExecution by its node_execution_id.
Args:
node_execution_id: The node execution ID
Returns:
The WorkflowNodeExecution instance if found, None otherwise
"""
...
def get_by_workflow_run(
self,
workflow_run_id: str,
order_config: Optional[OrderConfig] = None,
) -> Sequence[WorkflowNodeExecution]:
"""
Retrieve all WorkflowNodeExecution instances for a specific workflow run.
Args:
workflow_run_id: The workflow run ID
order_config: Optional configuration for ordering results
order_config.order_by: List of fields to order by (e.g., ["index", "created_at"])
order_config.order_direction: Direction to order ("asc" or "desc")
Returns:
A list of WorkflowNodeExecution instances
"""
...
def get_running_executions(self, workflow_run_id: str) -> Sequence[WorkflowNodeExecution]:
"""
Retrieve all running WorkflowNodeExecution instances for a specific workflow run.
Args:
workflow_run_id: The workflow run ID
Returns:
A list of running WorkflowNodeExecution instances
"""
...
def update(self, execution: WorkflowNodeExecution) -> None:
"""
Update an existing WorkflowNodeExecution instance.
Args:
execution: The WorkflowNodeExecution instance to update
"""
...

View File

@ -120,6 +120,13 @@ class ToolInvokeMessage(BaseModel):
class BlobMessage(BaseModel):
blob: bytes
class BlobChunkMessage(BaseModel):
id: str = Field(..., description="The id of the blob")
sequence: int = Field(..., description="The sequence of the chunk")
total_length: int = Field(..., description="The total length of the blob")
blob: bytes = Field(..., description="The blob data of the chunk")
end: bool = Field(..., description="Whether the chunk is the last chunk")
class FileMessage(BaseModel):
pass
@ -180,12 +187,15 @@ class ToolInvokeMessage(BaseModel):
VARIABLE = "variable"
FILE = "file"
LOG = "log"
BLOB_CHUNK = "blob_chunk"
type: MessageType = MessageType.TEXT
"""
plain text, image url or link url
"""
message: JsonMessage | TextMessage | BlobMessage | LogMessage | FileMessage | None | VariableMessage
message: (
JsonMessage | TextMessage | BlobChunkMessage | BlobMessage | LogMessage | FileMessage | None | VariableMessage
)
meta: dict[str, Any] | None = None
@field_validator("message", mode="before")

View File

@ -155,9 +155,28 @@ class AnswerStreamProcessor(StreamProcessor):
for answer_node_id, route_position in self.route_position.items():
if answer_node_id not in self.rest_node_ids:
continue
# exclude current node id
# Remove current node id from answer dependencies to support stream output if it is a success branch
answer_dependencies = self.generate_routes.answer_dependencies
if event.node_id in answer_dependencies[answer_node_id]:
edge_mapping = self.graph.edge_mapping.get(event.node_id)
success_edge = (
next(
(
edge
for edge in edge_mapping
if edge.run_condition
and edge.run_condition.type == "branch_identify"
and edge.run_condition.branch_identify == "success-branch"
),
None,
)
if edge_mapping
else None
)
if (
event.node_id in answer_dependencies[answer_node_id]
and success_edge
and success_edge.target_node_id == answer_node_id
):
answer_dependencies[answer_node_id].remove(event.node_id)
answer_dependencies_ids = answer_dependencies.get(answer_node_id, [])
# all depends on answer node id not in rest node ids

View File

@ -90,6 +90,7 @@ class HttpRequestNodeData(BaseNodeData):
params: str
body: Optional[HttpRequestNodeBody] = None
timeout: Optional[HttpRequestNodeTimeout] = None
ssl_verify: Optional[bool] = dify_config.HTTP_REQUEST_NODE_SSL_VERIFY
class Response:

View File

@ -88,6 +88,7 @@ class Executor:
self.method = node_data.method
self.auth = node_data.authorization
self.timeout = timeout
self.ssl_verify = node_data.ssl_verify
self.params = []
self.headers = {}
self.content = None
@ -316,6 +317,7 @@ class Executor:
"headers": headers,
"params": self.params,
"timeout": (self.timeout.connect, self.timeout.read, self.timeout.write),
"ssl_verify": self.ssl_verify,
"follow_redirects": True,
"max_retries": self.max_retries,
}

View File

@ -51,6 +51,7 @@ class HttpRequestNode(BaseNode[HttpRequestNodeData]):
"max_read_timeout": dify_config.HTTP_REQUEST_MAX_READ_TIMEOUT,
"max_write_timeout": dify_config.HTTP_REQUEST_MAX_WRITE_TIMEOUT,
},
"ssl_verify": dify_config.HTTP_REQUEST_NODE_SSL_VERIFY,
},
"retry_config": {
"max_retries": dify_config.SSRF_DEFAULT_MAX_RETRIES,

View File

@ -149,7 +149,10 @@ class ListOperatorNode(BaseNode[ListOperatorNodeData]):
def _extract_slice(
self, variable: Union[ArrayFileSegment, ArrayNumberSegment, ArrayStringSegment]
) -> Union[ArrayFileSegment, ArrayNumberSegment, ArrayStringSegment]:
value = int(self.graph_runtime_state.variable_pool.convert_template(self.node_data.extract_by.serial).text) - 1
value = int(self.graph_runtime_state.variable_pool.convert_template(self.node_data.extract_by.serial).text)
if value < 1:
raise ValueError(f"Invalid serial index: must be >= 1, got {value}")
value -= 1
if len(variable.value) > int(value):
result = variable.value[value]
else:

View File

@ -26,9 +26,12 @@ def init_app(app: DifyApp):
# Always add StreamHandler to log to console
sh = logging.StreamHandler(sys.stdout)
sh.addFilter(RequestIdFilter())
log_handlers.append(sh)
# Apply RequestIdFilter to all handlers
for handler in log_handlers:
handler.addFilter(RequestIdFilter())
logging.basicConfig(
level=dify_config.LOG_LEVEL,
format=dify_config.LOG_FORMAT,

View File

@ -0,0 +1,18 @@
"""
Extension for initializing repositories.
This extension registers repository implementations with the RepositoryFactory.
"""
from dify_app import DifyApp
from repositories.repository_registry import register_repositories
def init_app(_app: DifyApp) -> None:
"""
Initialize repository implementations.
Args:
_app: The Flask application instance (unused)
"""
register_repositories()

View File

@ -73,11 +73,7 @@ class Storage:
raise ValueError(f"unsupported storage type {storage_type}")
def save(self, filename, data):
try:
self.storage_runner.save(filename, data)
except Exception as e:
logger.exception(f"Failed to save file {filename}")
raise e
self.storage_runner.save(filename, data)
@overload
def load(self, filename: str, /, *, stream: Literal[False] = False) -> bytes: ...
@ -86,49 +82,25 @@ class Storage:
def load(self, filename: str, /, *, stream: Literal[True]) -> Generator: ...
def load(self, filename: str, /, *, stream: bool = False) -> Union[bytes, Generator]:
try:
if stream:
return self.load_stream(filename)
else:
return self.load_once(filename)
except Exception as e:
logger.exception(f"Failed to load file {filename}")
raise e
if stream:
return self.load_stream(filename)
else:
return self.load_once(filename)
def load_once(self, filename: str) -> bytes:
try:
return self.storage_runner.load_once(filename)
except Exception as e:
logger.exception(f"Failed to load_once file {filename}")
raise e
return self.storage_runner.load_once(filename)
def load_stream(self, filename: str) -> Generator:
try:
return self.storage_runner.load_stream(filename)
except Exception as e:
logger.exception(f"Failed to load_stream file {filename}")
raise e
return self.storage_runner.load_stream(filename)
def download(self, filename, target_filepath):
try:
self.storage_runner.download(filename, target_filepath)
except Exception as e:
logger.exception(f"Failed to download file {filename}")
raise e
self.storage_runner.download(filename, target_filepath)
def exists(self, filename):
try:
return self.storage_runner.exists(filename)
except Exception as e:
logger.exception(f"Failed to check file exists {filename}")
raise e
return self.storage_runner.exists(filename)
def delete(self, filename):
try:
return self.storage_runner.delete(filename)
except Exception as e:
logger.exception(f"Failed to delete file {filename}")
raise e
return self.storage_runner.delete(filename)
storage = Storage()

View File

@ -52,6 +52,7 @@ def build_from_mapping(
mapping: Mapping[str, Any],
tenant_id: str,
config: FileUploadConfig | None = None,
strict_type_validation: bool = False,
) -> File:
transfer_method = FileTransferMethod.value_of(mapping.get("transfer_method"))
@ -69,6 +70,7 @@ def build_from_mapping(
mapping=mapping,
tenant_id=tenant_id,
transfer_method=transfer_method,
strict_type_validation=strict_type_validation,
)
if config and not _is_file_valid_with_config(
@ -87,12 +89,14 @@ def build_from_mappings(
mappings: Sequence[Mapping[str, Any]],
config: FileUploadConfig | None = None,
tenant_id: str,
strict_type_validation: bool = False,
) -> Sequence[File]:
files = [
build_from_mapping(
mapping=mapping,
tenant_id=tenant_id,
config=config,
strict_type_validation=strict_type_validation,
)
for mapping in mappings
]
@ -116,6 +120,7 @@ def _build_from_local_file(
mapping: Mapping[str, Any],
tenant_id: str,
transfer_method: FileTransferMethod,
strict_type_validation: bool = False,
) -> File:
upload_file_id = mapping.get("upload_file_id")
if not upload_file_id:
@ -134,10 +139,16 @@ def _build_from_local_file(
if row is None:
raise ValueError("Invalid upload file")
file_type = _standardize_file_type(extension="." + row.extension, mime_type=row.mime_type)
if file_type.value != mapping.get("type", "custom"):
detected_file_type = _standardize_file_type(extension="." + row.extension, mime_type=row.mime_type)
specified_type = mapping.get("type", "custom")
if strict_type_validation and detected_file_type.value != specified_type:
raise ValueError("Detected file type does not match the specified type. Please verify the file.")
file_type = (
FileType(specified_type) if specified_type and specified_type != FileType.CUSTOM.value else detected_file_type
)
return File(
id=mapping.get("id"),
filename=row.name,
@ -158,6 +169,7 @@ def _build_from_remote_url(
mapping: Mapping[str, Any],
tenant_id: str,
transfer_method: FileTransferMethod,
strict_type_validation: bool = False,
) -> File:
upload_file_id = mapping.get("upload_file_id")
if upload_file_id:
@ -174,10 +186,21 @@ def _build_from_remote_url(
if upload_file is None:
raise ValueError("Invalid upload file")
file_type = _standardize_file_type(extension="." + upload_file.extension, mime_type=upload_file.mime_type)
if file_type.value != mapping.get("type", "custom"):
detected_file_type = _standardize_file_type(
extension="." + upload_file.extension, mime_type=upload_file.mime_type
)
specified_type = mapping.get("type")
if strict_type_validation and specified_type and detected_file_type.value != specified_type:
raise ValueError("Detected file type does not match the specified type. Please verify the file.")
file_type = (
FileType(specified_type)
if specified_type and specified_type != FileType.CUSTOM.value
else detected_file_type
)
return File(
id=mapping.get("id"),
filename=upload_file.name,
@ -237,6 +260,7 @@ def _build_from_tool_file(
mapping: Mapping[str, Any],
tenant_id: str,
transfer_method: FileTransferMethod,
strict_type_validation: bool = False,
) -> File:
tool_file = (
db.session.query(ToolFile)
@ -252,7 +276,16 @@ def _build_from_tool_file(
extension = "." + tool_file.file_key.split(".")[-1] if "." in tool_file.file_key else ".bin"
file_type = _standardize_file_type(extension=extension, mime_type=tool_file.mimetype)
detected_file_type = _standardize_file_type(extension="." + extension, mime_type=tool_file.mimetype)
specified_type = mapping.get("type")
if strict_type_validation and specified_type and detected_file_type.value != specified_type:
raise ValueError("Detected file type does not match the specified type. Please verify the file.")
file_type = (
FileType(specified_type) if specified_type and specified_type != FileType.CUSTOM.value else detected_file_type
)
return File(
id=mapping.get("id"),

View File

@ -42,6 +42,7 @@ message_file_fields = {
"size": fields.Integer,
"transfer_method": fields.String,
"belongs_to": fields.String(default="user"),
"upload_file_id": fields.String(default=None),
}
agent_thought_fields = {

View File

@ -1091,12 +1091,7 @@ class Message(db.Model): # type: ignore[name-defined]
@property
def retriever_resources(self):
return (
db.session.query(DatasetRetrieverResource)
.filter(DatasetRetrieverResource.message_id == self.id)
.order_by(DatasetRetrieverResource.position.asc())
.all()
)
return self.message_metadata_dict.get("retriever_resources") if self.message_metadata else []
@property
def message_files(self):
@ -1155,7 +1150,7 @@ class Message(db.Model): # type: ignore[name-defined]
files.append(file)
result = [
{"belongs_to": message_file.belongs_to, **file.to_dict()}
{"belongs_to": message_file.belongs_to, "upload_file_id": message_file.upload_file_id, **file.to_dict()}
for (file, message_file) in zip(files, message_files)
]

View File

@ -510,7 +510,7 @@ class WorkflowRun(Base):
)
class WorkflowNodeExecutionTriggeredFrom(Enum):
class WorkflowNodeExecutionTriggeredFrom(StrEnum):
"""
Workflow Node Execution Triggered From Enum
"""
@ -518,21 +518,8 @@ class WorkflowNodeExecutionTriggeredFrom(Enum):
SINGLE_STEP = "single-step"
WORKFLOW_RUN = "workflow-run"
@classmethod
def value_of(cls, value: str) -> "WorkflowNodeExecutionTriggeredFrom":
"""
Get value of given mode.
:param value: mode value
:return: mode
"""
for mode in cls:
if mode.value == value:
return mode
raise ValueError(f"invalid workflow node execution triggered from value {value}")
class WorkflowNodeExecutionStatus(Enum):
class WorkflowNodeExecutionStatus(StrEnum):
"""
Workflow Node Execution Status Enum
"""
@ -543,19 +530,6 @@ class WorkflowNodeExecutionStatus(Enum):
EXCEPTION = "exception"
RETRY = "retry"
@classmethod
def value_of(cls, value: str) -> "WorkflowNodeExecutionStatus":
"""
Get value of given mode.
:param value: mode value
:return: mode
"""
for mode in cls:
if mode.value == value:
return mode
raise ValueError(f"invalid workflow node execution status value {value}")
class WorkflowNodeExecution(Base):
"""

10583
api/poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +0,0 @@
[virtualenvs]
in-project = true
create = true
prefer-active-python = true

View File

@ -1,215 +1,197 @@
[project]
name = "dify-api"
version = "1.2.0"
requires-python = ">=3.11,<3.13"
dynamic = ["dependencies"]
[build-system]
requires = ["poetry-core>=2.0.0"]
build-backend = "poetry.core.masonry.api"
dependencies = [
"authlib==1.3.1",
"azure-identity==1.16.1",
"beautifulsoup4==4.12.2",
"boto3==1.35.99",
"bs4~=0.0.1",
"cachetools~=5.3.0",
"celery~=5.4.0",
"chardet~=5.1.0",
"flask~=3.1.0",
"flask-compress~=1.17",
"flask-cors~=4.0.0",
"flask-login~=0.6.3",
"flask-migrate~=4.0.7",
"flask-restful~=0.3.10",
"flask-sqlalchemy~=3.1.1",
"gevent~=24.11.1",
"gmpy2~=2.2.1",
"google-api-core==2.18.0",
"google-api-python-client==2.90.0",
"google-auth==2.29.0",
"google-auth-httplib2==0.2.0",
"google-cloud-aiplatform==1.49.0",
"googleapis-common-protos==1.63.0",
"gunicorn~=23.0.0",
"httpx[socks]~=0.27.0",
"jieba==0.42.1",
"langfuse~=2.51.3",
"langsmith~=0.1.77",
"mailchimp-transactional~=1.0.50",
"markdown~=3.5.1",
"numpy~=1.26.4",
"oci~=2.135.1",
"openai~=1.61.0",
"openpyxl~=3.1.5",
"opik~=1.3.4",
"opentelemetry-api==1.27.0",
"opentelemetry-distro==0.48b0",
"opentelemetry-exporter-otlp==1.27.0",
"opentelemetry-exporter-otlp-proto-common==1.27.0",
"opentelemetry-exporter-otlp-proto-grpc==1.27.0",
"opentelemetry-exporter-otlp-proto-http==1.27.0",
"opentelemetry-instrumentation==0.48b0",
"opentelemetry-instrumentation-celery==0.48b0",
"opentelemetry-instrumentation-flask==0.48b0",
"opentelemetry-instrumentation-sqlalchemy==0.48b0",
"opentelemetry-propagator-b3==1.27.0",
# opentelemetry-proto1.28.0 depends on protobuf (>=5.0,<6.0),
# which is conflict with googleapis-common-protos (1.63.0)
"opentelemetry-proto==1.27.0",
"opentelemetry-sdk==1.27.0",
"opentelemetry-semantic-conventions==0.48b0",
"opentelemetry-util-http==0.48b0",
"pandas-stubs~=2.2.3.241009",
"pandas[excel,output-formatting,performance]~=2.2.2",
"pandoc~=2.4",
"psycogreen~=1.0.2",
"psycopg2-binary~=2.9.6",
"pycryptodome==3.19.1",
"pydantic~=2.9.2",
"pydantic-extra-types~=2.9.0",
"pydantic-settings~=2.6.0",
"pyjwt~=2.8.0",
"pypdfium2~=4.30.0",
"python-docx~=1.1.0",
"python-dotenv==1.0.1",
"pyyaml~=6.0.1",
"readabilipy==0.2.0",
"redis[hiredis]~=5.0.3",
"resend~=0.7.0",
"sentry-sdk[flask]~=1.44.1",
"sqlalchemy~=2.0.29",
"starlette==0.41.0",
"tiktoken~=0.8.0",
"tokenizers~=0.15.0",
"transformers~=4.35.0",
"unstructured[docx,epub,md,ppt,pptx]~=0.16.1",
"validators==0.21.0",
"yarl~=1.18.3",
]
# Before adding new dependency, consider place it in
# alphabet order (a-z) and suitable group.
[tool.poetry]
package-mode = false
[tool.uv]
default-groups = ["storage", "tools", "vdb"]
############################################################
# [ Main ] Dependency group
############################################################
[tool.poetry.dependencies]
authlib = "1.3.1"
azure-identity = "1.16.1"
beautifulsoup4 = "4.12.2"
boto3 = "1.35.99"
bs4 = "~0.0.1"
cachetools = "~5.3.0"
celery = "~5.4.0"
chardet = "~5.1.0"
flask = "~3.1.0"
flask-compress = "~1.17"
flask-cors = "~4.0.0"
flask-login = "~0.6.3"
flask-migrate = "~4.0.7"
flask-restful = "~0.3.10"
flask-sqlalchemy = "~3.1.1"
gevent = "~24.11.1"
gmpy2 = "~2.2.1"
google-api-core = "2.18.0"
google-api-python-client = "2.90.0"
google-auth = "2.29.0"
google-auth-httplib2 = "0.2.0"
google-cloud-aiplatform = "1.49.0"
googleapis-common-protos = "1.63.0"
gunicorn = "~23.0.0"
httpx = { version = "~0.27.0", extras = ["socks"] }
jieba = "0.42.1"
langfuse = "~2.51.3"
langsmith = "~0.1.77"
mailchimp-transactional = "~1.0.50"
markdown = "~3.5.1"
numpy = "~1.26.4"
oci = "~2.135.1"
openai = "~1.61.0"
openpyxl = "~3.1.5"
opentelemetry-api = "1.27.0"
opentelemetry-distro = "0.48b0"
opentelemetry-exporter-otlp = "1.27.0"
opentelemetry-exporter-otlp-proto-common = "1.27.0"
opentelemetry-exporter-otlp-proto-grpc = "1.27.0"
opentelemetry-exporter-otlp-proto-http = "1.27.0"
opentelemetry-instrumentation = "0.48b0"
opentelemetry-instrumentation-celery = "0.48b0"
opentelemetry-instrumentation-flask = "0.48b0"
opentelemetry-instrumentation-sqlalchemy = "0.48b0"
opentelemetry-propagator-b3 = "1.27.0"
opentelemetry-proto = "1.27.0" # 1.28.0 depends on protobuf (>=5.0,<6.0), conflict with googleapis-common-protos (1.63.0)
opentelemetry-sdk = "1.27.0"
opentelemetry-semantic-conventions = "0.48b0"
opentelemetry-util-http = "0.48b0"
opik = "~1.3.4"
pandas = { version = "~2.2.2", extras = ["performance", "excel", "output-formatting"] }
pandas-stubs = "~2.2.3.241009"
pandoc = "~2.4"
psycogreen = "~1.0.2"
psycopg2-binary = "~2.9.6"
pycryptodome = "3.19.1"
pydantic = "~2.9.2"
pydantic-settings = "~2.6.0"
pydantic_extra_types = "~2.9.0"
pyjwt = "~2.8.0"
pypdfium2 = "~4.30.0"
python = ">=3.11,<3.13"
python-docx = "~1.1.0"
python-dotenv = "1.0.1"
pyyaml = "~6.0.1"
readabilipy = "0.2.0"
redis = { version = "~5.0.3", extras = ["hiredis"] }
resend = "~0.7.0"
sentry-sdk = { version = "~1.44.1", extras = ["flask"] }
sqlalchemy = "~2.0.29"
starlette = "0.41.0"
tiktoken = "~0.8.0"
tokenizers = "~0.15.0"
transformers = "~4.35.0"
unstructured = { version = "~0.16.1", extras = ["docx", "epub", "md", "ppt", "pptx"] }
validators = "0.21.0"
yarl = "~1.18.3"
# Before adding new dependency, consider place it in alphabet order (a-z) and suitable group.
############################################################
# [ Indirect ] dependency group
# Related transparent dependencies with pinned version
# required by main implementations
############################################################
[tool.poetry.group.indirect.dependencies]
kaleido = "0.2.1"
rank-bm25 = "~0.2.2"
safetensors = "~0.4.3"
############################################################
# [ Tools ] dependency group
############################################################
[tool.poetry.group.tools.dependencies]
cloudscraper = "1.2.71"
nltk = "3.9.1"
############################################################
# [ Storage ] dependency group
# Required for storage clients
############################################################
[tool.poetry.group.storage.dependencies]
azure-storage-blob = "12.13.0"
bce-python-sdk = "~0.9.23"
cos-python-sdk-v5 = "1.9.30"
esdk-obs-python = "3.24.6.1"
google-cloud-storage = "2.16.0"
opendal = "~0.45.16"
oss2 = "2.18.5"
supabase = "~2.8.1"
tos = "~2.7.1"
############################################################
# [ VDB ] dependency group
# Required by vector store clients
############################################################
[tool.poetry.group.vdb.dependencies]
alibabacloud_gpdb20160503 = "~3.8.0"
alibabacloud_tea_openapi = "~0.3.9"
chromadb = "0.5.20"
clickhouse-connect = "~0.7.16"
couchbase = "~4.3.0"
elasticsearch = "8.14.0"
opensearch-py = "2.4.0"
oracledb = "~2.2.1"
pgvecto-rs = { version = "~0.2.1", extras = ['sqlalchemy'] }
pgvector = "0.2.5"
pymilvus = "~2.5.0"
pymochow = "1.3.1"
pyobvector = "~0.1.6"
qdrant-client = "1.7.3"
tablestore = "6.1.0"
tcvectordb = "~1.6.4"
tidb-vector = "0.0.9"
upstash-vector = "0.6.0"
volcengine-compat = "~1.0.156"
weaviate-client = "~3.21.0"
xinference-client = "~1.2.2"
[dependency-groups]
############################################################
# [ Dev ] dependency group
# Required for development and running tests
############################################################
[tool.poetry.group.dev]
optional = true
[tool.poetry.group.dev.dependencies]
coverage = "~7.2.4"
faker = "~32.1.0"
lxml-stubs = "~0.5.1"
mypy = "~1.15.0"
pytest = "~8.3.2"
pytest-benchmark = "~4.0.0"
pytest-env = "~1.1.3"
pytest-mock = "~3.14.0"
types-aiofiles = "~24.1.0"
types-beautifulsoup4 = "~4.12.0"
types-cachetools = "~5.5.0"
types-colorama = "~0.4.15"
types-defusedxml = "~0.7.0"
types-deprecated = "~1.2.15"
types-docutils = "~0.21.0"
types-flask-cors = "~5.0.0"
types-flask-migrate = "~4.1.0"
types-gevent = "~24.11.0"
types-greenlet = "~3.1.0"
types-html5lib = "~1.1.11"
types-markdown = "~3.7.0"
types-oauthlib = "~3.2.0"
types-objgraph = "~3.6.0"
types-olefile = "~0.47.0"
types-openpyxl = "~3.1.5"
types-pexpect = "~4.9.0"
types-protobuf = "~5.29.1"
types-psutil = "~7.0.0"
types-psycopg2 = "~2.9.21"
types-pygments = "~2.19.0"
types-pymysql = "~1.1.0"
types-python-dateutil = "~2.9.0"
types-pywin32 = "~310.0.0"
types-pyyaml = "~6.0.12"
types-regex = "~2024.11.6"
types-requests = "~2.32.0"
types-requests-oauthlib = "~2.0.0"
types-shapely = "~2.0.0"
types-simplejson = "~3.20.0"
types-six = "~1.17.0"
types-tensorflow = "~2.18.0"
types-tqdm = "~4.67.0"
types-ujson = "~5.10.0"
dev = [
"coverage~=7.2.4",
"dotenv-linter~=0.5.0",
"faker~=32.1.0",
"lxml-stubs~=0.5.1",
"mypy~=1.15.0",
"ruff~=0.11.5",
"pytest~=8.3.2",
"pytest-benchmark~=4.0.0",
"pytest-cov~=4.1.0",
"pytest-env~=1.1.3",
"pytest-mock~=3.14.0",
"types-aiofiles~=24.1.0",
"types-beautifulsoup4~=4.12.0",
"types-cachetools~=5.5.0",
"types-colorama~=0.4.15",
"types-defusedxml~=0.7.0",
"types-deprecated~=1.2.15",
"types-docutils~=0.21.0",
"types-flask-cors~=5.0.0",
"types-flask-migrate~=4.1.0",
"types-gevent~=24.11.0",
"types-greenlet~=3.1.0",
"types-html5lib~=1.1.11",
"types-markdown~=3.7.0",
"types-oauthlib~=3.2.0",
"types-objgraph~=3.6.0",
"types-olefile~=0.47.0",
"types-openpyxl~=3.1.5",
"types-pexpect~=4.9.0",
"types-protobuf~=5.29.1",
"types-psutil~=7.0.0",
"types-psycopg2~=2.9.21",
"types-pygments~=2.19.0",
"types-pymysql~=1.1.0",
"types-python-dateutil~=2.9.0",
"types-pywin32~=310.0.0",
"types-pyyaml~=6.0.12",
"types-regex~=2024.11.6",
"types-requests~=2.32.0",
"types-requests-oauthlib~=2.0.0",
"types-shapely~=2.0.0",
"types-simplejson~=3.20.0",
"types-six~=1.17.0",
"types-tensorflow~=2.18.0",
"types-tqdm~=4.67.0",
"types-ujson~=5.10.0",
]
############################################################
# [ Lint ] dependency group
# Required for code style linting
# [ Storage ] dependency group
# Required for storage clients
############################################################
[tool.poetry.group.lint]
optional = true
[tool.poetry.group.lint.dependencies]
dotenv-linter = "~0.5.0"
ruff = "~0.11.0"
storage = [
"azure-storage-blob==12.13.0",
"bce-python-sdk~=0.9.23",
"cos-python-sdk-v5==1.9.30",
"esdk-obs-python==3.24.6.1",
"google-cloud-storage==2.16.0",
"opendal~=0.45.16",
"oss2==2.18.5",
"supabase~=2.8.1",
"tos~=2.7.1",
]
############################################################
# [ Tools ] dependency group
############################################################
tools = [
"cloudscraper~=1.2.71",
"nltk~=3.9.1",
]
############################################################
# [ VDB ] dependency group
# Required by vector store clients
############################################################
vdb = [
"alibabacloud_gpdb20160503~=3.8.0",
"alibabacloud_tea_openapi~=0.3.9",
"chromadb==0.5.20",
"clickhouse-connect~=0.7.16",
"couchbase~=4.3.0",
"elasticsearch==8.14.0",
"opensearch-py==2.4.0",
"oracledb~=2.2.1",
"pgvecto-rs[sqlalchemy]~=0.2.1",
"pgvector==0.2.5",
"pymilvus~=2.5.0",
"pymochow==1.3.1",
"pyobvector~=0.1.6",
"qdrant-client==1.7.3",
"tablestore==6.1.0",
"tcvectordb~=1.6.4",
"tidb-vector==0.0.9",
"upstash-vector==0.6.0",
"volcengine-compat~=1.0.156",
"weaviate-client~=3.21.0",
"xinference-client~=1.2.2",
]

View File

@ -1,5 +1,6 @@
[pytest]
continue-on-collection-errors = true
addopts = --cov=./api --cov-report=json --cov-report=xml
env =
ANTHROPIC_API_KEY = sk-ant-api11-IamNotARealKeyJustForMockTestKawaiiiiiiiiii-NotBaka-ASkksz
AZURE_OPENAI_API_BASE = https://difyai-openai.openai.azure.com

View File

@ -0,0 +1,6 @@
"""
Repository implementations for data access.
This package contains concrete implementations of the repository interfaces
defined in the core.repository package.
"""

View File

@ -0,0 +1,87 @@
"""
Registry for repository implementations.
This module is responsible for registering factory functions with the repository factory.
"""
import logging
from collections.abc import Mapping
from typing import Any
from sqlalchemy.orm import sessionmaker
from configs import dify_config
from core.repository.repository_factory import RepositoryFactory
from extensions.ext_database import db
from repositories.workflow_node_execution import SQLAlchemyWorkflowNodeExecutionRepository
logger = logging.getLogger(__name__)
# Storage type constants
STORAGE_TYPE_RDBMS = "rdbms"
STORAGE_TYPE_HYBRID = "hybrid"
def register_repositories() -> None:
"""
Register repository factory functions with the RepositoryFactory.
This function reads configuration settings to determine which repository
implementations to register.
"""
# Configure WorkflowNodeExecutionRepository factory based on configuration
workflow_node_execution_storage = dify_config.WORKFLOW_NODE_EXECUTION_STORAGE
# Check storage type and register appropriate implementation
if workflow_node_execution_storage == STORAGE_TYPE_RDBMS:
# Register SQLAlchemy implementation for RDBMS storage
logger.info("Registering WorkflowNodeExecution repository with RDBMS storage")
RepositoryFactory.register_workflow_node_execution_factory(create_workflow_node_execution_repository)
elif workflow_node_execution_storage == STORAGE_TYPE_HYBRID:
# Hybrid storage is not yet implemented
raise NotImplementedError("Hybrid storage for WorkflowNodeExecution repository is not yet implemented")
else:
# Unknown storage type
raise ValueError(
f"Unknown storage type '{workflow_node_execution_storage}' for WorkflowNodeExecution repository. "
f"Supported types: {STORAGE_TYPE_RDBMS}"
)
def create_workflow_node_execution_repository(params: Mapping[str, Any]) -> SQLAlchemyWorkflowNodeExecutionRepository:
"""
Create a WorkflowNodeExecutionRepository instance using SQLAlchemy implementation.
This factory function creates a repository for the RDBMS storage type.
Args:
params: Parameters for creating the repository, including:
- tenant_id: Required. The tenant ID for multi-tenancy.
- app_id: Optional. The application ID for filtering.
- session_factory: Optional. A SQLAlchemy sessionmaker instance. If not provided,
a new sessionmaker will be created using the global database engine.
Returns:
A WorkflowNodeExecutionRepository instance
Raises:
ValueError: If required parameters are missing
"""
# Extract required parameters
tenant_id = params.get("tenant_id")
if tenant_id is None:
raise ValueError("tenant_id is required for WorkflowNodeExecution repository with RDBMS storage")
# Extract optional parameters
app_id = params.get("app_id")
# Use the session_factory from params if provided, otherwise create one using the global db engine
session_factory = params.get("session_factory")
if session_factory is None:
# Create a sessionmaker using the same engine as the global db session
session_factory = sessionmaker(bind=db.engine)
# Create and return the repository
return SQLAlchemyWorkflowNodeExecutionRepository(
session_factory=session_factory, tenant_id=tenant_id, app_id=app_id
)

View File

@ -0,0 +1,9 @@
"""
WorkflowNodeExecution repository implementations.
"""
from repositories.workflow_node_execution.sqlalchemy_repository import SQLAlchemyWorkflowNodeExecutionRepository
__all__ = [
"SQLAlchemyWorkflowNodeExecutionRepository",
]

View File

@ -0,0 +1,170 @@
"""
SQLAlchemy implementation of the WorkflowNodeExecutionRepository.
"""
import logging
from collections.abc import Sequence
from typing import Optional
from sqlalchemy import UnaryExpression, asc, desc, select
from sqlalchemy.engine import Engine
from sqlalchemy.orm import sessionmaker
from core.repository.workflow_node_execution_repository import OrderConfig
from models.workflow import WorkflowNodeExecution, WorkflowNodeExecutionStatus, WorkflowNodeExecutionTriggeredFrom
logger = logging.getLogger(__name__)
class SQLAlchemyWorkflowNodeExecutionRepository:
"""
SQLAlchemy implementation of the WorkflowNodeExecutionRepository interface.
This implementation supports multi-tenancy by filtering operations based on tenant_id.
Each method creates its own session, handles the transaction, and commits changes
to the database. This prevents long-running connections in the workflow core.
"""
def __init__(self, session_factory: sessionmaker | Engine, tenant_id: str, app_id: Optional[str] = None):
"""
Initialize the repository with a SQLAlchemy sessionmaker or engine and tenant context.
Args:
session_factory: SQLAlchemy sessionmaker or engine for creating sessions
tenant_id: Tenant ID for multi-tenancy
app_id: Optional app ID for filtering by application
"""
# If an engine is provided, create a sessionmaker from it
if isinstance(session_factory, Engine):
self._session_factory = sessionmaker(bind=session_factory, expire_on_commit=False)
else:
self._session_factory = session_factory
self._tenant_id = tenant_id
self._app_id = app_id
def save(self, execution: WorkflowNodeExecution) -> None:
"""
Save a WorkflowNodeExecution instance and commit changes to the database.
Args:
execution: The WorkflowNodeExecution instance to save
"""
with self._session_factory() as session:
# Ensure tenant_id is set
if not execution.tenant_id:
execution.tenant_id = self._tenant_id
# Set app_id if provided and not already set
if self._app_id and not execution.app_id:
execution.app_id = self._app_id
session.add(execution)
session.commit()
def get_by_node_execution_id(self, node_execution_id: str) -> Optional[WorkflowNodeExecution]:
"""
Retrieve a WorkflowNodeExecution by its node_execution_id.
Args:
node_execution_id: The node execution ID
Returns:
The WorkflowNodeExecution instance if found, None otherwise
"""
with self._session_factory() as session:
stmt = select(WorkflowNodeExecution).where(
WorkflowNodeExecution.node_execution_id == node_execution_id,
WorkflowNodeExecution.tenant_id == self._tenant_id,
)
if self._app_id:
stmt = stmt.where(WorkflowNodeExecution.app_id == self._app_id)
return session.scalar(stmt)
def get_by_workflow_run(
self,
workflow_run_id: str,
order_config: Optional[OrderConfig] = None,
) -> Sequence[WorkflowNodeExecution]:
"""
Retrieve all WorkflowNodeExecution instances for a specific workflow run.
Args:
workflow_run_id: The workflow run ID
order_config: Optional configuration for ordering results
order_config.order_by: List of fields to order by (e.g., ["index", "created_at"])
order_config.order_direction: Direction to order ("asc" or "desc")
Returns:
A list of WorkflowNodeExecution instances
"""
with self._session_factory() as session:
stmt = select(WorkflowNodeExecution).where(
WorkflowNodeExecution.workflow_run_id == workflow_run_id,
WorkflowNodeExecution.tenant_id == self._tenant_id,
WorkflowNodeExecution.triggered_from == WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN,
)
if self._app_id:
stmt = stmt.where(WorkflowNodeExecution.app_id == self._app_id)
# Apply ordering if provided
if order_config and order_config.order_by:
order_columns: list[UnaryExpression] = []
for field in order_config.order_by:
column = getattr(WorkflowNodeExecution, field, None)
if not column:
continue
if order_config.order_direction == "desc":
order_columns.append(desc(column))
else:
order_columns.append(asc(column))
if order_columns:
stmt = stmt.order_by(*order_columns)
return session.scalars(stmt).all()
def get_running_executions(self, workflow_run_id: str) -> Sequence[WorkflowNodeExecution]:
"""
Retrieve all running WorkflowNodeExecution instances for a specific workflow run.
Args:
workflow_run_id: The workflow run ID
Returns:
A list of running WorkflowNodeExecution instances
"""
with self._session_factory() as session:
stmt = select(WorkflowNodeExecution).where(
WorkflowNodeExecution.workflow_run_id == workflow_run_id,
WorkflowNodeExecution.tenant_id == self._tenant_id,
WorkflowNodeExecution.status == WorkflowNodeExecutionStatus.RUNNING,
WorkflowNodeExecution.triggered_from == WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN,
)
if self._app_id:
stmt = stmt.where(WorkflowNodeExecution.app_id == self._app_id)
return session.scalars(stmt).all()
def update(self, execution: WorkflowNodeExecution) -> None:
"""
Update an existing WorkflowNodeExecution instance and commit changes to the database.
Args:
execution: The WorkflowNodeExecution instance to update
"""
with self._session_factory() as session:
# Ensure tenant_id is set
if not execution.tenant_id:
execution.tenant_id = self._tenant_id
# Set app_id if provided and not already set
if self._app_id and not execution.app_id:
execution.app_id = self._app_id
session.merge(execution)
session.commit()

View File

@ -553,7 +553,7 @@ class DocumentService:
{"id": "remove_extra_spaces", "enabled": True},
{"id": "remove_urls_emails", "enabled": False},
],
"segmentation": {"delimiter": "\n", "max_tokens": 500, "chunk_overlap": 50},
"segmentation": {"delimiter": "\n", "max_tokens": 1024, "chunk_overlap": 50},
},
"limits": {
"indexing_max_segmentation_tokens_length": dify_config.INDEXING_MAX_SEGMENTATION_TOKENS_LENGTH,
@ -2025,7 +2025,7 @@ class SegmentService:
dataset_id=dataset.id,
document_id=document.id,
segment_id=segment.id,
position=max_position + 1,
position=max_position + 1 if max_position else 1,
index_node_id=index_node_id,
index_node_hash=index_node_hash,
content=content,
@ -2175,7 +2175,13 @@ class SegmentService:
@classmethod
def get_segments(
cls, document_id: str, tenant_id: str, status_list: list[str] | None = None, keyword: str | None = None
cls,
document_id: str,
tenant_id: str,
status_list: list[str] | None = None,
keyword: str | None = None,
page: int = 1,
limit: int = 20,
):
"""Get segments for a document with optional filtering."""
query = DocumentSegment.query.filter(
@ -2188,10 +2194,11 @@ class SegmentService:
if keyword:
query = query.filter(DocumentSegment.content.ilike(f"%{keyword}%"))
segments = query.order_by(DocumentSegment.position.asc()).all()
total = len(segments)
paginated_segments = query.order_by(DocumentSegment.position.asc()).paginate(
page=page, per_page=limit, max_per_page=100, error_out=False
)
return segments, total
return paginated_segments.items, paginated_segments.total
@classmethod
def update_segment_by_id(

View File

@ -1,3 +1,4 @@
from configs import dify_config
from core.helper import marketplace
from core.plugin.entities.plugin import ModelProviderID, PluginDependency, PluginInstallationSource, ToolProviderID
from core.plugin.manager.plugin import PluginInstallationManager
@ -111,6 +112,8 @@ class DependenciesAnalysisService:
Generate the latest version of dependencies
"""
dependencies = list(set(dependencies))
if not dify_config.MARKETPLACE_ENABLED:
return []
deps = marketplace.batch_fetch_plugin_manifests(dependencies)
return [
PluginDependency(

View File

@ -1,49 +0,0 @@
from typing import Any
import toml # type: ignore
def load_api_poetry_configs() -> dict[str, Any]:
pyproject_toml = toml.load("api/pyproject.toml")
return pyproject_toml["tool"]["poetry"]
def load_all_dependency_groups() -> dict[str, dict[str, dict[str, Any]]]:
configs = load_api_poetry_configs()
configs_by_group = {"main": configs}
for group_name in configs["group"]:
configs_by_group[group_name] = configs["group"][group_name]
dependencies_by_group = {group_name: base["dependencies"] for group_name, base in configs_by_group.items()}
return dependencies_by_group
def test_group_dependencies_sorted():
for group_name, dependencies in load_all_dependency_groups().items():
dependency_names = list(dependencies.keys())
expected_dependency_names = sorted(set(dependency_names))
section = f"tool.poetry.group.{group_name}.dependencies" if group_name else "tool.poetry.dependencies"
assert expected_dependency_names == dependency_names, (
f"Dependencies in group {group_name} are not sorted. "
f"Check and fix [{section}] section in pyproject.toml file"
)
def test_group_dependencies_version_operator():
for group_name, dependencies in load_all_dependency_groups().items():
for dependency_name, specification in dependencies.items():
version_spec = specification if isinstance(specification, str) else specification["version"]
assert not version_spec.startswith("^"), (
f"Please replace '{dependency_name} = {version_spec}' with '{dependency_name} = ~{version_spec[1:]}' "
f"'^' operator is too wide and not allowed in the version specification."
)
def test_duplicated_dependency_crossing_groups() -> None:
all_dependency_names: list[str] = []
for dependencies in load_all_dependency_groups().values():
dependency_names = list(dependencies.keys())
all_dependency_names.extend(dependency_names)
expected_all_dependency_names = set(all_dependency_names)
assert sorted(expected_all_dependency_names) == sorted(all_dependency_names), (
"Duplicated dependencies crossing groups are found"
)

View File

@ -0,0 +1,99 @@
from unittest.mock import MagicMock, patch
from core.model_runtime.entities.message_entities import AssistantPromptMessage
from core.model_runtime.model_providers.__base.large_language_model import _increase_tool_call
ToolCall = AssistantPromptMessage.ToolCall
# CASE 1: Single tool call
INPUTS_CASE_1 = [
ToolCall(id="1", type="function", function=ToolCall.ToolCallFunction(name="func_foo", arguments="")),
ToolCall(id="", type="function", function=ToolCall.ToolCallFunction(name="", arguments='{"arg1": ')),
ToolCall(id="", type="function", function=ToolCall.ToolCallFunction(name="", arguments='"value"}')),
]
EXPECTED_CASE_1 = [
ToolCall(
id="1", type="function", function=ToolCall.ToolCallFunction(name="func_foo", arguments='{"arg1": "value"}')
),
]
# CASE 2: Tool call sequences where IDs are anchored to the first chunk (vLLM/SiliconFlow ...)
INPUTS_CASE_2 = [
ToolCall(id="1", type="function", function=ToolCall.ToolCallFunction(name="func_foo", arguments="")),
ToolCall(id="", type="function", function=ToolCall.ToolCallFunction(name="", arguments='{"arg1": ')),
ToolCall(id="", type="function", function=ToolCall.ToolCallFunction(name="", arguments='"value"}')),
ToolCall(id="2", type="function", function=ToolCall.ToolCallFunction(name="func_bar", arguments="")),
ToolCall(id="", type="function", function=ToolCall.ToolCallFunction(name="", arguments='{"arg2": ')),
ToolCall(id="", type="function", function=ToolCall.ToolCallFunction(name="", arguments='"value"}')),
]
EXPECTED_CASE_2 = [
ToolCall(
id="1", type="function", function=ToolCall.ToolCallFunction(name="func_foo", arguments='{"arg1": "value"}')
),
ToolCall(
id="2", type="function", function=ToolCall.ToolCallFunction(name="func_bar", arguments='{"arg2": "value"}')
),
]
# CASE 3: Tool call sequences where IDs are anchored to every chunk (SGLang ...)
INPUTS_CASE_3 = [
ToolCall(id="1", type="function", function=ToolCall.ToolCallFunction(name="func_foo", arguments="")),
ToolCall(id="1", type="function", function=ToolCall.ToolCallFunction(name="", arguments='{"arg1": ')),
ToolCall(id="1", type="function", function=ToolCall.ToolCallFunction(name="", arguments='"value"}')),
ToolCall(id="2", type="function", function=ToolCall.ToolCallFunction(name="func_bar", arguments="")),
ToolCall(id="2", type="function", function=ToolCall.ToolCallFunction(name="", arguments='{"arg2": ')),
ToolCall(id="2", type="function", function=ToolCall.ToolCallFunction(name="", arguments='"value"}')),
]
EXPECTED_CASE_3 = [
ToolCall(
id="1", type="function", function=ToolCall.ToolCallFunction(name="func_foo", arguments='{"arg1": "value"}')
),
ToolCall(
id="2", type="function", function=ToolCall.ToolCallFunction(name="func_bar", arguments='{"arg2": "value"}')
),
]
# CASE 4: Tool call sequences with no IDs
INPUTS_CASE_4 = [
ToolCall(id="", type="function", function=ToolCall.ToolCallFunction(name="func_foo", arguments="")),
ToolCall(id="", type="function", function=ToolCall.ToolCallFunction(name="", arguments='{"arg1": ')),
ToolCall(id="", type="function", function=ToolCall.ToolCallFunction(name="", arguments='"value"}')),
ToolCall(id="", type="function", function=ToolCall.ToolCallFunction(name="func_bar", arguments="")),
ToolCall(id="", type="function", function=ToolCall.ToolCallFunction(name="", arguments='{"arg2": ')),
ToolCall(id="", type="function", function=ToolCall.ToolCallFunction(name="", arguments='"value"}')),
]
EXPECTED_CASE_4 = [
ToolCall(
id="RANDOM_ID_1",
type="function",
function=ToolCall.ToolCallFunction(name="func_foo", arguments='{"arg1": "value"}'),
),
ToolCall(
id="RANDOM_ID_2",
type="function",
function=ToolCall.ToolCallFunction(name="func_bar", arguments='{"arg2": "value"}'),
),
]
def _run_case(inputs: list[ToolCall], expected: list[ToolCall]):
actual = []
_increase_tool_call(inputs, actual)
assert actual == expected
def test__increase_tool_call():
# case 1:
_run_case(INPUTS_CASE_1, EXPECTED_CASE_1)
# case 2:
_run_case(INPUTS_CASE_2, EXPECTED_CASE_2)
# case 3:
_run_case(INPUTS_CASE_3, EXPECTED_CASE_3)
# case 4:
mock_id_generator = MagicMock()
mock_id_generator.side_effect = [_exp_case.id for _exp_case in EXPECTED_CASE_4]
with patch("core.model_runtime.model_providers.__base.large_language_model._gen_tool_call_id", mock_id_generator):
_run_case(INPUTS_CASE_4, EXPECTED_CASE_4)

View File

@ -1,14 +1,20 @@
from unittest.mock import patch
from core.app.entities.app_invoke_entities import InvokeFrom
from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult
from core.workflow.enums import SystemVariableKey
from core.workflow.graph_engine.entities.event import (
GraphRunPartialSucceededEvent,
NodeRunExceptionEvent,
NodeRunFailedEvent,
NodeRunStreamChunkEvent,
)
from core.workflow.graph_engine.entities.graph import Graph
from core.workflow.graph_engine.graph_engine import GraphEngine
from core.workflow.nodes.event.event import RunCompletedEvent, RunStreamChunkEvent
from core.workflow.nodes.llm.node import LLMNode
from models.enums import UserFrom
from models.workflow import WorkflowType
from models.workflow import WorkflowNodeExecutionStatus, WorkflowType
class ContinueOnErrorTestHelper:
@ -492,10 +498,7 @@ def test_no_node_in_fail_branch_continue_on_error():
"edges": FAIL_BRANCH_EDGES[:-1],
"nodes": [
{"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"},
{
"data": {"title": "success", "type": "answer", "answer": "HTTP request successful"},
"id": "success",
},
{"data": {"title": "success", "type": "answer", "answer": "HTTP request successful"}, "id": "success"},
ContinueOnErrorTestHelper.get_http_node(),
],
}
@ -506,3 +509,47 @@ def test_no_node_in_fail_branch_continue_on_error():
assert any(isinstance(e, NodeRunExceptionEvent) for e in events)
assert any(isinstance(e, GraphRunPartialSucceededEvent) and e.outputs == {} for e in events)
assert sum(1 for e in events if isinstance(e, NodeRunStreamChunkEvent)) == 0
def test_stream_output_with_fail_branch_continue_on_error():
"""Test stream output with fail-branch error strategy"""
graph_config = {
"edges": FAIL_BRANCH_EDGES,
"nodes": [
{"data": {"title": "Start", "type": "start", "variables": []}, "id": "start"},
{
"data": {"title": "success", "type": "answer", "answer": "LLM request successful"},
"id": "success",
},
{
"data": {"title": "error", "type": "answer", "answer": "{{#node.text#}}"},
"id": "error",
},
ContinueOnErrorTestHelper.get_llm_node(),
],
}
graph_engine = ContinueOnErrorTestHelper.create_test_graph_engine(graph_config)
def llm_generator(self):
contents = ["hi", "bye", "good morning"]
yield RunStreamChunkEvent(chunk_content=contents[0], from_variable_selector=[self.node_id, "text"])
yield RunCompletedEvent(
run_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
inputs={},
process_data={},
outputs={},
metadata={
NodeRunMetadataKey.TOTAL_TOKENS: 1,
NodeRunMetadataKey.TOTAL_PRICE: 1,
NodeRunMetadataKey.CURRENCY: "USD",
},
)
)
with patch.object(LLMNode, "_run", new=llm_generator):
events = list(graph_engine.run())
assert sum(isinstance(e, NodeRunStreamChunkEvent) for e in events) == 1
assert all(not isinstance(e, NodeRunFailedEvent | NodeRunExceptionEvent) for e in events)

View File

@ -0,0 +1,198 @@
import uuid
from unittest.mock import MagicMock, patch
import pytest
from httpx import Response
from factories.file_factory import (
File,
FileTransferMethod,
FileType,
FileUploadConfig,
build_from_mapping,
)
from models import ToolFile, UploadFile
# Test Data
TEST_TENANT_ID = "test_tenant_id"
TEST_UPLOAD_FILE_ID = str(uuid.uuid4())
TEST_TOOL_FILE_ID = str(uuid.uuid4())
TEST_REMOTE_URL = "http://example.com/test.jpg"
# Test Config
TEST_CONFIG = FileUploadConfig(
allowed_file_types=["image", "document"],
allowed_file_extensions=[".jpg", ".pdf"],
allowed_file_upload_methods=[FileTransferMethod.LOCAL_FILE, FileTransferMethod.TOOL_FILE],
number_limits=10,
)
# Fixtures
@pytest.fixture
def mock_upload_file():
mock = MagicMock(spec=UploadFile)
mock.id = TEST_UPLOAD_FILE_ID
mock.tenant_id = TEST_TENANT_ID
mock.name = "test.jpg"
mock.extension = "jpg"
mock.mime_type = "image/jpeg"
mock.source_url = TEST_REMOTE_URL
mock.size = 1024
mock.key = "test_key"
with patch("factories.file_factory.db.session.scalar", return_value=mock) as m:
yield m
@pytest.fixture
def mock_tool_file():
mock = MagicMock(spec=ToolFile)
mock.id = TEST_TOOL_FILE_ID
mock.tenant_id = TEST_TENANT_ID
mock.name = "tool_file.pdf"
mock.file_key = "tool_file.pdf"
mock.mimetype = "application/pdf"
mock.original_url = "http://example.com/tool.pdf"
mock.size = 2048
with patch("factories.file_factory.db.session.query") as mock_query:
mock_query.return_value.filter.return_value.first.return_value = mock
yield mock
@pytest.fixture
def mock_http_head():
def _mock_response(filename, size, content_type):
return Response(
status_code=200,
headers={
"Content-Disposition": f'attachment; filename="{filename}"',
"Content-Length": str(size),
"Content-Type": content_type,
},
)
with patch("factories.file_factory.ssrf_proxy.head") as mock_head:
mock_head.return_value = _mock_response("remote_test.jpg", 2048, "image/jpeg")
yield mock_head
# Helper functions
def local_file_mapping(file_type="image"):
return {
"transfer_method": "local_file",
"upload_file_id": TEST_UPLOAD_FILE_ID,
"type": file_type,
}
def tool_file_mapping(file_type="document"):
return {
"transfer_method": "tool_file",
"tool_file_id": TEST_TOOL_FILE_ID,
"type": file_type,
}
# Tests
def test_build_from_mapping_backward_compatibility(mock_upload_file):
mapping = local_file_mapping(file_type="image")
file = build_from_mapping(mapping=mapping, tenant_id=TEST_TENANT_ID)
assert isinstance(file, File)
assert file.transfer_method == FileTransferMethod.LOCAL_FILE
assert file.type == FileType.IMAGE
assert file.related_id == TEST_UPLOAD_FILE_ID
@pytest.mark.parametrize(
("file_type", "should_pass", "expected_error"),
[
("image", True, None),
("document", False, "Detected file type does not match"),
],
)
def test_build_from_local_file_strict_validation(mock_upload_file, file_type, should_pass, expected_error):
mapping = local_file_mapping(file_type=file_type)
if should_pass:
file = build_from_mapping(mapping=mapping, tenant_id=TEST_TENANT_ID, strict_type_validation=True)
assert file.type == FileType(file_type)
else:
with pytest.raises(ValueError, match=expected_error):
build_from_mapping(mapping=mapping, tenant_id=TEST_TENANT_ID, strict_type_validation=True)
@pytest.mark.parametrize(
("file_type", "should_pass", "expected_error"),
[
("document", True, None),
("image", False, "Detected file type does not match"),
],
)
def test_build_from_tool_file_strict_validation(mock_tool_file, file_type, should_pass, expected_error):
"""Strict type validation for tool_file."""
mapping = tool_file_mapping(file_type=file_type)
if should_pass:
file = build_from_mapping(mapping=mapping, tenant_id=TEST_TENANT_ID, strict_type_validation=True)
assert file.type == FileType(file_type)
else:
with pytest.raises(ValueError, match=expected_error):
build_from_mapping(mapping=mapping, tenant_id=TEST_TENANT_ID, strict_type_validation=True)
def test_build_from_remote_url(mock_http_head):
mapping = {
"transfer_method": "remote_url",
"url": TEST_REMOTE_URL,
"type": "image",
}
file = build_from_mapping(mapping=mapping, tenant_id=TEST_TENANT_ID)
assert file.transfer_method == FileTransferMethod.REMOTE_URL
assert file.type == FileType.IMAGE
assert file.filename == "remote_test.jpg"
assert file.size == 2048
def test_tool_file_not_found():
"""Test ToolFile not found in database."""
with patch("factories.file_factory.db.session.query") as mock_query:
mock_query.return_value.filter.return_value.first.return_value = None
mapping = tool_file_mapping()
with pytest.raises(ValueError, match=f"ToolFile {TEST_TOOL_FILE_ID} not found"):
build_from_mapping(mapping=mapping, tenant_id=TEST_TENANT_ID)
def test_local_file_not_found():
"""Test UploadFile not found in database."""
with patch("factories.file_factory.db.session.scalar", return_value=None):
mapping = local_file_mapping()
with pytest.raises(ValueError, match="Invalid upload file"):
build_from_mapping(mapping=mapping, tenant_id=TEST_TENANT_ID)
def test_build_without_type_specification(mock_upload_file):
"""Test the situation where no file type is specified"""
mapping = {
"transfer_method": "local_file",
"upload_file_id": TEST_UPLOAD_FILE_ID,
# leave out the type
}
file = build_from_mapping(mapping=mapping, tenant_id=TEST_TENANT_ID)
# It should automatically infer the type as "image" based on the file extension
assert file.type == FileType.IMAGE
@pytest.mark.parametrize(
("file_type", "should_pass", "expected_error"),
[
("image", True, None),
("video", False, "File validation failed"),
],
)
def test_file_validation_with_config(mock_upload_file, file_type, should_pass, expected_error):
"""Test the validation of files and configurations"""
mapping = local_file_mapping(file_type=file_type)
if should_pass:
file = build_from_mapping(mapping=mapping, tenant_id=TEST_TENANT_ID, config=TEST_CONFIG)
assert file is not None
else:
with pytest.raises(ValueError, match=expected_error):
build_from_mapping(mapping=mapping, tenant_id=TEST_TENANT_ID, config=TEST_CONFIG)

View File

@ -0,0 +1,3 @@
"""
Unit tests for repositories.
"""

View File

@ -0,0 +1,3 @@
"""
Unit tests for workflow_node_execution repositories.
"""

View File

@ -0,0 +1,154 @@
"""
Unit tests for the SQLAlchemy implementation of WorkflowNodeExecutionRepository.
"""
from unittest.mock import MagicMock
import pytest
from pytest_mock import MockerFixture
from sqlalchemy.orm import Session, sessionmaker
from core.repository.workflow_node_execution_repository import OrderConfig
from models.workflow import WorkflowNodeExecution
from repositories.workflow_node_execution.sqlalchemy_repository import SQLAlchemyWorkflowNodeExecutionRepository
@pytest.fixture
def session():
"""Create a mock SQLAlchemy session."""
session = MagicMock(spec=Session)
# Configure the session to be used as a context manager
session.__enter__ = MagicMock(return_value=session)
session.__exit__ = MagicMock(return_value=None)
# Configure the session factory to return the session
session_factory = MagicMock(spec=sessionmaker)
session_factory.return_value = session
return session, session_factory
@pytest.fixture
def repository(session):
"""Create a repository instance with test data."""
_, session_factory = session
tenant_id = "test-tenant"
app_id = "test-app"
return SQLAlchemyWorkflowNodeExecutionRepository(
session_factory=session_factory, tenant_id=tenant_id, app_id=app_id
)
def test_save(repository, session):
"""Test save method."""
session_obj, _ = session
# Create a mock execution
execution = MagicMock(spec=WorkflowNodeExecution)
execution.tenant_id = None
execution.app_id = None
# Call save method
repository.save(execution)
# Assert tenant_id and app_id are set
assert execution.tenant_id == repository._tenant_id
assert execution.app_id == repository._app_id
# Assert session.add was called
session_obj.add.assert_called_once_with(execution)
def test_save_with_existing_tenant_id(repository, session):
"""Test save method with existing tenant_id."""
session_obj, _ = session
# Create a mock execution with existing tenant_id
execution = MagicMock(spec=WorkflowNodeExecution)
execution.tenant_id = "existing-tenant"
execution.app_id = None
# Call save method
repository.save(execution)
# Assert tenant_id is not changed and app_id is set
assert execution.tenant_id == "existing-tenant"
assert execution.app_id == repository._app_id
# Assert session.add was called
session_obj.add.assert_called_once_with(execution)
def test_get_by_node_execution_id(repository, session, mocker: MockerFixture):
"""Test get_by_node_execution_id method."""
session_obj, _ = session
# Set up mock
mock_select = mocker.patch("repositories.workflow_node_execution.sqlalchemy_repository.select")
mock_stmt = mocker.MagicMock()
mock_select.return_value = mock_stmt
mock_stmt.where.return_value = mock_stmt
session_obj.scalar.return_value = mocker.MagicMock(spec=WorkflowNodeExecution)
# Call method
result = repository.get_by_node_execution_id("test-node-execution-id")
# Assert select was called with correct parameters
mock_select.assert_called_once()
session_obj.scalar.assert_called_once_with(mock_stmt)
assert result is not None
def test_get_by_workflow_run(repository, session, mocker: MockerFixture):
"""Test get_by_workflow_run method."""
session_obj, _ = session
# Set up mock
mock_select = mocker.patch("repositories.workflow_node_execution.sqlalchemy_repository.select")
mock_stmt = mocker.MagicMock()
mock_select.return_value = mock_stmt
mock_stmt.where.return_value = mock_stmt
mock_stmt.order_by.return_value = mock_stmt
session_obj.scalars.return_value.all.return_value = [mocker.MagicMock(spec=WorkflowNodeExecution)]
# Call method
order_config = OrderConfig(order_by=["index"], order_direction="desc")
result = repository.get_by_workflow_run(workflow_run_id="test-workflow-run-id", order_config=order_config)
# Assert select was called with correct parameters
mock_select.assert_called_once()
session_obj.scalars.assert_called_once_with(mock_stmt)
assert len(result) == 1
def test_get_running_executions(repository, session, mocker: MockerFixture):
"""Test get_running_executions method."""
session_obj, _ = session
# Set up mock
mock_select = mocker.patch("repositories.workflow_node_execution.sqlalchemy_repository.select")
mock_stmt = mocker.MagicMock()
mock_select.return_value = mock_stmt
mock_stmt.where.return_value = mock_stmt
session_obj.scalars.return_value.all.return_value = [mocker.MagicMock(spec=WorkflowNodeExecution)]
# Call method
result = repository.get_running_executions("test-workflow-run-id")
# Assert select was called with correct parameters
mock_select.assert_called_once()
session_obj.scalars.assert_called_once_with(mock_stmt)
assert len(result) == 1
def test_update(repository, session):
"""Test update method."""
session_obj, _ = session
# Create a mock execution
execution = MagicMock(spec=WorkflowNodeExecution)
execution.tenant_id = None
execution.app_id = None
# Call update method
repository.update(execution)
# Assert tenant_id and app_id are set
assert execution.tenant_id == repository._tenant_id
assert execution.app_id == repository._app_id
# Assert session.merge was called
session_obj.merge.assert_called_once_with(execution)

6286
api/uv.lock Normal file

File diff suppressed because it is too large Load Diff

7
dev/mypy-check Executable file
View File

@ -0,0 +1,7 @@
#!/bin/bash
set -x
# run mypy checks
uv run --directory api --dev \
python -m mypy --install-types --non-interactive .

View File

@ -2,20 +2,14 @@
set -x
# style checks rely on commands in path
if ! command -v ruff &> /dev/null || ! command -v dotenv-linter &> /dev/null; then
echo "Installing linting tools (Ruff, dotenv-linter ...) ..."
poetry install -C api --only lint
fi
# run ruff linter
poetry run -C api ruff check --fix ./
uv run --directory api --dev ruff check --fix ./
# run ruff formatter
poetry run -C api ruff format ./
uv run --directory api --dev ruff format ./
# run dotenv-linter linter
poetry run -P api dotenv-linter ./api/.env.example ./web/.env.example
uv run --project api --dev dotenv-linter ./api/.env.example ./web/.env.example
# run mypy check
dev/run-mypy
dev/mypy-check

View File

@ -1,11 +0,0 @@
#!/bin/bash
set -x
if ! command -v mypy &> /dev/null; then
poetry install -C api --with dev
fi
# run mypy checks
poetry run -C api \
python -m mypy --install-types --non-interactive .

View File

@ -1,18 +0,0 @@
#!/bin/bash
# rely on `poetry` in path
if ! command -v poetry &> /dev/null; then
echo "Installing Poetry ..."
pip install poetry
fi
# check poetry.lock in sync with pyproject.toml
poetry check -C api --lock
if [ $? -ne 0 ]; then
# update poetry.lock
# refreshing lockfile only without updating locked versions
echo "poetry.lock is outdated, refreshing without updating locked versions ..."
poetry lock -C api
else
echo "poetry.lock is ready."
fi

10
dev/sync-uv Executable file
View File

@ -0,0 +1,10 @@
#!/bin/bash
# rely on `uv` in path
if ! command -v uv &> /dev/null; then
echo "Installing uv ..."
pip install uv
fi
# check uv.lock in sync with pyproject.toml
uv lock --project api

View File

@ -1,13 +0,0 @@
#!/bin/bash
# rely on `poetry` in path
if ! command -v poetry &> /dev/null; then
echo "Installing Poetry ..."
pip install poetry
fi
# refreshing lockfile, updating locked versions
poetry update -C api
# check poetry.lock in sync with pyproject.toml
poetry check -C api --lock

22
dev/update-uv Executable file
View File

@ -0,0 +1,22 @@
#!/bin/bash
# Update dependencies in dify/api project using uv
set -e
set -o pipefail
SCRIPT_DIR="$(dirname "$0")"
REPO_ROOT="$(dirname "${SCRIPT_DIR}")"
# rely on `poetry` in path
if ! command -v uv &> /dev/null; then
echo "Installing uv ..."
pip install uv
fi
cd "${REPO_ROOT}"
# refreshing lockfile, updating locked versions
uv lock --project api --upgrade
# check uv.lock in sync with pyproject.toml
uv lock --project api --check

View File

@ -174,6 +174,12 @@ CELERY_MIN_WORKERS=
API_TOOL_DEFAULT_CONNECT_TIMEOUT=10
API_TOOL_DEFAULT_READ_TIMEOUT=60
# -------------------------------
# Datasource Configuration
# --------------------------------
ENABLE_WEBSITE_JINAREADER=true
ENABLE_WEBSITE_FIRECRAWL=true
ENABLE_WEBSITE_WATERCRAWL=true
# ------------------------------
# Database Configuration
@ -404,6 +410,7 @@ MILVUS_TOKEN=
MILVUS_USER=
MILVUS_PASSWORD=
MILVUS_ENABLE_HYBRID_SEARCH=False
MILVUS_ANALYZER_PARAMS=
# MyScale configuration, only available when VECTOR_STORE is `myscale`
# For multi-language support, please set MYSCALE_FTS_PARAMS with referring to:
@ -737,6 +744,12 @@ MAX_VARIABLE_SIZE=204800
WORKFLOW_PARALLEL_DEPTH_LIMIT=3
WORKFLOW_FILE_UPLOAD_LIMIT=10
# Workflow storage configuration
# Options: rdbms, hybrid
# rdbms: Use only the relational database (default)
# hybrid: Save new data to object storage, read from both object storage and RDBMS
WORKFLOW_NODE_EXECUTION_STORAGE=rdbms
# HTTP request node in workflow configuration
HTTP_REQUEST_NODE_MAX_BINARY_SIZE=10485760
HTTP_REQUEST_NODE_MAX_TEXT_SIZE=1048576

View File

@ -17,8 +17,10 @@ services:
PLUGIN_MAX_PACKAGE_SIZE: ${PLUGIN_MAX_PACKAGE_SIZE:-52428800}
INNER_API_KEY_FOR_PLUGIN: ${PLUGIN_DIFY_INNER_API_KEY:-QaHbTe77CtuXmsfyhR7+vRjI/+XbV1AaFy691iy+kGDv2Jvy0/eAh8Y1}
depends_on:
- db
- redis
db:
condition: service_healthy
redis:
condition: service_started
volumes:
# Mount the storage directory to the container, for storing user files.
- ./volumes/app/storage:/app/api/storage
@ -42,8 +44,10 @@ services:
PLUGIN_MAX_PACKAGE_SIZE: ${PLUGIN_MAX_PACKAGE_SIZE:-52428800}
INNER_API_KEY_FOR_PLUGIN: ${PLUGIN_DIFY_INNER_API_KEY:-QaHbTe77CtuXmsfyhR7+vRjI/+XbV1AaFy691iy+kGDv2Jvy0/eAh8Y1}
depends_on:
- db
- redis
db:
condition: service_healthy
redis:
condition: service_started
volumes:
# Mount the storage directory to the container, for storing user files.
- ./volumes/app/storage:/app/api/storage
@ -71,7 +75,9 @@ services:
MAX_TOOLS_NUM: ${MAX_TOOLS_NUM:-10}
MAX_PARALLEL_LIMIT: ${MAX_PARALLEL_LIMIT:-10}
MAX_ITERATIONS_NUM: ${MAX_ITERATIONS_NUM:-5}
ENABLE_WEBSITE_JINAREADER: ${ENABLE_WEBSITE_JINAREADER:-true}
ENABLE_WEBSITE_FIRECRAWL: ${ENABLE_WEBSITE_FIRECRAWL:-true}
ENABLE_WEBSITE_WATERCRAWL: ${ENABLE_WEBSITE_WATERCRAWL:-true}
# The postgres database.
db:
image: postgres:15-alpine
@ -163,7 +169,7 @@ services:
S3_ENDPOINT: ${PLUGIN_S3_ENDPOINT:-}
S3_USE_PATH_STYLE: ${PLUGIN_S3_USE_PATH_STYLE:-false}
AWS_ACCESS_KEY: ${PLUGIN_AWS_ACCESS_KEY:-}
PAWS_SECRET_KEY: ${PLUGIN_AWS_SECRET_KEY:-}
AWS_SECRET_KEY: ${PLUGIN_AWS_SECRET_KEY:-}
AWS_REGION: ${PLUGIN_AWS_REGION:-}
AZURE_BLOB_STORAGE_CONNECTION_STRING: ${PLUGIN_AZURE_BLOB_STORAGE_CONNECTION_STRING:-}
AZURE_BLOB_STORAGE_CONTAINER_NAME: ${PLUGIN_AZURE_BLOB_STORAGE_CONTAINER_NAME:-}

View File

@ -107,7 +107,7 @@ services:
S3_ENDPOINT: ${PLUGIN_S3_ENDPOINT:-}
S3_USE_PATH_STYLE: ${PLUGIN_S3_USE_PATH_STYLE:-false}
AWS_ACCESS_KEY: ${PLUGIN_AWS_ACCESS_KEY:-}
PAWS_SECRET_KEY: ${PLUGIN_AWS_SECRET_KEY:-}
AWS_SECRET_KEY: ${PLUGIN_AWS_SECRET_KEY:-}
AWS_REGION: ${PLUGIN_AWS_REGION:-}
AZURE_BLOB_STORAGE_CONNECTION_STRING: ${PLUGIN_AZURE_BLOB_STORAGE_CONNECTION_STRING:-}
AZURE_BLOB_STORAGE_CONTAINER_NAME: ${PLUGIN_AZURE_BLOB_STORAGE_CONTAINER_NAME:-}

View File

@ -43,6 +43,9 @@ x-shared-env: &shared-api-worker-env
CELERY_MIN_WORKERS: ${CELERY_MIN_WORKERS:-}
API_TOOL_DEFAULT_CONNECT_TIMEOUT: ${API_TOOL_DEFAULT_CONNECT_TIMEOUT:-10}
API_TOOL_DEFAULT_READ_TIMEOUT: ${API_TOOL_DEFAULT_READ_TIMEOUT:-60}
ENABLE_WEBSITE_JINAREADER: ${ENABLE_WEBSITE_JINAREADER:-true}
ENABLE_WEBSITE_FIRECRAWL: ${ENABLE_WEBSITE_FIRECRAWL:-true}
ENABLE_WEBSITE_WATERCRAWL: ${ENABLE_WEBSITE_WATERCRAWL:-true}
DB_USERNAME: ${DB_USERNAME:-postgres}
DB_PASSWORD: ${DB_PASSWORD:-difyai123456}
DB_HOST: ${DB_HOST:-db}
@ -139,6 +142,7 @@ x-shared-env: &shared-api-worker-env
MILVUS_USER: ${MILVUS_USER:-}
MILVUS_PASSWORD: ${MILVUS_PASSWORD:-}
MILVUS_ENABLE_HYBRID_SEARCH: ${MILVUS_ENABLE_HYBRID_SEARCH:-False}
MILVUS_ANALYZER_PARAMS: ${MILVUS_ANALYZER_PARAMS:-}
MYSCALE_HOST: ${MYSCALE_HOST:-myscale}
MYSCALE_PORT: ${MYSCALE_PORT:-8123}
MYSCALE_USER: ${MYSCALE_USER:-default}
@ -323,6 +327,7 @@ x-shared-env: &shared-api-worker-env
MAX_VARIABLE_SIZE: ${MAX_VARIABLE_SIZE:-204800}
WORKFLOW_PARALLEL_DEPTH_LIMIT: ${WORKFLOW_PARALLEL_DEPTH_LIMIT:-3}
WORKFLOW_FILE_UPLOAD_LIMIT: ${WORKFLOW_FILE_UPLOAD_LIMIT:-10}
WORKFLOW_NODE_EXECUTION_STORAGE: ${WORKFLOW_NODE_EXECUTION_STORAGE:-rdbms}
HTTP_REQUEST_NODE_MAX_BINARY_SIZE: ${HTTP_REQUEST_NODE_MAX_BINARY_SIZE:-10485760}
HTTP_REQUEST_NODE_MAX_TEXT_SIZE: ${HTTP_REQUEST_NODE_MAX_TEXT_SIZE:-1048576}
HTTP_REQUEST_NODE_SSL_VERIFY: ${HTTP_REQUEST_NODE_SSL_VERIFY:-True}
@ -485,8 +490,10 @@ services:
PLUGIN_MAX_PACKAGE_SIZE: ${PLUGIN_MAX_PACKAGE_SIZE:-52428800}
INNER_API_KEY_FOR_PLUGIN: ${PLUGIN_DIFY_INNER_API_KEY:-QaHbTe77CtuXmsfyhR7+vRjI/+XbV1AaFy691iy+kGDv2Jvy0/eAh8Y1}
depends_on:
- db
- redis
db:
condition: service_healthy
redis:
condition: service_started
volumes:
# Mount the storage directory to the container, for storing user files.
- ./volumes/app/storage:/app/api/storage
@ -510,8 +517,10 @@ services:
PLUGIN_MAX_PACKAGE_SIZE: ${PLUGIN_MAX_PACKAGE_SIZE:-52428800}
INNER_API_KEY_FOR_PLUGIN: ${PLUGIN_DIFY_INNER_API_KEY:-QaHbTe77CtuXmsfyhR7+vRjI/+XbV1AaFy691iy+kGDv2Jvy0/eAh8Y1}
depends_on:
- db
- redis
db:
condition: service_healthy
redis:
condition: service_started
volumes:
# Mount the storage directory to the container, for storing user files.
- ./volumes/app/storage:/app/api/storage
@ -539,7 +548,9 @@ services:
MAX_TOOLS_NUM: ${MAX_TOOLS_NUM:-10}
MAX_PARALLEL_LIMIT: ${MAX_PARALLEL_LIMIT:-10}
MAX_ITERATIONS_NUM: ${MAX_ITERATIONS_NUM:-5}
ENABLE_WEBSITE_JINAREADER: ${ENABLE_WEBSITE_JINAREADER:-true}
ENABLE_WEBSITE_FIRECRAWL: ${ENABLE_WEBSITE_FIRECRAWL:-true}
ENABLE_WEBSITE_WATERCRAWL: ${ENABLE_WEBSITE_WATERCRAWL:-true}
# The postgres database.
db:
image: postgres:15-alpine
@ -631,7 +642,7 @@ services:
S3_ENDPOINT: ${PLUGIN_S3_ENDPOINT:-}
S3_USE_PATH_STYLE: ${PLUGIN_S3_USE_PATH_STYLE:-false}
AWS_ACCESS_KEY: ${PLUGIN_AWS_ACCESS_KEY:-}
PAWS_SECRET_KEY: ${PLUGIN_AWS_SECRET_KEY:-}
AWS_SECRET_KEY: ${PLUGIN_AWS_SECRET_KEY:-}
AWS_REGION: ${PLUGIN_AWS_REGION:-}
AZURE_BLOB_STORAGE_CONNECTION_STRING: ${PLUGIN_AZURE_BLOB_STORAGE_CONNECTION_STRING:-}
AZURE_BLOB_STORAGE_CONTAINER_NAME: ${PLUGIN_AZURE_BLOB_STORAGE_CONTAINER_NAME:-}

Some files were not shown because too many files have changed in this diff Show More