add agenton dependency control lookup

This commit is contained in:
盐粒 Yanli 2026-05-12 03:49:38 +08:00
parent 208012e268
commit ab0b4c45cb
11 changed files with 644 additions and 84 deletions

View File

@ -67,9 +67,14 @@ Methods:
- `suspend_on_exit() -> None`
- `delete_on_exit() -> None`
- `control_for(dep_layer) -> LayerControl`: resolve the unique dependency control
whose resolved target is `dep_layer` in the same session.
- `control_for(dep_name, dep_layer) -> LayerControl`: resolve a named dependency
control when multiple dependency fields could point at the same layer instance.
`runtime_state` is serialized in session snapshots. `runtime_handles` is never
serialized and should be rehydrated from runtime state in resume hooks.
serialized and should be rehydrated from runtime state in resume hooks. Private
owner links used by `control_for` are runtime-only and are not snapshotted.
### Schema defaults and lifecycle enums
@ -107,12 +112,12 @@ JSON objects. Use live instances for Python objects and callables.
`LayerRegistry` manually registers config-backed layer classes.
- `register_layer(layer_type, type_id=None) -> None`
- `register_layer(layer_type, type_id=None, factory=None) -> None`
- `resolve(type_id) -> LayerDescriptor`
- `descriptors() -> Mapping[str, LayerDescriptor]`
`LayerDescriptor` exposes `type_id`, `layer_type`, `config_type`,
`runtime_state_type`, and `runtime_handles_type`.
`runtime_state_type`, `runtime_handles_type`, and optional `factory`.
### Builder
@ -128,6 +133,11 @@ JSON objects. Use live instances for Python objects and callables.
`Compositor[PromptT, ToolT, LayerPromptT, LayerToolT, UserPromptT, LayerUserPromptT]`
owns the ordered layer graph.
Dependency binding uses explicit `deps={dep_name: target_layer_name}` mappings
first, then implicit same-name layer binding. Optional dependencies without a
target are recorded as absent so `LayerControl.control_for(...)` raises `KeyError`
rather than returning a control.
Construction:
- `Compositor(layers=..., deps_name_mapping=..., ...)`

View File

