mirror of
https://github.com/langgenius/dify.git
synced 2026-06-08 00:41:55 +08:00
Co-authored-by: Claude Opus 4.8 (1M context) <noreply@anthropic.com> Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
173 lines
7.6 KiB
Python
173 lines
7.6 KiB
Python
import re
|
|
from typing import Any
|
|
|
|
from pydantic import ValidationError
|
|
|
|
from services.agent.errors import AgentSoulLockedError, InvalidComposerConfigError, PlaintextSecretNotAllowedError
|
|
from services.entities.agent_entities import (
|
|
AgentSoulConfig,
|
|
ComposerSavePayload,
|
|
ComposerVariant,
|
|
WorkflowNodeJobConfig,
|
|
)
|
|
|
|
_PLAINTEXT_SECRET_KEYS = {
|
|
"api_key",
|
|
"apikey",
|
|
"authorization",
|
|
"password",
|
|
"secret",
|
|
"secret_key",
|
|
}
|
|
|
|
# Env/secret names become shell ``export`` identifiers in the sandbox bootstrap, so
|
|
# they must be valid shell identifiers. Validating here fails fast at composer save
|
|
# with a friendly error instead of at run time in the agent backend shell layer.
|
|
_SHELL_ENV_NAME_PATTERN = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
|
|
_CLI_TOOL_NAME_KEYS = ("name", "tool_name", "label")
|
|
_CLI_TOOL_COMMAND_KEYS = ("command", "install_command", "install", "setup_command")
|
|
_DENIED_PERMISSION_STATUSES = frozenset({"unauthorized", "denied", "forbidden", "invalid", "unavailable"})
|
|
_DANGEROUS_FLAG_KEYS = ("dangerous", "dangerous_command", "requires_confirmation")
|
|
_DANGEROUS_ACK_KEYS = (
|
|
"dangerous_acknowledged",
|
|
"dangerous_accepted",
|
|
"risk_accepted",
|
|
"approved",
|
|
)
|
|
|
|
|
|
class ComposerConfigValidator:
|
|
@classmethod
|
|
def validate_save_payload(cls, payload: ComposerSavePayload) -> None:
|
|
if payload.variant == ComposerVariant.WORKFLOW and payload.soul_lock.locked and payload.agent_soul is not None:
|
|
raise AgentSoulLockedError()
|
|
|
|
if payload.agent_soul is not None:
|
|
cls.validate_agent_soul(payload.agent_soul)
|
|
if payload.node_job is not None:
|
|
cls.validate_node_job(payload.node_job)
|
|
|
|
@classmethod
|
|
def validate_agent_soul(cls, agent_soul: AgentSoulConfig) -> None:
|
|
dumped = agent_soul.model_dump(mode="json")
|
|
cls._reject_plaintext_secrets(dumped, path="agent_soul")
|
|
cls._validate_shell_config(dumped)
|
|
|
|
@classmethod
|
|
def validate_node_job(cls, node_job: WorkflowNodeJobConfig) -> None:
|
|
cls._reject_plaintext_secrets(node_job.model_dump(mode="json"), path="node_job")
|
|
|
|
@classmethod
|
|
def validate_agent_soul_dict(cls, value: dict[str, Any]) -> AgentSoulConfig:
|
|
try:
|
|
config = AgentSoulConfig.model_validate(value)
|
|
except ValidationError as exc:
|
|
raise InvalidComposerConfigError(str(exc)) from exc
|
|
cls.validate_agent_soul(config)
|
|
return config
|
|
|
|
@classmethod
|
|
def validate_node_job_dict(cls, value: dict[str, Any]) -> WorkflowNodeJobConfig:
|
|
try:
|
|
config = WorkflowNodeJobConfig.model_validate(value)
|
|
except ValidationError as exc:
|
|
raise InvalidComposerConfigError(str(exc)) from exc
|
|
cls.validate_node_job(config)
|
|
return config
|
|
|
|
@classmethod
|
|
def _validate_shell_config(cls, soul: dict[str, Any]) -> None:
|
|
"""Fail fast on shell env/secret/CLI config the sandbox would otherwise reject at run time."""
|
|
env = soul.get("env") or {}
|
|
seen_env_names: set[str] = set()
|
|
for section in ("variables", "secret_refs"):
|
|
entries = env.get(section)
|
|
if not isinstance(entries, list):
|
|
continue
|
|
for entry in entries:
|
|
if not isinstance(entry, dict):
|
|
continue
|
|
raw_name = entry.get("name")
|
|
if not isinstance(raw_name, str) or not raw_name.strip():
|
|
# Unnamed draft rows are tolerated; only named entries are bound to the shell.
|
|
continue
|
|
name = raw_name.strip()
|
|
if not _SHELL_ENV_NAME_PATTERN.fullmatch(name):
|
|
raise InvalidComposerConfigError(
|
|
f"env/secret name '{name}' must be a valid shell identifier (^[A-Za-z_][A-Za-z0-9_]*$)."
|
|
)
|
|
if section == "secret_refs" and cls._permission_denied(entry):
|
|
raise InvalidComposerConfigError(f"secret reference '{name}' is not authorized for this agent.")
|
|
if name in seen_env_names:
|
|
raise InvalidComposerConfigError(
|
|
f"duplicate env/secret name '{name}': environment variables and secret references "
|
|
"share the shell namespace."
|
|
)
|
|
seen_env_names.add(name)
|
|
|
|
tools = soul.get("tools") or {}
|
|
cli_tools = tools.get("cli_tools")
|
|
if isinstance(cli_tools, list):
|
|
for entry in cli_tools:
|
|
if not isinstance(entry, dict) or entry.get("enabled") is False:
|
|
continue
|
|
has_name = any(isinstance(entry.get(key), str) and entry[key].strip() for key in _CLI_TOOL_NAME_KEYS)
|
|
has_command = cls._has_install_command(entry)
|
|
if not has_name and not has_command:
|
|
raise InvalidComposerConfigError("an enabled CLI tool must declare a name or an install command.")
|
|
if cls._permission_denied(entry) or entry.get("pre_authorized") is False:
|
|
raise InvalidComposerConfigError("an enabled CLI tool is not authorized for runtime bootstrap.")
|
|
if cls._dangerous_without_acknowledgement(entry):
|
|
raise InvalidComposerConfigError(
|
|
"a dangerous CLI tool command must be explicitly acknowledged before save."
|
|
)
|
|
|
|
@classmethod
|
|
def _reject_plaintext_secrets(cls, value: Any, *, path: str) -> None:
|
|
if isinstance(value, dict):
|
|
for key, nested in value.items():
|
|
normalized_key = key.lower().replace("-", "_")
|
|
nested_path = f"{path}.{key}"
|
|
if normalized_key in _PLAINTEXT_SECRET_KEYS and isinstance(nested, str) and nested:
|
|
raise PlaintextSecretNotAllowedError(f"Plaintext secret is not allowed at {nested_path}")
|
|
cls._reject_plaintext_secrets(nested, path=nested_path)
|
|
elif isinstance(value, list):
|
|
for index, nested in enumerate(value):
|
|
cls._reject_plaintext_secrets(nested, path=f"{path}[{index}]")
|
|
|
|
@classmethod
|
|
def _has_install_command(cls, entry: dict[str, Any]) -> bool:
|
|
raw_commands = entry.get("install_commands")
|
|
if isinstance(raw_commands, list) and any(
|
|
isinstance(command, str) and command.strip() for command in raw_commands
|
|
):
|
|
return True
|
|
return any(isinstance(entry.get(key), str) and entry[key].strip() for key in _CLI_TOOL_COMMAND_KEYS)
|
|
|
|
@classmethod
|
|
def _permission_denied(cls, entry: dict[str, Any]) -> bool:
|
|
permission = entry.get("permission")
|
|
if isinstance(permission, dict):
|
|
allowed = permission.get("allowed")
|
|
if allowed is False:
|
|
return True
|
|
status = permission.get("status") or permission.get("state")
|
|
if isinstance(status, str) and status in _DENIED_PERMISSION_STATUSES:
|
|
return True
|
|
|
|
for key in ("authorization_status", "permission_status", "status"):
|
|
status = entry.get(key)
|
|
if isinstance(status, str) and status in _DENIED_PERMISSION_STATUSES:
|
|
return True
|
|
return False
|
|
|
|
@classmethod
|
|
def _dangerous_without_acknowledgement(cls, entry: dict[str, Any]) -> bool:
|
|
dangerous = any(entry.get(key) is True for key in _DANGEROUS_FLAG_KEYS)
|
|
risk_level = entry.get("risk_level")
|
|
if isinstance(risk_level, str) and risk_level == "dangerous":
|
|
dangerous = True
|
|
if not dangerous:
|
|
return False
|
|
return not any(entry.get(key) is True for key in _DANGEROUS_ACK_KEYS)
|