add dify plugin llm layers

This commit is contained in:
盐粒 Yanli 2026-05-12 02:42:45 +08:00
parent 8d96f6cfbd
commit 208012e268
34 changed files with 883 additions and 181 deletions

View File

@ -12,11 +12,11 @@ server-only and should not be used by API consumers.
## Input model
Create-run requests accept a `CompositorConfig` and an optional
`CompositorSessionSnapshot`. There is **no top-level `user_prompt` field**.
User input must be supplied by Agenton layers. In the MVP server, the safe
config-constructible layer registry includes `plain.prompt`; its `config.user`
field becomes `Compositor.user_prompts` and is passed to Pydantic AI as the run
input.
`CompositorSessionSnapshot`. There is **no top-level `user_prompt` or model
profile field**. User input and model/provider selection are supplied by Agenton
layers. In the MVP server, the safe config-constructible layer registry includes
`plain.prompt`, `dify.plugin`, and `dify.plugin.llm`. The runtime reads the LLM
model layer named by `DIFY_AGENT_MODEL_LAYER_ID`, whose public value is `"llm"`.
Blank user input is rejected. A request with no user prompt, an empty string, or
only whitespace strings such as `"user": ["", " "]` returns `422` before a run
@ -46,14 +46,35 @@ Request:
"prefix": "You are a concise assistant.",
"user": "Say hello from the Dify Agent API."
}
},
{
"name": "plugin",
"type": "dify.plugin",
"config": {
"tenant_id": "replace-with-tenant-id",
"plugin_id": "langgenius/openai"
}
},
{
"name": "llm",
"type": "dify.plugin.llm",
"deps": {
"plugin": "plugin"
},
"config": {
"provider": "openai",
"model": "gpt-4o-mini",
"credentials": {
"api_key": "replace-with-provider-key"
},
"model_settings": {
"temperature": 0.2
}
}
}
]
},
"session_snapshot": null,
"agent_profile": {
"provider": "test",
"output_text": "Hello from the TestModel."
}
"session_snapshot": null
}
```
@ -71,7 +92,9 @@ same FastAPI process. Redis is not used as a job queue. Run records and per-run
event streams expire after `DIFY_AGENT_RUN_RETENTION_SECONDS`, which defaults to
`259200` seconds (3 days).
`agent_profile.provider` currently supports the credential-free `test` profile.
`dify.plugin` receives tenant/plugin identity only; daemon URL, API key, and
timeout are server settings. `dify.plugin.llm.credentials` accepts scalar values
only (`string`, `number`, `boolean`, or `null`).
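A minimal sketch of the scalar-credentials rule, mirroring the layer config tests in this commit (all values are placeholders):

```python
from pydantic import ValidationError

from dify_agent.layers.dify_plugin import DifyPluginLLMLayerConfig

# Scalar credential values (string, number, boolean, null) validate.
config = DifyPluginLLMLayerConfig(
    provider="openai",
    model="gpt-4o-mini",
    credentials={"api_key": "provider-key", "retries": 2, "enabled": True},
)

# Nested credential values are rejected by the public DTO schema.
try:
    DifyPluginLLMLayerConfig.model_validate(
        {"provider": "openai", "model": "gpt-4o-mini", "credentials": {"nested": {"not": "allowed"}}}
    )
except ValidationError:
    pass
```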
Validation error example (`422`):
@ -167,13 +190,31 @@ normal names; sync methods add `_sync`.
from agenton.compositor import CompositorConfig, LayerNodeConfig
from agenton_collections.layers.plain import PromptLayerConfig
from dify_agent.client import Client
from dify_agent.protocol import CreateRunRequest
from dify_agent.layers.dify_plugin import DifyPluginLLMLayerConfig, DifyPluginLayerConfig
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest
async def main() -> None:
request = CreateRunRequest(
compositor=CompositorConfig(
layers=[LayerNodeConfig(name="prompt", type="plain.prompt", config=PromptLayerConfig(user="hello"))]
layers=[
LayerNodeConfig(name="prompt", type="plain.prompt", config=PromptLayerConfig(user="hello")),
LayerNodeConfig(
name="plugin",
type="dify.plugin",
config=DifyPluginLayerConfig(tenant_id="tenant-id", plugin_id="langgenius/openai"),
),
LayerNodeConfig(
name=DIFY_AGENT_MODEL_LAYER_ID,
type="dify.plugin.llm",
deps={"plugin": "plugin"},
config=DifyPluginLLMLayerConfig(
provider="openai",
model="gpt-4o-mini",
credentials={"api_key": "provider-key"},
),
),
]
)
)
async with Client(base_url="http://localhost:8000") as client:
@ -186,12 +227,30 @@ async def main() -> None:
from agenton.compositor import CompositorConfig, LayerNodeConfig
from agenton_collections.layers.plain import PromptLayerConfig
from dify_agent.client import Client
from dify_agent.protocol import CreateRunRequest
from dify_agent.layers.dify_plugin import DifyPluginLLMLayerConfig, DifyPluginLayerConfig
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest
request = CreateRunRequest(
compositor=CompositorConfig(
layers=[LayerNodeConfig(name="prompt", type="plain.prompt", config=PromptLayerConfig(user="hello"))]
layers=[
LayerNodeConfig(name="prompt", type="plain.prompt", config=PromptLayerConfig(user="hello")),
LayerNodeConfig(
name="plugin",
type="dify.plugin",
config=DifyPluginLayerConfig(tenant_id="tenant-id", plugin_id="langgenius/openai"),
),
LayerNodeConfig(
name=DIFY_AGENT_MODEL_LAYER_ID,
type="dify.plugin.llm",
deps={"plugin": "plugin"},
config=DifyPluginLLMLayerConfig(
provider="openai",
model="gpt-4o-mini",
credentials={"api_key": "provider-key"},
),
),
]
)
)

View File

@ -5,7 +5,7 @@ example:
uv run --project dify-agent uvicorn dify_agent.server.app:app --reload
The default request uses the credential-free pydantic-ai TestModel profile. This
The request carries Dify plugin model configuration in Agenton layers. This
script prints the created run and every event observed through cursor polling.
``Client.create_run`` performs one POST attempt only; use polling or SSE replay to
recover after client-side uncertainty.
@ -16,10 +16,20 @@ import asyncio
from agenton.compositor import CompositorConfig, LayerNodeConfig
from agenton_collections.layers.plain import PromptLayerConfig
from dify_agent.client import Client
from dify_agent.protocol import AgentProfileConfig, CreateRunRequest
from dify_agent.layers.dify_plugin import (
DifyPluginCredentialValue,
DifyPluginLLMLayerConfig,
DifyPluginLayerConfig,
)
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest
API_BASE_URL = "http://localhost:8000"
TENANT_ID = "replace-with-tenant-id"
PLUGIN_ID = "langgenius/openai"
PLUGIN_PROVIDER = "openai"
MODEL_NAME = "gpt-4o-mini"
MODEL_CREDENTIALS: dict[str, DifyPluginCredentialValue] = {"api_key": "replace-with-provider-key"}
async def main() -> None:
@ -35,10 +45,24 @@ async def main() -> None:
prefix="You are a concise assistant.",
user="Say hello from the Dify Agent API server example.",
),
)
),
LayerNodeConfig(
name="plugin",
type="dify.plugin",
config=DifyPluginLayerConfig(tenant_id=TENANT_ID, plugin_id=PLUGIN_ID),
),
LayerNodeConfig(
name=DIFY_AGENT_MODEL_LAYER_ID,
type="dify.plugin.llm",
deps={"plugin": "plugin"},
config=DifyPluginLLMLayerConfig(
provider=PLUGIN_PROVIDER,
model=MODEL_NAME,
credentials=MODEL_CREDENTIALS,
),
),
],
),
agent_profile=AgentProfileConfig(output_text="Hello from the example TestModel."),
)
)
print("created run", run)

View File

@ -8,10 +8,20 @@ a new run explicitly rather than assuming the original request was not accepted.
from agenton.compositor import CompositorConfig, LayerNodeConfig
from agenton_collections.layers.plain import PromptLayerConfig
from dify_agent.client import Client
from dify_agent.protocol import AgentProfileConfig, CreateRunRequest
from dify_agent.layers.dify_plugin import (
DifyPluginCredentialValue,
DifyPluginLLMLayerConfig,
DifyPluginLayerConfig,
)
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest
API_BASE_URL = "http://localhost:8000"
TENANT_ID = "replace-with-tenant-id"
PLUGIN_ID = "langgenius/openai"
PLUGIN_PROVIDER = "openai"
MODEL_NAME = "gpt-4o-mini"
MODEL_CREDENTIALS: dict[str, DifyPluginCredentialValue] = {"api_key": "replace-with-provider-key"}
def main() -> None:
@ -27,10 +37,24 @@ def main() -> None:
prefix="You are a concise assistant.",
user="Say hello from the synchronous Dify Agent client example.",
),
)
),
LayerNodeConfig(
name="plugin",
type="dify.plugin",
config=DifyPluginLayerConfig(tenant_id=TENANT_ID, plugin_id=PLUGIN_ID),
),
LayerNodeConfig(
name=DIFY_AGENT_MODEL_LAYER_ID,
type="dify.plugin.llm",
deps={"plugin": "plugin"},
config=DifyPluginLLMLayerConfig(
provider=PLUGIN_PROVIDER,
model=MODEL_NAME,
credentials=MODEL_CREDENTIALS,
),
),
],
),
agent_profile=AgentProfileConfig(output_text="Hello from the sync TestModel."),
)
)
print("created run", run)

View File

@ -22,7 +22,8 @@ Serializable graph config uses registry type ids rather than import paths.
instances; JSON serialization preserves concrete DTO fields before the builder
validates them with the registered layer schema. ``CompositorBuilder`` resolves
config nodes through ``LayerRegistry`` and can mix those nodes with live layer
instances for Python objects and callables.
instances for Python objects and callables. Registries may also supply factories
for layers that require server-side dependencies in addition to client DTOs.
``Compositor.enter`` enters layers in compositor order and exits them in reverse
order through ``AsyncExitStack``. It accepts an optional ``CompositorSession``
@ -41,7 +42,7 @@ from collections import OrderedDict
from collections.abc import AsyncIterator, Callable, Iterable, Mapping as MappingABC, Sequence
from contextlib import AsyncExitStack, asynccontextmanager
from dataclasses import dataclass, field
from typing import Any, Generic, Mapping, TypedDict, cast
from typing import Any, Generic, Mapping, TypedDict, cast, overload
from pydantic import BaseModel, ConfigDict, Field, JsonValue
from typing_extensions import Self, TypeVar
@ -55,6 +56,7 @@ LayerPromptT = TypeVar("LayerPromptT", default=AllPromptTypes)
LayerToolT = TypeVar("LayerToolT", default=AllToolTypes)
UserPromptT = TypeVar("UserPromptT", default=AllUserPromptTypes)
LayerUserPromptT = TypeVar("LayerUserPromptT", default=AllUserPromptTypes)
LayerT = TypeVar("LayerT", bound=Layer[Any, Any, Any, Any, Any, Any, Any])
type CompositorTransformer[InputT, OutputT] = Callable[[Sequence[InputT]], Sequence[OutputT]]
@ -76,6 +78,7 @@ class CompositorTransformerKwargs[
type _ConfigModelValue[ModelT: BaseModel] = ModelT | JsonValue | str | bytes
type LayerFactory = Callable[[LayerConfig], Layer[Any, Any, Any, Any, Any, Any, Any]]
def _validate_config_model_input[ModelT: BaseModel](
@ -132,6 +135,7 @@ class LayerDescriptor:
config_type: type[LayerConfig]
runtime_state_type: type[BaseModel]
runtime_handles_type: type[BaseModel]
factory: LayerFactory | None = None
class LayerRegistry:
@ -139,7 +143,8 @@ class LayerRegistry:
Registration infers config and runtime schemas from layer class attributes.
A registered layer must have a type id, either declared as ``type_id`` on the
class or supplied to ``register_layer``.
class or supplied to ``register_layer``. Optional factories let server code
inject dependencies that do not belong in public layer DTOs.
"""
__slots__ = ("_descriptors",)
@ -154,8 +159,14 @@ class LayerRegistry:
layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]],
*,
type_id: str | None = None,
factory: LayerFactory | None = None,
) -> None:
"""Register ``layer_type`` under its inferred or explicit type id."""
"""Register ``layer_type`` under its inferred or explicit type id.
``factory`` receives the validated layer config and constructs the layer. It
is intended for server-only dependencies such as clients or secrets; omit
it for normal ``Layer.from_config`` construction.
"""
resolved_type_id = type_id or layer_type.type_id
if resolved_type_id is not None and not isinstance(resolved_type_id, str):
raise TypeError(f"Layer type id for '{layer_type.__qualname__}' must be a string.")
@ -169,6 +180,7 @@ class LayerRegistry:
config_type=layer_type.config_type,
runtime_state_type=layer_type.runtime_state_type,
runtime_handles_type=layer_type.runtime_handles_type,
factory=factory,
)
def resolve(self, type_id: str) -> LayerDescriptor:
@ -289,7 +301,10 @@ class CompositorBuilder:
descriptor = self._registry.resolve(type)
raw_config = {} if config is None else config
validated_config = descriptor.config_type.model_validate(raw_config)
layer = descriptor.layer_type.from_config(cast(Any, validated_config))
if descriptor.factory is not None:
layer = descriptor.factory(validated_config)
else:
layer = descriptor.layer_type.from_config(cast(Any, validated_config))
self.add_instance(name=name, layer=layer, deps=deps)
return self
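A minimal sketch of the new factory hook together with ``get_layer``, mirroring the registry tests added in this commit; import paths are assumed from this repository, and ``PromptLayer`` stands in for a layer that needs server-injected construction:

```python
from agenton.compositor import CompositorBuilder, LayerRegistry
from agenton_collections.layers.plain import PromptLayerConfig
from agenton_collections.layers.plain.basic import PromptLayer

registry = LayerRegistry()
# The factory receives the validated config DTO and returns the layer instance,
# so server-only dependencies never have to appear in the public config schema.
registry.register_layer(
    PromptLayer,
    factory=lambda config: PromptLayer(prefix=PromptLayerConfig.model_validate(config).prefix),
)

compositor = (
    CompositorBuilder(registry)
    .add_config({"layers": [{"name": "prompt", "type": "plain.prompt", "config": {"prefix": "factory"}}]})
    .build()
)

# get_layer returns the named layer and can optionally assert its concrete type.
prompt_layer = compositor.get_layer("prompt", PromptLayer)
```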
@ -406,6 +421,29 @@ class Compositor(Generic[PromptT, ToolT, LayerPromptT, LayerToolT, UserPromptT,
layer.bind_deps({**self.layers, **deps})
self._deps_bound = True
@overload
def get_layer(self, layer_id: str) -> Layer[Any, Any, Any, Any, Any, Any, Any]: ...
@overload
def get_layer(self, layer_id: str, layer_type: type[LayerT]) -> LayerT: ...
def get_layer(
self,
layer_id: str,
layer_type: type[LayerT] | None = None,
) -> Layer[Any, Any, Any, Any, Any, Any, Any] | LayerT:
"""Return a layer by compositor name and optionally validate its type."""
try:
layer = self.layers[layer_id]
except KeyError as e:
raise KeyError(f"Layer '{layer_id}' is not defined in this compositor.") from e
if layer_type is not None and not isinstance(layer, layer_type):
raise TypeError(
f"Layer '{layer_id}' must be {layer_type.__name__}, got {type(layer).__name__}."
)
return layer
def new_session(self) -> CompositorSession:
"""Create a fresh lifecycle session matching this compositor's layer order."""
return CompositorSession(
@ -566,6 +604,7 @@ __all__ = [
"CompositorTransformer",
"CompositorTransformerKwargs",
"LayerDescriptor",
"LayerFactory",
"LayerNodeConfig",
"LayerRegistry",
"LayerSessionSnapshot",

View File

@ -128,7 +128,12 @@ class DifyPluginDaemonLLMClient:
@dataclass(slots=True, kw_only=True)
class DifyPluginDaemonProvider(Provider[DifyPluginDaemonLLMClient]):
"""Pydantic AI provider for Dify plugin-daemon dispatch requests."""
"""Pydantic AI provider for Dify plugin-daemon dispatch requests.
When ``http_client`` is omitted, the provider owns an ``AsyncClient`` and the
Pydantic AI provider context manager closes it. When an external client is
supplied, ownership stays with the caller and provider exit leaves it open.
"""
tenant_id: str
plugin_id: str
@ -137,15 +142,21 @@ class DifyPluginDaemonProvider(Provider[DifyPluginDaemonLLMClient]):
plugin_daemon_api_key: str = field(repr=False)
user_id: str | None = None
timeout: float | httpx.Timeout | None = _DEFAULT_DAEMON_TIMEOUT
http_client: httpx.AsyncClient | None = field(default=None, repr=False)
_client: DifyPluginDaemonLLMClient = field(init=False, repr=False)
_own_http_client: httpx.AsyncClient | None = field(init=False, default=None, repr=False)
_http_client_factory: Callable[[], httpx.AsyncClient] | None = field(init=False, default=None, repr=False)
def __post_init__(self) -> None:
self.plugin_daemon_url = self.plugin_daemon_url.rstrip("/")
self._http_client_factory = self._make_http_client
http_client = self._make_http_client()
self._own_http_client = http_client
if self.http_client is None:
self._http_client_factory = self._make_http_client
http_client = self._make_http_client()
self._own_http_client = http_client
else:
http_client = self.http_client
self._own_http_client = None
self._http_client_factory = None
self._client = DifyPluginDaemonLLMClient(
plugin_daemon_url=self.plugin_daemon_url,
plugin_daemon_api_key=self.plugin_daemon_api_key,
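A hedged sketch of the ownership rule from the provider docstring above, mirroring the new external-client test further down (placeholder daemon values; no network calls are made):

```python
import asyncio

import httpx

from dify_agent.adapters.llm import DifyPluginDaemonProvider


async def main() -> None:
    external_client = httpx.AsyncClient()
    provider = DifyPluginDaemonProvider(
        tenant_id="tenant-1",
        plugin_id="langgenius/openai",
        plugin_provider="openai",
        plugin_daemon_url="http://plugin-daemon",
        plugin_daemon_api_key="daemon-secret",
        http_client=external_client,
    )
    async with provider:
        pass
    # Ownership stayed with the caller, so the external client is still open.
    assert not external_client.is_closed
    await external_client.aclose()


asyncio.run(main())
```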

View File

@ -0,0 +1,3 @@
"""Dify-owned Agenton layer packages."""
__all__: list[str] = []

View File

@ -0,0 +1,13 @@
"""Client-safe exports for Dify plugin layer config DTOs."""
from dify_agent.layers.dify_plugin.configs import (
DifyPluginCredentialValue,
DifyPluginLLMLayerConfig,
DifyPluginLayerConfig,
)
__all__ = [
"DifyPluginCredentialValue",
"DifyPluginLLMLayerConfig",
"DifyPluginLayerConfig",
]

View File

@ -0,0 +1,45 @@
"""Client-safe DTOs for Dify plugin-backed Agenton layers.
This module intentionally contains only public config schemas and scalar type
aliases. Runtime objects such as HTTP clients, server settings, and adapter
implementations live in sibling implementation modules so clients can build run
requests without importing server-only dependencies.
"""
from typing import ClassVar, TypeAlias
from pydantic import ConfigDict, Field
from pydantic_ai.settings import ModelSettings
from agenton.layers import LayerConfig
DifyPluginCredentialValue: TypeAlias = str | int | float | bool | None
class DifyPluginLayerConfig(LayerConfig):
"""Public config for the plugin daemon tenant/plugin context layer."""
tenant_id: str
plugin_id: str
user_id: str | None = None
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid", arbitrary_types_allowed=True)
class DifyPluginLLMLayerConfig(LayerConfig):
"""Public config for selecting a Dify plugin LLM model."""
provider: str
model: str
credentials: dict[str, DifyPluginCredentialValue] = Field(default_factory=dict)
model_settings: ModelSettings | None = None
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid", arbitrary_types_allowed=True)
__all__ = [
"DifyPluginCredentialValue",
"DifyPluginLLMLayerConfig",
"DifyPluginLayerConfig",
]
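As a quick illustration of the client-safe split described in the module docstring (mirroring the DTO tests in this commit), server-side daemon settings are rejected by the public plugin config:

```python
from pydantic import ValidationError

from dify_agent.layers.dify_plugin import DifyPluginLayerConfig

# Tenant/plugin identity is accepted; daemon settings are not part of the DTO
# and fail validation because extra fields are forbidden.
try:
    DifyPluginLayerConfig.model_validate(
        {"tenant_id": "tenant-1", "plugin_id": "plugin-1", "daemon_url": "http://daemon"}
    )
except ValidationError:
    pass
```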

View File

@ -0,0 +1,49 @@
"""Dify plugin LLM model layer.
This layer owns model capability resolution for Dify plugin-backed LLMs. It
depends on ``DifyPluginLayer`` for active daemon access and returns a Pydantic AI
model adapter configured from the public LLM layer DTO.
"""
from dataclasses import dataclass
from typing_extensions import Self, override
from agenton.layers import LayerDeps, PlainLayer
from dify_agent.adapters.llm import DifyLLMAdapterModel
from dify_agent.layers.dify_plugin.configs import DifyPluginLLMLayerConfig
from dify_agent.layers.dify_plugin.plugin_layer import DifyPluginLayer
class DifyPluginLLMDeps(LayerDeps):
"""Dependencies required by ``DifyPluginLLMLayer``."""
plugin: DifyPluginLayer # pyright: ignore[reportUninitializedInstanceVariable]
@dataclass(slots=True)
class DifyPluginLLMLayer(PlainLayer[DifyPluginLLMDeps, DifyPluginLLMLayerConfig]):
"""Layer that creates the Dify plugin-daemon Pydantic AI model."""
type_id = "dify.plugin.llm"
config: DifyPluginLLMLayerConfig
@classmethod
@override
def from_config(cls, config: DifyPluginLLMLayerConfig) -> Self:
"""Create the LLM layer from validated public config."""
return cls(config=config)
def get_model(self) -> DifyLLMAdapterModel:
"""Return the configured model using the active plugin daemon provider."""
provider = self.deps.plugin.get_provider(plugin_provider=self.config.provider)
return DifyLLMAdapterModel(
model=self.config.model,
daemon_provider=provider,
credentials=dict(self.config.credentials),
model_settings=self.config.model_settings,
)
__all__ = ["DifyPluginLLMDeps", "DifyPluginLLMLayer"]

View File

@ -0,0 +1,151 @@
"""Runtime Dify plugin context layer.
The public config identifies tenant/plugin/user context only. Plugin daemon URL,
API key, and timeout are server-side dependencies injected by the layer registry
factory. Each active compositor entry owns an HTTP client in ``LayerControl``
runtime handles; ``get_provider`` discovers those handles via a task-local
context variable so shared layer instances never store session-local clients.
"""
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from contextvars import ContextVar, Token
from dataclasses import dataclass
from typing import cast
import httpx
from pydantic import BaseModel, ConfigDict
from typing_extensions import Self, override
from agenton.layers import EmptyRuntimeState, LayerControl, NoLayerDeps, PlainLayer
from dify_agent.adapters.llm import DifyPluginDaemonProvider
from dify_agent.layers.dify_plugin.configs import DifyPluginLayerConfig
class DifyPluginRuntimeHandles(BaseModel):
"""Live per-entry handles for Dify plugin daemon access."""
http_client: httpx.AsyncClient | None = None
model_config = ConfigDict(extra="forbid", validate_assignment=True, arbitrary_types_allowed=True)
_ACTIVE_PLUGIN_HANDLES: ContextVar[dict[int, DifyPluginRuntimeHandles]] = ContextVar(
"dify_agent_active_plugin_handles",
default={},
)
@dataclass(slots=True)
class DifyPluginLayer(PlainLayer[NoLayerDeps, DifyPluginLayerConfig, EmptyRuntimeState, DifyPluginRuntimeHandles]):
"""Layer that owns plugin daemon connection state for one active session."""
type_id = "dify.plugin"
config: DifyPluginLayerConfig
daemon_url: str
daemon_api_key: str
timeout: float | httpx.Timeout | None = 600.0
@classmethod
@override
def from_config(cls, config: DifyPluginLayerConfig) -> Self:
"""Reject construction without server-injected daemon settings."""
del config
raise TypeError("DifyPluginLayer requires server-side daemon settings and must use a registry factory.")
@classmethod
def from_config_with_settings(
cls,
config: DifyPluginLayerConfig,
*,
daemon_url: str,
daemon_api_key: str,
timeout: float | httpx.Timeout | None,
) -> Self:
"""Create a plugin layer from public config plus server-only daemon settings."""
return cls(config=config, daemon_url=daemon_url, daemon_api_key=daemon_api_key, timeout=timeout)
@override
def enter(self, control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles]):
"""Enter the layer and expose active handles through task-local context."""
return self._enter_with_active_handles(control)
@asynccontextmanager
async def _enter_with_active_handles(
self,
control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles],
) -> AsyncIterator[None]:
async with self.lifecycle_enter(control):
token = self._set_active_handles(control.runtime_handles)
try:
yield
finally:
_ACTIVE_PLUGIN_HANDLES.reset(token)
def get_provider(self, *, plugin_provider: str) -> DifyPluginDaemonProvider:
"""Return a provider backed by this layer's active HTTP client.
Raises:
RuntimeError: if called outside an active compositor context for this
layer, or after its runtime handles have been closed.
"""
handles = _ACTIVE_PLUGIN_HANDLES.get().get(id(self))
if handles is None or handles.http_client is None:
raise RuntimeError("DifyPluginLayer.get_provider() requires an active compositor context.")
return DifyPluginDaemonProvider(
tenant_id=self.config.tenant_id,
plugin_id=self.config.plugin_id,
plugin_provider=plugin_provider,
plugin_daemon_url=self.daemon_url,
plugin_daemon_api_key=self.daemon_api_key,
user_id=self.config.user_id,
timeout=self.timeout,
http_client=handles.http_client,
)
@override
async def on_context_create(
self,
control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles],
) -> None:
await self._open_http_client(control)
@override
async def on_context_resume(
self,
control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles],
) -> None:
await self._open_http_client(control)
@override
async def on_context_suspend(
self,
control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles],
) -> None:
await self._close_http_client(control)
@override
async def on_context_delete(
self,
control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles],
) -> None:
await self._close_http_client(control)
async def _open_http_client(self, control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles]) -> None:
if control.runtime_handles.http_client is None or control.runtime_handles.http_client.is_closed:
control.runtime_handles.http_client = httpx.AsyncClient(timeout=self.timeout, trust_env=False)
async def _close_http_client(self, control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles]) -> None:
client = control.runtime_handles.http_client
control.runtime_handles.http_client = None
if client is not None:
await client.aclose()
def _set_active_handles(self, handles: DifyPluginRuntimeHandles) -> Token[dict[int, DifyPluginRuntimeHandles]]:
active_handles = dict(_ACTIVE_PLUGIN_HANDLES.get())
active_handles[id(self)] = handles
return cast(Token[dict[int, DifyPluginRuntimeHandles]], _ACTIVE_PLUGIN_HANDLES.set(active_handles))
__all__ = ["DifyPluginLayer", "DifyPluginRuntimeHandles"]

View File

@ -1,8 +1,8 @@
"""Public protocol exports shared by the Dify Agent server and clients."""
from .schemas import (
DIFY_AGENT_MODEL_LAYER_ID,
RUN_EVENT_ADAPTER,
AgentProfileConfig,
BaseRunEvent,
CreateRunRequest,
CreateRunResponse,
@ -22,10 +22,10 @@ from .schemas import (
)
__all__ = [
"AgentProfileConfig",
"BaseRunEvent",
"CreateRunRequest",
"CreateRunResponse",
"DIFY_AGENT_MODEL_LAYER_ID",
"EmptyRunEventData",
"PydanticAIStreamRunEvent",
"RUN_EVENT_ADAPTER",

View File

@ -8,14 +8,16 @@ Redis stream ids (or in-memory equivalents in tests) are the public cursors used
by polling and SSE replay. Event envelopes keep the public
``id``/``run_id``/``type``/``data``/``created_at`` shape, while each ``type`` has
a typed ``data`` model so OpenAPI, Redis replay, and clients parse the same
payload contract. Successful runs publish the final JSON-safe agent output and
the resumable Agenton session snapshot together on the terminal
payload contract. Model/provider selection is part of the submitted Agenton
layer graph, not a top-level run field; the runtime reads the model layer named
by ``DIFY_AGENT_MODEL_LAYER_ID``. Successful runs publish the final JSON-safe
agent output and the resumable Agenton session snapshot together on the terminal
``run_succeeded`` event so consumers can treat terminal events as complete run
summaries.
"""
from datetime import datetime, timezone
from typing import Annotated, ClassVar, Literal, TypeAlias
from typing import Annotated, ClassVar, Final, Literal, TypeAlias
from pydantic import BaseModel, ConfigDict, Field, JsonValue, TypeAdapter
from pydantic_ai.messages import AgentStreamEvent
@ -23,6 +25,7 @@ from pydantic_ai.messages import AgentStreamEvent
from agenton.compositor import CompositorConfig, CompositorSessionSnapshot
DIFY_AGENT_MODEL_LAYER_ID: Final[str] = "llm"
RunStatus = Literal["running", "succeeded", "failed"]
RunEventType = Literal[
"run_started",
@ -37,25 +40,15 @@ def utc_now() -> datetime:
return datetime.now(timezone.utc)
class AgentProfileConfig(BaseModel):
"""Minimal model profile for the MVP runner.
``test`` uses pydantic-ai's ``TestModel`` and is credential-free. Other
profiles can be added behind this schema without changing run/event storage.
"""
provider: Literal["test"] = "test"
output_text: str = "Hello from the Dify Agent test model."
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
class CreateRunRequest(BaseModel):
"""Request body for creating one async agent run."""
"""Request body for creating one async agent run.
Model/provider configuration must be supplied through the compositor layer
named by ``DIFY_AGENT_MODEL_LAYER_ID``.
"""
compositor: CompositorConfig
session_snapshot: CompositorSessionSnapshot | None = None
agent_profile: AgentProfileConfig = Field(default_factory=AgentProfileConfig)
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
@ -164,10 +157,10 @@ class RunEventsResponse(BaseModel):
__all__ = [
"AgentProfileConfig",
"BaseRunEvent",
"CreateRunRequest",
"CreateRunResponse",
"DIFY_AGENT_MODEL_LAYER_ID",
"EmptyRunEventData",
"PydanticAIStreamRunEvent",
"RUN_EVENT_ADAPTER",

View File

@ -1,41 +1,36 @@
"""Pydantic AI agent construction for runtime profiles.
"""Pydantic AI agent construction for models supplied by Agenton layers.
The initial server exposes only a credential-free ``test`` profile. The factory
keeps model selection out of ``AgentRunRunner`` so production model profiles can
be added without changing storage or HTTP contracts. Agents are returned through
an ``object`` output boundary because the runner serializes final output to the
public JSON-safe event payload instead of assuming text-only results.
The run request carries model/provider selection in the layer graph. This helper
keeps Agent construction details out of ``AgentRunRunner`` while accepting an
already resolved Pydantic AI model from the configured model layer.
"""
from collections.abc import Sequence
from typing import Callable, cast
from typing import Any, Callable, cast
from pydantic_ai import Agent
from pydantic_ai.messages import UserContent
from pydantic_ai.models.test import TestModel
from pydantic_ai.models import Model
from agenton.layers.types import PydanticAIPrompt, PydanticAITool
from dify_agent.protocol.schemas import AgentProfileConfig
def create_agent(
profile: AgentProfileConfig,
model: Model[Any],
*,
system_prompts: Sequence[PydanticAIPrompt[object]],
tools: Sequence[PydanticAITool[object]],
) -> Agent[None, object]:
"""Create the pydantic-ai agent for one run."""
if profile.provider == "test":
return cast(
Agent[None, object],
Agent[None, str](
TestModel(custom_output_text=profile.output_text),
output_type=str,
system_prompt=materialize_static_system_prompts(system_prompts),
tools=tools,
),
)
raise ValueError(f"Unsupported agent profile provider: {profile.provider}")
return cast(
Agent[None, object],
Agent[None, str](
model,
output_type=str,
system_prompt=materialize_static_system_prompts(system_prompts),
tools=tools,
),
)
def materialize_static_system_prompts(system_prompts: Sequence[PydanticAIPrompt[object]]) -> list[str]:

View File

@ -1,29 +1,51 @@
"""Safe Agenton compositor construction for API-submitted configs.
Only explicitly registered layer types are constructible here. The MVP registry
contains ``PromptLayer`` so callers can provide system/user prompt fragments while
the runtime preserves hooks for richer profiles later.
Only explicitly registered layer types are constructible here. The default
registry contains prompt layers plus Dify plugin LLM layers. Public DTOs provide
tenant/plugin/model data, while server-only plugin daemon settings are injected
through the registry factory for ``DifyPluginLayer``.
"""
from typing import cast
import httpx
from pydantic_ai.messages import UserContent
from agenton.compositor import Compositor, CompositorConfig, LayerRegistry
from agenton.layers.types import AllPromptTypes, AllToolTypes, AllUserPromptTypes, PydanticAIPrompt, PydanticAITool
from agenton_collections.layers.plain.basic import PromptLayer
from agenton_collections.transformers.pydantic_ai import PYDANTIC_AI_TRANSFORMERS
from dify_agent.layers.dify_plugin.configs import DifyPluginLayerConfig
from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer
from dify_agent.layers.dify_plugin.plugin_layer import DifyPluginLayer
def create_default_layer_registry() -> LayerRegistry:
def create_default_layer_registry(
*,
plugin_daemon_url: str = "http://localhost:5002",
plugin_daemon_api_key: str = "",
plugin_daemon_timeout: float | httpx.Timeout | None = 600.0,
) -> LayerRegistry:
"""Return the server registry of safe config-constructible layers."""
registry = LayerRegistry()
registry.register_layer(PromptLayer)
registry.register_layer(
DifyPluginLayer,
factory=lambda config: DifyPluginLayer.from_config_with_settings(
DifyPluginLayerConfig.model_validate(config),
daemon_url=plugin_daemon_url,
daemon_api_key=plugin_daemon_api_key,
timeout=plugin_daemon_timeout,
),
)
registry.register_layer(DifyPluginLLMLayer)
return registry
def build_pydantic_ai_compositor(
config: CompositorConfig,
*,
registry: LayerRegistry | None = None,
) -> Compositor[
PydanticAIPrompt[object],
PydanticAITool[object],
@ -44,7 +66,7 @@ def build_pydantic_ai_compositor(
],
Compositor.from_config(
config,
registry=create_default_layer_registry(),
registry=registry or create_default_layer_registry(),
**PYDANTIC_AI_TRANSFORMERS, # pyright: ignore[reportArgumentType]
),
)
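A minimal sketch of how the server wires this together, following ``create_app`` and the scheduler in this commit (placeholder daemon settings):

```python
from agenton.compositor import CompositorConfig, LayerNodeConfig
from dify_agent.runtime.compositor_factory import (
    build_pydantic_ai_compositor,
    create_default_layer_registry,
)

# Server-only daemon settings are bound once into the registry; API-submitted
# compositor configs are then resolved against that registry.
registry = create_default_layer_registry(
    plugin_daemon_url="http://localhost:5002",
    plugin_daemon_api_key="daemon-secret",
    plugin_daemon_timeout=600.0,
)
config = CompositorConfig(
    layers=[LayerNodeConfig(name="prompt", type="plain.prompt", config={"user": "hello"})]
)
compositor = build_pydantic_ai_compositor(config, registry=registry)
```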

View File

@ -12,8 +12,9 @@ import logging
from collections.abc import Callable
from typing import Protocol
from agenton.compositor import LayerRegistry
from dify_agent.protocol.schemas import CreateRunRequest
from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor
from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor, create_default_layer_registry
from dify_agent.runtime.event_sink import RunEventSink, emit_run_failed
from dify_agent.runtime.runner import AgentRunRunner
from dify_agent.runtime.user_prompt_validation import EMPTY_USER_PROMPTS_ERROR, has_non_blank_user_prompt
@ -29,7 +30,7 @@ class SchedulerStoppingError(RuntimeError):
class RunStore(RunEventSink, Protocol):
"""Persistence boundary needed by the scheduler."""
async def create_run(self, request: CreateRunRequest) -> RunRecord:
async def create_run(self) -> RunRecord:
"""Persist a new run record and return it with status ``running``."""
...
@ -42,7 +43,7 @@ class RunnableRun(Protocol):
...
type RunRunnerFactory = Callable[[RunRecord], RunnableRun]
type RunRunnerFactory = Callable[[RunRecord, CreateRunRequest], RunnableRun]
class RunScheduler:
@ -61,6 +62,7 @@ class RunScheduler:
active_tasks: dict[str, asyncio.Task[None]]
stopping: bool
runner_factory: RunRunnerFactory
layer_registry: LayerRegistry
_lifecycle_lock: asyncio.Lock
def __init__(
@ -68,12 +70,14 @@ class RunScheduler:
*,
store: RunStore,
shutdown_grace_seconds: float = 30,
layer_registry: LayerRegistry | None = None,
runner_factory: RunRunnerFactory | None = None,
) -> None:
self.store = store
self.shutdown_grace_seconds = shutdown_grace_seconds
self.active_tasks = {}
self.stopping = False
self.layer_registry = layer_registry or create_default_layer_registry()
self.runner_factory = runner_factory or self._default_runner_factory
self._lifecycle_lock = asyncio.Lock()
@ -83,15 +87,15 @@ class RunScheduler:
The returned record is already ``running``. The background task is removed
from ``active_tasks`` when it finishes, regardless of success or failure.
"""
compositor = build_pydantic_ai_compositor(request.compositor)
compositor = build_pydantic_ai_compositor(request.compositor, registry=self.layer_registry)
if not has_non_blank_user_prompt(compositor.user_prompts):
raise ValueError(EMPTY_USER_PROMPTS_ERROR)
async with self._lifecycle_lock:
if self.stopping:
raise SchedulerStoppingError("run scheduler is shutting down")
record = await self.store.create_run(request)
task = asyncio.create_task(self._run_record(record), name=f"dify-agent-run-{record.run_id}")
record = await self.store.create_run()
task = asyncio.create_task(self._run_record(record, request), name=f"dify-agent-run-{record.run_id}")
self.active_tasks[record.run_id] = task
task.add_done_callback(lambda _task, run_id=record.run_id: self.active_tasks.pop(run_id, None))
return record
@ -115,18 +119,23 @@ class RunScheduler:
for run_id in pending_run_ids:
await self._mark_cancelled_run_failed(run_id)
async def _run_record(self, record: RunRecord) -> None:
async def _run_record(self, record: RunRecord, request: CreateRunRequest) -> None:
"""Execute a stored run and log failures already reflected in events."""
try:
await self.runner_factory(record).run()
await self.runner_factory(record, request).run()
except asyncio.CancelledError:
raise
except Exception:
logger.exception("scheduled run failed", extra={"run_id": record.run_id})
def _default_runner_factory(self, record: RunRecord) -> RunnableRun:
def _default_runner_factory(self, record: RunRecord, request: CreateRunRequest) -> RunnableRun:
"""Create the production runner for a stored run record."""
return AgentRunRunner(sink=self.store, request=record.request, run_id=record.run_id)
return AgentRunRunner(
sink=self.store,
request=request,
run_id=record.run_id,
layer_registry=self.layer_registry,
)
async def _mark_cancelled_run_failed(self, run_id: str) -> None:
"""Best-effort failure event/status for shutdown-cancelled runs."""

View File

@ -3,9 +3,10 @@
The runner is storage-agnostic: it builds an Agenton compositor, enters or
resumes its session, runs pydantic-ai with ``compositor.user_prompts`` as the user
input, emits stream events, suspends the session on exit, snapshots it, and then
publishes a terminal success or failure event. Successful terminal events contain
both the JSON-safe final output and session snapshot; there are no separate output
or snapshot events to correlate.
publishes a terminal success or failure event. The Pydantic AI model is resolved
from the active Agenton layer named by ``DIFY_AGENT_MODEL_LAYER_ID``. Successful
terminal events contain both the JSON-safe final output and session snapshot;
there are no separate output or snapshot events to correlate.
"""
from collections.abc import AsyncIterable
@ -14,10 +15,11 @@ from typing import cast
from pydantic import JsonValue, TypeAdapter
from pydantic_ai.messages import AgentStreamEvent
from agenton.compositor import CompositorSessionSnapshot
from dify_agent.protocol.schemas import CreateRunRequest
from agenton.compositor import CompositorSessionSnapshot, LayerRegistry
from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer
from dify_agent.protocol.schemas import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest
from dify_agent.runtime.agent_factory import create_agent, normalize_user_input
from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor
from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor, create_default_layer_registry
from dify_agent.runtime.event_sink import (
RunEventSink,
emit_pydantic_ai_event,
@ -42,11 +44,20 @@ class AgentRunRunner:
request: CreateRunRequest
run_id: str
layer_registry: LayerRegistry
def __init__(self, *, sink: RunEventSink, request: CreateRunRequest, run_id: str) -> None:
def __init__(
self,
*,
sink: RunEventSink,
request: CreateRunRequest,
run_id: str,
layer_registry: LayerRegistry | None = None,
) -> None:
self.sink = sink
self.request = request
self.run_id = run_id
self.layer_registry = layer_registry or create_default_layer_registry()
async def run(self) -> None:
"""Execute the run and emit the documented event sequence."""
@ -71,7 +82,7 @@ class AgentRunRunner:
async def _run_agent(self) -> tuple[JsonValue, CompositorSessionSnapshot]:
"""Run pydantic-ai inside an entered Agenton session."""
compositor = build_pydantic_ai_compositor(self.request.compositor)
compositor = build_pydantic_ai_compositor(self.request.compositor, registry=self.layer_registry)
session = (
compositor.session_from_snapshot(self.request.session_snapshot)
if self.request.session_snapshot is not None
@ -87,11 +98,12 @@ class AgentRunRunner:
async for event in events:
_ = await emit_pydantic_ai_event(self.sink, run_id=self.run_id, data=event)
agent = create_agent(
self.request.agent_profile,
system_prompts=compositor.prompts,
tools=compositor.tools,
)
try:
model = compositor.get_layer(DIFY_AGENT_MODEL_LAYER_ID, DifyPluginLLMLayer).get_model()
except (KeyError, TypeError, RuntimeError) as exc:
raise AgentRunValidationError(str(exc)) from exc
agent = create_agent(model, system_prompts=compositor.prompts, tools=compositor.tools)
result = await agent.run(normalize_user_input(user_prompts), event_stream_handler=handle_events)
return _serialize_agent_output(result.output), compositor.snapshot_session(session)

View File

@ -13,6 +13,8 @@ from contextlib import asynccontextmanager
from fastapi import FastAPI
from redis.asyncio import Redis
from agenton.compositor import LayerRegistry
from dify_agent.runtime.compositor_factory import create_default_layer_registry
from dify_agent.runtime.run_scheduler import RunScheduler
from dify_agent.server.routes.runs import create_runs_router
from dify_agent.server.settings import ServerSettings
@ -22,7 +24,12 @@ from dify_agent.storage.redis_run_store import RedisRunStore
def create_app(settings: ServerSettings | None = None) -> FastAPI:
"""Build the FastAPI app with one shared Redis store and local scheduler."""
resolved_settings = settings or ServerSettings()
state: dict[str, RedisRunStore | RunScheduler] = {}
layer_registry = create_default_layer_registry(
plugin_daemon_url=resolved_settings.plugin_daemon_url,
plugin_daemon_api_key=resolved_settings.plugin_daemon_api_key,
plugin_daemon_timeout=resolved_settings.plugin_daemon_timeout,
)
state: dict[str, RedisRunStore | RunScheduler | LayerRegistry] = {"layer_registry": layer_registry}
@asynccontextmanager
async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
@ -32,7 +39,11 @@ def create_app(settings: ServerSettings | None = None) -> FastAPI:
prefix=resolved_settings.redis_prefix,
run_retention_seconds=resolved_settings.run_retention_seconds,
)
scheduler = RunScheduler(store=store, shutdown_grace_seconds=resolved_settings.shutdown_grace_seconds)
scheduler = RunScheduler(
store=store,
shutdown_grace_seconds=resolved_settings.shutdown_grace_seconds,
layer_registry=layer_registry,
)
state["store"] = store
state["scheduler"] = scheduler
try:
@ -49,7 +60,10 @@ def create_app(settings: ServerSettings | None = None) -> FastAPI:
def get_scheduler() -> RunScheduler:
return state["scheduler"] # pyright: ignore[reportReturnType]
app.include_router(create_runs_router(get_store, get_scheduler))
def get_layer_registry() -> LayerRegistry:
return state["layer_registry"] # pyright: ignore[reportReturnType]
app.include_router(create_runs_router(get_store, get_scheduler, get_layer_registry))
return app

View File

@ -13,17 +13,23 @@ from typing import Annotated
from fastapi import APIRouter, Depends, Header, HTTPException, Query
from fastapi.responses import StreamingResponse
from agenton.compositor import LayerRegistry
from dify_agent.protocol.schemas import CreateRunRequest, CreateRunResponse, RunEventsResponse, RunStatusResponse
from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor
from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor, create_default_layer_registry
from dify_agent.runtime.run_scheduler import RunScheduler, SchedulerStoppingError
from dify_agent.runtime.user_prompt_validation import EMPTY_USER_PROMPTS_ERROR, has_non_blank_user_prompt
from dify_agent.server.sse import sse_event_stream
from dify_agent.storage.redis_run_store import RedisRunStore, RunNotFoundError
def create_runs_router(get_store: Callable[[], RedisRunStore], get_scheduler: Callable[[], RunScheduler]) -> APIRouter:
def create_runs_router(
get_store: Callable[[], RedisRunStore],
get_scheduler: Callable[[], RunScheduler],
get_layer_registry: Callable[[], LayerRegistry] | None = None,
) -> APIRouter:
"""Create routes bound to the application's store dependency provider."""
router = APIRouter(prefix="/runs", tags=["runs"])
resolved_get_layer_registry = get_layer_registry or create_default_layer_registry
async def store_dep() -> RedisRunStore:
return get_store()
@ -37,7 +43,10 @@ def create_runs_router(get_store: Callable[[], RedisRunStore], get_scheduler: Ca
scheduler: Annotated[RunScheduler, Depends(scheduler_dep)],
) -> CreateRunResponse:
try:
compositor = build_pydantic_ai_compositor(request.compositor)
compositor = build_pydantic_ai_compositor(
request.compositor,
registry=resolved_get_layer_registry(),
)
except Exception as exc:
raise HTTPException(status_code=422, detail=str(exc)) from exc
if not has_non_blank_user_prompt(compositor.user_prompts):

View File

@ -23,14 +23,13 @@ def new_run_id() -> str:
class RunRecord(BaseModel):
"""Internal representation persisted for status reads.
The embedded request and status use protocol types so persisted records stay
JSON-compatible with the public API, but callers must import those DTOs from
``dify_agent.protocol.schemas`` rather than this server-only module.
Only status metadata is persisted. Create-run requests can contain model
credentials in layer config and must remain in scheduler memory rather than
being written to Redis.
"""
run_id: str
status: _protocol_schemas.RunStatus
request: _protocol_schemas.CreateRunRequest
created_at: datetime = Field(default_factory=_protocol_schemas.utc_now)
updated_at: datetime = Field(default_factory=_protocol_schemas.utc_now)
error: str | None = None

View File

@ -9,12 +9,15 @@ DEFAULT_RUN_RETENTION_SECONDS = 3 * 24 * 60 * 60
class ServerSettings(BaseSettings):
"""Environment-backed settings for Redis persistence, retention, and local scheduling."""
"""Environment-backed settings for Redis, scheduling, and plugin daemon access."""
redis_url: str = "redis://localhost:6379/0"
redis_prefix: str = "dify-agent"
shutdown_grace_seconds: float = 30
run_retention_seconds: int = Field(default=DEFAULT_RUN_RETENTION_SECONDS, ge=1)
plugin_daemon_url: str = "http://localhost:5002"
plugin_daemon_api_key: str = ""
plugin_daemon_timeout: float | None = 600.0
model_config: ClassVar[SettingsConfigDict] = SettingsConfigDict(
env_prefix="DIFY_AGENT_",
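A hedged sketch of supplying the new daemon settings; the environment variable names in the comment are inferred from the ``DIFY_AGENT_`` prefix convention already used by ``DIFY_AGENT_RUN_RETENTION_SECONDS``:

```python
from dify_agent.server.settings import ServerSettings

# Likely environment equivalents: DIFY_AGENT_PLUGIN_DAEMON_URL,
# DIFY_AGENT_PLUGIN_DAEMON_API_KEY, DIFY_AGENT_PLUGIN_DAEMON_TIMEOUT.
settings = ServerSettings(
    plugin_daemon_url="http://localhost:5002",
    plugin_daemon_api_key="daemon-secret",
    plugin_daemon_timeout=600.0,
)
```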

View File

@ -1,10 +1,12 @@
"""Redis-backed run records and per-run event streams.
The store writes run records as JSON strings and events as Redis streams. HTTP
event cursors are Redis stream ids; ``0-0`` means replay from the beginning for
polling and SSE. Records and streams share one retention window that is refreshed
when status or event data is written. Execution is scheduled in-process by
``dify_agent.runtime.run_scheduler``; Redis is not a job queue.
The store writes status-only run records as JSON strings and events as Redis
streams. HTTP event cursors are Redis stream ids; ``0-0`` means replay from the
beginning for polling and SSE. Records and streams share one retention window
that is refreshed when status or event data is written. Execution is scheduled
in-process by ``dify_agent.runtime.run_scheduler``; Redis is not a job queue, and
create-run payloads are never persisted because layer config may include model
credentials.
"""
from collections.abc import AsyncIterator
@ -12,14 +14,7 @@ from typing import cast
from redis.asyncio import Redis
from dify_agent.protocol.schemas import (
CreateRunRequest,
RUN_EVENT_ADAPTER,
RunEvent,
RunEventsResponse,
RunStatus,
utc_now,
)
from dify_agent.protocol.schemas import RUN_EVENT_ADAPTER, RunEvent, RunEventsResponse, RunStatus, utc_now
from dify_agent.runtime.event_sink import RunEventSink
from dify_agent.server.schemas import RunRecord, new_run_id
from dify_agent.server.settings import DEFAULT_RUN_RETENTION_SECONDS
@ -55,10 +50,10 @@ class RedisRunStore(RunEventSink):
self.prefix = prefix
self.run_retention_seconds = run_retention_seconds
async def create_run(self, request: CreateRunRequest) -> RunRecord:
"""Persist a running run record without enqueueing external work."""
async def create_run(self) -> RunRecord:
"""Persist a running run record without storing the create request."""
run_id = new_run_id()
record = RunRecord(run_id=run_id, status="running", request=request)
record = RunRecord(run_id=run_id, status="running")
await self.redis.set(
run_record_key(self.prefix, run_id),
record.model_dump_json(),

View File

@ -91,6 +91,42 @@ def test_layer_node_config_accepts_config_dto_and_serializes_fields() -> None:
assert [prompt.value for prompt in compositor.user_prompts] == ["ask politely"]
def test_registry_factory_constructs_layer_with_injected_dependencies() -> None:
registry = LayerRegistry()
registry.register_layer(
PromptLayer,
factory=lambda config: PromptLayer(prefix=PromptLayerConfig.model_validate(config).prefix),
)
compositor = CompositorBuilder(registry).add_config(
{"layers": [{"name": "prompt", "type": "plain.prompt", "config": {"prefix": "factory"}}]}
).build()
assert [prompt.value for prompt in compositor.prompts] == ["factory"]
def test_compositor_get_layer_returns_named_layer_and_validates_type() -> None:
layer = ObjectLayer("value")
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("obj", layer)]))
assert compositor.get_layer("obj") is layer
assert compositor.get_layer("obj", ObjectLayer) is layer
try:
compositor.get_layer("missing")
except KeyError as e:
assert str(e) == '"Layer \'missing\' is not defined in this compositor."'
else:
raise AssertionError("Expected KeyError.")
try:
compositor.get_layer("obj", PromptLayer)
except TypeError as e:
assert str(e) == "Layer 'obj' must be PromptLayer, got ObjectLayer."
else:
raise AssertionError("Expected TypeError.")
class ObjectConsumerDeps(LayerDeps):
obj: ObjectLayer[str] # pyright: ignore[reportUninitializedInstanceVariable]

View File

@ -40,6 +40,7 @@ class DifyLLMAdapterModelTests(unittest.IsolatedAsyncioTestCase):
self,
*,
user_id: str | None = None,
http_client: httpx.AsyncClient | None = None,
) -> DifyPluginDaemonProvider:
return DifyPluginDaemonProvider(
tenant_id="tenant-1",
@ -48,6 +49,7 @@ class DifyLLMAdapterModelTests(unittest.IsolatedAsyncioTestCase):
plugin_daemon_url="http://plugin-daemon",
plugin_daemon_api_key="daemon-secret",
user_id=user_id,
http_client=http_client,
)
@asynccontextmanager
@ -172,6 +174,17 @@ class DifyLLMAdapterModelTests(unittest.IsolatedAsyncioTestCase):
self.assertEqual(response.parts[0].part_kind, "text")
self.assertEqual(cast(TextPart, response.parts[0]).content, "adapter response")
async def test_provider_does_not_close_external_http_client(self) -> None:
http_client = httpx.AsyncClient()
provider = self.make_provider(http_client=http_client)
self.assertIs(provider.client.http_client, http_client)
async with provider:
pass
self.assertFalse(http_client.is_closed)
await http_client.aclose()
async def test_request_returns_a_response(self) -> None:
def handler(_request: httpx.Request) -> httpx.Response:
return build_stream_response(

View File

@ -34,8 +34,7 @@ def _create_run_payload() -> dict[str, object]:
"compositor": {
"schema_version": 1,
"layers": [{"name": "prompt", "type": "plain.prompt", "config": {"user": "hello"}}],
},
"agent_profile": {"provider": "test", "output_text": "done"},
}
}
@ -80,7 +79,7 @@ def test_sync_methods_parse_protocol_dtos_and_send_create_request_dto() -> None:
compositor = cast(dict[str, object], payload["compositor"])
layers = cast(list[dict[str, object]], compositor["layers"])
assert layers[0]["config"] == {"user": "hello"}
assert payload["agent_profile"] == {"provider": "test", "output_text": "done"}
assert "agent_profile" not in payload
return httpx.Response(202, json={"run_id": "run-1", "status": "running"})
if request.method == "GET" and request.url.path == "/runs/run-1":
return httpx.Response(200, json=_run_status_json("running"))

View File

@ -0,0 +1,56 @@
import pytest
from pydantic import ValidationError
import dify_agent.layers.dify_plugin as dify_plugin_exports
from dify_agent.layers.dify_plugin import (
DifyPluginCredentialValue,
DifyPluginLLMLayerConfig,
DifyPluginLayerConfig,
)
def test_dify_plugin_package_exports_client_safe_config_symbols_only() -> None:
assert dify_plugin_exports.__all__ == [
"DifyPluginCredentialValue",
"DifyPluginLLMLayerConfig",
"DifyPluginLayerConfig",
]
assert not hasattr(dify_plugin_exports, "DifyPluginLayer")
assert not hasattr(dify_plugin_exports, "DifyPluginLLMLayer")
def test_dify_plugin_layer_config_forbids_runtime_settings() -> None:
config = DifyPluginLayerConfig(tenant_id="tenant-1", plugin_id="plugin-1", user_id="user-1")
assert config.tenant_id == "tenant-1"
assert config.plugin_id == "plugin-1"
assert config.user_id == "user-1"
with pytest.raises(ValidationError):
_ = DifyPluginLayerConfig.model_validate(
{
"tenant_id": "tenant-1",
"plugin_id": "plugin-1",
"daemon_url": "http://daemon",
}
)
def test_dify_plugin_llm_config_accepts_scalar_credentials_and_model_settings() -> None:
credential: DifyPluginCredentialValue = "secret"
config = DifyPluginLLMLayerConfig(
provider="openai",
model="gpt-4o-mini",
credentials={"api_key": credential, "enabled": True, "retries": 2, "ratio": 0.5, "empty": None},
model_settings={"temperature": 0.2, "max_tokens": 64},
)
assert config.credentials == {"api_key": "secret", "enabled": True, "retries": 2, "ratio": 0.5, "empty": None}
assert config.model_settings == {"temperature": 0.2, "max_tokens": 64}
with pytest.raises(ValidationError):
_ = DifyPluginLLMLayerConfig.model_validate(
{
"provider": "openai",
"model": "gpt-4o-mini",
"credentials": {"nested": {"not": "allowed"}},
}
)

View File

@ -0,0 +1,82 @@
import asyncio
from collections import OrderedDict
from typing import cast
from agenton.compositor import Compositor
from agenton.layers import PlainPromptType, PlainToolType
from dify_agent.adapters.llm import DifyLLMAdapterModel
from dify_agent.layers.dify_plugin.configs import DifyPluginLLMLayerConfig, DifyPluginLayerConfig
from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer
from dify_agent.layers.dify_plugin.plugin_layer import DifyPluginLayer, DifyPluginRuntimeHandles
def _plugin_layer() -> DifyPluginLayer:
return DifyPluginLayer.from_config_with_settings(
DifyPluginLayerConfig(tenant_id="tenant-1", plugin_id="langgenius/openai", user_id="user-1"),
daemon_url="http://plugin-daemon",
daemon_api_key="daemon-secret",
timeout=12,
)
def _llm_layer() -> DifyPluginLLMLayer:
return DifyPluginLLMLayer.from_config(
DifyPluginLLMLayerConfig(
provider="openai",
model="demo-model",
credentials={"api_key": "secret"},
model_settings={"temperature": 0.2},
)
)
def test_dify_plugin_layer_get_provider_requires_active_context_and_uses_runtime_client() -> None:
async def scenario() -> None:
plugin = _plugin_layer()
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("plugin", plugin)]))
try:
_ = plugin.get_provider(plugin_provider="openai")
except RuntimeError as e:
assert str(e) == "DifyPluginLayer.get_provider() requires an active compositor context."
else:
raise AssertionError("Expected RuntimeError.")
async with compositor.enter() as session:
handles = cast(DifyPluginRuntimeHandles, cast(object, session.layer("plugin").runtime_handles))
client = handles.http_client
assert client is not None
provider = plugin.get_provider(plugin_provider="openai")
assert provider.client.http_client is client
assert provider.client.tenant_id == "tenant-1"
assert provider.client.plugin_id == "langgenius/openai"
assert provider.client.provider == "openai"
assert provider.client.user_id == "user-1"
async with provider:
pass
assert client.is_closed is False
assert client.is_closed is True
asyncio.run(scenario())
def test_dify_plugin_llm_layer_builds_adapter_model_from_dependency_provider() -> None:
async def scenario() -> None:
plugin = _plugin_layer()
llm = _llm_layer()
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("plugin", plugin), ("llm", llm)]),
deps_name_mapping={"llm": {"plugin": "plugin"}},
)
async with compositor.enter() as session:
model = llm.get_model()
assert isinstance(model, DifyLLMAdapterModel)
assert model.model_name == "demo-model"
assert model.credentials == {"api_key": "secret"}
assert model.provider.name == "DifyPlugin/openai"
handles = cast(DifyPluginRuntimeHandles, cast(object, session.layer("plugin").runtime_handles))
assert model.provider.client.http_client is handles.http_client
asyncio.run(scenario())

View File

@ -3,8 +3,10 @@ from pydantic import ValidationError
from pydantic_ai.messages import FinalResultEvent
from agenton.compositor import CompositorSessionSnapshot
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID
from dify_agent.protocol.schemas import (
RUN_EVENT_ADAPTER,
CreateRunRequest,
PydanticAIStreamRunEvent,
RunFailedEvent,
RunFailedEventData,
@ -49,6 +51,17 @@ def test_pydantic_ai_event_data_uses_agent_stream_event_model() -> None:
assert isinstance(event.data, FinalResultEvent)
def test_create_run_request_rejects_agent_profile_and_model_layer_id_is_public() -> None:
assert DIFY_AGENT_MODEL_LAYER_ID == "llm"
with pytest.raises(ValidationError):
_ = CreateRunRequest.model_validate(
{
"compositor": {"layers": []},
"agent_profile": {"provider": "test", "output_text": "done"},
}
)
@pytest.mark.parametrize("event_type", ["agent_output", "session_snapshot"])
def test_removed_non_terminal_payload_events_are_rejected(event_type: str) -> None:
with pytest.raises(ValidationError):

View File

@ -31,9 +31,9 @@ class FakeStore:
self.statuses = {}
self.errors = {}
async def create_run(self, request: CreateRunRequest) -> RunRecord:
async def create_run(self) -> RunRecord:
run_id = f"run-{len(self.records) + 1}"
record = RunRecord(run_id=run_id, status="running", request=request)
record = RunRecord(run_id=run_id, status="running")
self.records[run_id] = record
self.statuses[run_id] = "running"
return record
@ -57,10 +57,10 @@ class SlowCreateStore(FakeStore):
self.create_started = create_started
self.release_create = release_create
async def create_run(self, request: CreateRunRequest) -> RunRecord:
async def create_run(self) -> RunRecord:
_ = self.create_started.set()
await self.release_create.wait()
return await super().create_run(request)
return await super().create_run()
class ControlledRunner:
@ -83,7 +83,7 @@ def test_create_run_starts_background_task_and_returns_running() -> None:
release = asyncio.Event()
scheduler = RunScheduler(
store=store,
runner_factory=lambda _record: ControlledRunner(started=started, release=release),
runner_factory=lambda _record, _request: ControlledRunner(started=started, release=release),
)
record = await scheduler.create_run(_request())
@ -106,7 +106,7 @@ def test_shutdown_marks_unfinished_runs_failed_and_appends_event() -> None:
scheduler = RunScheduler(
store=store,
shutdown_grace_seconds=0,
runner_factory=lambda _record: ControlledRunner(started=started, release=asyncio.Event()),
runner_factory=lambda _record, _request: ControlledRunner(started=started, release=asyncio.Event()),
)
record = await scheduler.create_run(_request())
await asyncio.wait_for(started.wait(), timeout=1)
@ -155,7 +155,7 @@ def test_shutdown_waits_for_in_flight_create_to_register_before_cancelling() ->
scheduler = RunScheduler(
store=store,
shutdown_grace_seconds=0,
runner_factory=lambda _record: ControlledRunner(started=runner_started, release=asyncio.Event()),
runner_factory=lambda _record, _request: ControlledRunner(started=runner_started, release=asyncio.Event()),
)
create_task = asyncio.create_task(scheduler.create_run(_request()))

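Both shutdown tests and the happy-path test above now build the scheduler with a two-argument factory. The protocol below captures that inferred shape; it is an assumption drawn from the test lambdas, not the type exported by `dify_agent.runtime.run_scheduler`.

```python
# Inferred factory signature: the scheduler passes both the stored record and
# the originating request, since RunRecord no longer embeds the request itself.
from typing import Protocol

from dify_agent.protocol.schemas import CreateRunRequest
from dify_agent.runtime.runner import AgentRunRunner
from dify_agent.server.schemas import RunRecord


class RunnerFactory(Protocol):
    def __call__(self, record: RunRecord, request: CreateRunRequest) -> AgentRunRunner: ...
```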
View File

@@ -1,26 +1,54 @@
import asyncio
import pytest
from pydantic_ai.models.test import TestModel
from agenton.compositor import CompositorConfig, LayerNodeConfig
from dify_agent.protocol.schemas import AgentProfileConfig, CreateRunRequest, RunSucceededEvent
from agenton_collections.layers.plain import PromptLayerConfig
from dify_agent.layers.dify_plugin.configs import DifyPluginLLMLayerConfig, DifyPluginLayerConfig
from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID
from dify_agent.protocol.schemas import CreateRunRequest, RunSucceededEvent
from dify_agent.runtime.event_sink import InMemoryRunEventSink
from dify_agent.runtime.runner import AgentRunRunner, AgentRunValidationError
def test_runner_emits_terminal_success_and_snapshot() -> None:
request = CreateRunRequest(
def _request(user: str | list[str] = "hello", *, llm_layer_name: str = DIFY_AGENT_MODEL_LAYER_ID) -> CreateRunRequest:
return CreateRunRequest(
compositor=CompositorConfig(
layers=[
LayerNodeConfig(
name="prompt",
type="plain.prompt",
config={"prefix": "system", "user": "hello"},
)
config=PromptLayerConfig(prefix="system", user=user),
),
LayerNodeConfig(
name="plugin",
type="dify.plugin",
config=DifyPluginLayerConfig(tenant_id="tenant-1", plugin_id="langgenius/openai"),
),
LayerNodeConfig(
name=llm_layer_name,
type="dify.plugin.llm",
deps={"plugin": "plugin"},
config=DifyPluginLLMLayerConfig(
provider="openai",
model="demo-model",
credentials={"api_key": "secret"},
),
),
]
),
agent_profile=AgentProfileConfig(output_text="done"),
)
)
def test_runner_emits_terminal_success_and_snapshot(monkeypatch: pytest.MonkeyPatch) -> None:
def fake_get_model(self: DifyPluginLLMLayer):
assert self.config.model == "demo-model"
return TestModel(custom_output_text="done") # pyright: ignore[reportReturnType]
monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model)
request = _request()
sink = InMemoryRunEventSink()
asyncio.run(AgentRunRunner(sink=sink, request=request, run_id="run-1").run())
@@ -34,16 +62,16 @@ def test_runner_emits_terminal_success_and_snapshot() -> None:
terminal = sink.events["run-1"][-1]
assert isinstance(terminal, RunSucceededEvent)
assert terminal.data.output == "done"
assert [layer.name for layer in terminal.data.session_snapshot.layers] == ["prompt"]
assert [layer.name for layer in terminal.data.session_snapshot.layers] == [
"prompt",
"plugin",
DIFY_AGENT_MODEL_LAYER_ID,
]
assert sink.statuses["run-1"] == "succeeded"
def test_runner_fails_empty_user_prompts() -> None:
request = CreateRunRequest(
compositor=CompositorConfig(
layers=[LayerNodeConfig(name="prompt", type="plain.prompt", config={"prefix": "system"})]
)
)
request = _request("")
sink = InMemoryRunEventSink()
with pytest.raises(AgentRunValidationError):
@@ -54,11 +82,7 @@ def test_runner_fails_empty_user_prompts() -> None:
def test_runner_fails_blank_string_user_prompt_list() -> None:
request = CreateRunRequest(
compositor=CompositorConfig(
layers=[LayerNodeConfig(name="prompt", type="plain.prompt", config={"user": ["", " "]})]
)
)
request = _request(["", " "])
sink = InMemoryRunEventSink()
with pytest.raises(AgentRunValidationError):
@@ -66,3 +90,14 @@ def test_runner_fails_blank_string_user_prompt_list() -> None:
assert [event.type for event in sink.events["run-3"]] == ["run_started", "run_failed"]
assert sink.statuses["run-3"] == "failed"
def test_runner_requires_llm_layer_id() -> None:
request = _request(llm_layer_name="not-llm")
sink = InMemoryRunEventSink()
with pytest.raises(AgentRunValidationError, match="llm"):
asyncio.run(AgentRunRunner(sink=sink, request=request, run_id="run-4").run())
assert [event.type for event in sink.events["run-4"]] == ["run_started", "run_failed"]
assert sink.statuses["run-4"] == "failed"

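`test_runner_requires_llm_layer_id` asserts that a compositor without a layer named `DIFY_AGENT_MODEL_LAYER_ID` fails before any model call. The helper below sketches that pre-flight check under the assumption that the runner simply matches layer names; the real validation lives inside `AgentRunRunner` and may differ.

```python
# Illustrative pre-flight check, not the runner's actual code: fail fast when no
# layer is named DIFY_AGENT_MODEL_LAYER_ID ("llm").
from agenton.compositor import CompositorConfig
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID
from dify_agent.runtime.runner import AgentRunValidationError


def require_model_layer(config: CompositorConfig) -> None:
    names = {layer.name for layer in config.layers}
    if DIFY_AGENT_MODEL_LAYER_ID not in names:
        raise AgentRunValidationError(
            f"compositor must define a '{DIFY_AGENT_MODEL_LAYER_ID}' layer"
        )
```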
View File

@@ -2,6 +2,9 @@ import pytest
from fastapi.testclient import TestClient
import dify_agent.server.app as app_module
from agenton.compositor import LayerRegistry
from dify_agent.layers.dify_plugin.configs import DifyPluginLayerConfig
from dify_agent.layers.dify_plugin.plugin_layer import DifyPluginLayer
from dify_agent.server.app import create_app
from dify_agent.server.settings import ServerSettings
from dify_agent.storage.redis_run_store import RedisRunStore
@@ -22,6 +25,7 @@ class FakeRunScheduler:
store: object
shutdown_grace_seconds: float
layer_registry: LayerRegistry
shutdown_called: bool
def __init__(
@@ -29,9 +33,11 @@ class FakeRunScheduler:
*,
store: object,
shutdown_grace_seconds: float,
layer_registry: LayerRegistry,
) -> None:
self.store = store
self.shutdown_grace_seconds = shutdown_grace_seconds
self.layer_registry = layer_registry
self.shutdown_called = False
self.created.append(self)
@@ -50,12 +56,23 @@ def test_create_app_creates_scheduler_and_closes_after_shutdown(monkeypatch: pyt
redis_prefix="test",
shutdown_grace_seconds=5,
run_retention_seconds=7,
plugin_daemon_url="http://plugin-daemon",
plugin_daemon_api_key="daemon-secret",
plugin_daemon_timeout=12,
)
with TestClient(create_app(settings)):
assert len(FakeRunScheduler.created) == 1
scheduler = FakeRunScheduler.created[0]
assert scheduler.shutdown_grace_seconds == 5
assert isinstance(scheduler.layer_registry, LayerRegistry)
descriptor = scheduler.layer_registry.resolve("dify.plugin")
assert descriptor.factory is not None
plugin_layer = descriptor.factory(DifyPluginLayerConfig(tenant_id="tenant-1", plugin_id="plugin-1"))
assert isinstance(plugin_layer, DifyPluginLayer)
assert plugin_layer.daemon_url == "http://plugin-daemon"
assert plugin_layer.daemon_api_key == "daemon-secret"
assert plugin_layer.timeout == 12
store = scheduler.store
assert isinstance(store, RedisRunStore)
assert store.run_retention_seconds == 7

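The assertions above mean `create_app` registers a `dify.plugin` factory that closes over the daemon settings, so request payloads only ever carry tenant and plugin identity. A plausible wiring sketch is shown below; the `DifyPluginLayer` constructor arguments are an assumption based on the attributes the test reads back.

```python
# Hypothetical wiring sketch: bind server-side daemon settings into the layer
# factory so they never appear in client-supplied configs. Constructor kwargs
# are assumed from the attributes asserted in the test above.
from dify_agent.layers.dify_plugin.configs import DifyPluginLayerConfig
from dify_agent.layers.dify_plugin.plugin_layer import DifyPluginLayer
from dify_agent.server.settings import ServerSettings


def make_plugin_factory(settings: ServerSettings):
    def build(config: DifyPluginLayerConfig) -> DifyPluginLayer:
        return DifyPluginLayer(
            config=config,
            daemon_url=settings.plugin_daemon_url,
            daemon_api_key=settings.plugin_daemon_api_key,
            timeout=settings.plugin_daemon_timeout,
        )

    return build
```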
View File

@@ -1,6 +1,5 @@
from fastapi.testclient import TestClient
from dify_agent.protocol.schemas import CreateRunRequest
from dify_agent.runtime.run_scheduler import SchedulerStoppingError
from dify_agent.server.routes.runs import create_runs_router
from dify_agent.server.schemas import RunRecord
@@ -44,7 +43,7 @@ def test_create_run_returns_running_from_scheduler() -> None:
class CapturingScheduler:
async def create_run(self, request: object) -> RunRecord:
del request
return RunRecord(run_id="run-1", status="running", request=_request())
return RunRecord(run_id="run-1", status="running")
app = FastAPI()
app.include_router(
@@ -119,13 +118,3 @@ def test_create_run_does_not_map_infrastructure_failure_to_422() -> None:
)
assert response.status_code == 500
def _request():
from agenton.compositor import CompositorConfig, LayerNodeConfig
return CreateRunRequest(
compositor=CompositorConfig(
layers=[LayerNodeConfig(name="prompt", type="plain.prompt", config={"user": "hello"})]
)
)

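With the `_request()` helper gone, the router tests construct `RunRecord` from `run_id` and `status` alone. The minimal model below mirrors that observable shape; the real schema in `dify_agent.server.schemas` may carry additional fields.

```python
# Shape inferred from the tests only; not the actual RunRecord definition.
from typing import Literal

from pydantic import BaseModel


class MinimalRunRecord(BaseModel):
    run_id: str
    status: Literal["running", "succeeded", "failed"]
```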
View File

@@ -4,20 +4,12 @@ from typing import cast
from pydantic import JsonValue
from agenton.compositor import CompositorConfig, CompositorSessionSnapshot, LayerNodeConfig, LayerSessionSnapshot
from agenton.compositor import CompositorSessionSnapshot, LayerSessionSnapshot
from agenton.layers import LifecycleState
from dify_agent.protocol.schemas import CreateRunRequest, RunStartedEvent, RunSucceededEvent, RunSucceededEventData
from dify_agent.protocol.schemas import RunStartedEvent, RunSucceededEvent, RunSucceededEventData
from dify_agent.storage.redis_run_store import DEFAULT_RUN_RETENTION_SECONDS, RedisRunStore
def _request() -> CreateRunRequest:
return CreateRunRequest(
compositor=CompositorConfig(
layers=[LayerNodeConfig(name="prompt", type="plain.prompt", config={"user": "hello"})]
)
)
class FakeRedis:
commands: list[tuple[object, ...]]
values: dict[str, object]
@@ -74,18 +66,19 @@ def test_create_run_writes_running_record_without_job_queue_and_with_retention()
redis = FakeRedis()
store = RedisRunStore(redis, prefix="test") # pyright: ignore[reportArgumentType]
record = asyncio.run(store.create_run(_request()))
record = asyncio.run(store.create_run())
assert record.status == "running"
assert [command[0] for command in redis.commands] == ["set"]
assert redis.commands[0][1] == f"test:runs:{record.run_id}:record"
assert redis.commands[0][3] == DEFAULT_RUN_RETENTION_SECONDS
assert "request" not in str(redis.commands[0][2])
def test_update_status_refreshes_record_retention() -> None:
redis = FakeRedis()
store = RedisRunStore(redis, prefix="test", run_retention_seconds=60) # pyright: ignore[reportArgumentType]
record = asyncio.run(store.create_run(_request()))
record = asyncio.run(store.create_run())
redis.commands.clear()
asyncio.run(store.update_status(record.run_id, "succeeded"))
@@ -128,7 +121,7 @@ def test_get_events_round_trips_run_succeeded_output_and_session_snapshot() -> N
)
async def scenario() -> tuple[str, RunSucceededEvent]:
record = await store.create_run(_request())
record = await store.create_run()
event_id = await store.append_event(
RunSucceededEvent(
id="local-only",