diff --git a/api/controllers/console/agent/roster.py b/api/controllers/console/agent/roster.py index c305a816eec..1066e51feae 100644 --- a/api/controllers/console/agent/roster.py +++ b/api/controllers/console/agent/roster.py @@ -18,6 +18,7 @@ from fields.agent_fields import ( AgentConfigSnapshotDetailResponse, AgentConfigSnapshotListResponse, AgentInviteOptionsResponse, + AgentPublishedReferenceResponse, AgentRosterListResponse, AgentRosterResponse, ) @@ -48,6 +49,7 @@ register_response_schema_models( AgentConfigSnapshotDetailResponse, AgentConfigSnapshotListResponse, AgentInviteOptionsResponse, + AgentPublishedReferenceResponse, AgentRosterListResponse, AgentRosterResponse, ) diff --git a/api/core/workflow/nodes/agent_v2/binding_resolver.py b/api/core/workflow/nodes/agent_v2/binding_resolver.py index d2f50b0ae4d..8dbabff2b8f 100644 --- a/api/core/workflow/nodes/agent_v2/binding_resolver.py +++ b/api/core/workflow/nodes/agent_v2/binding_resolver.py @@ -5,7 +5,7 @@ from dataclasses import dataclass from sqlalchemy import select from core.db.session_factory import session_factory -from models.agent import Agent, AgentConfigSnapshot, AgentStatus, WorkflowAgentNodeBinding +from models.agent import Agent, AgentConfigSnapshot, AgentStatus, WorkflowAgentBindingType, WorkflowAgentNodeBinding class WorkflowAgentBindingError(Exception): @@ -52,11 +52,6 @@ class WorkflowAgentBindingResolver: ) if binding.agent_id is None: raise WorkflowAgentBindingError("agent_not_available", "Workflow Agent binding has no agent.") - if binding.current_snapshot_id is None: - raise WorkflowAgentBindingError( - "agent_config_snapshot_not_found", - "Workflow Agent binding has no current config snapshot.", - ) agent = session.scalar( select(Agent) @@ -72,19 +67,30 @@ class WorkflowAgentBindingResolver: f"Agent {binding.agent_id} is not available.", ) + snapshot_id = ( + agent.active_config_snapshot_id + if binding.binding_type == WorkflowAgentBindingType.ROSTER_AGENT + else binding.current_snapshot_id + ) + if snapshot_id is None: + raise WorkflowAgentBindingError( + "agent_config_snapshot_not_found", + "Workflow Agent binding has no current config snapshot.", + ) + snapshot = session.scalar( select(AgentConfigSnapshot) .where( AgentConfigSnapshot.tenant_id == tenant_id, AgentConfigSnapshot.agent_id == agent.id, - AgentConfigSnapshot.id == binding.current_snapshot_id, + AgentConfigSnapshot.id == snapshot_id, ) .limit(1) ) if snapshot is None: raise WorkflowAgentBindingError( "agent_config_snapshot_not_found", - f"Agent config snapshot {binding.current_snapshot_id} not found.", + f"Agent config snapshot {snapshot_id} not found.", ) session.expunge(binding) diff --git a/api/core/workflow/nodes/agent_v2/validators.py b/api/core/workflow/nodes/agent_v2/validators.py index 59e6934ba5e..ca3adb5b0d1 100644 --- a/api/core/workflow/nodes/agent_v2/validators.py +++ b/api/core/workflow/nodes/agent_v2/validators.py @@ -8,7 +8,7 @@ from sqlalchemy.orm import Session from core.workflow.graph_topology import WorkflowGraphTopology from graphon.enums import BuiltinNodeTypes -from models.agent import Agent, AgentConfigSnapshot, AgentStatus, WorkflowAgentNodeBinding +from models.agent import Agent, AgentConfigSnapshot, AgentStatus, WorkflowAgentBindingType, WorkflowAgentNodeBinding from models.agent_config_entities import ( AgentFileRefConfig, AgentHumanContactConfig, @@ -102,10 +102,6 @@ class WorkflowAgentNodeValidator: ) -> None: if binding.agent_id is None: raise WorkflowAgentNodeValidationError(f"Workflow Agent node {binding.node_id} is missing agent binding.") - if binding.current_snapshot_id is None: - raise WorkflowAgentNodeValidationError( - f"Workflow Agent node {binding.node_id} is missing config snapshot binding." - ) agent = session.scalar( select(Agent) @@ -120,12 +116,22 @@ class WorkflowAgentNodeValidator: f"Workflow Agent node {binding.node_id} references an unavailable agent." ) + snapshot_id = ( + agent.active_config_snapshot_id + if binding.binding_type == WorkflowAgentBindingType.ROSTER_AGENT + else binding.current_snapshot_id + ) + if snapshot_id is None: + raise WorkflowAgentNodeValidationError( + f"Workflow Agent node {binding.node_id} is missing config snapshot binding." + ) + snapshot = session.scalar( select(AgentConfigSnapshot) .where( AgentConfigSnapshot.tenant_id == binding.tenant_id, AgentConfigSnapshot.agent_id == agent.id, - AgentConfigSnapshot.id == binding.current_snapshot_id, + AgentConfigSnapshot.id == snapshot_id, ) .limit(1) ) diff --git a/api/fields/agent_fields.py b/api/fields/agent_fields.py index 4c9b48094ae..ce4854ce360 100644 --- a/api/fields/agent_fields.py +++ b/api/fields/agent_fields.py @@ -41,10 +41,20 @@ class AgentConfigSnapshotSummaryResponse(ResponseModel): created_at: int | None = None +class AgentPublishedReferenceResponse(ResponseModel): + app_id: str + app_name: str + app_mode: str + workflow_id: str + workflow_version: str + node_ids: list[str] = Field(default_factory=list) + + class AgentRosterResponse(ResponseModel): id: str name: str description: str + role: str = "" icon_type: AgentIconType | None = None icon: str | None = None icon_background: str | None = None @@ -63,6 +73,9 @@ class AgentRosterResponse(ResponseModel): archived_at: int | None = None created_at: int | None = None updated_at: int | None = None + published_reference_count: int = 0 + published_node_reference_count: int = 0 + published_references: list[AgentPublishedReferenceResponse] = Field(default_factory=list) class AgentInviteOptionResponse(AgentRosterResponse): diff --git a/api/migrations/versions/2026_06_12_1100-0b2f2c8a9d1e_add_agent_role.py b/api/migrations/versions/2026_06_12_1100-0b2f2c8a9d1e_add_agent_role.py new file mode 100644 index 00000000000..900f7da06fc --- /dev/null +++ b/api/migrations/versions/2026_06_12_1100-0b2f2c8a9d1e_add_agent_role.py @@ -0,0 +1,27 @@ +"""add agent role + +Revision ID: 0b2f2c8a9d1e +Revises: 7bad07dc267d +Create Date: 2026-06-12 11:00:00.000000 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "0b2f2c8a9d1e" +down_revision = "7bad07dc267d" +branch_labels = None +depends_on = None + + +def upgrade(): + with op.batch_alter_table("agents", schema=None) as batch_op: + batch_op.add_column(sa.Column("role", sa.String(length=255), nullable=False, server_default="")) + batch_op.alter_column("role", server_default=None) + + +def downgrade(): + with op.batch_alter_table("agents", schema=None) as batch_op: + batch_op.drop_column("role") diff --git a/api/models/agent.py b/api/models/agent.py index 9624bf53359..8487bc18962 100644 --- a/api/models/agent.py +++ b/api/models/agent.py @@ -136,6 +136,7 @@ class Agent(DefaultFieldsMixin, Base): tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) name: Mapped[str] = mapped_column(String(255), nullable=False) description: Mapped[str] = mapped_column(LongText, nullable=False, default="") + role: Mapped[str] = mapped_column(String(255), nullable=False, default="") icon_type: Mapped[AgentIconType | None] = mapped_column(EnumText(AgentIconType, length=32), nullable=True) icon: Mapped[str | None] = mapped_column( String(255), diff --git a/api/openapi/markdown/console-swagger.md b/api/openapi/markdown/console-swagger.md index 02a0109d938..2b66bcab70a 100644 --- a/api/openapi/markdown/console-swagger.md +++ b/api/openapi/markdown/console-swagger.md @@ -12148,6 +12148,10 @@ Supported icon storage formats for Agent roster entries. | in_current_workflow_count | integer | | No | | is_in_current_workflow | boolean | | No | | name | string | | Yes | +| published_node_reference_count | integer | | No | +| published_reference_count | integer | | No | +| published_references | [ [AgentPublishedReferenceResponse](#agentpublishedreferenceresponse) ] | | No | +| role | string | | No | | scope | [AgentScope](#agentscope) | | Yes | | source | [AgentSource](#agentsource) | | Yes | | status | [AgentStatus](#agentstatus) | | Yes | @@ -12255,6 +12259,17 @@ the current roster/workflow APIs scoped to Dify Agent. | state | string | | No | | status | string | | No | +#### AgentPublishedReferenceResponse + +| Name | Type | Description | Required | +| ---- | ---- | ----------- | -------- | +| app_id | string | | Yes | +| app_mode | string | | Yes | +| app_name | string | | Yes | +| node_ids | [ string ] | | No | +| workflow_id | string | | Yes | +| workflow_version | string | | Yes | + #### AgentReferencingWorkflowResponse | Name | Type | Description | Required | @@ -12299,6 +12314,10 @@ the current roster/workflow APIs scoped to Dify Agent. | icon_type | [AgentIconType](#agenticontype) | | No | | id | string | | Yes | | name | string | | Yes | +| published_node_reference_count | integer | | No | +| published_reference_count | integer | | No | +| published_references | [ [AgentPublishedReferenceResponse](#agentpublishedreferenceresponse) ] | | No | +| role | string | | No | | scope | [AgentScope](#agentscope) | | Yes | | source | [AgentSource](#agentsource) | | Yes | | status | [AgentStatus](#agentstatus) | | Yes | @@ -16548,6 +16567,7 @@ Payload for publishing snippet workflow. | icon_background | string | | No | | icon_type | [AgentIconType](#agenticontype) | | No | | name | string | | Yes | +| role | string | | No | | version_note | string | | No | #### RosterAgentUpdatePayload @@ -16559,6 +16579,7 @@ Payload for publishing snippet workflow. | icon_background | string | | No | | icon_type | [AgentIconType](#agenticontype) | | No | | name | string | | No | +| role | string | | No | #### RosterListQuery diff --git a/api/services/agent/composer_service.py b/api/services/agent/composer_service.py index 9458460f512..a2d73929035 100644 --- a/api/services/agent/composer_service.py +++ b/api/services/agent/composer_service.py @@ -2,7 +2,7 @@ import logging import uuid from typing import Any -from sqlalchemy import func, select +from sqlalchemy import func, or_, select from sqlalchemy.exc import IntegrityError from extensions.ext_database import db @@ -74,10 +74,15 @@ class AgentComposerService: return cls._empty_workflow_state(app_id=app_id, workflow_id=workflow.id, node_id=node_id) agent = cls._get_agent_if_present(tenant_id=tenant_id, agent_id=binding.agent_id) + version_id = ( + agent.active_config_snapshot_id + if agent and binding.binding_type == WorkflowAgentBindingType.ROSTER_AGENT + else binding.current_snapshot_id + ) version = cls._get_version_if_present( tenant_id=tenant_id, agent_id=agent.id if agent else None, - version_id=binding.current_snapshot_id, + version_id=version_id, ) return cls._serialize_workflow_state(binding=binding, agent=agent, version=version) @@ -129,10 +134,15 @@ class AgentComposerService: db.session.commit() agent = cls._get_agent_if_present(tenant_id=tenant_id, agent_id=binding.agent_id) + version_id = ( + agent.active_config_snapshot_id + if agent and binding.binding_type == WorkflowAgentBindingType.ROSTER_AGENT + else binding.current_snapshot_id + ) version = cls._get_version_if_present( tenant_id=tenant_id, agent_id=agent.id if agent else None, - version_id=binding.current_snapshot_id, + version_id=version_id, ) state = cls._serialize_workflow_state(binding=binding, agent=agent, version=version) state["validation"] = cls.collect_validation_findings(tenant_id=tenant_id, payload=payload) @@ -489,11 +499,26 @@ class AgentComposerService: @classmethod def calculate_impact(cls, *, tenant_id: str, current_snapshot_id: str) -> dict[str, Any]: + snapshot = db.session.scalar( + select(AgentConfigSnapshot) + .where( + AgentConfigSnapshot.tenant_id == tenant_id, + AgentConfigSnapshot.id == current_snapshot_id, + ) + .limit(1) + ) + agent_id = snapshot.agent_id if snapshot else None + predicates = [WorkflowAgentNodeBinding.current_snapshot_id == current_snapshot_id] + if agent_id: + predicates.append( + (WorkflowAgentNodeBinding.agent_id == agent_id) + & (WorkflowAgentNodeBinding.binding_type == WorkflowAgentBindingType.ROSTER_AGENT) + ) bindings = list( db.session.scalars( select(WorkflowAgentNodeBinding).where( WorkflowAgentNodeBinding.tenant_id == tenant_id, - WorkflowAgentNodeBinding.current_snapshot_id == current_snapshot_id, + or_(*predicates), ) ).all() ) @@ -1003,7 +1028,7 @@ class AgentComposerService: "id": binding.id, "binding_type": binding.binding_type.value, "agent_id": binding.agent_id, - "current_snapshot_id": binding.current_snapshot_id, + "current_snapshot_id": version.id if version else binding.current_snapshot_id, "workflow_id": binding.workflow_id, "node_id": binding.node_id, }, @@ -1022,10 +1047,8 @@ class AgentComposerService: # this is the same list (so callers don't need to special-case). "effective_declared_outputs": cls._serialize_effective_outputs(cls._declared_outputs_from_binding(binding)), "save_options": save_options, - "impact_summary": cls.calculate_impact( - tenant_id=binding.tenant_id, current_snapshot_id=binding.current_snapshot_id - ) - if binding.current_snapshot_id + "impact_summary": cls.calculate_impact(tenant_id=binding.tenant_id, current_snapshot_id=version.id) + if version else None, } diff --git a/api/services/agent/roster_service.py b/api/services/agent/roster_service.py index c4ec16c0b73..ab57e22268a 100644 --- a/api/services/agent/roster_service.py +++ b/api/services/agent/roster_service.py @@ -18,6 +18,7 @@ from models.agent import ( WorkflowAgentNodeBinding, ) from models.agent_config_entities import AgentSoulConfig +from models.enums import AppStatus from models.model import App from models.workflow import Workflow from services.agent.composer_validator import ComposerConfigValidator @@ -37,6 +38,7 @@ class AgentReferencingWorkflow(TypedDict): app_name: str app_mode: str workflow_id: str + workflow_version: str node_ids: list[str] @@ -45,11 +47,17 @@ class AgentRosterService: self._session = session @staticmethod - def serialize_agent(agent: Agent, active_version: AgentConfigSnapshot | None = None) -> dict[str, Any]: + def serialize_agent( + agent: Agent, + active_version: AgentConfigSnapshot | None = None, + published_references: list[AgentReferencingWorkflow] | None = None, + ) -> dict[str, Any]: + published_references = published_references or [] return { "id": agent.id, "name": agent.name, "description": agent.description, + "role": agent.role or "", "icon_type": agent.icon_type.value if agent.icon_type else None, "icon": agent.icon, "icon_background": agent.icon_background, @@ -68,6 +76,9 @@ class AgentRosterService: "archived_at": to_timestamp(agent.archived_at), "created_at": to_timestamp(agent.created_at), "updated_at": to_timestamp(agent.updated_at), + "published_reference_count": len(published_references), + "published_node_reference_count": sum(len(item["node_ids"]) for item in published_references), + "published_references": published_references, } @staticmethod @@ -104,13 +115,23 @@ class AgentRosterService: versions_by_id = self._load_versions_by_id( [agent.active_config_snapshot_id for agent in agents if agent.active_config_snapshot_id] ) + published_references_by_agent_id = self._load_published_references_by_agent_id( + tenant_id=tenant_id, + agent_ids=[agent.id for agent in agents], + ) data = [] for agent in agents: active_version = ( versions_by_id.get(agent.active_config_snapshot_id) if agent.active_config_snapshot_id else None ) - data.append(self.serialize_agent(agent, active_version)) + data.append( + self.serialize_agent( + agent, + active_version, + published_references_by_agent_id.get(agent.id, []), + ) + ) return { "data": data, @@ -170,6 +191,7 @@ class AgentRosterService: tenant_id=tenant_id, name=payload.name, description=payload.description, + role=payload.role, icon_type=payload.icon_type, icon=payload.icon, icon_background=payload.icon_background, @@ -241,6 +263,7 @@ class AgentRosterService: tenant_id=tenant_id, name=name, description=description, + role="", icon_type=icon_type, icon=icon, icon_background=icon_background, @@ -306,48 +329,18 @@ class AgentRosterService: if agent is None: return [] - bindings = self._session.scalars( - select(WorkflowAgentNodeBinding).where( - WorkflowAgentNodeBinding.tenant_id == tenant_id, - WorkflowAgentNodeBinding.agent_id == agent.id, - WorkflowAgentNodeBinding.binding_type == WorkflowAgentBindingType.ROSTER_AGENT, - ) - ).all() - if not bindings: - return [] - - # Collapse the per-version / per-node rows into one entry per workflow app. - node_ids_by_workflow: dict[tuple[str, str], set[str]] = {} - for binding in bindings: - node_ids_by_workflow.setdefault((binding.app_id, binding.workflow_id), set()).add(binding.node_id) - - referenced_app_ids = {workflow_app_id for workflow_app_id, _ in node_ids_by_workflow} - apps = {app.id: app for app in self._session.scalars(select(App).where(App.id.in_(referenced_app_ids))).all()} - - result: list[AgentReferencingWorkflow] = [] - for (workflow_app_id, workflow_id), node_ids in node_ids_by_workflow.items(): - app = apps.get(workflow_app_id) - if app is None: - # Orphaned binding (workflow app deleted): skip rather than 500. - continue - result.append( - AgentReferencingWorkflow( - app_id=workflow_app_id, - app_name=app.name, - app_mode=str(app.mode), - workflow_id=workflow_id, - node_ids=sorted(node_ids), - ) - ) - result.sort(key=lambda item: item["app_name"].lower()) - return result + return self._load_published_references_by_agent_id(tenant_id=tenant_id, agent_ids=[agent.id]).get(agent.id, []) def get_roster_agent_detail(self, *, tenant_id: str, agent_id: str) -> dict[str, Any]: agent = self._get_agent(tenant_id=tenant_id, agent_id=agent_id, roster_only=True) active_version = self._get_version( tenant_id=tenant_id, agent_id=agent.id, version_id=agent.active_config_snapshot_id ) - return self.serialize_agent(agent, active_version) + published_references_by_agent_id = self._load_published_references_by_agent_id( + tenant_id=tenant_id, + agent_ids=[agent.id], + ) + return self.serialize_agent(agent, active_version, published_references_by_agent_id.get(agent.id, [])) def update_roster_agent( self, *, tenant_id: str, agent_id: str, account_id: str, payload: RosterAgentUpdatePayload @@ -450,6 +443,68 @@ class AgentRosterService: raise AgentVersionNotFoundError() return version + def _load_published_references_by_agent_id( + self, *, tenant_id: str, agent_ids: list[str] + ) -> dict[str, list[AgentReferencingWorkflow]]: + if not agent_ids: + return {} + + bindings = list( + self._session.scalars( + select(WorkflowAgentNodeBinding).where( + WorkflowAgentNodeBinding.tenant_id == tenant_id, + WorkflowAgentNodeBinding.agent_id.in_(agent_ids), + WorkflowAgentNodeBinding.binding_type == WorkflowAgentBindingType.ROSTER_AGENT, + WorkflowAgentNodeBinding.workflow_version != Workflow.VERSION_DRAFT, + ) + ).all() + ) + if not bindings: + return {} + + app_ids = {binding.app_id for binding in bindings} + apps = { + app.id: app + for app in self._session.scalars( + select(App).where( + App.tenant_id == tenant_id, + App.id.in_(app_ids), + App.status == AppStatus.NORMAL, + ) + ).all() + } + + grouped: dict[str, dict[tuple[str, str], AgentReferencingWorkflow]] = {} + for binding in bindings: + if not binding.agent_id: + continue + app = apps.get(binding.app_id) + if app is None or app.workflow_id != binding.workflow_id: + continue + by_workflow = grouped.setdefault(binding.agent_id, {}) + key = (binding.app_id, binding.workflow_id) + item = by_workflow.setdefault( + key, + AgentReferencingWorkflow( + app_id=binding.app_id, + app_name=app.name, + app_mode=str(app.mode), + workflow_id=binding.workflow_id, + workflow_version=binding.workflow_version, + node_ids=[], + ), + ) + item["node_ids"].append(binding.node_id) + + result: dict[str, list[AgentReferencingWorkflow]] = {} + for agent_id, by_workflow in grouped.items(): + references = list(by_workflow.values()) + for reference in references: + reference["node_ids"] = sorted(set(reference["node_ids"])) + references.sort(key=lambda item: (item["app_name"].lower(), item["workflow_id"])) + result[agent_id] = references + return result + def _load_versions_by_id(self, version_ids: list[str]) -> dict[str, AgentConfigSnapshot]: if not version_ids: return {} diff --git a/api/services/agent/workflow_publish_service.py b/api/services/agent/workflow_publish_service.py index af3e5112290..13927026c48 100644 --- a/api/services/agent/workflow_publish_service.py +++ b/api/services/agent/workflow_publish_service.py @@ -1,10 +1,13 @@ from __future__ import annotations +from collections.abc import Mapping +from typing import Any + from sqlalchemy import select from sqlalchemy.orm import Session from core.workflow.nodes.agent_v2.validators import WorkflowAgentNodeValidator -from models.agent import WorkflowAgentNodeBinding +from models.agent import Agent, AgentScope, AgentStatus, WorkflowAgentBindingType, WorkflowAgentNodeBinding from models.agent_config_entities import WorkflowNodeJobConfig from models.workflow import Workflow @@ -12,6 +15,9 @@ from models.workflow import Workflow class WorkflowAgentPublishService: """Validate and freeze Workflow Agent v2 bindings during workflow publish.""" + _DRAFT_WORKFLOW_VERSION = Workflow.VERSION_DRAFT + _AGENT_BINDING_KEY = "agent_binding" + @classmethod def validate_agent_nodes_for_publish(cls, *, session: Session, draft_workflow: Workflow) -> None: WorkflowAgentNodeValidator.validate_published_workflow(session=session, workflow=draft_workflow) @@ -20,6 +26,100 @@ class WorkflowAgentPublishService: def validate_agent_nodes_for_draft_sync(cls, *, session: Session, draft_workflow: Workflow) -> None: WorkflowAgentNodeValidator.validate_draft_workflow(session=session, workflow=draft_workflow) + @classmethod + def sync_roster_agent_bindings_for_draft( + cls, + *, + session: Session, + draft_workflow: Workflow, + account_id: str, + ) -> None: + agent_nodes = dict(WorkflowAgentNodeValidator.iter_agent_v2_nodes(draft_workflow.graph_dict)) + existing_bindings = list( + session.scalars( + select(WorkflowAgentNodeBinding).where( + WorkflowAgentNodeBinding.tenant_id == draft_workflow.tenant_id, + WorkflowAgentNodeBinding.app_id == draft_workflow.app_id, + WorkflowAgentNodeBinding.workflow_id == draft_workflow.id, + WorkflowAgentNodeBinding.workflow_version == cls._DRAFT_WORKFLOW_VERSION, + ) + ).all() + ) + existing_by_node_id = {binding.node_id: binding for binding in existing_bindings} + + for binding in existing_bindings: + if binding.node_id not in agent_nodes: + session.delete(binding) + + for node_id, node_data in agent_nodes.items(): + binding_payload = node_data.get(cls._AGENT_BINDING_KEY) + if binding_payload is None: + continue + if not isinstance(binding_payload, Mapping): + raise ValueError(f"Workflow Agent node {node_id} has invalid agent_binding.") + cls._sync_roster_agent_binding_for_node( + session=session, + draft_workflow=draft_workflow, + node_id=node_id, + node_binding=binding_payload, + existing_binding=existing_by_node_id.get(node_id), + account_id=account_id, + ) + session.flush() + + @classmethod + def _sync_roster_agent_binding_for_node( + cls, + *, + session: Session, + draft_workflow: Workflow, + node_id: str, + node_binding: Mapping[str, Any], + existing_binding: WorkflowAgentNodeBinding | None, + account_id: str, + ) -> None: + binding_type = node_binding.get("binding_type") + if binding_type != WorkflowAgentBindingType.ROSTER_AGENT.value: + raise ValueError(f"Workflow Agent node {node_id} only supports roster_agent graph binding.") + agent_id = node_binding.get("agent_id") + if not isinstance(agent_id, str) or not agent_id: + raise ValueError(f"Workflow Agent node {node_id} roster_agent binding requires agent_id.") + + agent = session.scalar( + select(Agent) + .where( + Agent.tenant_id == draft_workflow.tenant_id, + Agent.id == agent_id, + Agent.scope == AgentScope.ROSTER, + Agent.status == AgentStatus.ACTIVE, + ) + .limit(1) + ) + if agent is None: + raise ValueError(f"Workflow Agent node {node_id} references an unavailable roster agent.") + if not agent.active_config_snapshot_id: + raise ValueError(f"Workflow Agent node {node_id} roster agent has no active config snapshot.") + + binding = existing_binding + if binding is None: + binding = WorkflowAgentNodeBinding( + tenant_id=draft_workflow.tenant_id, + app_id=draft_workflow.app_id, + workflow_id=draft_workflow.id, + workflow_version=cls._DRAFT_WORKFLOW_VERSION, + node_id=node_id, + node_job_config=WorkflowNodeJobConfig(), + created_by=account_id, + ) + session.add(binding) + elif not binding.node_job_config: + binding.node_job_config = WorkflowNodeJobConfig() + + binding.binding_type = WorkflowAgentBindingType.ROSTER_AGENT + binding.agent_id = agent.id + binding.current_snapshot_id = agent.active_config_snapshot_id + binding.updated_by = account_id + @classmethod def copy_agent_node_bindings_to_published( cls, @@ -43,8 +143,26 @@ class WorkflowAgentPublishService: WorkflowAgentNodeBinding.node_id.in_(node_ids), ) ).all() + if not bindings: + return + + agents_by_id = { + agent.id: agent + for agent in session.scalars( + select(Agent).where( + Agent.tenant_id == draft_workflow.tenant_id, + Agent.id.in_({binding.agent_id for binding in bindings if binding.agent_id}), + ) + ).all() + } for binding in bindings: + agent = agents_by_id.get(binding.agent_id) if binding.agent_id else None + current_snapshot_id = ( + agent.active_config_snapshot_id + if agent is not None and binding.binding_type == WorkflowAgentBindingType.ROSTER_AGENT + else binding.current_snapshot_id + ) copied = WorkflowAgentNodeBinding( tenant_id=binding.tenant_id, app_id=binding.app_id, @@ -53,7 +171,7 @@ class WorkflowAgentPublishService: node_id=binding.node_id, binding_type=binding.binding_type, agent_id=binding.agent_id, - current_snapshot_id=binding.current_snapshot_id, + current_snapshot_id=current_snapshot_id, node_job_config=WorkflowNodeJobConfig.model_validate(binding.node_job_config_dict), created_by=binding.created_by, updated_by=binding.updated_by, diff --git a/api/services/entities/agent_entities.py b/api/services/entities/agent_entities.py index e2730d1357a..63ae101533d 100644 --- a/api/services/entities/agent_entities.py +++ b/api/services/entities/agent_entities.py @@ -61,6 +61,7 @@ class ComposerSavePayload(BaseModel): class RosterAgentCreatePayload(BaseModel): name: str = Field(min_length=1, max_length=255) description: str = "" + role: str = Field(default="", max_length=255) icon_type: AgentIconType | None = None icon: str | None = Field(default=None, max_length=255) icon_background: str | None = Field(default=None, max_length=255) @@ -71,6 +72,7 @@ class RosterAgentCreatePayload(BaseModel): class RosterAgentUpdatePayload(BaseModel): name: str | None = Field(default=None, min_length=1, max_length=255) description: str | None = None + role: str | None = Field(default=None, max_length=255) icon_type: AgentIconType | None = None icon: str | None = Field(default=None, max_length=255) icon_background: str | None = Field(default=None, max_length=255) diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index 50dd977749b..4be120ac782 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -322,6 +322,12 @@ class WorkflowService: from services.agent.workflow_publish_service import WorkflowAgentPublishService + db.session.flush() + WorkflowAgentPublishService.sync_roster_agent_bindings_for_draft( + session=cast(Session, db.session), + draft_workflow=workflow, + account_id=account.id, + ) WorkflowAgentPublishService.validate_agent_nodes_for_draft_sync( session=cast(Session, db.session), draft_workflow=workflow, diff --git a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_binding_resolver.py b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_binding_resolver.py index 1f75e4c19d9..a628c76b38d 100644 --- a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_binding_resolver.py +++ b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_binding_resolver.py @@ -4,7 +4,7 @@ from core.workflow.nodes.agent_v2.binding_resolver import ( WorkflowAgentBindingError, WorkflowAgentBindingResolver, ) -from models.agent import Agent, AgentConfigSnapshot, AgentStatus, WorkflowAgentNodeBinding +from models.agent import Agent, AgentConfigSnapshot, AgentStatus, WorkflowAgentBindingType, WorkflowAgentNodeBinding from models.agent_config_entities import AgentSoulConfig, AgentSoulModelConfig, WorkflowNodeJobConfig @@ -85,6 +85,25 @@ def test_binding_resolver_returns_detached_binding_bundle(monkeypatch: pytest.Mo assert fake_session.expunge_calls == [bundle.binding, bundle.agent, bundle.snapshot] +def test_binding_resolver_uses_active_snapshot_for_roster_agent(monkeypatch: pytest.MonkeyPatch): + binding = _binding() + binding.binding_type = WorkflowAgentBindingType.ROSTER_AGENT + binding.current_snapshot_id = "old-snapshot" + agent = _agent() + agent.active_config_snapshot_id = "active-snapshot" + snapshot = _snapshot() + snapshot.id = "active-snapshot" + fake_session = FakeSession([binding, agent, snapshot]) + monkeypatch.setattr( + "core.workflow.nodes.agent_v2.binding_resolver.session_factory.create_session", + lambda: fake_session, + ) + + bundle = WorkflowAgentBindingResolver().resolve(**_resolve()) + + assert bundle.snapshot.id == "active-snapshot" + + def test_binding_resolver_raises_when_binding_missing(monkeypatch: pytest.MonkeyPatch): monkeypatch.setattr( "core.workflow.nodes.agent_v2.binding_resolver.session_factory.create_session", diff --git a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_validators.py b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_validators.py index 7237f01319b..440bd49e5c0 100644 --- a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_validators.py +++ b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_validators.py @@ -8,7 +8,7 @@ from core.workflow.nodes.agent_v2.validators import ( WorkflowAgentNodeValidationError, WorkflowAgentNodeValidator, ) -from models.agent import Agent, AgentConfigSnapshot, AgentStatus, WorkflowAgentNodeBinding +from models.agent import Agent, AgentConfigSnapshot, AgentStatus, WorkflowAgentBindingType, WorkflowAgentNodeBinding from models.agent_config_entities import AgentSoulConfig, AgentSoulModelConfig, WorkflowNodeJobConfig from models.workflow import Workflow @@ -111,6 +111,24 @@ def test_publish_validation_accepts_upstream_previous_output_ref(): ) +def test_publish_validation_uses_active_snapshot_for_roster_agent(): + node_job = WorkflowNodeJobConfig() + binding = _binding(node_job) + binding.binding_type = WorkflowAgentBindingType.ROSTER_AGENT + binding.current_snapshot_id = "old-snapshot" + agent = _agent() + agent.active_config_snapshot_id = "active-snapshot" + snapshot = _snapshot() + snapshot.id = "active-snapshot" + session = Mock() + session.scalar.side_effect = [binding, agent, snapshot] + + WorkflowAgentNodeValidator.validate_published_workflow( + session=session, + workflow=_workflow(_graph([{"source": "start", "target": "agent-node"}])), + ) + + def test_publish_validation_rejects_non_upstream_previous_output_ref(): node_job = WorkflowNodeJobConfig.model_validate( {"previous_node_output_refs": [{"node_id": "later-node", "output": "text"}]} diff --git a/api/tests/unit_tests/services/agent/test_agent_services.py b/api/tests/unit_tests/services/agent/test_agent_services.py index fd73d65d32d..f7bcde44f50 100644 --- a/api/tests/unit_tests/services/agent/test_agent_services.py +++ b/api/tests/unit_tests/services/agent/test_agent_services.py @@ -14,11 +14,14 @@ from models.agent import ( WorkflowAgentBindingType, WorkflowAgentNodeBinding, ) +from models.agent_config_entities import WorkflowNodeJobConfig +from models.workflow import Workflow from services.agent import composer_service, roster_service from services.agent.composer_service import AgentComposerService from services.agent.composer_validator import ComposerConfigValidator from services.agent.errors import InvalidComposerConfigError from services.agent.roster_service import AgentRosterService +from services.agent.workflow_publish_service import WorkflowAgentPublishService from services.entities.agent_entities import AgentSoulConfig, ComposerSavePayload, ComposerSaveStrategy, ComposerVariant @@ -35,6 +38,7 @@ class FakeSession: self._scalars = list(scalars or []) self._scalar = list(scalar or []) self.added = [] + self.deleted = [] self.commits = 0 self.flushes = 0 self.rollbacks = 0 @@ -52,6 +56,9 @@ class FakeSession: def add(self, value): self.added.append(value) + def delete(self, value): + self.deleted.append(value) + def flush(self): self.flushes += 1 for index, value in enumerate(self.added, start=1): @@ -84,10 +91,18 @@ def test_load_workflow_composer_returns_empty_state(monkeypatch): def test_load_workflow_composer_serializes_existing_binding(monkeypatch): - binding = SimpleNamespace(agent_id="agent-1", current_snapshot_id="version-1") + binding = SimpleNamespace( + agent_id="agent-1", + binding_type=WorkflowAgentBindingType.ROSTER_AGENT, + current_snapshot_id="version-1", + ) monkeypatch.setattr(AgentComposerService, "_get_draft_workflow", lambda **kwargs: SimpleNamespace(id="workflow-1")) monkeypatch.setattr(AgentComposerService, "_get_workflow_binding", lambda **kwargs: binding) - monkeypatch.setattr(AgentComposerService, "_get_agent_if_present", lambda **kwargs: SimpleNamespace(id="agent-1")) + monkeypatch.setattr( + AgentComposerService, + "_get_agent_if_present", + lambda **kwargs: SimpleNamespace(id="agent-1", active_config_snapshot_id="version-1"), + ) monkeypatch.setattr( AgentComposerService, "_get_version_if_present", @@ -116,14 +131,22 @@ def test_load_workflow_composer_serializes_existing_binding(monkeypatch): ) def test_save_workflow_composer_dispatches_save_strategy(monkeypatch, strategy, helper_name): fake_session = FakeSession() - binding = SimpleNamespace(agent_id="agent-1", current_snapshot_id="version-1") + binding = SimpleNamespace( + agent_id="agent-1", + binding_type=WorkflowAgentBindingType.ROSTER_AGENT, + current_snapshot_id="version-1", + ) calls = [] monkeypatch.setattr(composer_service.db, "session", fake_session) monkeypatch.setattr(composer_service.ComposerConfigValidator, "validate_save_payload", lambda payload: None) monkeypatch.setattr(AgentComposerService, "_get_draft_workflow", lambda **kwargs: SimpleNamespace(id="workflow-1")) monkeypatch.setattr(AgentComposerService, "_get_workflow_binding", lambda **kwargs: None) - monkeypatch.setattr(AgentComposerService, "_get_agent_if_present", lambda **kwargs: SimpleNamespace(id="agent-1")) + monkeypatch.setattr( + AgentComposerService, + "_get_agent_if_present", + lambda **kwargs: SimpleNamespace(id="agent-1", active_config_snapshot_id="version-1"), + ) monkeypatch.setattr( AgentComposerService, "_get_version_if_present", @@ -523,6 +546,7 @@ def test_roster_list_and_invite_options(monkeypatch): tenant_id="tenant-1", name="Analyst", description="", + role="researcher", agent_kind=AgentKind.DIFY_AGENT, scope=AgentScope.ROSTER, source=AgentSource.AGENT_APP, @@ -539,11 +563,13 @@ def test_roster_list_and_invite_options(monkeypatch): ) service = AgentRosterService(fake_session) monkeypatch.setattr(service, "_load_versions_by_id", lambda version_ids: {"version-1": version}) + monkeypatch.setattr(service, "_load_published_references_by_agent_id", lambda **kwargs: {}) listed = service.list_roster_agents(tenant_id="tenant-1", page=1, limit=20) invited = service.list_invite_options(tenant_id="tenant-1", page=1, limit=20, app_id="app-1") assert listed["data"][0]["active_config_snapshot"]["id"] == "version-1" + assert listed["data"][0]["role"] == "researcher" assert listed["data"][0]["created_at"] == int(created_at.timestamp()) assert listed["data"][0]["updated_at"] == int(updated_at.timestamp()) assert listed["data"][0]["active_config_snapshot"]["created_at"] == int(version_created_at.timestamp()) @@ -886,13 +912,19 @@ class TestListWorkflowsReferencingAppAgent: def test_groups_bindings_by_workflow_app_and_sorts_by_name(self): agent = SimpleNamespace(id="agent-1") bindings = [ - SimpleNamespace(app_id="wf-app-1", workflow_id="wf-1", node_id="node-b"), - SimpleNamespace(app_id="wf-app-1", workflow_id="wf-1", node_id="node-a"), - SimpleNamespace(app_id="wf-app-2", workflow_id="wf-2", node_id="node-a"), + SimpleNamespace( + agent_id="agent-1", app_id="wf-app-1", workflow_id="wf-1", workflow_version="v1", node_id="node-b" + ), + SimpleNamespace( + agent_id="agent-1", app_id="wf-app-1", workflow_id="wf-1", workflow_version="v1", node_id="node-a" + ), + SimpleNamespace( + agent_id="agent-1", app_id="wf-app-2", workflow_id="wf-2", workflow_version="v2", node_id="node-a" + ), ] apps = [ - SimpleNamespace(id="wf-app-1", name="Beta Flow", mode="workflow"), - SimpleNamespace(id="wf-app-2", name="Alpha Flow", mode="advanced-chat"), + SimpleNamespace(id="wf-app-1", name="Beta Flow", mode="workflow", workflow_id="wf-1"), + SimpleNamespace(id="wf-app-2", name="Alpha Flow", mode="advanced-chat", workflow_id="wf-2"), ] # scalar -> backing agent; scalars -> bindings, then resolved apps. session = FakeSession(scalar=[agent], scalars=[bindings, apps]) @@ -904,6 +936,7 @@ class TestListWorkflowsReferencingAppAgent: beta = next(r for r in result if r["app_id"] == "wf-app-1") assert beta["node_ids"] == ["node-a", "node-b"] # deduped + sorted assert beta["workflow_id"] == "wf-1" + assert beta["workflow_version"] == "v1" def test_returns_empty_when_no_backing_agent(self): session = FakeSession() # scalar() -> None @@ -920,12 +953,100 @@ class TestListWorkflowsReferencingAppAgent: def test_skips_orphaned_binding_whose_app_is_gone(self): agent = SimpleNamespace(id="agent-1") - bindings = [SimpleNamespace(app_id="wf-app-gone", workflow_id="wf-9", node_id="node-a")] + bindings = [ + SimpleNamespace( + agent_id="agent-1", app_id="wf-app-gone", workflow_id="wf-9", workflow_version="v9", node_id="node-a" + ) + ] session = FakeSession(scalar=[agent], scalars=[bindings, []]) # no apps resolved service = AgentRosterService(session) assert service.list_workflows_referencing_app_agent(tenant_id="tenant-1", app_id="app-1") == [] + def test_skips_historical_published_workflow_versions(self): + agent = SimpleNamespace(id="agent-1") + bindings = [ + SimpleNamespace( + agent_id="agent-1", app_id="wf-app-1", workflow_id="old-wf", workflow_version="old", node_id="old" + ), + SimpleNamespace( + agent_id="agent-1", app_id="wf-app-1", workflow_id="current-wf", workflow_version="v2", node_id="new" + ), + ] + apps = [SimpleNamespace(id="wf-app-1", name="Flow", mode="workflow", workflow_id="current-wf")] + session = FakeSession(scalar=[agent], scalars=[bindings, apps]) + service = AgentRosterService(session) + + result = service.list_workflows_referencing_app_agent(tenant_id="tenant-1", app_id="app-1") + + assert len(result) == 1 + assert result[0]["workflow_id"] == "current-wf" + assert result[0]["node_ids"] == ["new"] + + +class TestWorkflowAgentDraftBindingSync: + def test_creates_roster_binding_from_agent_node_graph(self): + workflow = Workflow( + id="workflow-1", + tenant_id="tenant-1", + app_id="app-1", + version=Workflow.VERSION_DRAFT, + graph='{"nodes":[{"id":"agent-node","data":{"type":"agent","version":"2","agent_binding":{"binding_type":"roster_agent","agent_id":"agent-1"}}}]}', + ) + agent = Agent( + id="agent-1", + tenant_id="tenant-1", + name="Agent", + agent_kind=AgentKind.DIFY_AGENT, + scope=AgentScope.ROSTER, + source=AgentSource.AGENT_APP, + status=AgentStatus.ACTIVE, + active_config_snapshot_id="snapshot-2", + ) + session = FakeSession(scalar=[agent], scalars=[[]]) + + WorkflowAgentPublishService.sync_roster_agent_bindings_for_draft( + session=session, + draft_workflow=workflow, + account_id="account-1", + ) + + binding = next(item for item in session.added if isinstance(item, WorkflowAgentNodeBinding)) + assert binding.binding_type == WorkflowAgentBindingType.ROSTER_AGENT + assert binding.agent_id == "agent-1" + assert binding.current_snapshot_id == "snapshot-2" + assert binding.node_job_config_dict == WorkflowNodeJobConfig().model_dump(mode="json") + + def test_deletes_draft_binding_when_agent_node_removed(self): + workflow = Workflow( + id="workflow-1", + tenant_id="tenant-1", + app_id="app-1", + version=Workflow.VERSION_DRAFT, + graph='{"nodes":[]}', + ) + stale_binding = WorkflowAgentNodeBinding( + id="binding-1", + tenant_id="tenant-1", + app_id="app-1", + workflow_id="workflow-1", + workflow_version=Workflow.VERSION_DRAFT, + node_id="removed-node", + binding_type=WorkflowAgentBindingType.ROSTER_AGENT, + agent_id="agent-1", + current_snapshot_id="snapshot-1", + node_job_config=WorkflowNodeJobConfig(), + ) + session = FakeSession(scalars=[[stale_binding]]) + + WorkflowAgentPublishService.sync_roster_agent_bindings_for_draft( + session=session, + draft_workflow=workflow, + account_id="account-1", + ) + + assert session.deleted == [stale_binding] + def test_dataset_rows_filters_malformed_ids(monkeypatch): """Mention ids are user-editable text: a non-UUID id must read as missing diff --git a/packages/contracts/generated/api/console/agents/types.gen.ts b/packages/contracts/generated/api/console/agents/types.gen.ts index 9b3eab813d8..4aa49fc18f6 100644 --- a/packages/contracts/generated/api/console/agents/types.gen.ts +++ b/packages/contracts/generated/api/console/agents/types.gen.ts @@ -19,6 +19,7 @@ export type RosterAgentCreatePayload = { icon_background?: string | null icon_type?: AgentIconType name: string + role?: string version_note?: string | null } @@ -37,6 +38,10 @@ export type AgentRosterResponse = { icon_type?: AgentIconType id: string name: string + published_node_reference_count?: number + published_reference_count?: number + published_references?: Array + role?: string scope: AgentScope source: AgentSource status: AgentStatus @@ -60,6 +65,7 @@ export type RosterAgentUpdatePayload = { icon_background?: string | null icon_type?: AgentIconType name?: string | null + role?: string | null } export type AgentConfigSnapshotListResponse = { @@ -108,6 +114,15 @@ export type AgentConfigSnapshotSummaryResponse = { export type AgentKind = 'dify_agent' +export type AgentPublishedReferenceResponse = { + app_id: string + app_mode: string + app_name: string + node_ids?: Array + workflow_id: string + workflow_version: string +} + export type AgentScope = 'roster' | 'workflow_only' export type AgentSource = 'agent_app' | 'imported' | 'system' | 'workflow' @@ -132,6 +147,10 @@ export type AgentInviteOptionResponse = { in_current_workflow_count?: number is_in_current_workflow?: boolean name: string + published_node_reference_count?: number + published_reference_count?: number + published_references?: Array + role?: string scope: AgentScope source: AgentSource status: AgentStatus diff --git a/packages/contracts/generated/api/console/agents/zod.gen.ts b/packages/contracts/generated/api/console/agents/zod.gen.ts index eeb6c01dc40..35de844cf2d 100644 --- a/packages/contracts/generated/api/console/agents/zod.gen.ts +++ b/packages/contracts/generated/api/console/agents/zod.gen.ts @@ -18,6 +18,7 @@ export const zRosterAgentUpdatePayload = z.object({ icon_background: z.string().max(255).nullish(), icon_type: zAgentIconType.optional(), name: z.string().min(1).max(255).nullish(), + role: z.string().max(255).nullish(), }) /** @@ -50,6 +51,18 @@ export const zAgentConfigSnapshotListResponse = z.object({ */ export const zAgentKind = z.enum(['dify_agent']) +/** + * AgentPublishedReferenceResponse + */ +export const zAgentPublishedReferenceResponse = z.object({ + app_id: z.string(), + app_mode: z.string(), + app_name: z.string(), + node_ids: z.array(z.string()).optional(), + workflow_id: z.string(), + workflow_version: z.string(), +}) + /** * AgentScope * @@ -89,6 +102,10 @@ export const zAgentRosterResponse = z.object({ icon_type: zAgentIconType.optional(), id: z.string(), name: z.string(), + published_node_reference_count: z.int().optional().default(0), + published_reference_count: z.int().optional().default(0), + published_references: z.array(zAgentPublishedReferenceResponse).optional(), + role: z.string().optional().default(''), scope: zAgentScope, source: zAgentSource, status: zAgentStatus, @@ -130,6 +147,10 @@ export const zAgentInviteOptionResponse = z.object({ in_current_workflow_count: z.int().optional().default(0), is_in_current_workflow: z.boolean().optional().default(false), name: z.string(), + published_node_reference_count: z.int().optional().default(0), + published_reference_count: z.int().optional().default(0), + published_references: z.array(zAgentPublishedReferenceResponse).optional(), + role: z.string().optional().default(''), scope: zAgentScope, source: zAgentSource, status: zAgentStatus, @@ -632,6 +653,7 @@ export const zRosterAgentCreatePayload = z.object({ icon_background: z.string().max(255).nullish(), icon_type: zAgentIconType.optional(), name: z.string().min(1).max(255), + role: z.string().max(255).optional().default(''), version_note: z.string().nullish(), })