mirror of
https://github.com/langgenius/dify.git
synced 2026-05-09 21:28:25 +08:00
update the signal of compositor
This commit is contained in:
parent
3a5477b39a
commit
31a1de4828
@ -46,12 +46,12 @@ class TraceLayer(PlainLayer[NoLayerDeps]):
|
||||
self.events.append("create")
|
||||
|
||||
@override
|
||||
async def on_context_tmp_leave(self, control: LayerControl) -> None:
|
||||
self.events.append("tmp_leave")
|
||||
async def on_context_suspend(self, control: LayerControl) -> None:
|
||||
self.events.append("suspend")
|
||||
|
||||
@override
|
||||
async def on_context_reenter(self, control: LayerControl) -> None:
|
||||
self.events.append("reenter")
|
||||
async def on_context_resume(self, control: LayerControl) -> None:
|
||||
self.events.append("resume")
|
||||
|
||||
@override
|
||||
async def on_context_delete(self, control: LayerControl) -> None:
|
||||
@ -132,7 +132,7 @@ async def main() -> None:
|
||||
print([tool.value("layer composition") for tool in compositor.tools])
|
||||
|
||||
async with compositor.enter() as lifecycle_control:
|
||||
lifecycle_control.tmp_leave = True
|
||||
lifecycle_control.suspend_on_exit()
|
||||
async with compositor.enter(lifecycle_control):
|
||||
pass
|
||||
print("\nLifecycle:", trace.events)
|
||||
|
||||
@ -4,10 +4,12 @@ from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
|
||||
from pydantic_ai import Agent, RunContext
|
||||
from pydantic_ai.messages import BuiltinToolCallPart, ModelMessage, ToolCallPart
|
||||
from pydantic_ai.models.openai import OpenAIChatModel # pyright: ignore[reportDeprecated]
|
||||
from pydantic_ai.models.test import TestModel
|
||||
|
||||
from agenton.compositor import Compositor, CompositorLayerConfig
|
||||
@ -90,8 +92,13 @@ async def main() -> None:
|
||||
)
|
||||
|
||||
async with compositor.enter():
|
||||
model = (
|
||||
OpenAIChatModel("gpt-5.5") # pyright: ignore[reportDeprecated]
|
||||
if os.getenv("OPENAI_API_KEY")
|
||||
else TestModel()
|
||||
)
|
||||
agent = Agent[AgentProfile](
|
||||
model=TestModel(call_tools=["count_words", "write_tagline"]),
|
||||
model=model,
|
||||
deps_type=AgentProfile,
|
||||
tools=compositor.tools,
|
||||
)
|
||||
|
||||
@ -6,17 +6,21 @@ exposed prompt/tool item types by annotating construction or assignment sites.
|
||||
When only the first two type arguments are supplied, ``LayerPromptT`` and
|
||||
``LayerToolT`` default to the corresponding exposed item types.
|
||||
|
||||
Layer instances are shared graph/capability definitions owned by the compositor.
|
||||
Per-session runtime state belongs to each session's ``LayerControl`` objects,
|
||||
not to the shared layer instances, so different sessions can enter the same
|
||||
compositor without leaking generated ids or handles through ``self``.
|
||||
|
||||
Dependency mappings use layer-local dependency names as keys and compositor
|
||||
layer names as values. Prompt aggregation depends on insertion order: prefix
|
||||
prompts are collected from first to last layer, while suffix prompts are
|
||||
collected in reverse.
|
||||
|
||||
``Compositor.enter`` enters layers in compositor order and exits them in
|
||||
reverse order through ``AsyncExitStack``. It accepts an optional
|
||||
``CompositorControl`` whose keys must match the compositor layer names. When
|
||||
omitted, one is created from the compositor's layer names. Reuse the same
|
||||
``CompositorControl`` after setting ``tmp_leave`` to reenter those layer
|
||||
contexts.
|
||||
``Compositor.enter`` enters layers in compositor order and exits them in reverse
|
||||
order through ``AsyncExitStack``. It accepts an optional ``CompositorSession``
|
||||
whose layer controls must match the compositor layer names and order. When
|
||||
omitted, a fresh session is created. Reusing a suspended session resumes its
|
||||
layer contexts; closed sessions must be replaced.
|
||||
|
||||
Optional prompt and tool transformers run after layer aggregation. The
|
||||
compositor asks each layer to ``wrap_prompt`` and ``wrap_tool`` its native
|
||||
@ -35,7 +39,7 @@ from typing import TYPE_CHECKING, Annotated, Any, Generic, Mapping, TypedDict, c
|
||||
from pydantic import AfterValidator, BaseModel, ConfigDict, Field, JsonValue
|
||||
from typing_extensions import Self, TypeVar
|
||||
|
||||
from agenton.layers.base import Layer, LayerControl
|
||||
from agenton.layers.base import Layer, LayerControl, LifecycleState
|
||||
from agenton.layers.types import AllPromptTypes, AllToolTypes
|
||||
|
||||
PromptT = TypeVar("PromptT", default=AllPromptTypes)
|
||||
@ -150,8 +154,15 @@ def _validate_compositor_config_input(value: CompositorConfigValue) -> Composito
|
||||
return _validate_config_model_input(CompositorConfig, value)
|
||||
|
||||
|
||||
class CompositorControl:
|
||||
"""External controls for layer entry contexts entered by a compositor."""
|
||||
class CompositorSession:
|
||||
"""External lifecycle session for layer contexts entered by a compositor.
|
||||
|
||||
A session owns one ``LayerControl`` per compositor layer name, preserving
|
||||
compositor order. Broadcast methods are convenience APIs for setting every
|
||||
layer's per-entry exit intent; ``layer`` allows explicit per-layer control
|
||||
when callers need partial suspend/delete behavior. A mixed session with any
|
||||
closed layer cannot be entered again because compositor entry is all-or-none.
|
||||
"""
|
||||
|
||||
__slots__ = ("layer_controls",)
|
||||
|
||||
@ -162,15 +173,19 @@ class CompositorControl:
|
||||
(layer_name, LayerControl()) for layer_name in layer_names
|
||||
)
|
||||
|
||||
@property
|
||||
def tmp_leave(self) -> bool:
|
||||
"""Whether any entered layer control is marked for temporary leave."""
|
||||
return any(control.tmp_leave for control in self.layer_controls.values())
|
||||
|
||||
@tmp_leave.setter
|
||||
def tmp_leave(self, value: bool) -> None:
|
||||
def suspend_on_exit(self) -> None:
|
||||
"""Request suspend behavior for every layer when this entry exits."""
|
||||
for control in self.layer_controls.values():
|
||||
control.tmp_leave = value
|
||||
control.suspend_on_exit()
|
||||
|
||||
def delete_on_exit(self) -> None:
|
||||
"""Request delete behavior for every layer when this entry exits."""
|
||||
for control in self.layer_controls.values():
|
||||
control.delete_on_exit()
|
||||
|
||||
def layer(self, name: str) -> LayerControl:
|
||||
"""Return the layer control for ``name`` or raise ``KeyError``."""
|
||||
return self.layer_controls[name]
|
||||
|
||||
|
||||
@dataclass(kw_only=True)
|
||||
@ -240,35 +255,52 @@ class Compositor(Generic[PromptT, ToolT, LayerPromptT, LayerToolT]):
|
||||
layer.bind_deps({**self.layers, **deps})
|
||||
self._deps_bound = True
|
||||
|
||||
def new_session(self) -> CompositorSession:
|
||||
"""Create a fresh lifecycle session matching this compositor's layer order."""
|
||||
return CompositorSession(self.layers)
|
||||
|
||||
@asynccontextmanager
|
||||
async def enter(
|
||||
self,
|
||||
control: CompositorControl | None = None,
|
||||
) -> AsyncIterator[CompositorControl]:
|
||||
"""Enter each layer context in order and yield compositor control."""
|
||||
session: CompositorSession | None = None,
|
||||
) -> AsyncIterator[CompositorSession]:
|
||||
"""Enter each layer context in order and yield the active session."""
|
||||
if not self._deps_bound:
|
||||
raise RuntimeError("Compositor deps must be bound before entering context.")
|
||||
|
||||
if control is None:
|
||||
control = CompositorControl(self.layers)
|
||||
self._validate_control(control)
|
||||
if session is None:
|
||||
session = self.new_session()
|
||||
self._validate_session(session)
|
||||
self._ensure_session_can_enter(session)
|
||||
|
||||
async with AsyncExitStack() as stack:
|
||||
for layer_name, layer in self.layers.items():
|
||||
await stack.enter_async_context(layer.enter(control.layer_controls[layer_name]))
|
||||
yield control
|
||||
await stack.enter_async_context(layer.enter(session.layer_controls[layer_name]))
|
||||
yield session
|
||||
|
||||
def _validate_control(self, control: CompositorControl) -> None:
|
||||
def _validate_session(self, session: CompositorSession) -> None:
|
||||
expected_layer_names = tuple(self.layers)
|
||||
actual_layer_names = tuple(control.layer_controls)
|
||||
actual_layer_names = tuple(session.layer_controls)
|
||||
if actual_layer_names != expected_layer_names:
|
||||
expected = ", ".join(expected_layer_names)
|
||||
actual = ", ".join(actual_layer_names)
|
||||
raise ValueError(
|
||||
"CompositorControl layer names must match compositor layers in order. "
|
||||
"CompositorSession layer names must match compositor layers in order. "
|
||||
f"Expected [{expected}], got [{actual}]."
|
||||
)
|
||||
|
||||
def _ensure_session_can_enter(self, session: CompositorSession) -> None:
|
||||
"""Reject active or closed layer controls before any layer side effects."""
|
||||
for control in session.layer_controls.values():
|
||||
if control.state is LifecycleState.ACTIVE:
|
||||
raise RuntimeError(
|
||||
"LayerControl is already active; duplicate or nested enter is not allowed."
|
||||
)
|
||||
if control.state is LifecycleState.CLOSED:
|
||||
raise RuntimeError(
|
||||
"LayerControl is closed; create a new compositor session before entering again."
|
||||
)
|
||||
|
||||
@property
|
||||
def prompts(self) -> list[PromptT]:
|
||||
result: list[LayerPromptT] = []
|
||||
@ -301,7 +333,7 @@ __all__ = [
|
||||
"CompositorConfig",
|
||||
"CompositorConfigValue",
|
||||
"CompositorLayerConfigInput",
|
||||
"CompositorControl",
|
||||
"CompositorSession",
|
||||
"CompositorTransformer",
|
||||
"CompositorTransformerKwargs",
|
||||
"CompositorLayerConfig",
|
||||
|
||||
@ -5,7 +5,7 @@
|
||||
families while keeping concrete reusable layers in ``agenton_collections``.
|
||||
"""
|
||||
|
||||
from agenton.layers.base import Layer, LayerControl, LayerDeps, NoLayerDeps
|
||||
from agenton.layers.base import ExitIntent, Layer, LayerControl, LayerDeps, LifecycleState, NoLayerDeps
|
||||
from agenton.layers.types import (
|
||||
AllPromptTypes,
|
||||
AllToolTypes,
|
||||
@ -27,6 +27,8 @@ __all__ = [
|
||||
"Layer",
|
||||
"LayerDeps",
|
||||
"LayerControl",
|
||||
"LifecycleState",
|
||||
"ExitIntent",
|
||||
"NoLayerDeps",
|
||||
"PlainLayer",
|
||||
"PlainPrompt",
|
||||
|
||||
@ -11,10 +11,15 @@ inheritance patterns.
|
||||
implementations should treat ``self.deps`` as unavailable until a compositor or
|
||||
caller has resolved and bound dependencies.
|
||||
|
||||
Layer async entry uses a caller-provided bool control to distinguish permanent
|
||||
exits from temporary exits. The control is also the external lifecycle state:
|
||||
reuse a ``tmp_leave`` control to reenter, or pass a fresh control to start from
|
||||
create logic.
|
||||
Layer async entry uses a caller-provided ``LayerControl`` as an explicit state
|
||||
machine and per-session runtime owner. A fresh control starts in
|
||||
``LifecycleState.NEW`` and enters create logic. A suspended control resumes,
|
||||
while active or closed controls are rejected to prevent ambiguous nested or
|
||||
post-delete reuse. Exit behavior is selected per entry with ``ExitIntent`` and
|
||||
resets to delete on every successful enter. Layer instances are shared graph and
|
||||
capability definitions, so session-local ids, handles, clients, and other
|
||||
runtime values generated by lifecycle hooks belong in
|
||||
``LayerControl.runtime_state`` rather than on ``self``.
|
||||
|
||||
``Layer`` is framework-neutral over prompt and tool item types. The native
|
||||
``prefix_prompts``, ``suffix_prompts``, and ``tools`` properties are the layer
|
||||
@ -26,7 +31,8 @@ implement them to tag native values without changing layer implementations.
|
||||
from abc import ABC, abstractmethod
|
||||
from collections.abc import AsyncIterator
|
||||
from contextlib import AbstractAsyncContextManager, asynccontextmanager
|
||||
from dataclasses import dataclass
|
||||
from dataclasses import dataclass, field
|
||||
from enum import StrEnum
|
||||
from types import UnionType
|
||||
from typing import Any, Mapping, Sequence, Union, cast, get_args, get_origin, get_type_hints
|
||||
|
||||
@ -73,18 +79,51 @@ class NoLayerDeps(LayerDeps):
|
||||
"""Dependency container for layers that do not require other layers."""
|
||||
|
||||
|
||||
class LifecycleState(StrEnum):
|
||||
"""Externally observable lifecycle state for a layer control."""
|
||||
|
||||
NEW = "new"
|
||||
ACTIVE = "active"
|
||||
SUSPENDED = "suspended"
|
||||
CLOSED = "closed"
|
||||
|
||||
|
||||
class ExitIntent(StrEnum):
|
||||
"""Per-entry exit behavior requested for a layer control."""
|
||||
|
||||
DELETE = "delete"
|
||||
SUSPEND = "suspend"
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class LayerControl:
|
||||
"""Control slot passed into a layer entry context.
|
||||
"""Stateful control slot passed into a layer entry context.
|
||||
|
||||
``Layer.enter`` requires the caller to provide this object. Set
|
||||
``tmp_leave`` before leaving the context to run temporary-leave logic
|
||||
instead of delete logic. Reusing that same control on a later entry will
|
||||
consume ``tmp_leave`` and run reenter logic; using a fresh control starts
|
||||
from create logic.
|
||||
``Layer.enter`` requires the caller to provide this object. The control owns
|
||||
the layer lifecycle state, the current entry's exit intent, and arbitrary
|
||||
per-session runtime state. Call ``suspend_on_exit`` before leaving the
|
||||
context to make a later entry resume; call ``delete_on_exit`` or do nothing
|
||||
for the default delete behavior. Store session-local ids and resource
|
||||
handles in ``runtime_state`` so concurrent or later sessions do not share
|
||||
mutable runtime data through the layer instance.
|
||||
|
||||
``runtime_state`` intentionally persists after suspend and delete. Suspend,
|
||||
resume, and delete hooks can inspect the same values created on entry, and
|
||||
callers may inspect closed-session diagnostics after exit. Reuse is still
|
||||
governed by ``state``: a closed control cannot be entered again.
|
||||
"""
|
||||
|
||||
tmp_leave: bool = False
|
||||
state: LifecycleState = LifecycleState.NEW
|
||||
exit_intent: ExitIntent = ExitIntent.DELETE
|
||||
runtime_state: dict[str, object] = field(default_factory=dict)
|
||||
|
||||
def suspend_on_exit(self) -> None:
|
||||
"""Request suspend behavior when the current layer entry exits."""
|
||||
self.exit_intent = ExitIntent.SUSPEND
|
||||
|
||||
def delete_on_exit(self) -> None:
|
||||
"""Request delete behavior when the current layer entry exits."""
|
||||
self.exit_intent = ExitIntent.DELETE
|
||||
|
||||
|
||||
@dataclass(frozen=True, slots=True)
|
||||
@ -100,10 +139,12 @@ class Layer[DepsT: LayerDeps, PromptT, ToolT](ABC):
|
||||
|
||||
Subclasses expose optional prompt fragments and tools through typed
|
||||
properties. They declare required dependencies in the ``DepsT`` container
|
||||
rather than by accepting dependencies in ``__init__``. The default async
|
||||
context manager handles create, delete, temporary-leave, and reenter
|
||||
transitions; layers can override ``enter`` when they need to wrap extra
|
||||
runtime resources.
|
||||
rather than by accepting dependencies in ``__init__``. Layer instances can be
|
||||
entered by multiple sessions, including concurrently, so lifecycle hooks
|
||||
should store session-local runtime values on the passed ``LayerControl``.
|
||||
The default async context manager handles create, resume, suspend, and
|
||||
delete transitions; layers can override ``enter`` when they need to wrap
|
||||
extra runtime resources.
|
||||
"""
|
||||
|
||||
deps_type: type[DepsT]
|
||||
@ -164,33 +205,45 @@ class Layer[DepsT: LayerDeps, PromptT, ToolT](ABC):
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifecycle_enter(self, control: LayerControl) -> AsyncIterator[None]:
|
||||
"""Run the default create/reenter and delete/temporary-leave lifecycle."""
|
||||
was_tmp_left = control.tmp_leave
|
||||
control.tmp_leave = False
|
||||
if was_tmp_left:
|
||||
await self.on_context_reenter(control)
|
||||
else:
|
||||
"""Run the default explicit lifecycle state machine for one entry."""
|
||||
if control.state is LifecycleState.NEW:
|
||||
control.exit_intent = ExitIntent.DELETE
|
||||
await self.on_context_create(control)
|
||||
control.state = LifecycleState.ACTIVE
|
||||
elif control.state is LifecycleState.SUSPENDED:
|
||||
control.exit_intent = ExitIntent.DELETE
|
||||
await self.on_context_resume(control)
|
||||
control.state = LifecycleState.ACTIVE
|
||||
elif control.state is LifecycleState.ACTIVE:
|
||||
raise RuntimeError(
|
||||
"LayerControl is already active; duplicate or nested enter is not allowed."
|
||||
)
|
||||
elif control.state is LifecycleState.CLOSED:
|
||||
raise RuntimeError(
|
||||
"LayerControl is closed; create a new compositor session before entering again."
|
||||
)
|
||||
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
if control.tmp_leave:
|
||||
await self.on_context_tmp_leave(control)
|
||||
if control.exit_intent is ExitIntent.SUSPEND:
|
||||
await self.on_context_suspend(control)
|
||||
control.state = LifecycleState.SUSPENDED
|
||||
else:
|
||||
await self.on_context_delete(control)
|
||||
control.state = LifecycleState.CLOSED
|
||||
|
||||
async def on_context_create(self, control: LayerControl) -> None:
|
||||
"""Run when the layer context is entered from a non-temporary state."""
|
||||
"""Run when the layer context is entered from ``LifecycleState.NEW``."""
|
||||
|
||||
async def on_context_delete(self, control: LayerControl) -> None:
|
||||
"""Run when the layer context exits without ``tmp_leave`` set."""
|
||||
"""Run when the layer context exits with ``ExitIntent.DELETE``."""
|
||||
|
||||
async def on_context_tmp_leave(self, control: LayerControl) -> None:
|
||||
"""Run when the layer context exits with ``tmp_leave`` set."""
|
||||
async def on_context_suspend(self, control: LayerControl) -> None:
|
||||
"""Run when the layer context exits with ``ExitIntent.SUSPEND``."""
|
||||
|
||||
async def on_context_reenter(self, control: LayerControl) -> None:
|
||||
"""Run when the layer context enters after a temporary leave."""
|
||||
async def on_context_resume(self, control: LayerControl) -> None:
|
||||
"""Run when the layer context enters from ``LifecycleState.SUSPENDED``."""
|
||||
|
||||
@property
|
||||
def prefix_prompts(self) -> Sequence[PromptT]:
|
||||
|
||||
@ -1,11 +1,21 @@
|
||||
import asyncio
|
||||
from collections import OrderedDict
|
||||
from collections.abc import Iterator
|
||||
from dataclasses import dataclass, field
|
||||
from itertools import count
|
||||
|
||||
from typing_extensions import override
|
||||
|
||||
from agenton.compositor import Compositor, CompositorControl
|
||||
from agenton.layers import LayerControl, NoLayerDeps, PlainLayer, PlainPromptType, PlainToolType
|
||||
from agenton.compositor import Compositor, CompositorSession
|
||||
from agenton.layers import (
|
||||
ExitIntent,
|
||||
LayerControl,
|
||||
LifecycleState,
|
||||
NoLayerDeps,
|
||||
PlainLayer,
|
||||
PlainPromptType,
|
||||
PlainToolType,
|
||||
)
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
@ -19,81 +29,259 @@ class TraceLayer(PlainLayer[NoLayerDeps]):
|
||||
self.events.append("create")
|
||||
|
||||
@override
|
||||
async def on_context_tmp_leave(self, control: LayerControl) -> None:
|
||||
self.events.append("tmp_leave")
|
||||
async def on_context_suspend(self, control: LayerControl) -> None:
|
||||
self.events.append("suspend")
|
||||
|
||||
@override
|
||||
async def on_context_reenter(self, control: LayerControl) -> None:
|
||||
self.events.append("reenter")
|
||||
async def on_context_resume(self, control: LayerControl) -> None:
|
||||
self.events.append("resume")
|
||||
|
||||
@override
|
||||
async def on_context_delete(self, control: LayerControl) -> None:
|
||||
self.events.append("delete")
|
||||
|
||||
|
||||
def test_compositor_enter_creates_control_and_applies_tmp_leave_to_all_layers() -> None:
|
||||
first_layer = TraceLayer()
|
||||
second_layer = TraceLayer()
|
||||
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
|
||||
layers=OrderedDict(
|
||||
[
|
||||
("first", first_layer),
|
||||
("second", second_layer),
|
||||
]
|
||||
)
|
||||
)
|
||||
compositor_control = CompositorControl(compositor.layers)
|
||||
def _compositor(*layer_names: str) -> tuple[Compositor[PlainPromptType, PlainToolType], dict[str, TraceLayer]]:
|
||||
layers = {layer_name: TraceLayer() for layer_name in layer_names}
|
||||
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict(layers.items()))
|
||||
return compositor, layers
|
||||
|
||||
|
||||
def test_compositor_session_suspends_resumes_and_deletes_all_layers() -> None:
|
||||
compositor, layers = _compositor("first", "second")
|
||||
session = compositor.new_session()
|
||||
|
||||
async def run() -> None:
|
||||
async with compositor.enter(compositor_control) as control:
|
||||
assert control is compositor_control
|
||||
assert list(control.layer_controls) == ["first", "second"]
|
||||
control.tmp_leave = True
|
||||
async with compositor.enter(session) as active_session:
|
||||
assert active_session is session
|
||||
assert list(active_session.layer_controls) == ["first", "second"]
|
||||
active_session.suspend_on_exit()
|
||||
assert active_session.layer("first").exit_intent is ExitIntent.SUSPEND
|
||||
|
||||
async with compositor.enter(compositor_control):
|
||||
assert session.layer("first").state is LifecycleState.SUSPENDED
|
||||
|
||||
async with compositor.enter(session):
|
||||
pass
|
||||
|
||||
asyncio.run(run())
|
||||
|
||||
assert first_layer.events == ["create", "tmp_leave", "reenter", "delete"]
|
||||
assert second_layer.events == ["create", "tmp_leave", "reenter", "delete"]
|
||||
assert layers["first"].events == ["create", "suspend", "resume", "delete"]
|
||||
assert layers["second"].events == ["create", "suspend", "resume", "delete"]
|
||||
assert session.layer("first").state is LifecycleState.CLOSED
|
||||
|
||||
|
||||
def test_compositor_enter_does_not_store_tmp_leave_on_layer() -> None:
|
||||
layer = TraceLayer()
|
||||
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
|
||||
layers=OrderedDict([("trace", layer)])
|
||||
)
|
||||
def test_compositor_enter_without_session_uses_fresh_lifecycle_each_time() -> None:
|
||||
compositor, layers = _compositor("trace")
|
||||
|
||||
async def run() -> None:
|
||||
async with compositor.enter() as control:
|
||||
control.tmp_leave = True
|
||||
async with compositor.enter() as session:
|
||||
session.suspend_on_exit()
|
||||
|
||||
async with compositor.enter():
|
||||
pass
|
||||
|
||||
asyncio.run(run())
|
||||
|
||||
assert layer.events == ["create", "tmp_leave", "create", "delete"]
|
||||
assert layers["trace"].events == ["create", "suspend", "create", "delete"]
|
||||
|
||||
|
||||
def test_compositor_enter_rejects_control_with_mismatched_layer_names() -> None:
|
||||
layer = TraceLayer()
|
||||
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
|
||||
layers=OrderedDict([("trace", layer)])
|
||||
)
|
||||
compositor_control = CompositorControl(["other"])
|
||||
def test_compositor_enter_rejects_session_with_mismatched_layer_names() -> None:
|
||||
compositor, _layers = _compositor("trace")
|
||||
session = CompositorSession(["other"])
|
||||
|
||||
async def run() -> None:
|
||||
async with compositor.enter(compositor_control):
|
||||
async with compositor.enter(session):
|
||||
pass
|
||||
|
||||
try:
|
||||
asyncio.run(run())
|
||||
except ValueError as e:
|
||||
assert str(e) == (
|
||||
"CompositorControl layer names must match compositor layers in order. "
|
||||
"CompositorSession layer names must match compositor layers in order. "
|
||||
"Expected [trace], got [other]."
|
||||
)
|
||||
else:
|
||||
raise AssertionError("Expected ValueError.")
|
||||
|
||||
|
||||
def test_compositor_enter_rejects_same_active_session_nested() -> None:
|
||||
compositor, _layers = _compositor("trace")
|
||||
session = compositor.new_session()
|
||||
|
||||
async def run() -> None:
|
||||
async with compositor.enter(session):
|
||||
async with compositor.enter(session):
|
||||
pass
|
||||
|
||||
try:
|
||||
asyncio.run(run())
|
||||
except RuntimeError as e:
|
||||
assert str(e) == "LayerControl is already active; duplicate or nested enter is not allowed."
|
||||
else:
|
||||
raise AssertionError("Expected RuntimeError.")
|
||||
|
||||
|
||||
def test_compositor_enter_rejects_closed_session() -> None:
|
||||
compositor, _layers = _compositor("trace")
|
||||
session = compositor.new_session()
|
||||
|
||||
async def run() -> None:
|
||||
async with compositor.enter(session):
|
||||
pass
|
||||
|
||||
async with compositor.enter(session):
|
||||
pass
|
||||
|
||||
try:
|
||||
asyncio.run(run())
|
||||
except RuntimeError as e:
|
||||
assert str(e) == "LayerControl is closed; create a new compositor session before entering again."
|
||||
else:
|
||||
raise AssertionError("Expected RuntimeError.")
|
||||
|
||||
|
||||
def test_per_layer_suspend_on_exit_only_resumes_that_layer() -> None:
|
||||
compositor, layers = _compositor("first", "second")
|
||||
session = compositor.new_session()
|
||||
|
||||
async def run() -> None:
|
||||
async with compositor.enter(session):
|
||||
session.layer("first").suspend_on_exit()
|
||||
|
||||
assert session.layer("first").state is LifecycleState.SUSPENDED
|
||||
assert session.layer("second").state is LifecycleState.CLOSED
|
||||
|
||||
async with compositor.enter(session):
|
||||
pass
|
||||
|
||||
try:
|
||||
asyncio.run(run())
|
||||
except RuntimeError as e:
|
||||
assert str(e) == "LayerControl is closed; create a new compositor session before entering again."
|
||||
else:
|
||||
raise AssertionError("Expected RuntimeError.")
|
||||
|
||||
assert layers["first"].events == ["create", "suspend"]
|
||||
assert layers["second"].events == ["create", "delete"]
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class FailingCreateLayer(PlainLayer[NoLayerDeps]):
|
||||
attempts: int = 0
|
||||
|
||||
@override
|
||||
async def on_context_create(self, control: LayerControl) -> None:
|
||||
self.attempts += 1
|
||||
if self.attempts == 1:
|
||||
raise RuntimeError("create failed")
|
||||
|
||||
|
||||
def test_failed_create_keeps_control_reusable_as_new() -> None:
|
||||
layer = FailingCreateLayer()
|
||||
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("trace", layer)]))
|
||||
session = compositor.new_session()
|
||||
|
||||
async def fail_then_retry() -> None:
|
||||
try:
|
||||
async with compositor.enter(session):
|
||||
pass
|
||||
except RuntimeError as e:
|
||||
assert str(e) == "create failed"
|
||||
else:
|
||||
raise AssertionError("Expected RuntimeError.")
|
||||
|
||||
assert session.layer("trace").state is LifecycleState.NEW
|
||||
|
||||
async with compositor.enter(session):
|
||||
pass
|
||||
|
||||
asyncio.run(fail_then_retry())
|
||||
|
||||
assert session.layer("trace").state is LifecycleState.CLOSED
|
||||
assert layer.attempts == 2
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class FailingResumeLayer(PlainLayer[NoLayerDeps]):
|
||||
resumed: bool = False
|
||||
|
||||
@override
|
||||
async def on_context_resume(self, control: LayerControl) -> None:
|
||||
if not self.resumed:
|
||||
self.resumed = True
|
||||
raise RuntimeError("resume failed")
|
||||
|
||||
|
||||
def test_failed_resume_keeps_control_reusable_as_suspended() -> None:
|
||||
layer = FailingResumeLayer()
|
||||
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("trace", layer)]))
|
||||
session = compositor.new_session()
|
||||
|
||||
async def suspend_fail_then_retry() -> None:
|
||||
async with compositor.enter(session) as active_session:
|
||||
active_session.suspend_on_exit()
|
||||
|
||||
try:
|
||||
async with compositor.enter(session):
|
||||
pass
|
||||
except RuntimeError as e:
|
||||
assert str(e) == "resume failed"
|
||||
else:
|
||||
raise AssertionError("Expected RuntimeError.")
|
||||
|
||||
assert session.layer("trace").state is LifecycleState.SUSPENDED
|
||||
|
||||
async with compositor.enter(session):
|
||||
pass
|
||||
|
||||
asyncio.run(suspend_fail_then_retry())
|
||||
|
||||
assert session.layer("trace").state is LifecycleState.CLOSED
|
||||
|
||||
|
||||
@dataclass(slots=True)
|
||||
class RuntimeStateLayer(PlainLayer[NoLayerDeps]):
|
||||
next_id: Iterator[int] = field(default_factory=lambda: count(1))
|
||||
|
||||
@override
|
||||
async def on_context_create(self, control: LayerControl) -> None:
|
||||
runtime_id = next(self.next_id)
|
||||
control.runtime_state["runtime_id"] = runtime_id
|
||||
|
||||
@override
|
||||
async def on_context_resume(self, control: LayerControl) -> None:
|
||||
control.runtime_state["resumed_runtime_id"] = control.runtime_state["runtime_id"]
|
||||
|
||||
@override
|
||||
async def on_context_delete(self, control: LayerControl) -> None:
|
||||
control.runtime_state["deleted_runtime_id"] = control.runtime_state["runtime_id"]
|
||||
|
||||
|
||||
def test_runtime_state_is_per_session_and_survives_suspend_resume_delete() -> None:
|
||||
layer = RuntimeStateLayer()
|
||||
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("trace", layer)]))
|
||||
first_session = compositor.new_session()
|
||||
second_session = compositor.new_session()
|
||||
|
||||
async def run() -> None:
|
||||
async with compositor.enter(first_session) as active_session:
|
||||
active_session.suspend_on_exit()
|
||||
|
||||
async with compositor.enter(second_session):
|
||||
pass
|
||||
|
||||
async with compositor.enter(first_session):
|
||||
pass
|
||||
|
||||
asyncio.run(run())
|
||||
|
||||
assert first_session.layer("trace").runtime_state == {
|
||||
"runtime_id": 1,
|
||||
"resumed_runtime_id": 1,
|
||||
"deleted_runtime_id": 1,
|
||||
}
|
||||
assert second_session.layer("trace").runtime_state == {
|
||||
"runtime_id": 2,
|
||||
"deleted_runtime_id": 2,
|
||||
}
|
||||
assert not hasattr(layer, "runtime_id")
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
from pathlib import Path
|
||||
@ -7,9 +8,13 @@ PROJECT_ROOT = Path(__file__).resolve().parents[3]
|
||||
|
||||
|
||||
def _run_example(path: str) -> subprocess.CompletedProcess[str]:
|
||||
env = os.environ.copy()
|
||||
_ = env.pop("OPENAI_API_KEY", None)
|
||||
|
||||
return subprocess.run(
|
||||
[sys.executable, path],
|
||||
cwd=PROJECT_ROOT,
|
||||
env=env,
|
||||
text=True,
|
||||
capture_output=True,
|
||||
check=False,
|
||||
@ -22,7 +27,7 @@ def test_agenton_basics_example_smoke() -> None:
|
||||
assert result.returncode == 0, result.stderr
|
||||
assert "Prompts:" in result.stdout
|
||||
assert "Tools:" in result.stdout
|
||||
assert "Lifecycle: ['create', 'tmp_leave', 'reenter', 'delete']" in result.stdout
|
||||
assert "Lifecycle: ['create', 'suspend', 'resume', 'delete']" in result.stdout
|
||||
|
||||
|
||||
def test_agenton_pydantic_ai_example_smoke() -> None:
|
||||
|
||||
Loading…
Reference in New Issue
Block a user