feat: wire workflow agent node runtime (#36437)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
zyssyz123 2026-05-20 20:39:45 +08:00 committed by GitHub
parent 56d4d54c16
commit 60cd346fa6
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 2626 additions and 7 deletions

View File

@ -49,6 +49,7 @@ class AgentBackendModelConfig(BaseModel):
model: str
user_id: str | None = None
credentials: dict[str, DifyPluginCredentialValue] = Field(default_factory=dict)
model_settings: dict[str, JsonValue] = Field(default_factory=dict)
model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
@ -138,6 +139,7 @@ class AgentBackendRunRequestBuilder:
model_provider=run_input.model.model_provider,
model=run_input.model.model,
credentials=run_input.model.credentials,
model_settings=run_input.model.model_settings or None,
),
),
]

View File

@ -1,3 +1,4 @@
from configs.extra.agent_backend_config import AgentBackendConfig
from configs.extra.archive_config import ArchiveStorageConfig
from configs.extra.notion_config import NotionConfig
from configs.extra.sentry_config import SentryConfig
@ -5,6 +6,7 @@ from configs.extra.sentry_config import SentryConfig
class ExtraServiceConfig(
# place the configs in alphabet order
AgentBackendConfig,
ArchiveStorageConfig,
NotionConfig,
SentryConfig,

View File

@ -0,0 +1,23 @@
from pydantic import Field
from pydantic_settings import BaseSettings
class AgentBackendConfig(BaseSettings):
"""
Configuration settings for the Agent backend runtime integration.
"""
AGENT_BACKEND_BASE_URL: str | None = Field(
description="Base URL for the Dify Agent backend service.",
default=None,
)
AGENT_BACKEND_USE_FAKE: bool = Field(
description="Use the deterministic in-process fake Agent backend client.",
default=False,
)
AGENT_BACKEND_FAKE_SCENARIO: str = Field(
description="Scenario used by the fake Agent backend client.",
default="success",
)

View File

@ -37,6 +37,10 @@ from core.workflow.nodes.agent.plugin_strategy_adapter import (
PluginAgentStrategyResolver,
)
from core.workflow.nodes.agent.runtime_support import AgentRuntimeSupport
from core.workflow.nodes.agent_v2 import DifyAgentNode
from core.workflow.nodes.agent_v2.binding_resolver import WorkflowAgentBindingResolver
from core.workflow.nodes.agent_v2.output_adapter import WorkflowAgentOutputAdapter
from core.workflow.nodes.agent_v2.runtime_request_builder import WorkflowAgentRuntimeRequestBuilder
from core.workflow.system_variables import SystemVariableKey, get_system_text, system_variable_selector
from core.workflow.template_rendering import CodeExecutorJinja2TemplateRenderer
from graphon.entities.base_node_data import BaseNodeData
@ -438,12 +442,7 @@ class DifyNodeFactory(NodeFactory):
"tool_file_manager": self._bound_tool_file_manager_factory(),
"runtime": self._tool_runtime,
},
BuiltinNodeTypes.AGENT: lambda: {
"strategy_resolver": self._agent_strategy_resolver,
"presentation_provider": self._agent_strategy_presentation_provider,
"runtime_support": self._agent_runtime_support,
"message_transformer": self._agent_message_transformer,
},
BuiltinNodeTypes.AGENT: lambda: self._build_agent_node_init_kwargs(node_class=node_class),
}
node_init_kwargs = node_init_kwargs_factories.get(node_type, lambda: {})()
constructor_node_data = resolved_node_data.model_dump(mode="python", by_alias=True)
@ -469,6 +468,32 @@ class DifyNodeFactory(NodeFactory):
def _resolve_node_class(*, node_type: NodeType, node_version: str) -> type[Node]:
return resolve_workflow_node_class(node_type=node_type, node_version=node_version)
def _build_agent_node_init_kwargs(self, *, node_class: type[Node]) -> dict[str, object]:
if issubclass(node_class, DifyAgentNode):
from clients.agent_backend import AgentBackendRunEventAdapter, AgentBackendRunRequestBuilder
from clients.agent_backend.factory import create_agent_backend_run_client
return {
"binding_resolver": WorkflowAgentBindingResolver(),
"runtime_request_builder": WorkflowAgentRuntimeRequestBuilder(
credentials_provider=self._llm_credentials_provider,
request_builder=AgentBackendRunRequestBuilder(),
),
"agent_backend_client": create_agent_backend_run_client(
base_url=dify_config.AGENT_BACKEND_BASE_URL,
use_fake=dify_config.AGENT_BACKEND_USE_FAKE,
fake_scenario=dify_config.AGENT_BACKEND_FAKE_SCENARIO,
),
"event_adapter": AgentBackendRunEventAdapter(),
"output_adapter": WorkflowAgentOutputAdapter(),
}
return {
"strategy_resolver": self._agent_strategy_resolver,
"presentation_provider": self._agent_strategy_presentation_provider,
"runtime_support": self._agent_runtime_support,
"message_transformer": self._agent_message_transformer,
}
def _build_llm_compatible_node_init_kwargs(
self,
*,

View File

@ -0,0 +1,4 @@
from .agent_node import DifyAgentNode
from .entities import DifyAgentNodeData
__all__ = ["DifyAgentNode", "DifyAgentNodeData"]

View File

@ -0,0 +1,281 @@
from __future__ import annotations
from collections.abc import Generator, Mapping, Sequence
from typing import TYPE_CHECKING, Any
from clients.agent_backend import (
AgentBackendError,
AgentBackendHTTPError,
AgentBackendInternalEventType,
AgentBackendRunClient,
AgentBackendRunEventAdapter,
AgentBackendRunFailedInternalEvent,
AgentBackendRunSucceededInternalEvent,
AgentBackendStreamError,
AgentBackendStreamInternalEvent,
AgentBackendTransportError,
AgentBackendValidationError,
)
from core.app.entities.app_invoke_entities import DIFY_RUN_CONTEXT_KEY, DifyRunContext
from core.workflow.system_variables import SystemVariableKey, get_system_text
from graphon.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from graphon.node_events import NodeEventBase, NodeRunResult, StreamCompletedEvent
from graphon.nodes.base.node import Node
from .binding_resolver import WorkflowAgentBindingError, WorkflowAgentBindingResolver
from .entities import DifyAgentNodeData
from .output_adapter import WorkflowAgentOutputAdapter
from .runtime_request_builder import (
WorkflowAgentRuntimeBuildContext,
WorkflowAgentRuntimeRequestBuilder,
WorkflowAgentRuntimeRequestBuildError,
)
if TYPE_CHECKING:
from graphon.entities import GraphInitParams
from graphon.runtime import GraphRuntimeState
class DifyAgentNode(Node[DifyAgentNodeData]):
node_type = BuiltinNodeTypes.AGENT
def __init__(
self,
node_id: str,
data: DifyAgentNodeData,
*,
graph_init_params: GraphInitParams,
graph_runtime_state: GraphRuntimeState,
binding_resolver: WorkflowAgentBindingResolver,
runtime_request_builder: WorkflowAgentRuntimeRequestBuilder,
agent_backend_client: AgentBackendRunClient,
event_adapter: AgentBackendRunEventAdapter,
output_adapter: WorkflowAgentOutputAdapter,
) -> None:
super().__init__(
node_id=node_id,
data=data,
graph_init_params=graph_init_params,
graph_runtime_state=graph_runtime_state,
)
self._binding_resolver = binding_resolver
self._runtime_request_builder = runtime_request_builder
self._agent_backend_client = agent_backend_client
self._event_adapter = event_adapter
self._output_adapter = output_adapter
@classmethod
def version(cls) -> str:
return "2"
def populate_start_event(self, event) -> None:
event.extras["agent_node"] = {"version": "2", "agent_node_kind": self.node_data.agent_node_kind}
def _run(self) -> Generator[NodeEventBase, None, None]:
dify_ctx = DifyRunContext.model_validate(self.require_run_context_value(DIFY_RUN_CONTEXT_KEY))
workflow_id = self.graph_init_params.workflow_id
workflow_run_id = get_system_text(
self.graph_runtime_state.variable_pool,
SystemVariableKey.WORKFLOW_EXECUTION_ID,
)
inputs: dict[str, Any] = {}
process_data: dict[str, Any] = {}
metadata: dict[str, Any] = {
"agent_backend": {
"status": "not_started",
}
}
try:
bundle = self._binding_resolver.resolve(
tenant_id=dify_ctx.tenant_id,
app_id=dify_ctx.app_id,
workflow_id=workflow_id,
node_id=self._node_id,
)
runtime_request = self._runtime_request_builder.build(
WorkflowAgentRuntimeBuildContext(
dify_context=dify_ctx,
workflow_id=workflow_id,
workflow_run_id=workflow_run_id,
node_id=self._node_id,
node_execution_id=self.id,
variable_pool=self.graph_runtime_state.variable_pool,
binding=bundle.binding,
agent=bundle.agent,
snapshot=bundle.snapshot,
)
)
inputs = {"agent_backend_request": runtime_request.redacted_request}
metadata = dict(runtime_request.metadata)
process_data = {
"agent_id": bundle.agent.id,
"agent_config_snapshot_id": bundle.snapshot.id,
"binding_id": bundle.binding.id,
}
create_response = self._agent_backend_client.create_run(runtime_request.request)
metadata["agent_backend"] = {
**dict(metadata.get("agent_backend") or {}),
"run_id": create_response.run_id,
"status": create_response.status,
}
except WorkflowAgentBindingError as error:
yield self._failure_event(
inputs=inputs,
process_data=process_data,
metadata=metadata,
error=str(error),
error_type=error.error_code,
)
return
except WorkflowAgentRuntimeRequestBuildError as error:
yield self._failure_event(
inputs=inputs,
process_data=process_data,
metadata=metadata,
error=str(error),
error_type=error.error_code,
)
return
except AgentBackendError as error:
yield self._failure_event(
inputs=inputs,
process_data=process_data,
metadata=metadata,
error=str(error),
error_type=self._agent_backend_error_type(error),
)
return
except Exception as error:
yield self._failure_event(
inputs=inputs,
process_data=process_data,
metadata=metadata,
error=str(error),
error_type="agent_workflow_node_runtime_error",
)
return
stream_event_count = 0
try:
for public_event in self._agent_backend_client.stream_events(create_response.run_id):
stream_event_count += 1
for internal_event in self._event_adapter.adapt(public_event):
if internal_event.type == AgentBackendInternalEventType.RUN_STARTED:
continue
if internal_event.type == AgentBackendInternalEventType.STREAM_EVENT:
if isinstance(internal_event, AgentBackendStreamInternalEvent):
self._record_stream_metadata(metadata, internal_event)
continue
metadata["agent_backend"] = {
**dict(metadata.get("agent_backend") or {}),
"stream_event_count": stream_event_count,
}
if isinstance(internal_event, AgentBackendRunSucceededInternalEvent):
yield StreamCompletedEvent(
node_run_result=self._output_adapter.build_success_result(
event=internal_event,
inputs=inputs,
process_data=process_data,
metadata=metadata,
)
)
return
if isinstance(
internal_event,
AgentBackendRunFailedInternalEvent,
) or internal_event.type in {
AgentBackendInternalEventType.RUN_CANCELLED,
AgentBackendInternalEventType.RUN_PAUSED,
}:
yield StreamCompletedEvent(
node_run_result=self._output_adapter.build_failure_result(
event=internal_event,
inputs=inputs,
process_data=process_data,
metadata=metadata,
)
)
return
except AgentBackendError as error:
yield self._failure_event(
inputs=inputs,
process_data=process_data,
metadata=metadata,
error=str(error),
error_type=self._agent_backend_error_type(error),
)
return
except Exception as error:
yield self._failure_event(
inputs=inputs,
process_data=process_data,
metadata=metadata,
error=str(error),
error_type="agent_backend_stream_error",
)
return
yield StreamCompletedEvent(
node_run_result=self._output_adapter.build_stream_exhausted_result(
inputs=inputs,
process_data=process_data,
metadata=metadata,
)
)
@staticmethod
def _failure_event(
*,
inputs: dict[str, Any],
process_data: dict[str, Any],
metadata: dict[str, Any],
error: str,
error_type: str,
) -> StreamCompletedEvent:
return StreamCompletedEvent(
node_run_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
inputs=inputs,
process_data=process_data,
metadata={WorkflowNodeExecutionMetadataKey.AGENT_LOG: metadata},
outputs={},
error=error,
error_type=error_type,
)
)
@staticmethod
def _agent_backend_error_type(error: AgentBackendError) -> str:
if isinstance(error, AgentBackendValidationError):
return "agent_backend_validation_error"
if isinstance(error, AgentBackendHTTPError):
return "agent_backend_http_error"
if isinstance(error, AgentBackendStreamError):
return "agent_backend_stream_error"
if isinstance(error, AgentBackendTransportError):
return "agent_backend_transport_error"
return "agent_backend_error"
@staticmethod
def _record_stream_metadata(metadata: dict[str, Any], event: AgentBackendStreamInternalEvent) -> None:
agent_backend = dict(metadata.get("agent_backend") or {})
agent_backend["last_stream_event_id"] = event.source_event_id
if event.event_kind:
agent_backend["last_stream_event_kind"] = event.event_kind
if isinstance(event.data, Mapping):
usage = event.data.get("usage") or event.data.get("model_usage")
if isinstance(usage, Mapping):
agent_backend["usage"] = dict(usage)
metadata["agent_backend"] = agent_backend
@classmethod
def _extract_variable_selector_to_variable_mapping(
cls,
*,
graph_config: Mapping[str, Any],
node_id: str,
node_data: DifyAgentNodeData,
) -> Mapping[str, Sequence[str]]:
del graph_config, node_id, node_data
return {}

