From ab0b4c45cb75428fb57e5c6390f3a68903eb6713 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9B=90=E7=B2=92=20Yanli?= Date: Tue, 12 May 2026 03:49:38 +0800 Subject: [PATCH] add agenton dependency control lookup --- dify-agent/docs/agenton/api/index.md | 16 +- dify-agent/docs/agenton/guide/index.md | 23 ++ dify-agent/src/agenton/compositor/__init__.py | 142 +++++++++-- dify-agent/src/agenton/layers/base.py | 89 ++++++- .../layers/dify_plugin/llm_layer.py | 14 +- .../layers/dify_plugin/plugin_layer.py | 56 ++--- dify-agent/src/dify_agent/runtime/runner.py | 4 +- .../agenton/compositor/test_control_deps.py | 222 ++++++++++++++++++ .../layers/dify_plugin/test_layers.py | 92 +++++++- .../local/dify_agent/runtime/test_runner.py | 23 +- .../dify_agent/server/test_runs_routes.py | 47 ++++ 11 files changed, 644 insertions(+), 84 deletions(-) create mode 100644 dify-agent/tests/local/agenton/compositor/test_control_deps.py diff --git a/dify-agent/docs/agenton/api/index.md b/dify-agent/docs/agenton/api/index.md index 486638577d..aa90c8b9fa 100644 --- a/dify-agent/docs/agenton/api/index.md +++ b/dify-agent/docs/agenton/api/index.md @@ -67,9 +67,14 @@ Methods: - `suspend_on_exit() -> None` - `delete_on_exit() -> None` +- `control_for(dep_layer) -> LayerControl`: resolve the unique dependency control + whose resolved target is `dep_layer` in the same session. +- `control_for(dep_name, dep_layer) -> LayerControl`: resolve a named dependency + control when multiple dependency fields could point at the same layer instance. `runtime_state` is serialized in session snapshots. `runtime_handles` is never -serialized and should be rehydrated from runtime state in resume hooks. +serialized and should be rehydrated from runtime state in resume hooks. Private +owner links used by `control_for` are runtime-only and are not snapshotted. ### Schema defaults and lifecycle enums @@ -107,12 +112,12 @@ JSON objects. Use live instances for Python objects and callables. `LayerRegistry` manually registers config-backed layer classes. -- `register_layer(layer_type, type_id=None) -> None` +- `register_layer(layer_type, type_id=None, factory=None) -> None` - `resolve(type_id) -> LayerDescriptor` - `descriptors() -> Mapping[str, LayerDescriptor]` `LayerDescriptor` exposes `type_id`, `layer_type`, `config_type`, -`runtime_state_type`, and `runtime_handles_type`. +`runtime_state_type`, `runtime_handles_type`, and optional `factory`. ### Builder @@ -128,6 +133,11 @@ JSON objects. Use live instances for Python objects and callables. `Compositor[PromptT, ToolT, LayerPromptT, LayerToolT, UserPromptT, LayerUserPromptT]` owns the ordered layer graph. +Dependency binding uses explicit `deps={dep_name: target_layer_name}` mappings +first, then implicit same-name layer binding. Optional dependencies without a +target are recorded as absent so `LayerControl.control_for(...)` raises `KeyError` +rather than returning a control. + Construction: - `Compositor(layers=..., deps_name_mapping=..., ...)` diff --git a/dify-agent/docs/agenton/guide/index.md b/dify-agent/docs/agenton/guide/index.md index 7485843a8f..89d38f77f0 100644 --- a/dify-agent/docs/agenton/guide/index.md +++ b/dify-agent/docs/agenton/guide/index.md @@ -82,6 +82,29 @@ compositor = ( Use `.add_instance()` for layers that require Python objects or callables, such as `ObjectLayer`, `ToolsLayer`, and dynamic tool layers. +## Dependency controls + +Layer dependencies bind layer instances on `self.deps`. When a layer method also +needs the dependency's per-session state or handles, pass the current layer's +`LayerControl` into that method and resolve the dependency control from the same +session: + +```python {test="skip" lint="skip"} +class ModelDeps(LayerDeps): + plugin: PluginLayer + + +@dataclass +class ModelLayer(PlainLayer[ModelDeps]): + def make_model(self, control: LayerControl) -> Model: + plugin_control = control.control_for(self.deps.plugin) + return self.deps.plugin.make_provider(plugin_control) +``` + +Use `control.control_for(dep_name, dep_layer)` when more than one dependency +field can point at the same layer instance. Optional dependencies that were not +bound have no control and raise `KeyError` if requested. + ## System prompts and user prompts Layers expose three prompt surfaces: diff --git a/dify-agent/src/agenton/compositor/__init__.py b/dify-agent/src/agenton/compositor/__init__.py index e4e2d1f4a8..72588758a0 100644 --- a/dify-agent/src/agenton/compositor/__init__.py +++ b/dify-agent/src/agenton/compositor/__init__.py @@ -10,6 +10,10 @@ Layer instances are shared graph/capability definitions owned by the compositor. Per-session runtime state belongs to each session's ``LayerControl`` objects, not to the shared layer instances, so different sessions can enter the same compositor without leaking generated ids or handles through ``self``. +Controls know their owning session and layer id privately so code running inside a +layer can use ``LayerControl.control_for`` to resolve dependency controls from the +same session. These owner links are runtime metadata and are never serialized in +session snapshots. Dependency mappings use layer-local dependency names as keys and compositor layer names as values. System prompt aggregation depends on insertion order: @@ -57,6 +61,8 @@ LayerToolT = TypeVar("LayerToolT", default=AllToolTypes) UserPromptT = TypeVar("UserPromptT", default=AllUserPromptTypes) LayerUserPromptT = TypeVar("LayerUserPromptT", default=AllUserPromptTypes) LayerT = TypeVar("LayerT", bound=Layer[Any, Any, Any, Any, Any, Any, Any]) +DepRuntimeStateT = TypeVar("DepRuntimeStateT", bound=BaseModel) +DepRuntimeHandlesT = TypeVar("DepRuntimeHandlesT", bound=BaseModel) type CompositorTransformer[InputT, OutputT] = Callable[[Sequence[InputT]], Sequence[OutputT]] @@ -205,14 +211,18 @@ class CompositorSession: setting every layer's per-entry exit intent; ``layer`` allows explicit per-layer control when callers need partial suspend/delete behavior. A mixed session with any closed layer cannot be entered again because compositor - entry is all-or-none. + entry is all-or-none. The session also carries private owner metadata so its + controls can resolve dependency controls; snapshots include only public + lifecycle/runtime state. """ - __slots__ = ("layer_controls",) + __slots__ = ("layer_controls", "_owner_compositor") layer_controls: OrderedDict[str, LayerControl] + _owner_compositor: "Compositor[Any, Any, Any, Any, Any, Any] | None" def __init__(self, layer_names: Iterable[str] | Mapping[str, LayerControl]) -> None: + self._owner_compositor = None if isinstance(layer_names, MappingABC): self.layer_controls = OrderedDict(layer_names.items()) return @@ -230,7 +240,87 @@ class CompositorSession: def layer(self, name: str) -> LayerControl: """Return the layer control for ``name`` or raise ``KeyError``.""" - return self.layer_controls[name] + try: + return self.layer_controls[name] + except KeyError as e: + raise KeyError(f"CompositorSession has no layer control named '{name}'.") from e + + def _bind_owner(self, compositor: "Compositor[Any, Any, Any, Any, Any, Any]") -> None: + """Bind runtime owner links on this session and all child controls.""" + self._owner_compositor = compositor + for layer_id, control in self.layer_controls.items(): + control._bind_owner(self, layer_id) + + def _control_for_dependency( + self, + owner_layer_id: str, + dep_name: str | None, + dep_layer: Layer[Any, Any, Any, Any, Any, DepRuntimeStateT, DepRuntimeHandlesT], + ) -> LayerControl[DepRuntimeStateT, DepRuntimeHandlesT]: + """Resolve a dependency control from the owner's resolved dependency targets.""" + if self._owner_compositor is None: + raise RuntimeError("CompositorSession is not attached to a compositor.") + if dep_name is None: + return self._control_for_unique_dependency(owner_layer_id, dep_layer) + return self._control_for_named_dependency(owner_layer_id, dep_name, dep_layer) + + def _control_for_unique_dependency( + self, + owner_layer_id: str, + dep_layer: Layer[Any, Any, Any, Any, Any, DepRuntimeStateT, DepRuntimeHandlesT], + ) -> LayerControl[DepRuntimeStateT, DepRuntimeHandlesT]: + compositor = self._require_owner_compositor() + dep_targets = self._dependency_targets_for(owner_layer_id) + matches = [ + (name, target_id) + for name, target_id in dep_targets.items() + if target_id is not None and compositor.layers[target_id] is dep_layer + ] + if not matches: + raise KeyError( + f"Layer '{owner_layer_id}' has no dependency target bound to the provided " + f"{type(dep_layer).__name__} instance." + ) + if len(matches) > 1: + names = ", ".join(name for name, _target_id in matches) + raise ValueError( + f"Layer '{owner_layer_id}' has multiple dependency fields bound to the provided " + f"{type(dep_layer).__name__} instance: {names}. Pass dep_name explicitly." + ) + _name, target_id = matches[0] + return cast(LayerControl[DepRuntimeStateT, DepRuntimeHandlesT], self.layer(target_id)) + + def _control_for_named_dependency( + self, + owner_layer_id: str, + dep_name: str, + dep_layer: Layer[Any, Any, Any, Any, Any, DepRuntimeStateT, DepRuntimeHandlesT], + ) -> LayerControl[DepRuntimeStateT, DepRuntimeHandlesT]: + compositor = self._require_owner_compositor() + dep_targets = self._dependency_targets_for(owner_layer_id) + if dep_name not in dep_targets: + raise KeyError(f"Layer '{owner_layer_id}' has no resolved dependency named '{dep_name}'.") + target_id = dep_targets[dep_name] + if target_id is None: + raise KeyError(f"Layer '{owner_layer_id}' dependency '{dep_name}' is not bound to a target layer.") + if compositor.layers[target_id] is not dep_layer: + raise TypeError( + f"Layer '{owner_layer_id}' dependency '{dep_name}' resolves to layer '{target_id}', " + f"not the provided {type(dep_layer).__name__} instance." + ) + return cast(LayerControl[DepRuntimeStateT, DepRuntimeHandlesT], self.layer(target_id)) + + def _require_owner_compositor(self) -> "Compositor[Any, Any, Any, Any, Any, Any]": + if self._owner_compositor is None: + raise RuntimeError("CompositorSession is not attached to a compositor.") + return self._owner_compositor + + def _dependency_targets_for(self, owner_layer_id: str) -> Mapping[str, str | None]: + compositor = self._require_owner_compositor() + try: + return compositor._resolved_dep_targets[owner_layer_id] + except KeyError as e: + raise KeyError(f"Layer '{owner_layer_id}' is not defined in this compositor.") from e class LayerSessionSnapshot(BaseModel): @@ -375,6 +465,7 @@ class Compositor(Generic[PromptT, ToolT, LayerPromptT, LayerToolT, UserPromptT, user_prompt_transformer: CompositorTransformer[LayerUserPromptT, UserPromptT] | None = None tool_transformer: CompositorTransformer[LayerToolT, ToolT] | None = None _deps_bound: bool = field(default=False, init=False) + _resolved_dep_targets: dict[str, dict[str, str | None]] = field(default_factory=dict, init=False) def __post_init__(self) -> None: self._bind_deps(self.deps_name_mapping) @@ -401,24 +492,36 @@ class Compositor(Generic[PromptT, ToolT, LayerPromptT, LayerToolT, UserPromptT, The outer mapping key is the layer being bound. The inner mapping key is the dependency field declared by that layer's deps type, and the value is - the target layer name in this compositor. + the target layer name in this compositor. Explicit mappings win over + implicit same-name layer binding. Optional dependencies with no target are + recorded as ``None`` so ``LayerControl.control_for`` can distinguish + "declared but absent" from unknown dependency names. """ if self._deps_bound: raise RuntimeError("Compositor deps are already bound.") + self._resolved_dep_targets = {} for layer_name, layer in self.layers.items(): layer_deps = deps_name_mapping.get(layer_name, {}) - try: - deps = { - dep_name: self.layers[target_layer_name] - for dep_name, target_layer_name in layer_deps.items() - } - except KeyError as e: - raise ValueError( - f"Layer '{layer_name}' has a dependency on layer '{e.args[0]}', " - "which is not defined in the builder." - ) from e - layer.bind_deps({**self.layers, **deps}) + for target_layer_name in layer_deps.values(): + if target_layer_name not in self.layers: + raise ValueError( + f"Layer '{layer_name}' has a dependency on layer '{target_layer_name}', " + "which is not defined in the builder." + ) + + resolved_target_ids: dict[str, str | None] = {} + resolved_deps: dict[str, Layer[Any, Any, Any, Any, Any, Any, Any]] = {} + for dep_name in layer.dependency_names(): + target_layer_name = layer_deps.get(dep_name) + if target_layer_name is None and dep_name in self.layers: + target_layer_name = dep_name + resolved_target_ids[dep_name] = target_layer_name + if target_layer_name is not None: + resolved_deps[dep_name] = self.layers[target_layer_name] + + layer.bind_deps(resolved_deps) + self._resolved_dep_targets[layer_name] = resolved_target_ids self._deps_bound = True @overload @@ -446,9 +549,11 @@ class Compositor(Generic[PromptT, ToolT, LayerPromptT, LayerToolT, UserPromptT, def new_session(self) -> CompositorSession: """Create a fresh lifecycle session matching this compositor's layer order.""" - return CompositorSession( + session = CompositorSession( OrderedDict((layer_name, layer.new_control()) for layer_name, layer in self.layers.items()) ) + session._bind_owner(self) + return session def snapshot_session(self, session: CompositorSession) -> CompositorSessionSnapshot: """Serialize non-active session lifecycle state and runtime state. @@ -499,7 +604,9 @@ class Compositor(Generic[PromptT, ToolT, LayerPromptT, LayerToolT, UserPromptT, ) for layer_snapshot in snapshot.layers ) - return CompositorSession(controls) + session = CompositorSession(controls) + session._bind_owner(self) + return session @asynccontextmanager async def enter( @@ -514,6 +621,7 @@ class Compositor(Generic[PromptT, ToolT, LayerPromptT, LayerToolT, UserPromptT, session = self.new_session() self._validate_session(session) self._ensure_session_can_enter(session) + session._bind_owner(self) async with AsyncExitStack() as stack: for layer_name, layer in self.layers.items(): diff --git a/dify-agent/src/agenton/layers/base.py b/dify-agent/src/agenton/layers/base.py index 49570b86c3..8fbac67a76 100644 --- a/dify-agent/src/agenton/layers/base.py +++ b/dify-agent/src/agenton/layers/base.py @@ -15,7 +15,9 @@ Pydantic models because they are not accepted as graph input. ``Layer.bind_deps`` is the mutation point for dependency state. Layer implementations should treat ``self.deps`` as unavailable until a compositor or -caller has resolved and bound dependencies. +caller has resolved and bound dependencies. When a layer needs a dependency's +session-local state or handles, use the current ``LayerControl.control_for`` API +instead of storing dependency controls on layer instances. Layer async entry uses a caller-provided ``LayerControl`` as an explicit state machine and per-session runtime owner. A fresh control starts in @@ -42,7 +44,20 @@ from contextlib import AbstractAsyncContextManager, asynccontextmanager from dataclasses import dataclass, field from enum import StrEnum from types import UnionType -from typing import Any, ClassVar, Generic, Mapping, Sequence, Union, cast, get_args, get_origin, get_type_hints +from typing import ( + Any, + ClassVar, + Generic, + Mapping, + Protocol, + Sequence, + Union, + cast, + get_args, + get_origin, + get_type_hints, + overload, +) from pydantic import BaseModel, ConfigDict, JsonValue, SerializeAsAny from typing_extensions import Self, TypeVar @@ -72,6 +87,19 @@ type LayerConfigValue = JsonValue | SerializeAsAny[LayerConfig] _ConfigT = TypeVar("_ConfigT", bound=LayerConfig, default="EmptyLayerConfig") _RuntimeStateT = TypeVar("_RuntimeStateT", bound=BaseModel, default="EmptyRuntimeState") _RuntimeHandlesT = TypeVar("_RuntimeHandlesT", bound=BaseModel, default="EmptyRuntimeHandles") +_DepRuntimeStateT = TypeVar("_DepRuntimeStateT", bound=BaseModel) +_DepRuntimeHandlesT = TypeVar("_DepRuntimeHandlesT", bound=BaseModel) + + +class _LayerControlOwnerSession(Protocol): + """Private structural API used by controls to resolve dependency controls.""" + + def _control_for_dependency( + self, + owner_layer_id: str, + dep_name: str | None, + dep_layer: "Layer[Any, Any, Any, Any, Any, _DepRuntimeStateT, _DepRuntimeHandlesT]", + ) -> "LayerControl[_DepRuntimeStateT, _DepRuntimeHandlesT]": ... class LayerDeps: @@ -171,13 +199,17 @@ class LayerControl(Generic[_RuntimeStateT, _RuntimeHandlesT]): callers may inspect closed-session diagnostics after exit. Reuse is still governed by ``state``: a closed control cannot be entered again. Runtime handles are not serialized in snapshots and should be rehydrated from - runtime state in resume hooks. + runtime state in resume hooks. A compositor also binds private owner metadata + so ``control_for`` can find controls for this layer's dependencies in the + same session; those links are runtime-only and not part of snapshots. """ state: LifecycleState = LifecycleState.NEW exit_intent: ExitIntent = ExitIntent.DELETE runtime_state: _RuntimeStateT = field(default_factory=lambda: cast(_RuntimeStateT, EmptyRuntimeState())) runtime_handles: _RuntimeHandlesT = field(default_factory=lambda: cast(_RuntimeHandlesT, EmptyRuntimeHandles())) + _owner_session: _LayerControlOwnerSession | None = field(default=None, init=False, repr=False, compare=False) + _owner_layer_id: str | None = field(default=None, init=False, repr=False, compare=False) def suspend_on_exit(self) -> None: """Request suspend behavior when the current layer entry exits.""" @@ -187,6 +219,57 @@ class LayerControl(Generic[_RuntimeStateT, _RuntimeHandlesT]): """Request delete behavior when the current layer entry exits.""" self.exit_intent = ExitIntent.DELETE + @overload + def control_for( + self, + dep_layer: "Layer[Any, Any, Any, Any, Any, _DepRuntimeStateT, _DepRuntimeHandlesT]", + /, + ) -> "LayerControl[_DepRuntimeStateT, _DepRuntimeHandlesT]": ... + + @overload + def control_for( + self, + dep_name: str, + dep_layer: "Layer[Any, Any, Any, Any, Any, _DepRuntimeStateT, _DepRuntimeHandlesT]", + /, + ) -> "LayerControl[_DepRuntimeStateT, _DepRuntimeHandlesT]": ... + + def control_for( + self, + dep_name_or_layer: "str | Layer[Any, Any, Any, Any, Any, _DepRuntimeStateT, _DepRuntimeHandlesT]", + dep_layer: "Layer[Any, Any, Any, Any, Any, _DepRuntimeStateT, _DepRuntimeHandlesT] | None" = None, + /, + ) -> "LayerControl[_DepRuntimeStateT, _DepRuntimeHandlesT]": + """Return the current session control for one resolved dependency. + + ``control_for(dep_layer)`` is for the common case where exactly one + resolved dependency target of this control's owner layer is ``dep_layer``. + Use ``control_for(dep_name, dep_layer)`` when multiple dependency fields + can point at the same layer instance or when the name makes the lookup + clearer. Optional dependencies that resolved to ``None`` have no control + and raise ``KeyError`` when requested. + """ + if isinstance(dep_name_or_layer, str): + if dep_layer is None: + raise TypeError("LayerControl.control_for(dep_name, dep_layer) requires dep_layer.") + dep_name = dep_name_or_layer + resolved_dep_layer = dep_layer + else: + if dep_layer is not None: + raise TypeError("LayerControl.control_for accepts either (dep_layer) or (dep_name, dep_layer).") + dep_name = None + resolved_dep_layer = dep_name_or_layer + + if self._owner_session is None or self._owner_layer_id is None: + raise RuntimeError("LayerControl is not attached to a compositor session.") + + return self._owner_session._control_for_dependency(self._owner_layer_id, dep_name, resolved_dep_layer) + + def _bind_owner(self, session: _LayerControlOwnerSession, layer_id: str) -> None: + """Attach runtime owner metadata used by ``control_for``.""" + self._owner_session = session + self._owner_layer_id = layer_id + @dataclass(frozen=True, slots=True) class LayerDepSpec: diff --git a/dify-agent/src/dify_agent/layers/dify_plugin/llm_layer.py b/dify-agent/src/dify_agent/layers/dify_plugin/llm_layer.py index de8df348d1..69ba9a5ae3 100644 --- a/dify-agent/src/dify_agent/layers/dify_plugin/llm_layer.py +++ b/dify-agent/src/dify_agent/layers/dify_plugin/llm_layer.py @@ -1,15 +1,16 @@ """Dify plugin LLM model layer. This layer owns model capability resolution for Dify plugin-backed LLMs. It -depends on ``DifyPluginLayer`` for active daemon access and returns a Pydantic AI -model adapter configured from the public LLM layer DTO. +depends on ``DifyPluginLayer`` for daemon access, resolves that dependency's +control from its own ``LayerControl``, and returns a Pydantic AI model adapter +configured from the public LLM layer DTO. """ from dataclasses import dataclass from typing_extensions import Self, override -from agenton.layers import LayerDeps, PlainLayer +from agenton.layers import EmptyRuntimeHandles, EmptyRuntimeState, LayerControl, LayerDeps, PlainLayer from dify_agent.adapters.llm import DifyLLMAdapterModel from dify_agent.layers.dify_plugin.configs import DifyPluginLLMLayerConfig from dify_agent.layers.dify_plugin.plugin_layer import DifyPluginLayer @@ -35,9 +36,10 @@ class DifyPluginLLMLayer(PlainLayer[DifyPluginLLMDeps, DifyPluginLLMLayerConfig] """Create the LLM layer from validated public config.""" return cls(config=config) - def get_model(self) -> DifyLLMAdapterModel: - """Return the configured model using the active plugin daemon provider.""" - provider = self.deps.plugin.get_provider(plugin_provider=self.config.provider) + def get_model(self, control: LayerControl[EmptyRuntimeState, EmptyRuntimeHandles]) -> DifyLLMAdapterModel: + """Return the configured model using the current session's plugin control.""" + plugin_control = control.control_for(self.deps.plugin) + provider = self.deps.plugin.get_provider(plugin_control, plugin_provider=self.config.provider) return DifyLLMAdapterModel( model=self.config.model, daemon_provider=provider, diff --git a/dify-agent/src/dify_agent/layers/dify_plugin/plugin_layer.py b/dify-agent/src/dify_agent/layers/dify_plugin/plugin_layer.py index a869aa213b..5e0ff65489 100644 --- a/dify-agent/src/dify_agent/layers/dify_plugin/plugin_layer.py +++ b/dify-agent/src/dify_agent/layers/dify_plugin/plugin_layer.py @@ -3,21 +3,17 @@ The public config identifies tenant/plugin/user context only. Plugin daemon URL, API key, and timeout are server-side dependencies injected by the layer registry factory. Each active compositor entry owns an HTTP client in ``LayerControl`` -runtime handles; ``get_provider`` discovers those handles via a task-local -context variable so shared layer instances never store session-local clients. +runtime handles; callers pass the control explicitly to ``get_provider`` so +shared layer instances never store or discover session-local clients implicitly. """ -from collections.abc import AsyncIterator -from contextlib import asynccontextmanager -from contextvars import ContextVar, Token from dataclasses import dataclass -from typing import cast import httpx from pydantic import BaseModel, ConfigDict from typing_extensions import Self, override -from agenton.layers import EmptyRuntimeState, LayerControl, NoLayerDeps, PlainLayer +from agenton.layers import EmptyRuntimeState, LayerControl, LifecycleState, NoLayerDeps, PlainLayer from dify_agent.adapters.llm import DifyPluginDaemonProvider from dify_agent.layers.dify_plugin.configs import DifyPluginLayerConfig @@ -30,12 +26,6 @@ class DifyPluginRuntimeHandles(BaseModel): model_config = ConfigDict(extra="forbid", validate_assignment=True, arbitrary_types_allowed=True) -_ACTIVE_PLUGIN_HANDLES: ContextVar[dict[int, DifyPluginRuntimeHandles]] = ContextVar( - "dify_agent_active_plugin_handles", - default={}, -) - - @dataclass(slots=True) class DifyPluginLayer(PlainLayer[NoLayerDeps, DifyPluginLayerConfig, EmptyRuntimeState, DifyPluginRuntimeHandles]): """Layer that owns plugin daemon connection state for one active session.""" @@ -66,33 +56,21 @@ class DifyPluginLayer(PlainLayer[NoLayerDeps, DifyPluginLayerConfig, EmptyRuntim """Create a plugin layer from public config plus server-only daemon settings.""" return cls(config=config, daemon_url=daemon_url, daemon_api_key=daemon_api_key, timeout=timeout) - @override - def enter(self, control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles]): - """Enter the layer and expose active handles through task-local context.""" - return self._enter_with_active_handles(control) - - @asynccontextmanager - async def _enter_with_active_handles( + def get_provider( self, control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles], - ) -> AsyncIterator[None]: - async with self.lifecycle_enter(control): - token = self._set_active_handles(control.runtime_handles) - try: - yield - finally: - _ACTIVE_PLUGIN_HANDLES.reset(token) - - def get_provider(self, *, plugin_provider: str) -> DifyPluginDaemonProvider: - """Return a provider backed by this layer's active HTTP client. + *, + plugin_provider: str, + ) -> DifyPluginDaemonProvider: + """Return a provider backed by ``control``'s active HTTP client. Raises: - RuntimeError: if called outside an active compositor context for this - layer, or after its runtime handles have been closed. + RuntimeError: if ``control`` is not active or its HTTP client is + absent/closed. """ - handles = _ACTIVE_PLUGIN_HANDLES.get().get(id(self)) - if handles is None or handles.http_client is None: - raise RuntimeError("DifyPluginLayer.get_provider() requires an active compositor context.") + client = control.runtime_handles.http_client + if control.state is not LifecycleState.ACTIVE or client is None or client.is_closed: + raise RuntimeError("DifyPluginLayer.get_provider() requires an entered control with an open HTTP client.") return DifyPluginDaemonProvider( tenant_id=self.config.tenant_id, plugin_id=self.config.plugin_id, @@ -101,7 +79,7 @@ class DifyPluginLayer(PlainLayer[NoLayerDeps, DifyPluginLayerConfig, EmptyRuntim plugin_daemon_api_key=self.daemon_api_key, user_id=self.config.user_id, timeout=self.timeout, - http_client=handles.http_client, + http_client=client, ) @override @@ -142,10 +120,4 @@ class DifyPluginLayer(PlainLayer[NoLayerDeps, DifyPluginLayerConfig, EmptyRuntim if client is not None: await client.aclose() - def _set_active_handles(self, handles: DifyPluginRuntimeHandles) -> Token[dict[int, DifyPluginRuntimeHandles]]: - active_handles = dict(_ACTIVE_PLUGIN_HANDLES.get()) - active_handles[id(self)] = handles - return cast(Token[dict[int, DifyPluginRuntimeHandles]], _ACTIVE_PLUGIN_HANDLES.set(active_handles)) - - __all__ = ["DifyPluginLayer", "DifyPluginRuntimeHandles"] diff --git a/dify-agent/src/dify_agent/runtime/runner.py b/dify-agent/src/dify_agent/runtime/runner.py index 5feb4aa342..b70f68da05 100644 --- a/dify-agent/src/dify_agent/runtime/runner.py +++ b/dify-agent/src/dify_agent/runtime/runner.py @@ -99,7 +99,9 @@ class AgentRunRunner: _ = await emit_pydantic_ai_event(self.sink, run_id=self.run_id, data=event) try: - model = compositor.get_layer(DIFY_AGENT_MODEL_LAYER_ID, DifyPluginLLMLayer).get_model() + llm_layer = compositor.get_layer(DIFY_AGENT_MODEL_LAYER_ID, DifyPluginLLMLayer) + llm_control = active_session.layer(DIFY_AGENT_MODEL_LAYER_ID) + model = llm_layer.get_model(llm_control) except (KeyError, TypeError, RuntimeError) as exc: raise AgentRunValidationError(str(exc)) from exc diff --git a/dify-agent/tests/local/agenton/compositor/test_control_deps.py b/dify-agent/tests/local/agenton/compositor/test_control_deps.py new file mode 100644 index 0000000000..400df113de --- /dev/null +++ b/dify-agent/tests/local/agenton/compositor/test_control_deps.py @@ -0,0 +1,222 @@ +import asyncio +from collections import OrderedDict +from dataclasses import dataclass + +import pytest +from typing_extensions import override + +from agenton.compositor import Compositor, CompositorSession +from agenton.layers import LayerControl, LayerDeps, PlainLayer, PlainPromptType, PlainToolType +from agenton_collections.layers.plain import ObjectLayer + + +class RenamedObjectDeps(LayerDeps): + renamed: ObjectLayer[str] # pyright: ignore[reportUninitializedInstanceVariable] + + +@dataclass(slots=True) +class RenamedConsumerLayer(PlainLayer[RenamedObjectDeps]): + @property + @override + def prefix_prompts(self) -> list[str]: + return [self.deps.renamed.value] + + +class SameNameObjectDeps(LayerDeps): + same: ObjectLayer[str] # pyright: ignore[reportUninitializedInstanceVariable] + + +@dataclass(slots=True) +class SameNameConsumerLayer(PlainLayer[SameNameObjectDeps]): + pass + + +class DoubleObjectDeps(LayerDeps): + first: ObjectLayer[str] # pyright: ignore[reportUninitializedInstanceVariable] + second: ObjectLayer[str] # pyright: ignore[reportUninitializedInstanceVariable] + + +@dataclass(slots=True) +class DoubleConsumerLayer(PlainLayer[DoubleObjectDeps]): + pass + + +class OptionalObjectDeps(LayerDeps): + maybe: ObjectLayer[str] | None # pyright: ignore[reportUninitializedInstanceVariable] + + +@dataclass(slots=True) +class OptionalConsumerLayer(PlainLayer[OptionalObjectDeps]): + pass + + +def test_control_for_layer_resolves_unique_explicit_dependency_rename() -> None: + target = ObjectLayer("target") + consumer = RenamedConsumerLayer() + compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( + layers=OrderedDict([("actual", target), ("consumer", consumer)]), + deps_name_mapping={"consumer": {"renamed": "actual"}}, + ) + session = compositor.new_session() + + resolved = session.layer("consumer").control_for(target) + + assert resolved is session.layer("actual") + assert consumer.prefix_prompts == ["target"] + + +def test_control_for_layer_resolves_unique_implicit_same_name_dependency() -> None: + target = ObjectLayer("target") + compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( + layers=OrderedDict([("same", target), ("consumer", SameNameConsumerLayer())]), + ) + session = compositor.new_session() + + assert session.layer("consumer").control_for(target) is session.layer("same") + + +def test_control_for_layer_raises_when_no_dependency_points_to_layer() -> None: + target = ObjectLayer("target") + unrelated = ObjectLayer("unrelated") + compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( + layers=OrderedDict([("target", target), ("unrelated", unrelated), ("consumer", RenamedConsumerLayer())]), + deps_name_mapping={"consumer": {"renamed": "target"}}, + ) + session = compositor.new_session() + + with pytest.raises(KeyError, match="no dependency target.*provided ObjectLayer instance"): + _ = session.layer("consumer").control_for(unrelated) + + +def test_control_for_layer_raises_when_multiple_dependency_fields_match() -> None: + target = ObjectLayer("target") + compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( + layers=OrderedDict([("target", target), ("consumer", DoubleConsumerLayer())]), + deps_name_mapping={"consumer": {"first": "target", "second": "target"}}, + ) + session = compositor.new_session() + + with pytest.raises(ValueError, match="multiple dependency fields.*Pass dep_name explicitly"): + _ = session.layer("consumer").control_for(target) + + +def test_control_for_explicit_dep_name_disambiguates_multiple_deps() -> None: + target = ObjectLayer("target") + compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( + layers=OrderedDict([("target", target), ("consumer", DoubleConsumerLayer())]), + deps_name_mapping={"consumer": {"first": "target", "second": "target"}}, + ) + session = compositor.new_session() + + assert session.layer("consumer").control_for("second", target) is session.layer("target") + + +def test_control_for_optional_missing_dependency_raises() -> None: + target = ObjectLayer("target") + consumer = OptionalConsumerLayer() + compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( + layers=OrderedDict([("consumer", consumer)]), + ) + session = compositor.new_session() + + assert consumer.deps.maybe is None + with pytest.raises(KeyError, match="dependency 'maybe' is not bound"): + _ = session.layer("consumer").control_for("maybe", target) + + +def test_restored_session_rebinds_owner_links_for_control_for() -> None: + target = ObjectLayer("target") + compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( + layers=OrderedDict([("actual", target), ("consumer", RenamedConsumerLayer())]), + deps_name_mapping={"consumer": {"renamed": "actual"}}, + ) + session = compositor.new_session() + + async def suspend_session() -> None: + async with compositor.enter(session) as active_session: + active_session.suspend_on_exit() + + asyncio.run(suspend_session()) + restored = compositor.session_from_snapshot(compositor.snapshot_session(session)) + + assert restored.layer("consumer").control_for(target) is restored.layer("actual") + + +def test_enter_rebinds_external_session_owner_links_for_control_for() -> None: + target = ObjectLayer("target") + consumer = RenamedConsumerLayer() + compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( + layers=OrderedDict([("actual", target), ("consumer", consumer)]), + deps_name_mapping={"consumer": {"renamed": "actual"}}, + ) + external_session = CompositorSession( + OrderedDict([("actual", target.new_control()), ("consumer", consumer.new_control())]) + ) + + async def enter_session() -> None: + async with compositor.enter(external_session) as active_session: + assert active_session.layer("consumer").control_for(target) is active_session.layer("actual") + + asyncio.run(enter_session()) + + +def test_failed_enter_does_not_rebind_active_session_owner_links() -> None: + first_target = ObjectLayer("first") + second_target = ObjectLayer("second") + first_compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( + layers=OrderedDict([("actual", first_target), ("consumer", RenamedConsumerLayer())]), + deps_name_mapping={"consumer": {"renamed": "actual"}}, + ) + second_compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( + layers=OrderedDict([("actual", second_target), ("consumer", RenamedConsumerLayer())]), + deps_name_mapping={"consumer": {"renamed": "actual"}}, + ) + session = first_compositor.new_session() + + async def enter_conflicting_compositor() -> None: + async with first_compositor.enter(session) as active_session: + with pytest.raises(RuntimeError, match="already active"): + async with second_compositor.enter(active_session): + raise AssertionError("Expected active-session rejection before entering layers.") + + assert active_session.layer("consumer").control_for(first_target) is active_session.layer("actual") + + asyncio.run(enter_conflicting_compositor()) + + +def test_control_for_uses_owner_resolved_targets_not_graph_wide_object_identity() -> None: + shared_target = ObjectLayer("shared") + compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( + layers=OrderedDict( + [ + ("first-id", shared_target), + ("second-id", shared_target), + ("consumer", RenamedConsumerLayer()), + ] + ), + deps_name_mapping={"consumer": {"renamed": "second-id"}}, + ) + session = compositor.new_session() + + resolved = session.layer("consumer").control_for(shared_target) + + assert resolved is session.layer("second-id") + assert resolved is not session.layer("first-id") + + +def test_control_for_explicit_dep_name_rejects_wrong_layer_instance() -> None: + target = ObjectLayer("target") + wrong = ObjectLayer("wrong") + compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( + layers=OrderedDict([("target", target), ("wrong", wrong), ("consumer", RenamedConsumerLayer())]), + deps_name_mapping={"consumer": {"renamed": "target"}}, + ) + session = compositor.new_session() + + with pytest.raises(TypeError, match="dependency 'renamed'.*not the provided ObjectLayer instance"): + _ = session.layer("consumer").control_for("renamed", wrong) + + +def test_control_for_unowned_control_raises_clear_error() -> None: + with pytest.raises(RuntimeError, match="not attached to a compositor session"): + _ = LayerControl().control_for(ObjectLayer("target")) diff --git a/dify-agent/tests/local/dify_agent/layers/dify_plugin/test_layers.py b/dify-agent/tests/local/dify_agent/layers/dify_plugin/test_layers.py index 4f3452c394..bdc3c72dce 100644 --- a/dify-agent/tests/local/dify_agent/layers/dify_plugin/test_layers.py +++ b/dify-agent/tests/local/dify_agent/layers/dify_plugin/test_layers.py @@ -2,8 +2,10 @@ import asyncio from collections import OrderedDict from typing import cast +import pytest + from agenton.compositor import Compositor -from agenton.layers import PlainPromptType, PlainToolType +from agenton.layers import EmptyRuntimeState, LayerControl, PlainPromptType, PlainToolType from dify_agent.adapters.llm import DifyLLMAdapterModel from dify_agent.layers.dify_plugin.configs import DifyPluginLLMLayerConfig, DifyPluginLayerConfig from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer @@ -30,23 +32,28 @@ def _llm_layer() -> DifyPluginLLMLayer: ) +def _plugin_control(control: LayerControl) -> LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles]: + return cast(LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles], control) + + def test_dify_plugin_layer_get_provider_requires_active_context_and_uses_runtime_client() -> None: async def scenario() -> None: plugin = _plugin_layer() compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("plugin", plugin)])) + session = compositor.new_session() try: - _ = plugin.get_provider(plugin_provider="openai") + _ = plugin.get_provider(_plugin_control(session.layer("plugin")), plugin_provider="openai") except RuntimeError as e: - assert str(e) == "DifyPluginLayer.get_provider() requires an active compositor context." + assert str(e) == "DifyPluginLayer.get_provider() requires an entered control with an open HTTP client." else: raise AssertionError("Expected RuntimeError.") - async with compositor.enter() as session: + async with compositor.enter(session): handles = cast(DifyPluginRuntimeHandles, cast(object, session.layer("plugin").runtime_handles)) client = handles.http_client assert client is not None - provider = plugin.get_provider(plugin_provider="openai") + provider = plugin.get_provider(_plugin_control(session.layer("plugin")), plugin_provider="openai") assert provider.client.http_client is client assert provider.client.tenant_id == "tenant-1" assert provider.client.plugin_id == "langgenius/openai" @@ -57,6 +64,8 @@ def test_dify_plugin_layer_get_provider_requires_active_context_and_uses_runtime assert client.is_closed is False assert client.is_closed is True + with pytest.raises(RuntimeError, match="entered control with an open HTTP client"): + _ = plugin.get_provider(_plugin_control(session.layer("plugin")), plugin_provider="openai") asyncio.run(scenario()) @@ -71,7 +80,7 @@ def test_dify_plugin_llm_layer_builds_adapter_model_from_dependency_provider() - ) async with compositor.enter() as session: - model = llm.get_model() + model = llm.get_model(session.layer("llm")) assert isinstance(model, DifyLLMAdapterModel) assert model.model_name == "demo-model" assert model.credentials == {"api_key": "secret"} @@ -80,3 +89,74 @@ def test_dify_plugin_llm_layer_builds_adapter_model_from_dependency_provider() - assert model.provider.client.http_client is handles.http_client asyncio.run(scenario()) + + +def test_dify_plugin_llm_layer_get_model_uses_control_dependency_lookup(monkeypatch: pytest.MonkeyPatch) -> None: + async def scenario() -> None: + plugin = _plugin_layer() + llm = _llm_layer() + compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( + layers=OrderedDict([("renamed-plugin", plugin), ("llm", llm)]), + deps_name_mapping={"llm": {"plugin": "renamed-plugin"}}, + ) + + async with compositor.enter() as session: + llm_control = session.layer("llm") + plugin_control = session.layer("renamed-plugin") + calls: list[object] = [] + + def fake_control_for(self: LayerControl, dep_layer: object) -> object: + assert self is llm_control + calls.append(dep_layer) + return plugin_control + + monkeypatch.setattr(LayerControl, "control_for", fake_control_for) + + model = llm.get_model(llm_control) + + assert calls == [plugin] + assert isinstance(model, DifyLLMAdapterModel) + + asyncio.run(scenario()) + + +def test_dify_plugin_layer_concurrent_sessions_use_separate_controls_and_clients() -> None: + async def scenario() -> None: + plugin = _plugin_layer() + compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("plugin", plugin)])) + first_session = compositor.new_session() + second_session = compositor.new_session() + + async with compositor.enter(first_session): + async with compositor.enter(second_session): + first_handles = cast( + DifyPluginRuntimeHandles, + cast(object, first_session.layer("plugin").runtime_handles), + ) + second_handles = cast( + DifyPluginRuntimeHandles, + cast(object, second_session.layer("plugin").runtime_handles), + ) + first_client = first_handles.http_client + second_client = second_handles.http_client + assert first_client is not None + assert second_client is not None + assert first_client is not second_client + + first_provider = plugin.get_provider( + _plugin_control(first_session.layer("plugin")), + plugin_provider="openai", + ) + second_provider = plugin.get_provider( + _plugin_control(second_session.layer("plugin")), + plugin_provider="openai", + ) + assert first_provider.client.http_client is first_client + assert second_provider.client.http_client is second_client + + assert second_client.is_closed is True + assert first_client.is_closed is False + + assert first_client.is_closed is True + + asyncio.run(scenario()) diff --git a/dify-agent/tests/local/dify_agent/runtime/test_runner.py b/dify-agent/tests/local/dify_agent/runtime/test_runner.py index 2c47a58a7f..1db8b059ee 100644 --- a/dify-agent/tests/local/dify_agent/runtime/test_runner.py +++ b/dify-agent/tests/local/dify_agent/runtime/test_runner.py @@ -4,16 +4,23 @@ import pytest from pydantic_ai.models.test import TestModel from agenton.compositor import CompositorConfig, LayerNodeConfig +from agenton.layers import LayerControl from agenton_collections.layers.plain import PromptLayerConfig from dify_agent.layers.dify_plugin.configs import DifyPluginLLMLayerConfig, DifyPluginLayerConfig from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer +from dify_agent.layers.dify_plugin.plugin_layer import DifyPluginRuntimeHandles from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID from dify_agent.protocol.schemas import CreateRunRequest, RunSucceededEvent from dify_agent.runtime.event_sink import InMemoryRunEventSink from dify_agent.runtime.runner import AgentRunRunner, AgentRunValidationError -def _request(user: str | list[str] = "hello", *, llm_layer_name: str = DIFY_AGENT_MODEL_LAYER_ID) -> CreateRunRequest: +def _request( + user: str | list[str] = "hello", + *, + llm_layer_name: str = DIFY_AGENT_MODEL_LAYER_ID, + plugin_layer_name: str = "plugin", +) -> CreateRunRequest: return CreateRunRequest( compositor=CompositorConfig( layers=[ @@ -23,14 +30,14 @@ def _request(user: str | list[str] = "hello", *, llm_layer_name: str = DIFY_AGEN config=PromptLayerConfig(prefix="system", user=user), ), LayerNodeConfig( - name="plugin", + name=plugin_layer_name, type="dify.plugin", config=DifyPluginLayerConfig(tenant_id="tenant-1", plugin_id="langgenius/openai"), ), LayerNodeConfig( name=llm_layer_name, type="dify.plugin.llm", - deps={"plugin": "plugin"}, + deps={"plugin": plugin_layer_name}, config=DifyPluginLLMLayerConfig( provider="openai", model="demo-model", @@ -43,12 +50,16 @@ def _request(user: str | list[str] = "hello", *, llm_layer_name: str = DIFY_AGEN def test_runner_emits_terminal_success_and_snapshot(monkeypatch: pytest.MonkeyPatch) -> None: - def fake_get_model(self: DifyPluginLLMLayer): + def fake_get_model(self: DifyPluginLLMLayer, control: LayerControl): assert self.config.model == "demo-model" + plugin_control = control.control_for(self.deps.plugin) + plugin_handles = plugin_control.runtime_handles + assert isinstance(plugin_handles, DifyPluginRuntimeHandles) + assert plugin_handles.http_client is not None return TestModel(custom_output_text="done") # pyright: ignore[reportReturnType] monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model) - request = _request() + request = _request(plugin_layer_name="renamed-plugin") sink = InMemoryRunEventSink() asyncio.run(AgentRunRunner(sink=sink, request=request, run_id="run-1").run()) @@ -64,7 +75,7 @@ def test_runner_emits_terminal_success_and_snapshot(monkeypatch: pytest.MonkeyPa assert terminal.data.output == "done" assert [layer.name for layer in terminal.data.session_snapshot.layers] == [ "prompt", - "plugin", + "renamed-plugin", DIFY_AGENT_MODEL_LAYER_ID, ] assert sink.statuses["run-1"] == "succeeded" diff --git a/dify-agent/tests/local/dify_agent/server/test_runs_routes.py b/dify-agent/tests/local/dify_agent/server/test_runs_routes.py index d1806796e0..5a820a2c1c 100644 --- a/dify-agent/tests/local/dify_agent/server/test_runs_routes.py +++ b/dify-agent/tests/local/dify_agent/server/test_runs_routes.py @@ -1,5 +1,6 @@ from fastapi.testclient import TestClient +from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID from dify_agent.runtime.run_scheduler import SchedulerStoppingError from dify_agent.server.routes.runs import create_runs_router from dify_agent.server.schemas import RunRecord @@ -65,6 +66,52 @@ def test_create_run_returns_running_from_scheduler() -> None: assert response.json() == {"run_id": "run-1", "status": "running"} +def test_create_run_accepts_valid_full_plugin_graph() -> None: + from fastapi import FastAPI + + class CapturingScheduler: + async def create_run(self, request: object) -> RunRecord: + del request + return RunRecord(run_id="run-1", status="running") + + app = FastAPI() + app.include_router( + create_runs_router(lambda: FakeStore(), lambda: CapturingScheduler()) # pyright: ignore[reportArgumentType] + ) + client = TestClient(app) + + response = client.post( + "/runs", + json={ + "compositor": { + "schema_version": 1, + "layers": [ + {"name": "prompt", "type": "plain.prompt", "config": {"user": "hello"}}, + { + "name": "plugin-renamed", + "type": "dify.plugin", + "config": {"tenant_id": "tenant-1", "plugin_id": "langgenius/openai"}, + }, + { + "name": DIFY_AGENT_MODEL_LAYER_ID, + "type": "dify.plugin.llm", + "deps": {"plugin": "plugin-renamed"}, + "config": { + "provider": "openai", + "model": "gpt-4o-mini", + "credentials": {"api_key": "secret"}, + "model_settings": {"temperature": 0.2}, + }, + }, + ], + } + }, + ) + + assert response.status_code == 202 + assert response.json() == {"run_id": "run-1", "status": "running"} + + def test_create_run_returns_503_when_scheduler_is_stopping() -> None: from fastapi import FastAPI