diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index f228c3ec4a..e8a65b1b4e 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -440,6 +440,31 @@ class WorkflowConfigApi(Resource): } +class DraftWorkflowNodeRetriableApi(Resource): + @setup_required + @login_required + @account_initialization_required + @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) + @marshal_with(workflow_run_node_execution_fields) + def post(self, app_model: App, node_id: str): + """ + Run draft workflow node + """ + # The role of the current user in the ta table must be admin, owner, or editor + if not current_user.is_editor: + raise Forbidden() + + parser = reqparse.RequestParser() + parser.add_argument("inputs", type=dict, required=True, nullable=False, location="json") + args = parser.parse_args() + workflow_service = WorkflowService() + workflow_node_execution = workflow_service.run_retriable_draft_workflow_node( + app_model=app_model, node_id=node_id, user_inputs=args.get("inputs", {}), account=current_user + ) + + return workflow_node_execution + + api.add_resource(DraftWorkflowApi, "/apps//workflows/draft") api.add_resource(WorkflowConfigApi, "/apps//workflows/draft/config") api.add_resource(AdvancedChatDraftWorkflowRunApi, "/apps//advanced-chat/workflows/draft/run") @@ -459,3 +484,4 @@ api.add_resource( DefaultBlockConfigApi, "/apps//workflows/default-workflow-block-configs/" ) api.add_resource(ConvertToWorkflowApi, "/apps//convert-to-workflow") +api.add_resource(DraftWorkflowNodeRetriableApi, "/apps//workflows/draft/retry/nodes//run") diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index 84768d5af0..708a129d0a 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -4,11 +4,12 @@ from collections.abc import Sequence from datetime import UTC, datetime from typing import Optional, cast +from core import workflow from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager from core.model_runtime.utils.encoders import jsonable_encoder from core.variables import Variable -from core.workflow.entities.node_entities import NodeRunResult +from core.workflow.entities.node_entities import NodeRunMetadataKey, NodeRunResult from core.workflow.errors import WorkflowNodeRunFailedError from core.workflow.nodes import NodeType from core.workflow.nodes.base.entities import BaseNodeData @@ -242,29 +243,7 @@ class WorkflowService: raise ValueError("Node run failed with no run result") # single step debug mode error handling return if node_run_result.status == WorkflowNodeExecutionStatus.FAILED and node_instance.should_continue_on_error: - node_error_args = { - "status": WorkflowNodeExecutionStatus.EXCEPTION, - "error": node_run_result.error, - "inputs": node_run_result.inputs, - "metadata": {"error_strategy": node_instance.node_data.error_strategy}, - } - if node_instance.node_data.error_strategy is ErrorStrategy.DEFAULT_VALUE: - node_run_result = NodeRunResult( - **node_error_args, - outputs={ - **node_instance.node_data.default_value_dict, - "error_message": node_run_result.error, - "error_type": node_run_result.error_type, - }, - ) - else: - node_run_result = NodeRunResult( - **node_error_args, - outputs={ - "error_message": node_run_result.error, - "error_type": node_run_result.error_type, - }, - ) + node_run_result = self._handle_continue_on_error(node_instance, node_run_result) run_succeeded = node_run_result.status in ( WorkflowNodeExecutionStatus.SUCCEEDED, WorkflowNodeExecutionStatus.EXCEPTION, @@ -360,3 +339,143 @@ class WorkflowService: ) else: raise ValueError(f"Invalid app mode: {app_model.mode}") + + def run_retriable_draft_workflow_node( + self, app_model: App, node_id: str, user_inputs: dict, account: Account + ) -> list[WorkflowNodeExecution]: + """ + Run draft retry workflow node + """ + # fetch draft workflow by app_model + draft_workflow = self.get_draft_workflow(app_model=app_model) + if not draft_workflow: + raise ValueError("Workflow not initialized") + # init retry variables + start_at = time.perf_counter() + should_retry = True + retries = 0 + max_retries = 0 + retry_interval = 0 + list_node_executions = [] + + while retries <= max_retries and should_retry: + reties_start_at = time.perf_counter() + should_retry = False + try: + # run draft workflow node + node_instance, generator = WorkflowEntry.single_step_run( + workflow=draft_workflow, + node_id=node_id, + user_inputs=user_inputs, + user_id=account.id, + ) + node_instance = cast(BaseNode[BaseNodeData], node_instance) + max_retries = node_instance.node_data.retry_config.max_retries + retry_interval = node_instance.node_data.retry_config.retry_interval_seconds + node_run_result: NodeRunResult | None = None + for event in generator: + if isinstance(event, RunCompletedEvent): + node_run_result = event.run_result + + # sign output files + node_run_result.outputs = WorkflowEntry.handle_special_values(node_run_result.outputs) + break + + if not node_run_result: + raise ValueError("Node run failed with no run result") + # single step debug mode error handling return + if node_run_result.status == WorkflowNodeExecutionStatus.FAILED: + if node_instance.should_retry and retries < max_retries: + retries += 1 + should_retry = True + node_run_result.status = WorkflowNodeExecutionStatus.RETRY + + elif node_instance.should_continue_on_error: + node_run_result = self._handle_continue_on_error(node_instance, node_run_result) + + elif node_instance.node_type == NodeType.HTTP_REQUEST and node_run_result.outputs: + node_run_result.status = WorkflowNodeExecutionStatus.SUCCEEDED + + error = node_run_result.error or None + except WorkflowNodeRunFailedError as e: + node_instance = e.node_instance + node_run_result = None + error = e.error + + start_at = ( + reties_start_at + if not node_run_result or node_run_result.status == WorkflowNodeExecutionStatus.RETRY + else start_at + ) + + workflow_node_execution = WorkflowNodeExecution() + workflow_node_execution.tenant_id = app_model.tenant_id + workflow_node_execution.app_id = app_model.id + workflow_node_execution.workflow_id = draft_workflow.id + workflow_node_execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP.value + workflow_node_execution.index = 1 + workflow_node_execution.node_id = node_id + workflow_node_execution.node_type = node_instance.node_type + workflow_node_execution.title = node_instance.node_data.title + workflow_node_execution.elapsed_time = time.perf_counter() - start_at + workflow_node_execution.created_by_role = CreatedByRole.ACCOUNT.value + workflow_node_execution.created_by = account.id + workflow_node_execution.created_at = datetime.now(UTC).replace(tzinfo=None) + workflow_node_execution.finished_at = datetime.now(UTC).replace(tzinfo=None) + workflow_node_execution.status = WorkflowNodeExecutionStatus.FAILED.value + workflow_node_execution.error = error or None + + if node_run_result: + # create workflow node execution + inputs = WorkflowEntry.handle_special_values(node_run_result.inputs) if node_run_result.inputs else None + process_data = ( + WorkflowEntry.handle_special_values(node_run_result.process_data) + if node_run_result.process_data + else None + ) + outputs = ( + WorkflowEntry.handle_special_values(node_run_result.outputs) if node_run_result.outputs else None + ) + workflow_node_execution.status = node_run_result.status.value + workflow_node_execution.inputs = json.dumps(inputs) + workflow_node_execution.process_data = json.dumps(process_data) + workflow_node_execution.outputs = json.dumps(outputs) + workflow_node_execution.execution_metadata = ( + json.dumps(jsonable_encoder(node_run_result.metadata)) if node_run_result.metadata else None + ) + + db.session.add(workflow_node_execution) + list_node_executions.append(workflow_node_execution) + db.session.commit() + if should_retry and retry_interval: + time.sleep(retry_interval) + + return list_node_executions + + def _handle_continue_on_error( + self, node_instance: BaseNode[BaseNodeData], node_run_result: NodeRunResult + ) -> NodeRunResult: + node_error_args = { + "status": WorkflowNodeExecutionStatus.EXCEPTION, + "error": node_run_result.error, + "inputs": node_run_result.inputs, + "metadata": {NodeRunMetadataKey.ERROR_STRATEGY: node_instance.node_data.error_strategy}, + } + if node_instance.node_data.error_strategy is ErrorStrategy.DEFAULT_VALUE: + node_run_result = NodeRunResult( + **node_error_args, + outputs={ + **node_instance.node_data.default_value_dict, + "error_message": node_run_result.error, + "error_type": node_run_result.error_type, + }, + ) + else: + node_run_result = NodeRunResult( + **node_error_args, + outputs={ + "error_message": node_run_result.error, + "error_type": node_run_result.error_type, + }, + ) + return node_run_result