diff --git a/api/core/app_assets/builder/skill_builder.py b/api/core/app_assets/builder/skill_builder.py
index 3f3a76a373..c710eb0942 100644
--- a/api/core/app_assets/builder/skill_builder.py
+++ b/api/core/app_assets/builder/skill_builder.py
@@ -60,7 +60,7 @@ class SkillBuilder:
 
         # 2. Compile all skills (CPU-bound, single thread)
         documents = [SkillDocument(skill_id=s.node.id, content=s.content, metadata=s.metadata) for s in loaded]
-        artifact_set = SkillCompiler().compile_all(documents, tree, ctx.build_id)
+        artifact_set = SkillCompiler().compile_bundle(documents, tree, ctx.build_id)
 
         SkillManager.save_bundle(ctx.tenant_id, ctx.app_id, ctx.build_id, artifact_set)
 
diff --git a/api/core/skill/entities/skill_bundle.py b/api/core/skill/entities/skill_bundle.py
index 19b58650d9..4bca812e20 100644
--- a/api/core/skill/entities/skill_bundle.py
+++ b/api/core/skill/entities/skill_bundle.py
@@ -3,26 +3,30 @@ from datetime import datetime
 
 from pydantic import BaseModel, ConfigDict, Field
 
+from core.skill.entities.asset_references import AssetReferences
 from core.skill.entities.skill_bundle_entry import SkillBundleEntry
 from core.skill.entities.skill_metadata import ToolReference
 from core.skill.entities.tool_dependencies import ToolDependencies, ToolDependency
+from core.skill.graph_utils import collect_reachable, invert_dependency_map
 
 
 class SkillBundle(BaseModel):
+    """Persisted skill compilation snapshot with graph metadata and merge support."""
+
     model_config = ConfigDict(extra="forbid")
 
     assets_id: str = Field(description="Assets ID this bundle belongs to")
-    schema_version: int = Field(default=1, description="Schema version for forward compatibility")
+    schema_version: int = Field(default=2, description="Schema version for forward compatibility")
     built_at: datetime | None = Field(default=None, description="Build timestamp")
     entries: dict[str, SkillBundleEntry] = Field(default_factory=dict, description="skill_id -> SkillBundleEntry")
-    dependency_graph: dict[str, list[str]] = Field(
+    depends_on_map: dict[str, list[str]] = Field(
         default_factory=dict,
         description="skill_id -> list of skill_ids it depends on",
     )
-    reverse_graph: dict[str, list[str]] = Field(
+    reference_map: dict[str, list[str]] = Field(
         default_factory=dict,
         description="skill_id -> list of skill_ids that depend on it",
     )
@@ -35,28 +39,66 @@ class SkillBundle(BaseModel):
 
     def remove(self, skill_id: str) -> None:
         self.entries.pop(skill_id, None)
-        self.dependency_graph.pop(skill_id, None)
-        self.reverse_graph.pop(skill_id, None)
-        for deps in self.reverse_graph.values():
+        self.depends_on_map.pop(skill_id, None)
+        self.reference_map.pop(skill_id, None)
+        for deps in self.reference_map.values():
             if skill_id in deps:
                 deps.remove(skill_id)
-        for deps in self.dependency_graph.values():
+        for deps in self.depends_on_map.values():
             if skill_id in deps:
                 deps.remove(skill_id)
 
     def referenced_skill_ids(self, skill_id: str) -> set[str]:
-        return set(self.dependency_graph.get(skill_id, []))
+        return set(self.depends_on_map.get(skill_id, []))
 
     def recompile_group_ids(self, skill_id: str) -> set[str]:
-        result: set[str] = {skill_id}
-        queue = [skill_id]
-        while queue:
-            current = queue.pop()
-            for dependent in self.reverse_graph.get(current, []):
-                if dependent not in result:
-                    result.add(dependent)
-                    queue.append(dependent)
-        return result
+        return collect_reachable([skill_id], self.reference_map)
+
+    def merge(self, patch: "SkillBundle") -> "SkillBundle":
+        """Return a new bundle with patch entries merged and affected closure recomputed."""
+        if self.assets_id != patch.assets_id:
+            raise ValueError("bundle assets_id mismatch")
+
+        changed_skill_ids = set(patch.entries.keys())
+        if not changed_skill_ids:
+            return self.model_copy(deep=True)
+
+        merged_entries = dict(self.entries)
+        merged_entries.update(patch.entries)
+
+        merged_depends_on_map: dict[str, list[str]] = {
+            skill_id: [dep for dep in deps if dep in merged_entries]
+            for skill_id, deps in self.depends_on_map.items()
+            if skill_id in merged_entries
+        }
+
+        for skill_id in changed_skill_ids:
+            deps = patch.depends_on_map.get(skill_id)
+            if deps is None:
+                entry = patch.entries[skill_id]
+                deps = [f.asset_id for f in entry.direct_files.references]
+            merged_depends_on_map[skill_id] = [dep for dep in _dedupe(deps) if dep in merged_entries]
+
+        for skill_id in merged_entries:
+            merged_depends_on_map.setdefault(skill_id, [])
+
+        reference_map = {
+            skill_id: sorted(referrers)
+            for skill_id, referrers in invert_dependency_map(merged_depends_on_map, merged_entries.keys()).items()
+        }
+
+        affected_skill_ids = collect_reachable(changed_skill_ids, reference_map)
+        recomputed_entries = _recompute_affected_entries(merged_entries, merged_depends_on_map, affected_skill_ids)
+        merged_entries.update(recomputed_entries)
+
+        return SkillBundle(
+            assets_id=self.assets_id,
+            schema_version=max(self.schema_version, patch.schema_version),
+            built_at=patch.built_at or self.built_at,
+            entries=merged_entries,
+            depends_on_map=dict(merged_depends_on_map),
+            reference_map=reference_map,
+        )
 
     def subset(self, skill_ids: Iterable[str]) -> "SkillBundle":
         skill_id_set = set(skill_ids)
@@ -65,14 +107,14 @@ class SkillBundle(BaseModel):
             schema_version=self.schema_version,
             built_at=self.built_at,
             entries={sid: self.entries[sid] for sid in skill_id_set if sid in self.entries},
-            dependency_graph={
+            depends_on_map={
                 sid: [dep for dep in deps if dep in skill_id_set]
-                for sid, deps in self.dependency_graph.items()
+                for sid, deps in self.depends_on_map.items()
                 if sid in skill_id_set
             },
-            reverse_graph={
+            reference_map={
                 sid: [dep for dep in deps if dep in skill_id_set]
-                for sid, deps in self.reverse_graph.items()
+                for sid, deps in self.reference_map.items()
                 if sid in skill_id_set
             },
         )
@@ -95,3 +137,60 @@ class SkillBundle(BaseModel):
             dependencies=list(dependencies.values()),
             references=list(references.values()),
         )
+
+
+def _dedupe(values: Iterable[str]) -> list[str]:
+    return list(dict.fromkeys(values))
+
+
+def _recompute_affected_entries(
+    entries: dict[str, SkillBundleEntry],
+    depends_on_map: dict[str, list[str]],
+    affected_skill_ids: set[str],
+) -> dict[str, SkillBundleEntry]:
+    recomputed_entries = {skill_id: entries[skill_id] for skill_id in affected_skill_ids if skill_id in entries}
+    changed = True
+    while changed:
+        changed = False
+        for skill_id in affected_skill_ids:
+            current_entry = recomputed_entries.get(skill_id)
+            if current_entry is None:
+                continue
+
+            merged_tool_deps: dict[str, ToolDependency] = {
+                dep.tool_id(): dep for dep in current_entry.direct_tools.dependencies
+            }
+            merged_tool_refs: dict[str, ToolReference] = {
+                ref.uuid: ref for ref in current_entry.direct_tools.references
+            }
+            merged_files = {f.asset_id: f for f in current_entry.direct_files.references}
+
+            for dep_id in depends_on_map.get(skill_id, []):
+                dep_entry = recomputed_entries.get(dep_id) or entries.get(dep_id)
+                if dep_entry is None:
+                    continue
+
+                for dep in dep_entry.tools.dependencies:
+                    merged_tool_deps.setdefault(dep.tool_id(), dep)
+
+                for ref in dep_entry.tools.references:
+                    merged_tool_refs.setdefault(ref.uuid, ref)
+
+                for file_ref in dep_entry.files.references:
+                    merged_files.setdefault(file_ref.asset_id, file_ref)
+
+            merged_tools = ToolDependencies(
+                dependencies=[merged_tool_deps[key] for key in sorted(merged_tool_deps.keys())],
+                references=[merged_tool_refs[key] for key in sorted(merged_tool_refs.keys())],
+            )
+            merged_asset_refs = AssetReferences(references=[merged_files[key] for key in sorted(merged_files.keys())])
+            if merged_tools != current_entry.tools or merged_asset_refs != current_entry.files:
+                recomputed_entries[skill_id] = current_entry.model_copy(
+                    update={
+                        "tools": merged_tools,
+                        "files": merged_asset_refs,
+                    }
+                )
+                changed = True
+
+    return recomputed_entries
diff --git a/api/core/skill/entities/skill_bundle_entry.py b/api/core/skill/entities/skill_bundle_entry.py
index 202c7e10ac..ae2bd5f83c 100644
--- a/api/core/skill/entities/skill_bundle_entry.py
+++ b/api/core/skill/entities/skill_bundle_entry.py
@@ -16,6 +16,8 @@ class SkillBundleEntry(BaseModel):
 
     skill_id: str = Field(description="Unique identifier for this skill")
     source: SourceInfo = Field(description="Source file information")
+    direct_tools: ToolDependencies = Field(description="Direct tool dependencies parsed from this skill only")
+    direct_files: AssetReferences = Field(description="Direct file references parsed from this skill only")
     tools: ToolDependencies = Field(description="All tool dependencies (transitive closure)")
     files: AssetReferences = Field(description="All file references (transitive closure)")
     content: str = Field(description="Resolved content with all references replaced")
diff --git a/api/core/skill/graph_utils.py b/api/core/skill/graph_utils.py
new file mode 100644
index 0000000000..dceeb1ebd2
--- /dev/null
+++ b/api/core/skill/graph_utils.py
@@ -0,0 +1,29 @@
+from __future__ import annotations
+
+from collections import deque
+from collections.abc import Iterable, Mapping
+
+
+def invert_dependency_map(depends_on_map: Mapping[str, Iterable[str]], all_nodes: Iterable[str]) -> dict[str, set[str]]:
+    """Build a reverse lookup map: target_id -> direct referrer ids."""
+    reference_map: dict[str, set[str]] = {node_id: set() for node_id in all_nodes}
+    for node_id, deps in depends_on_map.items():
+        for dep_id in deps:
+            if dep_id in reference_map:
+                reference_map[dep_id].add(node_id)
+    return reference_map
+
+
+def collect_reachable(start_nodes: Iterable[str], adjacency_map: Mapping[str, Iterable[str]]) -> set[str]:
+    """Return all nodes reachable from start nodes in adjacency map, inclusive."""
+    visited: set[str] = set()
+    queue = deque(start_nodes)
+    while queue:
+        node_id = queue.popleft()
+        if node_id in visited:
+            continue
+        visited.add(node_id)
+        for next_id in adjacency_map.get(node_id, []):
+            if next_id not in visited:
+                queue.append(next_id)
+    return visited
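Reviewer note (not part of the patch): a minimal sketch of how the two `graph_utils` helpers compose, using hypothetical skill ids. `invert_dependency_map` flips the direct-dependency edges, and `collect_reachable` walks the inverted edges to find every skill invalidated by a change — the same pair of calls `SkillBundle.merge` and `recompile_group_ids` now rely on.

```python
from core.skill.graph_utils import collect_reachable, invert_dependency_map

# Hypothetical direct dependencies: "a" embeds "b", and "b" embeds "c".
depends_on_map = {"a": ["b"], "b": ["c"], "c": []}

# target -> direct referrers: {"a": set(), "b": {"a"}, "c": {"b"}}
reference_map = invert_dependency_map(depends_on_map, depends_on_map.keys())

# Editing "c" invalidates "b" and "a" as well; the start node is included.
assert collect_reachable(["c"], reference_map) == {"a", "b", "c"}
```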
diff --git a/api/core/skill/skill_compiler.py b/api/core/skill/skill_compiler.py
index 96213cc1d9..dc27d0a424 100644
--- a/api/core/skill/skill_compiler.py
+++ b/api/core/skill/skill_compiler.py
@@ -1,6 +1,6 @@
 import hashlib
 import re
-from collections.abc import Iterable, Mapping, Sequence
+from collections.abc import Iterable, Mapping
 from dataclasses import dataclass
 from typing import Any, Protocol, cast
 
@@ -17,6 +17,7 @@ from core.skill.entities.skill_metadata import (
     create_tool_id,
 )
 from core.skill.entities.tool_dependencies import ToolDependencies, ToolDependency
+from core.skill.graph_utils import invert_dependency_map
 from core.tools.entities.tool_entities import ToolProviderType
 
 
@@ -71,6 +72,8 @@ class DefaultToolResolver:
 
 
 class SkillCompiler:
+    """Compile skill documents into full bundles or incremental patches."""
+
     def __init__(
         self,
         path_resolver: PathResolver | None = None,
@@ -81,14 +84,98 @@ class SkillCompiler:
         self._tool_resolver = tool_resolver or DefaultToolResolver()
         self._config = config or CompilerConfig()
 
+    def compile_bundle(
+        self,
+        documents: Iterable[SkillDocument],
+        file_tree: AppAssetFileTree,
+        assets_id: str,
+    ) -> SkillBundle:
+        """Compile all provided documents into a complete persisted bundle."""
+        path_resolver = self._path_resolver or FileTreePathResolver(file_tree)
+        doc_map = {doc.skill_id: doc for doc in documents}
+        entries, metadata_cache = self._compile_documents_direct(doc_map.values(), path_resolver)
+        depends_on_map = self._build_depends_on_map(metadata_cache, set(entries.keys()))
+
+        direct_bundle = SkillBundle(
+            assets_id=assets_id,
+            entries=entries,
+            depends_on_map=depends_on_map,
+            reference_map=self._build_reference_map(depends_on_map, set(entries.keys())),
+        )
+        return SkillBundle(assets_id=assets_id).merge(direct_bundle)
+
+    def compile_increment(
+        self,
+        base_bundle: SkillBundle,
+        documents: Iterable[SkillDocument],
+        file_tree: AppAssetFileTree,
+        base_path: str = "",
+    ) -> SkillBundle:
+        """Compile changed documents against base bundle and return a merge-ready patch."""
+        doc_map = {doc.skill_id: doc for doc in documents}
+        if not doc_map:
+            return SkillBundle(assets_id=base_bundle.assets_id)
+
+        path_resolver = self._path_resolver or FileTreePathResolver(file_tree, base_path)
+        entries, metadata_cache = self._compile_documents_direct(doc_map.values(), path_resolver)
+        known_skill_ids = set(base_bundle.entries.keys()) | set(entries.keys())
+        depends_on_map = self._build_depends_on_map(metadata_cache, known_skill_ids)
+
+        direct_patch = SkillBundle(
+            assets_id=base_bundle.assets_id,
+            entries=entries,
+            depends_on_map=depends_on_map,
+            reference_map=self._build_reference_map(depends_on_map, set(entries.keys())),
+        )
+        merged_bundle = base_bundle.merge(direct_patch)
+        compiled_entries = {
+            skill_id: merged_bundle.entries[skill_id] for skill_id in entries if skill_id in merged_bundle.entries
+        }
+
+        return SkillBundle(
+            assets_id=base_bundle.assets_id,
+            schema_version=merged_bundle.schema_version,
+            built_at=merged_bundle.built_at,
+            entries=compiled_entries,
+            depends_on_map=depends_on_map,
+            reference_map=self._build_reference_map(depends_on_map, set(compiled_entries.keys())),
+        )
+
+    def compile_document(
+        self,
+        bundle: SkillBundle,
+        document: SkillDocument,
+        file_tree: AppAssetFileTree,
+        base_path: str = "",
+    ) -> SkillBundleEntry:
+        """Compile one document with bundle context without mutating the bundle."""
+        patch = self.compile_increment(bundle, [document], file_tree, base_path)
+        entry = patch.get(document.skill_id)
+        if entry is not None:
+            return entry
+
+        path_resolver = self._path_resolver or FileTreePathResolver(file_tree, base_path)
+        metadata = self._parse_metadata(document.content, document.metadata)
+        return self._build_direct_entry(document, metadata, path_resolver)
+
+    def put(
+        self,
+        base_bundle: SkillBundle,
+        document: SkillDocument,
+        file_tree: AppAssetFileTree,
+        base_path: str = "",
+    ) -> SkillBundle:
+        """Compile one document and merge it into a newly returned bundle."""
+        patch = self.compile_increment(base_bundle, [document], file_tree, base_path)
+        return base_bundle.merge(patch)
+
     def compile_all(
         self,
         documents: Iterable[SkillDocument],
         file_tree: AppAssetFileTree,
         assets_id: str,
     ) -> SkillBundle:
-        path_resolver = self._path_resolver or FileTreePathResolver(file_tree)
-        return self._compile_batch_internal(documents, assets_id, path_resolver)
+        return self.compile_bundle(documents, file_tree, assets_id)
 
     def compile_one(
         self,
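Reviewer note (not part of the patch): the incremental rebuild flow as I read the new API — a sketch only. The method names and signatures come from this diff; the import paths for `SkillDocument` and `AppAssetFileTree` are assumptions, as is the `rebuild_changed` helper itself.

```python
from core.app_assets.entities import AppAssetFileTree  # assumed import path
from core.skill.entities.skill_bundle import SkillBundle
from core.skill.skill_compiler import SkillCompiler, SkillDocument  # SkillDocument location assumed


def rebuild_changed(
    base_bundle: SkillBundle,
    changed_documents: list[SkillDocument],
    file_tree: AppAssetFileTree,
) -> SkillBundle:
    """Recompile only the changed documents, then fold them into the base bundle."""
    compiler = SkillCompiler()
    # compile_increment parses and compiles just the changed docs into a patch;
    # merge() then recomputes the transitive tools/files of every skill that
    # directly or indirectly references a changed one.
    patch = compiler.compile_increment(base_bundle, changed_documents, file_tree)
    return base_bundle.merge(patch)
```

For a single document, `put(base_bundle, document, file_tree)` is the same increment-then-merge in one call.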
@@ -97,213 +184,76 @@
         file_tree: AppAssetFileTree,
         base_path: str = "",
     ) -> SkillBundleEntry:
-        path_resolver = self._path_resolver or FileTreePathResolver(file_tree, base_path)
-        resolved_content, tool_dependencies = self._compile_template_internal(
-            document.content, document.metadata, bundle, path_resolver
-        )
+        return self.compile_document(bundle, document, file_tree, base_path)
 
-        metadata = self._parse_metadata(document.content, document.metadata)
-        final_files: dict[str, FileReference] = {f.asset_id: f for f in metadata.files}
-
-        return SkillBundleEntry(
-            skill_id=document.skill_id,
-            source=SourceInfo(
-                asset_id=document.skill_id,
-                content_digest=hashlib.sha256(document.content.encode("utf-8")).hexdigest(),
-            ),
-            tools=tool_dependencies,
-            files=AssetReferences(references=list(final_files.values())),
-            content=resolved_content,
-        )
-
-    def _compile_batch_internal(
+    def _compile_documents_direct(
         self,
         documents: Iterable[SkillDocument],
-        assets_id: str,
         path_resolver: PathResolver,
-    ) -> SkillBundle:
-        doc_map = {doc.skill_id: doc for doc in documents}
-        graph: dict[str, set[str]] = {}
+    ) -> tuple[dict[str, SkillBundleEntry], dict[str, SkillMetadata]]:
+        entries: dict[str, SkillBundleEntry] = {}
         metadata_cache: dict[str, SkillMetadata] = {}
-
-        # Phase 1: Parse metadata and build dependency graph
-        for doc in doc_map.values():
+        for doc in documents:
             metadata = self._parse_metadata(doc.content, doc.metadata)
             metadata_cache[doc.skill_id] = metadata
+            entries[doc.skill_id] = self._build_direct_entry(doc, metadata, path_resolver)
+        return entries, metadata_cache
 
-            deps: set[str] = set()
+    def _build_depends_on_map(
+        self,
+        metadata_cache: Mapping[str, SkillMetadata],
+        known_skill_ids: set[str],
+    ) -> dict[str, list[str]]:
+        depends_on_map: dict[str, list[str]] = {}
+        for skill_id, metadata in metadata_cache.items():
+            deps: list[str] = []
+            seen: set[str] = set()
             for file_ref in metadata.files:
-                if file_ref.asset_id in doc_map:
-                    deps.add(file_ref.asset_id)
-            graph[doc.skill_id] = deps
+                dep_id = file_ref.asset_id
+                if dep_id in known_skill_ids and dep_id not in seen:
+                    seen.add(dep_id)
+                    deps.append(dep_id)
+            depends_on_map[skill_id] = deps
+        return depends_on_map
 
-        bundle = SkillBundle(assets_id=assets_id)
-        bundle.dependency_graph = {k: list(v) for k, v in graph.items()}
+    def _build_reference_map(
+        self,
+        depends_on_map: Mapping[str, list[str]],
+        all_skill_ids: set[str],
+    ) -> dict[str, list[str]]:
+        return {
+            skill_id: sorted(referrers)
+            for skill_id, referrers in invert_dependency_map(depends_on_map, all_skill_ids).items()
+        }
 
-        # Build reverse graph for propagation
-        reverse_graph: dict[str, set[str]] = {skill_id: set() for skill_id in doc_map}
-        for skill_id, deps in graph.items():
-            for dep_id in deps:
-                if dep_id in reverse_graph:
-                    reverse_graph[dep_id].add(skill_id)
-        bundle.reverse_graph = {k: list(v) for k, v in reverse_graph.items()}
-
-        # Phase 2: Compile each skill independently (content + direct dependencies only)
-        for skill_id, doc in doc_map.items():
-            metadata = metadata_cache[skill_id]
-            entry = self._compile_node_direct(doc, metadata, path_resolver)
-            bundle.upsert(entry)
-
-        # Phase 3: Propagate transitive dependencies until fixed-point
-        self._propagate_transitive_dependencies(bundle, graph)
-
-        return bundle
-
-    def _compile_node_direct(
+    def _build_direct_entry(
         self,
         doc: SkillDocument,
         metadata: SkillMetadata,
         path_resolver: PathResolver,
     ) -> SkillBundleEntry:
-        """Compile a single skill with only its direct dependencies (no transitive)."""
-        direct_tools: dict[str, ToolDependency] = {}
-        direct_refs: dict[str, ToolReference] = {}
-
+        direct_tool_deps: dict[str, ToolDependency] = {}
+        direct_tool_refs: dict[str, ToolReference] = {}
         for tool_ref in metadata.tools.values():
-            key = tool_ref.tool_id()
-            if key not in direct_tools:
-                direct_tools[key] = ToolDependency(
+            direct_tool_deps.setdefault(
+                tool_ref.tool_id(),
+                ToolDependency(
                     type=tool_ref.type,
                     provider=tool_ref.provider,
                     tool_name=tool_ref.tool_name,
-                )
-            direct_refs[tool_ref.uuid] = tool_ref
+                    enabled=tool_ref.enabled,
+                ),
+            )
+            direct_tool_refs[tool_ref.uuid] = tool_ref
 
         direct_files: dict[str, FileReference] = {f.asset_id: f for f in metadata.files}
         resolved_content = self._resolve_content(doc.content, metadata, path_resolver, doc.skill_id)
 
-        return SkillBundleEntry(
-            skill_id=doc.skill_id,
-            source=SourceInfo(
-                asset_id=doc.skill_id,
-                content_digest=hashlib.sha256(doc.content.encode("utf-8")).hexdigest(),
-            ),
-            tools=ToolDependencies(
-                dependencies=list(direct_tools.values()),
-                references=list(direct_refs.values()),
-            ),
-            files=AssetReferences(
-                references=list(direct_files.values()),
-            ),
-            content=resolved_content,
+        direct_tools = ToolDependencies(
+            dependencies=list(direct_tool_deps.values()),
+            references=list(direct_tool_refs.values()),
         )
+        direct_asset_refs = AssetReferences(references=list(direct_files.values()))
+        # direct_* hold only this skill's own references; tools/files start out
+        # identical and are widened to the transitive closure by merge().
+        return SkillBundleEntry(
+            skill_id=doc.skill_id,
+            source=SourceInfo(
+                asset_id=doc.skill_id,
+                content_digest=hashlib.sha256(doc.content.encode("utf-8")).hexdigest(),
+            ),
+            direct_tools=direct_tools,
+            direct_files=direct_asset_refs,
+            tools=direct_tools,
+            files=direct_asset_refs,
+            content=resolved_content,
+        )
 
-    def _propagate_transitive_dependencies(
-        self,
-        bundle: SkillBundle,
-        graph: dict[str, set[str]],
-    ) -> None:
-        """Iteratively propagate transitive dependencies until no changes occur."""
-        changed = True
-        while changed:
-            changed = False
-            for skill_id, dep_ids in graph.items():
-                entry = bundle.get(skill_id)
-                if not entry:
-                    continue
-
-                # Collect current tools and files
-                current_tools: dict[str, ToolDependency] = {d.tool_id(): d for d in entry.tools.dependencies}
-                current_refs: dict[str, ToolReference] = {r.uuid: r for r in entry.tools.references}
-                current_files: dict[str, FileReference] = {f.asset_id: f for f in entry.files.references}
-
-                original_tool_count = len(current_tools)
-                original_ref_count = len(current_refs)
-                original_file_count = len(current_files)
-
-                # Merge from dependencies
-                for dep_id in dep_ids:
-                    dep_entry = bundle.get(dep_id)
-                    if not dep_entry:
-                        continue
-
-                    for tool_dep in dep_entry.tools.dependencies:
-                        key = tool_dep.tool_id()
-                        if key not in current_tools:
-                            current_tools[key] = tool_dep
-
-                    for tool_ref in dep_entry.tools.references:
-                        if tool_ref.uuid not in current_refs:
-                            current_refs[tool_ref.uuid] = tool_ref
-
-                    for file_ref in dep_entry.files.references:
-                        if file_ref.asset_id not in current_files:
-                            current_files[file_ref.asset_id] = file_ref
-
-                # Check if anything changed
-                if (
-                    len(current_tools) != original_tool_count
-                    or len(current_refs) != original_ref_count
-                    or len(current_files) != original_file_count
-                ):
-                    changed = True
-                    # Update the entry with new transitive dependencies
-                    updated_entry = SkillBundleEntry(
-                        skill_id=entry.skill_id,
-                        source=entry.source,
-                        tools=ToolDependencies(
-                            dependencies=list(current_tools.values()),
-                            references=list(current_refs.values()),
-                        ),
-                        files=AssetReferences(
-                            references=list(current_files.values()),
-                        ),
-                        content=entry.content,
-                    )
-                    bundle.upsert(updated_entry)
-
-    def _compile_template_internal(
-        self,
-        content: str,
-        metadata_dict: Mapping[str, Any],
-        context: SkillBundle,
-        path_resolver: PathResolver,
-    ) -> tuple[str, ToolDependencies]:
-        metadata = self._parse_metadata(content, metadata_dict)
-
-        direct_deps: list[SkillBundleEntry] = []
-        for file_ref in metadata.files:
-            artifact = context.get(file_ref.asset_id)
-            if artifact:
-                direct_deps.append(artifact)
-
-        final_tools, final_refs = self._aggregate_dependencies(metadata, direct_deps)
-
-        resolved_content = self._resolve_content(content, metadata, path_resolver, current_id="