feat(agent): add Agent Stub drive commands (#37593)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
盐粒 Yanli 2026-06-17 23:21:09 +09:00 committed by GitHub
parent c71f03f590
commit 9021b3f5be
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 2626 additions and 46 deletions

View File

@ -0,0 +1,380 @@
"""CLI helpers for sandbox-visible Agent Stub drive commands.
Drive commands stay in the sandbox-facing CLI because they orchestrate existing
control-plane and signed data-plane helpers. The Agent Stub server authenticates
and injects trusted drive scope; this module only formats manifest output,
downloads signed URLs into a local drive base (including safe auto-extraction of
downloaded skill archives), and uploads local files before committing their
ToolFile ids back into the drive.
"""
from __future__ import annotations
import stat
from dataclasses import dataclass
from pathlib import Path, PurePosixPath
from tempfile import TemporaryDirectory
from uuid import uuid4
from zipfile import BadZipFile, ZIP_DEFLATED, ZipFile, ZipInfo
from dify_agent.agent_stub.cli._env import read_agent_stub_environment
from dify_agent.agent_stub.cli._files import upload_tool_file_resource_from_environment
from dify_agent.agent_stub.client._agent_stub import (
download_file_bytes_from_signed_url_sync,
request_agent_stub_drive_commit_sync,
request_agent_stub_drive_manifest_sync,
)
from dify_agent.agent_stub.client._errors import AgentStubTransferError, AgentStubValidationError
from dify_agent.agent_stub.protocol.agent_stub import (
AgentStubDriveCommitItem,
AgentStubDriveCommitRequest,
AgentStubDriveCommitResponse,
AgentStubDriveFileRef,
AgentStubDriveItem,
AgentStubDriveManifestResponse,
)
_SKILL_MD_FILENAME = "SKILL.md"
_SKILL_ARCHIVE_FILENAME = ".DIFY-SKILL-FULL.zip"
_SKIP_DIR_NAMES = frozenset(
{".git", "__pycache__", ".pytest_cache", ".mypy_cache", ".ruff_cache", ".venv", "node_modules"}
)
_SKIP_FILE_NAMES = frozenset({".DS_Store", _SKILL_ARCHIVE_FILENAME})
@dataclass(frozen=True, slots=True)
class _DriveUploadItem:
"""Prepared local upload paired with its destination drive key."""
local_path: Path
drive_key: str
def list_drive_from_environment(prefix: str, json_output: bool) -> str | AgentStubDriveManifestResponse:
"""List drive items through the Agent Stub using the current environment.
Args:
prefix: Optional drive-key prefix forwarded to the manifest request.
json_output: When ``True``, return the validated manifest response model.
When ``False``, return one human-readable tab-separated line per item
containing size, mime type, hash, and key.
Returns:
Either ``AgentStubDriveManifestResponse`` for JSON callers or a formatted
string for human-facing CLI output.
Side effects:
Calls the Agent Stub drive manifest control-plane endpoint with
``include_download_url=False`` so list output does not allocate signed
download URLs.
"""
environment = read_agent_stub_environment()
response = request_agent_stub_drive_manifest_sync(
url=environment.url,
auth_jwe=environment.auth_jwe,
prefix=prefix,
include_download_url=False,
)
if json_output:
return response
return _format_manifest(response)
def pull_drive_from_environment(prefix: str, drive_base: str = "/mnt/drive") -> list[Path]:
"""Pull drive files into one local drive base via signed download URLs.
Args:
prefix: Optional drive-key prefix forwarded to the manifest request.
drive_base: Local base directory that receives downloaded drive files.
Returns:
A list of written local paths under ``drive_base``.
Observable behavior:
Requests a manifest with ``include_download_url=True``, requires every
returned item to include ``download_url``, downloads bytes directly from
those signed URLs, blocks path traversal by resolving each destination
under the resolved drive base, writes through a temporary sibling file
before replacing the final path, validates byte length when the manifest
includes ``size``, and automatically extracts
``.DIFY-SKILL-FULL.zip`` archives into their containing skill
directory with the same path-safety checks. Archive extraction is staged
under a temporary directory and only moved into place after the full
archive validates successfully.
The return value remains the list of downloaded paths only; extracted
files are materialized on disk but are not added to the returned list.
Raises:
AgentStubValidationError: if a manifest item omits ``download_url``, a
destination would escape the drive base, or a downloaded skill
archive contains unsafe entries such as absolute paths, traversal
entries, or symlink entries.
AgentStubTransferError: if a downloaded payload does not match declared
size metadata or a downloaded skill archive is corrupt / not a valid
zip file.
"""
environment = read_agent_stub_environment()
response = request_agent_stub_drive_manifest_sync(
url=environment.url,
auth_jwe=environment.auth_jwe,
prefix=prefix,
include_download_url=True,
)
base_path = Path(drive_base).expanduser().resolve()
base_path.mkdir(parents=True, exist_ok=True)
written_paths: list[Path] = []
for item in response.items:
download_url = item.download_url
if not isinstance(download_url, str) or not download_url:
raise AgentStubValidationError(f"drive manifest item is missing download_url: {item.key}")
destination = _resolve_drive_destination(base_path, item.key)
payload = download_file_bytes_from_signed_url_sync(download_url=download_url)
if item.size is not None and len(payload) != item.size:
raise AgentStubTransferError(f"downloaded drive file size mismatch for {item.key}")
destination.parent.mkdir(parents=True, exist_ok=True)
temp_path = destination.with_name(f"{destination.name}.tmp-{uuid4().hex}")
temp_path.write_bytes(payload)
temp_path.replace(destination)
written_paths.append(destination)
if destination.name == _SKILL_ARCHIVE_FILENAME:
_extract_skill_archive(destination)
return written_paths
def push_drive_from_environment(local_path: str, drive_path: str, recursive: bool) -> AgentStubDriveCommitResponse:
"""Upload local files through the Agent Stub and commit them into the drive.
Args:
local_path: Source file or directory in the sandbox filesystem.
drive_path: Destination drive key or drive-key prefix.
recursive: Select directory mode. ``False`` standardizes skill
directories, while ``True`` uploads raw directory contents.
Returns:
The validated drive commit response returned by the Agent Stub.
Mode split:
* If ``local_path`` is a file, upload that file and commit exactly one
``tool_file`` binding to ``drive_path``.
* If ``local_path`` is a directory and ``recursive`` is ``False``,
require ``SKILL.md`` and standardize the upload into
``<drive_path>/SKILL.md`` plus ``<drive_path>/.DIFY-SKILL-FULL.zip``.
* If ``local_path`` is a directory and ``recursive`` is ``True``, upload
each regular file under ``drive_path/<relative_path>`` without skill
standardization.
Observable safety behavior:
Rejects missing local paths, rejects recursive directory pushes with no
regular files, and rejects symlinked or escaping paths while preparing
directory uploads or skill archives.
"""
source_path = Path(local_path).expanduser().resolve()
if source_path.is_file():
return _commit_uploaded_items([_prepare_uploaded_file(source_path, drive_path)])
if not source_path.is_dir():
raise AgentStubValidationError(f"local path not found: {source_path}")
if recursive:
upload_items = [
_prepare_uploaded_file(path, _join_drive_key(drive_path, relative_path))
for path, relative_path in _iter_regular_files(source_path)
]
if not upload_items:
raise AgentStubValidationError(f"directory has no regular files: {source_path}")
return _commit_uploaded_items(upload_items)
return _push_skill_directory(source_path, drive_path)
def _push_skill_directory(source_path: Path, drive_path: str) -> AgentStubDriveCommitResponse:
skill_md_path = source_path / _SKILL_MD_FILENAME
if not skill_md_path.is_file():
raise AgentStubValidationError(f"non-recursive drive push requires {_SKILL_MD_FILENAME}: {source_path}")
with TemporaryDirectory() as temp_dir:
archive_path = Path(temp_dir) / _SKILL_ARCHIVE_FILENAME
_build_skill_archive(source_path, archive_path)
return _commit_uploaded_items(
[
_prepare_uploaded_file(skill_md_path.resolve(), _join_drive_key(drive_path, _SKILL_MD_FILENAME)),
_prepare_uploaded_file(archive_path, _join_drive_key(drive_path, _SKILL_ARCHIVE_FILENAME)),
]
)
def _prepare_uploaded_file(local_path: Path, drive_key: str) -> _DriveUploadItem:
return _DriveUploadItem(local_path=local_path, drive_key=drive_key)
def _commit_uploaded_items(items: list[_DriveUploadItem]) -> AgentStubDriveCommitResponse:
environment = read_agent_stub_environment()
commit_items: list[AgentStubDriveCommitItem] = []
for item in items:
uploaded_file = upload_tool_file_resource_from_environment(path=str(item.local_path))
commit_items.append(
AgentStubDriveCommitItem(
key=item.drive_key,
file_ref=AgentStubDriveFileRef(kind="tool_file", id=uploaded_file.tool_file_id),
)
)
return request_agent_stub_drive_commit_sync(
url=environment.url,
auth_jwe=environment.auth_jwe,
request=AgentStubDriveCommitRequest(items=commit_items),
)
def _format_manifest(response: AgentStubDriveManifestResponse) -> str:
return "\n".join(_format_manifest_item(item) for item in response.items)
def _format_manifest_item(item: AgentStubDriveItem) -> str:
size = str(item.size) if item.size is not None else "-"
mime_type = item.mime_type or "-"
item_hash = item.hash or "-"
return f"{size}\t{mime_type}\t{item_hash}\t{item.key}"
def _resolve_drive_destination(base_path: Path, drive_key: str) -> Path:
destination = (base_path / Path(drive_key)).resolve()
try:
destination.relative_to(base_path)
except ValueError as exc:
raise AgentStubValidationError(f"drive key resolves outside the drive base: {drive_key}") from exc
return destination
def _iter_regular_files(root_path: Path) -> list[tuple[Path, str]]:
"""Return all regular files under one directory, rejecting unsafe symlinks."""
return _iter_regular_files_with_skip_filter(root_path, skip_filtered=False)
def _iter_skill_archive_files(root_path: Path) -> list[tuple[Path, str]]:
"""Return regular files for skill packaging, excluding transient content."""
return _iter_regular_files_with_skip_filter(root_path, skip_filtered=True)
def _iter_regular_files_with_skip_filter(root_path: Path, *, skip_filtered: bool) -> list[tuple[Path, str]]:
root_resolved = root_path.resolve()
collected: list[tuple[Path, str]] = []
for candidate in sorted(root_path.rglob("*")):
if skip_filtered and _should_skip_path(candidate, root_path):
continue
if candidate.is_symlink():
raise AgentStubValidationError(f"drive push does not support symlinked files: {candidate}")
if not candidate.is_file():
continue
resolved_candidate = candidate.resolve()
try:
relative_path = resolved_candidate.relative_to(root_resolved)
except ValueError as exc:
raise AgentStubValidationError(
f"drive push file resolves outside the source directory: {candidate}"
) from exc
collected.append((resolved_candidate, relative_path.as_posix()))
return collected
def _should_skip_path(candidate: Path, root_path: Path) -> bool:
relative_path = candidate.relative_to(root_path)
if any(part in _SKIP_DIR_NAMES for part in relative_path.parts):
return True
return candidate.name in _SKIP_FILE_NAMES
def _build_skill_archive(source_path: Path, archive_path: Path) -> None:
with ZipFile(archive_path, mode="w", compression=ZIP_DEFLATED) as archive:
for file_path, relative_path in _iter_skill_archive_files(source_path):
archive.write(file_path, arcname=relative_path)
def _extract_skill_archive(archive_path: Path) -> None:
"""Safely extract one downloaded skill archive into its containing directory.
Extraction is staged under a temporary directory created inside the target
skill directory. Every entry is validated and materialized into staging
first, and only after the full archive succeeds are staged files moved into
their final locations under the skill directory. Existing files at those
final locations are overwritten in place by the extracted archive content.
Error mapping is intentionally stable for CLI callers: unsafe archive entry
names raise ``AgentStubValidationError``, while malformed archives and zip
parsing / archive I/O failures are translated into ``AgentStubTransferError``.
"""
target_dir = archive_path.parent.resolve()
try:
with TemporaryDirectory(dir=target_dir, prefix=".dify-skill-extract-") as staging_dir_name:
staging_dir = Path(staging_dir_name).resolve()
with ZipFile(archive_path) as archive:
for zip_info in archive.infolist():
destination = _resolve_zip_entry_destination(staging_dir, zip_info.filename)
if _is_zip_symlink(zip_info):
raise AgentStubValidationError(
f"skill archive contains unsupported symlink entry: {zip_info.filename}"
)
if zip_info.is_dir():
destination.mkdir(parents=True, exist_ok=True)
continue
destination.parent.mkdir(parents=True, exist_ok=True)
with archive.open(zip_info) as source_file:
temp_path = destination.with_name(f"{destination.name}.tmp-{uuid4().hex}")
temp_path.write_bytes(source_file.read())
temp_path.replace(destination)
for staged_path in sorted(staging_dir.rglob("*")):
if staged_path.is_dir():
continue
relative_path = staged_path.relative_to(staging_dir)
destination = (target_dir / relative_path).resolve()
destination.parent.mkdir(parents=True, exist_ok=True)
staged_path.replace(destination)
except AgentStubValidationError:
raise
except (BadZipFile, OSError) as exc:
raise AgentStubTransferError(f"downloaded skill archive is invalid: {archive_path.name}") from exc
def _resolve_zip_entry_destination(target_dir: Path, entry_name: str) -> Path:
"""Resolve one zip entry path under a target skill directory.
Zip metadata may contain POSIX or backslash-separated names, so entry names
are normalized to forward slashes before validation. The resolved entry must
not be absolute, empty, ``.`` / ``..`` based, or otherwise escape the target
skill directory after resolution.
"""
normalized_name = entry_name.replace("\\", "/")
pure_path = PurePosixPath(normalized_name)
if not normalized_name or normalized_name.startswith("/") or pure_path.is_absolute():
raise AgentStubValidationError(f"skill archive contains unsafe absolute path: {entry_name}")
if any(part in {"", ".", ".."} for part in pure_path.parts):
raise AgentStubValidationError(f"skill archive contains unsafe path traversal entry: {entry_name}")
destination = (target_dir / Path(*pure_path.parts)).resolve()
try:
destination.relative_to(target_dir)
except ValueError as exc:
raise AgentStubValidationError(
f"skill archive entry resolves outside the skill directory: {entry_name}"
) from exc
return destination
def _is_zip_symlink(zip_info: ZipInfo) -> bool:
file_mode = zip_info.external_attr >> 16
return stat.S_ISLNK(file_mode)
def _join_drive_key(base_key: str, child_key: str) -> str:
stripped_base = base_key.rstrip("/")
stripped_child = child_key.lstrip("/")
return f"{stripped_base}/{stripped_child}" if stripped_base else stripped_child
__all__ = [
"list_drive_from_environment",
"pull_drive_from_environment",
"push_drive_from_environment",
]

View File

@ -36,6 +36,14 @@ class DownloadedFileResult:
path: Path
@dataclass(frozen=True, slots=True)
class UploadedToolFileResource:
"""Lower-level upload result carrying both public mapping and ToolFile id."""
mapping: UploadedToolFileMapping
tool_file_id: str
def upload_file_from_environment(*, path: str) -> UploadedToolFileMapping:
"""Upload one sandbox-local file through the Agent Stub control plane.
@ -44,6 +52,23 @@ def upload_file_from_environment(*, path: str) -> UploadedToolFileMapping:
canonical Agent output file mapping without synthesizing reference format.
"""
return upload_tool_file_resource_from_environment(path=path).mapping
def upload_tool_file_resource_from_environment(*, path: str) -> UploadedToolFileResource:
"""Upload one sandbox-local file and preserve both reference and ToolFile id.
This lower-level helper backs ``drive push``. The signed upload data-plane
response must include both the canonical Dify ``reference`` used by public
CLI output and the raw ToolFile ``id`` required by drive commit payloads.
Raises:
AgentStubValidationError: if ``path`` does not resolve to a local file.
AgentStubTransferError: if the signed upload response omits either the
canonical ``reference`` or the raw ToolFile ``id``, or if the
canonical reference is malformed.
"""
source_path = Path(path).expanduser().resolve()
if not source_path.is_file():
raise AgentStubValidationError(f"local file not found: {source_path}")
@ -64,7 +89,7 @@ def upload_file_from_environment(*, path: str) -> UploadedToolFileMapping:
file_obj=file_obj,
mimetype=mime_type,
)
return _normalize_uploaded_tool_file(payload)
return _normalize_uploaded_tool_file_resource(payload)
def download_file_from_environment(
@ -101,13 +126,19 @@ def download_file_from_environment(
return DownloadedFileResult(path=destination)
def _normalize_uploaded_tool_file(payload: dict[str, object]) -> UploadedToolFileMapping:
def _normalize_uploaded_tool_file_resource(payload: dict[str, object]) -> UploadedToolFileResource:
reference = payload.get("reference")
if not isinstance(reference, str) or not reference:
raise AgentStubTransferError("signed file upload response is missing reference")
if not is_canonical_dify_file_reference(reference):
raise AgentStubTransferError("signed file upload response has invalid canonical reference")
return UploadedToolFileMapping(reference=reference)
tool_file_id = payload.get("id")
if not isinstance(tool_file_id, str) or not tool_file_id:
raise AgentStubTransferError("signed file upload response is missing id")
return UploadedToolFileResource(
mapping=UploadedToolFileMapping(reference=reference),
tool_file_id=tool_file_id,
)
def _deduplicate_destination_path(path: Path) -> Path:
@ -134,6 +165,8 @@ def _sanitize_download_filename(filename: str) -> str:
__all__ = [
"DownloadedFileResult",
"UploadedToolFileMapping",
"UploadedToolFileResource",
"download_file_from_environment",
"upload_file_from_environment",
"upload_tool_file_resource_from_environment",
]

View File

@ -1,11 +1,11 @@
"""Typer entry point for the client-safe ``dify-agent`` console script.
The CLI supports an explicit ``connect`` command and treats unknown bare
commands as Agent Stub forwards. When the injected Agent Stub environment
variables are missing, that path intentionally surfaces a clear missing-env
error instead of Typer's generic unknown-command message. The module depends
only on client-safe code so importing the console entry point does not pull in
FastAPI, Redis, shellctl, or JWE runtime dependencies.
The CLI supports explicit ``connect``, ``file``, and ``drive`` commands and
treats unknown bare commands as Agent Stub forwards. When the injected Agent
Stub environment variables are missing, that path intentionally surfaces a
clear missing-env error instead of Typer's generic unknown-command message. The
module depends only on client-safe code so importing the console entry point
does not pull in FastAPI, Redis, shellctl, or JWE runtime dependencies.
"""
from __future__ import annotations
@ -16,6 +16,11 @@ import typer
from typer.main import get_command
from dify_agent.agent_stub.cli._agent_stub import connect_from_environment
from dify_agent.agent_stub.cli._drive import (
list_drive_from_environment,
pull_drive_from_environment,
push_drive_from_environment,
)
from dify_agent.agent_stub.cli._env import MissingAgentStubEnvironmentError, has_agent_stub_environment
from dify_agent.agent_stub.cli._files import download_file_from_environment, upload_file_from_environment
from dify_agent.agent_stub.client._errors import AgentStubClientError
@ -27,8 +32,10 @@ app = typer.Typer(
no_args_is_help=True,
)
file_app = typer.Typer(help="Upload or download workflow files through the Agent Stub.")
drive_app = typer.Typer(help="List, pull, or push agent drive files through the Agent Stub.")
app.add_typer(file_app, name="file")
_KNOWN_ROOT_COMMANDS = frozenset({"connect", "file"})
app.add_typer(drive_app, name="drive")
_KNOWN_ROOT_COMMANDS = frozenset({"connect", "drive", "file"})
@app.command(context_settings={"allow_extra_args": True, "ignore_unknown_options": True})
@ -60,6 +67,34 @@ def download(
)
@drive_app.command("list")
def drive_list(
path_prefix: str = typer.Argument("", metavar="PATH_PREFIX"),
json_output: bool = typer.Option(False, "--json", help="Emit the drive manifest as JSON."),
) -> None:
"""List drive files visible to the current sandbox execution."""
_run_drive_list(path_prefix=path_prefix, json_output=json_output)
@drive_app.command("pull")
def drive_pull(
path_prefix: str = typer.Argument("", metavar="PATH_PREFIX"),
drive_base: str = typer.Option("/mnt/drive", "--drive-base", help="Local base directory for pulled drive files."),
) -> None:
"""Pull drive files into one local directory tree."""
_run_drive_pull(path_prefix=path_prefix, drive_base=drive_base)
@drive_app.command("push")
def drive_push(
local_path: str = typer.Argument(..., metavar="LOCAL_PATH"),
drive_path: str = typer.Argument(..., metavar="DRIVE_PATH"),
recursive: bool = typer.Option(False, "-r", "--recursive", help="Recursively upload directory contents."),
) -> None:
"""Upload one local file or directory into the agent drive."""
_run_drive_push(local_path=local_path, drive_path=drive_path, recursive=recursive)
def main(argv: list[str] | None = None) -> None:
"""Run the ``dify-agent`` CLI with optional argv injection for tests."""
args = list(sys.argv[1:] if argv is None else argv)
@ -155,4 +190,46 @@ def _run_file_download(*, transfer_method: str, reference_or_url: str, directory
typer.echo(str(response.path))
def _run_drive_list(*, path_prefix: str, json_output: bool) -> None:
try:
response = list_drive_from_environment(prefix=path_prefix, json_output=json_output)
except MissingAgentStubEnvironmentError as exc:
typer.echo(str(exc), err=True)
raise SystemExit(2) from exc
except AgentStubClientError as exc:
typer.echo(str(exc), err=True)
raise SystemExit(1) from exc
if json_output:
if isinstance(response, str):
raise RuntimeError("drive list JSON output expected a manifest response")
typer.echo(response.model_dump_json())
return
typer.echo(response)
def _run_drive_pull(*, path_prefix: str, drive_base: str) -> None:
try:
response = pull_drive_from_environment(prefix=path_prefix, drive_base=drive_base)
except MissingAgentStubEnvironmentError as exc:
typer.echo(str(exc), err=True)
raise SystemExit(2) from exc
except AgentStubClientError as exc:
typer.echo(str(exc), err=True)
raise SystemExit(1) from exc
for path in response:
typer.echo(str(path))
def _run_drive_push(*, local_path: str, drive_path: str, recursive: bool) -> None:
try:
response = push_drive_from_environment(local_path=local_path, drive_path=drive_path, recursive=recursive)
except MissingAgentStubEnvironmentError as exc:
typer.echo(str(exc), err=True)
raise SystemExit(2) from exc
except AgentStubClientError as exc:
typer.echo(str(exc), err=True)
raise SystemExit(1) from exc
typer.echo(response.model_dump_json())
__all__ = ["app", "main"]

View File

@ -8,12 +8,18 @@ from pydantic import JsonValue
from dify_agent.agent_stub.client._agent_stub_http import (
connect_agent_stub_http_sync,
download_file_bytes_from_signed_url_sync,
request_agent_stub_drive_commit_http_sync,
request_agent_stub_drive_manifest_http_sync,
request_agent_stub_file_download_http_sync,
request_agent_stub_file_upload_http_sync,
upload_file_to_signed_url_sync,
)
from dify_agent.agent_stub.client._errors import AgentStubValidationError
from dify_agent.agent_stub.protocol.agent_stub import AgentStubFileMapping, parse_agent_stub_endpoint
from dify_agent.agent_stub.protocol.agent_stub import (
AgentStubDriveCommitRequest,
AgentStubFileMapping,
parse_agent_stub_endpoint,
)
def connect_agent_stub_sync(
@ -106,6 +112,60 @@ def request_agent_stub_file_download_sync(
)
def request_agent_stub_drive_manifest_sync(
*,
url: str,
auth_jwe: str,
prefix: str,
include_download_url: bool,
timeout: float | httpx.Timeout = 30.0,
sync_http_client: httpx.Client | None = None,
):
"""Request one drive manifest through the HTTP Agent Stub transport.
Drive operations are intentionally HTTP-only in this stage. Callers must
provide an ``http://`` or ``https://`` Agent Stub URL; ``grpc://`` endpoints
raise ``AgentStubValidationError`` instead of attempting transport fallback.
"""
endpoint = _parse_endpoint(url)
if endpoint.is_grpc:
raise AgentStubValidationError("Agent Stub drive operations require an HTTP Agent Stub URL")
return request_agent_stub_drive_manifest_http_sync(
base_url=endpoint.url,
auth_jwe=auth_jwe,
prefix=prefix,
include_download_url=include_download_url,
timeout=timeout,
sync_http_client=sync_http_client,
)
def request_agent_stub_drive_commit_sync(
*,
url: str,
auth_jwe: str,
request: AgentStubDriveCommitRequest,
timeout: float | httpx.Timeout = 30.0,
sync_http_client: httpx.Client | None = None,
):
"""Commit one drive batch through the HTTP Agent Stub transport.
Drive operations are intentionally HTTP-only in this stage. Callers must
provide an ``http://`` or ``https://`` Agent Stub URL; ``grpc://`` endpoints
raise ``AgentStubValidationError`` instead of attempting transport fallback.
"""
endpoint = _parse_endpoint(url)
if endpoint.is_grpc:
raise AgentStubValidationError("Agent Stub drive operations require an HTTP Agent Stub URL")
return request_agent_stub_drive_commit_http_sync(
base_url=endpoint.url,
auth_jwe=auth_jwe,
request=request,
timeout=timeout,
sync_http_client=sync_http_client,
)
def _parse_endpoint(url: str):
try:
return parse_agent_stub_endpoint(url)
@ -116,6 +176,8 @@ def _parse_endpoint(url: str):
__all__ = [
"connect_agent_stub_sync",
"download_file_bytes_from_signed_url_sync",
"request_agent_stub_drive_commit_sync",
"request_agent_stub_drive_manifest_sync",
"request_agent_stub_file_download_sync",
"request_agent_stub_file_upload_sync",
"upload_file_to_signed_url_sync",

View File

@ -24,12 +24,17 @@ from dify_agent.agent_stub.client._errors import (
from dify_agent.agent_stub.protocol.agent_stub import (
AgentStubConnectRequest,
AgentStubConnectResponse,
AgentStubDriveCommitRequest,
AgentStubDriveCommitResponse,
AgentStubDriveManifestResponse,
AgentStubFileDownloadRequest,
AgentStubFileDownloadResponse,
AgentStubFileMapping,
AgentStubFileUploadRequest,
AgentStubFileUploadResponse,
agent_stub_connections_url,
agent_stub_drive_commit_url,
agent_stub_drive_manifest_url,
agent_stub_file_download_request_url,
agent_stub_file_upload_request_url,
)
@ -124,6 +129,56 @@ def request_agent_stub_file_download_http_sync(
)
def request_agent_stub_drive_manifest_http_sync(
*,
base_url: str,
auth_jwe: str,
prefix: str,
include_download_url: bool,
timeout: float | httpx.Timeout = 30.0,
sync_http_client: httpx.Client | None = None,
) -> AgentStubDriveManifestResponse:
"""Request one drive manifest from the HTTP Agent Stub endpoint."""
response = _get_agent_stub_json(
base_url=base_url,
auth_jwe=auth_jwe,
endpoint_name="drive manifest request",
endpoint_url_factory=agent_stub_drive_manifest_url,
params={
"prefix": prefix,
"include_download_url": str(include_download_url).lower(),
},
timeout=timeout,
sync_http_client=sync_http_client,
)
return _parse_success_response(
response=response, response_model=AgentStubDriveManifestResponse, label="drive manifest"
)
def request_agent_stub_drive_commit_http_sync(
*,
base_url: str,
auth_jwe: str,
request: AgentStubDriveCommitRequest,
timeout: float | httpx.Timeout = 30.0,
sync_http_client: httpx.Client | None = None,
) -> AgentStubDriveCommitResponse:
"""Commit one drive batch through the HTTP Agent Stub endpoint."""
response = _post_agent_stub_json(
base_url=base_url,
auth_jwe=auth_jwe,
endpoint_name="drive commit request",
endpoint_url_factory=agent_stub_drive_commit_url,
request_body=request.model_dump_json(exclude_none=True),
timeout=timeout,
sync_http_client=sync_http_client,
)
return _parse_success_response(response=response, response_model=AgentStubDriveCommitResponse, label="drive commit")
def upload_file_to_signed_url_sync(
*,
upload_url: str,
@ -229,6 +284,38 @@ def _post_agent_stub_json(
client.close()
def _get_agent_stub_json(
*,
base_url: str,
auth_jwe: str,
endpoint_name: str,
endpoint_url_factory: Callable[[str], str],
params: dict[str, str],
timeout: float | httpx.Timeout,
sync_http_client: httpx.Client | None,
) -> httpx.Response:
try:
endpoint_url = endpoint_url_factory(base_url)
except ValueError as exc:
raise AgentStubValidationError("invalid Agent Stub base URL") from exc
owns_client = sync_http_client is None
client = sync_http_client or httpx.Client(timeout=timeout, follow_redirects=True)
try:
return client.get(
endpoint_url,
params=params,
headers={"Authorization": f"Bearer {auth_jwe}"},
timeout=timeout,
)
except httpx.TimeoutException as exc:
raise AgentStubClientError(f"Agent Stub {endpoint_name} timed out") from exc
except httpx.RequestError as exc:
raise AgentStubClientError(f"Agent Stub {endpoint_name} request failed: {exc}") from exc
finally:
if owns_client:
client.close()
def _parse_success_response[T: BaseModel](
*,
response: httpx.Response,
@ -257,6 +344,8 @@ def _parse_json_payload(response: httpx.Response, *, invalid_json_message: str)
__all__ = [
"connect_agent_stub_http_sync",
"download_file_bytes_from_signed_url_sync",
"request_agent_stub_drive_commit_http_sync",
"request_agent_stub_drive_manifest_http_sync",
"request_agent_stub_file_download_http_sync",
"request_agent_stub_file_upload_http_sync",
"upload_file_to_signed_url_sync",

View File

@ -6,6 +6,12 @@ from .agent_stub import (
AGENT_STUB_URL_ENV_VAR,
AgentStubConnectRequest,
AgentStubConnectResponse,
AgentStubDriveCommitItem,
AgentStubDriveCommitRequest,
AgentStubDriveCommitResponse,
AgentStubDriveFileRef,
AgentStubDriveItem,
AgentStubDriveManifestResponse,
AgentStubEndpoint,
AgentStubFileDownloadRequest,
AgentStubFileDownloadResponse,
@ -14,6 +20,8 @@ from .agent_stub import (
AgentStubFileUploadResponse,
AgentStubURLScheme,
agent_stub_connections_url,
agent_stub_drive_commit_url,
agent_stub_drive_manifest_url,
agent_stub_file_download_request_url,
agent_stub_file_upload_request_url,
is_canonical_dify_file_reference,
@ -27,6 +35,12 @@ __all__ = [
"AGENT_STUB_URL_ENV_VAR",
"AgentStubConnectRequest",
"AgentStubConnectResponse",
"AgentStubDriveCommitItem",
"AgentStubDriveCommitRequest",
"AgentStubDriveCommitResponse",
"AgentStubDriveFileRef",
"AgentStubDriveItem",
"AgentStubDriveManifestResponse",
"AgentStubEndpoint",
"AgentStubFileDownloadRequest",
"AgentStubFileDownloadResponse",
@ -35,6 +49,8 @@ __all__ = [
"AgentStubFileUploadResponse",
"AgentStubURLScheme",
"agent_stub_connections_url",
"agent_stub_drive_commit_url",
"agent_stub_drive_manifest_url",
"agent_stub_file_download_request_url",
"agent_stub_file_upload_request_url",
"is_canonical_dify_file_reference",

View File

@ -115,6 +115,16 @@ def agent_stub_file_download_request_url(base_url: str) -> str:
return f"{_require_http_base_url(base_url)}/files/download-request"
def agent_stub_drive_manifest_url(base_url: str) -> str:
"""Return the stable HTTP drive-manifest endpoint URL for one base URL."""
return f"{_require_http_base_url(base_url)}/drive/manifest"
def agent_stub_drive_commit_url(base_url: str) -> str:
"""Return the stable HTTP drive-commit endpoint URL for one base URL."""
return f"{_require_http_base_url(base_url)}/drive/commit"
def is_canonical_dify_file_reference(reference: str) -> bool:
"""Return whether one string matches Dify's opaque file reference format."""
prefix = "dify-file-ref:"
@ -210,6 +220,65 @@ class AgentStubFileDownloadResponse(BaseModel):
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
class AgentStubDriveFileRef(BaseModel):
"""Trusted file reference used by Agent Stub drive commit requests."""
kind: Literal["upload_file", "tool_file"]
id: str
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
class AgentStubDriveCommitItem(BaseModel):
"""One drive key to file binding committed through the Agent Stub."""
key: str
file_ref: AgentStubDriveFileRef
value_owned_by_drive: bool = True
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
class AgentStubDriveCommitRequest(BaseModel):
"""Request body for one Agent Stub drive commit batch."""
items: list[AgentStubDriveCommitItem]
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
class AgentStubDriveItem(BaseModel):
"""One manifest or commit item returned by the Agent Stub drive API."""
key: str
size: int | None = None
hash: str | None = None
mime_type: str | None = None
file_kind: Literal["upload_file", "tool_file"]
file_id: str
created_at: int | None = None
download_url: str | None = None
value_owned_by_drive: bool | None = None
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
class AgentStubDriveManifestResponse(BaseModel):
"""Response body for one Agent Stub drive manifest request."""
items: list[AgentStubDriveItem]
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
class AgentStubDriveCommitResponse(BaseModel):
"""Response body for one Agent Stub drive commit request."""
items: list[AgentStubDriveItem]
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
def _require_http_base_url(base_url: str) -> str:
endpoint = parse_agent_stub_endpoint(base_url)
if not endpoint.is_http:
@ -228,6 +297,12 @@ __all__ = [
"AgentStubConnectRequest",
"AgentStubConnectResponse",
"AgentStubEndpoint",
"AgentStubDriveCommitItem",
"AgentStubDriveCommitRequest",
"AgentStubDriveCommitResponse",
"AgentStubDriveFileRef",
"AgentStubDriveItem",
"AgentStubDriveManifestResponse",
"AgentStubFileDownloadRequest",
"AgentStubFileDownloadResponse",
"AgentStubFileMapping",
@ -235,6 +310,8 @@ __all__ = [
"AgentStubFileUploadResponse",
"AgentStubURLScheme",
"agent_stub_connections_url",
"agent_stub_drive_commit_url",
"agent_stub_drive_manifest_url",
"agent_stub_file_download_request_url",
"agent_stub_file_upload_request_url",
"is_canonical_dify_file_reference",

View File

@ -0,0 +1,190 @@
"""Server-side Dify API client for Agent Stub drive endpoints.
The Agent Stub drive API is an HTTP-only control plane over the existing Dify
agent drive inner APIs. Sandbox callers never send trusted tenant, agent, or
user ids directly; this module receives an authenticated ``AgentStubPrincipal``,
derives ``agent-<agent_id>`` from execution context, injects trusted identity
fields into the Dify inner request, and normalizes transport, HTTP, JSON, and
schema failures into ``AgentStubDriveRequestError`` for the route layer.
"""
from __future__ import annotations
from collections.abc import Mapping
from dataclasses import dataclass
from typing import Any, Protocol
import httpx
from pydantic import ValidationError
from dify_agent.agent_stub.protocol.agent_stub import (
AgentStubDriveCommitRequest,
AgentStubDriveCommitResponse,
AgentStubDriveManifestResponse,
)
from dify_agent.agent_stub.server.tokens.agent_stub import AgentStubPrincipal
from dify_agent.layers.execution_context import DifyExecutionContextLayerConfig
class AgentStubDriveRequestHandler(Protocol):
"""Trusted control-plane bridge from sandbox drive calls to Dify inner APIs."""
async def get_manifest(
self,
*,
principal: AgentStubPrincipal,
prefix: str,
include_download_url: bool,
) -> AgentStubDriveManifestResponse: ...
async def commit(
self,
*,
principal: AgentStubPrincipal,
request: AgentStubDriveCommitRequest,
) -> AgentStubDriveCommitResponse: ...
class AgentStubDriveRequestError(RuntimeError):
"""Raised when the Agent Stub cannot complete one drive control-plane call."""
status_code: int
detail: object
def __init__(self, status_code: int, detail: object) -> None:
self.status_code = status_code
self.detail = detail
super().__init__(str(detail))
@dataclass(slots=True)
class DifyApiAgentStubDriveRequestHandler:
"""Call Dify API inner drive endpoints on behalf of authenticated sandboxes.
Manifest requests require ``tenant_id`` and ``agent_id`` from execution
context and forward query parameters to
``/inner/api/drive/agent-<agent_id>/manifest``. Commit requests additionally
require ``user_id`` and post a raw JSON payload to
``/inner/api/drive/agent-<agent_id>/commit``. Dify drive endpoints return
raw ``{"items": [...]}`` payloads instead of plugin-style ``data`` envelopes,
so this module validates the raw success payload directly.
"""
dify_api_base_url: str
dify_api_inner_api_key: str
timeout: httpx.Timeout | float = 30.0
async def get_manifest(
self,
*,
principal: AgentStubPrincipal,
prefix: str,
include_download_url: bool,
) -> AgentStubDriveManifestResponse:
"""Request one drive manifest from Dify's inner drive manifest endpoint."""
execution_context = self._require_agent_context(principal.execution_context)
payload = await self._get_inner_api(
f"/inner/api/drive/{self._drive_ref(execution_context)}/manifest",
{
"tenant_id": execution_context.tenant_id,
"prefix": prefix,
"include_download_url": str(include_download_url).lower(),
},
)
try:
return AgentStubDriveManifestResponse.model_validate(payload)
except ValidationError as exc:
raise AgentStubDriveRequestError(502, "Dify API drive manifest response is invalid") from exc
async def commit(
self,
*,
principal: AgentStubPrincipal,
request: AgentStubDriveCommitRequest,
) -> AgentStubDriveCommitResponse:
"""Commit one drive batch through Dify's inner drive commit endpoint."""
execution_context = self._require_user_context(self._require_agent_context(principal.execution_context))
payload = await self._post_inner_api(
f"/inner/api/drive/{self._drive_ref(execution_context)}/commit",
{
"tenant_id": execution_context.tenant_id,
"user_id": execution_context.user_id,
"items": [item.model_dump(mode="json", exclude_none=True) for item in request.items],
},
)
try:
return AgentStubDriveCommitResponse.model_validate(payload)
except ValidationError as exc:
raise AgentStubDriveRequestError(502, "Dify API drive commit response is invalid") from exc
def _require_agent_context(
self, execution_context: DifyExecutionContextLayerConfig
) -> DifyExecutionContextLayerConfig:
if execution_context.agent_id is None:
raise AgentStubDriveRequestError(400, "execution context agent_id is required for drive operations")
return execution_context
def _require_user_context(
self, execution_context: DifyExecutionContextLayerConfig
) -> DifyExecutionContextLayerConfig:
if execution_context.user_id is None:
raise AgentStubDriveRequestError(400, "execution context user_id is required for drive commit")
return execution_context
@staticmethod
def _drive_ref(execution_context: DifyExecutionContextLayerConfig) -> str:
agent_id = execution_context.agent_id
if agent_id is None:
raise AgentStubDriveRequestError(400, "execution context agent_id is required for drive operations")
return f"agent-{agent_id}"
async def _get_inner_api(self, path: str, params: Mapping[str, str]) -> object:
url = f"{self.dify_api_base_url.rstrip('/')}{path}"
async with httpx.AsyncClient(timeout=self.timeout, follow_redirects=True, trust_env=False) as client:
try:
response = await client.get(
url,
params=dict(params),
headers={"X-Inner-Api-Key": self.dify_api_inner_api_key},
)
except httpx.TimeoutException as exc:
raise AgentStubDriveRequestError(504, "Dify API drive request timed out") from exc
except httpx.RequestError as exc:
raise AgentStubDriveRequestError(502, f"Dify API drive request failed: {exc}") from exc
return self._normalize_payload(response)
async def _post_inner_api(self, path: str, payload: Mapping[str, Any]) -> object:
url = f"{self.dify_api_base_url.rstrip('/')}{path}"
async with httpx.AsyncClient(timeout=self.timeout, follow_redirects=True, trust_env=False) as client:
try:
response = await client.post(
url,
json=dict(payload),
headers={"X-Inner-Api-Key": self.dify_api_inner_api_key},
)
except httpx.TimeoutException as exc:
raise AgentStubDriveRequestError(504, "Dify API drive request timed out") from exc
except httpx.RequestError as exc:
raise AgentStubDriveRequestError(502, f"Dify API drive request failed: {exc}") from exc
return self._normalize_payload(response)
def _normalize_payload(self, response: httpx.Response) -> object:
raw_payload = self._parse_json(response)
if response.is_error:
detail = raw_payload.get("detail", raw_payload) if isinstance(raw_payload, dict) else raw_payload
raise AgentStubDriveRequestError(response.status_code, detail)
return raw_payload
@staticmethod
def _parse_json(response: httpx.Response) -> object:
try:
return response.json()
except ValueError as exc:
raise AgentStubDriveRequestError(502, "Dify API drive request returned invalid JSON") from exc
__all__ = [
"AgentStubDriveRequestError",
"AgentStubDriveRequestHandler",
"DifyApiAgentStubDriveRequestHandler",
]

View File

@ -2,8 +2,9 @@
The standalone stub server is only a convenience wrapper around the shared
router. It reuses the main ``ServerSettings`` model and derives the Agent Stub
token codec and optional file-request bridge from the same helper methods that
the standard run server uses before mounting ``create_agent_stub_router(...)``.
token codec plus optional file and drive request bridges from the same helper
methods that the standard run server uses before mounting
``create_agent_stub_router(...)``.
"""
from __future__ import annotations
@ -22,6 +23,7 @@ def create_agent_stub_app(settings: ServerSettings | None = None) -> FastAPI:
create_agent_stub_router(
token_codec=resolved_settings.create_agent_stub_token_codec(),
file_request_handler=resolved_settings.create_agent_stub_file_request_handler(),
drive_request_handler=resolved_settings.create_agent_stub_drive_request_handler(),
)
)
return app

View File

@ -8,11 +8,15 @@ from uuid import uuid4
from dify_agent.agent_stub.protocol.agent_stub import (
AgentStubConnectResponse,
AgentStubDriveCommitRequest,
AgentStubDriveCommitResponse,
AgentStubDriveManifestResponse,
AgentStubFileDownloadRequest,
AgentStubFileDownloadResponse,
AgentStubFileUploadRequest,
AgentStubFileUploadResponse,
)
from dify_agent.agent_stub.server.agent_stub_drive import AgentStubDriveRequestError, AgentStubDriveRequestHandler
from dify_agent.agent_stub.server.agent_stub_files import AgentStubFileRequestError, AgentStubFileRequestHandler
from dify_agent.agent_stub.server.tokens.agent_stub import AgentStubPrincipal, AgentStubTokenCodec, AgentStubTokenError
@ -43,11 +47,12 @@ class AgentStubControlPlaneService:
HTTP and gRPC adapters validate or decode transport payloads before calling
this service, so this layer focuses only on shared auth, connection-id
generation, and file-request delegation.
generation, plus file and drive request delegation.
"""
token_codec: AgentStubTokenCodec | None
file_request_handler: AgentStubFileRequestHandler | None = None
drive_request_handler: AgentStubDriveRequestHandler | None = None
connection_id_factory: Callable[[], str] = field(default=lambda: str(uuid4()))
async def connect(self, *, authorization: str | None) -> AgentStubConnectResponse:
@ -83,6 +88,39 @@ class AgentStubControlPlaneService:
except AgentStubFileRequestError as exc:
raise AgentStubControlPlaneError(exc.status_code, exc.detail) from exc
async def get_drive_manifest(
self,
*,
prefix: str,
include_download_url: bool,
authorization: str | None,
) -> AgentStubDriveManifestResponse:
"""Authenticate and delegate one drive manifest request."""
principal = self._authenticate(authorization)
handler = self._require_drive_request_handler()
try:
return await handler.get_manifest(
principal=principal,
prefix=prefix,
include_download_url=include_download_url,
)
except AgentStubDriveRequestError as exc:
raise AgentStubControlPlaneError(exc.status_code, exc.detail) from exc
async def commit_drive(
self,
*,
request: AgentStubDriveCommitRequest,
authorization: str | None,
) -> AgentStubDriveCommitResponse:
"""Authenticate and delegate one drive commit request."""
principal = self._authenticate(authorization)
handler = self._require_drive_request_handler()
try:
return await handler.commit(principal=principal, request=request)
except AgentStubDriveRequestError as exc:
raise AgentStubControlPlaneError(exc.status_code, exc.detail) from exc
def _authenticate(self, authorization: str | None) -> AgentStubPrincipal:
token_codec = self.token_codec
if token_codec is None:
@ -97,6 +135,11 @@ class AgentStubControlPlaneService:
raise AgentStubConfigurationError(503, "Agent Stub file API is not configured")
return self.file_request_handler
def _require_drive_request_handler(self) -> AgentStubDriveRequestHandler:
if self.drive_request_handler is None:
raise AgentStubConfigurationError(503, "Agent Stub drive API is not configured")
return self.drive_request_handler
__all__ = [
"AgentStubAuthenticationError",

View File

@ -1,17 +1,18 @@
"""Embeddable router factory for Dify Agent stub endpoints.
Both the standalone stub server and the standard run server mount the same
router so the Agent Stub protocol, token validation, and file-control-plane
behavior stay identical regardless of hosting mode. The factory is intentionally
settings-agnostic: callers must pass already constructed token-codec and file
handler dependencies rather than having this module read environment variables
or import server settings directly.
router so the Agent Stub protocol, token validation, and file/drive
control-plane behavior stay identical regardless of hosting mode. The factory is
intentionally settings-agnostic: callers must pass already constructed
token-codec and request-handler dependencies rather than having this module read
environment variables or import server settings directly.
"""
from __future__ import annotations
from fastapi import APIRouter
from dify_agent.agent_stub.server.agent_stub_drive import AgentStubDriveRequestHandler
from dify_agent.agent_stub.server.agent_stub_files import AgentStubFileRequestHandler
from dify_agent.agent_stub.server.routes.agent_stub import create_agent_stub_http_router
from dify_agent.agent_stub.server.tokens.agent_stub import AgentStubTokenCodec
@ -21,9 +22,10 @@ def create_agent_stub_router(
*,
token_codec: AgentStubTokenCodec | None,
file_request_handler: AgentStubFileRequestHandler | None = None,
drive_request_handler: AgentStubDriveRequestHandler | None = None,
) -> APIRouter:
"""Build the embeddable stub router from pre-built server dependencies."""
return create_agent_stub_http_router(token_codec, file_request_handler)
return create_agent_stub_http_router(token_codec, file_request_handler, drive_request_handler)
__all__ = ["create_agent_stub_router"]

View File

@ -2,8 +2,8 @@
The router is a thin HTTP adapter around ``AgentStubControlPlaneService``. It
keeps FastAPI-specific request parsing and HTTPException translation here while
sharing auth, DTO validation, connection-id generation, and file delegation with
the gRPC transport.
sharing auth, DTO validation, connection-id generation, and file/drive
delegation with the gRPC transport.
"""
from __future__ import annotations
@ -13,11 +13,15 @@ from fastapi import APIRouter, Header, HTTPException
from dify_agent.agent_stub.protocol.agent_stub import (
AgentStubConnectRequest,
AgentStubConnectResponse,
AgentStubDriveCommitRequest,
AgentStubDriveCommitResponse,
AgentStubDriveManifestResponse,
AgentStubFileDownloadRequest,
AgentStubFileDownloadResponse,
AgentStubFileUploadRequest,
AgentStubFileUploadResponse,
)
from dify_agent.agent_stub.server.agent_stub_drive import AgentStubDriveRequestHandler
from dify_agent.agent_stub.server.agent_stub_files import AgentStubFileRequestHandler
from dify_agent.agent_stub.server.control_plane import AgentStubControlPlaneError, AgentStubControlPlaneService
from dify_agent.agent_stub.server.tokens.agent_stub import AgentStubTokenCodec
@ -26,10 +30,11 @@ from dify_agent.agent_stub.server.tokens.agent_stub import AgentStubTokenCodec
def create_agent_stub_http_router(
token_codec: AgentStubTokenCodec | None,
file_request_handler: AgentStubFileRequestHandler | None = None,
drive_request_handler: AgentStubDriveRequestHandler | None = None,
) -> APIRouter:
"""Create HTTP routes bound to the application's Agent Stub dependencies."""
router = APIRouter(prefix="/agent-stub", tags=["agent-stub"])
service = AgentStubControlPlaneService(token_codec, file_request_handler)
service = AgentStubControlPlaneService(token_codec, file_request_handler, drive_request_handler)
@router.post("/connections", response_model=AgentStubConnectResponse)
async def create_connection(
@ -62,6 +67,31 @@ def create_agent_stub_http_router(
except AgentStubControlPlaneError as exc:
raise HTTPException(status_code=exc.status_code, detail=exc.detail) from exc
@router.get("/drive/manifest", response_model=AgentStubDriveManifestResponse)
async def get_drive_manifest(
prefix: str = "",
include_download_url: bool = False,
authorization: str | None = Header(default=None, alias="Authorization"),
) -> AgentStubDriveManifestResponse:
try:
return await service.get_drive_manifest(
prefix=prefix,
include_download_url=include_download_url,
authorization=authorization,
)
except AgentStubControlPlaneError as exc:
raise HTTPException(status_code=exc.status_code, detail=exc.detail) from exc
@router.post("/drive/commit", response_model=AgentStubDriveCommitResponse)
async def commit_drive(
request: AgentStubDriveCommitRequest,
authorization: str | None = Header(default=None, alias="Authorization"),
) -> AgentStubDriveCommitResponse:
try:
return await service.commit_drive(request=request, authorization=authorization)
except AgentStubControlPlaneError as exc:
raise HTTPException(status_code=exc.status_code, detail=exc.detail) from exc
return router

View File

@ -46,7 +46,7 @@ from dify_agent.protocol import (
_ResponseModelT = TypeVar("_ResponseModelT", bound=BaseModel)
_TERMINAL_EVENT_TYPES = {"run_succeeded", "run_failed", "run_cancelled"}
_TERMINAL_RUN_STATUSES = {"succeeded", "failed", "cancelled"}
_FUNCTION_TOOL_RESULT_PAYLOAD_KEY: str | None = None
_function_tool_result_payload_key_cache: str | None = None
class DifyAgentClientError(RuntimeError):
@ -176,13 +176,13 @@ def _function_tool_result_payload_key() -> str:
during local development or rolling deploys, so the client normalizes the
remote frame into the local schema before Pydantic validation.
"""
global _FUNCTION_TOOL_RESULT_PAYLOAD_KEY
if _FUNCTION_TOOL_RESULT_PAYLOAD_KEY is not None:
return _FUNCTION_TOOL_RESULT_PAYLOAD_KEY
global _function_tool_result_payload_key_cache
if _function_tool_result_payload_key_cache is not None:
return _function_tool_result_payload_key_cache
parameters = list(inspect.signature(FunctionToolResultEvent).parameters)
_FUNCTION_TOOL_RESULT_PAYLOAD_KEY = "part" if parameters and parameters[0] == "part" else "result"
return _FUNCTION_TOOL_RESULT_PAYLOAD_KEY
_function_tool_result_payload_key_cache = "part" if parameters and parameters[0] == "part" else "result"
return _function_tool_result_payload_key_cache
def _normalize_run_event_payload_for_local_pydantic_ai(payload: Any) -> Any:

View File

@ -32,7 +32,7 @@ class DifyPluginLLMDeps(LayerDeps):
class DifyPluginLLMLayer(PlainLayer[DifyPluginLLMDeps, DifyPluginLLMLayerConfig]):
"""Layer that creates the Dify plugin-daemon Pydantic AI model."""
type_id: ClassVar[str] = DIFY_PLUGIN_LLM_LAYER_TYPE_ID
type_id: ClassVar[str | None] = DIFY_PLUGIN_LLM_LAYER_TYPE_ID
config: DifyPluginLLMLayerConfig

View File

@ -58,7 +58,7 @@ class DifyPluginToolsDeps(LayerDeps):
class DifyPluginToolsLayer(PlainLayer[DifyPluginToolsDeps, DifyPluginToolsLayerConfig]):
"""Layer that resolves Dify plugin tools into Pydantic AI tools."""
type_id: ClassVar[str] = DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID
type_id: ClassVar[str | None] = DIFY_PLUGIN_TOOLS_LAYER_TYPE_ID
config: DifyPluginToolsLayerConfig

View File

@ -21,7 +21,7 @@ from dify_agent.layers.drive.configs import DIFY_DRIVE_LAYER_TYPE_ID, DifyDriveL
class DifyDriveLayer(PlainLayer[NoLayerDeps, DifyDriveLayerConfig, EmptyRuntimeState]):
"""Config-only carrier of the drive Skills & Files manifest."""
type_id: ClassVar[str] = DIFY_DRIVE_LAYER_TYPE_ID
type_id: ClassVar[str | None] = DIFY_DRIVE_LAYER_TYPE_ID
config: DifyDriveLayerConfig

View File

@ -37,6 +37,7 @@ def create_app(settings: ServerSettings | None = None) -> FastAPI:
resolved_settings = settings or ServerSettings()
agent_stub_token_codec = resolved_settings.create_agent_stub_token_codec()
agent_stub_file_request_handler = resolved_settings.create_agent_stub_file_request_handler()
agent_stub_drive_request_handler = resolved_settings.create_agent_stub_drive_request_handler()
layer_providers = create_default_layer_providers(
plugin_daemon_url=resolved_settings.plugin_daemon_url,
plugin_daemon_api_key=resolved_settings.plugin_daemon_api_key,
@ -106,6 +107,7 @@ def create_app(settings: ServerSettings | None = None) -> FastAPI:
create_agent_stub_router(
token_codec=agent_stub_token_codec,
file_request_handler=agent_stub_file_request_handler,
drive_request_handler=agent_stub_drive_request_handler,
)
)
return app
@ -135,12 +137,7 @@ def create_dify_api_inner_http_client(settings: ServerSettings) -> httpx.AsyncCl
def _create_shared_http_client(settings: ServerSettings) -> httpx.AsyncClient:
"""Build one shared HTTP client using generic outbound timeout/pool settings."""
return httpx.AsyncClient(
timeout=httpx.Timeout(
connect=settings.outbound_http_connect_timeout,
read=settings.outbound_http_read_timeout,
write=settings.outbound_http_write_timeout,
pool=settings.outbound_http_pool_timeout,
),
timeout=settings.create_outbound_http_timeout(),
limits=httpx.Limits(
max_connections=settings.outbound_http_max_connections,
max_keepalive_connections=settings.outbound_http_max_keepalive_connections,

View File

@ -6,16 +6,19 @@ Dify API inner calls. Layers and Agenton providers do not own those clients, so
these settings are process resource limits rather than per-run lifecycle knobs.
Endpoint URLs and API keys stay service-specific. The Agent Stub also uses this
settings model directly: the public Agent Stub URL, server secret, optional gRPC
bind override, and optional Dify inner API file-request settings all live here
under the longstanding ``DIFY_AGENT_...`` environment-variable namespace.
bind override, and optional Dify inner API file/drive request settings all live
here under the longstanding ``DIFY_AGENT_...`` environment-variable namespace.
"""
import httpx
from typing import ClassVar
from pydantic import AnyHttpUrl, Field, TypeAdapter, field_validator, model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
from dify_agent.agent_stub.protocol.agent_stub import normalize_agent_stub_url, parse_agent_stub_endpoint
from dify_agent.agent_stub.server.agent_stub_drive import DifyApiAgentStubDriveRequestHandler
from dify_agent.agent_stub.server.agent_stub_files import DifyApiAgentStubFileRequestHandler
from dify_agent.agent_stub.server.grpc_bind import normalize_agent_stub_grpc_bind_address
from dify_agent.agent_stub.server.tokens.agent_stub import AgentStubTokenCodec, decode_server_secret_key
@ -145,5 +148,28 @@ class ServerSettings(BaseSettings):
dify_api_inner_api_key=self.dify_api_inner_api_key,
)
def create_agent_stub_drive_request_handler(self) -> DifyApiAgentStubDriveRequestHandler | None:
"""Return the Dify API drive bridge when both Dify API settings are configured.
Drive manifest and commit requests should honor the same outbound timeout
settings as the server's other trusted Dify API HTTP calls.
"""
if self.dify_api_base_url is None or self.dify_api_inner_api_key is None:
return None
return DifyApiAgentStubDriveRequestHandler(
dify_api_base_url=self.dify_api_base_url,
dify_api_inner_api_key=self.dify_api_inner_api_key,
timeout=self.create_outbound_http_timeout(),
)
def create_outbound_http_timeout(self) -> httpx.Timeout:
"""Build one shared outbound HTTP timeout object from server settings."""
return httpx.Timeout(
connect=self.outbound_http_connect_timeout,
read=self.outbound_http_read_timeout,
write=self.outbound_http_write_timeout,
pool=self.outbound_http_pool_timeout,
)
__all__ = ["DEFAULT_RUN_RETENTION_SECONDS", "ServerSettings"]

View File

@ -0,0 +1,651 @@
from __future__ import annotations
from io import BytesIO
from pathlib import Path
import stat
from zipfile import ZipFile, ZipInfo
import pytest
from dify_agent.agent_stub.cli._drive import (
list_drive_from_environment,
pull_drive_from_environment,
push_drive_from_environment,
)
from dify_agent.agent_stub.cli._files import UploadedToolFileMapping, UploadedToolFileResource
from dify_agent.agent_stub.client._errors import AgentStubTransferError, AgentStubValidationError
from dify_agent.agent_stub.protocol.agent_stub import (
AgentStubDriveCommitRequest,
AgentStubDriveCommitResponse,
AgentStubDriveItem,
AgentStubDriveManifestResponse,
)
def test_list_drive_from_environment_returns_manifest_json_model(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("DIFY_AGENT_STUB_URL", "https://agent.example.com/agent-stub")
monkeypatch.setenv("DIFY_AGENT_STUB_AUTH_JWE", "test-jwe")
captured: dict[str, object] = {}
def fake_manifest(**kwargs):
captured.update(kwargs)
return AgentStubDriveManifestResponse(
items=[
AgentStubDriveItem(
key="skills/example/SKILL.md",
size=12,
hash="sha256:abc",
mime_type="text/markdown",
file_kind="tool_file",
file_id="tool-file-1",
)
]
)
monkeypatch.setattr(
"dify_agent.agent_stub.cli._drive.request_agent_stub_drive_manifest_sync",
fake_manifest,
)
result = list_drive_from_environment(prefix="skills/", json_output=True)
assert isinstance(result, AgentStubDriveManifestResponse)
assert result.items[0].key == "skills/example/SKILL.md"
assert captured["prefix"] == "skills/"
assert captured["include_download_url"] is False
def test_list_drive_from_environment_returns_human_readable_listing(monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setenv("DIFY_AGENT_STUB_URL", "https://agent.example.com/agent-stub")
monkeypatch.setenv("DIFY_AGENT_STUB_AUTH_JWE", "test-jwe")
captured: dict[str, object] = {}
def fake_manifest(**kwargs):
captured.update(kwargs)
return AgentStubDriveManifestResponse(
items=[
AgentStubDriveItem(
key="skills/example/SKILL.md",
size=12,
hash=None,
mime_type="text/markdown",
file_kind="tool_file",
file_id="tool-file-1",
),
AgentStubDriveItem(
key="skills/example/helper.py",
size=None,
hash="sha256:abc",
mime_type=None,
file_kind="tool_file",
file_id="tool-file-2",
),
]
)
monkeypatch.setattr(
"dify_agent.agent_stub.cli._drive.request_agent_stub_drive_manifest_sync",
fake_manifest,
)
result = list_drive_from_environment(prefix="skills/", json_output=False)
assert result == ("12\ttext/markdown\t-\tskills/example/SKILL.md\n-\t-\tsha256:abc\tskills/example/helper.py")
assert captured["prefix"] == "skills/"
assert captured["include_download_url"] is False
def test_pull_drive_from_environment_writes_files_under_drive_base(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
monkeypatch.setenv("DIFY_AGENT_STUB_URL", "https://agent.example.com/agent-stub")
monkeypatch.setenv("DIFY_AGENT_STUB_AUTH_JWE", "test-jwe")
captured: dict[str, object] = {}
def fake_manifest(**kwargs):
captured.update(kwargs)
return AgentStubDriveManifestResponse(
items=[
AgentStubDriveItem(
key="skills/example/SKILL.md",
size=11,
hash=None,
mime_type="text/markdown",
file_kind="tool_file",
file_id="tool-file-1",
download_url="https://files.example.com/download",
)
]
)
monkeypatch.setattr(
"dify_agent.agent_stub.cli._drive.request_agent_stub_drive_manifest_sync",
fake_manifest,
)
monkeypatch.setattr(
"dify_agent.agent_stub.cli._drive.download_file_bytes_from_signed_url_sync",
lambda **_kwargs: b"hello world",
)
results = pull_drive_from_environment(prefix="skills/", drive_base=str(tmp_path))
assert results == [tmp_path / "skills" / "example" / "SKILL.md"]
assert results[0].read_bytes() == b"hello world"
assert captured["prefix"] == "skills/"
assert captured["include_download_url"] is True
def test_pull_drive_from_environment_auto_extracts_skill_archive(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
archive_buffer = BytesIO()
with ZipFile(archive_buffer, mode="w") as archive:
archive.writestr("SKILL.md", "# Example\n")
archive.writestr("nested/helper.py", "print('x')\n")
archive_bytes = archive_buffer.getvalue()
monkeypatch.setenv("DIFY_AGENT_STUB_URL", "https://agent.example.com/agent-stub")
monkeypatch.setenv("DIFY_AGENT_STUB_AUTH_JWE", "test-jwe")
monkeypatch.setattr(
"dify_agent.agent_stub.cli._drive.request_agent_stub_drive_manifest_sync",
lambda **_kwargs: AgentStubDriveManifestResponse(
items=[
AgentStubDriveItem(
key="skills/foo/.DIFY-SKILL-FULL.zip",
size=len(archive_bytes),
hash=None,
mime_type="application/zip",
file_kind="tool_file",
file_id="tool-file-1",
download_url="https://files.example.com/download",
)
]
),
)
monkeypatch.setattr(
"dify_agent.agent_stub.cli._drive.download_file_bytes_from_signed_url_sync",
lambda **_kwargs: archive_bytes,
)
results = pull_drive_from_environment(prefix="skills/foo", drive_base=str(tmp_path))
archive_path = tmp_path / "skills" / "foo" / ".DIFY-SKILL-FULL.zip"
assert results == [archive_path]
assert archive_path.read_bytes() == archive_bytes
assert (tmp_path / "skills" / "foo" / "SKILL.md").read_text(encoding="utf-8") == "# Example\n"
assert (tmp_path / "skills" / "foo" / "nested" / "helper.py").read_text(encoding="utf-8") == "print('x')\n"
def test_pull_drive_from_environment_rejects_traversal_keys(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
monkeypatch.setenv("DIFY_AGENT_STUB_URL", "https://agent.example.com/agent-stub")
monkeypatch.setenv("DIFY_AGENT_STUB_AUTH_JWE", "test-jwe")
monkeypatch.setattr(
"dify_agent.agent_stub.cli._drive.request_agent_stub_drive_manifest_sync",
lambda **_kwargs: AgentStubDriveManifestResponse(
items=[
AgentStubDriveItem(
key="../escape.txt",
size=4,
hash=None,
mime_type="text/plain",
file_kind="tool_file",
file_id="tool-file-1",
download_url="https://files.example.com/download",
)
]
),
)
with pytest.raises(AgentStubValidationError, match="outside the drive base"):
_ = pull_drive_from_environment(prefix="", drive_base=str(tmp_path))
def test_pull_drive_from_environment_rejects_skill_archive_path_traversal(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
archive_buffer = BytesIO()
with ZipFile(archive_buffer, mode="w") as archive:
archive.writestr("SKILL.md", "# Example\n")
archive.writestr("../escape.txt", "escape")
archive_bytes = archive_buffer.getvalue()
monkeypatch.setenv("DIFY_AGENT_STUB_URL", "https://agent.example.com/agent-stub")
monkeypatch.setenv("DIFY_AGENT_STUB_AUTH_JWE", "test-jwe")
monkeypatch.setattr(
"dify_agent.agent_stub.cli._drive.request_agent_stub_drive_manifest_sync",
lambda **_kwargs: AgentStubDriveManifestResponse(
items=[
AgentStubDriveItem(
key="skills/foo/.DIFY-SKILL-FULL.zip",
size=len(archive_bytes),
hash=None,
mime_type="application/zip",
file_kind="tool_file",
file_id="tool-file-1",
download_url="https://files.example.com/download",
)
]
),
)
monkeypatch.setattr(
"dify_agent.agent_stub.cli._drive.download_file_bytes_from_signed_url_sync",
lambda **_kwargs: archive_bytes,
)
with pytest.raises(AgentStubValidationError, match="path traversal"):
_ = pull_drive_from_environment(prefix="skills/foo", drive_base=str(tmp_path))
assert not (tmp_path / "skills" / "foo" / "SKILL.md").exists()
def test_pull_drive_from_environment_rejects_skill_archive_absolute_entry(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
archive_buffer = BytesIO()
with ZipFile(archive_buffer, mode="w") as archive:
archive.writestr("/escape.txt", "escape")
archive_bytes = archive_buffer.getvalue()
monkeypatch.setenv("DIFY_AGENT_STUB_URL", "https://agent.example.com/agent-stub")
monkeypatch.setenv("DIFY_AGENT_STUB_AUTH_JWE", "test-jwe")
monkeypatch.setattr(
"dify_agent.agent_stub.cli._drive.request_agent_stub_drive_manifest_sync",
lambda **_kwargs: AgentStubDriveManifestResponse(
items=[
AgentStubDriveItem(
key="skills/foo/.DIFY-SKILL-FULL.zip",
size=len(archive_bytes),
hash=None,
mime_type="application/zip",
file_kind="tool_file",
file_id="tool-file-1",
download_url="https://files.example.com/download",
)
]
),
)
monkeypatch.setattr(
"dify_agent.agent_stub.cli._drive.download_file_bytes_from_signed_url_sync",
lambda **_kwargs: archive_bytes,
)
with pytest.raises(AgentStubValidationError, match="absolute path"):
_ = pull_drive_from_environment(prefix="skills/foo", drive_base=str(tmp_path))
def test_pull_drive_from_environment_rejects_skill_archive_symlink_entry(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
archive_buffer = BytesIO()
with ZipFile(archive_buffer, mode="w") as archive:
symlink_info = ZipInfo("linked.txt")
symlink_info.external_attr = (stat.S_IFLNK | 0o777) << 16
archive.writestr(symlink_info, "outside.txt")
archive_bytes = archive_buffer.getvalue()
monkeypatch.setenv("DIFY_AGENT_STUB_URL", "https://agent.example.com/agent-stub")
monkeypatch.setenv("DIFY_AGENT_STUB_AUTH_JWE", "test-jwe")
monkeypatch.setattr(
"dify_agent.agent_stub.cli._drive.request_agent_stub_drive_manifest_sync",
lambda **_kwargs: AgentStubDriveManifestResponse(
items=[
AgentStubDriveItem(
key="skills/foo/.DIFY-SKILL-FULL.zip",
size=len(archive_bytes),
hash=None,
mime_type="application/zip",
file_kind="tool_file",
file_id="tool-file-1",
download_url="https://files.example.com/download",
)
]
),
)
monkeypatch.setattr(
"dify_agent.agent_stub.cli._drive.download_file_bytes_from_signed_url_sync",
lambda **_kwargs: archive_bytes,
)
with pytest.raises(AgentStubValidationError, match="symlink entry"):
_ = pull_drive_from_environment(prefix="skills/foo", drive_base=str(tmp_path))
def test_pull_drive_from_environment_rejects_invalid_skill_archive(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
archive_bytes = b"not-a-zip"
monkeypatch.setenv("DIFY_AGENT_STUB_URL", "https://agent.example.com/agent-stub")
monkeypatch.setenv("DIFY_AGENT_STUB_AUTH_JWE", "test-jwe")
monkeypatch.setattr(
"dify_agent.agent_stub.cli._drive.request_agent_stub_drive_manifest_sync",
lambda **_kwargs: AgentStubDriveManifestResponse(
items=[
AgentStubDriveItem(
key="skills/foo/.DIFY-SKILL-FULL.zip",
size=len(archive_bytes),
hash=None,
mime_type="application/zip",
file_kind="tool_file",
file_id="tool-file-1",
download_url="https://files.example.com/download",
)
]
),
)
monkeypatch.setattr(
"dify_agent.agent_stub.cli._drive.download_file_bytes_from_signed_url_sync",
lambda **_kwargs: archive_bytes,
)
with pytest.raises(AgentStubTransferError, match="downloaded skill archive is invalid"):
_ = pull_drive_from_environment(prefix="skills/foo", drive_base=str(tmp_path))
def test_pull_drive_from_environment_rejects_missing_download_url(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
monkeypatch.setenv("DIFY_AGENT_STUB_URL", "https://agent.example.com/agent-stub")
monkeypatch.setenv("DIFY_AGENT_STUB_AUTH_JWE", "test-jwe")
monkeypatch.setattr(
"dify_agent.agent_stub.cli._drive.request_agent_stub_drive_manifest_sync",
lambda **_kwargs: AgentStubDriveManifestResponse(
items=[
AgentStubDriveItem(
key="skills/example/SKILL.md",
size=11,
hash=None,
mime_type="text/markdown",
file_kind="tool_file",
file_id="tool-file-1",
)
]
),
)
with pytest.raises(AgentStubValidationError, match="missing download_url"):
_ = pull_drive_from_environment(prefix="skills/", drive_base=str(tmp_path))
def test_pull_drive_from_environment_rejects_size_mismatch(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
monkeypatch.setenv("DIFY_AGENT_STUB_URL", "https://agent.example.com/agent-stub")
monkeypatch.setenv("DIFY_AGENT_STUB_AUTH_JWE", "test-jwe")
monkeypatch.setattr(
"dify_agent.agent_stub.cli._drive.request_agent_stub_drive_manifest_sync",
lambda **_kwargs: AgentStubDriveManifestResponse(
items=[
AgentStubDriveItem(
key="skills/example/SKILL.md",
size=99,
hash=None,
mime_type="text/markdown",
file_kind="tool_file",
file_id="tool-file-1",
download_url="https://files.example.com/download",
)
]
),
)
monkeypatch.setattr(
"dify_agent.agent_stub.cli._drive.download_file_bytes_from_signed_url_sync",
lambda **_kwargs: b"hello world",
)
with pytest.raises(AgentStubTransferError, match="size mismatch"):
_ = pull_drive_from_environment(prefix="skills/", drive_base=str(tmp_path))
def test_push_drive_from_environment_commits_single_file(monkeypatch: pytest.MonkeyPatch, tmp_path: Path) -> None:
source = tmp_path / "report.pdf"
source.write_bytes(b"report")
monkeypatch.setenv("DIFY_AGENT_STUB_URL", "https://agent.example.com/agent-stub")
monkeypatch.setenv("DIFY_AGENT_STUB_AUTH_JWE", "test-jwe")
monkeypatch.setattr(
"dify_agent.agent_stub.cli._drive.upload_tool_file_resource_from_environment",
lambda *, path: UploadedToolFileResource(
mapping=UploadedToolFileMapping(reference="dify-file-ref:tool-file-1"),
tool_file_id="tool-file-1",
),
)
captured: dict[str, object] = {}
def fake_commit(**kwargs):
captured.update(kwargs)
return AgentStubDriveCommitResponse(
items=[
AgentStubDriveItem(
key="files/report.pdf",
size=6,
hash=None,
mime_type="application/pdf",
file_kind="tool_file",
file_id="tool-file-1",
value_owned_by_drive=True,
)
]
)
monkeypatch.setattr("dify_agent.agent_stub.cli._drive.request_agent_stub_drive_commit_sync", fake_commit)
response = push_drive_from_environment(local_path=str(source), drive_path="files/report.pdf", recursive=False)
assert response.items[0].key == "files/report.pdf"
request = captured["request"]
assert isinstance(request, AgentStubDriveCommitRequest)
assert request.items[0].model_dump(mode="json") == {
"key": "files/report.pdf",
"file_ref": {"kind": "tool_file", "id": "tool-file-1"},
"value_owned_by_drive": True,
}
def test_push_drive_from_environment_requires_skill_md_for_non_recursive_directory(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
skill_dir = tmp_path / "skill"
skill_dir.mkdir()
monkeypatch.setenv("DIFY_AGENT_STUB_URL", "https://agent.example.com/agent-stub")
monkeypatch.setenv("DIFY_AGENT_STUB_AUTH_JWE", "test-jwe")
with pytest.raises(AgentStubValidationError, match="SKILL.md"):
_ = push_drive_from_environment(local_path=str(skill_dir), drive_path="skills/example", recursive=False)
def test_push_drive_from_environment_standardizes_non_recursive_skill_directory(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
skill_dir = tmp_path / "skill"
skill_dir.mkdir()
(skill_dir / "SKILL.md").write_text("# Example\n", encoding="utf-8")
(skill_dir / "helper.py").write_text("print('x')\n", encoding="utf-8")
monkeypatch.setenv("DIFY_AGENT_STUB_URL", "https://agent.example.com/agent-stub")
monkeypatch.setenv("DIFY_AGENT_STUB_AUTH_JWE", "test-jwe")
uploaded_paths: list[str] = []
def fake_upload(*, path: str) -> UploadedToolFileResource:
uploaded_paths.append(Path(path).name)
return UploadedToolFileResource(
mapping=UploadedToolFileMapping(reference=f"dify-file-ref:{Path(path).name}"),
tool_file_id=Path(path).name,
)
monkeypatch.setattr("dify_agent.agent_stub.cli._drive.upload_tool_file_resource_from_environment", fake_upload)
monkeypatch.setattr(
"dify_agent.agent_stub.cli._drive.request_agent_stub_drive_commit_sync",
lambda **kwargs: AgentStubDriveCommitResponse(
items=[
AgentStubDriveItem(
key=item.key,
size=None,
hash=None,
mime_type=None,
file_kind=item.file_ref.kind,
file_id=item.file_ref.id,
value_owned_by_drive=item.value_owned_by_drive,
)
for item in kwargs["request"].items
]
),
)
response = push_drive_from_environment(local_path=str(skill_dir), drive_path="skills/example", recursive=False)
assert set(uploaded_paths) == {"SKILL.md", ".DIFY-SKILL-FULL.zip"}
assert {item.key for item in response.items} == {
"skills/example/SKILL.md",
"skills/example/.DIFY-SKILL-FULL.zip",
}
def test_push_drive_from_environment_non_recursive_archive_excludes_transient_entries(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
skill_dir = tmp_path / "skill"
skill_dir.mkdir()
(skill_dir / "SKILL.md").write_text("# Example\n", encoding="utf-8")
(skill_dir / "helper.py").write_text("print('x')\n", encoding="utf-8")
(skill_dir / ".DIFY-SKILL-FULL.zip").write_bytes(b"old-archive")
git_dir = skill_dir / ".git"
git_dir.mkdir()
(git_dir / "config").write_text("[core]\n", encoding="utf-8")
pycache_dir = skill_dir / "__pycache__"
pycache_dir.mkdir()
(pycache_dir / "helper.pyc").write_bytes(b"compiled")
monkeypatch.setenv("DIFY_AGENT_STUB_URL", "https://agent.example.com/agent-stub")
monkeypatch.setenv("DIFY_AGENT_STUB_AUTH_JWE", "test-jwe")
archive_entries: list[str] = []
def fake_upload(*, path: str) -> UploadedToolFileResource:
if Path(path).name == ".DIFY-SKILL-FULL.zip":
with ZipFile(path) as archive:
archive_entries.extend(sorted(archive.namelist()))
return UploadedToolFileResource(
mapping=UploadedToolFileMapping(reference=f"dify-file-ref:{Path(path).name}"),
tool_file_id=Path(path).name,
)
monkeypatch.setattr("dify_agent.agent_stub.cli._drive.upload_tool_file_resource_from_environment", fake_upload)
monkeypatch.setattr(
"dify_agent.agent_stub.cli._drive.request_agent_stub_drive_commit_sync",
lambda **kwargs: AgentStubDriveCommitResponse(
items=[
AgentStubDriveItem(
key=item.key,
size=None,
hash=None,
mime_type=None,
file_kind=item.file_ref.kind,
file_id=item.file_ref.id,
value_owned_by_drive=item.value_owned_by_drive,
)
for item in kwargs["request"].items
]
),
)
_ = push_drive_from_environment(local_path=str(skill_dir), drive_path="skills/example", recursive=False)
assert {"SKILL.md", "helper.py"}.issubset(archive_entries)
assert ".git/config" not in archive_entries
assert "__pycache__/helper.pyc" not in archive_entries
assert ".DIFY-SKILL-FULL.zip" not in archive_entries
def test_push_drive_from_environment_non_recursive_rejects_symlinked_archive_entries(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
skill_dir = tmp_path / "skill"
skill_dir.mkdir()
(skill_dir / "SKILL.md").write_text("# Example\n", encoding="utf-8")
outside = tmp_path / "outside.txt"
outside.write_text("outside", encoding="utf-8")
(skill_dir / "linked.txt").symlink_to(outside)
monkeypatch.setenv("DIFY_AGENT_STUB_URL", "https://agent.example.com/agent-stub")
monkeypatch.setenv("DIFY_AGENT_STUB_AUTH_JWE", "test-jwe")
with pytest.raises(AgentStubValidationError, match="symlink"):
_ = push_drive_from_environment(local_path=str(skill_dir), drive_path="skills/example", recursive=False)
def test_push_drive_from_environment_rejects_symlinked_recursive_files(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
root = tmp_path / "skill"
root.mkdir()
outside = tmp_path / "outside.txt"
outside.write_text("outside", encoding="utf-8")
(root / "linked.txt").symlink_to(outside)
monkeypatch.setenv("DIFY_AGENT_STUB_URL", "https://agent.example.com/agent-stub")
monkeypatch.setenv("DIFY_AGENT_STUB_AUTH_JWE", "test-jwe")
with pytest.raises(AgentStubValidationError, match="symlink"):
_ = push_drive_from_environment(local_path=str(root), drive_path="skills/example", recursive=True)
def test_push_drive_from_environment_recursive_keeps_user_files_that_skill_packaging_skips(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
root = tmp_path / "skill"
root.mkdir()
(root / ".DIFY-SKILL-FULL.zip").write_bytes(b"archive")
node_modules_dir = root / "node_modules"
node_modules_dir.mkdir()
(node_modules_dir / "module.js").write_text("export default 1\n", encoding="utf-8")
monkeypatch.setenv("DIFY_AGENT_STUB_URL", "https://agent.example.com/agent-stub")
monkeypatch.setenv("DIFY_AGENT_STUB_AUTH_JWE", "test-jwe")
uploaded_paths: list[str] = []
def fake_upload(*, path: str) -> UploadedToolFileResource:
uploaded_paths.append(Path(path).relative_to(root).as_posix())
return UploadedToolFileResource(
mapping=UploadedToolFileMapping(reference=f"dify-file-ref:{Path(path).name}"),
tool_file_id=Path(path).name,
)
monkeypatch.setattr("dify_agent.agent_stub.cli._drive.upload_tool_file_resource_from_environment", fake_upload)
monkeypatch.setattr(
"dify_agent.agent_stub.cli._drive.request_agent_stub_drive_commit_sync",
lambda **kwargs: AgentStubDriveCommitResponse(
items=[
AgentStubDriveItem(
key=item.key,
size=None,
hash=None,
mime_type=None,
file_kind=item.file_ref.kind,
file_id=item.file_ref.id,
value_owned_by_drive=item.value_owned_by_drive,
)
for item in kwargs["request"].items
]
),
)
response = push_drive_from_environment(local_path=str(root), drive_path="skills/example", recursive=True)
assert set(uploaded_paths) == {".DIFY-SKILL-FULL.zip", "node_modules/module.js"}
assert {item.key for item in response.items} == {
"skills/example/.DIFY-SKILL-FULL.zip",
"skills/example/node_modules/module.js",
}

View File

@ -6,7 +6,11 @@ from pathlib import Path
import pytest
from dify_agent.agent_stub.cli._files import download_file_from_environment, upload_file_from_environment
from dify_agent.agent_stub.cli._files import (
download_file_from_environment,
upload_file_from_environment,
upload_tool_file_resource_from_environment,
)
from dify_agent.agent_stub.client._errors import AgentStubTransferError
@ -36,6 +40,7 @@ def test_upload_file_from_environment_requests_signed_url_and_normalizes_output(
captured["file_bytes"] = kwargs["file_obj"].read()
kwargs["file_obj"].seek(0)
return {
"id": "tool-file-1",
"reference": _reference("tool-file-1"),
}
@ -57,6 +62,33 @@ def test_upload_file_from_environment_requests_signed_url_and_normalizes_output(
}
def test_upload_tool_file_resource_from_environment_preserves_tool_file_id(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
source = tmp_path / "report.pdf"
source.write_bytes(b"report-bytes")
monkeypatch.setenv("DIFY_AGENT_STUB_URL", "https://agent.example.com/agent-stub")
monkeypatch.setenv("DIFY_AGENT_STUB_AUTH_JWE", "test-jwe")
monkeypatch.setattr(
"dify_agent.agent_stub.cli._files.request_agent_stub_file_upload_sync",
lambda **_kwargs: type("Response", (), {"upload_url": "https://files.example.com/upload"})(),
)
monkeypatch.setattr(
"dify_agent.agent_stub.cli._files.upload_file_to_signed_url_sync",
lambda **_kwargs: {"id": "tool-file-1", "reference": _reference("tool-file-1")},
)
result = upload_tool_file_resource_from_environment(path=str(source))
assert result.mapping.model_dump() == {
"transfer_method": "tool_file",
"reference": _reference("tool-file-1"),
}
assert result.tool_file_id == "tool-file-1"
def test_download_file_from_environment_saves_bytes_and_renames_on_collision(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
@ -148,8 +180,30 @@ def test_upload_file_from_environment_rejects_non_canonical_reference(
)
monkeypatch.setattr(
"dify_agent.agent_stub.cli._files.upload_file_to_signed_url_sync",
lambda **_kwargs: {"reference": "raw-tool-file-uuid"},
lambda **_kwargs: {"id": "tool-file-1", "reference": "raw-tool-file-uuid"},
)
with pytest.raises(AgentStubTransferError, match="invalid canonical reference"):
_ = upload_file_from_environment(path=str(source))
def test_upload_tool_file_resource_from_environment_rejects_missing_id(
monkeypatch: pytest.MonkeyPatch,
tmp_path: Path,
) -> None:
source = tmp_path / "report.pdf"
source.write_bytes(b"report-bytes")
monkeypatch.setenv("DIFY_AGENT_STUB_URL", "https://agent.example.com/agent-stub")
monkeypatch.setenv("DIFY_AGENT_STUB_AUTH_JWE", "test-jwe")
monkeypatch.setattr(
"dify_agent.agent_stub.cli._files.request_agent_stub_file_upload_sync",
lambda **_kwargs: type("Response", (), {"upload_url": "https://files.example.com/upload"})(),
)
monkeypatch.setattr(
"dify_agent.agent_stub.cli._files.upload_file_to_signed_url_sync",
lambda **_kwargs: {"reference": _reference("tool-file-1")},
)
with pytest.raises(AgentStubTransferError, match="missing id"):
_ = upload_tool_file_resource_from_environment(path=str(source))

View File

@ -7,6 +7,11 @@ from pathlib import Path
import pytest
from dify_agent.agent_stub.cli.main import main
from dify_agent.agent_stub.protocol.agent_stub import (
AgentStubDriveCommitResponse,
AgentStubDriveItem,
AgentStubDriveManifestResponse,
)
from dify_agent.agent_stub.protocol.agent_stub import AgentStubConnectResponse
@ -194,3 +199,97 @@ def test_cli_file_download_prints_saved_path(
captured = capsys.readouterr()
assert exc_info.value.code == 0
assert captured.out.strip() == "/tmp/report.pdf"
def test_cli_drive_list_prints_manifest_json(
monkeypatch: pytest.MonkeyPatch,
capsys: pytest.CaptureFixture[str],
) -> None:
monkeypatch.setattr(
"dify_agent.agent_stub.cli.main.list_drive_from_environment",
lambda *, prefix, json_output: AgentStubDriveManifestResponse(
items=[
AgentStubDriveItem(
key=prefix + "example/SKILL.md",
size=12,
hash="sha256:abc",
mime_type="text/markdown",
file_kind="tool_file",
file_id="tool-file-1",
)
]
),
)
with pytest.raises(SystemExit) as exc_info:
main(["drive", "list", "skills/", "--json"])
captured = capsys.readouterr()
assert exc_info.value.code == 0
assert json.loads(captured.out)["items"][0]["key"] == "skills/example/SKILL.md"
def test_cli_drive_list_prints_human_readable_listing(
monkeypatch: pytest.MonkeyPatch,
capsys: pytest.CaptureFixture[str],
) -> None:
monkeypatch.setattr(
"dify_agent.agent_stub.cli.main.list_drive_from_environment",
lambda *, prefix, json_output: f"12\ttext/markdown\t-\t{prefix}example/SKILL.md",
)
with pytest.raises(SystemExit) as exc_info:
main(["drive", "list", "skills/"])
captured = capsys.readouterr()
assert exc_info.value.code == 0
assert captured.out.strip() == "12\ttext/markdown\t-\tskills/example/SKILL.md"
def test_cli_drive_pull_prints_downloaded_paths(
monkeypatch: pytest.MonkeyPatch,
capsys: pytest.CaptureFixture[str],
) -> None:
monkeypatch.setattr(
"dify_agent.agent_stub.cli.main.pull_drive_from_environment",
lambda *, prefix, drive_base: [Path(drive_base) / prefix / "SKILL.md", Path(drive_base) / prefix / "helper.py"],
)
with pytest.raises(SystemExit) as exc_info:
main(["drive", "pull", "skills/example", "--drive-base", "/tmp/drive"])
captured = capsys.readouterr()
assert exc_info.value.code == 0
assert captured.out.strip().splitlines() == [
"/tmp/drive/skills/example/SKILL.md",
"/tmp/drive/skills/example/helper.py",
]
def test_cli_drive_push_prints_commit_json(
monkeypatch: pytest.MonkeyPatch,
capsys: pytest.CaptureFixture[str],
) -> None:
monkeypatch.setattr(
"dify_agent.agent_stub.cli.main.push_drive_from_environment",
lambda *, local_path, drive_path, recursive: AgentStubDriveCommitResponse(
items=[
AgentStubDriveItem(
key=drive_path,
size=12,
hash=None,
mime_type="text/markdown",
file_kind="tool_file",
file_id=Path(local_path).name,
value_owned_by_drive=recursive is False,
)
]
),
)
with pytest.raises(SystemExit) as exc_info:
main(["drive", "push", "/tmp/report.md", "skills/example/SKILL.md"])
captured = capsys.readouterr()
assert exc_info.value.code == 0
assert json.loads(captured.out)["items"][0]["key"] == "skills/example/SKILL.md"

View File

@ -11,6 +11,8 @@ import pytest
from dify_agent.agent_stub.client._agent_stub import (
connect_agent_stub_sync,
download_file_bytes_from_signed_url_sync,
request_agent_stub_drive_commit_sync,
request_agent_stub_drive_manifest_sync,
request_agent_stub_file_download_sync,
request_agent_stub_file_upload_sync,
upload_file_to_signed_url_sync,
@ -23,7 +25,12 @@ from dify_agent.agent_stub.client._errors import (
AgentStubTransferError,
AgentStubValidationError,
)
from dify_agent.agent_stub.protocol.agent_stub import AgentStubFileMapping
from dify_agent.agent_stub.protocol.agent_stub import (
AgentStubDriveCommitItem,
AgentStubDriveCommitRequest,
AgentStubDriveFileRef,
AgentStubFileMapping,
)
def _reference(record_id: str) -> str:
@ -174,6 +181,140 @@ def test_request_agent_stub_file_download_sync_posts_download_request() -> None:
assert response.download_url == "https://files.example.com/download"
def test_request_agent_stub_drive_manifest_sync_gets_manifest_request() -> None:
def handler(request: httpx.Request) -> httpx.Response:
assert request.method == "GET"
assert str(request.url) == (
"https://agent.example.com/agent-stub/drive/manifest?prefix=skills%2F&include_download_url=true"
)
assert request.headers["Authorization"] == "Bearer test-jwe"
return httpx.Response(
200,
json={
"items": [
{
"key": "skills/example/SKILL.md",
"size": 12,
"hash": "sha256:abc",
"mime_type": "text/markdown",
"file_kind": "tool_file",
"file_id": "tool-file-1",
"created_at": 123,
}
]
},
)
http_client = httpx.Client(transport=httpx.MockTransport(handler))
try:
response = request_agent_stub_drive_manifest_sync(
url="https://agent.example.com/agent-stub",
auth_jwe="test-jwe",
prefix="skills/",
include_download_url=True,
sync_http_client=http_client,
)
finally:
http_client.close()
assert response.items[0].key == "skills/example/SKILL.md"
def test_request_agent_stub_drive_commit_sync_posts_commit_request() -> None:
def handler(request: httpx.Request) -> httpx.Response:
assert request.method == "POST"
assert str(request.url) == "https://agent.example.com/agent-stub/drive/commit"
assert request.headers["Authorization"] == "Bearer test-jwe"
assert json.loads(request.content) == {
"items": [
{
"key": "skills/example/SKILL.md",
"file_ref": {"kind": "tool_file", "id": "tool-file-1"},
"value_owned_by_drive": True,
}
]
}
return httpx.Response(
200,
json={
"items": [
{
"key": "skills/example/SKILL.md",
"size": 12,
"mime_type": "text/markdown",
"file_kind": "tool_file",
"file_id": "tool-file-1",
"value_owned_by_drive": True,
}
]
},
)
http_client = httpx.Client(transport=httpx.MockTransport(handler))
try:
response = request_agent_stub_drive_commit_sync(
url="https://agent.example.com/agent-stub",
auth_jwe="test-jwe",
request=AgentStubDriveCommitRequest(
items=[
AgentStubDriveCommitItem(
key="skills/example/SKILL.md",
file_ref=AgentStubDriveFileRef(kind="tool_file", id="tool-file-1"),
)
]
),
sync_http_client=http_client,
)
finally:
http_client.close()
assert response.items[0].file_id == "tool-file-1"
def test_request_agent_stub_drive_manifest_sync_maps_invalid_json_to_client_error() -> None:
def handler(_request: httpx.Request) -> httpx.Response:
return httpx.Response(200, text="not-json", headers={"Content-Type": "application/json"})
http_client = httpx.Client(transport=httpx.MockTransport(handler))
try:
with pytest.raises(AgentStubClientError, match="invalid JSON"):
_ = request_agent_stub_drive_manifest_sync(
url="https://agent.example.com/agent-stub",
auth_jwe="test-jwe",
prefix="",
include_download_url=False,
sync_http_client=http_client,
)
finally:
http_client.close()
def test_request_agent_stub_drive_commit_sync_maps_non_2xx_to_http_error() -> None:
def handler(_request: httpx.Request) -> httpx.Response:
return httpx.Response(403, json={"detail": "forbidden"})
http_client = httpx.Client(transport=httpx.MockTransport(handler))
try:
with pytest.raises(AgentStubHTTPError, match="403") as exc_info:
_ = request_agent_stub_drive_commit_sync(
url="https://agent.example.com/agent-stub",
auth_jwe="test-jwe",
request=AgentStubDriveCommitRequest(
items=[
AgentStubDriveCommitItem(
key="skills/example/SKILL.md",
file_ref=AgentStubDriveFileRef(kind="tool_file", id="tool-file-1"),
)
]
),
sync_http_client=http_client,
)
finally:
http_client.close()
assert exc_info.value.detail == "forbidden"
def test_upload_file_to_signed_url_sync_posts_multipart_file() -> None:
def handler(request: httpx.Request) -> httpx.Response:
assert request.method == "POST"
@ -322,6 +463,32 @@ def test_request_agent_stub_file_download_sync_dispatches_grpc_urls(monkeypatch:
assert response.download_url == "https://files.example.com/download"
def test_request_agent_stub_drive_manifest_sync_rejects_grpc_urls() -> None:
with pytest.raises(AgentStubValidationError, match="require an HTTP Agent Stub URL"):
_ = request_agent_stub_drive_manifest_sync(
url="grpc://agent.example.com:9091",
auth_jwe="token",
prefix="skills/",
include_download_url=False,
)
def test_request_agent_stub_drive_commit_sync_rejects_grpc_urls() -> None:
with pytest.raises(AgentStubValidationError, match="require an HTTP Agent Stub URL"):
_ = request_agent_stub_drive_commit_sync(
url="grpc://agent.example.com:9091",
auth_jwe="token",
request=AgentStubDriveCommitRequest(
items=[
AgentStubDriveCommitItem(
key="skills/example/SKILL.md",
file_ref=AgentStubDriveFileRef(kind="tool_file", id="tool-file-1"),
)
]
),
)
def test_request_agent_stub_file_upload_grpc_sync_attaches_bearer_metadata(monkeypatch: pytest.MonkeyPatch) -> None:
import dify_agent.agent_stub.client._agent_stub_grpc as grpc_module

View File

@ -5,10 +5,16 @@ import json
from typing import Literal
import pytest
from pydantic import ValidationError
from dify_agent.agent_stub.protocol.agent_stub import (
AgentStubDriveCommitItem,
AgentStubDriveCommitRequest,
AgentStubDriveFileRef,
AgentStubFileMapping,
agent_stub_connections_url,
agent_stub_drive_commit_url,
agent_stub_drive_manifest_url,
agent_stub_file_download_request_url,
agent_stub_file_upload_request_url,
normalize_agent_stub_url,
@ -39,6 +45,15 @@ def test_agent_stub_file_request_urls_handle_trailing_slash() -> None:
)
def test_agent_stub_drive_request_urls_handle_trailing_slash() -> None:
assert agent_stub_drive_manifest_url("https://agent.example.com/agent-stub/") == (
"https://agent.example.com/agent-stub/drive/manifest"
)
assert agent_stub_drive_commit_url("https://agent.example.com/agent-stub") == (
"https://agent.example.com/agent-stub/drive/commit"
)
def test_normalize_agent_stub_url_rejects_query_and_fragment() -> None:
with pytest.raises(ValueError, match="query string or fragment"):
_ = normalize_agent_stub_url("https://agent.example.com/agent-stub?x=1")
@ -98,6 +113,25 @@ def test_agent_stub_file_mapping_rejects_remote_url_with_reference() -> None:
)
def test_agent_stub_drive_commit_request_validates_file_refs() -> None:
request = AgentStubDriveCommitRequest(
items=[
AgentStubDriveCommitItem(
key="skills/example/SKILL.md",
file_ref=AgentStubDriveFileRef(kind="tool_file", id="tool-file-1"),
)
]
)
assert request.items[0].file_ref.kind == "tool_file"
with pytest.raises(ValidationError, match="tool_file"):
_ = AgentStubDriveFileRef(kind="bad_kind", id="tool-file-1") # pyright: ignore[reportArgumentType]
with pytest.raises(ValidationError, match="file_ref"):
_ = AgentStubDriveCommitItem.model_validate({"key": "skills/example/SKILL.md"})
@pytest.mark.parametrize("transfer_method", ["tool_file", "local_file", "datasource_file"])
def test_agent_stub_file_mapping_rejects_non_remote_with_url(
transfer_method: Literal["tool_file", "local_file", "datasource_file"],

View File

@ -39,6 +39,8 @@ def test_create_agent_stub_app_exposes_same_stub_routes_as_module_app() -> None:
assert "/agent-stub/connections" in created_paths
assert "/agent-stub/files/upload-request" in created_paths
assert "/agent-stub/files/download-request" in created_paths
assert "/agent-stub/drive/manifest" in created_paths
assert "/agent-stub/drive/commit" in created_paths
assert created_paths == module_paths
@ -88,3 +90,56 @@ def test_create_agent_stub_app_wires_configured_file_handler_for_upload_requests
assert response.status_code == 200
assert response.json() == {"upload_url": "https://files.example.com/upload"}
def test_create_agent_stub_app_wires_configured_drive_handler_for_manifest_requests(monkeypatch) -> None:
settings = ServerSettings(
agent_stub_url="https://agent.example.com/agent-stub",
server_secret_key=_base64url_secret(b"1" * 32),
dify_api_base_url="https://api.example.com",
dify_api_inner_api_key="inner-secret",
)
token_codec = settings.create_agent_stub_token_codec()
assert token_codec is not None
token = token_codec.encode_connection_token(
_execution_context().model_copy(update={"agent_id": "agent-1"}), now=int(time.time()) - 1
)
original_async_client = httpx.AsyncClient
def handler(request: httpx.Request) -> httpx.Response:
assert str(request.url) == (
"https://api.example.com/inner/api/drive/agent-agent-1/manifest"
"?tenant_id=tenant-1&prefix=skills%2F&include_download_url=false"
)
assert request.headers["X-Inner-Api-Key"] == "inner-secret"
return httpx.Response(
200,
json={
"items": [
{
"key": "skills/example/SKILL.md",
"size": 12,
"hash": "sha256:abc",
"mime_type": "text/markdown",
"file_kind": "tool_file",
"file_id": "tool-file-1",
}
]
},
)
monkeypatch.setattr(
"dify_agent.agent_stub.server.agent_stub_drive.httpx.AsyncClient",
lambda **kwargs: original_async_client(transport=httpx.MockTransport(handler), **kwargs),
)
client = TestClient(create_agent_stub_app(settings))
response = client.get(
"/agent-stub/drive/manifest",
headers={"Authorization": f"Bearer {token}"},
params={"prefix": "skills/"},
)
assert response.status_code == 200
assert response.json()["items"][0]["key"] == "skills/example/SKILL.md"

View File

@ -0,0 +1,266 @@
from __future__ import annotations
import asyncio
import json
import httpx
from dify_agent.agent_stub.protocol.agent_stub import (
AgentStubDriveCommitItem,
AgentStubDriveCommitRequest,
AgentStubDriveFileRef,
)
from dify_agent.agent_stub.server.agent_stub_drive import (
AgentStubDriveRequestError,
DifyApiAgentStubDriveRequestHandler,
)
from dify_agent.agent_stub.server.tokens.agent_stub import AgentStubPrincipal
from dify_agent.layers.execution_context import DifyExecutionContextLayerConfig
def _principal() -> AgentStubPrincipal:
return AgentStubPrincipal(
execution_context=DifyExecutionContextLayerConfig(
tenant_id="tenant-1",
user_id="user-1",
user_from="account",
workflow_id="workflow-1",
agent_id="agent-1",
agent_mode="workflow_run",
invoke_from="service-api",
),
session_id="session-1",
scope=["agent_stub:connect"],
token_id="token-1",
)
def _patch_async_client(monkeypatch, handler) -> None:
original_async_client = httpx.AsyncClient
monkeypatch.setattr(
"dify_agent.agent_stub.server.agent_stub_drive.httpx.AsyncClient",
lambda **kwargs: original_async_client(transport=httpx.MockTransport(handler), **kwargs),
)
def test_dify_api_agent_stub_drive_handler_injects_execution_context_for_manifest(monkeypatch) -> None:
def handler(request: httpx.Request) -> httpx.Response:
assert request.method == "GET"
assert str(request.url) == (
"https://api.example.com/inner/api/drive/agent-agent-1/manifest"
"?tenant_id=tenant-1&prefix=skills%2F&include_download_url=true"
)
assert request.headers["X-Inner-Api-Key"] == "inner-secret"
return httpx.Response(
200,
json={
"items": [
{
"key": "skills/example/SKILL.md",
"size": 12,
"hash": "sha256:abc",
"mime_type": "text/markdown",
"file_kind": "tool_file",
"file_id": "tool-file-1",
"created_at": 123,
"download_url": "https://files.example.com/download",
}
]
},
)
_patch_async_client(monkeypatch, handler)
drive_handler = DifyApiAgentStubDriveRequestHandler(
dify_api_base_url="https://api.example.com",
dify_api_inner_api_key="inner-secret",
)
async def scenario() -> None:
response = await drive_handler.get_manifest(
principal=_principal(),
prefix="skills/",
include_download_url=True,
)
assert response.items[0].download_url == "https://files.example.com/download"
asyncio.run(scenario())
def test_dify_api_agent_stub_drive_handler_injects_execution_context_for_commit(monkeypatch) -> None:
def handler(request: httpx.Request) -> httpx.Response:
assert request.method == "POST"
assert str(request.url) == "https://api.example.com/inner/api/drive/agent-agent-1/commit"
assert json.loads(request.content) == {
"tenant_id": "tenant-1",
"user_id": "user-1",
"items": [
{
"key": "skills/example/SKILL.md",
"file_ref": {"kind": "tool_file", "id": "tool-file-1"},
"value_owned_by_drive": True,
}
],
}
return httpx.Response(
200,
json={
"items": [
{
"key": "skills/example/SKILL.md",
"size": 12,
"mime_type": "text/markdown",
"file_kind": "tool_file",
"file_id": "tool-file-1",
"value_owned_by_drive": True,
}
]
},
)
_patch_async_client(monkeypatch, handler)
drive_handler = DifyApiAgentStubDriveRequestHandler(
dify_api_base_url="https://api.example.com",
dify_api_inner_api_key="inner-secret",
)
async def scenario() -> None:
response = await drive_handler.commit(
principal=_principal(),
request=AgentStubDriveCommitRequest(
items=[
AgentStubDriveCommitItem(
key="skills/example/SKILL.md",
file_ref=AgentStubDriveFileRef(kind="tool_file", id="tool-file-1"),
)
]
),
)
assert response.items[0].value_owned_by_drive is True
asyncio.run(scenario())
def test_dify_api_agent_stub_drive_handler_rejects_missing_agent_id() -> None:
drive_handler = DifyApiAgentStubDriveRequestHandler(
dify_api_base_url="https://api.example.com",
dify_api_inner_api_key="inner-secret",
)
principal = _principal()
principal.execution_context = principal.execution_context.model_copy(update={"agent_id": None})
async def scenario() -> None:
try:
await drive_handler.get_manifest(principal=principal, prefix="", include_download_url=False)
except AgentStubDriveRequestError as exc:
assert exc.status_code == 400
assert "agent_id" in str(exc)
else:
raise AssertionError("expected AgentStubDriveRequestError")
asyncio.run(scenario())
def test_dify_api_agent_stub_drive_handler_rejects_missing_user_id_for_commit() -> None:
drive_handler = DifyApiAgentStubDriveRequestHandler(
dify_api_base_url="https://api.example.com",
dify_api_inner_api_key="inner-secret",
)
principal = _principal()
principal.execution_context = principal.execution_context.model_copy(update={"user_id": None})
async def scenario() -> None:
try:
await drive_handler.commit(
principal=principal,
request=AgentStubDriveCommitRequest(
items=[
AgentStubDriveCommitItem(
key="skills/example/SKILL.md",
file_ref=AgentStubDriveFileRef(kind="tool_file", id="tool-file-1"),
)
]
),
)
except AgentStubDriveRequestError as exc:
assert exc.status_code == 400
assert "user_id" in str(exc)
else:
raise AssertionError("expected AgentStubDriveRequestError")
asyncio.run(scenario())
def test_dify_api_agent_stub_drive_handler_maps_invalid_json_response(monkeypatch) -> None:
def handler(_request: httpx.Request) -> httpx.Response:
return httpx.Response(200, text="not-json", headers={"Content-Type": "application/json"})
_patch_async_client(monkeypatch, handler)
drive_handler = DifyApiAgentStubDriveRequestHandler(
dify_api_base_url="https://api.example.com",
dify_api_inner_api_key="inner-secret",
)
async def scenario() -> None:
try:
await drive_handler.get_manifest(principal=_principal(), prefix="skills/", include_download_url=False)
except AgentStubDriveRequestError as exc:
assert exc.status_code == 502
assert exc.detail == "Dify API drive request returned invalid JSON"
else:
raise AssertionError("expected AgentStubDriveRequestError")
asyncio.run(scenario())
def test_dify_api_agent_stub_drive_handler_rejects_malformed_success_payload(monkeypatch) -> None:
def handler(_request: httpx.Request) -> httpx.Response:
return httpx.Response(200, json={"unexpected": []})
_patch_async_client(monkeypatch, handler)
drive_handler = DifyApiAgentStubDriveRequestHandler(
dify_api_base_url="https://api.example.com",
dify_api_inner_api_key="inner-secret",
)
async def scenario() -> None:
try:
await drive_handler.get_manifest(principal=_principal(), prefix="skills/", include_download_url=False)
except AgentStubDriveRequestError as exc:
assert exc.status_code == 502
assert exc.detail == "Dify API drive manifest response is invalid"
else:
raise AssertionError("expected AgentStubDriveRequestError")
asyncio.run(scenario())
def test_dify_api_agent_stub_drive_handler_preserves_non_2xx_detail(monkeypatch) -> None:
def handler(_request: httpx.Request) -> httpx.Response:
return httpx.Response(404, json={"code": "source_not_found", "message": "missing file"})
_patch_async_client(monkeypatch, handler)
drive_handler = DifyApiAgentStubDriveRequestHandler(
dify_api_base_url="https://api.example.com",
dify_api_inner_api_key="inner-secret",
)
async def scenario() -> None:
try:
await drive_handler.commit(
principal=_principal(),
request=AgentStubDriveCommitRequest(
items=[
AgentStubDriveCommitItem(
key="skills/example/SKILL.md",
file_ref=AgentStubDriveFileRef(kind="tool_file", id="tool-file-1"),
)
]
),
)
except AgentStubDriveRequestError as exc:
assert exc.status_code == 404
assert exc.detail == {"code": "source_not_found", "message": "missing file"}
else:
raise AssertionError("expected AgentStubDriveRequestError")
asyncio.run(scenario())

View File

@ -8,7 +8,14 @@ from typing import cast
from fastapi import FastAPI
from fastapi.testclient import TestClient
from dify_agent.agent_stub.protocol.agent_stub import AgentStubFileDownloadResponse, AgentStubFileUploadResponse
from dify_agent.agent_stub.protocol.agent_stub import (
AgentStubDriveCommitResponse,
AgentStubDriveItem,
AgentStubDriveManifestResponse,
AgentStubFileDownloadResponse,
AgentStubFileUploadResponse,
)
from dify_agent.agent_stub.server.agent_stub_drive import AgentStubDriveRequestError, AgentStubDriveRequestHandler
from dify_agent.agent_stub.server.agent_stub_files import AgentStubFileRequestError, AgentStubFileRequestHandler
from dify_agent.agent_stub.server.routes.agent_stub import create_agent_stub_http_router
from dify_agent.agent_stub.server.tokens.agent_stub import AgentStubTokenCodec
@ -261,3 +268,137 @@ def test_agent_stub_file_route_preserves_structured_handler_error_details() -> N
assert response.status_code == 400
assert response.json()["detail"] == {"detail": "bad request", "code": "inner_api_error"}
def test_agent_stub_drive_manifest_route_forwards_authenticated_request() -> None:
codec = _token_codec()
token = codec.encode_connection_token(_execution_context(), now=int(time.time()) - 1)
class FakeDriveHandler:
async def get_manifest(self, *, principal, prefix, include_download_url):
assert principal.execution_context.user_id == "user-1"
assert prefix == "skills/"
assert include_download_url is True
return AgentStubDriveManifestResponse(
items=[
AgentStubDriveItem(
key="skills/example/SKILL.md",
size=12,
hash="sha256:abc",
mime_type="text/markdown",
file_kind="tool_file",
file_id="tool-file-1",
created_at=123,
download_url="https://files.example.com/download",
)
]
)
async def commit(self, *, principal, request):
del principal, request
raise AssertionError("unexpected commit request")
drive_handler = cast(AgentStubDriveRequestHandler, cast(object, FakeDriveHandler()))
app = FastAPI()
app.include_router(create_agent_stub_http_router(codec, None, drive_handler))
client = TestClient(app)
response = client.get(
"/agent-stub/drive/manifest",
headers={"Authorization": f"Bearer {token}"},
params={"prefix": "skills/", "include_download_url": "true"},
)
assert response.status_code == 200
assert response.json()["items"][0]["key"] == "skills/example/SKILL.md"
def test_agent_stub_drive_commit_route_forwards_authenticated_request() -> None:
codec = _token_codec()
token = codec.encode_connection_token(_execution_context(), now=int(time.time()) - 1)
class FakeDriveHandler:
async def commit(self, *, principal, request):
assert principal.execution_context.user_id == "user-1"
assert request.items[0].file_ref.id == "tool-file-1"
return AgentStubDriveCommitResponse(
items=[
AgentStubDriveItem(
key="skills/example/SKILL.md",
size=12,
hash=None,
mime_type="text/markdown",
file_kind="tool_file",
file_id="tool-file-1",
value_owned_by_drive=True,
)
]
)
async def get_manifest(self, *, principal, prefix, include_download_url):
del principal, prefix, include_download_url
raise AssertionError("unexpected manifest request")
drive_handler = cast(AgentStubDriveRequestHandler, cast(object, FakeDriveHandler()))
app = FastAPI()
app.include_router(create_agent_stub_http_router(codec, None, drive_handler))
client = TestClient(app)
response = client.post(
"/agent-stub/drive/commit",
headers={"Authorization": f"Bearer {token}"},
json={"items": [{"key": "skills/example/SKILL.md", "file_ref": {"kind": "tool_file", "id": "tool-file-1"}}]},
)
assert response.status_code == 200
assert response.json()["items"][0]["file_id"] == "tool-file-1"
def test_agent_stub_drive_routes_return_503_when_drive_api_is_unconfigured() -> None:
codec = _token_codec()
token = codec.encode_connection_token(_execution_context(), now=int(time.time()) - 1)
app = FastAPI()
app.include_router(create_agent_stub_http_router(codec, None, None))
client = TestClient(app)
manifest_response = client.get(
"/agent-stub/drive/manifest",
headers={"Authorization": f"Bearer {token}"},
)
commit_response = client.post(
"/agent-stub/drive/commit",
headers={"Authorization": f"Bearer {token}"},
json={"items": [{"key": "skills/example/SKILL.md", "file_ref": {"kind": "tool_file", "id": "tool-file-1"}}]},
)
assert manifest_response.status_code == 503
assert commit_response.status_code == 503
assert manifest_response.json()["detail"] == "Agent Stub drive API is not configured"
assert commit_response.json()["detail"] == "Agent Stub drive API is not configured"
def test_agent_stub_drive_route_preserves_structured_handler_error_details() -> None:
codec = _token_codec()
token = codec.encode_connection_token(_execution_context(), now=int(time.time()) - 1)
class FakeDriveHandler:
async def get_manifest(self, *, principal, prefix, include_download_url):
del principal, prefix, include_download_url
raise AgentStubDriveRequestError(400, {"code": "invalid_key", "message": "bad request"})
async def commit(self, *, principal, request):
del principal, request
raise AssertionError("unexpected commit request")
drive_handler = cast(AgentStubDriveRequestHandler, cast(object, FakeDriveHandler()))
app = FastAPI()
app.include_router(create_agent_stub_http_router(codec, None, drive_handler))
client = TestClient(app)
response = client.get(
"/agent-stub/drive/manifest",
headers={"Authorization": f"Bearer {token}"},
)
assert response.status_code == 400
assert response.json()["detail"] == {"code": "invalid_key", "message": "bad request"}

View File

@ -262,6 +262,10 @@ def test_create_app_creates_scheduler_and_closes_after_shutdown(monkeypatch: pyt
getattr(route, "path", None) == "/agent-stub/files/download-request"
for route in create_app(settings).routes
)
assert any(
getattr(route, "path", None) == "/agent-stub/drive/manifest" for route in create_app(settings).routes
)
assert any(getattr(route, "path", None) == "/agent-stub/drive/commit" for route in create_app(settings).routes)
assert FakeRunScheduler.created[0].shutdown_called is True
assert FakeRunScheduler.created[0].dify_api_http_client.is_closed is True
@ -334,6 +338,64 @@ def test_create_app_wires_authenticated_agent_stub_file_upload_route(monkeypatch
assert fake_redis.closed is True
def test_create_app_wires_authenticated_agent_stub_drive_manifest_route(monkeypatch: pytest.MonkeyPatch) -> None:
fake_redis, fake_http_client = _patch_app_lifecycle(monkeypatch)
settings = ServerSettings(
redis_url="redis://example.invalid/0",
agent_stub_url="https://agent.example.com/agent-stub",
server_secret_key=_base64url_secret(b"1" * 32),
dify_api_base_url="https://api.example.com",
dify_api_inner_api_key="inner-secret",
)
token_codec = settings.create_agent_stub_token_codec()
assert token_codec is not None
token = token_codec.encode_connection_token(
_execution_context().model_copy(update={"agent_id": "agent-1"}), now=int(time.time()) - 1
)
original_async_client = httpx.AsyncClient
def handler(request: httpx.Request) -> httpx.Response:
assert str(request.url) == (
"https://api.example.com/inner/api/drive/agent-agent-1/manifest"
"?tenant_id=tenant-1&prefix=skills%2F&include_download_url=false"
)
assert request.headers["X-Inner-Api-Key"] == "inner-secret"
return httpx.Response(
200,
json={
"items": [
{
"key": "skills/example/SKILL.md",
"size": 12,
"hash": "sha256:abc",
"mime_type": "text/markdown",
"file_kind": "tool_file",
"file_id": "tool-file-1",
}
]
},
)
monkeypatch.setattr(
"dify_agent.agent_stub.server.agent_stub_drive.httpx.AsyncClient",
lambda **kwargs: original_async_client(transport=httpx.MockTransport(handler), **kwargs),
)
with TestClient(create_app(settings)) as client:
response = client.get(
"/agent-stub/drive/manifest",
headers={"Authorization": f"Bearer {token}"},
params={"prefix": "skills/"},
)
assert response.status_code == 200
assert response.json()["items"][0]["key"] == "skills/example/SKILL.md"
assert FakeRunScheduler.created[0].shutdown_called is True
assert fake_http_client.is_closed is True
assert fake_redis.closed is True
def test_create_app_starts_and_stops_agent_stub_grpc_server_for_grpc_url(monkeypatch: pytest.MonkeyPatch) -> None:
fake_redis, fake_http_client = _patch_app_lifecycle(monkeypatch)
started: dict[str, object] = {}
@ -380,7 +442,6 @@ def test_create_plugin_daemon_http_client_uses_generic_outbound_httpx_constructi
)
assert isinstance(client, FakePluginDaemonHttpClient)
assert isinstance(client.timeout, FakeTimeout)
assert client.timeout.connect == 1
assert client.timeout.read == 2
assert client.timeout.write == 3
@ -410,7 +471,6 @@ def test_create_dify_api_inner_http_client_uses_generic_outbound_httpx_construct
)
assert isinstance(client, FakePluginDaemonHttpClient)
assert isinstance(client.timeout, FakeTimeout)
assert client.timeout.connect == 1
assert client.timeout.read == 2
assert client.timeout.write == 3

View File

@ -2,10 +2,13 @@ from __future__ import annotations
from pathlib import Path
import secrets
from typing import cast
import httpx
import pytest
from pydantic import ValidationError
from dify_agent.agent_stub.server.agent_stub_drive import DifyApiAgentStubDriveRequestHandler
from dify_agent.agent_stub.server.agent_stub_files import DifyApiAgentStubFileRequestHandler
from dify_agent.agent_stub.server.tokens.agent_stub import AgentStubTokenCodec
from dify_agent.server.settings import ServerSettings
@ -179,3 +182,29 @@ def test_server_settings_create_agent_stub_file_request_handler_returns_handler_
assert isinstance(handler, DifyApiAgentStubFileRequestHandler)
assert handler.dify_api_base_url == "https://api.example.com"
assert handler.dify_api_inner_api_key == "inner-secret"
def test_server_settings_create_agent_stub_drive_request_handler_returns_none_without_full_settings() -> None:
assert ServerSettings().create_agent_stub_drive_request_handler() is None
def test_server_settings_create_agent_stub_drive_request_handler_returns_handler_when_configured() -> None:
settings = ServerSettings(
dify_api_base_url="https://api.example.com",
dify_api_inner_api_key="inner-secret",
outbound_http_connect_timeout=11,
outbound_http_read_timeout=22,
outbound_http_write_timeout=33,
outbound_http_pool_timeout=44,
)
handler = settings.create_agent_stub_drive_request_handler()
assert isinstance(handler, DifyApiAgentStubDriveRequestHandler)
assert handler.dify_api_base_url == "https://api.example.com"
assert handler.dify_api_inner_api_key == "inner-secret"
timeout = cast(httpx.Timeout, handler.timeout)
assert timeout.connect == 11
assert timeout.read == 22
assert timeout.write == 33
assert timeout.pool == 44