mirror of
https://github.com/langgenius/dify.git
synced 2026-05-13 08:57:28 +08:00
add logging callback for workflow
This commit is contained in:
parent
2da7cc6928
commit
1c7573a686
@ -1,4 +1,5 @@
|
|||||||
import logging
|
import logging
|
||||||
|
import os
|
||||||
import time
|
import time
|
||||||
from typing import Optional, cast
|
from typing import Optional, cast
|
||||||
|
|
||||||
@ -6,6 +7,7 @@ from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfig
|
|||||||
from core.app.apps.advanced_chat.workflow_event_trigger_callback import WorkflowEventTriggerCallback
|
from core.app.apps.advanced_chat.workflow_event_trigger_callback import WorkflowEventTriggerCallback
|
||||||
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
|
from core.app.apps.base_app_queue_manager import AppQueueManager, PublishFrom
|
||||||
from core.app.apps.base_app_runner import AppRunner
|
from core.app.apps.base_app_runner import AppRunner
|
||||||
|
from core.app.apps.workflow_logging_callback import WorkflowLoggingCallback
|
||||||
from core.app.entities.app_invoke_entities import (
|
from core.app.entities.app_invoke_entities import (
|
||||||
AdvancedChatAppGenerateEntity,
|
AdvancedChatAppGenerateEntity,
|
||||||
InvokeFrom,
|
InvokeFrom,
|
||||||
@ -76,6 +78,14 @@ class AdvancedChatAppRunner(AppRunner):
|
|||||||
|
|
||||||
db.session.close()
|
db.session.close()
|
||||||
|
|
||||||
|
workflow_callbacks = [WorkflowEventTriggerCallback(
|
||||||
|
queue_manager=queue_manager,
|
||||||
|
workflow=workflow
|
||||||
|
)]
|
||||||
|
|
||||||
|
if bool(os.environ.get("DEBUG", 'False').lower() == 'true'):
|
||||||
|
workflow_callbacks.append(WorkflowLoggingCallback())
|
||||||
|
|
||||||
# RUN WORKFLOW
|
# RUN WORKFLOW
|
||||||
workflow_engine_manager = WorkflowEngineManager()
|
workflow_engine_manager = WorkflowEngineManager()
|
||||||
workflow_engine_manager.run_workflow(
|
workflow_engine_manager.run_workflow(
|
||||||
@ -90,10 +100,7 @@ class AdvancedChatAppRunner(AppRunner):
|
|||||||
SystemVariable.FILES: files,
|
SystemVariable.FILES: files,
|
||||||
SystemVariable.CONVERSATION: conversation.id,
|
SystemVariable.CONVERSATION: conversation.id,
|
||||||
},
|
},
|
||||||
callbacks=[WorkflowEventTriggerCallback(
|
callbacks=workflow_callbacks
|
||||||
queue_manager=queue_manager,
|
|
||||||
workflow=workflow
|
|
||||||
)]
|
|
||||||
)
|
)
|
||||||
|
|
||||||
def get_workflow(self, app_model: App, workflow_id: str) -> Optional[Workflow]:
|
def get_workflow(self, app_model: App, workflow_id: str) -> Optional[Workflow]:
|
||||||
|
|||||||
@ -1,9 +1,11 @@
|
|||||||
import logging
|
import logging
|
||||||
|
import os
|
||||||
from typing import Optional, cast
|
from typing import Optional, cast
|
||||||
|
|
||||||
from core.app.apps.base_app_queue_manager import AppQueueManager
|
from core.app.apps.base_app_queue_manager import AppQueueManager
|
||||||
from core.app.apps.workflow.app_config_manager import WorkflowAppConfig
|
from core.app.apps.workflow.app_config_manager import WorkflowAppConfig
|
||||||
from core.app.apps.workflow.workflow_event_trigger_callback import WorkflowEventTriggerCallback
|
from core.app.apps.workflow.workflow_event_trigger_callback import WorkflowEventTriggerCallback
|
||||||
|
from core.app.apps.workflow_logging_callback import WorkflowLoggingCallback
|
||||||
from core.app.entities.app_invoke_entities import (
|
from core.app.entities.app_invoke_entities import (
|
||||||
InvokeFrom,
|
InvokeFrom,
|
||||||
WorkflowAppGenerateEntity,
|
WorkflowAppGenerateEntity,
|
||||||
@ -47,6 +49,14 @@ class WorkflowAppRunner:
|
|||||||
|
|
||||||
db.session.close()
|
db.session.close()
|
||||||
|
|
||||||
|
workflow_callbacks = [WorkflowEventTriggerCallback(
|
||||||
|
queue_manager=queue_manager,
|
||||||
|
workflow=workflow
|
||||||
|
)]
|
||||||
|
|
||||||
|
if bool(os.environ.get("DEBUG", 'False').lower() == 'true'):
|
||||||
|
workflow_callbacks.append(WorkflowLoggingCallback())
|
||||||
|
|
||||||
# RUN WORKFLOW
|
# RUN WORKFLOW
|
||||||
workflow_engine_manager = WorkflowEngineManager()
|
workflow_engine_manager = WorkflowEngineManager()
|
||||||
workflow_engine_manager.run_workflow(
|
workflow_engine_manager.run_workflow(
|
||||||
@ -59,10 +69,7 @@ class WorkflowAppRunner:
|
|||||||
system_inputs={
|
system_inputs={
|
||||||
SystemVariable.FILES: files
|
SystemVariable.FILES: files
|
||||||
},
|
},
|
||||||
callbacks=[WorkflowEventTriggerCallback(
|
callbacks=workflow_callbacks
|
||||||
queue_manager=queue_manager,
|
|
||||||
workflow=workflow
|
|
||||||
)]
|
|
||||||
)
|
)
|
||||||
|
|
||||||
def get_workflow(self, app_model: App, workflow_id: str) -> Optional[Workflow]:
|
def get_workflow(self, app_model: App, workflow_id: str) -> Optional[Workflow]:
|
||||||
|
|||||||
122
api/core/app/apps/workflow_logging_callback.py
Normal file
122
api/core/app/apps/workflow_logging_callback.py
Normal file
@ -0,0 +1,122 @@
|
|||||||
|
from typing import Optional
|
||||||
|
|
||||||
|
from core.app.entities.queue_entities import AppQueueEvent
|
||||||
|
from core.model_runtime.utils.encoders import jsonable_encoder
|
||||||
|
from core.workflow.callbacks.base_workflow_callback import BaseWorkflowCallback
|
||||||
|
from core.workflow.entities.base_node_data_entities import BaseNodeData
|
||||||
|
from core.workflow.entities.node_entities import NodeType
|
||||||
|
|
||||||
|
_TEXT_COLOR_MAPPING = {
|
||||||
|
"blue": "36;1",
|
||||||
|
"yellow": "33;1",
|
||||||
|
"pink": "38;5;200",
|
||||||
|
"green": "32;1",
|
||||||
|
"red": "31;1",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class WorkflowLoggingCallback(BaseWorkflowCallback):
|
||||||
|
|
||||||
|
def __init__(self) -> None:
|
||||||
|
self.current_node_id = None
|
||||||
|
|
||||||
|
def on_workflow_run_started(self) -> None:
|
||||||
|
"""
|
||||||
|
Workflow run started
|
||||||
|
"""
|
||||||
|
self.print_text("\n[on_workflow_run_started]", color='pink')
|
||||||
|
|
||||||
|
def on_workflow_run_succeeded(self) -> None:
|
||||||
|
"""
|
||||||
|
Workflow run succeeded
|
||||||
|
"""
|
||||||
|
self.print_text("\n[on_workflow_run_succeeded]", color='green')
|
||||||
|
|
||||||
|
def on_workflow_run_failed(self, error: str) -> None:
|
||||||
|
"""
|
||||||
|
Workflow run failed
|
||||||
|
"""
|
||||||
|
self.print_text("\n[on_workflow_run_failed]", color='red')
|
||||||
|
|
||||||
|
def on_workflow_node_execute_started(self, node_id: str,
|
||||||
|
node_type: NodeType,
|
||||||
|
node_data: BaseNodeData,
|
||||||
|
node_run_index: int = 1,
|
||||||
|
predecessor_node_id: Optional[str] = None) -> None:
|
||||||
|
"""
|
||||||
|
Workflow node execute started
|
||||||
|
"""
|
||||||
|
self.print_text("\n[on_workflow_node_execute_started]", color='yellow')
|
||||||
|
self.print_text(f"Node ID: {node_id}", color='yellow')
|
||||||
|
self.print_text(f"Type: {node_type.value}", color='yellow')
|
||||||
|
self.print_text(f"Index: {node_run_index}", color='yellow')
|
||||||
|
if predecessor_node_id:
|
||||||
|
self.print_text(f"Predecessor Node ID: {predecessor_node_id}", color='yellow')
|
||||||
|
|
||||||
|
def on_workflow_node_execute_succeeded(self, node_id: str,
|
||||||
|
node_type: NodeType,
|
||||||
|
node_data: BaseNodeData,
|
||||||
|
inputs: Optional[dict] = None,
|
||||||
|
process_data: Optional[dict] = None,
|
||||||
|
outputs: Optional[dict] = None,
|
||||||
|
execution_metadata: Optional[dict] = None) -> None:
|
||||||
|
"""
|
||||||
|
Workflow node execute succeeded
|
||||||
|
"""
|
||||||
|
self.print_text("\n[on_workflow_node_execute_succeeded]", color='green')
|
||||||
|
self.print_text(f"Node ID: {node_id}", color='green')
|
||||||
|
self.print_text(f"Type: {node_type.value}", color='green')
|
||||||
|
self.print_text(f"Inputs: {jsonable_encoder(inputs) if inputs else ''}", color='green')
|
||||||
|
self.print_text(f"Process Data: {jsonable_encoder(process_data) if process_data else ''}", color='green')
|
||||||
|
self.print_text(f"Outputs: {jsonable_encoder(outputs) if outputs else ''}", color='green')
|
||||||
|
self.print_text(f"Metadata: {jsonable_encoder(execution_metadata) if execution_metadata else ''}",
|
||||||
|
color='green')
|
||||||
|
|
||||||
|
def on_workflow_node_execute_failed(self, node_id: str,
|
||||||
|
node_type: NodeType,
|
||||||
|
node_data: BaseNodeData,
|
||||||
|
error: str,
|
||||||
|
inputs: Optional[dict] = None,
|
||||||
|
outputs: Optional[dict] = None,
|
||||||
|
process_data: Optional[dict] = None) -> None:
|
||||||
|
"""
|
||||||
|
Workflow node execute failed
|
||||||
|
"""
|
||||||
|
self.print_text("\n[on_workflow_node_execute_failed]", color='red')
|
||||||
|
self.print_text(f"Node ID: {node_id}", color='red')
|
||||||
|
self.print_text(f"Type: {node_type.value}", color='red')
|
||||||
|
self.print_text(f"Error: {error}", color='red')
|
||||||
|
self.print_text(f"Inputs: {jsonable_encoder(inputs) if inputs else ''}", color='red')
|
||||||
|
self.print_text(f"Process Data: {jsonable_encoder(process_data) if process_data else ''}", color='red')
|
||||||
|
self.print_text(f"Outputs: {jsonable_encoder(outputs) if outputs else ''}", color='red')
|
||||||
|
|
||||||
|
def on_node_text_chunk(self, node_id: str, text: str, metadata: Optional[dict] = None) -> None:
|
||||||
|
"""
|
||||||
|
Publish text chunk
|
||||||
|
"""
|
||||||
|
if not self.current_node_id or self.current_node_id != node_id:
|
||||||
|
self.current_node_id = node_id
|
||||||
|
self.print_text('\n[on_node_text_chunk]')
|
||||||
|
self.print_text(f"Node ID: {node_id}")
|
||||||
|
self.print_text(f"Metadata: {jsonable_encoder(metadata) if metadata else ''}")
|
||||||
|
|
||||||
|
self.print_text(text, color="pink", end="")
|
||||||
|
|
||||||
|
def on_event(self, event: AppQueueEvent) -> None:
|
||||||
|
"""
|
||||||
|
Publish event
|
||||||
|
"""
|
||||||
|
self.print_text("\n[on_workflow_event]", color='blue')
|
||||||
|
self.print_text(f"Event: {jsonable_encoder(event)}", color='blue')
|
||||||
|
|
||||||
|
def print_text(
|
||||||
|
self, text: str, color: Optional[str] = None, end: str = "\n"
|
||||||
|
) -> None:
|
||||||
|
"""Print text with highlighting and no end characters."""
|
||||||
|
text_to_print = self._get_colored_text(text, color) if color else text
|
||||||
|
print(f'{text_to_print}', end=end)
|
||||||
|
|
||||||
|
def _get_colored_text(self, text: str, color: str) -> str:
|
||||||
|
"""Get colored text."""
|
||||||
|
color_str = _TEXT_COLOR_MAPPING[color]
|
||||||
|
return f"\u001b[{color_str}m\033[1;3m{text}\u001b[0m"
|
||||||
@ -63,7 +63,7 @@ class LargeLanguageModel(AIModel):
|
|||||||
|
|
||||||
callbacks = callbacks or []
|
callbacks = callbacks or []
|
||||||
|
|
||||||
if bool(os.environ.get("DEBUG")):
|
if bool(os.environ.get("DEBUG", 'False').lower() == 'true'):
|
||||||
callbacks.append(LoggingCallback())
|
callbacks.append(LoggingCallback())
|
||||||
|
|
||||||
# trigger before invoke callbacks
|
# trigger before invoke callbacks
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user