add typed dify-agent run events

This commit is contained in:
盐粒 Yanli 2026-05-11 19:55:23 +08:00
parent 658caa2ae7
commit 80312f9745
11 changed files with 278 additions and 61 deletions

View File

@ -168,10 +168,13 @@ A failed run emits:
2. zero or more `pydantic_ai_event`
3. `run_failed`
`pydantic_ai_event.data` is serialized with Pydantic AI's `AgentStreamEvent`
adapter. `session_snapshot.data` contains the serialized
`CompositorSessionSnapshot` that can be sent as `session_snapshot` in a later
create-run request with the same compositor layer names and order.
Each event keeps the same envelope shape and has typed `data`: `run_started` and
`run_succeeded` use `{}`, `pydantic_ai_event` uses Pydantic AI's
`AgentStreamEvent` union, `agent_output` uses `{ "output": string }`,
`session_snapshot` uses `CompositorSessionSnapshot`, and `run_failed` uses
`{ "error": string, "reason": string | null }`. The session snapshot can be sent
as `session_snapshot` in a later create-run request with the same compositor layer
names and order.
## Consumer examples

View File

@ -103,7 +103,10 @@ progress:
Successful runs emit `run_started`, zero or more `pydantic_ai_event`,
`agent_output`, `session_snapshot`, and `run_succeeded`. Failed runs end with
`run_failed`.
`run_failed`. Event envelopes retain `id`, `run_id`, `type`, `data`, and
`created_at`; `data` is typed per event type, including Pydantic AI's
`AgentStreamEvent` payload for `pydantic_ai_event` and `CompositorSessionSnapshot`
for `session_snapshot`.
## Examples

View File

@ -8,9 +8,23 @@ same protocol with Redis streams in ``dify_agent.storage.redis_run_store``.
from collections import defaultdict
from typing import Protocol
from pydantic import JsonValue
from pydantic_ai.messages import AgentStreamEvent
from dify_agent.server.schemas import RunEvent, RunEventType, RunStatus, utc_now
from agenton.compositor import CompositorSessionSnapshot
from dify_agent.server.schemas import (
AgentOutputRunEvent,
AgentOutputRunEventData,
EmptyRunEventData,
PydanticAIStreamRunEvent,
RunEvent,
RunFailedEvent,
RunFailedEventData,
RunStartedEvent,
RunStatus,
RunSucceededEvent,
SessionSnapshotRunEvent,
utc_now,
)
class RunEventSink(Protocol):
@ -53,12 +67,78 @@ class InMemoryRunEventSink:
async def emit_run_event(
sink: RunEventSink,
*,
run_id: str,
type: RunEventType,
data: JsonValue,
event: RunEvent,
) -> str:
"""Create and append a timestamped ``RunEvent``."""
return await sink.append_event(RunEvent(run_id=run_id, type=type, data=data, created_at=utc_now()))
"""Append an already typed public run event."""
return await sink.append_event(event)
__all__ = ["InMemoryRunEventSink", "RunEventSink", "emit_run_event"]
async def emit_run_started(sink: RunEventSink, *, run_id: str) -> str:
"""Emit the first lifecycle event for one run."""
return await emit_run_event(
sink,
event=RunStartedEvent(run_id=run_id, data=EmptyRunEventData(), created_at=utc_now()),
)
async def emit_pydantic_ai_event(sink: RunEventSink, *, run_id: str, data: AgentStreamEvent) -> str:
"""Emit one typed Pydantic AI stream event."""
return await emit_run_event(
sink,
event=PydanticAIStreamRunEvent(run_id=run_id, data=data, created_at=utc_now()),
)
async def emit_agent_output(sink: RunEventSink, *, run_id: str, output: str) -> str:
"""Emit the final output text produced by the agent."""
return await emit_run_event(
sink,
event=AgentOutputRunEvent(
run_id=run_id,
data=AgentOutputRunEventData(output=output),
created_at=utc_now(),
),
)
async def emit_session_snapshot(sink: RunEventSink, *, run_id: str, data: CompositorSessionSnapshot) -> str:
"""Emit the typed Agenton session snapshot for later resumption."""
return await emit_run_event(
sink,
event=SessionSnapshotRunEvent(run_id=run_id, data=data, created_at=utc_now()),
)
async def emit_run_succeeded(sink: RunEventSink, *, run_id: str) -> str:
"""Emit the terminal success lifecycle event."""
return await emit_run_event(
sink,
event=RunSucceededEvent(run_id=run_id, data=EmptyRunEventData(), created_at=utc_now()),
)
async def emit_run_failed(
sink: RunEventSink,
*,
run_id: str,
error: str,
reason: str | None = None,
) -> str:
"""Emit the terminal failure lifecycle event."""
return await emit_run_event(
sink,
event=RunFailedEvent(run_id=run_id, data=RunFailedEventData(error=error, reason=reason), created_at=utc_now()),
)
__all__ = [
"InMemoryRunEventSink",
"RunEventSink",
"emit_agent_output",
"emit_pydantic_ai_event",
"emit_run_event",
"emit_run_failed",
"emit_run_started",
"emit_run_succeeded",
"emit_session_snapshot",
]

