feat: enhance file download functionality with pipeline execution and improved error handling

This commit is contained in:
Harry 2026-02-05 15:15:30 +08:00
parent 469fda8327
commit d690b97568

View File

@ -1,6 +1,7 @@
from __future__ import annotations
import json
import logging
import os
from typing import TYPE_CHECKING
from uuid import uuid4
@ -8,13 +9,15 @@ from uuid import uuid4
from core.sandbox.entities.files import SandboxFileDownloadTicket, SandboxFileNode
from core.sandbox.inspector.base import SandboxFileSource
from core.sandbox.storage import SandboxFilePaths
from core.virtual_environment.__base.exec import CommandExecutionError
from core.virtual_environment.__base.helpers import execute
from core.virtual_environment.__base.exec import CommandExecutionError, PipelineExecutionError
from core.virtual_environment.__base.helpers import execute, pipeline
from extensions.ext_storage import storage
if TYPE_CHECKING:
from core.zip_sandbox import ZipSandbox
logger = logging.getLogger(__name__)
class SandboxFileArchiveSource(SandboxFileSource):
_PYTHON_EXEC_CMD = 'if command -v python3 >/dev/null 2>&1; then py=python3; else py=python; fi; "$py" -c "$0" "$@"'
@ -141,82 +144,76 @@ print(json.dumps(entries))
return sorted(entries, key=lambda e: e.path)
def download_file(self, *, path: str) -> SandboxFileDownloadTicket:
"""Download a file or directory from the archived sandbox.
Uses direct upload from sandbox to storage via presigned URL, avoiding
data transfer through the service layer. This preserves binary integrity
(no text encoding issues) and reduces bandwidth overhead.
"""
from services.sandbox.sandbox_file_service import SandboxFileService
archive_url = self._get_archive_download_url()
export_name = os.path.basename(path.rstrip("/")) or "workspace"
export_id = uuid4().hex
with self._create_zip_sandbox() as zs:
# Download and extract the archive
archive_path = zs.download_archive(archive_url, path="workspace.tar.gz")
zs.untar(archive_path=archive_path, dest_dir="workspace")
# Determine the target path inside extracted workspace
target_path = f"workspace/{path}" if path not in (".", "") else "workspace"
# Detect if target is file or directory
detect_script = r"""
import os
import sys
p = sys.argv[1]
if os.path.isdir(p):
print("dir")
raise SystemExit(0)
if os.path.isfile(p):
print("file")
raise SystemExit(0)
print("none")
raise SystemExit(2)
"""
try:
result = execute(
zs.vm,
[
"sh",
"-c",
self._PYTHON_EXEC_CMD,
detect_script,
target_path,
],
timeout=self._LIST_TIMEOUT_SECONDS,
error_message="Failed to check path in sandbox",
results = (
pipeline(zs.vm)
.add(
[
"sh",
"-c",
'if [ -d "$1" ]; then echo dir; elif [ -f "$1" ]; then echo file; else exit 2; fi',
"sh",
target_path,
],
error_message="Failed to check path in sandbox",
)
.execute(timeout=self._LIST_TIMEOUT_SECONDS, raise_on_error=True)
)
except CommandExecutionError as exc:
except PipelineExecutionError as exc:
raise ValueError(str(exc)) from exc
kind = result.stdout.decode("utf-8", errors="replace").strip()
kind = results[0].stdout.decode("utf-8", errors="replace").strip()
if kind not in ("dir", "file"):
raise ValueError("File not found in sandbox archive")
from services.sandbox.sandbox_file_service import SandboxFileService
sandbox_storage = SandboxFileService.get_storage()
is_file = kind == "file"
filename = (os.path.basename(path) or "file") if is_file else f"{export_name}.tar.gz"
export_key = SandboxFilePaths.export(self._tenant_id, self._app_id, self._sandbox_id, export_id, filename)
upload_url = sandbox_storage.get_upload_url(export_key, self._EXPORT_EXPIRES_IN_SECONDS)
if kind == "file":
# Download file content from sandbox
file_data = zs.read_file(target_path)
export_key = SandboxFilePaths.export(
self._tenant_id,
self._app_id,
self._sandbox_id,
export_id,
os.path.basename(path) or "file",
# Build pipeline: for directories, tar first then upload; for files, upload directly
archive_temp = f"/tmp/{export_id}.tar.gz"
src_path = target_path if is_file else archive_temp
tar_src = path if path not in (".", "") else "."
try:
(
pipeline(zs.vm)
.add(
["tar", "-czf", archive_temp, "-C", "workspace", tar_src],
error_message="Failed to archive directory",
on=not is_file,
)
.add(
["curl", "-sf", "-X", "PUT", "-T", src_path, upload_url],
error_message="Failed to upload file",
)
.add(["rm", "-f", archive_temp], on=not is_file)
.execute(timeout=self._UPLOAD_TIMEOUT_SECONDS, raise_on_error=True)
)
sandbox_storage.save(export_key, file_data)
else:
# Create tar.gz archive of the directory
tar_file = zs.tar(target_path, include_base=True, compress=True)
tar_data = zs.read_file(tar_file.path)
export_key = SandboxFilePaths.export(
self._tenant_id,
self._app_id,
self._sandbox_id,
export_id,
f"{export_name}.tar.gz",
)
sandbox_storage.save(export_key, tar_data)
except PipelineExecutionError as exc:
raise RuntimeError(str(exc)) from exc
download_url = sandbox_storage.get_download_url(export_key, self._EXPORT_EXPIRES_IN_SECONDS)
return SandboxFileDownloadTicket(
download_url=download_url,
expires_in=self._EXPORT_EXPIRES_IN_SECONDS,