diff --git a/api/core/workflow/graph_engine/command_channels/in_memory_channel.py b/api/core/workflow/graph_engine/command_channels/in_memory_channel.py
index ef498e6890..bdaf236796 100644
--- a/api/core/workflow/graph_engine/command_channels/in_memory_channel.py
+++ b/api/core/workflow/graph_engine/command_channels/in_memory_channel.py
@@ -6,10 +6,12 @@ within a single process. Each instance handles commands for one workflow executi
 """
 from queue import Queue
+from typing import final
 from ..entities.commands import GraphEngineCommand
+@final
 class InMemoryChannel:
     """
     In-memory command channel implementation using a thread-safe queue.
diff --git a/api/core/workflow/graph_engine/command_channels/redis_channel.py b/api/core/workflow/graph_engine/command_channels/redis_channel.py
index 0350044690..7809e43e32 100644
--- a/api/core/workflow/graph_engine/command_channels/redis_channel.py
+++ b/api/core/workflow/graph_engine/command_channels/redis_channel.py
@@ -7,7 +7,7 @@ Each instance uses a unique key for its command queue.
 """
 import json
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, final
 from ..entities.commands import AbortCommand, CommandType, GraphEngineCommand
@@ -15,6 +15,7 @@ if TYPE_CHECKING:
     from extensions.ext_redis import RedisClientWrapper
+@final
 class RedisChannel:
     """
     Redis-based command channel implementation for distributed systems.
diff --git a/api/core/workflow/graph_engine/command_processing/command_handlers.py b/api/core/workflow/graph_engine/command_processing/command_handlers.py
index f8bae5e21a..9f8d20b1b9 100644
--- a/api/core/workflow/graph_engine/command_processing/command_handlers.py
+++ b/api/core/workflow/graph_engine/command_processing/command_handlers.py
@@ -3,6 +3,7 @@ Command handler implementations.
 """
 import logging
+from typing import final
 from ..domain.graph_execution import GraphExecution
 from ..entities.commands import AbortCommand, GraphEngineCommand
@@ -11,6 +12,7 @@ from .command_processor import CommandHandler
 logger = logging.getLogger(__name__)
+@final
 class AbortCommandHandler(CommandHandler):
     """Handles abort commands."""
diff --git a/api/core/workflow/graph_engine/command_processing/command_processor.py b/api/core/workflow/graph_engine/command_processing/command_processor.py
index 06b3a8d8a4..2521058ef2 100644
--- a/api/core/workflow/graph_engine/command_processing/command_processor.py
+++ b/api/core/workflow/graph_engine/command_processing/command_processor.py
@@ -3,7 +3,7 @@ Main command processor for handling external commands.
 """
 import logging
-from typing import Protocol
+from typing import Protocol, final
 from ..domain.graph_execution import GraphExecution
 from ..entities.commands import GraphEngineCommand
@@ -18,6 +18,7 @@ class CommandHandler(Protocol):
     def handle(self, command: GraphEngineCommand, execution: GraphExecution) -> None: ...
+@final
 class CommandProcessor:
     """
     Processes external commands sent to the engine.
diff --git a/api/core/workflow/graph_engine/error_handling/abort_strategy.py b/api/core/workflow/graph_engine/error_handling/abort_strategy.py
index ba2a7b514f..6a805bd124 100644
--- a/api/core/workflow/graph_engine/error_handling/abort_strategy.py
+++ b/api/core/workflow/graph_engine/error_handling/abort_strategy.py
@@ -3,6 +3,7 @@ Abort error strategy implementation.
""" import logging +from typing import final from core.workflow.graph import Graph from core.workflow.graph_events import GraphNodeEventBase, NodeRunFailedEvent @@ -10,6 +11,7 @@ from core.workflow.graph_events import GraphNodeEventBase, NodeRunFailedEvent logger = logging.getLogger(__name__) +@final class AbortStrategy: """ Error strategy that aborts execution on failure. diff --git a/api/core/workflow/graph_engine/error_handling/default_value_strategy.py b/api/core/workflow/graph_engine/error_handling/default_value_strategy.py index 19e5b4c0ae..61d36399aa 100644 --- a/api/core/workflow/graph_engine/error_handling/default_value_strategy.py +++ b/api/core/workflow/graph_engine/error_handling/default_value_strategy.py @@ -2,12 +2,15 @@ Default value error strategy implementation. """ +from typing import final + from core.workflow.enums import ErrorStrategy, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus from core.workflow.graph import Graph from core.workflow.graph_events import GraphNodeEventBase, NodeRunExceptionEvent, NodeRunFailedEvent from core.workflow.node_events import NodeRunResult +@final class DefaultValueStrategy: """ Error strategy that uses default values on failure. diff --git a/api/core/workflow/graph_engine/error_handling/fail_branch_strategy.py b/api/core/workflow/graph_engine/error_handling/fail_branch_strategy.py index d3be64539e..437c2bc7da 100644 --- a/api/core/workflow/graph_engine/error_handling/fail_branch_strategy.py +++ b/api/core/workflow/graph_engine/error_handling/fail_branch_strategy.py @@ -2,12 +2,15 @@ Fail branch error strategy implementation. """ +from typing import final + from core.workflow.enums import ErrorStrategy, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus from core.workflow.graph import Graph from core.workflow.graph_events import GraphNodeEventBase, NodeRunExceptionEvent, NodeRunFailedEvent from core.workflow.node_events import NodeRunResult +@final class FailBranchStrategy: """ Error strategy that continues execution via a fail branch. diff --git a/api/core/workflow/graph_engine/error_handling/retry_strategy.py b/api/core/workflow/graph_engine/error_handling/retry_strategy.py index af1058569f..e4010b6bdb 100644 --- a/api/core/workflow/graph_engine/error_handling/retry_strategy.py +++ b/api/core/workflow/graph_engine/error_handling/retry_strategy.py @@ -3,11 +3,13 @@ Retry error strategy implementation. """ import time +from typing import final from core.workflow.graph import Graph from core.workflow.graph_events import GraphNodeEventBase, NodeRunFailedEvent, NodeRunRetryEvent +@final class RetryStrategy: """ Error strategy that retries failed nodes. diff --git a/api/core/workflow/graph_engine/event_management/event_collector.py b/api/core/workflow/graph_engine/event_management/event_collector.py index 20c1419324..a41dcf5b10 100644 --- a/api/core/workflow/graph_engine/event_management/event_collector.py +++ b/api/core/workflow/graph_engine/event_management/event_collector.py @@ -3,12 +3,14 @@ Event collector for buffering and managing events. 
""" import threading +from typing import final from core.workflow.graph_events import GraphEngineEvent from ..layers.base import Layer +@final class ReadWriteLock: """ A read-write lock implementation that allows multiple concurrent readers @@ -56,6 +58,7 @@ class ReadWriteLock: return WriteLockContext(self) +@final class ReadLockContext: """Context manager for read locks.""" @@ -70,6 +73,7 @@ class ReadLockContext: self._lock.release_read() +@final class WriteLockContext: """Context manager for write locks.""" @@ -84,6 +88,7 @@ class WriteLockContext: self._lock.release_write() +@final class EventCollector: """ Collects and buffers events for later retrieval. diff --git a/api/core/workflow/graph_engine/event_management/event_emitter.py b/api/core/workflow/graph_engine/event_management/event_emitter.py index 36f9d5d5a2..6fb0b96e8c 100644 --- a/api/core/workflow/graph_engine/event_management/event_emitter.py +++ b/api/core/workflow/graph_engine/event_management/event_emitter.py @@ -5,12 +5,14 @@ Event emitter for yielding events to external consumers. import threading import time from collections.abc import Generator +from typing import final from core.workflow.graph_events import GraphEngineEvent from .event_collector import EventCollector +@final class EventEmitter: """ Emits collected events as a generator for external consumption. diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py index df194de909..dd98536fba 100644 --- a/api/core/workflow/graph_engine/graph_engine.py +++ b/api/core/workflow/graph_engine/graph_engine.py @@ -9,7 +9,7 @@ import contextvars import logging import queue from collections.abc import Generator, Mapping -from typing import Any +from typing import final from flask import Flask, current_app @@ -45,6 +45,7 @@ from .worker_management import ActivityTracker, DynamicScaler, WorkerFactory, Wo logger = logging.getLogger(__name__) +@final class GraphEngine: """ Queue-based graph execution engine. 
@@ -63,7 +64,7 @@ class GraphEngine:
         invoke_from: InvokeFrom,
         call_depth: int,
         graph: Graph,
-        graph_config: Mapping[str, Any],
+        graph_config: Mapping[str, object],
         graph_runtime_state: GraphRuntimeState,
         max_execution_steps: int,
         max_execution_time: int,
@@ -186,7 +187,7 @@ class GraphEngine:
             event_handler=self.event_handler_registry,
             event_collector=self.event_collector,
             command_processor=self.command_processor,
-            worker_pool=self.worker_pool,
+            worker_pool=self._worker_pool,
         )
         self.dispatcher = Dispatcher(
@@ -219,8 +220,8 @@ class GraphEngine:
         context_vars = contextvars.copy_context()
         # Create worker management components
-        self.activity_tracker = ActivityTracker()
-        self.dynamic_scaler = DynamicScaler(
+        self._activity_tracker = ActivityTracker()
+        self._dynamic_scaler = DynamicScaler(
             min_workers=(self._min_workers if self._min_workers is not None else dify_config.GRAPH_ENGINE_MIN_WORKERS),
             max_workers=(self._max_workers if self._max_workers is not None else dify_config.GRAPH_ENGINE_MAX_WORKERS),
             scale_up_threshold=(
                 self._scale_up_threshold if self._scale_up_threshold is not None else dify_config.GRAPH_ENGINE_SCALE_UP_THRESHOLD
             ),
             scale_down_idle_time=(
                 self._scale_down_idle_time if self._scale_down_idle_time is not None else dify_config.GRAPH_ENGINE_SCALE_DOWN_IDLE_TIME
             ),
         )
-        self.worker_factory = WorkerFactory(flask_app, context_vars)
+        self._worker_factory = WorkerFactory(flask_app, context_vars)
-        self.worker_pool = WorkerPool(
+        self._worker_pool = WorkerPool(
             ready_queue=self.ready_queue,
             event_queue=self.event_queue,
             graph=self.graph,
-            worker_factory=self.worker_factory,
-            dynamic_scaler=self.dynamic_scaler,
-            activity_tracker=self.activity_tracker,
+            worker_factory=self._worker_factory,
+            dynamic_scaler=self._dynamic_scaler,
+            activity_tracker=self._activity_tracker,
         )
     def _validate_graph_state_consistency(self) -> None:
@@ -320,10 +321,10 @@ class GraphEngine:
     def _start_execution(self) -> None:
         """Start execution subsystems."""
         # Calculate initial worker count
-        initial_workers = self.dynamic_scaler.calculate_initial_workers(self.graph)
+        initial_workers = self._dynamic_scaler.calculate_initial_workers(self.graph)
         # Start worker pool
-        self.worker_pool.start(initial_workers)
+        self._worker_pool.start(initial_workers)
         # Register response nodes
         for node in self.graph.nodes.values():
@@ -341,7 +342,7 @@ class GraphEngine:
     def _stop_execution(self) -> None:
         """Stop execution subsystems."""
         self.dispatcher.stop()
-        self.worker_pool.stop()
+        self._worker_pool.stop()
         # Don't mark complete here as the dispatcher already does it
         # Notify layers
diff --git a/api/core/workflow/graph_engine/graph_traversal/branch_handler.py b/api/core/workflow/graph_engine/graph_traversal/branch_handler.py
index 4bc3bd1e0a..b371f3bc73 100644
--- a/api/core/workflow/graph_engine/graph_traversal/branch_handler.py
+++ b/api/core/workflow/graph_engine/graph_traversal/branch_handler.py
@@ -3,6 +3,7 @@ Branch node handling for graph traversal.
 """
 from collections.abc import Sequence
+from typing import final
 from core.workflow.graph import Graph
 from core.workflow.graph_events.node import NodeRunStreamChunkEvent
@@ -12,6 +13,7 @@ from .edge_processor import EdgeProcessor
 from .skip_propagator import SkipPropagator
+@final
 class BranchHandler:
     """
     Handles branch node logic during graph traversal.
diff --git a/api/core/workflow/graph_engine/graph_traversal/edge_processor.py b/api/core/workflow/graph_engine/graph_traversal/edge_processor.py
index 76e6d819bf..ac2c658b4b 100644
--- a/api/core/workflow/graph_engine/graph_traversal/edge_processor.py
+++ b/api/core/workflow/graph_engine/graph_traversal/edge_processor.py
@@ -3,6 +3,7 @@ Edge processing logic for graph traversal.
 """
 from collections.abc import Sequence
+from typing import final
 from core.workflow.enums import NodeExecutionType
 from core.workflow.graph import Edge, Graph
@@ -12,6 +13,7 @@ from ..response_coordinator import ResponseStreamCoordinator
 from ..state_management import EdgeStateManager, NodeStateManager
+@final
 class EdgeProcessor:
     """
     Processes edges during graph execution.
diff --git a/api/core/workflow/graph_engine/graph_traversal/node_readiness.py b/api/core/workflow/graph_engine/graph_traversal/node_readiness.py
index 29e74e2f3f..59bce3942c 100644
--- a/api/core/workflow/graph_engine/graph_traversal/node_readiness.py
+++ b/api/core/workflow/graph_engine/graph_traversal/node_readiness.py
@@ -2,10 +2,13 @@ Node readiness checking for execution.
 """
+from typing import final
+
 from core.workflow.enums import NodeState
 from core.workflow.graph import Graph
+@final
 class NodeReadinessChecker:
     """
     Checks if nodes are ready for execution based on their dependencies.
diff --git a/api/core/workflow/graph_engine/graph_traversal/skip_propagator.py b/api/core/workflow/graph_engine/graph_traversal/skip_propagator.py
index d4bceeb333..5ac445d405 100644
--- a/api/core/workflow/graph_engine/graph_traversal/skip_propagator.py
+++ b/api/core/workflow/graph_engine/graph_traversal/skip_propagator.py
@@ -3,12 +3,14 @@ Skip state propagation through the graph.
 """
 from collections.abc import Sequence
+from typing import final
 from core.workflow.graph import Edge, Graph
 from ..state_management import EdgeStateManager, NodeStateManager
+@final
 class SkipPropagator:
     """
     Propagates skip states through the graph.
diff --git a/api/core/workflow/graph_engine/layers/debug_logging.py b/api/core/workflow/graph_engine/layers/debug_logging.py
index 73db3260b1..3052600161 100644
--- a/api/core/workflow/graph_engine/layers/debug_logging.py
+++ b/api/core/workflow/graph_engine/layers/debug_logging.py
@@ -7,7 +7,7 @@ graph execution for debugging purposes.
 import logging
 from collections.abc import Mapping
-from typing import Any
+from typing import Any, final
 from core.workflow.graph_events import (
     GraphEngineEvent,
@@ -34,6 +34,7 @@ from core.workflow.graph_events import (
 )
 from .base import Layer
+@final
 class DebugLoggingLayer(Layer):
     """
     A layer that provides comprehensive logging of GraphEngine execution.
diff --git a/api/core/workflow/graph_engine/layers/execution_limits.py b/api/core/workflow/graph_engine/layers/execution_limits.py
index 71b6e045fd..efda0bacbe 100644
--- a/api/core/workflow/graph_engine/layers/execution_limits.py
+++ b/api/core/workflow/graph_engine/layers/execution_limits.py
@@ -11,6 +11,7 @@ When limits are exceeded, the layer automatically aborts execution.
 import logging
 import time
 from enum import Enum
+from typing import final
 from core.workflow.graph_engine.entities.commands import AbortCommand, CommandType
 from core.workflow.graph_engine.layers import Layer
@@ -28,6 +29,7 @@ class LimitType(Enum):
     TIME_LIMIT = "time_limit"
+@final
 class ExecutionLimitsLayer(Layer):
     """
     Layer that enforces execution limits for workflows.
diff --git a/api/core/workflow/graph_engine/manager.py b/api/core/workflow/graph_engine/manager.py
index 3c9f347816..ed62209acb 100644
--- a/api/core/workflow/graph_engine/manager.py
+++ b/api/core/workflow/graph_engine/manager.py
@@ -6,11 +6,14 @@ using the new Redis command channel, without requiring user permission checks.
 Supports stop, pause, and resume operations.
 """
+from typing import final
+
 from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel
 from core.workflow.graph_engine.entities.commands import AbortCommand
 from extensions.ext_redis import redis_client
+@final
 class GraphEngineManager:
     """
     Manager for sending control commands to GraphEngine instances.
diff --git a/api/core/workflow/graph_engine/orchestration/dispatcher.py b/api/core/workflow/graph_engine/orchestration/dispatcher.py
index 87ad3d0117..694355298c 100644
--- a/api/core/workflow/graph_engine/orchestration/dispatcher.py
+++ b/api/core/workflow/graph_engine/orchestration/dispatcher.py
@@ -6,7 +6,7 @@ import logging
 import queue
 import threading
 import time
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, final
 from core.workflow.graph_events.base import GraphNodeEventBase
@@ -19,6 +19,7 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
+@final
 class Dispatcher:
     """
     Main dispatcher that processes events from the event queue.
diff --git a/api/core/workflow/graph_engine/orchestration/execution_coordinator.py b/api/core/workflow/graph_engine/orchestration/execution_coordinator.py
index 899cb6a0d5..5f95b5b29e 100644
--- a/api/core/workflow/graph_engine/orchestration/execution_coordinator.py
+++ b/api/core/workflow/graph_engine/orchestration/execution_coordinator.py
@@ -2,7 +2,7 @@ Execution coordinator for managing overall workflow execution.
 """
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, final
 from ..command_processing import CommandProcessor
 from ..domain import GraphExecution
@@ -14,6 +14,7 @@ if TYPE_CHECKING:
     from ..event_management import EventHandlerRegistry
+@final
 class ExecutionCoordinator:
     """
     Coordinates overall execution flow between subsystems.
diff --git a/api/core/workflow/graph_engine/output_registry/registry.py b/api/core/workflow/graph_engine/output_registry/registry.py
index 6ffc6b178a..4df7da207c 100644
--- a/api/core/workflow/graph_engine/output_registry/registry.py
+++ b/api/core/workflow/graph_engine/output_registry/registry.py
@@ -7,7 +7,7 @@ thread-safe storage for node outputs.
 from collections.abc import Sequence
 from threading import RLock
-from typing import TYPE_CHECKING, Union
+from typing import TYPE_CHECKING, Union, final
 from core.variables import Segment
 from core.workflow.entities.variable_pool import VariablePool
@@ -18,6 +18,7 @@ if TYPE_CHECKING:
     from core.workflow.graph_events import NodeRunStreamChunkEvent
+@final
 class OutputRegistry:
     """
     Thread-safe registry for storing and retrieving node outputs.
diff --git a/api/core/workflow/graph_engine/output_registry/stream.py b/api/core/workflow/graph_engine/output_registry/stream.py
index e9a097d85f..8a99b56d1f 100644
--- a/api/core/workflow/graph_engine/output_registry/stream.py
+++ b/api/core/workflow/graph_engine/output_registry/stream.py
@@ -5,12 +5,13 @@ This module contains the private Stream class used internally by OutputRegistry
 to manage streaming data chunks.
""" -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, final if TYPE_CHECKING: from core.workflow.graph_events import NodeRunStreamChunkEvent +@final class Stream: """ A stream that holds NodeRunStreamChunkEvent objects and tracks read position. diff --git a/api/core/workflow/graph_engine/response_coordinator/coordinator.py b/api/core/workflow/graph_engine/response_coordinator/coordinator.py index 380f5de04c..4c3cc167fa 100644 --- a/api/core/workflow/graph_engine/response_coordinator/coordinator.py +++ b/api/core/workflow/graph_engine/response_coordinator/coordinator.py @@ -9,7 +9,7 @@ import logging from collections import deque from collections.abc import Sequence from threading import RLock -from typing import TypeAlias +from typing import TypeAlias, final from uuid import uuid4 from core.workflow.enums import NodeExecutionType, NodeState @@ -28,6 +28,7 @@ NodeID: TypeAlias = str EdgeID: TypeAlias = str +@final class ResponseStreamCoordinator: """ Manages response streaming sessions without relying on global state. diff --git a/api/core/workflow/graph_engine/state_management/edge_state_manager.py b/api/core/workflow/graph_engine/state_management/edge_state_manager.py index 32d6ca5780..747062284a 100644 --- a/api/core/workflow/graph_engine/state_management/edge_state_manager.py +++ b/api/core/workflow/graph_engine/state_management/edge_state_manager.py @@ -4,7 +4,7 @@ Manager for edge states during graph execution. import threading from collections.abc import Sequence -from typing import TypedDict +from typing import TypedDict, final from core.workflow.enums import NodeState from core.workflow.graph import Edge, Graph @@ -18,6 +18,7 @@ class EdgeStateAnalysis(TypedDict): all_skipped: bool +@final class EdgeStateManager: """ Manages edge states and transitions during graph execution. diff --git a/api/core/workflow/graph_engine/state_management/execution_tracker.py b/api/core/workflow/graph_engine/state_management/execution_tracker.py index 2008f30777..01fa80f2ce 100644 --- a/api/core/workflow/graph_engine/state_management/execution_tracker.py +++ b/api/core/workflow/graph_engine/state_management/execution_tracker.py @@ -3,8 +3,10 @@ Tracker for currently executing nodes. """ import threading +from typing import final +@final class ExecutionTracker: """ Tracks nodes that are currently being executed. diff --git a/api/core/workflow/graph_engine/state_management/node_state_manager.py b/api/core/workflow/graph_engine/state_management/node_state_manager.py index 61bb639cda..d5ed42ad1d 100644 --- a/api/core/workflow/graph_engine/state_management/node_state_manager.py +++ b/api/core/workflow/graph_engine/state_management/node_state_manager.py @@ -4,11 +4,13 @@ Manager for node states during graph execution. import queue import threading +from typing import final from core.workflow.enums import NodeState from core.workflow.graph import Graph +@final class NodeStateManager: """ Manages node states and the ready queue for execution. 
diff --git a/api/core/workflow/graph_engine/worker.py b/api/core/workflow/graph_engine/worker.py
index b7eedbc871..dacf6f0435 100644
--- a/api/core/workflow/graph_engine/worker.py
+++ b/api/core/workflow/graph_engine/worker.py
@@ -11,6 +11,7 @@ import threading
 import time
 from collections.abc import Callable
 from datetime import datetime
+from typing import final
 from uuid import uuid4
 from flask import Flask
@@ -22,6 +23,7 @@ from core.workflow.nodes.base.node import Node
 from libs.flask_utils import preserve_flask_contexts
+@final
 class Worker(threading.Thread):
     """
     Worker thread that executes nodes from the ready queue.
diff --git a/api/core/workflow/graph_engine/worker_management/activity_tracker.py b/api/core/workflow/graph_engine/worker_management/activity_tracker.py
index 5203fc6b6c..b2125a0158 100644
--- a/api/core/workflow/graph_engine/worker_management/activity_tracker.py
+++ b/api/core/workflow/graph_engine/worker_management/activity_tracker.py
@@ -4,8 +4,10 @@ Activity tracker for monitoring worker activity.
 import threading
 import time
+from typing import final
+@final
 class ActivityTracker:
     """
     Tracks worker activity for scaling decisions.
diff --git a/api/core/workflow/graph_engine/worker_management/dynamic_scaler.py b/api/core/workflow/graph_engine/worker_management/dynamic_scaler.py
index 7a1920a724..7450b02618 100644
--- a/api/core/workflow/graph_engine/worker_management/dynamic_scaler.py
+++ b/api/core/workflow/graph_engine/worker_management/dynamic_scaler.py
@@ -2,9 +2,12 @@ Dynamic scaler for worker pool sizing.
 """
+from typing import final
+
 from core.workflow.graph import Graph
+@final
 class DynamicScaler:
     """
     Manages dynamic scaling decisions for the worker pool.
diff --git a/api/core/workflow/graph_engine/worker_management/worker_factory.py b/api/core/workflow/graph_engine/worker_management/worker_factory.py
index 8714ee88d1..673ca11f26 100644
--- a/api/core/workflow/graph_engine/worker_management/worker_factory.py
+++ b/api/core/workflow/graph_engine/worker_management/worker_factory.py
@@ -5,6 +5,7 @@ Factory for creating worker instances.
 import contextvars
 import queue
 from collections.abc import Callable
+from typing import final
 from flask import Flask
@@ -13,6 +14,7 @@ from core.workflow.graph import Graph
 from ..worker import Worker
+@final
 class WorkerFactory:
     """
     Factory for creating worker instances with proper context.
diff --git a/api/core/workflow/graph_engine/worker_management/worker_pool.py b/api/core/workflow/graph_engine/worker_management/worker_pool.py
index 8faa9da156..55250809cd 100644
--- a/api/core/workflow/graph_engine/worker_management/worker_pool.py
+++ b/api/core/workflow/graph_engine/worker_management/worker_pool.py
@@ -4,6 +4,7 @@ Worker pool management.
 import queue
 import threading
+from typing import final
 from core.workflow.graph import Graph
@@ -13,6 +14,7 @@ from .dynamic_scaler import DynamicScaler
 from .worker_factory import WorkerFactory
+@final
 class WorkerPool:
     """
     Manages a pool of worker threads for executing nodes.
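
Note on the pattern this patch applies throughout the package: typing.final is a static-analysis marker only, so the runtime behaviour of these classes is unchanged. A minimal sketch of the effect, assuming a checker such as mypy or pyright; the CustomChannel subclass below is hypothetical and not part of the patch.

from typing import final


@final
class InMemoryChannel:
    """Stand-in for one of the engine classes decorated with @final in this patch."""


# A type checker reports something like:
#   error: Cannot inherit from final class "InMemoryChannel"
# Plain Python, however, still executes this subclass definition at runtime.
class CustomChannel(InMemoryChannel):
    pass

The attribute renames in graph_engine.py (worker_pool to _worker_pool, activity_tracker to _activity_tracker, and so on) serve the same intent: the worker-management components become internal details of GraphEngine rather than part of its public surface.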