View File

@ -13,7 +13,7 @@ from collections.abc import Callable
from typing import Protocol
from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor
from dify_agent.runtime.event_sink import RunEventSink, emit_run_event
from dify_agent.runtime.event_sink import RunEventSink, emit_run_failed
from dify_agent.runtime.runner import AgentRunRunner
from dify_agent.runtime.user_prompt_validation import EMPTY_USER_PROMPTS_ERROR, has_non_blank_user_prompt
from dify_agent.server.schemas import CreateRunRequest, RunRecord
@ -131,12 +131,7 @@ class RunScheduler:
"""Best-effort failure event/status for shutdown-cancelled runs."""
message = "run cancelled during server shutdown"
try:
_ = await emit_run_event(
self.store,
run_id=run_id,
type="run_failed",
data={"error": message, "reason": "shutdown"},
)
_ = await emit_run_failed(self.store, run_id=run_id, error=message, reason="shutdown")
await self.store.update_status(run_id, "failed", message)
except Exception:
logger.exception("failed to mark cancelled run failed", extra={"run_id": run_id})

View File

@ -7,20 +7,24 @@ publishes a terminal success or failure event.
"""
from collections.abc import AsyncIterable
from typing import cast
from pydantic import JsonValue, TypeAdapter
from pydantic_ai.messages import AgentStreamEvent
from agenton.compositor import CompositorSessionSnapshot
from dify_agent.runtime.agent_factory import create_agent, normalize_user_input
from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor
from dify_agent.runtime.event_sink import RunEventSink, emit_run_event
from dify_agent.runtime.event_sink import (
RunEventSink,
emit_agent_output,
emit_pydantic_ai_event,
emit_run_failed,
emit_run_started,
emit_run_succeeded,
emit_session_snapshot,
)
from dify_agent.runtime.user_prompt_validation import EMPTY_USER_PROMPTS_ERROR, has_non_blank_user_prompt
from dify_agent.server.schemas import CreateRunRequest
_AGENT_STREAM_EVENT_ADAPTER = TypeAdapter(AgentStreamEvent)
class AgentRunValidationError(ValueError):
"""Raised when a run request is valid JSON but cannot execute."""
@ -42,24 +46,19 @@ class AgentRunRunner:
async def run(self) -> None:
"""Execute the run and emit the documented event sequence."""
await self.sink.update_status(self.run_id, "running")
_ = await emit_run_event(self.sink, run_id=self.run_id, type="run_started", data={})
_ = await emit_run_started(self.sink, run_id=self.run_id)
try:
output, session_snapshot = await self._run_agent()
except Exception as exc:
message = str(exc) or type(exc).__name__
_ = await emit_run_event(self.sink, run_id=self.run_id, type="run_failed", data={"error": message})
_ = await emit_run_failed(self.sink, run_id=self.run_id, error=message)
await self.sink.update_status(self.run_id, "failed", message)
raise
_ = await emit_run_event(self.sink, run_id=self.run_id, type="agent_output", data={"output": output})
_ = await emit_run_event(
self.sink,
run_id=self.run_id,
type="session_snapshot",
data=cast(JsonValue, session_snapshot.model_dump(mode="json")),
)
_ = await emit_run_event(self.sink, run_id=self.run_id, type="run_succeeded", data={})
_ = await emit_agent_output(self.sink, run_id=self.run_id, output=output)
_ = await emit_session_snapshot(self.sink, run_id=self.run_id, data=session_snapshot)
_ = await emit_run_succeeded(self.sink, run_id=self.run_id)
await self.sink.update_status(self.run_id, "succeeded")
async def _run_agent(self) -> tuple[str, CompositorSessionSnapshot]:
@ -78,12 +77,7 @@ class AgentRunRunner:
async def handle_events(_ctx: object, events: AsyncIterable[AgentStreamEvent]) -> None:
async for event in events:
_ = await emit_run_event(
self.sink,
run_id=self.run_id,
type="pydantic_ai_event",
data=cast(JsonValue, _AGENT_STREAM_EVENT_ADAPTER.dump_python(event, mode="json")),
)
_ = await emit_pydantic_ai_event(self.sink, run_id=self.run_id, data=event)
agent = create_agent(
self.request.agent_profile,

View File

@ -3,14 +3,18 @@
The server accepts only registry-backed Agenton compositor configs. This keeps
HTTP input data-only and prevents unsafe import-path construction. Run events are
append-only records; Redis stream ids (or in-memory equivalents in tests) are the
public cursors used by polling and SSE replay.
public cursors used by polling and SSE replay. Event envelopes keep the public
``id``/``run_id``/``type``/``data``/``created_at`` shape, but each ``type`` has a
typed ``data`` model so OpenAPI, Redis replay, and runtime producers agree on the
payload contract.
"""
from datetime import datetime, timezone
from typing import Literal
from typing import Annotated, Literal, TypeAlias
from uuid import uuid4
from pydantic import BaseModel, ConfigDict, Field, JsonValue, field_validator
from pydantic import BaseModel, ConfigDict, Field, TypeAdapter, field_validator
from pydantic_ai.messages import AgentStreamEvent
from agenton.compositor import CompositorConfig, CompositorSessionSnapshot
@ -80,18 +84,94 @@ class RunStatusResponse(BaseModel):
model_config = ConfigDict(extra="forbid")
class RunEvent(BaseModel):
"""Append-only event visible through polling and SSE."""
class EmptyRunEventData(BaseModel):
"""Typed empty payload for lifecycle events that carry no extra data."""
model_config = ConfigDict(extra="forbid")
class AgentOutputRunEventData(BaseModel):
"""Final agent output payload emitted before the session snapshot."""
output: str
model_config = ConfigDict(extra="forbid")
class RunFailedEventData(BaseModel):
"""Terminal failure payload shown to polling and SSE consumers."""
error: str
reason: str | None = None
model_config = ConfigDict(extra="forbid")
class BaseRunEvent(BaseModel):
"""Shared append-only event envelope visible through polling and SSE."""
id: str | None = None
run_id: str
type: RunEventType
data: JsonValue = Field(default_factory=dict)
created_at: datetime = Field(default_factory=utc_now)
model_config = ConfigDict(extra="forbid")
class RunStartedEvent(BaseRunEvent):
"""Run lifecycle event emitted before runtime execution starts."""
type: Literal["run_started"] = "run_started"
data: EmptyRunEventData = Field(default_factory=EmptyRunEventData)
class PydanticAIStreamRunEvent(BaseRunEvent):
"""Pydantic AI stream event using the upstream typed event model."""
type: Literal["pydantic_ai_event"] = "pydantic_ai_event"
data: AgentStreamEvent
class AgentOutputRunEvent(BaseRunEvent):
"""Run event carrying the final agent output string."""
type: Literal["agent_output"] = "agent_output"
data: AgentOutputRunEventData
class SessionSnapshotRunEvent(BaseRunEvent):
"""Run event carrying the resumable Agenton session snapshot."""
type: Literal["session_snapshot"] = "session_snapshot"
data: CompositorSessionSnapshot
class RunSucceededEvent(BaseRunEvent):
"""Terminal success event emitted after output and session snapshot."""
type: Literal["run_succeeded"] = "run_succeeded"
data: EmptyRunEventData = Field(default_factory=EmptyRunEventData)
class RunFailedEvent(BaseRunEvent):
"""Terminal failure event emitted before the run status becomes failed."""
type: Literal["run_failed"] = "run_failed"
data: RunFailedEventData
RunEvent: TypeAlias = Annotated[
RunStartedEvent
| PydanticAIStreamRunEvent
| AgentOutputRunEvent
| SessionSnapshotRunEvent
| RunSucceededEvent
| RunFailedEvent,
Field(discriminator="type"),
]
RUN_EVENT_ADAPTER = TypeAdapter(RunEvent)
class RunEventsResponse(BaseModel):
"""Cursor-paginated event log response."""
@ -125,13 +205,24 @@ class RunRecord(BaseModel):
__all__ = [
"AgentProfileConfig",
"AgentOutputRunEvent",
"AgentOutputRunEventData",
"BaseRunEvent",
"CreateRunRequest",
"CreateRunResponse",
"EmptyRunEventData",
"PydanticAIStreamRunEvent",
"RUN_EVENT_ADAPTER",
"RunEvent",
"RunEventsResponse",
"RunFailedEvent",
"RunFailedEventData",
"RunRecord",
"RunStartedEvent",
"RunStatus",
"RunStatusResponse",
"RunSucceededEvent",
"SessionSnapshotRunEvent",
"new_run_id",
"utc_now",
]

