From 65e89520c079892df82219db233532c13bda7a35 Mon Sep 17 00:00:00 2001 From: Harry Date: Tue, 10 Mar 2026 17:11:41 +0800 Subject: [PATCH] refactor: unify download item types and eliminate extension-based branching MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Merge AssetDownloadItem, AssetInlineItem into SandboxDownloadItem with optional 'content' field. All consumers now follow a clean pipeline: get items → accessor.resolve_items() → AppAssetService.to_download_items() → download Key changes: - SandboxDownloadItem gains content: bytes | None (entities.py) - ZipSandbox.download_items() handles both inline (base64 heredoc) and remote (curl) via a single pipeline — no structural branching - AssetDownloadService.build_download_script() takes unified list - CachedContentAccessor.resolve_items() batch-enriches items from DB (extension-agnostic, no 'if md' checks needed) - AppAssetService.to_download_items() converts AssetItem → SandboxDownloadItem - DraftAppAssetsInitializer, package_and_upload, export_bundle simplified - file_upload/node.py switched to SandboxDownloadItem - Deleted AssetDownloadItem and AssetInlineItem classes --- api/core/app/entities/app_asset_entities.py | 3 + api/core/app_assets/accessor.py | 59 ++++--- api/core/app_assets/builder/skill_builder.py | 24 ++- api/core/app_assets/entities/assets.py | 12 +- api/core/app_assets/storage.py | 9 -- .../initializer/dify_cli_initializer.py | 4 +- .../draft_app_assets_initializer.py | 61 +++++--- .../services/asset_download_service.py | 146 +++++++++++++----- api/core/workflow/nodes/file_upload/node.py | 12 +- api/core/zip_sandbox/__init__.py | 17 +- api/core/zip_sandbox/entities.py | 39 +++++ api/core/zip_sandbox/zip_sandbox.py | 46 +++--- api/services/app_asset_package_service.py | 67 +++----- api/services/app_asset_service.py | 43 ++++++ api/services/app_bundle_service.py | 16 +- api/services/sandbox/sandbox_service.py | 19 ++- .../core/app_assets/test_storage.py 
| 6 - api/tests/unit_tests/core/sandbox/__init__.py | 0 .../services/test_asset_download_service.py | 123 +++++++++++++-- 19 files changed, 492 insertions(+), 214 deletions(-) create mode 100644 api/core/zip_sandbox/entities.py create mode 100644 api/tests/unit_tests/core/sandbox/__init__.py diff --git a/api/core/app/entities/app_asset_entities.py b/api/core/app/entities/app_asset_entities.py index b18f0cfb9f..aaf6dffdce 100644 --- a/api/core/app/entities/app_asset_entities.py +++ b/api/core/app/entities/app_asset_entities.py @@ -347,3 +347,6 @@ class AppAssetFileTree(BaseModel): build_view(root_node, "") return [tree_views[n.id] for n in by_parent.get(None, [])] + + def empty(self) -> bool: + return len(self.nodes) == 0 diff --git a/api/core/app_assets/accessor.py b/api/core/app_assets/accessor.py index a09ff692e8..e32e02c2c3 100644 --- a/api/core/app_assets/accessor.py +++ b/api/core/app_assets/accessor.py @@ -6,12 +6,9 @@ All methods accept an AppAssetNode parameter to identify the target. CachedContentAccessor is the primary entry point: - Reads DB first, misses fall through to S3 with sync backfill. - Writes go to both DB and S3 (dual-write). -- Wraps an internal StorageContentAccessor for S3 I/O. - -Public helper: -- should_mirror(extension) — the ONLY place that maps file extensions to the - "should this node use DB mirror?" decision. All callers (presigned-upload - gating, etc.) should use this function instead of hard-coding extension checks. +- resolve_items() batch-enriches AssetItem lists with DB-cached content + (extension-agnostic), so callers never need to filter by extension. +- Wraps an internal _StorageAccessor for S3 I/O. 
Collaborators: - services.asset_content_service.AssetContentService (DB layer) @@ -24,29 +21,13 @@ from __future__ import annotations import logging from core.app.entities.app_asset_entities import AppAssetNode +from core.app_assets.entities.assets import AssetItem from core.app_assets.storage import AssetPaths from extensions.storage.cached_presign_storage import CachedPresignStorage from services.asset_content_service import AssetContentService logger = logging.getLogger(__name__) -# --------------------------------------------------------------------------- -# Extension-based policy — the single source of truth -# --------------------------------------------------------------------------- - -_MIRROR_EXTENSIONS: frozenset[str] = frozenset({"md"}) - - -def should_mirror(extension: str) -> bool: - """Return True if files with *extension* should be cached in DB. - - This is the ONLY place that maps file extensions to the inline-mirror - decision. All other modules should call this function instead of - checking extensions directly. - """ - return extension.lower() in _MIRROR_EXTENSIONS - - # --------------------------------------------------------------------------- # S3-only implementation (internal, used as inner delegate) # --------------------------------------------------------------------------- @@ -162,6 +143,38 @@ class CachedContentAccessor: ) self._inner.save(node, content) + def resolve_items(self, items: list[AssetItem]) -> list[AssetItem]: + """Batch-enrich asset items with DB-cached content. + + Queries by ``asset_id`` only — extension-agnostic. Items without + a DB cache row keep their original *content* value (typically + ``None``), so only genuinely cached assets (e.g. ``.md`` skill + documents) get populated. + + This eliminates the need for callers to filter by file extension + before deciding whether to read from the DB cache. 
+ """ + if not items: + return items + + node_ids = [a.asset_id for a in items] + cached = AssetContentService.get_many(self._tenant_id, self._app_id, node_ids) + + if not cached: + return items + + return [ + AssetItem( + asset_id=a.asset_id, + path=a.path, + file_name=a.file_name, + extension=a.extension, + storage_key=a.storage_key, + content=cached[a.asset_id].encode("utf-8") if a.asset_id in cached else a.content, + ) + for a in items + ] + def delete(self, node: AppAssetNode) -> None: AssetContentService.delete(self._tenant_id, self._app_id, node.id) self._inner.delete(node) diff --git a/api/core/app_assets/builder/skill_builder.py b/api/core/app_assets/builder/skill_builder.py index 174b9f95cd..a39e5891c2 100644 --- a/api/core/app_assets/builder/skill_builder.py +++ b/api/core/app_assets/builder/skill_builder.py @@ -1,14 +1,25 @@ +"""Builder that compiles ``.md`` skill documents into resolved content. + +The builder reads raw draft content from the DB-backed accessor, parses +each into a ``SkillDocument``, assembles a ``SkillBundle`` (with +transitive tool/file dependency resolution), and returns ``AssetItem`` +objects whose *content* field carries the resolved bytes in-process. + +No S3 writes happen here — the only persistence is the ``SkillBundle`` +saved via ``SkillManager`` (S3 + Redis cache invalidation) so that +downstream consumers (``SkillInitializer``, ``DifyCliInitializer``) can +load it later. 
+""" + import json import logging from core.app.entities.app_asset_entities import AppAssetFileTree, AppAssetNode from core.app_assets.accessor import CachedContentAccessor from core.app_assets.entities import AssetItem -from core.app_assets.storage import AssetPaths from core.skill.assembler import SkillBundleAssembler from core.skill.entities.skill_bundle import SkillBundle from core.skill.entities.skill_document import SkillDocument -from extensions.storage.base_storage import BaseStorage from .base import BuildContext @@ -18,12 +29,10 @@ logger = logging.getLogger(__name__) class SkillBuilder: _nodes: list[tuple[AppAssetNode, str]] _accessor: CachedContentAccessor - _storage: BaseStorage - def __init__(self, accessor: CachedContentAccessor, storage: BaseStorage) -> None: + def __init__(self, accessor: CachedContentAccessor) -> None: self._nodes = [] self._accessor = accessor - self._storage = storage def accept(self, node: AppAssetNode) -> bool: return node.extension == "md" @@ -66,15 +75,14 @@ class SkillBuilder: skill = bundle.get(node.id) if skill is None: continue - storage_key = AssetPaths.resolved(ctx.tenant_id, ctx.app_id, ctx.build_id, node.id) - self._storage.save(storage_key, skill.content.encode("utf-8")) items.append( AssetItem( asset_id=node.id, path=path, file_name=node.name, extension=node.extension or "", - storage_key=storage_key, + storage_key="", + content=skill.content.encode("utf-8"), ) ) return items diff --git a/api/core/app_assets/entities/assets.py b/api/core/app_assets/entities/assets.py index bab1c717a8..fdd0c89768 100644 --- a/api/core/app_assets/entities/assets.py +++ b/api/core/app_assets/entities/assets.py @@ -1,10 +1,20 @@ -from dataclasses import dataclass +from dataclasses import dataclass, field @dataclass class AssetItem: + """A single asset file produced by the build pipeline. 
+ + When *content* is set the payload is available in-process and can be + written directly into a ZIP or uploaded to a sandbox VM without an + extra S3 round-trip. When *content* is ``None`` the caller should + fetch the bytes from *storage_key* (the traditional presigned-URL + path). + """ + asset_id: str path: str file_name: str extension: str storage_key: str + content: bytes | None = field(default=None, repr=False) diff --git a/api/core/app_assets/storage.py b/api/core/app_assets/storage.py index 16d89762b4..380f8daef1 100644 --- a/api/core/app_assets/storage.py +++ b/api/core/app_assets/storage.py @@ -37,15 +37,6 @@ class AssetPaths: _check_uuid(assets_id, "assets_id") return f"{_BASE}/{tenant_id}/{app_id}/artifacts/{assets_id}.zip" - @staticmethod - def resolved(tenant_id: str, app_id: str, assets_id: str, node_id: str) -> str: - """app_assets/{tenant}/{app}/artifacts/{assets_id}/resolved/{node_id}""" - _check_uuid(tenant_id, "tenant_id") - _check_uuid(app_id, "app_id") - _check_uuid(assets_id, "assets_id") - _check_uuid(node_id, "node_id") - return f"{_BASE}/{tenant_id}/{app_id}/artifacts/{assets_id}/resolved/{node_id}" - @staticmethod def skill_bundle(tenant_id: str, app_id: str, assets_id: str) -> str: """app_assets/{tenant}/{app}/artifacts/{assets_id}/skill_artifact_set.json""" diff --git a/api/core/sandbox/initializer/dify_cli_initializer.py b/api/core/sandbox/initializer/dify_cli_initializer.py index e9c98233ad..7e6dbf1ad1 100644 --- a/api/core/sandbox/initializer/dify_cli_initializer.py +++ b/api/core/sandbox/initializer/dify_cli_initializer.py @@ -13,12 +13,12 @@ from core.virtual_environment.__base.helpers import pipeline from ..bash.dify_cli import DifyCliConfig, DifyCliLocator from ..entities import DifyCli -from .base import AsyncSandboxInitializer +from .base import SyncSandboxInitializer logger = logging.getLogger(__name__) -class DifyCliInitializer(AsyncSandboxInitializer): +class DifyCliInitializer(SyncSandboxInitializer): def __init__( self, 
tenant_id: str, diff --git a/api/core/sandbox/initializer/draft_app_assets_initializer.py b/api/core/sandbox/initializer/draft_app_assets_initializer.py index 6ab4ca06af..f8163ff6e4 100644 --- a/api/core/sandbox/initializer/draft_app_assets_initializer.py +++ b/api/core/sandbox/initializer/draft_app_assets_initializer.py @@ -1,23 +1,46 @@ +"""Initializer that populates a draft sandbox with app asset files. + +Unlike ``AppAssetsInitializer`` (which downloads a pre-built ZIP for +published assets), this initializer runs the build pipeline on the fly +so that ``.md`` skill documents are compiled and their resolved content +is embedded directly into the download script — avoiding the S3 +round-trip that was previously required for resolved keys. + +Execution order guarantee: + This runs as a ``SyncSandboxInitializer`` during sandbox setup. + By the time it finishes, ``SkillManager.save_bundle()`` has been + called (inside ``SkillBuilder.build()``), so subsequent initializers + like ``DifyCliInitializer`` can safely load the bundle from Redis/S3.
+""" + import logging -from core.app_assets.accessor import should_mirror +from core.app_assets.builder.base import BuildContext +from core.app_assets.builder.file_builder import FileBuilder +from core.app_assets.builder.pipeline import AssetBuildPipeline +from core.app_assets.builder.skill_builder import SkillBuilder from core.app_assets.constants import AppAssetsAttrs -from core.app_assets.storage import AssetPaths from core.sandbox.entities import AppAssets from core.sandbox.sandbox import Sandbox from core.sandbox.services import AssetDownloadService -from core.sandbox.services.asset_download_service import AssetDownloadItem from core.virtual_environment.__base.helpers import pipeline from services.app_asset_service import AppAssetService -from .base import AsyncSandboxInitializer +from .base import SyncSandboxInitializer logger = logging.getLogger(__name__) _TIMEOUT = 600 # 10 minutes -class DraftAppAssetsInitializer(AsyncSandboxInitializer): +class DraftAppAssetsInitializer(SyncSandboxInitializer): + """Compile draft assets and push them into the sandbox VM. + + ``.md`` (skill) files are compiled in-process and their resolved + content is embedded as base64 heredocs in the download script. + All other files are fetched from S3 via presigned URLs. + """ + def __init__(self, tenant_id: str, app_id: str, assets_id: str) -> None: self._tenant_id = tenant_id self._app_id = app_id @@ -25,22 +48,22 @@ class DraftAppAssetsInitializer(AsyncSandboxInitializer): def initialize(self, sandbox: Sandbox) -> None: vm = sandbox.vm - build_id = self._assets_id tree = sandbox.attrs.get(AppAssetsAttrs.FILE_TREE) - asset_storage = AppAssetService.get_storage() - nodes = list(tree.walk_files()) - if not nodes: + if tree.empty(): return - # Inline-mirror nodes use the resolved (compiled) key; others use draft. 
- keys = [ - AssetPaths.resolved(self._tenant_id, self._app_id, build_id, node.id) - if should_mirror(node.extension) - else AssetPaths.draft(self._tenant_id, self._app_id, node.id) - for node in nodes - ] - urls = asset_storage.get_download_urls(keys, _TIMEOUT) - items = [AssetDownloadItem(path=tree.get_path(node.id).lstrip("/"), url=url) for node, url in zip(nodes, urls)] - script = AssetDownloadService.build_download_script(items, AppAssets.PATH) + + # --- 1. Run the build pipeline (SkillBuilder compiles .md inline) --- + accessor = AppAssetService.get_accessor(self._tenant_id, self._app_id) + build_pipeline = AssetBuildPipeline([SkillBuilder(accessor=accessor), FileBuilder()]) + ctx = BuildContext(tenant_id=self._tenant_id, app_id=self._app_id, build_id=self._assets_id) + built_assets = build_pipeline.build_all(tree, ctx) + + if not built_assets: + return + + # --- 2. Convert to unified download items and execute --- + download_items = AppAssetService.to_download_items(built_assets) + script = AssetDownloadService.build_download_script(download_items, AppAssets.PATH) pipeline(vm).add( ["sh", "-c", script], error_message="Failed to download draft assets", diff --git a/api/core/sandbox/services/asset_download_service.py b/api/core/sandbox/services/asset_download_service.py index 589b2ce65a..96a05526f2 100644 --- a/api/core/sandbox/services/asset_download_service.py +++ b/api/core/sandbox/services/asset_download_service.py @@ -1,11 +1,46 @@ +"""Shell script builder for downloading / writing assets into a sandbox VM. + +Generates a self-contained POSIX shell script that handles two kinds of +``SandboxDownloadItem``: + +- Items with *content* — written via base64 heredoc (sequential). +- Items with *url* — fetched via ``curl``/``wget``/``python3`` with + auto-detection, run as parallel background jobs. + +Both kinds can be mixed freely in a single call. 
+""" + from __future__ import annotations +import base64 import shlex import textwrap -from dataclasses import dataclass +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from core.zip_sandbox.entities import SandboxDownloadItem -def _render_download_script(root_path: str, download_commands: str) -> str: +def _build_inline_commands(items: list[SandboxDownloadItem], root_var: str) -> str: + """Generate shell commands that write base64-encoded content to files.""" + lines: list[str] = [] + for idx, item in enumerate(items): + assert item.content is not None + dest = f"${{{root_var}}}/{shlex.quote(item.path)}" + encoded = base64.b64encode(item.content).decode("ascii") + lines.append(f'mkdir -p "$(dirname "{dest}")"') + lines.append(f"base64 -d <<'_INLINE_{idx}' > \"{dest}\"") + lines.append(encoded) + lines.append(f"_INLINE_{idx}") + return "\n".join(lines) + + +def _render_download_script( + root_path: str, + inline_commands: str, + download_commands: str, + need_downloader: bool, +) -> str: python_download_cmd = ( 'python3 - "${url}" "${dest}" <<"PY"\n' "import sys\n" @@ -18,59 +53,88 @@ def _render_download_script(root_path: str, download_commands: str) -> str: " f.write(data)\n" "PY" ) - script = f""" - download_root={shlex.quote(root_path)} - if command -v curl >/dev/null 2>&1; then - download_cmd='curl -fsSL "${{url}}" -o "${{dest}}"' - elif command -v wget >/dev/null 2>&1; then - download_cmd='wget -q "${{url}}" -O "${{dest}}"' - elif command -v python3 >/dev/null 2>&1; then - download_cmd={shlex.quote(python_download_cmd)} - else - echo 'No downloader found (curl/wget/python3)' >&2 - exit 1 - fi + # Only emit the downloader-detection block when there are remote items. 
+ if need_downloader: + downloader_block = f"""\ +if command -v curl >/dev/null 2>&1; then + download_cmd='curl -fsSL "${{url}}" -o "${{dest}}"' +elif command -v wget >/dev/null 2>&1; then + download_cmd='wget -q "${{url}}" -O "${{dest}}"' +elif command -v python3 >/dev/null 2>&1; then + download_cmd={shlex.quote(python_download_cmd)} +else + echo 'No downloader found (curl/wget/python3)' >&2 + exit 1 +fi - mkdir -p "${{download_root}}" - fail_log="$(mktemp)" +fail_log="$(mktemp)" - download_one() {{ - file_path="$1" - url="$2" - dest="${{download_root}}/${{file_path}}" - mkdir -p "$(dirname "${{dest}}")" - eval "${{download_cmd}}" 2>/dev/null || echo "${{file_path}}" >> "${{fail_log}}" - }} +download_one() {{ + file_path="$1" + url="$2" + dest="${{download_root}}/${{file_path}}" + mkdir -p "$(dirname "${{dest}}")" + eval "${{download_cmd}}" 2>/dev/null || echo "${{file_path}}" >> "${{fail_log}}" +}}""" + else: + downloader_block = "" - {download_commands} + # The failure-check block is only meaningful when downloads occurred. + if need_downloader: + wait_block = textwrap.dedent("""\ + wait - wait + if [ -s "${fail_log}" ]; then + mv "${fail_log}" "${download_root}/DOWNLOAD_FAILURES.txt" + else + rm -f "${fail_log}" + fi""") + else: + wait_block = "" - if [ -s "${{fail_log}}" ]; then - mv "${{fail_log}}" "${{download_root}}/DOWNLOAD_FAILURES.txt" - else - rm -f "${{fail_log}}" - fi - exit 0 - """ - return textwrap.dedent(script).strip() + script = f"""\ +download_root={shlex.quote(root_path)} +mkdir -p "${{download_root}}" +{downloader_block} -@dataclass(frozen=True) -class AssetDownloadItem: - path: str - url: str +{inline_commands} + +{download_commands} + +{wait_block} +exit 0""" + return script class AssetDownloadService: @staticmethod - def build_download_script(items: list[AssetDownloadItem], root_path: str) -> str: - # Build a portable shell script to download assets in parallel. 
+ def build_download_script( + items: list[SandboxDownloadItem], + root_path: str, + ) -> str: + """Build a portable shell script to write inline assets and download remote ones. + + Items with *content* are written first (sequential base64 decode), + then items with *url* are fetched in parallel background jobs. + The two kinds can be mixed freely in a single list. + """ + inline = [item for item in items if item.content is not None] + remote = [item for item in items if item.content is None] + + inline_commands = _build_inline_commands(inline, "download_root") if inline else "" + commands: list[str] = [] - for item in items: + for item in remote: path = shlex.quote(item.path) url = shlex.quote(item.url) commands.append(f"download_one {path} {url} &") download_commands = "\n".join(commands) - return _render_download_script(root_path, download_commands) + + return _render_download_script( + root_path, + inline_commands, + download_commands, + need_downloader=bool(remote), + ) diff --git a/api/core/workflow/nodes/file_upload/node.py b/api/core/workflow/nodes/file_upload/node.py index 4b10ddc5bf..ae5a4eb8b9 100644 --- a/api/core/workflow/nodes/file_upload/node.py +++ b/api/core/workflow/nodes/file_upload/node.py @@ -7,7 +7,6 @@ from typing import Any, cast from core.file import File, FileTransferMethod from core.sandbox.bash.session import SANDBOX_READY_TIMEOUT -from core.sandbox.services.asset_download_service import AssetDownloadItem from core.variables import ArrayFileSegment from core.variables.segments import ArrayStringSegment, FileSegment from core.virtual_environment.__base.command_future import CommandCancelledError, CommandTimeoutError @@ -15,6 +14,7 @@ from core.virtual_environment.__base.helpers import pipeline from core.workflow.enums import NodeType, WorkflowNodeExecutionStatus from core.workflow.node_events import NodeRunResult from core.workflow.nodes.base.node import Node +from core.zip_sandbox import SandboxDownloadItem from .entities import 
FileUploadNodeData from .exc import FileUploadDownloadError, FileUploadNodeError @@ -90,7 +90,7 @@ class FileUploadNode(Node[FileUploadNodeData]): try: sandbox.wait_ready(timeout=SANDBOX_READY_TIMEOUT) - download_items: list[AssetDownloadItem] = self._build_download_items(files) + download_items: list[SandboxDownloadItem] = self._build_download_items(files) sandbox_paths = self._upload(sandbox.vm, download_items) file_names = [PurePosixPath(path).name for path in sandbox_paths] process_data = { @@ -178,9 +178,9 @@ class FileUploadNode(Node[FileUploadNodeData]): return files return [] - def _build_download_items(self, files: Sequence[File]) -> list[AssetDownloadItem]: + def _build_download_items(self, files: Sequence[File]) -> list[SandboxDownloadItem]: used_paths: set[str] = set() - items: list[AssetDownloadItem] = [] + items: list[SandboxDownloadItem] = [] for index, file in enumerate(files): file_url = self._get_download_url(file) @@ -198,7 +198,7 @@ class FileUploadNode(Node[FileUploadNodeData]): dedupe += 1 used_paths.add(filename) - items.append(AssetDownloadItem(path=filename, url=file_url)) + items.append(SandboxDownloadItem(path=filename, url=file_url)) return items @staticmethod @@ -208,7 +208,7 @@ class FileUploadNode(Node[FileUploadNodeData]): normalized = normalized.lstrip("/") return normalized or "." 
- def _upload(self, vm: Any, items: list[AssetDownloadItem]) -> list[str]: + def _upload(self, vm: Any, items: list[SandboxDownloadItem]) -> list[str]: p = pipeline(vm) out_paths: list[str] = [] for item in items: diff --git a/api/core/zip_sandbox/__init__.py b/api/core/zip_sandbox/__init__.py index 71d36e6ee9..266e6c7dc2 100644 --- a/api/core/zip_sandbox/__init__.py +++ b/api/core/zip_sandbox/__init__.py @@ -1,4 +1,11 @@ -from .zip_sandbox import SandboxDownloadItem, SandboxFile, SandboxUploadItem, ZipSandbox +from __future__ import annotations + +from typing import TYPE_CHECKING + +from .entities import SandboxDownloadItem, SandboxFile, SandboxUploadItem + +if TYPE_CHECKING: + from .zip_sandbox import ZipSandbox __all__ = [ "SandboxDownloadItem", @@ -6,3 +13,11 @@ "SandboxUploadItem", "ZipSandbox", ] + + +def __getattr__(name: str): + if name == "ZipSandbox": + from .zip_sandbox import ZipSandbox + + return ZipSandbox + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") diff --git a/api/core/zip_sandbox/entities.py b/api/core/zip_sandbox/entities.py new file mode 100644 index 0000000000..fc350899cf --- /dev/null +++ b/api/core/zip_sandbox/entities.py @@ -0,0 +1,39 @@ +"""Data classes for ZipSandbox file operations. + +Separated from ``zip_sandbox.py`` so that lightweight consumers (tests, +shell-script builders) can import the types without pulling in the full +sandbox provider chain. +""" + +from __future__ import annotations + +from dataclasses import dataclass, field + + +@dataclass(frozen=True) +class SandboxDownloadItem: + """Unified download/inline item for sandbox file operations. + + For remote files, *url* is set and the item is fetched via ``curl``. + For inline content, *content* is set and the bytes are written directly + into the VM via a ``base64 -d`` heredoc — no network round-trip. 
+ """ + + path: str + url: str = "" + content: bytes | None = field(default=None, repr=False) + + +@dataclass(frozen=True) +class SandboxUploadItem: + """Item for uploading: sandbox path -> URL.""" + + path: str + url: str + + +@dataclass(frozen=True) +class SandboxFile: + """A handle to a file in the sandbox.""" + + path: str diff --git a/api/core/zip_sandbox/zip_sandbox.py b/api/core/zip_sandbox/zip_sandbox.py index b8c61a3890..728ef538c6 100644 --- a/api/core/zip_sandbox/zip_sandbox.py +++ b/api/core/zip_sandbox/zip_sandbox.py @@ -1,7 +1,8 @@ from __future__ import annotations +import base64 import posixpath -from dataclasses import dataclass +import shlex from io import BytesIO from pathlib import PurePosixPath from types import TracebackType @@ -20,34 +21,12 @@ from core.virtual_environment.__base.virtual_environment import VirtualEnvironme from services.sandbox.sandbox_provider_service import SandboxProviderService from .cli_strategy import CliZipStrategy +from .entities import SandboxDownloadItem, SandboxFile, SandboxUploadItem from .node_strategy import NodeZipStrategy from .python_strategy import PythonZipStrategy from .strategy import ZipStrategy -@dataclass(frozen=True) -class SandboxDownloadItem: - """Item for downloading: URL -> sandbox path.""" - - url: str - path: str - - -@dataclass(frozen=True) -class SandboxUploadItem: - """Item for uploading: sandbox path -> URL.""" - - path: str - url: str - - -@dataclass(frozen=True) -class SandboxFile: - """A handle to a file in the sandbox.""" - - path: str - - class ZipSandbox: """A sandbox for archive (zip) operations. @@ -221,6 +200,12 @@ class ZipSandbox: # ========== Download operations ========== def download_items(self, items: list[SandboxDownloadItem], *, dest_dir: str = ".") -> list[str]: + """Download or write items into the sandbox via a single pipeline. + + Remote items (with *url*) are fetched via ``curl``. Inline items + (with *content*) are written via ``base64 -d`` heredoc. 
Both go + through the same pipeline — no branching at the structural level. + """ if not items: return [] @@ -238,7 +223,10 @@ class ZipSandbox: out_dir = posixpath.dirname(out_path) if out_dir not in ("", "."): p.add(["mkdir", "-p", out_dir], error_message="Failed to create download directory") - p.add(["curl", "-fsSL", item.url, "-o", out_path], error_message="Failed to download file") + p.add( + self.to_download_command(item, out_path), + error_message=f"Failed to write {item.path}", + ) try: p.execute(timeout=self._DEFAULT_TIMEOUT_SECONDS, raise_on_error=True) @@ -247,6 +235,14 @@ class ZipSandbox: return out_paths + @staticmethod + def to_download_command(item: SandboxDownloadItem, out_path: str) -> list[str]: + """Return the shell command to materialise *item* at *out_path*.""" + if item.content is not None: + encoded = base64.b64encode(item.content).decode("ascii") + return ["sh", "-c", f"base64 -d <<'_B64_' > {shlex.quote(out_path)}\n{encoded}\n_B64_"] + return ["curl", "-fsSL", item.url, "-o", out_path] + def download_archive(self, archive_url: str, *, path: str = "input.tar.gz") -> str: path = self._normalize_path(path) diff --git a/api/services/app_asset_package_service.py b/api/services/app_asset_package_service.py index c1ab180e58..9df4874880 100644 --- a/api/services/app_asset_package_service.py +++ b/api/services/app_asset_package_service.py @@ -6,6 +6,13 @@ separated from AppAssetService to avoid circular imports. Dependency flow: core/* -> AppAssetPackageService -> AppAssetService (core modules can import this service without circular dependency) + +Inline content optimisation: + ``AssetItem`` objects returned by the build pipeline may carry an + in-process *content* field (e.g. resolved ``.md`` skill documents). + ``AppAssetService.to_download_items()`` converts these into unified + ``SandboxDownloadItem`` instances, and ``ZipSandbox.download_items()`` + handles both inline and remote items natively. 
""" import logging @@ -19,7 +26,7 @@ from core.app_assets.builder.file_builder import FileBuilder from core.app_assets.builder.skill_builder import SkillBuilder from core.app_assets.entities.assets import AssetItem from core.app_assets.storage import AssetPaths -from core.zip_sandbox import SandboxDownloadItem, ZipSandbox +from core.zip_sandbox import ZipSandbox from models.app_asset import AppAssets from models.model import App @@ -84,15 +91,12 @@ class AppAssetPackageService: ) -> None: """Package assets into a ZIP and upload directly to the given URL. - When *assets* is empty an empty ZIP is written directly to storage - using *storage_key*, bypassing the HTTP ticket URL. This avoids a - ``ConnectionError`` when the api process cannot reach the ticket - endpoint (e.g. ``localhost:80`` inside a Docker container where nginx - runs in a separate service). + Uses ``AppAssetService.to_download_items()`` to convert assets + into unified download items, then ``ZipSandbox.download_items()`` + handles both inline content and remote presigned URLs natively. - For non-empty assets the ZIP is built inside a remote sandbox VM - which uploads via ``curl`` to *upload_url* (the sandbox container - *can* reach the ticket endpoint thanks to socat forwarding). + When *assets* is empty an empty ZIP is written directly to storage + using *storage_key*, bypassing the HTTP ticket URL. 
""" from services.app_asset_service import AppAssetService @@ -119,12 +123,8 @@ class AppAssetPackageService: requests.put(upload_url, data=buf.getvalue(), timeout=30) return - asset_storage = AppAssetService.get_storage() - keys = [AssetPaths.draft(tenant_id, app_id, asset.asset_id) for asset in assets] - download_urls = asset_storage.get_download_urls(keys) - download_items = [ - SandboxDownloadItem(url=url, path=asset.path) for asset, url in zip(assets, download_urls, strict=True) - ] + download_items = AppAssetService.to_download_items(assets) + with ZipSandbox(tenant_id=tenant_id, user_id=user_id, app_id="asset-packager") as zs: zs.download_items(download_items) archive = zs.zip() @@ -134,7 +134,11 @@ class AppAssetPackageService: def publish(session: Session, app_model: App, account_id: str, workflow_id: str) -> AppAssets: """Publish app assets for a workflow. - Creates a versioned copy of draft assets and packages them for runtime use. + Creates a versioned copy of draft assets and packages them for + runtime use. The build ZIP contains resolved ``.md`` content + (inline from ``SkillBuilder``) and raw draft content for all + other files. A separate source ZIP snapshots the raw drafts for + later export. """ from services.app_asset_service import AppAssetService @@ -159,10 +163,11 @@ class AppAssetPackageService: asset_storage = AppAssetService.get_storage() accessor = AppAssetService.get_accessor(tenant_id, app_id) - pipeline = AssetBuildPipeline([SkillBuilder(accessor=accessor, storage=asset_storage), FileBuilder()]) + build_pipeline = AssetBuildPipeline([SkillBuilder(accessor=accessor), FileBuilder()]) ctx = BuildContext(tenant_id=tenant_id, app_id=app_id, build_id=publish_id) - built_assets = pipeline.build_all(tree, ctx) + built_assets = build_pipeline.build_all(tree, ctx) + # Runtime ZIP: resolved .md (inline) + raw draft (remote). 
runtime_zip_key = AssetPaths.build_zip(tenant_id, app_id, publish_id) runtime_upload_url = asset_storage.get_upload_url(runtime_zip_key) AppAssetPackageService.package_and_upload( @@ -174,6 +179,7 @@ class AppAssetPackageService: storage_key=runtime_zip_key, ) + # Source ZIP: all raw draft content (for export/restore). source_items = AppAssetService.get_draft_assets(tenant_id, app_id) source_key = AssetPaths.source_zip(tenant_id, app_id, workflow_id) source_upload_url = asset_storage.get_upload_url(source_key) @@ -187,28 +193,3 @@ class AppAssetPackageService: ) return published - - @staticmethod - def build_assets(tenant_id: str, app_id: str, assets: AppAssets) -> None: - """Build resolved draft assets without packaging into a zip.""" - from services.app_asset_service import AppAssetService - - tree = assets.asset_tree - - asset_storage = AppAssetService.get_storage() - accessor = AppAssetService.get_accessor(tenant_id, app_id) - pipeline = AssetBuildPipeline([SkillBuilder(accessor=accessor, storage=asset_storage), FileBuilder()]) - ctx = BuildContext(tenant_id=tenant_id, app_id=app_id, build_id=assets.id) - built_assets: list[AssetItem] = pipeline.build_all(tree, ctx) - - user_id = getattr(assets, "updated_by", None) or getattr(assets, "created_by", None) or "system" - key = AssetPaths.build_zip(tenant_id, app_id, assets.id) - upload_url = asset_storage.get_upload_url(key) - AppAssetPackageService.package_and_upload( - assets=built_assets, - upload_url=upload_url, - tenant_id=tenant_id, - app_id=app_id, - user_id=user_id, - storage_key=key, - ) diff --git a/api/services/app_asset_service.py b/api/services/app_asset_service.py index 50c107a551..992d465a49 100644 --- a/api/services/app_asset_service.py +++ b/api/services/app_asset_service.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import logging import threading from uuid import uuid4 @@ -16,6 +18,7 @@ from core.app.entities.app_asset_entities import ( from core.app_assets.accessor import 
CachedContentAccessor from core.app_assets.entities.assets import AssetItem from core.app_assets.storage import AssetPaths +from core.zip_sandbox import SandboxDownloadItem from extensions.ext_database import db from extensions.ext_redis import redis_client from extensions.ext_storage import storage @@ -214,6 +217,46 @@ class AppAssetService: """Get a content accessor with DB caching for the given app.""" return CachedContentAccessor(AppAssetService.get_storage(), tenant_id, app_id) + # Default TTL for presigned download URLs generated by to_download_items(). + _DOWNLOAD_URL_TTL_SECONDS = 600 + + @staticmethod + def to_download_items( + items: list[AssetItem], + *, + path_prefix: str = "", + ) -> list[SandboxDownloadItem]: + """Convert asset items to unified download items. + + Items with *content* become inline ``SandboxDownloadItem`` instances + (no presigned URL needed). Items without *content* get presigned + download URLs from storage. + + *path_prefix*, when set, is prepended to every item path + (e.g. ``"my-app"`` → ``"my-app/skills/foo.md"``). 
+ """ + from core.zip_sandbox import SandboxDownloadItem + + inline: list[SandboxDownloadItem] = [] + remote_items: list[tuple[AssetItem, str]] = [] # (item, path) + + for item in items: + path = f"{path_prefix}/{item.path}" if path_prefix else item.path + if item.content is not None: + inline.append(SandboxDownloadItem(path=path, content=item.content)) + else: + remote_items.append((item, path)) + + result = list(inline) + if remote_items: + asset_storage = AppAssetService.get_storage() + keys = [a.storage_key for a, _ in remote_items] + urls = asset_storage.get_download_urls(keys, AppAssetService._DOWNLOAD_URL_TTL_SECONDS) + for (_, path), url in zip(remote_items, urls, strict=True): + result.append(SandboxDownloadItem(path=path, url=url)) + + return result + @staticmethod def get_file_content(app_model: App, account_id: str, node_id: str) -> bytes: with Session(db.engine) as session: diff --git a/api/services/app_bundle_service.py b/api/services/app_bundle_service.py index 3e54006607..21efb1d424 100644 --- a/api/services/app_bundle_service.py +++ b/api/services/app_bundle_service.py @@ -37,7 +37,7 @@ from core.app.entities.app_bundle_entities import ( BundleManifest, ) from core.app_assets.storage import AssetPaths -from core.zip_sandbox import SandboxDownloadItem, SandboxUploadItem, ZipSandbox +from core.zip_sandbox import SandboxUploadItem, ZipSandbox from extensions.ext_database import db from extensions.ext_redis import redis_client from extensions.storage.cached_presign_storage import CachedPresignStorage @@ -131,16 +131,10 @@ class AppBundleService: else: asset_items = AppAssetService.get_draft_assets(tenant_id, app_id) if asset_items: - asset_urls = asset_storage.get_download_urls( - [AssetPaths.draft(tenant_id, app_id, a.asset_id) for a in asset_items], expires_in - ) - zs.download_items( - [ - SandboxDownloadItem(url=url, path=f"{safe_name}/{a.path}") - for a, url in zip(asset_items, asset_urls, strict=True) - ], - dest_dir="bundle_root", - ) + accessor 
= AppAssetService.get_accessor(tenant_id, app_id) + resolved = accessor.resolve_items(asset_items) + download_items = AppAssetService.to_download_items(resolved, path_prefix=safe_name) + zs.download_items(download_items, dest_dir="bundle_root") archive = zs.zip(src="bundle_root", include_base=False) zs.upload(archive, upload_url) diff --git a/api/services/sandbox/sandbox_service.py b/api/services/sandbox/sandbox_service.py index e69215ee1a..60b8877261 100644 --- a/api/services/sandbox/sandbox_service.py +++ b/api/services/sandbox/sandbox_service.py @@ -1,3 +1,19 @@ +"""Service for creating and managing sandbox instances. + +Three creation paths: + +- ``create()`` — published runtime. Downloads the pre-built ZIP via + ``AppAssetsInitializer`` and loads the ``SkillBundle`` via + ``SkillInitializer``. + +- ``create_draft()`` / ``create_for_single_step()`` — draft runtime. + ``DraftAppAssetsInitializer`` runs the build pipeline on the fly, + compiles ``.md`` skills (saving the ``SkillBundle`` to Redis/S3 as a + side-effect), and pushes resolved content as inline base64 into the + sandbox. ``SkillInitializer`` then loads the bundle from Redis/S3. + No separate ``build_assets()`` call is needed. 
+""" + import logging from core.sandbox.builder import SandboxBuilder @@ -11,7 +27,6 @@ from core.sandbox.initializer.skill_initializer import SkillInitializer from core.sandbox.sandbox import Sandbox from core.sandbox.storage.archive_storage import ArchiveSandboxStorage from extensions.ext_storage import storage -from services.app_asset_package_service import AppAssetPackageService from services.app_asset_service import AppAssetService logger = logging.getLogger(__name__) @@ -67,7 +82,6 @@ class SandboxService: if not assets: raise ValueError(f"No assets found for tid={tenant_id}, app_id={app_id}") - AppAssetPackageService.build_assets(tenant_id, app_id, assets) sandbox_id = SandboxBuilder.draft_id(user_id) archive_storage = ArchiveSandboxStorage( tenant_id, app_id, sandbox_id, storage.storage_runner, exclude_patterns=[AppAssets.PATH] @@ -101,7 +115,6 @@ class SandboxService: if not assets: raise ValueError(f"No assets found for tid={tenant_id}, app_id={app_id}") - AppAssetPackageService.build_assets(tenant_id, app_id, assets) sandbox_id = SandboxBuilder.draft_id(user_id) archive_storage = ArchiveSandboxStorage( tenant_id, app_id, sandbox_id, storage.storage_runner, exclude_patterns=[AppAssets.PATH] diff --git a/api/tests/unit_tests/core/app_assets/test_storage.py b/api/tests/unit_tests/core/app_assets/test_storage.py index 2094f1b89c..2d5adae8e7 100644 --- a/api/tests/unit_tests/core/app_assets/test_storage.py +++ b/api/tests/unit_tests/core/app_assets/test_storage.py @@ -104,12 +104,6 @@ def test_build_zip_storage_key(): assert key == f"app_assets/{tid}/{aid}/artifacts/{assets_id}.zip" -def test_resolved_storage_key(): - tid, aid, assets_id, nid = str(uuid4()), str(uuid4()), str(uuid4()), str(uuid4()) - key = AssetPaths.resolved(tid, aid, assets_id, nid) - assert key == f"app_assets/{tid}/{aid}/artifacts/{assets_id}/resolved/{nid}" - - def test_skill_bundle_storage_key(): tid, aid, assets_id = str(uuid4()), str(uuid4()), str(uuid4()) key = 
AssetPaths.skill_bundle(tid, aid, assets_id) diff --git a/api/tests/unit_tests/core/sandbox/__init__.py b/api/tests/unit_tests/core/sandbox/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/tests/unit_tests/core/sandbox/services/test_asset_download_service.py b/api/tests/unit_tests/core/sandbox/services/test_asset_download_service.py index 338fff2197..86a0235d0d 100644 --- a/api/tests/unit_tests/core/sandbox/services/test_asset_download_service.py +++ b/api/tests/unit_tests/core/sandbox/services/test_asset_download_service.py @@ -1,8 +1,22 @@ -from core.sandbox.services.asset_download_service import AssetDownloadItem, AssetDownloadService +"""Tests for AssetDownloadService shell script generation. + +Covers three scenarios: +1. Remote-only items (presigned URL download) +2. Inline-only items (base64 heredoc write) +3. Mixed (inline written first, then parallel downloads) +""" + +import base64 + +from core.sandbox.services.asset_download_service import AssetDownloadService +from core.zip_sandbox.entities import SandboxDownloadItem + +# --- Remote-only tests --- -def test_build_download_script_includes_downloader_detection() -> None: - script = AssetDownloadService.build_download_script([], "skills") +def test_remote_only_includes_downloader_detection() -> None: + items = [SandboxDownloadItem(path="file.txt", url="https://example.com/file.txt")] + script = AssetDownloadService.build_download_script(items, "skills") assert "command -v curl" in script assert "command -v wget" in script @@ -10,10 +24,10 @@ def test_build_download_script_includes_downloader_detection() -> None: assert "No downloader found" in script -def test_build_download_script_contains_items_and_root() -> None: +def test_remote_only_contains_items_and_root() -> None: items = [ - AssetDownloadItem(path="/docs/readme.md", url="https://example.com/readme.md"), - AssetDownloadItem(path="/data/input.json", url="https://example.com/input.json"), + 
SandboxDownloadItem(path="/docs/readme.md", url="https://example.com/readme.md"), + SandboxDownloadItem(path="/data/input.json", url="https://example.com/input.json"), ] script = AssetDownloadService.build_download_script(items, "skills") @@ -25,10 +39,10 @@ def test_build_download_script_contains_items_and_root() -> None: assert "https://example.com/input.json" in script -def test_build_download_script_escapes_paths_and_urls() -> None: +def test_remote_only_escapes_paths_and_urls() -> None: items = [ - AssetDownloadItem(path='/space path/"quoted".txt', url="https://example.com/a?b=1&c=2"), - AssetDownloadItem(path=r"/path/with\\backslash", url="https://example.com/with space"), + SandboxDownloadItem(path='/space path/"quoted".txt', url="https://example.com/a?b=1&c=2"), + SandboxDownloadItem(path=r"/path/with\\backslash", url="https://example.com/with space"), ] script = AssetDownloadService.build_download_script(items, "skills") @@ -38,23 +52,100 @@ def test_build_download_script_escapes_paths_and_urls() -> None: assert "?b=1&c=2" in script -def test_build_download_script_runs_parallel_jobs() -> None: - script = AssetDownloadService.build_download_script([], "skills") +def test_remote_only_runs_parallel_jobs() -> None: + items = [ + SandboxDownloadItem(path="a.txt", url="https://example.com/a"), + SandboxDownloadItem(path="b.txt", url="https://example.com/b"), + ] + script = AssetDownloadService.build_download_script(items, "skills") assert "download_one" in script assert "&" in script assert "wait" in script -def test_build_download_script_appends_failures() -> None: - script = AssetDownloadService.build_download_script([], "skills") +def test_remote_only_appends_failures() -> None: + items = [SandboxDownloadItem(path="a.txt", url="https://example.com/a")] + script = AssetDownloadService.build_download_script(items, "skills") assert "fail_log" in script - assert "Failed downloads" in script + assert "DOWNLOAD_FAILURES" in script -def 
test_build_download_script_contains_python_fallback() -> None: - script = AssetDownloadService.build_download_script([], "skills") +def test_remote_only_contains_python_fallback() -> None: + items = [SandboxDownloadItem(path="a.txt", url="https://example.com/a")] + script = AssetDownloadService.build_download_script(items, "skills") assert "python3 -" in script assert "urllib.request" in script + + +# --- Inline-only tests --- + + +def test_inline_only_no_downloader_detection() -> None: + items = [SandboxDownloadItem(path="skill.md", content=b"hello world")] + script = AssetDownloadService.build_download_script(items, "skills") + + # No remote items → no downloader detection block. + assert "command -v curl" not in script + assert "download_one" not in script + assert "wait" not in script + + +def test_inline_only_base64_content() -> None: + content = b'{"content": "test skill", "metadata": {}}' + items = [SandboxDownloadItem(path="docs/skill.md", content=content)] + script = AssetDownloadService.build_download_script(items, "skills") + + encoded = base64.b64encode(content).decode("ascii") + assert encoded in script + assert "base64 -d" in script + assert "docs/skill.md" in script + assert "_INLINE_0" in script + + +def test_inline_multiple_items() -> None: + items = [ + SandboxDownloadItem(path="a.md", content=b"aaa"), + SandboxDownloadItem(path="b.md", content=b"bbb"), + ] + script = AssetDownloadService.build_download_script(items, "skills") + + assert "_INLINE_0" in script + assert "_INLINE_1" in script + assert base64.b64encode(b"aaa").decode() in script + assert base64.b64encode(b"bbb").decode() in script + + +# --- Mixed tests --- + + +def test_mixed_inline_and_remote() -> None: + items = [ + SandboxDownloadItem(path="skill.md", content=b"resolved content"), + SandboxDownloadItem(path="data.py", url="https://example.com/data.py"), + ] + + script = AssetDownloadService.build_download_script(items, "skills") + + # Inline content present + assert "base64 -d" in 
script + assert base64.b64encode(b"resolved content").decode() in script + + # Remote download present + assert "command -v curl" in script + assert "download_one" in script + assert "data.py" in script + assert "https://example.com/data.py" in script + + +# --- Empty items --- + + +def test_empty_items_produces_valid_script() -> None: + script = AssetDownloadService.build_download_script([], "skills") + + assert "download_root=skills" in script + assert "mkdir -p" in script + assert "exit 0" in script