initialize the agenton engine

This commit is contained in:
盐粒 Yanli 2026-04-29 03:40:05 +08:00
parent 264533324b
commit 5a7eb7fdb6
20 changed files with 1408 additions and 0 deletions

0
.codex Normal file
View File

View File

@ -0,0 +1,139 @@
"""Run with: uv run --project dify-agent python examples/agenton_basics.py."""
from __future__ import annotations
import asyncio
from dataclasses import dataclass, field
from inspect import signature
from typing_extensions import override
from agenton.compositor import Compositor, CompositorLayerConfig
from agenton.layers import LayerContextSignal, LayerDeps, NoLayerDeps, PlainLayer
from agenton_collections.plain import DynamicToolsLayer, ObjectLayer, ToolsLayer, with_object
@dataclass(frozen=True, slots=True)
class AgentProfile:
name: str
audience: str
tone: str
class ProfilePromptDeps(LayerDeps):
profile: ObjectLayer[AgentProfile] # pyright: ignore[reportUninitializedInstanceVariable]
@dataclass(slots=True)
class ProfilePromptLayer(PlainLayer[ProfilePromptDeps]):
@property
@override
def prefix_prompts(self) -> list[str]:
profile = self.deps.profile.value
return [
f"You are {profile.name}, writing for {profile.audience}.",
f"Keep the tone {profile.tone}.",
]
@dataclass(slots=True)
class TraceLayer(PlainLayer[NoLayerDeps]):
events: list[str] = field(default_factory=list)
@override
async def on_context_create(self, signal: LayerContextSignal) -> None:
self.events.append("create")
@override
async def on_context_temporarily_leave(self, signal: LayerContextSignal) -> None:
self.events.append("temporary_leave")
@override
async def on_context_reenter(self, signal: LayerContextSignal) -> None:
self.events.append("reenter")
@override
async def on_context_delete(self, signal: LayerContextSignal) -> None:
self.events.append("delete")
def count_words(text: str) -> int:
return len(text.split())
@with_object(AgentProfile)
def write_tagline(profile: AgentProfile, topic: str) -> str:
return f"{profile.name}: {topic} for {profile.audience}, in a {profile.tone} voice."
async def main() -> None:
profile = AgentProfile(
name="Agenton Assistant",
audience="engineers composing agent capabilities",
tone="precise and friendly",
)
trace = TraceLayer()
compositor = Compositor.from_config(
{
"layers": [
{
"name": "base_prompt",
"layer": {
"import_path": "agenton_collections.plain.basic:PromptLayer",
"config": {
"prefix": "Use config dicts for serializable layers.",
"suffix": "Before finalizing, make the result easy to scan.",
},
},
},
{
"name": "extra_prompt",
"layer": {
"import_path": "agenton_collections.plain.basic:PromptLayer",
"config": {
"prefix": "Use constructed instances for objects, local code, and callables.",
},
},
},
CompositorLayerConfig(
name="profile",
layer=ObjectLayer[AgentProfile](profile),
),
CompositorLayerConfig(
name="profile_prompt",
deps={"profile": "profile"},
layer=ProfilePromptLayer(),
),
CompositorLayerConfig(
name="tools",
layer=ToolsLayer(tool_entries=(count_words,)),
),
CompositorLayerConfig(
name="dynamic_tools",
deps={"object_layer": "profile"},
layer=DynamicToolsLayer[AgentProfile](tool_entries=(write_tagline,)),
),
CompositorLayerConfig(name="trace", layer=trace),
]
}
)
print("Prompts:")
for prompt in compositor.prompts:
print(f"- {prompt}")
print("\nTools:")
for tool in compositor.tools:
print(f"- {tool.__name__}{signature(tool)}")
print([tool("layer composition") for tool in compositor.tools])
async with compositor.context() as context:
context.temporary_leave = True
async with compositor.context():
pass
print("\nLifecycle:", trace.events)
if __name__ == "__main__":
asyncio.run(main())

View File

View File

