fix(agent): agent composer publish validation (#37803)

This commit is contained in:
林玮 (Jade Lin) 2026-06-23 16:48:40 +08:00 committed by GitHub
parent 0fcaf92d67
commit 04d226384f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 312 additions and 13 deletions

View File

@ -104,7 +104,7 @@ class WorkflowAgentComposerValidateApi(Resource):
@with_current_tenant_id
def post(self, tenant_id: str, app_model: App, node_id: str):
payload = ComposerSavePayload.model_validate(console_ns.payload or {})
ComposerConfigValidator.validate_save_payload(payload)
ComposerConfigValidator.validate_publish_payload(payload)
findings = AgentComposerService.collect_validation_findings(
tenant_id=tenant_id,
payload=payload,
@ -238,7 +238,7 @@ class AgentComposerValidateApi(Resource):
def post(self, tenant_id: str, agent_id: UUID):
_resolve_agent_app_id(tenant_id=tenant_id, agent_id=agent_id)
payload = ComposerSavePayload.model_validate(console_ns.payload or {})
ComposerConfigValidator.validate_save_payload(payload)
ComposerConfigValidator.validate_publish_payload(payload)
findings = AgentComposerService.collect_validation_findings(
tenant_id=tenant_id,
payload=payload,

View File

@ -46,6 +46,13 @@ from services.entities.agent_entities import (
# WorkflowAgentNodeBinding.workflow_version tag for the draft workflow row.
# Mirrors Workflow.version when it is "draft" (see models/workflow.py).
_DRAFT_WORKFLOW_VERSION = "draft"
_PUBLISH_SAVE_STRATEGIES = frozenset(
{
ComposerSaveStrategy.SAVE_AS_NEW_VERSION,
ComposerSaveStrategy.SAVE_AS_NEW_AGENT,
ComposerSaveStrategy.SAVE_TO_ROSTER,
}
)
logger = logging.getLogger(__name__)
@ -71,6 +78,13 @@ def _backfill_cli_tool_ids(agent_soul: AgentSoulConfig | None) -> None:
seen_ids.add(minted)
def _validate_composer_payload_for_strategy(payload: ComposerSavePayload) -> None:
if payload.save_strategy in _PUBLISH_SAVE_STRATEGIES:
ComposerConfigValidator.validate_publish_payload(payload)
return
ComposerConfigValidator.validate_draft_save_payload(payload)
class AgentComposerService:
@classmethod
def load_workflow_composer(cls, *, tenant_id: str, app_id: str, node_id: str) -> dict[str, Any]:
@ -100,7 +114,7 @@ class AgentComposerService:
raise ValueError("Workflow composer endpoint only accepts workflow variant")
_backfill_cli_tool_ids(payload.agent_soul)
ComposerConfigValidator.validate_save_payload(payload)
_validate_composer_payload_for_strategy(payload)
workflow = cls._get_draft_workflow(tenant_id=tenant_id, app_id=app_id)
binding = cls._get_workflow_binding(tenant_id=tenant_id, workflow_id=workflow.id, node_id=node_id)
@ -194,7 +208,7 @@ class AgentComposerService:
if payload.variant != ComposerVariant.AGENT_APP:
raise ValueError("Agent App composer endpoint only accepts agent_app variant")
_backfill_cli_tool_ids(payload.agent_soul)
ComposerConfigValidator.validate_save_payload(payload)
_validate_composer_payload_for_strategy(payload)
if payload.agent_soul is None:
raise ValueError("agent_soul is required")

View File

@ -50,7 +50,7 @@ _DANGEROUS_ACK_KEYS = (
class ComposerConfigValidator:
@classmethod
def validate_save_payload(cls, payload: ComposerSavePayload) -> None:
def validate_draft_save_payload(cls, payload: ComposerSavePayload) -> None:
if (
payload.variant == ComposerVariant.WORKFLOW
and payload.soul_lock.locked
@ -59,6 +59,13 @@ class ComposerConfigValidator:
):
raise AgentSoulLockedError()
@classmethod
def validate_save_payload(cls, payload: ComposerSavePayload) -> None:
cls.validate_publish_payload(payload)
@classmethod
def validate_publish_payload(cls, payload: ComposerSavePayload) -> None:
cls.validate_draft_save_payload(payload)
if payload.agent_soul is not None:
cls.validate_agent_soul(payload.agent_soul)
if payload.node_job is not None:

View File

@ -8,17 +8,25 @@ from pydantic import ValidationError
from sqlalchemy import select
from sqlalchemy.orm import Session
from core.workflow.nodes.agent_v2.validators import WorkflowAgentNodeValidator
from core.workflow.nodes.agent_v2.validators import WorkflowAgentNodeValidationError, WorkflowAgentNodeValidator
from models.agent import (
Agent,
AgentConfigSnapshot,
AgentDriveFile,
AgentScope,
AgentStatus,
WorkflowAgentBindingType,
WorkflowAgentNodeBinding,
)
from models.agent_config_entities import DeclaredOutputConfig, WorkflowNodeJobConfig
from models.agent_config_entities import AgentSoulConfig, DeclaredOutputConfig, WorkflowNodeJobConfig
from models.workflow import Workflow
from services.agent.composer_validator import ComposerConfigValidator
from services.entities.agent_entities import (
ComposerSavePayload,
ComposerSaveStrategy,
ComposerSoulLockPayload,
ComposerVariant,
)
class WorkflowAgentPublishService:
@ -67,11 +75,131 @@ class WorkflowAgentPublishService:
@classmethod
def validate_agent_nodes_for_publish(cls, *, session: Session, draft_workflow: Workflow) -> None:
WorkflowAgentNodeValidator.validate_published_workflow(session=session, workflow=draft_workflow)
cls._validate_composer_configs_for_publish(session=session, draft_workflow=draft_workflow)
@classmethod
def validate_agent_nodes_for_draft_sync(cls, *, session: Session, draft_workflow: Workflow) -> None:
WorkflowAgentNodeValidator.validate_draft_workflow(session=session, workflow=draft_workflow)
@classmethod
def _validate_composer_configs_for_publish(cls, *, session: Session, draft_workflow: Workflow) -> None:
node_ids = {
node_id for node_id, _node_data in WorkflowAgentNodeValidator.iter_agent_v2_nodes(draft_workflow.graph_dict)
}
if not node_ids:
return
bindings = session.scalars(
select(WorkflowAgentNodeBinding).where(
WorkflowAgentNodeBinding.tenant_id == draft_workflow.tenant_id,
WorkflowAgentNodeBinding.app_id == draft_workflow.app_id,
WorkflowAgentNodeBinding.workflow_id == draft_workflow.id,
WorkflowAgentNodeBinding.workflow_version == draft_workflow.version,
WorkflowAgentNodeBinding.node_id.in_(node_ids),
)
).all()
for binding in bindings:
cls._validate_binding_composer_config_for_publish(session=session, binding=binding)
@classmethod
def _validate_binding_composer_config_for_publish(
cls,
*,
session: Session,
binding: WorkflowAgentNodeBinding,
) -> None:
if not binding.agent_id:
return
agent = session.scalar(
select(Agent)
.where(
Agent.tenant_id == binding.tenant_id,
Agent.id == binding.agent_id,
)
.limit(1)
)
if agent is None:
return
snapshot_id = (
agent.active_config_snapshot_id
if binding.binding_type == WorkflowAgentBindingType.ROSTER_AGENT
else binding.current_snapshot_id
)
if snapshot_id is None:
return
snapshot = session.scalar(
select(AgentConfigSnapshot)
.where(
AgentConfigSnapshot.tenant_id == binding.tenant_id,
AgentConfigSnapshot.agent_id == agent.id,
AgentConfigSnapshot.id == snapshot_id,
)
.limit(1)
)
if snapshot is None:
return
agent_soul = AgentSoulConfig.model_validate(snapshot.config_snapshot_dict)
node_job = WorkflowNodeJobConfig.model_validate(binding.node_job_config_dict)
payload = ComposerSavePayload.model_construct(
variant=ComposerVariant.WORKFLOW,
save_strategy=ComposerSaveStrategy.NODE_JOB_ONLY,
soul_lock=ComposerSoulLockPayload(locked=False),
agent_soul=agent_soul,
node_job=node_job,
)
ComposerConfigValidator.validate_publish_payload(payload)
# ENG-623 §4.4: drive-backed refs must point at real drive rows before
# publishing. This stays out of composer save so autosave/save-draft can
# persist incomplete refs and surface them as non-blocking findings.
cls._require_drive_refs_resolved_for_publish(session=session, binding=binding, agent_soul=agent_soul)
@classmethod
def _require_drive_refs_resolved_for_publish(
cls,
*,
session: Session,
binding: WorkflowAgentNodeBinding,
agent_soul: AgentSoulConfig,
) -> None:
from services.agent.prompt_mentions import MentionKind, parse_prompt_mentions
from services.agent_drive_service import decode_drive_mention_ref
wanted_keys: dict[str, tuple[str, str]] = {}
for mention in parse_prompt_mentions(agent_soul.prompt.system_prompt):
if mention.kind not in {MentionKind.SKILL, MentionKind.FILE}:
continue
drive_key = decode_drive_mention_ref(mention.ref_id)
if not drive_key:
continue
code = "skill_ref_dangling" if mention.kind == MentionKind.SKILL else "file_ref_dangling"
wanted_keys[drive_key] = (code, mention.label or drive_key)
if not wanted_keys or not binding.agent_id:
return
existing_keys = set(
session.scalars(
select(AgentDriveFile.key).where(
AgentDriveFile.tenant_id == binding.tenant_id,
AgentDriveFile.agent_id == binding.agent_id,
AgentDriveFile.key.in_(sorted(wanted_keys)),
)
).all()
)
messages: list[str] = []
for key, (code, display) in wanted_keys.items():
if key in existing_keys:
continue
kind = "skill" if code == "skill_ref_dangling" else "file"
messages.append(f"{code}: {kind} '{display}' has no drive entry for key '{key}'.")
if messages:
raise WorkflowAgentNodeValidationError(
f"Workflow Agent node {binding.node_id} has invalid Agent Soul drive refs: {'; '.join(messages)}"
)
@classmethod
def sync_agent_bindings_for_draft(
cls,

View File

@ -972,7 +972,7 @@ def test_workflow_composer_get_put_validate_candidates_impact_and_save(
"save_workflow_composer",
lambda **kwargs: _workflow_composer_response(save_options=[kwargs["payload"].save_strategy.value]),
)
monkeypatch.setattr(composer_controller.ComposerConfigValidator, "validate_save_payload", lambda payload: None)
monkeypatch.setattr(composer_controller.ComposerConfigValidator, "validate_publish_payload", lambda payload: None)
monkeypatch.setattr(
composer_controller.AgentComposerService, "resolve_workflow_node_agent_id", lambda **kwargs: None
)
@ -1067,7 +1067,7 @@ def test_agent_composer_routes_resolve_app_from_agent_id(
"save_agent_app_composer",
save_agent_app_composer,
)
monkeypatch.setattr(composer_controller.ComposerConfigValidator, "validate_save_payload", lambda payload: None)
monkeypatch.setattr(composer_controller.ComposerConfigValidator, "validate_publish_payload", lambda payload: None)
monkeypatch.setattr(
composer_controller.AgentComposerService,
"collect_validation_findings",

View File

@ -3,7 +3,7 @@ import pytest
from models.agent_config_entities import AgentKnowledgeQueryMode, AgentSoulModelConfig, DeclaredOutputType
from services.agent.composer_service import AgentComposerService
from services.agent.composer_validator import ComposerConfigValidator
from services.agent.errors import AgentSoulLockedError, PlaintextSecretNotAllowedError
from services.agent.errors import AgentSoulLockedError, InvalidComposerConfigError, PlaintextSecretNotAllowedError
from services.entities.agent_entities import (
AgentSoulConfig,
ComposerSavePayload,
@ -64,6 +64,24 @@ def test_locked_workflow_node_job_only_allows_inline_soul_payload():
ComposerConfigValidator.validate_save_payload(payload)
def test_draft_save_payload_skips_publish_only_agent_soul_validation():
payload = ComposerSavePayload.model_validate(
{
"variant": ComposerVariant.AGENT_APP,
"save_strategy": ComposerSaveStrategy.SAVE_TO_CURRENT_VERSION,
"agent_soul": {
"prompt": {"system_prompt": "no human reference yet"},
"human": {"contacts": [{"id": "human-1", "name": "Reviewer"}]},
"env": {"variables": [{"name": "bad-name"}]},
},
}
)
ComposerConfigValidator.validate_draft_save_payload(payload)
with pytest.raises(InvalidComposerConfigError):
ComposerConfigValidator.validate_publish_payload(payload)
def test_agent_app_soul_allows_app_features_and_variables():
payload = ComposerSavePayload.model_validate(
{

View File

@ -4,6 +4,7 @@ from types import SimpleNamespace
import pytest
from core.workflow.nodes.agent_v2.validators import WorkflowAgentNodeValidationError
from models.agent import (
Agent,
AgentConfigRevisionOperation,
@ -168,7 +169,7 @@ def test_save_workflow_composer_dispatches_save_strategy(monkeypatch, strategy,
calls = []
monkeypatch.setattr(composer_service.db, "session", fake_session)
monkeypatch.setattr(composer_service.ComposerConfigValidator, "validate_save_payload", lambda payload: None)
monkeypatch.setattr(composer_service.ComposerConfigValidator, "validate_draft_save_payload", lambda payload: None)
monkeypatch.setattr(AgentComposerService, "_get_draft_workflow", lambda **kwargs: SimpleNamespace(id="workflow-1"))
monkeypatch.setattr(AgentComposerService, "_get_workflow_binding", lambda **kwargs: None)
monkeypatch.setattr(
@ -221,12 +222,52 @@ def test_save_workflow_composer_rejects_agent_app_variant():
)
def _duplicate_env_secret_payload(strategy: ComposerSaveStrategy) -> ComposerSavePayload:
return ComposerSavePayload.model_validate(
{
"variant": ComposerVariant.AGENT_APP.value,
"save_strategy": strategy.value,
"agent_soul": {
"prompt": {"system_prompt": "x"},
"env": {
"variables": [{"name": "TOKEN", "value": "plain"}],
"secret_refs": [{"name": "TOKEN", "value": "credential-1"}],
},
},
}
)
@pytest.mark.parametrize(
"strategy",
[
ComposerSaveStrategy.NODE_JOB_ONLY,
ComposerSaveStrategy.SAVE_TO_CURRENT_VERSION,
],
)
def test_draft_save_strategies_skip_publish_validation(strategy: ComposerSaveStrategy):
composer_service._validate_composer_payload_for_strategy(_duplicate_env_secret_payload(strategy))
@pytest.mark.parametrize(
"strategy",
[
ComposerSaveStrategy.SAVE_AS_NEW_VERSION,
ComposerSaveStrategy.SAVE_AS_NEW_AGENT,
ComposerSaveStrategy.SAVE_TO_ROSTER,
],
)
def test_publish_save_strategies_run_publish_validation(strategy: ComposerSaveStrategy):
with pytest.raises(InvalidComposerConfigError, match="duplicate env/secret name 'TOKEN'"):
composer_service._validate_composer_payload_for_strategy(_duplicate_env_secret_payload(strategy))
def test_save_agent_app_composer_creates_agent_when_missing(monkeypatch: pytest.MonkeyPatch):
fake_session = FakeSession(scalar=[None])
created_version = SimpleNamespace(id="version-1")
monkeypatch.setattr(composer_service.db, "session", fake_session)
monkeypatch.setattr(composer_service.ComposerConfigValidator, "validate_save_payload", lambda payload: None)
monkeypatch.setattr(composer_service.ComposerConfigValidator, "validate_draft_save_payload", lambda payload: None)
monkeypatch.setattr(AgentComposerService, "_create_config_version", lambda **kwargs: created_version)
monkeypatch.setattr(AgentComposerService, "load_agent_app_composer", lambda **kwargs: {"loaded": True})
payload = ComposerSavePayload.model_validate(
@ -256,7 +297,7 @@ def test_save_agent_app_composer_updates_current_version(monkeypatch: pytest.Mon
updated = {}
monkeypatch.setattr(composer_service.db, "session", fake_session)
monkeypatch.setattr(composer_service.ComposerConfigValidator, "validate_save_payload", lambda payload: None)
monkeypatch.setattr(composer_service.ComposerConfigValidator, "validate_draft_save_payload", lambda payload: None)
monkeypatch.setattr(AgentComposerService, "_require_version", lambda **kwargs: SimpleNamespace(id="version-1"))
monkeypatch.setattr(
AgentComposerService,
@ -2060,6 +2101,97 @@ class TestListWorkflowsReferencingAppAgent:
class TestWorkflowAgentDraftBindingSync:
def _agent_workflow(self) -> Workflow:
return Workflow(
id="workflow-1",
tenant_id="tenant-1",
app_id="app-1",
version=Workflow.VERSION_DRAFT,
graph=json.dumps(
{
"nodes": [{"id": "agent-node", "data": {"type": "agent", "version": "2"}}],
"edges": [],
}
),
)
def _agent_binding(self) -> WorkflowAgentNodeBinding:
return WorkflowAgentNodeBinding(
id="binding-1",
tenant_id="tenant-1",
app_id="app-1",
workflow_id="workflow-1",
workflow_version=Workflow.VERSION_DRAFT,
node_id="agent-node",
binding_type=WorkflowAgentBindingType.ROSTER_AGENT,
agent_id="agent-1",
current_snapshot_id="snapshot-1",
node_job_config=WorkflowNodeJobConfig(),
)
def _publish_agent(self) -> Agent:
return Agent(
id="agent-1",
tenant_id="tenant-1",
name="Iris",
status=AgentStatus.ACTIVE,
active_config_snapshot_id="snapshot-1",
)
def _snapshot(self, agent_soul: AgentSoulConfig) -> AgentConfigSnapshot:
return AgentConfigSnapshot(
id="snapshot-1",
tenant_id="tenant-1",
agent_id="agent-1",
version=1,
config_snapshot=agent_soul,
)
def test_publish_validation_rejects_agent_soul_publish_only_errors(self):
binding = self._agent_binding()
agent_soul = AgentSoulConfig.model_validate(
{
"model": {
"plugin_id": "langgenius/openai/openai",
"model_provider": "openai",
"model": "gpt-4o",
},
"prompt": {"system_prompt": "no human reference yet"},
"human": {"contacts": [{"id": "human-1", "name": "Reviewer"}]},
}
)
agent = self._publish_agent()
snapshot = self._snapshot(agent_soul)
session = FakeSession(scalar=[binding, agent, snapshot, agent, snapshot], scalars=[[binding]])
with pytest.raises(InvalidComposerConfigError, match="human_involvement_not_referenced"):
WorkflowAgentPublishService.validate_agent_nodes_for_publish(
session=session,
draft_workflow=self._agent_workflow(),
)
def test_publish_validation_rejects_dangling_agent_soul_drive_refs(self):
binding = self._agent_binding()
agent_soul = AgentSoulConfig.model_validate(
{
"model": {
"plugin_id": "langgenius/openai/openai",
"model_provider": "openai",
"model": "gpt-4o",
},
"prompt": {"system_prompt": "Use [§skill:research%2FSKILL.md:Research§]."},
}
)
agent = self._publish_agent()
snapshot = self._snapshot(agent_soul)
session = FakeSession(scalar=[binding, agent, snapshot, agent, snapshot], scalars=[[binding], []])
with pytest.raises(WorkflowAgentNodeValidationError, match="skill_ref_dangling"):
WorkflowAgentPublishService.validate_agent_nodes_for_publish(
session=session,
draft_workflow=self._agent_workflow(),
)
def test_projects_binding_declared_outputs_to_draft_graph_response(self):
workflow = Workflow(
id="workflow-1",