From a8074f4f4a6f536b3d9cd947466a68df160df209 Mon Sep 17 00:00:00 2001 From: Harry Date: Mon, 9 Mar 2026 14:44:21 +0800 Subject: [PATCH] feat: add DB inline content cache for app asset draft files MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduce app_asset_contents table as a read-through cache over S3 for text-like asset files (e.g. .md skill documents). This eliminates N individual S3 fetches during SkillBuilder builds — bulk_load pulls all content in a single SQL query with S3 fallback on miss. Key components: - CachedContentAccessor: DB-first read / dual-write / S3 fallback - AssetContentService: static DB operations (get, get_many, upsert, delete) - should_mirror(): single source of truth for extension-based policy - Alembic migration for app_asset_contents table Modified callers: - SkillBuilder uses accessor.bulk_load() instead of per-node S3 reads - AppAssetService.get/update_file_content route through accessor - delete_node cleans both DB cache and S3 - draft_app_assets_initializer uses should_mirror() instead of hardcoded .md --- api/core/app_assets/accessor.py | 167 ++++++++++++++++++ api/core/app_assets/builder/skill_builder.py | 22 ++- .../draft_app_assets_initializer.py | 5 +- api/core/sandbox/inspector/runtime_source.py | 4 +- ...e0aa981887_add_app_asset_contents_table.py | 40 +++++ api/models/__init__.py | 3 +- api/models/app_asset.py | 40 ++++- api/services/app_asset_package_service.py | 10 +- api/services/app_asset_service.py | 34 ++-- api/services/asset_content_service.py | 103 +++++++++++ .../core/app_assets/test_storage.py | 15 -- 11 files changed, 391 insertions(+), 52 deletions(-) create mode 100644 api/core/app_assets/accessor.py create mode 100644 api/migrations/versions/2026_03_09_1200-5ee0aa981887_add_app_asset_contents_table.py create mode 100644 api/services/asset_content_service.py diff --git a/api/core/app_assets/accessor.py b/api/core/app_assets/accessor.py new file mode 100644 index 0000000000..a09ff692e8 --- /dev/null +++ b/api/core/app_assets/accessor.py @@ -0,0 +1,167 @@ +"""Unified content accessor for app asset nodes. + +Accessor is scoped to a single app (tenant_id + app_id), not a single node. +All methods accept an AppAssetNode parameter to identify the target. + +CachedContentAccessor is the primary entry point: +- Reads DB first, misses fall through to S3 with sync backfill. +- Writes go to both DB and S3 (dual-write). +- Wraps an internal StorageContentAccessor for S3 I/O. + +Public helper: +- should_mirror(extension) — the ONLY place that maps file extensions to the + "should this node use DB mirror?" decision. All callers (presigned-upload + gating, etc.) should use this function instead of hard-coding extension checks. + +Collaborators: + - services.asset_content_service.AssetContentService (DB layer) + - core.app_assets.storage.AssetPaths (S3 key generation) + - extensions.storage.cached_presign_storage.CachedPresignStorage (S3 I/O) +""" + +from __future__ import annotations + +import logging + +from core.app.entities.app_asset_entities import AppAssetNode +from core.app_assets.storage import AssetPaths +from extensions.storage.cached_presign_storage import CachedPresignStorage +from services.asset_content_service import AssetContentService + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Extension-based policy — the single source of truth +# --------------------------------------------------------------------------- + +_MIRROR_EXTENSIONS: frozenset[str] = frozenset({"md"}) + + +def should_mirror(extension: str) -> bool: + """Return True if files with *extension* should be cached in DB. + + This is the ONLY place that maps file extensions to the inline-mirror + decision. All other modules should call this function instead of + checking extensions directly. + """ + return extension.lower() in _MIRROR_EXTENSIONS + + +# --------------------------------------------------------------------------- +# S3-only implementation (internal, used as inner delegate) +# --------------------------------------------------------------------------- + + +class _StorageAccessor: + """Reads/writes draft content via object storage (S3) only.""" + + _storage: CachedPresignStorage + _tenant_id: str + _app_id: str + + def __init__(self, storage: CachedPresignStorage, tenant_id: str, app_id: str) -> None: + self._storage = storage + self._tenant_id = tenant_id + self._app_id = app_id + + def _key(self, node: AppAssetNode) -> str: + return AssetPaths.draft(self._tenant_id, self._app_id, node.id) + + def load(self, node: AppAssetNode) -> bytes: + return self._storage.load_once(self._key(node)) + + def save(self, node: AppAssetNode, content: bytes) -> None: + self._storage.save(self._key(node), content) + + def delete(self, node: AppAssetNode) -> None: + try: + self._storage.delete(self._key(node)) + except Exception: + logger.warning("Failed to delete storage key %s", self._key(node), exc_info=True) + + +# --------------------------------------------------------------------------- +# DB-cached implementation (the public API) +# --------------------------------------------------------------------------- + + +class CachedContentAccessor: + """App-level content accessor with DB read-through cache over S3. + + Read path: DB first -> miss -> S3 fallback -> sync backfill DB + Write path: DB upsert + S3 save (dual-write) + Delete path: DB delete + S3 delete + + bulk_load uses a single SQL query for all nodes, with S3 fallback per miss. + + Usage: + accessor = CachedContentAccessor(storage, tenant_id, app_id) + content = accessor.load(node) + accessor.save(node, content) + results = accessor.bulk_load(nodes) + """ + + _inner: _StorageAccessor + _tenant_id: str + _app_id: str + + def __init__(self, storage: CachedPresignStorage, tenant_id: str, app_id: str) -> None: + self._inner = _StorageAccessor(storage, tenant_id, app_id) + self._tenant_id = tenant_id + self._app_id = app_id + + def load(self, node: AppAssetNode) -> bytes: + # 1. Try DB + cached = AssetContentService.get(self._tenant_id, self._app_id, node.id) + if cached is not None: + return cached.encode("utf-8") + + # 2. Fallback to S3 + data = self._inner.load(node) + + # 3. Sync backfill DB + AssetContentService.upsert( + tenant_id=self._tenant_id, + app_id=self._app_id, + node_id=node.id, + content=data.decode("utf-8"), + size=len(data), + ) + return data + + def bulk_load(self, nodes: list[AppAssetNode]) -> dict[str, bytes]: + """Single SQL for all nodes, S3 fallback + backfill per miss.""" + result: dict[str, bytes] = {} + node_ids = [n.id for n in nodes] + cached = AssetContentService.get_many(self._tenant_id, self._app_id, node_ids) + + for node in nodes: + if node.id in cached: + result[node.id] = cached[node.id].encode("utf-8") + else: + # S3 fallback + sync backfill + data = self._inner.load(node) + AssetContentService.upsert( + tenant_id=self._tenant_id, + app_id=self._app_id, + node_id=node.id, + content=data.decode("utf-8"), + size=len(data), + ) + result[node.id] = data + return result + + def save(self, node: AppAssetNode, content: bytes) -> None: + # Dual-write: DB + S3 + AssetContentService.upsert( + tenant_id=self._tenant_id, + app_id=self._app_id, + node_id=node.id, + content=content.decode("utf-8"), + size=len(content), + ) + self._inner.save(node, content) + + def delete(self, node: AppAssetNode) -> None: + AssetContentService.delete(self._tenant_id, self._app_id, node.id) + self._inner.delete(node) diff --git a/api/core/app_assets/builder/skill_builder.py b/api/core/app_assets/builder/skill_builder.py index b485da5e90..174b9f95cd 100644 --- a/api/core/app_assets/builder/skill_builder.py +++ b/api/core/app_assets/builder/skill_builder.py @@ -2,12 +2,13 @@ import json import logging from core.app.entities.app_asset_entities import AppAssetFileTree, AppAssetNode +from core.app_assets.accessor import CachedContentAccessor from core.app_assets.entities import AssetItem from core.app_assets.storage import AssetPaths from core.skill.assembler import SkillBundleAssembler from core.skill.entities.skill_bundle import SkillBundle from core.skill.entities.skill_document import SkillDocument -from extensions.storage.cached_presign_storage import CachedPresignStorage +from extensions.storage.base_storage import BaseStorage from .base import BuildContext @@ -16,10 +17,12 @@ logger = logging.getLogger(__name__) class SkillBuilder: _nodes: list[tuple[AppAssetNode, str]] - _storage: CachedPresignStorage + _accessor: CachedContentAccessor + _storage: BaseStorage - def __init__(self, storage: CachedPresignStorage) -> None: + def __init__(self, accessor: CachedContentAccessor, storage: BaseStorage) -> None: self._nodes = [] + self._accessor = accessor self._storage = storage def accept(self, node: AppAssetNode) -> bool: @@ -37,15 +40,16 @@ class SkillBuilder: ) return [] - # load documents – skip nodes whose draft content is still the empty - # placeholder written at creation time (the front-end has not uploaded - # the actual skill document yet). + # Batch-load all skill draft content in one DB query (with S3 fallback on miss). + nodes_only = [node for node, _ in self._nodes] + raw_contents = self._accessor.bulk_load(nodes_only) + + # Parse documents — skip nodes whose draft content is still the empty + # placeholder written at creation time. documents: dict[str, SkillDocument] = {} for node, _ in self._nodes: try: - key = AssetPaths.draft(ctx.tenant_id, ctx.app_id, node.id) - raw = self._storage.load_once(key) - # skip empty content + raw = raw_contents.get(node.id) if not raw: continue data = {"skill_id": node.id, **json.loads(raw)} diff --git a/api/core/sandbox/initializer/draft_app_assets_initializer.py b/api/core/sandbox/initializer/draft_app_assets_initializer.py index ece8f93aee..6ab4ca06af 100644 --- a/api/core/sandbox/initializer/draft_app_assets_initializer.py +++ b/api/core/sandbox/initializer/draft_app_assets_initializer.py @@ -1,5 +1,6 @@ import logging +from core.app_assets.accessor import should_mirror from core.app_assets.constants import AppAssetsAttrs from core.app_assets.storage import AssetPaths from core.sandbox.entities import AppAssets @@ -30,10 +31,10 @@ class DraftAppAssetsInitializer(AsyncSandboxInitializer): nodes = list(tree.walk_files()) if not nodes: return - # FIXME(Mairuis): should be more graceful + # Inline-mirror nodes use the resolved (compiled) key; others use draft. keys = [ AssetPaths.resolved(self._tenant_id, self._app_id, build_id, node.id) - if node.extension == "md" + if should_mirror(node.extension) else AssetPaths.draft(self._tenant_id, self._app_id, node.id) for node in nodes ] diff --git a/api/core/sandbox/inspector/runtime_source.py b/api/core/sandbox/inspector/runtime_source.py index 2cb92e034b..ae8294aca9 100644 --- a/api/core/sandbox/inspector/runtime_source.py +++ b/api/core/sandbox/inspector/runtime_source.py @@ -115,9 +115,7 @@ class SandboxFileRuntimeSource(SandboxFileSource): raise RuntimeError(str(exc)) from exc finally: try: - pipeline(self._runtime).add(["rm", "-f", archive_path]).execute( - timeout=self._LIST_TIMEOUT_SECONDS - ) + pipeline(self._runtime).add(["rm", "-f", archive_path]).execute(timeout=self._LIST_TIMEOUT_SECONDS) except Exception as exc: # Best-effort cleanup; do not fail the download on cleanup issues. logger.debug("Failed to cleanup temp archive %s: %s", archive_path, exc) diff --git a/api/migrations/versions/2026_03_09_1200-5ee0aa981887_add_app_asset_contents_table.py b/api/migrations/versions/2026_03_09_1200-5ee0aa981887_add_app_asset_contents_table.py new file mode 100644 index 0000000000..fe4a44ff22 --- /dev/null +++ b/api/migrations/versions/2026_03_09_1200-5ee0aa981887_add_app_asset_contents_table.py @@ -0,0 +1,40 @@ +"""Add app_asset_contents table for inline content caching. + +Revision ID: 5ee0aa981887 +Revises: aab323465866 +Create Date: 2026-03-09 12:00:00.000000 + +""" + +import sqlalchemy as sa +from alembic import op + +import models as models + +# revision identifiers, used by Alembic. +revision = "5ee0aa981887" +down_revision = "aab323465866" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + op.create_table( + "app_asset_contents", + sa.Column("id", models.types.StringUUID(), nullable=False), + sa.Column("tenant_id", models.types.StringUUID(), nullable=False), + sa.Column("app_id", models.types.StringUUID(), nullable=False), + sa.Column("node_id", models.types.StringUUID(), nullable=False), + sa.Column("content", sa.Text(), nullable=False, server_default=""), + sa.Column("size", sa.Integer(), nullable=False, server_default="0"), + sa.Column("created_at", sa.DateTime(), nullable=False, server_default=sa.func.current_timestamp()), + sa.Column("updated_at", sa.DateTime(), nullable=False, server_default=sa.func.current_timestamp()), + sa.PrimaryKeyConstraint("id", name="app_asset_contents_pkey"), + sa.UniqueConstraint("tenant_id", "app_id", "node_id", name="uq_asset_content_node"), + ) + op.create_index("idx_asset_content_app", "app_asset_contents", ["tenant_id", "app_id"]) + + +def downgrade() -> None: + op.drop_index("idx_asset_content_app", table_name="app_asset_contents") + op.drop_table("app_asset_contents") diff --git a/api/models/__init__.py b/api/models/__init__.py index fdc384f205..6b9d509482 100644 --- a/api/models/__init__.py +++ b/api/models/__init__.py @@ -9,7 +9,7 @@ from .account import ( TenantStatus, ) from .api_based_extension import APIBasedExtension, APIBasedExtensionPoint -from .app_asset import AppAssets +from .app_asset import AppAssetContent, AppAssets from .comment import ( WorkflowComment, WorkflowCommentMention, @@ -136,6 +136,7 @@ __all__ = [ "App", "AppAnnotationHitHistory", "AppAnnotationSetting", + "AppAssetContent", "AppAssets", "AppDatasetJoin", "AppMCPServer", diff --git a/api/models/app_asset.py b/api/models/app_asset.py index dc9d6282c8..c2d32b2d42 100644 --- a/api/models/app_asset.py +++ b/api/models/app_asset.py @@ -2,7 +2,7 @@ from datetime import datetime from uuid import uuid4 import sqlalchemy as sa -from sqlalchemy import DateTime, String, func +from sqlalchemy import DateTime, Integer, func from sqlalchemy.orm import Mapped, mapped_column from core.app.entities.app_asset_entities import AppAssetFileTree @@ -49,3 +49,41 @@ class AppAssets(Base): def __repr__(self) -> str: return f"" + + +class AppAssetContent(Base): + """Inline content cache for app asset draft files. + + Acts as a read-through cache for S3: text-like asset content is dual-written + here on save and read from DB first (falling back to S3 on miss with sync backfill). + Keyed by (tenant_id, app_id, node_id) — stores only the current draft content, + not published snapshots. + + See core/app_assets/content_accessor.py for the accessor abstraction that + manages the DB/S3 read-through and dual-write logic. + """ + + __tablename__ = "app_asset_contents" + __table_args__ = ( + sa.PrimaryKeyConstraint("id", name="app_asset_contents_pkey"), + sa.UniqueConstraint("tenant_id", "app_id", "node_id", name="uq_asset_content_node"), + sa.Index("idx_asset_content_app", "tenant_id", "app_id"), + ) + + id: Mapped[str] = mapped_column(StringUUID, default=lambda: str(uuid4())) + tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + app_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + node_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + content: Mapped[str] = mapped_column(LongText, nullable=False, default="") + size: Mapped[int] = mapped_column(Integer, nullable=False, default=0) + created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp()) + updated_at: Mapped[datetime] = mapped_column( + DateTime, + nullable=False, + default=func.current_timestamp(), + server_default=func.current_timestamp(), + onupdate=func.current_timestamp(), + ) + + def __repr__(self) -> str: + return f"" diff --git a/api/services/app_asset_package_service.py b/api/services/app_asset_package_service.py index 4af3a17ad8..c1ab180e58 100644 --- a/api/services/app_asset_package_service.py +++ b/api/services/app_asset_package_service.py @@ -158,8 +158,10 @@ class AppAssetPackageService: session.flush() asset_storage = AppAssetService.get_storage() + accessor = AppAssetService.get_accessor(tenant_id, app_id) + pipeline = AssetBuildPipeline([SkillBuilder(accessor=accessor, storage=asset_storage), FileBuilder()]) ctx = BuildContext(tenant_id=tenant_id, app_id=app_id, build_id=publish_id) - built_assets = AssetBuildPipeline([SkillBuilder(storage=asset_storage), FileBuilder()]).build_all(tree, ctx) + built_assets = pipeline.build_all(tree, ctx) runtime_zip_key = AssetPaths.build_zip(tenant_id, app_id, publish_id) runtime_upload_url = asset_storage.get_upload_url(runtime_zip_key) @@ -194,10 +196,10 @@ class AppAssetPackageService: tree = assets.asset_tree asset_storage = AppAssetService.get_storage() + accessor = AppAssetService.get_accessor(tenant_id, app_id) + pipeline = AssetBuildPipeline([SkillBuilder(accessor=accessor, storage=asset_storage), FileBuilder()]) ctx = BuildContext(tenant_id=tenant_id, app_id=app_id, build_id=assets.id) - built_assets: list[AssetItem] = AssetBuildPipeline( - [SkillBuilder(storage=asset_storage), FileBuilder()] - ).build_all(tree, ctx) + built_assets: list[AssetItem] = pipeline.build_all(tree, ctx) user_id = getattr(assets, "updated_by", None) or getattr(assets, "created_by", None) or "system" key = AssetPaths.build_zip(tenant_id, app_id, assets.id) diff --git a/api/services/app_asset_service.py b/api/services/app_asset_service.py index 72db0dd473..50c107a551 100644 --- a/api/services/app_asset_service.py +++ b/api/services/app_asset_service.py @@ -13,6 +13,7 @@ from core.app.entities.app_asset_entities import ( TreeParentNotFoundError, TreePathConflictError, ) +from core.app_assets.accessor import CachedContentAccessor from core.app_assets.entities.assets import AssetItem from core.app_assets.storage import AssetPaths from extensions.ext_database import db @@ -22,6 +23,7 @@ from extensions.storage.cached_presign_storage import CachedPresignStorage from extensions.storage.file_presign_storage import FilePresignStorage from models.app_asset import AppAssets from models.model import App +from services.asset_content_service import AssetContentService from .errors.app_asset import ( AppAssetNodeNotFoundError, @@ -207,6 +209,11 @@ class AppAssetService: return node + @staticmethod + def get_accessor(tenant_id: str, app_id: str) -> CachedContentAccessor: + """Get a content accessor with DB caching for the given app.""" + return CachedContentAccessor(AppAssetService.get_storage(), tenant_id, app_id) + @staticmethod def get_file_content(app_model: App, account_id: str, node_id: str) -> bytes: with Session(db.engine) as session: @@ -221,9 +228,8 @@ class AppAssetService: max_size_mb = AppAssetService.MAX_PREVIEW_CONTENT_SIZE / 1024 / 1024 raise AppAssetNodeTooLargeError(f"File node {node_id} size exceeded the limit: {max_size_mb} MB") - asset_storage = AppAssetService.get_storage() - key = AssetPaths.draft(app_model.tenant_id, app_model.id, node_id) - return asset_storage.load_once(key) + accessor = AppAssetService.get_accessor(app_model.tenant_id, app_model.id) + return accessor.load(node) @staticmethod def update_file_content( @@ -242,9 +248,8 @@ class AppAssetService: except TreeNodeNotFoundError as e: raise AppAssetNodeNotFoundError(str(e)) from e - asset_storage = AppAssetService.get_storage() - key = AssetPaths.draft(app_model.tenant_id, app_model.id, node_id) - asset_storage.save(key, content) + accessor = AppAssetService.get_accessor(app_model.tenant_id, app_model.id) + accessor.save(node, content) assets.asset_tree = tree assets.updated_by = account_id @@ -340,8 +345,9 @@ class AppAssetService: assets.updated_by = account_id session.commit() - # FIXME(Mairuis): sync deletion queue, failed is fine - def _delete_file_from_storage(tenant_id: str, app_id: str, node_ids: list[str]) -> None: + # Delete from both DB cache and S3 in background; failures are non-fatal. + def _delete_files(tenant_id: str, app_id: str, node_ids: list[str]) -> None: + AssetContentService.delete_many(tenant_id, app_id, node_ids) asset_storage = AppAssetService.get_storage() for nid in node_ids: key = AssetPaths.draft(tenant_id, app_id, nid) @@ -350,9 +356,7 @@ class AppAssetService: except Exception: logger.warning("Failed to delete storage file %s", key, exc_info=True) - threading.Thread( - target=lambda: _delete_file_from_storage(app_model.tenant_id, app_model.id, removed_ids) - ).start() + threading.Thread(target=lambda: _delete_files(app_model.tenant_id, app_model.id, removed_ids)).start() @staticmethod def get_file_download_url( @@ -469,17 +473,13 @@ class AppAssetService: tree = assets.asset_tree taken_by_parent: dict[str | None, set[str]] = {} - stack: list[tuple[BatchUploadNode, str | None]] = [ - (child, None) for child in reversed(input_children) - ] + stack: list[tuple[BatchUploadNode, str | None]] = [(child, None) for child in reversed(input_children)] while stack: node, parent_id = stack.pop() if node.id is None: node.id = str(uuid4()) if parent_id not in taken_by_parent: - taken_by_parent[parent_id] = { - child.name for child in tree.get_children(parent_id) - } + taken_by_parent[parent_id] = {child.name for child in tree.get_children(parent_id)} taken = taken_by_parent[parent_id] unique_name = tree.ensure_unique_name( parent_id, diff --git a/api/services/asset_content_service.py b/api/services/asset_content_service.py new file mode 100644 index 0000000000..bf67e489fa --- /dev/null +++ b/api/services/asset_content_service.py @@ -0,0 +1,103 @@ +"""Service for the app_asset_contents table. + +Provides single-node and batch DB operations for the inline content cache. +All methods are static and open their own short-lived sessions. + +Collaborators: + - models.app_asset.AppAssetContent (SQLAlchemy model) + - core.app_assets.accessor (accessor abstraction that calls this service) +""" + +import logging + +from sqlalchemy import delete, select +from sqlalchemy.dialects.postgresql import insert as pg_insert +from sqlalchemy.orm import Session + +from extensions.ext_database import db +from models.app_asset import AppAssetContent + +logger = logging.getLogger(__name__) + + +class AssetContentService: + """DB operations for the inline asset content cache. + + All methods are static. All queries are scoped by tenant_id + app_id. + """ + + @staticmethod + def get(tenant_id: str, app_id: str, node_id: str) -> str | None: + """Get cached content for a single node. Returns None on miss.""" + with Session(db.engine) as session: + return session.execute( + select(AppAssetContent.content).where( + AppAssetContent.tenant_id == tenant_id, + AppAssetContent.app_id == app_id, + AppAssetContent.node_id == node_id, + ) + ).scalar_one_or_none() + + @staticmethod + def get_many(tenant_id: str, app_id: str, node_ids: list[str]) -> dict[str, str]: + """Batch get. Returns {node_id: content} for hits only.""" + if not node_ids: + return {} + with Session(db.engine) as session: + rows = session.execute( + select(AppAssetContent.node_id, AppAssetContent.content).where( + AppAssetContent.tenant_id == tenant_id, + AppAssetContent.app_id == app_id, + AppAssetContent.node_id.in_(node_ids), + ) + ).all() + return {row.node_id: row.content for row in rows} + + @staticmethod + def upsert(tenant_id: str, app_id: str, node_id: str, content: str, size: int) -> None: + """Insert or update inline content for a single node.""" + with Session(db.engine) as session: + stmt = pg_insert(AppAssetContent).values( + tenant_id=tenant_id, + app_id=app_id, + node_id=node_id, + content=content, + size=size, + ) + stmt = stmt.on_conflict_do_update( + constraint="uq_asset_content_node", + set_={ + "content": stmt.excluded.content, + "size": stmt.excluded.size, + }, + ) + session.execute(stmt) + session.commit() + + @staticmethod + def delete(tenant_id: str, app_id: str, node_id: str) -> None: + """Delete cached content for a single node.""" + with Session(db.engine) as session: + session.execute( + delete(AppAssetContent).where( + AppAssetContent.tenant_id == tenant_id, + AppAssetContent.app_id == app_id, + AppAssetContent.node_id == node_id, + ) + ) + session.commit() + + @staticmethod + def delete_many(tenant_id: str, app_id: str, node_ids: list[str]) -> None: + """Delete cached content for multiple nodes.""" + if not node_ids: + return + with Session(db.engine) as session: + session.execute( + delete(AppAssetContent).where( + AppAssetContent.tenant_id == tenant_id, + AppAssetContent.app_id == app_id, + AppAssetContent.node_id.in_(node_ids), + ) + ) + session.commit() diff --git a/api/tests/unit_tests/core/app_assets/test_storage.py b/api/tests/unit_tests/core/app_assets/test_storage.py index f280f037d4..2094f1b89c 100644 --- a/api/tests/unit_tests/core/app_assets/test_storage.py +++ b/api/tests/unit_tests/core/app_assets/test_storage.py @@ -89,21 +89,6 @@ def test_asset_paths_draft_validation(): AssetPaths.draft(tenant_id=tenant_id, app_id=app_id, node_id="not-a-uuid") -def test_asset_paths_resolved_requires_node_id(): - """Test that AssetPaths.resolved() requires a valid node_id.""" - tenant_id = str(uuid4()) - app_id = str(uuid4()) - assets_id = str(uuid4()) - - # Missing node_id should raise - with pytest.raises(TypeError): - AssetPaths.resolved(tenant_id, app_id, assets_id) # type: ignore[call-arg] - - # Invalid node_id should raise - with pytest.raises(ValueError, match="node_id must be a valid UUID"): - AssetPaths.resolved(tenant_id, app_id, assets_id, node_id="not-a-uuid") - - # --- Storage key format tests (must match existing paths exactly) ---