diff --git a/dify-agent/docs/dify-agent/api/README.md b/dify-agent/docs/dify-agent/api/README.md index efffe67005..b983500208 100644 --- a/dify-agent/docs/dify-agent/api/README.md +++ b/dify-agent/docs/dify-agent/api/README.md @@ -168,10 +168,13 @@ A failed run emits: 2. zero or more `pydantic_ai_event` 3. `run_failed` -`pydantic_ai_event.data` is serialized with Pydantic AI's `AgentStreamEvent` -adapter. `session_snapshot.data` contains the serialized -`CompositorSessionSnapshot` that can be sent as `session_snapshot` in a later -create-run request with the same compositor layer names and order. +Each event keeps the same envelope shape and has typed `data`: `run_started` and +`run_succeeded` use `{}`, `pydantic_ai_event` uses Pydantic AI's +`AgentStreamEvent` union, `agent_output` uses `{ "output": string }`, +`session_snapshot` uses `CompositorSessionSnapshot`, and `run_failed` uses +`{ "error": string, "reason": string | null }`. The session snapshot can be sent +as `session_snapshot` in a later create-run request with the same compositor layer +names and order. ## Consumer examples diff --git a/dify-agent/docs/dify-agent/guide/README.md b/dify-agent/docs/dify-agent/guide/README.md index b2ebf79c04..1cd82ffcc2 100644 --- a/dify-agent/docs/dify-agent/guide/README.md +++ b/dify-agent/docs/dify-agent/guide/README.md @@ -103,7 +103,10 @@ progress: Successful runs emit `run_started`, zero or more `pydantic_ai_event`, `agent_output`, `session_snapshot`, and `run_succeeded`. Failed runs end with -`run_failed`. +`run_failed`. Event envelopes retain `id`, `run_id`, `type`, `data`, and +`created_at`; `data` is typed per event type, including Pydantic AI's +`AgentStreamEvent` payload for `pydantic_ai_event` and `CompositorSessionSnapshot` +for `session_snapshot`. ## Examples diff --git a/dify-agent/src/dify_agent/runtime/event_sink.py b/dify-agent/src/dify_agent/runtime/event_sink.py index 563725b330..552961750d 100644 --- a/dify-agent/src/dify_agent/runtime/event_sink.py +++ b/dify-agent/src/dify_agent/runtime/event_sink.py @@ -8,9 +8,23 @@ same protocol with Redis streams in ``dify_agent.storage.redis_run_store``. from collections import defaultdict from typing import Protocol -from pydantic import JsonValue +from pydantic_ai.messages import AgentStreamEvent -from dify_agent.server.schemas import RunEvent, RunEventType, RunStatus, utc_now +from agenton.compositor import CompositorSessionSnapshot +from dify_agent.server.schemas import ( + AgentOutputRunEvent, + AgentOutputRunEventData, + EmptyRunEventData, + PydanticAIStreamRunEvent, + RunEvent, + RunFailedEvent, + RunFailedEventData, + RunStartedEvent, + RunStatus, + RunSucceededEvent, + SessionSnapshotRunEvent, + utc_now, +) class RunEventSink(Protocol): @@ -53,12 +67,78 @@ class InMemoryRunEventSink: async def emit_run_event( sink: RunEventSink, *, - run_id: str, - type: RunEventType, - data: JsonValue, + event: RunEvent, ) -> str: - """Create and append a timestamped ``RunEvent``.""" - return await sink.append_event(RunEvent(run_id=run_id, type=type, data=data, created_at=utc_now())) + """Append an already typed public run event.""" + return await sink.append_event(event) -__all__ = ["InMemoryRunEventSink", "RunEventSink", "emit_run_event"] +async def emit_run_started(sink: RunEventSink, *, run_id: str) -> str: + """Emit the first lifecycle event for one run.""" + return await emit_run_event( + sink, + event=RunStartedEvent(run_id=run_id, data=EmptyRunEventData(), created_at=utc_now()), + ) + + +async def emit_pydantic_ai_event(sink: RunEventSink, *, run_id: str, data: AgentStreamEvent) -> str: + """Emit one typed Pydantic AI stream event.""" + return await emit_run_event( + sink, + event=PydanticAIStreamRunEvent(run_id=run_id, data=data, created_at=utc_now()), + ) + + +async def emit_agent_output(sink: RunEventSink, *, run_id: str, output: str) -> str: + """Emit the final output text produced by the agent.""" + return await emit_run_event( + sink, + event=AgentOutputRunEvent( + run_id=run_id, + data=AgentOutputRunEventData(output=output), + created_at=utc_now(), + ), + ) + + +async def emit_session_snapshot(sink: RunEventSink, *, run_id: str, data: CompositorSessionSnapshot) -> str: + """Emit the typed Agenton session snapshot for later resumption.""" + return await emit_run_event( + sink, + event=SessionSnapshotRunEvent(run_id=run_id, data=data, created_at=utc_now()), + ) + + +async def emit_run_succeeded(sink: RunEventSink, *, run_id: str) -> str: + """Emit the terminal success lifecycle event.""" + return await emit_run_event( + sink, + event=RunSucceededEvent(run_id=run_id, data=EmptyRunEventData(), created_at=utc_now()), + ) + + +async def emit_run_failed( + sink: RunEventSink, + *, + run_id: str, + error: str, + reason: str | None = None, +) -> str: + """Emit the terminal failure lifecycle event.""" + return await emit_run_event( + sink, + event=RunFailedEvent(run_id=run_id, data=RunFailedEventData(error=error, reason=reason), created_at=utc_now()), + ) + + +__all__ = [ + "InMemoryRunEventSink", + "RunEventSink", + "emit_agent_output", + "emit_pydantic_ai_event", + "emit_run_event", + "emit_run_failed", + "emit_run_started", + "emit_run_succeeded", + "emit_session_snapshot", +] diff --git a/dify-agent/src/dify_agent/runtime/run_scheduler.py b/dify-agent/src/dify_agent/runtime/run_scheduler.py index 685f8849bf..329893d290 100644 --- a/dify-agent/src/dify_agent/runtime/run_scheduler.py +++ b/dify-agent/src/dify_agent/runtime/run_scheduler.py @@ -13,7 +13,7 @@ from collections.abc import Callable from typing import Protocol from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor -from dify_agent.runtime.event_sink import RunEventSink, emit_run_event +from dify_agent.runtime.event_sink import RunEventSink, emit_run_failed from dify_agent.runtime.runner import AgentRunRunner from dify_agent.runtime.user_prompt_validation import EMPTY_USER_PROMPTS_ERROR, has_non_blank_user_prompt from dify_agent.server.schemas import CreateRunRequest, RunRecord @@ -131,12 +131,7 @@ class RunScheduler: """Best-effort failure event/status for shutdown-cancelled runs.""" message = "run cancelled during server shutdown" try: - _ = await emit_run_event( - self.store, - run_id=run_id, - type="run_failed", - data={"error": message, "reason": "shutdown"}, - ) + _ = await emit_run_failed(self.store, run_id=run_id, error=message, reason="shutdown") await self.store.update_status(run_id, "failed", message) except Exception: logger.exception("failed to mark cancelled run failed", extra={"run_id": run_id}) diff --git a/dify-agent/src/dify_agent/runtime/runner.py b/dify-agent/src/dify_agent/runtime/runner.py index c34bf543e8..296d14d1c5 100644 --- a/dify-agent/src/dify_agent/runtime/runner.py +++ b/dify-agent/src/dify_agent/runtime/runner.py @@ -7,20 +7,24 @@ publishes a terminal success or failure event. """ from collections.abc import AsyncIterable -from typing import cast -from pydantic import JsonValue, TypeAdapter from pydantic_ai.messages import AgentStreamEvent from agenton.compositor import CompositorSessionSnapshot from dify_agent.runtime.agent_factory import create_agent, normalize_user_input from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor -from dify_agent.runtime.event_sink import RunEventSink, emit_run_event +from dify_agent.runtime.event_sink import ( + RunEventSink, + emit_agent_output, + emit_pydantic_ai_event, + emit_run_failed, + emit_run_started, + emit_run_succeeded, + emit_session_snapshot, +) from dify_agent.runtime.user_prompt_validation import EMPTY_USER_PROMPTS_ERROR, has_non_blank_user_prompt from dify_agent.server.schemas import CreateRunRequest -_AGENT_STREAM_EVENT_ADAPTER = TypeAdapter(AgentStreamEvent) - class AgentRunValidationError(ValueError): """Raised when a run request is valid JSON but cannot execute.""" @@ -42,24 +46,19 @@ class AgentRunRunner: async def run(self) -> None: """Execute the run and emit the documented event sequence.""" await self.sink.update_status(self.run_id, "running") - _ = await emit_run_event(self.sink, run_id=self.run_id, type="run_started", data={}) + _ = await emit_run_started(self.sink, run_id=self.run_id) try: output, session_snapshot = await self._run_agent() except Exception as exc: message = str(exc) or type(exc).__name__ - _ = await emit_run_event(self.sink, run_id=self.run_id, type="run_failed", data={"error": message}) + _ = await emit_run_failed(self.sink, run_id=self.run_id, error=message) await self.sink.update_status(self.run_id, "failed", message) raise - _ = await emit_run_event(self.sink, run_id=self.run_id, type="agent_output", data={"output": output}) - _ = await emit_run_event( - self.sink, - run_id=self.run_id, - type="session_snapshot", - data=cast(JsonValue, session_snapshot.model_dump(mode="json")), - ) - _ = await emit_run_event(self.sink, run_id=self.run_id, type="run_succeeded", data={}) + _ = await emit_agent_output(self.sink, run_id=self.run_id, output=output) + _ = await emit_session_snapshot(self.sink, run_id=self.run_id, data=session_snapshot) + _ = await emit_run_succeeded(self.sink, run_id=self.run_id) await self.sink.update_status(self.run_id, "succeeded") async def _run_agent(self) -> tuple[str, CompositorSessionSnapshot]: @@ -78,12 +77,7 @@ class AgentRunRunner: async def handle_events(_ctx: object, events: AsyncIterable[AgentStreamEvent]) -> None: async for event in events: - _ = await emit_run_event( - self.sink, - run_id=self.run_id, - type="pydantic_ai_event", - data=cast(JsonValue, _AGENT_STREAM_EVENT_ADAPTER.dump_python(event, mode="json")), - ) + _ = await emit_pydantic_ai_event(self.sink, run_id=self.run_id, data=event) agent = create_agent( self.request.agent_profile, diff --git a/dify-agent/src/dify_agent/server/schemas.py b/dify-agent/src/dify_agent/server/schemas.py index 548d7413b4..7b3ff93f92 100644 --- a/dify-agent/src/dify_agent/server/schemas.py +++ b/dify-agent/src/dify_agent/server/schemas.py @@ -3,14 +3,18 @@ The server accepts only registry-backed Agenton compositor configs. This keeps HTTP input data-only and prevents unsafe import-path construction. Run events are append-only records; Redis stream ids (or in-memory equivalents in tests) are the -public cursors used by polling and SSE replay. +public cursors used by polling and SSE replay. Event envelopes keep the public +``id``/``run_id``/``type``/``data``/``created_at`` shape, but each ``type`` has a +typed ``data`` model so OpenAPI, Redis replay, and runtime producers agree on the +payload contract. """ from datetime import datetime, timezone -from typing import Literal +from typing import Annotated, Literal, TypeAlias from uuid import uuid4 -from pydantic import BaseModel, ConfigDict, Field, JsonValue, field_validator +from pydantic import BaseModel, ConfigDict, Field, TypeAdapter, field_validator +from pydantic_ai.messages import AgentStreamEvent from agenton.compositor import CompositorConfig, CompositorSessionSnapshot @@ -80,18 +84,94 @@ class RunStatusResponse(BaseModel): model_config = ConfigDict(extra="forbid") -class RunEvent(BaseModel): - """Append-only event visible through polling and SSE.""" +class EmptyRunEventData(BaseModel): + """Typed empty payload for lifecycle events that carry no extra data.""" + + model_config = ConfigDict(extra="forbid") + + +class AgentOutputRunEventData(BaseModel): + """Final agent output payload emitted before the session snapshot.""" + + output: str + + model_config = ConfigDict(extra="forbid") + + +class RunFailedEventData(BaseModel): + """Terminal failure payload shown to polling and SSE consumers.""" + + error: str + reason: str | None = None + + model_config = ConfigDict(extra="forbid") + + +class BaseRunEvent(BaseModel): + """Shared append-only event envelope visible through polling and SSE.""" id: str | None = None run_id: str - type: RunEventType - data: JsonValue = Field(default_factory=dict) created_at: datetime = Field(default_factory=utc_now) model_config = ConfigDict(extra="forbid") +class RunStartedEvent(BaseRunEvent): + """Run lifecycle event emitted before runtime execution starts.""" + + type: Literal["run_started"] = "run_started" + data: EmptyRunEventData = Field(default_factory=EmptyRunEventData) + + +class PydanticAIStreamRunEvent(BaseRunEvent): + """Pydantic AI stream event using the upstream typed event model.""" + + type: Literal["pydantic_ai_event"] = "pydantic_ai_event" + data: AgentStreamEvent + + +class AgentOutputRunEvent(BaseRunEvent): + """Run event carrying the final agent output string.""" + + type: Literal["agent_output"] = "agent_output" + data: AgentOutputRunEventData + + +class SessionSnapshotRunEvent(BaseRunEvent): + """Run event carrying the resumable Agenton session snapshot.""" + + type: Literal["session_snapshot"] = "session_snapshot" + data: CompositorSessionSnapshot + + +class RunSucceededEvent(BaseRunEvent): + """Terminal success event emitted after output and session snapshot.""" + + type: Literal["run_succeeded"] = "run_succeeded" + data: EmptyRunEventData = Field(default_factory=EmptyRunEventData) + + +class RunFailedEvent(BaseRunEvent): + """Terminal failure event emitted before the run status becomes failed.""" + + type: Literal["run_failed"] = "run_failed" + data: RunFailedEventData + + + +RunEvent: TypeAlias = Annotated[ + RunStartedEvent + | PydanticAIStreamRunEvent + | AgentOutputRunEvent + | SessionSnapshotRunEvent + | RunSucceededEvent + | RunFailedEvent, + Field(discriminator="type"), +] +RUN_EVENT_ADAPTER = TypeAdapter(RunEvent) + + class RunEventsResponse(BaseModel): """Cursor-paginated event log response.""" @@ -125,13 +205,24 @@ class RunRecord(BaseModel): __all__ = [ "AgentProfileConfig", + "AgentOutputRunEvent", + "AgentOutputRunEventData", + "BaseRunEvent", "CreateRunRequest", "CreateRunResponse", + "EmptyRunEventData", + "PydanticAIStreamRunEvent", + "RUN_EVENT_ADAPTER", "RunEvent", "RunEventsResponse", + "RunFailedEvent", + "RunFailedEventData", "RunRecord", + "RunStartedEvent", "RunStatus", "RunStatusResponse", + "RunSucceededEvent", + "SessionSnapshotRunEvent", "new_run_id", "utc_now", ] diff --git a/dify-agent/src/dify_agent/server/sse.py b/dify-agent/src/dify_agent/server/sse.py index 6d9d33a9b1..0e917120a5 100644 --- a/dify-agent/src/dify_agent/server/sse.py +++ b/dify-agent/src/dify_agent/server/sse.py @@ -7,7 +7,7 @@ name. Payload data is the full public ``RunEvent`` JSON object. from collections.abc import AsyncIterable, AsyncIterator -from dify_agent.server.schemas import RunEvent +from dify_agent.server.schemas import RUN_EVENT_ADAPTER, RunEvent def format_sse_event(event: RunEvent) -> str: @@ -16,7 +16,7 @@ def format_sse_event(event: RunEvent) -> str: if event.id is not None: lines.append(f"id: {event.id}") lines.append(f"event: {event.type}") - lines.append(f"data: {event.model_dump_json()}") + lines.append(f"data: {RUN_EVENT_ADAPTER.dump_json(event).decode()}") return "\n".join(lines) + "\n\n" diff --git a/dify-agent/src/dify_agent/storage/redis_run_store.py b/dify-agent/src/dify_agent/storage/redis_run_store.py index 1f4871c5c8..d69f339119 100644 --- a/dify-agent/src/dify_agent/storage/redis_run_store.py +++ b/dify-agent/src/dify_agent/storage/redis_run_store.py @@ -9,12 +9,12 @@ polling and SSE. Execution is scheduled in-process by from collections.abc import AsyncIterator from typing import cast -from pydantic import JsonValue from redis.asyncio import Redis from dify_agent.runtime.event_sink import RunEventSink from dify_agent.server.schemas import ( CreateRunRequest, + RUN_EVENT_ADAPTER, RunEvent, RunEventsResponse, RunRecord, @@ -63,9 +63,10 @@ class RedisRunStore(RunEventSink): async def append_event(self, event: RunEvent) -> str: """Append an event JSON payload to the run's Redis stream.""" + payload = RUN_EVENT_ADAPTER.dump_json(event, exclude={"id"}).decode() event_id = await self.redis.xadd( run_events_key(self.prefix, event.run_id), - {"payload": event.model_dump_json(exclude={"id"})}, + {"payload": payload}, ) return event_id.decode() if isinstance(event_id, bytes) else str(event_id) @@ -107,12 +108,8 @@ class RedisRunStore(RunEventSink): if isinstance(payload, bytes): payload = payload.decode() event_id = raw_id.decode() if isinstance(raw_id, bytes) else str(raw_id) - return RunEvent.model_validate_json(cast(str, payload)).model_copy(update={"id": event_id, "run_id": run_id}) - - -def json_field(value: object) -> JsonValue: - """Narrow helper for dynamic Redis payloads.""" - return cast(JsonValue, value) + event = RUN_EVENT_ADAPTER.validate_json(cast(str, payload)) + return event.model_copy(update={"id": event_id, "run_id": run_id}) __all__ = ["RedisRunStore", "RunNotFoundError"] diff --git a/dify-agent/tests/local/dify_agent/server/test_schemas.py b/dify-agent/tests/local/dify_agent/server/test_schemas.py new file mode 100644 index 0000000000..e4be884b2e --- /dev/null +++ b/dify-agent/tests/local/dify_agent/server/test_schemas.py @@ -0,0 +1,40 @@ +from pydantic_ai.messages import FinalResultEvent + +from dify_agent.server.schemas import ( + RUN_EVENT_ADAPTER, + AgentOutputRunEvent, + AgentOutputRunEventData, + PydanticAIStreamRunEvent, + RunFailedEvent, + RunFailedEventData, + RunStartedEvent, +) + + +def test_run_event_adapter_round_trips_typed_variants() -> None: + events = [ + RunStartedEvent(run_id="run-1"), + PydanticAIStreamRunEvent(run_id="run-1", data=FinalResultEvent(tool_name=None, tool_call_id=None)), + AgentOutputRunEvent(run_id="run-1", data=AgentOutputRunEventData(output="done")), + RunFailedEvent(run_id="run-1", data=RunFailedEventData(error="boom", reason="shutdown")), + ] + + for event in events: + payload = RUN_EVENT_ADAPTER.dump_json(event) + decoded = RUN_EVENT_ADAPTER.validate_json(payload) + + assert decoded.type == event.type + assert decoded.run_id == event.run_id + + +def test_pydantic_ai_event_data_uses_agent_stream_event_model() -> None: + event = RUN_EVENT_ADAPTER.validate_python( + { + "run_id": "run-1", + "type": "pydantic_ai_event", + "data": {"event_kind": "final_result", "tool_name": None, "tool_call_id": None}, + } + ) + + assert isinstance(event, PydanticAIStreamRunEvent) + assert isinstance(event.data, FinalResultEvent) diff --git a/dify-agent/tests/local/dify_agent/server/test_sse.py b/dify-agent/tests/local/dify_agent/server/test_sse.py index fe05f39285..f54249453e 100644 --- a/dify-agent/tests/local/dify_agent/server/test_sse.py +++ b/dify-agent/tests/local/dify_agent/server/test_sse.py @@ -1,9 +1,9 @@ -from dify_agent.server.schemas import RunEvent +from dify_agent.server.schemas import RunStartedEvent from dify_agent.server.sse import format_sse_event def test_format_sse_event_uses_id_event_and_json_data() -> None: - event = RunEvent(id="7-0", run_id="run-1", type="run_started", data={}) + event = RunStartedEvent(id="7-0", run_id="run-1") frame = format_sse_event(event) diff --git a/dify-agent/tests/local/dify_agent/storage/test_redis_run_store.py b/dify-agent/tests/local/dify_agent/storage/test_redis_run_store.py index c6e0d8025c..4df4ec9a8b 100644 --- a/dify-agent/tests/local/dify_agent/storage/test_redis_run_store.py +++ b/dify-agent/tests/local/dify_agent/storage/test_redis_run_store.py @@ -2,7 +2,7 @@ import asyncio from collections.abc import Mapping from agenton.compositor import CompositorConfig, LayerNodeConfig -from dify_agent.server.schemas import CreateRunRequest +from dify_agent.server.schemas import CreateRunRequest, RunStartedEvent from dify_agent.storage.redis_run_store import RedisRunStore @@ -37,3 +37,17 @@ def test_create_run_writes_running_record_without_job_queue() -> None: assert record.status == "running" assert [command[0] for command in redis.commands] == ["set"] assert redis.commands[0][1] == f"test:runs:{record.run_id}:record" + + +def test_append_event_serializes_typed_event_without_id() -> None: + redis = FakeRedis() + store = RedisRunStore(redis, prefix="test") # pyright: ignore[reportArgumentType] + + event_id = asyncio.run(store.append_event(RunStartedEvent(id="local", run_id="run-1"))) + + assert event_id == "1-0" + assert redis.commands[0][0] == "xadd" + fields = redis.commands[0][2] + assert isinstance(fields, dict) + assert '"id"' not in str(fields["payload"]) + assert '"type":"run_started"' in str(fields["payload"])