From f439e081b557fade41dbe4f36830c681be21c2bf Mon Sep 17 00:00:00 2001 From: Novice Date: Wed, 24 Dec 2025 11:28:52 +0800 Subject: [PATCH] fix: loop streaming by clearing stale subgraph variables (#30059) --- api/core/workflow/enums.py | 1 + api/core/workflow/nodes/loop/entities.py | 6 ++++++ api/core/workflow/nodes/loop/loop_node.py | 22 ++++++++++++++++++++-- 3 files changed, 27 insertions(+), 2 deletions(-) diff --git a/api/core/workflow/enums.py b/api/core/workflow/enums.py index cf12d5ec1f..c08b62a253 100644 --- a/api/core/workflow/enums.py +++ b/api/core/workflow/enums.py @@ -247,6 +247,7 @@ class WorkflowNodeExecutionMetadataKey(StrEnum): ERROR_STRATEGY = "error_strategy" # node in continue on error mode return the field LOOP_VARIABLE_MAP = "loop_variable_map" # single loop variable output DATASOURCE_INFO = "datasource_info" + COMPLETED_REASON = "completed_reason" # completed reason for loop node class WorkflowNodeExecutionStatus(StrEnum): diff --git a/api/core/workflow/nodes/loop/entities.py b/api/core/workflow/nodes/loop/entities.py index 4fcad888e4..92a8702fc3 100644 --- a/api/core/workflow/nodes/loop/entities.py +++ b/api/core/workflow/nodes/loop/entities.py @@ -1,3 +1,4 @@ +from enum import StrEnum from typing import Annotated, Any, Literal from pydantic import AfterValidator, BaseModel, Field, field_validator @@ -96,3 +97,8 @@ class LoopState(BaseLoopState): Get current output. """ return self.current_output + + +class LoopCompletedReason(StrEnum): + LOOP_BREAK = "loop_break" + LOOP_COMPLETED = "loop_completed" diff --git a/api/core/workflow/nodes/loop/loop_node.py b/api/core/workflow/nodes/loop/loop_node.py index 1c26bbc2d0..1f9fc8a115 100644 --- a/api/core/workflow/nodes/loop/loop_node.py +++ b/api/core/workflow/nodes/loop/loop_node.py @@ -29,7 +29,7 @@ from core.workflow.node_events import ( ) from core.workflow.nodes.base import LLMUsageTrackingMixin from core.workflow.nodes.base.node import Node -from core.workflow.nodes.loop.entities import LoopNodeData, LoopVariableData +from core.workflow.nodes.loop.entities import LoopCompletedReason, LoopNodeData, LoopVariableData from core.workflow.utils.condition.processor import ConditionProcessor from factories.variable_factory import TypeMismatchError, build_segment_with_type, segment_to_variable from libs.datetime_utils import naive_utc_now @@ -96,6 +96,7 @@ class LoopNode(LLMUsageTrackingMixin, Node[LoopNodeData]): loop_duration_map: dict[str, float] = {} single_loop_variable_map: dict[str, dict[str, Any]] = {} # single loop variable output loop_usage = LLMUsage.empty_usage() + loop_node_ids = self._extract_loop_node_ids_from_config(self.graph_config, self._node_id) # Start Loop event yield LoopStartedEvent( @@ -118,6 +119,8 @@ class LoopNode(LLMUsageTrackingMixin, Node[LoopNodeData]): loop_count = 0 for i in range(loop_count): + # Clear stale variables from previous loop iterations to avoid streaming old values + self._clear_loop_subgraph_variables(loop_node_ids) graph_engine = self._create_graph_engine(start_at=start_at, root_node_id=root_node_id) loop_start_time = naive_utc_now() @@ -177,7 +180,11 @@ class LoopNode(LLMUsageTrackingMixin, Node[LoopNodeData]): WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS: loop_usage.total_tokens, WorkflowNodeExecutionMetadataKey.TOTAL_PRICE: loop_usage.total_price, WorkflowNodeExecutionMetadataKey.CURRENCY: loop_usage.currency, - "completed_reason": "loop_break" if reach_break_condition else "loop_completed", + WorkflowNodeExecutionMetadataKey.COMPLETED_REASON: ( + LoopCompletedReason.LOOP_BREAK + if reach_break_condition + else LoopCompletedReason.LOOP_COMPLETED.value + ), WorkflowNodeExecutionMetadataKey.LOOP_DURATION_MAP: loop_duration_map, WorkflowNodeExecutionMetadataKey.LOOP_VARIABLE_MAP: single_loop_variable_map, }, @@ -274,6 +281,17 @@ class LoopNode(LLMUsageTrackingMixin, Node[LoopNodeData]): if WorkflowNodeExecutionMetadataKey.LOOP_ID not in current_metadata: event.node_run_result.metadata = {**current_metadata, **loop_metadata} + def _clear_loop_subgraph_variables(self, loop_node_ids: set[str]) -> None: + """ + Remove variables produced by loop sub-graph nodes from previous iterations. + + Keeping stale variables causes a freshly created response coordinator in the + next iteration to fall back to outdated values when no stream chunks exist. + """ + variable_pool = self.graph_runtime_state.variable_pool + for node_id in loop_node_ids: + variable_pool.remove([node_id]) + @classmethod def _extract_variable_selector_to_variable_mapping( cls,