diff --git a/api/core/app/apps/pipeline/generate_response_converter.py b/api/core/app/apps/pipeline/generate_response_converter.py index 3913657ae8..03d7049150 100644 --- a/api/core/app/apps/pipeline/generate_response_converter.py +++ b/api/core/app/apps/pipeline/generate_response_converter.py @@ -45,20 +45,24 @@ class WorkflowAppGenerateResponseConverter(AppGenerateResponseConverter[Workflow chunk = cast(WorkflowAppStreamResponse, chunk) sub_stream_response = chunk.stream_response - if isinstance(sub_stream_response, PingStreamResponse): - yield "ping" - continue + match sub_stream_response: + case PingStreamResponse(): + yield "ping" + continue + case ErrorStreamResponse(): + response_chunk = { + "event": sub_stream_response.event.value, + "workflow_run_id": chunk.workflow_run_id, + } + data = cls._error_to_stream_response(sub_stream_response.err) + response_chunk.update(cast(dict, data)) + case _: + response_chunk = { + "event": sub_stream_response.event.value, + "workflow_run_id": chunk.workflow_run_id, + } + response_chunk.update(sub_stream_response.model_dump()) - response_chunk = { - "event": sub_stream_response.event.value, - "workflow_run_id": chunk.workflow_run_id, - } - - if isinstance(sub_stream_response, ErrorStreamResponse): - data = cls._error_to_stream_response(sub_stream_response.err) - response_chunk.update(cast(dict, data)) - else: - response_chunk.update(sub_stream_response.model_dump()) yield response_chunk @classmethod @@ -74,20 +78,28 @@ class WorkflowAppGenerateResponseConverter(AppGenerateResponseConverter[Workflow chunk = cast(WorkflowAppStreamResponse, chunk) sub_stream_response = chunk.stream_response - if isinstance(sub_stream_response, PingStreamResponse): - yield "ping" - continue + match sub_stream_response: + case PingStreamResponse(): + yield "ping" + continue + case ErrorStreamResponse(): + response_chunk = { + "event": sub_stream_response.event.value, + "workflow_run_id": chunk.workflow_run_id, + } + data = cls._error_to_stream_response(sub_stream_response.err) + response_chunk.update(cast(dict, data)) + case NodeStartStreamResponse() | NodeFinishStreamResponse(): + response_chunk = { + "event": sub_stream_response.event.value, + "workflow_run_id": chunk.workflow_run_id, + } + response_chunk.update(cast(dict, sub_stream_response.to_ignore_detail_dict())) + case _: + response_chunk = { + "event": sub_stream_response.event.value, + "workflow_run_id": chunk.workflow_run_id, + } + response_chunk.update(sub_stream_response.model_dump()) - response_chunk = { - "event": sub_stream_response.event.value, - "workflow_run_id": chunk.workflow_run_id, - } - - if isinstance(sub_stream_response, ErrorStreamResponse): - data = cls._error_to_stream_response(sub_stream_response.err) - response_chunk.update(cast(dict, data)) - elif isinstance(sub_stream_response, NodeStartStreamResponse | NodeFinishStreamResponse): - response_chunk.update(cast(dict, sub_stream_response.to_ignore_detail_dict())) - else: - response_chunk.update(sub_stream_response.model_dump()) yield response_chunk diff --git a/api/core/app/apps/workflow/generate_response_converter.py b/api/core/app/apps/workflow/generate_response_converter.py index 4037388798..c390ad30c9 100644 --- a/api/core/app/apps/workflow/generate_response_converter.py +++ b/api/core/app/apps/workflow/generate_response_converter.py @@ -52,20 +52,24 @@ class WorkflowAppGenerateResponseConverter( chunk = cast(WorkflowAppStreamResponse, chunk) sub_stream_response = chunk.stream_response - if isinstance(sub_stream_response, PingStreamResponse): - yield "ping" - continue + match sub_stream_response: + case PingStreamResponse(): + yield "ping" + continue + case ErrorStreamResponse(): + response_chunk: dict[str, object] = { + "event": sub_stream_response.event.value, + "workflow_run_id": chunk.workflow_run_id, + } + data = cls._error_to_stream_response(sub_stream_response.err) + response_chunk.update(data) + case _: + response_chunk = { + "event": sub_stream_response.event.value, + "workflow_run_id": chunk.workflow_run_id, + } + response_chunk.update(sub_stream_response.model_dump(mode="json")) - response_chunk: dict[str, object] = { - "event": sub_stream_response.event.value, - "workflow_run_id": chunk.workflow_run_id, - } - - if isinstance(sub_stream_response, ErrorStreamResponse): - data = cls._error_to_stream_response(sub_stream_response.err) - response_chunk.update(data) - else: - response_chunk.update(sub_stream_response.model_dump(mode="json")) yield response_chunk @classmethod @@ -81,20 +85,28 @@ class WorkflowAppGenerateResponseConverter( chunk = cast(WorkflowAppStreamResponse, chunk) sub_stream_response = chunk.stream_response - if isinstance(sub_stream_response, PingStreamResponse): - yield "ping" - continue + match sub_stream_response: + case PingStreamResponse(): + yield "ping" + continue + case ErrorStreamResponse(): + response_chunk: dict[str, object] = { + "event": sub_stream_response.event.value, + "workflow_run_id": chunk.workflow_run_id, + } + data = cls._error_to_stream_response(sub_stream_response.err) + response_chunk.update(data) + case NodeStartStreamResponse() | NodeFinishStreamResponse(): + response_chunk = { + "event": sub_stream_response.event.value, + "workflow_run_id": chunk.workflow_run_id, + } + response_chunk.update(sub_stream_response.to_ignore_detail_dict()) + case _: + response_chunk = { + "event": sub_stream_response.event.value, + "workflow_run_id": chunk.workflow_run_id, + } + response_chunk.update(sub_stream_response.model_dump(mode="json")) - response_chunk: dict[str, object] = { - "event": sub_stream_response.event.value, - "workflow_run_id": chunk.workflow_run_id, - } - - if isinstance(sub_stream_response, ErrorStreamResponse): - data = cls._error_to_stream_response(sub_stream_response.err) - response_chunk.update(data) - elif isinstance(sub_stream_response, NodeStartStreamResponse | NodeFinishStreamResponse): - response_chunk.update(sub_stream_response.to_ignore_detail_dict()) - else: - response_chunk.update(sub_stream_response.model_dump(mode="json")) yield response_chunk