From 3c95ff47824a9029883f822e5c80fd0e3d086f31 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=9B=90=E7=B2=92=20Yanli?= Date: Fri, 8 May 2026 01:37:05 +0800 Subject: [PATCH] add docs and tests for dify-agent --- dify-agent/docs/dify-agent/api/README.md | 178 ++++++++++++++++++ dify-agent/docs/dify-agent/guide/README.md | 139 ++++++++++++++ .../local/dify_agent/runtime/test_runner.py | 62 ++++++ .../tests/local/dify_agent/server/test_app.py | 92 +++++++++ .../dify_agent/server/test_runs_routes.py | 29 +++ .../tests/local/dify_agent/server/test_sse.py | 12 ++ .../storage/test_redis_run_store.py | 89 +++++++++ .../dify_agent/worker/test_job_worker.py | 90 +++++++++ 8 files changed, 691 insertions(+) create mode 100644 dify-agent/docs/dify-agent/api/README.md create mode 100644 dify-agent/docs/dify-agent/guide/README.md create mode 100644 dify-agent/tests/local/dify_agent/runtime/test_runner.py create mode 100644 dify-agent/tests/local/dify_agent/server/test_app.py create mode 100644 dify-agent/tests/local/dify_agent/server/test_runs_routes.py create mode 100644 dify-agent/tests/local/dify_agent/server/test_sse.py create mode 100644 dify-agent/tests/local/dify_agent/storage/test_redis_run_store.py create mode 100644 dify-agent/tests/local/dify_agent/worker/test_job_worker.py diff --git a/dify-agent/docs/dify-agent/api/README.md b/dify-agent/docs/dify-agent/api/README.md new file mode 100644 index 0000000000..21da86649e --- /dev/null +++ b/dify-agent/docs/dify-agent/api/README.md @@ -0,0 +1,178 @@ +# Dify Agent Run API + +The Dify Agent API exposes asynchronous agent runs backed by Agenton compositor +configuration, Pydantic AI runtime execution, and Redis Streams event logs. The +FastAPI application lives at `dify-agent/src/dify_agent/server/app.py`. + +## Input model + +Create-run requests accept a `CompositorConfig` and an optional +`CompositorSessionSnapshot`. There is **no top-level `user_prompt` field**. +User input must be supplied by Agenton layers. In the MVP server, the safe +config-constructible layer registry includes `plain.prompt`; its `config.user` +field becomes `Compositor.user_prompts` and is passed to Pydantic AI as the run +input. + +Blank user input is rejected. A request with no user prompt, an empty string, or +only whitespace strings such as `"user": ["", " "]` returns `422` from the API +or a runner validation error if it reaches worker execution. + +The server does not implement a Pydantic AI history layer. Resumable Agenton +state is represented only by `session_snapshot`. + +## Create a run + +```http +POST /runs +Content-Type: application/json +``` + +Request: + +```json +{ + "compositor": { + "schema_version": 1, + "layers": [ + { + "name": "prompt", + "type": "plain.prompt", + "config": { + "prefix": "You are a concise assistant.", + "user": "Say hello from the Dify Agent API." + } + } + ] + }, + "session_snapshot": null, + "agent_profile": { + "provider": "test", + "output_text": "Hello from the TestModel." + } +} +``` + +Response (`202 Accepted`): + +```json +{ + "run_id": "4a7f9a98-5c55-48d0-8f3e-87ef2cf81234", + "status": "queued" +} +``` + +`agent_profile.provider` currently supports the credential-free `test` profile. 
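
The same request can be sent from Python. A minimal client sketch, assuming the
server runs at uvicorn's default `http://localhost:8000` and that `httpx` (a
FastAPI test dependency) is available:

```python
import httpx

# Mirrors the JSON request above: user input flows through the plain.prompt
# layer's config.user field, and the credential-free test profile echoes
# output_text instead of calling a real model provider.
payload = {
    "compositor": {
        "schema_version": 1,
        "layers": [
            {
                "name": "prompt",
                "type": "plain.prompt",
                "config": {
                    "prefix": "You are a concise assistant.",
                    "user": "Say hello from the Dify Agent API.",
                },
            }
        ],
    },
    "agent_profile": {"provider": "test", "output_text": "Hello from the TestModel."},
}

response = httpx.post("http://localhost:8000/runs", json=payload)
response.raise_for_status()  # 202 Accepted is a success status
run_id = response.json()["run_id"]  # poll this id via the endpoints below
```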
+ +Validation error example (`422`): + +```json +{ + "detail": "compositor.user_prompts must not be empty" +} +``` + +## Get run status + +```http +GET /runs/{run_id} +``` + +Response: + +```json +{ + "run_id": "4a7f9a98-5c55-48d0-8f3e-87ef2cf81234", + "status": "succeeded", + "created_at": "2026-05-08T12:00:00Z", + "updated_at": "2026-05-08T12:00:02Z", + "error": null +} +``` + +Status values are: + +- `queued` +- `running` +- `succeeded` +- `failed` + +Unknown run ids return `404` with `"run not found"`. + +## Poll events + +```http +GET /runs/{run_id}/events?after=0-0&limit=100 +``` + +Cursor values are Redis Stream IDs. Use `after=0-0` to read from the beginning. +The response includes `next_cursor`; pass it as the next `after` value to continue +polling. + +Response: + +```json +{ + "run_id": "4a7f9a98-5c55-48d0-8f3e-87ef2cf81234", + "events": [ + { + "id": "1715170000000-0", + "run_id": "4a7f9a98-5c55-48d0-8f3e-87ef2cf81234", + "type": "run_started", + "data": {}, + "created_at": "2026-05-08T12:00:00Z" + } + ], + "next_cursor": "1715170000000-0" +} +``` + +## Stream events with SSE + +```http +GET /runs/{run_id}/events/sse +``` + +SSE frames use the run event id as `id`, the event type as `event`, and the full +`RunEvent` JSON object as `data`: + +```text +id: 1715170000000-0 +event: run_started +data: {"id":"1715170000000-0","run_id":"...","type":"run_started","data":{},"created_at":"..."} + +``` + +Replay can start from a cursor with either: + +- `GET /runs/{run_id}/events/sse?after=1715170000000-0` +- `Last-Event-ID: 1715170000000-0` + +If both are provided, the `after` query parameter takes precedence. + +## Event types and order + +A normal successful run emits: + +1. `run_started` +2. zero or more `pydantic_ai_event` +3. `agent_output` +4. `session_snapshot` +5. `run_succeeded` + +A failed run emits: + +1. `run_started` +2. zero or more `pydantic_ai_event` +3. `run_failed` + +`pydantic_ai_event.data` is serialized with Pydantic AI's `AgentStreamEvent` +adapter. `session_snapshot.data` contains the serialized +`CompositorSessionSnapshot` that can be sent as `session_snapshot` in a later +create-run request with the same compositor layer names and order. + +## Consumer examples + +See: + +- `dify-agent/examples/run_server_consumer.py` for cursor polling +- `dify-agent/examples/run_server_sse_consumer.py` for SSE consumption diff --git a/dify-agent/docs/dify-agent/guide/README.md b/dify-agent/docs/dify-agent/guide/README.md new file mode 100644 index 0000000000..c50bf4a86a --- /dev/null +++ b/dify-agent/docs/dify-agent/guide/README.md @@ -0,0 +1,139 @@ +# Operating the Dify Agent Run Server + +This guide describes how to run the MVP Dify Agent API server and worker. The +server is implemented in `dify-agent/src/dify_agent/server/app.py` and uses Redis +for run records, job queues, and event streams. + +## Default local startup + +Start Redis, then run one FastAPI/uvicorn process: + +```bash +uv run --project dify-agent uvicorn dify_agent.server.app:app --reload +``` + +By default, the FastAPI lifespan creates both: + +- one Redis-backed run store used by HTTP routes +- one embedded Redis Streams worker task that executes queued runs + +This means local development needs one uvicorn process plus Redis. Run execution +still happens outside request handlers, so client disconnects do not cancel the +agent run. + +## Configuration + +`ServerSettings` loads environment variables with the `DIFY_AGENT_` prefix. It +also reads `.env` and `dify-agent/.env` when present. 
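
Settings can also be constructed directly in Python. A minimal sketch; the
field names mirror the environment variables below, lowercased and without the
prefix (as used by `ServerSettings` in the test suite):

```python
from dify_agent.server.settings import ServerSettings

# Reads DIFY_AGENT_* variables plus .env / dify-agent/.env when present.
settings = ServerSettings()
print(settings.redis_url, settings.worker_enabled)
```

The recognized variables are: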
+ +| Environment variable | Default | Description | +| --- | --- | --- | +| `DIFY_AGENT_REDIS_URL` | `redis://localhost:6379/0` | Redis connection URL. | +| `DIFY_AGENT_REDIS_PREFIX` | `dify-agent` | Prefix for Redis record, job, and event keys. | +| `DIFY_AGENT_WORKER_ENABLED` | `true` | Starts the embedded worker in the FastAPI process when true. | +| `DIFY_AGENT_WORKER_GROUP_NAME` | `run-workers` | Redis consumer group used by workers. | +| `DIFY_AGENT_WORKER_CONSUMER_NAME` | unset | Explicit consumer name. If unset, the API process uses `api-{hostname}-{pid}`; the standalone worker uses `worker-1`. | +| `DIFY_AGENT_WORKER_PENDING_IDLE_MS` | `600000` | Idle time before a pending job may be reclaimed with `XAUTOCLAIM` (10 minutes). | + +Boolean settings accept Pydantic settings values such as `false`, `0`, or `no`. + +Example `.env`: + +```env +DIFY_AGENT_REDIS_URL=redis://localhost:6379/0 +DIFY_AGENT_REDIS_PREFIX=dify-agent-dev +DIFY_AGENT_WORKER_ENABLED=true +DIFY_AGENT_WORKER_PENDING_IDLE_MS=600000 +``` + +## Running a separate worker + +For deployments that want to scale HTTP and worker processes independently, +disable the embedded worker and start a worker process separately: + +```bash +DIFY_AGENT_WORKER_ENABLED=false \ + uv run --project dify-agent uvicorn dify_agent.server.app:app + +uv run --project dify-agent python -m dify_agent.worker.job_worker +``` + +Use the same Redis URL, prefix, and worker group for the API process and all +standalone workers. Give each live worker a unique +`DIFY_AGENT_WORKER_CONSUMER_NAME` when running multiple standalone workers. + +## Redis Streams reliability + +Run creation stores the run record and enqueues the worker job in one Redis +transaction (`MULTI/EXEC`). A create request either persists both pieces or fails +without leaving a queued run that has no job. + +Workers read jobs from a Redis Streams consumer group. If a worker crashes after +receiving a job but before acknowledging it, Redis keeps the entry pending. On +later iterations, workers call `XAUTOCLAIM` and reclaim entries idle for at least +`DIFY_AGENT_WORKER_PENDING_IDLE_MS` before reading new `>` entries. The default +idle time is `600000` milliseconds (10 minutes). + +Choose the pending idle value according to your longest expected run time. A +value that is too short can cause a healthy long-running job to be reclaimed by +another worker; a value that is too long delays recovery after crashes. + +## Run inputs and session snapshots + +The API does not accept a top-level `user_prompt`. Submit a `CompositorConfig` +whose Agenton layers provide user input. With the MVP registry, use +`plain.prompt` and its `config.user` field: + +```json +{ + "compositor": { + "schema_version": 1, + "layers": [ + { + "name": "prompt", + "type": "plain.prompt", + "config": { + "prefix": "You are concise.", + "user": "Summarize the current state." + } + } + ] + } +} +``` + +`config.user` can be a string or a list of strings. Empty or whitespace-only +effective prompts are rejected with `422` at the API boundary or with a runner +validation error if they reach execution. + +There is no Pydantic AI history layer. To resume Agenton layer state, pass the +`session_snapshot` emitted by a previous run together with a compositor that has +the same layer names and order. + +## Observing runs + +Use the HTTP status endpoint for coarse state and the event endpoints for detailed +progress: + +- `POST /runs` creates a queued run. +- `GET /runs/{run_id}` returns `queued`, `running`, `succeeded`, or `failed`. 
+- `GET /runs/{run_id}/events` polls the Redis Stream event log with `after` and + `next_cursor` cursors. +- `GET /runs/{run_id}/events/sse` replays and streams events over SSE. The SSE + `id` is the event Redis Stream ID. `after` query cursors take precedence over + `Last-Event-ID` headers. + +Successful runs emit `run_started`, zero or more `pydantic_ai_event`, +`agent_output`, `session_snapshot`, and `run_succeeded`. Failed runs end with +`run_failed`. + +## Examples + +The repository includes simple consumers that print observed output/events: + +- `dify-agent/examples/run_server_consumer.py` creates a run and polls events. +- `dify-agent/examples/run_server_sse_consumer.py` consumes raw SSE frames for an + existing run id. + +Both examples use the credential-free Pydantic AI `TestModel` profile; they still +require Redis and the API server. diff --git a/dify-agent/tests/local/dify_agent/runtime/test_runner.py b/dify-agent/tests/local/dify_agent/runtime/test_runner.py new file mode 100644 index 0000000000..c0cecb470e --- /dev/null +++ b/dify-agent/tests/local/dify_agent/runtime/test_runner.py @@ -0,0 +1,62 @@ +import asyncio + +import pytest + +from agenton.compositor import CompositorConfig, LayerNodeConfig +from dify_agent.runtime.event_sink import InMemoryRunEventSink +from dify_agent.runtime.runner import AgentRunRunner, AgentRunValidationError +from dify_agent.server.schemas import AgentProfileConfig, CreateRunRequest + + +def test_runner_emits_terminal_success_and_snapshot() -> None: + request = CreateRunRequest( + compositor=CompositorConfig( + layers=[ + LayerNodeConfig( + name="prompt", + type="plain.prompt", + config={"prefix": "system", "user": "hello"}, + ) + ] + ), + agent_profile=AgentProfileConfig(output_text="done"), + ) + sink = InMemoryRunEventSink() + + asyncio.run(AgentRunRunner(sink=sink, request=request, run_id="run-1").run()) + + event_types = [event.type for event in sink.events["run-1"]] + assert event_types[0] == "run_started" + assert "pydantic_ai_event" in event_types + assert event_types[-3:] == ["agent_output", "session_snapshot", "run_succeeded"] + assert sink.statuses["run-1"] == "succeeded" + + +def test_runner_fails_empty_user_prompts() -> None: + request = CreateRunRequest( + compositor=CompositorConfig( + layers=[LayerNodeConfig(name="prompt", type="plain.prompt", config={"prefix": "system"})] + ) + ) + sink = InMemoryRunEventSink() + + with pytest.raises(AgentRunValidationError): + asyncio.run(AgentRunRunner(sink=sink, request=request, run_id="run-2").run()) + + assert [event.type for event in sink.events["run-2"]] == ["run_started", "run_failed"] + assert sink.statuses["run-2"] == "failed" + + +def test_runner_fails_blank_string_user_prompt_list() -> None: + request = CreateRunRequest( + compositor=CompositorConfig( + layers=[LayerNodeConfig(name="prompt", type="plain.prompt", config={"user": ["", " "]})] + ) + ) + sink = InMemoryRunEventSink() + + with pytest.raises(AgentRunValidationError): + asyncio.run(AgentRunRunner(sink=sink, request=request, run_id="run-3").run()) + + assert [event.type for event in sink.events["run-3"]] == ["run_started", "run_failed"] + assert sink.statuses["run-3"] == "failed" diff --git a/dify-agent/tests/local/dify_agent/server/test_app.py b/dify-agent/tests/local/dify_agent/server/test_app.py new file mode 100644 index 0000000000..0287bdd4bb --- /dev/null +++ b/dify-agent/tests/local/dify_agent/server/test_app.py @@ -0,0 +1,92 @@ +import asyncio +from typing import ClassVar + +import pytest +from fastapi.testclient 
import TestClient + +import dify_agent.server.app as app_module +from dify_agent.server.app import create_app +from dify_agent.server.settings import ServerSettings + + +class FakeRedis: + closed: bool + + def __init__(self) -> None: + self.closed = False + + async def aclose(self) -> None: + self.closed = True + + +class FakeRunJobWorker: + created: ClassVar[list["FakeRunJobWorker"]] = [] + + group_name: str + consumer_name: str + pending_idle_ms: int + started: bool + cancelled: bool + + def __init__( + self, + *, + store: object, + group_name: str, + consumer_name: str, + pending_idle_ms: int, + ) -> None: + del store + self.group_name = group_name + self.consumer_name = consumer_name + self.pending_idle_ms = pending_idle_ms + self.started = False + self.cancelled = False + self.created.append(self) + + async def run_forever(self) -> None: + self.started = True + try: + await asyncio.get_running_loop().create_future() + except asyncio.CancelledError: + self.cancelled = True + raise + + +def test_create_app_starts_and_cancels_embedded_worker(monkeypatch: pytest.MonkeyPatch) -> None: + fake_redis = FakeRedis() + FakeRunJobWorker.created.clear() + monkeypatch.setattr(app_module.Redis, "from_url", lambda _url: fake_redis) + monkeypatch.setattr(app_module, "RunJobWorker", FakeRunJobWorker) + + settings = ServerSettings( + redis_url="redis://example.invalid/0", + redis_prefix="test", + worker_enabled=True, + worker_group_name="workers", + worker_consumer_name="consumer-a", + worker_pending_idle_ms=5, + ) + + with TestClient(create_app(settings)): + assert len(FakeRunJobWorker.created) == 1 + worker = FakeRunJobWorker.created[0] + assert worker.started is True + assert worker.group_name == "workers" + assert worker.consumer_name == "consumer-a" + assert worker.pending_idle_ms == 5 + + assert FakeRunJobWorker.created[0].cancelled is True + assert fake_redis.closed is True + + +def test_create_app_can_disable_embedded_worker(monkeypatch: pytest.MonkeyPatch) -> None: + fake_redis = FakeRedis() + FakeRunJobWorker.created.clear() + monkeypatch.setattr(app_module.Redis, "from_url", lambda _url: fake_redis) + monkeypatch.setattr(app_module, "RunJobWorker", FakeRunJobWorker) + + with TestClient(create_app(ServerSettings(worker_enabled=False))): + assert FakeRunJobWorker.created == [] + + assert fake_redis.closed is True diff --git a/dify-agent/tests/local/dify_agent/server/test_runs_routes.py b/dify-agent/tests/local/dify_agent/server/test_runs_routes.py new file mode 100644 index 0000000000..562df814b1 --- /dev/null +++ b/dify-agent/tests/local/dify_agent/server/test_runs_routes.py @@ -0,0 +1,29 @@ +from fastapi.testclient import TestClient + +from dify_agent.server.routes.runs import create_runs_router + + +class FakeStore: + async def create_run(self, request: object) -> object: + raise AssertionError("blank prompt requests must be rejected before enqueue") + + +def test_create_run_rejects_effectively_blank_user_prompt_list() -> None: + from fastapi import FastAPI + + app = FastAPI() + app.include_router(create_runs_router(lambda: FakeStore())) # pyright: ignore[reportArgumentType] + client = TestClient(app) + + response = client.post( + "/runs", + json={ + "compositor": { + "schema_version": 1, + "layers": [{"name": "prompt", "type": "plain.prompt", "config": {"user": ["", " "]}}], + } + }, + ) + + assert response.status_code == 422 + assert response.json()["detail"] == "compositor.user_prompts must not be empty" diff --git a/dify-agent/tests/local/dify_agent/server/test_sse.py 
b/dify-agent/tests/local/dify_agent/server/test_sse.py new file mode 100644 index 0000000000..fe05f39285 --- /dev/null +++ b/dify-agent/tests/local/dify_agent/server/test_sse.py @@ -0,0 +1,12 @@ +from dify_agent.server.schemas import RunEvent +from dify_agent.server.sse import format_sse_event + + +def test_format_sse_event_uses_id_event_and_json_data() -> None: + event = RunEvent(id="7-0", run_id="run-1", type="run_started", data={}) + + frame = format_sse_event(event) + + assert frame.startswith("id: 7-0\nevent: run_started\ndata: ") + assert '"run_id":"run-1"' in frame + assert frame.endswith("\n\n") diff --git a/dify-agent/tests/local/dify_agent/storage/test_redis_run_store.py b/dify-agent/tests/local/dify_agent/storage/test_redis_run_store.py new file mode 100644 index 0000000000..61424165ab --- /dev/null +++ b/dify-agent/tests/local/dify_agent/storage/test_redis_run_store.py @@ -0,0 +1,89 @@ +import asyncio +from collections.abc import Mapping + +import pytest + +from agenton.compositor import CompositorConfig, LayerNodeConfig +from dify_agent.server.schemas import CreateRunRequest +from dify_agent.storage.redis_run_store import RedisRunStore + + +def _request() -> CreateRunRequest: + return CreateRunRequest( + compositor=CompositorConfig( + layers=[LayerNodeConfig(name="prompt", type="plain.prompt", config={"user": "hello"})] + ) + ) + + +class FakePipeline: + staged: list[tuple[str, str, object]] + executed: bool + fail_execute: bool + + def __init__(self, *, fail_execute: bool = False) -> None: + self.staged = [] + self.executed = False + self.fail_execute = fail_execute + + async def __aenter__(self) -> "FakePipeline": + return self + + async def __aexit__(self, exc_type: object, exc: object, traceback: object) -> None: + return None + + def set(self, key: str, value: object) -> None: + self.staged.append(("set", key, value)) + + def xadd(self, key: str, fields: Mapping[str, object]) -> None: + self.staged.append(("xadd", key, dict(fields))) + + async def execute(self) -> None: + if self.fail_execute: + raise RuntimeError("transaction failed") + self.executed = True + + +class FakeRedis: + pipeline_instance: FakePipeline + direct_commands: list[str] + + def __init__(self, pipeline: FakePipeline) -> None: + self.pipeline_instance = pipeline + self.direct_commands = [] + + def pipeline(self, *, transaction: bool) -> FakePipeline: + assert transaction is True + return self.pipeline_instance + + async def set(self, key: str, value: object) -> None: + self.direct_commands.append(f"set:{key}") + + async def xadd(self, key: str, fields: Mapping[str, object]) -> str: + self.direct_commands.append(f"xadd:{key}") + return "1-0" + + +def test_create_run_writes_record_and_job_in_one_transaction() -> None: + pipeline = FakePipeline() + redis = FakeRedis(pipeline) + store = RedisRunStore(redis, prefix="test") # pyright: ignore[reportArgumentType] + + record = asyncio.run(store.create_run(_request())) + + assert record.status == "queued" + assert pipeline.executed is True + assert [command[0] for command in pipeline.staged] == ["set", "xadd"] + assert redis.direct_commands == [] + + +def test_create_run_does_not_fall_back_to_partial_writes_when_transaction_fails() -> None: + pipeline = FakePipeline(fail_execute=True) + redis = FakeRedis(pipeline) + store = RedisRunStore(redis, prefix="test") # pyright: ignore[reportArgumentType] + + with pytest.raises(RuntimeError, match="transaction failed"): + asyncio.run(store.create_run(_request())) + + assert pipeline.executed is False + assert 
redis.direct_commands == [] diff --git a/dify-agent/tests/local/dify_agent/worker/test_job_worker.py b/dify-agent/tests/local/dify_agent/worker/test_job_worker.py new file mode 100644 index 0000000000..a9c09900ea --- /dev/null +++ b/dify-agent/tests/local/dify_agent/worker/test_job_worker.py @@ -0,0 +1,90 @@ +import asyncio +from collections.abc import Mapping +from typing import cast + +from agenton.compositor import CompositorConfig, LayerNodeConfig +from dify_agent.server.schemas import CreateRunRequest, RunnerJob +from dify_agent.storage.redis_run_store import RedisRunStore +from dify_agent.worker.job_worker import JobRunner, RunJobWorker + + +def _job() -> RunnerJob: + request = CreateRunRequest( + compositor=CompositorConfig( + layers=[LayerNodeConfig(name="prompt", type="plain.prompt", config={"user": "hello"})] + ) + ) + return RunnerJob(run_id="run-1", request=request) + + +class FakeRunner: + ran: bool + + def __init__(self) -> None: + self.ran = False + + async def run(self) -> None: + self.ran = True + + +class FakeRedis: + xreadgroup_called: bool + acked: list[tuple[str, str, str | bytes]] + claimed_payload: str + + def __init__(self, claimed_payload: str) -> None: + self.xreadgroup_called = False + self.acked = [] + self.claimed_payload = claimed_payload + + async def xautoclaim( + self, + name: str, + groupname: str, + consumername: str, + min_idle_time: int, + start_id: str, + count: int, + ) -> tuple[str, list[tuple[bytes, dict[bytes, bytes]]], list[bytes]]: + assert name == "test:runs:jobs" + assert groupname == "workers" + assert consumername == "worker-b" + assert min_idle_time == 10 + assert start_id == "0-0" + assert count == 1 + return "0-0", [(b"1-0", {b"payload": self.claimed_payload.encode()})], [] + + async def xreadgroup( + self, + groupname: str, + consumername: str, + streams: Mapping[str, str], + count: int, + block: int, + ) -> list[tuple[str, list[tuple[bytes, dict[bytes, bytes]]]]]: + self.xreadgroup_called = True + return [] + + async def xack(self, name: str, groupname: str, entry_id: str | bytes) -> None: + self.acked.append((name, groupname, entry_id)) + + +def test_process_once_reclaims_stale_pending_job_before_reading_new_entries() -> None: + job = _job() + runner = FakeRunner() + redis = FakeRedis(job.model_dump_json()) + store = RedisRunStore(cast(object, redis), prefix="test") # pyright: ignore[reportArgumentType] + worker = RunJobWorker( + store=store, + group_name="workers", + consumer_name="worker-b", + pending_idle_ms=10, + runner_factory=lambda _job: cast(JobRunner, runner), + ) + + processed = asyncio.run(worker.process_once(block_ms=0)) + + assert processed is True + assert runner.ran is True + assert redis.xreadgroup_called is False + assert redis.acked == [("test:runs:jobs", "workers", b"1-0")]
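

# Sketch of the idle path, assuming process_once reports False when neither
# XAUTOCLAIM nor XREADGROUP yields an entry; adjust the assertions if the
# worker signals idleness differently. EmptyFakeRedis and the test name are
# illustrative.
class EmptyFakeRedis:
    acked: list[tuple[str, str, str | bytes]]

    def __init__(self) -> None:
        self.acked = []

    async def xautoclaim(
        self,
        name: str,
        groupname: str,
        consumername: str,
        min_idle_time: int,
        start_id: str,
        count: int,
    ) -> tuple[str, list[tuple[bytes, dict[bytes, bytes]]], list[bytes]]:
        # No stale pending entries to reclaim.
        return "0-0", [], []

    async def xreadgroup(
        self,
        groupname: str,
        consumername: str,
        streams: Mapping[str, str],
        count: int,
        block: int,
    ) -> list[tuple[str, list[tuple[bytes, dict[bytes, bytes]]]]]:
        # No new `>` entries either.
        return []

    async def xack(self, name: str, groupname: str, entry_id: str | bytes) -> None:
        self.acked.append((name, groupname, entry_id))


def test_process_once_is_a_no_op_when_no_jobs_are_available() -> None:
    redis = EmptyFakeRedis()
    store = RedisRunStore(cast(object, redis), prefix="test")  # pyright: ignore[reportArgumentType]
    runner = FakeRunner()
    worker = RunJobWorker(
        store=store,
        group_name="workers",
        consumer_name="worker-b",
        pending_idle_ms=10,
        runner_factory=lambda _job: cast(JobRunner, runner),
    )

    processed = asyncio.run(worker.process_once(block_ms=0))

    assert processed is False
    assert runner.ran is False
    assert redis.acked == []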