From 8d96f6cfbd8c10edfc5b37d2ceac350beb415cde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9B=90=E7=B2=92=20Yanli?= Date: Tue, 12 May 2026 01:46:39 +0800 Subject: [PATCH] refactor dify-agent run success protocol --- dify-agent/docs/agenton/api/index.md | 9 ++- dify-agent/docs/agenton/guide/index.md | 8 +- dify-agent/docs/dify-agent/api/index.md | 58 ++++++++------- dify-agent/docs/dify-agent/guide/index.md | 16 ++-- .../run_server_consumer.py | 32 ++++---- .../run_server_sync_client.py | 32 ++++---- dify-agent/src/agenton/compositor/__init__.py | 15 ++-- dify-agent/src/agenton/layers/__init__.py | 4 + dify-agent/src/agenton/layers/base.py | 60 +++++++++++++-- dify-agent/src/agenton/layers/types.py | 7 +- .../agenton_collections/layers/plain/basic.py | 10 ++- dify-agent/src/dify_agent/client/_client.py | 30 ++++---- .../src/dify_agent/protocol/__init__.py | 8 +- dify-agent/src/dify_agent/protocol/schemas.py | 40 +++------- .../src/dify_agent/runtime/agent_factory.py | 19 +++-- .../src/dify_agent/runtime/event_sink.py | 42 ++++------- dify-agent/src/dify_agent/runtime/runner.py | 29 ++++++-- .../compositor/test_builder_snapshot.py | 21 +++++- .../agenton/layers/test_schema_inference.py | 16 +++- .../local/dify_agent/client/test_client.py | 33 ++++++--- .../protocol/test_protocol_schemas.py | 21 +++++- .../local/dify_agent/runtime/test_runner.py | 10 ++- .../storage/test_redis_run_store.py | 73 ++++++++++++++++++- 23 files changed, 385 insertions(+), 208 deletions(-) diff --git a/dify-agent/docs/agenton/api/index.md b/dify-agent/docs/agenton/api/index.md index a74fbd379c..486638577d 100644 --- a/dify-agent/docs/agenton/api/index.md +++ b/dify-agent/docs/agenton/api/index.md @@ -12,7 +12,7 @@ Framework-neutral base class for prompt/tool layers. Class attributes: - `type_id: str | None`: registry id for config-backed plugin layers. -- `config_type: type[BaseModel]`: Pydantic schema for serialized layer config. +- `config_type: type[LayerConfig]`: Pydantic schema for serialized layer config. - `runtime_state_type: type[BaseModel]`: Pydantic schema for snapshot-safe per-session state. - `runtime_handles_type: type[BaseModel]`: Pydantic schema for live runtime @@ -74,6 +74,8 @@ serialized and should be rehydrated from runtime state in resume hooks. ### Schema defaults and lifecycle enums - `EmptyLayerConfig` +- `LayerConfig`: base DTO for serializable layer config schemas +- `LayerConfigValue`: JSON value or concrete `LayerConfig` DTO - `EmptyRuntimeState` - `EmptyRuntimeHandles` - `LifecycleState`: `NEW`, `ACTIVE`, `SUSPENDED`, `CLOSED` @@ -97,8 +99,9 @@ Tagged aggregate item types: - `LayerNodeConfig`: `name`, `type`, `config`, `deps`, `metadata` - `CompositorConfig`: `schema_version`, `layers` -Config nodes are pure serializable graph input. Use live instances for Python -objects and callables. +Config nodes are pure serializable graph input. `LayerNodeConfig.config` accepts +plain JSON values or concrete `LayerConfig` DTO instances and serializes DTOs as +JSON objects. Use live instances for Python objects and callables. ### Registry diff --git a/dify-agent/docs/agenton/guide/index.md b/dify-agent/docs/agenton/guide/index.md index a2a9eaa79e..7485843a8f 100644 --- a/dify-agent/docs/agenton/guide/index.md +++ b/dify-agent/docs/agenton/guide/index.md @@ -8,8 +8,8 @@ on the `LayerControl` created for that layer in a `CompositorSession`. ## Config, runtime state, and runtime handles - **Config** is serializable graph input. Config-constructible layers declare a - `type_id` and a Pydantic `config_type`; builders validate node config before - calling `Layer.from_config(validated_config)`. + `type_id` and a Pydantic `LayerConfig` schema; builders validate node config + before calling `Layer.from_config(validated_config)`. - **Runtime state** is serializable per-layer/per-session state. Layers declare a Pydantic `runtime_state_type`; session snapshots persist this model with `model_dump(mode="json")`. @@ -20,11 +20,11 @@ on the `LayerControl` created for that layer in a `CompositorSession`. ## Define a config-backed layer -Use a Pydantic model for config and pass it through the typed layer family so +Use a `LayerConfig` model for config and pass it through the typed layer family so `Layer.__init_subclass__` can infer the schema: ```python {test="skip" lint="skip"} -class GreetingConfig(BaseModel): +class GreetingConfig(LayerConfig): prefix: str model_config = ConfigDict(extra="forbid") diff --git a/dify-agent/docs/dify-agent/api/index.md b/dify-agent/docs/dify-agent/api/index.md index c61e94bed8..5185ea6d97 100644 --- a/dify-agent/docs/dify-agent/api/index.md +++ b/dify-agent/docs/dify-agent/api/index.md @@ -164,36 +164,39 @@ Use `dify_agent.client.Client` for both async and sync code. Async methods use normal names; sync methods add `_sync`. ```python {test="skip" lint="skip"} +from agenton.compositor import CompositorConfig, LayerNodeConfig +from agenton_collections.layers.plain import PromptLayerConfig from dify_agent.client import Client +from dify_agent.protocol import CreateRunRequest async def main() -> None: - async with Client(base_url="http://localhost:8000") as client: - run = await client.create_run( - { - "compositor": { - "schema_version": 1, - "layers": [{"name": "prompt", "type": "plain.prompt", "config": {"user": "hello"}}], - } - } + request = CreateRunRequest( + compositor=CompositorConfig( + layers=[LayerNodeConfig(name="prompt", type="plain.prompt", config=PromptLayerConfig(user="hello"))] ) + ) + async with Client(base_url="http://localhost:8000") as client: + run = await client.create_run(request) async for event in client.stream_events(run.run_id): print(event) ``` ```python {test="skip" lint="skip"} +from agenton.compositor import CompositorConfig, LayerNodeConfig +from agenton_collections.layers.plain import PromptLayerConfig from dify_agent.client import Client +from dify_agent.protocol import CreateRunRequest +request = CreateRunRequest( + compositor=CompositorConfig( + layers=[LayerNodeConfig(name="prompt", type="plain.prompt", config=PromptLayerConfig(user="hello"))] + ) +) + with Client(base_url="http://localhost:8000") as client: - run = client.create_run_sync( - { - "compositor": { - "schema_version": 1, - "layers": [{"name": "prompt", "type": "plain.prompt", "config": {"user": "hello"}}], - } - } - ) + run = client.create_run_sync(request) terminal = client.wait_run_sync(run.run_id) ``` @@ -201,8 +204,9 @@ with Client(base_url="http://localhost:8000") as client: They reconnect by default from the latest yielded event id and stop after `run_succeeded` or `run_failed`. They do not reconnect for HTTP 4xx responses, DTO validation failures, or malformed SSE frames. `create_run` and -`create_run_sync` never retry `POST /runs`; if a timeout occurs, the caller must -decide whether to inspect existing runs or submit a new run. +`create_run_sync` require a `CreateRunRequest` DTO and never retry `POST /runs`; +if a timeout occurs, the caller must decide whether to inspect existing runs or +submit a new run. ## Event types and order @@ -210,9 +214,7 @@ A normal successful run emits: 1. `run_started` 2. zero or more `pydantic_ai_event` -3. `agent_output` -4. `session_snapshot` -5. `run_succeeded` +3. `run_succeeded` A failed run emits: @@ -220,13 +222,13 @@ A failed run emits: 2. zero or more `pydantic_ai_event` 3. `run_failed` -Each event keeps the same envelope shape and has typed `data`: `run_started` and -`run_succeeded` use `{}`, `pydantic_ai_event` uses Pydantic AI's -`AgentStreamEvent` union, `agent_output` uses `{ "output": string }`, -`session_snapshot` uses `CompositorSessionSnapshot`, and `run_failed` uses -`{ "error": string, "reason": string | null }`. The session snapshot can be sent -as `session_snapshot` in a later create-run request with the same compositor layer -names and order. +Each event keeps the same envelope shape and has typed `data`: `run_started` uses +`{}`, `pydantic_ai_event` uses Pydantic AI's `AgentStreamEvent` union, +`run_succeeded` uses `{ "output": JsonValue, "session_snapshot": +CompositorSessionSnapshot }`, and `run_failed` uses `{ "error": string, +"reason": string | null }`. The session snapshot from `run_succeeded.data` can +be sent as `session_snapshot` in a later create-run request with the same +compositor layer names and order. ## Consumer examples diff --git a/dify-agent/docs/dify-agent/guide/index.md b/dify-agent/docs/dify-agent/guide/index.md index afff367b65..4e82e7c741 100644 --- a/dify-agent/docs/dify-agent/guide/index.md +++ b/dify-agent/docs/dify-agent/guide/index.md @@ -91,8 +91,8 @@ effective prompts are rejected during create-run validation before the run is persisted or scheduled. There is no Pydantic AI history layer. To resume Agenton layer state, pass the -`session_snapshot` emitted by a previous run together with a compositor that has -the same layer names and order. +`session_snapshot` from a previous `run_succeeded.data` payload together with a +compositor that has the same layer names and order. ## Observing runs @@ -107,12 +107,12 @@ progress: `id` is the event Redis Stream ID. `after` query cursors take precedence over `Last-Event-ID` headers. -Successful runs emit `run_started`, zero or more `pydantic_ai_event`, -`agent_output`, `session_snapshot`, and `run_succeeded`. Failed runs end with -`run_failed`. Event envelopes retain `id`, `run_id`, `type`, `data`, and -`created_at`; `data` is typed per event type, including Pydantic AI's -`AgentStreamEvent` payload for `pydantic_ai_event` and `CompositorSessionSnapshot` -for `session_snapshot`. +Successful runs emit `run_started`, zero or more `pydantic_ai_event`, and +`run_succeeded`. Failed runs end with `run_failed`. Event envelopes retain `id`, +`run_id`, `type`, `data`, and `created_at`; `data` is typed per event type, +including Pydantic AI's `AgentStreamEvent` payload for `pydantic_ai_event` and a +terminal `run_succeeded.data` object containing JSON-safe `output` plus a +`CompositorSessionSnapshot` for resumption. ## Examples diff --git a/dify-agent/examples/dify_agent/dify_agent_examples/run_server_consumer.py b/dify-agent/examples/dify_agent/dify_agent_examples/run_server_consumer.py index 4955629646..48c3168767 100644 --- a/dify-agent/examples/dify_agent/dify_agent_examples/run_server_consumer.py +++ b/dify-agent/examples/dify_agent/dify_agent_examples/run_server_consumer.py @@ -13,7 +13,10 @@ recover after client-side uncertainty. import asyncio +from agenton.compositor import CompositorConfig, LayerNodeConfig +from agenton_collections.layers.plain import PromptLayerConfig from dify_agent.client import Client +from dify_agent.protocol import AgentProfileConfig, CreateRunRequest API_BASE_URL = "http://localhost:8000" @@ -22,22 +25,21 @@ API_BASE_URL = "http://localhost:8000" async def main() -> None: async with Client(base_url=API_BASE_URL) as client: run = await client.create_run( - { - "compositor": { - "schema_version": 1, - "layers": [ - { - "name": "prompt", - "type": "plain.prompt", - "config": { - "prefix": "You are a concise assistant.", - "user": "Say hello from the Dify Agent API server example.", - }, - } + CreateRunRequest( + compositor=CompositorConfig( + layers=[ + LayerNodeConfig( + name="prompt", + type="plain.prompt", + config=PromptLayerConfig( + prefix="You are a concise assistant.", + user="Say hello from the Dify Agent API server example.", + ), + ) ], - }, - "agent_profile": {"provider": "test", "output_text": "Hello from the example TestModel."}, - } + ), + agent_profile=AgentProfileConfig(output_text="Hello from the example TestModel."), + ) ) print("created run", run) diff --git a/dify-agent/examples/dify_agent/dify_agent_examples/run_server_sync_client.py b/dify-agent/examples/dify_agent/dify_agent_examples/run_server_sync_client.py index 43f63de943..cf821c690c 100644 --- a/dify-agent/examples/dify_agent/dify_agent_examples/run_server_sync_client.py +++ b/dify-agent/examples/dify_agent/dify_agent_examples/run_server_sync_client.py @@ -5,7 +5,10 @@ does not retry ``POST /runs``; if a timeout occurs, inspect server state or crea a new run explicitly rather than assuming the original request was not accepted. """ +from agenton.compositor import CompositorConfig, LayerNodeConfig +from agenton_collections.layers.plain import PromptLayerConfig from dify_agent.client import Client +from dify_agent.protocol import AgentProfileConfig, CreateRunRequest API_BASE_URL = "http://localhost:8000" @@ -14,22 +17,21 @@ API_BASE_URL = "http://localhost:8000" def main() -> None: with Client(base_url=API_BASE_URL) as client: run = client.create_run_sync( - { - "compositor": { - "schema_version": 1, - "layers": [ - { - "name": "prompt", - "type": "plain.prompt", - "config": { - "prefix": "You are a concise assistant.", - "user": "Say hello from the synchronous Dify Agent client example.", - }, - } + CreateRunRequest( + compositor=CompositorConfig( + layers=[ + LayerNodeConfig( + name="prompt", + type="plain.prompt", + config=PromptLayerConfig( + prefix="You are a concise assistant.", + user="Say hello from the synchronous Dify Agent client example.", + ), + ) ], - }, - "agent_profile": {"provider": "test", "output_text": "Hello from the sync TestModel."}, - } + ), + agent_profile=AgentProfileConfig(output_text="Hello from the sync TestModel."), + ) ) print("created run", run) terminal = client.wait_run_sync(run.run_id, poll_interval_seconds=0.5) diff --git a/dify-agent/src/agenton/compositor/__init__.py b/dify-agent/src/agenton/compositor/__init__.py index 05fa1ccbde..95ce832635 100644 --- a/dify-agent/src/agenton/compositor/__init__.py +++ b/dify-agent/src/agenton/compositor/__init__.py @@ -18,8 +18,11 @@ collected in reverse. User prompts are collected from first to last layer so the composed user message preserves graph order. Serializable graph config uses registry type ids rather than import paths. -``CompositorBuilder`` resolves config nodes through ``LayerRegistry`` and can -mix those nodes with live layer instances for Python objects and callables. +``LayerNodeConfig.config`` accepts plain JSON values and ``LayerConfig`` DTO +instances; JSON serialization preserves concrete DTO fields before the builder +validates them with the registered layer schema. ``CompositorBuilder`` resolves +config nodes through ``LayerRegistry`` and can mix those nodes with live layer +instances for Python objects and callables. ``Compositor.enter`` enters layers in compositor order and exits them in reverse order through ``AsyncExitStack``. It accepts an optional ``CompositorSession`` @@ -43,7 +46,7 @@ from typing import Any, Generic, Mapping, TypedDict, cast from pydantic import BaseModel, ConfigDict, Field, JsonValue from typing_extensions import Self, TypeVar -from agenton.layers.base import Layer, LayerControl, LifecycleState +from agenton.layers.base import Layer, LayerConfig, LayerConfigValue, LayerControl, LifecycleState from agenton.layers.types import AllPromptTypes, AllToolTypes, AllUserPromptTypes PromptT = TypeVar("PromptT", default=AllPromptTypes) @@ -92,7 +95,7 @@ class LayerNodeConfig(BaseModel): name: str type: str - config: JsonValue = Field(default_factory=dict) + config: LayerConfigValue = Field(default_factory=dict) deps: Mapping[str, str] = Field(default_factory=dict) metadata: Mapping[str, JsonValue] = Field(default_factory=dict) @@ -126,7 +129,7 @@ class LayerDescriptor: type_id: str layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]] - config_type: type[BaseModel] + config_type: type[LayerConfig] runtime_state_type: type[BaseModel] runtime_handles_type: type[BaseModel] @@ -279,7 +282,7 @@ class CompositorBuilder: *, name: str, type: str, - config: object | None = None, + config: LayerConfigValue | None = None, deps: Mapping[str, str] | None = None, ) -> Self: """Resolve, validate, and add one registry-backed layer config node.""" diff --git a/dify-agent/src/agenton/layers/__init__.py b/dify-agent/src/agenton/layers/__init__.py index 4a24b7de66..d86695f17f 100644 --- a/dify-agent/src/agenton/layers/__init__.py +++ b/dify-agent/src/agenton/layers/__init__.py @@ -11,6 +11,8 @@ from agenton.layers.base import ( EmptyRuntimeState, ExitIntent, Layer, + LayerConfig, + LayerConfigValue, LayerControl, LayerDeps, LifecycleState, @@ -41,6 +43,8 @@ __all__ = [ "AllToolTypes", "AllUserPromptTypes", "Layer", + "LayerConfig", + "LayerConfigValue", "LayerDeps", "LayerControl", "LifecycleState", diff --git a/dify-agent/src/agenton/layers/base.py b/dify-agent/src/agenton/layers/base.py index fb5f6e1919..49570b86c3 100644 --- a/dify-agent/src/agenton/layers/base.py +++ b/dify-agent/src/agenton/layers/base.py @@ -9,6 +9,10 @@ serializable runtime state, and live runtime handles. The base class infers while still allowing subclasses to set them explicitly for unusual inheritance patterns. +``LayerConfig`` is the DTO base for config schemas that can be embedded directly +in serializable compositor config. Runtime state and handle schemas remain plain +Pydantic models because they are not accepted as graph input. + ``Layer.bind_deps`` is the mutation point for dependency state. Layer implementations should treat ``self.deps`` as unavailable until a compositor or caller has resolved and bound dependencies. @@ -40,7 +44,7 @@ from enum import StrEnum from types import UnionType from typing import Any, ClassVar, Generic, Mapping, Sequence, Union, cast, get_args, get_origin, get_type_hints -from pydantic import BaseModel, ConfigDict +from pydantic import BaseModel, ConfigDict, JsonValue, SerializeAsAny from typing_extensions import Self, TypeVar @@ -48,7 +52,24 @@ _DepsT = TypeVar("_DepsT", bound="LayerDeps") _PromptT = TypeVar("_PromptT") _UserPromptT = TypeVar("_UserPromptT") _ToolT = TypeVar("_ToolT") -_ConfigT = TypeVar("_ConfigT", bound=BaseModel, default="EmptyLayerConfig") + + +class LayerConfig(BaseModel): + """Base DTO for serializable layer configuration. + + Subclasses are safe to place in ``LayerNodeConfig.config``. The compositor + still accepts plain JSON values for wire input, but typed Python call sites can + use concrete ``LayerConfig`` subclasses and preserve their fields during JSON + serialization. + """ + + model_config = ConfigDict(extra="forbid") + + +type LayerConfigValue = JsonValue | SerializeAsAny[LayerConfig] + + +_ConfigT = TypeVar("_ConfigT", bound=LayerConfig, default="EmptyLayerConfig") _RuntimeStateT = TypeVar("_RuntimeStateT", bound=BaseModel, default="EmptyRuntimeState") _RuntimeHandlesT = TypeVar("_RuntimeHandlesT", bound=BaseModel, default="EmptyRuntimeHandles") @@ -93,7 +114,7 @@ class NoLayerDeps(LayerDeps): """Dependency container for layers that do not require other layers.""" -class EmptyLayerConfig(BaseModel): +class EmptyLayerConfig(LayerConfig): """Default serializable config schema for layers without config.""" model_config = ConfigDict(extra="forbid") @@ -194,7 +215,7 @@ class Layer( deps_type: type[_DepsT] deps: _DepsT type_id: ClassVar[str | None] = None - config_type: ClassVar[type[BaseModel]] = EmptyLayerConfig + config_type: ClassVar[type[LayerConfig]] = EmptyLayerConfig runtime_state_type: ClassVar[type[BaseModel]] = EmptyRuntimeState runtime_handles_type: ClassVar[type[BaseModel]] = EmptyRuntimeHandles @@ -213,7 +234,7 @@ class Layer( if not isinstance(deps_type, type) or not issubclass(deps_type, LayerDeps): raise TypeError(f"{cls.__name__}.deps_type must be a LayerDeps subclass.") _get_dep_specs(deps_type) - _init_schema_type(cls, "config_type", _infer_schema_type(cls, 4, "config_type"), EmptyLayerConfig) + _init_config_type(cls, _infer_config_type(cls)) _init_schema_type( cls, "runtime_state_type", @@ -421,6 +442,16 @@ def _infer_schema_type( return schema_type +def _infer_config_type(layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]]) -> type[LayerConfig] | None: + inferred = _infer_schema_generic_arg(layer_type, "config_type", {}) or _infer_layer_generic_arg(layer_type, 4, {}) + if inferred is None: + return None + config_type = _as_config_type(inferred) + if config_type is None: + raise TypeError(f"{layer_type.__name__}.config_type must be a LayerConfig subclass.") + return config_type + + def _infer_schema_generic_arg( layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]], attr_name: str, @@ -494,6 +525,18 @@ def _init_schema_type( raise TypeError(f"{layer_type.__name__}.{attr_name} must be a Pydantic BaseModel subclass.") +def _init_config_type( + layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]], + inferred_config_type: type[LayerConfig] | None, +) -> None: + config_type = layer_type.__dict__.get("config_type") + if config_type is None: + config_type = inferred_config_type or getattr(layer_type, "config_type", EmptyLayerConfig) + setattr(layer_type, "config_type", config_type) + if not isinstance(config_type, type) or not issubclass(config_type, LayerConfig): + raise TypeError(f"{layer_type.__name__}.config_type must be a LayerConfig subclass.") + + def _substitute_type(value: object, substitutions: Mapping[object, object]) -> object: if value in substitutions: return substitutions[value] @@ -542,6 +585,13 @@ def _as_model_type(value: object) -> type[BaseModel] | None: return None +def _as_config_type(value: object) -> type[LayerConfig] | None: + runtime_type = get_origin(value) or value + if isinstance(runtime_type, type) and issubclass(runtime_type, LayerConfig): + return runtime_type + return None + + def _is_generic_layer_template(layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]]) -> bool: return bool(getattr(layer_type, "__type_params__", ())) or bool( getattr(layer_type, "__parameters__", ()) diff --git a/dify-agent/src/agenton/layers/types.py b/dify-agent/src/agenton/layers/types.py index e84605f84b..2c9729d68b 100644 --- a/dify-agent/src/agenton/layers/types.py +++ b/dify-agent/src/agenton/layers/types.py @@ -6,7 +6,8 @@ contracts, such as ordinary strings with plain callable tools or pydantic-ai prompt/tool shapes. The families keep the trailing schema generic slots open so concrete layers can have ``config_type``, ``runtime_state_type``, and ``runtime_handles_type`` inferred from type arguments instead of repeated class -attributes. +attributes. Config schemas use ``LayerConfig`` so they can also be embedded as +typed DTOs in serializable compositor config. Tagged aggregate aliases cover code paths that can accept any supported prompt/tool family without changing the plain and pydantic-ai layer contracts. Pydantic-ai names are imported for static analysis only, so ``agenton`` can be @@ -29,7 +30,7 @@ if TYPE_CHECKING: from pydantic import BaseModel -from agenton.layers.base import EmptyLayerConfig, EmptyRuntimeHandles, EmptyRuntimeState, Layer, LayerDeps +from agenton.layers.base import EmptyLayerConfig, EmptyRuntimeHandles, EmptyRuntimeState, Layer, LayerConfig, LayerDeps type PlainPrompt = str type PlainUserPrompt = str @@ -95,7 +96,7 @@ type AllToolTypes = PlainToolType | PydanticAIToolType[Any] _DepsT = TypeVar("_DepsT", bound=LayerDeps) -_ConfigT = TypeVar("_ConfigT", bound=BaseModel, default=EmptyLayerConfig) +_ConfigT = TypeVar("_ConfigT", bound=LayerConfig, default=EmptyLayerConfig) _RuntimeStateT = TypeVar("_RuntimeStateT", bound=BaseModel, default=EmptyRuntimeState) _RuntimeHandlesT = TypeVar("_RuntimeHandlesT", bound=BaseModel, default=EmptyRuntimeHandles) _AgentDepsT = TypeVar("_AgentDepsT") diff --git a/dify-agent/src/agenton_collections/layers/plain/basic.py b/dify-agent/src/agenton_collections/layers/plain/basic.py index 5fbcfdbb11..6c10724175 100644 --- a/dify-agent/src/agenton_collections/layers/plain/basic.py +++ b/dify-agent/src/agenton_collections/layers/plain/basic.py @@ -10,13 +10,14 @@ from collections.abc import Callable, Sequence from dataclasses import dataclass, field from typing import Any -from pydantic import BaseModel, ConfigDict, Field +from pydantic import ConfigDict, Field +from typing_extensions import Self, override -from agenton.layers.base import NoLayerDeps +from agenton.layers.base import LayerConfig, NoLayerDeps from agenton.layers.types import PlainLayer -class PromptLayerConfig(BaseModel): +class PromptLayerConfig(LayerConfig): """Serializable config schema for ``PromptLayer``.""" prefix: list[str] | str = Field(default_factory=list) @@ -48,7 +49,8 @@ class PromptLayer(PlainLayer[NoLayerDeps, PromptLayerConfig]): suffix: list[str] | str = field(default_factory=list) @classmethod - def from_config(cls, config: BaseModel): + @override + def from_config(cls, config: PromptLayerConfig) -> Self: """Create a prompt layer from validated prompt config.""" validated_config = PromptLayerConfig.model_validate(config) return cls(prefix=validated_config.prefix, user=validated_config.user, suffix=validated_config.suffix) diff --git a/dify-agent/src/dify_agent/client/_client.py b/dify-agent/src/dify_agent/client/_client.py index a956f080c3..7760f356aa 100644 --- a/dify-agent/src/dify_agent/client/_client.py +++ b/dify-agent/src/dify_agent/client/_client.py @@ -2,11 +2,12 @@ The client uses the public DTOs from ``dify_agent.protocol.schemas`` for all normal request and response parsing. It intentionally does not retry -``POST /runs`` because create-run is not idempotent. SSE streams are the only -operation with reconnect logic: transient stream/connect/read failures, stream -timeouts, and HTTP 5xx stream responses reconnect with the latest observed event -id, while HTTP 4xx responses, DTO validation failures, and malformed SSE frames -fail immediately. +``POST /runs`` because create-run is not idempotent, and create helpers require a +``CreateRunRequest`` instance rather than accepting raw payload dicts. SSE +streams are the only operation with reconnect logic: transient stream, connect, +or read failures, stream timeouts, and HTTP 5xx stream responses reconnect with +the latest observed event id, while HTTP 4xx responses, DTO validation failures, +and malformed SSE frames fail immediately. """ from __future__ import annotations @@ -241,12 +242,12 @@ class Client: if self._owns_sync_http_client and self._sync_http_client is not None: self.close_sync() - async def create_run(self, request: CreateRunRequest | dict[str, object]) -> CreateRunResponse: + async def create_run(self, request: CreateRunRequest) -> CreateRunResponse: """Create one run and return its accepted status response. - Dict inputs are validated as ``CreateRunRequest`` before the request is - sent. This method performs exactly one ``POST /runs`` attempt and maps - HTTPX timeouts to ``DifyAgentTimeoutError``. + ``request`` must already be a public ``CreateRunRequest`` DTO. This + method performs exactly one ``POST /runs`` attempt and maps HTTPX + timeouts to ``DifyAgentTimeoutError``. """ request_model = _validate_create_run_request(request) try: @@ -262,7 +263,7 @@ class Client: raise DifyAgentClientError(f"create_run request failed: {exc}") from exc return _parse_model_response(response, CreateRunResponse) - def create_run_sync(self, request: CreateRunRequest | dict[str, object]) -> CreateRunResponse: + def create_run_sync(self, request: CreateRunRequest) -> CreateRunResponse: """Synchronous variant of ``create_run`` with the same no-retry contract.""" request_model = _validate_create_run_request(request) try: @@ -549,14 +550,11 @@ class Client: return headers -def _validate_create_run_request(request: CreateRunRequest | dict[str, object]) -> CreateRunRequest: - """Validate user input before creating a run.""" +def _validate_create_run_request(request: CreateRunRequest) -> CreateRunRequest: + """Reject raw payloads so create-run uses the public request DTO boundary.""" if isinstance(request, CreateRunRequest): return request - try: - return CreateRunRequest.model_validate(request) - except ValidationError as exc: - raise DifyAgentValidationError(detail=exc.errors(include_url=False)) from exc + raise DifyAgentValidationError(detail="request must be a CreateRunRequest") def _parse_model_response(response: httpx.Response, model_type: type[_ResponseModelT]) -> _ResponseModelT: diff --git a/dify-agent/src/dify_agent/protocol/__init__.py b/dify-agent/src/dify_agent/protocol/__init__.py index 4c00d7ffeb..1b92896495 100644 --- a/dify-agent/src/dify_agent/protocol/__init__.py +++ b/dify-agent/src/dify_agent/protocol/__init__.py @@ -2,8 +2,6 @@ from .schemas import ( RUN_EVENT_ADAPTER, - AgentOutputRunEvent, - AgentOutputRunEventData, AgentProfileConfig, BaseRunEvent, CreateRunRequest, @@ -19,14 +17,12 @@ from .schemas import ( RunStatus, RunStatusResponse, RunSucceededEvent, - SessionSnapshotRunEvent, + RunSucceededEventData, utc_now, ) __all__ = [ "AgentProfileConfig", - "AgentOutputRunEvent", - "AgentOutputRunEventData", "BaseRunEvent", "CreateRunRequest", "CreateRunResponse", @@ -42,6 +38,6 @@ __all__ = [ "RunStatus", "RunStatusResponse", "RunSucceededEvent", - "SessionSnapshotRunEvent", + "RunSucceededEventData", "utc_now", ] diff --git a/dify-agent/src/dify_agent/protocol/schemas.py b/dify-agent/src/dify_agent/protocol/schemas.py index b7628bb978..b40d7bc856 100644 --- a/dify-agent/src/dify_agent/protocol/schemas.py +++ b/dify-agent/src/dify_agent/protocol/schemas.py @@ -8,13 +8,16 @@ Redis stream ids (or in-memory equivalents in tests) are the public cursors used by polling and SSE replay. Event envelopes keep the public ``id``/``run_id``/``type``/``data``/``created_at`` shape, while each ``type`` has a typed ``data`` model so OpenAPI, Redis replay, and clients parse the same -payload contract. +payload contract. Successful runs publish the final JSON-safe agent output and +the resumable Agenton session snapshot together on the terminal +``run_succeeded`` event so consumers can treat terminal events as complete run +summaries. """ from datetime import datetime, timezone from typing import Annotated, ClassVar, Literal, TypeAlias -from pydantic import BaseModel, ConfigDict, Field, TypeAdapter +from pydantic import BaseModel, ConfigDict, Field, JsonValue, TypeAdapter from pydantic_ai.messages import AgentStreamEvent from agenton.compositor import CompositorConfig, CompositorSessionSnapshot @@ -24,8 +27,6 @@ RunStatus = Literal["running", "succeeded", "failed"] RunEventType = Literal[ "run_started", "pydantic_ai_event", - "agent_output", - "session_snapshot", "run_succeeded", "run_failed", ] @@ -86,10 +87,11 @@ class EmptyRunEventData(BaseModel): model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") -class AgentOutputRunEventData(BaseModel): - """Final agent output payload emitted before the session snapshot.""" +class RunSucceededEventData(BaseModel): + """Terminal success payload for final output and resumable session state.""" - output: str + output: JsonValue + session_snapshot: CompositorSessionSnapshot model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") @@ -127,25 +129,11 @@ class PydanticAIStreamRunEvent(BaseRunEvent): data: AgentStreamEvent -class AgentOutputRunEvent(BaseRunEvent): - """Run event carrying the final agent output string.""" - - type: Literal["agent_output"] = "agent_output" - data: AgentOutputRunEventData - - -class SessionSnapshotRunEvent(BaseRunEvent): - """Run event carrying the resumable Agenton session snapshot.""" - - type: Literal["session_snapshot"] = "session_snapshot" - data: CompositorSessionSnapshot - - class RunSucceededEvent(BaseRunEvent): - """Terminal success event emitted after output and session snapshot.""" + """Terminal success event carrying the complete successful run result.""" type: Literal["run_succeeded"] = "run_succeeded" - data: EmptyRunEventData = Field(default_factory=EmptyRunEventData) + data: RunSucceededEventData class RunFailedEvent(BaseRunEvent): @@ -158,8 +146,6 @@ class RunFailedEvent(BaseRunEvent): RunEvent: TypeAlias = Annotated[ RunStartedEvent | PydanticAIStreamRunEvent - | AgentOutputRunEvent - | SessionSnapshotRunEvent | RunSucceededEvent | RunFailedEvent, Field(discriminator="type"), @@ -179,8 +165,6 @@ class RunEventsResponse(BaseModel): __all__ = [ "AgentProfileConfig", - "AgentOutputRunEvent", - "AgentOutputRunEventData", "BaseRunEvent", "CreateRunRequest", "CreateRunResponse", @@ -196,6 +180,6 @@ __all__ = [ "RunStatus", "RunStatusResponse", "RunSucceededEvent", - "SessionSnapshotRunEvent", + "RunSucceededEventData", "utc_now", ] diff --git a/dify-agent/src/dify_agent/runtime/agent_factory.py b/dify-agent/src/dify_agent/runtime/agent_factory.py index 59ed9ef359..b3cb686bc1 100644 --- a/dify-agent/src/dify_agent/runtime/agent_factory.py +++ b/dify-agent/src/dify_agent/runtime/agent_factory.py @@ -2,7 +2,9 @@ The initial server exposes only a credential-free ``test`` profile. The factory keeps model selection out of ``AgentRunRunner`` so production model profiles can -be added without changing storage or HTTP contracts. +be added without changing storage or HTTP contracts. Agents are returned through +an ``object`` output boundary because the runner serializes final output to the +public JSON-safe event payload instead of assuming text-only results. """ from collections.abc import Sequence @@ -21,14 +23,17 @@ def create_agent( *, system_prompts: Sequence[PydanticAIPrompt[object]], tools: Sequence[PydanticAITool[object]], -) -> Agent[None, str]: +) -> Agent[None, object]: """Create the pydantic-ai agent for one run.""" if profile.provider == "test": - return Agent[None, str]( - TestModel(custom_output_text=profile.output_text), - output_type=str, - system_prompt=materialize_static_system_prompts(system_prompts), - tools=tools, + return cast( + Agent[None, object], + Agent[None, str]( + TestModel(custom_output_text=profile.output_text), + output_type=str, + system_prompt=materialize_static_system_prompts(system_prompts), + tools=tools, + ), ) raise ValueError(f"Unsupported agent profile provider: {profile.provider}") diff --git a/dify-agent/src/dify_agent/runtime/event_sink.py b/dify-agent/src/dify_agent/runtime/event_sink.py index 70658e26bd..6567189c69 100644 --- a/dify-agent/src/dify_agent/runtime/event_sink.py +++ b/dify-agent/src/dify_agent/runtime/event_sink.py @@ -2,18 +2,20 @@ The runner only needs append-only event writes and status transitions, so tests can use ``InMemoryRunEventSink`` without Redis. Production storage implements the -same protocol with Redis streams in ``dify_agent.storage.redis_run_store``. +same protocol with Redis streams in ``dify_agent.storage.redis_run_store``. The +terminal success helper writes the final JSON-safe output and session snapshot in +one event so event consumers can stop at ``run_succeeded`` without correlating +separate payload events. """ from collections import defaultdict from typing import Protocol +from pydantic import JsonValue from pydantic_ai.messages import AgentStreamEvent from agenton.compositor import CompositorSessionSnapshot from dify_agent.protocol.schemas import ( - AgentOutputRunEvent, - AgentOutputRunEventData, EmptyRunEventData, PydanticAIStreamRunEvent, RunEvent, @@ -22,7 +24,7 @@ from dify_agent.protocol.schemas import ( RunStartedEvent, RunStatus, RunSucceededEvent, - SessionSnapshotRunEvent, + RunSucceededEventData, utc_now, ) @@ -89,34 +91,24 @@ async def emit_pydantic_ai_event(sink: RunEventSink, *, run_id: str, data: Agent ) -async def emit_agent_output(sink: RunEventSink, *, run_id: str, output: str) -> str: - """Emit the final output text produced by the agent.""" +async def emit_run_succeeded( + sink: RunEventSink, + *, + run_id: str, + output: JsonValue, + session_snapshot: CompositorSessionSnapshot, +) -> str: + """Emit the terminal success event with output and resumable state.""" return await emit_run_event( sink, - event=AgentOutputRunEvent( + event=RunSucceededEvent( run_id=run_id, - data=AgentOutputRunEventData(output=output), + data=RunSucceededEventData(output=output, session_snapshot=session_snapshot), created_at=utc_now(), ), ) -async def emit_session_snapshot(sink: RunEventSink, *, run_id: str, data: CompositorSessionSnapshot) -> str: - """Emit the typed Agenton session snapshot for later resumption.""" - return await emit_run_event( - sink, - event=SessionSnapshotRunEvent(run_id=run_id, data=data, created_at=utc_now()), - ) - - -async def emit_run_succeeded(sink: RunEventSink, *, run_id: str) -> str: - """Emit the terminal success lifecycle event.""" - return await emit_run_event( - sink, - event=RunSucceededEvent(run_id=run_id, data=EmptyRunEventData(), created_at=utc_now()), - ) - - async def emit_run_failed( sink: RunEventSink, *, @@ -134,11 +126,9 @@ async def emit_run_failed( __all__ = [ "InMemoryRunEventSink", "RunEventSink", - "emit_agent_output", "emit_pydantic_ai_event", "emit_run_event", "emit_run_failed", "emit_run_started", "emit_run_succeeded", - "emit_session_snapshot", ] diff --git a/dify-agent/src/dify_agent/runtime/runner.py b/dify-agent/src/dify_agent/runtime/runner.py index 102ec7c2de..6e660123fb 100644 --- a/dify-agent/src/dify_agent/runtime/runner.py +++ b/dify-agent/src/dify_agent/runtime/runner.py @@ -3,11 +3,15 @@ The runner is storage-agnostic: it builds an Agenton compositor, enters or resumes its session, runs pydantic-ai with ``compositor.user_prompts`` as the user input, emits stream events, suspends the session on exit, snapshots it, and then -publishes a terminal success or failure event. +publishes a terminal success or failure event. Successful terminal events contain +both the JSON-safe final output and session snapshot; there are no separate output +or snapshot events to correlate. """ from collections.abc import AsyncIterable +from typing import cast +from pydantic import JsonValue, TypeAdapter from pydantic_ai.messages import AgentStreamEvent from agenton.compositor import CompositorSessionSnapshot @@ -16,16 +20,17 @@ from dify_agent.runtime.agent_factory import create_agent, normalize_user_input from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor from dify_agent.runtime.event_sink import ( RunEventSink, - emit_agent_output, emit_pydantic_ai_event, emit_run_failed, emit_run_started, emit_run_succeeded, - emit_session_snapshot, ) from dify_agent.runtime.user_prompt_validation import EMPTY_USER_PROMPTS_ERROR, has_non_blank_user_prompt +_AGENT_OUTPUT_ADAPTER = TypeAdapter(object) + + class AgentRunValidationError(ValueError): """Raised when a run request is valid JSON but cannot execute.""" @@ -56,12 +61,15 @@ class AgentRunRunner: await self.sink.update_status(self.run_id, "failed", message) raise - _ = await emit_agent_output(self.sink, run_id=self.run_id, output=output) - _ = await emit_session_snapshot(self.sink, run_id=self.run_id, data=session_snapshot) - _ = await emit_run_succeeded(self.sink, run_id=self.run_id) + _ = await emit_run_succeeded( + self.sink, + run_id=self.run_id, + output=output, + session_snapshot=session_snapshot, + ) await self.sink.update_status(self.run_id, "succeeded") - async def _run_agent(self) -> tuple[str, CompositorSessionSnapshot]: + async def _run_agent(self) -> tuple[JsonValue, CompositorSessionSnapshot]: """Run pydantic-ai inside an entered Agenton session.""" compositor = build_pydantic_ai_compositor(self.request.compositor) session = ( @@ -86,7 +94,12 @@ class AgentRunRunner: ) result = await agent.run(normalize_user_input(user_prompts), event_stream_handler=handle_events) - return result.output, compositor.snapshot_session(session) + return _serialize_agent_output(result.output), compositor.snapshot_session(session) + + +def _serialize_agent_output(output: object) -> JsonValue: + """Convert arbitrary pydantic-ai output into the public JSON-safe payload type.""" + return cast(JsonValue, _AGENT_OUTPUT_ADAPTER.dump_python(output, mode="json")) __all__ = ["AgentRunRunner", "AgentRunValidationError"] diff --git a/dify-agent/tests/local/agenton/compositor/test_builder_snapshot.py b/dify-agent/tests/local/agenton/compositor/test_builder_snapshot.py index 6b16019846..86a90f43c5 100644 --- a/dify-agent/tests/local/agenton/compositor/test_builder_snapshot.py +++ b/dify-agent/tests/local/agenton/compositor/test_builder_snapshot.py @@ -5,9 +5,9 @@ from dataclasses import dataclass from pydantic import BaseModel, ConfigDict, ValidationError from typing_extensions import override -from agenton.compositor import Compositor, CompositorBuilder, CompositorSession, LayerRegistry +from agenton.compositor import Compositor, CompositorBuilder, CompositorSession, LayerNodeConfig, LayerRegistry from agenton.layers import EmptyLayerConfig, LayerControl, LayerDeps, NoLayerDeps, PlainLayer, PlainPromptType, PlainToolType -from agenton_collections.layers.plain import ObjectLayer, PromptLayer +from agenton_collections.layers.plain import ObjectLayer, PromptLayer, PromptLayerConfig def test_registry_infers_descriptor_and_rejects_duplicate_or_missing_type_id() -> None: @@ -74,6 +74,23 @@ def test_builder_creates_config_layers_with_typed_validation() -> None: raise AssertionError("Expected ValidationError.") +def test_layer_node_config_accepts_config_dto_and_serializes_fields() -> None: + registry = LayerRegistry() + registry.register_layer(PromptLayer) + node = LayerNodeConfig( + name="prompt", + type="plain.prompt", + config=PromptLayerConfig(prefix="hello", user="ask politely"), + ) + + dumped = node.model_dump(mode="json") + compositor = CompositorBuilder(registry).add_config({"layers": [dumped]}).build() + + assert dumped["config"] == {"prefix": "hello", "user": "ask politely", "suffix": []} + assert [prompt.value for prompt in compositor.prompts] == ["hello"] + assert [prompt.value for prompt in compositor.user_prompts] == ["ask politely"] + + class ObjectConsumerDeps(LayerDeps): obj: ObjectLayer[str] # pyright: ignore[reportUninitializedInstanceVariable] diff --git a/dify-agent/tests/local/agenton/layers/test_schema_inference.py b/dify-agent/tests/local/agenton/layers/test_schema_inference.py index 45eedf3bc5..501a2209c4 100644 --- a/dify-agent/tests/local/agenton/layers/test_schema_inference.py +++ b/dify-agent/tests/local/agenton/layers/test_schema_inference.py @@ -3,10 +3,18 @@ from dataclasses import dataclass from pydantic import BaseModel, ConfigDict from agenton.compositor import LayerRegistry -from agenton.layers import EmptyLayerConfig, EmptyRuntimeHandles, EmptyRuntimeState, LayerControl, NoLayerDeps, PlainLayer +from agenton.layers import ( + EmptyLayerConfig, + EmptyRuntimeHandles, + EmptyRuntimeState, + LayerConfig, + LayerControl, + NoLayerDeps, + PlainLayer, +) -class InferredConfig(BaseModel): +class InferredConfig(LayerConfig): value: str = "configured" model_config = ConfigDict(extra="forbid") @@ -68,7 +76,7 @@ def test_invalid_declared_schema_type_is_rejected_clearly() -> None: config_type = dict # pyright: ignore[reportAssignmentType] except TypeError as e: - assert str(e) == "InvalidSchemaLayer.config_type must be a Pydantic BaseModel subclass." + assert str(e) == "InvalidSchemaLayer.config_type must be a LayerConfig subclass." else: raise AssertionError("Expected TypeError.") @@ -78,7 +86,7 @@ def test_invalid_declared_schema_type_is_rejected_clearly() -> None: pass except TypeError as e: - assert str(e) == "InvalidGenericSchemaLayer.config_type must be a Pydantic BaseModel subclass." + assert str(e) == "InvalidGenericSchemaLayer.config_type must be a LayerConfig subclass." else: raise AssertionError("Expected TypeError.") diff --git a/dify-agent/tests/local/dify_agent/client/test_client.py b/dify-agent/tests/local/dify_agent/client/test_client.py index aa9eba527f..56e9a71d99 100644 --- a/dify-agent/tests/local/dify_agent/client/test_client.py +++ b/dify-agent/tests/local/dify_agent/client/test_client.py @@ -9,6 +9,7 @@ from typing import cast, override import httpx import pytest +from agenton.compositor import CompositorSessionSnapshot from dify_agent.client import ( Client, DifyAgentHTTPError, @@ -19,12 +20,12 @@ from dify_agent.client import ( ) from dify_agent.protocol.schemas import ( CreateRunRequest, - EmptyRunEventData, RUN_EVENT_ADAPTER, RunEvent, RunEventsResponse, RunStartedEvent, RunSucceededEvent, + RunSucceededEventData, ) @@ -47,6 +48,14 @@ def _event_frame(event: RunEvent, *, event_id: str | None = None, exclude_id: bo return "\n".join(lines) + "\n\n" +def _run_succeeded_event(*, event_id: str = "2-0", run_id: str = "run-1") -> RunSucceededEvent: + return RunSucceededEvent( + id=event_id, + run_id=run_id, + data=RunSucceededEventData(output="done", session_snapshot=CompositorSessionSnapshot(layers=[])), + ) + + def _run_status_json(status: str) -> dict[str, object]: now = datetime(2026, 5, 11, tzinfo=UTC).isoformat() return {"run_id": "run-1", "status": status, "created_at": now, "updated_at": now, "error": None} @@ -64,7 +73,7 @@ class DisconnectingSyncStream(httpx.SyncByteStream): raise httpx.ReadError("stream disconnected") -def test_sync_methods_parse_protocol_dtos_and_validate_create_dict() -> None: +def test_sync_methods_parse_protocol_dtos_and_send_create_request_dto() -> None: def handler(request: httpx.Request) -> httpx.Response: if request.method == "POST" and request.url.path == "/runs": payload = cast(dict[str, object], json.loads(request.content)) @@ -83,7 +92,7 @@ def test_sync_methods_parse_protocol_dtos_and_validate_create_dict() -> None: 200, json={ "run_id": "run-1", - "events": [cast(object, json.loads(RUN_EVENT_ADAPTER.dump_json(event)))], + "events": [cast(object, json.loads(RUN_EVENT_ADAPTER.dump_json(event)))], "next_cursor": "1-0", }, ) @@ -92,7 +101,7 @@ def test_sync_methods_parse_protocol_dtos_and_validate_create_dict() -> None: http_client = httpx.Client(transport=httpx.MockTransport(handler)) client = Client(base_url="http://testserver", sync_http_client=http_client) - created = client.create_run_sync(_create_run_payload()) + created = client.create_run_sync(CreateRunRequest.model_validate(_create_run_payload())) status = client.get_run_sync(created.run_id) events = client.get_events_sync(created.run_id, after="0-0", limit=10) @@ -162,7 +171,7 @@ def test_error_mapping_and_create_run_input_validation() -> None: assert server_error.value.status_code == 500 with pytest.raises(DifyAgentValidationError): - _ = client.create_run_sync({"unknown": "field"}) + _ = client.create_run_sync({"unknown": "field"}) # pyright: ignore[reportArgumentType] def test_http_timeout_maps_to_client_timeout_error() -> None: @@ -192,7 +201,7 @@ def test_create_run_is_not_retried_after_timeout() -> None: ) with pytest.raises(DifyAgentTimeoutError): - _ = client.create_run_sync(_create_run_payload()) + _ = client.create_run_sync(CreateRunRequest.model_validate(_create_run_payload())) assert attempts == 1 @@ -221,7 +230,7 @@ def test_stream_events_stops_after_terminal_event() -> None: body = "".join( [ _event_frame(RunStartedEvent(id="1-0", run_id="run-1")), - _event_frame(RunSucceededEvent(id="2-0", run_id="run-1", data=EmptyRunEventData())), + _event_frame(_run_succeeded_event()), ] ) @@ -251,7 +260,7 @@ def test_stream_events_reconnects_from_latest_event_id() -> None: 200, stream=DisconnectingSyncStream(_event_frame(RunStartedEvent(id="1-0", run_id="run-1"))), ) - return httpx.Response(200, content=_event_frame(RunSucceededEvent(id="2-0", run_id="run-1"))) + return httpx.Response(200, content=_event_frame(_run_succeeded_event())) client = Client( base_url="http://testserver", @@ -271,7 +280,7 @@ def test_stream_events_reconnects_after_http_5xx_response() -> None: seen_after.append(request.url.params["after"]) if len(seen_after) == 1: return httpx.Response(503, json={"detail": "temporarily unavailable"}) - return httpx.Response(200, content=_event_frame(RunSucceededEvent(id="2-0", run_id="run-1"))) + return httpx.Response(200, content=_event_frame(_run_succeeded_event())) client = Client( base_url="http://testserver", @@ -321,7 +330,7 @@ def test_malformed_sse_frame_does_not_reconnect() -> None: def test_async_stream_events_yields_terminal_event() -> None: - body = _event_frame(RunSucceededEvent(id="2-0", run_id="run-1")) + body = _event_frame(_run_succeeded_event()) def handler(_request: httpx.Request) -> httpx.Response: return httpx.Response(200, content=body) @@ -345,7 +354,7 @@ def test_async_stream_events_reconnects_after_http_5xx_response() -> None: seen_after.append(request.url.params["after"]) if len(seen_after) == 1: return httpx.Response(502, json={"detail": "bad gateway"}) - return httpx.Response(200, content=_event_frame(RunSucceededEvent(id="2-0", run_id="run-1"))) + return httpx.Response(200, content=_event_frame(_run_succeeded_event())) async def scenario() -> None: http_client = httpx.AsyncClient(transport=httpx.MockTransport(handler)) @@ -368,7 +377,7 @@ def test_stream_timeout_can_reconnect_until_terminal() -> None: calls += 1 if calls == 1: raise httpx.ReadTimeout("stream stalled", request=request) - return httpx.Response(200, content=_event_frame(RunSucceededEvent(id="2-0", run_id="run-1"))) + return httpx.Response(200, content=_event_frame(_run_succeeded_event())) client = Client( base_url="http://testserver", diff --git a/dify-agent/tests/local/dify_agent/protocol/test_protocol_schemas.py b/dify-agent/tests/local/dify_agent/protocol/test_protocol_schemas.py index 0a52316bbb..9f53c08b4e 100644 --- a/dify-agent/tests/local/dify_agent/protocol/test_protocol_schemas.py +++ b/dify-agent/tests/local/dify_agent/protocol/test_protocol_schemas.py @@ -1,13 +1,16 @@ +import pytest +from pydantic import ValidationError from pydantic_ai.messages import FinalResultEvent +from agenton.compositor import CompositorSessionSnapshot from dify_agent.protocol.schemas import ( RUN_EVENT_ADAPTER, - AgentOutputRunEvent, - AgentOutputRunEventData, PydanticAIStreamRunEvent, RunFailedEvent, RunFailedEventData, RunStartedEvent, + RunSucceededEvent, + RunSucceededEventData, ) @@ -15,7 +18,13 @@ def test_run_event_adapter_round_trips_typed_variants() -> None: events = [ RunStartedEvent(run_id="run-1"), PydanticAIStreamRunEvent(run_id="run-1", data=FinalResultEvent(tool_name=None, tool_call_id=None)), - AgentOutputRunEvent(run_id="run-1", data=AgentOutputRunEventData(output="done")), + RunSucceededEvent( + run_id="run-1", + data=RunSucceededEventData( + output={"answer": ["done"]}, + session_snapshot=CompositorSessionSnapshot(layers=[]), + ), + ), RunFailedEvent(run_id="run-1", data=RunFailedEventData(error="boom", reason="shutdown")), ] @@ -38,3 +47,9 @@ def test_pydantic_ai_event_data_uses_agent_stream_event_model() -> None: assert isinstance(event, PydanticAIStreamRunEvent) assert isinstance(event.data, FinalResultEvent) + + +@pytest.mark.parametrize("event_type", ["agent_output", "session_snapshot"]) +def test_removed_non_terminal_payload_events_are_rejected(event_type: str) -> None: + with pytest.raises(ValidationError): + _ = RUN_EVENT_ADAPTER.validate_python({"run_id": "run-1", "type": event_type, "data": {}}) diff --git a/dify-agent/tests/local/dify_agent/runtime/test_runner.py b/dify-agent/tests/local/dify_agent/runtime/test_runner.py index 92930ccb18..b79e96c929 100644 --- a/dify-agent/tests/local/dify_agent/runtime/test_runner.py +++ b/dify-agent/tests/local/dify_agent/runtime/test_runner.py @@ -3,7 +3,7 @@ import asyncio import pytest from agenton.compositor import CompositorConfig, LayerNodeConfig -from dify_agent.protocol.schemas import AgentProfileConfig, CreateRunRequest +from dify_agent.protocol.schemas import AgentProfileConfig, CreateRunRequest, RunSucceededEvent from dify_agent.runtime.event_sink import InMemoryRunEventSink from dify_agent.runtime.runner import AgentRunRunner, AgentRunValidationError @@ -28,7 +28,13 @@ def test_runner_emits_terminal_success_and_snapshot() -> None: event_types = [event.type for event in sink.events["run-1"]] assert event_types[0] == "run_started" assert "pydantic_ai_event" in event_types - assert event_types[-3:] == ["agent_output", "session_snapshot", "run_succeeded"] + assert "agent_output" not in event_types + assert "session_snapshot" not in event_types + assert event_types[-1:] == ["run_succeeded"] + terminal = sink.events["run-1"][-1] + assert isinstance(terminal, RunSucceededEvent) + assert terminal.data.output == "done" + assert [layer.name for layer in terminal.data.session_snapshot.layers] == ["prompt"] assert sink.statuses["run-1"] == "succeeded" diff --git a/dify-agent/tests/local/dify_agent/storage/test_redis_run_store.py b/dify-agent/tests/local/dify_agent/storage/test_redis_run_store.py index 2be665df22..518e9b4bec 100644 --- a/dify-agent/tests/local/dify_agent/storage/test_redis_run_store.py +++ b/dify-agent/tests/local/dify_agent/storage/test_redis_run_store.py @@ -1,8 +1,12 @@ import asyncio from collections.abc import Mapping +from typing import cast -from agenton.compositor import CompositorConfig, LayerNodeConfig -from dify_agent.protocol.schemas import CreateRunRequest, RunStartedEvent +from pydantic import JsonValue + +from agenton.compositor import CompositorConfig, CompositorSessionSnapshot, LayerNodeConfig, LayerSessionSnapshot +from agenton.layers import LifecycleState +from dify_agent.protocol.schemas import CreateRunRequest, RunStartedEvent, RunSucceededEvent, RunSucceededEventData from dify_agent.storage.redis_run_store import DEFAULT_RUN_RETENTION_SECONDS, RedisRunStore @@ -17,10 +21,12 @@ def _request() -> CreateRunRequest: class FakeRedis: commands: list[tuple[object, ...]] values: dict[str, object] + streams: dict[str, list[tuple[str, dict[str, object]]]] def __init__(self) -> None: self.commands = [] self.values = {} + self.streams = {} async def set(self, key: str, value: object, *, ex: int | None = None) -> None: self.commands.append(("set", key, value, ex)) @@ -32,12 +38,37 @@ class FakeRedis: async def xadd(self, key: str, fields: Mapping[str, object]) -> str: self.commands.append(("xadd", key, dict(fields))) - return "1-0" + entries = self.streams.setdefault(key, []) + event_id = f"{len(entries) + 1}-0" + entries.append((event_id, dict(fields))) + return event_id + + async def xrange(self, key: str, *, min: str = "-", count: int | None = None) -> list[tuple[str, dict[str, object]]]: + self.commands.append(("xrange", key, min, count)) + entries = [entry for entry in self.streams.get(key, []) if self._is_after_min(entry[0], min)] + if count is not None: + return entries[:count] + return entries async def expire(self, key: str, seconds: int) -> bool: self.commands.append(("expire", key, seconds)) return True + @staticmethod + def _is_after_min(event_id: str, min_id: str) -> bool: + if min_id == "-": + return True + is_exclusive = min_id.startswith("(") + cursor = min_id[1:] if is_exclusive else min_id + event_value = FakeRedis._stream_id_value(event_id) + cursor_value = FakeRedis._stream_id_value(cursor) + return event_value > cursor_value if is_exclusive else event_value >= cursor_value + + @staticmethod + def _stream_id_value(event_id: str) -> tuple[int, int]: + timestamp, sequence = event_id.split("-", maxsplit=1) + return int(timestamp), int(sequence) + def test_create_run_writes_running_record_without_job_queue_and_with_retention() -> None: redis = FakeRedis() @@ -80,3 +111,39 @@ def test_append_event_serializes_typed_event_without_id_and_expires_run_keys() - ("expire", "test:runs:run-1:events", 60), ("expire", "test:runs:run-1:record", 60), ] + + +def test_get_events_round_trips_run_succeeded_output_and_session_snapshot() -> None: + redis = FakeRedis() + store = RedisRunStore(redis, prefix="test", run_retention_seconds=60) # pyright: ignore[reportArgumentType] + output = cast(JsonValue, {"answer": ["done", 1], "ok": True}) + session_snapshot = CompositorSessionSnapshot( + layers=[ + LayerSessionSnapshot( + name="prompt", + state=LifecycleState.SUSPENDED, + runtime_state={"resource_id": "abc"}, + ) + ] + ) + + async def scenario() -> tuple[str, RunSucceededEvent]: + record = await store.create_run(_request()) + event_id = await store.append_event( + RunSucceededEvent( + id="local-only", + run_id=record.run_id, + data=RunSucceededEventData(output=output, session_snapshot=session_snapshot), + ) + ) + page = await store.get_events(record.run_id, after="0-0", limit=10) + decoded = page.events[0] + assert isinstance(decoded, RunSucceededEvent) + assert page.next_cursor == event_id + return event_id, decoded + + event_id, decoded = asyncio.run(scenario()) + + assert decoded.id == event_id + assert decoded.data.output == output + assert decoded.data.session_snapshot == session_snapshot