refactor agenton compositor lifecycle model

This commit is contained in:
盐粒 Yanli 2026-05-12 23:32:12 +08:00
parent e8c16fb08b
commit b4ce54a7ea
12 changed files with 1402 additions and 1930 deletions

View File

@ -0,0 +1,18 @@
"""Agenton state-only core.
Agenton core composes reusable stateless layer graph plans, creates a fresh
``CompositorRun`` for each invocation, hydrates and advances serializable layer
``runtime_state`` through run slots, and emits session snapshots. It intentionally
does not own resources, handles, clients, cleanup callbacks, or any other
non-serializable runtime object.
Each ``Compositor`` stores only graph nodes and layer providers. Every enter call
creates new layer instances, binds direct dependencies for that run, and writes
the next cross-call state to ``run.session_snapshot`` after exit. To resume a
suspended call, reuse the same compositor plan and pass the prior snapshot to a
new enter call.
``LifecycleState.ACTIVE`` is internal-only while an entry is running. External
session snapshots and hydrated input must contain only non-active lifecycle
states; ``runtime_state`` is the only mutable layer data captured by snapshots.
"""

File diff suppressed because it is too large Load Diff

View File

@ -7,13 +7,11 @@ families while keeping concrete reusable layers in ``agenton_collections``.
from agenton.layers.base import (
EmptyLayerConfig,
EmptyRuntimeHandles,
EmptyRuntimeState,
ExitIntent,
Layer,
LayerConfig,
LayerConfigValue,
LayerControl,
LayerDeps,
LifecycleState,
NoLayerDeps,
@ -46,12 +44,10 @@ __all__ = [
"LayerConfig",
"LayerConfigValue",
"LayerDeps",
"LayerControl",
"LifecycleState",
"ExitIntent",
"EmptyLayerConfig",
"EmptyRuntimeState",
"EmptyRuntimeHandles",
"NoLayerDeps",
"PlainLayer",
"PlainPrompt",

View File

@ -1,37 +1,41 @@
"""Core layer abstractions and typed dependency binding.
"""Invocation-scoped core layer abstractions and typed dependency binding.
Layers declare their dependency shape with ``Layer[DepsT, PromptT, ToolT, ...]``.
Agenton core deliberately manages only three concerns: stateless layer graph
composition, serializable ``runtime_state`` lifecycle, and session snapshots. It
does not own live resources, process handles, HTTP clients, cleanup stacks, or
any other non-serializable runtime object. Those belong to application layers or
integration code outside the core.
Layers declare their dependency shape with
``Layer[DepsT, PromptT, UserPromptT, ToolT, ConfigT, RuntimeStateT]``.
``DepsT`` must be a ``LayerDeps`` subclass whose annotated members are concrete
``Layer`` subclasses or modern optional dependencies such as ``SomeLayer |
None``. The optional trailing generic slots declare Pydantic schemas for config,
serializable runtime state, and live runtime handles. The base class infers
``deps_type`` and schema class attributes from the generic base when possible,
while still allowing subclasses to set them explicitly for unusual inheritance
patterns.
``Layer`` subclasses or modern optional dependencies such as ``SomeLayer | None``.
Dependencies are direct layer instance relationships bound onto ``self.deps``
for one compositor invocation; there is no dependency-control lookup API in the
core.
``LayerConfig`` is the DTO base for config schemas that can be embedded directly
in serializable compositor config. Runtime state and handle schemas remain plain
Pydantic models because they are not accepted as graph input.
``LayerConfig`` is the DTO base for config schemas accepted by layer providers.
The provider validates raw node-name keyed configs with a layer's
``config_type`` before constructing the layer and assigning ``self.config``.
``runtime_state_type`` is the only mutable schema managed by Agenton and the only
per-layer data included in session snapshots. The base class infers
``deps_type``, ``config_type``, and ``runtime_state_type`` from generic bases
when possible, while still allowing subclasses to set them explicitly for
unusual inheritance patterns.
``Layer.bind_deps`` is the mutation point for dependency state. Layer
implementations should treat ``self.deps`` as unavailable until a compositor or
caller has resolved and bound dependencies. When a layer needs a dependency's
session-local state or handles, use the current ``LayerControl.control_for`` API
instead of storing dependency controls on layer instances.
``Layer`` is an invocation-scoped business object. It owns ``config``, direct
``deps``, and serializable ``runtime_state`` plus prompt/tool authoring surfaces,
but it does not own lifecycle state, exit intent, graph owner tokens, entry
stacks, resources, or cleanup callbacks. ``CompositorRun`` owns lifecycle state
and exit intent for one entry. ``SessionSnapshot`` objects are the only supported
cross-call state carrier.
Layer async entry uses a caller-provided ``LayerControl`` as an explicit state
machine and per-session runtime owner. A fresh control starts in
``LifecycleState.NEW`` and enters create logic. A suspended control resumes,
while active or closed controls are rejected to prevent ambiguous nested or
post-delete reuse. Exit behavior is selected per entry with ``ExitIntent`` and
resets to delete on every successful enter. Layer instances are shared graph and
capability definitions, so session-local serializable ids, checkpoints, and
other snapshot data belong in ``LayerControl.runtime_state``; live clients,
connections, and process handles belong in ``LayerControl.runtime_handles``.
Neither category should be stored on ``self`` when it is session-local. Live
resources that need deterministic cleanup should be registered on the control's
per-entry resource stack; the base lifecycle closes that stack after suspend and
delete hooks, and also when create/resume fails.
Lifecycle hooks are no-argument business hooks on the layer instance:
``on_context_create/resume/suspend/delete(self)``. They should read dependencies
from ``self.deps`` and read or mutate serializable invocation state through
``self.runtime_state``. Resource acquisition and deterministic cleanup should be
handled outside Agenton core, for example by integration-specific context
managers that wrap compositor entry.
``Layer`` is framework-neutral over system prompt, user prompt, and tool item
types. The native ``prefix_prompts``, ``suffix_prompts``, ``user_prompts``, and
@ -42,24 +46,19 @@ native values without changing layer implementations.
"""
from abc import ABC, abstractmethod
from collections.abc import AsyncIterator, Awaitable, Callable
from contextlib import AbstractAsyncContextManager, AsyncExitStack, asynccontextmanager
from dataclasses import dataclass, field
from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from enum import StrEnum
from types import UnionType
from typing import (
Any,
ClassVar,
Generic,
Mapping,
Protocol,
Sequence,
Union,
cast,
get_args,
get_origin,
get_type_hints,
overload,
)
from pydantic import BaseModel, ConfigDict, JsonValue, SerializeAsAny
@ -75,10 +74,10 @@ _ToolT = TypeVar("_ToolT")
class LayerConfig(BaseModel):
"""Base DTO for serializable layer configuration.
Subclasses are safe to place in ``LayerNodeConfig.config``. The compositor
still accepts plain JSON values for wire input, but typed Python call sites can
use concrete ``LayerConfig`` subclasses and preserve their fields during JSON
serialization.
Layer providers validate raw config values with concrete ``LayerConfig``
subclasses before constructing a layer for one invocation. Serializable
compositor graph config references layer type ids and node metadata only;
per-call config travels through ``Compositor.enter(configs=...)``.
"""
model_config = ConfigDict(extra="forbid")
@ -89,34 +88,17 @@ type LayerConfigValue = JsonValue | SerializeAsAny[LayerConfig]
_ConfigT = TypeVar("_ConfigT", bound=LayerConfig, default="EmptyLayerConfig")
_RuntimeStateT = TypeVar("_RuntimeStateT", bound=BaseModel, default="EmptyRuntimeState")
_RuntimeHandlesT = TypeVar("_RuntimeHandlesT", bound=BaseModel, default="EmptyRuntimeHandles")
_DepRuntimeStateT = TypeVar("_DepRuntimeStateT", bound=BaseModel)
_DepRuntimeHandlesT = TypeVar("_DepRuntimeHandlesT", bound=BaseModel)
_ResourceT = TypeVar("_ResourceT")
class _LayerControlOwnerSession(Protocol):
"""Private structural API used by controls to resolve dependency controls."""
def _control_for_dependency(
self,
owner_layer_id: str,
dep_name: str | None,
dep_layer: "Layer[Any, Any, Any, Any, Any, _DepRuntimeStateT, _DepRuntimeHandlesT]",
) -> "LayerControl[_DepRuntimeStateT, _DepRuntimeHandlesT]": ...
def _layer_for_control_owner(self, owner_layer_id: str) -> "Layer[Any, Any, Any, Any, Any, Any, Any]": ...
class LayerDeps:
"""Typed dependency container for a Layer.
"""Typed dependency container for a layer.
Subclasses declare dependency members with annotations. Every annotated
member must be a Layer subclass or ``LayerSubclass | None``. Optional deps
are always assigned as attributes; missing optional values become ``None``.
"""
def __init__(self, **deps: "Layer[Any, Any, Any, Any, Any, Any, Any] | None") -> None:
def __init__(self, **deps: "Layer[Any, Any, Any, Any, Any, Any] | None") -> None:
dep_specs = _get_dep_specs(type(self))
missing_names = {name for name, spec in dep_specs.items() if not spec.optional} - deps.keys()
if missing_names:
@ -155,23 +137,17 @@ class EmptyLayerConfig(LayerConfig):
class EmptyRuntimeState(BaseModel):
"""Default serializable per-session runtime state schema."""
"""Default serializable invocation runtime state schema."""
model_config = ConfigDict(extra="forbid", validate_assignment=True)
class EmptyRuntimeHandles(BaseModel):
"""Default live per-session runtime handle schema.
Handles may contain arbitrary Python objects and are intentionally excluded
from session snapshots.
"""
model_config = ConfigDict(extra="forbid", validate_assignment=True, arbitrary_types_allowed=True)
class LifecycleState(StrEnum):
"""Externally observable lifecycle state for a layer control."""
"""Lifecycle state for one run slot.
``ACTIVE`` is internal-only. It is used while an invocation is running and
must never appear in external session snapshots or hydrated input.
"""
NEW = "new"
ACTIVE = "active"
@ -180,165 +156,53 @@ class LifecycleState(StrEnum):
class ExitIntent(StrEnum):
"""Per-entry exit behavior requested for a layer control."""
"""Run-slot exit behavior requested during active invocation."""
DELETE = "delete"
SUSPEND = "suspend"
@dataclass(slots=True)
class LayerControl(Generic[_RuntimeStateT, _RuntimeHandlesT]):
"""Stateful control slot passed into a layer entry context.
``Layer.enter`` requires the caller to provide this object. The control owns
the layer lifecycle state, the current entry's exit intent, and arbitrary
per-session runtime state and live handles. Call ``suspend_on_exit`` before leaving the
context to make a later entry resume; call ``delete_on_exit`` or do nothing
for the default delete behavior. Store session-local serializable ids,
checkpoints, and other snapshot data in ``runtime_state``. Store live
clients, connections, process handles, and other non-serializable objects in
``runtime_handles``. Do not put either kind of session-local data on the
shared layer instance.
``runtime_state`` intentionally persists after suspend and delete. Suspend,
resume, and delete hooks can inspect the same values created on entry, and
callers may inspect closed-session diagnostics after exit. Reuse is still
governed by ``state``: a closed control cannot be entered again. Runtime
handles are not serialized in snapshots and should be rehydrated from
runtime state in resume hooks. A compositor also binds private owner metadata
so ``control_for`` can find controls for this layer's dependencies in the
same session; those links are runtime-only and not part of snapshots. The
per-entry resource stack is also runtime-only and exists only while the layer
is entering, active, or exiting.
"""
state: LifecycleState = LifecycleState.NEW
exit_intent: ExitIntent = ExitIntent.DELETE
runtime_state: _RuntimeStateT = field(default_factory=lambda: cast(_RuntimeStateT, EmptyRuntimeState()))
runtime_handles: _RuntimeHandlesT = field(default_factory=lambda: cast(_RuntimeHandlesT, EmptyRuntimeHandles()))
_owner_session: _LayerControlOwnerSession | None = field(default=None, init=False, repr=False, compare=False)
_owner_layer_id: str | None = field(default=None, init=False, repr=False, compare=False)
_entry_stack: AsyncExitStack | None = field(default=None, init=False, repr=False, compare=False)
def suspend_on_exit(self) -> None:
"""Request suspend behavior when the current layer entry exits."""
self.exit_intent = ExitIntent.SUSPEND
def delete_on_exit(self) -> None:
"""Request delete behavior when the current layer entry exits."""
self.exit_intent = ExitIntent.DELETE
async def enter_async_resource(self, cm: AbstractAsyncContextManager[_ResourceT]) -> _ResourceT:
"""Enter ``cm`` on this control's current entry resource stack.
Resource registration is available only while a layer entry is in
progress or active. The base lifecycle closes registered resources after
suspend/delete hooks and when create/resume fails.
"""
return await self._require_entry_stack().enter_async_context(cm)
def add_async_cleanup(self, callback: Callable[[], Awaitable[None]]) -> None:
"""Register an async cleanup callback on the current entry resource stack."""
self._require_entry_stack().push_async_callback(callback)
@overload
def control_for(
self,
dep_layer: "Layer[Any, Any, Any, Any, Any, _DepRuntimeStateT, _DepRuntimeHandlesT]",
/,
) -> "LayerControl[_DepRuntimeStateT, _DepRuntimeHandlesT]": ...
@overload
def control_for(
self,
dep_name: str,
dep_layer: "Layer[Any, Any, Any, Any, Any, _DepRuntimeStateT, _DepRuntimeHandlesT]",
/,
) -> "LayerControl[_DepRuntimeStateT, _DepRuntimeHandlesT]": ...
def control_for(
self,
dep_name_or_layer: "str | Layer[Any, Any, Any, Any, Any, _DepRuntimeStateT, _DepRuntimeHandlesT]",
dep_layer: "Layer[Any, Any, Any, Any, Any, _DepRuntimeStateT, _DepRuntimeHandlesT] | None" = None,
/,
) -> "LayerControl[_DepRuntimeStateT, _DepRuntimeHandlesT]":
"""Return the current session control for one resolved dependency.
``control_for(dep_layer)`` is for the common case where exactly one
resolved dependency target of this control's owner layer is ``dep_layer``.
Use ``control_for(dep_name, dep_layer)`` when multiple dependency fields
can point at the same layer instance or when the name makes the lookup
clearer. Optional dependencies that resolved to ``None`` have no control
and raise ``KeyError`` when requested.
"""
if isinstance(dep_name_or_layer, str):
if dep_layer is None:
raise TypeError("LayerControl.control_for(dep_name, dep_layer) requires dep_layer.")
dep_name = dep_name_or_layer
resolved_dep_layer = dep_layer
else:
if dep_layer is not None:
raise TypeError("LayerControl.control_for accepts either (dep_layer) or (dep_name, dep_layer).")
dep_name = None
resolved_dep_layer = dep_name_or_layer
if self._owner_session is None or self._owner_layer_id is None:
raise RuntimeError("LayerControl is not attached to a compositor session.")
return self._owner_session._control_for_dependency(self._owner_layer_id, dep_name, resolved_dep_layer)
def _bind_owner(self, session: _LayerControlOwnerSession, layer_id: str) -> None:
"""Attach runtime owner metadata used by ``control_for``."""
self._owner_session = session
self._owner_layer_id = layer_id
def _require_entry_stack(self) -> AsyncExitStack:
if self._entry_stack is None:
raise RuntimeError("LayerControl entry resource stack is not active.")
return self._entry_stack
def _begin_entry_stack(self) -> None:
if self._entry_stack is not None:
raise RuntimeError("LayerControl entry resource stack is already active.")
self._entry_stack = AsyncExitStack()
async def _close_entry_stack(self) -> None:
stack = self._entry_stack
self._entry_stack = None
if stack is not None:
await stack.aclose()
@dataclass(frozen=True, slots=True)
class LayerDepSpec:
"""Runtime dependency specification derived from a deps annotation."""
layer_type: type["Layer[Any, Any, Any, Any, Any, Any, Any]"]
layer_type: type["Layer[Any, Any, Any, Any, Any, Any]"]
optional: bool = False
class Layer(
ABC,
Generic[_DepsT, _PromptT, _UserPromptT, _ToolT, _ConfigT, _RuntimeStateT, _RuntimeHandlesT],
Generic[_DepsT, _PromptT, _UserPromptT, _ToolT, _ConfigT, _RuntimeStateT],
):
"""Framework-neutral base class for prompt/tool layers.
Subclasses expose optional prompt fragments and tools through typed
properties. They declare required dependencies in the ``DepsT`` container
rather than by accepting dependencies in ``__init__``. Layer instances can be
entered by multiple sessions, including concurrently, so lifecycle hooks
should store session-local runtime values on the passed ``LayerControl``.
The default async context manager handles create, resume, suspend, and
delete transitions; layers can override ``enter`` when they need to wrap
extra runtime resources.
A layer instance is invocation-scoped mutable business state, not a reusable
cross-session definition. ``CompositorRun`` creates fresh instances through
layer providers, assigns validated ``config``, binds direct dependency layer
instances to ``deps``, hydrates ``runtime_state`` from an optional session
snapshot, and then runs no-argument lifecycle hooks. The run owns lifecycle
state and exit intent; layers never expose a public entry context manager.
Live resources and handles are intentionally outside this abstraction. Only
``runtime_state`` is managed and snapshotted by Agenton core. Lifecycle hooks
should operate on ``self`` and keep any non-serializable cleanup policy in
integration code that wraps the compositor.
"""
deps_type: type[_DepsT]
config: _ConfigT
deps: _DepsT
runtime_state: _RuntimeStateT
type_id: ClassVar[str | None] = None
config_type: ClassVar[type[LayerConfig]] = EmptyLayerConfig
runtime_state_type: ClassVar[type[BaseModel]] = EmptyRuntimeState
runtime_handles_type: ClassVar[type[BaseModel]] = EmptyRuntimeHandles
def __new__(cls, *args: object, **kwargs: object) -> Self:
instance = cast(Self, super().__new__(cls))
runtime_state_type = getattr(cls, "runtime_state_type", None)
if isinstance(runtime_state_type, type) and issubclass(runtime_state_type, BaseModel):
instance.runtime_state = cast(Any, runtime_state_type.model_validate({}))
return instance
def __init_subclass__(cls) -> None:
super().__init_subclass__()
@ -362,56 +226,38 @@ class Layer(
_infer_schema_type(cls, 5, "runtime_state_type"),
EmptyRuntimeState,
)
_init_schema_type(
cls,
"runtime_handles_type",
_infer_schema_type(cls, 6, "runtime_handles_type"),
EmptyRuntimeHandles,
)
@classmethod
def from_config(cls: type[Self], config: _ConfigT) -> Self:
"""Create a layer from schema-validated serialized config.
Registries/builders validate raw config with ``config_type`` before
calling this method. Layers are not config-constructible by default.
Subclasses that accept config should override this method and consume
the typed Pydantic model for their schema.
``LayerProvider.from_layer_type`` validates raw config with
``config_type`` before calling this method. Layers without config use the
default no-argument construction path. Layers with a concrete config
schema should override this method and consume the typed Pydantic model.
"""
raise TypeError(f"{cls.__name__} cannot be created from config.")
if cls.config_type is not EmptyLayerConfig:
raise TypeError(f"{cls.__name__} cannot be created from config; override from_config or use a provider.")
EmptyLayerConfig.model_validate(config)
try:
return cast(Self, cls())
except TypeError as e:
raise TypeError(f"{cls.__name__} cannot be created from empty config; use a custom provider.") from e
@classmethod
def dependency_names(cls) -> frozenset[str]:
"""Return dependency field names declared by this layer's deps schema."""
return frozenset(_get_dep_specs(cls.deps_type))
def new_control(
self,
*,
state: LifecycleState = LifecycleState.NEW,
runtime_state: object | None = None,
) -> LayerControl[_RuntimeStateT, _RuntimeHandlesT]:
"""Create a schema-validated per-session control for this layer.
``runtime_state`` is validated through ``runtime_state_type`` and live
handles are always initialized empty through ``runtime_handles_type``.
"""
raw_runtime_state = {} if runtime_state is None else runtime_state
return LayerControl(
state=state,
exit_intent=ExitIntent.DELETE,
runtime_state=cast(_RuntimeStateT, self.runtime_state_type.model_validate(raw_runtime_state)),
runtime_handles=cast(_RuntimeHandlesT, self.runtime_handles_type.model_validate({})),
)
def bind_deps(self, deps: Mapping[str, "Layer[Any, Any, Any, Any, Any, Any, Any] | None"]) -> None:
def bind_deps(self, deps: Mapping[str, "Layer[Any, Any, Any, Any, Any, Any] | None"]) -> None:
"""Bind this layer's declared dependencies from a name-to-layer mapping.
The mapping may include more layers than the declared dependency fields.
Only names declared by ``deps_type`` are selected and validated. Missing
optional deps are bound as ``None``.
optional deps are bound as ``None``. Bound values are direct layer
instances for this invocation graph.
"""
resolved_deps: dict[str, Layer[Any, Any, Any, Any, Any, Any, Any] | None] = {}
resolved_deps: dict[str, Layer[Any, Any, Any, Any, Any, Any] | None] = {}
for name, spec in _get_dep_specs(self.deps_type).items():
if name not in deps:
if spec.optional:
@ -423,127 +269,17 @@ class Layer(
resolved_deps[name] = deps[name]
self.deps = self.deps_type(**resolved_deps)
def require_control(
self,
control: LayerControl[Any, Any],
*,
active: bool = False,
) -> LayerControl[_RuntimeStateT, _RuntimeHandlesT]:
"""Validate and return ``control`` as this layer's current session control.
async def on_context_create(self) -> None:
"""Run when the run slot enters from ``LifecycleState.NEW``."""
Capability methods should accept their own layer control explicitly and
call this helper before reading runtime state or handles. The control must
be attached to a compositor session whose owner layer id resolves to this
exact layer instance. Runtime state and handle schemas are checked against
the layer's declared schema types; when ``active`` is true, the lifecycle
state must be ``LifecycleState.ACTIVE``.
"""
if control._owner_session is None or control._owner_layer_id is None:
raise RuntimeError("LayerControl is not attached to a compositor session.")
try:
owner_layer = control._owner_session._layer_for_control_owner(control._owner_layer_id)
except KeyError as e:
raise RuntimeError(
f"LayerControl owner layer '{control._owner_layer_id}' is not defined in its compositor."
) from e
if owner_layer is not self:
raise RuntimeError(
f"LayerControl belongs to layer '{control._owner_layer_id}', not this {type(self).__name__} instance."
)
if not isinstance(control.runtime_state, self.runtime_state_type):
raise TypeError(
f"{type(self).__name__} control runtime_state must be {self.runtime_state_type.__name__}, "
f"got {type(control.runtime_state).__name__}."
)
if not isinstance(control.runtime_handles, self.runtime_handles_type):
raise TypeError(
f"{type(self).__name__} control runtime_handles must be {self.runtime_handles_type.__name__}, "
f"got {type(control.runtime_handles).__name__}."
)
if active and control.state is not LifecycleState.ACTIVE:
raise RuntimeError(
f"{type(self).__name__} requires an active LayerControl; current state is {control.state.value}."
)
return cast(LayerControl[_RuntimeStateT, _RuntimeHandlesT], control)
async def on_context_delete(self) -> None:
"""Run when the run slot exits with ``ExitIntent.DELETE``."""
def enter(self, control: LayerControl[_RuntimeStateT, _RuntimeHandlesT]) -> AbstractAsyncContextManager[None]:
"""Return the layer's async entry context manager.
async def on_context_suspend(self) -> None:
"""Run when the run slot exits with ``ExitIntent.SUSPEND``."""
``control`` is the lifecycle control slot for this entry. Subclasses can
override this for unusual wrapping, but ordinary live resources should be
registered with ``control.enter_async_resource`` from lifecycle hooks.
"""
return self.lifecycle_enter(control)
@asynccontextmanager
async def lifecycle_enter(self, control: LayerControl[_RuntimeStateT, _RuntimeHandlesT]) -> AsyncIterator[None]:
"""Run the default explicit lifecycle and resource stack for one entry.
Exit state is recorded even when suspend/delete hooks fail because the
active entry is over once hook cleanup begins.
"""
if control.state is LifecycleState.ACTIVE:
raise RuntimeError(
"LayerControl is already active; duplicate or nested enter is not allowed."
)
if control.state is LifecycleState.CLOSED:
raise RuntimeError(
"LayerControl is closed; create a new compositor session before entering again."
)
control._begin_entry_stack()
try:
if control.state is LifecycleState.NEW:
control.exit_intent = ExitIntent.DELETE
await self.on_context_create(control)
control.state = LifecycleState.ACTIVE
elif control.state is LifecycleState.SUSPENDED:
control.exit_intent = ExitIntent.DELETE
await self.on_context_resume(control)
control.state = LifecycleState.ACTIVE
except BaseException:
await control._close_entry_stack()
raise
try:
yield
finally:
hook_error: BaseException | None = None
if control.exit_intent is ExitIntent.SUSPEND:
try:
await self.on_context_suspend(control)
except BaseException as exc:
hook_error = exc
finally:
control.state = LifecycleState.SUSPENDED
else:
try:
await self.on_context_delete(control)
except BaseException as exc:
hook_error = exc
finally:
control.state = LifecycleState.CLOSED
try:
await control._close_entry_stack()
except BaseException:
if hook_error is not None:
raise hook_error
raise
if hook_error is not None:
raise hook_error
async def on_context_create(self, control: LayerControl[_RuntimeStateT, _RuntimeHandlesT]) -> None:
"""Run when the layer context is entered from ``LifecycleState.NEW``."""
async def on_context_delete(self, control: LayerControl[_RuntimeStateT, _RuntimeHandlesT]) -> None:
"""Run when the layer context exits with ``ExitIntent.DELETE``."""
async def on_context_suspend(self, control: LayerControl[_RuntimeStateT, _RuntimeHandlesT]) -> None:
"""Run when the layer context exits with ``ExitIntent.SUSPEND``."""
async def on_context_resume(self, control: LayerControl[_RuntimeStateT, _RuntimeHandlesT]) -> None:
"""Run when the layer context enters from ``LifecycleState.SUSPENDED``."""
async def on_context_resume(self) -> None:
"""Run when the run slot enters from ``LifecycleState.SUSPENDED``."""
@property
def prefix_prompts(self) -> Sequence[_PromptT]:
@ -563,17 +299,17 @@ class Layer(
@abstractmethod
def wrap_prompt(self, prompt: _PromptT) -> object:
"""Wrap a native prompt item for compositor aggregation."""
"""Wrap a native prompt item for run-level aggregation."""
raise NotImplementedError
@abstractmethod
def wrap_user_prompt(self, prompt: _UserPromptT) -> object:
"""Wrap a native user prompt item for compositor aggregation."""
"""Wrap a native user prompt item for run-level aggregation."""
raise NotImplementedError
@abstractmethod
def wrap_tool(self, tool: _ToolT) -> object:
"""Wrap a native tool item for compositor aggregation."""
"""Wrap a native tool item for run-level aggregation."""
raise NotImplementedError
@ -606,14 +342,14 @@ def _as_dep_spec(annotation: object) -> LayerDepSpec | None:
return LayerDepSpec(layer_type=layer_type)
def _as_layer_type(annotation: object) -> type[Layer[Any, Any, Any, Any, Any, Any, Any]] | None:
def _as_layer_type(annotation: object) -> type[Layer[Any, Any, Any, Any, Any, Any]] | None:
runtime_type = get_origin(annotation) or annotation
if isinstance(runtime_type, type) and issubclass(runtime_type, Layer):
return cast(type[Layer[Any, Any, Any, Any, Any, Any, Any]], runtime_type)
return cast(type[Layer[Any, Any, Any, Any, Any, Any]], runtime_type)
return None
def _infer_deps_type(layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]]) -> type[LayerDeps] | None:
def _infer_deps_type(layer_type: type[Layer[Any, Any, Any, Any, Any, Any]]) -> type[LayerDeps] | None:
inferred = _infer_layer_generic_arg(layer_type, 0, {})
if inferred is None:
return None
@ -621,7 +357,7 @@ def _infer_deps_type(layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]])
def _infer_schema_type(
layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]],
layer_type: type[Layer[Any, Any, Any, Any, Any, Any]],
index: int,
attr_name: str,
) -> type[BaseModel] | None:
@ -634,7 +370,7 @@ def _infer_schema_type(
return schema_type
def _infer_config_type(layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]]) -> type[LayerConfig] | None:
def _infer_config_type(layer_type: type[Layer[Any, Any, Any, Any, Any, Any]]) -> type[LayerConfig] | None:
inferred = _infer_schema_generic_arg(layer_type, "config_type", {}) or _infer_layer_generic_arg(layer_type, 4, {})
if inferred is None:
return None
@ -645,7 +381,7 @@ def _infer_config_type(layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]
def _infer_schema_generic_arg(
layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]],
layer_type: type[Layer[Any, Any, Any, Any, Any, Any]],
attr_name: str,
substitutions: Mapping[object, object],
) -> object | None:
@ -653,7 +389,6 @@ def _infer_schema_generic_arg(
expected_names = {
"config_type": {"ConfigT", "_ConfigT"},
"runtime_state_type": {"RuntimeStateT", "_RuntimeStateT"},
"runtime_handles_type": {"RuntimeHandlesT", "_RuntimeHandlesT"},
}[attr_name]
for base in getattr(layer_type, "__orig_bases__", ()):
origin = get_origin(base) or base
@ -675,7 +410,7 @@ def _infer_schema_generic_arg(
def _infer_layer_generic_arg(
layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]],
layer_type: type[Layer[Any, Any, Any, Any, Any, Any]],
index: int,
substitutions: Mapping[object, object],
) -> object | None:
@ -704,7 +439,7 @@ def _infer_layer_generic_arg(
def _init_schema_type(
layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]],
layer_type: type[Layer[Any, Any, Any, Any, Any, Any]],
attr_name: str,
inferred_schema_type: type[BaseModel] | None,
default_schema_type: type[BaseModel],
@ -718,7 +453,7 @@ def _init_schema_type(
def _init_config_type(
layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]],
layer_type: type[Layer[Any, Any, Any, Any, Any, Any]],
inferred_config_type: type[LayerConfig] | None,
) -> None:
config_type = layer_type.__dict__.get("config_type")
@ -784,7 +519,5 @@ def _as_config_type(value: object) -> type[LayerConfig] | None:
return None
def _is_generic_layer_template(layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]]) -> bool:
return bool(getattr(layer_type, "__type_params__", ())) or bool(
getattr(layer_type, "__parameters__", ())
)
def _is_generic_layer_template(layer_type: type[Layer[Any, Any, Any, Any, Any, Any]]) -> bool:
return bool(getattr(layer_type, "__type_params__", ())) or bool(getattr(layer_type, "__parameters__", ()))

