From cdc854d15be6c51ebdee33dc697d6c896e87afad Mon Sep 17 00:00:00 2001 From: -LAN- Date: Mon, 23 Dec 2024 12:11:30 +0800 Subject: [PATCH] fix: replace Exception with ValueError for uninitialized workflow run and graph runtime state Signed-off-by: -LAN- --- .../apps/workflow/generate_task_pipeline.py | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index fd84908975..bd63c4ec7a 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -218,7 +218,7 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa break else: yield MessageAudioStreamResponse(audio=audio_trunk.audio, task_id=task_id) - except Exception as e: + except Exception: logger.exception(f"Fails to get audio trunk, task_id: {task_id}") break if tts_publisher: @@ -258,6 +258,8 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa event, QueueNodeRetryEvent, ): + if not workflow_run: + raise ValueError("workflow run not initialized.") workflow_node_execution = self._handle_workflow_node_execution_retried( workflow_run=workflow_run, event=event ) @@ -272,7 +274,7 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa yield response elif isinstance(event, QueueNodeStartedEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") workflow_node_execution = self._handle_node_execution_start(workflow_run=workflow_run, event=event) @@ -308,45 +310,45 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa elif isinstance(event, QueueParallelBranchRunStartedEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") yield self._workflow_parallel_branch_start_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event ) elif isinstance(event, QueueParallelBranchRunSucceededEvent | QueueParallelBranchRunFailedEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") yield self._workflow_parallel_branch_finished_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event ) elif isinstance(event, QueueIterationStartEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") yield self._workflow_iteration_start_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event ) elif isinstance(event, QueueIterationNextEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") yield self._workflow_iteration_next_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event ) elif isinstance(event, QueueIterationCompletedEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") yield self._workflow_iteration_completed_to_stream_response( task_id=self._application_generate_entity.task_id, workflow_run=workflow_run, event=event ) elif isinstance(event, QueueWorkflowSucceededEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") if not graph_runtime_state: - raise Exception("Graph runtime state not initialized.") + raise ValueError("graph runtime state not initialized.") workflow_run = self._handle_workflow_run_success( workflow_run=workflow_run, @@ -366,10 +368,10 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa ) elif isinstance(event, QueueWorkflowPartialSuccessEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") if not graph_runtime_state: - raise Exception("Graph runtime state not initialized.") + raise ValueError("graph runtime state not initialized.") workflow_run = self._handle_workflow_run_partial_success( workflow_run=workflow_run, @@ -390,10 +392,10 @@ class WorkflowAppGenerateTaskPipeline(BasedGenerateTaskPipeline, WorkflowCycleMa ) elif isinstance(event, QueueWorkflowFailedEvent | QueueStopEvent): if not workflow_run: - raise Exception("Workflow run not initialized.") + raise ValueError("workflow run not initialized.") if not graph_runtime_state: - raise Exception("Graph runtime state not initialized.") + raise ValueError("graph runtime state not initialized.") workflow_run = self._handle_workflow_run_failed( workflow_run=workflow_run, start_at=graph_runtime_state.start_at,