View File

@ -0,0 +1,93 @@
from __future__ import annotations
from dataclasses import dataclass
from sqlalchemy import select
from core.db.session_factory import session_factory
from models.agent import Agent, AgentConfigSnapshot, AgentStatus, WorkflowAgentNodeBinding
class WorkflowAgentBindingError(Exception):
error_code: str
def __init__(self, error_code: str, message: str) -> None:
self.error_code = error_code
super().__init__(message)
@dataclass(frozen=True, slots=True)
class WorkflowAgentBindingBundle:
binding: WorkflowAgentNodeBinding
agent: Agent
snapshot: AgentConfigSnapshot
class WorkflowAgentBindingResolver:
"""Resolve the Agent binding owned by the current workflow id and node id."""
def resolve(
self,
*,
tenant_id: str,
app_id: str,
workflow_id: str,
node_id: str,
) -> WorkflowAgentBindingBundle:
with session_factory.create_session() as session:
binding = session.scalar(
select(WorkflowAgentNodeBinding)
.where(
WorkflowAgentNodeBinding.tenant_id == tenant_id,
WorkflowAgentNodeBinding.app_id == app_id,
WorkflowAgentNodeBinding.workflow_id == workflow_id,
WorkflowAgentNodeBinding.node_id == node_id,
)
.limit(1)
)
if binding is None:
raise WorkflowAgentBindingError(
"agent_binding_not_found",
f"Workflow Agent binding not found for node {node_id}.",
)
if binding.agent_id is None:
raise WorkflowAgentBindingError("agent_not_available", "Workflow Agent binding has no agent.")
if binding.current_snapshot_id is None:
raise WorkflowAgentBindingError(
"agent_config_snapshot_not_found",
"Workflow Agent binding has no current config snapshot.",
)
agent = session.scalar(
select(Agent)
.where(
Agent.tenant_id == tenant_id,
Agent.id == binding.agent_id,
)
.limit(1)
)
if agent is None or agent.status == AgentStatus.ARCHIVED:
raise WorkflowAgentBindingError(
"agent_not_available",
f"Agent {binding.agent_id} is not available.",
)
snapshot = session.scalar(
select(AgentConfigSnapshot)
.where(
AgentConfigSnapshot.tenant_id == tenant_id,
AgentConfigSnapshot.agent_id == agent.id,
AgentConfigSnapshot.id == binding.current_snapshot_id,
)
.limit(1)
)
if snapshot is None:
raise WorkflowAgentBindingError(
"agent_config_snapshot_not_found",
f"Agent config snapshot {binding.current_snapshot_id} not found.",
)
session.expunge(binding)
session.expunge(agent)
session.expunge(snapshot)
return WorkflowAgentBindingBundle(binding=binding, agent=agent, snapshot=snapshot)

View File

@ -0,0 +1,17 @@
from typing import Literal
from pydantic import model_validator
from graphon.entities.base_node_data import BaseNodeData
from graphon.enums import BuiltinNodeTypes, NodeType
class DifyAgentNodeData(BaseNodeData):
type: NodeType = BuiltinNodeTypes.AGENT
agent_node_kind: Literal["dify_agent"] = "dify_agent"
@model_validator(mode="after")
def validate_version(self) -> "DifyAgentNodeData":
if self.version != "2":
raise ValueError("Dify Agent Node v2 requires version='2'")
return self

View File

@ -0,0 +1,255 @@
from __future__ import annotations
from collections.abc import Mapping
from typing import Any
from clients.agent_backend import (
AgentBackendInternalEvent,
AgentBackendInternalEventType,
AgentBackendRunCancelledInternalEvent,
AgentBackendRunFailedInternalEvent,
AgentBackendRunPausedInternalEvent,
AgentBackendRunSucceededInternalEvent,
)
from graphon.enums import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from graphon.file import File, FileTransferMethod, FileType
from graphon.model_runtime.entities.llm_entities import LLMUsage
from graphon.node_events import NodeRunResult
from graphon.variables.segments import ArrayFileSegment, FileSegment
class WorkflowAgentOutputAdapter:
"""Convert terminal Agent backend events into workflow node run results."""
def build_success_result(
self,
*,
event: AgentBackendRunSucceededInternalEvent,
inputs: dict[str, Any],
process_data: dict[str, Any],
metadata: dict[str, Any],
) -> NodeRunResult:
metadata = self._with_terminal_metadata(metadata, event, "succeeded")
usage = self._usage_from_metadata(metadata)
return NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,
inputs=inputs,
process_data=process_data,
outputs=self._normalize_outputs(event.output),
metadata=self._build_node_metadata(metadata=metadata, usage=usage),
llm_usage=usage or LLMUsage.empty_usage(),
)
def build_failure_result(
self,
*,
event: (
AgentBackendRunFailedInternalEvent
| AgentBackendRunCancelledInternalEvent
| AgentBackendRunPausedInternalEvent
),
inputs: dict[str, Any],
process_data: dict[str, Any],
metadata: dict[str, Any],
) -> NodeRunResult:
status = WorkflowNodeExecutionStatus.FAILED
error = "Agent backend run failed."
error_type = "agent_backend_run_failed"
terminal_status = "failed"
match event:
case AgentBackendRunFailedInternalEvent():
error = event.error
error_type = event.reason or "agent_backend_run_failed"
terminal_status = "failed"
case AgentBackendRunCancelledInternalEvent():
error = event.message or "Agent backend run was cancelled."
error_type = "agent_backend_run_cancelled"
terminal_status = "cancelled"
case AgentBackendRunPausedInternalEvent():
error = event.message or "Agent backend run paused, but workflow Agent Node pause is not supported yet."
error_type = "agent_backend_paused_unsupported"
terminal_status = "paused"
metadata = self._with_terminal_metadata(metadata, event, terminal_status)
usage = self._usage_from_metadata(metadata)
return NodeRunResult(
status=status,
inputs=inputs,
process_data=process_data,
metadata=self._build_node_metadata(metadata=metadata, usage=usage),
llm_usage=usage or LLMUsage.empty_usage(),
error=error,
error_type=error_type,
)
def build_stream_exhausted_result(
self,
*,
inputs: dict[str, Any],
process_data: dict[str, Any],
metadata: dict[str, Any],
) -> NodeRunResult:
usage = self._usage_from_metadata(metadata)
return NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
inputs=inputs,
process_data=process_data,
metadata=self._build_node_metadata(metadata=metadata, usage=usage),
llm_usage=usage or LLMUsage.empty_usage(),
error="Agent backend stream ended before a terminal event.",
error_type="agent_backend_stream_error",
)
@classmethod
def _normalize_outputs(cls, output: Any) -> dict[str, Any]:
if isinstance(output, dict):
if cls._is_file_payload(output):
return {"file": cls._file_segment_from_payload(output)}
return {key: cls._normalize_output_value(value) for key, value in output.items()}
if isinstance(output, str):
return {"text": output}
return {"result": output}
@classmethod
def _normalize_output_value(cls, value: Any) -> Any:
if isinstance(value, File | FileSegment | ArrayFileSegment):
return value
if isinstance(value, Mapping):
if cls._is_file_payload(value):
return cls._file_segment_from_payload(value)
return {key: cls._normalize_output_value(item) for key, item in value.items()}
if isinstance(value, list):
if value and all(isinstance(item, Mapping) and cls._is_file_payload(item) for item in value):
return ArrayFileSegment(value=[cls._file_from_payload(item) for item in value])
return [cls._normalize_output_value(item) for item in value]
return value
@staticmethod
def _is_file_payload(value: Mapping[str, Any]) -> bool:
return any(value.get(key) for key in ("file_id", "upload_file_id", "tool_file_id", "url", "remote_url")) and (
"filename" in value or "mime_type" in value or "url" in value or "remote_url" in value
)
@classmethod
def _file_segment_from_payload(cls, value: Mapping[str, Any]) -> FileSegment:
return FileSegment(value=cls._file_from_payload(value))
@classmethod
def _file_from_payload(cls, value: Mapping[str, Any]) -> File:
remote_url = cls._string_value(value.get("remote_url") or value.get("url"))
upload_file_id = cls._string_value(value.get("upload_file_id") or value.get("file_id"))
tool_file_id = cls._string_value(value.get("tool_file_id"))
filename = cls._string_value(value.get("filename") or value.get("name"))
mime_type = cls._string_value(value.get("mime_type") or value.get("mimetype"))
extension = cls._extension_from_payload(value, filename)
file_type = cls._file_type_from_payload(value, mime_type)
size = value.get("size")
if not isinstance(size, int):
size = -1
if tool_file_id:
transfer_method = FileTransferMethod.TOOL_FILE
related_id = tool_file_id
elif remote_url:
transfer_method = FileTransferMethod.REMOTE_URL
related_id = None
else:
transfer_method = FileTransferMethod.LOCAL_FILE
related_id = upload_file_id
return File(
type=file_type,
transfer_method=transfer_method,
remote_url=remote_url if transfer_method == FileTransferMethod.REMOTE_URL else None,
related_id=related_id,
filename=filename,
extension=extension,
mime_type=mime_type,
size=size,
)
@staticmethod
def _string_value(value: Any) -> str | None:
return value if isinstance(value, str) and value else None
@classmethod
def _extension_from_payload(cls, value: Mapping[str, Any], filename: str | None) -> str | None:
extension = cls._string_value(value.get("extension"))
if extension:
return extension if extension.startswith(".") else f".{extension}"
if filename and "." in filename:
return f".{filename.rsplit('.', 1)[1]}"
return None
@staticmethod
def _file_type_from_payload(value: Mapping[str, Any], mime_type: str | None) -> FileType:
explicit_type = value.get("type") or value.get("file_type")
if isinstance(explicit_type, str):
try:
return FileType(explicit_type)
except ValueError:
pass
if mime_type:
if mime_type.startswith("image/"):
return FileType.IMAGE
if mime_type.startswith("audio/"):
return FileType.AUDIO
if mime_type.startswith("video/"):
return FileType.VIDEO
return FileType.DOCUMENT
return FileType.CUSTOM
@staticmethod
def _usage_from_metadata(metadata: Mapping[str, Any]) -> LLMUsage | None:
agent_backend = metadata.get("agent_backend")
if not isinstance(agent_backend, Mapping):
return None
usage = agent_backend.get("usage")
if not isinstance(usage, Mapping):
return None
try:
return LLMUsage.from_metadata(usage)
except (TypeError, ValueError):
return None
@staticmethod
def _build_node_metadata(
*,
metadata: dict[str, Any],
usage: LLMUsage | None,
) -> dict[WorkflowNodeExecutionMetadataKey, Any]:
node_metadata: dict[WorkflowNodeExecutionMetadataKey, Any] = {
WorkflowNodeExecutionMetadataKey.AGENT_LOG: metadata,
}
if usage is not None:
node_metadata[WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS] = usage.total_tokens
node_metadata[WorkflowNodeExecutionMetadataKey.TOTAL_PRICE] = usage.total_price
node_metadata[WorkflowNodeExecutionMetadataKey.CURRENCY] = usage.currency
return node_metadata
@staticmethod
def _with_terminal_metadata(
metadata: dict[str, Any],
event: AgentBackendInternalEvent,
terminal_status: str,
) -> dict[str, Any]:
updated = dict(metadata)
agent_backend = dict(updated.get("agent_backend") or {})
agent_backend.update(
{
"run_id": event.run_id,
"terminal_event_id": event.source_event_id,
"status": terminal_status,
}
)
session_snapshot = None
if isinstance(event, AgentBackendRunSucceededInternalEvent | AgentBackendRunPausedInternalEvent):
session_snapshot = event.session_snapshot
if session_snapshot is not None:
agent_backend["session_snapshot"] = {
"layer_count": len(session_snapshot.layers),
}
updated["agent_backend"] = agent_backend
updated["terminal_event_type"] = AgentBackendInternalEventType(event.type).value
return updated