View File

@ -4,10 +4,11 @@
that bind its system prompt, user prompt, and tool generic slots to concrete
contracts, such as ordinary strings with plain callable tools or pydantic-ai
prompt/tool shapes. The families keep the trailing schema generic slots open so
concrete layers can have ``config_type``, ``runtime_state_type``, and
``runtime_handles_type`` inferred from type arguments instead of repeated class
attributes. Config schemas use ``LayerConfig`` so they can also be embedded as
typed DTOs in serializable compositor config.
concrete layers can have ``config_type`` and ``runtime_state_type`` inferred from
type arguments instead of repeated class attributes. Config schemas use
``LayerConfig`` so they can also be embedded as
typed DTOs in serializable compositor config. Agenton core is state-only:
typed layer families do not expose runtime handle schemas or resource ownership.
Tagged aggregate aliases cover code paths that can accept any supported
prompt/tool family without changing the plain and pydantic-ai layer contracts.
Pydantic-ai names are imported for static analysis only, so ``agenton`` can be
@ -30,7 +31,7 @@ if TYPE_CHECKING:
from pydantic import BaseModel
from agenton.layers.base import EmptyLayerConfig, EmptyRuntimeHandles, EmptyRuntimeState, Layer, LayerConfig, LayerDeps
from agenton.layers.base import EmptyLayerConfig, EmptyRuntimeState, Layer, LayerConfig, LayerDeps
type PlainPrompt = str
type PlainUserPrompt = str
@ -98,12 +99,11 @@ type AllToolTypes = PlainToolType | PydanticAIToolType[Any]
_DepsT = TypeVar("_DepsT", bound=LayerDeps)
_ConfigT = TypeVar("_ConfigT", bound=LayerConfig, default=EmptyLayerConfig)
_RuntimeStateT = TypeVar("_RuntimeStateT", bound=BaseModel, default=EmptyRuntimeState)
_RuntimeHandlesT = TypeVar("_RuntimeHandlesT", bound=BaseModel, default=EmptyRuntimeHandles)
_AgentDepsT = TypeVar("_AgentDepsT")
class PlainLayer(
Generic[_DepsT, _ConfigT, _RuntimeStateT, _RuntimeHandlesT],
Generic[_DepsT, _ConfigT, _RuntimeStateT],
Layer[
_DepsT,
PlainPrompt,
@ -111,7 +111,6 @@ class PlainLayer(
PlainTool,
_ConfigT,
_RuntimeStateT,
_RuntimeHandlesT,
],
):
"""Layer base for ordinary string prompts and plain-callable tools."""
@ -133,7 +132,7 @@ class PlainLayer(
class PydanticAILayer(
Generic[_DepsT, _AgentDepsT, _ConfigT, _RuntimeStateT, _RuntimeHandlesT],
Generic[_DepsT, _AgentDepsT, _ConfigT, _RuntimeStateT],
Layer[
_DepsT,
PydanticAIPrompt[_AgentDepsT],
@ -141,7 +140,6 @@ class PydanticAILayer(
PydanticAITool[_AgentDepsT],
_ConfigT,
_RuntimeStateT,
_RuntimeHandlesT,
],
):
"""Layer base for pydantic-ai prompt and tool adapters."""

View File

@ -32,7 +32,8 @@ class ObjectLayer[ObjectT](PlainLayer[NoLayerDeps]):
"""Layer that stores one typed object for downstream dependencies.
Object layers are instance-only because arbitrary Python objects are not
serializable graph config. Add them with ``CompositorBuilder.add_instance``.
serializable graph config. Add them with a custom ``LayerProvider`` factory
that creates a fresh object layer for each compositor run.
"""
value: ObjectT
@ -79,7 +80,8 @@ class ToolsLayer(PlainLayer[NoLayerDeps]):
"""Layer that contributes configured plain-callable tools.
Tool layers are instance-only because Python callables are live objects. Add
them with ``CompositorBuilder.add_instance``.
them with a custom ``LayerProvider`` factory that returns a fresh layer for
each compositor run.
"""
tool_entries: Sequence[Callable[..., Any]] = ()

View File

@ -1,130 +1,138 @@
import asyncio
from collections import OrderedDict
from dataclasses import dataclass
import pytest
from pydantic import BaseModel, ConfigDict, ValidationError
from typing_extensions import override
from agenton.compositor import Compositor, CompositorBuilder, CompositorSession, LayerNodeConfig, LayerRegistry
from agenton.layers import EmptyLayerConfig, LayerControl, LayerDeps, NoLayerDeps, PlainLayer, PlainPromptType, PlainToolType
import agenton.compositor as compositor_module
import agenton.layers as layers_module
from agenton.compositor import Compositor, LayerNode, LayerNodeConfig, LayerProvider
from agenton.layers import EmptyLayerConfig, Layer, LayerDeps, NoLayerDeps, PlainLayer
from agenton_collections.layers.plain import ObjectLayer, PromptLayer, PromptLayerConfig
def test_registry_infers_descriptor_and_rejects_duplicate_or_missing_type_id() -> None:
registry = LayerRegistry()
registry.register_layer(PromptLayer)
descriptor = registry.resolve("plain.prompt")
assert descriptor.layer_type is PromptLayer
assert descriptor.config_type is PromptLayer.config_type
try:
registry.register_layer(PromptLayer)
except ValueError as e:
assert str(e) == "Layer type id 'plain.prompt' is already registered."
else:
raise AssertionError("Expected ValueError.")
try:
registry.register_layer(InstanceOnlyLayer)
except ValueError as e:
assert "must declare a type_id" in str(e)
else:
raise AssertionError("Expected ValueError.")
try:
registry.register_layer(InstanceOnlyLayer, type_id=123) # pyright: ignore[reportArgumentType]
except TypeError as e:
assert str(e) == "Layer type id for 'InstanceOnlyLayer' must be a string."
else:
raise AssertionError("Expected TypeError.")
@dataclass(slots=True)
class InstanceOnlyLayer(PlainLayer[NoLayerDeps]):
pass
def test_builder_creates_config_layers_with_typed_validation() -> None:
registry = LayerRegistry()
registry.register_layer(PromptLayer)
@dataclass(slots=True)
class RequiredConstructorLayer(PlainLayer[NoLayerDeps]):
value: str
compositor = (
CompositorBuilder(registry)
.add_config_layer(
name="prompt",
type="plain.prompt",
config={"prefix": "hello", "user": "ask politely", "suffix": ["bye"]},
)
.build()
def test_layer_provider_from_layer_type_uses_declared_schema_and_type_id() -> None:
provider = LayerProvider.from_layer_type(PromptLayer)
assert provider.type_id == "plain.prompt"
assert provider.layer_type is PromptLayer
layer = provider.create_layer(PromptLayerConfig(prefix="hello", user="ask politely"))
assert isinstance(layer, PromptLayer)
assert layer.config == PromptLayerConfig(prefix="hello", user="ask politely")
assert layer.prefix_prompts == ["hello"]
with pytest.raises(TypeError, match="cannot be created from empty config"):
LayerProvider.from_layer_type(RequiredConstructorLayer).create_layer()
def test_compositor_from_config_uses_providers_and_enter_configs_by_node_name() -> None:
compositor = Compositor.from_config(
{"layers": [{"name": "prompt", "type": "plain.prompt"}]},
providers=[PromptLayer],
)
assert [prompt.value for prompt in compositor.prompts] == ["hello", "bye"]
assert [prompt.value for prompt in compositor.user_prompts] == ["ask politely"]
async def run() -> None:
async with compositor.enter(
configs={"prompt": {"prefix": "hello", "user": "ask politely", "suffix": ["bye"]}}
) as active_run:
assert [prompt.value for prompt in active_run.prompts] == ["hello", "bye"]
assert [prompt.value for prompt in active_run.user_prompts] == ["ask politely"]
try:
CompositorBuilder(registry).add_config_layer(
name="bad",
type="plain.prompt",
config={"unknown": "field"},
)
except ValidationError:
pass
else:
raise AssertionError("Expected ValidationError.")
asyncio.run(run())
with pytest.raises(ValidationError):
asyncio.run(_enter_once(compositor, configs={"prompt": {"unknown": "field"}}))
def test_layer_node_config_accepts_config_dto_and_serializes_fields() -> None:
registry = LayerRegistry()
registry.register_layer(PromptLayer)
def test_layer_node_config_has_no_runtime_state_or_layer_config() -> None:
node = LayerNodeConfig(
name="prompt",
type="plain.prompt",
config=PromptLayerConfig(prefix="hello", user="ask politely"),
deps={"source": "other"},
metadata={"label": "Prompt"},
)
dumped = node.model_dump(mode="json")
compositor = CompositorBuilder(registry).add_config({"layers": [dumped]}).build()
assert dumped["config"] == {"prefix": "hello", "user": "ask politely", "suffix": []}
assert [prompt.value for prompt in compositor.prompts] == ["hello"]
assert [prompt.value for prompt in compositor.user_prompts] == ["ask politely"]
assert node.model_dump(mode="json") == {
"name": "prompt",
"type": "plain.prompt",
"deps": {"source": "other"},
"metadata": {"label": "Prompt"},
}
assert "runtime_state" not in LayerNodeConfig.model_fields
assert "config" not in LayerNodeConfig.model_fields
def test_registry_factory_constructs_layer_with_injected_dependencies() -> None:
registry = LayerRegistry()
registry.register_layer(
PromptLayer,
factory=lambda config: PromptLayer(prefix=PromptLayerConfig.model_validate(config).prefix),
def test_node_providers_override_type_id_providers_for_serializable_graphs() -> None:
override_provider = LayerProvider.from_factory(
layer_type=PromptLayer,
create=lambda config: PromptLayer(prefix="override"),
)
compositor = Compositor.from_config(
{"layers": [{"name": "prompt", "type": "plain.prompt"}]},
providers=[PromptLayer],
node_providers={"prompt": override_provider},
)
compositor = CompositorBuilder(registry).add_config(
{"layers": [{"name": "prompt", "type": "plain.prompt", "config": {"prefix": "factory"}}]}
).build()
async def run() -> None:
async with compositor.enter(configs={"prompt": {"prefix": "ignored"}}) as active_run:
assert [prompt.value for prompt in active_run.prompts] == ["override"]
assert [prompt.value for prompt in compositor.prompts] == ["factory"]
asyncio.run(run())
def test_compositor_get_layer_returns_named_layer_and_validates_type() -> None:
layer = ObjectLayer("value")
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("obj", layer)]))
def test_from_config_rejects_missing_duplicate_and_unknown_providers() -> None:
with pytest.raises(KeyError, match="Layer type id 'missing' is not registered"):
Compositor.from_config({"layers": [{"name": "node", "type": "missing"}]}, providers=[])
assert compositor.get_layer("obj") is layer
assert compositor.get_layer("obj", ObjectLayer) is layer
with pytest.raises(ValueError, match="already registered"):
Compositor.from_config(
{"layers": [{"name": "prompt", "type": "plain.prompt"}]},
providers=[PromptLayer, PromptLayer],
)
try:
compositor.get_layer("missing")
except KeyError as e:
assert str(e) == '"Layer \'missing\' is not defined in this compositor."'
else:
raise AssertionError("Expected KeyError.")
with pytest.raises(ValueError, match="must declare a type_id"):
Compositor.from_config(
{"layers": [{"name": "node", "type": "instance.only"}]},
providers=[InstanceOnlyLayer],
)
try:
compositor.get_layer("obj", PromptLayer)
except TypeError as e:
assert str(e) == "Layer 'obj' must be PromptLayer, got ObjectLayer."
else:
raise AssertionError("Expected TypeError.")
with pytest.raises(ValueError, match="unknown layer node names: other"):
Compositor.from_config(
{"layers": [{"name": "prompt", "type": "plain.prompt"}]},
providers=[PromptLayer],
node_providers={"other": PromptLayer},
)
def test_compositor_run_get_layer_returns_named_layer_and_validates_type() -> None:
compositor = Compositor([LayerNode("obj", _object_provider("value"))])
async def run() -> None:
async with compositor.enter() as active_run:
layer = active_run.get_layer("obj", ObjectLayer)
assert active_run.get_layer("obj") is layer
assert layer.value == "value"
with pytest.raises(KeyError, match="Layer 'missing' is not defined"):
active_run.get_layer("missing")
with pytest.raises(TypeError, match="Layer 'obj' must be PromptLayer, got ObjectLayer"):
active_run.get_layer("obj", PromptLayer)
asyncio.run(run())
class ObjectConsumerDeps(LayerDeps):
@ -139,176 +147,137 @@ class ObjectConsumerLayer(PlainLayer[ObjectConsumerDeps]):
return [self.deps.obj.value]
def test_builder_mixes_config_and_instances_and_rejects_invalid_deps() -> None:
registry = LayerRegistry()
registry.register_layer(PromptLayer)
compositor = (
CompositorBuilder(registry)
.add_config({"layers": [{"name": "prompt", "type": "plain.prompt", "config": {"prefix": "cfg"}}]})
.add_instance(name="obj", layer=ObjectLayer("instance"))
.add_instance(name="consumer", layer=ObjectConsumerLayer(), deps={"obj": "obj"})
.build()
def test_python_native_construction_mixes_layer_classes_and_providers() -> None:
compositor = Compositor(
[
LayerNode("prompt", PromptLayer),
LayerNode("obj", _object_provider("instance")),
LayerNode("consumer", ObjectConsumerLayer, deps={"obj": "obj"}),
]
)
assert [prompt.value for prompt in compositor.prompts] == ["cfg", "instance"]
async def run() -> None:
async with compositor.enter(configs={"prompt": {"prefix": "cfg"}}) as active_run:
assert [prompt.value for prompt in active_run.prompts] == ["cfg", "instance"]
try:
CompositorBuilder(registry).add_instance(
name="consumer",
layer=ObjectConsumerLayer(),
deps={"missing_dep_key": "obj"},
).build()
except ValueError as e:
assert str(e) == "Layer 'consumer' declares unknown dependency keys: missing_dep_key."
else:
raise AssertionError("Expected ValueError.")
try:
CompositorBuilder(registry).add_instance(
name="consumer",
layer=ObjectConsumerLayer(),
deps={"obj": "missing_target"},
).build()
except ValueError as e:
assert str(e) == "Layer 'consumer' depends on undefined layer names: missing_target."
else:
raise AssertionError("Expected ValueError.")
asyncio.run(run())
class HandleState(BaseModel):
class SerializableState(BaseModel):
resource_id: str = ""
created: bool = False
resumed: bool = False
model_config = ConfigDict(extra="forbid", validate_assignment=True)
class HandleBox:
def __init__(self, value: str) -> None:
self.value = value
class HandleModels(BaseModel):
handle: HandleBox | None = None
model_config = ConfigDict(extra="forbid", validate_assignment=True, arbitrary_types_allowed=True)
@dataclass(slots=True)
class HandleLayer(PlainLayer[NoLayerDeps, EmptyLayerConfig, HandleState, HandleModels]):
created: int = 0
resumed: int = 0
class StateLayer(PlainLayer[NoLayerDeps, EmptyLayerConfig, SerializableState]):
created_hooks: int = 0
resumed_hooks: int = 0
@override
async def on_context_create(self, control: LayerControl[HandleState, HandleModels]) -> None:
self.created += 1
control.runtime_handles.handle = HandleBox(control.runtime_state.resource_id)
async def on_context_create(self) -> None:
self.created_hooks += 1
self.runtime_state.created = True
@override
async def on_context_resume(self, control: LayerControl[HandleState, HandleModels]) -> None:
self.resumed += 1
control.runtime_handles.handle = HandleBox(f"resumed:{control.runtime_state.resource_id}")
async def on_context_resume(self) -> None:
self.resumed_hooks += 1
self.runtime_state.resumed = True
def test_new_session_uses_layer_runtime_schemas() -> None:
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("handle", HandleLayer())])
)
session = compositor.new_session()
def test_snapshot_contains_runtime_state_only_not_config_deps_or_resources() -> None:
compositor = Compositor([LayerNode("state", StateLayer)])
assert isinstance(session.layer("handle").runtime_state, HandleState)
assert isinstance(session.layer("handle").runtime_handles, HandleModels)
async def get_snapshot() -> dict[str, object]:
async with compositor.enter() as active_run:
state_layer = active_run.get_layer("state", StateLayer)
state_layer.runtime_state.resource_id = "abc"
assert active_run.session_snapshot is not None
return active_run.session_snapshot.model_dump(mode="json")
def test_enter_rejects_bad_session_runtime_schemas_before_layer_hooks() -> None:
layer = HandleLayer()
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("handle", layer)]))
bad_session = CompositorSession(OrderedDict([("handle", LayerControl())]))
async def run() -> None:
async with compositor.enter(bad_session):
pass
try:
asyncio.run(run())
except TypeError as e:
assert str(e) == (
"CompositorSession layer 'handle' runtime_state must be HandleState, "
"got EmptyRuntimeState."
)
else:
raise AssertionError("Expected TypeError.")
assert layer.created == 0
def test_snapshot_rejects_active_sessions_and_excludes_handles() -> None:
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("handle", HandleLayer())])
)
session = compositor.session_from_snapshot(
{"layers": [{"name": "handle", "state": "new", "runtime_state": {"resource_id": "abc"}}]}
)
async def run() -> None:
async with compositor.enter(session):
try:
compositor.snapshot_session(session)
except RuntimeError as e:
assert str(e) == "Cannot snapshot active compositor session layers: handle."
else:
raise AssertionError("Expected RuntimeError.")
asyncio.run(run())
snapshot = compositor.snapshot_session(session)
dumped = snapshot.model_dump(mode="json")
dumped = asyncio.run(get_snapshot())
assert dumped == {
"schema_version": 1,
"layers": [{"name": "handle", "state": "closed", "runtime_state": {"resource_id": "abc"}}],
"layers": [
{
"name": "state",
"lifecycle_state": "closed",
"runtime_state": {"resource_id": "abc", "created": True, "resumed": False},
}
],
}
assert "_entry_stack" not in str(dumped)
assert "_owner" not in str(dumped)
def test_restore_validates_runtime_state_and_resume_rehydrates_handles() -> None:
layer = HandleLayer()
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("handle", layer)]))
def test_hydrate_validates_runtime_state_and_resume_mutates_layer_self() -> None:
compositor = Compositor([LayerNode("state", StateLayer)])
try:
compositor.session_from_snapshot(
{"layers": [{"name": "handle", "state": "suspended", "runtime_state": {"wrong": "field"}}]}
)
except ValidationError:
pass
else:
raise AssertionError("Expected ValidationError.")
bad_snapshot = {"layers": [{"name": "state", "lifecycle_state": "suspended", "runtime_state": {"wrong": "field"}}]}
with pytest.raises(ValidationError):
asyncio.run(_enter_once(compositor, session_snapshot=bad_snapshot))
restored = compositor.session_from_snapshot(
{"layers": [{"name": "handle", "state": "suspended", "runtime_state": {"resource_id": "abc"}}]}
)
good_snapshot = {
"layers": [
{
"name": "state",
"lifecycle_state": "suspended",
"runtime_state": {"resource_id": "abc", "created": True, "resumed": False},
}
]
}
async def run() -> None:
async with compositor.enter(restored):
control = restored.layer("handle")
assert isinstance(control.runtime_handles, HandleModels)
assert control.runtime_handles.handle is not None
assert control.runtime_handles.handle.value == "resumed:abc"
async with compositor.enter(session_snapshot=good_snapshot) as active_run:
layer = active_run.get_layer("state", StateLayer)
assert layer.runtime_state.resource_id == "abc"
assert layer.runtime_state.resumed is True
assert layer.resumed_hooks == 1
asyncio.run(run())
assert layer.resumed == 1
def test_hydrate_rejects_mismatched_snapshot_layer_names() -> None:
compositor = Compositor([LayerNode("state", StateLayer)])
def test_session_from_snapshot_rejects_active_layer_state() -> None:
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("handle", HandleLayer())])
)
try:
compositor.session_from_snapshot(
{"layers": [{"name": "handle", "state": "active", "runtime_state": {"resource_id": "abc"}}]}
with pytest.raises(ValueError, match=r"Expected \[state\], got \[other\]"):
asyncio.run(
_enter_once(
compositor,
session_snapshot={"layers": [{"name": "other", "lifecycle_state": "new", "runtime_state": {}}]},
)
)
except ValueError as e:
assert str(e) == "Cannot restore active compositor session layers from snapshot: handle."
else:
raise AssertionError("Expected ValueError.")
def test_removed_lifecycle_and_resource_apis_are_not_public_exports() -> None:
assert not hasattr(compositor_module, "CompositorBuilder")
assert not hasattr(compositor_module, "LayerRegistry")
assert not hasattr(compositor_module, "LayerDescriptor")
assert not hasattr(layers_module, "LayerControl")
assert not hasattr(layers_module, "EmptyRuntimeHandles")
assert not hasattr(Layer, "enter")
assert not hasattr(Layer, "hydrate_session_state")
assert not hasattr(Layer, "suspend_on_exit")
assert not hasattr(Layer, "delete_on_exit")
assert not hasattr(Layer, "runtime_handles")
assert not hasattr(Layer, "require_control")
assert not hasattr(Layer, "control_for")
assert not hasattr(Layer, "enter_async_resource")
assert not hasattr(Layer, "add_async_cleanup")
def _object_provider(value: str) -> LayerProvider[ObjectLayer[str]]:
return LayerProvider.from_factory(layer_type=ObjectLayer, create=lambda config: ObjectLayer(value))
async def _enter_once(
compositor: Compositor,
*,
configs: dict[str, object] | None = None,
session_snapshot: object | None = None,
) -> None:
async with compositor.enter(
configs=configs, # pyright: ignore[reportArgumentType]
session_snapshot=session_snapshot, # pyright: ignore[reportArgumentType]
):
pass

View File

@ -1,222 +0,0 @@
import asyncio
from collections import OrderedDict
from dataclasses import dataclass
import pytest
from typing_extensions import override
from agenton.compositor import Compositor, CompositorSession
from agenton.layers import LayerControl, LayerDeps, PlainLayer, PlainPromptType, PlainToolType
from agenton_collections.layers.plain import ObjectLayer
class RenamedObjectDeps(LayerDeps):
renamed: ObjectLayer[str] # pyright: ignore[reportUninitializedInstanceVariable]
@dataclass(slots=True)
class RenamedConsumerLayer(PlainLayer[RenamedObjectDeps]):
@property
@override
def prefix_prompts(self) -> list[str]:
return [self.deps.renamed.value]
class SameNameObjectDeps(LayerDeps):
same: ObjectLayer[str] # pyright: ignore[reportUninitializedInstanceVariable]
@dataclass(slots=True)
class SameNameConsumerLayer(PlainLayer[SameNameObjectDeps]):
pass
class DoubleObjectDeps(LayerDeps):
first: ObjectLayer[str] # pyright: ignore[reportUninitializedInstanceVariable]
second: ObjectLayer[str] # pyright: ignore[reportUninitializedInstanceVariable]
@dataclass(slots=True)
class DoubleConsumerLayer(PlainLayer[DoubleObjectDeps]):
pass
class OptionalObjectDeps(LayerDeps):
maybe: ObjectLayer[str] | None # pyright: ignore[reportUninitializedInstanceVariable]
@dataclass(slots=True)
class OptionalConsumerLayer(PlainLayer[OptionalObjectDeps]):
pass
def test_control_for_layer_resolves_unique_explicit_dependency_rename() -> None:
target = ObjectLayer("target")
consumer = RenamedConsumerLayer()
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("actual", target), ("consumer", consumer)]),
deps_name_mapping={"consumer": {"renamed": "actual"}},
)
session = compositor.new_session()
resolved = session.layer("consumer").control_for(target)
assert resolved is session.layer("actual")
assert consumer.prefix_prompts == ["target"]
def test_control_for_layer_resolves_unique_implicit_same_name_dependency() -> None:
target = ObjectLayer("target")
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("same", target), ("consumer", SameNameConsumerLayer())]),
)
session = compositor.new_session()
assert session.layer("consumer").control_for(target) is session.layer("same")
def test_control_for_layer_raises_when_no_dependency_points_to_layer() -> None:
target = ObjectLayer("target")
unrelated = ObjectLayer("unrelated")
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("target", target), ("unrelated", unrelated), ("consumer", RenamedConsumerLayer())]),
deps_name_mapping={"consumer": {"renamed": "target"}},
)
session = compositor.new_session()
with pytest.raises(KeyError, match="no dependency target.*provided ObjectLayer instance"):
_ = session.layer("consumer").control_for(unrelated)
def test_control_for_layer_raises_when_multiple_dependency_fields_match() -> None:
target = ObjectLayer("target")
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("target", target), ("consumer", DoubleConsumerLayer())]),
deps_name_mapping={"consumer": {"first": "target", "second": "target"}},
)
session = compositor.new_session()
with pytest.raises(ValueError, match="multiple dependency fields.*Pass dep_name explicitly"):
_ = session.layer("consumer").control_for(target)
def test_control_for_explicit_dep_name_disambiguates_multiple_deps() -> None:
target = ObjectLayer("target")
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("target", target), ("consumer", DoubleConsumerLayer())]),
deps_name_mapping={"consumer": {"first": "target", "second": "target"}},
)
session = compositor.new_session()
assert session.layer("consumer").control_for("second", target) is session.layer("target")
def test_control_for_optional_missing_dependency_raises() -> None:
target = ObjectLayer("target")
consumer = OptionalConsumerLayer()
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("consumer", consumer)]),
)
session = compositor.new_session()
assert consumer.deps.maybe is None
with pytest.raises(KeyError, match="dependency 'maybe' is not bound"):
_ = session.layer("consumer").control_for("maybe", target)
def test_restored_session_rebinds_owner_links_for_control_for() -> None:
target = ObjectLayer("target")
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("actual", target), ("consumer", RenamedConsumerLayer())]),
deps_name_mapping={"consumer": {"renamed": "actual"}},
)
session = compositor.new_session()
async def suspend_session() -> None:
async with compositor.enter(session) as active_session:
active_session.suspend_on_exit()
asyncio.run(suspend_session())
restored = compositor.session_from_snapshot(compositor.snapshot_session(session))
assert restored.layer("consumer").control_for(target) is restored.layer("actual")
def test_enter_rebinds_external_session_owner_links_for_control_for() -> None:
target = ObjectLayer("target")
consumer = RenamedConsumerLayer()
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("actual", target), ("consumer", consumer)]),
deps_name_mapping={"consumer": {"renamed": "actual"}},
)
external_session = CompositorSession(
OrderedDict([("actual", target.new_control()), ("consumer", consumer.new_control())])
)
async def enter_session() -> None:
async with compositor.enter(external_session) as active_session:
assert active_session.layer("consumer").control_for(target) is active_session.layer("actual")
asyncio.run(enter_session())
def test_failed_enter_does_not_rebind_active_session_owner_links() -> None:
first_target = ObjectLayer("first")
second_target = ObjectLayer("second")
first_compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("actual", first_target), ("consumer", RenamedConsumerLayer())]),
deps_name_mapping={"consumer": {"renamed": "actual"}},
)
second_compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("actual", second_target), ("consumer", RenamedConsumerLayer())]),
deps_name_mapping={"consumer": {"renamed": "actual"}},
)
session = first_compositor.new_session()
async def enter_conflicting_compositor() -> None:
async with first_compositor.enter(session) as active_session:
with pytest.raises(RuntimeError, match="already active"):
async with second_compositor.enter(active_session):
raise AssertionError("Expected active-session rejection before entering layers.")
assert active_session.layer("consumer").control_for(first_target) is active_session.layer("actual")
asyncio.run(enter_conflicting_compositor())
def test_control_for_uses_owner_resolved_targets_not_graph_wide_object_identity() -> None:
shared_target = ObjectLayer("shared")
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict(
[
("first-id", shared_target),
("second-id", shared_target),
("consumer", RenamedConsumerLayer()),
]
),
deps_name_mapping={"consumer": {"renamed": "second-id"}},
)
session = compositor.new_session()
resolved = session.layer("consumer").control_for(shared_target)
assert resolved is session.layer("second-id")
assert resolved is not session.layer("first-id")
def test_control_for_explicit_dep_name_rejects_wrong_layer_instance() -> None:
target = ObjectLayer("target")
wrong = ObjectLayer("wrong")
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("target", target), ("wrong", wrong), ("consumer", RenamedConsumerLayer())]),
deps_name_mapping={"consumer": {"renamed": "target"}},
)
session = compositor.new_session()
with pytest.raises(TypeError, match="dependency 'renamed'.*not the provided ObjectLayer instance"):
_ = session.layer("consumer").control_for("renamed", wrong)
def test_control_for_unowned_control_raises_clear_error() -> None:
with pytest.raises(RuntimeError, match="not attached to a compositor session"):
_ = LayerControl().control_for(ObjectLayer("target"))

