refactor(graph_engine): Merge error strategies into error_handler.py

Signed-off-by: -LAN- <laipz8200@outlook.com>
This commit is contained in:
-LAN- 2025-09-10 01:49:46 +08:00
parent e060d7c28c
commit d52621fce3
No known key found for this signature in database
GPG Key ID: 6BA0D108DED011FF
7 changed files with 150 additions and 266 deletions

View File

@ -1,20 +1,12 @@
"""
Error handling strategies for graph engine.
Error handling for graph engine.
This package implements different error recovery strategies using
the Strategy pattern for clean separation of concerns.
This package provides error handling functionality for managing
node execution failures with different recovery strategies.
"""
from .abort_strategy import AbortStrategy
from .default_value_strategy import DefaultValueStrategy
from .error_handler import ErrorHandler
from .fail_branch_strategy import FailBranchStrategy
from .retry_strategy import RetryStrategy
__all__ = [
"AbortStrategy",
"DefaultValueStrategy",
"ErrorHandler",
"FailBranchStrategy",
"RetryStrategy",
]

View File

@ -1,40 +0,0 @@
"""
Abort error strategy implementation.
"""
import logging
from typing import final
from core.workflow.graph import Graph
from core.workflow.graph_events import GraphNodeEventBase, NodeRunFailedEvent
logger = logging.getLogger(__name__)
@final
class AbortStrategy:
    """
    Fallback error strategy: give up when a node fails.

    Selected when a node declares no other error strategy. It produces no
    follow-up event, which tells the caller to stop the graph run.
    """

    def handle_error(self, event: NodeRunFailedEvent, graph: Graph, retry_count: int) -> GraphNodeEventBase | None:
        """
        Log the failure and signal that execution must stop.

        Args:
            event: The failure event reported for the node
            graph: The workflow graph (not consulted by this strategy)
            retry_count: Current retry attempt count (not consulted by this strategy)

        Returns:
            Always None - there is no event to continue with
        """
        # Both parameters exist only to satisfy the shared strategy signature.
        del graph, retry_count

        failed_node_id, failure_reason = event.node_id, event.error
        logger.error("Node %s failed with ABORT strategy: %s", failed_node_id, failure_reason)

        # None is the "stop execution" signal for the caller.
        return None

View File

@ -1,58 +0,0 @@
"""
Default value error strategy implementation.
"""
from typing import final
from core.workflow.enums import ErrorStrategy, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from core.workflow.graph import Graph
from core.workflow.graph_events import GraphNodeEventBase, NodeRunExceptionEvent, NodeRunFailedEvent
from core.workflow.node_events import NodeRunResult
@final
class DefaultValueStrategy:
    """
    Error strategy that substitutes a node's default outputs on failure.

    Instead of stopping, the failure is turned into an exception event whose
    outputs are the node's predefined defaults plus the error details.
    """

    def handle_error(self, event: NodeRunFailedEvent, graph: Graph, retry_count: int) -> GraphNodeEventBase | None:
        """
        Convert a failure into an exception event carrying default outputs.

        Args:
            event: The failure event reported for the node
            graph: The workflow graph, used to look up the failed node
            retry_count: Current retry attempt count (not consulted by this strategy)

        Returns:
            NodeRunExceptionEvent whose outputs hold the node defaults and error info
        """
        del retry_count  # present only to satisfy the shared strategy signature

        failed_node = graph.nodes[event.node_id]

        # Start from a copy of the defaults; the error fields are written last
        # so they win over any default that uses the same key.
        fallback_outputs = dict(failed_node.default_value_dict)
        fallback_outputs.update(
            error_message=event.node_run_result.error,
            error_type=event.node_run_result.error_type,
        )

        exception_result = NodeRunResult(
            status=WorkflowNodeExecutionStatus.EXCEPTION,
            inputs=event.node_run_result.inputs,
            process_data=event.node_run_result.process_data,
            outputs=fallback_outputs,
            metadata={
                WorkflowNodeExecutionMetadataKey.ERROR_STRATEGY: ErrorStrategy.DEFAULT_VALUE,
            },
        )

        return NodeRunExceptionEvent(
            id=event.id,
            node_id=event.node_id,
            node_type=event.node_type,
            start_at=event.start_at,
            node_run_result=exception_result,
            error=event.error,
        )

View File

