refactor dify-agent run success protocol

This commit is contained in:
盐粒 Yanli 2026-05-12 01:46:39 +08:00
parent 15f5c7064e
commit 8d96f6cfbd
23 changed files with 385 additions and 208 deletions

View File

@ -12,7 +12,7 @@ Framework-neutral base class for prompt/tool layers.
Class attributes:
- `type_id: str | None`: registry id for config-backed plugin layers.
- `config_type: type[BaseModel]`: Pydantic schema for serialized layer config.
- `config_type: type[LayerConfig]`: Pydantic schema for serialized layer config.
- `runtime_state_type: type[BaseModel]`: Pydantic schema for snapshot-safe
per-session state.
- `runtime_handles_type: type[BaseModel]`: Pydantic schema for live runtime
@ -74,6 +74,8 @@ serialized and should be rehydrated from runtime state in resume hooks.
### Schema defaults and lifecycle enums
- `EmptyLayerConfig`
- `LayerConfig`: base DTO for serializable layer config schemas
- `LayerConfigValue`: JSON value or concrete `LayerConfig` DTO
- `EmptyRuntimeState`
- `EmptyRuntimeHandles`
- `LifecycleState`: `NEW`, `ACTIVE`, `SUSPENDED`, `CLOSED`
@ -97,8 +99,9 @@ Tagged aggregate item types:
- `LayerNodeConfig`: `name`, `type`, `config`, `deps`, `metadata`
- `CompositorConfig`: `schema_version`, `layers`
Config nodes are pure serializable graph input. Use live instances for Python
objects and callables.
Config nodes are pure serializable graph input. `LayerNodeConfig.config` accepts
plain JSON values or concrete `LayerConfig` DTO instances and serializes DTOs as
JSON objects. Use live instances for Python objects and callables.
### Registry

View File

