fix(api): abort quota-tracked nodes on missing model identity

Treat missing public model identity on quota-tracked workflow nodes as an execution bug instead of silently skipping quota handling.

Abort the workflow for both pre-run and post-run identity lookup failures and extend the layer tests to assert the stop-event and abort-command behavior.
This commit is contained in:
-LAN- 2026-04-22 14:43:26 +08:00
parent ebb472c5c2
commit 2ced9316b8
2 changed files with 36 additions and 1 deletions

View File

@ -6,7 +6,9 @@ This layer centralizes model-quota handling outside node implementations.
Graphon LLM-backed nodes expose provider/model identity through public node
configuration and, after execution, through ``node_run_result.inputs``. Resolve
quota billing from that public identity instead of depending on
``ModelInstance`` reconstruction inside the workflow layer.
``ModelInstance`` reconstruction inside the workflow layer. Missing identity on
quota-tracked nodes is treated as a workflow bug and aborts execution so quota
handling is never silently skipped.
"""
import logging
@ -64,6 +66,10 @@ class LLMQuotaLayer(GraphEngineLayer):
model_identity = self._extract_model_identity_from_node(node)
if model_identity is None:
self._abort_for_missing_model_identity(
node=node,
reason="LLM quota check requires public node model identity before execution.",
)
return
provider, model_name = model_identity
@ -87,6 +93,10 @@ class LLMQuotaLayer(GraphEngineLayer):
model_identity = self._extract_model_identity_from_result_event(result_event)
if model_identity is None:
self._abort_for_missing_model_identity(
node=node,
reason="LLM quota deduction requires model identity in the node result event.",
)
return
provider, model_name = model_identity
@ -111,6 +121,11 @@ class LLMQuotaLayer(GraphEngineLayer):
if stop_event is not None:
stop_event.set()
def _abort_for_missing_model_identity(self, *, node: Node, reason: str) -> None:
    """Halt the workflow because a quota-tracked node lacks model identity.

    Missing identity is treated as an execution bug rather than a
    condition to skip: the node's stop event is signalled, an abort
    command is issued, and the failure is logged.

    :param node: the quota-tracked node whose identity lookup failed
    :param reason: human-readable abort reason forwarded to the engine
    """
    # Stop the running graph first so no further node work is scheduled.
    self._set_stop_event(node)
    # Then request engine abort; _send_abort_command guards against
    # missing channels and duplicate sends.
    self._send_abort_command(reason=reason)
    # Lazy %-style args keep log construction cheap when the level is off.
    logger.error("LLM quota handling aborted, node_id=%s, reason=%s", node.id, reason)
def _send_abort_command(self, *, reason: str) -> None:
if not self.command_channel or self._abort_sent:
return

View File

@ -170,22 +170,42 @@ def test_quota_precheck_passes_without_abort() -> None:
def test_precheck_requires_public_node_model_config() -> None:
    """Pre-run hook must abort when the node has no public model config.

    The quota availability check is never invoked; instead the layer sets
    the node's stop event and sends exactly one ABORT command whose
    reason names the missing pre-execution identity.
    """
    quota_layer = LLMQuotaLayer(tenant_id="tenant-id")
    quota_layer.command_channel = MagicMock()
    halt_signal = threading.Event()

    # Quota-tracked node whose node_data exposes no model identity fields.
    llm_node = _build_node(node_type=BuiltinNodeTypes.LLM)
    llm_node.node_data = SimpleNamespace()
    llm_node.graph_runtime_state = MagicMock()
    llm_node.graph_runtime_state.stop_event = halt_signal

    with patch(
        "core.app.workflow.layers.llm_quota.ensure_llm_quota_available_for_model",
        autospec=True,
    ) as quota_check:
        quota_layer.on_node_run_start(llm_node)

    assert halt_signal.is_set()
    quota_check.assert_not_called()
    quota_layer.command_channel.send_command.assert_called_once()
    sent_command = quota_layer.command_channel.send_command.call_args.args[0]
    assert sent_command.command_type == CommandType.ABORT
    assert sent_command.reason == "LLM quota check requires public node model identity before execution."
def test_deduction_requires_public_event_model_identity() -> None:
    """Post-run hook must abort when the result event lacks model identity.

    With only non-identity keys in the event inputs, quota deduction is
    never invoked; the layer sets the node's stop event and sends exactly
    one ABORT command carrying the deduction-specific reason.
    """
    quota_layer = LLMQuotaLayer(tenant_id="tenant-id")
    quota_layer.command_channel = MagicMock()
    halt_signal = threading.Event()

    llm_node = _build_node(node_type=BuiltinNodeTypes.LLM)
    llm_node.graph_runtime_state = MagicMock()
    llm_node.graph_runtime_state.stop_event = halt_signal

    # Succeeded event whose inputs carry no provider/model identity keys.
    finished_event = _build_succeeded_event()
    finished_event.node_run_result.inputs = {"question": "hello"}

    with patch(
        "core.app.workflow.layers.llm_quota.deduct_llm_quota_for_model",
        autospec=True,
    ) as quota_deduct:
        quota_layer.on_node_run_end(node=llm_node, error=None, result_event=finished_event)

    assert halt_signal.is_set()
    quota_deduct.assert_not_called()
    quota_layer.command_channel.send_command.assert_called_once()
    sent_command = quota_layer.command_channel.send_command.call_args.args[0]
    assert sent_command.command_type == CommandType.ABORT
    assert sent_command.reason == "LLM quota deduction requires model identity in the node result event."