diff --git a/dify-agent/docs/dify-agent/api/index.md b/dify-agent/docs/dify-agent/api/index.md index 5185ea6d97..a03f2e6eaa 100644 --- a/dify-agent/docs/dify-agent/api/index.md +++ b/dify-agent/docs/dify-agent/api/index.md @@ -12,11 +12,11 @@ server-only and should not be used by API consumers. ## Input model Create-run requests accept a `CompositorConfig` and an optional -`CompositorSessionSnapshot`. There is **no top-level `user_prompt` field**. -User input must be supplied by Agenton layers. In the MVP server, the safe -config-constructible layer registry includes `plain.prompt`; its `config.user` -field becomes `Compositor.user_prompts` and is passed to Pydantic AI as the run -input. +`CompositorSessionSnapshot`. There is **no top-level `user_prompt` or model +profile field**. User input and model/provider selection are supplied by Agenton +layers. In the MVP server, the safe config-constructible layer registry includes +`plain.prompt`, `dify.plugin`, and `dify.plugin.llm`. The runtime reads the LLM +model layer named by `DIFY_AGENT_MODEL_LAYER_ID`, whose public value is `"llm"`. Blank user input is rejected. A request with no user prompt, an empty string, or only whitespace strings such as `"user": ["", " "]` returns `422` before a run @@ -46,14 +46,35 @@ Request: "prefix": "You are a concise assistant.", "user": "Say hello from the Dify Agent API." } + }, + { + "name": "plugin", + "type": "dify.plugin", + "config": { + "tenant_id": "replace-with-tenant-id", + "plugin_id": "langgenius/openai" + } + }, + { + "name": "llm", + "type": "dify.plugin.llm", + "deps": { + "plugin": "plugin" + }, + "config": { + "provider": "openai", + "model": "gpt-4o-mini", + "credentials": { + "api_key": "replace-with-provider-key" + }, + "model_settings": { + "temperature": 0.2 + } + } } ] }, - "session_snapshot": null, - "agent_profile": { - "provider": "test", - "output_text": "Hello from the TestModel." - } + "session_snapshot": null } ``` @@ -71,7 +92,9 @@ same FastAPI process. 
Redis is not used as a job queue. Run records and per-run event streams expire after `DIFY_AGENT_RUN_RETENTION_SECONDS`, which defaults to `259200` seconds (3 days). -`agent_profile.provider` currently supports the credential-free `test` profile. +`dify.plugin` receives tenant/plugin identity only; daemon URL, API key, and +timeout are server settings. `dify.plugin.llm.credentials` accepts scalar values +only (`string`, `number`, `boolean`, or `null`). Validation error example (`422`): @@ -167,13 +190,31 @@ normal names; sync methods add `_sync`. from agenton.compositor import CompositorConfig, LayerNodeConfig from agenton_collections.layers.plain import PromptLayerConfig from dify_agent.client import Client -from dify_agent.protocol import CreateRunRequest +from dify_agent.layers.dify_plugin import DifyPluginLLMLayerConfig, DifyPluginLayerConfig +from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest async def main() -> None: request = CreateRunRequest( compositor=CompositorConfig( - layers=[LayerNodeConfig(name="prompt", type="plain.prompt", config=PromptLayerConfig(user="hello"))] + layers=[ + LayerNodeConfig(name="prompt", type="plain.prompt", config=PromptLayerConfig(user="hello")), + LayerNodeConfig( + name="plugin", + type="dify.plugin", + config=DifyPluginLayerConfig(tenant_id="tenant-id", plugin_id="langgenius/openai"), + ), + LayerNodeConfig( + name=DIFY_AGENT_MODEL_LAYER_ID, + type="dify.plugin.llm", + deps={"plugin": "plugin"}, + config=DifyPluginLLMLayerConfig( + provider="openai", + model="gpt-4o-mini", + credentials={"api_key": "provider-key"}, + ), + ), + ] ) ) async with Client(base_url="http://localhost:8000") as client: @@ -186,12 +227,30 @@ async def main() -> None: from agenton.compositor import CompositorConfig, LayerNodeConfig from agenton_collections.layers.plain import PromptLayerConfig from dify_agent.client import Client -from dify_agent.protocol import CreateRunRequest +from dify_agent.layers.dify_plugin import 
DifyPluginLLMLayerConfig, DifyPluginLayerConfig +from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest request = CreateRunRequest( compositor=CompositorConfig( - layers=[LayerNodeConfig(name="prompt", type="plain.prompt", config=PromptLayerConfig(user="hello"))] + layers=[ + LayerNodeConfig(name="prompt", type="plain.prompt", config=PromptLayerConfig(user="hello")), + LayerNodeConfig( + name="plugin", + type="dify.plugin", + config=DifyPluginLayerConfig(tenant_id="tenant-id", plugin_id="langgenius/openai"), + ), + LayerNodeConfig( + name=DIFY_AGENT_MODEL_LAYER_ID, + type="dify.plugin.llm", + deps={"plugin": "plugin"}, + config=DifyPluginLLMLayerConfig( + provider="openai", + model="gpt-4o-mini", + credentials={"api_key": "provider-key"}, + ), + ), + ] ) ) diff --git a/dify-agent/examples/dify_agent/dify_agent_examples/run_server_consumer.py b/dify-agent/examples/dify_agent/dify_agent_examples/run_server_consumer.py index 48c3168767..694fa09e7f 100644 --- a/dify-agent/examples/dify_agent/dify_agent_examples/run_server_consumer.py +++ b/dify-agent/examples/dify_agent/dify_agent_examples/run_server_consumer.py @@ -5,7 +5,7 @@ example: uv run --project dify-agent uvicorn dify_agent.server.app:app --reload -The default request uses the credential-free pydantic-ai TestModel profile. This +The request carries Dify plugin model configuration in Agenton layers. This script prints the created run and every event observed through cursor polling. ``Client.create_run`` performs one POST attempt only; use polling or SSE replay to recover after client-side uncertainty. 
@@ -16,10 +16,20 @@ import asyncio from agenton.compositor import CompositorConfig, LayerNodeConfig from agenton_collections.layers.plain import PromptLayerConfig from dify_agent.client import Client -from dify_agent.protocol import AgentProfileConfig, CreateRunRequest +from dify_agent.layers.dify_plugin import ( + DifyPluginCredentialValue, + DifyPluginLLMLayerConfig, + DifyPluginLayerConfig, +) +from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest API_BASE_URL = "http://localhost:8000" +TENANT_ID = "replace-with-tenant-id" +PLUGIN_ID = "langgenius/openai" +PLUGIN_PROVIDER = "openai" +MODEL_NAME = "gpt-4o-mini" +MODEL_CREDENTIALS: dict[str, DifyPluginCredentialValue] = {"api_key": "replace-with-provider-key"} async def main() -> None: @@ -35,10 +45,24 @@ async def main() -> None: prefix="You are a concise assistant.", user="Say hello from the Dify Agent API server example.", ), - ) + ), + LayerNodeConfig( + name="plugin", + type="dify.plugin", + config=DifyPluginLayerConfig(tenant_id=TENANT_ID, plugin_id=PLUGIN_ID), + ), + LayerNodeConfig( + name=DIFY_AGENT_MODEL_LAYER_ID, + type="dify.plugin.llm", + deps={"plugin": "plugin"}, + config=DifyPluginLLMLayerConfig( + provider=PLUGIN_PROVIDER, + model=MODEL_NAME, + credentials=MODEL_CREDENTIALS, + ), + ), ], ), - agent_profile=AgentProfileConfig(output_text="Hello from the example TestModel."), ) ) print("created run", run) diff --git a/dify-agent/examples/dify_agent/dify_agent_examples/run_server_sync_client.py b/dify-agent/examples/dify_agent/dify_agent_examples/run_server_sync_client.py index cf821c690c..92678a7713 100644 --- a/dify-agent/examples/dify_agent/dify_agent_examples/run_server_sync_client.py +++ b/dify-agent/examples/dify_agent/dify_agent_examples/run_server_sync_client.py @@ -8,10 +8,20 @@ a new run explicitly rather than assuming the original request was not accepted. 
from agenton.compositor import CompositorConfig, LayerNodeConfig from agenton_collections.layers.plain import PromptLayerConfig from dify_agent.client import Client -from dify_agent.protocol import AgentProfileConfig, CreateRunRequest +from dify_agent.layers.dify_plugin import ( + DifyPluginCredentialValue, + DifyPluginLLMLayerConfig, + DifyPluginLayerConfig, +) +from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest API_BASE_URL = "http://localhost:8000" +TENANT_ID = "replace-with-tenant-id" +PLUGIN_ID = "langgenius/openai" +PLUGIN_PROVIDER = "openai" +MODEL_NAME = "gpt-4o-mini" +MODEL_CREDENTIALS: dict[str, DifyPluginCredentialValue] = {"api_key": "replace-with-provider-key"} def main() -> None: @@ -27,10 +37,24 @@ def main() -> None: prefix="You are a concise assistant.", user="Say hello from the synchronous Dify Agent client example.", ), - ) + ), + LayerNodeConfig( + name="plugin", + type="dify.plugin", + config=DifyPluginLayerConfig(tenant_id=TENANT_ID, plugin_id=PLUGIN_ID), + ), + LayerNodeConfig( + name=DIFY_AGENT_MODEL_LAYER_ID, + type="dify.plugin.llm", + deps={"plugin": "plugin"}, + config=DifyPluginLLMLayerConfig( + provider=PLUGIN_PROVIDER, + model=MODEL_NAME, + credentials=MODEL_CREDENTIALS, + ), + ), ], ), - agent_profile=AgentProfileConfig(output_text="Hello from the sync TestModel."), ) ) print("created run", run) diff --git a/dify-agent/src/agenton/compositor/__init__.py b/dify-agent/src/agenton/compositor/__init__.py index 95ce832635..e4e2d1f4a8 100644 --- a/dify-agent/src/agenton/compositor/__init__.py +++ b/dify-agent/src/agenton/compositor/__init__.py @@ -22,7 +22,8 @@ Serializable graph config uses registry type ids rather than import paths. instances; JSON serialization preserves concrete DTO fields before the builder validates them with the registered layer schema. ``CompositorBuilder`` resolves config nodes through ``LayerRegistry`` and can mix those nodes with live layer -instances for Python objects and callables. 
+instances for Python objects and callables. Registries may also supply factories +for layers that require server-side dependencies in addition to client DTOs. ``Compositor.enter`` enters layers in compositor order and exits them in reverse order through ``AsyncExitStack``. It accepts an optional ``CompositorSession`` @@ -41,7 +42,7 @@ from collections import OrderedDict from collections.abc import AsyncIterator, Callable, Iterable, Mapping as MappingABC, Sequence from contextlib import AsyncExitStack, asynccontextmanager from dataclasses import dataclass, field -from typing import Any, Generic, Mapping, TypedDict, cast +from typing import Any, Generic, Mapping, TypedDict, cast, overload from pydantic import BaseModel, ConfigDict, Field, JsonValue from typing_extensions import Self, TypeVar @@ -55,6 +56,7 @@ LayerPromptT = TypeVar("LayerPromptT", default=AllPromptTypes) LayerToolT = TypeVar("LayerToolT", default=AllToolTypes) UserPromptT = TypeVar("UserPromptT", default=AllUserPromptTypes) LayerUserPromptT = TypeVar("LayerUserPromptT", default=AllUserPromptTypes) +LayerT = TypeVar("LayerT", bound=Layer[Any, Any, Any, Any, Any, Any, Any]) type CompositorTransformer[InputT, OutputT] = Callable[[Sequence[InputT]], Sequence[OutputT]] @@ -76,6 +78,7 @@ class CompositorTransformerKwargs[ type _ConfigModelValue[ModelT: BaseModel] = ModelT | JsonValue | str | bytes +type LayerFactory = Callable[[LayerConfig], Layer[Any, Any, Any, Any, Any, Any, Any]] def _validate_config_model_input[ModelT: BaseModel]( @@ -132,6 +135,7 @@ class LayerDescriptor: config_type: type[LayerConfig] runtime_state_type: type[BaseModel] runtime_handles_type: type[BaseModel] + factory: LayerFactory | None = None class LayerRegistry: @@ -139,7 +143,8 @@ class LayerRegistry: Registration infers config and runtime schemas from layer class attributes. A registered layer must have a type id, either declared as ``type_id`` on the - class or supplied to ``register_layer``. 
@overload
def get_layer(self, layer_id: str) -> Layer[Any, Any, Any, Any, Any, Any, Any]: ...


@overload
def get_layer(self, layer_id: str, layer_type: type[LayerT]) -> LayerT: ...


def get_layer(
    self,
    layer_id: str,
    layer_type: type[LayerT] | None = None,
) -> Layer[Any, Any, Any, Any, Any, Any, Any] | LayerT:
    """Look up a layer by its compositor name, optionally checking its class.

    Args:
        layer_id: Name the layer was registered under in this compositor.
        layer_type: When given, the class the found layer must be an
            instance of.

    Returns:
        The layer stored under ``layer_id``.

    Raises:
        KeyError: if no layer is registered under ``layer_id``.
        TypeError: if ``layer_type`` is given and the layer is not an
            instance of it.
    """
    try:
        found = self.layers[layer_id]
    except KeyError as missing:
        raise KeyError(f"Layer '{layer_id}' is not defined in this compositor.") from missing

    # Guard clause: either no type was requested or the layer satisfies it.
    if layer_type is None or isinstance(found, layer_type):
        return found
    raise TypeError(f"Layer '{layer_id}' must be {layer_type.__name__}, got {type(found).__name__}.")
+ """ tenant_id: str plugin_id: str @@ -137,15 +142,21 @@ class DifyPluginDaemonProvider(Provider[DifyPluginDaemonLLMClient]): plugin_daemon_api_key: str = field(repr=False) user_id: str | None = None timeout: float | httpx.Timeout | None = _DEFAULT_DAEMON_TIMEOUT + http_client: httpx.AsyncClient | None = field(default=None, repr=False) _client: DifyPluginDaemonLLMClient = field(init=False, repr=False) _own_http_client: httpx.AsyncClient | None = field(init=False, default=None, repr=False) _http_client_factory: Callable[[], httpx.AsyncClient] | None = field(init=False, default=None, repr=False) def __post_init__(self) -> None: self.plugin_daemon_url = self.plugin_daemon_url.rstrip("/") - self._http_client_factory = self._make_http_client - http_client = self._make_http_client() - self._own_http_client = http_client + if self.http_client is None: + self._http_client_factory = self._make_http_client + http_client = self._make_http_client() + self._own_http_client = http_client + else: + http_client = self.http_client + self._own_http_client = None + self._http_client_factory = None self._client = DifyPluginDaemonLLMClient( plugin_daemon_url=self.plugin_daemon_url, plugin_daemon_api_key=self.plugin_daemon_api_key, diff --git a/dify-agent/src/dify_agent/layers/__init__.py b/dify-agent/src/dify_agent/layers/__init__.py new file mode 100644 index 0000000000..518fbf9bd9 --- /dev/null +++ b/dify-agent/src/dify_agent/layers/__init__.py @@ -0,0 +1,3 @@ +"""Dify-owned Agenton layer packages.""" + +__all__: list[str] = [] diff --git a/dify-agent/src/dify_agent/layers/dify_plugin/__init__.py b/dify-agent/src/dify_agent/layers/dify_plugin/__init__.py new file mode 100644 index 0000000000..54396fa1bb --- /dev/null +++ b/dify-agent/src/dify_agent/layers/dify_plugin/__init__.py @@ -0,0 +1,13 @@ +"""Client-safe exports for Dify plugin layer config DTOs.""" + +from dify_agent.layers.dify_plugin.configs import ( + DifyPluginCredentialValue, + DifyPluginLLMLayerConfig, + 
"""Client-safe DTOs for Dify plugin-backed Agenton layers.

Only public config schemas and scalar type aliases belong in this module. HTTP
clients, server settings, and adapter implementations are kept in sibling
implementation modules, so a client can assemble a run request without
importing any server-only dependency.
"""

from typing import ClassVar, TypeAlias

from pydantic import ConfigDict, Field
from pydantic_ai.settings import ModelSettings

from agenton.layers import LayerConfig


# Values allowed inside a credentials mapping: scalars only, no nesting.
DifyPluginCredentialValue: TypeAlias = str | int | float | bool | None

# Shared by both DTOs below: unknown keys are rejected and field types that
# pydantic has no built-in schema for are permitted.
_PUBLIC_DTO_MODEL_CONFIG = ConfigDict(extra="forbid", arbitrary_types_allowed=True)


class DifyPluginLayerConfig(LayerConfig):
    """Public config naming the tenant, plugin, and optional user of a plugin layer."""

    tenant_id: str
    plugin_id: str
    user_id: str | None = None

    model_config: ClassVar[ConfigDict] = _PUBLIC_DTO_MODEL_CONFIG


class DifyPluginLLMLayerConfig(LayerConfig):
    """Public config choosing the provider, model, credentials, and settings of a plugin LLM."""

    provider: str
    model: str
    # Each instance gets its own empty dict when no credentials are supplied.
    credentials: dict[str, DifyPluginCredentialValue] = Field(default_factory=dict)
    model_settings: ModelSettings | None = None

    model_config: ClassVar[ConfigDict] = _PUBLIC_DTO_MODEL_CONFIG


__all__ = [
    "DifyPluginCredentialValue",
    "DifyPluginLLMLayerConfig",
    "DifyPluginLayerConfig",
]
"""Dify plugin LLM model layer.

The layer turns the public LLM layer DTO into a Pydantic AI model adapter. It
relies on a ``DifyPluginLayer`` dependency to obtain a provider for the active
plugin daemon connection.
"""

from dataclasses import dataclass

from typing_extensions import Self, override

from agenton.layers import LayerDeps, PlainLayer
from dify_agent.adapters.llm import DifyLLMAdapterModel
from dify_agent.layers.dify_plugin.configs import DifyPluginLLMLayerConfig
from dify_agent.layers.dify_plugin.plugin_layer import DifyPluginLayer


class DifyPluginLLMDeps(LayerDeps):
    """Layers that ``DifyPluginLLMLayer`` needs bound before it can build a model."""

    # Plugin context layer that hands out daemon providers.
    plugin: DifyPluginLayer  # pyright: ignore[reportUninitializedInstanceVariable]


@dataclass(slots=True)
class DifyPluginLLMLayer(PlainLayer[DifyPluginLLMDeps, DifyPluginLLMLayerConfig]):
    """Layer that builds the Pydantic AI model backed by the Dify plugin daemon."""

    type_id = "dify.plugin.llm"

    config: DifyPluginLLMLayerConfig

    @classmethod
    @override
    def from_config(cls, config: DifyPluginLLMLayerConfig) -> Self:
        """Build the layer directly from an already validated public config."""
        return cls(config=config)

    def get_model(self) -> DifyLLMAdapterModel:
        """Build the model adapter for the configured provider and model.

        The provider is requested from the ``plugin`` dependency on every call,
        and the credentials mapping is copied before it is handed to the
        adapter.
        """
        llm_config = self.config
        daemon_provider = self.deps.plugin.get_provider(plugin_provider=llm_config.provider)
        # Shallow copy so the adapter does not share the DTO's own dict.
        credentials_copy = dict(llm_config.credentials)
        return DifyLLMAdapterModel(
            model=llm_config.model,
            daemon_provider=daemon_provider,
            credentials=credentials_copy,
            model_settings=llm_config.model_settings,
        )


__all__ = ["DifyPluginLLMDeps", "DifyPluginLLMLayer"]
"""Runtime Dify plugin context layer.

The public config identifies tenant/plugin/user context only. Plugin daemon URL,
API key, and timeout are server-side dependencies injected by the layer registry
factory. Each active compositor entry owns an HTTP client in ``LayerControl``
runtime handles; ``get_provider`` discovers those handles via a task-local
context variable so shared layer instances never store session-local clients.
"""

from collections.abc import AsyncIterator, Mapping
from contextlib import asynccontextmanager
from contextvars import ContextVar, Token
from dataclasses import dataclass
from types import MappingProxyType
from typing import cast

import httpx
from pydantic import BaseModel, ConfigDict
from typing_extensions import Self, override

from agenton.layers import EmptyRuntimeState, LayerControl, NoLayerDeps, PlainLayer
from dify_agent.adapters.llm import DifyPluginDaemonProvider
from dify_agent.layers.dify_plugin.configs import DifyPluginLayerConfig


class DifyPluginRuntimeHandles(BaseModel):
    """Live per-entry handles for Dify plugin daemon access."""

    # ``None`` until a lifecycle hook opens the client, and again after it is closed.
    http_client: httpx.AsyncClient | None = None

    model_config = ConfigDict(extra="forbid", validate_assignment=True, arbitrary_types_allowed=True)


# Maps ``id(layer)`` to the handles of that layer's currently active entry.
# The default is a read-only empty mapping: a ContextVar default is one object
# shared by every context that has not set the variable, so a plain ``{}`` here
# could leak entries across contexts if anything ever mutated it in place.
# Writers always publish a fresh dict via ``_set_active_handles``.
_ACTIVE_PLUGIN_HANDLES: ContextVar[Mapping[int, DifyPluginRuntimeHandles]] = ContextVar(
    "dify_agent_active_plugin_handles",
    default=MappingProxyType({}),
)


@dataclass(slots=True)
class DifyPluginLayer(PlainLayer[NoLayerDeps, DifyPluginLayerConfig, EmptyRuntimeState, DifyPluginRuntimeHandles]):
    """Layer that owns plugin daemon connection state for one active session."""

    type_id = "dify.plugin"

    # Public, client-supplied tenant/plugin/user identity.
    config: DifyPluginLayerConfig
    # Server-only daemon settings; never part of the public DTO.
    daemon_url: str
    daemon_api_key: str
    timeout: float | httpx.Timeout | None = 600.0

    @classmethod
    @override
    def from_config(cls, config: DifyPluginLayerConfig) -> Self:
        """Reject construction without server-injected daemon settings.

        Raises:
            TypeError: always; use ``from_config_with_settings`` through a
                registry factory instead.
        """
        del config
        raise TypeError("DifyPluginLayer requires server-side daemon settings and must use a registry factory.")

    @classmethod
    def from_config_with_settings(
        cls,
        config: DifyPluginLayerConfig,
        *,
        daemon_url: str,
        daemon_api_key: str,
        timeout: float | httpx.Timeout | None,
    ) -> Self:
        """Create a plugin layer from public config plus server-only daemon settings."""
        return cls(config=config, daemon_url=daemon_url, daemon_api_key=daemon_api_key, timeout=timeout)

    @override
    def enter(self, control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles]):
        """Enter the layer and expose active handles through task-local context."""
        return self._enter_with_active_handles(control)

    @asynccontextmanager
    async def _enter_with_active_handles(
        self,
        control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles],
    ) -> AsyncIterator[None]:
        """Run the normal lifecycle and publish this entry's handles while it is active."""
        async with self.lifecycle_enter(control):
            token = self._set_active_handles(control.runtime_handles)
            try:
                yield
            finally:
                # Restore whatever mapping was visible before this entry.
                _ACTIVE_PLUGIN_HANDLES.reset(token)

    def get_provider(self, *, plugin_provider: str) -> DifyPluginDaemonProvider:
        """Return a provider backed by this layer's active HTTP client.

        Args:
            plugin_provider: Provider name forwarded to ``DifyPluginDaemonProvider``.

        Returns:
            A provider built from this layer's config and daemon settings that
            reuses the active entry's HTTP client.

        Raises:
            RuntimeError: if called outside an active compositor context for this
                layer, or after its runtime handles have been closed.
        """
        handles = _ACTIVE_PLUGIN_HANDLES.get().get(id(self))
        http_client = None if handles is None else handles.http_client
        # A closed client is as unusable as a missing one; ``_open_http_client``
        # applies the same rule when deciding whether to create a new client.
        if http_client is None or http_client.is_closed:
            raise RuntimeError("DifyPluginLayer.get_provider() requires an active compositor context.")
        return DifyPluginDaemonProvider(
            tenant_id=self.config.tenant_id,
            plugin_id=self.config.plugin_id,
            plugin_provider=plugin_provider,
            plugin_daemon_url=self.daemon_url,
            plugin_daemon_api_key=self.daemon_api_key,
            user_id=self.config.user_id,
            timeout=self.timeout,
            http_client=http_client,
        )

    @override
    async def on_context_create(
        self,
        control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles],
    ) -> None:
        """Open the entry's HTTP client when a context is created."""
        await self._open_http_client(control)

    @override
    async def on_context_resume(
        self,
        control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles],
    ) -> None:
        """Open the entry's HTTP client when a context is resumed."""
        await self._open_http_client(control)

    @override
    async def on_context_suspend(
        self,
        control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles],
    ) -> None:
        """Close the entry's HTTP client when a context is suspended."""
        await self._close_http_client(control)

    @override
    async def on_context_delete(
        self,
        control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles],
    ) -> None:
        """Close the entry's HTTP client when a context is deleted."""
        await self._close_http_client(control)

    async def _open_http_client(self, control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles]) -> None:
        """Create a client for this entry unless an open one is already stored."""
        if control.runtime_handles.http_client is None or control.runtime_handles.http_client.is_closed:
            control.runtime_handles.http_client = httpx.AsyncClient(timeout=self.timeout, trust_env=False)

    async def _close_http_client(self, control: LayerControl[EmptyRuntimeState, DifyPluginRuntimeHandles]) -> None:
        """Detach and close this entry's client, if any."""
        client = control.runtime_handles.http_client
        # Clear the handle first so it never points at a client being closed.
        control.runtime_handles.http_client = None
        if client is not None:
            await client.aclose()

    def _set_active_handles(self, handles: DifyPluginRuntimeHandles) -> Token[Mapping[int, DifyPluginRuntimeHandles]]:
        """Publish ``handles`` for this layer and return the token that undoes it."""
        # Copy-on-write: build a new dict rather than mutating the visible mapping.
        active_handles = {**_ACTIVE_PLUGIN_HANDLES.get(), id(self): handles}
        return cast(Token[Mapping[int, DifyPluginRuntimeHandles]], _ACTIVE_PLUGIN_HANDLES.set(active_handles))


