diff --git a/api/controllers/console/agent/composer.py b/api/controllers/console/agent/composer.py index 7f7370454c5..8d2297a1b82 100644 --- a/api/controllers/console/agent/composer.py +++ b/api/controllers/console/agent/composer.py @@ -90,10 +90,12 @@ class WorkflowAgentComposerValidateApi(Resource): @login_required @account_initialization_required @get_app_model(mode=[AppMode.WORKFLOW, AppMode.ADVANCED_CHAT]) - def post(self, app_model: App, node_id: str): + @with_current_tenant_id + def post(self, tenant_id: str, app_model: App, node_id: str): payload = ComposerSavePayload.model_validate(console_ns.payload or {}) ComposerConfigValidator.validate_save_payload(payload) - return dump_response(AgentComposerValidateResponse, {"result": "success", "errors": []}) + findings = AgentComposerService.collect_validation_findings(tenant_id=tenant_id, payload=payload) + return dump_response(AgentComposerValidateResponse, {"result": "success", "errors": [], **findings}) @console_ns.route("/apps//workflows/draft/nodes//agent-composer/candidates") @@ -105,10 +107,17 @@ class WorkflowAgentComposerCandidatesApi(Resource): @login_required @account_initialization_required @get_app_model(mode=[AppMode.WORKFLOW, AppMode.ADVANCED_CHAT]) - def get(self, app_model: App, node_id: str): + @with_current_user_id + @with_current_tenant_id + def get(self, tenant_id: str, current_user_id: str, app_model: App, node_id: str): return dump_response( AgentComposerCandidatesResponse, - AgentComposerService.get_workflow_candidates(app_id=app_model.id), + AgentComposerService.get_workflow_candidates( + tenant_id=tenant_id, + app_id=app_model.id, + node_id=node_id, + user_id=current_user_id, + ), ) @@ -167,7 +176,7 @@ class AgentAppComposerApi(Resource): @setup_required @login_required @account_initialization_required - @get_app_model() + @get_app_model(mode=[AppMode.AGENT]) @with_current_tenant_id def get(self, tenant_id: str, app_model: App): return dump_response( @@ -181,7 +190,7 @@ class AgentAppComposerApi(Resource): @login_required @account_initialization_required @edit_permission_required - @get_app_model() + @get_app_model(mode=[AppMode.AGENT]) @with_current_user_id @with_current_tenant_id def put(self, tenant_id: str, account_id: str, app_model: App): @@ -206,11 +215,13 @@ class AgentAppComposerValidateApi(Resource): @setup_required @login_required @account_initialization_required - @get_app_model() - def post(self, app_model: App): + @get_app_model(mode=[AppMode.AGENT]) + @with_current_tenant_id + def post(self, tenant_id: str, app_model: App): payload = ComposerSavePayload.model_validate(console_ns.payload or {}) ComposerConfigValidator.validate_save_payload(payload) - return dump_response(AgentComposerValidateResponse, {"result": "success", "errors": []}) + findings = AgentComposerService.collect_validation_findings(tenant_id=tenant_id, payload=payload) + return dump_response(AgentComposerValidateResponse, {"result": "success", "errors": [], **findings}) @console_ns.route("/apps//agent-composer/candidates") @@ -221,9 +232,15 @@ class AgentAppComposerCandidatesApi(Resource): @setup_required @login_required @account_initialization_required - @get_app_model() - def get(self, app_model: App): + @get_app_model(mode=[AppMode.AGENT]) + @with_current_user_id + @with_current_tenant_id + def get(self, tenant_id: str, current_user_id: str, app_model: App): return dump_response( AgentComposerCandidatesResponse, - AgentComposerService.get_agent_app_candidates(app_id=app_model.id), + AgentComposerService.get_agent_app_candidates( + tenant_id=tenant_id, + app_id=app_model.id, + user_id=current_user_id, + ), ) diff --git a/api/core/app/apps/agent_app/runtime_request_builder.py b/api/core/app/apps/agent_app/runtime_request_builder.py index 9d93161a98d..df1e161b641 100644 --- a/api/core/app/apps/agent_app/runtime_request_builder.py +++ b/api/core/app/apps/agent_app/runtime_request_builder.py @@ -35,6 +35,7 @@ from core.workflow.nodes.agent_v2.plugin_tools_builder import ( from core.workflow.nodes.agent_v2.runtime_request_builder import build_shell_layer_config from models.agent_config_entities import AgentSoulConfig from models.provider_ids import ModelProviderID +from services.agent.prompt_mentions import build_soul_mention_resolver, expand_prompt_mentions class AgentAppRuntimeRequestBuildError(ValueError): @@ -135,7 +136,12 @@ class AgentAppRuntimeRequestBuilder: invoke_from=cast(DifyExecutionContextInvokeFrom, context.dify_context.invoke_from.value), agent_mode="agent_app", ), - agent_soul_prompt=agent_soul.prompt.system_prompt or None, + # ENG-616: expand slash-menu mention tokens to canonical names so + # no frontend-internal {{#…#}} marker ever reaches the model. + agent_soul_prompt=expand_prompt_mentions( + agent_soul.prompt.system_prompt, build_soul_mention_resolver(agent_soul) + ).strip() + or None, user_prompt=context.user_query, tools=tools_layer, include_shell=dify_config.AGENT_SHELL_ENABLED, diff --git a/api/core/workflow/graph_topology.py b/api/core/workflow/graph_topology.py new file mode 100644 index 00000000000..e5fe849840e --- /dev/null +++ b/api/core/workflow/graph_topology.py @@ -0,0 +1,86 @@ +"""Draft-workflow graph topology helper, shared by Agent v2 publish validation +and the agent-composer candidates endpoint (ENG-615). + +Extracted from ``core/workflow/nodes/agent_v2/validators.py`` so both call sites +parse the same ``Workflow.graph`` JSON shape (``nodes`` with string ids, +``edges`` with ``source``/``target``). +""" + +from __future__ import annotations + +from collections import defaultdict, deque +from collections.abc import Mapping, Sequence +from typing import Any + + +class WorkflowGraphTopology: + def __init__(self, *, node_ids: set[str], incoming: Mapping[str, Sequence[str]]) -> None: + self._node_ids = node_ids + self._incoming = incoming + + @classmethod + def from_graph(cls, graph: Mapping[str, Any]) -> WorkflowGraphTopology: + node_ids = cls._node_ids_from_graph(graph) + incoming: dict[str, list[str]] = defaultdict(list) + edges = graph.get("edges") + if isinstance(edges, list): + for edge in edges: + if not isinstance(edge, Mapping): + continue + source = edge.get("source") + target = edge.get("target") + if isinstance(source, str) and isinstance(target, str): + incoming[target].append(source) + return cls(node_ids=node_ids, incoming=incoming) + + def has_node(self, node_id: str) -> bool: + return node_id in self._node_ids + + def is_upstream(self, *, source_node_id: str, target_node_id: str) -> bool: + if source_node_id == target_node_id: + return False + visited: set[str] = set() + queue: deque[str] = deque(self._incoming.get(target_node_id, ())) + while queue: + candidate = queue.popleft() + if candidate == source_node_id: + return True + if candidate in visited: + continue + visited.add(candidate) + queue.extend(self._incoming.get(candidate, ())) + return False + + def upstream_node_ids(self, target_node_id: str) -> set[str]: + """All graph nodes reachable upstream of ``target_node_id`` (excluding it). + + Edges may reference ids missing from ``nodes`` (half-deleted graphs); + only real nodes are returned. + """ + visited: set[str] = set() + queue: deque[str] = deque(self._incoming.get(target_node_id, ())) + while queue: + candidate = queue.popleft() + if candidate in visited: + continue + visited.add(candidate) + queue.extend(self._incoming.get(candidate, ())) + visited.discard(target_node_id) + return visited & self._node_ids + + @staticmethod + def _node_ids_from_graph(graph: Mapping[str, Any]) -> set[str]: + node_ids: set[str] = set() + nodes = graph.get("nodes") + if not isinstance(nodes, list): + return node_ids + for node in nodes: + if not isinstance(node, Mapping): + continue + node_id = node.get("id") + if isinstance(node_id, str): + node_ids.add(node_id) + return node_ids + + +__all__ = ["WorkflowGraphTopology"] diff --git a/api/core/workflow/nodes/agent_v2/runtime_request_builder.py b/api/core/workflow/nodes/agent_v2/runtime_request_builder.py index e1f9fbdaba6..1a85b14d286 100644 --- a/api/core/workflow/nodes/agent_v2/runtime_request_builder.py +++ b/api/core/workflow/nodes/agent_v2/runtime_request_builder.py @@ -45,6 +45,11 @@ from models.agent_config_entities import ( effective_declared_outputs as _effective_declared_outputs, ) from models.provider_ids import ModelProviderID +from services.agent.prompt_mentions import ( + build_node_job_mention_resolver, + build_soul_mention_resolver, + expand_prompt_mentions, +) from .output_failure_orchestrator import retry_idempotency_key from .plugin_tools_builder import WorkflowAgentPluginToolsBuilder, WorkflowAgentPluginToolsBuildError @@ -129,7 +134,16 @@ class WorkflowAgentRuntimeRequestBuilder: metadata = self._build_metadata(context, agent_soul, node_job) workflow_context_prompt = self._build_workflow_context_prompt(context, node_job) - workflow_job_prompt = node_job.workflow_prompt.strip() or "Run this workflow Agent Node for the current run." + # ENG-616: expand slash-menu mention tokens into model-readable names. + # node_output mentions expand to their reference name only — the value + # stays in the Workflow context block (user_prompt) below. + workflow_job_prompt = ( + expand_prompt_mentions(node_job.workflow_prompt, build_node_job_mention_resolver(node_job)).strip() + or "Run this workflow Agent Node for the current run." + ) + soul_prompt = expand_prompt_mentions( + agent_soul.prompt.system_prompt, build_soul_mention_resolver(agent_soul) + ).strip() user_prompt = workflow_context_prompt.strip() or "Use the current workflow context." credentials = self._credentials_provider.fetch(agent_soul.model.model_provider, agent_soul.model.model) try: @@ -187,7 +201,7 @@ class WorkflowAgentRuntimeRequestBuilder: agent_mode=self._agent_backend_agent_mode(context.dify_context.invoke_from), invoke_from=cast(DifyExecutionContextInvokeFrom, context.dify_context.invoke_from.value), ), - agent_soul_prompt=agent_soul.prompt.system_prompt or None, + agent_soul_prompt=soul_prompt or None, workflow_node_job_prompt=workflow_job_prompt, user_prompt=user_prompt, output=self._build_output_config(node_job.declared_outputs), diff --git a/api/core/workflow/nodes/agent_v2/validators.py b/api/core/workflow/nodes/agent_v2/validators.py index cb2e54bda79..832a1a5e152 100644 --- a/api/core/workflow/nodes/agent_v2/validators.py +++ b/api/core/workflow/nodes/agent_v2/validators.py @@ -1,12 +1,12 @@ from __future__ import annotations -from collections import defaultdict, deque -from collections.abc import Iterator, Mapping, Sequence +from collections.abc import Iterator, Mapping from typing import Any from sqlalchemy import select from sqlalchemy.orm import Session +from core.workflow.graph_topology import WorkflowGraphTopology from graphon.enums import BuiltinNodeTypes from models.agent import Agent, AgentConfigSnapshot, AgentStatus, WorkflowAgentNodeBinding from models.agent_config_entities import ( @@ -523,54 +523,6 @@ class WorkflowAgentNodeValidator: ) -class _WorkflowGraphTopology: - def __init__(self, *, node_ids: set[str], incoming: Mapping[str, Sequence[str]]) -> None: - self._node_ids = node_ids - self._incoming = incoming - - @classmethod - def from_graph(cls, graph: Mapping[str, Any]) -> _WorkflowGraphTopology: - node_ids = cls._node_ids_from_graph(graph) - incoming: dict[str, list[str]] = defaultdict(list) - edges = graph.get("edges") - if isinstance(edges, list): - for edge in edges: - if not isinstance(edge, Mapping): - continue - source = edge.get("source") - target = edge.get("target") - if isinstance(source, str) and isinstance(target, str): - incoming[target].append(source) - return cls(node_ids=node_ids, incoming=incoming) - - def has_node(self, node_id: str) -> bool: - return node_id in self._node_ids - - def is_upstream(self, *, source_node_id: str, target_node_id: str) -> bool: - if source_node_id == target_node_id: - return False - visited: set[str] = set() - queue: deque[str] = deque(self._incoming.get(target_node_id, ())) - while queue: - candidate = queue.popleft() - if candidate == source_node_id: - return True - if candidate in visited: - continue - visited.add(candidate) - queue.extend(self._incoming.get(candidate, ())) - return False - - @staticmethod - def _node_ids_from_graph(graph: Mapping[str, Any]) -> set[str]: - node_ids: set[str] = set() - nodes = graph.get("nodes") - if not isinstance(nodes, list): - return node_ids - for node in nodes: - if not isinstance(node, Mapping): - continue - node_id = node.get("id") - if isinstance(node_id, str): - node_ids.add(node_id) - return node_ids +# Extracted to core/workflow/graph_topology.py (shared with the agent-composer +# candidates endpoint, ENG-615); kept as a private alias for existing call sites. +_WorkflowGraphTopology = WorkflowGraphTopology diff --git a/api/fields/agent_fields.py b/api/fields/agent_fields.py index cef5d9fc889..d27a5708bf6 100644 --- a/api/fields/agent_fields.py +++ b/api/fields/agent_fields.py @@ -1,4 +1,4 @@ -from typing import Literal +from typing import Annotated, Literal from pydantic import Field @@ -14,6 +14,7 @@ from models.agent import ( ) from models.agent_config_entities import ( AgentCliToolConfig, + AgentFileRefConfig, AgentHumanContactConfig, AgentKnowledgeDatasetConfig, AgentSkillRefConfig, @@ -154,6 +155,7 @@ class WorkflowAgentComposerResponse(ResponseModel): effective_declared_outputs: list[DeclaredOutputConfig] = Field(default_factory=list) save_options: list[ComposerSaveStrategy] impact_summary: AgentComposerImpactResponse | None = None + validation: "ComposerValidationFindingsResponse | None" = None app_id: str | None = None workflow_id: str | None = None node_id: str | None = None @@ -165,11 +167,32 @@ class AgentAppComposerResponse(ResponseModel): active_config_snapshot: AgentConfigSnapshotSummaryResponse agent_soul: AgentSoulConfig save_options: list[ComposerSaveStrategy] + validation: "ComposerValidationFindingsResponse | None" = None + + +class ComposerValidationWarningResponse(ResponseModel): + code: str + surface: str | None = None + kind: str | None = None + id: str | None = None + message: str | None = None + + +class ComposerKnowledgePlaceholderResponse(ResponseModel): + id: str + placeholder_name: str + + +class ComposerValidationFindingsResponse(ResponseModel): + warnings: list[ComposerValidationWarningResponse] = Field(default_factory=list) + knowledge_retrieval_placeholder: list[ComposerKnowledgePlaceholderResponse] = Field(default_factory=list) class AgentComposerValidateResponse(ResponseModel): result: Literal["success"] errors: list[str] = Field(default_factory=list) + warnings: list[ComposerValidationWarningResponse] = Field(default_factory=list) + knowledge_retrieval_placeholder: list[ComposerKnowledgePlaceholderResponse] = Field(default_factory=list) class AgentComposerDifyToolCandidateResponse(ResponseModel): @@ -181,6 +204,20 @@ class AgentComposerDifyToolCandidateResponse(ResponseModel): plugin_id: str | None = None +class AgentComposerSkillCandidateResponse(AgentSkillRefConfig): + kind: Literal["skill"] = "skill" + + +class AgentComposerFileCandidateResponse(AgentFileRefConfig): + kind: Literal["file"] = "file" + + +AgentComposerSkillFileCandidateResponse = Annotated[ + AgentComposerSkillCandidateResponse | AgentComposerFileCandidateResponse, + Field(discriminator="kind"), +] + + class AgentComposerNodeJobCandidatesResponse(ResponseModel): previous_node_outputs: list[WorkflowPreviousNodeOutputRef] = Field(default_factory=list) declare_output_types: list[DeclaredOutputType] = Field(default_factory=list) @@ -188,7 +225,7 @@ class AgentComposerNodeJobCandidatesResponse(ResponseModel): class AgentComposerSoulCandidatesResponse(ResponseModel): - skills_files: list[AgentSkillRefConfig] = Field(default_factory=list) + skills_files: list[AgentComposerSkillFileCandidateResponse] = Field(default_factory=list) dify_tools: list[AgentComposerDifyToolCandidateResponse] = Field(default_factory=list) cli_tools: list[AgentCliToolConfig] = Field(default_factory=list) knowledge_datasets: list[AgentKnowledgeDatasetConfig] = Field(default_factory=list) @@ -204,3 +241,4 @@ class AgentComposerCandidatesResponse(ResponseModel): default_factory=AgentComposerSoulCandidatesResponse ) capabilities: ComposerCandidateCapabilities = Field(default_factory=ComposerCandidateCapabilities) + truncated: bool = False diff --git a/api/openapi/markdown/console-swagger.md b/api/openapi/markdown/console-swagger.md index 69f57446554..8fd1a57e200 100644 --- a/api/openapi/markdown/console-swagger.md +++ b/api/openapi/markdown/console-swagger.md @@ -11808,6 +11808,7 @@ Get banner list | agent | [AgentComposerAgentResponse](#agentcomposeragentresponse) | | Yes | | agent_soul | [AgentSoulConfig](#agentsoulconfig) | | Yes | | save_options | [ [ComposerSaveStrategy](#composersavestrategy) ] | | Yes | +| validation | [ComposerValidationFindingsResponse](#composervalidationfindingsresponse) | | No | | variant | string | | Yes | #### AgentAppFeaturesPayload @@ -11903,6 +11904,7 @@ Risk marker for CLI tool bootstrap commands. | allowed_node_job_candidates | [AgentComposerNodeJobCandidatesResponse](#agentcomposernodejobcandidatesresponse) | | No | | allowed_soul_candidates | [AgentComposerSoulCandidatesResponse](#agentcomposersoulcandidatesresponse) | | No | | capabilities | [ComposerCandidateCapabilities](#composercandidatecapabilities) | | No | +| truncated | boolean | | No | | variant | [ComposerVariant](#composervariant) | | Yes | #### AgentComposerDifyToolCandidateResponse @@ -11916,6 +11918,22 @@ Risk marker for CLI tool bootstrap commands. | provider | string | | No | | provider_id | string | | No | +#### AgentComposerFileCandidateResponse + +| Name | Type | Description | Required | +| ---- | ---- | ----------- | -------- | +| file_id | string | | No | +| id | string | | No | +| kind | string | | No | +| name | string | | No | +| reference | string | | No | +| remote_url | string | | No | +| tenant_id | string | | No | +| transfer_method | string | | No | +| type | string | | No | +| upload_file_id | string | | No | +| url | string | | No | + #### AgentComposerImpactBindingResponse | Name | Type | Description | Required | @@ -11940,6 +11958,17 @@ Risk marker for CLI tool bootstrap commands. | human_contacts | [ [AgentHumanContactConfig](#agenthumancontactconfig) ] | | No | | previous_node_outputs | [ [WorkflowPreviousNodeOutputRef](#workflowpreviousnodeoutputref) ] | | No | +#### AgentComposerSkillCandidateResponse + +| Name | Type | Description | Required | +| ---- | ---- | ----------- | -------- | +| description | string | | No | +| file_id | string | | No | +| id | string | | No | +| kind | string | | No | +| name | string | | No | +| path | string | | No | + #### AgentComposerSoulCandidatesResponse | Name | Type | Description | Required | @@ -11948,7 +11977,7 @@ Risk marker for CLI tool bootstrap commands. | dify_tools | [ [AgentComposerDifyToolCandidateResponse](#agentcomposerdifytoolcandidateresponse) ] | | No | | human_contacts | [ [AgentHumanContactConfig](#agenthumancontactconfig) ] | | No | | knowledge_datasets | [ [AgentKnowledgeDatasetConfig](#agentknowledgedatasetconfig) ] | | No | -| skills_files | [ [AgentSkillRefConfig](#agentskillrefconfig) ] | | No | +| skills_files | [ ] | | No | #### AgentComposerSoulLockResponse @@ -11963,7 +11992,9 @@ Risk marker for CLI tool bootstrap commands. | Name | Type | Description | Required | | ---- | ---- | ----------- | -------- | | errors | [ string ] | | No | +| knowledge_retrieval_placeholder | [ [ComposerKnowledgePlaceholderResponse](#composerknowledgeplaceholderresponse) ] | | No | | result | string | | Yes | +| warnings | [ [ComposerValidationWarningResponse](#composervalidationwarningresponse) ] | | No | #### AgentConfigRevisionOperation @@ -13286,6 +13317,13 @@ Button styles for user actions. | ---- | ---- | ----------- | -------- | | human_roster_available | boolean | | No | +#### ComposerKnowledgePlaceholderResponse + +| Name | Type | Description | Required | +| ---- | ---- | ----------- | -------- | +| id | string | | Yes | +| placeholder_name | string | | Yes | + #### ComposerSavePayload | Name | Type | Description | Required | @@ -13314,6 +13352,23 @@ Button styles for user actions. | locked | boolean | | No | | unlocked_from_version_id | string | | No | +#### ComposerValidationFindingsResponse + +| Name | Type | Description | Required | +| ---- | ---- | ----------- | -------- | +| knowledge_retrieval_placeholder | [ [ComposerKnowledgePlaceholderResponse](#composerknowledgeplaceholderresponse) ] | | No | +| warnings | [ [ComposerValidationWarningResponse](#composervalidationwarningresponse) ] | | No | + +#### ComposerValidationWarningResponse + +| Name | Type | Description | Required | +| ---- | ---- | ----------- | -------- | +| code | string | | Yes | +| id | string | | No | +| kind | string | | No | +| message | string | | No | +| surface | string | | No | + #### ComposerVariant | Name | Type | Description | Required | @@ -17571,6 +17626,7 @@ How a workflow node is bound to an Agent. | node_job | [WorkflowNodeJobConfig](#workflownodejobconfig) | | Yes | | save_options | [ [ComposerSaveStrategy](#composersavestrategy) ] | | Yes | | soul_lock | [AgentComposerSoulLockResponse](#agentcomposersoullockresponse) | | Yes | +| validation | [ComposerValidationFindingsResponse](#composervalidationfindingsresponse) | | No | | variant | string | | Yes | | workflow_id | string | | No | diff --git a/api/services/agent/composer_candidates.py b/api/services/agent/composer_candidates.py new file mode 100644 index 00000000000..0a1419be399 --- /dev/null +++ b/api/services/agent/composer_candidates.py @@ -0,0 +1,210 @@ +"""Slash-menu candidates assembly (ENG-615). + +Pure assembly over injected loaders so the upstream-graph computation and the +per-source mapping are unit-testable without a database. IO wiring (draft +workflow / bindings / draft variables / datasets / workspace tools) lives in +``AgentComposerService.get_*_candidates``. + +``previous_node_outputs`` entries are emitted in the stored +``WorkflowPreviousNodeOutputRef`` shape (``selector``/``node_id``/``output``/ +``name``) so the frontend can write a selected candidate back into +``node_job.previous_node_output_refs`` verbatim; display extras +(``node_title``/``node_kind``/``value_type``/``inferred``) ride along via the +flexible config schema. Output enumeration follows the Node Output Inspector: +start variables + recorded ``sys.*`` variables are static, Agent v2 nodes use +their binding's declared outputs, and every other node kind is inferred from +the latest draft-run variables (``inferred: true``). +""" + +from __future__ import annotations + +from collections.abc import Callable, Mapping +from typing import Any + +from models.agent_config_entities import ( + AgentSoulConfig, + DeclaredOutputConfig, +) + +MAX_CANDIDATES_PER_LIST = 200 + +_SYSTEM_NODE_ID = "sys" + +# loader signatures injected by the service layer +DeclaredOutputsLoader = Callable[[str], list[DeclaredOutputConfig] | None] +DraftVariablesLoader = Callable[[str], list[tuple[str, str | None]]] +SystemVariablesLoader = Callable[[], list[tuple[str, str | None]]] +DatasetLookup = Callable[[list[str]], Mapping[str, Any]] +WorkspaceToolsLoader = Callable[[], list[dict[str, Any]]] + + +def previous_node_output_candidates( + *, + graph: Mapping[str, Any], + node_id: str, + declared_outputs_loader: DeclaredOutputsLoader, + draft_variables_loader: DraftVariablesLoader, + system_variables_loader: SystemVariablesLoader, +) -> tuple[list[dict[str, Any]], bool]: + """Enumerate upstream node outputs for ``node_id`` as writable ref candidates.""" + from core.workflow.graph_topology import WorkflowGraphTopology + + topology = WorkflowGraphTopology.from_graph(graph) + upstream = topology.upstream_node_ids(node_id) + + entries: list[dict[str, Any]] = [] + for name, value_type in system_variables_loader(): + entries.append( + _ref_entry( + node_id=_SYSTEM_NODE_ID, + output=name, + node_title="System", + node_kind="system", + value_type=value_type, + inferred=True, + ) + ) + + nodes = graph.get("nodes") + for node in nodes if isinstance(nodes, list) else []: + if not isinstance(node, Mapping): + continue + nid = node.get("id") + if not isinstance(nid, str) or nid not in upstream: + continue + raw_data = node.get("data") + data: Mapping[str, Any] = raw_data if isinstance(raw_data, Mapping) else {} + kind = str(data.get("type") or "unknown") + title = str(data.get("title") or nid) + + if kind == "start": + for variable in data.get("variables") or []: + if not isinstance(variable, Mapping): + continue + var_name = variable.get("variable") + if isinstance(var_name, str) and var_name: + entries.append( + _ref_entry( + node_id=nid, + output=var_name, + node_title=title, + node_kind=kind, + value_type=variable.get("type") if isinstance(variable.get("type"), str) else None, + inferred=False, + ) + ) + continue + + declared: list[DeclaredOutputConfig] | None = None + if kind == "agent" and str(data.get("version", "")) == "2": + declared = declared_outputs_loader(nid) + if declared is not None: + for output in declared: + entries.append( + _ref_entry( + node_id=nid, + output=output.name, + node_title=title, + node_kind=kind, + value_type=output.type.value, + inferred=False, + ) + ) + continue + + for var_name, value_type in draft_variables_loader(nid): + entries.append( + _ref_entry( + node_id=nid, + output=var_name, + node_title=title, + node_kind=kind, + value_type=value_type, + inferred=True, + ) + ) + + return _capped(entries) + + +def soul_candidates( + *, + agent_soul: AgentSoulConfig | None, + dataset_lookup: DatasetLookup, + workspace_tools_loader: WorkspaceToolsLoader, +) -> tuple[dict[str, list[dict[str, Any]]], bool]: + """Assemble the soul-surface candidate lists (design §3.2).""" + soul = agent_soul or AgentSoulConfig() + truncated = False + + skills_files = [{"kind": "skill", **skill.model_dump(exclude_none=True)} for skill in soul.skills_files.skills] + skills_files += [{"kind": "file", **file.model_dump(exclude_none=True)} for file in soul.skills_files.files] + + cli_tools = [tool.model_dump(exclude_none=True) for tool in soul.tools.cli_tools if tool.enabled] + + dataset_ids = [dataset.id for dataset in soul.knowledge.datasets if dataset.id] + dataset_rows = dataset_lookup(dataset_ids) if dataset_ids else {} + knowledge_datasets: list[dict[str, Any]] = [] + for dataset in soul.knowledge.datasets: + if not dataset.id: + continue + row = dataset_rows.get(dataset.id) + knowledge_datasets.append( + { + "id": dataset.id, + "name": (getattr(row, "name", None) or dataset.name or dataset.id), + "description": getattr(row, "description", None) or dataset.description, + "missing": row is None, + } + ) + + human_contacts = [contact.model_dump(exclude_none=True) for contact in soul.human.contacts] + dify_tools = workspace_tools_loader() + + lists = { + "skills_files": skills_files, + "dify_tools": dify_tools, + "cli_tools": cli_tools, + "knowledge_datasets": knowledge_datasets, + "human_contacts": human_contacts, + } + capped: dict[str, list[dict[str, Any]]] = {} + for key, values in lists.items(): + clipped, was_clipped = _capped(values) + truncated = truncated or was_clipped + capped[key] = clipped + return capped, truncated + + +def _ref_entry( + *, + node_id: str, + output: str, + node_title: str, + node_kind: str, + value_type: str | None, + inferred: bool, +) -> dict[str, Any]: + return { + "selector": [node_id, output], + "node_id": node_id, + "output": output, + "name": f"{node_title}/{output}", + "node_title": node_title, + "node_kind": node_kind, + "value_type": value_type, + "inferred": inferred, + } + + +def _capped(values: list[dict[str, Any]]) -> tuple[list[dict[str, Any]], bool]: + if len(values) > MAX_CANDIDATES_PER_LIST: + return values[:MAX_CANDIDATES_PER_LIST], True + return values, False + + +__all__ = [ + "MAX_CANDIDATES_PER_LIST", + "previous_node_output_candidates", + "soul_candidates", +] diff --git a/api/services/agent/composer_service.py b/api/services/agent/composer_service.py index 58500274d88..628c93ec389 100644 --- a/api/services/agent/composer_service.py +++ b/api/services/agent/composer_service.py @@ -1,3 +1,4 @@ +import logging from typing import Any from sqlalchemy import func, select @@ -39,6 +40,8 @@ from services.entities.agent_entities import ( # Mirrors Workflow.version when it is "draft" (see models/workflow.py). _DRAFT_WORKFLOW_VERSION = "draft" +logger = logging.getLogger(__name__) + class AgentComposerService: @classmethod @@ -108,7 +111,9 @@ class AgentComposerService: agent_id=agent.id if agent else None, version_id=binding.current_snapshot_id, ) - return cls._serialize_workflow_state(binding=binding, agent=agent, version=version) + state = cls._serialize_workflow_state(binding=binding, agent=agent, version=version) + state["validation"] = cls.collect_validation_findings(tenant_id=tenant_id, payload=payload) + return state @classmethod def load_agent_app_composer(cls, *, tenant_id: str, app_id: str) -> dict[str, Any]: @@ -205,42 +210,241 @@ class AgentComposerService: agent.updated_by = account_id db.session.commit() - return cls.load_agent_app_composer(tenant_id=tenant_id, app_id=app_id) + state = cls.load_agent_app_composer(tenant_id=tenant_id, app_id=app_id) + state["validation"] = cls.collect_validation_findings(tenant_id=tenant_id, payload=payload) + return state @classmethod - def get_workflow_candidates(cls, *, app_id: str) -> dict[str, Any]: + def collect_validation_findings(cls, *, tenant_id: str, payload: ComposerSavePayload) -> dict[str, Any]: + """ENG-617 soft findings, with DB-backed dataset existence for placeholders.""" + from services.agent.prompt_mentions import MentionKind, parse_prompt_mentions + + mentioned_ids: set[str] = set() + if payload.agent_soul is not None: + mentioned_ids |= { + mention.ref_id + for mention in parse_prompt_mentions(payload.agent_soul.prompt.system_prompt) + if mention.kind == MentionKind.KNOWLEDGE + } + existing_dataset_ids: set[str] | None = None + if mentioned_ids: + existing_dataset_ids = set(cls._dataset_rows(tenant_id=tenant_id, dataset_ids=sorted(mentioned_ids))) + return ComposerConfigValidator.collect_soft_findings(payload, existing_dataset_ids=existing_dataset_ids) + + @classmethod + def get_workflow_candidates(cls, *, tenant_id: str, app_id: str, node_id: str, user_id: str) -> dict[str, Any]: + """Slash-menu data source for the workflow Agent node composer (ENG-615).""" + from services.agent.composer_candidates import previous_node_output_candidates, soul_candidates + + try: + workflow = cls._get_draft_workflow(tenant_id=tenant_id, app_id=app_id) + except ValueError: + workflow = None + + node_job: WorkflowNodeJobConfig | None = None + agent_soul: AgentSoulConfig | None = None + if workflow is not None: + binding = cls._get_workflow_binding(tenant_id=tenant_id, workflow_id=workflow.id, node_id=node_id) + if binding is not None: + node_job = cls._parse_node_job(binding) + agent_soul = cls._load_binding_soul(tenant_id=tenant_id, binding=binding) + + truncated = False + previous_outputs: list[dict[str, Any]] = [] + if workflow is not None: + draft_variable_session = cls._draft_variable_session() + try: + previous_outputs, outputs_truncated = previous_node_output_candidates( + graph=workflow.graph_dict, + node_id=node_id, + declared_outputs_loader=lambda nid: cls._binding_declared_outputs( + tenant_id=tenant_id, workflow_id=workflow.id, node_id=nid + ), + draft_variables_loader=lambda nid: cls._draft_node_variables( + session=draft_variable_session, app_id=app_id, node_id=nid, user_id=user_id + ), + system_variables_loader=lambda: cls._draft_system_variables( + session=draft_variable_session, app_id=app_id, user_id=user_id + ), + ) + finally: + draft_variable_session.close() + truncated = truncated or outputs_truncated + + soul_lists, soul_truncated = soul_candidates( + agent_soul=agent_soul, + dataset_lookup=lambda ids: cls._dataset_rows(tenant_id=tenant_id, dataset_ids=ids), + workspace_tools_loader=lambda: cls._workspace_dify_tools(tenant_id=tenant_id, user_id=user_id), + ) + truncated = truncated or soul_truncated + response = ComposerCandidatesResponse( variant=ComposerVariant.WORKFLOW, allowed_node_job_candidates={ - "previous_node_outputs": [], + "previous_node_outputs": previous_outputs, "declare_output_types": ["string", "number", "object", "array", "boolean", "file"], - "human_contacts": [], - }, - allowed_soul_candidates={ - "skills_files": [], - "dify_tools": [], - "cli_tools": [], - "knowledge_datasets": [], - "human_contacts": [], + "human_contacts": [ + contact.model_dump(exclude_none=True) for contact in (node_job.human_contacts if node_job else []) + ], }, + allowed_soul_candidates=soul_lists, + truncated=truncated, ) return response.model_dump(mode="json") @classmethod - def get_agent_app_candidates(cls, *, app_id: str) -> dict[str, Any]: + def get_agent_app_candidates(cls, *, tenant_id: str, app_id: str, user_id: str) -> dict[str, Any]: + """Slash-menu data source for the Agent App (Console) composer (ENG-615).""" + from services.agent.composer_candidates import soul_candidates + + agent_soul = cls._load_agent_app_soul(tenant_id=tenant_id, app_id=app_id) + soul_lists, truncated = soul_candidates( + agent_soul=agent_soul, + dataset_lookup=lambda ids: cls._dataset_rows(tenant_id=tenant_id, dataset_ids=ids), + workspace_tools_loader=lambda: cls._workspace_dify_tools(tenant_id=tenant_id, user_id=user_id), + ) response = ComposerCandidatesResponse( variant=ComposerVariant.AGENT_APP, allowed_node_job_candidates={}, - allowed_soul_candidates={ - "skills_files": [], - "dify_tools": [], - "cli_tools": [], - "knowledge_datasets": [], - "human_contacts": [], - }, + allowed_soul_candidates=soul_lists, + truncated=truncated, ) return response.model_dump(mode="json") + # ── candidates IO helpers (ENG-615) ────────────────────────────────────── + + @staticmethod + def _parse_node_job(binding: WorkflowAgentNodeBinding) -> WorkflowNodeJobConfig | None: + try: + return WorkflowNodeJobConfig.model_validate(binding.node_job_config_dict) + except Exception: + logger.warning("candidates: malformed node_job_config for binding %s", binding.id, exc_info=True) + return None + + @classmethod + def _load_binding_soul(cls, *, tenant_id: str, binding: WorkflowAgentNodeBinding) -> AgentSoulConfig | None: + agent = cls._get_agent_if_present(tenant_id=tenant_id, agent_id=binding.agent_id) + version = cls._get_version_if_present( + tenant_id=tenant_id, + agent_id=agent.id if agent else None, + version_id=binding.current_snapshot_id, + ) + return cls._parse_soul_snapshot(version) + + @classmethod + def _load_agent_app_soul(cls, *, tenant_id: str, app_id: str) -> AgentSoulConfig | None: + agent = db.session.scalar( + select(Agent) + .where( + Agent.tenant_id == tenant_id, + Agent.app_id == app_id, + Agent.scope == AgentScope.ROSTER, + Agent.status == AgentStatus.ACTIVE, + ) + .order_by(Agent.created_at.desc()) + .limit(1) + ) + if agent is None: + return None + version = cls._get_version_if_present( + tenant_id=tenant_id, agent_id=agent.id, version_id=agent.active_config_snapshot_id + ) + return cls._parse_soul_snapshot(version) + + @staticmethod + def _parse_soul_snapshot(version: AgentConfigSnapshot | None) -> AgentSoulConfig | None: + if version is None: + return None + try: + return AgentSoulConfig.model_validate(version.config_snapshot_dict) + except Exception: + logger.warning("candidates: malformed soul snapshot %s", version.id, exc_info=True) + return None + + @classmethod + def _binding_declared_outputs( + cls, *, tenant_id: str, workflow_id: str, node_id: str + ) -> list[DeclaredOutputConfig] | None: + binding = cls._get_workflow_binding(tenant_id=tenant_id, workflow_id=workflow_id, node_id=node_id) + if binding is None: + return None + node_job = cls._parse_node_job(binding) + if node_job is None: + return None + return list(_effective_declared_outputs(node_job.declared_outputs)) + + @staticmethod + def _draft_variable_session(): + from sqlalchemy.orm import sessionmaker + + return sessionmaker(bind=db.engine, expire_on_commit=False)() + + @staticmethod + def _draft_node_variables(*, session: Any, app_id: str, node_id: str, user_id: str) -> list[tuple[str, str | None]]: + from services.workflow_draft_variable_service import WorkflowDraftVariableService + + variables = WorkflowDraftVariableService(session=session).list_node_variables(app_id, node_id, user_id) + return [(variable.name, variable.value_type.value) for variable in variables.variables] + + @staticmethod + def _draft_system_variables(*, session: Any, app_id: str, user_id: str) -> list[tuple[str, str | None]]: + from services.workflow_draft_variable_service import WorkflowDraftVariableService + + variables = WorkflowDraftVariableService(session=session).list_system_variables(app_id, user_id) + return [(variable.name, variable.value_type.value) for variable in variables.variables] + + @staticmethod + def _dataset_rows(*, tenant_id: str, dataset_ids: list[str]) -> dict[str, Any]: + """Tenant-scoped dataset lookup tolerating malformed ids. + + Mention ids come from user-editable prompt text; a non-UUID id can never + match a dataset row, so it is simply absent from the result (-> missing/ + placeholder semantics) instead of breaking the UUID-typed query. + """ + from uuid import UUID + + from services.dataset_service import DatasetService + + valid_ids: list[str] = [] + for dataset_id in dataset_ids: + try: + UUID(dataset_id) + except (ValueError, TypeError): + continue + valid_ids.append(dataset_id) + if not valid_ids: + return {} + rows, _ = DatasetService.get_datasets_by_ids(valid_ids, tenant_id) + return {str(row.id): row for row in rows} + + @staticmethod + def _workspace_dify_tools(*, tenant_id: str, user_id: str) -> list[dict[str, Any]]: + """Workspace Dify Plugin tools, same source as the tool selector. + + A plugin-daemon outage must degrade the slash menu to an empty tools + tab, not break the whole candidates endpoint. + """ + from services.tools.builtin_tools_manage_service import BuiltinToolManageService + + try: + providers = BuiltinToolManageService.list_builtin_tools(user_id, tenant_id) + except Exception: + logger.warning("candidates: failed to list workspace tools for tenant %s", tenant_id, exc_info=True) + return [] + tools: list[dict[str, Any]] = [] + for provider in providers: + for tool in provider.tools or []: + tools.append( + { + "id": f"{provider.name}/{tool.name}", + "name": tool.name, + "description": tool.label.en_US if tool.label else tool.name, + "provider": provider.name, + "plugin_id": provider.plugin_id or None, + } + ) + return tools + @classmethod def calculate_impact(cls, *, tenant_id: str, current_snapshot_id: str) -> dict[str, Any]: bindings = list( diff --git a/api/services/agent/composer_validator.py b/api/services/agent/composer_validator.py index 7fdff7232be..d98c37796f2 100644 --- a/api/services/agent/composer_validator.py +++ b/api/services/agent/composer_validator.py @@ -4,6 +4,17 @@ from typing import Any from pydantic import ValidationError from services.agent.errors import AgentSoulLockedError, InvalidComposerConfigError, PlaintextSecretNotAllowedError +from services.agent.prompt_mentions import ( + MAX_MENTIONS_PER_PROMPT, + NODE_JOB_PROMPT_ALLOWED_KINDS, + SOUL_PROMPT_ALLOWED_KINDS, + MentionKind, + MentionResolver, + build_node_job_mention_resolver, + build_soul_mention_resolver, + find_malformed_mention_markers, + parse_prompt_mentions, +) from services.entities.agent_entities import ( AgentSoulConfig, ComposerSavePayload, @@ -46,6 +57,158 @@ class ComposerConfigValidator: cls.validate_agent_soul(payload.agent_soul) if payload.node_job is not None: cls.validate_node_job(payload.node_job) + cls._validate_prompt_mentions(payload) + + @classmethod + def _validate_prompt_mentions(cls, payload: ComposerSavePayload) -> None: + """ENG-616 §2.4 allowlists + ENG-617 §5.2 human-must-be-referenced. + + Error messages start with a stable code token (``mention_kind_not_allowed`` + / ``mention_limit_exceeded`` / ``human_involvement_not_referenced``) so + the frontend can switch on it. + """ + if payload.agent_soul is not None: + cls._validate_surface_mentions( + prompt=payload.agent_soul.prompt.system_prompt, + allowed=SOUL_PROMPT_ALLOWED_KINDS, + surface="agent soul prompt", + ) + cls._require_human_mentions( + prompt=payload.agent_soul.prompt.system_prompt, + contacts=payload.agent_soul.human.contacts, + surface="agent soul prompt", + ) + if payload.node_job is not None: + cls._validate_surface_mentions( + prompt=payload.node_job.workflow_prompt, + allowed=NODE_JOB_PROMPT_ALLOWED_KINDS, + surface="workflow job prompt", + ) + cls._require_human_mentions( + prompt=payload.node_job.workflow_prompt, + contacts=payload.node_job.human_contacts, + surface="workflow job prompt", + ) + + @classmethod + def _validate_surface_mentions(cls, *, prompt: str, allowed: frozenset[MentionKind], surface: str) -> None: + mentions = parse_prompt_mentions(prompt) + if len(mentions) > MAX_MENTIONS_PER_PROMPT: + raise InvalidComposerConfigError( + f"mention_limit_exceeded: {surface} has {len(mentions)} mentions, " + f"exceeding the limit of {MAX_MENTIONS_PER_PROMPT}." + ) + for mention in mentions: + if mention.kind not in allowed: + raise InvalidComposerConfigError( + f"mention_kind_not_allowed: {surface} cannot reference {mention.kind.value} (id={mention.ref_id})." + ) + + @classmethod + def _require_human_mentions(cls, *, prompt: str, contacts: list[Any], surface: str) -> None: + """ENG-617 §5.2 (PRD: human involvement must be slash-referenced or save errors). + + Every configured human contact must appear as ``{{#human:#}}`` in the + corresponding prompt. A contact matches via any identity alias; contacts + carrying no identity at all cannot be referenced and are skipped. + """ + if not contacts: + return + mentioned = {mention.ref_id for mention in parse_prompt_mentions(prompt) if mention.kind == MentionKind.HUMAN} + for contact in contacts: + aliases = { + alias + for alias in (contact.id, contact.contact_id, contact.human_id, contact.email, contact.name) + if alias + } + if not aliases: + continue + if aliases.isdisjoint(mentioned): + display = contact.name or contact.email or contact.id or "human involvement" + raise InvalidComposerConfigError( + f"human_involvement_not_referenced: configured human involvement '{display}' " + f"must be referenced in the {surface} via the slash menu." + ) + + @classmethod + def collect_soft_findings( + cls, + payload: ComposerSavePayload, + *, + existing_dataset_ids: set[str] | None = None, + ) -> dict[str, Any]: + """ENG-617 §5.3/§5.4 soft findings — never block save. + + ``warnings`` carries ``mention_target_missing`` / ``mention_malformed`` + entries; ``knowledge_retrieval_placeholder`` keeps dangling knowledge + mentions with a placeholder name (0522 consensus) instead of dropping or + rejecting them. With ``existing_dataset_ids`` provided, configured-but- + deleted datasets surface as placeholders too. + """ + warnings: list[dict[str, Any]] = [] + placeholders: list[dict[str, str]] = [] + + surfaces: list[tuple[str, str, MentionResolver, frozenset[MentionKind]]] = [] + if payload.agent_soul is not None: + surfaces.append( + ( + "agent_soul", + payload.agent_soul.prompt.system_prompt, + build_soul_mention_resolver(payload.agent_soul), + SOUL_PROMPT_ALLOWED_KINDS, + ) + ) + if payload.node_job is not None: + surfaces.append( + ( + "node_job", + payload.node_job.workflow_prompt, + build_node_job_mention_resolver(payload.node_job), + NODE_JOB_PROMPT_ALLOWED_KINDS, + ) + ) + + for surface, prompt, resolver, allowed in surfaces: + for mention in parse_prompt_mentions(prompt): + if mention.kind not in allowed: + continue # hard-rejected by validate_save_payload + resolved = resolver(mention) + if mention.kind == MentionKind.KNOWLEDGE: + dangling = resolved is None or ( + existing_dataset_ids is not None and mention.ref_id not in existing_dataset_ids + ) + if dangling: + placeholders.append( + { + "id": mention.ref_id, + "placeholder_name": mention.label or f"Knowledge {mention.ref_id[:8]}", + } + ) + continue + if resolved is None: + warnings.append( + { + "code": "mention_target_missing", + "surface": surface, + "kind": mention.kind.value, + "id": mention.ref_id, + "message": f"{mention.kind.value} mention (id={mention.ref_id}) does not match " + "any configured item.", + } + ) + for marker in find_malformed_mention_markers(prompt): + warnings.append( + { + "code": "mention_malformed", + "surface": surface, + "kind": None, + "id": None, + "message": f"mention-shaped marker {marker!r} is malformed and will be " + "degraded to plain text at runtime.", + } + ) + + return {"warnings": warnings, "knowledge_retrieval_placeholder": placeholders} @classmethod def validate_agent_soul(cls, agent_soul: AgentSoulConfig) -> None: diff --git a/api/services/agent/prompt_mentions.py b/api/services/agent/prompt_mentions.py new file mode 100644 index 00000000000..880e14af9b3 --- /dev/null +++ b/api/services/agent/prompt_mentions.py @@ -0,0 +1,264 @@ +"""Prompt mention (slash-reference) serialization contract — ENG-616. + +Slash-menu insertions are stored inline in the plain-string prompt as tokens: + + [§:[: