From e7e6fe8813c30d7d4815d7d347f0b48748c4aaa0 Mon Sep 17 00:00:00 2001 From: EvanYao <2869018789@qq.com> Date: Sat, 16 May 2026 16:16:14 +0800 Subject: [PATCH] refactor: convert isinstance chains to match/case (part 3) (#36242) Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com> --- .../advanced_chat/generate_task_pipeline.py | 85 +-- .../apps/workflow/generate_task_pipeline.py | 89 ++-- api/core/app/apps/workflow_app_runner.py | 497 +++++++++--------- .../base/tts/app_generator_tts_publisher.py | 15 +- api/core/tools/mcp_tool/tool.py | 38 +- api/enterprise/telemetry/enterprise_trace.py | 45 +- .../src/dify_trace_mlflow/mlflow_trace.py | 46 +- .../src/dify_trace_tencent/tencent_trace.py | 29 +- 8 files changed, 429 insertions(+), 415 deletions(-) diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index 3c46f91e51..adbe2241ef 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -245,49 +245,50 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): """ human_input_responses: list[HumanInputRequiredResponse] = [] for stream_response in generator: - if isinstance(stream_response, ErrorStreamResponse): - raise stream_response.err - elif isinstance(stream_response, HumanInputRequiredResponse): - human_input_responses.append(stream_response) - elif isinstance(stream_response, WorkflowPauseStreamResponse): - return AdvancedChatPausedBlockingResponse( - task_id=stream_response.task_id, - data=AdvancedChatPausedBlockingResponse.Data( - id=self._message_id, - mode=self._conversation_mode, - conversation_id=self._conversation_id, - message_id=self._message_id, - workflow_run_id=stream_response.data.workflow_run_id, - answer=self._task_state.answer, - metadata=self._message_end_to_stream_response().metadata, - created_at=self._message_created_at, - paused_nodes=stream_response.data.paused_nodes, - reasons=stream_response.data.reasons, - status=stream_response.data.status, - elapsed_time=stream_response.data.elapsed_time, - total_tokens=stream_response.data.total_tokens, - total_steps=stream_response.data.total_steps, - ), - ) - elif isinstance(stream_response, MessageEndStreamResponse): - extras = {} - if stream_response.metadata: - extras["metadata"] = stream_response.metadata + match stream_response: + case ErrorStreamResponse(): + raise stream_response.err + case HumanInputRequiredResponse(): + human_input_responses.append(stream_response) + case WorkflowPauseStreamResponse(): + return AdvancedChatPausedBlockingResponse( + task_id=stream_response.task_id, + data=AdvancedChatPausedBlockingResponse.Data( + id=self._message_id, + mode=self._conversation_mode, + conversation_id=self._conversation_id, + message_id=self._message_id, + workflow_run_id=stream_response.data.workflow_run_id, + answer=self._task_state.answer, + metadata=self._message_end_to_stream_response().metadata, + created_at=self._message_created_at, + paused_nodes=stream_response.data.paused_nodes, + reasons=stream_response.data.reasons, + status=stream_response.data.status, + elapsed_time=stream_response.data.elapsed_time, + total_tokens=stream_response.data.total_tokens, + total_steps=stream_response.data.total_steps, + ), + ) + case MessageEndStreamResponse(): + extras = {} + if stream_response.metadata: + extras["metadata"] = stream_response.metadata - return ChatbotAppBlockingResponse( - task_id=stream_response.task_id, - data=ChatbotAppBlockingResponse.Data( - id=self._message_id, - mode=self._conversation_mode, - conversation_id=self._conversation_id, - message_id=self._message_id, - answer=self._task_state.answer, - created_at=self._message_created_at, - **extras, - ), - ) - else: - continue + return ChatbotAppBlockingResponse( + task_id=stream_response.task_id, + data=ChatbotAppBlockingResponse.Data( + id=self._message_id, + mode=self._conversation_mode, + conversation_id=self._conversation_id, + message_id=self._message_id, + answer=self._task_state.answer, + created_at=self._message_created_at, + **extras, + ), + ) + case _: + continue if human_input_responses: return self._build_paused_blocking_response_from_human_input(human_input_responses) diff --git a/api/core/app/apps/workflow/generate_task_pipeline.py b/api/core/app/apps/workflow/generate_task_pipeline.py index 87d9b73078..80f8e3ad4a 100644 --- a/api/core/app/apps/workflow/generate_task_pipeline.py +++ b/api/core/app/apps/workflow/generate_task_pipeline.py @@ -145,50 +145,51 @@ class WorkflowAppGenerateTaskPipeline(GraphRuntimeStateSupport): """ human_input_responses: list[HumanInputRequiredResponse] = [] for stream_response in generator: - if isinstance(stream_response, ErrorStreamResponse): - raise stream_response.err - elif isinstance(stream_response, HumanInputRequiredResponse): - human_input_responses.append(stream_response) - elif isinstance(stream_response, WorkflowPauseStreamResponse): - return WorkflowAppPausedBlockingResponse( - task_id=self._application_generate_entity.task_id, - workflow_run_id=stream_response.data.workflow_run_id, - data=WorkflowAppPausedBlockingResponse.Data( - id=stream_response.data.workflow_run_id, - workflow_id=self._workflow.id, - status=stream_response.data.status, - outputs=stream_response.data.outputs or {}, - error=None, - elapsed_time=stream_response.data.elapsed_time, - total_tokens=stream_response.data.total_tokens, - total_steps=stream_response.data.total_steps, - created_at=stream_response.data.created_at, - finished_at=None, - paused_nodes=stream_response.data.paused_nodes, - reasons=stream_response.data.reasons, - ), - ) - - elif isinstance(stream_response, WorkflowFinishStreamResponse): - return WorkflowAppBlockingResponse( - task_id=self._application_generate_entity.task_id, - workflow_run_id=stream_response.data.id, - data=WorkflowAppBlockingResponse.Data( - id=stream_response.data.id, - workflow_id=stream_response.data.workflow_id, - status=stream_response.data.status, - outputs=stream_response.data.outputs, - error=stream_response.data.error, - elapsed_time=stream_response.data.elapsed_time, - total_tokens=stream_response.data.total_tokens, - total_steps=stream_response.data.total_steps, - created_at=int(stream_response.data.created_at), - finished_at=int(stream_response.data.finished_at) if stream_response.data.finished_at else None, - ), - ) - - else: - continue + match stream_response: + case ErrorStreamResponse(): + raise stream_response.err + case HumanInputRequiredResponse(): + human_input_responses.append(stream_response) + case WorkflowPauseStreamResponse(): + return WorkflowAppPausedBlockingResponse( + task_id=self._application_generate_entity.task_id, + workflow_run_id=stream_response.data.workflow_run_id, + data=WorkflowAppPausedBlockingResponse.Data( + id=stream_response.data.workflow_run_id, + workflow_id=self._workflow.id, + status=stream_response.data.status, + outputs=stream_response.data.outputs or {}, + error=None, + elapsed_time=stream_response.data.elapsed_time, + total_tokens=stream_response.data.total_tokens, + total_steps=stream_response.data.total_steps, + created_at=stream_response.data.created_at, + finished_at=None, + paused_nodes=stream_response.data.paused_nodes, + reasons=stream_response.data.reasons, + ), + ) + case WorkflowFinishStreamResponse(): + return WorkflowAppBlockingResponse( + task_id=self._application_generate_entity.task_id, + workflow_run_id=stream_response.data.id, + data=WorkflowAppBlockingResponse.Data( + id=stream_response.data.id, + workflow_id=stream_response.data.workflow_id, + status=stream_response.data.status, + outputs=stream_response.data.outputs, + error=stream_response.data.error, + elapsed_time=stream_response.data.elapsed_time, + total_tokens=stream_response.data.total_tokens, + total_steps=stream_response.data.total_steps, + created_at=int(stream_response.data.created_at), + finished_at=int(stream_response.data.finished_at) + if stream_response.data.finished_at + else None, + ), + ) + case _: + continue if human_input_responses: return self._build_paused_blocking_response_from_human_input(human_input_responses) diff --git a/api/core/app/apps/workflow_app_runner.py b/api/core/app/apps/workflow_app_runner.py index 047b54c86c..84e9573416 100644 --- a/api/core/app/apps/workflow_app_runner.py +++ b/api/core/app/apps/workflow_app_runner.py @@ -399,278 +399,281 @@ class WorkflowBasedAppRunner: :param workflow_entry: workflow entry :param event: event """ - if isinstance(event, GraphRunStartedEvent): - self._publish_event(QueueWorkflowStartedEvent(reason=event.reason)) - elif isinstance(event, GraphRunSucceededEvent): - self._publish_event(QueueWorkflowSucceededEvent(outputs=event.outputs)) - elif isinstance(event, GraphRunPartialSucceededEvent): - self._publish_event( - QueueWorkflowPartialSuccessEvent(outputs=event.outputs, exceptions_count=event.exceptions_count) - ) - elif isinstance(event, GraphRunFailedEvent): - self._publish_event(QueueWorkflowFailedEvent(error=event.error, exceptions_count=event.exceptions_count)) - elif isinstance(event, GraphRunAbortedEvent): - self._publish_event(QueueWorkflowFailedEvent(error=event.reason or "Unknown error", exceptions_count=0)) - elif isinstance(event, GraphRunPausedEvent): - runtime_state = workflow_entry.graph_engine.graph_runtime_state - paused_nodes = runtime_state.get_paused_nodes() - self._enqueue_human_input_notifications(event.reasons) - self._publish_event( - QueueWorkflowPausedEvent( - reasons=event.reasons, - outputs=event.outputs, - paused_nodes=paused_nodes, + match event: + case GraphRunStartedEvent(): + self._publish_event(QueueWorkflowStartedEvent(reason=event.reason)) + case GraphRunSucceededEvent(): + self._publish_event(QueueWorkflowSucceededEvent(outputs=event.outputs)) + case GraphRunPartialSucceededEvent(): + self._publish_event( + QueueWorkflowPartialSuccessEvent(outputs=event.outputs, exceptions_count=event.exceptions_count) ) - ) - elif isinstance(event, NodeRunHumanInputFormFilledEvent): - self._publish_event( - QueueHumanInputFormFilledEvent( - node_execution_id=event.id, - node_id=event.node_id, - node_type=event.node_type, - node_title=event.node_title, - rendered_content=event.rendered_content, - action_id=event.action_id, - action_text=event.action_text, + case GraphRunFailedEvent(): + self._publish_event( + QueueWorkflowFailedEvent(error=event.error, exceptions_count=event.exceptions_count) ) - ) - elif isinstance(event, NodeRunHumanInputFormTimeoutEvent): - self._publish_event( - QueueHumanInputFormTimeoutEvent( - node_id=event.node_id, - node_type=event.node_type, - node_title=event.node_title, - expiration_time=event.expiration_time, + case GraphRunAbortedEvent(): + self._publish_event(QueueWorkflowFailedEvent(error=event.reason or "Unknown error", exceptions_count=0)) + case GraphRunPausedEvent(): + runtime_state = workflow_entry.graph_engine.graph_runtime_state + paused_nodes = runtime_state.get_paused_nodes() + self._enqueue_human_input_notifications(event.reasons) + self._publish_event( + QueueWorkflowPausedEvent( + reasons=event.reasons, + outputs=event.outputs, + paused_nodes=paused_nodes, + ) ) - ) - elif isinstance(event, NodeRunRetryEvent): - node_run_result = event.node_run_result - inputs = node_run_result.inputs - process_data = node_run_result.process_data - outputs = project_node_outputs_for_workflow_run( - node_type=event.node_type, - inputs=inputs, - outputs=node_run_result.outputs, - ) - execution_metadata = node_run_result.metadata - self._publish_event( - QueueNodeRetryEvent( - node_execution_id=event.id, - node_id=event.node_id, - node_title=event.node_title, + case NodeRunHumanInputFormFilledEvent(): + self._publish_event( + QueueHumanInputFormFilledEvent( + node_execution_id=event.id, + node_id=event.node_id, + node_type=event.node_type, + node_title=event.node_title, + rendered_content=event.rendered_content, + action_id=event.action_id, + action_text=event.action_text, + ) + ) + case NodeRunHumanInputFormTimeoutEvent(): + self._publish_event( + QueueHumanInputFormTimeoutEvent( + node_id=event.node_id, + node_type=event.node_type, + node_title=event.node_title, + expiration_time=event.expiration_time, + ) + ) + case NodeRunRetryEvent(): + node_run_result = event.node_run_result + inputs = node_run_result.inputs + process_data = node_run_result.process_data + outputs = project_node_outputs_for_workflow_run( node_type=event.node_type, - start_at=event.start_at, - in_iteration_id=event.in_iteration_id, - in_loop_id=event.in_loop_id, inputs=inputs, - process_data=process_data, - outputs=outputs, - error=event.error, - execution_metadata=execution_metadata, - retry_index=event.retry_index, - provider_type=event.provider_type, - provider_id=event.provider_id, + outputs=node_run_result.outputs, ) - ) - elif isinstance(event, NodeRunStartedEvent): - self._publish_event( - QueueNodeStartedEvent( - node_execution_id=event.id, - node_id=event.node_id, - node_title=event.node_title, - node_type=event.node_type, - start_at=event.start_at, - in_iteration_id=event.in_iteration_id, - in_loop_id=event.in_loop_id, - agent_strategy=self._build_agent_strategy_info(event), - provider_type=event.provider_type, - provider_id=event.provider_id, + execution_metadata = node_run_result.metadata + self._publish_event( + QueueNodeRetryEvent( + node_execution_id=event.id, + node_id=event.node_id, + node_title=event.node_title, + node_type=event.node_type, + start_at=event.start_at, + in_iteration_id=event.in_iteration_id, + in_loop_id=event.in_loop_id, + inputs=inputs, + process_data=process_data, + outputs=outputs, + error=event.error, + execution_metadata=execution_metadata, + retry_index=event.retry_index, + provider_type=event.provider_type, + provider_id=event.provider_id, + ) ) - ) - elif isinstance(event, NodeRunSucceededEvent): - node_run_result = event.node_run_result - inputs = node_run_result.inputs - process_data = node_run_result.process_data - outputs = project_node_outputs_for_workflow_run( - node_type=event.node_type, - inputs=inputs, - outputs=node_run_result.outputs, - ) - execution_metadata = node_run_result.metadata - self._publish_event( - QueueNodeSucceededEvent( - node_execution_id=event.id, - node_id=event.node_id, + case NodeRunStartedEvent(): + self._publish_event( + QueueNodeStartedEvent( + node_execution_id=event.id, + node_id=event.node_id, + node_title=event.node_title, + node_type=event.node_type, + start_at=event.start_at, + in_iteration_id=event.in_iteration_id, + in_loop_id=event.in_loop_id, + agent_strategy=self._build_agent_strategy_info(event), + provider_type=event.provider_type, + provider_id=event.provider_id, + ) + ) + case NodeRunSucceededEvent(): + node_run_result = event.node_run_result + inputs = node_run_result.inputs + process_data = node_run_result.process_data + outputs = project_node_outputs_for_workflow_run( node_type=event.node_type, - start_at=event.start_at, - finished_at=event.finished_at, inputs=inputs, - process_data=process_data, - outputs=outputs, - execution_metadata=execution_metadata, - in_iteration_id=event.in_iteration_id, - in_loop_id=event.in_loop_id, + outputs=node_run_result.outputs, ) - ) - elif isinstance(event, NodeRunFailedEvent): - outputs = project_node_outputs_for_workflow_run( - node_type=event.node_type, - inputs=event.node_run_result.inputs, - outputs=event.node_run_result.outputs, - ) - self._publish_event( - QueueNodeFailedEvent( - node_execution_id=event.id, - node_id=event.node_id, + execution_metadata = node_run_result.metadata + self._publish_event( + QueueNodeSucceededEvent( + node_execution_id=event.id, + node_id=event.node_id, + node_type=event.node_type, + start_at=event.start_at, + finished_at=event.finished_at, + inputs=inputs, + process_data=process_data, + outputs=outputs, + execution_metadata=execution_metadata, + in_iteration_id=event.in_iteration_id, + in_loop_id=event.in_loop_id, + ) + ) + case NodeRunFailedEvent(): + outputs = project_node_outputs_for_workflow_run( node_type=event.node_type, - start_at=event.start_at, - finished_at=event.finished_at, inputs=event.node_run_result.inputs, - process_data=event.node_run_result.process_data, - outputs=outputs, - error=event.node_run_result.error or "Unknown error", - execution_metadata=event.node_run_result.metadata, - in_iteration_id=event.in_iteration_id, - in_loop_id=event.in_loop_id, + outputs=event.node_run_result.outputs, ) - ) - elif isinstance(event, NodeRunExceptionEvent): - outputs = project_node_outputs_for_workflow_run( - node_type=event.node_type, - inputs=event.node_run_result.inputs, - outputs=event.node_run_result.outputs, - ) - self._publish_event( - QueueNodeExceptionEvent( - node_execution_id=event.id, - node_id=event.node_id, + self._publish_event( + QueueNodeFailedEvent( + node_execution_id=event.id, + node_id=event.node_id, + node_type=event.node_type, + start_at=event.start_at, + finished_at=event.finished_at, + inputs=event.node_run_result.inputs, + process_data=event.node_run_result.process_data, + outputs=outputs, + error=event.node_run_result.error or "Unknown error", + execution_metadata=event.node_run_result.metadata, + in_iteration_id=event.in_iteration_id, + in_loop_id=event.in_loop_id, + ) + ) + case NodeRunExceptionEvent(): + outputs = project_node_outputs_for_workflow_run( node_type=event.node_type, - start_at=event.start_at, - finished_at=event.finished_at, inputs=event.node_run_result.inputs, - process_data=event.node_run_result.process_data, - outputs=outputs, - error=event.node_run_result.error or "Unknown error", - execution_metadata=event.node_run_result.metadata, - in_iteration_id=event.in_iteration_id, - in_loop_id=event.in_loop_id, + outputs=event.node_run_result.outputs, ) - ) - elif isinstance(event, NodeRunStreamChunkEvent): - self._publish_event( - QueueTextChunkEvent( - text=event.chunk, - from_variable_selector=list(event.selector), - in_iteration_id=event.in_iteration_id, - in_loop_id=event.in_loop_id, + self._publish_event( + QueueNodeExceptionEvent( + node_execution_id=event.id, + node_id=event.node_id, + node_type=event.node_type, + start_at=event.start_at, + finished_at=event.finished_at, + inputs=event.node_run_result.inputs, + process_data=event.node_run_result.process_data, + outputs=outputs, + error=event.node_run_result.error or "Unknown error", + execution_metadata=event.node_run_result.metadata, + in_iteration_id=event.in_iteration_id, + in_loop_id=event.in_loop_id, + ) ) - ) - elif isinstance(event, NodeRunRetrieverResourceEvent): - self._publish_event( - QueueRetrieverResourcesEvent( - retriever_resources=[ - RetrievalSourceMetadata.model_validate(resource) for resource in event.retriever_resources - ], - in_iteration_id=event.in_iteration_id, - in_loop_id=event.in_loop_id, + case NodeRunStreamChunkEvent(): + self._publish_event( + QueueTextChunkEvent( + text=event.chunk, + from_variable_selector=list(event.selector), + in_iteration_id=event.in_iteration_id, + in_loop_id=event.in_loop_id, + ) ) - ) - elif isinstance(event, NodeRunAgentLogEvent): - self._publish_event( - QueueAgentLogEvent( - id=event.message_id, - label=event.label, - node_execution_id=event.node_execution_id, - parent_id=event.parent_id, - error=event.error, - status=event.status, - data=event.data, - metadata=event.metadata, - node_id=event.node_id, + case NodeRunRetrieverResourceEvent(): + self._publish_event( + QueueRetrieverResourcesEvent( + retriever_resources=[ + RetrievalSourceMetadata.model_validate(resource) for resource in event.retriever_resources + ], + in_iteration_id=event.in_iteration_id, + in_loop_id=event.in_loop_id, + ) ) - ) - elif isinstance(event, NodeRunIterationStartedEvent): - self._publish_event( - QueueIterationStartEvent( - node_execution_id=event.id, - node_id=event.node_id, - node_type=event.node_type, - node_title=event.node_title, - start_at=event.start_at, - node_run_index=workflow_entry.graph_engine.graph_runtime_state.node_run_steps, - inputs=event.inputs, - metadata=event.metadata, + case NodeRunAgentLogEvent(): + self._publish_event( + QueueAgentLogEvent( + id=event.message_id, + label=event.label, + node_execution_id=event.node_execution_id, + parent_id=event.parent_id, + error=event.error, + status=event.status, + data=event.data, + metadata=event.metadata, + node_id=event.node_id, + ) ) - ) - elif isinstance(event, NodeRunIterationNextEvent): - self._publish_event( - QueueIterationNextEvent( - node_execution_id=event.id, - node_id=event.node_id, - node_type=event.node_type, - node_title=event.node_title, - index=event.index, - node_run_index=workflow_entry.graph_engine.graph_runtime_state.node_run_steps, - output=event.pre_iteration_output, + case NodeRunIterationStartedEvent(): + self._publish_event( + QueueIterationStartEvent( + node_execution_id=event.id, + node_id=event.node_id, + node_type=event.node_type, + node_title=event.node_title, + start_at=event.start_at, + node_run_index=workflow_entry.graph_engine.graph_runtime_state.node_run_steps, + inputs=event.inputs, + metadata=event.metadata, + ) ) - ) - elif isinstance(event, (NodeRunIterationSucceededEvent | NodeRunIterationFailedEvent)): - self._publish_event( - QueueIterationCompletedEvent( - node_execution_id=event.id, - node_id=event.node_id, - node_type=event.node_type, - node_title=event.node_title, - start_at=event.start_at, - node_run_index=workflow_entry.graph_engine.graph_runtime_state.node_run_steps, - inputs=event.inputs, - outputs=event.outputs, - metadata=event.metadata, - steps=event.steps, - error=event.error if isinstance(event, NodeRunIterationFailedEvent) else None, + case NodeRunIterationNextEvent(): + self._publish_event( + QueueIterationNextEvent( + node_execution_id=event.id, + node_id=event.node_id, + node_type=event.node_type, + node_title=event.node_title, + index=event.index, + node_run_index=workflow_entry.graph_engine.graph_runtime_state.node_run_steps, + output=event.pre_iteration_output, + ) ) - ) - elif isinstance(event, NodeRunLoopStartedEvent): - self._publish_event( - QueueLoopStartEvent( - node_execution_id=event.id, - node_id=event.node_id, - node_type=event.node_type, - node_title=event.node_title, - start_at=event.start_at, - node_run_index=workflow_entry.graph_engine.graph_runtime_state.node_run_steps, - inputs=event.inputs, - metadata=event.metadata, + case NodeRunIterationSucceededEvent() | NodeRunIterationFailedEvent(): + self._publish_event( + QueueIterationCompletedEvent( + node_execution_id=event.id, + node_id=event.node_id, + node_type=event.node_type, + node_title=event.node_title, + start_at=event.start_at, + node_run_index=workflow_entry.graph_engine.graph_runtime_state.node_run_steps, + inputs=event.inputs, + outputs=event.outputs, + metadata=event.metadata, + steps=event.steps, + error=event.error if isinstance(event, NodeRunIterationFailedEvent) else None, + ) ) - ) - elif isinstance(event, NodeRunLoopNextEvent): - self._publish_event( - QueueLoopNextEvent( - node_execution_id=event.id, - node_id=event.node_id, - node_type=event.node_type, - node_title=event.node_title, - index=event.index, - node_run_index=workflow_entry.graph_engine.graph_runtime_state.node_run_steps, - output=event.pre_loop_output, + case NodeRunLoopStartedEvent(): + self._publish_event( + QueueLoopStartEvent( + node_execution_id=event.id, + node_id=event.node_id, + node_type=event.node_type, + node_title=event.node_title, + start_at=event.start_at, + node_run_index=workflow_entry.graph_engine.graph_runtime_state.node_run_steps, + inputs=event.inputs, + metadata=event.metadata, + ) ) - ) - elif isinstance(event, (NodeRunLoopSucceededEvent | NodeRunLoopFailedEvent)): - self._publish_event( - QueueLoopCompletedEvent( - node_execution_id=event.id, - node_id=event.node_id, - node_type=event.node_type, - node_title=event.node_title, - start_at=event.start_at, - node_run_index=workflow_entry.graph_engine.graph_runtime_state.node_run_steps, - inputs=event.inputs, - outputs=event.outputs, - metadata=event.metadata, - steps=event.steps, - error=event.error if isinstance(event, NodeRunLoopFailedEvent) else None, + case NodeRunLoopNextEvent(): + self._publish_event( + QueueLoopNextEvent( + node_execution_id=event.id, + node_id=event.node_id, + node_type=event.node_type, + node_title=event.node_title, + index=event.index, + node_run_index=workflow_entry.graph_engine.graph_runtime_state.node_run_steps, + output=event.pre_loop_output, + ) + ) + case NodeRunLoopSucceededEvent() | NodeRunLoopFailedEvent(): + self._publish_event( + QueueLoopCompletedEvent( + node_execution_id=event.id, + node_id=event.node_id, + node_type=event.node_type, + node_title=event.node_title, + start_at=event.start_at, + node_run_index=workflow_entry.graph_engine.graph_runtime_state.node_run_steps, + inputs=event.inputs, + outputs=event.outputs, + metadata=event.metadata, + steps=event.steps, + error=event.error if isinstance(event, NodeRunLoopFailedEvent) else None, + ) ) - ) def _enqueue_human_input_notifications(self, reasons: Sequence[object]) -> None: for reason in reasons: diff --git a/api/core/base/tts/app_generator_tts_publisher.py b/api/core/base/tts/app_generator_tts_publisher.py index 9e3c187210..7002b1a470 100644 --- a/api/core/base/tts/app_generator_tts_publisher.py +++ b/api/core/base/tts/app_generator_tts_publisher.py @@ -95,13 +95,14 @@ class AppGeneratorTTSPublisher: message_content = message.event.chunk.delta.message.content if not message_content: continue - if isinstance(message_content, str): - self.msg_text += message_content - elif isinstance(message_content, list): - for content in message_content: - if not isinstance(content, TextPromptMessageContent): - continue - self.msg_text += content.data + match message_content: + case str(): + self.msg_text += message_content + case list(): + for content in message_content: + if not isinstance(content, TextPromptMessageContent): + continue + self.msg_text += content.data elif isinstance(message.event, QueueTextChunkEvent): self.msg_text += message.event.text elif isinstance(message.event, QueueNodeSucceededEvent): diff --git a/api/core/tools/mcp_tool/tool.py b/api/core/tools/mcp_tool/tool.py index 00fc8a8282..b54adea98e 100644 --- a/api/core/tools/mcp_tool/tool.py +++ b/api/core/tools/mcp_tool/tool.py @@ -67,23 +67,27 @@ class MCPTool(Tool): # handle dify tool output for content in result.content: - if isinstance(content, TextContent): - yield from self._process_text_content(content) - elif isinstance(content, ImageContent | AudioContent): - yield self.create_blob_message( - blob=base64.b64decode(content.data), meta={"mime_type": content.mimeType} - ) - elif isinstance(content, EmbeddedResource): - resource = content.resource - if isinstance(resource, TextResourceContents): - yield self.create_text_message(resource.text) - elif isinstance(resource, BlobResourceContents): - mime_type = resource.mimeType or "application/octet-stream" - yield self.create_blob_message(blob=base64.b64decode(resource.blob), meta={"mime_type": mime_type}) - else: - raise ToolInvokeError(f"Unsupported embedded resource type: {type(resource)}") - else: - logger.warning("Unsupported content type=%s", type(content)) + match content: + case TextContent(): + yield from self._process_text_content(content) + case ImageContent() | AudioContent(): + yield self.create_blob_message( + blob=base64.b64decode(content.data), meta={"mime_type": content.mimeType} + ) + case EmbeddedResource(): + resource = content.resource + match resource: + case TextResourceContents(): + yield self.create_text_message(resource.text) + case BlobResourceContents(): + mime_type = resource.mimeType or "application/octet-stream" + yield self.create_blob_message( + blob=base64.b64decode(resource.blob), meta={"mime_type": mime_type} + ) + case _: + raise ToolInvokeError(f"Unsupported embedded resource type: {type(resource)}") + case _: + logger.warning("Unsupported content type=%s", type(content)) # handle MCP structured output if self.entity.output_schema and result.structuredContent: diff --git a/api/enterprise/telemetry/enterprise_trace.py b/api/enterprise/telemetry/enterprise_trace.py index fc17d9d93e..25c1a96dc1 100644 --- a/api/enterprise/telemetry/enterprise_trace.py +++ b/api/enterprise/telemetry/enterprise_trace.py @@ -77,28 +77,29 @@ class EnterpriseOtelTrace: self._exporter = exporter def trace(self, trace_info: BaseTraceInfo) -> None: - if isinstance(trace_info, WorkflowTraceInfo): - self._workflow_trace(trace_info) - elif isinstance(trace_info, MessageTraceInfo): - self._message_trace(trace_info) - elif isinstance(trace_info, ToolTraceInfo): - self._tool_trace(trace_info) - elif isinstance(trace_info, DraftNodeExecutionTrace): - self._draft_node_execution_trace(trace_info) - elif isinstance(trace_info, WorkflowNodeTraceInfo): - self._node_execution_trace(trace_info) - elif isinstance(trace_info, ModerationTraceInfo): - self._moderation_trace(trace_info) - elif isinstance(trace_info, SuggestedQuestionTraceInfo): - self._suggested_question_trace(trace_info) - elif isinstance(trace_info, DatasetRetrievalTraceInfo): - self._dataset_retrieval_trace(trace_info) - elif isinstance(trace_info, GenerateNameTraceInfo): - self._generate_name_trace(trace_info) - elif isinstance(trace_info, PromptGenerationTraceInfo): - self._prompt_generation_trace(trace_info) - else: - raise AssertionError("this statment should be unreachable") + match trace_info: + case WorkflowTraceInfo(): + self._workflow_trace(trace_info) + case MessageTraceInfo(): + self._message_trace(trace_info) + case ToolTraceInfo(): + self._tool_trace(trace_info) + case DraftNodeExecutionTrace(): + self._draft_node_execution_trace(trace_info) + case WorkflowNodeTraceInfo(): + self._node_execution_trace(trace_info) + case ModerationTraceInfo(): + self._moderation_trace(trace_info) + case SuggestedQuestionTraceInfo(): + self._suggested_question_trace(trace_info) + case DatasetRetrievalTraceInfo(): + self._dataset_retrieval_trace(trace_info) + case GenerateNameTraceInfo(): + self._generate_name_trace(trace_info) + case PromptGenerationTraceInfo(): + self._prompt_generation_trace(trace_info) + case _: + raise AssertionError("this statment should be unreachable") def _common_attrs(self, trace_info: BaseTraceInfo) -> dict[str, Any]: metadata = self._metadata(trace_info) diff --git a/api/providers/trace/trace-mlflow/src/dify_trace_mlflow/mlflow_trace.py b/api/providers/trace/trace-mlflow/src/dify_trace_mlflow/mlflow_trace.py index 4e4c45a532..e36b3dee40 100644 --- a/api/providers/trace/trace-mlflow/src/dify_trace_mlflow/mlflow_trace.py +++ b/api/providers/trace/trace-mlflow/src/dify_trace_mlflow/mlflow_trace.py @@ -89,20 +89,21 @@ class MLflowDataTrace(BaseTraceInstance): def trace(self, trace_info: BaseTraceInfo): """Simple dispatch to trace methods""" try: - if isinstance(trace_info, WorkflowTraceInfo): - self.workflow_trace(trace_info) - elif isinstance(trace_info, MessageTraceInfo): - self.message_trace(trace_info) - elif isinstance(trace_info, ToolTraceInfo): - self.tool_trace(trace_info) - elif isinstance(trace_info, ModerationTraceInfo): - self.moderation_trace(trace_info) - elif isinstance(trace_info, DatasetRetrievalTraceInfo): - self.dataset_retrieval_trace(trace_info) - elif isinstance(trace_info, SuggestedQuestionTraceInfo): - self.suggested_question_trace(trace_info) - elif isinstance(trace_info, GenerateNameTraceInfo): - self.generate_name_trace(trace_info) + match trace_info: + case WorkflowTraceInfo(): + self.workflow_trace(trace_info) + case MessageTraceInfo(): + self.message_trace(trace_info) + case ToolTraceInfo(): + self.tool_trace(trace_info) + case ModerationTraceInfo(): + self.moderation_trace(trace_info) + case DatasetRetrievalTraceInfo(): + self.dataset_retrieval_trace(trace_info) + case SuggestedQuestionTraceInfo(): + self.suggested_question_trace(trace_info) + case GenerateNameTraceInfo(): + self.generate_name_trace(trace_info) except Exception: logger.exception("[MLflow] Trace error") raise @@ -480,14 +481,15 @@ class MLflowDataTrace(BaseTraceInstance): def _parse_prompts(self, prompts): """Postprocess prompts format to be standard chat messages""" - if isinstance(prompts, str): - return prompts - elif isinstance(prompts, dict): - return self._parse_single_message(prompts) - elif isinstance(prompts, list): - messages = [self._parse_single_message(item) for item in prompts] - messages = self._resolve_tool_call_ids(messages) - return messages + match prompts: + case str(): + return prompts + case dict(): + return self._parse_single_message(prompts) + case list(): + messages = [self._parse_single_message(item) for item in prompts] + messages = self._resolve_tool_call_ids(messages) + return messages return prompts # Fallback to original format def _parse_single_message(self, item: dict[str, Any]): diff --git a/api/providers/trace/trace-tencent/src/dify_trace_tencent/tencent_trace.py b/api/providers/trace/trace-tencent/src/dify_trace_tencent/tencent_trace.py index a8c480e4a5..3e1dc1d9f6 100644 --- a/api/providers/trace/trace-tencent/src/dify_trace_tencent/tencent_trace.py +++ b/api/providers/trace/trace-tencent/src/dify_trace_tencent/tencent_trace.py @@ -58,20 +58,21 @@ class TencentDataTrace(BaseTraceInstance): def trace(self, trace_info: BaseTraceInfo) -> None: """Main tracing entry point - coordinates different trace types.""" - if isinstance(trace_info, WorkflowTraceInfo): - self.workflow_trace(trace_info) - elif isinstance(trace_info, MessageTraceInfo): - self.message_trace(trace_info) - elif isinstance(trace_info, ModerationTraceInfo): - pass - elif isinstance(trace_info, SuggestedQuestionTraceInfo): - self.suggested_question_trace(trace_info) - elif isinstance(trace_info, DatasetRetrievalTraceInfo): - self.dataset_retrieval_trace(trace_info) - elif isinstance(trace_info, ToolTraceInfo): - self.tool_trace(trace_info) - elif isinstance(trace_info, GenerateNameTraceInfo): - pass + match trace_info: + case WorkflowTraceInfo(): + self.workflow_trace(trace_info) + case MessageTraceInfo(): + self.message_trace(trace_info) + case ModerationTraceInfo(): + pass + case SuggestedQuestionTraceInfo(): + self.suggested_question_trace(trace_info) + case DatasetRetrievalTraceInfo(): + self.dataset_retrieval_trace(trace_info) + case ToolTraceInfo(): + self.tool_trace(trace_info) + case GenerateNameTraceInfo(): + pass def api_check(self) -> bool: return self.trace_client.api_check()