test(api): cover workflow typing paths

This commit is contained in:
Yanli 盐粒 2026-03-25 19:06:04 +08:00
parent 82acddddb4
commit 7b76fdc1d3
5 changed files with 431 additions and 0 deletions

View File

@@ -33,6 +33,79 @@ from dify_graph.system_variable import SystemVariable
class TestWorkflowBasedAppRunner:
def test_get_graph_items_rejects_non_mapping_entries(self):
    """Graph dicts whose node or edge lists contain non-mapping entries raise ValueError."""
    bad_nodes_graph = {"nodes": ["bad"], "edges": []}
    bad_edges_graph = {"nodes": [], "edges": ["bad"]}
    with pytest.raises(ValueError, match="nodes in workflow graph must be mappings"):
        WorkflowBasedAppRunner._get_graph_items(bad_nodes_graph)
    with pytest.raises(ValueError, match="edges in workflow graph must be mappings"):
        WorkflowBasedAppRunner._get_graph_items(bad_edges_graph)
def test_extract_start_node_id_handles_missing_and_invalid_values(self):
    """_extract_start_node_id yields None unless data.start_node_id is a string."""
    # None config, non-mapping data, and a non-string start_node_id all fail.
    rejected_configs = [
        None,
        {"data": "invalid"},
        {"data": {"start_node_id": 123}},
    ]
    for config in rejected_configs:
        assert WorkflowBasedAppRunner._extract_start_node_id(config) is None
    # A string start_node_id is passed through unchanged.
    accepted = {"data": {"start_node_id": "start-node"}}
    assert WorkflowBasedAppRunner._extract_start_node_id(accepted) == "start-node"
def test_build_single_node_graph_config_keeps_target_related_and_start_nodes(self):
    """Pruning the graph around one loop node keeps the start node, the target
    node, and its loop children; nodes outside the loop (and their edges) are dropped."""
    start_node = {"id": "start-node", "data": {"type": "start", "version": "1"}}
    loop_node = {
        "id": "loop-node",
        "data": {"type": "loop", "version": "1", "start_node_id": "start-node"},
    }
    loop_child = {
        "id": "loop-child",
        "data": {"type": "answer", "version": "1", "loop_id": "loop-node"},
    }
    outside_node = {"id": "outside-node", "data": {"type": "answer", "version": "1"}}

    graph_config, target_node_config = WorkflowBasedAppRunner._build_single_node_graph_config(
        graph_config={
            "nodes": [start_node, loop_node, loop_child, outside_node],
            "edges": [
                {"source": "start-node", "target": "loop-node"},
                {"source": "loop-node", "target": "loop-child"},
                {"source": "loop-node", "target": "outside-node"},
            ],
        },
        node_id="loop-node",
        node_type_filter_key="loop_id",
    )

    surviving_ids = [node["id"] for node in graph_config["nodes"]]
    assert surviving_ids == ["start-node", "loop-node", "loop-child"]
    # The edge into outside-node is pruned together with the node.
    assert graph_config["edges"] == [
        {"source": "start-node", "target": "loop-node"},
        {"source": "loop-node", "target": "loop-child"},
    ]
    assert target_node_config["id"] == "loop-node"
def test_build_agent_strategy_info_validates_payload(self):
    """A well-formed extras['agent_strategy'] payload is parsed into a strategy object.

    Fix: datetime.utcnow() is deprecated since Python 3.12 and emits a
    DeprecationWarning (fatal under -W error); use an aware UTC timestamp instead.
    """
    from datetime import timezone  # local import: the module import block is out of view

    event = NodeRunStartedEvent(
        id="exec",
        node_id="node",
        node_type=BuiltinNodeTypes.START,
        node_title="Start",
        start_at=datetime.now(timezone.utc),
        extras={"agent_strategy": {"name": "planner", "icon": "robot"}},
    )
    strategy = WorkflowBasedAppRunner._build_agent_strategy_info(event)
    assert strategy is not None
    assert strategy.name == "planner"
    assert strategy.icon == "robot"
