diff --git a/api/core/ops/arize_phoenix_trace/arize_phoenix_trace.py b/api/core/ops/arize_phoenix_trace/arize_phoenix_trace.py index 347992fa0d..a7b73e032e 100644 --- a/api/core/ops/arize_phoenix_trace/arize_phoenix_trace.py +++ b/api/core/ops/arize_phoenix_trace/arize_phoenix_trace.py @@ -6,7 +6,13 @@ from datetime import datetime, timedelta from typing import Any, Union, cast from urllib.parse import urlparse -from openinference.semconv.trace import OpenInferenceMimeTypeValues, OpenInferenceSpanKindValues, SpanAttributes +from openinference.semconv.trace import ( + MessageAttributes, + OpenInferenceMimeTypeValues, + OpenInferenceSpanKindValues, + SpanAttributes, + ToolCallAttributes, +) from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter as GrpcOTLPSpanExporter from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as HttpOTLPSpanExporter from opentelemetry.sdk import trace as trace_sdk @@ -95,14 +101,14 @@ def setup_tracer(arize_phoenix_config: ArizeConfig | PhoenixConfig) -> tuple[tra def datetime_to_nanos(dt: datetime | None) -> int: - """Convert datetime to nanoseconds since epoch. If None, use current time.""" + """Convert datetime to nanoseconds since epoch for Arize/Phoenix.""" if dt is None: dt = datetime.now() return int(dt.timestamp() * 1_000_000_000) def error_to_string(error: Exception | str | None) -> str: - """Convert an error to a string with traceback information.""" + """Convert an error to a string with traceback information for Arize/Phoenix.""" error_message = "Empty Stack Trace" if error: if isinstance(error, Exception): @@ -114,7 +120,7 @@ def error_to_string(error: Exception | str | None) -> str: def set_span_status(current_span: Span, error: Exception | str | None = None): - """Set the status of the current span based on the presence of an error.""" + """Set the status of the current span based on the presence of an error for Arize/Phoenix.""" if error: error_string = error_to_string(error) current_span.set_status(Status(StatusCode.ERROR, error_string)) @@ -138,10 +144,17 @@ def set_span_status(current_span: Span, error: Exception | str | None = None): def safe_json_dumps(obj: Any) -> str: - """A convenience wrapper around `json.dumps` that ensures that any object can be safely encoded.""" + """A convenience wrapper to ensure that any object can be safely encoded for Arize/Phoenix.""" return json.dumps(obj, default=str, ensure_ascii=False) +def wrap_span_metadata(metadata, **kwargs): + """Add common metatada to all trace entity types for Arize/Phoenix.""" + metadata["created_from"] = "Dify" + metadata.update(kwargs) + return metadata + + class ArizePhoenixDataTrace(BaseTraceInstance): def __init__( self, @@ -183,16 +196,27 @@ class ArizePhoenixDataTrace(BaseTraceInstance): raise def workflow_trace(self, trace_info: WorkflowTraceInfo): - workflow_metadata = { - "workflow_run_id": trace_info.workflow_run_id or "", - "message_id": trace_info.message_id or "", - "workflow_app_log_id": trace_info.workflow_app_log_id or "", - "status": trace_info.workflow_run_status or "", - "status_message": trace_info.error or "", - "level": "ERROR" if trace_info.error else "DEFAULT", - "total_tokens": trace_info.total_tokens or 0, - } - workflow_metadata.update(trace_info.metadata) + file_list = trace_info.file_list if isinstance(trace_info.file_list, list) else [] + + metadata = wrap_span_metadata( + trace_info.metadata, + trace_id=trace_info.trace_id or "", + message_id=trace_info.message_id or "", + status=trace_info.workflow_run_status or "", + status_message=trace_info.error or "", + level="ERROR" if trace_info.error else "DEFAULT", + trace_entity_type="workflow", + conversation_id=trace_info.conversation_id or "", + workflow_app_log_id=trace_info.workflow_app_log_id or "", + workflow_id=trace_info.workflow_id or "", + tenant_id=trace_info.tenant_id or "", + workflow_run_id=trace_info.workflow_run_id or "", + workflow_run_elapsed_time=trace_info.workflow_run_elapsed_time or 0, + workflow_run_version=trace_info.workflow_run_version or "", + total_tokens=trace_info.total_tokens or 0, + file_list=safe_json_dumps(file_list), + query=trace_info.query or "", + ) dify_trace_id = trace_info.trace_id or trace_info.message_id or trace_info.workflow_run_id self.ensure_root_span(dify_trace_id) @@ -201,10 +225,12 @@ class ArizePhoenixDataTrace(BaseTraceInstance): workflow_span = self.tracer.start_span( name=TraceTaskName.WORKFLOW_TRACE.value, attributes={ - SpanAttributes.INPUT_VALUE: json.dumps(trace_info.workflow_run_inputs, ensure_ascii=False), - SpanAttributes.OUTPUT_VALUE: json.dumps(trace_info.workflow_run_outputs, ensure_ascii=False), SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.CHAIN.value, - SpanAttributes.METADATA: json.dumps(workflow_metadata, ensure_ascii=False), + SpanAttributes.INPUT_VALUE: safe_json_dumps(trace_info.workflow_run_inputs), + SpanAttributes.INPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, + SpanAttributes.OUTPUT_VALUE: safe_json_dumps(trace_info.workflow_run_outputs), + SpanAttributes.OUTPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, + SpanAttributes.METADATA: safe_json_dumps(metadata), SpanAttributes.SESSION_ID: trace_info.conversation_id or "", }, start_time=datetime_to_nanos(trace_info.start_time), @@ -257,6 +283,7 @@ class ArizePhoenixDataTrace(BaseTraceInstance): "app_id": app_id, "app_name": node_execution.title, "status": node_execution.status, + "status_message": node_execution.error or "", "level": "ERROR" if node_execution.status == "failed" else "DEFAULT", } ) @@ -290,11 +317,11 @@ class ArizePhoenixDataTrace(BaseTraceInstance): node_span = self.tracer.start_span( name=node_execution.node_type, attributes={ + SpanAttributes.OPENINFERENCE_SPAN_KIND: span_kind.value, SpanAttributes.INPUT_VALUE: safe_json_dumps(inputs_value), SpanAttributes.INPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, SpanAttributes.OUTPUT_VALUE: safe_json_dumps(outputs_value), SpanAttributes.OUTPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, - SpanAttributes.OPENINFERENCE_SPAN_KIND: span_kind.value, SpanAttributes.METADATA: safe_json_dumps(node_metadata), SpanAttributes.SESSION_ID: trace_info.conversation_id or "", }, @@ -339,30 +366,37 @@ class ArizePhoenixDataTrace(BaseTraceInstance): def message_trace(self, trace_info: MessageTraceInfo): if trace_info.message_data is None: + logger.warning("[Arize/Phoenix] Message data is None, skipping message trace.") return - file_list = cast(list[str], trace_info.file_list) or [] + file_list = trace_info.file_list if isinstance(trace_info.file_list, list) else [] message_file_data: MessageFile | None = trace_info.message_file_data if message_file_data is not None: file_url = f"{self.file_base_url}/{message_file_data.url}" if message_file_data else "" file_list.append(file_url) - message_metadata = { - "message_id": trace_info.message_id or "", - "conversation_mode": str(trace_info.conversation_mode or ""), - "user_id": trace_info.message_data.from_account_id or "", - "file_list": json.dumps(file_list), - "status": trace_info.message_data.status or "", - "status_message": trace_info.error or "", - "level": "ERROR" if trace_info.error else "DEFAULT", - "total_tokens": trace_info.total_tokens or 0, - "prompt_tokens": trace_info.message_tokens or 0, - "completion_tokens": trace_info.answer_tokens or 0, - "ls_provider": trace_info.message_data.model_provider or "", - "ls_model_name": trace_info.message_data.model_id or "", - } - message_metadata.update(trace_info.metadata) + metadata = wrap_span_metadata( + trace_info.metadata, + trace_id=trace_info.trace_id or "", + message_id=trace_info.message_id or "", + status=trace_info.message_data.status or "", + status_message=trace_info.error or "", + level="ERROR" if trace_info.error else "DEFAULT", + trace_entity_type="message", + conversation_model=trace_info.conversation_model or "", + message_tokens=trace_info.message_tokens or 0, + answer_tokens=trace_info.answer_tokens or 0, + total_tokens=trace_info.total_tokens or 0, + conversation_mode=trace_info.conversation_mode or "", + gen_ai_server_time_to_first_token=trace_info.gen_ai_server_time_to_first_token or 0, + llm_streaming_time_to_generate=trace_info.llm_streaming_time_to_generate or 0, + is_streaming_request=trace_info.is_streaming_request or False, + user_id=trace_info.message_data.from_account_id or "", + file_list=safe_json_dumps(file_list), + model_provider=trace_info.message_data.model_provider or "", + model_id=trace_info.message_data.model_id or "", + ) # Add end user data if available if trace_info.message_data.from_end_user_id: @@ -370,14 +404,16 @@ class ArizePhoenixDataTrace(BaseTraceInstance): db.session.query(EndUser).where(EndUser.id == trace_info.message_data.from_end_user_id).first() ) if end_user_data is not None: - message_metadata["end_user_id"] = end_user_data.session_id + metadata["end_user_id"] = end_user_data.session_id attributes = { - SpanAttributes.INPUT_VALUE: trace_info.message_data.query, - SpanAttributes.OUTPUT_VALUE: trace_info.message_data.answer, SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.CHAIN.value, - SpanAttributes.METADATA: json.dumps(message_metadata, ensure_ascii=False), - SpanAttributes.SESSION_ID: trace_info.message_data.conversation_id, + SpanAttributes.INPUT_VALUE: trace_info.message_data.query, + SpanAttributes.INPUT_MIME_TYPE: OpenInferenceMimeTypeValues.TEXT.value, + SpanAttributes.OUTPUT_VALUE: trace_info.message_data.answer, + SpanAttributes.OUTPUT_MIME_TYPE: OpenInferenceMimeTypeValues.TEXT.value, + SpanAttributes.METADATA: safe_json_dumps(metadata), + SpanAttributes.SESSION_ID: trace_info.message_data.conversation_id or "", } dify_trace_id = trace_info.trace_id or trace_info.message_id @@ -393,8 +429,10 @@ class ArizePhoenixDataTrace(BaseTraceInstance): try: # Convert outputs to string based on type + outputs_mime_type = OpenInferenceMimeTypeValues.TEXT.value if isinstance(trace_info.outputs, dict | list): - outputs_str = json.dumps(trace_info.outputs, ensure_ascii=False) + outputs_str = safe_json_dumps(trace_info.outputs) + outputs_mime_type = OpenInferenceMimeTypeValues.JSON.value elif isinstance(trace_info.outputs, str): outputs_str = trace_info.outputs else: @@ -402,10 +440,12 @@ class ArizePhoenixDataTrace(BaseTraceInstance): llm_attributes = { SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.LLM.value, - SpanAttributes.INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False), + SpanAttributes.INPUT_VALUE: safe_json_dumps(trace_info.inputs), + SpanAttributes.INPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, SpanAttributes.OUTPUT_VALUE: outputs_str, - SpanAttributes.METADATA: json.dumps(message_metadata, ensure_ascii=False), - SpanAttributes.SESSION_ID: trace_info.message_data.conversation_id, + SpanAttributes.OUTPUT_MIME_TYPE: outputs_mime_type, + SpanAttributes.METADATA: safe_json_dumps(metadata), + SpanAttributes.SESSION_ID: trace_info.message_data.conversation_id or "", } llm_attributes.update(self._construct_llm_attributes(trace_info.inputs)) if trace_info.total_tokens is not None and trace_info.total_tokens > 0: @@ -449,16 +489,20 @@ class ArizePhoenixDataTrace(BaseTraceInstance): def moderation_trace(self, trace_info: ModerationTraceInfo): if trace_info.message_data is None: + logger.warning("[Arize/Phoenix] Message data is None, skipping moderation trace.") return - metadata = { - "message_id": trace_info.message_id, - "tool_name": "moderation", - "status": trace_info.message_data.status, - "status_message": trace_info.message_data.error or "", - "level": "ERROR" if trace_info.message_data.error else "DEFAULT", - } - metadata.update(trace_info.metadata) + metadata = wrap_span_metadata( + trace_info.metadata, + trace_id=trace_info.trace_id or "", + message_id=trace_info.message_id or "", + status=trace_info.message_data.status or "", + status_message=trace_info.message_data.error or "", + level="ERROR" if trace_info.message_data.error else "DEFAULT", + trace_entity_type="moderation", + model_provider=trace_info.message_data.model_provider or "", + model_id=trace_info.message_data.model_id or "", + ) dify_trace_id = trace_info.trace_id or trace_info.message_id self.ensure_root_span(dify_trace_id) @@ -467,18 +511,19 @@ class ArizePhoenixDataTrace(BaseTraceInstance): span = self.tracer.start_span( name=TraceTaskName.MODERATION_TRACE.value, attributes={ - SpanAttributes.INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False), - SpanAttributes.OUTPUT_VALUE: json.dumps( + SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.TOOL.value, + SpanAttributes.INPUT_VALUE: safe_json_dumps(trace_info.inputs), + SpanAttributes.INPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, + SpanAttributes.OUTPUT_VALUE: safe_json_dumps( { - "action": trace_info.action, "flagged": trace_info.flagged, + "action": trace_info.action, "preset_response": trace_info.preset_response, - "inputs": trace_info.inputs, - }, - ensure_ascii=False, + "query": trace_info.query, + } ), - SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.CHAIN.value, - SpanAttributes.METADATA: json.dumps(metadata, ensure_ascii=False), + SpanAttributes.OUTPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, + SpanAttributes.METADATA: safe_json_dumps(metadata), }, start_time=datetime_to_nanos(trace_info.start_time), context=root_span_context, @@ -494,22 +539,28 @@ class ArizePhoenixDataTrace(BaseTraceInstance): def suggested_question_trace(self, trace_info: SuggestedQuestionTraceInfo): if trace_info.message_data is None: + logger.warning("[Arize/Phoenix] Message data is None, skipping suggested question trace.") return start_time = trace_info.start_time or trace_info.message_data.created_at end_time = trace_info.end_time or trace_info.message_data.updated_at - metadata = { - "message_id": trace_info.message_id, - "tool_name": "suggested_question", - "status": trace_info.status, - "status_message": trace_info.error or "", - "level": "ERROR" if trace_info.error else "DEFAULT", - "total_tokens": trace_info.total_tokens, - "ls_provider": trace_info.model_provider or "", - "ls_model_name": trace_info.model_id or "", - } - metadata.update(trace_info.metadata) + metadata = wrap_span_metadata( + trace_info.metadata, + trace_id=trace_info.trace_id or "", + message_id=trace_info.message_id or "", + status=trace_info.status or "", + status_message=trace_info.status_message or "", + level=trace_info.level or "", + trace_entity_type="suggested_question", + total_tokens=trace_info.total_tokens or 0, + from_account_id=trace_info.from_account_id or "", + agent_based=trace_info.agent_based or False, + from_source=trace_info.from_source or "", + model_provider=trace_info.model_provider or "", + model_id=trace_info.model_id or "", + workflow_run_id=trace_info.workflow_run_id or "", + ) dify_trace_id = trace_info.trace_id or trace_info.message_id self.ensure_root_span(dify_trace_id) @@ -518,10 +569,12 @@ class ArizePhoenixDataTrace(BaseTraceInstance): span = self.tracer.start_span( name=TraceTaskName.SUGGESTED_QUESTION_TRACE.value, attributes={ - SpanAttributes.INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False), - SpanAttributes.OUTPUT_VALUE: json.dumps(trace_info.suggested_question, ensure_ascii=False), - SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.CHAIN.value, - SpanAttributes.METADATA: json.dumps(metadata, ensure_ascii=False), + SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.TOOL.value, + SpanAttributes.INPUT_VALUE: safe_json_dumps(trace_info.inputs), + SpanAttributes.INPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, + SpanAttributes.OUTPUT_VALUE: safe_json_dumps(trace_info.suggested_question), + SpanAttributes.OUTPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, + SpanAttributes.METADATA: safe_json_dumps(metadata), }, start_time=datetime_to_nanos(start_time), context=root_span_context, @@ -537,21 +590,23 @@ class ArizePhoenixDataTrace(BaseTraceInstance): def dataset_retrieval_trace(self, trace_info: DatasetRetrievalTraceInfo): if trace_info.message_data is None: + logger.warning("[Arize/Phoenix] Message data is None, skipping dataset retrieval trace.") return start_time = trace_info.start_time or trace_info.message_data.created_at end_time = trace_info.end_time or trace_info.message_data.updated_at - metadata = { - "message_id": trace_info.message_id, - "tool_name": "dataset_retrieval", - "status": trace_info.message_data.status, - "status_message": trace_info.message_data.error or "", - "level": "ERROR" if trace_info.message_data.error else "DEFAULT", - "ls_provider": trace_info.message_data.model_provider or "", - "ls_model_name": trace_info.message_data.model_id or "", - } - metadata.update(trace_info.metadata) + metadata = wrap_span_metadata( + trace_info.metadata, + trace_id=trace_info.trace_id or "", + message_id=trace_info.message_id or "", + status=trace_info.message_data.status or "", + status_message=trace_info.error or "", + level="ERROR" if trace_info.error else "DEFAULT", + trace_entity_type="dataset_retrieval", + model_provider=trace_info.message_data.model_provider or "", + model_id=trace_info.message_data.model_id or "", + ) dify_trace_id = trace_info.trace_id or trace_info.message_id self.ensure_root_span(dify_trace_id) @@ -560,20 +615,20 @@ class ArizePhoenixDataTrace(BaseTraceInstance): span = self.tracer.start_span( name=TraceTaskName.DATASET_RETRIEVAL_TRACE.value, attributes={ - SpanAttributes.INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False), - SpanAttributes.OUTPUT_VALUE: json.dumps({"documents": trace_info.documents}, ensure_ascii=False), SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.RETRIEVER.value, - SpanAttributes.METADATA: json.dumps(metadata, ensure_ascii=False), - "start_time": start_time.isoformat() if start_time else "", - "end_time": end_time.isoformat() if end_time else "", + SpanAttributes.INPUT_VALUE: safe_json_dumps(trace_info.inputs), + SpanAttributes.INPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, + SpanAttributes.OUTPUT_VALUE: safe_json_dumps({"documents": trace_info.documents}), + SpanAttributes.OUTPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, + SpanAttributes.METADATA: safe_json_dumps(metadata), }, start_time=datetime_to_nanos(start_time), context=root_span_context, ) try: - if trace_info.message_data.error: - set_span_status(span, trace_info.message_data.error) + if trace_info.error: + set_span_status(span, trace_info.error) else: set_span_status(span) finally: @@ -584,30 +639,34 @@ class ArizePhoenixDataTrace(BaseTraceInstance): logger.warning("[Arize/Phoenix] Message data is None, skipping tool trace.") return - metadata = { - "message_id": trace_info.message_id, - "tool_config": json.dumps(trace_info.tool_config, ensure_ascii=False), - } + metadata = wrap_span_metadata( + trace_info.metadata, + trace_id=trace_info.trace_id or "", + message_id=trace_info.message_id or "", + status=trace_info.message_data.status or "", + status_message=trace_info.error or "", + level="ERROR" if trace_info.error else "DEFAULT", + trace_entity_type="tool", + tool_config=safe_json_dumps(trace_info.tool_config), + time_cost=trace_info.time_cost or 0, + file_url=trace_info.file_url or "", + ) dify_trace_id = trace_info.trace_id or trace_info.message_id self.ensure_root_span(dify_trace_id) root_span_context = self.propagator.extract(carrier=self.carrier) - tool_params_str = ( - json.dumps(trace_info.tool_parameters, ensure_ascii=False) - if isinstance(trace_info.tool_parameters, dict) - else str(trace_info.tool_parameters) - ) - span = self.tracer.start_span( name=trace_info.tool_name, attributes={ - SpanAttributes.INPUT_VALUE: json.dumps(trace_info.tool_inputs, ensure_ascii=False), - SpanAttributes.OUTPUT_VALUE: trace_info.tool_outputs, SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.TOOL.value, - SpanAttributes.METADATA: json.dumps(metadata, ensure_ascii=False), + SpanAttributes.INPUT_VALUE: safe_json_dumps(trace_info.tool_inputs), + SpanAttributes.INPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, + SpanAttributes.OUTPUT_VALUE: trace_info.tool_outputs, + SpanAttributes.OUTPUT_MIME_TYPE: OpenInferenceMimeTypeValues.TEXT.value, + SpanAttributes.METADATA: safe_json_dumps(metadata), SpanAttributes.TOOL_NAME: trace_info.tool_name, - SpanAttributes.TOOL_PARAMETERS: tool_params_str, + SpanAttributes.TOOL_PARAMETERS: safe_json_dumps(trace_info.tool_parameters), }, start_time=datetime_to_nanos(trace_info.start_time), context=root_span_context, @@ -623,16 +682,22 @@ class ArizePhoenixDataTrace(BaseTraceInstance): def generate_name_trace(self, trace_info: GenerateNameTraceInfo): if trace_info.message_data is None: + logger.warning("[Arize/Phoenix] Message data is None, skipping generate name trace.") return - metadata = { - "project_name": self.project, - "message_id": trace_info.message_id, - "status": trace_info.message_data.status, - "status_message": trace_info.message_data.error or "", - "level": "ERROR" if trace_info.message_data.error else "DEFAULT", - } - metadata.update(trace_info.metadata) + metadata = wrap_span_metadata( + trace_info.metadata, + trace_id=trace_info.trace_id or "", + message_id=trace_info.message_id or "", + status=trace_info.message_data.status or "", + status_message=trace_info.message_data.error or "", + level="ERROR" if trace_info.message_data.error else "DEFAULT", + trace_entity_type="generate_name", + model_provider=trace_info.message_data.model_provider or "", + model_id=trace_info.message_data.model_id or "", + conversation_id=trace_info.conversation_id or "", + tenant_id=trace_info.tenant_id, + ) dify_trace_id = trace_info.trace_id or trace_info.message_id or trace_info.conversation_id self.ensure_root_span(dify_trace_id) @@ -641,13 +706,13 @@ class ArizePhoenixDataTrace(BaseTraceInstance): span = self.tracer.start_span( name=TraceTaskName.GENERATE_NAME_TRACE.value, attributes={ - SpanAttributes.INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False), - SpanAttributes.OUTPUT_VALUE: json.dumps(trace_info.outputs, ensure_ascii=False), SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.CHAIN.value, - SpanAttributes.METADATA: json.dumps(metadata, ensure_ascii=False), - SpanAttributes.SESSION_ID: trace_info.message_data.conversation_id, - "start_time": trace_info.start_time.isoformat() if trace_info.start_time else "", - "end_time": trace_info.end_time.isoformat() if trace_info.end_time else "", + SpanAttributes.INPUT_VALUE: safe_json_dumps(trace_info.inputs), + SpanAttributes.INPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, + SpanAttributes.OUTPUT_VALUE: safe_json_dumps(trace_info.outputs), + SpanAttributes.OUTPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, + SpanAttributes.METADATA: safe_json_dumps(metadata), + SpanAttributes.SESSION_ID: trace_info.conversation_id or "", }, start_time=datetime_to_nanos(trace_info.start_time), context=root_span_context, @@ -688,32 +753,85 @@ class ArizePhoenixDataTrace(BaseTraceInstance): raise ValueError(f"[Arize/Phoenix] API check failed: {str(e)}") def get_project_url(self): + """Build a redirect URL that forwards the user to the correct project for Arize/Phoenix.""" try: - if self.arize_phoenix_config.endpoint == "https://otlp.arize.com": - return "https://app.arize.com/" - else: - return f"{self.arize_phoenix_config.endpoint}/projects/" + project_name = self.arize_phoenix_config.project + endpoint = self.arize_phoenix_config.endpoint.rstrip("/") + + # Arize + if isinstance(self.arize_phoenix_config, ArizeConfig): + return f"https://app.arize.com/?redirect_project_name={project_name}" + + # Phoenix + return f"{endpoint}/projects/?redirect_project_name={project_name}" + except Exception as e: - logger.info("[Arize/Phoenix] Get run url failed: %s", str(e), exc_info=True) - raise ValueError(f"[Arize/Phoenix] Get run url failed: {str(e)}") + logger.info("[Arize/Phoenix] Failed to construct project URL: %s", str(e), exc_info=True) + raise ValueError(f"[Arize/Phoenix] Failed to construct project URL: {str(e)}") def _construct_llm_attributes(self, prompts: dict | list | str | None) -> dict[str, str]: - """Helper method to construct LLM attributes with passed prompts.""" - attributes = {} + """Construct LLM attributes with passed prompts for Arize/Phoenix.""" + attributes: dict[str, str] = {} + + def set_attribute(path: str, value: object) -> None: + """Store an attribute safely as a string.""" + if value is None: + return + try: + if isinstance(value, (dict, list)): + value = safe_json_dumps(value) + attributes[path] = str(value) + except Exception: + attributes[path] = str(value) + + def set_message_attribute(message_index: int, key: str, value: object) -> None: + path = f"{SpanAttributes.LLM_INPUT_MESSAGES}.{message_index}.{key}" + set_attribute(path, value) + + def set_tool_call_attributes(message_index: int, tool_index: int, tool_call: dict | object | None) -> None: + """Extract and assign tool call details safely.""" + if not tool_call: + return + + def safe_get(obj, key, default=None): + if isinstance(obj, dict): + return obj.get(key, default) + return getattr(obj, key, default) + + function_obj = safe_get(tool_call, "function", {}) + function_name = safe_get(function_obj, "name", "") + function_args = safe_get(function_obj, "arguments", {}) + call_id = safe_get(tool_call, "id", "") + + base_path = ( + f"{SpanAttributes.LLM_INPUT_MESSAGES}." + f"{message_index}.{MessageAttributes.MESSAGE_TOOL_CALLS}.{tool_index}" + ) + + set_attribute(f"{base_path}.{ToolCallAttributes.TOOL_CALL_FUNCTION_NAME}", function_name) + set_attribute(f"{base_path}.{ToolCallAttributes.TOOL_CALL_FUNCTION_ARGUMENTS_JSON}", function_args) + set_attribute(f"{base_path}.{ToolCallAttributes.TOOL_CALL_ID}", call_id) + + # Handle list of messages if isinstance(prompts, list): - for i, msg in enumerate(prompts): - if isinstance(msg, dict): - attributes[f"{SpanAttributes.LLM_INPUT_MESSAGES}.{i}.message.content"] = msg.get("text", "") - attributes[f"{SpanAttributes.LLM_INPUT_MESSAGES}.{i}.message.role"] = msg.get("role", "user") - # todo: handle assistant and tool role messages, as they don't always - # have a text field, but may have a tool_calls field instead - # e.g. 'tool_calls': [{'id': '98af3a29-b066-45a5-b4b1-46c74ddafc58', - # 'type': 'function', 'function': {'name': 'current_time', 'arguments': '{}'}}]} - elif isinstance(prompts, dict): - attributes[f"{SpanAttributes.LLM_INPUT_MESSAGES}.0.message.content"] = json.dumps(prompts) - attributes[f"{SpanAttributes.LLM_INPUT_MESSAGES}.0.message.role"] = "user" - elif isinstance(prompts, str): - attributes[f"{SpanAttributes.LLM_INPUT_MESSAGES}.0.message.content"] = prompts - attributes[f"{SpanAttributes.LLM_INPUT_MESSAGES}.0.message.role"] = "user" + for message_index, message in enumerate(prompts): + if not isinstance(message, dict): + continue + + role = message.get("role", "user") + content = message.get("text") or message.get("content") or "" + + set_message_attribute(message_index, MessageAttributes.MESSAGE_ROLE, role) + set_message_attribute(message_index, MessageAttributes.MESSAGE_CONTENT, content) + + tool_calls = message.get("tool_calls") or [] + if isinstance(tool_calls, list): + for tool_index, tool_call in enumerate(tool_calls): + set_tool_call_attributes(message_index, tool_index, tool_call) + + # Handle single dict or plain string prompt + elif isinstance(prompts, (dict, str)): + set_message_attribute(0, MessageAttributes.MESSAGE_CONTENT, prompts) + set_message_attribute(0, MessageAttributes.MESSAGE_ROLE, "user") return attributes