diff --git a/api/core/app/apps/common/legacy_response_adapter.py b/api/core/app/apps/common/legacy_response_adapter.py
new file mode 100644
index 0000000000..5b66e39b94
--- /dev/null
+++ b/api/core/app/apps/common/legacy_response_adapter.py
@@ -0,0 +1,60 @@
+"""Legacy Response Adapter for transparent upgrade.
+
+When old apps (chat/completion/agent-chat) run through the Agent V2
+workflow engine via transparent upgrade, the SSE events are in workflow
+format (workflow_started, node_started, etc.). This adapter filters out
+workflow-specific events and passes through only the events that old
+clients expect (message, message_end, etc.).
+"""
+
+from __future__ import annotations
+
+import json
+import logging
+from collections.abc import Generator
+from typing import Any
+
+logger = logging.getLogger(__name__)
+
+# SSE event names that only exist in the workflow protocol; legacy
+# easy-UI clients do not understand them and they must be suppressed.
+WORKFLOW_ONLY_EVENTS = frozenset({
+    "workflow_started",
+    "workflow_finished",
+    "node_started",
+    "node_finished",
+    "iteration_started",
+    "iteration_next",
+    "iteration_completed",
+})
+
+
+def adapt_workflow_stream_for_legacy(
+    stream: Generator[str, None, None],
+) -> Generator[str, None, None]:
+    """Filter workflow-specific SSE events from a streaming response.
+
+    Passes through message, message_end, agent_log, error, ping events,
+    and suppresses the events listed in WORKFLOW_ONLY_EVENTS.
+
+    This makes the SSE stream look more like what old easy-UI apps produce,
+    while still carrying the actual LLM response content.
+    """
+    for chunk in stream:
+        if not chunk or not chunk.strip():
+            # Keep-alive / separator chunks: forward untouched.
+            yield chunk
+            continue
+
+        if not chunk.startswith("data: "):
+            # Non-data SSE frames (e.g. "event: ping") pass through unchanged.
+            yield chunk
+            continue
+
+        try:
+            payload = json.loads(chunk[6:])
+        except (json.JSONDecodeError, TypeError):
+            # Malformed / non-JSON data frame: forward unchanged.
+            yield chunk
+            continue
+
+        if payload.get("event", "") not in WORKFLOW_ONLY_EVENTS:
+            yield chunk
diff --git a/api/core/sandbox/builder.py b/api/core/sandbox/builder.py
index 97b9f96fb8..52a6f313b0 100644
--- a/api/core/sandbox/builder.py
+++ b/api/core/sandbox/builder.py
@@ -141,7 +141,16 @@ class SandboxBuilder:
                 assets_id=self._assets_id,
             )
 
-            for init in self._initializers:
+            all_initializers = list(self._initializers)
+            try:
+                from core.sandbox.initializer.skill_initializer import SkillInitializer
+
+                if not any(isinstance(i, SkillInitializer) for i in all_initializers):
+                    all_initializers.append(SkillInitializer())
+            except ImportError:
+                pass
+
+            for init in all_initializers:
                 if isinstance(init, SyncSandboxInitializer):
                     init.initialize(sandbox, ctx)
         except Exception as exc:
diff --git a/api/core/workflow/nodes/agent_v2/node.py b/api/core/workflow/nodes/agent_v2/node.py
index af214d44e1..59c6e0a717 100644
--- a/api/core/workflow/nodes/agent_v2/node.py
+++ b/api/core/workflow/nodes/agent_v2/node.py
@@ -184,6 +184,7 @@ class AgentV2Node(Node[AgentV2NodeData]):
             metadata[WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS] = usage.total_tokens
             metadata[WorkflowNodeExecutionMetadataKey.TOTAL_PRICE] = usage.total_price
             metadata[WorkflowNodeExecutionMetadataKey.CURRENCY] = usage.currency
+            self.graph_runtime_state.add_tokens(usage.total_tokens)
 
             yield StreamCompletedEvent(
                 node_run_result=NodeRunResult(
@@ -262,6 +263,9 @@ class AgentV2Node(Node[AgentV2NodeData]):
                 node_execution_id=self.id,
             )
 
+            if result.usage and hasattr(result.usage, "total_tokens"):
+                self.graph_runtime_state.add_tokens(result.usage.total_tokens)
+
             yield StreamCompletedEvent(
                 node_run_result=NodeRunResult(
                     status=WorkflowNodeExecutionStatus.SUCCEEDED,