View File

@ -0,0 +1,55 @@
from __future__ import annotations
from typing import Any
from models.agent_config_entities import AgentSoulConfig
SUPPORTED_AGENT_BACKEND_FEATURES = frozenset(
{
"system_prompt",
"workflow_prompt",
"workflow_context",
"model",
"structured_output",
}
)
RESERVED_AGENT_BACKEND_FEATURES = frozenset(
{
"skills_files",
"tools",
"knowledge",
"human",
"env",
"sandbox",
"memory",
}
)
def build_runtime_feature_manifest(agent_soul: AgentSoulConfig) -> dict[str, Any]:
"""Describe PRD capabilities that are persisted but not executed in phase 3."""
warnings: list[dict[str, str]] = []
soul_dump = agent_soul.model_dump(mode="json")
for section in sorted(RESERVED_AGENT_BACKEND_FEATURES):
value = soul_dump.get(section)
has_value = bool(value)
if isinstance(value, dict):
has_value = any(bool(item) for item in value.values())
if has_value:
warnings.append(
{
"section": f"agent_soul.{section}",
"code": "agent_backend_layer_not_available",
"message": f"{section} is saved in Agent Soul but is not executed by Agent backend in phase 3.",
}
)
reserved_status = dict.fromkeys(sorted(RESERVED_AGENT_BACKEND_FEATURES), "reserved_not_executed")
return {
"supported": sorted(SUPPORTED_AGENT_BACKEND_FEATURES),
"reserved": sorted(RESERVED_AGENT_BACKEND_FEATURES),
"reserved_status": reserved_status,
"unsupported_runtime_warnings": warnings,
}

View File

@ -0,0 +1,288 @@
from __future__ import annotations
from collections.abc import Mapping, Sequence
from dataclasses import dataclass
from typing import Any, Literal, Protocol, cast
from dify_agent.protocol import CreateRunRequest, ExecutionContext
from clients.agent_backend import (
AgentBackendModelConfig,
AgentBackendOutputConfig,
AgentBackendRunRequestBuilder,
AgentBackendWorkflowNodeRunInput,
redact_for_agent_backend_log,
)
from core.app.entities.app_invoke_entities import DifyRunContext, InvokeFrom
from core.workflow.system_variables import SystemVariableKey, get_system_text
from graphon.variables.segments import Segment
from models.agent import Agent, AgentConfigSnapshot, WorkflowAgentNodeBinding
from models.agent_config_entities import (
AgentSoulConfig,
DeclaredOutputConfig,
DeclaredOutputType,
WorkflowNodeJobConfig,
)
from .runtime_feature_manifest import build_runtime_feature_manifest
class WorkflowAgentRuntimeRequestBuildError(ValueError):
"""Raised when workflow state cannot be mapped to a valid Agent backend run request."""
def __init__(self, error_code: str, message: str) -> None:
self.error_code = error_code
super().__init__(message)
class VariablePoolReader(Protocol):
def get(self, selector: Sequence[str], /) -> Segment | None: ...
def get_by_prefix(self, prefix: str, /) -> Mapping[str, object]: ...
class CredentialsProvider(Protocol):
def fetch(self, provider_name: str, model_name: str) -> dict[str, Any]: ...
@dataclass(frozen=True, slots=True)
class WorkflowAgentRuntimeBuildContext:
dify_context: DifyRunContext
workflow_id: str
workflow_run_id: str | None
node_id: str
node_execution_id: str
variable_pool: VariablePoolReader
binding: WorkflowAgentNodeBinding
agent: Agent
snapshot: AgentConfigSnapshot
@dataclass(frozen=True, slots=True)
class WorkflowAgentRuntimeRequest:
request: CreateRunRequest
redacted_request: dict[str, Any]
agent_soul: AgentSoulConfig
node_job: WorkflowNodeJobConfig
metadata: dict[str, Any]
class WorkflowAgentRuntimeRequestBuilder:
"""Build public Dify Agent run requests from workflow Agent v2 runtime state."""
def __init__(
self,
*,
credentials_provider: CredentialsProvider,
request_builder: AgentBackendRunRequestBuilder | None = None,
) -> None:
self._credentials_provider = credentials_provider
self._request_builder = request_builder or AgentBackendRunRequestBuilder()
def build(self, context: WorkflowAgentRuntimeBuildContext) -> WorkflowAgentRuntimeRequest:
agent_soul = AgentSoulConfig.model_validate(context.snapshot.config_snapshot_dict)
node_job = WorkflowNodeJobConfig.model_validate(context.binding.node_job_config_dict)
if agent_soul.model is None:
raise WorkflowAgentRuntimeRequestBuildError(
"agent_model_not_configured",
"Workflow Agent node requires Agent Soul model config.",
)
metadata = self._build_metadata(context, agent_soul, node_job)
workflow_context_prompt = self._build_workflow_context_prompt(context, node_job)
workflow_job_prompt = node_job.workflow_prompt.strip() or "Run this workflow Agent Node for the current run."
user_prompt = workflow_context_prompt.strip() or "Use the current workflow context."
credentials = self._credentials_provider.fetch(agent_soul.model.model_provider, agent_soul.model.model)
request = self._request_builder.build_for_workflow_node(
AgentBackendWorkflowNodeRunInput(
model=AgentBackendModelConfig(
tenant_id=context.dify_context.tenant_id,
plugin_id=agent_soul.model.plugin_id,
model_provider=agent_soul.model.model_provider,
model=agent_soul.model.model,
user_id=context.dify_context.user_id,
credentials=self._normalize_credentials(credentials),
model_settings=cast(dict[str, Any], agent_soul.model.model_settings),
),
execution_context=ExecutionContext(
tenant_id=context.dify_context.tenant_id,
app_id=context.dify_context.app_id,
workflow_id=context.workflow_id,
workflow_run_id=context.workflow_run_id,
node_id=context.node_id,
node_execution_id=context.node_execution_id,
conversation_id=get_system_text(context.variable_pool, SystemVariableKey.CONVERSATION_ID),
agent_id=context.agent.id,
agent_config_version_id=context.snapshot.id,
invoke_from=self._agent_backend_invoke_from(context.dify_context.invoke_from),
),
agent_soul_prompt=agent_soul.prompt.system_prompt or None,
workflow_node_job_prompt=workflow_job_prompt,
user_prompt=user_prompt,
output=self._build_output_config(node_job.declared_outputs),
idempotency_key=self._idempotency_key(context),
metadata=metadata,
)
)
redacted = cast(dict[str, Any], redact_for_agent_backend_log(request))
return WorkflowAgentRuntimeRequest(
request=request,
redacted_request=redacted,
agent_soul=agent_soul,
node_job=node_job,
metadata=metadata,
)
@staticmethod
def _agent_backend_invoke_from(invoke_from: InvokeFrom) -> Literal["workflow_run", "single_step"]:
if invoke_from in {InvokeFrom.DEBUGGER, InvokeFrom.VALIDATION}:
return "single_step"
return "workflow_run"
@staticmethod
def _idempotency_key(context: WorkflowAgentRuntimeBuildContext) -> str:
if context.workflow_run_id:
return f"{context.workflow_run_id}:{context.node_execution_id}"
return context.node_execution_id
@staticmethod
def _build_metadata(
context: WorkflowAgentRuntimeBuildContext,
agent_soul: AgentSoulConfig,
node_job: WorkflowNodeJobConfig,
) -> dict[str, Any]:
return {
"tenant_id": context.dify_context.tenant_id,
"app_id": context.dify_context.app_id,
"workflow_id": context.workflow_id,
"workflow_run_id": context.workflow_run_id,
"node_id": context.node_id,
"node_execution_id": context.node_execution_id,
"agent_id": context.agent.id,
"agent_config_snapshot_id": context.snapshot.id,
"binding_id": context.binding.id,
"workflow_node_job_mode": node_job.mode.value,
"runtime_support": build_runtime_feature_manifest(agent_soul),
}
def _build_workflow_context_prompt(
self,
context: WorkflowAgentRuntimeBuildContext,
node_job: WorkflowNodeJobConfig,
) -> str:
lines = ["Workflow context loaded for this run:"]
query = get_system_text(context.variable_pool, SystemVariableKey.QUERY)
if query:
lines.append(f"- User query: {query}")
resolved_outputs = self._resolve_previous_node_outputs(
context.variable_pool,
node_job.previous_node_output_refs,
)
if resolved_outputs:
lines.append("- Previous node outputs:")
for item in resolved_outputs:
lines.append(f" - {item['label']}: {item['value']}")
lines.append("The above workflow context is run-specific. Do not treat it as Agent Soul or persistent memory.")
return "\n".join(lines)
def _resolve_previous_node_outputs(
self,
variable_pool: VariablePoolReader,
refs: Sequence[Mapping[str, Any]],
) -> list[dict[str, Any]]:
resolved: list[dict[str, Any]] = []
for ref in refs:
selector = self._selector_from_ref(ref)
if not selector:
raise WorkflowAgentRuntimeRequestBuildError(
"invalid_previous_node_output_ref",
"Workflow Agent node has invalid previous node output ref.",
)
segment = variable_pool.get(selector)
if segment is None:
raise WorkflowAgentRuntimeRequestBuildError(
"missing_previous_node_output",
f"Workflow Agent node cannot resolve previous node output {'.'.join(selector)}.",
)
value = getattr(segment, "value", None)
resolved.append(
{
"label": ".".join(selector),
"value": self._summarize_value(value),
}
)
return resolved
@staticmethod
def _selector_from_ref(ref: Mapping[str, Any]) -> list[str] | None:
for key in ("selector", "variable_selector", "value_selector"):
value = ref.get(key)
if isinstance(value, list) and all(isinstance(item, str) for item in value):
return value
node_id = ref.get("node_id")
output_name = ref.get("output") or ref.get("name") or ref.get("variable") or ref.get("key")
if isinstance(node_id, str) and isinstance(output_name, str):
return [node_id, output_name]
return None
@staticmethod
def _summarize_value(value: Any) -> str:
text = str(value)
if len(text) > 2000:
return text[:2000] + "...[truncated]"
return text
@staticmethod
def _build_output_config(declared_outputs: Sequence[DeclaredOutputConfig]) -> AgentBackendOutputConfig | None:
if not declared_outputs:
return None
properties: dict[str, Any] = {}
required: list[str] = []
for output in declared_outputs:
properties[output.name] = WorkflowAgentRuntimeRequestBuilder._schema_for_declared_output(output)
if output.required:
required.append(output.name)
schema: dict[str, Any] = {"type": "object", "properties": properties}
if required:
schema["required"] = required
return AgentBackendOutputConfig(json_schema=schema)
@staticmethod
def _schema_for_declared_output(output: DeclaredOutputConfig) -> dict[str, Any]:
match output.type:
case DeclaredOutputType.STRING:
schema: dict[str, Any] = {"type": "string"}
case DeclaredOutputType.NUMBER:
schema = {"type": "number"}
case DeclaredOutputType.BOOLEAN:
schema = {"type": "boolean"}
case DeclaredOutputType.OBJECT:
schema = {"type": "object"}
case DeclaredOutputType.ARRAY:
schema = {"type": "array"}
case DeclaredOutputType.FILE:
schema = {
"type": "object",
"properties": {
"file_id": {"type": "string"},
"filename": {"type": "string"},
"mime_type": {"type": "string"},
"url": {"type": "string"},
},
}
if output.description:
schema["description"] = output.description
return schema
@staticmethod
def _normalize_credentials(credentials: Mapping[str, Any]) -> dict[str, str | int | float | bool | None]:
normalized: dict[str, str | int | float | bool | None] = {}
for key, value in credentials.items():
if isinstance(value, str | int | float | bool) or value is None:
normalized[key] = value
else:
normalized[key] = str(value)
return normalized