__all__ = ["DifyPluginLayer", "DifyPluginRuntimeHandles"]
Successful runs publish the final JSON-safe +agent output and the resumable Agenton session snapshot together on the terminal ``run_succeeded`` event so consumers can treat terminal events as complete run summaries. """ from datetime import datetime, timezone -from typing import Annotated, ClassVar, Literal, TypeAlias +from typing import Annotated, ClassVar, Final, Literal, TypeAlias from pydantic import BaseModel, ConfigDict, Field, JsonValue, TypeAdapter from pydantic_ai.messages import AgentStreamEvent @@ -23,6 +25,7 @@ from pydantic_ai.messages import AgentStreamEvent from agenton.compositor import CompositorConfig, CompositorSessionSnapshot +DIFY_AGENT_MODEL_LAYER_ID: Final[str] = "llm" RunStatus = Literal["running", "succeeded", "failed"] RunEventType = Literal[ "run_started", @@ -37,25 +40,15 @@ def utc_now() -> datetime: return datetime.now(timezone.utc) -class AgentProfileConfig(BaseModel): - """Minimal model profile for the MVP runner. - - ``test`` uses pydantic-ai's ``TestModel`` and is credential-free. Other - profiles can be added behind this schema without changing run/event storage. - """ - - provider: Literal["test"] = "test" - output_text: str = "Hello from the Dify Agent test model." - - model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") - - class CreateRunRequest(BaseModel): - """Request body for creating one async agent run.""" + """Request body for creating one async agent run. + + Model/provider configuration must be supplied through the compositor layer + named by ``DIFY_AGENT_MODEL_LAYER_ID``. 
+ """ compositor: CompositorConfig session_snapshot: CompositorSessionSnapshot | None = None - agent_profile: AgentProfileConfig = Field(default_factory=AgentProfileConfig) model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") @@ -164,10 +157,10 @@ class RunEventsResponse(BaseModel): __all__ = [ - "AgentProfileConfig", "BaseRunEvent", "CreateRunRequest", "CreateRunResponse", + "DIFY_AGENT_MODEL_LAYER_ID", "EmptyRunEventData", "PydanticAIStreamRunEvent", "RUN_EVENT_ADAPTER", diff --git a/dify-agent/src/dify_agent/runtime/agent_factory.py b/dify-agent/src/dify_agent/runtime/agent_factory.py index b3cb686bc1..d4274ec577 100644 --- a/dify-agent/src/dify_agent/runtime/agent_factory.py +++ b/dify-agent/src/dify_agent/runtime/agent_factory.py @@ -1,41 +1,36 @@ -"""Pydantic AI agent construction for runtime profiles. +"""Pydantic AI agent construction for models supplied by Agenton layers. -The initial server exposes only a credential-free ``test`` profile. The factory -keeps model selection out of ``AgentRunRunner`` so production model profiles can -be added without changing storage or HTTP contracts. Agents are returned through -an ``object`` output boundary because the runner serializes final output to the -public JSON-safe event payload instead of assuming text-only results. +The run request carries model/provider selection in the layer graph. This helper +keeps Agent construction details out of ``AgentRunRunner`` while accepting an +already resolved Pydantic AI model from the configured model layer. 
def create_agent(
    model: Model[Any],
    *,
    system_prompts: Sequence[PydanticAIPrompt[object]],
    tools: Sequence[PydanticAITool[object]],
) -> Agent[None, object]:
    """Build the pydantic-ai agent that executes a single run.

    Args:
        model: Already resolved Pydantic AI model to run against.
        system_prompts: Prompts materialized into static system prompt strings.
        tools: Tools passed through to the agent unchanged.

    Returns:
        A text-output agent, typed at the ``object`` output boundary.
    """
    static_prompts = materialize_static_system_prompts(system_prompts)
    text_agent = Agent[None, str](
        model,
        output_type=str,
        system_prompt=static_prompts,
        tools=tools,
    )
    # Typing-only widening from ``str`` output to ``object``; no runtime effect.
    return cast(Agent[None, object], text_agent)
Public DTOs provide +tenant/plugin/model data, while server-only plugin daemon settings are injected +through the registry factory for ``DifyPluginLayer``. """ from typing import cast +import httpx from pydantic_ai.messages import UserContent from agenton.compositor import Compositor, CompositorConfig, LayerRegistry from agenton.layers.types import AllPromptTypes, AllToolTypes, AllUserPromptTypes, PydanticAIPrompt, PydanticAITool from agenton_collections.layers.plain.basic import PromptLayer from agenton_collections.transformers.pydantic_ai import PYDANTIC_AI_TRANSFORMERS +from dify_agent.layers.dify_plugin.configs import DifyPluginLayerConfig +from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer +from dify_agent.layers.dify_plugin.plugin_layer import DifyPluginLayer -def create_default_layer_registry() -> LayerRegistry: +def create_default_layer_registry( + *, + plugin_daemon_url: str = "http://localhost:5002", + plugin_daemon_api_key: str = "", + plugin_daemon_timeout: float | httpx.Timeout | None = 600.0, +) -> LayerRegistry: """Return the server registry of safe config-constructible layers.""" registry = LayerRegistry() registry.register_layer(PromptLayer) + registry.register_layer( + DifyPluginLayer, + factory=lambda config: DifyPluginLayer.from_config_with_settings( + DifyPluginLayerConfig.model_validate(config), + daemon_url=plugin_daemon_url, + daemon_api_key=plugin_daemon_api_key, + timeout=plugin_daemon_timeout, + ), + ) + registry.register_layer(DifyPluginLLMLayer) return registry def build_pydantic_ai_compositor( config: CompositorConfig, + *, + registry: LayerRegistry | None = None, ) -> Compositor[ PydanticAIPrompt[object], PydanticAITool[object], @@ -44,7 +66,7 @@ def build_pydantic_ai_compositor( ], Compositor.from_config( config, - registry=create_default_layer_registry(), + registry=registry or create_default_layer_registry(), **PYDANTIC_AI_TRANSFORMERS, # pyright: ignore[reportArgumentType] ), ) diff --git 
a/dify-agent/src/dify_agent/runtime/run_scheduler.py b/dify-agent/src/dify_agent/runtime/run_scheduler.py index 93423ade75..3b2d7b9fc2 100644 --- a/dify-agent/src/dify_agent/runtime/run_scheduler.py +++ b/dify-agent/src/dify_agent/runtime/run_scheduler.py @@ -12,8 +12,9 @@ import logging from collections.abc import Callable from typing import Protocol +from agenton.compositor import LayerRegistry from dify_agent.protocol.schemas import CreateRunRequest -from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor +from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor, create_default_layer_registry from dify_agent.runtime.event_sink import RunEventSink, emit_run_failed from dify_agent.runtime.runner import AgentRunRunner from dify_agent.runtime.user_prompt_validation import EMPTY_USER_PROMPTS_ERROR, has_non_blank_user_prompt @@ -29,7 +30,7 @@ class SchedulerStoppingError(RuntimeError): class RunStore(RunEventSink, Protocol): """Persistence boundary needed by the scheduler.""" - async def create_run(self, request: CreateRunRequest) -> RunRecord: + async def create_run(self) -> RunRecord: """Persist a new run record and return it with status ``running``.""" ... @@ -42,7 +43,7 @@ class RunnableRun(Protocol): ... 
-type RunRunnerFactory = Callable[[RunRecord], RunnableRun] +type RunRunnerFactory = Callable[[RunRecord, CreateRunRequest], RunnableRun] class RunScheduler: @@ -61,6 +62,7 @@ class RunScheduler: active_tasks: dict[str, asyncio.Task[None]] stopping: bool runner_factory: RunRunnerFactory + layer_registry: LayerRegistry _lifecycle_lock: asyncio.Lock def __init__( @@ -68,12 +70,14 @@ class RunScheduler: *, store: RunStore, shutdown_grace_seconds: float = 30, + layer_registry: LayerRegistry | None = None, runner_factory: RunRunnerFactory | None = None, ) -> None: self.store = store self.shutdown_grace_seconds = shutdown_grace_seconds self.active_tasks = {} self.stopping = False + self.layer_registry = layer_registry or create_default_layer_registry() self.runner_factory = runner_factory or self._default_runner_factory self._lifecycle_lock = asyncio.Lock() @@ -83,15 +87,15 @@ class RunScheduler: The returned record is already ``running``. The background task is removed from ``active_tasks`` when it finishes, regardless of success or failure. 
""" - compositor = build_pydantic_ai_compositor(request.compositor) + compositor = build_pydantic_ai_compositor(request.compositor, registry=self.layer_registry) if not has_non_blank_user_prompt(compositor.user_prompts): raise ValueError(EMPTY_USER_PROMPTS_ERROR) async with self._lifecycle_lock: if self.stopping: raise SchedulerStoppingError("run scheduler is shutting down") - record = await self.store.create_run(request) - task = asyncio.create_task(self._run_record(record), name=f"dify-agent-run-{record.run_id}") + record = await self.store.create_run() + task = asyncio.create_task(self._run_record(record, request), name=f"dify-agent-run-{record.run_id}") self.active_tasks[record.run_id] = task task.add_done_callback(lambda _task, run_id=record.run_id: self.active_tasks.pop(run_id, None)) return record @@ -115,18 +119,23 @@ class RunScheduler: for run_id in pending_run_ids: await self._mark_cancelled_run_failed(run_id) - async def _run_record(self, record: RunRecord) -> None: + async def _run_record(self, record: RunRecord, request: CreateRunRequest) -> None: """Execute a stored run and log failures already reflected in events.""" try: - await self.runner_factory(record).run() + await self.runner_factory(record, request).run() except asyncio.CancelledError: raise except Exception: logger.exception("scheduled run failed", extra={"run_id": record.run_id}) - def _default_runner_factory(self, record: RunRecord) -> RunnableRun: + def _default_runner_factory(self, record: RunRecord, request: CreateRunRequest) -> RunnableRun: """Create the production runner for a stored run record.""" - return AgentRunRunner(sink=self.store, request=record.request, run_id=record.run_id) + return AgentRunRunner( + sink=self.store, + request=request, + run_id=record.run_id, + layer_registry=self.layer_registry, + ) async def _mark_cancelled_run_failed(self, run_id: str) -> None: """Best-effort failure event/status for shutdown-cancelled runs.""" diff --git 
a/dify-agent/src/dify_agent/runtime/runner.py b/dify-agent/src/dify_agent/runtime/runner.py index 6e660123fb..5feb4aa342 100644 --- a/dify-agent/src/dify_agent/runtime/runner.py +++ b/dify-agent/src/dify_agent/runtime/runner.py @@ -3,9 +3,10 @@ The runner is storage-agnostic: it builds an Agenton compositor, enters or resumes its session, runs pydantic-ai with ``compositor.user_prompts`` as the user input, emits stream events, suspends the session on exit, snapshots it, and then -publishes a terminal success or failure event. Successful terminal events contain -both the JSON-safe final output and session snapshot; there are no separate output -or snapshot events to correlate. +publishes a terminal success or failure event. The Pydantic AI model is resolved +from the active Agenton layer named by ``DIFY_AGENT_MODEL_LAYER_ID``. Successful +terminal events contain both the JSON-safe final output and session snapshot; +there are no separate output or snapshot events to correlate. """ from collections.abc import AsyncIterable @@ -14,10 +15,11 @@ from typing import cast from pydantic import JsonValue, TypeAdapter from pydantic_ai.messages import AgentStreamEvent -from agenton.compositor import CompositorSessionSnapshot -from dify_agent.protocol.schemas import CreateRunRequest +from agenton.compositor import CompositorSessionSnapshot, LayerRegistry +from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer +from dify_agent.protocol.schemas import DIFY_AGENT_MODEL_LAYER_ID, CreateRunRequest from dify_agent.runtime.agent_factory import create_agent, normalize_user_input -from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor +from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor, create_default_layer_registry from dify_agent.runtime.event_sink import ( RunEventSink, emit_pydantic_ai_event, @@ -42,11 +44,20 @@ class AgentRunRunner: request: CreateRunRequest run_id: str + layer_registry: LayerRegistry - def 
__init__(self, *, sink: RunEventSink, request: CreateRunRequest, run_id: str) -> None: + def __init__( + self, + *, + sink: RunEventSink, + request: CreateRunRequest, + run_id: str, + layer_registry: LayerRegistry | None = None, + ) -> None: self.sink = sink self.request = request self.run_id = run_id + self.layer_registry = layer_registry or create_default_layer_registry() async def run(self) -> None: """Execute the run and emit the documented event sequence.""" @@ -71,7 +82,7 @@ class AgentRunRunner: async def _run_agent(self) -> tuple[JsonValue, CompositorSessionSnapshot]: """Run pydantic-ai inside an entered Agenton session.""" - compositor = build_pydantic_ai_compositor(self.request.compositor) + compositor = build_pydantic_ai_compositor(self.request.compositor, registry=self.layer_registry) session = ( compositor.session_from_snapshot(self.request.session_snapshot) if self.request.session_snapshot is not None @@ -87,11 +98,12 @@ class AgentRunRunner: async for event in events: _ = await emit_pydantic_ai_event(self.sink, run_id=self.run_id, data=event) - agent = create_agent( - self.request.agent_profile, - system_prompts=compositor.prompts, - tools=compositor.tools, - ) + try: + model = compositor.get_layer(DIFY_AGENT_MODEL_LAYER_ID, DifyPluginLLMLayer).get_model() + except (KeyError, TypeError, RuntimeError) as exc: + raise AgentRunValidationError(str(exc)) from exc + + agent = create_agent(model, system_prompts=compositor.prompts, tools=compositor.tools) result = await agent.run(normalize_user_input(user_prompts), event_stream_handler=handle_events) return _serialize_agent_output(result.output), compositor.snapshot_session(session) diff --git a/dify-agent/src/dify_agent/server/app.py b/dify-agent/src/dify_agent/server/app.py index d9f04516a9..75719b7a8d 100644 --- a/dify-agent/src/dify_agent/server/app.py +++ b/dify-agent/src/dify_agent/server/app.py @@ -13,6 +13,8 @@ from contextlib import asynccontextmanager from fastapi import FastAPI from redis.asyncio 
import Redis +from agenton.compositor import LayerRegistry +from dify_agent.runtime.compositor_factory import create_default_layer_registry from dify_agent.runtime.run_scheduler import RunScheduler from dify_agent.server.routes.runs import create_runs_router from dify_agent.server.settings import ServerSettings @@ -22,7 +24,12 @@ from dify_agent.storage.redis_run_store import RedisRunStore def create_app(settings: ServerSettings | None = None) -> FastAPI: """Build the FastAPI app with one shared Redis store and local scheduler.""" resolved_settings = settings or ServerSettings() - state: dict[str, RedisRunStore | RunScheduler] = {} + layer_registry = create_default_layer_registry( + plugin_daemon_url=resolved_settings.plugin_daemon_url, + plugin_daemon_api_key=resolved_settings.plugin_daemon_api_key, + plugin_daemon_timeout=resolved_settings.plugin_daemon_timeout, + ) + state: dict[str, RedisRunStore | RunScheduler | LayerRegistry] = {"layer_registry": layer_registry} @asynccontextmanager async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: @@ -32,7 +39,11 @@ def create_app(settings: ServerSettings | None = None) -> FastAPI: prefix=resolved_settings.redis_prefix, run_retention_seconds=resolved_settings.run_retention_seconds, ) - scheduler = RunScheduler(store=store, shutdown_grace_seconds=resolved_settings.shutdown_grace_seconds) + scheduler = RunScheduler( + store=store, + shutdown_grace_seconds=resolved_settings.shutdown_grace_seconds, + layer_registry=layer_registry, + ) state["store"] = store state["scheduler"] = scheduler try: @@ -49,7 +60,10 @@ def create_app(settings: ServerSettings | None = None) -> FastAPI: def get_scheduler() -> RunScheduler: return state["scheduler"] # pyright: ignore[reportReturnType] - app.include_router(create_runs_router(get_store, get_scheduler)) + def get_layer_registry() -> LayerRegistry: + return state["layer_registry"] # pyright: ignore[reportReturnType] + + app.include_router(create_runs_router(get_store, 
get_scheduler, get_layer_registry)) return app diff --git a/dify-agent/src/dify_agent/server/routes/runs.py b/dify-agent/src/dify_agent/server/routes/runs.py index e279e26d33..6e79e9d193 100644 --- a/dify-agent/src/dify_agent/server/routes/runs.py +++ b/dify-agent/src/dify_agent/server/routes/runs.py @@ -13,17 +13,23 @@ from typing import Annotated from fastapi import APIRouter, Depends, Header, HTTPException, Query from fastapi.responses import StreamingResponse +from agenton.compositor import LayerRegistry from dify_agent.protocol.schemas import CreateRunRequest, CreateRunResponse, RunEventsResponse, RunStatusResponse -from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor +from dify_agent.runtime.compositor_factory import build_pydantic_ai_compositor, create_default_layer_registry from dify_agent.runtime.run_scheduler import RunScheduler, SchedulerStoppingError from dify_agent.runtime.user_prompt_validation import EMPTY_USER_PROMPTS_ERROR, has_non_blank_user_prompt from dify_agent.server.sse import sse_event_stream from dify_agent.storage.redis_run_store import RedisRunStore, RunNotFoundError -def create_runs_router(get_store: Callable[[], RedisRunStore], get_scheduler: Callable[[], RunScheduler]) -> APIRouter: +def create_runs_router( + get_store: Callable[[], RedisRunStore], + get_scheduler: Callable[[], RunScheduler], + get_layer_registry: Callable[[], LayerRegistry] | None = None, +) -> APIRouter: """Create routes bound to the application's store dependency provider.""" router = APIRouter(prefix="/runs", tags=["runs"]) + resolved_get_layer_registry = get_layer_registry or create_default_layer_registry async def store_dep() -> RedisRunStore: return get_store() @@ -37,7 +43,10 @@ def create_runs_router(get_store: Callable[[], RedisRunStore], get_scheduler: Ca scheduler: Annotated[RunScheduler, Depends(scheduler_dep)], ) -> CreateRunResponse: try: - compositor = build_pydantic_ai_compositor(request.compositor) + compositor = 
build_pydantic_ai_compositor( + request.compositor, + registry=resolved_get_layer_registry(), + ) except Exception as exc: raise HTTPException(status_code=422, detail=str(exc)) from exc if not has_non_blank_user_prompt(compositor.user_prompts): diff --git a/dify-agent/src/dify_agent/server/schemas.py b/dify-agent/src/dify_agent/server/schemas.py index 75d34ece34..21e8e624a0 100644 --- a/dify-agent/src/dify_agent/server/schemas.py +++ b/dify-agent/src/dify_agent/server/schemas.py @@ -23,14 +23,13 @@ def new_run_id() -> str: class RunRecord(BaseModel): """Internal representation persisted for status reads. - The embedded request and status use protocol types so persisted records stay - JSON-compatible with the public API, but callers must import those DTOs from - ``dify_agent.protocol.schemas`` rather than this server-only module. + Only status metadata is persisted. Create-run requests can contain model + credentials in layer config and must remain in scheduler memory rather than + being written to Redis. 
""" run_id: str status: _protocol_schemas.RunStatus - request: _protocol_schemas.CreateRunRequest created_at: datetime = Field(default_factory=_protocol_schemas.utc_now) updated_at: datetime = Field(default_factory=_protocol_schemas.utc_now) error: str | None = None diff --git a/dify-agent/src/dify_agent/server/settings.py b/dify-agent/src/dify_agent/server/settings.py index a5f9905b90..9186540b7d 100644 --- a/dify-agent/src/dify_agent/server/settings.py +++ b/dify-agent/src/dify_agent/server/settings.py @@ -9,12 +9,15 @@ DEFAULT_RUN_RETENTION_SECONDS = 3 * 24 * 60 * 60 class ServerSettings(BaseSettings): - """Environment-backed settings for Redis persistence, retention, and local scheduling.""" + """Environment-backed settings for Redis, scheduling, and plugin daemon access.""" redis_url: str = "redis://localhost:6379/0" redis_prefix: str = "dify-agent" shutdown_grace_seconds: float = 30 run_retention_seconds: int = Field(default=DEFAULT_RUN_RETENTION_SECONDS, ge=1) + plugin_daemon_url: str = "http://localhost:5002" + plugin_daemon_api_key: str = "" + plugin_daemon_timeout: float | None = 600.0 model_config: ClassVar[SettingsConfigDict] = SettingsConfigDict( env_prefix="DIFY_AGENT_", diff --git a/dify-agent/src/dify_agent/storage/redis_run_store.py b/dify-agent/src/dify_agent/storage/redis_run_store.py index f9183b37ba..72091dcc1e 100644 --- a/dify-agent/src/dify_agent/storage/redis_run_store.py +++ b/dify-agent/src/dify_agent/storage/redis_run_store.py @@ -1,10 +1,12 @@ """Redis-backed run records and per-run event streams. -The store writes run records as JSON strings and events as Redis streams. HTTP -event cursors are Redis stream ids; ``0-0`` means replay from the beginning for -polling and SSE. Records and streams share one retention window that is refreshed -when status or event data is written. Execution is scheduled in-process by -``dify_agent.runtime.run_scheduler``; Redis is not a job queue. 
+The store writes status-only run records as JSON strings and events as Redis +streams. HTTP event cursors are Redis stream ids; ``0-0`` means replay from the +beginning for polling and SSE. Records and streams share one retention window +that is refreshed when status or event data is written. Execution is scheduled +in-process by ``dify_agent.runtime.run_scheduler``; Redis is not a job queue, and +create-run payloads are never persisted because layer config may include model +credentials. """ from collections.abc import AsyncIterator @@ -12,14 +14,7 @@ from typing import cast from redis.asyncio import Redis -from dify_agent.protocol.schemas import ( - CreateRunRequest, - RUN_EVENT_ADAPTER, - RunEvent, - RunEventsResponse, - RunStatus, - utc_now, -) +from dify_agent.protocol.schemas import RUN_EVENT_ADAPTER, RunEvent, RunEventsResponse, RunStatus, utc_now from dify_agent.runtime.event_sink import RunEventSink from dify_agent.server.schemas import RunRecord, new_run_id from dify_agent.server.settings import DEFAULT_RUN_RETENTION_SECONDS @@ -55,10 +50,10 @@ class RedisRunStore(RunEventSink): self.prefix = prefix self.run_retention_seconds = run_retention_seconds - async def create_run(self, request: CreateRunRequest) -> RunRecord: - """Persist a running run record without enqueueing external work.""" + async def create_run(self) -> RunRecord: + """Persist a running run record without storing the create request.""" run_id = new_run_id() - record = RunRecord(run_id=run_id, status="running", request=request) + record = RunRecord(run_id=run_id, status="running") await self.redis.set( run_record_key(self.prefix, run_id), record.model_dump_json(), diff --git a/dify-agent/tests/local/agenton/compositor/test_builder_snapshot.py b/dify-agent/tests/local/agenton/compositor/test_builder_snapshot.py index 86a90f43c5..3cc888c3f4 100644 --- a/dify-agent/tests/local/agenton/compositor/test_builder_snapshot.py +++ b/dify-agent/tests/local/agenton/compositor/test_builder_snapshot.py 
@@ -91,6 +91,42 @@ def test_layer_node_config_accepts_config_dto_and_serializes_fields() -> None: assert [prompt.value for prompt in compositor.user_prompts] == ["ask politely"] +def test_registry_factory_constructs_layer_with_injected_dependencies() -> None: + registry = LayerRegistry() + registry.register_layer( + PromptLayer, + factory=lambda config: PromptLayer(prefix=PromptLayerConfig.model_validate(config).prefix), + ) + + compositor = CompositorBuilder(registry).add_config( + {"layers": [{"name": "prompt", "type": "plain.prompt", "config": {"prefix": "factory"}}]} + ).build() + + assert [prompt.value for prompt in compositor.prompts] == ["factory"] + + +def test_compositor_get_layer_returns_named_layer_and_validates_type() -> None: + layer = ObjectLayer("value") + compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("obj", layer)])) + + assert compositor.get_layer("obj") is layer + assert compositor.get_layer("obj", ObjectLayer) is layer + + try: + compositor.get_layer("missing") + except KeyError as e: + assert str(e) == '"Layer \'missing\' is not defined in this compositor."' + else: + raise AssertionError("Expected KeyError.") + + try: + compositor.get_layer("obj", PromptLayer) + except TypeError as e: + assert str(e) == "Layer 'obj' must be PromptLayer, got ObjectLayer." 
+ else: + raise AssertionError("Expected TypeError.") + + class ObjectConsumerDeps(LayerDeps): obj: ObjectLayer[str] # pyright: ignore[reportUninitializedInstanceVariable] diff --git a/dify-agent/tests/local/dify_agent/adapters/llm/test_model.py b/dify-agent/tests/local/dify_agent/adapters/llm/test_model.py index e0595139e8..7c8ef67a43 100644 --- a/dify-agent/tests/local/dify_agent/adapters/llm/test_model.py +++ b/dify-agent/tests/local/dify_agent/adapters/llm/test_model.py @@ -40,6 +40,7 @@ class DifyLLMAdapterModelTests(unittest.IsolatedAsyncioTestCase): self, *, user_id: str | None = None, + http_client: httpx.AsyncClient | None = None, ) -> DifyPluginDaemonProvider: return DifyPluginDaemonProvider( tenant_id="tenant-1", @@ -48,6 +49,7 @@ class DifyLLMAdapterModelTests(unittest.IsolatedAsyncioTestCase): plugin_daemon_url="http://plugin-daemon", plugin_daemon_api_key="daemon-secret", user_id=user_id, + http_client=http_client, ) @asynccontextmanager @@ -172,6 +174,17 @@ class DifyLLMAdapterModelTests(unittest.IsolatedAsyncioTestCase): self.assertEqual(response.parts[0].part_kind, "text") self.assertEqual(cast(TextPart, response.parts[0]).content, "adapter response") + async def test_provider_does_not_close_external_http_client(self) -> None: + http_client = httpx.AsyncClient() + provider = self.make_provider(http_client=http_client) + + self.assertIs(provider.client.http_client, http_client) + async with provider: + pass + + self.assertFalse(http_client.is_closed) + await http_client.aclose() + async def test_request_returns_a_response(self) -> None: def handler(_request: httpx.Request) -> httpx.Response: return build_stream_response( diff --git a/dify-agent/tests/local/dify_agent/client/test_client.py b/dify-agent/tests/local/dify_agent/client/test_client.py index 56e9a71d99..64856ef59c 100644 --- a/dify-agent/tests/local/dify_agent/client/test_client.py +++ b/dify-agent/tests/local/dify_agent/client/test_client.py @@ -34,8 +34,7 @@ def _create_run_payload() -> 
dict[str, object]: "compositor": { "schema_version": 1, "layers": [{"name": "prompt", "type": "plain.prompt", "config": {"user": "hello"}}], - }, - "agent_profile": {"provider": "test", "output_text": "done"}, + } } @@ -80,7 +79,7 @@ def test_sync_methods_parse_protocol_dtos_and_send_create_request_dto() -> None: compositor = cast(dict[str, object], payload["compositor"]) layers = cast(list[dict[str, object]], compositor["layers"]) assert layers[0]["config"] == {"user": "hello"} - assert payload["agent_profile"] == {"provider": "test", "output_text": "done"} + assert "agent_profile" not in payload return httpx.Response(202, json={"run_id": "run-1", "status": "running"}) if request.method == "GET" and request.url.path == "/runs/run-1": return httpx.Response(200, json=_run_status_json("running")) diff --git a/dify-agent/tests/local/dify_agent/layers/__init__.py b/dify-agent/tests/local/dify_agent/layers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/dify-agent/tests/local/dify_agent/layers/dify_plugin/__init__.py b/dify-agent/tests/local/dify_agent/layers/dify_plugin/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/dify-agent/tests/local/dify_agent/layers/dify_plugin/test_configs.py b/dify-agent/tests/local/dify_agent/layers/dify_plugin/test_configs.py new file mode 100644 index 0000000000..e82d84801d --- /dev/null +++ b/dify-agent/tests/local/dify_agent/layers/dify_plugin/test_configs.py @@ -0,0 +1,56 @@ +import pytest +from pydantic import ValidationError + +import dify_agent.layers.dify_plugin as dify_plugin_exports +from dify_agent.layers.dify_plugin import ( + DifyPluginCredentialValue, + DifyPluginLLMLayerConfig, + DifyPluginLayerConfig, +) + + +def test_dify_plugin_package_exports_client_safe_config_symbols_only() -> None: + assert dify_plugin_exports.__all__ == [ + "DifyPluginCredentialValue", + "DifyPluginLLMLayerConfig", + "DifyPluginLayerConfig", + ] + assert not hasattr(dify_plugin_exports, 
"DifyPluginLayer") + assert not hasattr(dify_plugin_exports, "DifyPluginLLMLayer") + + +def test_dify_plugin_layer_config_forbids_runtime_settings() -> None: + config = DifyPluginLayerConfig(tenant_id="tenant-1", plugin_id="plugin-1", user_id="user-1") + + assert config.tenant_id == "tenant-1" + assert config.plugin_id == "plugin-1" + assert config.user_id == "user-1" + with pytest.raises(ValidationError): + _ = DifyPluginLayerConfig.model_validate( + { + "tenant_id": "tenant-1", + "plugin_id": "plugin-1", + "daemon_url": "http://daemon", + } + ) + + +def test_dify_plugin_llm_config_accepts_scalar_credentials_and_model_settings() -> None: + credential: DifyPluginCredentialValue = "secret" + config = DifyPluginLLMLayerConfig( + provider="openai", + model="gpt-4o-mini", + credentials={"api_key": credential, "enabled": True, "retries": 2, "ratio": 0.5, "empty": None}, + model_settings={"temperature": 0.2, "max_tokens": 64}, + ) + + assert config.credentials == {"api_key": "secret", "enabled": True, "retries": 2, "ratio": 0.5, "empty": None} + assert config.model_settings == {"temperature": 0.2, "max_tokens": 64} + with pytest.raises(ValidationError): + _ = DifyPluginLLMLayerConfig.model_validate( + { + "provider": "openai", + "model": "gpt-4o-mini", + "credentials": {"nested": {"not": "allowed"}}, + } + ) diff --git a/dify-agent/tests/local/dify_agent/layers/dify_plugin/test_layers.py b/dify-agent/tests/local/dify_agent/layers/dify_plugin/test_layers.py new file mode 100644 index 0000000000..4f3452c394 --- /dev/null +++ b/dify-agent/tests/local/dify_agent/layers/dify_plugin/test_layers.py @@ -0,0 +1,82 @@ +import asyncio +from collections import OrderedDict +from typing import cast + +from agenton.compositor import Compositor +from agenton.layers import PlainPromptType, PlainToolType +from dify_agent.adapters.llm import DifyLLMAdapterModel +from dify_agent.layers.dify_plugin.configs import DifyPluginLLMLayerConfig, DifyPluginLayerConfig +from 
dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer +from dify_agent.layers.dify_plugin.plugin_layer import DifyPluginLayer, DifyPluginRuntimeHandles + + +def _plugin_layer() -> DifyPluginLayer: + return DifyPluginLayer.from_config_with_settings( + DifyPluginLayerConfig(tenant_id="tenant-1", plugin_id="langgenius/openai", user_id="user-1"), + daemon_url="http://plugin-daemon", + daemon_api_key="daemon-secret", + timeout=12, + ) + + +def _llm_layer() -> DifyPluginLLMLayer: + return DifyPluginLLMLayer.from_config( + DifyPluginLLMLayerConfig( + provider="openai", + model="demo-model", + credentials={"api_key": "secret"}, + model_settings={"temperature": 0.2}, + ) + ) + + +def test_dify_plugin_layer_get_provider_requires_active_context_and_uses_runtime_client() -> None: + async def scenario() -> None: + plugin = _plugin_layer() + compositor: Compositor[PlainPromptType, PlainToolType] = Compositor(layers=OrderedDict([("plugin", plugin)])) + + try: + _ = plugin.get_provider(plugin_provider="openai") + except RuntimeError as e: + assert str(e) == "DifyPluginLayer.get_provider() requires an active compositor context." 
+ else: + raise AssertionError("Expected RuntimeError.") + + async with compositor.enter() as session: + handles = cast(DifyPluginRuntimeHandles, cast(object, session.layer("plugin").runtime_handles)) + client = handles.http_client + assert client is not None + provider = plugin.get_provider(plugin_provider="openai") + assert provider.client.http_client is client + assert provider.client.tenant_id == "tenant-1" + assert provider.client.plugin_id == "langgenius/openai" + assert provider.client.provider == "openai" + assert provider.client.user_id == "user-1" + async with provider: + pass + assert client.is_closed is False + + assert client.is_closed is True + + asyncio.run(scenario()) + + +def test_dify_plugin_llm_layer_builds_adapter_model_from_dependency_provider() -> None: + async def scenario() -> None: + plugin = _plugin_layer() + llm = _llm_layer() + compositor: Compositor[PlainPromptType, PlainToolType] = Compositor( + layers=OrderedDict([("plugin", plugin), ("llm", llm)]), + deps_name_mapping={"llm": {"plugin": "plugin"}}, + ) + + async with compositor.enter() as session: + model = llm.get_model() + assert isinstance(model, DifyLLMAdapterModel) + assert model.model_name == "demo-model" + assert model.credentials == {"api_key": "secret"} + assert model.provider.name == "DifyPlugin/openai" + handles = cast(DifyPluginRuntimeHandles, cast(object, session.layer("plugin").runtime_handles)) + assert model.provider.client.http_client is handles.http_client + + asyncio.run(scenario()) diff --git a/dify-agent/tests/local/dify_agent/protocol/test_protocol_schemas.py b/dify-agent/tests/local/dify_agent/protocol/test_protocol_schemas.py index 9f53c08b4e..4a474dc109 100644 --- a/dify-agent/tests/local/dify_agent/protocol/test_protocol_schemas.py +++ b/dify-agent/tests/local/dify_agent/protocol/test_protocol_schemas.py @@ -3,8 +3,10 @@ from pydantic import ValidationError from pydantic_ai.messages import FinalResultEvent from agenton.compositor import 
CompositorSessionSnapshot +from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID from dify_agent.protocol.schemas import ( RUN_EVENT_ADAPTER, + CreateRunRequest, PydanticAIStreamRunEvent, RunFailedEvent, RunFailedEventData, @@ -49,6 +51,17 @@ def test_pydantic_ai_event_data_uses_agent_stream_event_model() -> None: assert isinstance(event.data, FinalResultEvent) +def test_create_run_request_rejects_agent_profile_and_model_layer_id_is_public() -> None: + assert DIFY_AGENT_MODEL_LAYER_ID == "llm" + with pytest.raises(ValidationError): + _ = CreateRunRequest.model_validate( + { + "compositor": {"layers": []}, + "agent_profile": {"provider": "test", "output_text": "done"}, + } + ) + + @pytest.mark.parametrize("event_type", ["agent_output", "session_snapshot"]) def test_removed_non_terminal_payload_events_are_rejected(event_type: str) -> None: with pytest.raises(ValidationError): diff --git a/dify-agent/tests/local/dify_agent/runtime/test_run_scheduler.py b/dify-agent/tests/local/dify_agent/runtime/test_run_scheduler.py index 06d6af2e59..8edead2ff4 100644 --- a/dify-agent/tests/local/dify_agent/runtime/test_run_scheduler.py +++ b/dify-agent/tests/local/dify_agent/runtime/test_run_scheduler.py @@ -31,9 +31,9 @@ class FakeStore: self.statuses = {} self.errors = {} - async def create_run(self, request: CreateRunRequest) -> RunRecord: + async def create_run(self) -> RunRecord: run_id = f"run-{len(self.records) + 1}" - record = RunRecord(run_id=run_id, status="running", request=request) + record = RunRecord(run_id=run_id, status="running") self.records[run_id] = record self.statuses[run_id] = "running" return record @@ -57,10 +57,10 @@ class SlowCreateStore(FakeStore): self.create_started = create_started self.release_create = release_create - async def create_run(self, request: CreateRunRequest) -> RunRecord: + async def create_run(self) -> RunRecord: _ = self.create_started.set() await self.release_create.wait() - return await super().create_run(request) + return await 
super().create_run() class ControlledRunner: @@ -83,7 +83,7 @@ def test_create_run_starts_background_task_and_returns_running() -> None: release = asyncio.Event() scheduler = RunScheduler( store=store, - runner_factory=lambda _record: ControlledRunner(started=started, release=release), + runner_factory=lambda _record, _request: ControlledRunner(started=started, release=release), ) record = await scheduler.create_run(_request()) @@ -106,7 +106,7 @@ def test_shutdown_marks_unfinished_runs_failed_and_appends_event() -> None: scheduler = RunScheduler( store=store, shutdown_grace_seconds=0, - runner_factory=lambda _record: ControlledRunner(started=started, release=asyncio.Event()), + runner_factory=lambda _record, _request: ControlledRunner(started=started, release=asyncio.Event()), ) record = await scheduler.create_run(_request()) await asyncio.wait_for(started.wait(), timeout=1) @@ -155,7 +155,7 @@ def test_shutdown_waits_for_in_flight_create_to_register_before_cancelling() -> scheduler = RunScheduler( store=store, shutdown_grace_seconds=0, - runner_factory=lambda _record: ControlledRunner(started=runner_started, release=asyncio.Event()), + runner_factory=lambda _record, _request: ControlledRunner(started=runner_started, release=asyncio.Event()), ) create_task = asyncio.create_task(scheduler.create_run(_request())) diff --git a/dify-agent/tests/local/dify_agent/runtime/test_runner.py b/dify-agent/tests/local/dify_agent/runtime/test_runner.py index b79e96c929..2c47a58a7f 100644 --- a/dify-agent/tests/local/dify_agent/runtime/test_runner.py +++ b/dify-agent/tests/local/dify_agent/runtime/test_runner.py @@ -1,26 +1,54 @@ import asyncio import pytest +from pydantic_ai.models.test import TestModel from agenton.compositor import CompositorConfig, LayerNodeConfig -from dify_agent.protocol.schemas import AgentProfileConfig, CreateRunRequest, RunSucceededEvent +from agenton_collections.layers.plain import PromptLayerConfig +from dify_agent.layers.dify_plugin.configs import 
DifyPluginLLMLayerConfig, DifyPluginLayerConfig +from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer +from dify_agent.protocol import DIFY_AGENT_MODEL_LAYER_ID +from dify_agent.protocol.schemas import CreateRunRequest, RunSucceededEvent from dify_agent.runtime.event_sink import InMemoryRunEventSink from dify_agent.runtime.runner import AgentRunRunner, AgentRunValidationError -def test_runner_emits_terminal_success_and_snapshot() -> None: - request = CreateRunRequest( +def _request(user: str | list[str] = "hello", *, llm_layer_name: str = DIFY_AGENT_MODEL_LAYER_ID) -> CreateRunRequest: + return CreateRunRequest( compositor=CompositorConfig( layers=[ LayerNodeConfig( name="prompt", type="plain.prompt", - config={"prefix": "system", "user": "hello"}, - ) + config=PromptLayerConfig(prefix="system", user=user), + ), + LayerNodeConfig( + name="plugin", + type="dify.plugin", + config=DifyPluginLayerConfig(tenant_id="tenant-1", plugin_id="langgenius/openai"), + ), + LayerNodeConfig( + name=llm_layer_name, + type="dify.plugin.llm", + deps={"plugin": "plugin"}, + config=DifyPluginLLMLayerConfig( + provider="openai", + model="demo-model", + credentials={"api_key": "secret"}, + ), + ), ] - ), - agent_profile=AgentProfileConfig(output_text="done"), + ) ) + + +def test_runner_emits_terminal_success_and_snapshot(monkeypatch: pytest.MonkeyPatch) -> None: + def fake_get_model(self: DifyPluginLLMLayer): + assert self.config.model == "demo-model" + return TestModel(custom_output_text="done") # pyright: ignore[reportReturnType] + + monkeypatch.setattr(DifyPluginLLMLayer, "get_model", fake_get_model) + request = _request() sink = InMemoryRunEventSink() asyncio.run(AgentRunRunner(sink=sink, request=request, run_id="run-1").run()) @@ -34,16 +62,16 @@ def test_runner_emits_terminal_success_and_snapshot() -> None: terminal = sink.events["run-1"][-1] assert isinstance(terminal, RunSucceededEvent) assert terminal.data.output == "done" - assert [layer.name for layer in 
terminal.data.session_snapshot.layers] == ["prompt"] + assert [layer.name for layer in terminal.data.session_snapshot.layers] == [ + "prompt", + "plugin", + DIFY_AGENT_MODEL_LAYER_ID, + ] assert sink.statuses["run-1"] == "succeeded" def test_runner_fails_empty_user_prompts() -> None: - request = CreateRunRequest( - compositor=CompositorConfig( - layers=[LayerNodeConfig(name="prompt", type="plain.prompt", config={"prefix": "system"})] - ) - ) + request = _request("") sink = InMemoryRunEventSink() with pytest.raises(AgentRunValidationError): @@ -54,11 +82,7 @@ def test_runner_fails_empty_user_prompts() -> None: def test_runner_fails_blank_string_user_prompt_list() -> None: - request = CreateRunRequest( - compositor=CompositorConfig( - layers=[LayerNodeConfig(name="prompt", type="plain.prompt", config={"user": ["", " "]})] - ) - ) + request = _request(["", " "]) sink = InMemoryRunEventSink() with pytest.raises(AgentRunValidationError): @@ -66,3 +90,14 @@ def test_runner_fails_blank_string_user_prompt_list() -> None: assert [event.type for event in sink.events["run-3"]] == ["run_started", "run_failed"] assert sink.statuses["run-3"] == "failed" + + +def test_runner_requires_llm_layer_id() -> None: + request = _request(llm_layer_name="not-llm") + sink = InMemoryRunEventSink() + + with pytest.raises(AgentRunValidationError, match="llm"): + asyncio.run(AgentRunRunner(sink=sink, request=request, run_id="run-4").run()) + + assert [event.type for event in sink.events["run-4"]] == ["run_started", "run_failed"] + assert sink.statuses["run-4"] == "failed" diff --git a/dify-agent/tests/local/dify_agent/server/test_app.py b/dify-agent/tests/local/dify_agent/server/test_app.py index fdb10685ae..580263f96c 100644 --- a/dify-agent/tests/local/dify_agent/server/test_app.py +++ b/dify-agent/tests/local/dify_agent/server/test_app.py @@ -2,6 +2,9 @@ import pytest from fastapi.testclient import TestClient import dify_agent.server.app as app_module +from agenton.compositor import 
LayerRegistry +from dify_agent.layers.dify_plugin.configs import DifyPluginLayerConfig +from dify_agent.layers.dify_plugin.plugin_layer import DifyPluginLayer from dify_agent.server.app import create_app from dify_agent.server.settings import ServerSettings from dify_agent.storage.redis_run_store import RedisRunStore @@ -22,6 +25,7 @@ class FakeRunScheduler: store: object shutdown_grace_seconds: float + layer_registry: LayerRegistry shutdown_called: bool def __init__( @@ -29,9 +33,11 @@ class FakeRunScheduler: *, store: object, shutdown_grace_seconds: float, + layer_registry: LayerRegistry, ) -> None: self.store = store self.shutdown_grace_seconds = shutdown_grace_seconds + self.layer_registry = layer_registry self.shutdown_called = False self.created.append(self) @@ -50,12 +56,23 @@ def test_create_app_creates_scheduler_and_closes_after_shutdown(monkeypatch: pyt redis_prefix="test", shutdown_grace_seconds=5, run_retention_seconds=7, + plugin_daemon_url="http://plugin-daemon", + plugin_daemon_api_key="daemon-secret", + plugin_daemon_timeout=12, ) with TestClient(create_app(settings)): assert len(FakeRunScheduler.created) == 1 scheduler = FakeRunScheduler.created[0] assert scheduler.shutdown_grace_seconds == 5 + assert isinstance(scheduler.layer_registry, LayerRegistry) + descriptor = scheduler.layer_registry.resolve("dify.plugin") + assert descriptor.factory is not None + plugin_layer = descriptor.factory(DifyPluginLayerConfig(tenant_id="tenant-1", plugin_id="plugin-1")) + assert isinstance(plugin_layer, DifyPluginLayer) + assert plugin_layer.daemon_url == "http://plugin-daemon" + assert plugin_layer.daemon_api_key == "daemon-secret" + assert plugin_layer.timeout == 12 store = scheduler.store assert isinstance(store, RedisRunStore) assert store.run_retention_seconds == 7 diff --git a/dify-agent/tests/local/dify_agent/server/test_runs_routes.py b/dify-agent/tests/local/dify_agent/server/test_runs_routes.py index 083152ae5e..d1806796e0 100644 --- 
a/dify-agent/tests/local/dify_agent/server/test_runs_routes.py +++ b/dify-agent/tests/local/dify_agent/server/test_runs_routes.py @@ -1,6 +1,5 @@ from fastapi.testclient import TestClient -from dify_agent.protocol.schemas import CreateRunRequest from dify_agent.runtime.run_scheduler import SchedulerStoppingError from dify_agent.server.routes.runs import create_runs_router from dify_agent.server.schemas import RunRecord @@ -44,7 +43,7 @@ def test_create_run_returns_running_from_scheduler() -> None: class CapturingScheduler: async def create_run(self, request: object) -> RunRecord: del request - return RunRecord(run_id="run-1", status="running", request=_request()) + return RunRecord(run_id="run-1", status="running") app = FastAPI() app.include_router( @@ -119,13 +118,3 @@ def test_create_run_does_not_map_infrastructure_failure_to_422() -> None: ) assert response.status_code == 500 - - -def _request(): - from agenton.compositor import CompositorConfig, LayerNodeConfig - - return CreateRunRequest( - compositor=CompositorConfig( - layers=[LayerNodeConfig(name="prompt", type="plain.prompt", config={"user": "hello"})] - ) - ) diff --git a/dify-agent/tests/local/dify_agent/storage/test_redis_run_store.py b/dify-agent/tests/local/dify_agent/storage/test_redis_run_store.py index 518e9b4bec..ac05df3802 100644 --- a/dify-agent/tests/local/dify_agent/storage/test_redis_run_store.py +++ b/dify-agent/tests/local/dify_agent/storage/test_redis_run_store.py @@ -4,20 +4,12 @@ from typing import cast from pydantic import JsonValue -from agenton.compositor import CompositorConfig, CompositorSessionSnapshot, LayerNodeConfig, LayerSessionSnapshot +from agenton.compositor import CompositorSessionSnapshot, LayerSessionSnapshot from agenton.layers import LifecycleState -from dify_agent.protocol.schemas import CreateRunRequest, RunStartedEvent, RunSucceededEvent, RunSucceededEventData +from dify_agent.protocol.schemas import RunStartedEvent, RunSucceededEvent, RunSucceededEventData from 
dify_agent.storage.redis_run_store import DEFAULT_RUN_RETENTION_SECONDS, RedisRunStore -def _request() -> CreateRunRequest: - return CreateRunRequest( - compositor=CompositorConfig( - layers=[LayerNodeConfig(name="prompt", type="plain.prompt", config={"user": "hello"})] - ) - ) - - class FakeRedis: commands: list[tuple[object, ...]] values: dict[str, object] @@ -74,18 +66,19 @@ def test_create_run_writes_running_record_without_job_queue_and_with_retention() redis = FakeRedis() store = RedisRunStore(redis, prefix="test") # pyright: ignore[reportArgumentType] - record = asyncio.run(store.create_run(_request())) + record = asyncio.run(store.create_run()) assert record.status == "running" assert [command[0] for command in redis.commands] == ["set"] assert redis.commands[0][1] == f"test:runs:{record.run_id}:record" assert redis.commands[0][3] == DEFAULT_RUN_RETENTION_SECONDS + assert "request" not in str(redis.commands[0][2]) def test_update_status_refreshes_record_retention() -> None: redis = FakeRedis() store = RedisRunStore(redis, prefix="test", run_retention_seconds=60) # pyright: ignore[reportArgumentType] - record = asyncio.run(store.create_run(_request())) + record = asyncio.run(store.create_run()) redis.commands.clear() asyncio.run(store.update_status(record.run_id, "succeeded")) @@ -128,7 +121,7 @@ def test_get_events_round_trips_run_succeeded_output_and_session_snapshot() -> N ) async def scenario() -> tuple[str, RunSucceededEvent]: - record = await store.create_run(_request()) + record = await store.create_run() event_id = await store.append_event( RunSucceededEvent( id="local-only",