diff --git a/api/services/snippet_generate_service.py b/api/services/snippet_generate_service.py index 5e0d25c8f7..bbd183f586 100644 --- a/api/services/snippet_generate_service.py +++ b/api/services/snippet_generate_service.py @@ -80,6 +80,50 @@ class SnippetGenerateService: # Specific ID for the injected virtual Start node so it can be recognised _VIRTUAL_START_NODE_ID = "__snippet_virtual_start__" + @classmethod + def _is_virtual_start_event(cls, message: Mapping[str, Any] | str) -> bool: + """ + Return True when *message* is a snippet-only virtual Start node event. + + The virtual Start node is injected purely for snippet execution and is + not part of the persisted draft graph. Filter its node lifecycle events + out of the SSE stream so the frontend only receives nodes that exist on + the canvas. + """ + if not isinstance(message, Mapping): + return False + + if message.get("event") not in {"node_started", "node_finished"}: + return False + + data = message.get("data") + if not isinstance(data, Mapping): + return False + + return data.get("node_id") == cls._VIRTUAL_START_NODE_ID + + @classmethod + def _filter_virtual_start_events( + cls, + response: Mapping[str, Any] | Generator[Mapping[str, Any] | str, None, None], + ) -> Mapping[str, Any] | Generator[Mapping[str, Any] | str, None, None]: + """ + Drop snippet virtual Start node lifecycle events from stream responses. + + Blocking responses are returned unchanged because they never expose the + injected node as a standalone event payload. + """ + if isinstance(response, Mapping): + return response + + def _stream() -> Generator[Mapping[str, Any] | str, None, None]: + for message in response: + if cls._is_virtual_start_event(message): + continue + yield message + + return _stream() + @classmethod def generate( cls, @@ -119,16 +163,18 @@ class SnippetGenerateService: # Adapt snippet to App-like interface for WorkflowAppGenerator app_proxy = _SnippetAsApp(snippet) + response = WorkflowAppGenerator().generate( + app_model=app_proxy, # type: ignore[arg-type] + workflow=workflow, + user=user, + args=args, + invoke_from=invoke_from, + streaming=streaming, + call_depth=0, + ) + return WorkflowAppGenerator.convert_to_event_stream( - WorkflowAppGenerator().generate( - app_model=app_proxy, # type: ignore[arg-type] - workflow=workflow, - user=user, - args=args, - invoke_from=invoke_from, - streaming=streaming, - call_depth=0, - ) + cls._filter_virtual_start_events(response) ) @classmethod