From d76ad15fca5ea6a1ecc1beada8cecda231f2ebbb Mon Sep 17 00:00:00 2001 From: -LAN- Date: Wed, 28 Jan 2026 00:54:21 +0800 Subject: [PATCH] =?UTF-8?q?refactor(graph=5Fengine):=20move=20observabilit?= =?UTF-8?q?y=20layer=20and=20persistence=20laye=E2=80=A6=20(#31620)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- api/.importlinter | 6 +--- api/core/app/apps/advanced_chat/app_runner.py | 2 +- api/core/app/apps/pipeline/pipeline_runner.py | 2 +- api/core/app/apps/workflow/app_runner.py | 2 +- api/core/app/workflow/layers/__init__.py | 10 ++++++ .../workflow}/layers/observability.py | 0 .../workflow}/layers/persistence.py | 4 ++- .../workflow/graph_engine/layers/__init__.py | 2 -- api/core/workflow/workflow_entry.py | 3 +- .../workflow/graph_engine/layers/conftest.py | 4 +-- .../graph_engine/layers/test_observability.py | 32 +++++++++---------- 11 files changed, 37 insertions(+), 30 deletions(-) create mode 100644 api/core/app/workflow/layers/__init__.py rename api/core/{workflow/graph_engine => app/workflow}/layers/observability.py (100%) rename api/core/{workflow/graph_engine => app/workflow}/layers/persistence.py (99%) diff --git a/api/.importlinter b/api/.importlinter index b676e97591..cc7ffc15c8 100644 --- a/api/.importlinter +++ b/api/.importlinter @@ -104,9 +104,7 @@ forbidden_modules = ignore_imports = core.workflow.nodes.loop.loop_node -> core.app.workflow.node_factory core.workflow.graph_engine.command_channels.redis_channel -> extensions.ext_redis - core.workflow.graph_engine.layers.observability -> configs - core.workflow.graph_engine.layers.observability -> extensions.otel.runtime - core.workflow.graph_engine.layers.persistence -> core.ops.ops_trace_manager + core.workflow.workflow_entry -> core.app.workflow.layers.observability core.workflow.graph_engine.worker_management.worker_pool -> configs core.workflow.nodes.agent.agent_node -> core.model_manager core.workflow.nodes.agent.agent_node -> core.provider_manager @@ -147,7 +145,6 @@ ignore_imports = core.workflow.workflow_entry -> models.workflow core.workflow.nodes.agent.agent_node -> core.agent.entities core.workflow.nodes.agent.agent_node -> core.agent.plugin_entities - core.workflow.graph_engine.layers.persistence -> core.app.entities.app_invoke_entities core.workflow.nodes.base.node -> core.app.entities.app_invoke_entities core.workflow.nodes.knowledge_index.knowledge_index_node -> core.app.entities.app_invoke_entities core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> core.app.app_config.entities @@ -217,7 +214,6 @@ ignore_imports = core.workflow.nodes.llm.node -> core.llm_generator.output_parser.errors core.workflow.nodes.llm.node -> core.llm_generator.output_parser.structured_output core.workflow.nodes.llm.node -> core.model_manager - core.workflow.graph_engine.layers.persistence -> core.ops.entities.trace_entity core.workflow.nodes.agent.entities -> core.prompt.entities.advanced_prompt_entities core.workflow.nodes.knowledge_retrieval.knowledge_retrieval_node -> core.prompt.simple_prompt_transform core.workflow.nodes.llm.entities -> core.prompt.entities.advanced_prompt_entities diff --git a/api/core/app/apps/advanced_chat/app_runner.py b/api/core/app/apps/advanced_chat/app_runner.py index a258144d35..d702db0908 100644 --- a/api/core/app/apps/advanced_chat/app_runner.py +++ b/api/core/app/apps/advanced_chat/app_runner.py @@ -21,6 +21,7 @@ from core.app.entities.queue_entities import ( ) from core.app.features.annotation_reply.annotation_reply import AnnotationReplyFeature from core.app.layers.conversation_variable_persist_layer import ConversationVariablePersistenceLayer +from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer from core.db.session_factory import session_factory from core.moderation.base import ModerationError from core.moderation.input_moderation import InputModeration @@ -28,7 +29,6 @@ from core.variables.variables import Variable from core.workflow.enums import WorkflowType from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel from core.workflow.graph_engine.layers.base import GraphEngineLayer -from core.workflow.graph_engine.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.runtime import GraphRuntimeState, VariablePool diff --git a/api/core/app/apps/pipeline/pipeline_runner.py b/api/core/app/apps/pipeline/pipeline_runner.py index 34d02a1e51..8ea34344b2 100644 --- a/api/core/app/apps/pipeline/pipeline_runner.py +++ b/api/core/app/apps/pipeline/pipeline_runner.py @@ -9,12 +9,12 @@ from core.app.entities.app_invoke_entities import ( InvokeFrom, RagPipelineGenerateEntity, ) +from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer from core.app.workflow.node_factory import DifyNodeFactory from core.variables.variables import RAGPipelineVariable, RAGPipelineVariableInput from core.workflow.entities.graph_init_params import GraphInitParams from core.workflow.enums import WorkflowType from core.workflow.graph import Graph -from core.workflow.graph_engine.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer from core.workflow.graph_events import GraphEngineEvent, GraphRunFailedEvent from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository diff --git a/api/core/app/apps/workflow/app_runner.py b/api/core/app/apps/workflow/app_runner.py index 8dbdc1d58c..0ee3c177f2 100644 --- a/api/core/app/apps/workflow/app_runner.py +++ b/api/core/app/apps/workflow/app_runner.py @@ -7,10 +7,10 @@ from core.app.apps.base_app_queue_manager import AppQueueManager from core.app.apps.workflow.app_config_manager import WorkflowAppConfig from core.app.apps.workflow_app_runner import WorkflowBasedAppRunner from core.app.entities.app_invoke_entities import InvokeFrom, WorkflowAppGenerateEntity +from core.app.workflow.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer from core.workflow.enums import WorkflowType from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel from core.workflow.graph_engine.layers.base import GraphEngineLayer -from core.workflow.graph_engine.layers.persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository from core.workflow.runtime import GraphRuntimeState, VariablePool diff --git a/api/core/app/workflow/layers/__init__.py b/api/core/app/workflow/layers/__init__.py new file mode 100644 index 0000000000..945f75303c --- /dev/null +++ b/api/core/app/workflow/layers/__init__.py @@ -0,0 +1,10 @@ +"""Workflow-level GraphEngine layers that depend on outer infrastructure.""" + +from .observability import ObservabilityLayer +from .persistence import PersistenceWorkflowInfo, WorkflowPersistenceLayer + +__all__ = [ + "ObservabilityLayer", + "PersistenceWorkflowInfo", + "WorkflowPersistenceLayer", +] diff --git a/api/core/workflow/graph_engine/layers/observability.py b/api/core/app/workflow/layers/observability.py similarity index 100% rename from api/core/workflow/graph_engine/layers/observability.py rename to api/core/app/workflow/layers/observability.py diff --git a/api/core/workflow/graph_engine/layers/persistence.py b/api/core/app/workflow/layers/persistence.py similarity index 99% rename from api/core/workflow/graph_engine/layers/persistence.py rename to api/core/app/workflow/layers/persistence.py index e81df4f3b7..41052b4f52 100644 --- a/api/core/workflow/graph_engine/layers/persistence.py +++ b/api/core/app/workflow/layers/persistence.py @@ -45,7 +45,6 @@ from core.workflow.graph_events import ( from core.workflow.node_events import NodeRunResult from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from core.workflow.repositories.workflow_node_execution_repository import WorkflowNodeExecutionRepository -from core.workflow.workflow_entry import WorkflowEntry from libs.datetime_utils import naive_utc_now @@ -316,6 +315,9 @@ class WorkflowPersistenceLayer(GraphEngineLayer): # workflow inputs stay reusable without binding future runs to this conversation. continue inputs[f"sys.{field_name}"] = value + # Local import to avoid circular dependency during app bootstrapping. + from core.workflow.workflow_entry import WorkflowEntry + handled = WorkflowEntry.handle_special_values(inputs) return handled or {} diff --git a/api/core/workflow/graph_engine/layers/__init__.py b/api/core/workflow/graph_engine/layers/__init__.py index 772433e48c..0a29a52993 100644 --- a/api/core/workflow/graph_engine/layers/__init__.py +++ b/api/core/workflow/graph_engine/layers/__init__.py @@ -8,11 +8,9 @@ with middleware-like components that can observe events and interact with execut from .base import GraphEngineLayer from .debug_logging import DebugLoggingLayer from .execution_limits import ExecutionLimitsLayer -from .observability import ObservabilityLayer __all__ = [ "DebugLoggingLayer", "ExecutionLimitsLayer", "GraphEngineLayer", - "ObservabilityLayer", ] diff --git a/api/core/workflow/workflow_entry.py b/api/core/workflow/workflow_entry.py index c7bcc66c8b..a325afb9fc 100644 --- a/api/core/workflow/workflow_entry.py +++ b/api/core/workflow/workflow_entry.py @@ -7,6 +7,7 @@ from typing import Any from configs import dify_config from core.app.apps.exc import GenerateTaskStoppedError from core.app.entities.app_invoke_entities import InvokeFrom +from core.app.workflow.layers.observability import ObservabilityLayer from core.app.workflow.node_factory import DifyNodeFactory from core.file.models import File from core.workflow.constants import ENVIRONMENT_VARIABLE_NODE_ID @@ -15,7 +16,7 @@ from core.workflow.errors import WorkflowNodeRunFailedError from core.workflow.graph import Graph from core.workflow.graph_engine import GraphEngine from core.workflow.graph_engine.command_channels import InMemoryChannel -from core.workflow.graph_engine.layers import DebugLoggingLayer, ExecutionLimitsLayer, ObservabilityLayer +from core.workflow.graph_engine.layers import DebugLoggingLayer, ExecutionLimitsLayer from core.workflow.graph_engine.protocols.command_channel import CommandChannel from core.workflow.graph_events import GraphEngineEvent, GraphNodeEventBase, GraphRunFailedEvent from core.workflow.nodes import NodeType diff --git a/api/tests/unit_tests/core/workflow/graph_engine/layers/conftest.py b/api/tests/unit_tests/core/workflow/graph_engine/layers/conftest.py index 51da3b7d73..35a234be0b 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/layers/conftest.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/layers/conftest.py @@ -90,14 +90,14 @@ def mock_tool_node(): @pytest.fixture def mock_is_instrument_flag_enabled_false(): """Mock is_instrument_flag_enabled to return False.""" - with patch("core.workflow.graph_engine.layers.observability.is_instrument_flag_enabled", return_value=False): + with patch("core.app.workflow.layers.observability.is_instrument_flag_enabled", return_value=False): yield @pytest.fixture def mock_is_instrument_flag_enabled_true(): """Mock is_instrument_flag_enabled to return True.""" - with patch("core.workflow.graph_engine.layers.observability.is_instrument_flag_enabled", return_value=True): + with patch("core.app.workflow.layers.observability.is_instrument_flag_enabled", return_value=True): yield diff --git a/api/tests/unit_tests/core/workflow/graph_engine/layers/test_observability.py b/api/tests/unit_tests/core/workflow/graph_engine/layers/test_observability.py index 8cc080fe94..ade846df28 100644 --- a/api/tests/unit_tests/core/workflow/graph_engine/layers/test_observability.py +++ b/api/tests/unit_tests/core/workflow/graph_engine/layers/test_observability.py @@ -15,14 +15,14 @@ from unittest.mock import patch import pytest from opentelemetry.trace import StatusCode +from core.app.workflow.layers.observability import ObservabilityLayer from core.workflow.enums import NodeType -from core.workflow.graph_engine.layers.observability import ObservabilityLayer class TestObservabilityLayerInitialization: """Test ObservabilityLayer initialization logic.""" - @patch("core.workflow.graph_engine.layers.observability.dify_config.ENABLE_OTEL", True) + @patch("core.app.workflow.layers.observability.dify_config.ENABLE_OTEL", True) @pytest.mark.usefixtures("mock_is_instrument_flag_enabled_false") def test_initialization_when_otel_enabled(self, tracer_provider_with_memory_exporter): """Test that layer initializes correctly when OTel is enabled.""" @@ -32,7 +32,7 @@ class TestObservabilityLayerInitialization: assert NodeType.TOOL in layer._parsers assert layer._default_parser is not None - @patch("core.workflow.graph_engine.layers.observability.dify_config.ENABLE_OTEL", False) + @patch("core.app.workflow.layers.observability.dify_config.ENABLE_OTEL", False) @pytest.mark.usefixtures("mock_is_instrument_flag_enabled_true") def test_initialization_when_instrument_flag_enabled(self, tracer_provider_with_memory_exporter): """Test that layer enables when instrument flag is enabled.""" @@ -46,7 +46,7 @@ class TestObservabilityLayerInitialization: class TestObservabilityLayerNodeSpanLifecycle: """Test node span creation and lifecycle management.""" - @patch("core.workflow.graph_engine.layers.observability.dify_config.ENABLE_OTEL", True) + @patch("core.app.workflow.layers.observability.dify_config.ENABLE_OTEL", True) @pytest.mark.usefixtures("mock_is_instrument_flag_enabled_false") def test_node_span_created_and_ended( self, tracer_provider_with_memory_exporter, memory_span_exporter, mock_llm_node @@ -63,7 +63,7 @@ class TestObservabilityLayerNodeSpanLifecycle: assert spans[0].name == mock_llm_node.title assert spans[0].status.status_code == StatusCode.OK - @patch("core.workflow.graph_engine.layers.observability.dify_config.ENABLE_OTEL", True) + @patch("core.app.workflow.layers.observability.dify_config.ENABLE_OTEL", True) @pytest.mark.usefixtures("mock_is_instrument_flag_enabled_false") def test_node_error_recorded_in_span( self, tracer_provider_with_memory_exporter, memory_span_exporter, mock_llm_node @@ -82,7 +82,7 @@ class TestObservabilityLayerNodeSpanLifecycle: assert len(spans[0].events) > 0 assert any("exception" in event.name.lower() for event in spans[0].events) - @patch("core.workflow.graph_engine.layers.observability.dify_config.ENABLE_OTEL", True) + @patch("core.app.workflow.layers.observability.dify_config.ENABLE_OTEL", True) @pytest.mark.usefixtures("mock_is_instrument_flag_enabled_false") def test_node_end_without_start_handled_gracefully( self, tracer_provider_with_memory_exporter, memory_span_exporter, mock_llm_node @@ -100,7 +100,7 @@ class TestObservabilityLayerNodeSpanLifecycle: class TestObservabilityLayerParserIntegration: """Test parser integration for different node types.""" - @patch("core.workflow.graph_engine.layers.observability.dify_config.ENABLE_OTEL", True) + @patch("core.app.workflow.layers.observability.dify_config.ENABLE_OTEL", True) @pytest.mark.usefixtures("mock_is_instrument_flag_enabled_false") def test_default_parser_used_for_regular_node( self, tracer_provider_with_memory_exporter, memory_span_exporter, mock_start_node @@ -119,7 +119,7 @@ class TestObservabilityLayerParserIntegration: assert attrs["node.execution_id"] == mock_start_node.execution_id assert attrs["node.type"] == mock_start_node.node_type.value - @patch("core.workflow.graph_engine.layers.observability.dify_config.ENABLE_OTEL", True) + @patch("core.app.workflow.layers.observability.dify_config.ENABLE_OTEL", True) @pytest.mark.usefixtures("mock_is_instrument_flag_enabled_false") def test_tool_parser_used_for_tool_node( self, tracer_provider_with_memory_exporter, memory_span_exporter, mock_tool_node @@ -138,7 +138,7 @@ class TestObservabilityLayerParserIntegration: assert attrs["gen_ai.tool.name"] == mock_tool_node.title assert attrs["gen_ai.tool.type"] == mock_tool_node._node_data.provider_type.value - @patch("core.workflow.graph_engine.layers.observability.dify_config.ENABLE_OTEL", True) + @patch("core.app.workflow.layers.observability.dify_config.ENABLE_OTEL", True) @pytest.mark.usefixtures("mock_is_instrument_flag_enabled_false") def test_llm_parser_used_for_llm_node( self, tracer_provider_with_memory_exporter, memory_span_exporter, mock_llm_node, mock_result_event @@ -176,7 +176,7 @@ class TestObservabilityLayerParserIntegration: assert attrs["gen_ai.completion"] == "test completion" assert attrs["gen_ai.response.finish_reason"] == "stop" - @patch("core.workflow.graph_engine.layers.observability.dify_config.ENABLE_OTEL", True) + @patch("core.app.workflow.layers.observability.dify_config.ENABLE_OTEL", True) @pytest.mark.usefixtures("mock_is_instrument_flag_enabled_false") def test_retrieval_parser_used_for_retrieval_node( self, tracer_provider_with_memory_exporter, memory_span_exporter, mock_retrieval_node, mock_result_event @@ -204,7 +204,7 @@ class TestObservabilityLayerParserIntegration: assert attrs["retrieval.query"] == "test query" assert "retrieval.document" in attrs - @patch("core.workflow.graph_engine.layers.observability.dify_config.ENABLE_OTEL", True) + @patch("core.app.workflow.layers.observability.dify_config.ENABLE_OTEL", True) @pytest.mark.usefixtures("mock_is_instrument_flag_enabled_false") def test_result_event_extracts_inputs_and_outputs( self, tracer_provider_with_memory_exporter, memory_span_exporter, mock_start_node, mock_result_event @@ -235,7 +235,7 @@ class TestObservabilityLayerParserIntegration: class TestObservabilityLayerGraphLifecycle: """Test graph lifecycle management.""" - @patch("core.workflow.graph_engine.layers.observability.dify_config.ENABLE_OTEL", True) + @patch("core.app.workflow.layers.observability.dify_config.ENABLE_OTEL", True) @pytest.mark.usefixtures("mock_is_instrument_flag_enabled_false") def test_on_graph_start_clears_contexts(self, tracer_provider_with_memory_exporter, mock_llm_node): """Test that on_graph_start clears node contexts.""" @@ -248,7 +248,7 @@ class TestObservabilityLayerGraphLifecycle: layer.on_graph_start() assert len(layer._node_contexts) == 0 - @patch("core.workflow.graph_engine.layers.observability.dify_config.ENABLE_OTEL", True) + @patch("core.app.workflow.layers.observability.dify_config.ENABLE_OTEL", True) @pytest.mark.usefixtures("mock_is_instrument_flag_enabled_false") def test_on_graph_end_with_no_unfinished_spans( self, tracer_provider_with_memory_exporter, memory_span_exporter, mock_llm_node @@ -264,7 +264,7 @@ class TestObservabilityLayerGraphLifecycle: spans = memory_span_exporter.get_finished_spans() assert len(spans) == 1 - @patch("core.workflow.graph_engine.layers.observability.dify_config.ENABLE_OTEL", True) + @patch("core.app.workflow.layers.observability.dify_config.ENABLE_OTEL", True) @pytest.mark.usefixtures("mock_is_instrument_flag_enabled_false") def test_on_graph_end_with_unfinished_spans_logs_warning( self, tracer_provider_with_memory_exporter, mock_llm_node, caplog @@ -285,7 +285,7 @@ class TestObservabilityLayerGraphLifecycle: class TestObservabilityLayerDisabledMode: """Test behavior when layer is disabled.""" - @patch("core.workflow.graph_engine.layers.observability.dify_config.ENABLE_OTEL", False) + @patch("core.app.workflow.layers.observability.dify_config.ENABLE_OTEL", False) @pytest.mark.usefixtures("mock_is_instrument_flag_enabled_false") def test_disabled_mode_skips_node_start(self, memory_span_exporter, mock_start_node): """Test that disabled layer doesn't create spans on node start.""" @@ -299,7 +299,7 @@ class TestObservabilityLayerDisabledMode: spans = memory_span_exporter.get_finished_spans() assert len(spans) == 0 - @patch("core.workflow.graph_engine.layers.observability.dify_config.ENABLE_OTEL", False) + @patch("core.app.workflow.layers.observability.dify_config.ENABLE_OTEL", False) @pytest.mark.usefixtures("mock_is_instrument_flag_enabled_false") def test_disabled_mode_skips_node_end(self, memory_span_exporter, mock_llm_node): """Test that disabled layer doesn't process node end."""