From b470cca533d7e42cf3b8d7cec5969c8ad43e123a Mon Sep 17 00:00:00 2001
From: Harry
Date: Thu, 22 Jan 2026 13:41:15 +0800
Subject: [PATCH] feat(skill-builder): enhance skill loading and compilation with parallel processing

- Introduced threading for loading skills and uploading compiled content to improve performance.
- Added data classes for better structure and clarity in handling loaded and compiled skills.
- Refactored the skill compilation process to separate loading and uploading, enhancing maintainability.
---
Review notes (not part of the commit message; text between the '---' line
and the first 'diff --git' line is ignored when the patch is applied):

- build() now runs as: load -> compile -> save tool artifact -> prepare
  uploads -> upload -> return FileAssets. Only the load and upload steps go
  through ThreadPoolExecutor(max_workers=self._max_workers); compilation
  stays on the calling thread.
- _load_all() collects futures in submission order, so `loaded` keeps the
  order of self._nodes.
- load_one() keeps the pre-existing `except Exception` fallback: a failure
  while loading or parsing a draft yields content "" and metadata {} with
  no log line, so that skill is compiled as an empty document.
- _upload_all() calls f.result() on every future, so an exception raised by
  storage.save() propagates out of build(); the executor's `with` block
  waits for the remaining uploads before it does.
- NOTE(review): calling storage.load_once()/storage.save() from worker
  threads assumes the storage extension is thread-safe -- confirm.

 api/core/app_assets/builder/skill_builder.py | 128 ++++++++++++-------
 1 file changed, 82 insertions(+), 46 deletions(-)

diff --git a/api/core/app_assets/builder/skill_builder.py b/api/core/app_assets/builder/skill_builder.py
index 4f395517cf..9d46c71b72 100644
--- a/api/core/app_assets/builder/skill_builder.py
+++ b/api/core/app_assets/builder/skill_builder.py
@@ -1,4 +1,6 @@
 import json
+from concurrent.futures import ThreadPoolExecutor
+from dataclasses import dataclass
 
 from core.app.entities.app_asset_entities import AppAssetFileTree, AppAssetNode
 from core.app_assets.entities import AssetItem, FileAsset
@@ -11,11 +13,29 @@ from extensions.ext_storage import storage
 from .base import BuildContext
 
 
+@dataclass
+class _LoadedSkill:
+    node: AppAssetNode
+    path: str
+    content: str
+    metadata: dict
+
+
+@dataclass
+class _CompiledSkill:
+    node: AppAssetNode
+    path: str
+    resolved_key: str
+    content_bytes: bytes
+
+
 class SkillBuilder:
     _nodes: list[tuple[AppAssetNode, str]]
+    _max_workers: int
 
-    def __init__(self) -> None:
+    def __init__(self, max_workers: int = 8) -> None:
         self._nodes = []
+        self._max_workers = max_workers
 
     def accept(self, node: AppAssetNode) -> bool:
         return node.extension == "md"
@@ -27,9 +47,56 @@ class SkillBuilder:
         if not self._nodes:
             return []
 
-        # 1. Load and create documents
-        documents: list[SkillDocument] = []
-        for node, _ in self._nodes:
+        # 1. Load all skills (parallel IO)
+        loaded = self._load_all(ctx)
+
+        # 2. Compile all skills (CPU-bound, single thread)
+        documents = [
+            SkillDocument(skill_id=s.node.id, content=s.content, metadata=s.metadata)
+            for s in loaded
+        ]
+        artifact_set = SkillCompiler().compile_all(documents, tree, ctx.build_id)
+
+        # 3. Save tool artifact
+        SkillManager.save_tool_artifact(
+            ctx.tenant_id, ctx.app_id, ctx.build_id, artifact_set.get_tool_artifact()
+        )
+
+        # 4. Prepare compiled skills for upload
+        to_upload: list[_CompiledSkill] = []
+        for skill in loaded:
+            artifact = artifact_set.get(skill.node.id)
+            if artifact is None:
+                continue
+            resolved_key = AssetPaths.build_resolved_file(
+                ctx.tenant_id, ctx.app_id, ctx.build_id, skill.node.id
+            )
+            to_upload.append(
+                _CompiledSkill(
+                    node=skill.node,
+                    path=skill.path,
+                    resolved_key=resolved_key,
+                    content_bytes=artifact.content.encode("utf-8"),
+                )
+            )
+
+        # 5. Upload all compiled skills (parallel IO)
+        self._upload_all(to_upload)
+
+        # 6. Return FileAssets
+        return [
+            FileAsset(
+                asset_id=s.node.id,
+                path=s.path,
+                file_name=s.node.name,
+                extension=s.node.extension or "",
+                storage_key=s.resolved_key,
+            )
+            for s in to_upload
+        ]
+
+    def _load_all(self, ctx: BuildContext) -> list[_LoadedSkill]:
+        def load_one(node: AppAssetNode, path: str) -> _LoadedSkill:
             draft_key = AssetPaths.draft_file(ctx.tenant_id, ctx.app_id, node.id)
             try:
                 data = json.loads(storage.load_once(draft_key))
@@ -38,48 +105,17 @@ class SkillBuilder:
             except Exception:
                 content = ""
                 metadata = {}
+            return _LoadedSkill(node=node, path=path, content=content, metadata=metadata)
 
-            documents.append(
-                SkillDocument(
-                    skill_id=node.id,
-                    content=content,
-                    metadata=metadata,
-                )
-            )
+        with ThreadPoolExecutor(max_workers=self._max_workers) as executor:
+            futures = [executor.submit(load_one, node, path) for node, path in self._nodes]
+            return [f.result() for f in futures]
 
-        # 2. Compile all skills
-        compiler = SkillCompiler()
-        artifact_set = compiler.compile_all(documents, tree, ctx.build_id)
+    def _upload_all(self, skills: list[_CompiledSkill]) -> None:
+        def upload_one(skill: _CompiledSkill) -> None:
+            storage.save(skill.resolved_key, skill.content_bytes)
 
-        # 3. Save tool artifact
-        SkillManager.save_tool_artifact(
-            ctx.tenant_id,
-            ctx.app_id,
-            ctx.build_id,
-            artifact_set.get_tool_artifact(),
-        )
-
-        # 4. Save compiled content to storage and return FileAssets
-        results: list[AssetItem] = []
-        for node, path in self._nodes:
-            artifact = artifact_set.get(node.id)
-            if artifact is None:
-                continue
-
-            # Write compiled content to storage
-            resolved_key = AssetPaths.build_resolved_file(
-                ctx.tenant_id, ctx.app_id, ctx.build_id, node.id
-            )
-            storage.save(resolved_key, artifact.content.encode("utf-8"))
-
-            results.append(
-                FileAsset(
-                    asset_id=node.id,
-                    path=path,
-                    file_name=node.name,
-                    extension=node.extension or "",
-                    storage_key=resolved_key,
-                )
-            )
-
-        return results
+        with ThreadPoolExecutor(max_workers=self._max_workers) as executor:
+            futures = [executor.submit(upload_one, skill) for skill in skills]
+            for f in futures:
+                f.result()