diff --git a/api/core/workflow/nodes/variable_assigner/v1/node.py b/api/core/workflow/nodes/variable_assigner/v1/node.py index d2ea7d94ea..ac2870aa65 100644 --- a/api/core/workflow/nodes/variable_assigner/v1/node.py +++ b/api/core/workflow/nodes/variable_assigner/v1/node.py @@ -33,6 +33,15 @@ class VariableAssignerNode(Node[VariableAssignerData]): graph_runtime_state=graph_runtime_state, ) + def blocks_variable_output(self, variable_selectors: set[tuple[str, ...]]) -> bool: + """ + Check if this Variable Assigner node blocks the output of specific variables. + + Returns True if this node updates any of the requested conversation variables. + """ + assigned_selector = tuple(self.node_data.assigned_variable_selector) + return assigned_selector in variable_selectors + @classmethod def version(cls) -> str: return "1" diff --git a/api/tests/fixtures/workflow/test_streaming_conversation_variables_v1_overwrite.yml b/api/tests/fixtures/workflow/test_streaming_conversation_variables_v1_overwrite.yml new file mode 100644 index 0000000000..5b2a6260e9 --- /dev/null +++ b/api/tests/fixtures/workflow/test_streaming_conversation_variables_v1_overwrite.yml @@ -0,0 +1,158 @@ +app: + description: Validate v1 Variable Assigner blocks streaming until conversation variable is updated. + icon: 🤖 + icon_background: '#FFEAD5' + mode: advanced-chat + name: test_streaming_conversation_variables_v1_overwrite + use_icon_as_answer_icon: false +dependencies: [] +kind: app +version: 0.5.0 +workflow: + conversation_variables: + - description: '' + id: 6ddf2d7f-3d1b-4bb0-9a5e-9b0c87c7b5e6 + name: conv_var + selector: + - conversation + - conv_var + value: default + value_type: string + environment_variables: [] + features: + file_upload: + allowed_file_extensions: + - .JPG + - .JPEG + - .PNG + - .GIF + - .WEBP + - .SVG + allowed_file_types: + - image + allowed_file_upload_methods: + - local_file + - remote_url + enabled: false + fileUploadConfig: + audio_file_size_limit: 50 + batch_count_limit: 5 + file_size_limit: 15 + image_file_size_limit: 10 + video_file_size_limit: 100 + workflow_file_upload_limit: 10 + image: + enabled: false + number_limits: 3 + transfer_methods: + - local_file + - remote_url + number_limits: 3 + opening_statement: '' + retriever_resource: + enabled: true + sensitive_word_avoidance: + enabled: false + speech_to_text: + enabled: false + suggested_questions: [] + suggested_questions_after_answer: + enabled: false + text_to_speech: + enabled: false + language: '' + voice: '' + graph: + edges: + - data: + isInIteration: false + isInLoop: false + sourceType: start + targetType: assigner + id: start-source-assigner-target + source: start + sourceHandle: source + target: assigner + targetHandle: target + type: custom + zIndex: 0 + - data: + isInLoop: false + sourceType: assigner + targetType: answer + id: assigner-source-answer-target + source: assigner + sourceHandle: source + target: answer + targetHandle: target + type: custom + zIndex: 0 + nodes: + - data: + desc: '' + selected: false + title: Start + type: start + variables: [] + height: 54 + id: start + position: + x: 30 + y: 253 + positionAbsolute: + x: 30 + y: 253 + selected: false + sourcePosition: right + targetPosition: left + type: custom + width: 244 + - data: + answer: 'Current Value Of `conv_var` is:{{#conversation.conv_var#}}' + desc: '' + selected: false + title: Answer + type: answer + variables: [] + height: 106 + id: answer + position: + x: 638 + y: 253 + positionAbsolute: + x: 638 + y: 253 + selected: true + sourcePosition: right + targetPosition: left + type: custom + width: 244 + - data: + assigned_variable_selector: + - conversation + - conv_var + desc: '' + input_variable_selector: + - sys + - query + selected: false + title: Variable Assigner + type: assigner + write_mode: over-write + height: 84 + id: assigner + position: + x: 334 + y: 253 + positionAbsolute: + x: 334 + y: 253 + selected: false + sourcePosition: right + targetPosition: left + type: custom + width: 244 + viewport: + x: 0 + y: 0 + zoom: 0.7 diff --git a/api/tests/unit_tests/core/workflow/graph_engine/test_streaming_conversation_variables.py b/api/tests/unit_tests/core/workflow/graph_engine/test_streaming_conversation_variables.py index 1f4c063bf0..99157a7c3e 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/test_streaming_conversation_variables.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/test_streaming_conversation_variables.py @@ -45,3 +45,33 @@ def test_streaming_conversation_variables(): runner = TableTestRunner() result = runner.run_test_case(case) assert result.success, f"Test failed: {result.error}" + + +def test_streaming_conversation_variables_v1_overwrite_waits_for_assignment(): + fixture_name = "test_streaming_conversation_variables_v1_overwrite" + input_query = "overwrite-value" + + case = WorkflowTestCase( + fixture_path=fixture_name, + use_auto_mock=False, + mock_config=MockConfigBuilder().build(), + query=input_query, + inputs={}, + expected_outputs={"answer": f"Current Value Of `conv_var` is:{input_query}"}, + ) + + runner = TableTestRunner() + result = runner.run_test_case(case) + assert result.success, f"Test failed: {result.error}" + + events = result.events + conv_var_chunk_events = [ + event + for event in events + if isinstance(event, NodeRunStreamChunkEvent) and tuple(event.selector) == ("conversation", "conv_var") + ] + + assert conv_var_chunk_events, "Expected conversation variable chunk events to be emitted" + assert all(event.chunk == input_query for event in conv_var_chunk_events), ( + "Expected streamed conversation variable value to match the input query" + )