chore(graph_engine): add final mark to classes

Signed-off-by: -LAN- <laipz8200@outlook.com>
This commit is contained in:
-LAN- 2025-08-28 15:38:35 +08:00
parent e3a7b1f691
commit c396788128
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF
31 changed files with 82 additions and 22 deletions

View File

@ -6,10 +6,12 @@ within a single process. Each instance handles commands for one workflow executi
"""
from queue import Queue
from typing import final
from ..entities.commands import GraphEngineCommand
@final
class InMemoryChannel:
"""
In-memory command channel implementation using a thread-safe queue.

View File

@ -7,7 +7,7 @@ Each instance uses a unique key for its command queue.
"""
import json
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, final
from ..entities.commands import AbortCommand, CommandType, GraphEngineCommand
@ -15,6 +15,7 @@ if TYPE_CHECKING:
from extensions.ext_redis import RedisClientWrapper
@final
class RedisChannel:
"""
Redis-based command channel implementation for distributed systems.

View File

@ -3,6 +3,7 @@ Command handler implementations.
"""
import logging
from typing import final
from ..domain.graph_execution import GraphExecution
from ..entities.commands import AbortCommand, GraphEngineCommand
@ -11,6 +12,7 @@ from .command_processor import CommandHandler
logger = logging.getLogger(__name__)
@final
class AbortCommandHandler(CommandHandler):
"""Handles abort commands."""

View File

@ -3,7 +3,7 @@ Main command processor for handling external commands.
"""
import logging
from typing import Protocol
from typing import Protocol, final
from ..domain.graph_execution import GraphExecution
from ..entities.commands import GraphEngineCommand
@ -18,6 +18,7 @@ class CommandHandler(Protocol):
def handle(self, command: GraphEngineCommand, execution: GraphExecution) -> None: ...
@final
class CommandProcessor:
"""
Processes external commands sent to the engine.

View File

@ -3,6 +3,7 @@ Abort error strategy implementation.
"""
import logging
from typing import final
from core.workflow.graph import Graph
from core.workflow.graph_events import GraphNodeEventBase, NodeRunFailedEvent
@ -10,6 +11,7 @@ from core.workflow.graph_events import GraphNodeEventBase, NodeRunFailedEvent
logger = logging.getLogger(__name__)
@final
class AbortStrategy:
"""
Error strategy that aborts execution on failure.

View File

@ -2,12 +2,15 @@
Default value error strategy implementation.
"""
from typing import final
from core.workflow.enums import ErrorStrategy, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from core.workflow.graph import Graph
from core.workflow.graph_events import GraphNodeEventBase, NodeRunExceptionEvent, NodeRunFailedEvent
from core.workflow.node_events import NodeRunResult
@final
class DefaultValueStrategy:
"""
Error strategy that uses default values on failure.

View File

@ -2,12 +2,15 @@
Fail branch error strategy implementation.
"""
from typing import final
from core.workflow.enums import ErrorStrategy, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from core.workflow.graph import Graph
from core.workflow.graph_events import GraphNodeEventBase, NodeRunExceptionEvent, NodeRunFailedEvent
from core.workflow.node_events import NodeRunResult
@final
class FailBranchStrategy:
"""
Error strategy that continues execution via a fail branch.

View File

@ -3,11 +3,13 @@ Retry error strategy implementation.
"""
import time
from typing import final
from core.workflow.graph import Graph
from core.workflow.graph_events import GraphNodeEventBase, NodeRunFailedEvent, NodeRunRetryEvent
@final
class RetryStrategy:
"""
Error strategy that retries failed nodes.

View File

@ -3,12 +3,14 @@ Event collector for buffering and managing events.
"""
import threading
from typing import final
from core.workflow.graph_events import GraphEngineEvent
from ..layers.base import Layer
@final
class ReadWriteLock:
"""
A read-write lock implementation that allows multiple concurrent readers
@ -56,6 +58,7 @@ class ReadWriteLock:
return WriteLockContext(self)
@final
class ReadLockContext:
"""Context manager for read locks."""
@ -70,6 +73,7 @@ class ReadLockContext:
self._lock.release_read()
@final
class WriteLockContext:
"""Context manager for write locks."""
@ -84,6 +88,7 @@ class WriteLockContext:
self._lock.release_write()
@final
class EventCollector:
"""
Collects and buffers events for later retrieval.

View File

