Materialize drive mentions through shell

This commit is contained in:
Yanli 盐粒 2026-06-25 23:55:17 +08:00
parent d67b0bb615
commit 52903a6e69
13 changed files with 419 additions and 577 deletions

View File

@ -78,11 +78,22 @@ def _filter_snapshot_to_specs(
return CompositorSessionSnapshot(schema_version=snapshot.schema_version, layers=filtered_layers)
def _shell_layer_deps(*, include_drive: bool) -> dict[str, str]:
    """Return the dependency map for the shell layer.

    ``execution_context`` is always bound; ``drive`` is bound only when
    ``include_drive`` is true.
    """
    base_deps = {"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID}
    if not include_drive:
        return base_deps
    return {**base_deps, "drive": DIFY_DRIVE_LAYER_ID}
def _shell_layer_deps() -> dict[str, str]:
    """Return the shell layer's dependency map, which binds only ``execution_context``."""
    return dict(execution_context=DIFY_EXECUTION_CONTEXT_LAYER_ID)
def _drive_layer_deps() -> dict[str, str]:
    """Return the drive layer's dependency map, which binds only ``shell``."""
    return dict(shell=DIFY_SHELL_LAYER_ID)
def _shell_config_with_drive_ref(
    shell_config: DifyShellLayerConfig | None,
    drive_config: DifyDriveLayerConfig | None,
) -> DifyShellLayerConfig:
    """Return the shell config, carrying the drive ref when a drive is configured.

    A missing (or falsy) ``shell_config`` falls back to a default
    ``DifyShellLayerConfig``. When ``drive_config`` is given, the result is a
    ``model_copy`` of that config whose ``agent_stub_drive_ref`` is set to
    ``drive_config.drive_ref``; otherwise the config is returned as-is.
    """
    base_config = shell_config if shell_config else DifyShellLayerConfig()
    if drive_config is not None:
        return base_config.model_copy(update={"agent_stub_drive_ref": drive_config.drive_ref})
    return base_config
class AgentBackendModelConfig(BaseModel):
@ -263,14 +274,29 @@ class AgentBackendRunRequestBuilder:
]
)
include_shell = run_input.include_shell or run_input.drive_config is not None
if include_shell:
# Sandboxed bash workspace (dify.shell). It enters before drive so
# drive can materialize mentioned targets with `dify-agent drive pull`
# in the same shell-visible filesystem used by model commands.
layers.append(
RunLayerSpec(
name=DIFY_SHELL_LAYER_ID,
type=DIFY_SHELL_LAYER_TYPE_ID,
deps=_shell_layer_deps(),
metadata=run_input.metadata,
config=_shell_config_with_drive_ref(run_input.shell_config, run_input.drive_config),
)
)
if run_input.drive_config is not None:
# Drive Skills & Files declaration (dify.drive): a config-only index;
# the agent pulls listed entries through the back proxy by drive_ref.
# Drive Skills & Files declaration (dify.drive): the catalog plus
# prompt-mentioned entries eagerly pulled through the shell layer.
layers.append(
RunLayerSpec(
name=DIFY_DRIVE_LAYER_ID,
type=DIFY_DRIVE_LAYER_TYPE_ID,
deps={"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID},
deps=_drive_layer_deps(),
metadata=run_input.metadata,
config=run_input.drive_config,
)
@ -336,21 +362,6 @@ class AgentBackendRunRequestBuilder:
)
)
if run_input.include_shell:
# Sandboxed bash workspace (dify.shell). Depends on execution_context
# so the agent server can mint per-command Agent Stub env, and on
# drive when present so that env points at /mnt/drive/<drive_ref>.
# shellctl connection itself is server-injected.
layers.append(
RunLayerSpec(
name=DIFY_SHELL_LAYER_ID,
type=DIFY_SHELL_LAYER_TYPE_ID,
deps=_shell_layer_deps(include_drive=run_input.drive_config is not None),
metadata=run_input.metadata,
config=run_input.shell_config or DifyShellLayerConfig(),
)
)
if run_input.output is not None:
layers.append(
RunLayerSpec(
@ -462,14 +473,29 @@ class AgentBackendRunRequestBuilder:
]
)
include_shell = run_input.include_shell or run_input.drive_config is not None
if include_shell:
# Sandboxed bash workspace (dify.shell). It enters before drive so
# drive can materialize mentioned targets with `dify-agent drive pull`
# in the same shell-visible filesystem used by model commands.
layers.append(
RunLayerSpec(
name=DIFY_SHELL_LAYER_ID,
type=DIFY_SHELL_LAYER_TYPE_ID,
deps=_shell_layer_deps(),
metadata=run_input.metadata,
config=_shell_config_with_drive_ref(run_input.shell_config, run_input.drive_config),
)
)
if run_input.drive_config is not None:
# Drive Skills & Files declaration (dify.drive): a config-only index;
# the agent pulls listed entries through the back proxy by drive_ref.
# Drive Skills & Files declaration (dify.drive): the catalog plus
# prompt-mentioned entries eagerly pulled through the shell layer.
layers.append(
RunLayerSpec(
name=DIFY_DRIVE_LAYER_ID,
type=DIFY_DRIVE_LAYER_TYPE_ID,
deps={"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID},
deps=_drive_layer_deps(),
metadata=run_input.metadata,
config=run_input.drive_config,
)
@ -537,21 +563,6 @@ class AgentBackendRunRequestBuilder:
)
)
if run_input.include_shell:
# Sandboxed bash workspace (dify.shell). Depends on execution_context
# so the agent server can mint per-command Agent Stub env, and on
# drive when present so that env points at /mnt/drive/<drive_ref>.
# shellctl connection itself is server-injected.
layers.append(
RunLayerSpec(
name=DIFY_SHELL_LAYER_ID,
type=DIFY_SHELL_LAYER_TYPE_ID,
deps=_shell_layer_deps(include_drive=run_input.drive_config is not None),
metadata=run_input.metadata,
config=run_input.shell_config or DifyShellLayerConfig(),
)
)
if run_input.output is not None:
layers.append(
RunLayerSpec(

View File

@ -339,7 +339,7 @@ def test_workflow_request_builder_adds_shell_layer_when_include_shell():
assert shell_config.env[0].name == "PROJECT_NAME"
def test_workflow_request_builder_binds_shell_to_drive_when_configured():
def test_workflow_request_builder_binds_drive_to_shell_when_configured():
run_input = _run_input()
run_input.include_shell = True
run_input.drive_config = DifyDriveLayerConfig(drive_ref="agent-agent-1")
@ -348,11 +348,11 @@ def test_workflow_request_builder_binds_shell_to_drive_when_configured():
layers = {layer.name: layer for layer in request.composition.layers}
layer_names = [layer.name for layer in request.composition.layers]
assert layers[DIFY_SHELL_LAYER_ID].deps == {
"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID,
"drive": DIFY_DRIVE_LAYER_ID,
}
assert layer_names.index(DIFY_DRIVE_LAYER_ID) < layer_names.index(DIFY_SHELL_LAYER_ID)
assert layers[DIFY_SHELL_LAYER_ID].deps == {"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID}
shell_config = cast(DifyShellLayerConfig, layers[DIFY_SHELL_LAYER_ID].config)
assert shell_config.agent_stub_drive_ref == "agent-agent-1"
assert layers[DIFY_DRIVE_LAYER_ID].deps == {"shell": DIFY_SHELL_LAYER_ID}
assert layer_names.index(DIFY_SHELL_LAYER_ID) < layer_names.index(DIFY_DRIVE_LAYER_ID)
def test_agent_app_request_builder_omits_shell_layer_by_default():
@ -374,7 +374,7 @@ def test_agent_app_request_builder_adds_shell_layer_when_include_shell():
assert shell_config.env[0].name == "APP_ENV"
def test_agent_app_request_builder_binds_shell_to_drive_when_configured():
def test_agent_app_request_builder_binds_drive_to_shell_when_configured():
run_input = _agent_app_input(include_shell=True)
run_input.drive_config = DifyDriveLayerConfig(drive_ref="agent-agent-1")
@ -382,11 +382,11 @@ def test_agent_app_request_builder_binds_shell_to_drive_when_configured():
layers = {layer.name: layer for layer in request.composition.layers}
layer_names = [layer.name for layer in request.composition.layers]
assert layers[DIFY_SHELL_LAYER_ID].deps == {
"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID,
"drive": DIFY_DRIVE_LAYER_ID,
}
assert layer_names.index(DIFY_DRIVE_LAYER_ID) < layer_names.index(DIFY_SHELL_LAYER_ID)
assert layers[DIFY_SHELL_LAYER_ID].deps == {"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID}
shell_config = cast(DifyShellLayerConfig, layers[DIFY_SHELL_LAYER_ID].config)
assert shell_config.agent_stub_drive_ref == "agent-agent-1"
assert layers[DIFY_DRIVE_LAYER_ID].deps == {"shell": DIFY_SHELL_LAYER_ID}
assert layer_names.index(DIFY_SHELL_LAYER_ID) < layer_names.index(DIFY_DRIVE_LAYER_ID)
def test_agent_app_request_builder_adds_knowledge_layer_when_configured():

View File

@ -85,6 +85,49 @@ class _NoToolsBuilder:
del kwargs
def _mock_empty_drive_catalog(monkeypatch: pytest.MonkeyPatch) -> None:
    """Patch ``AgentDriveService`` so it reports no skills and an empty manifest."""

    def _no_skills(self, *, tenant_id, agent_id):
        return []

    def _no_manifest_items(self, *, tenant_id, agent_id, prefix="", include_download_url=False):
        return []

    monkeypatch.setattr(
        "core.workflow.nodes.agent_v2.runtime_request_builder.AgentDriveService.list_skills",
        _no_skills,
    )
    monkeypatch.setattr(
        "core.workflow.nodes.agent_v2.runtime_request_builder.AgentDriveService.manifest",
        _no_manifest_items,
    )
def _mock_drive_catalog(monkeypatch: pytest.MonkeyPatch) -> None:
    """Patch ``AgentDriveService`` with a small fixed drive catalog.

    ``list_skills`` reports the single ``tender-analyzer`` skill. ``manifest``
    lists that skill's ``SKILL.md``, its full archive, and the plain file
    ``files/sample.pdf``. Both stubs build fresh objects on every call.
    """

    def _list_skills(self, *, tenant_id, agent_id):
        tender_analyzer = {
            "path": "tender-analyzer",
            "skill_md_key": "tender-analyzer/SKILL.md",
            "archive_key": "tender-analyzer/.DIFY-SKILL-FULL.zip",
            "name": "Tender Analyzer",
            "description": "Parses RFPs.",
            "size": 123,
            "mime_type": "text/markdown",
            "hash": "hash-1",
            "created_at": 1,
        }
        return [tender_analyzer]

    def _manifest(self, *, tenant_id, agent_id, prefix="", include_download_url=False):
        entries = []
        entries.append({"key": "tender-analyzer/SKILL.md", "is_skill": True})
        entries.append({"key": "tender-analyzer/.DIFY-SKILL-FULL.zip", "is_skill": False})
        entries.append({"key": "files/sample.pdf", "is_skill": False})
        return entries

    monkeypatch.setattr(
        "core.workflow.nodes.agent_v2.runtime_request_builder.AgentDriveService.list_skills",
        _list_skills,
    )
    monkeypatch.setattr(
        "core.workflow.nodes.agent_v2.runtime_request_builder.AgentDriveService.manifest",
        _manifest,
    )
@pytest.fixture(autouse=True)
def _mock_default_agent_app_drive_catalog(monkeypatch: pytest.MonkeyPatch) -> None:
    """Autouse fixture that installs the empty drive catalog by default.

    Tests that need catalog entries call ``_mock_drive_catalog`` themselves to
    override these patches.
    """
    _mock_empty_drive_catalog(monkeypatch)
def _ctx(soul: AgentSoulConfig, *, query: str = "hello") -> AgentAppRuntimeBuildContext:
dify_context = SimpleNamespace(
tenant_id="tenant-1",
@ -128,7 +171,15 @@ class TestAgentAppRuntimeRequestBuilder:
req = result.request
assert req.purpose == "agent_app"
names = [layer.name for layer in req.composition.layers]
assert names == ["agent_soul_prompt", "agent_app_user_prompt", "execution_context", "drive", "history", "llm"]
assert names == [
"agent_soul_prompt",
"agent_app_user_prompt",
"execution_context",
"shell",
"drive",
"history",
"llm",
]
# plugin_id / provider normalized for plugin-daemon transport.
llm = next(layer for layer in req.composition.layers if layer.name == "llm")
assert llm.config.plugin_id == "langgenius/openai"
@ -278,6 +329,7 @@ class TestAgentAppDriveLayer:
monkeypatch.setattr(
"core.app.apps.agent_app.runtime_request_builder.dify_config.AGENT_DRIVE_MANIFEST_ENABLED", True
)
_mock_drive_catalog(monkeypatch)
builder = AgentAppRuntimeRequestBuilder(
credentials_provider=_FakeCredentialsProvider(),
plugin_tools_builder=_NoToolsBuilder(), # type: ignore[arg-type]
@ -287,15 +339,18 @@ class TestAgentAppDriveLayer:
drive = next(layer for layer in result.request.composition.layers if layer.name == "drive")
assert drive.type == "dify.drive"
assert drive.deps == {"execution_context": "execution_context"}
assert drive.deps == {"shell": DIFY_SHELL_LAYER_ID}
assert drive.config.drive_ref == "agent-agent-1"
assert [skill.skill_md_key for skill in drive.config.skills] == ["tender-analyzer/SKILL.md"]
assert drive.config.mentioned_skill_keys == ["tender-analyzer/SKILL.md"]
# injected right after execution_context, mirroring the workflow surface
# shell enters first; drive uses that shell to materialize mentioned targets.
names = [layer.name for layer in result.request.composition.layers]
assert names.index("drive") == names.index("execution_context") + 1
assert names.index(DIFY_SHELL_LAYER_ID) == names.index("execution_context") + 1
assert names.index("drive") == names.index(DIFY_SHELL_LAYER_ID) + 1
def test_drive_layer_injected_with_empty_catalog_and_shell_depends_on_it(self, monkeypatch: pytest.MonkeyPatch):
def test_drive_layer_injected_with_empty_catalog_and_drive_depends_on_shell(
self, monkeypatch: pytest.MonkeyPatch
):
monkeypatch.setattr(
"core.app.apps.agent_app.runtime_request_builder.dify_config.AGENT_DRIVE_MANIFEST_ENABLED", True
)
@ -310,10 +365,9 @@ class TestAgentAppDriveLayer:
layers = {layer.name: layer for layer in result.request.composition.layers}
assert layers["drive"].config.drive_ref == "agent-agent-1"
assert layers["drive"].config.skills == []
assert layers[DIFY_SHELL_LAYER_ID].deps == {
"execution_context": "execution_context",
"drive": "drive",
}
assert layers[DIFY_SHELL_LAYER_ID].deps == {"execution_context": "execution_context"}
assert layers[DIFY_SHELL_LAYER_ID].config.agent_stub_drive_ref == "agent-agent-1"
assert layers["drive"].deps == {"shell": DIFY_SHELL_LAYER_ID}
def test_no_drive_layer_when_flag_disabled(self, monkeypatch: pytest.MonkeyPatch):
monkeypatch.setattr(
@ -333,6 +387,7 @@ class TestAgentAppDriveLayer:
monkeypatch.setattr(
"core.app.apps.agent_app.runtime_request_builder.dify_config.AGENT_DRIVE_MANIFEST_ENABLED", True
)
_mock_drive_catalog(monkeypatch)
soul = _soul_with_model()
soul.prompt.system_prompt = (
"Use [§skill:tender-analyzer%2FSKILL.md:Tender Analyzer§] and [§file:files%2Fsample.pdf:sample.pdf§]."
@ -355,6 +410,7 @@ class TestAgentAppDriveLayer:
monkeypatch.setattr(
"core.app.apps.agent_app.runtime_request_builder.dify_config.AGENT_DRIVE_MANIFEST_ENABLED", True
)
_mock_drive_catalog(monkeypatch)
soul = _soul_with_model()
soul.prompt.system_prompt = (
"Use [§skill:ghost%2FSKILL.md:Ghost Skill§], [§file:files%2Fghost.txt:Ghost File§], "
@ -375,6 +431,7 @@ class TestAgentAppDriveLayer:
monkeypatch.setattr(
"core.app.apps.agent_app.runtime_request_builder.dify_config.AGENT_DRIVE_MANIFEST_ENABLED", True
)
_mock_drive_catalog(monkeypatch)
soul = _soul_with_model()
soul.prompt.system_prompt = (
"Use [§skill:tender-analyzer%2FSKILL.md:Tender Analyzer§] and [§file:files%2Fsample.pdf:sample.pdf§]"

View File

@ -1032,10 +1032,9 @@ def test_workflow_run_request_contains_drive_layer_with_empty_catalog(monkeypatc
"mentioned_skill_keys": [],
"mentioned_file_keys": [],
}
assert layers[DIFY_SHELL_LAYER_ID]["deps"] == {
"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID,
"drive": "drive",
}
assert layers[DIFY_SHELL_LAYER_ID]["deps"] == {"execution_context": DIFY_EXECUTION_CONTEXT_LAYER_ID}
assert layers[DIFY_SHELL_LAYER_ID]["config"]["agent_stub_drive_ref"] == "agent-agent-1"
assert layers["drive"]["deps"] == {"shell": DIFY_SHELL_LAYER_ID}
def test_build_drive_layer_config_requires_agent_identity():
@ -1061,11 +1060,12 @@ def test_workflow_run_request_contains_drive_layer_when_flag_enabled(monkeypatch
dumped = result.request.model_dump(mode="json")
layer_names = [layer["name"] for layer in dumped["composition"]["layers"]]
assert "drive" in layer_names
# injected right after execution_context, before history/llm
assert layer_names.index("drive") == layer_names.index("execution_context") + 1
# shell enters first; drive uses that shell to materialize mentioned targets.
assert layer_names.index(DIFY_SHELL_LAYER_ID) == layer_names.index("execution_context") + 1
assert layer_names.index("drive") == layer_names.index(DIFY_SHELL_LAYER_ID) + 1
drive = next(layer for layer in dumped["composition"]["layers"] if layer["name"] == "drive")
assert drive["type"] == "dify.drive"
assert drive["deps"] == {"execution_context": "execution_context"}
assert drive["deps"] == {"shell": DIFY_SHELL_LAYER_ID}
assert drive["config"]["drive_ref"] == "agent-agent-1"
assert drive["config"]["skills"] == [
{

View File

@ -6,7 +6,9 @@ only: skills are declared as metadata, not content, and plain files are listed
only when the prompt explicitly mentions their drive keys.
The API backend catalogs and writes this config; the Agent backend consumes it
(ENG-387: pull via back proxy, lazy-load SKILL.md, materialize files).
by running sandbox-visible ``dify-agent drive pull`` commands through the shell
layer so materialized files live in the same filesystem that model shell jobs
use.
"""
from typing import Final

View File

@ -1,37 +1,29 @@
"""Runtime Dify drive layer with eager pull for prompt-mentioned targets.
"""Runtime Dify drive layer with shell-backed eager pulls.
The API backend sends the full drive skill catalog plus the ordered drive keys
mentioned in the prompt. When the layer enters a run context it eagerly pulls
those mentioned skills/files from the Dify inner drive bridge, materializes them
under the fixed Agent Stub drive base for ``drive_ref``, and contributes a
those mentioned skills/files through the already-active shell layer by running
the sandbox-visible ``dify-agent drive pull`` command, then contributes a
concise prompt block describing what was loaded. It also contributes a suffix
prompt with the remaining skill catalog plus ``dify-agent drive`` and
``dify-agent file`` usage so the model has concrete Agent Stub commands for
materializing drive content and workflow files when a shell layer is available.
materializing drive content and workflow files.
"""
from __future__ import annotations
import asyncio
import shlex
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, ClassVar, cast
from typing import ClassVar
import httpx
from typing_extensions import Self, override
from agenton.layers import EmptyRuntimeState, Layer, LayerDeps, PlainLayer
from dify_agent.agent_stub._drive_materialization import (
DriveDownloadPayload,
DriveMaterializationTransferError,
DriveMaterializationValidationError,
materialize_drive_downloads,
)
from agenton.layers import EmptyRuntimeState, LayerDeps, PlainLayer
from dify_agent.agent_stub.protocol import agent_stub_drive_base_for_ref
from dify_agent.agent_stub.protocol.agent_stub import AgentStubDriveItem, AgentStubDriveManifestResponse
from dify_agent.layers.drive.configs import DIFY_DRIVE_LAYER_TYPE_ID, DifyDriveLayerConfig
from dify_agent.layers.shell.layer import DifyShellLayer
_DOWNLOAD_CONCURRENCY = 4
_AGENT_STUB_CLI_USAGE_PROMPT = """Agent Stub CLI usage is available inside shell jobs:
Drive assets are Agent Soul versioned assets:
@ -59,40 +51,23 @@ class DifyDriveLayerError(RuntimeError):
class DifyDriveDeps(LayerDeps):
execution_context: Layer[Any, Any, Any, Any, Any, Any] # pyright: ignore[reportUninitializedInstanceVariable]
shell: DifyShellLayer # pyright: ignore[reportUninitializedInstanceVariable]
@dataclass(slots=True)
class DifyDriveLayer(PlainLayer[DifyDriveDeps, DifyDriveLayerConfig, EmptyRuntimeState]):
"""Drive runtime layer that eagerly materializes prompt-mentioned drive targets."""
"""Drive runtime layer that materializes prompt-mentioned targets via shell."""
type_id: ClassVar[str | None] = DIFY_DRIVE_LAYER_TYPE_ID
config: DifyDriveLayerConfig
inner_api_url: str
inner_api_key: str
_loaded_skill_bodies: dict[str, str] = field(default_factory=dict)
_pulled_file_paths: dict[str, str] = field(default_factory=dict)
@classmethod
@override
def from_config(cls, config: DifyDriveLayerConfig) -> Self:
del config
raise TypeError("DifyDriveLayer requires server-side Dify API settings and must use a provider factory.")
@classmethod
def from_config_with_settings(
cls,
config: DifyDriveLayerConfig,
*,
inner_api_url: str,
inner_api_key: str,
) -> Self:
return cls(
config=DifyDriveLayerConfig.model_validate(config),
inner_api_url=inner_api_url.rstrip("/"),
inner_api_key=inner_api_key,
)
return cls(config=DifyDriveLayerConfig.model_validate(config))
@property
@override
@ -127,9 +102,7 @@ class DifyDriveLayer(PlainLayer[DifyDriveDeps, DifyDriveLayerConfig, EmptyRuntim
if pulled_skill_path is None:
continue
local_path = Path(pulled_skill_path).parent
loaded_skill_sections.append(
f"Path: {skill.path}\nLocal path: {local_path}\nSKILL.md:\n{body}"
)
loaded_skill_sections.append(f"Path: {skill.path}\nLocal path: {local_path}\nSKILL.md:\n{body}")
if loaded_skill_sections:
sections.append("Loaded mentioned skills:\n\n" + "\n\n".join(loaded_skill_sections))
@ -154,12 +127,16 @@ class DifyDriveLayer(PlainLayer[DifyDriveDeps, DifyDriveLayerConfig, EmptyRuntim
if skill.skill_md_key not in mentioned_skill_keys
]
if other_skills:
pull_and_read_command = (
'`skill_dir="$(dify-agent drive pull <SKILL_PATH> --to /tmp/drive)"; '
+ 'printf "%s\\n" "$skill_dir"; cat "$skill_dir/SKILL.md"`'
)
sections.append(
"Other available skills:\n"
+ "\n".join(other_skills)
+ "\n\nTo use one, pull it and read its SKILL.md in one command: "
'`skill_dir="$(dify-agent drive pull <SKILL_PATH> --to /tmp/drive)"; '
'printf "%s\\n" "$skill_dir"; cat "$skill_dir/SKILL.md"`.'
+ pull_and_read_command
+ "."
)
sections.append(_AGENT_STUB_CLI_USAGE_PROMPT)
return "\n\n".join(sections)
@ -167,113 +144,106 @@ class DifyDriveLayer(PlainLayer[DifyDriveDeps, DifyDriveLayerConfig, EmptyRuntim
async def _pull_mentioned_targets(self) -> None:
self._loaded_skill_bodies = {}
self._pulled_file_paths = {}
targets: list[tuple[str, bool]] = [
(self._skill_prefix(skill_key), False) for skill_key in self.config.mentioned_skill_keys
] + [(file_key, True) for file_key in self.config.mentioned_file_keys]
targets = self._mentioned_pull_targets()
if not targets:
return
tenant_id = self._require_tenant_id()
manifest_items = await self._fetch_manifest_items(tenant_id=tenant_id, targets=targets)
written_paths = await self._download_items(manifest_items)
script = self._build_shell_pull_script(targets=targets)
result = await self.deps.shell.run_remote_script(script, inject_agent_stub_env=True)
if result.exit_code != 0:
raise DifyDriveLayerError(
f"drive mentioned pull failed in shell: {result.status} exit_code={result.exit_code}\n{result.output}"
)
if result.truncated:
raise DifyDriveLayerError("drive mentioned pull output was truncated before SKILL.md content was loaded")
written_paths, skill_bodies = self._parse_shell_pull_output(result.output)
self._record_pulled_paths(written_paths)
for skill_key in self.config.mentioned_skill_keys:
body = skill_bodies.get(skill_key)
if body is None:
raise DifyDriveLayerError(f"missing pulled SKILL.md content for mentioned skill {skill_key}")
self._loaded_skill_bodies[skill_key] = body
def _build_shell_pull_script(self, *, targets: list[tuple[str, bool]]) -> str:
    """Build the shell script that pulls mentioned drive targets and reports them.

    The script runs under ``set -eu``, pulls every distinct target prefix with
    one ``dify-agent drive pull`` call into the drive base for
    ``config.drive_ref``, and then, per mentioned key, checks the expected
    local path exists and prints a tab-separated
    ``__DIFY_DRIVE_MENTIONED_PATH__`` marker. For mentioned skills it also
    prints the SKILL.md content between ``__DIFY_DRIVE_SKILL_BEGIN__`` and
    ``__DIFY_DRIVE_SKILL_END__`` markers. All interpolated values are quoted
    with ``shlex.quote``.

    Args:
        targets: ``(prefix, exact)`` pairs; only the prefix is used here.

    Returns:
        The script text, one command per line.
    """
    # dict.fromkeys drops duplicate prefixes while keeping first-seen order.
    unique_prefixes = list(dict.fromkeys(prefix for prefix, _exact in targets))
    drive_base = agent_stub_drive_base_for_ref(self.config.drive_ref)
    quoted_base = shlex.quote(drive_base)
    quoted_prefixes = " ".join(shlex.quote(prefix) for prefix in unique_prefixes)

    script: list[str] = ["set -eu"]
    script.append(f"base={quoted_base}")
    script.append("dify-agent drive pull " + quoted_prefixes + ' --to "$base"')

    for skill_key in self.config.mentioned_skill_keys:
        skill_path = self._shell_local_path(skill_key)
        quoted_key = shlex.quote(skill_key)
        quoted_path = shlex.quote(skill_path)
        script += [
            f"test -f {quoted_path}",
            # Leading "\n" keeps the marker on its own line after any prior output.
            f"printf '\\n__DIFY_DRIVE_MENTIONED_PATH__\\t%s\\t%s\\n' {quoted_key} {quoted_path}",
            f"printf '__DIFY_DRIVE_SKILL_BEGIN__\\t%s\\n' {quoted_key}",
            f"cat {quoted_path}",
            # Leading "\n" separates the end marker from a body lacking a final newline.
            f"printf '\\n__DIFY_DRIVE_SKILL_END__\\t%s\\n' {quoted_key}",
        ]

    for file_key in self.config.mentioned_file_keys:
        file_path = self._shell_local_path(file_key)
        quoted_key = shlex.quote(file_key)
        quoted_path = shlex.quote(file_path)
        script += [
            f"test -e {quoted_path}",
            f"printf '\\n__DIFY_DRIVE_MENTIONED_PATH__\\t%s\\t%s\\n' {quoted_key} {quoted_path}",
        ]

    return "\n".join(script)
def _parse_shell_pull_output(self, output: str) -> tuple[dict[str, str], dict[str, str]]:
written_paths: dict[str, str] = {}
skill_bodies: dict[str, str] = {}
current_skill_key: str | None = None
current_skill_body: list[str] = []
for line in output.splitlines(keepends=True):
stripped_line = line.rstrip("\n")
if current_skill_key is not None:
if stripped_line == f"__DIFY_DRIVE_SKILL_END__\t{current_skill_key}":
skill_bodies[current_skill_key] = "".join(current_skill_body)
current_skill_key = None
current_skill_body = []
continue
current_skill_body.append(line)
continue
if stripped_line.startswith("__DIFY_DRIVE_MENTIONED_PATH__\t"):
parts = stripped_line.split("\t", 2)
if len(parts) != 3:
raise DifyDriveLayerError("drive mentioned pull emitted an invalid path marker")
_marker, key, path = parts
written_paths[key] = path
continue
if stripped_line.startswith("__DIFY_DRIVE_SKILL_BEGIN__\t"):
current_skill_key = stripped_line.split("\t", 1)[1]
current_skill_body = []
if current_skill_key is not None:
raise DifyDriveLayerError(f"drive mentioned pull omitted SKILL.md end marker for {current_skill_key}")
return written_paths, skill_bodies
def _record_pulled_paths(self, written_paths: dict[str, str]) -> None:
self._pulled_file_paths = written_paths
for file_key in self.config.mentioned_file_keys:
if file_key not in written_paths:
raise DifyDriveLayerError(f"missing pulled file for mentioned drive key {file_key}")
for skill_key in self.config.mentioned_skill_keys:
skill_path = written_paths.get(skill_key)
if skill_path is None:
if skill_key not in written_paths:
raise DifyDriveLayerError(f"missing pulled SKILL.md for mentioned skill {skill_key}")
try:
self._loaded_skill_bodies[skill_key] = Path(skill_path).read_text(encoding="utf-8")
except (OSError, UnicodeError) as exc:
raise DifyDriveLayerError(f"failed to load pulled SKILL.md for mentioned skill {skill_key}") from exc
async def _fetch_manifest_items(
self,
*,
tenant_id: str,
targets: list[tuple[str, bool]],
) -> list[AgentStubDriveItem]:
semaphore = asyncio.Semaphore(_DOWNLOAD_CONCURRENCY)
def _mentioned_pull_targets(self) -> list[tuple[str, bool]]:
return [(self._skill_prefix(skill_key), False) for skill_key in self.config.mentioned_skill_keys] + [
(file_key, True) for file_key in self.config.mentioned_file_keys
]
async with httpx.AsyncClient(timeout=30.0, follow_redirects=True, trust_env=False) as client:
async def fetch_one(target: tuple[str, bool]) -> list[AgentStubDriveItem]:
prefix, exact = target
try:
async with semaphore:
response = await client.get(
f"{self.inner_api_url}/inner/api/drive/{self.config.drive_ref}/manifest",
params={
"tenant_id": tenant_id,
"prefix": prefix,
"include_download_url": "true",
},
headers={"X-Inner-Api-Key": self.inner_api_key},
)
except (httpx.InvalidURL, httpx.TimeoutException, httpx.RequestError) as exc:
raise DifyDriveLayerError(f"drive manifest request failed for {prefix}") from exc
if response.is_error:
raise DifyDriveLayerError(f"drive manifest request failed for {prefix}: {response.status_code}")
try:
payload = AgentStubDriveManifestResponse.model_validate(response.json())
except (ValueError, TypeError) as exc:
raise DifyDriveLayerError(f"drive manifest response is invalid for {prefix}") from exc
manifest_items: list[AgentStubDriveItem] = []
for item in payload.items:
if not item.download_url:
raise DifyDriveLayerError(f"drive manifest item is missing download_url for {prefix}")
if exact and item.key != prefix:
continue
manifest_items.append(item)
return manifest_items
grouped_items = await asyncio.gather(*(fetch_one(target) for target in targets))
deduplicated: dict[str, AgentStubDriveItem] = {}
for items in grouped_items:
for item in items:
deduplicated.setdefault(item.key, item)
return [deduplicated[key] for key in sorted(deduplicated)]
async def _download_items(self, items: list[AgentStubDriveItem]) -> dict[str, str]:
base_path = Path(agent_stub_drive_base_for_ref(self.config.drive_ref))
semaphore = asyncio.Semaphore(_DOWNLOAD_CONCURRENCY)
async with httpx.AsyncClient(timeout=30.0, follow_redirects=True, trust_env=False) as client:
async def download_one(item: AgentStubDriveItem) -> DriveDownloadPayload:
download_url = item.download_url
if not download_url:
raise DifyDriveLayerError(f"drive manifest item is missing download_url for {item.key}")
try:
async with semaphore:
response = await client.get(download_url)
except (httpx.InvalidURL, httpx.TimeoutException, httpx.RequestError) as exc:
raise DifyDriveLayerError(f"drive download failed for {item.key}") from exc
if response.is_error:
raise DifyDriveLayerError(f"drive download failed for {item.key}: {response.status_code}")
return DriveDownloadPayload(key=item.key, payload=response.content, size=item.size)
downloads = await asyncio.gather(*(download_one(item) for item in items))
try:
written_paths = materialize_drive_downloads(
base_path=base_path,
downloads=downloads,
)
except (DriveMaterializationValidationError, DriveMaterializationTransferError) as exc:
raise DifyDriveLayerError(str(exc)) from exc
return {download.key: str(path) for download, path in zip(downloads, written_paths, strict=True)}
def _require_tenant_id(self) -> str:
execution_context = self.deps.execution_context.config
tenant_id = getattr(execution_context, "tenant_id", None)
if not isinstance(tenant_id, str) or not tenant_id.strip():
raise DifyDriveLayerError("DifyDriveLayer requires execution_context.tenant_id")
return cast(str, tenant_id).strip()
def _shell_local_path(self, drive_key: str) -> str:
    """Return the path of ``drive_key`` under the drive base for ``config.drive_ref``.

    Trailing slashes on the base and leading slashes on the key are dropped so
    the two parts are joined by exactly one ``/``.
    """
    drive_base = agent_stub_drive_base_for_ref(self.config.drive_ref).rstrip("/")
    relative_key = drive_key.lstrip("/")
    return f"{drive_base}/{relative_key}"
@staticmethod
def _skill_prefix(skill_key: str) -> str:

View File

@ -3,7 +3,8 @@
Server-only shellctl connection settings are injected by the runtime provider
factory. Public config carries product-level Agent Soul settings that must affect
the sandbox workspace itself: CLI tool bootstrap commands, normal environment
variables, secret environment variable names, and sandbox-provider metadata.
variables, secret environment variable names, sandbox-provider metadata, and the
Agent Stub drive ref used by shell-visible drive commands.
"""
import re
@ -80,6 +81,8 @@ class DifyShellLayerConfig(LayerConfig):
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
# Optional because shell can be used without a drive layer.
agent_stub_drive_ref: str | None = Field(default=None, max_length=1024)
cli_tools: list[DifyShellCliToolConfig] = Field(default_factory=list)
env: list[DifyShellEnvVarConfig] = Field(default_factory=list)
secret_refs: list[DifyShellSecretRefConfig] = Field(default_factory=list)

View File

@ -51,7 +51,6 @@ from typing_extensions import Self, override
from agenton.layers import LayerDeps, PydanticAILayer, PydanticAIPrompt, PydanticAITool
from dify_agent.agent_stub.server.shell_agent_stub_env import ShellAgentStubTokenFactory, build_shell_agent_stub_env
from dify_agent.layers.drive.layer import DifyDriveLayer
from dify_agent.layers.execution_context.layer import DifyExecutionContextLayer
from dify_agent.layers.shell.configs import DIFY_SHELL_LAYER_TYPE_ID, DifyShellLayerConfig
@ -173,11 +172,11 @@ type ShellInterruptToolResult = ShellJobStatusObservation | ShellToolErrorObserv
class DifyShellLayerDeps(LayerDeps):
"""Optional direct-layer dependencies used by the shell runtime layer.
The drive dependency supplies the drive ref for injected
Agent Stub CLI commands; the execution context supplies the token principal.
The execution context supplies the token principal. The drive ref used for
Agent Stub CLI commands is passed through config so the drive layer can
depend on shell for eager materialization without a dependency cycle.
"""
drive: DifyDriveLayer | None # pyright: ignore[reportUninitializedInstanceVariable]
execution_context: DifyExecutionContextLayer | None # pyright: ignore[reportUninitializedInstanceVariable]
@ -768,10 +767,9 @@ class DifyShellLayer(PydanticAILayer[DifyShellLayerDeps, object, DifyShellLayerC
"""Build per-command Agent Stub env only for user-visible ``shell.run``."""
execution_context_layer = self.deps.execution_context
execution_context = execution_context_layer.config if execution_context_layer is not None else None
drive_layer = self.deps.drive
return build_shell_agent_stub_env(
agent_stub_api_base_url=self.agent_stub_api_base_url,
agent_stub_drive_ref=drive_layer.config.drive_ref if drive_layer is not None else None,
agent_stub_drive_ref=self.config.agent_stub_drive_ref,
execution_context=execution_context,
token_factory=self.agent_stub_token_factory,
session_id=self.runtime_state.session_id,

View File

@ -38,7 +38,6 @@ from dify_agent.agent_stub.server.tokens.agent_stub import AgentStubTokenCodec
from dify_agent.layers.ask_human.layer import DifyAskHumanLayer
from dify_agent.layers.dify_plugin.llm_layer import DifyPluginLLMLayer
from dify_agent.layers.dify_plugin.tools_layer import DifyPluginToolsLayer
from dify_agent.layers.drive import DifyDriveLayerConfig
from dify_agent.layers.drive.layer import DifyDriveLayer
from dify_agent.layers.execution_context.configs import DifyExecutionContextLayerConfig
from dify_agent.layers.execution_context.layer import DifyExecutionContextLayer
@ -90,14 +89,7 @@ def create_default_layer_providers(
LayerProvider.from_layer_type(PydanticAIHistoryLayer),
LayerProvider.from_layer_type(DifyOutputLayer),
LayerProvider.from_layer_type(DifyAskHumanLayer),
LayerProvider.from_factory(
layer_type=DifyDriveLayer,
create=lambda config: DifyDriveLayer.from_config_with_settings(
DifyDriveLayerConfig.model_validate(config),
inner_api_url=inner_api_url,
inner_api_key=inner_api_key,
),
),
LayerProvider.from_layer_type(DifyDriveLayer),
LayerProvider.from_factory(
layer_type=DifyExecutionContextLayer,
create=lambda config: DifyExecutionContextLayer.from_config_with_settings(

View File

@ -48,10 +48,8 @@ def test_layer_config_rejects_unknown_fields() -> None:
def test_drive_layer_is_registered_and_constructible_from_config() -> None:
layer = DifyDriveLayer.from_config_with_settings(
layer = DifyDriveLayer.from_config(
DifyDriveLayerConfig(drive_ref="agent-1", skills=[], mentioned_skill_keys=[], mentioned_file_keys=[]),
inner_api_url="https://api.example.com",
inner_api_key="secret",
)
assert isinstance(layer, DifyDriveLayer)

View File

@ -2,38 +2,30 @@
from __future__ import annotations
from io import BytesIO
from pathlib import Path
from typing import ClassVar
from zipfile import ZipFile
from typing import cast
import pytest
from agenton.layers import EmptyRuntimeState, LayerConfig, NoLayerDeps, PlainLayer
from dify_agent.agent_stub._drive_materialization import DriveDownloadPayload
from dify_agent.agent_stub.protocol.agent_stub import AgentStubDriveItem
from dify_agent.layers.drive import DifyDriveLayerConfig, DifyDriveSkillConfig
from dify_agent.layers.drive.layer import DifyDriveLayer, DifyDriveLayerError
from dify_agent.layers.shell import DifyShellLayerConfig
from dify_agent.layers.shell.layer import DifyShellLayer, RemoteCommandResult, ShellctlClientFactory
class _FakeExecutionContextConfig(LayerConfig):
    """Minimal layer config for the fake execution-context layer; it carries only a tenant id."""

    # Tenant identifier exposed through the fake layer's ``config`` attribute.
    tenant_id: str
def _unused_client_factory(_entrypoint: str):
raise AssertionError("shellctl client should not be used by these drive-layer tests")
class _FakeExecutionContextLayer(PlainLayer[NoLayerDeps, _FakeExecutionContextConfig, EmptyRuntimeState]):
    """Fake execution-context layer that exposes a fixed ``tenant-1`` config."""

    # No registered type id for this test-only layer.
    type_id: ClassVar[str | None] = None
    config: _FakeExecutionContextConfig

    def __new__(cls) -> _FakeExecutionContextLayer:
        """Allocate the instance through the base class without extra arguments."""
        # NOTE(review): presumably overridden so the layer can be built with no
        # constructor arguments; confirm against PlainLayer.
        return super().__new__(cls)

    def __init__(self) -> None:
        """Attach the canned config; the base initializer is not called."""
        self.config = _FakeExecutionContextConfig(tenant_id="tenant-1")
def _shell_layer() -> DifyShellLayer:
    """Build the shell layer the drive layer under test depends on.

    The config carries ``agent_stub_drive_ref="agent-1"`` and the client
    factory is ``_unused_client_factory``, which raises if it is ever called.

    Returns:
        A ``DifyShellLayer`` created via ``from_config_with_settings``.
    """
    return DifyShellLayer.from_config_with_settings(
        DifyShellLayerConfig(agent_stub_drive_ref="agent-1"),
        shellctl_entrypoint="http://shellctl",
        shellctl_client_factory=cast(ShellctlClientFactory, _unused_client_factory),
    )
def _build_layer(tmp_path: Path) -> DifyDriveLayer:
layer = DifyDriveLayer.from_config_with_settings(
def _build_layer() -> DifyDriveLayer:
layer = DifyDriveLayer.from_config(
DifyDriveLayerConfig(
drive_ref="agent-1",
skills=[
@ -54,48 +46,45 @@ def _build_layer(tmp_path: Path) -> DifyDriveLayer:
],
mentioned_skill_keys=["tender-analyzer/SKILL.md"],
mentioned_file_keys=["files/report.pdf"],
),
inner_api_url="https://api.example.com",
inner_api_key="secret",
)
)
layer.bind_deps({"execution_context": _FakeExecutionContextLayer()})
layer.bind_deps({"shell": _shell_layer()})
return layer
class _FakeAsyncResponse:
def __init__(self, *, status_code: int = 200, json_data: object | None = None, content: bytes = b"") -> None:
self.status_code = status_code
self._json_data = json_data
self.content = content
@property
def is_error(self) -> bool:
return self.status_code >= 400
def json(self) -> object:
if isinstance(self._json_data, Exception):
raise self._json_data
return self._json_data
def _remote_result(
    output: str,
    *,
    exit_code: int | None = 0,
    truncated: bool = False,
) -> RemoteCommandResult:
    """Build a finished ``RemoteCommandResult`` carrying the given output.

    Args:
        output: Text placed in the result's ``output``; ``offset`` is set to
            its length.
        exit_code: Exit code reported by the fake command.
        truncated: Value of the result's ``truncated`` flag.

    Returns:
        A result with ``status="exited"`` and ``done=True``; the job id and
        output path are fixed placeholder values.
    """
    return RemoteCommandResult(
        job_id="remote-drive-pull",
        status="exited",
        done=True,
        exit_code=exit_code,
        output=output,
        offset=len(output),
        truncated=truncated,
        output_path="/tmp/output.log",
    )
class _FakeAsyncClient:
def __init__(self, responses: dict[str, _FakeAsyncResponse]) -> None:
self._responses = responses
async def __aenter__(self) -> _FakeAsyncClient:
return self
async def __aexit__(self, exc_type, exc, tb) -> None:
del exc_type, exc, tb
async def get(self, url: str, **kwargs) -> _FakeAsyncResponse:
prefix = kwargs.get("params", {}).get("prefix")
key = f"manifest:{prefix}" if prefix is not None else f"download:{url}"
return self._responses[key]
def _pulled_output() -> str:
return (
"/mnt/drive/agent-1/tender-analyzer\n"
"/mnt/drive/agent-1/files/report.pdf\n"
"__DIFY_DRIVE_MENTIONED_PATH__\ttender-analyzer/SKILL.md\t/mnt/drive/agent-1/tender-analyzer/SKILL.md\n"
"__DIFY_DRIVE_SKILL_BEGIN__\ttender-analyzer/SKILL.md\n"
"# Tender Analyzer\n"
"Use carefully.\n"
"__DIFY_DRIVE_SKILL_END__\ttender-analyzer/SKILL.md\n"
"__DIFY_DRIVE_MENTIONED_PATH__\tfiles/report.pdf\t/mnt/drive/agent-1/files/report.pdf\n"
)
def test_drive_layer_exposes_agent_stub_cli_usage_suffix_prompt(tmp_path: Path) -> None:
layer = _build_layer(tmp_path)
def test_drive_layer_exposes_agent_stub_cli_usage_suffix_prompt() -> None:
layer = _build_layer()
assert len(layer.suffix_prompts) == 1
prompt = layer.suffix_prompts[0]
@ -121,317 +110,146 @@ def test_drive_layer_exposes_agent_stub_cli_usage_suffix_prompt(tmp_path: Path)
@pytest.mark.anyio
async def test_on_context_create_loads_mentioned_targets_into_prompt(
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
async def test_on_context_create_pulls_mentioned_targets_through_shell(
monkeypatch: pytest.MonkeyPatch,
) -> None:
layer = _build_layer(tmp_path)
layer = _build_layer()
captured: dict[str, object] = {}
async def _fetch_manifest_items(*, tenant_id: str, targets: list[tuple[str, bool]]) -> list[AgentStubDriveItem]:
assert tenant_id == "tenant-1"
assert targets == [("tender-analyzer/", False), ("files/report.pdf", True)]
return [
AgentStubDriveItem(key="tender-analyzer/SKILL.md", download_url="https://files/skill-md"),
AgentStubDriveItem(key="files/report.pdf", download_url="https://files/report"),
]
async def fake_run_remote_script(
self: DifyShellLayer,
script: str,
*,
timeout: float = 10.0,
inject_agent_stub_env: bool = False,
) -> RemoteCommandResult:
del self, timeout
captured["script"] = script
captured["inject_agent_stub_env"] = inject_agent_stub_env
return _remote_result(_pulled_output())
async def _download_items(items: list[AgentStubDriveItem]) -> dict[str, str]:
assert {item.key for item in items} == {"files/report.pdf", "tender-analyzer/SKILL.md"}
skill_path = tmp_path / "tender-analyzer" / "SKILL.md"
skill_path.parent.mkdir(parents=True, exist_ok=True)
skill_path.write_text("# Tender Analyzer\nUse carefully.\n", encoding="utf-8")
file_path = tmp_path / "files" / "report.pdf"
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.write_bytes(b"pdf")
return {
"tender-analyzer/SKILL.md": str(skill_path),
"files/report.pdf": str(file_path),
}
monkeypatch.setattr(layer, "_fetch_manifest_items", _fetch_manifest_items)
monkeypatch.setattr(layer, "_download_items", _download_items)
monkeypatch.setattr(DifyShellLayer, "run_remote_script", fake_run_remote_script)
await layer.on_context_create()
script = captured["script"]
assert isinstance(script, str)
assert captured["inject_agent_stub_env"] is True
assert "base=/mnt/drive/agent-1" in script
assert 'dify-agent drive pull tender-analyzer/ files/report.pdf --to "$base"' in script
assert "cat /mnt/drive/agent-1/tender-analyzer/SKILL.md" in script
prompt = layer.build_prompt_context()
assert "Loaded mentioned skills" in prompt
assert f"Local path: {tmp_path / 'tender-analyzer'}" in prompt
assert "Path: tender-analyzer" in prompt
assert "Local path: /mnt/drive/agent-1/tender-analyzer" in prompt
assert "Name: Tender Analyzer" not in prompt
assert "# Tender Analyzer\nUse carefully." in prompt
assert f"files/report.pdf -> {tmp_path / 'files' / 'report.pdf'}" in prompt
assert "files/report.pdf -> /mnt/drive/agent-1/files/report.pdf" in prompt
assert "Other available skills" not in prompt
assert "other-skill: Other Skill — Fallback catalog entry." not in prompt
@pytest.mark.anyio
async def test_on_context_resume_loads_mentioned_targets_into_prompt(
monkeypatch: pytest.MonkeyPatch, tmp_path: Path
async def test_on_context_resume_repulls_mentioned_targets_through_shell(
monkeypatch: pytest.MonkeyPatch,
) -> None:
layer = _build_layer(tmp_path)
layer = _build_layer()
calls = 0
async def _fetch_manifest_items(*, tenant_id: str, targets: list[tuple[str, bool]]) -> list[AgentStubDriveItem]:
assert tenant_id == "tenant-1"
assert targets == [("tender-analyzer/", False), ("files/report.pdf", True)]
return [
AgentStubDriveItem(key="tender-analyzer/SKILL.md", download_url="https://files/skill-md"),
AgentStubDriveItem(key="files/report.pdf", download_url="https://files/report"),
]
async def fake_run_remote_script(
self: DifyShellLayer,
script: str,
*,
timeout: float = 10.0,
inject_agent_stub_env: bool = False,
) -> RemoteCommandResult:
del self, script, timeout
nonlocal calls
calls += 1
assert inject_agent_stub_env is True
return _remote_result(_pulled_output())
async def _download_items(items: list[AgentStubDriveItem]) -> dict[str, str]:
assert {item.key for item in items} == {"files/report.pdf", "tender-analyzer/SKILL.md"}
skill_path = tmp_path / "tender-analyzer" / "SKILL.md"
skill_path.parent.mkdir(parents=True, exist_ok=True)
skill_path.write_text("# Tender Analyzer\nUse carefully.\n", encoding="utf-8")
file_path = tmp_path / "files" / "report.pdf"
file_path.parent.mkdir(parents=True, exist_ok=True)
file_path.write_bytes(b"pdf")
return {
"tender-analyzer/SKILL.md": str(skill_path),
"files/report.pdf": str(file_path),
}
monkeypatch.setattr(layer, "_fetch_manifest_items", _fetch_manifest_items)
monkeypatch.setattr(layer, "_download_items", _download_items)
monkeypatch.setattr(DifyShellLayer, "run_remote_script", fake_run_remote_script)
await layer.on_context_resume()
prompt = layer.build_prompt_context()
assert "Loaded mentioned skills" in prompt
assert f"Local path: {tmp_path / 'tender-analyzer'}" in prompt
assert "Name: Tender Analyzer" not in prompt
assert "# Tender Analyzer\nUse carefully." in prompt
assert f"files/report.pdf -> {tmp_path / 'files' / 'report.pdf'}" in prompt
assert "Other available skills" not in prompt
assert "other-skill: Other Skill — Fallback catalog entry." not in prompt
assert calls == 1
assert "Loaded mentioned skills" in layer.build_prompt_context()
@pytest.mark.anyio
async def test_on_context_create_raises_when_mentioned_file_is_missing(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
layer = _build_layer(tmp_path)
layer = _build_layer()
async def _fetch_manifest_items(*, tenant_id: str, targets: list[tuple[str, bool]]) -> list[AgentStubDriveItem]:
del tenant_id, targets
return [AgentStubDriveItem(key="tender-analyzer/SKILL.md", download_url="https://files/skill-md")]
async def _download_items(items: list[AgentStubDriveItem]) -> dict[str, str]:
del items
skill_path = tmp_path / "tender-analyzer" / "SKILL.md"
skill_path.parent.mkdir(parents=True, exist_ok=True)
skill_path.write_text("# Tender Analyzer\nUse carefully.\n", encoding="utf-8")
return {"tender-analyzer/SKILL.md": str(skill_path)}
monkeypatch.setattr(layer, "_fetch_manifest_items", _fetch_manifest_items)
monkeypatch.setattr(layer, "_download_items", _download_items)
with pytest.raises(DifyDriveLayerError, match="missing pulled file"):
await layer.on_context_create()
@pytest.mark.anyio
async def test_fetch_manifest_items_validates_payload_filters_exact_targets_and_deduplicates(
    monkeypatch: pytest.MonkeyPatch,
    tmp_path: Path,
) -> None:
    """Fetch manifests for a prefix target and an exact-key target.

    The exact-key manifest also lists ``files/other.pdf``, which must not be
    returned, and ``files/report.pdf`` is listed by both manifests but must
    appear only once in the result.
    """
    layer = _build_layer(tmp_path)
    responses = {
        "manifest:tender-analyzer/": _FakeAsyncResponse(
            json_data={
                "items": [
                    {"key": "tender-analyzer/SKILL.md", "download_url": "https://files/skill-md", "size": 7},
                    {"key": "files/report.pdf", "download_url": "https://files/report", "size": 3},
                ]
            }
        ),
        "manifest:files/report.pdf": _FakeAsyncResponse(
            json_data={
                "items": [
                    {"key": "files/report.pdf", "download_url": "https://files/report", "size": 3},
                    {"key": "files/other.pdf", "download_url": "https://files/other", "size": 4},
                ]
            }
        ),
    }
    # Replace the HTTP client used by the drive layer with the canned fake.
    monkeypatch.setattr(
        "dify_agent.layers.drive.layer.httpx.AsyncClient",
        lambda **_kwargs: _FakeAsyncClient(responses),
    )

    result = await layer._fetch_manifest_items(
        tenant_id="tenant-1",
        targets=[("tender-analyzer/", False), ("files/report.pdf", True)],
    )

    assert {(item.key, item.download_url, item.size) for item in result} == {
        ("files/report.pdf", "https://files/report", 3),
        ("tender-analyzer/SKILL.md", "https://files/skill-md", 7),
    }
    assert [item.key for item in result].count("files/report.pdf") == 1
@pytest.mark.anyio
async def test_fetch_manifest_items_rejects_invalid_manifest_payload(
    monkeypatch: pytest.MonkeyPatch,
    tmp_path: Path,
) -> None:
    """A manifest whose ``items`` is not a list must raise ``DifyDriveLayerError``."""
    layer = _build_layer(tmp_path)
    responses = {"manifest:tender-analyzer/": _FakeAsyncResponse(json_data={"items": "bad"})}
    # Replace the HTTP client used by the drive layer with the canned fake.
    monkeypatch.setattr(
        "dify_agent.layers.drive.layer.httpx.AsyncClient",
        lambda **_kwargs: _FakeAsyncClient(responses),
    )

    with pytest.raises(DifyDriveLayerError, match="drive manifest response is invalid"):
        await layer._fetch_manifest_items(tenant_id="tenant-1", targets=[("tender-analyzer/", False)])
@pytest.mark.anyio
async def test_fetch_manifest_items_rejects_missing_download_url(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
layer = _build_layer(tmp_path)
responses = {
"manifest:tender-analyzer/": _FakeAsyncResponse(
json_data={"items": [{"key": "tender-analyzer/SKILL.md", "download_url": None, "size": 7}]}
async def fake_run_remote_script(
self: DifyShellLayer,
script: str,
*,
timeout: float = 10.0,
inject_agent_stub_env: bool = False,
) -> RemoteCommandResult:
del self, script, timeout, inject_agent_stub_env
output = (
"__DIFY_DRIVE_MENTIONED_PATH__\ttender-analyzer/SKILL.md\t/mnt/drive/agent-1/tender-analyzer/SKILL.md\n"
"__DIFY_DRIVE_SKILL_BEGIN__\ttender-analyzer/SKILL.md\n"
"# Tender Analyzer\n"
"__DIFY_DRIVE_SKILL_END__\ttender-analyzer/SKILL.md\n"
)
}
monkeypatch.setattr(
"dify_agent.layers.drive.layer.httpx.AsyncClient",
lambda **_kwargs: _FakeAsyncClient(responses),
)
return _remote_result(output)
with pytest.raises(DifyDriveLayerError, match="missing download_url"):
await layer._fetch_manifest_items(tenant_id="tenant-1", targets=[("tender-analyzer/", False)])
@pytest.mark.anyio
async def test_download_items_hands_validated_downloads_to_materialization(
    monkeypatch: pytest.MonkeyPatch,
    tmp_path: Path,
) -> None:
    """Downloaded payloads are passed to ``materialize_drive_downloads`` in item order.

    The returned mapping must pair each item key with the path the
    (patched) materializer reported for it.
    """
    layer = _build_layer(tmp_path)
    responses = {
        "download:https://files/skill-md": _FakeAsyncResponse(content=b"skill-md"),
        "download:https://files/report": _FakeAsyncResponse(content=b"pdf"),
    }
    # Replace the HTTP client used by the drive layer with the canned fake.
    monkeypatch.setattr(
        "dify_agent.layers.drive.layer.httpx.AsyncClient",
        lambda **_kwargs: _FakeAsyncClient(responses),
    )
    captured: dict[str, object] = {}

    def fake_materialize_drive_downloads(*, base_path: Path, downloads: list[DriveDownloadPayload]):
        # Record the arguments and report one path per download without touching disk.
        captured["base_path"] = base_path
        captured["downloads"] = downloads
        return [tmp_path / "tender-analyzer" / "SKILL.md", tmp_path / "files" / "report.pdf"]

    monkeypatch.setattr(
        "dify_agent.layers.drive.layer.materialize_drive_downloads",
        fake_materialize_drive_downloads,
    )

    result = await layer._download_items(
        [
            AgentStubDriveItem(key="tender-analyzer/SKILL.md", download_url="https://files/skill-md", size=8),
            AgentStubDriveItem(key="files/report.pdf", download_url="https://files/report", size=3),
        ]
    )

    downloads = captured["downloads"]
    assert isinstance(downloads, list)
    assert downloads == [
        DriveDownloadPayload(key="tender-analyzer/SKILL.md", payload=b"skill-md", size=8),
        DriveDownloadPayload(key="files/report.pdf", payload=b"pdf", size=3),
    ]
    assert result == {
        "tender-analyzer/SKILL.md": str(tmp_path / "tender-analyzer" / "SKILL.md"),
        "files/report.pdf": str(tmp_path / "files" / "report.pdf"),
    }
@pytest.mark.anyio
async def test_download_items_extracts_skill_archive_over_skill_md_and_deletes_archive(
    monkeypatch: pytest.MonkeyPatch,
    tmp_path: Path,
) -> None:
    """A skill's full archive replaces the standalone SKILL.md and is then removed.

    After the download the archive path is still reported in the result, the
    archive file no longer exists, and both archive members are on disk with
    the archive's contents.
    """
    layer = _build_layer(tmp_path)
    archive_buffer = BytesIO()
    with ZipFile(archive_buffer, mode="w") as archive:
        archive.writestr("SKILL.md", "# From archive\n")
        archive.writestr("helper.py", "print('archive')\n")
    # Read the bytes only after the ZipFile is closed so the archive is complete.
    archive_bytes = archive_buffer.getvalue()
    responses = {
        "download:https://files/skill-md": _FakeAsyncResponse(content=b"# From manifest\n"),
        "download:https://files/archive": _FakeAsyncResponse(content=archive_bytes),
    }
    monkeypatch.setattr(
        "dify_agent.layers.drive.layer.httpx.AsyncClient",
        lambda **_kwargs: _FakeAsyncClient(responses),
    )
    # Point the drive base directory at the temporary directory.
    monkeypatch.setattr(
        "dify_agent.layers.drive.layer.agent_stub_drive_base_for_ref",
        lambda _drive_ref: str(tmp_path),
    )

    result = await layer._download_items(
        [
            AgentStubDriveItem(key="tender-analyzer/SKILL.md", download_url="https://files/skill-md", size=16),
            AgentStubDriveItem(
                key="tender-analyzer/.DIFY-SKILL-FULL.zip",
                download_url="https://files/archive",
                size=len(archive_bytes),
            ),
        ]
    )

    archive_path = tmp_path / "tender-analyzer" / ".DIFY-SKILL-FULL.zip"
    assert result["tender-analyzer/.DIFY-SKILL-FULL.zip"] == str(archive_path)
    assert not archive_path.exists()
    assert (tmp_path / "tender-analyzer" / "SKILL.md").read_text(encoding="utf-8") == "# From archive\n"
    assert (tmp_path / "tender-analyzer" / "helper.py").read_text(encoding="utf-8") == "print('archive')\n"
@pytest.mark.anyio
async def test_on_context_resume_raises_when_mentioned_targets_are_missing(
    monkeypatch: pytest.MonkeyPatch,
    tmp_path: Path,
) -> None:
    """Resuming must fail when the manifest yields nothing for the mentioned targets."""
    layer = _build_layer(tmp_path)

    async def _fetch_manifest_items(*, tenant_id: str, targets: list[tuple[str, bool]]) -> list[AgentStubDriveItem]:
        # Simulate an empty manifest regardless of the requested targets.
        del tenant_id, targets
        return []

    async def _download_items(items: list[AgentStubDriveItem]) -> dict[str, str]:
        # With an empty manifest there is nothing to download.
        assert items == []
        return {}

    monkeypatch.setattr(layer, "_fetch_manifest_items", _fetch_manifest_items)
    monkeypatch.setattr(layer, "_download_items", _download_items)

    with pytest.raises(DifyDriveLayerError, match="missing pulled file"):
        await layer.on_context_resume()
@pytest.mark.anyio
async def test_on_context_create_raises_when_manifest_is_empty_for_mentioned_targets(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
layer = _build_layer(tmp_path)
async def _fetch_manifest_items(*, tenant_id: str, targets: list[tuple[str, bool]]) -> list[AgentStubDriveItem]:
del tenant_id, targets
return []
async def _download_items(items: list[AgentStubDriveItem]) -> dict[str, str]:
assert items == []
return {}
monkeypatch.setattr(layer, "_fetch_manifest_items", _fetch_manifest_items)
monkeypatch.setattr(layer, "_download_items", _download_items)
monkeypatch.setattr(DifyShellLayer, "run_remote_script", fake_run_remote_script)
with pytest.raises(DifyDriveLayerError, match="missing pulled file"):
await layer.on_context_create()
@pytest.mark.anyio
async def test_on_context_create_raises_when_shell_pull_fails(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """A non-zero exit code from the shell script must raise ``DifyDriveLayerError``."""
    layer = _build_layer()

    async def fake_run_remote_script(
        self: DifyShellLayer,
        script: str,
        *,
        timeout: float = 10.0,
        inject_agent_stub_env: bool = False,
    ) -> RemoteCommandResult:
        # Ignore every argument and report a failed command.
        del self, script, timeout, inject_agent_stub_env
        return _remote_result("permission denied\n", exit_code=1)

    monkeypatch.setattr(DifyShellLayer, "run_remote_script", fake_run_remote_script)

    with pytest.raises(DifyDriveLayerError, match="drive mentioned pull failed in shell"):
        await layer.on_context_create()
@pytest.mark.anyio
async def test_on_context_create_raises_when_shell_output_is_truncated(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """Output flagged as truncated must be rejected even when the exit code is zero."""
    layer = _build_layer()

    async def fake_run_remote_script(
        self: DifyShellLayer,
        script: str,
        *,
        timeout: float = 10.0,
        inject_agent_stub_env: bool = False,
    ) -> RemoteCommandResult:
        # Return the otherwise valid transcript, but marked as truncated.
        del self, script, timeout, inject_agent_stub_env
        return _remote_result(_pulled_output(), truncated=True)

    monkeypatch.setattr(DifyShellLayer, "run_remote_script", fake_run_remote_script)

    with pytest.raises(DifyDriveLayerError, match="output was truncated"):
        await layer.on_context_create()
def test_parse_shell_pull_output_rejects_unclosed_skill_marker() -> None:
    """A skill begin marker with no matching end marker must raise ``DifyDriveLayerError``."""
    layer = _build_layer()

    with pytest.raises(DifyDriveLayerError, match="omitted SKILL.md end marker"):
        layer._parse_shell_pull_output("__DIFY_DRIVE_SKILL_BEGIN__\ttender-analyzer/SKILL.md\n# Tender\n")

View File

@ -29,6 +29,7 @@ def test_shell_layer_config_defaults_and_forbids_unknown_fields() -> None:
config = DifyShellLayerConfig()
assert config.model_dump() == {
"agent_stub_drive_ref": None,
"cli_tools": [],
"env": [],
"secret_refs": [],
@ -51,6 +52,7 @@ def test_shell_layer_config_accepts_agent_soul_shell_settings() -> None:
],
env=[DifyShellEnvVarConfig(name="PROJECT_NAME", value="demo")],
secret_refs=[DifyShellSecretRefConfig(name="OPENAI_API_KEY", ref="credential-1")],
agent_stub_drive_ref="agent-1",
sandbox=DifyShellSandboxConfig(provider="independent", config={"cpu": 2}),
)
@ -59,6 +61,7 @@ def test_shell_layer_config_accepts_agent_soul_shell_settings() -> None:
assert config.cli_tools[0].secret_refs[0].ref == "credential-2"
assert config.env[0].name == "PROJECT_NAME"
assert config.secret_refs[0].ref == "credential-1"
assert config.agent_stub_drive_ref == "agent-1"
assert config.sandbox is not None
assert config.sandbox.config == {"cpu": 2}

View File

@ -14,8 +14,6 @@ from dify_agent.agent_stub.server.shell_agent_stub_env import (
AGENT_STUB_DRIVE_BASE_ENV_VAR,
AGENT_STUB_API_BASE_URL_ENV_VAR,
)
from dify_agent.layers.drive import DifyDriveLayerConfig
from dify_agent.layers.drive.layer import DifyDriveLayer
from dify_agent.layers.execution_context import DifyExecutionContextLayerConfig
from dify_agent.layers.execution_context.layer import DifyExecutionContextLayer
from dify_agent.layers.shell import (
@ -238,14 +236,6 @@ def _execution_context_layer() -> DifyExecutionContextLayer:
)
def _drive_layer() -> DifyDriveLayer:
    """Build a drive layer for drive ref ``agent-1`` with placeholder inner-API settings.

    Returns:
        A ``DifyDriveLayer`` created via ``from_config_with_settings``.
    """
    return DifyDriveLayer.from_config_with_settings(
        DifyDriveLayerConfig(drive_ref="agent-1"),
        inner_api_url="https://api.example.com",
        inner_api_key="secret",
    )
def _shell_provider(*, client_factory: ShellctlClientFactory) -> LayerProvider[DifyShellLayer]:
return LayerProvider.from_factory(
layer_type=DifyShellLayer,
@ -666,7 +656,7 @@ def test_shell_layer_injects_agent_stub_env_only_for_user_visible_shell_run() ->
client = FakeShellctlClient(run_handler=run_handler)
layer = DifyShellLayer.from_config_with_settings(
DifyShellLayerConfig(),
DifyShellLayerConfig(agent_stub_drive_ref="agent-1"),
shellctl_entrypoint="http://shellctl",
shellctl_client_factory=lambda _entrypoint: client,
agent_stub_api_base_url="https://agent.example.com/agent-stub",
@ -674,7 +664,7 @@ def test_shell_layer_injects_agent_stub_env_only_for_user_visible_shell_run() ->
f"token-for:{execution_context.tenant_id}:{session_id}"
),
)
layer.deps = layer.deps_type(drive=_drive_layer(), execution_context=_execution_context_layer())
layer.deps = layer.deps_type(execution_context=_execution_context_layer())
tools = {tool.name: tool for tool in layer.tools}
async def scenario() -> None:
@ -793,7 +783,7 @@ def test_run_remote_script_can_inject_agent_stub_env_for_server_owned_uploads()
client = FakeShellctlClient(run_handler=run_handler)
layer = DifyShellLayer.from_config_with_settings(
DifyShellLayerConfig(),
DifyShellLayerConfig(agent_stub_drive_ref="agent-1"),
shellctl_entrypoint="http://shellctl",
shellctl_client_factory=lambda _entrypoint: client,
agent_stub_api_base_url="https://agent.example.com/agent-stub",
@ -801,7 +791,7 @@ def test_run_remote_script_can_inject_agent_stub_env_for_server_owned_uploads()
f"token-for:{execution_context.tenant_id}:{session_id}"
),
)
layer.deps = layer.deps_type(drive=_drive_layer(), execution_context=_execution_context_layer())
layer.deps = layer.deps_type(execution_context=_execution_context_layer())
async def scenario() -> None:
async with layer.resource_context():