feat(telemetry): add invoked_by user tracking to enterprise OTEL

This commit is contained in:
GareArc 2026-02-03 01:23:15 -08:00
parent f627c7fdab
commit ae43fd96bf
No known key found for this signature in database
4 changed files with 32 additions and 0 deletions

View File

@ -52,6 +52,8 @@ class WorkflowTraceInfo(BaseTraceInfo):
query: str
metadata: dict[str, Any]
invoked_by: str | None = None
class MessageTraceInfo(BaseTraceInfo):
conversation_model: str
@ -151,6 +153,8 @@ class WorkflowNodeTraceInfo(BaseTraceInfo):
node_outputs: Mapping[str, Any] | None = None
process_data: Mapping[str, Any] | None = None
invoked_by: str | None = None
model_config = ConfigDict(protected_namespaces=())

View File

@ -520,6 +520,27 @@ class TraceTask:
cls._workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
return cls._workflow_run_repo
@classmethod
def _get_user_id_from_metadata(cls, metadata: dict[str, Any]) -> str:
"""Extract user ID from metadata, prioritizing end_user over account.
Returns the actual user ID (end_user or account) who invoked the workflow,
regardless of invoke_from context.
"""
# Priority 1: End user (external users via API/WebApp)
if user_id := metadata.get("from_end_user_id"):
return f"end_user:{user_id}"
# Priority 2: Account user (internal users via console/debugger)
if user_id := metadata.get("from_account_id"):
return f"account:{user_id}"
# Priority 3: User (internal users via console/debugger)
if user_id := metadata.get("user_id"):
return f"user:{user_id}"
return "anonymous"
def __init__(
self,
trace_type: Any,
@ -670,6 +691,7 @@ class TraceTask:
message_id=message_id,
start_time=workflow_run.created_at,
end_time=workflow_run.finished_at,
invoked_by=self._get_user_id_from_metadata(metadata),
)
return workflow_trace_info
@ -1081,6 +1103,7 @@ class TraceTask:
node_inputs=node_data.get("node_inputs"),
node_outputs=node_data.get("node_outputs"),
process_data=node_data.get("process_data"),
invoked_by=self._get_user_id_from_metadata(metadata),
)
def draft_node_execution_trace(self, **kwargs) -> DraftNodeExecutionTrace | dict:

View File

@ -110,6 +110,7 @@ class EnterpriseOtelTrace:
"dify.invoke_from": info.metadata.get("triggered_from"),
"dify.conversation.id": info.conversation_id,
"dify.message.id": info.message_id,
"dify.invoked_by": info.invoked_by,
}
trace_correlation_override: str | None = None
@ -230,6 +231,7 @@ class EnterpriseOtelTrace:
"dify.node.iteration_id": info.iteration_id,
"dify.node.loop_id": info.loop_id,
"dify.node.parallel_id": info.parallel_id,
"dify.node.invoked_by": info.invoked_by,
}
trace_correlation_override = trace_correlation_override_param

View File

@ -21,3 +21,6 @@ class DifySpanAttributes:
INVOKE_FROM = "dify.invoke_from"
"""Invocation source, e.g. SERVICE_API, WEB_APP, DEBUGGER."""
INVOKED_BY = "dify.invoked_by"
"""Invoked by, e.g. end_user, account, user."""