View File

@ -0,0 +1,128 @@
import asyncio
from dataclasses import dataclass
import pytest
from typing_extensions import override
from agenton.compositor import Compositor, LayerNode, LayerProvider
from agenton.layers import EmptyLayerConfig, LayerDeps, PlainLayer
from agenton_collections.layers.plain import ObjectLayer
class RenamedObjectDeps(LayerDeps):
renamed: ObjectLayer[str] # pyright: ignore[reportUninitializedInstanceVariable]
@dataclass(slots=True)
class RenamedConsumerLayer(PlainLayer[RenamedObjectDeps]):
@property
@override
def prefix_prompts(self) -> list[str]:
return [self.deps.renamed.value]
class SameNameObjectDeps(LayerDeps):
same: ObjectLayer[str] # pyright: ignore[reportUninitializedInstanceVariable]
@dataclass(slots=True)
class SameNameConsumerLayer(PlainLayer[SameNameObjectDeps]):
@property
@override
def prefix_prompts(self) -> list[str]:
return [self.deps.same.value]
class OptionalObjectDeps(LayerDeps):
maybe: ObjectLayer[str] | None # pyright: ignore[reportUninitializedInstanceVariable]
@dataclass(slots=True)
class OptionalConsumerLayer(PlainLayer[OptionalObjectDeps]):
pass
def _object_provider(value: str) -> LayerProvider[ObjectLayer[str]]:
return LayerProvider.from_factory(
layer_type=ObjectLayer,
create=lambda config: ObjectLayer(value),
)
def test_direct_deps_access_uses_explicit_dependency_rename() -> None:
compositor = Compositor(
[
LayerNode("actual", _object_provider("target")),
LayerNode("consumer", RenamedConsumerLayer, deps={"renamed": "actual"}),
]
)
async def run() -> None:
async with compositor.enter() as active_run:
target = active_run.get_layer("actual", ObjectLayer)
consumer = active_run.get_layer("consumer", RenamedConsumerLayer)
assert consumer.deps.renamed is target
assert [prompt.value for prompt in active_run.prompts] == ["target"]
asyncio.run(run())
def test_direct_deps_access_uses_explicit_same_name_dependency() -> None:
compositor = Compositor(
[
LayerNode("same", _object_provider("target")),
LayerNode("consumer", SameNameConsumerLayer, deps={"same": "same"}),
]
)
async def run() -> None:
async with compositor.enter() as active_run:
target = active_run.get_layer("same", ObjectLayer)
consumer = active_run.get_layer("consumer", SameNameConsumerLayer)
assert consumer.deps.same is target
assert [prompt.value for prompt in active_run.prompts] == ["target"]
asyncio.run(run())
def test_optional_missing_dependency_is_bound_to_none() -> None:
compositor = Compositor([LayerNode("consumer", OptionalConsumerLayer)])
async def run() -> None:
async with compositor.enter() as active_run:
consumer = active_run.get_layer("consumer", OptionalConsumerLayer)
assert consumer.deps.maybe is None
asyncio.run(run())
def test_missing_required_dependency_is_rejected_before_hooks() -> None:
compositor = Compositor([LayerNode("consumer", SameNameConsumerLayer)])
with pytest.raises(ValueError, match="Dependency 'same' is required"):
asyncio.run(_enter_once(compositor))
def test_unknown_dependency_mapping_is_rejected_for_compositor_construction() -> None:
with pytest.raises(ValueError, match="unknown dependency keys: missing"):
Compositor([LayerNode("consumer", RenamedConsumerLayer, deps={"missing": "target"})])
def test_undefined_dependency_target_is_rejected_for_compositor_construction() -> None:
with pytest.raises(ValueError, match="depends on undefined layer names: missing_target"):
Compositor([LayerNode("consumer", RenamedConsumerLayer, deps={"renamed": "missing_target"})])
def test_duplicate_layer_node_name_is_rejected() -> None:
with pytest.raises(ValueError, match="Duplicate layer name 'same'"):
Compositor(
[
LayerNode("same", _object_provider("first")),
LayerNode("same", _object_provider("second")),
]
)
async def _enter_once(compositor: Compositor) -> None:
async with compositor.enter(configs={"consumer": EmptyLayerConfig()}):
pass