@ -0,0 +1,232 @@
"""Layer composition primitives.
The compositor owns a named, ordered set of layers. ``Compositor[PromptT,
ToolT]`` is framework-neutral; callers choose prompt/tool item types by
annotating construction or assignment sites. Use
``agenton.compositor.helpers.make_compositor`` when type inference from layer
arguments is useful; it lives in a child module so the core compositor does not
depend on its helper overloads.
Dependency mappings use layer-local dependency names as keys and compositor
layer names as values. Prompt aggregation depends on insertion order: prefix
prompts are collected from first to last layer, while suffix prompts are
collected in reverse.
``Compositor.context`` enters layer contexts in compositor order and exits them
in reverse order through ``AsyncExitStack``. It yields per-layer lifecycle
signals so callers can mark individual layers, or all layers, as temporarily
leaving.
"""
from collections import OrderedDict
from collections.abc import AsyncIterator
from contextlib import AsyncExitStack, asynccontextmanager
from dataclasses import dataclass, field
from importlib import import_module
from typing import TYPE_CHECKING, Annotated, Any, Mapping, cast
from pydantic import AfterValidator, BaseModel, ConfigDict, Field, JsonValue
from typing_extensions import Self
from agenton.layers.base import Layer, LayerContextSignal
class ImportedLayerConfig(BaseModel):
"""Config for constructing one layer from an import path."""
import_path: str
config: Any = None
model_config = ConfigDict(arbitrary_types_allowed=True)
def create_layer(self) -> Layer[Any, Any, Any]:
"""Import the target layer class and create it from config."""
try:
import_module_name, import_target = self.import_path.rsplit(":", 1)
except ValueError as e:
raise ValueError(
f"Invalid import string '{self.import_path}'. "
"It should be in the format 'module:ClassName'."
) from e
layer_t = getattr(import_module(import_module_name), import_target)
if not isinstance(layer_t, type) or not issubclass(layer_t, Layer):
raise TypeError(f"Imported target '{self.import_path}' must be a Layer subclass.")
return layer_t.from_config(config=self.config)
LayerSpec = Layer[Any, Any, Any] | ImportedLayerConfig
type _ConfigModelValue[ModelT: BaseModel] = ModelT | JsonValue | str | bytes
def _validate_config_model_input[ModelT: BaseModel](
model_type: type[ModelT],
value: _ConfigModelValue[ModelT] | Mapping[str, object],
) -> ModelT:
if isinstance(value, model_type):
return value
if isinstance(value, str | bytes):
return model_type.model_validate_json(value)
return model_type.model_validate(value)
class CompositorLayerConfig(BaseModel):
"""Config entry for one named layer in a compositor.
``layer`` may be either an already constructed layer instance or an
``ImportedLayerConfig``. Direct instances are already initialized, so config
for imported layers lives inside ``ImportedLayerConfig`` instead of beside
the graph node fields.
"""
name: str
deps: Mapping[str, str] = Field(default_factory=dict)
layer: LayerSpec
model_config = ConfigDict(arbitrary_types_allowed=True)
def create_layer(self) -> Layer[Any, Any, Any]:
"""Create or return the configured layer instance."""
if isinstance(self.layer, Layer):
return self.layer
return self.layer.create_layer()
type CompositorLayerConfigValue = _ConfigModelValue[CompositorLayerConfig]
def _validate_layer_config_input(value: CompositorLayerConfigValue) -> CompositorLayerConfig:
return _validate_config_model_input(CompositorLayerConfig, value)
type CompositorLayerConfigInput = Annotated[
CompositorLayerConfigValue,
AfterValidator(_validate_layer_config_input),
]
class CompositorConfig(BaseModel):
"""Serializable config for constructing a compositor graph.
``layers`` accepts ready-made ``CompositorLayerConfig`` instances, raw JSON
values, or JSON-encoded strings/bytes. After validation, callers always see
normalized ``CompositorLayerConfig`` objects.
"""
if TYPE_CHECKING:
layers: list[CompositorLayerConfig]
else:
layers: list[CompositorLayerConfigInput]
type CompositorConfigValue = _ConfigModelValue[CompositorConfig] | Mapping[str, object]
def _validate_compositor_config_input(value: CompositorConfigValue) -> CompositorConfig:
return _validate_config_model_input(CompositorConfig, value)
@dataclass(slots=True)
class CompositorContext:
"""Signal slots for layer contexts entered by a compositor."""
signals: OrderedDict[str, LayerContextSignal]
@property
def temporary_leave(self) -> bool:
"""Whether any entered layer is currently marked for temporary leave."""
return any(signal.temporary_leave for signal in self.signals.values())
@temporary_leave.setter
def temporary_leave(self, value: bool) -> None:
for signal in self.signals.values():
signal.temporary_leave = value
@dataclass(kw_only=True)
class Compositor[PromptT, ToolT]:
"""Framework-neutral ordered layer graph with lifecycle and aggregation."""
layers: OrderedDict[str, Layer[Any, PromptT, ToolT]]
deps_name_mapping: Mapping[str, Mapping[str, str]] = field(default_factory=dict)
_deps_bound: bool = field(default=False, init=False)
def __post_init__(self) -> None:
self._bind_deps(self.deps_name_mapping)
@classmethod
def from_config(cls, conf: CompositorConfigValue) -> Self:
"""Create layers from config-like input and bind named dependencies."""
conf = _validate_compositor_config_input(conf)
layers: OrderedDict[str, Layer[Any, PromptT, ToolT]] = OrderedDict()
for layer_conf in conf.layers:
layers[layer_conf.name] = cast(Layer[Any, PromptT, ToolT], layer_conf.create_layer())
deps_name_mapping = {layer_conf.name: layer_conf.deps for layer_conf in conf.layers}
return cls(layers=layers, deps_name_mapping=deps_name_mapping)
def _bind_deps(self, deps_name_mapping: Mapping[str, Mapping[str, str]]) -> None:
"""Resolve dependency-name mappings and bind dependencies on each layer.
The outer mapping key is the layer being bound. The inner mapping key is
the dependency field declared by that layer's deps type, and the value is
the target layer name in this compositor.
"""
if self._deps_bound:
raise RuntimeError("Compositor deps are already bound.")
for layer_name, layer in self.layers.items():
layer_deps = deps_name_mapping.get(layer_name, {})
try:
deps = {
dep_name: self.layers[target_layer_name]
for dep_name, target_layer_name in layer_deps.items()
}
except KeyError as e:
raise ValueError(
f"Layer '{layer_name}' has a dependency on layer '{e.args[0]}', "
"which is not defined in the builder."
) from e
layer.bind_deps({**self.layers, **deps})
self._deps_bound = True
@asynccontextmanager
async def context(self) -> AsyncIterator[CompositorContext]:
"""Enter each layer context in order and yield their signal slots."""
if not self._deps_bound:
raise RuntimeError("Compositor deps must be bound before entering context.")
signals: OrderedDict[str, LayerContextSignal] = OrderedDict()
async with AsyncExitStack() as stack:
for layer_name, layer in self.layers.items():
signals[layer_name] = await stack.enter_async_context(layer.context())
yield CompositorContext(signals=signals)
@property
def prompts(self) -> list[PromptT]:
result: list[PromptT] = []
for layer in self.layers.values():
result.extend(layer.prefix_prompts)
for layer in reversed(self.layers.values()):
result.extend(layer.suffix_prompts)
return result
@property
def tools(self) -> list[ToolT]:
result: list[ToolT] = []
for layer in self.layers.values():
result.extend(layer.tools)
return result
__all__ = [
"Compositor",
"CompositorConfig",
"CompositorConfigValue",
"CompositorLayerConfigInput",
"CompositorContext",
"CompositorLayerConfig",
"CompositorLayerConfigValue",
"ImportedLayerConfig",
"LayerSpec",
]

