import collections import json import logging import os import queue import threading import time from datetime import timedelta from typing import TYPE_CHECKING, Any, Optional, Union from uuid import UUID, uuid4 from cachetools import LRUCache from flask import current_app from sqlalchemy import select from sqlalchemy.orm import Session, sessionmaker from core.helper.encrypter import batch_decrypt_token, encrypt_token, obfuscated_token from core.ops.entities.config_entity import ( OPS_FILE_PATH, TracingProviderEnum, ) from core.ops.entities.trace_entity import ( DatasetRetrievalTraceInfo, DraftNodeExecutionTrace, GenerateNameTraceInfo, MessageTraceInfo, ModerationTraceInfo, SuggestedQuestionTraceInfo, TaskData, ToolTraceInfo, TraceTaskName, WorkflowNodeTraceInfo, WorkflowTraceInfo, ) from core.ops.utils import get_message_data from extensions.ext_database import db from extensions.ext_storage import storage from models.account import Tenant from models.dataset import Dataset from models.model import App, AppModelConfig, Conversation, Message, MessageFile, TraceAppConfig from models.tools import ApiToolProvider, BuiltinToolProvider, MCPToolProvider, WorkflowToolProvider from models.workflow import WorkflowAppLog from repositories.factory import DifyAPIRepositoryFactory from tasks.ops_trace_task import process_trace_tasks if TYPE_CHECKING: from core.workflow.entities import WorkflowExecution logger = logging.getLogger(__name__) def _lookup_app_and_workspace_names(app_id: str | None, tenant_id: str | None) -> tuple[str, str]: """Return (app_name, workspace_name) for the given IDs. 
Falls back to empty strings.""" app_name = "" workspace_name = "" if not app_id and not tenant_id: return app_name, workspace_name with Session(db.engine) as session: if app_id: name = session.scalar(select(App.name).where(App.id == app_id)) if name: app_name = name if tenant_id: name = session.scalar(select(Tenant.name).where(Tenant.id == tenant_id)) if name: workspace_name = name return app_name, workspace_name _PROVIDER_TYPE_TO_MODEL: dict[str, type] = { "builtin": BuiltinToolProvider, "plugin": BuiltinToolProvider, "api": ApiToolProvider, "workflow": WorkflowToolProvider, "mcp": MCPToolProvider, } def _lookup_credential_name(credential_id: str | None, provider_type: str | None) -> str: if not credential_id: return "" model_cls = _PROVIDER_TYPE_TO_MODEL.get(provider_type or "") if not model_cls: return "" with Session(db.engine) as session: name = session.scalar(select(model_cls.name).where(model_cls.id == credential_id)) return str(name) if name else "" class OpsTraceProviderConfigMap(collections.UserDict[str, dict[str, Any]]): def __getitem__(self, provider: str) -> dict[str, Any]: match provider: case TracingProviderEnum.LANGFUSE: from core.ops.entities.config_entity import LangfuseConfig from core.ops.langfuse_trace.langfuse_trace import LangFuseDataTrace return { "config_class": LangfuseConfig, "secret_keys": ["public_key", "secret_key"], "other_keys": ["host", "project_key"], "trace_instance": LangFuseDataTrace, } case TracingProviderEnum.LANGSMITH: from core.ops.entities.config_entity import LangSmithConfig from core.ops.langsmith_trace.langsmith_trace import LangSmithDataTrace return { "config_class": LangSmithConfig, "secret_keys": ["api_key"], "other_keys": ["project", "endpoint"], "trace_instance": LangSmithDataTrace, } case TracingProviderEnum.OPIK: from core.ops.entities.config_entity import OpikConfig from core.ops.opik_trace.opik_trace import OpikDataTrace return { "config_class": OpikConfig, "secret_keys": ["api_key"], "other_keys": ["project", 
"url", "workspace"], "trace_instance": OpikDataTrace, } case TracingProviderEnum.WEAVE: from core.ops.entities.config_entity import WeaveConfig from core.ops.weave_trace.weave_trace import WeaveDataTrace return { "config_class": WeaveConfig, "secret_keys": ["api_key"], "other_keys": ["project", "entity", "endpoint", "host"], "trace_instance": WeaveDataTrace, } case TracingProviderEnum.ARIZE: from core.ops.arize_phoenix_trace.arize_phoenix_trace import ArizePhoenixDataTrace from core.ops.entities.config_entity import ArizeConfig return { "config_class": ArizeConfig, "secret_keys": ["api_key", "space_id"], "other_keys": ["project", "endpoint"], "trace_instance": ArizePhoenixDataTrace, } case TracingProviderEnum.PHOENIX: from core.ops.arize_phoenix_trace.arize_phoenix_trace import ArizePhoenixDataTrace from core.ops.entities.config_entity import PhoenixConfig return { "config_class": PhoenixConfig, "secret_keys": ["api_key"], "other_keys": ["project", "endpoint"], "trace_instance": ArizePhoenixDataTrace, } case TracingProviderEnum.ALIYUN: from core.ops.aliyun_trace.aliyun_trace import AliyunDataTrace from core.ops.entities.config_entity import AliyunConfig return { "config_class": AliyunConfig, "secret_keys": ["license_key"], "other_keys": ["endpoint", "app_name"], "trace_instance": AliyunDataTrace, } case TracingProviderEnum.MLFLOW: from core.ops.entities.config_entity import MLflowConfig from core.ops.mlflow_trace.mlflow_trace import MLflowDataTrace return { "config_class": MLflowConfig, "secret_keys": ["password"], "other_keys": ["tracking_uri", "experiment_id", "username"], "trace_instance": MLflowDataTrace, } case TracingProviderEnum.DATABRICKS: from core.ops.entities.config_entity import DatabricksConfig from core.ops.mlflow_trace.mlflow_trace import MLflowDataTrace return { "config_class": DatabricksConfig, "secret_keys": ["personal_access_token", "client_secret"], "other_keys": ["host", "client_id", "experiment_id"], "trace_instance": MLflowDataTrace, } case 
TracingProviderEnum.TENCENT: from core.ops.entities.config_entity import TencentConfig from core.ops.tencent_trace.tencent_trace import TencentDataTrace return { "config_class": TencentConfig, "secret_keys": ["token"], "other_keys": ["endpoint", "service_name"], "trace_instance": TencentDataTrace, } case _: raise KeyError(f"Unsupported tracing provider: {provider}") provider_config_map = OpsTraceProviderConfigMap() class OpsTraceManager: ops_trace_instances_cache: LRUCache = LRUCache(maxsize=128) decrypted_configs_cache: LRUCache = LRUCache(maxsize=128) _decryption_cache_lock = threading.RLock() @classmethod def encrypt_tracing_config( cls, tenant_id: str, tracing_provider: str, tracing_config: dict, current_trace_config=None ): """ Encrypt tracing config. :param tenant_id: tenant id :param tracing_provider: tracing provider :param tracing_config: tracing config dictionary to be encrypted :param current_trace_config: current tracing configuration for keeping existing values :return: encrypted tracing configuration """ # Get the configuration class and the keys that require encryption config_class, secret_keys, other_keys = ( provider_config_map[tracing_provider]["config_class"], provider_config_map[tracing_provider]["secret_keys"], provider_config_map[tracing_provider]["other_keys"], ) new_config: dict[str, Any] = {} # Encrypt necessary keys for key in secret_keys: if key in tracing_config: if "*" in tracing_config[key]: # If the key contains '*', retain the original value from the current config if current_trace_config: new_config[key] = current_trace_config.get(key, tracing_config[key]) else: new_config[key] = tracing_config[key] else: # Otherwise, encrypt the key new_config[key] = encrypt_token(tenant_id, tracing_config[key]) for key in other_keys: new_config[key] = tracing_config.get(key, "") # Create a new instance of the config class with the new configuration encrypted_config = config_class(**new_config) return encrypted_config.model_dump() @classmethod def 
decrypt_tracing_config(cls, tenant_id: str, tracing_provider: str, tracing_config: dict): """ Decrypt tracing config :param tenant_id: tenant id :param tracing_provider: tracing provider :param tracing_config: tracing config :return: """ config_json = json.dumps(tracing_config, sort_keys=True) decrypted_config_key = ( tenant_id, tracing_provider, config_json, ) # First check without lock for performance cached_config = cls.decrypted_configs_cache.get(decrypted_config_key) if cached_config is not None: return dict(cached_config) with cls._decryption_cache_lock: # Second check (double-checked locking) to prevent race conditions cached_config = cls.decrypted_configs_cache.get(decrypted_config_key) if cached_config is not None: return dict(cached_config) config_class, secret_keys, other_keys = ( provider_config_map[tracing_provider]["config_class"], provider_config_map[tracing_provider]["secret_keys"], provider_config_map[tracing_provider]["other_keys"], ) new_config: dict[str, Any] = {} keys_to_decrypt = [key for key in secret_keys if key in tracing_config] if keys_to_decrypt: decrypted_values = batch_decrypt_token(tenant_id, [tracing_config[key] for key in keys_to_decrypt]) new_config.update(zip(keys_to_decrypt, decrypted_values)) for key in other_keys: new_config[key] = tracing_config.get(key, "") decrypted_config = config_class(**new_config).model_dump() cls.decrypted_configs_cache[decrypted_config_key] = decrypted_config return dict(decrypted_config) @classmethod def obfuscated_decrypt_token(cls, tracing_provider: str, decrypt_tracing_config: dict): """ Decrypt tracing config :param tracing_provider: tracing provider :param decrypt_tracing_config: tracing config :return: """ config_class, secret_keys, other_keys = ( provider_config_map[tracing_provider]["config_class"], provider_config_map[tracing_provider]["secret_keys"], provider_config_map[tracing_provider]["other_keys"], ) new_config: dict[str, Any] = {} for key in secret_keys: if key in 
decrypt_tracing_config: new_config[key] = obfuscated_token(decrypt_tracing_config[key]) for key in other_keys: new_config[key] = decrypt_tracing_config.get(key, "") return config_class(**new_config).model_dump() @classmethod def get_decrypted_tracing_config(cls, app_id: str, tracing_provider: str): """ Get decrypted tracing config :param app_id: app id :param tracing_provider: tracing provider :return: """ trace_config_data: TraceAppConfig | None = ( db.session.query(TraceAppConfig) .where(TraceAppConfig.app_id == app_id, TraceAppConfig.tracing_provider == tracing_provider) .first() ) if not trace_config_data: return None # decrypt_token stmt = select(App).where(App.id == app_id) app = db.session.scalar(stmt) if not app: raise ValueError("App not found") tenant_id = app.tenant_id if trace_config_data.tracing_config is None: raise ValueError("Tracing config cannot be None.") decrypt_tracing_config = cls.decrypt_tracing_config( tenant_id, tracing_provider, trace_config_data.tracing_config ) return decrypt_tracing_config @classmethod def get_ops_trace_instance( cls, app_id: Union[UUID, str] | None = None, ): """ Get ops trace through model config :param app_id: app_id :return: """ if isinstance(app_id, UUID): app_id = str(app_id) if app_id is None: return None app: App | None = db.session.query(App).where(App.id == app_id).first() if app is None: return None app_ops_trace_config = json.loads(app.tracing) if app.tracing else None if app_ops_trace_config is None: return None if not app_ops_trace_config.get("enabled"): return None tracing_provider = app_ops_trace_config.get("tracing_provider") if tracing_provider is None: return None try: provider_config_map[tracing_provider] except KeyError: return None # decrypt_token decrypt_trace_config = cls.get_decrypted_tracing_config(app_id, tracing_provider) if not decrypt_trace_config: return None trace_instance, config_class = ( provider_config_map[tracing_provider]["trace_instance"], 
provider_config_map[tracing_provider]["config_class"], ) decrypt_trace_config_key = json.dumps(decrypt_trace_config, sort_keys=True) tracing_instance = cls.ops_trace_instances_cache.get(decrypt_trace_config_key) if tracing_instance is None: # create new tracing_instance and update the cache if it absent tracing_instance = trace_instance(config_class(**decrypt_trace_config)) cls.ops_trace_instances_cache[decrypt_trace_config_key] = tracing_instance logger.info("new tracing_instance for app_id: %s", app_id) return tracing_instance @classmethod def get_app_config_through_message_id(cls, message_id: str): app_model_config = None message_stmt = select(Message).where(Message.id == message_id) message_data = db.session.scalar(message_stmt) if not message_data: return None conversation_id = message_data.conversation_id conversation_stmt = select(Conversation).where(Conversation.id == conversation_id) conversation_data = db.session.scalar(conversation_stmt) if not conversation_data: return None if conversation_data.app_model_config_id: config_stmt = select(AppModelConfig).where(AppModelConfig.id == conversation_data.app_model_config_id) app_model_config = db.session.scalar(config_stmt) elif conversation_data.app_model_config_id is None and conversation_data.override_model_configs: app_model_config = conversation_data.override_model_configs return app_model_config @classmethod def update_app_tracing_config(cls, app_id: str, enabled: bool, tracing_provider: str | None): """ Update app tracing config :param app_id: app id :param enabled: enabled :param tracing_provider: tracing provider (None when disabling) :return: """ # auth check if tracing_provider is not None: try: provider_config_map[tracing_provider] except KeyError: raise ValueError(f"Invalid tracing provider: {tracing_provider}") app_config: App | None = db.session.query(App).where(App.id == app_id).first() if not app_config: raise ValueError("App not found") app_config.tracing = json.dumps( { "enabled": enabled, 
"tracing_provider": tracing_provider, } ) db.session.commit() @classmethod def get_app_tracing_config(cls, app_id: str): """ Get app tracing config :param app_id: app id :return: """ app: App | None = db.session.query(App).where(App.id == app_id).first() if not app: raise ValueError("App not found") if not app.tracing: return {"enabled": False, "tracing_provider": None} app_trace_config = json.loads(app.tracing) return app_trace_config @staticmethod def check_trace_config_is_effective(tracing_config: dict, tracing_provider: str): """ Check trace config is effective :param tracing_config: tracing config :param tracing_provider: tracing provider :return: """ config_type, trace_instance = ( provider_config_map[tracing_provider]["config_class"], provider_config_map[tracing_provider]["trace_instance"], ) tracing_config = config_type(**tracing_config) return trace_instance(tracing_config).api_check() @staticmethod def get_trace_config_project_key(tracing_config: dict, tracing_provider: str): """ get trace config is project key :param tracing_config: tracing config :param tracing_provider: tracing provider :return: """ config_type, trace_instance = ( provider_config_map[tracing_provider]["config_class"], provider_config_map[tracing_provider]["trace_instance"], ) tracing_config = config_type(**tracing_config) return trace_instance(tracing_config).get_project_key() @staticmethod def get_trace_config_project_url(tracing_config: dict, tracing_provider: str): """ get trace config is project key :param tracing_config: tracing config :param tracing_provider: tracing provider :return: """ config_type, trace_instance = ( provider_config_map[tracing_provider]["config_class"], provider_config_map[tracing_provider]["trace_instance"], ) tracing_config = config_type(**tracing_config) return trace_instance(tracing_config).get_project_url() class TraceTask: _workflow_run_repo = None _repo_lock = threading.Lock() @classmethod def _get_workflow_run_repo(cls): if cls._workflow_run_repo is 
None: with cls._repo_lock: if cls._workflow_run_repo is None: session_maker = sessionmaker(bind=db.engine, expire_on_commit=False) cls._workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker) return cls._workflow_run_repo def __init__( self, trace_type: Any, message_id: str | None = None, workflow_execution: Optional["WorkflowExecution"] = None, conversation_id: str | None = None, user_id: str | None = None, timer: Any | None = None, **kwargs, ): self.trace_type = trace_type self.message_id = message_id self.workflow_run_id = workflow_execution.id_ if workflow_execution else None self.conversation_id = conversation_id self.user_id = user_id self.timer = timer self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001") self.app_id = None self.trace_id = None self.kwargs = kwargs external_trace_id = kwargs.get("external_trace_id") if external_trace_id: self.trace_id = external_trace_id def execute(self): return self.preprocess() def preprocess(self): preprocess_map = { TraceTaskName.CONVERSATION_TRACE: lambda: self.conversation_trace(**self.kwargs), TraceTaskName.WORKFLOW_TRACE: lambda: self.workflow_trace( workflow_run_id=self.workflow_run_id, conversation_id=self.conversation_id, user_id=self.user_id ), TraceTaskName.MESSAGE_TRACE: lambda: self.message_trace(message_id=self.message_id), TraceTaskName.MODERATION_TRACE: lambda: self.moderation_trace( message_id=self.message_id, timer=self.timer, **self.kwargs ), TraceTaskName.SUGGESTED_QUESTION_TRACE: lambda: self.suggested_question_trace( message_id=self.message_id, timer=self.timer, **self.kwargs ), TraceTaskName.DATASET_RETRIEVAL_TRACE: lambda: self.dataset_retrieval_trace( message_id=self.message_id, timer=self.timer, **self.kwargs ), TraceTaskName.TOOL_TRACE: lambda: self.tool_trace( message_id=self.message_id, timer=self.timer, **self.kwargs ), TraceTaskName.GENERATE_NAME_TRACE: lambda: self.generate_name_trace( conversation_id=self.conversation_id, 
timer=self.timer, **self.kwargs ), TraceTaskName.NODE_EXECUTION_TRACE: lambda: self.node_execution_trace(**self.kwargs), TraceTaskName.DRAFT_NODE_EXECUTION_TRACE: lambda: self.draft_node_execution_trace(**self.kwargs), } return preprocess_map.get(self.trace_type, lambda: None)() # process methods for different trace types def conversation_trace(self, **kwargs): return kwargs def workflow_trace( self, *, workflow_run_id: str | None, conversation_id: str | None, user_id: str | None, ): if not workflow_run_id: return {} workflow_run_repo = self._get_workflow_run_repo() workflow_run = workflow_run_repo.get_workflow_run_by_id_without_tenant(run_id=workflow_run_id) if not workflow_run: raise ValueError("Workflow run not found") workflow_id = workflow_run.workflow_id tenant_id = workflow_run.tenant_id workflow_run_id = workflow_run.id workflow_run_elapsed_time = workflow_run.elapsed_time workflow_run_status = workflow_run.status workflow_run_inputs = workflow_run.inputs_dict workflow_run_outputs = workflow_run.outputs_dict workflow_run_version = workflow_run.version error = workflow_run.error or "" total_tokens = workflow_run.total_tokens file_list = workflow_run_inputs.get("sys.file") or [] query = workflow_run_inputs.get("query") or workflow_run_inputs.get("sys.query") or "" with Session(db.engine) as session: # get workflow_app_log_id workflow_app_log_data_stmt = select(WorkflowAppLog.id).where( WorkflowAppLog.tenant_id == tenant_id, WorkflowAppLog.app_id == workflow_run.app_id, WorkflowAppLog.workflow_run_id == workflow_run.id, ) workflow_app_log_id = session.scalar(workflow_app_log_data_stmt) # get message_id message_id = None if conversation_id: message_data_stmt = select(Message.id).where( Message.conversation_id == conversation_id, Message.workflow_run_id == workflow_run_id, ) message_id = session.scalar(message_data_stmt) app_name, workspace_name = _lookup_app_and_workspace_names(workflow_run.app_id, tenant_id) metadata: dict[str, Any] = { "workflow_id": 
workflow_id, "conversation_id": conversation_id, "workflow_run_id": workflow_run_id, "tenant_id": tenant_id, "elapsed_time": workflow_run_elapsed_time, "status": workflow_run_status, "version": workflow_run_version, "total_tokens": total_tokens, "file_list": file_list, "triggered_from": workflow_run.triggered_from, "user_id": user_id, "app_id": workflow_run.app_id, "app_name": app_name, "workspace_name": workspace_name, } parent_trace_context = self.kwargs.get("parent_trace_context") if parent_trace_context: metadata["parent_trace_context"] = parent_trace_context workflow_trace_info = WorkflowTraceInfo( trace_id=self.trace_id, workflow_data=workflow_run.to_dict(), conversation_id=conversation_id, workflow_id=workflow_id, tenant_id=tenant_id, workflow_run_id=workflow_run_id, workflow_run_elapsed_time=workflow_run_elapsed_time, workflow_run_status=workflow_run_status, workflow_run_inputs=workflow_run_inputs, workflow_run_outputs=workflow_run_outputs, workflow_run_version=workflow_run_version, error=error, total_tokens=total_tokens, file_list=file_list, query=query, metadata=metadata, workflow_app_log_id=workflow_app_log_id, message_id=message_id, start_time=workflow_run.created_at, end_time=workflow_run.finished_at, ) return workflow_trace_info def message_trace(self, message_id: str | None): if not message_id: return {} message_data = get_message_data(message_id) if not message_data: return {} conversation_mode_stmt = select(Conversation.mode).where(Conversation.id == message_data.conversation_id) conversation_mode = db.session.scalars(conversation_mode_stmt).all() if not conversation_mode or len(conversation_mode) == 0: return {} conversation_mode = conversation_mode[0] created_at = message_data.created_at inputs = message_data.message # get message file data message_file_data = db.session.query(MessageFile).filter_by(message_id=message_id).first() file_list = [] if message_file_data and message_file_data.url is not None: file_url = 
f"{self.file_base_url}/{message_file_data.url}" if message_file_data else "" file_list.append(file_url) streaming_metrics = self._extract_streaming_metrics(message_data) tenant_id = "" with Session(db.engine) as session: tid = session.scalar(select(App.tenant_id).where(App.id == message_data.app_id)) if tid: tenant_id = str(tid) app_name, workspace_name = _lookup_app_and_workspace_names(message_data.app_id, tenant_id) metadata = { "conversation_id": message_data.conversation_id, "ls_provider": message_data.model_provider, "ls_model_name": message_data.model_id, "status": message_data.status, "from_end_user_id": message_data.from_end_user_id, "from_account_id": message_data.from_account_id, "agent_based": message_data.agent_based, "workflow_run_id": message_data.workflow_run_id, "from_source": message_data.from_source, "message_id": message_id, "tenant_id": tenant_id, "app_id": message_data.app_id, "user_id": message_data.from_end_user_id or message_data.from_account_id, "app_name": app_name, "workspace_name": workspace_name, } message_tokens = message_data.message_tokens message_trace_info = MessageTraceInfo( trace_id=self.trace_id, message_id=message_id, message_data=message_data.to_dict(), conversation_model=conversation_mode, message_tokens=message_tokens, answer_tokens=message_data.answer_tokens, total_tokens=message_tokens + message_data.answer_tokens, error=message_data.error or "", inputs=inputs, outputs=message_data.answer, file_list=file_list, start_time=created_at, end_time=created_at + timedelta(seconds=message_data.provider_response_latency), metadata=metadata, message_file_data=message_file_data, conversation_mode=conversation_mode, gen_ai_server_time_to_first_token=streaming_metrics.get("gen_ai_server_time_to_first_token"), llm_streaming_time_to_generate=streaming_metrics.get("llm_streaming_time_to_generate"), is_streaming_request=streaming_metrics.get("is_streaming_request", False), ) return message_trace_info def moderation_trace(self, message_id, 
timer, **kwargs): moderation_result = kwargs.get("moderation_result") if not moderation_result: return {} inputs = kwargs.get("inputs") message_data = get_message_data(message_id) if not message_data: return {} metadata = { "message_id": message_id, "action": moderation_result.action, "preset_response": moderation_result.preset_response, "query": moderation_result.query, } # get workflow_app_log_id workflow_app_log_id = None if message_data.workflow_run_id: workflow_app_log_data = ( db.session.query(WorkflowAppLog).filter_by(workflow_run_id=message_data.workflow_run_id).first() ) workflow_app_log_id = str(workflow_app_log_data.id) if workflow_app_log_data else None moderation_trace_info = ModerationTraceInfo( trace_id=self.trace_id, message_id=workflow_app_log_id or message_id, inputs=inputs, message_data=message_data.to_dict(), flagged=moderation_result.flagged, action=moderation_result.action, preset_response=moderation_result.preset_response, query=moderation_result.query, start_time=timer.get("start"), end_time=timer.get("end"), metadata=metadata, ) return moderation_trace_info def suggested_question_trace(self, message_id, timer, **kwargs): suggested_question = kwargs.get("suggested_question", []) message_data = get_message_data(message_id) if not message_data: return {} metadata = { "message_id": message_id, "ls_provider": message_data.model_provider, "ls_model_name": message_data.model_id, "status": message_data.status, "from_end_user_id": message_data.from_end_user_id, "from_account_id": message_data.from_account_id, "agent_based": message_data.agent_based, "workflow_run_id": message_data.workflow_run_id, "from_source": message_data.from_source, } # get workflow_app_log_id workflow_app_log_id = None if message_data.workflow_run_id: workflow_app_log_data = ( db.session.query(WorkflowAppLog).filter_by(workflow_run_id=message_data.workflow_run_id).first() ) workflow_app_log_id = str(workflow_app_log_data.id) if workflow_app_log_data else None 
suggested_question_trace_info = SuggestedQuestionTraceInfo( trace_id=self.trace_id, message_id=workflow_app_log_id or message_id, message_data=message_data.to_dict(), inputs=message_data.message, outputs=message_data.answer, start_time=timer.get("start"), end_time=timer.get("end"), metadata=metadata, total_tokens=message_data.message_tokens + message_data.answer_tokens, status=message_data.status, error=message_data.error, from_account_id=message_data.from_account_id, agent_based=message_data.agent_based, from_source=message_data.from_source, model_provider=message_data.model_provider, model_id=message_data.model_id, suggested_question=suggested_question, level=message_data.status, status_message=message_data.error, ) return suggested_question_trace_info def dataset_retrieval_trace(self, message_id, timer, **kwargs): documents = kwargs.get("documents") message_data = get_message_data(message_id) if not message_data: return {} tenant_id = "" with Session(db.engine) as session: tid = session.scalar(select(App.tenant_id).where(App.id == message_data.app_id)) if tid: tenant_id = str(tid) app_name, workspace_name = _lookup_app_and_workspace_names(message_data.app_id, tenant_id) doc_list = [doc.model_dump() for doc in documents] if documents else [] dataset_ids: set[str] = set() for doc in doc_list: doc_meta = doc.get("metadata") or {} did = doc_meta.get("dataset_id") if did: dataset_ids.add(did) embedding_models: dict[str, dict[str, str]] = {} if dataset_ids: with Session(db.engine) as session: rows = session.execute( select(Dataset.id, Dataset.embedding_model, Dataset.embedding_model_provider).where( Dataset.id.in_(list(dataset_ids)) ) ).all() for row in rows: embedding_models[str(row[0])] = { "embedding_model": row[1] or "", "embedding_model_provider": row[2] or "", } metadata = { "message_id": message_id, "ls_provider": message_data.model_provider, "ls_model_name": message_data.model_id, "status": message_data.status, "from_end_user_id": 
message_data.from_end_user_id, "from_account_id": message_data.from_account_id, "agent_based": message_data.agent_based, "workflow_run_id": message_data.workflow_run_id, "from_source": message_data.from_source, "tenant_id": tenant_id, "app_id": message_data.app_id, "user_id": message_data.from_end_user_id or message_data.from_account_id, "app_name": app_name, "workspace_name": workspace_name, "embedding_models": embedding_models, } dataset_retrieval_trace_info = DatasetRetrievalTraceInfo( trace_id=self.trace_id, message_id=message_id, inputs=message_data.query or message_data.inputs, documents=doc_list, start_time=timer.get("start"), end_time=timer.get("end"), metadata=metadata, message_data=message_data.to_dict(), error=kwargs.get("error"), ) return dataset_retrieval_trace_info def tool_trace(self, message_id, timer, **kwargs): tool_name = kwargs.get("tool_name", "") tool_inputs = kwargs.get("tool_inputs", {}) tool_outputs = kwargs.get("tool_outputs", {}) message_data = get_message_data(message_id) if not message_data: return {} tool_config = {} time_cost = 0 error = None tool_parameters = {} created_time = message_data.created_at end_time = message_data.updated_at agent_thoughts = message_data.agent_thoughts for agent_thought in agent_thoughts: if tool_name in agent_thought.tools: created_time = agent_thought.created_at tool_meta_data = agent_thought.tool_meta.get(tool_name, {}) tool_config = tool_meta_data.get("tool_config", {}) time_cost = tool_meta_data.get("time_cost", 0) end_time = created_time + timedelta(seconds=time_cost) error = tool_meta_data.get("error", "") tool_parameters = tool_meta_data.get("tool_parameters", {}) metadata = { "message_id": message_id, "tool_name": tool_name, "tool_inputs": tool_inputs, "tool_outputs": tool_outputs, "tool_config": tool_config, "time_cost": time_cost, "error": error, "tool_parameters": tool_parameters, } file_url = "" message_file_data = db.session.query(MessageFile).filter_by(message_id=message_id).first() if 
message_file_data: message_file_id = message_file_data.id if message_file_data else None type = message_file_data.type created_by_role = message_file_data.created_by_role created_user_id = message_file_data.created_by file_url = f"{self.file_base_url}/{message_file_data.url}" metadata.update( { "message_file_id": message_file_id, "created_by_role": created_by_role, "created_user_id": created_user_id, "type": type, } ) tool_trace_info = ToolTraceInfo( trace_id=self.trace_id, message_id=message_id, message_data=message_data.to_dict(), tool_name=tool_name, start_time=timer.get("start") if timer else created_time, end_time=timer.get("end") if timer else end_time, tool_inputs=tool_inputs, tool_outputs=tool_outputs, metadata=metadata, message_file_data=message_file_data, error=error, inputs=message_data.message, outputs=message_data.answer, tool_config=tool_config, time_cost=time_cost, tool_parameters=tool_parameters, file_url=file_url, ) return tool_trace_info def generate_name_trace(self, conversation_id, timer, **kwargs): generate_conversation_name = kwargs.get("generate_conversation_name") inputs = kwargs.get("inputs") tenant_id = kwargs.get("tenant_id") if not tenant_id: return {} start_time = timer.get("start") end_time = timer.get("end") metadata = { "conversation_id": conversation_id, "tenant_id": tenant_id, } generate_name_trace_info = GenerateNameTraceInfo( trace_id=self.trace_id, conversation_id=conversation_id, inputs=inputs, outputs=generate_conversation_name, start_time=start_time, end_time=end_time, metadata=metadata, tenant_id=tenant_id, ) return generate_name_trace_info def node_execution_trace(self, **kwargs) -> WorkflowNodeTraceInfo | dict: node_data: dict = kwargs.get("node_execution_data", {}) if not node_data: return {} app_name, workspace_name = _lookup_app_and_workspace_names(node_data.get("app_id"), node_data.get("tenant_id")) credential_name = _lookup_credential_name( node_data.get("credential_id"), node_data.get("credential_provider_type") ) 
metadata: dict[str, Any] = { "tenant_id": node_data.get("tenant_id"), "app_id": node_data.get("app_id"), "app_name": app_name, "workspace_name": workspace_name, "user_id": node_data.get("user_id"), "dataset_ids": node_data.get("dataset_ids"), "dataset_names": node_data.get("dataset_names"), "plugin_name": node_data.get("plugin_name"), "credential_name": credential_name, } parent_trace_context = node_data.get("parent_trace_context") if parent_trace_context: metadata["parent_trace_context"] = parent_trace_context message_id: str | None = None conversation_id = node_data.get("conversation_id") workflow_execution_id = node_data.get("workflow_execution_id") if conversation_id and workflow_execution_id and not parent_trace_context: with Session(db.engine) as session: msg_id = session.scalar( select(Message.id).where( Message.conversation_id == conversation_id, Message.workflow_run_id == workflow_execution_id, ) ) if msg_id: message_id = str(msg_id) metadata["message_id"] = message_id return WorkflowNodeTraceInfo( trace_id=self.trace_id, message_id=message_id, start_time=node_data.get("created_at"), end_time=node_data.get("finished_at"), metadata=metadata, workflow_id=node_data.get("workflow_id", ""), workflow_run_id=node_data.get("workflow_execution_id", ""), tenant_id=node_data.get("tenant_id", ""), node_execution_id=node_data.get("node_execution_id", ""), node_id=node_data.get("node_id", ""), node_type=node_data.get("node_type", ""), title=node_data.get("title", ""), status=node_data.get("status", ""), error=node_data.get("error"), elapsed_time=node_data.get("elapsed_time", 0.0), index=node_data.get("index", 0), predecessor_node_id=node_data.get("predecessor_node_id"), total_tokens=node_data.get("total_tokens", 0), total_price=node_data.get("total_price", 0.0), currency=node_data.get("currency"), model_provider=node_data.get("model_provider"), model_name=node_data.get("model_name"), prompt_tokens=node_data.get("prompt_tokens"), 
def _extract_streaming_metrics(self, message_data) -> dict:
    """Read streaming timings from the JSON in ``message_data.message_metadata``.

    Returns ``{}`` when the metadata is empty, is not valid JSON, or does not
    have the expected mapping shape. Otherwise returns the
    ``time_to_first_token`` / ``time_to_generate`` values found under
    ``"usage"`` plus a flag that is true when a first-token time is present.
    """
    raw_metadata = message_data.message_metadata
    if not raw_metadata:
        return {}

    try:
        usage = json.loads(raw_metadata).get("usage", {})
        first_token_time = usage.get("time_to_first_token")
        generation_time = usage.get("time_to_generate")
    except (json.JSONDecodeError, AttributeError):
        # Malformed JSON, or a decoded value without ``.get`` (list, null, ...).
        return {}

    return {
        "gen_ai_server_time_to_first_token": first_token_time,
        "llm_streaming_time_to_generate": generation_time,
        "is_streaming_request": first_token_time is not None,
    }
# Process-wide state shared by every TraceQueueManager instance.
trace_manager_timer: threading.Timer | None = None
trace_manager_queue: queue.Queue = queue.Queue()
trace_manager_interval = int(os.getenv("TRACE_QUEUE_MANAGER_INTERVAL", 5))
trace_manager_batch_size = int(os.getenv("TRACE_QUEUE_MANAGER_BATCH_SIZE", 100))


class TraceQueueManager:
    """Queue trace tasks and hand them to ``process_trace_tasks`` in batches.

    Tasks are put on the module-level ``trace_manager_queue``. A one-shot
    ``threading.Timer`` (``trace_manager_timer``) fires after
    ``trace_manager_interval`` seconds and calls :meth:`run`, which drains up
    to ``trace_manager_batch_size`` tasks and dispatches them.
    """

    def __init__(self, app_id=None, user_id=None):
        self.app_id = app_id
        self.user_id = user_id
        self.trace_instance = OpsTraceManager.get_ops_trace_instance(app_id)
        # Keep the concrete app object so the timer thread can open an app context.
        self.flask_app = current_app._get_current_object()  # type: ignore

        from enterprise.telemetry.exporter import is_enterprise_telemetry_enabled

        self._enterprise_telemetry_enabled = is_enterprise_telemetry_enabled()
        if trace_manager_timer is None:
            self.start_timer()

    def add_trace_task(self, trace_task: TraceTask):
        """Queue ``trace_task`` when tracing is active and make sure a timer is pending.

        The task is queued only if enterprise telemetry is enabled or a trace
        instance exists for the app. Errors are logged, never raised.
        """
        try:
            if self._enterprise_telemetry_enabled or self.trace_instance:
                trace_task.app_id = self.app_id
                trace_manager_queue.put(trace_task)
        except Exception:
            logger.exception("Error adding trace task, trace_type %s", trace_task.trace_type)
        finally:
            self.start_timer()

    def collect_tasks(self):
        """Remove and return up to ``trace_manager_batch_size`` queued tasks."""
        tasks: list[TraceTask] = []
        while len(tasks) < trace_manager_batch_size:
            # ``empty()`` followed by ``get_nowait()`` is not atomic: another
            # thread may drain the shared queue in between. Treat Empty as the
            # end of the batch so tasks already dequeued here are not lost.
            try:
                task = trace_manager_queue.get_nowait()
            except queue.Empty:
                break
            tasks.append(task)
            trace_manager_queue.task_done()
        return tasks

    def run(self):
        """Timer callback: drain one batch and dispatch it; errors are logged."""
        try:
            tasks = self.collect_tasks()
            if tasks:
                self.send_to_celery(tasks)
        except Exception:
            logger.exception("Error processing trace tasks")

    def start_timer(self):
        """Start a new one-shot timer unless one is already alive."""
        global trace_manager_timer
        if trace_manager_timer is None or not trace_manager_timer.is_alive():
            trace_manager_timer = threading.Timer(trace_manager_interval, self.run)
            trace_manager_timer.name = f"trace_manager_timer_{time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())}"
            trace_manager_timer.daemon = False
            trace_manager_timer.start()

    def send_to_celery(self, tasks: list[TraceTask]):
        """Persist each task's trace data to storage and enqueue its processing job.

        Tasks without an ``app_id`` are skipped. A failure while handling one
        task is logged and does not stop the remaining tasks, which have
        already been removed from the queue and would otherwise be dropped.
        """
        with self.flask_app.app_context():
            for task in tasks:
                if task.app_id is None:
                    continue
                try:
                    self._dispatch_task(task)
                except Exception:
                    logger.exception("Error dispatching trace task, trace_type %s", task.trace_type)

    def _dispatch_task(self, task: TraceTask):
        """Execute one task, store its payload as JSON and schedule ``process_trace_tasks``."""
        file_id = uuid4().hex
        trace_info = task.execute()
        task_data = TaskData(
            app_id=task.app_id,
            trace_info_type=type(trace_info).__name__,
            trace_info=trace_info.model_dump() if trace_info else None,
        )
        file_path = f"{OPS_FILE_PATH}{task.app_id}/{file_id}.json"
        storage.save(file_path, task_data.model_dump_json().encode("utf-8"))
        file_info = {
            "file_id": file_id,
            "app_id": task.app_id,
        }
        process_trace_tasks.delay(file_info)  # type: ignore