feat(agent-v2): add agent draft build lifecycle (#37907)

This commit is contained in:
zyssyz123 2026-06-25 10:13:52 +08:00 committed by GitHub
parent 2382a49616
commit 6c01798854
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 63 additions and 142 deletions

View File

@ -1,16 +1,5 @@
"""add workflow_version to workflow_agent_node_bindings
Restores the stage 1 §5.3 unique key
``(tenant_id, workflow_id, workflow_version, node_id)`` so draft and published
workflow bindings can coexist at the same workflow_id once we want to track
them per workflow version. ``workflow_version`` mirrors ``workflows.version``
("draft" or a published version string).
Because the New Agent Experience feature is pre-release, this table is empty
in every environment that matters; the ``server_default='draft'`` only exists
to keep developer-local rows valid during the alter and is dropped immediately
afterward so application code must specify ``workflow_version`` explicitly.
Revision ID: 97e2e1a644e8
Revises: f8b6b7e9c421
Create Date: 2026-05-25 11:43:37.611300
@ -33,10 +22,8 @@ def upgrade():
'workflow_version',
sa.String(length=255),
nullable=False,
server_default='draft',
)
)
batch_op.alter_column('workflow_version', server_default=None)
batch_op.drop_constraint(
batch_op.f('workflow_agent_node_binding_node_unique'), type_='unique'
)

View File

@ -18,8 +18,7 @@ depends_on = None
def upgrade():
with op.batch_alter_table("agents", schema=None) as batch_op:
batch_op.add_column(sa.Column("role", sa.String(length=255), nullable=False, server_default=""))
batch_op.alter_column("role", server_default=None)
batch_op.add_column(sa.Column("role", sa.String(length=255), nullable=False))
def downgrade():

View File

@ -6,15 +6,9 @@ Create Date: 2026-06-18 23:00:00.000000
"""
from __future__ import annotations
import json
from typing import Any
from alembic import op
import sqlalchemy as sa
from alembic import op
from sqlalchemy.dialects import mysql
from sqlalchemy.engine.mock import MockConnection
# revision identifiers, used by Alembic.
revision = "b2515f9d4c2a"
@ -37,46 +31,9 @@ def upgrade() -> None:
"agent_drive_files",
["tenant_id", "agent_id", "is_skill", "key"],
)
_remove_skills_files_from_snapshots()
def downgrade() -> None:
op.drop_index("agent_drive_files_tenant_agent_is_skill_key_idx", table_name="agent_drive_files")
op.drop_column("agent_drive_files", "skill_metadata")
op.drop_column("agent_drive_files", "is_skill")
def _remove_skills_files_from_snapshots() -> None:
connection = op.get_bind()
if connection is None or isinstance(connection, MockConnection):
return
snapshots = sa.table(
"agent_config_snapshots",
sa.column("id", sa.String()),
sa.column("config_snapshot", sa.Text()),
)
rows = connection.execute(sa.select(snapshots.c.id, snapshots.c.config_snapshot)).fetchall()
for row in rows:
cleaned = _strip_skills_files(row.config_snapshot)
if cleaned is None:
continue
connection.execute(
snapshots.update()
.where(snapshots.c.id == row.id)
.values(config_snapshot=json.dumps(cleaned, separators=(",", ":"), sort_keys=True))
)
def _strip_skills_files(raw_snapshot: Any) -> dict[str, Any] | None:
if raw_snapshot is None:
return None
if isinstance(raw_snapshot, str):
snapshot = json.loads(raw_snapshot)
elif isinstance(raw_snapshot, dict):
snapshot = dict(raw_snapshot)
else:
snapshot = dict(raw_snapshot)
if not isinstance(snapshot, dict) or "skills_files" not in snapshot:
return None
snapshot.pop("skills_files", None)
return snapshot

View File

