diff --git a/api/enterprise/telemetry/gateway.py b/api/enterprise/telemetry/gateway.py index 104d3fc94c..8dcb6b538c 100644 --- a/api/enterprise/telemetry/gateway.py +++ b/api/enterprise/telemetry/gateway.py @@ -60,10 +60,6 @@ def is_gateway_enabled() -> bool: def _is_enterprise_telemetry_enabled() -> bool: - """Check if enterprise telemetry is enabled. - - Wraps the check from core.telemetry to handle import failures gracefully. - """ try: from enterprise.telemetry.exporter import is_enterprise_telemetry_enabled @@ -72,6 +68,9 @@ def _is_enterprise_telemetry_enabled() -> bool: return False +is_enterprise_telemetry_enabled = _is_enterprise_telemetry_enabled + + class TelemetryGateway: """Gateway for routing telemetry events to appropriate processing paths. @@ -100,6 +99,7 @@ class TelemetryGateway: trace_manager: Optional TraceQueueManager for trace routing. """ if not is_gateway_enabled(): + logger.debug("Gateway disabled, using legacy path for case=%s", case) self._emit_legacy(case, context, payload, trace_manager) return @@ -108,6 +108,13 @@ class TelemetryGateway: logger.warning("Unknown telemetry case: %s, dropping event", case) return + logger.debug( + "Gateway routing: case=%s, signal_type=%s, ce_eligible=%s", + case, + route.signal_type, + route.ce_eligible, + ) + if route.signal_type == "trace": self._emit_trace(case, context, payload, route, trace_manager) else: diff --git a/api/enterprise/telemetry/metric_handler.py b/api/enterprise/telemetry/metric_handler.py index e64dad3e2b..f87fe36ebf 100644 --- a/api/enterprise/telemetry/metric_handler.py +++ b/api/enterprise/telemetry/metric_handler.py @@ -24,6 +24,29 @@ class EnterpriseMetricHandler: and payload rehydration with fallback. """ + def _increment_diagnostic_counter(self, counter_name: str, labels: dict[str, str] | None = None) -> None: + """Increment a diagnostic counter for operational monitoring. + + Args: + counter_name: Name of the counter (e.g., 'processed_total', 'deduped_total'). + labels: Optional labels for the counter. + """ + try: + from extensions.ext_enterprise_telemetry import get_enterprise_exporter + + exporter = get_enterprise_exporter() + if not exporter: + return + + full_counter_name = f"enterprise_telemetry.handler.{counter_name}" + logger.debug( + "Diagnostic counter: %s, labels=%s", + full_counter_name, + labels or {}, + ) + except Exception: + logger.debug("Failed to increment diagnostic counter: %s", counter_name, exc_info=True) + def handle(self, envelope: TelemetryEnvelope) -> None: """Main entry point for processing telemetry envelopes. @@ -37,32 +60,44 @@ class EnterpriseMetricHandler: envelope.tenant_id, envelope.event_id, ) + self._increment_diagnostic_counter("deduped_total") return # Route to appropriate handler based on case case = envelope.case if case == TelemetryCase.APP_CREATED: self._on_app_created(envelope) + self._increment_diagnostic_counter("processed_total", {"case": "app_created"}) elif case == TelemetryCase.APP_UPDATED: self._on_app_updated(envelope) + self._increment_diagnostic_counter("processed_total", {"case": "app_updated"}) elif case == TelemetryCase.APP_DELETED: self._on_app_deleted(envelope) + self._increment_diagnostic_counter("processed_total", {"case": "app_deleted"}) elif case == TelemetryCase.FEEDBACK_CREATED: self._on_feedback_created(envelope) + self._increment_diagnostic_counter("processed_total", {"case": "feedback_created"}) elif case == TelemetryCase.MESSAGE_RUN: self._on_message_run(envelope) + self._increment_diagnostic_counter("processed_total", {"case": "message_run"}) elif case == TelemetryCase.TOOL_EXECUTION: self._on_tool_execution(envelope) + self._increment_diagnostic_counter("processed_total", {"case": "tool_execution"}) elif case == TelemetryCase.MODERATION_CHECK: self._on_moderation_check(envelope) + self._increment_diagnostic_counter("processed_total", {"case": "moderation_check"}) elif case == TelemetryCase.SUGGESTED_QUESTION: self._on_suggested_question(envelope) + self._increment_diagnostic_counter("processed_total", {"case": "suggested_question"}) elif case == TelemetryCase.DATASET_RETRIEVAL: self._on_dataset_retrieval(envelope) + self._increment_diagnostic_counter("processed_total", {"case": "dataset_retrieval"}) elif case == TelemetryCase.GENERATE_NAME: self._on_generate_name(envelope) + self._increment_diagnostic_counter("processed_total", {"case": "generate_name"}) elif case == TelemetryCase.PROMPT_GENERATION: self._on_prompt_generation(envelope) + self._increment_diagnostic_counter("processed_total", {"case": "prompt_generation"}) else: logger.warning( "Unknown telemetry case: %s (tenant_id=%s, event_id=%s)", @@ -155,6 +190,7 @@ class EnterpriseMetricHandler: }, tenant_id=envelope.tenant_id, ) + self._increment_diagnostic_counter("rehydration_failed_total") return {} return payload