diff --git a/dify-agent/src/agenton/__init__.py b/dify-agent/src/agenton/__init__.py index e69de29bb2..9c9bdfee40 100644 --- a/dify-agent/src/agenton/__init__.py +++ b/dify-agent/src/agenton/__init__.py @@ -0,0 +1,18 @@ +"""Agenton state-only core. + +Agenton core composes reusable stateless layer graph plans, creates a fresh +``CompositorRun`` for each invocation, hydrates and advances serializable layer +``runtime_state`` through run slots, and emits session snapshots. It intentionally +does not own resources, handles, clients, cleanup callbacks, or any other +non-serializable runtime object. + +Each ``Compositor`` stores only graph nodes and layer providers. Every enter call +creates new layer instances, binds direct dependencies for that run, and writes +the next cross-call state to ``run.session_snapshot`` after exit. To resume a +suspended call, reuse the same compositor plan and pass the prior snapshot to a +new enter call. + +``LifecycleState.ACTIVE`` is internal-only while an entry is running. External +session snapshots and hydrated input must contain only non-active lifecycle +states; ``runtime_state`` is the only mutable layer data captured by snapshots. +""" diff --git a/dify-agent/src/agenton/compositor/__init__.py b/dify-agent/src/agenton/compositor/__init__.py index aed487332c..e050dc6eda 100644 --- a/dify-agent/src/agenton/compositor/__init__.py +++ b/dify-agent/src/agenton/compositor/__init__.py @@ -1,57 +1,50 @@ -"""Layer composition primitives. +"""Stateless layer graph composition for the Agenton core. -The compositor owns a named, ordered set of layers. ``Compositor[PromptT, -ToolT, LayerPromptT, LayerToolT]`` is framework-neutral; callers choose layer and -exposed prompt/tool item types by annotating construction or assignment sites. -When only the first two type arguments are supplied, ``LayerPromptT`` and -``LayerToolT`` default to the corresponding exposed item types. +``Compositor`` is a reusable graph plan plus layer providers. It stores no live +layer instances, run lifecycle state, session state, resources, or handles. Each +``Compositor.enter(...)`` call creates a fresh ``CompositorRun`` that owns the +new layer instances and per-layer run slots for that invocation only. -Layer instances are shared graph/capability definitions owned by the compositor. -Per-session runtime state belongs to each session's ``LayerControl`` objects, -not to the shared layer instances, so different sessions can enter the same -compositor without leaking generated ids or handles through ``self``. -Controls know their owning session and layer id privately so code running inside a -layer can use ``LayerControl.control_for`` to resolve dependency controls from the -same session. These owner links are runtime metadata and are never serialized in -session snapshots. +Agenton core does not manage resources, handles, cleanup stacks, clients, or any +other live object. It composes the layer graph, validates node-name keyed configs +through providers, hydrates serializable ``runtime_state`` from an optional +``CompositorSessionSnapshot``, runs no-argument layer lifecycle hooks, and writes +the next session snapshot to ``run.session_snapshot`` after exit. +``LifecycleState.ACTIVE`` exists only while a run is entered; it is rejected in +external session snapshots and is never emitted. -Dependency mappings use layer-local dependency names as keys and compositor -layer names as values. System prompt aggregation depends on insertion order: -prefix prompts are collected from first to last layer, while suffix prompts are -collected in reverse. User prompts are collected from first to last layer so the -composed user message preserves graph order. +Dependencies are direct layer instance relationships bound onto ``layer.deps`` +inside one run. Dependency mappings use layer-local dependency names as keys and +compositor layer names as values. System prompt aggregation depends on graph +order: prefix prompts are collected from first to last layer, while suffix +prompts are collected in reverse. User prompts are collected from first to last +layer so the composed user message preserves graph order. -Serializable graph config uses registry type ids rather than import paths. -``LayerNodeConfig.config`` accepts plain JSON values and ``LayerConfig`` DTO -instances; JSON serialization preserves concrete DTO fields before the builder -validates them with the registered layer schema. ``CompositorBuilder`` resolves -config nodes through ``LayerRegistry`` and can mix those nodes with live layer -instances for Python objects and callables. Registries may also supply factories -for layers that require server-side dependencies in addition to client DTOs. +Serializable graph config uses provider type ids rather than import paths. +Graph nodes contain only name, type, dependency mapping, and metadata; runtime +state travels only in session snapshots and per-call layer config travels only +through ``Compositor.enter(configs=...)``. ``Compositor.from_config`` resolves +type ids from provider lists, and ``node_providers`` override type-id providers +for named nodes. -``Compositor.enter`` enters layers in compositor order and exits them in reverse -order through ``AsyncExitStack``. It accepts an optional ``CompositorSession`` -whose layer controls must match the compositor layer names and order. When -omitted, a fresh session is created. Reusing a suspended session resumes its -layer contexts; closed sessions must be replaced. - -Optional prompt, user prompt, and tool transformers run after layer aggregation. -The compositor asks each layer to ``wrap_prompt``, ``wrap_user_prompt``, and -``wrap_tool`` its native values, so typed layer families can tag values without -changing their authoring contracts. When transformers are omitted, the -compositor returns those wrapped items unchanged. +Optional prompt, user prompt, and tool transformers run after run-level layer +aggregation. The run asks each layer to ``wrap_prompt``, ``wrap_user_prompt``, +and ``wrap_tool`` its native values, so typed layer families can tag values +without changing their authoring contracts. When transformers are omitted, the +run returns those wrapped items unchanged. """ from collections import OrderedDict -from collections.abc import AsyncIterator, Callable, Iterable, Mapping as MappingABC, Sequence -from contextlib import AsyncExitStack, asynccontextmanager -from dataclasses import dataclass, field -from typing import Any, Generic, Mapping, TypedDict, cast, overload +from collections.abc import AsyncIterator, Callable, Mapping, Sequence +from contextlib import asynccontextmanager +from dataclasses import dataclass +from typing import Any, Generic, TypedDict, cast, overload +import weakref -from pydantic import BaseModel, ConfigDict, Field, JsonValue -from typing_extensions import Self, TypeVar +from pydantic import BaseModel, ConfigDict, Field, JsonValue, field_validator +from typing_extensions import TypeVar -from agenton.layers.base import Layer, LayerConfig, LayerConfigValue, LayerControl, LifecycleState +from agenton.layers.base import ExitIntent, Layer, LayerConfig, LayerConfigValue, LifecycleState from agenton.layers.types import AllPromptTypes, AllToolTypes, AllUserPromptTypes PromptT = TypeVar("PromptT", default=AllPromptTypes) @@ -60,9 +53,7 @@ LayerPromptT = TypeVar("LayerPromptT", default=AllPromptTypes) LayerToolT = TypeVar("LayerToolT", default=AllToolTypes) UserPromptT = TypeVar("UserPromptT", default=AllUserPromptTypes) LayerUserPromptT = TypeVar("LayerUserPromptT", default=AllUserPromptTypes) -LayerT = TypeVar("LayerT", bound=Layer[Any, Any, Any, Any, Any, Any, Any]) -DepRuntimeStateT = TypeVar("DepRuntimeStateT", bound=BaseModel) -DepRuntimeHandlesT = TypeVar("DepRuntimeHandlesT", bound=BaseModel) +LayerT = TypeVar("LayerT", bound=Layer[Any, Any, Any, Any, Any, Any]) type CompositorTransformer[InputT, OutputT] = Callable[[Sequence[InputT]], Sequence[OutputT]] @@ -84,27 +75,175 @@ class CompositorTransformerKwargs[ type _ConfigModelValue[ModelT: BaseModel] = ModelT | JsonValue | str | bytes -type LayerFactory = Callable[[LayerConfig], Layer[Any, Any, Any, Any, Any, Any, Any]] +type LayerConfigInput = LayerConfigValue | Mapping[str, object] | str | bytes | None +type LayerFactory = Callable[[LayerConfig], Layer[Any, Any, Any, Any, Any, Any]] +type LayerProviderInput = type[Layer[Any, Any, Any, Any, Any, Any]] | "LayerProvider[Any]" def _validate_config_model_input[ModelT: BaseModel]( model_type: type[ModelT], value: _ConfigModelValue[ModelT] | Mapping[str, object], ) -> ModelT: - if isinstance(value, model_type): - return value + """Validate an external DTO boundary, including existing model instances. + + Pydantic models in this package are generally mutable and do not all enable + assignment validation. Revalidating existing instances through their dumped + data prevents post-construction mutations from bypassing config or snapshot + validators at compositor entry boundaries. + """ + if isinstance(value, BaseModel): + return model_type.model_validate(value.model_dump(mode="python", warnings=False)) if isinstance(value, str | bytes): return model_type.model_validate_json(value) return model_type.model_validate(value) +_USED_LAYER_INSTANCE_REFS: dict[int, weakref.ReferenceType[Layer[Any, Any, Any, Any, Any, Any]]] = {} + + +def _claim_fresh_layer_instance(layer: Layer[Any, Any, Any, Any, Any, Any]) -> None: + """Reject provider factories that return a layer object used before. + + The registry stores weak references, not live resources or run state. It is + intentionally global to keep ``Compositor`` stateless while still enforcing + the proposal's fresh-instance boundary before any lifecycle hook can run. + """ + layer_identity = id(layer) + existing_ref = _USED_LAYER_INSTANCE_REFS.get(layer_identity) + if existing_ref is not None: + existing_layer = existing_ref() + if existing_layer is not None: + raise ValueError( + "LayerProvider factories must return a fresh layer instance for each invocation; " + f"got reused instance of '{type(layer).__name__}'." + ) + _USED_LAYER_INSTANCE_REFS.pop(layer_identity, None) + + def remove_ref(ref: weakref.ReferenceType[Layer[Any, Any, Any, Any, Any, Any]]) -> None: + if _USED_LAYER_INSTANCE_REFS.get(layer_identity) is ref: + _USED_LAYER_INSTANCE_REFS.pop(layer_identity, None) + + _USED_LAYER_INSTANCE_REFS[layer_identity] = weakref.ref(layer, remove_ref) + + +class LayerProvider(Generic[LayerT]): + """Validated layer factory for one concrete ``Layer`` class. + + Providers are reusable construction plans. They validate per-call config with + ``layer_type.config_type`` before invoking either ``layer_type.from_config`` + or a custom factory. The factory receives only typed config, never graph node + data, and must return a fresh ``layer_type`` instance; reused instances are + rejected before dependencies are bound or hooks run. + """ + + __slots__ = ("_create", "layer_type") + + layer_type: type[LayerT] + _create: Callable[[LayerConfig], LayerT] + + def __init__(self, *, layer_type: type[LayerT], create: Callable[[LayerConfig], LayerT]) -> None: + self.layer_type = layer_type + self._create = create + + @classmethod + def from_layer_type(cls, layer_type: type[LayerT]) -> "LayerProvider[LayerT]": + """Create a provider that constructs layers via ``layer_type.from_config``.""" + + def create(config: LayerConfig) -> LayerT: + return layer_type.from_config(cast(Any, config)) + + return cls(layer_type=layer_type, create=create) + + @classmethod + def from_factory( + cls, + *, + layer_type: type[LayerT], + create: Callable[[Any], LayerT], + ) -> "LayerProvider[LayerT]": + """Create a provider from a custom typed-config factory. + + ``create`` receives the validated instance of ``layer_type.config_type``. + It does not receive the graph node; node-specific construction should use + a dedicated provider in ``Compositor.from_config(node_providers=...)``. + """ + return cls(layer_type=layer_type, create=cast(Callable[[LayerConfig], LayerT], create)) + + @property + def type_id(self) -> str | None: + """Return the serializable registry type id declared by ``layer_type``.""" + return self.layer_type.type_id + + def create_layer(self, config: LayerConfigInput = None) -> LayerT: + """Validate config, call the factory, and return a fresh layer instance.""" + typed_config = self.validate_config(config) + return self.create_layer_from_config(typed_config) + + def validate_config(self, config: LayerConfigInput = None) -> LayerConfig: + """Return typed config without invoking the layer factory. + + ``Compositor.enter`` calls this for every node before creating any layer + so a later invalid node config cannot leave earlier factory side effects. + """ + raw_config: LayerConfigValue | Mapping[str, object] | str | bytes = {} if config is None else config + return _validate_config_model_input(self.layer_type.config_type, raw_config) + + def create_layer_from_config(self, config: LayerConfig) -> LayerT: + """Call the factory with validated config and enforce fresh instances.""" + typed_config = self.validate_config(config) + layer = self._create(typed_config) + if not isinstance(layer, self.layer_type): + raise TypeError( + f"LayerProvider for '{self.layer_type.__name__}' returned '{type(layer).__name__}', " + f"expected '{self.layer_type.__name__}'." + ) + _claim_fresh_layer_instance(layer) + layer.config = cast(Any, typed_config) + return layer + + +@dataclass(frozen=True, slots=True, init=False) +class LayerNode: + """Stateless graph node plan for one named layer provider. + + ``implementation`` may be a layer class or an explicit ``LayerProvider``. + ``deps`` maps dependency field names on this node's layer class to other + compositor node names. ``metadata`` is graph description data only; it is not + passed to provider factories and is never included in session snapshots. + """ + + name: str + provider: LayerProvider[Any] + deps: Mapping[str, str] + metadata: Mapping[str, JsonValue] + + def __init__( + self, + name: str, + implementation: LayerProviderInput, + *, + deps: Mapping[str, str] | None = None, + metadata: Mapping[str, JsonValue] | None = None, + ) -> None: + if not name: + raise ValueError("Layer node name must not be empty.") + object.__setattr__(self, "name", name) + object.__setattr__(self, "provider", _as_layer_provider(implementation)) + object.__setattr__(self, "deps", dict(deps or {})) + object.__setattr__(self, "metadata", dict(metadata or {})) + + class LayerNodeConfig(BaseModel): - """Serializable config for one registry-backed layer node.""" + """Serializable config for one provider-backed layer graph node. + + Nodes intentionally contain no runtime state and no per-call layer config. + Runtime state belongs to session snapshots; layer config belongs to + ``Compositor.enter(configs=...)`` keyed by node name. + """ name: str type: str - config: LayerConfigValue = Field(default_factory=dict) deps: Mapping[str, str] = Field(default_factory=dict) metadata: Mapping[str, JsonValue] = Field(default_factory=dict) @@ -112,12 +251,7 @@ class LayerNodeConfig(BaseModel): class CompositorConfig(BaseModel): - """Serializable config for constructing a compositor graph. - - The graph references layer implementations by registry type id. Live Python - objects and callables are intentionally excluded; compose those with - ``CompositorBuilder.add_instance``. - """ + """Serializable config for constructing a reusable compositor graph plan.""" schema_version: int = 1 layers: list[LayerNodeConfig] @@ -132,220 +266,34 @@ def _validate_compositor_config_input(value: CompositorConfigValue) -> Composito return _validate_config_model_input(CompositorConfig, value) -@dataclass(frozen=True, slots=True) -class LayerDescriptor: - """Registry descriptor inferred from a layer class.""" - - type_id: str - layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]] - config_type: type[LayerConfig] - runtime_state_type: type[BaseModel] - runtime_handles_type: type[BaseModel] - factory: LayerFactory | None = None - - -class LayerRegistry: - """Manual registry for config-constructible layer classes. - - Registration infers config and runtime schemas from layer class attributes. - A registered layer must have a type id, either declared as ``type_id`` on the - class or supplied to ``register_layer``. Optional factories let server code - inject dependencies that do not belong in public layer DTOs. - """ - - __slots__ = ("_descriptors",) - - _descriptors: dict[str, LayerDescriptor] - - def __init__(self) -> None: - self._descriptors = {} - - def register_layer( - self, - layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]], - *, - type_id: str | None = None, - factory: LayerFactory | None = None, - ) -> None: - """Register ``layer_type`` under its inferred or explicit type id. - - ``factory`` receives validated layer config and constructs the layer. It - is intended for server-only dependencies such as clients or secrets; omit - it for normal ``Layer.from_config`` construction. - """ - resolved_type_id = type_id or layer_type.type_id - if resolved_type_id is not None and not isinstance(resolved_type_id, str): - raise TypeError(f"Layer type id for '{layer_type.__qualname__}' must be a string.") - if resolved_type_id is None or not resolved_type_id: - raise ValueError(f"Layer '{layer_type.__qualname__}' must declare a type_id or be registered with one.") - if resolved_type_id in self._descriptors: - raise ValueError(f"Layer type id '{resolved_type_id}' is already registered.") - self._descriptors[resolved_type_id] = LayerDescriptor( - type_id=resolved_type_id, - layer_type=layer_type, - config_type=layer_type.config_type, - runtime_state_type=layer_type.runtime_state_type, - runtime_handles_type=layer_type.runtime_handles_type, - factory=factory, - ) - - def resolve(self, type_id: str) -> LayerDescriptor: - """Return the descriptor for ``type_id`` or raise ``KeyError``.""" - try: - return self._descriptors[type_id] - except KeyError as e: - raise KeyError(f"Layer type id '{type_id}' is not registered.") from e - - def descriptors(self) -> Mapping[str, LayerDescriptor]: - """Return registered descriptors keyed by type id.""" - return dict(self._descriptors) - - -class CompositorSession: - """External lifecycle session for layer contexts entered by a compositor. - - A session owns one ``LayerControl`` per compositor layer name, preserving - compositor order. Controls must be created from the matching layer schemas; - prefer ``Compositor.new_session`` or ``Compositor.session_from_snapshot`` for - public session construction. Broadcast methods are convenience APIs for - setting every layer's per-entry exit intent; ``layer`` allows explicit - per-layer control when callers need partial suspend/delete behavior. A mixed - session with any closed layer cannot be entered again because compositor - entry is all-or-none. The session also carries private owner metadata so its - controls can resolve dependency controls; snapshots include only public - lifecycle/runtime state. - """ - - __slots__ = ("layer_controls", "_owner_compositor") - - layer_controls: OrderedDict[str, LayerControl] - _owner_compositor: "Compositor[Any, Any, Any, Any, Any, Any] | None" - - def __init__(self, layer_names: Iterable[str] | Mapping[str, LayerControl]) -> None: - self._owner_compositor = None - if isinstance(layer_names, MappingABC): - self.layer_controls = OrderedDict(layer_names.items()) - return - self.layer_controls = OrderedDict((layer_name, LayerControl()) for layer_name in layer_names) - - def suspend_on_exit(self) -> None: - """Request suspend behavior for every layer when this entry exits.""" - for control in self.layer_controls.values(): - control.suspend_on_exit() - - def delete_on_exit(self) -> None: - """Request delete behavior for every layer when this entry exits.""" - for control in self.layer_controls.values(): - control.delete_on_exit() - - def layer(self, name: str) -> LayerControl: - """Return the layer control for ``name`` or raise ``KeyError``.""" - try: - return self.layer_controls[name] - except KeyError as e: - raise KeyError(f"CompositorSession has no layer control named '{name}'.") from e - - def _bind_owner(self, compositor: "Compositor[Any, Any, Any, Any, Any, Any]") -> None: - """Bind runtime owner links on this session and all child controls.""" - self._owner_compositor = compositor - for layer_id, control in self.layer_controls.items(): - control._bind_owner(self, layer_id) - - def _control_for_dependency( - self, - owner_layer_id: str, - dep_name: str | None, - dep_layer: Layer[Any, Any, Any, Any, Any, DepRuntimeStateT, DepRuntimeHandlesT], - ) -> LayerControl[DepRuntimeStateT, DepRuntimeHandlesT]: - """Resolve a dependency control from the owner's resolved dependency targets.""" - if self._owner_compositor is None: - raise RuntimeError("CompositorSession is not attached to a compositor.") - if dep_name is None: - return self._control_for_unique_dependency(owner_layer_id, dep_layer) - return self._control_for_named_dependency(owner_layer_id, dep_name, dep_layer) - - def _layer_for_control_owner(self, owner_layer_id: str) -> Layer[Any, Any, Any, Any, Any, Any, Any]: - """Return the layer instance that owns a control in this session.""" - compositor = self._require_owner_compositor() - try: - return compositor.layers[owner_layer_id] - except KeyError as e: - raise KeyError(f"Layer '{owner_layer_id}' is not defined in this compositor.") from e - - def _control_for_unique_dependency( - self, - owner_layer_id: str, - dep_layer: Layer[Any, Any, Any, Any, Any, DepRuntimeStateT, DepRuntimeHandlesT], - ) -> LayerControl[DepRuntimeStateT, DepRuntimeHandlesT]: - compositor = self._require_owner_compositor() - dep_targets = self._dependency_targets_for(owner_layer_id) - matches = [ - (name, target_id) - for name, target_id in dep_targets.items() - if target_id is not None and compositor.layers[target_id] is dep_layer - ] - if not matches: - raise KeyError( - f"Layer '{owner_layer_id}' has no dependency target bound to the provided " - f"{type(dep_layer).__name__} instance." - ) - if len(matches) > 1: - names = ", ".join(name for name, _target_id in matches) - raise ValueError( - f"Layer '{owner_layer_id}' has multiple dependency fields bound to the provided " - f"{type(dep_layer).__name__} instance: {names}. Pass dep_name explicitly." - ) - _name, target_id = matches[0] - return cast(LayerControl[DepRuntimeStateT, DepRuntimeHandlesT], self.layer(target_id)) - - def _control_for_named_dependency( - self, - owner_layer_id: str, - dep_name: str, - dep_layer: Layer[Any, Any, Any, Any, Any, DepRuntimeStateT, DepRuntimeHandlesT], - ) -> LayerControl[DepRuntimeStateT, DepRuntimeHandlesT]: - compositor = self._require_owner_compositor() - dep_targets = self._dependency_targets_for(owner_layer_id) - if dep_name not in dep_targets: - raise KeyError(f"Layer '{owner_layer_id}' has no resolved dependency named '{dep_name}'.") - target_id = dep_targets[dep_name] - if target_id is None: - raise KeyError(f"Layer '{owner_layer_id}' dependency '{dep_name}' is not bound to a target layer.") - if compositor.layers[target_id] is not dep_layer: - raise TypeError( - f"Layer '{owner_layer_id}' dependency '{dep_name}' resolves to layer '{target_id}', " - f"not the provided {type(dep_layer).__name__} instance." - ) - return cast(LayerControl[DepRuntimeStateT, DepRuntimeHandlesT], self.layer(target_id)) - - def _require_owner_compositor(self) -> "Compositor[Any, Any, Any, Any, Any, Any]": - if self._owner_compositor is None: - raise RuntimeError("CompositorSession is not attached to a compositor.") - return self._owner_compositor - - def _dependency_targets_for(self, owner_layer_id: str) -> Mapping[str, str | None]: - compositor = self._require_owner_compositor() - try: - return compositor._resolved_dep_targets[owner_layer_id] - except KeyError as e: - raise KeyError(f"Layer '{owner_layer_id}' is not defined in this compositor.") from e - - class LayerSessionSnapshot(BaseModel): - """Serializable snapshot for one layer control.""" + """Serializable snapshot for one layer's state-only invocation data. + + ``runtime_state`` is the only snapshotted mutable layer data. ``ACTIVE`` is + rejected here because a running layer cannot be represented safely outside + the active compositor entry. + """ name: str - state: LifecycleState + lifecycle_state: LifecycleState runtime_state: dict[str, JsonValue] model_config = ConfigDict(extra="forbid") + @field_validator("lifecycle_state") + @classmethod + def _reject_active_lifecycle(cls, value: LifecycleState) -> LifecycleState: + if value is LifecycleState.ACTIVE: + raise ValueError("LifecycleState.ACTIVE is internal-only and cannot appear in session snapshots.") + return value + class CompositorSessionSnapshot(BaseModel): """Serializable compositor session snapshot. - Snapshots include runtime state only. Live runtime handles are intentionally - excluded and must be rehydrated by resume hooks using runtime state. + Snapshots include ordered layer lifecycle state and serializable runtime + state only. Live resources, handles, dependencies, prompts, tools, and config + are outside Agenton snapshots and are never captured here. """ schema_version: int = 1 @@ -354,336 +302,171 @@ class CompositorSessionSnapshot(BaseModel): model_config = ConfigDict(extra="forbid") -@dataclass(frozen=True, slots=True) -class _LayerBuildEntry: - name: str - layer: Layer[Any, Any, Any, Any, Any, Any, Any] - deps: Mapping[str, str] +type CompositorSessionSnapshotValue = _ConfigModelValue[CompositorSessionSnapshot] | Mapping[str, object] -class CompositorBuilder: - """Build compositors from registry config nodes and live instances.""" +@dataclass(slots=True) +class LayerRunSlot: + """Invocation-local lifecycle and exit state for one fresh layer instance.""" - __slots__ = ("_registry", "_entries") - - _registry: LayerRegistry - _entries: list[_LayerBuildEntry] - - def __init__(self, registry: LayerRegistry) -> None: - self._registry = registry - self._entries = [] - - def add_config(self, config: CompositorConfigValue) -> Self: - """Add all layers from a serializable compositor config.""" - conf = _validate_compositor_config_input(config) - if conf.schema_version != 1: - raise ValueError(f"Unsupported compositor config schema_version: {conf.schema_version}.") - for layer_conf in conf.layers: - self.add_config_layer( - name=layer_conf.name, - type=layer_conf.type, - config=layer_conf.config, - deps=layer_conf.deps, - ) - return self - - def add_config_layer( - self, - *, - name: str, - type: str, - config: LayerConfigValue | None = None, - deps: Mapping[str, str] | None = None, - ) -> Self: - """Resolve, validate, and add one registry-backed layer config node.""" - descriptor = self._registry.resolve(type) - raw_config = {} if config is None else config - validated_config = descriptor.config_type.model_validate(raw_config) - if descriptor.factory is not None: - layer = descriptor.factory(validated_config) - else: - layer = descriptor.layer_type.from_config(cast(Any, validated_config)) - self.add_instance(name=name, layer=layer, deps=deps) - return self - - def add_instance( - self, - *, - name: str, - layer: Layer[Any, Any, Any, Any, Any, Any, Any], - deps: Mapping[str, str] | None = None, - ) -> Self: - """Add a live layer instance, useful for Python objects and callables.""" - self._entries.append(_LayerBuildEntry(name=name, layer=layer, deps=dict(deps or {}))) - return self - - def build[PromptT, ToolT, LayerPromptT, LayerToolT, UserPromptT, LayerUserPromptT]( - self, - *, - prompt_transformer: CompositorTransformer[LayerPromptT, PromptT] | None = None, - user_prompt_transformer: CompositorTransformer[LayerUserPromptT, UserPromptT] | None = None, - tool_transformer: CompositorTransformer[LayerToolT, ToolT] | None = None, - ) -> "Compositor[PromptT, ToolT, LayerPromptT, LayerToolT, UserPromptT, LayerUserPromptT]": - """Validate names/dependencies, bind deps, and return a compositor.""" - layers: OrderedDict[str, Layer[Any, Any, Any, Any, Any, Any, Any]] = OrderedDict() - deps_name_mapping: dict[str, Mapping[str, str]] = {} - for entry in self._entries: - if entry.name in layers: - raise ValueError(f"Duplicate layer name '{entry.name}'.") - layers[entry.name] = entry.layer - deps_name_mapping[entry.name] = entry.deps - - layer_names = set(layers) - for layer_name, deps in deps_name_mapping.items(): - declared_deps = layers[layer_name].dependency_names() - unknown_dep_keys = set(deps) - declared_deps - if unknown_dep_keys: - names = ", ".join(sorted(unknown_dep_keys)) - raise ValueError(f"Layer '{layer_name}' declares unknown dependency keys: {names}.") - missing_targets = set(deps.values()) - layer_names - if missing_targets: - names = ", ".join(sorted(missing_targets)) - raise ValueError(f"Layer '{layer_name}' depends on undefined layer names: {names}.") - - return Compositor( - layers=layers, - deps_name_mapping=deps_name_mapping, - prompt_transformer=prompt_transformer, - user_prompt_transformer=user_prompt_transformer, - tool_transformer=tool_transformer, - ) + layer: Layer[Any, Any, Any, Any, Any, Any] + lifecycle_state: LifecycleState + exit_intent: ExitIntent = ExitIntent.DELETE -@dataclass(kw_only=True) -class Compositor(Generic[PromptT, ToolT, LayerPromptT, LayerToolT, UserPromptT, LayerUserPromptT]): - """Framework-neutral ordered layer graph with lifecycle and aggregation. +@dataclass(slots=True) +class CompositorRun(Generic[PromptT, ToolT, LayerPromptT, LayerToolT, UserPromptT, LayerUserPromptT]): + """Single-invocation runtime object created by ``Compositor.enter``. - ``prompt_transformer``, ``user_prompt_transformer``, and - ``tool_transformer`` are post-aggregation hooks: they run whenever - ``prompts``, ``user_prompts``, or ``tools`` is read, after layer - contributions have been collected in compositor order. Use two type - arguments for identity aggregation, four when prompt/tool layer item types - differ from exposed item types, or all six when user prompt item types also - differ. + The run owns ordered ``LayerRunSlot`` objects and the fresh layers inside + them. It is the only object that exposes live layers, lifecycle state, exit + intent, and prompt/user-prompt/tool aggregation for an active invocation. + After context exit, ``session_snapshot`` contains the next cross-call state. """ - layers: OrderedDict[str, Layer[Any, Any, Any, Any, Any, Any, Any]] - deps_name_mapping: Mapping[str, Mapping[str, str]] = field(default_factory=dict) + slots: OrderedDict[str, LayerRunSlot] prompt_transformer: CompositorTransformer[LayerPromptT, PromptT] | None = None user_prompt_transformer: CompositorTransformer[LayerUserPromptT, UserPromptT] | None = None tool_transformer: CompositorTransformer[LayerToolT, ToolT] | None = None - _deps_bound: bool = field(default=False, init=False) - _resolved_dep_targets: dict[str, dict[str, str | None]] = field(default_factory=dict, init=False) - - def __post_init__(self) -> None: - self._bind_deps(self.deps_name_mapping) - - @classmethod - def from_config( - cls, - conf: CompositorConfigValue, - *, - registry: LayerRegistry, - prompt_transformer: CompositorTransformer[LayerPromptT, PromptT] | None = None, - user_prompt_transformer: CompositorTransformer[LayerUserPromptT, UserPromptT] | None = None, - tool_transformer: CompositorTransformer[LayerToolT, ToolT] | None = None, - ) -> "Compositor[PromptT, ToolT, LayerPromptT, LayerToolT, UserPromptT, LayerUserPromptT]": - """Create a compositor from registry-backed serializable config.""" - return CompositorBuilder(registry).add_config(conf).build( - prompt_transformer=prompt_transformer, - user_prompt_transformer=user_prompt_transformer, - tool_transformer=tool_transformer, - ) - - def _bind_deps(self, deps_name_mapping: Mapping[str, Mapping[str, str]]) -> None: - """Resolve dependency-name mappings and bind dependencies on each layer. - - The outer mapping key is the layer being bound. The inner mapping key is - the dependency field declared by that layer's deps type, and the value is - the target layer name in this compositor. Explicit mappings win over - implicit same-name layer binding. Optional dependencies with no target are - recorded as ``None`` so ``LayerControl.control_for`` can distinguish - "declared but absent" from unknown dependency names. - """ - if self._deps_bound: - raise RuntimeError("Compositor deps are already bound.") - - self._resolved_dep_targets = {} - for layer_name, layer in self.layers.items(): - layer_deps = deps_name_mapping.get(layer_name, {}) - for target_layer_name in layer_deps.values(): - if target_layer_name not in self.layers: - raise ValueError( - f"Layer '{layer_name}' has a dependency on layer '{target_layer_name}', " - "which is not defined in the builder." - ) - - resolved_target_ids: dict[str, str | None] = {} - resolved_deps: dict[str, Layer[Any, Any, Any, Any, Any, Any, Any]] = {} - for dep_name in layer.dependency_names(): - target_layer_name = layer_deps.get(dep_name) - if target_layer_name is None and dep_name in self.layers: - target_layer_name = dep_name - resolved_target_ids[dep_name] = target_layer_name - if target_layer_name is not None: - resolved_deps[dep_name] = self.layers[target_layer_name] - - layer.bind_deps(resolved_deps) - self._resolved_dep_targets[layer_name] = resolved_target_ids - self._deps_bound = True + session_snapshot: CompositorSessionSnapshot | None = None @overload - def get_layer(self, layer_id: str) -> Layer[Any, Any, Any, Any, Any, Any, Any]: ... + def get_layer(self, name: str) -> Layer[Any, Any, Any, Any, Any, Any]: ... @overload - def get_layer(self, layer_id: str, layer_type: type[LayerT]) -> LayerT: ... + def get_layer(self, name: str, layer_type: type[LayerT]) -> LayerT: ... def get_layer( self, - layer_id: str, + name: str, layer_type: type[LayerT] | None = None, - ) -> Layer[Any, Any, Any, Any, Any, Any, Any] | LayerT: - """Return a layer by compositor name and optionally validate its type.""" + ) -> Layer[Any, Any, Any, Any, Any, Any] | LayerT: + """Return a live layer by node name and optionally validate its type.""" try: - layer = self.layers[layer_id] + layer = self.slots[name].layer except KeyError as e: - raise KeyError(f"Layer '{layer_id}' is not defined in this compositor.") from e + raise KeyError(f"Layer '{name}' is not defined in this compositor run.") from e if layer_type is not None and not isinstance(layer, layer_type): - raise TypeError( - f"Layer '{layer_id}' must be {layer_type.__name__}, got {type(layer).__name__}." - ) + raise TypeError(f"Layer '{name}' must be {layer_type.__name__}, got {type(layer).__name__}.") return layer - def new_session(self) -> CompositorSession: - """Create a fresh lifecycle session matching this compositor's layer order.""" - session = CompositorSession( - OrderedDict((layer_name, layer.new_control()) for layer_name, layer in self.layers.items()) - ) - session._bind_owner(self) - return session + def suspend_on_exit(self) -> None: + """Request suspend behavior for every active layer when the run exits.""" + for name in self.slots: + self.suspend_layer_on_exit(name) - def snapshot_session(self, session: CompositorSession) -> CompositorSessionSnapshot: - """Serialize non-active session lifecycle state and runtime state. + def delete_on_exit(self) -> None: + """Request delete behavior for every active layer when the run exits.""" + for name in self.slots: + self.delete_layer_on_exit(name) - Runtime handles are live Python objects and are intentionally excluded. - """ - self._validate_session(session) - active_layers = [name for name, control in session.layer_controls.items() if control.state is LifecycleState.ACTIVE] + def suspend_layer_on_exit(self, name: str) -> None: + """Request suspend behavior for one active layer when the run exits.""" + self._set_layer_exit_intent(name, ExitIntent.SUSPEND) + + def delete_layer_on_exit(self, name: str) -> None: + """Request delete behavior for one active layer when the run exits.""" + self._set_layer_exit_intent(name, ExitIntent.DELETE) + + def snapshot_session(self) -> CompositorSessionSnapshot: + """Snapshot non-active layer lifecycle state and runtime state from this run.""" + active_layers = [name for name, slot in self.slots.items() if slot.lifecycle_state is LifecycleState.ACTIVE] if active_layers: names = ", ".join(active_layers) - raise RuntimeError(f"Cannot snapshot active compositor session layers: {names}.") + raise RuntimeError(f"Cannot snapshot active compositor run layers: {names}.") return CompositorSessionSnapshot( layers=[ LayerSessionSnapshot( name=name, - state=control.state, - runtime_state=cast(dict[str, JsonValue], control.runtime_state.model_dump(mode="json")), + lifecycle_state=slot.lifecycle_state, + runtime_state=cast(dict[str, JsonValue], slot.layer.runtime_state.model_dump(mode="json")), ) - for name, control in session.layer_controls.items() + for name, slot in self.slots.items() ] ) - def session_from_snapshot(self, snapshot: CompositorSessionSnapshot | JsonValue | str | bytes) -> CompositorSession: - """Restore a session from a snapshot and reinitialize empty handles.""" - snapshot = _validate_config_model_input(CompositorSessionSnapshot, snapshot) - if snapshot.schema_version != 1: - raise ValueError(f"Unsupported compositor session snapshot schema_version: {snapshot.schema_version}.") - snapshot_layer_names = tuple(layer.name for layer in snapshot.layers) - expected_layer_names = tuple(self.layers) - if snapshot_layer_names != expected_layer_names: - expected = ", ".join(expected_layer_names) - actual = ", ".join(snapshot_layer_names) - raise ValueError( - "CompositorSessionSnapshot layer names must match compositor layers in order. " - f"Expected [{expected}], got [{actual}]." - ) - active_layers = [layer.name for layer in snapshot.layers if layer.state is LifecycleState.ACTIVE] - if active_layers: - names = ", ".join(active_layers) - raise ValueError(f"Cannot restore active compositor session layers from snapshot: {names}.") - controls = OrderedDict( - ( - layer_snapshot.name, - self.layers[layer_snapshot.name].new_control( - state=layer_snapshot.state, - runtime_state=layer_snapshot.runtime_state, - ), - ) - for layer_snapshot in snapshot.layers - ) - session = CompositorSession(controls) - session._bind_owner(self) - return session + async def _enter_layers(self) -> None: + self._ensure_layers_can_enter() + entered_slots: list[LayerRunSlot] = [] + try: + for slot in self.slots.values(): + await self._enter_slot(slot) + entered_slots.append(slot) + except BaseException as enter_error: + hook_error = await self._exit_slots_reversed(entered_slots) + self.session_snapshot = self.snapshot_session() + if hook_error is not None: + raise hook_error from enter_error + raise - @asynccontextmanager - async def enter( - self, - session: CompositorSession | None = None, - ) -> AsyncIterator[CompositorSession]: - """Enter each layer context in order and yield the active session.""" - if not self._deps_bound: - raise RuntimeError("Compositor deps must be bound before entering context.") + async def _exit_layers(self) -> None: + hook_error = await self._exit_slots_reversed(list(self.slots.values())) + self.session_snapshot = self.snapshot_session() + if hook_error is not None: + raise hook_error - if session is None: - session = self.new_session() - self._validate_session(session) - self._ensure_session_can_enter(session) - session._bind_owner(self) + async def _enter_slot(self, slot: LayerRunSlot) -> None: + if slot.lifecycle_state is LifecycleState.NEW: + slot.exit_intent = ExitIntent.DELETE + await slot.layer.on_context_create() + slot.lifecycle_state = LifecycleState.ACTIVE + return + if slot.lifecycle_state is LifecycleState.SUSPENDED: + slot.exit_intent = ExitIntent.DELETE + await slot.layer.on_context_resume() + slot.lifecycle_state = LifecycleState.ACTIVE + return + raise RuntimeError(f"Cannot enter layer from lifecycle state '{slot.lifecycle_state}'.") - async with AsyncExitStack() as stack: - for layer_name, layer in self.layers.items(): - await stack.enter_async_context(layer.enter(session.layer_controls[layer_name])) - yield session + async def _exit_slots_reversed(self, slots: Sequence[LayerRunSlot]) -> BaseException | None: + hook_error: BaseException | None = None + for slot in reversed(slots): + if slot.lifecycle_state is not LifecycleState.ACTIVE: + continue + if slot.exit_intent is ExitIntent.SUSPEND: + try: + await slot.layer.on_context_suspend() + except BaseException as exc: + hook_error = hook_error or exc + finally: + slot.lifecycle_state = LifecycleState.SUSPENDED + else: + try: + await slot.layer.on_context_delete() + except BaseException as exc: + hook_error = hook_error or exc + finally: + slot.lifecycle_state = LifecycleState.CLOSED - def _validate_session(self, session: CompositorSession) -> None: - expected_layer_names = tuple(self.layers) - actual_layer_names = tuple(session.layer_controls) - if actual_layer_names != expected_layer_names: - expected = ", ".join(expected_layer_names) - actual = ", ".join(actual_layer_names) - raise ValueError( - "CompositorSession layer names must match compositor layers in order. " - f"Expected [{expected}], got [{actual}]." - ) - for layer_name, layer in self.layers.items(): - control = session.layer_controls[layer_name] - if not isinstance(control.runtime_state, layer.runtime_state_type): - raise TypeError( - f"CompositorSession layer '{layer_name}' runtime_state must be " - f"{layer.runtime_state_type.__name__}, got {type(control.runtime_state).__name__}." - ) - if not isinstance(control.runtime_handles, layer.runtime_handles_type): - raise TypeError( - f"CompositorSession layer '{layer_name}' runtime_handles must be " - f"{layer.runtime_handles_type.__name__}, got {type(control.runtime_handles).__name__}." - ) + return hook_error - def _ensure_session_can_enter(self, session: CompositorSession) -> None: - """Reject active or closed layer controls before any layer side effects.""" - for control in session.layer_controls.values(): - if control.state is LifecycleState.ACTIVE: - raise RuntimeError( - "LayerControl is already active; duplicate or nested enter is not allowed." - ) - if control.state is LifecycleState.CLOSED: - raise RuntimeError( - "LayerControl is closed; create a new compositor session before entering again." - ) + def _set_layer_exit_intent(self, name: str, intent: ExitIntent) -> None: + try: + slot = self.slots[name] + except KeyError as e: + raise KeyError(f"Layer '{name}' is not defined in this compositor run.") from e + if slot.lifecycle_state is not LifecycleState.ACTIVE: + raise RuntimeError("Layer exit intent can only be changed while the run slot is active.") + slot.exit_intent = intent + + def _ensure_layers_can_enter(self) -> None: + """Reject invalid external lifecycle states before any layer side effects.""" + for name, slot in self.slots.items(): + if slot.lifecycle_state is LifecycleState.ACTIVE: + raise RuntimeError(f"Layer '{name}' is already active; ACTIVE snapshots are not allowed.") + if slot.lifecycle_state is LifecycleState.CLOSED: + raise RuntimeError(f"Layer '{name}' is closed; CLOSED snapshots cannot be entered.") @property def prompts(self) -> list[PromptT]: result: list[LayerPromptT] = [] - for layer in self.layers.values(): - result.extend( - cast(LayerPromptT, layer.wrap_prompt(prompt)) - for prompt in layer.prefix_prompts - ) - for layer in reversed(self.layers.values()): - result.extend( - cast(LayerPromptT, layer.wrap_prompt(prompt)) - for prompt in layer.suffix_prompts - ) + for slot in self.slots.values(): + layer = slot.layer + result.extend(cast(LayerPromptT, layer.wrap_prompt(prompt)) for prompt in layer.prefix_prompts) + for slot in reversed(self.slots.values()): + layer = slot.layer + result.extend(cast(LayerPromptT, layer.wrap_prompt(prompt)) for prompt in layer.suffix_prompts) if self.prompt_transformer is None: return cast(list[PromptT], result) return list(self.prompt_transformer(result)) @@ -691,11 +474,9 @@ class Compositor(Generic[PromptT, ToolT, LayerPromptT, LayerToolT, UserPromptT, @property def user_prompts(self) -> list[UserPromptT]: result: list[LayerUserPromptT] = [] - for layer in self.layers.values(): - result.extend( - cast(LayerUserPromptT, layer.wrap_user_prompt(prompt)) - for prompt in layer.user_prompts - ) + for slot in self.slots.values(): + layer = slot.layer + result.extend(cast(LayerUserPromptT, layer.wrap_user_prompt(prompt)) for prompt in layer.user_prompts) if self.user_prompt_transformer is None: return cast(list[UserPromptT], result) return list(self.user_prompt_transformer(result)) @@ -703,25 +484,271 @@ class Compositor(Generic[PromptT, ToolT, LayerPromptT, LayerToolT, UserPromptT, @property def tools(self) -> list[ToolT]: result: list[LayerToolT] = [] - for layer in self.layers.values(): + for slot in self.slots.values(): + layer = slot.layer result.extend(cast(LayerToolT, layer.wrap_tool(tool)) for tool in layer.tools) if self.tool_transformer is None: return cast(list[ToolT], result) return list(self.tool_transformer(result)) +class Compositor(Generic[PromptT, ToolT, LayerPromptT, LayerToolT, UserPromptT, LayerUserPromptT]): + """Reusable, framework-neutral ordered layer graph plan. + + A compositor stores only immutable graph nodes and provider construction + plans. It is safe to enter repeatedly or concurrently because every entry + creates a separate ``CompositorRun`` with fresh layer instances, run slots, + dependency bindings, and optional hydrated runtime state. Session continuity + is explicit: pass the previous ``CompositorSessionSnapshot`` to the next + ``enter`` call and read the next one from ``run.session_snapshot`` after + exit. + + ``prompt_transformer``, ``user_prompt_transformer``, and + ``tool_transformer`` are post-aggregation hooks on each run. Use two type + arguments for identity aggregation, four when prompt/tool layer item types + differ from exposed item types, or all six when user prompt item types also + differ. + """ + + __slots__ = ("_nodes", "prompt_transformer", "tool_transformer", "user_prompt_transformer") + + _nodes: tuple[LayerNode, ...] + prompt_transformer: CompositorTransformer[LayerPromptT, PromptT] | None + user_prompt_transformer: CompositorTransformer[LayerUserPromptT, UserPromptT] | None + tool_transformer: CompositorTransformer[LayerToolT, ToolT] | None + + def __init__( + self, + nodes: Sequence[LayerNode], + *, + prompt_transformer: CompositorTransformer[LayerPromptT, PromptT] | None = None, + user_prompt_transformer: CompositorTransformer[LayerUserPromptT, UserPromptT] | None = None, + tool_transformer: CompositorTransformer[LayerToolT, ToolT] | None = None, + ) -> None: + self._nodes = tuple(nodes) + self.prompt_transformer = prompt_transformer + self.user_prompt_transformer = user_prompt_transformer + self.tool_transformer = tool_transformer + self._validate_nodes() + + @property + def nodes(self) -> tuple[LayerNode, ...]: + """Return the stateless graph plan nodes in compositor order.""" + return self._nodes + + @classmethod + def from_config( + cls, + conf: CompositorConfigValue, + *, + providers: Sequence[LayerProviderInput], + node_providers: Mapping[str, LayerProviderInput] | None = None, + prompt_transformer: CompositorTransformer[LayerPromptT, PromptT] | None = None, + user_prompt_transformer: CompositorTransformer[LayerUserPromptT, UserPromptT] | None = None, + tool_transformer: CompositorTransformer[LayerToolT, ToolT] | None = None, + ) -> "Compositor[PromptT, ToolT, LayerPromptT, LayerToolT, UserPromptT, LayerUserPromptT]": + """Create a reusable compositor plan from serializable graph config. + + ``providers`` resolve graph node ``type`` ids. ``node_providers`` are + keyed by graph node name and take precedence over the type-id provider, + allowing node-specific construction without passing node data to factory + callables. + """ + graph_config = _validate_compositor_config_input(conf) + if graph_config.schema_version != 1: + raise ValueError(f"Unsupported compositor config schema_version: {graph_config.schema_version}.") + + provider_by_type = _build_provider_type_map(providers) + provider_by_node = {name: _as_layer_provider(provider) for name, provider in (node_providers or {}).items()} + graph_node_names = {node.name for node in graph_config.layers} + unknown_node_providers = provider_by_node.keys() - graph_node_names + if unknown_node_providers: + names = ", ".join(sorted(unknown_node_providers)) + raise ValueError(f"node_providers contains unknown layer node names: {names}.") + + nodes: list[LayerNode] = [] + for node_config in graph_config.layers: + provider = provider_by_node.get(node_config.name) + if provider is None: + try: + provider = provider_by_type[node_config.type] + except KeyError as e: + raise KeyError(f"Layer type id '{node_config.type}' is not registered.") from e + nodes.append( + LayerNode( + node_config.name, + provider, + deps=node_config.deps, + metadata=node_config.metadata, + ) + ) + + return cls( + nodes, + prompt_transformer=prompt_transformer, + user_prompt_transformer=user_prompt_transformer, + tool_transformer=tool_transformer, + ) + + @asynccontextmanager + async def enter( + self, + *, + configs: Mapping[str, LayerConfigInput] | None = None, + session_snapshot: CompositorSessionSnapshotValue | None = None, + ) -> AsyncIterator[CompositorRun[PromptT, ToolT, LayerPromptT, LayerToolT, UserPromptT, LayerUserPromptT]]: + """Create a fresh run, enter layers in graph order, and yield it. + + Configs are keyed by layer node name and validated before factories run. + The optional session snapshot is validated and hydrated before any hook + runs. Layers exit in reverse graph order, and ``run.session_snapshot`` is + populated after exit with the next non-active lifecycle states. + """ + run = self._create_run(configs=configs, session_snapshot=session_snapshot) + await run._enter_layers() + try: + yield run + finally: + await run._exit_layers() + + def _create_run( + self, + *, + configs: Mapping[str, LayerConfigInput] | None, + session_snapshot: CompositorSessionSnapshotValue | None, + ) -> CompositorRun[PromptT, ToolT, LayerPromptT, LayerToolT, UserPromptT, LayerUserPromptT]: + config_by_name = self._validate_run_configs(configs) + typed_config_by_name = self._validate_layer_configs(config_by_name) + snapshot = self._validate_session_snapshot(session_snapshot) if session_snapshot is not None else None + layer_by_name = self._create_layers(typed_config_by_name) + + snapshot_by_name = {layer_snapshot.name: layer_snapshot for layer_snapshot in snapshot.layers} if snapshot else {} + lifecycle_by_name: dict[str, LifecycleState] = {} + for node in self._nodes: + layer = layer_by_name[node.name] + layer_snapshot = snapshot_by_name.get(node.name) + if layer_snapshot is None: + lifecycle_by_name[node.name] = LifecycleState.NEW + continue + layer.runtime_state = cast(Any, layer.runtime_state_type.model_validate(layer_snapshot.runtime_state)) + lifecycle_by_name[node.name] = layer_snapshot.lifecycle_state + + self._bind_deps(layer_by_name) + return CompositorRun( + slots=OrderedDict( + (node.name, LayerRunSlot(layer=layer_by_name[node.name], lifecycle_state=lifecycle_by_name[node.name])) + for node in self._nodes + ), + prompt_transformer=self.prompt_transformer, + user_prompt_transformer=self.user_prompt_transformer, + tool_transformer=self.tool_transformer, + ) + + def _create_layers( + self, + config_by_name: Mapping[str, LayerConfig], + ) -> OrderedDict[str, Layer[Any, Any, Any, Any, Any, Any]]: + return OrderedDict( + (node.name, node.provider.create_layer_from_config(config_by_name[node.name])) + for node in self._nodes + ) + + def _validate_layer_configs(self, config_by_name: Mapping[str, LayerConfigInput]) -> dict[str, LayerConfig]: + """Validate every node config before any provider factory is invoked.""" + return { + node.name: node.provider.validate_config(config_by_name.get(node.name)) + for node in self._nodes + } + + def _bind_deps(self, layer_by_name: Mapping[str, Layer[Any, Any, Any, Any, Any, Any]]) -> None: + """Resolve dependency-name mappings and bind direct layer dependencies.""" + for node in self._nodes: + layer = layer_by_name[node.name] + resolved_deps = {dep_name: layer_by_name[target_name] for dep_name, target_name in node.deps.items()} + layer.bind_deps(resolved_deps) + + def _validate_nodes(self) -> None: + layer_names: set[str] = set() + for node in self._nodes: + if node.name in layer_names: + raise ValueError(f"Duplicate layer name '{node.name}'.") + layer_names.add(node.name) + + for node in self._nodes: + declared_deps = node.provider.layer_type.dependency_names() + unknown_dep_keys = set(node.deps) - declared_deps + if unknown_dep_keys: + names = ", ".join(sorted(unknown_dep_keys)) + raise ValueError(f"Layer '{node.name}' declares unknown dependency keys: {names}.") + missing_targets = set(node.deps.values()) - layer_names + if missing_targets: + names = ", ".join(sorted(missing_targets)) + raise ValueError(f"Layer '{node.name}' depends on undefined layer names: {names}.") + + def _validate_run_configs(self, configs: Mapping[str, LayerConfigInput] | None) -> dict[str, LayerConfigInput]: + config_by_name = dict(configs or {}) + known_names = {node.name for node in self._nodes} + unknown_names = config_by_name.keys() - known_names + if unknown_names: + names = ", ".join(sorted(unknown_names)) + raise ValueError(f"Layer configs contain unknown layer node names: {names}.") + return config_by_name + + def _validate_session_snapshot( + self, + snapshot: CompositorSessionSnapshotValue, + ) -> CompositorSessionSnapshot: + resolved_snapshot = _validate_config_model_input(CompositorSessionSnapshot, snapshot) + if resolved_snapshot.schema_version != 1: + raise ValueError( + f"Unsupported compositor session snapshot schema_version: {resolved_snapshot.schema_version}." + ) + expected_layer_names = tuple(node.name for node in self._nodes) + actual_layer_names = tuple(layer.name for layer in resolved_snapshot.layers) + if actual_layer_names != expected_layer_names: + expected = ", ".join(expected_layer_names) + actual = ", ".join(actual_layer_names) + raise ValueError( + "CompositorSessionSnapshot layer names must match compositor layers in order. " + f"Expected [{expected}], got [{actual}]." + ) + return resolved_snapshot + + +def _as_layer_provider(implementation: LayerProviderInput) -> LayerProvider[Any]: + if isinstance(implementation, LayerProvider): + return implementation + if isinstance(implementation, type) and issubclass(implementation, Layer): + return LayerProvider.from_layer_type(implementation) + raise TypeError("LayerNode implementation must be a Layer subclass or LayerProvider.") + + +def _build_provider_type_map(providers: Sequence[LayerProviderInput]) -> dict[str, LayerProvider[Any]]: + provider_by_type: dict[str, LayerProvider[Any]] = {} + for provider_input in providers: + provider = _as_layer_provider(provider_input) + type_id = provider.type_id + if type_id is None or not type_id: + raise ValueError(f"Layer provider for '{provider.layer_type.__qualname__}' must declare a type_id.") + if type_id in provider_by_type: + raise ValueError(f"Layer type id '{type_id}' is already registered.") + provider_by_type[type_id] = provider + return provider_by_type + + __all__ = [ "Compositor", - "CompositorBuilder", "CompositorConfig", "CompositorConfigValue", + "CompositorRun", "CompositorSessionSnapshot", - "CompositorSession", + "CompositorSessionSnapshotValue", "CompositorTransformer", "CompositorTransformerKwargs", - "LayerDescriptor", "LayerFactory", + "LayerNode", "LayerNodeConfig", - "LayerRegistry", + "LayerProvider", + "LayerRunSlot", "LayerSessionSnapshot", ] diff --git a/dify-agent/src/agenton/layers/__init__.py b/dify-agent/src/agenton/layers/__init__.py index d86695f17f..bc02a13f0c 100644 --- a/dify-agent/src/agenton/layers/__init__.py +++ b/dify-agent/src/agenton/layers/__init__.py @@ -7,13 +7,11 @@ families while keeping concrete reusable layers in ``agenton_collections``. from agenton.layers.base import ( EmptyLayerConfig, - EmptyRuntimeHandles, EmptyRuntimeState, ExitIntent, Layer, LayerConfig, LayerConfigValue, - LayerControl, LayerDeps, LifecycleState, NoLayerDeps, @@ -46,12 +44,10 @@ __all__ = [ "LayerConfig", "LayerConfigValue", "LayerDeps", - "LayerControl", "LifecycleState", "ExitIntent", "EmptyLayerConfig", "EmptyRuntimeState", - "EmptyRuntimeHandles", "NoLayerDeps", "PlainLayer", "PlainPrompt", diff --git a/dify-agent/src/agenton/layers/base.py b/dify-agent/src/agenton/layers/base.py index 7ec761636c..591c24b4e5 100644 --- a/dify-agent/src/agenton/layers/base.py +++ b/dify-agent/src/agenton/layers/base.py @@ -1,37 +1,41 @@ -"""Core layer abstractions and typed dependency binding. +"""Invocation-scoped core layer abstractions and typed dependency binding. -Layers declare their dependency shape with ``Layer[DepsT, PromptT, ToolT, ...]``. +Agenton core deliberately manages only three concerns: stateless layer graph +composition, serializable ``runtime_state`` lifecycle, and session snapshots. It +does not own live resources, process handles, HTTP clients, cleanup stacks, or +any other non-serializable runtime object. Those belong to application layers or +integration code outside the core. + +Layers declare their dependency shape with +``Layer[DepsT, PromptT, UserPromptT, ToolT, ConfigT, RuntimeStateT]``. ``DepsT`` must be a ``LayerDeps`` subclass whose annotated members are concrete -``Layer`` subclasses or modern optional dependencies such as ``SomeLayer | -None``. The optional trailing generic slots declare Pydantic schemas for config, -serializable runtime state, and live runtime handles. The base class infers -``deps_type`` and schema class attributes from the generic base when possible, -while still allowing subclasses to set them explicitly for unusual inheritance -patterns. +``Layer`` subclasses or modern optional dependencies such as ``SomeLayer | None``. +Dependencies are direct layer instance relationships bound onto ``self.deps`` +for one compositor invocation; there is no dependency-control lookup API in the +core. -``LayerConfig`` is the DTO base for config schemas that can be embedded directly -in serializable compositor config. Runtime state and handle schemas remain plain -Pydantic models because they are not accepted as graph input. +``LayerConfig`` is the DTO base for config schemas accepted by layer providers. +The provider validates raw node-name keyed configs with a layer's +``config_type`` before constructing the layer and assigning ``self.config``. +``runtime_state_type`` is the only mutable schema managed by Agenton and the only +per-layer data included in session snapshots. The base class infers +``deps_type``, ``config_type``, and ``runtime_state_type`` from generic bases +when possible, while still allowing subclasses to set them explicitly for +unusual inheritance patterns. -``Layer.bind_deps`` is the mutation point for dependency state. Layer -implementations should treat ``self.deps`` as unavailable until a compositor or -caller has resolved and bound dependencies. When a layer needs a dependency's -session-local state or handles, use the current ``LayerControl.control_for`` API -instead of storing dependency controls on layer instances. +``Layer`` is an invocation-scoped business object. It owns ``config``, direct +``deps``, and serializable ``runtime_state`` plus prompt/tool authoring surfaces, +but it does not own lifecycle state, exit intent, graph owner tokens, entry +stacks, resources, or cleanup callbacks. ``CompositorRun`` owns lifecycle state +and exit intent for one entry. ``SessionSnapshot`` objects are the only supported +cross-call state carrier. -Layer async entry uses a caller-provided ``LayerControl`` as an explicit state -machine and per-session runtime owner. A fresh control starts in -``LifecycleState.NEW`` and enters create logic. A suspended control resumes, -while active or closed controls are rejected to prevent ambiguous nested or -post-delete reuse. Exit behavior is selected per entry with ``ExitIntent`` and -resets to delete on every successful enter. Layer instances are shared graph and -capability definitions, so session-local serializable ids, checkpoints, and -other snapshot data belong in ``LayerControl.runtime_state``; live clients, -connections, and process handles belong in ``LayerControl.runtime_handles``. -Neither category should be stored on ``self`` when it is session-local. Live -resources that need deterministic cleanup should be registered on the control's -per-entry resource stack; the base lifecycle closes that stack after suspend and -delete hooks, and also when create/resume fails. +Lifecycle hooks are no-argument business hooks on the layer instance: +``on_context_create/resume/suspend/delete(self)``. They should read dependencies +from ``self.deps`` and read or mutate serializable invocation state through +``self.runtime_state``. Resource acquisition and deterministic cleanup should be +handled outside Agenton core, for example by integration-specific context +managers that wrap compositor entry. ``Layer`` is framework-neutral over system prompt, user prompt, and tool item types. The native ``prefix_prompts``, ``suffix_prompts``, ``user_prompts``, and @@ -42,24 +46,19 @@ native values without changing layer implementations. """ from abc import ABC, abstractmethod -from collections.abc import AsyncIterator, Awaitable, Callable -from contextlib import AbstractAsyncContextManager, AsyncExitStack, asynccontextmanager -from dataclasses import dataclass, field +from collections.abc import Mapping, Sequence +from dataclasses import dataclass from enum import StrEnum from types import UnionType from typing import ( Any, ClassVar, Generic, - Mapping, - Protocol, - Sequence, Union, cast, get_args, get_origin, get_type_hints, - overload, ) from pydantic import BaseModel, ConfigDict, JsonValue, SerializeAsAny @@ -75,10 +74,10 @@ _ToolT = TypeVar("_ToolT") class LayerConfig(BaseModel): """Base DTO for serializable layer configuration. - Subclasses are safe to place in ``LayerNodeConfig.config``. The compositor - still accepts plain JSON values for wire input, but typed Python call sites can - use concrete ``LayerConfig`` subclasses and preserve their fields during JSON - serialization. + Layer providers validate raw config values with concrete ``LayerConfig`` + subclasses before constructing a layer for one invocation. Serializable + compositor graph config references layer type ids and node metadata only; + per-call config travels through ``Compositor.enter(configs=...)``. """ model_config = ConfigDict(extra="forbid") @@ -89,34 +88,17 @@ type LayerConfigValue = JsonValue | SerializeAsAny[LayerConfig] _ConfigT = TypeVar("_ConfigT", bound=LayerConfig, default="EmptyLayerConfig") _RuntimeStateT = TypeVar("_RuntimeStateT", bound=BaseModel, default="EmptyRuntimeState") -_RuntimeHandlesT = TypeVar("_RuntimeHandlesT", bound=BaseModel, default="EmptyRuntimeHandles") -_DepRuntimeStateT = TypeVar("_DepRuntimeStateT", bound=BaseModel) -_DepRuntimeHandlesT = TypeVar("_DepRuntimeHandlesT", bound=BaseModel) -_ResourceT = TypeVar("_ResourceT") - - -class _LayerControlOwnerSession(Protocol): - """Private structural API used by controls to resolve dependency controls.""" - - def _control_for_dependency( - self, - owner_layer_id: str, - dep_name: str | None, - dep_layer: "Layer[Any, Any, Any, Any, Any, _DepRuntimeStateT, _DepRuntimeHandlesT]", - ) -> "LayerControl[_DepRuntimeStateT, _DepRuntimeHandlesT]": ... - - def _layer_for_control_owner(self, owner_layer_id: str) -> "Layer[Any, Any, Any, Any, Any, Any, Any]": ... class LayerDeps: - """Typed dependency container for a Layer. + """Typed dependency container for a layer. Subclasses declare dependency members with annotations. Every annotated member must be a Layer subclass or ``LayerSubclass | None``. Optional deps are always assigned as attributes; missing optional values become ``None``. """ - def __init__(self, **deps: "Layer[Any, Any, Any, Any, Any, Any, Any] | None") -> None: + def __init__(self, **deps: "Layer[Any, Any, Any, Any, Any, Any] | None") -> None: dep_specs = _get_dep_specs(type(self)) missing_names = {name for name, spec in dep_specs.items() if not spec.optional} - deps.keys() if missing_names: @@ -155,23 +137,17 @@ class EmptyLayerConfig(LayerConfig): class EmptyRuntimeState(BaseModel): - """Default serializable per-session runtime state schema.""" + """Default serializable invocation runtime state schema.""" model_config = ConfigDict(extra="forbid", validate_assignment=True) -class EmptyRuntimeHandles(BaseModel): - """Default live per-session runtime handle schema. - - Handles may contain arbitrary Python objects and are intentionally excluded - from session snapshots. - """ - - model_config = ConfigDict(extra="forbid", validate_assignment=True, arbitrary_types_allowed=True) - - class LifecycleState(StrEnum): - """Externally observable lifecycle state for a layer control.""" + """Lifecycle state for one run slot. + + ``ACTIVE`` is internal-only. It is used while an invocation is running and + must never appear in external session snapshots or hydrated input. + """ NEW = "new" ACTIVE = "active" @@ -180,165 +156,53 @@ class LifecycleState(StrEnum): class ExitIntent(StrEnum): - """Per-entry exit behavior requested for a layer control.""" + """Run-slot exit behavior requested during active invocation.""" DELETE = "delete" SUSPEND = "suspend" -@dataclass(slots=True) -class LayerControl(Generic[_RuntimeStateT, _RuntimeHandlesT]): - """Stateful control slot passed into a layer entry context. - - ``Layer.enter`` requires the caller to provide this object. The control owns - the layer lifecycle state, the current entry's exit intent, and arbitrary - per-session runtime state and live handles. Call ``suspend_on_exit`` before leaving the - context to make a later entry resume; call ``delete_on_exit`` or do nothing - for the default delete behavior. Store session-local serializable ids, - checkpoints, and other snapshot data in ``runtime_state``. Store live - clients, connections, process handles, and other non-serializable objects in - ``runtime_handles``. Do not put either kind of session-local data on the - shared layer instance. - - ``runtime_state`` intentionally persists after suspend and delete. Suspend, - resume, and delete hooks can inspect the same values created on entry, and - callers may inspect closed-session diagnostics after exit. Reuse is still - governed by ``state``: a closed control cannot be entered again. Runtime - handles are not serialized in snapshots and should be rehydrated from - runtime state in resume hooks. A compositor also binds private owner metadata - so ``control_for`` can find controls for this layer's dependencies in the - same session; those links are runtime-only and not part of snapshots. The - per-entry resource stack is also runtime-only and exists only while the layer - is entering, active, or exiting. - """ - - state: LifecycleState = LifecycleState.NEW - exit_intent: ExitIntent = ExitIntent.DELETE - runtime_state: _RuntimeStateT = field(default_factory=lambda: cast(_RuntimeStateT, EmptyRuntimeState())) - runtime_handles: _RuntimeHandlesT = field(default_factory=lambda: cast(_RuntimeHandlesT, EmptyRuntimeHandles())) - _owner_session: _LayerControlOwnerSession | None = field(default=None, init=False, repr=False, compare=False) - _owner_layer_id: str | None = field(default=None, init=False, repr=False, compare=False) - _entry_stack: AsyncExitStack | None = field(default=None, init=False, repr=False, compare=False) - - def suspend_on_exit(self) -> None: - """Request suspend behavior when the current layer entry exits.""" - self.exit_intent = ExitIntent.SUSPEND - - def delete_on_exit(self) -> None: - """Request delete behavior when the current layer entry exits.""" - self.exit_intent = ExitIntent.DELETE - - async def enter_async_resource(self, cm: AbstractAsyncContextManager[_ResourceT]) -> _ResourceT: - """Enter ``cm`` on this control's current entry resource stack. - - Resource registration is available only while a layer entry is in - progress or active. The base lifecycle closes registered resources after - suspend/delete hooks and when create/resume fails. - """ - return await self._require_entry_stack().enter_async_context(cm) - - def add_async_cleanup(self, callback: Callable[[], Awaitable[None]]) -> None: - """Register an async cleanup callback on the current entry resource stack.""" - self._require_entry_stack().push_async_callback(callback) - - @overload - def control_for( - self, - dep_layer: "Layer[Any, Any, Any, Any, Any, _DepRuntimeStateT, _DepRuntimeHandlesT]", - /, - ) -> "LayerControl[_DepRuntimeStateT, _DepRuntimeHandlesT]": ... - - @overload - def control_for( - self, - dep_name: str, - dep_layer: "Layer[Any, Any, Any, Any, Any, _DepRuntimeStateT, _DepRuntimeHandlesT]", - /, - ) -> "LayerControl[_DepRuntimeStateT, _DepRuntimeHandlesT]": ... - - def control_for( - self, - dep_name_or_layer: "str | Layer[Any, Any, Any, Any, Any, _DepRuntimeStateT, _DepRuntimeHandlesT]", - dep_layer: "Layer[Any, Any, Any, Any, Any, _DepRuntimeStateT, _DepRuntimeHandlesT] | None" = None, - /, - ) -> "LayerControl[_DepRuntimeStateT, _DepRuntimeHandlesT]": - """Return the current session control for one resolved dependency. - - ``control_for(dep_layer)`` is for the common case where exactly one - resolved dependency target of this control's owner layer is ``dep_layer``. - Use ``control_for(dep_name, dep_layer)`` when multiple dependency fields - can point at the same layer instance or when the name makes the lookup - clearer. Optional dependencies that resolved to ``None`` have no control - and raise ``KeyError`` when requested. - """ - if isinstance(dep_name_or_layer, str): - if dep_layer is None: - raise TypeError("LayerControl.control_for(dep_name, dep_layer) requires dep_layer.") - dep_name = dep_name_or_layer - resolved_dep_layer = dep_layer - else: - if dep_layer is not None: - raise TypeError("LayerControl.control_for accepts either (dep_layer) or (dep_name, dep_layer).") - dep_name = None - resolved_dep_layer = dep_name_or_layer - - if self._owner_session is None or self._owner_layer_id is None: - raise RuntimeError("LayerControl is not attached to a compositor session.") - - return self._owner_session._control_for_dependency(self._owner_layer_id, dep_name, resolved_dep_layer) - - def _bind_owner(self, session: _LayerControlOwnerSession, layer_id: str) -> None: - """Attach runtime owner metadata used by ``control_for``.""" - self._owner_session = session - self._owner_layer_id = layer_id - - def _require_entry_stack(self) -> AsyncExitStack: - if self._entry_stack is None: - raise RuntimeError("LayerControl entry resource stack is not active.") - return self._entry_stack - - def _begin_entry_stack(self) -> None: - if self._entry_stack is not None: - raise RuntimeError("LayerControl entry resource stack is already active.") - self._entry_stack = AsyncExitStack() - - async def _close_entry_stack(self) -> None: - stack = self._entry_stack - self._entry_stack = None - if stack is not None: - await stack.aclose() - - @dataclass(frozen=True, slots=True) class LayerDepSpec: """Runtime dependency specification derived from a deps annotation.""" - layer_type: type["Layer[Any, Any, Any, Any, Any, Any, Any]"] + layer_type: type["Layer[Any, Any, Any, Any, Any, Any]"] optional: bool = False class Layer( ABC, - Generic[_DepsT, _PromptT, _UserPromptT, _ToolT, _ConfigT, _RuntimeStateT, _RuntimeHandlesT], + Generic[_DepsT, _PromptT, _UserPromptT, _ToolT, _ConfigT, _RuntimeStateT], ): """Framework-neutral base class for prompt/tool layers. - Subclasses expose optional prompt fragments and tools through typed - properties. They declare required dependencies in the ``DepsT`` container - rather than by accepting dependencies in ``__init__``. Layer instances can be - entered by multiple sessions, including concurrently, so lifecycle hooks - should store session-local runtime values on the passed ``LayerControl``. - The default async context manager handles create, resume, suspend, and - delete transitions; layers can override ``enter`` when they need to wrap - extra runtime resources. + A layer instance is invocation-scoped mutable business state, not a reusable + cross-session definition. ``CompositorRun`` creates fresh instances through + layer providers, assigns validated ``config``, binds direct dependency layer + instances to ``deps``, hydrates ``runtime_state`` from an optional session + snapshot, and then runs no-argument lifecycle hooks. The run owns lifecycle + state and exit intent; layers never expose a public entry context manager. + + Live resources and handles are intentionally outside this abstraction. Only + ``runtime_state`` is managed and snapshotted by Agenton core. Lifecycle hooks + should operate on ``self`` and keep any non-serializable cleanup policy in + integration code that wraps the compositor. """ deps_type: type[_DepsT] + config: _ConfigT deps: _DepsT + runtime_state: _RuntimeStateT type_id: ClassVar[str | None] = None config_type: ClassVar[type[LayerConfig]] = EmptyLayerConfig runtime_state_type: ClassVar[type[BaseModel]] = EmptyRuntimeState - runtime_handles_type: ClassVar[type[BaseModel]] = EmptyRuntimeHandles + + def __new__(cls, *args: object, **kwargs: object) -> Self: + instance = cast(Self, super().__new__(cls)) + runtime_state_type = getattr(cls, "runtime_state_type", None) + if isinstance(runtime_state_type, type) and issubclass(runtime_state_type, BaseModel): + instance.runtime_state = cast(Any, runtime_state_type.model_validate({})) + return instance def __init_subclass__(cls) -> None: super().__init_subclass__() @@ -362,56 +226,38 @@ class Layer( _infer_schema_type(cls, 5, "runtime_state_type"), EmptyRuntimeState, ) - _init_schema_type( - cls, - "runtime_handles_type", - _infer_schema_type(cls, 6, "runtime_handles_type"), - EmptyRuntimeHandles, - ) @classmethod def from_config(cls: type[Self], config: _ConfigT) -> Self: """Create a layer from schema-validated serialized config. - Registries/builders validate raw config with ``config_type`` before - calling this method. Layers are not config-constructible by default. - Subclasses that accept config should override this method and consume - the typed Pydantic model for their schema. + ``LayerProvider.from_layer_type`` validates raw config with + ``config_type`` before calling this method. Layers without config use the + default no-argument construction path. Layers with a concrete config + schema should override this method and consume the typed Pydantic model. """ - raise TypeError(f"{cls.__name__} cannot be created from config.") + if cls.config_type is not EmptyLayerConfig: + raise TypeError(f"{cls.__name__} cannot be created from config; override from_config or use a provider.") + EmptyLayerConfig.model_validate(config) + try: + return cast(Self, cls()) + except TypeError as e: + raise TypeError(f"{cls.__name__} cannot be created from empty config; use a custom provider.") from e @classmethod def dependency_names(cls) -> frozenset[str]: """Return dependency field names declared by this layer's deps schema.""" return frozenset(_get_dep_specs(cls.deps_type)) - def new_control( - self, - *, - state: LifecycleState = LifecycleState.NEW, - runtime_state: object | None = None, - ) -> LayerControl[_RuntimeStateT, _RuntimeHandlesT]: - """Create a schema-validated per-session control for this layer. - - ``runtime_state`` is validated through ``runtime_state_type`` and live - handles are always initialized empty through ``runtime_handles_type``. - """ - raw_runtime_state = {} if runtime_state is None else runtime_state - return LayerControl( - state=state, - exit_intent=ExitIntent.DELETE, - runtime_state=cast(_RuntimeStateT, self.runtime_state_type.model_validate(raw_runtime_state)), - runtime_handles=cast(_RuntimeHandlesT, self.runtime_handles_type.model_validate({})), - ) - - def bind_deps(self, deps: Mapping[str, "Layer[Any, Any, Any, Any, Any, Any, Any] | None"]) -> None: + def bind_deps(self, deps: Mapping[str, "Layer[Any, Any, Any, Any, Any, Any] | None"]) -> None: """Bind this layer's declared dependencies from a name-to-layer mapping. The mapping may include more layers than the declared dependency fields. Only names declared by ``deps_type`` are selected and validated. Missing - optional deps are bound as ``None``. + optional deps are bound as ``None``. Bound values are direct layer + instances for this invocation graph. """ - resolved_deps: dict[str, Layer[Any, Any, Any, Any, Any, Any, Any] | None] = {} + resolved_deps: dict[str, Layer[Any, Any, Any, Any, Any, Any] | None] = {} for name, spec in _get_dep_specs(self.deps_type).items(): if name not in deps: if spec.optional: @@ -423,127 +269,17 @@ class Layer( resolved_deps[name] = deps[name] self.deps = self.deps_type(**resolved_deps) - def require_control( - self, - control: LayerControl[Any, Any], - *, - active: bool = False, - ) -> LayerControl[_RuntimeStateT, _RuntimeHandlesT]: - """Validate and return ``control`` as this layer's current session control. + async def on_context_create(self) -> None: + """Run when the run slot enters from ``LifecycleState.NEW``.""" - Capability methods should accept their own layer control explicitly and - call this helper before reading runtime state or handles. The control must - be attached to a compositor session whose owner layer id resolves to this - exact layer instance. Runtime state and handle schemas are checked against - the layer's declared schema types; when ``active`` is true, the lifecycle - state must be ``LifecycleState.ACTIVE``. - """ - if control._owner_session is None or control._owner_layer_id is None: - raise RuntimeError("LayerControl is not attached to a compositor session.") - try: - owner_layer = control._owner_session._layer_for_control_owner(control._owner_layer_id) - except KeyError as e: - raise RuntimeError( - f"LayerControl owner layer '{control._owner_layer_id}' is not defined in its compositor." - ) from e - if owner_layer is not self: - raise RuntimeError( - f"LayerControl belongs to layer '{control._owner_layer_id}', not this {type(self).__name__} instance." - ) - if not isinstance(control.runtime_state, self.runtime_state_type): - raise TypeError( - f"{type(self).__name__} control runtime_state must be {self.runtime_state_type.__name__}, " - f"got {type(control.runtime_state).__name__}." - ) - if not isinstance(control.runtime_handles, self.runtime_handles_type): - raise TypeError( - f"{type(self).__name__} control runtime_handles must be {self.runtime_handles_type.__name__}, " - f"got {type(control.runtime_handles).__name__}." - ) - if active and control.state is not LifecycleState.ACTIVE: - raise RuntimeError( - f"{type(self).__name__} requires an active LayerControl; current state is {control.state.value}." - ) - return cast(LayerControl[_RuntimeStateT, _RuntimeHandlesT], control) + async def on_context_delete(self) -> None: + """Run when the run slot exits with ``ExitIntent.DELETE``.""" - def enter(self, control: LayerControl[_RuntimeStateT, _RuntimeHandlesT]) -> AbstractAsyncContextManager[None]: - """Return the layer's async entry context manager. + async def on_context_suspend(self) -> None: + """Run when the run slot exits with ``ExitIntent.SUSPEND``.""" - ``control`` is the lifecycle control slot for this entry. Subclasses can - override this for unusual wrapping, but ordinary live resources should be - registered with ``control.enter_async_resource`` from lifecycle hooks. - """ - return self.lifecycle_enter(control) - - @asynccontextmanager - async def lifecycle_enter(self, control: LayerControl[_RuntimeStateT, _RuntimeHandlesT]) -> AsyncIterator[None]: - """Run the default explicit lifecycle and resource stack for one entry. - - Exit state is recorded even when suspend/delete hooks fail because the - active entry is over once hook cleanup begins. - """ - if control.state is LifecycleState.ACTIVE: - raise RuntimeError( - "LayerControl is already active; duplicate or nested enter is not allowed." - ) - if control.state is LifecycleState.CLOSED: - raise RuntimeError( - "LayerControl is closed; create a new compositor session before entering again." - ) - - control._begin_entry_stack() - try: - if control.state is LifecycleState.NEW: - control.exit_intent = ExitIntent.DELETE - await self.on_context_create(control) - control.state = LifecycleState.ACTIVE - elif control.state is LifecycleState.SUSPENDED: - control.exit_intent = ExitIntent.DELETE - await self.on_context_resume(control) - control.state = LifecycleState.ACTIVE - except BaseException: - await control._close_entry_stack() - raise - - try: - yield - finally: - hook_error: BaseException | None = None - if control.exit_intent is ExitIntent.SUSPEND: - try: - await self.on_context_suspend(control) - except BaseException as exc: - hook_error = exc - finally: - control.state = LifecycleState.SUSPENDED - else: - try: - await self.on_context_delete(control) - except BaseException as exc: - hook_error = exc - finally: - control.state = LifecycleState.CLOSED - - try: - await control._close_entry_stack() - except BaseException: - if hook_error is not None: - raise hook_error - raise - if hook_error is not None: - raise hook_error - - async def on_context_create(self, control: LayerControl[_RuntimeStateT, _RuntimeHandlesT]) -> None: - """Run when the layer context is entered from ``LifecycleState.NEW``.""" - - async def on_context_delete(self, control: LayerControl[_RuntimeStateT, _RuntimeHandlesT]) -> None: - """Run when the layer context exits with ``ExitIntent.DELETE``.""" - - async def on_context_suspend(self, control: LayerControl[_RuntimeStateT, _RuntimeHandlesT]) -> None: - """Run when the layer context exits with ``ExitIntent.SUSPEND``.""" - - async def on_context_resume(self, control: LayerControl[_RuntimeStateT, _RuntimeHandlesT]) -> None: - """Run when the layer context enters from ``LifecycleState.SUSPENDED``.""" + async def on_context_resume(self) -> None: + """Run when the run slot enters from ``LifecycleState.SUSPENDED``.""" @property def prefix_prompts(self) -> Sequence[_PromptT]: @@ -563,17 +299,17 @@ class Layer( @abstractmethod def wrap_prompt(self, prompt: _PromptT) -> object: - """Wrap a native prompt item for compositor aggregation.""" + """Wrap a native prompt item for run-level aggregation.""" raise NotImplementedError @abstractmethod def wrap_user_prompt(self, prompt: _UserPromptT) -> object: - """Wrap a native user prompt item for compositor aggregation.""" + """Wrap a native user prompt item for run-level aggregation.""" raise NotImplementedError @abstractmethod def wrap_tool(self, tool: _ToolT) -> object: - """Wrap a native tool item for compositor aggregation.""" + """Wrap a native tool item for run-level aggregation.""" raise NotImplementedError @@ -606,14 +342,14 @@ def _as_dep_spec(annotation: object) -> LayerDepSpec | None: return LayerDepSpec(layer_type=layer_type) -def _as_layer_type(annotation: object) -> type[Layer[Any, Any, Any, Any, Any, Any, Any]] | None: +def _as_layer_type(annotation: object) -> type[Layer[Any, Any, Any, Any, Any, Any]] | None: runtime_type = get_origin(annotation) or annotation if isinstance(runtime_type, type) and issubclass(runtime_type, Layer): - return cast(type[Layer[Any, Any, Any, Any, Any, Any, Any]], runtime_type) + return cast(type[Layer[Any, Any, Any, Any, Any, Any]], runtime_type) return None -def _infer_deps_type(layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]]) -> type[LayerDeps] | None: +def _infer_deps_type(layer_type: type[Layer[Any, Any, Any, Any, Any, Any]]) -> type[LayerDeps] | None: inferred = _infer_layer_generic_arg(layer_type, 0, {}) if inferred is None: return None @@ -621,7 +357,7 @@ def _infer_deps_type(layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]]) def _infer_schema_type( - layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]], + layer_type: type[Layer[Any, Any, Any, Any, Any, Any]], index: int, attr_name: str, ) -> type[BaseModel] | None: @@ -634,7 +370,7 @@ def _infer_schema_type( return schema_type -def _infer_config_type(layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]]) -> type[LayerConfig] | None: +def _infer_config_type(layer_type: type[Layer[Any, Any, Any, Any, Any, Any]]) -> type[LayerConfig] | None: inferred = _infer_schema_generic_arg(layer_type, "config_type", {}) or _infer_layer_generic_arg(layer_type, 4, {}) if inferred is None: return None @@ -645,7 +381,7 @@ def _infer_config_type(layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any] def _infer_schema_generic_arg( - layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]], + layer_type: type[Layer[Any, Any, Any, Any, Any, Any]], attr_name: str, substitutions: Mapping[object, object], ) -> object | None: @@ -653,7 +389,6 @@ def _infer_schema_generic_arg( expected_names = { "config_type": {"ConfigT", "_ConfigT"}, "runtime_state_type": {"RuntimeStateT", "_RuntimeStateT"}, - "runtime_handles_type": {"RuntimeHandlesT", "_RuntimeHandlesT"}, }[attr_name] for base in getattr(layer_type, "__orig_bases__", ()): origin = get_origin(base) or base @@ -675,7 +410,7 @@ def _infer_schema_generic_arg( def _infer_layer_generic_arg( - layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]], + layer_type: type[Layer[Any, Any, Any, Any, Any, Any]], index: int, substitutions: Mapping[object, object], ) -> object | None: @@ -704,7 +439,7 @@ def _infer_layer_generic_arg( def _init_schema_type( - layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]], + layer_type: type[Layer[Any, Any, Any, Any, Any, Any]], attr_name: str, inferred_schema_type: type[BaseModel] | None, default_schema_type: type[BaseModel], @@ -718,7 +453,7 @@ def _init_schema_type( def _init_config_type( - layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]], + layer_type: type[Layer[Any, Any, Any, Any, Any, Any]], inferred_config_type: type[LayerConfig] | None, ) -> None: config_type = layer_type.__dict__.get("config_type") @@ -784,7 +519,5 @@ def _as_config_type(value: object) -> type[LayerConfig] | None: return None -def _is_generic_layer_template(layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]]) -> bool: - return bool(getattr(layer_type, "__type_params__", ())) or bool( - getattr(layer_type, "__parameters__", ()) - ) +def _is_generic_layer_template(layer_type: type[Layer[Any, Any, Any, Any, Any, Any]]) -> bool: + return bool(getattr(layer_type, "__type_params__", ())) or bool(getattr(layer_type, "__parameters__", ())) diff --git a/dify-agent/src/agenton/layers/types.py b/dify-agent/src/agenton/layers/types.py index 2c9729d68b..e6133dcd55 100644 --- a/dify-agent/src/agenton/layers/types.py +++ b/dify-agent/src/agenton/layers/types.py @@ -4,10 +4,11 @@ that bind its system prompt, user prompt, and tool generic slots to concrete contracts, such as ordinary strings with plain callable tools or pydantic-ai prompt/tool shapes. The families keep the trailing schema generic slots open so -concrete layers can have ``config_type``, ``runtime_state_type``, and -``runtime_handles_type`` inferred from type arguments instead of repeated class -attributes. Config schemas use ``LayerConfig`` so they can also be embedded as -typed DTOs in serializable compositor config. +concrete layers can have ``config_type`` and ``runtime_state_type`` inferred from +type arguments instead of repeated class attributes. Config schemas use +``LayerConfig`` so they can also be embedded as +typed DTOs in serializable compositor config. Agenton core is state-only: +typed layer families do not expose runtime handle schemas or resource ownership. Tagged aggregate aliases cover code paths that can accept any supported prompt/tool family without changing the plain and pydantic-ai layer contracts. Pydantic-ai names are imported for static analysis only, so ``agenton`` can be @@ -30,7 +31,7 @@ if TYPE_CHECKING: from pydantic import BaseModel -from agenton.layers.base import EmptyLayerConfig, EmptyRuntimeHandles, EmptyRuntimeState, Layer, LayerConfig, LayerDeps +from agenton.layers.base import EmptyLayerConfig, EmptyRuntimeState, Layer, LayerConfig, LayerDeps type PlainPrompt = str type PlainUserPrompt = str @@ -98,12 +99,11 @@ type AllToolTypes = PlainToolType | PydanticAIToolType[Any] _DepsT = TypeVar("_DepsT", bound=LayerDeps) _ConfigT = TypeVar("_ConfigT", bound=LayerConfig, default=EmptyLayerConfig) _RuntimeStateT = TypeVar("_RuntimeStateT", bound=BaseModel, default=EmptyRuntimeState) -_RuntimeHandlesT = TypeVar("_RuntimeHandlesT", bound=BaseModel, default=EmptyRuntimeHandles) _AgentDepsT = TypeVar("_AgentDepsT") class PlainLayer( - Generic[_DepsT, _ConfigT, _RuntimeStateT, _RuntimeHandlesT], + Generic[_DepsT, _ConfigT, _RuntimeStateT], Layer[ _DepsT, PlainPrompt, @@ -111,7 +111,6 @@ class PlainLayer( PlainTool, _ConfigT, _RuntimeStateT, - _RuntimeHandlesT, ], ): """Layer base for ordinary string prompts and plain-callable tools.""" @@ -133,7 +132,7 @@ class PlainLayer( class PydanticAILayer( - Generic[_DepsT, _AgentDepsT, _ConfigT, _RuntimeStateT, _RuntimeHandlesT], + Generic[_DepsT, _AgentDepsT, _ConfigT, _RuntimeStateT], Layer[ _DepsT, PydanticAIPrompt[_AgentDepsT], @@ -141,7 +140,6 @@ class PydanticAILayer( PydanticAITool[_AgentDepsT], _ConfigT, _RuntimeStateT, - _RuntimeHandlesT, ], ): """Layer base for pydantic-ai prompt and tool adapters.""" diff --git a/dify-agent/src/agenton_collections/layers/plain/basic.py b/dify-agent/src/agenton_collections/layers/plain/basic.py index 6c10724175..faeecc096e 100644 --- a/dify-agent/src/agenton_collections/layers/plain/basic.py +++ b/dify-agent/src/agenton_collections/layers/plain/basic.py @@ -32,7 +32,8 @@ class ObjectLayer[ObjectT](PlainLayer[NoLayerDeps]): """Layer that stores one typed object for downstream dependencies. Object layers are instance-only because arbitrary Python objects are not - serializable graph config. Add them with ``CompositorBuilder.add_instance``. + serializable graph config. Add them with a custom ``LayerProvider`` factory + that creates a fresh object layer for each compositor run. """ value: ObjectT @@ -79,7 +80,8 @@ class ToolsLayer(PlainLayer[NoLayerDeps]): """Layer that contributes configured plain-callable tools. Tool layers are instance-only because Python callables are live objects. Add - them with ``CompositorBuilder.add_instance``. + them with a custom ``LayerProvider`` factory that returns a fresh layer for + each compositor run. """ tool_entries: Sequence[Callable[..., Any]] = () diff --git a/dify-agent/tests/local/agenton/compositor/test_builder_snapshot.py b/dify-agent/tests/local/agenton/compositor/test_builder_snapshot.py index 585c67e854..a946520d13 100644 --- a/dify-agent/tests/local/agenton/compositor/test_builder_snapshot.py +++ b/dify-agent/tests/local/agenton/compositor/test_builder_snapshot.py @@ -1,130 +1,138 @@ import asyncio -from collections import OrderedDict from dataclasses import dataclass +import pytest from pydantic import BaseModel, ConfigDict, ValidationError from typing_extensions import override -from agenton.compositor import Compositor, CompositorBuilder, CompositorSession, LayerNodeConfig, LayerRegistry -from agenton.layers import EmptyLayerConfig, LayerControl, LayerDeps, NoLayerDeps, PlainLayer, PlainPromptType, PlainToolType +import agenton.compositor as compositor_module +import agenton.layers as layers_module +from agenton.compositor import Compositor, LayerNode, LayerNodeConfig, LayerProvider +from agenton.layers import EmptyLayerConfig, Layer, LayerDeps, NoLayerDeps, PlainLayer from agenton_collections.layers.plain import ObjectLayer, PromptLayer, PromptLayerConfig -def test_registry_infers_descriptor_and_rejects_duplicate_or_missing_type_id() -> None: - registry = LayerRegistry() - registry.register_layer(PromptLayer) - - descriptor = registry.resolve("plain.prompt") - assert descriptor.layer_type is PromptLayer - assert descriptor.config_type is PromptLayer.config_type - - try: - registry.register_layer(PromptLayer) - except ValueError as e: - assert str(e) == "Layer type id 'plain.prompt' is already registered." - else: - raise AssertionError("Expected ValueError.") - - try: - registry.register_layer(InstanceOnlyLayer) - except ValueError as e: - assert "must declare a type_id" in str(e) - else: - raise AssertionError("Expected ValueError.") - - try: - registry.register_layer(InstanceOnlyLayer, type_id=123) # pyright: ignore[reportArgumentType] - except TypeError as e: - assert str(e) == "Layer type id for 'InstanceOnlyLayer' must be a string." - else: - raise AssertionError("Expected TypeError.") - - @dataclass(slots=True) class InstanceOnlyLayer(PlainLayer[NoLayerDeps]): pass -def test_builder_creates_config_layers_with_typed_validation() -> None: - registry = LayerRegistry() - registry.register_layer(PromptLayer) +@dataclass(slots=True) +class RequiredConstructorLayer(PlainLayer[NoLayerDeps]): + value: str - compositor = ( - CompositorBuilder(registry) - .add_config_layer( - name="prompt", - type="plain.prompt", - config={"prefix": "hello", "user": "ask politely", "suffix": ["bye"]}, - ) - .build() + +def test_layer_provider_from_layer_type_uses_declared_schema_and_type_id() -> None: + provider = LayerProvider.from_layer_type(PromptLayer) + + assert provider.type_id == "plain.prompt" + assert provider.layer_type is PromptLayer + + layer = provider.create_layer(PromptLayerConfig(prefix="hello", user="ask politely")) + + assert isinstance(layer, PromptLayer) + assert layer.config == PromptLayerConfig(prefix="hello", user="ask politely") + assert layer.prefix_prompts == ["hello"] + + with pytest.raises(TypeError, match="cannot be created from empty config"): + LayerProvider.from_layer_type(RequiredConstructorLayer).create_layer() + + +def test_compositor_from_config_uses_providers_and_enter_configs_by_node_name() -> None: + compositor = Compositor.from_config( + {"layers": [{"name": "prompt", "type": "plain.prompt"}]}, + providers=[PromptLayer], ) - assert [prompt.value for prompt in compositor.prompts] == ["hello", "bye"] - assert [prompt.value for prompt in compositor.user_prompts] == ["ask politely"] + async def run() -> None: + async with compositor.enter( + configs={"prompt": {"prefix": "hello", "user": "ask politely", "suffix": ["bye"]}} + ) as active_run: + assert [prompt.value for prompt in active_run.prompts] == ["hello", "bye"] + assert [prompt.value for prompt in active_run.user_prompts] == ["ask politely"] - try: - CompositorBuilder(registry).add_config_layer( - name="bad", - type="plain.prompt", - config={"unknown": "field"}, - ) - except ValidationError: - pass - else: - raise AssertionError("Expected ValidationError.") + asyncio.run(run()) + + with pytest.raises(ValidationError): + asyncio.run(_enter_once(compositor, configs={"prompt": {"unknown": "field"}})) -def test_layer_node_config_accepts_config_dto_and_serializes_fields() -> None: - registry = LayerRegistry() - registry.register_layer(PromptLayer) +def test_layer_node_config_has_no_runtime_state_or_layer_config() -> None: node = LayerNodeConfig( name="prompt", type="plain.prompt", - config=PromptLayerConfig(prefix="hello", user="ask politely"), + deps={"source": "other"}, + metadata={"label": "Prompt"}, ) - dumped = node.model_dump(mode="json") - compositor = CompositorBuilder(registry).add_config({"layers": [dumped]}).build() - - assert dumped["config"] == {"prefix": "hello", "user": "ask politely", "suffix": []} - assert [prompt.value for prompt in compositor.prompts] == ["hello"] - assert [prompt.value for prompt in compositor.user_prompts] == ["ask politely"] + assert node.model_dump(mode="json") == { + "name": "prompt", + "type": "plain.prompt", + "deps": {"source": "other"}, + "metadata": {"label": "Prompt"}, + } + assert "runtime_state" not in LayerNodeConfig.model_fields + assert "config" not in LayerNodeConfig.model_fields -def test_registry_factory_constructs_layer_with_injected_dependencies() -> None: - registry = LayerRegistry() - registry.register_layer( - PromptLayer, - factory=lambda config: PromptLayer(prefix=PromptLayerConfig.model_validate(config).prefix), +def test_node_providers_override_type_id_providers_for_serializable_graphs() -> None: + override_provider = LayerProvider.from_factory( + layer_type=PromptLayer, + create=lambda config: PromptLayer(prefix="override"), + ) + compositor = Compositor.from_config( + {"layers": [{"name": "prompt", "type": "plain.prompt"}]}, + providers=[PromptLayer], + node_providers={"prompt": override_provider}, ) - compositor = CompositorBuilder(registry).add_config( - {"layers": [{"name": "prompt", "type": "plain.prompt", "config": {"prefix": "factory"}}]} - ).build() + async def run() -> None: + async with compositor.enter(configs={"prompt": {"prefix": "ignored"}}) as active_run: + assert [prompt.value for prompt in active_run.prompts] == ["override"] - assert [prompt.value for prompt in compositor.prompts] == ["factory"] + asyncio.run(run()) -def test_compositor_get_layer_returns_named_layer_and_validates_type() -> None: - layer = ObjectLayer("value") - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("obj", layer)])) +def test_from_config_rejects_missing_duplicate_and_unknown_providers() -> None: + with pytest.raises(KeyError, match="Layer type id 'missing' is not registered"): + Compositor.from_config({"layers": [{"name": "node", "type": "missing"}]}, providers=[]) - assert compositor.get_layer("obj") is layer - assert compositor.get_layer("obj", ObjectLayer) is layer + with pytest.raises(ValueError, match="already registered"): + Compositor.from_config( + {"layers": [{"name": "prompt", "type": "plain.prompt"}]}, + providers=[PromptLayer, PromptLayer], + ) - try: - compositor.get_layer("missing") - except KeyError as e: - assert str(e) == '"Layer \'missing\' is not defined in this compositor."' - else: - raise AssertionError("Expected KeyError.") + with pytest.raises(ValueError, match="must declare a type_id"): + Compositor.from_config( + {"layers": [{"name": "node", "type": "instance.only"}]}, + providers=[InstanceOnlyLayer], + ) - try: - compositor.get_layer("obj", PromptLayer) - except TypeError as e: - assert str(e) == "Layer 'obj' must be PromptLayer, got ObjectLayer." - else: - raise AssertionError("Expected TypeError.") + with pytest.raises(ValueError, match="unknown layer node names: other"): + Compositor.from_config( + {"layers": [{"name": "prompt", "type": "plain.prompt"}]}, + providers=[PromptLayer], + node_providers={"other": PromptLayer}, + ) + + +def test_compositor_run_get_layer_returns_named_layer_and_validates_type() -> None: + compositor = Compositor([LayerNode("obj", _object_provider("value"))]) + + async def run() -> None: + async with compositor.enter() as active_run: + layer = active_run.get_layer("obj", ObjectLayer) + assert active_run.get_layer("obj") is layer + assert layer.value == "value" + + with pytest.raises(KeyError, match="Layer 'missing' is not defined"): + active_run.get_layer("missing") + + with pytest.raises(TypeError, match="Layer 'obj' must be PromptLayer, got ObjectLayer"): + active_run.get_layer("obj", PromptLayer) + + asyncio.run(run()) class ObjectConsumerDeps(LayerDeps): @@ -139,176 +147,137 @@ class ObjectConsumerLayer(PlainLayer[ObjectConsumerDeps]): return [self.deps.obj.value] -def test_builder_mixes_config_and_instances_and_rejects_invalid_deps() -> None: - registry = LayerRegistry() - registry.register_layer(PromptLayer) - - compositor = ( - CompositorBuilder(registry) - .add_config({"layers": [{"name": "prompt", "type": "plain.prompt", "config": {"prefix": "cfg"}}]}) - .add_instance(name="obj", layer=ObjectLayer("instance")) - .add_instance(name="consumer", layer=ObjectConsumerLayer(), deps={"obj": "obj"}) - .build() +def test_python_native_construction_mixes_layer_classes_and_providers() -> None: + compositor = Compositor( + [ + LayerNode("prompt", PromptLayer), + LayerNode("obj", _object_provider("instance")), + LayerNode("consumer", ObjectConsumerLayer, deps={"obj": "obj"}), + ] ) - assert [prompt.value for prompt in compositor.prompts] == ["cfg", "instance"] + async def run() -> None: + async with compositor.enter(configs={"prompt": {"prefix": "cfg"}}) as active_run: + assert [prompt.value for prompt in active_run.prompts] == ["cfg", "instance"] - try: - CompositorBuilder(registry).add_instance( - name="consumer", - layer=ObjectConsumerLayer(), - deps={"missing_dep_key": "obj"}, - ).build() - except ValueError as e: - assert str(e) == "Layer 'consumer' declares unknown dependency keys: missing_dep_key." - else: - raise AssertionError("Expected ValueError.") - - try: - CompositorBuilder(registry).add_instance( - name="consumer", - layer=ObjectConsumerLayer(), - deps={"obj": "missing_target"}, - ).build() - except ValueError as e: - assert str(e) == "Layer 'consumer' depends on undefined layer names: missing_target." - else: - raise AssertionError("Expected ValueError.") + asyncio.run(run()) -class HandleState(BaseModel): +class SerializableState(BaseModel): resource_id: str = "" + created: bool = False + resumed: bool = False model_config = ConfigDict(extra="forbid", validate_assignment=True) -class HandleBox: - def __init__(self, value: str) -> None: - self.value = value - - -class HandleModels(BaseModel): - handle: HandleBox | None = None - - model_config = ConfigDict(extra="forbid", validate_assignment=True, arbitrary_types_allowed=True) - - @dataclass(slots=True) -class HandleLayer(PlainLayer[NoLayerDeps, EmptyLayerConfig, HandleState, HandleModels]): - created: int = 0 - resumed: int = 0 +class StateLayer(PlainLayer[NoLayerDeps, EmptyLayerConfig, SerializableState]): + created_hooks: int = 0 + resumed_hooks: int = 0 @override - async def on_context_create(self, control: LayerControl[HandleState, HandleModels]) -> None: - self.created += 1 - control.runtime_handles.handle = HandleBox(control.runtime_state.resource_id) + async def on_context_create(self) -> None: + self.created_hooks += 1 + self.runtime_state.created = True @override - async def on_context_resume(self, control: LayerControl[HandleState, HandleModels]) -> None: - self.resumed += 1 - control.runtime_handles.handle = HandleBox(f"resumed:{control.runtime_state.resource_id}") + async def on_context_resume(self) -> None: + self.resumed_hooks += 1 + self.runtime_state.resumed = True -def test_new_session_uses_layer_runtime_schemas() -> None: - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( - layers=OrderedDict([("handle", HandleLayer())]) - ) - session = compositor.new_session() +def test_snapshot_contains_runtime_state_only_not_config_deps_or_resources() -> None: + compositor = Compositor([LayerNode("state", StateLayer)]) - assert isinstance(session.layer("handle").runtime_state, HandleState) - assert isinstance(session.layer("handle").runtime_handles, HandleModels) + async def get_snapshot() -> dict[str, object]: + async with compositor.enter() as active_run: + state_layer = active_run.get_layer("state", StateLayer) + state_layer.runtime_state.resource_id = "abc" + assert active_run.session_snapshot is not None + return active_run.session_snapshot.model_dump(mode="json") - -def test_enter_rejects_bad_session_runtime_schemas_before_layer_hooks() -> None: - layer = HandleLayer() - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("handle", layer)])) - bad_session = CompositorSession(OrderedDict([("handle", LayerControl())])) - - async def run() -> None: - async with compositor.enter(bad_session): - pass - - try: - asyncio.run(run()) - except TypeError as e: - assert str(e) == ( - "CompositorSession layer 'handle' runtime_state must be HandleState, " - "got EmptyRuntimeState." - ) - else: - raise AssertionError("Expected TypeError.") - - assert layer.created == 0 - - -def test_snapshot_rejects_active_sessions_and_excludes_handles() -> None: - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( - layers=OrderedDict([("handle", HandleLayer())]) - ) - session = compositor.session_from_snapshot( - {"layers": [{"name": "handle", "state": "new", "runtime_state": {"resource_id": "abc"}}]} - ) - - async def run() -> None: - async with compositor.enter(session): - try: - compositor.snapshot_session(session) - except RuntimeError as e: - assert str(e) == "Cannot snapshot active compositor session layers: handle." - else: - raise AssertionError("Expected RuntimeError.") - - asyncio.run(run()) - - snapshot = compositor.snapshot_session(session) - dumped = snapshot.model_dump(mode="json") + dumped = asyncio.run(get_snapshot()) assert dumped == { "schema_version": 1, - "layers": [{"name": "handle", "state": "closed", "runtime_state": {"resource_id": "abc"}}], + "layers": [ + { + "name": "state", + "lifecycle_state": "closed", + "runtime_state": {"resource_id": "abc", "created": True, "resumed": False}, + } + ], } - assert "_entry_stack" not in str(dumped) - assert "_owner" not in str(dumped) -def test_restore_validates_runtime_state_and_resume_rehydrates_handles() -> None: - layer = HandleLayer() - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("handle", layer)])) +def test_hydrate_validates_runtime_state_and_resume_mutates_layer_self() -> None: + compositor = Compositor([LayerNode("state", StateLayer)]) - try: - compositor.session_from_snapshot( - {"layers": [{"name": "handle", "state": "suspended", "runtime_state": {"wrong": "field"}}]} - ) - except ValidationError: - pass - else: - raise AssertionError("Expected ValidationError.") + bad_snapshot = {"layers": [{"name": "state", "lifecycle_state": "suspended", "runtime_state": {"wrong": "field"}}]} + with pytest.raises(ValidationError): + asyncio.run(_enter_once(compositor, session_snapshot=bad_snapshot)) - restored = compositor.session_from_snapshot( - {"layers": [{"name": "handle", "state": "suspended", "runtime_state": {"resource_id": "abc"}}]} - ) + good_snapshot = { + "layers": [ + { + "name": "state", + "lifecycle_state": "suspended", + "runtime_state": {"resource_id": "abc", "created": True, "resumed": False}, + } + ] + } async def run() -> None: - async with compositor.enter(restored): - control = restored.layer("handle") - assert isinstance(control.runtime_handles, HandleModels) - assert control.runtime_handles.handle is not None - assert control.runtime_handles.handle.value == "resumed:abc" + async with compositor.enter(session_snapshot=good_snapshot) as active_run: + layer = active_run.get_layer("state", StateLayer) + assert layer.runtime_state.resource_id == "abc" + assert layer.runtime_state.resumed is True + assert layer.resumed_hooks == 1 asyncio.run(run()) - assert layer.resumed == 1 +def test_hydrate_rejects_mismatched_snapshot_layer_names() -> None: + compositor = Compositor([LayerNode("state", StateLayer)]) -def test_session_from_snapshot_rejects_active_layer_state() -> None: - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( - layers=OrderedDict([("handle", HandleLayer())]) - ) - - try: - compositor.session_from_snapshot( - {"layers": [{"name": "handle", "state": "active", "runtime_state": {"resource_id": "abc"}}]} + with pytest.raises(ValueError, match=r"Expected \[state\], got \[other\]"): + asyncio.run( + _enter_once( + compositor, + session_snapshot={"layers": [{"name": "other", "lifecycle_state": "new", "runtime_state": {}}]}, + ) ) - except ValueError as e: - assert str(e) == "Cannot restore active compositor session layers from snapshot: handle." - else: - raise AssertionError("Expected ValueError.") + + +def test_removed_lifecycle_and_resource_apis_are_not_public_exports() -> None: + assert not hasattr(compositor_module, "CompositorBuilder") + assert not hasattr(compositor_module, "LayerRegistry") + assert not hasattr(compositor_module, "LayerDescriptor") + assert not hasattr(layers_module, "LayerControl") + assert not hasattr(layers_module, "EmptyRuntimeHandles") + assert not hasattr(Layer, "enter") + assert not hasattr(Layer, "hydrate_session_state") + assert not hasattr(Layer, "suspend_on_exit") + assert not hasattr(Layer, "delete_on_exit") + assert not hasattr(Layer, "runtime_handles") + assert not hasattr(Layer, "require_control") + assert not hasattr(Layer, "control_for") + assert not hasattr(Layer, "enter_async_resource") + assert not hasattr(Layer, "add_async_cleanup") + + +def _object_provider(value: str) -> LayerProvider[ObjectLayer[str]]: + return LayerProvider.from_factory(layer_type=ObjectLayer, create=lambda config: ObjectLayer(value)) + + +async def _enter_once( + compositor: Compositor, + *, + configs: dict[str, object] | None = None, + session_snapshot: object | None = None, +) -> None: + async with compositor.enter( + configs=configs, # pyright: ignore[reportArgumentType] + session_snapshot=session_snapshot, # pyright: ignore[reportArgumentType] + ): + pass diff --git a/dify-agent/tests/local/agenton/compositor/test_control_deps.py b/dify-agent/tests/local/agenton/compositor/test_control_deps.py deleted file mode 100644 index 400df113de..0000000000 --- a/dify-agent/tests/local/agenton/compositor/test_control_deps.py +++ /dev/null @@ -1,222 +0,0 @@ -import asyncio -from collections import OrderedDict -from dataclasses import dataclass - -import pytest -from typing_extensions import override - -from agenton.compositor import Compositor, CompositorSession -from agenton.layers import LayerControl, LayerDeps, PlainLayer, PlainPromptType, PlainToolType -from agenton_collections.layers.plain import ObjectLayer - - -class RenamedObjectDeps(LayerDeps): - renamed: ObjectLayer[str] # pyright: ignore[reportUninitializedInstanceVariable] - - -@dataclass(slots=True) -class RenamedConsumerLayer(PlainLayer[RenamedObjectDeps]): - @property - @override - def prefix_prompts(self) -> list[str]: - return [self.deps.renamed.value] - - -class SameNameObjectDeps(LayerDeps): - same: ObjectLayer[str] # pyright: ignore[reportUninitializedInstanceVariable] - - -@dataclass(slots=True) -class SameNameConsumerLayer(PlainLayer[SameNameObjectDeps]): - pass - - -class DoubleObjectDeps(LayerDeps): - first: ObjectLayer[str] # pyright: ignore[reportUninitializedInstanceVariable] - second: ObjectLayer[str] # pyright: ignore[reportUninitializedInstanceVariable] - - -@dataclass(slots=True) -class DoubleConsumerLayer(PlainLayer[DoubleObjectDeps]): - pass - - -class OptionalObjectDeps(LayerDeps): - maybe: ObjectLayer[str] | None # pyright: ignore[reportUninitializedInstanceVariable] - - -@dataclass(slots=True) -class OptionalConsumerLayer(PlainLayer[OptionalObjectDeps]): - pass - - -def test_control_for_layer_resolves_unique_explicit_dependency_rename() -> None: - target = ObjectLayer("target") - consumer = RenamedConsumerLayer() - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( - layers=OrderedDict([("actual", target), ("consumer", consumer)]), - deps_name_mapping={"consumer": {"renamed": "actual"}}, - ) - session = compositor.new_session() - - resolved = session.layer("consumer").control_for(target) - - assert resolved is session.layer("actual") - assert consumer.prefix_prompts == ["target"] - - -def test_control_for_layer_resolves_unique_implicit_same_name_dependency() -> None: - target = ObjectLayer("target") - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( - layers=OrderedDict([("same", target), ("consumer", SameNameConsumerLayer())]), - ) - session = compositor.new_session() - - assert session.layer("consumer").control_for(target) is session.layer("same") - - -def test_control_for_layer_raises_when_no_dependency_points_to_layer() -> None: - target = ObjectLayer("target") - unrelated = ObjectLayer("unrelated") - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( - layers=OrderedDict([("target", target), ("unrelated", unrelated), ("consumer", RenamedConsumerLayer())]), - deps_name_mapping={"consumer": {"renamed": "target"}}, - ) - session = compositor.new_session() - - with pytest.raises(KeyError, match="no dependency target.*provided ObjectLayer instance"): - _ = session.layer("consumer").control_for(unrelated) - - -def test_control_for_layer_raises_when_multiple_dependency_fields_match() -> None: - target = ObjectLayer("target") - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( - layers=OrderedDict([("target", target), ("consumer", DoubleConsumerLayer())]), - deps_name_mapping={"consumer": {"first": "target", "second": "target"}}, - ) - session = compositor.new_session() - - with pytest.raises(ValueError, match="multiple dependency fields.*Pass dep_name explicitly"): - _ = session.layer("consumer").control_for(target) - - -def test_control_for_explicit_dep_name_disambiguates_multiple_deps() -> None: - target = ObjectLayer("target") - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( - layers=OrderedDict([("target", target), ("consumer", DoubleConsumerLayer())]), - deps_name_mapping={"consumer": {"first": "target", "second": "target"}}, - ) - session = compositor.new_session() - - assert session.layer("consumer").control_for("second", target) is session.layer("target") - - -def test_control_for_optional_missing_dependency_raises() -> None: - target = ObjectLayer("target") - consumer = OptionalConsumerLayer() - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( - layers=OrderedDict([("consumer", consumer)]), - ) - session = compositor.new_session() - - assert consumer.deps.maybe is None - with pytest.raises(KeyError, match="dependency 'maybe' is not bound"): - _ = session.layer("consumer").control_for("maybe", target) - - -def test_restored_session_rebinds_owner_links_for_control_for() -> None: - target = ObjectLayer("target") - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( - layers=OrderedDict([("actual", target), ("consumer", RenamedConsumerLayer())]), - deps_name_mapping={"consumer": {"renamed": "actual"}}, - ) - session = compositor.new_session() - - async def suspend_session() -> None: - async with compositor.enter(session) as active_session: - active_session.suspend_on_exit() - - asyncio.run(suspend_session()) - restored = compositor.session_from_snapshot(compositor.snapshot_session(session)) - - assert restored.layer("consumer").control_for(target) is restored.layer("actual") - - -def test_enter_rebinds_external_session_owner_links_for_control_for() -> None: - target = ObjectLayer("target") - consumer = RenamedConsumerLayer() - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( - layers=OrderedDict([("actual", target), ("consumer", consumer)]), - deps_name_mapping={"consumer": {"renamed": "actual"}}, - ) - external_session = CompositorSession( - OrderedDict([("actual", target.new_control()), ("consumer", consumer.new_control())]) - ) - - async def enter_session() -> None: - async with compositor.enter(external_session) as active_session: - assert active_session.layer("consumer").control_for(target) is active_session.layer("actual") - - asyncio.run(enter_session()) - - -def test_failed_enter_does_not_rebind_active_session_owner_links() -> None: - first_target = ObjectLayer("first") - second_target = ObjectLayer("second") - first_compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( - layers=OrderedDict([("actual", first_target), ("consumer", RenamedConsumerLayer())]), - deps_name_mapping={"consumer": {"renamed": "actual"}}, - ) - second_compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( - layers=OrderedDict([("actual", second_target), ("consumer", RenamedConsumerLayer())]), - deps_name_mapping={"consumer": {"renamed": "actual"}}, - ) - session = first_compositor.new_session() - - async def enter_conflicting_compositor() -> None: - async with first_compositor.enter(session) as active_session: - with pytest.raises(RuntimeError, match="already active"): - async with second_compositor.enter(active_session): - raise AssertionError("Expected active-session rejection before entering layers.") - - assert active_session.layer("consumer").control_for(first_target) is active_session.layer("actual") - - asyncio.run(enter_conflicting_compositor()) - - -def test_control_for_uses_owner_resolved_targets_not_graph_wide_object_identity() -> None: - shared_target = ObjectLayer("shared") - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( - layers=OrderedDict( - [ - ("first-id", shared_target), - ("second-id", shared_target), - ("consumer", RenamedConsumerLayer()), - ] - ), - deps_name_mapping={"consumer": {"renamed": "second-id"}}, - ) - session = compositor.new_session() - - resolved = session.layer("consumer").control_for(shared_target) - - assert resolved is session.layer("second-id") - assert resolved is not session.layer("first-id") - - -def test_control_for_explicit_dep_name_rejects_wrong_layer_instance() -> None: - target = ObjectLayer("target") - wrong = ObjectLayer("wrong") - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( - layers=OrderedDict([("target", target), ("wrong", wrong), ("consumer", RenamedConsumerLayer())]), - deps_name_mapping={"consumer": {"renamed": "target"}}, - ) - session = compositor.new_session() - - with pytest.raises(TypeError, match="dependency 'renamed'.*not the provided ObjectLayer instance"): - _ = session.layer("consumer").control_for("renamed", wrong) - - -def test_control_for_unowned_control_raises_clear_error() -> None: - with pytest.raises(RuntimeError, match="not attached to a compositor session"): - _ = LayerControl().control_for(ObjectLayer("target")) diff --git a/dify-agent/tests/local/agenton/compositor/test_direct_deps.py b/dify-agent/tests/local/agenton/compositor/test_direct_deps.py new file mode 100644 index 0000000000..d89d4bb7e2 --- /dev/null +++ b/dify-agent/tests/local/agenton/compositor/test_direct_deps.py @@ -0,0 +1,128 @@ +import asyncio +from dataclasses import dataclass + +import pytest +from typing_extensions import override + +from agenton.compositor import Compositor, LayerNode, LayerProvider +from agenton.layers import EmptyLayerConfig, LayerDeps, PlainLayer +from agenton_collections.layers.plain import ObjectLayer + + +class RenamedObjectDeps(LayerDeps): + renamed: ObjectLayer[str] # pyright: ignore[reportUninitializedInstanceVariable] + + +@dataclass(slots=True) +class RenamedConsumerLayer(PlainLayer[RenamedObjectDeps]): + @property + @override + def prefix_prompts(self) -> list[str]: + return [self.deps.renamed.value] + + +class SameNameObjectDeps(LayerDeps): + same: ObjectLayer[str] # pyright: ignore[reportUninitializedInstanceVariable] + + +@dataclass(slots=True) +class SameNameConsumerLayer(PlainLayer[SameNameObjectDeps]): + @property + @override + def prefix_prompts(self) -> list[str]: + return [self.deps.same.value] + + +class OptionalObjectDeps(LayerDeps): + maybe: ObjectLayer[str] | None # pyright: ignore[reportUninitializedInstanceVariable] + + +@dataclass(slots=True) +class OptionalConsumerLayer(PlainLayer[OptionalObjectDeps]): + pass + + +def _object_provider(value: str) -> LayerProvider[ObjectLayer[str]]: + return LayerProvider.from_factory( + layer_type=ObjectLayer, + create=lambda config: ObjectLayer(value), + ) + + +def test_direct_deps_access_uses_explicit_dependency_rename() -> None: + compositor = Compositor( + [ + LayerNode("actual", _object_provider("target")), + LayerNode("consumer", RenamedConsumerLayer, deps={"renamed": "actual"}), + ] + ) + + async def run() -> None: + async with compositor.enter() as active_run: + target = active_run.get_layer("actual", ObjectLayer) + consumer = active_run.get_layer("consumer", RenamedConsumerLayer) + assert consumer.deps.renamed is target + assert [prompt.value for prompt in active_run.prompts] == ["target"] + + asyncio.run(run()) + + +def test_direct_deps_access_uses_explicit_same_name_dependency() -> None: + compositor = Compositor( + [ + LayerNode("same", _object_provider("target")), + LayerNode("consumer", SameNameConsumerLayer, deps={"same": "same"}), + ] + ) + + async def run() -> None: + async with compositor.enter() as active_run: + target = active_run.get_layer("same", ObjectLayer) + consumer = active_run.get_layer("consumer", SameNameConsumerLayer) + assert consumer.deps.same is target + assert [prompt.value for prompt in active_run.prompts] == ["target"] + + asyncio.run(run()) + + +def test_optional_missing_dependency_is_bound_to_none() -> None: + compositor = Compositor([LayerNode("consumer", OptionalConsumerLayer)]) + + async def run() -> None: + async with compositor.enter() as active_run: + consumer = active_run.get_layer("consumer", OptionalConsumerLayer) + assert consumer.deps.maybe is None + + asyncio.run(run()) + + +def test_missing_required_dependency_is_rejected_before_hooks() -> None: + compositor = Compositor([LayerNode("consumer", SameNameConsumerLayer)]) + + with pytest.raises(ValueError, match="Dependency 'same' is required"): + asyncio.run(_enter_once(compositor)) + + +def test_unknown_dependency_mapping_is_rejected_for_compositor_construction() -> None: + with pytest.raises(ValueError, match="unknown dependency keys: missing"): + Compositor([LayerNode("consumer", RenamedConsumerLayer, deps={"missing": "target"})]) + + +def test_undefined_dependency_target_is_rejected_for_compositor_construction() -> None: + with pytest.raises(ValueError, match="depends on undefined layer names: missing_target"): + Compositor([LayerNode("consumer", RenamedConsumerLayer, deps={"renamed": "missing_target"})]) + + +def test_duplicate_layer_node_name_is_rejected() -> None: + with pytest.raises(ValueError, match="Duplicate layer name 'same'"): + Compositor( + [ + LayerNode("same", _object_provider("first")), + LayerNode("same", _object_provider("second")), + ] + ) + + +async def _enter_once(compositor: Compositor) -> None: + async with compositor.enter(configs={"consumer": EmptyLayerConfig()}): + pass diff --git a/dify-agent/tests/local/agenton/compositor/test_enter.py b/dify-agent/tests/local/agenton/compositor/test_enter.py index 096e096eea..01228ac2a9 100644 --- a/dify-agent/tests/local/agenton/compositor/test_enter.py +++ b/dify-agent/tests/local/agenton/compositor/test_enter.py @@ -1,254 +1,266 @@ import asyncio -from collections import OrderedDict from collections.abc import Iterator from dataclasses import dataclass, field from itertools import count -from typing import cast import pytest -from pydantic import BaseModel, ConfigDict +from pydantic import BaseModel, ConfigDict, ValidationError from typing_extensions import override -from agenton.compositor import Compositor, CompositorSession +from agenton.compositor import Compositor, CompositorSessionSnapshot, LayerNode, LayerProvider from agenton.layers import ( - ExitIntent, EmptyLayerConfig, - EmptyRuntimeHandles, - EmptyRuntimeState, - LayerControl, + ExitIntent, + LayerConfig, LifecycleState, NoLayerDeps, PlainLayer, - PlainPromptType, - PlainToolType, ) @dataclass(slots=True) class TraceLayer(PlainLayer[NoLayerDeps]): - """Layer that records lifecycle events observable to tests.""" + """Layer that records no-arg lifecycle events observable to tests.""" events: list[str] = field(default_factory=list) @override - async def on_context_create(self, control: LayerControl) -> None: + async def on_context_create(self) -> None: self.events.append("create") @override - async def on_context_suspend(self, control: LayerControl) -> None: + async def on_context_suspend(self) -> None: self.events.append("suspend") @override - async def on_context_resume(self, control: LayerControl) -> None: + async def on_context_resume(self) -> None: self.events.append("resume") @override - async def on_context_delete(self, control: LayerControl) -> None: + async def on_context_delete(self) -> None: self.events.append("delete") -def _compositor(*layer_names: str) -> tuple[Compositor[PlainPromptType, PlainToolType], dict[str, TraceLayer]]: - layers = {layer_name: TraceLayer() for layer_name in layer_names} - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict(layers.items())) - return compositor, layers +def _compositor(*layer_names: str) -> Compositor: + return Compositor([LayerNode(layer_name, TraceLayer) for layer_name in layer_names]) -def test_compositor_session_suspends_resumes_and_deletes_all_layers() -> None: - compositor, layers = _compositor("first", "second") - session = compositor.new_session() +def test_same_compositor_enters_multiple_times_with_fresh_layers_and_snapshot_resume() -> None: + compositor = _compositor("first", "second") + runs = [] async def run() -> None: - async with compositor.enter(session) as active_session: - assert active_session is session - assert list(active_session.layer_controls) == ["first", "second"] - active_session.suspend_on_exit() - assert active_session.layer("first").exit_intent is ExitIntent.SUSPEND + async with compositor.enter() as first_run: + assert [slot.lifecycle_state for slot in first_run.slots.values()] == [ + LifecycleState.ACTIVE, + LifecycleState.ACTIVE, + ] + first_run.suspend_on_exit() + assert [slot.exit_intent for slot in first_run.slots.values()] == [ + ExitIntent.SUSPEND, + ExitIntent.SUSPEND, + ] + runs.append(first_run) - assert session.layer("first").state is LifecycleState.SUSPENDED - - async with compositor.enter(session): - pass + assert first_run.session_snapshot is not None + async with compositor.enter(session_snapshot=first_run.session_snapshot) as resumed_run: + assert resumed_run.get_layer("first", TraceLayer).events == ["resume"] + assert resumed_run.get_layer("second", TraceLayer).events == ["resume"] + runs.append(resumed_run) asyncio.run(run()) - assert layers["first"].events == ["create", "suspend", "resume", "delete"] - assert layers["second"].events == ["create", "suspend", "resume", "delete"] - assert session.layer("first").state is LifecycleState.CLOSED + first_layer = runs[0].get_layer("first", TraceLayer) + resumed_layer = runs[1].get_layer("first", TraceLayer) + assert first_layer is not resumed_layer + assert first_layer.events == ["create", "suspend"] + assert resumed_layer.events == ["resume", "delete"] + assert runs[1].session_snapshot is not None + assert [layer.lifecycle_state for layer in runs[1].session_snapshot.layers] == [ + LifecycleState.CLOSED, + LifecycleState.CLOSED, + ] -def test_compositor_enter_without_session_uses_fresh_lifecycle_each_time() -> None: - compositor, layers = _compositor("trace") +def test_concurrent_enters_do_not_share_layer_instances() -> None: + compositor = _compositor("trace") + + async def enter_once() -> tuple[int, list[str]]: + async with compositor.enter() as run: + layer = run.get_layer("trace", TraceLayer) + await asyncio.sleep(0) + return id(layer), layer.events + + async def run_concurrently() -> list[tuple[int, list[str]]]: + return list(await asyncio.gather(enter_once(), enter_once())) + + results = asyncio.run(run_concurrently()) + + assert results[0][0] != results[1][0] + assert results[0][1] == ["create", "delete"] + assert results[1][1] == ["create", "delete"] + + +class ConfiguredLayerConfig(LayerConfig): + value: str + + model_config = ConfigDict(extra="forbid") + + +@dataclass(slots=True) +class ConfiguredLayer(PlainLayer[NoLayerDeps, ConfiguredLayerConfig]): + type_id = "test.configured" + + value: str + + hooks: list[str] = field(default_factory=list) + + @classmethod + @override + def from_config(cls, config: ConfiguredLayerConfig) -> "ConfiguredLayer": + return cls(value=config.value) + + @override + async def on_context_create(self) -> None: + self.hooks.append(f"create:{self.config.value}") + + +def test_custom_factory_is_called_each_enter_with_typed_config() -> None: + calls: list[str] = [] + + def create_layer(config: ConfiguredLayerConfig) -> ConfiguredLayer: + calls.append(config.value) + return ConfiguredLayer(value=f"factory:{config.value}") + + compositor = Compositor( + [LayerNode("configured", LayerProvider.from_factory(layer_type=ConfiguredLayer, create=create_layer))] + ) async def run() -> None: - async with compositor.enter() as session: - session.suspend_on_exit() + async with compositor.enter(configs={"configured": {"value": "one"}}) as first_run: + first_layer = first_run.get_layer("configured", ConfiguredLayer) + assert first_layer.value == "factory:one" + assert first_layer.config.value == "one" + async with compositor.enter(configs={"configured": ConfiguredLayerConfig(value="two")}) as second_run: + second_layer = second_run.get_layer("configured", ConfiguredLayer) + assert second_layer.value == "factory:two" + assert second_layer.config.value == "two" + assert second_layer is not first_layer + asyncio.run(run()) + + assert calls == ["one", "two"] + + +def test_provider_rejects_reused_layer_instance_before_hooks_run() -> None: + shared_layer = TraceLayer() + compositor = Compositor( + [ + LayerNode( + "trace", + LayerProvider.from_factory(layer_type=TraceLayer, create=lambda config: shared_layer), + ) + ] + ) + + async def run() -> None: async with compositor.enter(): pass + with pytest.raises(ValueError, match="fresh layer instance"): + async with compositor.enter(): + pass + asyncio.run(run()) - assert layers["trace"].events == ["create", "suspend", "create", "delete"] + assert shared_layer.events == ["create", "delete"] -def test_compositor_enter_rejects_session_with_mismatched_layer_names() -> None: - compositor, _layers = _compositor("trace") - session = CompositorSession(["other"]) +def test_configs_are_validated_by_node_name_before_factory_call() -> None: + calls: list[str] = [] - async def run() -> None: - async with compositor.enter(session): - pass + def create_layer(config: ConfiguredLayerConfig) -> ConfiguredLayer: + calls.append(config.value) + return ConfiguredLayer(value=config.value) - try: - asyncio.run(run()) - except ValueError as e: - assert str(e) == ( - "CompositorSession layer names must match compositor layers in order. " - "Expected [trace], got [other]." + compositor = Compositor( + [LayerNode("configured", LayerProvider.from_factory(layer_type=ConfiguredLayer, create=create_layer))] + ) + + with pytest.raises(ValueError, match="unknown layer node names: missing"): + asyncio.run(_enter_once(compositor, configs={"missing": {}})) + + with pytest.raises(ValidationError): + asyncio.run(_enter_once(compositor, configs={"configured": {"unknown": "field"}})) + + assert calls == [] + + +def test_all_node_configs_are_validated_before_any_factory_runs() -> None: + calls: list[str] = [] + + def create_layer(config: ConfiguredLayerConfig) -> ConfiguredLayer: + calls.append(config.value) + return ConfiguredLayer(value=config.value) + + provider = LayerProvider.from_factory(layer_type=ConfiguredLayer, create=create_layer) + compositor = Compositor([LayerNode("first", provider), LayerNode("second", provider)]) + + with pytest.raises(ValidationError): + asyncio.run( + _enter_once( + compositor, + configs={"first": {"value": "valid"}, "second": {"unknown": "field"}}, + ) ) - else: - raise AssertionError("Expected ValueError.") + + assert calls == [] -def test_compositor_enter_rejects_same_active_session_nested() -> None: - compositor, _layers = _compositor("trace") - session = compositor.new_session() +def test_existing_config_model_instances_are_revalidated_before_factory_runs() -> None: + calls: list[str] = [] - async def run() -> None: - async with compositor.enter(session): - async with compositor.enter(session): - pass + def create_layer(config: ConfiguredLayerConfig) -> ConfiguredLayer: + calls.append(config.value) + return ConfiguredLayer(value=config.value) - try: - asyncio.run(run()) - except RuntimeError as e: - assert str(e) == "LayerControl is already active; duplicate or nested enter is not allowed." - else: - raise AssertionError("Expected RuntimeError.") + compositor = Compositor( + [LayerNode("configured", LayerProvider.from_factory(layer_type=ConfiguredLayer, create=create_layer))] + ) + config = ConfiguredLayerConfig(value="valid") + config.value = 123 # pyright: ignore[reportAttributeAccessIssue] + + with pytest.raises(ValidationError): + asyncio.run(_enter_once(compositor, configs={"configured": config})) + + assert calls == [] -def test_compositor_enter_rejects_closed_session() -> None: - compositor, _layers = _compositor("trace") - session = compositor.new_session() +def test_existing_snapshot_model_instances_are_revalidated_before_factory_runs() -> None: + calls = 0 - async def run() -> None: - async with compositor.enter(session): - pass + def create_layer(config: EmptyLayerConfig) -> TraceLayer: + nonlocal calls + calls += 1 + return TraceLayer() - async with compositor.enter(session): - pass + compositor = Compositor([LayerNode("trace", LayerProvider.from_factory(layer_type=TraceLayer, create=create_layer))]) + snapshot = CompositorSessionSnapshot.model_validate( + {"layers": [{"name": "trace", "lifecycle_state": "suspended", "runtime_state": {}}]} + ) + snapshot.layers[0].lifecycle_state = LifecycleState.ACTIVE - try: - asyncio.run(run()) - except RuntimeError as e: - assert str(e) == "LayerControl is closed; create a new compositor session before entering again." - else: - raise AssertionError("Expected RuntimeError.") + with pytest.raises(ValidationError, match="ACTIVE is internal-only"): + asyncio.run(_enter_once(compositor, session_snapshot=snapshot)) - -def test_per_layer_suspend_on_exit_only_resumes_that_layer() -> None: - compositor, layers = _compositor("first", "second") - session = compositor.new_session() - - async def run() -> None: - async with compositor.enter(session): - session.layer("first").suspend_on_exit() - - assert session.layer("first").state is LifecycleState.SUSPENDED - assert session.layer("second").state is LifecycleState.CLOSED - - async with compositor.enter(session): - pass - - try: - asyncio.run(run()) - except RuntimeError as e: - assert str(e) == "LayerControl is closed; create a new compositor session before entering again." - else: - raise AssertionError("Expected RuntimeError.") - - assert layers["first"].events == ["create", "suspend"] - assert layers["second"].events == ["create", "delete"] - - -@dataclass(slots=True) -class FailingCreateLayer(PlainLayer[NoLayerDeps]): - attempts: int = 0 - - @override - async def on_context_create(self, control: LayerControl) -> None: - self.attempts += 1 - if self.attempts == 1: - raise RuntimeError("create failed") - - -def test_failed_create_keeps_control_reusable_as_new() -> None: - layer = FailingCreateLayer() - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("trace", layer)])) - session = compositor.new_session() - - async def fail_then_retry() -> None: - try: - async with compositor.enter(session): - pass - except RuntimeError as e: - assert str(e) == "create failed" - else: - raise AssertionError("Expected RuntimeError.") - - assert session.layer("trace").state is LifecycleState.NEW - - async with compositor.enter(session): - pass - - asyncio.run(fail_then_retry()) - - assert session.layer("trace").state is LifecycleState.CLOSED - assert layer.attempts == 2 - - -@dataclass(slots=True) -class FailingResumeLayer(PlainLayer[NoLayerDeps]): - resumed: bool = False - - @override - async def on_context_resume(self, control: LayerControl) -> None: - if not self.resumed: - self.resumed = True - raise RuntimeError("resume failed") - - -def test_failed_resume_keeps_control_reusable_as_suspended() -> None: - layer = FailingResumeLayer() - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("trace", layer)])) - session = compositor.new_session() - - async def suspend_fail_then_retry() -> None: - async with compositor.enter(session) as active_session: - active_session.suspend_on_exit() - - try: - async with compositor.enter(session): - pass - except RuntimeError as e: - assert str(e) == "resume failed" - else: - raise AssertionError("Expected RuntimeError.") - - assert session.layer("trace").state is LifecycleState.SUSPENDED - - async with compositor.enter(session): - pass - - asyncio.run(suspend_fail_then_retry()) - - assert session.layer("trace").state is LifecycleState.CLOSED + assert calls == 0 class RuntimeState(BaseModel): runtime_id: int | None = None resumed_runtime_id: int | None = None deleted_runtime_id: int | None = None + body_value: str | None = None model_config = ConfigDict(extra="forbid", validate_assignment=True) @@ -258,302 +270,126 @@ class RuntimeStateLayer(PlainLayer[NoLayerDeps, EmptyLayerConfig, RuntimeState]) next_id: Iterator[int] = field(default_factory=lambda: count(1)) @override - async def on_context_create(self, control: LayerControl[RuntimeState, EmptyRuntimeHandles]) -> None: - runtime_id = next(self.next_id) - control.runtime_state.runtime_id = runtime_id + async def on_context_create(self) -> None: + self.runtime_state.runtime_id = next(self.next_id) @override - async def on_context_resume(self, control: LayerControl[RuntimeState, EmptyRuntimeHandles]) -> None: - control.runtime_state.resumed_runtime_id = control.runtime_state.runtime_id + async def on_context_resume(self) -> None: + self.runtime_state.resumed_runtime_id = self.runtime_state.runtime_id @override - async def on_context_delete(self, control: LayerControl[RuntimeState, EmptyRuntimeHandles]) -> None: - control.runtime_state.deleted_runtime_id = control.runtime_state.runtime_id + async def on_context_delete(self) -> None: + self.runtime_state.deleted_runtime_id = self.runtime_state.runtime_id -def test_runtime_state_is_per_session_and_survives_suspend_resume_delete() -> None: - layer = RuntimeStateLayer() - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("trace", layer)])) - first_session = compositor.new_session() - second_session = compositor.new_session() +def test_snapshot_hydrates_runtime_state_and_exit_snapshots_from_layer_self() -> None: + compositor = Compositor([LayerNode("state", RuntimeStateLayer)]) - async def run() -> None: - async with compositor.enter(first_session) as active_session: - active_session.suspend_on_exit() + async def create_suspend_resume_delete() -> tuple[CompositorSessionSnapshot, CompositorSessionSnapshot]: + async with compositor.enter() as first_run: + first_run.suspend_on_exit() + assert first_run.session_snapshot is not None - async with compositor.enter(second_session): - pass + async with compositor.enter(session_snapshot=first_run.session_snapshot) as resumed_run: + resumed_layer = resumed_run.get_layer("state", RuntimeStateLayer) + assert isinstance(resumed_layer.runtime_state, RuntimeState) + assert resumed_layer.runtime_state.runtime_id == 1 + assert resumed_layer.runtime_state.resumed_runtime_id == 1 + resumed_layer.runtime_state.body_value = "mutated on self" + assert resumed_run.session_snapshot is not None + return first_run.session_snapshot, resumed_run.session_snapshot - async with compositor.enter(first_session): - pass + suspended_snapshot, closed_snapshot = asyncio.run(create_suspend_resume_delete()) - asyncio.run(run()) - - assert first_session.layer("trace").runtime_state.model_dump(exclude_none=True) == { - "runtime_id": 1, - "resumed_runtime_id": 1, - "deleted_runtime_id": 1, + assert suspended_snapshot.model_dump(mode="json") == { + "schema_version": 1, + "layers": [ + { + "name": "state", + "lifecycle_state": "suspended", + "runtime_state": { + "runtime_id": 1, + "resumed_runtime_id": None, + "deleted_runtime_id": None, + "body_value": None, + }, + } + ], } - assert second_session.layer("trace").runtime_state.model_dump(exclude_none=True) == { - "runtime_id": 2, - "deleted_runtime_id": 2, + assert closed_snapshot.model_dump(mode="json") == { + "schema_version": 1, + "layers": [ + { + "name": "state", + "lifecycle_state": "closed", + "runtime_state": { + "runtime_id": 1, + "resumed_runtime_id": 1, + "deleted_runtime_id": 1, + "body_value": "mutated on self", + }, + } + ], } - assert not hasattr(layer, "runtime_id") -class ResourceHandles(BaseModel): - resource: "RecordingResource | None" = None - - model_config = ConfigDict(extra="forbid", validate_assignment=True, arbitrary_types_allowed=True) - - -class RecordingResource: - events: list[str] - label: str - closed: bool - - def __init__(self, events: list[str], label: str) -> None: - self.events = events - self.label = label - self.closed = False - - async def __aenter__(self) -> "RecordingResource": - self.events.append(f"enter:{self.label}") - return self - - async def __aexit__(self, exc_type: object, exc: object, traceback: object) -> None: - self.closed = True - self.events.append(f"exit:{self.label}") - - -@dataclass(slots=True) -class ResourceLayer(PlainLayer[NoLayerDeps, EmptyLayerConfig, EmptyRuntimeState, ResourceHandles]): - events: list[str] = field(default_factory=list) - resources: list[RecordingResource] = field(default_factory=list) - fail_create: bool = False - fail_resume: bool = False - fail_suspend: bool = False - fail_delete: bool = False - - @override - async def on_context_create(self, control: LayerControl[EmptyRuntimeState, ResourceHandles]) -> None: - await self._open_resource(control, "create") - self.events.append("create") - if self.fail_create: - raise RuntimeError("create failed") - - @override - async def on_context_resume(self, control: LayerControl[EmptyRuntimeState, ResourceHandles]) -> None: - await self._open_resource(control, "resume") - self.events.append("resume") - if self.fail_resume: - raise RuntimeError("resume failed") - - @override - async def on_context_suspend(self, control: LayerControl[EmptyRuntimeState, ResourceHandles]) -> None: - self.events.append("suspend") - control.runtime_handles.resource = None - if self.fail_suspend: - raise RuntimeError("suspend failed") - - @override - async def on_context_delete(self, control: LayerControl[EmptyRuntimeState, ResourceHandles]) -> None: - self.events.append("delete") - control.runtime_handles.resource = None - if self.fail_delete: - raise RuntimeError("delete failed") - - async def _open_resource( - self, - control: LayerControl[EmptyRuntimeState, ResourceHandles], - label: str, - ) -> None: - resource = await control.enter_async_resource(RecordingResource(self.events, label)) - - async def cleanup() -> None: - self.events.append(f"cleanup:{label}") - - control.add_async_cleanup(cleanup) - control.runtime_handles.resource = resource - self.resources.append(resource) - - -def _resource_control(control: LayerControl) -> LayerControl[EmptyRuntimeState, ResourceHandles]: - return cast(LayerControl[EmptyRuntimeState, ResourceHandles], control) - - -def test_entry_resource_stack_closes_resources_on_suspend_and_delete() -> None: - layer = ResourceLayer() - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("resource", layer)])) - session = compositor.new_session() +def test_run_snapshot_rejects_active_layers() -> None: + compositor = _compositor("trace") async def run() -> None: - async with compositor.enter(session) as active_session: - control = _resource_control(active_session.layer("resource")) - assert control.runtime_handles.resource is layer.resources[0] - assert layer.resources[0].closed is False - active_session.suspend_on_exit() - - assert _resource_control(session.layer("resource")).runtime_handles.resource is None - assert layer.resources[0].closed is True - - async with compositor.enter(session): - control = _resource_control(session.layer("resource")) - assert control.runtime_handles.resource is layer.resources[1] - assert layer.resources[1].closed is False - - assert _resource_control(session.layer("resource")).runtime_handles.resource is None - assert layer.resources[1].closed is True - - asyncio.run(run()) - - assert layer.events == [ - "enter:create", - "create", - "suspend", - "cleanup:create", - "exit:create", - "enter:resume", - "resume", - "delete", - "cleanup:resume", - "exit:resume", - ] - - -def test_entry_resource_stack_closes_resources_when_create_or_resume_raises() -> None: - async def fail_create() -> None: - layer = ResourceLayer(fail_create=True) - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("resource", layer)])) - session = compositor.new_session() - - with pytest.raises(RuntimeError, match="create failed"): - async with compositor.enter(session): - pass - - assert session.layer("resource").state is LifecycleState.NEW - assert layer.resources[0].closed is True - assert session.layer("resource")._entry_stack is None - - async def fail_resume() -> None: - layer = ResourceLayer() - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("resource", layer)])) - session = compositor.new_session() - - async with compositor.enter(session) as active_session: - active_session.suspend_on_exit() - - layer.fail_resume = True - with pytest.raises(RuntimeError, match="resume failed"): - async with compositor.enter(session): - pass - - assert session.layer("resource").state is LifecycleState.SUSPENDED - assert layer.resources[1].closed is True - assert session.layer("resource")._entry_stack is None - - asyncio.run(fail_create()) - asyncio.run(fail_resume()) - - -def test_entry_resource_stack_closes_resources_when_body_raises() -> None: - layer = ResourceLayer() - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("resource", layer)])) - session = compositor.new_session() - - async def run() -> None: - with pytest.raises(RuntimeError, match="body failed"): - async with compositor.enter(session): - raise RuntimeError("body failed") - - asyncio.run(run()) - - assert session.layer("resource").state is LifecycleState.CLOSED - assert layer.resources[0].closed is True - assert session.layer("resource")._entry_stack is None - - -def test_failed_suspend_hook_exits_active_state_and_closes_resources() -> None: - layer = ResourceLayer(fail_suspend=True) - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("resource", layer)])) - session = compositor.new_session() - - async def run() -> None: - with pytest.raises(RuntimeError, match="suspend failed"): - async with compositor.enter(session) as active_session: - active_session.suspend_on_exit() - - control = session.layer("resource") - assert control.state is LifecycleState.SUSPENDED - assert layer.resources[0].closed is True - assert control._entry_stack is None - with pytest.raises(RuntimeError, match="entry resource stack is not active"): - await control.enter_async_resource(RecordingResource([], "unused")) - with pytest.raises(RuntimeError, match="requires an active LayerControl"): - _ = layer.require_control(control, active=True) + async with compositor.enter() as active_run: + with pytest.raises(RuntimeError, match="Cannot snapshot active compositor run layers: trace"): + active_run.snapshot_session() asyncio.run(run()) -def test_failed_delete_hook_exits_active_state_and_closes_resources() -> None: - layer = ResourceLayer(fail_delete=True) - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("resource", layer)])) - session = compositor.new_session() +def test_active_snapshot_input_is_rejected_before_factories_run() -> None: + calls = 0 - async def run() -> None: - with pytest.raises(RuntimeError, match="delete failed"): - async with compositor.enter(session): - pass + def create_layer(config: EmptyLayerConfig) -> TraceLayer: + nonlocal calls + calls += 1 + return TraceLayer() - control = session.layer("resource") - assert control.state is LifecycleState.CLOSED - assert layer.resources[0].closed is True - assert control._entry_stack is None - with pytest.raises(RuntimeError, match="entry resource stack is not active"): - await control.enter_async_resource(RecordingResource([], "unused")) - with pytest.raises(RuntimeError, match="requires an active LayerControl"): - _ = layer.require_control(control, active=True) + compositor = Compositor([LayerNode("trace", LayerProvider.from_factory(layer_type=TraceLayer, create=create_layer))]) + active_snapshot = {"layers": [{"name": "trace", "lifecycle_state": "active", "runtime_state": {}}]} - asyncio.run(run()) + with pytest.raises(ValidationError, match="ACTIVE is internal-only"): + CompositorSessionSnapshot.model_validate(active_snapshot) + + with pytest.raises(ValidationError, match="ACTIVE is internal-only"): + asyncio.run(_enter_once(compositor, session_snapshot=active_snapshot)) + + assert calls == 0 -def test_resource_stack_api_raises_outside_active_entry_stack() -> None: - control = LayerControl() +def test_closed_snapshot_enter_is_rejected_before_hooks_run() -> None: + created_layers: list[TraceLayer] = [] - async def cleanup() -> None: - raise AssertionError("cleanup should not be registered") + def create_layer(config: EmptyLayerConfig) -> TraceLayer: + layer = TraceLayer() + created_layers.append(layer) + return layer - with pytest.raises(RuntimeError, match="entry resource stack is not active"): - control.add_async_cleanup(cleanup) + compositor = Compositor([LayerNode("trace", LayerProvider.from_factory(layer_type=TraceLayer, create=create_layer))]) + closed_snapshot = {"layers": [{"name": "trace", "lifecycle_state": "closed", "runtime_state": {}}]} - with pytest.raises(RuntimeError, match="entry resource stack is not active"): - asyncio.run(control.enter_async_resource(RecordingResource([], "unused"))) + with pytest.raises(RuntimeError, match="CLOSED snapshots cannot be entered"): + asyncio.run(_enter_once(compositor, session_snapshot=closed_snapshot)) + + assert len(created_layers) == 1 + assert created_layers[0].events == [] -def test_require_control_validates_owner_schema_and_active_state() -> None: - layer = ResourceLayer() - other_layer = TraceLayer() - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( - layers=OrderedDict([("resource", layer), ("other", other_layer)]) - ) - session = compositor.new_session() - control = session.layer("resource") - - with pytest.raises(RuntimeError, match="requires an active LayerControl"): - _ = layer.require_control(control, active=True) - - with pytest.raises(RuntimeError, match="belongs to layer 'other'"): - _ = layer.require_control(session.layer("other")) - - bad_control = LayerControl() - bad_session = CompositorSession(OrderedDict([("resource", bad_control), ("other", other_layer.new_control())])) - bad_session._bind_owner(compositor) - with pytest.raises(TypeError, match="runtime_handles must be ResourceHandles"): - _ = layer.require_control(bad_control) - - async def run() -> None: - async with compositor.enter(session) as active_session: - active_control = active_session.layer("resource") - assert layer.require_control(active_control, active=True) is active_control - - asyncio.run(run()) +async def _enter_once( + compositor: Compositor, + *, + configs: dict[str, object] | None = None, + session_snapshot: object | None = None, +) -> None: + async with compositor.enter( + configs=configs, # pyright: ignore[reportArgumentType] + session_snapshot=session_snapshot, # pyright: ignore[reportArgumentType] + ): + pass diff --git a/dify-agent/tests/local/agenton/compositor/test_transformers.py b/dify-agent/tests/local/agenton/compositor/test_transformers.py index 754a2f6cf1..8786d0f802 100644 --- a/dify-agent/tests/local/agenton/compositor/test_transformers.py +++ b/dify-agent/tests/local/agenton/compositor/test_transformers.py @@ -1,11 +1,11 @@ -from collections import OrderedDict +import asyncio from collections.abc import Callable, Sequence from dataclasses import dataclass from inspect import Parameter, signature from typing_extensions import override -from agenton.compositor import Compositor, CompositorTransformerKwargs +from agenton.compositor import Compositor, CompositorTransformerKwargs, LayerNode, LayerProvider from agenton.layers import NoLayerDeps, PlainLayer, PlainPromptType, PlainToolType, PlainUserPromptType type ToolCallable = Callable[..., object] @@ -61,6 +61,24 @@ def describe_tools(tools: Sequence[PlainToolType]) -> list[str]: return [tool.value.__name__ for tool in tools] +def prompt_tool_provider( + *, + prefix: list[str] | None = None, + user: list[str] | None = None, + suffix: list[str] | None = None, + tool_entries: list[ToolCallable] | None = None, +) -> LayerProvider[PromptAndToolLayer]: + return LayerProvider.from_factory( + layer_type=PromptAndToolLayer, + create=lambda config: PromptAndToolLayer( + prefix=list(prefix or []), + user=list(user or []), + suffix=list(suffix or []), + tool_entries=list(tool_entries or []), + ), + ) + + def test_compositor_transformer_kwargs_keys_match_constructor_parameters() -> None: transformer_kwargs = set(CompositorTransformerKwargs.__required_keys__) parameters = signature(Compositor).parameters @@ -84,53 +102,36 @@ def test_compositor_transformer_kwargs_keys_match_from_config_parameters() -> No def test_compositor_transforms_prompts_to_another_type_after_layer_ordering() -> None: compositor: Compositor[WrappedPrompt, PlainToolType, PlainPromptType, PlainToolType] = Compositor( - layers=OrderedDict( - [ - ( - "first", - PromptAndToolLayer( - prefix=["first-prefix"], - user=[], - suffix=["first-suffix"], - tool_entries=[], - ), - ), - ( - "second", - PromptAndToolLayer( - prefix=["second-prefix"], - user=[], - suffix=["second-suffix"], - tool_entries=[], - ), - ), - ] - ), + [ + LayerNode("first", prompt_tool_provider(prefix=["first-prefix"], suffix=["first-suffix"])), + LayerNode("second", prompt_tool_provider(prefix=["second-prefix"], suffix=["second-suffix"])), + ], prompt_transformer=wrap_prompts, ) - assert compositor.prompts == [ - ("wrapped", "first-prefix"), - ("wrapped", "second-prefix"), - ("wrapped", "second-suffix"), - ("wrapped", "first-suffix"), - ] + async def run() -> None: + async with compositor.enter() as active_run: + assert active_run.prompts == [ + ("wrapped", "first-prefix"), + ("wrapped", "second-prefix"), + ("wrapped", "second-suffix"), + ("wrapped", "first-suffix"), + ] + + asyncio.run(run()) def test_compositor_transforms_tools_to_another_type_after_layer_aggregation() -> None: compositor: Compositor[PlainPromptType, str, PlainPromptType, PlainToolType] = Compositor( - layers=OrderedDict( - [ - ( - "tools", - PromptAndToolLayer(prefix=[], user=[], suffix=[], tool_entries=[base_tool, wrapped_tool]), - ) - ] - ), + [LayerNode("tools", prompt_tool_provider(tool_entries=[base_tool, wrapped_tool]))], tool_transformer=describe_tools, ) - assert compositor.tools == ["base_tool", "wrapped_tool"] + async def run() -> None: + async with compositor.enter() as active_run: + assert active_run.tools == ["base_tool", "wrapped_tool"] + + asyncio.run(run()) def test_compositor_transforms_user_prompts_after_layer_ordering() -> None: @@ -142,22 +143,18 @@ def test_compositor_transforms_user_prompts_after_layer_ordering() -> None: WrappedUserPrompt, PlainUserPromptType, ] = Compositor( - layers=OrderedDict( - [ - ( - "first", - PromptAndToolLayer(prefix=[], user=["first-user"], suffix=[], tool_entries=[]), - ), - ( - "second", - PromptAndToolLayer(prefix=[], user=["second-user"], suffix=[], tool_entries=[]), - ), - ] - ), + [ + LayerNode("first", prompt_tool_provider(user=["first-user"])), + LayerNode("second", prompt_tool_provider(user=["second-user"])), + ], user_prompt_transformer=wrap_user_prompts, ) - assert compositor.user_prompts == [ - ("wrapped-user", "first-user"), - ("wrapped-user", "second-user"), - ] + async def run() -> None: + async with compositor.enter() as active_run: + assert active_run.user_prompts == [ + ("wrapped-user", "first-user"), + ("wrapped-user", "second-user"), + ] + + asyncio.run(run()) diff --git a/dify-agent/tests/local/agenton/layers/test_schema_inference.py b/dify-agent/tests/local/agenton/layers/test_schema_inference.py index 501a2209c4..55af512e54 100644 --- a/dify-agent/tests/local/agenton/layers/test_schema_inference.py +++ b/dify-agent/tests/local/agenton/layers/test_schema_inference.py @@ -2,13 +2,11 @@ from dataclasses import dataclass from pydantic import BaseModel, ConfigDict -from agenton.compositor import LayerRegistry +from agenton.compositor import LayerProvider from agenton.layers import ( EmptyLayerConfig, - EmptyRuntimeHandles, EmptyRuntimeState, LayerConfig, - LayerControl, NoLayerDeps, PlainLayer, ) @@ -26,19 +24,16 @@ class InferredState(BaseModel): model_config = ConfigDict(extra="forbid", validate_assignment=True) -class InferredHandles(BaseModel): - token: object | None = None - - model_config = ConfigDict(extra="forbid", validate_assignment=True, arbitrary_types_allowed=True) - - @dataclass(slots=True) -class GenericSchemaLayer(PlainLayer[NoLayerDeps, InferredConfig, InferredState, InferredHandles]): +class GenericSchemaLayer(PlainLayer[NoLayerDeps, InferredConfig, InferredState]): type_id = "test.generic-schema" - async def on_context_create(self, control: LayerControl[InferredState, InferredHandles]) -> None: - control.runtime_state.count += 1 - control.runtime_handles.token = object() + @classmethod + def from_config(cls, config: InferredConfig) -> "GenericSchemaLayer": + return cls() + + async def on_context_create(self) -> None: + self.runtime_state.count += 1 @dataclass(slots=True) @@ -46,27 +41,22 @@ class DefaultSchemaLayer(PlainLayer[NoLayerDeps]): type_id = "test.default-schema" -def test_layer_infers_config_runtime_state_and_handles_from_generics() -> None: +def test_layer_infers_config_and_runtime_state_from_generics() -> None: layer = GenericSchemaLayer() - control = layer.new_control(runtime_state={"count": 3}) + layer.runtime_state = InferredState(count=3) assert GenericSchemaLayer.config_type is InferredConfig assert GenericSchemaLayer.runtime_state_type is InferredState - assert GenericSchemaLayer.runtime_handles_type is InferredHandles - assert isinstance(control.runtime_state, InferredState) - assert control.runtime_state.count == 3 - assert isinstance(control.runtime_handles, InferredHandles) + assert isinstance(layer.runtime_state, InferredState) + assert layer.runtime_state.count == 3 def test_layer_uses_empty_schema_defaults_when_omitted() -> None: layer = DefaultSchemaLayer() - control = layer.new_control() assert DefaultSchemaLayer.config_type is EmptyLayerConfig assert DefaultSchemaLayer.runtime_state_type is EmptyRuntimeState - assert DefaultSchemaLayer.runtime_handles_type is EmptyRuntimeHandles - assert isinstance(control.runtime_state, EmptyRuntimeState) - assert isinstance(control.runtime_handles, EmptyRuntimeHandles) + assert isinstance(layer.runtime_state, EmptyRuntimeState) def test_invalid_declared_schema_type_is_rejected_clearly() -> None: @@ -91,12 +81,12 @@ def test_invalid_declared_schema_type_is_rejected_clearly() -> None: raise AssertionError("Expected TypeError.") -def test_registry_descriptor_uses_inferred_schema_types() -> None: - registry = LayerRegistry() - registry.register_layer(GenericSchemaLayer) +def test_layer_provider_uses_inferred_schema_types() -> None: + provider = LayerProvider.from_layer_type(GenericSchemaLayer) - descriptor = registry.resolve("test.generic-schema") + layer = provider.create_layer({"value": "configured"}) - assert descriptor.config_type is InferredConfig - assert descriptor.runtime_state_type is InferredState - assert descriptor.runtime_handles_type is InferredHandles + assert provider.type_id == "test.generic-schema" + assert provider.layer_type.config_type is InferredConfig + assert provider.layer_type.runtime_state_type is InferredState + assert isinstance(layer.config, InferredConfig)