From 915b4ce840b3ae36669b3fab6d87863ad3d09a6b Mon Sep 17 00:00:00 2001 From: GareArc Date: Thu, 29 Jan 2026 17:06:29 -0800 Subject: [PATCH 1/2] feat: Add parent trace context propagation for workflow-as-tool hierarchy Enables distributed tracing for nested workflows across all trace providers (Langfuse, LangSmith, community providers). When a workflow invokes another workflow via workflow-as-tool, the child workflow now includes parent context attributes that allow trace systems to reconstruct the full execution tree. Changes: - Add parent_trace_context field to WorkflowTool - Set parent context in tool node when invoking workflow-as-tool - Extract and pass parent context through app generator This is a community enhancement (ungated) that improves distributed tracing for all users. Parent context includes: trace_id, node_execution_id, workflow_run_id, and app_id. --- api/core/app/apps/workflow/app_generator.py | 5 ++++- api/core/tools/workflow_as_tool/tool.py | 7 ++++++- api/core/workflow/nodes/tool/tool_node.py | 11 +++++++++++ 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/api/core/app/apps/workflow/app_generator.py b/api/core/app/apps/workflow/app_generator.py index 0165c74295..f21f40bbc5 100644 --- a/api/core/app/apps/workflow/app_generator.py +++ b/api/core/app/apps/workflow/app_generator.py @@ -141,9 +141,12 @@ class WorkflowAppGenerator(BaseAppGenerator): inputs: Mapping[str, Any] = args["inputs"] - extras = { + extras: dict[str, Any] = { **extract_external_trace_id_from_args(args), } + parent_trace_context = args.get("_parent_trace_context") + if parent_trace_context: + extras["parent_trace_context"] = parent_trace_context workflow_run_id = str(uuid.uuid4()) # FIXME (Yeuoly): we need to remove the SKIP_PREPARE_USER_INPUTS_KEY from the args # trigger shouldn't prepare user inputs diff --git a/api/core/tools/workflow_as_tool/tool.py b/api/core/tools/workflow_as_tool/tool.py index 33b15b3438..2a8cf0c280 100644 --- 
a/api/core/tools/workflow_as_tool/tool.py +++ b/api/core/tools/workflow_as_tool/tool.py @@ -51,6 +51,7 @@ class WorkflowTool(Tool): self.workflow_call_depth = workflow_call_depth self.label = label self._latest_usage = LLMUsage.empty_usage() + self.parent_trace_context: dict[str, str] | None = None super().__init__(entity=entity, runtime=runtime) @@ -91,11 +92,15 @@ class WorkflowTool(Tool): self._latest_usage = LLMUsage.empty_usage() + args: dict[str, Any] = {"inputs": tool_parameters, "files": files} + if self.parent_trace_context: + args["_parent_trace_context"] = self.parent_trace_context + result = generator.generate( app_model=app, workflow=workflow, user=user, - args={"inputs": tool_parameters, "files": files}, + args=args, invoke_from=self.runtime.invoke_from, streaming=False, call_depth=self.workflow_call_depth + 1, diff --git a/api/core/workflow/nodes/tool/tool_node.py b/api/core/workflow/nodes/tool/tool_node.py index 2e7ec757b4..2a6b59437c 100644 --- a/api/core/workflow/nodes/tool/tool_node.py +++ b/api/core/workflow/nodes/tool/tool_node.py @@ -105,6 +105,17 @@ class ToolNode(Node[ToolNodeData]): # get conversation id conversation_id = self.graph_runtime_state.variable_pool.get(["sys", SystemVariableKey.CONVERSATION_ID]) + from core.tools.workflow_as_tool.tool import WorkflowTool + + if isinstance(tool_runtime, WorkflowTool): + workflow_run_id_var = self.graph_runtime_state.variable_pool.get(["sys", SystemVariableKey.WORKFLOW_RUN_ID]) + tool_runtime.parent_trace_context = { + "trace_id": str(workflow_run_id_var.text) if workflow_run_id_var else "", + "parent_node_execution_id": self.execution_id, + "parent_workflow_run_id": str(workflow_run_id_var.text) if workflow_run_id_var else "", + "parent_app_id": self.app_id, + } + try: message_stream = ToolEngine.generic_invoke( tool=tool_runtime, From 8fc0cbe20de0315f29c5118001836a46ff8fe674 Mon Sep 17 00:00:00 2001 From: GareArc Date: Mon, 2 Feb 2026 15:09:44 -0800 Subject: [PATCH 2/2] feat(enterprise): Add 
OTEL telemetry with slim traces, metrics, and structured logs - Add EnterpriseOtelTrace handler with span emission for workflows and nodes - Implement minimal-span strategy: slim spans + detailed companion logs - Add deterministic span/trace IDs for cross-workflow trace correlation - Add metric collection at 100% accuracy (counters & histograms) - Add event handlers for app lifecycle and feedback telemetry - Add cross-workflow trace linking with parent context propagation - Add OTEL exporter with configurable sampling and privacy controls - Wire enterprise telemetry into workflow execution pipeline - Add telemetry configuration in enterprise configs --- api/app_factory.py | 2 + api/configs/app_config.py | 4 +- api/configs/enterprise/__init__.py | 36 ++ .../advanced_chat/generate_task_pipeline.py | 29 +- api/core/logging/filters.py | 32 +- api/core/ops/entities/trace_entity.py | 48 ++ api/core/ops/langfuse_trace/langfuse_trace.py | 57 +- .../ops/langsmith_trace/langsmith_trace.py | 40 +- api/core/ops/ops_trace_manager.py | 196 +++++- api/core/tools/workflow_as_tool/tool.py | 1 - .../graph_engine/layers/persistence.py | 96 +++ api/core/workflow/nodes/tool/tool_node.py | 6 +- api/enterprise/__init__.py | 0 api/enterprise/telemetry/__init__.py | 0 api/enterprise/telemetry/draft_trace.py | 77 +++ api/enterprise/telemetry/enterprise_trace.py | 570 ++++++++++++++++++ api/enterprise/telemetry/entities/__init__.py | 30 + api/enterprise/telemetry/event_handlers.py | 146 +++++ api/enterprise/telemetry/exporter.py | 200 ++++++ api/enterprise/telemetry/id_generator.py | 76 +++ api/enterprise/telemetry/telemetry_log.py | 91 +++ api/events/app_event.py | 6 + api/events/feedback_event.py | 4 + api/extensions/ext_enterprise_telemetry.py | 48 ++ api/services/app_service.py | 4 +- .../enterprise/account_deletion_sync.py | 4 +- api/services/message_service.py | 4 + api/services/workflow_service.py | 23 +- api/tasks/ops_trace_task.py | 19 +- 29 files changed, 1804 insertions(+), 45 
deletions(-) create mode 100644 api/enterprise/__init__.py create mode 100644 api/enterprise/telemetry/__init__.py create mode 100644 api/enterprise/telemetry/draft_trace.py create mode 100644 api/enterprise/telemetry/enterprise_trace.py create mode 100644 api/enterprise/telemetry/entities/__init__.py create mode 100644 api/enterprise/telemetry/event_handlers.py create mode 100644 api/enterprise/telemetry/exporter.py create mode 100644 api/enterprise/telemetry/id_generator.py create mode 100644 api/enterprise/telemetry/telemetry_log.py create mode 100644 api/events/feedback_event.py create mode 100644 api/extensions/ext_enterprise_telemetry.py diff --git a/api/app_factory.py b/api/app_factory.py index f827842d68..62494ca67e 100644 --- a/api/app_factory.py +++ b/api/app_factory.py @@ -79,6 +79,7 @@ def initialize_extensions(app: DifyApp): ext_commands, ext_compress, ext_database, + ext_enterprise_telemetry, ext_forward_refs, ext_hosting_provider, ext_import_modules, @@ -125,6 +126,7 @@ def initialize_extensions(app: DifyApp): ext_blueprints, ext_commands, ext_otel, + ext_enterprise_telemetry, ext_request_logging, ext_session_factory, ] diff --git a/api/configs/app_config.py b/api/configs/app_config.py index d3b1cf9d5b..831f0a49e0 100644 --- a/api/configs/app_config.py +++ b/api/configs/app_config.py @@ -8,7 +8,7 @@ from pydantic_settings import BaseSettings, PydanticBaseSettingsSource, Settings from libs.file_utils import search_file_upwards from .deploy import DeploymentConfig -from .enterprise import EnterpriseFeatureConfig +from .enterprise import EnterpriseFeatureConfig, EnterpriseTelemetryConfig from .extra import ExtraServiceConfig from .feature import FeatureConfig from .middleware import MiddlewareConfig @@ -73,6 +73,8 @@ class DifyConfig( # Enterprise feature configs # **Before using, please contact business@dify.ai by email to inquire about licensing matters.** EnterpriseFeatureConfig, + # Enterprise telemetry configs + EnterpriseTelemetryConfig, ): 
model_config = SettingsConfigDict( # read from dotenv format config file diff --git a/api/configs/enterprise/__init__.py b/api/configs/enterprise/__init__.py index eda6345e14..fdd9fcfdf1 100644 --- a/api/configs/enterprise/__init__.py +++ b/api/configs/enterprise/__init__.py @@ -18,3 +18,39 @@ class EnterpriseFeatureConfig(BaseSettings): description="Allow customization of the enterprise logo.", default=False, ) + + +class EnterpriseTelemetryConfig(BaseSettings): + """ + Configuration for enterprise telemetry. + """ + + ENTERPRISE_TELEMETRY_ENABLED: bool = Field( + description="Enable enterprise telemetry collection (also requires ENTERPRISE_ENABLED=true).", + default=False, + ) + + ENTERPRISE_OTLP_ENDPOINT: str = Field( + description="Enterprise OTEL collector endpoint.", + default="", + ) + + ENTERPRISE_OTLP_HEADERS: str = Field( + description="Auth headers for OTLP export (key=value,key2=value2).", + default="", + ) + + ENTERPRISE_INCLUDE_CONTENT: bool = Field( + description="Include input/output content in traces (privacy toggle).", + default=True, + ) + + ENTERPRISE_SERVICE_NAME: str = Field( + description="Service name for OTEL resource.", + default="dify", + ) + + ENTERPRISE_OTEL_SAMPLING_RATE: float = Field( + description="Sampling rate for enterprise traces (0.0 to 1.0, default 1.0 = 100%).", + default=1.0, + ) diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py index da1e9f19b6..a707053ab0 100644 --- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py +++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py @@ -62,7 +62,7 @@ from core.app.task_pipeline.message_cycle_manager import MessageCycleManager from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk from core.model_runtime.entities.llm_entities import LLMUsage from core.model_runtime.utils.encoders import jsonable_encoder -from core.ops.ops_trace_manager import TraceQueueManager +from 
core.ops.ops_trace_manager import TraceQueueManager, TraceTask, TraceTaskName from core.workflow.enums import WorkflowExecutionStatus from core.workflow.nodes import NodeType from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory @@ -564,7 +564,6 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): **kwargs, ) -> Generator[StreamResponse, None, None]: """Handle stop events.""" - _ = trace_manager resolved_state = None if self._workflow_run_id: resolved_state = self._resolve_graph_runtime_state(graph_runtime_state) @@ -579,8 +578,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): ) with self._database_session() as session: - # Save message - self._save_message(session=session, graph_runtime_state=resolved_state) + self._save_message(session=session, graph_runtime_state=resolved_state, trace_manager=trace_manager) yield workflow_finish_resp elif event.stopped_by in ( @@ -589,8 +587,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): ): # When hitting input-moderation or annotation-reply, the workflow will not start with self._database_session() as session: - # Save message - self._save_message(session=session) + self._save_message(session=session, trace_manager=trace_manager) yield self._message_end_to_stream_response() @@ -599,6 +596,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): event: QueueAdvancedChatMessageEndEvent, *, graph_runtime_state: GraphRuntimeState | None = None, + trace_manager: TraceQueueManager | None = None, **kwargs, ) -> Generator[StreamResponse, None, None]: """Handle advanced chat message end events.""" @@ -616,7 +614,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): # Save message with self._database_session() as session: - self._save_message(session=session, graph_runtime_state=resolved_state) + self._save_message(session=session, graph_runtime_state=resolved_state, trace_manager=trace_manager) 
yield self._message_end_to_stream_response() @@ -770,7 +768,13 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): if self._conversation_name_generate_thread: logger.debug("Conversation name generation running as daemon thread") - def _save_message(self, *, session: Session, graph_runtime_state: GraphRuntimeState | None = None): + def _save_message( + self, + *, + session: Session, + graph_runtime_state: GraphRuntimeState | None = None, + trace_manager: TraceQueueManager | None = None, + ): message = self._get_message(session=session) # If there are assistant files, remove markdown image links from answer @@ -826,6 +830,15 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport): ] session.add_all(message_files) + if trace_manager: + trace_manager.add_trace_task( + TraceTask( + TraceTaskName.MESSAGE_TRACE, + conversation_id=str(message.conversation_id), + message_id=str(message.id), + ) + ) + def _seed_graph_runtime_state_from_queue_manager(self) -> None: """Bootstrap the cached runtime state from the queue manager when present.""" candidate = self._base_task_pipeline.queue_manager.graph_runtime_state diff --git a/api/core/logging/filters.py b/api/core/logging/filters.py index 1e8aa8d566..bc816eb66b 100644 --- a/api/core/logging/filters.py +++ b/api/core/logging/filters.py @@ -15,16 +15,23 @@ class TraceContextFilter(logging.Filter): """ def filter(self, record: logging.LogRecord) -> bool: - # Get trace context from OpenTelemetry - trace_id, span_id = self._get_otel_context() + # Preserve explicit trace_id set by the caller (e.g. 
emit_metric_only_event) + existing_trace_id = getattr(record, "trace_id", "") + if not existing_trace_id: + # Get trace context from OpenTelemetry + trace_id, span_id = self._get_otel_context() - # Set trace_id (fallback to ContextVar if no OTEL context) - if trace_id: - record.trace_id = trace_id + # Set trace_id (fallback to ContextVar if no OTEL context) + if trace_id: + record.trace_id = trace_id + else: + record.trace_id = get_trace_id() + + record.span_id = span_id or "" else: - record.trace_id = get_trace_id() - - record.span_id = span_id or "" + # Keep existing trace_id; only fill span_id if missing + if not getattr(record, "span_id", ""): + record.span_id = "" # For backward compatibility, also set req_id record.req_id = get_request_id() @@ -55,9 +62,12 @@ class IdentityContextFilter(logging.Filter): def filter(self, record: logging.LogRecord) -> bool: identity = self._extract_identity() - record.tenant_id = identity.get("tenant_id", "") - record.user_id = identity.get("user_id", "") - record.user_type = identity.get("user_type", "") + if not getattr(record, "tenant_id", ""): + record.tenant_id = identity.get("tenant_id", "") + if not getattr(record, "user_id", ""): + record.user_id = identity.get("user_id", "") + if not getattr(record, "user_type", ""): + record.user_type = identity.get("user_type", "") return True def _extract_identity(self) -> dict[str, str]: diff --git a/api/core/ops/entities/trace_entity.py b/api/core/ops/entities/trace_entity.py index 50a2cdea63..580540847c 100644 --- a/api/core/ops/entities/trace_entity.py +++ b/api/core/ops/entities/trace_entity.py @@ -114,6 +114,50 @@ class GenerateNameTraceInfo(BaseTraceInfo): tenant_id: str +class WorkflowNodeTraceInfo(BaseTraceInfo): + workflow_id: str + workflow_run_id: str + tenant_id: str + node_execution_id: str + node_id: str + node_type: str + title: str + + status: str + error: str | None = None + elapsed_time: float + + index: int + predecessor_node_id: str | None = None + + 
total_tokens: int = 0 + total_price: float = 0.0 + currency: str | None = None + + model_provider: str | None = None + model_name: str | None = None + prompt_tokens: int | None = None + completion_tokens: int | None = None + + tool_name: str | None = None + + iteration_id: str | None = None + iteration_index: int | None = None + loop_id: str | None = None + loop_index: int | None = None + parallel_id: str | None = None + + node_inputs: Mapping[str, Any] | None = None + node_outputs: Mapping[str, Any] | None = None + process_data: Mapping[str, Any] | None = None + + model_config = ConfigDict(protected_namespaces=()) + + +class DraftNodeExecutionTrace(WorkflowNodeTraceInfo): + pass + + class TaskData(BaseModel): app_id: str trace_info_type: str @@ -128,12 +172,15 @@ trace_info_info_map = { "DatasetRetrievalTraceInfo": DatasetRetrievalTraceInfo, "ToolTraceInfo": ToolTraceInfo, "GenerateNameTraceInfo": GenerateNameTraceInfo, + "WorkflowNodeTraceInfo": WorkflowNodeTraceInfo, + "DraftNodeExecutionTrace": DraftNodeExecutionTrace, } class TraceTaskName(StrEnum): CONVERSATION_TRACE = "conversation" WORKFLOW_TRACE = "workflow" + DRAFT_NODE_EXECUTION_TRACE = "draft_node_execution" MESSAGE_TRACE = "message" MODERATION_TRACE = "moderation" SUGGESTED_QUESTION_TRACE = "suggested_question" @@ -141,3 +188,4 @@ class TraceTaskName(StrEnum): TOOL_TRACE = "tool" GENERATE_NAME_TRACE = "generate_conversation_name" DATASOURCE_TRACE = "datasource" + NODE_EXECUTION_TRACE = "node_execution" diff --git a/api/core/ops/langfuse_trace/langfuse_trace.py b/api/core/ops/langfuse_trace/langfuse_trace.py index 4de4f403ce..422a121311 100644 --- a/api/core/ops/langfuse_trace/langfuse_trace.py +++ b/api/core/ops/langfuse_trace/langfuse_trace.py @@ -3,6 +3,7 @@ import os from datetime import datetime, timedelta from langfuse import Langfuse +from sqlalchemy import select from sqlalchemy.orm import sessionmaker from core.ops.base_trace_instance import BaseTraceInstance @@ -30,7 +31,7 @@ from 
core.ops.utils import filter_none_values from core.repositories import DifyCoreRepositoryFactory from core.workflow.enums import NodeType from extensions.ext_database import db -from models import EndUser, WorkflowNodeExecutionTriggeredFrom +from models import EndUser, Message, WorkflowNodeExecutionTriggeredFrom from models.enums import MessageStatus logger = logging.getLogger(__name__) @@ -71,7 +72,50 @@ class LangFuseDataTrace(BaseTraceInstance): metadata = trace_info.metadata metadata["workflow_app_log_id"] = trace_info.workflow_app_log_id - if trace_info.message_id: + # Check for parent_trace_context to detect nested workflow + parent_trace_context = trace_info.metadata.get("parent_trace_context") + + if parent_trace_context: + # Nested workflow: create span under outer trace + outer_trace_id = parent_trace_context.get("trace_id") + parent_node_execution_id = parent_trace_context.get("parent_node_execution_id") + parent_conversation_id = parent_trace_context.get("parent_conversation_id") + parent_workflow_run_id = parent_trace_context.get("parent_workflow_run_id") + + # Resolve outer trace_id: try message_id lookup first, fallback to workflow_run_id + if parent_conversation_id: + session_factory = sessionmaker(bind=db.engine) + with session_factory() as session: + message_data_stmt = select(Message.id).where( + Message.conversation_id == parent_conversation_id, + Message.workflow_run_id == parent_workflow_run_id, + ) + resolved_message_id = session.scalar(message_data_stmt) + if resolved_message_id: + outer_trace_id = resolved_message_id + else: + outer_trace_id = parent_workflow_run_id + else: + outer_trace_id = parent_workflow_run_id + + # Create inner workflow span under outer trace + workflow_span_data = LangfuseSpan( + id=trace_info.workflow_run_id, + name=TraceTaskName.WORKFLOW_TRACE, + input=dict(trace_info.workflow_run_inputs), + output=dict(trace_info.workflow_run_outputs), + trace_id=outer_trace_id, + parent_observation_id=parent_node_execution_id, + 
start_time=trace_info.start_time, + end_time=trace_info.end_time, + metadata=metadata, + level=LevelEnum.DEFAULT if trace_info.error == "" else LevelEnum.ERROR, + status_message=trace_info.error or "", + ) + self.add_span(langfuse_span_data=workflow_span_data) + # Use outer_trace_id for all node spans/generations + trace_id = outer_trace_id + elif trace_info.message_id: trace_id = trace_info.trace_id or trace_info.message_id name = TraceTaskName.MESSAGE_TRACE trace_data = LangfuseTrace( @@ -174,6 +218,11 @@ class LangFuseDataTrace(BaseTraceInstance): } ) + # Determine parent_observation_id for nested workflows + node_parent_observation_id = None + if parent_trace_context or trace_info.message_id: + node_parent_observation_id = trace_info.workflow_run_id + # add generation span if process_data and process_data.get("model_mode") == "chat": total_token = metadata.get("total_tokens", 0) @@ -206,7 +255,7 @@ class LangFuseDataTrace(BaseTraceInstance): metadata=metadata, level=(LevelEnum.DEFAULT if status == "succeeded" else LevelEnum.ERROR), status_message=trace_info.error or "", - parent_observation_id=trace_info.workflow_run_id if trace_info.message_id else None, + parent_observation_id=node_parent_observation_id, usage=generation_usage, ) @@ -225,7 +274,7 @@ class LangFuseDataTrace(BaseTraceInstance): metadata=metadata, level=(LevelEnum.DEFAULT if status == "succeeded" else LevelEnum.ERROR), status_message=trace_info.error or "", - parent_observation_id=trace_info.workflow_run_id if trace_info.message_id else None, + parent_observation_id=node_parent_observation_id, ) self.add_span(langfuse_span_data=span_data) diff --git a/api/core/ops/langsmith_trace/langsmith_trace.py b/api/core/ops/langsmith_trace/langsmith_trace.py index 8b8117b24c..7ca51e10ef 100644 --- a/api/core/ops/langsmith_trace/langsmith_trace.py +++ b/api/core/ops/langsmith_trace/langsmith_trace.py @@ -6,6 +6,7 @@ from typing import cast from langsmith import Client from langsmith.schemas import RunBase 
+from sqlalchemy import select from sqlalchemy.orm import sessionmaker from core.ops.base_trace_instance import BaseTraceInstance @@ -30,7 +31,7 @@ from core.ops.utils import filter_none_values, generate_dotted_order from core.repositories import DifyCoreRepositoryFactory from core.workflow.enums import NodeType, WorkflowNodeExecutionMetadataKey from extensions.ext_database import db -from models import EndUser, MessageFile, WorkflowNodeExecutionTriggeredFrom +from models import EndUser, Message, MessageFile, WorkflowNodeExecutionTriggeredFrom logger = logging.getLogger(__name__) @@ -64,7 +65,35 @@ class LangSmithDataTrace(BaseTraceInstance): self.generate_name_trace(trace_info) def workflow_trace(self, trace_info: WorkflowTraceInfo): - trace_id = trace_info.trace_id or trace_info.message_id or trace_info.workflow_run_id + # Check for parent_trace_context for cross-workflow linking + parent_trace_context = trace_info.metadata.get("parent_trace_context") + + if parent_trace_context: + # Inner workflow: resolve outer trace_id and link to parent node + outer_trace_id = parent_trace_context.get("parent_workflow_run_id") + + # Try to resolve message_id from conversation_id if available + if parent_trace_context.get("parent_conversation_id"): + try: + session_factory = sessionmaker(bind=db.engine) + with session_factory() as session: + message_data_stmt = select(Message.id).where( + Message.conversation_id == parent_trace_context["parent_conversation_id"], + Message.workflow_run_id == parent_trace_context["parent_workflow_run_id"], + ) + resolved_message_id = session.scalar(message_data_stmt) + if resolved_message_id: + outer_trace_id = resolved_message_id + except Exception as e: + logger.debug("Failed to resolve message_id from conversation_id: %s", str(e)) + + trace_id = outer_trace_id + parent_run_id = parent_trace_context.get("parent_node_execution_id") + else: + # Outer workflow: existing behavior + trace_id = trace_info.trace_id or trace_info.message_id or 
trace_info.workflow_run_id + parent_run_id = trace_info.message_id or None + if trace_info.start_time is None: trace_info.start_time = datetime.now() message_dotted_order = ( @@ -78,7 +107,8 @@ class LangSmithDataTrace(BaseTraceInstance): metadata = trace_info.metadata metadata["workflow_app_log_id"] = trace_info.workflow_app_log_id - if trace_info.message_id: + # Only create message_run for outer workflows (no parent_trace_context) + if trace_info.message_id and not parent_trace_context: message_run = LangSmithRunModel( id=trace_info.message_id, name=TraceTaskName.MESSAGE_TRACE, @@ -121,9 +151,9 @@ class LangSmithDataTrace(BaseTraceInstance): }, error=trace_info.error, tags=["workflow"], - parent_run_id=trace_info.message_id or None, + parent_run_id=parent_run_id, trace_id=trace_id, - dotted_order=workflow_dotted_order, + dotted_order=None if parent_trace_context else workflow_dotted_order, serialized=None, events=[], session_id=None, diff --git a/api/core/ops/ops_trace_manager.py b/api/core/ops/ops_trace_manager.py index f45f15a6da..c5ac634e2d 100644 --- a/api/core/ops/ops_trace_manager.py +++ b/api/core/ops/ops_trace_manager.py @@ -21,6 +21,7 @@ from core.ops.entities.config_entity import ( ) from core.ops.entities.trace_entity import ( DatasetRetrievalTraceInfo, + DraftNodeExecutionTrace, GenerateNameTraceInfo, MessageTraceInfo, ModerationTraceInfo, @@ -28,12 +29,16 @@ from core.ops.entities.trace_entity import ( TaskData, ToolTraceInfo, TraceTaskName, + WorkflowNodeTraceInfo, WorkflowTraceInfo, ) from core.ops.utils import get_message_data from extensions.ext_database import db from extensions.ext_storage import storage +from models.account import Tenant +from models.dataset import Dataset from models.model import App, AppModelConfig, Conversation, Message, MessageFile, TraceAppConfig +from models.tools import ApiToolProvider, BuiltinToolProvider, MCPToolProvider, WorkflowToolProvider from models.workflow import WorkflowAppLog from repositories.factory import 
DifyAPIRepositoryFactory from tasks.ops_trace_task import process_trace_tasks @@ -44,6 +49,44 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +def _lookup_app_and_workspace_names(app_id: str | None, tenant_id: str | None) -> tuple[str, str]: + """Return (app_name, workspace_name) for the given IDs. Falls back to empty strings.""" + app_name = "" + workspace_name = "" + if not app_id and not tenant_id: + return app_name, workspace_name + with Session(db.engine) as session: + if app_id: + name = session.scalar(select(App.name).where(App.id == app_id)) + if name: + app_name = name + if tenant_id: + name = session.scalar(select(Tenant.name).where(Tenant.id == tenant_id)) + if name: + workspace_name = name + return app_name, workspace_name + + +_PROVIDER_TYPE_TO_MODEL: dict[str, type] = { + "builtin": BuiltinToolProvider, + "plugin": BuiltinToolProvider, + "api": ApiToolProvider, + "workflow": WorkflowToolProvider, + "mcp": MCPToolProvider, +} + + +def _lookup_credential_name(credential_id: str | None, provider_type: str | None) -> str: + if not credential_id: + return "" + model_cls = _PROVIDER_TYPE_TO_MODEL.get(provider_type or "") + if not model_cls: + return "" + with Session(db.engine) as session: + name = session.scalar(select(model_cls.name).where(model_cls.id == credential_id)) + return str(name) if name else "" + + class OpsTraceProviderConfigMap(collections.UserDict[str, dict[str, Any]]): def __getitem__(self, provider: str) -> dict[str, Any]: match provider: @@ -526,6 +569,8 @@ class TraceTask: TraceTaskName.GENERATE_NAME_TRACE: lambda: self.generate_name_trace( conversation_id=self.conversation_id, timer=self.timer, **self.kwargs ), + TraceTaskName.NODE_EXECUTION_TRACE: lambda: self.node_execution_trace(**self.kwargs), + TraceTaskName.DRAFT_NODE_EXECUTION_TRACE: lambda: self.draft_node_execution_trace(**self.kwargs), } return preprocess_map.get(self.trace_type, lambda: None)() @@ -581,7 +626,9 @@ class TraceTask: ) message_id = 
session.scalar(message_data_stmt) - metadata = { + app_name, workspace_name = _lookup_app_and_workspace_names(workflow_run.app_id, tenant_id) + + metadata: dict[str, Any] = { "workflow_id": workflow_id, "conversation_id": conversation_id, "workflow_run_id": workflow_run_id, @@ -594,8 +641,14 @@ class TraceTask: "triggered_from": workflow_run.triggered_from, "user_id": user_id, "app_id": workflow_run.app_id, + "app_name": app_name, + "workspace_name": workspace_name, } + parent_trace_context = self.kwargs.get("parent_trace_context") + if parent_trace_context: + metadata["parent_trace_context"] = parent_trace_context + workflow_trace_info = WorkflowTraceInfo( trace_id=self.trace_id, workflow_data=workflow_run.to_dict(), @@ -643,6 +696,14 @@ class TraceTask: streaming_metrics = self._extract_streaming_metrics(message_data) + tenant_id = "" + with Session(db.engine) as session: + tid = session.scalar(select(App.tenant_id).where(App.id == message_data.app_id)) + if tid: + tenant_id = str(tid) + + app_name, workspace_name = _lookup_app_and_workspace_names(message_data.app_id, tenant_id) + metadata = { "conversation_id": message_data.conversation_id, "ls_provider": message_data.model_provider, @@ -654,6 +715,11 @@ class TraceTask: "workflow_run_id": message_data.workflow_run_id, "from_source": message_data.from_source, "message_id": message_id, + "tenant_id": tenant_id, + "app_id": message_data.app_id, + "user_id": message_data.from_end_user_id or message_data.from_account_id, + "app_name": app_name, + "workspace_name": workspace_name, } message_tokens = message_data.message_tokens @@ -776,6 +842,36 @@ class TraceTask: if not message_data: return {} + tenant_id = "" + with Session(db.engine) as session: + tid = session.scalar(select(App.tenant_id).where(App.id == message_data.app_id)) + if tid: + tenant_id = str(tid) + + app_name, workspace_name = _lookup_app_and_workspace_names(message_data.app_id, tenant_id) + + doc_list = [doc.model_dump() for doc in documents] if 
    def node_execution_trace(self, **kwargs) -> WorkflowNodeTraceInfo | dict:
        """Build a :class:`WorkflowNodeTraceInfo` from the ``node_execution_data`` kwarg.

        Returns:
            A populated ``WorkflowNodeTraceInfo``, or an empty dict when no
            node execution payload was supplied (callers skip empty tasks).
        """
        node_data: dict = kwargs.get("node_execution_data", {})
        if not node_data:
            return {}

        # Resolve human-readable names for the owning app/workspace and the
        # credential; both helpers are defined elsewhere in this module.
        app_name, workspace_name = _lookup_app_and_workspace_names(node_data.get("app_id"), node_data.get("tenant_id"))

        credential_name = _lookup_credential_name(
            node_data.get("credential_id"), node_data.get("credential_provider_type")
        )

        metadata: dict[str, Any] = {
            "tenant_id": node_data.get("tenant_id"),
            "app_id": node_data.get("app_id"),
            "app_name": app_name,
            "workspace_name": workspace_name,
            "user_id": node_data.get("user_id"),
            "dataset_ids": node_data.get("dataset_ids"),
            "dataset_names": node_data.get("dataset_names"),
            "plugin_name": node_data.get("plugin_name"),
            "credential_name": credential_name,
        }

        parent_trace_context = node_data.get("parent_trace_context")
        if parent_trace_context:
            metadata["parent_trace_context"] = parent_trace_context

        # Resolve the owning message only when this is NOT a nested
        # (workflow-as-tool) run — presumably so child runs are not attributed
        # to the parent's message; TODO confirm against trace consumers.
        message_id: str | None = None
        conversation_id = node_data.get("conversation_id")
        workflow_execution_id = node_data.get("workflow_execution_id")
        if conversation_id and workflow_execution_id and not parent_trace_context:
            with Session(db.engine) as session:
                msg_id = session.scalar(
                    select(Message.id).where(
                        Message.conversation_id == conversation_id,
                        Message.workflow_run_id == workflow_execution_id,
                    )
                )
                if msg_id:
                    message_id = str(msg_id)
                    metadata["message_id"] = message_id

        return WorkflowNodeTraceInfo(
            trace_id=self.trace_id,
            message_id=message_id,
            start_time=node_data.get("created_at"),
            end_time=node_data.get("finished_at"),
            metadata=metadata,
            workflow_id=node_data.get("workflow_id", ""),
            workflow_run_id=node_data.get("workflow_execution_id", ""),
            tenant_id=node_data.get("tenant_id", ""),
            node_execution_id=node_data.get("node_execution_id", ""),
            node_id=node_data.get("node_id", ""),
            node_type=node_data.get("node_type", ""),
            title=node_data.get("title", ""),
            status=node_data.get("status", ""),
            error=node_data.get("error"),
            elapsed_time=node_data.get("elapsed_time", 0.0),
            index=node_data.get("index", 0),
            predecessor_node_id=node_data.get("predecessor_node_id"),
            total_tokens=node_data.get("total_tokens", 0),
            total_price=node_data.get("total_price", 0.0),
            currency=node_data.get("currency"),
            model_provider=node_data.get("model_provider"),
            model_name=node_data.get("model_name"),
            prompt_tokens=node_data.get("prompt_tokens"),
            completion_tokens=node_data.get("completion_tokens"),
            tool_name=node_data.get("tool_name"),
            iteration_id=node_data.get("iteration_id"),
            iteration_index=node_data.get("iteration_index"),
            loop_id=node_data.get("loop_id"),
            loop_index=node_data.get("loop_index"),
            parallel_id=node_data.get("parallel_id"),
            node_inputs=node_data.get("node_inputs"),
            node_outputs=node_data.get("node_outputs"),
            process_data=node_data.get("process_data"),
        )

    def draft_node_execution_trace(self, **kwargs) -> DraftNodeExecutionTrace | dict:
        """Build a draft-node trace by re-wrapping a regular node trace.

        Delegates to :meth:`node_execution_trace` and converts the result;
        the empty-dict sentinel is passed through unchanged.
        """
        node_trace = self.node_execution_trace(**kwargs)
        if not node_trace or not isinstance(node_trace, WorkflowNodeTraceInfo):
            return node_trace
        return DraftNodeExecutionTrace(**node_trace.model_dump())
    def _enqueue_node_trace_task(self, domain_execution: WorkflowNodeExecution) -> None:
        """Queue a NODE_EXECUTION_TRACE task for a finished node execution.

        No-op when no trace manager is attached or enterprise telemetry is
        disabled, so the common path stays cheap.
        """
        if not self._trace_manager:
            return

        # Local import keeps the enterprise package out of the hot path when
        # the feature is disabled.
        from enterprise.telemetry.exporter import is_enterprise_telemetry_enabled

        if not is_enterprise_telemetry_enabled():
            return

        execution = self._get_workflow_execution()
        meta = domain_execution.metadata or {}

        # Parent context only exists for nested (workflow-as-tool) runs.
        parent_trace_context = None
        if isinstance(self._application_generate_entity, (WorkflowAppGenerateEntity, AdvancedChatAppGenerateEntity)):
            parent_trace_context = self._application_generate_entity.extras.get("parent_trace_context")

        node_data: dict[str, Any] = {
            "workflow_id": domain_execution.workflow_id,
            "workflow_execution_id": execution.id_,
            "tenant_id": self._application_generate_entity.app_config.tenant_id,
            "app_id": self._application_generate_entity.app_config.app_id,
            "node_execution_id": domain_execution.id,
            "node_id": domain_execution.node_id,
            "node_type": str(domain_execution.node_type.value),
            "title": domain_execution.title,
            "status": str(domain_execution.status.value),
            "error": domain_execution.error,
            "elapsed_time": domain_execution.elapsed_time,
            "index": domain_execution.index,
            "predecessor_node_id": domain_execution.predecessor_node_id,
            "created_at": domain_execution.created_at,
            "finished_at": domain_execution.finished_at,
            "total_tokens": meta.get(WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS, 0),
            "total_price": meta.get(WorkflowNodeExecutionMetadataKey.TOTAL_PRICE, 0.0),
            "currency": meta.get(WorkflowNodeExecutionMetadataKey.CURRENCY),
            # TOOL_INFO may be absent or non-dict; only then is tool_name None.
            "tool_name": (meta.get(WorkflowNodeExecutionMetadataKey.TOOL_INFO) or {}).get("tool_name")
            if isinstance(meta.get(WorkflowNodeExecutionMetadataKey.TOOL_INFO), dict)
            else None,
            "iteration_id": meta.get(WorkflowNodeExecutionMetadataKey.ITERATION_ID),
            "iteration_index": meta.get(WorkflowNodeExecutionMetadataKey.ITERATION_INDEX),
            "loop_id": meta.get(WorkflowNodeExecutionMetadataKey.LOOP_ID),
            "loop_index": meta.get(WorkflowNodeExecutionMetadataKey.LOOP_INDEX),
            "parallel_id": meta.get(WorkflowNodeExecutionMetadataKey.PARALLEL_ID),
            # Copy mappings so later mutation of the domain object cannot
            # leak into the queued task payload.
            "node_inputs": dict(domain_execution.inputs) if domain_execution.inputs else None,
            "node_outputs": dict(domain_execution.outputs) if domain_execution.outputs else None,
            "process_data": dict(domain_execution.process_data) if domain_execution.process_data else None,
        }
        node_data["invoke_from"] = self._application_generate_entity.invoke_from.value
        node_data["user_id"] = self._system_variables().get(SystemVariableKey.USER_ID.value)

        # Knowledge-retrieval nodes: collect distinct dataset ids/names from
        # the retrieved documents' metadata (order-preserving de-dup).
        if domain_execution.node_type.value == "knowledge-retrieval" and domain_execution.outputs:
            results = domain_execution.outputs.get("result") or []
            dataset_ids: list[str] = []
            dataset_names: list[str] = []
            for doc in results:
                if not isinstance(doc, dict):
                    continue
                doc_meta = doc.get("metadata") or {}
                did = doc_meta.get("dataset_id")
                dname = doc_meta.get("dataset_name")
                if did and did not in dataset_ids:
                    dataset_ids.append(did)
                if dname and dname not in dataset_names:
                    dataset_names.append(dname)
            if dataset_ids:
                node_data["dataset_ids"] = dataset_ids
            if dataset_names:
                node_data["dataset_names"] = dataset_names

        tool_info = meta.get(WorkflowNodeExecutionMetadataKey.TOOL_INFO)
        if isinstance(tool_info, dict):
            # NOTE(review): plugin_name is populated from the plugin's unique
            # identifier, not a display name — confirm downstream expectation.
            plugin_id = tool_info.get("plugin_unique_identifier")
            if plugin_id:
                node_data["plugin_name"] = plugin_id
            credential_id = tool_info.get("credential_id")
            if credential_id:
                node_data["credential_id"] = credential_id
                node_data["credential_provider_type"] = tool_info.get("provider_type")

        conversation_id = self._system_variables().get(SystemVariableKey.CONVERSATION_ID.value)
        if conversation_id:
            node_data["conversation_id"] = conversation_id

        if parent_trace_context:
            node_data["parent_trace_context"] = parent_trace_context

        trace_task = TraceTask(
            TraceTaskName.NODE_EXECUTION_TRACE,
            node_execution_data=node_data,
        )
        self._trace_manager.add_trace_task(trace_task)
def enqueue_draft_node_execution_trace(
    *,
    execution: WorkflowNodeExecutionModel,
    outputs: Mapping[str, Any] | None,
    workflow_execution_id: str | None,
    user_id: str,
) -> None:
    """Queue a DRAFT_NODE_EXECUTION_TRACE task for a single-stepped node.

    Args:
        execution: Persisted node execution row to trace.
        outputs: Fresh outputs to prefer over the persisted ones, if given.
        workflow_execution_id: Overrides the run id used for correlation.
        user_id: Acting user, forwarded to the trace queue manager.

    No-op when enterprise telemetry is disabled.
    """
    if not is_enterprise_telemetry_enabled():
        return

    trace_manager = TraceQueueManager(app_id=execution.app_id, user_id=user_id)
    node_data = _build_node_execution_data(
        execution=execution,
        outputs=outputs,
        workflow_execution_id=workflow_execution_id,
    )
    trace_manager.add_trace_task(
        TraceTask(
            TraceTaskName.DRAFT_NODE_EXECUTION_TRACE,
            node_execution_data=node_data,
        )
    )


def _build_node_execution_data(
    *,
    execution: WorkflowNodeExecutionModel,
    outputs: Mapping[str, Any] | None,
    workflow_execution_id: str | None,
) -> dict[str, Any]:
    """Flatten a WorkflowNodeExecutionModel into the trace-task payload dict.

    Mirrors the payload shape produced by the persistence layer's
    ``_enqueue_node_trace_task`` so both paths feed the same consumer.
    """
    metadata = execution.execution_metadata_dict
    # Explicit outputs (from the draft run) win over the persisted snapshot.
    node_outputs = outputs if outputs is not None else execution.outputs_dict
    # Fall back to the execution's own id so correlation is never empty.
    execution_id = workflow_execution_id or execution.workflow_run_id or execution.id

    return {
        "workflow_id": execution.workflow_id,
        "workflow_execution_id": execution_id,
        "tenant_id": execution.tenant_id,
        "app_id": execution.app_id,
        "node_execution_id": execution.id,
        "node_id": execution.node_id,
        "node_type": execution.node_type,
        "title": execution.title,
        "status": execution.status,
        "error": execution.error,
        "elapsed_time": execution.elapsed_time,
        "index": execution.index,
        "predecessor_node_id": execution.predecessor_node_id,
        "created_at": execution.created_at,
        "finished_at": execution.finished_at,
        "total_tokens": metadata.get(WorkflowNodeExecutionMetadataKey.TOTAL_TOKENS, 0),
        "total_price": metadata.get(WorkflowNodeExecutionMetadataKey.TOTAL_PRICE, 0.0),
        "currency": metadata.get(WorkflowNodeExecutionMetadataKey.CURRENCY),
        # TOOL_INFO may be absent or non-dict; only then is tool_name None.
        "tool_name": (metadata.get(WorkflowNodeExecutionMetadataKey.TOOL_INFO) or {}).get("tool_name")
        if isinstance(metadata.get(WorkflowNodeExecutionMetadataKey.TOOL_INFO), dict)
        else None,
        "iteration_id": metadata.get(WorkflowNodeExecutionMetadataKey.ITERATION_ID),
        "iteration_index": metadata.get(WorkflowNodeExecutionMetadataKey.ITERATION_INDEX),
        "loop_id": metadata.get(WorkflowNodeExecutionMetadataKey.LOOP_ID),
        "loop_index": metadata.get(WorkflowNodeExecutionMetadataKey.LOOP_INDEX),
        "parallel_id": metadata.get(WorkflowNodeExecutionMetadataKey.PARALLEL_ID),
        "node_inputs": execution.inputs_dict,
        "node_outputs": node_outputs,
        "process_data": execution.process_data_dict,
    }
    def __init__(self) -> None:
        """Bind the process-wide enterprise exporter; fail fast if missing.

        Raises:
            RuntimeError: when the exporter extension was never initialized.
        """
        # Local import avoids a circular import with the extension module.
        from extensions.ext_enterprise_telemetry import get_enterprise_exporter

        exporter = get_enterprise_exporter()
        if exporter is None:
            raise RuntimeError("EnterpriseOtelTrace instantiated but exporter is not initialized")
        self._exporter = exporter

    def trace(self, trace_info: BaseTraceInfo) -> None:
        """Dispatch a trace-info object to its type-specific handler.

        Unknown trace-info types are silently ignored.
        """
        if isinstance(trace_info, WorkflowTraceInfo):
            self._workflow_trace(trace_info)
        elif isinstance(trace_info, MessageTraceInfo):
            self._message_trace(trace_info)
        elif isinstance(trace_info, ToolTraceInfo):
            self._tool_trace(trace_info)
        # NOTE: DraftNodeExecutionTrace is checked BEFORE WorkflowNodeTraceInfo.
        # DraftNodeExecutionTrace is built from a WorkflowNodeTraceInfo dump
        # elsewhere, so it is presumably a subclass — reordering these two
        # branches would misroute draft traces; confirm before touching.
        elif isinstance(trace_info, DraftNodeExecutionTrace):
            self._draft_node_execution_trace(trace_info)
        elif isinstance(trace_info, WorkflowNodeTraceInfo):
            self._node_execution_trace(trace_info)
        elif isinstance(trace_info, ModerationTraceInfo):
            self._moderation_trace(trace_info)
        elif isinstance(trace_info, SuggestedQuestionTraceInfo):
            self._suggested_question_trace(trace_info)
        elif isinstance(trace_info, DatasetRetrievalTraceInfo):
            self._dataset_retrieval_trace(trace_info)
        elif isinstance(trace_info, GenerateNameTraceInfo):
            self._generate_name_trace(trace_info)

    def _common_attrs(self, trace_info: BaseTraceInfo) -> dict[str, Any]:
        """Return the attribute set shared by every emitted event."""
        return {
            "dify.trace_id": trace_info.trace_id,
            "dify.tenant_id": trace_info.metadata.get("tenant_id"),
            "dify.app_id": trace_info.metadata.get("app_id"),
            "dify.app.name": trace_info.metadata.get("app_name"),
            "dify.workspace.name": trace_info.metadata.get("workspace_name"),
            "gen_ai.user.id": trace_info.metadata.get("user_id"),
            "dify.message.id": trace_info.message_id,
        }

    def _maybe_json(self, value: Any) -> str | None:
        """Best-effort JSON-encode *value*; strings/None pass through.

        Falls back to ``str(value)`` for non-serializable objects so the
        telemetry attribute is always populated.
        """
        if value is None:
            return None
        if isinstance(value, str):
            return value
        try:
            return json.dumps(value, default=str)
        except (TypeError, ValueError):
            return str(value)
    # ------------------------------------------------------------------
    # SPAN-emitting handlers (workflow, node execution, draft node)
    # ------------------------------------------------------------------

    def _workflow_trace(self, info: WorkflowTraceInfo) -> None:
        """Emit a workflow-run span, its companion detail log, and metrics."""
        # -- Slim span attrs: identity + structure + status + timing only --
        span_attrs: dict[str, Any] = {
            "dify.trace_id": info.trace_id,
            "dify.tenant_id": info.metadata.get("tenant_id"),
            "dify.app_id": info.metadata.get("app_id"),
            "dify.workflow.id": info.workflow_id,
            "dify.workflow.run_id": info.workflow_run_id,
            "dify.workflow.status": info.workflow_run_status,
            "dify.workflow.error": info.error,
            "dify.workflow.elapsed_time": info.workflow_run_elapsed_time,
            "dify.invoke_from": info.metadata.get("triggered_from"),
            "dify.conversation.id": info.conversation_id,
            "dify.message.id": info.message_id,
        }

        trace_correlation_override: str | None = None
        parent_span_id_source: str | None = None

        # Nested (workflow-as-tool) runs carry a parent context: re-parent
        # this span under the invoking node and the parent run's trace.
        parent_ctx = info.metadata.get("parent_trace_context")
        if parent_ctx and isinstance(parent_ctx, dict):
            span_attrs["dify.parent.trace_id"] = parent_ctx.get("trace_id")
            span_attrs["dify.parent.node.execution_id"] = parent_ctx.get("parent_node_execution_id")
            span_attrs["dify.parent.workflow.run_id"] = parent_ctx.get("parent_workflow_run_id")
            span_attrs["dify.parent.app.id"] = parent_ctx.get("parent_app_id")

            trace_correlation_override = parent_ctx.get("parent_workflow_run_id")
            parent_span_id_source = parent_ctx.get("parent_node_execution_id")

        self._exporter.export_span(
            EnterpriseTelemetrySpan.WORKFLOW_RUN,
            span_attrs,
            correlation_id=info.workflow_run_id,
            span_id_source=info.workflow_run_id,
            start_time=info.start_time,
            end_time=info.end_time,
            trace_correlation_override=trace_correlation_override,
            parent_span_id_source=parent_span_id_source,
        )

        # -- Companion log: ALL attrs (span + detail) for full picture --
        log_attrs: dict[str, Any] = {**span_attrs}
        log_attrs.update(
            {
                "dify.app.name": info.metadata.get("app_name"),
                "dify.workspace.name": info.metadata.get("workspace_name"),
                "gen_ai.user.id": info.metadata.get("user_id"),
                "gen_ai.usage.total_tokens": info.total_tokens,
                "dify.workflow.version": info.workflow_run_version,
            }
        )

        # When content capture is off, emit an opaque reference instead of
        # the payload so operators can still correlate with storage.
        if self._exporter.include_content:
            log_attrs["dify.workflow.inputs"] = self._maybe_json(info.workflow_run_inputs)
            log_attrs["dify.workflow.outputs"] = self._maybe_json(info.workflow_run_outputs)
            log_attrs["dify.workflow.query"] = info.query
        else:
            ref = f"ref:workflow_run_id={info.workflow_run_id}"
            log_attrs["dify.workflow.inputs"] = ref
            log_attrs["dify.workflow.outputs"] = ref
            log_attrs["dify.workflow.query"] = ref

        emit_telemetry_log(
            event_name="dify.workflow.run",
            attributes=log_attrs,
            signal="span_detail",
            trace_id_source=info.workflow_run_id,
            tenant_id=info.metadata.get("tenant_id"),
            user_id=info.metadata.get("user_id"),
        )

        # -- Metrics --
        labels = {
            "tenant_id": info.tenant_id,
            "app_id": info.metadata.get("app_id", ""),
        }
        self._exporter.increment_counter(EnterpriseTelemetryCounter.TOKENS, info.total_tokens, labels)
        invoke_from = info.metadata.get("triggered_from", "")
        self._exporter.increment_counter(
            EnterpriseTelemetryCounter.REQUESTS,
            1,
            {**labels, "type": "workflow", "status": info.workflow_run_status, "invoke_from": invoke_from},
        )
        self._exporter.record_histogram(
            EnterpriseTelemetryHistogram.WORKFLOW_DURATION,
            float(info.workflow_run_elapsed_time),
            {**labels, "status": info.workflow_run_status},
        )

        if info.error:
            self._exporter.increment_counter(EnterpriseTelemetryCounter.ERRORS, 1, {**labels, "type": "workflow"})

    def _node_execution_trace(self, info: WorkflowNodeTraceInfo) -> None:
        """Emit a regular node-execution span via the shared helper."""
        self._emit_node_execution_trace(info, EnterpriseTelemetrySpan.NODE_EXECUTION, "node")

    def _draft_node_execution_trace(self, info: DraftNodeExecutionTrace) -> None:
        """Emit a draft (single-step) node span; correlates by node id."""
        self._emit_node_execution_trace(
            info,
            EnterpriseTelemetrySpan.DRAFT_NODE_EXECUTION,
            "draft_node",
            correlation_id_override=info.node_execution_id,
            trace_correlation_override_param=info.workflow_run_id,
        )

    def _emit_node_execution_trace(
        self,
        info: WorkflowNodeTraceInfo,
        span_name: EnterpriseTelemetrySpan,
        request_type: str,
        correlation_id_override: str | None = None,
        trace_correlation_override_param: str | None = None,
    ) -> None:
        """Shared span + log + metrics emission for node/draft-node traces.

        Args:
            info: The node trace payload.
            span_name: Which telemetry span to emit.
            request_type: Label value for the requests/errors counters.
            correlation_id_override: Replaces the workflow-run correlation id.
            trace_correlation_override_param: Pre-seeded trace correlation,
                superseded by a parent context's run id when present.
        """
        # -- Slim span attrs: identity + structure + status + timing --
        span_attrs: dict[str, Any] = {
            "dify.trace_id": info.trace_id,
            "dify.tenant_id": info.tenant_id,
            "dify.app_id": info.metadata.get("app_id"),
            "dify.workflow.id": info.workflow_id,
            "dify.workflow.run_id": info.workflow_run_id,
            "dify.message.id": info.message_id,
            "dify.conversation.id": info.metadata.get("conversation_id"),
            "dify.node.execution_id": info.node_execution_id,
            "dify.node.id": info.node_id,
            "dify.node.type": info.node_type,
            "dify.node.title": info.title,
            "dify.node.status": info.status,
            "dify.node.error": info.error,
            "dify.node.elapsed_time": info.elapsed_time,
            "dify.node.index": info.index,
            "dify.node.predecessor_node_id": info.predecessor_node_id,
            "dify.node.iteration_id": info.iteration_id,
            "dify.node.loop_id": info.loop_id,
            "dify.node.parallel_id": info.parallel_id,
        }

        # A parent (workflow-as-tool) context wins over the caller-supplied
        # correlation override.
        trace_correlation_override = trace_correlation_override_param
        parent_ctx = info.metadata.get("parent_trace_context")
        if parent_ctx and isinstance(parent_ctx, dict):
            trace_correlation_override = parent_ctx.get("parent_workflow_run_id") or trace_correlation_override

        effective_correlation_id = correlation_id_override or info.workflow_run_id
        self._exporter.export_span(
            span_name,
            span_attrs,
            correlation_id=effective_correlation_id,
            span_id_source=info.node_execution_id,
            start_time=info.start_time,
            end_time=info.end_time,
            trace_correlation_override=trace_correlation_override,
        )

        # -- Companion log: ALL attrs (span + detail) --
        log_attrs: dict[str, Any] = {**span_attrs}
        log_attrs.update(
            {
                "dify.app.name": info.metadata.get("app_name"),
                "dify.workspace.name": info.metadata.get("workspace_name"),
                "dify.invoke_from": info.metadata.get("invoke_from"),
                "gen_ai.user.id": info.metadata.get("user_id"),
                "gen_ai.usage.total_tokens": info.total_tokens,
                "dify.node.total_price": info.total_price,
                "dify.node.currency": info.currency,
                "gen_ai.provider.name": info.model_provider,
                "gen_ai.request.model": info.model_name,
                "gen_ai.usage.input_tokens": info.prompt_tokens,
                "gen_ai.usage.output_tokens": info.completion_tokens,
                "gen_ai.tool.name": info.tool_name,
                "dify.node.iteration_index": info.iteration_index,
                "dify.node.loop_index": info.loop_index,
                "dify.plugin.name": info.metadata.get("plugin_name"),
                "dify.credential.name": info.metadata.get("credential_name"),
                "dify.dataset.ids": self._maybe_json(info.metadata.get("dataset_ids")),
                "dify.dataset.names": self._maybe_json(info.metadata.get("dataset_names")),
            }
        )

        if self._exporter.include_content:
            log_attrs["dify.node.inputs"] = self._maybe_json(info.node_inputs)
            log_attrs["dify.node.outputs"] = self._maybe_json(info.node_outputs)
            log_attrs["dify.node.process_data"] = self._maybe_json(info.process_data)
        else:
            ref = f"ref:node_execution_id={info.node_execution_id}"
            log_attrs["dify.node.inputs"] = ref
            log_attrs["dify.node.outputs"] = ref
            log_attrs["dify.node.process_data"] = ref

        emit_telemetry_log(
            event_name=span_name.value,
            attributes=log_attrs,
            signal="span_detail",
            trace_id_source=info.workflow_run_id,
            tenant_id=info.tenant_id,
            user_id=info.metadata.get("user_id"),
        )

        # -- Metrics --
        labels = {
            "tenant_id": info.tenant_id,
            "app_id": info.metadata.get("app_id", ""),
            "node_type": info.node_type,
            "model_provider": info.model_provider or "",
        }
        if info.total_tokens:
            token_labels = {**labels, "model_name": info.model_name or ""}
            self._exporter.increment_counter(EnterpriseTelemetryCounter.TOKENS, info.total_tokens, token_labels)
        self._exporter.increment_counter(
            EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": request_type, "status": info.status}
        )
        duration_labels = dict(labels)
        # Per-plugin duration only where the plugin dimension is meaningful.
        plugin_name = info.metadata.get("plugin_name")
        if plugin_name and info.node_type in {"tool", "knowledge-retrieval"}:
            duration_labels["plugin_name"] = plugin_name
        self._exporter.record_histogram(EnterpriseTelemetryHistogram.NODE_DURATION, info.elapsed_time, duration_labels)

        if info.error:
            self._exporter.increment_counter(EnterpriseTelemetryCounter.ERRORS, 1, {**labels, "type": request_type})
+ self._exporter.increment_counter( + EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": request_type, "status": info.status} + ) + duration_labels = dict(labels) + plugin_name = info.metadata.get("plugin_name") + if plugin_name and info.node_type in {"tool", "knowledge-retrieval"}: + duration_labels["plugin_name"] = plugin_name + self._exporter.record_histogram(EnterpriseTelemetryHistogram.NODE_DURATION, info.elapsed_time, duration_labels) + + if info.error: + self._exporter.increment_counter(EnterpriseTelemetryCounter.ERRORS, 1, {**labels, "type": request_type}) + + # ------------------------------------------------------------------ + # METRIC-ONLY handlers (structured log + counters/histograms) + # ------------------------------------------------------------------ + + def _message_trace(self, info: MessageTraceInfo) -> None: + attrs = self._common_attrs(info) + attrs.update( + { + "dify.invoke_from": info.metadata.get("from_source"), + "dify.conversation.id": info.metadata.get("conversation_id"), + "dify.conversation.mode": info.conversation_mode, + "gen_ai.provider.name": info.metadata.get("ls_provider"), + "gen_ai.request.model": info.metadata.get("ls_model_name"), + "gen_ai.usage.input_tokens": info.message_tokens, + "gen_ai.usage.output_tokens": info.answer_tokens, + "gen_ai.usage.total_tokens": info.total_tokens, + "dify.message.status": info.metadata.get("status"), + "dify.message.error": info.error, + "dify.message.from_source": info.metadata.get("from_source"), + "dify.message.from_end_user_id": info.metadata.get("from_end_user_id"), + "dify.message.from_account_id": info.metadata.get("from_account_id"), + "dify.streaming": info.is_streaming_request, + "dify.message.time_to_first_token": info.gen_ai_server_time_to_first_token, + "dify.message.streaming_duration": info.llm_streaming_time_to_generate, + "dify.workflow.run_id": info.metadata.get("workflow_run_id"), + } + ) + + if self._exporter.include_content: + attrs["dify.message.inputs"] = 
self._maybe_json(info.inputs) + attrs["dify.message.outputs"] = self._maybe_json(info.outputs) + else: + ref = f"ref:message_id={info.message_id}" + attrs["dify.message.inputs"] = ref + attrs["dify.message.outputs"] = ref + + emit_metric_only_event( + event_name="dify.message.run", + attributes=attrs, + trace_id_source=info.metadata.get("workflow_run_id") or str(info.message_id) if info.message_id else None, + tenant_id=info.metadata.get("tenant_id"), + user_id=info.metadata.get("user_id"), + ) + + labels = { + "tenant_id": info.metadata.get("tenant_id", ""), + "app_id": info.metadata.get("app_id", ""), + "model_provider": info.metadata.get("ls_provider", ""), + "model_name": info.metadata.get("ls_model_name", ""), + } + self._exporter.increment_counter(EnterpriseTelemetryCounter.TOKENS, info.total_tokens, labels) + invoke_from = info.metadata.get("from_source", "") + self._exporter.increment_counter( + EnterpriseTelemetryCounter.REQUESTS, + 1, + {**labels, "type": "message", "status": info.metadata.get("status", ""), "invoke_from": invoke_from}, + ) + + if info.start_time and info.end_time: + duration = (info.end_time - info.start_time).total_seconds() + self._exporter.record_histogram(EnterpriseTelemetryHistogram.MESSAGE_DURATION, duration, labels) + + if info.gen_ai_server_time_to_first_token is not None: + self._exporter.record_histogram( + EnterpriseTelemetryHistogram.MESSAGE_TTFT, info.gen_ai_server_time_to_first_token, labels + ) + + if info.error: + self._exporter.increment_counter(EnterpriseTelemetryCounter.ERRORS, 1, {**labels, "type": "message"}) + + def _tool_trace(self, info: ToolTraceInfo) -> None: + attrs = self._common_attrs(info) + attrs.update( + { + "gen_ai.tool.name": info.tool_name, + "dify.tool.time_cost": info.time_cost, + "dify.tool.error": info.error, + } + ) + + if self._exporter.include_content: + attrs["dify.tool.inputs"] = self._maybe_json(info.tool_inputs) + attrs["dify.tool.outputs"] = info.tool_outputs + attrs["dify.tool.parameters"] 
    def _moderation_trace(self, info: ModerationTraceInfo) -> None:
        """Emit a metric-only event plus a request counter for moderation."""
        attrs = self._common_attrs(info)
        attrs.update(
            {
                "dify.moderation.flagged": info.flagged,
                "dify.moderation.action": info.action,
                "dify.moderation.preset_response": info.preset_response,
            }
        )

        # Content capture off -> emit an opaque storage reference instead.
        if self._exporter.include_content:
            attrs["dify.moderation.query"] = info.query
        else:
            attrs["dify.moderation.query"] = f"ref:message_id={info.message_id}"

        emit_metric_only_event(
            event_name="dify.moderation.check",
            attributes=attrs,
            tenant_id=info.metadata.get("tenant_id"),
            user_id=info.metadata.get("user_id"),
        )

        labels = {"tenant_id": info.metadata.get("tenant_id", ""), "app_id": info.metadata.get("app_id", "")}
        self._exporter.increment_counter(EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": "moderation"})

    def _suggested_question_trace(self, info: SuggestedQuestionTraceInfo) -> None:
        """Emit a metric-only event plus a request counter for suggestions."""
        attrs = self._common_attrs(info)
        attrs.update(
            {
                "gen_ai.usage.total_tokens": info.total_tokens,
                "dify.suggested_question.status": info.status,
                "dify.suggested_question.error": info.error,
                "gen_ai.provider.name": info.model_provider,
                "gen_ai.request.model": info.model_id,
                "dify.suggested_question.count": len(info.suggested_question),
            }
        )

        if self._exporter.include_content:
            attrs["dify.suggested_question.questions"] = self._maybe_json(info.suggested_question)
        else:
            attrs["dify.suggested_question.questions"] = f"ref:message_id={info.message_id}"

        emit_metric_only_event(
            event_name="dify.suggested_question.generation",
            attributes=attrs,
            tenant_id=info.metadata.get("tenant_id"),
            user_id=info.metadata.get("user_id"),
        )

        labels = {"tenant_id": info.metadata.get("tenant_id", ""), "app_id": info.metadata.get("app_id", "")}
        self._exporter.increment_counter(
            EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": "suggested_question"}
        )

    def _dataset_retrieval_trace(self, info: DatasetRetrievalTraceInfo) -> None:
        """Emit a metric-only event plus counters for a dataset retrieval.

        De-duplicates dataset ids/names (order-preserving) across retrieved
        documents and increments one DATASET_RETRIEVALS counter per dataset.
        """
        attrs = self._common_attrs(info)
        attrs["dify.dataset.error"] = info.error

        docs = info.documents or []
        dataset_ids: list[str] = []
        dataset_names: list[str] = []
        structured_docs: list[dict] = []
        for doc in docs:
            # Non-dict documents contribute a placeholder entry with None
            # fields but still count toward document_count.
            meta = doc.get("metadata", {}) if isinstance(doc, dict) else {}
            did = meta.get("dataset_id")
            dname = meta.get("dataset_name")
            if did and did not in dataset_ids:
                dataset_ids.append(did)
            if dname and dname not in dataset_names:
                dataset_names.append(dname)
            structured_docs.append(
                {
                    "dataset_id": did,
                    "document_id": meta.get("document_id"),
                    "segment_id": meta.get("segment_id"),
                    "score": meta.get("score"),
                }
            )

        attrs["dify.dataset.ids"] = self._maybe_json(dataset_ids)
        attrs["dify.dataset.names"] = self._maybe_json(dataset_names)
        attrs["dify.retrieval.document_count"] = len(docs)

        # embedding_models: mapping of dataset_id -> {provider, model},
        # pre-resolved upstream and passed via metadata.
        embedding_models = info.metadata.get("embedding_models") or {}
        if isinstance(embedding_models, dict):
            providers: list[str] = []
            models: list[str] = []
            for ds_info in embedding_models.values():
                if isinstance(ds_info, dict):
                    p = ds_info.get("embedding_model_provider", "")
                    m = ds_info.get("embedding_model", "")
                    if p and p not in providers:
                        providers.append(p)
                    if m and m not in models:
                        models.append(m)
            attrs["dify.dataset.embedding_providers"] = self._maybe_json(providers)
            attrs["dify.dataset.embedding_models"] = self._maybe_json(models)

        if self._exporter.include_content:
            attrs["dify.retrieval.query"] = self._maybe_json(info.inputs)
            attrs["dify.dataset.documents"] = self._maybe_json(structured_docs)
        else:
            ref = f"ref:message_id={info.message_id}"
            attrs["dify.retrieval.query"] = ref
            attrs["dify.dataset.documents"] = ref

        emit_metric_only_event(
            event_name="dify.dataset.retrieval",
            attributes=attrs,
            tenant_id=info.metadata.get("tenant_id"),
            user_id=info.metadata.get("user_id"),
        )

        labels = {"tenant_id": info.metadata.get("tenant_id", ""), "app_id": info.metadata.get("app_id", "")}
        self._exporter.increment_counter(
            EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": "dataset_retrieval"}
        )

        for did in dataset_ids:
            self._exporter.increment_counter(
                EnterpriseTelemetryCounter.DATASET_RETRIEVALS, 1, {**labels, "dataset_id": did}
            )

    def _generate_name_trace(self, info: GenerateNameTraceInfo) -> None:
        """Emit a metric-only event plus a request counter for name generation."""
        attrs = self._common_attrs(info)
        attrs["dify.conversation.id"] = info.conversation_id

        if self._exporter.include_content:
            attrs["dify.generate_name.inputs"] = self._maybe_json(info.inputs)
            attrs["dify.generate_name.outputs"] = self._maybe_json(info.outputs)
        else:
            ref = f"ref:conversation_id={info.conversation_id}"
            attrs["dify.generate_name.inputs"] = ref
            attrs["dify.generate_name.outputs"] = ref

        emit_metric_only_event(
            event_name="dify.generate_name.execution",
            attributes=attrs,
            tenant_id=info.tenant_id,
            user_id=info.metadata.get("user_id"),
        )

        labels = {"tenant_id": info.tenant_id, "app_id": info.metadata.get("app_id", "")}
        self._exporter.increment_counter(EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": "generate_name"})
self._exporter.increment_counter(EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": "generate_name"}) diff --git a/api/enterprise/telemetry/entities/__init__.py b/api/enterprise/telemetry/entities/__init__.py new file mode 100644 index 0000000000..7c45bbfcbe --- /dev/null +++ b/api/enterprise/telemetry/entities/__init__.py @@ -0,0 +1,30 @@ +from enum import StrEnum + + +class EnterpriseTelemetrySpan(StrEnum): + WORKFLOW_RUN = "dify.workflow.run" + NODE_EXECUTION = "dify.node.execution" + DRAFT_NODE_EXECUTION = "dify.node.execution.draft" + + +class EnterpriseTelemetryCounter(StrEnum): + TOKENS = "tokens" + REQUESTS = "requests" + ERRORS = "errors" + FEEDBACK = "feedback" + DATASET_RETRIEVALS = "dataset_retrievals" + + +class EnterpriseTelemetryHistogram(StrEnum): + WORKFLOW_DURATION = "workflow_duration" + NODE_DURATION = "node_duration" + MESSAGE_DURATION = "message_duration" + MESSAGE_TTFT = "message_ttft" + TOOL_DURATION = "tool_duration" + + +__all__ = [ + "EnterpriseTelemetryCounter", + "EnterpriseTelemetryHistogram", + "EnterpriseTelemetrySpan", +] diff --git a/api/enterprise/telemetry/event_handlers.py b/api/enterprise/telemetry/event_handlers.py new file mode 100644 index 0000000000..837a93a5bf --- /dev/null +++ b/api/enterprise/telemetry/event_handlers.py @@ -0,0 +1,146 @@ +"""Blinker signal handlers for enterprise telemetry. + +Registered at import time via ``@signal.connect`` decorators. +Import must happen during ``ext_enterprise_telemetry.init_app()`` to ensure handlers fire. 
+""" + +from __future__ import annotations + +import logging + +from enterprise.telemetry.entities import EnterpriseTelemetryCounter +from enterprise.telemetry.telemetry_log import emit_metric_only_event +from events.app_event import app_was_created, app_was_deleted, app_was_updated +from events.feedback_event import feedback_was_created + +logger = logging.getLogger(__name__) + +__all__ = [ + "_handle_app_created", + "_handle_app_deleted", + "_handle_app_updated", + "_handle_feedback_created", +] + + +@app_was_created.connect +def _handle_app_created(sender: object, **kwargs: object) -> None: + from extensions.ext_enterprise_telemetry import get_enterprise_exporter + + exporter = get_enterprise_exporter() + if not exporter: + return + + attrs = { + "dify.app.id": getattr(sender, "id", None), + "dify.tenant_id": getattr(sender, "tenant_id", None), + "dify.app.mode": getattr(sender, "mode", None), + } + + emit_metric_only_event( + event_name="dify.app.created", + attributes=attrs, + tenant_id=str(getattr(sender, "tenant_id", "") or ""), + ) + exporter.increment_counter( + EnterpriseTelemetryCounter.REQUESTS, + 1, + { + "type": "app.created", + "tenant_id": getattr(sender, "tenant_id", ""), + }, + ) + + +@app_was_deleted.connect +def _handle_app_deleted(sender: object, **kwargs: object) -> None: + from extensions.ext_enterprise_telemetry import get_enterprise_exporter + + exporter = get_enterprise_exporter() + if not exporter: + return + + attrs = { + "dify.app.id": getattr(sender, "id", None), + "dify.tenant_id": getattr(sender, "tenant_id", None), + } + + emit_metric_only_event( + event_name="dify.app.deleted", + attributes=attrs, + tenant_id=str(getattr(sender, "tenant_id", "") or ""), + ) + exporter.increment_counter( + EnterpriseTelemetryCounter.REQUESTS, + 1, + { + "type": "app.deleted", + "tenant_id": getattr(sender, "tenant_id", ""), + }, + ) + + +@app_was_updated.connect +def _handle_app_updated(sender: object, **kwargs: object) -> None: + from 
extensions.ext_enterprise_telemetry import get_enterprise_exporter + + exporter = get_enterprise_exporter() + if not exporter: + return + + attrs = { + "dify.app.id": getattr(sender, "id", None), + "dify.tenant_id": getattr(sender, "tenant_id", None), + } + + emit_metric_only_event( + event_name="dify.app.updated", + attributes=attrs, + tenant_id=str(getattr(sender, "tenant_id", "") or ""), + ) + exporter.increment_counter( + EnterpriseTelemetryCounter.REQUESTS, + 1, + { + "type": "app.updated", + "tenant_id": getattr(sender, "tenant_id", ""), + }, + ) + + +@feedback_was_created.connect +def _handle_feedback_created(sender: object, **kwargs: object) -> None: + from extensions.ext_enterprise_telemetry import get_enterprise_exporter + + exporter = get_enterprise_exporter() + if not exporter: + return + + include_content = exporter.include_content + attrs: dict = { + "dify.message.id": getattr(sender, "message_id", None), + "dify.tenant_id": kwargs.get("tenant_id"), + "dify.app_id": getattr(sender, "app_id", None), + "dify.conversation.id": getattr(sender, "conversation_id", None), + "gen_ai.user.id": getattr(sender, "from_end_user_id", None) or getattr(sender, "from_account_id", None), + "dify.feedback.rating": getattr(sender, "rating", None), + "dify.feedback.from_source": getattr(sender, "from_source", None), + } + if include_content: + attrs["dify.feedback.content"] = getattr(sender, "content", None) + + emit_metric_only_event( + event_name="dify.feedback.created", + attributes=attrs, + tenant_id=str(kwargs.get("tenant_id", "") or ""), + user_id=str(getattr(sender, "from_end_user_id", None) or getattr(sender, "from_account_id", None) or ""), + ) + exporter.increment_counter( + EnterpriseTelemetryCounter.FEEDBACK, + 1, + { + "tenant_id": str(kwargs.get("tenant_id", "")), + "app_id": str(getattr(sender, "app_id", "")), + "rating": str(getattr(sender, "rating", "")), + }, + ) diff --git a/api/enterprise/telemetry/exporter.py b/api/enterprise/telemetry/exporter.py new 
file mode 100644 index 0000000000..1ad2307ea7 --- /dev/null +++ b/api/enterprise/telemetry/exporter.py @@ -0,0 +1,200 @@ +"""Enterprise OTEL exporter — shared by EnterpriseOtelTrace, event handlers, and direct instrumentation. + +Uses its own TracerProvider (configurable sampling, separate from ext_otel.py infrastructure) +and the global MeterProvider (shared with ext_otel.py — both target the same collector). + +Initialized once during Flask extension init (single-threaded via ext_enterprise_telemetry.py). +Accessed via ``ext_enterprise_telemetry.get_enterprise_exporter()`` from any thread/process. +""" + +import logging +import socket +import uuid +from datetime import datetime +from typing import cast + +from opentelemetry import metrics, trace +from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.resources import Resource +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio +from opentelemetry.semconv.resource import ResourceAttributes +from opentelemetry.trace import SpanContext, TraceFlags + +from configs import dify_config +from enterprise.telemetry.entities import EnterpriseTelemetryCounter, EnterpriseTelemetryHistogram +from enterprise.telemetry.id_generator import ( + CorrelationIdGenerator, + compute_deterministic_span_id, + set_correlation_id, + set_span_id_source, +) + +logger = logging.getLogger(__name__) + + +def is_enterprise_telemetry_enabled() -> bool: + return bool(dify_config.ENTERPRISE_ENABLED and dify_config.ENTERPRISE_TELEMETRY_ENABLED) + + +def _parse_otlp_headers(raw: str) -> dict[str, str]: + """Parse ``key=value,key2=value2`` into a dict.""" + if not raw: + return {} + headers: dict[str, str] = {} + for pair in raw.split(","): + if "=" not in pair: + continue + k, v = pair.split("=", 1) + headers[k.strip()] = v.strip() + return headers + + +def 
_datetime_to_ns(dt: datetime) -> int: + """Convert a datetime to nanoseconds since epoch (OTEL convention).""" + return int(dt.timestamp() * 1_000_000_000) + + +class EnterpriseExporter: + """Shared OTEL exporter for all enterprise telemetry. + + ``export_span`` creates spans with optional real timestamps, deterministic + span/trace IDs, and cross-workflow parent linking. + ``increment_counter`` / ``record_histogram`` emit OTEL metrics at 100% accuracy. + """ + + def __init__(self, config: object) -> None: + endpoint: str = getattr(config, "ENTERPRISE_OTLP_ENDPOINT", "") + headers_raw: str = getattr(config, "ENTERPRISE_OTLP_HEADERS", "") + service_name: str = getattr(config, "ENTERPRISE_SERVICE_NAME", "dify") + sampling_rate: float = getattr(config, "ENTERPRISE_OTEL_SAMPLING_RATE", 1.0) + self.include_content: bool = getattr(config, "ENTERPRISE_INCLUDE_CONTENT", True) + + resource = Resource( + attributes={ + ResourceAttributes.SERVICE_NAME: service_name, + ResourceAttributes.HOST_NAME: socket.gethostname(), + } + ) + sampler = ParentBasedTraceIdRatio(sampling_rate) + id_generator = CorrelationIdGenerator() + self._tracer_provider = TracerProvider(resource=resource, sampler=sampler, id_generator=id_generator) + + headers = _parse_otlp_headers(headers_raw) + trace_endpoint = f"{endpoint}/v1/traces" if endpoint else "" + self._tracer_provider.add_span_processor( + BatchSpanProcessor(OTLPSpanExporter(endpoint=trace_endpoint or None, headers=headers)) + ) + self._tracer = self._tracer_provider.get_tracer("dify.enterprise") + + meter = metrics.get_meter("dify.enterprise") + self._counters = { + EnterpriseTelemetryCounter.TOKENS: meter.create_counter("dify.tokens.total", unit="{token}"), + EnterpriseTelemetryCounter.REQUESTS: meter.create_counter("dify.requests.total", unit="{request}"), + EnterpriseTelemetryCounter.ERRORS: meter.create_counter("dify.errors.total", unit="{error}"), + EnterpriseTelemetryCounter.FEEDBACK: meter.create_counter("dify.feedback.total", 
unit="{feedback}"), + EnterpriseTelemetryCounter.DATASET_RETRIEVALS: meter.create_counter( + "dify.dataset.retrievals.total", unit="{retrieval}" + ), + } + self._histograms = { + EnterpriseTelemetryHistogram.WORKFLOW_DURATION: meter.create_histogram("dify.workflow.duration", unit="s"), + EnterpriseTelemetryHistogram.NODE_DURATION: meter.create_histogram("dify.node.duration", unit="s"), + EnterpriseTelemetryHistogram.MESSAGE_DURATION: meter.create_histogram("dify.message.duration", unit="s"), + EnterpriseTelemetryHistogram.MESSAGE_TTFT: meter.create_histogram( + "dify.message.time_to_first_token", unit="s" + ), + EnterpriseTelemetryHistogram.TOOL_DURATION: meter.create_histogram("dify.tool.duration", unit="s"), + } + + def export_span( + self, + name: str, + attributes: dict, + correlation_id: str | None = None, + span_id_source: str | None = None, + start_time: datetime | None = None, + end_time: datetime | None = None, + trace_correlation_override: str | None = None, + parent_span_id_source: str | None = None, + ) -> None: + """Export an OTEL span with optional deterministic IDs and real timestamps. + + Args: + name: Span operation name. + attributes: Span attributes dict. + correlation_id: Source for trace_id derivation (groups spans in one trace). + span_id_source: Source for deterministic span_id (e.g. workflow_run_id or node_execution_id). + start_time: Real span start time. When None, uses current time. + end_time: Real span end time. When None, span ends immediately. + trace_correlation_override: Override trace_id source (for cross-workflow linking). + When set, trace_id is derived from this instead of ``correlation_id``. + parent_span_id_source: Override parent span_id source (for cross-workflow linking). + When set, parent span_id is derived from this value. When None and + ``correlation_id`` is set, parent is the workflow root span. 
+ """ + effective_trace_correlation = trace_correlation_override or correlation_id + set_correlation_id(effective_trace_correlation) + set_span_id_source(span_id_source) + + try: + parent_context = None + # A span is the "root" of its correlation group when span_id_source == correlation_id + # (i.e. a workflow root span). All other spans are children. + if parent_span_id_source: + # Cross-workflow linking: parent is an explicit span (e.g. tool node in outer workflow) + parent_span_id = compute_deterministic_span_id(parent_span_id_source) + parent_trace_id = ( + cast(int, uuid.UUID(effective_trace_correlation).int) if effective_trace_correlation else 0 + ) + if parent_trace_id: + parent_span_context = SpanContext( + trace_id=parent_trace_id, + span_id=parent_span_id, + is_remote=True, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + ) + parent_context = trace.set_span_in_context(trace.NonRecordingSpan(parent_span_context)) + elif correlation_id and correlation_id != span_id_source: + # Child span: parent is the correlation-group root (workflow root span) + parent_span_id = compute_deterministic_span_id(correlation_id) + parent_trace_id = cast(int, uuid.UUID(effective_trace_correlation or correlation_id).int) + parent_span_context = SpanContext( + trace_id=parent_trace_id, + span_id=parent_span_id, + is_remote=True, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + ) + parent_context = trace.set_span_in_context(trace.NonRecordingSpan(parent_span_context)) + + span_kwargs: dict = {"context": parent_context} + if start_time is not None: + span_kwargs["start_time"] = _datetime_to_ns(start_time) + if end_time is not None: + span_kwargs["end_on_exit"] = False + + with self._tracer.start_as_current_span(name, **span_kwargs) as span: + for key, value in attributes.items(): + if value is not None: + span.set_attribute(key, value) + if end_time is not None: + span.end(end_time=_datetime_to_ns(end_time)) + except Exception: + logger.exception("Failed to export span %s", name) + 
finally: + set_correlation_id(None) + set_span_id_source(None) + + def increment_counter(self, name: EnterpriseTelemetryCounter, value: int, labels: dict) -> None: + counter = self._counters.get(name) + if counter: + counter.add(value, labels) + + def record_histogram(self, name: EnterpriseTelemetryHistogram, value: float, labels: dict) -> None: + histogram = self._histograms.get(name) + if histogram: + histogram.record(value, labels) + + def shutdown(self) -> None: + self._tracer_provider.shutdown() diff --git a/api/enterprise/telemetry/id_generator.py b/api/enterprise/telemetry/id_generator.py new file mode 100644 index 0000000000..8f4760cac2 --- /dev/null +++ b/api/enterprise/telemetry/id_generator.py @@ -0,0 +1,76 @@ +"""Custom OTEL ID Generator for correlation-based trace/span ID derivation. + +Uses contextvars for thread-safe correlation_id -> trace_id mapping. +When a span_id_source is set, the span_id is derived deterministically +from that value, enabling any span to reference another as parent +without depending on span creation order. +""" + +import random +import uuid +from contextvars import ContextVar +from typing import cast + +from opentelemetry.sdk.trace.id_generator import IdGenerator + +_correlation_id_context: ContextVar[str | None] = ContextVar("correlation_id", default=None) +_span_id_source_context: ContextVar[str | None] = ContextVar("span_id_source", default=None) + + +def set_correlation_id(correlation_id: str | None) -> None: + _correlation_id_context.set(correlation_id) + + +def get_correlation_id() -> str | None: + return _correlation_id_context.get() + + +def set_span_id_source(source_id: str | None) -> None: + """Set the source for deterministic span_id generation. + + When set, ``generate_span_id()`` derives the span_id from this value + (lower 64 bits of the UUID). Pass the ``workflow_run_id`` for workflow + root spans or ``node_execution_id`` for node spans. 
+ """ + _span_id_source_context.set(source_id) + + +def compute_deterministic_span_id(source_id: str) -> int: + """Derive a deterministic span_id from any UUID string. + + Uses the lower 64 bits of the UUID, guaranteeing non-zero output + (OTEL requires span_id != 0). + """ + span_id = cast(int, uuid.UUID(source_id).int) & ((1 << 64) - 1) + return span_id if span_id != 0 else 1 + + +class CorrelationIdGenerator(IdGenerator): + """ID generator that derives trace_id and optionally span_id from context. + + - trace_id: always derived from correlation_id (groups all spans in one trace) + - span_id: derived from span_id_source when set (enables deterministic + parent-child linking), otherwise random + """ + + def generate_trace_id(self) -> int: + correlation_id = _correlation_id_context.get() + if correlation_id: + try: + return cast(int, uuid.UUID(correlation_id).int) + except (ValueError, AttributeError): + pass + return random.getrandbits(128) + + def generate_span_id(self) -> int: + source = _span_id_source_context.get() + if source: + try: + return compute_deterministic_span_id(source) + except (ValueError, AttributeError): + pass + + span_id = random.getrandbits(64) + while span_id == 0: + span_id = random.getrandbits(64) + return span_id diff --git a/api/enterprise/telemetry/telemetry_log.py b/api/enterprise/telemetry/telemetry_log.py new file mode 100644 index 0000000000..aa44ad59b9 --- /dev/null +++ b/api/enterprise/telemetry/telemetry_log.py @@ -0,0 +1,91 @@ +"""Structured-log emitter for enterprise telemetry events. + +Emits structured JSON log lines correlated with OTEL traces via trace_id. +Picked up by ``StructuredJSONFormatter`` → stdout/Loki/Elastic. +""" + +from __future__ import annotations + +import logging +import uuid +from typing import Any + +logger = logging.getLogger("dify.telemetry") + + +def compute_trace_id_hex(uuid_str: str | None) -> str: + """Convert a business UUID string to a 32-hex OTEL-compatible trace_id. 
+ + Returns empty string when *uuid_str* is ``None`` or invalid. + """ + if not uuid_str: + return "" + try: + return f"{uuid.UUID(uuid_str).int:032x}" + except (ValueError, AttributeError): + return "" + + +def emit_telemetry_log( + *, + event_name: str, + attributes: dict[str, Any], + signal: str = "metric_only", + trace_id_source: str | None = None, + tenant_id: str | None = None, + user_id: str | None = None, +) -> None: + """Emit a structured log line for a telemetry event. + + Parameters + ---------- + event_name: + Canonical event name, e.g. ``"dify.workflow.run"``. + attributes: + All event-specific attributes (already built by the caller). + signal: + ``"metric_only"`` for events with no span, ``"span_detail"`` + for detail logs accompanying a slim span. + trace_id_source: + A UUID string (e.g. ``workflow_run_id``) used to derive a 32-hex + trace_id for cross-signal correlation. + tenant_id: + Tenant identifier (for the ``IdentityContextFilter``). + user_id: + User identifier (for the ``IdentityContextFilter``). 
+ """ + attrs = { + "dify.event.name": event_name, + "dify.event.signal": signal, + **attributes, + } + + extra: dict[str, Any] = {"attributes": attrs} + + trace_id_hex = compute_trace_id_hex(trace_id_source) + if trace_id_hex: + extra["trace_id"] = trace_id_hex + if tenant_id: + extra["tenant_id"] = tenant_id + if user_id: + extra["user_id"] = user_id + + logger.info("telemetry.%s", signal, extra=extra) + + +def emit_metric_only_event( + *, + event_name: str, + attributes: dict[str, Any], + trace_id_source: str | None = None, + tenant_id: str | None = None, + user_id: str | None = None, +) -> None: + emit_telemetry_log( + event_name=event_name, + attributes=attributes, + signal="metric_only", + trace_id_source=trace_id_source, + tenant_id=tenant_id, + user_id=user_id, + ) diff --git a/api/events/app_event.py b/api/events/app_event.py index f2ce71bbbb..3a0094b77c 100644 --- a/api/events/app_event.py +++ b/api/events/app_event.py @@ -3,6 +3,12 @@ from blinker import signal # sender: app app_was_created = signal("app-was-created") +# sender: app +app_was_deleted = signal("app-was-deleted") + +# sender: app +app_was_updated = signal("app-was-updated") + # sender: app, kwargs: app_model_config app_model_config_was_updated = signal("app-model-config-was-updated") diff --git a/api/events/feedback_event.py b/api/events/feedback_event.py new file mode 100644 index 0000000000..8d91d5c5e5 --- /dev/null +++ b/api/events/feedback_event.py @@ -0,0 +1,4 @@ +from blinker import signal + +# sender: MessageFeedback, kwargs: tenant_id +feedback_was_created = signal("feedback-was-created") diff --git a/api/extensions/ext_enterprise_telemetry.py b/api/extensions/ext_enterprise_telemetry.py new file mode 100644 index 0000000000..f2c68c479c --- /dev/null +++ b/api/extensions/ext_enterprise_telemetry.py @@ -0,0 +1,48 @@ +"""Flask extension for enterprise telemetry lifecycle management. 
+ +Initializes the EnterpriseExporter singleton during ``create_app()`` (single-threaded), +registers blinker event handlers, and hooks atexit for graceful shutdown. + +Skipped entirely when ``ENTERPRISE_ENABLED`` and ``ENTERPRISE_TELEMETRY_ENABLED`` are false (``is_enabled()`` gate). +""" + +from __future__ import annotations + +import atexit +import logging +from typing import TYPE_CHECKING + +from configs import dify_config + +if TYPE_CHECKING: + from dify_app import DifyApp + from enterprise.telemetry.exporter import EnterpriseExporter + +logger = logging.getLogger(__name__) + +_exporter: EnterpriseExporter | None = None + + +def is_enabled() -> bool: + return bool(dify_config.ENTERPRISE_ENABLED and dify_config.ENTERPRISE_TELEMETRY_ENABLED) + + +def init_app(app: DifyApp) -> None: + global _exporter + + if not is_enabled(): + return + + from enterprise.telemetry.exporter import EnterpriseExporter + + _exporter = EnterpriseExporter(dify_config) + atexit.register(_exporter.shutdown) + + # Import to trigger @signal.connect decorator registration + import enterprise.telemetry.event_handlers # noqa: F401 # type: ignore[reportUnusedImport] + + logger.info("Enterprise telemetry initialized") + + +def get_enterprise_exporter() -> EnterpriseExporter | None: + return _exporter diff --git a/api/services/app_service.py b/api/services/app_service.py index 02ebfbace0..163475ded0 100644 --- a/api/services/app_service.py +++ b/api/services/app_service.py @@ -14,7 +14,7 @@ from core.model_runtime.entities.model_entities import ModelPropertyKey, ModelTy from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel from core.tools.tool_manager import ToolManager from core.tools.utils.configuration import ToolParameterConfigurationManager -from events.app_event import app_was_created +from events.app_event import app_was_created, app_was_deleted from extensions.ext_database import db from libs.datetime_utils import naive_utc_now from libs.login 
import current_user @@ -341,6 +341,8 @@ class AppService: db.session.delete(app) db.session.commit() + app_was_deleted.send(app) + # clean up web app settings if FeatureService.get_system_features().webapp_auth.enabled: EnterpriseService.WebAppAuth.cleanup_webapp(app.id) diff --git a/api/services/enterprise/account_deletion_sync.py b/api/services/enterprise/account_deletion_sync.py index f8f8189891..c7ff42894d 100644 --- a/api/services/enterprise/account_deletion_sync.py +++ b/api/services/enterprise/account_deletion_sync.py @@ -81,7 +81,7 @@ def sync_workspace_member_removal(workspace_id: str, member_id: str, *, source: bool: True if task was queued (or skipped in community), False if queueing failed """ if not dify_config.ENTERPRISE_ENABLED: - return True + return True return _queue_task(workspace_id=workspace_id, member_id=member_id, source=source) @@ -101,7 +101,7 @@ def sync_account_deletion(account_id: str, *, source: str) -> bool: bool: True if all tasks were queued (or skipped in community), False if any queueing failed """ if not dify_config.ENTERPRISE_ENABLED: - return True + return True # Fetch all workspaces the account belongs to workspace_joins = db.session.query(TenantAccountJoin).filter_by(account_id=account_id).all() diff --git a/api/services/message_service.py b/api/services/message_service.py index e1a256e64d..fe04659883 100644 --- a/api/services/message_service.py +++ b/api/services/message_service.py @@ -10,6 +10,7 @@ from core.model_runtime.entities.model_entities import ModelType from core.ops.entities.trace_entity import TraceTaskName from core.ops.ops_trace_manager import TraceQueueManager, TraceTask from core.ops.utils import measure_time +from events.feedback_event import feedback_was_created from extensions.ext_database import db from libs.infinite_scroll_pagination import InfiniteScrollPagination from models import Account @@ -179,6 +180,9 @@ class MessageService: db.session.commit() + if feedback and rating: + 
feedback_was_created.send(feedback, tenant_id=app_model.tenant_id) + return feedback @classmethod diff --git a/api/services/workflow_service.py b/api/services/workflow_service.py index b45a167b73..c711f22ef4 100644 --- a/api/services/workflow_service.py +++ b/api/services/workflow_service.py @@ -27,6 +27,7 @@ from core.workflow.nodes.start.entities import StartNodeData from core.workflow.runtime import VariablePool from core.workflow.system_variable import SystemVariable from core.workflow.workflow_entry import WorkflowEntry +from enterprise.telemetry.draft_trace import enqueue_draft_node_execution_trace from enums.cloud_plan import CloudPlan from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated from extensions.ext_database import db @@ -647,6 +648,7 @@ class WorkflowService: node_config = draft_workflow.get_node_config_by_id(node_id) node_type = Workflow.get_node_type_from_node_config(node_config) node_data = node_config.get("data", {}) + workflow_execution_id: str | None = None if node_type.is_start_node: with Session(bind=db.engine) as session, session.begin(): draft_var_srv = WorkflowDraftVariableService(session) @@ -672,10 +674,13 @@ class WorkflowService: node_type=node_type, conversation_id=conversation_id, ) + workflow_execution_id = variable_pool.system_variables.workflow_execution_id else: + workflow_execution_id = str(uuid.uuid4()) + system_variable = SystemVariable(workflow_execution_id=workflow_execution_id) variable_pool = VariablePool( - system_variables=SystemVariable.empty(), + system_variables=system_variable, user_inputs=user_inputs, environment_variables=draft_workflow.environment_variables, conversation_variables=[], @@ -729,6 +734,13 @@ class WorkflowService: with Session(db.engine) as session: outputs = workflow_node_execution.load_full_outputs(session, storage) + enqueue_draft_node_execution_trace( + execution=workflow_node_execution, + outputs=outputs, + workflow_execution_id=workflow_execution_id, + 
user_id=account.id, + ) + with Session(bind=db.engine) as session, session.begin(): draft_var_saver = DraftVariableSaver( session=session, @@ -784,19 +796,20 @@ class WorkflowService: Returns: WorkflowNodeExecution: The execution result """ + created_at = naive_utc_now() node, node_run_result, run_succeeded, error = self._execute_node_safely(invoke_node_fn) + finished_at = naive_utc_now() - # Create base node execution node_execution = WorkflowNodeExecution( - id=str(uuid.uuid4()), + id=node.execution_id or str(uuid.uuid4()), workflow_id="", # Single-step execution has no workflow ID index=1, node_id=node_id, node_type=node.node_type, title=node.title, elapsed_time=time.perf_counter() - start_at, - created_at=naive_utc_now(), - finished_at=naive_utc_now(), + created_at=created_at, + finished_at=finished_at, ) # Populate execution result data diff --git a/api/tasks/ops_trace_task.py b/api/tasks/ops_trace_task.py index 72e3b42ca7..f7d08d6207 100644 --- a/api/tasks/ops_trace_task.py +++ b/api/tasks/ops_trace_task.py @@ -39,12 +39,25 @@ def process_trace_tasks(file_info): trace_info["documents"] = [Document.model_validate(doc) for doc in trace_info["documents"]] try: + trace_type = trace_info_info_map.get(trace_info_type) + if trace_type: + trace_info = trace_type(**trace_info) + + # process enterprise trace separately + from enterprise.telemetry.exporter import is_enterprise_telemetry_enabled + + if is_enterprise_telemetry_enabled(): + from enterprise.telemetry.enterprise_trace import EnterpriseOtelTrace + + try: + EnterpriseOtelTrace().trace(trace_info) + except Exception: + logger.warning("Enterprise trace failed for app_id: %s", app_id, exc_info=True) + if trace_instance: with current_app.app_context(): - trace_type = trace_info_info_map.get(trace_info_type) - if trace_type: - trace_info = trace_type(**trace_info) trace_instance.trace(trace_info) + logger.info("Processing trace tasks success, app_id: %s", app_id) except Exception as e: 
logger.info("error:\n\n\n%s\n\n\n\n", e)