dify/api/tasks/ops_trace_task.py
"""
Celery task for asynchronous ops trace dispatch.
Trace providers may report explicitly retryable dispatch failures through the
core retryable exception contract. The task preserves the payload file only
when Celery accepts the retry request; successful dispatches and terminal
failures clean up the stored payload.
One concrete producer today is Phoenix nested workflow tracing. The outer
workflow tool span publishes a restorable parent span context asynchronously,
while the nested workflow trace may be picked up by Celery first. In that
ordering window, the provider raises a retryable core exception instead of
dropping the trace or emitting it under the wrong parent. The task intentionally
does not know that the provider is Phoenix; it only honors the core retryable
dispatch contract.
"""
import json
import logging

from celery import shared_task
from celery.exceptions import Retry
from flask import current_app

from configs import dify_config
from core.ops.entities.config_entity import OPS_FILE_PATH, OPS_TRACE_FAILED_KEY
from core.ops.entities.trace_entity import trace_info_info_map
from core.ops.exceptions import RetryableTraceDispatchError
from core.rag.models.document import Document
from extensions.ext_redis import redis_client
from extensions.ext_storage import storage
from models.model import Message
from models.workflow import WorkflowRun

logger = logging.getLogger(__name__)

_RETRYABLE_TRACE_DISPATCH_LIMIT = dify_config.OPS_TRACE_RETRYABLE_DISPATCH_MAX_RETRIES
_RETRYABLE_TRACE_DISPATCH_DELAY_SECONDS = dify_config.OPS_TRACE_RETRYABLE_DISPATCH_DELAY_SECONDS
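
# Both retry knobs are read from `dify_config`; in Dify, settings like these
# are normally supplied through environment variables of the same name.
# Example values (illustrative only, not verified defaults):
#
#     OPS_TRACE_RETRYABLE_DISPATCH_MAX_RETRIES=3
#     OPS_TRACE_RETRYABLE_DISPATCH_DELAY_SECONDS=5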


@shared_task(
    queue="ops_trace",
    bind=True,
    max_retries=_RETRYABLE_TRACE_DISPATCH_LIMIT,
    default_retry_delay=_RETRYABLE_TRACE_DISPATCH_DELAY_SECONDS,
)
def process_trace_tasks(self, file_info):
"""
Async process trace tasks
Usage: process_trace_tasks.delay(tasks_data)
"""
    from core.ops.ops_trace_manager import OpsTraceManager

    app_id = file_info.get("app_id")
    file_id = file_info.get("file_id")
    file_path = f"{OPS_FILE_PATH}{app_id}/{file_id}.json"
    file_data = json.loads(storage.load(file_path))
    # Work on a shallow copy so `file_data` stays JSON-serializable even after
    # the model objects are rehydrated below; the retry path re-saves it.
    trace_info = dict(file_data.get("trace_info") or {})
    trace_info_type = file_data.get("trace_info_type")
    enterprise_trace_dispatched = bool(file_data.get("_enterprise_trace_dispatched"))
    trace_instance = OpsTraceManager.get_ops_trace_instance(app_id)

    # Rehydrate model objects that were serialized into the JSON payload.
    if trace_info.get("message_data"):
        trace_info["message_data"] = Message.from_dict(data=trace_info["message_data"])
    if trace_info.get("workflow_data"):
        trace_info["workflow_data"] = WorkflowRun.from_dict(data=trace_info["workflow_data"])
    if trace_info.get("documents"):
        trace_info["documents"] = [Document.model_validate(doc) for doc in trace_info["documents"]]

    should_delete_file = True
    try:
        trace_type = trace_info_info_map.get(trace_info_type)
        if trace_type:
            trace_info = trace_type(**trace_info)

        from extensions.ext_enterprise_telemetry import is_enabled as is_ee_telemetry_enabled

        if is_ee_telemetry_enabled() and not enterprise_trace_dispatched:
            from enterprise.telemetry.enterprise_trace import EnterpriseOtelTrace

            try:
                EnterpriseOtelTrace().trace(trace_info)
            except Exception:
                logger.exception("Enterprise trace failed for app_id: %s", app_id)
            else:
                # Record success so a later retry of provider dispatch does not
                # emit the same enterprise trace twice.
                file_data["_enterprise_trace_dispatched"] = True
                enterprise_trace_dispatched = True

        if trace_instance:
            with current_app.app_context():
                trace_instance.trace(trace_info)
        logger.info("Processing trace tasks succeeded, app_id: %s", app_id)
    except RetryableTraceDispatchError as e:
        # Retryable dispatch failures represent a transient provider-side
        # ordering gap, not corrupt payload data. Keep the payload only after
        # Celery accepts the retry request; otherwise this attempt becomes a
        # terminal failure and the stored file is cleaned up in `finally`.
        #
        # Enterprise telemetry runs before provider dispatch. If it already ran
        # and provider dispatch asks for a retry, persist that private flag so
        # the next attempt does not emit the same enterprise trace twice.
        if self.request.retries >= _RETRYABLE_TRACE_DISPATCH_LIMIT:
            logger.exception("Retryable trace dispatch budget exhausted, app_id: %s", app_id)
            failed_key = f"{OPS_TRACE_FAILED_KEY}_{app_id}"
            redis_client.incr(failed_key)
        else:
            logger.warning(
                "Retryable trace dispatch failure, scheduling retry %s/%s for app_id %s: %s",
                self.request.retries + 1,
                _RETRYABLE_TRACE_DISPATCH_LIMIT,
                app_id,
                e,
            )
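            # `Task.retry()` itself raises `celery.exceptions.Retry` once the
            # reschedule is accepted; catching it below is how this attempt
            # learns that the payload file must survive for the next run.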
            try:
                if enterprise_trace_dispatched:
                    storage.save(file_path, json.dumps(file_data).encode("utf-8"))
                raise self.retry(exc=e, countdown=_RETRYABLE_TRACE_DISPATCH_DELAY_SECONDS)
            except Retry:
                should_delete_file = False
                raise
            except Exception:
                logger.exception("Failed to schedule trace dispatch retry, app_id: %s", app_id)
                failed_key = f"{OPS_TRACE_FAILED_KEY}_{app_id}"
                redis_client.incr(failed_key)
    except Exception:
        logger.exception("Processing trace tasks failed, app_id: %s", app_id)
        failed_key = f"{OPS_TRACE_FAILED_KEY}_{app_id}"
        redis_client.incr(failed_key)
    finally:
        if should_delete_file:
            try:
                storage.delete(file_path)
            except Exception as e:
                logger.warning(
                    "Failed to delete trace file %s for app_id %s: %s",
                    file_path,
                    app_id,
                    e,
                )
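
# Illustrative caller-side sketch (assumed; the key names come from this
# module, while the payload contents and trace type name are placeholders).
# The producer persists the full payload first and enqueues only a lightweight
# pointer, keeping the broker message small:
#
#     payload = {"trace_info": {...}, "trace_info_type": "<trace type name>"}
#     storage.save(f"{OPS_FILE_PATH}{app_id}/{file_id}.json", json.dumps(payload).encode("utf-8"))
#     process_trace_tasks.delay({"app_id": app_id, "file_id": file_id})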