refine agenton layer lifecycle controls

This commit is contained in:
盐粒 Yanli 2026-05-12 04:38:23 +08:00
parent ab0b4c45cb
commit e8c16fb08b
28 changed files with 822 additions and 118 deletions

View File

@ -29,6 +29,9 @@ Construction and dependency APIs:
- `bind_deps(deps: Mapping[str, Layer | None]) -> None`: bind graph dependencies.
- `new_control(state=LifecycleState.NEW, runtime_state=None) -> LayerControl`: create
a schema-validated per-session control.
- `require_control(control, active=False) -> LayerControl`: validate that a
capability method received this layer's own control with the expected runtime
schemas, optionally requiring `LifecycleState.ACTIVE`.
Lifecycle hooks:
@ -37,7 +40,9 @@ Lifecycle hooks:
- `on_context_suspend(control)`
- `on_context_delete(control)`
- `enter(control)` / `lifecycle_enter(control)`: async context manager entry
surface. Override `enter()` only when a layer needs to wrap extra resources.
surface. The base lifecycle owns the per-entry resource stack; override
`enter()` only for unusual wrapping that cannot be expressed as registered
resources.
Prompt/tool authoring surfaces:
@ -67,6 +72,10 @@ Methods:
- `suspend_on_exit() -> None`
- `delete_on_exit() -> None`
- `enter_async_resource(cm) -> T`: enter an async context manager on the current
entry resource stack and return its resource.
- `add_async_cleanup(callback) -> None`: register an async cleanup callback on the
current entry resource stack.
- `control_for(dep_layer) -> LayerControl`: resolve the unique dependency control
whose resolved target is `dep_layer` in the same session.
- `control_for(dep_name, dep_layer) -> LayerControl`: resolve a named dependency
@ -74,7 +83,9 @@ Methods:
`runtime_state` is serialized in session snapshots. `runtime_handles` is never
serialized and should be rehydrated from runtime state in resume hooks. Private
owner links used by `control_for` are runtime-only and are not snapshotted.
owner links used by `control_for` and the per-entry resource stack are
runtime-only and are not snapshotted. Resource-stack APIs are available only
while a layer entry is being created/resumed, active, or exiting.
### Schema defaults and lifecycle enums

View File

@ -16,7 +16,9 @@ on the `LayerControl` created for that layer in a `CompositorSession`.
- **Runtime handles** are live Python objects such as clients, open files, or
process handles. Layers declare a Pydantic `runtime_handles_type` with
`arbitrary_types_allowed=True`. Handles are never serialized; resume hooks
should rehydrate them from runtime state.
should rehydrate them from runtime state. Register handles that need async
cleanup with the control's entry resource stack rather than closing them
manually in layer instances.
## Define a config-backed layer
@ -49,6 +51,42 @@ Omitted schema slots default to `EmptyLayerConfig`, `EmptyRuntimeState`, and
`LayerControl[MyState, MyHandles]` to get static checking and IDE completion for
runtime state and handles.
## Live resources
The base lifecycle creates a resource stack for each `LayerControl` entry before
`on_context_create` or `on_context_resume` runs. Enter async resources through the
control, store the live handle in `runtime_handles`, and clear the handle in
`on_context_suspend`/`on_context_delete`; the resource stack performs the actual
close after those hooks and also cleans up if create/resume or the context body
raises.
```python {test="skip" lint="skip"}
class ClientHandles(BaseModel):
client: httpx.AsyncClient | None = None
model_config = ConfigDict(arbitrary_types_allowed=True)
@dataclass
class ClientLayer(PlainLayer[NoLayerDeps, EmptyLayerConfig, EmptyRuntimeState, ClientHandles]):
async def on_context_create(self, control: LayerControl[EmptyRuntimeState, ClientHandles]) -> None:
control.runtime_handles.client = await control.enter_async_resource(httpx.AsyncClient())
async def on_context_delete(self, control: LayerControl[EmptyRuntimeState, ClientHandles]) -> None:
control.runtime_handles.client = None
def make_client_user(self, control: LayerControl) -> ClientUser:
control = self.require_control(control, active=True)
if control.runtime_handles.client is None:
raise RuntimeError("client is not available")
return ClientUser(control.runtime_handles.client)
```
`Layer.require_control(control, active=True)` is the recommended first line for
capability methods that read runtime state or handles. It verifies that callers
passed this layer's own control from the current session and, when requested, that
the control is active.
## Register layers and build a compositor
Register config-constructible layers manually:

View File