def test_build_agent_strategy_info_returns_none_for_invalid_payload(self):
    """An agent_strategy payload with unexpected keys fails validation and yields None.

    Fix: datetime.utcnow() is deprecated since Python 3.12 and emits a
    DeprecationWarning (fatal under -W error); use an aware UTC timestamp instead.
    """
    from datetime import timezone  # local import: the module import block is out of view

    event = NodeRunStartedEvent(
        id="exec",
        node_id="node",
        node_type=BuiltinNodeTypes.START,
        node_title="Start",
        start_at=datetime.now(timezone.utc),
        extras={"agent_strategy": {"name": "planner", "extra": "ignored"}},
    )
    assert WorkflowBasedAppRunner._build_agent_strategy_info(event) is None
def test_resolve_user_from(self):
runner = WorkflowBasedAppRunner(queue_manager=SimpleNamespace(), app_id="app")
@@ -174,6 +247,34 @@ class TestWorkflowBasedAppRunner:
assert paused_event.paused_nodes == ["node-1"]
assert emails
def test_enqueue_human_input_notifications_skips_invalid_reasons_and_logs_failures(self, monkeypatch):
    """Entries without a usable form_id are skipped; a dispatch failure is logged
    via logger.exception instead of propagating."""
    runner = WorkflowBasedAppRunner(queue_manager=SimpleNamespace(), app_id="app")
    dispatched: list[tuple[dict[str, object], str]] = []
    logged: list[str] = []

    class _FailingDispatch:
        # Records the call, then simulates a broker failure.
        def apply_async(self, *, kwargs, queue):
            dispatched.append((kwargs, queue))
            raise RuntimeError("boom")

    fake_logger = SimpleNamespace(exception=lambda message, form_id: logged.append(f"{message}:{form_id}"))
    monkeypatch.setattr("core.app.apps.workflow_app_runner.dispatch_human_input_email_task", _FailingDispatch())
    monkeypatch.setattr("core.app.apps.workflow_app_runner.logger", fake_logger)

    notifications = [
        object(),  # not a HumanInputRequired at all
        HumanInputRequired(form_id="", form_content="content", node_id="node", node_title="Node"),
        HumanInputRequired(form_id="form-1", form_content="content", node_id="node", node_title="Node"),
    ]
    runner._enqueue_human_input_notifications(notifications)

    # Only the entry with a non-empty form_id reaches the mail queue.
    assert dispatched == [({"form_id": "form-1", "node_title": "Node"}, "mail")]
    assert logged == ["Failed to enqueue human input email task for form %s:form-1"]
def test_handle_node_events_publishes_queue_events(self):
published: list[object] = []

View File

@@ -0,0 +1,125 @@
from types import SimpleNamespace
import pytest
from core.tools.entities.tool_entities import ToolProviderType
from core.workflow.nodes.agent.exceptions import AgentVariableNotFoundError
from core.workflow.nodes.agent.runtime_support import AgentRuntimeSupport
def test_filter_mcp_type_tool_depends_on_strategy_meta_version() -> None:
    """MCP tools are stripped for strategy meta_version 0.0.1 and kept for 0.0.2."""
    support = AgentRuntimeSupport()
    builtin_tool = {"type": ToolProviderType.BUILT_IN, "tool_name": "search"}
    mcp_tool = {"type": ToolProviderType.MCP, "tool_name": "mcp-tool"}
    tools = [builtin_tool, mcp_tool]

    legacy_strategy = SimpleNamespace(meta_version="0.0.1")
    current_strategy = SimpleNamespace(meta_version="0.0.2")

    assert support._filter_mcp_type_tool(legacy_strategy, tools) == [
        {"type": ToolProviderType.BUILT_IN, "tool_name": "search"}
    ]
    assert support._filter_mcp_type_tool(current_strategy, tools) == tools
