diff --git a/dify-agent/docs/agenton/api/index.md b/dify-agent/docs/agenton/api/index.md index aa90c8b9fa..c0a864665a 100644 --- a/dify-agent/docs/agenton/api/index.md +++ b/dify-agent/docs/agenton/api/index.md @@ -29,6 +29,9 @@ Construction and dependency APIs: - `bind_deps(deps: Mapping[str, Layer | None]) -> None`: bind graph dependencies. - `new_control(state=LifecycleState.NEW, runtime_state=None) -> LayerControl`: create a schema-validated per-session control. +- `require_control(control, active=False) -> LayerControl`: validate that a + capability method received this layer's own control with the expected runtime + schemas, optionally requiring `LifecycleState.ACTIVE`. Lifecycle hooks: @@ -37,7 +40,9 @@ Lifecycle hooks: - `on_context_suspend(control)` - `on_context_delete(control)` - `enter(control)` / `lifecycle_enter(control)`: async context manager entry - surface. Override `enter()` only when a layer needs to wrap extra resources. + surface. The base lifecycle owns the per-entry resource stack; override + `enter()` only for unusual wrapping that cannot be expressed as registered + resources. Prompt/tool authoring surfaces: @@ -67,6 +72,10 @@ Methods: - `suspend_on_exit() -> None` - `delete_on_exit() -> None` +- `enter_async_resource(cm) -> T`: enter an async context manager on the current + entry resource stack and return its resource. +- `add_async_cleanup(callback) -> None`: register an async cleanup callback on the + current entry resource stack. - `control_for(dep_layer) -> LayerControl`: resolve the unique dependency control whose resolved target is `dep_layer` in the same session. - `control_for(dep_name, dep_layer) -> LayerControl`: resolve a named dependency @@ -74,7 +83,9 @@ Methods: `runtime_state` is serialized in session snapshots. `runtime_handles` is never serialized and should be rehydrated from runtime state in resume hooks. Private -owner links used by `control_for` are runtime-only and are not snapshotted. +owner links used by `control_for` and the per-entry resource stack are +runtime-only and are not snapshotted. Resource-stack APIs are available only +while a layer entry is being created/resumed, active, or exiting. ### Schema defaults and lifecycle enums diff --git a/dify-agent/docs/agenton/guide/index.md b/dify-agent/docs/agenton/guide/index.md index 89d38f77f0..39b7febfb8 100644 --- a/dify-agent/docs/agenton/guide/index.md +++ b/dify-agent/docs/agenton/guide/index.md @@ -16,7 +16,9 @@ on the `LayerControl` created for that layer in a `CompositorSession`. - **Runtime handles** are live Python objects such as clients, open files, or process handles. Layers declare a Pydantic `runtime_handles_type` with `arbitrary_types_allowed=True`. Handles are never serialized; resume hooks - should rehydrate them from runtime state. + should rehydrate them from runtime state. Register handles that need async + cleanup with the control's entry resource stack rather than closing them + manually in layer instances. ## Define a config-backed layer @@ -49,6 +51,42 @@ Omitted schema slots default to `EmptyLayerConfig`, `EmptyRuntimeState`, and `LayerControl[MyState, MyHandles]` to get static checking and IDE completion for runtime state and handles. +## Live resources + +The base lifecycle creates a resource stack for each `LayerControl` entry before +`on_context_create` or `on_context_resume` runs. 
Enter async resources through the
+control, store the live handle in `runtime_handles`, and clear the handle in
+`on_context_suspend`/`on_context_delete`; the resource stack performs the actual
+close after those hooks and also cleans up if create/resume or the context body
+raises.
+
+```python {test="skip" lint="skip"}
+class ClientHandles(BaseModel):
+    client: httpx.AsyncClient | None = None
+
+    model_config = ConfigDict(arbitrary_types_allowed=True)
+
+
+@dataclass
+class ClientLayer(PlainLayer[NoLayerDeps, EmptyLayerConfig, EmptyRuntimeState, ClientHandles]):
+    async def on_context_create(self, control: LayerControl[EmptyRuntimeState, ClientHandles]) -> None:
+        control.runtime_handles.client = await control.enter_async_resource(httpx.AsyncClient())
+
+    async def on_context_resume(self, control: LayerControl[EmptyRuntimeState, ClientHandles]) -> None:
+        control.runtime_handles.client = await control.enter_async_resource(httpx.AsyncClient())
+
+    async def on_context_suspend(self, control: LayerControl[EmptyRuntimeState, ClientHandles]) -> None:
+        control.runtime_handles.client = None
+
+    async def on_context_delete(self, control: LayerControl[EmptyRuntimeState, ClientHandles]) -> None:
+        control.runtime_handles.client = None
+
+    def make_client_user(self, control: LayerControl) -> ClientUser:
+        control = self.require_control(control, active=True)
+        if control.runtime_handles.client is None:
+            raise RuntimeError("client is not available")
+        return ClientUser(control.runtime_handles.client)
+```
+
+`Layer.require_control(control, active=True)` is the recommended first line for
+capability methods that read runtime state or handles. It verifies that callers
+passed this layer's own control from the current session and, when requested, that
+the control is active.
+
 ## Register layers and build a compositor
 
 Register config-constructible layers manually:
diff --git a/dify-agent/docs/dify-agent/api/index.md b/dify-agent/docs/dify-agent/api/index.md
index a03f2e6eaa..0ceea8b9c3 100644
--- a/dify-agent/docs/dify-agent/api/index.md
+++ b/dify-agent/docs/dify-agent/api/index.md
@@ -14,7 +14,9 @@ server-only and should not be used by API consumers.
 
 Create-run requests accept a `CompositorConfig` and an optional
 `CompositorSessionSnapshot`. There is **no top-level `user_prompt` or model
 profile field**. User input and model/provider selection are supplied by Agenton
-layers. In the MVP server, the safe config-constructible layer registry includes
+layers. `layer_exit_signals` optionally controls whether layers suspend or delete
+when the run leaves the active session; the default is suspend for all layers. In
+the MVP server, the safe config-constructible layer registry includes
 `plain.prompt`, `dify.plugin`, and `dify.plugin.llm`. The runtime reads the LLM
 model layer named by `DIFY_AGENT_MODEL_LAYER_ID`, whose public value is `"llm"`.
@@ -62,7 +64,7 @@ Request:
           "plugin": "plugin"
         },
         "config": {
-          "provider": "openai",
+          "model_provider": "openai",
           "model": "gpt-4o-mini",
           "credentials": {
             "api_key": "replace-with-provider-key"
@@ -74,7 +76,13 @@ Request:
       }
     ]
   },
-  "session_snapshot": null
+  "session_snapshot": null,
+  "layer_exit_signals": {
+    "default": "suspend",
+    "layers": {
+      "prompt": "delete"
+    }
+  }
 }
 ```
@@ -94,7 +102,8 @@ event streams expire after `DIFY_AGENT_RUN_RETENTION_SECONDS`, which defaults to
 
 `dify.plugin` receives tenant/plugin identity only; daemon URL, API key, and
 timeout are server settings. `dify.plugin.llm.credentials` accepts scalar values
-only (`string`, `number`, `boolean`, or `null`).
+only (`string`, `number`, `boolean`, or `null`). Unknown
+`layer_exit_signals.layers` keys return `422` before a run record is created.
 
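+For instance, this `layer_exit_signals` fragment references a layer id that is
+not defined in the submitted compositor (a hypothetical `prompts` typo for
+`prompt`) and is rejected up front:
+
+```json
+{
+  "layer_exit_signals": {
+    "default": "suspend",
+    "layers": {
+      "prompts": "delete"
+    }
+  }
+}
+```
+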
Validation error example (`422`): @@ -191,7 +200,7 @@ from agenton.compositor import CompositorConfig, LayerNodeConfig from agenton_collections.layers.plain import PromptLayerConfig from dify_agent.client import Client from dify_agent.layers.dify_plugin import DifyPluginLLMLayerConfig, DifyPluginLayerConfig -from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest +from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest, LayerExitSignals async def main() -> None: @@ -209,13 +218,14 @@ async def main() -> None: type="dify.plugin.llm", deps={"plugin": "plugin"}, config=DifyPluginLLMLayerConfig( - provider="openai", + model_provider="openai", model="gpt-4o-mini", credentials={"api_key": "provider-key"}, ), ), ] - ) + ), + layer_exit_signals=LayerExitSignals(layers={"prompt": "delete"}), ) async with Client(base_url="http://localhost:8000") as client: run = await client.create_run(request) @@ -245,7 +255,7 @@ request = CreateRunRequest( type="dify.plugin.llm", deps={"plugin": "plugin"}, config=DifyPluginLLMLayerConfig( - provider="openai", + model_provider="openai", model="gpt-4o-mini", credentials={"api_key": "provider-key"}, ), diff --git a/dify-agent/examples/dify_agent/dify_agent_examples/run_pydantic_ai_agent.py b/dify-agent/examples/dify_agent/dify_agent_examples/run_pydantic_ai_agent.py index 4df6b1fa77..2783b7f860 100644 --- a/dify-agent/examples/dify_agent/dify_agent_examples/run_pydantic_ai_agent.py +++ b/dify-agent/examples/dify_agent/dify_agent_examples/run_pydantic_ai_agent.py @@ -61,10 +61,10 @@ async def main() -> None: DifyPluginDaemonProvider( tenant_id=required_env("DIFY_AGENT_TENANT_ID"), plugin_id=required_env("DIFY_AGENT_PLUGIN_ID"), - plugin_provider=required_env("DIFY_AGENT_PROVIDER"), plugin_daemon_url=required_env("PLUGIN_DAEMON_URL"), plugin_daemon_api_key=required_env("PLUGIN_DAEMON_KEY"), ), + model_provider=required_env("DIFY_AGENT_PROVIDER"), credentials=load_credentials(), ) agent = Agent(model=model) diff --git a/dify-agent/examples/dify_agent/dify_agent_examples/run_server_consumer.py b/dify-agent/examples/dify_agent/dify_agent_examples/run_server_consumer.py index 694fa09e7f..33e1c11da5 100644 --- a/dify-agent/examples/dify_agent/dify_agent_examples/run_server_consumer.py +++ b/dify-agent/examples/dify_agent/dify_agent_examples/run_server_consumer.py @@ -56,7 +56,7 @@ async def main() -> None: type="dify.plugin.llm", deps={"plugin": "plugin"}, config=DifyPluginLLMLayerConfig( - provider=PLUGIN_PROVIDER, + model_provider=PLUGIN_PROVIDER, model=MODEL_NAME, credentials=MODEL_CREDENTIALS, ), diff --git a/dify-agent/examples/dify_agent/dify_agent_examples/run_server_sync_client.py b/dify-agent/examples/dify_agent/dify_agent_examples/run_server_sync_client.py index 92678a7713..d00bb4dc76 100644 --- a/dify-agent/examples/dify_agent/dify_agent_examples/run_server_sync_client.py +++ b/dify-agent/examples/dify_agent/dify_agent_examples/run_server_sync_client.py @@ -48,7 +48,7 @@ def main() -> None: type="dify.plugin.llm", deps={"plugin": "plugin"}, config=DifyPluginLLMLayerConfig( - provider=PLUGIN_PROVIDER, + model_provider=PLUGIN_PROVIDER, model=MODEL_NAME, credentials=MODEL_CREDENTIALS, ), diff --git a/dify-agent/src/agenton/compositor/__init__.py b/dify-agent/src/agenton/compositor/__init__.py index 72588758a0..aed487332c 100644 --- a/dify-agent/src/agenton/compositor/__init__.py +++ b/dify-agent/src/agenton/compositor/__init__.py @@ -264,6 +264,14 @@ class CompositorSession: return 
self._control_for_unique_dependency(owner_layer_id, dep_layer) return self._control_for_named_dependency(owner_layer_id, dep_name, dep_layer) + def _layer_for_control_owner(self, owner_layer_id: str) -> Layer[Any, Any, Any, Any, Any, Any, Any]: + """Return the layer instance that owns a control in this session.""" + compositor = self._require_owner_compositor() + try: + return compositor.layers[owner_layer_id] + except KeyError as e: + raise KeyError(f"Layer '{owner_layer_id}' is not defined in this compositor.") from e + def _control_for_unique_dependency( self, owner_layer_id: str, diff --git a/dify-agent/src/agenton/layers/base.py b/dify-agent/src/agenton/layers/base.py index 8fbac67a76..7ec761636c 100644 --- a/dify-agent/src/agenton/layers/base.py +++ b/dify-agent/src/agenton/layers/base.py @@ -28,7 +28,10 @@ resets to delete on every successful enter. Layer instances are shared graph and capability definitions, so session-local serializable ids, checkpoints, and other snapshot data belong in ``LayerControl.runtime_state``; live clients, connections, and process handles belong in ``LayerControl.runtime_handles``. -Neither category should be stored on ``self`` when it is session-local. +Neither category should be stored on ``self`` when it is session-local. Live +resources that need deterministic cleanup should be registered on the control's +per-entry resource stack; the base lifecycle closes that stack after suspend and +delete hooks, and also when create/resume fails. ``Layer`` is framework-neutral over system prompt, user prompt, and tool item types. The native ``prefix_prompts``, ``suffix_prompts``, ``user_prompts``, and @@ -39,8 +42,8 @@ native values without changing layer implementations. """ from abc import ABC, abstractmethod -from collections.abc import AsyncIterator -from contextlib import AbstractAsyncContextManager, asynccontextmanager +from collections.abc import AsyncIterator, Awaitable, Callable +from contextlib import AbstractAsyncContextManager, AsyncExitStack, asynccontextmanager from dataclasses import dataclass, field from enum import StrEnum from types import UnionType @@ -89,6 +92,7 @@ _RuntimeStateT = TypeVar("_RuntimeStateT", bound=BaseModel, default="EmptyRuntim _RuntimeHandlesT = TypeVar("_RuntimeHandlesT", bound=BaseModel, default="EmptyRuntimeHandles") _DepRuntimeStateT = TypeVar("_DepRuntimeStateT", bound=BaseModel) _DepRuntimeHandlesT = TypeVar("_DepRuntimeHandlesT", bound=BaseModel) +_ResourceT = TypeVar("_ResourceT") class _LayerControlOwnerSession(Protocol): @@ -101,6 +105,8 @@ class _LayerControlOwnerSession(Protocol): dep_layer: "Layer[Any, Any, Any, Any, Any, _DepRuntimeStateT, _DepRuntimeHandlesT]", ) -> "LayerControl[_DepRuntimeStateT, _DepRuntimeHandlesT]": ... + def _layer_for_control_owner(self, owner_layer_id: str) -> "Layer[Any, Any, Any, Any, Any, Any, Any]": ... + class LayerDeps: """Typed dependency container for a Layer. @@ -201,7 +207,9 @@ class LayerControl(Generic[_RuntimeStateT, _RuntimeHandlesT]): handles are not serialized in snapshots and should be rehydrated from runtime state in resume hooks. A compositor also binds private owner metadata so ``control_for`` can find controls for this layer's dependencies in the - same session; those links are runtime-only and not part of snapshots. + same session; those links are runtime-only and not part of snapshots. The + per-entry resource stack is also runtime-only and exists only while the layer + is entering, active, or exiting. 
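+
+    A sketch of typical handle registration from a create or resume hook (the
+    ``make_client`` factory here is illustrative)::
+
+        control.runtime_handles.client = await control.enter_async_resource(make_client())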
""" state: LifecycleState = LifecycleState.NEW @@ -210,6 +218,7 @@ class LayerControl(Generic[_RuntimeStateT, _RuntimeHandlesT]): runtime_handles: _RuntimeHandlesT = field(default_factory=lambda: cast(_RuntimeHandlesT, EmptyRuntimeHandles())) _owner_session: _LayerControlOwnerSession | None = field(default=None, init=False, repr=False, compare=False) _owner_layer_id: str | None = field(default=None, init=False, repr=False, compare=False) + _entry_stack: AsyncExitStack | None = field(default=None, init=False, repr=False, compare=False) def suspend_on_exit(self) -> None: """Request suspend behavior when the current layer entry exits.""" @@ -219,6 +228,19 @@ class LayerControl(Generic[_RuntimeStateT, _RuntimeHandlesT]): """Request delete behavior when the current layer entry exits.""" self.exit_intent = ExitIntent.DELETE + async def enter_async_resource(self, cm: AbstractAsyncContextManager[_ResourceT]) -> _ResourceT: + """Enter ``cm`` on this control's current entry resource stack. + + Resource registration is available only while a layer entry is in + progress or active. The base lifecycle closes registered resources after + suspend/delete hooks and when create/resume fails. + """ + return await self._require_entry_stack().enter_async_context(cm) + + def add_async_cleanup(self, callback: Callable[[], Awaitable[None]]) -> None: + """Register an async cleanup callback on the current entry resource stack.""" + self._require_entry_stack().push_async_callback(callback) + @overload def control_for( self, @@ -270,6 +292,22 @@ class LayerControl(Generic[_RuntimeStateT, _RuntimeHandlesT]): self._owner_session = session self._owner_layer_id = layer_id + def _require_entry_stack(self) -> AsyncExitStack: + if self._entry_stack is None: + raise RuntimeError("LayerControl entry resource stack is not active.") + return self._entry_stack + + def _begin_entry_stack(self) -> None: + if self._entry_stack is not None: + raise RuntimeError("LayerControl entry resource stack is already active.") + self._entry_stack = AsyncExitStack() + + async def _close_entry_stack(self) -> None: + stack = self._entry_stack + self._entry_stack = None + if stack is not None: + await stack.aclose() + @dataclass(frozen=True, slots=True) class LayerDepSpec: @@ -385,44 +423,115 @@ class Layer( resolved_deps[name] = deps[name] self.deps = self.deps_type(**resolved_deps) + def require_control( + self, + control: LayerControl[Any, Any], + *, + active: bool = False, + ) -> LayerControl[_RuntimeStateT, _RuntimeHandlesT]: + """Validate and return ``control`` as this layer's current session control. + + Capability methods should accept their own layer control explicitly and + call this helper before reading runtime state or handles. The control must + be attached to a compositor session whose owner layer id resolves to this + exact layer instance. Runtime state and handle schemas are checked against + the layer's declared schema types; when ``active`` is true, the lifecycle + state must be ``LifecycleState.ACTIVE``. + """ + if control._owner_session is None or control._owner_layer_id is None: + raise RuntimeError("LayerControl is not attached to a compositor session.") + try: + owner_layer = control._owner_session._layer_for_control_owner(control._owner_layer_id) + except KeyError as e: + raise RuntimeError( + f"LayerControl owner layer '{control._owner_layer_id}' is not defined in its compositor." 
+ ) from e + if owner_layer is not self: + raise RuntimeError( + f"LayerControl belongs to layer '{control._owner_layer_id}', not this {type(self).__name__} instance." + ) + if not isinstance(control.runtime_state, self.runtime_state_type): + raise TypeError( + f"{type(self).__name__} control runtime_state must be {self.runtime_state_type.__name__}, " + f"got {type(control.runtime_state).__name__}." + ) + if not isinstance(control.runtime_handles, self.runtime_handles_type): + raise TypeError( + f"{type(self).__name__} control runtime_handles must be {self.runtime_handles_type.__name__}, " + f"got {type(control.runtime_handles).__name__}." + ) + if active and control.state is not LifecycleState.ACTIVE: + raise RuntimeError( + f"{type(self).__name__} requires an active LayerControl; current state is {control.state.value}." + ) + return cast(LayerControl[_RuntimeStateT, _RuntimeHandlesT], control) + def enter(self, control: LayerControl[_RuntimeStateT, _RuntimeHandlesT]) -> AbstractAsyncContextManager[None]: """Return the layer's async entry context manager. ``control`` is the lifecycle control slot for this entry. Subclasses can - override this to wrap extra async resources around - ``self.lifecycle_enter(control)``. + override this for unusual wrapping, but ordinary live resources should be + registered with ``control.enter_async_resource`` from lifecycle hooks. """ return self.lifecycle_enter(control) @asynccontextmanager async def lifecycle_enter(self, control: LayerControl[_RuntimeStateT, _RuntimeHandlesT]) -> AsyncIterator[None]: - """Run the default explicit lifecycle state machine for one entry.""" - if control.state is LifecycleState.NEW: - control.exit_intent = ExitIntent.DELETE - await self.on_context_create(control) - control.state = LifecycleState.ACTIVE - elif control.state is LifecycleState.SUSPENDED: - control.exit_intent = ExitIntent.DELETE - await self.on_context_resume(control) - control.state = LifecycleState.ACTIVE - elif control.state is LifecycleState.ACTIVE: + """Run the default explicit lifecycle and resource stack for one entry. + + Exit state is recorded even when suspend/delete hooks fail because the + active entry is over once hook cleanup begins. + """ + if control.state is LifecycleState.ACTIVE: raise RuntimeError( "LayerControl is already active; duplicate or nested enter is not allowed." ) - elif control.state is LifecycleState.CLOSED: + if control.state is LifecycleState.CLOSED: raise RuntimeError( "LayerControl is closed; create a new compositor session before entering again." 
) + control._begin_entry_stack() + try: + if control.state is LifecycleState.NEW: + control.exit_intent = ExitIntent.DELETE + await self.on_context_create(control) + control.state = LifecycleState.ACTIVE + elif control.state is LifecycleState.SUSPENDED: + control.exit_intent = ExitIntent.DELETE + await self.on_context_resume(control) + control.state = LifecycleState.ACTIVE + except BaseException: + await control._close_entry_stack() + raise + try: yield finally: + hook_error: BaseException | None = None if control.exit_intent is ExitIntent.SUSPEND: - await self.on_context_suspend(control) - control.state = LifecycleState.SUSPENDED + try: + await self.on_context_suspend(control) + except BaseException as exc: + hook_error = exc + finally: + control.state = LifecycleState.SUSPENDED else: - await self.on_context_delete(control) - control.state = LifecycleState.CLOSED + try: + await self.on_context_delete(control) + except BaseException as exc: + hook_error = exc + finally: + control.state = LifecycleState.CLOSED + + try: + await control._close_entry_stack() + except BaseException: + if hook_error is not None: + raise hook_error + raise + if hook_error is not None: + raise hook_error async def on_context_create(self, control: LayerControl[_RuntimeStateT, _RuntimeHandlesT]) -> None: """Run when the layer context is entered from ``LifecycleState.NEW``.""" diff --git a/dify-agent/src/dify_agent/adapters/llm/model.py b/dify-agent/src/dify_agent/adapters/llm/model.py index 5723253177..34ee515ee5 100644 --- a/dify-agent/src/dify_agent/adapters/llm/model.py +++ b/dify-agent/src/dify_agent/adapters/llm/model.py @@ -88,11 +88,12 @@ class _DifyRequestInput: @dataclass(slots=True) class DifyLLMAdapterModel(Model[DifyPluginDaemonLLMClient]): - """Use a Dify plugin-daemon LLM provider as a Pydantic AI model.""" + """Use a Dify plugin-daemon transport plus request-level model identity.""" model: str daemon_provider: DifyPluginDaemonProvider _: KW_ONLY + model_provider: str credentials: dict[str, object] = field(default_factory=dict, repr=False) model_profile: InitVar[ModelProfileSpec | None] = None model_settings: InitVar[ModelSettings | None] = None @@ -140,6 +141,7 @@ class DifyLLMAdapterModel(Model[DifyPluginDaemonLLMClient]): response = DifyStreamedResponse( model_request_parameters=prepared_params, chunks=self.daemon_provider.client.iter_llm_result_chunks( + provider=self.model_provider, model=self.model_name, credentials=request_input.credentials, prompt_messages=request_input.prompt_messages, @@ -175,6 +177,7 @@ class DifyLLMAdapterModel(Model[DifyPluginDaemonLLMClient]): yield DifyStreamedResponse( model_request_parameters=prepared_params, chunks=self.daemon_provider.client.iter_llm_result_chunks( + provider=self.model_provider, model=self.model_name, credentials=request_input.credentials, prompt_messages=request_input.prompt_messages, diff --git a/dify-agent/src/dify_agent/adapters/llm/provider.py b/dify-agent/src/dify_agent/adapters/llm/provider.py index a8e6539611..6e7b92f646 100644 --- a/dify-agent/src/dify_agent/adapters/llm/provider.py +++ b/dify-agent/src/dify_agent/adapters/llm/provider.py @@ -1,4 +1,10 @@ -"""Dify plugin-daemon provider for Pydantic AI LLM adapters.""" +"""Dify plugin-daemon provider for Pydantic AI LLM adapters. + +The Pydantic AI provider represents daemon/plugin transport identity. Business +model provider names such as ``openai`` are request-level model identity and are +passed by ``DifyLLMAdapterModel`` for each invocation instead of being stored on +this provider. 
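+
+A sketch of the split, with placeholder values (``DifyLLMAdapterModel`` lives in
+the sibling ``model`` module)::
+
+    provider = DifyPluginDaemonProvider(
+        tenant_id="tenant-1",
+        plugin_id="langgenius/openai",
+        plugin_daemon_url="http://plugin-daemon",
+        plugin_daemon_api_key="daemon-secret",
+    )
+    model = DifyLLMAdapterModel("gpt-4o-mini", provider, model_provider="openai")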
+""" from __future__ import annotations @@ -27,11 +33,12 @@ class PluginDaemonBasicResponse(BaseModel): @dataclass(slots=True) class DifyPluginDaemonLLMClient: + """HTTP client wrapper for plugin-daemon LLM dispatch requests.""" + plugin_daemon_url: str plugin_daemon_api_key: str tenant_id: str plugin_id: str - provider: str user_id: str | None http_client: httpx.AsyncClient = field(repr=False) @@ -41,6 +48,7 @@ class DifyPluginDaemonLLMClient: async def iter_llm_result_chunks( self, *, + provider: str, model: str, credentials: dict[str, object], prompt_messages: list[PromptMessage], @@ -53,7 +61,7 @@ class DifyPluginDaemonLLMClient: model_name=model, path=f"plugin/{self.tenant_id}/dispatch/llm/invoke", request_data={ - "provider": self.provider, + "provider": provider, "model_type": "llm", "model": model, "credentials": credentials, @@ -130,6 +138,9 @@ class DifyPluginDaemonLLMClient: class DifyPluginDaemonProvider(Provider[DifyPluginDaemonLLMClient]): """Pydantic AI provider for Dify plugin-daemon dispatch requests. + The provider ``name`` identifies the daemon/plugin context. The business LLM + provider is supplied by each adapter model request so one daemon provider can + serve different model-provider selections without mutating transport state. When ``http_client`` is omitted the provider owns an ``AsyncClient`` and the Pydantic AI provider context manager closes it. When an external client is supplied, ownership stays with the caller and provider exit leaves it open. @@ -137,7 +148,6 @@ class DifyPluginDaemonProvider(Provider[DifyPluginDaemonLLMClient]): tenant_id: str plugin_id: str - plugin_provider: str plugin_daemon_url: str plugin_daemon_api_key: str = field(repr=False) user_id: str | None = None @@ -162,7 +172,6 @@ class DifyPluginDaemonProvider(Provider[DifyPluginDaemonLLMClient]): plugin_daemon_api_key=self.plugin_daemon_api_key, tenant_id=self.tenant_id, plugin_id=self.plugin_id, - provider=self.plugin_provider, user_id=self.user_id, http_client=http_client, ) @@ -177,7 +186,7 @@ class DifyPluginDaemonProvider(Provider[DifyPluginDaemonLLMClient]): @property @override def name(self) -> str: - return f"DifyPlugin/{self.plugin_provider}" + return f"DifyPlugin/{self.plugin_id}" @property @override diff --git a/dify-agent/src/dify_agent/layers/dify_plugin/configs.py b/dify-agent/src/dify_agent/layers/dify_plugin/configs.py index 949dac8c08..42790f9605 100644 --- a/dify-agent/src/dify_agent/layers/dify_plugin/configs.py +++ b/dify-agent/src/dify_agent/layers/dify_plugin/configs.py @@ -28,9 +28,9 @@ class DifyPluginLayerConfig(LayerConfig): class DifyPluginLLMLayerConfig(LayerConfig): - """Public config for selecting a Dify plugin LLM model.""" + """Public config for selecting a business provider/model from a plugin.""" - provider: str + model_provider: str model: str credentials: dict[str, DifyPluginCredentialValue] = Field(default_factory=dict) model_settings: ModelSettings | None = None diff --git a/dify-agent/src/dify_agent/layers/dify_plugin/llm_layer.py b/dify-agent/src/dify_agent/layers/dify_plugin/llm_layer.py index 69ba9a5ae3..5932d69cb3 100644 --- a/dify-agent/src/dify_agent/layers/dify_plugin/llm_layer.py +++ b/dify-agent/src/dify_agent/layers/dify_plugin/llm_layer.py @@ -3,7 +3,9 @@ This layer owns model capability resolution for Dify plugin-backed LLMs. It depends on ``DifyPluginLayer`` for daemon access, resolves that dependency's control from its own ``LayerControl``, and returns a Pydantic AI model adapter -configured from the public LLM layer DTO. 
+configured from the public LLM layer DTO. The daemon provider carries plugin +transport identity; the DTO's ``model_provider`` is passed to the adapter as +request-level model identity. """ from dataclasses import dataclass @@ -38,11 +40,13 @@ class DifyPluginLLMLayer(PlainLayer[DifyPluginLLMDeps, DifyPluginLLMLayerConfig] def get_model(self, control: LayerControl[EmptyRuntimeState, EmptyRuntimeHandles]) -> DifyLLMAdapterModel: """Return the configured model using the current session's plugin control.""" + control = self.require_control(control, active=True) plugin_control = control.control_for(self.deps.plugin) - provider = self.deps.plugin.get_provider(plugin_control, plugin_provider=self.config.provider) + provider = self.deps.plugin.get_daemon_provider(plugin_control) return DifyLLMAdapterModel( model=self.config.model, daemon_provider=provider, + model_provider=self.config.model_provider, credentials=dict(self.config.credentials), model_settings=self.config.model_settings, ) diff --git a/dify-agent/src/dify_agent/layers/dify_plugin/plugin_layer.py b/dify-agent/src/dify_agent/layers/dify_plugin/plugin_layer.py index 5e0ff65489..b8d42f63b1 100644 --- a/dify-agent/src/dify_agent/layers/dify_plugin/plugin_layer.py +++ b/dify-agent/src/dify_agent/layers/dify_plugin/plugin_layer.py @@ -3,8 +3,10 @@ The public config identifies tenant/plugin/user context only. Plugin daemon URL, API key, and timeout are server-side dependencies injected by the layer registry factory. Each active compositor entry owns an HTTP client in ``LayerControl`` -runtime handles; callers pass the control explicitly to ``get_provider`` so -shared layer instances never store or discover session-local clients implicitly. +runtime handles and registers it on the control's resource stack. Callers pass +the control explicitly to ``get_daemon_provider`` so shared layer instances never +store or discover session-local clients implicitly. Business model-provider names +belong to the LLM layer/model request, not this daemon context layer. """ from dataclasses import dataclass @@ -13,7 +15,7 @@ import httpx from pydantic import BaseModel, ConfigDict from typing_extensions import Self, override -from agenton.layers import EmptyRuntimeState, LayerControl, LifecycleState, NoLayerDeps, PlainLayer +from agenton.layers import EmptyRuntimeState, LayerControl, NoLayerDeps, PlainLayer from dify_agent.adapters.llm import DifyPluginDaemonProvider from dify_agent.layers.dify_plugin.configs import DifyPluginLayerConfig @@ -56,25 +58,25 @@ class DifyPluginLayer(PlainLayer[NoLayerDeps, DifyPluginLayerConfig, EmptyRuntim """Create a plugin layer from public config plus server-only daemon settings.""" return cls(config=config, daemon_url=daemon_url, daemon_api_key=daemon_api_key, timeout=timeout) - def get_provider( + def get_daemon_provider( self, control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles], - *, - plugin_provider: str, ) -> DifyPluginDaemonProvider: - """Return a provider backed by ``control``'s active HTTP client. + """Return a daemon provider backed by ``control``'s active HTTP client. Raises: RuntimeError: if ``control`` is not active or its HTTP client is absent/closed. 
""" + control = self.require_control(control, active=True) client = control.runtime_handles.http_client - if control.state is not LifecycleState.ACTIVE or client is None or client.is_closed: - raise RuntimeError("DifyPluginLayer.get_provider() requires an entered control with an open HTTP client.") + if client is None or client.is_closed: + raise RuntimeError( + "DifyPluginLayer.get_daemon_provider() requires an entered control with an open HTTP client." + ) return DifyPluginDaemonProvider( tenant_id=self.config.tenant_id, plugin_id=self.config.plugin_id, - plugin_provider=plugin_provider, plugin_daemon_url=self.daemon_url, plugin_daemon_api_key=self.daemon_api_key, user_id=self.config.user_id, @@ -101,23 +103,18 @@ class DifyPluginLayer(PlainLayer[NoLayerDeps, DifyPluginLayerConfig, EmptyRuntim self, control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles], ) -> None: - await self._close_http_client(control) + control.runtime_handles.http_client = None @override async def on_context_delete( self, control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles], ) -> None: - await self._close_http_client(control) + control.runtime_handles.http_client = None async def _open_http_client(self, control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles]) -> None: - if control.runtime_handles.http_client is None or control.runtime_handles.http_client.is_closed: - control.runtime_handles.http_client = httpx.AsyncClient(timeout=self.timeout, trust_env=False) - - async def _close_http_client(self, control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles]) -> None: - client = control.runtime_handles.http_client - control.runtime_handles.http_client = None - if client is not None: - await client.aclose() + control.runtime_handles.http_client = await control.enter_async_resource( + httpx.AsyncClient(timeout=self.timeout, trust_env=False) + ) __all__ = ["DifyPluginLayer", "DifyPluginRuntimeHandles"] diff --git a/dify-agent/src/dify_agent/protocol/__init__.py b/dify-agent/src/dify_agent/protocol/__init__.py index 6568977386..2feec28be9 100644 --- a/dify-agent/src/dify_agent/protocol/__init__.py +++ b/dify-agent/src/dify_agent/protocol/__init__.py @@ -7,6 +7,7 @@ from .schemas import ( CreateRunRequest, CreateRunResponse, EmptyRunEventData, + LayerExitSignals, PydanticAIStreamRunEvent, RunEvent, RunEventType, @@ -27,6 +28,7 @@ __all__ = [ "CreateRunResponse", "DIFY_AGENT_MODEL_LAYER_ID", "EmptyRunEventData", + "LayerExitSignals", "PydanticAIStreamRunEvent", "RUN_EVENT_ADAPTER", "RunEvent", diff --git a/dify-agent/src/dify_agent/protocol/schemas.py b/dify-agent/src/dify_agent/protocol/schemas.py index d496f36d82..c92d8a6254 100644 --- a/dify-agent/src/dify_agent/protocol/schemas.py +++ b/dify-agent/src/dify_agent/protocol/schemas.py @@ -10,10 +10,12 @@ by polling and SSE replay. Event envelopes keep the public a typed ``data`` model so OpenAPI, Redis replay, and clients parse the same payload contract. Model/provider selection is part of the submitted Agenton layer graph, not a top-level run field; the runtime reads the model layer named -by ``DIFY_AGENT_MODEL_LAYER_ID``. Successful runs publish the final JSON-safe -agent output and the resumable Agenton session snapshot together on the terminal -``run_succeeded`` event so consumers can treat terminal events as complete run -summaries. +by ``DIFY_AGENT_MODEL_LAYER_ID``. 
Request-level layer exit signals decide whether +each layer control is suspended or deleted when the active entry exits, with +suspend as the default so successful terminal events can include resumable +snapshots. Successful runs publish the final JSON-safe agent output and the +resumable Agenton session snapshot together on the terminal ``run_succeeded`` +event so consumers can treat terminal events as complete run summaries. """ from datetime import datetime, timezone @@ -23,6 +25,7 @@ from pydantic import BaseModel, ConfigDict, Field, JsonValue, TypeAdapter from pydantic_ai.messages import AgentStreamEvent from agenton.compositor import CompositorConfig, CompositorSessionSnapshot +from agenton.layers import ExitIntent DIFY_AGENT_MODEL_LAYER_ID: Final[str] = "llm" @@ -40,15 +43,27 @@ def utc_now() -> datetime: return datetime.now(timezone.utc) +class LayerExitSignals(BaseModel): + """Requested per-layer lifecycle behavior when a run leaves its active session.""" + + default: ExitIntent = ExitIntent.SUSPEND + layers: dict[str, ExitIntent] = Field(default_factory=dict) + + model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + + class CreateRunRequest(BaseModel): """Request body for creating one async agent run. Model/provider configuration must be supplied through the compositor layer - named by ``DIFY_AGENT_MODEL_LAYER_ID``. + named by ``DIFY_AGENT_MODEL_LAYER_ID``. ``layer_exit_signals`` defaults every + layer to suspend so callers receive a resumable success snapshot unless they + explicitly request delete for one or more layers. """ compositor: CompositorConfig session_snapshot: CompositorSessionSnapshot | None = None + layer_exit_signals: LayerExitSignals = Field(default_factory=LayerExitSignals) model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") @@ -162,6 +177,7 @@ __all__ = [ "CreateRunResponse", "DIFY_AGENT_MODEL_LAYER_ID", "EmptyRunEventData", + "LayerExitSignals", "PydanticAIStreamRunEvent", "RUN_EVENT_ADAPTER", "RunEvent", diff --git a/dify-agent/src/dify_agent/runtime/layer_exit_signals.py b/dify-agent/src/dify_agent/runtime/layer_exit_signals.py new file mode 100644 index 0000000000..7d49558f5f --- /dev/null +++ b/dify-agent/src/dify_agent/runtime/layer_exit_signals.py @@ -0,0 +1,41 @@ +"""Validation and application of request-level Agenton layer exit signals. + +HTTP requests carry data-only lifecycle intent in ``LayerExitSignals``. The +runtime validates the signal keys against the built compositor before a run is +persisted or entered, then applies the resolved intent after entry because +``Layer.lifecycle_enter`` resets controls to delete on each successful enter. 
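+
+Typical ordering, mirroring the runner (a sketch)::
+
+    validate_layer_exit_signals(compositor, request.layer_exit_signals)
+    async with compositor.enter(session) as active_session:
+        apply_layer_exit_signals(active_session, request.layer_exit_signals)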
+""" + +from typing import Any + +from agenton.compositor import Compositor, CompositorSession +from agenton.layers import ExitIntent +from dify_agent.protocol.schemas import LayerExitSignals + + +def validate_layer_exit_signals( + compositor: Compositor[Any, Any, Any, Any, Any, Any], + signals: LayerExitSignals, +) -> None: + """Raise ``ValueError`` when ``signals`` mention layers absent from ``compositor``.""" + unknown_layer_ids = set(signals.layers) - set(compositor.layers) + if not unknown_layer_ids: + return + + names = ", ".join(sorted(unknown_layer_ids)) + raise ValueError(f"layer_exit_signals.layers references unknown layer ids: {names}.") + + +def apply_layer_exit_signals(session: CompositorSession, signals: LayerExitSignals) -> None: + """Apply ``signals`` to active controls for the current compositor entry.""" + for layer_id, control in session.layer_controls.items(): + intent = signals.layers.get(layer_id, signals.default) + if intent is ExitIntent.SUSPEND: + control.suspend_on_exit() + elif intent is ExitIntent.DELETE: + control.delete_on_exit() + else: + raise ValueError(f"Unsupported layer exit intent: {intent!r}.") + + +__all__ = ["apply_layer_exit_signals", "validate_layer_exit_signals"] diff --git a/dify-agent/src/dify_agent/runtime/run_scheduler.py b/dify-agent/src/dify_agent/runtime/run_scheduler.py index 3b2d7b9fc2..179ef39e53 100644 --- a/dify-agent/src/dify_agent/runtime/run_scheduler.py +++ b/dify-agent/src/dify_agent/runtime/run_scheduler.py @@ -16,6 +16,7 @@ from agenton.compositor import LayerRegistry from dify_agent.protocol.schemas import CreateRunRequest from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor, create_default_layer_registry from dify_agent.runtime.event_sink import RunEventSink, emit_run_failed +from dify_agent.runtime.layer_exit_signals import validate_layer_exit_signals from dify_agent.runtime.runner import AgentRunRunner from dify_agent.runtime.user_prompt_validation import EMPTY_USER_PROMPTS_ERROR, has_non_blank_user_prompt from dify_agent.server.schemas import RunRecord @@ -88,6 +89,7 @@ class RunScheduler: from ``active_tasks`` when it finishes, regardless of success or failure. """ compositor = build_pydantic_ai_compositor(request.compositor, registry=self.layer_registry) + validate_layer_exit_signals(compositor, request.layer_exit_signals) if not has_non_blank_user_prompt(compositor.user_prompts): raise ValueError(EMPTY_USER_PROMPTS_ERROR) diff --git a/dify-agent/src/dify_agent/runtime/runner.py b/dify-agent/src/dify_agent/runtime/runner.py index b70f68da05..f8c1b36ced 100644 --- a/dify-agent/src/dify_agent/runtime/runner.py +++ b/dify-agent/src/dify_agent/runtime/runner.py @@ -2,11 +2,12 @@ The runner is storage-agnostic: it builds an Agenton compositor, enters or resumes its session, runs pydantic-ai with ``compositor.user_prompts`` as the user -input, emits stream events, suspends the session on exit, snapshots it, and then -publishes a terminal success or failure event. The Pydantic AI model is resolved -from the active Agenton layer named by ``DIFY_AGENT_MODEL_LAYER_ID``. Successful -terminal events contain both the JSON-safe final output and session snapshot; -there are no separate output or snapshot events to correlate. +input, emits stream events, applies request-level layer exit signals, snapshots +the resulting session, and then publishes a terminal success or failure event. +The Pydantic AI model is resolved from the active Agenton layer named by +``DIFY_AGENT_MODEL_LAYER_ID``. 
Successful terminal events contain both the +JSON-safe final output and session snapshot; there are no separate output or +snapshot events to correlate. """ from collections.abc import AsyncIterable @@ -27,6 +28,7 @@ from dify_agent.runtime.event_sink import ( emit_run_started, emit_run_succeeded, ) +from dify_agent.runtime.layer_exit_signals import apply_layer_exit_signals, validate_layer_exit_signals from dify_agent.runtime.user_prompt_validation import EMPTY_USER_PROMPTS_ERROR, has_non_blank_user_prompt @@ -83,13 +85,17 @@ class AgentRunRunner: async def _run_agent(self) -> tuple[JsonValue, CompositorSessionSnapshot]: """Run pydantic-ai inside an entered Agenton session.""" compositor = build_pydantic_ai_compositor(self.request.compositor, registry=self.layer_registry) + try: + validate_layer_exit_signals(compositor, self.request.layer_exit_signals) + except ValueError as exc: + raise AgentRunValidationError(str(exc)) from exc session = ( compositor.session_from_snapshot(self.request.session_snapshot) if self.request.session_snapshot is not None else compositor.new_session() ) async with compositor.enter(session) as active_session: - active_session.suspend_on_exit() + apply_layer_exit_signals(active_session, self.request.layer_exit_signals) user_prompts = compositor.user_prompts if not has_non_blank_user_prompt(user_prompts): raise AgentRunValidationError(EMPTY_USER_PROMPTS_ERROR) diff --git a/dify-agent/src/dify_agent/server/routes/runs.py b/dify-agent/src/dify_agent/server/routes/runs.py index 6e79e9d193..62cd3830e6 100644 --- a/dify-agent/src/dify_agent/server/routes/runs.py +++ b/dify-agent/src/dify_agent/server/routes/runs.py @@ -16,6 +16,7 @@ from fastapi.responses import StreamingResponse from agenton.compositor import LayerRegistry from dify_agent.protocol.schemas import CreateRunRequest, CreateRunResponse, RunEventsResponse, RunStatusResponse from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor, create_default_layer_registry +from dify_agent.runtime.layer_exit_signals import validate_layer_exit_signals from dify_agent.runtime.run_scheduler import RunScheduler, SchedulerStoppingError from dify_agent.runtime.user_prompt_validation import EMPTY_USER_PROMPTS_ERROR, has_non_blank_user_prompt from dify_agent.server.sse import sse_event_stream @@ -47,6 +48,7 @@ def create_runs_router( request.compositor, registry=resolved_get_layer_registry(), ) + validate_layer_exit_signals(compositor, request.layer_exit_signals) except Exception as exc: raise HTTPException(status_code=422, detail=str(exc)) from exc if not has_non_blank_user_prompt(compositor.user_prompts): diff --git a/dify-agent/tests/local/agenton/compositor/test_builder_snapshot.py b/dify-agent/tests/local/agenton/compositor/test_builder_snapshot.py index 3cc888c3f4..585c67e854 100644 --- a/dify-agent/tests/local/agenton/compositor/test_builder_snapshot.py +++ b/dify-agent/tests/local/agenton/compositor/test_builder_snapshot.py @@ -261,10 +261,13 @@ def test_snapshot_rejects_active_sessions_and_excludes_handles() -> None: asyncio.run(run()) snapshot = compositor.snapshot_session(session) - assert snapshot.model_dump(mode="json") == { + dumped = snapshot.model_dump(mode="json") + assert dumped == { "schema_version": 1, "layers": [{"name": "handle", "state": "closed", "runtime_state": {"resource_id": "abc"}}], } + assert "_entry_stack" not in str(dumped) + assert "_owner" not in str(dumped) def test_restore_validates_runtime_state_and_resume_rehydrates_handles() -> None: diff --git 
a/dify-agent/tests/local/agenton/compositor/test_enter.py b/dify-agent/tests/local/agenton/compositor/test_enter.py index f6c9a2d67f..096e096eea 100644 --- a/dify-agent/tests/local/agenton/compositor/test_enter.py +++ b/dify-agent/tests/local/agenton/compositor/test_enter.py @@ -3,7 +3,9 @@ from collections import OrderedDict from collections.abc import Iterator from dataclasses import dataclass, field from itertools import count +from typing import cast +import pytest from pydantic import BaseModel, ConfigDict from typing_extensions import override @@ -12,6 +14,7 @@ from agenton.layers import ( ExitIntent, EmptyLayerConfig, EmptyRuntimeHandles, + EmptyRuntimeState, LayerControl, LifecycleState, NoLayerDeps, @@ -296,3 +299,261 @@ def test_runtime_state_is_per_session_and_survives_suspend_resume_delete() -> No "deleted_runtime_id": 2, } assert not hasattr(layer, "runtime_id") + + +class ResourceHandles(BaseModel): + resource: "RecordingResource | None" = None + + model_config = ConfigDict(extra="forbid", validate_assignment=True, arbitrary_types_allowed=True) + + +class RecordingResource: + events: list[str] + label: str + closed: bool + + def __init__(self, events: list[str], label: str) -> None: + self.events = events + self.label = label + self.closed = False + + async def __aenter__(self) -> "RecordingResource": + self.events.append(f"enter:{self.label}") + return self + + async def __aexit__(self, exc_type: object, exc: object, traceback: object) -> None: + self.closed = True + self.events.append(f"exit:{self.label}") + + +@dataclass(slots=True) +class ResourceLayer(PlainLayer[NoLayerDeps, EmptyLayerConfig, EmptyRuntimeState, ResourceHandles]): + events: list[str] = field(default_factory=list) + resources: list[RecordingResource] = field(default_factory=list) + fail_create: bool = False + fail_resume: bool = False + fail_suspend: bool = False + fail_delete: bool = False + + @override + async def on_context_create(self, control: LayerControl[EmptyRuntimeState, ResourceHandles]) -> None: + await self._open_resource(control, "create") + self.events.append("create") + if self.fail_create: + raise RuntimeError("create failed") + + @override + async def on_context_resume(self, control: LayerControl[EmptyRuntimeState, ResourceHandles]) -> None: + await self._open_resource(control, "resume") + self.events.append("resume") + if self.fail_resume: + raise RuntimeError("resume failed") + + @override + async def on_context_suspend(self, control: LayerControl[EmptyRuntimeState, ResourceHandles]) -> None: + self.events.append("suspend") + control.runtime_handles.resource = None + if self.fail_suspend: + raise RuntimeError("suspend failed") + + @override + async def on_context_delete(self, control: LayerControl[EmptyRuntimeState, ResourceHandles]) -> None: + self.events.append("delete") + control.runtime_handles.resource = None + if self.fail_delete: + raise RuntimeError("delete failed") + + async def _open_resource( + self, + control: LayerControl[EmptyRuntimeState, ResourceHandles], + label: str, + ) -> None: + resource = await control.enter_async_resource(RecordingResource(self.events, label)) + + async def cleanup() -> None: + self.events.append(f"cleanup:{label}") + + control.add_async_cleanup(cleanup) + control.runtime_handles.resource = resource + self.resources.append(resource) + + +def _resource_control(control: LayerControl) -> LayerControl[EmptyRuntimeState, ResourceHandles]: + return cast(LayerControl[EmptyRuntimeState, ResourceHandles], control) + + +def 
test_entry_resource_stack_closes_resources_on_suspend_and_delete() -> None: + layer = ResourceLayer() + compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("resource", layer)])) + session = compositor.new_session() + + async def run() -> None: + async with compositor.enter(session) as active_session: + control = _resource_control(active_session.layer("resource")) + assert control.runtime_handles.resource is layer.resources[0] + assert layer.resources[0].closed is False + active_session.suspend_on_exit() + + assert _resource_control(session.layer("resource")).runtime_handles.resource is None + assert layer.resources[0].closed is True + + async with compositor.enter(session): + control = _resource_control(session.layer("resource")) + assert control.runtime_handles.resource is layer.resources[1] + assert layer.resources[1].closed is False + + assert _resource_control(session.layer("resource")).runtime_handles.resource is None + assert layer.resources[1].closed is True + + asyncio.run(run()) + + assert layer.events == [ + "enter:create", + "create", + "suspend", + "cleanup:create", + "exit:create", + "enter:resume", + "resume", + "delete", + "cleanup:resume", + "exit:resume", + ] + + +def test_entry_resource_stack_closes_resources_when_create_or_resume_raises() -> None: + async def fail_create() -> None: + layer = ResourceLayer(fail_create=True) + compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("resource", layer)])) + session = compositor.new_session() + + with pytest.raises(RuntimeError, match="create failed"): + async with compositor.enter(session): + pass + + assert session.layer("resource").state is LifecycleState.NEW + assert layer.resources[0].closed is True + assert session.layer("resource")._entry_stack is None + + async def fail_resume() -> None: + layer = ResourceLayer() + compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("resource", layer)])) + session = compositor.new_session() + + async with compositor.enter(session) as active_session: + active_session.suspend_on_exit() + + layer.fail_resume = True + with pytest.raises(RuntimeError, match="resume failed"): + async with compositor.enter(session): + pass + + assert session.layer("resource").state is LifecycleState.SUSPENDED + assert layer.resources[1].closed is True + assert session.layer("resource")._entry_stack is None + + asyncio.run(fail_create()) + asyncio.run(fail_resume()) + + +def test_entry_resource_stack_closes_resources_when_body_raises() -> None: + layer = ResourceLayer() + compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("resource", layer)])) + session = compositor.new_session() + + async def run() -> None: + with pytest.raises(RuntimeError, match="body failed"): + async with compositor.enter(session): + raise RuntimeError("body failed") + + asyncio.run(run()) + + assert session.layer("resource").state is LifecycleState.CLOSED + assert layer.resources[0].closed is True + assert session.layer("resource")._entry_stack is None + + +def test_failed_suspend_hook_exits_active_state_and_closes_resources() -> None: + layer = ResourceLayer(fail_suspend=True) + compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("resource", layer)])) + session = compositor.new_session() + + async def run() -> None: + with pytest.raises(RuntimeError, match="suspend failed"): + async with compositor.enter(session) as active_session: + 
active_session.suspend_on_exit() + + control = session.layer("resource") + assert control.state is LifecycleState.SUSPENDED + assert layer.resources[0].closed is True + assert control._entry_stack is None + with pytest.raises(RuntimeError, match="entry resource stack is not active"): + await control.enter_async_resource(RecordingResource([], "unused")) + with pytest.raises(RuntimeError, match="requires an active LayerControl"): + _ = layer.require_control(control, active=True) + + asyncio.run(run()) + + +def test_failed_delete_hook_exits_active_state_and_closes_resources() -> None: + layer = ResourceLayer(fail_delete=True) + compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("resource", layer)])) + session = compositor.new_session() + + async def run() -> None: + with pytest.raises(RuntimeError, match="delete failed"): + async with compositor.enter(session): + pass + + control = session.layer("resource") + assert control.state is LifecycleState.CLOSED + assert layer.resources[0].closed is True + assert control._entry_stack is None + with pytest.raises(RuntimeError, match="entry resource stack is not active"): + await control.enter_async_resource(RecordingResource([], "unused")) + with pytest.raises(RuntimeError, match="requires an active LayerControl"): + _ = layer.require_control(control, active=True) + + asyncio.run(run()) + + +def test_resource_stack_api_raises_outside_active_entry_stack() -> None: + control = LayerControl() + + async def cleanup() -> None: + raise AssertionError("cleanup should not be registered") + + with pytest.raises(RuntimeError, match="entry resource stack is not active"): + control.add_async_cleanup(cleanup) + + with pytest.raises(RuntimeError, match="entry resource stack is not active"): + asyncio.run(control.enter_async_resource(RecordingResource([], "unused"))) + + +def test_require_control_validates_owner_schema_and_active_state() -> None: + layer = ResourceLayer() + other_layer = TraceLayer() + compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( + layers=OrderedDict([("resource", layer), ("other", other_layer)]) + ) + session = compositor.new_session() + control = session.layer("resource") + + with pytest.raises(RuntimeError, match="requires an active LayerControl"): + _ = layer.require_control(control, active=True) + + with pytest.raises(RuntimeError, match="belongs to layer 'other'"): + _ = layer.require_control(session.layer("other")) + + bad_control = LayerControl() + bad_session = CompositorSession(OrderedDict([("resource", bad_control), ("other", other_layer.new_control())])) + bad_session._bind_owner(compositor) + with pytest.raises(TypeError, match="runtime_handles must be ResourceHandles"): + _ = layer.require_control(bad_control) + + async def run() -> None: + async with compositor.enter(session) as active_session: + active_control = active_session.layer("resource") + assert layer.require_control(active_control, active=True) is active_control + + asyncio.run(run()) diff --git a/dify-agent/tests/local/dify_agent/adapters/llm/test_model.py b/dify-agent/tests/local/dify_agent/adapters/llm/test_model.py index 7c8ef67a43..a0065de718 100644 --- a/dify-agent/tests/local/dify_agent/adapters/llm/test_model.py +++ b/dify-agent/tests/local/dify_agent/adapters/llm/test_model.py @@ -45,7 +45,6 @@ class DifyLLMAdapterModelTests(unittest.IsolatedAsyncioTestCase): return DifyPluginDaemonProvider( tenant_id="tenant-1", plugin_id="langgenius/openai", - plugin_provider="openai", 
             plugin_daemon_url="http://plugin-daemon",
             plugin_daemon_api_key="daemon-secret",
             user_id=user_id,
@@ -157,6 +156,7 @@ class DifyLLMAdapterModelTests(unittest.IsolatedAsyncioTestCase):
         adapter = DifyLLMAdapterModel(
             "demo-model",
             self.make_provider(user_id="user-123"),
+            model_provider="openai",
             credentials={"api_key": "secret"},
             model_settings={"temperature": 0.2, "stop_sequences": ["DEFAULT_STOP"]},
         )
@@ -168,7 +168,7 @@ class DifyLLMAdapterModelTests(unittest.IsolatedAsyncioTestCase):
         )

         self.assertEqual(response.model_name, "demo-model")
-        self.assertEqual(response.provider_name, "DifyPlugin/openai")
+        self.assertEqual(response.provider_name, "DifyPlugin/langgenius/openai")
         self.assertEqual(response.usage.input_tokens, 11)
         self.assertEqual(response.usage.output_tokens, 7)
         self.assertEqual(response.parts[0].part_kind, "text")
@@ -178,6 +178,7 @@ class DifyLLMAdapterModelTests(unittest.IsolatedAsyncioTestCase):
         http_client = httpx.AsyncClient()

         provider = self.make_provider(http_client=http_client)
+        self.assertEqual(provider.name, "DifyPlugin/langgenius/openai")
         self.assertIs(provider.client.http_client, http_client)
         async with provider:
             pass
@@ -197,6 +198,7 @@ class DifyLLMAdapterModelTests(unittest.IsolatedAsyncioTestCase):
         adapter = DifyLLMAdapterModel(
             "demo-model",
             self.make_provider(),
+            model_provider="openai",
             credentials={"api_key": "secret"},
         )

@@ -256,6 +258,7 @@ class DifyLLMAdapterModelTests(unittest.IsolatedAsyncioTestCase):
         adapter = DifyLLMAdapterModel(
             "demo-model",
             self.make_provider(),
+            model_provider="openai",
             credentials={"api_key": "secret"},
         )

@@ -288,6 +291,7 @@ class DifyLLMAdapterModelTests(unittest.IsolatedAsyncioTestCase):
         adapter = DifyLLMAdapterModel(
             "demo-model",
             self.make_provider(),
+            model_provider="openai",
             credentials={"api_key": "secret"},
         )

@@ -319,6 +323,7 @@ class DifyLLMAdapterModelTests(unittest.IsolatedAsyncioTestCase):
         adapter = DifyLLMAdapterModel(
             "demo-model",
             self.make_provider(),
+            model_provider="openai",
             credentials={"api_key": "secret"},
         )

@@ -345,6 +350,7 @@ class DifyLLMAdapterModelTests(unittest.IsolatedAsyncioTestCase):
         adapter = DifyLLMAdapterModel(
             "demo-model",
             self.make_provider(),
+            model_provider="openai",
             credentials={"api_key": "secret"},
         )

@@ -374,6 +380,7 @@ class DifyLLMAdapterModelTests(unittest.IsolatedAsyncioTestCase):
         adapter = DifyLLMAdapterModel(
             "demo-model",
             self.make_provider(),
+            model_provider="openai",
             credentials={"api_key": "secret"},
         )

diff --git a/dify-agent/tests/local/dify_agent/layers/dify_plugin/test_configs.py b/dify-agent/tests/local/dify_agent/layers/dify_plugin/test_configs.py
index e82d84801d..8545e622c9 100644
--- a/dify-agent/tests/local/dify_agent/layers/dify_plugin/test_configs.py
+++ b/dify-agent/tests/local/dify_agent/layers/dify_plugin/test_configs.py
@@ -38,19 +38,30 @@ def test_dify_plugin_layer_config_forbids_runtime_settings() -> None:
 def test_dify_plugin_llm_config_accepts_scalar_credentials_and_model_settings() -> None:
     credential: DifyPluginCredentialValue = "secret"
     config = DifyPluginLLMLayerConfig(
-        provider="openai",
+        model_provider="openai",
         model="gpt-4o-mini",
         credentials={"api_key": credential, "enabled": True, "retries": 2, "ratio": 0.5, "empty": None},
         model_settings={"temperature": 0.2, "max_tokens": 64},
     )

+    assert config.model_provider == "openai"
     assert config.credentials == {"api_key": "secret", "enabled": True, "retries": 2, "ratio": 0.5, "empty": None}
     assert config.model_settings == {"temperature": 0.2, "max_tokens": 64}

     with pytest.raises(ValidationError):
         _ = DifyPluginLLMLayerConfig.model_validate(
             {
-                "provider": "openai",
+                "model_provider": "openai",
                 "model": "gpt-4o-mini",
                 "credentials": {"nested": {"not": "allowed"}},
             }
         )
+
+
+def test_dify_plugin_llm_config_rejects_old_provider_field() -> None:
+    with pytest.raises(ValidationError):
+        _ = DifyPluginLLMLayerConfig.model_validate(
+            {
+                "provider": "openai",
+                "model": "gpt-4o-mini",
+            }
+        )
diff --git a/dify-agent/tests/local/dify_agent/layers/dify_plugin/test_layers.py b/dify-agent/tests/local/dify_agent/layers/dify_plugin/test_layers.py
index bdc3c72dce..13b5250970 100644
--- a/dify-agent/tests/local/dify_agent/layers/dify_plugin/test_layers.py
+++ b/dify-agent/tests/local/dify_agent/layers/dify_plugin/test_layers.py
@@ -5,7 +5,7 @@ from typing import cast
 import pytest

 from agenton.compositor import Compositor
-from agenton.layers import EmptyRuntimeState, LayerControl, PlainPromptType, PlainToolType
+from agenton.layers import EmptyRuntimeHandles, EmptyRuntimeState, LayerControl, PlainPromptType, PlainToolType
 from dify_agent.adapters.llm import DifyLLMAdapterModel
 from dify_agent.layers.dify_plugin.configs import DifyPluginLLMLayerConfig, DifyPluginLayerConfig
 from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer
@@ -24,7 +24,7 @@ def _plugin_layer() -> DifyPluginLayer:
 def _llm_layer() -> DifyPluginLLMLayer:
     return DifyPluginLLMLayer.from_config(
         DifyPluginLLMLayerConfig(
-            provider="openai",
+            model_provider="openai",
             model="demo-model",
             credentials={"api_key": "secret"},
             model_settings={"temperature": 0.2},
@@ -36,36 +36,62 @@ def _plugin_control(control: LayerControl) -> LayerControl[EmptyRuntimeState, Di
     return cast(LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles], control)


-def test_dify_plugin_layer_get_provider_requires_active_context_and_uses_runtime_client() -> None:
+def _llm_control(control: LayerControl) -> LayerControl[EmptyRuntimeState, EmptyRuntimeHandles]:
+    return cast(LayerControl[EmptyRuntimeState, EmptyRuntimeHandles], control)
+
+
+def test_dify_plugin_layer_uses_resource_stack_and_get_daemon_provider_requires_active_control() -> None:
     async def scenario() -> None:
         plugin = _plugin_layer()
         compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("plugin", plugin)]))
         session = compositor.new_session()
-        try:
-            _ = plugin.get_provider(_plugin_control(session.layer("plugin")), plugin_provider="openai")
-        except RuntimeError as e:
-            assert str(e) == "DifyPluginLayer.get_provider() requires an entered control with an open HTTP client."
-        else:
-            raise AssertionError("Expected RuntimeError.")
+        with pytest.raises(RuntimeError, match="requires an active LayerControl"):
+            _ = plugin.get_daemon_provider(_plugin_control(session.layer("plugin")))

-        async with compositor.enter(session):
+        async with compositor.enter(session) as active_session:
             handles = cast(DifyPluginRuntimeHandles, cast(object, session.layer("plugin").runtime_handles))
-            client = handles.http_client
-            assert client is not None
-            provider = plugin.get_provider(_plugin_control(session.layer("plugin")), plugin_provider="openai")
-            assert provider.client.http_client is client
+            first_client = handles.http_client
+            assert first_client is not None
+            provider = plugin.get_daemon_provider(_plugin_control(session.layer("plugin")))
+            assert provider.name == "DifyPlugin/langgenius/openai"
+            assert provider.client.http_client is first_client
             assert provider.client.tenant_id == "tenant-1"
             assert provider.client.plugin_id == "langgenius/openai"
-            assert provider.client.provider == "openai"
             assert provider.client.user_id == "user-1"
             async with provider:
                 pass
-            assert client.is_closed is False
+            assert first_client.is_closed is False
+            active_session.suspend_on_exit()

-        assert client.is_closed is True
-        with pytest.raises(RuntimeError, match="entered control with an open HTTP client"):
-            _ = plugin.get_provider(_plugin_control(session.layer("plugin")), plugin_provider="openai")
+        assert handles.http_client is None
+        assert first_client.is_closed is True
+        with pytest.raises(RuntimeError, match="requires an active LayerControl"):
+            _ = plugin.get_daemon_provider(_plugin_control(session.layer("plugin")))
+
+        async with compositor.enter(session):
+            second_client = handles.http_client
+            assert second_client is not None
+            assert second_client is not first_client
+
+        assert handles.http_client is None
+        assert second_client.is_closed is True
+
+    asyncio.run(scenario())
+
+
+def test_dify_plugin_layer_get_daemon_provider_rejects_wrong_control() -> None:
+    async def scenario() -> None:
+        plugin = _plugin_layer()
+        llm = _llm_layer()
+        compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(
+            layers=OrderedDict([("plugin", plugin), ("llm", llm)]),
+            deps_name_mapping={"llm": {"plugin": "plugin"}},
+        )
+
+        async with compositor.enter() as session:
+            with pytest.raises(RuntimeError, match="belongs to layer 'llm'"):
+                _ = plugin.get_daemon_provider(_plugin_control(session.layer("llm")))

     asyncio.run(scenario())

@@ -79,15 +105,23 @@ def test_dify_plugin_llm_layer_builds_adapter_model_from_dependency_provider() -
             deps_name_mapping={"llm": {"plugin": "plugin"}},
         )

-        async with compositor.enter() as session:
-            model = llm.get_model(session.layer("llm"))
+        session = compositor.new_session()
+        with pytest.raises(RuntimeError, match="requires an active LayerControl"):
+            _ = llm.get_model(_llm_control(session.layer("llm")))
+
+        async with compositor.enter(session):
+            model = llm.get_model(_llm_control(session.layer("llm")))

             assert isinstance(model, DifyLLMAdapterModel)
             assert model.model_name == "demo-model"
+            assert model.model_provider == "openai"
             assert model.credentials == {"api_key": "secret"}
-            assert model.provider.name == "DifyPlugin/openai"
+            assert model.provider.name == "DifyPlugin/langgenius/openai"
             handles = cast(DifyPluginRuntimeHandles, cast(object, session.layer("plugin").runtime_handles))
             assert model.provider.client.http_client is handles.http_client

+            with pytest.raises(RuntimeError, match="belongs to layer 'plugin'"):
+                _ = llm.get_model(_llm_control(session.layer("plugin")))
+
     asyncio.run(scenario())


@@ -143,14 +177,8 @@ def test_dify_plugin_layer_concurrent_sessions_use_separate_controls_and_clients
             assert second_client is not None
             assert first_client is not second_client

-            first_provider = plugin.get_provider(
-                _plugin_control(first_session.layer("plugin")),
-                plugin_provider="openai",
-            )
-            second_provider = plugin.get_provider(
-                _plugin_control(second_session.layer("plugin")),
-                plugin_provider="openai",
-            )
+            first_provider = plugin.get_daemon_provider(_plugin_control(first_session.layer("plugin")))
+            second_provider = plugin.get_daemon_provider(_plugin_control(second_session.layer("plugin")))

             assert first_provider.client.http_client is first_client
             assert second_provider.client.http_client is second_client
diff --git a/dify-agent/tests/local/dify_agent/protocol/test_protocol_schemas.py b/dify-agent/tests/local/dify_agent/protocol/test_protocol_schemas.py
index 4a474dc109..76f79feca3 100644
--- a/dify-agent/tests/local/dify_agent/protocol/test_protocol_schemas.py
+++ b/dify-agent/tests/local/dify_agent/protocol/test_protocol_schemas.py
@@ -2,11 +2,14 @@ import pytest
 from pydantic import ValidationError
 from pydantic_ai.messages import FinalResultEvent

+from agenton.layers import ExitIntent
 from agenton.compositor import CompositorSessionSnapshot
+import dify_agent.protocol as protocol_exports
 from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID
 from dify_agent.protocol.schemas import (
     RUN_EVENT_ADAPTER,
     CreateRunRequest,
+    LayerExitSignals,
     PydanticAIStreamRunEvent,
     RunFailedEvent,
     RunFailedEventData,
@@ -62,6 +65,34 @@ def test_create_run_request_rejects_agent_profile_and_model_layer_id_is_public()
     )


+def test_layer_exit_signals_default_to_suspend_and_are_public() -> None:
+    assert protocol_exports.LayerExitSignals is LayerExitSignals
+    request = CreateRunRequest.model_validate({"compositor": {"layers": []}})
+
+    assert request.layer_exit_signals.default is ExitIntent.SUSPEND
+    assert request.layer_exit_signals.layers == {}
+
+
+def test_layer_exit_signals_accept_layer_overrides() -> None:
+    request = CreateRunRequest.model_validate(
+        {
+            "compositor": {"layers": []},
+            "layer_exit_signals": {
+                "default": "delete",
+                "layers": {"prompt": "suspend", "llm": "delete"},
+            },
+        }
+    )
+
+    assert request.layer_exit_signals.default is ExitIntent.DELETE
+    assert request.layer_exit_signals.layers == {"prompt": ExitIntent.SUSPEND, "llm": ExitIntent.DELETE}
+
+
+def test_layer_exit_signals_reject_extra_fields() -> None:
+    with pytest.raises(ValidationError):
+        _ = LayerExitSignals.model_validate({"default": "suspend", "unknown": "value"})
+
+
 @pytest.mark.parametrize("event_type", ["agent_output", "session_snapshot"])
 def test_removed_non_terminal_payload_events_are_rejected(event_type: str) -> None:
     with pytest.raises(ValidationError):
diff --git a/dify-agent/tests/local/dify_agent/runtime/test_run_scheduler.py b/dify-agent/tests/local/dify_agent/runtime/test_run_scheduler.py
index 8edead2ff4..51e60a2508 100644
--- a/dify-agent/tests/local/dify_agent/runtime/test_run_scheduler.py
+++ b/dify-agent/tests/local/dify_agent/runtime/test_run_scheduler.py
@@ -6,7 +6,8 @@ import pytest
 from pydantic import JsonValue

 from agenton.compositor import CompositorConfig, LayerNodeConfig
-from dify_agent.protocol.schemas import CreateRunRequest, RunEvent, RunStatus
+from agenton.layers import ExitIntent
+from dify_agent.protocol.schemas import CreateRunRequest, LayerExitSignals, RunEvent, RunStatus
 from dify_agent.runtime.run_scheduler import RunScheduler, SchedulerStoppingError
 from dify_agent.server.schemas import RunRecord

@@ -135,6 +136,21 @@ def test_create_run_rejects_blank_prompt_before_persisting() -> None:
     asyncio.run(scenario())


+def test_create_run_rejects_unknown_layer_exit_signal_before_persisting() -> None:
+    async def scenario() -> None:
+        store = FakeStore()
+        scheduler = RunScheduler(store=store)
+        request = _request()
+        request.layer_exit_signals = LayerExitSignals(layers={"missing": ExitIntent.DELETE})
+
+        with pytest.raises(ValueError, match="missing"):
+            await scheduler.create_run(request)
+
+        assert store.records == {}
+
+    asyncio.run(scenario())
+
+
 def test_create_run_rejects_after_shutdown_starts() -> None:
     async def scenario() -> None:
         scheduler = RunScheduler(store=FakeStore())
diff --git a/dify-agent/tests/local/dify_agent/runtime/test_runner.py b/dify-agent/tests/local/dify_agent/runtime/test_runner.py
index 1db8b059ee..8b0a5a8a8a 100644
--- a/dify-agent/tests/local/dify_agent/runtime/test_runner.py
+++ b/dify-agent/tests/local/dify_agent/runtime/test_runner.py
@@ -4,13 +4,13 @@ import pytest
 from pydantic_ai.models.test import TestModel

 from agenton.compositor import CompositorConfig, LayerNodeConfig
-from agenton.layers import LayerControl
+from agenton.layers import ExitIntent, LayerControl, LifecycleState
 from agenton_collections.layers.plain import PromptLayerConfig
 from dify_agent.layers.dify_plugin.configs import DifyPluginLLMLayerConfig, DifyPluginLayerConfig
 from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer
 from dify_agent.layers.dify_plugin.plugin_layer import DifyPluginRuntimeHandles
 from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID
-from dify_agent.protocol.schemas import CreateRunRequest, RunSucceededEvent
+from dify_agent.protocol.schemas import CreateRunRequest, LayerExitSignals, RunSucceededEvent
 from dify_agent.runtime.event_sink import InMemoryRunEventSink
 from dify_agent.runtime.runner import AgentRunRunner, AgentRunValidationError

@@ -20,6 +20,7 @@ def _request(
     *,
     llm_layer_name: str = DIFY_AGENT_MODEL_LAYER_ID,
     plugin_layer_name: str = "plugin",
+    layer_exit_signals: LayerExitSignals | None = None,
 ) -> CreateRunRequest:
     return CreateRunRequest(
         compositor=CompositorConfig(
@@ -39,13 +40,14 @@ def _request(
                     type="dify.plugin.llm",
                     deps={"plugin": plugin_layer_name},
                     config=DifyPluginLLMLayerConfig(
-                        provider="openai",
+                        model_provider="openai",
                         model="demo-model",
                         credentials={"api_key": "secret"},
                     ),
                 ),
             ]
-        )
+        ),
+        layer_exit_signals=layer_exit_signals or LayerExitSignals(),
     )


@@ -78,9 +80,72 @@ def test_runner_emits_terminal_success_and_snapshot(monkeypatch: pytest.MonkeyPa
         "renamed-plugin",
         DIFY_AGENT_MODEL_LAYER_ID,
     ]
+    assert [layer.state for layer in terminal.data.session_snapshot.layers] == [
+        LifecycleState.SUSPENDED,
+        LifecycleState.SUSPENDED,
+        LifecycleState.SUSPENDED,
+    ]
     assert sink.statuses["run-1"] == "succeeded"


+def test_runner_applies_layer_exit_signal_overrides_to_success_snapshot(monkeypatch: pytest.MonkeyPatch) -> None:
+    def fake_get_model(_self: DifyPluginLLMLayer, _control: LayerControl):
+        return TestModel(custom_output_text="done")  # pyright: ignore[reportReturnType]
+
+    monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model)
+    request = _request(
+        layer_exit_signals=LayerExitSignals(
+            default=ExitIntent.SUSPEND,
+            layers={"prompt": ExitIntent.DELETE, DIFY_AGENT_MODEL_LAYER_ID: ExitIntent.DELETE},
+        )
+    )
+    sink = InMemoryRunEventSink()
+
+    asyncio.run(AgentRunRunner(sink=sink, request=request, run_id="run-exit").run())
+
+    terminal = sink.events["run-exit"][-1]
+    assert isinstance(terminal, RunSucceededEvent)
+    assert {layer.name: layer.state for layer in terminal.data.session_snapshot.layers} == {
+        "prompt": LifecycleState.CLOSED,
+        "plugin": LifecycleState.SUSPENDED,
+        DIFY_AGENT_MODEL_LAYER_ID: LifecycleState.CLOSED,
+    }
+
+
+def test_runner_rejects_unknown_layer_exit_signal_id() -> None:
+    request = _request(layer_exit_signals=LayerExitSignals(layers={"missing": ExitIntent.DELETE}))
+    sink = InMemoryRunEventSink()
+
+    with pytest.raises(AgentRunValidationError, match="missing"):
+        asyncio.run(AgentRunRunner(sink=sink, request=request, run_id="run-unknown-signal").run())
+
+    assert [event.type for event in sink.events["run-unknown-signal"]] == ["run_started", "run_failed"]
+    assert sink.statuses["run-unknown-signal"] == "failed"
+
+
+def test_runner_applies_layer_exit_signals_before_model_resolution_failure(monkeypatch: pytest.MonkeyPatch) -> None:
+    def fake_get_model(self: DifyPluginLLMLayer, control: LayerControl):
+        plugin_control = control.control_for(self.deps.plugin)
+        assert control.exit_intent is ExitIntent.DELETE
+        assert plugin_control.exit_intent is ExitIntent.SUSPEND
+        raise RuntimeError("model unavailable")
+
+    monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model)
+    request = _request(
+        layer_exit_signals=LayerExitSignals(
+            default=ExitIntent.DELETE,
+            layers={"plugin": ExitIntent.SUSPEND},
+        )
+    )
+    sink = InMemoryRunEventSink()
+
+    with pytest.raises(AgentRunValidationError, match="model unavailable"):
+        asyncio.run(AgentRunRunner(sink=sink, request=request, run_id="run-model-failure").run())
+
+    assert [event.type for event in sink.events["run-model-failure"]] == ["run_started", "run_failed"]
+    assert sink.statuses["run-model-failure"] == "failed"
+
+
 def test_runner_fails_empty_user_prompts() -> None:
     request = _request("")
     sink = InMemoryRunEventSink()
diff --git a/dify-agent/tests/local/dify_agent/server/test_runs_routes.py b/dify-agent/tests/local/dify_agent/server/test_runs_routes.py
index 5a820a2c1c..90e579d331 100644
--- a/dify-agent/tests/local/dify_agent/server/test_runs_routes.py
+++ b/dify-agent/tests/local/dify_agent/server/test_runs_routes.py
@@ -97,7 +97,7 @@ def test_create_run_accepts_valid_full_plugin_graph() -> None:
                     "type": "dify.plugin.llm",
                     "deps": {"plugin": "plugin-renamed"},
                     "config": {
-                        "provider": "openai",
+                        "model_provider": "openai",
                         "model": "gpt-4o-mini",
                         "credentials": {"api_key": "secret"},
                         "model_settings": {"temperature": 0.2},
@@ -112,6 +112,30 @@ def test_create_run_accepts_valid_full_plugin_graph() -> None:
     assert response.json() == {"run_id": "run-1", "status": "running"}


+def test_create_run_rejects_unknown_layer_exit_signal_before_scheduling() -> None:
+    from fastapi import FastAPI
+
+    app = FastAPI()
+    app.include_router(
+        create_runs_router(lambda: FakeStore(), lambda: FakeScheduler())  # pyright: ignore[reportArgumentType]
+    )
+    client = TestClient(app)
+
+    response = client.post(
+        "/runs",
+        json={
+            "compositor": {
+                "schema_version": 1,
+                "layers": [{"name": "prompt", "type": "plain.prompt", "config": {"user": "hello"}}],
+            },
+            "layer_exit_signals": {"layers": {"missing": "delete"}},
+        },
+    )
+
+    assert response.status_code == 422
+    assert "missing" in response.json()["detail"]
+
+
 def test_create_run_returns_503_when_scheduler_is_stopping() -> None:
     from fastapi import FastAPI