diff --git a/dify-agent/docs/dify-agent/api/README.md b/dify-agent/docs/dify-agent/api/README.md index b983500208..df4e55bc3e 100644 --- a/dify-agent/docs/dify-agent/api/README.md +++ b/dify-agent/docs/dify-agent/api/README.md @@ -63,7 +63,9 @@ Response (`202 Accepted`): ``` The server persists the run record and schedules execution immediately in the -same FastAPI process. Redis is not used as a job queue. +same FastAPI process. Redis is not used as a job queue. Run records and per-run +event streams expire after `DIFY_AGENT_RUN_RETENTION_SECONDS`, which defaults to +`259200` seconds (3 days). `agent_profile.provider` currently supports the credential-free `test` profile. @@ -99,7 +101,7 @@ Status values are: - `succeeded` - `failed` -Unknown run ids return `404` with `"run not found"`. +Unknown or expired run ids return `404` with `"run not found"`. ## Poll events diff --git a/dify-agent/docs/dify-agent/guide/README.md b/dify-agent/docs/dify-agent/guide/README.md index 1cd82ffcc2..1a8539fde7 100644 --- a/dify-agent/docs/dify-agent/guide/README.md +++ b/dify-agent/docs/dify-agent/guide/README.md @@ -31,6 +31,7 @@ also reads `.env` and `dify-agent/.env` when present. | `DIFY_AGENT_REDIS_URL` | `redis://localhost:6379/0` | Redis connection URL. | | `DIFY_AGENT_REDIS_PREFIX` | `dify-agent` | Prefix for Redis record and event keys. | | `DIFY_AGENT_SHUTDOWN_GRACE_SECONDS` | `30` | Seconds to wait for active local runs during graceful shutdown before cancellation. | +| `DIFY_AGENT_RUN_RETENTION_SECONDS` | `259200` | Seconds to retain Redis run records and per-run event streams; defaults to 3 days. | Example `.env`: @@ -38,8 +39,13 @@ Example `.env`: DIFY_AGENT_REDIS_URL=redis://localhost:6379/0 DIFY_AGENT_REDIS_PREFIX=dify-agent-dev DIFY_AGENT_SHUTDOWN_GRACE_SECONDS=30 +DIFY_AGENT_RUN_RETENTION_SECONDS=259200 ``` +Run records and event streams use the same retention. 
Status writes refresh the +record TTL, and event writes refresh both the stream TTL and the corresponding +record TTL so active runs that keep producing events remain observable. + ## Scheduling and shutdown semantics `POST /runs` validates the compositor, persists a `running` run record, and starts diff --git a/dify-agent/src/dify_agent/server/app.py b/dify-agent/src/dify_agent/server/app.py index 575eb18ff9..d9f04516a9 100644 --- a/dify-agent/src/dify_agent/server/app.py +++ b/dify-agent/src/dify_agent/server/app.py @@ -3,7 +3,8 @@ The HTTP process owns Redis clients, route wiring, and a process-local scheduler. Run execution happens in background ``asyncio`` tasks rather than request handlers, so client disconnects do not cancel the agent runtime. Redis persists -run records and per-run event streams only; it is not used as a job queue. +only run records and per-run event streams, subject to the configured +retention; it is not used as a job queue. """ from collections.abc import AsyncGenerator @@ -26,7 +27,11 @@ def create_app(settings: ServerSettings | None = None) -> FastAPI: @asynccontextmanager async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: redis = Redis.from_url(resolved_settings.redis_url) - store = RedisRunStore(redis, prefix=resolved_settings.redis_prefix) + store = RedisRunStore( + redis, + prefix=resolved_settings.redis_prefix, + run_retention_seconds=resolved_settings.run_retention_seconds, + ) scheduler = RunScheduler(store=store, shutdown_grace_seconds=resolved_settings.shutdown_grace_seconds) state["store"] = store state["scheduler"] = scheduler diff --git a/dify-agent/src/dify_agent/server/settings.py b/dify-agent/src/dify_agent/server/settings.py index b1f9cc0ba9..a5f9905b90 100644 --- a/dify-agent/src/dify_agent/server/settings.py +++ b/dify-agent/src/dify_agent/server/settings.py @@ -2,15 +2,19 @@ from typing import ClassVar +from pydantic import Field from pydantic_settings import BaseSettings, SettingsConfigDict 
+DEFAULT_RUN_RETENTION_SECONDS = 3 * 24 * 60 * 60 + class ServerSettings(BaseSettings): - """Environment-backed settings for Redis persistence and local scheduling.""" + """Environment-backed settings for Redis persistence, retention, and local scheduling.""" redis_url: str = "redis://localhost:6379/0" redis_prefix: str = "dify-agent" shutdown_grace_seconds: float = 30 + run_retention_seconds: int = Field(default=DEFAULT_RUN_RETENTION_SECONDS, ge=1) model_config: ClassVar[SettingsConfigDict] = SettingsConfigDict( env_prefix="DIFY_AGENT_", @@ -19,4 +23,4 @@ class ServerSettings(BaseSettings): ) -__all__ = ["ServerSettings"] +__all__ = ["DEFAULT_RUN_RETENTION_SECONDS", "ServerSettings"] diff --git a/dify-agent/src/dify_agent/storage/redis_run_store.py b/dify-agent/src/dify_agent/storage/redis_run_store.py index d69f339119..f5c56815c9 100644 --- a/dify-agent/src/dify_agent/storage/redis_run_store.py +++ b/dify-agent/src/dify_agent/storage/redis_run_store.py @@ -2,7 +2,8 @@ The store writes run records as JSON strings and events as Redis streams. HTTP event cursors are Redis stream ids; ``0-0`` means replay from the beginning for -polling and SSE. Execution is scheduled in-process by +polling and SSE. Records and streams share one retention window that is refreshed +when status or event data is written. Execution is scheduled in-process by ``dify_agent.runtime.run_scheduler``; Redis is not a job queue. """ @@ -22,6 +23,7 @@ from dify_agent.server.schemas import ( new_run_id, utc_now, ) +from dify_agent.server.settings import DEFAULT_RUN_RETENTION_SECONDS from dify_agent.storage.redis_keys import run_events_key, run_record_key @@ -30,20 +32,39 @@ class RunNotFoundError(LookupError): class RedisRunStore(RunEventSink): - """Async Redis implementation for run records and event logs.""" + """Async Redis implementation for run records and event logs. + + ``run_retention_seconds`` is applied to both the run record key and the + per-run Redis stream. 
Event writes also refresh the record TTL so long-running + runs that keep producing events do not lose their status record mid-run. + """ redis: Redis prefix: str + run_retention_seconds: int - def __init__(self, redis: Redis, *, prefix: str = "dify-agent") -> None: + def __init__( + self, + redis: Redis, + *, + prefix: str = "dify-agent", + run_retention_seconds: int = DEFAULT_RUN_RETENTION_SECONDS, + ) -> None: + if run_retention_seconds <= 0: + raise ValueError("run_retention_seconds must be positive") self.redis = redis self.prefix = prefix + self.run_retention_seconds = run_retention_seconds async def create_run(self, request: CreateRunRequest) -> RunRecord: """Persist a running run record without enqueueing external work.""" run_id = new_run_id() record = RunRecord(run_id=run_id, status="running", request=request) - await self.redis.set(run_record_key(self.prefix, run_id), record.model_dump_json()) + await self.redis.set( + run_record_key(self.prefix, run_id), + record.model_dump_json(), + ex=self.run_retention_seconds, + ) return record async def get_run(self, run_id: str) -> RunRecord: @@ -59,15 +80,22 @@ class RedisRunStore(RunEventSink): """Update the status fields of an existing run record.""" record = await self.get_run(run_id) updated = record.model_copy(update={"status": status, "updated_at": utc_now(), "error": error}) - await self.redis.set(run_record_key(self.prefix, run_id), updated.model_dump_json()) + await self.redis.set( + run_record_key(self.prefix, run_id), + updated.model_dump_json(), + ex=self.run_retention_seconds, + ) async def append_event(self, event: RunEvent) -> str: """Append an event JSON payload to the run's Redis stream.""" + events_key = run_events_key(self.prefix, event.run_id) payload = RUN_EVENT_ADAPTER.dump_json(event, exclude={"id"}).decode() event_id = await self.redis.xadd( - run_events_key(self.prefix, event.run_id), + events_key, {"payload": payload}, ) + await self.redis.expire(events_key, self.run_retention_seconds) + 
await self.redis.expire(run_record_key(self.prefix, event.run_id), self.run_retention_seconds) return event_id.decode() if isinstance(event_id, bytes) else str(event_id) async def get_events(self, run_id: str, *, after: str = "0-0", limit: int = 100) -> RunEventsResponse: @@ -112,4 +140,4 @@ class RedisRunStore(RunEventSink): return event.model_copy(update={"id": event_id, "run_id": run_id}) -__all__ = ["RedisRunStore", "RunNotFoundError"] +__all__ = ["DEFAULT_RUN_RETENTION_SECONDS", "RedisRunStore", "RunNotFoundError"] diff --git a/dify-agent/tests/local/dify_agent/server/test_app.py b/dify-agent/tests/local/dify_agent/server/test_app.py index 569391fede..fdb10685ae 100644 --- a/dify-agent/tests/local/dify_agent/server/test_app.py +++ b/dify-agent/tests/local/dify_agent/server/test_app.py @@ -4,6 +4,7 @@ from fastapi.testclient import TestClient import dify_agent.server.app as app_module from dify_agent.server.app import create_app from dify_agent.server.settings import ServerSettings +from dify_agent.storage.redis_run_store import RedisRunStore class FakeRedis: @@ -19,6 +20,7 @@ class FakeRedis: class FakeRunScheduler: created: list["FakeRunScheduler"] = [] + store: object shutdown_grace_seconds: float shutdown_called: bool @@ -28,7 +30,7 @@ class FakeRunScheduler: store: object, shutdown_grace_seconds: float, ) -> None: - del store + self.store = store self.shutdown_grace_seconds = shutdown_grace_seconds self.shutdown_called = False self.created.append(self) @@ -47,12 +49,16 @@ def test_create_app_creates_scheduler_and_closes_after_shutdown(monkeypatch: pyt redis_url="redis://example.invalid/0", redis_prefix="test", shutdown_grace_seconds=5, + run_retention_seconds=7, ) with TestClient(create_app(settings)): assert len(FakeRunScheduler.created) == 1 scheduler = FakeRunScheduler.created[0] assert scheduler.shutdown_grace_seconds == 5 + store = scheduler.store + assert isinstance(store, RedisRunStore) + assert store.run_retention_seconds == 7 assert 
FakeRunScheduler.created[0].shutdown_called is True assert fake_redis.closed is True diff --git a/dify-agent/tests/local/dify_agent/storage/test_redis_run_store.py b/dify-agent/tests/local/dify_agent/storage/test_redis_run_store.py index 4df4ec9a8b..e76ec5437c 100644 --- a/dify-agent/tests/local/dify_agent/storage/test_redis_run_store.py +++ b/dify-agent/tests/local/dify_agent/storage/test_redis_run_store.py @@ -3,7 +3,7 @@ from collections.abc import Mapping from agenton.compositor import CompositorConfig, LayerNodeConfig from dify_agent.server.schemas import CreateRunRequest, RunStartedEvent -from dify_agent.storage.redis_run_store import RedisRunStore +from dify_agent.storage.redis_run_store import DEFAULT_RUN_RETENTION_SECONDS, RedisRunStore def _request() -> CreateRunRequest: @@ -15,20 +15,31 @@ def _request() -> CreateRunRequest: class FakeRedis: - commands: list[tuple[str, str, object]] + commands: list[tuple[object, ...]] + values: dict[str, object] def __init__(self) -> None: self.commands = [] + self.values = {} - async def set(self, key: str, value: object) -> None: - self.commands.append(("set", key, value)) + async def set(self, key: str, value: object, *, ex: int | None = None) -> None: + self.commands.append(("set", key, value, ex)) + self.values[key] = value + + async def get(self, key: str) -> object | None: + self.commands.append(("get", key)) + return self.values.get(key) async def xadd(self, key: str, fields: Mapping[str, object]) -> str: self.commands.append(("xadd", key, dict(fields))) return "1-0" + async def expire(self, key: str, seconds: int) -> bool: + self.commands.append(("expire", key, seconds)) + return True -def test_create_run_writes_running_record_without_job_queue() -> None: + +def test_create_run_writes_running_record_without_job_queue_and_with_retention() -> None: redis = FakeRedis() store = RedisRunStore(redis, prefix="test") # pyright: ignore[reportArgumentType] @@ -37,11 +48,25 @@ def 
test_create_run_writes_running_record_without_job_queue() -> None: assert record.status == "running" assert [command[0] for command in redis.commands] == ["set"] assert redis.commands[0][1] == f"test:runs:{record.run_id}:record" + assert redis.commands[0][3] == DEFAULT_RUN_RETENTION_SECONDS -def test_append_event_serializes_typed_event_without_id() -> None: +def test_update_status_refreshes_record_retention() -> None: redis = FakeRedis() - store = RedisRunStore(redis, prefix="test") # pyright: ignore[reportArgumentType] + store = RedisRunStore(redis, prefix="test", run_retention_seconds=60) # pyright: ignore[reportArgumentType] + record = asyncio.run(store.create_run(_request())) + redis.commands.clear() + + asyncio.run(store.update_status(record.run_id, "succeeded")) + + assert [command[0] for command in redis.commands] == ["get", "set"] + assert redis.commands[1][1] == f"test:runs:{record.run_id}:record" + assert redis.commands[1][3] == 60 + + +def test_append_event_serializes_typed_event_without_id_and_expires_run_keys() -> None: + redis = FakeRedis() + store = RedisRunStore(redis, prefix="test", run_retention_seconds=60) # pyright: ignore[reportArgumentType] event_id = asyncio.run(store.append_event(RunStartedEvent(id="local", run_id="run-1"))) @@ -51,3 +76,7 @@ def test_append_event_serializes_typed_event_without_id() -> None: assert isinstance(fields, dict) assert '"id"' not in str(fields["payload"]) assert '"type":"run_started"' in str(fields["payload"]) + assert redis.commands[1:] == [ + ("expire", "test:runs:run-1:events", 60), + ("expire", "test:runs:run-1:record", 60), + ]