diff --git a/api/README.md b/api/README.md index 9d89b490b0..9871d2c311 100644 --- a/api/README.md +++ b/api/README.md @@ -122,7 +122,7 @@ These commands assume you start from the repository root. ```bash cd api - uv run celery -A app.celery worker -P threads -c 2 --loglevel INFO -Q dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention + uv run celery -A app.celery worker -P threads -c 2 --loglevel INFO -Q dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention,enterprise_telemetry ``` 1. Optional: start Celery Beat (scheduled tasks, in a new terminal). diff --git a/api/enterprise/telemetry/metric_handler.py b/api/enterprise/telemetry/metric_handler.py index f87fe36ebf..cfe1768a10 100644 --- a/api/enterprise/telemetry/metric_handler.py +++ b/api/enterprise/telemetry/metric_handler.py @@ -121,14 +121,10 @@ class EnterpriseMetricHandler: dedup_key = f"telemetry:dedup:{envelope.tenant_id}:{envelope.event_id}" try: - # Try to get existing value - existing = redis_client.get(dedup_key) - if existing is not None: - return True - - # First time seeing this event - mark as seen with 1h TTL - redis_client.setex(dedup_key, 3600, b"1") - return False + # Atomic set-if-not-exists with 1h TTL + # Returns True if key was set (first time), None if already exists (duplicate) + was_set = redis_client.set(dedup_key, b"1", nx=True, ex=3600) + return was_set is None except Exception: # Fail open: if Redis is unavailable, process the event # (prefer occasional duplicate over lost data) diff --git a/api/tests/unit_tests/enterprise/telemetry/test_metric_handler.py b/api/tests/unit_tests/enterprise/telemetry/test_metric_handler.py index 59f234c183..581e1631d5 100644 --- a/api/tests/unit_tests/enterprise/telemetry/test_metric_handler.py +++ b/api/tests/unit_tests/enterprise/telemetry/test_metric_handler.py @@ -25,7 +25,7 @@ def sample_envelope(): def test_dispatch_app_created(sample_envelope, mock_redis): - mock_redis.get.return_value = None + mock_redis.set.return_value = True handler = EnterpriseMetricHandler() with patch.object(handler, "_on_app_created") as mock_handler: @@ -34,7 +34,7 @@ def test_dispatch_app_created(sample_envelope, mock_redis): def test_dispatch_app_updated(mock_redis): - mock_redis.get.return_value = None + mock_redis.set.return_value = True envelope = TelemetryEnvelope( case=TelemetryCase.APP_UPDATED, tenant_id="test-tenant", @@ -49,7 +49,7 @@ def test_dispatch_app_updated(mock_redis): def test_dispatch_app_deleted(mock_redis): - mock_redis.get.return_value = None + mock_redis.set.return_value = True envelope = TelemetryEnvelope( case=TelemetryCase.APP_DELETED, tenant_id="test-tenant", @@ -64,7 +64,7 @@ def test_dispatch_app_deleted(mock_redis): def test_dispatch_feedback_created(mock_redis): - mock_redis.get.return_value = None + mock_redis.set.return_value = True envelope = TelemetryEnvelope( case=TelemetryCase.FEEDBACK_CREATED, tenant_id="test-tenant", @@ -79,7 +79,7 @@ def test_dispatch_feedback_created(mock_redis): def test_dispatch_message_run(mock_redis): - mock_redis.get.return_value = None + mock_redis.set.return_value = True envelope = TelemetryEnvelope( case=TelemetryCase.MESSAGE_RUN, tenant_id="test-tenant", @@ -94,7 +94,7 @@ def test_dispatch_message_run(mock_redis): def test_dispatch_tool_execution(mock_redis): - mock_redis.get.return_value = None + mock_redis.set.return_value = True envelope = TelemetryEnvelope( case=TelemetryCase.TOOL_EXECUTION, tenant_id="test-tenant", @@ -109,7 +109,7 @@ def test_dispatch_tool_execution(mock_redis): def test_dispatch_moderation_check(mock_redis): - mock_redis.get.return_value = None + mock_redis.set.return_value = True envelope = TelemetryEnvelope( case=TelemetryCase.MODERATION_CHECK, tenant_id="test-tenant", @@ -124,7 +124,7 @@ def test_dispatch_moderation_check(mock_redis): def test_dispatch_suggested_question(mock_redis): - mock_redis.get.return_value = None + mock_redis.set.return_value = True envelope = TelemetryEnvelope( case=TelemetryCase.SUGGESTED_QUESTION, tenant_id="test-tenant", @@ -139,7 +139,7 @@ def test_dispatch_suggested_question(mock_redis): def test_dispatch_dataset_retrieval(mock_redis): - mock_redis.get.return_value = None + mock_redis.set.return_value = True envelope = TelemetryEnvelope( case=TelemetryCase.DATASET_RETRIEVAL, tenant_id="test-tenant", @@ -154,7 +154,7 @@ def test_dispatch_dataset_retrieval(mock_redis): def test_dispatch_generate_name(mock_redis): - mock_redis.get.return_value = None + mock_redis.set.return_value = True envelope = TelemetryEnvelope( case=TelemetryCase.GENERATE_NAME, tenant_id="test-tenant", @@ -169,7 +169,7 @@ def test_dispatch_generate_name(mock_redis): def test_dispatch_prompt_generation(mock_redis): - mock_redis.get.return_value = None + mock_redis.set.return_value = True envelope = TelemetryEnvelope( case=TelemetryCase.PROMPT_GENERATION, tenant_id="test-tenant", @@ -184,7 +184,7 @@ def test_dispatch_prompt_generation(mock_redis): def test_all_known_cases_have_handlers(mock_redis): - mock_redis.get.return_value = None + mock_redis.set.return_value = True handler = EnterpriseMetricHandler() for case in TelemetryCase: @@ -198,7 +198,7 @@ def test_all_known_cases_have_handlers(mock_redis): def test_idempotency_duplicate(sample_envelope, mock_redis): - mock_redis.get.return_value = b"1" + mock_redis.set.return_value = None handler = EnterpriseMetricHandler() with patch.object(handler, "_on_app_created") as mock_handler: @@ -207,21 +207,22 @@ def test_idempotency_duplicate(sample_envelope, mock_redis): def test_idempotency_first_seen(sample_envelope, mock_redis): - mock_redis.get.return_value = None + mock_redis.set.return_value = True handler = EnterpriseMetricHandler() is_dup = handler._is_duplicate(sample_envelope) assert is_dup is False - mock_redis.setex.assert_called_once_with( + mock_redis.set.assert_called_once_with( "telemetry:dedup:test-tenant:test-event-123", - 3600, b"1", + nx=True, + ex=3600, ) def test_idempotency_redis_failure_fails_open(sample_envelope, mock_redis, caplog): - mock_redis.get.side_effect = Exception("Redis unavailable") + mock_redis.set.side_effect = Exception("Redis unavailable") handler = EnterpriseMetricHandler() is_dup = handler._is_duplicate(sample_envelope) @@ -276,7 +277,7 @@ def test_rehydration_emits_degraded_event_on_failure(): def test_on_app_created_emits_correct_event(mock_redis): - mock_redis.get.return_value = None + mock_redis.set.return_value = True envelope = TelemetryEnvelope( case=TelemetryCase.APP_CREATED, tenant_id="tenant-123", @@ -311,7 +312,7 @@ def test_on_app_created_emits_correct_event(mock_redis): def test_on_app_updated_emits_correct_event(mock_redis): - mock_redis.get.return_value = None + mock_redis.set.return_value = True envelope = TelemetryEnvelope( case=TelemetryCase.APP_UPDATED, tenant_id="tenant-123", @@ -343,7 +344,7 @@ def test_on_app_updated_emits_correct_event(mock_redis): def test_on_app_deleted_emits_correct_event(mock_redis): - mock_redis.get.return_value = None + mock_redis.set.return_value = True envelope = TelemetryEnvelope( case=TelemetryCase.APP_DELETED, tenant_id="tenant-123", @@ -375,7 +376,7 @@ def test_on_app_deleted_emits_correct_event(mock_redis): def test_on_feedback_created_emits_correct_event(mock_redis): - mock_redis.get.return_value = None + mock_redis.set.return_value = True envelope = TelemetryEnvelope( case=TelemetryCase.FEEDBACK_CREATED, tenant_id="tenant-123", @@ -418,7 +419,7 @@ def test_on_feedback_created_emits_correct_event(mock_redis): def test_on_feedback_created_without_content(mock_redis): - mock_redis.get.return_value = None + mock_redis.set.return_value = True envelope = TelemetryEnvelope( case=TelemetryCase.FEEDBACK_CREATED, tenant_id="tenant-123",