@ -6,13 +6,10 @@ Create Date: 2026-06-24 20:15:00.000000
"""
from datetime import UTC, datetime
import sqlalchemy as sa
from alembic import op
import models
from libs.uuid_utils import uuidv7
# revision identifiers, used by Alembic.
revision = "e4f5a6b7c8d9"
@ -21,26 +18,15 @@ branch_labels = None
depends_on = None
def _is_pg(conn) -> bool:
return conn.dialect.name == "postgresql"
def _uuid_column(name: str, *, nullable: bool = False, primary_key: bool = False) -> sa.Column:
kwargs = {"nullable": nullable, "primary_key": primary_key}
if primary_key and _is_pg(op.get_bind()):
kwargs["server_default"] = sa.text("uuidv7()")
return sa.Column(name, models.types.StringUUID(), **kwargs)
def upgrade():
op.create_table(
"agent_config_drafts",
_uuid_column("id", primary_key=True),
sa.Column("id", models.types.StringUUID(), nullable=False),
sa.Column("tenant_id", models.types.StringUUID(), nullable=False),
sa.Column("agent_id", models.types.StringUUID(), nullable=False),
sa.Column("draft_type", sa.String(length=32), nullable=False),
sa.Column("account_id", models.types.StringUUID(), nullable=True),
sa.Column("draft_owner_key", sa.String(length=255), server_default="", nullable=False),
sa.Column("draft_owner_key", sa.String(length=255), nullable=False),
sa.Column("base_snapshot_id", models.types.StringUUID(), nullable=True),
sa.Column("config_snapshot", models.types.LongText(), nullable=False),
sa.Column("created_by", models.types.StringUUID(), nullable=True),
@ -63,71 +49,6 @@ def upgrade():
["tenant_id", "base_snapshot_id"],
)
bind = op.get_bind()
now = datetime.now(UTC).replace(tzinfo=None)
if bind.dialect.name == "postgresql":
op.execute(
sa.text(
"""
INSERT INTO agent_config_drafts (
id, tenant_id, agent_id, draft_type, account_id, draft_owner_key, base_snapshot_id,
config_snapshot, created_by, updated_by, created_at, updated_at
)
SELECT
uuidv7(), a.tenant_id, a.id, 'draft', NULL, '', s.id,
s.config_snapshot, a.created_by, a.updated_by, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
FROM agents a
JOIN agent_config_snapshots s
ON s.tenant_id = a.tenant_id
AND s.agent_id = a.id
AND s.id = a.active_config_snapshot_id
WHERE a.active_config_snapshot_id IS NOT NULL
"""
)
)
else:
agents = bind.execute(
sa.text(
"""
SELECT
a.tenant_id, a.id AS agent_id, a.created_by, a.updated_by,
s.id AS snapshot_id, s.config_snapshot
FROM agents a
JOIN agent_config_snapshots s
ON s.tenant_id = a.tenant_id
AND s.agent_id = a.id
AND s.id = a.active_config_snapshot_id
WHERE a.active_config_snapshot_id IS NOT NULL
"""
)
).mappings()
for row in agents:
bind.execute(
sa.text(
"""
INSERT INTO agent_config_drafts (
id, tenant_id, agent_id, draft_type, account_id, draft_owner_key, base_snapshot_id,
config_snapshot, created_by, updated_by, created_at, updated_at
)
VALUES (
:id, :tenant_id, :agent_id, 'draft', NULL, '', :snapshot_id,
:config_snapshot, :created_by, :updated_by, :created_at, :updated_at
)
"""
),
{
"id": str(uuidv7()),
"tenant_id": row["tenant_id"],
"agent_id": row["agent_id"],
"snapshot_id": row["snapshot_id"],
"config_snapshot": row["config_snapshot"],
"created_by": row["created_by"],
"updated_by": row["updated_by"],
"created_at": now,
"updated_at": now,
},
)
def downgrade():
op.drop_index("agent_config_draft_base_snapshot_idx", table_name="agent_config_drafts")

View File

@ -576,7 +576,7 @@ class AgentRosterService:
Agent.status == AgentStatus.ACTIVE,
)
).all()
return {agent.app_id: agent for agent in agents if agent.app_id}
return {agent.app_id: agent for agent in agents if agent.app_id and agent.id}
def get_app_backing_agent(self, *, tenant_id: str, app_id: str) -> Agent | None:
"""Return the roster Agent that backs the given Agent App, if any."""
@ -860,11 +860,15 @@ class AgentRosterService:
def load_active_config_is_published_by_agent_id(self, *, tenant_id: str, agents: list[Agent]) -> dict[str, bool]:
"""Return whether each Agent's normal draft is aligned with its active published snapshot."""
agents = [agent for agent in agents if agent.id]
if not agents:
return {}
published_agent_ids = self._load_published_active_snapshot_agent_ids(tenant_id=tenant_id, agents=agents)
drafts = self._session.scalars(
select(AgentConfigDraft).where(
AgentConfigDraft.tenant_id == tenant_id,
AgentConfigDraft.agent_id.in_([agent.id for agent in agents] or [""]),
AgentConfigDraft.agent_id.in_([agent.id for agent in agents]),
AgentConfigDraft.draft_type == AgentConfigDraftType.DRAFT,
AgentConfigDraft.account_id.is_(None),
)

View File

@ -1731,6 +1731,59 @@ def test_active_config_is_published_flags_handle_matching_and_empty_snapshots():
) == {"agent-2": False}
def test_active_config_is_published_skips_empty_agent_ids():
empty_id_agent = Agent(
id="",
tenant_id="tenant-1",
name="Broken",
description="",
agent_kind=AgentKind.DIFY_AGENT,
scope=AgentScope.ROSTER,
source=AgentSource.AGENT_APP,
status=AgentStatus.ACTIVE,
active_config_snapshot_id=None,
)
fake_session = FakeSession(scalars=[["should-not-be-read"]])
assert AgentRosterService(fake_session).load_active_config_is_published_by_agent_id(
tenant_id="tenant-1",
agents=[empty_id_agent],
) == {}
assert fake_session._scalars == [["should-not-be-read"]]
def test_load_app_backing_agents_skips_empty_agent_ids():
valid_agent = Agent(
id="agent-1",
tenant_id="tenant-1",
name="Valid",
description="",
agent_kind=AgentKind.DIFY_AGENT,
scope=AgentScope.ROSTER,
source=AgentSource.AGENT_APP,
app_id="app-1",
status=AgentStatus.ACTIVE,
)
empty_id_agent = Agent(
id="",
tenant_id="tenant-1",
name="Broken",
description="",
agent_kind=AgentKind.DIFY_AGENT,
scope=AgentScope.ROSTER,
source=AgentSource.AGENT_APP,
app_id="app-2",
status=AgentStatus.ACTIVE,
)
result = AgentRosterService(FakeSession(scalars=[[valid_agent, empty_id_agent]])).load_app_backing_agents_by_app_id(
tenant_id="tenant-1",
app_ids=["app-1", "app-2"],
)
assert result == {"app-1": valid_agent}
def test_published_references_include_app_display_fields_and_sort_by_updated_at():
recent_updated_at = datetime(2026, 1, 7, 3, 4, 5, tzinfo=UTC)
stale_updated_at = datetime(2026, 1, 6, 3, 4, 5, tzinfo=UTC)