View File

@ -0,0 +1,110 @@
"""Type-inference helpers for compositor construction.
The core ``Compositor`` stays framework-neutral and usually needs explicit
prompt/tool type parameters. ``make_compositor`` is a small runtime factory
whose overloads let type checkers infer prompt and tool unions from the layer
arguments without introducing annotation-only compositor aliases.
"""
from __future__ import annotations
from collections import OrderedDict
from typing import TYPE_CHECKING, Any, Mapping, overload
from agenton.layers.base import Layer
if TYPE_CHECKING:
from . import Compositor
type NamedLayer[PromptT, ToolT] = tuple[str, Layer[Any, PromptT, ToolT]]
@overload
def make_compositor[PromptT1, ToolT1](
layer1: NamedLayer[PromptT1, ToolT1],
/,
*,
deps_name_mapping: Mapping[str, Mapping[str, str]] | None = None,
) -> Compositor[PromptT1, ToolT1]: ...
@overload
def make_compositor[PromptT1, ToolT1, PromptT2, ToolT2](
layer1: NamedLayer[PromptT1, ToolT1],
layer2: NamedLayer[PromptT2, ToolT2],
/,
*,
deps_name_mapping: Mapping[str, Mapping[str, str]] | None = None,
) -> Compositor[PromptT1 | PromptT2, ToolT1 | ToolT2]: ...
@overload
def make_compositor[PromptT1, ToolT1, PromptT2, ToolT2, PromptT3, ToolT3](
layer1: NamedLayer[PromptT1, ToolT1],
layer2: NamedLayer[PromptT2, ToolT2],
layer3: NamedLayer[PromptT3, ToolT3],
/,
*,
deps_name_mapping: Mapping[str, Mapping[str, str]] | None = None,
) -> Compositor[PromptT1 | PromptT2 | PromptT3, ToolT1 | ToolT2 | ToolT3]: ...
@overload
def make_compositor[
PromptT1,
ToolT1,
PromptT2,
ToolT2,
PromptT3,
ToolT3,
PromptT4,
ToolT4,
](
layer1: NamedLayer[PromptT1, ToolT1],
layer2: NamedLayer[PromptT2, ToolT2],
layer3: NamedLayer[PromptT3, ToolT3],
layer4: NamedLayer[PromptT4, ToolT4],
/,
*,
deps_name_mapping: Mapping[str, Mapping[str, str]] | None = None,
) -> Compositor[PromptT1 | PromptT2 | PromptT3 | PromptT4, ToolT1 | ToolT2 | ToolT3 | ToolT4]: ...
@overload
def make_compositor[
PromptT1,
ToolT1,
PromptT2,
ToolT2,
PromptT3,
ToolT3,
PromptT4,
ToolT4,
PromptT5,
ToolT5,
](
layer1: NamedLayer[PromptT1, ToolT1],
layer2: NamedLayer[PromptT2, ToolT2],
layer3: NamedLayer[PromptT3, ToolT3],
layer4: NamedLayer[PromptT4, ToolT4],
layer5: NamedLayer[PromptT5, ToolT5],
/,
*,
deps_name_mapping: Mapping[str, Mapping[str, str]] | None = None,
) -> Compositor[
PromptT1 | PromptT2 | PromptT3 | PromptT4 | PromptT5,
ToolT1 | ToolT2 | ToolT3 | ToolT4 | ToolT5,
]: ...
def make_compositor(
*layers: NamedLayer[Any, Any],
deps_name_mapping: Mapping[str, Mapping[str, str]] | None = None,
) -> Compositor[Any, Any]:
"""Create a compositor while letting type checkers infer layer item unions."""
from . import Compositor
return Compositor(
layers=OrderedDict(layers),
deps_name_mapping=deps_name_mapping or {},
)

View File

@ -0,0 +1,29 @@
"""Layer base classes and typed layer families.
``agenton.layers.base`` owns the framework-neutral ``Layer`` abstraction.
``agenton.layers.types`` binds the prompt/tool generic slots to specific layer
families while keeping concrete reusable layers in ``agenton_collections``.
"""
from agenton.layers.base import Layer, LayerContextSignal, LayerDeps, NoLayerDeps
from agenton.layers.types import (
PlainLayer,
PlainPrompt,
PlainTool,
PydanticAILayer,
PydanticAIPrompt,
PydanticAITool,
)
__all__ = [
"Layer",
"LayerContextSignal",
"LayerDeps",
"NoLayerDeps",
"PlainLayer",
"PlainPrompt",
"PlainTool",
"PydanticAILayer",
"PydanticAIPrompt",
"PydanticAITool",
]

View File

