diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py
index f228c3ec4a..58393a978d 100644
--- a/api/controllers/console/app/workflow.py
+++ b/api/controllers/console/app/workflow.py
@@ -15,7 +15,7 @@ from core.app.apps.base_app_queue_manager import AppQueueManager
 from core.app.entities.app_invoke_entities import InvokeFrom
 from factories import variable_factory
 from fields.workflow_fields import workflow_fields
-from fields.workflow_run_fields import workflow_run_node_execution_fields
+from fields.workflow_run_fields import single_step_node_execution_fields
 from libs import helper
 from libs.helper import TimestampField, uuid_value
 from libs.login import current_user, login_required
@@ -285,7 +285,7 @@ class DraftWorkflowNodeRunApi(Resource):
     @login_required
     @account_initialization_required
     @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
-    @marshal_with(workflow_run_node_execution_fields)
+    @marshal_with(single_step_node_execution_fields)
     def post(self, app_model: App, node_id: str):
         """
         Run draft workflow node
diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py
index ce0e959627..8e1731b314 100644
--- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py
+++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py
@@ -291,6 +291,22 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc
                 yield self._workflow_start_to_stream_response(
                     task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
                 )
+            elif isinstance(
+                event,
+                QueueNodeRetryEvent,
+            ):
+                workflow_node_execution = self._handle_workflow_node_execution_retried(
+                    workflow_run=workflow_run, event=event
+                )
+
+                response = self._workflow_node_retry_to_stream_response(
+                    event=event,
+                    task_id=self._application_generate_entity.task_id,
+                    workflow_node_execution=workflow_node_execution,
+                )
+
+                if response:
+                    yield response
             elif isinstance(event, QueueNodeStartedEvent):
                 if not workflow_run:
                     raise Exception("Workflow run not initialized.")
@@ -331,22 +347,7 @@ class AdvancedChatAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCyc

                 if response:
                     yield response
-            elif isinstance(
-                event,
-                QueueNodeRetryEvent,
-            ):
-                workflow_node_execution = self._handle_workflow_node_execution_retried(
-                    workflow_run=workflow_run, event=event
-                )
-                response = self._workflow_node_retry_to_stream_response(
-                    event=event,
-                    task_id=self._application_generate_entity.task_id,
-                    workflow_node_execution=workflow_node_execution,
-                )
-
-                if response:
-                    yield response
             elif isinstance(event, QueueParallelBranchRunStartedEvent):
                 if not workflow_run:
                     raise Exception("Workflow run not initialized.")
diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py
index 79e5e2bcb9..b129904efb 100644
--- a/api/core/app/apps/workflow/generate_task_pipeline.py
+++ b/api/core/app/apps/workflow/generate_task_pipeline.py
@@ -254,6 +254,22 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
                 yield self._workflow_start_to_stream_response(
                     task_id=self._application_generate_entity.task_id, workflow_run=workflow_run
                 )
+            elif isinstance(
+                event,
+                QueueNodeRetryEvent,
+            ):
+                workflow_node_execution = self._handle_workflow_node_execution_retried(
+                    workflow_run=workflow_run, event=event
+                )
+
+                response = self._workflow_node_retry_to_stream_response(
+                    event=event,
+                    task_id=self._application_generate_entity.task_id,
+                    workflow_node_execution=workflow_node_execution,
+                )
+
+                if response:
+                    yield response
             elif isinstance(event, QueueNodeStartedEvent):
                 if not workflow_run:
                     raise Exception("Workflow run not initialized.")
@@ -289,22 +305,6 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa
                 )
                 if node_failed_response:
                     yield node_failed_response
-            elif isinstance(
-                event,
-                QueueNodeRetryEvent,
-            ):
-                workflow_node_execution = self._handle_workflow_node_execution_retried(
-                    workflow_run=workflow_run, event=event
-                )
-
-                response = self._workflow_node_retry_to_stream_response(
-                    event=event,
-                    task_id=self._application_generate_entity.task_id,
-                    workflow_node_execution=workflow_node_execution,
-                )
-
-                if response:
-                    yield response
             elif isinstance(event, QueueParallelBranchRunStartedEvent):
                 if not workflow_run:
diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py
index 2fbf711175..bf3509c7a0 100644
--- a/api/core/app/apps/workflow_app_runner.py
+++ b/api/core/app/apps/workflow_app_runner.py
@@ -188,6 +188,38 @@ class WorkflowBasedAppRunner(AppRunner):
             )
         elif isinstance(event, GraphRunFailedEvent):
             self._publish_event(QueueWorkflowFailedEvent(error=event.error, exceptions_count=event.exceptions_count))
+        elif isinstance(event, NodeRunRetryEvent):
+            self._publish_event(
+                QueueNodeRetryEvent(
+                    node_execution_id=event.id,
+                    node_id=event.node_id,
+                    node_type=event.node_type,
+                    node_data=event.node_data,
+                    parallel_id=event.parallel_id,
+                    parallel_start_node_id=event.parallel_start_node_id,
+                    parent_parallel_id=event.parent_parallel_id,
+                    parent_parallel_start_node_id=event.parent_parallel_start_node_id,
+                    start_at=event.start_at,
+                    node_run_index=event.node_run_index,
+                    predecessor_node_id=event.predecessor_node_id,
+                    in_iteration_id=event.in_iteration_id,
+                    parallel_mode_run_id=event.parallel_mode_run_id,
+                    inputs=event.route_node_state.node_run_result.inputs
+                    if event.route_node_state.node_run_result
+                    else {},
+                    process_data=event.route_node_state.node_run_result.process_data
+                    if event.route_node_state.node_run_result
+                    else {},
+                    outputs=event.route_node_state.node_run_result.outputs
+                    if event.route_node_state.node_run_result
+                    else {},
+                    error=event.error,
+                    execution_metadata=event.route_node_state.node_run_result.metadata
+                    if event.route_node_state.node_run_result
+                    else {},
+                    retry_index=event.retry_index,
+                )
+            )
         elif isinstance(event, NodeRunStartedEvent):
             self._publish_event(
                 QueueNodeStartedEvent(
@@ -422,36 +454,6 @@ class WorkflowBasedAppRunner(AppRunner):
                     error=event.error if isinstance(event, IterationRunFailedEvent) else None,
                 )
             )
-        elif isinstance(event, NodeRunRetryEvent):
-            self._publish_event(
-                QueueNodeRetryEvent(
-                    node_execution_id=event.id,
-                    node_id=event.node_id,
-                    node_type=event.node_type,
-                    node_data=event.node_data,
-                    parallel_id=event.parallel_id,
-                    parallel_start_node_id=event.parallel_start_node_id,
-                    parent_parallel_id=event.parent_parallel_id,
-                    parent_parallel_start_node_id=event.parent_parallel_start_node_id,
-                    start_at=event.start_at,
-                    inputs=event.route_node_state.node_run_result.inputs
-                    if event.route_node_state.node_run_result
-                    else {},
-                    process_data=event.route_node_state.node_run_result.process_data
-                    if event.route_node_state.node_run_result
-                    else {},
-                    outputs=event.route_node_state.node_run_result.outputs
-                    if event.route_node_state.node_run_result
-                    else {},
-                    error=event.error,
-                    execution_metadata=event.route_node_state.node_run_result.metadata
-                    if event.route_node_state.node_run_result
-                    else {},
-                    in_iteration_id=event.in_iteration_id,
-                    retry_index=event.retry_index,
-                    start_index=event.start_index,
-                )
-            )

     def get_workflow(self, app_model: App, workflow_id: str) -> Optional[Workflow]:
         """
diff --git a/api/core/app/entities/queue_entities.py b/api/core/app/entities/queue_entities.py
index 49b7e80246..3c9f05de5b 100644
--- a/api/core/app/entities/queue_entities.py
+++ b/api/core/app/entities/queue_entities.py
@@ -314,27 +314,11 @@ class QueueNodeSucceededEvent(AppQueueEvent):
     iteration_duration_map: Optional[dict[str, float]] = None


-class QueueNodeRetryEvent(AppQueueEvent):
+class QueueNodeRetryEvent(QueueNodeStartedEvent):
     """QueueNodeRetryEvent entity"""

     event: QueueEvent = QueueEvent.RETRY
-    node_execution_id: str
-    node_id: str
-    node_type: NodeType
-    node_data: BaseNodeData
-    parallel_id: Optional[str] = None
-    """parallel id if node is in parallel"""
-    parallel_start_node_id: Optional[str] = None
-    """parallel start node id if node is in parallel"""
-    parent_parallel_id: Optional[str] = None
-    """parent parallel id if node is in parallel"""
-    parent_parallel_start_node_id: Optional[str] = None
-    """parent parallel start node id if node is in parallel"""
-    in_iteration_id: Optional[str] = None
-    """iteration id if node is in iteration"""
-    start_at: datetime
-
     inputs: Optional[dict[str, Any]] = None
     process_data: Optional[dict[str, Any]] = None
     outputs: Optional[dict[str, Any]] = None

@@ -342,7 +326,6 @@

     error: str
     retry_index: int  # retry index
-    start_index: int  # start index


 class QueueNodeInIterationFailedEvent(AppQueueEvent):
diff --git a/api/core/app/task_pipeline/workflow_cycle_manage.py b/api/core/app/task_pipeline/workflow_cycle_manage.py
index e2fa12b1cd..951fef1fa1 100644
--- a/api/core/app/task_pipeline/workflow_cycle_manage.py
+++ b/api/core/app/task_pipeline/workflow_cycle_manage.py
@@ -445,6 +445,7 @@ class WorkflowCycleManage:
         workflow_node_execution.workflow_id = workflow_run.workflow_id
         workflow_node_execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN.value
         workflow_node_execution.workflow_run_id = workflow_run.id
+        workflow_node_execution.predecessor_node_id = event.predecessor_node_id
         workflow_node_execution.node_execution_id = event.node_execution_id
         workflow_node_execution.node_id = event.node_id
         workflow_node_execution.node_type = event.node_type.value
@@ -461,9 +462,10 @@ class WorkflowCycleManage:
         workflow_node_execution.execution_metadata = json.dumps(
             {
                 NodeRunMetadataKey.ITERATION_ID: event.in_iteration_id,
+                NodeRunMetadataKey.PARALLEL_MODE_RUN_ID: event.parallel_mode_run_id,
             }
         )
-        workflow_node_execution.index = event.start_index
+        workflow_node_execution.index = event.node_run_index

         db.session.add(workflow_node_execution)
         db.session.commit()
diff --git a/api/core/helper/ssrf_proxy.py b/api/core/helper/ssrf_proxy.py
index d8aa805364..6153becc2a 100644
--- a/api/core/helper/ssrf_proxy.py
+++ b/api/core/helper/ssrf_proxy.py
@@ -63,13 +63,15 @@ def make_request(method, url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs):
                 logging.warning(f"Received status code {response.status_code} for URL {url} which is in the force list")

         except httpx.RequestError as e:
+            if max_retries == 0:
+                raise
             logging.warning(f"Request to URL {url} failed on attempt {retries + 1}: {e}")

             retries += 1
             if retries <= max_retries:
                 time.sleep(BACKOFF_FACTOR * (2 ** (retries - 1)))
-
-    raise MaxRetriesExceededError(f"Reached maximum retries ({max_retries}) for URL {url}")
+    if max_retries != 0:
+        raise MaxRetriesExceededError(f"Reached maximum retries ({max_retries}) for URL {url}")


 def get(url, max_retries=SSRF_DEFAULT_MAX_RETRIES, **kwargs):
diff --git a/api/core/workflow/graph_engine/entities/event.py b/api/core/workflow/graph_engine/entities/event.py
index 9997153164..396b10747f 100644
--- a/api/core/workflow/graph_engine/entities/event.py
+++ b/api/core/workflow/graph_engine/entities/event.py
@@ -97,11 +97,11 @@ class NodeInIterationFailedEvent(BaseNodeEvent):
     error: str = Field(..., description="error")


-class NodeRunRetryEvent(BaseNodeEvent):
+class NodeRunRetryEvent(NodeRunStartedEvent):
     error: str = Field(..., description="error")
     retry_index: int = Field(..., description="which retry attempt is about to be performed")
     start_at: datetime = Field(..., description="retry start time")
-    start_index: int = Field(..., description="retry start index")
+    node_run_index: int = Field(..., description="retry run index")


 ###########################################
diff --git a/api/core/workflow/graph_engine/graph_engine.py b/api/core/workflow/graph_engine/graph_engine.py
index e292b09968..2bb74babe7 100644
--- a/api/core/workflow/graph_engine/graph_engine.py
+++ b/api/core/workflow/graph_engine/graph_engine.py
@@ -649,14 +649,15 @@ class GraphEngine:
                                 node_type=node_instance.node_type,
                                 node_data=node_instance.node_data,
                                 route_node_state=route_node_state,
-                                error=run_result.error,
-                                retry_index=retries,
+                                predecessor_node_id=node_instance.previous_node_id,
                                 parallel_id=parallel_id,
                                 parallel_start_node_id=parallel_start_node_id,
                                 parent_parallel_id=parent_parallel_id,
                                 parent_parallel_start_node_id=parent_parallel_start_node_id,
+                                error=run_result.error,
+                                retry_index=retries,
                                 start_at=retry_start_at,
-                                start_index=self.graph_runtime_state.node_run_steps,
+                                node_run_index=self.graph_runtime_state.node_run_steps,
                             )
                             time.sleep(retry_interval)
                             continue
diff --git a/api/core/workflow/nodes/event/event.py b/api/core/workflow/nodes/event/event.py
index 0dc35e7d77..65147bc406 100644
--- a/api/core/workflow/nodes/event/event.py
+++ b/api/core/workflow/nodes/event/event.py
@@ -46,7 +46,7 @@ class SingleStepRetryEvent(BaseModel):

     inputs: dict | None = Field(..., description="input")
     error: str = Field(..., description="error")
-    outputs: dict = Field(..., description="output")
+    outputs: dict | None = Field(..., description="output")
     retry_index: int = Field(..., description="Retry attempt number")
     error: str = Field(..., description="error")
     elapsed_time: float = Field(..., description="elapsed time")
diff --git a/api/core/workflow/nodes/http_request/executor.py b/api/core/workflow/nodes/http_request/executor.py
index 92f190091b..038197317c 100644
--- a/api/core/workflow/nodes/http_request/executor.py
+++ b/api/core/workflow/nodes/http_request/executor.py
@@ -251,6 +251,8 @@ class Executor:
             response = getattr(ssrf_proxy, self.method)(**request_args)
         except ssrf_proxy.MaxRetriesExceededError as e:
             raise HttpRequestNodeError(str(e))
+        except httpx.RequestError as e:
+            raise HttpRequestNodeError(str(e))
         return response

     def invoke(self) -> Response:
diff --git a/api/fields/workflow_run_fields.py b/api/fields/workflow_run_fields.py
index 7c01ffc2c6..99c8dc1004 100644
--- a/api/fields/workflow_run_fields.py
+++ b/api/fields/workflow_run_fields.py
@@ -112,6 +112,10 @@ workflow_run_node_execution_fields = {
     "created_by_account": fields.Nested(simple_account_fields, attribute="created_by_account", allow_null=True),
     "created_by_end_user": fields.Nested(simple_end_user_fields, attribute="created_by_end_user", allow_null=True),
     "finished_at": TimestampField,
+}
+
+single_step_node_execution_fields = {
+    **workflow_run_node_execution_fields,
     "retry_events": fields.List(fields.Nested(retry_event_field)),
 }
diff --git a/api/migrations/versions/2024_12_20_0628-e1944c35e15e_add_retry_index_field_to_node_execution_.py b/api/migrations/versions/2024_12_20_0628-e1944c35e15e_add_retry_index_field_to_node_execution_.py
deleted file mode 100644
index 3254c23c96..0000000000
--- a/api/migrations/versions/2024_12_20_0628-e1944c35e15e_add_retry_index_field_to_node_execution_.py
+++ /dev/null
@@ -1,33 +0,0 @@
-"""add retry_index field to node-execution model
-
-Revision ID: e1944c35e15e
-Revises: 11b07f66c737
-Create Date: 2024-12-20 06:28:30.287197
-
-"""
-from alembic import op
-import models as models
-import sqlalchemy as sa
-
-
-# revision identifiers, used by Alembic.
-revision = 'e1944c35e15e'
-down_revision = '11b07f66c737'
-branch_labels = None
-depends_on = None
-
-
-def upgrade():
-    # ### commands auto generated by Alembic - please adjust! ###
-    with op.batch_alter_table('workflow_node_executions', schema=None) as batch_op:
-        batch_op.add_column(sa.Column('retry_index', sa.Integer(), server_default=sa.text('0'), nullable=True))
-
-    # ### end Alembic commands ###
-
-
-def downgrade():
-    # ### commands auto generated by Alembic - please adjust! ###
-    with op.batch_alter_table('workflow_node_executions', schema=None) as batch_op:
-        batch_op.drop_column('retry_index')
-
-    # ### end Alembic commands ###
diff --git a/api/models/workflow.py b/api/models/workflow.py
index e933382a84..51a6fbc8c8 100644
--- a/api/models/workflow.py
+++ b/api/models/workflow.py
@@ -640,7 +640,6 @@ class WorkflowNodeExecution(db.Model):
     created_by_role = db.Column(db.String(255), nullable=False)
     created_by = db.Column(StringUUID, nullable=False)
     finished_at = db.Column(db.DateTime)
-    retry_index = db.Column(db.Integer, server_default=db.text("0"))

     @property
     def created_by_account(self):
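Note on the queue_entities.py change: `QueueNodeRetryEvent` now subclasses `QueueNodeStartedEvent` instead of `AppQueueEvent`, so the block of node/parallel/iteration fields it previously re-declared is inherited and only retry-specific state remains. A minimal sketch of the pattern with standalone Pydantic models (illustrative field subset, not the actual Dify classes):

```python
from datetime import datetime
from typing import Any, Optional

from pydantic import BaseModel


class NodeStartedEvent(BaseModel):
    # Shared shape: every retry is also a (re)start of the node.
    node_execution_id: str
    node_id: str
    start_at: datetime
    in_iteration_id: Optional[str] = None


class NodeRetryEvent(NodeStartedEvent):
    # Inherits every field above; only retry-specific state is declared here,
    # mirroring how QueueNodeRetryEvent keeps inputs/outputs/error/retry_index.
    inputs: Optional[dict[str, Any]] = None
    outputs: Optional[dict[str, Any]] = None
    error: str
    retry_index: int
```

The same move is applied to `NodeRunRetryEvent(NodeRunStartedEvent)` in graph_engine/entities/event.py, which is what lets workflow_app_runner.py pass `node_run_index`, `predecessor_node_id`, and `parallel_mode_run_id` straight through to the queue event.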
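Note on the ssrf_proxy.py change: with `max_retries=0` the original `httpx.RequestError` now propagates to the caller instead of being swallowed and replaced by `MaxRetriesExceededError` (executor.py then maps either exception to `HttpRequestNodeError`). A simplified sketch of the resulting control flow, with the force-list/status-code handling of the real `make_request` omitted and `BACKOFF_FACTOR` assumed:

```python
import time

import httpx

BACKOFF_FACTOR = 0.5  # assumed value; the real constant lives in ssrf_proxy.py


class MaxRetriesExceededError(Exception):
    pass


def make_request(method: str, url: str, max_retries: int = 3, **kwargs) -> httpx.Response:
    retries = 0
    while retries <= max_retries:
        try:
            return httpx.request(method, url, **kwargs)
        except httpx.RequestError:
            if max_retries == 0:
                # No retry budget: surface the original error unchanged.
                raise
            retries += 1
            if retries <= max_retries:
                # Exponential backoff: 0.5s, 1s, 2s, ...
                time.sleep(BACKOFF_FACTOR * (2 ** (retries - 1)))
    raise MaxRetriesExceededError(f"Reached maximum retries ({max_retries}) for URL {url}")
```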
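Note on the workflow_run_fields.py change: `retry_events` moves out of the shared `workflow_run_node_execution_fields` marshal dict into a new `single_step_node_execution_fields` built with a dict spread, so only the single-step endpoint (`DraftWorkflowNodeRunApi`) serializes retry events. A sketch of the pattern with flask_restful; the field names here are a small illustrative subset of the real dicts:

```python
from flask_restful import fields

# Base serializer shared by workflow-run node execution views.
workflow_run_node_execution_fields = {
    "id": fields.String,
    "status": fields.String,
    "elapsed_time": fields.Float,
}

# The single-step (draft node debug) response additionally carries
# retry events; other consumers keep the leaner base shape.
single_step_node_execution_fields = {
    **workflow_run_node_execution_fields,
    "retry_events": fields.List(fields.Raw),
}
```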