refactor: use match cases for workflow stream responses (#36267)

This commit is contained in:
D-393Patel 2026-05-18 03:08:20 +05:30 committed by GitHub
parent 3c70d28064
commit 127fbf2c9a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 80 additions and 56 deletions

View File

@ -45,20 +45,24 @@ class WorkflowAppGenerateResponseConverter(AppGenerateResponseConverter[Workflow
chunk = cast(WorkflowAppStreamResponse, chunk)
sub_stream_response = chunk.stream_response
if isinstance(sub_stream_response, PingStreamResponse):
yield "ping"
continue
match sub_stream_response:
case PingStreamResponse():
yield "ping"
continue
case ErrorStreamResponse():
response_chunk = {
"event": sub_stream_response.event.value,
"workflow_run_id": chunk.workflow_run_id,
}
data = cls._error_to_stream_response(sub_stream_response.err)
response_chunk.update(cast(dict, data))
case _:
response_chunk = {
"event": sub_stream_response.event.value,
"workflow_run_id": chunk.workflow_run_id,
}
response_chunk.update(sub_stream_response.model_dump())
response_chunk = {
"event": sub_stream_response.event.value,
"workflow_run_id": chunk.workflow_run_id,
}
if isinstance(sub_stream_response, ErrorStreamResponse):
data = cls._error_to_stream_response(sub_stream_response.err)
response_chunk.update(cast(dict, data))
else:
response_chunk.update(sub_stream_response.model_dump())
yield response_chunk
@classmethod
@ -74,20 +78,28 @@ class WorkflowAppGenerateResponseConverter(AppGenerateResponseConverter[Workflow
chunk = cast(WorkflowAppStreamResponse, chunk)
sub_stream_response = chunk.stream_response
if isinstance(sub_stream_response, PingStreamResponse):
yield "ping"
continue
match sub_stream_response:
case PingStreamResponse():
yield "ping"
continue
case ErrorStreamResponse():
response_chunk = {
"event": sub_stream_response.event.value,
"workflow_run_id": chunk.workflow_run_id,
}
data = cls._error_to_stream_response(sub_stream_response.err)
response_chunk.update(cast(dict, data))
case NodeStartStreamResponse() | NodeFinishStreamResponse():
response_chunk = {
"event": sub_stream_response.event.value,
"workflow_run_id": chunk.workflow_run_id,
}
response_chunk.update(cast(dict, sub_stream_response.to_ignore_detail_dict()))
case _:
response_chunk = {
"event": sub_stream_response.event.value,
"workflow_run_id": chunk.workflow_run_id,
}
response_chunk.update(sub_stream_response.model_dump())
response_chunk = {
"event": sub_stream_response.event.value,
"workflow_run_id": chunk.workflow_run_id,
}
if isinstance(sub_stream_response, ErrorStreamResponse):
data = cls._error_to_stream_response(sub_stream_response.err)
response_chunk.update(cast(dict, data))
elif isinstance(sub_stream_response, NodeStartStreamResponse | NodeFinishStreamResponse):
response_chunk.update(cast(dict, sub_stream_response.to_ignore_detail_dict()))
else:
response_chunk.update(sub_stream_response.model_dump())
yield response_chunk

View File

@ -52,20 +52,24 @@ class WorkflowAppGenerateResponseConverter(
chunk = cast(WorkflowAppStreamResponse, chunk)
sub_stream_response = chunk.stream_response
if isinstance(sub_stream_response, PingStreamResponse):
yield "ping"
continue
match sub_stream_response:
case PingStreamResponse():
yield "ping"
continue
case ErrorStreamResponse():
response_chunk: dict[str, object] = {
"event": sub_stream_response.event.value,
"workflow_run_id": chunk.workflow_run_id,
}
data = cls._error_to_stream_response(sub_stream_response.err)
response_chunk.update(data)
case _:
response_chunk = {
"event": sub_stream_response.event.value,
"workflow_run_id": chunk.workflow_run_id,
}
response_chunk.update(sub_stream_response.model_dump(mode="json"))
response_chunk: dict[str, object] = {
"event": sub_stream_response.event.value,
"workflow_run_id": chunk.workflow_run_id,
}
if isinstance(sub_stream_response, ErrorStreamResponse):
data = cls._error_to_stream_response(sub_stream_response.err)
response_chunk.update(data)
else:
response_chunk.update(sub_stream_response.model_dump(mode="json"))
yield response_chunk
@classmethod
@ -81,20 +85,28 @@ class WorkflowAppGenerateResponseConverter(
chunk = cast(WorkflowAppStreamResponse, chunk)
sub_stream_response = chunk.stream_response
if isinstance(sub_stream_response, PingStreamResponse):
yield "ping"
continue
match sub_stream_response:
case PingStreamResponse():
yield "ping"
continue
case ErrorStreamResponse():
response_chunk: dict[str, object] = {
"event": sub_stream_response.event.value,
"workflow_run_id": chunk.workflow_run_id,
}
data = cls._error_to_stream_response(sub_stream_response.err)
response_chunk.update(data)
case NodeStartStreamResponse() | NodeFinishStreamResponse():
response_chunk = {
"event": sub_stream_response.event.value,
"workflow_run_id": chunk.workflow_run_id,
}
response_chunk.update(sub_stream_response.to_ignore_detail_dict())
case _:
response_chunk = {
"event": sub_stream_response.event.value,
"workflow_run_id": chunk.workflow_run_id,
}
response_chunk.update(sub_stream_response.model_dump(mode="json"))
response_chunk: dict[str, object] = {
"event": sub_stream_response.event.value,
"workflow_run_id": chunk.workflow_run_id,
}
if isinstance(sub_stream_response, ErrorStreamResponse):
data = cls._error_to_stream_response(sub_stream_response.err)
response_chunk.update(data)
elif isinstance(sub_stream_response, NodeStartStreamResponse | NodeFinishStreamResponse):
response_chunk.update(sub_stream_response.to_ignore_detail_dict())
else:
response_chunk.update(sub_stream_response.model_dump(mode="json"))
yield response_chunk