From 9cdeffd0b1ac755793cc566db98462cf49521315 Mon Sep 17 00:00:00 2001 From: zyssyz123 <916125788@qq.com> Date: Wed, 27 May 2026 23:00:21 +0800 Subject: [PATCH] feat(api): agent backend session lifecycle for workflow agent nodes (#36724) Co-authored-by: Claude Opus 4.7 (1M context) --- api/clients/agent_backend/__init__.py | 4 + api/clients/agent_backend/fake_client.py | 24 + api/clients/agent_backend/request_builder.py | 144 +++++- api/core/app/apps/advanced_chat/app_runner.py | 2 + api/core/app/apps/workflow/app_runner.py | 2 + api/core/workflow/node_factory.py | 2 + .../workflow/nodes/agent_v2/agent_node.py | 135 +++++- .../nodes/agent_v2/runtime_request_builder.py | 27 +- .../nodes/agent_v2/session_cleanup_layer.py | 247 +++++++++++ .../workflow/nodes/agent_v2/session_store.py | 179 ++++++++ ...9a9_add_workflow_agent_runtime_sessions.py | 90 ++++ api/models/__init__.py | 4 + api/models/agent.py | 62 +++ ...anup_composition_compositor_integration.py | 134 ++++++ .../clients/agent_backend/test_fake_client.py | 22 + .../agent_backend/test_request_builder.py | 107 ++++- .../nodes/agent_v2/test_agent_node.py | 174 +++++++- .../agent_v2/test_runtime_request_builder.py | 54 ++- .../agent_v2/test_session_cleanup_layer.py | 412 ++++++++++++++++++ .../nodes/agent_v2/test_session_store.py | 286 ++++++++++++ 20 files changed, 2086 insertions(+), 25 deletions(-) create mode 100644 api/core/workflow/nodes/agent_v2/session_cleanup_layer.py create mode 100644 api/core/workflow/nodes/agent_v2/session_store.py create mode 100644 api/migrations/versions/2026_05_27_0953-7885bd53f9a9_add_workflow_agent_runtime_sessions.py create mode 100644 api/tests/unit_tests/clients/agent_backend/test_cleanup_composition_compositor_integration.py create mode 100644 api/tests/unit_tests/core/workflow/nodes/agent_v2/test_session_cleanup_layer.py create mode 100644 api/tests/unit_tests/core/workflow/nodes/agent_v2/test_session_store.py diff --git a/api/clients/agent_backend/__init__.py b/api/clients/agent_backend/__init__.py index 4d459d34a0..937dcea58d 100644 --- a/api/clients/agent_backend/__init__.py +++ b/api/clients/agent_backend/__init__.py @@ -38,6 +38,8 @@ from clients.agent_backend.request_builder import ( AgentBackendOutputConfig, AgentBackendRunRequestBuilder, AgentBackendWorkflowNodeRunInput, + CleanupLayerSpec, + extract_cleanup_layer_specs, redact_for_agent_backend_log, ) @@ -68,9 +70,11 @@ __all__ = [ "AgentBackendTransportError", "AgentBackendValidationError", "AgentBackendWorkflowNodeRunInput", + "CleanupLayerSpec", "DifyAgentBackendRunClient", "FakeAgentBackendRunClient", "FakeAgentBackendScenario", "create_agent_backend_run_client", + "extract_cleanup_layer_specs", "redact_for_agent_backend_log", ] diff --git a/api/clients/agent_backend/fake_client.py b/api/clients/agent_backend/fake_client.py index 6414ddc7b5..a768777039 100644 --- a/api/clients/agent_backend/fake_client.py +++ b/api/clients/agent_backend/fake_client.py @@ -20,6 +20,8 @@ from dify_agent.protocol import ( RunEvent, RunFailedEvent, RunFailedEventData, + RunPausedEvent, + RunPausedEventData, RunStartedEvent, RunStatusResponse, RunSucceededEvent, @@ -34,6 +36,7 @@ class FakeAgentBackendScenario(StrEnum): SUCCESS = "success" FAILED = "failed" + PAUSED = "paused" class FakeAgentBackendRunClient: @@ -89,6 +92,13 @@ class FakeAgentBackendRunClient: updated_at=_FIXED_TIME, error="fake failure", ) + case FakeAgentBackendScenario.PAUSED: + return RunStatusResponse( + run_id=run_id, + status="paused", + created_at=_FIXED_TIME, + updated_at=_FIXED_TIME, + ) def _events(self, run_id: str) -> tuple[RunEvent, ...]: match self.scenario: @@ -115,3 +125,17 @@ class FakeAgentBackendRunClient: data=RunFailedEventData(error="fake failure", reason="unit_test"), ), ) + case FakeAgentBackendScenario.PAUSED: + return ( + RunStartedEvent(id="1-0", run_id=run_id, created_at=_FIXED_TIME), + RunPausedEvent( + id="2-0", + run_id=run_id, + created_at=_FIXED_TIME, + data=RunPausedEventData( + reason="human_input_required", + message="Agent requested human input.", + session_snapshot=CompositorSessionSnapshot(layers=[]), + ), + ), + ) diff --git a/api/clients/agent_backend/request_builder.py b/api/clients/agent_backend/request_builder.py index 74114469dd..e7c7e6f1ac 100644 --- a/api/clients/agent_backend/request_builder.py +++ b/api/clients/agent_backend/request_builder.py @@ -11,11 +11,13 @@ composition-driven. from __future__ import annotations -from typing import ClassVar +from typing import ClassVar, cast from agenton.compositor import CompositorSessionSnapshot +from agenton.compositor.schemas import LayerSessionSnapshot from agenton.layers import ExitIntent from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID, PromptLayerConfig +from agenton_collections.layers.pydantic_ai import PYDANTIC_AI_HISTORY_LAYER_TYPE_ID from dify_agent.layers.dify_plugin import ( DIFY_PLUGIN_LLM_LAYER_TYPE_ID, DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID, @@ -29,6 +31,7 @@ from dify_agent.layers.execution_context import ( ) from dify_agent.layers.output import DIFY_OUTPUT_LAYER_TYPE_ID, DifyOutputLayerConfig from dify_agent.protocol import ( + DIFY_AGENT_HISTORY_LAYER_ID, DIFY_AGENT_MODEL_LAYER_ID, DIFY_AGENT_OUTPUT_LAYER_ID, CreateRunRequest, @@ -45,6 +48,84 @@ WORKFLOW_USER_PROMPT_LAYER_ID = "workflow_user_prompt" DIFY_EXECUTION_CONTEXT_LAYER_ID = "execution_context" DIFY_PLUGIN_TOOLS_LAYER_ID = "tools" +# Layer types that hold credentials in their per-run config. These are excluded +# from the cleanup-replay composition (and from the snapshot that is sent with +# the cleanup request) because we deliberately do not persist plaintext +# credentials between runs. +_CLEANUP_EXCLUDED_LAYER_TYPES: tuple[str, ...] = ( + DIFY_PLUGIN_LLM_LAYER_TYPE_ID, + DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID, +) + + +class CleanupLayerSpec(BaseModel): + """One layer node replayed by an Agent backend cleanup-only run. + + Cleanup composition cannot include credential-bearing plugin layers, so we + persist only the non-plugin layer specs together with the original config. + Storing the config (rather than just ``name``/``type``) means cleanup does + not depend on the original build-time inputs being re-derivable. + """ + + name: str + type: str + deps: dict[str, str] = Field(default_factory=dict) + metadata: dict[str, JsonValue] = Field(default_factory=dict) + config: JsonValue = None + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + +def extract_cleanup_layer_specs(composition: RunComposition) -> list[CleanupLayerSpec]: + """Project the in-flight composition into the persistable cleanup spec list. + + Plugin layers are intentionally dropped (their configs hold credentials and + the lifecycle contract says "do not include an LLM layer" during cleanup). + The filtered names must later drive snapshot filtering so the agenton + compositor's name-order check still passes for the cleanup run. + """ + excluded = set(_CLEANUP_EXCLUDED_LAYER_TYPES) + specs: list[CleanupLayerSpec] = [] + for layer in composition.layers: + if layer.type in excluded: + continue + config_value: JsonValue = None + if isinstance(layer.config, BaseModel): + config_value = layer.config.model_dump(mode="json", warnings=False) + else: + # ``RunLayerSpec.config`` is typed as ``LayerConfigInput`` which + # includes ``Mapping[str, object] | bytes``. In the cleanup-replay + # pipeline our builder only emits BaseModel-derived configs or + # ``None``, so the wider input alias narrows safely here. + config_value = cast(JsonValue, layer.config) + specs.append( + CleanupLayerSpec( + name=layer.name, + type=layer.type, + deps=dict(layer.deps), + metadata=dict(layer.metadata), + config=config_value, + ) + ) + return specs + + +def _filter_snapshot_to_specs( + snapshot: CompositorSessionSnapshot, + specs: list[CleanupLayerSpec], +) -> CompositorSessionSnapshot: + """Keep only snapshot layers whose names appear in the cleanup spec list. + + The agenton compositor rejects a snapshot whose layer-name sequence does + not match the active composition exactly. Cleanup-replay drops plugin + layers, so we must drop the matching snapshot entries here. + """ + kept_names = {spec.name for spec in specs} + filtered_layers: list[LayerSessionSnapshot] = [layer for layer in snapshot.layers if layer.name in kept_names] + if len(filtered_layers) == len(snapshot.layers): + return snapshot + return CompositorSessionSnapshot(schema_version=snapshot.schema_version, layers=filtered_layers) + class AgentBackendModelConfig(BaseModel): """API-side model/plugin selection before it is converted to Dify Agent layers.""" @@ -86,7 +167,8 @@ class AgentBackendWorkflowNodeRunInput(BaseModel): output: AgentBackendOutputConfig | None = None tools: DifyPluginToolsLayerConfig | None = None session_snapshot: CompositorSessionSnapshot | None = None - suspend_on_exit: bool = False + include_history: bool = True + suspend_on_exit: bool = True metadata: dict[str, JsonValue] = Field(default_factory=dict) model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid", arbitrary_types_allowed=True) @@ -102,6 +184,50 @@ class AgentBackendWorkflowNodeRunInput(BaseModel): class AgentBackendRunRequestBuilder: """Converts API product state into the public ``dify-agent`` run protocol.""" + def build_cleanup_request( + self, + *, + session_snapshot: CompositorSessionSnapshot, + composition_layer_specs: list[CleanupLayerSpec], + idempotency_key: str | None = None, + metadata: dict[str, JsonValue] | None = None, + ) -> CreateRunRequest: + """Build a lifecycle-only cleanup request that replays the prior layers. + + The agenton compositor enforces that the session snapshot's layer names + match the active composition in order, so cleanup must replay the same + non-plugin layer graph that produced the snapshot. Plugin layers + (``dify.plugin.llm``, ``dify.plugin.tools``) are excluded from both the + composition and the snapshot before submission because their configs + require credentials that are not persisted between runs. + """ + if not composition_layer_specs: + raise ValueError( + "build_cleanup_request requires composition_layer_specs; an empty " + "composition would fail the agent backend's snapshot validation." + ) + request_metadata = dict(metadata or {}) + request_metadata["agent_backend_lifecycle"] = "session_cleanup" + layers = [ + RunLayerSpec( + name=spec.name, + type=spec.type, + deps=dict(spec.deps), + metadata=dict(spec.metadata), + config=spec.config, + ) + for spec in composition_layer_specs + ] + filtered_snapshot = _filter_snapshot_to_specs(session_snapshot, composition_layer_specs) + return CreateRunRequest( + composition=RunComposition(layers=layers), + purpose="workflow_node", + idempotency_key=idempotency_key, + metadata=request_metadata, + session_snapshot=filtered_snapshot, + on_exit=LayerExitSignals(default=ExitIntent.DELETE), + ) + def build_for_workflow_node(self, run_input: AgentBackendWorkflowNodeRunInput) -> CreateRunRequest: """Build a workflow Agent Node run request without defining another wire schema.""" layers: list[RunLayerSpec] = [] @@ -135,6 +261,20 @@ class AgentBackendRunRequestBuilder: metadata=run_input.metadata, config=run_input.execution_context, ), + ] + ) + + if run_input.include_history: + layers.append( + RunLayerSpec( + name=DIFY_AGENT_HISTORY_LAYER_ID, + type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID, + metadata={**run_input.metadata, "origin": "agent_session_history"}, + ) + ) + + layers.extend( + [ RunLayerSpec( name=DIFY_AGENT_MODEL_LAYER_ID, type=DIFY_PLUGIN_LLM_LAYER_TYPE_ID, diff --git a/api/core/app/apps/advanced_chat/app_runner.py b/api/core/app/apps/advanced_chat/app_runner.py index 4e57b4dedc..db66e9f592 100644 --- a/api/core/app/apps/advanced_chat/app_runner.py +++ b/api/core/app/apps/advanced_chat/app_runner.py @@ -27,6 +27,7 @@ from core.moderation.base import ModerationError from core.moderation.input_moderation import InputModeration from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeExecutionRepository from core.workflow.node_factory import get_default_root_node_id +from core.workflow.nodes.agent_v2.session_cleanup_layer import build_workflow_agent_session_cleanup_layer from core.workflow.system_variables import ( build_bootstrap_variables, build_system_variables, @@ -239,6 +240,7 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner): ) workflow_entry.graph_engine.layer(persistence_layer) + workflow_entry.graph_engine.layer(build_workflow_agent_session_cleanup_layer()) conversation_variable_layer = ConversationVariablePersistenceLayer( ConversationVariableUpdater(session_factory.get_session_maker()) ) diff --git a/api/core/app/apps/workflow/app_runner.py b/api/core/app/apps/workflow/app_runner.py index cfb9208486..9d8a3eb1b1 100644 --- a/api/core/app/apps/workflow/app_runner.py +++ b/api/core/app/apps/workflow/app_runner.py @@ -10,6 +10,7 @@ from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerat from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer from core.repositories.factory import WorkflowExecutionRepository, WorkflowNodeExecutionRepository from core.workflow.node_factory import get_default_root_node_id +from core.workflow.nodes.agent_v2.session_cleanup_layer import build_workflow_agent_session_cleanup_layer from core.workflow.system_variables import build_bootstrap_variables, build_system_variables from core.workflow.variable_pool_initializer import add_node_inputs_to_pool, add_variables_to_pool from core.workflow.workflow_entry import WorkflowEntry @@ -166,6 +167,7 @@ class WorkflowAppRunner(WorkflowBasedAppRunner): ) workflow_entry.graph_engine.layer(persistence_layer) + workflow_entry.graph_engine.layer(build_workflow_agent_session_cleanup_layer()) for layer in self._graph_engine_layers: workflow_entry.graph_engine.layer(layer) diff --git a/api/core/workflow/node_factory.py b/api/core/workflow/node_factory.py index 4a27b2c623..531de248d9 100644 --- a/api/core/workflow/node_factory.py +++ b/api/core/workflow/node_factory.py @@ -475,6 +475,7 @@ class DifyNodeFactory(NodeFactory): from core.workflow.nodes.agent_v2.file_tenant_validator import UploadFileTenantValidator from core.workflow.nodes.agent_v2.output_failure_orchestrator import OutputFailureOrchestrator from core.workflow.nodes.agent_v2.output_type_checker import PerOutputTypeChecker + from core.workflow.nodes.agent_v2.session_store import WorkflowAgentRuntimeSessionStore return { "binding_resolver": WorkflowAgentBindingResolver(), @@ -494,6 +495,7 @@ class DifyNodeFactory(NodeFactory): # outputs contain no file refs. "type_checker": PerOutputTypeChecker(file_validator=UploadFileTenantValidator()), "failure_orchestrator": OutputFailureOrchestrator(), + "session_store": WorkflowAgentRuntimeSessionStore(), } return { "strategy_resolver": self._agent_strategy_resolver, diff --git a/api/core/workflow/nodes/agent_v2/agent_node.py b/api/core/workflow/nodes/agent_v2/agent_node.py index c209c6b1de..a255d3a143 100644 --- a/api/core/workflow/nodes/agent_v2/agent_node.py +++ b/api/core/workflow/nodes/agent_v2/agent_node.py @@ -1,8 +1,11 @@ from __future__ import annotations +import logging from collections.abc import Generator, Mapping, Sequence from typing import TYPE_CHECKING, Any +from agenton.compositor import CompositorSessionSnapshot + from clients.agent_backend import ( AgentBackendError, AgentBackendHTTPError, @@ -17,11 +20,14 @@ from clients.agent_backend import ( AgentBackendStreamInternalEvent, AgentBackendTransportError, AgentBackendValidationError, + CleanupLayerSpec, + extract_cleanup_layer_specs, ) from core.app.entities.app_invoke_entities import DIFY_RUN_CONTEXT_KEY, DifyRunContext from core.workflow.system_variables import SystemVariableKey, get_system_text +from graphon.entities.pause_reason import SchedulingPause from graphon.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus -from graphon.node_events import NodeEventBase, NodeRunResult, StreamCompletedEvent +from graphon.node_events import NodeEventBase, NodeRunResult, PauseRequestedEvent, StreamCompletedEvent from graphon.nodes.base.node import Node from models.agent_config_entities import WorkflowNodeJobConfig @@ -40,11 +46,14 @@ from .runtime_request_builder import ( WorkflowAgentRuntimeRequestBuilder, WorkflowAgentRuntimeRequestBuildError, ) +from .session_store import WorkflowAgentRuntimeSessionStore, WorkflowAgentSessionScope if TYPE_CHECKING: from graphon.entities import GraphInitParams from graphon.runtime import GraphRuntimeState +logger = logging.getLogger(__name__) + # Stage 4 §5+§7: the terminal events that `_consume_event_stream` may return. # Stream + started events are filtered out before we yield; transport errors @@ -74,6 +83,7 @@ class DifyAgentNode(Node[DifyAgentNodeData]): output_adapter: WorkflowAgentOutputAdapter, type_checker: PerOutputTypeChecker, failure_orchestrator: OutputFailureOrchestrator, + session_store: WorkflowAgentRuntimeSessionStore | None = None, ) -> None: super().__init__( node_id=node_id, @@ -88,6 +98,7 @@ class DifyAgentNode(Node[DifyAgentNodeData]): self._output_adapter = output_adapter self._type_checker = type_checker self._failure_orchestrator = failure_orchestrator + self._session_store = session_store @classmethod def version(cls) -> str: @@ -134,6 +145,17 @@ class DifyAgentNode(Node[DifyAgentNodeData]): "agent_config_snapshot_id": bundle.snapshot.id, "binding_id": bundle.binding.id, } + session_scope = WorkflowAgentSessionScope( + tenant_id=dify_ctx.tenant_id, + app_id=dify_ctx.app_id, + workflow_id=workflow_id, + workflow_run_id=workflow_run_id, + node_id=self._node_id, + node_execution_id=self.id, + binding_id=bundle.binding.id, + agent_id=bundle.agent.id, + agent_config_snapshot_id=bundle.snapshot.id, + ) # Stage 4 §4.1 (D-3): use effective outputs so defaults flow through both # the backend request and the post-run type check. @@ -147,6 +169,9 @@ class DifyAgentNode(Node[DifyAgentNodeData]): attempt = 0 while True: try: + session_snapshot = None + if self._session_store is not None: + session_snapshot = self._session_store.load_active_snapshot(session_scope) runtime_request = self._runtime_request_builder.build( WorkflowAgentRuntimeBuildContext( dify_context=dify_ctx, @@ -159,6 +184,7 @@ class DifyAgentNode(Node[DifyAgentNodeData]): agent=bundle.agent, snapshot=bundle.snapshot, attempt=attempt, + session_snapshot=session_snapshot, ) ) except WorkflowAgentRuntimeRequestBuildError as error: @@ -221,9 +247,35 @@ class DifyAgentNode(Node[DifyAgentNodeData]): ) return - # Non-success terminal (failed / cancelled / paused) skips per-output - # post-processing — the backend itself already failed. + if isinstance(terminal_event, AgentBackendRunPausedInternalEvent): + self._save_session_snapshot( + session_scope=session_scope, + backend_run_id=terminal_event.run_id, + snapshot=terminal_event.session_snapshot, + composition_layer_specs=extract_cleanup_layer_specs(runtime_request.request.composition), + metadata=metadata, + ) + yield PauseRequestedEvent( + reason=SchedulingPause( + message=terminal_event.message + or "Agent backend run requested workflow pause for external input." + ) + ) + return + + # Non-success terminal (failed / cancelled) skips per-output + # post-processing — the backend itself already failed. We also retire + # the local ACTIVE session row so a workflow loop back into the same + # Agent node cannot resume from a stale snapshot. The failed agent + # backend layers (suspended per ``on_exit``) are left for agent + # backend's own GC; this row will no longer be picked up by the + # workflow-terminal cleanup layer. if not isinstance(terminal_event, AgentBackendRunSucceededInternalEvent): + self._mark_session_cleaned_on_failure( + session_scope=session_scope, + backend_run_id=terminal_event.run_id, + metadata=metadata, + ) yield StreamCompletedEvent( node_run_result=self._output_adapter.build_failure_result( event=terminal_event, @@ -234,6 +286,14 @@ class DifyAgentNode(Node[DifyAgentNodeData]): ) return + self._save_session_snapshot( + session_scope=session_scope, + backend_run_id=terminal_event.run_id, + snapshot=terminal_event.session_snapshot, + composition_layer_specs=extract_cleanup_layer_specs(runtime_request.request.composition), + metadata=metadata, + ) + # ──── Stage 4: per-output type check ──── type_check = self._type_checker.check( declared_outputs=effective_outputs, @@ -384,6 +444,75 @@ class DifyAgentNode(Node[DifyAgentNodeData]): ], } + def _save_session_snapshot( + self, + *, + session_scope: WorkflowAgentSessionScope, + backend_run_id: str, + snapshot: CompositorSessionSnapshot | None, + composition_layer_specs: list[CleanupLayerSpec], + metadata: dict[str, Any], + ) -> None: + if self._session_store is None: + return + try: + self._session_store.save_active_snapshot( + scope=session_scope, + backend_run_id=backend_run_id, + snapshot=snapshot, + composition_layer_specs=composition_layer_specs, + ) + agent_backend = dict(metadata.get("agent_backend") or {}) + agent_backend["session_snapshot_persisted"] = snapshot is not None + metadata["agent_backend"] = agent_backend + except Exception: + logger.warning( + "Failed to persist workflow Agent runtime session snapshot: " + "tenant_id=%s workflow_run_id=%s node_id=%s binding_id=%s agent_id=%s backend_run_id=%s", + session_scope.tenant_id, + session_scope.workflow_run_id, + session_scope.node_id, + session_scope.binding_id, + session_scope.agent_id, + backend_run_id, + exc_info=True, + ) + agent_backend = dict(metadata.get("agent_backend") or {}) + agent_backend["session_snapshot_persisted"] = False + agent_backend["session_snapshot_persist_error"] = "workflow_agent_runtime_session_store_error" + metadata["agent_backend"] = agent_backend + + def _mark_session_cleaned_on_failure( + self, + *, + session_scope: WorkflowAgentSessionScope, + backend_run_id: str, + metadata: dict[str, Any], + ) -> None: + if self._session_store is None: + return + try: + self._session_store.mark_cleaned(scope=session_scope, backend_run_id=backend_run_id) + agent_backend = dict(metadata.get("agent_backend") or {}) + agent_backend["session_snapshot_cleaned_on_failure"] = True + metadata["agent_backend"] = agent_backend + except Exception: + logger.warning( + "Failed to mark workflow Agent runtime session cleaned on agent run failure: " + "tenant_id=%s workflow_run_id=%s node_id=%s binding_id=%s agent_id=%s backend_run_id=%s", + session_scope.tenant_id, + session_scope.workflow_run_id, + session_scope.node_id, + session_scope.binding_id, + session_scope.agent_id, + backend_run_id, + exc_info=True, + ) + agent_backend = dict(metadata.get("agent_backend") or {}) + agent_backend["session_snapshot_cleaned_on_failure"] = False + agent_backend["session_snapshot_cleanup_error"] = "workflow_agent_runtime_session_store_error" + metadata["agent_backend"] = agent_backend + @staticmethod def _patch_event_with_defaults( event: AgentBackendRunSucceededInternalEvent, diff --git a/api/core/workflow/nodes/agent_v2/runtime_request_builder.py b/api/core/workflow/nodes/agent_v2/runtime_request_builder.py index 0a0960d493..f093587b57 100644 --- a/api/core/workflow/nodes/agent_v2/runtime_request_builder.py +++ b/api/core/workflow/nodes/agent_v2/runtime_request_builder.py @@ -4,6 +4,7 @@ from collections.abc import Mapping, Sequence from dataclasses import dataclass from typing import Any, Literal, Protocol, cast +from agenton.compositor import CompositorSessionSnapshot from dify_agent.layers.execution_context import DifyExecutionContextLayerConfig from dify_agent.protocol import CreateRunRequest @@ -28,6 +29,7 @@ from models.agent_config_entities import ( from models.agent_config_entities import ( effective_declared_outputs as _effective_declared_outputs, ) +from models.provider_ids import ModelProviderID from .output_failure_orchestrator import retry_idempotency_key from .plugin_tools_builder import WorkflowAgentPluginToolsBuilder, WorkflowAgentPluginToolsBuildError @@ -66,6 +68,7 @@ class WorkflowAgentRuntimeBuildContext: # Stage 4 §7 / D-4: 0 for the first run, then incremented per retry. Drives the # idempotency key so the backend treats each retry as a fresh request. attempt: int = 0 + session_snapshot: CompositorSessionSnapshot | None = None @dataclass(frozen=True, slots=True) @@ -129,11 +132,14 @@ class WorkflowAgentRuntimeRequestBuilder: request = self._request_builder.build_for_workflow_node( AgentBackendWorkflowNodeRunInput( model=AgentBackendModelConfig( - plugin_id=agent_soul.model.plugin_id, - model_provider=agent_soul.model.model_provider, + plugin_id=self._plugin_daemon_plugin_id( + plugin_id=agent_soul.model.plugin_id, + model_provider=agent_soul.model.model_provider, + ), + model_provider=self._plugin_daemon_provider_name(agent_soul.model.model_provider), model=agent_soul.model.model, credentials=self._normalize_credentials(credentials), - model_settings=cast(dict[str, Any], agent_soul.model.model_settings), + model_settings=agent_soul.model.model_settings, ), # The execution-context layer is now the only public protocol # carrier for Dify tenant/user/run identifiers. ``user_id`` must @@ -158,6 +164,7 @@ class WorkflowAgentRuntimeRequestBuilder: user_prompt=user_prompt, output=self._build_output_config(node_job.declared_outputs), tools=tools_layer, + session_snapshot=context.session_snapshot, idempotency_key=self._idempotency_key(context), metadata=metadata, ) @@ -177,6 +184,20 @@ class WorkflowAgentRuntimeRequestBuilder: return "single_step" return "workflow_run" + @staticmethod + def _plugin_daemon_plugin_id(*, plugin_id: str, model_provider: str) -> str: + """Return the transport plugin id expected by plugin-daemon headers.""" + if plugin_id.count("/") == 1: + return plugin_id + if plugin_id: + return ModelProviderID(plugin_id).plugin_id + return ModelProviderID(model_provider).plugin_id + + @staticmethod + def _plugin_daemon_provider_name(model_provider: str) -> str: + """Return the provider name expected by plugin-daemon dispatch payloads.""" + return ModelProviderID(model_provider).provider_name + @staticmethod def _idempotency_key(context: WorkflowAgentRuntimeBuildContext) -> str: # Stage 4 §7 / D-4: retries get distinct keys (``...:retry-{attempt}``) so diff --git a/api/core/workflow/nodes/agent_v2/session_cleanup_layer.py b/api/core/workflow/nodes/agent_v2/session_cleanup_layer.py new file mode 100644 index 0000000000..3c225ac470 --- /dev/null +++ b/api/core/workflow/nodes/agent_v2/session_cleanup_layer.py @@ -0,0 +1,247 @@ +from __future__ import annotations + +import logging +from typing import override + +from clients.agent_backend import AgentBackendError, AgentBackendRunClient, AgentBackendRunRequestBuilder +from clients.agent_backend.factory import create_agent_backend_run_client +from configs import dify_config +from core.workflow.system_variables import SystemVariableKey, get_system_text +from graphon.graph_engine.layers import GraphEngineLayer +from graphon.graph_events import ( + GraphEngineEvent, + GraphRunAbortedEvent, + GraphRunFailedEvent, + GraphRunPartialSucceededEvent, + GraphRunSucceededEvent, +) + +from .session_store import StoredWorkflowAgentSession, WorkflowAgentRuntimeSessionStore + +logger = logging.getLogger(__name__) + + +# Upper bound on how long a cleanup-only run is allowed to settle before the +# layer gives up and leaves the row ACTIVE so it can be retried later. Cleanup +# work is mostly local agent-backend bookkeeping (no LLM inference), so 30s is +# generous; a hung backend should never block workflow termination beyond this. +_CLEANUP_WAIT_TIMEOUT_SECONDS = 30.0 + + +class WorkflowAgentSessionCleanupLayer(GraphEngineLayer): + """Retires workflow Agent session snapshots when a workflow reaches a terminal state. + + Implementation notes — there are two failure modes the cleanup path has to + avoid simultaneously: + + 1. The agenton compositor on the agent-backend side validates the cleanup + request's session snapshot against the replayed composition before + running any lifecycle hook. If the snapshot's layer names diverge from + the composition, the run fails asynchronously with ``run_failed`` — but + the initial ``POST /runs`` already returned 202, so the API side has no + visibility of the failure unless it waits for terminal status. The + ``composition_layer_specs`` persistence in A.1–A.4 plus the + ``_filter_snapshot_to_specs`` shape in ``build_cleanup_request`` keeps + the two name lists in sync. + + 2. The current agent backend's ``runner.py::_run_agent`` always invokes + ``run.get_layer("llm")`` and the structured-output / history validators + before exiting any slot — there is no ``purpose: "cleanup"`` branch + yet. A truly cleanup-only request (no LLM layer) therefore still + crashes inside the runner with ``Layer 'llm' is not defined in this + compositor run.``. Until the backend grows a cleanup-only purpose, + this layer **does not issue an HTTP cleanup run**: it simply retires + the local snapshot row so stale state cannot be re-resumed, and lets + the agent backend's own retention TTL release the suspended layers. + + The HTTP-cleanup machinery (``build_cleanup_request`` + ``wait_run``) is + intentionally still wired into the request builder + integration tests so + that when the agent backend supports cleanup runs we can flip the switch + here with a one-line change (see ``_HTTP_CLEANUP_SUPPORTED``). + """ + + # Flip to True once dify-agent's runner has a ``purpose=cleanup`` branch + # that skips the LLM/output/user-prompt invariants. Until then we only + # update the local row; the spec list is still persisted so the future + # HTTP cleanup path has everything it needs. + _HTTP_CLEANUP_SUPPORTED: bool = False + + _TERMINAL_EVENTS = ( + GraphRunSucceededEvent, + GraphRunPartialSucceededEvent, + GraphRunFailedEvent, + GraphRunAbortedEvent, + ) + + def __init__( + self, + *, + session_store: WorkflowAgentRuntimeSessionStore, + request_builder: AgentBackendRunRequestBuilder, + agent_backend_client: AgentBackendRunClient | None, + cleanup_wait_timeout_seconds: float = _CLEANUP_WAIT_TIMEOUT_SECONDS, + ) -> None: + super().__init__() + self._session_store = session_store + self._request_builder = request_builder + self._agent_backend_client = agent_backend_client + self._cleanup_wait_timeout_seconds = cleanup_wait_timeout_seconds + + @override + def on_graph_start(self) -> None: + return + + @override + def on_event(self, event: GraphEngineEvent) -> None: + if not isinstance(event, self._TERMINAL_EVENTS): + return + workflow_run_id = get_system_text( + self.graph_runtime_state.variable_pool, + SystemVariableKey.WORKFLOW_EXECUTION_ID, + ) + if not workflow_run_id: + logger.warning("Skipping workflow Agent session cleanup: workflow_run_id is missing.") + return + + for stored_session in self._session_store.list_active_sessions(workflow_run_id=workflow_run_id): + self._cleanup_session(stored_session) + + @override + def on_graph_end(self, error: Exception | None) -> None: + return + + def _cleanup_session(self, stored_session: StoredWorkflowAgentSession) -> None: + scope = stored_session.scope + if not self._HTTP_CLEANUP_SUPPORTED: + # Agent backend has no cleanup-only run mode yet (see class + # docstring). Retire the local row so future re-entries do not + # resume from stale state, and let the backend's retention TTL + # release the suspended layers on its own schedule. + logger.info( + "Workflow Agent session retired locally; HTTP cleanup is disabled " + "until the agent backend supports a cleanup-only run mode. " + "workflow_run_id=%s node_id=%s binding_id=%s agent_id=%s previous_run_id=%s", + scope.workflow_run_id, + scope.node_id, + scope.binding_id, + scope.agent_id, + stored_session.backend_run_id, + ) + self._session_store.mark_cleaned(scope=scope, backend_run_id=stored_session.backend_run_id) + return + + if self._agent_backend_client is None: + # HTTP cleanup was enabled by the caller but no client was wired + # in (e.g. the API runs without AGENT_BACKEND_BASE_URL configured). + # Leave the row ACTIVE so an operator restart with proper config + # can drive the cleanup; do not silently retire it. + logger.warning( + "Skipping Agent backend cleanup: HTTP cleanup is enabled but no agent " + "backend client is wired in. workflow_run_id=%s node_id=%s agent_id=%s", + scope.workflow_run_id, + scope.node_id, + scope.agent_id, + ) + return + + if not stored_session.composition_layer_specs: + # Sessions persisted before A.1 landed do not carry the spec list, + # so we cannot replay a valid cleanup composition. Leave the row + # ACTIVE and warn so the absence shows up in observability rather + # than being silently swallowed by a doomed cleanup run. + logger.warning( + "Skipping Agent backend cleanup: no composition_layer_specs persisted. " + "workflow_run_id=%s node_id=%s agent_id=%s", + scope.workflow_run_id, + scope.node_id, + scope.agent_id, + ) + return + + request = self._request_builder.build_cleanup_request( + session_snapshot=stored_session.session_snapshot, + composition_layer_specs=stored_session.composition_layer_specs, + idempotency_key=f"{scope.workflow_run_id}:{scope.node_id}:{scope.binding_id}:agent-session-cleanup", + metadata={ + "tenant_id": scope.tenant_id, + "app_id": scope.app_id, + "workflow_id": scope.workflow_id, + "workflow_run_id": scope.workflow_run_id, + "node_id": scope.node_id, + "node_execution_id": scope.node_execution_id, + "binding_id": scope.binding_id, + "agent_id": scope.agent_id, + "agent_config_snapshot_id": scope.agent_config_snapshot_id, + "previous_agent_backend_run_id": stored_session.backend_run_id, + }, + ) + try: + response = self._agent_backend_client.create_run(request) + except AgentBackendError: + logger.warning( + "Agent backend session cleanup request failed: workflow_run_id=%s node_id=%s agent_id=%s", + scope.workflow_run_id, + scope.node_id, + scope.agent_id, + exc_info=True, + ) + return + + try: + status_response = self._agent_backend_client.wait_run( + response.run_id, timeout_seconds=self._cleanup_wait_timeout_seconds + ) + except AgentBackendError: + logger.warning( + "Agent backend session cleanup wait_run failed: " + "workflow_run_id=%s node_id=%s agent_id=%s cleanup_run_id=%s", + scope.workflow_run_id, + scope.node_id, + scope.agent_id, + response.run_id, + exc_info=True, + ) + return + + if status_response.status != "succeeded": + logger.warning( + "Agent backend session cleanup did not succeed: status=%s error=%s " + "workflow_run_id=%s node_id=%s agent_id=%s cleanup_run_id=%s", + status_response.status, + status_response.error, + scope.workflow_run_id, + scope.node_id, + scope.agent_id, + response.run_id, + ) + return + + self._session_store.mark_cleaned(scope=scope, backend_run_id=response.run_id) + + +def build_workflow_agent_session_cleanup_layer() -> WorkflowAgentSessionCleanupLayer: + """Wire the cleanup layer with the standard production dependencies. + + The agent backend client is constructed only when ``AGENT_BACKEND_BASE_URL`` + is configured (or the deterministic fake is explicitly enabled). When + neither is set — for example unit tests that bring up the workflow runner + without an Agent node — we pass ``None`` so the layer stays harmless. With + ``_HTTP_CLEANUP_SUPPORTED = False`` the local-retire branch never touches + the client anyway, but keeping it ``None`` avoids importing httpx and lets + test harnesses skip backend configuration. + """ + agent_backend_client: AgentBackendRunClient | None + if dify_config.AGENT_BACKEND_USE_FAKE or dify_config.AGENT_BACKEND_BASE_URL: + agent_backend_client = create_agent_backend_run_client( + base_url=dify_config.AGENT_BACKEND_BASE_URL, + use_fake=dify_config.AGENT_BACKEND_USE_FAKE, + fake_scenario=dify_config.AGENT_BACKEND_FAKE_SCENARIO, + ) + else: + agent_backend_client = None + + return WorkflowAgentSessionCleanupLayer( + session_store=WorkflowAgentRuntimeSessionStore(), + request_builder=AgentBackendRunRequestBuilder(), + agent_backend_client=agent_backend_client, + ) diff --git a/api/core/workflow/nodes/agent_v2/session_store.py b/api/core/workflow/nodes/agent_v2/session_store.py new file mode 100644 index 0000000000..9c45742489 --- /dev/null +++ b/api/core/workflow/nodes/agent_v2/session_store.py @@ -0,0 +1,179 @@ +from __future__ import annotations + +from dataclasses import dataclass, field + +from agenton.compositor import CompositorSessionSnapshot +from pydantic import TypeAdapter +from sqlalchemy import select + +from clients.agent_backend.request_builder import CleanupLayerSpec +from core.db.session_factory import session_factory +from libs.datetime_utils import naive_utc_now +from models.agent import ( + WorkflowAgentRuntimeSession, + WorkflowAgentRuntimeSessionStatus, +) + +_SPECS_ADAPTER: TypeAdapter[list[CleanupLayerSpec]] = TypeAdapter(list[CleanupLayerSpec]) + + +def _serialize_specs(specs: list[CleanupLayerSpec]) -> str: + return _SPECS_ADAPTER.dump_json(specs).decode() + + +def _deserialize_specs(value: str | None) -> list[CleanupLayerSpec]: + if not value: + return [] + return _SPECS_ADAPTER.validate_json(value) + + +@dataclass(frozen=True, slots=True) +class WorkflowAgentSessionScope: + tenant_id: str + app_id: str + workflow_id: str + workflow_run_id: str | None + node_id: str + node_execution_id: str + binding_id: str + agent_id: str + agent_config_snapshot_id: str + + +@dataclass(frozen=True, slots=True) +class StoredWorkflowAgentSession: + scope: WorkflowAgentSessionScope + session_snapshot: CompositorSessionSnapshot + backend_run_id: str | None + composition_layer_specs: list[CleanupLayerSpec] = field(default_factory=list) + + +class WorkflowAgentRuntimeSessionStore: + """Stores Agent backend session snapshots for workflow Agent node re-entry.""" + + def load_active_snapshot(self, scope: WorkflowAgentSessionScope) -> CompositorSessionSnapshot | None: + if scope.workflow_run_id is None: + return None + + with session_factory.create_session() as session: + row = session.scalar( + select(WorkflowAgentRuntimeSession).where( + WorkflowAgentRuntimeSession.tenant_id == scope.tenant_id, + WorkflowAgentRuntimeSession.workflow_run_id == scope.workflow_run_id, + WorkflowAgentRuntimeSession.node_id == scope.node_id, + WorkflowAgentRuntimeSession.binding_id == scope.binding_id, + WorkflowAgentRuntimeSession.agent_id == scope.agent_id, + WorkflowAgentRuntimeSession.status == WorkflowAgentRuntimeSessionStatus.ACTIVE, + ) + ) + if row is None: + return None + return CompositorSessionSnapshot.model_validate_json(row.session_snapshot) + + def list_active_sessions(self, *, workflow_run_id: str) -> list[StoredWorkflowAgentSession]: + with session_factory.create_session() as session: + rows = session.scalars( + select(WorkflowAgentRuntimeSession).where( + WorkflowAgentRuntimeSession.workflow_run_id == workflow_run_id, + WorkflowAgentRuntimeSession.status == WorkflowAgentRuntimeSessionStatus.ACTIVE, + ) + ).all() + return [ + StoredWorkflowAgentSession( + scope=WorkflowAgentSessionScope( + tenant_id=row.tenant_id, + app_id=row.app_id, + workflow_id=row.workflow_id, + workflow_run_id=row.workflow_run_id, + node_id=row.node_id, + node_execution_id=row.node_execution_id or "", + binding_id=row.binding_id, + agent_id=row.agent_id, + agent_config_snapshot_id=row.agent_config_snapshot_id, + ), + session_snapshot=CompositorSessionSnapshot.model_validate_json(row.session_snapshot), + backend_run_id=row.backend_run_id, + composition_layer_specs=_deserialize_specs(row.composition_layer_specs), + ) + for row in rows + ] + + def save_active_snapshot( + self, + *, + scope: WorkflowAgentSessionScope, + backend_run_id: str, + snapshot: CompositorSessionSnapshot | None, + composition_layer_specs: list[CleanupLayerSpec], + ) -> None: + if scope.workflow_run_id is None or snapshot is None: + return + + snapshot_json = snapshot.model_dump_json() + specs_json = _serialize_specs(composition_layer_specs) + with session_factory.create_session() as session: + row = session.scalar( + select(WorkflowAgentRuntimeSession).where( + WorkflowAgentRuntimeSession.tenant_id == scope.tenant_id, + WorkflowAgentRuntimeSession.workflow_run_id == scope.workflow_run_id, + WorkflowAgentRuntimeSession.node_id == scope.node_id, + WorkflowAgentRuntimeSession.binding_id == scope.binding_id, + WorkflowAgentRuntimeSession.agent_id == scope.agent_id, + ) + ) + if row is None: + row = WorkflowAgentRuntimeSession( + tenant_id=scope.tenant_id, + app_id=scope.app_id, + workflow_id=scope.workflow_id, + workflow_run_id=scope.workflow_run_id, + node_id=scope.node_id, + node_execution_id=scope.node_execution_id, + binding_id=scope.binding_id, + agent_id=scope.agent_id, + agent_config_snapshot_id=scope.agent_config_snapshot_id, + backend_run_id=backend_run_id, + session_snapshot=snapshot_json, + composition_layer_specs=specs_json, + status=WorkflowAgentRuntimeSessionStatus.ACTIVE, + ) + session.add(row) + else: + row.node_execution_id = scope.node_execution_id + row.agent_config_snapshot_id = scope.agent_config_snapshot_id + row.backend_run_id = backend_run_id + row.session_snapshot = snapshot_json + row.composition_layer_specs = specs_json + row.status = WorkflowAgentRuntimeSessionStatus.ACTIVE + row.cleaned_at = None + session.commit() + + def mark_cleaned(self, *, scope: WorkflowAgentSessionScope, backend_run_id: str | None = None) -> None: + if scope.workflow_run_id is None: + return + + with session_factory.create_session() as session: + row = session.scalar( + select(WorkflowAgentRuntimeSession).where( + WorkflowAgentRuntimeSession.tenant_id == scope.tenant_id, + WorkflowAgentRuntimeSession.workflow_run_id == scope.workflow_run_id, + WorkflowAgentRuntimeSession.node_id == scope.node_id, + WorkflowAgentRuntimeSession.binding_id == scope.binding_id, + WorkflowAgentRuntimeSession.agent_id == scope.agent_id, + WorkflowAgentRuntimeSession.status == WorkflowAgentRuntimeSessionStatus.ACTIVE, + ) + ) + if row is None: + return + if backend_run_id is not None: + row.backend_run_id = backend_run_id + row.status = WorkflowAgentRuntimeSessionStatus.CLEANED + row.cleaned_at = naive_utc_now() + session.commit() + + +__all__ = [ + "StoredWorkflowAgentSession", + "WorkflowAgentRuntimeSessionStore", + "WorkflowAgentSessionScope", +] diff --git a/api/migrations/versions/2026_05_27_0953-7885bd53f9a9_add_workflow_agent_runtime_sessions.py b/api/migrations/versions/2026_05_27_0953-7885bd53f9a9_add_workflow_agent_runtime_sessions.py new file mode 100644 index 0000000000..e62dcb96ca --- /dev/null +++ b/api/migrations/versions/2026_05_27_0953-7885bd53f9a9_add_workflow_agent_runtime_sessions.py @@ -0,0 +1,90 @@ +"""add workflow agent runtime sessions + +Revision ID: 7885bd53f9a9 +Revises: d4a5e1f3c9b7 +Create Date: 2026-05-27 09:53:54.711805 + +""" + +import sqlalchemy as sa +from alembic import op + +import models as models + +# revision identifiers, used by Alembic. +revision = "7885bd53f9a9" +down_revision = "d4a5e1f3c9b7" +branch_labels = None +depends_on = None + + +def _is_pg() -> bool: + return op.get_bind().dialect.name == "postgresql" + + +def _uuid_column(name: str, *, nullable: bool = False, primary_key: bool = False) -> sa.Column: + """Match the ``uuidv7()`` default that other tables on Postgres rely on, + while staying portable on MySQL where the ORM supplies the id.""" + kwargs: dict[str, object] = {"nullable": nullable, "primary_key": primary_key} + if primary_key and _is_pg(): + kwargs["server_default"] = sa.text("uuidv7()") + return sa.Column(name, models.types.StringUUID(), **kwargs) + + +def upgrade() -> None: + op.create_table( + "workflow_agent_runtime_sessions", + _uuid_column("id", primary_key=True), + sa.Column("tenant_id", models.types.StringUUID(), nullable=False), + sa.Column("app_id", models.types.StringUUID(), nullable=False), + sa.Column("workflow_id", models.types.StringUUID(), nullable=False), + sa.Column("workflow_run_id", models.types.StringUUID(), nullable=False), + sa.Column("node_id", sa.String(length=255), nullable=False), + sa.Column("node_execution_id", sa.String(length=255), nullable=True), + sa.Column("binding_id", models.types.StringUUID(), nullable=False), + sa.Column("agent_id", models.types.StringUUID(), nullable=False), + sa.Column("agent_config_snapshot_id", models.types.StringUUID(), nullable=False), + sa.Column("backend_run_id", sa.String(length=255), nullable=True), + sa.Column("session_snapshot", models.types.LongText(), nullable=False), + # MySQL rejects ``server_default`` on TEXT/BLOB columns. The JSON + # payload is always populated at the ORM layer via + # ``WorkflowAgentRuntimeSessionStore.save_active_snapshot`` so the + # missing DB-level default cannot leave new rows uninitialized. + sa.Column("composition_layer_specs", models.types.LongText(), nullable=False), + sa.Column( + "status", + sa.String(length=32), + server_default=sa.text("'active'"), + nullable=False, + ), + sa.Column("cleaned_at", sa.DateTime(), nullable=True), + sa.Column("created_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False), + sa.Column("updated_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False), + sa.PrimaryKeyConstraint("id", name=op.f("workflow_agent_runtime_session_pkey")), + sa.UniqueConstraint( + "tenant_id", + "workflow_run_id", + "node_id", + "binding_id", + "agent_id", + name=op.f("workflow_agent_runtime_session_scope_unique"), + ), + ) + with op.batch_alter_table("workflow_agent_runtime_sessions", schema=None) as batch_op: + batch_op.create_index( + "workflow_agent_runtime_session_lookup_idx", + ["tenant_id", "workflow_run_id", "node_id", "status"], + unique=False, + ) + batch_op.create_index( + "workflow_agent_runtime_session_backend_run_idx", + ["backend_run_id"], + unique=False, + ) + + +def downgrade() -> None: + with op.batch_alter_table("workflow_agent_runtime_sessions", schema=None) as batch_op: + batch_op.drop_index("workflow_agent_runtime_session_backend_run_idx") + batch_op.drop_index("workflow_agent_runtime_session_lookup_idx") + op.drop_table("workflow_agent_runtime_sessions") diff --git a/api/models/__init__.py b/api/models/__init__.py index 92b6561411..47962cad21 100644 --- a/api/models/__init__.py +++ b/api/models/__init__.py @@ -20,6 +20,8 @@ from .agent import ( AgentStatus, WorkflowAgentBindingType, WorkflowAgentNodeBinding, + WorkflowAgentRuntimeSession, + WorkflowAgentRuntimeSessionStatus, ) from .api_based_extension import APIBasedExtension, APIBasedExtensionPoint from .comment import ( @@ -235,6 +237,8 @@ __all__ = [ "Workflow", "WorkflowAgentBindingType", "WorkflowAgentNodeBinding", + "WorkflowAgentRuntimeSession", + "WorkflowAgentRuntimeSessionStatus", "WorkflowAppLog", "WorkflowAppLogCreatedFrom", "WorkflowArchiveLog", diff --git a/api/models/agent.py b/api/models/agent.py index a8f048eef5..684efd513d 100644 --- a/api/models/agent.py +++ b/api/models/agent.py @@ -92,6 +92,15 @@ class WorkflowAgentBindingType(StrEnum): INLINE_AGENT = "inline_agent" +class WorkflowAgentRuntimeSessionStatus(StrEnum): + """Lifecycle state of an Agent backend session snapshot owned by a workflow run.""" + + # Snapshot can be reused by a later Agent run in the same workflow run. + ACTIVE = "active" + # Snapshot has been retired and must not be submitted to Agent backend again. + CLEANED = "cleaned" + + class Agent(DefaultFieldsMixin, Base): """Workspace-scoped Agent identity used by Agent Roster and workflow-only agents.""" @@ -273,3 +282,56 @@ class WorkflowAgentNodeBinding(DefaultFieldsMixin, Base): if isinstance(self.node_job_config, str): return json.loads(self.node_job_config) return dict(self.node_job_config) + + +class WorkflowAgentRuntimeSession(DefaultFieldsMixin, Base): + """Persisted Agent backend session snapshot for one workflow Agent node execution scope. + + The snapshot is runtime state returned by Agent backend. It is intentionally + separate from Agent Soul snapshots and workflow node-job config. + """ + + __tablename__ = "workflow_agent_runtime_sessions" + __table_args__ = ( + sa.PrimaryKeyConstraint("id", name="workflow_agent_runtime_session_pkey"), + UniqueConstraint( + "tenant_id", + "workflow_run_id", + "node_id", + "binding_id", + "agent_id", + name="workflow_agent_runtime_session_scope_unique", + ), + Index( + "workflow_agent_runtime_session_lookup_idx", + "tenant_id", + "workflow_run_id", + "node_id", + "status", + ), + Index("workflow_agent_runtime_session_backend_run_idx", "backend_run_id"), + ) + + tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + app_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + workflow_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + workflow_run_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + node_id: Mapped[str] = mapped_column(String(255), nullable=False) + node_execution_id: Mapped[str | None] = mapped_column(String(255), nullable=True) + binding_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + agent_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + agent_config_snapshot_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + backend_run_id: Mapped[str | None] = mapped_column(String(255), nullable=True) + session_snapshot: Mapped[str] = mapped_column(LongText, nullable=False) + # JSON-encoded list of ``WorkflowAgentSessionLayerSpec`` ({name, type, deps, + # config}). Drives Agent backend cleanup-only runs: the agenton compositor + # rejects a session snapshot whose layer names do not match the cleanup + # composition, so we must replay the same layer graph (minus credential- + # bearing plugin layers) when issuing the cleanup request. + composition_layer_specs: Mapped[str] = mapped_column(LongText, nullable=False, server_default="[]") + status: Mapped[WorkflowAgentRuntimeSessionStatus] = mapped_column( + EnumText(WorkflowAgentRuntimeSessionStatus, length=32), + nullable=False, + default=WorkflowAgentRuntimeSessionStatus.ACTIVE, + ) + cleaned_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True) diff --git a/api/tests/unit_tests/clients/agent_backend/test_cleanup_composition_compositor_integration.py b/api/tests/unit_tests/clients/agent_backend/test_cleanup_composition_compositor_integration.py new file mode 100644 index 0000000000..d78bfe7653 --- /dev/null +++ b/api/tests/unit_tests/clients/agent_backend/test_cleanup_composition_compositor_integration.py @@ -0,0 +1,134 @@ +"""Integration test for the cleanup request against the real agenton compositor. + +The bug fixed by A+D was invisible to unit tests that use ``FakeAgentBackendRunClient`` +because the fake client never runs agenton's ``_validate_session_snapshot``. This +test plugs a cleanup request through the real ``Compositor`` (with the same +providers the agent backend wires in production) so that the snapshot-vs- +composition name-order check would fail loudly if the cleanup builder ever +regressed back to the empty-composition shape. +""" + +from __future__ import annotations + +from typing import cast + +import pytest +from agenton.compositor import Compositor, CompositorSessionSnapshot, LayerProvider +from agenton.compositor.schemas import LayerSessionSnapshot +from agenton.layers.base import LifecycleState +from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID +from agenton_collections.layers.plain.basic import PromptLayer +from agenton_collections.layers.pydantic_ai import PYDANTIC_AI_HISTORY_LAYER_TYPE_ID, PydanticAIHistoryLayer + +from clients.agent_backend import AgentBackendRunRequestBuilder, CleanupLayerSpec + + +def test_cleanup_request_passes_agenton_snapshot_validation(): + """The cleanup request's composition layer names must match the (filtered) + snapshot's layer names exactly — agenton's compositor enforces this and + the agent backend rejects mismatches as ``run_failed`` asynchronously, + which is the trap A/D fixed.""" + # Persisted (non-plugin) layer specs — these are what cleanup will replay. + # We exclude the dify.execution_context layer from this integration check + # because its real provider needs a plugin-daemon HTTP client; the cleanup + # validation we are exercising is the snapshot-vs-composition name check, + # which is purely structural and does not depend on which non-plugin layer + # types appear. + persisted_specs = [ + CleanupLayerSpec( + name="workflow_node_job_prompt", + type=PLAIN_PROMPT_LAYER_TYPE_ID, + config={"prefix": "Do the cleanup."}, + ), + CleanupLayerSpec(name="history", type=PYDANTIC_AI_HISTORY_LAYER_TYPE_ID), + ] + # Saved snapshot still carries the LLM layer entry — cleanup's + # ``_filter_snapshot_to_specs`` must drop it so names match. + full_snapshot = CompositorSessionSnapshot( + layers=[ + LayerSessionSnapshot( + name="workflow_node_job_prompt", + lifecycle_state=LifecycleState.SUSPENDED, + runtime_state={}, + ), + LayerSessionSnapshot( + name="history", + lifecycle_state=LifecycleState.SUSPENDED, + runtime_state={"messages": []}, + ), + LayerSessionSnapshot( + name="llm", + lifecycle_state=LifecycleState.SUSPENDED, + runtime_state={}, + ), + ] + ) + + cleanup_request = AgentBackendRunRequestBuilder().build_cleanup_request( + session_snapshot=full_snapshot, + composition_layer_specs=persisted_specs, + ) + + # Drive the real agenton compositor through ``from_config`` + ``_create_run`` + # the same way the agent backend's RunScheduler does. ``_create_run`` is the + # private path that calls ``_validate_session_snapshot``; we use it directly + # to keep the test synchronous (no async ``enter()`` lifecycle needed — + # validation is the only thing under test). + config = { + "schema_version": 1, + "layers": [ + {"name": layer.name, "type": layer.type, "deps": dict(layer.deps), "metadata": dict(layer.metadata)} + for layer in cleanup_request.composition.layers + ], + } + compositor = Compositor.from_config( + config, + providers=[ + LayerProvider.from_layer_type(PromptLayer), + LayerProvider.from_layer_type(PydanticAIHistoryLayer), + ], + ) + + layer_configs = {layer.name: layer.config for layer in cleanup_request.composition.layers} + # This is the call that would raise ``ValueError`` if the cleanup snapshot + # and composition disagreed on layer names — the exact failure mode the + # original ``layers=[]`` cleanup hit. + run = compositor._create_run( # type: ignore[reportPrivateUsage] + configs=cast(dict[str, object], layer_configs), + session_snapshot=cleanup_request.session_snapshot, + ) + assert list(run.slots.keys()) == ["workflow_node_job_prompt", "history"] + + +def test_cleanup_request_with_mismatched_specs_would_be_rejected_by_agenton(): + """Regression sentinel: if a future refactor stops filtering the snapshot, + agenton would reject the request — and that rejection is what the runtime + fix is preventing. We confirm the validator does fail when given the + pre-fix shape so the previous test's success is not a coincidence.""" + snapshot_with_extra = CompositorSessionSnapshot( + layers=[ + LayerSessionSnapshot( + name="history", + lifecycle_state=LifecycleState.SUSPENDED, + runtime_state={}, + ), + LayerSessionSnapshot( + name="llm", # extra layer not in composition + lifecycle_state=LifecycleState.SUSPENDED, + runtime_state={}, + ), + ] + ) + compositor = Compositor.from_config( + { + "schema_version": 1, + "layers": [{"name": "history", "type": PYDANTIC_AI_HISTORY_LAYER_TYPE_ID, "deps": {}, "metadata": {}}], + }, + providers=[LayerProvider.from_layer_type(PydanticAIHistoryLayer)], + ) + + with pytest.raises(ValueError, match="layer names must match"): + compositor._create_run( # type: ignore[reportPrivateUsage] + configs={}, + session_snapshot=snapshot_with_extra, + ) diff --git a/api/tests/unit_tests/clients/agent_backend/test_fake_client.py b/api/tests/unit_tests/clients/agent_backend/test_fake_client.py index 087cffef81..ddeed9621a 100644 --- a/api/tests/unit_tests/clients/agent_backend/test_fake_client.py +++ b/api/tests/unit_tests/clients/agent_backend/test_fake_client.py @@ -63,3 +63,25 @@ def test_fake_client_cancel_run_returns_cancelled_status(): assert cancelled.run_id == "fake-run-1" assert cancelled.status == "cancelled" + + +def test_fake_client_paused_scenario_returns_paused_status_and_event(): + """The paused scenario exists for HITL-style flows; both ``wait_run`` and + the event stream must report the pause so consumers can branch on it.""" + client = FakeAgentBackendRunClient(scenario=FakeAgentBackendScenario.PAUSED) + + status = client.wait_run("fake-run-1") + events = list(client.stream_events("fake-run-1")) + + assert status.status == "paused" + assert status.error is None + assert events[-1].type == "run_paused" + assert events[-1].data.reason == "human_input_required" + + +def test_fake_client_success_wait_run_returns_succeeded_status(): + """Covers the default SUCCESS branch of ``wait_run`` directly.""" + status = FakeAgentBackendRunClient().wait_run("fake-run-1") + + assert status.status == "succeeded" + assert status.error is None diff --git a/api/tests/unit_tests/clients/agent_backend/test_request_builder.py b/api/tests/unit_tests/clients/agent_backend/test_request_builder.py index 0df3940af8..e8dabddc03 100644 --- a/api/tests/unit_tests/clients/agent_backend/test_request_builder.py +++ b/api/tests/unit_tests/clients/agent_backend/test_request_builder.py @@ -1,15 +1,23 @@ +from typing import Any, cast + import pytest +from agenton.compositor import CompositorSessionSnapshot +from agenton.compositor.schemas import LayerSessionSnapshot from agenton.layers import ExitIntent -from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID +from agenton.layers.base import LifecycleState +from agenton_collections.layers.plain import PLAIN_PROMPT_LAYER_TYPE_ID, PromptLayerConfig +from agenton_collections.layers.pydantic_ai import PYDANTIC_AI_HISTORY_LAYER_TYPE_ID from dify_agent.layers.dify_plugin import ( DIFY_PLUGIN_LLM_LAYER_TYPE_ID, DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID, + DifyPluginLLMLayerConfig, DifyPluginToolConfig, DifyPluginToolsLayerConfig, ) from dify_agent.layers.execution_context import DIFY_EXECUTION_CONTEXT_LAYER_TYPE_ID, DifyExecutionContextLayerConfig from dify_agent.layers.output import DIFY_OUTPUT_LAYER_TYPE_ID from dify_agent.protocol import ( + DIFY_AGENT_HISTORY_LAYER_ID, DIFY_AGENT_MODEL_LAYER_ID, DIFY_AGENT_OUTPUT_LAYER_ID, CreateRunRequest, @@ -26,6 +34,7 @@ from clients.agent_backend import ( AgentBackendOutputConfig, AgentBackendRunRequestBuilder, AgentBackendWorkflowNodeRunInput, + CleanupLayerSpec, redact_for_agent_backend_log, ) @@ -71,10 +80,11 @@ def test_request_builder_outputs_dify_agent_create_run_request(): WORKFLOW_NODE_JOB_PROMPT_LAYER_ID, WORKFLOW_USER_PROMPT_LAYER_ID, DIFY_EXECUTION_CONTEXT_LAYER_ID, + DIFY_AGENT_HISTORY_LAYER_ID, DIFY_AGENT_MODEL_LAYER_ID, DIFY_AGENT_OUTPUT_LAYER_ID, ] - assert request.on_exit.default is ExitIntent.DELETE + assert request.on_exit.default is ExitIntent.SUSPEND assert request.idempotency_key == "workflow-run-1:node-execution-1" assert request.metadata == {"workflow_id": "workflow-1", "node_id": "node-1"} @@ -99,9 +109,10 @@ def test_request_builder_sets_model_and_output_layer_contract_ids(): layers = {layer.name: layer for layer in request.composition.layers} assert layers[DIFY_EXECUTION_CONTEXT_LAYER_ID].type == DIFY_EXECUTION_CONTEXT_LAYER_TYPE_ID - assert layers[DIFY_EXECUTION_CONTEXT_LAYER_ID].config.user_id == "user-1" + assert cast(DifyExecutionContextLayerConfig, layers[DIFY_EXECUTION_CONTEXT_LAYER_ID].config).user_id == "user-1" + assert layers[DIFY_AGENT_HISTORY_LAYER_ID].type == PYDANTIC_AI_HISTORY_LAYER_TYPE_ID assert layers[DIFY_AGENT_MODEL_LAYER_ID].type == DIFY_PLUGIN_LLM_LAYER_TYPE_ID - assert layers[DIFY_AGENT_MODEL_LAYER_ID].config.plugin_id == "langgenius/openai" + assert cast(DifyPluginLLMLayerConfig, layers[DIFY_AGENT_MODEL_LAYER_ID].config).plugin_id == "langgenius/openai" assert layers[DIFY_AGENT_MODEL_LAYER_ID].deps == {"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID} assert layers[DIFY_AGENT_OUTPUT_LAYER_ID].type == DIFY_OUTPUT_LAYER_TYPE_ID @@ -130,16 +141,92 @@ def test_request_builder_adds_dify_plugin_tools_layer_when_configured(): assert layers[DIFY_PLUGIN_TOOLS_LAYER_ID].type == DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID assert layers[DIFY_PLUGIN_TOOLS_LAYER_ID].deps == {"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID} - assert layers[DIFY_PLUGIN_TOOLS_LAYER_ID].config.tools[0].tool_name == "current_time" + tools_config = cast(DifyPluginToolsLayerConfig, layers[DIFY_PLUGIN_TOOLS_LAYER_ID].config) + assert tools_config.tools[0].tool_name == "current_time" -def test_request_builder_can_suspend_on_exit_for_resume_or_babysit_paths(): +def test_request_builder_can_delete_on_exit_for_cleanup_paths(): run_input = _run_input() - run_input.suspend_on_exit = True + run_input.suspend_on_exit = False request = AgentBackendRunRequestBuilder().build_for_workflow_node(run_input) - assert request.on_exit.default is ExitIntent.SUSPEND + assert request.on_exit.default is ExitIntent.DELETE + + +def test_request_builder_builds_cleanup_request_replays_persisted_layer_specs(): + """The cleanup request must replay the persisted (non-plugin) layer specs + and filter the snapshot to match so the agenton compositor's + snapshot-vs-composition name-order validator passes.""" + session_snapshot = CompositorSessionSnapshot( + layers=[ + LayerSessionSnapshot(name="history", lifecycle_state=LifecycleState.SUSPENDED, runtime_state={"k": 1}), + LayerSessionSnapshot(name="llm", lifecycle_state=LifecycleState.SUSPENDED, runtime_state={}), + ] + ) + specs = [CleanupLayerSpec(name="history", type="pydantic_ai.history")] + + request = AgentBackendRunRequestBuilder().build_cleanup_request( + session_snapshot=session_snapshot, + composition_layer_specs=specs, + idempotency_key="run-1:node-1:binding-1:agent-session-cleanup", + metadata={"workflow_run_id": "run-1"}, + ) + + assert [layer.name for layer in request.composition.layers] == ["history"] + assert request.session_snapshot is not None + assert [layer.name for layer in request.session_snapshot.layers] == ["history"] + assert request.on_exit.default is ExitIntent.DELETE + assert request.idempotency_key == "run-1:node-1:binding-1:agent-session-cleanup" + assert request.metadata["agent_backend_lifecycle"] == "session_cleanup" + + +def test_request_builder_rejects_empty_composition_layer_specs(): + """Empty specs would put us back in the original ``layers=[]`` trap that + fails on agenton's snapshot-vs-composition validation.""" + with pytest.raises(ValueError, match="composition_layer_specs"): + AgentBackendRunRequestBuilder().build_cleanup_request( + session_snapshot=CompositorSessionSnapshot(layers=[]), + composition_layer_specs=[], + ) + + +def test_extract_cleanup_layer_specs_drops_plugin_layers_keeps_configs(): + from dify_agent.protocol import RunComposition, RunLayerSpec + + from clients.agent_backend import extract_cleanup_layer_specs + + composition = RunComposition( + layers=[ + RunLayerSpec( + name="agent_soul_prompt", + type="plain.prompt", + config=PromptLayerConfig(prefix="hello"), + ), + RunLayerSpec( + name="llm", + type="dify.plugin.llm", + config=None, # protocol allows None; the redacted config is what matters + ), + RunLayerSpec( + name="tools", + type="dify.plugin.tools", + ), + RunLayerSpec( + name="history", + type="pydantic_ai.history", + ), + ] + ) + + specs = extract_cleanup_layer_specs(composition) + + assert [spec.name for spec in specs] == ["agent_soul_prompt", "history"] + # Non-plugin configs are dumped as JSON-compatible dicts so the persisted + # row can be replayed without holding live pydantic instances. + soul_config = specs[0].config + assert isinstance(soul_config, dict) + assert soul_config.get("prefix") == "hello" def test_request_builder_rejects_blank_prompts(): @@ -159,6 +246,6 @@ def test_request_builder_rejects_blank_prompts(): def test_redact_for_agent_backend_log_hides_credentials(): request = AgentBackendRunRequestBuilder().build_for_workflow_node(_run_input()) - redacted = redact_for_agent_backend_log(request) + redacted = cast(dict[str, Any], redact_for_agent_backend_log(request)) - assert redacted["composition"]["layers"][4]["config"]["credentials"] == "[REDACTED]" + assert redacted["composition"]["layers"][5]["config"]["credentials"] == "[REDACTED]" diff --git a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_agent_node.py b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_agent_node.py index b182b94161..6b1f737c96 100644 --- a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_agent_node.py +++ b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_agent_node.py @@ -1,9 +1,12 @@ from types import SimpleNamespace from typing import cast +from agenton.compositor import CompositorSessionSnapshot + from clients.agent_backend import ( AgentBackendRunEventAdapter, AgentBackendStreamInternalEvent, + CleanupLayerSpec, FakeAgentBackendRunClient, FakeAgentBackendScenario, ) @@ -13,9 +16,10 @@ from core.workflow.nodes.agent_v2.binding_resolver import WorkflowAgentBindingBu from core.workflow.nodes.agent_v2.entities import DifyAgentNodeData from core.workflow.nodes.agent_v2.output_adapter import WorkflowAgentOutputAdapter from core.workflow.nodes.agent_v2.runtime_request_builder import WorkflowAgentRuntimeRequestBuilder +from core.workflow.nodes.agent_v2.session_store import WorkflowAgentRuntimeSessionStore, WorkflowAgentSessionScope from graphon.entities import GraphInitParams from graphon.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus -from graphon.node_events import StreamCompletedEvent +from graphon.node_events import PauseRequestedEvent, StreamCompletedEvent from graphon.runtime import GraphRuntimeState from graphon.variables.segments import StringSegment from models.agent import Agent, AgentConfigSnapshot, WorkflowAgentNodeBinding @@ -84,7 +88,47 @@ class FakeBindingResolver(WorkflowAgentBindingResolver): return WorkflowAgentBindingBundle(binding=self.binding, agent=self.agent, snapshot=self.snapshot) -def _node(*, scenario: FakeAgentBackendScenario = FakeAgentBackendScenario.SUCCESS) -> DifyAgentNode: +class FakeSessionStore: + def __init__(self, snapshot: CompositorSessionSnapshot | None = None) -> None: + self.loaded_snapshot = snapshot + self.saved: list[ + tuple[ + WorkflowAgentSessionScope, + str, + CompositorSessionSnapshot | None, + list[CleanupLayerSpec], + ] + ] = [] + self.cleaned: list[tuple[WorkflowAgentSessionScope, str | None]] = [] + + def load_active_snapshot(self, scope: WorkflowAgentSessionScope) -> CompositorSessionSnapshot | None: + return self.loaded_snapshot + + def save_active_snapshot( + self, + *, + scope: WorkflowAgentSessionScope, + backend_run_id: str, + snapshot: CompositorSessionSnapshot | None, + composition_layer_specs: list[CleanupLayerSpec], + ) -> None: + self.saved.append((scope, backend_run_id, snapshot, list(composition_layer_specs))) + + def mark_cleaned( + self, + *, + scope: WorkflowAgentSessionScope, + backend_run_id: str | None = None, + ) -> None: + self.cleaned.append((scope, backend_run_id)) + + +def _node( + *, + scenario: FakeAgentBackendScenario = FakeAgentBackendScenario.SUCCESS, + agent_backend_client: FakeAgentBackendRunClient | None = None, + session_store: FakeSessionStore | None = None, +) -> DifyAgentNode: graph_init_params = GraphInitParams( workflow_id="workflow-1", graph_config={"nodes": [], "edges": []}, @@ -106,6 +150,7 @@ def _node(*, scenario: FakeAgentBackendScenario = FakeAgentBackendScenario.SUCCE def is_owned_by_tenant(self, *, file_id: str, tenant_id: str) -> bool: return True + client = agent_backend_client or FakeAgentBackendRunClient(scenario=scenario) return DifyAgentNode( node_id="agent-node", data=DifyAgentNodeData.model_validate({"type": BuiltinNodeTypes.AGENT, "version": "2"}), @@ -113,11 +158,12 @@ def _node(*, scenario: FakeAgentBackendScenario = FakeAgentBackendScenario.SUCCE graph_runtime_state=cast(GraphRuntimeState, SimpleNamespace(variable_pool=FakeVariablePool())), binding_resolver=FakeBindingResolver(), runtime_request_builder=WorkflowAgentRuntimeRequestBuilder(credentials_provider=FakeCredentialsProvider()), - agent_backend_client=FakeAgentBackendRunClient(scenario=scenario), + agent_backend_client=client, event_adapter=AgentBackendRunEventAdapter(), output_adapter=WorkflowAgentOutputAdapter(), type_checker=PerOutputTypeChecker(file_validator=_AlwaysAllowFileValidator()), failure_orchestrator=OutputFailureOrchestrator(), + session_store=cast(WorkflowAgentRuntimeSessionStore | None, session_store), ) @@ -132,7 +178,7 @@ def test_agent_node_run_maps_successful_agent_backend_run_to_node_result(): assert agent_log["agent_backend"]["run_id"] == "fake-run-1" assert agent_log["agent_backend"]["status"] == "succeeded" assert result.process_data["agent_id"] == "agent-1" - assert result.inputs["agent_backend_request"]["composition"]["layers"][4]["config"]["credentials"] == "[REDACTED]" + assert result.inputs["agent_backend_request"]["composition"]["layers"][5]["config"]["credentials"] == "[REDACTED]" def test_agent_node_run_maps_failed_agent_backend_run_to_node_result(): @@ -145,6 +191,126 @@ def test_agent_node_run_maps_failed_agent_backend_run_to_node_result(): assert result.error_type == "unit_test" +def test_agent_node_failed_run_marks_session_cleaned_to_prevent_stale_reuse(): + """A failed agent run must retire the local ACTIVE session row so a workflow + loop back into the same Agent node does not resume from a stale snapshot.""" + existing_snapshot = CompositorSessionSnapshot(layers=[]) + store = FakeSessionStore(snapshot=existing_snapshot) + + events = list(_node(scenario=FakeAgentBackendScenario.FAILED, session_store=store)._run()) + + assert len(events) == 1 + assert store.cleaned, "failed agent run should mark the session cleaned" + cleaned_scope, cleaned_backend_run_id = store.cleaned[0] + assert cleaned_scope.workflow_run_id == "workflow-run-1" + assert cleaned_backend_run_id == "fake-run-1" + # A failed run does not produce a fresh snapshot to persist. + assert store.saved == [] + + +def test_agent_node_saves_success_snapshot_and_reuses_existing_snapshot(): + existing_snapshot = CompositorSessionSnapshot(layers=[]) + store = FakeSessionStore(snapshot=existing_snapshot) + client = FakeAgentBackendRunClient() + node = _node(agent_backend_client=client, session_store=store) + + events = list(node._run()) + + assert len(events) == 1 + assert store.saved + scope, backend_run_id, saved_snapshot, saved_specs = store.saved[0] + assert scope.workflow_run_id == "workflow-run-1" + assert backend_run_id == "fake-run-1" + assert saved_snapshot is not None + assert client.request is not None + assert client.request.session_snapshot is existing_snapshot + # Persist enough composition shape to replay a cleanup run; plugin layers + # (which would carry credentials) are intentionally absent. + saved_layer_names = [spec.name for spec in saved_specs] + assert saved_layer_names, "cleanup specs must persist at least the non-plugin layers" + plugin_types = {"dify.plugin.llm", "dify.plugin.tools"} + assert not {spec.type for spec in saved_specs} & plugin_types + + +def test_agent_node_run_when_session_store_save_raises_records_persist_error_in_metadata(): + """A DB-side write failure must not crash the node; it should set + ``session_snapshot_persist_error`` in the agent_backend metadata so the + incident is observable from the workflow_node_executions record.""" + + class _ExplodingSessionStore(FakeSessionStore): + def save_active_snapshot(self, **kwargs): # type: ignore[override] + del kwargs + raise RuntimeError("simulated DB failure") + + store = _ExplodingSessionStore() + events = list(_node(session_store=store)._run()) + + assert len(events) == 1 + result = cast(StreamCompletedEvent, events[0]).node_run_result + assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED + agent_backend = result.metadata[WorkflowNodeExecutionMetadataKey.AGENT_LOG]["agent_backend"] + assert agent_backend["session_snapshot_persisted"] is False + assert agent_backend["session_snapshot_persist_error"] == "workflow_agent_runtime_session_store_error" + + +def test_agent_node_failed_run_when_mark_cleaned_raises_records_cleanup_error_in_metadata(): + """Same defensive pattern: a DB-side mark_cleaned failure must surface as + a ``session_snapshot_cleanup_error`` in metadata, not as a node crash.""" + + class _ExplodingMarkCleanedStore(FakeSessionStore): + def mark_cleaned(self, **kwargs): # type: ignore[override] + del kwargs + raise RuntimeError("simulated DB failure") + + store = _ExplodingMarkCleanedStore() + events = list(_node(scenario=FakeAgentBackendScenario.FAILED, session_store=store)._run()) + + assert len(events) == 1 + result = cast(StreamCompletedEvent, events[0]).node_run_result + assert result.status == WorkflowNodeExecutionStatus.FAILED + agent_backend = result.metadata[WorkflowNodeExecutionMetadataKey.AGENT_LOG]["agent_backend"] + assert agent_backend["session_snapshot_cleaned_on_failure"] is False + assert agent_backend["session_snapshot_cleanup_error"] == "workflow_agent_runtime_session_store_error" + + +def test_agent_node_success_run_without_session_store_skips_persistence(): + """When ``session_store`` is None the node still completes successfully — + the lifecycle branch is a no-op and the run result is unaffected.""" + events = list(_node(session_store=None)._run()) + + assert len(events) == 1 + result = cast(StreamCompletedEvent, events[0]).node_run_result + assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED + agent_backend = result.metadata[WorkflowNodeExecutionMetadataKey.AGENT_LOG]["agent_backend"] + # No persistence metadata is attached when the store is missing. + assert "session_snapshot_persisted" not in agent_backend + + +def test_agent_node_failed_run_without_session_store_skips_mark_cleaned(): + """``session_store=None`` + failed terminal must remain a no-op for + the cleanup branch — the node failure path still surfaces correctly.""" + events = list(_node(scenario=FakeAgentBackendScenario.FAILED, session_store=None)._run()) + + assert len(events) == 1 + result = cast(StreamCompletedEvent, events[0]).node_run_result + assert result.status == WorkflowNodeExecutionStatus.FAILED + agent_backend = result.metadata[WorkflowNodeExecutionMetadataKey.AGENT_LOG]["agent_backend"] + assert "session_snapshot_cleaned_on_failure" not in agent_backend + + +def test_agent_node_paused_run_requests_workflow_pause_and_persists_snapshot(): + store = FakeSessionStore() + node = _node(scenario=FakeAgentBackendScenario.PAUSED, session_store=store) + + events = list(node._run()) + + assert len(events) == 1 + assert isinstance(events[0], PauseRequestedEvent) + assert store.saved + assert store.saved[0][1] == "fake-run-1" + assert store.saved[0][3], "paused agent run should still persist replayable layer specs" + + def test_agent_node_records_stream_usage_metadata(): metadata = {"agent_backend": {"run_id": "run-1"}} diff --git a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_runtime_request_builder.py b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_runtime_request_builder.py index 48ae0d46f2..b2f9e99c1b 100644 --- a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_runtime_request_builder.py +++ b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_runtime_request_builder.py @@ -1,10 +1,14 @@ from dataclasses import replace +from typing import cast import pytest +from agenton.compositor import CompositorSessionSnapshot from dify_agent.layers.dify_plugin import DifyPluginToolConfig, DifyPluginToolsLayerConfig +from dify_agent.protocol import DIFY_AGENT_HISTORY_LAYER_ID, DIFY_AGENT_MODEL_LAYER_ID from clients.agent_backend import DIFY_EXECUTION_CONTEXT_LAYER_ID, DIFY_PLUGIN_TOOLS_LAYER_ID from core.app.entities.app_invoke_entities import DifyRunContext, InvokeFrom, UserFrom +from core.workflow.nodes.agent_v2.plugin_tools_builder import WorkflowAgentPluginToolsBuilder from core.workflow.nodes.agent_v2.runtime_request_builder import ( WorkflowAgentRuntimeBuildContext, WorkflowAgentRuntimeRequestBuilder, @@ -27,6 +31,17 @@ class FakeCredentialsProvider: return {"api_key": "secret-key"} +class CapturingCredentialsProvider: + def __init__(self) -> None: + self.provider_name: str | None = None + self.model_name: str | None = None + + def fetch(self, provider_name: str, model_name: str) -> dict[str, object]: + self.provider_name = provider_name + self.model_name = model_name + return {"api_key": "secret-key"} + + class FakePluginToolsBuilder: def __init__(self) -> None: # Capture the runtime invocation source so tests can assert it was @@ -136,7 +151,31 @@ def test_builds_create_run_request_from_agent_soul_and_node_job(): assert dumped["composition"]["layers"][1]["config"]["prefix"] == "Use the previous output." assert "Previous result" in dumped["composition"]["layers"][2]["config"]["user"] assert dumped["composition"]["layers"][-1]["config"]["json_schema"]["properties"]["summary"]["type"] == "string" - assert result.redacted_request["composition"]["layers"][4]["config"]["credentials"] == "[REDACTED]" + assert DIFY_AGENT_HISTORY_LAYER_ID in layers + assert result.redacted_request["composition"]["layers"][5]["config"]["credentials"] == "[REDACTED]" + + +def test_normalizes_langgenius_model_provider_for_agent_backend_transport(): + context = _context() + context.snapshot.config_snapshot = AgentSoulConfig( + prompt={"system_prompt": "You are careful."}, + model=AgentSoulModelConfig( + plugin_id="langgenius/openai/openai", + model_provider="langgenius/openai/openai", + model="gpt-test", + ), + ) + credentials_provider = CapturingCredentialsProvider() + + result = WorkflowAgentRuntimeRequestBuilder(credentials_provider=credentials_provider).build(context) + + dumped = result.request.model_dump(mode="json") + layers = {layer["name"]: layer for layer in dumped["composition"]["layers"]} + model_config = layers[DIFY_AGENT_MODEL_LAYER_ID]["config"] + assert credentials_provider.provider_name == "langgenius/openai/openai" + assert credentials_provider.model_name == "gpt-test" + assert model_config["plugin_id"] == "langgenius/openai" + assert model_config["model_provider"] == "openai" def test_builds_workflow_run_request_with_file_output_schema_and_reserved_metadata(): @@ -187,7 +226,7 @@ def test_builds_workflow_run_request_with_file_output_schema_and_reserved_metada assert output_schema["properties"]["report"]["properties"]["file_id"]["type"] == "string" assert output_schema["properties"]["confidence"]["type"] == "number" assert output_schema["required"] == ["report"] - assert dumped["composition"]["layers"][4]["config"]["model_settings"] == {"temperature": 0.2} + assert dumped["composition"]["layers"][5]["config"]["model_settings"] == {"temperature": 0.2} assert result.metadata["runtime_support"]["reserved_status"]["tools.dify_tools"] == "supported_when_config_valid" assert result.metadata["runtime_support"]["reserved_status"]["tools.cli_tools"] == "reserved_not_executed" warnings = result.metadata["runtime_support"]["unsupported_runtime_warnings"] @@ -224,7 +263,7 @@ def test_builds_workflow_run_request_with_dify_plugin_tools_layer(): plugin_tools_builder = FakePluginToolsBuilder() result = WorkflowAgentRuntimeRequestBuilder( credentials_provider=FakeCredentialsProvider(), - plugin_tools_builder=plugin_tools_builder, + plugin_tools_builder=cast(WorkflowAgentPluginToolsBuilder, plugin_tools_builder), ).build(context) dumped = result.request.model_dump(mode="json") @@ -244,6 +283,15 @@ def test_builds_workflow_run_request_with_dify_plugin_tools_layer(): assert plugin_tools_builder.last_invoke_from == context.dify_context.invoke_from +def test_build_passes_saved_session_snapshot_to_agent_backend_request(): + session_snapshot = CompositorSessionSnapshot(layers=[]) + context = replace(_context(), session_snapshot=session_snapshot) + + result = WorkflowAgentRuntimeRequestBuilder(credentials_provider=FakeCredentialsProvider()).build(context) + + assert result.request.session_snapshot is session_snapshot + + def test_requires_agent_soul_model_config(): context = _context() snapshot = AgentConfigSnapshot( diff --git a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_session_cleanup_layer.py b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_session_cleanup_layer.py new file mode 100644 index 0000000000..11e77f43ca --- /dev/null +++ b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_session_cleanup_layer.py @@ -0,0 +1,412 @@ +from datetime import UTC +from typing import cast + +import pytest +from agenton.compositor import CompositorSessionSnapshot +from agenton.compositor.schemas import LayerSessionSnapshot +from agenton.layers.base import LifecycleState +from dify_agent.protocol import CancelRunRequest, RunEvent, RunStatusResponse + +from clients.agent_backend import AgentBackendRunRequestBuilder, CleanupLayerSpec, FakeAgentBackendRunClient +from clients.agent_backend.errors import AgentBackendHTTPError +from core.workflow.nodes.agent_v2.session_cleanup_layer import WorkflowAgentSessionCleanupLayer +from core.workflow.nodes.agent_v2.session_store import ( + StoredWorkflowAgentSession, + WorkflowAgentRuntimeSessionStore, + WorkflowAgentSessionScope, +) +from core.workflow.system_variables import build_system_variables +from graphon.entities.pause_reason import SchedulingPause +from graphon.graph_engine.command_channels import CommandChannel +from graphon.graph_events import ( + GraphRunAbortedEvent, + GraphRunFailedEvent, + GraphRunPartialSucceededEvent, + GraphRunPausedEvent, + GraphRunStartedEvent, + GraphRunSucceededEvent, +) +from graphon.runtime import GraphRuntimeState, ReadOnlyGraphRuntimeStateWrapper, VariablePool + + +def _layer_snapshot(name: str) -> LayerSessionSnapshot: + return LayerSessionSnapshot( + name=name, + lifecycle_state=LifecycleState.SUSPENDED, + runtime_state={}, + ) + + +def _stored_session(scope: WorkflowAgentSessionScope, *, index: int = 1) -> StoredWorkflowAgentSession: + """A typical stored session with prompt + execution_context + history + llm specs. + + The LLM layer is *not* in ``composition_layer_specs`` because the cleanup + contract excludes credential-bearing plugin layers, but it *is* present in + the saved snapshot so the layer's filter logic gets exercised. + """ + return StoredWorkflowAgentSession( + scope=scope, + session_snapshot=CompositorSessionSnapshot( + layers=[ + _layer_snapshot("workflow_node_job_prompt"), + _layer_snapshot("execution_context"), + _layer_snapshot("history"), + _layer_snapshot("llm"), + ] + ), + backend_run_id=f"agent-run-{index}", + composition_layer_specs=[ + CleanupLayerSpec(name="workflow_node_job_prompt", type="plain.prompt", config={"prefix": "ok"}), + CleanupLayerSpec(name="execution_context", type="dify.execution_context", config={"tenant_id": "t"}), + CleanupLayerSpec(name="history", type="pydantic_ai.history"), + ], + ) + + +class FakeSessionStore: + """In-memory stand-in for ``WorkflowAgentRuntimeSessionStore``.""" + + def __init__(self, *, stored: list[StoredWorkflowAgentSession] | None = None) -> None: + self._stored = stored if stored is not None else [_stored_session(_default_scope())] + self.list_calls: list[str] = [] + self.cleaned: list[tuple[WorkflowAgentSessionScope, str | None]] = [] + + def list_active_sessions(self, *, workflow_run_id: str) -> list[StoredWorkflowAgentSession]: + self.list_calls.append(workflow_run_id) + return list(self._stored) + + def mark_cleaned(self, *, scope: WorkflowAgentSessionScope, backend_run_id: str | None = None) -> None: + self.cleaned.append((scope, backend_run_id)) + + +def _default_scope() -> WorkflowAgentSessionScope: + return WorkflowAgentSessionScope( + tenant_id="tenant-1", + app_id="app-1", + workflow_id="workflow-1", + workflow_run_id="workflow-run-1", + node_id="agent-node", + node_execution_id="node-exec-1", + binding_id="binding-1", + agent_id="agent-1", + agent_config_snapshot_id="snapshot-1", + ) + + +class _WaitableFakeAgentBackendRunClient(FakeAgentBackendRunClient): + """``FakeAgentBackendRunClient`` plus the ``wait_run`` hook the layer needs.""" + + def __init__( + self, + *, + run_id: str = "cleanup-run-1", + wait_status: str = "succeeded", + wait_error: str | None = None, + wait_raises: Exception | None = None, + ) -> None: + super().__init__(run_id=run_id) + self._wait_status = wait_status + self._wait_error = wait_error + self._wait_raises = wait_raises + self.wait_calls: list[tuple[str, float | None]] = [] + + def wait_run(self, run_id: str, *, timeout_seconds: float | None = None) -> RunStatusResponse: + self.wait_calls.append((run_id, timeout_seconds)) + if self._wait_raises is not None: + raise self._wait_raises + from datetime import datetime + + return RunStatusResponse( + run_id=run_id, + status=cast(object, self._wait_status), # protocol Literal; cast keeps tests flexible + created_at=datetime(2026, 1, 1, tzinfo=UTC), + updated_at=datetime(2026, 1, 1, tzinfo=UTC), + error=self._wait_error, + ) + + # Inherit ``create_run`` from FakeAgentBackendRunClient; the missing protocol + # methods below are stub-only because the cleanup layer never calls them. + def cancel_run(self, run_id: str, request: CancelRunRequest | None = None): # pragma: no cover + del run_id, request + raise NotImplementedError + + def stream_events(self, run_id: str, *, after: str | None = None): # pragma: no cover + del run_id, after + if False: + yield cast(RunEvent, None) + + +def _build_layer( + *, + session_store: FakeSessionStore, + agent_backend_client: _WaitableFakeAgentBackendRunClient, + http_cleanup_supported: bool = True, +) -> WorkflowAgentSessionCleanupLayer: + variable_pool = VariablePool.from_bootstrap( + system_variables=build_system_variables(workflow_execution_id="workflow-run-1"), + user_inputs={}, + conversation_variables=[], + ) + runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=0.0) + layer = WorkflowAgentSessionCleanupLayer( + session_store=cast(WorkflowAgentRuntimeSessionStore, session_store), + request_builder=AgentBackendRunRequestBuilder(), + agent_backend_client=agent_backend_client, + ) + # Tests opt in to the future HTTP-cleanup branch; the production default + # (False) is exercised by the dedicated tests below. + layer._HTTP_CLEANUP_SUPPORTED = http_cleanup_supported # type: ignore[reportPrivateUsage] + layer.initialize(ReadOnlyGraphRuntimeStateWrapper(runtime_state), cast(CommandChannel, object())) + return layer + + +@pytest.mark.parametrize( + "terminal_event", + [ + GraphRunSucceededEvent(outputs={}), + GraphRunPartialSucceededEvent(exceptions_count=1, outputs={}), + GraphRunFailedEvent(error="boom"), + GraphRunAbortedEvent(reason="user cancelled", outputs={}), + ], + ids=["succeeded", "partial_succeeded", "failed", "aborted"], +) +def test_cleanup_layer_triggers_cleanup_only_run_on_each_terminal_event(terminal_event): + session_store = FakeSessionStore() + agent_backend_client = _WaitableFakeAgentBackendRunClient() + layer = _build_layer(session_store=session_store, agent_backend_client=agent_backend_client) + + layer.on_event(terminal_event) + + assert session_store.list_calls == ["workflow-run-1"] + assert agent_backend_client.request is not None + # Cleanup composition replays the persisted (non-plugin) layer specs so the + # agent backend's snapshot-vs-composition name match succeeds. + layer_names = [layer.name for layer in agent_backend_client.request.composition.layers] + assert layer_names == ["workflow_node_job_prompt", "execution_context", "history"] + assert agent_backend_client.request.on_exit.default.value == "delete" + assert agent_backend_client.request.metadata["agent_backend_lifecycle"] == "session_cleanup" + # Snapshot is filtered to drop the plugin layer entry so names match the + # cleanup composition. + assert agent_backend_client.request.session_snapshot is not None + snapshot_names = [layer.name for layer in agent_backend_client.request.session_snapshot.layers] + assert snapshot_names == ["workflow_node_job_prompt", "execution_context", "history"] + # The layer waited for terminal status and the run succeeded, so the row + # is marked CLEANED with the cleanup run id. + assert agent_backend_client.wait_calls + assert session_store.cleaned == [(_default_scope(), "cleanup-run-1")] + + +@pytest.mark.parametrize( + "non_terminal_event", + [ + GraphRunStartedEvent(), + GraphRunPausedEvent(reasons=[SchedulingPause(message="awaiting human input")], outputs={}), + ], + ids=["started", "paused"], +) +def test_cleanup_layer_ignores_non_terminal_events(non_terminal_event): + session_store = FakeSessionStore() + agent_backend_client = _WaitableFakeAgentBackendRunClient() + layer = _build_layer(session_store=session_store, agent_backend_client=agent_backend_client) + + layer.on_event(non_terminal_event) + + assert session_store.list_calls == [] + assert agent_backend_client.request is None + assert session_store.cleaned == [] + + +def test_cleanup_layer_does_not_mark_cleaned_when_cleanup_run_fails(): + """Trap D: cleanup-only run goes ``run_failed`` (e.g. snapshot validation + error) — the layer must leave the row ACTIVE so it can be retried instead + of silently leaking suspended agent-backend layers.""" + session_store = FakeSessionStore() + agent_backend_client = _WaitableFakeAgentBackendRunClient( + wait_status="failed", + wait_error="snapshot mismatch", + ) + layer = _build_layer(session_store=session_store, agent_backend_client=agent_backend_client) + + layer.on_event(GraphRunSucceededEvent(outputs={})) + + assert agent_backend_client.wait_calls + assert session_store.cleaned == [] + + +def test_cleanup_layer_does_not_mark_cleaned_when_wait_raises(): + session_store = FakeSessionStore() + agent_backend_client = _WaitableFakeAgentBackendRunClient( + wait_raises=AgentBackendHTTPError("boom", status_code=500, detail=None), + ) + layer = _build_layer(session_store=session_store, agent_backend_client=agent_backend_client) + + layer.on_event(GraphRunSucceededEvent(outputs={})) + + assert session_store.cleaned == [] + + +def test_cleanup_layer_marks_cleaned_locally_when_http_cleanup_disabled(): + """Production default: dify-agent has no cleanup-only run mode yet, so the + layer must retire the local row without issuing a doomed HTTP request that + would crash inside the agent backend's runner on the missing LLM layer.""" + session_store = FakeSessionStore() + agent_backend_client = _WaitableFakeAgentBackendRunClient() + layer = _build_layer( + session_store=session_store, + agent_backend_client=agent_backend_client, + http_cleanup_supported=False, + ) + + layer.on_event(GraphRunSucceededEvent(outputs={})) + + # No HTTP call goes out — the trap is avoided entirely. + assert agent_backend_client.request is None + assert agent_backend_client.wait_calls == [] + # Local row is still retired so a workflow loop cannot resume from stale state. + assert session_store.cleaned == [(_default_scope(), "agent-run-1")] + + +def test_cleanup_layer_skips_sessions_without_persisted_specs(): + """Backwards-compatible safety net: a row written before A.1 landed has + no composition_layer_specs, so cleanup would unavoidably hit the snapshot- + validation trap. The layer must skip such rows instead of issuing a + doomed request.""" + scope = _default_scope() + legacy_session = StoredWorkflowAgentSession( + scope=scope, + session_snapshot=CompositorSessionSnapshot(layers=[_layer_snapshot("history")]), + backend_run_id="legacy-run", + composition_layer_specs=[], + ) + session_store = FakeSessionStore(stored=[legacy_session]) + agent_backend_client = _WaitableFakeAgentBackendRunClient() + layer = _build_layer(session_store=session_store, agent_backend_client=agent_backend_client) + + layer.on_event(GraphRunSucceededEvent(outputs={})) + + assert agent_backend_client.request is None + assert session_store.cleaned == [] + + +def test_cleanup_layer_fans_out_to_every_active_session(): + scopes = [ + WorkflowAgentSessionScope( + tenant_id="tenant-1", + app_id="app-1", + workflow_id="workflow-1", + workflow_run_id="workflow-run-1", + node_id=f"agent-node-{i}", + node_execution_id=f"node-exec-{i}", + binding_id=f"binding-{i}", + agent_id=f"agent-{i}", + agent_config_snapshot_id=f"snapshot-{i}", + ) + for i in range(3) + ] + session_store = FakeSessionStore(stored=[_stored_session(scope, index=i) for i, scope in enumerate(scopes, 1)]) + agent_backend_client = _WaitableFakeAgentBackendRunClient(run_id="cleanup-run-many") + layer = _build_layer(session_store=session_store, agent_backend_client=agent_backend_client) + + layer.on_event(GraphRunSucceededEvent(outputs={})) + + # One cleanup row per stored ACTIVE session, all marked cleaned with the + # backend run id returned by the agent backend client. + assert [entry[0] for entry in session_store.cleaned] == scopes + assert {entry[1] for entry in session_store.cleaned} == {"cleanup-run-many"} + + +def test_cleanup_layer_warns_when_http_enabled_but_client_missing(caplog): + """The HTTP cleanup branch must defensively skip when no client was wired. + + This is the deployment-misconfig path: ``_HTTP_CLEANUP_SUPPORTED`` was + flipped to ``True`` but ``AGENT_BACKEND_BASE_URL`` is unset, so the + factory returned ``None``. The layer must not crash and must not silently + retire the row — the warning surfaces the misconfig. + """ + import logging + + session_store = FakeSessionStore() + layer = WorkflowAgentSessionCleanupLayer( + session_store=cast(WorkflowAgentRuntimeSessionStore, session_store), + request_builder=AgentBackendRunRequestBuilder(), + agent_backend_client=None, + ) + layer._HTTP_CLEANUP_SUPPORTED = True # type: ignore[reportPrivateUsage] + variable_pool = VariablePool.from_bootstrap( + system_variables=build_system_variables(workflow_execution_id="workflow-run-1"), + user_inputs={}, + conversation_variables=[], + ) + runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=0.0) + layer.initialize(ReadOnlyGraphRuntimeStateWrapper(runtime_state), cast(CommandChannel, object())) + + with caplog.at_level(logging.WARNING): + layer.on_event(GraphRunSucceededEvent(outputs={})) + + assert session_store.cleaned == [] + assert any("no agent backend client is wired in" in record.message for record in caplog.records) + + +def test_cleanup_layer_skips_workflow_terminal_when_workflow_run_id_missing(caplog): + """``workflow_run_id`` is the keying field; without it the fanout cannot + target a row, so the layer logs a warning and bails.""" + import logging + + session_store = FakeSessionStore() + agent_backend_client = _WaitableFakeAgentBackendRunClient() + layer = WorkflowAgentSessionCleanupLayer( + session_store=cast(WorkflowAgentRuntimeSessionStore, session_store), + request_builder=AgentBackendRunRequestBuilder(), + agent_backend_client=agent_backend_client, + ) + # Bootstrap *without* a workflow_execution_id system variable. + variable_pool = VariablePool.from_bootstrap( + system_variables=build_system_variables(workflow_execution_id=""), + user_inputs={}, + conversation_variables=[], + ) + runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=0.0) + layer.initialize(ReadOnlyGraphRuntimeStateWrapper(runtime_state), cast(CommandChannel, object())) + + with caplog.at_level(logging.WARNING): + layer.on_event(GraphRunSucceededEvent(outputs={})) + + assert session_store.list_calls == [] + assert session_store.cleaned == [] + assert any("workflow_run_id is missing" in record.message for record in caplog.records) + + +def test_build_workflow_agent_session_cleanup_layer_returns_layer_without_client_when_unconfigured( + monkeypatch, +): + """The production builder must pass ``None`` for the agent backend client + when neither AGENT_BACKEND_BASE_URL nor AGENT_BACKEND_USE_FAKE is set, so + that unit-test environments without backend config don't crash at runner + construction.""" + from configs import dify_config + from core.workflow.nodes.agent_v2.session_cleanup_layer import ( + build_workflow_agent_session_cleanup_layer, + ) + + monkeypatch.setattr(dify_config, "AGENT_BACKEND_BASE_URL", None, raising=False) + monkeypatch.setattr(dify_config, "AGENT_BACKEND_USE_FAKE", False, raising=False) + + layer = build_workflow_agent_session_cleanup_layer() + assert layer._agent_backend_client is None # type: ignore[reportPrivateUsage] + + +def test_build_workflow_agent_session_cleanup_layer_returns_layer_with_fake_client(monkeypatch): + """With ``AGENT_BACKEND_USE_FAKE`` enabled the helper wires in the + deterministic fake client without needing a base_url.""" + from clients.agent_backend.fake_client import FakeAgentBackendRunClient + from configs import dify_config + from core.workflow.nodes.agent_v2.session_cleanup_layer import ( + build_workflow_agent_session_cleanup_layer, + ) + + monkeypatch.setattr(dify_config, "AGENT_BACKEND_BASE_URL", None, raising=False) + monkeypatch.setattr(dify_config, "AGENT_BACKEND_USE_FAKE", True, raising=False) + monkeypatch.setattr(dify_config, "AGENT_BACKEND_FAKE_SCENARIO", "success", raising=False) + + layer = build_workflow_agent_session_cleanup_layer() + assert isinstance(layer._agent_backend_client, FakeAgentBackendRunClient) # type: ignore[reportPrivateUsage] diff --git a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_session_store.py b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_session_store.py new file mode 100644 index 0000000000..1c2d0d1301 --- /dev/null +++ b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_session_store.py @@ -0,0 +1,286 @@ +"""Unit tests for :mod:`core.workflow.nodes.agent_v2.session_store`. + +Uses the in-memory SQLite engine configured by the project conftest plus a +per-test ``CREATE TABLE`` so the real ORM round-trip exercises every store +method. Keeps the suite self-contained — no Postgres / Docker required — while +still hitting the actual ``session_factory`` code path that production uses. +""" + +from __future__ import annotations + +from collections.abc import Generator + +import pytest +from agenton.compositor import CompositorSessionSnapshot +from agenton.compositor.schemas import LayerSessionSnapshot +from agenton.layers.base import LifecycleState +from sqlalchemy import delete + +from clients.agent_backend.request_builder import CleanupLayerSpec +from core.db.session_factory import session_factory +from core.workflow.nodes.agent_v2.session_store import ( + StoredWorkflowAgentSession, + WorkflowAgentRuntimeSessionStore, + WorkflowAgentSessionScope, +) +from models.agent import WorkflowAgentRuntimeSession, WorkflowAgentRuntimeSessionStatus + + +def _scope(workflow_run_id: str | None = "wfr-1", binding_id: str = "binding-1") -> WorkflowAgentSessionScope: + return WorkflowAgentSessionScope( + tenant_id="tenant-1", + app_id="app-1", + workflow_id="workflow-1", + workflow_run_id=workflow_run_id, + node_id="agent-node", + node_execution_id="node-exec-1", + binding_id=binding_id, + agent_id="agent-1", + agent_config_snapshot_id="snapshot-1", + ) + + +def _snapshot(messages: int = 1) -> CompositorSessionSnapshot: + return CompositorSessionSnapshot( + layers=[ + LayerSessionSnapshot( + name="history", + lifecycle_state=LifecycleState.SUSPENDED, + runtime_state={"messages": [{"role": "user", "content": f"m{i}"} for i in range(messages)]}, + ) + ] + ) + + +def _specs() -> list[CleanupLayerSpec]: + return [ + CleanupLayerSpec(name="workflow_node_job_prompt", type="plain.prompt", config={"prefix": "ok"}), + CleanupLayerSpec(name="history", type="pydantic_ai.history"), + ] + + +@pytest.fixture(autouse=True) +def _create_table() -> Generator[None, None, None]: + """Create the lifecycle table on the in-memory SQLite engine, drop after.""" + engine = session_factory.get_session_maker().kw["bind"] + WorkflowAgentRuntimeSession.__table__.create(bind=engine, checkfirst=True) + yield + with session_factory.create_session() as session: + session.execute(delete(WorkflowAgentRuntimeSession)) + session.commit() + WorkflowAgentRuntimeSession.__table__.drop(bind=engine, checkfirst=True) + + +def test_load_active_snapshot_returns_none_when_scope_has_no_workflow_run_id(): + """``workflow_run_id`` is the keying column; no row can match without it.""" + store = WorkflowAgentRuntimeSessionStore() + assert store.load_active_snapshot(_scope(workflow_run_id=None)) is None + + +def test_load_active_snapshot_returns_none_when_no_row_matches(): + store = WorkflowAgentRuntimeSessionStore() + assert store.load_active_snapshot(_scope()) is None + + +def test_save_active_snapshot_creates_row_and_load_round_trips(): + store = WorkflowAgentRuntimeSessionStore() + snapshot = _snapshot(messages=2) + store.save_active_snapshot( + scope=_scope(), backend_run_id="run-1", snapshot=snapshot, composition_layer_specs=_specs() + ) + + loaded = store.load_active_snapshot(_scope()) + assert loaded is not None + assert len(loaded.layers) == 1 + assert loaded.layers[0].name == "history" + assert loaded.layers[0].runtime_state["messages"] == snapshot.layers[0].runtime_state["messages"] + + +def test_save_active_snapshot_skips_when_workflow_run_id_missing(): + """Without a workflow_run_id the row cannot be keyed; save is a no-op.""" + store = WorkflowAgentRuntimeSessionStore() + store.save_active_snapshot( + scope=_scope(workflow_run_id=None), + backend_run_id="run-skipped", + snapshot=_snapshot(), + composition_layer_specs=_specs(), + ) + with session_factory.create_session() as session: + assert session.query(WorkflowAgentRuntimeSession).count() == 0 + + +def test_save_active_snapshot_skips_when_snapshot_missing(): + """A run that produced no snapshot (e.g. failed agent run) does not write.""" + store = WorkflowAgentRuntimeSessionStore() + store.save_active_snapshot( + scope=_scope(), + backend_run_id="run-empty", + snapshot=None, + composition_layer_specs=_specs(), + ) + with session_factory.create_session() as session: + assert session.query(WorkflowAgentRuntimeSession).count() == 0 + + +def test_save_active_snapshot_updates_existing_row_on_re_entry(): + """A second save under the same scope must update in place, not insert.""" + store = WorkflowAgentRuntimeSessionStore() + store.save_active_snapshot( + scope=_scope(), + backend_run_id="run-1", + snapshot=_snapshot(messages=1), + composition_layer_specs=_specs(), + ) + # Second call with new snapshot + backend_run_id. + store.save_active_snapshot( + scope=_scope(), + backend_run_id="run-2", + snapshot=_snapshot(messages=2), + composition_layer_specs=_specs(), + ) + + with session_factory.create_session() as session: + rows = session.query(WorkflowAgentRuntimeSession).all() + assert len(rows) == 1 + assert rows[0].backend_run_id == "run-2" + assert rows[0].status == WorkflowAgentRuntimeSessionStatus.ACTIVE + assert rows[0].cleaned_at is None + + +def test_save_active_snapshot_resurrects_cleaned_row(): + """If a prior cleanup retired the row, a re-entry flips it back to ACTIVE.""" + store = WorkflowAgentRuntimeSessionStore() + store.save_active_snapshot( + scope=_scope(), + backend_run_id="run-1", + snapshot=_snapshot(), + composition_layer_specs=_specs(), + ) + store.mark_cleaned(scope=_scope(), backend_run_id="cleanup-1") + # Save again — the existing row was CLEANED; should be revived. + store.save_active_snapshot( + scope=_scope(), + backend_run_id="run-2", + snapshot=_snapshot(messages=3), + composition_layer_specs=_specs(), + ) + + with session_factory.create_session() as session: + rows = session.query(WorkflowAgentRuntimeSession).all() + assert len(rows) == 1 + assert rows[0].status == WorkflowAgentRuntimeSessionStatus.ACTIVE + assert rows[0].cleaned_at is None + assert rows[0].backend_run_id == "run-2" + + +def test_list_active_sessions_returns_specs_and_snapshot(): + store = WorkflowAgentRuntimeSessionStore() + store.save_active_snapshot( + scope=_scope(binding_id="binding-A"), + backend_run_id="run-A", + snapshot=_snapshot(), + composition_layer_specs=_specs(), + ) + store.save_active_snapshot( + scope=_scope(binding_id="binding-B"), + backend_run_id="run-B", + snapshot=_snapshot(messages=2), + composition_layer_specs=_specs(), + ) + + listed = store.list_active_sessions(workflow_run_id="wfr-1") + assert {s.backend_run_id for s in listed} == {"run-A", "run-B"} + by_run = {s.backend_run_id: s for s in listed} + assert isinstance(by_run["run-A"], StoredWorkflowAgentSession) + # Specs round-trip through pydantic TypeAdapter — ensure deserialize works. + assert by_run["run-A"].composition_layer_specs[0].name == "workflow_node_job_prompt" + assert by_run["run-A"].composition_layer_specs[1].type == "pydantic_ai.history" + # node_execution_id default-replaces NULL with "" when the DB column is None. + assert by_run["run-A"].scope.node_execution_id == "node-exec-1" + + +def test_list_active_sessions_skips_cleaned_rows(): + store = WorkflowAgentRuntimeSessionStore() + store.save_active_snapshot( + scope=_scope(binding_id="binding-A"), + backend_run_id="run-A", + snapshot=_snapshot(), + composition_layer_specs=_specs(), + ) + store.save_active_snapshot( + scope=_scope(binding_id="binding-B"), + backend_run_id="run-B", + snapshot=_snapshot(), + composition_layer_specs=_specs(), + ) + store.mark_cleaned(scope=_scope(binding_id="binding-A"), backend_run_id="cleanup-A") + + listed = store.list_active_sessions(workflow_run_id="wfr-1") + assert {s.backend_run_id for s in listed} == {"run-B"} + + +def test_list_active_sessions_handles_legacy_rows_without_specs(): + """Rows persisted before composition_layer_specs landed have an empty string.""" + # Insert a legacy-shape row directly: empty specs payload simulates a row + # written before the spec persistence feature landed in A.1. + store = WorkflowAgentRuntimeSessionStore() + store.save_active_snapshot( + scope=_scope(), + backend_run_id="run-legacy", + snapshot=_snapshot(), + composition_layer_specs=[], + ) + listed = store.list_active_sessions(workflow_run_id="wfr-1") + assert len(listed) == 1 + assert listed[0].composition_layer_specs == [] + + +def test_mark_cleaned_sets_status_and_cleaned_at_with_backend_run_id(): + store = WorkflowAgentRuntimeSessionStore() + store.save_active_snapshot( + scope=_scope(), + backend_run_id="run-1", + snapshot=_snapshot(), + composition_layer_specs=_specs(), + ) + store.mark_cleaned(scope=_scope(), backend_run_id="cleanup-1") + + with session_factory.create_session() as session: + row = session.query(WorkflowAgentRuntimeSession).one() + assert row.status == WorkflowAgentRuntimeSessionStatus.CLEANED + assert row.cleaned_at is not None + assert row.backend_run_id == "cleanup-1" + + +def test_mark_cleaned_preserves_existing_backend_run_id_when_none_given(): + """``backend_run_id=None`` means "leave the previous one in place".""" + store = WorkflowAgentRuntimeSessionStore() + store.save_active_snapshot( + scope=_scope(), + backend_run_id="run-1", + snapshot=_snapshot(), + composition_layer_specs=_specs(), + ) + store.mark_cleaned(scope=_scope(), backend_run_id=None) + + with session_factory.create_session() as session: + row = session.query(WorkflowAgentRuntimeSession).one() + assert row.status == WorkflowAgentRuntimeSessionStatus.CLEANED + assert row.backend_run_id == "run-1" + + +def test_mark_cleaned_is_a_noop_when_no_active_row(): + """No matching ACTIVE row → no-op (already-cleaned rows are not re-touched).""" + store = WorkflowAgentRuntimeSessionStore() + store.mark_cleaned(scope=_scope(), backend_run_id="cleanup-1") + with session_factory.create_session() as session: + assert session.query(WorkflowAgentRuntimeSession).count() == 0 + + +def test_mark_cleaned_is_a_noop_when_workflow_run_id_missing(): + """Without a workflow_run_id we cannot key the row; ignore the call.""" + store = WorkflowAgentRuntimeSessionStore() + store.mark_cleaned(scope=_scope(workflow_run_id=None), backend_run_id="cleanup-1") + # Sanity — no rows created or touched. + with session_factory.create_session() as session: + assert session.query(WorkflowAgentRuntimeSession).count() == 0