From 60cd346fa64bfd7b973c4fb4aa55a3e198173076 Mon Sep 17 00:00:00 2001 From: zyssyz123 <916125788@qq.com> Date: Wed, 20 May 2026 20:39:45 +0800 Subject: [PATCH] feat: wire workflow agent node runtime (#36437) Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- api/clients/agent_backend/request_builder.py | 2 + api/configs/extra/__init__.py | 2 + api/configs/extra/agent_backend_config.py | 23 ++ api/core/workflow/node_factory.py | 37 +- api/core/workflow/nodes/agent_v2/__init__.py | 4 + .../workflow/nodes/agent_v2/agent_node.py | 281 +++++++++++++ .../nodes/agent_v2/binding_resolver.py | 93 +++++ api/core/workflow/nodes/agent_v2/entities.py | 17 + .../workflow/nodes/agent_v2/output_adapter.py | 255 ++++++++++++ .../agent_v2/runtime_feature_manifest.py | 55 +++ .../nodes/agent_v2/runtime_request_builder.py | 288 +++++++++++++ .../workflow/nodes/agent_v2/validators.py | 388 ++++++++++++++++++ api/models/agent_config_entities.py | 19 + api/openapi/markdown/console-swagger.md | 23 ++ .../agent/workflow_publish_service.py | 59 +++ api/services/workflow_service.py | 19 + .../nodes/agent_v2/test_agent_node.py | 155 +++++++ .../nodes/agent_v2/test_binding_resolver.py | 121 ++++++ .../nodes/agent_v2/test_output_adapter.py | 194 +++++++++ .../agent_v2/test_runtime_request_builder.py | 219 ++++++++++ .../nodes/agent_v2/test_validators.py | 271 ++++++++++++ .../workflow/test_node_mapping_bootstrap.py | 6 + .../agent/test_agent_composer_entities.py | 18 +- .../generated/api/console/agents/types.gen.ts | 17 + .../generated/api/console/agents/zod.gen.ts | 25 ++ .../generated/api/console/apps/types.gen.ts | 17 + .../generated/api/console/apps/zod.gen.ts | 25 ++ 27 files changed, 2626 insertions(+), 7 deletions(-) create mode 100644 api/configs/extra/agent_backend_config.py create mode 100644 api/core/workflow/nodes/agent_v2/__init__.py create mode 100644 api/core/workflow/nodes/agent_v2/agent_node.py create mode 100644 api/core/workflow/nodes/agent_v2/binding_resolver.py create mode 100644 api/core/workflow/nodes/agent_v2/entities.py create mode 100644 api/core/workflow/nodes/agent_v2/output_adapter.py create mode 100644 api/core/workflow/nodes/agent_v2/runtime_feature_manifest.py create mode 100644 api/core/workflow/nodes/agent_v2/runtime_request_builder.py create mode 100644 api/core/workflow/nodes/agent_v2/validators.py create mode 100644 api/services/agent/workflow_publish_service.py create mode 100644 api/tests/unit_tests/core/workflow/nodes/agent_v2/test_agent_node.py create mode 100644 api/tests/unit_tests/core/workflow/nodes/agent_v2/test_binding_resolver.py create mode 100644 api/tests/unit_tests/core/workflow/nodes/agent_v2/test_output_adapter.py create mode 100644 api/tests/unit_tests/core/workflow/nodes/agent_v2/test_runtime_request_builder.py create mode 100644 api/tests/unit_tests/core/workflow/nodes/agent_v2/test_validators.py diff --git a/api/clients/agent_backend/request_builder.py b/api/clients/agent_backend/request_builder.py index 41b9ce059d..a886fe849f 100644 --- a/api/clients/agent_backend/request_builder.py +++ b/api/clients/agent_backend/request_builder.py @@ -49,6 +49,7 @@ class AgentBackendModelConfig(BaseModel): model: str user_id: str | None = None credentials: dict[str, DifyPluginCredentialValue] = Field(default_factory=dict) + model_settings: dict[str, JsonValue] = Field(default_factory=dict) model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") @@ -138,6 +139,7 @@ class AgentBackendRunRequestBuilder: model_provider=run_input.model.model_provider, model=run_input.model.model, credentials=run_input.model.credentials, + model_settings=run_input.model.model_settings or None, ), ), ] diff --git a/api/configs/extra/__init__.py b/api/configs/extra/__init__.py index de97adfc0e..a2246db208 100644 --- a/api/configs/extra/__init__.py +++ b/api/configs/extra/__init__.py @@ -1,3 +1,4 @@ +from configs.extra.agent_backend_config import AgentBackendConfig from configs.extra.archive_config import ArchiveStorageConfig from configs.extra.notion_config import NotionConfig from configs.extra.sentry_config import SentryConfig @@ -5,6 +6,7 @@ from configs.extra.sentry_config import SentryConfig class ExtraServiceConfig( # place the configs in alphabet order + AgentBackendConfig, ArchiveStorageConfig, NotionConfig, SentryConfig, diff --git a/api/configs/extra/agent_backend_config.py b/api/configs/extra/agent_backend_config.py new file mode 100644 index 0000000000..ae1dc2ed22 --- /dev/null +++ b/api/configs/extra/agent_backend_config.py @@ -0,0 +1,23 @@ +from pydantic import Field +from pydantic_settings import BaseSettings + + +class AgentBackendConfig(BaseSettings): + """ + Configuration settings for the Agent backend runtime integration. + """ + + AGENT_BACKEND_BASE_URL: str | None = Field( + description="Base URL for the Dify Agent backend service.", + default=None, + ) + + AGENT_BACKEND_USE_FAKE: bool = Field( + description="Use the deterministic in-process fake Agent backend client.", + default=False, + ) + + AGENT_BACKEND_FAKE_SCENARIO: str = Field( + description="Scenario used by the fake Agent backend client.", + default="success", + ) diff --git a/api/core/workflow/node_factory.py b/api/core/workflow/node_factory.py index 5d572bbd5e..baaa536a5c 100644 --- a/api/core/workflow/node_factory.py +++ b/api/core/workflow/node_factory.py @@ -37,6 +37,10 @@ from core.workflow.nodes.agent.plugin_strategy_adapter import ( PluginAgentStrategyResolver, ) from core.workflow.nodes.agent.runtime_support import AgentRuntimeSupport +from core.workflow.nodes.agent_v2 import DifyAgentNode +from core.workflow.nodes.agent_v2.binding_resolver import WorkflowAgentBindingResolver +from core.workflow.nodes.agent_v2.output_adapter import WorkflowAgentOutputAdapter +from core.workflow.nodes.agent_v2.runtime_request_builder import WorkflowAgentRuntimeRequestBuilder from core.workflow.system_variables import SystemVariableKey, get_system_text, system_variable_selector from core.workflow.template_rendering import CodeExecutorJinja2TemplateRenderer from graphon.entities.base_node_data import BaseNodeData @@ -438,12 +442,7 @@ class DifyNodeFactory(NodeFactory): "tool_file_manager": self._bound_tool_file_manager_factory(), "runtime": self._tool_runtime, }, - BuiltinNodeTypes.AGENT: lambda: { - "strategy_resolver": self._agent_strategy_resolver, - "presentation_provider": self._agent_strategy_presentation_provider, - "runtime_support": self._agent_runtime_support, - "message_transformer": self._agent_message_transformer, - }, + BuiltinNodeTypes.AGENT: lambda: self._build_agent_node_init_kwargs(node_class=node_class), } node_init_kwargs = node_init_kwargs_factories.get(node_type, lambda: {})() constructor_node_data = resolved_node_data.model_dump(mode="python", by_alias=True) @@ -469,6 +468,32 @@ class DifyNodeFactory(NodeFactory): def _resolve_node_class(*, node_type: NodeType, node_version: str) -> type[Node]: return resolve_workflow_node_class(node_type=node_type, node_version=node_version) + def _build_agent_node_init_kwargs(self, *, node_class: type[Node]) -> dict[str, object]: + if issubclass(node_class, DifyAgentNode): + from clients.agent_backend import AgentBackendRunEventAdapter, AgentBackendRunRequestBuilder + from clients.agent_backend.factory import create_agent_backend_run_client + + return { + "binding_resolver": WorkflowAgentBindingResolver(), + "runtime_request_builder": WorkflowAgentRuntimeRequestBuilder( + credentials_provider=self._llm_credentials_provider, + request_builder=AgentBackendRunRequestBuilder(), + ), + "agent_backend_client": create_agent_backend_run_client( + base_url=dify_config.AGENT_BACKEND_BASE_URL, + use_fake=dify_config.AGENT_BACKEND_USE_FAKE, + fake_scenario=dify_config.AGENT_BACKEND_FAKE_SCENARIO, + ), + "event_adapter": AgentBackendRunEventAdapter(), + "output_adapter": WorkflowAgentOutputAdapter(), + } + return { + "strategy_resolver": self._agent_strategy_resolver, + "presentation_provider": self._agent_strategy_presentation_provider, + "runtime_support": self._agent_runtime_support, + "message_transformer": self._agent_message_transformer, + } + def _build_llm_compatible_node_init_kwargs( self, *, diff --git a/api/core/workflow/nodes/agent_v2/__init__.py b/api/core/workflow/nodes/agent_v2/__init__.py new file mode 100644 index 0000000000..eee4434472 --- /dev/null +++ b/api/core/workflow/nodes/agent_v2/__init__.py @@ -0,0 +1,4 @@ +from .agent_node import DifyAgentNode +from .entities import DifyAgentNodeData + +__all__ = ["DifyAgentNode", "DifyAgentNodeData"] diff --git a/api/core/workflow/nodes/agent_v2/agent_node.py b/api/core/workflow/nodes/agent_v2/agent_node.py new file mode 100644 index 0000000000..0409579a74 --- /dev/null +++ b/api/core/workflow/nodes/agent_v2/agent_node.py @@ -0,0 +1,281 @@ +from __future__ import annotations + +from collections.abc import Generator, Mapping, Sequence +from typing import TYPE_CHECKING, Any + +from clients.agent_backend import ( + AgentBackendError, + AgentBackendHTTPError, + AgentBackendInternalEventType, + AgentBackendRunClient, + AgentBackendRunEventAdapter, + AgentBackendRunFailedInternalEvent, + AgentBackendRunSucceededInternalEvent, + AgentBackendStreamError, + AgentBackendStreamInternalEvent, + AgentBackendTransportError, + AgentBackendValidationError, +) +from core.app.entities.app_invoke_entities import DIFY_RUN_CONTEXT_KEY, DifyRunContext +from core.workflow.system_variables import SystemVariableKey, get_system_text +from graphon.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus +from graphon.node_events import NodeEventBase, NodeRunResult, StreamCompletedEvent +from graphon.nodes.base.node import Node + +from .binding_resolver import WorkflowAgentBindingError, WorkflowAgentBindingResolver +from .entities import DifyAgentNodeData +from .output_adapter import WorkflowAgentOutputAdapter +from .runtime_request_builder import ( + WorkflowAgentRuntimeBuildContext, + WorkflowAgentRuntimeRequestBuilder, + WorkflowAgentRuntimeRequestBuildError, +) + +if TYPE_CHECKING: + from graphon.entities import GraphInitParams + from graphon.runtime import GraphRuntimeState + + +class DifyAgentNode(Node[DifyAgentNodeData]): + node_type = BuiltinNodeTypes.AGENT + + def __init__( + self, + node_id: str, + data: DifyAgentNodeData, + *, + graph_init_params: GraphInitParams, + graph_runtime_state: GraphRuntimeState, + binding_resolver: WorkflowAgentBindingResolver, + runtime_request_builder: WorkflowAgentRuntimeRequestBuilder, + agent_backend_client: AgentBackendRunClient, + event_adapter: AgentBackendRunEventAdapter, + output_adapter: WorkflowAgentOutputAdapter, + ) -> None: + super().__init__( + node_id=node_id, + data=data, + graph_init_params=graph_init_params, + graph_runtime_state=graph_runtime_state, + ) + self._binding_resolver = binding_resolver + self._runtime_request_builder = runtime_request_builder + self._agent_backend_client = agent_backend_client + self._event_adapter = event_adapter + self._output_adapter = output_adapter + + @classmethod + def version(cls) -> str: + return "2" + + def populate_start_event(self, event) -> None: + event.extras["agent_node"] = {"version": "2", "agent_node_kind": self.node_data.agent_node_kind} + + def _run(self) -> Generator[NodeEventBase, None, None]: + dify_ctx = DifyRunContext.model_validate(self.require_run_context_value(DIFY_RUN_CONTEXT_KEY)) + workflow_id = self.graph_init_params.workflow_id + workflow_run_id = get_system_text( + self.graph_runtime_state.variable_pool, + SystemVariableKey.WORKFLOW_EXECUTION_ID, + ) + inputs: dict[str, Any] = {} + process_data: dict[str, Any] = {} + metadata: dict[str, Any] = { + "agent_backend": { + "status": "not_started", + } + } + + try: + bundle = self._binding_resolver.resolve( + tenant_id=dify_ctx.tenant_id, + app_id=dify_ctx.app_id, + workflow_id=workflow_id, + node_id=self._node_id, + ) + runtime_request = self._runtime_request_builder.build( + WorkflowAgentRuntimeBuildContext( + dify_context=dify_ctx, + workflow_id=workflow_id, + workflow_run_id=workflow_run_id, + node_id=self._node_id, + node_execution_id=self.id, + variable_pool=self.graph_runtime_state.variable_pool, + binding=bundle.binding, + agent=bundle.agent, + snapshot=bundle.snapshot, + ) + ) + inputs = {"agent_backend_request": runtime_request.redacted_request} + metadata = dict(runtime_request.metadata) + process_data = { + "agent_id": bundle.agent.id, + "agent_config_snapshot_id": bundle.snapshot.id, + "binding_id": bundle.binding.id, + } + create_response = self._agent_backend_client.create_run(runtime_request.request) + metadata["agent_backend"] = { + **dict(metadata.get("agent_backend") or {}), + "run_id": create_response.run_id, + "status": create_response.status, + } + except WorkflowAgentBindingError as error: + yield self._failure_event( + inputs=inputs, + process_data=process_data, + metadata=metadata, + error=str(error), + error_type=error.error_code, + ) + return + except WorkflowAgentRuntimeRequestBuildError as error: + yield self._failure_event( + inputs=inputs, + process_data=process_data, + metadata=metadata, + error=str(error), + error_type=error.error_code, + ) + return + except AgentBackendError as error: + yield self._failure_event( + inputs=inputs, + process_data=process_data, + metadata=metadata, + error=str(error), + error_type=self._agent_backend_error_type(error), + ) + return + except Exception as error: + yield self._failure_event( + inputs=inputs, + process_data=process_data, + metadata=metadata, + error=str(error), + error_type="agent_workflow_node_runtime_error", + ) + return + + stream_event_count = 0 + try: + for public_event in self._agent_backend_client.stream_events(create_response.run_id): + stream_event_count += 1 + for internal_event in self._event_adapter.adapt(public_event): + if internal_event.type == AgentBackendInternalEventType.RUN_STARTED: + continue + if internal_event.type == AgentBackendInternalEventType.STREAM_EVENT: + if isinstance(internal_event, AgentBackendStreamInternalEvent): + self._record_stream_metadata(metadata, internal_event) + continue + metadata["agent_backend"] = { + **dict(metadata.get("agent_backend") or {}), + "stream_event_count": stream_event_count, + } + if isinstance(internal_event, AgentBackendRunSucceededInternalEvent): + yield StreamCompletedEvent( + node_run_result=self._output_adapter.build_success_result( + event=internal_event, + inputs=inputs, + process_data=process_data, + metadata=metadata, + ) + ) + return + if isinstance( + internal_event, + AgentBackendRunFailedInternalEvent, + ) or internal_event.type in { + AgentBackendInternalEventType.RUN_CANCELLED, + AgentBackendInternalEventType.RUN_PAUSED, + }: + yield StreamCompletedEvent( + node_run_result=self._output_adapter.build_failure_result( + event=internal_event, + inputs=inputs, + process_data=process_data, + metadata=metadata, + ) + ) + return + except AgentBackendError as error: + yield self._failure_event( + inputs=inputs, + process_data=process_data, + metadata=metadata, + error=str(error), + error_type=self._agent_backend_error_type(error), + ) + return + except Exception as error: + yield self._failure_event( + inputs=inputs, + process_data=process_data, + metadata=metadata, + error=str(error), + error_type="agent_backend_stream_error", + ) + return + + yield StreamCompletedEvent( + node_run_result=self._output_adapter.build_stream_exhausted_result( + inputs=inputs, + process_data=process_data, + metadata=metadata, + ) + ) + + @staticmethod + def _failure_event( + *, + inputs: dict[str, Any], + process_data: dict[str, Any], + metadata: dict[str, Any], + error: str, + error_type: str, + ) -> StreamCompletedEvent: + return StreamCompletedEvent( + node_run_result=NodeRunResult( + status=WorkflowNodeExecutionStatus.FAILED, + inputs=inputs, + process_data=process_data, + metadata={WorkflowNodeExecutionMetadataKey.AGENT_LOG: metadata}, + outputs={}, + error=error, + error_type=error_type, + ) + ) + + @staticmethod + def _agent_backend_error_type(error: AgentBackendError) -> str: + if isinstance(error, AgentBackendValidationError): + return "agent_backend_validation_error" + if isinstance(error, AgentBackendHTTPError): + return "agent_backend_http_error" + if isinstance(error, AgentBackendStreamError): + return "agent_backend_stream_error" + if isinstance(error, AgentBackendTransportError): + return "agent_backend_transport_error" + return "agent_backend_error" + + @staticmethod + def _record_stream_metadata(metadata: dict[str, Any], event: AgentBackendStreamInternalEvent) -> None: + agent_backend = dict(metadata.get("agent_backend") or {}) + agent_backend["last_stream_event_id"] = event.source_event_id + if event.event_kind: + agent_backend["last_stream_event_kind"] = event.event_kind + if isinstance(event.data, Mapping): + usage = event.data.get("usage") or event.data.get("model_usage") + if isinstance(usage, Mapping): + agent_backend["usage"] = dict(usage) + metadata["agent_backend"] = agent_backend + + @classmethod + def _extract_variable_selector_to_variable_mapping( + cls, + *, + graph_config: Mapping[str, Any], + node_id: str, + node_data: DifyAgentNodeData, + ) -> Mapping[str, Sequence[str]]: + del graph_config, node_id, node_data + return {} diff --git a/api/core/workflow/nodes/agent_v2/binding_resolver.py b/api/core/workflow/nodes/agent_v2/binding_resolver.py new file mode 100644 index 0000000000..d2f50b0ae4 --- /dev/null +++ b/api/core/workflow/nodes/agent_v2/binding_resolver.py @@ -0,0 +1,93 @@ +from __future__ import annotations + +from dataclasses import dataclass + +from sqlalchemy import select + +from core.db.session_factory import session_factory +from models.agent import Agent, AgentConfigSnapshot, AgentStatus, WorkflowAgentNodeBinding + + +class WorkflowAgentBindingError(Exception): + error_code: str + + def __init__(self, error_code: str, message: str) -> None: + self.error_code = error_code + super().__init__(message) + + +@dataclass(frozen=True, slots=True) +class WorkflowAgentBindingBundle: + binding: WorkflowAgentNodeBinding + agent: Agent + snapshot: AgentConfigSnapshot + + +class WorkflowAgentBindingResolver: + """Resolve the Agent binding owned by the current workflow id and node id.""" + + def resolve( + self, + *, + tenant_id: str, + app_id: str, + workflow_id: str, + node_id: str, + ) -> WorkflowAgentBindingBundle: + with session_factory.create_session() as session: + binding = session.scalar( + select(WorkflowAgentNodeBinding) + .where( + WorkflowAgentNodeBinding.tenant_id == tenant_id, + WorkflowAgentNodeBinding.app_id == app_id, + WorkflowAgentNodeBinding.workflow_id == workflow_id, + WorkflowAgentNodeBinding.node_id == node_id, + ) + .limit(1) + ) + if binding is None: + raise WorkflowAgentBindingError( + "agent_binding_not_found", + f"Workflow Agent binding not found for node {node_id}.", + ) + if binding.agent_id is None: + raise WorkflowAgentBindingError("agent_not_available", "Workflow Agent binding has no agent.") + if binding.current_snapshot_id is None: + raise WorkflowAgentBindingError( + "agent_config_snapshot_not_found", + "Workflow Agent binding has no current config snapshot.", + ) + + agent = session.scalar( + select(Agent) + .where( + Agent.tenant_id == tenant_id, + Agent.id == binding.agent_id, + ) + .limit(1) + ) + if agent is None or agent.status == AgentStatus.ARCHIVED: + raise WorkflowAgentBindingError( + "agent_not_available", + f"Agent {binding.agent_id} is not available.", + ) + + snapshot = session.scalar( + select(AgentConfigSnapshot) + .where( + AgentConfigSnapshot.tenant_id == tenant_id, + AgentConfigSnapshot.agent_id == agent.id, + AgentConfigSnapshot.id == binding.current_snapshot_id, + ) + .limit(1) + ) + if snapshot is None: + raise WorkflowAgentBindingError( + "agent_config_snapshot_not_found", + f"Agent config snapshot {binding.current_snapshot_id} not found.", + ) + + session.expunge(binding) + session.expunge(agent) + session.expunge(snapshot) + return WorkflowAgentBindingBundle(binding=binding, agent=agent, snapshot=snapshot) diff --git a/api/core/workflow/nodes/agent_v2/entities.py b/api/core/workflow/nodes/agent_v2/entities.py new file mode 100644 index 0000000000..eb36b9cf1e --- /dev/null +++ b/api/core/workflow/nodes/agent_v2/entities.py @@ -0,0 +1,17 @@ +from typing import Literal + +from pydantic import model_validator + +from graphon.entities.base_node_data import BaseNodeData +from graphon.enums import BuiltinNodeTypes, NodeType + + +class DifyAgentNodeData(BaseNodeData): + type: NodeType = BuiltinNodeTypes.AGENT + agent_node_kind: Literal["dify_agent"] = "dify_agent" + + @model_validator(mode="after") + def validate_version(self) -> "DifyAgentNodeData": + if self.version != "2": + raise ValueError("Dify Agent Node v2 requires version='2'") + return self diff --git a/api/core/workflow/nodes/agent_v2/output_adapter.py b/api/core/workflow/nodes/agent_v2/output_adapter.py new file mode 100644 index 0000000000..0aecfec4a6 --- /dev/null +++ b/api/core/workflow/nodes/agent_v2/output_adapter.py @@ -0,0 +1,255 @@ +from __future__ import annotations + +from collections.abc import Mapping +from typing import Any + +from clients.agent_backend import ( + AgentBackendInternalEvent, + AgentBackendInternalEventType, + AgentBackendRunCancelledInternalEvent, + AgentBackendRunFailedInternalEvent, + AgentBackendRunPausedInternalEvent, + AgentBackendRunSucceededInternalEvent, +) +from graphon.enums import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus +from graphon.file import File, FileTransferMethod, FileType +from graphon.model_runtime.entities.llm_entities import LLMUsage +from graphon.node_events import NodeRunResult +from graphon.variables.segments import ArrayFileSegment, FileSegment + + +class WorkflowAgentOutputAdapter: + """Convert terminal Agent backend events into workflow node run results.""" + + def build_success_result( + self, + *, + event: AgentBackendRunSucceededInternalEvent, + inputs: dict[str, Any], + process_data: dict[str, Any], + metadata: dict[str, Any], + ) -> NodeRunResult: + metadata = self._with_terminal_metadata(metadata, event, "succeeded") + usage = self._usage_from_metadata(metadata) + return NodeRunResult( + status=WorkflowNodeExecutionStatus.SUCCEEDED, + inputs=inputs, + process_data=process_data, + outputs=self._normalize_outputs(event.output), + metadata=self._build_node_metadata(metadata=metadata, usage=usage), + llm_usage=usage or LLMUsage.empty_usage(), + ) + + def build_failure_result( + self, + *, + event: ( + AgentBackendRunFailedInternalEvent + | AgentBackendRunCancelledInternalEvent + | AgentBackendRunPausedInternalEvent + ), + inputs: dict[str, Any], + process_data: dict[str, Any], + metadata: dict[str, Any], + ) -> NodeRunResult: + status = WorkflowNodeExecutionStatus.FAILED + error = "Agent backend run failed." + error_type = "agent_backend_run_failed" + terminal_status = "failed" + + match event: + case AgentBackendRunFailedInternalEvent(): + error = event.error + error_type = event.reason or "agent_backend_run_failed" + terminal_status = "failed" + case AgentBackendRunCancelledInternalEvent(): + error = event.message or "Agent backend run was cancelled." + error_type = "agent_backend_run_cancelled" + terminal_status = "cancelled" + case AgentBackendRunPausedInternalEvent(): + error = event.message or "Agent backend run paused, but workflow Agent Node pause is not supported yet." + error_type = "agent_backend_paused_unsupported" + terminal_status = "paused" + + metadata = self._with_terminal_metadata(metadata, event, terminal_status) + usage = self._usage_from_metadata(metadata) + return NodeRunResult( + status=status, + inputs=inputs, + process_data=process_data, + metadata=self._build_node_metadata(metadata=metadata, usage=usage), + llm_usage=usage or LLMUsage.empty_usage(), + error=error, + error_type=error_type, + ) + + def build_stream_exhausted_result( + self, + *, + inputs: dict[str, Any], + process_data: dict[str, Any], + metadata: dict[str, Any], + ) -> NodeRunResult: + usage = self._usage_from_metadata(metadata) + return NodeRunResult( + status=WorkflowNodeExecutionStatus.FAILED, + inputs=inputs, + process_data=process_data, + metadata=self._build_node_metadata(metadata=metadata, usage=usage), + llm_usage=usage or LLMUsage.empty_usage(), + error="Agent backend stream ended before a terminal event.", + error_type="agent_backend_stream_error", + ) + + @classmethod + def _normalize_outputs(cls, output: Any) -> dict[str, Any]: + if isinstance(output, dict): + if cls._is_file_payload(output): + return {"file": cls._file_segment_from_payload(output)} + return {key: cls._normalize_output_value(value) for key, value in output.items()} + if isinstance(output, str): + return {"text": output} + return {"result": output} + + @classmethod + def _normalize_output_value(cls, value: Any) -> Any: + if isinstance(value, File | FileSegment | ArrayFileSegment): + return value + if isinstance(value, Mapping): + if cls._is_file_payload(value): + return cls._file_segment_from_payload(value) + return {key: cls._normalize_output_value(item) for key, item in value.items()} + if isinstance(value, list): + if value and all(isinstance(item, Mapping) and cls._is_file_payload(item) for item in value): + return ArrayFileSegment(value=[cls._file_from_payload(item) for item in value]) + return [cls._normalize_output_value(item) for item in value] + return value + + @staticmethod + def _is_file_payload(value: Mapping[str, Any]) -> bool: + return any(value.get(key) for key in ("file_id", "upload_file_id", "tool_file_id", "url", "remote_url")) and ( + "filename" in value or "mime_type" in value or "url" in value or "remote_url" in value + ) + + @classmethod + def _file_segment_from_payload(cls, value: Mapping[str, Any]) -> FileSegment: + return FileSegment(value=cls._file_from_payload(value)) + + @classmethod + def _file_from_payload(cls, value: Mapping[str, Any]) -> File: + remote_url = cls._string_value(value.get("remote_url") or value.get("url")) + upload_file_id = cls._string_value(value.get("upload_file_id") or value.get("file_id")) + tool_file_id = cls._string_value(value.get("tool_file_id")) + filename = cls._string_value(value.get("filename") or value.get("name")) + mime_type = cls._string_value(value.get("mime_type") or value.get("mimetype")) + extension = cls._extension_from_payload(value, filename) + file_type = cls._file_type_from_payload(value, mime_type) + size = value.get("size") + if not isinstance(size, int): + size = -1 + + if tool_file_id: + transfer_method = FileTransferMethod.TOOL_FILE + related_id = tool_file_id + elif remote_url: + transfer_method = FileTransferMethod.REMOTE_URL + related_id = None + else: + transfer_method = FileTransferMethod.LOCAL_FILE + related_id = upload_file_id + + return File( + type=file_type, + transfer_method=transfer_method, + remote_url=remote_url if transfer_method == FileTransferMethod.REMOTE_URL else None, + related_id=related_id, + filename=filename, + extension=extension, + mime_type=mime_type, + size=size, + ) + + @staticmethod + def _string_value(value: Any) -> str | None: + return value if isinstance(value, str) and value else None + + @classmethod + def _extension_from_payload(cls, value: Mapping[str, Any], filename: str | None) -> str | None: + extension = cls._string_value(value.get("extension")) + if extension: + return extension if extension.startswith(".") else f".{extension}" + if filename and "." in filename: + return f".{filename.rsplit('.', 1)[1]}" + return None + + @staticmethod + def _file_type_from_payload(value: Mapping[str, Any], mime_type: str | None) -> FileType: + explicit_type = value.get("type") or value.get("file_type") + if isinstance(explicit_type, str): + try: + return FileType(explicit_type) + except ValueError: + pass + if mime_type: + if mime_type.startswith("image/"): + return FileType.IMAGE + if mime_type.startswith("audio/"): + return FileType.AUDIO + if mime_type.startswith("video/"): + return FileType.VIDEO + return FileType.DOCUMENT + return FileType.CUSTOM + + @staticmethod + def _usage_from_metadata(metadata: Mapping[str, Any]) -> LLMUsage | None: + agent_backend = metadata.get("agent_backend") + if not isinstance(agent_backend, Mapping): + return None + usage = agent_backend.get("usage") + if not isinstance(usage, Mapping): + return None + try: + return LLMUsage.from_metadata(usage) + except (TypeError, ValueError): + return None + + @staticmethod + def _build_node_metadata( + *, + metadata: dict[str, Any], + usage: LLMUsage | None, + ) -> dict[WorkflowNodeExecutionMetadataKey, Any]: + node_metadata: dict[WorkflowNodeExecutionMetadataKey, Any] = { + WorkflowNodeExecutionMetadataKey.AGENT_LOG: metadata, + } + if usage is not None: + node_metadata[WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS] = usage.total_tokens + node_metadata[WorkflowNodeExecutionMetadataKey.TOTAL_PRICE] = usage.total_price + node_metadata[WorkflowNodeExecutionMetadataKey.CURRENCY] = usage.currency + return node_metadata + + @staticmethod + def _with_terminal_metadata( + metadata: dict[str, Any], + event: AgentBackendInternalEvent, + terminal_status: str, + ) -> dict[str, Any]: + updated = dict(metadata) + agent_backend = dict(updated.get("agent_backend") or {}) + agent_backend.update( + { + "run_id": event.run_id, + "terminal_event_id": event.source_event_id, + "status": terminal_status, + } + ) + session_snapshot = None + if isinstance(event, AgentBackendRunSucceededInternalEvent | AgentBackendRunPausedInternalEvent): + session_snapshot = event.session_snapshot + if session_snapshot is not None: + agent_backend["session_snapshot"] = { + "layer_count": len(session_snapshot.layers), + } + updated["agent_backend"] = agent_backend + updated["terminal_event_type"] = AgentBackendInternalEventType(event.type).value + return updated diff --git a/api/core/workflow/nodes/agent_v2/runtime_feature_manifest.py b/api/core/workflow/nodes/agent_v2/runtime_feature_manifest.py new file mode 100644 index 0000000000..afd730f652 --- /dev/null +++ b/api/core/workflow/nodes/agent_v2/runtime_feature_manifest.py @@ -0,0 +1,55 @@ +from __future__ import annotations + +from typing import Any + +from models.agent_config_entities import AgentSoulConfig + +SUPPORTED_AGENT_BACKEND_FEATURES = frozenset( + { + "system_prompt", + "workflow_prompt", + "workflow_context", + "model", + "structured_output", + } +) + +RESERVED_AGENT_BACKEND_FEATURES = frozenset( + { + "skills_files", + "tools", + "knowledge", + "human", + "env", + "sandbox", + "memory", + } +) + + +def build_runtime_feature_manifest(agent_soul: AgentSoulConfig) -> dict[str, Any]: + """Describe PRD capabilities that are persisted but not executed in phase 3.""" + warnings: list[dict[str, str]] = [] + soul_dump = agent_soul.model_dump(mode="json") + for section in sorted(RESERVED_AGENT_BACKEND_FEATURES): + value = soul_dump.get(section) + has_value = bool(value) + if isinstance(value, dict): + has_value = any(bool(item) for item in value.values()) + if has_value: + warnings.append( + { + "section": f"agent_soul.{section}", + "code": "agent_backend_layer_not_available", + "message": f"{section} is saved in Agent Soul but is not executed by Agent backend in phase 3.", + } + ) + + reserved_status = dict.fromkeys(sorted(RESERVED_AGENT_BACKEND_FEATURES), "reserved_not_executed") + + return { + "supported": sorted(SUPPORTED_AGENT_BACKEND_FEATURES), + "reserved": sorted(RESERVED_AGENT_BACKEND_FEATURES), + "reserved_status": reserved_status, + "unsupported_runtime_warnings": warnings, + } diff --git a/api/core/workflow/nodes/agent_v2/runtime_request_builder.py b/api/core/workflow/nodes/agent_v2/runtime_request_builder.py new file mode 100644 index 0000000000..66a4418b68 --- /dev/null +++ b/api/core/workflow/nodes/agent_v2/runtime_request_builder.py @@ -0,0 +1,288 @@ +from __future__ import annotations + +from collections.abc import Mapping, Sequence +from dataclasses import dataclass +from typing import Any, Literal, Protocol, cast + +from dify_agent.protocol import CreateRunRequest, ExecutionContext + +from clients.agent_backend import ( + AgentBackendModelConfig, + AgentBackendOutputConfig, + AgentBackendRunRequestBuilder, + AgentBackendWorkflowNodeRunInput, + redact_for_agent_backend_log, +) +from core.app.entities.app_invoke_entities import DifyRunContext, InvokeFrom +from core.workflow.system_variables import SystemVariableKey, get_system_text +from graphon.variables.segments import Segment +from models.agent import Agent, AgentConfigSnapshot, WorkflowAgentNodeBinding +from models.agent_config_entities import ( + AgentSoulConfig, + DeclaredOutputConfig, + DeclaredOutputType, + WorkflowNodeJobConfig, +) + +from .runtime_feature_manifest import build_runtime_feature_manifest + + +class WorkflowAgentRuntimeRequestBuildError(ValueError): + """Raised when workflow state cannot be mapped to a valid Agent backend run request.""" + + def __init__(self, error_code: str, message: str) -> None: + self.error_code = error_code + super().__init__(message) + + +class VariablePoolReader(Protocol): + def get(self, selector: Sequence[str], /) -> Segment | None: ... + + def get_by_prefix(self, prefix: str, /) -> Mapping[str, object]: ... + + +class CredentialsProvider(Protocol): + def fetch(self, provider_name: str, model_name: str) -> dict[str, Any]: ... + + +@dataclass(frozen=True, slots=True) +class WorkflowAgentRuntimeBuildContext: + dify_context: DifyRunContext + workflow_id: str + workflow_run_id: str | None + node_id: str + node_execution_id: str + variable_pool: VariablePoolReader + binding: WorkflowAgentNodeBinding + agent: Agent + snapshot: AgentConfigSnapshot + + +@dataclass(frozen=True, slots=True) +class WorkflowAgentRuntimeRequest: + request: CreateRunRequest + redacted_request: dict[str, Any] + agent_soul: AgentSoulConfig + node_job: WorkflowNodeJobConfig + metadata: dict[str, Any] + + +class WorkflowAgentRuntimeRequestBuilder: + """Build public Dify Agent run requests from workflow Agent v2 runtime state.""" + + def __init__( + self, + *, + credentials_provider: CredentialsProvider, + request_builder: AgentBackendRunRequestBuilder | None = None, + ) -> None: + self._credentials_provider = credentials_provider + self._request_builder = request_builder or AgentBackendRunRequestBuilder() + + def build(self, context: WorkflowAgentRuntimeBuildContext) -> WorkflowAgentRuntimeRequest: + agent_soul = AgentSoulConfig.model_validate(context.snapshot.config_snapshot_dict) + node_job = WorkflowNodeJobConfig.model_validate(context.binding.node_job_config_dict) + if agent_soul.model is None: + raise WorkflowAgentRuntimeRequestBuildError( + "agent_model_not_configured", + "Workflow Agent node requires Agent Soul model config.", + ) + + metadata = self._build_metadata(context, agent_soul, node_job) + workflow_context_prompt = self._build_workflow_context_prompt(context, node_job) + workflow_job_prompt = node_job.workflow_prompt.strip() or "Run this workflow Agent Node for the current run." + user_prompt = workflow_context_prompt.strip() or "Use the current workflow context." + credentials = self._credentials_provider.fetch(agent_soul.model.model_provider, agent_soul.model.model) + + request = self._request_builder.build_for_workflow_node( + AgentBackendWorkflowNodeRunInput( + model=AgentBackendModelConfig( + tenant_id=context.dify_context.tenant_id, + plugin_id=agent_soul.model.plugin_id, + model_provider=agent_soul.model.model_provider, + model=agent_soul.model.model, + user_id=context.dify_context.user_id, + credentials=self._normalize_credentials(credentials), + model_settings=cast(dict[str, Any], agent_soul.model.model_settings), + ), + execution_context=ExecutionContext( + tenant_id=context.dify_context.tenant_id, + app_id=context.dify_context.app_id, + workflow_id=context.workflow_id, + workflow_run_id=context.workflow_run_id, + node_id=context.node_id, + node_execution_id=context.node_execution_id, + conversation_id=get_system_text(context.variable_pool, SystemVariableKey.CONVERSATION_ID), + agent_id=context.agent.id, + agent_config_version_id=context.snapshot.id, + invoke_from=self._agent_backend_invoke_from(context.dify_context.invoke_from), + ), + agent_soul_prompt=agent_soul.prompt.system_prompt or None, + workflow_node_job_prompt=workflow_job_prompt, + user_prompt=user_prompt, + output=self._build_output_config(node_job.declared_outputs), + idempotency_key=self._idempotency_key(context), + metadata=metadata, + ) + ) + redacted = cast(dict[str, Any], redact_for_agent_backend_log(request)) + return WorkflowAgentRuntimeRequest( + request=request, + redacted_request=redacted, + agent_soul=agent_soul, + node_job=node_job, + metadata=metadata, + ) + + @staticmethod + def _agent_backend_invoke_from(invoke_from: InvokeFrom) -> Literal["workflow_run", "single_step"]: + if invoke_from in {InvokeFrom.DEBUGGER, InvokeFrom.VALIDATION}: + return "single_step" + return "workflow_run" + + @staticmethod + def _idempotency_key(context: WorkflowAgentRuntimeBuildContext) -> str: + if context.workflow_run_id: + return f"{context.workflow_run_id}:{context.node_execution_id}" + return context.node_execution_id + + @staticmethod + def _build_metadata( + context: WorkflowAgentRuntimeBuildContext, + agent_soul: AgentSoulConfig, + node_job: WorkflowNodeJobConfig, + ) -> dict[str, Any]: + return { + "tenant_id": context.dify_context.tenant_id, + "app_id": context.dify_context.app_id, + "workflow_id": context.workflow_id, + "workflow_run_id": context.workflow_run_id, + "node_id": context.node_id, + "node_execution_id": context.node_execution_id, + "agent_id": context.agent.id, + "agent_config_snapshot_id": context.snapshot.id, + "binding_id": context.binding.id, + "workflow_node_job_mode": node_job.mode.value, + "runtime_support": build_runtime_feature_manifest(agent_soul), + } + + def _build_workflow_context_prompt( + self, + context: WorkflowAgentRuntimeBuildContext, + node_job: WorkflowNodeJobConfig, + ) -> str: + lines = ["Workflow context loaded for this run:"] + query = get_system_text(context.variable_pool, SystemVariableKey.QUERY) + if query: + lines.append(f"- User query: {query}") + + resolved_outputs = self._resolve_previous_node_outputs( + context.variable_pool, + node_job.previous_node_output_refs, + ) + if resolved_outputs: + lines.append("- Previous node outputs:") + for item in resolved_outputs: + lines.append(f" - {item['label']}: {item['value']}") + + lines.append("The above workflow context is run-specific. Do not treat it as Agent Soul or persistent memory.") + return "\n".join(lines) + + def _resolve_previous_node_outputs( + self, + variable_pool: VariablePoolReader, + refs: Sequence[Mapping[str, Any]], + ) -> list[dict[str, Any]]: + resolved: list[dict[str, Any]] = [] + for ref in refs: + selector = self._selector_from_ref(ref) + if not selector: + raise WorkflowAgentRuntimeRequestBuildError( + "invalid_previous_node_output_ref", + "Workflow Agent node has invalid previous node output ref.", + ) + segment = variable_pool.get(selector) + if segment is None: + raise WorkflowAgentRuntimeRequestBuildError( + "missing_previous_node_output", + f"Workflow Agent node cannot resolve previous node output {'.'.join(selector)}.", + ) + value = getattr(segment, "value", None) + resolved.append( + { + "label": ".".join(selector), + "value": self._summarize_value(value), + } + ) + return resolved + + @staticmethod + def _selector_from_ref(ref: Mapping[str, Any]) -> list[str] | None: + for key in ("selector", "variable_selector", "value_selector"): + value = ref.get(key) + if isinstance(value, list) and all(isinstance(item, str) for item in value): + return value + node_id = ref.get("node_id") + output_name = ref.get("output") or ref.get("name") or ref.get("variable") or ref.get("key") + if isinstance(node_id, str) and isinstance(output_name, str): + return [node_id, output_name] + return None + + @staticmethod + def _summarize_value(value: Any) -> str: + text = str(value) + if len(text) > 2000: + return text[:2000] + "...[truncated]" + return text + + @staticmethod + def _build_output_config(declared_outputs: Sequence[DeclaredOutputConfig]) -> AgentBackendOutputConfig | None: + if not declared_outputs: + return None + properties: dict[str, Any] = {} + required: list[str] = [] + for output in declared_outputs: + properties[output.name] = WorkflowAgentRuntimeRequestBuilder._schema_for_declared_output(output) + if output.required: + required.append(output.name) + schema: dict[str, Any] = {"type": "object", "properties": properties} + if required: + schema["required"] = required + return AgentBackendOutputConfig(json_schema=schema) + + @staticmethod + def _schema_for_declared_output(output: DeclaredOutputConfig) -> dict[str, Any]: + match output.type: + case DeclaredOutputType.STRING: + schema: dict[str, Any] = {"type": "string"} + case DeclaredOutputType.NUMBER: + schema = {"type": "number"} + case DeclaredOutputType.BOOLEAN: + schema = {"type": "boolean"} + case DeclaredOutputType.OBJECT: + schema = {"type": "object"} + case DeclaredOutputType.ARRAY: + schema = {"type": "array"} + case DeclaredOutputType.FILE: + schema = { + "type": "object", + "properties": { + "file_id": {"type": "string"}, + "filename": {"type": "string"}, + "mime_type": {"type": "string"}, + "url": {"type": "string"}, + }, + } + if output.description: + schema["description"] = output.description + return schema + + @staticmethod + def _normalize_credentials(credentials: Mapping[str, Any]) -> dict[str, str | int | float | bool | None]: + normalized: dict[str, str | int | float | bool | None] = {} + for key, value in credentials.items(): + if isinstance(value, str | int | float | bool) or value is None: + normalized[key] = value + else: + normalized[key] = str(value) + return normalized diff --git a/api/core/workflow/nodes/agent_v2/validators.py b/api/core/workflow/nodes/agent_v2/validators.py new file mode 100644 index 0000000000..f54be8621a --- /dev/null +++ b/api/core/workflow/nodes/agent_v2/validators.py @@ -0,0 +1,388 @@ +from __future__ import annotations + +from collections import defaultdict, deque +from collections.abc import Iterator, Mapping, Sequence +from typing import Any + +from sqlalchemy import select +from sqlalchemy.orm import Session + +from graphon.enums import BuiltinNodeTypes +from models.agent import Agent, AgentConfigSnapshot, AgentStatus, WorkflowAgentNodeBinding +from models.agent_config_entities import AgentSoulConfig, WorkflowNodeJobConfig +from models.model import UploadFile +from models.workflow import Workflow + +from .entities import DifyAgentNodeData + + +class WorkflowAgentNodeValidationError(ValueError): + """Raised when a Workflow Agent v2 node cannot be executed or published.""" + + +class WorkflowAgentNodeValidator: + """Validate Agent v2 workflow nodes against graph topology and persisted bindings.""" + + _LOCKED_AGENT_SOUL_KEYS = frozenset( + { + "agent_soul", + "soul", + "prompt", + "system_prompt", + "skills_files", + "skills", + "files", + "tools", + "dify_tools", + "cli_tools", + "knowledge", + "env", + "environment", + "sandbox", + "sandbox_provider", + "memory", + "memory_strategy", + "model", + "app_features", + "app_variables", + "misc_legacy", + } + ) + _SUPPORTED_HUMAN_CONTACT_CHANNELS = frozenset({"email", "slack", "web_app", "webapp", "chat"}) + + @classmethod + def validate_draft_workflow(cls, *, session: Session, workflow: Workflow) -> None: + cls._validate_workflow(session=session, workflow=workflow, require_binding=False) + + @classmethod + def validate_published_workflow(cls, *, session: Session, workflow: Workflow) -> None: + cls._validate_workflow(session=session, workflow=workflow, require_binding=True) + + @classmethod + def _validate_workflow(cls, *, session: Session, workflow: Workflow, require_binding: bool) -> None: + graph = workflow.graph_dict + topology = _WorkflowGraphTopology.from_graph(graph) + for node_id, node_data in cls.iter_agent_v2_nodes(graph): + cls._validate_node_schema(node_id=node_id, node_data=node_data) + binding = cls._find_binding( + session=session, + tenant_id=workflow.tenant_id, + app_id=workflow.app_id, + workflow_id=workflow.id, + node_id=node_id, + ) + if binding is None: + if require_binding: + raise WorkflowAgentNodeValidationError( + f"Workflow Agent node {node_id} requires a binding before publishing." + ) + continue + cls.validate_binding(session=session, binding=binding, topology=topology) + + @classmethod + def validate_binding( + cls, + *, + session: Session, + binding: WorkflowAgentNodeBinding, + topology: _WorkflowGraphTopology | None = None, + ) -> None: + if binding.agent_id is None: + raise WorkflowAgentNodeValidationError(f"Workflow Agent node {binding.node_id} is missing agent binding.") + if binding.current_snapshot_id is None: + raise WorkflowAgentNodeValidationError( + f"Workflow Agent node {binding.node_id} is missing config snapshot binding." + ) + + agent = session.scalar( + select(Agent) + .where( + Agent.tenant_id == binding.tenant_id, + Agent.id == binding.agent_id, + ) + .limit(1) + ) + if agent is None or agent.status == AgentStatus.ARCHIVED: + raise WorkflowAgentNodeValidationError( + f"Workflow Agent node {binding.node_id} references an unavailable agent." + ) + + snapshot = session.scalar( + select(AgentConfigSnapshot) + .where( + AgentConfigSnapshot.tenant_id == binding.tenant_id, + AgentConfigSnapshot.agent_id == agent.id, + AgentConfigSnapshot.id == binding.current_snapshot_id, + ) + .limit(1) + ) + if snapshot is None: + raise WorkflowAgentNodeValidationError( + f"Workflow Agent node {binding.node_id} references a missing config snapshot." + ) + + agent_soul = AgentSoulConfig.model_validate(snapshot.config_snapshot_dict) + if agent_soul.model is None: + raise WorkflowAgentNodeValidationError( + f"Workflow Agent node {binding.node_id} requires Agent Soul model config." + ) + node_job = WorkflowNodeJobConfig.model_validate(binding.node_job_config_dict) + cls.validate_node_job(session=session, binding=binding, node_job=node_job, topology=topology) + + @classmethod + def validate_node_job( + cls, + *, + session: Session, + binding: WorkflowAgentNodeBinding, + node_job: WorkflowNodeJobConfig, + topology: _WorkflowGraphTopology | None = None, + ) -> None: + cls._validate_locked_agent_soul_not_overridden(binding=binding, node_job=node_job) + + output_names: set[str] = set() + for output in node_job.declared_outputs: + if output.name in output_names: + raise WorkflowAgentNodeValidationError( + f"Workflow Agent node {binding.node_id} has duplicate output name {output.name}." + ) + output_names.add(output.name) + for check in output.checks: + if check.benchmark_file_ref is not None: + cls._validate_file_ref( + session=session, + binding=binding, + file_ref=check.benchmark_file_ref, + ref_context=f"output {output.name} benchmark file", + ) + + for ref in node_job.previous_node_output_refs: + selector = cls.selector_from_ref(ref) + if selector is None: + raise WorkflowAgentNodeValidationError( + f"Workflow Agent node {binding.node_id} has invalid previous node output ref." + ) + if topology is None: + continue + if len(selector) < 2: + raise WorkflowAgentNodeValidationError( + f"Workflow Agent node {binding.node_id} has incomplete previous node output ref." + ) + source_node_id = selector[0] + if not topology.has_node(source_node_id): + raise WorkflowAgentNodeValidationError( + f"Workflow Agent node {binding.node_id} references missing previous node {source_node_id}." + ) + if not topology.is_upstream(source_node_id=source_node_id, target_node_id=binding.node_id): + raise WorkflowAgentNodeValidationError( + f"Workflow Agent node {binding.node_id} references non-upstream previous node {source_node_id}." + ) + + for human_ref in node_job.human_contacts: + cls._validate_human_ref(binding=binding, human_ref=human_ref) + + file_refs = node_job.metadata.get("file_refs") + if isinstance(file_refs, list): + for file_ref in file_refs: + if isinstance(file_ref, Mapping): + cls._validate_file_ref( + session=session, + binding=binding, + file_ref=file_ref, + ref_context="metadata file ref", + ) + + @staticmethod + def iter_agent_v2_nodes(graph_dict: Mapping[str, Any]) -> Iterator[tuple[str, Mapping[str, Any]]]: + nodes = graph_dict.get("nodes") + if not isinstance(nodes, list): + return + for node in nodes: + if not isinstance(node, Mapping): + continue + node_id = node.get("id") + node_data = node.get("data") + if not isinstance(node_id, str) or not isinstance(node_data, Mapping): + continue + if node_data.get("type") == BuiltinNodeTypes.AGENT and str(node_data.get("version")) == "2": + yield node_id, node_data + + @staticmethod + def selector_from_ref(ref: Mapping[str, Any]) -> list[str] | None: + for key in ("selector", "variable_selector", "value_selector"): + value = ref.get(key) + if isinstance(value, list) and all(isinstance(item, str) for item in value): + return value + node_id = ref.get("node_id") + output_name = ref.get("output") or ref.get("name") or ref.get("variable") or ref.get("key") + if isinstance(node_id, str) and isinstance(output_name, str): + return [node_id, output_name] + return None + + @staticmethod + def _validate_node_schema(*, node_id: str, node_data: Mapping[str, Any]) -> None: + try: + DifyAgentNodeData.model_validate(node_data) + except ValueError as exc: + raise WorkflowAgentNodeValidationError( + f"Workflow Agent node {node_id} has invalid Agent v2 node schema: {exc}" + ) from exc + + @classmethod + def _validate_locked_agent_soul_not_overridden( + cls, + *, + binding: WorkflowAgentNodeBinding, + node_job: WorkflowNodeJobConfig, + ) -> None: + forbidden_paths = cls._find_locked_agent_soul_paths(node_job.metadata) + if forbidden_paths: + raise WorkflowAgentNodeValidationError( + f"Workflow Agent node {binding.node_id} cannot override locked Agent Soul fields: " + f"{', '.join(sorted(forbidden_paths))}." + ) + + @classmethod + def _find_locked_agent_soul_paths(cls, value: Any, *, path: str = "metadata") -> set[str]: + if not isinstance(value, Mapping): + return set() + forbidden: set[str] = set() + for key, item in value.items(): + key_text = str(key) + if key_text in cls._LOCKED_AGENT_SOUL_KEYS: + forbidden.add(f"{path}.{key_text}") + forbidden.update(cls._find_locked_agent_soul_paths(item, path=f"{path}.{key_text}")) + return forbidden + + @classmethod + def _validate_human_ref( + cls, + *, + binding: WorkflowAgentNodeBinding, + human_ref: Mapping[str, Any], + ) -> None: + contact_id = human_ref.get("contact_id") or human_ref.get("human_id") or human_ref.get("id") + if not isinstance(contact_id, str) or not contact_id: + raise WorkflowAgentNodeValidationError( + f"Workflow Agent node {binding.node_id} has invalid human contact ref." + ) + + tenant_id = human_ref.get("tenant_id") + if tenant_id is not None and tenant_id != binding.tenant_id: + raise WorkflowAgentNodeValidationError( + f"Workflow Agent node {binding.node_id} references out-of-scope human contact {contact_id}." + ) + + channel = human_ref.get("channel") or human_ref.get("method") or human_ref.get("contact_method") + if channel is not None and channel not in cls._SUPPORTED_HUMAN_CONTACT_CHANNELS: + raise WorkflowAgentNodeValidationError( + f"Workflow Agent node {binding.node_id} references unsupported human contact channel {channel}." + ) + + @staticmethod + def _validate_file_ref( + *, + session: Session, + binding: WorkflowAgentNodeBinding, + file_ref: Mapping[str, Any], + ref_context: str, + ) -> None: + tenant_id = file_ref.get("tenant_id") + if tenant_id is not None and tenant_id != binding.tenant_id: + raise WorkflowAgentNodeValidationError( + f"Workflow Agent node {binding.node_id} references out-of-scope {ref_context}." + ) + + upload_file_id = ( + file_ref.get("upload_file_id") or file_ref.get("file_id") or file_ref.get("id") or file_ref.get("reference") + ) + if upload_file_id is None and (file_ref.get("url") or file_ref.get("remote_url")): + return + if not isinstance(upload_file_id, str) or not upload_file_id: + raise WorkflowAgentNodeValidationError(f"Workflow Agent node {binding.node_id} has invalid {ref_context}.") + + upload_file = session.scalar( + select(UploadFile) + .where( + UploadFile.tenant_id == binding.tenant_id, + UploadFile.id == upload_file_id, + ) + .limit(1) + ) + if upload_file is None: + raise WorkflowAgentNodeValidationError( + f"Workflow Agent node {binding.node_id} references missing or out-of-scope {ref_context}." + ) + + @staticmethod + def _find_binding( + *, + session: Session, + tenant_id: str, + app_id: str, + workflow_id: str, + node_id: str, + ) -> WorkflowAgentNodeBinding | None: + return session.scalar( + select(WorkflowAgentNodeBinding) + .where( + WorkflowAgentNodeBinding.tenant_id == tenant_id, + WorkflowAgentNodeBinding.app_id == app_id, + WorkflowAgentNodeBinding.workflow_id == workflow_id, + WorkflowAgentNodeBinding.node_id == node_id, + ) + .limit(1) + ) + + +class _WorkflowGraphTopology: + def __init__(self, *, node_ids: set[str], incoming: Mapping[str, Sequence[str]]) -> None: + self._node_ids = node_ids + self._incoming = incoming + + @classmethod + def from_graph(cls, graph: Mapping[str, Any]) -> _WorkflowGraphTopology: + node_ids = cls._node_ids_from_graph(graph) + incoming: dict[str, list[str]] = defaultdict(list) + edges = graph.get("edges") + if isinstance(edges, list): + for edge in edges: + if not isinstance(edge, Mapping): + continue + source = edge.get("source") + target = edge.get("target") + if isinstance(source, str) and isinstance(target, str): + incoming[target].append(source) + return cls(node_ids=node_ids, incoming=incoming) + + def has_node(self, node_id: str) -> bool: + return node_id in self._node_ids + + def is_upstream(self, *, source_node_id: str, target_node_id: str) -> bool: + if source_node_id == target_node_id: + return False + visited: set[str] = set() + queue: deque[str] = deque(self._incoming.get(target_node_id, ())) + while queue: + candidate = queue.popleft() + if candidate == source_node_id: + return True + if candidate in visited: + continue + visited.add(candidate) + queue.extend(self._incoming.get(candidate, ())) + return False + + @staticmethod + def _node_ids_from_graph(graph: Mapping[str, Any]) -> set[str]: + node_ids: set[str] = set() + nodes = graph.get("nodes") + if not isinstance(nodes, list): + return node_ids + for node in nodes: + if not isinstance(node, Mapping): + continue + node_id = node.get("id") + if isinstance(node_id, str): + node_ids.add(node_id) + return node_ids diff --git a/api/models/agent_config_entities.py b/api/models/agent_config_entities.py index 2044d48e40..c07f51b261 100644 --- a/api/models/agent_config_entities.py +++ b/api/models/agent_config_entities.py @@ -64,6 +64,24 @@ class AgentSoulMemoryConfig(BaseModel): artifacts: list[dict[str, Any]] = Field(default_factory=list) +class AgentSoulModelCredentialRef(BaseModel): + """Reference to model credentials resolved only at runtime.""" + + type: str = Field(min_length=1, max_length=64) + id: str | None = Field(default=None, max_length=255) + provider: str | None = Field(default=None, max_length=255) + + +class AgentSoulModelConfig(BaseModel): + """Stable model selection for Agent runtime without storing secret values.""" + + plugin_id: str = Field(min_length=1, max_length=255) + model_provider: str = Field(min_length=1, max_length=255) + model: str = Field(min_length=1, max_length=255) + credential_ref: AgentSoulModelCredentialRef | None = None + model_settings: dict[str, Any] = Field(default_factory=dict) + + class AppVariableConfig(BaseModel): name: str = Field(min_length=1, max_length=255) type: str = Field(min_length=1, max_length=64) @@ -83,6 +101,7 @@ class AgentSoulConfig(BaseModel): env: AgentSoulEnvConfig = Field(default_factory=AgentSoulEnvConfig) sandbox: AgentSoulSandboxConfig = Field(default_factory=AgentSoulSandboxConfig) memory: AgentSoulMemoryConfig = Field(default_factory=AgentSoulMemoryConfig) + model: AgentSoulModelConfig | None = None app_features: dict[str, Any] = Field(default_factory=dict) app_variables: list[AppVariableConfig] = Field(default_factory=list) misc_legacy: dict[str, Any] = Field(default_factory=dict) diff --git a/api/openapi/markdown/console-swagger.md b/api/openapi/markdown/console-swagger.md index 3e036313f1..cc8c1e1d6b 100644 --- a/api/openapi/markdown/console-swagger.md +++ b/api/openapi/markdown/console-swagger.md @@ -10515,6 +10515,7 @@ Supported icon storage formats for Agent roster entries. | knowledge | [AgentSoulKnowledgeConfig](#agentsoulknowledgeconfig) | | No | | memory | [AgentSoulMemoryConfig](#agentsoulmemoryconfig) | | No | | misc_legacy | object | | No | +| model | [AgentSoulModelConfig](#agentsoulmodelconfig) | | No | | prompt | [AgentSoulPromptConfig](#agentsoulpromptconfig) | | No | | sandbox | [AgentSoulSandboxConfig](#agentsoulsandboxconfig) | | No | | schema_version | integer | | No | @@ -10551,6 +10552,28 @@ Supported icon storage formats for Agent roster entries. | budget | string | | No | | scope | string | | No | +#### AgentSoulModelConfig + +Stable model selection for Agent runtime without storing secret values. + +| Name | Type | Description | Required | +| ---- | ---- | ----------- | -------- | +| credential_ref | [AgentSoulModelCredentialRef](#agentsoulmodelcredentialref) | | No | +| model | string | | Yes | +| model_provider | string | | Yes | +| model_settings | object | | No | +| plugin_id | string | | Yes | + +#### AgentSoulModelCredentialRef + +Reference to model credentials resolved only at runtime. + +| Name | Type | Description | Required | +| ---- | ---- | ----------- | -------- | +| id | string | | No | +| provider | string | | No | +| type | string | | Yes | + #### AgentSoulPromptConfig | Name | Type | Description | Required | diff --git a/api/services/agent/workflow_publish_service.py b/api/services/agent/workflow_publish_service.py new file mode 100644 index 0000000000..06985dc3fa --- /dev/null +++ b/api/services/agent/workflow_publish_service.py @@ -0,0 +1,59 @@ +from __future__ import annotations + +from sqlalchemy import select +from sqlalchemy.orm import Session + +from core.workflow.nodes.agent_v2.validators import WorkflowAgentNodeValidator +from models.agent import WorkflowAgentNodeBinding +from models.agent_config_entities import WorkflowNodeJobConfig +from models.workflow import Workflow + + +class WorkflowAgentPublishService: + """Validate and freeze Workflow Agent v2 bindings during workflow publish.""" + + @classmethod + def validate_agent_nodes_for_publish(cls, *, session: Session, draft_workflow: Workflow) -> None: + WorkflowAgentNodeValidator.validate_published_workflow(session=session, workflow=draft_workflow) + + @classmethod + def validate_agent_nodes_for_draft_sync(cls, *, session: Session, draft_workflow: Workflow) -> None: + WorkflowAgentNodeValidator.validate_draft_workflow(session=session, workflow=draft_workflow) + + @classmethod + def copy_agent_node_bindings_to_published( + cls, + *, + session: Session, + draft_workflow: Workflow, + published_workflow: Workflow, + ) -> None: + node_ids = { + node_id for node_id, _node_data in WorkflowAgentNodeValidator.iter_agent_v2_nodes(draft_workflow.graph_dict) + } + if not node_ids: + return + + bindings = session.scalars( + select(WorkflowAgentNodeBinding).where( + WorkflowAgentNodeBinding.tenant_id == draft_workflow.tenant_id, + WorkflowAgentNodeBinding.app_id == draft_workflow.app_id, + WorkflowAgentNodeBinding.workflow_id == draft_workflow.id, + WorkflowAgentNodeBinding.node_id.in_(node_ids), + ) + ).all() + + for binding in bindings: + copied = WorkflowAgentNodeBinding( + tenant_id=binding.tenant_id, + app_id=binding.app_id, + workflow_id=published_workflow.id, + node_id=binding.node_id, + binding_type=binding.binding_type, + agent_id=binding.agent_id, + current_snapshot_id=binding.current_snapshot_id, + node_job_config=WorkflowNodeJobConfig.model_validate(binding.node_job_config_dict), + created_by=binding.created_by, + updated_by=binding.updated_by, + ) + session.add(copied) diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index 1b0e10d784..6d9ee97fa4 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -312,6 +312,13 @@ class WorkflowService: workflow.environment_variables = environment_variables workflow.conversation_variables = conversation_variables + from services.agent.workflow_publish_service import WorkflowAgentPublishService + + WorkflowAgentPublishService.validate_agent_nodes_for_draft_sync( + session=cast(Session, db.session), + draft_workflow=workflow, + ) + # commit db session changes db.session.commit() @@ -457,6 +464,13 @@ class WorkflowService: # validate graph structure self.validate_graph_structure(graph=draft_workflow.graph_dict) + from services.agent.workflow_publish_service import WorkflowAgentPublishService + + WorkflowAgentPublishService.validate_agent_nodes_for_publish( + session=session, + draft_workflow=draft_workflow, + ) + # billing check if dify_config.BILLING_ENABLED: limit_info = BillingService.get_info(app_model.tenant_id) @@ -490,6 +504,11 @@ class WorkflowService: # commit db session changes session.add(workflow) + WorkflowAgentPublishService.copy_agent_node_bindings_to_published( + session=session, + draft_workflow=draft_workflow, + published_workflow=workflow, + ) # trigger app workflow events app_published_workflow_was_updated.send(app_model, published_workflow=workflow) diff --git a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_agent_node.py b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_agent_node.py new file mode 100644 index 0000000000..5a0cf87688 --- /dev/null +++ b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_agent_node.py @@ -0,0 +1,155 @@ +from types import SimpleNamespace +from typing import cast + +from clients.agent_backend import ( + AgentBackendRunEventAdapter, + AgentBackendStreamInternalEvent, + FakeAgentBackendRunClient, + FakeAgentBackendScenario, +) +from core.app.entities.app_invoke_entities import DIFY_RUN_CONTEXT_KEY, DifyRunContext, InvokeFrom, UserFrom +from core.workflow.nodes.agent_v2 import DifyAgentNode +from core.workflow.nodes.agent_v2.binding_resolver import WorkflowAgentBindingBundle, WorkflowAgentBindingResolver +from core.workflow.nodes.agent_v2.entities import DifyAgentNodeData +from core.workflow.nodes.agent_v2.output_adapter import WorkflowAgentOutputAdapter +from core.workflow.nodes.agent_v2.runtime_request_builder import WorkflowAgentRuntimeRequestBuilder +from graphon.entities import GraphInitParams +from graphon.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus +from graphon.node_events import StreamCompletedEvent +from graphon.runtime import GraphRuntimeState +from graphon.variables.segments import StringSegment +from models.agent import Agent, AgentConfigSnapshot, WorkflowAgentNodeBinding +from models.agent_config_entities import AgentSoulConfig, AgentSoulModelConfig, WorkflowNodeJobConfig + + +class FakeCredentialsProvider: + def fetch(self, provider_name: str, model_name: str) -> dict[str, object]: + assert provider_name == "openai" + assert model_name == "gpt-test" + return {"api_key": "secret-key"} + + +class FakeVariablePool: + def get(self, selector): + values = { + ("sys", "query"): "Summarize the report.", + ("sys", "workflow_run_id"): "workflow-run-1", + ("sys", "conversation_id"): "conversation-1", + ("previous-node", "text"): "Previous result", + } + value = values.get(tuple(selector)) + if value is None: + return None + return StringSegment(value=value) + + def get_by_prefix(self, prefix): + return {} + + +class FakeBindingResolver(WorkflowAgentBindingResolver): + def __init__(self): + self.agent = Agent(id="agent-1", tenant_id="tenant-1", name="Agent") + self.snapshot = AgentConfigSnapshot( + id="snapshot-1", + tenant_id="tenant-1", + agent_id="agent-1", + version=1, + config_snapshot=AgentSoulConfig( + prompt={"system_prompt": "You are careful."}, + model=AgentSoulModelConfig( + plugin_id="langgenius/openai", + model_provider="openai", + model="gpt-test", + ), + ), + ) + self.binding = WorkflowAgentNodeBinding( + id="binding-1", + tenant_id="tenant-1", + app_id="app-1", + workflow_id="workflow-1", + node_id="agent-node", + agent_id="agent-1", + current_snapshot_id="snapshot-1", + node_job_config=WorkflowNodeJobConfig.model_validate( + { + "workflow_prompt": "Use the previous output.", + "previous_node_output_refs": [{"node_id": "previous-node", "output": "text"}], + "declared_outputs": [{"name": "text", "type": "string"}], + } + ), + ) + + def resolve(self, **_kwargs): + return WorkflowAgentBindingBundle(binding=self.binding, agent=self.agent, snapshot=self.snapshot) + + +def _node(*, scenario: FakeAgentBackendScenario = FakeAgentBackendScenario.SUCCESS) -> DifyAgentNode: + graph_init_params = GraphInitParams( + workflow_id="workflow-1", + graph_config={"nodes": [], "edges": []}, + run_context={ + DIFY_RUN_CONTEXT_KEY: DifyRunContext( + tenant_id="tenant-1", + app_id="app-1", + user_id="user-1", + user_from=UserFrom.ACCOUNT, + invoke_from=InvokeFrom.DEBUGGER, + ) + }, + call_depth=0, + ) + return DifyAgentNode( + node_id="agent-node", + data=DifyAgentNodeData.model_validate({"type": BuiltinNodeTypes.AGENT, "version": "2"}), + graph_init_params=graph_init_params, + graph_runtime_state=cast(GraphRuntimeState, SimpleNamespace(variable_pool=FakeVariablePool())), + binding_resolver=FakeBindingResolver(), + runtime_request_builder=WorkflowAgentRuntimeRequestBuilder(credentials_provider=FakeCredentialsProvider()), + agent_backend_client=FakeAgentBackendRunClient(scenario=scenario), + event_adapter=AgentBackendRunEventAdapter(), + output_adapter=WorkflowAgentOutputAdapter(), + ) + + +def test_agent_node_run_maps_successful_agent_backend_run_to_node_result(): + events = list(_node()._run()) + + assert len(events) == 1 + result = cast(StreamCompletedEvent, events[0]).node_run_result + assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED + assert result.outputs == {"text": "hello agent"} + agent_log = result.metadata[WorkflowNodeExecutionMetadataKey.AGENT_LOG] + assert agent_log["agent_backend"]["run_id"] == "fake-run-1" + assert agent_log["agent_backend"]["status"] == "succeeded" + assert result.process_data["agent_id"] == "agent-1" + assert result.inputs["agent_backend_request"]["composition"]["layers"][4]["config"]["credentials"] == "[REDACTED]" + + +def test_agent_node_run_maps_failed_agent_backend_run_to_node_result(): + events = list(_node(scenario=FakeAgentBackendScenario.FAILED)._run()) + + assert len(events) == 1 + result = cast(StreamCompletedEvent, events[0]).node_run_result + assert result.status == WorkflowNodeExecutionStatus.FAILED + assert result.error == "fake failure" + assert result.error_type == "unit_test" + + +def test_agent_node_records_stream_usage_metadata(): + metadata = {"agent_backend": {"run_id": "run-1"}} + + DifyAgentNode._record_stream_metadata( + metadata, + AgentBackendStreamInternalEvent( + run_id="run-1", + source_event_id="1-1", + event_kind="model_response", + data={"usage": {"prompt_tokens": 3, "completion_tokens": 4, "total_tokens": 7}}, + ), + ) + + agent_backend = metadata["agent_backend"] + assert agent_backend["last_stream_event_id"] == "1-1" + assert agent_backend["last_stream_event_kind"] == "model_response" + assert agent_backend["usage"] == {"prompt_tokens": 3, "completion_tokens": 4, "total_tokens": 7} diff --git a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_binding_resolver.py b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_binding_resolver.py new file mode 100644 index 0000000000..1f75e4c19d --- /dev/null +++ b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_binding_resolver.py @@ -0,0 +1,121 @@ +import pytest + +from core.workflow.nodes.agent_v2.binding_resolver import ( + WorkflowAgentBindingError, + WorkflowAgentBindingResolver, +) +from models.agent import Agent, AgentConfigSnapshot, AgentStatus, WorkflowAgentNodeBinding +from models.agent_config_entities import AgentSoulConfig, AgentSoulModelConfig, WorkflowNodeJobConfig + + +class FakeSession: + def __init__(self, scalar_results): + self._scalar_results = list(scalar_results) + self.expunge_calls = [] + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + def scalar(self, _stmt): + if not self._scalar_results: + return None + return self._scalar_results.pop(0) + + def expunge(self, value): + self.expunge_calls.append(value) + + +def _binding() -> WorkflowAgentNodeBinding: + return WorkflowAgentNodeBinding( + id="binding-1", + tenant_id="tenant-1", + app_id="app-1", + workflow_id="workflow-1", + node_id="agent-node", + agent_id="agent-1", + current_snapshot_id="snapshot-1", + node_job_config=WorkflowNodeJobConfig(), + ) + + +def _agent(*, status: AgentStatus = AgentStatus.ACTIVE) -> Agent: + return Agent(id="agent-1", tenant_id="tenant-1", name="Agent", status=status) + + +def _snapshot() -> AgentConfigSnapshot: + return AgentConfigSnapshot( + id="snapshot-1", + tenant_id="tenant-1", + agent_id="agent-1", + version=1, + config_snapshot=AgentSoulConfig( + model=AgentSoulModelConfig( + plugin_id="langgenius/openai", + model_provider="openai", + model="gpt-test", + ) + ), + ) + + +def _resolve() -> dict[str, str]: + return { + "tenant_id": "tenant-1", + "app_id": "app-1", + "workflow_id": "workflow-1", + "node_id": "agent-node", + } + + +def test_binding_resolver_returns_detached_binding_bundle(monkeypatch: pytest.MonkeyPatch): + fake_session = FakeSession([_binding(), _agent(), _snapshot()]) + monkeypatch.setattr( + "core.workflow.nodes.agent_v2.binding_resolver.session_factory.create_session", + lambda: fake_session, + ) + + bundle = WorkflowAgentBindingResolver().resolve(**_resolve()) + + assert bundle.binding.id == "binding-1" + assert bundle.agent.id == "agent-1" + assert bundle.snapshot.id == "snapshot-1" + assert fake_session.expunge_calls == [bundle.binding, bundle.agent, bundle.snapshot] + + +def test_binding_resolver_raises_when_binding_missing(monkeypatch: pytest.MonkeyPatch): + monkeypatch.setattr( + "core.workflow.nodes.agent_v2.binding_resolver.session_factory.create_session", + lambda: FakeSession([None]), + ) + + with pytest.raises(WorkflowAgentBindingError) as exc_info: + WorkflowAgentBindingResolver().resolve(**_resolve()) + + assert exc_info.value.error_code == "agent_binding_not_found" + + +def test_binding_resolver_raises_when_agent_archived(monkeypatch: pytest.MonkeyPatch): + monkeypatch.setattr( + "core.workflow.nodes.agent_v2.binding_resolver.session_factory.create_session", + lambda: FakeSession([_binding(), _agent(status=AgentStatus.ARCHIVED)]), + ) + + with pytest.raises(WorkflowAgentBindingError) as exc_info: + WorkflowAgentBindingResolver().resolve(**_resolve()) + + assert exc_info.value.error_code == "agent_not_available" + + +def test_binding_resolver_raises_when_snapshot_missing(monkeypatch: pytest.MonkeyPatch): + monkeypatch.setattr( + "core.workflow.nodes.agent_v2.binding_resolver.session_factory.create_session", + lambda: FakeSession([_binding(), _agent(), None]), + ) + + with pytest.raises(WorkflowAgentBindingError) as exc_info: + WorkflowAgentBindingResolver().resolve(**_resolve()) + + assert exc_info.value.error_code == "agent_config_snapshot_not_found" diff --git a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_output_adapter.py b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_output_adapter.py new file mode 100644 index 0000000000..8b2feb2ad6 --- /dev/null +++ b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_output_adapter.py @@ -0,0 +1,194 @@ +from agenton.compositor import CompositorSessionSnapshot + +from clients.agent_backend import ( + AgentBackendRunCancelledInternalEvent, + AgentBackendRunFailedInternalEvent, + AgentBackendRunPausedInternalEvent, + AgentBackendRunSucceededInternalEvent, +) +from core.workflow.nodes.agent_v2.output_adapter import WorkflowAgentOutputAdapter +from graphon.enums import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus +from graphon.file import FileTransferMethod, FileType +from graphon.variables.segments import ArrayFileSegment, FileSegment + + +def test_success_output_adapter_preserves_dict_output(): + result = WorkflowAgentOutputAdapter().build_success_result( + event=AgentBackendRunSucceededInternalEvent( + run_id="run-1", + source_event_id="2-0", + output={"summary": "ok"}, + session_snapshot=CompositorSessionSnapshot(layers=[]), + ), + inputs={}, + process_data={}, + metadata={"agent_backend": {"run_id": "run-1"}}, + ) + + assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED + assert result.outputs == {"summary": "ok"} + assert result.metadata[WorkflowNodeExecutionMetadataKey.AGENT_LOG]["agent_backend"]["status"] == "succeeded" + assert result.metadata[WorkflowNodeExecutionMetadataKey.AGENT_LOG]["agent_backend"]["session_snapshot"] == { + "layer_count": 0, + } + + +def test_failure_output_adapter_maps_paused_to_unsupported_failure(): + result = WorkflowAgentOutputAdapter().build_failure_result( + event=AgentBackendRunPausedInternalEvent( + run_id="run-1", + source_event_id="2-0", + reason="human", + message=None, + session_snapshot=None, + ), + inputs={}, + process_data={}, + metadata={}, + ) + + assert result.status == WorkflowNodeExecutionStatus.FAILED + assert result.error_type == "agent_backend_paused_unsupported" + + +def test_failure_output_adapter_preserves_backend_failed_reason(): + result = WorkflowAgentOutputAdapter().build_failure_result( + event=AgentBackendRunFailedInternalEvent( + run_id="run-1", + source_event_id="2-0", + error="bad request", + reason="validation", + ), + inputs={}, + process_data={}, + metadata={}, + ) + + assert result.status == WorkflowNodeExecutionStatus.FAILED + assert result.error == "bad request" + assert result.error_type == "validation" + + +def test_success_output_adapter_normalizes_string_and_scalar_outputs(): + adapter = WorkflowAgentOutputAdapter() + string_result = adapter.build_success_result( + event=AgentBackendRunSucceededInternalEvent( + run_id="run-1", + source_event_id="2-0", + output="hello", + session_snapshot=CompositorSessionSnapshot(layers=[]), + ), + inputs={}, + process_data={}, + metadata={}, + ) + scalar_result = adapter.build_success_result( + event=AgentBackendRunSucceededInternalEvent( + run_id="run-2", + source_event_id="2-0", + output=3, + session_snapshot=CompositorSessionSnapshot(layers=[]), + ), + inputs={}, + process_data={}, + metadata={}, + ) + + assert string_result.outputs == {"text": "hello"} + assert scalar_result.outputs == {"result": 3} + + +def test_success_output_adapter_normalizes_file_output_to_file_segments(): + result = WorkflowAgentOutputAdapter().build_success_result( + event=AgentBackendRunSucceededInternalEvent( + run_id="run-1", + source_event_id="2-0", + output={ + "report": { + "file_id": "upload-file-1", + "filename": "report.pdf", + "mime_type": "application/pdf", + "size": 12, + }, + "attachments": [ + { + "tool_file_id": "tool-file-1", + "filename": "chart.png", + "mime_type": "image/png", + } + ], + }, + session_snapshot=CompositorSessionSnapshot(layers=[]), + ), + inputs={}, + process_data={}, + metadata={}, + ) + + report = result.outputs["report"] + assert isinstance(report, FileSegment) + assert report.value.type == FileType.DOCUMENT + assert report.value.transfer_method == FileTransferMethod.LOCAL_FILE + assert report.value.reference == "upload-file-1" + + attachments = result.outputs["attachments"] + assert isinstance(attachments, ArrayFileSegment) + assert attachments.value[0].type == FileType.IMAGE + assert attachments.value[0].transfer_method == FileTransferMethod.TOOL_FILE + assert attachments.value[0].reference == "tool-file-1" + + +def test_success_output_adapter_maps_backend_usage_to_llm_usage_and_metadata(): + result = WorkflowAgentOutputAdapter().build_success_result( + event=AgentBackendRunSucceededInternalEvent( + run_id="run-1", + source_event_id="2-0", + output={"summary": "ok"}, + session_snapshot=CompositorSessionSnapshot(layers=[]), + ), + inputs={}, + process_data={}, + metadata={ + "agent_backend": { + "usage": { + "prompt_tokens": 10, + "completion_tokens": 5, + "total_tokens": 15, + } + } + }, + ) + + assert result.llm_usage.prompt_tokens == 10 + assert result.llm_usage.completion_tokens == 5 + assert result.llm_usage.total_tokens == 15 + assert result.metadata[WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS] == 15 + + +def test_failure_output_adapter_maps_cancelled_to_failure_code(): + result = WorkflowAgentOutputAdapter().build_failure_result( + event=AgentBackendRunCancelledInternalEvent( + run_id="run-1", + source_event_id="2-0", + reason="user_cancelled", + message=None, + ), + inputs={}, + process_data={}, + metadata={}, + ) + + assert result.status == WorkflowNodeExecutionStatus.FAILED + assert result.error_type == "agent_backend_run_cancelled" + + +def test_stream_exhausted_result_is_failed_with_stream_error(): + result = WorkflowAgentOutputAdapter().build_stream_exhausted_result( + inputs={}, + process_data={}, + metadata={"agent_backend": {"run_id": "run-1"}}, + ) + + assert result.status == WorkflowNodeExecutionStatus.FAILED + assert result.error_type == "agent_backend_stream_error" + assert result.metadata[WorkflowNodeExecutionMetadataKey.AGENT_LOG]["agent_backend"]["run_id"] == "run-1" diff --git a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_runtime_request_builder.py b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_runtime_request_builder.py new file mode 100644 index 0000000000..d50a61883b --- /dev/null +++ b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_runtime_request_builder.py @@ -0,0 +1,219 @@ +from dataclasses import replace + +import pytest + +from core.app.entities.app_invoke_entities import DifyRunContext, InvokeFrom, UserFrom +from core.workflow.nodes.agent_v2.runtime_request_builder import ( + WorkflowAgentRuntimeBuildContext, + WorkflowAgentRuntimeRequestBuilder, + WorkflowAgentRuntimeRequestBuildError, +) +from graphon.variables.segments import StringSegment +from models.agent import Agent, AgentConfigSnapshot, WorkflowAgentNodeBinding +from models.agent_config_entities import ( + AgentSoulConfig, + AgentSoulModelConfig, + DeclaredOutputType, + WorkflowNodeJobConfig, +) + + +class FakeCredentialsProvider: + def fetch(self, provider_name: str, model_name: str) -> dict[str, object]: + assert provider_name == "openai" + assert model_name == "gpt-test" + return {"api_key": "secret-key"} + + +class FakeVariablePool: + def get(self, selector): + if list(selector) == ["sys", "query"]: + return StringSegment(value="Summarize the report.") + if list(selector) == ["previous-node", "text"]: + return StringSegment(value="Previous result") + return None + + def get_by_prefix(self, prefix): + return {} + + +def _context() -> WorkflowAgentRuntimeBuildContext: + agent = Agent(id="agent-1", tenant_id="tenant-1", name="Agent") + snapshot = AgentConfigSnapshot( + id="snapshot-1", + tenant_id="tenant-1", + agent_id="agent-1", + version=1, + config_snapshot=AgentSoulConfig( + prompt={"system_prompt": "You are careful."}, + model=AgentSoulModelConfig( + plugin_id="langgenius/openai", + model_provider="openai", + model="gpt-test", + model_settings={"temperature": 0}, + ), + ), + ) + binding = WorkflowAgentNodeBinding( + id="binding-1", + tenant_id="tenant-1", + app_id="app-1", + workflow_id="workflow-1", + node_id="agent-node", + agent_id="agent-1", + current_snapshot_id="snapshot-1", + node_job_config=WorkflowNodeJobConfig.model_validate( + { + "workflow_prompt": "Use the previous output.", + "previous_node_output_refs": [{"node_id": "previous-node", "output": "text"}], + "declared_outputs": [{"name": "summary", "type": "string"}], + } + ), + ) + return WorkflowAgentRuntimeBuildContext( + dify_context=DifyRunContext( + tenant_id="tenant-1", + app_id="app-1", + user_id="user-1", + user_from=UserFrom.ACCOUNT, + invoke_from=InvokeFrom.DEBUGGER, + ), + workflow_id="workflow-1", + workflow_run_id="run-1", + node_id="agent-node", + node_execution_id="node-exec-1", + variable_pool=FakeVariablePool(), + binding=binding, + agent=agent, + snapshot=snapshot, + ) + + +def test_builds_create_run_request_from_agent_soul_and_node_job(): + result = WorkflowAgentRuntimeRequestBuilder(credentials_provider=FakeCredentialsProvider()).build(_context()) + + dumped = result.request.model_dump(mode="json") + assert dumped["execution_context"]["agent_id"] == "agent-1" + assert dumped["execution_context"]["agent_config_version_id"] == "snapshot-1" + assert dumped["execution_context"]["invoke_from"] == "single_step" + assert dumped["idempotency_key"] == "run-1:node-exec-1" + assert dumped["composition"]["layers"][0]["config"]["prefix"] == "You are careful." + assert dumped["composition"]["layers"][1]["config"]["prefix"] == "Use the previous output." + assert "Previous result" in dumped["composition"]["layers"][2]["config"]["user"] + assert dumped["composition"]["layers"][-1]["config"]["json_schema"]["properties"]["summary"]["type"] == "string" + assert result.redacted_request["composition"]["layers"][4]["config"]["credentials"] == "[REDACTED]" + + +def test_builds_workflow_run_request_with_file_output_schema_and_reserved_metadata(): + context = _context() + snapshot = AgentConfigSnapshot( + id="snapshot-1", + tenant_id="tenant-1", + agent_id="agent-1", + version=1, + config_snapshot=AgentSoulConfig( + prompt={"system_prompt": "You are careful."}, + model=AgentSoulModelConfig( + plugin_id="langgenius/openai", + model_provider="openai", + model="gpt-test", + model_settings={"temperature": 0.2}, + ), + tools={"cli_tools": [{"name": "pytest"}]}, + ), + ) + binding = WorkflowAgentNodeBinding( + id="binding-1", + tenant_id="tenant-1", + app_id="app-1", + workflow_id="workflow-1", + node_id="agent-node", + agent_id="agent-1", + current_snapshot_id="snapshot-1", + node_job_config=WorkflowNodeJobConfig.model_validate( + { + "declared_outputs": [ + {"name": "report", "type": DeclaredOutputType.FILE}, + {"name": "confidence", "type": DeclaredOutputType.NUMBER, "required": False}, + ], + } + ), + ) + dify_context = context.dify_context.model_copy(update={"invoke_from": InvokeFrom.SERVICE_API}) + context = replace(context, dify_context=dify_context, workflow_run_id=None, snapshot=snapshot, binding=binding) + + result = WorkflowAgentRuntimeRequestBuilder(credentials_provider=FakeCredentialsProvider()).build(context) + + dumped = result.request.model_dump(mode="json") + assert dumped["execution_context"]["invoke_from"] == "workflow_run" + assert dumped["idempotency_key"] == "node-exec-1" + output_schema = dumped["composition"]["layers"][-1]["config"]["json_schema"] + assert output_schema["properties"]["report"]["properties"]["file_id"]["type"] == "string" + assert output_schema["properties"]["confidence"]["type"] == "number" + assert output_schema["required"] == ["report"] + assert dumped["composition"]["layers"][4]["config"]["model_settings"] == {"temperature": 0.2} + assert result.metadata["runtime_support"]["reserved_status"]["tools"] == "reserved_not_executed" + assert result.metadata["runtime_support"]["unsupported_runtime_warnings"][0]["section"] == "agent_soul.tools" + + +def test_requires_agent_soul_model_config(): + context = _context() + snapshot = AgentConfigSnapshot( + id="snapshot-1", + tenant_id="tenant-1", + agent_id="agent-1", + version=1, + config_snapshot=AgentSoulConfig(), + ) + context = replace(context, snapshot=snapshot) + + with pytest.raises(WorkflowAgentRuntimeRequestBuildError, match="Agent Soul model"): + WorkflowAgentRuntimeRequestBuilder(credentials_provider=FakeCredentialsProvider()).build(context) + + +def test_missing_previous_node_output_fails_request_build(): + context = _context() + binding = WorkflowAgentNodeBinding( + id="binding-1", + tenant_id="tenant-1", + app_id="app-1", + workflow_id="workflow-1", + node_id="agent-node", + agent_id="agent-1", + current_snapshot_id="snapshot-1", + node_job_config=WorkflowNodeJobConfig.model_validate( + { + "previous_node_output_refs": [{"node_id": "missing-node", "output": "text"}], + } + ), + ) + context = replace(context, binding=binding) + + with pytest.raises(WorkflowAgentRuntimeRequestBuildError) as exc_info: + WorkflowAgentRuntimeRequestBuilder(credentials_provider=FakeCredentialsProvider()).build(context) + + assert exc_info.value.error_code == "missing_previous_node_output" + + +def test_invalid_previous_node_output_ref_fails_request_build(): + context = _context() + binding = WorkflowAgentNodeBinding( + id="binding-1", + tenant_id="tenant-1", + app_id="app-1", + workflow_id="workflow-1", + node_id="agent-node", + agent_id="agent-1", + current_snapshot_id="snapshot-1", + node_job_config=WorkflowNodeJobConfig.model_validate( + { + "previous_node_output_refs": [{"selector": ["previous-node", 1]}], + } + ), + ) + context = replace(context, binding=binding) + + with pytest.raises(WorkflowAgentRuntimeRequestBuildError) as exc_info: + WorkflowAgentRuntimeRequestBuilder(credentials_provider=FakeCredentialsProvider()).build(context) + + assert exc_info.value.error_code == "invalid_previous_node_output_ref" diff --git a/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_validators.py b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_validators.py new file mode 100644 index 0000000000..99d1e91c44 --- /dev/null +++ b/api/tests/unit_tests/core/workflow/nodes/agent_v2/test_validators.py @@ -0,0 +1,271 @@ +import json +from types import SimpleNamespace +from unittest.mock import Mock + +import pytest + +from core.workflow.nodes.agent_v2.validators import ( + WorkflowAgentNodeValidationError, + WorkflowAgentNodeValidator, +) +from models.agent import Agent, AgentConfigSnapshot, AgentStatus, WorkflowAgentNodeBinding +from models.agent_config_entities import AgentSoulConfig, AgentSoulModelConfig, WorkflowNodeJobConfig +from models.workflow import Workflow + + +def _workflow(graph: dict) -> Workflow: + return Workflow( + id="workflow-1", + tenant_id="tenant-1", + app_id="app-1", + graph=json.dumps(graph), + ) + + +def _binding(node_job: WorkflowNodeJobConfig) -> WorkflowAgentNodeBinding: + return WorkflowAgentNodeBinding( + id="binding-1", + tenant_id="tenant-1", + app_id="app-1", + workflow_id="workflow-1", + node_id="agent-node", + agent_id="agent-1", + current_snapshot_id="snapshot-1", + node_job_config=node_job, + ) + + +def _agent() -> Agent: + return Agent(id="agent-1", tenant_id="tenant-1", name="Agent", status=AgentStatus.ACTIVE) + + +def _snapshot() -> AgentConfigSnapshot: + return AgentConfigSnapshot( + id="snapshot-1", + tenant_id="tenant-1", + agent_id="agent-1", + version=1, + config_snapshot=AgentSoulConfig( + model=AgentSoulModelConfig( + plugin_id="langgenius/openai", + model_provider="openai", + model="gpt-test", + ) + ), + ) + + +def _graph(edges: list[dict]) -> dict: + return { + "nodes": [ + {"id": "start", "data": {"type": "start"}}, + {"id": "previous-node", "data": {"type": "llm"}}, + {"id": "agent-node", "data": {"type": "agent", "version": "2"}}, + {"id": "later-node", "data": {"type": "llm"}}, + ], + "edges": edges, + } + + +def test_publish_validation_accepts_upstream_previous_output_ref(): + node_job = WorkflowNodeJobConfig.model_validate( + {"previous_node_output_refs": [{"node_id": "previous-node", "output": "text"}]} + ) + session = Mock() + session.scalar.side_effect = [_binding(node_job), _agent(), _snapshot()] + + WorkflowAgentNodeValidator.validate_published_workflow( + session=session, + workflow=_workflow( + _graph( + [ + {"source": "start", "target": "previous-node"}, + {"source": "previous-node", "target": "agent-node"}, + ] + ) + ), + ) + + +def test_publish_validation_rejects_non_upstream_previous_output_ref(): + node_job = WorkflowNodeJobConfig.model_validate( + {"previous_node_output_refs": [{"node_id": "later-node", "output": "text"}]} + ) + session = Mock() + session.scalar.side_effect = [_binding(node_job), _agent(), _snapshot()] + + with pytest.raises(WorkflowAgentNodeValidationError, match="non-upstream"): + WorkflowAgentNodeValidator.validate_published_workflow( + session=session, + workflow=_workflow( + _graph( + [ + {"source": "start", "target": "agent-node"}, + {"source": "agent-node", "target": "later-node"}, + ] + ) + ), + ) + + +def test_draft_validation_allows_unbound_agent_node(): + session = Mock() + session.scalar.return_value = None + + WorkflowAgentNodeValidator.validate_draft_workflow( + session=session, + workflow=_workflow(_graph([{"source": "start", "target": "agent-node"}])), + ) + + +def test_publish_validation_requires_binding(): + session = Mock() + session.scalar.return_value = None + + with pytest.raises(WorkflowAgentNodeValidationError, match="requires a binding"): + WorkflowAgentNodeValidator.validate_published_workflow( + session=session, + workflow=_workflow(_graph([{"source": "start", "target": "agent-node"}])), + ) + + +def test_publish_validation_rejects_duplicate_output_names(): + node_job = WorkflowNodeJobConfig.model_validate( + { + "declared_outputs": [ + {"name": "summary", "type": "string"}, + {"name": "summary", "type": "number"}, + ] + } + ) + session = Mock() + session.scalar.side_effect = [_binding(node_job), _agent(), _snapshot()] + + with pytest.raises(WorkflowAgentNodeValidationError, match="duplicate output name"): + WorkflowAgentNodeValidator.validate_published_workflow( + session=session, + workflow=_workflow(_graph([{"source": "start", "target": "agent-node"}])), + ) + + +def test_publish_validation_rejects_missing_agent_soul_model(): + node_job = WorkflowNodeJobConfig.model_validate({}) + snapshot = AgentConfigSnapshot( + id="snapshot-1", + tenant_id="tenant-1", + agent_id="agent-1", + version=1, + config_snapshot=AgentSoulConfig(), + ) + session = Mock() + session.scalar.side_effect = [_binding(node_job), _agent(), snapshot] + + with pytest.raises(WorkflowAgentNodeValidationError, match="requires Agent Soul model"): + WorkflowAgentNodeValidator.validate_published_workflow( + session=session, + workflow=_workflow(_graph([{"source": "start", "target": "agent-node"}])), + ) + + +def test_publish_validation_rejects_missing_previous_node(): + node_job = WorkflowNodeJobConfig.model_validate( + {"previous_node_output_refs": [{"node_id": "missing-node", "output": "text"}]} + ) + session = Mock() + session.scalar.side_effect = [_binding(node_job), _agent(), _snapshot()] + + with pytest.raises(WorkflowAgentNodeValidationError, match="references missing previous node"): + WorkflowAgentNodeValidator.validate_published_workflow( + session=session, + workflow=_workflow(_graph([{"source": "start", "target": "agent-node"}])), + ) + + +def test_publish_validation_rejects_self_previous_output_ref(): + node_job = WorkflowNodeJobConfig.model_validate( + {"previous_node_output_refs": [{"node_id": "agent-node", "output": "text"}]} + ) + session = Mock() + session.scalar.side_effect = [_binding(node_job), _agent(), _snapshot()] + + with pytest.raises(WorkflowAgentNodeValidationError, match="non-upstream"): + WorkflowAgentNodeValidator.validate_published_workflow( + session=session, + workflow=_workflow(_graph([{"source": "start", "target": "agent-node"}])), + ) + + +def test_publish_validation_rejects_locked_agent_soul_override_in_metadata(): + node_job = WorkflowNodeJobConfig.model_validate({"metadata": {"agent_soul": {"tools": []}}}) + session = Mock() + session.scalar.side_effect = [_binding(node_job), _agent(), _snapshot()] + + with pytest.raises(WorkflowAgentNodeValidationError, match="cannot override locked Agent Soul fields"): + WorkflowAgentNodeValidator.validate_published_workflow( + session=session, + workflow=_workflow(_graph([{"source": "start", "target": "agent-node"}])), + ) + + +def test_publish_validation_rejects_invalid_human_contact_ref(): + node_job = WorkflowNodeJobConfig.model_validate({"human_contacts": [{"channel": "slack"}]}) + session = Mock() + session.scalar.side_effect = [_binding(node_job), _agent(), _snapshot()] + + with pytest.raises(WorkflowAgentNodeValidationError, match="invalid human contact ref"): + WorkflowAgentNodeValidator.validate_published_workflow( + session=session, + workflow=_workflow(_graph([{"source": "start", "target": "agent-node"}])), + ) + + +def test_publish_validation_rejects_out_of_scope_human_contact_ref(): + node_job = WorkflowNodeJobConfig.model_validate( + {"human_contacts": [{"contact_id": "human-1", "tenant_id": "other-tenant", "channel": "slack"}]} + ) + session = Mock() + session.scalar.side_effect = [_binding(node_job), _agent(), _snapshot()] + + with pytest.raises(WorkflowAgentNodeValidationError, match="out-of-scope human contact"): + WorkflowAgentNodeValidator.validate_published_workflow( + session=session, + workflow=_workflow(_graph([{"source": "start", "target": "agent-node"}])), + ) + + +def test_publish_validation_accepts_tenant_scoped_file_ref(): + node_job = WorkflowNodeJobConfig.model_validate( + { + "declared_outputs": [ + { + "name": "report", + "type": "file", + "checks": [{"type": "benchmark", "benchmark_file_ref": {"upload_file_id": "file-1"}}], + } + ] + } + ) + session = Mock() + session.scalar.side_effect = [ + _binding(node_job), + _agent(), + _snapshot(), + SimpleNamespace(id="file-1", tenant_id="tenant-1"), + ] + + WorkflowAgentNodeValidator.validate_published_workflow( + session=session, + workflow=_workflow(_graph([{"source": "start", "target": "agent-node"}])), + ) + + +def test_publish_validation_rejects_missing_file_ref(): + node_job = WorkflowNodeJobConfig.model_validate({"metadata": {"file_refs": [{"upload_file_id": "missing-file"}]}}) + session = Mock() + session.scalar.side_effect = [_binding(node_job), _agent(), _snapshot(), None] + + with pytest.raises(WorkflowAgentNodeValidationError, match="missing or out-of-scope metadata file ref"): + WorkflowAgentNodeValidator.validate_published_workflow( + session=session, + workflow=_workflow(_graph([{"source": "start", "target": "agent-node"}])), + ) diff --git a/api/tests/unit_tests/core/workflow/test_node_mapping_bootstrap.py b/api/tests/unit_tests/core/workflow/test_node_mapping_bootstrap.py index 2dd3953d9a..c7ff7e5a34 100644 --- a/api/tests/unit_tests/core/workflow/test_node_mapping_bootstrap.py +++ b/api/tests/unit_tests/core/workflow/test_node_mapping_bootstrap.py @@ -24,6 +24,7 @@ def test_moved_core_nodes_resolve_after_importing_production_entrypoints(): from core.workflow import workflow_entry from core.workflow.nodes.knowledge_index import KNOWLEDGE_INDEX_NODE_TYPE from core.workflow.node_factory import DifyNodeFactory, NODE_TYPE_CLASSES_MAPPING + from core.workflow.nodes.agent_v2 import DifyAgentNode from graphon.enums import BuiltinNodeTypes from services import workflow_service from services.rag_pipeline import rag_pipeline @@ -40,6 +41,11 @@ def test_moved_core_nodes_resolve_after_importing_production_entrypoints(): assert node_type in NODE_TYPE_CLASSES_MAPPING, node_type resolved = DifyNodeFactory._resolve_node_class(node_type=node_type, node_version="1") assert resolved.__module__.startswith("core.workflow.nodes."), resolved.__module__ + + assert DifyNodeFactory._resolve_node_class( + node_type=BuiltinNodeTypes.AGENT, + node_version="2", + ) is DifyAgentNode """ ) completed = subprocess.run( diff --git a/api/tests/unit_tests/services/agent/test_agent_composer_entities.py b/api/tests/unit_tests/services/agent/test_agent_composer_entities.py index 4cbc9cad8c..af7ae36644 100644 --- a/api/tests/unit_tests/services/agent/test_agent_composer_entities.py +++ b/api/tests/unit_tests/services/agent/test_agent_composer_entities.py @@ -1,6 +1,6 @@ import pytest -from models.agent_config_entities import AgentKnowledgeQueryMode, DeclaredOutputType +from models.agent_config_entities import AgentKnowledgeQueryMode, AgentSoulModelConfig, DeclaredOutputType from services.agent.composer_service import AgentComposerService from services.agent.composer_validator import ComposerConfigValidator from services.agent.errors import AgentSoulLockedError, PlaintextSecretNotAllowedError @@ -88,6 +88,22 @@ def test_knowledge_query_mode_uses_stable_backend_enums(): assert config.knowledge.query_mode == AgentKnowledgeQueryMode.GENERATED_QUERY +def test_agent_soul_model_config_is_first_class_without_credentials(): + config = AgentSoulConfig( + model=AgentSoulModelConfig( + plugin_id="langgenius/openai", + model_provider="openai", + model="gpt-test", + credential_ref={"type": "provider", "id": "credential-1"}, + model_settings={"temperature": 0}, + ) + ) + + dumped = config.model_dump(mode="json") + assert dumped["model"]["plugin_id"] == "langgenius/openai" + assert dumped["model"]["credential_ref"] == {"type": "provider", "id": "credential-1", "provider": None} + + def test_declared_outputs_support_file_check_and_failure_strategy(): node_job = WorkflowNodeJobConfig.model_validate( { diff --git a/packages/contracts/generated/api/console/agents/types.gen.ts b/packages/contracts/generated/api/console/agents/types.gen.ts index 10a784b9dd..8a4540f933 100644 --- a/packages/contracts/generated/api/console/agents/types.gen.ts +++ b/packages/contracts/generated/api/console/agents/types.gen.ts @@ -34,6 +34,7 @@ export type AgentSoulConfig = { misc_legacy?: { [key: string]: unknown } + model?: AgentSoulModelConfig prompt?: AgentSoulPromptConfig sandbox?: AgentSoulSandboxConfig schema_version?: number @@ -86,6 +87,16 @@ export type AgentSoulMemoryConfig = { scope?: string | null } +export type AgentSoulModelConfig = { + credential_ref?: AgentSoulModelCredentialRef + model: string + model_provider: string + model_settings?: { + [key: string]: unknown + } + plugin_id: string +} + export type AgentSoulPromptConfig = { system_prompt?: string } @@ -117,6 +128,12 @@ export type AgentSoulToolsConfig = { export type AgentKnowledgeQueryMode = 'generated_query' | 'user_query' +export type AgentSoulModelCredentialRef = { + id?: string | null + provider?: string | null + type: string +} + export type GetAgentsData = { body?: never path?: never diff --git a/packages/contracts/generated/api/console/agents/zod.gen.ts b/packages/contracts/generated/api/console/agents/zod.gen.ts index dd9cabdffd..f84b5fc411 100644 --- a/packages/contracts/generated/api/console/agents/zod.gen.ts +++ b/packages/contracts/generated/api/console/agents/zod.gen.ts @@ -100,6 +100,30 @@ export const zAgentSoulKnowledgeConfig = z.object({ query_mode: zAgentKnowledgeQueryMode.optional(), }) +/** + * AgentSoulModelCredentialRef + * + * Reference to model credentials resolved only at runtime. + */ +export const zAgentSoulModelCredentialRef = z.object({ + id: z.string().max(255).nullish(), + provider: z.string().max(255).nullish(), + type: z.string().min(1).max(64), +}) + +/** + * AgentSoulModelConfig + * + * Stable model selection for Agent runtime without storing secret values. + */ +export const zAgentSoulModelConfig = z.object({ + credential_ref: zAgentSoulModelCredentialRef.optional(), + model: z.string().min(1).max(255), + model_provider: z.string().min(1).max(255), + model_settings: z.record(z.string(), z.unknown()).optional(), + plugin_id: z.string().min(1).max(255), +}) + /** * AgentSoulConfig */ @@ -111,6 +135,7 @@ export const zAgentSoulConfig = z.object({ knowledge: zAgentSoulKnowledgeConfig.optional(), memory: zAgentSoulMemoryConfig.optional(), misc_legacy: z.record(z.string(), z.unknown()).optional(), + model: zAgentSoulModelConfig.optional(), prompt: zAgentSoulPromptConfig.optional(), sandbox: zAgentSoulSandboxConfig.optional(), schema_version: z.int().optional().default(1), diff --git a/packages/contracts/generated/api/console/apps/types.gen.ts b/packages/contracts/generated/api/console/apps/types.gen.ts index 55bf4e4722..5a529ea49d 100644 --- a/packages/contracts/generated/api/console/apps/types.gen.ts +++ b/packages/contracts/generated/api/console/apps/types.gen.ts @@ -973,6 +973,7 @@ export type AgentSoulConfig = { misc_legacy?: { [key: string]: unknown } + model?: AgentSoulModelConfig prompt?: AgentSoulPromptConfig sandbox?: AgentSoulSandboxConfig schema_version?: number @@ -1395,6 +1396,16 @@ export type AgentSoulMemoryConfig = { scope?: string | null } +export type AgentSoulModelConfig = { + credential_ref?: AgentSoulModelCredentialRef + model: string + model_provider: string + model_settings?: { + [key: string]: unknown + } + plugin_id: string +} + export type AgentSoulPromptConfig = { system_prompt?: string } @@ -1507,6 +1518,12 @@ export type WorkflowRunForArchivedLogResponse = { export type AgentKnowledgeQueryMode = 'generated_query' | 'user_query' +export type AgentSoulModelCredentialRef = { + id?: string | null + provider?: string | null + type: string +} + export type DeclaredOutputCheckConfig = { benchmark_file_ref?: { [key: string]: unknown diff --git a/packages/contracts/generated/api/console/apps/zod.gen.ts b/packages/contracts/generated/api/console/apps/zod.gen.ts index cd7175f388..5c5b0fa213 100644 --- a/packages/contracts/generated/api/console/apps/zod.gen.ts +++ b/packages/contracts/generated/api/console/apps/zod.gen.ts @@ -1747,6 +1747,30 @@ export const zAgentSoulKnowledgeConfig = z.object({ query_mode: zAgentKnowledgeQueryMode.optional(), }) +/** + * AgentSoulModelCredentialRef + * + * Reference to model credentials resolved only at runtime. + */ +export const zAgentSoulModelCredentialRef = z.object({ + id: z.string().max(255).nullish(), + provider: z.string().max(255).nullish(), + type: z.string().min(1).max(64), +}) + +/** + * AgentSoulModelConfig + * + * Stable model selection for Agent runtime without storing secret values. + */ +export const zAgentSoulModelConfig = z.object({ + credential_ref: zAgentSoulModelCredentialRef.optional(), + model: z.string().min(1).max(255), + model_provider: z.string().min(1).max(255), + model_settings: z.record(z.string(), z.unknown()).optional(), + plugin_id: z.string().min(1).max(255), +}) + /** * AgentSoulConfig */ @@ -1758,6 +1782,7 @@ export const zAgentSoulConfig = z.object({ knowledge: zAgentSoulKnowledgeConfig.optional(), memory: zAgentSoulMemoryConfig.optional(), misc_legacy: z.record(z.string(), z.unknown()).optional(), + model: zAgentSoulModelConfig.optional(), prompt: zAgentSoulPromptConfig.optional(), sandbox: zAgentSoulSandboxConfig.optional(), schema_version: z.int().optional().default(1),