diff --git a/api/.importlinter b/api/.importlinter index acb21ae522..2dec958788 100644 --- a/api/.importlinter +++ b/api/.importlinter @@ -53,7 +53,6 @@ ignore_imports = core.workflow.nodes.llm.llm_utils -> extensions.ext_database core.workflow.nodes.llm.node -> extensions.ext_database core.workflow.nodes.tool.tool_node -> extensions.ext_database - core.workflow.nodes.variable_assigner.common.impl -> extensions.ext_database core.workflow.graph_engine.command_channels.redis_channel -> extensions.ext_redis core.workflow.graph_engine.manager -> extensions.ext_redis core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> extensions.ext_redis diff --git a/api/core/app/apps/advanced_chat/app_runner.py b/api/core/app/apps/advanced_chat/app_runner.py index ee092e55c5..a440b35035 100644 --- a/api/core/app/apps/advanced_chat/app_runner.py +++ b/api/core/app/apps/advanced_chat/app_runner.py @@ -20,6 +20,7 @@ from core.app.entities.queue_entities import ( QueueTextChunkEvent, ) from core.app.features.annotation_reply.annotation_reply import AnnotationReplyFeature +from core.app.layers.conversation_variable_persist_layer import ConversationVariablePersistenceLayer from core.moderation.base import ModerationError from core.moderation.input_moderation import InputModeration from core.variables.variables import VariableUnion @@ -40,6 +41,7 @@ from models import Workflow from models.enums import UserFrom from models.model import App, Conversation, Message, MessageAnnotation from models.workflow import ConversationVariable +from services.conversation_variable_updater import conversation_variable_updater_factory logger = logging.getLogger(__name__) @@ -200,6 +202,8 @@ class AdvancedChatAppRunner(WorkflowBasedAppRunner): ) workflow_entry.graph_engine.layer(persistence_layer) + conversation_variable_layer = ConversationVariablePersistenceLayer(conversation_variable_updater_factory()) + workflow_entry.graph_engine.layer(conversation_variable_layer) for layer in self._graph_engine_layers: workflow_entry.graph_engine.layer(layer) diff --git a/api/core/app/layers/conversation_variable_persist_layer.py b/api/core/app/layers/conversation_variable_persist_layer.py new file mode 100644 index 0000000000..77cc00bdc9 --- /dev/null +++ b/api/core/app/layers/conversation_variable_persist_layer.py @@ -0,0 +1,60 @@ +import logging + +from core.variables import Variable +from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID +from core.workflow.conversation_variable_updater import ConversationVariableUpdater +from core.workflow.enums import NodeType +from core.workflow.graph_engine.layers.base import GraphEngineLayer +from core.workflow.graph_events import GraphEngineEvent, NodeRunSucceededEvent +from core.workflow.nodes.variable_assigner.common import helpers as common_helpers + +logger = logging.getLogger(__name__) + + +class ConversationVariablePersistenceLayer(GraphEngineLayer): + def __init__(self, conversation_variable_updater: ConversationVariableUpdater) -> None: + super().__init__() + self._conversation_variable_updater = conversation_variable_updater + + def on_graph_start(self) -> None: + pass + + def on_event(self, event: GraphEngineEvent) -> None: + if not isinstance(event, NodeRunSucceededEvent): + return + if event.node_type != NodeType.VARIABLE_ASSIGNER: + return + if self.graph_runtime_state is None: + return + + updated_variables = common_helpers.get_updated_variables(event.node_run_result.process_data) or [] + if not updated_variables: + return + + conversation_id = self.graph_runtime_state.system_variable.conversation_id + if conversation_id is None: + return + + updated_any = False + for item in updated_variables: + selector = item.selector + if len(selector) < 2: + logger.warning("Conversation variable selector invalid. selector=%s", selector) + continue + if selector[0] != CONVERSATION_VARIABLE_NODE_ID: + continue + variable = self.graph_runtime_state.variable_pool.get(selector) + if not isinstance(variable, Variable): + logger.warning( + "Conversation variable not found in variable pool. selector=%s", + selector, + ) + continue + self._conversation_variable_updater.update(conversation_id=conversation_id, variable=variable) + updated_any = True + + if updated_any: + self._conversation_variable_updater.flush() + + def on_graph_end(self, error: Exception | None) -> None: + pass diff --git a/api/core/workflow/nodes/node_factory.py b/api/core/workflow/nodes/node_factory.py index f177aef665..557d3a330d 100644 --- a/api/core/workflow/nodes/node_factory.py +++ b/api/core/workflow/nodes/node_factory.py @@ -113,7 +113,6 @@ class DifyNodeFactory(NodeFactory): code_providers=self._code_providers, code_limits=self._code_limits, ) - if node_type == NodeType.TEMPLATE_TRANSFORM: return TemplateTransformNode( id=node_id, diff --git a/api/core/workflow/nodes/variable_assigner/v1/node.py b/api/core/workflow/nodes/variable_assigner/v1/node.py index da23207b62..d2ea7d94ea 100644 --- a/api/core/workflow/nodes/variable_assigner/v1/node.py +++ b/api/core/workflow/nodes/variable_assigner/v1/node.py @@ -1,9 +1,8 @@ -from collections.abc import Callable, Mapping, Sequence -from typing import TYPE_CHECKING, Any, TypeAlias +from collections.abc import Mapping, Sequence +from typing import TYPE_CHECKING, Any from core.variables import SegmentType, Variable from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID -from core.workflow.conversation_variable_updater import ConversationVariableUpdater from core.workflow.entities import GraphInitParams from core.workflow.enums import NodeType, WorkflowNodeExecutionStatus from core.workflow.node_events import NodeRunResult @@ -11,19 +10,14 @@ from core.workflow.nodes.base.node import Node from core.workflow.nodes.variable_assigner.common import helpers as common_helpers from core.workflow.nodes.variable_assigner.common.exc import VariableOperatorNodeError -from ..common.impl import conversation_variable_updater_factory from .node_data import VariableAssignerData, WriteMode if TYPE_CHECKING: from core.workflow.runtime import GraphRuntimeState -_CONV_VAR_UPDATER_FACTORY: TypeAlias = Callable[[], ConversationVariableUpdater] - - class VariableAssignerNode(Node[VariableAssignerData]): node_type = NodeType.VARIABLE_ASSIGNER - _conv_var_updater_factory: _CONV_VAR_UPDATER_FACTORY def __init__( self, @@ -31,7 +25,6 @@ class VariableAssignerNode(Node[VariableAssignerData]): config: Mapping[str, Any], graph_init_params: "GraphInitParams", graph_runtime_state: "GraphRuntimeState", - conv_var_updater_factory: _CONV_VAR_UPDATER_FACTORY = conversation_variable_updater_factory, ): super().__init__( id=id, @@ -39,7 +32,6 @@ class VariableAssignerNode(Node[VariableAssignerData]): graph_init_params=graph_init_params, graph_runtime_state=graph_runtime_state, ) - self._conv_var_updater_factory = conv_var_updater_factory @classmethod def version(cls) -> str: @@ -96,16 +88,7 @@ class VariableAssignerNode(Node[VariableAssignerData]): # Over write the variable. self.graph_runtime_state.variable_pool.add(assigned_variable_selector, updated_variable) - # TODO: Move database operation to the pipeline. - # Update conversation variable. - conversation_id = self.graph_runtime_state.variable_pool.get(["sys", "conversation_id"]) - if not conversation_id: - raise VariableOperatorNodeError("conversation_id not found") - conv_var_updater = self._conv_var_updater_factory() - conv_var_updater.update(conversation_id=conversation_id.text, variable=updated_variable) - conv_var_updater.flush() updated_variables = [common_helpers.variable_to_processed_data(assigned_variable_selector, updated_variable)] - return NodeRunResult( status=WorkflowNodeExecutionStatus.SUCCEEDED, inputs={ diff --git a/api/core/workflow/nodes/variable_assigner/v2/node.py b/api/core/workflow/nodes/variable_assigner/v2/node.py index 389fb54d35..486e6bb6a7 100644 --- a/api/core/workflow/nodes/variable_assigner/v2/node.py +++ b/api/core/workflow/nodes/variable_assigner/v2/node.py @@ -1,24 +1,20 @@ import json from collections.abc import Mapping, MutableMapping, Sequence -from typing import Any, cast +from typing import TYPE_CHECKING, Any -from core.app.entities.app_invoke_entities import InvokeFrom from core.variables import SegmentType, Variable from core.variables.consts import SELECTORS_LENGTH from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID -from core.workflow.conversation_variable_updater import ConversationVariableUpdater from core.workflow.enums import NodeType, WorkflowNodeExecutionStatus from core.workflow.node_events import NodeRunResult from core.workflow.nodes.base.node import Node from core.workflow.nodes.variable_assigner.common import helpers as common_helpers from core.workflow.nodes.variable_assigner.common.exc import VariableOperatorNodeError -from core.workflow.nodes.variable_assigner.common.impl import conversation_variable_updater_factory from . import helpers from .entities import VariableAssignerNodeData, VariableOperationItem from .enums import InputType, Operation from .exc import ( - ConversationIDNotFoundError, InputTypeNotSupportedError, InvalidDataError, InvalidInputValueError, @@ -26,6 +22,10 @@ from .exc import ( VariableNotFoundError, ) +if TYPE_CHECKING: + from core.workflow.entities import GraphInitParams + from core.workflow.runtime import GraphRuntimeState + def _target_mapping_from_item(mapping: MutableMapping[str, Sequence[str]], node_id: str, item: VariableOperationItem): selector_node_id = item.variable_selector[0] @@ -53,6 +53,20 @@ def _source_mapping_from_item(mapping: MutableMapping[str, Sequence[str]], node_ class VariableAssignerNode(Node[VariableAssignerNodeData]): node_type = NodeType.VARIABLE_ASSIGNER + def __init__( + self, + id: str, + config: Mapping[str, Any], + graph_init_params: "GraphInitParams", + graph_runtime_state: "GraphRuntimeState", + ): + super().__init__( + id=id, + config=config, + graph_init_params=graph_init_params, + graph_runtime_state=graph_runtime_state, + ) + def blocks_variable_output(self, variable_selectors: set[tuple[str, ...]]) -> bool: """ Check if this Variable Assigner node blocks the output of specific variables. @@ -70,9 +84,6 @@ class VariableAssignerNode(Node[VariableAssignerNodeData]): return False - def _conv_var_updater_factory(self) -> ConversationVariableUpdater: - return conversation_variable_updater_factory() - @classmethod def version(cls) -> str: return "2" @@ -179,26 +190,12 @@ class VariableAssignerNode(Node[VariableAssignerNodeData]): # remove the duplicated items first. updated_variable_selectors = list(set(map(tuple, updated_variable_selectors))) - conv_var_updater = self._conv_var_updater_factory() - # Update variables for selector in updated_variable_selectors: variable = self.graph_runtime_state.variable_pool.get(selector) if not isinstance(variable, Variable): raise VariableNotFoundError(variable_selector=selector) process_data[variable.name] = variable.value - if variable.selector[0] == CONVERSATION_VARIABLE_NODE_ID: - conversation_id = self.graph_runtime_state.variable_pool.get(["sys", "conversation_id"]) - if not conversation_id: - if self.invoke_from != InvokeFrom.DEBUGGER: - raise ConversationIDNotFoundError - else: - conversation_id = conversation_id.value - conv_var_updater.update( - conversation_id=cast(str, conversation_id), - variable=variable, - ) - conv_var_updater.flush() updated_variables = [ common_helpers.variable_to_processed_data(selector, seg) for selector in updated_variable_selectors diff --git a/api/core/workflow/runtime/graph_runtime_state_protocol.py b/api/core/workflow/runtime/graph_runtime_state_protocol.py index 5e0878e873..bfbb5ba704 100644 --- a/api/core/workflow/runtime/graph_runtime_state_protocol.py +++ b/api/core/workflow/runtime/graph_runtime_state_protocol.py @@ -1,4 +1,4 @@ -from collections.abc import Mapping +from collections.abc import Mapping, Sequence from typing import Any, Protocol from core.model_runtime.entities.llm_entities import LLMUsage @@ -9,7 +9,7 @@ from core.workflow.system_variable import SystemVariableReadOnlyView class ReadOnlyVariablePool(Protocol): """Read-only interface for VariablePool.""" - def get(self, node_id: str, variable_key: str) -> Segment | None: + def get(self, selector: Sequence[str], /) -> Segment | None: """Get a variable value (read-only).""" ... diff --git a/api/core/workflow/runtime/read_only_wrappers.py b/api/core/workflow/runtime/read_only_wrappers.py index 8539727fd6..d3e4c60d9b 100644 --- a/api/core/workflow/runtime/read_only_wrappers.py +++ b/api/core/workflow/runtime/read_only_wrappers.py @@ -1,6 +1,6 @@ from __future__ import annotations -from collections.abc import Mapping +from collections.abc import Mapping, Sequence from copy import deepcopy from typing import Any @@ -18,9 +18,9 @@ class ReadOnlyVariablePoolWrapper: def __init__(self, variable_pool: VariablePool) -> None: self._variable_pool = variable_pool - def get(self, node_id: str, variable_key: str) -> Segment | None: + def get(self, selector: Sequence[str], /) -> Segment | None: """Return a copy of a variable value if present.""" - value = self._variable_pool.get([node_id, variable_key]) + value = self._variable_pool.get(selector) return deepcopy(value) if value is not None else None def get_all_by_node(self, node_id: str) -> Mapping[str, object]: diff --git a/api/services/conversation_service.py b/api/services/conversation_service.py index f56de36ef7..038c9feb12 100644 --- a/api/services/conversation_service.py +++ b/api/services/conversation_service.py @@ -11,13 +11,13 @@ from core.app.entities.app_invoke_entities import InvokeFrom from core.db.session_factory import session_factory from core.llm_generator.llm_generator import LLMGenerator from core.variables.types import SegmentType -from core.workflow.nodes.variable_assigner.common.impl import conversation_variable_updater_factory from extensions.ext_database import db from factories import variable_factory from libs.datetime_utils import naive_utc_now from libs.infinite_scroll_pagination import InfiniteScrollPagination from models import Account, ConversationVariable from models.model import App, Conversation, EndUser, Message +from services.conversation_variable_updater import conversation_variable_updater_factory from services.errors.conversation import ( ConversationNotExistsError, ConversationVariableNotExistsError, diff --git a/api/core/workflow/nodes/variable_assigner/common/impl.py b/api/services/conversation_variable_updater.py similarity index 72% rename from api/core/workflow/nodes/variable_assigner/common/impl.py rename to api/services/conversation_variable_updater.py index 050e213535..2da507fac7 100644 --- a/api/core/workflow/nodes/variable_assigner/common/impl.py +++ b/api/services/conversation_variable_updater.py @@ -5,22 +5,24 @@ from core.variables.variables import Variable from extensions.ext_database import db from models import ConversationVariable -from .exc import VariableOperatorNodeError + +class ConversationVariableNotFoundError(Exception): + pass class ConversationVariableUpdaterImpl: - def update(self, conversation_id: str, variable: Variable): + def update(self, conversation_id: str, variable: Variable) -> None: stmt = select(ConversationVariable).where( ConversationVariable.id == variable.id, ConversationVariable.conversation_id == conversation_id ) with Session(db.engine) as session: row = session.scalar(stmt) if not row: - raise VariableOperatorNodeError("conversation variable not found in the database") + raise ConversationVariableNotFoundError("conversation variable not found in the database") row.data = variable.model_dump_json() session.commit() - def flush(self): + def flush(self) -> None: pass diff --git a/api/tests/unit_tests/core/app/layers/test_conversation_variable_persist_layer.py b/api/tests/unit_tests/core/app/layers/test_conversation_variable_persist_layer.py new file mode 100644 index 0000000000..b6e8cc9c8e --- /dev/null +++ b/api/tests/unit_tests/core/app/layers/test_conversation_variable_persist_layer.py @@ -0,0 +1,144 @@ +from collections.abc import Sequence +from datetime import datetime +from unittest.mock import Mock + +from core.app.layers.conversation_variable_persist_layer import ConversationVariablePersistenceLayer +from core.variables import StringVariable +from core.variables.segments import Segment +from core.workflow.constants import CONVERSATION_VARIABLE_NODE_ID +from core.workflow.enums import NodeType, WorkflowNodeExecutionStatus +from core.workflow.graph_engine.protocols.command_channel import CommandChannel +from core.workflow.graph_events.node import NodeRunSucceededEvent +from core.workflow.node_events import NodeRunResult +from core.workflow.nodes.variable_assigner.common import helpers as common_helpers +from core.workflow.runtime.graph_runtime_state_protocol import ReadOnlyGraphRuntimeState +from core.workflow.system_variable import SystemVariable + + +class MockReadOnlyVariablePool: + def __init__(self, variables: dict[tuple[str, str], Segment] | None = None) -> None: + self._variables = variables or {} + + def get(self, selector: Sequence[str]) -> Segment | None: + if len(selector) < 2: + return None + return self._variables.get((selector[0], selector[1])) + + def get_all_by_node(self, node_id: str) -> dict[str, object]: + return {key: value for (nid, key), value in self._variables.items() if nid == node_id} + + def get_by_prefix(self, prefix: str) -> dict[str, object]: + return {key: value for (nid, key), value in self._variables.items() if nid == prefix} + + +def _build_graph_runtime_state( + variable_pool: MockReadOnlyVariablePool, + conversation_id: str | None = None, +) -> ReadOnlyGraphRuntimeState: + graph_runtime_state = Mock(spec=ReadOnlyGraphRuntimeState) + graph_runtime_state.variable_pool = variable_pool + graph_runtime_state.system_variable = SystemVariable(conversation_id=conversation_id).as_view() + return graph_runtime_state + + +def _build_node_run_succeeded_event( + *, + node_type: NodeType, + outputs: dict[str, object] | None = None, + process_data: dict[str, object] | None = None, +) -> NodeRunSucceededEvent: + return NodeRunSucceededEvent( + id="node-exec-id", + node_id="assigner", + node_type=node_type, + start_at=datetime.utcnow(), + node_run_result=NodeRunResult( + status=WorkflowNodeExecutionStatus.SUCCEEDED, + outputs=outputs or {}, + process_data=process_data or {}, + ), + ) + + +def test_persists_conversation_variables_from_assigner_output(): + conversation_id = "conv-123" + variable = StringVariable( + id="var-1", + name="name", + value="updated", + selector=[CONVERSATION_VARIABLE_NODE_ID, "name"], + ) + process_data = common_helpers.set_updated_variables( + {}, [common_helpers.variable_to_processed_data(variable.selector, variable)] + ) + + variable_pool = MockReadOnlyVariablePool({(CONVERSATION_VARIABLE_NODE_ID, "name"): variable}) + + updater = Mock() + layer = ConversationVariablePersistenceLayer(updater) + layer.initialize(_build_graph_runtime_state(variable_pool, conversation_id), Mock(spec=CommandChannel)) + + event = _build_node_run_succeeded_event(node_type=NodeType.VARIABLE_ASSIGNER, process_data=process_data) + layer.on_event(event) + + updater.update.assert_called_once_with(conversation_id=conversation_id, variable=variable) + updater.flush.assert_called_once() + + +def test_skips_when_outputs_missing(): + conversation_id = "conv-456" + variable = StringVariable( + id="var-2", + name="name", + value="updated", + selector=[CONVERSATION_VARIABLE_NODE_ID, "name"], + ) + + variable_pool = MockReadOnlyVariablePool({(CONVERSATION_VARIABLE_NODE_ID, "name"): variable}) + + updater = Mock() + layer = ConversationVariablePersistenceLayer(updater) + layer.initialize(_build_graph_runtime_state(variable_pool, conversation_id), Mock(spec=CommandChannel)) + + event = _build_node_run_succeeded_event(node_type=NodeType.VARIABLE_ASSIGNER) + layer.on_event(event) + + updater.update.assert_not_called() + updater.flush.assert_not_called() + + +def test_skips_non_assigner_nodes(): + updater = Mock() + layer = ConversationVariablePersistenceLayer(updater) + layer.initialize(_build_graph_runtime_state(MockReadOnlyVariablePool()), Mock(spec=CommandChannel)) + + event = _build_node_run_succeeded_event(node_type=NodeType.LLM) + layer.on_event(event) + + updater.update.assert_not_called() + updater.flush.assert_not_called() + + +def test_skips_non_conversation_variables(): + conversation_id = "conv-789" + non_conversation_variable = StringVariable( + id="var-3", + name="name", + value="updated", + selector=["environment", "name"], + ) + process_data = common_helpers.set_updated_variables( + {}, [common_helpers.variable_to_processed_data(non_conversation_variable.selector, non_conversation_variable)] + ) + + variable_pool = MockReadOnlyVariablePool() + + updater = Mock() + layer = ConversationVariablePersistenceLayer(updater) + layer.initialize(_build_graph_runtime_state(variable_pool, conversation_id), Mock(spec=CommandChannel)) + + event = _build_node_run_succeeded_event(node_type=NodeType.VARIABLE_ASSIGNER, process_data=process_data) + layer.on_event(event) + + updater.update.assert_not_called() + updater.flush.assert_not_called() diff --git a/api/tests/unit_tests/core/app/layers/test_pause_state_persist_layer.py b/api/tests/unit_tests/core/app/layers/test_pause_state_persist_layer.py index 5a5386ee57..1d885f6b2e 100644 --- a/api/tests/unit_tests/core/app/layers/test_pause_state_persist_layer.py +++ b/api/tests/unit_tests/core/app/layers/test_pause_state_persist_layer.py @@ -1,4 +1,5 @@ import json +from collections.abc import Sequence from time import time from unittest.mock import Mock @@ -67,8 +68,10 @@ class MockReadOnlyVariablePool: def __init__(self, variables: dict[tuple[str, str], object] | None = None): self._variables = variables or {} - def get(self, node_id: str, variable_key: str) -> Segment | None: - value = self._variables.get((node_id, variable_key)) + def get(self, selector: Sequence[str]) -> Segment | None: + if len(selector) < 2: + return None + value = self._variables.get((selector[0], selector[1])) if value is None: return None mock_segment = Mock(spec=Segment) diff --git a/api/tests/unit_tests/core/workflow/nodes/variable_assigner/v1/test_variable_assigner_v1.py b/api/tests/unit_tests/core/workflow/nodes/variable_assigner/v1/test_variable_assigner_v1.py index c62fc4d8fe..1df75380af 100644 --- a/api/tests/unit_tests/core/workflow/nodes/variable_assigner/v1/test_variable_assigner_v1.py +++ b/api/tests/unit_tests/core/workflow/nodes/variable_assigner/v1/test_variable_assigner_v1.py @@ -1,14 +1,14 @@ import time import uuid -from unittest import mock from uuid import uuid4 from core.app.entities.app_invoke_entities import InvokeFrom from core.variables import ArrayStringVariable, StringVariable -from core.workflow.conversation_variable_updater import ConversationVariableUpdater from core.workflow.entities import GraphInitParams from core.workflow.graph import Graph +from core.workflow.graph_events.node import NodeRunSucceededEvent from core.workflow.nodes.node_factory import DifyNodeFactory +from core.workflow.nodes.variable_assigner.common import helpers as common_helpers from core.workflow.nodes.variable_assigner.v1 import VariableAssignerNode from core.workflow.nodes.variable_assigner.v1.node_data import WriteMode from core.workflow.runtime import GraphRuntimeState, VariablePool @@ -86,9 +86,6 @@ def test_overwrite_string_variable(): ) graph = Graph.init(graph_config=graph_config, node_factory=node_factory) - mock_conv_var_updater = mock.Mock(spec=ConversationVariableUpdater) - mock_conv_var_updater_factory = mock.Mock(return_value=mock_conv_var_updater) - node_config = { "id": "node_id", "data": { @@ -104,20 +101,14 @@ def test_overwrite_string_variable(): graph_init_params=init_params, graph_runtime_state=graph_runtime_state, config=node_config, - conv_var_updater_factory=mock_conv_var_updater_factory, ) - list(node.run()) - expected_var = StringVariable( - id=conversation_variable.id, - name=conversation_variable.name, - description=conversation_variable.description, - selector=conversation_variable.selector, - value_type=conversation_variable.value_type, - value=input_variable.value, - ) - mock_conv_var_updater.update.assert_called_once_with(conversation_id=conversation_id, variable=expected_var) - mock_conv_var_updater.flush.assert_called_once() + events = list(node.run()) + succeeded_event = next(event for event in events if isinstance(event, NodeRunSucceededEvent)) + updated_variables = common_helpers.get_updated_variables(succeeded_event.node_run_result.process_data) + assert updated_variables is not None + assert updated_variables[0].name == conversation_variable.name + assert updated_variables[0].new_value == input_variable.value got = variable_pool.get(["conversation", conversation_variable.name]) assert got is not None @@ -191,9 +182,6 @@ def test_append_variable_to_array(): ) graph = Graph.init(graph_config=graph_config, node_factory=node_factory) - mock_conv_var_updater = mock.Mock(spec=ConversationVariableUpdater) - mock_conv_var_updater_factory = mock.Mock(return_value=mock_conv_var_updater) - node_config = { "id": "node_id", "data": { @@ -209,22 +197,14 @@ def test_append_variable_to_array(): graph_init_params=init_params, graph_runtime_state=graph_runtime_state, config=node_config, - conv_var_updater_factory=mock_conv_var_updater_factory, ) - list(node.run()) - expected_value = list(conversation_variable.value) - expected_value.append(input_variable.value) - expected_var = ArrayStringVariable( - id=conversation_variable.id, - name=conversation_variable.name, - description=conversation_variable.description, - selector=conversation_variable.selector, - value_type=conversation_variable.value_type, - value=expected_value, - ) - mock_conv_var_updater.update.assert_called_once_with(conversation_id=conversation_id, variable=expected_var) - mock_conv_var_updater.flush.assert_called_once() + events = list(node.run()) + succeeded_event = next(event for event in events if isinstance(event, NodeRunSucceededEvent)) + updated_variables = common_helpers.get_updated_variables(succeeded_event.node_run_result.process_data) + assert updated_variables is not None + assert updated_variables[0].name == conversation_variable.name + assert updated_variables[0].new_value == ["the first value", "the second value"] got = variable_pool.get(["conversation", conversation_variable.name]) assert got is not None @@ -287,9 +267,6 @@ def test_clear_array(): ) graph = Graph.init(graph_config=graph_config, node_factory=node_factory) - mock_conv_var_updater = mock.Mock(spec=ConversationVariableUpdater) - mock_conv_var_updater_factory = mock.Mock(return_value=mock_conv_var_updater) - node_config = { "id": "node_id", "data": { @@ -305,20 +282,14 @@ def test_clear_array(): graph_init_params=init_params, graph_runtime_state=graph_runtime_state, config=node_config, - conv_var_updater_factory=mock_conv_var_updater_factory, ) - list(node.run()) - expected_var = ArrayStringVariable( - id=conversation_variable.id, - name=conversation_variable.name, - description=conversation_variable.description, - selector=conversation_variable.selector, - value_type=conversation_variable.value_type, - value=[], - ) - mock_conv_var_updater.update.assert_called_once_with(conversation_id=conversation_id, variable=expected_var) - mock_conv_var_updater.flush.assert_called_once() + events = list(node.run()) + succeeded_event = next(event for event in events if isinstance(event, NodeRunSucceededEvent)) + updated_variables = common_helpers.get_updated_variables(succeeded_event.node_run_result.process_data) + assert updated_variables is not None + assert updated_variables[0].name == conversation_variable.name + assert updated_variables[0].new_value == [] got = variable_pool.get(["conversation", conversation_variable.name]) assert got is not None diff --git a/api/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py b/api/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py index caa36734ad..353d56fe25 100644 --- a/api/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py +++ b/api/tests/unit_tests/core/workflow/nodes/variable_assigner/v2/test_variable_assigner_v2.py @@ -390,3 +390,42 @@ def test_remove_last_from_empty_array(): got = variable_pool.get(["conversation", conversation_variable.name]) assert got is not None assert got.to_object() == [] + + +def test_node_factory_creates_variable_assigner_node(): + graph_config = { + "edges": [], + "nodes": [ + { + "data": {"type": "assigner", "version": "2", "title": "Variable Assigner", "items": []}, + "id": "assigner", + }, + ], + } + + init_params = GraphInitParams( + tenant_id="1", + app_id="1", + workflow_id="1", + graph_config=graph_config, + user_id="1", + user_from=UserFrom.ACCOUNT, + invoke_from=InvokeFrom.DEBUGGER, + call_depth=0, + ) + variable_pool = VariablePool( + system_variables=SystemVariable(conversation_id="conversation_id"), + user_inputs={}, + environment_variables=[], + conversation_variables=[], + ) + graph_runtime_state = GraphRuntimeState(variable_pool=variable_pool, start_at=time.perf_counter()) + + node_factory = DifyNodeFactory( + graph_init_params=init_params, + graph_runtime_state=graph_runtime_state, + ) + + node = node_factory.create_node(graph_config["nodes"][0]) + + assert isinstance(node, VariableAssignerNode)