diff --git a/api/core/workflow/graph_engine/domain/node_execution.py b/api/core/workflow/graph_engine/domain/node_execution.py index 937ae0fb93..85700caa3a 100644 --- a/api/core/workflow/graph_engine/domain/node_execution.py +++ b/api/core/workflow/graph_engine/domain/node_execution.py @@ -3,7 +3,6 @@ NodeExecution entity representing a node's execution state. """ from dataclasses import dataclass -from typing import Optional from core.workflow.enums import NodeState @@ -20,8 +19,8 @@ class NodeExecution: node_id: str state: NodeState = NodeState.UNKNOWN retry_count: int = 0 - execution_id: Optional[str] = None - error: Optional[str] = None + execution_id: str | None = None + error: str | None = None def mark_started(self, execution_id: str) -> None: """Mark the node as started with an execution ID.""" diff --git a/api/core/workflow/graph_engine/entities/commands.py b/api/core/workflow/graph_engine/entities/commands.py index a92ebf512d..7e25fc0866 100644 --- a/api/core/workflow/graph_engine/entities/commands.py +++ b/api/core/workflow/graph_engine/entities/commands.py @@ -6,7 +6,7 @@ instance to control its execution flow. 
""" from enum import Enum -from typing import Any, Optional +from typing import Any from pydantic import BaseModel, Field @@ -23,11 +23,11 @@ class GraphEngineCommand(BaseModel): """Base class for all GraphEngine commands.""" command_type: CommandType = Field(..., description="Type of command") - payload: Optional[dict[str, Any]] = Field(default=None, description="Optional command payload") + payload: dict[str, Any] | None = Field(default=None, description="Optional command payload") class AbortCommand(GraphEngineCommand): """Command to abort a running workflow execution.""" command_type: CommandType = Field(default=CommandType.ABORT, description="Type of command") - reason: Optional[str] = Field(default=None, description="Optional reason for abort") + reason: str | None = Field(default=None, description="Optional reason for abort") diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py index 6f4f71bfe2..df194de909 100644 --- a/api/core/workflow/graph_engine/graph_engine.py +++ b/api/core/workflow/graph_engine/graph_engine.py @@ -9,7 +9,7 @@ import contextvars import logging import queue from collections.abc import Generator, Mapping -from typing import Any, Optional +from typing import Any from flask import Flask, current_app @@ -210,7 +210,7 @@ class GraphEngine: def _setup_worker_management(self) -> None: """Initialize worker management subsystem.""" # Capture context for workers - flask_app: Optional[Flask] = None + flask_app: Flask | None = None try: flask_app = current_app._get_current_object() # type: ignore except RuntimeError: diff --git a/api/core/workflow/graph_engine/graph_traversal/branch_handler.py b/api/core/workflow/graph_engine/graph_traversal/branch_handler.py index 94771a55a4..4bc3bd1e0a 100644 --- a/api/core/workflow/graph_engine/graph_traversal/branch_handler.py +++ b/api/core/workflow/graph_engine/graph_traversal/branch_handler.py @@ -3,7 +3,6 @@ Branch node handling for graph traversal. 
""" from collections.abc import Sequence -from typing import Optional from core.workflow.graph import Graph from core.workflow.graph_events.node import NodeRunStreamChunkEvent @@ -43,7 +42,7 @@ class BranchHandler: self.edge_state_manager = edge_state_manager def handle_branch_completion( - self, node_id: str, selected_handle: Optional[str] + self, node_id: str, selected_handle: str | None ) -> tuple[Sequence[str], Sequence[NodeRunStreamChunkEvent]]: """ Handle completion of a branch node. diff --git a/api/core/workflow/graph_engine/layers/base.py b/api/core/workflow/graph_engine/layers/base.py index df8115c526..febdc3de6d 100644 --- a/api/core/workflow/graph_engine/layers/base.py +++ b/api/core/workflow/graph_engine/layers/base.py @@ -6,7 +6,6 @@ intercept and respond to GraphEngine events. """ from abc import ABC, abstractmethod -from typing import Optional from core.workflow.entities import GraphRuntimeState from core.workflow.graph_engine.protocols.command_channel import CommandChannel @@ -28,8 +27,8 @@ class Layer(ABC): def __init__(self) -> None: """Initialize the layer. Subclasses can override with custom parameters.""" - self.graph_runtime_state: Optional[GraphRuntimeState] = None - self.command_channel: Optional[CommandChannel] = None + self.graph_runtime_state: GraphRuntimeState | None = None + self.command_channel: CommandChannel | None = None def initialize(self, graph_runtime_state: GraphRuntimeState, command_channel: CommandChannel) -> None: """ @@ -73,7 +72,7 @@ class Layer(ABC): pass @abstractmethod - def on_graph_end(self, error: Optional[Exception]) -> None: + def on_graph_end(self, error: Exception | None) -> None: """ Called when graph execution ends. 
diff --git a/api/core/workflow/graph_engine/layers/debug_logging.py b/api/core/workflow/graph_engine/layers/debug_logging.py index b5222c51d3..73db3260b1 100644 --- a/api/core/workflow/graph_engine/layers/debug_logging.py +++ b/api/core/workflow/graph_engine/layers/debug_logging.py @@ -7,7 +7,7 @@ graph execution for debugging purposes. import logging from collections.abc import Mapping -from typing import Any, Optional +from typing import Any from core.workflow.graph_events import ( GraphEngineEvent, @@ -221,7 +221,7 @@ class DebugLoggingLayer(Layer): # Log unknown events at debug level self.logger.debug("Event: %s", event_class) - def on_graph_end(self, error: Optional[Exception]) -> None: + def on_graph_end(self, error: Exception | None) -> None: """Log graph execution end with summary statistics.""" self.logger.info("=" * 80) diff --git a/api/core/workflow/graph_engine/layers/execution_limits.py b/api/core/workflow/graph_engine/layers/execution_limits.py index 321a7df8c3..71b6e045fd 100644 --- a/api/core/workflow/graph_engine/layers/execution_limits.py +++ b/api/core/workflow/graph_engine/layers/execution_limits.py @@ -11,7 +11,6 @@ When limits are exceeded, the layer automatically aborts execution. 
import logging import time from enum import Enum -from typing import Optional from core.workflow.graph_engine.entities.commands import AbortCommand, CommandType from core.workflow.graph_engine.layers import Layer @@ -53,7 +52,7 @@ class ExecutionLimitsLayer(Layer): self.max_time = max_time # Runtime tracking - self.start_time: Optional[float] = None + self.start_time: float | None = None self.step_count = 0 self.logger = logging.getLogger(__name__) @@ -94,7 +93,7 @@ class ExecutionLimitsLayer(Layer): if self._reached_time_limitation(): self._send_abort_command(LimitType.TIME_LIMIT) - def on_graph_end(self, error: Optional[Exception]) -> None: + def on_graph_end(self, error: Exception | None) -> None: """Called when graph execution ends.""" if self._execution_started and not self._execution_ended: self._execution_ended = True diff --git a/api/core/workflow/graph_engine/manager.py b/api/core/workflow/graph_engine/manager.py index a4f9cc7192..131006d1a7 100644 --- a/api/core/workflow/graph_engine/manager.py +++ b/api/core/workflow/graph_engine/manager.py @@ -6,7 +6,6 @@ using the new Redis command channel, without requiring user permission checks. Supports stop, pause, and resume operations. """ -from typing import Optional from core.workflow.graph_engine.command_channels.redis_channel import RedisChannel from core.workflow.graph_engine.entities.commands import AbortCommand @@ -23,7 +22,7 @@ class GraphEngineManager: """ @staticmethod - def send_stop_command(task_id: str, reason: Optional[str] = None) -> None: + def send_stop_command(task_id: str, reason: str | None = None) -> None: """ Send a stop command to a running workflow. 
diff --git a/api/core/workflow/graph_engine/orchestration/dispatcher.py b/api/core/workflow/graph_engine/orchestration/dispatcher.py index bee4651def..87ad3d0117 100644 --- a/api/core/workflow/graph_engine/orchestration/dispatcher.py +++ b/api/core/workflow/graph_engine/orchestration/dispatcher.py @@ -6,7 +6,7 @@ import logging import queue import threading import time -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING from core.workflow.graph_events.base import GraphNodeEventBase @@ -34,7 +34,7 @@ class Dispatcher: event_collector: EventCollector, execution_coordinator: ExecutionCoordinator, max_execution_time: int, - event_emitter: Optional[EventEmitter] = None, + event_emitter: EventEmitter | None = None, ) -> None: """ Initialize the dispatcher. @@ -54,9 +54,9 @@ class Dispatcher: self.max_execution_time = max_execution_time self.event_emitter = event_emitter - self._thread: Optional[threading.Thread] = None + self._thread: threading.Thread | None = None self._stop_event = threading.Event() - self._start_time: Optional[float] = None + self._start_time: float | None = None def start(self) -> None: """Start the dispatcher thread.""" diff --git a/api/core/workflow/graph_engine/output_registry/registry.py b/api/core/workflow/graph_engine/output_registry/registry.py index 0f3e690eb1..42ccf51d62 100644 --- a/api/core/workflow/graph_engine/output_registry/registry.py +++ b/api/core/workflow/graph_engine/output_registry/registry.py @@ -7,7 +7,7 @@ thread-safe storage for node outputs. 
from collections.abc import Sequence from threading import RLock -from typing import TYPE_CHECKING, Optional, Union +from typing import TYPE_CHECKING, Union from core.variables import Segment from core.workflow.entities.variable_pool import VariablePool @@ -47,7 +47,7 @@ class OutputRegistry: with self._lock: self._scalars.add(selector, value) - def get_scalar(self, selector: Sequence[str]) -> Optional["Segment"]: + def get_scalar(self, selector: Sequence[str]) -> "Segment | None": """ Get a scalar value for the given selector. @@ -81,7 +81,7 @@ class OutputRegistry: except ValueError: raise ValueError(f"Stream {'.'.join(selector)} is already closed") - def pop_chunk(self, selector: Sequence[str]) -> Optional["NodeRunStreamChunkEvent"]: + def pop_chunk(self, selector: Sequence[str]) -> "NodeRunStreamChunkEvent | None": """ Pop the next unread NodeRunStreamChunkEvent from the stream. diff --git a/api/core/workflow/graph_engine/output_registry/stream.py b/api/core/workflow/graph_engine/output_registry/stream.py index dc12e479a4..1e52d4efaa 100644 --- a/api/core/workflow/graph_engine/output_registry/stream.py +++ b/api/core/workflow/graph_engine/output_registry/stream.py @@ -5,7 +5,7 @@ This module contains the private Stream class used internally by OutputRegistry to manage streaming data chunks. """ -from typing import TYPE_CHECKING, Optional +from typing import TYPE_CHECKING if TYPE_CHECKING: from core.workflow.graph_events import NodeRunStreamChunkEvent @@ -41,7 +41,7 @@ class Stream: raise ValueError("Cannot append to a closed stream") self.events.append(event) - def pop_next(self) -> Optional["NodeRunStreamChunkEvent"]: + def pop_next(self) -> "NodeRunStreamChunkEvent | None": """ Pop the next unread NodeRunStreamChunkEvent from the stream. 
diff --git a/api/core/workflow/graph_engine/response_coordinator/coordinator.py b/api/core/workflow/graph_engine/response_coordinator/coordinator.py index 40c7d19102..380f5de04c 100644 --- a/api/core/workflow/graph_engine/response_coordinator/coordinator.py +++ b/api/core/workflow/graph_engine/response_coordinator/coordinator.py @@ -9,7 +9,7 @@ import logging from collections import deque from collections.abc import Sequence from threading import RLock -from typing import Optional, TypeAlias +from typing import TypeAlias from uuid import uuid4 from core.workflow.enums import NodeExecutionType, NodeState @@ -45,7 +45,7 @@ class ResponseStreamCoordinator: """ self.registry = registry self.graph = graph - self.active_session: Optional[ResponseSession] = None + self.active_session: ResponseSession | None = None self.waiting_sessions: deque[ResponseSession] = deque() self.lock = RLock() diff --git a/api/core/workflow/graph_engine/worker.py b/api/core/workflow/graph_engine/worker.py index bc4025978a..b7eedbc871 100644 --- a/api/core/workflow/graph_engine/worker.py +++ b/api/core/workflow/graph_engine/worker.py @@ -11,7 +11,6 @@ import threading import time from collections.abc import Callable from datetime import datetime -from typing import Optional from uuid import uuid4 from flask import Flask @@ -38,10 +37,10 @@ class Worker(threading.Thread): event_queue: queue.Queue[GraphNodeEventBase], graph: Graph, worker_id: int = 0, - flask_app: Optional[Flask] = None, - context_vars: Optional[contextvars.Context] = None, - on_idle_callback: Optional[Callable[[int], None]] = None, - on_active_callback: Optional[Callable[[int], None]] = None, + flask_app: Flask | None = None, + context_vars: contextvars.Context | None = None, + on_idle_callback: Callable[[int], None] | None = None, + on_active_callback: Callable[[int], None] | None = None, ) -> None: """ Initialize worker thread. 
diff --git a/api/core/workflow/graph_engine/worker_management/worker_factory.py b/api/core/workflow/graph_engine/worker_management/worker_factory.py index 76cfc45b10..8714ee88d1 100644 --- a/api/core/workflow/graph_engine/worker_management/worker_factory.py +++ b/api/core/workflow/graph_engine/worker_management/worker_factory.py @@ -5,7 +5,6 @@ Factory for creating worker instances. import contextvars import queue from collections.abc import Callable -from typing import Optional from flask import Flask @@ -24,7 +23,7 @@ class WorkerFactory: def __init__( self, - flask_app: Optional[Flask], + flask_app: Flask | None, context_vars: contextvars.Context, ) -> None: """ @@ -43,8 +42,8 @@ class WorkerFactory: ready_queue: queue.Queue[str], event_queue: queue.Queue, graph: Graph, - on_idle_callback: Optional[Callable[[int], None]] = None, - on_active_callback: Optional[Callable[[int], None]] = None, + on_idle_callback: Callable[[int], None] | None = None, + on_active_callback: Callable[[int], None] | None = None, ) -> Worker: """ Create a new worker instance.