@ -0,0 +1,314 @@
"""Core layer abstractions and typed dependency binding.
Layers declare their dependency shape with ``Layer[DepsT, PromptT, ToolT]``.
``DepsT`` must be a ``LayerDeps`` subclass whose annotated members are concrete
``Layer`` subclasses or modern optional dependencies such as ``SomeLayer |
None``. The base class infers ``deps_type`` from the generic base when possible,
while still allowing subclasses to set ``deps_type`` explicitly for unusual
inheritance patterns.
``Layer.bind_deps`` is the mutation point for dependency state. Layer
implementations should treat ``self.deps`` as unavailable until a compositor or
caller has resolved and bound dependencies.
Layer async contexts use a bool signal to distinguish permanent exits from
temporary exits. A normal first entry runs create logic and a normal exit runs
delete logic; when the signal is set, exit runs temporary-leave logic and the
next entry runs reenter logic.
``Layer`` is framework-neutral over prompt and tool item types. Typed families
such as ``agenton.layers.types.PlainLayer`` bind those generic slots to a
specific contract without pushing framework types into this base module.
"""
from abc import ABC
from collections.abc import AsyncIterator
from contextlib import AbstractAsyncContextManager, asynccontextmanager
from dataclasses import dataclass
from types import UnionType
from typing import Any, Mapping, Sequence, Union, cast, get_args, get_origin, get_type_hints
from typing_extensions import Self
class LayerDeps:
"""Typed dependency container for a Layer.
Subclasses declare dependency members with annotations. Every annotated
member must be a Layer subclass or ``LayerSubclass | None``. Optional deps
are always assigned as attributes; missing optional values become ``None``.
"""
def __init__(self, **deps: "Layer[Any, Any, Any] | None") -> None:
dep_specs = _get_dep_specs(type(self))
missing_names = {name for name, spec in dep_specs.items() if not spec.optional} - deps.keys()
if missing_names:
names = ", ".join(sorted(missing_names))
raise ValueError(f"Missing layer dependencies: {names}.")
unknown_names = deps.keys() - dep_specs.keys()
if unknown_names:
names = ", ".join(sorted(unknown_names))
raise ValueError(f"Unknown layer dependencies: {names}.")
for name, spec in dep_specs.items():
value = deps.get(name)
if value is None:
if spec.optional:
setattr(self, name, None)
continue
raise ValueError(f"Dependency '{name}' is required but not provided.")
if not isinstance(value, spec.layer_type):
raise TypeError(
f"Dependency '{name}' should be of type '{spec.layer_type.__name__}', "
f"but got type '{type(value).__name__}'."
)
setattr(self, name, value)
class NoLayerDeps(LayerDeps):
"""Dependency container for layers that do not require other layers."""
@dataclass(slots=True)
class LayerContextSignal:
"""Signal slot exposed inside a layer context.
Set ``temporary_leave`` before leaving the context to run temporary-leave
logic instead of delete logic. A later entry will then run reenter logic.
"""
temporary_leave: bool = False
@dataclass(frozen=True, slots=True)
class LayerDepSpec:
"""Runtime dependency specification derived from a deps annotation."""
layer_type: type["Layer[Any, Any, Any]"]
optional: bool = False
class Layer[DepsT: LayerDeps, PromptT, ToolT](ABC):
"""Framework-neutral base class for prompt/tool layers.
Subclasses expose optional prompt fragments and tools through typed
properties. They declare required dependencies in the ``DepsT`` container
rather than by accepting dependencies in ``__init__``. The default async
context manager handles create, delete, temporary-leave, and reenter
transitions; layers can override ``context`` when they need to wrap extra
runtime resources.
"""
deps_type: type[DepsT]
deps: DepsT
_temporarily_left: bool
def __init_subclass__(cls) -> None:
super().__init_subclass__()
deps_type = cls.__dict__.get("deps_type")
if deps_type is None:
deps_type = _infer_deps_type(cls) or getattr(cls, "deps_type", None)
if deps_type is None and _is_generic_layer_template(cls):
return
if deps_type is not None:
cls.deps_type = deps_type # pyright: ignore[reportAttributeAccessIssue]
if deps_type is None:
raise TypeError(f"{cls.__name__} must define deps_type or inherit from Layer[DepsT].")
if not isinstance(deps_type, type) or not issubclass(deps_type, LayerDeps):
raise TypeError(f"{cls.__name__}.deps_type must be a LayerDeps subclass.")
_get_dep_specs(deps_type)
@classmethod
def from_config(cls: type[Self], config: Any) -> Self:
"""Create a layer from serialized config.
Layers are not config-constructible by default. Subclasses that accept
config should override this method and validate dynamic input before
constructing the layer.
"""
raise TypeError(f"{cls.__name__} cannot be created from config.")
def bind_deps(self, deps: Mapping[str, "Layer[Any, Any, Any] | None"]) -> None:
"""Bind this layer's declared dependencies from a name-to-layer mapping.
The mapping may include more layers than the declared dependency fields.
Only names declared by ``deps_type`` are selected and validated. Missing
optional deps are bound as ``None``.
"""
resolved_deps: dict[str, Layer[Any, Any, Any] | None] = {}
for name, spec in _get_dep_specs(self.deps_type).items():
if name not in deps:
if spec.optional:
resolved_deps[name] = None
continue
raise ValueError(
f"Dependency '{name}' is required for layer '{type(self).__name__}' but not provided."
)
resolved_deps[name] = deps[name]
self.deps = self.deps_type(**resolved_deps)
def context(self) -> AbstractAsyncContextManager[LayerContextSignal]:
"""Return the layer's async context manager.
The yielded ``LayerContextSignal`` is the signal slot available to code
inside the context. Subclasses can override this to wrap extra async
resources around ``self.lifecycle_context()``.
"""
return self.lifecycle_context()
@asynccontextmanager
async def lifecycle_context(self) -> AsyncIterator[LayerContextSignal]:
"""Run the default create/reenter and delete/temporary-leave lifecycle."""
signal = LayerContextSignal()
was_temporarily_left = getattr(self, "_temporarily_left", False)
self._temporarily_left = False
if was_temporarily_left:
await self.on_context_reenter(signal)
else:
await self.on_context_create(signal)
try:
yield signal
finally:
if signal.temporary_leave:
await self.on_context_temporarily_leave(signal)
self._temporarily_left = True
else:
await self.on_context_delete(signal)
self._temporarily_left = False
async def on_context_create(self, signal: LayerContextSignal) -> None:
"""Run when the layer context is entered from a non-temporary state."""
async def on_context_delete(self, signal: LayerContextSignal) -> None:
"""Run when the layer context exits without a temporary-leave signal."""
async def on_context_temporarily_leave(self, signal: LayerContextSignal) -> None:
"""Run when the layer context exits with ``temporary_leave`` set."""
async def on_context_reenter(self, signal: LayerContextSignal) -> None:
"""Run when the layer context enters after a temporary leave."""
@property
def prefix_prompts(self) -> Sequence[PromptT]:
return []
@property
def suffix_prompts(self) -> Sequence[PromptT]:
return []
@property
def tools(self) -> Sequence[ToolT]:
return []
def _get_dep_specs(deps_type: type[LayerDeps]) -> dict[str, LayerDepSpec]:
dep_specs: dict[str, LayerDepSpec] = {}
for name, annotation in get_type_hints(deps_type).items():
spec = _as_dep_spec(annotation)
if spec is None:
raise TypeError(
f"{deps_type.__name__}.{name} must be annotated with a Layer subclass "
"or Layer subclass | None."
)
dep_specs[name] = spec
return dep_specs
def _as_dep_spec(annotation: object) -> LayerDepSpec | None:
origin = get_origin(annotation)
args = get_args(annotation)
if origin in (UnionType, Union) and len(args) == 2 and type(None) in args:
layer_annotation = args[0] if args[1] is type(None) else args[1]
layer_type = _as_layer_type(layer_annotation)
if layer_type is None:
return None
return LayerDepSpec(layer_type=layer_type, optional=True)
layer_type = _as_layer_type(annotation)
if layer_type is None:
return None
return LayerDepSpec(layer_type=layer_type)
def _as_layer_type(annotation: object) -> type[Layer[Any, Any, Any]] | None:
runtime_type = get_origin(annotation) or annotation
if isinstance(runtime_type, type) and issubclass(runtime_type, Layer):
return cast(type[Layer[Any, Any, Any]], runtime_type)
return None
def _infer_deps_type(layer_type: type[Layer[Any, Any, Any]]) -> type[LayerDeps] | None:
return _infer_deps_type_from_bases(layer_type, {})
def _infer_deps_type_from_bases(
layer_type: type[Layer[Any, Any, Any]],
substitutions: Mapping[object, object],
) -> type[LayerDeps] | None:
"""Infer the concrete deps container through generic Layer inheritance.
This walks through intermediate generic base classes so subclasses can omit
an explicit ``deps_type`` in common cases such as ``class X(Base[YDeps])``.
"""
for base in getattr(layer_type, "__orig_bases__", ()):
origin = get_origin(base) or base
args = tuple(_substitute_type(arg, substitutions) for arg in get_args(base))
if origin is Layer:
if not args:
continue
return _as_deps_type(args[0])
if not isinstance(origin, type) or not issubclass(origin, Layer):
continue
next_substitutions = dict(substitutions)
next_substitutions.update(_generic_arg_substitutions(origin, args))
inferred = _infer_deps_type_from_bases(origin, next_substitutions)
if inferred is not None:
return inferred
return None
def _substitute_type(value: object, substitutions: Mapping[object, object]) -> object:
if value in substitutions:
return substitutions[value]
origin = get_origin(value)
if origin is None:
return value
args = get_args(value)
if not args:
return value
substituted_args = tuple(_substitute_type(arg, substitutions) for arg in args)
if substituted_args == args:
return value
try:
return origin[substituted_args]
except TypeError:
return value
def _generic_arg_substitutions(origin: type[Any], args: Sequence[object]) -> dict[object, object]:
params = getattr(origin, "__type_params__", ())
if not params:
params = getattr(origin, "__parameters__", ())
return dict(zip(params, args))
def _as_deps_type(value: object) -> type[LayerDeps] | None:
runtime_type = get_origin(value) or value
if isinstance(runtime_type, type) and issubclass(runtime_type, LayerDeps):
return runtime_type
return None
def _is_generic_layer_template(layer_type: type[Layer[Any, Any, Any]]) -> bool:
return bool(getattr(layer_type, "__type_params__", ())) or bool(
getattr(layer_type, "__parameters__", ())
)

