From ae4a3f75f43d7cb0147146a8a50907151220997a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9B=90=E7=B2=92=20Yanli?= Date: Wed, 13 May 2026 03:51:37 +0800 Subject: [PATCH] refactor dify-agent agenton run model --- dify-agent/docs/agenton/api/index.md | 228 +++++++++--------- dify-agent/docs/agenton/guide/index.md | 217 +++++++++-------- dify-agent/docs/dify-agent/api/index.md | 65 ++--- dify-agent/docs/dify-agent/guide/index.md | 35 ++- .../agenton/agenton_examples/basics.py | 122 +++++----- .../agenton_examples/pydantic_ai_bridge.py | 82 ++++--- .../agenton_examples/session_snapshot.py | 52 ++-- .../run_server_consumer.py | 11 +- .../run_server_sync_client.py | 11 +- .../layers/dify_plugin/llm_layer.py | 22 +- .../layers/dify_plugin/plugin_layer.py | 91 ++----- .../src/dify_agent/protocol/__init__.py | 6 + dify-agent/src/dify_agent/protocol/schemas.py | 112 +++++++-- .../src/dify_agent/runtime/agent_factory.py | 26 +- .../dify_agent/runtime/agenton_validation.py | 28 +++ .../dify_agent/runtime/compositor_factory.py | 65 +++-- .../dify_agent/runtime/layer_exit_signals.py | 29 ++- .../src/dify_agent/runtime/run_scheduler.py | 84 +++++-- dify-agent/src/dify_agent/runtime/runner.py | 98 +++++--- .../runtime/user_prompt_validation.py | 14 +- dify-agent/src/dify_agent/server/app.py | 56 +++-- .../src/dify_agent/server/routes/runs.py | 21 +- dify-agent/src/dify_agent/server/settings.py | 16 +- .../local/dify_agent/client/test_client.py | 9 +- .../layers/dify_plugin/test_layers.py | 222 ++++++----------- .../protocol/test_protocol_schemas.py | 75 +++++- .../dify_agent/runtime/test_run_scheduler.py | 209 +++++++++++----- .../local/dify_agent/runtime/test_runner.py | 200 +++++++++++---- .../tests/local/dify_agent/server/test_app.py | 128 +++++++++- .../dify_agent/server/test_runs_routes.py | 66 ++++- .../storage/test_redis_run_store.py | 6 +- .../local/examples/test_agenton_examples.py | 2 +- 32 files changed, 1442 insertions(+), 966 deletions(-) create mode 100644 dify-agent/src/dify_agent/runtime/agenton_validation.py diff --git a/dify-agent/docs/agenton/api/index.md b/dify-agent/docs/agenton/api/index.md index c0a864665a..994f81e47f 100644 --- a/dify-agent/docs/agenton/api/index.md +++ b/dify-agent/docs/agenton/api/index.md @@ -1,48 +1,44 @@ # Agenton API reference -This page summarizes the public Agenton API. Import paths are shown for the -symbols commonly used by layer authors and compositor callers. +This page summarizes the public Agenton API. Import paths are shown for symbols +commonly used by layer authors and compositor callers. ## Layers: `agenton.layers` -### `Layer[DepsT, PromptT, UserPromptT, ToolT, ConfigT, RuntimeStateT, RuntimeHandlesT]` +### `Layer[DepsT, PromptT, UserPromptT, ToolT, ConfigT, RuntimeStateT]` -Framework-neutral base class for prompt/tool layers. +Framework-neutral base class for invocation-scoped prompt/tool layers. Class attributes: -- `type_id: str | None`: registry id for config-backed plugin layers. -- `config_type: type[LayerConfig]`: Pydantic schema for serialized layer config. +- `type_id: str | None`: provider id for config-backed graph nodes. +- `config_type: type[LayerConfig]`: Pydantic schema for per-run layer config. - `runtime_state_type: type[BaseModel]`: Pydantic schema for snapshot-safe - per-session state. -- `runtime_handles_type: type[BaseModel]`: Pydantic schema for live runtime - handles; use `arbitrary_types_allowed=True` for client/process objects. + per-layer state. - `deps_type: type[LayerDeps]`: inferred from the layer generic base or declared explicitly. +Invocation attributes assigned by `CompositorRun`: + +- `config: ConfigT` +- `deps: DepsT` +- `runtime_state: RuntimeStateT` + Construction and dependency APIs: -- `from_config(config: ConfigT) -> Self`: create a layer from schema-validated - config. The default implementation raises `TypeError`. +- `from_config(config: ConfigT) -> Self`: create a fresh layer from + schema-validated config. The default implementation supports only empty config. - `dependency_names() -> frozenset[str]`: dependency fields declared by `deps_type`. -- `bind_deps(deps: Mapping[str, Layer | None]) -> None`: bind graph dependencies. -- `new_control(state=LifecycleState.NEW, runtime_state=None) -> LayerControl`: create - a schema-validated per-session control. -- `require_control(control, active=False) -> LayerControl`: validate that a - capability method received this layer's own control with the expected runtime - schemas, optionally requiring `LifecycleState.ACTIVE`. +- `bind_deps(deps: Mapping[str, Layer | None]) -> None`: bind direct layer + instance dependencies for one invocation. Lifecycle hooks: -- `on_context_create(control)` -- `on_context_resume(control)` -- `on_context_suspend(control)` -- `on_context_delete(control)` -- `enter(control)` / `lifecycle_enter(control)`: async context manager entry - surface. The base lifecycle owns the per-entry resource stack; override - `enter()` only for unusual wrapping that cannot be expressed as registered - resources. +- `on_context_create() -> None` +- `on_context_resume() -> None` +- `on_context_suspend() -> None` +- `on_context_delete() -> None` Prompt/tool authoring surfaces: @@ -57,50 +53,23 @@ Aggregation adapters implemented by typed layer families: - `wrap_user_prompt(prompt: UserPromptT) -> object` - `wrap_tool(tool: ToolT) -> object` -### `LayerControl[RuntimeStateT, RuntimeHandlesT]` - -Per-layer, per-session lifecycle control. - -Fields: - -- `state: LifecycleState` -- `exit_intent: ExitIntent` -- `runtime_state: RuntimeStateT` -- `runtime_handles: RuntimeHandlesT` - -Methods: - -- `suspend_on_exit() -> None` -- `delete_on_exit() -> None` -- `enter_async_resource(cm) -> T`: enter an async context manager on the current - entry resource stack and return its resource. -- `add_async_cleanup(callback) -> None`: register an async cleanup callback on the - current entry resource stack. -- `control_for(dep_layer) -> LayerControl`: resolve the unique dependency control - whose resolved target is `dep_layer` in the same session. -- `control_for(dep_name, dep_layer) -> LayerControl`: resolve a named dependency - control when multiple dependency fields could point at the same layer instance. - -`runtime_state` is serialized in session snapshots. `runtime_handles` is never -serialized and should be rehydrated from runtime state in resume hooks. Private -owner links used by `control_for` and the per-entry resource stack are -runtime-only and are not snapshotted. Resource-stack APIs are available only -while a layer entry is being created/resumed, active, or exiting. - ### Schema defaults and lifecycle enums -- `EmptyLayerConfig` -- `LayerConfig`: base DTO for serializable layer config schemas -- `LayerConfigValue`: JSON value or concrete `LayerConfig` DTO -- `EmptyRuntimeState` -- `EmptyRuntimeHandles` -- `LifecycleState`: `NEW`, `ACTIVE`, `SUSPENDED`, `CLOSED` -- `ExitIntent`: `DELETE`, `SUSPEND` +- `LayerConfig`: base DTO for serializable layer config schemas. +- `LayerConfigValue`: JSON value or concrete `LayerConfig` DTO. +- `EmptyLayerConfig`: default config schema for layers without config. +- `EmptyRuntimeState`: default serializable runtime-state schema. +- `LayerDeps`: typed dependency container base. +- `NoLayerDeps`: dependency container for layers with no dependencies. +- `LifecycleState`: `NEW`, `ACTIVE`, `SUSPENDED`, `CLOSED`. +- `ExitIntent`: `DELETE`, `SUSPEND`. + +`ACTIVE` is internal to an entered run and is rejected in external snapshots. ### Typed layer families: `agenton.layers.types` -- `PlainLayer[DepsT, ConfigT, RuntimeStateT, RuntimeHandlesT]` -- `PydanticAILayer[DepsT, AgentDepsT, ConfigT, RuntimeStateT, RuntimeHandlesT]` +- `PlainLayer[DepsT, ConfigT, RuntimeStateT]` +- `PydanticAILayer[DepsT, AgentDepsT, ConfigT, RuntimeStateT]` Tagged aggregate item types: @@ -112,47 +81,71 @@ Tagged aggregate item types: ### Config models -- `LayerNodeConfig`: `name`, `type`, `config`, `deps`, `metadata` -- `CompositorConfig`: `schema_version`, `layers` +- `LayerNodeConfig`: `name`, `type`, `deps`, `metadata`. +- `CompositorConfig`: `schema_version`, `layers`. +- `LayerConfigInput`: accepted per-run config input for one node. -Config nodes are pure serializable graph input. `LayerNodeConfig.config` accepts -plain JSON values or concrete `LayerConfig` DTO instances and serializes DTOs as -JSON objects. Use live instances for Python objects and callables. +Config nodes are pure serializable graph topology. Per-run layer config is passed +separately to `Compositor.enter(configs=...)` keyed by node name. -### Registry +### Providers and graph nodes -`LayerRegistry` manually registers config-backed layer classes. +`LayerProvider[LayerT]` is a reusable validated factory for one concrete layer +class. -- `register_layer(layer_type, type_id=None, factory=None) -> None` -- `resolve(type_id) -> LayerDescriptor` -- `descriptors() -> Mapping[str, LayerDescriptor]` +- `LayerProvider.from_layer_type(layer_type) -> LayerProvider`: construct through + `layer_type.from_config`. +- `LayerProvider.from_factory(layer_type=..., create=...) -> LayerProvider`: + construct through a custom typed-config factory. +- `type_id -> str | None`: provider id declared by the layer type. +- `validate_config(config=None) -> LayerConfig`: validate config without invoking + the factory. +- `create_layer(config=None) -> LayerT`: validate config and create a fresh layer. +- `create_layer_from_config(config) -> LayerT`: create from already validated + config and enforce fresh-instance semantics. -`LayerDescriptor` exposes `type_id`, `layer_type`, `config_type`, -`runtime_state_type`, `runtime_handles_type`, and optional `factory`. +`LayerNode(name, implementation, deps=None, metadata=None)` creates a stateless +graph node from a `Layer` subclass or `LayerProvider`. `deps` maps dependency +field names on the node's layer class to other node names. -### Builder - -`CompositorBuilder(registry)` mixes config-backed nodes and live instances. - -- `add_config(config) -> Self` -- `add_config_layer(name, type, config=None, deps=None) -> Self` -- `add_instance(name, layer, deps=None) -> Self` -- `build(prompt_transformer=None, user_prompt_transformer=None, tool_transformer=None) -> Compositor` - -### Compositor +### `Compositor` `Compositor[PromptT, ToolT, LayerPromptT, LayerToolT, UserPromptT, LayerUserPromptT]` -owns the ordered layer graph. - -Dependency binding uses explicit `deps={dep_name: target_layer_name}` mappings -first, then implicit same-name layer binding. Optional dependencies without a -target are recorded as absent so `LayerControl.control_for(...)` raises `KeyError` -rather than returning a control. +owns the ordered graph plan and provider construction plans. Construction: -- `Compositor(layers=..., deps_name_mapping=..., ...)` -- `Compositor.from_config(conf, registry=..., ...)` +- `Compositor(nodes, prompt_transformer=None, user_prompt_transformer=None, tool_transformer=None)`. +- `Compositor.from_config(conf, providers=..., node_providers=None, prompt_transformer=None, user_prompt_transformer=None, tool_transformer=None)`. + +Public properties and entry API: + +- `nodes -> tuple[LayerNode, ...]`: stateless graph plan in order. +- `enter(configs=None, session_snapshot=None) -> AsyncIterator[CompositorRun]`: + validate per-run configs and optional snapshot, create fresh layers, bind direct + dependencies, enter hooks in graph order, and exit hooks in reverse order. + +`providers` resolve graph node `type` ids. `node_providers` are keyed by node name +and override type-id providers for node-specific construction. + +### `CompositorRun` + +`CompositorRun` is the single-invocation runtime object yielded by +`Compositor.enter(...)`. + +Fields: + +- `slots: OrderedDict[str, LayerRunSlot]` +- `session_snapshot: CompositorSessionSnapshot | None` + +Layer access and exit intent: + +- `get_layer(name) -> Layer` +- `get_layer(name, layer_type) -> LayerT` +- `suspend_on_exit() -> None` +- `delete_on_exit() -> None` +- `suspend_layer_on_exit(name) -> None` +- `delete_layer_on_exit(name) -> None` Aggregation properties: @@ -162,27 +155,24 @@ Aggregation properties: `user_prompt_transformer`. - `tools -> list[ToolT]`: tools in layer order, then optional `tool_transformer`. -Session APIs: +Snapshot API: -- `new_session() -> CompositorSession` -- `enter(session=None) -> AsyncIterator[CompositorSession]` -- `snapshot_session(session) -> CompositorSessionSnapshot` -- `session_from_snapshot(snapshot) -> CompositorSession` +- `snapshot_session() -> CompositorSessionSnapshot`: snapshot non-active layer + lifecycle state and runtime state. -### Sessions and snapshots +`session_snapshot` is populated after context exit. Core run slots default to +delete-on-exit; request suspend before exit when the next snapshot must be +resumable. -`CompositorSession` owns ordered layer controls. +### Run slots and snapshots -- `suspend_on_exit() -> None` -- `delete_on_exit() -> None` -- `layer(name) -> LayerControl` +- `LayerRunSlot`: `layer`, `lifecycle_state`, `exit_intent`. +- `LayerSessionSnapshot`: `name`, `lifecycle_state`, `runtime_state`. +- `CompositorSessionSnapshot`: `schema_version`, `layers`. -Snapshot models: - -- `LayerSessionSnapshot`: `name`, `state`, `runtime_state` -- `CompositorSessionSnapshot`: `schema_version`, `layers` - -Snapshots reject active sessions and exclude `runtime_handles` and `exit_intent`. +Snapshots include ordered layer lifecycle state and JSON-safe runtime state only. +They exclude live resources, dependencies, prompts, tools, per-run config, and +exit intent. ## Collection layers and transformers @@ -190,9 +180,11 @@ Snapshots reject active sessions and exclude `runtime_handles` and `exit_intent` - `PromptLayer`: config-backed layer with `PromptLayerConfig(prefix, user, suffix)` and `type_id = "plain.prompt"`. -- `ObjectLayer`: instance-only layer for Python objects. -- `ToolsLayer`: instance-only layer for callables. -- `DynamicToolsLayer`: instance-only layer for object-bound callables. +- `ObjectLayer`: factory-backed layer for Python objects. +- `ToolsLayer`: factory-backed layer for plain callables. +- `DynamicToolsLayer`: factory-backed layer for object-bound callables. +- `with_object`: decorator for dynamic tools whose first argument is supplied by + an `ObjectLayer` dependency. ### Pydantic AI bridge @@ -200,8 +192,10 @@ Snapshots reject active sessions and exclude `runtime_handles` and `exit_intent` pydantic-ai system prompts, user prompts, and tools while depending on an `ObjectLayer` for `RunContext.deps`. -`agenton_collections.transformers.PYDANTIC_AI_TRANSFORMERS` provides: +`agenton_collections.transformers.pydantic_ai.PYDANTIC_AI_TRANSFORMERS` provides: -- `prompt_transformer`: maps `compositor.prompts` to pydantic-ai system prompt functions. -- `user_prompt_transformer`: maps `compositor.user_prompts` to pydantic-ai `UserContent`. -- `tool_transformer`: maps `compositor.tools` to pydantic-ai tools. +- `prompt_transformer`: maps tagged Agenton prompt items to pydantic-ai system + prompt functions. +- `user_prompt_transformer`: maps tagged Agenton user prompt items to pydantic-ai + `UserContent` values. +- `tool_transformer`: maps tagged Agenton tool items to pydantic-ai tools. diff --git a/dify-agent/docs/agenton/guide/index.md b/dify-agent/docs/agenton/guide/index.md index 39b7febfb8..e9d322d7c8 100644 --- a/dify-agent/docs/agenton/guide/index.md +++ b/dify-agent/docs/agenton/guide/index.md @@ -1,175 +1,184 @@ # Agenton user guide -Agenton composes shared `Layer` instances into a named graph. Treat layer -instances as reusable capability definitions: config and dependency declarations -belong on the layer class or instance, while per-session runtime values belong -on the `LayerControl` created for that layer in a `CompositorSession`. +Agenton composes reusable graph plans from `LayerNode`s and `LayerProvider`s. +The core is state-only: a `Compositor` stores no live layer instances, clients, +cleanup stacks, or run state. Each `Compositor.enter(...)` call creates a fresh +`CompositorRun` with new layer instances, direct dependency bindings, lifecycle +state, and an optional hydrated session snapshot. -## Config, runtime state, and runtime handles +## Config and runtime state -- **Config** is serializable graph input. Config-constructible layers declare a - `type_id` and a Pydantic `LayerConfig` schema; builders validate node config - before calling `Layer.from_config(validated_config)`. -- **Runtime state** is serializable per-layer/per-session state. Layers declare a - Pydantic `runtime_state_type`; session snapshots persist this model with - `model_dump(mode="json")`. -- **Runtime handles** are live Python objects such as clients, open files, or - process handles. Layers declare a Pydantic `runtime_handles_type` with - `arbitrary_types_allowed=True`. Handles are never serialized; resume hooks - should rehydrate them from runtime state. Register handles that need async - cleanup with the control's entry resource stack rather than closing them - manually in layer instances. +- **Graph config** is serializable topology: node `name`, provider `type`, + dependency mappings, and metadata. `LayerNodeConfig` deliberately contains no + layer config. +- **Per-run layer config** is passed to `Compositor.enter(configs=...)` as a + mapping keyed by node name. Providers validate each value with the layer's + `config_type` before any factory runs. +- **Runtime state** is serializable per-layer invocation state on + `layer.runtime_state`. Session snapshots persist only lifecycle state and this + model's JSON-safe data. +- **Live Python resources** such as clients, files, sockets, or process handles + stay outside Agenton core. Own them in application code or integration-specific + context managers that wrap compositor entry. ## Define a config-backed layer -Use a `LayerConfig` model for config and pass it through the typed layer family so -`Layer.__init_subclass__` can infer the schema: +Use a `LayerConfig` model for per-run config and inherit from a typed layer family +so `Layer.__init_subclass__` can infer schemas: ```python {test="skip" lint="skip"} +from dataclasses import dataclass + +from pydantic import ConfigDict +from typing_extensions import Self, override + +from agenton.layers import LayerConfig, NoLayerDeps, PlainLayer + + class GreetingConfig(LayerConfig): prefix: str model_config = ConfigDict(extra="forbid") -@dataclass +@dataclass(slots=True) class GreetingLayer(PlainLayer[NoLayerDeps, GreetingConfig]): type_id = "example.greeting" + prefix: str @classmethod + @override def from_config(cls, config: GreetingConfig) -> Self: return cls(prefix=config.prefix) @property + @override def prefix_prompts(self) -> list[str]: return [self.prefix] ``` -Omitted schema slots default to `EmptyLayerConfig`, `EmptyRuntimeState`, and -`EmptyRuntimeHandles`. Lifecycle hooks can annotate controls as -`LayerControl[MyState, MyHandles]` to get static checking and IDE completion for -runtime state and handles. +Omitted schema slots default to `EmptyLayerConfig` and `EmptyRuntimeState`. +Lifecycle hooks are no-argument methods on the layer instance; use `self.deps` +for dependencies and `self.runtime_state` for serializable mutable state. ## Live resources -The base lifecycle creates a resource stack for each `LayerControl` entry before -`on_context_create` or `on_context_resume` runs. Enter async resources through the -control, store the live handle in `runtime_handles`, and clear the handle in -`on_context_suspend`/`on_context_delete`; the resource stack performs the actual -close after those hooks and also cleans up if create/resume or the context body -raises. +Agenton does not own resource cleanup. Keep live resources in the surrounding +application and pass them to capability methods explicitly: ```python {test="skip" lint="skip"} -class ClientHandles(BaseModel): - client: httpx.AsyncClient | None = None - - model_config = ConfigDict(arbitrary_types_allowed=True) +@dataclass(slots=True) +class ClientUserLayer(PlainLayer[NoLayerDeps]): + def make_client_user(self, *, http_client: httpx.AsyncClient) -> ClientUser: + return ClientUser(http_client) -@dataclass -class ClientLayer(PlainLayer[NoLayerDeps, EmptyLayerConfig, EmptyRuntimeState, ClientHandles]): - async def on_context_create(self, control: LayerControl[EmptyRuntimeState, ClientHandles]) -> None: - control.runtime_handles.client = await control.enter_async_resource(httpx.AsyncClient()) - - async def on_context_delete(self, control: LayerControl[EmptyRuntimeState, ClientHandles]) -> None: - control.runtime_handles.client = None - - def make_client_user(self, control: LayerControl) -> ClientUser: - control = self.require_control(control, active=True) - if control.runtime_handles.client is None: - raise RuntimeError("client is not available") - return ClientUser(control.runtime_handles.client) +compositor = Compositor([LayerNode("client_user", ClientUserLayer)]) +async with httpx.AsyncClient() as http_client: + async with compositor.enter() as run: + layer = run.get_layer("client_user", ClientUserLayer) + user = layer.make_client_user(http_client=http_client) ``` -`Layer.require_control(control, active=True)` is the recommended first line for -capability methods that read runtime state or handles. It verifies that callers -passed this layer's own control from the current session and, when requested, that -the control is active. +This keeps deterministic cleanup at the integration boundary and leaves Agenton +snapshots limited to serializable runtime state. -## Register layers and build a compositor +## Build a compositor -Register config-constructible layers manually: +Use providers for config-backed layers and pass per-run config at entry time: ```python {test="skip" lint="skip"} -registry = LayerRegistry() -registry.register_layer(PromptLayer) # uses PromptLayer.type_id == "plain.prompt" -``` +from agenton.compositor import Compositor, CompositorConfig, LayerNodeConfig, LayerProvider +from agenton_collections.layers.plain import PromptLayer, PromptLayerConfig -Use `CompositorBuilder` to mix serializable config nodes with live instances: -```python {test="skip" lint="skip"} -compositor = ( - CompositorBuilder(registry) - .add_config( - { - "layers": [ - { - "name": "prompt", - "type": "plain.prompt", - "config": {"prefix": "Hi", "user": "Answer with examples."}, - } - ] - } - ) - .add_instance(name="profile", layer=ObjectLayer(profile)) - .build() +providers = ( + LayerProvider.from_layer_type(PromptLayer), + LayerProvider.from_layer_type(GreetingLayer), ) +compositor = Compositor.from_config( + CompositorConfig( + layers=[ + LayerNodeConfig(name="prompt", type="plain.prompt"), + LayerNodeConfig(name="greeting", type="example.greeting"), + ] + ), + providers=providers, +) + +async with compositor.enter( + configs={ + "prompt": PromptLayerConfig(user="Answer with examples."), + "greeting": GreetingConfig(prefix="Hi"), + } +) as run: + prompts = run.prompts ``` -Use `.add_instance()` for layers that require Python objects or callables, such -as `ObjectLayer`, `ToolsLayer`, and dynamic tool layers. +Use `LayerProvider.from_factory(...)` when construction needs Python objects or +callables. Provider factories receive only validated config and must return a +fresh layer instance for every invocation. For node-specific construction with +`Compositor.from_config`, pass a `node_providers={"node_name": provider}` mapping +to override the provider selected by type id for that node. -## Dependency controls +## Dependencies -Layer dependencies bind layer instances on `self.deps`. When a layer method also -needs the dependency's per-session state or handles, pass the current layer's -`LayerControl` into that method and resolve the dependency control from the same -session: +Layer dependencies bind direct layer instances onto `self.deps` for one run. +Dependency mappings use dependency field names as keys and compositor node names +as values: ```python {test="skip" lint="skip"} class ModelDeps(LayerDeps): plugin: PluginLayer -@dataclass +@dataclass(slots=True) class ModelLayer(PlainLayer[ModelDeps]): - def make_model(self, control: LayerControl) -> Model: - plugin_control = control.control_for(self.deps.plugin) - return self.deps.plugin.make_provider(plugin_control) + def make_model(self) -> Model: + return self.deps.plugin.make_provider() ``` -Use `control.control_for(dep_name, dep_layer)` when more than one dependency -field can point at the same layer instance. Optional dependencies that were not -bound have no control and raise `KeyError` if requested. +Optional dependencies are assigned `None` when absent. Missing required +dependencies, unknown dependency keys, and dependency targets with the wrong layer +type fail before lifecycle hooks run. -## System prompts and user prompts +## System prompts, user prompts, and tools -Layers expose three prompt surfaces: +Layers expose four authoring surfaces: - `prefix_prompts`: system prompt fragments collected in layer order. - `suffix_prompts`: system prompt fragments collected in reverse layer order. - `user_prompts`: user-message fragments collected in layer order. +- `tools`: tool entries collected in layer order. -`PromptLayer` accepts `prefix`, `user`, and `suffix` config fields. For -pydantic-ai, `PYDANTIC_AI_TRANSFORMERS` maps `compositor.prompts` to system -prompt functions and `compositor.user_prompts` to values suitable for -`Agent.run(user_prompt=...)`. +`PromptLayer` accepts `prefix`, `user`, and `suffix` config fields. Aggregation is +available on the active `CompositorRun` as `run.prompts`, `run.user_prompts`, and +`run.tools`. For pydantic-ai, import +`agenton_collections.transformers.pydantic_ai.PYDANTIC_AI_TRANSFORMERS` and pass +it to `Compositor(...)` or `Compositor.from_config(...)` so tagged layer items are +converted to Pydantic AI prompt, user prompt, and tool values. ## Session snapshot and restore -`Compositor.snapshot_session(session)` serializes non-active sessions, including -layer lifecycle state and runtime state. It rejects active sessions because live -handles cannot be snapshotted safely. Restore with -`Compositor.session_from_snapshot(snapshot)`; restored controls validate runtime -state with each layer schema and initialize empty runtime handles. Suspended -sessions resume through `on_context_resume`, where handles should be hydrated -from the restored runtime state. +Core Agenton run slots default to delete-on-exit. Call `run.suspend_on_exit()` or +`run.suspend_layer_on_exit(name)` inside the active context when the next snapshot +should be resumable: -Create sessions with `Compositor.new_session()` or -`Compositor.session_from_snapshot()`. `Compositor.enter()` validates that every -session control uses the target layer's runtime state and handle schemas before -any lifecycle hook runs. +```python {test="skip" lint="skip"} +async with compositor.enter(configs=configs) as run: + run.suspend_on_exit() + +snapshot = run.session_snapshot +async with compositor.enter(configs=configs, session_snapshot=snapshot) as restored_run: + restored_layer = restored_run.get_layer("stateful", StatefulLayer) +``` + +`run.session_snapshot` is populated after context exit. Snapshots include ordered +layer names, non-active lifecycle states, and JSON-safe runtime state only. Active +state is rejected at the DTO boundary, and closed layers cannot be entered again. +To resume, pass the snapshot to a later `Compositor.enter(...)` call with the same +layer names and order. See also: diff --git a/dify-agent/docs/dify-agent/api/index.md b/dify-agent/docs/dify-agent/api/index.md index 0ceea8b9c3..ace323e56f 100644 --- a/dify-agent/docs/dify-agent/api/index.md +++ b/dify-agent/docs/dify-agent/api/index.md @@ -1,8 +1,8 @@ # Dify Agent Run API -The Dify Agent API exposes asynchronous agent runs backed by Agenton compositor -configuration, Pydantic AI runtime execution, Redis run records, and per-run Redis -Streams event logs. The FastAPI application lives at +The Dify Agent API exposes asynchronous agent runs backed by Agenton state-only +layer composition, Pydantic AI runtime execution, Redis run records, and per-run +Redis Streams event logs. The FastAPI application lives at `dify-agent/src/dify_agent/server/app.py`. Public Python DTOs and event models are exported from @@ -11,14 +11,14 @@ server-only and should not be used by API consumers. ## Input model -Create-run requests accept a `CompositorConfig` and an optional +Create-run requests accept a public `RunComposition` and an optional `CompositorSessionSnapshot`. There is **no top-level `user_prompt` or model profile field**. User input and model/provider selection are supplied by Agenton -layers. `layer_exit_signals` optionally controls whether layers suspend or delete -when the run leaves the active session; the default is suspend for all layers. In -the MVP server, the safe config-constructible layer registry includes -`plain.prompt`, `dify.plugin`, and `dify.plugin.llm`. The runtime reads the LLM -model layer named by `DIFY_AGENT_MODEL_LAYER_ID`, whose public value is `"llm"`. +layers. `on_exit` optionally controls whether layers suspend or delete when the +run leaves the active session; the default is suspend for all layers. In the MVP +server, the safe provider set includes `plain.prompt`, `dify.plugin`, and +`dify.plugin.llm`. The runtime reads the LLM model layer named by +`DIFY_AGENT_MODEL_LAYER_ID`, whose public value is `"llm"`. Blank user input is rejected. A request with no user prompt, an empty string, or only whitespace strings such as `"user": ["", " "]` returns `422` before a run @@ -38,7 +38,7 @@ Request: ```json { - "compositor": { + "composition": { "schema_version": 1, "layers": [ { @@ -77,7 +77,7 @@ Request: ] }, "session_snapshot": null, - "layer_exit_signals": { + "on_exit": { "default": "suspend", "layers": { "prompt": "delete" @@ -100,16 +100,16 @@ same FastAPI process. Redis is not used as a job queue. Run records and per-run event streams expire after `DIFY_AGENT_RUN_RETENTION_SECONDS`, which defaults to `259200` seconds (3 days). -`dify.plugin` receives tenant/plugin identity only; daemon URL, API key, and -timeout are server settings. `dify.plugin.llm.credentials` accepts scalar values -only (`string`, `number`, `boolean`, or `null`). Unknown -`layer_exit_signals.layers` keys return `422` before a run record is created. +`dify.plugin` receives tenant/plugin identity only; daemon URL, API key, timeout, +and connection limits are server settings. `dify.plugin.llm.credentials` accepts +scalar values only (`string`, `number`, `boolean`, or `null`). Unknown +`on_exit.layers` keys return `422` before a run record is created. Validation error example (`422`): ```json { - "detail": "compositor.user_prompts must not be empty" + "detail": "run.user_prompts must not be empty" } ``` @@ -196,24 +196,30 @@ Use `dify_agent.client.Client` for both async and sync code. Async methods use normal names; sync methods add `_sync`. ```python {test="skip" lint="skip"} -from agenton.compositor import CompositorConfig, LayerNodeConfig +from agenton.layers import ExitIntent from agenton_collections.layers.plain import PromptLayerConfig from dify_agent.client import Client from dify_agent.layers.dify_plugin import DifyPluginLLMLayerConfig, DifyPluginLayerConfig -from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest, LayerExitSignals +from dify_agent.protocol import ( + DIFY_AGENT_MODEL_LAYER_ID, + CreateRunRequest, + LayerExitSignals, + RunComposition, + RunLayerSpec, +) async def main() -> None: request = CreateRunRequest( - compositor=CompositorConfig( + composition=RunComposition( layers=[ - LayerNodeConfig(name="prompt", type="plain.prompt", config=PromptLayerConfig(user="hello")), - LayerNodeConfig( + RunLayerSpec(name="prompt", type="plain.prompt", config=PromptLayerConfig(user="hello")), + RunLayerSpec( name="plugin", type="dify.plugin", config=DifyPluginLayerConfig(tenant_id="tenant-id", plugin_id="langgenius/openai"), ), - LayerNodeConfig( + RunLayerSpec( name=DIFY_AGENT_MODEL_LAYER_ID, type="dify.plugin.llm", deps={"plugin": "plugin"}, @@ -225,7 +231,7 @@ async def main() -> None: ), ] ), - layer_exit_signals=LayerExitSignals(layers={"prompt": "delete"}), + on_exit=LayerExitSignals(layers={"prompt": ExitIntent.DELETE}), ) async with Client(base_url="http://localhost:8000") as client: run = await client.create_run(request) @@ -234,23 +240,22 @@ async def main() -> None: ``` ```python {test="skip" lint="skip"} -from agenton.compositor import CompositorConfig, LayerNodeConfig from agenton_collections.layers.plain import PromptLayerConfig from dify_agent.client import Client from dify_agent.layers.dify_plugin import DifyPluginLLMLayerConfig, DifyPluginLayerConfig -from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest +from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest, RunComposition, RunLayerSpec request = CreateRunRequest( - compositor=CompositorConfig( + composition=RunComposition( layers=[ - LayerNodeConfig(name="prompt", type="plain.prompt", config=PromptLayerConfig(user="hello")), - LayerNodeConfig( + RunLayerSpec(name="prompt", type="plain.prompt", config=PromptLayerConfig(user="hello")), + RunLayerSpec( name="plugin", type="dify.plugin", config=DifyPluginLayerConfig(tenant_id="tenant-id", plugin_id="langgenius/openai"), ), - LayerNodeConfig( + RunLayerSpec( name=DIFY_AGENT_MODEL_LAYER_ID, type="dify.plugin.llm", deps={"plugin": "plugin"}, @@ -297,7 +302,7 @@ Each event keeps the same envelope shape and has typed `data`: `run_started` use CompositorSessionSnapshot }`, and `run_failed` uses `{ "error": string, "reason": string | null }`. The session snapshot from `run_succeeded.data` can be sent as `session_snapshot` in a later create-run request with the same -compositor layer names and order. +composition layer names and order. ## Consumer examples diff --git a/dify-agent/docs/dify-agent/guide/index.md b/dify-agent/docs/dify-agent/guide/index.md index 4e82e7c741..012bd3a598 100644 --- a/dify-agent/docs/dify-agent/guide/index.md +++ b/dify-agent/docs/dify-agent/guide/index.md @@ -12,14 +12,16 @@ Start Redis, then run one FastAPI/uvicorn process: uv run --project dify-agent uvicorn dify_agent.server.app:app --reload ``` -By default, the FastAPI lifespan creates both: +By default, the FastAPI lifespan creates: - one Redis-backed run store used by HTTP routes +- one shared plugin-daemon `httpx.AsyncClient` used by local run tasks - one process-local scheduler that starts background `asyncio` run tasks -This means local development needs one uvicorn process plus Redis. Run execution -still happens outside request handlers, so client disconnects do not cancel the -agent run. +This means local development needs one uvicorn process plus Redis, and +plugin-backed runs also need a reachable Dify plugin daemon. Run execution still +happens outside request handlers, so client disconnects do not cancel the agent +run. ## Configuration @@ -32,6 +34,15 @@ also reads `.env` and `dify-agent/.env` when present. | `DIFY_AGENT_REDIS_PREFIX` | `dify-agent` | Prefix for Redis record and event keys. | | `DIFY_AGENT_SHUTDOWN_GRACE_SECONDS` | `30` | Seconds to wait for active local runs during graceful shutdown before cancellation. | | `DIFY_AGENT_RUN_RETENTION_SECONDS` | `259200` | Seconds to retain Redis run records and per-run event streams; defaults to 3 days. | +| `DIFY_AGENT_PLUGIN_DAEMON_URL` | `http://localhost:5002` | Base URL for the Dify plugin daemon. | +| `DIFY_AGENT_PLUGIN_DAEMON_API_KEY` | empty | API key sent to the Dify plugin daemon. | +| `DIFY_AGENT_PLUGIN_DAEMON_CONNECT_TIMEOUT` | `10` | Plugin-daemon HTTP connect timeout in seconds. | +| `DIFY_AGENT_PLUGIN_DAEMON_READ_TIMEOUT` | `600` | Plugin-daemon HTTP read timeout in seconds. | +| `DIFY_AGENT_PLUGIN_DAEMON_WRITE_TIMEOUT` | `30` | Plugin-daemon HTTP write timeout in seconds. | +| `DIFY_AGENT_PLUGIN_DAEMON_POOL_TIMEOUT` | `10` | Plugin-daemon HTTP connection-pool wait timeout in seconds. | +| `DIFY_AGENT_PLUGIN_DAEMON_MAX_CONNECTIONS` | `100` | Maximum total plugin-daemon HTTP connections. | +| `DIFY_AGENT_PLUGIN_DAEMON_MAX_KEEPALIVE_CONNECTIONS` | `20` | Maximum idle keep-alive plugin-daemon HTTP connections. | +| `DIFY_AGENT_PLUGIN_DAEMON_KEEPALIVE_EXPIRY` | `30` | Keep-alive expiry in seconds for idle plugin-daemon HTTP connections. | Example `.env`: @@ -40,6 +51,8 @@ DIFY_AGENT_REDIS_URL=redis://localhost:6379/0 DIFY_AGENT_REDIS_PREFIX=dify-agent-dev DIFY_AGENT_SHUTDOWN_GRACE_SECONDS=30 DIFY_AGENT_RUN_RETENTION_SECONDS=259200 +DIFY_AGENT_PLUGIN_DAEMON_URL=http://localhost:5002 +DIFY_AGENT_PLUGIN_DAEMON_API_KEY=replace-with-daemon-key ``` Run records and event streams use the same retention. Status writes refresh the @@ -48,7 +61,7 @@ record TTL so active runs that keep producing events remain observable. ## Scheduling and shutdown semantics -`POST /runs` validates the compositor, persists a `running` run record, and starts +`POST /runs` validates the composition, persists a `running` run record, and starts an `asyncio` task in the same process. There is no Redis job stream, consumer group, pending reclaim, or automatic retry layer. @@ -64,13 +77,13 @@ shared status/event visibility, not load balancing or queued-job recovery. ## Run inputs and session snapshots -The API does not accept a top-level `user_prompt`. Submit a `CompositorConfig` -whose Agenton layers provide user input. With the MVP registry, use +The API does not accept a top-level `user_prompt`. Submit a `RunComposition` +whose Agenton layers provide user input. With the MVP provider set, use `plain.prompt` and its `config.user` field: ```json { - "compositor": { + "composition": { "schema_version": 1, "layers": [ { @@ -92,7 +105,7 @@ persisted or scheduled. There is no Pydantic AI history layer. To resume Agenton layer state, pass the `session_snapshot` from a previous `run_succeeded.data` payload together with a -compositor that has the same layer names and order. +composition that has the same layer names and order. ## Observing runs @@ -123,5 +136,5 @@ The repository includes simple consumers that print observed output/events: - `dify-agent/examples/dify_agent/dify_agent_examples/run_server_sse_consumer.py` consumes raw SSE frames for an existing run id. -Both examples use the credential-free Pydantic AI `TestModel` profile; they still -require Redis and the API server. +The create-run examples submit Dify plugin model layers, so they require Redis, +the API server, plugin-daemon settings, and provider credentials. diff --git a/dify-agent/examples/agenton/agenton_examples/basics.py b/dify-agent/examples/agenton/agenton_examples/basics.py index 81a66bcf15..81a3daeb18 100644 --- a/dify-agent/examples/agenton/agenton_examples/basics.py +++ b/dify-agent/examples/agenton/agenton_examples/basics.py @@ -5,12 +5,14 @@ from __future__ import annotations import asyncio from dataclasses import dataclass, field from inspect import signature +from typing import cast from typing_extensions import override -from agenton.compositor import CompositorBuilder, LayerRegistry -from agenton.layers import LayerControl, LayerDeps, NoLayerDeps, PlainLayer +from agenton.compositor import Compositor, LayerNode, LayerProvider +from agenton.layers import LayerDeps, NoLayerDeps, PlainLayer, PlainToolType from agenton_collections.layers.plain import DynamicToolsLayer, ObjectLayer, PromptLayer, ToolsLayer, with_object +from agenton_collections.layers.plain.basic import PromptLayerConfig @dataclass(frozen=True, slots=True) @@ -41,19 +43,19 @@ class TraceLayer(PlainLayer[NoLayerDeps]): events: list[str] = field(default_factory=list) @override - async def on_context_create(self, control: LayerControl) -> None: + async def on_context_create(self) -> None: self.events.append("create") @override - async def on_context_suspend(self, control: LayerControl) -> None: + async def on_context_suspend(self) -> None: self.events.append("suspend") @override - async def on_context_resume(self, control: LayerControl) -> None: + async def on_context_resume(self) -> None: self.events.append("resume") @override - async def on_context_delete(self, control: LayerControl) -> None: + async def on_context_delete(self) -> None: self.events.append("delete") @@ -72,64 +74,68 @@ async def main() -> None: audience="engineers composing agent capabilities", tone="precise and friendly", ) - trace = TraceLayer() - - registry = LayerRegistry() - registry.register_layer(PromptLayer) - compositor = ( - CompositorBuilder(registry) - .add_config( - { - "layers": [ - { - "name": "base_prompt", - "type": "plain.prompt", - "config": { - "prefix": "Use config dicts for serializable layers.", - "user": "Explain how the composed agent should use its layers.", - "suffix": "Before finalizing, make the result easy to scan.", - }, - }, - { - "name": "extra_prompt", - "type": "plain.prompt", - "config": { - "prefix": "Use constructed instances for objects, local code, and callables.", - }, - }, - ] - } - ) - .add_instance(name="profile", layer=ObjectLayer[AgentProfile](profile)) - .add_instance(name="profile_prompt", layer=ProfilePromptLayer()) - .add_instance(name="tools", layer=ToolsLayer(tool_entries=(count_words,))) - .add_instance( - name="dynamic_tools", - deps={"object_layer": "profile"}, - layer=DynamicToolsLayer[AgentProfile](tool_entries=(write_tagline,)), - ) - .add_instance(name="trace", layer=trace) - .build() + trace_events: list[str] = [] + compositor = Compositor( + [ + LayerNode("base_prompt", PromptLayer), + LayerNode("extra_prompt", PromptLayer), + LayerNode( + "profile", + LayerProvider.from_factory( + layer_type=ObjectLayer, + create=lambda _config: ObjectLayer[AgentProfile](profile), + ), + ), + LayerNode("profile_prompt", ProfilePromptLayer, deps={"profile": "profile"}), + LayerNode( + "tools", + LayerProvider.from_factory( + layer_type=ToolsLayer, + create=lambda _config: ToolsLayer(tool_entries=(count_words,)), + ), + ), + LayerNode( + "dynamic_tools", + LayerProvider.from_factory( + layer_type=DynamicToolsLayer, + create=lambda _config: DynamicToolsLayer[AgentProfile](tool_entries=(write_tagline,)), + ), + deps={"object_layer": "profile"}, + ), + LayerNode( + "trace", + LayerProvider.from_factory(layer_type=TraceLayer, create=lambda _config: TraceLayer(trace_events)), + ), + ] ) + configs = { + "base_prompt": PromptLayerConfig( + prefix="Use config dicts for serializable layers.", + user="Explain how the composed agent should use its layers.", + suffix="Before finalizing, make the result easy to scan.", + ), + "extra_prompt": PromptLayerConfig(prefix="Use constructed instances for objects, local code, and callables."), + } - print("Prompts:") - for prompt in compositor.prompts: - print(f"- {prompt.value}") + async with compositor.enter(configs=configs) as run: + print("Prompts:") + for prompt in run.prompts: + print(f"- {prompt.value}") - print("\nUser prompts:") - for prompt in compositor.user_prompts: - print(f"- {prompt.value}") + print("\nUser prompts:") + for prompt in run.user_prompts: + print(f"- {prompt.value}") - print("\nTools:") - for tool in compositor.tools: - print(f"- {tool.value.__name__}{signature(tool.value)}") - print([tool.value("layer composition") for tool in compositor.tools]) + print("\nTools:") + plain_tools = [cast(PlainToolType, tool) for tool in run.tools] + for tool in plain_tools: + print(f"- {tool.value.__name__}{signature(tool.value)}") + print([tool.value("layer composition") for tool in plain_tools]) + run.suspend_on_exit() - async with compositor.enter() as lifecycle_control: - lifecycle_control.suspend_on_exit() - async with compositor.enter(lifecycle_control): + async with compositor.enter(configs=configs, session_snapshot=run.session_snapshot): pass - print("\nLifecycle:", trace.events) + print("\nLifecycle:", trace_events) if __name__ == "__main__": diff --git a/dify-agent/examples/agenton/agenton_examples/pydantic_ai_bridge.py b/dify-agent/examples/agenton/agenton_examples/pydantic_ai_bridge.py index 012b8bc37a..ce6e10fbc7 100644 --- a/dify-agent/examples/agenton/agenton_examples/pydantic_ai_bridge.py +++ b/dify-agent/examples/agenton/agenton_examples/pydantic_ai_bridge.py @@ -12,8 +12,9 @@ from pydantic_ai.messages import BuiltinToolCallPart, ModelMessage, ToolCallPart from pydantic_ai.models.openai import OpenAIChatModel # pyright: ignore[reportDeprecated] from pydantic_ai.models.test import TestModel -from agenton.compositor import CompositorBuilder, LayerRegistry +from agenton.compositor import Compositor, LayerNode, LayerProvider from agenton_collections.layers.plain import ObjectLayer, PromptLayer, ToolsLayer +from agenton_collections.layers.plain.basic import PromptLayerConfig from agenton_collections.layers.pydantic_ai import PydanticAIBridgeLayer from agenton_collections.transformers import PYDANTIC_AI_TRANSFORMERS @@ -49,41 +50,47 @@ async def main() -> None: audience="engineers composing agent capabilities", tone="precise and friendly", ) - pydantic_ai_bridge = PydanticAIBridgeLayer[AgentProfile]( - prefix=("Prefer concrete details.", profile_prompt, tone_prompt), - user="Use the tools for 'layer composition'.", - tool_entries=(write_tagline,), + compositor = Compositor( + [ + LayerNode("base_prompt", PromptLayer), + LayerNode( + "profile", + LayerProvider.from_factory( + layer_type=ObjectLayer, + create=lambda _config: ObjectLayer[AgentProfile](profile), + ), + ), + LayerNode( + "plain_tools", + LayerProvider.from_factory( + layer_type=ToolsLayer, + create=lambda _config: ToolsLayer(tool_entries=(count_words,)), + ), + ), + LayerNode( + "pydantic_ai_bridge", + LayerProvider.from_factory( + layer_type=PydanticAIBridgeLayer, + create=lambda _config: PydanticAIBridgeLayer[AgentProfile]( + prefix=("Prefer concrete details.", profile_prompt, tone_prompt), + user="Use the tools for 'layer composition'.", + tool_entries=(write_tagline,), + ), + ), + deps={"object_layer": "profile"}, + ), + ], + **PYDANTIC_AI_TRANSFORMERS, ) - registry = LayerRegistry() - registry.register_layer(PromptLayer) - compositor = ( - CompositorBuilder(registry) - .add_config( - { - "layers": [ - { - "name": "base_prompt", - "type": "plain.prompt", - "config": { - "prefix": "Use the available tools before answering.", - "suffix": "Return concise, inspectable output.", - }, - }, - ] - } - ) - .add_instance(name="profile", layer=ObjectLayer[AgentProfile](profile)) - .add_instance(name="plain_tools", layer=ToolsLayer(tool_entries=(count_words,))) - .add_instance( - name="pydantic_ai_bridge", - deps={"object_layer": "profile"}, - layer=pydantic_ai_bridge, - ) - .build(**PYDANTIC_AI_TRANSFORMERS) - ) - - async with compositor.enter(): + async with compositor.enter( + configs={ + "base_prompt": PromptLayerConfig( + prefix="Use the available tools before answering.", + suffix="Return concise, inspectable output.", + ) + } + ) as run: model = ( OpenAIChatModel("gpt-5.5") # pyright: ignore[reportDeprecated] if os.getenv("OPENAI_API_KEY") @@ -92,12 +99,13 @@ async def main() -> None: agent = Agent[AgentProfile]( model=model, deps_type=AgentProfile, - tools=compositor.tools, + tools=run.tools, ) - for prompt in compositor.prompts: + for prompt in run.prompts: _ = agent.system_prompt(prompt) - result = await agent.run(compositor.user_prompts, deps=pydantic_ai_bridge.run_deps) + bridge_layer = run.get_layer("pydantic_ai_bridge", PydanticAIBridgeLayer) + result = await agent.run(run.user_prompts, deps=bridge_layer.run_deps) for line in _format_messages(result.all_messages()): print(line) diff --git a/dify-agent/examples/agenton/agenton_examples/session_snapshot.py b/dify-agent/examples/agenton/agenton_examples/session_snapshot.py index 43b47890c0..4e5a9c3b9f 100644 --- a/dify-agent/examples/agenton/agenton_examples/session_snapshot.py +++ b/dify-agent/examples/agenton/agenton_examples/session_snapshot.py @@ -3,15 +3,13 @@ from __future__ import annotations import asyncio -from collections import OrderedDict from dataclasses import dataclass from typing import ClassVar from pydantic import BaseModel, ConfigDict -from typing_extensions import override -from agenton.compositor import Compositor -from agenton.layers import LayerControl, NoLayerDeps, PlainLayer, PlainPromptType, PlainToolType +from agenton.compositor import Compositor, LayerNode +from agenton.layers import EmptyLayerConfig, NoLayerDeps, PlainLayer class ConnectionState(BaseModel): @@ -25,47 +23,27 @@ class ConnectionHandle: self.connection_id = connection_id -class ConnectionHandles(BaseModel): - connection: ConnectionHandle | None = None - - model_config = ConfigDict(extra="forbid", validate_assignment=True, arbitrary_types_allowed=True) - - @dataclass(slots=True) -class ConnectionLayer(PlainLayer[NoLayerDeps]): +class ConnectionLayer(PlainLayer[NoLayerDeps, EmptyLayerConfig, ConnectionState]): runtime_state_type: ClassVar[type[BaseModel]] = ConnectionState - runtime_handles_type: ClassVar[type[BaseModel]] = ConnectionHandles - - @override - async def on_context_create(self, control: LayerControl) -> None: - assert isinstance(control.runtime_state, ConnectionState) - assert isinstance(control.runtime_handles, ConnectionHandles) - control.runtime_handles.connection = ConnectionHandle(control.runtime_state.connection_id) - - @override - async def on_context_resume(self, control: LayerControl) -> None: - assert isinstance(control.runtime_state, ConnectionState) - assert isinstance(control.runtime_handles, ConnectionHandles) - control.runtime_handles.connection = ConnectionHandle(f"restored:{control.runtime_state.connection_id}") async def main() -> None: - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( - layers=OrderedDict([("connection", ConnectionLayer())]) - ) - session = compositor.new_session() - async with compositor.enter(session) as active_session: - active_session.suspend_on_exit() + compositor = Compositor([LayerNode("connection", ConnectionLayer)]) + async with compositor.enter() as run: + layer = run.get_layer("connection", ConnectionLayer) + connection = ConnectionHandle(layer.runtime_state.connection_id) + print("Active external handle:", connection.connection_id) + run.suspend_on_exit() - snapshot = compositor.snapshot_session(session) + snapshot = run.session_snapshot + assert snapshot is not None print("Snapshot:", snapshot.model_dump(mode="json")) - restored = compositor.session_from_snapshot(snapshot) - async with compositor.enter(restored): - handles = restored.layer("connection").runtime_handles - assert isinstance(handles, ConnectionHandles) - assert handles.connection is not None - print("Rehydrated handle:", handles.connection.connection_id) + async with compositor.enter(session_snapshot=snapshot) as restored_run: + layer = restored_run.get_layer("connection", ConnectionLayer) + restored_connection = ConnectionHandle(f"restored:{layer.runtime_state.connection_id}") + print("Rehydrated external handle:", restored_connection.connection_id) if __name__ == "__main__": diff --git a/dify-agent/examples/dify_agent/dify_agent_examples/run_server_consumer.py b/dify-agent/examples/dify_agent/dify_agent_examples/run_server_consumer.py index 33e1c11da5..3d2b7278d3 100644 --- a/dify-agent/examples/dify_agent/dify_agent_examples/run_server_consumer.py +++ b/dify-agent/examples/dify_agent/dify_agent_examples/run_server_consumer.py @@ -13,7 +13,6 @@ recover after client-side uncertainty. import asyncio -from agenton.compositor import CompositorConfig, LayerNodeConfig from agenton_collections.layers.plain import PromptLayerConfig from dify_agent.client import Client from dify_agent.layers.dify_plugin import ( @@ -21,7 +20,7 @@ from dify_agent.layers.dify_plugin import ( DifyPluginLLMLayerConfig, DifyPluginLayerConfig, ) -from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest +from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest, RunComposition, RunLayerSpec API_BASE_URL = "http://localhost:8000" @@ -36,9 +35,9 @@ async def main() -> None: async with Client(base_url=API_BASE_URL) as client: run = await client.create_run( CreateRunRequest( - compositor=CompositorConfig( + composition=RunComposition( layers=[ - LayerNodeConfig( + RunLayerSpec( name="prompt", type="plain.prompt", config=PromptLayerConfig( @@ -46,12 +45,12 @@ async def main() -> None: user="Say hello from the Dify Agent API server example.", ), ), - LayerNodeConfig( + RunLayerSpec( name="plugin", type="dify.plugin", config=DifyPluginLayerConfig(tenant_id=TENANT_ID, plugin_id=PLUGIN_ID), ), - LayerNodeConfig( + RunLayerSpec( name=DIFY_AGENT_MODEL_LAYER_ID, type="dify.plugin.llm", deps={"plugin": "plugin"}, diff --git a/dify-agent/examples/dify_agent/dify_agent_examples/run_server_sync_client.py b/dify-agent/examples/dify_agent/dify_agent_examples/run_server_sync_client.py index d00bb4dc76..c85184b082 100644 --- a/dify-agent/examples/dify_agent/dify_agent_examples/run_server_sync_client.py +++ b/dify-agent/examples/dify_agent/dify_agent_examples/run_server_sync_client.py @@ -5,7 +5,6 @@ does not retry ``POST /runs``; if a timeout occurs, inspect server state or crea a new run explicitly rather than assuming the original request was not accepted. """ -from agenton.compositor import CompositorConfig, LayerNodeConfig from agenton_collections.layers.plain import PromptLayerConfig from dify_agent.client import Client from dify_agent.layers.dify_plugin import ( @@ -13,7 +12,7 @@ from dify_agent.layers.dify_plugin import ( DifyPluginLLMLayerConfig, DifyPluginLayerConfig, ) -from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest +from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest, RunComposition, RunLayerSpec API_BASE_URL = "http://localhost:8000" @@ -28,9 +27,9 @@ def main() -> None: with Client(base_url=API_BASE_URL) as client: run = client.create_run_sync( CreateRunRequest( - compositor=CompositorConfig( + composition=RunComposition( layers=[ - LayerNodeConfig( + RunLayerSpec( name="prompt", type="plain.prompt", config=PromptLayerConfig( @@ -38,12 +37,12 @@ def main() -> None: user="Say hello from the synchronous Dify Agent client example.", ), ), - LayerNodeConfig( + RunLayerSpec( name="plugin", type="dify.plugin", config=DifyPluginLayerConfig(tenant_id=TENANT_ID, plugin_id=PLUGIN_ID), ), - LayerNodeConfig( + RunLayerSpec( name=DIFY_AGENT_MODEL_LAYER_ID, type="dify.plugin.llm", deps={"plugin": "plugin"}, diff --git a/dify-agent/src/dify_agent/layers/dify_plugin/llm_layer.py b/dify-agent/src/dify_agent/layers/dify_plugin/llm_layer.py index 5932d69cb3..75fe0c2908 100644 --- a/dify-agent/src/dify_agent/layers/dify_plugin/llm_layer.py +++ b/dify-agent/src/dify_agent/layers/dify_plugin/llm_layer.py @@ -1,18 +1,20 @@ """Dify plugin LLM model layer. This layer owns model capability resolution for Dify plugin-backed LLMs. It -depends on ``DifyPluginLayer`` for daemon access, resolves that dependency's -control from its own ``LayerControl``, and returns a Pydantic AI model adapter -configured from the public LLM layer DTO. The daemon provider carries plugin -transport identity; the DTO's ``model_provider`` is passed to the adapter as -request-level model identity. +depends on ``DifyPluginLayer`` for daemon identity through Agenton's direct +dependency binding and returns a Pydantic AI model adapter configured from the +public LLM layer DTO. Runtime code supplies the FastAPI lifespan-owned shared +HTTP client to ``get_model``; the layer does not own or discover live resources. +The daemon provider carries plugin transport identity, while the DTO's +``model_provider`` is passed to the adapter as request-level model identity. """ from dataclasses import dataclass +import httpx from typing_extensions import Self, override -from agenton.layers import EmptyRuntimeHandles, EmptyRuntimeState, LayerControl, LayerDeps, PlainLayer +from agenton.layers import LayerDeps, PlainLayer from dify_agent.adapters.llm import DifyLLMAdapterModel from dify_agent.layers.dify_plugin.configs import DifyPluginLLMLayerConfig from dify_agent.layers.dify_plugin.plugin_layer import DifyPluginLayer @@ -38,11 +40,9 @@ class DifyPluginLLMLayer(PlainLayer[DifyPluginLLMDeps, DifyPluginLLMLayerConfig] """Create the LLM layer from validated public config.""" return cls(config=config) - def get_model(self, control: LayerControl[EmptyRuntimeState, EmptyRuntimeHandles]) -> DifyLLMAdapterModel: - """Return the configured model using the current session's plugin control.""" - control = self.require_control(control, active=True) - plugin_control = control.control_for(self.deps.plugin) - provider = self.deps.plugin.get_daemon_provider(plugin_control) + def get_model(self, *, http_client: httpx.AsyncClient) -> DifyLLMAdapterModel: + """Return the configured model using the directly bound plugin dependency.""" + provider = self.deps.plugin.create_daemon_provider(http_client=http_client) return DifyLLMAdapterModel( model=self.config.model, daemon_provider=provider, diff --git a/dify-agent/src/dify_agent/layers/dify_plugin/plugin_layer.py b/dify-agent/src/dify_agent/layers/dify_plugin/plugin_layer.py index b8d42f63b1..0f2c5b56e1 100644 --- a/dify-agent/src/dify_agent/layers/dify_plugin/plugin_layer.py +++ b/dify-agent/src/dify_agent/layers/dify_plugin/plugin_layer.py @@ -1,50 +1,41 @@ """Runtime Dify plugin context layer. -The public config identifies tenant/plugin/user context only. Plugin daemon URL, -API key, and timeout are server-side dependencies injected by the layer registry -factory. Each active compositor entry owns an HTTP client in ``LayerControl`` -runtime handles and registers it on the control's resource stack. Callers pass -the control explicitly to ``get_daemon_provider`` so shared layer instances never -store or discover session-local clients implicitly. Business model-provider names -belong to the LLM layer/model request, not this daemon context layer. +The public config identifies tenant/plugin/user context only. Plugin daemon URL +and API key are server-side settings injected by the provider factory. The layer +is intentionally config/settings-only under Agenton's state-only core: it does +not open, cache, close, or snapshot HTTP clients, and its lifecycle hooks remain +the inherited no-op hooks. Runtime code passes the FastAPI lifespan-owned shared +``httpx.AsyncClient`` into ``create_daemon_provider`` for each model adapter. +Business model-provider names belong to the LLM layer/model request, not this +daemon context layer. """ from dataclasses import dataclass import httpx -from pydantic import BaseModel, ConfigDict from typing_extensions import Self, override -from agenton.layers import EmptyRuntimeState, LayerControl, NoLayerDeps, PlainLayer +from agenton.layers import EmptyRuntimeState, NoLayerDeps, PlainLayer from dify_agent.adapters.llm import DifyPluginDaemonProvider from dify_agent.layers.dify_plugin.configs import DifyPluginLayerConfig -class DifyPluginRuntimeHandles(BaseModel): - """Live per-entry handles for Dify plugin daemon access.""" - - http_client: httpx.AsyncClient | None = None - - model_config = ConfigDict(extra="forbid", validate_assignment=True, arbitrary_types_allowed=True) - - @dataclass(slots=True) -class DifyPluginLayer(PlainLayer[NoLayerDeps, DifyPluginLayerConfig, EmptyRuntimeState, DifyPluginRuntimeHandles]): - """Layer that owns plugin daemon connection state for one active session.""" +class DifyPluginLayer(PlainLayer[NoLayerDeps, DifyPluginLayerConfig, EmptyRuntimeState]): + """Layer that carries plugin daemon identity without owning live resources.""" type_id = "dify.plugin" config: DifyPluginLayerConfig daemon_url: str daemon_api_key: str - timeout: float | httpx.Timeout | None = 600.0 @classmethod @override def from_config(cls, config: DifyPluginLayerConfig) -> Self: """Reject construction without server-injected daemon settings.""" del config - raise TypeError("DifyPluginLayer requires server-side daemon settings and must use a registry factory.") + raise TypeError("DifyPluginLayer requires server-side daemon settings and must use a provider factory.") @classmethod def from_config_with_settings( @@ -53,68 +44,26 @@ class DifyPluginLayer(PlainLayer[NoLayerDeps, DifyPluginLayerConfig, EmptyRuntim *, daemon_url: str, daemon_api_key: str, - timeout: float | httpx.Timeout | None, ) -> Self: """Create a plugin layer from public config plus server-only daemon settings.""" - return cls(config=config, daemon_url=daemon_url, daemon_api_key=daemon_api_key, timeout=timeout) + return cls(config=config, daemon_url=daemon_url, daemon_api_key=daemon_api_key) - def get_daemon_provider( - self, - control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles], - ) -> DifyPluginDaemonProvider: - """Return a daemon provider backed by ``control``'s active HTTP client. + def create_daemon_provider(self, *, http_client: httpx.AsyncClient) -> DifyPluginDaemonProvider: + """Return a daemon provider backed by the shared plugin daemon client. Raises: - RuntimeError: if ``control`` is not active or its HTTP client is - absent/closed. + RuntimeError: if ``http_client`` has already been closed. """ - control = self.require_control(control, active=True) - client = control.runtime_handles.http_client - if client is None or client.is_closed: - raise RuntimeError( - "DifyPluginLayer.get_daemon_provider() requires an entered control with an open HTTP client." - ) + if http_client.is_closed: + raise RuntimeError("DifyPluginLayer.create_daemon_provider() requires an open shared HTTP client.") return DifyPluginDaemonProvider( tenant_id=self.config.tenant_id, plugin_id=self.config.plugin_id, plugin_daemon_url=self.daemon_url, plugin_daemon_api_key=self.daemon_api_key, user_id=self.config.user_id, - timeout=self.timeout, - http_client=client, + http_client=http_client, ) - @override - async def on_context_create( - self, - control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles], - ) -> None: - await self._open_http_client(control) - @override - async def on_context_resume( - self, - control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles], - ) -> None: - await self._open_http_client(control) - - @override - async def on_context_suspend( - self, - control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles], - ) -> None: - control.runtime_handles.http_client = None - - @override - async def on_context_delete( - self, - control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles], - ) -> None: - control.runtime_handles.http_client = None - - async def _open_http_client(self, control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles]) -> None: - control.runtime_handles.http_client = await control.enter_async_resource( - httpx.AsyncClient(timeout=self.timeout, trust_env=False) - ) - -__all__ = ["DifyPluginLayer", "DifyPluginRuntimeHandles"] +__all__ = ["DifyPluginLayer"] diff --git a/dify-agent/src/dify_agent/protocol/__init__.py b/dify-agent/src/dify_agent/protocol/__init__.py index 2feec28be9..82f47d1ded 100644 --- a/dify-agent/src/dify_agent/protocol/__init__.py +++ b/dify-agent/src/dify_agent/protocol/__init__.py @@ -10,15 +10,18 @@ from .schemas import ( LayerExitSignals, PydanticAIStreamRunEvent, RunEvent, + RunComposition, RunEventType, RunEventsResponse, RunFailedEvent, RunFailedEventData, + RunLayerSpec, RunStartedEvent, RunStatus, RunStatusResponse, RunSucceededEvent, RunSucceededEventData, + normalize_composition, utc_now, ) @@ -31,15 +34,18 @@ __all__ = [ "LayerExitSignals", "PydanticAIStreamRunEvent", "RUN_EVENT_ADAPTER", + "RunComposition", "RunEvent", "RunEventType", "RunEventsResponse", "RunFailedEvent", "RunFailedEventData", + "RunLayerSpec", "RunStartedEvent", "RunStatus", "RunStatusResponse", "RunSucceededEvent", "RunSucceededEventData", + "normalize_composition", "utc_now", ] diff --git a/dify-agent/src/dify_agent/protocol/schemas.py b/dify-agent/src/dify_agent/protocol/schemas.py index c92d8a6254..a09c93d404 100644 --- a/dify-agent/src/dify_agent/protocol/schemas.py +++ b/dify-agent/src/dify_agent/protocol/schemas.py @@ -1,17 +1,24 @@ """Public HTTP protocol schemas for the Dify Agent run API. This module is the shared wire contract for the FastAPI server, runtime event -producers, storage adapters, and Python client. The server accepts only -registry-backed Agenton compositor configs, keeping HTTP input data-only and -preventing unsafe import-path construction. Run events are append-only records; -Redis stream ids (or in-memory equivalents in tests) are the public cursors used -by polling and SSE replay. Event envelopes keep the public -``id``/``run_id``/``type``/``data``/``created_at`` shape, while each ``type`` has -a typed ``data`` model so OpenAPI, Redis replay, and clients parse the same -payload contract. Model/provider selection is part of the submitted Agenton -layer graph, not a top-level run field; the runtime reads the model layer named -by ``DIFY_AGENT_MODEL_LAYER_ID``. Request-level layer exit signals decide whether -each layer control is suspended or deleted when the active entry exits, with +producers, storage adapters, and Python client. Create-run requests expose a +Dify-friendly ``composition.layers[].config`` shape so callers can describe one +layer in one place; the server normalizes that public DTO into Agenton's +state-only ``CompositorConfig`` plus node-name keyed per-run configs before +calling ``Compositor.enter(configs=...)``. Session snapshots and ``on_exit`` stay +top-level because they are per-run resume state and exit policy, not graph node +definition. + +The server still constructs layers only from explicit provider type ids, keeping +HTTP input data-only and preventing unsafe import-path construction. Run events +are append-only records; Redis stream ids (or in-memory equivalents in tests) are +the public cursors used by polling and SSE replay. Event envelopes keep the +public ``id``/``run_id``/``type``/``data``/``created_at`` shape, while each +``type`` has a typed ``data`` model so OpenAPI, Redis replay, and clients parse +the same payload contract. Model/provider selection is part of the submitted +composition, not a top-level run field; the runtime reads the model layer named +by ``DIFY_AGENT_MODEL_LAYER_ID``. Request-level ``on_exit`` signals decide +whether each active layer is suspended or deleted when the run exits, with suspend as the default so successful terminal events can include resumable snapshots. Successful runs publish the final JSON-safe agent output and the resumable Agenton session snapshot together on the terminal ``run_succeeded`` @@ -24,7 +31,7 @@ from typing import Annotated, ClassVar, Final, Literal, TypeAlias from pydantic import BaseModel, ConfigDict, Field, JsonValue, TypeAdapter from pydantic_ai.messages import AgentStreamEvent -from agenton.compositor import CompositorConfig, CompositorSessionSnapshot +from agenton.compositor import CompositorConfig, CompositorSessionSnapshot, LayerConfigInput, LayerNodeConfig from agenton.layers import ExitIntent @@ -44,7 +51,7 @@ def utc_now() -> datetime: class LayerExitSignals(BaseModel): - """Requested per-layer lifecycle behavior when a run leaves its active session.""" + """Requested per-layer lifecycle behavior for the top-level ``on_exit`` field.""" default: ExitIntent = ExitIntent.SUSPEND layers: dict[str, ExitIntent] = Field(default_factory=dict) @@ -52,22 +59,85 @@ class LayerExitSignals(BaseModel): model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") +class RunLayerSpec(BaseModel): + """Public graph node plus per-run layer config for one Dify Agent layer. + + ``name``/``type``/``deps``/``metadata`` are normalized into Agenton's + provider-backed graph config. ``config`` is kept separate at the Agenton + boundary and passed to ``Compositor.enter(configs=...)`` keyed by ``name``; + existing layer config DTO instances are preserved so client code can stay + DTO-first without being forced into raw dictionaries. + """ + + name: str + type: str + deps: dict[str, str] = Field(default_factory=dict) + metadata: dict[str, JsonValue] = Field(default_factory=dict) + config: LayerConfigInput = None + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + +class RunComposition(BaseModel): + """Public create-run composition DTO. + + The public shape intentionally differs from Agenton's internal + ``CompositorConfig`` by carrying each layer's per-run config next to its graph + node fields. Use ``normalize_composition`` at server/runtime boundaries before + constructing a ``Compositor``. + """ + + schema_version: int = 1 + layers: list[RunLayerSpec] + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + class CreateRunRequest(BaseModel): """Request body for creating one async agent run. - Model/provider configuration must be supplied through the compositor layer - named by ``DIFY_AGENT_MODEL_LAYER_ID``. ``layer_exit_signals`` defaults every + Model/provider configuration must be supplied through the composition layer + named by ``DIFY_AGENT_MODEL_LAYER_ID``. ``on_exit`` defaults every active layer to suspend so callers receive a resumable success snapshot unless they explicitly request delete for one or more layers. """ - compositor: CompositorConfig + composition: RunComposition session_snapshot: CompositorSessionSnapshot | None = None - layer_exit_signals: LayerExitSignals = Field(default_factory=LayerExitSignals) + on_exit: LayerExitSignals = Field(default_factory=LayerExitSignals) model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") +def normalize_composition(composition: RunComposition) -> tuple[CompositorConfig, dict[str, LayerConfigInput]]: + """Split public Dify composition into Agenton's graph config and layer configs. + + Returns: + A ``CompositorConfig`` containing only graph fields and a node-name keyed + config mapping suitable for ``Compositor.enter(configs=...)``. + + The helper is the stable public-to-Agenton boundary: it preserves concrete + ``LayerConfig`` DTO inputs where possible, does not accept legacy + ``LayerNodeConfig(config=...)`` payloads, and keeps session snapshots plus + exit signals out of graph normalization. + """ + + graph_config = CompositorConfig( + schema_version=composition.schema_version, + layers=[ + LayerNodeConfig( + name=layer.name, + type=layer.type, + deps=dict(layer.deps), + metadata=dict(layer.metadata), + ) + for layer in composition.layers + ], + ) + layer_configs = {layer.name: layer.config for layer in composition.layers} + return graph_config, layer_configs + + class CreateRunResponse(BaseModel): """Response returned after a run has been persisted and scheduled locally.""" @@ -152,10 +222,7 @@ class RunFailedEvent(BaseRunEvent): RunEvent: TypeAlias = Annotated[ - RunStartedEvent - | PydanticAIStreamRunEvent - | RunSucceededEvent - | RunFailedEvent, + RunStartedEvent | PydanticAIStreamRunEvent | RunSucceededEvent | RunFailedEvent, Field(discriminator="type"), ] RUN_EVENT_ADAPTER: TypeAdapter[RunEvent] = TypeAdapter(RunEvent) @@ -180,6 +247,7 @@ __all__ = [ "LayerExitSignals", "PydanticAIStreamRunEvent", "RUN_EVENT_ADAPTER", + "RunComposition", "RunEvent", "RunEventType", "RunEventsResponse", @@ -190,5 +258,7 @@ __all__ = [ "RunStatusResponse", "RunSucceededEvent", "RunSucceededEventData", + "RunLayerSpec", + "normalize_composition", "utc_now", ] diff --git a/dify-agent/src/dify_agent/runtime/agent_factory.py b/dify-agent/src/dify_agent/runtime/agent_factory.py index d4274ec577..88589ad507 100644 --- a/dify-agent/src/dify_agent/runtime/agent_factory.py +++ b/dify-agent/src/dify_agent/runtime/agent_factory.py @@ -2,11 +2,14 @@ The run request carries model/provider selection in the layer graph. This helper keeps Agent construction details out of ``AgentRunRunner`` while accepting an -already resolved Pydantic AI model from the configured model layer. +already resolved Pydantic AI model from the configured model layer. Prompt and +tool values arriving here are already transformed by Agenton's +``PYDANTIC_AI_TRANSFORMERS`` preset; this module registers those pydantic-ai +objects without reimplementing plain/pydantic-ai conversion logic. """ from collections.abc import Sequence -from typing import Any, Callable, cast +from typing import Any, cast from pydantic_ai import Agent from pydantic_ai.messages import UserContent @@ -22,28 +25,17 @@ def create_agent( tools: Sequence[PydanticAITool[object]], ) -> Agent[None, object]: """Create the pydantic-ai agent for one run.""" - return cast( + agent = cast( Agent[None, object], Agent[None, str]( model, output_type=str, - system_prompt=materialize_static_system_prompts(system_prompts), tools=tools, ), ) - - -def materialize_static_system_prompts(system_prompts: Sequence[PydanticAIPrompt[object]]) -> list[str]: - """Convert MVP static prompt callables into strings for pydantic-ai.""" - result: list[str] = [] for prompt in system_prompts: - if isinstance(prompt, str): - result.append(prompt) - elif callable(prompt): - result.append(cast(Callable[[], str], prompt)()) - else: - raise TypeError(f"Unsupported system prompt type: {type(prompt).__qualname__}") - return result + _ = agent.system_prompt(cast(Any, prompt)) + return agent def normalize_user_input(user_prompts: Sequence[UserContent]) -> str | Sequence[UserContent]: @@ -53,4 +45,4 @@ def normalize_user_input(user_prompts: Sequence[UserContent]) -> str | Sequence[ return list(user_prompts) -__all__ = ["create_agent", "materialize_static_system_prompts", "normalize_user_input"] +__all__ = ["create_agent", "normalize_user_input"] diff --git a/dify-agent/src/dify_agent/runtime/agenton_validation.py b/dify-agent/src/dify_agent/runtime/agenton_validation.py new file mode 100644 index 0000000000..3522e16b08 --- /dev/null +++ b/dify-agent/src/dify_agent/runtime/agenton_validation.py @@ -0,0 +1,28 @@ +"""Shared validation helpers for Agenton-backed request boundaries. + +Most bad Dify Agent inputs surface from Agenton as ``KeyError``, ``TypeError``, +or ``ValueError`` while graph config, per-run layer config, and session snapshot +DTOs are being validated. One smaller class of request-shaped failures appears a +bit later, during ``Compositor.enter(...)`` before the body of the entered run +executes: session snapshots may contain lifecycle states such as ``CLOSED`` that +are serializable but not enterable. Agenton reports those as ``RuntimeError``. + +Dify Agent intentionally translates only these known enter-time runtime errors +into public request-validation errors. Other runtime failures still represent +execution bugs or infrastructure problems and must not be downgraded to client +input errors. +""" + +_ENTER_VALIDATION_RUNTIME_ERROR_FRAGMENTS = ( + "ACTIVE snapshots are not allowed.", + "CLOSED snapshots cannot be entered.", +) + + +def is_agenton_enter_validation_runtime_error(exc: RuntimeError) -> bool: + """Return whether ``exc`` is a known Agenton enter-time input failure.""" + message = str(exc) + return any(fragment in message for fragment in _ENTER_VALIDATION_RUNTIME_ERROR_FRAGMENTS) + + +__all__ = ["is_agenton_enter_validation_runtime_error"] diff --git a/dify-agent/src/dify_agent/runtime/compositor_factory.py b/dify-agent/src/dify_agent/runtime/compositor_factory.py index a51e3bb610..d4d1d2bb4f 100644 --- a/dify-agent/src/dify_agent/runtime/compositor_factory.py +++ b/dify-agent/src/dify_agent/runtime/compositor_factory.py @@ -1,17 +1,20 @@ """Safe Agenton compositor construction for API-submitted configs. -Only explicitly registered layer types are constructible here. The default -registry contains prompt layers plus Dify plugin LLM layers. Public DTOs provide -tenant/plugin/model data, while server-only plugin daemon settings are injected -through the registry factory for ``DifyPluginLayer``. +Only explicitly allowed provider type ids are constructible here. The default +provider set contains prompt layers plus Dify plugin LLM layers. Public DTOs +provide tenant/plugin/model data, while server-only plugin daemon settings are +injected through the provider factory for ``DifyPluginLayer``. The resulting +``Compositor`` remains Agenton state-only: live resources such as the plugin +daemon HTTP client are supplied later by the runtime and never enter providers, +layers, or session snapshots. """ -from typing import cast +from collections.abc import Mapping, Sequence +from typing import Any, cast -import httpx from pydantic_ai.messages import UserContent -from agenton.compositor import Compositor, CompositorConfig, LayerRegistry +from agenton.compositor import Compositor, CompositorConfig, LayerProvider, LayerProviderInput from agenton.layers.types import AllPromptTypes, AllToolTypes, AllUserPromptTypes, PydanticAIPrompt, PydanticAITool from agenton_collections.layers.plain.basic import PromptLayer from agenton_collections.transformers.pydantic_ai import PYDANTIC_AI_TRANSFORMERS @@ -20,32 +23,34 @@ from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer from dify_agent.layers.dify_plugin.plugin_layer import DifyPluginLayer -def create_default_layer_registry( +type DifyAgentLayerProvider = LayerProvider[Any] + + +def create_default_layer_providers( *, plugin_daemon_url: str = "http://localhost:5002", plugin_daemon_api_key: str = "", - plugin_daemon_timeout: float | httpx.Timeout | None = 600.0, -) -> LayerRegistry: - """Return the server registry of safe config-constructible layers.""" - registry = LayerRegistry() - registry.register_layer(PromptLayer) - registry.register_layer( - DifyPluginLayer, - factory=lambda config: DifyPluginLayer.from_config_with_settings( - DifyPluginLayerConfig.model_validate(config), - daemon_url=plugin_daemon_url, - daemon_api_key=plugin_daemon_api_key, - timeout=plugin_daemon_timeout, +) -> tuple[DifyAgentLayerProvider, ...]: + """Return the server provider set of safe config-constructible layers.""" + return ( + LayerProvider.from_layer_type(PromptLayer), + LayerProvider.from_factory( + layer_type=DifyPluginLayer, + create=lambda config: DifyPluginLayer.from_config_with_settings( + DifyPluginLayerConfig.model_validate(config), + daemon_url=plugin_daemon_url, + daemon_api_key=plugin_daemon_api_key, + ), ), + LayerProvider.from_layer_type(DifyPluginLLMLayer), ) - registry.register_layer(DifyPluginLLMLayer) - return registry def build_pydantic_ai_compositor( config: CompositorConfig, *, - registry: LayerRegistry | None = None, + providers: Sequence[LayerProviderInput], + node_providers: Mapping[str, LayerProviderInput] | None = None, ) -> Compositor[ PydanticAIPrompt[object], PydanticAITool[object], @@ -54,7 +59,14 @@ def build_pydantic_ai_compositor( UserContent, AllUserPromptTypes, ]: - """Build a Pydantic AI-ready compositor from a validated config.""" + """Build a Pydantic AI-ready compositor from a validated graph config. + + Prompt, user prompt, and tool conversion is delegated to Agenton's shared + pydantic-ai transformer preset so Dify Agent does not duplicate conversion + logic for plain and pydantic-ai layer families. Callers must pass the already + selected provider set explicitly so provider defaulting stays at outer runtime + boundaries rather than being duplicated here. + """ return cast( Compositor[ PydanticAIPrompt[object], @@ -66,10 +78,11 @@ def build_pydantic_ai_compositor( ], Compositor.from_config( config, - registry=registry or create_default_layer_registry(), + providers=providers, + node_providers=node_providers, **PYDANTIC_AI_TRANSFORMERS, # pyright: ignore[reportArgumentType] ), ) -__all__ = ["build_pydantic_ai_compositor", "create_default_layer_registry"] +__all__ = ["DifyAgentLayerProvider", "build_pydantic_ai_compositor", "create_default_layer_providers"] diff --git a/dify-agent/src/dify_agent/runtime/layer_exit_signals.py b/dify-agent/src/dify_agent/runtime/layer_exit_signals.py index 7d49558f5f..3636b89836 100644 --- a/dify-agent/src/dify_agent/runtime/layer_exit_signals.py +++ b/dify-agent/src/dify_agent/runtime/layer_exit_signals.py @@ -1,14 +1,15 @@ """Validation and application of request-level Agenton layer exit signals. -HTTP requests carry data-only lifecycle intent in ``LayerExitSignals``. The -runtime validates the signal keys against the built compositor before a run is -persisted or entered, then applies the resolved intent after entry because -``Layer.lifecycle_enter`` resets controls to delete on each successful enter. +HTTP requests carry data-only lifecycle intent in the top-level ``on_exit`` +field. The runtime validates signal keys against the built compositor before a +run is persisted or entered, then applies the resolved intent to the active +``CompositorRun`` after entry because Agenton initializes each run slot with a +delete-on-exit intent. """ from typing import Any -from agenton.compositor import Compositor, CompositorSession +from agenton.compositor import Compositor, CompositorRun from agenton.layers import ExitIntent from dify_agent.protocol.schemas import LayerExitSignals @@ -18,22 +19,26 @@ def validate_layer_exit_signals( signals: LayerExitSignals, ) -> None: """Raise ``ValueError`` when ``signals`` mention layers absent from ``compositor``.""" - unknown_layer_ids = set(signals.layers) - set(compositor.layers) + known_layer_ids = {node.name for node in compositor.nodes} + unknown_layer_ids = set(signals.layers) - known_layer_ids if not unknown_layer_ids: return names = ", ".join(sorted(unknown_layer_ids)) - raise ValueError(f"layer_exit_signals.layers references unknown layer ids: {names}.") + raise ValueError(f"on_exit.layers references unknown layer ids: {names}.") -def apply_layer_exit_signals(session: CompositorSession, signals: LayerExitSignals) -> None: - """Apply ``signals`` to active controls for the current compositor entry.""" - for layer_id, control in session.layer_controls.items(): +def apply_layer_exit_signals( + run: CompositorRun[Any, Any, Any, Any, Any, Any], + signals: LayerExitSignals, +) -> None: + """Apply ``signals`` to active run slots for the current compositor entry.""" + for layer_id in run.slots: intent = signals.layers.get(layer_id, signals.default) if intent is ExitIntent.SUSPEND: - control.suspend_on_exit() + run.suspend_layer_on_exit(layer_id) elif intent is ExitIntent.DELETE: - control.delete_on_exit() + run.delete_layer_on_exit(layer_id) else: raise ValueError(f"Unsupported layer exit intent: {intent!r}.") diff --git a/dify-agent/src/dify_agent/runtime/run_scheduler.py b/dify-agent/src/dify_agent/runtime/run_scheduler.py index 179ef39e53..0211eabbb1 100644 --- a/dify-agent/src/dify_agent/runtime/run_scheduler.py +++ b/dify-agent/src/dify_agent/runtime/run_scheduler.py @@ -5,6 +5,11 @@ The scheduler is intentionally process-local: it persists a run record, starts a task registry. Redis remains the durable source for status and event streams, but there is no Redis job queue or cross-process handoff. If the process crashes, currently active runs are lost until an external operator marks or retries them. +Create-run validation enters a lightweight Agenton run before persistence so the +same transformed user prompts and top-level ``on_exit`` policy used by execution +are checked without relying on removed session/control APIs; Dify's default +layers keep lifecycle hooks side-effect free so this validation does not open +plugin daemon clients. """ import asyncio @@ -12,11 +17,14 @@ import logging from collections.abc import Callable from typing import Protocol -from agenton.compositor import LayerRegistry -from dify_agent.protocol.schemas import CreateRunRequest -from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor, create_default_layer_registry +import httpx + +from agenton.compositor import LayerProviderInput +from dify_agent.protocol.schemas import CreateRunRequest, normalize_composition +from dify_agent.runtime.agenton_validation import is_agenton_enter_validation_runtime_error +from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor, create_default_layer_providers from dify_agent.runtime.event_sink import RunEventSink, emit_run_failed -from dify_agent.runtime.layer_exit_signals import validate_layer_exit_signals +from dify_agent.runtime.layer_exit_signals import apply_layer_exit_signals, validate_layer_exit_signals from dify_agent.runtime.runner import AgentRunRunner from dify_agent.runtime.user_prompt_validation import EMPTY_USER_PROMPTS_ERROR, has_non_blank_user_prompt from dify_agent.server.schemas import RunRecord @@ -28,6 +36,10 @@ class SchedulerStoppingError(RuntimeError): """Raised when a create-run request arrives after shutdown has started.""" +class RunRequestValidationError(ValueError): + """Raised when a create-run request cannot produce an executable Agenton run.""" + + class RunStore(RunEventSink, Protocol): """Persistence boundary needed by the scheduler.""" @@ -53,9 +65,9 @@ class RunScheduler: ``active_tasks`` is mutated only on the event loop that calls ``create_run`` and ``shutdown``. The task registry is not durable; it exists so the lifespan hook can wait for in-flight work and mark cancelled runs failed before Redis is - closed. A lock guards the stopping flag, run persistence, and task - registration so shutdown cannot complete while a run is between record - creation and active-task tracking. + closed. A lock guards the stopping flag, lightweight request validation, run + persistence, and task registration so shutdown cannot begin after a request is + admitted and no validation runs once stopping has been set. """ store: RunStore @@ -63,22 +75,25 @@ class RunScheduler: active_tasks: dict[str, asyncio.Task[None]] stopping: bool runner_factory: RunRunnerFactory - layer_registry: LayerRegistry + layer_providers: tuple[LayerProviderInput, ...] + plugin_daemon_http_client: httpx.AsyncClient _lifecycle_lock: asyncio.Lock def __init__( self, *, store: RunStore, + plugin_daemon_http_client: httpx.AsyncClient, shutdown_grace_seconds: float = 30, - layer_registry: LayerRegistry | None = None, + layer_providers: tuple[LayerProviderInput, ...] | None = None, runner_factory: RunRunnerFactory | None = None, ) -> None: self.store = store self.shutdown_grace_seconds = shutdown_grace_seconds self.active_tasks = {} self.stopping = False - self.layer_registry = layer_registry or create_default_layer_registry() + self.plugin_daemon_http_client = plugin_daemon_http_client + self.layer_providers = layer_providers if layer_providers is not None else create_default_layer_providers() self.runner_factory = runner_factory or self._default_runner_factory self._lifecycle_lock = asyncio.Lock() @@ -88,14 +103,10 @@ class RunScheduler: The returned record is already ``running``. The background task is removed from ``active_tasks`` when it finishes, regardless of success or failure. """ - compositor = build_pydantic_ai_compositor(request.compositor, registry=self.layer_registry) - validate_layer_exit_signals(compositor, request.layer_exit_signals) - if not has_non_blank_user_prompt(compositor.user_prompts): - raise ValueError(EMPTY_USER_PROMPTS_ERROR) - async with self._lifecycle_lock: if self.stopping: raise SchedulerStoppingError("run scheduler is shutting down") + await validate_run_request(request, layer_providers=self.layer_providers) record = await self.store.create_run() task = asyncio.create_task(self._run_record(record, request), name=f"dify-agent-run-{record.run_id}") self.active_tasks[record.run_id] = task @@ -136,7 +147,8 @@ class RunScheduler: sink=self.store, request=request, run_id=record.run_id, - layer_registry=self.layer_registry, + plugin_daemon_http_client=self.plugin_daemon_http_client, + layer_providers=self.layer_providers, ) async def _mark_cancelled_run_failed(self, run_id: str) -> None: @@ -149,4 +161,42 @@ class RunScheduler: logger.exception("failed to mark cancelled run failed", extra={"run_id": run_id}) -__all__ = ["RunScheduler", "SchedulerStoppingError"] +async def validate_run_request( + request: CreateRunRequest, + *, + layer_providers: tuple[LayerProviderInput, ...] | None = None, +) -> None: + """Validate create-run semantics that require an entered Agenton run. + + This boundary rejects unknown ``on_exit`` layer ids, effectively empty + transformed user prompts, and known enter-time snapshot lifecycle errors + before the scheduler persists a run record. It also exercises provider config + validation and snapshot hydration without touching external services because + Dify plugin daemon clients are owned by the FastAPI lifespan, not Agenton + lifecycle hooks. + """ + resolved_layer_providers = layer_providers if layer_providers is not None else create_default_layer_providers() + entered_run = False + try: + graph_config, layer_configs = normalize_composition(request.composition) + compositor = build_pydantic_ai_compositor( + graph_config, + providers=resolved_layer_providers, + ) + validate_layer_exit_signals(compositor, request.on_exit) + async with compositor.enter(configs=layer_configs, session_snapshot=request.session_snapshot) as run: + entered_run = True + apply_layer_exit_signals(run, request.on_exit) + if not has_non_blank_user_prompt(run.user_prompts): + raise RunRequestValidationError(EMPTY_USER_PROMPTS_ERROR) + except RunRequestValidationError: + raise + except RuntimeError as exc: + if not entered_run and is_agenton_enter_validation_runtime_error(exc): + raise RunRequestValidationError(str(exc)) from exc + raise + except (KeyError, TypeError, ValueError) as exc: + raise RunRequestValidationError(str(exc)) from exc + + +__all__ = ["RunRequestValidationError", "RunScheduler", "SchedulerStoppingError", "validate_run_request"] diff --git a/dify-agent/src/dify_agent/runtime/runner.py b/dify-agent/src/dify_agent/runtime/runner.py index f8c1b36ced..a310d02338 100644 --- a/dify-agent/src/dify_agent/runtime/runner.py +++ b/dify-agent/src/dify_agent/runtime/runner.py @@ -1,26 +1,29 @@ """Runtime execution for one scheduled Dify Agent run. -The runner is storage-agnostic: it builds an Agenton compositor, enters or -resumes its session, runs pydantic-ai with ``compositor.user_prompts`` as the user -input, emits stream events, applies request-level layer exit signals, snapshots -the resulting session, and then publishes a terminal success or failure event. -The Pydantic AI model is resolved from the active Agenton layer named by -``DIFY_AGENT_MODEL_LAYER_ID``. Successful terminal events contain both the -JSON-safe final output and session snapshot; there are no separate output or -snapshot events to correlate. +The runner is storage-agnostic: it normalizes the public Dify composition into +Agenton's graph/config split, enters a fresh ``CompositorRun`` (or resumes one +from a snapshot), runs pydantic-ai with ``run.user_prompts`` as the user input, +emits stream events, applies request-level ``on_exit`` signals, and then +publishes a terminal success or failure event. The Pydantic AI model is resolved +from the active Agenton layer named by ``DIFY_AGENT_MODEL_LAYER_ID`` and receives +the FastAPI lifespan-owned plugin daemon HTTP client; no run or layer owns that +client. Successful terminal events contain both the JSON-safe final output and +session snapshot; there are no separate output or snapshot events to correlate. """ from collections.abc import AsyncIterable from typing import cast +import httpx from pydantic import JsonValue, TypeAdapter from pydantic_ai.messages import AgentStreamEvent -from agenton.compositor import CompositorSessionSnapshot, LayerRegistry +from agenton.compositor import CompositorSessionSnapshot, LayerProviderInput from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer -from dify_agent.protocol.schemas import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest +from dify_agent.protocol.schemas import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest, normalize_composition from dify_agent.runtime.agent_factory import create_agent, normalize_user_input -from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor, create_default_layer_registry +from dify_agent.runtime.agenton_validation import is_agenton_enter_validation_runtime_error +from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor, create_default_layer_providers from dify_agent.runtime.event_sink import ( RunEventSink, emit_pydantic_ai_event, @@ -46,7 +49,8 @@ class AgentRunRunner: request: CreateRunRequest run_id: str - layer_registry: LayerRegistry + layer_providers: tuple[LayerProviderInput, ...] + plugin_daemon_http_client: httpx.AsyncClient def __init__( self, @@ -54,12 +58,14 @@ class AgentRunRunner: sink: RunEventSink, request: CreateRunRequest, run_id: str, - layer_registry: LayerRegistry | None = None, + plugin_daemon_http_client: httpx.AsyncClient, + layer_providers: tuple[LayerProviderInput, ...] | None = None, ) -> None: self.sink = sink self.request = request self.run_id = run_id - self.layer_registry = layer_registry or create_default_layer_registry() + self.plugin_daemon_http_client = plugin_daemon_http_client + self.layer_providers = layer_providers if layer_providers is not None else create_default_layer_providers() async def run(self) -> None: """Execute the run and emit the documented event sequence.""" @@ -83,38 +89,52 @@ class AgentRunRunner: await self.sink.update_status(self.run_id, "succeeded") async def _run_agent(self) -> tuple[JsonValue, CompositorSessionSnapshot]: - """Run pydantic-ai inside an entered Agenton session.""" - compositor = build_pydantic_ai_compositor(self.request.compositor, registry=self.layer_registry) + """Run pydantic-ai inside an entered Agenton run. + + Known input-shaped Agenton enter-time runtime errors, such as trying to + resume a ``CLOSED`` snapshot layer, are normalized to + ``AgentRunValidationError``. Later runtime failures still propagate as + execution errors so they become terminal failed runs rather than client + validation responses. + """ try: - validate_layer_exit_signals(compositor, self.request.layer_exit_signals) - except ValueError as exc: + graph_config, layer_configs = normalize_composition(self.request.composition) + compositor = build_pydantic_ai_compositor(graph_config, providers=self.layer_providers) + validate_layer_exit_signals(compositor, self.request.on_exit) + except (KeyError, TypeError, ValueError) as exc: raise AgentRunValidationError(str(exc)) from exc - session = ( - compositor.session_from_snapshot(self.request.session_snapshot) - if self.request.session_snapshot is not None - else compositor.new_session() - ) - async with compositor.enter(session) as active_session: - apply_layer_exit_signals(active_session, self.request.layer_exit_signals) - user_prompts = compositor.user_prompts - if not has_non_blank_user_prompt(user_prompts): - raise AgentRunValidationError(EMPTY_USER_PROMPTS_ERROR) - async def handle_events(_ctx: object, events: AsyncIterable[AgentStreamEvent]) -> None: - async for event in events: - _ = await emit_pydantic_ai_event(self.sink, run_id=self.run_id, data=event) + entered_run = False + try: + async with compositor.enter(configs=layer_configs, session_snapshot=self.request.session_snapshot) as run: + entered_run = True + apply_layer_exit_signals(run, self.request.on_exit) + user_prompts = run.user_prompts + if not has_non_blank_user_prompt(user_prompts): + raise AgentRunValidationError(EMPTY_USER_PROMPTS_ERROR) - try: - llm_layer = compositor.get_layer(DIFY_AGENT_MODEL_LAYER_ID, DifyPluginLLMLayer) - llm_control = active_session.layer(DIFY_AGENT_MODEL_LAYER_ID) - model = llm_layer.get_model(llm_control) - except (KeyError, TypeError, RuntimeError) as exc: + async def handle_events(_ctx: object, events: AsyncIterable[AgentStreamEvent]) -> None: + async for event in events: + _ = await emit_pydantic_ai_event(self.sink, run_id=self.run_id, data=event) + + try: + llm_layer = run.get_layer(DIFY_AGENT_MODEL_LAYER_ID, DifyPluginLLMLayer) + model = llm_layer.get_model(http_client=self.plugin_daemon_http_client) + except (KeyError, TypeError, RuntimeError) as exc: + raise AgentRunValidationError(str(exc)) from exc + + agent = create_agent(model, system_prompts=run.prompts, tools=run.tools) + result = await agent.run(normalize_user_input(user_prompts), event_stream_handler=handle_events) + output = _serialize_agent_output(result.output) + except RuntimeError as exc: + if not entered_run and is_agenton_enter_validation_runtime_error(exc): raise AgentRunValidationError(str(exc)) from exc + raise - agent = create_agent(model, system_prompts=compositor.prompts, tools=compositor.tools) - result = await agent.run(normalize_user_input(user_prompts), event_stream_handler=handle_events) + if run.session_snapshot is None: + raise RuntimeError("Agenton run did not produce a session snapshot after exit.") - return _serialize_agent_output(result.output), compositor.snapshot_session(session) + return output, run.session_snapshot def _serialize_agent_output(output: object) -> JsonValue: diff --git a/dify-agent/src/dify_agent/runtime/user_prompt_validation.py b/dify-agent/src/dify_agent/runtime/user_prompt_validation.py index c3cdd04cf4..8e8602c864 100644 --- a/dify-agent/src/dify_agent/runtime/user_prompt_validation.py +++ b/dify-agent/src/dify_agent/runtime/user_prompt_validation.py @@ -1,10 +1,10 @@ -"""Validation for effective user prompts produced by Agenton compositors. +"""Validation for effective user prompts produced by Agenton runs. -Validation happens after safe compositor construction so scheduler and runner -paths use the same semantics as the actual pydantic-ai input. Blank string fragments do not -count as meaningful input; non-string ``UserContent`` is treated as intentional -content because rich media/message parts do not have a universal whitespace -representation. +Validation happens after safe compositor construction and run entry so scheduler +and runner paths use the same transformed prompts as the actual pydantic-ai +input. Blank string fragments do not count as meaningful input; non-string +``UserContent`` is treated as intentional content because rich media/message +parts do not have a universal whitespace representation. """ from collections.abc import Sequence @@ -12,7 +12,7 @@ from collections.abc import Sequence from pydantic_ai.messages import UserContent -EMPTY_USER_PROMPTS_ERROR = "compositor.user_prompts must not be empty" +EMPTY_USER_PROMPTS_ERROR = "run.user_prompts must not be empty" def has_non_blank_user_prompt(user_prompts: Sequence[UserContent]) -> bool: diff --git a/dify-agent/src/dify_agent/server/app.py b/dify-agent/src/dify_agent/server/app.py index 75719b7a8d..9b5cf2a9c4 100644 --- a/dify-agent/src/dify_agent/server/app.py +++ b/dify-agent/src/dify_agent/server/app.py @@ -1,20 +1,22 @@ """FastAPI application factory for the Dify Agent run server. -The HTTP process owns Redis clients, route wiring, and a process-local scheduler. -Run execution happens in background ``asyncio`` tasks rather than request -handlers, so client disconnects do not cancel the agent runtime. Redis persists -run records and per-run event streams with configured retention only; it is not -used as a job queue. +The HTTP process owns Redis clients, one shared plugin daemon ``httpx.AsyncClient``, +route wiring, and a process-local scheduler. Run execution happens in background +``asyncio`` tasks rather than request handlers, so client disconnects do not +cancel the agent runtime. Redis persists run records and per-run event streams +with configured retention only; it is not used as a job queue. Agenton layers and +providers stay state-only: they borrow the lifespan-owned plugin daemon client +through the runner and never create or close it themselves. """ from collections.abc import AsyncGenerator from contextlib import asynccontextmanager +import httpx from fastapi import FastAPI from redis.asyncio import Redis -from agenton.compositor import LayerRegistry -from dify_agent.runtime.compositor_factory import create_default_layer_registry +from dify_agent.runtime.compositor_factory import create_default_layer_providers from dify_agent.runtime.run_scheduler import RunScheduler from dify_agent.server.routes.runs import create_runs_router from dify_agent.server.settings import ServerSettings @@ -24,16 +26,16 @@ from dify_agent.storage.redis_run_store import RedisRunStore def create_app(settings: ServerSettings | None = None) -> FastAPI: """Build the FastAPI app with one shared Redis store and local scheduler.""" resolved_settings = settings or ServerSettings() - layer_registry = create_default_layer_registry( + layer_providers = create_default_layer_providers( plugin_daemon_url=resolved_settings.plugin_daemon_url, plugin_daemon_api_key=resolved_settings.plugin_daemon_api_key, - plugin_daemon_timeout=resolved_settings.plugin_daemon_timeout, ) - state: dict[str, RedisRunStore | RunScheduler | LayerRegistry] = {"layer_registry": layer_registry} + state: dict[str, object] = {} @asynccontextmanager async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: redis = Redis.from_url(resolved_settings.redis_url) + plugin_daemon_http_client = create_plugin_daemon_http_client(resolved_settings) store = RedisRunStore( redis, prefix=resolved_settings.redis_prefix, @@ -41,8 +43,9 @@ def create_app(settings: ServerSettings | None = None) -> FastAPI: ) scheduler = RunScheduler( store=store, + plugin_daemon_http_client=plugin_daemon_http_client, shutdown_grace_seconds=resolved_settings.shutdown_grace_seconds, - layer_registry=layer_registry, + layer_providers=layer_providers, ) state["store"] = store state["scheduler"] = scheduler @@ -50,6 +53,7 @@ def create_app(settings: ServerSettings | None = None) -> FastAPI: yield finally: await scheduler.shutdown() + await plugin_daemon_http_client.aclose() await redis.aclose() app = FastAPI(title="Dify Agent Run Server", version="0.1.0", lifespan=lifespan) @@ -60,14 +64,34 @@ def create_app(settings: ServerSettings | None = None) -> FastAPI: def get_scheduler() -> RunScheduler: return state["scheduler"] # pyright: ignore[reportReturnType] - def get_layer_registry() -> LayerRegistry: - return state["layer_registry"] # pyright: ignore[reportReturnType] - - app.include_router(create_runs_router(get_store, get_scheduler, get_layer_registry)) + app.include_router(create_runs_router(get_store, get_scheduler)) return app +def create_plugin_daemon_http_client(settings: ServerSettings) -> httpx.AsyncClient: + """Create the lifespan-owned plugin daemon HTTP client with configured limits. + + The returned client is shared by all local background runs in this FastAPI + process and must be closed by the app lifespan after the scheduler has stopped + using it. + """ + return httpx.AsyncClient( + timeout=httpx.Timeout( + connect=settings.plugin_daemon_connect_timeout, + read=settings.plugin_daemon_read_timeout, + write=settings.plugin_daemon_write_timeout, + pool=settings.plugin_daemon_pool_timeout, + ), + limits=httpx.Limits( + max_connections=settings.plugin_daemon_max_connections, + max_keepalive_connections=settings.plugin_daemon_max_keepalive_connections, + keepalive_expiry=settings.plugin_daemon_keepalive_expiry, + ), + trust_env=False, + ) + + app = create_app() -__all__ = ["app", "create_app"] +__all__ = ["app", "create_app", "create_plugin_daemon_http_client"] diff --git a/dify-agent/src/dify_agent/server/routes/runs.py b/dify-agent/src/dify_agent/server/routes/runs.py index 62cd3830e6..9375b1f5b7 100644 --- a/dify-agent/src/dify_agent/server/routes/runs.py +++ b/dify-agent/src/dify_agent/server/routes/runs.py @@ -13,12 +13,8 @@ from typing import Annotated from fastapi import APIRouter, Depends, Header, HTTPException, Query from fastapi.responses import StreamingResponse -from agenton.compositor import LayerRegistry from dify_agent.protocol.schemas import CreateRunRequest, CreateRunResponse, RunEventsResponse, RunStatusResponse -from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor, create_default_layer_registry -from dify_agent.runtime.layer_exit_signals import validate_layer_exit_signals -from dify_agent.runtime.run_scheduler import RunScheduler, SchedulerStoppingError -from dify_agent.runtime.user_prompt_validation import EMPTY_USER_PROMPTS_ERROR, has_non_blank_user_prompt +from dify_agent.runtime.run_scheduler import RunRequestValidationError, RunScheduler, SchedulerStoppingError from dify_agent.server.sse import sse_event_stream from dify_agent.storage.redis_run_store import RedisRunStore, RunNotFoundError @@ -26,11 +22,9 @@ from dify_agent.storage.redis_run_store import RedisRunStore, RunNotFoundError def create_runs_router( get_store: Callable[[], RedisRunStore], get_scheduler: Callable[[], RunScheduler], - get_layer_registry: Callable[[], LayerRegistry] | None = None, ) -> APIRouter: """Create routes bound to the application's store dependency provider.""" router = APIRouter(prefix="/runs", tags=["runs"]) - resolved_get_layer_registry = get_layer_registry or create_default_layer_registry async def store_dep() -> RedisRunStore: return get_store() @@ -43,19 +37,10 @@ def create_runs_router( request: CreateRunRequest, scheduler: Annotated[RunScheduler, Depends(scheduler_dep)], ) -> CreateRunResponse: - try: - compositor = build_pydantic_ai_compositor( - request.compositor, - registry=resolved_get_layer_registry(), - ) - validate_layer_exit_signals(compositor, request.layer_exit_signals) - except Exception as exc: - raise HTTPException(status_code=422, detail=str(exc)) from exc - if not has_non_blank_user_prompt(compositor.user_prompts): - raise HTTPException(status_code=422, detail=EMPTY_USER_PROMPTS_ERROR) - try: record = await scheduler.create_run(request) + except RunRequestValidationError as exc: + raise HTTPException(status_code=422, detail=str(exc)) from exc except SchedulerStoppingError as exc: raise HTTPException(status_code=503, detail="run scheduler is shutting down") from exc return CreateRunResponse(run_id=record.run_id, status=record.status) diff --git a/dify-agent/src/dify_agent/server/settings.py b/dify-agent/src/dify_agent/server/settings.py index 9186540b7d..18c0ce7747 100644 --- a/dify-agent/src/dify_agent/server/settings.py +++ b/dify-agent/src/dify_agent/server/settings.py @@ -1,4 +1,10 @@ -"""Configuration for the FastAPI run server.""" +"""Configuration for the FastAPI run server. + +Plugin daemon HTTP client settings describe the single FastAPI lifespan-owned +``httpx.AsyncClient`` shared by local run tasks. Layers and Agenton providers do +not own that client, so these settings are process resource limits rather than +per-run lifecycle knobs. +""" from typing import ClassVar @@ -17,7 +23,13 @@ class ServerSettings(BaseSettings): run_retention_seconds: int = Field(default=DEFAULT_RUN_RETENTION_SECONDS, ge=1) plugin_daemon_url: str = "http://localhost:5002" plugin_daemon_api_key: str = "" - plugin_daemon_timeout: float | None = 600.0 + plugin_daemon_connect_timeout: float = Field(default=10.0, ge=0) + plugin_daemon_read_timeout: float = Field(default=600.0, ge=0) + plugin_daemon_write_timeout: float = Field(default=30.0, ge=0) + plugin_daemon_pool_timeout: float = Field(default=10.0, ge=0) + plugin_daemon_max_connections: int = Field(default=100, ge=1) + plugin_daemon_max_keepalive_connections: int = Field(default=20, ge=0) + plugin_daemon_keepalive_expiry: float = Field(default=30.0, ge=0) model_config: ClassVar[SettingsConfigDict] = SettingsConfigDict( env_prefix="DIFY_AGENT_", diff --git a/dify-agent/tests/local/dify_agent/client/test_client.py b/dify-agent/tests/local/dify_agent/client/test_client.py index 64856ef59c..81ae96a700 100644 --- a/dify-agent/tests/local/dify_agent/client/test_client.py +++ b/dify-agent/tests/local/dify_agent/client/test_client.py @@ -31,7 +31,7 @@ from dify_agent.protocol.schemas import ( def _create_run_payload() -> dict[str, object]: return { - "compositor": { + "composition": { "schema_version": 1, "layers": [{"name": "prompt", "type": "plain.prompt", "config": {"user": "hello"}}], } @@ -76,9 +76,10 @@ def test_sync_methods_parse_protocol_dtos_and_send_create_request_dto() -> None: def handler(request: httpx.Request) -> httpx.Response: if request.method == "POST" and request.url.path == "/runs": payload = cast(dict[str, object], json.loads(request.content)) - compositor = cast(dict[str, object], payload["compositor"]) - layers = cast(list[dict[str, object]], compositor["layers"]) + composition = cast(dict[str, object], payload["composition"]) + layers = cast(list[dict[str, object]], composition["layers"]) assert layers[0]["config"] == {"user": "hello"} + assert "compositor" not in payload assert "agent_profile" not in payload return httpx.Response(202, json={"run_id": "run-1", "status": "running"}) if request.method == "GET" and request.url.path == "/runs/run-1": @@ -207,7 +208,7 @@ def test_create_run_is_not_retried_after_timeout() -> None: def test_sync_sse_parser_supports_comments_multiline_data_and_id_fill() -> None: payload = RUN_EVENT_ADAPTER.dump_json(RunStartedEvent(run_id="run-1"), exclude={"id"}).decode() before_type, after_type = payload.split('"type"', maxsplit=1) - body = f": keepalive\nid: 5-0\nevent: run_started\ndata: {before_type}\ndata: \"type\"{after_type}\n\n" + body = f': keepalive\nid: 5-0\nevent: run_started\ndata: {before_type}\ndata: "type"{after_type}\n\n' def handler(request: httpx.Request) -> httpx.Response: assert request.url.params["after"] == "0-0" diff --git a/dify-agent/tests/local/dify_agent/layers/dify_plugin/test_layers.py b/dify-agent/tests/local/dify_agent/layers/dify_plugin/test_layers.py index 13b5250970..0484ebe65a 100644 --- a/dify-agent/tests/local/dify_agent/layers/dify_plugin/test_layers.py +++ b/dify-agent/tests/local/dify_agent/layers/dify_plugin/test_layers.py @@ -1,190 +1,120 @@ import asyncio -from collections import OrderedDict -from typing import cast +import httpx import pytest -from agenton.compositor import Compositor -from agenton.layers import EmptyRuntimeHandles, EmptyRuntimeState, LayerControl, PlainPromptType, PlainToolType +from agenton.compositor import Compositor, LayerNode, LayerProvider from dify_agent.adapters.llm import DifyLLMAdapterModel from dify_agent.layers.dify_plugin.configs import DifyPluginLLMLayerConfig, DifyPluginLayerConfig from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer -from dify_agent.layers.dify_plugin.plugin_layer import DifyPluginLayer, DifyPluginRuntimeHandles +from dify_agent.layers.dify_plugin.plugin_layer import DifyPluginLayer + + +def _plugin_config() -> DifyPluginLayerConfig: + return DifyPluginLayerConfig(tenant_id="tenant-1", plugin_id="langgenius/openai", user_id="user-1") + + +def _llm_config() -> DifyPluginLLMLayerConfig: + return DifyPluginLLMLayerConfig( + model_provider="openai", + model="demo-model", + credentials={"api_key": "secret"}, + model_settings={"temperature": 0.2}, + ) def _plugin_layer() -> DifyPluginLayer: return DifyPluginLayer.from_config_with_settings( - DifyPluginLayerConfig(tenant_id="tenant-1", plugin_id="langgenius/openai", user_id="user-1"), + _plugin_config(), daemon_url="http://plugin-daemon", daemon_api_key="daemon-secret", - timeout=12, ) -def _llm_layer() -> DifyPluginLLMLayer: - return DifyPluginLLMLayer.from_config( - DifyPluginLLMLayerConfig( - model_provider="openai", - model="demo-model", - credentials={"api_key": "secret"}, - model_settings={"temperature": 0.2}, - ) +def _plugin_provider() -> LayerProvider[DifyPluginLayer]: + return LayerProvider.from_factory( + layer_type=DifyPluginLayer, + create=lambda config: DifyPluginLayer.from_config_with_settings( + DifyPluginLayerConfig.model_validate(config), + daemon_url="http://plugin-daemon", + daemon_api_key="daemon-secret", + ), ) -def _plugin_control(control: LayerControl) -> LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles]: - return cast(LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles], control) - - -def _llm_control(control: LayerControl) -> LayerControl[EmptyRuntimeState, EmptyRuntimeHandles]: - return cast(LayerControl[EmptyRuntimeState, EmptyRuntimeHandles], control) - - -def test_dify_plugin_layer_uses_resource_stack_and_get_daemon_provider_requires_active_control() -> None: +def test_dify_plugin_layer_creates_daemon_provider_from_shared_http_client() -> None: async def scenario() -> None: plugin = _plugin_layer() - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("plugin", plugin)])) - session = compositor.new_session() + async with httpx.AsyncClient(transport=httpx.MockTransport(lambda _request: httpx.Response(200))) as client: + provider = plugin.create_daemon_provider(http_client=client) - with pytest.raises(RuntimeError, match="requires an active LayerControl"): - _ = plugin.get_daemon_provider(_plugin_control(session.layer("plugin"))) - - async with compositor.enter(session) as active_session: - handles = cast(DifyPluginRuntimeHandles, cast(object, session.layer("plugin").runtime_handles)) - first_client = handles.http_client - assert first_client is not None - provider = plugin.get_daemon_provider(_plugin_control(session.layer("plugin"))) assert provider.name == "DifyPlugin/langgenius/openai" - assert provider.client.http_client is first_client + assert provider.client.http_client is client assert provider.client.tenant_id == "tenant-1" assert provider.client.plugin_id == "langgenius/openai" assert provider.client.user_id == "user-1" + async with provider: pass - assert first_client.is_closed is False - active_session.suspend_on_exit() - - assert handles.http_client is None - assert first_client.is_closed is True - with pytest.raises(RuntimeError, match="requires an active LayerControl"): - _ = plugin.get_daemon_provider(_plugin_control(session.layer("plugin"))) - - async with compositor.enter(session): - second_client = handles.http_client - assert second_client is not None - assert second_client is not first_client - - assert handles.http_client is None - assert second_client.is_closed is True + assert client.is_closed is False asyncio.run(scenario()) -def test_dify_plugin_layer_get_daemon_provider_rejects_wrong_control() -> None: +def test_dify_plugin_layer_rejects_closed_shared_http_client() -> None: async def scenario() -> None: plugin = _plugin_layer() - llm = _llm_layer() - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( - layers=OrderedDict([("plugin", plugin), ("llm", llm)]), - deps_name_mapping={"llm": {"plugin": "plugin"}}, + client = httpx.AsyncClient() + await client.aclose() + + with pytest.raises(RuntimeError, match="open shared HTTP client"): + _ = plugin.create_daemon_provider(http_client=client) + + asyncio.run(scenario()) + + +def test_dify_plugin_llm_layer_builds_adapter_model_from_direct_dependency() -> None: + async def scenario() -> None: + compositor = Compositor( + [ + LayerNode("renamed-plugin", _plugin_provider()), + LayerNode("llm", DifyPluginLLMLayer, deps={"plugin": "renamed-plugin"}), + ] ) + async with httpx.AsyncClient(transport=httpx.MockTransport(lambda _request: httpx.Response(200))) as client: + async with compositor.enter( + configs={ + "renamed-plugin": _plugin_config(), + "llm": _llm_config(), + } + ) as run: + plugin = run.get_layer("renamed-plugin", DifyPluginLayer) + llm = run.get_layer("llm", DifyPluginLLMLayer) - async with compositor.enter() as session: - with pytest.raises(RuntimeError, match="belongs to layer 'llm'"): - _ = plugin.get_daemon_provider(_plugin_control(session.layer("llm"))) + model = llm.get_model(http_client=client) + + assert llm.deps.plugin is plugin + assert isinstance(model, DifyLLMAdapterModel) + assert model.model_name == "demo-model" + assert model.model_provider == "openai" + assert model.credentials == {"api_key": "secret"} + assert model.provider.name == "DifyPlugin/langgenius/openai" + assert model.provider.client.http_client is client asyncio.run(scenario()) -def test_dify_plugin_llm_layer_builds_adapter_model_from_dependency_provider() -> None: +def test_dify_plugin_layer_lifecycle_does_not_manage_http_client() -> None: async def scenario() -> None: - plugin = _plugin_layer() - llm = _llm_layer() - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( - layers=OrderedDict([("plugin", plugin), ("llm", llm)]), - deps_name_mapping={"llm": {"plugin": "plugin"}}, - ) + compositor = Compositor([LayerNode("plugin", _plugin_provider())]) + async with httpx.AsyncClient(transport=httpx.MockTransport(lambda _request: httpx.Response(200))) as client: + async with compositor.enter(configs={"plugin": _plugin_config()}) as run: + plugin = run.get_layer("plugin", DifyPluginLayer) + provider = plugin.create_daemon_provider(http_client=client) + run.suspend_layer_on_exit("plugin") - session = compositor.new_session() - with pytest.raises(RuntimeError, match="requires an active LayerControl"): - _ = llm.get_model(_llm_control(session.layer("llm"))) - - async with compositor.enter(session): - model = llm.get_model(_llm_control(session.layer("llm"))) - assert isinstance(model, DifyLLMAdapterModel) - assert model.model_name == "demo-model" - assert model.model_provider == "openai" - assert model.credentials == {"api_key": "secret"} - assert model.provider.name == "DifyPlugin/langgenius/openai" - handles = cast(DifyPluginRuntimeHandles, cast(object, session.layer("plugin").runtime_handles)) - assert model.provider.client.http_client is handles.http_client - - with pytest.raises(RuntimeError, match="belongs to layer 'plugin'"): - _ = llm.get_model(_llm_control(session.layer("plugin"))) - - asyncio.run(scenario()) - - -def test_dify_plugin_llm_layer_get_model_uses_control_dependency_lookup(monkeypatch: pytest.MonkeyPatch) -> None: - async def scenario() -> None: - plugin = _plugin_layer() - llm = _llm_layer() - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( - layers=OrderedDict([("renamed-plugin", plugin), ("llm", llm)]), - deps_name_mapping={"llm": {"plugin": "renamed-plugin"}}, - ) - - async with compositor.enter() as session: - llm_control = session.layer("llm") - plugin_control = session.layer("renamed-plugin") - calls: list[object] = [] - - def fake_control_for(self: LayerControl, dep_layer: object) -> object: - assert self is llm_control - calls.append(dep_layer) - return plugin_control - - monkeypatch.setattr(LayerControl, "control_for", fake_control_for) - - model = llm.get_model(llm_control) - - assert calls == [plugin] - assert isinstance(model, DifyLLMAdapterModel) - - asyncio.run(scenario()) - - -def test_dify_plugin_layer_concurrent_sessions_use_separate_controls_and_clients() -> None: - async def scenario() -> None: - plugin = _plugin_layer() - compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("plugin", plugin)])) - first_session = compositor.new_session() - second_session = compositor.new_session() - - async with compositor.enter(first_session): - async with compositor.enter(second_session): - first_handles = cast( - DifyPluginRuntimeHandles, - cast(object, first_session.layer("plugin").runtime_handles), - ) - second_handles = cast( - DifyPluginRuntimeHandles, - cast(object, second_session.layer("plugin").runtime_handles), - ) - first_client = first_handles.http_client - second_client = second_handles.http_client - assert first_client is not None - assert second_client is not None - assert first_client is not second_client - - first_provider = plugin.get_daemon_provider(_plugin_control(first_session.layer("plugin"))) - second_provider = plugin.get_daemon_provider(_plugin_control(second_session.layer("plugin"))) - assert first_provider.client.http_client is first_client - assert second_provider.client.http_client is second_client - - assert second_client.is_closed is True - assert first_client.is_closed is False - - assert first_client.is_closed is True + assert run.session_snapshot is not None + assert provider.client.http_client is client + assert client.is_closed is False asyncio.run(scenario()) diff --git a/dify-agent/tests/local/dify_agent/protocol/test_protocol_schemas.py b/dify-agent/tests/local/dify_agent/protocol/test_protocol_schemas.py index 76f79feca3..cd96f233da 100644 --- a/dify-agent/tests/local/dify_agent/protocol/test_protocol_schemas.py +++ b/dify-agent/tests/local/dify_agent/protocol/test_protocol_schemas.py @@ -2,8 +2,9 @@ import pytest from pydantic import ValidationError from pydantic_ai.messages import FinalResultEvent -from agenton.layers import ExitIntent from agenton.compositor import CompositorSessionSnapshot +from agenton.layers import ExitIntent +from agenton_collections.layers.plain import PromptLayerConfig import dify_agent.protocol as protocol_exports from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID from dify_agent.protocol.schemas import ( @@ -11,12 +12,16 @@ from dify_agent.protocol.schemas import ( CreateRunRequest, LayerExitSignals, PydanticAIStreamRunEvent, + RunComposition, RunFailedEvent, RunFailedEventData, + RunLayerSpec, RunStartedEvent, RunSucceededEvent, RunSucceededEventData, + normalize_composition, ) +from dify_agent.layers.dify_plugin.configs import DifyPluginLLMLayerConfig, DifyPluginLayerConfig def test_run_event_adapter_round_trips_typed_variants() -> None: @@ -54,38 +59,84 @@ def test_pydantic_ai_event_data_uses_agent_stream_event_model() -> None: assert isinstance(event.data, FinalResultEvent) -def test_create_run_request_rejects_agent_profile_and_model_layer_id_is_public() -> None: +def test_create_run_request_rejects_old_compositor_payload_and_model_layer_id_is_public() -> None: assert DIFY_AGENT_MODEL_LAYER_ID == "llm" with pytest.raises(ValidationError): _ = CreateRunRequest.model_validate( { "compositor": {"layers": []}, - "agent_profile": {"provider": "test", "output_text": "done"}, } ) -def test_layer_exit_signals_default_to_suspend_and_are_public() -> None: +def test_create_run_request_accepts_dto_first_public_composition_and_normalizes_graph_config() -> None: + prompt_config = PromptLayerConfig(prefix="system", user="hello") + plugin_config = DifyPluginLayerConfig(tenant_id="tenant-1", plugin_id="langgenius/openai") + llm_config = DifyPluginLLMLayerConfig( + model_provider="openai", + model="demo-model", + credentials={"api_key": "secret"}, + ) + request = CreateRunRequest( + composition=RunComposition( + layers=[ + RunLayerSpec(name="prompt", type="plain.prompt", config=prompt_config), + RunLayerSpec(name="plugin", type="dify.plugin", config=plugin_config), + RunLayerSpec( + name=DIFY_AGENT_MODEL_LAYER_ID, + type="dify.plugin.llm", + deps={"plugin": "plugin"}, + config=llm_config, + ), + ] + ) + ) + + graph_config, layer_configs = normalize_composition(request.composition) + payload = request.model_dump(mode="json") + + assert payload["composition"]["layers"][0]["config"] == {"prefix": "system", "user": "hello", "suffix": []} + assert [layer.model_dump(mode="json") for layer in graph_config.layers] == [ + {"name": "prompt", "type": "plain.prompt", "deps": {}, "metadata": {}}, + {"name": "plugin", "type": "dify.plugin", "deps": {}, "metadata": {}}, + { + "name": DIFY_AGENT_MODEL_LAYER_ID, + "type": "dify.plugin.llm", + "deps": {"plugin": "plugin"}, + "metadata": {}, + }, + ] + assert layer_configs == { + "prompt": prompt_config, + "plugin": plugin_config, + DIFY_AGENT_MODEL_LAYER_ID: llm_config, + } + + +def test_on_exit_default_to_suspend_and_are_public() -> None: assert protocol_exports.LayerExitSignals is LayerExitSignals - request = CreateRunRequest.model_validate({"compositor": {"layers": []}}) + assert protocol_exports.RunComposition is RunComposition + assert protocol_exports.RunLayerSpec is RunLayerSpec + assert protocol_exports.normalize_composition is normalize_composition + request = CreateRunRequest.model_validate({"composition": {"layers": []}}) - assert request.layer_exit_signals.default is ExitIntent.SUSPEND - assert request.layer_exit_signals.layers == {} + assert request.on_exit.default is ExitIntent.SUSPEND + assert request.on_exit.layers == {} -def test_layer_exit_signals_accept_layer_overrides() -> None: +def test_on_exit_accept_layer_overrides() -> None: request = CreateRunRequest.model_validate( { - "compositor": {"layers": []}, - "layer_exit_signals": { + "composition": {"layers": []}, + "on_exit": { "default": "delete", "layers": {"prompt": "suspend", "llm": "delete"}, }, } ) - assert request.layer_exit_signals.default is ExitIntent.DELETE - assert request.layer_exit_signals.layers == {"prompt": ExitIntent.SUSPEND, "llm": ExitIntent.DELETE} + assert request.on_exit.default is ExitIntent.DELETE + assert request.on_exit.layers == {"prompt": ExitIntent.SUSPEND, "llm": ExitIntent.DELETE} def test_layer_exit_signals_reject_extra_fields() -> None: diff --git a/dify-agent/tests/local/dify_agent/runtime/test_run_scheduler.py b/dify-agent/tests/local/dify_agent/runtime/test_run_scheduler.py index 51e60a2508..07dedd0886 100644 --- a/dify-agent/tests/local/dify_agent/runtime/test_run_scheduler.py +++ b/dify-agent/tests/local/dify_agent/runtime/test_run_scheduler.py @@ -1,21 +1,28 @@ import asyncio from collections import defaultdict -from typing import cast +import httpx import pytest -from pydantic import JsonValue -from agenton.compositor import CompositorConfig, LayerNodeConfig -from agenton.layers import ExitIntent -from dify_agent.protocol.schemas import CreateRunRequest, LayerExitSignals, RunEvent, RunStatus -from dify_agent.runtime.run_scheduler import RunScheduler, SchedulerStoppingError +from agenton.compositor import CompositorSessionSnapshot, LayerSessionSnapshot +from agenton.layers import ExitIntent, LifecycleState +from agenton_collections.layers.plain import PromptLayerConfig +from dify_agent.protocol.schemas import ( + CreateRunRequest, + LayerExitSignals, + RunComposition, + RunEvent, + RunLayerSpec, + RunStatus, +) +from dify_agent.runtime.run_scheduler import RunRequestValidationError, RunScheduler, SchedulerStoppingError, validate_run_request from dify_agent.server.schemas import RunRecord def _request(user: str | list[str] = "hello") -> CreateRunRequest: return CreateRunRequest( - compositor=CompositorConfig( - layers=[LayerNodeConfig(name="prompt", type="plain.prompt", config=cast(JsonValue, {"user": user}))] + composition=RunComposition( + layers=[RunLayerSpec(name="prompt", type="plain.prompt", config=PromptLayerConfig(user=user))] ) ) @@ -82,20 +89,22 @@ def test_create_run_starts_background_task_and_returns_running() -> None: store = FakeStore() started = asyncio.Event() release = asyncio.Event() - scheduler = RunScheduler( - store=store, - runner_factory=lambda _record, _request: ControlledRunner(started=started, release=release), - ) + async with httpx.AsyncClient() as client: + scheduler = RunScheduler( + store=store, + plugin_daemon_http_client=client, + runner_factory=lambda _record, _request: ControlledRunner(started=started, release=release), + ) - record = await scheduler.create_run(_request()) - await asyncio.wait_for(started.wait(), timeout=1) + record = await scheduler.create_run(_request()) + await asyncio.wait_for(started.wait(), timeout=1) - assert record.status == "running" - assert list(scheduler.active_tasks) == [record.run_id] - _ = release.set() - await asyncio.wait_for(scheduler.active_tasks[record.run_id], timeout=1) - await asyncio.sleep(0) - assert scheduler.active_tasks == {} + assert record.status == "running" + assert list(scheduler.active_tasks) == [record.run_id] + _ = release.set() + await asyncio.wait_for(scheduler.active_tasks[record.run_id], timeout=1) + await asyncio.sleep(0) + assert scheduler.active_tasks == {} asyncio.run(scenario()) @@ -104,21 +113,23 @@ def test_shutdown_marks_unfinished_runs_failed_and_appends_event() -> None: async def scenario() -> None: store = FakeStore() started = asyncio.Event() - scheduler = RunScheduler( - store=store, - shutdown_grace_seconds=0, - runner_factory=lambda _record, _request: ControlledRunner(started=started, release=asyncio.Event()), - ) - record = await scheduler.create_run(_request()) - await asyncio.wait_for(started.wait(), timeout=1) + async with httpx.AsyncClient() as client: + scheduler = RunScheduler( + store=store, + plugin_daemon_http_client=client, + shutdown_grace_seconds=0, + runner_factory=lambda _record, _request: ControlledRunner(started=started, release=asyncio.Event()), + ) + record = await scheduler.create_run(_request()) + await asyncio.wait_for(started.wait(), timeout=1) - await scheduler.shutdown() + await scheduler.shutdown() - assert scheduler.stopping is True - assert scheduler.active_tasks == {} - assert store.statuses[record.run_id] == "failed" - assert store.errors[record.run_id] == "run cancelled during server shutdown" - assert [event.type for event in store.events[record.run_id]] == ["run_failed"] + assert scheduler.stopping is True + assert scheduler.active_tasks == {} + assert store.statuses[record.run_id] == "failed" + assert store.errors[record.run_id] == "run cancelled during server shutdown" + assert [event.type for event in store.events[record.run_id]] == ["run_failed"] asyncio.run(scenario()) @@ -126,25 +137,73 @@ def test_shutdown_marks_unfinished_runs_failed_and_appends_event() -> None: def test_create_run_rejects_blank_prompt_before_persisting() -> None: async def scenario() -> None: store = FakeStore() - scheduler = RunScheduler(store=store) + async with httpx.AsyncClient() as client: + scheduler = RunScheduler(store=store, plugin_daemon_http_client=client) - with pytest.raises(ValueError, match="compositor.user_prompts must not be empty"): - await scheduler.create_run(_request(["", " "])) + with pytest.raises(ValueError, match="run.user_prompts must not be empty"): + await scheduler.create_run(_request(["", " "])) assert store.records == {} asyncio.run(scenario()) +def test_validate_run_request_honors_explicit_empty_layer_providers() -> None: + async def scenario() -> None: + with pytest.raises(RunRequestValidationError, match="plain.prompt"): + await validate_run_request(_request(), layer_providers=()) + + asyncio.run(scenario()) + + def test_create_run_rejects_unknown_layer_exit_signal_before_persisting() -> None: async def scenario() -> None: store = FakeStore() - scheduler = RunScheduler(store=store) - request = _request() - request.layer_exit_signals = LayerExitSignals(layers={"missing": ExitIntent.DELETE}) + async with httpx.AsyncClient() as client: + scheduler = RunScheduler(store=store, plugin_daemon_http_client=client) + request = _request() + request.on_exit = LayerExitSignals(layers={"missing": ExitIntent.DELETE}) - with pytest.raises(ValueError, match="missing"): - await scheduler.create_run(request) + with pytest.raises(ValueError, match="missing"): + await scheduler.create_run(request) + + assert store.records == {} + + asyncio.run(scenario()) + + +def test_create_run_honors_explicit_empty_layer_providers_before_persisting() -> None: + async def scenario() -> None: + store = FakeStore() + async with httpx.AsyncClient() as client: + scheduler = RunScheduler(store=store, plugin_daemon_http_client=client, layer_providers=()) + + with pytest.raises(RunRequestValidationError, match="plain.prompt"): + await scheduler.create_run(_request()) + + assert store.records == {} + + asyncio.run(scenario()) + + +def test_create_run_rejects_closed_session_snapshot_before_persisting() -> None: + async def scenario() -> None: + store = FakeStore() + async with httpx.AsyncClient() as client: + scheduler = RunScheduler(store=store, plugin_daemon_http_client=client) + request = _request() + request.session_snapshot = CompositorSessionSnapshot( + layers=[ + LayerSessionSnapshot( + name="prompt", + lifecycle_state=LifecycleState.CLOSED, + runtime_state={}, + ) + ] + ) + + with pytest.raises(ValueError, match="CLOSED snapshots cannot be entered"): + _ = await scheduler.create_run(request) assert store.records == {} @@ -153,11 +212,27 @@ def test_create_run_rejects_unknown_layer_exit_signal_before_persisting() -> Non def test_create_run_rejects_after_shutdown_starts() -> None: async def scenario() -> None: - scheduler = RunScheduler(store=FakeStore()) - await scheduler.shutdown() + async with httpx.AsyncClient() as client: + scheduler = RunScheduler(store=FakeStore(), plugin_daemon_http_client=client) + await scheduler.shutdown() - with pytest.raises(SchedulerStoppingError): - await scheduler.create_run(_request()) + with pytest.raises(SchedulerStoppingError): + await scheduler.create_run(_request()) + + asyncio.run(scenario()) + + +def test_create_run_rejects_invalid_request_after_shutdown_without_persisting() -> None: + async def scenario() -> None: + store = FakeStore() + async with httpx.AsyncClient() as client: + scheduler = RunScheduler(store=store, plugin_daemon_http_client=client) + await scheduler.shutdown() + + with pytest.raises(SchedulerStoppingError): + _ = await scheduler.create_run(_request(["", " "])) + + assert store.records == {} asyncio.run(scenario()) @@ -168,30 +243,34 @@ def test_shutdown_waits_for_in_flight_create_to_register_before_cancelling() -> release_create = asyncio.Event() runner_started = asyncio.Event() store = SlowCreateStore(create_started=create_started, release_create=release_create) - scheduler = RunScheduler( - store=store, - shutdown_grace_seconds=0, - runner_factory=lambda _record, _request: ControlledRunner(started=runner_started, release=asyncio.Event()), - ) + async with httpx.AsyncClient() as client: + scheduler = RunScheduler( + store=store, + plugin_daemon_http_client=client, + shutdown_grace_seconds=0, + runner_factory=lambda _record, _request: ControlledRunner( + started=runner_started, release=asyncio.Event() + ), + ) - create_task = asyncio.create_task(scheduler.create_run(_request())) - await asyncio.wait_for(create_started.wait(), timeout=1) - shutdown_task = asyncio.create_task(scheduler.shutdown()) - await asyncio.sleep(0) + create_task = asyncio.create_task(scheduler.create_run(_request())) + await asyncio.wait_for(create_started.wait(), timeout=1) + shutdown_task = asyncio.create_task(scheduler.shutdown()) + await asyncio.sleep(0) - assert shutdown_task.done() is False - assert scheduler.stopping is False + assert shutdown_task.done() is False + assert scheduler.stopping is False - _ = release_create.set() - record = await asyncio.wait_for(create_task, timeout=1) - await asyncio.wait_for(shutdown_task, timeout=1) + _ = release_create.set() + record = await asyncio.wait_for(create_task, timeout=1) + await asyncio.wait_for(shutdown_task, timeout=1) - assert scheduler.stopping is True - assert scheduler.active_tasks == {} - assert store.statuses[record.run_id] == "failed" - assert [event.type for event in store.events[record.run_id]] == ["run_failed"] + assert scheduler.stopping is True + assert scheduler.active_tasks == {} + assert store.statuses[record.run_id] == "failed" + assert [event.type for event in store.events[record.run_id]] == ["run_failed"] - with pytest.raises(SchedulerStoppingError): - await scheduler.create_run(_request()) + with pytest.raises(SchedulerStoppingError): + await scheduler.create_run(_request()) asyncio.run(scenario()) diff --git a/dify-agent/tests/local/dify_agent/runtime/test_runner.py b/dify-agent/tests/local/dify_agent/runtime/test_runner.py index 8b0a5a8a8a..85a15ad022 100644 --- a/dify-agent/tests/local/dify_agent/runtime/test_runner.py +++ b/dify-agent/tests/local/dify_agent/runtime/test_runner.py @@ -1,16 +1,22 @@ import asyncio +import httpx import pytest from pydantic_ai.models.test import TestModel -from agenton.compositor import CompositorConfig, LayerNodeConfig -from agenton.layers import ExitIntent, LayerControl, LifecycleState +from agenton.compositor import CompositorSessionSnapshot, LayerSessionSnapshot +from agenton.layers import ExitIntent, LifecycleState from agenton_collections.layers.plain import PromptLayerConfig from dify_agent.layers.dify_plugin.configs import DifyPluginLLMLayerConfig, DifyPluginLayerConfig from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer -from dify_agent.layers.dify_plugin.plugin_layer import DifyPluginRuntimeHandles from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID -from dify_agent.protocol.schemas import CreateRunRequest, LayerExitSignals, RunSucceededEvent +from dify_agent.protocol.schemas import ( + CreateRunRequest, + LayerExitSignals, + RunComposition, + RunLayerSpec, + RunSucceededEvent, +) from dify_agent.runtime.event_sink import InMemoryRunEventSink from dify_agent.runtime.runner import AgentRunRunner, AgentRunValidationError @@ -20,22 +26,22 @@ def _request( *, llm_layer_name: str = DIFY_AGENT_MODEL_LAYER_ID, plugin_layer_name: str = "plugin", - layer_exit_signals: LayerExitSignals | None = None, + on_exit: LayerExitSignals | None = None, ) -> CreateRunRequest: return CreateRunRequest( - compositor=CompositorConfig( + composition=RunComposition( layers=[ - LayerNodeConfig( + RunLayerSpec( name="prompt", type="plain.prompt", config=PromptLayerConfig(prefix="system", user=user), ), - LayerNodeConfig( + RunLayerSpec( name=plugin_layer_name, type="dify.plugin", config=DifyPluginLayerConfig(tenant_id="tenant-1", plugin_id="langgenius/openai"), ), - LayerNodeConfig( + RunLayerSpec( name=llm_layer_name, type="dify.plugin.llm", deps={"plugin": plugin_layer_name}, @@ -47,24 +53,35 @@ def _request( ), ] ), - layer_exit_signals=layer_exit_signals or LayerExitSignals(), + on_exit=on_exit or LayerExitSignals(), ) def test_runner_emits_terminal_success_and_snapshot(monkeypatch: pytest.MonkeyPatch) -> None: - def fake_get_model(self: DifyPluginLLMLayer, control: LayerControl): + seen_clients: list[httpx.AsyncClient] = [] + + def fake_get_model(self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient): assert self.config.model == "demo-model" - plugin_control = control.control_for(self.deps.plugin) - plugin_handles = plugin_control.runtime_handles - assert isinstance(plugin_handles, DifyPluginRuntimeHandles) - assert plugin_handles.http_client is not None + assert self.deps.plugin.config.plugin_id == "langgenius/openai" + seen_clients.append(http_client) return TestModel(custom_output_text="done") # pyright: ignore[reportReturnType] monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model) request = _request(plugin_layer_name="renamed-plugin") sink = InMemoryRunEventSink() - asyncio.run(AgentRunRunner(sink=sink, request=request, run_id="run-1").run()) + async def scenario() -> None: + async with httpx.AsyncClient() as client: + await AgentRunRunner( + sink=sink, + request=request, + run_id="run-1", + plugin_daemon_http_client=client, + ).run() + assert seen_clients == [client] + assert client.is_closed is False + + asyncio.run(scenario()) event_types = [event.type for event in sink.events["run-1"]] assert event_types[0] == "run_started" @@ -80,7 +97,7 @@ def test_runner_emits_terminal_success_and_snapshot(monkeypatch: pytest.MonkeyPa "renamed-plugin", DIFY_AGENT_MODEL_LAYER_ID, ] - assert [layer.state for layer in terminal.data.session_snapshot.layers] == [ + assert [layer.lifecycle_state for layer in terminal.data.session_snapshot.layers] == [ LifecycleState.SUSPENDED, LifecycleState.SUSPENDED, LifecycleState.SUSPENDED, @@ -88,70 +105,96 @@ def test_runner_emits_terminal_success_and_snapshot(monkeypatch: pytest.MonkeyPa assert sink.statuses["run-1"] == "succeeded" -def test_runner_applies_layer_exit_signal_overrides_to_success_snapshot(monkeypatch: pytest.MonkeyPatch) -> None: - def fake_get_model(_self: DifyPluginLLMLayer, _control: LayerControl): +def test_runner_applies_on_exit_overrides_to_success_snapshot(monkeypatch: pytest.MonkeyPatch) -> None: + def fake_get_model(_self: DifyPluginLLMLayer, *, http_client: httpx.AsyncClient): + assert http_client.is_closed is False return TestModel(custom_output_text="done") # pyright: ignore[reportReturnType] monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model) request = _request( - layer_exit_signals=LayerExitSignals( + on_exit=LayerExitSignals( default=ExitIntent.SUSPEND, layers={"prompt": ExitIntent.DELETE, DIFY_AGENT_MODEL_LAYER_ID: ExitIntent.DELETE}, ) ) sink = InMemoryRunEventSink() - asyncio.run(AgentRunRunner(sink=sink, request=request, run_id="run-exit").run()) + async def scenario() -> None: + async with httpx.AsyncClient() as client: + await AgentRunRunner( + sink=sink, + request=request, + run_id="run-exit", + plugin_daemon_http_client=client, + ).run() + + asyncio.run(scenario()) terminal = sink.events["run-exit"][-1] assert isinstance(terminal, RunSucceededEvent) - assert {layer.name: layer.state for layer in terminal.data.session_snapshot.layers} == { + assert {layer.name: layer.lifecycle_state for layer in terminal.data.session_snapshot.layers} == { "prompt": LifecycleState.CLOSED, "plugin": LifecycleState.SUSPENDED, DIFY_AGENT_MODEL_LAYER_ID: LifecycleState.CLOSED, } -def test_runner_rejects_unknown_layer_exit_signal_id() -> None: - request = _request(layer_exit_signals=LayerExitSignals(layers={"missing": ExitIntent.DELETE})) +def test_runner_rejects_unknown_on_exit_layer_id() -> None: + request = _request(on_exit=LayerExitSignals(layers={"missing": ExitIntent.DELETE})) sink = InMemoryRunEventSink() - with pytest.raises(AgentRunValidationError, match="missing"): - asyncio.run(AgentRunRunner(sink=sink, request=request, run_id="run-unknown-signal").run()) + async def scenario() -> None: + async with httpx.AsyncClient() as client: + with pytest.raises(AgentRunValidationError, match="missing"): + await AgentRunRunner( + sink=sink, + request=request, + run_id="run-unknown-signal", + plugin_daemon_http_client=client, + ).run() + + asyncio.run(scenario()) assert [event.type for event in sink.events["run-unknown-signal"]] == ["run_started", "run_failed"] assert sink.statuses["run-unknown-signal"] == "failed" -def test_runner_applies_layer_exit_signals_before_model_resolution_failure(monkeypatch: pytest.MonkeyPatch) -> None: - def fake_get_model(self: DifyPluginLLMLayer, control: LayerControl): - plugin_control = control.control_for(self.deps.plugin) - assert control.exit_intent is ExitIntent.DELETE - assert plugin_control.exit_intent is ExitIntent.SUSPEND - raise RuntimeError("model unavailable") - - monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model) - request = _request( - layer_exit_signals=LayerExitSignals( - default=ExitIntent.DELETE, - layers={"plugin": ExitIntent.SUSPEND}, - ) - ) +def test_runner_honors_explicit_empty_layer_providers() -> None: + request = _request() sink = InMemoryRunEventSink() - with pytest.raises(AgentRunValidationError, match="model unavailable"): - asyncio.run(AgentRunRunner(sink=sink, request=request, run_id="run-model-failure").run()) + async def scenario() -> None: + async with httpx.AsyncClient() as client: + with pytest.raises(AgentRunValidationError, match="plain.prompt"): + await AgentRunRunner( + sink=sink, + request=request, + run_id="run-empty-providers", + plugin_daemon_http_client=client, + layer_providers=(), + ).run() - assert [event.type for event in sink.events["run-model-failure"]] == ["run_started", "run_failed"] - assert sink.statuses["run-model-failure"] == "failed" + asyncio.run(scenario()) + + assert [event.type for event in sink.events["run-empty-providers"]] == ["run_started", "run_failed"] + assert sink.statuses["run-empty-providers"] == "failed" def test_runner_fails_empty_user_prompts() -> None: request = _request("") sink = InMemoryRunEventSink() - with pytest.raises(AgentRunValidationError): - asyncio.run(AgentRunRunner(sink=sink, request=request, run_id="run-2").run()) + async def scenario() -> None: + async with httpx.AsyncClient() as client: + with pytest.raises(AgentRunValidationError): + await AgentRunRunner( + sink=sink, + request=request, + run_id="run-2", + plugin_daemon_http_client=client, + ).run() + + asyncio.run(scenario()) assert [event.type for event in sink.events["run-2"]] == ["run_started", "run_failed"] assert sink.statuses["run-2"] == "failed" @@ -161,8 +204,17 @@ def test_runner_fails_blank_string_user_prompt_list() -> None: request = _request(["", " "]) sink = InMemoryRunEventSink() - with pytest.raises(AgentRunValidationError): - asyncio.run(AgentRunRunner(sink=sink, request=request, run_id="run-3").run()) + async def scenario() -> None: + async with httpx.AsyncClient() as client: + with pytest.raises(AgentRunValidationError): + await AgentRunRunner( + sink=sink, + request=request, + run_id="run-3", + plugin_daemon_http_client=client, + ).run() + + asyncio.run(scenario()) assert [event.type for event in sink.events["run-3"]] == ["run_started", "run_failed"] assert sink.statuses["run-3"] == "failed" @@ -172,8 +224,56 @@ def test_runner_requires_llm_layer_id() -> None: request = _request(llm_layer_name="not-llm") sink = InMemoryRunEventSink() - with pytest.raises(AgentRunValidationError, match="llm"): - asyncio.run(AgentRunRunner(sink=sink, request=request, run_id="run-4").run()) + async def scenario() -> None: + async with httpx.AsyncClient() as client: + with pytest.raises(AgentRunValidationError, match="llm"): + await AgentRunRunner( + sink=sink, + request=request, + run_id="run-4", + plugin_daemon_http_client=client, + ).run() + + asyncio.run(scenario()) assert [event.type for event in sink.events["run-4"]] == ["run_started", "run_failed"] assert sink.statuses["run-4"] == "failed" + + +def test_runner_rejects_closed_session_snapshot_as_validation_error() -> None: + request = _request() + request.session_snapshot = CompositorSessionSnapshot( + layers=[ + LayerSessionSnapshot( + name="prompt", + lifecycle_state=LifecycleState.CLOSED, + runtime_state={}, + ), + LayerSessionSnapshot( + name="plugin", + lifecycle_state=LifecycleState.NEW, + runtime_state={}, + ), + LayerSessionSnapshot( + name=DIFY_AGENT_MODEL_LAYER_ID, + lifecycle_state=LifecycleState.NEW, + runtime_state={}, + ), + ] + ) + sink = InMemoryRunEventSink() + + async def scenario() -> None: + async with httpx.AsyncClient() as client: + with pytest.raises(AgentRunValidationError, match="CLOSED snapshots cannot be entered"): + await AgentRunRunner( + sink=sink, + request=request, + run_id="run-closed-snapshot", + plugin_daemon_http_client=client, + ).run() + + asyncio.run(scenario()) + + assert [event.type for event in sink.events["run-closed-snapshot"]] == ["run_started", "run_failed"] + assert sink.statuses["run-closed-snapshot"] == "failed" diff --git a/dify-agent/tests/local/dify_agent/server/test_app.py b/dify-agent/tests/local/dify_agent/server/test_app.py index 580263f96c..73bfde69bd 100644 --- a/dify-agent/tests/local/dify_agent/server/test_app.py +++ b/dify-agent/tests/local/dify_agent/server/test_app.py @@ -1,11 +1,15 @@ +from __future__ import annotations + +from typing import ClassVar + import pytest from fastapi.testclient import TestClient import dify_agent.server.app as app_module -from agenton.compositor import LayerRegistry +from dify_agent.runtime.compositor_factory import DifyAgentLayerProvider from dify_agent.layers.dify_plugin.configs import DifyPluginLayerConfig from dify_agent.layers.dify_plugin.plugin_layer import DifyPluginLayer -from dify_agent.server.app import create_app +from dify_agent.server.app import create_app, create_plugin_daemon_http_client from dify_agent.server.settings import ServerSettings from dify_agent.storage.redis_run_store import RedisRunStore @@ -25,19 +29,22 @@ class FakeRunScheduler: store: object shutdown_grace_seconds: float - layer_registry: LayerRegistry + layer_providers: tuple[DifyAgentLayerProvider, ...] + plugin_daemon_http_client: FakePluginDaemonHttpClient shutdown_called: bool def __init__( self, *, store: object, + plugin_daemon_http_client: FakePluginDaemonHttpClient, shutdown_grace_seconds: float, - layer_registry: LayerRegistry, + layer_providers: tuple[DifyAgentLayerProvider, ...], ) -> None: self.store = store self.shutdown_grace_seconds = shutdown_grace_seconds - self.layer_registry = layer_registry + self.layer_providers = layer_providers + self.plugin_daemon_http_client = plugin_daemon_http_client self.shutdown_called = False self.created.append(self) @@ -45,12 +52,80 @@ class FakeRunScheduler: self.shutdown_called = True +class FakePluginDaemonHttpClient: + timeout: object | None + limits: object | None + trust_env: bool | None + is_closed: bool + + def __init__( + self, + *, + timeout: object | None = None, + limits: object | None = None, + trust_env: bool | None = None, + ) -> None: + self.timeout = timeout + self.limits = limits + self.trust_env = trust_env + self.is_closed = False + + async def aclose(self) -> None: + self.is_closed = True + + +class FakeTimeout: + connect: float + read: float + write: float + pool: float + + def __init__(self, *, connect: float, read: float, write: float, pool: float) -> None: + self.connect = connect + self.read = read + self.write = write + self.pool = pool + + +class FakeLimits: + max_connections: int + max_keepalive_connections: int + keepalive_expiry: float + + def __init__(self, *, max_connections: int, max_keepalive_connections: int, keepalive_expiry: float) -> None: + self.max_connections = max_connections + self.max_keepalive_connections = max_keepalive_connections + self.keepalive_expiry = keepalive_expiry + + +class FakeRedisModule: + fake_redis: ClassVar[FakeRedis | None] = None + + @staticmethod + def from_url(_url: str) -> FakeRedis: + assert FakeRedisModule.fake_redis is not None + return FakeRedisModule.fake_redis + + +class FakeHttpxModule: + Timeout: ClassVar[type[FakeTimeout]] = FakeTimeout + Limits: ClassVar[type[FakeLimits]] = FakeLimits + AsyncClient: ClassVar[type[FakePluginDaemonHttpClient]] = FakePluginDaemonHttpClient + + def test_create_app_creates_scheduler_and_closes_after_shutdown(monkeypatch: pytest.MonkeyPatch) -> None: fake_redis = FakeRedis() + fake_http_client = FakePluginDaemonHttpClient() FakeRunScheduler.created.clear() - monkeypatch.setattr(app_module.Redis, "from_url", lambda _url: fake_redis) + FakeRedisModule.fake_redis = fake_redis + monkeypatch.setattr(app_module, "Redis", FakeRedisModule) monkeypatch.setattr(app_module, "RunScheduler", FakeRunScheduler) + def fake_create_plugin_daemon_http_client(_settings: ServerSettings) -> FakePluginDaemonHttpClient: + return fake_http_client + + monkeypatch.setattr(app_module, "create_plugin_daemon_http_client", fake_create_plugin_daemon_http_client) + settings = ServerSettings( redis_url="redis://example.invalid/0", redis_prefix="test", @@ -58,24 +133,53 @@ def test_create_app_creates_scheduler_and_closes_after_shutdown(monkeypatch: pyt run_retention_seconds=7, plugin_daemon_url="http://plugin-daemon", plugin_daemon_api_key="daemon-secret", - plugin_daemon_timeout=12, + plugin_daemon_connect_timeout=1, + plugin_daemon_read_timeout=2, + plugin_daemon_write_timeout=3, + plugin_daemon_pool_timeout=4, + plugin_daemon_max_connections=5, + plugin_daemon_max_keepalive_connections=3, + plugin_daemon_keepalive_expiry=6, ) with TestClient(create_app(settings)): assert len(FakeRunScheduler.created) == 1 scheduler = FakeRunScheduler.created[0] assert scheduler.shutdown_grace_seconds == 5 - assert isinstance(scheduler.layer_registry, LayerRegistry) - descriptor = scheduler.layer_registry.resolve("dify.plugin") - assert descriptor.factory is not None - plugin_layer = descriptor.factory(DifyPluginLayerConfig(tenant_id="tenant-1", plugin_id="plugin-1")) + layer_providers = scheduler.layer_providers + assert isinstance(layer_providers, tuple) + plugin_provider = next(provider for provider in layer_providers if provider.type_id == "dify.plugin") + plugin_layer = plugin_provider.create_layer(DifyPluginLayerConfig(tenant_id="tenant-1", plugin_id="plugin-1")) assert isinstance(plugin_layer, DifyPluginLayer) assert plugin_layer.daemon_url == "http://plugin-daemon" assert plugin_layer.daemon_api_key == "daemon-secret" - assert plugin_layer.timeout == 12 + http_client = scheduler.plugin_daemon_http_client + assert http_client is fake_http_client + assert http_client.is_closed is False store = scheduler.store assert isinstance(store, RedisRunStore) assert store.run_retention_seconds == 7 assert FakeRunScheduler.created[0].shutdown_called is True + assert FakeRunScheduler.created[0].plugin_daemon_http_client.is_closed is True assert fake_redis.closed is True + + +def test_create_plugin_daemon_http_client_uses_configured_httpx_construction_args( + monkeypatch: pytest.MonkeyPatch, +) -> None: + monkeypatch.setattr(app_module, "httpx", FakeHttpxModule) + + client = create_plugin_daemon_http_client(ServerSettings()) + + assert isinstance(client, FakePluginDaemonHttpClient) + assert isinstance(client.timeout, FakeTimeout) + assert client.timeout.connect == 10 + assert client.timeout.read == 600 + assert client.timeout.write == 30 + assert client.timeout.pool == 10 + assert isinstance(client.limits, FakeLimits) + assert client.limits.max_connections == 100 + assert client.limits.max_keepalive_connections == 20 + assert client.limits.keepalive_expiry == 30 + assert client.trust_env is False diff --git a/dify-agent/tests/local/dify_agent/server/test_runs_routes.py b/dify-agent/tests/local/dify_agent/server/test_runs_routes.py index 90e579d331..c173816a51 100644 --- a/dify-agent/tests/local/dify_agent/server/test_runs_routes.py +++ b/dify-agent/tests/local/dify_agent/server/test_runs_routes.py @@ -1,14 +1,15 @@ from fastapi.testclient import TestClient from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID -from dify_agent.runtime.run_scheduler import SchedulerStoppingError +from dify_agent.runtime.run_scheduler import RunRequestValidationError, SchedulerStoppingError from dify_agent.server.routes.runs import create_runs_router from dify_agent.server.schemas import RunRecord class FakeScheduler: async def create_run(self, request: object) -> object: - raise AssertionError("blank prompt requests must be rejected before scheduling") + del request + raise RunRequestValidationError("run.user_prompts must not be empty") class FakeStore: @@ -27,7 +28,7 @@ def test_create_run_rejects_effectively_blank_user_prompt_list() -> None: response = client.post( "/runs", json={ - "compositor": { + "composition": { "schema_version": 1, "layers": [{"name": "prompt", "type": "plain.prompt", "config": {"user": ["", " "]}}], } @@ -35,7 +36,7 @@ def test_create_run_rejects_effectively_blank_user_prompt_list() -> None: ) assert response.status_code == 422 - assert response.json()["detail"] == "compositor.user_prompts must not be empty" + assert response.json()["detail"] == "run.user_prompts must not be empty" def test_create_run_returns_running_from_scheduler() -> None: @@ -55,7 +56,7 @@ def test_create_run_returns_running_from_scheduler() -> None: response = client.post( "/runs", json={ - "compositor": { + "composition": { "schema_version": 1, "layers": [{"name": "prompt", "type": "plain.prompt", "config": {"user": "hello"}}], } @@ -83,7 +84,7 @@ def test_create_run_accepts_valid_full_plugin_graph() -> None: response = client.post( "/runs", json={ - "compositor": { + "composition": { "schema_version": 1, "layers": [ {"name": "prompt", "type": "plain.prompt", "config": {"user": "hello"}}, @@ -115,20 +116,25 @@ def test_create_run_accepts_valid_full_plugin_graph() -> None: def test_create_run_rejects_unknown_layer_exit_signal_before_scheduling() -> None: from fastapi import FastAPI + class UnknownSignalScheduler: + async def create_run(self, request: object) -> RunRecord: + del request + raise RunRequestValidationError("on_exit.layers references unknown layer ids: missing.") + app = FastAPI() app.include_router( - create_runs_router(lambda: FakeStore(), lambda: FakeScheduler()) # pyright: ignore[reportArgumentType] + create_runs_router(lambda: FakeStore(), lambda: UnknownSignalScheduler()) # pyright: ignore[reportArgumentType] ) client = TestClient(app) response = client.post( "/runs", json={ - "compositor": { + "composition": { "schema_version": 1, "layers": [{"name": "prompt", "type": "plain.prompt", "config": {"user": "hello"}}], }, - "layer_exit_signals": {"layers": {"missing": "delete"}}, + "on_exit": {"layers": {"missing": "delete"}}, }, ) @@ -136,6 +142,44 @@ def test_create_run_rejects_unknown_layer_exit_signal_before_scheduling() -> Non assert "missing" in response.json()["detail"] +def test_create_run_rejects_closed_session_snapshot_with_422() -> None: + from fastapi import FastAPI + + class ClosedSnapshotScheduler: + async def create_run(self, request: object) -> RunRecord: + del request + raise RunRequestValidationError("Layer 'prompt' is closed; CLOSED snapshots cannot be entered.") + + app = FastAPI() + app.include_router( + create_runs_router(lambda: FakeStore(), lambda: ClosedSnapshotScheduler()) # pyright: ignore[reportArgumentType] + ) + client = TestClient(app) + + response = client.post( + "/runs", + json={ + "composition": { + "schema_version": 1, + "layers": [{"name": "prompt", "type": "plain.prompt", "config": {"user": "hello"}}], + }, + "session_snapshot": { + "schema_version": 1, + "layers": [ + { + "name": "prompt", + "lifecycle_state": "closed", + "runtime_state": {}, + } + ], + }, + }, + ) + + assert response.status_code == 422 + assert "CLOSED snapshots cannot be entered" in response.json()["detail"] + + def test_create_run_returns_503_when_scheduler_is_stopping() -> None: from fastapi import FastAPI @@ -153,7 +197,7 @@ def test_create_run_returns_503_when_scheduler_is_stopping() -> None: response = client.post( "/runs", json={ - "compositor": { + "composition": { "schema_version": 1, "layers": [{"name": "prompt", "type": "plain.prompt", "config": {"user": "hello"}}], } @@ -181,7 +225,7 @@ def test_create_run_does_not_map_infrastructure_failure_to_422() -> None: response = client.post( "/runs", json={ - "compositor": { + "composition": { "schema_version": 1, "layers": [{"name": "prompt", "type": "plain.prompt", "config": {"user": "hello"}}], } diff --git a/dify-agent/tests/local/dify_agent/storage/test_redis_run_store.py b/dify-agent/tests/local/dify_agent/storage/test_redis_run_store.py index ac05df3802..f835652536 100644 --- a/dify-agent/tests/local/dify_agent/storage/test_redis_run_store.py +++ b/dify-agent/tests/local/dify_agent/storage/test_redis_run_store.py @@ -35,7 +35,9 @@ class FakeRedis: entries.append((event_id, dict(fields))) return event_id - async def xrange(self, key: str, *, min: str = "-", count: int | None = None) -> list[tuple[str, dict[str, object]]]: + async def xrange( + self, key: str, *, min: str = "-", count: int | None = None + ) -> list[tuple[str, dict[str, object]]]: self.commands.append(("xrange", key, min, count)) entries = [entry for entry in self.streams.get(key, []) if self._is_after_min(entry[0], min)] if count is not None: @@ -114,7 +116,7 @@ def test_get_events_round_trips_run_succeeded_output_and_session_snapshot() -> N layers=[ LayerSessionSnapshot( name="prompt", - state=LifecycleState.SUSPENDED, + lifecycle_state=LifecycleState.SUSPENDED, runtime_state={"resource_id": "abc"}, ) ] diff --git a/dify-agent/tests/local/examples/test_agenton_examples.py b/dify-agent/tests/local/examples/test_agenton_examples.py index 23b5aa428c..545f5e6ef5 100644 --- a/dify-agent/tests/local/examples/test_agenton_examples.py +++ b/dify-agent/tests/local/examples/test_agenton_examples.py @@ -56,4 +56,4 @@ def test_agenton_session_snapshot_example_smoke() -> None: assert result.returncode == 0, result.stderr assert "Snapshot:" in result.stdout - assert "Rehydrated handle: restored:demo-connection" in result.stdout + assert "Rehydrated external handle: restored:demo-connection" in result.stdout