diff --git a/api/core/ops/ops_trace_manager.py b/api/core/ops/ops_trace_manager.py index 38beda77d0..a4014111ed 100644 --- a/api/core/ops/ops_trace_manager.py +++ b/api/core/ops/ops_trace_manager.py @@ -1285,7 +1285,7 @@ class TraceQueueManager: self.trace_instance = OpsTraceManager.get_ops_trace_instance(app_id) self.flask_app = current_app._get_current_object() # type: ignore - from core.telemetry import is_enterprise_telemetry_enabled + from core.telemetry.gateway import is_enterprise_telemetry_enabled self._enterprise_telemetry_enabled = is_enterprise_telemetry_enabled() if trace_manager_timer is None: diff --git a/api/core/telemetry/__init__.py b/api/core/telemetry/__init__.py index b1d25403a0..3cb62bbbbf 100644 --- a/api/core/telemetry/__init__.py +++ b/api/core/telemetry/__init__.py @@ -1,10 +1,7 @@ -"""Community telemetry helpers. +"""Telemetry facade. -Provides ``emit()`` which enqueues trace events into the CE trace pipeline -(``TraceQueueManager`` → ``ops_trace`` Celery queue → Langfuse / LangSmith / etc.). - -Enterprise-only traces (node execution, draft node execution, prompt generation) -are silently dropped when enterprise telemetry is disabled. +Thin public API for emitting telemetry events. All routing logic +lives in ``core.telemetry.gateway`` which is shared by both CE and EE. """ from __future__ import annotations @@ -13,48 +10,34 @@ from typing import TYPE_CHECKING from core.ops.entities.trace_entity import TraceTaskName from core.telemetry.events import TelemetryContext, TelemetryEvent +from core.telemetry.gateway import TRACE_TASK_TO_CASE +from core.telemetry.gateway import emit as gateway_emit if TYPE_CHECKING: from core.ops.ops_trace_manager import TraceQueueManager -_ENTERPRISE_ONLY_TRACES: frozenset[TraceTaskName] = frozenset( - { - TraceTaskName.DRAFT_NODE_EXECUTION_TRACE, - TraceTaskName.NODE_EXECUTION_TRACE, - TraceTaskName.PROMPT_GENERATION_TRACE, - } -) - - -def _is_enterprise_telemetry_enabled() -> bool: - try: - from enterprise.telemetry.exporter import is_enterprise_telemetry_enabled - - return is_enterprise_telemetry_enabled() - except Exception: - return False - def emit(event: TelemetryEvent, trace_manager: TraceQueueManager | None = None) -> None: - from core.ops.ops_trace_manager import TraceQueueManager as LocalTraceQueueManager - from core.ops.ops_trace_manager import TraceTask + """Emit a telemetry event. - if event.name in _ENTERPRISE_ONLY_TRACES and not _is_enterprise_telemetry_enabled(): + Translates the ``TelemetryEvent`` (keyed by ``TraceTaskName``) into a + ``TelemetryCase`` and delegates to ``core.telemetry.gateway.emit()``. + """ + case = TRACE_TASK_TO_CASE.get(event.name) + if case is None: return - queue_manager = trace_manager or LocalTraceQueueManager( - app_id=event.context.app_id, - user_id=event.context.user_id, - ) - queue_manager.add_trace_task(TraceTask(event.name, **event.payload)) + context: dict[str, object] = { + "tenant_id": event.context.tenant_id, + "user_id": event.context.user_id, + "app_id": event.context.app_id, + } + gateway_emit(case, context, event.payload, trace_manager) -is_enterprise_telemetry_enabled = _is_enterprise_telemetry_enabled - __all__ = [ "TelemetryContext", "TelemetryEvent", "TraceTaskName", "emit", - "is_enterprise_telemetry_enabled", ] diff --git a/api/core/telemetry/gateway.py b/api/core/telemetry/gateway.py new file mode 100644 index 0000000000..14c3495ea3 --- /dev/null +++ b/api/core/telemetry/gateway.py @@ -0,0 +1,206 @@ +"""Telemetry gateway — single routing layer for all editions. + +Maps ``TelemetryCase`` → ``CaseRoute`` and dispatches events to either +the CE/EE trace pipeline (``TraceQueueManager``) or the enterprise-only +metric/log Celery queue. + +This module lives in ``core/`` so both CE and EE share one routing table +and one ``emit()`` entry point. No separate enterprise gateway module is +needed — enterprise-specific dispatch (Celery task, payload offloading) +is handled here behind lazy imports that no-op in CE. +""" + +from __future__ import annotations + +import json +import logging +import uuid +from typing import TYPE_CHECKING, Any + +from core.ops.entities.trace_entity import TraceTaskName +from enterprise.telemetry.contracts import CaseRoute, SignalType, TelemetryCase, TelemetryEnvelope +from extensions.ext_storage import storage + +if TYPE_CHECKING: + from core.ops.ops_trace_manager import TraceQueueManager + +logger = logging.getLogger(__name__) + +PAYLOAD_SIZE_THRESHOLD_BYTES = 1 * 1024 * 1024 + +# --------------------------------------------------------------------------- +# Routing table — authoritative mapping for all editions +# --------------------------------------------------------------------------- + +CASE_TO_TRACE_TASK: dict[TelemetryCase, TraceTaskName] = { + TelemetryCase.WORKFLOW_RUN: TraceTaskName.WORKFLOW_TRACE, + TelemetryCase.MESSAGE_RUN: TraceTaskName.MESSAGE_TRACE, + TelemetryCase.NODE_EXECUTION: TraceTaskName.NODE_EXECUTION_TRACE, + TelemetryCase.DRAFT_NODE_EXECUTION: TraceTaskName.DRAFT_NODE_EXECUTION_TRACE, + TelemetryCase.PROMPT_GENERATION: TraceTaskName.PROMPT_GENERATION_TRACE, + TelemetryCase.TOOL_EXECUTION: TraceTaskName.TOOL_TRACE, + TelemetryCase.MODERATION_CHECK: TraceTaskName.MODERATION_TRACE, + TelemetryCase.SUGGESTED_QUESTION: TraceTaskName.SUGGESTED_QUESTION_TRACE, + TelemetryCase.DATASET_RETRIEVAL: TraceTaskName.DATASET_RETRIEVAL_TRACE, + TelemetryCase.GENERATE_NAME: TraceTaskName.GENERATE_NAME_TRACE, +} + +TRACE_TASK_TO_CASE: dict[TraceTaskName, TelemetryCase] = {v: k for k, v in CASE_TO_TRACE_TASK.items()} + +CASE_ROUTING: dict[TelemetryCase, CaseRoute] = { + # TRACE — CE-eligible (flow in both CE and EE) + TelemetryCase.WORKFLOW_RUN: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True), + TelemetryCase.MESSAGE_RUN: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True), + TelemetryCase.TOOL_EXECUTION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True), + TelemetryCase.MODERATION_CHECK: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True), + TelemetryCase.SUGGESTED_QUESTION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True), + TelemetryCase.DATASET_RETRIEVAL: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True), + TelemetryCase.GENERATE_NAME: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True), + # TRACE — enterprise-only + TelemetryCase.NODE_EXECUTION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=False), + TelemetryCase.DRAFT_NODE_EXECUTION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=False), + TelemetryCase.PROMPT_GENERATION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=False), + # METRIC_LOG — enterprise-only (signal-driven, not trace) + TelemetryCase.APP_CREATED: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False), + TelemetryCase.APP_UPDATED: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False), + TelemetryCase.APP_DELETED: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False), + TelemetryCase.FEEDBACK_CREATED: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False), +} + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def is_enterprise_telemetry_enabled() -> bool: + try: + from enterprise.telemetry.exporter import is_enterprise_telemetry_enabled + + return is_enterprise_telemetry_enabled() + except Exception: + return False + + +def _handle_payload_sizing( + payload: dict[str, Any], + tenant_id: str, + event_id: str, +) -> tuple[dict[str, Any], str | None]: + """Inline or offload payload based on size. + + Returns ``(payload_for_envelope, storage_key | None)``. Payloads + exceeding ``PAYLOAD_SIZE_THRESHOLD_BYTES`` are written to object + storage and replaced with an empty dict in the envelope. + """ + try: + payload_json = json.dumps(payload) + payload_size = len(payload_json.encode("utf-8")) + except (TypeError, ValueError): + logger.warning("Failed to serialize payload for sizing: event_id=%s", event_id) + return payload, None + + if payload_size <= PAYLOAD_SIZE_THRESHOLD_BYTES: + return payload, None + + storage_key = f"telemetry/{tenant_id}/{event_id}.json" + try: + storage.save(storage_key, payload_json.encode("utf-8")) + logger.debug("Stored large payload to storage: key=%s, size=%d", storage_key, payload_size) + return {}, storage_key + except Exception: + logger.warning("Failed to store large payload, inlining instead: event_id=%s", event_id, exc_info=True) + return payload, None + + +# --------------------------------------------------------------------------- +# Public API +# --------------------------------------------------------------------------- + + +def emit( + case: TelemetryCase, + context: dict[str, Any], + payload: dict[str, Any], + trace_manager: TraceQueueManager | None = None, +) -> None: + """Route a telemetry event to the correct pipeline. + + TRACE events are enqueued into ``TraceQueueManager`` (works in both CE + and EE). Enterprise-only traces are silently dropped when EE is + disabled. + + METRIC_LOG events are dispatched to the enterprise Celery queue; + silently dropped when enterprise telemetry is unavailable. + """ + route = CASE_ROUTING.get(case) + if route is None: + logger.warning("Unknown telemetry case: %s, dropping event", case) + return + + if not route.ce_eligible and not is_enterprise_telemetry_enabled(): + logger.debug("Dropping EE-only event: case=%s (EE disabled)", case) + return + + if route.signal_type is SignalType.TRACE: + _emit_trace(case, context, payload, trace_manager) + else: + _emit_metric_log(case, context, payload) + + +def _emit_trace( + case: TelemetryCase, + context: dict[str, Any], + payload: dict[str, Any], + trace_manager: TraceQueueManager | None, +) -> None: + from core.ops.ops_trace_manager import TraceQueueManager as LocalTraceQueueManager + from core.ops.ops_trace_manager import TraceTask + + trace_task_name = CASE_TO_TRACE_TASK.get(case) + if trace_task_name is None: + logger.warning("No TraceTaskName mapping for case: %s", case) + return + + queue_manager = trace_manager or LocalTraceQueueManager( + app_id=context.get("app_id"), + user_id=context.get("user_id"), + ) + queue_manager.add_trace_task(TraceTask(trace_task_name, **payload)) + logger.debug("Enqueued trace task: case=%s, app_id=%s", case, context.get("app_id")) + + +def _emit_metric_log( + case: TelemetryCase, + context: dict[str, Any], + payload: dict[str, Any], +) -> None: + """Build envelope and dispatch to enterprise Celery queue. + + No-ops when the enterprise telemetry task is not importable (CE mode). + """ + try: + from tasks.enterprise_telemetry_task import process_enterprise_telemetry + except ImportError: + logger.debug("Enterprise metric/log dispatch unavailable, dropping: case=%s", case) + return + + tenant_id = context.get("tenant_id", "") + event_id = str(uuid.uuid4()) + + payload_for_envelope, payload_ref = _handle_payload_sizing(payload, tenant_id, event_id) + + envelope = TelemetryEnvelope( + case=case, + tenant_id=tenant_id, + event_id=event_id, + payload=payload_for_envelope, + metadata={"payload_ref": payload_ref} if payload_ref else None, + ) + + process_enterprise_telemetry.delay(envelope.model_dump_json()) + logger.debug( + "Enqueued metric/log event: case=%s, tenant_id=%s, event_id=%s", + case, + tenant_id, + event_id, + ) diff --git a/api/enterprise/telemetry/event_handlers.py b/api/enterprise/telemetry/event_handlers.py index 38276c7f0f..0a69d43774 100644 --- a/api/enterprise/telemetry/event_handlers.py +++ b/api/enterprise/telemetry/event_handlers.py @@ -1,13 +1,14 @@ """Blinker signal handlers for enterprise telemetry. Registered at import time via ``@signal.connect`` decorators. -Import must happen during ``ext_enterprise_telemetry.init_app()`` to ensure handlers fire. +Import must happen during ``ext_enterprise_telemetry.init_app()`` to +ensure handlers fire. Each handler delegates to ``core.telemetry.gateway`` +which handles routing, EE-gating, and dispatch. """ from __future__ import annotations import logging -import uuid from events.app_event import app_was_created, app_was_deleted, app_was_updated from events.feedback_event import feedback_was_created @@ -24,107 +25,60 @@ __all__ = [ @app_was_created.connect def _handle_app_created(sender: object, **kwargs: object) -> None: - from enterprise.telemetry.contracts import TelemetryCase, TelemetryEnvelope - from extensions.ext_enterprise_telemetry import get_enterprise_exporter - from tasks.enterprise_telemetry_task import process_enterprise_telemetry + from core.telemetry.gateway import emit as gateway_emit + from enterprise.telemetry.contracts import TelemetryCase - exporter = get_enterprise_exporter() - if not exporter: - return - - tenant_id = str(getattr(sender, "tenant_id", "") or "") - payload = { - "app_id": getattr(sender, "id", None), - "mode": getattr(sender, "mode", None), - } - - envelope = TelemetryEnvelope( + gateway_emit( case=TelemetryCase.APP_CREATED, - tenant_id=tenant_id, - event_id=str(uuid.uuid4()), - payload=payload, + context={"tenant_id": str(getattr(sender, "tenant_id", "") or "")}, + payload={ + "app_id": getattr(sender, "id", None), + "mode": getattr(sender, "mode", None), + }, ) - process_enterprise_telemetry.delay(envelope.model_dump_json()) - @app_was_deleted.connect def _handle_app_deleted(sender: object, **kwargs: object) -> None: - from enterprise.telemetry.contracts import TelemetryCase, TelemetryEnvelope - from extensions.ext_enterprise_telemetry import get_enterprise_exporter - from tasks.enterprise_telemetry_task import process_enterprise_telemetry + from core.telemetry.gateway import emit as gateway_emit + from enterprise.telemetry.contracts import TelemetryCase - exporter = get_enterprise_exporter() - if not exporter: - return - - tenant_id = str(getattr(sender, "tenant_id", "") or "") - payload = { - "app_id": getattr(sender, "id", None), - } - - envelope = TelemetryEnvelope( + gateway_emit( case=TelemetryCase.APP_DELETED, - tenant_id=tenant_id, - event_id=str(uuid.uuid4()), - payload=payload, + context={"tenant_id": str(getattr(sender, "tenant_id", "") or "")}, + payload={"app_id": getattr(sender, "id", None)}, ) - process_enterprise_telemetry.delay(envelope.model_dump_json()) - @app_was_updated.connect def _handle_app_updated(sender: object, **kwargs: object) -> None: - from enterprise.telemetry.contracts import TelemetryCase, TelemetryEnvelope - from extensions.ext_enterprise_telemetry import get_enterprise_exporter - from tasks.enterprise_telemetry_task import process_enterprise_telemetry + from core.telemetry.gateway import emit as gateway_emit + from enterprise.telemetry.contracts import TelemetryCase - exporter = get_enterprise_exporter() - if not exporter: - return - - tenant_id = str(getattr(sender, "tenant_id", "") or "") - payload = { - "app_id": getattr(sender, "id", None), - } - - envelope = TelemetryEnvelope( + gateway_emit( case=TelemetryCase.APP_UPDATED, - tenant_id=tenant_id, - event_id=str(uuid.uuid4()), - payload=payload, + context={"tenant_id": str(getattr(sender, "tenant_id", "") or "")}, + payload={"app_id": getattr(sender, "id", None)}, ) - process_enterprise_telemetry.delay(envelope.model_dump_json()) - @feedback_was_created.connect def _handle_feedback_created(sender: object, **kwargs: object) -> None: - from enterprise.telemetry.contracts import TelemetryCase, TelemetryEnvelope - from extensions.ext_enterprise_telemetry import get_enterprise_exporter - from tasks.enterprise_telemetry_task import process_enterprise_telemetry - - exporter = get_enterprise_exporter() - if not exporter: - return + from core.telemetry.gateway import emit as gateway_emit + from enterprise.telemetry.contracts import TelemetryCase tenant_id = str(kwargs.get("tenant_id", "") or "") - payload = { - "message_id": getattr(sender, "message_id", None), - "app_id": getattr(sender, "app_id", None), - "conversation_id": getattr(sender, "conversation_id", None), - "from_end_user_id": getattr(sender, "from_end_user_id", None), - "from_account_id": getattr(sender, "from_account_id", None), - "rating": getattr(sender, "rating", None), - "from_source": getattr(sender, "from_source", None), - "content": getattr(sender, "content", None), - } - - envelope = TelemetryEnvelope( + gateway_emit( case=TelemetryCase.FEEDBACK_CREATED, - tenant_id=tenant_id, - event_id=str(uuid.uuid4()), - payload=payload, + context={"tenant_id": tenant_id}, + payload={ + "message_id": getattr(sender, "message_id", None), + "app_id": getattr(sender, "app_id", None), + "conversation_id": getattr(sender, "conversation_id", None), + "from_end_user_id": getattr(sender, "from_end_user_id", None), + "from_account_id": getattr(sender, "from_account_id", None), + "rating": getattr(sender, "rating", None), + "from_source": getattr(sender, "from_source", None), + "content": getattr(sender, "content", None), + }, ) - - process_enterprise_telemetry.delay(envelope.model_dump_json()) diff --git a/api/enterprise/telemetry/gateway.py b/api/enterprise/telemetry/gateway.py deleted file mode 100644 index 73886e327e..0000000000 --- a/api/enterprise/telemetry/gateway.py +++ /dev/null @@ -1,199 +0,0 @@ -"""Telemetry gateway routing and dispatch. - -Maps ``TelemetryCase`` → ``CaseRoute`` (signal type + CE eligibility) -and dispatches events to either the trace pipeline or the metric/log -Celery queue. - -Singleton lifecycle is managed by ``ext_enterprise_telemetry.init_app()`` -which creates the instance during single-threaded Flask app startup. -Access via ``ext_enterprise_telemetry.get_gateway()``. -""" - -from __future__ import annotations - -import json -import logging -import uuid -from typing import TYPE_CHECKING, Any - -from core.ops.entities.trace_entity import TraceTaskName -from enterprise.telemetry.contracts import CaseRoute, SignalType, TelemetryCase, TelemetryEnvelope -from extensions.ext_storage import storage - -if TYPE_CHECKING: - from core.ops.ops_trace_manager import TraceQueueManager - -logger = logging.getLogger(__name__) - -PAYLOAD_SIZE_THRESHOLD_BYTES = 1 * 1024 * 1024 - -CASE_TO_TRACE_TASK: dict[TelemetryCase, TraceTaskName] = { - TelemetryCase.WORKFLOW_RUN: TraceTaskName.WORKFLOW_TRACE, - TelemetryCase.MESSAGE_RUN: TraceTaskName.MESSAGE_TRACE, - TelemetryCase.NODE_EXECUTION: TraceTaskName.NODE_EXECUTION_TRACE, - TelemetryCase.DRAFT_NODE_EXECUTION: TraceTaskName.DRAFT_NODE_EXECUTION_TRACE, - TelemetryCase.PROMPT_GENERATION: TraceTaskName.PROMPT_GENERATION_TRACE, -} - -CASE_ROUTING: dict[TelemetryCase, CaseRoute] = { - TelemetryCase.WORKFLOW_RUN: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True), - TelemetryCase.MESSAGE_RUN: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True), - TelemetryCase.NODE_EXECUTION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=False), - TelemetryCase.DRAFT_NODE_EXECUTION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=False), - TelemetryCase.PROMPT_GENERATION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=False), - TelemetryCase.APP_CREATED: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False), - TelemetryCase.APP_UPDATED: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False), - TelemetryCase.APP_DELETED: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False), - TelemetryCase.FEEDBACK_CREATED: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False), - TelemetryCase.TOOL_EXECUTION: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False), - TelemetryCase.MODERATION_CHECK: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False), - TelemetryCase.SUGGESTED_QUESTION: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False), - TelemetryCase.DATASET_RETRIEVAL: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False), - TelemetryCase.GENERATE_NAME: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False), -} - - -def _is_enterprise_telemetry_enabled() -> bool: - try: - from enterprise.telemetry.exporter import is_enterprise_telemetry_enabled - - return is_enterprise_telemetry_enabled() - except Exception: - return False - - -def _should_drop_ee_only_event(route: CaseRoute) -> bool: - """Return True when the event is enterprise-only and EE telemetry is disabled.""" - return not route.ce_eligible and not _is_enterprise_telemetry_enabled() - - -class TelemetryGateway: - """Routes telemetry events to the trace pipeline or the metric/log Celery queue. - - Stateless — instantiated once during ``ext_enterprise_telemetry.init_app()`` - and shared for the lifetime of the process. - """ - - def emit( - self, - case: TelemetryCase, - context: dict[str, Any], - payload: dict[str, Any], - trace_manager: TraceQueueManager | None = None, - ) -> None: - route = CASE_ROUTING.get(case) - if route is None: - logger.warning("Unknown telemetry case: %s, dropping event", case) - return - - if _should_drop_ee_only_event(route): - logger.debug("Dropping EE-only event: case=%s (EE disabled)", case) - return - - logger.debug( - "Gateway routing: case=%s, signal_type=%s, ce_eligible=%s", - case, - route.signal_type, - route.ce_eligible, - ) - - if route.signal_type is SignalType.TRACE: - self._emit_trace(case, context, payload, route, trace_manager) - else: - self._emit_metric_log(case, context, payload) - - def _emit_trace( - self, - case: TelemetryCase, - context: dict[str, Any], - payload: dict[str, Any], - route: CaseRoute, - trace_manager: TraceQueueManager | None, - ) -> None: - from core.ops.ops_trace_manager import TraceQueueManager as LocalTraceQueueManager - from core.ops.ops_trace_manager import TraceTask - - trace_task_name = CASE_TO_TRACE_TASK.get(case) - if trace_task_name is None: - logger.warning("No TraceTaskName mapping for case: %s", case) - return - - queue_manager = trace_manager or LocalTraceQueueManager( - app_id=context.get("app_id"), - user_id=context.get("user_id"), - ) - - queue_manager.add_trace_task(TraceTask(trace_task_name, **payload)) - logger.debug("Enqueued trace task: case=%s, app_id=%s", case, context.get("app_id")) - - def _emit_metric_log( - self, - case: TelemetryCase, - context: dict[str, Any], - payload: dict[str, Any], - ) -> None: - from tasks.enterprise_telemetry_task import process_enterprise_telemetry - - tenant_id = context.get("tenant_id", "") - event_id = str(uuid.uuid4()) - - payload_for_envelope, payload_ref = self._handle_payload_sizing(payload, tenant_id, event_id) - - envelope = TelemetryEnvelope( - case=case, - tenant_id=tenant_id, - event_id=event_id, - payload=payload_for_envelope, - metadata={"payload_ref": payload_ref} if payload_ref else None, - ) - - process_enterprise_telemetry.delay(envelope.model_dump_json()) - logger.debug( - "Enqueued metric/log event: case=%s, tenant_id=%s, event_id=%s", - case, - tenant_id, - event_id, - ) - - def _handle_payload_sizing( - self, - payload: dict[str, Any], - tenant_id: str, - event_id: str, - ) -> tuple[dict[str, Any], str | None]: - try: - payload_json = json.dumps(payload) - payload_size = len(payload_json.encode("utf-8")) - except (TypeError, ValueError): - logger.warning("Failed to serialize payload for sizing: event_id=%s", event_id) - return payload, None - - if payload_size <= PAYLOAD_SIZE_THRESHOLD_BYTES: - return payload, None - - storage_key = f"telemetry/{tenant_id}/{event_id}.json" - try: - storage.save(storage_key, payload_json.encode("utf-8")) - logger.debug("Stored large payload to storage: key=%s, size=%d", storage_key, payload_size) - return {}, storage_key - except Exception: - logger.warning("Failed to store large payload, inlining instead: event_id=%s", event_id, exc_info=True) - return payload, None - - -def emit( - case: TelemetryCase, - context: dict[str, Any], - payload: dict[str, Any], - trace_manager: TraceQueueManager | None = None, -) -> None: - """Module-level convenience wrapper. - - Fetches the gateway singleton from the extension; no-ops when - enterprise telemetry is disabled (gateway is ``None``). - """ - from extensions.ext_enterprise_telemetry import get_gateway - - gateway = get_gateway() - if gateway is not None: - gateway.emit(case, context, payload, trace_manager) diff --git a/api/extensions/ext_enterprise_telemetry.py b/api/extensions/ext_enterprise_telemetry.py index a24e14efa7..f785c00ae0 100644 --- a/api/extensions/ext_enterprise_telemetry.py +++ b/api/extensions/ext_enterprise_telemetry.py @@ -1,8 +1,8 @@ """Flask extension for enterprise telemetry lifecycle management. -Initializes the EnterpriseExporter and TelemetryGateway singletons during -``create_app()`` (single-threaded), registers blinker event handlers, -and hooks atexit for graceful shutdown. +Initializes the EnterpriseExporter singleton during ``create_app()`` +(single-threaded), registers blinker event handlers, and hooks atexit +for graceful shutdown. Skipped entirely when ``ENTERPRISE_ENABLED`` and ``ENTERPRISE_TELEMETRY_ENABLED`` are false (``is_enabled()`` gate). @@ -19,12 +19,10 @@ from configs import dify_config if TYPE_CHECKING: from dify_app import DifyApp from enterprise.telemetry.exporter import EnterpriseExporter - from enterprise.telemetry.gateway import TelemetryGateway logger = logging.getLogger(__name__) _exporter: EnterpriseExporter | None = None -_gateway: TelemetryGateway | None = None def is_enabled() -> bool: @@ -32,16 +30,14 @@ def is_enabled() -> bool: def init_app(app: DifyApp) -> None: - global _exporter, _gateway + global _exporter if not is_enabled(): return from enterprise.telemetry.exporter import EnterpriseExporter - from enterprise.telemetry.gateway import TelemetryGateway _exporter = EnterpriseExporter(dify_config) - _gateway = TelemetryGateway() atexit.register(_exporter.shutdown) # Import to trigger @signal.connect decorator registration @@ -52,7 +48,3 @@ def init_app(app: DifyApp) -> None: def get_enterprise_exporter() -> EnterpriseExporter | None: return _exporter - - -def get_gateway() -> TelemetryGateway | None: - return _gateway diff --git a/api/tests/unit_tests/core/ops/test_trace_queue_manager.py b/api/tests/unit_tests/core/ops/test_trace_queue_manager.py index 25adda21ec..44a58ab902 100644 --- a/api/tests/unit_tests/core/ops/test_trace_queue_manager.py +++ b/api/tests/unit_tests/core/ops/test_trace_queue_manager.py @@ -39,7 +39,7 @@ def trace_queue_manager_and_task(monkeypatch): class StubTraceQueueManager: def __init__(self, app_id=None): self.app_id = app_id - from core.telemetry import is_enterprise_telemetry_enabled + from core.telemetry.gateway import is_enterprise_telemetry_enabled self._enterprise_telemetry_enabled = is_enterprise_telemetry_enabled() self.trace_instance = StubOpsTraceManager.get_ops_trace_instance(app_id) @@ -87,7 +87,7 @@ class TestTraceQueueManagerTelemetryGuard: trace_task = TraceTask(trace_type=TraceTaskName.WORKFLOW_TRACE) with ( - patch("core.telemetry.is_enterprise_telemetry_enabled", return_value=False), + patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False), patch("core.ops.ops_trace_manager.OpsTraceManager.get_ops_trace_instance", return_value=None), patch("core.ops.ops_trace_manager.trace_manager_queue", mock_queue), ): @@ -109,7 +109,7 @@ class TestTraceQueueManagerTelemetryGuard: trace_task = TraceTask(trace_type=TraceTaskName.WORKFLOW_TRACE) with ( - patch("core.telemetry.is_enterprise_telemetry_enabled", return_value=True), + patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True), patch("core.ops.ops_trace_manager.OpsTraceManager.get_ops_trace_instance", return_value=None), patch("core.ops.ops_trace_manager.trace_manager_queue", mock_queue), ): @@ -135,7 +135,7 @@ class TestTraceQueueManagerTelemetryGuard: trace_task = TraceTask(trace_type=TraceTaskName.WORKFLOW_TRACE) with ( - patch("core.telemetry.is_enterprise_telemetry_enabled", return_value=False), + patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False), patch( "core.ops.ops_trace_manager.OpsTraceManager.get_ops_trace_instance", return_value=mock_trace_instance ), @@ -163,7 +163,7 @@ class TestTraceQueueManagerTelemetryGuard: trace_task = TraceTask(trace_type=TraceTaskName.WORKFLOW_TRACE) with ( - patch("core.telemetry.is_enterprise_telemetry_enabled", return_value=True), + patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True), patch( "core.ops.ops_trace_manager.OpsTraceManager.get_ops_trace_instance", return_value=mock_trace_instance ), @@ -189,7 +189,7 @@ class TestTraceQueueManagerTelemetryGuard: trace_task = TraceTask(trace_type=TraceTaskName.WORKFLOW_TRACE) with ( - patch("core.telemetry.is_enterprise_telemetry_enabled", return_value=True), + patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True), patch("core.ops.ops_trace_manager.OpsTraceManager.get_ops_trace_instance", return_value=None), patch("core.ops.ops_trace_manager.trace_manager_queue", mock_queue), ): diff --git a/api/tests/unit_tests/core/telemetry/test_facade.py b/api/tests/unit_tests/core/telemetry/test_facade.py index ae7b2ce818..64c2f6a971 100644 --- a/api/tests/unit_tests/core/telemetry/test_facade.py +++ b/api/tests/unit_tests/core/telemetry/test_facade.py @@ -53,7 +53,7 @@ def telemetry_test_setup(monkeypatch): class TestTelemetryEmit: - @patch("core.telemetry._is_enterprise_telemetry_enabled", return_value=True) + @patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True) def test_emit_enterprise_trace_creates_trace_task(self, _mock_ee, telemetry_test_setup): emit_fn, mock_queue = telemetry_test_setup @@ -107,7 +107,7 @@ class TestTelemetryEmit: mock_queue.put.assert_not_called() - @patch("core.telemetry._is_enterprise_telemetry_enabled", return_value=True) + @patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True) def test_emit_all_enterprise_only_traces_allowed_when_ee_enabled(self, _mock_ee, telemetry_test_setup): emit_fn, mock_queue = telemetry_test_setup @@ -136,7 +136,7 @@ class TestTelemetryEmit: called_task = mock_queue.put.call_args[0][0] assert called_task.trace_type == trace_name - @patch("core.telemetry._is_enterprise_telemetry_enabled", return_value=True) + @patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True) def test_emit_passes_name_directly_to_trace_task(self, _mock_ee, telemetry_test_setup): emit_fn, mock_queue = telemetry_test_setup @@ -157,7 +157,7 @@ class TestTelemetryEmit: assert called_task.trace_type == TraceTaskName.DRAFT_NODE_EXECUTION_TRACE assert isinstance(called_task.trace_type, TraceTaskName) - @patch("core.telemetry._is_enterprise_telemetry_enabled", return_value=True) + @patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True) def test_emit_with_provided_trace_manager(self, _mock_ee, telemetry_test_setup): emit_fn, mock_queue = telemetry_test_setup diff --git a/api/tests/unit_tests/core/telemetry/test_gateway_integration.py b/api/tests/unit_tests/core/telemetry/test_gateway_integration.py index 076cd00879..536d4374d6 100644 --- a/api/tests/unit_tests/core/telemetry/test_gateway_integration.py +++ b/api/tests/unit_tests/core/telemetry/test_gateway_integration.py @@ -5,14 +5,13 @@ from unittest.mock import MagicMock, patch import pytest -from core.telemetry import is_enterprise_telemetry_enabled +from core.telemetry.gateway import emit, is_enterprise_telemetry_enabled from enterprise.telemetry.contracts import TelemetryCase -from enterprise.telemetry.gateway import TelemetryGateway class TestTelemetryCoreExports: def test_is_enterprise_telemetry_enabled_exported(self) -> None: - from core.telemetry import is_enterprise_telemetry_enabled as exported_func + from core.telemetry.gateway import is_enterprise_telemetry_enabled as exported_func assert callable(exported_func) @@ -38,10 +37,6 @@ def mock_ops_trace_manager(): class TestGatewayIntegrationTraceRouting: - @pytest.fixture - def gateway(self) -> TelemetryGateway: - return TelemetryGateway() - @pytest.fixture def mock_trace_manager(self) -> MagicMock: return MagicMock() @@ -49,68 +44,61 @@ class TestGatewayIntegrationTraceRouting: @pytest.mark.usefixtures("mock_ops_trace_manager") def test_ce_eligible_trace_routed_to_trace_manager( self, - gateway: TelemetryGateway, mock_trace_manager: MagicMock, ) -> None: - with patch("enterprise.telemetry.gateway._is_enterprise_telemetry_enabled", return_value=True): + with patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True): context = {"app_id": "app-123", "user_id": "user-456", "tenant_id": "tenant-789"} payload = {"workflow_run_id": "run-abc"} - gateway.emit(TelemetryCase.WORKFLOW_RUN, context, payload, mock_trace_manager) + emit(TelemetryCase.WORKFLOW_RUN, context, payload, mock_trace_manager) mock_trace_manager.add_trace_task.assert_called_once() @pytest.mark.usefixtures("mock_ops_trace_manager") def test_ce_eligible_trace_routed_when_ee_disabled( self, - gateway: TelemetryGateway, mock_trace_manager: MagicMock, ) -> None: - with patch("enterprise.telemetry.gateway._is_enterprise_telemetry_enabled", return_value=False): + with patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False): context = {"app_id": "app-123", "user_id": "user-456"} payload = {"workflow_run_id": "run-abc"} - gateway.emit(TelemetryCase.WORKFLOW_RUN, context, payload, mock_trace_manager) + emit(TelemetryCase.WORKFLOW_RUN, context, payload, mock_trace_manager) mock_trace_manager.add_trace_task.assert_called_once() @pytest.mark.usefixtures("mock_ops_trace_manager") def test_enterprise_only_trace_dropped_when_ee_disabled( self, - gateway: TelemetryGateway, mock_trace_manager: MagicMock, ) -> None: - with patch("enterprise.telemetry.gateway._is_enterprise_telemetry_enabled", return_value=False): + with patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False): context = {"app_id": "app-123", "user_id": "user-456"} payload = {"node_id": "node-abc"} - gateway.emit(TelemetryCase.NODE_EXECUTION, context, payload, mock_trace_manager) + emit(TelemetryCase.NODE_EXECUTION, context, payload, mock_trace_manager) mock_trace_manager.add_trace_task.assert_not_called() @pytest.mark.usefixtures("mock_ops_trace_manager") def test_enterprise_only_trace_routed_when_ee_enabled( self, - gateway: TelemetryGateway, mock_trace_manager: MagicMock, ) -> None: - with patch("enterprise.telemetry.gateway._is_enterprise_telemetry_enabled", return_value=True): + with patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True): context = {"app_id": "app-123", "user_id": "user-456"} payload = {"node_id": "node-abc"} - gateway.emit(TelemetryCase.NODE_EXECUTION, context, payload, mock_trace_manager) + emit(TelemetryCase.NODE_EXECUTION, context, payload, mock_trace_manager) mock_trace_manager.add_trace_task.assert_called_once() class TestGatewayIntegrationMetricRouting: - @pytest.fixture - def gateway(self) -> TelemetryGateway: - return TelemetryGateway() - + @patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True) def test_metric_case_routes_to_celery_task( self, - gateway: TelemetryGateway, + _mock_ee_enabled: MagicMock, ) -> None: from enterprise.telemetry.contracts import TelemetryEnvelope @@ -118,7 +106,7 @@ class TestGatewayIntegrationMetricRouting: context = {"tenant_id": "tenant-123"} payload = {"app_id": "app-abc", "name": "My App"} - gateway.emit(TelemetryCase.APP_CREATED, context, payload) + emit(TelemetryCase.APP_CREATED, context, payload) mock_delay.assert_called_once() envelope_json = mock_delay.call_args[0][0] @@ -127,46 +115,36 @@ class TestGatewayIntegrationMetricRouting: assert envelope.tenant_id == "tenant-123" assert envelope.payload["app_id"] == "app-abc" - def test_tool_execution_metric_routed( + @pytest.mark.usefixtures("mock_ops_trace_manager") + @patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True) + def test_tool_execution_trace_routed( self, - gateway: TelemetryGateway, + _mock_ee_enabled: MagicMock, ) -> None: - from enterprise.telemetry.contracts import TelemetryEnvelope + mock_trace_manager = MagicMock() + context = {"tenant_id": "tenant-123", "app_id": "app-123"} + payload = {"tool_name": "test_tool", "tool_inputs": {}, "tool_outputs": "result"} - with patch("tasks.enterprise_telemetry_task.process_enterprise_telemetry.delay") as mock_delay: - context = {"tenant_id": "tenant-123", "app_id": "app-123"} - payload = {"tool_name": "test_tool", "tool_inputs": {}, "tool_outputs": "result"} + emit(TelemetryCase.TOOL_EXECUTION, context, payload, mock_trace_manager) - gateway.emit(TelemetryCase.TOOL_EXECUTION, context, payload) + mock_trace_manager.add_trace_task.assert_called_once() - mock_delay.assert_called_once() - envelope_json = mock_delay.call_args[0][0] - envelope = TelemetryEnvelope.model_validate_json(envelope_json) - assert envelope.case == TelemetryCase.TOOL_EXECUTION - - def test_moderation_check_metric_routed( + @pytest.mark.usefixtures("mock_ops_trace_manager") + @patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True) + def test_moderation_check_trace_routed( self, - gateway: TelemetryGateway, + _mock_ee_enabled: MagicMock, ) -> None: - from enterprise.telemetry.contracts import TelemetryEnvelope + mock_trace_manager = MagicMock() + context = {"tenant_id": "tenant-123", "app_id": "app-123"} + payload = {"message_id": "msg-123", "moderation_result": {"flagged": False}} - with patch("tasks.enterprise_telemetry_task.process_enterprise_telemetry.delay") as mock_delay: - context = {"tenant_id": "tenant-123", "app_id": "app-123"} - payload = {"message_id": "msg-123", "moderation_result": {"flagged": False}} + emit(TelemetryCase.MODERATION_CHECK, context, payload, mock_trace_manager) - gateway.emit(TelemetryCase.MODERATION_CHECK, context, payload) - - mock_delay.assert_called_once() - envelope_json = mock_delay.call_args[0][0] - envelope = TelemetryEnvelope.model_validate_json(envelope_json) - assert envelope.case == TelemetryCase.MODERATION_CHECK + mock_trace_manager.add_trace_task.assert_called_once() class TestGatewayIntegrationCEEligibility: - @pytest.fixture - def gateway(self) -> TelemetryGateway: - return TelemetryGateway() - @pytest.fixture def mock_trace_manager(self) -> MagicMock: return MagicMock() @@ -174,70 +152,65 @@ class TestGatewayIntegrationCEEligibility: @pytest.mark.usefixtures("mock_ops_trace_manager") def test_workflow_run_is_ce_eligible( self, - gateway: TelemetryGateway, mock_trace_manager: MagicMock, ) -> None: - with patch("enterprise.telemetry.gateway._is_enterprise_telemetry_enabled", return_value=False): + with patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False): context = {"app_id": "app-123", "user_id": "user-456"} payload = {"workflow_run_id": "run-abc"} - gateway.emit(TelemetryCase.WORKFLOW_RUN, context, payload, mock_trace_manager) + emit(TelemetryCase.WORKFLOW_RUN, context, payload, mock_trace_manager) mock_trace_manager.add_trace_task.assert_called_once() @pytest.mark.usefixtures("mock_ops_trace_manager") def test_message_run_is_ce_eligible( self, - gateway: TelemetryGateway, mock_trace_manager: MagicMock, ) -> None: - with patch("enterprise.telemetry.gateway._is_enterprise_telemetry_enabled", return_value=False): + with patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False): context = {"app_id": "app-123", "user_id": "user-456"} payload = {"message_id": "msg-abc", "conversation_id": "conv-123"} - gateway.emit(TelemetryCase.MESSAGE_RUN, context, payload, mock_trace_manager) + emit(TelemetryCase.MESSAGE_RUN, context, payload, mock_trace_manager) mock_trace_manager.add_trace_task.assert_called_once() @pytest.mark.usefixtures("mock_ops_trace_manager") def test_node_execution_not_ce_eligible( self, - gateway: TelemetryGateway, mock_trace_manager: MagicMock, ) -> None: - with patch("enterprise.telemetry.gateway._is_enterprise_telemetry_enabled", return_value=False): + with patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False): context = {"app_id": "app-123", "user_id": "user-456"} payload = {"node_id": "node-abc"} - gateway.emit(TelemetryCase.NODE_EXECUTION, context, payload, mock_trace_manager) + emit(TelemetryCase.NODE_EXECUTION, context, payload, mock_trace_manager) mock_trace_manager.add_trace_task.assert_not_called() @pytest.mark.usefixtures("mock_ops_trace_manager") def test_draft_node_execution_not_ce_eligible( self, - gateway: TelemetryGateway, mock_trace_manager: MagicMock, ) -> None: - with patch("enterprise.telemetry.gateway._is_enterprise_telemetry_enabled", return_value=False): + with patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False): context = {"app_id": "app-123", "user_id": "user-456"} payload = {"node_execution_data": {}} - gateway.emit(TelemetryCase.DRAFT_NODE_EXECUTION, context, payload, mock_trace_manager) + emit(TelemetryCase.DRAFT_NODE_EXECUTION, context, payload, mock_trace_manager) mock_trace_manager.add_trace_task.assert_not_called() @pytest.mark.usefixtures("mock_ops_trace_manager") def test_prompt_generation_not_ce_eligible( self, - gateway: TelemetryGateway, mock_trace_manager: MagicMock, ) -> None: - with patch("enterprise.telemetry.gateway._is_enterprise_telemetry_enabled", return_value=False): + with patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False): context = {"app_id": "app-123", "user_id": "user-456", "tenant_id": "tenant-789"} payload = {"operation_type": "generate", "instruction": "test"} - gateway.emit(TelemetryCase.PROMPT_GENERATION, context, payload, mock_trace_manager) + emit(TelemetryCase.PROMPT_GENERATION, context, payload, mock_trace_manager) mock_trace_manager.add_trace_task.assert_not_called() diff --git a/api/tests/unit_tests/enterprise/telemetry/test_contracts.py b/api/tests/unit_tests/enterprise/telemetry/test_contracts.py index ce2162c5f4..d81847b2f0 100644 --- a/api/tests/unit_tests/enterprise/telemetry/test_contracts.py +++ b/api/tests/unit_tests/enterprise/telemetry/test_contracts.py @@ -5,8 +5,8 @@ from __future__ import annotations import pytest from pydantic import ValidationError +from core.telemetry.gateway import CASE_ROUTING from enterprise.telemetry.contracts import CaseRoute, SignalType, TelemetryCase, TelemetryEnvelope -from enterprise.telemetry.gateway import CASE_ROUTING class TestTelemetryCase: @@ -221,11 +221,6 @@ class TestCaseRouting: TelemetryCase.APP_UPDATED, TelemetryCase.APP_DELETED, TelemetryCase.FEEDBACK_CREATED, - TelemetryCase.TOOL_EXECUTION, - TelemetryCase.MODERATION_CHECK, - TelemetryCase.SUGGESTED_QUESTION, - TelemetryCase.DATASET_RETRIEVAL, - TelemetryCase.GENERATE_NAME, } for case in metric_log_cases: route = CASE_ROUTING[case] @@ -240,17 +235,17 @@ class TestCaseRouting: TelemetryCase.NODE_EXECUTION, TelemetryCase.DRAFT_NODE_EXECUTION, TelemetryCase.PROMPT_GENERATION, + TelemetryCase.TOOL_EXECUTION, + TelemetryCase.MODERATION_CHECK, + TelemetryCase.SUGGESTED_QUESTION, + TelemetryCase.DATASET_RETRIEVAL, + TelemetryCase.GENERATE_NAME, } metric_log_cases = { TelemetryCase.APP_CREATED, TelemetryCase.APP_UPDATED, TelemetryCase.APP_DELETED, TelemetryCase.FEEDBACK_CREATED, - TelemetryCase.TOOL_EXECUTION, - TelemetryCase.MODERATION_CHECK, - TelemetryCase.SUGGESTED_QUESTION, - TelemetryCase.DATASET_RETRIEVAL, - TelemetryCase.GENERATE_NAME, } all_cases = trace_cases | metric_log_cases diff --git a/api/tests/unit_tests/enterprise/telemetry/test_event_handlers.py b/api/tests/unit_tests/enterprise/telemetry/test_event_handlers.py index 13902e8340..ad15c9f096 100644 --- a/api/tests/unit_tests/enterprise/telemetry/test_event_handlers.py +++ b/api/tests/unit_tests/enterprise/telemetry/test_event_handlers.py @@ -7,20 +7,12 @@ from enterprise.telemetry.contracts import TelemetryCase @pytest.fixture -def mock_exporter(): - with patch("extensions.ext_enterprise_telemetry.get_enterprise_exporter") as mock: - exporter = MagicMock() - mock.return_value = exporter - yield exporter - - -@pytest.fixture -def mock_task(): - with patch("tasks.enterprise_telemetry_task.process_enterprise_telemetry") as mock: +def mock_gateway_emit(): + with patch("core.telemetry.gateway.emit") as mock: yield mock -def test_handle_app_created_calls_task(mock_exporter, mock_task): +def test_handle_app_created_calls_task(mock_gateway_emit): sender = MagicMock() sender.id = "app-123" sender.tenant_id = "tenant-456" @@ -28,54 +20,53 @@ def test_handle_app_created_calls_task(mock_exporter, mock_task): event_handlers._handle_app_created(sender) - mock_task.delay.assert_called_once() - call_args = mock_task.delay.call_args[0][0] - assert "app_created" in call_args - assert "tenant-456" in call_args - assert "app-123" in call_args - assert "chat" in call_args + mock_gateway_emit.assert_called_once_with( + case=TelemetryCase.APP_CREATED, + context={"tenant_id": "tenant-456"}, + payload={"app_id": "app-123", "mode": "chat"}, + ) -def test_handle_app_created_no_exporter(mock_task): - with patch("extensions.ext_enterprise_telemetry.get_enterprise_exporter", return_value=None): - sender = MagicMock() - sender.id = "app-123" - sender.tenant_id = "tenant-456" +def test_handle_app_created_no_exporter(mock_gateway_emit): + """Gateway handles exporter availability internally; handler always calls gateway.""" + sender = MagicMock() + sender.id = "app-123" + sender.tenant_id = "tenant-456" - event_handlers._handle_app_created(sender) + event_handlers._handle_app_created(sender) - mock_task.delay.assert_not_called() + mock_gateway_emit.assert_called_once() -def test_handle_app_updated_calls_task(mock_exporter, mock_task): +def test_handle_app_updated_calls_task(mock_gateway_emit): sender = MagicMock() sender.id = "app-123" sender.tenant_id = "tenant-456" event_handlers._handle_app_updated(sender) - mock_task.delay.assert_called_once() - call_args = mock_task.delay.call_args[0][0] - assert "app_updated" in call_args - assert "tenant-456" in call_args - assert "app-123" in call_args + mock_gateway_emit.assert_called_once_with( + case=TelemetryCase.APP_UPDATED, + context={"tenant_id": "tenant-456"}, + payload={"app_id": "app-123"}, + ) -def test_handle_app_deleted_calls_task(mock_exporter, mock_task): +def test_handle_app_deleted_calls_task(mock_gateway_emit): sender = MagicMock() sender.id = "app-123" sender.tenant_id = "tenant-456" event_handlers._handle_app_deleted(sender) - mock_task.delay.assert_called_once() - call_args = mock_task.delay.call_args[0][0] - assert "app_deleted" in call_args - assert "tenant-456" in call_args - assert "app-123" in call_args + mock_gateway_emit.assert_called_once_with( + case=TelemetryCase.APP_DELETED, + context={"tenant_id": "tenant-456"}, + payload={"app_id": "app-123"}, + ) -def test_handle_feedback_created_calls_task(mock_exporter, mock_task): +def test_handle_feedback_created_calls_task(mock_gateway_emit): sender = MagicMock() sender.message_id = "msg-123" sender.app_id = "app-456" @@ -88,34 +79,34 @@ def test_handle_feedback_created_calls_task(mock_exporter, mock_task): event_handlers._handle_feedback_created(sender, tenant_id="tenant-456") - mock_task.delay.assert_called_once() - call_args = mock_task.delay.call_args[0][0] - assert "feedback_created" in call_args - assert "tenant-456" in call_args - assert "msg-123" in call_args - assert "app-456" in call_args - assert "conv-789" in call_args - assert "user-001" in call_args - assert "like" in call_args - assert "api" in call_args - assert "Great response!" in call_args + mock_gateway_emit.assert_called_once_with( + case=TelemetryCase.FEEDBACK_CREATED, + context={"tenant_id": "tenant-456"}, + payload={ + "message_id": "msg-123", + "app_id": "app-456", + "conversation_id": "conv-789", + "from_end_user_id": "user-001", + "from_account_id": None, + "rating": "like", + "from_source": "api", + "content": "Great response!", + }, + ) -def test_handle_feedback_created_no_exporter(mock_task): - with patch("extensions.ext_enterprise_telemetry.get_enterprise_exporter", return_value=None): - sender = MagicMock() - sender.message_id = "msg-123" +def test_handle_feedback_created_no_exporter(mock_gateway_emit): + """Gateway handles exporter availability internally; handler always calls gateway.""" + sender = MagicMock() + sender.message_id = "msg-123" - event_handlers._handle_feedback_created(sender, tenant_id="tenant-456") + event_handlers._handle_feedback_created(sender, tenant_id="tenant-456") - mock_task.delay.assert_not_called() + mock_gateway_emit.assert_called_once() -def test_handlers_create_valid_envelopes(mock_exporter, mock_task): - import json - - from enterprise.telemetry.contracts import TelemetryEnvelope - +def test_handlers_create_valid_envelopes(mock_gateway_emit): + """Verify handlers pass correct TelemetryCase and payload structure.""" sender = MagicMock() sender.id = "app-123" sender.tenant_id = "tenant-456" @@ -123,12 +114,8 @@ def test_handlers_create_valid_envelopes(mock_exporter, mock_task): event_handlers._handle_app_created(sender) - call_args = mock_task.delay.call_args[0][0] - envelope_dict = json.loads(call_args) - envelope = TelemetryEnvelope(**envelope_dict) - - assert envelope.case == TelemetryCase.APP_CREATED - assert envelope.tenant_id == "tenant-456" - assert envelope.event_id - assert envelope.payload["app_id"] == "app-123" - assert envelope.payload["mode"] == "chat" + call_kwargs = mock_gateway_emit.call_args[1] + assert call_kwargs["case"] == TelemetryCase.APP_CREATED + assert call_kwargs["context"]["tenant_id"] == "tenant-456" + assert call_kwargs["payload"]["app_id"] == "app-123" + assert call_kwargs["payload"]["mode"] == "chat" diff --git a/api/tests/unit_tests/enterprise/telemetry/test_gateway.py b/api/tests/unit_tests/enterprise/telemetry/test_gateway.py index ff226dd56c..d979dc7336 100644 --- a/api/tests/unit_tests/enterprise/telemetry/test_gateway.py +++ b/api/tests/unit_tests/enterprise/telemetry/test_gateway.py @@ -6,14 +6,13 @@ from unittest.mock import MagicMock, patch import pytest from core.ops.entities.trace_entity import TraceTaskName -from enterprise.telemetry.contracts import SignalType, TelemetryCase, TelemetryEnvelope -from enterprise.telemetry.gateway import ( +from core.telemetry.gateway import ( CASE_ROUTING, CASE_TO_TRACE_TASK, PAYLOAD_SIZE_THRESHOLD_BYTES, - TelemetryGateway, emit, ) +from enterprise.telemetry.contracts import SignalType, TelemetryCase, TelemetryEnvelope class TestCaseRoutingTable: @@ -38,17 +37,20 @@ class TestCaseRoutingTable: TelemetryCase.APP_UPDATED, TelemetryCase.APP_DELETED, TelemetryCase.FEEDBACK_CREATED, + ] + for case in metric_log_cases: + assert CASE_ROUTING[case].signal_type is SignalType.METRIC_LOG, f"{case} should be metric_log" + + def test_ce_eligible_cases(self) -> None: + ce_eligible_cases = [ + TelemetryCase.WORKFLOW_RUN, + TelemetryCase.MESSAGE_RUN, TelemetryCase.TOOL_EXECUTION, TelemetryCase.MODERATION_CHECK, TelemetryCase.SUGGESTED_QUESTION, TelemetryCase.DATASET_RETRIEVAL, TelemetryCase.GENERATE_NAME, ] - for case in metric_log_cases: - assert CASE_ROUTING[case].signal_type is SignalType.METRIC_LOG, f"{case} should be metric_log" - - def test_ce_eligible_cases(self) -> None: - ce_eligible_cases = [TelemetryCase.WORKFLOW_RUN, TelemetryCase.MESSAGE_RUN] for case in ce_eligible_cases: assert CASE_ROUTING[case].ce_eligible is True, f"{case} should be CE eligible" @@ -87,91 +89,80 @@ def mock_ops_trace_manager(): yield mock_module, mock_trace_entity -class TestTelemetryGatewayTraceRouting: - @pytest.fixture - def gateway(self) -> TelemetryGateway: - return TelemetryGateway() - +class TestGatewayTraceRouting: @pytest.fixture def mock_trace_manager(self) -> MagicMock: return MagicMock() - @patch("enterprise.telemetry.gateway._is_enterprise_telemetry_enabled", return_value=True) + @patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True) def test_trace_case_routes_to_trace_manager( self, _mock_ee_enabled: MagicMock, - gateway: TelemetryGateway, mock_trace_manager: MagicMock, mock_ops_trace_manager: tuple[MagicMock, MagicMock], ) -> None: context = {"app_id": "app-123", "user_id": "user-456", "tenant_id": "tenant-789"} payload = {"workflow_run_id": "run-abc"} - gateway.emit(TelemetryCase.WORKFLOW_RUN, context, payload, mock_trace_manager) + emit(TelemetryCase.WORKFLOW_RUN, context, payload, mock_trace_manager) mock_trace_manager.add_trace_task.assert_called_once() - @patch("enterprise.telemetry.gateway._is_enterprise_telemetry_enabled", return_value=False) + @patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False) def test_ce_eligible_trace_enqueued_when_ee_disabled( self, _mock_ee_enabled: MagicMock, - gateway: TelemetryGateway, mock_trace_manager: MagicMock, mock_ops_trace_manager: tuple[MagicMock, MagicMock], ) -> None: context = {"app_id": "app-123", "user_id": "user-456"} payload = {"workflow_run_id": "run-abc"} - gateway.emit(TelemetryCase.WORKFLOW_RUN, context, payload, mock_trace_manager) + emit(TelemetryCase.WORKFLOW_RUN, context, payload, mock_trace_manager) mock_trace_manager.add_trace_task.assert_called_once() - @patch("enterprise.telemetry.gateway._is_enterprise_telemetry_enabled", return_value=False) + @patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False) def test_enterprise_only_trace_dropped_when_ee_disabled( self, _mock_ee_enabled: MagicMock, - gateway: TelemetryGateway, mock_trace_manager: MagicMock, mock_ops_trace_manager: tuple[MagicMock, MagicMock], ) -> None: context = {"app_id": "app-123", "user_id": "user-456"} payload = {"node_id": "node-abc"} - gateway.emit(TelemetryCase.NODE_EXECUTION, context, payload, mock_trace_manager) + emit(TelemetryCase.NODE_EXECUTION, context, payload, mock_trace_manager) mock_trace_manager.add_trace_task.assert_not_called() - @patch("enterprise.telemetry.gateway._is_enterprise_telemetry_enabled", return_value=True) + @patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True) def test_enterprise_only_trace_enqueued_when_ee_enabled( self, _mock_ee_enabled: MagicMock, - gateway: TelemetryGateway, mock_trace_manager: MagicMock, mock_ops_trace_manager: tuple[MagicMock, MagicMock], ) -> None: context = {"app_id": "app-123", "user_id": "user-456"} payload = {"node_id": "node-abc"} - gateway.emit(TelemetryCase.NODE_EXECUTION, context, payload, mock_trace_manager) + emit(TelemetryCase.NODE_EXECUTION, context, payload, mock_trace_manager) mock_trace_manager.add_trace_task.assert_called_once() -class TestTelemetryGatewayMetricLogRouting: - @pytest.fixture - def gateway(self) -> TelemetryGateway: - return TelemetryGateway() - +class TestGatewayMetricLogRouting: + @patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True) @patch("tasks.enterprise_telemetry_task.process_enterprise_telemetry.delay") def test_metric_case_routes_to_celery_task( self, mock_delay: MagicMock, - gateway: TelemetryGateway, + _mock_ee_enabled: MagicMock, ) -> None: context = {"tenant_id": "tenant-123"} payload = {"app_id": "app-abc", "name": "My App"} - gateway.emit(TelemetryCase.APP_CREATED, context, payload) + emit(TelemetryCase.APP_CREATED, context, payload) mock_delay.assert_called_once() envelope_json = mock_delay.call_args[0][0] @@ -180,17 +171,18 @@ class TestTelemetryGatewayMetricLogRouting: assert envelope.tenant_id == "tenant-123" assert envelope.payload["app_id"] == "app-abc" + @patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True) @patch("tasks.enterprise_telemetry_task.process_enterprise_telemetry.delay") def test_envelope_has_unique_event_id( self, mock_delay: MagicMock, - gateway: TelemetryGateway, + _mock_ee_enabled: MagicMock, ) -> None: context = {"tenant_id": "tenant-123"} payload = {"app_id": "app-abc"} - gateway.emit(TelemetryCase.APP_CREATED, context, payload) - gateway.emit(TelemetryCase.APP_CREATED, context, payload) + emit(TelemetryCase.APP_CREATED, context, payload) + emit(TelemetryCase.APP_CREATED, context, payload) assert mock_delay.call_count == 2 envelope1 = TelemetryEnvelope.model_validate_json(mock_delay.call_args_list[0][0][0]) @@ -198,40 +190,38 @@ class TestTelemetryGatewayMetricLogRouting: assert envelope1.event_id != envelope2.event_id -class TestTelemetryGatewayPayloadSizing: - @pytest.fixture - def gateway(self) -> TelemetryGateway: - return TelemetryGateway() - +class TestGatewayPayloadSizing: + @patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True) @patch("tasks.enterprise_telemetry_task.process_enterprise_telemetry.delay") def test_small_payload_inlined( self, mock_delay: MagicMock, - gateway: TelemetryGateway, + _mock_ee_enabled: MagicMock, ) -> None: context = {"tenant_id": "tenant-123"} payload = {"key": "small_value"} - gateway.emit(TelemetryCase.APP_CREATED, context, payload) + emit(TelemetryCase.APP_CREATED, context, payload) envelope_json = mock_delay.call_args[0][0] envelope = TelemetryEnvelope.model_validate_json(envelope_json) assert envelope.payload == payload assert envelope.metadata is None - @patch("enterprise.telemetry.gateway.storage") + @patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True) + @patch("core.telemetry.gateway.storage") @patch("tasks.enterprise_telemetry_task.process_enterprise_telemetry.delay") def test_large_payload_stored( self, mock_delay: MagicMock, mock_storage: MagicMock, - gateway: TelemetryGateway, + _mock_ee_enabled: MagicMock, ) -> None: context = {"tenant_id": "tenant-123"} large_value = "x" * (PAYLOAD_SIZE_THRESHOLD_BYTES + 1000) payload = {"key": large_value} - gateway.emit(TelemetryCase.APP_CREATED, context, payload) + emit(TelemetryCase.APP_CREATED, context, payload) mock_storage.save.assert_called_once() storage_key = mock_storage.save.call_args[0][0] @@ -243,20 +233,21 @@ class TestTelemetryGatewayPayloadSizing: assert envelope.metadata is not None assert envelope.metadata["payload_ref"] == storage_key - @patch("enterprise.telemetry.gateway.storage") + @patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True) + @patch("core.telemetry.gateway.storage") @patch("tasks.enterprise_telemetry_task.process_enterprise_telemetry.delay") def test_large_payload_fallback_on_storage_error( self, mock_delay: MagicMock, mock_storage: MagicMock, - gateway: TelemetryGateway, + _mock_ee_enabled: MagicMock, ) -> None: mock_storage.save.side_effect = Exception("Storage failure") context = {"tenant_id": "tenant-123"} large_value = "x" * (PAYLOAD_SIZE_THRESHOLD_BYTES + 1000) payload = {"key": large_value} - gateway.emit(TelemetryCase.APP_CREATED, context, payload) + emit(TelemetryCase.APP_CREATED, context, payload) envelope_json = mock_delay.call_args[0][0] envelope = TelemetryEnvelope.model_validate_json(envelope_json) @@ -264,26 +255,6 @@ class TestTelemetryGatewayPayloadSizing: assert envelope.metadata is None -class TestModuleLevelFunctions: - @patch("extensions.ext_enterprise_telemetry.get_gateway") - @patch("enterprise.telemetry.gateway._is_enterprise_telemetry_enabled", return_value=True) - def test_emit_function_uses_gateway( - self, - _mock_ee_enabled: MagicMock, - mock_get_gateway: MagicMock, - mock_ops_trace_manager: tuple[MagicMock, MagicMock], - ) -> None: - mock_gateway = TelemetryGateway() - mock_get_gateway.return_value = mock_gateway - mock_trace_manager = MagicMock() - context = {"app_id": "app-123", "user_id": "user-456"} - payload = {"workflow_run_id": "run-abc"} - - with patch.object(mock_gateway, "emit") as mock_emit: - emit(TelemetryCase.WORKFLOW_RUN, context, payload, mock_trace_manager) - mock_emit.assert_called_once_with(TelemetryCase.WORKFLOW_RUN, context, payload, mock_trace_manager) - - class TestTraceTaskNameMapping: def test_workflow_run_mapping(self) -> None: assert CASE_TO_TRACE_TASK[TelemetryCase.WORKFLOW_RUN] is TraceTaskName.WORKFLOW_TRACE diff --git a/api/tests/unit_tests/enterprise/telemetry/test_metric_handler.py b/api/tests/unit_tests/enterprise/telemetry/test_metric_handler.py index a858c8e95a..9a345a55ff 100644 --- a/api/tests/unit_tests/enterprise/telemetry/test_metric_handler.py +++ b/api/tests/unit_tests/enterprise/telemetry/test_metric_handler.py @@ -304,6 +304,7 @@ def test_on_app_created_emits_correct_event(mock_redis): attributes={ "dify.app.id": "app-789", "dify.tenant_id": "tenant-123", + "dify.event.id": "event-456", "dify.app.mode": "chat", }, tenant_id="tenant-123", @@ -345,6 +346,7 @@ def test_on_app_updated_emits_correct_event(mock_redis): attributes={ "dify.app.id": "app-789", "dify.tenant_id": "tenant-123", + "dify.event.id": "event-456", }, tenant_id="tenant-123", ) @@ -384,6 +386,7 @@ def test_on_app_deleted_emits_correct_event(mock_redis): attributes={ "dify.app.id": "app-789", "dify.tenant_id": "tenant-123", + "dify.event.id": "event-456", }, tenant_id="tenant-123", )