diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index 58393a978d..f228c3ec4a 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -15,7 +15,7 @@ from core.app.apps.base_app_queue_manager import AppQueueManager from core.app.entities.app_invoke_entities import InvokeFrom from factories import variable_factory from fields.workflow_fields import workflow_fields -from fields.workflow_run_fields import single_step_node_execution_fields +from fields.workflow_run_fields import workflow_run_node_execution_fields from libs import helper from libs.helper import TimestampField, uuid_value from libs.login import current_user, login_required @@ -285,7 +285,7 @@ class DraftWorkflowNodeRunApi(Resource): @login_required @account_initialization_required @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW]) - @marshal_with(single_step_node_execution_fields) + @marshal_with(workflow_run_node_execution_fields) def post(self, app_model: App, node_id: str): """ Run draft workflow node diff --git a/api/core/helper/ssrf_proxy.py b/api/core/helper/ssrf_proxy.py index 6153becc2a..425b3535c4 100644 --- a/api/core/helper/ssrf_proxy.py +++ b/api/core/helper/ssrf_proxy.py @@ -45,6 +45,7 @@ def make_request(method, url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs): ) retries = 0 + stream = kwargs.pop("stream", False) while retries <= max_retries: try: if dify_config.SSRF_PROXY_ALL_URL: @@ -63,15 +64,14 @@ def make_request(method, url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs): logging.warning(f"Received status code {response.status_code} for URL {url} which is in the force list") except httpx.RequestError as e: + logging.warning(f"Request to URL {url} failed on attempt {retries + 1}: {e}") if max_retries == 0: raise - logging.warning(f"Request to URL {url} failed on attempt {retries + 1}: {e}") retries += 1 if retries <= max_retries: time.sleep(BACKOFF_FACTOR * (2 ** (retries - 1))) - if max_retries != 0: - raise MaxRetriesExceededError(f"Reached maximum retries ({max_retries}) for URL {url}") + raise MaxRetriesExceededError(f"Reached maximum retries ({max_retries}) for URL {url}") def get(url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs): diff --git a/api/fields/workflow_run_fields.py b/api/fields/workflow_run_fields.py index caca88a9b7..74fdf8bd97 100644 --- a/api/fields/workflow_run_fields.py +++ b/api/fields/workflow_run_fields.py @@ -116,11 +116,6 @@ workflow_run_node_execution_fields = { "finished_at": TimestampField, } -single_step_node_execution_fields = { - **workflow_run_node_execution_fields, - "retry_events": fields.List(fields.Nested(retry_event_field)), -} - workflow_run_node_execution_list_fields = { "data": fields.List(fields.Nested(workflow_run_node_execution_fields)), } diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index baa69b3d8d..84768d5af0 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -15,7 +15,6 @@ from core.workflow.nodes.base.entities import BaseNodeData from core.workflow.nodes.base.node import BaseNode from core.workflow.nodes.enums import ErrorStrategy from core.workflow.nodes.event import RunCompletedEvent -from core.workflow.nodes.event.event import SingleStepRetryEvent from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING from core.workflow.workflow_entry import WorkflowEntry from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated @@ -221,95 +220,56 @@ class WorkflowService: # run draft workflow node start_at = time.perf_counter() - retries = 0 - max_retries = 0 - should_retry = True - retry_events = [] try: - while retries <= max_retries and should_retry: - retry_start_at = time.perf_counter() - node_instance, generator = WorkflowEntry.single_step_run( - workflow=draft_workflow, - node_id=node_id, - user_inputs=user_inputs, - user_id=account.id, - ) - node_instance = cast(BaseNode[BaseNodeData], node_instance) - max_retries = ( - node_instance.node_data.retry_config.max_retries if node_instance.node_data.retry_config else 0 - ) - retry_interval = node_instance.node_data.retry_config.retry_interval_seconds - node_run_result: NodeRunResult | None = None - for event in generator: - if isinstance(event, RunCompletedEvent): - node_run_result = event.run_result + node_instance, generator = WorkflowEntry.single_step_run( + workflow=draft_workflow, + node_id=node_id, + user_inputs=user_inputs, + user_id=account.id, + ) + node_instance = cast(BaseNode[BaseNodeData], node_instance) + node_run_result: NodeRunResult | None = None + for event in generator: + if isinstance(event, RunCompletedEvent): + node_run_result = event.run_result - # sign output files - node_run_result.outputs = WorkflowEntry.handle_special_values(node_run_result.outputs) - break + # sign output files + node_run_result.outputs = WorkflowEntry.handle_special_values(node_run_result.outputs) + break - if not node_run_result: - raise ValueError("Node run failed with no run result") - # single step debug mode error handling return - if node_run_result.status == WorkflowNodeExecutionStatus.FAILED: - if ( - retries == max_retries - and node_instance.node_type == NodeType.HTTP_REQUEST - and node_run_result.outputs - and not node_instance.should_continue_on_error - ): - node_run_result.status = WorkflowNodeExecutionStatus.SUCCEEDED - should_retry = False - else: - if node_instance.should_retry: - node_run_result.status = WorkflowNodeExecutionStatus.RETRY - retries += 1 - node_run_result.retry_index = retries - retry_events.append( - SingleStepRetryEvent( - elapsed_time=time.perf_counter() - retry_start_at, - inputs=WorkflowEntry.handle_special_values(node_run_result.inputs), - process_data=WorkflowEntry.handle_special_values(node_run_result.process_data), - outputs=WorkflowEntry.handle_special_values(node_run_result.outputs), - metadata=node_run_result.metadata, - llm_usage=node_run_result.llm_usage, - error=node_run_result.error, - retry_index=node_run_result.retry_index, - ) - ) - time.sleep(retry_interval) - else: - should_retry = False - if node_instance.should_continue_on_error: - node_error_args = { - "status": WorkflowNodeExecutionStatus.EXCEPTION, - "error": node_run_result.error, - "inputs": node_run_result.inputs, - "metadata": {"error_strategy": node_instance.node_data.error_strategy}, - } - if node_instance.node_data.error_strategy is ErrorStrategy.DEFAULT_VALUE: - node_run_result = NodeRunResult( - **node_error_args, - outputs={ - **node_instance.node_data.default_value_dict, - "error_message": node_run_result.error, - "error_type": node_run_result.error_type, - }, - ) - else: - node_run_result = NodeRunResult( - **node_error_args, - outputs={ - "error_message": node_run_result.error, - "error_type": node_run_result.error_type, - }, - ) - run_succeeded = node_run_result.status in ( - WorkflowNodeExecutionStatus.SUCCEEDED, - WorkflowNodeExecutionStatus.EXCEPTION, - ) - error = node_run_result.error if not run_succeeded else None + if not node_run_result: + raise ValueError("Node run failed with no run result") + # single step debug mode error handling return + if node_run_result.status == WorkflowNodeExecutionStatus.FAILED and node_instance.should_continue_on_error: + node_error_args = { + "status": WorkflowNodeExecutionStatus.EXCEPTION, + "error": node_run_result.error, + "inputs": node_run_result.inputs, + "metadata": {"error_strategy": node_instance.node_data.error_strategy}, + } + if node_instance.node_data.error_strategy is ErrorStrategy.DEFAULT_VALUE: + node_run_result = NodeRunResult( + **node_error_args, + outputs={ + **node_instance.node_data.default_value_dict, + "error_message": node_run_result.error, + "error_type": node_run_result.error_type, + }, + ) + else: + node_run_result = NodeRunResult( + **node_error_args, + outputs={ + "error_message": node_run_result.error, + "error_type": node_run_result.error_type, + }, + ) + run_succeeded = node_run_result.status in ( + WorkflowNodeExecutionStatus.SUCCEEDED, + WorkflowNodeExecutionStatus.EXCEPTION, + ) + error = node_run_result.error if not run_succeeded else None except WorkflowNodeRunFailedError as e: node_instance = e.node_instance run_succeeded = False @@ -358,7 +318,6 @@ class WorkflowService: db.session.add(workflow_node_execution) db.session.commit() - workflow_node_execution.retry_events = retry_events return workflow_node_execution