From 483f71f03ce27201ba9a6dabc06ee46aeace0109 Mon Sep 17 00:00:00 2001 From: takatost Date: Fri, 26 Jul 2024 20:13:11 +0800 Subject: [PATCH] fix logging --- .../app/apps/workflow_logging_callback.py | 95 +++++++++++++------ 1 file changed, 67 insertions(+), 28 deletions(-) diff --git a/api/core/app/apps/workflow_logging_callback.py b/api/core/app/apps/workflow_logging_callback.py index 699902a137..42e7d41080 100644 --- a/api/core/app/apps/workflow_logging_callback.py +++ b/api/core/app/apps/workflow_logging_callback.py @@ -2,13 +2,15 @@ from typing import Optional from core.model_runtime.utils.encoders import jsonable_encoder from core.workflow.callbacks.base_workflow_callback import WorkflowCallback -from core.workflow.entities.base_node_data_entities import BaseNodeData -from core.workflow.entities.node_entities import NodeType from core.workflow.graph_engine.entities.event import ( GraphEngineEvent, GraphRunFailedEvent, GraphRunStartedEvent, GraphRunSucceededEvent, + IterationRunFailedEvent, + IterationRunNextEvent, + IterationRunStartedEvent, + IterationRunSucceededEvent, NodeRunFailedEvent, NodeRunStartedEvent, NodeRunStreamChunkEvent, @@ -71,6 +73,27 @@ class WorkflowLoggingCallback(WorkflowCallback): graph_runtime_state=graph_runtime_state, event=event ) + elif isinstance(event, IterationRunStartedEvent): + self.on_workflow_iteration_started( + graph=graph, + graph_init_params=graph_init_params, + graph_runtime_state=graph_runtime_state, + event=event + ) + elif isinstance(event, IterationRunNextEvent): + self.on_workflow_iteration_next( + graph=graph, + graph_init_params=graph_init_params, + graph_runtime_state=graph_runtime_state, + event=event + ) + elif isinstance(event, IterationRunSucceededEvent | IterationRunFailedEvent): + self.on_workflow_iteration_completed( + graph=graph, + graph_init_params=graph_init_params, + graph_runtime_state=graph_runtime_state, + event=event + ) def on_workflow_node_execute_started( self, @@ -112,11 +135,16 @@ class WorkflowLoggingCallback(WorkflowCallback): if route_node_state.node_run_result: node_run_result = route_node_state.node_run_result - self.print_text(f"Inputs: {jsonable_encoder(node_run_result.inputs) if node_run_result.inputs else ''}", color='green') - self.print_text(f"Process Data: {jsonable_encoder(node_run_result.process_data) if node_run_result.process_data else ''}", color='green') - self.print_text(f"Outputs: {jsonable_encoder(node_run_result.outputs) if node_run_result.outputs else ''}", color='green') - self.print_text(f"Metadata: {jsonable_encoder(node_run_result.execution_metadata) if node_run_result.execution_metadata else ''}", + self.print_text(f"Inputs: {jsonable_encoder(node_run_result.inputs) if node_run_result.inputs else ''}", color='green') + self.print_text( + f"Process Data: {jsonable_encoder(node_run_result.process_data) if node_run_result.process_data else ''}", + color='green') + self.print_text(f"Outputs: {jsonable_encoder(node_run_result.outputs) if node_run_result.outputs else ''}", + color='green') + self.print_text( + f"Metadata: {jsonable_encoder(node_run_result.execution_metadata) if node_run_result.execution_metadata else ''}", + color='green') def on_workflow_node_execute_failed( self, @@ -141,9 +169,13 @@ class WorkflowLoggingCallback(WorkflowCallback): if route_node_state.node_run_result: node_run_result = route_node_state.node_run_result self.print_text(f"Error: {node_run_result.error}", color='red') - self.print_text(f"Inputs: {jsonable_encoder(node_run_result.inputs) if node_run_result.inputs else ''}", color='red') - self.print_text(f"Process Data: {jsonable_encoder(node_run_result.process_data) if node_run_result.process_data else ''}", color='red') - self.print_text(f"Outputs: {jsonable_encoder(node_run_result.outputs) if node_run_result.outputs else ''}", color='red') + self.print_text(f"Inputs: {jsonable_encoder(node_run_result.inputs) if node_run_result.inputs else ''}", + color='red') + self.print_text( + f"Process Data: {jsonable_encoder(node_run_result.process_data) if node_run_result.process_data else ''}", + color='red') + self.print_text(f"Outputs: {jsonable_encoder(node_run_result.outputs) if node_run_result.outputs else ''}", + color='red') def on_node_text_chunk( self, @@ -163,42 +195,49 @@ class WorkflowLoggingCallback(WorkflowCallback): node_run_result = route_node_state.node_run_result if node_run_result: - self.print_text(f"Metadata: {jsonable_encoder(node_run_result.metadata) if node_run_result.metadata else ''}") + self.print_text( + f"Metadata: {jsonable_encoder(node_run_result.metadata) if node_run_result.metadata else ''}") self.print_text(event.chunk_content, color="pink", end="") - def on_workflow_iteration_started(self, - node_id: str, - node_type: NodeType, - node_run_index: int = 1, - node_data: Optional[BaseNodeData] = None, - inputs: dict = None, - predecessor_node_id: Optional[str] = None, - metadata: Optional[dict] = None) -> None: + def on_workflow_iteration_started( + self, + graph: Graph, + graph_init_params: GraphInitParams, + graph_runtime_state: GraphRuntimeState, + event: IterationRunStartedEvent + ) -> None: """ Publish iteration started """ self.print_text("\n[on_workflow_iteration_started]", color='blue') - self.print_text(f"Node ID: {node_id}", color='blue') + self.print_text(f"Node ID: {event.iteration_id}", color='blue') - def on_workflow_iteration_next(self, node_id: str, - node_type: NodeType, - index: int, - node_run_index: int, - output: Optional[dict]) -> None: + def on_workflow_iteration_next( + self, + graph: Graph, + graph_init_params: GraphInitParams, + graph_runtime_state: GraphRuntimeState, + event: IterationRunNextEvent + ) -> None: """ Publish iteration next """ self.print_text("\n[on_workflow_iteration_next]", color='blue') + self.print_text(f"Node ID: {event.iteration_id}", color='blue') - def on_workflow_iteration_completed(self, node_id: str, - node_type: NodeType, - node_run_index: int, - outputs: dict) -> None: + def on_workflow_iteration_completed( + self, + graph: Graph, + graph_init_params: GraphInitParams, + graph_runtime_state: GraphRuntimeState, + event: IterationRunSucceededEvent | IterationRunFailedEvent + ) -> None: """ Publish iteration completed """ self.print_text("\n[on_workflow_iteration_completed]", color='blue') + self.print_text(f"Node ID: {event.iteration_id}", color='blue') def print_text( self, text: str, color: Optional[str] = None, end: str = "\n"