@ -2,20 +2,31 @@
Main error handler that coordinates error strategies.
"""
import logging
import time
from typing import TYPE_CHECKING, final
from core.workflow.enums import ErrorStrategy as ErrorStrategyEnum
from core.workflow.enums import (
ErrorStrategy as ErrorStrategyEnum,
)
from core.workflow.enums import (
WorkflowNodeExecutionMetadataKey,
WorkflowNodeExecutionStatus,
)
from core.workflow.graph import Graph
from core.workflow.graph_events import GraphNodeEventBase, NodeRunFailedEvent
from .abort_strategy import AbortStrategy
from .default_value_strategy import DefaultValueStrategy
from .fail_branch_strategy import FailBranchStrategy
from .retry_strategy import RetryStrategy
from core.workflow.graph_events import (
GraphNodeEventBase,
NodeRunExceptionEvent,
NodeRunFailedEvent,
NodeRunRetryEvent,
)
from core.workflow.node_events import NodeRunResult
if TYPE_CHECKING:
from ..domain import GraphExecution
logger = logging.getLogger(__name__)
@final
class ErrorHandler:
@ -38,12 +49,6 @@ class ErrorHandler:
self._graph = graph
self._graph_execution = graph_execution
# Initialize strategies
self._abort_strategy = AbortStrategy()
self._retry_strategy = RetryStrategy()
self._fail_branch_strategy = FailBranchStrategy()
self._default_value_strategy = DefaultValueStrategy()
def handle_node_failure(self, event: NodeRunFailedEvent) -> GraphNodeEventBase | None:
"""
Handle a node failure event.
@ -64,7 +69,7 @@ class ErrorHandler:
# First check if retry is configured and not exhausted
if node.retry and retry_count < node.retry_config.max_retries:
result = self._retry_strategy.handle_error(event, self._graph, retry_count)
result = self._handle_retry(event, retry_count)
if result:
# Retry count will be incremented when NodeRunRetryEvent is handled
return result
@ -74,8 +79,133 @@ class ErrorHandler:
match strategy:
case None:
return self._abort_strategy.handle_error(event, self._graph, retry_count)
return self._handle_abort(event)
case ErrorStrategyEnum.FAIL_BRANCH:
return self._fail_branch_strategy.handle_error(event, self._graph, retry_count)
return self._handle_fail_branch(event)
case ErrorStrategyEnum.DEFAULT_VALUE:
return self._default_value_strategy.handle_error(event, self._graph, retry_count)
return self._handle_default_value(event)
def _handle_abort(self, event: NodeRunFailedEvent):
    """
    Abort handling: log the failure and produce no follow-up event.

    Used when the failed node has no error strategy configured. Returning
    None tells the caller that graph execution should stop.

    Args:
        event: The failure event

    Returns:
        None - signals abortion
    """
    failed_node_id, failure_reason = event.node_id, event.error
    logger.error("Node %s failed with ABORT strategy: %s", failed_node_id, failure_reason)

    # Explicit None: there is nothing to continue with.
    return None
def _handle_retry(self, event: NodeRunFailedEvent, retry_count: int):
    """
    Retry handling: wait for the configured interval, then request a re-run.

    A retry is only produced while the node has retry enabled and the
    attempt count is still below the configured maximum.

    Args:
        event: The failure event
        retry_count: Current retry attempt count

    Returns:
        NodeRunRetryEvent if retry should occur, None otherwise
    """
    failed_node = self._graph.nodes[event.node_id]

    # Short-circuits on `retry`, so the retry config is only read when retry is enabled.
    may_retry = failed_node.retry and retry_count < failed_node.retry_config.max_retries
    if not may_retry:
        return None

    # Sleep for the configured interval before handing back the retry event.
    time.sleep(failed_node.retry_config.retry_interval_seconds)

    next_attempt = retry_count + 1
    return NodeRunRetryEvent(
        id=event.id,
        node_title=failed_node.title,
        node_id=event.node_id,
        node_type=event.node_type,
        node_run_result=event.node_run_result,
        start_at=event.start_at,
        error=event.error,
        retry_index=next_attempt,
    )
def _handle_fail_branch(self, event: NodeRunFailedEvent):
    """
    Fail-branch handling: turn the failure into an exception event.

    The resulting run result is tagged with the "fail-branch" source handle
    and carries the error message and type as its only outputs.

    Args:
        event: The failure event

    Returns:
        NodeRunExceptionEvent to continue via fail branch
    """
    failed_result = event.node_run_result

    exception_result = NodeRunResult(
        status=WorkflowNodeExecutionStatus.EXCEPTION,
        inputs=failed_result.inputs,
        process_data=failed_result.process_data,
        outputs={
            "error_message": failed_result.error,
            "error_type": failed_result.error_type,
        },
        edge_source_handle="fail-branch",
        metadata={
            WorkflowNodeExecutionMetadataKey.ERROR_STRATEGY: ErrorStrategyEnum.FAIL_BRANCH,
        },
    )

    return NodeRunExceptionEvent(
        id=event.id,
        node_id=event.node_id,
        node_type=event.node_type,
        start_at=event.start_at,
        node_run_result=exception_result,
        error=event.error,
    )
