refactor dify-agent agenton run model

This commit is contained in:
盐粒 Yanli 2026-05-13 03:51:37 +08:00
parent b4ce54a7ea
commit ae4a3f75f4
32 changed files with 1442 additions and 966 deletions

View File

# Agenton API reference
This page summarizes the public Agenton API. Import paths are shown for symbols
commonly used by layer authors and compositor callers.
## Layers: `agenton.layers`
### `Layer[DepsT, PromptT, UserPromptT, ToolT, ConfigT, RuntimeStateT]`
Framework-neutral base class for invocation-scoped prompt/tool layers.
Class attributes:
- `type_id: str | None`: provider id for config-backed graph nodes.
- `config_type: type[LayerConfig]`: Pydantic schema for per-run layer config.
- `runtime_state_type: type[BaseModel]`: Pydantic schema for snapshot-safe
per-layer state.
- `deps_type: type[LayerDeps]`: inferred from the layer generic base or declared
explicitly.
Invocation attributes assigned by `CompositorRun`:
- `config: ConfigT`
- `deps: DepsT`
- `runtime_state: RuntimeStateT`
Construction and dependency APIs:
- `from_config(config: ConfigT) -> Self`: create a fresh layer from
schema-validated config. The default implementation supports only empty config.
- `dependency_names() -> frozenset[str]`: dependency fields declared by
`deps_type`.
- `bind_deps(deps: Mapping[str, Layer | None]) -> None`: bind direct layer
instance dependencies for one invocation.
Lifecycle hooks:
- `on_context_create() -> None`
- `on_context_resume() -> None`
- `on_context_suspend() -> None`
- `on_context_delete() -> None`
Prompt/tool authoring surfaces:
Aggregation adapters implemented by typed layer families:
- `wrap_user_prompt(prompt: UserPromptT) -> object`
- `wrap_tool(tool: ToolT) -> object`
### Schema defaults and lifecycle enums
- `LayerConfig`: base DTO for serializable layer config schemas.
- `LayerConfigValue`: JSON value or concrete `LayerConfig` DTO.
- `EmptyLayerConfig`: default config schema for layers without config.
- `EmptyRuntimeState`: default serializable runtime-state schema.
- `LayerDeps`: typed dependency container base.
- `NoLayerDeps`: dependency container for layers with no dependencies.
- `LifecycleState`: `NEW`, `ACTIVE`, `SUSPENDED`, `CLOSED`.
- `ExitIntent`: `DELETE`, `SUSPEND`.
`ACTIVE` is internal to an entered run and is rejected in external snapshots.
### Typed layer families: `agenton.layers.types`
- `PlainLayer[DepsT, ConfigT, RuntimeStateT]`
- `PydanticAILayer[DepsT, AgentDepsT, ConfigT, RuntimeStateT]`
Tagged aggregate item types:
### Config models
- `LayerNodeConfig`: `name`, `type`, `deps`, `metadata`.
- `CompositorConfig`: `schema_version`, `layers`.
- `LayerConfigInput`: accepted per-run config input for one node.
Config nodes are pure serializable graph topology. Per-run layer config is passed
separately to `Compositor.enter(configs=...)` keyed by node name.
### Providers and graph nodes
`LayerProvider[LayerT]` is a reusable validated factory for one concrete layer
class.
- `LayerProvider.from_layer_type(layer_type) -> LayerProvider`: construct through
`layer_type.from_config`.
- `LayerProvider.from_factory(layer_type=..., create=...) -> LayerProvider`:
construct through a custom typed-config factory.
- `type_id -> str | None`: provider id declared by the layer type.
- `validate_config(config=None) -> LayerConfig`: validate config without invoking
the factory.
- `create_layer(config=None) -> LayerT`: validate config and create a fresh layer.
- `create_layer_from_config(config) -> LayerT`: create from already validated
config and enforce fresh-instance semantics.
`LayerNode(name, implementation, deps=None, metadata=None)` creates a stateless
graph node from a `Layer` subclass or `LayerProvider`. `deps` maps dependency
field names on the node's layer class to other node names.
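The override rule for provider selection can be sketched in plain Python. This is an illustrative stand-in under assumed names (`FakeProvider`, `resolve_provider` are hypothetical), not the real Agenton implementation:

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class FakeProvider:
    """Hypothetical stand-in for a LayerProvider."""
    type_id: str


def resolve_provider(node_name, node_type, providers, node_providers=None):
    """Node-name providers win; otherwise fall back to the type-id table."""
    if node_providers and node_name in node_providers:
        return node_providers[node_name]
    if node_type not in providers:
        raise KeyError(f"no provider registered for type id {node_type!r}")
    return providers[node_type]


providers = {"plain.prompt": FakeProvider("plain.prompt")}
override = FakeProvider("custom")

# Without an override, the type id selects the provider.
assert resolve_provider("prompt", "plain.prompt", providers) is providers["plain.prompt"]
# A node_providers entry keyed by node name takes precedence.
assert resolve_provider("prompt", "plain.prompt", providers, {"prompt": override}) is override
```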
### `Compositor`
`Compositor[PromptT, ToolT, LayerPromptT, LayerToolT, UserPromptT, LayerUserPromptT]`
owns the ordered graph plan and provider construction plans.
Construction:
- `Compositor(nodes, prompt_transformer=None, user_prompt_transformer=None, tool_transformer=None)`.
- `Compositor.from_config(conf, providers=..., node_providers=None, prompt_transformer=None, user_prompt_transformer=None, tool_transformer=None)`.
Public properties and entry API:
- `nodes -> tuple[LayerNode, ...]`: stateless graph plan in order.
- `enter(configs=None, session_snapshot=None) -> AsyncIterator[CompositorRun]`:
validate per-run configs and optional snapshot, create fresh layers, bind direct
dependencies, enter hooks in graph order, and exit hooks in reverse order.
`providers` resolve graph node `type` ids. `node_providers` are keyed by node name
and override type-id providers for node-specific construction.
### `CompositorRun`
`CompositorRun` is the single-invocation runtime object yielded by
`Compositor.enter(...)`.
Fields:
- `slots: OrderedDict[str, LayerRunSlot]`
- `session_snapshot: CompositorSessionSnapshot | None`
Layer access and exit intent:
- `get_layer(name) -> Layer`
- `get_layer(name, layer_type) -> LayerT`
- `suspend_on_exit() -> None`
- `delete_on_exit() -> None`
- `suspend_layer_on_exit(name) -> None`
- `delete_layer_on_exit(name) -> None`
Aggregation properties:
`user_prompt_transformer`.
- `tools -> list[ToolT]`: tools in layer order, then optional `tool_transformer`.
Snapshot API:
- `snapshot_session() -> CompositorSessionSnapshot`: snapshot non-active layer
lifecycle state and runtime state.
`session_snapshot` is populated after context exit. Core run slots default to
delete-on-exit; request suspend before exit when the next snapshot must be
resumable.
### Run slots and snapshots
- `LayerRunSlot`: `layer`, `lifecycle_state`, `exit_intent`.
- `LayerSessionSnapshot`: `name`, `lifecycle_state`, `runtime_state`.
- `CompositorSessionSnapshot`: `schema_version`, `layers`.
Snapshots include ordered layer lifecycle state and JSON-safe runtime state only.
They exclude live resources, dependencies, prompts, tools, per-run config, and
exit intent.
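The snapshot rules above can be mirrored in a small self-contained sketch. The class and field names are assumptions for illustration, not the real Agenton DTOs; the JSON round-trip stands in for JSON-safe serialization:

```python
import json
from enum import Enum


class LifecycleState(str, Enum):
    NEW = "new"
    ACTIVE = "active"
    SUSPENDED = "suspended"
    CLOSED = "closed"


def snapshot_layer(name, lifecycle_state, runtime_state, exit_intent=None):
    """Keep lifecycle state and JSON-safe runtime state; drop exit intent."""
    if lifecycle_state is LifecycleState.ACTIVE:
        raise ValueError("ACTIVE state is rejected in external snapshots")
    # The json round-trip enforces JSON-safe runtime state; exit_intent is
    # deliberately not serialized.
    return {
        "name": name,
        "lifecycle_state": lifecycle_state.value,
        "runtime_state": json.loads(json.dumps(runtime_state)),
    }


snap = snapshot_layer("greeting", LifecycleState.SUSPENDED, {"count": 2}, exit_intent="delete")
assert snap == {"name": "greeting", "lifecycle_state": "suspended", "runtime_state": {"count": 2}}
```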
## Collection layers and transformers
- `PromptLayer`: config-backed layer with `PromptLayerConfig(prefix, user,
suffix)` and `type_id = "plain.prompt"`.
- `ObjectLayer`: factory-backed layer for Python objects.
- `ToolsLayer`: factory-backed layer for plain callables.
- `DynamicToolsLayer`: factory-backed layer for object-bound callables.
- `with_object`: decorator for dynamic tools whose first argument is supplied by
an `ObjectLayer` dependency.
### Pydantic AI bridge
pydantic-ai system prompts, user prompts, and tools while depending on an
`ObjectLayer` for `RunContext.deps`.
`agenton_collections.transformers.pydantic_ai.PYDANTIC_AI_TRANSFORMERS` provides:
- `prompt_transformer`: maps tagged Agenton prompt items to pydantic-ai system
prompt functions.
- `user_prompt_transformer`: maps tagged Agenton user prompt items to pydantic-ai
`UserContent` values.
- `tool_transformer`: maps tagged Agenton tool items to pydantic-ai tools.

View File

# Agenton user guide
Agenton composes reusable graph plans from `LayerNode`s and `LayerProvider`s.
The core is state-only: a `Compositor` stores no live layer instances, clients,
cleanup stacks, or run state. Each `Compositor.enter(...)` call creates a fresh
`CompositorRun` with new layer instances, direct dependency bindings, lifecycle
state, and an optional hydrated session snapshot.
## Config and runtime state
- **Graph config** is serializable topology: node `name`, provider `type`,
dependency mappings, and metadata. `LayerNodeConfig` deliberately contains no
layer config.
- **Per-run layer config** is passed to `Compositor.enter(configs=...)` as a
mapping keyed by node name. Providers validate each value with the layer's
`config_type` before any factory runs.
- **Runtime state** is serializable per-layer invocation state on
`layer.runtime_state`. Session snapshots persist only lifecycle state and this
model's JSON-safe data.
- **Live Python resources** such as clients, files, sockets, or process handles
stay outside Agenton core. Own them in application code or integration-specific
context managers that wrap compositor entry.
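The "validate before any factory runs" contract can be sketched with plain dataclasses standing in for Pydantic `LayerConfig` models. Names here are hypothetical, and `isinstance` stands in for schema validation:

```python
from dataclasses import dataclass


@dataclass
class GreetingConfig:
    """Stand-in for a Pydantic LayerConfig subclass."""
    prefix: str


def validate_run_configs(configs, config_types):
    """Validate every per-run config value up front, keyed by node name."""
    validated = {}
    for node_name, value in configs.items():
        expected = config_types[node_name]
        if not isinstance(value, expected):
            raise TypeError(f"{node_name}: expected {expected.__name__}")
        validated[node_name] = value
    return validated


out = validate_run_configs(
    {"greeting": GreetingConfig(prefix="Hi")},
    {"greeting": GreetingConfig},
)
assert out["greeting"].prefix == "Hi"
```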
## Define a config-backed layer
Use a `LayerConfig` model for per-run config and inherit from a typed layer family
so `Layer.__init_subclass__` can infer schemas:
```python {test="skip" lint="skip"}
from dataclasses import dataclass

from pydantic import ConfigDict
from typing_extensions import Self, override

from agenton.layers import LayerConfig, NoLayerDeps, PlainLayer


class GreetingConfig(LayerConfig):
    prefix: str

    model_config = ConfigDict(extra="forbid")


@dataclass(slots=True)
class GreetingLayer(PlainLayer[NoLayerDeps, GreetingConfig]):
    type_id = "example.greeting"

    prefix: str

    @classmethod
    @override
    def from_config(cls, config: GreetingConfig) -> Self:
        return cls(prefix=config.prefix)

    @property
    @override
    def prefix_prompts(self) -> list[str]:
        return [self.prefix]
```
Omitted schema slots default to `EmptyLayerConfig` and `EmptyRuntimeState`.
Lifecycle hooks are no-argument methods on the layer instance; use `self.deps`
for dependencies and `self.runtime_state` for serializable mutable state.
## Live resources
Agenton does not own resource cleanup. Keep live resources in the surrounding
application and pass them to capability methods explicitly:
```python {test="skip" lint="skip"}
@dataclass(slots=True)
class ClientUserLayer(PlainLayer[NoLayerDeps]):
    def make_client_user(self, *, http_client: httpx.AsyncClient) -> ClientUser:
        return ClientUser(http_client)


compositor = Compositor([LayerNode("client_user", ClientUserLayer)])
async with httpx.AsyncClient() as http_client:
    async with compositor.enter() as run:
        layer = run.get_layer("client_user", ClientUserLayer)
        user = layer.make_client_user(http_client=http_client)
```
This keeps deterministic cleanup at the integration boundary and leaves Agenton
snapshots limited to serializable runtime state.
## Build a compositor
Use providers for config-backed layers and pass per-run config at entry time:

```python {test="skip" lint="skip"}
from agenton.compositor import Compositor, CompositorConfig, LayerNodeConfig, LayerProvider
from agenton_collections.layers.plain import PromptLayer, PromptLayerConfig

providers = (
    LayerProvider.from_layer_type(PromptLayer),
    LayerProvider.from_layer_type(GreetingLayer),
)
compositor = Compositor.from_config(
    CompositorConfig(
        layers=[
            LayerNodeConfig(name="prompt", type="plain.prompt"),
            LayerNodeConfig(name="greeting", type="example.greeting"),
        ]
    ),
    providers=providers,
)

async with compositor.enter(
    configs={
        "prompt": PromptLayerConfig(user="Answer with examples."),
        "greeting": GreetingConfig(prefix="Hi"),
    }
) as run:
    prompts = run.prompts
```
Use `LayerProvider.from_factory(...)` when construction needs Python objects or
callables. Provider factories receive only validated config and must return a
fresh layer instance for every invocation. For node-specific construction with
`Compositor.from_config`, pass a `node_providers={"node_name": provider}` mapping
to override the provider selected by type id for that node.
## Dependencies
Layer dependencies bind direct layer instances onto `self.deps` for one run.
Dependency mappings use dependency field names as keys and compositor node names
as values:
```python {test="skip" lint="skip"}
class ModelDeps(LayerDeps):
    plugin: PluginLayer


@dataclass(slots=True)
class ModelLayer(PlainLayer[ModelDeps]):
    def make_model(self) -> Model:
        return self.deps.plugin.make_provider()
```
Optional dependencies are assigned `None` when absent. Missing required
dependencies, unknown dependency keys, and dependency targets with the wrong layer
type fail before lifecycle hooks run.
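A minimal sketch of those binding rules, using `typing` introspection in place of the real framework machinery (all names here are illustrative):

```python
from typing import Optional, Union, get_args, get_origin


def bind_deps(annotations, mapping):
    """Bind targets to dependency fields; fail fast on missing or mistyped deps."""
    bound = {}
    for field, expected in annotations.items():
        optional = get_origin(expected) is Union and type(None) in get_args(expected)
        target = mapping.get(field)
        if target is None:
            if not optional:
                raise KeyError(f"missing required dependency {field!r}")
            bound[field] = None  # optional dependency absent -> None
            continue
        concrete = (
            next(a for a in get_args(expected) if a is not type(None))
            if optional
            else expected
        )
        if not isinstance(target, concrete):
            raise TypeError(f"{field!r} must be a {concrete.__name__}")
        bound[field] = target
    return bound


class PluginLayer:
    """Hypothetical layer class used as a dependency target."""


deps = bind_deps(
    {"plugin": PluginLayer, "extra": Optional[PluginLayer]},
    {"plugin": PluginLayer()},
)
assert isinstance(deps["plugin"], PluginLayer)
assert deps["extra"] is None
```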
## System prompts, user prompts, and tools
Layers expose four authoring surfaces:
- `prefix_prompts`: system prompt fragments collected in layer order.
- `suffix_prompts`: system prompt fragments collected in reverse layer order.
- `user_prompts`: user-message fragments collected in layer order.
- `tools`: tool entries collected in layer order.
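The ordering semantics can be shown with plain lists (an illustrative sketch, not the framework's aggregation code): prefix fragments collect in layer order, suffix fragments in reverse layer order.

```python
def aggregate_prompts(layers):
    """Collect prefix fragments in layer order, suffix fragments reversed."""
    prefixes = [p for layer in layers for p in layer.get("prefix", [])]
    suffixes = [s for layer in reversed(layers) for s in layer.get("suffix", [])]
    return prefixes + suffixes


layers = [
    {"prefix": ["A"], "suffix": ["a"]},
    {"prefix": ["B"], "suffix": ["b"]},
]
# Prefixes in order, then suffixes in reverse layer order.
assert aggregate_prompts(layers) == ["A", "B", "b", "a"]
```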
`PromptLayer` accepts `prefix`, `user`, and `suffix` config fields. Aggregation is
available on the active `CompositorRun` as `run.prompts`, `run.user_prompts`, and
`run.tools`. For pydantic-ai, import
`agenton_collections.transformers.pydantic_ai.PYDANTIC_AI_TRANSFORMERS` and pass
it to `Compositor(...)` or `Compositor.from_config(...)` so tagged layer items are
converted to Pydantic AI prompt, user prompt, and tool values.
## Session snapshot and restore
Core Agenton run slots default to delete-on-exit. Call `run.suspend_on_exit()` or
`run.suspend_layer_on_exit(name)` inside the active context when the next snapshot
should be resumable:
```python {test="skip" lint="skip"}
async with compositor.enter(configs=configs) as run:
    run.suspend_on_exit()
snapshot = run.session_snapshot

async with compositor.enter(configs=configs, session_snapshot=snapshot) as restored_run:
    restored_layer = restored_run.get_layer("stateful", StatefulLayer)
```
`run.session_snapshot` is populated after context exit. Snapshots include ordered
layer names, non-active lifecycle states, and JSON-safe runtime state only. Active
state is rejected at the DTO boundary, and closed layers cannot be entered again.
To resume, pass the snapshot to a later `Compositor.enter(...)` call with the same
layer names and order.
See also:

View File

# Dify Agent Run API
The Dify Agent API exposes asynchronous agent runs backed by Agenton state-only
layer composition, Pydantic AI runtime execution, Redis run records, and per-run
Redis Streams event logs. The FastAPI application lives at
`dify-agent/src/dify_agent/server/app.py`.
Public Python DTOs and event models are exported from `dify_agent.protocol`;
server-internal models are server-only and should not be used by API consumers.
## Input model
Create-run requests accept a public `RunComposition` and an optional
`CompositorSessionSnapshot`. There is **no top-level `user_prompt` or model
profile field**. User input and model/provider selection are supplied by Agenton
layers. `on_exit` optionally controls whether layers suspend or delete when the
run leaves the active session; the default is suspend for all layers. In the MVP
server, the safe provider set includes `plain.prompt`, `dify.plugin`, and
`dify.plugin.llm`. The runtime reads the LLM model layer named by
`DIFY_AGENT_MODEL_LAYER_ID`, whose public value is `"llm"`.
Blank user input is rejected. A request with no user prompt, an empty string, or
only whitespace strings such as `"user": ["", " "]` returns `422` before a run
record is created.

Request:
```json
{
  "composition": {
    "schema_version": 1,
    "layers": [
      {
        …
      }
    ]
  },
  "session_snapshot": null,
  "on_exit": {
    "default": "suspend",
    "layers": {
      "prompt": "delete"
    }
  }
}
```

Runs execute in the same FastAPI process. Redis is not used as a job queue. Run records and per-run
event streams expire after `DIFY_AGENT_RUN_RETENTION_SECONDS`, which defaults to
`259200` seconds (3 days).
`dify.plugin` receives tenant/plugin identity only; daemon URL, API key, timeout,
and connection limits are server settings. `dify.plugin.llm.credentials` accepts
scalar values only (`string`, `number`, `boolean`, or `null`). Unknown
`on_exit.layers` keys return `422` before a run record is created.
Validation error example (`422`):
```json
{
  "detail": "run.user_prompts must not be empty"
}
```
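The blank-input rule can be stated as a one-line predicate (an illustrative sketch of the documented behavior, not the server's validator): no prompt, empty strings, and whitespace-only strings all fail before a run record exists.

```python
def has_user_input(user_prompts):
    """True only if at least one prompt fragment has non-whitespace content."""
    return any(part.strip() for part in user_prompts or [])


assert not has_user_input(None)        # no user prompt at all
assert not has_user_input(["", "  "])  # empty / whitespace-only fragments
assert has_user_input(["hello"])
```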
Use `dify_agent.client.Client` for both async and sync code. Async methods use
normal names; sync methods add `_sync`.
```python {test="skip" lint="skip"}
from agenton.layers import ExitIntent
from agenton_collections.layers.plain import PromptLayerConfig
from dify_agent.client import Client
from dify_agent.layers.dify_plugin import DifyPluginLLMLayerConfig, DifyPluginLayerConfig
from dify_agent.protocol import (
    DIFY_AGENT_MODEL_LAYER_ID,
    CreateRunRequest,
    LayerExitSignals,
    RunComposition,
    RunLayerSpec,
)


async def main() -> None:
    request = CreateRunRequest(
        composition=RunComposition(
            layers=[
                RunLayerSpec(name="prompt", type="plain.prompt", config=PromptLayerConfig(user="hello")),
                RunLayerSpec(
                    name="plugin",
                    type="dify.plugin",
                    config=DifyPluginLayerConfig(tenant_id="tenant-id", plugin_id="langgenius/openai"),
                ),
                RunLayerSpec(
                    name=DIFY_AGENT_MODEL_LAYER_ID,
                    type="dify.plugin.llm",
                    deps={"plugin": "plugin"},
                    # …
                ),
            ]
        ),
        on_exit=LayerExitSignals(layers={"prompt": ExitIntent.DELETE}),
    )
    async with Client(base_url="http://localhost:8000") as client:
        run = await client.create_run(request)
        # …
```
```python {test="skip" lint="skip"}
from agenton_collections.layers.plain import PromptLayerConfig
from dify_agent.client import Client
from dify_agent.layers.dify_plugin import DifyPluginLLMLayerConfig, DifyPluginLayerConfig
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest, RunComposition, RunLayerSpec

request = CreateRunRequest(
    composition=RunComposition(
        layers=[
            RunLayerSpec(name="prompt", type="plain.prompt", config=PromptLayerConfig(user="hello")),
            RunLayerSpec(
                name="plugin",
                type="dify.plugin",
                config=DifyPluginLayerConfig(tenant_id="tenant-id", plugin_id="langgenius/openai"),
            ),
            RunLayerSpec(
                name=DIFY_AGENT_MODEL_LAYER_ID,
                type="dify.plugin.llm",
                deps={"plugin": "plugin"},
                # …
            ),
        ]
    ),
)
```
Each event keeps the same envelope shape and has typed `data`: `run_started` uses
`{ … }`, `run_succeeded` uses `{ "session_snapshot":
CompositorSessionSnapshot }`, and `run_failed` uses `{ "error": string,
"reason": string | null }`. The session snapshot from `run_succeeded.data` can
be sent as `session_snapshot` in a later create-run request with the same
composition layer names and order.
## Consumer examples

View File

Start Redis, then run one FastAPI/uvicorn process:

```
uv run --project dify-agent uvicorn dify_agent.server.app:app --reload
```
By default, the FastAPI lifespan creates:
- one Redis-backed run store used by HTTP routes
- one shared plugin-daemon `httpx.AsyncClient` used by local run tasks
- one process-local scheduler that starts background `asyncio` run tasks
This means local development needs one uvicorn process plus Redis, and
plugin-backed runs also need a reachable Dify plugin daemon. Run execution still
happens outside request handlers, so client disconnects do not cancel the agent
run.
## Configuration
also reads `.env` and `dify-agent/.env` when present.

| Variable | Default | Description |
| --- | --- | --- |
| `DIFY_AGENT_REDIS_PREFIX` | `dify-agent` | Prefix for Redis record and event keys. |
| `DIFY_AGENT_SHUTDOWN_GRACE_SECONDS` | `30` | Seconds to wait for active local runs during graceful shutdown before cancellation. |
| `DIFY_AGENT_RUN_RETENTION_SECONDS` | `259200` | Seconds to retain Redis run records and per-run event streams; defaults to 3 days. |
| `DIFY_AGENT_PLUGIN_DAEMON_URL` | `http://localhost:5002` | Base URL for the Dify plugin daemon. |
| `DIFY_AGENT_PLUGIN_DAEMON_API_KEY` | empty | API key sent to the Dify plugin daemon. |
| `DIFY_AGENT_PLUGIN_DAEMON_CONNECT_TIMEOUT` | `10` | Plugin-daemon HTTP connect timeout in seconds. |
| `DIFY_AGENT_PLUGIN_DAEMON_READ_TIMEOUT` | `600` | Plugin-daemon HTTP read timeout in seconds. |
| `DIFY_AGENT_PLUGIN_DAEMON_WRITE_TIMEOUT` | `30` | Plugin-daemon HTTP write timeout in seconds. |
| `DIFY_AGENT_PLUGIN_DAEMON_POOL_TIMEOUT` | `10` | Plugin-daemon HTTP connection-pool wait timeout in seconds. |
| `DIFY_AGENT_PLUGIN_DAEMON_MAX_CONNECTIONS` | `100` | Maximum total plugin-daemon HTTP connections. |
| `DIFY_AGENT_PLUGIN_DAEMON_MAX_KEEPALIVE_CONNECTIONS` | `20` | Maximum idle keep-alive plugin-daemon HTTP connections. |
| `DIFY_AGENT_PLUGIN_DAEMON_KEEPALIVE_EXPIRY` | `30` | Keep-alive expiry in seconds for idle plugin-daemon HTTP connections. |
Example `.env`:
@ -40,6 +51,8 @@ DIFY_AGENT_REDIS_URL=redis://localhost:6379/0
DIFY_AGENT_REDIS_PREFIX=dify-agent-dev
DIFY_AGENT_SHUTDOWN_GRACE_SECONDS=30
DIFY_AGENT_RUN_RETENTION_SECONDS=259200
DIFY_AGENT_PLUGIN_DAEMON_URL=http://localhost:5002
DIFY_AGENT_PLUGIN_DAEMON_API_KEY=replace-with-daemon-key
```
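The timeout rows in the table above can be read back with a small stdlib helper; the helper name is hypothetical and simply mirrors the documented defaults:

```python
import os

def plugin_daemon_timeouts() -> dict[str, float]:
    """Hypothetical helper: read the documented timeout settings with their defaults."""
    def read(name: str, default: str) -> float:
        return float(os.getenv(name, default))

    return {
        "connect": read("DIFY_AGENT_PLUGIN_DAEMON_CONNECT_TIMEOUT", "10"),
        "read": read("DIFY_AGENT_PLUGIN_DAEMON_READ_TIMEOUT", "600"),
        "write": read("DIFY_AGENT_PLUGIN_DAEMON_WRITE_TIMEOUT", "30"),
        "pool": read("DIFY_AGENT_PLUGIN_DAEMON_POOL_TIMEOUT", "10"),
    }

print(plugin_daemon_timeouts())
```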
Run records and event streams use the same retention. Status writes refresh the
@ -48,7 +61,7 @@ record TTL so active runs that keep producing events remain observable.
## Scheduling and shutdown semantics
`POST /runs` validates the compositor, persists a `running` run record, and starts
`POST /runs` validates the composition, persists a `running` run record, and starts
an `asyncio` task in the same process. There is no Redis job stream, consumer
group, pending reclaim, or automatic retry layer.
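That handoff can be sketched with stdlib `asyncio` (simplified stand-ins, not the server's actual handler code): the create-run path records `running` and returns before the background task finishes.

```python
import asyncio

async def run_agent(run_id: str, records: dict[str, str]) -> None:
    # Stand-in for the real agent run executed outside any request handler.
    await asyncio.sleep(0)
    records[run_id] = "succeeded"

async def create_run(records: dict[str, str]) -> tuple[str, asyncio.Task[None]]:
    # Mirrors the documented flow: persist a `running` record, then start an
    # asyncio task in the same process and return immediately.
    run_id = "run-1"
    records[run_id] = "running"
    task = asyncio.create_task(run_agent(run_id, records))
    return run_id, task

async def main() -> dict[str, str]:
    records: dict[str, str] = {}
    run_id, task = await create_run(records)
    assert records[run_id] == "running"  # the response is sent while the run is live
    await task
    return records

print(asyncio.run(main()))  # {'run-1': 'succeeded'}
```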
@ -64,13 +77,13 @@ shared status/event visibility, not load balancing or queued-job recovery.
## Run inputs and session snapshots
The API does not accept a top-level `user_prompt`. Submit a `CompositorConfig`
whose Agenton layers provide user input. With the MVP registry, use
The API does not accept a top-level `user_prompt`. Submit a `RunComposition`
whose Agenton layers provide user input. With the MVP provider set, use
`plain.prompt` and its `config.user` field:
```json
{
"compositor": {
"composition": {
"schema_version": 1,
"layers": [
{
@ -92,7 +105,7 @@ persisted or scheduled.
There is no Pydantic AI history layer. To resume Agenton layer state, pass the
`session_snapshot` from a previous `run_succeeded.data` payload together with a
compositor that has the same layer names and order.
composition that has the same layer names and order.
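That compatibility rule can be expressed as a tiny check (an illustrative stand-in; the real validation happens inside Agenton when the snapshot is applied):

```python
def check_resume_compatible(snapshot_layers: list[str], composition_layers: list[str]) -> None:
    # Names AND order must match; a reordered composition is not resumable.
    if snapshot_layers != composition_layers:
        raise ValueError(
            f"snapshot layers {snapshot_layers!r} != composition layers {composition_layers!r}"
        )

check_resume_compatible(["prompt", "plugin", "model"], ["prompt", "plugin", "model"])  # ok
```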
## Observing runs
@ -123,5 +136,5 @@ The repository includes simple consumers that print observed output/events:
- `dify-agent/examples/dify_agent/dify_agent_examples/run_server_sse_consumer.py`
consumes raw SSE frames for an existing run id.
Both examples use the credential-free Pydantic AI `TestModel` profile; they still
require Redis and the API server.
The create-run examples submit Dify plugin model layers, so they require Redis,
the API server, plugin-daemon settings, and provider credentials.

View File

@ -5,12 +5,14 @@ from __future__ import annotations
import asyncio
from dataclasses import dataclass, field
from inspect import signature
from typing import cast
from typing_extensions import override
from agenton.compositor import CompositorBuilder, LayerRegistry
from agenton.layers import LayerControl, LayerDeps, NoLayerDeps, PlainLayer
from agenton.compositor import Compositor, LayerNode, LayerProvider
from agenton.layers import LayerDeps, NoLayerDeps, PlainLayer, PlainToolType
from agenton_collections.layers.plain import DynamicToolsLayer, ObjectLayer, PromptLayer, ToolsLayer, with_object
from agenton_collections.layers.plain.basic import PromptLayerConfig
@dataclass(frozen=True, slots=True)
@ -41,19 +43,19 @@ class TraceLayer(PlainLayer[NoLayerDeps]):
events: list[str] = field(default_factory=list)
@override
async def on_context_create(self, control: LayerControl) -> None:
async def on_context_create(self) -> None:
self.events.append("create")
@override
async def on_context_suspend(self, control: LayerControl) -> None:
async def on_context_suspend(self) -> None:
self.events.append("suspend")
@override
async def on_context_resume(self, control: LayerControl) -> None:
async def on_context_resume(self) -> None:
self.events.append("resume")
@override
async def on_context_delete(self, control: LayerControl) -> None:
async def on_context_delete(self) -> None:
self.events.append("delete")
@ -72,64 +74,68 @@ async def main() -> None:
audience="engineers composing agent capabilities",
tone="precise and friendly",
)
trace = TraceLayer()
registry = LayerRegistry()
registry.register_layer(PromptLayer)
compositor = (
CompositorBuilder(registry)
.add_config(
{
"layers": [
{
"name": "base_prompt",
"type": "plain.prompt",
"config": {
"prefix": "Use config dicts for serializable layers.",
"user": "Explain how the composed agent should use its layers.",
"suffix": "Before finalizing, make the result easy to scan.",
},
},
{
"name": "extra_prompt",
"type": "plain.prompt",
"config": {
"prefix": "Use constructed instances for objects, local code, and callables.",
},
},
]
}
)
.add_instance(name="profile", layer=ObjectLayer[AgentProfile](profile))
.add_instance(name="profile_prompt", layer=ProfilePromptLayer())
.add_instance(name="tools", layer=ToolsLayer(tool_entries=(count_words,)))
.add_instance(
name="dynamic_tools",
deps={"object_layer": "profile"},
layer=DynamicToolsLayer[AgentProfile](tool_entries=(write_tagline,)),
)
.add_instance(name="trace", layer=trace)
.build()
trace_events: list[str] = []
compositor = Compositor(
[
LayerNode("base_prompt", PromptLayer),
LayerNode("extra_prompt", PromptLayer),
LayerNode(
"profile",
LayerProvider.from_factory(
layer_type=ObjectLayer,
create=lambda _config: ObjectLayer[AgentProfile](profile),
),
),
LayerNode("profile_prompt", ProfilePromptLayer, deps={"profile": "profile"}),
LayerNode(
"tools",
LayerProvider.from_factory(
layer_type=ToolsLayer,
create=lambda _config: ToolsLayer(tool_entries=(count_words,)),
),
),
LayerNode(
"dynamic_tools",
LayerProvider.from_factory(
layer_type=DynamicToolsLayer,
create=lambda _config: DynamicToolsLayer[AgentProfile](tool_entries=(write_tagline,)),
),
deps={"object_layer": "profile"},
),
LayerNode(
"trace",
LayerProvider.from_factory(layer_type=TraceLayer, create=lambda _config: TraceLayer(trace_events)),
),
]
)
configs = {
"base_prompt": PromptLayerConfig(
prefix="Use config dicts for serializable layers.",
user="Explain how the composed agent should use its layers.",
suffix="Before finalizing, make the result easy to scan.",
),
"extra_prompt": PromptLayerConfig(prefix="Use constructed instances for objects, local code, and callables."),
}
print("Prompts:")
for prompt in compositor.prompts:
print(f"- {prompt.value}")
async with compositor.enter(configs=configs) as run:
print("Prompts:")
for prompt in run.prompts:
print(f"- {prompt.value}")
print("\nUser prompts:")
for prompt in compositor.user_prompts:
print(f"- {prompt.value}")
print("\nUser prompts:")
for prompt in run.user_prompts:
print(f"- {prompt.value}")
print("\nTools:")
for tool in compositor.tools:
print(f"- {tool.value.__name__}{signature(tool.value)}")
print([tool.value("layer composition") for tool in compositor.tools])
print("\nTools:")
plain_tools = [cast(PlainToolType, tool) for tool in run.tools]
for tool in plain_tools:
print(f"- {tool.value.__name__}{signature(tool.value)}")
print([tool.value("layer composition") for tool in plain_tools])
run.suspend_on_exit()
async with compositor.enter() as lifecycle_control:
lifecycle_control.suspend_on_exit()
async with compositor.enter(lifecycle_control):
async with compositor.enter(configs=configs, session_snapshot=run.session_snapshot):
pass
print("\nLifecycle:", trace.events)
print("\nLifecycle:", trace_events)
if __name__ == "__main__":

View File

@ -12,8 +12,9 @@ from pydantic_ai.messages import BuiltinToolCallPart, ModelMessage, ToolCallPart
from pydantic_ai.models.openai import OpenAIChatModel # pyright: ignore[reportDeprecated]
from pydantic_ai.models.test import TestModel
from agenton.compositor import CompositorBuilder, LayerRegistry
from agenton.compositor import Compositor, LayerNode, LayerProvider
from agenton_collections.layers.plain import ObjectLayer, PromptLayer, ToolsLayer
from agenton_collections.layers.plain.basic import PromptLayerConfig
from agenton_collections.layers.pydantic_ai import PydanticAIBridgeLayer
from agenton_collections.transformers import PYDANTIC_AI_TRANSFORMERS
@ -49,41 +50,47 @@ async def main() -> None:
audience="engineers composing agent capabilities",
tone="precise and friendly",
)
pydantic_ai_bridge = PydanticAIBridgeLayer[AgentProfile](
prefix=("Prefer concrete details.", profile_prompt, tone_prompt),
user="Use the tools for 'layer composition'.",
tool_entries=(write_tagline,),
compositor = Compositor(
[
LayerNode("base_prompt", PromptLayer),
LayerNode(
"profile",
LayerProvider.from_factory(
layer_type=ObjectLayer,
create=lambda _config: ObjectLayer[AgentProfile](profile),
),
),
LayerNode(
"plain_tools",
LayerProvider.from_factory(
layer_type=ToolsLayer,
create=lambda _config: ToolsLayer(tool_entries=(count_words,)),
),
),
LayerNode(
"pydantic_ai_bridge",
LayerProvider.from_factory(
layer_type=PydanticAIBridgeLayer,
create=lambda _config: PydanticAIBridgeLayer[AgentProfile](
prefix=("Prefer concrete details.", profile_prompt, tone_prompt),
user="Use the tools for 'layer composition'.",
tool_entries=(write_tagline,),
),
),
deps={"object_layer": "profile"},
),
],
**PYDANTIC_AI_TRANSFORMERS,
)
registry = LayerRegistry()
registry.register_layer(PromptLayer)
compositor = (
CompositorBuilder(registry)
.add_config(
{
"layers": [
{
"name": "base_prompt",
"type": "plain.prompt",
"config": {
"prefix": "Use the available tools before answering.",
"suffix": "Return concise, inspectable output.",
},
},
]
}
)
.add_instance(name="profile", layer=ObjectLayer[AgentProfile](profile))
.add_instance(name="plain_tools", layer=ToolsLayer(tool_entries=(count_words,)))
.add_instance(
name="pydantic_ai_bridge",
deps={"object_layer": "profile"},
layer=pydantic_ai_bridge,
)
.build(**PYDANTIC_AI_TRANSFORMERS)
)
async with compositor.enter():
async with compositor.enter(
configs={
"base_prompt": PromptLayerConfig(
prefix="Use the available tools before answering.",
suffix="Return concise, inspectable output.",
)
}
) as run:
model = (
OpenAIChatModel("gpt-5.5") # pyright: ignore[reportDeprecated]
if os.getenv("OPENAI_API_KEY")
@ -92,12 +99,13 @@ async def main() -> None:
agent = Agent[AgentProfile](
model=model,
deps_type=AgentProfile,
tools=compositor.tools,
tools=run.tools,
)
for prompt in compositor.prompts:
for prompt in run.prompts:
_ = agent.system_prompt(prompt)
result = await agent.run(compositor.user_prompts, deps=pydantic_ai_bridge.run_deps)
bridge_layer = run.get_layer("pydantic_ai_bridge", PydanticAIBridgeLayer)
result = await agent.run(run.user_prompts, deps=bridge_layer.run_deps)
for line in _format_messages(result.all_messages()):
print(line)

View File

@ -3,15 +3,13 @@
from __future__ import annotations
import asyncio
from collections import OrderedDict
from dataclasses import dataclass
from typing import ClassVar
from pydantic import BaseModel, ConfigDict
from typing_extensions import override
from agenton.compositor import Compositor
from agenton.layers import LayerControl, NoLayerDeps, PlainLayer, PlainPromptType, PlainToolType
from agenton.compositor import Compositor, LayerNode
from agenton.layers import EmptyLayerConfig, NoLayerDeps, PlainLayer
class ConnectionState(BaseModel):
@ -25,47 +23,27 @@ class ConnectionHandle:
self.connection_id = connection_id
class ConnectionHandles(BaseModel):
connection: ConnectionHandle | None = None
model_config = ConfigDict(extra="forbid", validate_assignment=True, arbitrary_types_allowed=True)
@dataclass(slots=True)
class ConnectionLayer(PlainLayer[NoLayerDeps]):
class ConnectionLayer(PlainLayer[NoLayerDeps, EmptyLayerConfig, ConnectionState]):
runtime_state_type: ClassVar[type[BaseModel]] = ConnectionState
runtime_handles_type: ClassVar[type[BaseModel]] = ConnectionHandles
@override
async def on_context_create(self, control: LayerControl) -> None:
assert isinstance(control.runtime_state, ConnectionState)
assert isinstance(control.runtime_handles, ConnectionHandles)
control.runtime_handles.connection = ConnectionHandle(control.runtime_state.connection_id)
@override
async def on_context_resume(self, control: LayerControl) -> None:
assert isinstance(control.runtime_state, ConnectionState)
assert isinstance(control.runtime_handles, ConnectionHandles)
control.runtime_handles.connection = ConnectionHandle(f"restored:{control.runtime_state.connection_id}")
async def main() -> None:
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("connection", ConnectionLayer())])
)
session = compositor.new_session()
async with compositor.enter(session) as active_session:
active_session.suspend_on_exit()
compositor = Compositor([LayerNode("connection", ConnectionLayer)])
async with compositor.enter() as run:
layer = run.get_layer("connection", ConnectionLayer)
connection = ConnectionHandle(layer.runtime_state.connection_id)
print("Active external handle:", connection.connection_id)
run.suspend_on_exit()
snapshot = compositor.snapshot_session(session)
snapshot = run.session_snapshot
assert snapshot is not None
print("Snapshot:", snapshot.model_dump(mode="json"))
restored = compositor.session_from_snapshot(snapshot)
async with compositor.enter(restored):
handles = restored.layer("connection").runtime_handles
assert isinstance(handles, ConnectionHandles)
assert handles.connection is not None
print("Rehydrated handle:", handles.connection.connection_id)
async with compositor.enter(session_snapshot=snapshot) as restored_run:
layer = restored_run.get_layer("connection", ConnectionLayer)
restored_connection = ConnectionHandle(f"restored:{layer.runtime_state.connection_id}")
print("Rehydrated external handle:", restored_connection.connection_id)
if __name__ == "__main__":

View File

@ -13,7 +13,6 @@ recover after client-side uncertainty.
import asyncio
from agenton.compositor import CompositorConfig, LayerNodeConfig
from agenton_collections.layers.plain import PromptLayerConfig
from dify_agent.client import Client
from dify_agent.layers.dify_plugin import (
@ -21,7 +20,7 @@ from dify_agent.layers.dify_plugin import (
DifyPluginLLMLayerConfig,
DifyPluginLayerConfig,
)
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest, RunComposition, RunLayerSpec
API_BASE_URL = "http://localhost:8000"
@ -36,9 +35,9 @@ async def main() -> None:
async with Client(base_url=API_BASE_URL) as client:
run = await client.create_run(
CreateRunRequest(
compositor=CompositorConfig(
composition=RunComposition(
layers=[
LayerNodeConfig(
RunLayerSpec(
name="prompt",
type="plain.prompt",
config=PromptLayerConfig(
@ -46,12 +45,12 @@ async def main() -> None:
user="Say hello from the Dify Agent API server example.",
),
),
LayerNodeConfig(
RunLayerSpec(
name="plugin",
type="dify.plugin",
config=DifyPluginLayerConfig(tenant_id=TENANT_ID, plugin_id=PLUGIN_ID),
),
LayerNodeConfig(
RunLayerSpec(
name=DIFY_AGENT_MODEL_LAYER_ID,
type="dify.plugin.llm",
deps={"plugin": "plugin"},

View File

@ -5,7 +5,6 @@ does not retry ``POST /runs``; if a timeout occurs, inspect server state or crea
a new run explicitly rather than assuming the original request was not accepted.
"""
from agenton.compositor import CompositorConfig, LayerNodeConfig
from agenton_collections.layers.plain import PromptLayerConfig
from dify_agent.client import Client
from dify_agent.layers.dify_plugin import (
@ -13,7 +12,7 @@ from dify_agent.layers.dify_plugin import (
DifyPluginLLMLayerConfig,
DifyPluginLayerConfig,
)
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest, RunComposition, RunLayerSpec
API_BASE_URL = "http://localhost:8000"
@ -28,9 +27,9 @@ def main() -> None:
with Client(base_url=API_BASE_URL) as client:
run = client.create_run_sync(
CreateRunRequest(
compositor=CompositorConfig(
composition=RunComposition(
layers=[
LayerNodeConfig(
RunLayerSpec(
name="prompt",
type="plain.prompt",
config=PromptLayerConfig(
@ -38,12 +37,12 @@ def main() -> None:
user="Say hello from the synchronous Dify Agent client example.",
),
),
LayerNodeConfig(
RunLayerSpec(
name="plugin",
type="dify.plugin",
config=DifyPluginLayerConfig(tenant_id=TENANT_ID, plugin_id=PLUGIN_ID),
),
LayerNodeConfig(
RunLayerSpec(
name=DIFY_AGENT_MODEL_LAYER_ID,
type="dify.plugin.llm",
deps={"plugin": "plugin"},

View File

@ -1,18 +1,20 @@
"""Dify plugin LLM model layer.
This layer owns model capability resolution for Dify plugin-backed LLMs. It
depends on ``DifyPluginLayer`` for daemon access, resolves that dependency's
control from its own ``LayerControl``, and returns a Pydantic AI model adapter
configured from the public LLM layer DTO. The daemon provider carries plugin
transport identity; the DTO's ``model_provider`` is passed to the adapter as
request-level model identity.
depends on ``DifyPluginLayer`` for daemon identity through Agenton's direct
dependency binding and returns a Pydantic AI model adapter configured from the
public LLM layer DTO. Runtime code supplies the FastAPI lifespan-owned shared
HTTP client to ``get_model``; the layer does not own or discover live resources.
The daemon provider carries plugin transport identity, while the DTO's
``model_provider`` is passed to the adapter as request-level model identity.
"""
from dataclasses import dataclass
import httpx
from typing_extensions import Self, override
from agenton.layers import EmptyRuntimeHandles, EmptyRuntimeState, LayerControl, LayerDeps, PlainLayer
from agenton.layers import LayerDeps, PlainLayer
from dify_agent.adapters.llm import DifyLLMAdapterModel
from dify_agent.layers.dify_plugin.configs import DifyPluginLLMLayerConfig
from dify_agent.layers.dify_plugin.plugin_layer import DifyPluginLayer
@ -38,11 +40,9 @@ class DifyPluginLLMLayer(PlainLayer[DifyPluginLLMDeps, DifyPluginLLMLayerConfig]
"""Create the LLM layer from validated public config."""
return cls(config=config)
def get_model(self, control: LayerControl[EmptyRuntimeState, EmptyRuntimeHandles]) -> DifyLLMAdapterModel:
"""Return the configured model using the current session's plugin control."""
control = self.require_control(control, active=True)
plugin_control = control.control_for(self.deps.plugin)
provider = self.deps.plugin.get_daemon_provider(plugin_control)
def get_model(self, *, http_client: httpx.AsyncClient) -> DifyLLMAdapterModel:
"""Return the configured model using the directly bound plugin dependency."""
provider = self.deps.plugin.create_daemon_provider(http_client=http_client)
return DifyLLMAdapterModel(
model=self.config.model,
daemon_provider=provider,

View File

@ -1,50 +1,41 @@
"""Runtime Dify plugin context layer.
The public config identifies tenant/plugin/user context only. Plugin daemon URL,
API key, and timeout are server-side dependencies injected by the layer registry
factory. Each active compositor entry owns an HTTP client in ``LayerControl``
runtime handles and registers it on the control's resource stack. Callers pass
the control explicitly to ``get_daemon_provider`` so shared layer instances never
store or discover session-local clients implicitly. Business model-provider names
belong to the LLM layer/model request, not this daemon context layer.
The public config identifies tenant/plugin/user context only. Plugin daemon URL
and API key are server-side settings injected by the provider factory. The layer
is intentionally config/settings-only under Agenton's state-only core: it does
not open, cache, close, or snapshot HTTP clients, and its lifecycle hooks remain
the inherited no-op hooks. Runtime code passes the FastAPI lifespan-owned shared
``httpx.AsyncClient`` into ``create_daemon_provider`` for each model adapter.
Business model-provider names belong to the LLM layer/model request, not this
daemon context layer.
"""
from dataclasses import dataclass
import httpx
from pydantic import BaseModel, ConfigDict
from typing_extensions import Self, override
from agenton.layers import EmptyRuntimeState, LayerControl, NoLayerDeps, PlainLayer
from agenton.layers import EmptyRuntimeState, NoLayerDeps, PlainLayer
from dify_agent.adapters.llm import DifyPluginDaemonProvider
from dify_agent.layers.dify_plugin.configs import DifyPluginLayerConfig
class DifyPluginRuntimeHandles(BaseModel):
"""Live per-entry handles for Dify plugin daemon access."""
http_client: httpx.AsyncClient | None = None
model_config = ConfigDict(extra="forbid", validate_assignment=True, arbitrary_types_allowed=True)
@dataclass(slots=True)
class DifyPluginLayer(PlainLayer[NoLayerDeps, DifyPluginLayerConfig, EmptyRuntimeState, DifyPluginRuntimeHandles]):
"""Layer that owns plugin daemon connection state for one active session."""
class DifyPluginLayer(PlainLayer[NoLayerDeps, DifyPluginLayerConfig, EmptyRuntimeState]):
"""Layer that carries plugin daemon identity without owning live resources."""
type_id = "dify.plugin"
config: DifyPluginLayerConfig
daemon_url: str
daemon_api_key: str
timeout: float | httpx.Timeout | None = 600.0
@classmethod
@override
def from_config(cls, config: DifyPluginLayerConfig) -> Self:
"""Reject construction without server-injected daemon settings."""
del config
raise TypeError("DifyPluginLayer requires server-side daemon settings and must use a registry factory.")
raise TypeError("DifyPluginLayer requires server-side daemon settings and must use a provider factory.")
@classmethod
def from_config_with_settings(
@ -53,68 +44,26 @@ class DifyPluginLayer(PlainLayer[NoLayerDeps, DifyPluginLayerConfig, EmptyRuntim
*,
daemon_url: str,
daemon_api_key: str,
timeout: float | httpx.Timeout | None,
) -> Self:
"""Create a plugin layer from public config plus server-only daemon settings."""
return cls(config=config, daemon_url=daemon_url, daemon_api_key=daemon_api_key, timeout=timeout)
return cls(config=config, daemon_url=daemon_url, daemon_api_key=daemon_api_key)
def get_daemon_provider(
self,
control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles],
) -> DifyPluginDaemonProvider:
"""Return a daemon provider backed by ``control``'s active HTTP client.
def create_daemon_provider(self, *, http_client: httpx.AsyncClient) -> DifyPluginDaemonProvider:
"""Return a daemon provider backed by the shared plugin daemon client.
Raises:
RuntimeError: if ``control`` is not active or its HTTP client is
absent/closed.
RuntimeError: if ``http_client`` has already been closed.
"""
control = self.require_control(control, active=True)
client = control.runtime_handles.http_client
if client is None or client.is_closed:
raise RuntimeError(
"DifyPluginLayer.get_daemon_provider() requires an entered control with an open HTTP client."
)
if http_client.is_closed:
raise RuntimeError("DifyPluginLayer.create_daemon_provider() requires an open shared HTTP client.")
return DifyPluginDaemonProvider(
tenant_id=self.config.tenant_id,
plugin_id=self.config.plugin_id,
plugin_daemon_url=self.daemon_url,
plugin_daemon_api_key=self.daemon_api_key,
user_id=self.config.user_id,
timeout=self.timeout,
http_client=client,
http_client=http_client,
)
@override
async def on_context_create(
self,
control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles],
) -> None:
await self._open_http_client(control)
@override
async def on_context_resume(
self,
control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles],
) -> None:
await self._open_http_client(control)
@override
async def on_context_suspend(
self,
control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles],
) -> None:
control.runtime_handles.http_client = None
@override
async def on_context_delete(
self,
control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles],
) -> None:
control.runtime_handles.http_client = None
async def _open_http_client(self, control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles]) -> None:
control.runtime_handles.http_client = await control.enter_async_resource(
httpx.AsyncClient(timeout=self.timeout, trust_env=False)
)
__all__ = ["DifyPluginLayer", "DifyPluginRuntimeHandles"]
__all__ = ["DifyPluginLayer"]
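The validate-don't-own pattern described in the module docstring above can be shown with self-contained stand-ins (none of these names are the real classes; `FakeAsyncClient` only mimics the `is_closed` flag of `httpx.AsyncClient`):

```python
class FakeAsyncClient:
    """Stand-in for httpx.AsyncClient exposing only the `is_closed` flag."""

    def __init__(self) -> None:
        self.is_closed = False

def create_provider(http_client: FakeAsyncClient) -> str:
    # The layer validates the shared client but never opens or closes one.
    if http_client.is_closed:
        raise RuntimeError("requires an open shared HTTP client")
    return "daemon-provider"

client = FakeAsyncClient()
print(create_provider(client))  # daemon-provider
```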

View File

@ -10,15 +10,18 @@ from .schemas import (
LayerExitSignals,
PydanticAIStreamRunEvent,
RunEvent,
RunComposition,
RunEventType,
RunEventsResponse,
RunFailedEvent,
RunFailedEventData,
RunLayerSpec,
RunStartedEvent,
RunStatus,
RunStatusResponse,
RunSucceededEvent,
RunSucceededEventData,
normalize_composition,
utc_now,
)
@ -31,15 +34,18 @@ __all__ = [
"LayerExitSignals",
"PydanticAIStreamRunEvent",
"RUN_EVENT_ADAPTER",
"RunComposition",
"RunEvent",
"RunEventType",
"RunEventsResponse",
"RunFailedEvent",
"RunFailedEventData",
"RunLayerSpec",
"RunStartedEvent",
"RunStatus",
"RunStatusResponse",
"RunSucceededEvent",
"RunSucceededEventData",
"normalize_composition",
"utc_now",
]

View File

@ -1,17 +1,24 @@
"""Public HTTP protocol schemas for the Dify Agent run API.
This module is the shared wire contract for the FastAPI server, runtime event
producers, storage adapters, and Python client. The server accepts only
registry-backed Agenton compositor configs, keeping HTTP input data-only and
preventing unsafe import-path construction. Run events are append-only records;
Redis stream ids (or in-memory equivalents in tests) are the public cursors used
by polling and SSE replay. Event envelopes keep the public
``id``/``run_id``/``type``/``data``/``created_at`` shape, while each ``type`` has
a typed ``data`` model so OpenAPI, Redis replay, and clients parse the same
payload contract. Model/provider selection is part of the submitted Agenton
layer graph, not a top-level run field; the runtime reads the model layer named
by ``DIFY_AGENT_MODEL_LAYER_ID``. Request-level layer exit signals decide whether
each layer control is suspended or deleted when the active entry exits, with
producers, storage adapters, and Python client. Create-run requests expose a
Dify-friendly ``composition.layers[].config`` shape so callers can describe one
layer in one place; the server normalizes that public DTO into Agenton's
state-only ``CompositorConfig`` plus node-name keyed per-run configs before
calling ``Compositor.enter(configs=...)``. Session snapshots and ``on_exit`` stay
top-level because they are per-run resume state and exit policy, not graph node
definition.
The server still constructs layers only from explicit provider type ids, keeping
HTTP input data-only and preventing unsafe import-path construction. Run events
are append-only records; Redis stream ids (or in-memory equivalents in tests) are
the public cursors used by polling and SSE replay. Event envelopes keep the
public ``id``/``run_id``/``type``/``data``/``created_at`` shape, while each
``type`` has a typed ``data`` model so OpenAPI, Redis replay, and clients parse
the same payload contract. Model/provider selection is part of the submitted
composition, not a top-level run field; the runtime reads the model layer named
by ``DIFY_AGENT_MODEL_LAYER_ID``. Request-level ``on_exit`` signals decide
whether each active layer is suspended or deleted when the run exits, with
suspend as the default so successful terminal events can include resumable
snapshots. Successful runs publish the final JSON-safe agent output and the
resumable Agenton session snapshot together on the terminal ``run_succeeded``
@ -24,7 +31,7 @@ from typing import Annotated, ClassVar, Final, Literal, TypeAlias
from pydantic import BaseModel, ConfigDict, Field, JsonValue, TypeAdapter
from pydantic_ai.messages import AgentStreamEvent
from agenton.compositor import CompositorConfig, CompositorSessionSnapshot
from agenton.compositor import CompositorConfig, CompositorSessionSnapshot, LayerConfigInput, LayerNodeConfig
from agenton.layers import ExitIntent
@ -44,7 +51,7 @@ def utc_now() -> datetime:
class LayerExitSignals(BaseModel):
"""Requested per-layer lifecycle behavior when a run leaves its active session."""
"""Requested per-layer lifecycle behavior for the top-level ``on_exit`` field."""
default: ExitIntent = ExitIntent.SUSPEND
layers: dict[str, ExitIntent] = Field(default_factory=dict)
@ -52,22 +59,85 @@ class LayerExitSignals(BaseModel):
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
class RunLayerSpec(BaseModel):
"""Public graph node plus per-run layer config for one Dify Agent layer.
``name``/``type``/``deps``/``metadata`` are normalized into Agenton's
provider-backed graph config. ``config`` is kept separate at the Agenton
boundary and passed to ``Compositor.enter(configs=...)`` keyed by ``name``;
existing layer config DTO instances are preserved so client code can stay
DTO-first without being forced into raw dictionaries.
"""
name: str
type: str
deps: dict[str, str] = Field(default_factory=dict)
metadata: dict[str, JsonValue] = Field(default_factory=dict)
config: LayerConfigInput = None
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
class RunComposition(BaseModel):
"""Public create-run composition DTO.
The public shape intentionally differs from Agenton's internal
``CompositorConfig`` by carrying each layer's per-run config next to its graph
node fields. Use ``normalize_composition`` at server/runtime boundaries before
constructing a ``Compositor``.
"""
schema_version: int = 1
layers: list[RunLayerSpec]
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
class CreateRunRequest(BaseModel):
"""Request body for creating one async agent run.
Model/provider configuration must be supplied through the compositor layer
named by ``DIFY_AGENT_MODEL_LAYER_ID``. ``layer_exit_signals`` defaults every
Model/provider configuration must be supplied through the composition layer
named by ``DIFY_AGENT_MODEL_LAYER_ID``. ``on_exit`` defaults every active
layer to suspend so callers receive a resumable success snapshot unless they
explicitly request delete for one or more layers.
"""
compositor: CompositorConfig
composition: RunComposition
session_snapshot: CompositorSessionSnapshot | None = None
layer_exit_signals: LayerExitSignals = Field(default_factory=LayerExitSignals)
on_exit: LayerExitSignals = Field(default_factory=LayerExitSignals)
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
def normalize_composition(composition: RunComposition) -> tuple[CompositorConfig, dict[str, LayerConfigInput]]:
"""Split public Dify composition into Agenton's graph config and layer configs.
Returns:
A ``CompositorConfig`` containing only graph fields and a node-name keyed
config mapping suitable for ``Compositor.enter(configs=...)``.
The helper is the stable public-to-Agenton boundary: it preserves concrete
``LayerConfig`` DTO inputs where possible, does not accept legacy
``LayerNodeConfig(config=...)`` payloads, and keeps session snapshots plus
exit signals out of graph normalization.
"""
graph_config = CompositorConfig(
schema_version=composition.schema_version,
layers=[
LayerNodeConfig(
name=layer.name,
type=layer.type,
deps=dict(layer.deps),
metadata=dict(layer.metadata),
)
for layer in composition.layers
],
)
layer_configs = {layer.name: layer.config for layer in composition.layers}
return graph_config, layer_configs
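In dict form, the split performed by ``normalize_composition`` behaves like this runnable stand-in (plain dicts replace the pydantic DTOs):

```python
def split_composition(layers: list[dict]) -> tuple[list[dict], dict[str, object]]:
    # Graph fields stay together; per-run configs move to a name-keyed mapping.
    graph = [
        {"name": layer["name"], "type": layer["type"], "deps": dict(layer.get("deps", {}))}
        for layer in layers
    ]
    configs = {layer["name"]: layer.get("config") for layer in layers}
    return graph, configs

graph, configs = split_composition(
    [{"name": "prompt", "type": "plain.prompt", "config": {"user": "hello"}}]
)
print(graph)    # [{'name': 'prompt', 'type': 'plain.prompt', 'deps': {}}]
print(configs)  # {'prompt': {'user': 'hello'}}
```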
class CreateRunResponse(BaseModel):
"""Response returned after a run has been persisted and scheduled locally."""
@ -152,10 +222,7 @@ class RunFailedEvent(BaseRunEvent):
RunEvent: TypeAlias = Annotated[
RunStartedEvent
| PydanticAIStreamRunEvent
| RunSucceededEvent
| RunFailedEvent,
RunStartedEvent | PydanticAIStreamRunEvent | RunSucceededEvent | RunFailedEvent,
Field(discriminator="type"),
]
RUN_EVENT_ADAPTER: TypeAdapter[RunEvent] = TypeAdapter(RunEvent)
@ -180,6 +247,7 @@ __all__ = [
"LayerExitSignals",
"PydanticAIStreamRunEvent",
"RUN_EVENT_ADAPTER",
"RunComposition",
"RunEvent",
"RunEventType",
"RunEventsResponse",
@ -190,5 +258,7 @@ __all__ = [
"RunStatusResponse",
"RunSucceededEvent",
"RunSucceededEventData",
"RunLayerSpec",
"normalize_composition",
"utc_now",
]

View File

@ -2,11 +2,14 @@
The run request carries model/provider selection in the layer graph. This helper
keeps Agent construction details out of ``AgentRunRunner`` while accepting an
already resolved Pydantic AI model from the configured model layer.
already resolved Pydantic AI model from the configured model layer. Prompt and
tool values arriving here are already transformed by Agenton's
``PYDANTIC_AI_TRANSFORMERS`` preset; this module registers those pydantic-ai
objects without reimplementing plain/pydantic-ai conversion logic.
"""
from collections.abc import Sequence
from typing import Any, Callable, cast
from typing import Any, cast
from pydantic_ai import Agent
from pydantic_ai.messages import UserContent
@ -22,28 +25,17 @@ def create_agent(
tools: Sequence[PydanticAITool[object]],
) -> Agent[None, object]:
"""Create the pydantic-ai agent for one run."""
return cast(
agent = cast(
Agent[None, object],
Agent[None, str](
model,
output_type=str,
system_prompt=materialize_static_system_prompts(system_prompts),
tools=tools,
),
)
def materialize_static_system_prompts(system_prompts: Sequence[PydanticAIPrompt[object]]) -> list[str]:
"""Convert MVP static prompt callables into strings for pydantic-ai."""
result: list[str] = []
for prompt in system_prompts:
if isinstance(prompt, str):
result.append(prompt)
elif callable(prompt):
result.append(cast(Callable[[], str], prompt)())
else:
raise TypeError(f"Unsupported system prompt type: {type(prompt).__qualname__}")
return result
for prompt in system_prompts:
_ = agent.system_prompt(cast(Any, prompt))
return agent
def normalize_user_input(user_prompts: Sequence[UserContent]) -> str | Sequence[UserContent]:
@ -53,4 +45,4 @@ def normalize_user_input(user_prompts: Sequence[UserContent]) -> str | Sequence[
return list(user_prompts)
__all__ = ["create_agent", "materialize_static_system_prompts", "normalize_user_input"]
__all__ = ["create_agent", "normalize_user_input"]

View File

@ -0,0 +1,28 @@
"""Shared validation helpers for Agenton-backed request boundaries.
Most bad Dify Agent inputs surface from Agenton as ``KeyError``, ``TypeError``,
or ``ValueError`` while graph config, per-run layer config, and session snapshot
DTOs are being validated. A smaller class of request-shaped failures surfaces
later, during ``Compositor.enter(...)``, before the body of the entered run
executes: session snapshots may contain lifecycle states such as ``CLOSED`` that
are serializable but not enterable. Agenton reports those as ``RuntimeError``.
Dify Agent intentionally translates only these known enter-time runtime errors
into public request-validation errors. Other runtime failures still represent
execution bugs or infrastructure problems and must not be downgraded to client
input errors.
"""
_ENTER_VALIDATION_RUNTIME_ERROR_FRAGMENTS = (
"ACTIVE snapshots are not allowed.",
"CLOSED snapshots cannot be entered.",
)
def is_agenton_enter_validation_runtime_error(exc: RuntimeError) -> bool:
"""Return whether ``exc`` is a known Agenton enter-time input failure."""
message = str(exc)
return any(fragment in message for fragment in _ENTER_VALIDATION_RUNTIME_ERROR_FRAGMENTS)
__all__ = ["is_agenton_enter_validation_runtime_error"]

View File

@ -1,17 +1,20 @@
"""Safe Agenton compositor construction for API-submitted configs.
Only explicitly registered layer types are constructible here. The default
registry contains prompt layers plus Dify plugin LLM layers. Public DTOs provide
tenant/plugin/model data, while server-only plugin daemon settings are injected
through the registry factory for ``DifyPluginLayer``.
Only explicitly allowed provider type ids are constructible here. The default
provider set contains prompt layers plus Dify plugin LLM layers. Public DTOs
provide tenant/plugin/model data, while server-only plugin daemon settings are
injected through the provider factory for ``DifyPluginLayer``. The resulting
``Compositor`` remains Agenton state-only: live resources such as the plugin
daemon HTTP client are supplied later by the runtime and never enter providers,
layers, or session snapshots.
"""
from typing import cast
from collections.abc import Mapping, Sequence
from typing import Any, cast
import httpx
from pydantic_ai.messages import UserContent
from agenton.compositor import Compositor, CompositorConfig, LayerRegistry
from agenton.compositor import Compositor, CompositorConfig, LayerProvider, LayerProviderInput
from agenton.layers.types import AllPromptTypes, AllToolTypes, AllUserPromptTypes, PydanticAIPrompt, PydanticAITool
from agenton_collections.layers.plain.basic import PromptLayer
from agenton_collections.transformers.pydantic_ai import PYDANTIC_AI_TRANSFORMERS
@ -20,32 +23,34 @@ from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer
from dify_agent.layers.dify_plugin.plugin_layer import DifyPluginLayer
def create_default_layer_registry(
type DifyAgentLayerProvider = LayerProvider[Any]
def create_default_layer_providers(
*,
plugin_daemon_url: str = "http://localhost:5002",
plugin_daemon_api_key: str = "",
plugin_daemon_timeout: float | httpx.Timeout | None = 600.0,
) -> LayerRegistry:
"""Return the server registry of safe config-constructible layers."""
registry = LayerRegistry()
registry.register_layer(PromptLayer)
registry.register_layer(
DifyPluginLayer,
factory=lambda config: DifyPluginLayer.from_config_with_settings(
DifyPluginLayerConfig.model_validate(config),
daemon_url=plugin_daemon_url,
daemon_api_key=plugin_daemon_api_key,
timeout=plugin_daemon_timeout,
) -> tuple[DifyAgentLayerProvider, ...]:
"""Return the server provider set of safe config-constructible layers."""
return (
LayerProvider.from_layer_type(PromptLayer),
LayerProvider.from_factory(
layer_type=DifyPluginLayer,
create=lambda config: DifyPluginLayer.from_config_with_settings(
DifyPluginLayerConfig.model_validate(config),
daemon_url=plugin_daemon_url,
daemon_api_key=plugin_daemon_api_key,
),
),
LayerProvider.from_layer_type(DifyPluginLLMLayer),
)
registry.register_layer(DifyPluginLLMLayer)
return registry
def build_pydantic_ai_compositor(
config: CompositorConfig,
*,
registry: LayerRegistry | None = None,
providers: Sequence[LayerProviderInput],
node_providers: Mapping[str, LayerProviderInput] | None = None,
) -> Compositor[
PydanticAIPrompt[object],
PydanticAITool[object],
@ -54,7 +59,14 @@ def build_pydantic_ai_compositor(
UserContent,
AllUserPromptTypes,
]:
"""Build a Pydantic AI-ready compositor from a validated config."""
"""Build a Pydantic AI-ready compositor from a validated graph config.
Prompt, user prompt, and tool conversion is delegated to Agenton's shared
pydantic-ai transformer preset so Dify Agent does not duplicate conversion
logic for plain and pydantic-ai layer families. Callers must pass the already
selected provider set explicitly so provider defaulting stays at outer runtime
boundaries rather than being duplicated here.
"""
return cast(
Compositor[
PydanticAIPrompt[object],
@ -66,10 +78,11 @@ def build_pydantic_ai_compositor(
],
Compositor.from_config(
config,
registry=registry or create_default_layer_registry(),
providers=providers,
node_providers=node_providers,
**PYDANTIC_AI_TRANSFORMERS, # pyright: ignore[reportArgumentType]
),
)
__all__ = ["build_pydantic_ai_compositor", "create_default_layer_registry"]
__all__ = ["DifyAgentLayerProvider", "build_pydantic_ai_compositor", "create_default_layer_providers"]

View File

@ -1,14 +1,15 @@
"""Validation and application of request-level Agenton layer exit signals.
HTTP requests carry data-only lifecycle intent in ``LayerExitSignals``. The
runtime validates the signal keys against the built compositor before a run is
persisted or entered, then applies the resolved intent after entry because
``Layer.lifecycle_enter`` resets controls to delete on each successful enter.
HTTP requests carry data-only lifecycle intent in the top-level ``on_exit``
field. The runtime validates signal keys against the built compositor before a
run is persisted or entered, then applies the resolved intent to the active
``CompositorRun`` after entry because Agenton initializes each run slot with a
delete-on-exit intent.
"""
from typing import Any
from agenton.compositor import Compositor, CompositorSession
from agenton.compositor import Compositor, CompositorRun
from agenton.layers import ExitIntent
from dify_agent.protocol.schemas import LayerExitSignals
@ -18,22 +19,26 @@ def validate_layer_exit_signals(
signals: LayerExitSignals,
) -> None:
"""Raise ``ValueError`` when ``signals`` mention layers absent from ``compositor``."""
unknown_layer_ids = set(signals.layers) - set(compositor.layers)
known_layer_ids = {node.name for node in compositor.nodes}
unknown_layer_ids = set(signals.layers) - known_layer_ids
if not unknown_layer_ids:
return
names = ", ".join(sorted(unknown_layer_ids))
raise ValueError(f"layer_exit_signals.layers references unknown layer ids: {names}.")
raise ValueError(f"on_exit.layers references unknown layer ids: {names}.")
def apply_layer_exit_signals(session: CompositorSession, signals: LayerExitSignals) -> None:
"""Apply ``signals`` to active controls for the current compositor entry."""
for layer_id, control in session.layer_controls.items():
def apply_layer_exit_signals(
run: CompositorRun[Any, Any, Any, Any, Any, Any],
signals: LayerExitSignals,
) -> None:
"""Apply ``signals`` to active run slots for the current compositor entry."""
for layer_id in run.slots:
intent = signals.layers.get(layer_id, signals.default)
if intent is ExitIntent.SUSPEND:
control.suspend_on_exit()
run.suspend_layer_on_exit(layer_id)
elif intent is ExitIntent.DELETE:
control.delete_on_exit()
run.delete_layer_on_exit(layer_id)
else:
raise ValueError(f"Unsupported layer exit intent: {intent!r}.")
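The per-slot resolution above (explicit per-layer entry, else the request-level default) reduces to a small sketch; the `ExitIntent` enum and `resolve_intents` helper here are stand-ins, not the Agenton types:

```python
from enum import Enum

class ExitIntent(Enum):
    SUSPEND = "suspend"
    DELETE = "delete"

def resolve_intents(slot_ids, per_layer, default):
    # Every active run slot gets an intent: the explicit per-layer
    # value when present, otherwise the request-level default.
    return {layer_id: per_layer.get(layer_id, default) for layer_id in slot_ids}

intents = resolve_intents(
    ["model", "prompt"],
    {"model": ExitIntent.DELETE},
    ExitIntent.SUSPEND,
)
```

Applying the resolved mapping after entry matters because Agenton re-initializes each slot to delete-on-exit on every successful enter.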

View File

@ -5,6 +5,11 @@ The scheduler is intentionally process-local: it persists a run record, starts a
task registry. Redis remains the durable source for status and event streams, but
there is no Redis job queue or cross-process handoff. If the process crashes,
currently active runs are lost until an external operator marks or retries them.
Create-run validation enters a lightweight Agenton run before persistence, so
the same transformed user prompts and top-level ``on_exit`` policy used by
execution are checked without relying on removed session/control APIs. Dify's
default layers keep lifecycle hooks side-effect free, so this validation does
not open plugin daemon clients.
"""
import asyncio
@ -12,11 +17,14 @@ import logging
from collections.abc import Callable
from typing import Protocol
from agenton.compositor import LayerRegistry
from dify_agent.protocol.schemas import CreateRunRequest
from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor, create_default_layer_registry
import httpx
from agenton.compositor import LayerProviderInput
from dify_agent.protocol.schemas import CreateRunRequest, normalize_composition
from dify_agent.runtime.agenton_validation import is_agenton_enter_validation_runtime_error
from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor, create_default_layer_providers
from dify_agent.runtime.event_sink import RunEventSink, emit_run_failed
from dify_agent.runtime.layer_exit_signals import validate_layer_exit_signals
from dify_agent.runtime.layer_exit_signals import apply_layer_exit_signals, validate_layer_exit_signals
from dify_agent.runtime.runner import AgentRunRunner
from dify_agent.runtime.user_prompt_validation import EMPTY_USER_PROMPTS_ERROR, has_non_blank_user_prompt
from dify_agent.server.schemas import RunRecord
@ -28,6 +36,10 @@ class SchedulerStoppingError(RuntimeError):
"""Raised when a create-run request arrives after shutdown has started."""
class RunRequestValidationError(ValueError):
"""Raised when a create-run request cannot produce an executable Agenton run."""
class RunStore(RunEventSink, Protocol):
"""Persistence boundary needed by the scheduler."""
@ -53,9 +65,9 @@ class RunScheduler:
``active_tasks`` is mutated only on the event loop that calls ``create_run``
and ``shutdown``. The task registry is not durable; it exists so the lifespan
hook can wait for in-flight work and mark cancelled runs failed before Redis is
closed. A lock guards the stopping flag, run persistence, and task
registration so shutdown cannot complete while a run is between record
creation and active-task tracking.
closed. A lock guards the stopping flag, lightweight request validation, run
persistence, and task registration, so shutdown cannot begin while an admitted
request is still being validated or persisted, and no new validation starts
once stopping has been set.
"""
store: RunStore
@ -63,22 +75,25 @@ class RunScheduler:
active_tasks: dict[str, asyncio.Task[None]]
stopping: bool
runner_factory: RunRunnerFactory
layer_registry: LayerRegistry
layer_providers: tuple[LayerProviderInput, ...]
plugin_daemon_http_client: httpx.AsyncClient
_lifecycle_lock: asyncio.Lock
def __init__(
self,
*,
store: RunStore,
plugin_daemon_http_client: httpx.AsyncClient,
shutdown_grace_seconds: float = 30,
layer_registry: LayerRegistry | None = None,
layer_providers: tuple[LayerProviderInput, ...] | None = None,
runner_factory: RunRunnerFactory | None = None,
) -> None:
self.store = store
self.shutdown_grace_seconds = shutdown_grace_seconds
self.active_tasks = {}
self.stopping = False
self.layer_registry = layer_registry or create_default_layer_registry()
self.plugin_daemon_http_client = plugin_daemon_http_client
self.layer_providers = layer_providers if layer_providers is not None else create_default_layer_providers()
self.runner_factory = runner_factory or self._default_runner_factory
self._lifecycle_lock = asyncio.Lock()
@ -88,14 +103,10 @@ class RunScheduler:
The returned record is already ``running``. The background task is removed
from ``active_tasks`` when it finishes, regardless of success or failure.
"""
compositor = build_pydantic_ai_compositor(request.compositor, registry=self.layer_registry)
validate_layer_exit_signals(compositor, request.layer_exit_signals)
if not has_non_blank_user_prompt(compositor.user_prompts):
raise ValueError(EMPTY_USER_PROMPTS_ERROR)
async with self._lifecycle_lock:
if self.stopping:
raise SchedulerStoppingError("run scheduler is shutting down")
await validate_run_request(request, layer_providers=self.layer_providers)
record = await self.store.create_run()
task = asyncio.create_task(self._run_record(record, request), name=f"dify-agent-run-{record.run_id}")
self.active_tasks[record.run_id] = task
@ -136,7 +147,8 @@ class RunScheduler:
sink=self.store,
request=request,
run_id=record.run_id,
layer_registry=self.layer_registry,
plugin_daemon_http_client=self.plugin_daemon_http_client,
layer_providers=self.layer_providers,
)
async def _mark_cancelled_run_failed(self, run_id: str) -> None:
@ -149,4 +161,42 @@ class RunScheduler:
logger.exception("failed to mark cancelled run failed", extra={"run_id": run_id})
__all__ = ["RunScheduler", "SchedulerStoppingError"]
async def validate_run_request(
request: CreateRunRequest,
*,
layer_providers: tuple[LayerProviderInput, ...] | None = None,
) -> None:
"""Validate create-run semantics that require an entered Agenton run.
This boundary rejects unknown ``on_exit`` layer ids, effectively empty
transformed user prompts, and known enter-time snapshot lifecycle errors
before the scheduler persists a run record. It also exercises provider config
validation and snapshot hydration without touching external services because
Dify plugin daemon clients are owned by the FastAPI lifespan, not Agenton
lifecycle hooks.
"""
resolved_layer_providers = layer_providers if layer_providers is not None else create_default_layer_providers()
entered_run = False
try:
graph_config, layer_configs = normalize_composition(request.composition)
compositor = build_pydantic_ai_compositor(
graph_config,
providers=resolved_layer_providers,
)
validate_layer_exit_signals(compositor, request.on_exit)
async with compositor.enter(configs=layer_configs, session_snapshot=request.session_snapshot) as run:
entered_run = True
apply_layer_exit_signals(run, request.on_exit)
if not has_non_blank_user_prompt(run.user_prompts):
raise RunRequestValidationError(EMPTY_USER_PROMPTS_ERROR)
except RunRequestValidationError:
raise
except RuntimeError as exc:
if not entered_run and is_agenton_enter_validation_runtime_error(exc):
raise RunRequestValidationError(str(exc)) from exc
raise
except (KeyError, TypeError, ValueError) as exc:
raise RunRequestValidationError(str(exc)) from exc
__all__ = ["RunRequestValidationError", "RunScheduler", "SchedulerStoppingError", "validate_run_request"]

View File

@ -1,26 +1,29 @@
"""Runtime execution for one scheduled Dify Agent run.
The runner is storage-agnostic: it builds an Agenton compositor, enters or
resumes its session, runs pydantic-ai with ``compositor.user_prompts`` as the user
input, emits stream events, applies request-level layer exit signals, snapshots
the resulting session, and then publishes a terminal success or failure event.
The Pydantic AI model is resolved from the active Agenton layer named by
``DIFY_AGENT_MODEL_LAYER_ID``. Successful terminal events contain both the
JSON-safe final output and session snapshot; there are no separate output or
snapshot events to correlate.
The runner is storage-agnostic: it normalizes the public Dify composition into
Agenton's graph/config split, enters a fresh ``CompositorRun`` (or resumes one
from a snapshot), runs pydantic-ai with ``run.user_prompts`` as the user input,
emits stream events, applies request-level ``on_exit`` signals, and then
publishes a terminal success or failure event. The Pydantic AI model is resolved
from the active Agenton layer named by ``DIFY_AGENT_MODEL_LAYER_ID`` and receives
the FastAPI lifespan-owned plugin daemon HTTP client; no run or layer owns that
client. Successful terminal events contain both the JSON-safe final output and
session snapshot; there are no separate output or snapshot events to correlate.
"""
from collections.abc import AsyncIterable
from typing import cast
import httpx
from pydantic import JsonValue, TypeAdapter
from pydantic_ai.messages import AgentStreamEvent
from agenton.compositor import CompositorSessionSnapshot, LayerRegistry
from agenton.compositor import CompositorSessionSnapshot, LayerProviderInput
from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer
from dify_agent.protocol.schemas import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest
from dify_agent.protocol.schemas import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest, normalize_composition
from dify_agent.runtime.agent_factory import create_agent, normalize_user_input
from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor, create_default_layer_registry
from dify_agent.runtime.agenton_validation import is_agenton_enter_validation_runtime_error
from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor, create_default_layer_providers
from dify_agent.runtime.event_sink import (
RunEventSink,
emit_pydantic_ai_event,
@ -46,7 +49,8 @@ class AgentRunRunner:
request: CreateRunRequest
run_id: str
layer_registry: LayerRegistry
layer_providers: tuple[LayerProviderInput, ...]
plugin_daemon_http_client: httpx.AsyncClient
def __init__(
self,
@ -54,12 +58,14 @@ class AgentRunRunner:
sink: RunEventSink,
request: CreateRunRequest,
run_id: str,
layer_registry: LayerRegistry | None = None,
plugin_daemon_http_client: httpx.AsyncClient,
layer_providers: tuple[LayerProviderInput, ...] | None = None,
) -> None:
self.sink = sink
self.request = request
self.run_id = run_id
self.layer_registry = layer_registry or create_default_layer_registry()
self.plugin_daemon_http_client = plugin_daemon_http_client
self.layer_providers = layer_providers if layer_providers is not None else create_default_layer_providers()
async def run(self) -> None:
"""Execute the run and emit the documented event sequence."""
@ -83,38 +89,52 @@ class AgentRunRunner:
await self.sink.update_status(self.run_id, "succeeded")
async def _run_agent(self) -> tuple[JsonValue, CompositorSessionSnapshot]:
"""Run pydantic-ai inside an entered Agenton session."""
compositor = build_pydantic_ai_compositor(self.request.compositor, registry=self.layer_registry)
"""Run pydantic-ai inside an entered Agenton run.
Known input-shaped Agenton enter-time runtime errors, such as trying to
resume a ``CLOSED`` snapshot layer, are normalized to
``AgentRunValidationError``. Later runtime failures still propagate as
execution errors so they become terminal failed runs rather than client
validation responses.
"""
try:
validate_layer_exit_signals(compositor, self.request.layer_exit_signals)
except ValueError as exc:
graph_config, layer_configs = normalize_composition(self.request.composition)
compositor = build_pydantic_ai_compositor(graph_config, providers=self.layer_providers)
validate_layer_exit_signals(compositor, self.request.on_exit)
except (KeyError, TypeError, ValueError) as exc:
raise AgentRunValidationError(str(exc)) from exc
session = (
compositor.session_from_snapshot(self.request.session_snapshot)
if self.request.session_snapshot is not None
else compositor.new_session()
)
async with compositor.enter(session) as active_session:
apply_layer_exit_signals(active_session, self.request.layer_exit_signals)
user_prompts = compositor.user_prompts
if not has_non_blank_user_prompt(user_prompts):
raise AgentRunValidationError(EMPTY_USER_PROMPTS_ERROR)
async def handle_events(_ctx: object, events: AsyncIterable[AgentStreamEvent]) -> None:
async for event in events:
_ = await emit_pydantic_ai_event(self.sink, run_id=self.run_id, data=event)
entered_run = False
try:
async with compositor.enter(configs=layer_configs, session_snapshot=self.request.session_snapshot) as run:
entered_run = True
apply_layer_exit_signals(run, self.request.on_exit)
user_prompts = run.user_prompts
if not has_non_blank_user_prompt(user_prompts):
raise AgentRunValidationError(EMPTY_USER_PROMPTS_ERROR)
try:
llm_layer = compositor.get_layer(DIFY_AGENT_MODEL_LAYER_ID, DifyPluginLLMLayer)
llm_control = active_session.layer(DIFY_AGENT_MODEL_LAYER_ID)
model = llm_layer.get_model(llm_control)
except (KeyError, TypeError, RuntimeError) as exc:
async def handle_events(_ctx: object, events: AsyncIterable[AgentStreamEvent]) -> None:
async for event in events:
_ = await emit_pydantic_ai_event(self.sink, run_id=self.run_id, data=event)
try:
llm_layer = run.get_layer(DIFY_AGENT_MODEL_LAYER_ID, DifyPluginLLMLayer)
model = llm_layer.get_model(http_client=self.plugin_daemon_http_client)
except (KeyError, TypeError, RuntimeError) as exc:
raise AgentRunValidationError(str(exc)) from exc
agent = create_agent(model, system_prompts=run.prompts, tools=run.tools)
result = await agent.run(normalize_user_input(user_prompts), event_stream_handler=handle_events)
output = _serialize_agent_output(result.output)
except RuntimeError as exc:
if not entered_run and is_agenton_enter_validation_runtime_error(exc):
raise AgentRunValidationError(str(exc)) from exc
raise
agent = create_agent(model, system_prompts=compositor.prompts, tools=compositor.tools)
result = await agent.run(normalize_user_input(user_prompts), event_stream_handler=handle_events)
if run.session_snapshot is None:
raise RuntimeError("Agenton run did not produce a session snapshot after exit.")
return _serialize_agent_output(result.output), compositor.snapshot_session(session)
return output, run.session_snapshot
def _serialize_agent_output(output: object) -> JsonValue:

View File

@ -1,10 +1,10 @@
"""Validation for effective user prompts produced by Agenton compositors.
"""Validation for effective user prompts produced by Agenton runs.
Validation happens after safe compositor construction so scheduler and runner
paths use the same semantics as the actual pydantic-ai input. Blank string fragments do not
count as meaningful input; non-string ``UserContent`` is treated as intentional
content because rich media/message parts do not have a universal whitespace
representation.
Validation happens after safe compositor construction and run entry so scheduler
and runner paths use the same transformed prompts as the actual pydantic-ai
input. Blank string fragments do not count as meaningful input; non-string
``UserContent`` is treated as intentional content because rich media/message
parts do not have a universal whitespace representation.
"""
from collections.abc import Sequence
@ -12,7 +12,7 @@ from collections.abc import Sequence
from pydantic_ai.messages import UserContent
EMPTY_USER_PROMPTS_ERROR = "compositor.user_prompts must not be empty"
EMPTY_USER_PROMPTS_ERROR = "run.user_prompts must not be empty"
def has_non_blank_user_prompt(user_prompts: Sequence[UserContent]) -> bool:
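A sketch of the documented semantics (blank string fragments do not count; non-string content always does), not necessarily the exact implementation:

```python
def has_non_blank_user_prompt(user_prompts) -> bool:
    # Non-string UserContent (images, rich message parts) is treated
    # as intentional input; strings must contain non-whitespace text.
    for prompt in user_prompts:
        if not isinstance(prompt, str):
            return True
        if prompt.strip():
            return True
    return False
```

Running the same check in both the scheduler and the runner, after run entry, keeps their rejection semantics aligned with the actual transformed pydantic-ai input.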

View File

@ -1,20 +1,22 @@
"""FastAPI application factory for the Dify Agent run server.
The HTTP process owns Redis clients, route wiring, and a process-local scheduler.
Run execution happens in background ``asyncio`` tasks rather than request
handlers, so client disconnects do not cancel the agent runtime. Redis persists
run records and per-run event streams with configured retention only; it is not
used as a job queue.
The HTTP process owns Redis clients, one shared plugin daemon ``httpx.AsyncClient``,
route wiring, and a process-local scheduler. Run execution happens in background
``asyncio`` tasks rather than request handlers, so client disconnects do not
cancel the agent runtime. Redis persists run records and per-run event streams
with configured retention only; it is not used as a job queue. Agenton layers and
providers stay state-only: they borrow the lifespan-owned plugin daemon client
through the runner and never create or close it themselves.
"""
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
import httpx
from fastapi import FastAPI
from redis.asyncio import Redis
from agenton.compositor import LayerRegistry
from dify_agent.runtime.compositor_factory import create_default_layer_registry
from dify_agent.runtime.compositor_factory import create_default_layer_providers
from dify_agent.runtime.run_scheduler import RunScheduler
from dify_agent.server.routes.runs import create_runs_router
from dify_agent.server.settings import ServerSettings
@ -24,16 +26,16 @@ from dify_agent.storage.redis_run_store import RedisRunStore
def create_app(settings: ServerSettings | None = None) -> FastAPI:
"""Build the FastAPI app with one shared Redis store and local scheduler."""
resolved_settings = settings or ServerSettings()
layer_registry = create_default_layer_registry(
layer_providers = create_default_layer_providers(
plugin_daemon_url=resolved_settings.plugin_daemon_url,
plugin_daemon_api_key=resolved_settings.plugin_daemon_api_key,
plugin_daemon_timeout=resolved_settings.plugin_daemon_timeout,
)
state: dict[str, RedisRunStore | RunScheduler | LayerRegistry] = {"layer_registry": layer_registry}
state: dict[str, object] = {}
@asynccontextmanager
async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
redis = Redis.from_url(resolved_settings.redis_url)
plugin_daemon_http_client = create_plugin_daemon_http_client(resolved_settings)
store = RedisRunStore(
redis,
prefix=resolved_settings.redis_prefix,
@ -41,8 +43,9 @@ def create_app(settings: ServerSettings | None = None) -> FastAPI:
)
scheduler = RunScheduler(
store=store,
plugin_daemon_http_client=plugin_daemon_http_client,
shutdown_grace_seconds=resolved_settings.shutdown_grace_seconds,
layer_registry=layer_registry,
layer_providers=layer_providers,
)
state["store"] = store
state["scheduler"] = scheduler
@ -50,6 +53,7 @@ def create_app(settings: ServerSettings | None = None) -> FastAPI:
yield
finally:
await scheduler.shutdown()
await plugin_daemon_http_client.aclose()
await redis.aclose()
app = FastAPI(title="Dify Agent Run Server", version="0.1.0", lifespan=lifespan)
@ -60,14 +64,34 @@ def create_app(settings: ServerSettings | None = None) -> FastAPI:
def get_scheduler() -> RunScheduler:
return state["scheduler"] # pyright: ignore[reportReturnType]
def get_layer_registry() -> LayerRegistry:
return state["layer_registry"] # pyright: ignore[reportReturnType]
app.include_router(create_runs_router(get_store, get_scheduler, get_layer_registry))
app.include_router(create_runs_router(get_store, get_scheduler))
return app
def create_plugin_daemon_http_client(settings: ServerSettings) -> httpx.AsyncClient:
"""Create the lifespan-owned plugin daemon HTTP client with configured limits.
The returned client is shared by all local background runs in this FastAPI
process and must be closed by the app lifespan after the scheduler has stopped
using it.
"""
return httpx.AsyncClient(
timeout=httpx.Timeout(
connect=settings.plugin_daemon_connect_timeout,
read=settings.plugin_daemon_read_timeout,
write=settings.plugin_daemon_write_timeout,
pool=settings.plugin_daemon_pool_timeout,
),
limits=httpx.Limits(
max_connections=settings.plugin_daemon_max_connections,
max_keepalive_connections=settings.plugin_daemon_max_keepalive_connections,
keepalive_expiry=settings.plugin_daemon_keepalive_expiry,
),
trust_env=False,
)
app = create_app()
__all__ = ["app", "create_app"]
__all__ = ["app", "create_app", "create_plugin_daemon_http_client"]

View File

@ -13,12 +13,8 @@ from typing import Annotated
from fastapi import APIRouter, Depends, Header, HTTPException, Query
from fastapi.responses import StreamingResponse
from agenton.compositor import LayerRegistry
from dify_agent.protocol.schemas import CreateRunRequest, CreateRunResponse, RunEventsResponse, RunStatusResponse
from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor, create_default_layer_registry
from dify_agent.runtime.layer_exit_signals import validate_layer_exit_signals
from dify_agent.runtime.run_scheduler import RunScheduler, SchedulerStoppingError
from dify_agent.runtime.user_prompt_validation import EMPTY_USER_PROMPTS_ERROR, has_non_blank_user_prompt
from dify_agent.runtime.run_scheduler import RunRequestValidationError, RunScheduler, SchedulerStoppingError
from dify_agent.server.sse import sse_event_stream
from dify_agent.storage.redis_run_store import RedisRunStore, RunNotFoundError
@ -26,11 +22,9 @@ from dify_agent.storage.redis_run_store import RedisRunStore, RunNotFoundError
def create_runs_router(
get_store: Callable[[], RedisRunStore],
get_scheduler: Callable[[], RunScheduler],
get_layer_registry: Callable[[], LayerRegistry] | None = None,
) -> APIRouter:
"""Create routes bound to the application's store dependency provider."""
router = APIRouter(prefix="/runs", tags=["runs"])
resolved_get_layer_registry = get_layer_registry or create_default_layer_registry
async def store_dep() -> RedisRunStore:
return get_store()
@ -43,19 +37,10 @@ def create_runs_router(
request: CreateRunRequest,
scheduler: Annotated[RunScheduler, Depends(scheduler_dep)],
) -> CreateRunResponse:
try:
compositor = build_pydantic_ai_compositor(
request.compositor,
registry=resolved_get_layer_registry(),
)
validate_layer_exit_signals(compositor, request.layer_exit_signals)
except Exception as exc:
raise HTTPException(status_code=422, detail=str(exc)) from exc
if not has_non_blank_user_prompt(compositor.user_prompts):
raise HTTPException(status_code=422, detail=EMPTY_USER_PROMPTS_ERROR)
try:
record = await scheduler.create_run(request)
except RunRequestValidationError as exc:
raise HTTPException(status_code=422, detail=str(exc)) from exc
except SchedulerStoppingError as exc:
raise HTTPException(status_code=503, detail="run scheduler is shutting down") from exc
return CreateRunResponse(run_id=record.run_id, status=record.status)

View File

@ -1,4 +1,10 @@
"""Configuration for the FastAPI run server."""
"""Configuration for the FastAPI run server.
Plugin daemon HTTP client settings describe the single FastAPI lifespan-owned
``httpx.AsyncClient`` shared by local run tasks. Layers and Agenton providers do
not own that client, so these settings are process resource limits rather than
per-run lifecycle knobs.
"""
from typing import ClassVar
@@ -17,7 +23,13 @@ class ServerSettings(BaseSettings):
run_retention_seconds: int = Field(default=DEFAULT_RUN_RETENTION_SECONDS, ge=1)
plugin_daemon_url: str = "http://localhost:5002"
plugin_daemon_api_key: str = ""
plugin_daemon_timeout: float | None = 600.0
plugin_daemon_connect_timeout: float = Field(default=10.0, ge=0)
plugin_daemon_read_timeout: float = Field(default=600.0, ge=0)
plugin_daemon_write_timeout: float = Field(default=30.0, ge=0)
plugin_daemon_pool_timeout: float = Field(default=10.0, ge=0)
plugin_daemon_max_connections: int = Field(default=100, ge=1)
plugin_daemon_max_keepalive_connections: int = Field(default=20, ge=0)
plugin_daemon_keepalive_expiry: float = Field(default=30.0, ge=0)
model_config: ClassVar[SettingsConfigDict] = SettingsConfigDict(
env_prefix="DIFY_AGENT_",


@@ -31,7 +31,7 @@ from dify_agent.protocol.schemas import (
def _create_run_payload() -> dict[str, object]:
return {
"compositor": {
"composition": {
"schema_version": 1,
"layers": [{"name": "prompt", "type": "plain.prompt", "config": {"user": "hello"}}],
}
@@ -76,9 +76,10 @@ def test_sync_methods_parse_protocol_dtos_and_send_create_request_dto() -> None:
def handler(request: httpx.Request) -> httpx.Response:
if request.method == "POST" and request.url.path == "/runs":
payload = cast(dict[str, object], json.loads(request.content))
compositor = cast(dict[str, object], payload["compositor"])
layers = cast(list[dict[str, object]], compositor["layers"])
composition = cast(dict[str, object], payload["composition"])
layers = cast(list[dict[str, object]], composition["layers"])
assert layers[0]["config"] == {"user": "hello"}
assert "compositor" not in payload
assert "agent_profile" not in payload
return httpx.Response(202, json={"run_id": "run-1", "status": "running"})
if request.method == "GET" and request.url.path == "/runs/run-1":
@@ -207,7 +208,7 @@ def test_create_run_is_not_retried_after_timeout() -> None:
def test_sync_sse_parser_supports_comments_multiline_data_and_id_fill() -> None:
payload = RUN_EVENT_ADAPTER.dump_json(RunStartedEvent(run_id="run-1"), exclude={"id"}).decode()
before_type, after_type = payload.split('"type"', maxsplit=1)
body = f": keepalive\nid: 5-0\nevent: run_started\ndata: {before_type}\ndata: \"type\"{after_type}\n\n"
body = f': keepalive\nid: 5-0\nevent: run_started\ndata: {before_type}\ndata: "type"{after_type}\n\n'
def handler(request: httpx.Request) -> httpx.Response:
assert request.url.params["after"] == "0-0"
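The frame built above exercises standard SSE framing: `:`-prefixed comment lines are ignored, `id:` and `event:` set fields, and successive `data:` lines are joined with `"\n"`. A minimal standalone parser sketch of those wire rules (not the client's actual implementation):

```python
def parse_sse_frame(frame: str) -> dict[str, str]:
    """Parse one SSE frame into its fields (sketch of the wire rules)."""
    event: dict[str, str] = {}
    data_lines: list[str] = []
    for line in frame.splitlines():
        if not line or line.startswith(":"):
            continue  # blank terminator or keepalive comment
        field, _, value = line.partition(":")
        # The SSE spec strips a single leading space from the value.
        value = value.removeprefix(" ")
        if field == "data":
            data_lines.append(value)  # multi-line data accumulates
        else:
            event[field] = value
    event["data"] = "\n".join(data_lines)
    return event
```

Parsing `": keepalive\nid: 5-0\nevent: run_started\ndata: hello\ndata: world\n\n"` yields `id` `"5-0"`, `event` `"run_started"`, and `data` `"hello\nworld"`.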


@@ -1,190 +1,120 @@
import asyncio
from collections import OrderedDict
from typing import cast
import httpx
import pytest
from agenton.compositor import Compositor
from agenton.layers import EmptyRuntimeHandles, EmptyRuntimeState, LayerControl, PlainPromptType, PlainToolType
from agenton.compositor import Compositor, LayerNode, LayerProvider
from dify_agent.adapters.llm import DifyLLMAdapterModel
from dify_agent.layers.dify_plugin.configs import DifyPluginLLMLayerConfig, DifyPluginLayerConfig
from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer
from dify_agent.layers.dify_plugin.plugin_layer import DifyPluginLayer, DifyPluginRuntimeHandles
from dify_agent.layers.dify_plugin.plugin_layer import DifyPluginLayer
def _plugin_config() -> DifyPluginLayerConfig:
return DifyPluginLayerConfig(tenant_id="tenant-1", plugin_id="langgenius/openai", user_id="user-1")
def _llm_config() -> DifyPluginLLMLayerConfig:
return DifyPluginLLMLayerConfig(
model_provider="openai",
model="demo-model",
credentials={"api_key": "secret"},
model_settings={"temperature": 0.2},
)
def _plugin_layer() -> DifyPluginLayer:
return DifyPluginLayer.from_config_with_settings(
DifyPluginLayerConfig(tenant_id="tenant-1", plugin_id="langgenius/openai", user_id="user-1"),
_plugin_config(),
daemon_url="http://plugin-daemon",
daemon_api_key="daemon-secret",
timeout=12,
)
def _llm_layer() -> DifyPluginLLMLayer:
return DifyPluginLLMLayer.from_config(
DifyPluginLLMLayerConfig(
model_provider="openai",
model="demo-model",
credentials={"api_key": "secret"},
model_settings={"temperature": 0.2},
)
def _plugin_provider() -> LayerProvider[DifyPluginLayer]:
return LayerProvider.from_factory(
layer_type=DifyPluginLayer,
create=lambda config: DifyPluginLayer.from_config_with_settings(
DifyPluginLayerConfig.model_validate(config),
daemon_url="http://plugin-daemon",
daemon_api_key="daemon-secret",
),
)
def _plugin_control(control: LayerControl) -> LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles]:
return cast(LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles], control)
def _llm_control(control: LayerControl) -> LayerControl[EmptyRuntimeState, EmptyRuntimeHandles]:
return cast(LayerControl[EmptyRuntimeState, EmptyRuntimeHandles], control)
def test_dify_plugin_layer_uses_resource_stack_and_get_daemon_provider_requires_active_control() -> None:
def test_dify_plugin_layer_creates_daemon_provider_from_shared_http_client() -> None:
async def scenario() -> None:
plugin = _plugin_layer()
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("plugin", plugin)]))
session = compositor.new_session()
async with httpx.AsyncClient(transport=httpx.MockTransport(lambda _request: httpx.Response(200))) as client:
provider = plugin.create_daemon_provider(http_client=client)
with pytest.raises(RuntimeError, match="requires an active LayerControl"):
_ = plugin.get_daemon_provider(_plugin_control(session.layer("plugin")))
async with compositor.enter(session) as active_session:
handles = cast(DifyPluginRuntimeHandles, cast(object, session.layer("plugin").runtime_handles))
first_client = handles.http_client
assert first_client is not None
provider = plugin.get_daemon_provider(_plugin_control(session.layer("plugin")))
assert provider.name == "DifyPlugin/langgenius/openai"
assert provider.client.http_client is first_client
assert provider.client.http_client is client
assert provider.client.tenant_id == "tenant-1"
assert provider.client.plugin_id == "langgenius/openai"
assert provider.client.user_id == "user-1"
async with provider:
pass
assert first_client.is_closed is False
active_session.suspend_on_exit()
assert handles.http_client is None
assert first_client.is_closed is True
with pytest.raises(RuntimeError, match="requires an active LayerControl"):
_ = plugin.get_daemon_provider(_plugin_control(session.layer("plugin")))
async with compositor.enter(session):
second_client = handles.http_client
assert second_client is not None
assert second_client is not first_client
assert handles.http_client is None
assert second_client.is_closed is True
assert client.is_closed is False
asyncio.run(scenario())
def test_dify_plugin_layer_get_daemon_provider_rejects_wrong_control() -> None:
def test_dify_plugin_layer_rejects_closed_shared_http_client() -> None:
async def scenario() -> None:
plugin = _plugin_layer()
llm = _llm_layer()
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("plugin", plugin), ("llm", llm)]),
deps_name_mapping={"llm": {"plugin": "plugin"}},
client = httpx.AsyncClient()
await client.aclose()
with pytest.raises(RuntimeError, match="open shared HTTP client"):
_ = plugin.create_daemon_provider(http_client=client)
asyncio.run(scenario())
def test_dify_plugin_llm_layer_builds_adapter_model_from_direct_dependency() -> None:
async def scenario() -> None:
compositor = Compositor(
[
LayerNode("renamed-plugin", _plugin_provider()),
LayerNode("llm", DifyPluginLLMLayer, deps={"plugin": "renamed-plugin"}),
]
)
async with httpx.AsyncClient(transport=httpx.MockTransport(lambda _request: httpx.Response(200))) as client:
async with compositor.enter(
configs={
"renamed-plugin": _plugin_config(),
"llm": _llm_config(),
}
) as run:
plugin = run.get_layer("renamed-plugin", DifyPluginLayer)
llm = run.get_layer("llm", DifyPluginLLMLayer)
async with compositor.enter() as session:
with pytest.raises(RuntimeError, match="belongs to layer 'llm'"):
_ = plugin.get_daemon_provider(_plugin_control(session.layer("llm")))
model = llm.get_model(http_client=client)
assert llm.deps.plugin is plugin
assert isinstance(model, DifyLLMAdapterModel)
assert model.model_name == "demo-model"
assert model.model_provider == "openai"
assert model.credentials == {"api_key": "secret"}
assert model.provider.name == "DifyPlugin/langgenius/openai"
assert model.provider.client.http_client is client
asyncio.run(scenario())
def test_dify_plugin_llm_layer_builds_adapter_model_from_dependency_provider() -> None:
def test_dify_plugin_layer_lifecycle_does_not_manage_http_client() -> None:
async def scenario() -> None:
plugin = _plugin_layer()
llm = _llm_layer()
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("plugin", plugin), ("llm", llm)]),
deps_name_mapping={"llm": {"plugin": "plugin"}},
)
compositor = Compositor([LayerNode("plugin", _plugin_provider())])
async with httpx.AsyncClient(transport=httpx.MockTransport(lambda _request: httpx.Response(200))) as client:
async with compositor.enter(configs={"plugin": _plugin_config()}) as run:
plugin = run.get_layer("plugin", DifyPluginLayer)
provider = plugin.create_daemon_provider(http_client=client)
run.suspend_layer_on_exit("plugin")
session = compositor.new_session()
with pytest.raises(RuntimeError, match="requires an active LayerControl"):
_ = llm.get_model(_llm_control(session.layer("llm")))
async with compositor.enter(session):
model = llm.get_model(_llm_control(session.layer("llm")))
assert isinstance(model, DifyLLMAdapterModel)
assert model.model_name == "demo-model"
assert model.model_provider == "openai"
assert model.credentials == {"api_key": "secret"}
assert model.provider.name == "DifyPlugin/langgenius/openai"
handles = cast(DifyPluginRuntimeHandles, cast(object, session.layer("plugin").runtime_handles))
assert model.provider.client.http_client is handles.http_client
with pytest.raises(RuntimeError, match="belongs to layer 'plugin'"):
_ = llm.get_model(_llm_control(session.layer("plugin")))
asyncio.run(scenario())
def test_dify_plugin_llm_layer_get_model_uses_control_dependency_lookup(monkeypatch: pytest.MonkeyPatch) -> None:
async def scenario() -> None:
plugin = _plugin_layer()
llm = _llm_layer()
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
layers=OrderedDict([("renamed-plugin", plugin), ("llm", llm)]),
deps_name_mapping={"llm": {"plugin": "renamed-plugin"}},
)
async with compositor.enter() as session:
llm_control = session.layer("llm")
plugin_control = session.layer("renamed-plugin")
calls: list[object] = []
def fake_control_for(self: LayerControl, dep_layer: object) -> object:
assert self is llm_control
calls.append(dep_layer)
return plugin_control
monkeypatch.setattr(LayerControl, "control_for", fake_control_for)
model = llm.get_model(llm_control)
assert calls == [plugin]
assert isinstance(model, DifyLLMAdapterModel)
asyncio.run(scenario())
def test_dify_plugin_layer_concurrent_sessions_use_separate_controls_and_clients() -> None:
async def scenario() -> None:
plugin = _plugin_layer()
compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("plugin", plugin)]))
first_session = compositor.new_session()
second_session = compositor.new_session()
async with compositor.enter(first_session):
async with compositor.enter(second_session):
first_handles = cast(
DifyPluginRuntimeHandles,
cast(object, first_session.layer("plugin").runtime_handles),
)
second_handles = cast(
DifyPluginRuntimeHandles,
cast(object, second_session.layer("plugin").runtime_handles),
)
first_client = first_handles.http_client
second_client = second_handles.http_client
assert first_client is not None
assert second_client is not None
assert first_client is not second_client
first_provider = plugin.get_daemon_provider(_plugin_control(first_session.layer("plugin")))
second_provider = plugin.get_daemon_provider(_plugin_control(second_session.layer("plugin")))
assert first_provider.client.http_client is first_client
assert second_provider.client.http_client is second_client
assert second_client.is_closed is True
assert first_client.is_closed is False
assert first_client.is_closed is True
assert run.session_snapshot is not None
assert provider.client.http_client is client
assert client.is_closed is False
asyncio.run(scenario())


@@ -2,8 +2,9 @@ import pytest
from pydantic import ValidationError
from pydantic_ai.messages import FinalResultEvent
from agenton.layers import ExitIntent
from agenton.compositor import CompositorSessionSnapshot
from agenton.layers import ExitIntent
from agenton_collections.layers.plain import PromptLayerConfig
import dify_agent.protocol as protocol_exports
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID
from dify_agent.protocol.schemas import (
@@ -11,12 +12,16 @@ from dify_agent.protocol.schemas import (
CreateRunRequest,
LayerExitSignals,
PydanticAIStreamRunEvent,
RunComposition,
RunFailedEvent,
RunFailedEventData,
RunLayerSpec,
RunStartedEvent,
RunSucceededEvent,
RunSucceededEventData,
normalize_composition,
)
from dify_agent.layers.dify_plugin.configs import DifyPluginLLMLayerConfig, DifyPluginLayerConfig
def test_run_event_adapter_round_trips_typed_variants() -> None:
@@ -54,38 +59,84 @@ def test_pydantic_ai_event_data_uses_agent_stream_event_model() -> None:
assert isinstance(event.data, FinalResultEvent)
def test_create_run_request_rejects_agent_profile_and_model_layer_id_is_public() -> None:
def test_create_run_request_rejects_old_compositor_payload_and_model_layer_id_is_public() -> None:
assert DIFY_AGENT_MODEL_LAYER_ID == "llm"
with pytest.raises(ValidationError):
_ = CreateRunRequest.model_validate(
{
"compositor": {"layers": []},
"agent_profile": {"provider": "test", "output_text": "done"},
}
)
def test_layer_exit_signals_default_to_suspend_and_are_public() -> None:
def test_create_run_request_accepts_dto_first_public_composition_and_normalizes_graph_config() -> None:
prompt_config = PromptLayerConfig(prefix="system", user="hello")
plugin_config = DifyPluginLayerConfig(tenant_id="tenant-1", plugin_id="langgenius/openai")
llm_config = DifyPluginLLMLayerConfig(
model_provider="openai",
model="demo-model",
credentials={"api_key": "secret"},
)
request = CreateRunRequest(
composition=RunComposition(
layers=[
RunLayerSpec(name="prompt", type="plain.prompt", config=prompt_config),
RunLayerSpec(name="plugin", type="dify.plugin", config=plugin_config),
RunLayerSpec(
name=DIFY_AGENT_MODEL_LAYER_ID,
type="dify.plugin.llm",
deps={"plugin": "plugin"},
config=llm_config,
),
]
)
)
graph_config, layer_configs = normalize_composition(request.composition)
payload = request.model_dump(mode="json")
assert payload["composition"]["layers"][0]["config"] == {"prefix": "system", "user": "hello", "suffix": []}
assert [layer.model_dump(mode="json") for layer in graph_config.layers] == [
{"name": "prompt", "type": "plain.prompt", "deps": {}, "metadata": {}},
{"name": "plugin", "type": "dify.plugin", "deps": {}, "metadata": {}},
{
"name": DIFY_AGENT_MODEL_LAYER_ID,
"type": "dify.plugin.llm",
"deps": {"plugin": "plugin"},
"metadata": {},
},
]
assert layer_configs == {
"prompt": prompt_config,
"plugin": plugin_config,
DIFY_AGENT_MODEL_LAYER_ID: llm_config,
}
def test_on_exit_default_to_suspend_and_are_public() -> None:
assert protocol_exports.LayerExitSignals is LayerExitSignals
request = CreateRunRequest.model_validate({"compositor": {"layers": []}})
assert protocol_exports.RunComposition is RunComposition
assert protocol_exports.RunLayerSpec is RunLayerSpec
assert protocol_exports.normalize_composition is normalize_composition
request = CreateRunRequest.model_validate({"composition": {"layers": []}})
assert request.layer_exit_signals.default is ExitIntent.SUSPEND
assert request.layer_exit_signals.layers == {}
assert request.on_exit.default is ExitIntent.SUSPEND
assert request.on_exit.layers == {}
def test_layer_exit_signals_accept_layer_overrides() -> None:
def test_on_exit_accept_layer_overrides() -> None:
request = CreateRunRequest.model_validate(
{
"compositor": {"layers": []},
"layer_exit_signals": {
"composition": {"layers": []},
"on_exit": {
"default": "delete",
"layers": {"prompt": "suspend", "llm": "delete"},
},
}
)
assert request.layer_exit_signals.default is ExitIntent.DELETE
assert request.layer_exit_signals.layers == {"prompt": ExitIntent.SUSPEND, "llm": ExitIntent.DELETE}
assert request.on_exit.default is ExitIntent.DELETE
assert request.on_exit.layers == {"prompt": ExitIntent.SUSPEND, "llm": ExitIntent.DELETE}
def test_layer_exit_signals_reject_extra_fields() -> None:


@@ -1,21 +1,28 @@
import asyncio
from collections import defaultdict
from typing import cast
import httpx
import pytest
from pydantic import JsonValue
from agenton.compositor import CompositorConfig, LayerNodeConfig
from agenton.layers import ExitIntent
from dify_agent.protocol.schemas import CreateRunRequest, LayerExitSignals, RunEvent, RunStatus
from dify_agent.runtime.run_scheduler import RunScheduler, SchedulerStoppingError
from agenton.compositor import CompositorSessionSnapshot, LayerSessionSnapshot
from agenton.layers import ExitIntent, LifecycleState
from agenton_collections.layers.plain import PromptLayerConfig
from dify_agent.protocol.schemas import (
CreateRunRequest,
LayerExitSignals,
RunComposition,
RunEvent,
RunLayerSpec,
RunStatus,
)
from dify_agent.runtime.run_scheduler import RunRequestValidationError, RunScheduler, SchedulerStoppingError, validate_run_request
from dify_agent.server.schemas import RunRecord
def _request(user: str | list[str] = "hello") -> CreateRunRequest:
return CreateRunRequest(
compositor=CompositorConfig(
layers=[LayerNodeConfig(name="prompt", type="plain.prompt", config=cast(JsonValue, {"user": user}))]
composition=RunComposition(
layers=[RunLayerSpec(name="prompt", type="plain.prompt", config=PromptLayerConfig(user=user))]
)
)
@@ -82,20 +89,22 @@ def test_create_run_starts_background_task_and_returns_running() -> None:
store = FakeStore()
started = asyncio.Event()
release = asyncio.Event()
scheduler = RunScheduler(
store=store,
runner_factory=lambda _record, _request: ControlledRunner(started=started, release=release),
)
async with httpx.AsyncClient() as client:
scheduler = RunScheduler(
store=store,
plugin_daemon_http_client=client,
runner_factory=lambda _record, _request: ControlledRunner(started=started, release=release),
)
record = await scheduler.create_run(_request())
await asyncio.wait_for(started.wait(), timeout=1)
record = await scheduler.create_run(_request())
await asyncio.wait_for(started.wait(), timeout=1)
assert record.status == "running"
assert list(scheduler.active_tasks) == [record.run_id]
_ = release.set()
await asyncio.wait_for(scheduler.active_tasks[record.run_id], timeout=1)
await asyncio.sleep(0)
assert scheduler.active_tasks == {}
assert record.status == "running"
assert list(scheduler.active_tasks) == [record.run_id]
_ = release.set()
await asyncio.wait_for(scheduler.active_tasks[record.run_id], timeout=1)
await asyncio.sleep(0)
assert scheduler.active_tasks == {}
asyncio.run(scenario())
@@ -104,21 +113,23 @@ def test_shutdown_marks_unfinished_runs_failed_and_appends_event() -> None:
async def scenario() -> None:
store = FakeStore()
started = asyncio.Event()
scheduler = RunScheduler(
store=store,
shutdown_grace_seconds=0,
runner_factory=lambda _record, _request: ControlledRunner(started=started, release=asyncio.Event()),
)
record = await scheduler.create_run(_request())
await asyncio.wait_for(started.wait(), timeout=1)
async with httpx.AsyncClient() as client:
scheduler = RunScheduler(
store=store,
plugin_daemon_http_client=client,
shutdown_grace_seconds=0,
runner_factory=lambda _record, _request: ControlledRunner(started=started, release=asyncio.Event()),
)
record = await scheduler.create_run(_request())
await asyncio.wait_for(started.wait(), timeout=1)
await scheduler.shutdown()
await scheduler.shutdown()
assert scheduler.stopping is True
assert scheduler.active_tasks == {}
assert store.statuses[record.run_id] == "failed"
assert store.errors[record.run_id] == "run cancelled during server shutdown"
assert [event.type for event in store.events[record.run_id]] == ["run_failed"]
assert scheduler.stopping is True
assert scheduler.active_tasks == {}
assert store.statuses[record.run_id] == "failed"
assert store.errors[record.run_id] == "run cancelled during server shutdown"
assert [event.type for event in store.events[record.run_id]] == ["run_failed"]
asyncio.run(scenario())
@@ -126,25 +137,73 @@ def test_shutdown_marks_unfinished_runs_failed_and_appends_event() -> None:
def test_create_run_rejects_blank_prompt_before_persisting() -> None:
async def scenario() -> None:
store = FakeStore()
scheduler = RunScheduler(store=store)
async with httpx.AsyncClient() as client:
scheduler = RunScheduler(store=store, plugin_daemon_http_client=client)
with pytest.raises(ValueError, match="compositor.user_prompts must not be empty"):
await scheduler.create_run(_request(["", " "]))
with pytest.raises(ValueError, match="run.user_prompts must not be empty"):
await scheduler.create_run(_request(["", " "]))
assert store.records == {}
asyncio.run(scenario())
def test_validate_run_request_honors_explicit_empty_layer_providers() -> None:
async def scenario() -> None:
with pytest.raises(RunRequestValidationError, match="plain.prompt"):
await validate_run_request(_request(), layer_providers=())
asyncio.run(scenario())
def test_create_run_rejects_unknown_layer_exit_signal_before_persisting() -> None:
async def scenario() -> None:
store = FakeStore()
scheduler = RunScheduler(store=store)
request = _request()
request.layer_exit_signals = LayerExitSignals(layers={"missing": ExitIntent.DELETE})
async with httpx.AsyncClient() as client:
scheduler = RunScheduler(store=store, plugin_daemon_http_client=client)
request = _request()
request.on_exit = LayerExitSignals(layers={"missing": ExitIntent.DELETE})
with pytest.raises(ValueError, match="missing"):
await scheduler.create_run(request)
with pytest.raises(ValueError, match="missing"):
await scheduler.create_run(request)
assert store.records == {}
asyncio.run(scenario())
def test_create_run_honors_explicit_empty_layer_providers_before_persisting() -> None:
async def scenario() -> None:
store = FakeStore()
async with httpx.AsyncClient() as client:
scheduler = RunScheduler(store=store, plugin_daemon_http_client=client, layer_providers=())
with pytest.raises(RunRequestValidationError, match="plain.prompt"):
await scheduler.create_run(_request())
assert store.records == {}
asyncio.run(scenario())
def test_create_run_rejects_closed_session_snapshot_before_persisting() -> None:
async def scenario() -> None:
store = FakeStore()
async with httpx.AsyncClient() as client:
scheduler = RunScheduler(store=store, plugin_daemon_http_client=client)
request = _request()
request.session_snapshot = CompositorSessionSnapshot(
layers=[
LayerSessionSnapshot(
name="prompt",
lifecycle_state=LifecycleState.CLOSED,
runtime_state={},
)
]
)
with pytest.raises(ValueError, match="CLOSED snapshots cannot be entered"):
_ = await scheduler.create_run(request)
assert store.records == {}
@@ -153,11 +212,27 @@ def test_create_run_rejects_unknown_layer_exit_signal_before_persisting() -> Non
def test_create_run_rejects_after_shutdown_starts() -> None:
async def scenario() -> None:
scheduler = RunScheduler(store=FakeStore())
await scheduler.shutdown()
async with httpx.AsyncClient() as client:
scheduler = RunScheduler(store=FakeStore(), plugin_daemon_http_client=client)
await scheduler.shutdown()
with pytest.raises(SchedulerStoppingError):
await scheduler.create_run(_request())
with pytest.raises(SchedulerStoppingError):
await scheduler.create_run(_request())
asyncio.run(scenario())
def test_create_run_rejects_invalid_request_after_shutdown_without_persisting() -> None:
async def scenario() -> None:
store = FakeStore()
async with httpx.AsyncClient() as client:
scheduler = RunScheduler(store=store, plugin_daemon_http_client=client)
await scheduler.shutdown()
with pytest.raises(SchedulerStoppingError):
_ = await scheduler.create_run(_request(["", " "]))
assert store.records == {}
asyncio.run(scenario())
@@ -168,30 +243,34 @@ def test_shutdown_waits_for_in_flight_create_to_register_before_cancelling() ->
release_create = asyncio.Event()
runner_started = asyncio.Event()
store = SlowCreateStore(create_started=create_started, release_create=release_create)
scheduler = RunScheduler(
store=store,
shutdown_grace_seconds=0,
runner_factory=lambda _record, _request: ControlledRunner(started=runner_started, release=asyncio.Event()),
)
async with httpx.AsyncClient() as client:
scheduler = RunScheduler(
store=store,
plugin_daemon_http_client=client,
shutdown_grace_seconds=0,
runner_factory=lambda _record, _request: ControlledRunner(
started=runner_started, release=asyncio.Event()
),
)
create_task = asyncio.create_task(scheduler.create_run(_request()))
await asyncio.wait_for(create_started.wait(), timeout=1)
shutdown_task = asyncio.create_task(scheduler.shutdown())
await asyncio.sleep(0)
create_task = asyncio.create_task(scheduler.create_run(_request()))
await asyncio.wait_for(create_started.wait(), timeout=1)
shutdown_task = asyncio.create_task(scheduler.shutdown())
await asyncio.sleep(0)
assert shutdown_task.done() is False
assert scheduler.stopping is False
assert shutdown_task.done() is False
assert scheduler.stopping is False
_ = release_create.set()
record = await asyncio.wait_for(create_task, timeout=1)
await asyncio.wait_for(shutdown_task, timeout=1)
_ = release_create.set()
record = await asyncio.wait_for(create_task, timeout=1)
await asyncio.wait_for(shutdown_task, timeout=1)
assert scheduler.stopping is True
assert scheduler.active_tasks == {}
assert store.statuses[record.run_id] == "failed"
assert [event.type for event in store.events[record.run_id]] == ["run_failed"]
assert scheduler.stopping is True
assert scheduler.active_tasks == {}
assert store.statuses[record.run_id] == "failed"
assert [event.type for event in store.events[record.run_id]] == ["run_failed"]
with pytest.raises(SchedulerStoppingError):
await scheduler.create_run(_request())
with pytest.raises(SchedulerStoppingError):
await scheduler.create_run(_request())
asyncio.run(scenario())


@@ -1,16 +1,22 @@
import asyncio
import httpx
import pytest
from pydantic_ai.models.test import TestModel
from agenton.compositor import CompositorConfig, LayerNodeConfig
from agenton.layers import ExitIntent, LayerControl, LifecycleState
from agenton.compositor import CompositorSessionSnapshot, LayerSessionSnapshot
from agenton.layers import ExitIntent, LifecycleState
from agenton_collections.layers.plain import PromptLayerConfig
from dify_agent.layers.dify_plugin.configs import DifyPluginLLMLayerConfig, DifyPluginLayerConfig
from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer
from dify_agent.layers.dify_plugin.plugin_layer import DifyPluginRuntimeHandles
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID
from dify_agent.protocol.schemas import CreateRunRequest, LayerExitSignals, RunSucceededEvent
from dify_agent.protocol.schemas import (
CreateRunRequest,
LayerExitSignals,
RunComposition,
RunLayerSpec,
RunSucceededEvent,
)
from dify_agent.runtime.event_sink import InMemoryRunEventSink
from dify_agent.runtime.runner import AgentRunRunner, AgentRunValidationError
@@ -20,22 +26,22 @@ def _request(
*,
llm_layer_name: str = DIFY_AGENT_MODEL_LAYER_ID,
plugin_layer_name: str = "plugin",
layer_exit_signals: LayerExitSignals | None = None,
on_exit: LayerExitSignals | None = None,
) -> CreateRunRequest:
return CreateRunRequest(
compositor=CompositorConfig(
composition=RunComposition(
layers=[
LayerNodeConfig(
RunLayerSpec(
name="prompt",
type="plain.prompt",
config=PromptLayerConfig(prefix="system", user=user),
),
LayerNodeConfig(
RunLayerSpec(
name=plugin_layer_name,
type="dify.plugin",
config=DifyPluginLayerConfig(tenant_id="tenant-1", plugin_id="langgenius/openai"),
),
LayerNodeConfig(
RunLayerSpec(
name=llm_layer_name,
type="dify.plugin.llm",
deps={"plugin": plugin_layer_name},
@@ -47,24 +53,35 @@ def _request(
),
]
),
layer_exit_signals=layer_exit_signals or LayerExitSignals(),
on_exit=on_exit or LayerExitSignals(),
)
def test_runner_emits_terminal_success_and_snapshot(monkeypatch: pytest.MonkeyPatch) -> None:
def fake_get_model(self: DifyPluginLLMLayer, control: LayerControl):
seen_clients: list[httpx.AsyncClient] = []
def fake_get_model(self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient):
assert self.config.model == "demo-model"
plugin_control = control.control_for(self.deps.plugin)
plugin_handles = plugin_control.runtime_handles
assert isinstance(plugin_handles, DifyPluginRuntimeHandles)
assert plugin_handles.http_client is not None
assert self.deps.plugin.config.plugin_id == "langgenius/openai"
seen_clients.append(http_client)
return TestModel(custom_output_text="done") # pyright: ignore[reportReturnType]
monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model)
request = _request(plugin_layer_name="renamed-plugin")
sink = InMemoryRunEventSink()
asyncio.run(AgentRunRunner(sink=sink, request=request, run_id="run-1").run())
async def scenario() -> None:
async with httpx.AsyncClient() as client:
await AgentRunRunner(
sink=sink,
request=request,
run_id="run-1",
plugin_daemon_http_client=client,
).run()
assert seen_clients == [client]
assert client.is_closed is False
asyncio.run(scenario())
event_types = [event.type for event in sink.events["run-1"]]
assert event_types[0] == "run_started"
@@ -80,7 +97,7 @@ def test_runner_emits_terminal_success_and_snapshot(monkeypatch: pytest.MonkeyPa
"renamed-plugin",
DIFY_AGENT_MODEL_LAYER_ID,
]
assert [layer.state for layer in terminal.data.session_snapshot.layers] == [
assert [layer.lifecycle_state for layer in terminal.data.session_snapshot.layers] == [
LifecycleState.SUSPENDED,
LifecycleState.SUSPENDED,
LifecycleState.SUSPENDED,
@@ -88,70 +105,96 @@ def test_runner_emits_terminal_success_and_snapshot(monkeypatch: pytest.MonkeyPa
assert sink.statuses["run-1"] == "succeeded"
def test_runner_applies_layer_exit_signal_overrides_to_success_snapshot(monkeypatch: pytest.MonkeyPatch) -> None:
def fake_get_model(_self: DifyPluginLLMLayer, _control: LayerControl):
def test_runner_applies_on_exit_overrides_to_success_snapshot(monkeypatch: pytest.MonkeyPatch) -> None:
def fake_get_model(_self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient):
assert http_client.is_closed is False
return TestModel(custom_output_text="done") # pyright: ignore[reportReturnType]
monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model)
request = _request(
layer_exit_signals=LayerExitSignals(
on_exit=LayerExitSignals(
default=ExitIntent.SUSPEND,
layers={"prompt": ExitIntent.DELETE, DIFY_AGENT_MODEL_LAYER_ID: ExitIntent.DELETE},
)
)
sink = InMemoryRunEventSink()
asyncio.run(AgentRunRunner(sink=sink, request=request, run_id="run-exit").run())
async def scenario() -> None:
async with httpx.AsyncClient() as client:
await AgentRunRunner(
sink=sink,
request=request,
run_id="run-exit",
plugin_daemon_http_client=client,
).run()
asyncio.run(scenario())
terminal = sink.events["run-exit"][-1]
assert isinstance(terminal, RunSucceededEvent)
assert {layer.name: layer.state for layer in terminal.data.session_snapshot.layers} == {
assert {layer.name: layer.lifecycle_state for layer in terminal.data.session_snapshot.layers} == {
"prompt": LifecycleState.CLOSED,
"plugin": LifecycleState.SUSPENDED,
DIFY_AGENT_MODEL_LAYER_ID: LifecycleState.CLOSED,
}
-def test_runner_rejects_unknown_layer_exit_signal_id() -> None:
-request = _request(layer_exit_signals=LayerExitSignals(layers={"missing": ExitIntent.DELETE}))
+def test_runner_rejects_unknown_on_exit_layer_id() -> None:
+request = _request(on_exit=LayerExitSignals(layers={"missing": ExitIntent.DELETE}))
sink = InMemoryRunEventSink()
-with pytest.raises(AgentRunValidationError, match="missing"):
-asyncio.run(AgentRunRunner(sink=sink, request=request, run_id="run-unknown-signal").run())
+async def scenario() -> None:
+async with httpx.AsyncClient() as client:
+with pytest.raises(AgentRunValidationError, match="missing"):
+await AgentRunRunner(
+sink=sink,
+request=request,
+run_id="run-unknown-signal",
+plugin_daemon_http_client=client,
+).run()
+asyncio.run(scenario())
assert [event.type for event in sink.events["run-unknown-signal"]] == ["run_started", "run_failed"]
assert sink.statuses["run-unknown-signal"] == "failed"
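A minimal sketch of the unknown-id validation this test expects. The function name and wiring are assumptions; the error text mirrors the message asserted later in the route tests.

```python
def validate_on_exit(signal_layers: dict[str, str], known_layers: set[str]) -> None:
    # Every on_exit.layers key must name a layer present in the composition.
    unknown = sorted(set(signal_layers) - known_layers)
    if unknown:
        raise ValueError(f"on_exit.layers references unknown layer ids: {', '.join(unknown)}.")


validate_on_exit({"prompt": "delete"}, {"prompt", "plugin"})  # ok: no exception
```

Running the check before any layer is entered is what lets the runner fail with `run_started` followed immediately by `run_failed`.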
-def test_runner_applies_layer_exit_signals_before_model_resolution_failure(monkeypatch: pytest.MonkeyPatch) -> None:
-def fake_get_model(self: DifyPluginLLMLayer, control: LayerControl):
-plugin_control = control.control_for(self.deps.plugin)
-assert control.exit_intent is ExitIntent.DELETE
-assert plugin_control.exit_intent is ExitIntent.SUSPEND
-raise RuntimeError("model unavailable")
-monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model)
-request = _request(
-layer_exit_signals=LayerExitSignals(
-default=ExitIntent.DELETE,
-layers={"plugin": ExitIntent.SUSPEND},
-)
-)
+def test_runner_honors_explicit_empty_layer_providers() -> None:
+request = _request()
sink = InMemoryRunEventSink()
-with pytest.raises(AgentRunValidationError, match="model unavailable"):
-asyncio.run(AgentRunRunner(sink=sink, request=request, run_id="run-model-failure").run())
+async def scenario() -> None:
+async with httpx.AsyncClient() as client:
+with pytest.raises(AgentRunValidationError, match="plain.prompt"):
+await AgentRunRunner(
+sink=sink,
+request=request,
+run_id="run-empty-providers",
+plugin_daemon_http_client=client,
+layer_providers=(),
+).run()
-assert [event.type for event in sink.events["run-model-failure"]] == ["run_started", "run_failed"]
-assert sink.statuses["run-model-failure"] == "failed"
+asyncio.run(scenario())
+assert [event.type for event in sink.events["run-empty-providers"]] == ["run_started", "run_failed"]
+assert sink.statuses["run-empty-providers"] == "failed"
def test_runner_fails_empty_user_prompts() -> None:
request = _request("")
sink = InMemoryRunEventSink()
-with pytest.raises(AgentRunValidationError):
-asyncio.run(AgentRunRunner(sink=sink, request=request, run_id="run-2").run())
+async def scenario() -> None:
+async with httpx.AsyncClient() as client:
+with pytest.raises(AgentRunValidationError):
+await AgentRunRunner(
+sink=sink,
+request=request,
+run_id="run-2",
+plugin_daemon_http_client=client,
+).run()
+asyncio.run(scenario())
assert [event.type for event in sink.events["run-2"]] == ["run_started", "run_failed"]
assert sink.statuses["run-2"] == "failed"
@@ -161,8 +204,17 @@ def test_runner_fails_blank_string_user_prompt_list() -> None:
request = _request(["", " "])
sink = InMemoryRunEventSink()
-with pytest.raises(AgentRunValidationError):
-asyncio.run(AgentRunRunner(sink=sink, request=request, run_id="run-3").run())
+async def scenario() -> None:
+async with httpx.AsyncClient() as client:
+with pytest.raises(AgentRunValidationError):
+await AgentRunRunner(
+sink=sink,
+request=request,
+run_id="run-3",
+plugin_daemon_http_client=client,
+).run()
+asyncio.run(scenario())
assert [event.type for event in sink.events["run-3"]] == ["run_started", "run_failed"]
assert sink.statuses["run-3"] == "failed"
@@ -172,8 +224,56 @@ def test_runner_requires_llm_layer_id() -> None:
request = _request(llm_layer_name="not-llm")
sink = InMemoryRunEventSink()
-with pytest.raises(AgentRunValidationError, match="llm"):
-asyncio.run(AgentRunRunner(sink=sink, request=request, run_id="run-4").run())
+async def scenario() -> None:
+async with httpx.AsyncClient() as client:
+with pytest.raises(AgentRunValidationError, match="llm"):
+await AgentRunRunner(
+sink=sink,
+request=request,
+run_id="run-4",
+plugin_daemon_http_client=client,
+).run()
+asyncio.run(scenario())
assert [event.type for event in sink.events["run-4"]] == ["run_started", "run_failed"]
assert sink.statuses["run-4"] == "failed"
def test_runner_rejects_closed_session_snapshot_as_validation_error() -> None:
request = _request()
request.session_snapshot = CompositorSessionSnapshot(
layers=[
LayerSessionSnapshot(
name="prompt",
lifecycle_state=LifecycleState.CLOSED,
runtime_state={},
),
LayerSessionSnapshot(
name="plugin",
lifecycle_state=LifecycleState.NEW,
runtime_state={},
),
LayerSessionSnapshot(
name=DIFY_AGENT_MODEL_LAYER_ID,
lifecycle_state=LifecycleState.NEW,
runtime_state={},
),
]
)
sink = InMemoryRunEventSink()
async def scenario() -> None:
async with httpx.AsyncClient() as client:
with pytest.raises(AgentRunValidationError, match="CLOSED snapshots cannot be entered"):
await AgentRunRunner(
sink=sink,
request=request,
run_id="run-closed-snapshot",
plugin_daemon_http_client=client,
).run()
asyncio.run(scenario())
assert [event.type for event in sink.events["run-closed-snapshot"]] == ["run_started", "run_failed"]
assert sink.statuses["run-closed-snapshot"] == "failed"
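The CLOSED-snapshot gate can be sketched as a simple lifecycle check performed per layer before the session is entered. This is a minimal model, not the real runner's types; the error text matches the message matched above.

```python
from enum import Enum


class LifecycleState(str, Enum):
    NEW = "new"
    SUSPENDED = "suspended"
    CLOSED = "closed"


def check_snapshot_enterable(name: str, state: LifecycleState) -> None:
    # NEW and SUSPENDED snapshots can be (re-)entered; CLOSED is terminal.
    if state is LifecycleState.CLOSED:
        raise ValueError(f"Layer {name!r} is closed; CLOSED snapshots cannot be entered.")


check_snapshot_enterable("plugin", LifecycleState.NEW)  # ok: no exception
```

Because one CLOSED layer poisons the whole snapshot, the run fails even though the other two layers are in NEW state.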

View File

@@ -1,11 +1,15 @@
from __future__ import annotations
from typing import ClassVar
import pytest
from fastapi.testclient import TestClient
import dify_agent.server.app as app_module
from agenton.compositor import LayerRegistry
from dify_agent.runtime.compositor_factory import DifyAgentLayerProvider
from dify_agent.layers.dify_plugin.configs import DifyPluginLayerConfig
from dify_agent.layers.dify_plugin.plugin_layer import DifyPluginLayer
-from dify_agent.server.app import create_app
+from dify_agent.server.app import create_app, create_plugin_daemon_http_client
from dify_agent.server.settings import ServerSettings
from dify_agent.storage.redis_run_store import RedisRunStore
@@ -25,19 +29,22 @@ class FakeRunScheduler:
store: object
shutdown_grace_seconds: float
layer_registry: LayerRegistry
layer_providers: tuple[DifyAgentLayerProvider, ...]
plugin_daemon_http_client: FakePluginDaemonHttpClient
shutdown_called: bool
def __init__(
self,
*,
store: object,
plugin_daemon_http_client: FakePluginDaemonHttpClient,
shutdown_grace_seconds: float,
layer_registry: LayerRegistry,
layer_providers: tuple[DifyAgentLayerProvider, ...],
) -> None:
self.store = store
self.shutdown_grace_seconds = shutdown_grace_seconds
self.layer_registry = layer_registry
self.layer_providers = layer_providers
self.plugin_daemon_http_client = plugin_daemon_http_client
self.shutdown_called = False
self.created.append(self)
@@ -45,12 +52,80 @@ class FakeRunScheduler:
self.shutdown_called = True
class FakePluginDaemonHttpClient:
timeout: object | None
limits: object | None
trust_env: bool | None
is_closed: bool
def __init__(
self,
*,
timeout: object | None = None,
limits: object | None = None,
trust_env: bool | None = None,
) -> None:
self.timeout = timeout
self.limits = limits
self.trust_env = trust_env
self.is_closed = False
async def aclose(self) -> None:
self.is_closed = True
class FakeTimeout:
connect: float
read: float
write: float
pool: float
def __init__(self, *, connect: float, read: float, write: float, pool: float) -> None:
self.connect = connect
self.read = read
self.write = write
self.pool = pool
class FakeLimits:
max_connections: int
max_keepalive_connections: int
keepalive_expiry: float
def __init__(self, *, max_connections: int, max_keepalive_connections: int, keepalive_expiry: float) -> None:
self.max_connections = max_connections
self.max_keepalive_connections = max_keepalive_connections
self.keepalive_expiry = keepalive_expiry
class FakeRedisModule:
fake_redis: ClassVar[FakeRedis | None] = None
@staticmethod
def from_url(_url: str) -> FakeRedis:
assert FakeRedisModule.fake_redis is not None
return FakeRedisModule.fake_redis
class FakeHttpxModule:
Timeout: ClassVar[type[FakeTimeout]] = FakeTimeout
Limits: ClassVar[type[FakeLimits]] = FakeLimits
AsyncClient: ClassVar[type[FakePluginDaemonHttpClient]] = FakePluginDaemonHttpClient
def test_create_app_creates_scheduler_and_closes_after_shutdown(monkeypatch: pytest.MonkeyPatch) -> None:
fake_redis = FakeRedis()
fake_http_client = FakePluginDaemonHttpClient()
FakeRunScheduler.created.clear()
-monkeypatch.setattr(app_module.Redis, "from_url", lambda _url: fake_redis)
+FakeRedisModule.fake_redis = fake_redis
+monkeypatch.setattr(app_module, "Redis", FakeRedisModule)
monkeypatch.setattr(app_module, "RunScheduler", FakeRunScheduler)
def fake_create_plugin_daemon_http_client(_settings: ServerSettings) -> FakePluginDaemonHttpClient:
return fake_http_client
monkeypatch.setattr(app_module, "create_plugin_daemon_http_client", fake_create_plugin_daemon_http_client)
settings = ServerSettings(
redis_url="redis://example.invalid/0",
redis_prefix="test",
@@ -58,24 +133,53 @@ def test_create_app_creates_scheduler_and_closes_after_shutdown(monkeypatch: pyt
run_retention_seconds=7,
plugin_daemon_url="http://plugin-daemon",
plugin_daemon_api_key="daemon-secret",
-plugin_daemon_timeout=12,
+plugin_daemon_connect_timeout=1,
+plugin_daemon_read_timeout=2,
+plugin_daemon_write_timeout=3,
+plugin_daemon_pool_timeout=4,
+plugin_daemon_max_connections=5,
+plugin_daemon_max_keepalive_connections=3,
+plugin_daemon_keepalive_expiry=6,
)
with TestClient(create_app(settings)):
assert len(FakeRunScheduler.created) == 1
scheduler = FakeRunScheduler.created[0]
assert scheduler.shutdown_grace_seconds == 5
assert isinstance(scheduler.layer_registry, LayerRegistry)
-descriptor = scheduler.layer_registry.resolve("dify.plugin")
-assert descriptor.factory is not None
-plugin_layer = descriptor.factory(DifyPluginLayerConfig(tenant_id="tenant-1", plugin_id="plugin-1"))
+layer_providers = scheduler.layer_providers
+assert isinstance(layer_providers, tuple)
+plugin_provider = next(provider for provider in layer_providers if provider.type_id == "dify.plugin")
+plugin_layer = plugin_provider.create_layer(DifyPluginLayerConfig(tenant_id="tenant-1", plugin_id="plugin-1"))
assert isinstance(plugin_layer, DifyPluginLayer)
assert plugin_layer.daemon_url == "http://plugin-daemon"
assert plugin_layer.daemon_api_key == "daemon-secret"
-assert plugin_layer.timeout == 12
+http_client = scheduler.plugin_daemon_http_client
+assert http_client is fake_http_client
+assert http_client.is_closed is False
store = scheduler.store
assert isinstance(store, RedisRunStore)
assert store.run_retention_seconds == 7
assert FakeRunScheduler.created[0].shutdown_called is True
assert FakeRunScheduler.created[0].plugin_daemon_http_client.is_closed is True
assert fake_redis.closed is True
def test_create_plugin_daemon_http_client_uses_configured_httpx_construction_args(
monkeypatch: pytest.MonkeyPatch,
) -> None:
monkeypatch.setattr(app_module, "httpx", FakeHttpxModule)
client = create_plugin_daemon_http_client(ServerSettings())
assert isinstance(client, FakePluginDaemonHttpClient)
assert isinstance(client.timeout, FakeTimeout)
assert client.timeout.connect == 10
assert client.timeout.read == 600
assert client.timeout.write == 30
assert client.timeout.pool == 10
assert isinstance(client.limits, FakeLimits)
assert client.limits.max_connections == 100
assert client.limits.max_keepalive_connections == 20
assert client.limits.keepalive_expiry == 30
assert client.trust_env is False

View File

@@ -1,14 +1,15 @@
from fastapi.testclient import TestClient
from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID
-from dify_agent.runtime.run_scheduler import SchedulerStoppingError
+from dify_agent.runtime.run_scheduler import RunRequestValidationError, SchedulerStoppingError
from dify_agent.server.routes.runs import create_runs_router
from dify_agent.server.schemas import RunRecord
class FakeScheduler:
async def create_run(self, request: object) -> object:
-raise AssertionError("blank prompt requests must be rejected before scheduling")
+del request
+raise RunRequestValidationError("run.user_prompts must not be empty")
class FakeStore:
@@ -27,7 +28,7 @@ def test_create_run_rejects_effectively_blank_user_prompt_list() -> None:
response = client.post(
"/runs",
json={
-"compositor": {
+"composition": {
"schema_version": 1,
"layers": [{"name": "prompt", "type": "plain.prompt", "config": {"user": ["", " "]}}],
}
@@ -35,7 +36,7 @@ def test_create_run_rejects_effectively_blank_user_prompt_list() -> None:
)
assert response.status_code == 422
-assert response.json()["detail"] == "compositor.user_prompts must not be empty"
+assert response.json()["detail"] == "run.user_prompts must not be empty"
def test_create_run_returns_running_from_scheduler() -> None:
@@ -55,7 +56,7 @@ def test_create_run_returns_running_from_scheduler() -> None:
response = client.post(
"/runs",
json={
-"compositor": {
+"composition": {
"schema_version": 1,
"layers": [{"name": "prompt", "type": "plain.prompt", "config": {"user": "hello"}}],
}
@@ -83,7 +84,7 @@ def test_create_run_accepts_valid_full_plugin_graph() -> None:
response = client.post(
"/runs",
json={
-"compositor": {
+"composition": {
"schema_version": 1,
"layers": [
{"name": "prompt", "type": "plain.prompt", "config": {"user": "hello"}},
@@ -115,20 +116,25 @@ def test_create_run_accepts_valid_full_plugin_graph() -> None:
def test_create_run_rejects_unknown_layer_exit_signal_before_scheduling() -> None:
from fastapi import FastAPI
class UnknownSignalScheduler:
async def create_run(self, request: object) -> RunRecord:
del request
raise RunRequestValidationError("on_exit.layers references unknown layer ids: missing.")
app = FastAPI()
app.include_router(
-create_runs_router(lambda: FakeStore(), lambda: FakeScheduler()) # pyright: ignore[reportArgumentType]
+create_runs_router(lambda: FakeStore(), lambda: UnknownSignalScheduler()) # pyright: ignore[reportArgumentType]
)
client = TestClient(app)
response = client.post(
"/runs",
json={
-"compositor": {
+"composition": {
"schema_version": 1,
"layers": [{"name": "prompt", "type": "plain.prompt", "config": {"user": "hello"}}],
},
-"layer_exit_signals": {"layers": {"missing": "delete"}},
+"on_exit": {"layers": {"missing": "delete"}},
},
)
@@ -136,6 +142,44 @@ def test_create_run_rejects_unknown_layer_exit_signal_before_scheduling() -> Non
assert "missing" in response.json()["detail"]
def test_create_run_rejects_closed_session_snapshot_with_422() -> None:
from fastapi import FastAPI
class ClosedSnapshotScheduler:
async def create_run(self, request: object) -> RunRecord:
del request
raise RunRequestValidationError("Layer 'prompt' is closed; CLOSED snapshots cannot be entered.")
app = FastAPI()
app.include_router(
create_runs_router(lambda: FakeStore(), lambda: ClosedSnapshotScheduler()) # pyright: ignore[reportArgumentType]
)
client = TestClient(app)
response = client.post(
"/runs",
json={
"composition": {
"schema_version": 1,
"layers": [{"name": "prompt", "type": "plain.prompt", "config": {"user": "hello"}}],
},
"session_snapshot": {
"schema_version": 1,
"layers": [
{
"name": "prompt",
"lifecycle_state": "closed",
"runtime_state": {},
}
],
},
},
)
assert response.status_code == 422
assert "CLOSED snapshots cannot be entered" in response.json()["detail"]
def test_create_run_returns_503_when_scheduler_is_stopping() -> None:
from fastapi import FastAPI
@@ -153,7 +197,7 @@ def test_create_run_returns_503_when_scheduler_is_stopping() -> None:
response = client.post(
"/runs",
json={
-"compositor": {
+"composition": {
"schema_version": 1,
"layers": [{"name": "prompt", "type": "plain.prompt", "config": {"user": "hello"}}],
}
@@ -181,7 +225,7 @@ def test_create_run_does_not_map_infrastructure_failure_to_422() -> None:
response = client.post(
"/runs",
json={
-"compositor": {
+"composition": {
"schema_version": 1,
"layers": [{"name": "prompt", "type": "plain.prompt", "config": {"user": "hello"}}],
}

View File

@@ -35,7 +35,9 @@ class FakeRedis:
entries.append((event_id, dict(fields)))
return event_id
-async def xrange(self, key: str, *, min: str = "-", count: int | None = None) -> list[tuple[str, dict[str, object]]]:
+async def xrange(
+self, key: str, *, min: str = "-", count: int | None = None
+) -> list[tuple[str, dict[str, object]]]:
self.commands.append(("xrange", key, min, count))
entries = [entry for entry in self.streams.get(key, []) if self._is_after_min(entry[0], min)]
if count is not None:
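The `_is_after_min` helper used in the filter above has to compare Redis stream entry ids, which are `"<milliseconds>-<sequence>"` strings. One plausible implementation (the name, and treating `min` as inclusive like the real XRANGE, are assumptions):

```python
def parse_stream_id(event_id: str) -> tuple[int, int]:
    # Redis stream ids are "<milliseconds>-<sequence>"; compare numerically,
    # not lexically, so "10-0" correctly sorts after "9-0".
    ms, _, seq = event_id.partition("-")
    return (int(ms), int(seq or 0))


def is_after_min(event_id: str, min_id: str) -> bool:
    if min_id == "-":
        return True  # "-" is XRANGE's smallest-possible-id sentinel
    return parse_stream_id(event_id) >= parse_stream_id(min_id)


print(is_after_min("10-0", "9-5"))  # True
```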
@@ -114,7 +116,7 @@ def test_get_events_round_trips_run_succeeded_output_and_session_snapshot() -> N
layers=[
LayerSessionSnapshot(
name="prompt",
-state=LifecycleState.SUSPENDED,
+lifecycle_state=LifecycleState.SUSPENDED,
runtime_state={"resource_id": "abc"},
)
]

View File

@@ -56,4 +56,4 @@ def test_agenton_session_snapshot_example_smoke() -> None:
assert result.returncode == 0, result.stderr
assert "Snapshot:" in result.stdout
-assert "Rehydrated handle: restored:demo-connection" in result.stdout
+assert "Rehydrated external handle: restored:demo-connection" in result.stdout
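The output asserted by this smoke test illustrates the snapshot pattern used throughout: layers persist only snapshot-safe identifiers (like the `resource_id` seen in the round-trip test above), then rebuild live handles on resume. A toy version, with the function and state shape assumed from these tests:

```python
def rehydrate_external_handle(runtime_state: dict[str, str]) -> str:
    # The snapshot stores only a serializable resource id; the live handle is
    # reconstructed on resume instead of being pickled across processes.
    return f"restored:{runtime_state['resource_id']}"


print(f"Rehydrated external handle: {rehydrate_external_handle({'resource_id': 'demo-connection'})}")
# Rehydrated external handle: restored:demo-connection
```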