fix(telemetry): use atomic Redis SET NX for idempotency and register Celery queue

This commit is contained in:
GareArc 2026-02-05 20:07:11 -08:00
parent 1663a7ab4c
commit 6e47e163b8
No known key found for this signature in database
3 changed files with 28 additions and 31 deletions

View File

@ -122,7 +122,7 @@ These commands assume you start from the repository root.
```bash
cd api
uv run celery -A app.celery worker -P threads -c 2 --loglevel INFO -Q dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention
uv run celery -A app.celery worker -P threads -c 2 --loglevel INFO -Q dataset,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention,enterprise_telemetry
```
1. Optional: start Celery Beat (scheduled tasks, in a new terminal).

View File

@ -121,14 +121,10 @@ class EnterpriseMetricHandler:
dedup_key = f"telemetry:dedup:{envelope.tenant_id}:{envelope.event_id}"
try:
# Try to get existing value
existing = redis_client.get(dedup_key)
if existing is not None:
return True
# First time seeing this event - mark as seen with 1h TTL
redis_client.setex(dedup_key, 3600, b"1")
return False
# Atomic set-if-not-exists with 1h TTL
# Returns True if key was set (first time), None if already exists (duplicate)
was_set = redis_client.set(dedup_key, b"1", nx=True, ex=3600)
return was_set is None
except Exception:
# Fail open: if Redis is unavailable, process the event
# (prefer occasional duplicate over lost data)

View File

