diff --git a/api/core/workflow/graph/__init__.py b/api/core/workflow/graph/__init__.py index 6bfed26c44..900e704b65 100644 --- a/api/core/workflow/graph/__init__.py +++ b/api/core/workflow/graph/__init__.py @@ -1,5 +1,16 @@ from .edge import Edge from .graph import Graph, NodeFactory +from .graph_runtime_state_protocol import ReadOnlyGraphRuntimeState, ReadOnlyVariablePool from .graph_template import GraphTemplate +from .read_only_state_wrapper import ReadOnlyGraphRuntimeStateWrapper, ReadOnlyVariablePoolWrapper -__all__ = ["Edge", "Graph", "GraphTemplate", "NodeFactory"] +__all__ = [ + "Edge", + "Graph", + "GraphTemplate", + "NodeFactory", + "ReadOnlyGraphRuntimeState", + "ReadOnlyVariablePool", + "ReadOnlyGraphRuntimeStateWrapper", + "ReadOnlyVariablePoolWrapper", +] diff --git a/api/core/workflow/graph/graph_runtime_state_protocol.py b/api/core/workflow/graph/graph_runtime_state_protocol.py new file mode 100644 index 0000000000..173076eb19 --- /dev/null +++ b/api/core/workflow/graph/graph_runtime_state_protocol.py @@ -0,0 +1,59 @@ +from typing import Any, Protocol + +from core.model_runtime.entities.llm_entities import LLMUsage + + +class ReadOnlyVariablePool(Protocol): + """Read-only interface for VariablePool.""" + + def get(self, node_id: str, variable_key: str) -> Any: + """Get a variable value (read-only).""" + ... + + def get_all_by_node(self, node_id: str) -> dict[str, Any]: + """Get all variables for a node (read-only).""" + ... + + +class ReadOnlyGraphRuntimeState(Protocol): + """ + Read-only view of GraphRuntimeState for layers. + + This protocol defines a read-only interface that prevents layers from + modifying the graph runtime state while still allowing observation. + All methods return defensive copies to ensure immutability. + """ + + @property + def variable_pool(self) -> ReadOnlyVariablePool: + """Get read-only access to the variable pool.""" + ... + + @property + def start_at(self) -> float: + """Get the start time (read-only).""" + ... + + @property + def total_tokens(self) -> int: + """Get the total tokens count (read-only).""" + ... + + @property + def llm_usage(self) -> LLMUsage: + """Get a copy of LLM usage info (read-only).""" + ... + + @property + def outputs(self) -> dict[str, Any]: + """Get a defensive copy of outputs (read-only).""" + ... + + @property + def node_run_steps(self) -> int: + """Get the node run steps count (read-only).""" + ... + + def get_output(self, key: str, default: Any = None) -> Any: + """Get a single output value (returns a copy).""" + ... \ No newline at end of file diff --git a/api/core/workflow/graph/read_only_state_wrapper.py b/api/core/workflow/graph/read_only_state_wrapper.py new file mode 100644 index 0000000000..f643baf5fc --- /dev/null +++ b/api/core/workflow/graph/read_only_state_wrapper.py @@ -0,0 +1,76 @@ +from copy import deepcopy +from typing import Any + +from core.model_runtime.entities.llm_entities import LLMUsage +from core.workflow.entities.graph_runtime_state import GraphRuntimeState +from core.workflow.entities.variable_pool import VariablePool + + +class ReadOnlyVariablePoolWrapper: + """Wrapper that provides read-only access to VariablePool.""" + + def __init__(self, variable_pool: VariablePool): + self._variable_pool = variable_pool + + def get(self, node_id: str, variable_key: str) -> Any: + """Get a variable value (returns a defensive copy).""" + value = self._variable_pool.get(node_id, variable_key) + return deepcopy(value) if value is not None else None + + def get_all_by_node(self, node_id: str) -> dict[str, Any]: + """Get all variables for a node (returns defensive copies).""" + variables = {} + if node_id in self._variable_pool.variable_dictionary: + for key, var in self._variable_pool.variable_dictionary[node_id].items(): + # FIXME(-LAN-): Handle the actual Variable object structure + value = var.value if hasattr(var, "value") else var + variables[key] = deepcopy(value) + return variables + + +class ReadOnlyGraphRuntimeStateWrapper: + """ + Wrapper that provides read-only access to GraphRuntimeState. + + This wrapper ensures that layers can observe the state without + modifying it. All returned values are defensive copies. + """ + + def __init__(self, state: GraphRuntimeState): + self._state = state + self._variable_pool_wrapper = ReadOnlyVariablePoolWrapper(state.variable_pool) + + @property + def variable_pool(self) -> ReadOnlyVariablePoolWrapper: + """Get read-only access to the variable pool.""" + return self._variable_pool_wrapper + + @property + def start_at(self) -> float: + """Get the start time (read-only).""" + return self._state.start_at + + @property + def total_tokens(self) -> int: + """Get the total tokens count (read-only).""" + return self._state.total_tokens + + @property + def llm_usage(self) -> LLMUsage: + """Get a copy of LLM usage info (read-only).""" + # Return a copy to prevent modification + return self._state.llm_usage.model_copy() + + @property + def outputs(self) -> dict[str, Any]: + """Get a defensive copy of outputs (read-only).""" + return deepcopy(self._state.outputs) + + @property + def node_run_steps(self) -> int: + """Get the node run steps count (read-only).""" + return self._state.node_run_steps + + def get_output(self, key: str, default: Any = None) -> Any: + """Get a single output value (returns a copy).""" + return self._state.get_output(key, default) \ No newline at end of file diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py index 7fd2825020..fbb63dff8b 100644 --- a/api/core/workflow/graph_engine/graph_engine.py +++ b/api/core/workflow/graph_engine/graph_engine.py @@ -16,6 +16,7 @@ from flask import Flask, current_app from core.app.entities.app_invoke_entities import InvokeFrom from core.workflow.entities import GraphRuntimeState from core.workflow.enums import NodeExecutionType +from core.workflow.graph.read_only_state_wrapper import ReadOnlyGraphRuntimeStateWrapper from core.workflow.graph import Graph from core.workflow.graph_events import ( GraphEngineEvent, @@ -288,9 +289,11 @@ class GraphEngine: def _initialize_layers(self) -> None: """Initialize layers with context.""" self._event_manager.set_layers(self._layers) + # Create a read-only wrapper for the runtime state + read_only_state = ReadOnlyGraphRuntimeStateWrapper(self._graph_runtime_state) for layer in self._layers: try: - layer.initialize(self._graph_runtime_state, self._command_channel) + layer.initialize(read_only_state, self._command_channel) except Exception as e: logger.warning("Failed to initialize layer %s: %s", layer.__class__.__name__, e) diff --git a/api/core/workflow/graph_engine/layers/base.py b/api/core/workflow/graph_engine/layers/base.py index 9899d46016..dfac49e11a 100644 --- a/api/core/workflow/graph_engine/layers/base.py +++ b/api/core/workflow/graph_engine/layers/base.py @@ -7,7 +7,7 @@ intercept and respond to GraphEngine events. from abc import ABC, abstractmethod -from core.workflow.entities import GraphRuntimeState +from core.workflow.graph.graph_runtime_state_protocol import ReadOnlyGraphRuntimeState from core.workflow.graph_engine.protocols.command_channel import CommandChannel from core.workflow.graph_events import GraphEngineEvent @@ -27,19 +27,19 @@ class GraphEngineLayer(ABC): def __init__(self) -> None: """Initialize the layer. Subclasses can override with custom parameters.""" - self.graph_runtime_state: GraphRuntimeState | None = None + self.graph_runtime_state: ReadOnlyGraphRuntimeState | None = None self.command_channel: CommandChannel | None = None - def initialize(self, graph_runtime_state: GraphRuntimeState, command_channel: CommandChannel) -> None: + def initialize(self, graph_runtime_state: ReadOnlyGraphRuntimeState, command_channel: CommandChannel) -> None: """ Initialize the layer with engine dependencies. - Called by GraphEngine before execution starts to inject the runtime state - and command channel. This allows layers to access engine context and send - commands. + Called by GraphEngine before execution starts to inject the read-only runtime state + and command channel. This allows layers to observe engine context and send + commands, but prevents direct state modification. Args: - graph_runtime_state: The runtime state of the graph execution + graph_runtime_state: Read-only view of the runtime state command_channel: Channel for sending commands to the engine """ self.graph_runtime_state = graph_runtime_state