@ -5,12 +5,14 @@ Event emitter for yielding events to external consumers.
import threading
import time
from collections.abc import Generator
from typing import final
from core.workflow.graph_events import GraphEngineEvent
from .event_collector import EventCollector
@final
class EventEmitter:
"""
Emits collected events as a generator for external consumption.

View File

@ -9,7 +9,7 @@ import contextvars
import logging
import queue
from collections.abc import Generator, Mapping
from typing import Any
from typing import final
from flask import Flask, current_app
@ -45,6 +45,7 @@ from .worker_management import ActivityTracker, DynamicScaler, WorkerFactory, Wo
logger = logging.getLogger(__name__)
@final
class GraphEngine:
"""
Queue-based graph execution engine.
@ -63,7 +64,7 @@ class GraphEngine:
invoke_from: InvokeFrom,
call_depth: int,
graph: Graph,
graph_config: Mapping[str, Any],
graph_config: Mapping[str, object],
graph_runtime_state: GraphRuntimeState,
max_execution_steps: int,
max_execution_time: int,
@ -186,7 +187,7 @@ class GraphEngine:
event_handler=self.event_handler_registry,
event_collector=self.event_collector,
command_processor=self.command_processor,
worker_pool=self.worker_pool,
worker_pool=self._worker_pool,
)
self.dispatcher = Dispatcher(
@ -219,8 +220,8 @@ class GraphEngine:
context_vars = contextvars.copy_context()
# Create worker management components
self.activity_tracker = ActivityTracker()
self.dynamic_scaler = DynamicScaler(
self._activity_tracker = ActivityTracker()
self._dynamic_scaler = DynamicScaler(
min_workers=(self._min_workers if self._min_workers is not None else dify_config.GRAPH_ENGINE_MIN_WORKERS),
max_workers=(self._max_workers if self._max_workers is not None else dify_config.GRAPH_ENGINE_MAX_WORKERS),
scale_up_threshold=(
@ -234,15 +235,15 @@ class GraphEngine:
else dify_config.GRAPH_ENGINE_SCALE_DOWN_IDLE_TIME
),
)
self.worker_factory = WorkerFactory(flask_app, context_vars)
self._worker_factory = WorkerFactory(flask_app, context_vars)
self.worker_pool = WorkerPool(
self._worker_pool = WorkerPool(
ready_queue=self.ready_queue,
event_queue=self.event_queue,
graph=self.graph,
worker_factory=self.worker_factory,
dynamic_scaler=self.dynamic_scaler,
activity_tracker=self.activity_tracker,
worker_factory=self._worker_factory,
dynamic_scaler=self._dynamic_scaler,
activity_tracker=self._activity_tracker,
)
def _validate_graph_state_consistency(self) -> None:
@ -320,10 +321,10 @@ class GraphEngine:
def _start_execution(self) -> None:
"""Start execution subsystems."""
# Calculate initial worker count
initial_workers = self.dynamic_scaler.calculate_initial_workers(self.graph)
initial_workers = self._dynamic_scaler.calculate_initial_workers(self.graph)
# Start worker pool
self.worker_pool.start(initial_workers)
self._worker_pool.start(initial_workers)
# Register response nodes
for node in self.graph.nodes.values():
@ -341,7 +342,7 @@ class GraphEngine:
def _stop_execution(self) -> None:
"""Stop execution subsystems."""
self.dispatcher.stop()
self.worker_pool.stop()
self._worker_pool.stop()
# Don't mark complete here as the dispatcher already does it
# Notify layers

View File

@ -3,6 +3,7 @@ Branch node handling for graph traversal.
"""
from collections.abc import Sequence
from typing import final
from core.workflow.graph import Graph
from core.workflow.graph_events.node import NodeRunStreamChunkEvent
@ -12,6 +13,7 @@ from .edge_processor import EdgeProcessor
from .skip_propagator import SkipPropagator
@final
class BranchHandler:
"""
Handles branch node logic during graph traversal.

View File

@ -3,6 +3,7 @@ Edge processing logic for graph traversal.
"""
from collections.abc import Sequence
from typing import final
from core.workflow.enums import NodeExecutionType
from core.workflow.graph import Edge, Graph
@ -12,6 +13,7 @@ from ..response_coordinator import ResponseStreamCoordinator
from ..state_management import EdgeStateManager, NodeStateManager
@final
class EdgeProcessor:
"""
Processes edges during graph execution.

View File

@ -2,10 +2,13 @@
Node readiness checking for execution.
"""
from typing import final
from core.workflow.enums import NodeState
from core.workflow.graph import Graph
@final
class NodeReadinessChecker:
"""
Checks if nodes are ready for execution based on their dependencies.

View File

@ -3,12 +3,14 @@ Skip state propagation through the graph.
"""
from collections.abc import Sequence
from typing import final
from core.workflow.graph import Edge, Graph
from ..state_management import EdgeStateManager, NodeStateManager
@final
class SkipPropagator:
"""
Propagates skip states through the graph.

View File

@ -7,7 +7,7 @@ graph execution for debugging purposes.
import logging
from collections.abc import Mapping
from typing import Any
from typing import Any, final
from core.workflow.graph_events import (
GraphEngineEvent,
@ -34,6 +34,7 @@ from core.workflow.graph_events import (
from .base import Layer
@final
class DebugLoggingLayer(Layer):
"""
A layer that provides comprehensive logging of GraphEngine execution.

View File

@ -11,6 +11,7 @@ When limits are exceeded, the layer automatically aborts execution.
import logging
import time
from enum import Enum
from typing import final
from core.workflow.graph_engine.entities.commands import AbortCommand, CommandType
from core.workflow.graph_engine.layers import Layer
@ -28,6 +29,7 @@ class LimitType(Enum):
TIME_LIMIT = "time_limit"
@final
class ExecutionLimitsLayer(Layer):
"""
Layer that enforces execution limits for workflows.

View File

@ -6,11 +6,14 @@ using the new Redis command channel, without requiring user permission checks.
Supports stop, pause, and resume operations.
"""
from typing import final
from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel
from core.workflow.graph_engine.entities.commands import AbortCommand
from extensions.ext_redis import redis_client
@final
class GraphEngineManager:
"""
Manager for sending control commands to GraphEngine instances.

View File

@ -6,7 +6,7 @@ import logging
import queue
import threading
import time
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, final
from core.workflow.graph_events.base import GraphNodeEventBase
@ -19,6 +19,7 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
@final
class Dispatcher:
"""
Main dispatcher that processes events from the event queue.

View File

@ -2,7 +2,7 @@
Execution coordinator for managing overall workflow execution.
"""
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, final
from ..command_processing import CommandProcessor
from ..domain import GraphExecution
@ -14,6 +14,7 @@ if TYPE_CHECKING:
from ..event_management import EventHandlerRegistry
@final
class ExecutionCoordinator:
"""
Coordinates overall execution flow between subsystems.

View File

@ -7,7 +7,7 @@ thread-safe storage for node outputs.
from collections.abc import Sequence
from threading import RLock
from typing import TYPE_CHECKING, Union
from typing import TYPE_CHECKING, Union, final
from core.variables import Segment
from core.workflow.entities.variable_pool import VariablePool
@ -18,6 +18,7 @@ if TYPE_CHECKING:
from core.workflow.graph_events import NodeRunStreamChunkEvent
@final
class OutputRegistry:
"""
Thread-safe registry for storing and retrieving node outputs.

View File

@ -5,12 +5,13 @@ This module contains the private Stream class used internally by OutputRegistry
to manage streaming data chunks.
"""
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, final
if TYPE_CHECKING:
from core.workflow.graph_events import NodeRunStreamChunkEvent
@final
class Stream:
"""
A stream that holds NodeRunStreamChunkEvent objects and tracks read position.

View File

@ -9,7 +9,7 @@ import logging
from collections import deque
from collections.abc import Sequence
from threading import RLock
from typing import TypeAlias
from typing import TypeAlias, final
from uuid import uuid4
from core.workflow.enums import NodeExecutionType, NodeState
@ -28,6 +28,7 @@ NodeID: TypeAlias = str
EdgeID: TypeAlias = str
@final
class ResponseStreamCoordinator:
"""
Manages response streaming sessions without relying on global state.

View File

@ -4,7 +4,7 @@ Manager for edge states during graph execution.
import threading
from collections.abc import Sequence
from typing import TypedDict
from typing import TypedDict, final
from core.workflow.enums import NodeState
from core.workflow.graph import Edge, Graph
@ -18,6 +18,7 @@ class EdgeStateAnalysis(TypedDict):
all_skipped: bool
@final
class EdgeStateManager:
"""
Manages edge states and transitions during graph execution.

View File

@ -3,8 +3,10 @@ Tracker for currently executing nodes.
"""
import threading
from typing import final
@final
class ExecutionTracker:
"""
Tracks nodes that are currently being executed.

View File

@ -4,11 +4,13 @@ Manager for node states during graph execution.
import queue
import threading
from typing import final
from core.workflow.enums import NodeState
from core.workflow.graph import Graph
@final
class NodeStateManager:
"""
Manages node states and the ready queue for execution.

View File

@ -11,6 +11,7 @@ import threading
import time
from collections.abc import Callable
from datetime import datetime
from typing import final
from uuid import uuid4
from flask import Flask
@ -22,6 +23,7 @@ from core.workflow.nodes.base.node import Node
from libs.flask_utils import preserve_flask_contexts
@final
class Worker(threading.Thread):
"""
Worker thread that executes nodes from the ready queue.

View File

@ -4,8 +4,10 @@ Activity tracker for monitoring worker activity.
import threading
import time
from typing import final
@final
class ActivityTracker:
"""
Tracks worker activity for scaling decisions.

View File

@ -2,9 +2,12 @@
Dynamic scaler for worker pool sizing.
"""
from typing import final
from core.workflow.graph import Graph
@final
class DynamicScaler:
"""
Manages dynamic scaling decisions for the worker pool.

View File

@ -5,6 +5,7 @@ Factory for creating worker instances.
import contextvars
import queue
from collections.abc import Callable
from typing import final
from flask import Flask
@ -13,6 +14,7 @@ from core.workflow.graph import Graph
from ..worker import Worker
@final
class WorkerFactory:
"""
Factory for creating worker instances with proper context.

View File

@ -4,6 +4,7 @@ Worker pool management.
import queue
import threading
from typing import final
from core.workflow.graph import Graph
@ -13,6 +14,7 @@ from .dynamic_scaler import DynamicScaler
from .worker_factory import WorkerFactory
@final
class WorkerPool:
"""
Manages a pool of worker threads for executing nodes.