@ -82,6 +82,29 @@ compositor = (
Use `.add_instance()` for layers that require Python objects or callables, such
as `ObjectLayer`, `ToolsLayer`, and dynamic tool layers.
## Dependency controls
Layer dependencies bind layer instances on `self.deps`. When a layer method also
needs the dependency's per-session state or handles, pass the current layer's
`LayerControl` into that method and resolve the dependency control from the same
session:
```python {test="skip" lint="skip"}
class ModelDeps(LayerDeps):
plugin: PluginLayer
@dataclass
class ModelLayer(PlainLayer[ModelDeps]):
def make_model(self, control: LayerControl) -> Model:
plugin_control = control.control_for(self.deps.plugin)
return self.deps.plugin.make_provider(plugin_control)
```
Use `control.control_for(dep_name, dep_layer)` when more than one dependency
field can point at the same layer instance. Optional dependencies that were not
bound have no control and raise `KeyError` if requested.
## System prompts and user prompts
Layers expose three prompt surfaces:

View File

@ -10,6 +10,10 @@ Layer instances are shared graph/capability definitions owned by the compositor.
Per-session runtime state belongs to each session's ``LayerControl`` objects,
not to the shared layer instances, so different sessions can enter the same
compositor without leaking generated ids or handles through ``self``.
Controls know their owning session and layer id privately so code running inside a
layer can use ``LayerControl.control_for`` to resolve dependency controls from the
same session. These owner links are runtime metadata and are never serialized in
session snapshots.
Dependency mappings use layer-local dependency names as keys and compositor
layer names as values. System prompt aggregation depends on insertion order:
@ -57,6 +61,8 @@ LayerToolT = TypeVar("LayerToolT", default=AllToolTypes)
UserPromptT = TypeVar("UserPromptT", default=AllUserPromptTypes)
LayerUserPromptT = TypeVar("LayerUserPromptT", default=AllUserPromptTypes)
LayerT = TypeVar("LayerT", bound=Layer[Any, Any, Any, Any, Any, Any, Any])
DepRuntimeStateT = TypeVar("DepRuntimeStateT", bound=BaseModel)
DepRuntimeHandlesT = TypeVar("DepRuntimeHandlesT", bound=BaseModel)
type CompositorTransformer[InputT, OutputT] = Callable[[Sequence[InputT]], Sequence[OutputT]]
@ -205,14 +211,18 @@ class CompositorSession:
setting every layer's per-entry exit intent; ``layer`` allows explicit
per-layer control when callers need partial suspend/delete behavior. A mixed
session with any closed layer cannot be entered again because compositor
entry is all-or-none.
entry is all-or-none. The session also carries private owner metadata so its
controls can resolve dependency controls; snapshots include only public
lifecycle/runtime state.
"""
__slots__ = ("layer_controls",)
__slots__ = ("layer_controls", "_owner_compositor")
layer_controls: OrderedDict[str, LayerControl]
_owner_compositor: "Compositor[Any, Any, Any, Any, Any, Any] | None"
def __init__(self, layer_names: Iterable[str] | Mapping[str, LayerControl]) -> None:
self._owner_compositor = None
if isinstance(layer_names, MappingABC):
self.layer_controls = OrderedDict(layer_names.items())
return
@ -230,7 +240,87 @@ class CompositorSession:
def layer(self, name: str) -> LayerControl:
"""Return the layer control for ``name`` or raise ``KeyError``."""
return self.layer_controls[name]
try:
return self.layer_controls[name]
except KeyError as e:
raise KeyError(f"CompositorSession has no layer control named '{name}'.") from e
def _bind_owner(self, compositor: "Compositor[Any, Any, Any, Any, Any, Any]") -> None:
"""Bind runtime owner links on this session and all child controls."""
self._owner_compositor = compositor
for layer_id, control in self.layer_controls.items():
control._bind_owner(self, layer_id)
def _control_for_dependency(
self,
owner_layer_id: str,
dep_name: str | None,
dep_layer: Layer[Any, Any, Any, Any, Any, DepRuntimeStateT, DepRuntimeHandlesT],
) -> LayerControl[DepRuntimeStateT, DepRuntimeHandlesT]:
"""Resolve a dependency control from the owner's resolved dependency targets."""
if self._owner_compositor is None:
raise RuntimeError("CompositorSession is not attached to a compositor.")
if dep_name is None:
return self._control_for_unique_dependency(owner_layer_id, dep_layer)
return self._control_for_named_dependency(owner_layer_id, dep_name, dep_layer)
def _control_for_unique_dependency(
self,
owner_layer_id: str,
dep_layer: Layer[Any, Any, Any, Any, Any, DepRuntimeStateT, DepRuntimeHandlesT],
) -> LayerControl[DepRuntimeStateT, DepRuntimeHandlesT]:
compositor = self._require_owner_compositor()
dep_targets = self._dependency_targets_for(owner_layer_id)
matches = [
(name, target_id)
for name, target_id in dep_targets.items()
if target_id is not None and compositor.layers[target_id] is dep_layer
]
if not matches:
raise KeyError(
f"Layer '{owner_layer_id}' has no dependency target bound to the provided "
f"{type(dep_layer).__name__} instance."
)
if len(matches) > 1:
names = ", ".join(name for name, _target_id in matches)
raise ValueError(
f"Layer '{owner_layer_id}' has multiple dependency fields bound to the provided "
f"{type(dep_layer).__name__} instance: {names}. Pass dep_name explicitly."
)
_name, target_id = matches[0]
return cast(LayerControl[DepRuntimeStateT, DepRuntimeHandlesT], self.layer(target_id))
def _control_for_named_dependency(
self,
owner_layer_id: str,
dep_name: str,
dep_layer: Layer[Any, Any, Any, Any, Any, DepRuntimeStateT, DepRuntimeHandlesT],
) -> LayerControl[DepRuntimeStateT, DepRuntimeHandlesT]:
compositor = self._require_owner_compositor()
dep_targets = self._dependency_targets_for(owner_layer_id)
if dep_name not in dep_targets:
raise KeyError(f"Layer '{owner_layer_id}' has no resolved dependency named '{dep_name}'.")
target_id = dep_targets[dep_name]
if target_id is None:
raise KeyError(f"Layer '{owner_layer_id}' dependency '{dep_name}' is not bound to a target layer.")
if compositor.layers[target_id] is not dep_layer:
raise TypeError(
f"Layer '{owner_layer_id}' dependency '{dep_name}' resolves to layer '{target_id}', "
f"not the provided {type(dep_layer).__name__} instance."
)
return cast(LayerControl[DepRuntimeStateT, DepRuntimeHandlesT], self.layer(target_id))
def _require_owner_compositor(self) -> "Compositor[Any, Any, Any, Any, Any, Any]":
if self._owner_compositor is None:
raise RuntimeError("CompositorSession is not attached to a compositor.")
return self._owner_compositor
def _dependency_targets_for(self, owner_layer_id: str) -> Mapping[str, str | None]:
compositor = self._require_owner_compositor()
try:
return compositor._resolved_dep_targets[owner_layer_id]
except KeyError as e:
raise KeyError(f"Layer '{owner_layer_id}' is not defined in this compositor.") from e
class LayerSessionSnapshot(BaseModel):
@ -375,6 +465,7 @@ class Compositor(Generic[PromptT, ToolT, LayerPromptT, LayerToolT, UserPromptT,
user_prompt_transformer: CompositorTransformer[LayerUserPromptT, UserPromptT] | None = None
tool_transformer: CompositorTransformer[LayerToolT, ToolT] | None = None
_deps_bound: bool = field(default=False, init=False)
_resolved_dep_targets: dict[str, dict[str, str | None]] = field(default_factory=dict, init=False)
def __post_init__(self) -> None:
self._bind_deps(self.deps_name_mapping)
@ -401,24 +492,36 @@ class Compositor(Generic[PromptT, ToolT, LayerPromptT, LayerToolT, UserPromptT,
The outer mapping key is the layer being bound. The inner mapping key is
the dependency field declared by that layer's deps type, and the value is
the target layer name in this compositor.
the target layer name in this compositor. Explicit mappings win over
implicit same-name layer binding. Optional dependencies with no target are
recorded as ``None`` so ``LayerControl.control_for`` can distinguish
"declared but absent" from unknown dependency names.
"""
if self._deps_bound:
raise RuntimeError("Compositor deps are already bound.")
self._resolved_dep_targets = {}
for layer_name, layer in self.layers.items():
layer_deps = deps_name_mapping.get(layer_name, {})
try:
deps = {
dep_name: self.layers[target_layer_name]
for dep_name, target_layer_name in layer_deps.items()
}
except KeyError as e:
raise ValueError(
f"Layer '{layer_name}' has a dependency on layer '{e.args[0]}', "
"which is not defined in the builder."
) from e
layer.bind_deps({**self.layers, **deps})
for target_layer_name in layer_deps.values():
if target_layer_name not in self.layers:
raise ValueError(
f"Layer '{layer_name}' has a dependency on layer '{target_layer_name}', "
"which is not defined in the builder."
)
resolved_target_ids: dict[str, str | None] = {}
resolved_deps: dict[str, Layer[Any, Any, Any, Any, Any, Any, Any]] = {}
for dep_name in layer.dependency_names():
target_layer_name = layer_deps.get(dep_name)
if target_layer_name is None and dep_name in self.layers:
target_layer_name = dep_name
resolved_target_ids[dep_name] = target_layer_name
if target_layer_name is not None:
resolved_deps[dep_name] = self.layers[target_layer_name]
layer.bind_deps(resolved_deps)
self._resolved_dep_targets[layer_name] = resolved_target_ids
self._deps_bound = True
@overload
@ -446,9 +549,11 @@ class Compositor(Generic[PromptT, ToolT, LayerPromptT, LayerToolT, UserPromptT,
def new_session(self) -> CompositorSession:
"""Create a fresh lifecycle session matching this compositor's layer order."""
return CompositorSession(
session = CompositorSession(
OrderedDict((layer_name, layer.new_control()) for layer_name, layer in self.layers.items())
)
session._bind_owner(self)
return session
def snapshot_session(self, session: CompositorSession) -> CompositorSessionSnapshot:
"""Serialize non-active session lifecycle state and runtime state.
@ -499,7 +604,9 @@ class Compositor(Generic[PromptT, ToolT, LayerPromptT, LayerToolT, UserPromptT,
)
for layer_snapshot in snapshot.layers
)
return CompositorSession(controls)
session = CompositorSession(controls)
session._bind_owner(self)
return session
@asynccontextmanager
async def enter(
@ -514,6 +621,7 @@ class Compositor(Generic[PromptT, ToolT, LayerPromptT, LayerToolT, UserPromptT,
session = self.new_session()
self._validate_session(session)
self._ensure_session_can_enter(session)
session._bind_owner(self)
async with AsyncExitStack() as stack:
for layer_name, layer in self.layers.items():

View File

@ -15,7 +15,9 @@ Pydantic models because they are not accepted as graph input.
``Layer.bind_deps`` is the mutation point for dependency state. Layer
implementations should treat ``self.deps`` as unavailable until a compositor or
caller has resolved and bound dependencies.
caller has resolved and bound dependencies. When a layer needs a dependency's
session-local state or handles, use the current ``LayerControl.control_for`` API
instead of storing dependency controls on layer instances.
Layer async entry uses a caller-provided ``LayerControl`` as an explicit state
machine and per-session runtime owner. A fresh control starts in
@ -42,7 +44,20 @@ from contextlib import AbstractAsyncContextManager, asynccontextmanager
from dataclasses import dataclass, field
from enum import StrEnum
from types import UnionType
from typing import Any, ClassVar, Generic, Mapping, Sequence, Union, cast, get_args, get_origin, get_type_hints
from typing import (
Any,
ClassVar,
Generic,
Mapping,
Protocol,
Sequence,
Union,
cast,
get_args,
get_origin,
get_type_hints,
overload,
)
from pydantic import BaseModel, ConfigDict, JsonValue, SerializeAsAny
from typing_extensions import Self, TypeVar
@ -72,6 +87,19 @@ type LayerConfigValue = JsonValue | SerializeAsAny[LayerConfig]
_ConfigT = TypeVar("_ConfigT", bound=LayerConfig, default="EmptyLayerConfig")
_RuntimeStateT = TypeVar("_RuntimeStateT", bound=BaseModel, default="EmptyRuntimeState")
_RuntimeHandlesT = TypeVar("_RuntimeHandlesT", bound=BaseModel, default="EmptyRuntimeHandles")
_DepRuntimeStateT = TypeVar("_DepRuntimeStateT", bound=BaseModel)
_DepRuntimeHandlesT = TypeVar("_DepRuntimeHandlesT", bound=BaseModel)
class _LayerControlOwnerSession(Protocol):
"""Private structural API used by controls to resolve dependency controls."""
def _control_for_dependency(
self,
owner_layer_id: str,
dep_name: str | None,
dep_layer: "Layer[Any, Any, Any, Any, Any, _DepRuntimeStateT, _DepRuntimeHandlesT]",
) -> "LayerControl[_DepRuntimeStateT, _DepRuntimeHandlesT]": ...
class LayerDeps:
@ -171,13 +199,17 @@ class LayerControl(Generic[_RuntimeStateT, _RuntimeHandlesT]):
callers may inspect closed-session diagnostics after exit. Reuse is still
governed by ``state``: a closed control cannot be entered again. Runtime
handles are not serialized in snapshots and should be rehydrated from
runtime state in resume hooks.
runtime state in resume hooks. A compositor also binds private owner metadata
so ``control_for`` can find controls for this layer's dependencies in the
same session; those links are runtime-only and not part of snapshots.
"""
state: LifecycleState = LifecycleState.NEW
exit_intent: ExitIntent = ExitIntent.DELETE
runtime_state: _RuntimeStateT = field(default_factory=lambda: cast(_RuntimeStateT, EmptyRuntimeState()))
runtime_handles: _RuntimeHandlesT = field(default_factory=lambda: cast(_RuntimeHandlesT, EmptyRuntimeHandles()))
_owner_session: _LayerControlOwnerSession | None = field(default=None, init=False, repr=False, compare=False)
_owner_layer_id: str | None = field(default=None, init=False, repr=False, compare=False)
def suspend_on_exit(self) -> None:
"""Request suspend behavior when the current layer entry exits."""
@ -187,6 +219,57 @@ class LayerControl(Generic[_RuntimeStateT, _RuntimeHandlesT]):
"""Request delete behavior when the current layer entry exits."""
self.exit_intent = ExitIntent.DELETE
@overload
def control_for(
self,
dep_layer: "Layer[Any, Any, Any, Any, Any, _DepRuntimeStateT, _DepRuntimeHandlesT]",
/,
) -> "LayerControl[_DepRuntimeStateT, _DepRuntimeHandlesT]": ...
@overload
def control_for(
self,
dep_name: str,
dep_layer: "Layer[Any, Any, Any, Any, Any, _DepRuntimeStateT, _DepRuntimeHandlesT]",
/,
) -> "LayerControl[_DepRuntimeStateT, _DepRuntimeHandlesT]": ...
def control_for(
self,
dep_name_or_layer: "str | Layer[Any, Any, Any, Any, Any, _DepRuntimeStateT, _DepRuntimeHandlesT]",
dep_layer: "Layer[Any, Any, Any, Any, Any, _DepRuntimeStateT, _DepRuntimeHandlesT] | None" = None,
/,
) -> "LayerControl[_DepRuntimeStateT, _DepRuntimeHandlesT]":
"""Return the current session control for one resolved dependency.
``control_for(dep_layer)`` is for the common case where exactly one
resolved dependency target of this control's owner layer is ``dep_layer``.
Use ``control_for(dep_name, dep_layer)`` when multiple dependency fields
can point at the same layer instance or when the name makes the lookup
clearer. Optional dependencies that resolved to ``None`` have no control
and raise ``KeyError`` when requested.
"""
if isinstance(dep_name_or_layer, str):
if dep_layer is None:
raise TypeError("LayerControl.control_for(dep_name, dep_layer) requires dep_layer.")
dep_name = dep_name_or_layer
resolved_dep_layer = dep_layer
else:
if dep_layer is not None:
raise TypeError("LayerControl.control_for accepts either (dep_layer) or (dep_name, dep_layer).")
dep_name = None
resolved_dep_layer = dep_name_or_layer
if self._owner_session is None or self._owner_layer_id is None:
raise RuntimeError("LayerControl is not attached to a compositor session.")
return self._owner_session._control_for_dependency(self._owner_layer_id, dep_name, resolved_dep_layer)
def _bind_owner(self, session: _LayerControlOwnerSession, layer_id: str) -> None:
"""Attach runtime owner metadata used by ``control_for``."""
self._owner_session = session
self._owner_layer_id = layer_id
@dataclass(frozen=True, slots=True)
class LayerDepSpec:

View File

@ -1,15 +1,16 @@
"""Dify plugin LLM model layer.
This layer owns model capability resolution for Dify plugin-backed LLMs. It
depends on ``DifyPluginLayer`` for active daemon access and returns a Pydantic AI
model adapter configured from the public LLM layer DTO.
depends on ``DifyPluginLayer`` for daemon access, resolves that dependency's
control from its own ``LayerControl``, and returns a Pydantic AI model adapter
configured from the public LLM layer DTO.
"""
from dataclasses import dataclass
from typing_extensions import Self, override
from agenton.layers import LayerDeps, PlainLayer
from agenton.layers import EmptyRuntimeHandles, EmptyRuntimeState, LayerControl, LayerDeps, PlainLayer
from dify_agent.adapters.llm import DifyLLMAdapterModel
from dify_agent.layers.dify_plugin.configs import DifyPluginLLMLayerConfig
from dify_agent.layers.dify_plugin.plugin_layer import DifyPluginLayer
@ -35,9 +36,10 @@ class DifyPluginLLMLayer(PlainLayer[DifyPluginLLMDeps, DifyPluginLLMLayerConfig]
"""Create the LLM layer from validated public config."""
return cls(config=config)
def get_model(self) -> DifyLLMAdapterModel:
"""Return the configured model using the active plugin daemon provider."""
provider = self.deps.plugin.get_provider(plugin_provider=self.config.provider)
def get_model(self, control: LayerControl[EmptyRuntimeState, EmptyRuntimeHandles]) -> DifyLLMAdapterModel:
"""Return the configured model using the current session's plugin control."""
plugin_control = control.control_for(self.deps.plugin)
provider = self.deps.plugin.get_provider(plugin_control, plugin_provider=self.config.provider)
return DifyLLMAdapterModel(
model=self.config.model,
daemon_provider=provider,

View File

@ -3,21 +3,17 @@
The public config identifies tenant/plugin/user context only. Plugin daemon URL,
API key, and timeout are server-side dependencies injected by the layer registry
factory. Each active compositor entry owns an HTTP client in ``LayerControl``
runtime handles; ``get_provider`` discovers those handles via a task-local
context variable so shared layer instances never store session-local clients.
runtime handles; callers pass the control explicitly to ``get_provider`` so
shared layer instances never store or discover session-local clients implicitly.
"""
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from contextvars import ContextVar, Token
from dataclasses import dataclass
from typing import cast
import httpx
from pydantic import BaseModel, ConfigDict
from typing_extensions import Self, override
from agenton.layers import EmptyRuntimeState, LayerControl, NoLayerDeps, PlainLayer
from agenton.layers import EmptyRuntimeState, LayerControl, LifecycleState, NoLayerDeps, PlainLayer
from dify_agent.adapters.llm import DifyPluginDaemonProvider
from dify_agent.layers.dify_plugin.configs import DifyPluginLayerConfig
@ -30,12 +26,6 @@ class DifyPluginRuntimeHandles(BaseModel):
model_config = ConfigDict(extra="forbid", validate_assignment=True, arbitrary_types_allowed=True)
_ACTIVE_PLUGIN_HANDLES: ContextVar[dict[int, DifyPluginRuntimeHandles]] = ContextVar(
"dify_agent_active_plugin_handles",
default={},
)
@dataclass(slots=True)
class DifyPluginLayer(PlainLayer[NoLayerDeps, DifyPluginLayerConfig, EmptyRuntimeState, DifyPluginRuntimeHandles]):
"""Layer that owns plugin daemon connection state for one active session."""
@ -66,33 +56,21 @@ class DifyPluginLayer(PlainLayer[NoLayerDeps, DifyPluginLayerConfig, EmptyRuntim
"""Create a plugin layer from public config plus server-only daemon settings."""
return cls(config=config, daemon_url=daemon_url, daemon_api_key=daemon_api_key, timeout=timeout)
@override
def enter(self, control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles]):
"""Enter the layer and expose active handles through task-local context."""
return self._enter_with_active_handles(control)
@asynccontextmanager
async def _enter_with_active_handles(
def get_provider(
self,
control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles],
) -> AsyncIterator[None]:
async with self.lifecycle_enter(control):
token = self._set_active_handles(control.runtime_handles)
try:
yield
finally:
_ACTIVE_PLUGIN_HANDLES.reset(token)
def get_provider(self, *, plugin_provider: str) -> DifyPluginDaemonProvider:
"""Return a provider backed by this layer's active HTTP client.
*,
plugin_provider: str,
) -> DifyPluginDaemonProvider:
"""Return a provider backed by ``control``'s active HTTP client.
Raises:
RuntimeError: if called outside an active compositor context for this
layer, or after its runtime handles have been closed.
RuntimeError: if ``control`` is not active or its HTTP client is
absent/closed.
"""
handles = _ACTIVE_PLUGIN_HANDLES.get().get(id(self))
if handles is None or handles.http_client is None:
raise RuntimeError("DifyPluginLayer.get_provider() requires an active compositor context.")
client = control.runtime_handles.http_client
if control.state is not LifecycleState.ACTIVE or client is None or client.is_closed:
raise RuntimeError("DifyPluginLayer.get_provider() requires an entered control with an open HTTP client.")
return DifyPluginDaemonProvider(
tenant_id=self.config.tenant_id,
plugin_id=self.config.plugin_id,
@ -101,7 +79,7 @@ class DifyPluginLayer(PlainLayer[NoLayerDeps, DifyPluginLayerConfig, EmptyRuntim
plugin_daemon_api_key=self.daemon_api_key,
user_id=self.config.user_id,
timeout=self.timeout,
http_client=handles.http_client,
http_client=client,
)
@override
@ -142,10 +120,4 @@ class DifyPluginLayer(PlainLayer[NoLayerDeps, DifyPluginLayerConfig, EmptyRuntim
if client is not None:
await client.aclose()
def _set_active_handles(self, handles: DifyPluginRuntimeHandles) -> Token[dict[int, DifyPluginRuntimeHandles]]:
active_handles = dict(_ACTIVE_PLUGIN_HANDLES.get())
active_handles[id(self)] = handles
return cast(Token[dict[int, DifyPluginRuntimeHandles]], _ACTIVE_PLUGIN_HANDLES.set(active_handles))
__all__ = ["DifyPluginLayer", "DifyPluginRuntimeHandles"]

View File

@ -99,7 +99,9 @@ class AgentRunRunner:
_ = await emit_pydantic_ai_event(self.sink, run_id=self.run_id, data=event)
try:
model = compositor.get_layer(DIFY_AGENT_MODEL_LAYER_ID, DifyPluginLLMLayer).get_model()
llm_layer = compositor.get_layer(DIFY_AGENT_MODEL_LAYER_ID, DifyPluginLLMLayer)
llm_control = active_session.layer(DIFY_AGENT_MODEL_LAYER_ID)
model = llm_layer.get_model(llm_control)
except (KeyError, TypeError, RuntimeError) as exc:
raise AgentRunValidationError(str(exc)) from exc

View File

@ -0,0 +1,222 @@
import asyncio
from collections import OrderedDict
from dataclasses import dataclass
import pytest
from typing_extensions import override
from agenton.compositor import Compositor, CompositorSession
from agenton.layers import LayerControl, LayerDeps, PlainLayer, PlainPromptType, PlainToolType
from agenton_collections.layers.plain import ObjectLayer
class RenamedObjectDeps(LayerDeps):
renamed: ObjectLayer[str] # pyright: ignore[reportUninitializedInstanceVariable]
@dataclass(slots=True)
class RenamedConsumerLayer(PlainLayer[RenamedObjectDeps]):
@property
@override
def prefix_prompts(self) -> list[str]:
return [self.deps.renamed.value]
class SameNameObjectDeps(LayerDeps):
same: ObjectLayer[str] # pyright: ignore[reportUninitializedInstanceVariable]
@dataclass(slots=True)
class SameNameConsumerLayer(PlainLayer[SameNameObjectDeps]):
pass
class DoubleObjectDeps(LayerDeps):
first: ObjectLayer[str] # pyright: ignore[reportUninitializedInstanceVariable]
second: ObjectLayer[str] # pyright: ignore[reportUninitializedInstanceVariable]
@dataclass(slots=True)
class DoubleConsumerLayer(PlainLayer[DoubleObjectDeps]):
pass
class OptionalObjectDeps(LayerDeps):
maybe: ObjectLayer[str] | None # pyright: ignore[reportUninitializedInstanceVariable]
@dataclass(slots=True)
class OptionalConsumerLayer(PlainLayer[OptionalObjectDeps]):
pass
def test_control_for_layer_resolves_unique_explicit_dependency_rename() -> None:
target = ObjectLayer("target")
consumer = RenamedConsumerLayer()
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("actual", target), ("consumer", consumer)]),
deps_name_mapping={"consumer": {"renamed": "actual"}},
)
session = compositor.new_session()
resolved = session.layer("consumer").control_for(target)
assert resolved is session.layer("actual")
assert consumer.prefix_prompts == ["target"]
def test_control_for_layer_resolves_unique_implicit_same_name_dependency() -> None:
target = ObjectLayer("target")
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("same", target), ("consumer", SameNameConsumerLayer())]),
)
session = compositor.new_session()
assert session.layer("consumer").control_for(target) is session.layer("same")
def test_control_for_layer_raises_when_no_dependency_points_to_layer() -> None:
target = ObjectLayer("target")
unrelated = ObjectLayer("unrelated")
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("target", target), ("unrelated", unrelated), ("consumer", RenamedConsumerLayer())]),
deps_name_mapping={"consumer": {"renamed": "target"}},
)
session = compositor.new_session()
with pytest.raises(KeyError, match="no dependency target.*provided ObjectLayer instance"):
_ = session.layer("consumer").control_for(unrelated)
def test_control_for_layer_raises_when_multiple_dependency_fields_match() -> None:
target = ObjectLayer("target")
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("target", target), ("consumer", DoubleConsumerLayer())]),
deps_name_mapping={"consumer": {"first": "target", "second": "target"}},
)
session = compositor.new_session()
with pytest.raises(ValueError, match="multiple dependency fields.*Pass dep_name explicitly"):
_ = session.layer("consumer").control_for(target)
def test_control_for_explicit_dep_name_disambiguates_multiple_deps() -> None:
target = ObjectLayer("target")
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("target", target), ("consumer", DoubleConsumerLayer())]),
deps_name_mapping={"consumer": {"first": "target", "second": "target"}},
)
session = compositor.new_session()
assert session.layer("consumer").control_for("second", target) is session.layer("target")
def test_control_for_optional_missing_dependency_raises() -> None:
target = ObjectLayer("target")
consumer = OptionalConsumerLayer()
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("consumer", consumer)]),
)
session = compositor.new_session()
assert consumer.deps.maybe is None
with pytest.raises(KeyError, match="dependency 'maybe' is not bound"):
_ = session.layer("consumer").control_for("maybe", target)
def test_restored_session_rebinds_owner_links_for_control_for() -> None:
target = ObjectLayer("target")
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("actual", target), ("consumer", RenamedConsumerLayer())]),
deps_name_mapping={"consumer": {"renamed": "actual"}},
)
session = compositor.new_session()
async def suspend_session() -> None:
async with compositor.enter(session) as active_session:
active_session.suspend_on_exit()
asyncio.run(suspend_session())
restored = compositor.session_from_snapshot(compositor.snapshot_session(session))
assert restored.layer("consumer").control_for(target) is restored.layer("actual")
def test_enter_rebinds_external_session_owner_links_for_control_for() -> None:
target = ObjectLayer("target")
consumer = RenamedConsumerLayer()
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("actual", target), ("consumer", consumer)]),
deps_name_mapping={"consumer": {"renamed": "actual"}},
)
external_session = CompositorSession(
OrderedDict([("actual", target.new_control()), ("consumer", consumer.new_control())])
)
async def enter_session() -> None:
async with compositor.enter(external_session) as active_session:
assert active_session.layer("consumer").control_for(target) is active_session.layer("actual")
asyncio.run(enter_session())
def test_failed_enter_does_not_rebind_active_session_owner_links() -> None:
first_target = ObjectLayer("first")
second_target = ObjectLayer("second")
first_compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("actual", first_target), ("consumer", RenamedConsumerLayer())]),
deps_name_mapping={"consumer": {"renamed": "actual"}},
)
second_compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("actual", second_target), ("consumer", RenamedConsumerLayer())]),
deps_name_mapping={"consumer": {"renamed": "actual"}},
)
session = first_compositor.new_session()
async def enter_conflicting_compositor() -> None:
async with first_compositor.enter(session) as active_session:
with pytest.raises(RuntimeError, match="already active"):
async with second_compositor.enter(active_session):
raise AssertionError("Expected active-session rejection before entering layers.")
assert active_session.layer("consumer").control_for(first_target) is active_session.layer("actual")
asyncio.run(enter_conflicting_compositor())
def test_control_for_uses_owner_resolved_targets_not_graph_wide_object_identity() -> None:
shared_target = ObjectLayer("shared")
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict(
[
("first-id", shared_target),
("second-id", shared_target),
("consumer", RenamedConsumerLayer()),
]
),
deps_name_mapping={"consumer": {"renamed": "second-id"}},
)
session = compositor.new_session()
resolved = session.layer("consumer").control_for(shared_target)
assert resolved is session.layer("second-id")
assert resolved is not session.layer("first-id")
def test_control_for_explicit_dep_name_rejects_wrong_layer_instance() -> None:
target = ObjectLayer("target")
wrong = ObjectLayer("wrong")
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("target", target), ("wrong", wrong), ("consumer", RenamedConsumerLayer())]),
deps_name_mapping={"consumer": {"renamed": "target"}},
)
session = compositor.new_session()
with pytest.raises(TypeError, match="dependency 'renamed'.*not the provided ObjectLayer instance"):
_ = session.layer("consumer").control_for("renamed", wrong)
def test_control_for_unowned_control_raises_clear_error() -> None:
with pytest.raises(RuntimeError, match="not attached to a compositor session"):
_ = LayerControl().control_for(ObjectLayer("target"))

