chore(graph_engine): Use `XXX | None` instead of `Optional[XXX]`

This commit is contained in:
-LAN- 2025-08-28 04:59:49 +08:00
parent ef21097774
commit affedd6ce4
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF
14 changed files with 34 additions and 41 deletions

View File

@ -3,7 +3,6 @@ NodeExecution entity representing a node's execution state.
"""
from dataclasses import dataclass
from typing import Optional
from core.workflow.enums import NodeState
@ -20,8 +19,8 @@ class NodeExecution:
node_id: str
state: NodeState = NodeState.UNKNOWN
retry_count: int = 0
execution_id: Optional[str] = None
error: Optional[str] = None
execution_id: str | None = None
error: str | None = None
def mark_started(self, execution_id: str) -> None:
"""Mark the node as started with an execution ID."""

View File

@ -6,7 +6,7 @@ instance to control its execution flow.
"""
from enum import Enum
from typing import Any, Optional
from typing import Any
from pydantic import BaseModel, Field
@ -23,11 +23,11 @@ class GraphEngineCommand(BaseModel):
"""Base class for all GraphEngine commands."""
command_type: CommandType = Field(..., description="Type of command")
payload: Optional[dict[str, Any]] = Field(default=None, description="Optional command payload")
payload: dict[str, Any] | None = Field(default=None, description="Optional command payload")
class AbortCommand(GraphEngineCommand):
"""Command to abort a running workflow execution."""
command_type: CommandType = Field(default=CommandType.ABORT, description="Type of command")
reason: Optional[str] = Field(default=None, description="Optional reason for abort")
reason: str | None = Field(default=None, description="Optional reason for abort")

View File

@ -9,7 +9,7 @@ import contextvars
import logging
import queue
from collections.abc import Generator, Mapping
from typing import Any, Optional
from typing import Any
from flask import Flask, current_app
@ -210,7 +210,7 @@ class GraphEngine:
def _setup_worker_management(self) -> None:
"""Initialize worker management subsystem."""
# Capture context for workers
flask_app: Optional[Flask] = None
flask_app: Flask | None = None
try:
flask_app = current_app._get_current_object() # type: ignore
except RuntimeError:

View File

@ -3,7 +3,6 @@ Branch node handling for graph traversal.
"""
from collections.abc import Sequence
from typing import Optional
from core.workflow.graph import Graph
from core.workflow.graph_events.node import NodeRunStreamChunkEvent
@ -43,7 +42,7 @@ class BranchHandler:
self.edge_state_manager = edge_state_manager
def handle_branch_completion(
self, node_id: str, selected_handle: Optional[str]
self, node_id: str, selected_handle: str | None
) -> tuple[Sequence[str], Sequence[NodeRunStreamChunkEvent]]:
"""
Handle completion of a branch node.

View File

@ -6,7 +6,6 @@ intercept and respond to GraphEngine events.
"""
from abc import ABC, abstractmethod
from typing import Optional
from core.workflow.entities import GraphRuntimeState
from core.workflow.graph_engine.protocols.command_channel import CommandChannel
@ -28,8 +27,8 @@ class Layer(ABC):
def __init__(self) -> None:
"""Initialize the layer. Subclasses can override with custom parameters."""
self.graph_runtime_state: Optional[GraphRuntimeState] = None
self.command_channel: Optional[CommandChannel] = None
self.graph_runtime_state: GraphRuntimeState | None = None
self.command_channel: CommandChannel | None = None
def initialize(self, graph_runtime_state: GraphRuntimeState, command_channel: CommandChannel) -> None:
"""
@ -73,7 +72,7 @@ class Layer(ABC):
pass
@abstractmethod
def on_graph_end(self, error: Optional[Exception]) -> None:
def on_graph_end(self, error: Exception | None) -> None:
"""
Called when graph execution ends.

View File

@ -7,7 +7,7 @@ graph execution for debugging purposes.
import logging
from collections.abc import Mapping
from typing import Any, Optional
from typing import Any
from core.workflow.graph_events import (
GraphEngineEvent,
@ -221,7 +221,7 @@ class DebugLoggingLayer(Layer):
# Log unknown events at debug level
self.logger.debug("Event: %s", event_class)
def on_graph_end(self, error: Optional[Exception]) -> None:
def on_graph_end(self, error: Exception | None) -> None:
"""Log graph execution end with summary statistics."""
self.logger.info("=" * 80)

View File

@ -11,7 +11,6 @@ When limits are exceeded, the layer automatically aborts execution.
import logging
import time
from enum import Enum
from typing import Optional
from core.workflow.graph_engine.entities.commands import AbortCommand, CommandType
from core.workflow.graph_engine.layers import Layer
@ -53,7 +52,7 @@ class ExecutionLimitsLayer(Layer):
self.max_time = max_time
# Runtime tracking
self.start_time: Optional[float] = None
self.start_time: float | None = None
self.step_count = 0
self.logger = logging.getLogger(__name__)
@ -94,7 +93,7 @@ class ExecutionLimitsLayer(Layer):
if self._reached_time_limitation():
self._send_abort_command(LimitType.TIME_LIMIT)
def on_graph_end(self, error: Optional[Exception]) -> None:
def on_graph_end(self, error: Exception | None) -> None:
"""Called when graph execution ends."""
if self._execution_started and not self._execution_ended:
self._execution_ended = True

View File

@ -6,7 +6,6 @@ using the new Redis command channel, without requiring user permission checks.
Supports stop, pause, and resume operations.
"""
from typing import Optional
from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel
from core.workflow.graph_engine.entities.commands import AbortCommand
@ -23,7 +22,7 @@ class GraphEngineManager:
"""
@staticmethod
def send_stop_command(task_id: str, reason: Optional[str] = None) -> None:
def send_stop_command(task_id: str, reason: str | None = None) -> None:
"""
Send a stop command to a running workflow.

