mirror of
https://github.com/langgenius/dify.git
synced 2026-03-14 13:51:33 +08:00
Merge branch '1.12.1-otel-ee' into deploy/enterprise
This commit is contained in:
commit
370e1fa5e2
@ -1285,7 +1285,7 @@ class TraceQueueManager:
|
||||
self.trace_instance = OpsTraceManager.get_ops_trace_instance(app_id)
|
||||
self.flask_app = current_app._get_current_object() # type: ignore
|
||||
|
||||
from core.telemetry import is_enterprise_telemetry_enabled
|
||||
from core.telemetry.gateway import is_enterprise_telemetry_enabled
|
||||
|
||||
self._enterprise_telemetry_enabled = is_enterprise_telemetry_enabled()
|
||||
if trace_manager_timer is None:
|
||||
|
||||
@ -1,10 +1,7 @@
|
||||
"""Community telemetry helpers.
|
||||
"""Telemetry facade.
|
||||
|
||||
Provides ``emit()`` which enqueues trace events into the CE trace pipeline
|
||||
(``TraceQueueManager`` → ``ops_trace`` Celery queue → Langfuse / LangSmith / etc.).
|
||||
|
||||
Enterprise-only traces (node execution, draft node execution, prompt generation)
|
||||
are silently dropped when enterprise telemetry is disabled.
|
||||
Thin public API for emitting telemetry events. All routing logic
|
||||
lives in ``core.telemetry.gateway`` which is shared by both CE and EE.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@ -13,48 +10,34 @@ from typing import TYPE_CHECKING
|
||||
|
||||
from core.ops.entities.trace_entity import TraceTaskName
|
||||
from core.telemetry.events import TelemetryContext, TelemetryEvent
|
||||
from core.telemetry.gateway import TRACE_TASK_TO_CASE
|
||||
from core.telemetry.gateway import emit as gateway_emit
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.ops.ops_trace_manager import TraceQueueManager
|
||||
|
||||
_ENTERPRISE_ONLY_TRACES: frozenset[TraceTaskName] = frozenset(
|
||||
{
|
||||
TraceTaskName.DRAFT_NODE_EXECUTION_TRACE,
|
||||
TraceTaskName.NODE_EXECUTION_TRACE,
|
||||
TraceTaskName.PROMPT_GENERATION_TRACE,
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
def _is_enterprise_telemetry_enabled() -> bool:
|
||||
try:
|
||||
from enterprise.telemetry.exporter import is_enterprise_telemetry_enabled
|
||||
|
||||
return is_enterprise_telemetry_enabled()
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def emit(event: TelemetryEvent, trace_manager: TraceQueueManager | None = None) -> None:
|
||||
from core.ops.ops_trace_manager import TraceQueueManager as LocalTraceQueueManager
|
||||
from core.ops.ops_trace_manager import TraceTask
|
||||
"""Emit a telemetry event.
|
||||
|
||||
if event.name in _ENTERPRISE_ONLY_TRACES and not _is_enterprise_telemetry_enabled():
|
||||
Translates the ``TelemetryEvent`` (keyed by ``TraceTaskName``) into a
|
||||
``TelemetryCase`` and delegates to ``core.telemetry.gateway.emit()``.
|
||||
"""
|
||||
case = TRACE_TASK_TO_CASE.get(event.name)
|
||||
if case is None:
|
||||
return
|
||||
|
||||
queue_manager = trace_manager or LocalTraceQueueManager(
|
||||
app_id=event.context.app_id,
|
||||
user_id=event.context.user_id,
|
||||
)
|
||||
queue_manager.add_trace_task(TraceTask(event.name, **event.payload))
|
||||
context: dict[str, object] = {
|
||||
"tenant_id": event.context.tenant_id,
|
||||
"user_id": event.context.user_id,
|
||||
"app_id": event.context.app_id,
|
||||
}
|
||||
gateway_emit(case, context, event.payload, trace_manager)
|
||||
|
||||
|
||||
is_enterprise_telemetry_enabled = _is_enterprise_telemetry_enabled
|
||||
|
||||
__all__ = [
|
||||
"TelemetryContext",
|
||||
"TelemetryEvent",
|
||||
"TraceTaskName",
|
||||
"emit",
|
||||
"is_enterprise_telemetry_enabled",
|
||||
]
|
||||
|
||||
206
api/core/telemetry/gateway.py
Normal file
206
api/core/telemetry/gateway.py
Normal file
@ -0,0 +1,206 @@
|
||||
"""Telemetry gateway — single routing layer for all editions.
|
||||
|
||||
Maps ``TelemetryCase`` → ``CaseRoute`` and dispatches events to either
|
||||
the CE/EE trace pipeline (``TraceQueueManager``) or the enterprise-only
|
||||
metric/log Celery queue.
|
||||
|
||||
This module lives in ``core/`` so both CE and EE share one routing table
|
||||
and one ``emit()`` entry point. No separate enterprise gateway module is
|
||||
needed — enterprise-specific dispatch (Celery task, payload offloading)
|
||||
is handled here behind lazy imports that no-op in CE.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from core.ops.entities.trace_entity import TraceTaskName
|
||||
from enterprise.telemetry.contracts import CaseRoute, SignalType, TelemetryCase, TelemetryEnvelope
|
||||
from extensions.ext_storage import storage
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.ops.ops_trace_manager import TraceQueueManager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
PAYLOAD_SIZE_THRESHOLD_BYTES = 1 * 1024 * 1024
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Routing table — authoritative mapping for all editions
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
CASE_TO_TRACE_TASK: dict[TelemetryCase, TraceTaskName] = {
|
||||
TelemetryCase.WORKFLOW_RUN: TraceTaskName.WORKFLOW_TRACE,
|
||||
TelemetryCase.MESSAGE_RUN: TraceTaskName.MESSAGE_TRACE,
|
||||
TelemetryCase.NODE_EXECUTION: TraceTaskName.NODE_EXECUTION_TRACE,
|
||||
TelemetryCase.DRAFT_NODE_EXECUTION: TraceTaskName.DRAFT_NODE_EXECUTION_TRACE,
|
||||
TelemetryCase.PROMPT_GENERATION: TraceTaskName.PROMPT_GENERATION_TRACE,
|
||||
TelemetryCase.TOOL_EXECUTION: TraceTaskName.TOOL_TRACE,
|
||||
TelemetryCase.MODERATION_CHECK: TraceTaskName.MODERATION_TRACE,
|
||||
TelemetryCase.SUGGESTED_QUESTION: TraceTaskName.SUGGESTED_QUESTION_TRACE,
|
||||
TelemetryCase.DATASET_RETRIEVAL: TraceTaskName.DATASET_RETRIEVAL_TRACE,
|
||||
TelemetryCase.GENERATE_NAME: TraceTaskName.GENERATE_NAME_TRACE,
|
||||
}
|
||||
|
||||
TRACE_TASK_TO_CASE: dict[TraceTaskName, TelemetryCase] = {v: k for k, v in CASE_TO_TRACE_TASK.items()}
|
||||
|
||||
CASE_ROUTING: dict[TelemetryCase, CaseRoute] = {
|
||||
# TRACE — CE-eligible (flow in both CE and EE)
|
||||
TelemetryCase.WORKFLOW_RUN: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
|
||||
TelemetryCase.MESSAGE_RUN: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
|
||||
TelemetryCase.TOOL_EXECUTION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
|
||||
TelemetryCase.MODERATION_CHECK: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
|
||||
TelemetryCase.SUGGESTED_QUESTION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
|
||||
TelemetryCase.DATASET_RETRIEVAL: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
|
||||
TelemetryCase.GENERATE_NAME: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
|
||||
# TRACE — enterprise-only
|
||||
TelemetryCase.NODE_EXECUTION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=False),
|
||||
TelemetryCase.DRAFT_NODE_EXECUTION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=False),
|
||||
TelemetryCase.PROMPT_GENERATION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=False),
|
||||
# METRIC_LOG — enterprise-only (signal-driven, not trace)
|
||||
TelemetryCase.APP_CREATED: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False),
|
||||
TelemetryCase.APP_UPDATED: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False),
|
||||
TelemetryCase.APP_DELETED: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False),
|
||||
TelemetryCase.FEEDBACK_CREATED: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False),
|
||||
}
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def is_enterprise_telemetry_enabled() -> bool:
|
||||
try:
|
||||
from enterprise.telemetry.exporter import is_enterprise_telemetry_enabled
|
||||
|
||||
return is_enterprise_telemetry_enabled()
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def _handle_payload_sizing(
|
||||
payload: dict[str, Any],
|
||||
tenant_id: str,
|
||||
event_id: str,
|
||||
) -> tuple[dict[str, Any], str | None]:
|
||||
"""Inline or offload payload based on size.
|
||||
|
||||
Returns ``(payload_for_envelope, storage_key | None)``. Payloads
|
||||
exceeding ``PAYLOAD_SIZE_THRESHOLD_BYTES`` are written to object
|
||||
storage and replaced with an empty dict in the envelope.
|
||||
"""
|
||||
try:
|
||||
payload_json = json.dumps(payload)
|
||||
payload_size = len(payload_json.encode("utf-8"))
|
||||
except (TypeError, ValueError):
|
||||
logger.warning("Failed to serialize payload for sizing: event_id=%s", event_id)
|
||||
return payload, None
|
||||
|
||||
if payload_size <= PAYLOAD_SIZE_THRESHOLD_BYTES:
|
||||
return payload, None
|
||||
|
||||
storage_key = f"telemetry/{tenant_id}/{event_id}.json"
|
||||
try:
|
||||
storage.save(storage_key, payload_json.encode("utf-8"))
|
||||
logger.debug("Stored large payload to storage: key=%s, size=%d", storage_key, payload_size)
|
||||
return {}, storage_key
|
||||
except Exception:
|
||||
logger.warning("Failed to store large payload, inlining instead: event_id=%s", event_id, exc_info=True)
|
||||
return payload, None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def emit(
|
||||
case: TelemetryCase,
|
||||
context: dict[str, Any],
|
||||
payload: dict[str, Any],
|
||||
trace_manager: TraceQueueManager | None = None,
|
||||
) -> None:
|
||||
"""Route a telemetry event to the correct pipeline.
|
||||
|
||||
TRACE events are enqueued into ``TraceQueueManager`` (works in both CE
|
||||
and EE). Enterprise-only traces are silently dropped when EE is
|
||||
disabled.
|
||||
|
||||
METRIC_LOG events are dispatched to the enterprise Celery queue;
|
||||
silently dropped when enterprise telemetry is unavailable.
|
||||
"""
|
||||
route = CASE_ROUTING.get(case)
|
||||
if route is None:
|
||||
logger.warning("Unknown telemetry case: %s, dropping event", case)
|
||||
return
|
||||
|
||||
if not route.ce_eligible and not is_enterprise_telemetry_enabled():
|
||||
logger.debug("Dropping EE-only event: case=%s (EE disabled)", case)
|
||||
return
|
||||
|
||||
if route.signal_type is SignalType.TRACE:
|
||||
_emit_trace(case, context, payload, trace_manager)
|
||||
else:
|
||||
_emit_metric_log(case, context, payload)
|
||||
|
||||
|
||||
def _emit_trace(
|
||||
case: TelemetryCase,
|
||||
context: dict[str, Any],
|
||||
payload: dict[str, Any],
|
||||
trace_manager: TraceQueueManager | None,
|
||||
) -> None:
|
||||
from core.ops.ops_trace_manager import TraceQueueManager as LocalTraceQueueManager
|
||||
from core.ops.ops_trace_manager import TraceTask
|
||||
|
||||
trace_task_name = CASE_TO_TRACE_TASK.get(case)
|
||||
if trace_task_name is None:
|
||||
logger.warning("No TraceTaskName mapping for case: %s", case)
|
||||
return
|
||||
|
||||
queue_manager = trace_manager or LocalTraceQueueManager(
|
||||
app_id=context.get("app_id"),
|
||||
user_id=context.get("user_id"),
|
||||
)
|
||||
queue_manager.add_trace_task(TraceTask(trace_task_name, **payload))
|
||||
logger.debug("Enqueued trace task: case=%s, app_id=%s", case, context.get("app_id"))
|
||||
|
||||
|
||||
def _emit_metric_log(
|
||||
case: TelemetryCase,
|
||||
context: dict[str, Any],
|
||||
payload: dict[str, Any],
|
||||
) -> None:
|
||||
"""Build envelope and dispatch to enterprise Celery queue.
|
||||
|
||||
No-ops when the enterprise telemetry task is not importable (CE mode).
|
||||
"""
|
||||
try:
|
||||
from tasks.enterprise_telemetry_task import process_enterprise_telemetry
|
||||
except ImportError:
|
||||
logger.debug("Enterprise metric/log dispatch unavailable, dropping: case=%s", case)
|
||||
return
|
||||
|
||||
tenant_id = context.get("tenant_id", "")
|
||||
event_id = str(uuid.uuid4())
|
||||
|
||||
payload_for_envelope, payload_ref = _handle_payload_sizing(payload, tenant_id, event_id)
|
||||
|
||||
envelope = TelemetryEnvelope(
|
||||
case=case,
|
||||
tenant_id=tenant_id,
|
||||
event_id=event_id,
|
||||
payload=payload_for_envelope,
|
||||
metadata={"payload_ref": payload_ref} if payload_ref else None,
|
||||
)
|
||||
|
||||
process_enterprise_telemetry.delay(envelope.model_dump_json())
|
||||
logger.debug(
|
||||
"Enqueued metric/log event: case=%s, tenant_id=%s, event_id=%s",
|
||||
case,
|
||||
tenant_id,
|
||||
event_id,
|
||||
)
|
||||
@ -1,13 +1,14 @@
|
||||
"""Blinker signal handlers for enterprise telemetry.
|
||||
|
||||
Registered at import time via ``@signal.connect`` decorators.
|
||||
Import must happen during ``ext_enterprise_telemetry.init_app()`` to ensure handlers fire.
|
||||
Import must happen during ``ext_enterprise_telemetry.init_app()`` to
|
||||
ensure handlers fire. Each handler delegates to ``core.telemetry.gateway``
|
||||
which handles routing, EE-gating, and dispatch.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
from events.app_event import app_was_created, app_was_deleted, app_was_updated
|
||||
from events.feedback_event import feedback_was_created
|
||||
@ -24,107 +25,60 @@ __all__ = [
|
||||
|
||||
@app_was_created.connect
|
||||
def _handle_app_created(sender: object, **kwargs: object) -> None:
|
||||
from enterprise.telemetry.contracts import TelemetryCase, TelemetryEnvelope
|
||||
from extensions.ext_enterprise_telemetry import get_enterprise_exporter
|
||||
from tasks.enterprise_telemetry_task import process_enterprise_telemetry
|
||||
from core.telemetry.gateway import emit as gateway_emit
|
||||
from enterprise.telemetry.contracts import TelemetryCase
|
||||
|
||||
exporter = get_enterprise_exporter()
|
||||
if not exporter:
|
||||
return
|
||||
|
||||
tenant_id = str(getattr(sender, "tenant_id", "") or "")
|
||||
payload = {
|
||||
"app_id": getattr(sender, "id", None),
|
||||
"mode": getattr(sender, "mode", None),
|
||||
}
|
||||
|
||||
envelope = TelemetryEnvelope(
|
||||
gateway_emit(
|
||||
case=TelemetryCase.APP_CREATED,
|
||||
tenant_id=tenant_id,
|
||||
event_id=str(uuid.uuid4()),
|
||||
payload=payload,
|
||||
context={"tenant_id": str(getattr(sender, "tenant_id", "") or "")},
|
||||
payload={
|
||||
"app_id": getattr(sender, "id", None),
|
||||
"mode": getattr(sender, "mode", None),
|
||||
},
|
||||
)
|
||||
|
||||
process_enterprise_telemetry.delay(envelope.model_dump_json())
|
||||
|
||||
|
||||
@app_was_deleted.connect
|
||||
def _handle_app_deleted(sender: object, **kwargs: object) -> None:
|
||||
from enterprise.telemetry.contracts import TelemetryCase, TelemetryEnvelope
|
||||
from extensions.ext_enterprise_telemetry import get_enterprise_exporter
|
||||
from tasks.enterprise_telemetry_task import process_enterprise_telemetry
|
||||
from core.telemetry.gateway import emit as gateway_emit
|
||||
from enterprise.telemetry.contracts import TelemetryCase
|
||||
|
||||
exporter = get_enterprise_exporter()
|
||||
if not exporter:
|
||||
return
|
||||
|
||||
tenant_id = str(getattr(sender, "tenant_id", "") or "")
|
||||
payload = {
|
||||
"app_id": getattr(sender, "id", None),
|
||||
}
|
||||
|
||||
envelope = TelemetryEnvelope(
|
||||
gateway_emit(
|
||||
case=TelemetryCase.APP_DELETED,
|
||||
tenant_id=tenant_id,
|
||||
event_id=str(uuid.uuid4()),
|
||||
payload=payload,
|
||||
context={"tenant_id": str(getattr(sender, "tenant_id", "") or "")},
|
||||
payload={"app_id": getattr(sender, "id", None)},
|
||||
)
|
||||
|
||||
process_enterprise_telemetry.delay(envelope.model_dump_json())
|
||||
|
||||
|
||||
@app_was_updated.connect
|
||||
def _handle_app_updated(sender: object, **kwargs: object) -> None:
|
||||
from enterprise.telemetry.contracts import TelemetryCase, TelemetryEnvelope
|
||||
from extensions.ext_enterprise_telemetry import get_enterprise_exporter
|
||||
from tasks.enterprise_telemetry_task import process_enterprise_telemetry
|
||||
from core.telemetry.gateway import emit as gateway_emit
|
||||
from enterprise.telemetry.contracts import TelemetryCase
|
||||
|
||||
exporter = get_enterprise_exporter()
|
||||
if not exporter:
|
||||
return
|
||||
|
||||
tenant_id = str(getattr(sender, "tenant_id", "") or "")
|
||||
payload = {
|
||||
"app_id": getattr(sender, "id", None),
|
||||
}
|
||||
|
||||
envelope = TelemetryEnvelope(
|
||||
gateway_emit(
|
||||
case=TelemetryCase.APP_UPDATED,
|
||||
tenant_id=tenant_id,
|
||||
event_id=str(uuid.uuid4()),
|
||||
payload=payload,
|
||||
context={"tenant_id": str(getattr(sender, "tenant_id", "") or "")},
|
||||
payload={"app_id": getattr(sender, "id", None)},
|
||||
)
|
||||
|
||||
process_enterprise_telemetry.delay(envelope.model_dump_json())
|
||||
|
||||
|
||||
@feedback_was_created.connect
|
||||
def _handle_feedback_created(sender: object, **kwargs: object) -> None:
|
||||
from enterprise.telemetry.contracts import TelemetryCase, TelemetryEnvelope
|
||||
from extensions.ext_enterprise_telemetry import get_enterprise_exporter
|
||||
from tasks.enterprise_telemetry_task import process_enterprise_telemetry
|
||||
|
||||
exporter = get_enterprise_exporter()
|
||||
if not exporter:
|
||||
return
|
||||
from core.telemetry.gateway import emit as gateway_emit
|
||||
from enterprise.telemetry.contracts import TelemetryCase
|
||||
|
||||
tenant_id = str(kwargs.get("tenant_id", "") or "")
|
||||
payload = {
|
||||
"message_id": getattr(sender, "message_id", None),
|
||||
"app_id": getattr(sender, "app_id", None),
|
||||
"conversation_id": getattr(sender, "conversation_id", None),
|
||||
"from_end_user_id": getattr(sender, "from_end_user_id", None),
|
||||
"from_account_id": getattr(sender, "from_account_id", None),
|
||||
"rating": getattr(sender, "rating", None),
|
||||
"from_source": getattr(sender, "from_source", None),
|
||||
"content": getattr(sender, "content", None),
|
||||
}
|
||||
|
||||
envelope = TelemetryEnvelope(
|
||||
gateway_emit(
|
||||
case=TelemetryCase.FEEDBACK_CREATED,
|
||||
tenant_id=tenant_id,
|
||||
event_id=str(uuid.uuid4()),
|
||||
payload=payload,
|
||||
context={"tenant_id": tenant_id},
|
||||
payload={
|
||||
"message_id": getattr(sender, "message_id", None),
|
||||
"app_id": getattr(sender, "app_id", None),
|
||||
"conversation_id": getattr(sender, "conversation_id", None),
|
||||
"from_end_user_id": getattr(sender, "from_end_user_id", None),
|
||||
"from_account_id": getattr(sender, "from_account_id", None),
|
||||
"rating": getattr(sender, "rating", None),
|
||||
"from_source": getattr(sender, "from_source", None),
|
||||
"content": getattr(sender, "content", None),
|
||||
},
|
||||
)
|
||||
|
||||
process_enterprise_telemetry.delay(envelope.model_dump_json())
|
||||
|
||||
@ -1,199 +0,0 @@
|
||||
"""Telemetry gateway routing and dispatch.
|
||||
|
||||
Maps ``TelemetryCase`` → ``CaseRoute`` (signal type + CE eligibility)
|
||||
and dispatches events to either the trace pipeline or the metric/log
|
||||
Celery queue.
|
||||
|
||||
Singleton lifecycle is managed by ``ext_enterprise_telemetry.init_app()``
|
||||
which creates the instance during single-threaded Flask app startup.
|
||||
Access via ``ext_enterprise_telemetry.get_gateway()``.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import logging
|
||||
import uuid
|
||||
from typing import TYPE_CHECKING, Any
|
||||
|
||||
from core.ops.entities.trace_entity import TraceTaskName
|
||||
from enterprise.telemetry.contracts import CaseRoute, SignalType, TelemetryCase, TelemetryEnvelope
|
||||
from extensions.ext_storage import storage
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from core.ops.ops_trace_manager import TraceQueueManager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
PAYLOAD_SIZE_THRESHOLD_BYTES = 1 * 1024 * 1024
|
||||
|
||||
CASE_TO_TRACE_TASK: dict[TelemetryCase, TraceTaskName] = {
|
||||
TelemetryCase.WORKFLOW_RUN: TraceTaskName.WORKFLOW_TRACE,
|
||||
TelemetryCase.MESSAGE_RUN: TraceTaskName.MESSAGE_TRACE,
|
||||
TelemetryCase.NODE_EXECUTION: TraceTaskName.NODE_EXECUTION_TRACE,
|
||||
TelemetryCase.DRAFT_NODE_EXECUTION: TraceTaskName.DRAFT_NODE_EXECUTION_TRACE,
|
||||
TelemetryCase.PROMPT_GENERATION: TraceTaskName.PROMPT_GENERATION_TRACE,
|
||||
}
|
||||
|
||||
CASE_ROUTING: dict[TelemetryCase, CaseRoute] = {
|
||||
TelemetryCase.WORKFLOW_RUN: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
|
||||
TelemetryCase.MESSAGE_RUN: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
|
||||
TelemetryCase.NODE_EXECUTION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=False),
|
||||
TelemetryCase.DRAFT_NODE_EXECUTION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=False),
|
||||
TelemetryCase.PROMPT_GENERATION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=False),
|
||||
TelemetryCase.APP_CREATED: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False),
|
||||
TelemetryCase.APP_UPDATED: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False),
|
||||
TelemetryCase.APP_DELETED: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False),
|
||||
TelemetryCase.FEEDBACK_CREATED: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False),
|
||||
TelemetryCase.TOOL_EXECUTION: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False),
|
||||
TelemetryCase.MODERATION_CHECK: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False),
|
||||
TelemetryCase.SUGGESTED_QUESTION: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False),
|
||||
TelemetryCase.DATASET_RETRIEVAL: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False),
|
||||
TelemetryCase.GENERATE_NAME: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False),
|
||||
}
|
||||
|
||||
|
||||
def _is_enterprise_telemetry_enabled() -> bool:
|
||||
try:
|
||||
from enterprise.telemetry.exporter import is_enterprise_telemetry_enabled
|
||||
|
||||
return is_enterprise_telemetry_enabled()
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
def _should_drop_ee_only_event(route: CaseRoute) -> bool:
|
||||
"""Return True when the event is enterprise-only and EE telemetry is disabled."""
|
||||
return not route.ce_eligible and not _is_enterprise_telemetry_enabled()
|
||||
|
||||
|
||||
class TelemetryGateway:
|
||||
"""Routes telemetry events to the trace pipeline or the metric/log Celery queue.
|
||||
|
||||
Stateless — instantiated once during ``ext_enterprise_telemetry.init_app()``
|
||||
and shared for the lifetime of the process.
|
||||
"""
|
||||
|
||||
def emit(
|
||||
self,
|
||||
case: TelemetryCase,
|
||||
context: dict[str, Any],
|
||||
payload: dict[str, Any],
|
||||
trace_manager: TraceQueueManager | None = None,
|
||||
) -> None:
|
||||
route = CASE_ROUTING.get(case)
|
||||
if route is None:
|
||||
logger.warning("Unknown telemetry case: %s, dropping event", case)
|
||||
return
|
||||
|
||||
if _should_drop_ee_only_event(route):
|
||||
logger.debug("Dropping EE-only event: case=%s (EE disabled)", case)
|
||||
return
|
||||
|
||||
logger.debug(
|
||||
"Gateway routing: case=%s, signal_type=%s, ce_eligible=%s",
|
||||
case,
|
||||
route.signal_type,
|
||||
route.ce_eligible,
|
||||
)
|
||||
|
||||
if route.signal_type is SignalType.TRACE:
|
||||
self._emit_trace(case, context, payload, route, trace_manager)
|
||||
else:
|
||||
self._emit_metric_log(case, context, payload)
|
||||
|
||||
def _emit_trace(
|
||||
self,
|
||||
case: TelemetryCase,
|
||||
context: dict[str, Any],
|
||||
payload: dict[str, Any],
|
||||
route: CaseRoute,
|
||||
trace_manager: TraceQueueManager | None,
|
||||
) -> None:
|
||||
from core.ops.ops_trace_manager import TraceQueueManager as LocalTraceQueueManager
|
||||
from core.ops.ops_trace_manager import TraceTask
|
||||
|
||||
trace_task_name = CASE_TO_TRACE_TASK.get(case)
|
||||
if trace_task_name is None:
|
||||
logger.warning("No TraceTaskName mapping for case: %s", case)
|
||||
return
|
||||
|
||||
queue_manager = trace_manager or LocalTraceQueueManager(
|
||||
app_id=context.get("app_id"),
|
||||
user_id=context.get("user_id"),
|
||||
)
|
||||
|
||||
queue_manager.add_trace_task(TraceTask(trace_task_name, **payload))
|
||||
logger.debug("Enqueued trace task: case=%s, app_id=%s", case, context.get("app_id"))
|
||||
|
||||
def _emit_metric_log(
|
||||
self,
|
||||
case: TelemetryCase,
|
||||
context: dict[str, Any],
|
||||
payload: dict[str, Any],
|
||||
) -> None:
|
||||
from tasks.enterprise_telemetry_task import process_enterprise_telemetry
|
||||
|
||||
tenant_id = context.get("tenant_id", "")
|
||||
event_id = str(uuid.uuid4())
|
||||
|
||||
payload_for_envelope, payload_ref = self._handle_payload_sizing(payload, tenant_id, event_id)
|
||||
|
||||
envelope = TelemetryEnvelope(
|
||||
case=case,
|
||||
tenant_id=tenant_id,
|
||||
event_id=event_id,
|
||||
payload=payload_for_envelope,
|
||||
metadata={"payload_ref": payload_ref} if payload_ref else None,
|
||||
)
|
||||
|
||||
process_enterprise_telemetry.delay(envelope.model_dump_json())
|
||||
logger.debug(
|
||||
"Enqueued metric/log event: case=%s, tenant_id=%s, event_id=%s",
|
||||
case,
|
||||
tenant_id,
|
||||
event_id,
|
||||
)
|
||||
|
||||
def _handle_payload_sizing(
|
||||
self,
|
||||
payload: dict[str, Any],
|
||||
tenant_id: str,
|
||||
event_id: str,
|
||||
) -> tuple[dict[str, Any], str | None]:
|
||||
try:
|
||||
payload_json = json.dumps(payload)
|
||||
payload_size = len(payload_json.encode("utf-8"))
|
||||
except (TypeError, ValueError):
|
||||
logger.warning("Failed to serialize payload for sizing: event_id=%s", event_id)
|
||||
return payload, None
|
||||
|
||||
if payload_size <= PAYLOAD_SIZE_THRESHOLD_BYTES:
|
||||
return payload, None
|
||||
|
||||
storage_key = f"telemetry/{tenant_id}/{event_id}.json"
|
||||
try:
|
||||
storage.save(storage_key, payload_json.encode("utf-8"))
|
||||
logger.debug("Stored large payload to storage: key=%s, size=%d", storage_key, payload_size)
|
||||
return {}, storage_key
|
||||
except Exception:
|
||||
logger.warning("Failed to store large payload, inlining instead: event_id=%s", event_id, exc_info=True)
|
||||
return payload, None
|
||||
|
||||
|
||||
def emit(
|
||||
case: TelemetryCase,
|
||||
context: dict[str, Any],
|
||||
payload: dict[str, Any],
|
||||
trace_manager: TraceQueueManager | None = None,
|
||||
) -> None:
|
||||
"""Module-level convenience wrapper.
|
||||
|
||||
Fetches the gateway singleton from the extension; no-ops when
|
||||
enterprise telemetry is disabled (gateway is ``None``).
|
||||
"""
|
||||
from extensions.ext_enterprise_telemetry import get_gateway
|
||||
|
||||
gateway = get_gateway()
|
||||
if gateway is not None:
|
||||
gateway.emit(case, context, payload, trace_manager)
|
||||
@ -1,8 +1,8 @@
|
||||
"""Flask extension for enterprise telemetry lifecycle management.
|
||||
|
||||
Initializes the EnterpriseExporter and TelemetryGateway singletons during
|
||||
``create_app()`` (single-threaded), registers blinker event handlers,
|
||||
and hooks atexit for graceful shutdown.
|
||||
Initializes the EnterpriseExporter singleton during ``create_app()``
|
||||
(single-threaded), registers blinker event handlers, and hooks atexit
|
||||
for graceful shutdown.
|
||||
|
||||
Skipped entirely when ``ENTERPRISE_ENABLED`` and ``ENTERPRISE_TELEMETRY_ENABLED``
|
||||
are false (``is_enabled()`` gate).
|
||||
@ -19,12 +19,10 @@ from configs import dify_config
|
||||
if TYPE_CHECKING:
|
||||
from dify_app import DifyApp
|
||||
from enterprise.telemetry.exporter import EnterpriseExporter
|
||||
from enterprise.telemetry.gateway import TelemetryGateway
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
_exporter: EnterpriseExporter | None = None
|
||||
_gateway: TelemetryGateway | None = None
|
||||
|
||||
|
||||
def is_enabled() -> bool:
|
||||
@ -32,16 +30,14 @@ def is_enabled() -> bool:
|
||||
|
||||
|
||||
def init_app(app: DifyApp) -> None:
|
||||
global _exporter, _gateway
|
||||
global _exporter
|
||||
|
||||
if not is_enabled():
|
||||
return
|
||||
|
||||
from enterprise.telemetry.exporter import EnterpriseExporter
|
||||
from enterprise.telemetry.gateway import TelemetryGateway
|
||||
|
||||
_exporter = EnterpriseExporter(dify_config)
|
||||
_gateway = TelemetryGateway()
|
||||
atexit.register(_exporter.shutdown)
|
||||
|
||||
# Import to trigger @signal.connect decorator registration
|
||||
@ -52,7 +48,3 @@ def init_app(app: DifyApp) -> None:
|
||||
|
||||
def get_enterprise_exporter() -> EnterpriseExporter | None:
|
||||
return _exporter
|
||||
|
||||
|
||||
def get_gateway() -> TelemetryGateway | None:
|
||||
return _gateway
|
||||
|
||||
@ -39,7 +39,7 @@ def trace_queue_manager_and_task(monkeypatch):
|
||||
class StubTraceQueueManager:
|
||||
def __init__(self, app_id=None):
|
||||
self.app_id = app_id
|
||||
from core.telemetry import is_enterprise_telemetry_enabled
|
||||
from core.telemetry.gateway import is_enterprise_telemetry_enabled
|
||||
|
||||
self._enterprise_telemetry_enabled = is_enterprise_telemetry_enabled()
|
||||
self.trace_instance = StubOpsTraceManager.get_ops_trace_instance(app_id)
|
||||
@ -87,7 +87,7 @@ class TestTraceQueueManagerTelemetryGuard:
|
||||
trace_task = TraceTask(trace_type=TraceTaskName.WORKFLOW_TRACE)
|
||||
|
||||
with (
|
||||
patch("core.telemetry.is_enterprise_telemetry_enabled", return_value=False),
|
||||
patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False),
|
||||
patch("core.ops.ops_trace_manager.OpsTraceManager.get_ops_trace_instance", return_value=None),
|
||||
patch("core.ops.ops_trace_manager.trace_manager_queue", mock_queue),
|
||||
):
|
||||
@ -109,7 +109,7 @@ class TestTraceQueueManagerTelemetryGuard:
|
||||
trace_task = TraceTask(trace_type=TraceTaskName.WORKFLOW_TRACE)
|
||||
|
||||
with (
|
||||
patch("core.telemetry.is_enterprise_telemetry_enabled", return_value=True),
|
||||
patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True),
|
||||
patch("core.ops.ops_trace_manager.OpsTraceManager.get_ops_trace_instance", return_value=None),
|
||||
patch("core.ops.ops_trace_manager.trace_manager_queue", mock_queue),
|
||||
):
|
||||
@ -135,7 +135,7 @@ class TestTraceQueueManagerTelemetryGuard:
|
||||
trace_task = TraceTask(trace_type=TraceTaskName.WORKFLOW_TRACE)
|
||||
|
||||
with (
|
||||
patch("core.telemetry.is_enterprise_telemetry_enabled", return_value=False),
|
||||
patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False),
|
||||
patch(
|
||||
"core.ops.ops_trace_manager.OpsTraceManager.get_ops_trace_instance", return_value=mock_trace_instance
|
||||
),
|
||||
@ -163,7 +163,7 @@ class TestTraceQueueManagerTelemetryGuard:
|
||||
trace_task = TraceTask(trace_type=TraceTaskName.WORKFLOW_TRACE)
|
||||
|
||||
with (
|
||||
patch("core.telemetry.is_enterprise_telemetry_enabled", return_value=True),
|
||||
patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True),
|
||||
patch(
|
||||
"core.ops.ops_trace_manager.OpsTraceManager.get_ops_trace_instance", return_value=mock_trace_instance
|
||||
),
|
||||
@ -189,7 +189,7 @@ class TestTraceQueueManagerTelemetryGuard:
|
||||
trace_task = TraceTask(trace_type=TraceTaskName.WORKFLOW_TRACE)
|
||||
|
||||
with (
|
||||
patch("core.telemetry.is_enterprise_telemetry_enabled", return_value=True),
|
||||
patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True),
|
||||
patch("core.ops.ops_trace_manager.OpsTraceManager.get_ops_trace_instance", return_value=None),
|
||||
patch("core.ops.ops_trace_manager.trace_manager_queue", mock_queue),
|
||||
):
|
||||
|
||||
@ -53,7 +53,7 @@ def telemetry_test_setup(monkeypatch):
|
||||
|
||||
|
||||
class TestTelemetryEmit:
|
||||
@patch("core.telemetry._is_enterprise_telemetry_enabled", return_value=True)
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True)
|
||||
def test_emit_enterprise_trace_creates_trace_task(self, _mock_ee, telemetry_test_setup):
|
||||
emit_fn, mock_queue = telemetry_test_setup
|
||||
|
||||
@ -107,7 +107,7 @@ class TestTelemetryEmit:
|
||||
|
||||
mock_queue.put.assert_not_called()
|
||||
|
||||
@patch("core.telemetry._is_enterprise_telemetry_enabled", return_value=True)
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True)
|
||||
def test_emit_all_enterprise_only_traces_allowed_when_ee_enabled(self, _mock_ee, telemetry_test_setup):
|
||||
emit_fn, mock_queue = telemetry_test_setup
|
||||
|
||||
@ -136,7 +136,7 @@ class TestTelemetryEmit:
|
||||
called_task = mock_queue.put.call_args[0][0]
|
||||
assert called_task.trace_type == trace_name
|
||||
|
||||
@patch("core.telemetry._is_enterprise_telemetry_enabled", return_value=True)
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True)
|
||||
def test_emit_passes_name_directly_to_trace_task(self, _mock_ee, telemetry_test_setup):
|
||||
emit_fn, mock_queue = telemetry_test_setup
|
||||
|
||||
@ -157,7 +157,7 @@ class TestTelemetryEmit:
|
||||
assert called_task.trace_type == TraceTaskName.DRAFT_NODE_EXECUTION_TRACE
|
||||
assert isinstance(called_task.trace_type, TraceTaskName)
|
||||
|
||||
@patch("core.telemetry._is_enterprise_telemetry_enabled", return_value=True)
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True)
|
||||
def test_emit_with_provided_trace_manager(self, _mock_ee, telemetry_test_setup):
|
||||
emit_fn, mock_queue = telemetry_test_setup
|
||||
|
||||
|
||||
@ -5,14 +5,13 @@ from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from core.telemetry import is_enterprise_telemetry_enabled
|
||||
from core.telemetry.gateway import emit, is_enterprise_telemetry_enabled
|
||||
from enterprise.telemetry.contracts import TelemetryCase
|
||||
from enterprise.telemetry.gateway import TelemetryGateway
|
||||
|
||||
|
||||
class TestTelemetryCoreExports:
|
||||
def test_is_enterprise_telemetry_enabled_exported(self) -> None:
|
||||
from core.telemetry import is_enterprise_telemetry_enabled as exported_func
|
||||
from core.telemetry.gateway import is_enterprise_telemetry_enabled as exported_func
|
||||
|
||||
assert callable(exported_func)
|
||||
|
||||
@ -38,10 +37,6 @@ def mock_ops_trace_manager():
|
||||
|
||||
|
||||
class TestGatewayIntegrationTraceRouting:
|
||||
@pytest.fixture
|
||||
def gateway(self) -> TelemetryGateway:
|
||||
return TelemetryGateway()
|
||||
|
||||
@pytest.fixture
|
||||
def mock_trace_manager(self) -> MagicMock:
|
||||
return MagicMock()
|
||||
@ -49,68 +44,61 @@ class TestGatewayIntegrationTraceRouting:
|
||||
@pytest.mark.usefixtures("mock_ops_trace_manager")
|
||||
def test_ce_eligible_trace_routed_to_trace_manager(
|
||||
self,
|
||||
gateway: TelemetryGateway,
|
||||
mock_trace_manager: MagicMock,
|
||||
) -> None:
|
||||
with patch("enterprise.telemetry.gateway._is_enterprise_telemetry_enabled", return_value=True):
|
||||
with patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True):
|
||||
context = {"app_id": "app-123", "user_id": "user-456", "tenant_id": "tenant-789"}
|
||||
payload = {"workflow_run_id": "run-abc"}
|
||||
|
||||
gateway.emit(TelemetryCase.WORKFLOW_RUN, context, payload, mock_trace_manager)
|
||||
emit(TelemetryCase.WORKFLOW_RUN, context, payload, mock_trace_manager)
|
||||
|
||||
mock_trace_manager.add_trace_task.assert_called_once()
|
||||
|
||||
@pytest.mark.usefixtures("mock_ops_trace_manager")
|
||||
def test_ce_eligible_trace_routed_when_ee_disabled(
|
||||
self,
|
||||
gateway: TelemetryGateway,
|
||||
mock_trace_manager: MagicMock,
|
||||
) -> None:
|
||||
with patch("enterprise.telemetry.gateway._is_enterprise_telemetry_enabled", return_value=False):
|
||||
with patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False):
|
||||
context = {"app_id": "app-123", "user_id": "user-456"}
|
||||
payload = {"workflow_run_id": "run-abc"}
|
||||
|
||||
gateway.emit(TelemetryCase.WORKFLOW_RUN, context, payload, mock_trace_manager)
|
||||
emit(TelemetryCase.WORKFLOW_RUN, context, payload, mock_trace_manager)
|
||||
|
||||
mock_trace_manager.add_trace_task.assert_called_once()
|
||||
|
||||
@pytest.mark.usefixtures("mock_ops_trace_manager")
|
||||
def test_enterprise_only_trace_dropped_when_ee_disabled(
|
||||
self,
|
||||
gateway: TelemetryGateway,
|
||||
mock_trace_manager: MagicMock,
|
||||
) -> None:
|
||||
with patch("enterprise.telemetry.gateway._is_enterprise_telemetry_enabled", return_value=False):
|
||||
with patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False):
|
||||
context = {"app_id": "app-123", "user_id": "user-456"}
|
||||
payload = {"node_id": "node-abc"}
|
||||
|
||||
gateway.emit(TelemetryCase.NODE_EXECUTION, context, payload, mock_trace_manager)
|
||||
emit(TelemetryCase.NODE_EXECUTION, context, payload, mock_trace_manager)
|
||||
|
||||
mock_trace_manager.add_trace_task.assert_not_called()
|
||||
|
||||
@pytest.mark.usefixtures("mock_ops_trace_manager")
|
||||
def test_enterprise_only_trace_routed_when_ee_enabled(
|
||||
self,
|
||||
gateway: TelemetryGateway,
|
||||
mock_trace_manager: MagicMock,
|
||||
) -> None:
|
||||
with patch("enterprise.telemetry.gateway._is_enterprise_telemetry_enabled", return_value=True):
|
||||
with patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True):
|
||||
context = {"app_id": "app-123", "user_id": "user-456"}
|
||||
payload = {"node_id": "node-abc"}
|
||||
|
||||
gateway.emit(TelemetryCase.NODE_EXECUTION, context, payload, mock_trace_manager)
|
||||
emit(TelemetryCase.NODE_EXECUTION, context, payload, mock_trace_manager)
|
||||
|
||||
mock_trace_manager.add_trace_task.assert_called_once()
|
||||
|
||||
|
||||
class TestGatewayIntegrationMetricRouting:
|
||||
@pytest.fixture
|
||||
def gateway(self) -> TelemetryGateway:
|
||||
return TelemetryGateway()
|
||||
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True)
|
||||
def test_metric_case_routes_to_celery_task(
|
||||
self,
|
||||
gateway: TelemetryGateway,
|
||||
_mock_ee_enabled: MagicMock,
|
||||
) -> None:
|
||||
from enterprise.telemetry.contracts import TelemetryEnvelope
|
||||
|
||||
@ -118,7 +106,7 @@ class TestGatewayIntegrationMetricRouting:
|
||||
context = {"tenant_id": "tenant-123"}
|
||||
payload = {"app_id": "app-abc", "name": "My App"}
|
||||
|
||||
gateway.emit(TelemetryCase.APP_CREATED, context, payload)
|
||||
emit(TelemetryCase.APP_CREATED, context, payload)
|
||||
|
||||
mock_delay.assert_called_once()
|
||||
envelope_json = mock_delay.call_args[0][0]
|
||||
@ -127,46 +115,36 @@ class TestGatewayIntegrationMetricRouting:
|
||||
assert envelope.tenant_id == "tenant-123"
|
||||
assert envelope.payload["app_id"] == "app-abc"
|
||||
|
||||
def test_tool_execution_metric_routed(
|
||||
@pytest.mark.usefixtures("mock_ops_trace_manager")
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True)
|
||||
def test_tool_execution_trace_routed(
|
||||
self,
|
||||
gateway: TelemetryGateway,
|
||||
_mock_ee_enabled: MagicMock,
|
||||
) -> None:
|
||||
from enterprise.telemetry.contracts import TelemetryEnvelope
|
||||
mock_trace_manager = MagicMock()
|
||||
context = {"tenant_id": "tenant-123", "app_id": "app-123"}
|
||||
payload = {"tool_name": "test_tool", "tool_inputs": {}, "tool_outputs": "result"}
|
||||
|
||||
with patch("tasks.enterprise_telemetry_task.process_enterprise_telemetry.delay") as mock_delay:
|
||||
context = {"tenant_id": "tenant-123", "app_id": "app-123"}
|
||||
payload = {"tool_name": "test_tool", "tool_inputs": {}, "tool_outputs": "result"}
|
||||
emit(TelemetryCase.TOOL_EXECUTION, context, payload, mock_trace_manager)
|
||||
|
||||
gateway.emit(TelemetryCase.TOOL_EXECUTION, context, payload)
|
||||
mock_trace_manager.add_trace_task.assert_called_once()
|
||||
|
||||
mock_delay.assert_called_once()
|
||||
envelope_json = mock_delay.call_args[0][0]
|
||||
envelope = TelemetryEnvelope.model_validate_json(envelope_json)
|
||||
assert envelope.case == TelemetryCase.TOOL_EXECUTION
|
||||
|
||||
def test_moderation_check_metric_routed(
|
||||
@pytest.mark.usefixtures("mock_ops_trace_manager")
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True)
|
||||
def test_moderation_check_trace_routed(
|
||||
self,
|
||||
gateway: TelemetryGateway,
|
||||
_mock_ee_enabled: MagicMock,
|
||||
) -> None:
|
||||
from enterprise.telemetry.contracts import TelemetryEnvelope
|
||||
mock_trace_manager = MagicMock()
|
||||
context = {"tenant_id": "tenant-123", "app_id": "app-123"}
|
||||
payload = {"message_id": "msg-123", "moderation_result": {"flagged": False}}
|
||||
|
||||
with patch("tasks.enterprise_telemetry_task.process_enterprise_telemetry.delay") as mock_delay:
|
||||
context = {"tenant_id": "tenant-123", "app_id": "app-123"}
|
||||
payload = {"message_id": "msg-123", "moderation_result": {"flagged": False}}
|
||||
emit(TelemetryCase.MODERATION_CHECK, context, payload, mock_trace_manager)
|
||||
|
||||
gateway.emit(TelemetryCase.MODERATION_CHECK, context, payload)
|
||||
|
||||
mock_delay.assert_called_once()
|
||||
envelope_json = mock_delay.call_args[0][0]
|
||||
envelope = TelemetryEnvelope.model_validate_json(envelope_json)
|
||||
assert envelope.case == TelemetryCase.MODERATION_CHECK
|
||||
mock_trace_manager.add_trace_task.assert_called_once()
|
||||
|
||||
|
||||
class TestGatewayIntegrationCEEligibility:
|
||||
@pytest.fixture
|
||||
def gateway(self) -> TelemetryGateway:
|
||||
return TelemetryGateway()
|
||||
|
||||
@pytest.fixture
|
||||
def mock_trace_manager(self) -> MagicMock:
|
||||
return MagicMock()
|
||||
@ -174,70 +152,65 @@ class TestGatewayIntegrationCEEligibility:
|
||||
@pytest.mark.usefixtures("mock_ops_trace_manager")
|
||||
def test_workflow_run_is_ce_eligible(
|
||||
self,
|
||||
gateway: TelemetryGateway,
|
||||
mock_trace_manager: MagicMock,
|
||||
) -> None:
|
||||
with patch("enterprise.telemetry.gateway._is_enterprise_telemetry_enabled", return_value=False):
|
||||
with patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False):
|
||||
context = {"app_id": "app-123", "user_id": "user-456"}
|
||||
payload = {"workflow_run_id": "run-abc"}
|
||||
|
||||
gateway.emit(TelemetryCase.WORKFLOW_RUN, context, payload, mock_trace_manager)
|
||||
emit(TelemetryCase.WORKFLOW_RUN, context, payload, mock_trace_manager)
|
||||
|
||||
mock_trace_manager.add_trace_task.assert_called_once()
|
||||
|
||||
@pytest.mark.usefixtures("mock_ops_trace_manager")
|
||||
def test_message_run_is_ce_eligible(
|
||||
self,
|
||||
gateway: TelemetryGateway,
|
||||
mock_trace_manager: MagicMock,
|
||||
) -> None:
|
||||
with patch("enterprise.telemetry.gateway._is_enterprise_telemetry_enabled", return_value=False):
|
||||
with patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False):
|
||||
context = {"app_id": "app-123", "user_id": "user-456"}
|
||||
payload = {"message_id": "msg-abc", "conversation_id": "conv-123"}
|
||||
|
||||
gateway.emit(TelemetryCase.MESSAGE_RUN, context, payload, mock_trace_manager)
|
||||
emit(TelemetryCase.MESSAGE_RUN, context, payload, mock_trace_manager)
|
||||
|
||||
mock_trace_manager.add_trace_task.assert_called_once()
|
||||
|
||||
@pytest.mark.usefixtures("mock_ops_trace_manager")
|
||||
def test_node_execution_not_ce_eligible(
|
||||
self,
|
||||
gateway: TelemetryGateway,
|
||||
mock_trace_manager: MagicMock,
|
||||
) -> None:
|
||||
with patch("enterprise.telemetry.gateway._is_enterprise_telemetry_enabled", return_value=False):
|
||||
with patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False):
|
||||
context = {"app_id": "app-123", "user_id": "user-456"}
|
||||
payload = {"node_id": "node-abc"}
|
||||
|
||||
gateway.emit(TelemetryCase.NODE_EXECUTION, context, payload, mock_trace_manager)
|
||||
emit(TelemetryCase.NODE_EXECUTION, context, payload, mock_trace_manager)
|
||||
|
||||
mock_trace_manager.add_trace_task.assert_not_called()
|
||||
|
||||
@pytest.mark.usefixtures("mock_ops_trace_manager")
|
||||
def test_draft_node_execution_not_ce_eligible(
|
||||
self,
|
||||
gateway: TelemetryGateway,
|
||||
mock_trace_manager: MagicMock,
|
||||
) -> None:
|
||||
with patch("enterprise.telemetry.gateway._is_enterprise_telemetry_enabled", return_value=False):
|
||||
with patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False):
|
||||
context = {"app_id": "app-123", "user_id": "user-456"}
|
||||
payload = {"node_execution_data": {}}
|
||||
|
||||
gateway.emit(TelemetryCase.DRAFT_NODE_EXECUTION, context, payload, mock_trace_manager)
|
||||
emit(TelemetryCase.DRAFT_NODE_EXECUTION, context, payload, mock_trace_manager)
|
||||
|
||||
mock_trace_manager.add_trace_task.assert_not_called()
|
||||
|
||||
@pytest.mark.usefixtures("mock_ops_trace_manager")
|
||||
def test_prompt_generation_not_ce_eligible(
|
||||
self,
|
||||
gateway: TelemetryGateway,
|
||||
mock_trace_manager: MagicMock,
|
||||
) -> None:
|
||||
with patch("enterprise.telemetry.gateway._is_enterprise_telemetry_enabled", return_value=False):
|
||||
with patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False):
|
||||
context = {"app_id": "app-123", "user_id": "user-456", "tenant_id": "tenant-789"}
|
||||
payload = {"operation_type": "generate", "instruction": "test"}
|
||||
|
||||
gateway.emit(TelemetryCase.PROMPT_GENERATION, context, payload, mock_trace_manager)
|
||||
emit(TelemetryCase.PROMPT_GENERATION, context, payload, mock_trace_manager)
|
||||
|
||||
mock_trace_manager.add_trace_task.assert_not_called()
|
||||
|
||||
|
||||
@ -5,8 +5,8 @@ from __future__ import annotations
|
||||
import pytest
|
||||
from pydantic import ValidationError
|
||||
|
||||
from core.telemetry.gateway import CASE_ROUTING
|
||||
from enterprise.telemetry.contracts import CaseRoute, SignalType, TelemetryCase, TelemetryEnvelope
|
||||
from enterprise.telemetry.gateway import CASE_ROUTING
|
||||
|
||||
|
||||
class TestTelemetryCase:
|
||||
@ -221,11 +221,6 @@ class TestCaseRouting:
|
||||
TelemetryCase.APP_UPDATED,
|
||||
TelemetryCase.APP_DELETED,
|
||||
TelemetryCase.FEEDBACK_CREATED,
|
||||
TelemetryCase.TOOL_EXECUTION,
|
||||
TelemetryCase.MODERATION_CHECK,
|
||||
TelemetryCase.SUGGESTED_QUESTION,
|
||||
TelemetryCase.DATASET_RETRIEVAL,
|
||||
TelemetryCase.GENERATE_NAME,
|
||||
}
|
||||
for case in metric_log_cases:
|
||||
route = CASE_ROUTING[case]
|
||||
@ -240,17 +235,17 @@ class TestCaseRouting:
|
||||
TelemetryCase.NODE_EXECUTION,
|
||||
TelemetryCase.DRAFT_NODE_EXECUTION,
|
||||
TelemetryCase.PROMPT_GENERATION,
|
||||
TelemetryCase.TOOL_EXECUTION,
|
||||
TelemetryCase.MODERATION_CHECK,
|
||||
TelemetryCase.SUGGESTED_QUESTION,
|
||||
TelemetryCase.DATASET_RETRIEVAL,
|
||||
TelemetryCase.GENERATE_NAME,
|
||||
}
|
||||
metric_log_cases = {
|
||||
TelemetryCase.APP_CREATED,
|
||||
TelemetryCase.APP_UPDATED,
|
||||
TelemetryCase.APP_DELETED,
|
||||
TelemetryCase.FEEDBACK_CREATED,
|
||||
TelemetryCase.TOOL_EXECUTION,
|
||||
TelemetryCase.MODERATION_CHECK,
|
||||
TelemetryCase.SUGGESTED_QUESTION,
|
||||
TelemetryCase.DATASET_RETRIEVAL,
|
||||
TelemetryCase.GENERATE_NAME,
|
||||
}
|
||||
|
||||
all_cases = trace_cases | metric_log_cases
|
||||
|
||||
@ -7,20 +7,12 @@ from enterprise.telemetry.contracts import TelemetryCase
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_exporter():
|
||||
with patch("extensions.ext_enterprise_telemetry.get_enterprise_exporter") as mock:
|
||||
exporter = MagicMock()
|
||||
mock.return_value = exporter
|
||||
yield exporter
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_task():
|
||||
with patch("tasks.enterprise_telemetry_task.process_enterprise_telemetry") as mock:
|
||||
def mock_gateway_emit():
|
||||
with patch("core.telemetry.gateway.emit") as mock:
|
||||
yield mock
|
||||
|
||||
|
||||
def test_handle_app_created_calls_task(mock_exporter, mock_task):
|
||||
def test_handle_app_created_calls_task(mock_gateway_emit):
|
||||
sender = MagicMock()
|
||||
sender.id = "app-123"
|
||||
sender.tenant_id = "tenant-456"
|
||||
@ -28,54 +20,53 @@ def test_handle_app_created_calls_task(mock_exporter, mock_task):
|
||||
|
||||
event_handlers._handle_app_created(sender)
|
||||
|
||||
mock_task.delay.assert_called_once()
|
||||
call_args = mock_task.delay.call_args[0][0]
|
||||
assert "app_created" in call_args
|
||||
assert "tenant-456" in call_args
|
||||
assert "app-123" in call_args
|
||||
assert "chat" in call_args
|
||||
mock_gateway_emit.assert_called_once_with(
|
||||
case=TelemetryCase.APP_CREATED,
|
||||
context={"tenant_id": "tenant-456"},
|
||||
payload={"app_id": "app-123", "mode": "chat"},
|
||||
)
|
||||
|
||||
|
||||
def test_handle_app_created_no_exporter(mock_task):
|
||||
with patch("extensions.ext_enterprise_telemetry.get_enterprise_exporter", return_value=None):
|
||||
sender = MagicMock()
|
||||
sender.id = "app-123"
|
||||
sender.tenant_id = "tenant-456"
|
||||
def test_handle_app_created_no_exporter(mock_gateway_emit):
|
||||
"""Gateway handles exporter availability internally; handler always calls gateway."""
|
||||
sender = MagicMock()
|
||||
sender.id = "app-123"
|
||||
sender.tenant_id = "tenant-456"
|
||||
|
||||
event_handlers._handle_app_created(sender)
|
||||
event_handlers._handle_app_created(sender)
|
||||
|
||||
mock_task.delay.assert_not_called()
|
||||
mock_gateway_emit.assert_called_once()
|
||||
|
||||
|
||||
def test_handle_app_updated_calls_task(mock_exporter, mock_task):
|
||||
def test_handle_app_updated_calls_task(mock_gateway_emit):
|
||||
sender = MagicMock()
|
||||
sender.id = "app-123"
|
||||
sender.tenant_id = "tenant-456"
|
||||
|
||||
event_handlers._handle_app_updated(sender)
|
||||
|
||||
mock_task.delay.assert_called_once()
|
||||
call_args = mock_task.delay.call_args[0][0]
|
||||
assert "app_updated" in call_args
|
||||
assert "tenant-456" in call_args
|
||||
assert "app-123" in call_args
|
||||
mock_gateway_emit.assert_called_once_with(
|
||||
case=TelemetryCase.APP_UPDATED,
|
||||
context={"tenant_id": "tenant-456"},
|
||||
payload={"app_id": "app-123"},
|
||||
)
|
||||
|
||||
|
||||
def test_handle_app_deleted_calls_task(mock_exporter, mock_task):
|
||||
def test_handle_app_deleted_calls_task(mock_gateway_emit):
|
||||
sender = MagicMock()
|
||||
sender.id = "app-123"
|
||||
sender.tenant_id = "tenant-456"
|
||||
|
||||
event_handlers._handle_app_deleted(sender)
|
||||
|
||||
mock_task.delay.assert_called_once()
|
||||
call_args = mock_task.delay.call_args[0][0]
|
||||
assert "app_deleted" in call_args
|
||||
assert "tenant-456" in call_args
|
||||
assert "app-123" in call_args
|
||||
mock_gateway_emit.assert_called_once_with(
|
||||
case=TelemetryCase.APP_DELETED,
|
||||
context={"tenant_id": "tenant-456"},
|
||||
payload={"app_id": "app-123"},
|
||||
)
|
||||
|
||||
|
||||
def test_handle_feedback_created_calls_task(mock_exporter, mock_task):
|
||||
def test_handle_feedback_created_calls_task(mock_gateway_emit):
|
||||
sender = MagicMock()
|
||||
sender.message_id = "msg-123"
|
||||
sender.app_id = "app-456"
|
||||
@ -88,34 +79,34 @@ def test_handle_feedback_created_calls_task(mock_exporter, mock_task):
|
||||
|
||||
event_handlers._handle_feedback_created(sender, tenant_id="tenant-456")
|
||||
|
||||
mock_task.delay.assert_called_once()
|
||||
call_args = mock_task.delay.call_args[0][0]
|
||||
assert "feedback_created" in call_args
|
||||
assert "tenant-456" in call_args
|
||||
assert "msg-123" in call_args
|
||||
assert "app-456" in call_args
|
||||
assert "conv-789" in call_args
|
||||
assert "user-001" in call_args
|
||||
assert "like" in call_args
|
||||
assert "api" in call_args
|
||||
assert "Great response!" in call_args
|
||||
mock_gateway_emit.assert_called_once_with(
|
||||
case=TelemetryCase.FEEDBACK_CREATED,
|
||||
context={"tenant_id": "tenant-456"},
|
||||
payload={
|
||||
"message_id": "msg-123",
|
||||
"app_id": "app-456",
|
||||
"conversation_id": "conv-789",
|
||||
"from_end_user_id": "user-001",
|
||||
"from_account_id": None,
|
||||
"rating": "like",
|
||||
"from_source": "api",
|
||||
"content": "Great response!",
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
def test_handle_feedback_created_no_exporter(mock_task):
|
||||
with patch("extensions.ext_enterprise_telemetry.get_enterprise_exporter", return_value=None):
|
||||
sender = MagicMock()
|
||||
sender.message_id = "msg-123"
|
||||
def test_handle_feedback_created_no_exporter(mock_gateway_emit):
|
||||
"""Gateway handles exporter availability internally; handler always calls gateway."""
|
||||
sender = MagicMock()
|
||||
sender.message_id = "msg-123"
|
||||
|
||||
event_handlers._handle_feedback_created(sender, tenant_id="tenant-456")
|
||||
event_handlers._handle_feedback_created(sender, tenant_id="tenant-456")
|
||||
|
||||
mock_task.delay.assert_not_called()
|
||||
mock_gateway_emit.assert_called_once()
|
||||
|
||||
|
||||
def test_handlers_create_valid_envelopes(mock_exporter, mock_task):
|
||||
import json
|
||||
|
||||
from enterprise.telemetry.contracts import TelemetryEnvelope
|
||||
|
||||
def test_handlers_create_valid_envelopes(mock_gateway_emit):
|
||||
"""Verify handlers pass correct TelemetryCase and payload structure."""
|
||||
sender = MagicMock()
|
||||
sender.id = "app-123"
|
||||
sender.tenant_id = "tenant-456"
|
||||
@ -123,12 +114,8 @@ def test_handlers_create_valid_envelopes(mock_exporter, mock_task):
|
||||
|
||||
event_handlers._handle_app_created(sender)
|
||||
|
||||
call_args = mock_task.delay.call_args[0][0]
|
||||
envelope_dict = json.loads(call_args)
|
||||
envelope = TelemetryEnvelope(**envelope_dict)
|
||||
|
||||
assert envelope.case == TelemetryCase.APP_CREATED
|
||||
assert envelope.tenant_id == "tenant-456"
|
||||
assert envelope.event_id
|
||||
assert envelope.payload["app_id"] == "app-123"
|
||||
assert envelope.payload["mode"] == "chat"
|
||||
call_kwargs = mock_gateway_emit.call_args[1]
|
||||
assert call_kwargs["case"] == TelemetryCase.APP_CREATED
|
||||
assert call_kwargs["context"]["tenant_id"] == "tenant-456"
|
||||
assert call_kwargs["payload"]["app_id"] == "app-123"
|
||||
assert call_kwargs["payload"]["mode"] == "chat"
|
||||
|
||||
@ -6,14 +6,13 @@ from unittest.mock import MagicMock, patch
|
||||
import pytest
|
||||
|
||||
from core.ops.entities.trace_entity import TraceTaskName
|
||||
from enterprise.telemetry.contracts import SignalType, TelemetryCase, TelemetryEnvelope
|
||||
from enterprise.telemetry.gateway import (
|
||||
from core.telemetry.gateway import (
|
||||
CASE_ROUTING,
|
||||
CASE_TO_TRACE_TASK,
|
||||
PAYLOAD_SIZE_THRESHOLD_BYTES,
|
||||
TelemetryGateway,
|
||||
emit,
|
||||
)
|
||||
from enterprise.telemetry.contracts import SignalType, TelemetryCase, TelemetryEnvelope
|
||||
|
||||
|
||||
class TestCaseRoutingTable:
|
||||
@ -38,17 +37,20 @@ class TestCaseRoutingTable:
|
||||
TelemetryCase.APP_UPDATED,
|
||||
TelemetryCase.APP_DELETED,
|
||||
TelemetryCase.FEEDBACK_CREATED,
|
||||
]
|
||||
for case in metric_log_cases:
|
||||
assert CASE_ROUTING[case].signal_type is SignalType.METRIC_LOG, f"{case} should be metric_log"
|
||||
|
||||
def test_ce_eligible_cases(self) -> None:
|
||||
ce_eligible_cases = [
|
||||
TelemetryCase.WORKFLOW_RUN,
|
||||
TelemetryCase.MESSAGE_RUN,
|
||||
TelemetryCase.TOOL_EXECUTION,
|
||||
TelemetryCase.MODERATION_CHECK,
|
||||
TelemetryCase.SUGGESTED_QUESTION,
|
||||
TelemetryCase.DATASET_RETRIEVAL,
|
||||
TelemetryCase.GENERATE_NAME,
|
||||
]
|
||||
for case in metric_log_cases:
|
||||
assert CASE_ROUTING[case].signal_type is SignalType.METRIC_LOG, f"{case} should be metric_log"
|
||||
|
||||
def test_ce_eligible_cases(self) -> None:
|
||||
ce_eligible_cases = [TelemetryCase.WORKFLOW_RUN, TelemetryCase.MESSAGE_RUN]
|
||||
for case in ce_eligible_cases:
|
||||
assert CASE_ROUTING[case].ce_eligible is True, f"{case} should be CE eligible"
|
||||
|
||||
@ -87,91 +89,80 @@ def mock_ops_trace_manager():
|
||||
yield mock_module, mock_trace_entity
|
||||
|
||||
|
||||
class TestTelemetryGatewayTraceRouting:
|
||||
@pytest.fixture
|
||||
def gateway(self) -> TelemetryGateway:
|
||||
return TelemetryGateway()
|
||||
|
||||
class TestGatewayTraceRouting:
|
||||
@pytest.fixture
|
||||
def mock_trace_manager(self) -> MagicMock:
|
||||
return MagicMock()
|
||||
|
||||
@patch("enterprise.telemetry.gateway._is_enterprise_telemetry_enabled", return_value=True)
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True)
|
||||
def test_trace_case_routes_to_trace_manager(
|
||||
self,
|
||||
_mock_ee_enabled: MagicMock,
|
||||
gateway: TelemetryGateway,
|
||||
mock_trace_manager: MagicMock,
|
||||
mock_ops_trace_manager: tuple[MagicMock, MagicMock],
|
||||
) -> None:
|
||||
context = {"app_id": "app-123", "user_id": "user-456", "tenant_id": "tenant-789"}
|
||||
payload = {"workflow_run_id": "run-abc"}
|
||||
|
||||
gateway.emit(TelemetryCase.WORKFLOW_RUN, context, payload, mock_trace_manager)
|
||||
emit(TelemetryCase.WORKFLOW_RUN, context, payload, mock_trace_manager)
|
||||
|
||||
mock_trace_manager.add_trace_task.assert_called_once()
|
||||
|
||||
@patch("enterprise.telemetry.gateway._is_enterprise_telemetry_enabled", return_value=False)
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False)
|
||||
def test_ce_eligible_trace_enqueued_when_ee_disabled(
|
||||
self,
|
||||
_mock_ee_enabled: MagicMock,
|
||||
gateway: TelemetryGateway,
|
||||
mock_trace_manager: MagicMock,
|
||||
mock_ops_trace_manager: tuple[MagicMock, MagicMock],
|
||||
) -> None:
|
||||
context = {"app_id": "app-123", "user_id": "user-456"}
|
||||
payload = {"workflow_run_id": "run-abc"}
|
||||
|
||||
gateway.emit(TelemetryCase.WORKFLOW_RUN, context, payload, mock_trace_manager)
|
||||
emit(TelemetryCase.WORKFLOW_RUN, context, payload, mock_trace_manager)
|
||||
|
||||
mock_trace_manager.add_trace_task.assert_called_once()
|
||||
|
||||
@patch("enterprise.telemetry.gateway._is_enterprise_telemetry_enabled", return_value=False)
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=False)
|
||||
def test_enterprise_only_trace_dropped_when_ee_disabled(
|
||||
self,
|
||||
_mock_ee_enabled: MagicMock,
|
||||
gateway: TelemetryGateway,
|
||||
mock_trace_manager: MagicMock,
|
||||
mock_ops_trace_manager: tuple[MagicMock, MagicMock],
|
||||
) -> None:
|
||||
context = {"app_id": "app-123", "user_id": "user-456"}
|
||||
payload = {"node_id": "node-abc"}
|
||||
|
||||
gateway.emit(TelemetryCase.NODE_EXECUTION, context, payload, mock_trace_manager)
|
||||
emit(TelemetryCase.NODE_EXECUTION, context, payload, mock_trace_manager)
|
||||
|
||||
mock_trace_manager.add_trace_task.assert_not_called()
|
||||
|
||||
@patch("enterprise.telemetry.gateway._is_enterprise_telemetry_enabled", return_value=True)
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True)
|
||||
def test_enterprise_only_trace_enqueued_when_ee_enabled(
|
||||
self,
|
||||
_mock_ee_enabled: MagicMock,
|
||||
gateway: TelemetryGateway,
|
||||
mock_trace_manager: MagicMock,
|
||||
mock_ops_trace_manager: tuple[MagicMock, MagicMock],
|
||||
) -> None:
|
||||
context = {"app_id": "app-123", "user_id": "user-456"}
|
||||
payload = {"node_id": "node-abc"}
|
||||
|
||||
gateway.emit(TelemetryCase.NODE_EXECUTION, context, payload, mock_trace_manager)
|
||||
emit(TelemetryCase.NODE_EXECUTION, context, payload, mock_trace_manager)
|
||||
|
||||
mock_trace_manager.add_trace_task.assert_called_once()
|
||||
|
||||
|
||||
class TestTelemetryGatewayMetricLogRouting:
|
||||
@pytest.fixture
|
||||
def gateway(self) -> TelemetryGateway:
|
||||
return TelemetryGateway()
|
||||
|
||||
class TestGatewayMetricLogRouting:
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True)
|
||||
@patch("tasks.enterprise_telemetry_task.process_enterprise_telemetry.delay")
|
||||
def test_metric_case_routes_to_celery_task(
|
||||
self,
|
||||
mock_delay: MagicMock,
|
||||
gateway: TelemetryGateway,
|
||||
_mock_ee_enabled: MagicMock,
|
||||
) -> None:
|
||||
context = {"tenant_id": "tenant-123"}
|
||||
payload = {"app_id": "app-abc", "name": "My App"}
|
||||
|
||||
gateway.emit(TelemetryCase.APP_CREATED, context, payload)
|
||||
emit(TelemetryCase.APP_CREATED, context, payload)
|
||||
|
||||
mock_delay.assert_called_once()
|
||||
envelope_json = mock_delay.call_args[0][0]
|
||||
@ -180,17 +171,18 @@ class TestTelemetryGatewayMetricLogRouting:
|
||||
assert envelope.tenant_id == "tenant-123"
|
||||
assert envelope.payload["app_id"] == "app-abc"
|
||||
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True)
|
||||
@patch("tasks.enterprise_telemetry_task.process_enterprise_telemetry.delay")
|
||||
def test_envelope_has_unique_event_id(
|
||||
self,
|
||||
mock_delay: MagicMock,
|
||||
gateway: TelemetryGateway,
|
||||
_mock_ee_enabled: MagicMock,
|
||||
) -> None:
|
||||
context = {"tenant_id": "tenant-123"}
|
||||
payload = {"app_id": "app-abc"}
|
||||
|
||||
gateway.emit(TelemetryCase.APP_CREATED, context, payload)
|
||||
gateway.emit(TelemetryCase.APP_CREATED, context, payload)
|
||||
emit(TelemetryCase.APP_CREATED, context, payload)
|
||||
emit(TelemetryCase.APP_CREATED, context, payload)
|
||||
|
||||
assert mock_delay.call_count == 2
|
||||
envelope1 = TelemetryEnvelope.model_validate_json(mock_delay.call_args_list[0][0][0])
|
||||
@ -198,40 +190,38 @@ class TestTelemetryGatewayMetricLogRouting:
|
||||
assert envelope1.event_id != envelope2.event_id
|
||||
|
||||
|
||||
class TestTelemetryGatewayPayloadSizing:
|
||||
@pytest.fixture
|
||||
def gateway(self) -> TelemetryGateway:
|
||||
return TelemetryGateway()
|
||||
|
||||
class TestGatewayPayloadSizing:
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True)
|
||||
@patch("tasks.enterprise_telemetry_task.process_enterprise_telemetry.delay")
|
||||
def test_small_payload_inlined(
|
||||
self,
|
||||
mock_delay: MagicMock,
|
||||
gateway: TelemetryGateway,
|
||||
_mock_ee_enabled: MagicMock,
|
||||
) -> None:
|
||||
context = {"tenant_id": "tenant-123"}
|
||||
payload = {"key": "small_value"}
|
||||
|
||||
gateway.emit(TelemetryCase.APP_CREATED, context, payload)
|
||||
emit(TelemetryCase.APP_CREATED, context, payload)
|
||||
|
||||
envelope_json = mock_delay.call_args[0][0]
|
||||
envelope = TelemetryEnvelope.model_validate_json(envelope_json)
|
||||
assert envelope.payload == payload
|
||||
assert envelope.metadata is None
|
||||
|
||||
@patch("enterprise.telemetry.gateway.storage")
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True)
|
||||
@patch("core.telemetry.gateway.storage")
|
||||
@patch("tasks.enterprise_telemetry_task.process_enterprise_telemetry.delay")
|
||||
def test_large_payload_stored(
|
||||
self,
|
||||
mock_delay: MagicMock,
|
||||
mock_storage: MagicMock,
|
||||
gateway: TelemetryGateway,
|
||||
_mock_ee_enabled: MagicMock,
|
||||
) -> None:
|
||||
context = {"tenant_id": "tenant-123"}
|
||||
large_value = "x" * (PAYLOAD_SIZE_THRESHOLD_BYTES + 1000)
|
||||
payload = {"key": large_value}
|
||||
|
||||
gateway.emit(TelemetryCase.APP_CREATED, context, payload)
|
||||
emit(TelemetryCase.APP_CREATED, context, payload)
|
||||
|
||||
mock_storage.save.assert_called_once()
|
||||
storage_key = mock_storage.save.call_args[0][0]
|
||||
@ -243,20 +233,21 @@ class TestTelemetryGatewayPayloadSizing:
|
||||
assert envelope.metadata is not None
|
||||
assert envelope.metadata["payload_ref"] == storage_key
|
||||
|
||||
@patch("enterprise.telemetry.gateway.storage")
|
||||
@patch("core.telemetry.gateway.is_enterprise_telemetry_enabled", return_value=True)
|
||||
@patch("core.telemetry.gateway.storage")
|
||||
@patch("tasks.enterprise_telemetry_task.process_enterprise_telemetry.delay")
|
||||
def test_large_payload_fallback_on_storage_error(
|
||||
self,
|
||||
mock_delay: MagicMock,
|
||||
mock_storage: MagicMock,
|
||||
gateway: TelemetryGateway,
|
||||
_mock_ee_enabled: MagicMock,
|
||||
) -> None:
|
||||
mock_storage.save.side_effect = Exception("Storage failure")
|
||||
context = {"tenant_id": "tenant-123"}
|
||||
large_value = "x" * (PAYLOAD_SIZE_THRESHOLD_BYTES + 1000)
|
||||
payload = {"key": large_value}
|
||||
|
||||
gateway.emit(TelemetryCase.APP_CREATED, context, payload)
|
||||
emit(TelemetryCase.APP_CREATED, context, payload)
|
||||
|
||||
envelope_json = mock_delay.call_args[0][0]
|
||||
envelope = TelemetryEnvelope.model_validate_json(envelope_json)
|
||||
@ -264,26 +255,6 @@ class TestTelemetryGatewayPayloadSizing:
|
||||
assert envelope.metadata is None
|
||||
|
||||
|
||||
class TestModuleLevelFunctions:
|
||||
@patch("extensions.ext_enterprise_telemetry.get_gateway")
|
||||
@patch("enterprise.telemetry.gateway._is_enterprise_telemetry_enabled", return_value=True)
|
||||
def test_emit_function_uses_gateway(
|
||||
self,
|
||||
_mock_ee_enabled: MagicMock,
|
||||
mock_get_gateway: MagicMock,
|
||||
mock_ops_trace_manager: tuple[MagicMock, MagicMock],
|
||||
) -> None:
|
||||
mock_gateway = TelemetryGateway()
|
||||
mock_get_gateway.return_value = mock_gateway
|
||||
mock_trace_manager = MagicMock()
|
||||
context = {"app_id": "app-123", "user_id": "user-456"}
|
||||
payload = {"workflow_run_id": "run-abc"}
|
||||
|
||||
with patch.object(mock_gateway, "emit") as mock_emit:
|
||||
emit(TelemetryCase.WORKFLOW_RUN, context, payload, mock_trace_manager)
|
||||
mock_emit.assert_called_once_with(TelemetryCase.WORKFLOW_RUN, context, payload, mock_trace_manager)
|
||||
|
||||
|
||||
class TestTraceTaskNameMapping:
|
||||
def test_workflow_run_mapping(self) -> None:
|
||||
assert CASE_TO_TRACE_TASK[TelemetryCase.WORKFLOW_RUN] is TraceTaskName.WORKFLOW_TRACE
|
||||
|
||||
@ -304,6 +304,7 @@ def test_on_app_created_emits_correct_event(mock_redis):
|
||||
attributes={
|
||||
"dify.app.id": "app-789",
|
||||
"dify.tenant_id": "tenant-123",
|
||||
"dify.event.id": "event-456",
|
||||
"dify.app.mode": "chat",
|
||||
},
|
||||
tenant_id="tenant-123",
|
||||
@ -345,6 +346,7 @@ def test_on_app_updated_emits_correct_event(mock_redis):
|
||||
attributes={
|
||||
"dify.app.id": "app-789",
|
||||
"dify.tenant_id": "tenant-123",
|
||||
"dify.event.id": "event-456",
|
||||
},
|
||||
tenant_id="tenant-123",
|
||||
)
|
||||
@ -384,6 +386,7 @@ def test_on_app_deleted_emits_correct_event(mock_redis):
|
||||
attributes={
|
||||
"dify.app.id": "app-789",
|
||||
"dify.tenant_id": "tenant-123",
|
||||
"dify.event.id": "event-456",
|
||||
},
|
||||
tenant_id="tenant-123",
|
||||
)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user