View File

@ -0,0 +1,388 @@
from __future__ import annotations
from collections import defaultdict, deque
from collections.abc import Iterator, Mapping, Sequence
from typing import Any
from sqlalchemy import select
from sqlalchemy.orm import Session
from graphon.enums import BuiltinNodeTypes
from models.agent import Agent, AgentConfigSnapshot, AgentStatus, WorkflowAgentNodeBinding
from models.agent_config_entities import AgentSoulConfig, WorkflowNodeJobConfig
from models.model import UploadFile
from models.workflow import Workflow
from .entities import DifyAgentNodeData
class WorkflowAgentNodeValidationError(ValueError):
"""Raised when a Workflow Agent v2 node cannot be executed or published."""
class WorkflowAgentNodeValidator:
"""Validate Agent v2 workflow nodes against graph topology and persisted bindings."""
_LOCKED_AGENT_SOUL_KEYS = frozenset(
{
"agent_soul",
"soul",
"prompt",
"system_prompt",
"skills_files",
"skills",
"files",
"tools",
"dify_tools",
"cli_tools",
"knowledge",
"env",
"environment",
"sandbox",
"sandbox_provider",
"memory",
"memory_strategy",
"model",
"app_features",
"app_variables",
"misc_legacy",
}
)
_SUPPORTED_HUMAN_CONTACT_CHANNELS = frozenset({"email", "slack", "web_app", "webapp", "chat"})
@classmethod
def validate_draft_workflow(cls, *, session: Session, workflow: Workflow) -> None:
cls._validate_workflow(session=session, workflow=workflow, require_binding=False)
@classmethod
def validate_published_workflow(cls, *, session: Session, workflow: Workflow) -> None:
cls._validate_workflow(session=session, workflow=workflow, require_binding=True)
@classmethod
def _validate_workflow(cls, *, session: Session, workflow: Workflow, require_binding: bool) -> None:
graph = workflow.graph_dict
topology = _WorkflowGraphTopology.from_graph(graph)
for node_id, node_data in cls.iter_agent_v2_nodes(graph):
cls._validate_node_schema(node_id=node_id, node_data=node_data)
binding = cls._find_binding(
session=session,
tenant_id=workflow.tenant_id,
app_id=workflow.app_id,
workflow_id=workflow.id,
node_id=node_id,
)
if binding is None:
if require_binding:
raise WorkflowAgentNodeValidationError(
f"Workflow Agent node {node_id} requires a binding before publishing."
)
continue
cls.validate_binding(session=session, binding=binding, topology=topology)
@classmethod
def validate_binding(
cls,
*,
session: Session,
binding: WorkflowAgentNodeBinding,
topology: _WorkflowGraphTopology | None = None,
) -> None:
if binding.agent_id is None:
raise WorkflowAgentNodeValidationError(f"Workflow Agent node {binding.node_id} is missing agent binding.")
if binding.current_snapshot_id is None:
raise WorkflowAgentNodeValidationError(
f"Workflow Agent node {binding.node_id} is missing config snapshot binding."
)
agent = session.scalar(
select(Agent)
.where(
Agent.tenant_id == binding.tenant_id,
Agent.id == binding.agent_id,
)
.limit(1)
)
if agent is None or agent.status == AgentStatus.ARCHIVED:
raise WorkflowAgentNodeValidationError(
f"Workflow Agent node {binding.node_id} references an unavailable agent."
)
snapshot = session.scalar(
select(AgentConfigSnapshot)
.where(
AgentConfigSnapshot.tenant_id == binding.tenant_id,
AgentConfigSnapshot.agent_id == agent.id,
AgentConfigSnapshot.id == binding.current_snapshot_id,
)
.limit(1)
)
if snapshot is None:
raise WorkflowAgentNodeValidationError(
f"Workflow Agent node {binding.node_id} references a missing config snapshot."
)
agent_soul = AgentSoulConfig.model_validate(snapshot.config_snapshot_dict)
if agent_soul.model is None:
raise WorkflowAgentNodeValidationError(
f"Workflow Agent node {binding.node_id} requires Agent Soul model config."
)
node_job = WorkflowNodeJobConfig.model_validate(binding.node_job_config_dict)
cls.validate_node_job(session=session, binding=binding, node_job=node_job, topology=topology)
@classmethod
def validate_node_job(
cls,
*,
session: Session,
binding: WorkflowAgentNodeBinding,
node_job: WorkflowNodeJobConfig,
topology: _WorkflowGraphTopology | None = None,
) -> None:
cls._validate_locked_agent_soul_not_overridden(binding=binding, node_job=node_job)
output_names: set[str] = set()
for output in node_job.declared_outputs:
if output.name in output_names:
raise WorkflowAgentNodeValidationError(
f"Workflow Agent node {binding.node_id} has duplicate output name {output.name}."
)
output_names.add(output.name)
for check in output.checks:
if check.benchmark_file_ref is not None:
cls._validate_file_ref(
session=session,
binding=binding,
file_ref=check.benchmark_file_ref,
ref_context=f"output {output.name} benchmark file",
)
for ref in node_job.previous_node_output_refs:
selector = cls.selector_from_ref(ref)
if selector is None:
raise WorkflowAgentNodeValidationError(
f"Workflow Agent node {binding.node_id} has invalid previous node output ref."
)
if topology is None:
continue
if len(selector) < 2:
raise WorkflowAgentNodeValidationError(
f"Workflow Agent node {binding.node_id} has incomplete previous node output ref."
)
source_node_id = selector[0]
if not topology.has_node(source_node_id):
raise WorkflowAgentNodeValidationError(
f"Workflow Agent node {binding.node_id} references missing previous node {source_node_id}."
)
if not topology.is_upstream(source_node_id=source_node_id, target_node_id=binding.node_id):
raise WorkflowAgentNodeValidationError(
f"Workflow Agent node {binding.node_id} references non-upstream previous node {source_node_id}."
)
for human_ref in node_job.human_contacts:
cls._validate_human_ref(binding=binding, human_ref=human_ref)
file_refs = node_job.metadata.get("file_refs")
if isinstance(file_refs, list):
for file_ref in file_refs:
if isinstance(file_ref, Mapping):
cls._validate_file_ref(
session=session,
binding=binding,
file_ref=file_ref,
ref_context="metadata file ref",
)
@staticmethod
def iter_agent_v2_nodes(graph_dict: Mapping[str, Any]) -> Iterator[tuple[str, Mapping[str, Any]]]:
nodes = graph_dict.get("nodes")
if not isinstance(nodes, list):
return
for node in nodes:
if not isinstance(node, Mapping):
continue
node_id = node.get("id")
node_data = node.get("data")
if not isinstance(node_id, str) or not isinstance(node_data, Mapping):
continue
if node_data.get("type") == BuiltinNodeTypes.AGENT and str(node_data.get("version")) == "2":
yield node_id, node_data
@staticmethod
def selector_from_ref(ref: Mapping[str, Any]) -> list[str] | None:
for key in ("selector", "variable_selector", "value_selector"):
value = ref.get(key)
if isinstance(value, list) and all(isinstance(item, str) for item in value):
return value
node_id = ref.get("node_id")
output_name = ref.get("output") or ref.get("name") or ref.get("variable") or ref.get("key")
if isinstance(node_id, str) and isinstance(output_name, str):
return [node_id, output_name]
return None
@staticmethod
def _validate_node_schema(*, node_id: str, node_data: Mapping[str, Any]) -> None:
try:
DifyAgentNodeData.model_validate(node_data)
except ValueError as exc:
raise WorkflowAgentNodeValidationError(
f"Workflow Agent node {node_id} has invalid Agent v2 node schema: {exc}"
) from exc
@classmethod
def _validate_locked_agent_soul_not_overridden(
cls,
*,
binding: WorkflowAgentNodeBinding,
node_job: WorkflowNodeJobConfig,
) -> None:
forbidden_paths = cls._find_locked_agent_soul_paths(node_job.metadata)
if forbidden_paths:
raise WorkflowAgentNodeValidationError(
f"Workflow Agent node {binding.node_id} cannot override locked Agent Soul fields: "
f"{', '.join(sorted(forbidden_paths))}."
)
@classmethod
def _find_locked_agent_soul_paths(cls, value: Any, *, path: str = "metadata") -> set[str]:
if not isinstance(value, Mapping):
return set()
forbidden: set[str] = set()
for key, item in value.items():
key_text = str(key)
if key_text in cls._LOCKED_AGENT_SOUL_KEYS:
forbidden.add(f"{path}.{key_text}")
forbidden.update(cls._find_locked_agent_soul_paths(item, path=f"{path}.{key_text}"))
return forbidden
@classmethod
def _validate_human_ref(
cls,
*,
binding: WorkflowAgentNodeBinding,
human_ref: Mapping[str, Any],
) -> None:
contact_id = human_ref.get("contact_id") or human_ref.get("human_id") or human_ref.get("id")
if not isinstance(contact_id, str) or not contact_id:
raise WorkflowAgentNodeValidationError(
f"Workflow Agent node {binding.node_id} has invalid human contact ref."
)
tenant_id = human_ref.get("tenant_id")
if tenant_id is not None and tenant_id != binding.tenant_id:
raise WorkflowAgentNodeValidationError(
f"Workflow Agent node {binding.node_id} references out-of-scope human contact {contact_id}."
)
channel = human_ref.get("channel") or human_ref.get("method") or human_ref.get("contact_method")
if channel is not None and channel not in cls._SUPPORTED_HUMAN_CONTACT_CHANNELS:
raise WorkflowAgentNodeValidationError(
f"Workflow Agent node {binding.node_id} references unsupported human contact channel {channel}."
)
@staticmethod
def _validate_file_ref(
*,
session: Session,
binding: WorkflowAgentNodeBinding,
file_ref: Mapping[str, Any],
ref_context: str,
) -> None:
tenant_id = file_ref.get("tenant_id")
if tenant_id is not None and tenant_id != binding.tenant_id:
raise WorkflowAgentNodeValidationError(
f"Workflow Agent node {binding.node_id} references out-of-scope {ref_context}."
)
upload_file_id = (
file_ref.get("upload_file_id") or file_ref.get("file_id") or file_ref.get("id") or file_ref.get("reference")
)
if upload_file_id is None and (file_ref.get("url") or file_ref.get("remote_url")):
return
if not isinstance(upload_file_id, str) or not upload_file_id:
raise WorkflowAgentNodeValidationError(f"Workflow Agent node {binding.node_id} has invalid {ref_context}.")
upload_file = session.scalar(
select(UploadFile)
.where(
UploadFile.tenant_id == binding.tenant_id,
UploadFile.id == upload_file_id,
)
.limit(1)
)
if upload_file is None:
raise WorkflowAgentNodeValidationError(
f"Workflow Agent node {binding.node_id} references missing or out-of-scope {ref_context}."
)
@staticmethod
def _find_binding(
*,
session: Session,
tenant_id: str,
app_id: str,
workflow_id: str,
node_id: str,
) -> WorkflowAgentNodeBinding | None:
return session.scalar(
select(WorkflowAgentNodeBinding)
.where(
WorkflowAgentNodeBinding.tenant_id == tenant_id,
WorkflowAgentNodeBinding.app_id == app_id,
WorkflowAgentNodeBinding.workflow_id == workflow_id,
WorkflowAgentNodeBinding.node_id == node_id,
)
.limit(1)
)
class _WorkflowGraphTopology:
def __init__(self, *, node_ids: set[str], incoming: Mapping[str, Sequence[str]]) -> None:
self._node_ids = node_ids
self._incoming = incoming
@classmethod
def from_graph(cls, graph: Mapping[str, Any]) -> _WorkflowGraphTopology:
node_ids = cls._node_ids_from_graph(graph)
incoming: dict[str, list[str]] = defaultdict(list)
edges = graph.get("edges")
if isinstance(edges, list):
for edge in edges:
if not isinstance(edge, Mapping):
continue
source = edge.get("source")
target = edge.get("target")
if isinstance(source, str) and isinstance(target, str):
incoming[target].append(source)
return cls(node_ids=node_ids, incoming=incoming)
def has_node(self, node_id: str) -> bool:
return node_id in self._node_ids
def is_upstream(self, *, source_node_id: str, target_node_id: str) -> bool:
if source_node_id == target_node_id:
return False
visited: set[str] = set()
queue: deque[str] = deque(self._incoming.get(target_node_id, ()))
while queue:
candidate = queue.popleft()
if candidate == source_node_id:
return True
if candidate in visited:
continue
visited.add(candidate)
queue.extend(self._incoming.get(candidate, ()))
return False
@staticmethod
def _node_ids_from_graph(graph: Mapping[str, Any]) -> set[str]:
node_ids: set[str] = set()
nodes = graph.get("nodes")
if not isinstance(nodes, list):
return node_ids
for node in nodes:
if not isinstance(node, Mapping):
continue
node_id = node.get("id")
if isinstance(node_id, str):
node_ids.add(node_id)
return node_ids