@ -8,8 +8,8 @@ on the `LayerControl` created for that layer in a `CompositorSession`.
## Config, runtime state, and runtime handles
- **Config** is serializable graph input. Config-constructible layers declare a
`type_id` and a Pydantic `config_type`; builders validate node config before
calling `Layer.from_config(validated_config)`.
`type_id` and a Pydantic `LayerConfig` schema; builders validate node config
before calling `Layer.from_config(validated_config)`.
- **Runtime state** is serializable per-layer/per-session state. Layers declare a
Pydantic `runtime_state_type`; session snapshots persist this model with
`model_dump(mode="json")`.
@ -20,11 +20,11 @@ on the `LayerControl` created for that layer in a `CompositorSession`.
## Define a config-backed layer
Use a Pydantic model for config and pass it through the typed layer family so
Use a `LayerConfig` model for config and pass it through the typed layer family so
`Layer.__init_subclass__` can infer the schema:
```python {test="skip" lint="skip"}
class GreetingConfig(BaseModel):
class GreetingConfig(LayerConfig):
prefix: str
model_config = ConfigDict(extra="forbid")

View File

@ -164,36 +164,39 @@ Use `dify_agent.client.Client` for both async and sync code. Async methods use
normal names; sync methods add `_sync`.
```python {test="skip" lint="skip"}
from agenton.compositor import CompositorConfig, LayerNodeConfig
from agenton_collections.layers.plain import PromptLayerConfig
from dify_agent.client import Client
from dify_agent.protocol import CreateRunRequest
async def main() -> None:
async with Client(base_url="http://localhost:8000") as client:
run = await client.create_run(
{
"compositor": {
"schema_version": 1,
"layers": [{"name": "prompt", "type": "plain.prompt", "config": {"user": "hello"}}],
}
}
request = CreateRunRequest(
compositor=CompositorConfig(
layers=[LayerNodeConfig(name="prompt", type="plain.prompt", config=PromptLayerConfig(user="hello"))]
)
)
async with Client(base_url="http://localhost:8000") as client:
run = await client.create_run(request)
async for event in client.stream_events(run.run_id):
print(event)
```
```python {test="skip" lint="skip"}
from agenton.compositor import CompositorConfig, LayerNodeConfig
from agenton_collections.layers.plain import PromptLayerConfig
from dify_agent.client import Client
from dify_agent.protocol import CreateRunRequest
request = CreateRunRequest(
compositor=CompositorConfig(
layers=[LayerNodeConfig(name="prompt", type="plain.prompt", config=PromptLayerConfig(user="hello"))]
)
)
with Client(base_url="http://localhost:8000") as client:
run = client.create_run_sync(
{
"compositor": {
"schema_version": 1,
"layers": [{"name": "prompt", "type": "plain.prompt", "config": {"user": "hello"}}],
}
}
)
run = client.create_run_sync(request)
terminal = client.wait_run_sync(run.run_id)
```
@ -201,8 +204,9 @@ with Client(base_url="http://localhost:8000") as client:
They reconnect by default from the latest yielded event id and stop after
`run_succeeded` or `run_failed`. They do not reconnect for HTTP 4xx responses,
DTO validation failures, or malformed SSE frames. `create_run` and
`create_run_sync` never retry `POST /runs`; if a timeout occurs, the caller must
decide whether to inspect existing runs or submit a new run.
`create_run_sync` require a `CreateRunRequest` DTO and never retry `POST /runs`;
if a timeout occurs, the caller must decide whether to inspect existing runs or
submit a new run.
## Event types and order
@ -210,9 +214,7 @@ A normal successful run emits:
1. `run_started`
2. zero or more `pydantic_ai_event`
3. `agent_output`
4. `session_snapshot`
5. `run_succeeded`
3. `run_succeeded`
A failed run emits:
@ -220,13 +222,13 @@ A failed run emits:
2. zero or more `pydantic_ai_event`
3. `run_failed`
Each event keeps the same envelope shape and has typed `data`: `run_started` and
`run_succeeded` use `{}`, `pydantic_ai_event` uses Pydantic AI's
`AgentStreamEvent` union, `agent_output` uses `{ "output": string }`,
`session_snapshot` uses `CompositorSessionSnapshot`, and `run_failed` uses
`{ "error": string, "reason": string | null }`. The session snapshot can be sent
as `session_snapshot` in a later create-run request with the same compositor layer
names and order.
Each event keeps the same envelope shape and has typed `data`: `run_started` uses
`{}`, `pydantic_ai_event` uses Pydantic AI's `AgentStreamEvent` union,
`run_succeeded` uses `{ "output": JsonValue, "session_snapshot":
CompositorSessionSnapshot }`, and `run_failed` uses `{ "error": string,
"reason": string | null }`. The session snapshot from `run_succeeded.data` can
be sent as `session_snapshot` in a later create-run request with the same
compositor layer names and order.
## Consumer examples

View File

@ -91,8 +91,8 @@ effective prompts are rejected during create-run validation before the run is
persisted or scheduled.
There is no Pydantic AI history layer. To resume Agenton layer state, pass the
`session_snapshot` emitted by a previous run together with a compositor that has
the same layer names and order.
`session_snapshot` from a previous `run_succeeded.data` payload together with a
compositor that has the same layer names and order.
## Observing runs
@ -107,12 +107,12 @@ progress:
`id` is the event Redis Stream ID. `after` query cursors take precedence over
`Last-Event-ID` headers.
Successful runs emit `run_started`, zero or more `pydantic_ai_event`,
`agent_output`, `session_snapshot`, and `run_succeeded`. Failed runs end with
`run_failed`. Event envelopes retain `id`, `run_id`, `type`, `data`, and
`created_at`; `data` is typed per event type, including Pydantic AI's
`AgentStreamEvent` payload for `pydantic_ai_event` and `CompositorSessionSnapshot`
for `session_snapshot`.
Successful runs emit `run_started`, zero or more `pydantic_ai_event`, and
`run_succeeded`. Failed runs end with `run_failed`. Event envelopes retain `id`,
`run_id`, `type`, `data`, and `created_at`; `data` is typed per event type,
including Pydantic AI's `AgentStreamEvent` payload for `pydantic_ai_event` and a
terminal `run_succeeded.data` object containing JSON-safe `output` plus a
`CompositorSessionSnapshot` for resumption.
## Examples

View File

@ -13,7 +13,10 @@ recover after client-side uncertainty.
import asyncio
from agenton.compositor import CompositorConfig, LayerNodeConfig
from agenton_collections.layers.plain import PromptLayerConfig
from dify_agent.client import Client
from dify_agent.protocol import AgentProfileConfig, CreateRunRequest
API_BASE_URL = "http://localhost:8000"
@ -22,22 +25,21 @@ API_BASE_URL = "http://localhost:8000"
async def main() -> None:
async with Client(base_url=API_BASE_URL) as client:
run = await client.create_run(
{
"compositor": {
"schema_version": 1,
"layers": [
{
"name": "prompt",
"type": "plain.prompt",
"config": {
"prefix": "You are a concise assistant.",
"user": "Say hello from the Dify Agent API server example.",
},
}
CreateRunRequest(
compositor=CompositorConfig(
layers=[
LayerNodeConfig(
name="prompt",
type="plain.prompt",
config=PromptLayerConfig(
prefix="You are a concise assistant.",
user="Say hello from the Dify Agent API server example.",
),
)
],
},
"agent_profile": {"provider": "test", "output_text": "Hello from the example TestModel."},
}
),
agent_profile=AgentProfileConfig(output_text="Hello from the example TestModel."),
)
)
print("created run", run)

View File

@ -5,7 +5,10 @@ does not retry ``POST /runs``; if a timeout occurs, inspect server state or crea
a new run explicitly rather than assuming the original request was not accepted.
"""
from agenton.compositor import CompositorConfig, LayerNodeConfig
from agenton_collections.layers.plain import PromptLayerConfig
from dify_agent.client import Client
from dify_agent.protocol import AgentProfileConfig, CreateRunRequest
API_BASE_URL = "http://localhost:8000"
@ -14,22 +17,21 @@ API_BASE_URL = "http://localhost:8000"
def main() -> None:
with Client(base_url=API_BASE_URL) as client:
run = client.create_run_sync(
{
"compositor": {
"schema_version": 1,
"layers": [
{
"name": "prompt",
"type": "plain.prompt",
"config": {
"prefix": "You are a concise assistant.",
"user": "Say hello from the synchronous Dify Agent client example.",
},
}
CreateRunRequest(
compositor=CompositorConfig(
layers=[
LayerNodeConfig(
name="prompt",
type="plain.prompt",
config=PromptLayerConfig(
prefix="You are a concise assistant.",
user="Say hello from the synchronous Dify Agent client example.",
),
)
],
},
"agent_profile": {"provider": "test", "output_text": "Hello from the sync TestModel."},
}
),
agent_profile=AgentProfileConfig(output_text="Hello from the sync TestModel."),
)
)
print("created run", run)
terminal = client.wait_run_sync(run.run_id, poll_interval_seconds=0.5)

View File

@ -18,8 +18,11 @@ collected in reverse. User prompts are collected from first to last layer so the
composed user message preserves graph order.
Serializable graph config uses registry type ids rather than import paths.
``CompositorBuilder`` resolves config nodes through ``LayerRegistry`` and can
mix those nodes with live layer instances for Python objects and callables.
``LayerNodeConfig.config`` accepts plain JSON values and ``LayerConfig`` DTO
instances; JSON serialization preserves concrete DTO fields before the builder
validates them with the registered layer schema. ``CompositorBuilder`` resolves
config nodes through ``LayerRegistry`` and can mix those nodes with live layer
instances for Python objects and callables.
``Compositor.enter`` enters layers in compositor order and exits them in reverse
order through ``AsyncExitStack``. It accepts an optional ``CompositorSession``
@ -43,7 +46,7 @@ from typing import Any, Generic, Mapping, TypedDict, cast
from pydantic import BaseModel, ConfigDict, Field, JsonValue
from typing_extensions import Self, TypeVar
from agenton.layers.base import Layer, LayerControl, LifecycleState
from agenton.layers.base import Layer, LayerConfig, LayerConfigValue, LayerControl, LifecycleState
from agenton.layers.types import AllPromptTypes, AllToolTypes, AllUserPromptTypes
PromptT = TypeVar("PromptT", default=AllPromptTypes)
@ -92,7 +95,7 @@ class LayerNodeConfig(BaseModel):
name: str
type: str
config: JsonValue = Field(default_factory=dict)
config: LayerConfigValue = Field(default_factory=dict)
deps: Mapping[str, str] = Field(default_factory=dict)
metadata: Mapping[str, JsonValue] = Field(default_factory=dict)
@ -126,7 +129,7 @@ class LayerDescriptor:
type_id: str
layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]]
config_type: type[BaseModel]
config_type: type[LayerConfig]
runtime_state_type: type[BaseModel]
runtime_handles_type: type[BaseModel]
@ -279,7 +282,7 @@ class CompositorBuilder:
*,
name: str,
type: str,
config: object | None = None,
config: LayerConfigValue | None = None,
deps: Mapping[str, str] | None = None,
) -> Self:
"""Resolve, validate, and add one registry-backed layer config node."""

View File

@ -11,6 +11,8 @@ from agenton.layers.base import (
EmptyRuntimeState,
ExitIntent,
Layer,
LayerConfig,
LayerConfigValue,
LayerControl,
LayerDeps,
LifecycleState,
@ -41,6 +43,8 @@ __all__ = [
"AllToolTypes",
"AllUserPromptTypes",
"Layer",
"LayerConfig",
"LayerConfigValue",
"LayerDeps",
"LayerControl",
"LifecycleState",

View File

@ -9,6 +9,10 @@ serializable runtime state, and live runtime handles. The base class infers
while still allowing subclasses to set them explicitly for unusual inheritance
patterns.
``LayerConfig`` is the DTO base for config schemas that can be embedded directly
in serializable compositor config. Runtime state and handle schemas remain plain
Pydantic models because they are not accepted as graph input.
``Layer.bind_deps`` is the mutation point for dependency state. Layer
implementations should treat ``self.deps`` as unavailable until a compositor or
caller has resolved and bound dependencies.
@ -40,7 +44,7 @@ from enum import StrEnum
from types import UnionType
from typing import Any, ClassVar, Generic, Mapping, Sequence, Union, cast, get_args, get_origin, get_type_hints
from pydantic import BaseModel, ConfigDict
from pydantic import BaseModel, ConfigDict, JsonValue, SerializeAsAny
from typing_extensions import Self, TypeVar
@ -48,7 +52,24 @@ _DepsT = TypeVar("_DepsT", bound="LayerDeps")
_PromptT = TypeVar("_PromptT")
_UserPromptT = TypeVar("_UserPromptT")
_ToolT = TypeVar("_ToolT")
_ConfigT = TypeVar("_ConfigT", bound=BaseModel, default="EmptyLayerConfig")
class LayerConfig(BaseModel):
"""Base DTO for serializable layer configuration.
Subclasses are safe to place in ``LayerNodeConfig.config``. The compositor
still accepts plain JSON values for wire input, but typed Python call sites can
use concrete ``LayerConfig`` subclasses and preserve their fields during JSON
serialization.
"""
model_config = ConfigDict(extra="forbid")
type LayerConfigValue = JsonValue | SerializeAsAny[LayerConfig]
_ConfigT = TypeVar("_ConfigT", bound=LayerConfig, default="EmptyLayerConfig")
_RuntimeStateT = TypeVar("_RuntimeStateT", bound=BaseModel, default="EmptyRuntimeState")
_RuntimeHandlesT = TypeVar("_RuntimeHandlesT", bound=BaseModel, default="EmptyRuntimeHandles")
@ -93,7 +114,7 @@ class NoLayerDeps(LayerDeps):
"""Dependency container for layers that do not require other layers."""
class EmptyLayerConfig(BaseModel):
class EmptyLayerConfig(LayerConfig):
"""Default serializable config schema for layers without config."""
model_config = ConfigDict(extra="forbid")
@ -194,7 +215,7 @@ class Layer(
deps_type: type[_DepsT]
deps: _DepsT
type_id: ClassVar[str | None] = None
config_type: ClassVar[type[BaseModel]] = EmptyLayerConfig
config_type: ClassVar[type[LayerConfig]] = EmptyLayerConfig
runtime_state_type: ClassVar[type[BaseModel]] = EmptyRuntimeState
runtime_handles_type: ClassVar[type[BaseModel]] = EmptyRuntimeHandles
@ -213,7 +234,7 @@ class Layer(
if not isinstance(deps_type, type) or not issubclass(deps_type, LayerDeps):
raise TypeError(f"{cls.__name__}.deps_type must be a LayerDeps subclass.")
_get_dep_specs(deps_type)
_init_schema_type(cls, "config_type", _infer_schema_type(cls, 4, "config_type"), EmptyLayerConfig)
_init_config_type(cls, _infer_config_type(cls))
_init_schema_type(
cls,
"runtime_state_type",
@ -421,6 +442,16 @@ def _infer_schema_type(
return schema_type
def _infer_config_type(layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]]) -> type[LayerConfig] | None:
inferred = _infer_schema_generic_arg(layer_type, "config_type", {}) or _infer_layer_generic_arg(layer_type, 4, {})
if inferred is None:
return None
config_type = _as_config_type(inferred)
if config_type is None:
raise TypeError(f"{layer_type.__name__}.config_type must be a LayerConfig subclass.")
return config_type
def _infer_schema_generic_arg(
layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]],
attr_name: str,
@ -494,6 +525,18 @@ def _init_schema_type(
raise TypeError(f"{layer_type.__name__}.{attr_name} must be a Pydantic BaseModel subclass.")
def _init_config_type(
layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]],
inferred_config_type: type[LayerConfig] | None,
) -> None:
config_type = layer_type.__dict__.get("config_type")
if config_type is None:
config_type = inferred_config_type or getattr(layer_type, "config_type", EmptyLayerConfig)
setattr(layer_type, "config_type", config_type)
if not isinstance(config_type, type) or not issubclass(config_type, LayerConfig):
raise TypeError(f"{layer_type.__name__}.config_type must be a LayerConfig subclass.")
def _substitute_type(value: object, substitutions: Mapping[object, object]) -> object:
if value in substitutions:
return substitutions[value]
@ -542,6 +585,13 @@ def _as_model_type(value: object) -> type[BaseModel] | None:
return None
def _as_config_type(value: object) -> type[LayerConfig] | None:
runtime_type = get_origin(value) or value
if isinstance(runtime_type, type) and issubclass(runtime_type, LayerConfig):
return runtime_type
return None
def _is_generic_layer_template(layer_type: type[Layer[Any, Any, Any, Any, Any, Any, Any]]) -> bool:
return bool(getattr(layer_type, "__type_params__", ())) or bool(
getattr(layer_type, "__parameters__", ())

View File

@ -6,7 +6,8 @@ contracts, such as ordinary strings with plain callable tools or pydantic-ai
prompt/tool shapes. The families keep the trailing schema generic slots open so
concrete layers can have ``config_type``, ``runtime_state_type``, and
``runtime_handles_type`` inferred from type arguments instead of repeated class
attributes.
attributes. Config schemas use ``LayerConfig`` so they can also be embedded as
typed DTOs in serializable compositor config.
Tagged aggregate aliases cover code paths that can accept any supported
prompt/tool family without changing the plain and pydantic-ai layer contracts.
Pydantic-ai names are imported for static analysis only, so ``agenton`` can be
@ -29,7 +30,7 @@ if TYPE_CHECKING:
from pydantic import BaseModel
from agenton.layers.base import EmptyLayerConfig, EmptyRuntimeHandles, EmptyRuntimeState, Layer, LayerDeps
from agenton.layers.base import EmptyLayerConfig, EmptyRuntimeHandles, EmptyRuntimeState, Layer, LayerConfig, LayerDeps
type PlainPrompt = str
type PlainUserPrompt = str
@ -95,7 +96,7 @@ type AllToolTypes = PlainToolType | PydanticAIToolType[Any]
_DepsT = TypeVar("_DepsT", bound=LayerDeps)
_ConfigT = TypeVar("_ConfigT", bound=BaseModel, default=EmptyLayerConfig)
_ConfigT = TypeVar("_ConfigT", bound=LayerConfig, default=EmptyLayerConfig)
_RuntimeStateT = TypeVar("_RuntimeStateT", bound=BaseModel, default=EmptyRuntimeState)
_RuntimeHandlesT = TypeVar("_RuntimeHandlesT", bound=BaseModel, default=EmptyRuntimeHandles)
_AgentDepsT = TypeVar("_AgentDepsT")

View File

@ -10,13 +10,14 @@ from collections.abc import Callable, Sequence
from dataclasses import dataclass, field
from typing import Any
from pydantic import BaseModel, ConfigDict, Field
from pydantic import ConfigDict, Field
from typing_extensions import Self, override
from agenton.layers.base import NoLayerDeps
from agenton.layers.base import LayerConfig, NoLayerDeps
from agenton.layers.types import PlainLayer
class PromptLayerConfig(BaseModel):
class PromptLayerConfig(LayerConfig):
"""Serializable config schema for ``PromptLayer``."""
prefix: list[str] | str = Field(default_factory=list)
@ -48,7 +49,8 @@ class PromptLayer(PlainLayer[NoLayerDeps, PromptLayerConfig]):
suffix: list[str] | str = field(default_factory=list)
@classmethod
def from_config(cls, config: BaseModel):
@override
def from_config(cls, config: PromptLayerConfig) -> Self:
"""Create a prompt layer from validated prompt config."""
validated_config = PromptLayerConfig.model_validate(config)
return cls(prefix=validated_config.prefix, user=validated_config.user, suffix=validated_config.suffix)

View File

@ -2,11 +2,12 @@
The client uses the public DTOs from ``dify_agent.protocol.schemas`` for all
normal request and response parsing. It intentionally does not retry
``POST /runs`` because create-run is not idempotent. SSE streams are the only
operation with reconnect logic: transient stream/connect/read failures, stream
timeouts, and HTTP 5xx stream responses reconnect with the latest observed event
id, while HTTP 4xx responses, DTO validation failures, and malformed SSE frames
fail immediately.
``POST /runs`` because create-run is not idempotent, and create helpers require a
``CreateRunRequest`` instance rather than accepting raw payload dicts. SSE
streams are the only operation with reconnect logic: transient stream, connect,
or read failures, stream timeouts, and HTTP 5xx stream responses reconnect with
the latest observed event id, while HTTP 4xx responses, DTO validation failures,
and malformed SSE frames fail immediately.
"""
from __future__ import annotations
@ -241,12 +242,12 @@ class Client:
if self._owns_sync_http_client and self._sync_http_client is not None:
self.close_sync()
async def create_run(self, request: CreateRunRequest | dict[str, object]) -> CreateRunResponse:
async def create_run(self, request: CreateRunRequest) -> CreateRunResponse:
"""Create one run and return its accepted status response.
Dict inputs are validated as ``CreateRunRequest`` before the request is
sent. This method performs exactly one ``POST /runs`` attempt and maps
HTTPX timeouts to ``DifyAgentTimeoutError``.
``request`` must already be a public ``CreateRunRequest`` DTO. This
method performs exactly one ``POST /runs`` attempt and maps HTTPX
timeouts to ``DifyAgentTimeoutError``.
"""
request_model = _validate_create_run_request(request)
try:
@ -262,7 +263,7 @@ class Client:
raise DifyAgentClientError(f"create_run request failed: {exc}") from exc
return _parse_model_response(response, CreateRunResponse)
def create_run_sync(self, request: CreateRunRequest | dict[str, object]) -> CreateRunResponse:
def create_run_sync(self, request: CreateRunRequest) -> CreateRunResponse:
"""Synchronous variant of ``create_run`` with the same no-retry contract."""
request_model = _validate_create_run_request(request)
try:
@ -549,14 +550,11 @@ class Client:
return headers
def _validate_create_run_request(request: CreateRunRequest | dict[str, object]) -> CreateRunRequest:
"""Validate user input before creating a run."""
def _validate_create_run_request(request: CreateRunRequest) -> CreateRunRequest:
"""Reject raw payloads so create-run uses the public request DTO boundary."""
if isinstance(request, CreateRunRequest):
return request
try:
return CreateRunRequest.model_validate(request)
except ValidationError as exc:
raise DifyAgentValidationError(detail=exc.errors(include_url=False)) from exc
raise DifyAgentValidationError(detail="request must be a CreateRunRequest")
def _parse_model_response(response: httpx.Response, model_type: type[_ResponseModelT]) -> _ResponseModelT:

View File

@ -2,8 +2,6 @@
from .schemas import (
RUN_EVENT_ADAPTER,
AgentOutputRunEvent,
AgentOutputRunEventData,
AgentProfileConfig,
BaseRunEvent,
CreateRunRequest,
@ -19,14 +17,12 @@ from .schemas import (
RunStatus,
RunStatusResponse,
RunSucceededEvent,
SessionSnapshotRunEvent,
RunSucceededEventData,
utc_now,
)
__all__ = [
"AgentProfileConfig",
"AgentOutputRunEvent",
"AgentOutputRunEventData",
"BaseRunEvent",
"CreateRunRequest",
"CreateRunResponse",
@ -42,6 +38,6 @@ __all__ = [
"RunStatus",
"RunStatusResponse",
"RunSucceededEvent",
"SessionSnapshotRunEvent",
"RunSucceededEventData",
"utc_now",
]

View File

@ -8,13 +8,16 @@ Redis stream ids (or in-memory equivalents in tests) are the public cursors used
by polling and SSE replay. Event envelopes keep the public
``id``/``run_id``/``type``/``data``/``created_at`` shape, while each ``type`` has
a typed ``data`` model so OpenAPI, Redis replay, and clients parse the same
payload contract.
payload contract. Successful runs publish the final JSON-safe agent output and
the resumable Agenton session snapshot together on the terminal
``run_succeeded`` event so consumers can treat terminal events as complete run
summaries.
"""
from datetime import datetime, timezone
from typing import Annotated, ClassVar, Literal, TypeAlias
from pydantic import BaseModel, ConfigDict, Field, TypeAdapter
from pydantic import BaseModel, ConfigDict, Field, JsonValue, TypeAdapter
from pydantic_ai.messages import AgentStreamEvent
from agenton.compositor import CompositorConfig, CompositorSessionSnapshot
@ -24,8 +27,6 @@ RunStatus = Literal["running", "succeeded", "failed"]
RunEventType = Literal[
"run_started",
"pydantic_ai_event",
"agent_output",
"session_snapshot",
"run_succeeded",
"run_failed",
]
@ -86,10 +87,11 @@ class EmptyRunEventData(BaseModel):
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
class AgentOutputRunEventData(BaseModel):
"""Final agent output payload emitted before the session snapshot."""
class RunSucceededEventData(BaseModel):
"""Terminal success payload for final output and resumable session state."""
output: str
output: JsonValue
session_snapshot: CompositorSessionSnapshot
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
@ -127,25 +129,11 @@ class PydanticAIStreamRunEvent(BaseRunEvent):
data: AgentStreamEvent
class AgentOutputRunEvent(BaseRunEvent):
"""Run event carrying the final agent output string."""
type: Literal["agent_output"] = "agent_output"
data: AgentOutputRunEventData
class SessionSnapshotRunEvent(BaseRunEvent):
"""Run event carrying the resumable Agenton session snapshot."""
type: Literal["session_snapshot"] = "session_snapshot"
data: CompositorSessionSnapshot
class RunSucceededEvent(BaseRunEvent):
"""Terminal success event emitted after output and session snapshot."""
"""Terminal success event carrying the complete successful run result."""
type: Literal["run_succeeded"] = "run_succeeded"
data: EmptyRunEventData = Field(default_factory=EmptyRunEventData)
data: RunSucceededEventData
class RunFailedEvent(BaseRunEvent):
@ -158,8 +146,6 @@ class RunFailedEvent(BaseRunEvent):
RunEvent: TypeAlias = Annotated[
RunStartedEvent
| PydanticAIStreamRunEvent
| AgentOutputRunEvent
| SessionSnapshotRunEvent
| RunSucceededEvent
| RunFailedEvent,
Field(discriminator="type"),
@ -179,8 +165,6 @@ class RunEventsResponse(BaseModel):
__all__ = [
"AgentProfileConfig",
"AgentOutputRunEvent",
"AgentOutputRunEventData",
"BaseRunEvent",
"CreateRunRequest",
"CreateRunResponse",
@ -196,6 +180,6 @@ __all__ = [
"RunStatus",
"RunStatusResponse",
"RunSucceededEvent",
"SessionSnapshotRunEvent",
"RunSucceededEventData",
"utc_now",
]

View File

@ -2,7 +2,9 @@
The initial server exposes only a credential-free ``test`` profile. The factory
keeps model selection out of ``AgentRunRunner`` so production model profiles can
be added without changing storage or HTTP contracts.
be added without changing storage or HTTP contracts. Agents are returned through
an ``object`` output boundary because the runner serializes final output to the
public JSON-safe event payload instead of assuming text-only results.
"""
from collections.abc import Sequence
@ -21,14 +23,17 @@ def create_agent(
*,
system_prompts: Sequence[PydanticAIPrompt[object]],
tools: Sequence[PydanticAITool[object]],
) -> Agent[None, str]:
) -> Agent[None, object]:
"""Create the pydantic-ai agent for one run."""
if profile.provider == "test":
return Agent[None, str](
TestModel(custom_output_text=profile.output_text),
output_type=str,
system_prompt=materialize_static_system_prompts(system_prompts),
tools=tools,
return cast(
Agent[None, object],
Agent[None, str](
TestModel(custom_output_text=profile.output_text),
output_type=str,
system_prompt=materialize_static_system_prompts(system_prompts),
tools=tools,
),
)
raise ValueError(f"Unsupported agent profile provider: {profile.provider}")

View File

@ -2,18 +2,20 @@
The runner only needs append-only event writes and status transitions, so tests
can use ``InMemoryRunEventSink`` without Redis. Production storage implements the
same protocol with Redis streams in ``dify_agent.storage.redis_run_store``.
same protocol with Redis streams in ``dify_agent.storage.redis_run_store``. The
terminal success helper writes the final JSON-safe output and session snapshot in
one event so event consumers can stop at ``run_succeeded`` without correlating
separate payload events.
"""
from collections import defaultdict
from typing import Protocol
from pydantic import JsonValue
from pydantic_ai.messages import AgentStreamEvent
from agenton.compositor import CompositorSessionSnapshot
from dify_agent.protocol.schemas import (
AgentOutputRunEvent,
AgentOutputRunEventData,
EmptyRunEventData,
PydanticAIStreamRunEvent,
RunEvent,
@ -22,7 +24,7 @@ from dify_agent.protocol.schemas import (
RunStartedEvent,
RunStatus,
RunSucceededEvent,
SessionSnapshotRunEvent,
RunSucceededEventData,
utc_now,
)
@ -89,34 +91,24 @@ async def emit_pydantic_ai_event(sink: RunEventSink, *, run_id: str, data: Agent
)
async def emit_agent_output(sink: RunEventSink, *, run_id: str, output: str) -> str:
"""Emit the final output text produced by the agent."""
async def emit_run_succeeded(
sink: RunEventSink,
*,
run_id: str,
output: JsonValue,
session_snapshot: CompositorSessionSnapshot,
) -> str:
"""Emit the terminal success event with output and resumable state."""
return await emit_run_event(
sink,
event=AgentOutputRunEvent(
event=RunSucceededEvent(
run_id=run_id,
data=AgentOutputRunEventData(output=output),
data=RunSucceededEventData(output=output, session_snapshot=session_snapshot),
created_at=utc_now(),
),
)
async def emit_session_snapshot(sink: RunEventSink, *, run_id: str, data: CompositorSessionSnapshot) -> str:
"""Emit the typed Agenton session snapshot for later resumption."""
return await emit_run_event(
sink,
event=SessionSnapshotRunEvent(run_id=run_id, data=data, created_at=utc_now()),
)
async def emit_run_succeeded(sink: RunEventSink, *, run_id: str) -> str:
"""Emit the terminal success lifecycle event."""
return await emit_run_event(
sink,
event=RunSucceededEvent(run_id=run_id, data=EmptyRunEventData(), created_at=utc_now()),
)
async def emit_run_failed(
sink: RunEventSink,
*,
@ -134,11 +126,9 @@ async def emit_run_failed(
__all__ = [
"InMemoryRunEventSink",
"RunEventSink",
"emit_agent_output",
"emit_pydantic_ai_event",
"emit_run_event",
"emit_run_failed",
"emit_run_started",
"emit_run_succeeded",
"emit_session_snapshot",
]

View File

@ -3,11 +3,15 @@
The runner is storage-agnostic: it builds an Agenton compositor, enters or
resumes its session, runs pydantic-ai with ``compositor.user_prompts`` as the user
input, emits stream events, suspends the session on exit, snapshots it, and then
publishes a terminal success or failure event.
publishes a terminal success or failure event. Successful terminal events contain
both the JSON-safe final output and session snapshot; there are no separate output
or snapshot events to correlate.
"""
from collections.abc import AsyncIterable
from typing import cast
from pydantic import JsonValue, TypeAdapter
from pydantic_ai.messages import AgentStreamEvent
from agenton.compositor import CompositorSessionSnapshot
@ -16,16 +20,17 @@ from dify_agent.runtime.agent_factory import create_agent, normalize_user_input
from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor
from dify_agent.runtime.event_sink import (
RunEventSink,
emit_agent_output,
emit_pydantic_ai_event,
emit_run_failed,
emit_run_started,
emit_run_succeeded,
emit_session_snapshot,
)
from dify_agent.runtime.user_prompt_validation import EMPTY_USER_PROMPTS_ERROR, has_non_blank_user_prompt
_AGENT_OUTPUT_ADAPTER = TypeAdapter(object)
class AgentRunValidationError(ValueError):
    """Raised when a run request is valid JSON but cannot execute.

    Subclasses ``ValueError`` so callers can handle it as an input error.
    """
@ -56,12 +61,15 @@ class AgentRunRunner:
await self.sink.update_status(self.run_id, "failed", message)
raise
_ = await emit_agent_output(self.sink, run_id=self.run_id, output=output)
_ = await emit_session_snapshot(self.sink, run_id=self.run_id, data=session_snapshot)
_ = await emit_run_succeeded(self.sink, run_id=self.run_id)
_ = await emit_run_succeeded(
self.sink,
run_id=self.run_id,
output=output,
session_snapshot=session_snapshot,
)
await self.sink.update_status(self.run_id, "succeeded")
async def _run_agent(self) -> tuple[str, CompositorSessionSnapshot]:
async def _run_agent(self) -> tuple[JsonValue, CompositorSessionSnapshot]:
"""Run pydantic-ai inside an entered Agenton session."""
compositor = build_pydantic_ai_compositor(self.request.compositor)
session = (
@ -86,7 +94,12 @@ class AgentRunRunner:
)
result = await agent.run(normalize_user_input(user_prompts), event_stream_handler=handle_events)
return result.output, compositor.snapshot_session(session)
return _serialize_agent_output(result.output), compositor.snapshot_session(session)
def _serialize_agent_output(output: object) -> JsonValue:
    """Dump arbitrary pydantic-ai output into the public JSON-safe payload type."""
    json_safe = _AGENT_OUTPUT_ADAPTER.dump_python(output, mode="json")
    return cast(JsonValue, json_safe)
__all__ = ["AgentRunRunner", "AgentRunValidationError"]

View File

@ -5,9 +5,9 @@ from dataclasses import dataclass
from pydantic import BaseModel, ConfigDict, ValidationError
from typing_extensions import override
from agenton.compositor import Compositor, CompositorBuilder, CompositorSession, LayerRegistry
from agenton.compositor import Compositor, CompositorBuilder, CompositorSession, LayerNodeConfig, LayerRegistry
from agenton.layers import EmptyLayerConfig, LayerControl, LayerDeps, NoLayerDeps, PlainLayer, PlainPromptType, PlainToolType
from agenton_collections.layers.plain import ObjectLayer, PromptLayer
from agenton_collections.layers.plain import ObjectLayer, PromptLayer, PromptLayerConfig
def test_registry_infers_descriptor_and_rejects_duplicate_or_missing_type_id() -> None:
@ -74,6 +74,23 @@ def test_builder_creates_config_layers_with_typed_validation() -> None:
raise AssertionError("Expected ValidationError.")
def test_layer_node_config_accepts_config_dto_and_serializes_fields() -> None:
    """A LayerNodeConfig may carry a typed config DTO and dump it as plain JSON."""
    layer_registry = LayerRegistry()
    layer_registry.register_layer(PromptLayer)

    prompt_node = LayerNodeConfig(
        name="prompt",
        type="plain.prompt",
        config=PromptLayerConfig(prefix="hello", user="ask politely"),
    )
    serialized = prompt_node.model_dump(mode="json")

    built = CompositorBuilder(layer_registry).add_config({"layers": [serialized]}).build()

    assert serialized["config"] == {"prefix": "hello", "user": "ask politely", "suffix": []}
    assert [prompt.value for prompt in built.prompts] == ["hello"]
    assert [prompt.value for prompt in built.user_prompts] == ["ask politely"]
class ObjectConsumerDeps(LayerDeps):
obj: ObjectLayer[str] # pyright: ignore[reportUninitializedInstanceVariable]

View File

@ -3,10 +3,18 @@ from dataclasses import dataclass
from pydantic import BaseModel, ConfigDict
from agenton.compositor import LayerRegistry
from agenton.layers import EmptyLayerConfig, EmptyRuntimeHandles, EmptyRuntimeState, LayerControl, NoLayerDeps, PlainLayer
from agenton.layers import (
EmptyLayerConfig,
EmptyRuntimeHandles,
EmptyRuntimeState,
LayerConfig,
LayerControl,
NoLayerDeps,
PlainLayer,
)
class InferredConfig(BaseModel):
class InferredConfig(LayerConfig):
value: str = "configured"
model_config = ConfigDict(extra="forbid")
@ -68,7 +76,7 @@ def test_invalid_declared_schema_type_is_rejected_clearly() -> None:
config_type = dict # pyright: ignore[reportAssignmentType]
except TypeError as e:
assert str(e) == "InvalidSchemaLayer.config_type must be a Pydantic BaseModel subclass."
assert str(e) == "InvalidSchemaLayer.config_type must be a LayerConfig subclass."
else:
raise AssertionError("Expected TypeError.")
@ -78,7 +86,7 @@ def test_invalid_declared_schema_type_is_rejected_clearly() -> None:
pass
except TypeError as e:
assert str(e) == "InvalidGenericSchemaLayer.config_type must be a Pydantic BaseModel subclass."
assert str(e) == "InvalidGenericSchemaLayer.config_type must be a LayerConfig subclass."
else:
raise AssertionError("Expected TypeError.")

View File

@ -9,6 +9,7 @@ from typing import cast, override
import httpx
import pytest
from agenton.compositor import CompositorSessionSnapshot
from dify_agent.client import (
Client,
DifyAgentHTTPError,
@ -19,12 +20,12 @@ from dify_agent.client import (
)
from dify_agent.protocol.schemas import (
CreateRunRequest,
EmptyRunEventData,
RUN_EVENT_ADAPTER,
RunEvent,
RunEventsResponse,
RunStartedEvent,
RunSucceededEvent,
RunSucceededEventData,
)
@ -47,6 +48,14 @@ def _event_frame(event: RunEvent, *, event_id: str | None = None, exclude_id: bo
return "\n".join(lines) + "\n\n"
def _run_succeeded_event(*, event_id: str = "2-0", run_id: str = "run-1") -> RunSucceededEvent:
    """Build a terminal success event with a minimal output and an empty snapshot."""
    payload = RunSucceededEventData(
        output="done",
        session_snapshot=CompositorSessionSnapshot(layers=[]),
    )
    return RunSucceededEvent(id=event_id, run_id=run_id, data=payload)
def _run_status_json(status: str) -> dict[str, object]:
now = datetime(2026, 5, 11, tzinfo=UTC).isoformat()
return {"run_id": "run-1", "status": status, "created_at": now, "updated_at": now, "error": None}
@ -64,7 +73,7 @@ class DisconnectingSyncStream(httpx.SyncByteStream):
raise httpx.ReadError("stream disconnected")
def test_sync_methods_parse_protocol_dtos_and_validate_create_dict() -> None:
def test_sync_methods_parse_protocol_dtos_and_send_create_request_dto() -> None:
def handler(request: httpx.Request) -> httpx.Response:
if request.method == "POST" and request.url.path == "/runs":
payload = cast(dict[str, object], json.loads(request.content))
@ -83,7 +92,7 @@ def test_sync_methods_parse_protocol_dtos_and_validate_create_dict() -> None:
200,
json={
"run_id": "run-1",
"events": [cast(object, json.loads(RUN_EVENT_ADAPTER.dump_json(event)))],
"events": [cast(object, json.loads(RUN_EVENT_ADAPTER.dump_json(event)))],
"next_cursor": "1-0",
},
)
@ -92,7 +101,7 @@ def test_sync_methods_parse_protocol_dtos_and_validate_create_dict() -> None:
http_client = httpx.Client(transport=httpx.MockTransport(handler))
client = Client(base_url="http://testserver", sync_http_client=http_client)
created = client.create_run_sync(_create_run_payload())
created = client.create_run_sync(CreateRunRequest.model_validate(_create_run_payload()))
status = client.get_run_sync(created.run_id)
events = client.get_events_sync(created.run_id, after="0-0", limit=10)
@ -162,7 +171,7 @@ def test_error_mapping_and_create_run_input_validation() -> None:
assert server_error.value.status_code == 500
with pytest.raises(DifyAgentValidationError):
_ = client.create_run_sync({"unknown": "field"})
_ = client.create_run_sync({"unknown": "field"}) # pyright: ignore[reportArgumentType]
def test_http_timeout_maps_to_client_timeout_error() -> None:
@ -192,7 +201,7 @@ def test_create_run_is_not_retried_after_timeout() -> None:
)
with pytest.raises(DifyAgentTimeoutError):
_ = client.create_run_sync(_create_run_payload())
_ = client.create_run_sync(CreateRunRequest.model_validate(_create_run_payload()))
assert attempts == 1
@ -221,7 +230,7 @@ def test_stream_events_stops_after_terminal_event() -> None:
body = "".join(
[
_event_frame(RunStartedEvent(id="1-0", run_id="run-1")),
_event_frame(RunSucceededEvent(id="2-0", run_id="run-1", data=EmptyRunEventData())),
_event_frame(_run_succeeded_event()),
]
)
@ -251,7 +260,7 @@ def test_stream_events_reconnects_from_latest_event_id() -> None:
200,
stream=DisconnectingSyncStream(_event_frame(RunStartedEvent(id="1-0", run_id="run-1"))),
)
return httpx.Response(200, content=_event_frame(RunSucceededEvent(id="2-0", run_id="run-1")))
return httpx.Response(200, content=_event_frame(_run_succeeded_event()))
client = Client(
base_url="http://testserver",
@ -271,7 +280,7 @@ def test_stream_events_reconnects_after_http_5xx_response() -> None:
seen_after.append(request.url.params["after"])
if len(seen_after) == 1:
return httpx.Response(503, json={"detail": "temporarily unavailable"})
return httpx.Response(200, content=_event_frame(RunSucceededEvent(id="2-0", run_id="run-1")))
return httpx.Response(200, content=_event_frame(_run_succeeded_event()))
client = Client(
base_url="http://testserver",
@ -321,7 +330,7 @@ def test_malformed_sse_frame_does_not_reconnect() -> None:
def test_async_stream_events_yields_terminal_event() -> None:
body = _event_frame(RunSucceededEvent(id="2-0", run_id="run-1"))
body = _event_frame(_run_succeeded_event())
def handler(_request: httpx.Request) -> httpx.Response:
return httpx.Response(200, content=body)
@ -345,7 +354,7 @@ def test_async_stream_events_reconnects_after_http_5xx_response() -> None:
seen_after.append(request.url.params["after"])
if len(seen_after) == 1:
return httpx.Response(502, json={"detail": "bad gateway"})
return httpx.Response(200, content=_event_frame(RunSucceededEvent(id="2-0", run_id="run-1")))
return httpx.Response(200, content=_event_frame(_run_succeeded_event()))
async def scenario() -> None:
http_client = httpx.AsyncClient(transport=httpx.MockTransport(handler))
@ -368,7 +377,7 @@ def test_stream_timeout_can_reconnect_until_terminal() -> None:
calls += 1
if calls == 1:
raise httpx.ReadTimeout("stream stalled", request=request)
return httpx.Response(200, content=_event_frame(RunSucceededEvent(id="2-0", run_id="run-1")))
return httpx.Response(200, content=_event_frame(_run_succeeded_event()))
client = Client(
base_url="http://testserver",

View File

@ -1,13 +1,16 @@
import pytest
from pydantic import ValidationError
from pydantic_ai.messages import FinalResultEvent
from agenton.compositor import CompositorSessionSnapshot
from dify_agent.protocol.schemas import (
RUN_EVENT_ADAPTER,
AgentOutputRunEvent,
AgentOutputRunEventData,
PydanticAIStreamRunEvent,
RunFailedEvent,
RunFailedEventData,
RunStartedEvent,
RunSucceededEvent,
RunSucceededEventData,
)
@ -15,7 +18,13 @@ def test_run_event_adapter_round_trips_typed_variants() -> None:
events = [
RunStartedEvent(run_id="run-1"),
PydanticAIStreamRunEvent(run_id="run-1", data=FinalResultEvent(tool_name=None, tool_call_id=None)),
AgentOutputRunEvent(run_id="run-1", data=AgentOutputRunEventData(output="done")),
RunSucceededEvent(
run_id="run-1",
data=RunSucceededEventData(
output={"answer": ["done"]},
session_snapshot=CompositorSessionSnapshot(layers=[]),
),
),
RunFailedEvent(run_id="run-1", data=RunFailedEventData(error="boom", reason="shutdown")),
]
@ -38,3 +47,9 @@ def test_pydantic_ai_event_data_uses_agent_stream_event_model() -> None:
assert isinstance(event, PydanticAIStreamRunEvent)
assert isinstance(event.data, FinalResultEvent)
@pytest.mark.parametrize("event_type", ["agent_output", "session_snapshot"])
def test_removed_non_terminal_payload_events_are_rejected(event_type: str) -> None:
    """Event types dropped from the protocol must no longer validate."""
    payload = {"run_id": "run-1", "type": event_type, "data": {}}
    with pytest.raises(ValidationError):
        _ = RUN_EVENT_ADAPTER.validate_python(payload)

View File

@ -3,7 +3,7 @@ import asyncio
import pytest
from agenton.compositor import CompositorConfig, LayerNodeConfig
from dify_agent.protocol.schemas import AgentProfileConfig, CreateRunRequest
from dify_agent.protocol.schemas import AgentProfileConfig, CreateRunRequest, RunSucceededEvent
from dify_agent.runtime.event_sink import InMemoryRunEventSink
from dify_agent.runtime.runner import AgentRunRunner, AgentRunValidationError
@ -28,7 +28,13 @@ def test_runner_emits_terminal_success_and_snapshot() -> None:
event_types = [event.type for event in sink.events["run-1"]]
assert event_types[0] == "run_started"
assert "pydantic_ai_event" in event_types
assert event_types[-3:] == ["agent_output", "session_snapshot", "run_succeeded"]
assert "agent_output" not in event_types
assert "session_snapshot" not in event_types
assert event_types[-1:] == ["run_succeeded"]
terminal = sink.events["run-1"][-1]
assert isinstance(terminal, RunSucceededEvent)
assert terminal.data.output == "done"
assert [layer.name for layer in terminal.data.session_snapshot.layers] == ["prompt"]
assert sink.statuses["run-1"] == "succeeded"

View File

@ -1,8 +1,12 @@
import asyncio
from collections.abc import Mapping
from typing import cast
from agenton.compositor import CompositorConfig, LayerNodeConfig
from dify_agent.protocol.schemas import CreateRunRequest, RunStartedEvent
from pydantic import JsonValue
from agenton.compositor import CompositorConfig, CompositorSessionSnapshot, LayerNodeConfig, LayerSessionSnapshot
from agenton.layers import LifecycleState
from dify_agent.protocol.schemas import CreateRunRequest, RunStartedEvent, RunSucceededEvent, RunSucceededEventData
from dify_agent.storage.redis_run_store import DEFAULT_RUN_RETENTION_SECONDS, RedisRunStore
@ -17,10 +21,12 @@ def _request() -> CreateRunRequest:
class FakeRedis:
commands: list[tuple[object, ...]]
values: dict[str, object]
streams: dict[str, list[tuple[str, dict[str, object]]]]
def __init__(self) -> None:
self.commands = []
self.values = {}
self.streams = {}
async def set(self, key: str, value: object, *, ex: int | None = None) -> None:
self.commands.append(("set", key, value, ex))
@ -32,12 +38,37 @@ class FakeRedis:
async def xadd(self, key: str, fields: Mapping[str, object]) -> str:
self.commands.append(("xadd", key, dict(fields)))
return "1-0"
entries = self.streams.setdefault(key, [])
event_id = f"{len(entries) + 1}-0"
entries.append((event_id, dict(fields)))
return event_id
async def xrange(self, key: str, *, min: str = "-", count: int | None = None) -> list[tuple[str, dict[str, object]]]:
self.commands.append(("xrange", key, min, count))
entries = [entry for entry in self.streams.get(key, []) if self._is_after_min(entry[0], min)]
if count is not None:
return entries[:count]
return entries
async def expire(self, key: str, seconds: int) -> bool:
self.commands.append(("expire", key, seconds))
return True
@staticmethod
def _is_after_min(event_id: str, min_id: str) -> bool:
if min_id == "-":
return True
is_exclusive = min_id.startswith("(")
cursor = min_id[1:] if is_exclusive else min_id
event_value = FakeRedis._stream_id_value(event_id)
cursor_value = FakeRedis._stream_id_value(cursor)
return event_value > cursor_value if is_exclusive else event_value >= cursor_value
@staticmethod
def _stream_id_value(event_id: str) -> tuple[int, int]:
timestamp, sequence = event_id.split("-", maxsplit=1)
return int(timestamp), int(sequence)
def test_create_run_writes_running_record_without_job_queue_and_with_retention() -> None:
redis = FakeRedis()
@ -80,3 +111,39 @@ def test_append_event_serializes_typed_event_without_id_and_expires_run_keys() -
("expire", "test:runs:run-1:events", 60),
("expire", "test:runs:run-1:record", 60),
]
def test_get_events_round_trips_run_succeeded_output_and_session_snapshot() -> None:
    """A stored RunSucceededEvent keeps its JSON output and session snapshot intact."""
    fake_redis = FakeRedis()
    store = RedisRunStore(fake_redis, prefix="test", run_retention_seconds=60)  # pyright: ignore[reportArgumentType]

    expected_output = cast(JsonValue, {"answer": ["done", 1], "ok": True})
    snapshot = CompositorSessionSnapshot(
        layers=[
            LayerSessionSnapshot(
                name="prompt",
                state=LifecycleState.SUSPENDED,
                runtime_state={"resource_id": "abc"},
            )
        ]
    )

    async def run_scenario() -> tuple[str, RunSucceededEvent]:
        record = await store.create_run(_request())
        stored_id = await store.append_event(
            RunSucceededEvent(
                id="local-only",
                run_id=record.run_id,
                data=RunSucceededEventData(output=expected_output, session_snapshot=snapshot),
            )
        )
        page = await store.get_events(record.run_id, after="0-0", limit=10)
        first = page.events[0]
        assert isinstance(first, RunSucceededEvent)
        assert page.next_cursor == stored_id
        return stored_id, first

    stored_id, decoded = asyncio.run(run_scenario())
    assert decoded.id == stored_id
    assert decoded.data.output == expected_output
    assert decoded.data.session_snapshot == snapshot