def _handle_default_value(self, event: NodeRunFailedEvent):
    """
    Default-value handling: replace the failed outputs with node defaults.

    The failure becomes an exception event whose outputs are the node's
    predefined default values plus the error message and type.

    Args:
        event: The failure event

    Returns:
        NodeRunExceptionEvent with default values
    """
    failed_node = self._graph.nodes[event.node_id]

    # Copy the defaults first; the error fields are written afterwards so they
    # take precedence over a default that uses the same key.
    fallback_outputs = dict(failed_node.default_value_dict)
    fallback_outputs.update(
        error_message=event.node_run_result.error,
        error_type=event.node_run_result.error_type,
    )

    exception_result = NodeRunResult(
        status=WorkflowNodeExecutionStatus.EXCEPTION,
        inputs=event.node_run_result.inputs,
        process_data=event.node_run_result.process_data,
        outputs=fallback_outputs,
        metadata={
            WorkflowNodeExecutionMetadataKey.ERROR_STRATEGY: ErrorStrategyEnum.DEFAULT_VALUE,
        },
    )

    return NodeRunExceptionEvent(
        id=event.id,
        node_id=event.node_id,
        node_type=event.node_type,
        start_at=event.start_at,
        node_run_result=exception_result,
        error=event.error,
    )

View File

@ -1,57 +0,0 @@
"""
Fail branch error strategy implementation.
"""
from typing import final
from core.workflow.enums import ErrorStrategy, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
from core.workflow.graph import Graph
from core.workflow.graph_events import GraphNodeEventBase, NodeRunExceptionEvent, NodeRunFailedEvent
from core.workflow.node_events import NodeRunResult
@final
class FailBranchStrategy:
    """
    Error strategy that keeps the run going through a fail branch.

    The failure is re-expressed as an exception event whose run result is
    tagged with the "fail-branch" source handle.
    """

    def handle_error(self, event: NodeRunFailedEvent, graph: Graph, retry_count: int) -> GraphNodeEventBase | None:
        """
        Convert a failure into an exception event routed to the fail branch.

        Args:
            event: The failure event reported for the node
            graph: The workflow graph (not consulted by this strategy)
            retry_count: Current retry attempt count (not consulted by this strategy)

        Returns:
            NodeRunExceptionEvent to continue via fail branch
        """
        # Both parameters exist only to satisfy the shared strategy signature.
        del graph, retry_count

        failed_result = event.node_run_result

        exception_result = NodeRunResult(
            status=WorkflowNodeExecutionStatus.EXCEPTION,
            inputs=failed_result.inputs,
            process_data=failed_result.process_data,
            outputs={
                "error_message": failed_result.error,
                "error_type": failed_result.error_type,
            },
            edge_source_handle="fail-branch",
            metadata={
                WorkflowNodeExecutionMetadataKey.ERROR_STRATEGY: ErrorStrategy.FAIL_BRANCH,
            },
        )

        return NodeRunExceptionEvent(
            id=event.id,
            node_id=event.node_id,
            node_type=event.node_type,
            start_at=event.start_at,
            node_run_result=exception_result,
            error=event.error,
        )

View File

@ -1,52 +0,0 @@
"""
Retry error strategy implementation.
"""
import time
from typing import final
from core.workflow.graph import Graph
from core.workflow.graph_events import GraphNodeEventBase, NodeRunFailedEvent, NodeRunRetryEvent
@final
class RetryStrategy:
    """
    Error strategy that asks for a failed node to be run again.

    A retry is produced only while the node has retry enabled and the
    attempt count is below the configured maximum; the configured interval
    is waited out before the retry event is returned.
    """

    def handle_error(self, event: NodeRunFailedEvent, graph: Graph, retry_count: int) -> GraphNodeEventBase | None:
        """
        Wait for the retry interval and build the retry event, if allowed.

        Args:
            event: The failure event reported for the node
            graph: The workflow graph, used to look up the failed node
            retry_count: Current retry attempt count

        Returns:
            NodeRunRetryEvent if retry should occur, None otherwise
        """
        failed_node = graph.nodes[event.node_id]

        # Short-circuits on `retry`, so the retry config is only read when retry is enabled.
        may_retry = failed_node.retry and retry_count < failed_node.retry_config.max_retries
        if not may_retry:
            return None

        # Sleep for the configured interval before handing back the retry event.
        time.sleep(failed_node.retry_config.retry_interval_seconds)

        next_attempt = retry_count + 1
        return NodeRunRetryEvent(
            id=event.id,
            node_title=failed_node.title,
            node_id=event.node_id,
            node_type=event.node_type,
            node_run_result=event.node_run_result,
            start_at=event.start_at,
            error=event.error,
            retry_index=next_attempt,
        )

View File

@ -1,31 +0,0 @@
"""
Base error strategy protocol.
"""
from typing import Protocol
from core.workflow.graph import Graph
from core.workflow.graph_events import GraphNodeEventBase, NodeRunFailedEvent
class ErrorStrategy(Protocol):
    """
    Structural interface shared by all error handling strategies.

    Any object exposing a matching ``handle_error`` method satisfies this
    protocol; each implementation decides how a node failure is dealt with.
    """

    def handle_error(self, event: NodeRunFailedEvent, graph: Graph, retry_count: int) -> GraphNodeEventBase | None:
        """
        React to a node failure.

        Args:
            event: The failure event reported for the node
            graph: The workflow graph
            retry_count: Current retry attempt count

        Returns:
            A new event to process, or None to stop
        """
        ...