chore(graph_engine): Use `TYPE | None` instead of `Optional`

Signed-off-by: -LAN- <laipz8200@outlook.com>
This commit is contained in:
-LAN- 2025-08-28 04:30:50 +08:00
parent 8129ca7c05
commit 7cbf4093f4
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF
5 changed files with 5 additions and 12 deletions

View File

@ -3,7 +3,6 @@ GraphExecution aggregate root managing the overall graph execution state.
"""
from dataclasses import dataclass, field
from typing import Optional
from .node_execution import NodeExecution
@ -21,7 +20,7 @@ class GraphExecution:
started: bool = False
completed: bool = False
aborted: bool = False
error: Optional[Exception] = None
error: Exception | None = None
node_executions: dict[str, NodeExecution] = field(default_factory=dict)
def start(self) -> None:

View File

@ -3,7 +3,6 @@ Abort error strategy implementation.
"""
import logging
from typing import Optional
from core.workflow.graph import Graph
from core.workflow.graph_events import GraphNodeEventBase, NodeRunFailedEvent
@ -19,7 +18,7 @@ class AbortStrategy:
It stops the entire graph execution when a node fails.
"""
def handle_error(self, event: NodeRunFailedEvent, graph: Graph, retry_count: int) -> Optional[GraphNodeEventBase]:
def handle_error(self, event: NodeRunFailedEvent, graph: Graph, retry_count: int) -> GraphNodeEventBase | None:
"""
Handle error by aborting execution.

View File

@ -2,8 +2,6 @@
Default value error strategy implementation.
"""
from typing import Optional
from core.workflow.enums import ErrorStrategy, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from core.workflow.graph import Graph
from core.workflow.graph_events import GraphNodeEventBase, NodeRunExceptionEvent, NodeRunFailedEvent
@ -18,7 +16,7 @@ class DefaultValueStrategy:
predefined default output values.
"""
def handle_error(self, event: NodeRunFailedEvent, graph: Graph, retry_count: int) -> Optional[GraphNodeEventBase]:
def handle_error(self, event: NodeRunFailedEvent, graph: Graph, retry_count: int) -> GraphNodeEventBase | None:
"""
Handle error by using default values.

View File

@ -2,8 +2,6 @@
Fail branch error strategy implementation.
"""
from typing import Optional
from core.workflow.enums import ErrorStrategy, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from core.workflow.graph import Graph
from core.workflow.graph_events import GraphNodeEventBase, NodeRunExceptionEvent, NodeRunFailedEvent
@ -18,7 +16,7 @@ class FailBranchStrategy:
through a designated fail-branch edge.
"""
def handle_error(self, event: NodeRunFailedEvent, graph: Graph, retry_count: int) -> Optional[GraphNodeEventBase]:
def handle_error(self, event: NodeRunFailedEvent, graph: Graph, retry_count: int) -> GraphNodeEventBase | None:
"""
Handle error by taking the fail branch.

View File

@ -3,7 +3,6 @@ Retry error strategy implementation.
"""
import time
from typing import Optional
from core.workflow.graph import Graph
from core.workflow.graph_events import GraphNodeEventBase, NodeRunFailedEvent, NodeRunRetryEvent
@ -17,7 +16,7 @@ class RetryStrategy:
maximum number of retries with configurable intervals.
"""
def handle_error(self, event: NodeRunFailedEvent, graph: Graph, retry_count: int) -> Optional[GraphNodeEventBase]:
def handle_error(self, event: NodeRunFailedEvent, graph: Graph, retry_count: int) -> GraphNodeEventBase | None:
"""
Handle error by retrying the node.