View File

@ -0,0 +1,42 @@
"""Typed layer family definitions.
``Layer`` itself is framework-neutral. This module defines typed layer families
that bind its prompt/tool generic slots to concrete contracts, such as ordinary
string prompts with plain callable tools or pydantic-ai prompt/tool shapes.
Concrete reusable layers live under ``agenton_collections``.
"""
from collections.abc import Callable
from typing import Any
from pydantic_ai import Tool
from pydantic_ai.tools import SystemPromptFunc, ToolFuncEither
from agenton.layers.base import Layer, LayerDeps
type PlainPrompt = str
type PlainTool = Callable[..., Any]
class PlainLayer[DepsT: LayerDeps](Layer[DepsT, PlainPrompt, PlainTool]):
"""Layer base for ordinary string prompts and plain-callable tools."""
type PydanticAIPrompt[AgentDepsT] = str | SystemPromptFunc[AgentDepsT]
type PydanticAITool[AgentDepsT] = Tool[AgentDepsT] | ToolFuncEither[AgentDepsT, ...]
class PydanticAILayer[DepsT: LayerDeps, AgentDepsT](
Layer[DepsT, PydanticAIPrompt[AgentDepsT], PydanticAITool[AgentDepsT]]
):
"""Layer base for pydantic-ai prompt and tool adapters."""
__all__ = [
"PlainLayer",
"PlainPrompt",
"PlainTool",
"PydanticAILayer",
"PydanticAIPrompt",
"PydanticAITool",
]