View File

@ -7,7 +7,7 @@ name. Payload data is the full public ``RunEvent`` JSON object.
from collections.abc import AsyncIterable, AsyncIterator
from dify_agent.server.schemas import RunEvent
from dify_agent.server.schemas import RUN_EVENT_ADAPTER, RunEvent
def format_sse_event(event: RunEvent) -> str:
@ -16,7 +16,7 @@ def format_sse_event(event: RunEvent) -> str:
if event.id is not None:
lines.append(f"id: {event.id}")
lines.append(f"event: {event.type}")
lines.append(f"data: {event.model_dump_json()}")
lines.append(f"data: {RUN_EVENT_ADAPTER.dump_json(event).decode()}")
return "\n".join(lines) + "\n\n"

View File

@ -9,12 +9,12 @@ polling and SSE. Execution is scheduled in-process by
from collections.abc import AsyncIterator
from typing import cast
from pydantic import JsonValue
from redis.asyncio import Redis
from dify_agent.runtime.event_sink import RunEventSink
from dify_agent.server.schemas import (
CreateRunRequest,
RUN_EVENT_ADAPTER,
RunEvent,
RunEventsResponse,
RunRecord,
@ -63,9 +63,10 @@ class RedisRunStore(RunEventSink):
async def append_event(self, event: RunEvent) -> str:
"""Append an event JSON payload to the run's Redis stream."""
payload = RUN_EVENT_ADAPTER.dump_json(event, exclude={"id"}).decode()
event_id = await self.redis.xadd(
run_events_key(self.prefix, event.run_id),
{"payload": event.model_dump_json(exclude={"id"})},
{"payload": payload},
)
return event_id.decode() if isinstance(event_id, bytes) else str(event_id)
@ -107,12 +108,8 @@ class RedisRunStore(RunEventSink):
if isinstance(payload, bytes):
payload = payload.decode()
event_id = raw_id.decode() if isinstance(raw_id, bytes) else str(raw_id)
return RunEvent.model_validate_json(cast(str, payload)).model_copy(update={"id": event_id, "run_id": run_id})
def json_field(value: object) -> JsonValue:
"""Narrow helper for dynamic Redis payloads."""
return cast(JsonValue, value)
event = RUN_EVENT_ADAPTER.validate_json(cast(str, payload))
return event.model_copy(update={"id": event_id, "run_id": run_id})
__all__ = ["RedisRunStore", "RunNotFoundError"]

View File

@ -0,0 +1,40 @@
from pydantic_ai.messages import FinalResultEvent
from dify_agent.server.schemas import (
RUN_EVENT_ADAPTER,
AgentOutputRunEvent,
AgentOutputRunEventData,
PydanticAIStreamRunEvent,
RunFailedEvent,
RunFailedEventData,
RunStartedEvent,
)
def test_run_event_adapter_round_trips_typed_variants() -> None:
events = [
RunStartedEvent(run_id="run-1"),
PydanticAIStreamRunEvent(run_id="run-1", data=FinalResultEvent(tool_name=None, tool_call_id=None)),
AgentOutputRunEvent(run_id="run-1", data=AgentOutputRunEventData(output="done")),
RunFailedEvent(run_id="run-1", data=RunFailedEventData(error="boom", reason="shutdown")),
]
for event in events:
payload = RUN_EVENT_ADAPTER.dump_json(event)
decoded = RUN_EVENT_ADAPTER.validate_json(payload)
assert decoded.type == event.type
assert decoded.run_id == event.run_id
def test_pydantic_ai_event_data_uses_agent_stream_event_model() -> None:
event = RUN_EVENT_ADAPTER.validate_python(
{
"run_id": "run-1",
"type": "pydantic_ai_event",
"data": {"event_kind": "final_result", "tool_name": None, "tool_call_id": None},
}
)
assert isinstance(event, PydanticAIStreamRunEvent)
assert isinstance(event.data, FinalResultEvent)

View File

@ -1,9 +1,9 @@
from dify_agent.server.schemas import RunEvent
from dify_agent.server.schemas import RunStartedEvent
from dify_agent.server.sse import format_sse_event
def test_format_sse_event_uses_id_event_and_json_data() -> None:
event = RunEvent(id="7-0", run_id="run-1", type="run_started", data={})
event = RunStartedEvent(id="7-0", run_id="run-1")
frame = format_sse_event(event)

View File

@ -2,7 +2,7 @@ import asyncio
from collections.abc import Mapping
from agenton.compositor import CompositorConfig, LayerNodeConfig
from dify_agent.server.schemas import CreateRunRequest
from dify_agent.server.schemas import CreateRunRequest, RunStartedEvent
from dify_agent.storage.redis_run_store import RedisRunStore
@ -37,3 +37,17 @@ def test_create_run_writes_running_record_without_job_queue() -> None:
assert record.status == "running"
assert [command[0] for command in redis.commands] == ["set"]
assert redis.commands[0][1] == f"test:runs:{record.run_id}:record"
def test_append_event_serializes_typed_event_without_id() -> None:
redis = FakeRedis()
store = RedisRunStore(redis, prefix="test") # pyright: ignore[reportArgumentType]
event_id = asyncio.run(store.append_event(RunStartedEvent(id="local", run_id="run-1")))
assert event_id == "1-0"
assert redis.commands[0][0] == "xadd"
fields = redis.commands[0][2]
assert isinstance(fields, dict)
assert '"id"' not in str(fields["payload"])
assert '"type":"run_started"' in str(fields["payload"])