View File

@ -1,254 +1,266 @@
import asyncio
from collections import OrderedDict
from collections.abc import Iterator
from dataclasses import dataclass, field
from itertools import count
from typing import cast
import pytest
from pydantic import BaseModel, ConfigDict
from pydantic import BaseModel, ConfigDict, ValidationError
from typing_extensions import override
from agenton.compositor import Compositor, CompositorSession
from agenton.compositor import Compositor, CompositorSessionSnapshot, LayerNode, LayerProvider
from agenton.layers import (
ExitIntent,
EmptyLayerConfig,
EmptyRuntimeHandles,
EmptyRuntimeState,
LayerControl,
ExitIntent,
LayerConfig,
LifecycleState,
NoLayerDeps,
PlainLayer,
PlainPromptType,
PlainToolType,
)
@dataclass(slots=True)
class TraceLayer(PlainLayer[NoLayerDeps]):
"""Layer that records lifecycle events observable to tests."""
"""Layer that records no-arg lifecycle events observable to tests."""
events: list[str] = field(default_factory=list)
@override
async def on_context_create(self, control: LayerControl) -> None:
async def on_context_create(self) -> None:
self.events.append("create")
@override
async def on_context_suspend(self, control: LayerControl) -> None:
async def on_context_suspend(self) -> None:
self.events.append("suspend")
@override
async def on_context_resume(self, control: LayerControl) -> None:
async def on_context_resume(self) -> None:
self.events.append("resume")
@override
async def on_context_delete(self, control: LayerControl) -> None:
async def on_context_delete(self) -> None:
self.events.append("delete")
def _compositor(*layer_names: str) -> tuple[Compositor[PlainPromptType, PlainToolType], dict[str, TraceLayer]]:
layers = {layer_name: TraceLayer() for layer_name in layer_names}
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict(layers.items()))
return compositor, layers
def _compositor(*layer_names: str) -> Compositor:
return Compositor([LayerNode(layer_name, TraceLayer) for layer_name in layer_names])
def test_compositor_session_suspends_resumes_and_deletes_all_layers() -> None:
compositor, layers = _compositor("first", "second")
session = compositor.new_session()
def test_same_compositor_enters_multiple_times_with_fresh_layers_and_snapshot_resume() -> None:
compositor = _compositor("first", "second")
runs = []
async def run() -> None:
async with compositor.enter(session) as active_session:
assert active_session is session
assert list(active_session.layer_controls) == ["first", "second"]
active_session.suspend_on_exit()
assert active_session.layer("first").exit_intent is ExitIntent.SUSPEND
async with compositor.enter() as first_run:
assert [slot.lifecycle_state for slot in first_run.slots.values()] == [
LifecycleState.ACTIVE,
LifecycleState.ACTIVE,
]
first_run.suspend_on_exit()
assert [slot.exit_intent for slot in first_run.slots.values()] == [
ExitIntent.SUSPEND,
ExitIntent.SUSPEND,
]
runs.append(first_run)
assert session.layer("first").state is LifecycleState.SUSPENDED
async with compositor.enter(session):
pass
assert first_run.session_snapshot is not None
async with compositor.enter(session_snapshot=first_run.session_snapshot) as resumed_run:
assert resumed_run.get_layer("first", TraceLayer).events == ["resume"]
assert resumed_run.get_layer("second", TraceLayer).events == ["resume"]
runs.append(resumed_run)
asyncio.run(run())
assert layers["first"].events == ["create", "suspend", "resume", "delete"]
assert layers["second"].events == ["create", "suspend", "resume", "delete"]
assert session.layer("first").state is LifecycleState.CLOSED
first_layer = runs[0].get_layer("first", TraceLayer)
resumed_layer = runs[1].get_layer("first", TraceLayer)
assert first_layer is not resumed_layer
assert first_layer.events == ["create", "suspend"]
assert resumed_layer.events == ["resume", "delete"]
assert runs[1].session_snapshot is not None
assert [layer.lifecycle_state for layer in runs[1].session_snapshot.layers] == [
LifecycleState.CLOSED,
LifecycleState.CLOSED,
]
def test_compositor_enter_without_session_uses_fresh_lifecycle_each_time() -> None:
compositor, layers = _compositor("trace")
def test_concurrent_enters_do_not_share_layer_instances() -> None:
compositor = _compositor("trace")
async def enter_once() -> tuple[int, list[str]]:
async with compositor.enter() as run:
layer = run.get_layer("trace", TraceLayer)
await asyncio.sleep(0)
return id(layer), layer.events
async def run_concurrently() -> list[tuple[int, list[str]]]:
return list(await asyncio.gather(enter_once(), enter_once()))
results = asyncio.run(run_concurrently())
assert results[0][0] != results[1][0]
assert results[0][1] == ["create", "delete"]
assert results[1][1] == ["create", "delete"]
class ConfiguredLayerConfig(LayerConfig):
value: str
model_config = ConfigDict(extra="forbid")
@dataclass(slots=True)
class ConfiguredLayer(PlainLayer[NoLayerDeps, ConfiguredLayerConfig]):
type_id = "test.configured"
value: str
hooks: list[str] = field(default_factory=list)
@classmethod
@override
def from_config(cls, config: ConfiguredLayerConfig) -> "ConfiguredLayer":
return cls(value=config.value)
@override
async def on_context_create(self) -> None:
self.hooks.append(f"create:{self.config.value}")
def test_custom_factory_is_called_each_enter_with_typed_config() -> None:
calls: list[str] = []
def create_layer(config: ConfiguredLayerConfig) -> ConfiguredLayer:
calls.append(config.value)
return ConfiguredLayer(value=f"factory:{config.value}")
compositor = Compositor(
[LayerNode("configured", LayerProvider.from_factory(layer_type=ConfiguredLayer, create=create_layer))]
)
async def run() -> None:
async with compositor.enter() as session:
session.suspend_on_exit()
async with compositor.enter(configs={"configured": {"value": "one"}}) as first_run:
first_layer = first_run.get_layer("configured", ConfiguredLayer)
assert first_layer.value == "factory:one"
assert first_layer.config.value == "one"
async with compositor.enter(configs={"configured": ConfiguredLayerConfig(value="two")}) as second_run:
second_layer = second_run.get_layer("configured", ConfiguredLayer)
assert second_layer.value == "factory:two"
assert second_layer.config.value == "two"
assert second_layer is not first_layer
asyncio.run(run())
assert calls == ["one", "two"]
def test_provider_rejects_reused_layer_instance_before_hooks_run() -> None:
shared_layer = TraceLayer()
compositor = Compositor(
[
LayerNode(
"trace",
LayerProvider.from_factory(layer_type=TraceLayer, create=lambda config: shared_layer),
)
]
)
async def run() -> None:
async with compositor.enter():
pass
with pytest.raises(ValueError, match="fresh layer instance"):
async with compositor.enter():
pass
asyncio.run(run())
assert layers["trace"].events == ["create", "suspend", "create", "delete"]
assert shared_layer.events == ["create", "delete"]
def test_compositor_enter_rejects_session_with_mismatched_layer_names() -> None:
compositor, _layers = _compositor("trace")
session = CompositorSession(["other"])
def test_configs_are_validated_by_node_name_before_factory_call() -> None:
calls: list[str] = []
async def run() -> None:
async with compositor.enter(session):
pass
def create_layer(config: ConfiguredLayerConfig) -> ConfiguredLayer:
calls.append(config.value)
return ConfiguredLayer(value=config.value)
try:
asyncio.run(run())
except ValueError as e:
assert str(e) == (
"CompositorSession layer names must match compositor layers in order. "
"Expected [trace], got [other]."
compositor = Compositor(
[LayerNode("configured", LayerProvider.from_factory(layer_type=ConfiguredLayer, create=create_layer))]
)
with pytest.raises(ValueError, match="unknown layer node names: missing"):
asyncio.run(_enter_once(compositor, configs={"missing": {}}))
with pytest.raises(ValidationError):
asyncio.run(_enter_once(compositor, configs={"configured": {"unknown": "field"}}))
assert calls == []
def test_all_node_configs_are_validated_before_any_factory_runs() -> None:
calls: list[str] = []
def create_layer(config: ConfiguredLayerConfig) -> ConfiguredLayer:
calls.append(config.value)
return ConfiguredLayer(value=config.value)
provider = LayerProvider.from_factory(layer_type=ConfiguredLayer, create=create_layer)
compositor = Compositor([LayerNode("first", provider), LayerNode("second", provider)])
with pytest.raises(ValidationError):
asyncio.run(
_enter_once(
compositor,
configs={"first": {"value": "valid"}, "second": {"unknown": "field"}},
)
)
else:
raise AssertionError("Expected ValueError.")
assert calls == []
def test_compositor_enter_rejects_same_active_session_nested() -> None:
compositor, _layers = _compositor("trace")
session = compositor.new_session()
def test_existing_config_model_instances_are_revalidated_before_factory_runs() -> None:
calls: list[str] = []
async def run() -> None:
async with compositor.enter(session):
async with compositor.enter(session):
pass
def create_layer(config: ConfiguredLayerConfig) -> ConfiguredLayer:
calls.append(config.value)
return ConfiguredLayer(value=config.value)
try:
asyncio.run(run())
except RuntimeError as e:
assert str(e) == "LayerControl is already active; duplicate or nested enter is not allowed."
else:
raise AssertionError("Expected RuntimeError.")
compositor = Compositor(
[LayerNode("configured", LayerProvider.from_factory(layer_type=ConfiguredLayer, create=create_layer))]
)
config = ConfiguredLayerConfig(value="valid")
config.value = 123 # pyright: ignore[reportAttributeAccessIssue]
with pytest.raises(ValidationError):
asyncio.run(_enter_once(compositor, configs={"configured": config}))
assert calls == []
def test_compositor_enter_rejects_closed_session() -> None:
compositor, _layers = _compositor("trace")
session = compositor.new_session()
def test_existing_snapshot_model_instances_are_revalidated_before_factory_runs() -> None:
calls = 0
async def run() -> None:
async with compositor.enter(session):
pass
def create_layer(config: EmptyLayerConfig) -> TraceLayer:
nonlocal calls
calls += 1
return TraceLayer()
async with compositor.enter(session):
pass
compositor = Compositor([LayerNode("trace", LayerProvider.from_factory(layer_type=TraceLayer, create=create_layer))])
snapshot = CompositorSessionSnapshot.model_validate(
{"layers": [{"name": "trace", "lifecycle_state": "suspended", "runtime_state": {}}]}
)
snapshot.layers[0].lifecycle_state = LifecycleState.ACTIVE
try:
asyncio.run(run())
except RuntimeError as e:
assert str(e) == "LayerControl is closed; create a new compositor session before entering again."
else:
raise AssertionError("Expected RuntimeError.")
with pytest.raises(ValidationError, match="ACTIVE is internal-only"):
asyncio.run(_enter_once(compositor, session_snapshot=snapshot))
def test_per_layer_suspend_on_exit_only_resumes_that_layer() -> None:
compositor, layers = _compositor("first", "second")
session = compositor.new_session()
async def run() -> None:
async with compositor.enter(session):
session.layer("first").suspend_on_exit()
assert session.layer("first").state is LifecycleState.SUSPENDED
assert session.layer("second").state is LifecycleState.CLOSED
async with compositor.enter(session):
pass
try:
asyncio.run(run())
except RuntimeError as e:
assert str(e) == "LayerControl is closed; create a new compositor session before entering again."
else:
raise AssertionError("Expected RuntimeError.")
assert layers["first"].events == ["create", "suspend"]
assert layers["second"].events == ["create", "delete"]
@dataclass(slots=True)
class FailingCreateLayer(PlainLayer[NoLayerDeps]):
attempts: int = 0
@override
async def on_context_create(self, control: LayerControl) -> None:
self.attempts += 1
if self.attempts == 1:
raise RuntimeError("create failed")
def test_failed_create_keeps_control_reusable_as_new() -> None:
layer = FailingCreateLayer()
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("trace", layer)]))
session = compositor.new_session()
async def fail_then_retry() -> None:
try:
async with compositor.enter(session):
pass
except RuntimeError as e:
assert str(e) == "create failed"
else:
raise AssertionError("Expected RuntimeError.")
assert session.layer("trace").state is LifecycleState.NEW
async with compositor.enter(session):
pass
asyncio.run(fail_then_retry())
assert session.layer("trace").state is LifecycleState.CLOSED
assert layer.attempts == 2
@dataclass(slots=True)
class FailingResumeLayer(PlainLayer[NoLayerDeps]):
resumed: bool = False
@override
async def on_context_resume(self, control: LayerControl) -> None:
if not self.resumed:
self.resumed = True
raise RuntimeError("resume failed")
def test_failed_resume_keeps_control_reusable_as_suspended() -> None:
layer = FailingResumeLayer()
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("trace", layer)]))
session = compositor.new_session()
async def suspend_fail_then_retry() -> None:
async with compositor.enter(session) as active_session:
active_session.suspend_on_exit()
try:
async with compositor.enter(session):
pass
except RuntimeError as e:
assert str(e) == "resume failed"
else:
raise AssertionError("Expected RuntimeError.")
assert session.layer("trace").state is LifecycleState.SUSPENDED
async with compositor.enter(session):
pass
asyncio.run(suspend_fail_then_retry())
assert session.layer("trace").state is LifecycleState.CLOSED
assert calls == 0
class RuntimeState(BaseModel):
runtime_id: int | None = None
resumed_runtime_id: int | None = None
deleted_runtime_id: int | None = None
body_value: str | None = None
model_config = ConfigDict(extra="forbid", validate_assignment=True)
@ -258,302 +270,126 @@ class RuntimeStateLayer(PlainLayer[NoLayerDeps, EmptyLayerConfig, RuntimeState])
next_id: Iterator[int] = field(default_factory=lambda: count(1))
@override
async def on_context_create(self, control: LayerControl[RuntimeState, EmptyRuntimeHandles]) -> None:
runtime_id = next(self.next_id)
control.runtime_state.runtime_id = runtime_id
async def on_context_create(self) -> None:
self.runtime_state.runtime_id = next(self.next_id)
@override
async def on_context_resume(self, control: LayerControl[RuntimeState, EmptyRuntimeHandles]) -> None:
control.runtime_state.resumed_runtime_id = control.runtime_state.runtime_id
async def on_context_resume(self) -> None:
self.runtime_state.resumed_runtime_id = self.runtime_state.runtime_id
@override
async def on_context_delete(self, control: LayerControl[RuntimeState, EmptyRuntimeHandles]) -> None:
control.runtime_state.deleted_runtime_id = control.runtime_state.runtime_id
async def on_context_delete(self) -> None:
self.runtime_state.deleted_runtime_id = self.runtime_state.runtime_id
def test_runtime_state_is_per_session_and_survives_suspend_resume_delete() -> None:
layer = RuntimeStateLayer()
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("trace", layer)]))
first_session = compositor.new_session()
second_session = compositor.new_session()
def test_snapshot_hydrates_runtime_state_and_exit_snapshots_from_layer_self() -> None:
compositor = Compositor([LayerNode("state", RuntimeStateLayer)])
async def run() -> None:
async with compositor.enter(first_session) as active_session:
active_session.suspend_on_exit()
async def create_suspend_resume_delete() -> tuple[CompositorSessionSnapshot, CompositorSessionSnapshot]:
async with compositor.enter() as first_run:
first_run.suspend_on_exit()
assert first_run.session_snapshot is not None
async with compositor.enter(second_session):
pass
async with compositor.enter(session_snapshot=first_run.session_snapshot) as resumed_run:
resumed_layer = resumed_run.get_layer("state", RuntimeStateLayer)
assert isinstance(resumed_layer.runtime_state, RuntimeState)
assert resumed_layer.runtime_state.runtime_id == 1
assert resumed_layer.runtime_state.resumed_runtime_id == 1
resumed_layer.runtime_state.body_value = "mutated on self"
assert resumed_run.session_snapshot is not None
return first_run.session_snapshot, resumed_run.session_snapshot
async with compositor.enter(first_session):
pass
suspended_snapshot, closed_snapshot = asyncio.run(create_suspend_resume_delete())
asyncio.run(run())
assert first_session.layer("trace").runtime_state.model_dump(exclude_none=True) == {
"runtime_id": 1,
"resumed_runtime_id": 1,
"deleted_runtime_id": 1,
assert suspended_snapshot.model_dump(mode="json") == {
"schema_version": 1,
"layers": [
{
"name": "state",
"lifecycle_state": "suspended",
"runtime_state": {
"runtime_id": 1,
"resumed_runtime_id": None,
"deleted_runtime_id": None,
"body_value": None,
},
}
],
}
assert second_session.layer("trace").runtime_state.model_dump(exclude_none=True) == {
"runtime_id": 2,
"deleted_runtime_id": 2,
assert closed_snapshot.model_dump(mode="json") == {
"schema_version": 1,
"layers": [
{
"name": "state",
"lifecycle_state": "closed",
"runtime_state": {
"runtime_id": 1,
"resumed_runtime_id": 1,
"deleted_runtime_id": 1,
"body_value": "mutated on self",
},
}
],
}
assert not hasattr(layer, "runtime_id")
class ResourceHandles(BaseModel):
resource: "RecordingResource | None" = None
model_config = ConfigDict(extra="forbid", validate_assignment=True, arbitrary_types_allowed=True)
class RecordingResource:
events: list[str]
label: str
closed: bool
def __init__(self, events: list[str], label: str) -> None:
self.events = events
self.label = label
self.closed = False
async def __aenter__(self) -> "RecordingResource":
self.events.append(f"enter:{self.label}")
return self
async def __aexit__(self, exc_type: object, exc: object, traceback: object) -> None:
self.closed = True
self.events.append(f"exit:{self.label}")
@dataclass(slots=True)
class ResourceLayer(PlainLayer[NoLayerDeps, EmptyLayerConfig, EmptyRuntimeState, ResourceHandles]):
events: list[str] = field(default_factory=list)
resources: list[RecordingResource] = field(default_factory=list)
fail_create: bool = False
fail_resume: bool = False
fail_suspend: bool = False
fail_delete: bool = False
@override
async def on_context_create(self, control: LayerControl[EmptyRuntimeState, ResourceHandles]) -> None:
await self._open_resource(control, "create")
self.events.append("create")
if self.fail_create:
raise RuntimeError("create failed")
@override
async def on_context_resume(self, control: LayerControl[EmptyRuntimeState, ResourceHandles]) -> None:
await self._open_resource(control, "resume")
self.events.append("resume")
if self.fail_resume:
raise RuntimeError("resume failed")
@override
async def on_context_suspend(self, control: LayerControl[EmptyRuntimeState, ResourceHandles]) -> None:
self.events.append("suspend")
control.runtime_handles.resource = None
if self.fail_suspend:
raise RuntimeError("suspend failed")
@override
async def on_context_delete(self, control: LayerControl[EmptyRuntimeState, ResourceHandles]) -> None:
self.events.append("delete")
control.runtime_handles.resource = None
if self.fail_delete:
raise RuntimeError("delete failed")
async def _open_resource(
self,
control: LayerControl[EmptyRuntimeState, ResourceHandles],
label: str,
) -> None:
resource = await control.enter_async_resource(RecordingResource(self.events, label))
async def cleanup() -> None:
self.events.append(f"cleanup:{label}")
control.add_async_cleanup(cleanup)
control.runtime_handles.resource = resource
self.resources.append(resource)
def _resource_control(control: LayerControl) -> LayerControl[EmptyRuntimeState, ResourceHandles]:
return cast(LayerControl[EmptyRuntimeState, ResourceHandles], control)
def test_entry_resource_stack_closes_resources_on_suspend_and_delete() -> None:
layer = ResourceLayer()
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("resource", layer)]))
session = compositor.new_session()
def test_run_snapshot_rejects_active_layers() -> None:
compositor = _compositor("trace")
async def run() -> None:
async with compositor.enter(session) as active_session:
control = _resource_control(active_session.layer("resource"))
assert control.runtime_handles.resource is layer.resources[0]
assert layer.resources[0].closed is False
active_session.suspend_on_exit()
assert _resource_control(session.layer("resource")).runtime_handles.resource is None
assert layer.resources[0].closed is True
async with compositor.enter(session):
control = _resource_control(session.layer("resource"))
assert control.runtime_handles.resource is layer.resources[1]
assert layer.resources[1].closed is False
assert _resource_control(session.layer("resource")).runtime_handles.resource is None
assert layer.resources[1].closed is True
asyncio.run(run())
assert layer.events == [
"enter:create",
"create",
"suspend",
"cleanup:create",
"exit:create",
"enter:resume",
"resume",
"delete",
"cleanup:resume",
"exit:resume",
]
def test_entry_resource_stack_closes_resources_when_create_or_resume_raises() -> None:
async def fail_create() -> None:
layer = ResourceLayer(fail_create=True)
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("resource", layer)]))
session = compositor.new_session()
with pytest.raises(RuntimeError, match="create failed"):
async with compositor.enter(session):
pass
assert session.layer("resource").state is LifecycleState.NEW
assert layer.resources[0].closed is True
assert session.layer("resource")._entry_stack is None
async def fail_resume() -> None:
layer = ResourceLayer()
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("resource", layer)]))
session = compositor.new_session()
async with compositor.enter(session) as active_session:
active_session.suspend_on_exit()
layer.fail_resume = True
with pytest.raises(RuntimeError, match="resume failed"):
async with compositor.enter(session):
pass
assert session.layer("resource").state is LifecycleState.SUSPENDED
assert layer.resources[1].closed is True
assert session.layer("resource")._entry_stack is None
asyncio.run(fail_create())
asyncio.run(fail_resume())
def test_entry_resource_stack_closes_resources_when_body_raises() -> None:
layer = ResourceLayer()
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("resource", layer)]))
session = compositor.new_session()
async def run() -> None:
with pytest.raises(RuntimeError, match="body failed"):
async with compositor.enter(session):
raise RuntimeError("body failed")
asyncio.run(run())
assert session.layer("resource").state is LifecycleState.CLOSED
assert layer.resources[0].closed is True
assert session.layer("resource")._entry_stack is None
def test_failed_suspend_hook_exits_active_state_and_closes_resources() -> None:
layer = ResourceLayer(fail_suspend=True)
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("resource", layer)]))
session = compositor.new_session()
async def run() -> None:
with pytest.raises(RuntimeError, match="suspend failed"):
async with compositor.enter(session) as active_session:
active_session.suspend_on_exit()
control = session.layer("resource")
assert control.state is LifecycleState.SUSPENDED
assert layer.resources[0].closed is True
assert control._entry_stack is None
with pytest.raises(RuntimeError, match="entry resource stack is not active"):
await control.enter_async_resource(RecordingResource([], "unused"))
with pytest.raises(RuntimeError, match="requires an active LayerControl"):
_ = layer.require_control(control, active=True)
async with compositor.enter() as active_run:
with pytest.raises(RuntimeError, match="Cannot snapshot active compositor run layers: trace"):
active_run.snapshot_session()
asyncio.run(run())
def test_failed_delete_hook_exits_active_state_and_closes_resources() -> None:
layer = ResourceLayer(fail_delete=True)
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("resource", layer)]))
session = compositor.new_session()
def test_active_snapshot_input_is_rejected_before_factories_run() -> None:
calls = 0
async def run() -> None:
with pytest.raises(RuntimeError, match="delete failed"):
async with compositor.enter(session):
pass
def create_layer(config: EmptyLayerConfig) -> TraceLayer:
nonlocal calls
calls += 1
return TraceLayer()
control = session.layer("resource")
assert control.state is LifecycleState.CLOSED
assert layer.resources[0].closed is True
assert control._entry_stack is None
with pytest.raises(RuntimeError, match="entry resource stack is not active"):
await control.enter_async_resource(RecordingResource([], "unused"))
with pytest.raises(RuntimeError, match="requires an active LayerControl"):
_ = layer.require_control(control, active=True)
compositor = Compositor([LayerNode("trace", LayerProvider.from_factory(layer_type=TraceLayer, create=create_layer))])
active_snapshot = {"layers": [{"name": "trace", "lifecycle_state": "active", "runtime_state": {}}]}
asyncio.run(run())
with pytest.raises(ValidationError, match="ACTIVE is internal-only"):
CompositorSessionSnapshot.model_validate(active_snapshot)
with pytest.raises(ValidationError, match="ACTIVE is internal-only"):
asyncio.run(_enter_once(compositor, session_snapshot=active_snapshot))
assert calls == 0
def test_resource_stack_api_raises_outside_active_entry_stack() -> None:
control = LayerControl()
def test_closed_snapshot_enter_is_rejected_before_hooks_run() -> None:
created_layers: list[TraceLayer] = []
async def cleanup() -> None:
raise AssertionError("cleanup should not be registered")
def create_layer(config: EmptyLayerConfig) -> TraceLayer:
layer = TraceLayer()
created_layers.append(layer)
return layer
with pytest.raises(RuntimeError, match="entry resource stack is not active"):
control.add_async_cleanup(cleanup)
compositor = Compositor([LayerNode("trace", LayerProvider.from_factory(layer_type=TraceLayer, create=create_layer))])
closed_snapshot = {"layers": [{"name": "trace", "lifecycle_state": "closed", "runtime_state": {}}]}
with pytest.raises(RuntimeError, match="entry resource stack is not active"):
asyncio.run(control.enter_async_resource(RecordingResource([], "unused")))
with pytest.raises(RuntimeError, match="CLOSED snapshots cannot be entered"):
asyncio.run(_enter_once(compositor, session_snapshot=closed_snapshot))
assert len(created_layers) == 1
assert created_layers[0].events == []
def test_require_control_validates_owner_schema_and_active_state() -> None:
layer = ResourceLayer()
other_layer = TraceLayer()
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("resource", layer), ("other", other_layer)])
)
session = compositor.new_session()
control = session.layer("resource")
with pytest.raises(RuntimeError, match="requires an active LayerControl"):
_ = layer.require_control(control, active=True)
with pytest.raises(RuntimeError, match="belongs to layer 'other'"):
_ = layer.require_control(session.layer("other"))
bad_control = LayerControl()
bad_session = CompositorSession(OrderedDict([("resource", bad_control), ("other", other_layer.new_control())]))
bad_session._bind_owner(compositor)
with pytest.raises(TypeError, match="runtime_handles must be ResourceHandles"):
_ = layer.require_control(bad_control)
async def run() -> None:
async with compositor.enter(session) as active_session:
active_control = active_session.layer("resource")
assert layer.require_control(active_control, active=True) is active_control
asyncio.run(run())
async def _enter_once(
compositor: Compositor,
*,
configs: dict[str, object] | None = None,
session_snapshot: object | None = None,
) -> None:
async with compositor.enter(
configs=configs, # pyright: ignore[reportArgumentType]
session_snapshot=session_snapshot, # pyright: ignore[reportArgumentType]
):
pass