@ -14,7 +14,9 @@ server-only and should not be used by API consumers.
Create-run requests accept a `CompositorConfig` and an optional
`CompositorSessionSnapshot`. There is **no top-level `user_prompt` or model
profile field**. User input and model/provider selection are supplied by Agenton
layers. In the MVP server, the safe config-constructible layer registry includes
layers. `layer_exit_signals` optionally controls whether layers suspend or delete
when the run leaves the active session; the default is suspend for all layers. In
the MVP server, the safe config-constructible layer registry includes
`plain.prompt`, `dify.plugin`, and `dify.plugin.llm`. The runtime reads the LLM
model layer named by `DIFY_AGENT_MODEL_LAYER_ID`, whose public value is `"llm"`.
@ -62,7 +64,7 @@ Request:
"plugin": "plugin"
},
"config": {
"provider": "openai",
"model_provider": "openai",
"model": "gpt-4o-mini",
"credentials": {
"api_key": "replace-with-provider-key"
@ -74,7 +76,13 @@ Request:
}
]
},
"session_snapshot": null
"session_snapshot": null,
"layer_exit_signals": {
"default": "suspend",
"layers": {
"prompt": "delete"
}
}
}
```
@ -94,7 +102,8 @@ event streams expire after `DIFY_AGENT_RUN_RETENTION_SECONDS`, which defaults to
`dify.plugin` receives tenant/plugin identity only; daemon URL, API key, and
timeout are server settings. `dify.plugin.llm.credentials` accepts scalar values
only (`string`, `number`, `boolean`, or `null`).
only (`string`, `number`, `boolean`, or `null`). Unknown
`layer_exit_signals.layers` keys return `422` before a run record is created.
Validation error example (`422`):
@ -191,7 +200,7 @@ from agenton.compositor import CompositorConfig, LayerNodeConfig
from agenton_collections.layers.plain import PromptLayerConfig
from dify_agent.client import Client
from dify_agent.layers.dify_plugin import DifyPluginLLMLayerConfig, DifyPluginLayerConfig
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest, LayerExitSignals
async def main() -> None:
@ -209,13 +218,14 @@ async def main() -> None:
type="dify.plugin.llm",
deps={"plugin": "plugin"},
config=DifyPluginLLMLayerConfig(
provider="openai",
model_provider="openai",
model="gpt-4o-mini",
credentials={"api_key": "provider-key"},
),
),
]
)
),
layer_exit_signals=LayerExitSignals(layers={"prompt": "delete"}),
)
async with Client(base_url="http://localhost:8000") as client:
run = await client.create_run(request)
@ -245,7 +255,7 @@ request = CreateRunRequest(
type="dify.plugin.llm",
deps={"plugin": "plugin"},
config=DifyPluginLLMLayerConfig(
provider="openai",
model_provider="openai",
model="gpt-4o-mini",
credentials={"api_key": "provider-key"},
),

View File

@ -61,10 +61,10 @@ async def main() -> None:
DifyPluginDaemonProvider(
tenant_id=required_env("DIFY_AGENT_TENANT_ID"),
plugin_id=required_env("DIFY_AGENT_PLUGIN_ID"),
plugin_provider=required_env("DIFY_AGENT_PROVIDER"),
plugin_daemon_url=required_env("PLUGIN_DAEMON_URL"),
plugin_daemon_api_key=required_env("PLUGIN_DAEMON_KEY"),
),
model_provider=required_env("DIFY_AGENT_PROVIDER"),
credentials=load_credentials(),
)
agent = Agent(model=model)

View File

@ -56,7 +56,7 @@ async def main() -> None:
type="dify.plugin.llm",
deps={"plugin": "plugin"},
config=DifyPluginLLMLayerConfig(
provider=PLUGIN_PROVIDER,
model_provider=PLUGIN_PROVIDER,
model=MODEL_NAME,
credentials=MODEL_CREDENTIALS,
),

View File

@ -48,7 +48,7 @@ def main() -> None:
type="dify.plugin.llm",
deps={"plugin": "plugin"},
config=DifyPluginLLMLayerConfig(
provider=PLUGIN_PROVIDER,
model_provider=PLUGIN_PROVIDER,
model=MODEL_NAME,
credentials=MODEL_CREDENTIALS,
),

View File

@ -264,6 +264,14 @@ class CompositorSession:
return self._control_for_unique_dependency(owner_layer_id, dep_layer)
return self._control_for_named_dependency(owner_layer_id, dep_name, dep_layer)
def _layer_for_control_owner(self, owner_layer_id: str) -> Layer[Any, Any, Any, Any, Any, Any, Any]:
    """Look up the layer instance that owns ``owner_layer_id``'s control here."""
    owner_compositor = self._require_owner_compositor()
    try:
        owner_layer = owner_compositor.layers[owner_layer_id]
    except KeyError as missing:
        raise KeyError(f"Layer '{owner_layer_id}' is not defined in this compositor.") from missing
    return owner_layer
def _control_for_unique_dependency(
self,
owner_layer_id: str,

View File

@ -28,7 +28,10 @@ resets to delete on every successful enter. Layer instances are shared graph and
capability definitions, so session-local serializable ids, checkpoints, and
other snapshot data belong in ``LayerControl.runtime_state``; live clients,
connections, and process handles belong in ``LayerControl.runtime_handles``.
Neither category should be stored on ``self`` when it is session-local.
Neither category should be stored on ``self`` when it is session-local. Live
resources that need deterministic cleanup should be registered on the control's
per-entry resource stack; the base lifecycle closes that stack after suspend and
delete hooks, and also when create/resume fails.
``Layer`` is framework-neutral over system prompt, user prompt, and tool item
types. The native ``prefix_prompts``, ``suffix_prompts``, ``user_prompts``, and
@ -39,8 +42,8 @@ native values without changing layer implementations.
"""
from abc import ABC, abstractmethod
from collections.abc import AsyncIterator
from contextlib import AbstractAsyncContextManager, asynccontextmanager
from collections.abc import AsyncIterator, Awaitable, Callable
from contextlib import AbstractAsyncContextManager, AsyncExitStack, asynccontextmanager
from dataclasses import dataclass, field
from enum import StrEnum
from types import UnionType
@ -89,6 +92,7 @@ _RuntimeStateT = TypeVar("_RuntimeStateT", bound=BaseModel, default="EmptyRuntim
_RuntimeHandlesT = TypeVar("_RuntimeHandlesT", bound=BaseModel, default="EmptyRuntimeHandles")
_DepRuntimeStateT = TypeVar("_DepRuntimeStateT", bound=BaseModel)
_DepRuntimeHandlesT = TypeVar("_DepRuntimeHandlesT", bound=BaseModel)
_ResourceT = TypeVar("_ResourceT")
class _LayerControlOwnerSession(Protocol):
@ -101,6 +105,8 @@ class _LayerControlOwnerSession(Protocol):
dep_layer: "Layer[Any, Any, Any, Any, Any, _DepRuntimeStateT, _DepRuntimeHandlesT]",
) -> "LayerControl[_DepRuntimeStateT, _DepRuntimeHandlesT]": ...
def _layer_for_control_owner(self, owner_layer_id: str) -> "Layer[Any, Any, Any, Any, Any, Any, Any]": ...
class LayerDeps:
"""Typed dependency container for a Layer.
@ -201,7 +207,9 @@ class LayerControl(Generic[_RuntimeStateT, _RuntimeHandlesT]):
handles are not serialized in snapshots and should be rehydrated from
runtime state in resume hooks. A compositor also binds private owner metadata
so ``control_for`` can find controls for this layer's dependencies in the
same session; those links are runtime-only and not part of snapshots.
same session; those links are runtime-only and not part of snapshots. The
per-entry resource stack is also runtime-only and exists only while the layer
is entering, active, or exiting.
"""
state: LifecycleState = LifecycleState.NEW
@ -210,6 +218,7 @@ class LayerControl(Generic[_RuntimeStateT, _RuntimeHandlesT]):
runtime_handles: _RuntimeHandlesT = field(default_factory=lambda: cast(_RuntimeHandlesT, EmptyRuntimeHandles()))
_owner_session: _LayerControlOwnerSession | None = field(default=None, init=False, repr=False, compare=False)
_owner_layer_id: str | None = field(default=None, init=False, repr=False, compare=False)
_entry_stack: AsyncExitStack | None = field(default=None, init=False, repr=False, compare=False)
def suspend_on_exit(self) -> None:
"""Request suspend behavior when the current layer entry exits."""
@ -219,6 +228,19 @@ class LayerControl(Generic[_RuntimeStateT, _RuntimeHandlesT]):
"""Request delete behavior when the current layer entry exits."""
self.exit_intent = ExitIntent.DELETE
async def enter_async_resource(self, cm: AbstractAsyncContextManager[_ResourceT]) -> _ResourceT:
    """Enter ``cm`` on this control's current entry resource stack and return its resource.

    Only usable while a layer entry is in progress or active. The base
    lifecycle closes everything registered here after suspend/delete hooks run,
    and also when create/resume fails.
    """
    stack = self._require_entry_stack()
    return await stack.enter_async_context(cm)
def add_async_cleanup(self, callback: Callable[[], Awaitable[None]]) -> None:
    """Schedule ``callback`` to run when the current entry resource stack closes."""
    stack = self._require_entry_stack()
    stack.push_async_callback(callback)
@overload
def control_for(
self,
@ -270,6 +292,22 @@ class LayerControl(Generic[_RuntimeStateT, _RuntimeHandlesT]):
self._owner_session = session
self._owner_layer_id = layer_id
def _require_entry_stack(self) -> AsyncExitStack:
    """Return the live entry stack, failing fast when no entry is in progress."""
    stack = self._entry_stack
    if stack is None:
        raise RuntimeError("LayerControl entry resource stack is not active.")
    return stack
def _begin_entry_stack(self) -> None:
    """Install a fresh resource stack for the layer entry that is starting."""
    already_active = self._entry_stack is not None
    if already_active:
        raise RuntimeError("LayerControl entry resource stack is already active.")
    self._entry_stack = AsyncExitStack()
async def _close_entry_stack(self) -> None:
    """Detach the current entry stack and close it; a no-op when none is active."""
    # Detach before closing so the control is never observed with a half-closed stack.
    stack, self._entry_stack = self._entry_stack, None
    if stack is None:
        return
    await stack.aclose()
@dataclass(frozen=True, slots=True)
class LayerDepSpec:
@ -385,44 +423,115 @@ class Layer(
resolved_deps[name] = deps[name]
self.deps = self.deps_type(**resolved_deps)
def require_control(
    self,
    control: LayerControl[Any, Any],
    *,
    active: bool = False,
) -> LayerControl[_RuntimeStateT, _RuntimeHandlesT]:
    """Check that ``control`` is this layer's own session control and return it.

    Capability methods should call this first, before reading runtime state or
    handles. Checks run in a fixed order: session attachment, owner
    resolution, owner identity, runtime-state schema, runtime-handles schema,
    and finally — only when ``active`` is true — the ``LifecycleState.ACTIVE``
    requirement.
    """
    session = control._owner_session
    owner_id = control._owner_layer_id
    if session is None or owner_id is None:
        raise RuntimeError("LayerControl is not attached to a compositor session.")
    try:
        owner = session._layer_for_control_owner(owner_id)
    except KeyError as e:
        raise RuntimeError(
            f"LayerControl owner layer '{owner_id}' is not defined in its compositor."
        ) from e
    # Identity, not equality: the control must belong to this exact instance.
    if owner is not self:
        raise RuntimeError(
            f"LayerControl belongs to layer '{owner_id}', not this {type(self).__name__} instance."
        )
    state_type = self.runtime_state_type
    if not isinstance(control.runtime_state, state_type):
        raise TypeError(
            f"{type(self).__name__} control runtime_state must be {state_type.__name__}, "
            f"got {type(control.runtime_state).__name__}."
        )
    handles_type = self.runtime_handles_type
    if not isinstance(control.runtime_handles, handles_type):
        raise TypeError(
            f"{type(self).__name__} control runtime_handles must be {handles_type.__name__}, "
            f"got {type(control.runtime_handles).__name__}."
        )
    if active and control.state is not LifecycleState.ACTIVE:
        raise RuntimeError(
            f"{type(self).__name__} requires an active LayerControl; current state is {control.state.value}."
        )
    return cast(LayerControl[_RuntimeStateT, _RuntimeHandlesT], control)
def enter(self, control: LayerControl[_RuntimeStateT, _RuntimeHandlesT]) -> AbstractAsyncContextManager[None]:
    """Return the layer's async entry context manager.

    ``control`` is the lifecycle control slot for this entry. Subclasses can
    override this for unusual wrapping, but ordinary live resources should be
    registered with ``control.enter_async_resource`` from lifecycle hooks.
    """
    # Default entry surface: delegate directly to the base lifecycle machine.
    return self.lifecycle_enter(control)
@asynccontextmanager
async def lifecycle_enter(self, control: LayerControl[_RuntimeStateT, _RuntimeHandlesT]) -> AsyncIterator[None]:
    """Run the default explicit lifecycle and resource stack for one entry.

    Exit state is recorded even when suspend/delete hooks fail because the
    active entry is over once hook cleanup begins.
    """
    # Re-entry guards: an entry may only start from NEW or SUSPENDED.
    if control.state is LifecycleState.ACTIVE:
        raise RuntimeError(
            "LayerControl is already active; duplicate or nested enter is not allowed."
        )
    if control.state is LifecycleState.CLOSED:
        raise RuntimeError(
            "LayerControl is closed; create a new compositor session before entering again."
        )
    # The resource stack must exist before create/resume hooks run so those
    # hooks can register live resources on it.
    control._begin_entry_stack()
    try:
        if control.state is LifecycleState.NEW:
            # Every successful enter resets the exit intent to delete.
            control.exit_intent = ExitIntent.DELETE
            await self.on_context_create(control)
            control.state = LifecycleState.ACTIVE
        elif control.state is LifecycleState.SUSPENDED:
            control.exit_intent = ExitIntent.DELETE
            await self.on_context_resume(control)
            control.state = LifecycleState.ACTIVE
    except BaseException:
        # Create/resume failed: close whatever it registered, then re-raise.
        await control._close_entry_stack()
        raise
    try:
        yield
    finally:
        hook_error: BaseException | None = None
        if control.exit_intent is ExitIntent.SUSPEND:
            try:
                await self.on_context_suspend(control)
            except BaseException as exc:
                hook_error = exc
            finally:
                # The exit state is recorded even when the hook raised.
                control.state = LifecycleState.SUSPENDED
        else:
            try:
                await self.on_context_delete(control)
            except BaseException as exc:
                hook_error = exc
            finally:
                control.state = LifecycleState.CLOSED
        # Resources close after the hooks so hooks may still use them.
        try:
            await control._close_entry_stack()
        except BaseException:
            # A hook failure takes precedence over a stack-close failure.
            if hook_error is not None:
                raise hook_error
            raise
        if hook_error is not None:
            raise hook_error
async def on_context_create(self, control: LayerControl[_RuntimeStateT, _RuntimeHandlesT]) -> None:
"""Run when the layer context is entered from ``LifecycleState.NEW``."""

View File

@ -88,11 +88,12 @@ class _DifyRequestInput:
@dataclass(slots=True)
class DifyLLMAdapterModel(Model[DifyPluginDaemonLLMClient]):
"""Use a Dify plugin-daemon LLM provider as a Pydantic AI model."""
"""Use a Dify plugin-daemon transport plus request-level model identity."""
model: str
daemon_provider: DifyPluginDaemonProvider
_: KW_ONLY
model_provider: str
credentials: dict[str, object] = field(default_factory=dict, repr=False)
model_profile: InitVar[ModelProfileSpec | None] = None
model_settings: InitVar[ModelSettings | None] = None
@ -140,6 +141,7 @@ class DifyLLMAdapterModel(Model[DifyPluginDaemonLLMClient]):
response = DifyStreamedResponse(
model_request_parameters=prepared_params,
chunks=self.daemon_provider.client.iter_llm_result_chunks(
provider=self.model_provider,
model=self.model_name,
credentials=request_input.credentials,
prompt_messages=request_input.prompt_messages,
@ -175,6 +177,7 @@ class DifyLLMAdapterModel(Model[DifyPluginDaemonLLMClient]):
yield DifyStreamedResponse(
model_request_parameters=prepared_params,
chunks=self.daemon_provider.client.iter_llm_result_chunks(
provider=self.model_provider,
model=self.model_name,
credentials=request_input.credentials,
prompt_messages=request_input.prompt_messages,

View File

@ -1,4 +1,10 @@
"""Dify plugin-daemon provider for Pydantic AI LLM adapters."""
"""Dify plugin-daemon provider for Pydantic AI LLM adapters.
The Pydantic AI provider represents daemon/plugin transport identity. Business
model provider names such as ``openai`` are request-level model identity and are
passed by ``DifyLLMAdapterModel`` for each invocation instead of being stored on
this provider.
"""
from __future__ import annotations
@ -27,11 +33,12 @@ class PluginDaemonBasicResponse(BaseModel):
@dataclass(slots=True)
class DifyPluginDaemonLLMClient:
"""HTTP client wrapper for plugin-daemon LLM dispatch requests."""
plugin_daemon_url: str
plugin_daemon_api_key: str
tenant_id: str
plugin_id: str
provider: str
user_id: str | None
http_client: httpx.AsyncClient = field(repr=False)
@ -41,6 +48,7 @@ class DifyPluginDaemonLLMClient:
async def iter_llm_result_chunks(
self,
*,
provider: str,
model: str,
credentials: dict[str, object],
prompt_messages: list[PromptMessage],
@ -53,7 +61,7 @@ class DifyPluginDaemonLLMClient:
model_name=model,
path=f"plugin/{self.tenant_id}/dispatch/llm/invoke",
request_data={
"provider": self.provider,
"provider": provider,
"model_type": "llm",
"model": model,
"credentials": credentials,
@ -130,6 +138,9 @@ class DifyPluginDaemonLLMClient:
class DifyPluginDaemonProvider(Provider[DifyPluginDaemonLLMClient]):
"""Pydantic AI provider for Dify plugin-daemon dispatch requests.
The provider ``name`` identifies the daemon/plugin context. The business LLM
provider is supplied by each adapter model request so one daemon provider can
serve different model-provider selections without mutating transport state.
When ``http_client`` is omitted the provider owns an ``AsyncClient`` and the
Pydantic AI provider context manager closes it. When an external client is
supplied, ownership stays with the caller and provider exit leaves it open.
@ -137,7 +148,6 @@ class DifyPluginDaemonProvider(Provider[DifyPluginDaemonLLMClient]):
tenant_id: str
plugin_id: str
plugin_provider: str
plugin_daemon_url: str
plugin_daemon_api_key: str = field(repr=False)
user_id: str | None = None
@ -162,7 +172,6 @@ class DifyPluginDaemonProvider(Provider[DifyPluginDaemonLLMClient]):
plugin_daemon_api_key=self.plugin_daemon_api_key,
tenant_id=self.tenant_id,
plugin_id=self.plugin_id,
provider=self.plugin_provider,
user_id=self.user_id,
http_client=http_client,
)
@ -177,7 +186,7 @@ class DifyPluginDaemonProvider(Provider[DifyPluginDaemonLLMClient]):
@property
@override
def name(self) -> str:
return f"DifyPlugin/{self.plugin_provider}"
return f"DifyPlugin/{self.plugin_id}"
@property
@override

View File

@ -28,9 +28,9 @@ class DifyPluginLayerConfig(LayerConfig):
class DifyPluginLLMLayerConfig(LayerConfig):
"""Public config for selecting a Dify plugin LLM model."""
"""Public config for selecting a business provider/model from a plugin."""
provider: str
model_provider: str
model: str
credentials: dict[str, DifyPluginCredentialValue] = Field(default_factory=dict)
model_settings: ModelSettings | None = None

View File

@ -3,7 +3,9 @@
This layer owns model capability resolution for Dify plugin-backed LLMs. It
depends on ``DifyPluginLayer`` for daemon access, resolves that dependency's
control from its own ``LayerControl``, and returns a Pydantic AI model adapter
configured from the public LLM layer DTO.
configured from the public LLM layer DTO. The daemon provider carries plugin
transport identity; the DTO's ``model_provider`` is passed to the adapter as
request-level model identity.
"""
from dataclasses import dataclass
@ -38,11 +40,13 @@ class DifyPluginLLMLayer(PlainLayer[DifyPluginLLMDeps, DifyPluginLLMLayerConfig]
def get_model(self, control: LayerControl[EmptyRuntimeState, EmptyRuntimeHandles]) -> DifyLLMAdapterModel:
    """Build the configured adapter model from the current session's plugin control."""
    control = self.require_control(control, active=True)
    plugin_control = control.control_for(self.deps.plugin)
    daemon_provider = self.deps.plugin.get_daemon_provider(plugin_control)
    layer_config = self.config
    # Daemon provider carries transport identity; model_provider is request-level.
    return DifyLLMAdapterModel(
        model=layer_config.model,
        daemon_provider=daemon_provider,
        model_provider=layer_config.model_provider,
        credentials=dict(layer_config.credentials),
        model_settings=layer_config.model_settings,
    )

View File

@ -3,8 +3,10 @@
The public config identifies tenant/plugin/user context only. Plugin daemon URL,
API key, and timeout are server-side dependencies injected by the layer registry
factory. Each active compositor entry owns an HTTP client in ``LayerControl``
runtime handles; callers pass the control explicitly to ``get_provider`` so
shared layer instances never store or discover session-local clients implicitly.
runtime handles and registers it on the control's resource stack. Callers pass
the control explicitly to ``get_daemon_provider`` so shared layer instances never
store or discover session-local clients implicitly. Business model-provider names
belong to the LLM layer/model request, not this daemon context layer.
"""
from dataclasses import dataclass
@ -13,7 +15,7 @@ import httpx
from pydantic import BaseModel, ConfigDict
from typing_extensions import Self, override
from agenton.layers import EmptyRuntimeState, LayerControl, LifecycleState, NoLayerDeps, PlainLayer
from agenton.layers import EmptyRuntimeState, LayerControl, NoLayerDeps, PlainLayer
from dify_agent.adapters.llm import DifyPluginDaemonProvider
from dify_agent.layers.dify_plugin.configs import DifyPluginLayerConfig
@ -56,25 +58,25 @@ class DifyPluginLayer(PlainLayer[NoLayerDeps, DifyPluginLayerConfig, EmptyRuntim
"""Create a plugin layer from public config plus server-only daemon settings."""
return cls(config=config, daemon_url=daemon_url, daemon_api_key=daemon_api_key, timeout=timeout)
def get_provider(
def get_daemon_provider(
self,
control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles],
*,
plugin_provider: str,
) -> DifyPluginDaemonProvider:
"""Return a provider backed by ``control``'s active HTTP client.
"""Return a daemon provider backed by ``control``'s active HTTP client.
Raises:
RuntimeError: if ``control`` is not active or its HTTP client is
absent/closed.
"""
control = self.require_control(control, active=True)
client = control.runtime_handles.http_client
if control.state is not LifecycleState.ACTIVE or client is None or client.is_closed:
raise RuntimeError("DifyPluginLayer.get_provider() requires an entered control with an open HTTP client.")
if client is None or client.is_closed:
raise RuntimeError(
"DifyPluginLayer.get_daemon_provider() requires an entered control with an open HTTP client."
)
return DifyPluginDaemonProvider(
tenant_id=self.config.tenant_id,
plugin_id=self.config.plugin_id,
plugin_provider=plugin_provider,
plugin_daemon_url=self.daemon_url,
plugin_daemon_api_key=self.daemon_api_key,
user_id=self.config.user_id,
@ -101,23 +103,18 @@ class DifyPluginLayer(PlainLayer[NoLayerDeps, DifyPluginLayerConfig, EmptyRuntim
self,
control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles],
) -> None:
await self._close_http_client(control)
control.runtime_handles.http_client = None
@override
async def on_context_delete(
self,
control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles],
) -> None:
await self._close_http_client(control)
control.runtime_handles.http_client = None
async def _open_http_client(self, control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles]) -> None:
if control.runtime_handles.http_client is None or control.runtime_handles.http_client.is_closed:
control.runtime_handles.http_client = httpx.AsyncClient(timeout=self.timeout, trust_env=False)
async def _close_http_client(self, control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles]) -> None:
client = control.runtime_handles.http_client
control.runtime_handles.http_client = None
if client is not None:
await client.aclose()
control.runtime_handles.http_client = await control.enter_async_resource(
httpx.AsyncClient(timeout=self.timeout, trust_env=False)
)
__all__ = ["DifyPluginLayer", "DifyPluginRuntimeHandles"]

View File

@ -7,6 +7,7 @@ from .schemas import (
CreateRunRequest,
CreateRunResponse,
EmptyRunEventData,
LayerExitSignals,
PydanticAIStreamRunEvent,
RunEvent,
RunEventType,
@ -27,6 +28,7 @@ __all__ = [
"CreateRunResponse",
"DIFY_AGENT_MODEL_LAYER_ID",
"EmptyRunEventData",
"LayerExitSignals",
"PydanticAIStreamRunEvent",
"RUN_EVENT_ADAPTER",
"RunEvent",

View File

@ -10,10 +10,12 @@ by polling and SSE replay. Event envelopes keep the public
a typed ``data`` model so OpenAPI, Redis replay, and clients parse the same
payload contract. Model/provider selection is part of the submitted Agenton
layer graph, not a top-level run field; the runtime reads the model layer named
by ``DIFY_AGENT_MODEL_LAYER_ID``. Successful runs publish the final JSON-safe
agent output and the resumable Agenton session snapshot together on the terminal
``run_succeeded`` event so consumers can treat terminal events as complete run
summaries.
by ``DIFY_AGENT_MODEL_LAYER_ID``. Request-level layer exit signals decide whether
each layer control is suspended or deleted when the active entry exits, with
suspend as the default so successful terminal events can include resumable
snapshots. Successful runs publish the final JSON-safe agent output and the
resumable Agenton session snapshot together on the terminal ``run_succeeded``
event so consumers can treat terminal events as complete run summaries.
"""
from datetime import datetime, timezone
@ -23,6 +25,7 @@ from pydantic import BaseModel, ConfigDict, Field, JsonValue, TypeAdapter
from pydantic_ai.messages import AgentStreamEvent
from agenton.compositor import CompositorConfig, CompositorSessionSnapshot
from agenton.layers import ExitIntent
DIFY_AGENT_MODEL_LAYER_ID: Final[str] = "llm"
@ -40,15 +43,27 @@ def utc_now() -> datetime:
return datetime.now(timezone.utc)
class LayerExitSignals(BaseModel):
    """Requested per-layer lifecycle behavior when a run leaves its active session."""

    # Reject unknown request fields so typos surface as validation errors.
    model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")

    # Intent applied to every layer without an explicit entry in ``layers``.
    default: ExitIntent = ExitIntent.SUSPEND
    # Per-layer overrides keyed by compositor layer id.
    layers: dict[str, ExitIntent] = Field(default_factory=dict)
class CreateRunRequest(BaseModel):
    """Request body for creating one async agent run.

    Model/provider configuration must be supplied through the compositor layer
    named by ``DIFY_AGENT_MODEL_LAYER_ID``. ``layer_exit_signals`` defaults every
    layer to suspend so callers receive a resumable success snapshot unless they
    explicitly request delete for one or more layers.
    """

    # Layer graph that defines the run, including model/provider selection.
    compositor: CompositorConfig
    # Optional snapshot to resume a prior session; None starts a fresh session.
    session_snapshot: CompositorSessionSnapshot | None = None
    # Per-layer suspend/delete behavior applied when the run's entry exits.
    layer_exit_signals: LayerExitSignals = Field(default_factory=LayerExitSignals)
    # Reject unknown request fields so unexpected keys fail request validation.
    model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
@ -162,6 +177,7 @@ __all__ = [
"CreateRunResponse",
"DIFY_AGENT_MODEL_LAYER_ID",
"EmptyRunEventData",
"LayerExitSignals",
"PydanticAIStreamRunEvent",
"RUN_EVENT_ADAPTER",
"RunEvent",

View File

@ -0,0 +1,41 @@
"""Validation and application of request-level Agenton layer exit signals.
HTTP requests carry data-only lifecycle intent in ``LayerExitSignals``. The
runtime validates the signal keys against the built compositor before a run is
persisted or entered, then applies the resolved intent after entry because
``Layer.lifecycle_enter`` resets controls to delete on each successful enter.
"""
from typing import Any
from agenton.compositor import Compositor, CompositorSession
from agenton.layers import ExitIntent
from dify_agent.protocol.schemas import LayerExitSignals
def validate_layer_exit_signals(
    compositor: Compositor[Any, Any, Any, Any, Any, Any],
    signals: LayerExitSignals,
) -> None:
    """Raise ``ValueError`` when ``signals`` mentions layers absent from ``compositor``."""
    unknown = set(signals.layers) - set(compositor.layers)
    if unknown:
        # List the offending ids deterministically so error text is stable.
        raise ValueError(
            f"layer_exit_signals.layers references unknown layer ids: {', '.join(sorted(unknown))}."
        )
def apply_layer_exit_signals(session: CompositorSession, signals: LayerExitSignals) -> None:
    """Apply ``signals`` to active controls for the current compositor entry.

    Runs after a successful enter, since entering resets each control's exit
    intent; per-layer overrides win over the request-wide default.
    """
    for layer_id, control in session.layer_controls.items():
        # Fall back to the request-wide default when a layer has no override.
        intent = signals.layers.get(layer_id, signals.default)
        if intent is ExitIntent.DELETE:
            control.delete_on_exit()
        elif intent is ExitIntent.SUSPEND:
            control.suspend_on_exit()
        else:
            raise ValueError(f"Unsupported layer exit intent: {intent!r}.")


__all__ = ["apply_layer_exit_signals", "validate_layer_exit_signals"]

View File

@ -16,6 +16,7 @@ from agenton.compositor import LayerRegistry
from dify_agent.protocol.schemas import CreateRunRequest
from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor, create_default_layer_registry
from dify_agent.runtime.event_sink import RunEventSink, emit_run_failed
from dify_agent.runtime.layer_exit_signals import validate_layer_exit_signals
from dify_agent.runtime.runner import AgentRunRunner
from dify_agent.runtime.user_prompt_validation import EMPTY_USER_PROMPTS_ERROR, has_non_blank_user_prompt
from dify_agent.server.schemas import RunRecord
@ -88,6 +89,7 @@ class RunScheduler:
from ``active_tasks`` when it finishes, regardless of success or failure.
"""
compositor = build_pydantic_ai_compositor(request.compositor, registry=self.layer_registry)
validate_layer_exit_signals(compositor, request.layer_exit_signals)
if not has_non_blank_user_prompt(compositor.user_prompts):
raise ValueError(EMPTY_USER_PROMPTS_ERROR)

View File

@ -2,11 +2,12 @@
The runner is storage-agnostic: it builds an Agenton compositor, enters or
resumes its session, runs pydantic-ai with ``compositor.user_prompts`` as the user
input, emits stream events, suspends the session on exit, snapshots it, and then
publishes a terminal success or failure event. The Pydantic AI model is resolved
from the active Agenton layer named by ``DIFY_AGENT_MODEL_LAYER_ID``. Successful
terminal events contain both the JSON-safe final output and session snapshot;
there are no separate output or snapshot events to correlate.
input, emits stream events, applies request-level layer exit signals, snapshots
the resulting session, and then publishes a terminal success or failure event.
The Pydantic AI model is resolved from the active Agenton layer named by
``DIFY_AGENT_MODEL_LAYER_ID``. Successful terminal events contain both the
JSON-safe final output and session snapshot; there are no separate output or
snapshot events to correlate.
"""
from collections.abc import AsyncIterable
@ -27,6 +28,7 @@ from dify_agent.runtime.event_sink import (
emit_run_started,
emit_run_succeeded,
)
from dify_agent.runtime.layer_exit_signals import apply_layer_exit_signals, validate_layer_exit_signals
from dify_agent.runtime.user_prompt_validation import EMPTY_USER_PROMPTS_ERROR, has_non_blank_user_prompt
@ -83,13 +85,17 @@ class AgentRunRunner:
async def _run_agent(self) -> tuple[JsonValue, CompositorSessionSnapshot]:
"""Run pydantic-ai inside an entered Agenton session."""
compositor = build_pydantic_ai_compositor(self.request.compositor, registry=self.layer_registry)
try:
validate_layer_exit_signals(compositor, self.request.layer_exit_signals)
except ValueError as exc:
raise AgentRunValidationError(str(exc)) from exc
session = (
compositor.session_from_snapshot(self.request.session_snapshot)
if self.request.session_snapshot is not None
else compositor.new_session()
)
async with compositor.enter(session) as active_session:
active_session.suspend_on_exit()
apply_layer_exit_signals(active_session, self.request.layer_exit_signals)
user_prompts = compositor.user_prompts
if not has_non_blank_user_prompt(user_prompts):
raise AgentRunValidationError(EMPTY_USER_PROMPTS_ERROR)

View File

@ -16,6 +16,7 @@ from fastapi.responses import StreamingResponse
from agenton.compositor import LayerRegistry
from dify_agent.protocol.schemas import CreateRunRequest, CreateRunResponse, RunEventsResponse, RunStatusResponse
from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor, create_default_layer_registry
from dify_agent.runtime.layer_exit_signals import validate_layer_exit_signals
from dify_agent.runtime.run_scheduler import RunScheduler, SchedulerStoppingError
from dify_agent.runtime.user_prompt_validation import EMPTY_USER_PROMPTS_ERROR, has_non_blank_user_prompt
from dify_agent.server.sse import sse_event_stream
@ -47,6 +48,7 @@ def create_runs_router(
request.compositor,
registry=resolved_get_layer_registry(),
)
validate_layer_exit_signals(compositor, request.layer_exit_signals)
except Exception as exc:
raise HTTPException(status_code=422, detail=str(exc)) from exc
if not has_non_blank_user_prompt(compositor.user_prompts):

View File

@ -261,10 +261,13 @@ def test_snapshot_rejects_active_sessions_and_excludes_handles() -> None:
asyncio.run(run())
snapshot = compositor.snapshot_session(session)
assert snapshot.model_dump(mode="json") == {
dumped = snapshot.model_dump(mode="json")
assert dumped == {
"schema_version": 1,
"layers": [{"name": "handle", "state": "closed", "runtime_state": {"resource_id": "abc"}}],
}
assert "_entry_stack" not in str(dumped)
assert "_owner" not in str(dumped)
def test_restore_validates_runtime_state_and_resume_rehydrates_handles() -> None:

View File

@ -3,7 +3,9 @@ from collections import OrderedDict
from collections.abc import Iterator
from dataclasses import dataclass, field
from itertools import count
from typing import cast
import pytest
from pydantic import BaseModel, ConfigDict
from typing_extensions import override
@ -12,6 +14,7 @@ from agenton.layers import (
ExitIntent,
EmptyLayerConfig,
EmptyRuntimeHandles,
EmptyRuntimeState,
LayerControl,
LifecycleState,
NoLayerDeps,
@ -296,3 +299,261 @@ def test_runtime_state_is_per_session_and_survives_suspend_resume_delete() -> No
"deleted_runtime_id": 2,
}
assert not hasattr(layer, "runtime_id")
class ResourceHandles(BaseModel):
    """Runtime-only handle container exposing the layer's currently open resource."""

    # ``None`` outside an entered context; set while a RecordingResource is open.
    resource: "RecordingResource | None" = None
    model_config = ConfigDict(arbitrary_types_allowed=True, validate_assignment=True, extra="forbid")
class RecordingResource:
    """Async context manager that appends ``enter:<label>``/``exit:<label>`` markers.

    Instances share the caller-supplied ``events`` list so tests can assert the
    exact interleaving of lifecycle hooks and resource open/close operations.
    """

    events: list[str]
    label: str
    closed: bool

    def __init__(self, events: list[str], label: str) -> None:
        self.events = events
        self.label = label
        self.closed = False

    def _log(self, phase: str) -> None:
        """Append one ``<phase>:<label>`` marker to the shared event list."""
        self.events.append(f"{phase}:{self.label}")

    async def __aenter__(self) -> "RecordingResource":
        self._log("enter")
        return self

    async def __aexit__(self, exc_type: object, exc: object, traceback: object) -> None:
        self.closed = True
        self._log("exit")
@dataclass(slots=True)
class ResourceLayer(PlainLayer[NoLayerDeps, EmptyLayerConfig, EmptyRuntimeState, ResourceHandles]):
    """Test layer that opens one RecordingResource per create/resume hook.

    Each hook appends its name to ``events`` and can be made to raise via the
    ``fail_*`` flags, letting tests observe how the per-entry resource stack
    unwinds on both success and failure paths.
    """

    # Chronological log shared with every RecordingResource this layer opens.
    events: list[str] = field(default_factory=list)
    # Every resource ever opened, in order, so tests can check ``closed`` later.
    resources: list[RecordingResource] = field(default_factory=list)
    # When set, the matching hook raises RuntimeError after doing its work.
    fail_create: bool = False
    fail_resume: bool = False
    fail_suspend: bool = False
    fail_delete: bool = False

    @override
    async def on_context_create(self, control: LayerControl[EmptyRuntimeState, ResourceHandles]) -> None:
        # Open the resource before the optional failure so it is already on the
        # entry stack when the hook raises.
        await self._open_resource(control, "create")
        self.events.append("create")
        if self.fail_create:
            raise RuntimeError("create failed")

    @override
    async def on_context_resume(self, control: LayerControl[EmptyRuntimeState, ResourceHandles]) -> None:
        await self._open_resource(control, "resume")
        self.events.append("resume")
        if self.fail_resume:
            raise RuntimeError("resume failed")

    @override
    async def on_context_suspend(self, control: LayerControl[EmptyRuntimeState, ResourceHandles]) -> None:
        self.events.append("suspend")
        # Drop the handle only; the entry stack closes the resource afterwards.
        control.runtime_handles.resource = None
        if self.fail_suspend:
            raise RuntimeError("suspend failed")

    @override
    async def on_context_delete(self, control: LayerControl[EmptyRuntimeState, ResourceHandles]) -> None:
        self.events.append("delete")
        control.runtime_handles.resource = None
        if self.fail_delete:
            raise RuntimeError("delete failed")

    async def _open_resource(
        self,
        control: LayerControl[EmptyRuntimeState, ResourceHandles],
        label: str,
    ) -> None:
        """Enter a labelled resource on the control's entry stack and publish it."""
        resource = await control.enter_async_resource(RecordingResource(self.events, label))

        async def cleanup() -> None:
            self.events.append(f"cleanup:{label}")

        control.add_async_cleanup(cleanup)
        control.runtime_handles.resource = resource
        self.resources.append(resource)
def _resource_control(control: LayerControl) -> LayerControl[EmptyRuntimeState, ResourceHandles]:
    """Narrow a generic control to the ResourceLayer schemas (type checkers only)."""
    typed = cast(LayerControl[EmptyRuntimeState, ResourceHandles], control)
    return typed
def test_entry_resource_stack_closes_resources_on_suspend_and_delete() -> None:
    """Resources opened in create/resume are closed when the entry exits.

    First entry suspends via ``suspend_on_exit``; second entry keeps the default
    delete intent. Both must run cleanups and close the opened resource.
    """
    layer = ResourceLayer()
    compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("resource", layer)]))
    session = compositor.new_session()
    async def run() -> None:
        async with compositor.enter(session) as active_session:
            control = _resource_control(active_session.layer("resource"))
            # The create hook published the first opened resource via handles.
            assert control.runtime_handles.resource is layer.resources[0]
            assert layer.resources[0].closed is False
            active_session.suspend_on_exit()
        # After suspend: handle cleared and the stack closed the resource.
        assert _resource_control(session.layer("resource")).runtime_handles.resource is None
        assert layer.resources[0].closed is True
        async with compositor.enter(session):
            control = _resource_control(session.layer("resource"))
            # Resuming opened a fresh resource rather than reviving the old one.
            assert control.runtime_handles.resource is layer.resources[1]
            assert layer.resources[1].closed is False
        assert _resource_control(session.layer("resource")).runtime_handles.resource is None
        assert layer.resources[1].closed is True
    asyncio.run(run())
    # Hooks run first; registered cleanups fire before the resource's own exit.
    assert layer.events == [
        "enter:create",
        "create",
        "suspend",
        "cleanup:create",
        "exit:create",
        "enter:resume",
        "resume",
        "delete",
        "cleanup:resume",
        "exit:resume",
    ]
def test_entry_resource_stack_closes_resources_when_create_or_resume_raises() -> None:
    """A failing create/resume hook still closes resources opened on the stack."""
    async def fail_create() -> None:
        layer = ResourceLayer(fail_create=True)
        compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("resource", layer)]))
        session = compositor.new_session()
        with pytest.raises(RuntimeError, match="create failed"):
            async with compositor.enter(session):
                pass
        # The failed enter leaves the layer in NEW with no live entry stack.
        assert session.layer("resource").state is LifecycleState.NEW
        assert layer.resources[0].closed is True
        assert session.layer("resource")._entry_stack is None
    async def fail_resume() -> None:
        layer = ResourceLayer()
        compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("resource", layer)]))
        session = compositor.new_session()
        async with compositor.enter(session) as active_session:
            active_session.suspend_on_exit()
        layer.fail_resume = True
        with pytest.raises(RuntimeError, match="resume failed"):
            async with compositor.enter(session):
                pass
        # The failed resume leaves the layer SUSPENDED; the new resource is closed.
        assert session.layer("resource").state is LifecycleState.SUSPENDED
        assert layer.resources[1].closed is True
        assert session.layer("resource")._entry_stack is None
    asyncio.run(fail_create())
    asyncio.run(fail_resume())
def test_entry_resource_stack_closes_resources_when_body_raises() -> None:
    """An exception raised in the entered body still unwinds the resource stack."""
    resource_layer = ResourceLayer()
    compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
        layers=OrderedDict([("resource", resource_layer)])
    )
    session = compositor.new_session()

    async def scenario() -> None:
        with pytest.raises(RuntimeError, match="body failed"):
            async with compositor.enter(session):
                raise RuntimeError("body failed")

    asyncio.run(scenario())
    control = session.layer("resource")
    assert control.state is LifecycleState.CLOSED
    assert control._entry_stack is None
    assert resource_layer.resources[0].closed is True
def test_failed_suspend_hook_exits_active_state_and_closes_resources() -> None:
    """A raising suspend hook still leaves ACTIVE and frees the entry's resources."""
    layer = ResourceLayer(fail_suspend=True)
    compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("resource", layer)]))
    session = compositor.new_session()
    async def run() -> None:
        with pytest.raises(RuntimeError, match="suspend failed"):
            async with compositor.enter(session) as active_session:
                active_session.suspend_on_exit()
        control = session.layer("resource")
        # The state transition to SUSPENDED happened despite the hook failure.
        assert control.state is LifecycleState.SUSPENDED
        assert layer.resources[0].closed is True
        assert control._entry_stack is None
        # With the entry stack gone, stack APIs and active-control checks reject use.
        with pytest.raises(RuntimeError, match="entry resource stack is not active"):
            await control.enter_async_resource(RecordingResource([], "unused"))
        with pytest.raises(RuntimeError, match="requires an active LayerControl"):
            _ = layer.require_control(control, active=True)
    asyncio.run(run())
def test_failed_delete_hook_exits_active_state_and_closes_resources() -> None:
    """A raising delete hook still leaves ACTIVE and frees the entry's resources."""
    layer = ResourceLayer(fail_delete=True)
    compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("resource", layer)]))
    session = compositor.new_session()
    async def run() -> None:
        with pytest.raises(RuntimeError, match="delete failed"):
            async with compositor.enter(session):
                pass
        control = session.layer("resource")
        # The state transition to CLOSED happened despite the hook failure.
        assert control.state is LifecycleState.CLOSED
        assert layer.resources[0].closed is True
        assert control._entry_stack is None
        # With the entry stack gone, stack APIs and active-control checks reject use.
        with pytest.raises(RuntimeError, match="entry resource stack is not active"):
            await control.enter_async_resource(RecordingResource([], "unused"))
        with pytest.raises(RuntimeError, match="requires an active LayerControl"):
            _ = layer.require_control(control, active=True)
    asyncio.run(run())
def test_resource_stack_api_raises_outside_active_entry_stack() -> None:
    """Both stack APIs reject use on a control that has never been entered."""
    detached_control = LayerControl()

    async def cleanup() -> None:
        raise AssertionError("cleanup should not be registered")

    expected = "entry resource stack is not active"
    with pytest.raises(RuntimeError, match=expected):
        detached_control.add_async_cleanup(cleanup)
    with pytest.raises(RuntimeError, match=expected):
        asyncio.run(detached_control.enter_async_resource(RecordingResource([], "unused")))
def test_require_control_validates_owner_schema_and_active_state() -> None:
    """require_control rejects inactive, foreign, and schema-mismatched controls."""
    layer = ResourceLayer()
    other_layer = TraceLayer()
    compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
        layers=OrderedDict([("resource", layer), ("other", other_layer)])
    )
    session = compositor.new_session()
    control = session.layer("resource")
    # active=True demands an entered control; this session was never entered.
    with pytest.raises(RuntimeError, match="requires an active LayerControl"):
        _ = layer.require_control(control, active=True)
    # A control owned by a different layer in the same session is rejected.
    with pytest.raises(RuntimeError, match="belongs to layer 'other'"):
        _ = layer.require_control(session.layer("other"))
    bad_control = LayerControl()
    bad_session = CompositorSession(OrderedDict([("resource", bad_control), ("other", other_layer.new_control())]))
    bad_session._bind_owner(compositor)
    # A default LayerControl carries the wrong runtime_handles schema for this layer.
    with pytest.raises(TypeError, match="runtime_handles must be ResourceHandles"):
        _ = layer.require_control(bad_control)
    async def run() -> None:
        async with compositor.enter(session) as active_session:
            active_control = active_session.layer("resource")
            # Happy path: the same control object is returned unchanged.
            assert layer.require_control(active_control, active=True) is active_control
    asyncio.run(run())

View File

@ -45,7 +45,6 @@ class DifyLLMAdapterModelTests(unittest.IsolatedAsyncioTestCase):
return DifyPluginDaemonProvider(
tenant_id="tenant-1",
plugin_id="langgenius/openai",
plugin_provider="openai",
plugin_daemon_url="http://plugin-daemon",
plugin_daemon_api_key="daemon-secret",
user_id=user_id,
@ -157,6 +156,7 @@ class DifyLLMAdapterModelTests(unittest.IsolatedAsyncioTestCase):
adapter = DifyLLMAdapterModel(
"demo-model",
self.make_provider(user_id="user-123"),
model_provider="openai",
credentials={"api_key": "secret"},
model_settings={"temperature": 0.2, "stop_sequences": ["DEFAULT_STOP"]},
)
@ -168,7 +168,7 @@ class DifyLLMAdapterModelTests(unittest.IsolatedAsyncioTestCase):
)
self.assertEqual(response.model_name, "demo-model")
self.assertEqual(response.provider_name, "DifyPlugin/openai")
self.assertEqual(response.provider_name, "DifyPlugin/langgenius/openai")
self.assertEqual(response.usage.input_tokens, 11)
self.assertEqual(response.usage.output_tokens, 7)
self.assertEqual(response.parts[0].part_kind, "text")
@ -178,6 +178,7 @@ class DifyLLMAdapterModelTests(unittest.IsolatedAsyncioTestCase):
http_client = httpx.AsyncClient()
provider = self.make_provider(http_client=http_client)
self.assertEqual(provider.name, "DifyPlugin/langgenius/openai")
self.assertIs(provider.client.http_client, http_client)
async with provider:
pass
@ -197,6 +198,7 @@ class DifyLLMAdapterModelTests(unittest.IsolatedAsyncioTestCase):
adapter = DifyLLMAdapterModel(
"demo-model",
self.make_provider(),
model_provider="openai",
credentials={"api_key": "secret"},
)
@ -256,6 +258,7 @@ class DifyLLMAdapterModelTests(unittest.IsolatedAsyncioTestCase):
adapter = DifyLLMAdapterModel(
"demo-model",
self.make_provider(),
model_provider="openai",
credentials={"api_key": "secret"},
)
@ -288,6 +291,7 @@ class DifyLLMAdapterModelTests(unittest.IsolatedAsyncioTestCase):
adapter = DifyLLMAdapterModel(
"demo-model",
self.make_provider(),
model_provider="openai",
credentials={"api_key": "secret"},
)
@ -319,6 +323,7 @@ class DifyLLMAdapterModelTests(unittest.IsolatedAsyncioTestCase):
adapter = DifyLLMAdapterModel(
"demo-model",
self.make_provider(),
model_provider="openai",
credentials={"api_key": "secret"},
)
@ -345,6 +350,7 @@ class DifyLLMAdapterModelTests(unittest.IsolatedAsyncioTestCase):
adapter = DifyLLMAdapterModel(
"demo-model",
self.make_provider(),
model_provider="openai",
credentials={"api_key": "secret"},
)
@ -374,6 +380,7 @@ class DifyLLMAdapterModelTests(unittest.IsolatedAsyncioTestCase):
adapter = DifyLLMAdapterModel(
"demo-model",
self.make_provider(),
model_provider="openai",
credentials={"api_key": "secret"},
)

View File

@ -38,19 +38,30 @@ def test_dify_plugin_layer_config_forbids_runtime_settings() -> None:
def test_dify_plugin_llm_config_accepts_scalar_credentials_and_model_settings() -> None:
credential: DifyPluginCredentialValue = "secret"
config = DifyPluginLLMLayerConfig(
provider="openai",
model_provider="openai",
model="gpt-4o-mini",
credentials={"api_key": credential, "enabled": True, "retries": 2, "ratio": 0.5, "empty": None},
model_settings={"temperature": 0.2, "max_tokens": 64},
)
assert config.model_provider == "openai"
assert config.credentials == {"api_key": "secret", "enabled": True, "retries": 2, "ratio": 0.5, "empty": None}
assert config.model_settings == {"temperature": 0.2, "max_tokens": 64}
with pytest.raises(ValidationError):
_ = DifyPluginLLMLayerConfig.model_validate(
{
"provider": "openai",
"model_provider": "openai",
"model": "gpt-4o-mini",
"credentials": {"nested": {"not": "allowed"}},
}
)
def test_dify_plugin_llm_config_rejects_old_provider_field() -> None:
    """The field was renamed to ``model_provider``; bare ``provider`` must fail."""
    legacy_payload = {
        "provider": "openai",
        "model": "gpt-4o-mini",
    }
    with pytest.raises(ValidationError):
        _ = DifyPluginLLMLayerConfig.model_validate(legacy_payload)

View File

@ -5,7 +5,7 @@ from typing import cast
import pytest
from agenton.compositor import Compositor
from agenton.layers import EmptyRuntimeState, LayerControl, PlainPromptType, PlainToolType
from agenton.layers import EmptyRuntimeHandles, EmptyRuntimeState, LayerControl, PlainPromptType, PlainToolType
from dify_agent.adapters.llm import DifyLLMAdapterModel
from dify_agent.layers.dify_plugin.configs import DifyPluginLLMLayerConfig, DifyPluginLayerConfig
from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer
@ -24,7 +24,7 @@ def _plugin_layer() -> DifyPluginLayer:
def _llm_layer() -> DifyPluginLLMLayer:
return DifyPluginLLMLayer.from_config(
DifyPluginLLMLayerConfig(
provider="openai",
model_provider="openai",
model="demo-model",
credentials={"api_key": "secret"},
model_settings={"temperature": 0.2},
@ -36,36 +36,62 @@ def _plugin_control(control: LayerControl) -> LayerControl[EmptyRuntimeState, Di
return cast(LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles], control)
def test_dify_plugin_layer_get_provider_requires_active_context_and_uses_runtime_client() -> None:
def _llm_control(control: LayerControl) -> LayerControl[EmptyRuntimeState, EmptyRuntimeHandles]:
return cast(LayerControl[EmptyRuntimeState, EmptyRuntimeHandles], control)
def test_dify_plugin_layer_uses_resource_stack_and_get_daemon_provider_requires_active_control() -> None:
async def scenario() -> None:
plugin = _plugin_layer()
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("plugin", plugin)]))
session = compositor.new_session()
try:
_ = plugin.get_provider(_plugin_control(session.layer("plugin")), plugin_provider="openai")
except RuntimeError as e:
assert str(e) == "DifyPluginLayer.get_provider() requires an entered control with an open HTTP client."
else:
raise AssertionError("Expected RuntimeError.")
with pytest.raises(RuntimeError, match="requires an active LayerControl"):
_ = plugin.get_daemon_provider(_plugin_control(session.layer("plugin")))
async with compositor.enter(session):
async with compositor.enter(session) as active_session:
handles = cast(DifyPluginRuntimeHandles, cast(object, session.layer("plugin").runtime_handles))
client = handles.http_client
assert client is not None
provider = plugin.get_provider(_plugin_control(session.layer("plugin")), plugin_provider="openai")
assert provider.client.http_client is client
first_client = handles.http_client
assert first_client is not None
provider = plugin.get_daemon_provider(_plugin_control(session.layer("plugin")))
assert provider.name == "DifyPlugin/langgenius/openai"
assert provider.client.http_client is first_client
assert provider.client.tenant_id == "tenant-1"
assert provider.client.plugin_id == "langgenius/openai"
assert provider.client.provider == "openai"
assert provider.client.user_id == "user-1"
async with provider:
pass
assert client.is_closed is False
assert first_client.is_closed is False
active_session.suspend_on_exit()
assert client.is_closed is True
with pytest.raises(RuntimeError, match="entered control with an open HTTP client"):
_ = plugin.get_provider(_plugin_control(session.layer("plugin")), plugin_provider="openai")
assert handles.http_client is None
assert first_client.is_closed is True
with pytest.raises(RuntimeError, match="requires an active LayerControl"):
_ = plugin.get_daemon_provider(_plugin_control(session.layer("plugin")))
async with compositor.enter(session):
second_client = handles.http_client
assert second_client is not None
assert second_client is not first_client
assert handles.http_client is None
assert second_client.is_closed is True
asyncio.run(scenario())
def test_dify_plugin_layer_get_daemon_provider_rejects_wrong_control() -> None:
async def scenario() -> None:
plugin = _plugin_layer()
llm = _llm_layer()
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("plugin", plugin), ("llm", llm)]),
deps_name_mapping={"llm": {"plugin": "plugin"}},
)
async with compositor.enter() as session:
with pytest.raises(RuntimeError, match="belongs to layer 'llm'"):
_ = plugin.get_daemon_provider(_plugin_control(session.layer("llm")))
asyncio.run(scenario())
@ -79,15 +105,23 @@ def test_dify_plugin_llm_layer_builds_adapter_model_from_dependency_provider() -
deps_name_mapping={"llm": {"plugin": "plugin"}},
)
async with compositor.enter() as session:
model = llm.get_model(session.layer("llm"))
session = compositor.new_session()
with pytest.raises(RuntimeError, match="requires an active LayerControl"):
_ = llm.get_model(_llm_control(session.layer("llm")))
async with compositor.enter(session):
model = llm.get_model(_llm_control(session.layer("llm")))
assert isinstance(model, DifyLLMAdapterModel)
assert model.model_name == "demo-model"
assert model.model_provider == "openai"
assert model.credentials == {"api_key": "secret"}
assert model.provider.name == "DifyPlugin/openai"
assert model.provider.name == "DifyPlugin/langgenius/openai"
handles = cast(DifyPluginRuntimeHandles, cast(object, session.layer("plugin").runtime_handles))
assert model.provider.client.http_client is handles.http_client
with pytest.raises(RuntimeError, match="belongs to layer 'plugin'"):
_ = llm.get_model(_llm_control(session.layer("plugin")))
asyncio.run(scenario())
@ -143,14 +177,8 @@ def test_dify_plugin_layer_concurrent_sessions_use_separate_controls_and_clients
assert second_client is not None
assert first_client is not second_client
first_provider = plugin.get_provider(
_plugin_control(first_session.layer("plugin")),
plugin_provider="openai",
)
second_provider = plugin.get_provider(
_plugin_control(second_session.layer("plugin")),
plugin_provider="openai",
)
first_provider = plugin.get_daemon_provider(_plugin_control(first_session.layer("plugin")))
second_provider = plugin.get_daemon_provider(_plugin_control(second_session.layer("plugin")))
assert first_provider.client.http_client is first_client
assert second_provider.client.http_client is second_client

View File

@ -2,11 +2,14 @@ import pytest
from pydantic import ValidationError
from pydantic_ai.messages import FinalResultEvent
from agenton.layers import ExitIntent
from agenton.compositor import CompositorSessionSnapshot
import dify_agent.protocol as protocol_exports
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID
from dify_agent.protocol.schemas import (
RUN_EVENT_ADAPTER,
CreateRunRequest,
LayerExitSignals,
PydanticAIStreamRunEvent,
RunFailedEvent,
RunFailedEventData,
@ -62,6 +65,34 @@ def test_create_run_request_rejects_agent_profile_and_model_layer_id_is_public()
)
def test_layer_exit_signals_default_to_suspend_and_are_public() -> None:
    """Omitted signals default every layer to suspend; the schema is exported."""
    assert protocol_exports.LayerExitSignals is LayerExitSignals
    request = CreateRunRequest.model_validate({"compositor": {"layers": []}})
    signals = request.layer_exit_signals
    assert signals.layers == {}
    assert signals.default is ExitIntent.SUSPEND
def test_layer_exit_signals_accept_layer_overrides() -> None:
    """Per-layer intents and a non-default global intent round-trip from JSON."""
    payload = {
        "compositor": {"layers": []},
        "layer_exit_signals": {
            "default": "delete",
            "layers": {"prompt": "suspend", "llm": "delete"},
        },
    }
    request = CreateRunRequest.model_validate(payload)
    signals = request.layer_exit_signals
    assert signals.default is ExitIntent.DELETE
    assert signals.layers == {"prompt": ExitIntent.SUSPEND, "llm": ExitIntent.DELETE}
def test_layer_exit_signals_reject_extra_fields() -> None:
    """The schema forbids unknown keys so request typos fail at validation time."""
    bad_payload = {"default": "suspend", "unknown": "value"}
    with pytest.raises(ValidationError):
        _ = LayerExitSignals.model_validate(bad_payload)
@pytest.mark.parametrize("event_type", ["agent_output", "session_snapshot"])
def test_removed_non_terminal_payload_events_are_rejected(event_type: str) -> None:
with pytest.raises(ValidationError):

View File

@ -6,7 +6,8 @@ import pytest
from pydantic import JsonValue
from agenton.compositor import CompositorConfig, LayerNodeConfig
from dify_agent.protocol.schemas import CreateRunRequest, RunEvent, RunStatus
from agenton.layers import ExitIntent
from dify_agent.protocol.schemas import CreateRunRequest, LayerExitSignals, RunEvent, RunStatus
from dify_agent.runtime.run_scheduler import RunScheduler, SchedulerStoppingError
from dify_agent.server.schemas import RunRecord
@ -135,6 +136,21 @@ def test_create_run_rejects_blank_prompt_before_persisting() -> None:
asyncio.run(scenario())
def test_create_run_rejects_unknown_layer_exit_signal_before_persisting() -> None:
    """Signal validation runs before any run record is written to the store."""
    async def scenario() -> None:
        fake_store = FakeStore()
        scheduler = RunScheduler(store=fake_store)
        bad_request = _request()
        bad_request.layer_exit_signals = LayerExitSignals(layers={"missing": ExitIntent.DELETE})
        with pytest.raises(ValueError, match="missing"):
            await scheduler.create_run(bad_request)
        # Nothing was persisted for the rejected request.
        assert fake_store.records == {}

    asyncio.run(scenario())
def test_create_run_rejects_after_shutdown_starts() -> None:
async def scenario() -> None:
scheduler = RunScheduler(store=FakeStore())

View File

@ -4,13 +4,13 @@ import pytest
from pydantic_ai.models.test import TestModel
from agenton.compositor import CompositorConfig, LayerNodeConfig
from agenton.layers import LayerControl
from agenton.layers import ExitIntent, LayerControl, LifecycleState
from agenton_collections.layers.plain import PromptLayerConfig
from dify_agent.layers.dify_plugin.configs import DifyPluginLLMLayerConfig, DifyPluginLayerConfig
from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer
from dify_agent.layers.dify_plugin.plugin_layer import DifyPluginRuntimeHandles
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID
from dify_agent.protocol.schemas import CreateRunRequest, RunSucceededEvent
from dify_agent.protocol.schemas import CreateRunRequest, LayerExitSignals, RunSucceededEvent
from dify_agent.runtime.event_sink import InMemoryRunEventSink
from dify_agent.runtime.runner import AgentRunRunner, AgentRunValidationError
@ -20,6 +20,7 @@ def _request(
*,
llm_layer_name: str = DIFY_AGENT_MODEL_LAYER_ID,
plugin_layer_name: str = "plugin",
layer_exit_signals: LayerExitSignals | None = None,
) -> CreateRunRequest:
return CreateRunRequest(
compositor=CompositorConfig(
@ -39,13 +40,14 @@ def _request(
type="dify.plugin.llm",
deps={"plugin": plugin_layer_name},
config=DifyPluginLLMLayerConfig(
provider="openai",
model_provider="openai",
model="demo-model",
credentials={"api_key": "secret"},
),
),
]
)
),
layer_exit_signals=layer_exit_signals or LayerExitSignals(),
)
@ -78,9 +80,72 @@ def test_runner_emits_terminal_success_and_snapshot(monkeypatch: pytest.MonkeyPa
"renamed-plugin",
DIFY_AGENT_MODEL_LAYER_ID,
]
assert [layer.state for layer in terminal.data.session_snapshot.layers] == [
LifecycleState.SUSPENDED,
LifecycleState.SUSPENDED,
LifecycleState.SUSPENDED,
]
assert sink.statuses["run-1"] == "succeeded"
def test_runner_applies_layer_exit_signal_overrides_to_success_snapshot(monkeypatch: pytest.MonkeyPatch) -> None:
    """Per-layer delete overrides surface as CLOSED layers in the success snapshot."""
    def fake_get_model(_self: DifyPluginLLMLayer, _control: LayerControl):
        # Skip real model resolution; TestModel answers deterministically.
        return TestModel(custom_output_text="done")  # pyright: ignore[reportReturnType]
    monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model)
    request = _request(
        layer_exit_signals=LayerExitSignals(
            default=ExitIntent.SUSPEND,
            layers={"prompt": ExitIntent.DELETE, DIFY_AGENT_MODEL_LAYER_ID: ExitIntent.DELETE},
        )
    )
    sink = InMemoryRunEventSink()
    asyncio.run(AgentRunRunner(sink=sink, request=request, run_id="run-exit").run())
    terminal = sink.events["run-exit"][-1]
    assert isinstance(terminal, RunSucceededEvent)
    # Overridden layers end CLOSED; the untouched plugin layer keeps the suspend default.
    assert {layer.name: layer.state for layer in terminal.data.session_snapshot.layers} == {
        "prompt": LifecycleState.CLOSED,
        "plugin": LifecycleState.SUSPENDED,
        DIFY_AGENT_MODEL_LAYER_ID: LifecycleState.CLOSED,
    }
def test_runner_rejects_unknown_layer_exit_signal_id() -> None:
    """A signal naming a layer absent from the compositor fails the run."""
    run_id = "run-unknown-signal"
    bad_signals = LayerExitSignals(layers={"missing": ExitIntent.DELETE})
    sink = InMemoryRunEventSink()
    runner = AgentRunRunner(sink=sink, request=_request(layer_exit_signals=bad_signals), run_id=run_id)
    with pytest.raises(AgentRunValidationError, match="missing"):
        asyncio.run(runner.run())
    emitted = [event.type for event in sink.events[run_id]]
    assert emitted == ["run_started", "run_failed"]
    assert sink.statuses[run_id] == "failed"
def test_runner_applies_layer_exit_signals_before_model_resolution_failure(monkeypatch: pytest.MonkeyPatch) -> None:
    """Exit intents are already applied by the time model resolution runs."""
    def fake_get_model(self: DifyPluginLLMLayer, control: LayerControl):
        # Observed from inside the entered session: the overrides are in effect.
        plugin_control = control.control_for(self.deps.plugin)
        assert control.exit_intent is ExitIntent.DELETE
        assert plugin_control.exit_intent is ExitIntent.SUSPEND
        raise RuntimeError("model unavailable")
    monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model)
    request = _request(
        layer_exit_signals=LayerExitSignals(
            default=ExitIntent.DELETE,
            layers={"plugin": ExitIntent.SUSPEND},
        )
    )
    sink = InMemoryRunEventSink()
    # The resolution failure is surfaced as a validation error with a failed run.
    with pytest.raises(AgentRunValidationError, match="model unavailable"):
        asyncio.run(AgentRunRunner(sink=sink, request=request, run_id="run-model-failure").run())
    assert [event.type for event in sink.events["run-model-failure"]] == ["run_started", "run_failed"]
    assert sink.statuses["run-model-failure"] == "failed"
def test_runner_fails_empty_user_prompts() -> None:
request = _request("")
sink = InMemoryRunEventSink()

View File

@ -97,7 +97,7 @@ def test_create_run_accepts_valid_full_plugin_graph() -> None:
"type": "dify.plugin.llm",
"deps": {"plugin": "plugin-renamed"},
"config": {
"provider": "openai",
"model_provider": "openai",
"model": "gpt-4o-mini",
"credentials": {"api_key": "secret"},
"model_settings": {"temperature": 0.2},
@ -112,6 +112,30 @@ def test_create_run_accepts_valid_full_plugin_graph() -> None:
assert response.json() == {"run_id": "run-1", "status": "running"}
def test_create_run_rejects_unknown_layer_exit_signal_before_scheduling() -> None:
    """Unknown layer ids in exit signals yield HTTP 422 from the runs router."""
    from fastapi import FastAPI

    app = FastAPI()
    router = create_runs_router(lambda: FakeStore(), lambda: FakeScheduler())  # pyright: ignore[reportArgumentType]
    app.include_router(router)
    payload = {
        "compositor": {
            "schema_version": 1,
            "layers": [{"name": "prompt", "type": "plain.prompt", "config": {"user": "hello"}}],
        },
        "layer_exit_signals": {"layers": {"missing": "delete"}},
    }
    response = TestClient(app).post("/runs", json=payload)
    assert response.status_code == 422
    assert "missing" in response.json()["detail"]
def test_create_run_returns_503_when_scheduler_is_stopping() -> None:
from fastapi import FastAPI