def test_normalize_tool_payloads_keeps_enabled_tools_and_resolves_values() -> None:
    """Disabled tools are dropped; variable and constant parameters plus settings
    are resolved, auto parameters become None, and extra keys like 'schemas' vanish."""
    support = AgentRuntimeSupport()
    # Any selector resolves to "resolved:<dotted selector>".
    variable_pool = SimpleNamespace(
        get=lambda selector: SimpleNamespace(value=f"resolved:{'.'.join(selector)}")
    )

    enabled_tool = {
        "enabled": True,
        "tool_name": "search",
        "schemas": {"ignored": True},  # expected to be stripped from the output
        "parameters": {
            "query": {
                "auto": 0,
                "value": {"type": "variable", "value": ["start", "query"]},
            },
            "top_k": {
                "auto": 0,
                "value": {"type": "constant", "value": 3},
            },
            "optional": {"auto": 1, "value": {"type": "constant", "value": "skip"}},
        },
        "settings": {
            "region": {"value": "us"},
            "safe": {"value": True},
        },
    }
    disabled_tool = {"enabled": False, "tool_name": "disabled"}

    normalized_tools = support._normalize_tool_payloads(
        strategy=SimpleNamespace(meta_version="0.0.2"),
        tools=[enabled_tool, disabled_tool],
        variable_pool=variable_pool,
    )

    assert normalized_tools == [
        {
            "enabled": True,
            "tool_name": "search",
            "parameters": {"query": "resolved:start.query", "top_k": 3, "optional": None},
            "settings": {"region": "us", "safe": True},
        }
    ]
def test_resolve_tool_parameters_raises_for_missing_variable() -> None:
    """A variable parameter whose selector resolves to nothing raises
    AgentVariableNotFoundError naming the missing selector."""
    support = AgentRuntimeSupport()
    empty_pool = SimpleNamespace(get=lambda _selector: None)
    tool_payload = {
        "parameters": {
            "query": {
                "auto": 0,
                "value": {"type": "variable", "value": ["start", "query"]},
            }
        }
    }
    with pytest.raises(AgentVariableNotFoundError, match=r"\['start', 'query'\]"):
        support._resolve_tool_parameters(tool=tool_payload, variable_pool=empty_pool)
def test_build_credentials_collects_valid_tool_credentials_only() -> None:
    """Only tools carrying both a credential_id and a complete identity contribute
    to tool_credentials; incomplete or malformed entries are ignored."""
    support = AgentRuntimeSupport()

    complete_tool = {
        "credential_id": "cred-1",
        "identity": {
            "author": "author",
            "name": "tool",
            "label": {"en_US": "Tool"},
            "provider": "provider-a",
        },
    }
    # Identity missing name/label/provider → skipped.
    partial_identity = {"credential_id": "cred-2", "identity": {"author": "author"}}
    # Credential id absent → skipped even with a full identity.
    missing_credential = {
        "credential_id": None,
        "identity": {
            "author": "author",
            "name": "tool",
            "label": {"en_US": "Tool"},
            "provider": "provider-b",
        },
    }

    credentials = support.build_credentials(
        parameters={"tools": [complete_tool, partial_identity, missing_credential, "invalid"]}
    )

    assert credentials.tool_credentials == {"provider-a": "cred-1"}
def test_coerce_named_json_objects_requires_string_keys_and_json_object_values() -> None:
    """Coercion succeeds only when every key is a string and every value is a mapping."""
    support = AgentRuntimeSupport()
    # Well-formed mapping round-trips unchanged.
    assert support._coerce_named_json_objects({"valid": {"value": 1}}) == {"valid": {"value": 1}}
    # A single non-string key rejects the whole mapping.
    assert support._coerce_named_json_objects({1: {"value": 1}}) is None
    # A single non-mapping value rejects the whole mapping.
    assert support._coerce_named_json_objects({"invalid": object()}) is None

View File