View File

@ -2,8 +2,10 @@ import asyncio
from collections import OrderedDict
from typing import cast
import pytest
from agenton.compositor import Compositor
from agenton.layers import PlainPromptType, PlainToolType
from agenton.layers import EmptyRuntimeState, LayerControl, PlainPromptType, PlainToolType
from dify_agent.adapters.llm import DifyLLMAdapterModel
from dify_agent.layers.dify_plugin.configs import DifyPluginLLMLayerConfig, DifyPluginLayerConfig
from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer
@ -30,23 +32,28 @@ def _llm_layer() -> DifyPluginLLMLayer:
)
def _plugin_control(control: LayerControl) -> LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles]:
return cast(LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles], control)
def test_dify_plugin_layer_get_provider_requires_active_context_and_uses_runtime_client() -> None:
async def scenario() -> None:
plugin = _plugin_layer()
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("plugin", plugin)]))
session = compositor.new_session()
try:
_ = plugin.get_provider(plugin_provider="openai")
_ = plugin.get_provider(_plugin_control(session.layer("plugin")), plugin_provider="openai")
except RuntimeError as e:
assert str(e) == "DifyPluginLayer.get_provider() requires an active compositor context."
assert str(e) == "DifyPluginLayer.get_provider() requires an entered control with an open HTTP client."
else:
raise AssertionError("Expected RuntimeError.")
async with compositor.enter() as session:
async with compositor.enter(session):
handles = cast(DifyPluginRuntimeHandles, cast(object, session.layer("plugin").runtime_handles))
client = handles.http_client
assert client is not None
provider = plugin.get_provider(plugin_provider="openai")
provider = plugin.get_provider(_plugin_control(session.layer("plugin")), plugin_provider="openai")
assert provider.client.http_client is client
assert provider.client.tenant_id == "tenant-1"
assert provider.client.plugin_id == "langgenius/openai"
@ -57,6 +64,8 @@ def test_dify_plugin_layer_get_provider_requires_active_context_and_uses_runtime
assert client.is_closed is False
assert client.is_closed is True
with pytest.raises(RuntimeError, match="entered control with an open HTTP client"):
_ = plugin.get_provider(_plugin_control(session.layer("plugin")), plugin_provider="openai")
asyncio.run(scenario())
@ -71,7 +80,7 @@ def test_dify_plugin_llm_layer_builds_adapter_model_from_dependency_provider() -
)
async with compositor.enter() as session:
model = llm.get_model()
model = llm.get_model(session.layer("llm"))
assert isinstance(model, DifyLLMAdapterModel)
assert model.model_name == "demo-model"
assert model.credentials == {"api_key": "secret"}
@ -80,3 +89,74 @@ def test_dify_plugin_llm_layer_builds_adapter_model_from_dependency_provider() -
assert model.provider.client.http_client is handles.http_client
asyncio.run(scenario())
def test_dify_plugin_llm_layer_get_model_uses_control_dependency_lookup(monkeypatch: pytest.MonkeyPatch) -> None:
async def scenario() -> None:
plugin = _plugin_layer()
llm = _llm_layer()
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("renamed-plugin", plugin), ("llm", llm)]),
deps_name_mapping={"llm": {"plugin": "renamed-plugin"}},
)
async with compositor.enter() as session:
llm_control = session.layer("llm")
plugin_control = session.layer("renamed-plugin")
calls: list[object] = []
def fake_control_for(self: LayerControl, dep_layer: object) -> object:
assert self is llm_control
calls.append(dep_layer)
return plugin_control
monkeypatch.setattr(LayerControl, "control_for", fake_control_for)
model = llm.get_model(llm_control)
assert calls == [plugin]
assert isinstance(model, DifyLLMAdapterModel)
asyncio.run(scenario())
def test_dify_plugin_layer_concurrent_sessions_use_separate_controls_and_clients() -> None:
async def scenario() -> None:
plugin = _plugin_layer()
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("plugin", plugin)]))
first_session = compositor.new_session()
second_session = compositor.new_session()
async with compositor.enter(first_session):
async with compositor.enter(second_session):
first_handles = cast(
DifyPluginRuntimeHandles,
cast(object, first_session.layer("plugin").runtime_handles),
)
second_handles = cast(
DifyPluginRuntimeHandles,
cast(object, second_session.layer("plugin").runtime_handles),
)
first_client = first_handles.http_client
second_client = second_handles.http_client
assert first_client is not None
assert second_client is not None
assert first_client is not second_client
first_provider = plugin.get_provider(
_plugin_control(first_session.layer("plugin")),
plugin_provider="openai",
)
second_provider = plugin.get_provider(
_plugin_control(second_session.layer("plugin")),
plugin_provider="openai",
)
assert first_provider.client.http_client is first_client
assert second_provider.client.http_client is second_client
assert second_client.is_closed is True
assert first_client.is_closed is False
assert first_client.is_closed is True
asyncio.run(scenario())