@ -25,7 +25,7 @@ def sample_envelope():
def test_dispatch_app_created(sample_envelope, mock_redis):
mock_redis.get.return_value = None
mock_redis.set.return_value = True
handler = EnterpriseMetricHandler()
with patch.object(handler, "_on_app_created") as mock_handler:
@ -34,7 +34,7 @@ def test_dispatch_app_created(sample_envelope, mock_redis):
def test_dispatch_app_updated(mock_redis):
mock_redis.get.return_value = None
mock_redis.set.return_value = True
envelope = TelemetryEnvelope(
case=TelemetryCase.APP_UPDATED,
tenant_id="test-tenant",
@ -49,7 +49,7 @@ def test_dispatch_app_updated(mock_redis):
def test_dispatch_app_deleted(mock_redis):
mock_redis.get.return_value = None
mock_redis.set.return_value = True
envelope = TelemetryEnvelope(
case=TelemetryCase.APP_DELETED,
tenant_id="test-tenant",
@ -64,7 +64,7 @@ def test_dispatch_app_deleted(mock_redis):
def test_dispatch_feedback_created(mock_redis):
mock_redis.get.return_value = None
mock_redis.set.return_value = True
envelope = TelemetryEnvelope(
case=TelemetryCase.FEEDBACK_CREATED,
tenant_id="test-tenant",
@ -79,7 +79,7 @@ def test_dispatch_feedback_created(mock_redis):
def test_dispatch_message_run(mock_redis):
mock_redis.get.return_value = None
mock_redis.set.return_value = True
envelope = TelemetryEnvelope(
case=TelemetryCase.MESSAGE_RUN,
tenant_id="test-tenant",
@ -94,7 +94,7 @@ def test_dispatch_message_run(mock_redis):
def test_dispatch_tool_execution(mock_redis):
mock_redis.get.return_value = None
mock_redis.set.return_value = True
envelope = TelemetryEnvelope(
case=TelemetryCase.TOOL_EXECUTION,
tenant_id="test-tenant",
@ -109,7 +109,7 @@ def test_dispatch_tool_execution(mock_redis):
def test_dispatch_moderation_check(mock_redis):
mock_redis.get.return_value = None
mock_redis.set.return_value = True
envelope = TelemetryEnvelope(
case=TelemetryCase.MODERATION_CHECK,
tenant_id="test-tenant",
@ -124,7 +124,7 @@ def test_dispatch_moderation_check(mock_redis):
def test_dispatch_suggested_question(mock_redis):
mock_redis.get.return_value = None
mock_redis.set.return_value = True
envelope = TelemetryEnvelope(
case=TelemetryCase.SUGGESTED_QUESTION,
tenant_id="test-tenant",
@ -139,7 +139,7 @@ def test_dispatch_suggested_question(mock_redis):
def test_dispatch_dataset_retrieval(mock_redis):
mock_redis.get.return_value = None
mock_redis.set.return_value = True
envelope = TelemetryEnvelope(
case=TelemetryCase.DATASET_RETRIEVAL,
tenant_id="test-tenant",
@ -154,7 +154,7 @@ def test_dispatch_dataset_retrieval(mock_redis):
def test_dispatch_generate_name(mock_redis):
mock_redis.get.return_value = None
mock_redis.set.return_value = True
envelope = TelemetryEnvelope(
case=TelemetryCase.GENERATE_NAME,
tenant_id="test-tenant",
@ -169,7 +169,7 @@ def test_dispatch_generate_name(mock_redis):
def test_dispatch_prompt_generation(mock_redis):
mock_redis.get.return_value = None
mock_redis.set.return_value = True
envelope = TelemetryEnvelope(
case=TelemetryCase.PROMPT_GENERATION,
tenant_id="test-tenant",
@ -184,7 +184,7 @@ def test_dispatch_prompt_generation(mock_redis):
def test_all_known_cases_have_handlers(mock_redis):
mock_redis.get.return_value = None
mock_redis.set.return_value = True
handler = EnterpriseMetricHandler()
for case in TelemetryCase:
@ -198,7 +198,7 @@ def test_all_known_cases_have_handlers(mock_redis):
def test_idempotency_duplicate(sample_envelope, mock_redis):
mock_redis.get.return_value = b"1"
mock_redis.set.return_value = None
handler = EnterpriseMetricHandler()
with patch.object(handler, "_on_app_created") as mock_handler:
@ -207,21 +207,22 @@ def test_idempotency_duplicate(sample_envelope, mock_redis):
def test_idempotency_first_seen(sample_envelope, mock_redis):
mock_redis.get.return_value = None
mock_redis.set.return_value = True
handler = EnterpriseMetricHandler()
is_dup = handler._is_duplicate(sample_envelope)
assert is_dup is False
mock_redis.setex.assert_called_once_with(
mock_redis.set.assert_called_once_with(
"telemetry:dedup:test-tenant:test-event-123",
3600,
b"1",
nx=True,
ex=3600,
)
def test_idempotency_redis_failure_fails_open(sample_envelope, mock_redis, caplog):
mock_redis.get.side_effect = Exception("Redis unavailable")
mock_redis.set.side_effect = Exception("Redis unavailable")
handler = EnterpriseMetricHandler()
is_dup = handler._is_duplicate(sample_envelope)
@ -276,7 +277,7 @@ def test_rehydration_emits_degraded_event_on_failure():
def test_on_app_created_emits_correct_event(mock_redis):
mock_redis.get.return_value = None
mock_redis.set.return_value = True
envelope = TelemetryEnvelope(
case=TelemetryCase.APP_CREATED,
tenant_id="tenant-123",
@ -311,7 +312,7 @@ def test_on_app_created_emits_correct_event(mock_redis):
def test_on_app_updated_emits_correct_event(mock_redis):
mock_redis.get.return_value = None
mock_redis.set.return_value = True
envelope = TelemetryEnvelope(
case=TelemetryCase.APP_UPDATED,
tenant_id="tenant-123",
@ -343,7 +344,7 @@ def test_on_app_updated_emits_correct_event(mock_redis):
def test_on_app_deleted_emits_correct_event(mock_redis):
mock_redis.get.return_value = None
mock_redis.set.return_value = True
envelope = TelemetryEnvelope(
case=TelemetryCase.APP_DELETED,
tenant_id="tenant-123",
@ -375,7 +376,7 @@ def test_on_app_deleted_emits_correct_event(mock_redis):
def test_on_feedback_created_emits_correct_event(mock_redis):
mock_redis.get.return_value = None
mock_redis.set.return_value = True
envelope = TelemetryEnvelope(
case=TelemetryCase.FEEDBACK_CREATED,
tenant_id="tenant-123",
@ -418,7 +419,7 @@ def test_on_feedback_created_emits_correct_event(mock_redis):
def test_on_feedback_created_without_content(mock_redis):
mock_redis.get.return_value = None
mock_redis.set.return_value = True
envelope = TelemetryEnvelope(
case=TelemetryCase.FEEDBACK_CREATED,
tenant_id="tenant-123",