add dify-agent run retention ttl

This commit is contained in:
盐粒 Yanli 2026-05-11 20:18:34 +08:00
parent 80312f9745
commit f12ec0d53a
7 changed files with 101 additions and 21 deletions

View File

@ -63,7 +63,9 @@ Response (`202 Accepted`):
```
The server persists the run record and schedules execution immediately in the
same FastAPI process. Redis is not used as a job queue.
same FastAPI process. Redis is not used as a job queue. Run records and per-run
event streams expire after `DIFY_AGENT_RUN_RETENTION_SECONDS` seconds, which
defaults to `259200` (3 days).
`agent_profile.provider` currently supports the credential-free `test` profile.
@ -99,7 +101,7 @@ Status values are:
- `succeeded`
- `failed`
Unknown run ids return `404` with `"run not found"`.
Unknown or expired run ids return `404` with `"run not found"`.
## Poll events

View File

@ -31,6 +31,7 @@ also reads `.env` and `dify-agent/.env` when present.
| `DIFY_AGENT_REDIS_URL` | `redis://localhost:6379/0` | Redis connection URL. |
| `DIFY_AGENT_REDIS_PREFIX` | `dify-agent` | Prefix for Redis record and event keys. |
| `DIFY_AGENT_SHUTDOWN_GRACE_SECONDS` | `30` | Seconds to wait for active local runs during graceful shutdown before cancellation. |
| `DIFY_AGENT_RUN_RETENTION_SECONDS` | `259200` | Seconds to retain Redis run records and per-run event streams; defaults to 3 days. |
Example `.env`:
@ -38,8 +39,13 @@ Example `.env`:
DIFY_AGENT_REDIS_URL=redis://localhost:6379/0
DIFY_AGENT_REDIS_PREFIX=dify-agent-dev
DIFY_AGENT_SHUTDOWN_GRACE_SECONDS=30
DIFY_AGENT_RUN_RETENTION_SECONDS=259200
```
Run records and event streams use the same retention. Status writes refresh the
record TTL, and event writes refresh both the stream TTL and the corresponding
record TTL so active runs that keep producing events remain observable.
## Scheduling and shutdown semantics
`POST /runs` validates the compositor, persists a `running` run record, and starts

View File

@ -3,7 +3,8 @@
The HTTP process owns Redis clients, route wiring, and a process-local scheduler.
Run execution happens in background ``asyncio`` tasks rather than request
handlers, so client disconnects do not cancel the agent runtime. Redis persists
run records and per-run event streams only; it is not used as a job queue.
run records and per-run event streams with the configured retention only; it is
not used as a job queue.
"""
from collections.abc import AsyncGenerator
@ -26,7 +27,11 @@ def create_app(settings: ServerSettings | None = None) -> FastAPI:
@asynccontextmanager
async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]:
redis = Redis.from_url(resolved_settings.redis_url)
store = RedisRunStore(redis, prefix=resolved_settings.redis_prefix)
store = RedisRunStore(
redis,
prefix=resolved_settings.redis_prefix,
run_retention_seconds=resolved_settings.run_retention_seconds,
)
scheduler = RunScheduler(store=store, shutdown_grace_seconds=resolved_settings.shutdown_grace_seconds)
state["store"] = store
state["scheduler"] = scheduler

View File

@ -2,15 +2,19 @@
from typing import ClassVar
from pydantic import Field
from pydantic_settings import BaseSettings, SettingsConfigDict
DEFAULT_RUN_RETENTION_SECONDS = 3 * 24 * 60 * 60
class ServerSettings(BaseSettings):
"""Environment-backed settings for Redis persistence and local scheduling."""
"""Environment-backed settings for Redis persistence, retention, and local scheduling."""
redis_url: str = "redis://localhost:6379/0"
redis_prefix: str = "dify-agent"
shutdown_grace_seconds: float = 30
run_retention_seconds: int = Field(default=DEFAULT_RUN_RETENTION_SECONDS, ge=1)
model_config: ClassVar[SettingsConfigDict] = SettingsConfigDict(
env_prefix="DIFY_AGENT_",
@ -19,4 +23,4 @@ class ServerSettings(BaseSettings):
)
__all__ = ["ServerSettings"]
__all__ = ["DEFAULT_RUN_RETENTION_SECONDS", "ServerSettings"]

View File

@ -2,7 +2,8 @@
The store writes run records as JSON strings and events as Redis streams. HTTP
event cursors are Redis stream ids; ``0-0`` means replay from the beginning for
polling and SSE. Execution is scheduled in-process by
polling and SSE. Records and streams share one retention window that is refreshed
when status or event data is written. Execution is scheduled in-process by
``dify_agent.runtime.run_scheduler``; Redis is not a job queue.
"""
@ -22,6 +23,7 @@ from dify_agent.server.schemas import (
new_run_id,
utc_now,
)
from dify_agent.server.settings import DEFAULT_RUN_RETENTION_SECONDS
from dify_agent.storage.redis_keys import run_events_key, run_record_key
@ -30,20 +32,39 @@ class RunNotFoundError(LookupError):
class RedisRunStore(RunEventSink):
"""Async Redis implementation for run records and event logs."""
"""Async Redis implementation for run records and event logs.
``run_retention_seconds`` is applied to both the run record key and the
per-run Redis stream. Event writes also refresh the record TTL so long-running
runs that keep producing events do not lose their status record mid-run.
"""
redis: Redis
prefix: str
run_retention_seconds: int
def __init__(self, redis: Redis, *, prefix: str = "dify-agent") -> None:
def __init__(
self,
redis: Redis,
*,
prefix: str = "dify-agent",
run_retention_seconds: int = DEFAULT_RUN_RETENTION_SECONDS,
) -> None:
if run_retention_seconds <= 0:
raise ValueError("run_retention_seconds must be positive")
self.redis = redis
self.prefix = prefix
self.run_retention_seconds = run_retention_seconds
async def create_run(self, request: CreateRunRequest) -> RunRecord:
"""Persist a running run record without enqueueing external work."""
run_id = new_run_id()
record = RunRecord(run_id=run_id, status="running", request=request)
await self.redis.set(run_record_key(self.prefix, run_id), record.model_dump_json())
await self.redis.set(
run_record_key(self.prefix, run_id),
record.model_dump_json(),
ex=self.run_retention_seconds,
)
return record
async def get_run(self, run_id: str) -> RunRecord:
@ -59,15 +80,22 @@ class RedisRunStore(RunEventSink):
"""Update the status fields of an existing run record."""
record = await self.get_run(run_id)
updated = record.model_copy(update={"status": status, "updated_at": utc_now(), "error": error})
await self.redis.set(run_record_key(self.prefix, run_id), updated.model_dump_json())
await self.redis.set(
run_record_key(self.prefix, run_id),
updated.model_dump_json(),
ex=self.run_retention_seconds,
)
async def append_event(self, event: RunEvent) -> str:
"""Append an event JSON payload to the run's Redis stream."""
events_key = run_events_key(self.prefix, event.run_id)
payload = RUN_EVENT_ADAPTER.dump_json(event, exclude={"id"}).decode()
event_id = await self.redis.xadd(
run_events_key(self.prefix, event.run_id),
events_key,
{"payload": payload},
)
await self.redis.expire(events_key, self.run_retention_seconds)
await self.redis.expire(run_record_key(self.prefix, event.run_id), self.run_retention_seconds)
return event_id.decode() if isinstance(event_id, bytes) else str(event_id)
async def get_events(self, run_id: str, *, after: str = "0-0", limit: int = 100) -> RunEventsResponse:
@ -112,4 +140,4 @@ class RedisRunStore(RunEventSink):
return event.model_copy(update={"id": event_id, "run_id": run_id})
__all__ = ["RedisRunStore", "RunNotFoundError"]
__all__ = ["DEFAULT_RUN_RETENTION_SECONDS", "RedisRunStore", "RunNotFoundError"]

View File

@ -4,6 +4,7 @@ from fastapi.testclient import TestClient
import dify_agent.server.app as app_module
from dify_agent.server.app import create_app
from dify_agent.server.settings import ServerSettings
from dify_agent.storage.redis_run_store import RedisRunStore
class FakeRedis:
@ -19,6 +20,7 @@ class FakeRedis:
class FakeRunScheduler:
created: list["FakeRunScheduler"] = []
store: object
shutdown_grace_seconds: float
shutdown_called: bool
@ -28,7 +30,7 @@ class FakeRunScheduler:
store: object,
shutdown_grace_seconds: float,
) -> None:
del store
self.store = store
self.shutdown_grace_seconds = shutdown_grace_seconds
self.shutdown_called = False
self.created.append(self)
@ -47,12 +49,16 @@ def test_create_app_creates_scheduler_and_closes_after_shutdown(monkeypatch: pyt
redis_url="redis://example.invalid/0",
redis_prefix="test",
shutdown_grace_seconds=5,
run_retention_seconds=7,
)
with TestClient(create_app(settings)):
assert len(FakeRunScheduler.created) == 1
scheduler = FakeRunScheduler.created[0]
assert scheduler.shutdown_grace_seconds == 5
store = scheduler.store
assert isinstance(store, RedisRunStore)
assert store.run_retention_seconds == 7
assert FakeRunScheduler.created[0].shutdown_called is True
assert fake_redis.closed is True

View File

@ -3,7 +3,7 @@ from collections.abc import Mapping
from agenton.compositor import CompositorConfig, LayerNodeConfig
from dify_agent.server.schemas import CreateRunRequest, RunStartedEvent
from dify_agent.storage.redis_run_store import RedisRunStore
from dify_agent.storage.redis_run_store import DEFAULT_RUN_RETENTION_SECONDS, RedisRunStore
def _request() -> CreateRunRequest:
@ -15,20 +15,31 @@ def _request() -> CreateRunRequest:
class FakeRedis:
commands: list[tuple[str, str, object]]
commands: list[tuple[object, ...]]
values: dict[str, object]
def __init__(self) -> None:
self.commands = []
self.values = {}
async def set(self, key: str, value: object) -> None:
self.commands.append(("set", key, value))
async def set(self, key: str, value: object, *, ex: int | None = None) -> None:
self.commands.append(("set", key, value, ex))
self.values[key] = value
async def get(self, key: str) -> object | None:
self.commands.append(("get", key))
return self.values.get(key)
async def xadd(self, key: str, fields: Mapping[str, object]) -> str:
self.commands.append(("xadd", key, dict(fields)))
return "1-0"
async def expire(self, key: str, seconds: int) -> bool:
self.commands.append(("expire", key, seconds))
return True
def test_create_run_writes_running_record_without_job_queue() -> None:
def test_create_run_writes_running_record_without_job_queue_and_with_retention() -> None:
redis = FakeRedis()
store = RedisRunStore(redis, prefix="test") # pyright: ignore[reportArgumentType]
@ -37,11 +48,25 @@ def test_create_run_writes_running_record_without_job_queue() -> None:
assert record.status == "running"
assert [command[0] for command in redis.commands] == ["set"]
assert redis.commands[0][1] == f"test:runs:{record.run_id}:record"
assert redis.commands[0][3] == DEFAULT_RUN_RETENTION_SECONDS
def test_append_event_serializes_typed_event_without_id() -> None:
def test_update_status_refreshes_record_retention() -> None:
redis = FakeRedis()
store = RedisRunStore(redis, prefix="test") # pyright: ignore[reportArgumentType]
store = RedisRunStore(redis, prefix="test", run_retention_seconds=60) # pyright: ignore[reportArgumentType]
record = asyncio.run(store.create_run(_request()))
redis.commands.clear()
asyncio.run(store.update_status(record.run_id, "succeeded"))
assert [command[0] for command in redis.commands] == ["get", "set"]
assert redis.commands[1][1] == f"test:runs:{record.run_id}:record"
assert redis.commands[1][3] == 60
def test_append_event_serializes_typed_event_without_id_and_expires_run_keys() -> None:
redis = FakeRedis()
store = RedisRunStore(redis, prefix="test", run_retention_seconds=60) # pyright: ignore[reportArgumentType]
event_id = asyncio.run(store.append_event(RunStartedEvent(id="local", run_id="run-1")))
@ -51,3 +76,7 @@ def test_append_event_serializes_typed_event_without_id() -> None:
assert isinstance(fields, dict)
assert '"id"' not in str(fields["payload"])
assert '"type":"run_started"' in str(fields["payload"])
assert redis.commands[1:] == [
("expire", "test:runs:run-1:events", 60),
("expire", "test:runs:run-1:record", 60),
]