From 52903a6e699d809bf81a1fcd3a447e8dc3e8a552 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Yanli=20=E7=9B=90=E7=B2=92?= Date: Thu, 25 Jun 2026 23:55:17 +0800 Subject: [PATCH] Materialize drive mentions through shell --- api/clients/agent_backend/request_builder.py | 93 ++-- .../agent_backend/test_request_builder.py | 24 +- .../agent_app/test_runtime_request_builder.py | 75 ++- .../agent_v2/test_runtime_request_builder.py | 14 +- .../src/dify_agent/layers/drive/configs.py | 4 +- .../src/dify_agent/layers/drive/layer.py | 244 ++++----- .../src/dify_agent/layers/shell/configs.py | 5 +- .../src/dify_agent/layers/shell/layer.py | 10 +- .../dify_agent/runtime/compositor_factory.py | 10 +- .../dify_agent/layers/drive/test_configs.py | 4 +- .../dify_agent/layers/drive/test_layer.py | 492 ++++++------------ .../dify_agent/layers/shell/test_configs.py | 3 + .../dify_agent/layers/shell/test_layer.py | 18 +- 13 files changed, 419 insertions(+), 577 deletions(-) diff --git a/api/clients/agent_backend/request_builder.py b/api/clients/agent_backend/request_builder.py index 29cc28a3179..639976fddca 100644 --- a/api/clients/agent_backend/request_builder.py +++ b/api/clients/agent_backend/request_builder.py @@ -78,11 +78,22 @@ def _filter_snapshot_to_specs( return CompositorSessionSnapshot(schema_version=snapshot.schema_version, layers=filtered_layers) -def _shell_layer_deps(*, include_drive: bool) -> dict[str, str]: - deps = {"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID} - if include_drive: - deps["drive"] = DIFY_DRIVE_LAYER_ID - return deps +def _shell_layer_deps() -> dict[str, str]: + return {"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID} + + +def _drive_layer_deps() -> dict[str, str]: + return {"shell": DIFY_SHELL_LAYER_ID} + + +def _shell_config_with_drive_ref( + shell_config: DifyShellLayerConfig | None, + drive_config: DifyDriveLayerConfig | None, +) -> DifyShellLayerConfig: + config = shell_config or DifyShellLayerConfig() + if drive_config is None: + return config + return config.model_copy(update={"agent_stub_drive_ref": drive_config.drive_ref}) class AgentBackendModelConfig(BaseModel): @@ -263,14 +274,29 @@ class AgentBackendRunRequestBuilder: ] ) + include_shell = run_input.include_shell or run_input.drive_config is not None + if include_shell: + # Sandboxed bash workspace (dify.shell). It enters before drive so + # drive can materialize mentioned targets with `dify-agent drive pull` + # in the same shell-visible filesystem used by model commands. + layers.append( + RunLayerSpec( + name=DIFY_SHELL_LAYER_ID, + type=DIFY_SHELL_LAYER_TYPE_ID, + deps=_shell_layer_deps(), + metadata=run_input.metadata, + config=_shell_config_with_drive_ref(run_input.shell_config, run_input.drive_config), + ) + ) + if run_input.drive_config is not None: - # Drive Skills & Files declaration (dify.drive): a config-only index; - # the agent pulls listed entries through the back proxy by drive_ref. + # Drive Skills & Files declaration (dify.drive): the catalog plus + # prompt-mentioned entries eagerly pulled through the shell layer. layers.append( RunLayerSpec( name=DIFY_DRIVE_LAYER_ID, type=DIFY_DRIVE_LAYER_TYPE_ID, - deps={"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID}, + deps=_drive_layer_deps(), metadata=run_input.metadata, config=run_input.drive_config, ) @@ -336,21 +362,6 @@ class AgentBackendRunRequestBuilder: ) ) - if run_input.include_shell: - # Sandboxed bash workspace (dify.shell). Depends on execution_context - # so the agent server can mint per-command Agent Stub env, and on - # drive when present so that env points at /mnt/drive/. - # shellctl connection itself is server-injected. - layers.append( - RunLayerSpec( - name=DIFY_SHELL_LAYER_ID, - type=DIFY_SHELL_LAYER_TYPE_ID, - deps=_shell_layer_deps(include_drive=run_input.drive_config is not None), - metadata=run_input.metadata, - config=run_input.shell_config or DifyShellLayerConfig(), - ) - ) - if run_input.output is not None: layers.append( RunLayerSpec( @@ -462,14 +473,29 @@ class AgentBackendRunRequestBuilder: ] ) + include_shell = run_input.include_shell or run_input.drive_config is not None + if include_shell: + # Sandboxed bash workspace (dify.shell). It enters before drive so + # drive can materialize mentioned targets with `dify-agent drive pull` + # in the same shell-visible filesystem used by model commands. + layers.append( + RunLayerSpec( + name=DIFY_SHELL_LAYER_ID, + type=DIFY_SHELL_LAYER_TYPE_ID, + deps=_shell_layer_deps(), + metadata=run_input.metadata, + config=_shell_config_with_drive_ref(run_input.shell_config, run_input.drive_config), + ) + ) + if run_input.drive_config is not None: - # Drive Skills & Files declaration (dify.drive): a config-only index; - # the agent pulls listed entries through the back proxy by drive_ref. + # Drive Skills & Files declaration (dify.drive): the catalog plus + # prompt-mentioned entries eagerly pulled through the shell layer. layers.append( RunLayerSpec( name=DIFY_DRIVE_LAYER_ID, type=DIFY_DRIVE_LAYER_TYPE_ID, - deps={"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID}, + deps=_drive_layer_deps(), metadata=run_input.metadata, config=run_input.drive_config, ) @@ -537,21 +563,6 @@ class AgentBackendRunRequestBuilder: ) ) - if run_input.include_shell: - # Sandboxed bash workspace (dify.shell). Depends on execution_context - # so the agent server can mint per-command Agent Stub env, and on - # drive when present so that env points at /mnt/drive/. - # shellctl connection itself is server-injected. - layers.append( - RunLayerSpec( - name=DIFY_SHELL_LAYER_ID, - type=DIFY_SHELL_LAYER_TYPE_ID, - deps=_shell_layer_deps(include_drive=run_input.drive_config is not None), - metadata=run_input.metadata, - config=run_input.shell_config or DifyShellLayerConfig(), - ) - ) - if run_input.output is not None: layers.append( RunLayerSpec( diff --git a/api/tests/unit_tests/clients/agent_backend/test_request_builder.py b/api/tests/unit_tests/clients/agent_backend/test_request_builder.py index 3bb73289580..d3f97072589 100644 --- a/api/tests/unit_tests/clients/agent_backend/test_request_builder.py +++ b/api/tests/unit_tests/clients/agent_backend/test_request_builder.py @@ -339,7 +339,7 @@ def test_workflow_request_builder_adds_shell_layer_when_include_shell(): assert shell_config.env[0].name == "PROJECT_NAME" -def test_workflow_request_builder_binds_shell_to_drive_when_configured(): +def test_workflow_request_builder_binds_drive_to_shell_when_configured(): run_input = _run_input() run_input.include_shell = True run_input.drive_config = DifyDriveLayerConfig(drive_ref="agent-agent-1") @@ -348,11 +348,11 @@ def test_workflow_request_builder_binds_shell_to_drive_when_configured(): layers = {layer.name: layer for layer in request.composition.layers} layer_names = [layer.name for layer in request.composition.layers] - assert layers[DIFY_SHELL_LAYER_ID].deps == { - "execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID, - "drive": DIFY_DRIVE_LAYER_ID, - } - assert layer_names.index(DIFY_DRIVE_LAYER_ID) < layer_names.index(DIFY_SHELL_LAYER_ID) + assert layers[DIFY_SHELL_LAYER_ID].deps == {"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID} + shell_config = cast(DifyShellLayerConfig, layers[DIFY_SHELL_LAYER_ID].config) + assert shell_config.agent_stub_drive_ref == "agent-agent-1" + assert layers[DIFY_DRIVE_LAYER_ID].deps == {"shell": DIFY_SHELL_LAYER_ID} + assert layer_names.index(DIFY_SHELL_LAYER_ID) < layer_names.index(DIFY_DRIVE_LAYER_ID) def test_agent_app_request_builder_omits_shell_layer_by_default(): @@ -374,7 +374,7 @@ def test_agent_app_request_builder_adds_shell_layer_when_include_shell(): assert shell_config.env[0].name == "APP_ENV" -def test_agent_app_request_builder_binds_shell_to_drive_when_configured(): +def test_agent_app_request_builder_binds_drive_to_shell_when_configured(): run_input = _agent_app_input(include_shell=True) run_input.drive_config = DifyDriveLayerConfig(drive_ref="agent-agent-1") @@ -382,11 +382,11 @@ def test_agent_app_request_builder_binds_shell_to_drive_when_configured(): layers = {layer.name: layer for layer in request.composition.layers} layer_names = [layer.name for layer in request.composition.layers] - assert layers[DIFY_SHELL_LAYER_ID].deps == { - "execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID, - "drive": DIFY_DRIVE_LAYER_ID, - } - assert layer_names.index(DIFY_DRIVE_LAYER_ID) < layer_names.index(DIFY_SHELL_LAYER_ID) + assert layers[DIFY_SHELL_LAYER_ID].deps == {"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID} + shell_config = cast(DifyShellLayerConfig, layers[DIFY_SHELL_LAYER_ID].config) + assert shell_config.agent_stub_drive_ref == "agent-agent-1" + assert layers[DIFY_DRIVE_LAYER_ID].deps == {"shell": DIFY_SHELL_LAYER_ID} + assert layer_names.index(DIFY_SHELL_LAYER_ID) < layer_names.index(DIFY_DRIVE_LAYER_ID) def test_agent_app_request_builder_adds_knowledge_layer_when_configured(): diff --git a/api/tests/unit_tests/core/app/apps/agent_app/test_runtime_request_builder.py b/api/tests/unit_tests/core/app/apps/agent_app/test_runtime_request_builder.py index ef5aff1dd41..f8fe4dc942e 100644 --- a/api/tests/unit_tests/core/app/apps/agent_app/test_runtime_request_builder.py +++ b/api/tests/unit_tests/core/app/apps/agent_app/test_runtime_request_builder.py @@ -85,6 +85,49 @@ class _NoToolsBuilder: del kwargs +def _mock_empty_drive_catalog(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr( + "core.workflow.nodes.agent_v2.runtime_request_builder.AgentDriveService.list_skills", + lambda self, *, tenant_id, agent_id: [], + ) + monkeypatch.setattr( + "core.workflow.nodes.agent_v2.runtime_request_builder.AgentDriveService.manifest", + lambda self, *, tenant_id, agent_id, prefix="", include_download_url=False: [], + ) + + +def _mock_drive_catalog(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr( + "core.workflow.nodes.agent_v2.runtime_request_builder.AgentDriveService.list_skills", + lambda self, *, tenant_id, agent_id: [ + { + "path": "tender-analyzer", + "skill_md_key": "tender-analyzer/SKILL.md", + "archive_key": "tender-analyzer/.DIFY-SKILL-FULL.zip", + "name": "Tender Analyzer", + "description": "Parses RFPs.", + "size": 123, + "mime_type": "text/markdown", + "hash": "hash-1", + "created_at": 1, + } + ], + ) + monkeypatch.setattr( + "core.workflow.nodes.agent_v2.runtime_request_builder.AgentDriveService.manifest", + lambda self, *, tenant_id, agent_id, prefix="", include_download_url=False: [ + {"key": "tender-analyzer/SKILL.md", "is_skill": True}, + {"key": "tender-analyzer/.DIFY-SKILL-FULL.zip", "is_skill": False}, + {"key": "files/sample.pdf", "is_skill": False}, + ], + ) + + +@pytest.fixture(autouse=True) +def _mock_default_agent_app_drive_catalog(monkeypatch: pytest.MonkeyPatch) -> None: + _mock_empty_drive_catalog(monkeypatch) + + def _ctx(soul: AgentSoulConfig, *, query: str = "hello") -> AgentAppRuntimeBuildContext: dify_context = SimpleNamespace( tenant_id="tenant-1", @@ -128,7 +171,15 @@ class TestAgentAppRuntimeRequestBuilder: req = result.request assert req.purpose == "agent_app" names = [layer.name for layer in req.composition.layers] - assert names == ["agent_soul_prompt", "agent_app_user_prompt", "execution_context", "drive", "history", "llm"] + assert names == [ + "agent_soul_prompt", + "agent_app_user_prompt", + "execution_context", + "shell", + "drive", + "history", + "llm", + ] # plugin_id / provider normalized for plugin-daemon transport. llm = next(layer for layer in req.composition.layers if layer.name == "llm") assert llm.config.plugin_id == "langgenius/openai" @@ -278,6 +329,7 @@ class TestAgentAppDriveLayer: monkeypatch.setattr( "core.app.apps.agent_app.runtime_request_builder.dify_config.AGENT_DRIVE_MANIFEST_ENABLED", True ) + _mock_drive_catalog(monkeypatch) builder = AgentAppRuntimeRequestBuilder( credentials_provider=_FakeCredentialsProvider(), plugin_tools_builder=_NoToolsBuilder(), # type: ignore[arg-type] @@ -287,15 +339,18 @@ class TestAgentAppDriveLayer: drive = next(layer for layer in result.request.composition.layers if layer.name == "drive") assert drive.type == "dify.drive" - assert drive.deps == {"execution_context": "execution_context"} + assert drive.deps == {"shell": DIFY_SHELL_LAYER_ID} assert drive.config.drive_ref == "agent-agent-1" assert [skill.skill_md_key for skill in drive.config.skills] == ["tender-analyzer/SKILL.md"] assert drive.config.mentioned_skill_keys == ["tender-analyzer/SKILL.md"] - # injected right after execution_context, mirroring the workflow surface + # shell enters first; drive uses that shell to materialize mentioned targets. names = [layer.name for layer in result.request.composition.layers] - assert names.index("drive") == names.index("execution_context") + 1 + assert names.index(DIFY_SHELL_LAYER_ID) == names.index("execution_context") + 1 + assert names.index("drive") == names.index(DIFY_SHELL_LAYER_ID) + 1 - def test_drive_layer_injected_with_empty_catalog_and_shell_depends_on_it(self, monkeypatch: pytest.MonkeyPatch): + def test_drive_layer_injected_with_empty_catalog_and_drive_depends_on_shell( + self, monkeypatch: pytest.MonkeyPatch + ): monkeypatch.setattr( "core.app.apps.agent_app.runtime_request_builder.dify_config.AGENT_DRIVE_MANIFEST_ENABLED", True ) @@ -310,10 +365,9 @@ class TestAgentAppDriveLayer: layers = {layer.name: layer for layer in result.request.composition.layers} assert layers["drive"].config.drive_ref == "agent-agent-1" assert layers["drive"].config.skills == [] - assert layers[DIFY_SHELL_LAYER_ID].deps == { - "execution_context": "execution_context", - "drive": "drive", - } + assert layers[DIFY_SHELL_LAYER_ID].deps == {"execution_context": "execution_context"} + assert layers[DIFY_SHELL_LAYER_ID].config.agent_stub_drive_ref == "agent-agent-1" + assert layers["drive"].deps == {"shell": DIFY_SHELL_LAYER_ID} def test_no_drive_layer_when_flag_disabled(self, monkeypatch: pytest.MonkeyPatch): monkeypatch.setattr( @@ -333,6 +387,7 @@ class TestAgentAppDriveLayer: monkeypatch.setattr( "core.app.apps.agent_app.runtime_request_builder.dify_config.AGENT_DRIVE_MANIFEST_ENABLED", True ) + _mock_drive_catalog(monkeypatch) soul = _soul_with_model() soul.prompt.system_prompt = ( "Use [§skill:tender-analyzer%2FSKILL.md:Tender Analyzer§] and [§file:files%2Fsample.pdf:sample.pdf§]." @@ -355,6 +410,7 @@ class TestAgentAppDriveLayer: monkeypatch.setattr( "core.app.apps.agent_app.runtime_request_builder.dify_config.AGENT_DRIVE_MANIFEST_ENABLED", True ) + _mock_drive_catalog(monkeypatch) soul = _soul_with_model() soul.prompt.system_prompt = ( "Use [§skill:ghost%2FSKILL.md:Ghost Skill§], [§file:files%2Fghost.txt:Ghost File§], " @@ -375,6 +431,7 @@ class TestAgentAppDriveLayer: monkeypatch.setattr( "core.app.apps.agent_app.runtime_request_builder.dify_config.AGENT_DRIVE_MANIFEST_ENABLED", True ) + _mock_drive_catalog(monkeypatch) soul = _soul_with_model() soul.prompt.system_prompt = ( "Use [§skill:tender-analyzer%2FSKILL.md:Tender Analyzer§] and [§file:files%2Fsample.pdf:sample.pdf§]" diff --git a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_runtime_request_builder.py b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_runtime_request_builder.py index dc5a3e9720b..abd12aa21bd 100644 --- a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_runtime_request_builder.py +++ b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_runtime_request_builder.py @@ -1032,10 +1032,9 @@ def test_workflow_run_request_contains_drive_layer_with_empty_catalog(monkeypatc "mentioned_skill_keys": [], "mentioned_file_keys": [], } - assert layers[DIFY_SHELL_LAYER_ID]["deps"] == { - "execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID, - "drive": "drive", - } + assert layers[DIFY_SHELL_LAYER_ID]["deps"] == {"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID} + assert layers[DIFY_SHELL_LAYER_ID]["config"]["agent_stub_drive_ref"] == "agent-agent-1" + assert layers["drive"]["deps"] == {"shell": DIFY_SHELL_LAYER_ID} def test_build_drive_layer_config_requires_agent_identity(): @@ -1061,11 +1060,12 @@ def test_workflow_run_request_contains_drive_layer_when_flag_enabled(monkeypatch dumped = result.request.model_dump(mode="json") layer_names = [layer["name"] for layer in dumped["composition"]["layers"]] assert "drive" in layer_names - # injected right after execution_context, before history/llm - assert layer_names.index("drive") == layer_names.index("execution_context") + 1 + # shell enters first; drive uses that shell to materialize mentioned targets. + assert layer_names.index(DIFY_SHELL_LAYER_ID) == layer_names.index("execution_context") + 1 + assert layer_names.index("drive") == layer_names.index(DIFY_SHELL_LAYER_ID) + 1 drive = next(layer for layer in dumped["composition"]["layers"] if layer["name"] == "drive") assert drive["type"] == "dify.drive" - assert drive["deps"] == {"execution_context": "execution_context"} + assert drive["deps"] == {"shell": DIFY_SHELL_LAYER_ID} assert drive["config"]["drive_ref"] == "agent-agent-1" assert drive["config"]["skills"] == [ { diff --git a/dify-agent/src/dify_agent/layers/drive/configs.py b/dify-agent/src/dify_agent/layers/drive/configs.py index f74a24298fd..20fd514baf2 100644 --- a/dify-agent/src/dify_agent/layers/drive/configs.py +++ b/dify-agent/src/dify_agent/layers/drive/configs.py @@ -6,7 +6,9 @@ only: skills are declared as metadata, not content, and plain files are listed only when the prompt explicitly mentions their drive keys. The API backend catalogs and writes this config; the Agent backend consumes it -(ENG-387: pull via back proxy, lazy-load SKILL.md, materialize files). +by running sandbox-visible ``dify-agent drive pull`` commands through the shell +layer so materialized files live in the same filesystem that model shell jobs +use. """ from typing import Final diff --git a/dify-agent/src/dify_agent/layers/drive/layer.py b/dify-agent/src/dify_agent/layers/drive/layer.py index af5011b7b69..d370501ae0e 100644 --- a/dify-agent/src/dify_agent/layers/drive/layer.py +++ b/dify-agent/src/dify_agent/layers/drive/layer.py @@ -1,37 +1,29 @@ -"""Runtime Dify drive layer with eager pull for prompt-mentioned targets. +"""Runtime Dify drive layer with shell-backed eager pulls. The API backend sends the full drive skill catalog plus the ordered drive keys mentioned in the prompt. When the layer enters a run context it eagerly pulls -those mentioned skills/files from the Dify inner drive bridge, materializes them -under the fixed Agent Stub drive base for ``drive_ref``, and contributes a +those mentioned skills/files through the already-active shell layer by running +the sandbox-visible ``dify-agent drive pull`` command, then contributes a concise prompt block describing what was loaded. It also contributes a suffix prompt with the remaining skill catalog plus ``dify-agent drive`` and ``dify-agent file`` usage so the model has concrete Agent Stub commands for -materializing drive content and workflow files when a shell layer is available. +materializing drive content and workflow files. """ from __future__ import annotations -import asyncio +import shlex from dataclasses import dataclass, field from pathlib import Path -from typing import Any, ClassVar, cast +from typing import ClassVar -import httpx from typing_extensions import Self, override -from agenton.layers import EmptyRuntimeState, Layer, LayerDeps, PlainLayer -from dify_agent.agent_stub._drive_materialization import ( - DriveDownloadPayload, - DriveMaterializationTransferError, - DriveMaterializationValidationError, - materialize_drive_downloads, -) +from agenton.layers import EmptyRuntimeState, LayerDeps, PlainLayer from dify_agent.agent_stub.protocol import agent_stub_drive_base_for_ref -from dify_agent.agent_stub.protocol.agent_stub import AgentStubDriveItem, AgentStubDriveManifestResponse from dify_agent.layers.drive.configs import DIFY_DRIVE_LAYER_TYPE_ID, DifyDriveLayerConfig +from dify_agent.layers.shell.layer import DifyShellLayer -_DOWNLOAD_CONCURRENCY = 4 _AGENT_STUB_CLI_USAGE_PROMPT = """Agent Stub CLI usage is available inside shell jobs: Drive assets are Agent Soul versioned assets: @@ -59,40 +51,23 @@ class DifyDriveLayerError(RuntimeError): class DifyDriveDeps(LayerDeps): - execution_context: Layer[Any, Any, Any, Any, Any, Any] # pyright: ignore[reportUninitializedInstanceVariable] + shell: DifyShellLayer # pyright: ignore[reportUninitializedInstanceVariable] @dataclass(slots=True) class DifyDriveLayer(PlainLayer[DifyDriveDeps, DifyDriveLayerConfig, EmptyRuntimeState]): - """Drive runtime layer that eagerly materializes prompt-mentioned drive targets.""" + """Drive runtime layer that materializes prompt-mentioned targets via shell.""" type_id: ClassVar[str | None] = DIFY_DRIVE_LAYER_TYPE_ID config: DifyDriveLayerConfig - inner_api_url: str - inner_api_key: str _loaded_skill_bodies: dict[str, str] = field(default_factory=dict) _pulled_file_paths: dict[str, str] = field(default_factory=dict) @classmethod @override def from_config(cls, config: DifyDriveLayerConfig) -> Self: - del config - raise TypeError("DifyDriveLayer requires server-side Dify API settings and must use a provider factory.") - - @classmethod - def from_config_with_settings( - cls, - config: DifyDriveLayerConfig, - *, - inner_api_url: str, - inner_api_key: str, - ) -> Self: - return cls( - config=DifyDriveLayerConfig.model_validate(config), - inner_api_url=inner_api_url.rstrip("/"), - inner_api_key=inner_api_key, - ) + return cls(config=DifyDriveLayerConfig.model_validate(config)) @property @override @@ -127,9 +102,7 @@ class DifyDriveLayer(PlainLayer[DifyDriveDeps, DifyDriveLayerConfig, EmptyRuntim if pulled_skill_path is None: continue local_path = Path(pulled_skill_path).parent - loaded_skill_sections.append( - f"Path: {skill.path}\nLocal path: {local_path}\nSKILL.md:\n{body}" - ) + loaded_skill_sections.append(f"Path: {skill.path}\nLocal path: {local_path}\nSKILL.md:\n{body}") if loaded_skill_sections: sections.append("Loaded mentioned skills:\n\n" + "\n\n".join(loaded_skill_sections)) @@ -154,12 +127,16 @@ class DifyDriveLayer(PlainLayer[DifyDriveDeps, DifyDriveLayerConfig, EmptyRuntim if skill.skill_md_key not in mentioned_skill_keys ] if other_skills: + pull_and_read_command = ( + '`skill_dir="$(dify-agent drive pull --to /tmp/drive)"; ' + + 'printf "%s\\n" "$skill_dir"; cat "$skill_dir/SKILL.md"`' + ) sections.append( "Other available skills:\n" + "\n".join(other_skills) + "\n\nTo use one, pull it and read its SKILL.md in one command: " - '`skill_dir="$(dify-agent drive pull --to /tmp/drive)"; ' - 'printf "%s\\n" "$skill_dir"; cat "$skill_dir/SKILL.md"`.' + + pull_and_read_command + + "." ) sections.append(_AGENT_STUB_CLI_USAGE_PROMPT) return "\n\n".join(sections) @@ -167,113 +144,106 @@ class DifyDriveLayer(PlainLayer[DifyDriveDeps, DifyDriveLayerConfig, EmptyRuntim async def _pull_mentioned_targets(self) -> None: self._loaded_skill_bodies = {} self._pulled_file_paths = {} - targets: list[tuple[str, bool]] = [ - (self._skill_prefix(skill_key), False) for skill_key in self.config.mentioned_skill_keys - ] + [(file_key, True) for file_key in self.config.mentioned_file_keys] + targets = self._mentioned_pull_targets() if not targets: return - tenant_id = self._require_tenant_id() - manifest_items = await self._fetch_manifest_items(tenant_id=tenant_id, targets=targets) - written_paths = await self._download_items(manifest_items) + script = self._build_shell_pull_script(targets=targets) + result = await self.deps.shell.run_remote_script(script, inject_agent_stub_env=True) + if result.exit_code != 0: + raise DifyDriveLayerError( + f"drive mentioned pull failed in shell: {result.status} exit_code={result.exit_code}\n{result.output}" + ) + if result.truncated: + raise DifyDriveLayerError("drive mentioned pull output was truncated before SKILL.md content was loaded") + + written_paths, skill_bodies = self._parse_shell_pull_output(result.output) + self._record_pulled_paths(written_paths) + for skill_key in self.config.mentioned_skill_keys: + body = skill_bodies.get(skill_key) + if body is None: + raise DifyDriveLayerError(f"missing pulled SKILL.md content for mentioned skill {skill_key}") + self._loaded_skill_bodies[skill_key] = body + + def _build_shell_pull_script(self, *, targets: list[tuple[str, bool]]) -> str: + pull_targets = list(dict.fromkeys(prefix for prefix, _exact in targets)) + base_path = agent_stub_drive_base_for_ref(self.config.drive_ref) + lines = [ + "set -eu", + f"base={shlex.quote(base_path)}", + "dify-agent drive pull " + + " ".join(shlex.quote(target) for target in pull_targets) + + ' --to "$base"', + ] + for skill_key in self.config.mentioned_skill_keys: + skill_path = self._shell_local_path(skill_key) + lines.extend( + [ + f"test -f {shlex.quote(skill_path)}", + f"printf '\\n__DIFY_DRIVE_MENTIONED_PATH__\\t%s\\t%s\\n' {shlex.quote(skill_key)} {shlex.quote(skill_path)}", + f"printf '__DIFY_DRIVE_SKILL_BEGIN__\\t%s\\n' {shlex.quote(skill_key)}", + f"cat {shlex.quote(skill_path)}", + f"printf '\\n__DIFY_DRIVE_SKILL_END__\\t%s\\n' {shlex.quote(skill_key)}", + ] + ) + for file_key in self.config.mentioned_file_keys: + file_path = self._shell_local_path(file_key) + lines.extend( + [ + f"test -e {shlex.quote(file_path)}", + f"printf '\\n__DIFY_DRIVE_MENTIONED_PATH__\\t%s\\t%s\\n' {shlex.quote(file_key)} {shlex.quote(file_path)}", + ] + ) + return "\n".join(lines) + + def _parse_shell_pull_output(self, output: str) -> tuple[dict[str, str], dict[str, str]]: + written_paths: dict[str, str] = {} + skill_bodies: dict[str, str] = {} + current_skill_key: str | None = None + current_skill_body: list[str] = [] + + for line in output.splitlines(keepends=True): + stripped_line = line.rstrip("\n") + if current_skill_key is not None: + if stripped_line == f"__DIFY_DRIVE_SKILL_END__\t{current_skill_key}": + skill_bodies[current_skill_key] = "".join(current_skill_body) + current_skill_key = None + current_skill_body = [] + continue + current_skill_body.append(line) + continue + + if stripped_line.startswith("__DIFY_DRIVE_MENTIONED_PATH__\t"): + parts = stripped_line.split("\t", 2) + if len(parts) != 3: + raise DifyDriveLayerError("drive mentioned pull emitted an invalid path marker") + _marker, key, path = parts + written_paths[key] = path + continue + if stripped_line.startswith("__DIFY_DRIVE_SKILL_BEGIN__\t"): + current_skill_key = stripped_line.split("\t", 1)[1] + current_skill_body = [] + + if current_skill_key is not None: + raise DifyDriveLayerError(f"drive mentioned pull omitted SKILL.md end marker for {current_skill_key}") + return written_paths, skill_bodies + + def _record_pulled_paths(self, written_paths: dict[str, str]) -> None: self._pulled_file_paths = written_paths for file_key in self.config.mentioned_file_keys: if file_key not in written_paths: raise DifyDriveLayerError(f"missing pulled file for mentioned drive key {file_key}") for skill_key in self.config.mentioned_skill_keys: - skill_path = written_paths.get(skill_key) - if skill_path is None: + if skill_key not in written_paths: raise DifyDriveLayerError(f"missing pulled SKILL.md for mentioned skill {skill_key}") - try: - self._loaded_skill_bodies[skill_key] = Path(skill_path).read_text(encoding="utf-8") - except (OSError, UnicodeError) as exc: - raise DifyDriveLayerError(f"failed to load pulled SKILL.md for mentioned skill {skill_key}") from exc - async def _fetch_manifest_items( - self, - *, - tenant_id: str, - targets: list[tuple[str, bool]], - ) -> list[AgentStubDriveItem]: - semaphore = asyncio.Semaphore(_DOWNLOAD_CONCURRENCY) + def _mentioned_pull_targets(self) -> list[tuple[str, bool]]: + return [(self._skill_prefix(skill_key), False) for skill_key in self.config.mentioned_skill_keys] + [ + (file_key, True) for file_key in self.config.mentioned_file_keys + ] - async with httpx.AsyncClient(timeout=30.0, follow_redirects=True, trust_env=False) as client: - - async def fetch_one(target: tuple[str, bool]) -> list[AgentStubDriveItem]: - prefix, exact = target - try: - async with semaphore: - response = await client.get( - f"{self.inner_api_url}/inner/api/drive/{self.config.drive_ref}/manifest", - params={ - "tenant_id": tenant_id, - "prefix": prefix, - "include_download_url": "true", - }, - headers={"X-Inner-Api-Key": self.inner_api_key}, - ) - except (httpx.InvalidURL, httpx.TimeoutException, httpx.RequestError) as exc: - raise DifyDriveLayerError(f"drive manifest request failed for {prefix}") from exc - if response.is_error: - raise DifyDriveLayerError(f"drive manifest request failed for {prefix}: {response.status_code}") - try: - payload = AgentStubDriveManifestResponse.model_validate(response.json()) - except (ValueError, TypeError) as exc: - raise DifyDriveLayerError(f"drive manifest response is invalid for {prefix}") from exc - manifest_items: list[AgentStubDriveItem] = [] - for item in payload.items: - if not item.download_url: - raise DifyDriveLayerError(f"drive manifest item is missing download_url for {prefix}") - if exact and item.key != prefix: - continue - manifest_items.append(item) - return manifest_items - - grouped_items = await asyncio.gather(*(fetch_one(target) for target in targets)) - - deduplicated: dict[str, AgentStubDriveItem] = {} - for items in grouped_items: - for item in items: - deduplicated.setdefault(item.key, item) - return [deduplicated[key] for key in sorted(deduplicated)] - - async def _download_items(self, items: list[AgentStubDriveItem]) -> dict[str, str]: - base_path = Path(agent_stub_drive_base_for_ref(self.config.drive_ref)) - semaphore = asyncio.Semaphore(_DOWNLOAD_CONCURRENCY) - - async with httpx.AsyncClient(timeout=30.0, follow_redirects=True, trust_env=False) as client: - - async def download_one(item: AgentStubDriveItem) -> DriveDownloadPayload: - download_url = item.download_url - if not download_url: - raise DifyDriveLayerError(f"drive manifest item is missing download_url for {item.key}") - try: - async with semaphore: - response = await client.get(download_url) - except (httpx.InvalidURL, httpx.TimeoutException, httpx.RequestError) as exc: - raise DifyDriveLayerError(f"drive download failed for {item.key}") from exc - if response.is_error: - raise DifyDriveLayerError(f"drive download failed for {item.key}: {response.status_code}") - return DriveDownloadPayload(key=item.key, payload=response.content, size=item.size) - - downloads = await asyncio.gather(*(download_one(item) for item in items)) - - try: - written_paths = materialize_drive_downloads( - base_path=base_path, - downloads=downloads, - ) - except (DriveMaterializationValidationError, DriveMaterializationTransferError) as exc: - raise DifyDriveLayerError(str(exc)) from exc - - return {download.key: str(path) for download, path in zip(downloads, written_paths, strict=True)} - - def _require_tenant_id(self) -> str: - execution_context = self.deps.execution_context.config - tenant_id = getattr(execution_context, "tenant_id", None) - if not isinstance(tenant_id, str) or not tenant_id.strip(): - raise DifyDriveLayerError("DifyDriveLayer requires execution_context.tenant_id") - return cast(str, tenant_id).strip() + def _shell_local_path(self, drive_key: str) -> str: + return f"{agent_stub_drive_base_for_ref(self.config.drive_ref).rstrip('/')}/{drive_key.lstrip('/')}" @staticmethod def _skill_prefix(skill_key: str) -> str: diff --git a/dify-agent/src/dify_agent/layers/shell/configs.py b/dify-agent/src/dify_agent/layers/shell/configs.py index fafab6a3bd9..f538ffa2636 100644 --- a/dify-agent/src/dify_agent/layers/shell/configs.py +++ b/dify-agent/src/dify_agent/layers/shell/configs.py @@ -3,7 +3,8 @@ Server-only shellctl connection settings are injected by the runtime provider factory. Public config carries product-level Agent Soul settings that must affect the sandbox workspace itself: CLI tool bootstrap commands, normal environment -variables, secret environment variable names, and sandbox-provider metadata. +variables, secret environment variable names, sandbox-provider metadata, and the +Agent Stub drive ref used by shell-visible drive commands. """ import re @@ -80,6 +81,8 @@ class DifyShellLayerConfig(LayerConfig): model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") + # Optional because shell can be used without a drive layer. + agent_stub_drive_ref: str | None = Field(default=None, max_length=1024) cli_tools: list[DifyShellCliToolConfig] = Field(default_factory=list) env: list[DifyShellEnvVarConfig] = Field(default_factory=list) secret_refs: list[DifyShellSecretRefConfig] = Field(default_factory=list) diff --git a/dify-agent/src/dify_agent/layers/shell/layer.py b/dify-agent/src/dify_agent/layers/shell/layer.py index a8f46d628a6..17ef504f6bf 100644 --- a/dify-agent/src/dify_agent/layers/shell/layer.py +++ b/dify-agent/src/dify_agent/layers/shell/layer.py @@ -51,7 +51,6 @@ from typing_extensions import Self, override from agenton.layers import LayerDeps, PydanticAILayer, PydanticAIPrompt, PydanticAITool from dify_agent.agent_stub.server.shell_agent_stub_env import ShellAgentStubTokenFactory, build_shell_agent_stub_env -from dify_agent.layers.drive.layer import DifyDriveLayer from dify_agent.layers.execution_context.layer import DifyExecutionContextLayer from dify_agent.layers.shell.configs import DIFY_SHELL_LAYER_TYPE_ID, DifyShellLayerConfig @@ -173,11 +172,11 @@ type ShellInterruptToolResult = ShellJobStatusObservation | ShellToolErrorObserv class DifyShellLayerDeps(LayerDeps): """Optional direct-layer dependencies used by the shell runtime layer. - The drive dependency supplies the drive ref for injected - Agent Stub CLI commands; the execution context supplies the token principal. + The execution context supplies the token principal. The drive ref used for + Agent Stub CLI commands is passed through config so the drive layer can + depend on shell for eager materialization without a dependency cycle. """ - drive: DifyDriveLayer | None # pyright: ignore[reportUninitializedInstanceVariable] execution_context: DifyExecutionContextLayer | None # pyright: ignore[reportUninitializedInstanceVariable] @@ -768,10 +767,9 @@ class DifyShellLayer(PydanticAILayer[DifyShellLayerDeps, object, DifyShellLayerC """Build per-command Agent Stub env only for user-visible ``shell.run``.""" execution_context_layer = self.deps.execution_context execution_context = execution_context_layer.config if execution_context_layer is not None else None - drive_layer = self.deps.drive return build_shell_agent_stub_env( agent_stub_api_base_url=self.agent_stub_api_base_url, - agent_stub_drive_ref=drive_layer.config.drive_ref if drive_layer is not None else None, + agent_stub_drive_ref=self.config.agent_stub_drive_ref, execution_context=execution_context, token_factory=self.agent_stub_token_factory, session_id=self.runtime_state.session_id, diff --git a/dify-agent/src/dify_agent/runtime/compositor_factory.py b/dify-agent/src/dify_agent/runtime/compositor_factory.py index 0cfab33ad0d..610733cff98 100644 --- a/dify-agent/src/dify_agent/runtime/compositor_factory.py +++ b/dify-agent/src/dify_agent/runtime/compositor_factory.py @@ -38,7 +38,6 @@ from dify_agent.agent_stub.server.tokens.agent_stub import AgentStubTokenCodec from dify_agent.layers.ask_human.layer import DifyAskHumanLayer from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer from dify_agent.layers.dify_plugin.tools_layer import DifyPluginToolsLayer -from dify_agent.layers.drive import DifyDriveLayerConfig from dify_agent.layers.drive.layer import DifyDriveLayer from dify_agent.layers.execution_context.configs import DifyExecutionContextLayerConfig from dify_agent.layers.execution_context.layer import DifyExecutionContextLayer @@ -90,14 +89,7 @@ def create_default_layer_providers( LayerProvider.from_layer_type(PydanticAIHistoryLayer), LayerProvider.from_layer_type(DifyOutputLayer), LayerProvider.from_layer_type(DifyAskHumanLayer), - LayerProvider.from_factory( - layer_type=DifyDriveLayer, - create=lambda config: DifyDriveLayer.from_config_with_settings( - DifyDriveLayerConfig.model_validate(config), - inner_api_url=inner_api_url, - inner_api_key=inner_api_key, - ), - ), + LayerProvider.from_layer_type(DifyDriveLayer), LayerProvider.from_factory( layer_type=DifyExecutionContextLayer, create=lambda config: DifyExecutionContextLayer.from_config_with_settings( diff --git a/dify-agent/tests/local/dify_agent/layers/drive/test_configs.py b/dify-agent/tests/local/dify_agent/layers/drive/test_configs.py index 96d781caa72..05ddad3543c 100644 --- a/dify-agent/tests/local/dify_agent/layers/drive/test_configs.py +++ b/dify-agent/tests/local/dify_agent/layers/drive/test_configs.py @@ -48,10 +48,8 @@ def test_layer_config_rejects_unknown_fields() -> None: def test_drive_layer_is_registered_and_constructible_from_config() -> None: - layer = DifyDriveLayer.from_config_with_settings( + layer = DifyDriveLayer.from_config( DifyDriveLayerConfig(drive_ref="agent-1", skills=[], mentioned_skill_keys=[], mentioned_file_keys=[]), - inner_api_url="https://api.example.com", - inner_api_key="secret", ) assert isinstance(layer, DifyDriveLayer) diff --git a/dify-agent/tests/local/dify_agent/layers/drive/test_layer.py b/dify-agent/tests/local/dify_agent/layers/drive/test_layer.py index ead91a41f01..f9abc8e718b 100644 --- a/dify-agent/tests/local/dify_agent/layers/drive/test_layer.py +++ b/dify-agent/tests/local/dify_agent/layers/drive/test_layer.py @@ -2,38 +2,30 @@ from __future__ import annotations -from io import BytesIO -from pathlib import Path -from typing import ClassVar -from zipfile import ZipFile +from typing import cast import pytest -from agenton.layers import EmptyRuntimeState, LayerConfig, NoLayerDeps, PlainLayer -from dify_agent.agent_stub._drive_materialization import DriveDownloadPayload -from dify_agent.agent_stub.protocol.agent_stub import AgentStubDriveItem from dify_agent.layers.drive import DifyDriveLayerConfig, DifyDriveSkillConfig from dify_agent.layers.drive.layer import DifyDriveLayer, DifyDriveLayerError +from dify_agent.layers.shell import DifyShellLayerConfig +from dify_agent.layers.shell.layer import DifyShellLayer, RemoteCommandResult, ShellctlClientFactory -class _FakeExecutionContextConfig(LayerConfig): - tenant_id: str +def _unused_client_factory(_entrypoint: str): + raise AssertionError("shellctl client should not be used by these drive-layer tests") -class _FakeExecutionContextLayer(PlainLayer[NoLayerDeps, _FakeExecutionContextConfig, EmptyRuntimeState]): - type_id: ClassVar[str | None] = None - - config: _FakeExecutionContextConfig - - def __new__(cls) -> _FakeExecutionContextLayer: - return super().__new__(cls) - - def __init__(self) -> None: - self.config = _FakeExecutionContextConfig(tenant_id="tenant-1") +def _shell_layer() -> DifyShellLayer: + return DifyShellLayer.from_config_with_settings( + DifyShellLayerConfig(agent_stub_drive_ref="agent-1"), + shellctl_entrypoint="http://shellctl", + shellctl_client_factory=cast(ShellctlClientFactory, _unused_client_factory), + ) -def _build_layer(tmp_path: Path) -> DifyDriveLayer: - layer = DifyDriveLayer.from_config_with_settings( +def _build_layer() -> DifyDriveLayer: + layer = DifyDriveLayer.from_config( DifyDriveLayerConfig( drive_ref="agent-1", skills=[ @@ -54,48 +46,45 @@ def _build_layer(tmp_path: Path) -> DifyDriveLayer: ], mentioned_skill_keys=["tender-analyzer/SKILL.md"], mentioned_file_keys=["files/report.pdf"], - ), - inner_api_url="https://api.example.com", - inner_api_key="secret", + ) ) - layer.bind_deps({"execution_context": _FakeExecutionContextLayer()}) + layer.bind_deps({"shell": _shell_layer()}) return layer -class _FakeAsyncResponse: - def __init__(self, *, status_code: int = 200, json_data: object | None = None, content: bytes = b"") -> None: - self.status_code = status_code - self._json_data = json_data - self.content = content - - @property - def is_error(self) -> bool: - return self.status_code >= 400 - - def json(self) -> object: - if isinstance(self._json_data, Exception): - raise self._json_data - return self._json_data +def _remote_result( + output: str, + *, + exit_code: int | None = 0, + truncated: bool = False, +) -> RemoteCommandResult: + return RemoteCommandResult( + job_id="remote-drive-pull", + status="exited", + done=True, + exit_code=exit_code, + output=output, + offset=len(output), + truncated=truncated, + output_path="/tmp/output.log", + ) -class _FakeAsyncClient: - def __init__(self, responses: dict[str, _FakeAsyncResponse]) -> None: - self._responses = responses - - async def __aenter__(self) -> _FakeAsyncClient: - return self - - async def __aexit__(self, exc_type, exc, tb) -> None: - del exc_type, exc, tb - - async def get(self, url: str, **kwargs) -> _FakeAsyncResponse: - prefix = kwargs.get("params", {}).get("prefix") - key = f"manifest:{prefix}" if prefix is not None else f"download:{url}" - return self._responses[key] +def _pulled_output() -> str: + return ( + "/mnt/drive/agent-1/tender-analyzer\n" + "/mnt/drive/agent-1/files/report.pdf\n" + "__DIFY_DRIVE_MENTIONED_PATH__\ttender-analyzer/SKILL.md\t/mnt/drive/agent-1/tender-analyzer/SKILL.md\n" + "__DIFY_DRIVE_SKILL_BEGIN__\ttender-analyzer/SKILL.md\n" + "# Tender Analyzer\n" + "Use carefully.\n" + "__DIFY_DRIVE_SKILL_END__\ttender-analyzer/SKILL.md\n" + "__DIFY_DRIVE_MENTIONED_PATH__\tfiles/report.pdf\t/mnt/drive/agent-1/files/report.pdf\n" + ) -def test_drive_layer_exposes_agent_stub_cli_usage_suffix_prompt(tmp_path: Path) -> None: - layer = _build_layer(tmp_path) +def test_drive_layer_exposes_agent_stub_cli_usage_suffix_prompt() -> None: + layer = _build_layer() assert len(layer.suffix_prompts) == 1 prompt = layer.suffix_prompts[0] @@ -121,317 +110,146 @@ def test_drive_layer_exposes_agent_stub_cli_usage_suffix_prompt(tmp_path: Path) @pytest.mark.anyio -async def test_on_context_create_loads_mentioned_targets_into_prompt( - monkeypatch: pytest.MonkeyPatch, tmp_path: Path +async def test_on_context_create_pulls_mentioned_targets_through_shell( + monkeypatch: pytest.MonkeyPatch, ) -> None: - layer = _build_layer(tmp_path) + layer = _build_layer() + captured: dict[str, object] = {} - async def _fetch_manifest_items(*, tenant_id: str, targets: list[tuple[str, bool]]) -> list[AgentStubDriveItem]: - assert tenant_id == "tenant-1" - assert targets == [("tender-analyzer/", False), ("files/report.pdf", True)] - return [ - AgentStubDriveItem(key="tender-analyzer/SKILL.md", download_url="https://files/skill-md"), - AgentStubDriveItem(key="files/report.pdf", download_url="https://files/report"), - ] + async def fake_run_remote_script( + self: DifyShellLayer, + script: str, + *, + timeout: float = 10.0, + inject_agent_stub_env: bool = False, + ) -> RemoteCommandResult: + del self, timeout + captured["script"] = script + captured["inject_agent_stub_env"] = inject_agent_stub_env + return _remote_result(_pulled_output()) - async def _download_items(items: list[AgentStubDriveItem]) -> dict[str, str]: - assert {item.key for item in items} == {"files/report.pdf", "tender-analyzer/SKILL.md"} - skill_path = tmp_path / "tender-analyzer" / "SKILL.md" - skill_path.parent.mkdir(parents=True, exist_ok=True) - skill_path.write_text("# Tender Analyzer\nUse carefully.\n", encoding="utf-8") - file_path = tmp_path / "files" / "report.pdf" - file_path.parent.mkdir(parents=True, exist_ok=True) - file_path.write_bytes(b"pdf") - return { - "tender-analyzer/SKILL.md": str(skill_path), - "files/report.pdf": str(file_path), - } - - monkeypatch.setattr(layer, "_fetch_manifest_items", _fetch_manifest_items) - monkeypatch.setattr(layer, "_download_items", _download_items) + monkeypatch.setattr(DifyShellLayer, "run_remote_script", fake_run_remote_script) await layer.on_context_create() + script = captured["script"] + assert isinstance(script, str) + assert captured["inject_agent_stub_env"] is True + assert "base=/mnt/drive/agent-1" in script + assert 'dify-agent drive pull tender-analyzer/ files/report.pdf --to "$base"' in script + assert "cat /mnt/drive/agent-1/tender-analyzer/SKILL.md" in script prompt = layer.build_prompt_context() assert "Loaded mentioned skills" in prompt - assert f"Local path: {tmp_path / 'tender-analyzer'}" in prompt + assert "Path: tender-analyzer" in prompt + assert "Local path: /mnt/drive/agent-1/tender-analyzer" in prompt assert "Name: Tender Analyzer" not in prompt assert "# Tender Analyzer\nUse carefully." in prompt - assert f"files/report.pdf -> {tmp_path / 'files' / 'report.pdf'}" in prompt + assert "files/report.pdf -> /mnt/drive/agent-1/files/report.pdf" in prompt assert "Other available skills" not in prompt - assert "other-skill: Other Skill — Fallback catalog entry." not in prompt @pytest.mark.anyio -async def test_on_context_resume_loads_mentioned_targets_into_prompt( - monkeypatch: pytest.MonkeyPatch, tmp_path: Path +async def test_on_context_resume_repulls_mentioned_targets_through_shell( + monkeypatch: pytest.MonkeyPatch, ) -> None: - layer = _build_layer(tmp_path) + layer = _build_layer() + calls = 0 - async def _fetch_manifest_items(*, tenant_id: str, targets: list[tuple[str, bool]]) -> list[AgentStubDriveItem]: - assert tenant_id == "tenant-1" - assert targets == [("tender-analyzer/", False), ("files/report.pdf", True)] - return [ - AgentStubDriveItem(key="tender-analyzer/SKILL.md", download_url="https://files/skill-md"), - AgentStubDriveItem(key="files/report.pdf", download_url="https://files/report"), - ] + async def fake_run_remote_script( + self: DifyShellLayer, + script: str, + *, + timeout: float = 10.0, + inject_agent_stub_env: bool = False, + ) -> RemoteCommandResult: + del self, script, timeout + nonlocal calls + calls += 1 + assert inject_agent_stub_env is True + return _remote_result(_pulled_output()) - async def _download_items(items: list[AgentStubDriveItem]) -> dict[str, str]: - assert {item.key for item in items} == {"files/report.pdf", "tender-analyzer/SKILL.md"} - skill_path = tmp_path / "tender-analyzer" / "SKILL.md" - skill_path.parent.mkdir(parents=True, exist_ok=True) - skill_path.write_text("# Tender Analyzer\nUse carefully.\n", encoding="utf-8") - file_path = tmp_path / "files" / "report.pdf" - file_path.parent.mkdir(parents=True, exist_ok=True) - file_path.write_bytes(b"pdf") - return { - "tender-analyzer/SKILL.md": str(skill_path), - "files/report.pdf": str(file_path), - } - - monkeypatch.setattr(layer, "_fetch_manifest_items", _fetch_manifest_items) - monkeypatch.setattr(layer, "_download_items", _download_items) + monkeypatch.setattr(DifyShellLayer, "run_remote_script", fake_run_remote_script) await layer.on_context_resume() - prompt = layer.build_prompt_context() - assert "Loaded mentioned skills" in prompt - assert f"Local path: {tmp_path / 'tender-analyzer'}" in prompt - assert "Name: Tender Analyzer" not in prompt - assert "# Tender Analyzer\nUse carefully." in prompt - assert f"files/report.pdf -> {tmp_path / 'files' / 'report.pdf'}" in prompt - assert "Other available skills" not in prompt - assert "other-skill: Other Skill — Fallback catalog entry." not in prompt + assert calls == 1 + assert "Loaded mentioned skills" in layer.build_prompt_context() @pytest.mark.anyio async def test_on_context_create_raises_when_mentioned_file_is_missing( monkeypatch: pytest.MonkeyPatch, - tmp_path: Path, ) -> None: - layer = _build_layer(tmp_path) + layer = _build_layer() - async def _fetch_manifest_items(*, tenant_id: str, targets: list[tuple[str, bool]]) -> list[AgentStubDriveItem]: - del tenant_id, targets - return [AgentStubDriveItem(key="tender-analyzer/SKILL.md", download_url="https://files/skill-md")] - - async def _download_items(items: list[AgentStubDriveItem]) -> dict[str, str]: - del items - skill_path = tmp_path / "tender-analyzer" / "SKILL.md" - skill_path.parent.mkdir(parents=True, exist_ok=True) - skill_path.write_text("# Tender Analyzer\nUse carefully.\n", encoding="utf-8") - return {"tender-analyzer/SKILL.md": str(skill_path)} - - monkeypatch.setattr(layer, "_fetch_manifest_items", _fetch_manifest_items) - monkeypatch.setattr(layer, "_download_items", _download_items) - - with pytest.raises(DifyDriveLayerError, match="missing pulled file"): - await layer.on_context_create() - - -@pytest.mark.anyio -async def test_fetch_manifest_items_validates_payload_filters_exact_targets_and_deduplicates( - monkeypatch: pytest.MonkeyPatch, - tmp_path: Path, -) -> None: - layer = _build_layer(tmp_path) - responses = { - "manifest:tender-analyzer/": _FakeAsyncResponse( - json_data={ - "items": [ - {"key": "tender-analyzer/SKILL.md", "download_url": "https://files/skill-md", "size": 7}, - {"key": "files/report.pdf", "download_url": "https://files/report", "size": 3}, - ] - } - ), - "manifest:files/report.pdf": _FakeAsyncResponse( - json_data={ - "items": [ - {"key": "files/report.pdf", "download_url": "https://files/report", "size": 3}, - {"key": "files/other.pdf", "download_url": "https://files/other", "size": 4}, - ] - } - ), - } - monkeypatch.setattr( - "dify_agent.layers.drive.layer.httpx.AsyncClient", - lambda **_kwargs: _FakeAsyncClient(responses), - ) - - result = await layer._fetch_manifest_items( - tenant_id="tenant-1", - targets=[("tender-analyzer/", False), ("files/report.pdf", True)], - ) - - assert {(item.key, item.download_url, item.size) for item in result} == { - ("files/report.pdf", "https://files/report", 3), - ("tender-analyzer/SKILL.md", "https://files/skill-md", 7), - } - assert [item.key for item in result].count("files/report.pdf") == 1 - - -@pytest.mark.anyio -async def test_fetch_manifest_items_rejects_invalid_manifest_payload( - monkeypatch: pytest.MonkeyPatch, - tmp_path: Path, -) -> None: - layer = _build_layer(tmp_path) - responses = {"manifest:tender-analyzer/": _FakeAsyncResponse(json_data={"items": "bad"})} - monkeypatch.setattr( - "dify_agent.layers.drive.layer.httpx.AsyncClient", - lambda **_kwargs: _FakeAsyncClient(responses), - ) - - with pytest.raises(DifyDriveLayerError, match="drive manifest response is invalid"): - await layer._fetch_manifest_items(tenant_id="tenant-1", targets=[("tender-analyzer/", False)]) - - -@pytest.mark.anyio -async def test_fetch_manifest_items_rejects_missing_download_url( - monkeypatch: pytest.MonkeyPatch, - tmp_path: Path, -) -> None: - layer = _build_layer(tmp_path) - responses = { - "manifest:tender-analyzer/": _FakeAsyncResponse( - json_data={"items": [{"key": "tender-analyzer/SKILL.md", "download_url": None, "size": 7}]} + async def fake_run_remote_script( + self: DifyShellLayer, + script: str, + *, + timeout: float = 10.0, + inject_agent_stub_env: bool = False, + ) -> RemoteCommandResult: + del self, script, timeout, inject_agent_stub_env + output = ( + "__DIFY_DRIVE_MENTIONED_PATH__\ttender-analyzer/SKILL.md\t/mnt/drive/agent-1/tender-analyzer/SKILL.md\n" + "__DIFY_DRIVE_SKILL_BEGIN__\ttender-analyzer/SKILL.md\n" + "# Tender Analyzer\n" + "__DIFY_DRIVE_SKILL_END__\ttender-analyzer/SKILL.md\n" ) - } - monkeypatch.setattr( - "dify_agent.layers.drive.layer.httpx.AsyncClient", - lambda **_kwargs: _FakeAsyncClient(responses), - ) + return _remote_result(output) - with pytest.raises(DifyDriveLayerError, match="missing download_url"): - await layer._fetch_manifest_items(tenant_id="tenant-1", targets=[("tender-analyzer/", False)]) - - -@pytest.mark.anyio -async def test_download_items_hands_validated_downloads_to_materialization( - monkeypatch: pytest.MonkeyPatch, - tmp_path: Path, -) -> None: - layer = _build_layer(tmp_path) - responses = { - "download:https://files/skill-md": _FakeAsyncResponse(content=b"skill-md"), - "download:https://files/report": _FakeAsyncResponse(content=b"pdf"), - } - monkeypatch.setattr( - "dify_agent.layers.drive.layer.httpx.AsyncClient", - lambda **_kwargs: _FakeAsyncClient(responses), - ) - captured: dict[str, object] = {} - - def fake_materialize_drive_downloads(*, base_path: Path, downloads: list[DriveDownloadPayload]): - captured["base_path"] = base_path - captured["downloads"] = downloads - return [tmp_path / "tender-analyzer" / "SKILL.md", tmp_path / "files" / "report.pdf"] - - monkeypatch.setattr( - "dify_agent.layers.drive.layer.materialize_drive_downloads", - fake_materialize_drive_downloads, - ) - - result = await layer._download_items( - [ - AgentStubDriveItem(key="tender-analyzer/SKILL.md", download_url="https://files/skill-md", size=8), - AgentStubDriveItem(key="files/report.pdf", download_url="https://files/report", size=3), - ] - ) - - downloads = captured["downloads"] - assert isinstance(downloads, list) - assert downloads == [ - DriveDownloadPayload(key="tender-analyzer/SKILL.md", payload=b"skill-md", size=8), - DriveDownloadPayload(key="files/report.pdf", payload=b"pdf", size=3), - ] - assert result == { - "tender-analyzer/SKILL.md": str(tmp_path / "tender-analyzer" / "SKILL.md"), - "files/report.pdf": str(tmp_path / "files" / "report.pdf"), - } - - -@pytest.mark.anyio -async def test_download_items_extracts_skill_archive_over_skill_md_and_deletes_archive( - monkeypatch: pytest.MonkeyPatch, - tmp_path: Path, -) -> None: - layer = _build_layer(tmp_path) - archive_buffer = BytesIO() - with ZipFile(archive_buffer, mode="w") as archive: - archive.writestr("SKILL.md", "# From archive\n") - archive.writestr("helper.py", "print('archive')\n") - archive_bytes = archive_buffer.getvalue() - responses = { - "download:https://files/skill-md": _FakeAsyncResponse(content=b"# From manifest\n"), - "download:https://files/archive": _FakeAsyncResponse(content=archive_bytes), - } - monkeypatch.setattr( - "dify_agent.layers.drive.layer.httpx.AsyncClient", - lambda **_kwargs: _FakeAsyncClient(responses), - ) - monkeypatch.setattr( - "dify_agent.layers.drive.layer.agent_stub_drive_base_for_ref", - lambda _drive_ref: str(tmp_path), - ) - - result = await layer._download_items( - [ - AgentStubDriveItem(key="tender-analyzer/SKILL.md", download_url="https://files/skill-md", size=16), - AgentStubDriveItem( - key="tender-analyzer/.DIFY-SKILL-FULL.zip", - download_url="https://files/archive", - size=len(archive_bytes), - ), - ] - ) - - archive_path = tmp_path / "tender-analyzer" / ".DIFY-SKILL-FULL.zip" - assert result["tender-analyzer/.DIFY-SKILL-FULL.zip"] == str(archive_path) - assert not archive_path.exists() - assert (tmp_path / "tender-analyzer" / "SKILL.md").read_text(encoding="utf-8") == "# From archive\n" - assert (tmp_path / "tender-analyzer" / "helper.py").read_text(encoding="utf-8") == "print('archive')\n" - - -@pytest.mark.anyio -async def test_on_context_resume_raises_when_mentioned_targets_are_missing( - monkeypatch: pytest.MonkeyPatch, - tmp_path: Path, -) -> None: - layer = _build_layer(tmp_path) - - async def _fetch_manifest_items(*, tenant_id: str, targets: list[tuple[str, bool]]) -> list[AgentStubDriveItem]: - del tenant_id, targets - return [] - - async def _download_items(items: list[AgentStubDriveItem]) -> dict[str, str]: - assert items == [] - return {} - - monkeypatch.setattr(layer, "_fetch_manifest_items", _fetch_manifest_items) - monkeypatch.setattr(layer, "_download_items", _download_items) - - with pytest.raises(DifyDriveLayerError, match="missing pulled file"): - await layer.on_context_resume() - - -@pytest.mark.anyio -async def test_on_context_create_raises_when_manifest_is_empty_for_mentioned_targets( - monkeypatch: pytest.MonkeyPatch, - tmp_path: Path, -) -> None: - layer = _build_layer(tmp_path) - - async def _fetch_manifest_items(*, tenant_id: str, targets: list[tuple[str, bool]]) -> list[AgentStubDriveItem]: - del tenant_id, targets - return [] - - async def _download_items(items: list[AgentStubDriveItem]) -> dict[str, str]: - assert items == [] - return {} - - monkeypatch.setattr(layer, "_fetch_manifest_items", _fetch_manifest_items) - monkeypatch.setattr(layer, "_download_items", _download_items) + monkeypatch.setattr(DifyShellLayer, "run_remote_script", fake_run_remote_script) with pytest.raises(DifyDriveLayerError, match="missing pulled file"): await layer.on_context_create() + + +@pytest.mark.anyio +async def test_on_context_create_raises_when_shell_pull_fails( + monkeypatch: pytest.MonkeyPatch, +) -> None: + layer = _build_layer() + + async def fake_run_remote_script( + self: DifyShellLayer, + script: str, + *, + timeout: float = 10.0, + inject_agent_stub_env: bool = False, + ) -> RemoteCommandResult: + del self, script, timeout, inject_agent_stub_env + return _remote_result("permission denied\n", exit_code=1) + + monkeypatch.setattr(DifyShellLayer, "run_remote_script", fake_run_remote_script) + + with pytest.raises(DifyDriveLayerError, match="drive mentioned pull failed in shell"): + await layer.on_context_create() + + +@pytest.mark.anyio +async def test_on_context_create_raises_when_shell_output_is_truncated( + monkeypatch: pytest.MonkeyPatch, +) -> None: + layer = _build_layer() + + async def fake_run_remote_script( + self: DifyShellLayer, + script: str, + *, + timeout: float = 10.0, + inject_agent_stub_env: bool = False, + ) -> RemoteCommandResult: + del self, script, timeout, inject_agent_stub_env + return _remote_result(_pulled_output(), truncated=True) + + monkeypatch.setattr(DifyShellLayer, "run_remote_script", fake_run_remote_script) + + with pytest.raises(DifyDriveLayerError, match="output was truncated"): + await layer.on_context_create() + + +def test_parse_shell_pull_output_rejects_unclosed_skill_marker() -> None: + layer = _build_layer() + + with pytest.raises(DifyDriveLayerError, match="omitted SKILL.md end marker"): + layer._parse_shell_pull_output("__DIFY_DRIVE_SKILL_BEGIN__\ttender-analyzer/SKILL.md\n# Tender\n") diff --git a/dify-agent/tests/local/dify_agent/layers/shell/test_configs.py b/dify-agent/tests/local/dify_agent/layers/shell/test_configs.py index c15b15ee284..ec429f94339 100644 --- a/dify-agent/tests/local/dify_agent/layers/shell/test_configs.py +++ b/dify-agent/tests/local/dify_agent/layers/shell/test_configs.py @@ -29,6 +29,7 @@ def test_shell_layer_config_defaults_and_forbids_unknown_fields() -> None: config = DifyShellLayerConfig() assert config.model_dump() == { + "agent_stub_drive_ref": None, "cli_tools": [], "env": [], "secret_refs": [], @@ -51,6 +52,7 @@ def test_shell_layer_config_accepts_agent_soul_shell_settings() -> None: ], env=[DifyShellEnvVarConfig(name="PROJECT_NAME", value="demo")], secret_refs=[DifyShellSecretRefConfig(name="OPENAI_API_KEY", ref="credential-1")], + agent_stub_drive_ref="agent-1", sandbox=DifyShellSandboxConfig(provider="independent", config={"cpu": 2}), ) @@ -59,6 +61,7 @@ def test_shell_layer_config_accepts_agent_soul_shell_settings() -> None: assert config.cli_tools[0].secret_refs[0].ref == "credential-2" assert config.env[0].name == "PROJECT_NAME" assert config.secret_refs[0].ref == "credential-1" + assert config.agent_stub_drive_ref == "agent-1" assert config.sandbox is not None assert config.sandbox.config == {"cpu": 2} diff --git a/dify-agent/tests/local/dify_agent/layers/shell/test_layer.py b/dify-agent/tests/local/dify_agent/layers/shell/test_layer.py index c7d2599b63c..c3a5c2b941e 100644 --- a/dify-agent/tests/local/dify_agent/layers/shell/test_layer.py +++ b/dify-agent/tests/local/dify_agent/layers/shell/test_layer.py @@ -14,8 +14,6 @@ from dify_agent.agent_stub.server.shell_agent_stub_env import ( AGENT_STUB_DRIVE_BASE_ENV_VAR, AGENT_STUB_API_BASE_URL_ENV_VAR, ) -from dify_agent.layers.drive import DifyDriveLayerConfig -from dify_agent.layers.drive.layer import DifyDriveLayer from dify_agent.layers.execution_context import DifyExecutionContextLayerConfig from dify_agent.layers.execution_context.layer import DifyExecutionContextLayer from dify_agent.layers.shell import ( @@ -238,14 +236,6 @@ def _execution_context_layer() -> DifyExecutionContextLayer: ) -def _drive_layer() -> DifyDriveLayer: - return DifyDriveLayer.from_config_with_settings( - DifyDriveLayerConfig(drive_ref="agent-1"), - inner_api_url="https://api.example.com", - inner_api_key="secret", - ) - - def _shell_provider(*, client_factory: ShellctlClientFactory) -> LayerProvider[DifyShellLayer]: return LayerProvider.from_factory( layer_type=DifyShellLayer, @@ -666,7 +656,7 @@ def test_shell_layer_injects_agent_stub_env_only_for_user_visible_shell_run() -> client = FakeShellctlClient(run_handler=run_handler) layer = DifyShellLayer.from_config_with_settings( - DifyShellLayerConfig(), + DifyShellLayerConfig(agent_stub_drive_ref="agent-1"), shellctl_entrypoint="http://shellctl", shellctl_client_factory=lambda _entrypoint: client, agent_stub_api_base_url="https://agent.example.com/agent-stub", @@ -674,7 +664,7 @@ def test_shell_layer_injects_agent_stub_env_only_for_user_visible_shell_run() -> f"token-for:{execution_context.tenant_id}:{session_id}" ), ) - layer.deps = layer.deps_type(drive=_drive_layer(), execution_context=_execution_context_layer()) + layer.deps = layer.deps_type(execution_context=_execution_context_layer()) tools = {tool.name: tool for tool in layer.tools} async def scenario() -> None: @@ -793,7 +783,7 @@ def test_run_remote_script_can_inject_agent_stub_env_for_server_owned_uploads() client = FakeShellctlClient(run_handler=run_handler) layer = DifyShellLayer.from_config_with_settings( - DifyShellLayerConfig(), + DifyShellLayerConfig(agent_stub_drive_ref="agent-1"), shellctl_entrypoint="http://shellctl", shellctl_client_factory=lambda _entrypoint: client, agent_stub_api_base_url="https://agent.example.com/agent-stub", @@ -801,7 +791,7 @@ def test_run_remote_script_can_inject_agent_stub_env_for_server_owned_uploads() f"token-for:{execution_context.tenant_id}:{session_id}" ), ) - layer.deps = layer.deps_type(drive=_drive_layer(), execution_context=_execution_context_layer()) + layer.deps = layer.deps_type(execution_context=_execution_context_layer()) async def scenario() -> None: async with layer.resource_context():