fix(workflow): use assertions for runtime state invariants

This commit is contained in:
-LAN- 2026-04-14 14:39:53 +08:00
parent b8089c1d66
commit e58a0e818e
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF
4 changed files with 9 additions and 9 deletions

View File

@ -186,10 +186,10 @@ class WorkflowBasedAppRunner:
A tuple containing (graph, variable_pool, graph_runtime_state)
Raises:
ValueError: If neither single_iteration_run nor single_loop_run is specified
AssertionError: If neither single_iteration_run nor single_loop_run is specified
"""
if single_iteration_run is None and single_loop_run is None:
raise ValueError("Neither single_iteration_run nor single_loop_run is specified")
raise AssertionError("Neither single_iteration_run nor single_loop_run is specified")
# Create initial runtime state with variable pool containing environment variables
variable_pool = VariablePool()

View File

@ -23,7 +23,7 @@ def _require_workflow_id(workflow_id: str) -> str:
"""Validate that workflow-scoped runtime collaborators receive a real id."""
if not workflow_id:
raise ValueError("workflow_id must be a non-empty string")
raise AssertionError("workflow_id must be a non-empty string")
return workflow_id
@ -78,7 +78,7 @@ def ensure_graph_runtime_state_initialized(
elif not graph_execution.workflow_id:
graph_execution.workflow_id = workflow_id
elif graph_execution.workflow_id != workflow_id:
raise ValueError("GraphRuntimeState workflow_id does not match graph execution workflow_id")
raise AssertionError("GraphRuntimeState workflow_id does not match graph execution workflow_id")
return graph_runtime_state
@ -100,7 +100,7 @@ def bind_graph_runtime_state_to_graph(
attached_graph = state._graph
if attached_graph is not None and attached_graph is not graph:
raise ValueError("GraphRuntimeState already attached to a different graph instance")
raise AssertionError("GraphRuntimeState already attached to a different graph instance")
if state._response_coordinator is None:
response_coordinator = ResponseStreamCoordinator(

View File

@ -87,7 +87,7 @@ class TestWorkflowBasedAppRunner:
workflow = SimpleNamespace(environment_variables=[], graph_dict={})
with pytest.raises(ValueError, match="Neither single_iteration_run nor single_loop_run"):
with pytest.raises(AssertionError, match="Neither single_iteration_run nor single_loop_run"):
runner._prepare_single_node_execution(workflow, None, None, user_id="00000000-0000-0000-0000-000000000001")
def test_get_graph_and_variable_pool_for_single_node_run(self, monkeypatch):

View File

@ -73,7 +73,7 @@ class TestCreateGraphRuntimeState:
assert runtime_state.node_run_steps == 4
def test_rejects_blank_workflow_id(self) -> None:
with pytest.raises(ValueError, match="workflow_id must be a non-empty string"):
with pytest.raises(AssertionError, match="workflow_id must be a non-empty string"):
create_graph_runtime_state(
variable_pool=_build_variable_pool(),
start_at=0.0,
@ -138,7 +138,7 @@ class TestEnsureGraphRuntimeStateInitialized:
graph_execution=GraphExecution(workflow_id="other-workflow"),
)
with pytest.raises(ValueError, match="workflow_id does not match graph execution workflow_id"):
with pytest.raises(AssertionError, match="workflow_id does not match graph execution workflow_id"):
ensure_graph_runtime_state_initialized(
runtime_state,
workflow_id="workflow-id",
@ -208,7 +208,7 @@ class TestBindGraphRuntimeStateToGraph:
workflow_id="workflow-id",
)
with pytest.raises(ValueError, match="already attached to a different graph instance"):
with pytest.raises(AssertionError, match="already attached to a different graph instance"):
bind_graph_runtime_state_to_graph(
runtime_state,
second_graph,