refactor(tool-access): rename ToolKey to ToolDescription and update ToolAccessPolicy to use mappings for allowed tools and credentials

This commit is contained in:
Harry 2026-01-27 15:14:12 +08:00
parent b889ab8853
commit 504280995d
2 changed files with 39 additions and 26 deletions

View File

@ -9,7 +9,7 @@ from .skill_metadata import (
ToolFieldConfig,
ToolReference,
)
from .tool_access_policy import ToolAccessPolicy, ToolInvocationRequest, ToolKey
from .tool_access_policy import ToolAccessPolicy, ToolDescription, ToolInvocationRequest
from .tool_dependencies import ToolDependencies, ToolDependency
__all__ = [
@ -24,8 +24,8 @@ __all__ = [
"ToolConfiguration",
"ToolDependencies",
"ToolDependency",
"ToolDescription",
"ToolFieldConfig",
"ToolInvocationRequest",
"ToolKey",
"ToolReference",
]

View File

@ -1,10 +1,12 @@
from collections.abc import Mapping
from pydantic import BaseModel, ConfigDict, Field
from core.skill.entities.tool_dependencies import ToolDependencies
from core.tools.entities.tool_entities import ToolProviderType
class ToolKey(BaseModel):
class ToolDescription(BaseModel):
"""Immutable identifier for a tool (type + provider + name)."""
model_config = ConfigDict(frozen=True)
@ -13,6 +15,9 @@ class ToolKey(BaseModel):
provider: str
tool_name: str
def tool_id(self) -> str:
return f"{self.tool_type.value}:{self.provider}:{self.tool_name}"
class ToolInvocationRequest(BaseModel):
"""A request to invoke a specific tool with optional credential."""
@ -25,8 +30,8 @@ class ToolInvocationRequest(BaseModel):
credential_id: str | None = None
@property
def key(self) -> ToolKey:
return ToolKey(tool_type=self.tool_type, provider=self.provider, tool_name=self.tool_name)
def tool_description(self) -> ToolDescription:
return ToolDescription(tool_type=self.tool_type, provider=self.provider, tool_name=self.tool_name)
class ToolAccessPolicy(BaseModel):
@ -41,30 +46,37 @@ class ToolAccessPolicy(BaseModel):
model_config = ConfigDict(frozen=True)
allowed_tools: frozenset[ToolKey] = Field(default_factory=frozenset)
credential_ids_by_tool: dict[ToolKey, frozenset[str | None]] = Field(default_factory=dict)
allowed_tools: Mapping[str, ToolDescription] = Field(default_factory=dict)
credentials_by_tool: Mapping[str, set[str]] = Field(default_factory=dict)
@classmethod
def from_dependencies(cls, deps: ToolDependencies | None) -> "ToolAccessPolicy":
"""Create a ToolAccessPolicy from ToolDependencies."""
if deps is None or deps.is_empty():
return cls()
def to_key(t: ToolProviderType, p: str, n: str) -> ToolKey:
return ToolKey(tool_type=t, provider=p, tool_name=n)
allowed_tools: dict[str, ToolDescription] = {}
credentials_by_tool: dict[str, set[str]] = {}
tools: set[ToolKey] = set()
tools.update(to_key(dep.type, dep.provider, dep.tool_name) for dep in deps.dependencies)
tools.update(to_key(ref.type, ref.provider, ref.tool_name) for ref in deps.references)
# Process dependencies - tools that can be used without specific credentials
for dep in deps.dependencies:
tool_desc = ToolDescription(tool_type=dep.type, provider=dep.provider, tool_name=dep.tool_name)
tool_id = tool_desc.tool_id()
allowed_tools[tool_id] = tool_desc
creds: dict[ToolKey, set[str | None]] = {}
# Process references - tools that may require specific credentials
for ref in deps.references:
key = to_key(ref.type, ref.provider, ref.tool_name)
creds.setdefault(key, set()).add(ref.credential_id)
tool_desc = ToolDescription(tool_type=ref.type, provider=ref.provider, tool_name=ref.tool_name)
tool_id = tool_desc.tool_id()
allowed_tools[tool_id] = tool_desc
return cls(
allowed_tools=frozenset(tools),
credential_ids_by_tool={k: frozenset(v) for k, v in creds.items()},
)
# If reference has a credential_id, add it to the allowed credentials for this tool
if ref.credential_id is not None:
if tool_id not in credentials_by_tool:
credentials_by_tool[tool_id] = set()
credentials_by_tool[tool_id].add(ref.credential_id)
return cls(allowed_tools=allowed_tools, credentials_by_tool=credentials_by_tool)
def is_empty(self) -> bool:
return len(self.allowed_tools) == 0
@ -76,12 +88,13 @@ class ToolAccessPolicy(BaseModel):
if self.is_empty():
return True
if request.key not in self.allowed_tools:
tool_id = request.tool_description.tool_id()
if tool_id not in self.allowed_tools:
return False
allowed_credentials = self.credential_ids_by_tool.get(request.key)
if not allowed_credentials:
# No references for this tool: only allow invocation without credential.
return request.credential_id is None
return request.credential_id in allowed_credentials
# No special credential required, use default credentials only
if request.credential_id is None or request.credential_id == "":
return self.credentials_by_tool.get(tool_id) is None
# Special credential required, check if it is allowed
else:
return request.credential_id in self.credentials_by_tool.get(tool_id, set())