feat(api): agent backend session lifecycle for workflow agent nodes (#36724)

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
zyssyz123 2026-05-27 23:00:21 +08:00 committed by GitHub
parent 09ef785a20
commit 9cdeffd0b1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 2086 additions and 25 deletions

View File

@ -38,6 +38,8 @@ from clients.agent_backend.request_builder import (
AgentBackendOutputConfig,
AgentBackendRunRequestBuilder,
AgentBackendWorkflowNodeRunInput,
CleanupLayerSpec,
extract_cleanup_layer_specs,
redact_for_agent_backend_log,
)
@ -68,9 +70,11 @@ __all__ = [
"AgentBackendTransportError",
"AgentBackendValidationError",
"AgentBackendWorkflowNodeRunInput",
"CleanupLayerSpec",
"DifyAgentBackendRunClient",
"FakeAgentBackendRunClient",
"FakeAgentBackendScenario",
"create_agent_backend_run_client",
"extract_cleanup_layer_specs",
"redact_for_agent_backend_log",
]

View File

@ -20,6 +20,8 @@ from dify_agent.protocol import (
RunEvent,
RunFailedEvent,
RunFailedEventData,
RunPausedEvent,
RunPausedEventData,
RunStartedEvent,
RunStatusResponse,
RunSucceededEvent,
@ -34,6 +36,7 @@ class FakeAgentBackendScenario(StrEnum):
SUCCESS = "success"
FAILED = "failed"
PAUSED = "paused"
class FakeAgentBackendRunClient:
@ -89,6 +92,13 @@ class FakeAgentBackendRunClient:
updated_at=_FIXED_TIME,
error="fake failure",
)
case FakeAgentBackendScenario.PAUSED:
return RunStatusResponse(
run_id=run_id,
status="paused",
created_at=_FIXED_TIME,
updated_at=_FIXED_TIME,
)
def _events(self, run_id: str) -> tuple[RunEvent, ...]:
match self.scenario:
@ -115,3 +125,17 @@ class FakeAgentBackendRunClient:
data=RunFailedEventData(error="fake failure", reason="unit_test"),
),
)
case FakeAgentBackendScenario.PAUSED:
return (
RunStartedEvent(id="1-0", run_id=run_id, created_at=_FIXED_TIME),
RunPausedEvent(
id="2-0",
run_id=run_id,
created_at=_FIXED_TIME,
data=RunPausedEventData(
reason="human_input_required",
message="Agent requested human input.",
session_snapshot=CompositorSessionSnapshot(layers=[]),
),
),
)

View File

@ -11,11 +11,13 @@ composition-driven.
from __future__ import annotations
from typing import ClassVar
from typing import ClassVar, cast
from agenton.compositor import CompositorSessionSnapshot
from agenton.compositor.schemas import LayerSessionSnapshot
from agenton.layers import ExitIntent
from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID, PromptLayerConfig
from agenton_collections.layers.pydantic_ai import PYDANTIC_AI_HISTORY_LAYER_TYPE_ID
from dify_agent.layers.dify_plugin import (
DIFY_PLUGIN_LLM_LAYER_TYPE_ID,
DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID,
@ -29,6 +31,7 @@ from dify_agent.layers.execution_context import (
)
from dify_agent.layers.output import DIFY_OUTPUT_LAYER_TYPE_ID, DifyOutputLayerConfig
from dify_agent.protocol import (
DIFY_AGENT_HISTORY_LAYER_ID,
DIFY_AGENT_MODEL_LAYER_ID,
DIFY_AGENT_OUTPUT_LAYER_ID,
CreateRunRequest,
@ -45,6 +48,84 @@ WORKFLOW_USER_PROMPT_LAYER_ID = "workflow_user_prompt"
DIFY_EXECUTION_CONTEXT_LAYER_ID = "execution_context"
DIFY_PLUGIN_TOOLS_LAYER_ID = "tools"
# Layer types that hold credentials in their per-run config. These are excluded
# from the cleanup-replay composition (and from the snapshot that is sent with
# the cleanup request) because we deliberately do not persist plaintext
# credentials between runs.
_CLEANUP_EXCLUDED_LAYER_TYPES: tuple[str, ...] = (
DIFY_PLUGIN_LLM_LAYER_TYPE_ID,
DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID,
)
class CleanupLayerSpec(BaseModel):
    """One layer node replayed by an Agent backend cleanup-only run.

    Cleanup composition cannot include credential-bearing plugin layers, so we
    persist only the non-plugin layer specs together with the original config.
    Storing the config (rather than just ``name``/``type``) means cleanup does
    not depend on the original build-time inputs being re-derivable.

    The fields mirror the ``RunLayerSpec`` attributes copied by
    ``extract_cleanup_layer_specs`` and fed back into ``RunLayerSpec`` by
    ``AgentBackendRunRequestBuilder.build_cleanup_request``.
    """

    # Layer name inside the composition; also the key used to filter the
    # session snapshot down to the layers that are replayed.
    name: str
    # Layer type identifier, copied verbatim from the original layer spec.
    type: str
    # Dependency mapping copied from the original layer spec.
    deps: dict[str, str] = Field(default_factory=dict)
    # Free-form metadata copied from the original layer spec.
    metadata: dict[str, JsonValue] = Field(default_factory=dict)
    # JSON-mode dump of the layer config when it was a pydantic model;
    # otherwise the raw config value (``None`` when the layer had no config).
    config: JsonValue = None

    # Unknown fields are rejected so persisted payloads cannot drift silently.
    model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
def extract_cleanup_layer_specs(composition: RunComposition) -> list[CleanupLayerSpec]:
    """Return the persistable cleanup specs for every non-plugin layer of ``composition``.

    Layers whose type is listed in ``_CLEANUP_EXCLUDED_LAYER_TYPES`` are
    skipped: their configs carry credentials, and the lifecycle contract says
    a cleanup run must not include an LLM layer. The names that survive here
    are the ones later used to filter the session snapshot, so the compositor's
    name-order check can still pass for the cleanup run.

    Args:
        composition: The in-flight run composition to project.

    Returns:
        One ``CleanupLayerSpec`` per retained layer, in composition order.
    """
    skipped_types = frozenset(_CLEANUP_EXCLUDED_LAYER_TYPES)

    def _to_spec(node: RunLayerSpec) -> CleanupLayerSpec:
        raw_config = node.config
        if isinstance(raw_config, BaseModel):
            portable_config: JsonValue = raw_config.model_dump(mode="json", warnings=False)
        else:
            # ``RunLayerSpec.config`` is typed as ``LayerConfigInput``, which
            # also admits ``Mapping[str, object] | bytes``. The cleanup-replay
            # builder only emits BaseModel-derived configs or ``None``, so the
            # wider alias is narrowed with a cast rather than converted.
            portable_config = cast(JsonValue, raw_config)
        return CleanupLayerSpec(
            name=node.name,
            type=node.type,
            deps=dict(node.deps),
            metadata=dict(node.metadata),
            config=portable_config,
        )

    return [_to_spec(node) for node in composition.layers if node.type not in skipped_types]
def _filter_snapshot_to_specs(
    snapshot: CompositorSessionSnapshot,
    specs: list[CleanupLayerSpec],
) -> CompositorSessionSnapshot:
    """Drop snapshot layers that have no matching cleanup spec.

    The agenton compositor rejects a snapshot whose layer-name sequence does
    not match the active composition exactly. Cleanup-replay omits plugin
    layers, so their snapshot entries have to be omitted as well.

    Args:
        snapshot: Session snapshot captured from the previous run.
        specs: Cleanup layer specs that will be replayed.

    Returns:
        ``snapshot`` itself when every layer is retained; otherwise a new
        snapshot with the same ``schema_version`` and only the retained layers
        (original relative order preserved).
    """
    allowed_names = {spec.name for spec in specs}

    # Nothing to strip: hand back the very same object instead of a copy.
    if all(entry.name in allowed_names for entry in snapshot.layers):
        return snapshot

    retained: list[LayerSessionSnapshot] = [entry for entry in snapshot.layers if entry.name in allowed_names]
    return CompositorSessionSnapshot(schema_version=snapshot.schema_version, layers=retained)
class AgentBackendModelConfig(BaseModel):
"""API-side model/plugin selection before it is converted to Dify Agent layers."""
@ -86,7 +167,8 @@ class AgentBackendWorkflowNodeRunInput(BaseModel):
output: AgentBackendOutputConfig | None = None
tools: DifyPluginToolsLayerConfig | None = None
session_snapshot: CompositorSessionSnapshot | None = None
suspend_on_exit: bool = False
include_history: bool = True
suspend_on_exit: bool = True
metadata: dict[str, JsonValue] = Field(default_factory=dict)
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid", arbitrary_types_allowed=True)
@ -102,6 +184,50 @@ class AgentBackendWorkflowNodeRunInput(BaseModel):
class AgentBackendRunRequestBuilder:
"""Converts API product state into the public ``dify-agent`` run protocol."""
def build_cleanup_request(
    self,
    *,
    session_snapshot: CompositorSessionSnapshot,
    composition_layer_specs: list[CleanupLayerSpec],
    idempotency_key: str | None = None,
    metadata: dict[str, JsonValue] | None = None,
) -> CreateRunRequest:
    """Build a lifecycle-only cleanup request that replays the prior layers.

    The agenton compositor requires the session snapshot's layer names to
    match the active composition in order, so cleanup replays the same
    non-plugin layer graph that produced the snapshot. Plugin layers
    (``dify.plugin.llm``, ``dify.plugin.tools``) are absent from both the
    composition and the snapshot because their configs need credentials
    that are not persisted between runs.

    Args:
        session_snapshot: Snapshot saved from the previous run; it is
            filtered down to the layers named in ``composition_layer_specs``.
        composition_layer_specs: Persisted non-plugin layer specs to replay.
        idempotency_key: Optional key forwarded unchanged to the request.
        metadata: Optional request metadata; copied, never mutated.

    Returns:
        A ``CreateRunRequest`` whose default exit intent is ``DELETE`` and
        whose metadata carries ``agent_backend_lifecycle="session_cleanup"``.

    Raises:
        ValueError: If ``composition_layer_specs`` is empty.
    """
    if not composition_layer_specs:
        raise ValueError(
            "build_cleanup_request requires composition_layer_specs; an empty "
            "composition would fail the agent backend's snapshot validation."
        )

    # Copy the caller's metadata and stamp the lifecycle marker on the copy.
    lifecycle_metadata: dict[str, JsonValue] = {
        **(metadata or {}),
        "agent_backend_lifecycle": "session_cleanup",
    }

    replayed_layers: list[RunLayerSpec] = []
    for spec in composition_layer_specs:
        replayed_layers.append(
            RunLayerSpec(
                name=spec.name,
                type=spec.type,
                deps=dict(spec.deps),
                metadata=dict(spec.metadata),
                config=spec.config,
            )
        )

    snapshot_for_replay = _filter_snapshot_to_specs(session_snapshot, composition_layer_specs)
    return CreateRunRequest(
        composition=RunComposition(layers=replayed_layers),
        purpose="workflow_node",
        idempotency_key=idempotency_key,
        metadata=lifecycle_metadata,
        session_snapshot=snapshot_for_replay,
        on_exit=LayerExitSignals(default=ExitIntent.DELETE),
    )
def build_for_workflow_node(self, run_input: AgentBackendWorkflowNodeRunInput) -> CreateRunRequest:
"""Build a workflow Agent Node run request without defining another wire schema."""
layers: list[RunLayerSpec] = []
@ -135,6 +261,20 @@ class AgentBackendRunRequestBuilder:
metadata=run_input.metadata,
config=run_input.execution_context,
),
]
)
if run_input.include_history:
layers.append(
RunLayerSpec(
name=DIFY_AGENT_HISTORY_LAYER_ID,
type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID,
metadata={**run_input.metadata, "origin": "agent_session_history"},
)
)
layers.extend(
[
RunLayerSpec(
name=DIFY_AGENT_MODEL_LAYER_ID,
type=DIFY_PLUGIN_LLM_LAYER_TYPE_ID,

View File

@ -27,6 +27,7 @@ from core.moderation.base import ModerationError
from core.moderation.input_moderation import InputModeration
from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeExecutionRepository
from core.workflow.node_factory import get_default_root_node_id
from core.workflow.nodes.agent_v2.session_cleanup_layer import build_workflow_agent_session_cleanup_layer
from core.workflow.system_variables import (
build_bootstrap_variables,
build_system_variables,
@ -239,6 +240,7 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner):
)
workflow_entry.graph_engine.layer(persistence_layer)
workflow_entry.graph_engine.layer(build_workflow_agent_session_cleanup_layer())
conversation_variable_layer = ConversationVariablePersistenceLayer(
ConversationVariableUpdater(session_factory.get_session_maker())
)

View File

@ -10,6 +10,7 @@ from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerat
from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer
from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeExecutionRepository
from core.workflow.node_factory import get_default_root_node_id
from core.workflow.nodes.agent_v2.session_cleanup_layer import build_workflow_agent_session_cleanup_layer
from core.workflow.system_variables import build_bootstrap_variables, build_system_variables
from core.workflow.variable_pool_initializer import add_node_inputs_to_pool, add_variables_to_pool
from core.workflow.workflow_entry import WorkflowEntry
@ -166,6 +167,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner):
)
workflow_entry.graph_engine.layer(persistence_layer)
workflow_entry.graph_engine.layer(build_workflow_agent_session_cleanup_layer())
for layer in self._graph_engine_layers:
workflow_entry.graph_engine.layer(layer)

View File