View File

@ -4,16 +4,23 @@ import pytest
from pydantic_ai.models.test import TestModel
from agenton.compositor import CompositorConfig, LayerNodeConfig
from agenton.layers import LayerControl
from agenton_collections.layers.plain import PromptLayerConfig
from dify_agent.layers.dify_plugin.configs import DifyPluginLLMLayerConfig, DifyPluginLayerConfig
from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer
from dify_agent.layers.dify_plugin.plugin_layer import DifyPluginRuntimeHandles
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID
from dify_agent.protocol.schemas import CreateRunRequest, RunSucceededEvent
from dify_agent.runtime.event_sink import InMemoryRunEventSink
from dify_agent.runtime.runner import AgentRunRunner, AgentRunValidationError
def _request(user: str | list[str] = "hello", *, llm_layer_name: str = DIFY_AGENT_MODEL_LAYER_ID) -> CreateRunRequest:
def _request(
user: str | list[str] = "hello",
*,
llm_layer_name: str = DIFY_AGENT_MODEL_LAYER_ID,
plugin_layer_name: str = "plugin",
) -> CreateRunRequest:
return CreateRunRequest(
compositor=CompositorConfig(
layers=[
@ -23,14 +30,14 @@ def _request(user: str | list[str] = "hello", *, llm_layer_name: str = DIFY_AGEN
config=PromptLayerConfig(prefix="system", user=user),
),
LayerNodeConfig(
name="plugin",
name=plugin_layer_name,
type="dify.plugin",
config=DifyPluginLayerConfig(tenant_id="tenant-1", plugin_id="langgenius/openai"),
),
LayerNodeConfig(
name=llm_layer_name,
type="dify.plugin.llm",
deps={"plugin": "plugin"},
deps={"plugin": plugin_layer_name},
config=DifyPluginLLMLayerConfig(
provider="openai",
model="demo-model",
@ -43,12 +50,16 @@ def _request(user: str | list[str] = "hello", *, llm_layer_name: str = DIFY_AGEN
def test_runner_emits_terminal_success_and_snapshot(monkeypatch: pytest.MonkeyPatch) -> None:
def fake_get_model(self: DifyPluginLLMLayer):
def fake_get_model(self: DifyPluginLLMLayer, control: LayerControl):
assert self.config.model == "demo-model"
plugin_control = control.control_for(self.deps.plugin)
plugin_handles = plugin_control.runtime_handles
assert isinstance(plugin_handles, DifyPluginRuntimeHandles)
assert plugin_handles.http_client is not None
return TestModel(custom_output_text="done") # pyright: ignore[reportReturnType]
monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model)
request = _request()
request = _request(plugin_layer_name="renamed-plugin")
sink = InMemoryRunEventSink()
asyncio.run(AgentRunRunner(sink=sink, request=request, run_id="run-1").run())
@ -64,7 +75,7 @@ def test_runner_emits_terminal_success_and_snapshot(monkeypatch: pytest.MonkeyPa
assert terminal.data.output == "done"
assert [layer.name for layer in terminal.data.session_snapshot.layers] == [
"prompt",
"plugin",
"renamed-plugin",
DIFY_AGENT_MODEL_LAYER_ID,
]
assert sink.statuses["run-1"] == "succeeded"

View File

@ -1,5 +1,6 @@
from fastapi.testclient import TestClient
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID
from dify_agent.runtime.run_scheduler import SchedulerStoppingError
from dify_agent.server.routes.runs import create_runs_router
from dify_agent.server.schemas import RunRecord
@ -65,6 +66,52 @@ def test_create_run_returns_running_from_scheduler() -> None:
assert response.json() == {"run_id": "run-1", "status": "running"}
def test_create_run_accepts_valid_full_plugin_graph() -> None:
from fastapi import FastAPI
class CapturingScheduler:
async def create_run(self, request: object) -> RunRecord:
del request
return RunRecord(run_id="run-1", status="running")
app = FastAPI()
app.include_router(
create_runs_router(lambda: FakeStore(), lambda: CapturingScheduler()) # pyright: ignore[reportArgumentType]
)
client = TestClient(app)
response = client.post(
"/runs",
json={
"compositor": {
"schema_version": 1,
"layers": [
{"name": "prompt", "type": "plain.prompt", "config": {"user": "hello"}},
{
"name": "plugin-renamed",
"type": "dify.plugin",
"config": {"tenant_id": "tenant-1", "plugin_id": "langgenius/openai"},
},
{
"name": DIFY_AGENT_MODEL_LAYER_ID,
"type": "dify.plugin.llm",
"deps": {"plugin": "plugin-renamed"},
"config": {
"provider": "openai",
"model": "gpt-4o-mini",
"credentials": {"api_key": "secret"},
"model_settings": {"temperature": 0.2},
},
},
],
}
},
)
assert response.status_code == 202
assert response.json() == {"run_id": "run-1", "status": "running"}
def test_create_run_returns_503_when_scheduler_is_stopping() -> None:
from fastapi import FastAPI