From 127291a90f5b00fa5c3fb6e5953c3533a116aabf Mon Sep 17 00:00:00 2001 From: Novice Lee Date: Thu, 19 Dec 2024 17:03:05 +0800 Subject: [PATCH] feat: add single step retry --- api/core/app/apps/workflow_app_runner.py | 1 + api/core/app/entities/queue_entities.py | 1 + .../task_pipeline/workflow_cycle_manage.py | 29 ++-- api/core/workflow/entities/node_entities.py | 3 + .../workflow/graph_engine/entities/event.py | 1 + .../workflow/graph_engine/graph_engine.py | 7 +- api/core/workflow/nodes/base/entities.py | 4 + api/core/workflow/nodes/event/event.py | 15 ++ api/fields/workflow_run_fields.py | 12 ++ api/services/workflow_service.py | 137 ++++++++++++------ 10 files changed, 142 insertions(+), 68 deletions(-) diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index 8374d8a64a..2fbf711175 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -449,6 +449,7 @@ class WorkflowBasedAppRunner(AppRunner): else {}, in_iteration_id=event.in_iteration_id, retry_index=event.retry_index, + start_index=event.start_index, ) ) diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py index f9434fab68..49b7e80246 100644 --- a/api/core/app/entities/queue_entities.py +++ b/api/core/app/entities/queue_entities.py @@ -342,6 +342,7 @@ class QueueNodeRetryEvent(AppQueueEvent): error: str retry_index: int # retry index + start_index: int # start index class QueueNodeInIterationFailedEvent(AppQueueEvent): diff --git a/api/core/app/task_pipeline/workflow_cycle_manage.py b/api/core/app/task_pipeline/workflow_cycle_manage.py index 041e82b099..e2fa12b1cd 100644 --- a/api/core/app/task_pipeline/workflow_cycle_manage.py +++ b/api/core/app/task_pipeline/workflow_cycle_manage.py @@ -436,8 +436,10 @@ class WorkflowCycleManage: created_at = event.start_at finished_at = datetime.now(UTC).replace(tzinfo=None) elapsed_time = (finished_at - created_at).total_seconds() - workflow_node_execution = WorkflowNodeExecution() + inputs = WorkflowEntry.handle_special_values(event.inputs) + outputs = WorkflowEntry.handle_special_values(event.outputs) + workflow_node_execution = WorkflowNodeExecution() workflow_node_execution.tenant_id = workflow_run.tenant_id workflow_node_execution.app_id = workflow_run.app_id workflow_node_execution.workflow_id = workflow_run.workflow_id @@ -454,31 +456,18 @@ class WorkflowCycleManage: workflow_node_execution.finished_at = finished_at workflow_node_execution.elapsed_time = elapsed_time workflow_node_execution.error = event.error + workflow_node_execution.inputs = json.dumps(inputs) if inputs else None + workflow_node_execution.outputs = json.dumps(outputs) if outputs else None workflow_node_execution.execution_metadata = json.dumps( { NodeRunMetadataKey.ITERATION_ID: event.in_iteration_id, } ) + workflow_node_execution.index = event.start_index - with Session(db.engine, expire_on_commit=False) as session: - failed_execution = ( - session.query(WorkflowNodeExecution) - .filter( - WorkflowNodeExecution.tenant_id == workflow_run.tenant_id, - WorkflowNodeExecution.app_id == workflow_run.app_id, - WorkflowNodeExecution.workflow_id == workflow_run.workflow_id, - WorkflowNodeExecution.workflow_run_id == workflow_run.id, - WorkflowNodeExecution.status == WorkflowNodeExecutionStatus.RUNNING.value, - ) - .first() - ) - - node_run_index = failed_execution.index - workflow_node_execution.index = node_run_index - - session.add(workflow_node_execution) - session.commit() - session.refresh(workflow_node_execution) + db.session.add(workflow_node_execution) + db.session.commit() + db.session.refresh(workflow_node_execution) return workflow_node_execution diff --git a/api/core/workflow/entities/node_entities.py b/api/core/workflow/entities/node_entities.py index 976a5ef74e..ca01dcd7d8 100644 --- a/api/core/workflow/entities/node_entities.py +++ b/api/core/workflow/entities/node_entities.py @@ -45,3 +45,6 @@ class NodeRunResult(BaseModel): error: Optional[str] = None # error message if status is failed error_type: Optional[str] = None # error type if status is failed + + # single step node run retry + retry_index: int = 0 diff --git a/api/core/workflow/graph_engine/entities/event.py b/api/core/workflow/graph_engine/entities/event.py index 4efe0f4ed9..9997153164 100644 --- a/api/core/workflow/graph_engine/entities/event.py +++ b/api/core/workflow/graph_engine/entities/event.py @@ -101,6 +101,7 @@ class NodeRunRetryEvent(BaseNodeEvent): error: str = Field(..., description="error") retry_index: int = Field(..., description="which retry attempt is about to be performed") start_at: datetime = Field(..., description="retry start time") + start_index: int = Field(..., description="retry start index") ########################################### diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py index 78147f308e..e292b09968 100644 --- a/api/core/workflow/graph_engine/graph_engine.py +++ b/api/core/workflow/graph_engine/graph_engine.py @@ -610,7 +610,7 @@ class GraphEngine: db.session.close() max_retries = node_instance.node_data.retry_config.max_retries - retry_interval = node_instance.node_data.retry_config.retry_interval + retry_interval = node_instance.node_data.retry_config.retry_interval_seconds retries = 0 shoudl_continue_retry = True while shoudl_continue_retry and retries <= max_retries: @@ -641,6 +641,8 @@ class GraphEngine: run_result.status = WorkflowNodeExecutionStatus.SUCCEEDED if node_instance.should_retry and retries < max_retries: retries += 1 + self.graph_runtime_state.node_run_steps += 1 + route_node_state.node_run_result = run_result yield NodeRunRetryEvent( id=node_instance.id, node_id=node_instance.node_id, @@ -654,8 +656,9 @@ class GraphEngine: parent_parallel_id=parent_parallel_id, parent_parallel_start_node_id=parent_parallel_start_node_id, start_at=retry_start_at, + start_index=self.graph_runtime_state.node_run_steps, ) - time.sleep(retry_interval / 1000) + time.sleep(retry_interval) continue route_node_state.set_finished(run_result=run_result) diff --git a/api/core/workflow/nodes/base/entities.py b/api/core/workflow/nodes/base/entities.py index bc6eed78bb..529fd7be74 100644 --- a/api/core/workflow/nodes/base/entities.py +++ b/api/core/workflow/nodes/base/entities.py @@ -113,6 +113,10 @@ class RetryConfig(BaseModel): retry_interval: int = 0 # retry interval in milliseconds retry_enabled: bool = False # whether retry is enabled + @property + def retry_interval_seconds(self) -> float: + return self.retry_interval / 1000 + class BaseNodeData(ABC, BaseModel): title: str diff --git a/api/core/workflow/nodes/event/event.py b/api/core/workflow/nodes/event/event.py index 2c6a745507..0dc35e7d77 100644 --- a/api/core/workflow/nodes/event/event.py +++ b/api/core/workflow/nodes/event/event.py @@ -4,6 +4,7 @@ from pydantic import BaseModel, Field from core.model_runtime.entities.llm_entities import LLMUsage from core.workflow.entities.node_entities import NodeRunResult +from models.workflow import WorkflowNodeExecutionStatus class RunCompletedEvent(BaseModel): @@ -36,3 +37,17 @@ class RunRetryEvent(BaseModel): error: str = Field(..., description="error") retry_index: int = Field(..., description="Retry attempt number") start_at: datetime = Field(..., description="Retry start time") + + +class SingleStepRetryEvent(BaseModel): + """Single step retry event""" + + status: str = WorkflowNodeExecutionStatus.RETRY.value + + inputs: dict | None = Field(..., description="input") + error: str = Field(..., description="error") + outputs: dict = Field(..., description="output") + retry_index: int = Field(..., description="Retry attempt number") + error: str = Field(..., description="error") + elapsed_time: float = Field(..., description="elapsed time") + execution_metadata: dict | None = Field(..., description="execution metadata") diff --git a/api/fields/workflow_run_fields.py b/api/fields/workflow_run_fields.py index da3e353cd6..7c01ffc2c6 100644 --- a/api/fields/workflow_run_fields.py +++ b/api/fields/workflow_run_fields.py @@ -81,6 +81,17 @@ workflow_run_detail_fields = { "exceptions_count": fields.Integer, } +retry_event_field = { + "error": fields.String, + "retry_index": fields.Integer, + "inputs": fields.Raw(attribute="inputs"), + "elapsed_time": fields.Float, + "execution_metadata": fields.Raw(attribute="execution_metadata_dict"), + "status": fields.String, + "outputs": fields.Raw(attribute="outputs"), +} + + workflow_run_node_execution_fields = { "id": fields.String, "index": fields.Integer, @@ -101,6 +112,7 @@ workflow_run_node_execution_fields = { "created_by_account": fields.Nested(simple_account_fields, attribute="created_by_account", allow_null=True), "created_by_end_user": fields.Nested(simple_end_user_fields, attribute="created_by_end_user", allow_null=True), "finished_at": TimestampField, + "retry_events": fields.List(fields.Nested(retry_event_field)), } workflow_run_node_execution_list_fields = { diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index 84768d5af0..ead552d6c2 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -15,6 +15,7 @@ from core.workflow.nodes.base.entities import BaseNodeData from core.workflow.nodes.base.node import BaseNode from core.workflow.nodes.enums import ErrorStrategy from core.workflow.nodes.event import RunCompletedEvent +from core.workflow.nodes.event.event import SingleStepRetryEvent from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING from core.workflow.workflow_entry import WorkflowEntry from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated @@ -220,56 +221,99 @@ class WorkflowService: # run draft workflow node start_at = time.perf_counter() + retries = 0 + max_retries = 0 + should_retry = True + retry_events = [] try: - node_instance, generator = WorkflowEntry.single_step_run( - workflow=draft_workflow, - node_id=node_id, - user_inputs=user_inputs, - user_id=account.id, - ) - node_instance = cast(BaseNode[BaseNodeData], node_instance) - node_run_result: NodeRunResult | None = None - for event in generator: - if isinstance(event, RunCompletedEvent): - node_run_result = event.run_result + while retries <= max_retries and should_retry: + retry_start_at = time.perf_counter() + node_instance, generator = WorkflowEntry.single_step_run( + workflow=draft_workflow, + node_id=node_id, + user_inputs=user_inputs, + user_id=account.id, + ) + node_instance = cast(BaseNode[BaseNodeData], node_instance) + max_retries = ( + node_instance.node_data.retry_config.max_retries if node_instance.node_data.retry_config else 0 + ) + retry_interval = node_instance.node_data.retry_config.retry_interval_seconds + node_run_result: NodeRunResult | None = None + for event in generator: + if isinstance(event, RunCompletedEvent): + node_run_result = event.run_result - # sign output files - node_run_result.outputs = WorkflowEntry.handle_special_values(node_run_result.outputs) - break + # sign output files + node_run_result.outputs = WorkflowEntry.handle_special_values(node_run_result.outputs) + break - if not node_run_result: - raise ValueError("Node run failed with no run result") - # single step debug mode error handling return - if node_run_result.status == WorkflowNodeExecutionStatus.FAILED and node_instance.should_continue_on_error: - node_error_args = { - "status": WorkflowNodeExecutionStatus.EXCEPTION, - "error": node_run_result.error, - "inputs": node_run_result.inputs, - "metadata": {"error_strategy": node_instance.node_data.error_strategy}, - } - if node_instance.node_data.error_strategy is ErrorStrategy.DEFAULT_VALUE: - node_run_result = NodeRunResult( - **node_error_args, - outputs={ - **node_instance.node_data.default_value_dict, - "error_message": node_run_result.error, - "error_type": node_run_result.error_type, - }, - ) - else: - node_run_result = NodeRunResult( - **node_error_args, - outputs={ - "error_message": node_run_result.error, - "error_type": node_run_result.error_type, - }, - ) - run_succeeded = node_run_result.status in ( - WorkflowNodeExecutionStatus.SUCCEEDED, - WorkflowNodeExecutionStatus.EXCEPTION, - ) - error = node_run_result.error if not run_succeeded else None + if not node_run_result: + raise ValueError("Node run failed with no run result") + # single step debug mode error handling return + if node_run_result.status == WorkflowNodeExecutionStatus.FAILED: + if ( + retries == max_retries + and node_instance.node_type == NodeType.HTTP_REQUEST + and node_run_result.outputs + and not node_instance.should_continue_on_error + ): + node_run_result.status = WorkflowNodeExecutionStatus.SUCCEEDED + should_retry = False + else: + if node_instance.should_retry: + node_run_result.status = WorkflowNodeExecutionStatus.RETRY + retries += 1 + node_run_result.retry_index = retries + retry_events.append( + SingleStepRetryEvent( + inputs=WorkflowEntry.handle_special_values(node_run_result.inputs) + if node_run_result.inputs + else None, + error=node_run_result.error, + outputs=WorkflowEntry.handle_special_values(node_run_result.outputs) + if node_run_result.outputs + else None, + retry_index=node_run_result.retry_index, + elapsed_time=time.perf_counter() - retry_start_at, + execution_metadata=WorkflowEntry.handle_special_values(node_run_result.metadata) + if node_run_result.metadata + else None, + ) + ) + time.sleep(retry_interval) + else: + should_retry = False + if node_instance.should_continue_on_error: + node_error_args = { + "status": WorkflowNodeExecutionStatus.EXCEPTION, + "error": node_run_result.error, + "inputs": node_run_result.inputs, + "metadata": {"error_strategy": node_instance.node_data.error_strategy}, + } + if node_instance.node_data.error_strategy is ErrorStrategy.DEFAULT_VALUE: + node_run_result = NodeRunResult( + **node_error_args, + outputs={ + **node_instance.node_data.default_value_dict, + "error_message": node_run_result.error, + "error_type": node_run_result.error_type, + }, + ) + else: + node_run_result = NodeRunResult( + **node_error_args, + outputs={ + "error_message": node_run_result.error, + "error_type": node_run_result.error_type, + }, + ) + run_succeeded = node_run_result.status in ( + WorkflowNodeExecutionStatus.SUCCEEDED, + WorkflowNodeExecutionStatus.EXCEPTION, + ) + error = node_run_result.error if not run_succeeded else None except WorkflowNodeRunFailedError as e: node_instance = e.node_instance run_succeeded = False @@ -318,6 +362,7 @@ class WorkflowService: db.session.add(workflow_node_execution) db.session.commit() + workflow_node_execution.retry_events = retry_events return workflow_node_execution