refactor: convert isinstance chains to match/case (part 3) (#36242)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
EvanYao 2026-05-16 16:16:14 +08:00 committed by GitHub
parent c0bdd6792f
commit e7e6fe8813
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 429 additions and 415 deletions

View File

@ -245,49 +245,50 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
"""
human_input_responses: list[HumanInputRequiredResponse] = []
for stream_response in generator:
if isinstance(stream_response, ErrorStreamResponse):
raise stream_response.err
elif isinstance(stream_response, HumanInputRequiredResponse):
human_input_responses.append(stream_response)
elif isinstance(stream_response, WorkflowPauseStreamResponse):
return AdvancedChatPausedBlockingResponse(
task_id=stream_response.task_id,
data=AdvancedChatPausedBlockingResponse.Data(
id=self._message_id,
mode=self._conversation_mode,
conversation_id=self._conversation_id,
message_id=self._message_id,
workflow_run_id=stream_response.data.workflow_run_id,
answer=self._task_state.answer,
metadata=self._message_end_to_stream_response().metadata,
created_at=self._message_created_at,
paused_nodes=stream_response.data.paused_nodes,
reasons=stream_response.data.reasons,
status=stream_response.data.status,
elapsed_time=stream_response.data.elapsed_time,
total_tokens=stream_response.data.total_tokens,
total_steps=stream_response.data.total_steps,
),
)
elif isinstance(stream_response, MessageEndStreamResponse):
extras = {}
if stream_response.metadata:
extras["metadata"] = stream_response.metadata
match stream_response:
case ErrorStreamResponse():
raise stream_response.err
case HumanInputRequiredResponse():
human_input_responses.append(stream_response)
case WorkflowPauseStreamResponse():
return AdvancedChatPausedBlockingResponse(
task_id=stream_response.task_id,
data=AdvancedChatPausedBlockingResponse.Data(
id=self._message_id,
mode=self._conversation_mode,
conversation_id=self._conversation_id,
message_id=self._message_id,
workflow_run_id=stream_response.data.workflow_run_id,
answer=self._task_state.answer,
metadata=self._message_end_to_stream_response().metadata,
created_at=self._message_created_at,
paused_nodes=stream_response.data.paused_nodes,
reasons=stream_response.data.reasons,
status=stream_response.data.status,
elapsed_time=stream_response.data.elapsed_time,
total_tokens=stream_response.data.total_tokens,
total_steps=stream_response.data.total_steps,
),
)
case MessageEndStreamResponse():
extras = {}
if stream_response.metadata:
extras["metadata"] = stream_response.metadata
return ChatbotAppBlockingResponse(
task_id=stream_response.task_id,
data=ChatbotAppBlockingResponse.Data(
id=self._message_id,
mode=self._conversation_mode,
conversation_id=self._conversation_id,
message_id=self._message_id,
answer=self._task_state.answer,
created_at=self._message_created_at,
**extras,
),
)
else:
continue
return ChatbotAppBlockingResponse(
task_id=stream_response.task_id,
data=ChatbotAppBlockingResponse.Data(
id=self._message_id,
mode=self._conversation_mode,
conversation_id=self._conversation_id,
message_id=self._message_id,
answer=self._task_state.answer,
created_at=self._message_created_at,
**extras,
),
)
case _:
continue
if human_input_responses:
return self._build_paused_blocking_response_from_human_input(human_input_responses)

View File

@ -145,50 +145,51 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport):
"""
human_input_responses: list[HumanInputRequiredResponse] = []
for stream_response in generator:
if isinstance(stream_response, ErrorStreamResponse):
raise stream_response.err
elif isinstance(stream_response, HumanInputRequiredResponse):
human_input_responses.append(stream_response)
elif isinstance(stream_response, WorkflowPauseStreamResponse):
return WorkflowAppPausedBlockingResponse(
task_id=self._application_generate_entity.task_id,
workflow_run_id=stream_response.data.workflow_run_id,
data=WorkflowAppPausedBlockingResponse.Data(
id=stream_response.data.workflow_run_id,
workflow_id=self._workflow.id,
status=stream_response.data.status,
outputs=stream_response.data.outputs or {},
error=None,
elapsed_time=stream_response.data.elapsed_time,
total_tokens=stream_response.data.total_tokens,
total_steps=stream_response.data.total_steps,
created_at=stream_response.data.created_at,
finished_at=None,
paused_nodes=stream_response.data.paused_nodes,
reasons=stream_response.data.reasons,
),
)
elif isinstance(stream_response, WorkflowFinishStreamResponse):
return WorkflowAppBlockingResponse(
task_id=self._application_generate_entity.task_id,
workflow_run_id=stream_response.data.id,
data=WorkflowAppBlockingResponse.Data(
id=stream_response.data.id,
workflow_id=stream_response.data.workflow_id,
status=stream_response.data.status,
outputs=stream_response.data.outputs,
error=stream_response.data.error,
elapsed_time=stream_response.data.elapsed_time,
total_tokens=stream_response.data.total_tokens,
total_steps=stream_response.data.total_steps,
created_at=int(stream_response.data.created_at),
finished_at=int(stream_response.data.finished_at) if stream_response.data.finished_at else None,
),
)
else:
continue
match stream_response:
case ErrorStreamResponse():
raise stream_response.err
case HumanInputRequiredResponse():
human_input_responses.append(stream_response)
case WorkflowPauseStreamResponse():
return WorkflowAppPausedBlockingResponse(
task_id=self._application_generate_entity.task_id,
workflow_run_id=stream_response.data.workflow_run_id,
data=WorkflowAppPausedBlockingResponse.Data(
id=stream_response.data.workflow_run_id,
workflow_id=self._workflow.id,
status=stream_response.data.status,
outputs=stream_response.data.outputs or {},
error=None,
elapsed_time=stream_response.data.elapsed_time,
total_tokens=stream_response.data.total_tokens,
total_steps=stream_response.data.total_steps,
created_at=stream_response.data.created_at,
finished_at=None,
paused_nodes=stream_response.data.paused_nodes,
reasons=stream_response.data.reasons,
),
)
case WorkflowFinishStreamResponse():
return WorkflowAppBlockingResponse(
task_id=self._application_generate_entity.task_id,
workflow_run_id=stream_response.data.id,
data=WorkflowAppBlockingResponse.Data(
id=stream_response.data.id,
workflow_id=stream_response.data.workflow_id,
status=stream_response.data.status,
outputs=stream_response.data.outputs,
error=stream_response.data.error,
elapsed_time=stream_response.data.elapsed_time,
total_tokens=stream_response.data.total_tokens,
total_steps=stream_response.data.total_steps,
created_at=int(stream_response.data.created_at),
finished_at=int(stream_response.data.finished_at)
if stream_response.data.finished_at
else None,
),
)
case _:
continue
if human_input_responses:
return self._build_paused_blocking_response_from_human_input(human_input_responses)

View File

@ -399,278 +399,281 @@ class WorkflowBasedAppRunner:
:param workflow_entry: workflow entry
:param event: event
"""
if isinstance(event, GraphRunStartedEvent):
self._publish_event(QueueWorkflowStartedEvent(reason=event.reason))
elif isinstance(event, GraphRunSucceededEvent):
self._publish_event(QueueWorkflowSucceededEvent(outputs=event.outputs))
elif isinstance(event, GraphRunPartialSucceededEvent):
self._publish_event(
QueueWorkflowPartialSuccessEvent(outputs=event.outputs, exceptions_count=event.exceptions_count)
)
elif isinstance(event, GraphRunFailedEvent):
self._publish_event(QueueWorkflowFailedEvent(error=event.error, exceptions_count=event.exceptions_count))
elif isinstance(event, GraphRunAbortedEvent):
self._publish_event(QueueWorkflowFailedEvent(error=event.reason or "Unknown error", exceptions_count=0))
elif isinstance(event, GraphRunPausedEvent):
runtime_state = workflow_entry.graph_engine.graph_runtime_state
paused_nodes = runtime_state.get_paused_nodes()
self._enqueue_human_input_notifications(event.reasons)
self._publish_event(
QueueWorkflowPausedEvent(
reasons=event.reasons,
outputs=event.outputs,
paused_nodes=paused_nodes,
match event:
case GraphRunStartedEvent():
self._publish_event(QueueWorkflowStartedEvent(reason=event.reason))
case GraphRunSucceededEvent():
self._publish_event(QueueWorkflowSucceededEvent(outputs=event.outputs))
case GraphRunPartialSucceededEvent():
self._publish_event(
QueueWorkflowPartialSuccessEvent(outputs=event.outputs, exceptions_count=event.exceptions_count)
)
)
elif isinstance(event, NodeRunHumanInputFormFilledEvent):
self._publish_event(
QueueHumanInputFormFilledEvent(
node_execution_id=event.id,
node_id=event.node_id,
node_type=event.node_type,
node_title=event.node_title,
rendered_content=event.rendered_content,
action_id=event.action_id,
action_text=event.action_text,
case GraphRunFailedEvent():
self._publish_event(
QueueWorkflowFailedEvent(error=event.error, exceptions_count=event.exceptions_count)
)
)
elif isinstance(event, NodeRunHumanInputFormTimeoutEvent):
self._publish_event(
QueueHumanInputFormTimeoutEvent(
node_id=event.node_id,
node_type=event.node_type,
node_title=event.node_title,
expiration_time=event.expiration_time,
case GraphRunAbortedEvent():
self._publish_event(QueueWorkflowFailedEvent(error=event.reason or "Unknown error", exceptions_count=0))
case GraphRunPausedEvent():
runtime_state = workflow_entry.graph_engine.graph_runtime_state
paused_nodes = runtime_state.get_paused_nodes()
self._enqueue_human_input_notifications(event.reasons)
self._publish_event(
QueueWorkflowPausedEvent(
reasons=event.reasons,
outputs=event.outputs,
paused_nodes=paused_nodes,
)
)
)
elif isinstance(event, NodeRunRetryEvent):
node_run_result = event.node_run_result
inputs = node_run_result.inputs
process_data = node_run_result.process_data
outputs = project_node_outputs_for_workflow_run(
node_type=event.node_type,
inputs=inputs,
outputs=node_run_result.outputs,
)
execution_metadata = node_run_result.metadata
self._publish_event(
QueueNodeRetryEvent(
node_execution_id=event.id,
node_id=event.node_id,
node_title=event.node_title,
case NodeRunHumanInputFormFilledEvent():
self._publish_event(
QueueHumanInputFormFilledEvent(
node_execution_id=event.id,
node_id=event.node_id,
node_type=event.node_type,
node_title=event.node_title,
rendered_content=event.rendered_content,
action_id=event.action_id,
action_text=event.action_text,
)
)
case NodeRunHumanInputFormTimeoutEvent():
self._publish_event(
QueueHumanInputFormTimeoutEvent(
node_id=event.node_id,
node_type=event.node_type,
node_title=event.node_title,
expiration_time=event.expiration_time,
)
)
case NodeRunRetryEvent():
node_run_result = event.node_run_result
inputs = node_run_result.inputs
process_data = node_run_result.process_data
outputs = project_node_outputs_for_workflow_run(
node_type=event.node_type,
start_at=event.start_at,
in_iteration_id=event.in_iteration_id,
in_loop_id=event.in_loop_id,
inputs=inputs,
process_data=process_data,
outputs=outputs,
error=event.error,
execution_metadata=execution_metadata,
retry_index=event.retry_index,
provider_type=event.provider_type,
provider_id=event.provider_id,
outputs=node_run_result.outputs,
)
)
elif isinstance(event, NodeRunStartedEvent):
self._publish_event(
QueueNodeStartedEvent(
node_execution_id=event.id,
node_id=event.node_id,
node_title=event.node_title,
node_type=event.node_type,
start_at=event.start_at,
in_iteration_id=event.in_iteration_id,
in_loop_id=event.in_loop_id,
agent_strategy=self._build_agent_strategy_info(event),
provider_type=event.provider_type,
provider_id=event.provider_id,
execution_metadata = node_run_result.metadata
self._publish_event(
QueueNodeRetryEvent(
node_execution_id=event.id,
node_id=event.node_id,
node_title=event.node_title,
node_type=event.node_type,
start_at=event.start_at,
in_iteration_id=event.in_iteration_id,
in_loop_id=event.in_loop_id,
inputs=inputs,
process_data=process_data,
outputs=outputs,
error=event.error,
execution_metadata=execution_metadata,
retry_index=event.retry_index,
provider_type=event.provider_type,
provider_id=event.provider_id,
)
)
)
elif isinstance(event, NodeRunSucceededEvent):
node_run_result = event.node_run_result
inputs = node_run_result.inputs
process_data = node_run_result.process_data
outputs = project_node_outputs_for_workflow_run(
node_type=event.node_type,
inputs=inputs,
outputs=node_run_result.outputs,
)
execution_metadata = node_run_result.metadata
self._publish_event(
QueueNodeSucceededEvent(
node_execution_id=event.id,
node_id=event.node_id,
case NodeRunStartedEvent():
self._publish_event(
QueueNodeStartedEvent(
node_execution_id=event.id,
node_id=event.node_id,
node_title=event.node_title,
node_type=event.node_type,
start_at=event.start_at,
in_iteration_id=event.in_iteration_id,
in_loop_id=event.in_loop_id,
agent_strategy=self._build_agent_strategy_info(event),
provider_type=event.provider_type,
provider_id=event.provider_id,
)
)
case NodeRunSucceededEvent():
node_run_result = event.node_run_result
inputs = node_run_result.inputs
process_data = node_run_result.process_data
outputs = project_node_outputs_for_workflow_run(
node_type=event.node_type,
start_at=event.start_at,
finished_at=event.finished_at,
inputs=inputs,
process_data=process_data,
outputs=outputs,
execution_metadata=execution_metadata,
in_iteration_id=event.in_iteration_id,
in_loop_id=event.in_loop_id,
outputs=node_run_result.outputs,
)
)
elif isinstance(event, NodeRunFailedEvent):
outputs = project_node_outputs_for_workflow_run(
node_type=event.node_type,
inputs=event.node_run_result.inputs,
outputs=event.node_run_result.outputs,
)
self._publish_event(
QueueNodeFailedEvent(
node_execution_id=event.id,
node_id=event.node_id,
execution_metadata = node_run_result.metadata
self._publish_event(
QueueNodeSucceededEvent(
node_execution_id=event.id,
node_id=event.node_id,
node_type=event.node_type,
start_at=event.start_at,
finished_at=event.finished_at,
inputs=inputs,
process_data=process_data,
outputs=outputs,
execution_metadata=execution_metadata,
in_iteration_id=event.in_iteration_id,
in_loop_id=event.in_loop_id,
)
)
case NodeRunFailedEvent():
outputs = project_node_outputs_for_workflow_run(
node_type=event.node_type,
start_at=event.start_at,
finished_at=event.finished_at,
inputs=event.node_run_result.inputs,
process_data=event.node_run_result.process_data,
outputs=outputs,
error=event.node_run_result.error or "Unknown error",
execution_metadata=event.node_run_result.metadata,
in_iteration_id=event.in_iteration_id,
in_loop_id=event.in_loop_id,
outputs=event.node_run_result.outputs,
)
)
elif isinstance(event, NodeRunExceptionEvent):
outputs = project_node_outputs_for_workflow_run(
node_type=event.node_type,
inputs=event.node_run_result.inputs,
outputs=event.node_run_result.outputs,
)
self._publish_event(
QueueNodeExceptionEvent(
node_execution_id=event.id,
node_id=event.node_id,
self._publish_event(
QueueNodeFailedEvent(
node_execution_id=event.id,
node_id=event.node_id,
node_type=event.node_type,
start_at=event.start_at,
finished_at=event.finished_at,
inputs=event.node_run_result.inputs,
process_data=event.node_run_result.process_data,
outputs=outputs,
error=event.node_run_result.error or "Unknown error",
execution_metadata=event.node_run_result.metadata,
in_iteration_id=event.in_iteration_id,
in_loop_id=event.in_loop_id,
)
)
case NodeRunExceptionEvent():
outputs = project_node_outputs_for_workflow_run(
node_type=event.node_type,
start_at=event.start_at,
finished_at=event.finished_at,
inputs=event.node_run_result.inputs,
process_data=event.node_run_result.process_data,
outputs=outputs,
error=event.node_run_result.error or "Unknown error",
execution_metadata=event.node_run_result.metadata,
in_iteration_id=event.in_iteration_id,
in_loop_id=event.in_loop_id,
outputs=event.node_run_result.outputs,
)
)
elif isinstance(event, NodeRunStreamChunkEvent):
self._publish_event(
QueueTextChunkEvent(
text=event.chunk,
from_variable_selector=list(event.selector),
in_iteration_id=event.in_iteration_id,
in_loop_id=event.in_loop_id,
self._publish_event(
QueueNodeExceptionEvent(
node_execution_id=event.id,
node_id=event.node_id,
node_type=event.node_type,
start_at=event.start_at,
finished_at=event.finished_at,
inputs=event.node_run_result.inputs,
process_data=event.node_run_result.process_data,
outputs=outputs,
error=event.node_run_result.error or "Unknown error",
execution_metadata=event.node_run_result.metadata,
in_iteration_id=event.in_iteration_id,
in_loop_id=event.in_loop_id,
)
)
)
elif isinstance(event, NodeRunRetrieverResourceEvent):
self._publish_event(
QueueRetrieverResourcesEvent(
retriever_resources=[
RetrievalSourceMetadata.model_validate(resource) for resource in event.retriever_resources
],
in_iteration_id=event.in_iteration_id,
in_loop_id=event.in_loop_id,
case NodeRunStreamChunkEvent():
self._publish_event(
QueueTextChunkEvent(
text=event.chunk,
from_variable_selector=list(event.selector),
in_iteration_id=event.in_iteration_id,
in_loop_id=event.in_loop_id,
)
)
)
elif isinstance(event, NodeRunAgentLogEvent):
self._publish_event(
QueueAgentLogEvent(
id=event.message_id,
label=event.label,
node_execution_id=event.node_execution_id,
parent_id=event.parent_id,
error=event.error,
status=event.status,
data=event.data,
metadata=event.metadata,
node_id=event.node_id,
case NodeRunRetrieverResourceEvent():
self._publish_event(
QueueRetrieverResourcesEvent(
retriever_resources=[
RetrievalSourceMetadata.model_validate(resource) for resource in event.retriever_resources
],
in_iteration_id=event.in_iteration_id,
in_loop_id=event.in_loop_id,
)
)
)
elif isinstance(event, NodeRunIterationStartedEvent):
self._publish_event(
QueueIterationStartEvent(
node_execution_id=event.id,
node_id=event.node_id,
node_type=event.node_type,
node_title=event.node_title,
start_at=event.start_at,
node_run_index=workflow_entry.graph_engine.graph_runtime_state.node_run_steps,
inputs=event.inputs,
metadata=event.metadata,
case NodeRunAgentLogEvent():
self._publish_event(
QueueAgentLogEvent(
id=event.message_id,
label=event.label,
node_execution_id=event.node_execution_id,
parent_id=event.parent_id,
error=event.error,
status=event.status,
data=event.data,
metadata=event.metadata,
node_id=event.node_id,
)
)
)
elif isinstance(event, NodeRunIterationNextEvent):
self._publish_event(
QueueIterationNextEvent(
node_execution_id=event.id,
node_id=event.node_id,
node_type=event.node_type,
node_title=event.node_title,
index=event.index,
node_run_index=workflow_entry.graph_engine.graph_runtime_state.node_run_steps,
output=event.pre_iteration_output,
case NodeRunIterationStartedEvent():
self._publish_event(
QueueIterationStartEvent(
node_execution_id=event.id,
node_id=event.node_id,
node_type=event.node_type,
node_title=event.node_title,
start_at=event.start_at,
node_run_index=workflow_entry.graph_engine.graph_runtime_state.node_run_steps,
inputs=event.inputs,
metadata=event.metadata,
)
)
)
elif isinstance(event, (NodeRunIterationSucceededEvent | NodeRunIterationFailedEvent)):
self._publish_event(
QueueIterationCompletedEvent(
node_execution_id=event.id,
node_id=event.node_id,
node_type=event.node_type,
node_title=event.node_title,
start_at=event.start_at,
node_run_index=workflow_entry.graph_engine.graph_runtime_state.node_run_steps,
inputs=event.inputs,
outputs=event.outputs,
metadata=event.metadata,
steps=event.steps,
error=event.error if isinstance(event, NodeRunIterationFailedEvent) else None,
case NodeRunIterationNextEvent():
self._publish_event(
QueueIterationNextEvent(
node_execution_id=event.id,
node_id=event.node_id,
node_type=event.node_type,
node_title=event.node_title,
index=event.index,
node_run_index=workflow_entry.graph_engine.graph_runtime_state.node_run_steps,
output=event.pre_iteration_output,
)
)
)
elif isinstance(event, NodeRunLoopStartedEvent):
self._publish_event(
QueueLoopStartEvent(
node_execution_id=event.id,
node_id=event.node_id,
node_type=event.node_type,
node_title=event.node_title,
start_at=event.start_at,
node_run_index=workflow_entry.graph_engine.graph_runtime_state.node_run_steps,
inputs=event.inputs,
metadata=event.metadata,
case NodeRunIterationSucceededEvent() | NodeRunIterationFailedEvent():
self._publish_event(
QueueIterationCompletedEvent(
node_execution_id=event.id,
node_id=event.node_id,
node_type=event.node_type,
node_title=event.node_title,
start_at=event.start_at,
node_run_index=workflow_entry.graph_engine.graph_runtime_state.node_run_steps,
inputs=event.inputs,
outputs=event.outputs,
metadata=event.metadata,
steps=event.steps,
error=event.error if isinstance(event, NodeRunIterationFailedEvent) else None,
)
)
)
elif isinstance(event, NodeRunLoopNextEvent):
self._publish_event(
QueueLoopNextEvent(
node_execution_id=event.id,
node_id=event.node_id,
node_type=event.node_type,
node_title=event.node_title,
index=event.index,
node_run_index=workflow_entry.graph_engine.graph_runtime_state.node_run_steps,
output=event.pre_loop_output,
case NodeRunLoopStartedEvent():
self._publish_event(
QueueLoopStartEvent(
node_execution_id=event.id,
node_id=event.node_id,
node_type=event.node_type,
node_title=event.node_title,
start_at=event.start_at,
node_run_index=workflow_entry.graph_engine.graph_runtime_state.node_run_steps,
inputs=event.inputs,
metadata=event.metadata,
)
)
)
elif isinstance(event, (NodeRunLoopSucceededEvent | NodeRunLoopFailedEvent)):
self._publish_event(
QueueLoopCompletedEvent(
node_execution_id=event.id,
node_id=event.node_id,
node_type=event.node_type,
node_title=event.node_title,
start_at=event.start_at,
node_run_index=workflow_entry.graph_engine.graph_runtime_state.node_run_steps,
inputs=event.inputs,
outputs=event.outputs,
metadata=event.metadata,
steps=event.steps,
error=event.error if isinstance(event, NodeRunLoopFailedEvent) else None,
case NodeRunLoopNextEvent():
self._publish_event(
QueueLoopNextEvent(
node_execution_id=event.id,
node_id=event.node_id,
node_type=event.node_type,
node_title=event.node_title,
index=event.index,
node_run_index=workflow_entry.graph_engine.graph_runtime_state.node_run_steps,
output=event.pre_loop_output,
)
)
case NodeRunLoopSucceededEvent() | NodeRunLoopFailedEvent():
self._publish_event(
QueueLoopCompletedEvent(
node_execution_id=event.id,
node_id=event.node_id,
node_type=event.node_type,
node_title=event.node_title,
start_at=event.start_at,
node_run_index=workflow_entry.graph_engine.graph_runtime_state.node_run_steps,
inputs=event.inputs,
outputs=event.outputs,
metadata=event.metadata,
steps=event.steps,
error=event.error if isinstance(event, NodeRunLoopFailedEvent) else None,
)
)
)
def _enqueue_human_input_notifications(self, reasons: Sequence[object]) -> None:
for reason in reasons:

View File

@ -95,13 +95,14 @@ class AppGeneratorTTSPublisher:
message_content = message.event.chunk.delta.message.content
if not message_content:
continue
if isinstance(message_content, str):
self.msg_text += message_content
elif isinstance(message_content, list):
for content in message_content:
if not isinstance(content, TextPromptMessageContent):
continue
self.msg_text += content.data
match message_content:
case str():
self.msg_text += message_content
case list():
for content in message_content:
if not isinstance(content, TextPromptMessageContent):
continue
self.msg_text += content.data
elif isinstance(message.event, QueueTextChunkEvent):
self.msg_text += message.event.text
elif isinstance(message.event, QueueNodeSucceededEvent):

View File

@ -67,23 +67,27 @@ class MCPTool(Tool):
# handle dify tool output
for content in result.content:
if isinstance(content, TextContent):
yield from self._process_text_content(content)
elif isinstance(content, ImageContent | AudioContent):
yield self.create_blob_message(
blob=base64.b64decode(content.data), meta={"mime_type": content.mimeType}
)
elif isinstance(content, EmbeddedResource):
resource = content.resource
if isinstance(resource, TextResourceContents):
yield self.create_text_message(resource.text)
elif isinstance(resource, BlobResourceContents):
mime_type = resource.mimeType or "application/octet-stream"
yield self.create_blob_message(blob=base64.b64decode(resource.blob), meta={"mime_type": mime_type})
else:
raise ToolInvokeError(f"Unsupported embedded resource type: {type(resource)}")
else:
logger.warning("Unsupported content type=%s", type(content))
match content:
case TextContent():
yield from self._process_text_content(content)
case ImageContent() | AudioContent():
yield self.create_blob_message(
blob=base64.b64decode(content.data), meta={"mime_type": content.mimeType}
)
case EmbeddedResource():
resource = content.resource
match resource:
case TextResourceContents():
yield self.create_text_message(resource.text)
case BlobResourceContents():
mime_type = resource.mimeType or "application/octet-stream"
yield self.create_blob_message(
blob=base64.b64decode(resource.blob), meta={"mime_type": mime_type}
)
case _:
raise ToolInvokeError(f"Unsupported embedded resource type: {type(resource)}")
case _:
logger.warning("Unsupported content type=%s", type(content))
# handle MCP structured output
if self.entity.output_schema and result.structuredContent:

View File

@ -77,28 +77,29 @@ class EnterpriseOtelTrace:
self._exporter = exporter
def trace(self, trace_info: BaseTraceInfo) -> None:
if isinstance(trace_info, WorkflowTraceInfo):
self._workflow_trace(trace_info)
elif isinstance(trace_info, MessageTraceInfo):
self._message_trace(trace_info)
elif isinstance(trace_info, ToolTraceInfo):
self._tool_trace(trace_info)
elif isinstance(trace_info, DraftNodeExecutionTrace):
self._draft_node_execution_trace(trace_info)
elif isinstance(trace_info, WorkflowNodeTraceInfo):
self._node_execution_trace(trace_info)
elif isinstance(trace_info, ModerationTraceInfo):
self._moderation_trace(trace_info)
elif isinstance(trace_info, SuggestedQuestionTraceInfo):
self._suggested_question_trace(trace_info)
elif isinstance(trace_info, DatasetRetrievalTraceInfo):
self._dataset_retrieval_trace(trace_info)
elif isinstance(trace_info, GenerateNameTraceInfo):
self._generate_name_trace(trace_info)
elif isinstance(trace_info, PromptGenerationTraceInfo):
self._prompt_generation_trace(trace_info)
else:
raise AssertionError("this statment should be unreachable")
match trace_info:
case WorkflowTraceInfo():
self._workflow_trace(trace_info)
case MessageTraceInfo():
self._message_trace(trace_info)
case ToolTraceInfo():
self._tool_trace(trace_info)
case DraftNodeExecutionTrace():
self._draft_node_execution_trace(trace_info)
case WorkflowNodeTraceInfo():
self._node_execution_trace(trace_info)
case ModerationTraceInfo():
self._moderation_trace(trace_info)
case SuggestedQuestionTraceInfo():
self._suggested_question_trace(trace_info)
case DatasetRetrievalTraceInfo():
self._dataset_retrieval_trace(trace_info)
case GenerateNameTraceInfo():
self._generate_name_trace(trace_info)
case PromptGenerationTraceInfo():
self._prompt_generation_trace(trace_info)
case _:
raise AssertionError("this statment should be unreachable")
def _common_attrs(self, trace_info: BaseTraceInfo) -> dict[str, Any]:
metadata = self._metadata(trace_info)

View File

@ -89,20 +89,21 @@ class MLflowDataTrace(BaseTraceInstance):
def trace(self, trace_info: BaseTraceInfo):
"""Simple dispatch to trace methods"""
try:
if isinstance(trace_info, WorkflowTraceInfo):
self.workflow_trace(trace_info)
elif isinstance(trace_info, MessageTraceInfo):
self.message_trace(trace_info)
elif isinstance(trace_info, ToolTraceInfo):
self.tool_trace(trace_info)
elif isinstance(trace_info, ModerationTraceInfo):
self.moderation_trace(trace_info)
elif isinstance(trace_info, DatasetRetrievalTraceInfo):
self.dataset_retrieval_trace(trace_info)
elif isinstance(trace_info, SuggestedQuestionTraceInfo):
self.suggested_question_trace(trace_info)
elif isinstance(trace_info, GenerateNameTraceInfo):
self.generate_name_trace(trace_info)
match trace_info:
case WorkflowTraceInfo():
self.workflow_trace(trace_info)
case MessageTraceInfo():
self.message_trace(trace_info)
case ToolTraceInfo():
self.tool_trace(trace_info)
case ModerationTraceInfo():
self.moderation_trace(trace_info)
case DatasetRetrievalTraceInfo():
self.dataset_retrieval_trace(trace_info)
case SuggestedQuestionTraceInfo():
self.suggested_question_trace(trace_info)
case GenerateNameTraceInfo():
self.generate_name_trace(trace_info)
except Exception:
logger.exception("[MLflow] Trace error")
raise
@ -480,14 +481,15 @@ class MLflowDataTrace(BaseTraceInstance):
def _parse_prompts(self, prompts):
"""Postprocess prompts format to be standard chat messages"""
if isinstance(prompts, str):
return prompts
elif isinstance(prompts, dict):
return self._parse_single_message(prompts)
elif isinstance(prompts, list):
messages = [self._parse_single_message(item) for item in prompts]
messages = self._resolve_tool_call_ids(messages)
return messages
match prompts:
case str():
return prompts
case dict():
return self._parse_single_message(prompts)
case list():
messages = [self._parse_single_message(item) for item in prompts]
messages = self._resolve_tool_call_ids(messages)
return messages
return prompts # Fallback to original format
def _parse_single_message(self, item: dict[str, Any]):

View File

@ -58,20 +58,21 @@ class TencentDataTrace(BaseTraceInstance):
def trace(self, trace_info: BaseTraceInfo) -> None:
"""Main tracing entry point - coordinates different trace types."""
if isinstance(trace_info, WorkflowTraceInfo):
self.workflow_trace(trace_info)
elif isinstance(trace_info, MessageTraceInfo):
self.message_trace(trace_info)
elif isinstance(trace_info, ModerationTraceInfo):
pass
elif isinstance(trace_info, SuggestedQuestionTraceInfo):
self.suggested_question_trace(trace_info)
elif isinstance(trace_info, DatasetRetrievalTraceInfo):
self.dataset_retrieval_trace(trace_info)
elif isinstance(trace_info, ToolTraceInfo):
self.tool_trace(trace_info)
elif isinstance(trace_info, GenerateNameTraceInfo):
pass
match trace_info:
case WorkflowTraceInfo():
self.workflow_trace(trace_info)
case MessageTraceInfo():
self.message_trace(trace_info)
case ModerationTraceInfo():
pass
case SuggestedQuestionTraceInfo():
self.suggested_question_trace(trace_info)
case DatasetRetrievalTraceInfo():
self.dataset_retrieval_trace(trace_info)
case ToolTraceInfo():
self.tool_trace(trace_info)
case GenerateNameTraceInfo():
pass
def api_check(self) -> bool:
return self.trace_client.api_check()