View File

@ -1,11 +1,11 @@
from collections import OrderedDict
import asyncio
from collections.abc import Callable, Sequence
from dataclasses import dataclass
from inspect import Parameter, signature
from typing_extensions import override
from agenton.compositor import Compositor, CompositorTransformerKwargs
from agenton.compositor import Compositor, CompositorTransformerKwargs, LayerNode, LayerProvider
from agenton.layers import NoLayerDeps, PlainLayer, PlainPromptType, PlainToolType, PlainUserPromptType
type ToolCallable = Callable[..., object]
@ -61,6 +61,24 @@ def describe_tools(tools: Sequence[PlainToolType]) -> list[str]:
return [tool.value.__name__ for tool in tools]
def prompt_tool_provider(
*,
prefix: list[str] | None = None,
user: list[str] | None = None,
suffix: list[str] | None = None,
tool_entries: list[ToolCallable] | None = None,
) -> LayerProvider[PromptAndToolLayer]:
return LayerProvider.from_factory(
layer_type=PromptAndToolLayer,
create=lambda config: PromptAndToolLayer(
prefix=list(prefix or []),
user=list(user or []),
suffix=list(suffix or []),
tool_entries=list(tool_entries or []),
),
)
def test_compositor_transformer_kwargs_keys_match_constructor_parameters() -> None:
transformer_kwargs = set(CompositorTransformerKwargs.__required_keys__)
parameters = signature(Compositor).parameters
@ -84,53 +102,36 @@ def test_compositor_transformer_kwargs_keys_match_from_config_parameters() -> No
def test_compositor_transforms_prompts_to_another_type_after_layer_ordering() -> None:
compositor: Compositor[WrappedPrompt, PlainToolType, PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict(
[
(
"first",
PromptAndToolLayer(
prefix=["first-prefix"],
user=[],
suffix=["first-suffix"],
tool_entries=[],
),
),
(
"second",
PromptAndToolLayer(
prefix=["second-prefix"],
user=[],
suffix=["second-suffix"],
tool_entries=[],
),
),
]
),
[
LayerNode("first", prompt_tool_provider(prefix=["first-prefix"], suffix=["first-suffix"])),
LayerNode("second", prompt_tool_provider(prefix=["second-prefix"], suffix=["second-suffix"])),
],
prompt_transformer=wrap_prompts,
)
assert compositor.prompts == [
("wrapped", "first-prefix"),
("wrapped", "second-prefix"),
("wrapped", "second-suffix"),
("wrapped", "first-suffix"),
]
async def run() -> None:
async with compositor.enter() as active_run:
assert active_run.prompts == [
("wrapped", "first-prefix"),
("wrapped", "second-prefix"),
("wrapped", "second-suffix"),
("wrapped", "first-suffix"),
]
asyncio.run(run())
def test_compositor_transforms_tools_to_another_type_after_layer_aggregation() -> None:
compositor: Compositor[PlainPromptType, str, PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict(
[
(
"tools",
PromptAndToolLayer(prefix=[], user=[], suffix=[], tool_entries=[base_tool, wrapped_tool]),
)
]
),
[LayerNode("tools", prompt_tool_provider(tool_entries=[base_tool, wrapped_tool]))],
tool_transformer=describe_tools,
)
assert compositor.tools == ["base_tool", "wrapped_tool"]
async def run() -> None:
async with compositor.enter() as active_run:
assert active_run.tools == ["base_tool", "wrapped_tool"]
asyncio.run(run())
def test_compositor_transforms_user_prompts_after_layer_ordering() -> None:
@ -142,22 +143,18 @@ def test_compositor_transforms_user_prompts_after_layer_ordering() -> None:
WrappedUserPrompt,
PlainUserPromptType,
] = Compositor(
layers=OrderedDict(
[
(
"first",
PromptAndToolLayer(prefix=[], user=["first-user"], suffix=[], tool_entries=[]),
),
(
"second",
PromptAndToolLayer(prefix=[], user=["second-user"], suffix=[], tool_entries=[]),
),
]
),
[
LayerNode("first", prompt_tool_provider(user=["first-user"])),
LayerNode("second", prompt_tool_provider(user=["second-user"])),
],
user_prompt_transformer=wrap_user_prompts,
)
assert compositor.user_prompts == [
("wrapped-user", "first-user"),
("wrapped-user", "second-user"),
]
async def run() -> None:
async with compositor.enter() as active_run:
assert active_run.user_prompts == [
("wrapped-user", "first-user"),
("wrapped-user", "second-user"),
]
asyncio.run(run())