View File

@ -0,0 +1,47 @@
"""Convenience exports for reusable layer implementations.
Concrete collection layers live in family subpackages such as
``agenton_collections.plain`` and ``agenton_collections.pydantic_ai``. The
package root keeps the short import path for common layers while avoiding
implementation code in ``__init__``.
"""
from agenton.layers.types import (
PlainLayer,
PlainPrompt,
PlainTool,
PydanticAILayer,
PydanticAIPrompt,
PydanticAITool,
)
from agenton_collections.pydantic_ai import (
PydanticAIBridgeLayer,
PydanticAIBridgeLayerDeps,
PydanticAIPrompts,
)
from agenton_collections.plain import (
DynamicToolsLayer,
DynamicToolsLayerDeps,
ObjectLayer,
PromptLayer,
ToolsLayer,
with_object,
)
__all__ = [
"DynamicToolsLayer",
"DynamicToolsLayerDeps",
"ObjectLayer",
"PlainLayer",
"PlainPrompt",
"PlainTool",
"PromptLayer",
"PydanticAIBridgeLayer",
"PydanticAIBridgeLayerDeps",
"PydanticAILayer",
"PydanticAIPrompt",
"PydanticAIPrompts",
"PydanticAITool",
"ToolsLayer",
"with_object",
]

View File

@ -0,0 +1,17 @@
"""Reusable collection layers for the plain layer family."""
from agenton_collections.plain.basic import ObjectLayer, PromptLayer, ToolsLayer
from agenton_collections.plain.dynamic_tools import (
DynamicToolsLayer,
DynamicToolsLayerDeps,
with_object,
)
__all__ = [
"DynamicToolsLayer",
"DynamicToolsLayerDeps",
"ObjectLayer",
"PromptLayer",
"ToolsLayer",
"with_object",
]

View File

@ -0,0 +1,68 @@
"""Basic ready-to-compose layers for common plain use cases.
These layers are small concrete implementations built on
``agenton.layers.types``. They intentionally stay free of compositor graph
construction so they can be reused from config, examples, and higher-level
dynamic layers.
"""
from collections.abc import Callable, Sequence
from dataclasses import dataclass, field
from typing import Any
from pydantic import TypeAdapter
from agenton.layers.base import NoLayerDeps
from agenton.layers.types import PlainLayer
@dataclass
class ObjectLayer[ObjectT](PlainLayer[NoLayerDeps]):
"""Layer that stores one typed object for downstream dependencies."""
value: ObjectT
@dataclass
class PromptLayer(PlainLayer[NoLayerDeps]):
"""Layer that contributes configured prefix and suffix prompt fragments."""
prefix: list[str] | str = field(default_factory=list)
suffix: list[str] | str = field(default_factory=list)
@classmethod
def from_config(cls, config: Any):
"""Validate prompt config against this dataclass."""
return _PROMPT_LAYER_ADAPTER.validate_python(config)
@property
def prefix_prompts(self) -> list[str]:
if isinstance(self.prefix, str):
return [self.prefix]
return self.prefix
@property
def suffix_prompts(self) -> list[str]:
if isinstance(self.suffix, str):
return [self.suffix]
return self.suffix
@dataclass
class ToolsLayer(PlainLayer[NoLayerDeps]):
"""Layer that contributes configured plain-callable tools."""
tool_entries: Sequence[Callable[..., Any]] = ()
@property
def tools(self) -> list[Callable[..., Any]]:
return list(self.tool_entries)
_PROMPT_LAYER_ADAPTER = TypeAdapter(PromptLayer)
__all__ = [
"ObjectLayer",
"PromptLayer",
"ToolsLayer",
]

View File