@ -475,6 +475,7 @@ class DifyNodeFactory(NodeFactory):
from core.workflow.nodes.agent_v2.file_tenant_validator import UploadFileTenantValidator
from core.workflow.nodes.agent_v2.output_failure_orchestrator import OutputFailureOrchestrator
from core.workflow.nodes.agent_v2.output_type_checker import PerOutputTypeChecker
from core.workflow.nodes.agent_v2.session_store import WorkflowAgentRuntimeSessionStore
return {
"binding_resolver": WorkflowAgentBindingResolver(),
@ -494,6 +495,7 @@ class DifyNodeFactory(NodeFactory):
# outputs contain no file refs.
"type_checker": PerOutputTypeChecker(file_validator=UploadFileTenantValidator()),
"failure_orchestrator": OutputFailureOrchestrator(),
"session_store": WorkflowAgentRuntimeSessionStore(),
}
return {
"strategy_resolver": self._agent_strategy_resolver,

View File

@ -1,8 +1,11 @@
from __future__ import annotations
import logging
from collections.abc import Generator, Mapping, Sequence
from typing import TYPE_CHECKING, Any
from agenton.compositor import CompositorSessionSnapshot
from clients.agent_backend import (
AgentBackendError,
AgentBackendHTTPError,
@ -17,11 +20,14 @@ from clients.agent_backend import (
AgentBackendStreamInternalEvent,
AgentBackendTransportError,
AgentBackendValidationError,
CleanupLayerSpec,
extract_cleanup_layer_specs,
)
from core.app.entities.app_invoke_entities import DIFY_RUN_CONTEXT_KEY, DifyRunContext
from core.workflow.system_variables import SystemVariableKey, get_system_text
from graphon.entities.pause_reason import SchedulingPause
from graphon.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from graphon.node_events import NodeEventBase, NodeRunResult, StreamCompletedEvent
from graphon.node_events import NodeEventBase, NodeRunResult, PauseRequestedEvent, StreamCompletedEvent
from graphon.nodes.base.node import Node
from models.agent_config_entities import WorkflowNodeJobConfig
@ -40,11 +46,14 @@ from .runtime_request_builder import (
WorkflowAgentRuntimeRequestBuilder,
WorkflowAgentRuntimeRequestBuildError,
)
from .session_store import WorkflowAgentRuntimeSessionStore, WorkflowAgentSessionScope
if TYPE_CHECKING:
from graphon.entities import GraphInitParams
from graphon.runtime import GraphRuntimeState
logger = logging.getLogger(__name__)
# Stage 4 §5+§7: the terminal events that `_consume_event_stream` may return.
# Stream + started events are filtered out before we yield; transport errors
@ -74,6 +83,7 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
output_adapter: WorkflowAgentOutputAdapter,
type_checker: PerOutputTypeChecker,
failure_orchestrator: OutputFailureOrchestrator,
session_store: WorkflowAgentRuntimeSessionStore | None = None,
) -> None:
super().__init__(
node_id=node_id,
@ -88,6 +98,7 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
self._output_adapter = output_adapter
self._type_checker = type_checker
self._failure_orchestrator = failure_orchestrator
self._session_store = session_store
@classmethod
def version(cls) -> str:
@ -134,6 +145,17 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
"agent_config_snapshot_id": bundle.snapshot.id,
"binding_id": bundle.binding.id,
}
session_scope = WorkflowAgentSessionScope(
tenant_id=dify_ctx.tenant_id,
app_id=dify_ctx.app_id,
workflow_id=workflow_id,
workflow_run_id=workflow_run_id,
node_id=self._node_id,
node_execution_id=self.id,
binding_id=bundle.binding.id,
agent_id=bundle.agent.id,
agent_config_snapshot_id=bundle.snapshot.id,
)
# Stage 4 §4.1 (D-3): use effective outputs so defaults flow through both
# the backend request and the post-run type check.
@ -147,6 +169,9 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
attempt = 0
while True:
try:
session_snapshot = None
if self._session_store is not None:
session_snapshot = self._session_store.load_active_snapshot(session_scope)
runtime_request = self._runtime_request_builder.build(
WorkflowAgentRuntimeBuildContext(
dify_context=dify_ctx,
@ -159,6 +184,7 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
agent=bundle.agent,
snapshot=bundle.snapshot,
attempt=attempt,
session_snapshot=session_snapshot,
)
)
except WorkflowAgentRuntimeRequestBuildError as error:
@ -221,9 +247,35 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
)
return
# Non-success terminal (failed / cancelled / paused) skips per-output
# post-processing — the backend itself already failed.
if isinstance(terminal_event, AgentBackendRunPausedInternalEvent):
self._save_session_snapshot(
session_scope=session_scope,
backend_run_id=terminal_event.run_id,
snapshot=terminal_event.session_snapshot,
composition_layer_specs=extract_cleanup_layer_specs(runtime_request.request.composition),
metadata=metadata,
)
yield PauseRequestedEvent(
reason=SchedulingPause(
message=terminal_event.message
or "Agent backend run requested workflow pause for external input."
)
)
return
# Non-success terminal (failed / cancelled) skips per-output
# post-processing — the backend itself already failed. We also retire
# the local ACTIVE session row so a workflow loop back into the same
# Agent node cannot resume from a stale snapshot. The failed agent
# backend layers (suspended per ``on_exit``) are left for agent
# backend's own GC; this row will no longer be picked up by the
# workflow-terminal cleanup layer.
if not isinstance(terminal_event, AgentBackendRunSucceededInternalEvent):
self._mark_session_cleaned_on_failure(
session_scope=session_scope,
backend_run_id=terminal_event.run_id,
metadata=metadata,
)
yield StreamCompletedEvent(
node_run_result=self._output_adapter.build_failure_result(
event=terminal_event,
@ -234,6 +286,14 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
)
return
self._save_session_snapshot(
session_scope=session_scope,
backend_run_id=terminal_event.run_id,
snapshot=terminal_event.session_snapshot,
composition_layer_specs=extract_cleanup_layer_specs(runtime_request.request.composition),
metadata=metadata,
)
# ──── Stage 4: per-output type check ────
type_check = self._type_checker.check(
declared_outputs=effective_outputs,
@ -384,6 +444,75 @@ class DifyAgentNode(Node[DifyAgentNodeData]):
],
}
def _save_session_snapshot(
    self,
    *,
    session_scope: WorkflowAgentSessionScope,
    backend_run_id: str,
    snapshot: CompositorSessionSnapshot | None,
    composition_layer_specs: list[CleanupLayerSpec],
    metadata: dict[str, Any],
) -> None:
    """Persist the run's session snapshot and record the outcome in ``metadata``.

    Best-effort: a store failure is logged as a warning and reflected in
    ``metadata["agent_backend"]`` instead of being raised. Does nothing when
    the node was built without a session store.

    Args:
        session_scope: Identifiers locating the session row.
        backend_run_id: Agent backend run that produced the snapshot.
        snapshot: Snapshot reported by the terminal event, possibly ``None``.
        composition_layer_specs: Non-plugin layer specs stored with the snapshot.
        metadata: Node metadata; its ``"agent_backend"`` entry is replaced
            with an updated copy (the previous mapping is not mutated).
    """
    store = self._session_store
    if store is None:
        return

    def _annotate(**flags: Any) -> None:
        # Merge into a copy so a previously shared mapping is never mutated.
        merged = dict(metadata.get("agent_backend") or {})
        merged.update(flags)
        metadata["agent_backend"] = merged

    try:
        store.save_active_snapshot(
            scope=session_scope,
            backend_run_id=backend_run_id,
            snapshot=snapshot,
            composition_layer_specs=composition_layer_specs,
        )
        _annotate(session_snapshot_persisted=snapshot is not None)
    except Exception:
        logger.warning(
            "Failed to persist workflow Agent runtime session snapshot: "
            "tenant_id=%s workflow_run_id=%s node_id=%s binding_id=%s agent_id=%s backend_run_id=%s",
            session_scope.tenant_id,
            session_scope.workflow_run_id,
            session_scope.node_id,
            session_scope.binding_id,
            session_scope.agent_id,
            backend_run_id,
            exc_info=True,
        )
        _annotate(
            session_snapshot_persisted=False,
            session_snapshot_persist_error="workflow_agent_runtime_session_store_error",
        )
def _mark_session_cleaned_on_failure(
    self,
    *,
    session_scope: WorkflowAgentSessionScope,
    backend_run_id: str,
    metadata: dict[str, Any],
) -> None:
    """Retire the stored session after a non-success run and note the outcome.

    Best-effort: a store failure is logged as a warning and reflected in
    ``metadata["agent_backend"]`` instead of being raised. Does nothing when
    the node was built without a session store.

    Args:
        session_scope: Identifiers locating the session row.
        backend_run_id: Agent backend run id passed through to the store.
        metadata: Node metadata; its ``"agent_backend"`` entry is replaced
            with an updated copy (the previous mapping is not mutated).
    """
    store = self._session_store
    if store is None:
        return

    def _annotate(**flags: Any) -> None:
        # Merge into a copy so a previously shared mapping is never mutated.
        merged = dict(metadata.get("agent_backend") or {})
        merged.update(flags)
        metadata["agent_backend"] = merged

    try:
        store.mark_cleaned(scope=session_scope, backend_run_id=backend_run_id)
        _annotate(session_snapshot_cleaned_on_failure=True)
    except Exception:
        logger.warning(
            "Failed to mark workflow Agent runtime session cleaned on agent run failure: "
            "tenant_id=%s workflow_run_id=%s node_id=%s binding_id=%s agent_id=%s backend_run_id=%s",
            session_scope.tenant_id,
            session_scope.workflow_run_id,
            session_scope.node_id,
            session_scope.binding_id,
            session_scope.agent_id,
            backend_run_id,
            exc_info=True,
        )
        _annotate(
            session_snapshot_cleaned_on_failure=False,
            session_snapshot_cleanup_error="workflow_agent_runtime_session_store_error",
        )