@@ -13,6 +13,7 @@ from core.model_manager import ModelInstance
from core.prompt.entities.advanced_prompt_entities import MemoryConfig
from dify_graph.entities import GraphInitParams
from dify_graph.file import File, FileTransferMethod, FileType
from dify_graph.model_runtime.entities import LLMMode
from dify_graph.model_runtime.entities.common_entities import I18nObject
from dify_graph.model_runtime.entities.message_entities import (
AssistantPromptMessage,
@@ -55,6 +56,70 @@ class MockTokenBufferMemory:
return self.history_messages
def test_llm_node_data_normalizes_optional_configs_and_legacy_structured_output() -> None:
    """None-valued optional configs receive populated defaults, and a legacy
    structured_output payload that includes a schema is preserved as-is."""
    payload = {
        "title": "Test LLM",
        "model": {"provider": "openai", "name": "gpt-4o-mini", "mode": LLMMode.CHAT, "completion_params": {}},
        "prompt_template": [],
        "prompt_config": None,
        "memory": None,
        "context": {"enabled": False},
        "vision": {"enabled": True, "configs": None},
        "structured_output": {
            "schema": {"type": "object"},
            "name": "Response",
            "description": "Structured",
        },
        "structured_output_enabled": True,
    }

    node_data = LLMNodeData.model_validate(payload)

    # None prompt_config / vision.configs are normalized to defaults.
    assert node_data.prompt_config.jinja2_variables == []
    assert node_data.vision.configs.variable_selector == ["sys", "files"]
    # The legacy payload survives validation untouched.
    assert node_data.structured_output == {
        "schema": {"type": "object"},
        "name": "Response",
        "description": "Structured",
    }
    assert node_data.structured_output_enabled is True
def test_llm_node_data_discards_legacy_structured_output_without_schema() -> None:
    """A legacy structured_output lacking a 'schema' key is dropped entirely,
    and the enabled flag is reset to False."""
    payload = {
        "title": "Test LLM",
        "model": {"provider": "openai", "name": "gpt-4o-mini", "mode": LLMMode.CHAT, "completion_params": {}},
        "prompt_template": [],
        "memory": None,
        "context": {"enabled": False},
        "vision": {"enabled": False},
        "structured_output": {"name": "Missing schema"},  # no 'schema' key
        "structured_output_enabled": True,
    }

    node_data = LLMNodeData.model_validate(payload)

    assert node_data.structured_output is None
    assert node_data.structured_output_enabled is False
def test_prompt_config_converts_none_jinja_variables() -> None:
    """An explicit prompt_config=None still yields an empty jinja2_variables list."""
    payload = {
        "title": "Test LLM",
        "model": {"provider": "openai", "name": "gpt-4o-mini", "mode": LLMMode.CHAT, "completion_params": {}},
        "prompt_template": [],
        "prompt_config": None,
        "memory": None,
        "context": {"enabled": False},
        "vision": {"enabled": False},
        "structured_output_enabled": False,
    }
    node_data = LLMNodeData.model_validate(payload)
    assert node_data.prompt_config.jinja2_variables == []
@pytest.fixture
def llm_node_data() -> LLMNodeData:
return LLMNodeData(

View File

@@ -1,4 +1,7 @@
from types import SimpleNamespace
import pytest
from pydantic import ValidationError
from dify_graph.entities.graph_config import NodeConfigDictAdapter
from dify_graph.nodes.loop.entities import LoopNodeData, LoopValue
@@ -71,3 +74,86 @@ def test_get_segment_for_constant_accepts_native_array_values(
assert segment.value_type == var_type
assert segment.value == expected_value
def test_loop_variable_data_validates_variable_selector_and_constant_value() -> None:
    """Variable-typed loop inputs expose their selector and constant-typed inputs
    their literal value through the require_* accessors."""
    variable_entry = {
        "label": "question",
        "var_type": SegmentType.STRING,
        "value_type": "variable",
        "value": ["start", "question"],
    }
    constant_entry = {
        "label": "payload",
        "var_type": SegmentType.OBJECT,
        "value_type": "constant",
        "value": {"count": 1, "items": ["a", 2]},
    }

    node_data = LoopNodeData(
        title="Loop",
        loop_count=1,
        break_conditions=[],
        logical_operator="and",
        loop_variables=[variable_entry, constant_entry],
    )

    variable_input, constant_input = node_data.loop_variables
    assert variable_input.require_variable_selector() == ["start", "question"]
    assert constant_input.require_constant_value() == {"count": 1, "items": ["a", 2]}
def test_loop_variable_data_rejects_missing_variable_selector() -> None:
    """A variable-typed loop input whose value is None fails model validation."""
    selectorless_entry = {
        "label": "question",
        "var_type": SegmentType.STRING,
        "value_type": "variable",
        "value": None,
    }
    with pytest.raises(ValidationError, match="Variable loop inputs require a selector"):
        LoopNodeData(
            title="Loop",
            loop_count=1,
            break_conditions=[],
            logical_operator="and",
            loop_variables=[selectorless_entry],
        )
def test_loop_node_data_outputs_default_to_empty_mapping_for_none() -> None:
    """Passing outputs=None is normalized to an empty mapping by validation."""
    base_fields = {
        "title": "Loop",
        "loop_count": 1,
        "break_conditions": [],
        "logical_operator": "and",
    }
    node_data = LoopNodeData(outputs=None, **base_fields)
    assert node_data.outputs == {}
def test_append_loop_info_to_event_preserves_existing_loop_metadata() -> None:
    """An event that already carries loop_id metadata keeps it; only in_loop_id
    is stamped with the loop node's id."""
    # Bypass __init__: the method under test only reads _node_id.
    loop_node = object.__new__(LoopNode)
    loop_node._node_id = "loop-node"

    event = SimpleNamespace(
        node_run_result=SimpleNamespace(metadata={"loop_id": "existing-loop", "other": "value"}),
        in_loop_id=None,
    )
    loop_node._append_loop_info_to_event(event=event, loop_run_index=2)

    assert event.in_loop_id == "loop-node"
    # Pre-existing loop metadata is not overwritten.
    assert event.node_run_result.metadata == {"loop_id": "existing-loop", "other": "value"}
def test_clear_loop_subgraph_variables_removes_each_loop_node() -> None:
    """Every node id in the loop subgraph triggers exactly one variable-pool removal."""
    loop_node = object.__new__(LoopNode)  # bypass __init__; only graph_runtime_state is read
    removed_selectors: list[list[str]] = []
    loop_node.graph_runtime_state = SimpleNamespace(
        variable_pool=SimpleNamespace(remove=removed_selectors.append)
    )

    loop_node._clear_loop_subgraph_variables({"child-a", "child-b"})

    # Set iteration order is unstable, so compare sorted.
    assert sorted(removed_selectors) == [["child-a"], ["child-b"]]

View File

@@ -13,6 +13,8 @@ from core.tools.utils.message_transformer import ToolFileMessageTransformer
from dify_graph.file import File, FileTransferMethod, FileType
from dify_graph.model_runtime.entities.llm_entities import LLMUsage
from dify_graph.node_events import StreamChunkEvent, StreamCompletedEvent
from dify_graph.nodes.tool.entities import ToolEntity as WorkflowToolEntity
from dify_graph.nodes.tool.entities import ToolNodeData
from dify_graph.runtime import GraphRuntimeState, VariablePool
from dify_graph.system_variable import SystemVariable
from dify_graph.variables.segments import ArrayFileSegment
@@ -167,3 +169,55 @@ def test_plain_link_messages_remain_links(tool_node: ToolNode):
files_segment = completed_events[0].node_run_result.outputs["files"]
assert isinstance(files_segment, ArrayFileSegment)
assert files_segment.value == []
def test_workflow_tool_entity_accepts_primitives_and_tool_input_payloads() -> None:
    """tool_configurations accepts raw primitives alongside typed mixed/variable payloads."""
    configurations = {
        "timeout": 30,
        "query": {"type": "mixed", "value": "hello {{name}}"},
        "selector": {"type": "variable", "value": ["start", "question"]},
    }
    entity = WorkflowToolEntity(
        provider_id="provider",
        provider_type="builtin",
        provider_name="provider",
        tool_name="search",
        tool_label="Search",
        tool_configurations=configurations,
    )
    # Validation keeps every entry untouched.
    assert entity.tool_configurations == {
        "timeout": 30,
        "query": {"type": "mixed", "value": "hello {{name}}"},
        "selector": {"type": "variable", "value": ["start", "question"]},
    }
def test_workflow_tool_entity_rejects_invalid_configuration_entries() -> None:
    """A configuration value that is not a primitive (a list of objects) raises TypeError."""
    invalid_configurations = {"bad": [object()]}
    with pytest.raises(TypeError, match="Tool configuration values must be primitives"):
        WorkflowToolEntity(
            provider_id="provider",
            provider_type="builtin",
            provider_name="provider",
            tool_name="search",
            tool_label="Search",
            tool_configurations=invalid_configurations,
        )
def test_tool_node_data_filters_missing_tool_parameter_values() -> None:
    """Parameters that are None, or constants whose value is None, are dropped
    during validation; only usable parameters survive."""
    parameters = {
        "query": {"type": "mixed", "value": "hello"},
        "skip_none": None,
        "skip_empty": {"type": "constant", "value": None},
    }
    node_data = ToolNodeData(
        title="Tool",
        provider_id="provider",
        provider_type="builtin",
        provider_name="provider",
        tool_name="search",
        tool_label="Search",
        tool_configurations={},
        tool_parameters=parameters,
    )
    surviving_keys = set(node_data.tool_parameters)
    assert surviving_keys == {"query"}