diff --git a/api/core/workflow/graph_engine/error_handling/__init__.py b/api/core/workflow/graph_engine/error_handling/__init__.py index 1316710d0d..3189fea0c9 100644 --- a/api/core/workflow/graph_engine/error_handling/__init__.py +++ b/api/core/workflow/graph_engine/error_handling/__init__.py @@ -1,20 +1,12 @@ """ -Error handling strategies for graph engine. +Error handling for graph engine. -This package implements different error recovery strategies using -the Strategy pattern for clean separation of concerns. +This package provides error handling functionality for managing +node execution failures with different recovery strategies. """ -from .abort_strategy import AbortStrategy -from .default_value_strategy import DefaultValueStrategy from .error_handler import ErrorHandler -from .fail_branch_strategy import FailBranchStrategy -from .retry_strategy import RetryStrategy __all__ = [ - "AbortStrategy", - "DefaultValueStrategy", "ErrorHandler", - "FailBranchStrategy", - "RetryStrategy", ] diff --git a/api/core/workflow/graph_engine/error_handling/abort_strategy.py b/api/core/workflow/graph_engine/error_handling/abort_strategy.py deleted file mode 100644 index 4593f004f3..0000000000 --- a/api/core/workflow/graph_engine/error_handling/abort_strategy.py +++ /dev/null @@ -1,40 +0,0 @@ -""" -Abort error strategy implementation. -""" - -import logging -from typing import final - -from core.workflow.graph import Graph -from core.workflow.graph_events import GraphNodeEventBase, NodeRunFailedEvent - -logger = logging.getLogger(__name__) - - -@final -class AbortStrategy: - """ - Error strategy that aborts execution on failure. - - This is the default strategy when no other strategy is specified. - It stops the entire graph execution when a node fails. - """ - - def handle_error(self, event: NodeRunFailedEvent, graph: Graph, retry_count: int) -> GraphNodeEventBase | None: - """ - Handle error by aborting execution. - - Args: - event: The failure event - graph: The workflow graph - retry_count: Current retry attempt count (unused) - - Returns: - None - signals abortion - """ - _ = graph - _ = retry_count - logger.error("Node %s failed with ABORT strategy: %s", event.node_id, event.error) - - # Return None to signal that execution should stop - return None diff --git a/api/core/workflow/graph_engine/error_handling/default_value_strategy.py b/api/core/workflow/graph_engine/error_handling/default_value_strategy.py deleted file mode 100644 index 3cdcec88e5..0000000000 --- a/api/core/workflow/graph_engine/error_handling/default_value_strategy.py +++ /dev/null @@ -1,58 +0,0 @@ -""" -Default value error strategy implementation. -""" - -from typing import final - -from core.workflow.enums import ErrorStrategy, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus -from core.workflow.graph import Graph -from core.workflow.graph_events import GraphNodeEventBase, NodeRunExceptionEvent, NodeRunFailedEvent -from core.workflow.node_events import NodeRunResult - - -@final -class DefaultValueStrategy: - """ - Error strategy that uses default values on failure. - - This strategy allows nodes to fail gracefully by providing - predefined default output values. - """ - - def handle_error(self, event: NodeRunFailedEvent, graph: Graph, retry_count: int) -> GraphNodeEventBase | None: - """ - Handle error by using default values. - - Args: - event: The failure event - graph: The workflow graph - retry_count: Current retry attempt count (unused) - - Returns: - NodeRunExceptionEvent with default values - """ - _ = retry_count - node = graph.nodes[event.node_id] - - outputs = { - **node.default_value_dict, - "error_message": event.node_run_result.error, - "error_type": event.node_run_result.error_type, - } - - return NodeRunExceptionEvent( - id=event.id, - node_id=event.node_id, - node_type=event.node_type, - start_at=event.start_at, - node_run_result=NodeRunResult( - status=WorkflowNodeExecutionStatus.EXCEPTION, - inputs=event.node_run_result.inputs, - process_data=event.node_run_result.process_data, - outputs=outputs, - metadata={ - WorkflowNodeExecutionMetadataKey.ERROR_STRATEGY: ErrorStrategy.DEFAULT_VALUE, - }, - ), - error=event.error, - ) diff --git a/api/core/workflow/graph_engine/error_handling/error_handler.py b/api/core/workflow/graph_engine/error_handling/error_handler.py index b51d7e4dad..aa6e6287e0 100644 --- a/api/core/workflow/graph_engine/error_handling/error_handler.py +++ b/api/core/workflow/graph_engine/error_handling/error_handler.py @@ -2,20 +2,31 @@ Main error handler that coordinates error strategies. """ +import logging +import time from typing import TYPE_CHECKING, final -from core.workflow.enums import ErrorStrategy as ErrorStrategyEnum +from core.workflow.enums import ( + ErrorStrategy as ErrorStrategyEnum, +) +from core.workflow.enums import ( + WorkflowNodeExecutionMetadataKey, + WorkflowNodeExecutionStatus, +) from core.workflow.graph import Graph -from core.workflow.graph_events import GraphNodeEventBase, NodeRunFailedEvent - -from .abort_strategy import AbortStrategy -from .default_value_strategy import DefaultValueStrategy -from .fail_branch_strategy import FailBranchStrategy -from .retry_strategy import RetryStrategy +from core.workflow.graph_events import ( + GraphNodeEventBase, + NodeRunExceptionEvent, + NodeRunFailedEvent, + NodeRunRetryEvent, +) +from core.workflow.node_events import NodeRunResult if TYPE_CHECKING: from ..domain import GraphExecution +logger = logging.getLogger(__name__) + @final class ErrorHandler: @@ -38,12 +49,6 @@ class ErrorHandler: self._graph = graph self._graph_execution = graph_execution - # Initialize strategies - self._abort_strategy = AbortStrategy() - self._retry_strategy = RetryStrategy() - self._fail_branch_strategy = FailBranchStrategy() - self._default_value_strategy = DefaultValueStrategy() - def handle_node_failure(self, event: NodeRunFailedEvent) -> GraphNodeEventBase | None: """ Handle a node failure event. @@ -64,7 +69,7 @@ class ErrorHandler: # First check if retry is configured and not exhausted if node.retry and retry_count < node.retry_config.max_retries: - result = self._retry_strategy.handle_error(event, self._graph, retry_count) + result = self._handle_retry(event, retry_count) if result: # Retry count will be incremented when NodeRunRetryEvent is handled return result @@ -74,8 +79,133 @@ class ErrorHandler: match strategy: case None: - return self._abort_strategy.handle_error(event, self._graph, retry_count) + return self._handle_abort(event) case ErrorStrategyEnum.FAIL_BRANCH: - return self._fail_branch_strategy.handle_error(event, self._graph, retry_count) + return self._handle_fail_branch(event) case ErrorStrategyEnum.DEFAULT_VALUE: - return self._default_value_strategy.handle_error(event, self._graph, retry_count) + return self._handle_default_value(event) + + def _handle_abort(self, event: NodeRunFailedEvent): + """ + Handle error by aborting execution. + + This is the default strategy when no other strategy is specified. + It stops the entire graph execution when a node fails. + + Args: + event: The failure event + + Returns: + None - signals abortion + """ + logger.error("Node %s failed with ABORT strategy: %s", event.node_id, event.error) + # Return None to signal that execution should stop + + def _handle_retry(self, event: NodeRunFailedEvent, retry_count: int): + """ + Handle error by retrying the node. + + This strategy re-attempts node execution up to a configured + maximum number of retries with configurable intervals. + + Args: + event: The failure event + retry_count: Current retry attempt count + + Returns: + NodeRunRetryEvent if retry should occur, None otherwise + """ + node = self._graph.nodes[event.node_id] + + # Check if we've exceeded max retries + if not node.retry or retry_count >= node.retry_config.max_retries: + return None + + # Wait for retry interval + time.sleep(node.retry_config.retry_interval_seconds) + + # Create retry event + return NodeRunRetryEvent( + id=event.id, + node_title=node.title, + node_id=event.node_id, + node_type=event.node_type, + node_run_result=event.node_run_result, + start_at=event.start_at, + error=event.error, + retry_index=retry_count + 1, + ) + + def _handle_fail_branch(self, event: NodeRunFailedEvent): + """ + Handle error by taking the fail branch. + + This strategy converts failures to exceptions and routes execution + through a designated fail-branch edge. + + Args: + event: The failure event + + Returns: + NodeRunExceptionEvent to continue via fail branch + """ + outputs = { + "error_message": event.node_run_result.error, + "error_type": event.node_run_result.error_type, + } + + return NodeRunExceptionEvent( + id=event.id, + node_id=event.node_id, + node_type=event.node_type, + start_at=event.start_at, + node_run_result=NodeRunResult( + status=WorkflowNodeExecutionStatus.EXCEPTION, + inputs=event.node_run_result.inputs, + process_data=event.node_run_result.process_data, + outputs=outputs, + edge_source_handle="fail-branch", + metadata={ + WorkflowNodeExecutionMetadataKey.ERROR_STRATEGY: ErrorStrategyEnum.FAIL_BRANCH, + }, + ), + error=event.error, + ) + + def _handle_default_value(self, event: NodeRunFailedEvent): + """ + Handle error by using default values. + + This strategy allows nodes to fail gracefully by providing + predefined default output values. + + Args: + event: The failure event + + Returns: + NodeRunExceptionEvent with default values + """ + node = self._graph.nodes[event.node_id] + + outputs = { + **node.default_value_dict, + "error_message": event.node_run_result.error, + "error_type": event.node_run_result.error_type, + } + + return NodeRunExceptionEvent( + id=event.id, + node_id=event.node_id, + node_type=event.node_type, + start_at=event.start_at, + node_run_result=NodeRunResult( + status=WorkflowNodeExecutionStatus.EXCEPTION, + inputs=event.node_run_result.inputs, + process_data=event.node_run_result.process_data, + outputs=outputs, + metadata={ + WorkflowNodeExecutionMetadataKey.ERROR_STRATEGY: ErrorStrategyEnum.DEFAULT_VALUE, + }, + ), + error=event.error, + ) diff --git a/api/core/workflow/graph_engine/error_handling/fail_branch_strategy.py b/api/core/workflow/graph_engine/error_handling/fail_branch_strategy.py deleted file mode 100644 index 1c156b5be1..0000000000 --- a/api/core/workflow/graph_engine/error_handling/fail_branch_strategy.py +++ /dev/null @@ -1,57 +0,0 @@ -""" -Fail branch error strategy implementation. -""" - -from typing import final - -from core.workflow.enums import ErrorStrategy, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus -from core.workflow.graph import Graph -from core.workflow.graph_events import GraphNodeEventBase, NodeRunExceptionEvent, NodeRunFailedEvent -from core.workflow.node_events import NodeRunResult - - -@final -class FailBranchStrategy: - """ - Error strategy that continues execution via a fail branch. - - This strategy converts failures to exceptions and routes execution - through a designated fail-branch edge. - """ - - def handle_error(self, event: NodeRunFailedEvent, graph: Graph, retry_count: int) -> GraphNodeEventBase | None: - """ - Handle error by taking the fail branch. - - Args: - event: The failure event - graph: The workflow graph - retry_count: Current retry attempt count (unused) - - Returns: - NodeRunExceptionEvent to continue via fail branch - """ - _ = graph - _ = retry_count - outputs = { - "error_message": event.node_run_result.error, - "error_type": event.node_run_result.error_type, - } - - return NodeRunExceptionEvent( - id=event.id, - node_id=event.node_id, - node_type=event.node_type, - start_at=event.start_at, - node_run_result=NodeRunResult( - status=WorkflowNodeExecutionStatus.EXCEPTION, - inputs=event.node_run_result.inputs, - process_data=event.node_run_result.process_data, - outputs=outputs, - edge_source_handle="fail-branch", - metadata={ - WorkflowNodeExecutionMetadataKey.ERROR_STRATEGY: ErrorStrategy.FAIL_BRANCH, - }, - ), - error=event.error, - ) diff --git a/api/core/workflow/graph_engine/error_handling/retry_strategy.py b/api/core/workflow/graph_engine/error_handling/retry_strategy.py deleted file mode 100644 index e4010b6bdb..0000000000 --- a/api/core/workflow/graph_engine/error_handling/retry_strategy.py +++ /dev/null @@ -1,52 +0,0 @@ -""" -Retry error strategy implementation. -""" - -import time -from typing import final - -from core.workflow.graph import Graph -from core.workflow.graph_events import GraphNodeEventBase, NodeRunFailedEvent, NodeRunRetryEvent - - -@final -class RetryStrategy: - """ - Error strategy that retries failed nodes. - - This strategy re-attempts node execution up to a configured - maximum number of retries with configurable intervals. - """ - - def handle_error(self, event: NodeRunFailedEvent, graph: Graph, retry_count: int) -> GraphNodeEventBase | None: - """ - Handle error by retrying the node. - - Args: - event: The failure event - graph: The workflow graph - retry_count: Current retry attempt count - - Returns: - NodeRunRetryEvent if retry should occur, None otherwise - """ - node = graph.nodes[event.node_id] - - # Check if we've exceeded max retries - if not node.retry or retry_count >= node.retry_config.max_retries: - return None - - # Wait for retry interval - time.sleep(node.retry_config.retry_interval_seconds) - - # Create retry event - return NodeRunRetryEvent( - id=event.id, - node_title=node.title, - node_id=event.node_id, - node_type=event.node_type, - node_run_result=event.node_run_result, - start_at=event.start_at, - error=event.error, - retry_index=retry_count + 1, - ) diff --git a/api/core/workflow/graph_engine/protocols/error_strategy.py b/api/core/workflow/graph_engine/protocols/error_strategy.py deleted file mode 100644 index bf8b316423..0000000000 --- a/api/core/workflow/graph_engine/protocols/error_strategy.py +++ /dev/null @@ -1,31 +0,0 @@ -""" -Base error strategy protocol. -""" - -from typing import Protocol - -from core.workflow.graph import Graph -from core.workflow.graph_events import GraphNodeEventBase, NodeRunFailedEvent - - -class ErrorStrategy(Protocol): - """ - Protocol for error handling strategies. - - Each strategy implements a different approach to handling - node execution failures. - """ - - def handle_error(self, event: NodeRunFailedEvent, graph: Graph, retry_count: int) -> GraphNodeEventBase | None: - """ - Handle a node failure event. - - Args: - event: The failure event - graph: The workflow graph - retry_count: Current retry attempt count - - Returns: - Optional new event to process, or None to stop - """ - ...