fix logging

This commit is contained in:
takatost 2024-07-26 20:13:11 +08:00
parent beea1e1663
commit 483f71f03c
1 changed files with 67 additions and 28 deletions

View File

@ -2,13 +2,15 @@ from typing import Optional
from core.model_runtime.utils.encoders import jsonable_encoder
from core.workflow.callbacks.base_workflow_callback import WorkflowCallback
from core.workflow.entities.base_node_data_entities import BaseNodeData
from core.workflow.entities.node_entities import NodeType
from core.workflow.graph_engine.entities.event import (
GraphEngineEvent,
GraphRunFailedEvent,
GraphRunStartedEvent,
GraphRunSucceededEvent,
IterationRunFailedEvent,
IterationRunNextEvent,
IterationRunStartedEvent,
IterationRunSucceededEvent,
NodeRunFailedEvent,
NodeRunStartedEvent,
NodeRunStreamChunkEvent,
@ -71,6 +73,27 @@ class WorkflowLoggingCallback(WorkflowCallback):
graph_runtime_state=graph_runtime_state,
event=event
)
elif isinstance(event, IterationRunStartedEvent):
self.on_workflow_iteration_started(
graph=graph,
graph_init_params=graph_init_params,
graph_runtime_state=graph_runtime_state,
event=event
)
elif isinstance(event, IterationRunNextEvent):
self.on_workflow_iteration_next(
graph=graph,
graph_init_params=graph_init_params,
graph_runtime_state=graph_runtime_state,
event=event
)
elif isinstance(event, IterationRunSucceededEvent | IterationRunFailedEvent):
self.on_workflow_iteration_completed(
graph=graph,
graph_init_params=graph_init_params,
graph_runtime_state=graph_runtime_state,
event=event
)
def on_workflow_node_execute_started(
self,
@ -112,11 +135,16 @@ class WorkflowLoggingCallback(WorkflowCallback):
if route_node_state.node_run_result:
node_run_result = route_node_state.node_run_result
self.print_text(f"Inputs: {jsonable_encoder(node_run_result.inputs) if node_run_result.inputs else ''}", color='green')
self.print_text(f"Process Data: {jsonable_encoder(node_run_result.process_data) if node_run_result.process_data else ''}", color='green')
self.print_text(f"Outputs: {jsonable_encoder(node_run_result.outputs) if node_run_result.outputs else ''}", color='green')
self.print_text(f"Metadata: {jsonable_encoder(node_run_result.execution_metadata) if node_run_result.execution_metadata else ''}",
self.print_text(f"Inputs: {jsonable_encoder(node_run_result.inputs) if node_run_result.inputs else ''}",
color='green')
self.print_text(
f"Process Data: {jsonable_encoder(node_run_result.process_data) if node_run_result.process_data else ''}",
color='green')
self.print_text(f"Outputs: {jsonable_encoder(node_run_result.outputs) if node_run_result.outputs else ''}",
color='green')
self.print_text(
f"Metadata: {jsonable_encoder(node_run_result.execution_metadata) if node_run_result.execution_metadata else ''}",
color='green')
def on_workflow_node_execute_failed(
self,
@ -141,9 +169,13 @@ class WorkflowLoggingCallback(WorkflowCallback):
if route_node_state.node_run_result:
node_run_result = route_node_state.node_run_result
self.print_text(f"Error: {node_run_result.error}", color='red')
self.print_text(f"Inputs: {jsonable_encoder(node_run_result.inputs) if node_run_result.inputs else ''}", color='red')
self.print_text(f"Process Data: {jsonable_encoder(node_run_result.process_data) if node_run_result.process_data else ''}", color='red')
self.print_text(f"Outputs: {jsonable_encoder(node_run_result.outputs) if node_run_result.outputs else ''}", color='red')
self.print_text(f"Inputs: {jsonable_encoder(node_run_result.inputs) if node_run_result.inputs else ''}",
color='red')
self.print_text(
f"Process Data: {jsonable_encoder(node_run_result.process_data) if node_run_result.process_data else ''}",
color='red')
self.print_text(f"Outputs: {jsonable_encoder(node_run_result.outputs) if node_run_result.outputs else ''}",
color='red')
def on_node_text_chunk(
self,
@ -163,42 +195,49 @@ class WorkflowLoggingCallback(WorkflowCallback):
node_run_result = route_node_state.node_run_result
if node_run_result:
self.print_text(f"Metadata: {jsonable_encoder(node_run_result.metadata) if node_run_result.metadata else ''}")
self.print_text(
f"Metadata: {jsonable_encoder(node_run_result.metadata) if node_run_result.metadata else ''}")
self.print_text(event.chunk_content, color="pink", end="")
def on_workflow_iteration_started(self,
node_id: str,
node_type: NodeType,
node_run_index: int = 1,
node_data: Optional[BaseNodeData] = None,
inputs: dict = None,
predecessor_node_id: Optional[str] = None,
metadata: Optional[dict] = None) -> None:
def on_workflow_iteration_started(
self,
graph: Graph,
graph_init_params: GraphInitParams,
graph_runtime_state: GraphRuntimeState,
event: IterationRunStartedEvent
) -> None:
"""
Publish iteration started
"""
self.print_text("\n[on_workflow_iteration_started]", color='blue')
self.print_text(f"Node ID: {node_id}", color='blue')
self.print_text(f"Node ID: {event.iteration_id}", color='blue')
def on_workflow_iteration_next(self, node_id: str,
node_type: NodeType,
index: int,
node_run_index: int,
output: Optional[dict]) -> None:
def on_workflow_iteration_next(
self,
graph: Graph,
graph_init_params: GraphInitParams,
graph_runtime_state: GraphRuntimeState,
event: IterationRunNextEvent
) -> None:
"""
Publish iteration next
"""
self.print_text("\n[on_workflow_iteration_next]", color='blue')
self.print_text(f"Node ID: {event.iteration_id}", color='blue')
def on_workflow_iteration_completed(self, node_id: str,
node_type: NodeType,
node_run_index: int,
outputs: dict) -> None:
def on_workflow_iteration_completed(
self,
graph: Graph,
graph_init_params: GraphInitParams,
graph_runtime_state: GraphRuntimeState,
event: IterationRunSucceededEvent | IterationRunFailedEvent
) -> None:
"""
Publish iteration completed
"""
self.print_text("\n[on_workflow_iteration_completed]", color='blue')
self.print_text(f"Node ID: {event.iteration_id}", color='blue')
def print_text(
self, text: str, color: Optional[str] = None, end: str = "\n"