@ -0,0 +1,233 @@
"""Dynamic plain-tool layer with object-bound tool entries.
This module builds on ``ObjectLayer`` from ``agenton_collections.plain.basic``.
Plain callables are exposed unchanged, while entries wrapped with
``with_object`` bind the current object value into the first callable argument
and expose the remaining parameters as the public tool signature.
"""
from collections.abc import Callable, Sequence
from dataclasses import dataclass
from functools import wraps
from inspect import Parameter, Signature, iscoroutinefunction, signature
from types import UnionType
from typing import (
Annotated,
Any,
Concatenate,
Union,
get_args,
get_origin,
get_type_hints,
)
from agenton.layers.base import LayerDeps
from agenton.layers.types import PlainLayer
from agenton_collections.plain.basic import ObjectLayer
type _ObjectToolCallable[ObjectT] = Callable[Concatenate[ObjectT, ...], Any]
@dataclass(frozen=True, slots=True)
class _ObjectToolEntry[ObjectT]:
"""Tool entry whose first argument should be filled from ``ObjectLayer``."""
tool_entry: _ObjectToolCallable[ObjectT]
object_type: type[ObjectT] | None = None
type _DynamicToolEntry[ObjectT] = Callable[..., Any] | _ObjectToolEntry[ObjectT]
def with_object[ObjectT](
object_type: type[ObjectT],
/,
) -> Callable[[_ObjectToolCallable[ObjectT]], _ObjectToolEntry[ObjectT]]:
"""Mark a tool as requiring the bound object value as its first argument."""
def decorator(tool_entry: _ObjectToolCallable[ObjectT]) -> _ObjectToolEntry[ObjectT]:
_validate_object_tool_annotation(tool_entry, object_type)
return _ObjectToolEntry(tool_entry=tool_entry, object_type=object_type)
return decorator
class DynamicToolsLayerDeps[ObjectT](LayerDeps):
"""Dependencies required by ``DynamicToolsLayer``."""
object_layer: ObjectLayer[ObjectT] # pyright: ignore[reportUninitializedInstanceVariable]
@dataclass
class DynamicToolsLayer[ObjectT](PlainLayer[DynamicToolsLayerDeps[ObjectT]]):
"""Layer that exposes plain tools and object-bound tools."""
tool_entries: Sequence[_DynamicToolEntry[ObjectT]] = ()
@property
def tools(self) -> list[Callable[..., Any]]:
object_value = self.deps.object_layer.value
return [
_bind_object_argument(tool_entry.tool_entry, object_value, tool_entry.object_type)
if isinstance(tool_entry, _ObjectToolEntry)
else tool_entry
for tool_entry in self.tool_entries
]
def _bind_object_argument[ObjectT](
tool_entry: _ObjectToolCallable[ObjectT],
object_value: ObjectT,
object_type: type[ObjectT] | None,
) -> Callable[..., Any]:
_validate_object_value(tool_entry, object_value, object_type)
if iscoroutinefunction(tool_entry):
wrapped = _async_object_wrapper(tool_entry, object_value)
else:
wrapped = _sync_object_wrapper(tool_entry, object_value)
public_signature = _public_tool_signature(tool_entry)
if public_signature is not None:
setattr(wrapped, "__signature__", public_signature)
_set_public_annotations(wrapped, tool_entry)
return wrapped
def _validate_object_tool_annotation[ObjectT](
tool_entry: _ObjectToolCallable[ObjectT],
object_type: type[ObjectT],
) -> None:
parameter = _first_object_parameter(tool_entry)
if parameter is None:
return
annotation = _parameter_annotation(tool_entry, parameter)
if annotation is Parameter.empty:
return
if _annotation_accepts_object_type(annotation, object_type):
return
raise TypeError(
f"Object-bound tool '{_tool_name(tool_entry)}' first parameter should accept "
f"'{_type_name(object_type)}'."
)
def _first_object_parameter(tool_entry: Callable[..., Any]) -> Parameter | None:
try:
tool_signature = signature(tool_entry)
except (TypeError, ValueError):
return None
parameters = list(tool_signature.parameters.values())
if not parameters:
raise ValueError("Dynamic tools must accept the object dependency as their first parameter.")
return parameters[0]
def _parameter_annotation(tool_entry: Callable[..., Any], parameter: Parameter) -> object:
try:
type_hints = get_type_hints(tool_entry, include_extras=True)
except (AttributeError, NameError, TypeError):
return parameter.annotation
return type_hints.get(parameter.name, parameter.annotation)
def _annotation_accepts_object_type(annotation: object, object_type: type[Any]) -> bool:
if annotation is Any or annotation is Parameter.empty:
return True
origin = get_origin(annotation)
if origin is Annotated:
args = get_args(annotation)
return True if not args else _annotation_accepts_object_type(args[0], object_type)
if origin in (UnionType, Union):
return any(
arg is type(None) or _annotation_accepts_object_type(arg, object_type)
for arg in get_args(annotation)
)
runtime_type = origin or annotation
if not isinstance(runtime_type, type):
return True
try:
return issubclass(object_type, runtime_type)
except TypeError:
return True
def _validate_object_value[ObjectT](
tool_entry: _ObjectToolCallable[ObjectT],
object_value: ObjectT,
object_type: type[ObjectT] | None,
) -> None:
if object_type is None or isinstance(object_value, object_type):
return
raise TypeError(
f"Object-bound tool '{_tool_name(tool_entry)}' expected object dependency "
f"of type '{_type_name(object_type)}', but got '{type(object_value).__qualname__}'."
)
def _tool_name(tool_entry: Callable[..., Any]) -> str:
return getattr(tool_entry, "__qualname__", getattr(tool_entry, "__name__", repr(tool_entry)))
def _type_name(object_type: type[Any]) -> str:
return object_type.__qualname__
def _sync_object_wrapper[ObjectT](
tool_entry: _ObjectToolCallable[ObjectT],
object_value: ObjectT,
) -> Callable[..., Any]:
@wraps(tool_entry)
def wrapped(*args: Any, **kwargs: Any) -> Any:
return tool_entry(object_value, *args, **kwargs)
return wrapped
def _async_object_wrapper[ObjectT](
tool_entry: _ObjectToolCallable[ObjectT],
object_value: ObjectT,
) -> Callable[..., Any]:
@wraps(tool_entry)
async def wrapped(*args: Any, **kwargs: Any) -> Any:
return await tool_entry(object_value, *args, **kwargs)
return wrapped
def _public_tool_signature(tool_entry: Callable[..., Any]) -> Signature | None:
try:
tool_signature = signature(tool_entry)
except (TypeError, ValueError):
return None
parameters = list(tool_signature.parameters.values())
if not parameters:
raise ValueError("Dynamic tools must accept the object dependency as their first parameter.")
return tool_signature.replace(parameters=parameters[1:])
def _set_public_annotations(wrapper: Callable[..., Any], tool_entry: Callable[..., Any]) -> None:
annotations = getattr(tool_entry, "__annotations__", None)
if not isinstance(annotations, dict):
return
try:
parameters = list(signature(tool_entry).parameters)
except (TypeError, ValueError):
parameters = []
public_annotations = dict(annotations)
if parameters:
public_annotations.pop(parameters[0], None)
wrapper.__annotations__ = public_annotations
__all__ = [
"DynamicToolsLayer",
"DynamicToolsLayerDeps",
"with_object",
]

View File

