diff --git a/api/core/app/apps/advanced_chat/app_runner.py b/api/core/app/apps/advanced_chat/app_runner.py index 5f5fd7010c..1c54cf3dc5 100644 --- a/api/core/app/apps/advanced_chat/app_runner.py +++ b/api/core/app/apps/advanced_chat/app_runner.py @@ -1,4 +1,5 @@ import logging +import os import time from typing import Optional, cast @@ -6,6 +7,7 @@ from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfig from core.app.apps.advanced_chat.workflow_event_trigger_callback import WorkflowEventTriggerCallback from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom from core.app.apps.base_app_runner import AppRunner +from core.app.apps.workflow_logging_callback import WorkflowLoggingCallback from core.app.entities.app_invoke_entities import ( AdvancedChatAppGenerateEntity, InvokeFrom, @@ -76,6 +78,14 @@ class AdvancedChatAppRunner(AppRunner): db.session.close() + workflow_callbacks = [WorkflowEventTriggerCallback( + queue_manager=queue_manager, + workflow=workflow + )] + + if bool(os.environ.get("DEBUG", 'False').lower() == 'true'): + workflow_callbacks.append(WorkflowLoggingCallback()) + # RUN WORKFLOW workflow_engine_manager = WorkflowEngineManager() workflow_engine_manager.run_workflow( @@ -90,10 +100,7 @@ class AdvancedChatAppRunner(AppRunner): SystemVariable.FILES: files, SystemVariable.CONVERSATION: conversation.id, }, - callbacks=[WorkflowEventTriggerCallback( - queue_manager=queue_manager, - workflow=workflow - )] + callbacks=workflow_callbacks ) def get_workflow(self, app_model: App, workflow_id: str) -> Optional[Workflow]: diff --git a/api/core/app/apps/workflow/app_runner.py b/api/core/app/apps/workflow/app_runner.py index 5712aa68cb..4de6f28290 100644 --- a/api/core/app/apps/workflow/app_runner.py +++ b/api/core/app/apps/workflow/app_runner.py @@ -1,9 +1,11 @@ import logging +import os from typing import Optional, cast from core.app.apps.base_app_queue_manager import AppQueueManager from core.app.apps.workflow.app_config_manager import WorkflowAppConfig from core.app.apps.workflow.workflow_event_trigger_callback import WorkflowEventTriggerCallback +from core.app.apps.workflow_logging_callback import WorkflowLoggingCallback from core.app.entities.app_invoke_entities import ( InvokeFrom, WorkflowAppGenerateEntity, @@ -47,6 +49,14 @@ class WorkflowAppRunner: db.session.close() + workflow_callbacks = [WorkflowEventTriggerCallback( + queue_manager=queue_manager, + workflow=workflow + )] + + if bool(os.environ.get("DEBUG", 'False').lower() == 'true'): + workflow_callbacks.append(WorkflowLoggingCallback()) + # RUN WORKFLOW workflow_engine_manager = WorkflowEngineManager() workflow_engine_manager.run_workflow( @@ -59,10 +69,7 @@ class WorkflowAppRunner: system_inputs={ SystemVariable.FILES: files }, - callbacks=[WorkflowEventTriggerCallback( - queue_manager=queue_manager, - workflow=workflow - )] + callbacks=workflow_callbacks ) def get_workflow(self, app_model: App, workflow_id: str) -> Optional[Workflow]: diff --git a/api/core/app/apps/workflow_logging_callback.py b/api/core/app/apps/workflow_logging_callback.py new file mode 100644 index 0000000000..4627c21c7a --- /dev/null +++ b/api/core/app/apps/workflow_logging_callback.py @@ -0,0 +1,122 @@ +from typing import Optional + +from core.app.entities.queue_entities import AppQueueEvent +from core.model_runtime.utils.encoders import jsonable_encoder +from core.workflow.callbacks.base_workflow_callback import BaseWorkflowCallback +from core.workflow.entities.base_node_data_entities import BaseNodeData +from core.workflow.entities.node_entities import NodeType + +_TEXT_COLOR_MAPPING = { + "blue": "36;1", + "yellow": "33;1", + "pink": "38;5;200", + "green": "32;1", + "red": "31;1", +} + + +class WorkflowLoggingCallback(BaseWorkflowCallback): + + def __init__(self) -> None: + self.current_node_id = None + + def on_workflow_run_started(self) -> None: + """ + Workflow run started + """ + self.print_text("\n[on_workflow_run_started]", color='pink') + + def on_workflow_run_succeeded(self) -> None: + """ + Workflow run succeeded + """ + self.print_text("\n[on_workflow_run_succeeded]", color='green') + + def on_workflow_run_failed(self, error: str) -> None: + """ + Workflow run failed + """ + self.print_text("\n[on_workflow_run_failed]", color='red') + + def on_workflow_node_execute_started(self, node_id: str, + node_type: NodeType, + node_data: BaseNodeData, + node_run_index: int = 1, + predecessor_node_id: Optional[str] = None) -> None: + """ + Workflow node execute started + """ + self.print_text("\n[on_workflow_node_execute_started]", color='yellow') + self.print_text(f"Node ID: {node_id}", color='yellow') + self.print_text(f"Type: {node_type.value}", color='yellow') + self.print_text(f"Index: {node_run_index}", color='yellow') + if predecessor_node_id: + self.print_text(f"Predecessor Node ID: {predecessor_node_id}", color='yellow') + + def on_workflow_node_execute_succeeded(self, node_id: str, + node_type: NodeType, + node_data: BaseNodeData, + inputs: Optional[dict] = None, + process_data: Optional[dict] = None, + outputs: Optional[dict] = None, + execution_metadata: Optional[dict] = None) -> None: + """ + Workflow node execute succeeded + """ + self.print_text("\n[on_workflow_node_execute_succeeded]", color='green') + self.print_text(f"Node ID: {node_id}", color='green') + self.print_text(f"Type: {node_type.value}", color='green') + self.print_text(f"Inputs: {jsonable_encoder(inputs) if inputs else ''}", color='green') + self.print_text(f"Process Data: {jsonable_encoder(process_data) if process_data else ''}", color='green') + self.print_text(f"Outputs: {jsonable_encoder(outputs) if outputs else ''}", color='green') + self.print_text(f"Metadata: {jsonable_encoder(execution_metadata) if execution_metadata else ''}", + color='green') + + def on_workflow_node_execute_failed(self, node_id: str, + node_type: NodeType, + node_data: BaseNodeData, + error: str, + inputs: Optional[dict] = None, + outputs: Optional[dict] = None, + process_data: Optional[dict] = None) -> None: + """ + Workflow node execute failed + """ + self.print_text("\n[on_workflow_node_execute_failed]", color='red') + self.print_text(f"Node ID: {node_id}", color='red') + self.print_text(f"Type: {node_type.value}", color='red') + self.print_text(f"Error: {error}", color='red') + self.print_text(f"Inputs: {jsonable_encoder(inputs) if inputs else ''}", color='red') + self.print_text(f"Process Data: {jsonable_encoder(process_data) if process_data else ''}", color='red') + self.print_text(f"Outputs: {jsonable_encoder(outputs) if outputs else ''}", color='red') + + def on_node_text_chunk(self, node_id: str, text: str, metadata: Optional[dict] = None) -> None: + """ + Publish text chunk + """ + if not self.current_node_id or self.current_node_id != node_id: + self.current_node_id = node_id + self.print_text('\n[on_node_text_chunk]') + self.print_text(f"Node ID: {node_id}") + self.print_text(f"Metadata: {jsonable_encoder(metadata) if metadata else ''}") + + self.print_text(text, color="pink", end="") + + def on_event(self, event: AppQueueEvent) -> None: + """ + Publish event + """ + self.print_text("\n[on_workflow_event]", color='blue') + self.print_text(f"Event: {jsonable_encoder(event)}", color='blue') + + def print_text( + self, text: str, color: Optional[str] = None, end: str = "\n" + ) -> None: + """Print text with highlighting and no end characters.""" + text_to_print = self._get_colored_text(text, color) if color else text + print(f'{text_to_print}', end=end) + + def _get_colored_text(self, text: str, color: str) -> str: + """Get colored text.""" + color_str = _TEXT_COLOR_MAPPING[color] + return f"\u001b[{color_str}m\033[1;3m{text}\u001b[0m" diff --git a/api/core/model_runtime/model_providers/__base/large_language_model.py b/api/core/model_runtime/model_providers/__base/large_language_model.py index 4b546a5356..40bde38565 100644 --- a/api/core/model_runtime/model_providers/__base/large_language_model.py +++ b/api/core/model_runtime/model_providers/__base/large_language_model.py @@ -63,7 +63,7 @@ class LargeLanguageModel(AIModel): callbacks = callbacks or [] - if bool(os.environ.get("DEBUG")): + if bool(os.environ.get("DEBUG", 'False').lower() == 'true'): callbacks.append(LoggingCallback()) # trigger before invoke callbacks