diff --git a/api/services/snippet_generate_service.py b/api/services/snippet_generate_service.py index 5e0d25c8f7..bbd183f586 100644 --- a/api/services/snippet_generate_service.py +++ b/api/services/snippet_generate_service.py @@ -80,6 +80,50 @@ class SnippetGenerateService: # Specific ID for the injected virtual Start node so it can be recognised _VIRTUAL_START_NODE_ID = "__snippet_virtual_start__" + @classmethod + def _is_virtual_start_event(cls, message: Mapping[str, Any] | str) -> bool: + """ + Return True when *message* is a snippet-only virtual Start node event. + + The virtual Start node is injected purely for snippet execution and is + not part of the persisted draft graph. Filter its node lifecycle events + out of the SSE stream so the frontend only receives nodes that exist on + the canvas. + """ + if not isinstance(message, Mapping): + return False + + if message.get("event") not in {"node_started", "node_finished"}: + return False + + data = message.get("data") + if not isinstance(data, Mapping): + return False + + return data.get("node_id") == cls._VIRTUAL_START_NODE_ID + + @classmethod + def _filter_virtual_start_events( + cls, + response: Mapping[str, Any] | Generator[Mapping[str, Any] | str, None, None], + ) -> Mapping[str, Any] | Generator[Mapping[str, Any] | str, None, None]: + """ + Drop snippet virtual Start node lifecycle events from stream responses. + + Blocking responses are returned unchanged because they never expose the + injected node as a standalone event payload. + """ + if isinstance(response, Mapping): + return response + + def _stream() -> Generator[Mapping[str, Any] | str, None, None]: + for message in response: + if cls._is_virtual_start_event(message): + continue + yield message + + return _stream() + @classmethod def generate( cls, @@ -119,16 +163,18 @@ class SnippetGenerateService: # Adapt snippet to App-like interface for WorkflowAppGenerator app_proxy = _SnippetAsApp(snippet) + response = WorkflowAppGenerator().generate( + app_model=app_proxy, # type: ignore[arg-type] + workflow=workflow, + user=user, + args=args, + invoke_from=invoke_from, + streaming=streaming, + call_depth=0, + ) + return WorkflowAppGenerator.convert_to_event_stream( - WorkflowAppGenerator().generate( - app_model=app_proxy, # type: ignore[arg-type] - workflow=workflow, - user=user, - args=args, - invoke_from=invoke_from, - streaming=streaming, - call_depth=0, - ) + cls._filter_virtual_start_events(response) ) @classmethod diff --git a/api/tests/unit_tests/services/test_snippet_generate_service.py b/api/tests/unit_tests/services/test_snippet_generate_service.py new file mode 100644 index 0000000000..a88d9385c8 --- /dev/null +++ b/api/tests/unit_tests/services/test_snippet_generate_service.py @@ -0,0 +1,67 @@ +from types import SimpleNamespace + +from core.app.entities.app_invoke_entities import InvokeFrom +from services.snippet_generate_service import SnippetGenerateService + + +def test_generate_filters_virtual_start_events(monkeypatch) -> None: + snippet = SimpleNamespace(id="snippet-id", tenant_id="tenant-id") + user = SimpleNamespace(id="user-id") + workflow = SimpleNamespace(id="workflow-id") + + monkeypatch.setattr( + "services.snippet_generate_service.SnippetService", + lambda: SimpleNamespace(get_draft_workflow=lambda *, snippet: workflow), + ) + monkeypatch.setattr( + SnippetGenerateService, + "_ensure_start_node", + classmethod(lambda cls, workflow, snippet: workflow), + ) + + stream_messages = iter( + [ + {"event": "workflow_started", "data": {"id": "run-1"}}, + { + "event": "node_started", + "data": {"node_id": SnippetGenerateService._VIRTUAL_START_NODE_ID, "title": "Start"}, + }, + { + "event": "node_finished", + "data": {"node_id": SnippetGenerateService._VIRTUAL_START_NODE_ID, "status": "succeeded"}, + }, + {"event": "node_started", "data": {"node_id": "code-node", "title": "Code"}}, + {"event": "node_finished", "data": {"node_id": "code-node", "status": "succeeded"}}, + {"event": "workflow_finished", "data": {"id": "run-1", "status": "succeeded"}}, + ] + ) + + monkeypatch.setattr( + "services.snippet_generate_service.WorkflowAppGenerator.generate", + lambda self, **kwargs: stream_messages, + ) + monkeypatch.setattr( + "services.snippet_generate_service.WorkflowAppGenerator.convert_to_event_stream", + lambda generator: list(generator), + ) + + response = SnippetGenerateService.generate( + snippet=snippet, + user=user, + args={"inputs": {}}, + invoke_from=InvokeFrom.DEBUGGER, + streaming=True, + ) + + assert response == [ + {"event": "workflow_started", "data": {"id": "run-1"}}, + {"event": "node_started", "data": {"node_id": "code-node", "title": "Code"}}, + {"event": "node_finished", "data": {"node_id": "code-node", "status": "succeeded"}}, + {"event": "workflow_finished", "data": {"id": "run-1", "status": "succeeded"}}, + ] + + +def test_filter_virtual_start_events_returns_blocking_response_unchanged() -> None: + blocking_response = {"task_id": "task-1", "data": {"status": "succeeded"}} + + assert SnippetGenerateService._filter_virtual_start_events(blocking_response) is blocking_response