filter virtual start node event for snippet draft run.

This commit is contained in:
FFXN 2026-04-24 14:41:33 +08:00
parent 19037268cd
commit e9cc078fb5
2 changed files with 122 additions and 9 deletions

View File

@ -80,6 +80,50 @@ class SnippetGenerateService:
# Specific ID for the injected virtual Start node so it can be recognised
_VIRTUAL_START_NODE_ID = "__snippet_virtual_start__"
@classmethod
def _is_virtual_start_event(cls, message: Mapping[str, Any] | str) -> bool:
"""
Return True when *message* is a snippet-only virtual Start node event.
The virtual Start node is injected purely for snippet execution and is
not part of the persisted draft graph. Filter its node lifecycle events
out of the SSE stream so the frontend only receives nodes that exist on
the canvas.
"""
if not isinstance(message, Mapping):
return False
if message.get("event") not in {"node_started", "node_finished"}:
return False
data = message.get("data")
if not isinstance(data, Mapping):
return False
return data.get("node_id") == cls._VIRTUAL_START_NODE_ID
@classmethod
def _filter_virtual_start_events(
cls,
response: Mapping[str, Any] | Generator[Mapping[str, Any] | str, None, None],
) -> Mapping[str, Any] | Generator[Mapping[str, Any] | str, None, None]:
"""
Drop snippet virtual Start node lifecycle events from stream responses.
Blocking responses are returned unchanged because they never expose the
injected node as a standalone event payload.
"""
if isinstance(response, Mapping):
return response
def _stream() -> Generator[Mapping[str, Any] | str, None, None]:
for message in response:
if cls._is_virtual_start_event(message):
continue
yield message
return _stream()
@classmethod
def generate(
cls,
@ -119,16 +163,18 @@ class SnippetGenerateService:
# Adapt snippet to App-like interface for WorkflowAppGenerator
app_proxy = _SnippetAsApp(snippet)
response = WorkflowAppGenerator().generate(
app_model=app_proxy, # type: ignore[arg-type]
workflow=workflow,
user=user,
args=args,
invoke_from=invoke_from,
streaming=streaming,
call_depth=0,
)
return WorkflowAppGenerator.convert_to_event_stream(
WorkflowAppGenerator().generate(
app_model=app_proxy, # type: ignore[arg-type]
workflow=workflow,
user=user,
args=args,
invoke_from=invoke_from,
streaming=streaming,
call_depth=0,
)
cls._filter_virtual_start_events(response)
)
@classmethod

View File

@ -0,0 +1,67 @@
from types import SimpleNamespace
from core.app.entities.app_invoke_entities import InvokeFrom
from services.snippet_generate_service import SnippetGenerateService
def test_generate_filters_virtual_start_events(monkeypatch) -> None:
snippet = SimpleNamespace(id="snippet-id", tenant_id="tenant-id")
user = SimpleNamespace(id="user-id")
workflow = SimpleNamespace(id="workflow-id")
monkeypatch.setattr(
"services.snippet_generate_service.SnippetService",
lambda: SimpleNamespace(get_draft_workflow=lambda *, snippet: workflow),
)
monkeypatch.setattr(
SnippetGenerateService,
"_ensure_start_node",
classmethod(lambda cls, workflow, snippet: workflow),
)
stream_messages = iter(
[
{"event": "workflow_started", "data": {"id": "run-1"}},
{
"event": "node_started",
"data": {"node_id": SnippetGenerateService._VIRTUAL_START_NODE_ID, "title": "Start"},
},
{
"event": "node_finished",
"data": {"node_id": SnippetGenerateService._VIRTUAL_START_NODE_ID, "status": "succeeded"},
},
{"event": "node_started", "data": {"node_id": "code-node", "title": "Code"}},
{"event": "node_finished", "data": {"node_id": "code-node", "status": "succeeded"}},
{"event": "workflow_finished", "data": {"id": "run-1", "status": "succeeded"}},
]
)
monkeypatch.setattr(
"services.snippet_generate_service.WorkflowAppGenerator.generate",
lambda self, **kwargs: stream_messages,
)
monkeypatch.setattr(
"services.snippet_generate_service.WorkflowAppGenerator.convert_to_event_stream",
lambda generator: list(generator),
)
response = SnippetGenerateService.generate(
snippet=snippet,
user=user,
args={"inputs": {}},
invoke_from=InvokeFrom.DEBUGGER,
streaming=True,
)
assert response == [
{"event": "workflow_started", "data": {"id": "run-1"}},
{"event": "node_started", "data": {"node_id": "code-node", "title": "Code"}},
{"event": "node_finished", "data": {"node_id": "code-node", "status": "succeeded"}},
{"event": "workflow_finished", "data": {"id": "run-1", "status": "succeeded"}},
]
def test_filter_virtual_start_events_returns_blocking_response_unchanged() -> None:
blocking_response = {"task_id": "task-1", "data": {"status": "succeeded"}}
assert SnippetGenerateService._filter_virtual_start_events(blocking_response) is blocking_response