View File

@ -64,6 +64,24 @@ class AgentSoulMemoryConfig(BaseModel):
artifacts: list[dict[str, Any]] = Field(default_factory=list)
class AgentSoulModelCredentialRef(BaseModel):
"""Reference to model credentials resolved only at runtime."""
type: str = Field(min_length=1, max_length=64)
id: str | None = Field(default=None, max_length=255)
provider: str | None = Field(default=None, max_length=255)
class AgentSoulModelConfig(BaseModel):
"""Stable model selection for Agent runtime without storing secret values."""
plugin_id: str = Field(min_length=1, max_length=255)
model_provider: str = Field(min_length=1, max_length=255)
model: str = Field(min_length=1, max_length=255)
credential_ref: AgentSoulModelCredentialRef | None = None
model_settings: dict[str, Any] = Field(default_factory=dict)
class AppVariableConfig(BaseModel):
name: str = Field(min_length=1, max_length=255)
type: str = Field(min_length=1, max_length=64)
@ -83,6 +101,7 @@ class AgentSoulConfig(BaseModel):
env: AgentSoulEnvConfig = Field(default_factory=AgentSoulEnvConfig)
sandbox: AgentSoulSandboxConfig = Field(default_factory=AgentSoulSandboxConfig)
memory: AgentSoulMemoryConfig = Field(default_factory=AgentSoulMemoryConfig)
model: AgentSoulModelConfig | None = None
app_features: dict[str, Any] = Field(default_factory=dict)
app_variables: list[AppVariableConfig] = Field(default_factory=list)
misc_legacy: dict[str, Any] = Field(default_factory=dict)

View File

