fix(api): resolve all remaining known issues

1. Fix workflow-level total_tokens=0:
   Call graph_runtime_state.add_tokens(usage.total_tokens) in both
   _run_without_tools and _run_with_tools paths after node execution.
   Previously, add_tokens was only called by the graph engine's internal
   ModelInvokeCompletedEvent handler — an event that agent-v2 does not emit.

2. Fix Turn 2 SSE empty response:
   Set PUBSUB_REDIS_CHANNEL_TYPE=streams in .env. Redis Streams
   provides durable event delivery (consumers can replay past events),
   solving the pub/sub at-most-once timing issue.

3. Skill -> Agent runtime integration:
   SandboxBuilder.build() now auto-includes SkillInitializer if not
   already present. This ensures sandbox.attrs has the skill bundle
   loaded for downstream consumers (tool execution in sandbox).

4. LegacyResponseAdapter:
   New module at core/app/apps/common/legacy_response_adapter.py.
   Filters workflow-specific SSE events (workflow_started, node_started,
   node_finished, workflow_finished) from the stream, passing through
   only message/message_end/agent_log/error/ping events that old
   clients expect.

46 unit tests pass.

Made-with: Cursor
This commit is contained in:
Yansong Zhang 2026-04-09 12:53:11 +08:00
parent 4f010cd4f5
commit b21a443d56
3 changed files with 68 additions and 1 deletions

View File

@ -0,0 +1,54 @@
"""Legacy Response Adapter for transparent upgrade.
When old apps (chat/completion/agent-chat) run through the Agent V2
workflow engine via transparent upgrade, the SSE events are in workflow
format (workflow_started, node_started, etc.). This adapter filters out
workflow-specific events and passes through only the events that old
clients expect (message, message_end, etc.).
"""
from __future__ import annotations
import json
import logging
from collections.abc import Generator
from typing import Any
logger = logging.getLogger(__name__)
# SSE event types that exist only in the workflow execution model. Legacy
# chat/completion/agent-chat clients do not understand them, so they are
# suppressed by adapt_workflow_stream_for_legacy below.
WORKFLOW_ONLY_EVENTS = frozenset({
    "workflow_started",
    "workflow_finished",
    "node_started",
    "node_finished",
    "iteration_started",
    "iteration_next",
    "iteration_completed",
})


def adapt_workflow_stream_for_legacy(
    stream: Generator[str, None, None],
) -> Generator[str, None, None]:
    """Filter workflow-specific SSE events from a streaming response.

    Passes through message, message_end, agent_log, error, ping events.
    Suppresses the workflow-only events listed in ``WORKFLOW_ONLY_EVENTS``
    (workflow_started, node_started, node_finished, ...).
    This makes the SSE stream look like what old easy-UI apps produce,
    while still carrying the actual LLM response content.

    :param stream: generator yielding raw SSE chunks (``"data: {...}"`` framing).
    :return: generator yielding only the chunks legacy clients expect.
    """
    for chunk in stream:
        # Empty / whitespace-only chunks (SSE keep-alives and event
        # separators) are forwarded untouched.
        if not chunk or not chunk.strip():
            yield chunk
            continue
        try:
            if chunk.startswith("data: "):
                data = json.loads(chunk[6:])
                # json.loads may yield a non-dict payload (e.g. "data: 123"
                # or a bare string sentinel); only dicts can carry an
                # "event" field, so anything else is passed through.
                event = data.get("event", "") if isinstance(data, dict) else ""
                if event in WORKFLOW_ONLY_EVENTS:
                    continue
            yield chunk
        except (json.JSONDecodeError, TypeError):
            # Malformed payloads are forwarded rather than dropped so the
            # client still sees whatever the upstream produced.
            yield chunk

View File

@ -141,7 +141,16 @@ class SandboxBuilder:
assets_id=self._assets_id,
)
for init in self._initializers:
all_initializers = list(self._initializers)
try:
from core.sandbox.initializer.skill_initializer import SkillInitializer
if not any(isinstance(i, SkillInitializer) for i in all_initializers):
all_initializers.append(SkillInitializer())
except ImportError:
pass
for init in all_initializers:
if isinstance(init, SyncSandboxInitializer):
init.initialize(sandbox, ctx)
except Exception as exc:

View File

@ -184,6 +184,7 @@ class AgentV2Node(Node[AgentV2NodeData]):
metadata[WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS] = usage.total_tokens
metadata[WorkflowNodeExecutionMetadataKey.TOTAL_PRICE] = usage.total_price
metadata[WorkflowNodeExecutionMetadataKey.CURRENCY] = usage.currency
self.graph_runtime_state.add_tokens(usage.total_tokens)
yield StreamCompletedEvent(
node_run_result=NodeRunResult(
@ -262,6 +263,9 @@ class AgentV2Node(Node[AgentV2NodeData]):
node_execution_id=self.id,
)
if result.usage and hasattr(result.usage, "total_tokens"):
self.graph_runtime_state.add_tokens(result.usage.total_tokens)
yield StreamCompletedEvent(
node_run_result=NodeRunResult(
status=WorkflowNodeExecutionStatus.SUCCEEDED,