View File

@ -2,13 +2,11 @@ from dataclasses import dataclass
from pydantic import BaseModel, ConfigDict
from agenton.compositor import LayerRegistry
from agenton.compositor import LayerProvider
from agenton.layers import (
EmptyLayerConfig,
EmptyRuntimeHandles,
EmptyRuntimeState,
LayerConfig,
LayerControl,
NoLayerDeps,
PlainLayer,
)
@ -26,19 +24,16 @@ class InferredState(BaseModel):
model_config = ConfigDict(extra="forbid", validate_assignment=True)
class InferredHandles(BaseModel):
token: object | None = None
model_config = ConfigDict(extra="forbid", validate_assignment=True, arbitrary_types_allowed=True)
@dataclass(slots=True)
class GenericSchemaLayer(PlainLayer[NoLayerDeps, InferredConfig, InferredState, InferredHandles]):
class GenericSchemaLayer(PlainLayer[NoLayerDeps, InferredConfig, InferredState]):
type_id = "test.generic-schema"
async def on_context_create(self, control: LayerControl[InferredState, InferredHandles]) -> None:
control.runtime_state.count += 1
control.runtime_handles.token = object()
@classmethod
def from_config(cls, config: InferredConfig) -> "GenericSchemaLayer":
return cls()
async def on_context_create(self) -> None:
self.runtime_state.count += 1
@dataclass(slots=True)
@ -46,27 +41,22 @@ class DefaultSchemaLayer(PlainLayer[NoLayerDeps]):
type_id = "test.default-schema"
def test_layer_infers_config_runtime_state_and_handles_from_generics() -> None:
def test_layer_infers_config_and_runtime_state_from_generics() -> None:
layer = GenericSchemaLayer()
control = layer.new_control(runtime_state={"count": 3})
layer.runtime_state = InferredState(count=3)
assert GenericSchemaLayer.config_type is InferredConfig
assert GenericSchemaLayer.runtime_state_type is InferredState
assert GenericSchemaLayer.runtime_handles_type is InferredHandles
assert isinstance(control.runtime_state, InferredState)
assert control.runtime_state.count == 3
assert isinstance(control.runtime_handles, InferredHandles)
assert isinstance(layer.runtime_state, InferredState)
assert layer.runtime_state.count == 3
def test_layer_uses_empty_schema_defaults_when_omitted() -> None:
layer = DefaultSchemaLayer()
control = layer.new_control()
assert DefaultSchemaLayer.config_type is EmptyLayerConfig
assert DefaultSchemaLayer.runtime_state_type is EmptyRuntimeState
assert DefaultSchemaLayer.runtime_handles_type is EmptyRuntimeHandles
assert isinstance(control.runtime_state, EmptyRuntimeState)
assert isinstance(control.runtime_handles, EmptyRuntimeHandles)
assert isinstance(layer.runtime_state, EmptyRuntimeState)
def test_invalid_declared_schema_type_is_rejected_clearly() -> None:
@ -91,12 +81,12 @@ def test_invalid_declared_schema_type_is_rejected_clearly() -> None:
raise AssertionError("Expected TypeError.")
def test_registry_descriptor_uses_inferred_schema_types() -> None:
registry = LayerRegistry()
registry.register_layer(GenericSchemaLayer)
def test_layer_provider_uses_inferred_schema_types() -> None:
provider = LayerProvider.from_layer_type(GenericSchemaLayer)
descriptor = registry.resolve("test.generic-schema")
layer = provider.create_layer({"value": "configured"})
assert descriptor.config_type is InferredConfig
assert descriptor.runtime_state_type is InferredState
assert descriptor.runtime_handles_type is InferredHandles
assert provider.type_id == "test.generic-schema"
assert provider.layer_type.config_type is InferredConfig
assert provider.layer_type.runtime_state_type is InferredState
assert isinstance(layer.config, InferredConfig)