mirror of
https://github.com/langgenius/dify.git
synced 2026-05-13 08:57:28 +08:00
Port the complete infrastructure for agent sandbox execution and skill system: Sandbox & Virtual Environment (core/sandbox/, core/virtual_environment/): - Sandbox entity with lifecycle management (ready/failed/cancelled states) - SandboxBuilder with fluent API for configuring providers - 5 VM providers: Local, SSH, Docker, E2B, AWS CodeInterpreter - VirtualEnvironment base with command execution, file transfer, transport layers - Channel transport: pipe, queue, socket implementations - Bash session management and DifyCli binary integration - Storage: archive storage, file storage, noop storage, presign storage - Initializers: DifyCli, AppAssets, DraftAppAssets, Skills - Inspector: file browser, archive/runtime source, script utils - Security: encryption utils, debug helpers Skill & App Assets (core/skill/, core/app_assets/, core/app_bundle/): - Skill entity and manager - App asset accessor, builder pipeline (file, skill builders) - App bundle source zip extractor - Storage and converter utilities API Endpoints: - CLI API blueprint (controllers/cli_api/) for sandbox callback - Sandbox provider management (workspace/sandbox_providers) - Sandbox file browser (console/sandbox_files) - App asset management (console/app/app_asset) - Skill management (console/app/skills) - Storage file endpoints (controllers/files/storage_files) Services: - Sandbox service, provider service, file service - App asset service, app bundle service Config: - CliApiConfig, CreatorsPlatformConfig, CollaborationConfig - FILES_API_URL for sandbox file access Note: Controller route registration temporarily commented out (marked TODO) pending resolution of deep dependency chains (socketio, workflow_comment, command node, etc.). Core sandbox modules are fully ported and syntax-validated. 110 files changed, 10,549 insertions. Made-with: Cursor
181 lines
6.3 KiB
Python
181 lines
6.3 KiB
Python
"""Unified content accessor for app asset nodes.
|
|
|
|
Accessor is scoped to a single app (tenant_id + app_id), not a single node.
|
|
Methods take an AppAssetNode (or a list of nodes) to identify the target; resolve_items() takes AssetItem objects instead.
|
|
|
|
CachedContentAccessor is the primary entry point:
|
|
- Reads DB first, misses fall through to S3 with sync backfill.
|
|
- Writes go to both DB and S3 (dual-write).
|
|
- resolve_items() batch-enriches AssetItem lists with DB-cached content
|
|
(extension-agnostic), so callers never need to filter by extension.
|
|
- Wraps an internal _StorageAccessor for S3 I/O.
|
|
|
|
Collaborators:
|
|
- services.asset_content_service.AssetContentService (DB layer)
|
|
- core.app_assets.storage.AssetPaths (S3 key generation)
|
|
- extensions.storage.cached_presign_storage.CachedPresignStorage (S3 I/O)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
|
|
from core.app.entities.app_asset_entities import AppAssetNode
|
|
from core.app_assets.entities.assets import AssetItem
|
|
from core.app_assets.storage import AssetPaths
|
|
from extensions.storage.cached_presign_storage import CachedPresignStorage
|
|
from services.asset_content_service import AssetContentService
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# S3-only implementation (internal, used as inner delegate)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class _StorageAccessor:
    """Draft-content accessor backed solely by object storage (S3).

    Each node is addressed by the key that ``AssetPaths.draft`` builds from
    the tenant id, the app id and the node id.
    """

    _storage: CachedPresignStorage
    _tenant_id: str
    _app_id: str

    def __init__(self, storage: CachedPresignStorage, tenant_id: str, app_id: str) -> None:
        """Remember the storage backend and the (tenant, app) scope."""
        self._storage = storage
        self._tenant_id, self._app_id = tenant_id, app_id

    def _key(self, node: AppAssetNode) -> str:
        """Return the draft storage key for *node*."""
        tenant_id, app_id = self._tenant_id, self._app_id
        return AssetPaths.draft(tenant_id, app_id, node.id)

    def load(self, node: AppAssetNode) -> bytes:
        """Return the bytes stored under the draft key of *node*."""
        storage_key = self._key(node)
        return self._storage.load_once(storage_key)

    def save(self, node: AppAssetNode, content: bytes) -> None:
        """Store *content* under the draft key of *node*."""
        storage_key = self._key(node)
        self._storage.save(storage_key, content)

    def delete(self, node: AppAssetNode) -> None:
        """Delete the draft object of *node* on a best-effort basis.

        An exception raised while deleting is caught and logged as a warning
        (with traceback) instead of being propagated to the caller.
        """
        try:
            storage_key = self._key(node)
            self._storage.delete(storage_key)
        except Exception:
            logger.warning("Failed to delete storage key %s", self._key(node), exc_info=True)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DB-cached implementation (the public API)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class CachedContentAccessor:
    """App-level content accessor with a DB read-through cache over S3.

    Read path:   DB first -> miss -> S3 fallback -> sync backfill DB
    Write path:  DB upsert + S3 save (dual-write)
    Delete path: DB delete + S3 delete

    bulk_load uses a single SQL query for all nodes, with S3 fallback per miss.

    The DB layer is handed content as ``str`` (UTF-8 decoded) and its results
    are re-encoded as UTF-8 on the way out. Content fetched from S3 that is
    not valid UTF-8 is therefore returned to the caller but not backfilled.

    Usage:
        accessor = CachedContentAccessor(storage, tenant_id, app_id)
        content = accessor.load(node)
        accessor.save(node, content)
        results = accessor.bulk_load(nodes)
    """

    _inner: _StorageAccessor
    _tenant_id: str
    _app_id: str

    def __init__(self, storage: CachedPresignStorage, tenant_id: str, app_id: str) -> None:
        """Wrap *storage* in an S3-only delegate scoped to (tenant_id, app_id)."""
        self._inner = _StorageAccessor(storage, tenant_id, app_id)
        self._tenant_id = tenant_id
        self._app_id = app_id

    def _load_from_storage(self, node: AppAssetNode) -> bytes:
        """Read *node* from S3 and synchronously backfill the DB cache.

        Shared by ``load`` and ``bulk_load`` so both handle a cache miss the
        same way. The backfill is skipped when the bytes are not valid UTF-8,
        because the read itself succeeded and must not fail on a cache write.
        """
        data = self._inner.load(node)
        try:
            text = data.decode("utf-8")
        except UnicodeDecodeError:
            logger.debug("Skipping DB backfill for asset node %s: content is not valid UTF-8", node.id)
            return data

        AssetContentService.upsert(
            tenant_id=self._tenant_id,
            app_id=self._app_id,
            node_id=node.id,
            content=text,
            size=len(data),
        )
        return data

    def load(self, node: AppAssetNode) -> bytes:
        """Return the content of *node*, preferring the DB cache over S3."""
        # 1. Try DB
        cached = AssetContentService.get(self._tenant_id, self._app_id, node.id)
        if cached is not None:
            return cached.encode("utf-8")

        # 2. Fallback to S3, 3. sync backfill DB
        return self._load_from_storage(node)

    def bulk_load(self, nodes: list[AppAssetNode]) -> dict[str, bytes]:
        """Single SQL for all nodes, S3 fallback + backfill per miss.

        Returns a mapping of node id to content bytes. An empty *nodes* list
        yields an empty dict without querying the DB.
        """
        result: dict[str, bytes] = {}
        if not nodes:
            return result

        node_ids = [n.id for n in nodes]
        cached = AssetContentService.get_many(self._tenant_id, self._app_id, node_ids)

        for node in nodes:
            if node.id in result:
                # Same node listed twice: avoid a second S3 read and upsert.
                continue
            if node.id in cached:
                result[node.id] = cached[node.id].encode("utf-8")
            else:
                result[node.id] = self._load_from_storage(node)
        return result

    def save(self, node: AppAssetNode, content: bytes) -> None:
        """Dual-write *content* for *node*: DB upsert first, then S3 save.

        Raises:
            UnicodeDecodeError: if *content* is not valid UTF-8. The decode
                happens before either write is attempted.
        """
        AssetContentService.upsert(
            tenant_id=self._tenant_id,
            app_id=self._app_id,
            node_id=node.id,
            content=content.decode("utf-8"),
            size=len(content),
        )
        self._inner.save(node, content)

    def resolve_items(self, items: list[AssetItem]) -> list[AssetItem]:
        """Batch-enrich asset items with DB-cached content.

        Queries by ``asset_id`` only — extension-agnostic. Items without
        a DB cache row keep their original *content* value (typically
        ``None``), so only genuinely cached assets (e.g. ``.md`` skill
        documents) get populated.

        This eliminates the need for callers to filter by file extension
        before deciding whether to read from the DB cache.

        The input list is returned as-is when it is empty or when nothing is
        cached; otherwise a new list of new ``AssetItem`` objects is returned.
        """
        if not items:
            return items

        node_ids = [a.asset_id for a in items]
        cached = AssetContentService.get_many(self._tenant_id, self._app_id, node_ids)

        if not cached:
            return items

        return [
            AssetItem(
                asset_id=a.asset_id,
                path=a.path,
                file_name=a.file_name,
                extension=a.extension,
                storage_key=a.storage_key,
                content=cached[a.asset_id].encode("utf-8") if a.asset_id in cached else a.content,
            )
            for a in items
        ]

    def delete(self, node: AppAssetNode) -> None:
        """Remove *node* content from the DB cache, then from S3."""
        AssetContentService.delete(self._tenant_id, self._app_id, node.id)
        self._inner.delete(node)
|