From 1c76ed2c405bba7af5c0c049923da0d8c6513d2f Mon Sep 17 00:00:00 2001 From: Harry Date: Tue, 20 Jan 2026 18:44:00 +0800 Subject: [PATCH] feat(sandbox): draft storage --- api/commands.py | 6 +- .../app/apps/advanced_chat/app_generator.py | 7 +- api/core/app/apps/workflow/app_generator.py | 7 +- api/core/app/layers/sandbox_layer.py | 63 ++++----- api/core/sandbox/bash/session.py | 86 +++++------- .../initializer/app_assets_initializer.py | 43 +++--- .../initializer/dify_cli_initializer.py | 76 ++++------ api/core/sandbox/storage/archive_storage.py | 52 +++---- api/core/sandbox/vm.py | 4 + api/core/skill/entities/skill_artifact.py | 0 api/core/virtual_environment/__base/exec.py | 18 +++ .../virtual_environment/__base/helpers.py | 130 +++++++++++++++++- api/extensions/storage/aws_s3_storage.py | 8 ++ api/extensions/storage/base_storage.py | 9 ++ .../storage/file_presign_storage.py | 10 ++ api/models/app_asset.py | 1 + api/services/app_asset_service.py | 16 ++- api/services/workflow_service.py | 5 +- 18 files changed, 333 insertions(+), 208 deletions(-) create mode 100644 api/core/skill/entities/skill_artifact.py diff --git a/api/commands.py b/api/commands.py index 99ba835d04..565661321d 100644 --- a/api/commands.py +++ b/api/commands.py @@ -23,6 +23,7 @@ from core.rag.datasource.vdb.vector_factory import Vector from core.rag.datasource.vdb.vector_type import VectorType from core.rag.index_processor.constant.built_in_field import BuiltInField from core.rag.models.document import Document +from core.sandbox.vm import SandboxBuilder, SandboxType from core.tools.utils.system_encryption import encrypt_system_params from events.app_event import app_was_created from extensions.ext_database import db @@ -1508,7 +1509,6 @@ def setup_sandbox_system_config(provider_type: str, config: str): flask setup-sandbox-system-config --provider-type local --config '{}' """ from models.sandbox import SandboxProviderSystemConfig - from services.sandbox.sandbox_provider_service import 
PROVIDER_CONFIG_MODELS try: click.echo(click.style(f"Validating config: {config}", fg="yellow")) @@ -1516,9 +1516,7 @@ def setup_sandbox_system_config(provider_type: str, config: str): click.echo(click.style("Config validated successfully.", fg="green")) click.echo(click.style(f"Validating config schema for provider type: {provider_type}", fg="yellow")) - model_class = PROVIDER_CONFIG_MODELS.get(provider_type) - if model_class: - model_class.model_validate(config_dict) + SandboxBuilder.validate(SandboxType(provider_type), config_dict) click.echo(click.style("Config schema validated successfully.", fg="green")) click.echo(click.style("Encrypting config...", fg="yellow")) diff --git a/api/core/app/apps/advanced_chat/app_generator.py b/api/core/app/apps/advanced_chat/app_generator.py index 3d394b86c7..f1e4b1ad14 100644 --- a/api/core/app/apps/advanced_chat/app_generator.py +++ b/api/core/app/apps/advanced_chat/app_generator.py @@ -30,7 +30,6 @@ from core.model_runtime.errors.invoke import InvokeAuthorizationError from core.ops.ops_trace_manager import TraceQueueManager from core.prompt.utils.get_thread_messages_length import get_thread_messages_length from core.repositories import DifyCoreRepositoryFactory -from core.sandbox.storage.archive_storage import ArchiveSandboxStorage from core.workflow.repositories.draft_variable_repository import ( DraftVariableSaverFactory, ) @@ -525,11 +524,7 @@ class AdvancedChatAppGenerator(MessageBasedAppGenerator): app_id=application_generate_entity.app_config.app_id, user_id=application_generate_entity.user_id, workflow_version=workflow.version, - sandbox_id=application_generate_entity.workflow_run_id, - sandbox_storage=ArchiveSandboxStorage( - tenant_id=application_generate_entity.app_config.tenant_id, - sandbox_id=application_generate_entity.workflow_run_id, - ), + workflow_execution_id=application_generate_entity.workflow_run_id, ), ) diff --git a/api/core/app/apps/workflow/app_generator.py 
b/api/core/app/apps/workflow/app_generator.py index 9bb80ded0c..53e80dcd2b 100644 --- a/api/core/app/apps/workflow/app_generator.py +++ b/api/core/app/apps/workflow/app_generator.py @@ -29,7 +29,6 @@ from core.helper.trace_id_helper import extract_external_trace_id_from_args from core.model_runtime.errors.invoke import InvokeAuthorizationError from core.ops.ops_trace_manager import TraceQueueManager from core.repositories import DifyCoreRepositoryFactory -from core.sandbox.storage.archive_storage import ArchiveSandboxStorage from core.workflow.graph_engine.layers.base import GraphEngineLayer from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository @@ -499,11 +498,7 @@ class WorkflowAppGenerator(BaseAppGenerator): app_id=application_generate_entity.app_config.app_id, user_id=application_generate_entity.user_id, workflow_version=workflow.version, - sandbox_id=application_generate_entity.workflow_execution_id, - sandbox_storage=ArchiveSandboxStorage( - tenant_id=application_generate_entity.app_config.tenant_id, - sandbox_id=application_generate_entity.workflow_execution_id, - ), + workflow_execution_id=application_generate_entity.workflow_execution_id, ), ) diff --git a/api/core/app/layers/sandbox_layer.py b/api/core/app/layers/sandbox_layer.py index d0cea9339a..4ad8229cc6 100644 --- a/api/core/app/layers/sandbox_layer.py +++ b/api/core/app/layers/sandbox_layer.py @@ -1,8 +1,8 @@ import logging from core.sandbox import AppAssetsInitializer, DifyCliInitializer, SandboxManager -from core.sandbox.storage.sandbox_storage import SandboxStorage -from core.virtual_environment.__base.virtual_environment import VirtualEnvironment +from core.sandbox.storage.archive_storage import ArchiveSandboxStorage +from core.sandbox.vm import SandboxBuilder from core.workflow.graph_engine.layers.base import GraphEngineLayer from core.workflow.graph_events.base 
import GraphEngineEvent from core.workflow.graph_events.graph import GraphRunPausedEvent @@ -25,32 +25,30 @@ class SandboxLayer(GraphEngineLayer): app_id: str, user_id: str, workflow_version: str, - sandbox_id: str, - sandbox_storage: SandboxStorage, + workflow_execution_id: str, ) -> None: super().__init__() self._tenant_id = tenant_id self._app_id = app_id self._user_id = user_id self._workflow_version = workflow_version - self._sandbox_id = sandbox_id - self._sandbox_storage = sandbox_storage - - @property - def sandbox(self) -> VirtualEnvironment: - sandbox = SandboxManager.get(self._sandbox_id) - if sandbox is None: - raise RuntimeError(f"Sandbox not found or not initialized for sandbox_id={self._sandbox_id}") - return sandbox + self._workflow_execution_id = workflow_execution_id + self._sandbox_id = ( + SandboxBuilder.draft_id(self._user_id) + if self._workflow_version == Workflow.VERSION_DRAFT + else self._workflow_execution_id + ) + self._sandbox_storage = ArchiveSandboxStorage(self._tenant_id, self._sandbox_id) def on_graph_start(self) -> None: try: is_draft = self._workflow_version == Workflow.VERSION_DRAFT - assets = AppAssetService.get_assets(self._tenant_id, self._app_id, is_draft=is_draft) + assets = AppAssetService.get_assets(self._tenant_id, self._app_id, self._user_id, is_draft=is_draft) if not assets: raise ValueError( f"No assets found for tid={self._tenant_id}, app_id={self._app_id}, wf={self._workflow_version}" ) + self._assets_id = assets.id if is_draft: @@ -63,20 +61,21 @@ class SandboxLayer(GraphEngineLayer): ) AppAssetService.build_assets(self._tenant_id, self._app_id, assets) - logger.info( - "Initializing sandbox for tenant_id=%s, app_id=%s, workflow_version=%s, assets_id=%s", - self._tenant_id, - self._app_id, - self._workflow_version, - assets.id, - ) - builder = ( SandboxProviderService.create_sandbox_builder(self._tenant_id) .initializer(AppAssetsInitializer(self._tenant_id, self._app_id, assets.id))
.initializer(DifyCliInitializer(self._tenant_id, self._user_id, self._app_id, assets.id)) ) - sandbox = builder.build() + try: + sandbox = builder.build() + logger.info( + "Sandbox initialized, workflow_execution_id=%s, sandbox_id=%s, sandbox_arch=%s", + self._sandbox_id, + sandbox.metadata.id, + sandbox.metadata.arch, + ) + except Exception as e: + raise SandboxInitializationError(f"Failed to build sandbox: {e}") from e SandboxManager.register(self._sandbox_id, sandbox) logger.info( @@ -86,9 +85,10 @@ class SandboxLayer(GraphEngineLayer): sandbox.metadata.arch, ) - # Check if sandbox is initialized - if self._sandbox_storage.mount(sandbox): - logger.info("Sandbox files restored, sandbox_id=%s", self._sandbox_id) + # mount sandbox files from storage + mounted = self._sandbox_storage.mount(sandbox) + logger.info("Sandbox files mount status: %s", mounted) + except Exception as e: logger.exception("Failed to initialize sandbox") raise SandboxInitializationError(f"Failed to initialize sandbox: {e}") from e @@ -108,13 +108,6 @@ class SandboxLayer(GraphEngineLayer): logger.debug("No sandbox to release for sandbox_id=%s", self._sandbox_id) return - sandbox_id = sandbox.metadata.id - logger.info( - "Releasing sandbox, workflow_execution_id=%s, sandbox_id=%s", - self._sandbox_id, - sandbox_id, - ) - try: self._sandbox_storage.unmount(sandbox) logger.info("Sandbox files persisted, sandbox_id=%s", self._sandbox_id) @@ -123,6 +116,6 @@ class SandboxLayer(GraphEngineLayer): try: sandbox.release_environment() - logger.info("Sandbox released, sandbox_id=%s", sandbox_id) + logger.info("Sandbox released, sandbox_id=%s", self._sandbox_id) except Exception: - logger.exception("Failed to release sandbox, sandbox_id=%s", sandbox_id) + logger.exception("Failed to release sandbox, sandbox_id=%s", self._sandbox_id) diff --git a/api/core/sandbox/bash/session.py b/api/core/sandbox/bash/session.py index f2e05fbebb..e37959b1b8 100644 --- a/api/core/sandbox/bash/session.py +++ 
b/api/core/sandbox/bash/session.py @@ -8,7 +8,7 @@ from types import TracebackType from core.session.cli_api import CliApiSessionManager from core.skill.entities.tool_artifact import ToolArtifact from core.skill.skill_manager import SkillManager -from core.virtual_environment.__base.helpers import execute, with_connection +from core.virtual_environment.__base.helpers import pipeline from core.virtual_environment.__base.virtual_environment import VirtualEnvironment from ..bash.dify_cli import DifyCliConfig @@ -73,63 +73,47 @@ class SandboxBashSession: node_id: str, allow_tools: list[tuple[str, str]], ) -> str | None: - with with_connection(sandbox) as conn: - artifact: ToolArtifact | None = SkillManager.load_tool_artifact( - self._tenant_id, - self._app_id, - self._assets_id, - ) + artifact: ToolArtifact | None = SkillManager.load_tool_artifact( + self._tenant_id, + self._app_id, + self._assets_id, + ) - if artifact is None or artifact.is_empty(): - logger.info("No tools found in artifact for assets_id=%s", self._assets_id) - return None + if artifact is None or artifact.is_empty(): + logger.info("No tools found in artifact for assets_id=%s", self._assets_id) + return None - artifact = artifact.filter(allow_tools) - if artifact.is_empty(): - logger.info("No tools found in artifact for assets_id=%s", self._assets_id) - return None + artifact = artifact.filter(allow_tools) + if artifact.is_empty(): + logger.info("No tools found in artifact for assets_id=%s", self._assets_id) + return None - self._cli_api_session = CliApiSessionManager().create(tenant_id=self._tenant_id, user_id=self._user_id) + self._cli_api_session = CliApiSessionManager().create(tenant_id=self._tenant_id, user_id=self._user_id) + node_tools_path = f"{DIFY_CLI_TOOLS_ROOT}/{node_id}" - execute( - sandbox, - ["mkdir", "-p", DIFY_CLI_GLOBAL_TOOLS_PATH], - connection=conn, - error_message="Failed to create Dify CLI global tools directory", - ) + ( + pipeline(sandbox) + .add(["mkdir", "-p", 
DIFY_CLI_GLOBAL_TOOLS_PATH], error_message="Failed to create global tools dir") + .add(["mkdir", "-p", node_tools_path], error_message="Failed to create node tools dir") + .execute(raise_on_error=True) + ) - execute( - sandbox, - ["mkdir", "-p", f"{DIFY_CLI_TOOLS_ROOT}/{node_id}"], - connection=conn, - error_message="Failed to create Dify CLI node tools directory", - ) + config_json = json.dumps( + DifyCliConfig.create( + session=self._cli_api_session, tenant_id=self._tenant_id, artifact=artifact + ).model_dump(mode="json"), + ensure_ascii=False, + ) + sandbox.upload_file(f"{node_tools_path}/{DIFY_CLI_CONFIG_FILENAME}", BytesIO(config_json.encode("utf-8"))) - config_json = json.dumps( - DifyCliConfig.create( - session=self._cli_api_session, tenant_id=self._tenant_id, artifact=artifact - ).model_dump(mode="json"), - ensure_ascii=False, - ) - sandbox.upload_file( - f"{DIFY_CLI_TOOLS_ROOT}/{node_id}/{DIFY_CLI_CONFIG_FILENAME}", BytesIO(config_json.encode("utf-8")) - ) + pipeline(sandbox, cwd=node_tools_path).add( + [DIFY_CLI_PATH, "init"], error_message="Failed to initialize Dify CLI" + ).execute(raise_on_error=True) - execute( - sandbox, - [DIFY_CLI_PATH, "init"], - connection=conn, - cwd=f"{DIFY_CLI_TOOLS_ROOT}/{node_id}", - error_message="Failed to initialize Dify CLI", - ) - - logger.info( - "Node %s tools initialized, path=%s, tool_count=%d", - node_id, - f"{DIFY_CLI_TOOLS_ROOT}/{node_id}", - len(artifact.references), - ) - return f"{DIFY_CLI_TOOLS_ROOT}/{node_id}" + logger.info( + "Node %s tools initialized, path=%s, tool_count=%d", node_id, node_tools_path, len(artifact.references) + ) + return node_tools_path def __exit__( self, diff --git a/api/core/sandbox/initializer/app_assets_initializer.py b/api/core/sandbox/initializer/app_assets_initializer.py index c7b54b85c5..f738df11d5 100644 --- a/api/core/sandbox/initializer/app_assets_initializer.py +++ b/api/core/sandbox/initializer/app_assets_initializer.py @@ -1,16 +1,18 @@ import logging -from io import 
BytesIO from core.app_assets.paths import AssetPaths -from core.virtual_environment.__base.helpers import execute, with_connection +from core.virtual_environment.__base.helpers import pipeline from core.virtual_environment.__base.virtual_environment import VirtualEnvironment from extensions.ext_storage import storage +from extensions.storage.file_presign_storage import FilePresignStorage -from ..constants import APP_ASSETS_PATH, APP_ASSETS_ZIP_PATH +from ..constants import APP_ASSETS_ZIP_PATH from .base import SandboxInitializer logger = logging.getLogger(__name__) +APP_ASSETS_DOWNLOAD_TIMEOUT = 60 * 10 + class AppAssetsInitializer(SandboxInitializer): def __init__(self, tenant_id: str, app_id: str, assets_id: str) -> None: @@ -20,33 +22,20 @@ class AppAssetsInitializer(SandboxInitializer): def initialize(self, env: VirtualEnvironment) -> None: zip_key = AssetPaths.build_zip(self._tenant_id, self._app_id, self._assets_id) - try: - zip_data = storage.load_once(zip_key) - except Exception: - logger.warning( - "Failed to load assets zip for app_id=%s, key=%s", - self._app_id, - zip_key, - exc_info=True, - ) - return + download_url = FilePresignStorage(storage.storage_runner).get_download_url(zip_key) - env.upload_file(APP_ASSETS_ZIP_PATH, BytesIO(zip_data)) - - with with_connection(env) as conn: - execute( - env, - ["unzip", "-o", APP_ASSETS_ZIP_PATH, "-d", APP_ASSETS_PATH], - connection=conn, - timeout=60, + ( + pipeline(env) + .add(["wget", "-q", download_url, "-O", APP_ASSETS_ZIP_PATH], error_message="Failed to download assets zip") + # unzip with silent error and return 1 if the zip is empty + # FIXME(Mairuis): should use a more robust way to check if the zip is empty + .add( + ["sh", "-c", f"unzip {APP_ASSETS_ZIP_PATH} 2>/dev/null || [ $? 
-eq 1 ]"], error_message="Failed to unzip assets", ) - execute( - env, - ["rm", "-f", APP_ASSETS_ZIP_PATH], - connection=conn, - error_message="Failed to cleanup temp zip file", - ) + .add(["rm", "-f", APP_ASSETS_ZIP_PATH], error_message="Failed to cleanup temp zip file") + .execute(timeout=APP_ASSETS_DOWNLOAD_TIMEOUT, raise_on_error=True) + ) logger.info( "App assets initialized for app_id=%s, published_id=%s", diff --git a/api/core/sandbox/initializer/dify_cli_initializer.py b/api/core/sandbox/initializer/dify_cli_initializer.py index bc36f3af1b..1e92047b4c 100644 --- a/api/core/sandbox/initializer/dify_cli_initializer.py +++ b/api/core/sandbox/initializer/dify_cli_initializer.py @@ -7,7 +7,7 @@ from pathlib import Path from core.session.cli_api import CliApiSessionManager from core.skill.skill_manager import SkillManager -from core.virtual_environment.__base.helpers import execute, with_connection +from core.virtual_environment.__base.helpers import pipeline from core.virtual_environment.__base.virtual_environment import VirtualEnvironment from ..bash.dify_cli import DifyCliConfig, DifyCliLocator @@ -43,61 +43,37 @@ class DifyCliInitializer(SandboxInitializer): def initialize(self, env: VirtualEnvironment) -> None: binary = self._locator.resolve(env.metadata.os, env.metadata.arch) - with with_connection(env) as conn: - execute( - env, - ["mkdir", "-p", f"{DIFY_CLI_ROOT}/bin"], - connection=conn, - error_message="Failed to create dify CLI directory", - ) + pipeline(env).add( + ["mkdir", "-p", f"{DIFY_CLI_ROOT}/bin"], error_message="Failed to create dify CLI directory" + ).execute(raise_on_error=True) - env.upload_file(DIFY_CLI_PATH, BytesIO(binary.path.read_bytes())) + env.upload_file(DIFY_CLI_PATH, BytesIO(binary.path.read_bytes())) - execute( - env, - ["chmod", "+x", DIFY_CLI_PATH], - connection=conn, - error_message="Failed to mark dify CLI as executable", - ) + pipeline(env).add( + ["chmod", "+x", DIFY_CLI_PATH], error_message="Failed to mark dify CLI as 
executable" + ).execute(raise_on_error=True) - logger.info("Dify CLI uploaded to sandbox, path=%s", DIFY_CLI_PATH) + logger.info("Dify CLI uploaded to sandbox, path=%s", DIFY_CLI_PATH) - artifact = SkillManager.load_tool_artifact( - self._tenant_id, - self._app_id, - self._assets_id, - ) + artifact = SkillManager.load_tool_artifact(self._tenant_id, self._app_id, self._assets_id) + if artifact is None or not artifact.references: + logger.info("No tools found in artifact for assets_id=%s", self._assets_id) + return - if artifact is None or not artifact.references: - logger.info("No tools found in artifact for assets_id=%s", self._assets_id) - return + # FIXME(Mairuis): store it in workflow context + self._cli_api_session = CliApiSessionManager().create(tenant_id=self._tenant_id, user_id=self._user_id) - # FIXME(Mairuis): store it in workflow context - self._cli_api_session = CliApiSessionManager().create(tenant_id=self._tenant_id, user_id=self._user_id) + pipeline(env).add( + ["mkdir", "-p", DIFY_CLI_GLOBAL_TOOLS_PATH], error_message="Failed to create global tools dir" + ).execute(raise_on_error=True) - execute( - env, - ["mkdir", "-p", DIFY_CLI_GLOBAL_TOOLS_PATH], - connection=conn, - error_message="Failed to create Dify CLI global tools directory", - ) + config = DifyCliConfig.create(self._cli_api_session, self._tenant_id, artifact) + config_json = json.dumps(config.model_dump(mode="json"), ensure_ascii=False) + config_path = f"{DIFY_CLI_GLOBAL_TOOLS_PATH}/{DIFY_CLI_CONFIG_FILENAME}" + env.upload_file(config_path, BytesIO(config_json.encode("utf-8"))) - config = DifyCliConfig.create(self._cli_api_session, self._tenant_id, artifact) - config_json = json.dumps(config.model_dump(mode="json"), ensure_ascii=False) - env.upload_file( - f"{DIFY_CLI_GLOBAL_TOOLS_PATH}/{DIFY_CLI_CONFIG_FILENAME}", BytesIO(config_json.encode("utf-8")) - ) + pipeline(env, cwd=DIFY_CLI_GLOBAL_TOOLS_PATH).add( + [DIFY_CLI_PATH, "init"], error_message="Failed to initialize Dify CLI" + 
).execute(raise_on_error=True) - execute( - env, - [DIFY_CLI_PATH, "init"], - connection=conn, - cwd=DIFY_CLI_GLOBAL_TOOLS_PATH, - error_message="Failed to initialize Dify CLI", - ) - - logger.info( - "Global tools initialized, path=%s, tool_count=%d", - DIFY_CLI_GLOBAL_TOOLS_PATH, - len(self._tools), - ) + logger.info("Global tools initialized, path=%s, tool_count=%d", DIFY_CLI_GLOBAL_TOOLS_PATH, len(self._tools)) diff --git a/api/core/sandbox/storage/archive_storage.py b/api/core/sandbox/storage/archive_storage.py index f4b762e5fc..4f7ed247bb 100644 --- a/api/core/sandbox/storage/archive_storage.py +++ b/api/core/sandbox/storage/archive_storage.py @@ -1,9 +1,10 @@ import logging -from io import BytesIO -from core.virtual_environment.__base.helpers import try_execute +from core.virtual_environment.__base.exec import PipelineExecutionError +from core.virtual_environment.__base.helpers import pipeline from core.virtual_environment.__base.virtual_environment import VirtualEnvironment from extensions.ext_storage import storage +from extensions.storage.file_presign_storage import FilePresignStorage from .sandbox_storage import SandboxStorage @@ -13,10 +14,12 @@ ARCHIVE_NAME = "workspace.tar.gz" WORKSPACE_DIR = "." 
ARCHIVE_PATH = f"/tmp/{ARCHIVE_NAME}" +ARCHIVE_DOWNLOAD_TIMEOUT = 60 * 5 +ARCHIVE_UPLOAD_TIMEOUT = 60 * 5 + class ArchiveSandboxStorage(SandboxStorage): def __init__(self, tenant_id: str, sandbox_id: str): - self._storage = storage self._tenant_id = tenant_id self._sandbox_id = sandbox_id @@ -29,39 +32,40 @@ class ArchiveSandboxStorage(SandboxStorage): logger.debug("No archive found for sandbox %s, skipping mount", self._sandbox_id) return False - archive_data = self._storage.load_once(self._storage_key) - sandbox.upload_file(ARCHIVE_NAME, BytesIO(archive_data)) - - result = try_execute(sandbox, ["tar", "-xzf", ARCHIVE_NAME], timeout=60) - if result.is_error: - logger.error("Failed to extract archive: %s", result.error_message) + download_url = FilePresignStorage(storage.storage_runner).get_download_url(self._storage_key) + try: + ( + pipeline(sandbox) + .add(["wget", download_url, "-O", ARCHIVE_NAME], error_message="Failed to download archive") + .add(["tar", "-xzf", ARCHIVE_NAME], error_message="Failed to extract archive") + .add(["rm", ARCHIVE_NAME], error_message="Failed to cleanup archive") + .execute(timeout=ARCHIVE_DOWNLOAD_TIMEOUT, raise_on_error=True) + ) + except PipelineExecutionError: + logger.exception("Failed to extract archive") return False - try_execute(sandbox, ["rm", ARCHIVE_NAME], timeout=10) - logger.info("Mounted archive for sandbox %s", self._sandbox_id) return True def unmount(self, sandbox: VirtualEnvironment) -> bool: - result = try_execute( - sandbox, - ["tar", "-czf", ARCHIVE_PATH, "--warning=no-file-changed", "-C", WORKSPACE_DIR, "."], - timeout=120, + upload_url = FilePresignStorage(storage.storage_runner).get_upload_url(self._storage_key) + ( + pipeline(sandbox) + .add( + ["tar", "-czf", ARCHIVE_PATH, "--warning=no-file-changed", "-C", WORKSPACE_DIR, "."], + error_message="Failed to create archive", + ) + .add(["wget", "--method=PUT", f"--body-file={ARCHIVE_PATH}", upload_url, "-O", "/dev/null"], error_message="Failed to upload archive") + .execute(timeout=ARCHIVE_UPLOAD_TIMEOUT,
raise_on_error=True) ) - if result.is_error: - logger.error("Failed to create archive: %s", result.error_message) - return False - - archive_content = sandbox.download_file(ARCHIVE_PATH) - self._storage.save(self._storage_key, archive_content.getvalue()) - logger.info("Unmounted archive for sandbox %s", self._sandbox_id) return True def exists(self) -> bool: - return self._storage.exists(self._storage_key) + return storage.exists(self._storage_key) def delete(self) -> None: if self.exists(): - self._storage.delete(self._storage_key) + storage.delete(self._storage_key) logger.info("Deleted archive for sandbox %s", self._sandbox_id) diff --git a/api/core/sandbox/vm.py b/api/core/sandbox/vm.py index 911d442992..ac2b81ee1b 100644 --- a/api/core/sandbox/vm.py +++ b/api/core/sandbox/vm.py @@ -102,6 +102,10 @@ class SandboxBuilder: vm_class = _get_sandbox_class(vm_type) vm_class.validate(options) + @classmethod + def draft_id(cls, user_id: str) -> str: + return f"sandbox_draft_{user_id}" + class VMConfig: @staticmethod diff --git a/api/core/skill/entities/skill_artifact.py b/api/core/skill/entities/skill_artifact.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/core/virtual_environment/__base/exec.py b/api/core/virtual_environment/__base/exec.py index d523556e7d..f74baccaab 100644 --- a/api/core/virtual_environment/__base/exec.py +++ b/api/core/virtual_environment/__base/exec.py @@ -33,6 +33,8 @@ class SandboxConfigValidationError(ValueError): class CommandExecutionError(Exception): """Raised when a command execution fails.""" + result: CommandResult + def __init__(self, message: str, result: CommandResult): super().__init__(message) self.result = result @@ -44,3 +46,19 @@ class CommandExecutionError(Exception): @property def stderr(self) -> bytes: return self.result.stderr + + +class PipelineExecutionError(CommandExecutionError): + """Raised when a pipeline command fails in strict mode.""" + + index: int + command: list[str] + results: 
list[CommandResult] + + def __init__( + self, message: str, result: CommandResult, *, index: int, command: list[str], results: list[CommandResult] + ): + super().__init__(message, result) + self.index = index + self.command = command + self.results = results diff --git a/api/core/virtual_environment/__base/helpers.py b/api/core/virtual_environment/__base/helpers.py index 808f8a594f..92cf2f87b2 100644 --- a/api/core/virtual_environment/__base/helpers.py +++ b/api/core/virtual_environment/__base/helpers.py @@ -1,15 +1,19 @@ from __future__ import annotations import contextlib +import shlex from collections.abc import Generator, Mapping from contextlib import contextmanager +from dataclasses import dataclass, field from functools import partial from core.virtual_environment.__base.command_future import CommandFuture from core.virtual_environment.__base.entities import CommandResult, ConnectionHandle -from core.virtual_environment.__base.exec import CommandExecutionError +from core.virtual_environment.__base.exec import CommandExecutionError, PipelineExecutionError from core.virtual_environment.__base.virtual_environment import VirtualEnvironment +_PIPE_SENTINEL = "__DIFY_PIPE__" + @contextmanager def with_connection(env: VirtualEnvironment) -> Generator[ConnectionHandle, None, None]: @@ -147,3 +151,127 @@ def try_execute( with with_connection(env) as conn: return _execute_with_connection(env, conn, command, timeout, cwd) + + +@dataclass(frozen=True) +class _PipelineStep: + argv: list[str] + error_message: str = "Command failed" + + +@dataclass +class CommandPipeline: + """Batch multiple commands into a single shell execution (Redis pipeline style). 
+ + Example: + results = pipeline(env).add(["echo", "hi"]).add(["ls"]).execute() + # Strict mode: raise on first failure + pipeline(env).add(["mkdir", "/x"], error_message="mkdir failed").execute(raise_on_error=True) + """ + + env: VirtualEnvironment + connection: ConnectionHandle | None = None + cwd: str | None = None + environments: Mapping[str, str] | None = None + + _steps: list[_PipelineStep] = field(default_factory=list) # pyright: ignore[reportUnknownVariableType] + + def add(self, command: list[str], *, error_message: str = "Command failed") -> CommandPipeline: + self._steps.append(_PipelineStep(argv=command, error_message=error_message)) + return self + + def execute(self, *, timeout: float | None = 30, raise_on_error: bool = False) -> list[CommandResult]: + if not self._steps: + return [] + + script = self._build_script(fail_fast=raise_on_error) + batch_cmd = ["sh", "-lc", script] + + if self.connection is not None: + batch_result = try_execute(self.env, batch_cmd, timeout=timeout, cwd=self.cwd, connection=self.connection) + else: + with with_connection(self.env) as conn: + batch_result = try_execute(self.env, batch_cmd, timeout=timeout, cwd=self.cwd, connection=conn) + + results = self._parse_results(batch_result.stdout, batch_result.pid) + + if raise_on_error: + for i, r in enumerate(iterable=results): + if r.is_error: + step = self._steps[i] + raise PipelineExecutionError( + f"{step.error_message}: {r.error_message}", + r, + index=i, + command=step.argv, + results=results, + ) + + return results + + def _build_script(self, *, fail_fast: bool = False) -> str: + lines = [ + "run() {", + ' i="$1"; shift', + ' out="$(mktemp)"; err="$(mktemp)"', + ' ("$@") >"$out" 2>"$err"; ec="$?"', + ' os="$(wc -c <"$out" | tr -d \' \')"', + ' es="$(wc -c <"$err" | tr -d \' \')"', + f' printf \'{_PIPE_SENTINEL} %s %s %s %s\\n\' "$i" "$ec" "$os" "$es"', + ' cat "$out"', + ' cat "$err"', + ' rm -f "$out" "$err"', + ' return "$ec"', + "}", + ] + suffix = " || exit $?" 
if fail_fast else "" + for i, step in enumerate(self._steps): + quoted = " ".join(shlex.quote(arg) for arg in step.argv) + lines.append(f"run {i} {quoted}{suffix}") + return "\n".join(lines) + + @staticmethod + def _parse_results(stdout: bytes, pid: str) -> list[CommandResult]: + results: list[CommandResult] = [] + pos = 0 + sentinel = _PIPE_SENTINEL.encode() + b" " + + while pos < len(stdout): + nl = stdout.find(b"\n", pos) + if nl == -1: + break + header = stdout[pos : nl + 1] + pos = nl + 1 + + if not header.startswith(sentinel): + raise ValueError("Malformed pipeline output: missing sentinel") + + parts = header.decode().strip().split(" ") + _, idx, ec, os_len, es_len = parts + out_len, err_len = int(os_len), int(es_len) + + out_bytes = stdout[pos : pos + out_len] + pos += out_len + err_bytes = stdout[pos : pos + err_len] + pos += err_len + + results.append( + CommandResult( + stdout=out_bytes, + stderr=err_bytes, + exit_code=int(ec), + pid=f"{pid}:{idx}", + ) + ) + + return results + + +def pipeline( + env: VirtualEnvironment, + connection: ConnectionHandle | None = None, + *, + cwd: str | None = None, + environments: Mapping[str, str] | None = None, +) -> CommandPipeline: + return CommandPipeline(env=env, connection=connection, cwd=cwd, environments=environments) diff --git a/api/extensions/storage/aws_s3_storage.py b/api/extensions/storage/aws_s3_storage.py index 1d47926f3d..95907c1c28 100644 --- a/api/extensions/storage/aws_s3_storage.py +++ b/api/extensions/storage/aws_s3_storage.py @@ -93,3 +93,11 @@ class AwsS3Storage(BaseStorage): ExpiresIn=expires_in, ) return url + + def get_upload_url(self, filename: str, expires_in: int = 3600) -> str: + url: str = self.client.generate_presigned_url( + ClientMethod="put_object", + Params={"Bucket": self.bucket_name, "Key": filename}, + ExpiresIn=expires_in, + ) + return url diff --git a/api/extensions/storage/base_storage.py b/api/extensions/storage/base_storage.py index 5243b66299..59134044bb 100644 --- 
a/api/extensions/storage/base_storage.py +++ b/api/extensions/storage/base_storage.py @@ -57,3 +57,12 @@ class BaseStorage(ABC): NotImplementedError: If this storage backend doesn't support pre-signed URLs """ raise NotImplementedError("This storage backend doesn't support pre-signed URLs") + + def get_upload_url(self, filename: str, expires_in: int = 3600) -> str: + """ + Generate a pre-signed URL for uploading a file. + + Storage backends that support pre-signed URLs (e.g., S3, Azure Blob, GCS) + should override this method to return a direct upload URL. + """ + raise NotImplementedError("This storage backend doesn't support pre-signed URLs") diff --git a/api/extensions/storage/file_presign_storage.py b/api/extensions/storage/file_presign_storage.py index 7ec0775398..67bf300d66 100644 --- a/api/extensions/storage/file_presign_storage.py +++ b/api/extensions/storage/file_presign_storage.py @@ -44,6 +44,16 @@ class FilePresignStorage(BaseStorage): except NotImplementedError: return self._generate_signed_proxy_url(filename) + def get_upload_url(self, filename: str, expires_in: int = 3600) -> str: + try: + return self._storage.get_upload_url(filename, expires_in) + except NotImplementedError: + return self._generate_signed_upload_url(filename) + + def _generate_signed_upload_url(self, filename: str) -> str: + # TODO: Implement this + raise NotImplementedError("This storage backend doesn't support pre-signed URLs") + def _generate_signed_proxy_url(self, filename: str) -> str: base_url = dify_config.FILES_URL encoded_filename = urllib.parse.quote(filename, safe="") diff --git a/api/models/app_asset.py b/api/models/app_asset.py index d2937a0238..dc9d6282c8 100644 --- a/api/models/app_asset.py +++ b/api/models/app_asset.py @@ -19,6 +19,7 @@ class AppAssets(Base): ) VERSION_DRAFT = "draft" + VERSION_PUBLISHED = "published" id: Mapped[str] = mapped_column(StringUUID, default=lambda: str(uuid4())) tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) diff --git 
a/api/services/app_asset_service.py b/api/services/app_asset_service.py index ea8161f358..c4282264ed 100644 --- a/api/services/app_asset_service.py +++ b/api/services/app_asset_service.py @@ -62,14 +62,24 @@ class AppAssetService: return assets @staticmethod - def get_assets(tenant_id: str, app_id: str, *, is_draft: bool) -> AppAssets | None: - with Session(db.engine) as session: + def get_assets(tenant_id: str, app_id: str, user_id: str, *, is_draft: bool) -> AppAssets | None: + with Session(db.engine, expire_on_commit=False) as session: if is_draft: stmt = session.query(AppAssets).filter( AppAssets.tenant_id == tenant_id, AppAssets.app_id == app_id, AppAssets.version == AppAssets.VERSION_DRAFT, ) + if not stmt.first(): + assets = AppAssets( + id=str(uuid4()), + tenant_id=tenant_id, + app_id=app_id, + version=AppAssets.VERSION_DRAFT, + created_by=user_id, + ) + session.add(assets) + session.commit() else: stmt = ( session.query(AppAssets) @@ -308,7 +318,7 @@ parser = AssetParser(tree, tenant_id, app_id) parser.register( "md", - SkillAssetParser(tenant_id, app_id, publish_id), + SkillAssetParser(tenant_id, app_id, publish_id, tree), ) assets = parser.parse() diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index 612db86dc8..8b13de703b 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -15,6 +15,8 @@ from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager from core.file import File from core.repositories import DifyCoreRepositoryFactory from core.sandbox import SandboxManager +from core.sandbox.storage.archive_storage import ArchiveSandboxStorage +from core.sandbox.vm import SandboxBuilder from core.variables import Variable, VariableBase from core.workflow.entities import WorkflowNodeExecution from core.workflow.enums import ErrorStrategy, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus @@ -704,7 +706,7 @@ class
WorkflowService: from core.sandbox import AppAssetsInitializer, DifyCliInitializer from services.app_asset_service import AppAssetService - assets = AppAssetService.get_assets(draft_workflow.tenant_id, app_model.id, is_draft=True) + assets = AppAssetService.get_assets(draft_workflow.tenant_id, app_model.id, account.id, is_draft=True) if not assets: raise ValueError(f"No assets found for tid={draft_workflow.tenant_id}, app_id={app_model.id}") @@ -715,6 +717,7 @@ class WorkflowService: SandboxProviderService.create_sandbox_builder(draft_workflow.tenant_id) .initializer(DifyCliInitializer(draft_workflow.tenant_id, account.id, app_model.id, assets.id)) .initializer(AppAssetsInitializer(draft_workflow.tenant_id, app_model.id, assets.id)) + .storage(ArchiveSandboxStorage(draft_workflow.tenant_id, SandboxBuilder.draft_id(account.id))) .build() ) single_step_execution_id = f"single-step-{uuid.uuid4()}"