@staticmethod
def _patch_event_with_defaults(
event: AgentBackendRunSucceededInternalEvent,

View File

@ -4,6 +4,7 @@ from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from typing import Any, Literal, Protocol, cast
from agenton.compositor import CompositorSessionSnapshot
from dify_agent.layers.execution_context import DifyExecutionContextLayerConfig
from dify_agent.protocol import CreateRunRequest
@ -28,6 +29,7 @@ from models.agent_config_entities import (
from models.agent_config_entities import (
effective_declared_outputs as _effective_declared_outputs,
)
from models.provider_ids import ModelProviderID
from .output_failure_orchestrator import retry_idempotency_key
from .plugin_tools_builder import WorkflowAgentPluginToolsBuilder, WorkflowAgentPluginToolsBuildError
@ -66,6 +68,7 @@ class WorkflowAgentRuntimeBuildContext:
# Stage 4 §7 / D-4: 0 for the first run, then incremented per retry. Drives the
# idempotency key so the backend treats each retry as a fresh request.
attempt: int = 0
session_snapshot: CompositorSessionSnapshot | None = None
@dataclass(frozen=True, slots=True)
@ -129,11 +132,14 @@ class WorkflowAgentRuntimeRequestBuilder:
request = self._request_builder.build_for_workflow_node(
AgentBackendWorkflowNodeRunInput(
model=AgentBackendModelConfig(
plugin_id=agent_soul.model.plugin_id,
model_provider=agent_soul.model.model_provider,
plugin_id=self._plugin_daemon_plugin_id(
plugin_id=agent_soul.model.plugin_id,
model_provider=agent_soul.model.model_provider,
),
model_provider=self._plugin_daemon_provider_name(agent_soul.model.model_provider),
model=agent_soul.model.model,
credentials=self._normalize_credentials(credentials),
model_settings=cast(dict[str, Any], agent_soul.model.model_settings),
model_settings=agent_soul.model.model_settings,
),
# The execution-context layer is now the only public protocol
# carrier for Dify tenant/user/run identifiers. ``user_id`` must
@ -158,6 +164,7 @@ class WorkflowAgentRuntimeRequestBuilder:
user_prompt=user_prompt,
output=self._build_output_config(node_job.declared_outputs),
tools=tools_layer,
session_snapshot=context.session_snapshot,
idempotency_key=self._idempotency_key(context),
metadata=metadata,
)
@ -177,6 +184,20 @@ class WorkflowAgentRuntimeRequestBuilder:
return "single_step"
return "workflow_run"
@staticmethod
def _plugin_daemon_plugin_id(*, plugin_id: str, model_provider: str) -> str:
    """Return the transport plugin id expected by plugin-daemon headers.

    Args:
        plugin_id: Plugin id configured on the agent model; may be empty.
        model_provider: Provider identifier used when ``plugin_id`` is empty.

    Returns:
        ``plugin_id`` unchanged when it contains exactly one ``/``; otherwise
        the ``plugin_id`` attribute of a ``ModelProviderID`` parsed from
        ``plugin_id`` (or from ``model_provider`` when ``plugin_id`` is empty).
    """
    # Exactly one slash: presumably already in transport form — pass through.
    if plugin_id.count("/") == 1:
        return plugin_id
    # Prefer the explicit plugin id; fall back to the provider when it is empty.
    return ModelProviderID(plugin_id or model_provider).plugin_id
@staticmethod
def _plugin_daemon_provider_name(model_provider: str) -> str:
    """Return the provider name expected by plugin-daemon dispatch payloads.

    Args:
        model_provider: Provider identifier parsed via ``ModelProviderID``.

    Returns:
        The ``provider_name`` component of the parsed identifier.
    """
    parsed_provider = ModelProviderID(model_provider)
    return parsed_provider.provider_name
@staticmethod
def _idempotency_key(context: WorkflowAgentRuntimeBuildContext) -> str:
# Stage 4 §7 / D-4: retries get distinct keys (``...:retry-{attempt}``) so

View File

@ -0,0 +1,247 @@
from __future__ import annotations
import logging
from typing import override
from clients.agent_backend import AgentBackendError, AgentBackendRunClient, AgentBackendRunRequestBuilder
from clients.agent_backend.factory import create_agent_backend_run_client
from configs import dify_config
from core.workflow.system_variables import SystemVariableKey, get_system_text
from graphon.graph_engine.layers import GraphEngineLayer
from graphon.graph_events import (
GraphEngineEvent,
GraphRunAbortedEvent,
GraphRunFailedEvent,
GraphRunPartialSucceededEvent,
GraphRunSucceededEvent,
)
from .session_store import StoredWorkflowAgentSession, WorkflowAgentRuntimeSessionStore
logger = logging.getLogger(__name__)
# Upper bound on how long a cleanup-only run is allowed to settle before the
# layer gives up and leaves the row ACTIVE so it can be retried later. Cleanup
# work is mostly local agent-backend bookkeeping (no LLM inference), so 30s is
# generous; a hung backend should never block workflow termination beyond this.
_CLEANUP_WAIT_TIMEOUT_SECONDS = 30.0
class WorkflowAgentSessionCleanupLayer(GraphEngineLayer):
    """Retires workflow Agent session snapshots when a workflow reaches a terminal state.

    Implementation notes — there are two failure modes the cleanup path has to
    avoid simultaneously:

    1. The agenton compositor on the agent-backend side validates the cleanup
       request's session snapshot against the replayed composition before
       running any lifecycle hook. If the snapshot's layer names diverge from
       the composition, the run fails asynchronously with ``run_failed``, but
       the initial ``POST /runs`` already returned 202, so the API side has no
       visibility of the failure unless it waits for terminal status. The
       ``composition_layer_specs`` persistence in A.1-A.4 plus the
       ``_filter_snapshot_to_specs`` shape in ``build_cleanup_request`` keeps
       the two name lists in sync.

    2. The current agent backend's ``runner.py::_run_agent`` always invokes
       ``run.get_layer("llm")`` and the structured-output / history validators
       before exiting any slot — there is no ``purpose: "cleanup"`` branch
       yet. A truly cleanup-only request (no LLM layer) therefore still
       crashes inside the runner with ``Layer 'llm' is not defined in this
       compositor run.``. Until the backend grows a cleanup-only purpose,
       this layer **does not issue an HTTP cleanup run**: it simply retires
       the local snapshot row so stale state cannot be re-resumed, and lets
       the agent backend's own retention TTL release the suspended layers.

    The HTTP-cleanup machinery (``build_cleanup_request`` + ``wait_run``) is
    intentionally still wired into the request builder + integration tests so
    that when the agent backend supports cleanup runs we can flip the switch
    here with a one-line change (see ``_HTTP_CLEANUP_SUPPORTED``).

    Cleanup is best-effort: failures while listing or retiring sessions are
    logged and never raised out of ``on_event``, and a failure for one session
    does not stop the remaining sessions from being processed.
    """

    # Flip to True once dify-agent's runner has a ``purpose=cleanup`` branch
    # that skips the LLM/output/user-prompt invariants. Until then we only
    # update the local row; the spec list is still persisted so the future
    # HTTP cleanup path has everything it needs.
    _HTTP_CLEANUP_SUPPORTED: bool = False

    # Graph events that mark the end of a workflow run and trigger cleanup.
    _TERMINAL_EVENTS = (
        GraphRunSucceededEvent,
        GraphRunPartialSucceededEvent,
        GraphRunFailedEvent,
        GraphRunAbortedEvent,
    )

    def __init__(
        self,
        *,
        session_store: WorkflowAgentRuntimeSessionStore,
        request_builder: AgentBackendRunRequestBuilder,
        agent_backend_client: AgentBackendRunClient | None,
        cleanup_wait_timeout_seconds: float = _CLEANUP_WAIT_TIMEOUT_SECONDS,
    ) -> None:
        """Store the collaborators used by the cleanup path.

        Args:
            session_store: Store that lists and retires session rows.
            request_builder: Builds the cleanup-only run request.
            agent_backend_client: Backend client, or ``None`` when no agent
                backend is configured.
            cleanup_wait_timeout_seconds: Timeout passed to ``wait_run`` for
                the cleanup run.
        """
        super().__init__()
        self._session_store = session_store
        self._request_builder = request_builder
        self._agent_backend_client = agent_backend_client
        self._cleanup_wait_timeout_seconds = cleanup_wait_timeout_seconds

    @override
    def on_graph_start(self) -> None:
        """No-op: cleanup only reacts to terminal graph events."""
        return

    @override
    def on_event(self, event: GraphEngineEvent) -> None:
        """Clean up every ACTIVE session of the workflow run on a terminal event.

        Non-terminal events are ignored. Store or cleanup failures are logged
        and swallowed so they cannot interfere with workflow termination.
        """
        if not isinstance(event, self._TERMINAL_EVENTS):
            return

        workflow_run_id = get_system_text(
            self.graph_runtime_state.variable_pool,
            SystemVariableKey.WORKFLOW_EXECUTION_ID,
        )
        if not workflow_run_id:
            logger.warning("Skipping workflow Agent session cleanup: workflow_run_id is missing.")
            return

        try:
            stored_sessions = self._session_store.list_active_sessions(workflow_run_id=workflow_run_id)
        except Exception:
            logger.warning(
                "Failed to list active workflow Agent sessions for cleanup: workflow_run_id=%s",
                workflow_run_id,
                exc_info=True,
            )
            return

        for stored_session in stored_sessions:
            # Isolate each session so one failure cannot strand the others.
            try:
                self._cleanup_session(stored_session)
            except Exception:
                scope = stored_session.scope
                logger.warning(
                    "Workflow Agent session cleanup failed unexpectedly: "
                    "workflow_run_id=%s node_id=%s binding_id=%s agent_id=%s previous_run_id=%s",
                    scope.workflow_run_id,
                    scope.node_id,
                    scope.binding_id,
                    scope.agent_id,
                    stored_session.backend_run_id,
                    exc_info=True,
                )

    @override
    def on_graph_end(self, error: Exception | None) -> None:
        """No-op: cleanup is driven from ``on_event``."""
        return

    def _cleanup_session(self, stored_session: StoredWorkflowAgentSession) -> None:
        """Retire one stored session, via the backend only when HTTP cleanup is enabled.

        With ``_HTTP_CLEANUP_SUPPORTED`` false the row is marked cleaned locally.
        Otherwise a cleanup run is created and awaited, and the row is marked
        cleaned only when that run reports ``succeeded``; every other outcome
        leaves the row untouched and logs a warning.
        """
        scope = stored_session.scope
        if not self._HTTP_CLEANUP_SUPPORTED:
            # Agent backend has no cleanup-only run mode yet (see class
            # docstring). Retire the local row so future re-entries do not
            # resume from stale state, and let the backend's retention TTL
            # release the suspended layers on its own schedule.
            logger.info(
                "Workflow Agent session retired locally; HTTP cleanup is disabled "
                "until the agent backend supports a cleanup-only run mode. "
                "workflow_run_id=%s node_id=%s binding_id=%s agent_id=%s previous_run_id=%s",
                scope.workflow_run_id,
                scope.node_id,
                scope.binding_id,
                scope.agent_id,
                stored_session.backend_run_id,
            )
            self._session_store.mark_cleaned(scope=scope, backend_run_id=stored_session.backend_run_id)
            return

        if self._agent_backend_client is None:
            # HTTP cleanup was enabled by the caller but no client was wired
            # in (e.g. the API runs without AGENT_BACKEND_BASE_URL configured).
            # Leave the row ACTIVE so an operator restart with proper config
            # can drive the cleanup; do not silently retire it.
            logger.warning(
                "Skipping Agent backend cleanup: HTTP cleanup is enabled but no agent "
                "backend client is wired in. workflow_run_id=%s node_id=%s agent_id=%s",
                scope.workflow_run_id,
                scope.node_id,
                scope.agent_id,
            )
            return

        if not stored_session.composition_layer_specs:
            # Sessions persisted before A.1 landed do not carry the spec list,
            # so we cannot replay a valid cleanup composition. Leave the row
            # ACTIVE and warn so the absence shows up in observability rather
            # than being silently swallowed by a doomed cleanup run.
            logger.warning(
                "Skipping Agent backend cleanup: no composition_layer_specs persisted. "
                "workflow_run_id=%s node_id=%s agent_id=%s",
                scope.workflow_run_id,
                scope.node_id,
                scope.agent_id,
            )
            return

        request = self._request_builder.build_cleanup_request(
            session_snapshot=stored_session.session_snapshot,
            composition_layer_specs=stored_session.composition_layer_specs,
            idempotency_key=f"{scope.workflow_run_id}:{scope.node_id}:{scope.binding_id}:agent-session-cleanup",
            metadata={
                "tenant_id": scope.tenant_id,
                "app_id": scope.app_id,
                "workflow_id": scope.workflow_id,
                "workflow_run_id": scope.workflow_run_id,
                "node_id": scope.node_id,
                "node_execution_id": scope.node_execution_id,
                "binding_id": scope.binding_id,
                "agent_id": scope.agent_id,
                "agent_config_snapshot_id": scope.agent_config_snapshot_id,
                "previous_agent_backend_run_id": stored_session.backend_run_id,
            },
        )

        try:
            response = self._agent_backend_client.create_run(request)
        except AgentBackendError:
            logger.warning(
                "Agent backend session cleanup request failed: workflow_run_id=%s node_id=%s agent_id=%s",
                scope.workflow_run_id,
                scope.node_id,
                scope.agent_id,
                exc_info=True,
            )
            return

        # ``create_run`` only acknowledges the request; wait for the terminal
        # status so an asynchronous cleanup failure is actually observed.
        try:
            status_response = self._agent_backend_client.wait_run(
                response.run_id, timeout_seconds=self._cleanup_wait_timeout_seconds
            )
        except AgentBackendError:
            logger.warning(
                "Agent backend session cleanup wait_run failed: "
                "workflow_run_id=%s node_id=%s agent_id=%s cleanup_run_id=%s",
                scope.workflow_run_id,
                scope.node_id,
                scope.agent_id,
                response.run_id,
                exc_info=True,
            )
            return

        if status_response.status != "succeeded":
            logger.warning(
                "Agent backend session cleanup did not succeed: status=%s error=%s "
                "workflow_run_id=%s node_id=%s agent_id=%s cleanup_run_id=%s",
                status_response.status,
                status_response.error,
                scope.workflow_run_id,
                scope.node_id,
                scope.agent_id,
                response.run_id,
            )
            return

        # NOTE(review): this passes the cleanup run's id, whereas the
        # local-retire branch passes the stored run id — confirm which one
        # ``mark_cleaned`` expects.
        self._session_store.mark_cleaned(scope=scope, backend_run_id=response.run_id)
def build_workflow_agent_session_cleanup_layer() -> WorkflowAgentSessionCleanupLayer:
    """Assemble the cleanup layer with the standard production dependencies.

    An agent backend client is created only when ``AGENT_BACKEND_BASE_URL`` is
    configured or the deterministic fake is explicitly enabled. When neither
    is set — for example, unit tests that bring up the workflow runner without
    an Agent node — the layer receives ``None`` as its client so it stays
    harmless and test harnesses can skip backend configuration. With
    ``_HTTP_CLEANUP_SUPPORTED = False`` the local-retire branch never touches
    the client anyway.

    Returns:
        A ``WorkflowAgentSessionCleanupLayer`` wired with a fresh session
        store and request builder.
    """
    backend_configured = dify_config.AGENT_BACKEND_USE_FAKE or dify_config.AGENT_BACKEND_BASE_URL

    agent_backend_client: AgentBackendRunClient | None = None
    if backend_configured:
        agent_backend_client = create_agent_backend_run_client(
            base_url=dify_config.AGENT_BACKEND_BASE_URL,
            use_fake=dify_config.AGENT_BACKEND_USE_FAKE,
            fake_scenario=dify_config.AGENT_BACKEND_FAKE_SCENARIO,
        )

    return WorkflowAgentSessionCleanupLayer(
        session_store=WorkflowAgentRuntimeSessionStore(),
        request_builder=AgentBackendRunRequestBuilder(),
        agent_backend_client=agent_backend_client,
    )

View File

@ -0,0 +1,179 @@
from __future__ import annotations
from dataclasses import dataclass, field
from agenton.compositor import CompositorSessionSnapshot
from pydantic import TypeAdapter
from sqlalchemy import select
from clients.agent_backend.request_builder import CleanupLayerSpec
from core.db.session_factory import session_factory
from libs.datetime_utils import naive_utc_now
from models.agent import (
WorkflowAgentRuntimeSession,
WorkflowAgentRuntimeSessionStatus,
)
_SPECS_ADAPTER: TypeAdapter[list[CleanupLayerSpec]] = TypeAdapter(list[CleanupLayerSpec])
def _serialize_specs(specs: list[CleanupLayerSpec]) -> str:
    """Encode cleanup layer specs as a JSON string via the shared type adapter."""
    encoded: bytes = _SPECS_ADAPTER.dump_json(specs)
    return encoded.decode()
def _deserialize_specs(value: str | None) -> list[CleanupLayerSpec]:
    """Decode a stored JSON string back into cleanup layer specs.

    A missing or empty value yields an empty list without invoking the
    validator; anything else is validated as JSON by the shared adapter.
    """
    return _SPECS_ADAPTER.validate_json(value) if value else []
@dataclass(frozen=True, slots=True)
class WorkflowAgentSessionScope:
tenant_id: str
app_id: str
workflow_id: str
workflow_run_id: str | None
node_id: str
node_execution_id: str
binding_id: str
agent_id: str
agent_config_snapshot_id: str
@dataclass(frozen=True, slots=True)
class StoredWorkflowAgentSession:
scope: WorkflowAgentSessionScope
session_snapshot: CompositorSessionSnapshot
backend_run_id: str | None
composition_layer_specs: list[CleanupLayerSpec] = field(default_factory=list)
class WorkflowAgentRuntimeSessionStore:
    """Database-backed store of Agent backend session snapshots for workflow Agent node re-entry."""

    @staticmethod
    def _scoped_select(scope: WorkflowAgentSessionScope, *, active_only: bool):
        """Build the SELECT matching the session row that belongs to ``scope``.

        Rows are matched on tenant, workflow run, node, binding and agent.
        With ``active_only`` the match is further narrowed to ACTIVE rows.
        """
        model = WorkflowAgentRuntimeSession
        stmt = select(model).where(
            model.tenant_id == scope.tenant_id,
            model.workflow_run_id == scope.workflow_run_id,
            model.node_id == scope.node_id,
            model.binding_id == scope.binding_id,
            model.agent_id == scope.agent_id,
        )
        if active_only:
            stmt = stmt.where(model.status == WorkflowAgentRuntimeSessionStatus.ACTIVE)
        return stmt

    @staticmethod
    def _to_stored_session(row: WorkflowAgentRuntimeSession) -> StoredWorkflowAgentSession:
        """Convert one ORM row into its immutable ``StoredWorkflowAgentSession`` view."""
        row_scope = WorkflowAgentSessionScope(
            tenant_id=row.tenant_id,
            app_id=row.app_id,
            workflow_id=row.workflow_id,
            workflow_run_id=row.workflow_run_id,
            node_id=row.node_id,
            # The column is nullable while the scope field is a plain ``str``.
            node_execution_id=row.node_execution_id or "",
            binding_id=row.binding_id,
            agent_id=row.agent_id,
            agent_config_snapshot_id=row.agent_config_snapshot_id,
        )
        return StoredWorkflowAgentSession(
            scope=row_scope,
            session_snapshot=CompositorSessionSnapshot.model_validate_json(row.session_snapshot),
            backend_run_id=row.backend_run_id,
            composition_layer_specs=_deserialize_specs(row.composition_layer_specs),
        )

    def load_active_snapshot(self, scope: WorkflowAgentSessionScope) -> CompositorSessionSnapshot | None:
        """Return the ACTIVE snapshot stored for ``scope``, if there is one.

        Returns ``None`` when the scope has no workflow run id or when no
        ACTIVE row matches the scope.
        """
        if scope.workflow_run_id is None:
            return None

        stmt = self._scoped_select(scope, active_only=True)
        with session_factory.create_session() as session:
            match = session.scalar(stmt)
            if match is None:
                return None
            return CompositorSessionSnapshot.model_validate_json(match.session_snapshot)

    def list_active_sessions(self, *, workflow_run_id: str) -> list[StoredWorkflowAgentSession]:
        """Return every ACTIVE session recorded for the given workflow run."""
        stmt = select(WorkflowAgentRuntimeSession).where(
            WorkflowAgentRuntimeSession.workflow_run_id == workflow_run_id,
            WorkflowAgentRuntimeSession.status == WorkflowAgentRuntimeSessionStatus.ACTIVE,
        )
        with session_factory.create_session() as session:
            return [self._to_stored_session(row) for row in session.scalars(stmt).all()]

    def save_active_snapshot(
        self,
        *,
        scope: WorkflowAgentSessionScope,
        backend_run_id: str,
        snapshot: CompositorSessionSnapshot | None,
        composition_layer_specs: list[CleanupLayerSpec],
    ) -> None:
        """Insert or update the session row for ``scope`` and mark it ACTIVE.

        Does nothing when the scope has no workflow run id or when
        ``snapshot`` is ``None``. An existing row (whatever its status) is
        overwritten in place and has ``cleaned_at`` reset to ``None``.
        """
        if scope.workflow_run_id is None or snapshot is None:
            return

        snapshot_json = snapshot.model_dump_json()
        specs_json = _serialize_specs(composition_layer_specs)

        # Look the row up regardless of status so a previously cleaned row is revived.
        stmt = self._scoped_select(scope, active_only=False)
        with session_factory.create_session() as session:
            existing = session.scalar(stmt)
            if existing is not None:
                existing.node_execution_id = scope.node_execution_id
                existing.agent_config_snapshot_id = scope.agent_config_snapshot_id
                existing.backend_run_id = backend_run_id
                existing.session_snapshot = snapshot_json
                existing.composition_layer_specs = specs_json
                existing.status = WorkflowAgentRuntimeSessionStatus.ACTIVE
                existing.cleaned_at = None
            else:
                session.add(
                    WorkflowAgentRuntimeSession(
                        tenant_id=scope.tenant_id,
                        app_id=scope.app_id,
                        workflow_id=scope.workflow_id,
                        workflow_run_id=scope.workflow_run_id,
                        node_id=scope.node_id,
                        node_execution_id=scope.node_execution_id,
                        binding_id=scope.binding_id,
                        agent_id=scope.agent_id,
                        agent_config_snapshot_id=scope.agent_config_snapshot_id,
                        backend_run_id=backend_run_id,
                        session_snapshot=snapshot_json,
                        composition_layer_specs=specs_json,
                        status=WorkflowAgentRuntimeSessionStatus.ACTIVE,
                    )
                )
            session.commit()

    def mark_cleaned(self, *, scope: WorkflowAgentSessionScope, backend_run_id: str | None = None) -> None:
        """Flip the ACTIVE row for ``scope`` to CLEANED and stamp ``cleaned_at``.

        Does nothing when the scope has no workflow run id or when no ACTIVE
        row matches. A non-``None`` ``backend_run_id`` replaces the stored one.
        """
        if scope.workflow_run_id is None:
            return

        stmt = self._scoped_select(scope, active_only=True)
        with session_factory.create_session() as session:
            active_row = session.scalar(stmt)
            if active_row is None:
                return
            if backend_run_id is not None:
                active_row.backend_run_id = backend_run_id
            active_row.status = WorkflowAgentRuntimeSessionStatus.CLEANED
            active_row.cleaned_at = naive_utc_now()
            session.commit()
# Names exported by ``from <this module> import *``.
__all__ = [
"StoredWorkflowAgentSession",
"WorkflowAgentRuntimeSessionStore",
"WorkflowAgentSessionScope",
]

View File

@ -0,0 +1,90 @@
"""add workflow agent runtime sessions
Revision ID: 7885bd53f9a9
Revises: d4a5e1f3c9b7
Create Date: 2026-05-27 09:53:54.711805
"""
import sqlalchemy as sa
from alembic import op
import models as models
# Revision identifiers, used by Alembic: this migration's id and the parent
# revision it is applied on top of.
revision = "7885bd53f9a9"
down_revision = "d4a5e1f3c9b7"
# No branch labels and no cross-revision dependencies are declared.
branch_labels = None
depends_on = None
def _is_pg() -> bool:
    """Report whether the connection Alembic is bound to uses the PostgreSQL dialect."""
    dialect_name = op.get_bind().dialect.name
    return dialect_name == "postgresql"
def _uuid_column(name: str, *, nullable: bool = False, primary_key: bool = False) -> sa.Column:
    """Build a ``StringUUID`` column, adding the ``uuidv7()`` server default where it applies.

    The server default is attached only for primary-key columns on
    PostgreSQL, matching the default other tables rely on there. On other
    dialects (e.g. MySQL) no server default is set and the ORM supplies the id.
    """
    # ``primary_key`` is tested first so the dialect is only inspected when needed.
    if not primary_key or not _is_pg():
        return sa.Column(name, models.types.StringUUID(), nullable=nullable, primary_key=primary_key)
    return sa.Column(
        name,
        models.types.StringUUID(),
        nullable=nullable,
        primary_key=primary_key,
        server_default=sa.text("uuidv7()"),
    )
def upgrade() -> None:
    """Create ``workflow_agent_runtime_sessions`` with its constraints and two secondary indexes."""
    table_name = "workflow_agent_runtime_sessions"
    uuid_type = models.types.StringUUID
    long_text = models.types.LongText

    columns = [
        _uuid_column("id", primary_key=True),
        sa.Column("tenant_id", uuid_type(), nullable=False),
        sa.Column("app_id", uuid_type(), nullable=False),
        sa.Column("workflow_id", uuid_type(), nullable=False),
        sa.Column("workflow_run_id", uuid_type(), nullable=False),
        sa.Column("node_id", sa.String(length=255), nullable=False),
        sa.Column("node_execution_id", sa.String(length=255), nullable=True),
        sa.Column("binding_id", uuid_type(), nullable=False),
        sa.Column("agent_id", uuid_type(), nullable=False),
        sa.Column("agent_config_snapshot_id", uuid_type(), nullable=False),
        sa.Column("backend_run_id", sa.String(length=255), nullable=True),
        sa.Column("session_snapshot", long_text(), nullable=False),
        # No ``server_default`` here: MySQL rejects one on TEXT/BLOB columns.
        # The JSON payload is always written at the ORM layer by
        # ``WorkflowAgentRuntimeSessionStore.save_active_snapshot``, so new
        # rows are never left uninitialized despite the missing DB default.
        sa.Column("composition_layer_specs", long_text(), nullable=False),
        sa.Column("status", sa.String(length=32), server_default=sa.text("'active'"), nullable=False),
        sa.Column("cleaned_at", sa.DateTime(), nullable=True),
        sa.Column("created_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False),
        sa.Column("updated_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False),
    ]
    constraints = [
        sa.PrimaryKeyConstraint("id", name=op.f("workflow_agent_runtime_session_pkey")),
        # One row per (tenant, workflow run, node, binding, agent).
        sa.UniqueConstraint(
            "tenant_id",
            "workflow_run_id",
            "node_id",
            "binding_id",
            "agent_id",
            name=op.f("workflow_agent_runtime_session_scope_unique"),
        ),
    ]
    op.create_table(table_name, *columns, *constraints)

    # Index name -> indexed columns, created in this order.
    indexes = (
        ("workflow_agent_runtime_session_lookup_idx", ["tenant_id", "workflow_run_id", "node_id", "status"]),
        ("workflow_agent_runtime_session_backend_run_idx", ["backend_run_id"]),
    )
    with op.batch_alter_table(table_name, schema=None) as batch_op:
        for index_name, index_columns in indexes:
            batch_op.create_index(index_name, index_columns, unique=False)
def downgrade() -> None:
    """Drop the two secondary indexes, then the ``workflow_agent_runtime_sessions`` table."""
    table_name = "workflow_agent_runtime_sessions"
    # Dropped in the reverse of the order in which ``upgrade`` creates them.
    index_names = (
        "workflow_agent_runtime_session_backend_run_idx",
        "workflow_agent_runtime_session_lookup_idx",
    )
    with op.batch_alter_table(table_name, schema=None) as batch_op:
        for index_name in index_names:
            batch_op.drop_index(index_name)

    op.drop_table(table_name)

View File

@ -20,6 +20,8 @@ from .agent import (
AgentStatus,
WorkflowAgentBindingType,
WorkflowAgentNodeBinding,
WorkflowAgentRuntimeSession,
WorkflowAgentRuntimeSessionStatus,
)
from .api_based_extension import APIBasedExtension, APIBasedExtensionPoint
from .comment import (
@ -235,6 +237,8 @@ __all__ = [
"Workflow",
"WorkflowAgentBindingType",
"WorkflowAgentNodeBinding",
"WorkflowAgentRuntimeSession",
"WorkflowAgentRuntimeSessionStatus",
"WorkflowAppLog",
"WorkflowAppLogCreatedFrom",
"WorkflowArchiveLog",

View File

@ -92,6 +92,15 @@ class WorkflowAgentBindingType(StrEnum):
INLINE_AGENT = "inline_agent"
class WorkflowAgentRuntimeSessionStatus(StrEnum):
"""Lifecycle state of an Agent backend session snapshot owned by a workflow run."""
# Snapshot can be reused by a later Agent run in the same workflow run.
ACTIVE = "active"
# Snapshot has been retired and must not be submitted to Agent backend again.
CLEANED = "cleaned"
class Agent(DefaultFieldsMixin, Base):
"""Workspace-scoped Agent identity used by Agent Roster and workflow-only agents."""
@ -273,3 +282,56 @@ class WorkflowAgentNodeBinding(DefaultFieldsMixin, Base):
if isinstance(self.node_job_config, str):
return json.loads(self.node_job_config)
return dict(self.node_job_config)
class WorkflowAgentRuntimeSession(DefaultFieldsMixin, Base):
    """Persisted Agent backend session snapshot for one workflow Agent node execution scope.

    The snapshot is runtime state returned by Agent backend. It is intentionally
    separate from Agent Soul snapshots and workflow node-job config.

    At most one row exists per (tenant, workflow run, node, binding, agent),
    enforced by ``workflow_agent_runtime_session_scope_unique``.
    """

    __tablename__ = "workflow_agent_runtime_sessions"
    __table_args__ = (
        sa.PrimaryKeyConstraint("id", name="workflow_agent_runtime_session_pkey"),
        UniqueConstraint(
            "tenant_id",
            "workflow_run_id",
            "node_id",
            "binding_id",
            "agent_id",
            name="workflow_agent_runtime_session_scope_unique",
        ),
        Index(
            "workflow_agent_runtime_session_lookup_idx",
            "tenant_id",
            "workflow_run_id",
            "node_id",
            "status",
        ),
        Index("workflow_agent_runtime_session_backend_run_idx", "backend_run_id"),
    )

    tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    app_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    workflow_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    workflow_run_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    node_id: Mapped[str] = mapped_column(String(255), nullable=False)
    node_execution_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
    binding_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    agent_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    agent_config_snapshot_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    backend_run_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
    # JSON-encoded ``CompositorSessionSnapshot`` returned by Agent backend.
    session_snapshot: Mapped[str] = mapped_column(LongText, nullable=False)
    # JSON-encoded list of ``CleanupLayerSpec`` ({name, type, deps, config}).
    # Drives Agent backend cleanup-only runs: the agenton compositor rejects a
    # session snapshot whose layer names do not match the cleanup composition,
    # so we must replay the same layer graph (minus credential-bearing plugin
    # layers) when issuing the cleanup request.
    #
    # The default is applied on the ORM side rather than as a ``server_default``:
    # the migration creates this column without a DB-level default (MySQL
    # rejects one on TEXT/BLOB columns), so a ``server_default`` declared here
    # would disagree with the real schema.
    composition_layer_specs: Mapped[str] = mapped_column(LongText, nullable=False, default="[]")
    status: Mapped[WorkflowAgentRuntimeSessionStatus] = mapped_column(
        EnumText(WorkflowAgentRuntimeSessionStatus, length=32),
        nullable=False,
        default=WorkflowAgentRuntimeSessionStatus.ACTIVE,
    )
    # Set when the session is retired; ``None`` while the row has not been cleaned.
    cleaned_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)

View File

@ -0,0 +1,134 @@
"""Integration test for the cleanup request against the real agenton compositor.
The bug fixed by A+D was invisible to unit tests that use ``FakeAgentBackendRunClient``
because the fake client never runs agenton's ``_validate_session_snapshot``. This
test plugs a cleanup request through the real ``Compositor`` (with the same
providers the agent backend wires in production) so that the snapshot-vs-
composition name-order check would fail loudly if the cleanup builder ever
regressed back to the empty-composition shape.
"""
from __future__ import annotations
from typing import cast
import pytest
from agenton.compositor import Compositor, CompositorSessionSnapshot, LayerProvider
from agenton.compositor.schemas import LayerSessionSnapshot
from agenton.layers.base import LifecycleState
from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID
from agenton_collections.layers.plain.basic import PromptLayer
from agenton_collections.layers.pydantic_ai import PYDANTIC_AI_HISTORY_LAYER_TYPE_ID, PydanticAIHistoryLayer
from clients.agent_backend import AgentBackendRunRequestBuilder, CleanupLayerSpec
def test_cleanup_request_passes_agenton_snapshot_validation():
    """Cleanup composition and filtered snapshot must agree on layer names.

    agenton's compositor enforces an exact match between the composition's
    layer names and the snapshot's layer names; the agent backend surfaces a
    mismatch asynchronously as ``run_failed``. The request built here must
    therefore pass the real compositor's snapshot validation.
    """
    prompt_layer = "workflow_node_job_prompt"
    history_layer = "history"

    # Persisted (non-plugin) layer specs, i.e. what a cleanup run replays.
    # The dify.execution_context layer is left out of this integration check
    # because its real provider needs a plugin-daemon HTTP client. The check
    # exercised here is the structural snapshot-vs-composition name match,
    # which does not depend on which non-plugin layer types are present.
    persisted_specs = [
        CleanupLayerSpec(
            name=prompt_layer,
            type=PLAIN_PROMPT_LAYER_TYPE_ID,
            config={"prefix": "Do the cleanup."},
        ),
        CleanupLayerSpec(name=history_layer, type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID),
    ]

    # The saved snapshot still contains the LLM layer entry; the cleanup
    # builder has to drop it so that the names line up with the specs.
    full_snapshot = CompositorSessionSnapshot(
        layers=[
            LayerSessionSnapshot(
                name=layer_name,
                lifecycle_state=LifecycleState.SUSPENDED,
                runtime_state=runtime_state,
            )
            for layer_name, runtime_state in (
                (prompt_layer, {}),
                (history_layer, {"messages": []}),
                ("llm", {}),
            )
        ]
    )

    builder = AgentBackendRunRequestBuilder()
    cleanup_request = builder.build_cleanup_request(
        session_snapshot=full_snapshot,
        composition_layer_specs=persisted_specs,
    )
    request_layers = cleanup_request.composition.layers

    # Feed the request to the real compositor via ``from_config`` followed by
    # the private ``_create_run``, which performs the session snapshot
    # validation. Calling it directly keeps the test synchronous: validation
    # is the only thing under test, so no async ``enter()`` lifecycle is needed.
    layer_entries = []
    for layer in request_layers:
        layer_entries.append(
            {
                "name": layer.name,
                "type": layer.type,
                "deps": dict(layer.deps),
                "metadata": dict(layer.metadata),
            }
        )
    providers = [
        LayerProvider.from_layer_type(PromptLayer),
        LayerProvider.from_layer_type(PydanticAIHistoryLayer),
    ]
    compositor = Compositor.from_config(
        {"schema_version": 1, "layers": layer_entries},
        providers=providers,
    )
    layer_configs = {layer.name: layer.config for layer in request_layers}

    # A disagreement between snapshot and composition layer names would make
    # this call raise ``ValueError`` (the failure the ``layers=[]`` cleanup hit).
    run = compositor._create_run(  # type: ignore[reportPrivateUsage]
        configs=cast(dict[str, object], layer_configs),
        session_snapshot=cleanup_request.session_snapshot,
    )

    assert list(run.slots.keys()) == [prompt_layer, history_layer]
def test_cleanup_request_with_mismatched_specs_would_be_rejected_by_agenton():
    """Regression sentinel for the unfiltered-snapshot shape.

    A snapshot carrying a layer that the composition lacks must be rejected
    by agenton's validator. This shows that the passing validation in the
    companion test is not a coincidence.
    """
    history_layer = "history"
    suspended = LifecycleState.SUSPENDED

    # "llm" is the extra layer: present in the snapshot, absent from the composition.
    snapshot_with_extra = CompositorSessionSnapshot(
        layers=[
            LayerSessionSnapshot(name=history_layer, lifecycle_state=suspended, runtime_state={}),
            LayerSessionSnapshot(name="llm", lifecycle_state=suspended, runtime_state={}),
        ]
    )

    history_only_config = {
        "schema_version": 1,
        "layers": [
            {"name": history_layer, "type": PYDANTIC_AI_HISTORY_LAYER_TYPE_ID, "deps": {}, "metadata": {}},
        ],
    }
    compositor = Compositor.from_config(
        history_only_config,
        providers=[LayerProvider.from_layer_type(PydanticAIHistoryLayer)],
    )

    with pytest.raises(ValueError, match="layer names must match"):
        compositor._create_run(  # type: ignore[reportPrivateUsage]
            configs={},
            session_snapshot=snapshot_with_extra,
        )

View File

@ -63,3 +63,25 @@ def test_fake_client_cancel_run_returns_cancelled_status():
assert cancelled.run_id == "fake-run-1"
assert cancelled.status == "cancelled"
def test_fake_client_paused_scenario_returns_paused_status_and_event():
    """PAUSED scenario (HITL-style flows): both ``wait_run`` and the event
    stream must report the pause so that consumers can branch on it."""
    run_id = "fake-run-1"
    paused_client = FakeAgentBackendRunClient(scenario=FakeAgentBackendScenario.PAUSED)

    run_status = paused_client.wait_run(run_id)
    streamed = [*paused_client.stream_events(run_id)]
    final_event = streamed[-1]

    assert run_status.status == "paused"
    assert run_status.error is None
    assert final_event.type == "run_paused"
    assert final_event.data.reason == "human_input_required"
def test_fake_client_success_wait_run_returns_succeeded_status():
    """Exercise ``wait_run`` on a client built with the default scenario."""
    default_client = FakeAgentBackendRunClient()

    run_status = default_client.wait_run("fake-run-1")

    assert run_status.status == "succeeded"
    assert run_status.error is None

View File

@ -1,15 +1,23 @@
from typing import Any, cast
import pytest
from agenton.compositor import CompositorSessionSnapshot
from agenton.compositor.schemas import LayerSessionSnapshot
from agenton.layers import ExitIntent
from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID
from agenton.layers.base import LifecycleState
from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID, PromptLayerConfig
from agenton_collections.layers.pydantic_ai import PYDANTIC_AI_HISTORY_LAYER_TYPE_ID
from dify_agent.layers.dify_plugin import (
DIFY_PLUGIN_LLM_LAYER_TYPE_ID,
DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID,
DifyPluginLLMLayerConfig,
DifyPluginToolConfig,
DifyPluginToolsLayerConfig,
)
from dify_agent.layers.execution_context import DIFY_EXECUTION_CONTEXT_LAYER_TYPE_ID, DifyExecutionContextLayerConfig
from dify_agent.layers.output import DIFY_OUTPUT_LAYER_TYPE_ID
from dify_agent.protocol import (
DIFY_AGENT_HISTORY_LAYER_ID,
DIFY_AGENT_MODEL_LAYER_ID,
DIFY_AGENT_OUTPUT_LAYER_ID,
CreateRunRequest,
@ -26,6 +34,7 @@ from clients.agent_backend import (
AgentBackendOutputConfig,
AgentBackendRunRequestBuilder,
AgentBackendWorkflowNodeRunInput,
CleanupLayerSpec,
redact_for_agent_backend_log,
)
@ -71,10 +80,11 @@ def test_request_builder_outputs_dify_agent_create_run_request():
WORKFLOW_NODE_JOB_PROMPT_LAYER_ID,
WORKFLOW_USER_PROMPT_LAYER_ID,
DIFY_EXECUTION_CONTEXT_LAYER_ID,
DIFY_AGENT_HISTORY_LAYER_ID,
DIFY_AGENT_MODEL_LAYER_ID,
DIFY_AGENT_OUTPUT_LAYER_ID,
]
assert request.on_exit.default is ExitIntent.DELETE
assert request.on_exit.default is ExitIntent.SUSPEND
assert request.idempotency_key == "workflow-run-1:node-execution-1"
assert request.metadata == {"workflow_id": "workflow-1", "node_id": "node-1"}
@ -99,9 +109,10 @@ def test_request_builder_sets_model_and_output_layer_contract_ids():
layers = {layer.name: layer for layer in request.composition.layers}
assert layers[DIFY_EXECUTION_CONTEXT_LAYER_ID].type == DIFY_EXECUTION_CONTEXT_LAYER_TYPE_ID
assert layers[DIFY_EXECUTION_CONTEXT_LAYER_ID].config.user_id == "user-1"
assert cast(DifyExecutionContextLayerConfig, layers[DIFY_EXECUTION_CONTEXT_LAYER_ID].config).user_id == "user-1"
assert layers[DIFY_AGENT_HISTORY_LAYER_ID].type == PYDANTIC_AI_HISTORY_LAYER_TYPE_ID
assert layers[DIFY_AGENT_MODEL_LAYER_ID].type == DIFY_PLUGIN_LLM_LAYER_TYPE_ID
assert layers[DIFY_AGENT_MODEL_LAYER_ID].config.plugin_id == "langgenius/openai"
assert cast(DifyPluginLLMLayerConfig, layers[DIFY_AGENT_MODEL_LAYER_ID].config).plugin_id == "langgenius/openai"
assert layers[DIFY_AGENT_MODEL_LAYER_ID].deps == {"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID}
assert layers[DIFY_AGENT_OUTPUT_LAYER_ID].type == DIFY_OUTPUT_LAYER_TYPE_ID
@ -130,16 +141,92 @@ def test_request_builder_adds_dify_plugin_tools_layer_when_configured():
assert layers[DIFY_PLUGIN_TOOLS_LAYER_ID].type == DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID
assert layers[DIFY_PLUGIN_TOOLS_LAYER_ID].deps == {"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID}
assert layers[DIFY_PLUGIN_TOOLS_LAYER_ID].config.tools[0].tool_name == "current_time"
tools_config = cast(DifyPluginToolsLayerConfig, layers[DIFY_PLUGIN_TOOLS_LAYER_ID].config)
assert tools_config.tools[0].tool_name == "current_time"
def test_request_builder_can_suspend_on_exit_for_resume_or_babysit_paths():
def test_request_builder_can_delete_on_exit_for_cleanup_paths():
run_input = _run_input()
run_input.suspend_on_exit = True
run_input.suspend_on_exit = False
request = AgentBackendRunRequestBuilder().build_for_workflow_node(run_input)
assert request.on_exit.default is ExitIntent.SUSPEND
assert request.on_exit.default is ExitIntent.DELETE
def test_request_builder_builds_cleanup_request_replays_persisted_layer_specs():
    """A cleanup request replays the persisted (non-plugin) layer specs and
    trims the snapshot to the same layers, so that agenton's
    snapshot-vs-composition name-order validator passes."""
    suspended = LifecycleState.SUSPENDED
    history_entry = LayerSessionSnapshot(name="history", lifecycle_state=suspended, runtime_state={"k": 1})
    llm_entry = LayerSessionSnapshot(name="llm", lifecycle_state=suspended, runtime_state={})
    session_snapshot = CompositorSessionSnapshot(layers=[history_entry, llm_entry])
    persisted_specs = [CleanupLayerSpec(name="history", type="pydantic_ai.history")]
    cleanup_key = "run-1:node-1:binding-1:agent-session-cleanup"

    request = AgentBackendRunRequestBuilder().build_cleanup_request(
        session_snapshot=session_snapshot,
        composition_layer_specs=persisted_specs,
        idempotency_key=cleanup_key,
        metadata={"workflow_run_id": "run-1"},
    )

    composition_names = [layer.name for layer in request.composition.layers]
    assert composition_names == ["history"]
    assert request.session_snapshot is not None
    snapshot_names = [layer.name for layer in request.session_snapshot.layers]
    assert snapshot_names == ["history"]
    assert request.on_exit.default is ExitIntent.DELETE
    assert request.idempotency_key == cleanup_key
    assert request.metadata["agent_backend_lifecycle"] == "session_cleanup"
def test_request_builder_rejects_empty_composition_layer_specs():
    """An empty spec list must be refused: it would recreate the ``layers=[]``
    request shape that fails agenton's snapshot-vs-composition validation."""
    builder = AgentBackendRunRequestBuilder()
    empty_snapshot = CompositorSessionSnapshot(layers=[])

    with pytest.raises(ValueError, match="composition_layer_specs"):
        builder.build_cleanup_request(
            session_snapshot=empty_snapshot,
            composition_layer_specs=[],
        )
def test_extract_cleanup_layer_specs_drops_plugin_layers_keeps_configs():
    """Plugin layers are dropped from the extracted specs; the config of a
    surviving layer is kept in dict form."""
    from dify_agent.protocol import RunComposition, RunLayerSpec

    from clients.agent_backend import extract_cleanup_layer_specs

    soul_layer = RunLayerSpec(
        name="agent_soul_prompt",
        type="plain.prompt",
        config=PromptLayerConfig(prefix="hello"),
    )
    # The protocol allows ``config=None``; the redacted config is what matters.
    llm_layer = RunLayerSpec(name="llm", type="dify.plugin.llm", config=None)
    tools_layer = RunLayerSpec(name="tools", type="dify.plugin.tools")
    history_layer = RunLayerSpec(name="history", type="pydantic_ai.history")
    composition = RunComposition(layers=[soul_layer, llm_layer, tools_layer, history_layer])

    specs = extract_cleanup_layer_specs(composition)

    extracted_names = [spec.name for spec in specs]
    assert extracted_names == ["agent_soul_prompt", "history"]
    # Non-plugin configs come back as JSON-compatible dicts, so the persisted
    # row can be replayed without holding live pydantic instances.
    soul_config = specs[0].config
    assert isinstance(soul_config, dict)
    assert soul_config.get("prefix") == "hello"
def test_request_builder_rejects_blank_prompts():
@ -159,6 +246,6 @@ def test_request_builder_rejects_blank_prompts():
def test_redact_for_agent_backend_log_hides_credentials():
request = AgentBackendRunRequestBuilder().build_for_workflow_node(_run_input())
redacted = redact_for_agent_backend_log(request)
redacted = cast(dict[str, Any], redact_for_agent_backend_log(request))
assert redacted["composition"]["layers"][4]["config"]["credentials"] == "[REDACTED]"
assert redacted["composition"]["layers"][5]["config"]["credentials"] == "[REDACTED]"

View File

@ -1,9 +1,12 @@
from types import SimpleNamespace
from typing import cast
from agenton.compositor import CompositorSessionSnapshot
from clients.agent_backend import (
AgentBackendRunEventAdapter,
AgentBackendStreamInternalEvent,
CleanupLayerSpec,
FakeAgentBackendRunClient,
FakeAgentBackendScenario,
)
@ -13,9 +16,10 @@ from core.workflow.nodes.agent_v2.binding_resolver import WorkflowAgentBindingBu
from core.workflow.nodes.agent_v2.entities import DifyAgentNodeData
from core.workflow.nodes.agent_v2.output_adapter import WorkflowAgentOutputAdapter
from core.workflow.nodes.agent_v2.runtime_request_builder import WorkflowAgentRuntimeRequestBuilder
from core.workflow.nodes.agent_v2.session_store import WorkflowAgentRuntimeSessionStore, WorkflowAgentSessionScope
from graphon.entities import GraphInitParams
from graphon.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from graphon.node_events import StreamCompletedEvent
from graphon.node_events import PauseRequestedEvent, StreamCompletedEvent
from graphon.runtime import GraphRuntimeState
from graphon.variables.segments import StringSegment
from models.agent import Agent, AgentConfigSnapshot, WorkflowAgentNodeBinding
@ -84,7 +88,47 @@ class FakeBindingResolver(WorkflowAgentBindingResolver):
return WorkflowAgentBindingBundle(binding=self.binding, agent=self.agent, snapshot=self.snapshot)
def _node(*, scenario: FakeAgentBackendScenario = FakeAgentBackendScenario.SUCCESS) -> DifyAgentNode:
class FakeSessionStore:
    """In-memory stand-in for the runtime session store that records every call it receives."""

    def __init__(self, snapshot: CompositorSessionSnapshot | None = None) -> None:
        # Returned unchanged by every ``load_active_snapshot`` call.
        self.loaded_snapshot = snapshot
        # One ``(scope, backend_run_id, snapshot, specs)`` tuple per save call.
        self.saved: list[
            tuple[
                WorkflowAgentSessionScope,
                str,
                CompositorSessionSnapshot | None,
                list[CleanupLayerSpec],
            ]
        ] = []
        # One ``(scope, backend_run_id)`` tuple per ``mark_cleaned`` call.
        self.cleaned: list[tuple[WorkflowAgentSessionScope, str | None]] = []

    def load_active_snapshot(self, scope: WorkflowAgentSessionScope) -> CompositorSessionSnapshot | None:
        """Return the snapshot given at construction time, whatever the scope."""
        return self.loaded_snapshot

    def save_active_snapshot(
        self,
        *,
        scope: WorkflowAgentSessionScope,
        backend_run_id: str,
        snapshot: CompositorSessionSnapshot | None,
        composition_layer_specs: list[CleanupLayerSpec],
    ) -> None:
        """Record the save call; the specs are copied into a new list."""
        recorded_specs = [*composition_layer_specs]
        self.saved.append((scope, backend_run_id, snapshot, recorded_specs))

    def mark_cleaned(
        self,
        *,
        scope: WorkflowAgentSessionScope,
        backend_run_id: str | None = None,
    ) -> None:
        """Record the cleanup call."""
        self.cleaned.append((scope, backend_run_id))
def _node(
*,
scenario: FakeAgentBackendScenario = FakeAgentBackendScenario.SUCCESS,
agent_backend_client: FakeAgentBackendRunClient | None = None,
session_store: FakeSessionStore | None = None,
) -> DifyAgentNode:
graph_init_params = GraphInitParams(
workflow_id="workflow-1",
graph_config={"nodes": [], "edges": []},
@ -106,6 +150,7 @@ def _node(*, scenario: FakeAgentBackendScenario = FakeAgentBackendScenario.SUCCE
def is_owned_by_tenant(self, *, file_id: str, tenant_id: str) -> bool:
return True
client = agent_backend_client or FakeAgentBackendRunClient(scenario=scenario)
return DifyAgentNode(
node_id="agent-node",
data=DifyAgentNodeData.model_validate({"type": BuiltinNodeTypes.AGENT, "version": "2"}),
@ -113,11 +158,12 @@ def _node(*, scenario: FakeAgentBackendScenario = FakeAgentBackendScenario.SUCCE
graph_runtime_state=cast(GraphRuntimeState, SimpleNamespace(variable_pool=FakeVariablePool())),
binding_resolver=FakeBindingResolver(),
runtime_request_builder=WorkflowAgentRuntimeRequestBuilder(credentials_provider=FakeCredentialsProvider()),
agent_backend_client=FakeAgentBackendRunClient(scenario=scenario),
agent_backend_client=client,
event_adapter=AgentBackendRunEventAdapter(),
output_adapter=WorkflowAgentOutputAdapter(),
type_checker=PerOutputTypeChecker(file_validator=_AlwaysAllowFileValidator()),
failure_orchestrator=OutputFailureOrchestrator(),
session_store=cast(WorkflowAgentRuntimeSessionStore | None, session_store),
)
@ -132,7 +178,7 @@ def test_agent_node_run_maps_successful_agent_backend_run_to_node_result():
assert agent_log["agent_backend"]["run_id"] == "fake-run-1"
assert agent_log["agent_backend"]["status"] == "succeeded"
assert result.process_data["agent_id"] == "agent-1"
assert result.inputs["agent_backend_request"]["composition"]["layers"][4]["config"]["credentials"] == "[REDACTED]"
assert result.inputs["agent_backend_request"]["composition"]["layers"][5]["config"]["credentials"] == "[REDACTED]"
def test_agent_node_run_maps_failed_agent_backend_run_to_node_result():
@ -145,6 +191,126 @@ def test_agent_node_run_maps_failed_agent_backend_run_to_node_result():
assert result.error_type == "unit_test"
def test_agent_node_failed_run_marks_session_cleaned_to_prevent_stale_reuse():
    """After a failed agent run the local ACTIVE session row must be retired,
    so that a workflow loop back into the same Agent node does not resume
    from a stale snapshot."""
    fake_store = FakeSessionStore(snapshot=CompositorSessionSnapshot(layers=[]))
    failing_node = _node(scenario=FakeAgentBackendScenario.FAILED, session_store=fake_store)

    emitted = list(failing_node._run())

    assert len(emitted) == 1
    assert fake_store.cleaned, "failed agent run should mark the session cleaned"
    retired_scope, retired_run_id = fake_store.cleaned[0]
    assert retired_scope.workflow_run_id == "workflow-run-1"
    assert retired_run_id == "fake-run-1"
    # No fresh snapshot is persisted for a failed run.
    assert fake_store.saved == []
def test_agent_node_saves_success_snapshot_and_reuses_existing_snapshot():
    """A successful run submits the previously stored snapshot to the backend
    and persists a new snapshot together with plugin-free cleanup specs."""
    prior_snapshot = CompositorSessionSnapshot(layers=[])
    fake_store = FakeSessionStore(snapshot=prior_snapshot)
    backend = FakeAgentBackendRunClient()

    emitted = list(_node(agent_backend_client=backend, session_store=fake_store)._run())

    assert len(emitted) == 1
    assert fake_store.saved
    saved_scope, saved_run_id, persisted_snapshot, persisted_specs = fake_store.saved[0]
    assert saved_scope.workflow_run_id == "workflow-run-1"
    assert saved_run_id == "fake-run-1"
    assert persisted_snapshot is not None
    assert backend.request is not None
    assert backend.request.session_snapshot is prior_snapshot
    # Enough composition shape must be persisted to replay a cleanup run;
    # plugin layers (which would carry credentials) are intentionally absent.
    persisted_names = [spec.name for spec in persisted_specs]
    assert persisted_names, "cleanup specs must persist at least the non-plugin layers"
    plugin_types = {"dify.plugin.llm", "dify.plugin.tools"}
    assert plugin_types.isdisjoint(spec.type for spec in persisted_specs)
def test_agent_node_run_when_session_store_save_raises_records_persist_error_in_metadata():
    """A failing snapshot save must not crash the node. The run still
    succeeds and ``session_snapshot_persist_error`` is recorded in the
    agent_backend metadata, which keeps the incident observable."""

    class _ExplodingSessionStore(FakeSessionStore):
        def save_active_snapshot(self, **kwargs):  # type: ignore[override]
            del kwargs
            raise RuntimeError("simulated DB failure")

    node = _node(session_store=_ExplodingSessionStore())

    emitted = list(node._run())

    assert len(emitted) == 1
    node_result = cast(StreamCompletedEvent, emitted[0]).node_run_result
    assert node_result.status == WorkflowNodeExecutionStatus.SUCCEEDED
    agent_log = node_result.metadata[WorkflowNodeExecutionMetadataKey.AGENT_LOG]
    backend_meta = agent_log["agent_backend"]
    assert backend_meta["session_snapshot_persisted"] is False
    assert backend_meta["session_snapshot_persist_error"] == "workflow_agent_runtime_session_store_error"
def test_agent_node_failed_run_when_mark_cleaned_raises_records_cleanup_error_in_metadata():
    """A failing ``mark_cleaned`` on the failure path must not crash the node.

    The node still reports FAILED, and the ``agent_backend`` metadata carries
    ``session_snapshot_cleaned_on_failure`` as False plus a
    ``session_snapshot_cleanup_error`` code.
    """

    class _ExplodingMarkCleanedStore(FakeSessionStore):
        def mark_cleaned(self, **_ignored):  # type: ignore[override]
            raise RuntimeError("simulated DB failure")

    failing_node = _node(
        scenario=FakeAgentBackendScenario.FAILED,
        session_store=_ExplodingMarkCleanedStore(),
    )
    emitted = list(failing_node._run())

    assert len(emitted) == 1
    completed = cast(StreamCompletedEvent, emitted[0])
    run_result = completed.node_run_result
    assert run_result.status == WorkflowNodeExecutionStatus.FAILED

    agent_log = run_result.metadata[WorkflowNodeExecutionMetadataKey.AGENT_LOG]
    backend_meta = agent_log["agent_backend"]
    assert backend_meta["session_snapshot_cleaned_on_failure"] is False
    assert backend_meta["session_snapshot_cleanup_error"] == "workflow_agent_runtime_session_store_error"
def test_agent_node_success_run_without_session_store_skips_persistence():
    """With ``session_store=None`` a successful run still succeeds and no
    persistence key is written into the ``agent_backend`` metadata."""
    emitted = list(_node(session_store=None)._run())
    assert len(emitted) == 1

    run_result = cast(StreamCompletedEvent, emitted[0]).node_run_result
    assert run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED

    backend_meta = run_result.metadata[WorkflowNodeExecutionMetadataKey.AGENT_LOG]["agent_backend"]
    # Without a store there is nothing to report about snapshot persistence.
    assert "session_snapshot_persisted" not in backend_meta
def test_agent_node_failed_run_without_session_store_skips_mark_cleaned():
    """With ``session_store=None`` a failed run still reports FAILED and no
    cleanup key is written into the ``agent_backend`` metadata."""
    failing_node = _node(scenario=FakeAgentBackendScenario.FAILED, session_store=None)
    emitted = list(failing_node._run())
    assert len(emitted) == 1

    run_result = cast(StreamCompletedEvent, emitted[0]).node_run_result
    assert run_result.status == WorkflowNodeExecutionStatus.FAILED

    backend_meta = run_result.metadata[WorkflowNodeExecutionMetadataKey.AGENT_LOG]["agent_backend"]
    assert "session_snapshot_cleaned_on_failure" not in backend_meta
def test_agent_node_paused_run_requests_workflow_pause_and_persists_snapshot():
    """A paused backend run yields a single pause request and still saves the
    session (backend run id plus non-empty layer specs)."""
    session_store = FakeSessionStore()
    paused_node = _node(scenario=FakeAgentBackendScenario.PAUSED, session_store=session_store)

    emitted = list(paused_node._run())

    assert len(emitted) == 1
    assert isinstance(emitted[0], PauseRequestedEvent)
    assert session_store.saved
    first_save = session_store.saved[0]
    # Index 1 holds the backend run id and index 3 the persisted layer specs.
    assert first_save[1] == "fake-run-1"
    assert first_save[3], "paused agent run should still persist replayable layer specs"
def test_agent_node_records_stream_usage_metadata():
metadata = {"agent_backend": {"run_id": "run-1"}}

View File

@ -1,10 +1,14 @@
from dataclasses import replace
from typing import cast
import pytest
from agenton.compositor import CompositorSessionSnapshot
from dify_agent.layers.dify_plugin import DifyPluginToolConfig, DifyPluginToolsLayerConfig
from dify_agent.protocol import DIFY_AGENT_HISTORY_LAYER_ID, DIFY_AGENT_MODEL_LAYER_ID
from clients.agent_backend import DIFY_EXECUTION_CONTEXT_LAYER_ID, DIFY_PLUGIN_TOOLS_LAYER_ID
from core.app.entities.app_invoke_entities import DifyRunContext, InvokeFrom, UserFrom
from core.workflow.nodes.agent_v2.plugin_tools_builder import WorkflowAgentPluginToolsBuilder
from core.workflow.nodes.agent_v2.runtime_request_builder import (
WorkflowAgentRuntimeBuildContext,
WorkflowAgentRuntimeRequestBuilder,
@ -27,6 +31,17 @@ class FakeCredentialsProvider:
return {"api_key": "secret-key"}
class CapturingCredentialsProvider:
    """Credentials provider stub that remembers the last lookup it served."""

    def __init__(self) -> None:
        # Both attributes stay None until fetch() has been called.
        self.provider_name: str | None = None
        self.model_name: str | None = None

    def fetch(self, provider_name: str, model_name: str) -> dict[str, object]:
        """Record the requested provider and model, then return fixed fake credentials."""
        self.provider_name, self.model_name = provider_name, model_name
        credentials: dict[str, object] = {"api_key": "secret-key"}
        return credentials
class FakePluginToolsBuilder:
def __init__(self) -> None:
# Capture the runtime invocation source so tests can assert it was
@ -136,7 +151,31 @@ def test_builds_create_run_request_from_agent_soul_and_node_job():
assert dumped["composition"]["layers"][1]["config"]["prefix"] == "Use the previous output."
assert "Previous result" in dumped["composition"]["layers"][2]["config"]["user"]
assert dumped["composition"]["layers"][-1]["config"]["json_schema"]["properties"]["summary"]["type"] == "string"
assert result.redacted_request["composition"]["layers"][4]["config"]["credentials"] == "[REDACTED]"
assert DIFY_AGENT_HISTORY_LAYER_ID in layers
assert result.redacted_request["composition"]["layers"][5]["config"]["credentials"] == "[REDACTED]"
def test_normalizes_langgenius_model_provider_for_agent_backend_transport():
    """The fully qualified provider id is used for the credentials lookup,
    while the model layer config sent to the backend carries the split
    ``plugin_id`` / ``model_provider`` pair."""
    full_provider_id = "langgenius/openai/openai"
    build_context = _context()
    build_context.snapshot.config_snapshot = AgentSoulConfig(
        prompt={"system_prompt": "You are careful."},
        model=AgentSoulModelConfig(
            plugin_id=full_provider_id,
            model_provider=full_provider_id,
            model="gpt-test",
        ),
    )
    capturing_provider = CapturingCredentialsProvider()
    request_builder = WorkflowAgentRuntimeRequestBuilder(credentials_provider=capturing_provider)

    built = request_builder.build(build_context)

    dumped = built.request.model_dump(mode="json")
    layers_by_name = {entry["name"]: entry for entry in dumped["composition"]["layers"]}
    model_layer_config = layers_by_name[DIFY_AGENT_MODEL_LAYER_ID]["config"]

    # The credentials lookup saw the un-normalized provider id...
    assert capturing_provider.provider_name == "langgenius/openai/openai"
    assert capturing_provider.model_name == "gpt-test"
    # ...whereas the transported model config is normalized.
    assert model_layer_config["plugin_id"] == "langgenius/openai"
    assert model_layer_config["model_provider"] == "openai"
def test_builds_workflow_run_request_with_file_output_schema_and_reserved_metadata():
@ -187,7 +226,7 @@ def test_builds_workflow_run_request_with_file_output_schema_and_reserved_metada
assert output_schema["properties"]["report"]["properties"]["file_id"]["type"] == "string"
assert output_schema["properties"]["confidence"]["type"] == "number"
assert output_schema["required"] == ["report"]
assert dumped["composition"]["layers"][4]["config"]["model_settings"] == {"temperature": 0.2}
assert dumped["composition"]["layers"][5]["config"]["model_settings"] == {"temperature": 0.2}
assert result.metadata["runtime_support"]["reserved_status"]["tools.dify_tools"] == "supported_when_config_valid"
assert result.metadata["runtime_support"]["reserved_status"]["tools.cli_tools"] == "reserved_not_executed"
warnings = result.metadata["runtime_support"]["unsupported_runtime_warnings"]
@ -224,7 +263,7 @@ def test_builds_workflow_run_request_with_dify_plugin_tools_layer():
plugin_tools_builder = FakePluginToolsBuilder()
result = WorkflowAgentRuntimeRequestBuilder(
credentials_provider=FakeCredentialsProvider(),
plugin_tools_builder=plugin_tools_builder,
plugin_tools_builder=cast(WorkflowAgentPluginToolsBuilder, plugin_tools_builder),
).build(context)
dumped = result.request.model_dump(mode="json")
@ -244,6 +283,15 @@ def test_builds_workflow_run_request_with_dify_plugin_tools_layer():
assert plugin_tools_builder.last_invoke_from == context.dify_context.invoke_from
def test_build_passes_saved_session_snapshot_to_agent_backend_request():
    """A snapshot placed on the build context ends up, by identity, on the built request."""
    saved_snapshot = CompositorSessionSnapshot(layers=[])
    build_context = replace(_context(), session_snapshot=saved_snapshot)
    request_builder = WorkflowAgentRuntimeRequestBuilder(credentials_provider=FakeCredentialsProvider())

    built = request_builder.build(build_context)

    assert built.request.session_snapshot is saved_snapshot
def test_requires_agent_soul_model_config():
context = _context()
snapshot = AgentConfigSnapshot(

View File

@ -0,0 +1,412 @@
from datetime import UTC
from typing import cast
import pytest
from agenton.compositor import CompositorSessionSnapshot
from agenton.compositor.schemas import LayerSessionSnapshot
from agenton.layers.base import LifecycleState
from dify_agent.protocol import CancelRunRequest, RunEvent, RunStatusResponse
from clients.agent_backend import AgentBackendRunRequestBuilder, CleanupLayerSpec, FakeAgentBackendRunClient
from clients.agent_backend.errors import AgentBackendHTTPError
from core.workflow.nodes.agent_v2.session_cleanup_layer import WorkflowAgentSessionCleanupLayer
from core.workflow.nodes.agent_v2.session_store import (
StoredWorkflowAgentSession,
WorkflowAgentRuntimeSessionStore,
WorkflowAgentSessionScope,
)
from core.workflow.system_variables import build_system_variables
from graphon.entities.pause_reason import SchedulingPause
from graphon.graph_engine.command_channels import CommandChannel
from graphon.graph_events import (
GraphRunAbortedEvent,
GraphRunFailedEvent,
GraphRunPartialSucceededEvent,
GraphRunPausedEvent,
GraphRunStartedEvent,
GraphRunSucceededEvent,
)
from graphon.runtime import GraphRuntimeState, ReadOnlyGraphRuntimeStateWrapper, VariablePool
def _layer_snapshot(name: str) -> LayerSessionSnapshot:
    """Build a SUSPENDED layer snapshot named ``name`` with empty runtime state."""
    suspended = LifecycleState.SUSPENDED
    return LayerSessionSnapshot(name=name, lifecycle_state=suspended, runtime_state={})
def _stored_session(scope: WorkflowAgentSessionScope, *, index: int = 1) -> StoredWorkflowAgentSession:
    """Build a stored session whose snapshot has one more layer than its specs.

    The snapshot holds prompt, execution-context, history and ``llm`` layers.
    The ``llm`` entry is deliberately missing from ``composition_layer_specs``
    (the cleanup contract leaves out credential-bearing plugin layers), which
    lets tests exercise the cleanup layer's snapshot filtering.

    ``index`` only affects the generated ``backend_run_id`` (``agent-run-<index>``).
    """
    snapshot_layer_names = ("workflow_node_job_prompt", "execution_context", "history", "llm")
    snapshot = CompositorSessionSnapshot(
        layers=[_layer_snapshot(layer_name) for layer_name in snapshot_layer_names],
    )
    replayable_specs = [
        CleanupLayerSpec(name="workflow_node_job_prompt", type="plain.prompt", config={"prefix": "ok"}),
        CleanupLayerSpec(name="execution_context", type="dify.execution_context", config={"tenant_id": "t"}),
        CleanupLayerSpec(name="history", type="pydantic_ai.history"),
    ]
    return StoredWorkflowAgentSession(
        scope=scope,
        session_snapshot=snapshot,
        backend_run_id=f"agent-run-{index}",
        composition_layer_specs=replayable_specs,
    )
class FakeSessionStore:
    """In-memory substitute for ``WorkflowAgentRuntimeSessionStore`` that records its calls."""

    def __init__(self, *, stored: list[StoredWorkflowAgentSession] | None = None) -> None:
        # An explicit (possibly empty) list is honoured as given; only ``None``
        # falls back to a single default session.
        if stored is None:
            stored = [_stored_session(_default_scope())]
        self._stored = stored
        # Every workflow_run_id passed to list_active_sessions(), in call order.
        self.list_calls: list[str] = []
        # Every (scope, backend_run_id) pair passed to mark_cleaned(), in call order.
        self.cleaned: list[tuple[WorkflowAgentSessionScope, str | None]] = []

    def list_active_sessions(self, *, workflow_run_id: str) -> list[StoredWorkflowAgentSession]:
        """Record the lookup and return a shallow copy of the configured sessions."""
        self.list_calls.append(workflow_run_id)
        return [*self._stored]

    def mark_cleaned(self, *, scope: WorkflowAgentSessionScope, backend_run_id: str | None = None) -> None:
        """Record the cleanup request without touching the stored sessions."""
        recorded = (scope, backend_run_id)
        self.cleaned.append(recorded)
def _default_scope() -> WorkflowAgentSessionScope:
    """Return the canonical session scope shared by the tests in this module."""
    scope_fields = {
        "tenant_id": "tenant-1",
        "app_id": "app-1",
        "workflow_id": "workflow-1",
        "workflow_run_id": "workflow-run-1",
        "node_id": "agent-node",
        "node_execution_id": "node-exec-1",
        "binding_id": "binding-1",
        "agent_id": "agent-1",
        "agent_config_snapshot_id": "snapshot-1",
    }
    return WorkflowAgentSessionScope(**scope_fields)
class _WaitableFakeAgentBackendRunClient(FakeAgentBackendRunClient):
    """``FakeAgentBackendRunClient`` extended with a scriptable ``wait_run``.

    ``wait_run`` either raises the configured exception or returns a status
    response built from the configured status and error. Every call is
    recorded in ``wait_calls`` as ``(run_id, timeout_seconds)``.
    """

    def __init__(
        self,
        *,
        run_id: str = "cleanup-run-1",
        wait_status: str = "succeeded",
        wait_error: str | None = None,
        wait_raises: Exception | None = None,
    ) -> None:
        super().__init__(run_id=run_id)
        self._wait_raises = wait_raises
        self._wait_status = wait_status
        self._wait_error = wait_error
        self.wait_calls: list[tuple[str, float | None]] = []

    def wait_run(self, run_id: str, *, timeout_seconds: float | None = None) -> RunStatusResponse:
        """Record the call, then raise or return the scripted terminal status."""
        self.wait_calls.append((run_id, timeout_seconds))
        scripted_failure = self._wait_raises
        if scripted_failure is not None:
            raise scripted_failure

        from datetime import datetime

        fixed_time = datetime(2026, 1, 1, tzinfo=UTC)
        return RunStatusResponse(
            run_id=run_id,
            status=cast(object, self._wait_status),  # protocol Literal; cast keeps tests flexible
            created_at=fixed_time,
            updated_at=fixed_time,
            error=self._wait_error,
        )

    # ``create_run`` comes from FakeAgentBackendRunClient. The two methods below
    # are stubs only, because the cleanup layer never calls them.
    def cancel_run(self, run_id: str, request: CancelRunRequest | None = None):  # pragma: no cover
        raise NotImplementedError

    def stream_events(self, run_id: str, *, after: str | None = None):  # pragma: no cover
        # Delegating to an empty iterable keeps this a generator that yields nothing.
        yield from ()
def _build_layer(
    *,
    session_store: FakeSessionStore,
    agent_backend_client: _WaitableFakeAgentBackendRunClient,
    http_cleanup_supported: bool = True,
) -> WorkflowAgentSessionCleanupLayer:
    """Create an initialized cleanup layer bound to workflow run ``workflow-run-1``.

    ``http_cleanup_supported`` overrides the layer's ``_HTTP_CLEANUP_SUPPORTED``
    flag on the instance. It defaults to True here so most tests exercise the
    HTTP cleanup branch.
    """
    system_variables = build_system_variables(workflow_execution_id="workflow-run-1")
    pool = VariablePool.from_bootstrap(
        system_variables=system_variables,
        user_inputs={},
        conversation_variables=[],
    )
    graph_state = GraphRuntimeState(variable_pool=pool, start_at=0.0)

    cleanup_layer = WorkflowAgentSessionCleanupLayer(
        session_store=cast(WorkflowAgentRuntimeSessionStore, session_store),
        request_builder=AgentBackendRunRequestBuilder(),
        agent_backend_client=agent_backend_client,
    )
    # Tests opt in to the future HTTP-cleanup branch; the production default
    # (False) is covered by dedicated tests that pass http_cleanup_supported=False.
    cleanup_layer._HTTP_CLEANUP_SUPPORTED = http_cleanup_supported  # type: ignore[reportPrivateUsage]

    # The layer never uses the command channel in these tests, so a bare object stands in.
    placeholder_channel = cast(CommandChannel, object())
    cleanup_layer.initialize(ReadOnlyGraphRuntimeStateWrapper(graph_state), placeholder_channel)
    return cleanup_layer
@pytest.mark.parametrize(
    "terminal_event",
    [
        pytest.param(GraphRunSucceededEvent(outputs={}), id="succeeded"),
        pytest.param(GraphRunPartialSucceededEvent(exceptions_count=1, outputs={}), id="partial_succeeded"),
        pytest.param(GraphRunFailedEvent(error="boom"), id="failed"),
        pytest.param(GraphRunAbortedEvent(reason="user cancelled", outputs={}), id="aborted"),
    ],
)
def test_cleanup_layer_triggers_cleanup_only_run_on_each_terminal_event(terminal_event):
    """Every terminal graph event triggers one cleanup-only backend run.

    The request replays the persisted non-plugin layer specs, carries a
    snapshot filtered to the same layer names, and the stored row is marked
    cleaned with the cleanup run id once the run has been waited on.
    """
    replayed_names = ["workflow_node_job_prompt", "execution_context", "history"]
    store = FakeSessionStore()
    backend = _WaitableFakeAgentBackendRunClient()
    cleanup_layer = _build_layer(session_store=store, agent_backend_client=backend)

    cleanup_layer.on_event(terminal_event)

    assert store.list_calls == ["workflow-run-1"]
    assert backend.request is not None

    # The cleanup composition replays the persisted (non-plugin) layer specs so
    # that the backend's snapshot-vs-composition name match succeeds.
    composition_names = [spec.name for spec in backend.request.composition.layers]
    assert composition_names == replayed_names
    assert backend.request.on_exit.default.value == "delete"
    assert backend.request.metadata["agent_backend_lifecycle"] == "session_cleanup"

    # The snapshot drops the plugin layer entry so its names line up with the
    # cleanup composition.
    assert backend.request.session_snapshot is not None
    snapshot_names = [entry.name for entry in backend.request.session_snapshot.layers]
    assert snapshot_names == replayed_names

    # The layer waited for a terminal status and the run succeeded, so the row
    # is marked cleaned with the cleanup run id.
    assert backend.wait_calls
    assert store.cleaned == [(_default_scope(), "cleanup-run-1")]
@pytest.mark.parametrize(
    "non_terminal_event",
    [
        pytest.param(GraphRunStartedEvent(), id="started"),
        pytest.param(
            GraphRunPausedEvent(reasons=[SchedulingPause(message="awaiting human input")], outputs={}),
            id="paused",
        ),
    ],
)
def test_cleanup_layer_ignores_non_terminal_events(non_terminal_event):
    """Started and paused events cause no store lookup, no backend request and no cleanup."""
    store = FakeSessionStore()
    backend = _WaitableFakeAgentBackendRunClient()
    cleanup_layer = _build_layer(session_store=store, agent_backend_client=backend)

    cleanup_layer.on_event(non_terminal_event)

    assert store.list_calls == []
    assert backend.request is None
    assert store.cleaned == []
def test_cleanup_layer_does_not_mark_cleaned_when_cleanup_run_fails():
    """Trap D: when the cleanup-only run ends as ``failed`` (for example a
    snapshot validation error), the layer must not mark the row cleaned, so
    the cleanup can be retried rather than silently leaking suspended
    agent-backend layers."""
    store = FakeSessionStore()
    failing_backend = _WaitableFakeAgentBackendRunClient(
        wait_status="failed",
        wait_error="snapshot mismatch",
    )
    cleanup_layer = _build_layer(session_store=store, agent_backend_client=failing_backend)

    cleanup_layer.on_event(GraphRunSucceededEvent(outputs={}))

    assert failing_backend.wait_calls
    assert store.cleaned == []
def test_cleanup_layer_does_not_mark_cleaned_when_wait_raises():
    """An HTTP error raised while waiting on the cleanup run leaves the row un-cleaned."""
    store = FakeSessionStore()
    http_failure = AgentBackendHTTPError("boom", status_code=500, detail=None)
    raising_backend = _WaitableFakeAgentBackendRunClient(wait_raises=http_failure)
    cleanup_layer = _build_layer(session_store=store, agent_backend_client=raising_backend)

    cleanup_layer.on_event(GraphRunSucceededEvent(outputs={}))

    assert store.cleaned == []
def test_cleanup_layer_marks_cleaned_locally_when_http_cleanup_disabled():
    """Production default: dify-agent has no cleanup-only run mode yet, so the
    layer retires the local row without sending an HTTP request that would
    crash inside the agent backend's runner on the missing LLM layer."""
    store = FakeSessionStore()
    backend = _WaitableFakeAgentBackendRunClient()
    cleanup_layer = _build_layer(
        session_store=store,
        agent_backend_client=backend,
        http_cleanup_supported=False,
    )

    cleanup_layer.on_event(GraphRunSucceededEvent(outputs={}))

    # Nothing was sent to the backend, so the trap is avoided entirely.
    assert backend.request is None
    assert backend.wait_calls == []
    # The row is still retired locally (with the stored run id) so a workflow
    # loop cannot resume from stale state.
    assert store.cleaned == [(_default_scope(), "agent-run-1")]
def test_cleanup_layer_skips_sessions_without_persisted_specs():
    """Backwards-compatible safety net: a row written before A.1 landed has no
    ``composition_layer_specs``, so a cleanup request would unavoidably hit
    the snapshot-validation trap. The layer must skip such a row rather than
    send a doomed request, and must not mark it cleaned."""
    history_only_snapshot = CompositorSessionSnapshot(layers=[_layer_snapshot("history")])
    legacy_session = StoredWorkflowAgentSession(
        scope=_default_scope(),
        session_snapshot=history_only_snapshot,
        backend_run_id="legacy-run",
        composition_layer_specs=[],
    )
    store = FakeSessionStore(stored=[legacy_session])
    backend = _WaitableFakeAgentBackendRunClient()
    cleanup_layer = _build_layer(session_store=store, agent_backend_client=backend)

    cleanup_layer.on_event(GraphRunSucceededEvent(outputs={}))

    assert backend.request is None
    assert store.cleaned == []
def test_cleanup_layer_fans_out_to_every_active_session():
    """With three active sessions, each one is cleaned, in order, under the
    run id reported by the backend client."""

    def scope_for(position: int) -> WorkflowAgentSessionScope:
        return WorkflowAgentSessionScope(
            tenant_id="tenant-1",
            app_id="app-1",
            workflow_id="workflow-1",
            workflow_run_id="workflow-run-1",
            node_id=f"agent-node-{position}",
            node_execution_id=f"node-exec-{position}",
            binding_id=f"binding-{position}",
            agent_id=f"agent-{position}",
            agent_config_snapshot_id=f"snapshot-{position}",
        )

    scopes = [scope_for(position) for position in range(3)]
    # Session indices are 1-based while the scope suffixes are 0-based.
    sessions = [_stored_session(scope, index=number) for number, scope in enumerate(scopes, 1)]
    store = FakeSessionStore(stored=sessions)
    backend = _WaitableFakeAgentBackendRunClient(run_id="cleanup-run-many")
    cleanup_layer = _build_layer(session_store=store, agent_backend_client=backend)

    cleanup_layer.on_event(GraphRunSucceededEvent(outputs={}))

    # One cleaned entry per stored session, all under the backend's run id.
    cleaned_scopes = [entry[0] for entry in store.cleaned]
    cleaned_run_ids = {entry[1] for entry in store.cleaned}
    assert cleaned_scopes == scopes
    assert cleaned_run_ids == {"cleanup-run-many"}
def test_cleanup_layer_warns_when_http_enabled_but_client_missing(caplog):
    """The HTTP cleanup branch must skip defensively when no client was wired.

    This is the deployment-misconfiguration path: ``_HTTP_CLEANUP_SUPPORTED``
    was flipped to ``True`` but ``AGENT_BACKEND_BASE_URL`` is unset, so the
    factory returned ``None``. The layer must neither crash nor silently
    retire the row; a warning surfaces the misconfiguration instead.
    """
    import logging

    store = FakeSessionStore()
    cleanup_layer = WorkflowAgentSessionCleanupLayer(
        session_store=cast(WorkflowAgentRuntimeSessionStore, store),
        request_builder=AgentBackendRunRequestBuilder(),
        agent_backend_client=None,
    )
    cleanup_layer._HTTP_CLEANUP_SUPPORTED = True  # type: ignore[reportPrivateUsage]

    pool = VariablePool.from_bootstrap(
        system_variables=build_system_variables(workflow_execution_id="workflow-run-1"),
        user_inputs={},
        conversation_variables=[],
    )
    graph_state = GraphRuntimeState(variable_pool=pool, start_at=0.0)
    cleanup_layer.initialize(ReadOnlyGraphRuntimeStateWrapper(graph_state), cast(CommandChannel, object()))

    with caplog.at_level(logging.WARNING):
        cleanup_layer.on_event(GraphRunSucceededEvent(outputs={}))

    assert store.cleaned == []
    expected_fragment = "no agent backend client is wired in"
    assert any(expected_fragment in record.message for record in caplog.records)
def test_cleanup_layer_skips_workflow_terminal_when_workflow_run_id_missing(caplog):
    """``workflow_run_id`` is the keying field: without it the fan-out cannot
    target a row, so the layer logs a warning and does nothing else."""
    import logging

    store = FakeSessionStore()
    backend = _WaitableFakeAgentBackendRunClient()
    cleanup_layer = WorkflowAgentSessionCleanupLayer(
        session_store=cast(WorkflowAgentRuntimeSessionStore, store),
        request_builder=AgentBackendRunRequestBuilder(),
        agent_backend_client=backend,
    )

    # Bootstrap with an empty workflow_execution_id system variable.
    pool = VariablePool.from_bootstrap(
        system_variables=build_system_variables(workflow_execution_id=""),
        user_inputs={},
        conversation_variables=[],
    )
    graph_state = GraphRuntimeState(variable_pool=pool, start_at=0.0)
    cleanup_layer.initialize(ReadOnlyGraphRuntimeStateWrapper(graph_state), cast(CommandChannel, object()))

    with caplog.at_level(logging.WARNING):
        cleanup_layer.on_event(GraphRunSucceededEvent(outputs={}))

    assert store.list_calls == []
    assert store.cleaned == []
    expected_fragment = "workflow_run_id is missing"
    assert any(expected_fragment in record.message for record in caplog.records)
def test_build_workflow_agent_session_cleanup_layer_returns_layer_without_client_when_unconfigured(
    monkeypatch,
):
    """With neither ``AGENT_BACKEND_BASE_URL`` nor ``AGENT_BACKEND_USE_FAKE``
    set, the production builder must hand the layer ``None`` as its agent
    backend client, so environments without backend config do not crash at
    runner construction."""
    from configs import dify_config
    from core.workflow.nodes.agent_v2.session_cleanup_layer import (
        build_workflow_agent_session_cleanup_layer,
    )

    unconfigured = (("AGENT_BACKEND_BASE_URL", None), ("AGENT_BACKEND_USE_FAKE", False))
    for setting_name, setting_value in unconfigured:
        monkeypatch.setattr(dify_config, setting_name, setting_value, raising=False)

    built_layer = build_workflow_agent_session_cleanup_layer()

    assert built_layer._agent_backend_client is None  # type: ignore[reportPrivateUsage]
def test_build_workflow_agent_session_cleanup_layer_returns_layer_with_fake_client(monkeypatch):
    """With ``AGENT_BACKEND_USE_FAKE`` enabled and no base URL, the builder
    wires the deterministic fake client into the layer."""
    from clients.agent_backend.fake_client import FakeAgentBackendRunClient
    from configs import dify_config
    from core.workflow.nodes.agent_v2.session_cleanup_layer import (
        build_workflow_agent_session_cleanup_layer,
    )

    fake_mode = (
        ("AGENT_BACKEND_BASE_URL", None),
        ("AGENT_BACKEND_USE_FAKE", True),
        ("AGENT_BACKEND_FAKE_SCENARIO", "success"),
    )
    for setting_name, setting_value in fake_mode:
        monkeypatch.setattr(dify_config, setting_name, setting_value, raising=False)

    built_layer = build_workflow_agent_session_cleanup_layer()

    assert isinstance(built_layer._agent_backend_client, FakeAgentBackendRunClient)  # type: ignore[reportPrivateUsage]

View File

@ -0,0 +1,286 @@
"""Unit tests for :mod:`core.workflow.nodes.agent_v2.session_store`.

Uses the in-memory SQLite engine configured by the project conftest plus a
per-test ``CREATE TABLE`` so the real ORM round-trip exercises every store
method. Keeps the suite self-contained (no Postgres / Docker required) while
still hitting the actual ``session_factory`` code path that production uses.
"""
from __future__ import annotations
from collections.abc import Generator
import pytest
from agenton.compositor import CompositorSessionSnapshot
from agenton.compositor.schemas import LayerSessionSnapshot
from agenton.layers.base import LifecycleState
from sqlalchemy import delete
from clients.agent_backend.request_builder import CleanupLayerSpec
from core.db.session_factory import session_factory
from core.workflow.nodes.agent_v2.session_store import (
StoredWorkflowAgentSession,
WorkflowAgentRuntimeSessionStore,
WorkflowAgentSessionScope,
)
from models.agent import WorkflowAgentRuntimeSession, WorkflowAgentRuntimeSessionStatus
def _scope(workflow_run_id: str | None = "wfr-1", binding_id: str = "binding-1") -> WorkflowAgentSessionScope:
    """Build a session scope in which only the run id and binding id vary."""
    fixed_fields = {
        "tenant_id": "tenant-1",
        "app_id": "app-1",
        "workflow_id": "workflow-1",
        "node_id": "agent-node",
        "node_execution_id": "node-exec-1",
        "agent_id": "agent-1",
        "agent_config_snapshot_id": "snapshot-1",
    }
    return WorkflowAgentSessionScope(
        workflow_run_id=workflow_run_id,
        binding_id=binding_id,
        **fixed_fields,
    )
def _snapshot(messages: int = 1) -> CompositorSessionSnapshot:
    """Build a snapshot with one suspended ``history`` layer holding
    ``messages`` user messages (``m0``, ``m1``, ...)."""
    history_messages = [{"role": "user", "content": f"m{i}"} for i in range(messages)]
    history_layer = LayerSessionSnapshot(
        name="history",
        lifecycle_state=LifecycleState.SUSPENDED,
        runtime_state={"messages": history_messages},
    )
    return CompositorSessionSnapshot(layers=[history_layer])
def _specs() -> list[CleanupLayerSpec]:
    """Return a fresh two-entry spec list: a prompt layer followed by a history layer."""
    prompt_spec = CleanupLayerSpec(name="workflow_node_job_prompt", type="plain.prompt", config={"prefix": "ok"})
    history_spec = CleanupLayerSpec(name="history", type="pydantic_ai.history")
    return [prompt_spec, history_spec]
@pytest.fixture(autouse=True)
def _create_table() -> Generator[None, None, None]:
    """Create the lifecycle table before each test; empty and drop it afterwards.

    The table is created on whatever engine the session maker is bound to.
    """
    bound_engine = session_factory.get_session_maker().kw["bind"]
    lifecycle_table = WorkflowAgentRuntimeSession.__table__
    lifecycle_table.create(bind=bound_engine, checkfirst=True)

    yield

    # Teardown: delete the rows first, then drop the table itself.
    with session_factory.create_session() as db_session:
        db_session.execute(delete(WorkflowAgentRuntimeSession))
        db_session.commit()
    lifecycle_table.drop(bind=bound_engine, checkfirst=True)
def test_load_active_snapshot_returns_none_when_scope_has_no_workflow_run_id():
    """``workflow_run_id`` is the keying column, so a scope without one loads nothing."""
    unkeyed_scope = _scope(workflow_run_id=None)
    loaded = WorkflowAgentRuntimeSessionStore().load_active_snapshot(unkeyed_scope)
    assert loaded is None
def test_load_active_snapshot_returns_none_when_no_row_matches():
    """Loading from an empty table yields ``None``."""
    loaded = WorkflowAgentRuntimeSessionStore().load_active_snapshot(_scope())
    assert loaded is None
def test_save_active_snapshot_creates_row_and_load_round_trips():
    """A saved snapshot loads back with the same layer name and messages."""
    session_store = WorkflowAgentRuntimeSessionStore()
    original = _snapshot(messages=2)

    session_store.save_active_snapshot(
        scope=_scope(),
        backend_run_id="run-1",
        snapshot=original,
        composition_layer_specs=_specs(),
    )
    restored = session_store.load_active_snapshot(_scope())

    assert restored is not None
    assert len(restored.layers) == 1
    restored_layer = restored.layers[0]
    assert restored_layer.name == "history"
    assert restored_layer.runtime_state["messages"] == original.layers[0].runtime_state["messages"]
def test_save_active_snapshot_skips_when_workflow_run_id_missing():
    """Saving under a scope without ``workflow_run_id`` writes no row."""
    session_store = WorkflowAgentRuntimeSessionStore()
    unkeyed_scope = _scope(workflow_run_id=None)

    session_store.save_active_snapshot(
        scope=unkeyed_scope,
        backend_run_id="run-skipped",
        snapshot=_snapshot(),
        composition_layer_specs=_specs(),
    )

    with session_factory.create_session() as db_session:
        row_count = db_session.query(WorkflowAgentRuntimeSession).count()
        assert row_count == 0
def test_save_active_snapshot_skips_when_snapshot_missing():
    """Saving with ``snapshot=None`` (e.g. after a failed agent run) writes no row."""
    session_store = WorkflowAgentRuntimeSessionStore()

    session_store.save_active_snapshot(
        scope=_scope(),
        backend_run_id="run-empty",
        snapshot=None,
        composition_layer_specs=_specs(),
    )

    with session_factory.create_session() as db_session:
        row_count = db_session.query(WorkflowAgentRuntimeSession).count()
        assert row_count == 0
def test_save_active_snapshot_updates_existing_row_on_re_entry():
    """Saving twice under one scope leaves a single ACTIVE row holding the second run id."""
    session_store = WorkflowAgentRuntimeSessionStore()

    # First save, then a second one with a new snapshot and backend run id.
    for run_id, message_count in (("run-1", 1), ("run-2", 2)):
        session_store.save_active_snapshot(
            scope=_scope(),
            backend_run_id=run_id,
            snapshot=_snapshot(messages=message_count),
            composition_layer_specs=_specs(),
        )

    with session_factory.create_session() as db_session:
        persisted = db_session.query(WorkflowAgentRuntimeSession).all()
        assert len(persisted) == 1
        only_row = persisted[0]
        assert only_row.backend_run_id == "run-2"
        assert only_row.status == WorkflowAgentRuntimeSessionStatus.ACTIVE
        assert only_row.cleaned_at is None
def test_save_active_snapshot_resurrects_cleaned_row():
    """Saving again after ``mark_cleaned`` flips the same row back to ACTIVE."""
    session_store = WorkflowAgentRuntimeSessionStore()
    scope = _scope()

    session_store.save_active_snapshot(
        scope=scope,
        backend_run_id="run-1",
        snapshot=_snapshot(),
        composition_layer_specs=_specs(),
    )
    session_store.mark_cleaned(scope=scope, backend_run_id="cleanup-1")
    # The row is CLEANED at this point; the next save should revive it.
    session_store.save_active_snapshot(
        scope=scope,
        backend_run_id="run-2",
        snapshot=_snapshot(messages=3),
        composition_layer_specs=_specs(),
    )

    with session_factory.create_session() as db_session:
        persisted = db_session.query(WorkflowAgentRuntimeSession).all()
        assert len(persisted) == 1
        revived = persisted[0]
        assert revived.status == WorkflowAgentRuntimeSessionStatus.ACTIVE
        assert revived.cleaned_at is None
        assert revived.backend_run_id == "run-2"
def test_list_active_sessions_returns_specs_and_snapshot():
    """Two sessions saved under different bindings of one workflow run are
    both listed, with their layer specs and scope deserialized."""
    session_store = WorkflowAgentRuntimeSessionStore()
    saves = (
        ("binding-A", "run-A", _snapshot()),
        ("binding-B", "run-B", _snapshot(messages=2)),
    )
    for binding, run_id, snapshot in saves:
        session_store.save_active_snapshot(
            scope=_scope(binding_id=binding),
            backend_run_id=run_id,
            snapshot=snapshot,
            composition_layer_specs=_specs(),
        )

    listed = session_store.list_active_sessions(workflow_run_id="wfr-1")

    assert {entry.backend_run_id for entry in listed} == {"run-A", "run-B"}
    sessions_by_run = {entry.backend_run_id: entry for entry in listed}
    session_a = sessions_by_run["run-A"]
    assert isinstance(session_a, StoredWorkflowAgentSession)
    # The specs come back as objects with the saved name and type.
    assert session_a.composition_layer_specs[0].name == "workflow_node_job_prompt"
    assert session_a.composition_layer_specs[1].type == "pydantic_ai.history"
    # The saved node_execution_id is returned unchanged.
    assert session_a.scope.node_execution_id == "node-exec-1"
def test_list_active_sessions_skips_cleaned_rows():
    """A session that has been marked cleaned no longer shows up in the active listing."""
    session_store = WorkflowAgentRuntimeSessionStore()
    for binding, run_id in (("binding-A", "run-A"), ("binding-B", "run-B")):
        session_store.save_active_snapshot(
            scope=_scope(binding_id=binding),
            backend_run_id=run_id,
            snapshot=_snapshot(),
            composition_layer_specs=_specs(),
        )

    session_store.mark_cleaned(scope=_scope(binding_id="binding-A"), backend_run_id="cleanup-A")

    still_active = session_store.list_active_sessions(workflow_run_id="wfr-1")
    assert {entry.backend_run_id for entry in still_active} == {"run-B"}
def test_list_active_sessions_handles_legacy_rows_without_specs():
    """A row saved with no layer specs lists back with an empty spec list.

    Saving with ``composition_layer_specs=[]`` stands in for a row written
    before spec persistence landed in A.1.
    """
    session_store = WorkflowAgentRuntimeSessionStore()
    session_store.save_active_snapshot(
        scope=_scope(),
        backend_run_id="run-legacy",
        snapshot=_snapshot(),
        composition_layer_specs=[],
    )

    listed = session_store.list_active_sessions(workflow_run_id="wfr-1")

    assert len(listed) == 1
    legacy_entry = listed[0]
    assert legacy_entry.composition_layer_specs == []
def test_mark_cleaned_sets_status_and_cleaned_at_with_backend_run_id():
    """``mark_cleaned`` sets CLEANED, stamps ``cleaned_at`` and stores the given run id."""
    session_store = WorkflowAgentRuntimeSessionStore()
    scope = _scope()
    session_store.save_active_snapshot(
        scope=scope,
        backend_run_id="run-1",
        snapshot=_snapshot(),
        composition_layer_specs=_specs(),
    )

    session_store.mark_cleaned(scope=scope, backend_run_id="cleanup-1")

    with session_factory.create_session() as db_session:
        cleaned_row = db_session.query(WorkflowAgentRuntimeSession).one()
        assert cleaned_row.status == WorkflowAgentRuntimeSessionStatus.CLEANED
        assert cleaned_row.cleaned_at is not None
        assert cleaned_row.backend_run_id == "cleanup-1"
def test_mark_cleaned_preserves_existing_backend_run_id_when_none_given():
    """Passing ``backend_run_id=None`` keeps the previously stored run id."""
    session_store = WorkflowAgentRuntimeSessionStore()
    scope = _scope()
    session_store.save_active_snapshot(
        scope=scope,
        backend_run_id="run-1",
        snapshot=_snapshot(),
        composition_layer_specs=_specs(),
    )

    session_store.mark_cleaned(scope=scope, backend_run_id=None)

    with session_factory.create_session() as db_session:
        cleaned_row = db_session.query(WorkflowAgentRuntimeSession).one()
        assert cleaned_row.status == WorkflowAgentRuntimeSessionStatus.CLEANED
        assert cleaned_row.backend_run_id == "run-1"
def test_mark_cleaned_is_a_noop_when_no_active_row():
    """Calling ``mark_cleaned`` on an empty table creates no row."""
    session_store = WorkflowAgentRuntimeSessionStore()

    session_store.mark_cleaned(scope=_scope(), backend_run_id="cleanup-1")

    with session_factory.create_session() as db_session:
        row_count = db_session.query(WorkflowAgentRuntimeSession).count()
        assert row_count == 0
def test_mark_cleaned_is_a_noop_when_workflow_run_id_missing():
    """A scope without ``workflow_run_id`` cannot key a row, so the call is ignored."""
    session_store = WorkflowAgentRuntimeSessionStore()
    unkeyed_scope = _scope(workflow_run_id=None)

    session_store.mark_cleaned(scope=unkeyed_scope, backend_run_id="cleanup-1")

    # Sanity check: nothing was created or touched.
    with session_factory.create_session() as db_session:
        row_count = db_session.query(WorkflowAgentRuntimeSession).count()
        assert row_count == 0