From 3d3e8d75d89bcba72cab0596845a0ec12f2edd65 Mon Sep 17 00:00:00 2001
From: GareArc
Date: Thu, 5 Feb 2026 19:03:33 -0800
Subject: [PATCH] feat(telemetry): add gateway envelope contracts and routing table

---
 api/enterprise/telemetry/contracts.py         |  84 ++++++
 api/enterprise/telemetry/gateway.py           |  26 ++
 .../enterprise/telemetry/test_contracts.py    | 264 ++++++++++++++++++
 3 files changed, 374 insertions(+)
 create mode 100644 api/enterprise/telemetry/contracts.py
 create mode 100644 api/enterprise/telemetry/gateway.py
 create mode 100644 api/tests/unit_tests/enterprise/telemetry/test_contracts.py

diff --git a/api/enterprise/telemetry/contracts.py b/api/enterprise/telemetry/contracts.py
new file mode 100644
index 0000000000..a0bf4e8f19
--- /dev/null
+++ b/api/enterprise/telemetry/contracts.py
@@ -0,0 +1,84 @@
+"""Telemetry gateway contracts and data structures.
+
+This module defines the envelope format for telemetry events and the routing
+configuration that determines how each event type is processed.
+"""
+
+from __future__ import annotations
+
+from enum import StrEnum
+from typing import Any, Literal
+
+from pydantic import BaseModel, ConfigDict, field_validator
+
+# Upper bound, in bytes, for ``TelemetryEnvelope.payload_fallback`` (64 KiB).
+MAX_PAYLOAD_FALLBACK_BYTES = 64 * 1024
+
+
+class TelemetryCase(StrEnum):
+    """Enumeration of all known telemetry event cases."""
+
+    WORKFLOW_RUN = "workflow_run"
+    NODE_EXECUTION = "node_execution"
+    DRAFT_NODE_EXECUTION = "draft_node_execution"
+    MESSAGE_RUN = "message_run"
+    TOOL_EXECUTION = "tool_execution"
+    MODERATION_CHECK = "moderation_check"
+    SUGGESTED_QUESTION = "suggested_question"
+    DATASET_RETRIEVAL = "dataset_retrieval"
+    GENERATE_NAME = "generate_name"
+    PROMPT_GENERATION = "prompt_generation"
+    APP_CREATED = "app_created"
+    APP_UPDATED = "app_updated"
+    APP_DELETED = "app_deleted"
+    FEEDBACK_CREATED = "feedback_created"
+
+
+class CaseRoute(BaseModel):
+    """Routing configuration for a telemetry case.
+
+    Attributes:
+        signal_type: The type of signal ("trace" or "metric_log").
+        ce_eligible: Whether this case is CE-eligible; ``False`` marks the case as
+            enterprise-only. NOTE(review): "CE" presumably means Community Edition
+            rather than "customer engagement" -- confirm.
+    """
+
+    signal_type: Literal["trace", "metric_log"]
+    ce_eligible: bool
+
+
+class TelemetryEnvelope(BaseModel):
+    """Envelope for telemetry events.
+
+    Attributes:
+        case: The telemetry case type.
+        tenant_id: The tenant identifier.
+        event_id: Unique event identifier for deduplication.
+        payload: The main event payload.
+        payload_fallback: Fallback payload (max 64KB).
+        metadata: Optional metadata dictionary.
+    """
+
+    # Pydantic v2 configuration; keep ``case`` as a TelemetryCase member rather
+    # than coercing it to its raw string value.
+    model_config = ConfigDict(use_enum_values=False)
+
+    case: TelemetryCase
+    tenant_id: str
+    event_id: str
+    payload: dict[str, Any]
+    payload_fallback: bytes | None = None
+    metadata: dict[str, Any] | None = None
+
+    @field_validator("payload_fallback")
+    @classmethod
+    def validate_payload_fallback_size(cls, v: bytes | None) -> bytes | None:
+        """Validate that payload_fallback does not exceed 64KB.
+
+        Raises:
+            ValueError: If ``v`` is longer than ``MAX_PAYLOAD_FALLBACK_BYTES``.
+        """
+        if v is not None and len(v) > MAX_PAYLOAD_FALLBACK_BYTES:
+            raise ValueError("payload_fallback must not exceed 64KB")
+        return v
diff --git a/api/enterprise/telemetry/gateway.py b/api/enterprise/telemetry/gateway.py
new file mode 100644
index 0000000000..d04222fd9c
--- /dev/null
+++ b/api/enterprise/telemetry/gateway.py
@@ -0,0 +1,26 @@
+"""Telemetry gateway routing configuration.
+
+This module defines the routing table that maps telemetry cases to their
+processing routes (trace vs metric/log) and CE eligibility.
+"""
+
+from __future__ import annotations
+
+from enterprise.telemetry.contracts import CaseRoute, TelemetryCase
+
+CASE_ROUTING: dict[TelemetryCase, CaseRoute] = {
+    TelemetryCase.WORKFLOW_RUN: CaseRoute(signal_type="trace", ce_eligible=True),
+    TelemetryCase.MESSAGE_RUN: CaseRoute(signal_type="trace", ce_eligible=True),
+    TelemetryCase.NODE_EXECUTION: CaseRoute(signal_type="trace", ce_eligible=False),
+    TelemetryCase.DRAFT_NODE_EXECUTION: CaseRoute(signal_type="trace", ce_eligible=False),
+    TelemetryCase.PROMPT_GENERATION: CaseRoute(signal_type="trace", ce_eligible=False),
+    TelemetryCase.APP_CREATED: CaseRoute(signal_type="metric_log", ce_eligible=False),
+    TelemetryCase.APP_UPDATED: CaseRoute(signal_type="metric_log", ce_eligible=False),
+    TelemetryCase.APP_DELETED: CaseRoute(signal_type="metric_log", ce_eligible=False),
+    TelemetryCase.FEEDBACK_CREATED: CaseRoute(signal_type="metric_log", ce_eligible=False),
+    TelemetryCase.TOOL_EXECUTION: CaseRoute(signal_type="metric_log", ce_eligible=False),
+    TelemetryCase.MODERATION_CHECK: CaseRoute(signal_type="metric_log", ce_eligible=False),
+    TelemetryCase.SUGGESTED_QUESTION: CaseRoute(signal_type="metric_log", ce_eligible=False),
+    TelemetryCase.DATASET_RETRIEVAL: CaseRoute(signal_type="metric_log", ce_eligible=False),
+    TelemetryCase.GENERATE_NAME: CaseRoute(signal_type="metric_log", ce_eligible=False),
+}
diff --git a/api/tests/unit_tests/enterprise/telemetry/test_contracts.py b/api/tests/unit_tests/enterprise/telemetry/test_contracts.py
new file mode 100644
index 0000000000..38c279da67
--- /dev/null
+++ b/api/tests/unit_tests/enterprise/telemetry/test_contracts.py
@@ -0,0 +1,264 @@
+"""Unit tests for telemetry gateway contracts."""
+
+from __future__ import annotations
+
+import pytest
+from pydantic import ValidationError
+
+from enterprise.telemetry.contracts import CaseRoute, TelemetryCase, TelemetryEnvelope
+from enterprise.telemetry.gateway import CASE_ROUTING
+
+
+class TestTelemetryCase:
+    """Tests for TelemetryCase enum."""
+
+    def test_all_cases_defined(self) -> None:
+        """Verify all 14 telemetry cases are defined."""
+        expected_cases = {
+            "WORKFLOW_RUN",
+            "NODE_EXECUTION",
+            "DRAFT_NODE_EXECUTION",
+            "MESSAGE_RUN",
+            "TOOL_EXECUTION",
+            "MODERATION_CHECK",
+            "SUGGESTED_QUESTION",
+            "DATASET_RETRIEVAL",
+            "GENERATE_NAME",
+            "PROMPT_GENERATION",
+            "APP_CREATED",
+            "APP_UPDATED",
+            "APP_DELETED",
+            "FEEDBACK_CREATED",
+        }
+        actual_cases = {case.name for case in TelemetryCase}
+        assert actual_cases == expected_cases
+
+    def test_case_values(self) -> None:
+        """Verify case enum values are correct."""
+        assert TelemetryCase.WORKFLOW_RUN.value == "workflow_run"
+        assert TelemetryCase.NODE_EXECUTION.value == "node_execution"
+        assert TelemetryCase.DRAFT_NODE_EXECUTION.value == "draft_node_execution"
+        assert TelemetryCase.MESSAGE_RUN.value == "message_run"
+        assert TelemetryCase.TOOL_EXECUTION.value == "tool_execution"
+        assert TelemetryCase.MODERATION_CHECK.value == "moderation_check"
+        assert TelemetryCase.SUGGESTED_QUESTION.value == "suggested_question"
+        assert TelemetryCase.DATASET_RETRIEVAL.value == "dataset_retrieval"
+        assert TelemetryCase.GENERATE_NAME.value == "generate_name"
+        assert TelemetryCase.PROMPT_GENERATION.value == "prompt_generation"
+        assert TelemetryCase.APP_CREATED.value == "app_created"
+        assert TelemetryCase.APP_UPDATED.value == "app_updated"
+        assert TelemetryCase.APP_DELETED.value == "app_deleted"
+        assert TelemetryCase.FEEDBACK_CREATED.value == "feedback_created"
+
+
+class TestCaseRoute:
+    """Tests for CaseRoute model."""
+
+    def test_valid_trace_route(self) -> None:
+        """Verify valid trace route creation."""
+        route = CaseRoute(signal_type="trace", ce_eligible=True)
+        assert route.signal_type == "trace"
+        assert route.ce_eligible is True
+
+    def test_valid_metric_log_route(self) -> None:
+        """Verify valid metric_log route creation."""
+        route = CaseRoute(signal_type="metric_log", ce_eligible=False)
+        assert route.signal_type == "metric_log"
+        assert route.ce_eligible is False
+
+    def test_invalid_signal_type(self) -> None:
+        """Verify invalid signal_type is rejected."""
+        with pytest.raises(ValidationError):
+            CaseRoute(signal_type="invalid", ce_eligible=True)
+
+
+class TestTelemetryEnvelope:
+    """Tests for TelemetryEnvelope model."""
+
+    def test_valid_envelope_minimal(self) -> None:
+        """Verify valid minimal envelope creation."""
+        envelope = TelemetryEnvelope(
+            case=TelemetryCase.WORKFLOW_RUN,
+            tenant_id="tenant-123",
+            event_id="event-456",
+            payload={"key": "value"},
+        )
+        assert envelope.case == TelemetryCase.WORKFLOW_RUN
+        assert envelope.tenant_id == "tenant-123"
+        assert envelope.event_id == "event-456"
+        assert envelope.payload == {"key": "value"}
+        assert envelope.payload_fallback is None
+        assert envelope.metadata is None
+
+    def test_valid_envelope_full(self) -> None:
+        """Verify valid envelope with all fields."""
+        metadata = {"source": "api"}
+        fallback = b"fallback data"
+        envelope = TelemetryEnvelope(
+            case=TelemetryCase.MESSAGE_RUN,
+            tenant_id="tenant-789",
+            event_id="event-012",
+            payload={"message": "hello"},
+            payload_fallback=fallback,
+            metadata=metadata,
+        )
+        assert envelope.case == TelemetryCase.MESSAGE_RUN
+        assert envelope.tenant_id == "tenant-789"
+        assert envelope.event_id == "event-012"
+        assert envelope.payload == {"message": "hello"}
+        assert envelope.payload_fallback == fallback
+        assert envelope.metadata == metadata
+
+    def test_missing_required_case(self) -> None:
+        """Verify missing case field is rejected."""
+        with pytest.raises(ValidationError):
+            TelemetryEnvelope(
+                tenant_id="tenant-123",
+                event_id="event-456",
+                payload={"key": "value"},
+            )
+
+    def test_missing_required_tenant_id(self) -> None:
+        """Verify missing tenant_id field is rejected."""
+        with pytest.raises(ValidationError):
+            TelemetryEnvelope(
+                case=TelemetryCase.WORKFLOW_RUN,
+                event_id="event-456",
+                payload={"key": "value"},
+            )
+
+    def test_missing_required_event_id(self) -> None:
+        """Verify missing event_id field is rejected."""
+        with pytest.raises(ValidationError):
+            TelemetryEnvelope(
+                case=TelemetryCase.WORKFLOW_RUN,
+                tenant_id="tenant-123",
+                payload={"key": "value"},
+            )
+
+    def test_missing_required_payload(self) -> None:
+        """Verify missing payload field is rejected."""
+        with pytest.raises(ValidationError):
+            TelemetryEnvelope(
+                case=TelemetryCase.WORKFLOW_RUN,
+                tenant_id="tenant-123",
+                event_id="event-456",
+            )
+
+    def test_payload_fallback_within_limit(self) -> None:
+        """Verify payload_fallback within 64KB limit is accepted."""
+        fallback = b"x" * 65536
+        envelope = TelemetryEnvelope(
+            case=TelemetryCase.WORKFLOW_RUN,
+            tenant_id="tenant-123",
+            event_id="event-456",
+            payload={"key": "value"},
+            payload_fallback=fallback,
+        )
+        assert envelope.payload_fallback == fallback
+
+    def test_payload_fallback_exceeds_limit(self) -> None:
+        """Verify payload_fallback exceeding 64KB is rejected."""
+        fallback = b"x" * 65537
+        with pytest.raises(ValidationError) as exc_info:
+            TelemetryEnvelope(
+                case=TelemetryCase.WORKFLOW_RUN,
+                tenant_id="tenant-123",
+                event_id="event-456",
+                payload={"key": "value"},
+                payload_fallback=fallback,
+            )
+        assert "64KB" in str(exc_info.value)
+
+    def test_payload_fallback_none(self) -> None:
+        """Verify payload_fallback can be None."""
+        envelope = TelemetryEnvelope(
+            case=TelemetryCase.WORKFLOW_RUN,
+            tenant_id="tenant-123",
+            event_id="event-456",
+            payload={"key": "value"},
+            payload_fallback=None,
+        )
+        assert envelope.payload_fallback is None
+
+
+class TestCaseRouting:
+    """Tests for CASE_ROUTING table."""
+
+    def test_all_cases_routed(self) -> None:
+        """Verify all 14 cases have routing entries."""
+        assert len(CASE_ROUTING) == 14
+        for case in TelemetryCase:
+            assert case in CASE_ROUTING
+
+    def test_trace_ce_eligible_cases(self) -> None:
+        """Verify trace cases with CE eligibility."""
+        ce_eligible_trace_cases = {
+            TelemetryCase.WORKFLOW_RUN,
+            TelemetryCase.MESSAGE_RUN,
+        }
+        for case in ce_eligible_trace_cases:
+            route = CASE_ROUTING[case]
+            assert route.signal_type == "trace"
+            assert route.ce_eligible is True
+
+    def test_trace_enterprise_only_cases(self) -> None:
+        """Verify trace cases that are enterprise-only."""
+        enterprise_only_trace_cases = {
+            TelemetryCase.NODE_EXECUTION,
+            TelemetryCase.DRAFT_NODE_EXECUTION,
+            TelemetryCase.PROMPT_GENERATION,
+        }
+        for case in enterprise_only_trace_cases:
+            route = CASE_ROUTING[case]
+            assert route.signal_type == "trace"
+            assert route.ce_eligible is False
+
+    def test_metric_log_cases(self) -> None:
+        """Verify metric/log-only cases."""
+        metric_log_cases = {
+            TelemetryCase.APP_CREATED,
+            TelemetryCase.APP_UPDATED,
+            TelemetryCase.APP_DELETED,
+            TelemetryCase.FEEDBACK_CREATED,
+            TelemetryCase.TOOL_EXECUTION,
+            TelemetryCase.MODERATION_CHECK,
+            TelemetryCase.SUGGESTED_QUESTION,
+            TelemetryCase.DATASET_RETRIEVAL,
+            TelemetryCase.GENERATE_NAME,
+        }
+        for case in metric_log_cases:
+            route = CASE_ROUTING[case]
+            assert route.signal_type == "metric_log"
+            assert route.ce_eligible is False
+
+    def test_routing_table_completeness(self) -> None:
+        """Verify routing table covers all cases with correct types."""
+        trace_cases = {
+            TelemetryCase.WORKFLOW_RUN,
+            TelemetryCase.MESSAGE_RUN,
+            TelemetryCase.NODE_EXECUTION,
+            TelemetryCase.DRAFT_NODE_EXECUTION,
+            TelemetryCase.PROMPT_GENERATION,
+        }
+        metric_log_cases = {
+            TelemetryCase.APP_CREATED,
+            TelemetryCase.APP_UPDATED,
+            TelemetryCase.APP_DELETED,
+            TelemetryCase.FEEDBACK_CREATED,
+            TelemetryCase.TOOL_EXECUTION,
+            TelemetryCase.MODERATION_CHECK,
+            TelemetryCase.SUGGESTED_QUESTION,
+            TelemetryCase.DATASET_RETRIEVAL,
+            TelemetryCase.GENERATE_NAME,
+        }
+
+        all_cases = trace_cases | metric_log_cases
+        assert len(all_cases) == 14
+        assert all_cases == set(TelemetryCase)
+
+        for case in trace_cases:
+            assert CASE_ROUTING[case].signal_type == "trace"
+
+        for case in metric_log_cases:
+            assert CASE_ROUTING[case].signal_type == "metric_log"