@ -0,0 +1,13 @@
"""Reusable collection layers for the pydantic-ai layer family."""
from agenton_collections.pydantic_ai.bridge import (
PydanticAIBridgeLayer,
PydanticAIBridgeLayerDeps,
PydanticAIPrompts,
)
__all__ = [
"PydanticAIBridgeLayer",
"PydanticAIBridgeLayerDeps",
"PydanticAIPrompts",
]

View File

@ -0,0 +1,70 @@
"""Pydantic AI bridge prompt and tool layer.
This module keeps pydantic-ai's callable shapes intact through
``PydanticAILayer``. The bridge layer depends on ``ObjectLayer`` so callers have
one explicit graph node that provides the object used as
``RunContext[ObjectT].deps`` in pydantic-ai prompt and tool callables.
"""
from collections.abc import Sequence
from dataclasses import dataclass
from typing_extensions import override
from agenton.layers.base import LayerDeps
from agenton.layers.types import PydanticAILayer, PydanticAIPrompt, PydanticAITool
from agenton_collections.plain.basic import ObjectLayer
type PydanticAIPrompts[ObjectT] = PydanticAIPrompt[ObjectT] | Sequence[PydanticAIPrompt[ObjectT]]
class PydanticAIBridgeLayerDeps[ObjectT](LayerDeps):
"""Dependencies required by ``PydanticAIBridgeLayer``."""
object_layer: ObjectLayer[ObjectT] # pyright: ignore[reportUninitializedInstanceVariable]
@dataclass
class PydanticAIBridgeLayer[ObjectT](
PydanticAILayer[PydanticAIBridgeLayerDeps[ObjectT], ObjectT]
):
"""Bridge layer for pydantic-ai prompts and tools using one object deps."""
prefix: PydanticAIPrompts[ObjectT] = ()
suffix: PydanticAIPrompts[ObjectT] = ()
tool_entries: Sequence[PydanticAITool[ObjectT]] = ()
@property
def run_deps(self) -> ObjectT:
"""Object to pass as pydantic-ai run deps for this layer."""
return self.deps.object_layer.value
@property
@override
def prefix_prompts(self) -> list[PydanticAIPrompt[ObjectT]]:
return _normalize_prompts(self.prefix)
@property
@override
def suffix_prompts(self) -> list[PydanticAIPrompt[ObjectT]]:
return _normalize_prompts(self.suffix)
@property
@override
def tools(self) -> list[PydanticAITool[ObjectT]]:
return list(self.tool_entries)
def _normalize_prompts[ObjectT](
prompts: PydanticAIPrompts[ObjectT],
) -> list[PydanticAIPrompt[ObjectT]]:
if isinstance(prompts, str) or callable(prompts):
return [prompts]
return list(prompts)
__all__ = [
"PydanticAIBridgeLayer",
"PydanticAIBridgeLayerDeps",
"PydanticAIPrompts",
]

View File

@ -0,0 +1 @@

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,15 @@
import pytest
from agenton.layers import LayerDeps
from agenton_collections.plain import ObjectLayer, PromptLayer
class ObjectLayerDeps(LayerDeps):
"""Deps container used to exercise runtime dependency validation."""
object_layer: ObjectLayer[str] # pyright: ignore[reportUninitializedInstanceVariable]
def test_layer_deps_rejects_mismatched_runtime_layer_class() -> None:
with pytest.raises(TypeError, match="should be of type 'ObjectLayer'"):
ObjectLayerDeps(object_layer=PromptLayer())

View File

@ -0,0 +1 @@

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,75 @@
import asyncio
import pytest
from pydantic_ai import Tool
from agenton_collections.plain import DynamicToolsLayer, ObjectLayer, with_object
class Profile:
"""Profile object used by object-bound tool tests."""
name: str
def __init__(self, name: str) -> None:
self.name = name
class OtherProfile:
"""Different runtime object used to trigger object mismatch checks."""
@with_object(Profile)
def greet(profile: Profile, topic: str) -> str:
return f"{profile.name}: {topic}"
def test_with_object_rejects_tool_without_object_parameter() -> None:
def tool() -> str:
return "unused"
with pytest.raises(ValueError, match="must accept the object dependency"):
with_object(Profile)(tool) # pyright: ignore[reportArgumentType]
def test_with_object_rejects_first_parameter_annotation_mismatch() -> None:
def tool(profile: OtherProfile) -> str:
return repr(profile)
with pytest.raises(TypeError, match="first parameter should accept 'Profile'"):
with_object(Profile)(tool) # pyright: ignore[reportArgumentType]
def test_dynamic_tools_layer_rejects_mismatched_runtime_object_value() -> None:
layer = DynamicToolsLayer[Profile](tool_entries=(greet,))
layer.bind_deps({"object_layer": ObjectLayer[OtherProfile](OtherProfile())})
with pytest.raises(TypeError, match="expected object dependency of type 'Profile'"):
layer.tools
def public_greet(topic: str) -> str:
return f"Ada: {topic}"
def test_dynamic_tools_layer_binds_object_as_pydantic_ai_equivalent_tool() -> None:
layer = DynamicToolsLayer[Profile](tool_entries=(greet,))
layer.bind_deps({"object_layer": ObjectLayer[Profile](Profile("Ada"))})
expected_tool = Tool(public_greet, name="greet")
dynamic_tool = Tool(layer.tools[0], name="greet")
dynamic_result = asyncio.run(
dynamic_tool.function_schema.call(
{"topic": "layer composition"},
None, # pyright: ignore[reportArgumentType]
)
)
expected_result = asyncio.run(
expected_tool.function_schema.call(
{"topic": "layer composition"},
None, # pyright: ignore[reportArgumentType]
)
)
assert dynamic_tool.tool_def == expected_tool.tool_def
assert dynamic_result == expected_result