View File

@ -6,7 +6,7 @@ import logging
import queue
import threading
import time
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING
from core.workflow.graph_events.base import GraphNodeEventBase
@ -34,7 +34,7 @@ class Dispatcher:
event_collector: EventCollector,
execution_coordinator: ExecutionCoordinator,
max_execution_time: int,
event_emitter: Optional[EventEmitter] = None,
event_emitter: EventEmitter | None = None,
) -> None:
"""
Initialize the dispatcher.
@ -54,9 +54,9 @@ class Dispatcher:
self.max_execution_time = max_execution_time
self.event_emitter = event_emitter
self._thread: Optional[threading.Thread] = None
self._thread: threading.Thread | None = None
self._stop_event = threading.Event()
self._start_time: Optional[float] = None
self._start_time: float | None = None
def start(self) -> None:
"""Start the dispatcher thread."""

View File

@ -7,7 +7,7 @@ thread-safe storage for node outputs.
from collections.abc import Sequence
from threading import RLock
from typing import TYPE_CHECKING, Optional, Union
from typing import TYPE_CHECKING, Union
from core.variables import Segment
from core.workflow.entities.variable_pool import VariablePool
@ -47,7 +47,7 @@ class OutputRegistry:
with self._lock:
self._scalars.add(selector, value)
def get_scalar(self, selector: Sequence[str]) -> Optional["Segment"]:
def get_scalar(self, selector: Sequence[str]) -> "Segment" | None:
"""
Get a scalar value for the given selector.
@ -81,7 +81,7 @@ class OutputRegistry:
except ValueError:
raise ValueError(f"Stream {'.'.join(selector)} is already closed")
def pop_chunk(self, selector: Sequence[str]) -> Optional["NodeRunStreamChunkEvent"]:
def pop_chunk(self, selector: Sequence[str]) -> "NodeRunStreamChunkEvent" | None:
"""
Pop the next unread NodeRunStreamChunkEvent from the stream.

View File

@ -5,7 +5,7 @@ This module contains the private Stream class used internally by OutputRegistry
to manage streaming data chunks.
"""
from typing import TYPE_CHECKING, Optional
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from core.workflow.graph_events import NodeRunStreamChunkEvent
@ -41,7 +41,7 @@ class Stream:
raise ValueError("Cannot append to a closed stream")
self.events.append(event)
def pop_next(self) -> Optional["NodeRunStreamChunkEvent"]:
def pop_next(self) -> "NodeRunStreamChunkEvent" | None:
"""
Pop the next unread NodeRunStreamChunkEvent from the stream.

View File

@ -9,7 +9,7 @@ import logging
from collections import deque
from collections.abc import Sequence
from threading import RLock
from typing import Optional, TypeAlias
from typing import TypeAlias
from uuid import uuid4
from core.workflow.enums import NodeExecutionType, NodeState
@ -45,7 +45,7 @@ class ResponseStreamCoordinator:
"""
self.registry = registry
self.graph = graph
self.active_session: Optional[ResponseSession] = None
self.active_session: ResponseSession | None = None
self.waiting_sessions: deque[ResponseSession] = deque()
self.lock = RLock()

View File

@ -11,7 +11,6 @@ import threading
import time
from collections.abc import Callable
from datetime import datetime
from typing import Optional
from uuid import uuid4
from flask import Flask
@ -38,10 +37,10 @@ class Worker(threading.Thread):
event_queue: queue.Queue[GraphNodeEventBase],
graph: Graph,
worker_id: int = 0,
flask_app: Optional[Flask] = None,
context_vars: Optional[contextvars.Context] = None,
on_idle_callback: Optional[Callable[[int], None]] = None,
on_active_callback: Optional[Callable[[int], None]] = None,
flask_app: Flask | None = None,
context_vars: contextvars.Context | None = None,
on_idle_callback: Callable[[int], None] | None = None,
on_active_callback: Callable[[int], None] | None = None,
) -> None:
"""
Initialize worker thread.

View File

@ -5,7 +5,6 @@ Factory for creating worker instances.
import contextvars
import queue
from collections.abc import Callable
from typing import Optional
from flask import Flask
@ -24,7 +23,7 @@ class WorkerFactory:
def __init__(
self,
flask_app: Optional[Flask],
flask_app: Flask | None,
context_vars: contextvars.Context,
) -> None:
"""
@ -43,8 +42,8 @@ class WorkerFactory:
ready_queue: queue.Queue[str],
event_queue: queue.Queue,
graph: Graph,
on_idle_callback: Optional[Callable[[int], None]] = None,
on_active_callback: Optional[Callable[[int], None]] = None,
on_idle_callback: Callable[[int], None] | None = None,
on_active_callback: Callable[[int], None] | None = None,
) -> Worker:
"""
Create a new worker instance.