@ -10515,6 +10515,7 @@ Supported icon storage formats for Agent roster entries.
| knowledge | [AgentSoulKnowledgeConfig](#agentsoulknowledgeconfig) | | No |
| memory | [AgentSoulMemoryConfig](#agentsoulmemoryconfig) | | No |
| misc_legacy | object | | No |
| model | [AgentSoulModelConfig](#agentsoulmodelconfig) | | No |
| prompt | [AgentSoulPromptConfig](#agentsoulpromptconfig) | | No |
| sandbox | [AgentSoulSandboxConfig](#agentsoulsandboxconfig) | | No |
| schema_version | integer | | No |
@ -10551,6 +10552,28 @@ Supported icon storage formats for Agent roster entries.
| budget | string | | No |
| scope | string | | No |
#### AgentSoulModelConfig
Stable model selection for Agent runtime without storing secret values.
| Name | Type | Description | Required |
| ---- | ---- | ----------- | -------- |
| credential_ref | [AgentSoulModelCredentialRef](#agentsoulmodelcredentialref) | | No |
| model | string | | Yes |
| model_provider | string | | Yes |
| model_settings | object | | No |
| plugin_id | string | | Yes |
#### AgentSoulModelCredentialRef
Reference to model credentials resolved only at runtime.
| Name | Type | Description | Required |
| ---- | ---- | ----------- | -------- |
| id | string | | No |
| provider | string | | No |
| type | string | | Yes |
#### AgentSoulPromptConfig
| Name | Type | Description | Required |

View File

@ -0,0 +1,59 @@
from __future__ import annotations
from sqlalchemy import select
from sqlalchemy.orm import Session
from core.workflow.nodes.agent_v2.validators import WorkflowAgentNodeValidator
from models.agent import WorkflowAgentNodeBinding
from models.agent_config_entities import WorkflowNodeJobConfig
from models.workflow import Workflow
class WorkflowAgentPublishService:
"""Validate and freeze Workflow Agent v2 bindings during workflow publish."""
@classmethod
def validate_agent_nodes_for_publish(cls, *, session: Session, draft_workflow: Workflow) -> None:
WorkflowAgentNodeValidator.validate_published_workflow(session=session, workflow=draft_workflow)
@classmethod
def validate_agent_nodes_for_draft_sync(cls, *, session: Session, draft_workflow: Workflow) -> None:
WorkflowAgentNodeValidator.validate_draft_workflow(session=session, workflow=draft_workflow)
@classmethod
def copy_agent_node_bindings_to_published(
cls,
*,
session: Session,
draft_workflow: Workflow,
published_workflow: Workflow,
) -> None:
node_ids = {
node_id for node_id, _node_data in WorkflowAgentNodeValidator.iter_agent_v2_nodes(draft_workflow.graph_dict)
}
if not node_ids:
return
bindings = session.scalars(
select(WorkflowAgentNodeBinding).where(
WorkflowAgentNodeBinding.tenant_id == draft_workflow.tenant_id,
WorkflowAgentNodeBinding.app_id == draft_workflow.app_id,
WorkflowAgentNodeBinding.workflow_id == draft_workflow.id,
WorkflowAgentNodeBinding.node_id.in_(node_ids),
)
).all()
for binding in bindings:
copied = WorkflowAgentNodeBinding(
tenant_id=binding.tenant_id,
app_id=binding.app_id,
workflow_id=published_workflow.id,
node_id=binding.node_id,
binding_type=binding.binding_type,
agent_id=binding.agent_id,
current_snapshot_id=binding.current_snapshot_id,
node_job_config=WorkflowNodeJobConfig.model_validate(binding.node_job_config_dict),
created_by=binding.created_by,
updated_by=binding.updated_by,
)
session.add(copied)

View File

@ -312,6 +312,13 @@ class WorkflowService:
workflow.environment_variables = environment_variables
workflow.conversation_variables = conversation_variables
from services.agent.workflow_publish_service import WorkflowAgentPublishService
WorkflowAgentPublishService.validate_agent_nodes_for_draft_sync(
session=cast(Session, db.session),
draft_workflow=workflow,
)
# commit db session changes
db.session.commit()
@ -457,6 +464,13 @@ class WorkflowService:
# validate graph structure
self.validate_graph_structure(graph=draft_workflow.graph_dict)
from services.agent.workflow_publish_service import WorkflowAgentPublishService
WorkflowAgentPublishService.validate_agent_nodes_for_publish(
session=session,
draft_workflow=draft_workflow,
)
# billing check
if dify_config.BILLING_ENABLED:
limit_info = BillingService.get_info(app_model.tenant_id)
@ -490,6 +504,11 @@ class WorkflowService:
# commit db session changes
session.add(workflow)
WorkflowAgentPublishService.copy_agent_node_bindings_to_published(
session=session,
draft_workflow=draft_workflow,
published_workflow=workflow,
)
# trigger app workflow events
app_published_workflow_was_updated.send(app_model, published_workflow=workflow)

View File

@ -0,0 +1,155 @@
from types import SimpleNamespace
from typing import cast
from clients.agent_backend import (
AgentBackendRunEventAdapter,
AgentBackendStreamInternalEvent,
FakeAgentBackendRunClient,
FakeAgentBackendScenario,
)
from core.app.entities.app_invoke_entities import DIFY_RUN_CONTEXT_KEY, DifyRunContext, InvokeFrom, UserFrom
from core.workflow.nodes.agent_v2 import DifyAgentNode
from core.workflow.nodes.agent_v2.binding_resolver import WorkflowAgentBindingBundle, WorkflowAgentBindingResolver
from core.workflow.nodes.agent_v2.entities import DifyAgentNodeData
from core.workflow.nodes.agent_v2.output_adapter import WorkflowAgentOutputAdapter
from core.workflow.nodes.agent_v2.runtime_request_builder import WorkflowAgentRuntimeRequestBuilder
from graphon.entities import GraphInitParams
from graphon.enums import BuiltinNodeTypes, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from graphon.node_events import StreamCompletedEvent
from graphon.runtime import GraphRuntimeState
from graphon.variables.segments import StringSegment
from models.agent import Agent, AgentConfigSnapshot, WorkflowAgentNodeBinding
from models.agent_config_entities import AgentSoulConfig, AgentSoulModelConfig, WorkflowNodeJobConfig
class FakeCredentialsProvider:
def fetch(self, provider_name: str, model_name: str) -> dict[str, object]:
assert provider_name == "openai"
assert model_name == "gpt-test"
return {"api_key": "secret-key"}
class FakeVariablePool:
def get(self, selector):
values = {
("sys", "query"): "Summarize the report.",
("sys", "workflow_run_id"): "workflow-run-1",
("sys", "conversation_id"): "conversation-1",
("previous-node", "text"): "Previous result",
}
value = values.get(tuple(selector))
if value is None:
return None
return StringSegment(value=value)
def get_by_prefix(self, prefix):
return {}
class FakeBindingResolver(WorkflowAgentBindingResolver):
def __init__(self):
self.agent = Agent(id="agent-1", tenant_id="tenant-1", name="Agent")
self.snapshot = AgentConfigSnapshot(
id="snapshot-1",
tenant_id="tenant-1",
agent_id="agent-1",
version=1,
config_snapshot=AgentSoulConfig(
prompt={"system_prompt": "You are careful."},
model=AgentSoulModelConfig(
plugin_id="langgenius/openai",
model_provider="openai",
model="gpt-test",
),
),
)
self.binding = WorkflowAgentNodeBinding(
id="binding-1",
tenant_id="tenant-1",
app_id="app-1",
workflow_id="workflow-1",
node_id="agent-node",
agent_id="agent-1",
current_snapshot_id="snapshot-1",
node_job_config=WorkflowNodeJobConfig.model_validate(
{
"workflow_prompt": "Use the previous output.",
"previous_node_output_refs": [{"node_id": "previous-node", "output": "text"}],
"declared_outputs": [{"name": "text", "type": "string"}],
}
),
)
def resolve(self, **_kwargs):
return WorkflowAgentBindingBundle(binding=self.binding, agent=self.agent, snapshot=self.snapshot)
def _node(*, scenario: FakeAgentBackendScenario = FakeAgentBackendScenario.SUCCESS) -> DifyAgentNode:
graph_init_params = GraphInitParams(
workflow_id="workflow-1",
graph_config={"nodes": [], "edges": []},
run_context={
DIFY_RUN_CONTEXT_KEY: DifyRunContext(
tenant_id="tenant-1",
app_id="app-1",
user_id="user-1",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
)
},
call_depth=0,
)
return DifyAgentNode(
node_id="agent-node",
data=DifyAgentNodeData.model_validate({"type": BuiltinNodeTypes.AGENT, "version": "2"}),
graph_init_params=graph_init_params,
graph_runtime_state=cast(GraphRuntimeState, SimpleNamespace(variable_pool=FakeVariablePool())),
binding_resolver=FakeBindingResolver(),
runtime_request_builder=WorkflowAgentRuntimeRequestBuilder(credentials_provider=FakeCredentialsProvider()),
agent_backend_client=FakeAgentBackendRunClient(scenario=scenario),
event_adapter=AgentBackendRunEventAdapter(),
output_adapter=WorkflowAgentOutputAdapter(),
)
def test_agent_node_run_maps_successful_agent_backend_run_to_node_result():
events = list(_node()._run())
assert len(events) == 1
result = cast(StreamCompletedEvent, events[0]).node_run_result
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert result.outputs == {"text": "hello agent"}
agent_log = result.metadata[WorkflowNodeExecutionMetadataKey.AGENT_LOG]
assert agent_log["agent_backend"]["run_id"] == "fake-run-1"
assert agent_log["agent_backend"]["status"] == "succeeded"
assert result.process_data["agent_id"] == "agent-1"
assert result.inputs["agent_backend_request"]["composition"]["layers"][4]["config"]["credentials"] == "[REDACTED]"
def test_agent_node_run_maps_failed_agent_backend_run_to_node_result():
events = list(_node(scenario=FakeAgentBackendScenario.FAILED)._run())
assert len(events) == 1
result = cast(StreamCompletedEvent, events[0]).node_run_result
assert result.status == WorkflowNodeExecutionStatus.FAILED
assert result.error == "fake failure"
assert result.error_type == "unit_test"
def test_agent_node_records_stream_usage_metadata():
metadata = {"agent_backend": {"run_id": "run-1"}}
DifyAgentNode._record_stream_metadata(
metadata,
AgentBackendStreamInternalEvent(
run_id="run-1",
source_event_id="1-1",
event_kind="model_response",
data={"usage": {"prompt_tokens": 3, "completion_tokens": 4, "total_tokens": 7}},
),
)
agent_backend = metadata["agent_backend"]
assert agent_backend["last_stream_event_id"] == "1-1"
assert agent_backend["last_stream_event_kind"] == "model_response"
assert agent_backend["usage"] == {"prompt_tokens": 3, "completion_tokens": 4, "total_tokens": 7}

View File

@ -0,0 +1,121 @@
import pytest
from core.workflow.nodes.agent_v2.binding_resolver import (
WorkflowAgentBindingError,
WorkflowAgentBindingResolver,
)
from models.agent import Agent, AgentConfigSnapshot, AgentStatus, WorkflowAgentNodeBinding
from models.agent_config_entities import AgentSoulConfig, AgentSoulModelConfig, WorkflowNodeJobConfig
class FakeSession:
def __init__(self, scalar_results):
self._scalar_results = list(scalar_results)
self.expunge_calls = []
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
def scalar(self, _stmt):
if not self._scalar_results:
return None
return self._scalar_results.pop(0)
def expunge(self, value):
self.expunge_calls.append(value)
def _binding() -> WorkflowAgentNodeBinding:
return WorkflowAgentNodeBinding(
id="binding-1",
tenant_id="tenant-1",
app_id="app-1",
workflow_id="workflow-1",
node_id="agent-node",
agent_id="agent-1",
current_snapshot_id="snapshot-1",
node_job_config=WorkflowNodeJobConfig(),
)
def _agent(*, status: AgentStatus = AgentStatus.ACTIVE) -> Agent:
return Agent(id="agent-1", tenant_id="tenant-1", name="Agent", status=status)
def _snapshot() -> AgentConfigSnapshot:
return AgentConfigSnapshot(
id="snapshot-1",
tenant_id="tenant-1",
agent_id="agent-1",
version=1,
config_snapshot=AgentSoulConfig(
model=AgentSoulModelConfig(
plugin_id="langgenius/openai",
model_provider="openai",
model="gpt-test",
)
),
)
def _resolve() -> dict[str, str]:
return {
"tenant_id": "tenant-1",
"app_id": "app-1",
"workflow_id": "workflow-1",
"node_id": "agent-node",
}
def test_binding_resolver_returns_detached_binding_bundle(monkeypatch: pytest.MonkeyPatch):
fake_session = FakeSession([_binding(), _agent(), _snapshot()])
monkeypatch.setattr(
"core.workflow.nodes.agent_v2.binding_resolver.session_factory.create_session",
lambda: fake_session,
)
bundle = WorkflowAgentBindingResolver().resolve(**_resolve())
assert bundle.binding.id == "binding-1"
assert bundle.agent.id == "agent-1"
assert bundle.snapshot.id == "snapshot-1"
assert fake_session.expunge_calls == [bundle.binding, bundle.agent, bundle.snapshot]
def test_binding_resolver_raises_when_binding_missing(monkeypatch: pytest.MonkeyPatch):
monkeypatch.setattr(
"core.workflow.nodes.agent_v2.binding_resolver.session_factory.create_session",
lambda: FakeSession([None]),
)
with pytest.raises(WorkflowAgentBindingError) as exc_info:
WorkflowAgentBindingResolver().resolve(**_resolve())
assert exc_info.value.error_code == "agent_binding_not_found"
def test_binding_resolver_raises_when_agent_archived(monkeypatch: pytest.MonkeyPatch):
monkeypatch.setattr(
"core.workflow.nodes.agent_v2.binding_resolver.session_factory.create_session",
lambda: FakeSession([_binding(), _agent(status=AgentStatus.ARCHIVED)]),
)
with pytest.raises(WorkflowAgentBindingError) as exc_info:
WorkflowAgentBindingResolver().resolve(**_resolve())
assert exc_info.value.error_code == "agent_not_available"
def test_binding_resolver_raises_when_snapshot_missing(monkeypatch: pytest.MonkeyPatch):
monkeypatch.setattr(
"core.workflow.nodes.agent_v2.binding_resolver.session_factory.create_session",
lambda: FakeSession([_binding(), _agent(), None]),
)
with pytest.raises(WorkflowAgentBindingError) as exc_info:
WorkflowAgentBindingResolver().resolve(**_resolve())
assert exc_info.value.error_code == "agent_config_snapshot_not_found"

View File

@ -0,0 +1,194 @@
from agenton.compositor import CompositorSessionSnapshot
from clients.agent_backend import (
AgentBackendRunCancelledInternalEvent,
AgentBackendRunFailedInternalEvent,
AgentBackendRunPausedInternalEvent,
AgentBackendRunSucceededInternalEvent,
)
from core.workflow.nodes.agent_v2.output_adapter import WorkflowAgentOutputAdapter
from graphon.enums import WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from graphon.file import FileTransferMethod, FileType
from graphon.variables.segments import ArrayFileSegment, FileSegment
def test_success_output_adapter_preserves_dict_output():
result = WorkflowAgentOutputAdapter().build_success_result(
event=AgentBackendRunSucceededInternalEvent(
run_id="run-1",
source_event_id="2-0",
output={"summary": "ok"},
session_snapshot=CompositorSessionSnapshot(layers=[]),
),
inputs={},
process_data={},
metadata={"agent_backend": {"run_id": "run-1"}},
)
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert result.outputs == {"summary": "ok"}
assert result.metadata[WorkflowNodeExecutionMetadataKey.AGENT_LOG]["agent_backend"]["status"] == "succeeded"
assert result.metadata[WorkflowNodeExecutionMetadataKey.AGENT_LOG]["agent_backend"]["session_snapshot"] == {
"layer_count": 0,
}
def test_failure_output_adapter_maps_paused_to_unsupported_failure():
result = WorkflowAgentOutputAdapter().build_failure_result(
event=AgentBackendRunPausedInternalEvent(
run_id="run-1",
source_event_id="2-0",
reason="human",
message=None,
session_snapshot=None,
),
inputs={},
process_data={},
metadata={},
)
assert result.status == WorkflowNodeExecutionStatus.FAILED
assert result.error_type == "agent_backend_paused_unsupported"
def test_failure_output_adapter_preserves_backend_failed_reason():
result = WorkflowAgentOutputAdapter().build_failure_result(
event=AgentBackendRunFailedInternalEvent(
run_id="run-1",
source_event_id="2-0",
error="bad request",
reason="validation",
),
inputs={},
process_data={},
metadata={},
)
assert result.status == WorkflowNodeExecutionStatus.FAILED
assert result.error == "bad request"
assert result.error_type == "validation"
def test_success_output_adapter_normalizes_string_and_scalar_outputs():
adapter = WorkflowAgentOutputAdapter()
string_result = adapter.build_success_result(
event=AgentBackendRunSucceededInternalEvent(
run_id="run-1",
source_event_id="2-0",
output="hello",
session_snapshot=CompositorSessionSnapshot(layers=[]),
),
inputs={},
process_data={},
metadata={},
)
scalar_result = adapter.build_success_result(
event=AgentBackendRunSucceededInternalEvent(
run_id="run-2",
source_event_id="2-0",
output=3,
session_snapshot=CompositorSessionSnapshot(layers=[]),
),
inputs={},
process_data={},
metadata={},
)
assert string_result.outputs == {"text": "hello"}
assert scalar_result.outputs == {"result": 3}
def test_success_output_adapter_normalizes_file_output_to_file_segments():
result = WorkflowAgentOutputAdapter().build_success_result(
event=AgentBackendRunSucceededInternalEvent(
run_id="run-1",
source_event_id="2-0",
output={
"report": {
"file_id": "upload-file-1",
"filename": "report.pdf",
"mime_type": "application/pdf",
"size": 12,
},
"attachments": [
{
"tool_file_id": "tool-file-1",
"filename": "chart.png",
"mime_type": "image/png",
}
],
},
session_snapshot=CompositorSessionSnapshot(layers=[]),
),
inputs={},
process_data={},
metadata={},
)
report = result.outputs["report"]
assert isinstance(report, FileSegment)
assert report.value.type == FileType.DOCUMENT
assert report.value.transfer_method == FileTransferMethod.LOCAL_FILE
assert report.value.reference == "upload-file-1"
attachments = result.outputs["attachments"]
assert isinstance(attachments, ArrayFileSegment)
assert attachments.value[0].type == FileType.IMAGE
assert attachments.value[0].transfer_method == FileTransferMethod.TOOL_FILE
assert attachments.value[0].reference == "tool-file-1"
def test_success_output_adapter_maps_backend_usage_to_llm_usage_and_metadata():
result = WorkflowAgentOutputAdapter().build_success_result(
event=AgentBackendRunSucceededInternalEvent(
run_id="run-1",
source_event_id="2-0",
output={"summary": "ok"},
session_snapshot=CompositorSessionSnapshot(layers=[]),
),
inputs={},
process_data={},
metadata={
"agent_backend": {
"usage": {
"prompt_tokens": 10,
"completion_tokens": 5,
"total_tokens": 15,
}
}
},
)
assert result.llm_usage.prompt_tokens == 10
assert result.llm_usage.completion_tokens == 5
assert result.llm_usage.total_tokens == 15
assert result.metadata[WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS] == 15
def test_failure_output_adapter_maps_cancelled_to_failure_code():
result = WorkflowAgentOutputAdapter().build_failure_result(
event=AgentBackendRunCancelledInternalEvent(
run_id="run-1",
source_event_id="2-0",
reason="user_cancelled",
message=None,
),
inputs={},
process_data={},
metadata={},
)
assert result.status == WorkflowNodeExecutionStatus.FAILED
assert result.error_type == "agent_backend_run_cancelled"
def test_stream_exhausted_result_is_failed_with_stream_error():
result = WorkflowAgentOutputAdapter().build_stream_exhausted_result(
inputs={},
process_data={},
metadata={"agent_backend": {"run_id": "run-1"}},
)
assert result.status == WorkflowNodeExecutionStatus.FAILED
assert result.error_type == "agent_backend_stream_error"
assert result.metadata[WorkflowNodeExecutionMetadataKey.AGENT_LOG]["agent_backend"]["run_id"] == "run-1"

View File

@ -0,0 +1,219 @@
from dataclasses import replace
import pytest
from core.app.entities.app_invoke_entities import DifyRunContext, InvokeFrom, UserFrom
from core.workflow.nodes.agent_v2.runtime_request_builder import (
WorkflowAgentRuntimeBuildContext,
WorkflowAgentRuntimeRequestBuilder,
WorkflowAgentRuntimeRequestBuildError,
)
from graphon.variables.segments import StringSegment
from models.agent import Agent, AgentConfigSnapshot, WorkflowAgentNodeBinding
from models.agent_config_entities import (
AgentSoulConfig,
AgentSoulModelConfig,
DeclaredOutputType,
WorkflowNodeJobConfig,
)
class FakeCredentialsProvider:
def fetch(self, provider_name: str, model_name: str) -> dict[str, object]:
assert provider_name == "openai"
assert model_name == "gpt-test"
return {"api_key": "secret-key"}
class FakeVariablePool:
def get(self, selector):
if list(selector) == ["sys", "query"]:
return StringSegment(value="Summarize the report.")
if list(selector) == ["previous-node", "text"]:
return StringSegment(value="Previous result")
return None
def get_by_prefix(self, prefix):
return {}
def _context() -> WorkflowAgentRuntimeBuildContext:
agent = Agent(id="agent-1", tenant_id="tenant-1", name="Agent")
snapshot = AgentConfigSnapshot(
id="snapshot-1",
tenant_id="tenant-1",
agent_id="agent-1",
version=1,
config_snapshot=AgentSoulConfig(
prompt={"system_prompt": "You are careful."},
model=AgentSoulModelConfig(
plugin_id="langgenius/openai",
model_provider="openai",
model="gpt-test",
model_settings={"temperature": 0},
),
),
)
binding = WorkflowAgentNodeBinding(
id="binding-1",
tenant_id="tenant-1",
app_id="app-1",
workflow_id="workflow-1",
node_id="agent-node",
agent_id="agent-1",
current_snapshot_id="snapshot-1",
node_job_config=WorkflowNodeJobConfig.model_validate(
{
"workflow_prompt": "Use the previous output.",
"previous_node_output_refs": [{"node_id": "previous-node", "output": "text"}],
"declared_outputs": [{"name": "summary", "type": "string"}],
}
),
)
return WorkflowAgentRuntimeBuildContext(
dify_context=DifyRunContext(
tenant_id="tenant-1",
app_id="app-1",
user_id="user-1",
user_from=UserFrom.ACCOUNT,
invoke_from=InvokeFrom.DEBUGGER,
),
workflow_id="workflow-1",
workflow_run_id="run-1",
node_id="agent-node",
node_execution_id="node-exec-1",
variable_pool=FakeVariablePool(),
binding=binding,
agent=agent,
snapshot=snapshot,
)
def test_builds_create_run_request_from_agent_soul_and_node_job():
result = WorkflowAgentRuntimeRequestBuilder(credentials_provider=FakeCredentialsProvider()).build(_context())
dumped = result.request.model_dump(mode="json")
assert dumped["execution_context"]["agent_id"] == "agent-1"
assert dumped["execution_context"]["agent_config_version_id"] == "snapshot-1"
assert dumped["execution_context"]["invoke_from"] == "single_step"
assert dumped["idempotency_key"] == "run-1:node-exec-1"
assert dumped["composition"]["layers"][0]["config"]["prefix"] == "You are careful."
assert dumped["composition"]["layers"][1]["config"]["prefix"] == "Use the previous output."
assert "Previous result" in dumped["composition"]["layers"][2]["config"]["user"]
assert dumped["composition"]["layers"][-1]["config"]["json_schema"]["properties"]["summary"]["type"] == "string"
assert result.redacted_request["composition"]["layers"][4]["config"]["credentials"] == "[REDACTED]"
def test_builds_workflow_run_request_with_file_output_schema_and_reserved_metadata():
context = _context()
snapshot = AgentConfigSnapshot(
id="snapshot-1",
tenant_id="tenant-1",
agent_id="agent-1",
version=1,
config_snapshot=AgentSoulConfig(
prompt={"system_prompt": "You are careful."},
model=AgentSoulModelConfig(
plugin_id="langgenius/openai",
model_provider="openai",
model="gpt-test",
model_settings={"temperature": 0.2},
),
tools={"cli_tools": [{"name": "pytest"}]},
),
)
binding = WorkflowAgentNodeBinding(
id="binding-1",
tenant_id="tenant-1",
app_id="app-1",
workflow_id="workflow-1",
node_id="agent-node",
agent_id="agent-1",
current_snapshot_id="snapshot-1",
node_job_config=WorkflowNodeJobConfig.model_validate(
{
"declared_outputs": [
{"name": "report", "type": DeclaredOutputType.FILE},
{"name": "confidence", "type": DeclaredOutputType.NUMBER, "required": False},
],
}
),
)
dify_context = context.dify_context.model_copy(update={"invoke_from": InvokeFrom.SERVICE_API})
context = replace(context, dify_context=dify_context, workflow_run_id=None, snapshot=snapshot, binding=binding)
result = WorkflowAgentRuntimeRequestBuilder(credentials_provider=FakeCredentialsProvider()).build(context)
dumped = result.request.model_dump(mode="json")
assert dumped["execution_context"]["invoke_from"] == "workflow_run"
assert dumped["idempotency_key"] == "node-exec-1"
output_schema = dumped["composition"]["layers"][-1]["config"]["json_schema"]
assert output_schema["properties"]["report"]["properties"]["file_id"]["type"] == "string"
assert output_schema["properties"]["confidence"]["type"] == "number"
assert output_schema["required"] == ["report"]
assert dumped["composition"]["layers"][4]["config"]["model_settings"] == {"temperature": 0.2}
assert result.metadata["runtime_support"]["reserved_status"]["tools"] == "reserved_not_executed"
assert result.metadata["runtime_support"]["unsupported_runtime_warnings"][0]["section"] == "agent_soul.tools"
def test_requires_agent_soul_model_config():
context = _context()
snapshot = AgentConfigSnapshot(
id="snapshot-1",
tenant_id="tenant-1",
agent_id="agent-1",
version=1,
config_snapshot=AgentSoulConfig(),
)
context = replace(context, snapshot=snapshot)
with pytest.raises(WorkflowAgentRuntimeRequestBuildError, match="Agent Soul model"):
WorkflowAgentRuntimeRequestBuilder(credentials_provider=FakeCredentialsProvider()).build(context)
def test_missing_previous_node_output_fails_request_build():
context = _context()
binding = WorkflowAgentNodeBinding(
id="binding-1",
tenant_id="tenant-1",
app_id="app-1",
workflow_id="workflow-1",
node_id="agent-node",
agent_id="agent-1",
current_snapshot_id="snapshot-1",
node_job_config=WorkflowNodeJobConfig.model_validate(
{
"previous_node_output_refs": [{"node_id": "missing-node", "output": "text"}],
}
),
)
context = replace(context, binding=binding)
with pytest.raises(WorkflowAgentRuntimeRequestBuildError) as exc_info:
WorkflowAgentRuntimeRequestBuilder(credentials_provider=FakeCredentialsProvider()).build(context)
assert exc_info.value.error_code == "missing_previous_node_output"
def test_invalid_previous_node_output_ref_fails_request_build():
context = _context()
binding = WorkflowAgentNodeBinding(
id="binding-1",
tenant_id="tenant-1",
app_id="app-1",
workflow_id="workflow-1",
node_id="agent-node",
agent_id="agent-1",
current_snapshot_id="snapshot-1",
node_job_config=WorkflowNodeJobConfig.model_validate(
{
"previous_node_output_refs": [{"selector": ["previous-node", 1]}],
}
),
)
context = replace(context, binding=binding)
with pytest.raises(WorkflowAgentRuntimeRequestBuildError) as exc_info:
WorkflowAgentRuntimeRequestBuilder(credentials_provider=FakeCredentialsProvider()).build(context)
assert exc_info.value.error_code == "invalid_previous_node_output_ref"

View File

@ -0,0 +1,271 @@
import json
from types import SimpleNamespace
from unittest.mock import Mock
import pytest
from core.workflow.nodes.agent_v2.validators import (
WorkflowAgentNodeValidationError,
WorkflowAgentNodeValidator,
)
from models.agent import Agent, AgentConfigSnapshot, AgentStatus, WorkflowAgentNodeBinding
from models.agent_config_entities import AgentSoulConfig, AgentSoulModelConfig, WorkflowNodeJobConfig
from models.workflow import Workflow
def _workflow(graph: dict) -> Workflow:
return Workflow(
id="workflow-1",
tenant_id="tenant-1",
app_id="app-1",
graph=json.dumps(graph),
)
def _binding(node_job: WorkflowNodeJobConfig) -> WorkflowAgentNodeBinding:
return WorkflowAgentNodeBinding(
id="binding-1",
tenant_id="tenant-1",
app_id="app-1",
workflow_id="workflow-1",
node_id="agent-node",
agent_id="agent-1",
current_snapshot_id="snapshot-1",
node_job_config=node_job,
)
def _agent() -> Agent:
return Agent(id="agent-1", tenant_id="tenant-1", name="Agent", status=AgentStatus.ACTIVE)
def _snapshot() -> AgentConfigSnapshot:
return AgentConfigSnapshot(
id="snapshot-1",
tenant_id="tenant-1",
agent_id="agent-1",
version=1,
config_snapshot=AgentSoulConfig(
model=AgentSoulModelConfig(
plugin_id="langgenius/openai",
model_provider="openai",
model="gpt-test",
)
),
)
def _graph(edges: list[dict]) -> dict:
return {
"nodes": [
{"id": "start", "data": {"type": "start"}},
{"id": "previous-node", "data": {"type": "llm"}},
{"id": "agent-node", "data": {"type": "agent", "version": "2"}},
{"id": "later-node", "data": {"type": "llm"}},
],
"edges": edges,
}
def test_publish_validation_accepts_upstream_previous_output_ref():
node_job = WorkflowNodeJobConfig.model_validate(
{"previous_node_output_refs": [{"node_id": "previous-node", "output": "text"}]}
)
session = Mock()
session.scalar.side_effect = [_binding(node_job), _agent(), _snapshot()]
WorkflowAgentNodeValidator.validate_published_workflow(
session=session,
workflow=_workflow(
_graph(
[
{"source": "start", "target": "previous-node"},
{"source": "previous-node", "target": "agent-node"},
]
)
),
)
def test_publish_validation_rejects_non_upstream_previous_output_ref():
node_job = WorkflowNodeJobConfig.model_validate(
{"previous_node_output_refs": [{"node_id": "later-node", "output": "text"}]}
)
session = Mock()
session.scalar.side_effect = [_binding(node_job), _agent(), _snapshot()]
with pytest.raises(WorkflowAgentNodeValidationError, match="non-upstream"):
WorkflowAgentNodeValidator.validate_published_workflow(
session=session,
workflow=_workflow(
_graph(
[
{"source": "start", "target": "agent-node"},
{"source": "agent-node", "target": "later-node"},
]
)
),
)
def test_draft_validation_allows_unbound_agent_node():
session = Mock()
session.scalar.return_value = None
WorkflowAgentNodeValidator.validate_draft_workflow(
session=session,
workflow=_workflow(_graph([{"source": "start", "target": "agent-node"}])),
)
def test_publish_validation_requires_binding():
session = Mock()
session.scalar.return_value = None
with pytest.raises(WorkflowAgentNodeValidationError, match="requires a binding"):
WorkflowAgentNodeValidator.validate_published_workflow(
session=session,
workflow=_workflow(_graph([{"source": "start", "target": "agent-node"}])),
)
def test_publish_validation_rejects_duplicate_output_names():
node_job = WorkflowNodeJobConfig.model_validate(
{
"declared_outputs": [
{"name": "summary", "type": "string"},
{"name": "summary", "type": "number"},
]
}
)
session = Mock()
session.scalar.side_effect = [_binding(node_job), _agent(), _snapshot()]
with pytest.raises(WorkflowAgentNodeValidationError, match="duplicate output name"):
WorkflowAgentNodeValidator.validate_published_workflow(
session=session,
workflow=_workflow(_graph([{"source": "start", "target": "agent-node"}])),
)
def test_publish_validation_rejects_missing_agent_soul_model():
node_job = WorkflowNodeJobConfig.model_validate({})
snapshot = AgentConfigSnapshot(
id="snapshot-1",
tenant_id="tenant-1",
agent_id="agent-1",
version=1,
config_snapshot=AgentSoulConfig(),
)
session = Mock()
session.scalar.side_effect = [_binding(node_job), _agent(), snapshot]
with pytest.raises(WorkflowAgentNodeValidationError, match="requires Agent Soul model"):
WorkflowAgentNodeValidator.validate_published_workflow(
session=session,
workflow=_workflow(_graph([{"source": "start", "target": "agent-node"}])),
)
def test_publish_validation_rejects_missing_previous_node():
node_job = WorkflowNodeJobConfig.model_validate(
{"previous_node_output_refs": [{"node_id": "missing-node", "output": "text"}]}
)
session = Mock()
session.scalar.side_effect = [_binding(node_job), _agent(), _snapshot()]
with pytest.raises(WorkflowAgentNodeValidationError, match="references missing previous node"):
WorkflowAgentNodeValidator.validate_published_workflow(
session=session,
workflow=_workflow(_graph([{"source": "start", "target": "agent-node"}])),
)
def test_publish_validation_rejects_self_previous_output_ref():
node_job = WorkflowNodeJobConfig.model_validate(
{"previous_node_output_refs": [{"node_id": "agent-node", "output": "text"}]}
)
session = Mock()
session.scalar.side_effect = [_binding(node_job), _agent(), _snapshot()]
with pytest.raises(WorkflowAgentNodeValidationError, match="non-upstream"):
WorkflowAgentNodeValidator.validate_published_workflow(
session=session,
workflow=_workflow(_graph([{"source": "start", "target": "agent-node"}])),
)
def test_publish_validation_rejects_locked_agent_soul_override_in_metadata():
node_job = WorkflowNodeJobConfig.model_validate({"metadata": {"agent_soul": {"tools": []}}})
session = Mock()
session.scalar.side_effect = [_binding(node_job), _agent(), _snapshot()]
with pytest.raises(WorkflowAgentNodeValidationError, match="cannot override locked Agent Soul fields"):
WorkflowAgentNodeValidator.validate_published_workflow(
session=session,
workflow=_workflow(_graph([{"source": "start", "target": "agent-node"}])),
)
def test_publish_validation_rejects_invalid_human_contact_ref():
node_job = WorkflowNodeJobConfig.model_validate({"human_contacts": [{"channel": "slack"}]})
session = Mock()
session.scalar.side_effect = [_binding(node_job), _agent(), _snapshot()]
with pytest.raises(WorkflowAgentNodeValidationError, match="invalid human contact ref"):
WorkflowAgentNodeValidator.validate_published_workflow(
session=session,
workflow=_workflow(_graph([{"source": "start", "target": "agent-node"}])),
)
def test_publish_validation_rejects_out_of_scope_human_contact_ref():
node_job = WorkflowNodeJobConfig.model_validate(
{"human_contacts": [{"contact_id": "human-1", "tenant_id": "other-tenant", "channel": "slack"}]}
)
session = Mock()
session.scalar.side_effect = [_binding(node_job), _agent(), _snapshot()]
with pytest.raises(WorkflowAgentNodeValidationError, match="out-of-scope human contact"):
WorkflowAgentNodeValidator.validate_published_workflow(
session=session,
workflow=_workflow(_graph([{"source": "start", "target": "agent-node"}])),
)
def test_publish_validation_accepts_tenant_scoped_file_ref():
node_job = WorkflowNodeJobConfig.model_validate(
{
"declared_outputs": [
{
"name": "report",
"type": "file",
"checks": [{"type": "benchmark", "benchmark_file_ref": {"upload_file_id": "file-1"}}],
}
]
}
)
session = Mock()
session.scalar.side_effect = [
_binding(node_job),
_agent(),
_snapshot(),
SimpleNamespace(id="file-1", tenant_id="tenant-1"),
]
WorkflowAgentNodeValidator.validate_published_workflow(
session=session,
workflow=_workflow(_graph([{"source": "start", "target": "agent-node"}])),
)
def test_publish_validation_rejects_missing_file_ref():
node_job = WorkflowNodeJobConfig.model_validate({"metadata": {"file_refs": [{"upload_file_id": "missing-file"}]}})
session = Mock()
session.scalar.side_effect = [_binding(node_job), _agent(), _snapshot(), None]
with pytest.raises(WorkflowAgentNodeValidationError, match="missing or out-of-scope metadata file ref"):
WorkflowAgentNodeValidator.validate_published_workflow(
session=session,
workflow=_workflow(_graph([{"source": "start", "target": "agent-node"}])),
)

View File

@ -24,6 +24,7 @@ def test_moved_core_nodes_resolve_after_importing_production_entrypoints():
from core.workflow import workflow_entry
from core.workflow.nodes.knowledge_index import KNOWLEDGE_INDEX_NODE_TYPE
from core.workflow.node_factory import DifyNodeFactory, NODE_TYPE_CLASSES_MAPPING
from core.workflow.nodes.agent_v2 import DifyAgentNode
from graphon.enums import BuiltinNodeTypes
from services import workflow_service
from services.rag_pipeline import rag_pipeline
@ -40,6 +41,11 @@ def test_moved_core_nodes_resolve_after_importing_production_entrypoints():
assert node_type in NODE_TYPE_CLASSES_MAPPING, node_type
resolved = DifyNodeFactory._resolve_node_class(node_type=node_type, node_version="1")
assert resolved.__module__.startswith("core.workflow.nodes."), resolved.__module__
assert DifyNodeFactory._resolve_node_class(
node_type=BuiltinNodeTypes.AGENT,
node_version="2",
) is DifyAgentNode
"""
)
completed = subprocess.run(

View File

@ -1,6 +1,6 @@
import pytest
from models.agent_config_entities import AgentKnowledgeQueryMode, DeclaredOutputType
from models.agent_config_entities import AgentKnowledgeQueryMode, AgentSoulModelConfig, DeclaredOutputType
from services.agent.composer_service import AgentComposerService
from services.agent.composer_validator import ComposerConfigValidator
from services.agent.errors import AgentSoulLockedError, PlaintextSecretNotAllowedError
@ -88,6 +88,22 @@ def test_knowledge_query_mode_uses_stable_backend_enums():
assert config.knowledge.query_mode == AgentKnowledgeQueryMode.GENERATED_QUERY
def test_agent_soul_model_config_is_first_class_without_credentials():
config = AgentSoulConfig(
model=AgentSoulModelConfig(
plugin_id="langgenius/openai",
model_provider="openai",
model="gpt-test",
credential_ref={"type": "provider", "id": "credential-1"},
model_settings={"temperature": 0},
)
)
dumped = config.model_dump(mode="json")
assert dumped["model"]["plugin_id"] == "langgenius/openai"
assert dumped["model"]["credential_ref"] == {"type": "provider", "id": "credential-1", "provider": None}
def test_declared_outputs_support_file_check_and_failure_strategy():
node_job = WorkflowNodeJobConfig.model_validate(
{

View File

@ -34,6 +34,7 @@ export type AgentSoulConfig = {
misc_legacy?: {
[key: string]: unknown
}
model?: AgentSoulModelConfig
prompt?: AgentSoulPromptConfig
sandbox?: AgentSoulSandboxConfig
schema_version?: number
@ -86,6 +87,16 @@ export type AgentSoulMemoryConfig = {
scope?: string | null
}
export type AgentSoulModelConfig = {
credential_ref?: AgentSoulModelCredentialRef
model: string
model_provider: string
model_settings?: {
[key: string]: unknown
}
plugin_id: string
}
export type AgentSoulPromptConfig = {
system_prompt?: string
}
@ -117,6 +128,12 @@ export type AgentSoulToolsConfig = {
export type AgentKnowledgeQueryMode = 'generated_query' | 'user_query'
export type AgentSoulModelCredentialRef = {
id?: string | null
provider?: string | null
type: string
}
export type GetAgentsData = {
body?: never
path?: never

View File

@ -100,6 +100,30 @@ export const zAgentSoulKnowledgeConfig = z.object({
query_mode: zAgentKnowledgeQueryMode.optional(),
})
/**
* AgentSoulModelCredentialRef
*
* Reference to model credentials resolved only at runtime.
*/
export const zAgentSoulModelCredentialRef = z.object({
id: z.string().max(255).nullish(),
provider: z.string().max(255).nullish(),
type: z.string().min(1).max(64),
})
/**
* AgentSoulModelConfig
*
* Stable model selection for Agent runtime without storing secret values.
*/
export const zAgentSoulModelConfig = z.object({
credential_ref: zAgentSoulModelCredentialRef.optional(),
model: z.string().min(1).max(255),
model_provider: z.string().min(1).max(255),
model_settings: z.record(z.string(), z.unknown()).optional(),
plugin_id: z.string().min(1).max(255),
})
/**
* AgentSoulConfig
*/
@ -111,6 +135,7 @@ export const zAgentSoulConfig = z.object({
knowledge: zAgentSoulKnowledgeConfig.optional(),
memory: zAgentSoulMemoryConfig.optional(),
misc_legacy: z.record(z.string(), z.unknown()).optional(),
model: zAgentSoulModelConfig.optional(),
prompt: zAgentSoulPromptConfig.optional(),
sandbox: zAgentSoulSandboxConfig.optional(),
schema_version: z.int().optional().default(1),

View File

@ -973,6 +973,7 @@ export type AgentSoulConfig = {
misc_legacy?: {
[key: string]: unknown
}
model?: AgentSoulModelConfig
prompt?: AgentSoulPromptConfig
sandbox?: AgentSoulSandboxConfig
schema_version?: number
@ -1395,6 +1396,16 @@ export type AgentSoulMemoryConfig = {
scope?: string | null
}
export type AgentSoulModelConfig = {
credential_ref?: AgentSoulModelCredentialRef
model: string
model_provider: string
model_settings?: {
[key: string]: unknown
}
plugin_id: string
}
export type AgentSoulPromptConfig = {
system_prompt?: string
}
@ -1507,6 +1518,12 @@ export type WorkflowRunForArchivedLogResponse = {
export type AgentKnowledgeQueryMode = 'generated_query' | 'user_query'
export type AgentSoulModelCredentialRef = {
id?: string | null
provider?: string | null
type: string
}
export type DeclaredOutputCheckConfig = {
benchmark_file_ref?: {
[key: string]: unknown

View File

@ -1747,6 +1747,30 @@ export const zAgentSoulKnowledgeConfig = z.object({
query_mode: zAgentKnowledgeQueryMode.optional(),
})
/**
* AgentSoulModelCredentialRef
*
* Reference to model credentials resolved only at runtime.
*/
export const zAgentSoulModelCredentialRef = z.object({
id: z.string().max(255).nullish(),
provider: z.string().max(255).nullish(),
type: z.string().min(1).max(64),
})
/**
* AgentSoulModelConfig
*
* Stable model selection for Agent runtime without storing secret values.
*/
export const zAgentSoulModelConfig = z.object({
credential_ref: zAgentSoulModelCredentialRef.optional(),
model: z.string().min(1).max(255),
model_provider: z.string().min(1).max(255),
model_settings: z.record(z.string(), z.unknown()).optional(),
plugin_id: z.string().min(1).max(255),
})
/**
* AgentSoulConfig
*/
@ -1758,6 +1782,7 @@ export const zAgentSoulConfig = z.object({
knowledge: zAgentSoulKnowledgeConfig.optional(),
memory: zAgentSoulMemoryConfig.optional(),
misc_legacy: z.record(z.string(), z.unknown()).optional(),
model: zAgentSoulModelConfig.optional(),
prompt: zAgentSoulPromptConfig.optional(),
sandbox: zAgentSoulSandboxConfig.optional(),
schema_version: z.int().optional().default(1),