test(test_workflow_service): Use new engine's method.

Signed-off-by: -LAN- <laipz8200@outlook.com>
This commit is contained in:
-LAN- 2025-08-29 22:06:10 +08:00
parent 8eb78c04b2
commit 3aa48efd0a
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF
1 changed files with 44 additions and 36 deletions

View File

@ -1421,16 +1421,16 @@ class TestWorkflowService:
# Mock successful node execution
def mock_successful_invoke():
from core.workflow.entities.node_entities import NodeRunResult
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from core.workflow.nodes.base.node import BaseNode
from core.workflow.nodes.event import RunCompletedEvent
from core.workflow.enums import WorkflowNodeExecutionStatus
from core.workflow.graph_events import NodeRunSucceededEvent
from core.workflow.node_events import NodeRunResult
from core.workflow.nodes.base.node import Node
# Create mock node
mock_node = MagicMock(spec=BaseNode)
mock_node.type_ = "start" # Use valid NodeType
mock_node = MagicMock(spec=Node)
mock_node.node_type = "start" # Use valid NodeType
mock_node.title = "Test Node"
mock_node.continue_on_error = False
mock_node.error_strategy = None
# Create mock result with valid metadata
mock_result = NodeRunResult(
@ -1442,14 +1442,18 @@ class TestWorkflowService:
)
# Create mock event
mock_event = RunCompletedEvent(run_result=mock_result)
mock_event = NodeRunSucceededEvent(node_run_result=mock_result)
return mock_node, [mock_event]
# Return node and generator
def event_generator():
yield mock_event
return mock_node, event_generator()
workflow_service = WorkflowService()
# Act
result = workflow_service._handle_node_run_result(
result = workflow_service._handle_single_step_result(
invoke_node_fn=mock_successful_invoke, start_at=start_at, node_id=node_id
)
@ -1459,7 +1463,7 @@ class TestWorkflowService:
assert result.node_type == "start" # Should match the mock node type
assert result.title == "Test Node"
# Import the enum for comparison
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from core.workflow.enums import WorkflowNodeExecutionStatus
assert result.status == WorkflowNodeExecutionStatus.SUCCEEDED
assert result.inputs is not None
@ -1481,34 +1485,37 @@ class TestWorkflowService:
# Mock failed node execution
def mock_failed_invoke():
from core.workflow.entities.node_entities import NodeRunResult
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from core.workflow.nodes.base.node import BaseNode
from core.workflow.nodes.event import RunCompletedEvent
from core.workflow.enums import WorkflowNodeExecutionStatus
from core.workflow.graph_events import NodeRunFailedEvent
from core.workflow.node_events import NodeRunResult
from core.workflow.nodes.base.node import Node
# Create mock node
mock_node = MagicMock(spec=BaseNode)
mock_node.type_ = "llm" # Use valid NodeType
mock_node = MagicMock(spec=Node)
mock_node.node_type = "llm" # Use valid NodeType
mock_node.title = "Test Node"
mock_node.continue_on_error = False
mock_node.error_strategy = None
# Create mock failed result
mock_result = NodeRunResult(
status=WorkflowNodeExecutionStatus.FAILED,
inputs={"input1": "value1"},
error="Test error message",
error_type="TestError",
)
# Create mock event
mock_event = RunCompletedEvent(run_result=mock_result)
mock_event = NodeRunFailedEvent(node_run_result=mock_result)
return mock_node, [mock_event]
# Return node and generator
def event_generator():
yield mock_event
return mock_node, event_generator()
workflow_service = WorkflowService()
# Act
result = workflow_service._handle_node_run_result(
result = workflow_service._handle_single_step_result(
invoke_node_fn=mock_failed_invoke, start_at=start_at, node_id=node_id
)
@ -1516,7 +1523,7 @@ class TestWorkflowService:
assert result is not None
assert result.node_id == node_id
# Import the enum for comparison
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from core.workflow.enums import WorkflowNodeExecutionStatus
assert result.status == WorkflowNodeExecutionStatus.FAILED
assert result.error is not None
@ -1537,17 +1544,15 @@ class TestWorkflowService:
# Mock node execution with continue_on_error
def mock_continue_on_error_invoke():
from core.workflow.entities.node_entities import NodeRunResult
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from core.workflow.nodes.base.node import BaseNode
from core.workflow.nodes.enums import ErrorStrategy
from core.workflow.nodes.event import RunCompletedEvent
from core.workflow.enums import ErrorStrategy, WorkflowNodeExecutionStatus
from core.workflow.graph_events import NodeRunFailedEvent
from core.workflow.node_events import NodeRunResult
from core.workflow.nodes.base.node import Node
# Create mock node with continue_on_error
mock_node = MagicMock(spec=BaseNode)
mock_node.type_ = "tool" # Use valid NodeType
mock_node = MagicMock(spec=Node)
mock_node.node_type = "tool" # Use valid NodeType
mock_node.title = "Test Node"
mock_node.continue_on_error = True
mock_node.error_strategy = ErrorStrategy.DEFAULT_VALUE
mock_node.default_value_dict = {"default_output": "default_value"}
@ -1556,18 +1561,21 @@ class TestWorkflowService:
status=WorkflowNodeExecutionStatus.FAILED,
inputs={"input1": "value1"},
error="Test error message",
error_type="TestError",
)
# Create mock event
mock_event = RunCompletedEvent(run_result=mock_result)
mock_event = NodeRunFailedEvent(node_run_result=mock_result)
return mock_node, [mock_event]
# Return node and generator
def event_generator():
yield mock_event
return mock_node, event_generator()
workflow_service = WorkflowService()
# Act
result = workflow_service._handle_node_run_result(
result = workflow_service._handle_single_step_result(
invoke_node_fn=mock_continue_on_error_invoke, start_at=start_at, node_id=node_id
)
@ -1575,7 +1583,7 @@ class TestWorkflowService:
assert result is not None
assert result.node_id == node_id
# Import the enum for comparison
from core.workflow.entities.workflow_node_execution import WorkflowNodeExecutionStatus
from core.workflow.enums import WorkflowNodeExecutionStatus
assert result.status == WorkflowNodeExecutionStatus.EXCEPTION # Should be EXCEPTION, not FAILED
assert result.outputs is not None