diff --git a/api/core/app_assets/builder/pipeline.py b/api/core/app_assets/builder/pipeline.py
index d266ecf2d1..f8db220c0a 100644
--- a/api/core/app_assets/builder/pipeline.py
+++ b/api/core/app_assets/builder/pipeline.py
@@ -1,6 +1,4 @@
 from core.app.entities.app_asset_entities import AppAssetFileTree
-from core.app_assets.builder.file_builder import FileBuilder
-from core.app_assets.builder.skill_builder import SkillBuilder
 from core.app_assets.entities import AssetItem
 
 from .base import AssetBuilder, BuildContext
@@ -9,8 +7,8 @@ from .base import AssetBuilder, BuildContext
 class AssetBuildPipeline:
     _builders: list[AssetBuilder]
 
-    def __init__(self, builders: list[AssetBuilder] | None = None) -> None:
-        self._builders = builders or [SkillBuilder(), FileBuilder()]
+    def __init__(self, builders: list[AssetBuilder]) -> None:
+        self._builders = builders
 
     def build_all(self, tree: AppAssetFileTree, ctx: BuildContext) -> list[AssetItem]:
         # 1. Distribute: each node goes to first accepting builder
diff --git a/api/core/app_assets/builder/skill_builder.py b/api/core/app_assets/builder/skill_builder.py
index 0a381b6365..4446062038 100644
--- a/api/core/app_assets/builder/skill_builder.py
+++ b/api/core/app_assets/builder/skill_builder.py
@@ -8,7 +8,7 @@
 from core.app_assets.paths import AssetPaths
 from core.skill.entities.skill_document import SkillDocument
 from core.skill.skill_compiler import SkillCompiler
 from core.skill.skill_manager import SkillManager
-from extensions.ext_storage import storage
+from extensions.storage.base_storage import BaseStorage
 
 from .base import BuildContext
@@ -32,10 +32,12 @@ class _CompiledSkill:
 class SkillBuilder:
     _nodes: list[tuple[AppAssetNode, str]]
     _max_workers: int
+    _storage: BaseStorage
 
-    def __init__(self, max_workers: int = 8) -> None:
+    def __init__(self, storage: BaseStorage, max_workers: int = 8) -> None:
         self._nodes = []
         self._max_workers = max_workers
+        self._storage = storage
 
     def accept(self, node: AppAssetNode) -> bool:
         return node.extension == "md"
@@ -91,7 +93,7 @@ class SkillBuilder:
         def load_one(node: AppAssetNode, path: str) -> _LoadedSkill:
             draft_key = AssetPaths.draft_file(ctx.tenant_id, ctx.app_id, node.id)
             try:
-                data = json.loads(storage.load_once(draft_key))
+                data = json.loads(self._storage.load_once(draft_key))
                 content = data.get("content", "") if isinstance(data, dict) else ""
                 metadata = data.get("metadata", {}) if isinstance(data, dict) else {}
             except Exception:
@@ -105,7 +107,7 @@ class SkillBuilder:
 
     def _upload_all(self, skills: list[_CompiledSkill]) -> None:
         def upload_one(skill: _CompiledSkill) -> None:
-            storage.save(skill.resolved_key, skill.content_bytes)
+            self._storage.save(skill.resolved_key, skill.content_bytes)
 
         with ThreadPoolExecutor(max_workers=self._max_workers) as executor:
             futures = [executor.submit(upload_one, skill) for skill in skills]
diff --git a/api/core/app_assets/constants.py b/api/core/app_assets/constants.py
index a13c6cd0a9..c6583989e8 100644
--- a/api/core/app_assets/constants.py
+++ b/api/core/app_assets/constants.py
@@ -5,3 +5,4 @@ from libs.attr_map import AttrKey
 class AppAssetsAttrs:
     # Skill artifact set
     FILE_TREE = AttrKey("file_tree", AppAssetFileTree)
+    APP_ASSETS_ID = AttrKey("app_assets_id", str)
diff --git a/api/core/app_assets/converters.py b/api/core/app_assets/converters.py
index c610fff542..d949e13259 100644
--- a/api/core/app_assets/converters.py
+++ b/api/core/app_assets/converters.py
@@ -2,6 +2,7 @@ from __future__ import annotations
 
 from core.app.entities.app_asset_entities import AppAssetFileTree, AssetNodeType
 from core.app_assets.entities import FileAsset
+from core.app_assets.entities.assets import AssetItem
 from core.app_assets.paths import AssetPaths
 
 
@@ -9,7 +10,7 @@ def tree_to_asset_items(
     tree: AppAssetFileTree,
     tenant_id: str,
     app_id: str,
-) -> list[FileAsset]:
+) -> list[AssetItem]:
     """
     Convert AppAssetFileTree to list of FileAsset for packaging.
 
@@ -21,7 +22,7 @@ def tree_to_asset_items(
     Returns:
         List of FileAsset items ready for packaging
     """
-    items: list[FileAsset] = []
+    items: list[AssetItem] = []
     for node in tree.nodes:
         if node.node_type == AssetNodeType.FILE:
             path = tree.get_path(node.id)
diff --git a/api/core/app_assets/packager/asset_zip_packager.py b/api/core/app_assets/packager/asset_zip_packager.py
index b48099bf3c..5b08398b00 100644
--- a/api/core/app_assets/packager/asset_zip_packager.py
+++ b/api/core/app_assets/packager/asset_zip_packager.py
@@ -4,12 +4,9 @@ import io
 import zipfile
 from concurrent.futures import ThreadPoolExecutor
 from threading import Lock
-from typing import TYPE_CHECKING
 
 from core.app_assets.entities import AssetItem
-
-if TYPE_CHECKING:
-    from extensions.ext_storage import Storage
+from extensions.storage.base_storage import BaseStorage
 
 
 class AssetZipPackager:
@@ -18,7 +15,7 @@ class AssetZipPackager:
     Automatically creates directory entries from asset paths.
     """
 
-    def __init__(self, storage: Storage, *, max_workers: int = 8) -> None:
+    def __init__(self, storage: BaseStorage, *, max_workers: int = 8) -> None:
         self._storage = storage
         self._max_workers = max_workers
 
diff --git a/api/core/sandbox/initializer/app_assets_attrs_loader.py b/api/core/sandbox/initializer/app_assets_attrs_loader.py
deleted file mode 100644
index f3c19eaad2..0000000000
--- a/api/core/sandbox/initializer/app_assets_attrs_loader.py
+++ /dev/null
@@ -1,16 +0,0 @@
-from core.app_assets.constants import AppAssetsAttrs
-from core.sandbox.initializer.base import SyncSandboxInitializer
-from core.sandbox.sandbox import Sandbox
-from services.app_asset_service import AppAssetService
-
-
-class AppAssetsAttrsInitializer(SyncSandboxInitializer):
-    def __init__(self, tenant_id: str, app_id: str, assets_id: str) -> None:
-        self._tenant_id = tenant_id
-        self._app_id = app_id
-        self._assets_id = assets_id
-
-    def initialize(self, sandbox: Sandbox) -> None:
-        # Load published app assets and unzip the artifact bundle.
-        app_assets = AppAssetService.get_tenant_app_assets(self._tenant_id, self._assets_id)
-        sandbox.attrs.set(AppAssetsAttrs.FILE_TREE, app_assets.asset_tree)
diff --git a/api/core/sandbox/initializer/app_assets_initializer.py b/api/core/sandbox/initializer/app_assets_initializer.py
index 5d73ce2bcd..4e78bdc005 100644
--- a/api/core/sandbox/initializer/app_assets_initializer.py
+++ b/api/core/sandbox/initializer/app_assets_initializer.py
@@ -1,10 +1,12 @@
 import logging
 
+from core.app_assets.constants import AppAssetsAttrs
 from core.app_assets.paths import AssetPaths
 from core.sandbox.sandbox import Sandbox
 from core.virtual_environment.__base.helpers import pipeline
 from extensions.ext_storage import storage
 from extensions.storage.file_presign_storage import FilePresignStorage
+from services.app_asset_service import AppAssetService
 
 from ..entities import AppAssets
 from .base import AsyncSandboxInitializer
@@ -21,6 +23,10 @@ class AppAssetsInitializer(AsyncSandboxInitializer):
         self._assets_id = assets_id
 
     def initialize(self, sandbox: Sandbox) -> None:
+        # Load published app assets and unzip the artifact bundle.
+        app_assets = AppAssetService.get_tenant_app_assets(self._tenant_id, self._assets_id)
+        sandbox.attrs.set(AppAssetsAttrs.FILE_TREE, app_assets.asset_tree)
+        sandbox.attrs.set(AppAssetsAttrs.APP_ASSETS_ID, self._assets_id)
         vm = sandbox.vm
         zip_key = AssetPaths.build_zip(self._tenant_id, self._app_id, self._assets_id)
         download_url = FilePresignStorage(storage.storage_runner).get_download_url(zip_key)
diff --git a/api/core/sandbox/initializer/draft_app_assets_initializer.py b/api/core/sandbox/initializer/draft_app_assets_initializer.py
index 13464cfaf5..d6fa945dd7 100644
--- a/api/core/sandbox/initializer/draft_app_assets_initializer.py
+++ b/api/core/sandbox/initializer/draft_app_assets_initializer.py
@@ -1,5 +1,7 @@
 import logging
 
+from core.app_assets.constants import AppAssetsAttrs
+from core.app_assets.paths import AssetPaths
 from core.sandbox.entities import AppAssets
 from core.sandbox.sandbox import Sandbox
 from core.sandbox.services import AssetDownloadService
@@ -12,6 +14,7 @@ from .base import AsyncSandboxInitializer
 logger = logging.getLogger(__name__)
 
 DRAFT_ASSETS_DOWNLOAD_TIMEOUT = 60 * 10
+DRAFT_ASSETS_EXPIRES_IN = 60 * 10
 
 
 class DraftAppAssetsInitializer(AsyncSandboxInitializer):
@@ -21,15 +24,27 @@
         self._assets_id = assets_id
 
     def initialize(self, sandbox: Sandbox) -> None:
-        vm = sandbox.vm
-        # Draft assets download via presigned URLs to avoid zip build overhead.
-        # FIXME(Yeuoly): merge 2 IO operations in DraftAppAssetsInitializer and AppAssetsAttrsInitializer
+        # Load draft app assets and expose the file tree via sandbox attrs;
+        # files are fetched individually via presigned URLs (no zip build).
         app_assets = AppAssetService.get_tenant_app_assets(self._tenant_id, self._assets_id)
+        sandbox.attrs.set(AppAssetsAttrs.FILE_TREE, app_assets.asset_tree)
+        sandbox.attrs.set(AppAssetsAttrs.APP_ASSETS_ID, self._assets_id)
 
-        items = [
-            AssetDownloadItem(path=path, url=url)
-            for path, url in AppAssetService.get_cached_draft_download_urls(app_assets)
+        vm = sandbox.vm
+        build_id = self._assets_id
+        tree = app_assets.asset_tree
+        storage = AppAssetService.assets_storage()
+        nodes = list(tree.walk_files())
+        if not nodes:
+            return
+        # FIXME(Mairuis): should be more graceful
+        storage_keys = [
+            AssetPaths.build_resolved_file(self._tenant_id, self._app_id, build_id, node.id)
+            if node.extension == "md"
+            else AssetPaths.draft_file(self._tenant_id, self._app_id, node.id)
+            for node in nodes
         ]
+        urls = storage.get_download_urls(storage_keys, DRAFT_ASSETS_EXPIRES_IN)
+        items = [AssetDownloadItem(path=tree.get_path(node.id).lstrip("/"), url=url) for node, url in zip(nodes, urls)]
         script = AssetDownloadService.build_download_script(items, AppAssets.PATH)
         pipeline(vm).add(
             ["sh", "-lc", script],
diff --git a/api/core/sandbox/manager.py b/api/core/sandbox/manager.py
index 6716c9bdcb..8e83d2253d 100644
--- a/api/core/sandbox/manager.py
+++ b/api/core/sandbox/manager.py
@@ -7,7 +7,6 @@ from typing import Final
 from core.sandbox.builder import SandboxBuilder
 from core.sandbox.entities import AppAssets, SandboxType
 from core.sandbox.entities.providers import SandboxProviderEntity
-from core.sandbox.initializer.app_assets_attrs_loader import AppAssetsAttrsInitializer
 from core.sandbox.initializer.app_assets_initializer import AppAssetsInitializer
 from core.sandbox.initializer.dify_cli_initializer import DifyCliInitializer
 from core.sandbox.initializer.draft_app_assets_initializer import DraftAppAssetsInitializer
@@ -124,7 +123,6 @@ class SandboxManager:
             .options(sandbox_provider.config)
             .user(user_id)
             .app(app_id)
-            .initializer(AppAssetsAttrsInitializer(tenant_id, app_id, assets.id))
             .initializer(AppAssetsInitializer(tenant_id, app_id, assets.id))
             .initializer(DifyCliInitializer(tenant_id, user_id, app_id, assets.id))
             .initializer(SkillInitializer(tenant_id, user_id, app_id, assets.id))
@@ -161,7 +159,6 @@ class SandboxManager:
             .options(sandbox_provider.config)
             .user(user_id)
             .app(app_id)
-            .initializer(AppAssetsAttrsInitializer(tenant_id, app_id, assets.id))
             .initializer(DraftAppAssetsInitializer(tenant_id, app_id, assets.id))
             .initializer(DifyCliInitializer(tenant_id, user_id, app_id, assets.id))
             .initializer(SkillInitializer(tenant_id, user_id, app_id, assets.id))
@@ -193,7 +190,6 @@ class SandboxManager:
             .options(sandbox_provider.config)
             .user(user_id)
             .app(app_id)
-            .initializer(AppAssetsAttrsInitializer(tenant_id, app_id, assets.id))
             .initializer(AppAssetsInitializer(tenant_id, app_id, assets.id))
             .initializer(DifyCliInitializer(tenant_id, user_id, app_id, assets.id))
             .initializer(SkillInitializer(tenant_id, user_id, app_id, assets.id))
diff --git a/api/core/sandbox/services/asset_download_service.py b/api/core/sandbox/services/asset_download_service.py
index 138c5e551f..372f767e13 100644
--- a/api/core/sandbox/services/asset_download_service.py
+++ b/api/core/sandbox/services/asset_download_service.py
@@ -7,14 +7,14 @@ from dataclasses import dataclass
 
 def _render_download_script(root_path: str, download_commands: str) -> str:
     python_download_cmd = (
-        "python3 - \"${url}\" \"${dest}\" <<\"PY\"\n"
+        'python3 - "${url}" "${dest}" <<"PY"\n'
         "import sys\n"
         "import urllib.request\n"
         "url = sys.argv[1]\n"
         "dest = sys.argv[2]\n"
        "with urllib.request.urlopen(url) as resp:\n"
         "    data = resp.read()\n"
-        "with open(dest, \"wb\") as f:\n"
+        'with open(dest, "wb") as f:\n'
         "    f.write(data)\n"
         "PY"
     )
diff --git a/api/extensions/storage/cached_presign_storage.py b/api/extensions/storage/cached_presign_storage.py
new file mode 100644
index 0000000000..b15c5cba87
--- /dev/null
+++ b/api/extensions/storage/cached_presign_storage.py
@@ -0,0 +1,172 @@
+"""Storage wrapper that caches presigned download URLs."""
+
+import logging
+from collections.abc import Generator
+from typing import Any
+
+from extensions.storage.base_storage import BaseStorage
+
+logger = logging.getLogger(__name__)
+
+
+class CachedPresignStorage(BaseStorage):
+    """Storage wrapper that caches presigned download URLs.
+
+    Wraps a storage with presign capability and caches the generated URLs
+    in Redis to reduce repeated presign API calls.
+
+    Example:
+        cached_storage = CachedPresignStorage(
+            storage=FilePresignStorage(base_storage),
+            redis_client=redis_client,
+            cache_key_prefix="app_asset:draft_download",
+        )
+        url = cached_storage.get_download_url("path/to/file.txt", expires_in=3600)
+    """
+
+    TTL_BUFFER_SECONDS = 60
+    MIN_TTL_SECONDS = 60
+
+    def __init__(
+        self,
+        storage: BaseStorage,
+        redis_client: Any,
+        cache_key_prefix: str = "presign_cache",
+    ):
+        super().__init__()
+        self._storage = storage
+        self._redis = redis_client
+        self._cache_key_prefix = cache_key_prefix
+
+    def save(self, filename: str, data: bytes):
+        self._storage.save(filename, data)
+
+    def load_once(self, filename: str) -> bytes:
+        return self._storage.load_once(filename)
+
+    def load_stream(self, filename: str) -> Generator:
+        return self._storage.load_stream(filename)
+
+    def download(self, filename: str, target_filepath: str):
+        self._storage.download(filename, target_filepath)
+
+    def exists(self, filename: str) -> bool:
+        return self._storage.exists(filename)
+
+    def delete(self, filename: str):
+        self._storage.delete(filename)
+        self.invalidate([filename])
+
+    def scan(self, path: str, files: bool = True, directories: bool = False) -> list[str]:
+        return self._storage.scan(path, files=files, directories=directories)
+
+    def get_upload_url(self, filename: str, expires_in: int = 3600) -> str:
+        return self._storage.get_upload_url(filename, expires_in)
+
+    def get_download_url(self, filename: str, expires_in: int = 3600) -> str:
+        """Get a presigned download URL, using cache when available.
+
+        Args:
+            filename: The file path/key in storage
+            expires_in: URL validity duration in seconds (default: 1 hour)
+
+        Returns:
+            Presigned URL string
+        """
+        cache_key = self._cache_key(filename)
+
+        cached = self._get_cached(cache_key)
+        if cached:
+            return cached
+
+        url = self._storage.get_download_url(filename, expires_in)
+        self._set_cached(cache_key, url, expires_in)
+
+        return url
+
+    def get_download_urls(
+        self,
+        filenames: list[str],
+        expires_in: int = 3600,
+    ) -> list[str]:
+        """Batch get download URLs with cache.
+
+        Args:
+            filenames: List of file paths/keys in storage
+            expires_in: URL validity duration in seconds (default: 1 hour)
+
+        Returns:
+            List of presigned URLs in the same order as filenames
+        """
+        if not filenames:
+            return []
+
+        cache_keys = [self._cache_key(f) for f in filenames]
+        cached_values = self._get_cached_batch(cache_keys)
+
+        results: list[str] = []
+        for filename, cache_key, cached in zip(filenames, cache_keys, cached_values):
+            if cached:
+                results.append(cached)
+            else:
+                url = self._storage.get_download_url(filename, expires_in)
+                self._set_cached(cache_key, url, expires_in)
+                results.append(url)
+
+        return results
+
+    def invalidate(self, filenames: list[str]) -> None:
+        """Invalidate cached URLs for given filenames.
+
+        Args:
+            filenames: List of file paths/keys to invalidate
+        """
+        if not filenames:
+            return
+
+        cache_keys = [self._cache_key(f) for f in filenames]
+        try:
+            self._redis.delete(*cache_keys)
+        except Exception:
+            logger.warning("Failed to invalidate presign cache", exc_info=True)
+
+    def _cache_key(self, filename: str) -> str:
+        """Generate cache key for a filename."""
+        return f"{self._cache_key_prefix}:{filename}"
+
+    def _compute_ttl(self, expires_in: int) -> int:
+        """Compute cache TTL from presign expiration.
+
+        Returns TTL slightly shorter than presign expiry to ensure
+        cached URLs are refreshed before they expire.
+        """
+        return max(expires_in - self.TTL_BUFFER_SECONDS, self.MIN_TTL_SECONDS)
+
+    def _get_cached(self, cache_key: str) -> str | None:
+        """Get a single cached URL."""
+        try:
+            values = self._redis.mget([cache_key])
+            cached = values[0] if values else None
+            if cached:
+                return cached.decode("utf-8") if isinstance(cached, (bytes, bytearray)) else cached
+            return None
+        except Exception:
+            logger.warning("Failed to read presign cache", exc_info=True)
+            return None
+
+    def _get_cached_batch(self, cache_keys: list[str]) -> list[str | None]:
+        """Get multiple cached URLs."""
+        try:
+            cached_values = self._redis.mget(cache_keys)
+            return [v.decode("utf-8") if isinstance(v, (bytes, bytearray)) else v for v in cached_values]
+        except Exception:
+            logger.warning("Failed to read presign cache batch", exc_info=True)
+            return [None] * len(cache_keys)
+
+    def _set_cached(self, cache_key: str, url: str, expires_in: int) -> None:
+        """Store a URL in cache with computed TTL."""
+        ttl = self._compute_ttl(expires_in)
+        try:
+            self._redis.setex(cache_key, ttl, url)
+        except Exception:
+            logger.warning("Failed to write presign cache", exc_info=True)
diff --git a/api/libs/attr_map.py b/api/libs/attr_map.py
index cad546afce..c7dd61a820 100644
--- a/api/libs/attr_map.py
+++ b/api/libs/attr_map.py
@@ -77,8 +77,7 @@ class AttrMapTypeError(TypeError):
         self.expected_type = expected_type
         self.actual_type = actual_type
         super().__init__(
-            f"Attribute '{key.name}' expects type '{expected_type.__name__}', "
-            f"got '{actual_type.__name__}'"
+            f"Attribute '{key.name}' expects type '{expected_type.__name__}', got '{actual_type.__name__}'"
         )
 
 
diff --git a/api/services/app_asset_service.py b/api/services/app_asset_service.py
index c847bdfc4d..93548535cb 100644
--- a/api/services/app_asset_service.py
+++ b/api/services/app_asset_service.py
@@ -13,12 +13,14 @@ from core.app.entities.app_asset_entities import (
     TreePathConflictError,
 )
 from core.app_assets.builder import AssetBuildPipeline, BuildContext
+from core.app_assets.builder.file_builder import FileBuilder
+from core.app_assets.builder.skill_builder import SkillBuilder
 from core.app_assets.converters import tree_to_asset_items
 from core.app_assets.packager import AssetZipPackager
 from core.app_assets.paths import AssetPaths
 from extensions.ext_database import db
 from extensions.ext_redis import redis_client
-from extensions.ext_storage import storage
+from extensions.storage.cached_presign_storage import CachedPresignStorage
 from extensions.storage.file_presign_storage import FilePresignStorage
 from models.app_asset import AppAssets
 from models.model import App
@@ -35,84 +37,22 @@ logger = logging.getLogger(__name__)
 
 class AppAssetService:
     MAX_PREVIEW_CONTENT_SIZE = 5 * 1024 * 1024  # 5MB
-    _PRESIGN_CACHE_TTL_BUFFER_SECONDS = 300
-    _PRESIGN_CACHE_MIN_TTL_SECONDS = 60
     _LOCK_TIMEOUT_SECONDS = 60
+    _DRAFT_CACHE_KEY_PREFIX = "app_asset:draft_download"
 
     @staticmethod
     def _lock(app_id: str):
         return redis_client.lock(f"app_asset:lock:{app_id}", timeout=AppAssetService._LOCK_TIMEOUT_SECONDS)
 
     @staticmethod
-    def _draft_download_cache_key(storage_key: str) -> str:
-        # Cache key for a single draft asset download URL.
-        return f"app_asset:draft_download:{storage_key}"
+    def assets_storage() -> CachedPresignStorage:
+        from extensions.ext_storage import storage
 
-    @staticmethod
-    def _get_cached_download_urls(cache_keys: list[str]) -> list[str | None] | None:
-        # Return cached draft download URLs per asset if available.
-        try:
-            cached = redis_client.mget(cache_keys)
-        except Exception:
-            logger.warning("Failed to read draft download cache", exc_info=True)
-            return None
-
-        return cached
-
-    @staticmethod
-    def _set_cached_download_url(cache_key: str, url: str, expires_in: int) -> None:
-        # Store draft download URL with TTL slightly shorter than presign expiry.
-        ttl = max(
-            expires_in - AppAssetService._PRESIGN_CACHE_TTL_BUFFER_SECONDS,
-            AppAssetService._PRESIGN_CACHE_MIN_TTL_SECONDS,
+        return CachedPresignStorage(
+            storage=FilePresignStorage(storage.storage_runner),
+            redis_client=redis_client,
+            cache_key_prefix=AppAssetService._DRAFT_CACHE_KEY_PREFIX,
         )
-        try:
-            redis_client.setex(cache_key, ttl, url)
-        except Exception:
-            logger.warning("Failed to write draft download cache", exc_info=True)
-
-    @staticmethod
-    def _clear_draft_download_cache(storage_keys: list[str]) -> None:
-        # Clear draft download URL cache for specific assets.
-        if not storage_keys:
-            return
-        cache_keys = [AppAssetService._draft_download_cache_key(key) for key in storage_keys]
-        try:
-            redis_client.delete(*cache_keys)
-        except Exception:
-            logger.warning("Failed to clear draft download cache", exc_info=True)
-
-    @staticmethod
-    def get_cached_draft_download_urls(app_assets: AppAssets, *, expires_in: int = 3600) -> list[tuple[str, str]]:
-        # Build draft download URLs with cache to avoid repeated presign calls.
-        tree = app_assets.asset_tree
-        build_id = app_assets.id
-        presign_storage = FilePresignStorage(storage.storage_runner)
-        nodes = list(tree.walk_files())
-        if not nodes:
-            return []
-        storage_keys = [
-            AssetPaths.build_resolved_file(app_assets.tenant_id, app_assets.app_id, build_id, node.id)
-            if node.extension == "md"
-            else AssetPaths.draft_file(app_assets.tenant_id, app_assets.app_id, node.id)
-            for node in nodes
-        ]
-        cache_keys = [AppAssetService._draft_download_cache_key(key) for key in storage_keys]
-        cached_values = AppAssetService._get_cached_download_urls(cache_keys)
-        if cached_values is None:
-            cached_values = [None] * len(nodes)
-
-        items: list[tuple[str, str]] = []
-        for node, storage_key, cache_key, cached in zip(nodes, storage_keys, cache_keys, cached_values):
-            path = tree.get_path(node.id)
-            if cached:
-                url = cached.decode("utf-8") if isinstance(cached, (bytes, bytearray)) else cached
-            else:
-                url = presign_storage.get_download_url(storage_key, expires_in)
-                AppAssetService._set_cached_download_url(cache_key, url, expires_in)
-            items.append((path, url))
-
-        return items
 
     @staticmethod
     def _draft_storage_key_for_node(tenant_id: str, app_id: str, assets_id: str, node: AppAssetNode) -> str:
@@ -238,9 +178,8 @@
             raise AppAssetNodeTooLargeError(f"File node {node_id} size exceeded the limit: {max_size_mb} MB")
 
         storage_key = AssetPaths.draft_file(app_model.tenant_id, app_model.id, node_id)
-        return storage.load_once(storage_key)
+        return AppAssetService.assets_storage().load_once(storage_key)
 
-    # FIXME(Mairuis): migrate to presigned upload API
    @staticmethod
     def update_file_content(
@@ -259,20 +198,12 @@
                 raise AppAssetNodeNotFoundError(str(e)) from e
 
             storage_key = AssetPaths.draft_file(app_model.tenant_id, app_model.id, node_id)
-            storage.save(storage_key, content)
+            AppAssetService.assets_storage().save(storage_key, content)
 
             assets.asset_tree = tree
             assets.updated_by = account_id
             session.commit()
 
-            cache_key = AppAssetService._draft_storage_key_for_node(
-                app_model.tenant_id,
-                app_model.id,
-                assets.id,
-                node,
-            )
-            AppAssetService._clear_draft_download_cache([cache_key])
-
             return node
 
     @staticmethod
@@ -286,10 +217,6 @@
         with Session(db.engine, expire_on_commit=False) as session:
             assets = AppAssetService.get_or_create_assets(session, app_model, account_id)
             tree = assets.asset_tree
-
-            old_node = tree.get(node_id)
-            old_extension = old_node.extension if old_node else None
-
             try:
                 node = tree.rename(node_id, new_name)
             except TreeNodeNotFoundError as e:
@@ -300,26 +227,6 @@
             assets.asset_tree = tree
             assets.updated_by = account_id
             session.commit()
-
-            if node.node_type == AssetNodeType.FILE:
-                cache_keys: list[str] = []
-                if old_extension is not None:
-                    old_storage_key = (
-                        AssetPaths.build_resolved_file(app_model.tenant_id, app_model.id, assets.id, node.id)
-                        if old_extension == "md"
-                        else AssetPaths.draft_file(app_model.tenant_id, app_model.id, node.id)
-                    )
-                    cache_keys.append(old_storage_key)
-                cache_keys.append(
-                    AppAssetService._draft_storage_key_for_node(
-                        app_model.tenant_id,
-                        app_model.id,
-                        assets.id,
-                        node,
-                    )
-                )
-                AppAssetService._clear_draft_download_cache(list(set(cache_keys)))
-
             return node
 
     @staticmethod
@@ -347,15 +254,6 @@
             assets.updated_by = account_id
             session.commit()
 
-            if node.node_type == AssetNodeType.FILE:
-                cache_key = AppAssetService._draft_storage_key_for_node(
-                    app_model.tenant_id,
-                    app_model.id,
-                    assets.id,
-                    node,
-                )
-                AppAssetService._clear_draft_download_cache([cache_key])
-
             return node
 
     @staticmethod
@@ -388,14 +286,6 @@
             assets = AppAssetService.get_or_create_assets(session, app_model, account_id)
             tree = assets.asset_tree
 
-            target_ids = [node_id] + tree.get_descendant_ids(node_id)
-            target_nodes = [tree.get(nid) for nid in target_ids]
-            cache_keys = [
-                AppAssetService._draft_storage_key_for_node(app_model.tenant_id, app_model.id, assets.id, node)
-                for node in target_nodes
-                if node is not None and node.node_type == AssetNodeType.FILE
-            ]
-
             try:
                 removed_ids = tree.remove(node_id)
             except TreeNodeNotFoundError as e:
@@ -404,7 +294,7 @@
             for nid in removed_ids:
                 storage_key = AssetPaths.draft_file(app_model.tenant_id, app_model.id, nid)
                 try:
-                    storage.delete(storage_key)
+                    AppAssetService.assets_storage().delete(storage_key)
                 except Exception:
                     logger.warning("Failed to delete storage file %s", storage_key, exc_info=True)
 
@@ -412,8 +302,6 @@
             assets.updated_by = account_id
             session.commit()
 
-            AppAssetService._clear_draft_download_cache(cache_keys)
-
     @staticmethod
     def publish(session: Session, app_model: App, account_id: str, workflow_id: str) -> AppAssets:
         tenant_id = app_model.tenant_id
@@ -436,18 +324,20 @@
         session.flush()
 
         ctx = BuildContext(tenant_id=tenant_id, app_id=app_id, build_id=publish_id)
-        built_assets = AssetBuildPipeline().build_all(tree, ctx)
+        built_assets = AssetBuildPipeline(
+            [SkillBuilder(storage=AppAssetService.assets_storage()), FileBuilder()]
+        ).build_all(tree, ctx)
 
-        packager = AssetZipPackager(storage)
+        packager = AssetZipPackager(AppAssetService.assets_storage())
         runtime_zip_bytes = packager.package(built_assets)
         runtime_zip_key = AssetPaths.build_zip(tenant_id, app_id, publish_id)
-        storage.save(runtime_zip_key, runtime_zip_bytes)
+        AppAssetService.assets_storage().save(runtime_zip_key, runtime_zip_bytes)
 
         source_items = tree_to_asset_items(tree, tenant_id, app_id)
         source_zip_bytes = packager.package(source_items)
         source_zip_key = AssetPaths.build_source_zip(tenant_id, app_id, workflow_id)
-        storage.save(source_zip_key, source_zip_bytes)
+        AppAssetService.assets_storage().save(source_zip_key, source_zip_bytes)
 
         return published
 
@@ -457,12 +347,14 @@
         tree = assets.asset_tree
 
         ctx = BuildContext(tenant_id=tenant_id, app_id=app_id, build_id=assets.id)
-        built_assets = AssetBuildPipeline().build_all(tree, ctx)
+        built_assets = AssetBuildPipeline(
+            [SkillBuilder(storage=AppAssetService.assets_storage()), FileBuilder()]
+        ).build_all(tree, ctx)
 
-        packager = AssetZipPackager(storage)
+        packager = AssetZipPackager(storage=AppAssetService.assets_storage())
         zip_bytes = packager.package(built_assets)
         zip_key = AssetPaths.build_zip(tenant_id, app_id, assets.id)
-        storage.save(zip_key, zip_bytes)
+        AppAssetService.assets_storage().save(zip_key, zip_bytes)
 
     @staticmethod
     def get_file_download_url(
@@ -480,14 +372,13 @@
             raise AppAssetNodeNotFoundError(f"File node {node_id} not found")
 
         storage_key = AssetPaths.draft_file(app_model.tenant_id, app_model.id, node_id)
-        presign_storage = FilePresignStorage(storage.storage_runner)
-        return presign_storage.get_download_url(storage_key, expires_in)
+        return AppAssetService.assets_storage().get_download_url(storage_key, expires_in)
 
     @staticmethod
     def get_source_zip_bytes(tenant_id: str, app_id: str, workflow_id: str) -> bytes | None:
         source_zip_key = AssetPaths.build_source_zip(tenant_id, app_id, workflow_id)
         try:
-            return storage.load_once(source_zip_key)
+            return AppAssetService.assets_storage().load_once(source_zip_key)
         except Exception:
             logger.warning("Source zip not found: %s", source_zip_key)
             return None
@@ -545,7 +436,7 @@
             session.commit()
 
         storage_key = AssetPaths.draft_file(app_model.tenant_id, app_model.id, node_id)
-        presign_storage = FilePresignStorage(storage.storage_runner)
+        presign_storage = AppAssetService.assets_storage()
         upload_url = presign_storage.get_upload_url(storage_key, expires_in)
 
         return node, upload_url
@@ -581,12 +472,12 @@
             assets.updated_by = account_id
             session.commit()
 
-        presign_storage = FilePresignStorage(storage.storage_runner)
+        storage = AppAssetService.assets_storage()
 
         def fill_urls(node: BatchUploadNode) -> None:
             if node.node_type == AssetNodeType.FILE and node.id:
                 storage_key = AssetPaths.draft_file(app_model.tenant_id, app_model.id, node.id)
-                node.upload_url = presign_storage.get_upload_url(storage_key, expires_in)
+                node.upload_url = storage.get_upload_url(storage_key, expires_in)
             for child in node.children:
                 fill_urls(child)
 
diff --git a/api/tests/unit_tests/extensions/storage/test_cached_presign_storage.py b/api/tests/unit_tests/extensions/storage/test_cached_presign_storage.py
new file mode 100644
index 0000000000..280f8925ab
--- /dev/null
+++ b/api/tests/unit_tests/extensions/storage/test_cached_presign_storage.py
@@ -0,0 +1,204 @@
+from unittest.mock import Mock
+
+import pytest
+
+from extensions.storage.cached_presign_storage import CachedPresignStorage
+
+
+class TestCachedPresignStorage:
+    """Test suite for CachedPresignStorage class."""
+
+    @pytest.fixture
+    def mock_storage(self):
+        """Create a mock underlying storage."""
+        return Mock()
+
+    @pytest.fixture
+    def mock_redis(self):
+        """Create a mock Redis client."""
+        return Mock()
+
+    @pytest.fixture
+    def cached_storage(self, mock_storage, mock_redis):
+        """Create CachedPresignStorage with mocks."""
+        return CachedPresignStorage(
+            storage=mock_storage,
+            redis_client=mock_redis,
+            cache_key_prefix="test_prefix",
+        )
+
+    def test_get_download_url_returns_cached_on_hit(self, cached_storage, mock_storage, mock_redis):
+        """Test that cached URL is returned when cache hit occurs."""
+        mock_redis.mget.return_value = [b"https://cached-url.com/file.txt"]
+
+        result = cached_storage.get_download_url("path/to/file.txt", expires_in=3600)
+
+        assert result == "https://cached-url.com/file.txt"
+        mock_redis.mget.assert_called_once_with(["test_prefix:path/to/file.txt"])
+        mock_storage.get_download_url.assert_not_called()
+        mock_redis.setex.assert_not_called()
+
+    def test_get_download_url_calls_storage_on_miss(self, cached_storage, mock_storage, mock_redis):
+        """Test that storage is called and result cached on cache miss."""
+        mock_redis.mget.return_value = [None]
+        mock_storage.get_download_url.return_value = "https://new-url.com/file.txt"
+
+        result = cached_storage.get_download_url("path/to/file.txt", expires_in=3600)
+
+        assert result == "https://new-url.com/file.txt"
+        mock_redis.mget.assert_called_once_with(["test_prefix:path/to/file.txt"])
+        mock_storage.get_download_url.assert_called_once_with("path/to/file.txt", 3600)
+        mock_redis.setex.assert_called_once()
+        call_args = mock_redis.setex.call_args
+        assert call_args[0][0] == "test_prefix:path/to/file.txt"
+        assert call_args[0][2] == "https://new-url.com/file.txt"
+
+    def test_get_download_urls_batch_operation(self, cached_storage, mock_storage, mock_redis):
+        """Test batch URL retrieval with mixed cache hits/misses."""
+        mock_redis.mget.return_value = [b"https://cached1.com", None, b"https://cached2.com"]
+        mock_storage.get_download_url.return_value = "https://new.com"
+
+        filenames = ["file1.txt", "file2.txt", "file3.txt"]
+        result = cached_storage.get_download_urls(filenames, expires_in=3600)
+
+        assert result == ["https://cached1.com", "https://new.com", "https://cached2.com"]
+        mock_storage.get_download_url.assert_called_once_with("file2.txt", 3600)
+        mock_redis.setex.assert_called_once()
+
+    def test_get_download_urls_empty_list(self, cached_storage, mock_storage, mock_redis):
+        """Test batch URL retrieval with empty list."""
+        result = cached_storage.get_download_urls([], expires_in=3600)
+
+        assert result == []
+        mock_redis.mget.assert_not_called()
+        mock_storage.get_download_url.assert_not_called()
+
+    def test_invalidate_clears_cache(self, cached_storage, mock_redis):
+        """Test that invalidate deletes the correct cache keys."""
+        filenames = ["file1.txt", "file2.txt"]
+        cached_storage.invalidate(filenames)
+
+        mock_redis.delete.assert_called_once_with(
+            "test_prefix:file1.txt",
+            "test_prefix:file2.txt",
+        )
+
+    def test_invalidate_empty_list(self, cached_storage, mock_redis):
+        """Test that invalidate does nothing for empty list."""
+        cached_storage.invalidate([])
+
+        mock_redis.delete.assert_not_called()
+
+    def test_ttl_calculation_with_normal_expiry(self, cached_storage):
+        """Test TTL is computed correctly for normal expiry values."""
+        ttl = cached_storage._compute_ttl(3600)
+        expected = 3600 - CachedPresignStorage.TTL_BUFFER_SECONDS
+        assert ttl == expected
+
+    def test_ttl_calculation_respects_minimum(self, cached_storage):
+        """Test TTL respects minimum value for short expiry times."""
+        ttl = cached_storage._compute_ttl(100)
+        assert ttl == CachedPresignStorage.MIN_TTL_SECONDS
+
+    def test_ttl_calculation_edge_case(self, cached_storage):
+        """Test TTL calculation at the boundary."""
+        ttl = cached_storage._compute_ttl(CachedPresignStorage.TTL_BUFFER_SECONDS + 30)
+        assert ttl == CachedPresignStorage.MIN_TTL_SECONDS
+
+    def test_graceful_degradation_on_redis_mget_error(self, cached_storage, mock_storage, mock_redis):
+        """Test that storage is called when Redis mget fails."""
+        mock_redis.mget.side_effect = Exception("Redis connection error")
+        mock_storage.get_download_url.return_value = "https://new-url.com/file.txt"
+
+        result = cached_storage.get_download_url("path/to/file.txt", expires_in=3600)
+
+        assert result == "https://new-url.com/file.txt"
+        mock_storage.get_download_url.assert_called_once_with("path/to/file.txt", 3600)
+
+    def test_graceful_degradation_on_redis_setex_error(self, cached_storage, mock_storage, mock_redis):
+        """Test that URL is still returned when Redis setex fails."""
+        mock_redis.mget.return_value = [None]
+        mock_redis.setex.side_effect = Exception("Redis connection error")
+        mock_storage.get_download_url.return_value = "https://new-url.com/file.txt"
+
+        result = cached_storage.get_download_url("path/to/file.txt", expires_in=3600)
+
+        assert result == "https://new-url.com/file.txt"
+
+    def test_graceful_degradation_on_redis_delete_error(self, cached_storage, mock_redis):
+        """Test that invalidate doesn't raise when Redis delete fails."""
+        mock_redis.delete.side_effect = Exception("Redis connection error")
+
+        cached_storage.invalidate(["file.txt"])
+
+    def test_delegates_save_to_storage(self, cached_storage, mock_storage):
+        """Test that save delegates to underlying storage."""
+        cached_storage.save("file.txt", b"data")
+        mock_storage.save.assert_called_once_with("file.txt", b"data")
+
+    def test_delegates_load_once_to_storage(self, cached_storage, mock_storage):
+        """Test that load_once delegates to underlying storage."""
+        mock_storage.load_once.return_value = b"content"
+        result = cached_storage.load_once("file.txt")
+        assert result == b"content"
+        mock_storage.load_once.assert_called_once_with("file.txt")
+
+    def test_delegates_exists_to_storage(self, cached_storage, mock_storage):
+        """Test that exists delegates to underlying storage."""
+        mock_storage.exists.return_value = True
+        result = cached_storage.exists("file.txt")
+        assert result is True
+        mock_storage.exists.assert_called_once_with("file.txt")
+
+    def test_delete_delegates_and_invalidates_cache(self, cached_storage, mock_storage, mock_redis):
+        """Test that delete delegates to storage and invalidates cache."""
+        cached_storage.delete("file.txt")
+
+        mock_storage.delete.assert_called_once_with("file.txt")
+        mock_redis.delete.assert_called_once_with("test_prefix:file.txt")
+
+    def test_delegates_scan_to_storage(self, cached_storage, mock_storage):
+        """Test that scan delegates to underlying storage."""
+        mock_storage.scan.return_value = ["file1.txt", "file2.txt"]
+        result = cached_storage.scan("path/", files=True, directories=False)
+        assert result == ["file1.txt", "file2.txt"]
+        mock_storage.scan.assert_called_once_with("path/", files=True, directories=False)
+
+    def test_delegates_get_upload_url_to_storage(self, cached_storage, mock_storage):
+        """Test that get_upload_url delegates to underlying storage."""
+        mock_storage.get_upload_url.return_value = "https://upload-url.com"
+        result = cached_storage.get_upload_url("file.txt", expires_in=3600)
+        assert result == "https://upload-url.com"
+        mock_storage.get_upload_url.assert_called_once_with("file.txt", 3600)
+
+    def test_cache_key_generation(self, cached_storage):
+        """Test cache key is generated correctly."""
+        key = cached_storage._cache_key("path/to/file.txt")
+        assert key == "test_prefix:path/to/file.txt"
+
+    def test_cached_value_decoded_from_bytes(self, cached_storage, mock_storage, mock_redis):
+        """Test that bytes cached values are decoded to strings."""
+        mock_redis.mget.return_value = [b"https://cached-url.com"]
+
+        result = cached_storage.get_download_url("file.txt")
+
+        assert result == "https://cached-url.com"
+        assert isinstance(result, str)
+
+    def test_cached_value_decoded_from_bytearray(self, cached_storage, mock_storage, mock_redis):
+        """Test that bytearray cached values are decoded to strings."""
+        mock_redis.mget.return_value = [bytearray(b"https://cached-url.com")]
+
+        result = cached_storage.get_download_url("file.txt")
+
+        assert result == "https://cached-url.com"
+        assert isinstance(result, str)
+
+    def test_default_cache_key_prefix(self, mock_storage, mock_redis):
+        """Test default cache key prefix is used when not specified."""
+        storage = CachedPresignStorage(
+            storage=mock_storage,
+            redis_client=mock_redis,
+        )
+        key = storage._cache_key("file.txt")
+        assert key == "presign_cache:file.txt"
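
Reviewer note: taken together, these changes move presign-capable storage construction into a single place, AppAssetService.assets_storage(), and inject it explicitly into SkillBuilder, AssetZipPackager, and the sandbox initializers instead of having each of them import the module-global storage. A minimal sketch of the resulting build path, assembled only from names that appear in this diff (the rebuild_zip helper and its arguments are hypothetical placeholders for illustration, not part of the change):

    from core.app.entities.app_asset_entities import AppAssetFileTree
    from core.app_assets.builder import AssetBuildPipeline, BuildContext
    from core.app_assets.builder.file_builder import FileBuilder
    from core.app_assets.builder.skill_builder import SkillBuilder
    from core.app_assets.packager import AssetZipPackager
    from core.app_assets.paths import AssetPaths
    from services.app_asset_service import AppAssetService


    def rebuild_zip(tenant_id: str, app_id: str, build_id: str, tree: AppAssetFileTree) -> None:
        # Hypothetical helper mirroring the publish-time path in AppAssetService.publish() above.
        assets_storage = AppAssetService.assets_storage()  # CachedPresignStorage wrapping FilePresignStorage

        # Builders now receive their storage explicitly; SkillBuilder reads drafts
        # and uploads compiled skills through the injected handle.
        ctx = BuildContext(tenant_id=tenant_id, app_id=app_id, build_id=build_id)
        built = AssetBuildPipeline([SkillBuilder(storage=assets_storage), FileBuilder()]).build_all(tree, ctx)

        # The packager is constructed with the same storage and assembles the items into one zip.
        zip_bytes = AssetZipPackager(assets_storage).package(built)
        assets_storage.save(AssetPaths.build_zip(tenant_id, app_id, build_id), zip_bytes)

One practical consequence of the Redis-backed cache: because CachedPresignStorage keys its entries by storage path, a presigned URL generated during one sandbox start is reused by subsequent starts for up to max(expires_in - TTL_BUFFER_SECONDS, MIN_TTL_SECONDS) seconds, and delete() on the wrapper drops the corresponding cache entry along with the file.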