From b6cea710232943c96d18a078c903a0ff0567393c Mon Sep 17 00:00:00 2001 From: -LAN- Date: Mon, 29 Sep 2025 18:15:22 +0800 Subject: [PATCH] fix(workflow): sync iteration conversation variables (#26368) Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../nodes/iteration/iteration_node.py | 49 ++- ...ate-conversation-variable-in-iteration.yml | 316 ++++++++++++++++++ ..._update_conversation_variable_iteration.py | 41 +++ 3 files changed, 402 insertions(+), 4 deletions(-) create mode 100644 api/tests/fixtures/workflow/update-conversation-variable-in-iteration.yml create mode 100644 api/tests/unit_tests/core/workflow/graph_engine/test_update_conversation_variable_iteration.py diff --git a/api/core/workflow/nodes/iteration/iteration_node.py b/api/core/workflow/nodes/iteration/iteration_node.py index 1a417b5739..a05a6b1b96 100644 --- a/api/core/workflow/nodes/iteration/iteration_node.py +++ b/api/core/workflow/nodes/iteration/iteration_node.py @@ -10,6 +10,8 @@ from typing_extensions import TypeIs from core.variables import IntegerVariable, NoneSegment from core.variables.segments import ArrayAnySegment, ArraySegment +from core.variables.variables import VariableUnion +from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID from core.workflow.entities import VariablePool from core.workflow.enums import ( ErrorStrategy, @@ -217,6 +219,13 @@ class IterationNode(Node): graph_engine=graph_engine, ) + # Sync conversation variables after each iteration completes + self._sync_conversation_variables_from_snapshot( + self._extract_conversation_variable_snapshot( + variable_pool=graph_engine.graph_runtime_state.variable_pool + ) + ) + # Update the total tokens from this iteration self.graph_runtime_state.total_tokens += graph_engine.graph_runtime_state.total_tokens iter_run_map[str(index)] = (datetime.now(UTC).replace(tzinfo=None) - iter_start_at).total_seconds() @@ -235,7 +244,10 @@ class IterationNode(Node): with ThreadPoolExecutor(max_workers=max_workers) as executor: # Submit all iteration tasks - future_to_index: dict[Future[tuple[datetime, list[GraphNodeEventBase], object | None, int]], int] = {} + future_to_index: dict[ + Future[tuple[datetime, list[GraphNodeEventBase], object | None, int, dict[str, VariableUnion]]], + int, + ] = {} for index, item in enumerate(iterator_list_value): yield IterationNextEvent(index=index) future = executor.submit( @@ -252,7 +264,7 @@ class IterationNode(Node): index = future_to_index[future] try: result = future.result() - iter_start_at, events, output_value, tokens_used = result + iter_start_at, events, output_value, tokens_used, conversation_snapshot = result # Update outputs at the correct index outputs[index] = output_value @@ -264,6 +276,9 @@ class IterationNode(Node): self.graph_runtime_state.total_tokens += tokens_used iter_run_map[str(index)] = (datetime.now(UTC).replace(tzinfo=None) - iter_start_at).total_seconds() + # Sync conversation variables after iteration completion + self._sync_conversation_variables_from_snapshot(conversation_snapshot) + except Exception as e: # Handle errors based on error_handle_mode match self._node_data.error_handle_mode: @@ -288,7 +303,7 @@ class IterationNode(Node): item: object, flask_app: Flask, context_vars: contextvars.Context, - ) -> tuple[datetime, list[GraphNodeEventBase], object | None, int]: + ) -> tuple[datetime, list[GraphNodeEventBase], object | None, int, dict[str, VariableUnion]]: """Execute a single iteration in parallel mode and return results.""" with preserve_flask_contexts(flask_app=flask_app, context_vars=context_vars): iter_start_at = datetime.now(UTC).replace(tzinfo=None) @@ -307,8 +322,17 @@ class IterationNode(Node): # Get the output value from the temporary outputs list output_value = outputs_temp[0] if outputs_temp else None + conversation_snapshot = self._extract_conversation_variable_snapshot( + variable_pool=graph_engine.graph_runtime_state.variable_pool + ) - return iter_start_at, events, output_value, graph_engine.graph_runtime_state.total_tokens + return ( + iter_start_at, + events, + output_value, + graph_engine.graph_runtime_state.total_tokens, + conversation_snapshot, + ) def _handle_iteration_success( self, @@ -430,6 +454,23 @@ class IterationNode(Node): return variable_mapping + def _extract_conversation_variable_snapshot(self, *, variable_pool: VariablePool) -> dict[str, VariableUnion]: + conversation_variables = variable_pool.variable_dictionary.get(CONVERSATION_VARIABLE_NODE_ID, {}) + return {name: variable.model_copy(deep=True) for name, variable in conversation_variables.items()} + + def _sync_conversation_variables_from_snapshot(self, snapshot: dict[str, VariableUnion]) -> None: + parent_pool = self.graph_runtime_state.variable_pool + parent_conversations = parent_pool.variable_dictionary.get(CONVERSATION_VARIABLE_NODE_ID, {}) + + current_keys = set(parent_conversations.keys()) + snapshot_keys = set(snapshot.keys()) + + for removed_key in current_keys - snapshot_keys: + parent_pool.remove((CONVERSATION_VARIABLE_NODE_ID, removed_key)) + + for name, variable in snapshot.items(): + parent_pool.add((CONVERSATION_VARIABLE_NODE_ID, name), variable) + def _append_iteration_info_to_event( self, event: GraphNodeEventBase, diff --git a/api/tests/fixtures/workflow/update-conversation-variable-in-iteration.yml b/api/tests/fixtures/workflow/update-conversation-variable-in-iteration.yml new file mode 100644 index 0000000000..ffc6eb9120 --- /dev/null +++ b/api/tests/fixtures/workflow/update-conversation-variable-in-iteration.yml @@ -0,0 +1,316 @@ +app: + description: 'This chatflow receives a sys.query, writes it into the `answer` variable, + and then outputs the `answer` variable. + + + `answer` is a conversation variable with a blank default value; it will be updated + in an iteration node. + + + if this chatflow works correctly, it will output the `sys.query` as the same.' + icon: 🤖 + icon_background: '#FFEAD5' + mode: advanced-chat + name: update-conversation-variable-in-iteration + use_icon_as_answer_icon: false +dependencies: [] +kind: app +version: 0.4.0 +workflow: + conversation_variables: + - description: '' + id: c30af82d-b2ec-417d-a861-4dd78584faa4 + name: answer + selector: + - conversation + - answer + value: '' + value_type: string + environment_variables: [] + features: + file_upload: + allowed_file_extensions: + - .JPG + - .JPEG + - .PNG + - .GIF + - .WEBP + - .SVG + allowed_file_types: + - image + allowed_file_upload_methods: + - local_file + - remote_url + enabled: false + fileUploadConfig: + audio_file_size_limit: 50 + batch_count_limit: 5 + file_size_limit: 15 + image_file_size_limit: 10 + video_file_size_limit: 100 + workflow_file_upload_limit: 10 + image: + enabled: false + number_limits: 3 + transfer_methods: + - local_file + - remote_url + number_limits: 3 + opening_statement: '' + retriever_resource: + enabled: true + sensitive_word_avoidance: + enabled: false + speech_to_text: + enabled: false + suggested_questions: [] + suggested_questions_after_answer: + enabled: false + text_to_speech: + enabled: false + language: '' + voice: '' + graph: + edges: + - data: + isInIteration: false + isInLoop: false + sourceType: start + targetType: code + id: 1759032354471-source-1759032363865-target + source: '1759032354471' + sourceHandle: source + target: '1759032363865' + targetHandle: target + type: custom + zIndex: 0 + - data: + isInIteration: false + isInLoop: false + sourceType: code + targetType: iteration + id: 1759032363865-source-1759032379989-target + source: '1759032363865' + sourceHandle: source + target: '1759032379989' + targetHandle: target + type: custom + zIndex: 0 + - data: + isInIteration: true + isInLoop: false + iteration_id: '1759032379989' + sourceType: iteration-start + targetType: assigner + id: 1759032379989start-source-1759032394460-target + source: 1759032379989start + sourceHandle: source + target: '1759032394460' + targetHandle: target + type: custom + zIndex: 1002 + - data: + isInIteration: false + isInLoop: false + sourceType: iteration + targetType: answer + id: 1759032379989-source-1759032410331-target + source: '1759032379989' + sourceHandle: source + target: '1759032410331' + targetHandle: target + type: custom + zIndex: 0 + - data: + isInIteration: true + isInLoop: false + iteration_id: '1759032379989' + sourceType: assigner + targetType: code + id: 1759032394460-source-1759032476318-target + source: '1759032394460' + sourceHandle: source + target: '1759032476318' + targetHandle: target + type: custom + zIndex: 1002 + nodes: + - data: + selected: false + title: Start + type: start + variables: [] + height: 52 + id: '1759032354471' + position: + x: 30 + y: 302 + positionAbsolute: + x: 30 + y: 302 + selected: false + sourcePosition: right + targetPosition: left + type: custom + width: 242 + - data: + code: "\ndef main():\n return {\n \"result\": [1],\n }\n" + code_language: python3 + outputs: + result: + children: null + type: array[number] + selected: false + title: Code + type: code + variables: [] + height: 52 + id: '1759032363865' + position: + x: 332 + y: 302 + positionAbsolute: + x: 332 + y: 302 + sourcePosition: right + targetPosition: left + type: custom + width: 242 + - data: + error_handle_mode: terminated + height: 204 + is_parallel: false + iterator_input_type: array[number] + iterator_selector: + - '1759032363865' + - result + output_selector: + - '1759032476318' + - result + output_type: array[string] + parallel_nums: 10 + selected: false + start_node_id: 1759032379989start + title: Iteration + type: iteration + width: 808 + height: 204 + id: '1759032379989' + position: + x: 634 + y: 302 + positionAbsolute: + x: 634 + y: 302 + selected: true + sourcePosition: right + targetPosition: left + type: custom + width: 808 + zIndex: 1 + - data: + desc: '' + isInIteration: true + selected: false + title: '' + type: iteration-start + draggable: false + height: 48 + id: 1759032379989start + parentId: '1759032379989' + position: + x: 60 + y: 78 + positionAbsolute: + x: 694 + y: 380 + selectable: false + sourcePosition: right + targetPosition: left + type: custom-iteration-start + width: 44 + zIndex: 1002 + - data: + isInIteration: true + isInLoop: false + items: + - input_type: variable + operation: over-write + value: + - sys + - query + variable_selector: + - conversation + - answer + write_mode: over-write + iteration_id: '1759032379989' + selected: false + title: Variable Assigner + type: assigner + version: '2' + height: 84 + id: '1759032394460' + parentId: '1759032379989' + position: + x: 204 + y: 60 + positionAbsolute: + x: 838 + y: 362 + sourcePosition: right + targetPosition: left + type: custom + width: 242 + zIndex: 1002 + - data: + answer: '{{#conversation.answer#}}' + selected: false + title: Answer + type: answer + variables: [] + height: 104 + id: '1759032410331' + position: + x: 1502 + y: 302 + positionAbsolute: + x: 1502 + y: 302 + selected: false + sourcePosition: right + targetPosition: left + type: custom + width: 242 + - data: + code: "\ndef main():\n return {\n \"result\": '',\n }\n" + code_language: python3 + isInIteration: true + isInLoop: false + iteration_id: '1759032379989' + outputs: + result: + children: null + type: string + selected: false + title: Code 2 + type: code + variables: [] + height: 52 + id: '1759032476318' + parentId: '1759032379989' + position: + x: 506 + y: 76 + positionAbsolute: + x: 1140 + y: 378 + sourcePosition: right + targetPosition: left + type: custom + width: 242 + zIndex: 1002 + viewport: + x: 120.39999999999998 + y: 85.20000000000005 + zoom: 0.7 + rag_pipeline_variables: [] diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_update_conversation_variable_iteration.py b/api/tests/unit_tests/core/workflow/graph_engine/test_update_conversation_variable_iteration.py new file mode 100644 index 0000000000..a7309f64de --- /dev/null +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_update_conversation_variable_iteration.py @@ -0,0 +1,41 @@ +"""Validate conversation variable updates inside an iteration workflow. + +This test uses the ``update-conversation-variable-in-iteration`` fixture, which +routes ``sys.query`` into the conversation variable ``answer`` from within an +iteration container. The workflow should surface that updated conversation +variable in the final answer output. + +Code nodes in the fixture are mocked because their concrete outputs are not +relevant to verifying variable propagation semantics. +""" + +from .test_mock_config import MockConfigBuilder +from .test_table_runner import TableTestRunner, WorkflowTestCase + + +def test_update_conversation_variable_in_iteration(): + fixture_name = "update-conversation-variable-in-iteration" + user_query = "ensure conversation variable syncs" + + mock_config = ( + MockConfigBuilder() + .with_node_output("1759032363865", {"result": [1]}) + .with_node_output("1759032476318", {"result": ""}) + .build() + ) + + case = WorkflowTestCase( + fixture_path=fixture_name, + use_auto_mock=True, + mock_config=mock_config, + query=user_query, + expected_outputs={"answer": user_query}, + description="Conversation variable updated within iteration should flow to answer output.", + ) + + runner = TableTestRunner() + result = runner.run_test_case(case) + + assert result.success, f"Workflow execution failed: {result.error}" + assert result.actual_outputs is not None + assert result.actual_outputs.get("answer") == user_query