From 2126a5a11b1899d98fd98560169d30a2c3f81777 Mon Sep 17 00:00:00 2001 From: xuwei95 <1013104194@qq.com> Date: Mon, 13 Apr 2026 10:18:44 +0800 Subject: [PATCH] feat: try to see the hierarchy in phonenix trace rather than all single events try to see the hierarchy in phonenix trace rather than all single events --- .../arize_phoenix_trace.py | 2470 +++++++++++++---- 1 file changed, 1990 insertions(+), 480 deletions(-) diff --git a/api/providers/trace/trace-arize-phoenix/src/dify_trace_arize_phoenix/arize_phoenix_trace.py b/api/providers/trace/trace-arize-phoenix/src/dify_trace_arize_phoenix/arize_phoenix_trace.py index 96df49ed0e..7fb23482bf 100644 --- a/api/providers/trace/trace-arize-phoenix/src/dify_trace_arize_phoenix/arize_phoenix_trace.py +++ b/api/providers/trace/trace-arize-phoenix/src/dify_trace_arize_phoenix/arize_phoenix_trace.py @@ -1,7 +1,7 @@ +import hashlib import json import logging import os -import traceback from datetime import datetime, timedelta from typing import Any, Union, cast from urllib.parse import urlparse @@ -13,16 +13,15 @@ from openinference.semconv.trace import ( SpanAttributes, ToolCallAttributes, ) +from opentelemetry import trace from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter as GrpcOTLPSpanExporter from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter as HttpOTLPSpanExporter from opentelemetry.sdk import trace as trace_sdk from opentelemetry.sdk.resources import Resource from opentelemetry.sdk.trace.export import SimpleSpanProcessor -from opentelemetry.semconv.attributes import exception_attributes -from opentelemetry.trace import Span, Status, StatusCode, set_span_in_context, use_span -from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator -from opentelemetry.util.types import AttributeValue -from sqlalchemy.orm import sessionmaker +from opentelemetry.sdk.trace.id_generator import RandomIdGenerator +from opentelemetry.trace import SpanContext, TraceFlags, TraceState +from sqlalchemy import select, text from core.ops.base_trace_instance import BaseTraceInstance from core.ops.entities.trace_entity import ( @@ -42,7 +41,7 @@ from dify_trace_arize_phoenix.config import ArizeConfig, PhoenixConfig from extensions.ext_database import db from graphon.enums import WorkflowNodeExecutionStatus from models.model import EndUser, MessageFile -from models.workflow import WorkflowNodeExecutionTriggeredFrom +from models.workflow import WorkflowNodeExecutionModel logger = logging.getLogger(__name__) @@ -76,11 +75,22 @@ def setup_tracer(arize_phoenix_config: ArizeConfig | PhoenixConfig) -> tuple[tra "api_key": arize_phoenix_config.api_key or "", "authorization": f"Bearer {arize_phoenix_config.api_key or ''}", } - exporter = HttpOTLPSpanExporter( - endpoint=phoenix_endpoint, - headers=phoenix_headers, - timeout=30, - ) + # Test connectivity first + try: + # Create a test exporter with short timeout + test_exporter = HttpOTLPSpanExporter( + endpoint=phoenix_endpoint, + headers=phoenix_headers, + timeout=5, + ) + # Try to export an empty batch to test connectivity + test_exporter.export([]) + logger.info("[Arize/Phoenix] Connectivity test successful") + test_exporter.timeout = 30 + exporter = test_exporter + except Exception as connectivity_error: + logger.warning("[Arize/Phoenix] Connectivity test failed: %s, using shorter timeout", str(connectivity_error)) + raise attributes = { "openinference.project.name": arize_phoenix_config.project or "", @@ -103,78 +113,46 @@ def setup_tracer(arize_phoenix_config: ArizeConfig | PhoenixConfig) -> tuple[tra def datetime_to_nanos(dt: datetime | None) -> int: - """Convert datetime to nanoseconds since epoch for Arize/Phoenix.""" + """Convert datetime to nanoseconds since epoch. If None, use current time.""" if dt is None: dt = datetime.now() return int(dt.timestamp() * 1_000_000_000) -def error_to_string(error: Exception | str | None) -> str: - """Convert an error to a string with traceback information for Arize/Phoenix.""" - error_message = "Empty Stack Trace" - if error: - if isinstance(error, Exception): - string_stacktrace = "".join(traceback.format_exception(error)) - error_message = f"{error.__class__.__name__}: {error}\n\n{string_stacktrace}" - else: - error_message = str(error) - return error_message - - -def set_span_status(current_span: Span, error: Exception | str | None = None): - """Set the status of the current span based on the presence of an error for Arize/Phoenix.""" - if error: - error_string = error_to_string(error) - current_span.set_status(Status(StatusCode.ERROR, error_string)) - - if isinstance(error, Exception): - current_span.record_exception(error) - else: - exception_type = error.__class__.__name__ - exception_message = str(error) - if not exception_message: - exception_message = repr(error) - attributes: dict[str, AttributeValue] = { - exception_attributes.EXCEPTION_TYPE: exception_type, - exception_attributes.EXCEPTION_MESSAGE: exception_message, - exception_attributes.EXCEPTION_ESCAPED: False, - exception_attributes.EXCEPTION_STACKTRACE: error_string, - } - current_span.add_event(name="exception", attributes=attributes) - else: - current_span.set_status(Status(StatusCode.OK)) - - -def safe_json_dumps(obj: Any) -> str: - """A convenience wrapper to ensure that any object can be safely encoded for Arize/Phoenix.""" - return json.dumps(obj, default=str, ensure_ascii=False) - - -def wrap_span_metadata(metadata, **kwargs): - """Add common metatada to all trace entity types for Arize/Phoenix.""" - metadata["created_from"] = "Dify" - metadata.update(kwargs) - return metadata - - -# Mapping from built-in node type strings to OpenInference span kinds. -# Node types not listed here default to CHAIN. -_NODE_TYPE_TO_SPAN_KIND: dict[str, OpenInferenceSpanKindValues] = { - "llm": OpenInferenceSpanKindValues.LLM, - "knowledge-retrieval": OpenInferenceSpanKindValues.RETRIEVER, - "tool": OpenInferenceSpanKindValues.TOOL, - "agent": OpenInferenceSpanKindValues.AGENT, -} - - -def _get_node_span_kind(node_type: str) -> OpenInferenceSpanKindValues: - """Return the OpenInference span kind for a given workflow node type. - - Covers every built-in node type string. Nodes that do not have a - specialised span kind (e.g. ``start``, ``end``, ``if-else``, - ``code``, ``loop``, ``iteration``, etc.) are mapped to ``CHAIN``. +def string_to_trace_id128(string: str | None) -> int: """ - return _NODE_TYPE_TO_SPAN_KIND.get(node_type, OpenInferenceSpanKindValues.CHAIN) + Convert any input string into a stable 128-bit integer trace ID. + + This uses SHA-256 hashing and takes the first 16 bytes (128 bits) of the digest. + It's suitable for generating consistent, unique identifiers from strings. + """ + if string is None: + string = "" + hash_object = hashlib.sha256(string.encode()) + + # Take the first 16 bytes (128 bits) of the hash digest + digest = hash_object.digest()[:16] + + # Convert to a 128-bit integer + return int.from_bytes(digest, byteorder="big") + + +def string_to_span_id64(string: str | None) -> int: + """ + Convert any input string into a stable 64-bit integer span ID. + + This uses SHA-256 hashing and takes the first 8 bytes (64 bits) of the digest. + Generates consistent span IDs from workflow_run_id or node identifiers. + """ + if string is None: + string = "" + hash_object = hashlib.sha256(string.encode()) + + # Take the first 8 bytes (64 bits) of the hash digest + digest = hash_object.digest()[:8] + + # Convert to a 64-bit integer + return int.from_bytes(digest, byteorder="big") class ArizePhoenixDataTrace(BaseTraceInstance): @@ -183,16 +161,17 @@ class ArizePhoenixDataTrace(BaseTraceInstance): arize_phoenix_config: ArizeConfig | PhoenixConfig, ): super().__init__(arize_phoenix_config) + import logging + + logging.basicConfig() + logging.getLogger().setLevel(logging.DEBUG) self.arize_phoenix_config = arize_phoenix_config self.tracer, self.processor = setup_tracer(arize_phoenix_config) self.project = arize_phoenix_config.project self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001") - self.propagator = TraceContextTextMapPropagator() - self.dify_trace_ids: set[str] = set() def trace(self, trace_info: BaseTraceInfo): - logger.info("[Arize/Phoenix] Trace Entity Info: %s", trace_info) - logger.info("[Arize/Phoenix] Trace Entity Type: %s", type(trace_info)) + logger.info("[Arize/Phoenix] Trace: %s", trace_info) try: if isinstance(trace_info, WorkflowTraceInfo): self.workflow_trace(trace_info) @@ -210,238 +189,584 @@ class ArizePhoenixDataTrace(BaseTraceInstance): self.generate_name_trace(trace_info) except Exception as e: - logger.error("[Arize/Phoenix] Trace Entity Error: %s", str(e), exc_info=True) - raise + # Check if it's a connectivity issue + if "ConnectTimeout" in str(e) or "Connection" in str(e) or "timeout" in str(e).lower(): + logger.warning("[Arize/Phoenix] Phoenix server connectivity issue, skipping trace: %s", str(e)) + return # Skip the trace instead of raising + else: + logger.error("[Arize/Phoenix] Error in the trace: %s", str(e), exc_info=True) + raise def workflow_trace(self, trace_info: WorkflowTraceInfo): - file_list = trace_info.file_list if isinstance(trace_info.file_list, list) else [] + # Get app info for enhanced metadata + app_info = self._get_app_info_from_workflow_run_id(trace_info.workflow_run_id or "") - metadata = wrap_span_metadata( - trace_info.metadata, - trace_id=trace_info.trace_id or "", - message_id=trace_info.message_id or "", - status=trace_info.workflow_run_status or "", - status_message=trace_info.error or "", - level="ERROR" if trace_info.error else "DEFAULT", - trace_entity_type="workflow", - conversation_id=trace_info.conversation_id or "", - workflow_app_log_id=trace_info.workflow_app_log_id or "", - workflow_id=trace_info.workflow_id or "", - tenant_id=trace_info.tenant_id or "", - workflow_run_id=trace_info.workflow_run_id or "", - workflow_run_elapsed_time=trace_info.workflow_run_elapsed_time or 0, - workflow_run_version=trace_info.workflow_run_version or "", - total_tokens=trace_info.total_tokens or 0, - file_list=safe_json_dumps(file_list), - query=trace_info.query or "", + workflow_metadata = { + "workflow_run_id": trace_info.workflow_run_id or "", + "message_id": trace_info.message_id or "", + "workflow_app_log_id": trace_info.workflow_app_log_id or "", + "app_id": app_info["app_id"], + "app_name": app_info["app_name"], + "status": trace_info.workflow_run_status or "", + "status_message": trace_info.error or "", + "level": "ERROR" if trace_info.error else "DEFAULT", + "total_tokens": trace_info.total_tokens or 0, + } + workflow_metadata.update(trace_info.metadata) + + # Check if this is a child workflow called as a tool + parent_trace_context = self._get_parent_workflow_context(trace_info) + + if parent_trace_context: + # Child workflow nests under parent tool in same trace - FOLLOW EXECUTION FLOW + trace_id = parent_trace_context['trace_id'] # Use parent trace ID for single flow + parent_span_id = parent_trace_context['parent_span_id'] + + workflow_metadata["nested_workflow"] = "true" + workflow_metadata["parent_trace_id"] = hex(parent_trace_context['trace_id']) + workflow_metadata["parent_tool_span_id"] = hex(parent_trace_context['parent_span_id']) + + logger.info("[Arize/Phoenix] Child workflow nesting under parent tool in single execution flow") + else: + # Create new trace for root workflow - this becomes the main execution flow + trace_id = string_to_trace_id128(trace_info.trace_id or trace_info.workflow_run_id) + parent_span_id = None + workflow_metadata["nested_workflow"] = "false" + + # Generate consistent workflow span ID from workflow_run_id + span_id = string_to_span_id64(trace_info.workflow_run_id) + context = SpanContext( + trace_id=trace_id, + span_id=span_id, + is_remote=False, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + trace_state=TraceState(), ) - dify_trace_id = trace_info.trace_id or trace_info.message_id or trace_info.workflow_run_id - self.ensure_root_span(dify_trace_id) - root_span_context = self.propagator.extract(carrier=self.carrier) + # Create descriptive workflow span name based on execution flow + app_name_clean = app_info["app_name"].replace(" ", "_").replace("-", "_")[:20] + workflow_id_short = trace_info.workflow_run_id[:8] if trace_info.workflow_run_id else "unknown" + if parent_trace_context: + # Child workflow nested under parent tool - use descriptive nested name + parent_tool_name = parent_trace_context.get('workflow_tool_name', 'UnknownTool') + if parent_tool_name and parent_tool_name not in ['workflow', 'unknown', 'UnknownTool', '']: + parent_tool_clean = parent_tool_name.replace(' ', '_').replace('-', '_')[:15] + workflow_span_name = "nested_{}_{}_{}".format( + app_name_clean, + parent_tool_clean, + workflow_id_short + ) + else: + workflow_span_name = "nested_{}_{}".format(app_name_clean, workflow_id_short) + else: + # Root workflow - this is the main execution flow + workflow_span_name = "{}_{}".format(app_name_clean, workflow_id_short) + + logger.info("[Arize/Phoenix] Creating workflow span: {} ({}workflow)".format( + workflow_span_name, "nested " if parent_trace_context else "main ")) + + # Create workflow span attributes with nested context + import json as json_module # Avoid any local variable conflicts + workflow_attributes = { + SpanAttributes.INPUT_VALUE: json_module.dumps(trace_info.workflow_run_inputs, ensure_ascii=False), + SpanAttributes.OUTPUT_VALUE: json_module.dumps(trace_info.workflow_run_outputs, ensure_ascii=False), + SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.CHAIN.value, + SpanAttributes.METADATA: json_module.dumps(workflow_metadata, ensure_ascii=False), + SpanAttributes.SESSION_ID: trace_info.conversation_id or trace_info.workflow_id or "", + } + + # Set up proper parent context for nesting child workflows under parent tools + if parent_trace_context: + # Child workflow nests under parent tool span - create proper parent context + parent_span_context = SpanContext( + trace_id=parent_trace_context['trace_id'], + span_id=parent_trace_context['parent_span_id'], # Parent TOOL span + is_remote=False, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + trace_state=TraceState(), + ) + workflow_context_parent = trace.set_span_in_context(trace.NonRecordingSpan(parent_span_context)) + + logger.info("[Arize/Phoenix] Child workflow will nest under parent tool span: {}".format( + hex(parent_trace_context['parent_span_id']))) + else: + # Root workflow uses its own context + workflow_context_parent = trace.set_span_in_context(trace.NonRecordingSpan(context)) + + # Use with statement to properly set span context hierarchy workflow_span = self.tracer.start_span( - name=TraceTaskName.WORKFLOW_TRACE.value, - attributes={ - SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.CHAIN.value, - SpanAttributes.INPUT_VALUE: safe_json_dumps(trace_info.workflow_run_inputs), - SpanAttributes.INPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, - SpanAttributes.OUTPUT_VALUE: safe_json_dumps(trace_info.workflow_run_outputs), - SpanAttributes.OUTPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, - SpanAttributes.METADATA: safe_json_dumps(metadata), - SpanAttributes.SESSION_ID: trace_info.conversation_id or "", - }, + name=workflow_span_name, + attributes=workflow_attributes, start_time=datetime_to_nanos(trace_info.start_time), - context=root_span_context, + context=workflow_context_parent, ) - # Through workflow_run_id, get all_nodes_execution using repository - session_factory = sessionmaker(bind=db.engine) - - # Find the app's creator account - app_id = trace_info.metadata.get("app_id") - if not app_id: - raise ValueError("No app_id found in trace_info metadata") - - service_account = self.get_service_account_with_tenant(app_id) - - workflow_node_execution_repository = DifyCoreRepositoryFactory.create_workflow_node_execution_repository( - session_factory=session_factory, - user=service_account, - app_id=app_id, - triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN, - ) - - # Get all executions for this workflow run - workflow_node_executions = workflow_node_execution_repository.get_by_workflow_execution( - workflow_execution_id=trace_info.workflow_run_id - ) + # Set workflow span as current context for child spans + workflow_context = trace.set_span_in_context(workflow_span) try: - for node_execution in workflow_node_executions: - tenant_id = trace_info.tenant_id # Use from trace_info instead - app_id = trace_info.metadata.get("app_id") # Use from trace_info instead - inputs_value = node_execution.inputs or {} - outputs_value = node_execution.outputs or {} + # Process workflow nodes + nodes = self._get_workflow_nodes(trace_info.workflow_run_id) + logger.info("[Arize/Phoenix] Processing {} workflow nodes".format(len(nodes))) - created_at = node_execution.created_at or datetime.now() - elapsed_time = node_execution.elapsed_time - finished_at = created_at + timedelta(seconds=elapsed_time) + # Get the original workflow graph to understand proper relationships + workflow_graph = self._get_workflow_graph(trace_info.workflow_run_id) - process_data = node_execution.process_data or {} - execution_metadata = node_execution.metadata or {} - node_metadata = {str(k): v for k, v in execution_metadata.items()} + # Build comprehensive hierarchy using both graph and execution data + if workflow_graph: + hierarchy_map, node_mapping, execution_context = self._build_comprehensive_hierarchy(workflow_graph, nodes) + else: + hierarchy_map, node_mapping, execution_context = {}, {}, {} - node_metadata.update( - { + # Store created spans by node_id for hierarchy building + node_spans = {} + + # Enhanced node sorting: First by status (succeeded/failed first), then by index, then by created_at + # This ensures completed nodes are processed before failed/pending ones + def sort_nodes_key(node): + # Priority: succeeded > running > failed > other + status_priority = { + 'succeeded': 0, + 'running': 1, + 'failed': 2, + 'stopped': 3 + } + status_val = status_priority.get(getattr(node, 'status', 'other'), 4) + index_val = getattr(node, 'index', 999) # Large number for nodes without index + created_time = getattr(node, 'created_at', datetime.now()) + + return (status_val, index_val, created_time) + + sorted_nodes = sorted(nodes, key=sort_nodes_key) + + # Debug: Log node processing order + logger.info("[Arize/Phoenix] Node processing order:") + for i, node in enumerate(sorted_nodes[:10]): # Log first 10 + logger.info(" {}: {} (type={}, status={}, index={})".format( + i+1, node.id[:8], node.node_type, getattr(node, 'status', 'unknown'), + getattr(node, 'index', 'none'))) + + for node_execution in sorted_nodes: + try: + logger.debug("[Arize/Phoenix] Processing node {} of type {}".format(node_execution.id, node_execution.node_type)) + created_at = node_execution.created_at or datetime.now() + elapsed_time = node_execution.elapsed_time or 0.0 + finished_at = created_at + timedelta(seconds=elapsed_time) + + try: + process_data = json.loads(node_execution.process_data) if node_execution.process_data else {} + except (json.JSONDecodeError, TypeError): + process_data = {} + logger.warning("[Arize/Phoenix] Invalid process_data JSON for node {}".format(node_execution.id)) + + # Enhanced metadata with execution context and decision paths + node_metadata = { "node_id": node_execution.id, + "graph_node_id": getattr(node_execution, 'node_id', node_execution.id), "node_type": node_execution.node_type, "node_status": node_execution.status, - "tenant_id": tenant_id, - "app_id": app_id, + "tenant_id": node_execution.tenant_id, + "app_id": node_execution.app_id, "app_name": node_execution.title, "status": node_execution.status, - "status_message": node_execution.error or "", - "level": "ERROR" if node_execution.status == WorkflowNodeExecutionStatus.FAILED else "DEFAULT", + "level": "ERROR" if node_execution.status != "succeeded" else "DEFAULT", + "node_index": getattr(node_execution, 'index', 0), + "predecessor_node_id": getattr(node_execution, 'predecessor_node_id', None), + "execution_order": getattr(node_execution, 'index', 0), } - ) - # Determine the correct span kind based on node type - span_kind = _get_node_span_kind(node_execution.node_type) - if node_execution.node_type == "llm": - provider = process_data.get("model_provider") - model = process_data.get("model_name") - if provider: - node_metadata["ls_provider"] = provider - if model: - node_metadata["ls_model_name"] = model + # Add decision path information for classifiers and if/else nodes + if node_execution.node_type in ['question-classifier', 'if-else']: + decision_output = self._extract_decision_output(node_execution) + if decision_output: + if node_execution.node_type == 'question-classifier': + node_metadata["decision_class_id"] = decision_output.get('class_id', '') + node_metadata["decision_class_name"] = decision_output.get('class_name', '') + node_metadata["decision_usage"] = decision_output.get('usage', {}) + node_metadata["decision_output"] = decision_output - usage_data = ( - process_data.get("usage", {}) if "usage" in process_data else outputs_value.get("usage", {}) - ) - if usage_data: - node_metadata["total_tokens"] = usage_data.get("total_tokens", 0) - node_metadata["prompt_tokens"] = usage_data.get("prompt_tokens", 0) - node_metadata["completion_tokens"] = usage_data.get("completion_tokens", 0) + # Add loop context + if node_execution.node_type == 'loop': + node_metadata["loop_type"] = "main_loop" + node_metadata["contains_children"] = "true" - workflow_span_context = set_span_in_context(workflow_span) - node_span = self.tracer.start_span( - name=node_execution.node_type, - attributes={ - SpanAttributes.OPENINFERENCE_SPAN_KIND: span_kind.value, - SpanAttributes.INPUT_VALUE: safe_json_dumps(inputs_value), - SpanAttributes.INPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, - SpanAttributes.OUTPUT_VALUE: safe_json_dumps(outputs_value), - SpanAttributes.OUTPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, - SpanAttributes.METADATA: safe_json_dumps(node_metadata), - SpanAttributes.SESSION_ID: trace_info.conversation_id or "", - }, - start_time=datetime_to_nanos(created_at), - context=workflow_span_context, - ) + # Add tool context with child workflow detection + if node_execution.node_type == 'tool': + # Check if this tool triggers a child workflow + child_workflow_id = self._find_child_workflow_by_timing(node_execution) + if child_workflow_id: + node_metadata["child_workflow_run_id"] = child_workflow_id + node_metadata["nested_workflow"] = "true" + node_metadata["tool_triggers_workflow"] = "true" - try: + # Add input context for workflow tools + if node_execution.inputs: + try: + inputs = json.loads(node_execution.inputs) if isinstance(node_execution.inputs, str) else node_execution.inputs + if isinstance(inputs, dict) and 'topic' in inputs: + node_metadata["workflow_input_topic"] = inputs['topic'] + except: + pass + + if node_execution.execution_metadata: + try: + node_metadata.update(json.loads(node_execution.execution_metadata)) + except (json.JSONDecodeError, TypeError): + logger.warning("[Arize/Phoenix] Invalid execution_metadata JSON for node {}".format(node_execution.id)) + + # Determine the correct span kind based on node type + span_kind = OpenInferenceSpanKindValues.CHAIN.value if node_execution.node_type == "llm": - llm_attributes: dict[str, Any] = { - SpanAttributes.INPUT_VALUE: json.dumps(process_data.get("prompts", []), ensure_ascii=False), - } + span_kind = OpenInferenceSpanKindValues.LLM.value provider = process_data.get("model_provider") model = process_data.get("model_name") if provider: - llm_attributes[SpanAttributes.LLM_PROVIDER] = provider + node_metadata["ls_provider"] = provider if model: - llm_attributes[SpanAttributes.LLM_MODEL_NAME] = model - usage_data = ( - process_data.get("usage", {}) if "usage" in process_data else outputs_value.get("usage", {}) - ) + node_metadata["ls_model_name"] = model + + try: + outputs = json.loads(node_execution.outputs) if node_execution.outputs else {} + outputs_usage = outputs.get("usage", {}) + except (json.JSONDecodeError, TypeError): + outputs_usage = {} + logger.warning("[Arize/Phoenix] Invalid outputs JSON for node {}".format(node_execution.id)) + + usage_data = process_data.get("usage", {}) if "usage" in process_data else outputs_usage if usage_data: - llm_attributes[SpanAttributes.LLM_TOKEN_COUNT_TOTAL] = usage_data.get("total_tokens", 0) - llm_attributes[SpanAttributes.LLM_TOKEN_COUNT_PROMPT] = usage_data.get("prompt_tokens", 0) - llm_attributes[SpanAttributes.LLM_TOKEN_COUNT_COMPLETION] = usage_data.get( - "completion_tokens", 0 - ) - llm_attributes.update(self._construct_llm_attributes(process_data.get("prompts", []))) - node_span.set_attributes(llm_attributes) - finally: - if node_execution.status == WorkflowNodeExecutionStatus.FAILED: - set_span_status(node_span, node_execution.error) + node_metadata["total_tokens"] = usage_data.get("total_tokens", 0) + node_metadata["prompt_tokens"] = usage_data.get("prompt_tokens", 0) + node_metadata["completion_tokens"] = usage_data.get("completion_tokens", 0) + elif node_execution.node_type == "dataset_retrieval": + span_kind = OpenInferenceSpanKindValues.RETRIEVER.value + elif node_execution.node_type == "tool": + span_kind = OpenInferenceSpanKindValues.TOOL.value else: - set_span_status(node_span) - node_span.end(end_time=datetime_to_nanos(finished_at)) + span_kind = OpenInferenceSpanKindValues.CHAIN.value + + # Create consistent node span ID from workflow_run_id + node_id + node_span_id = string_to_span_id64(f"{trace_info.workflow_run_id}_{node_execution.id}") + node_span_context = SpanContext( + trace_id=trace_id, + span_id=node_span_id, + is_remote=False, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + trace_state=TraceState(), + ) + + # Use workflow graph to determine proper hierarchy + node_id = getattr(node_execution, 'node_id', node_execution.id) + node_type = node_execution.node_type + index_val = getattr(node_execution, 'index', 0) + + logger.info("[Arize/Phoenix] Node debug - id: {}, node_id: {}, type: {}, index: {}".format( + node_execution.id, node_id, node_type, index_val)) + + # Enhanced parent context determination with multi-pass processing + parent_context = workflow_context # Default to workflow as parent + parent_node_id = hierarchy_map.get(node_id) + + # Special handling for different node types + if node_execution.node_type == 'start': + # Start nodes are always direct children of workflow + parent_context = workflow_context + logger.info("[Arize/Phoenix] Node {} (start) is direct child of workflow".format(node_id)) + + elif node_execution.node_type == 'end': + # End nodes should be children of the last executed non-end node or workflow + last_non_end_span = None + for existing_node_id, span in node_spans.items(): + if existing_node_id != node_id: # Not self + last_non_end_span = span + + if last_non_end_span: + parent_context = trace.set_span_in_context(last_non_end_span) + logger.info("[Arize/Phoenix] Node {} (end) parented to last executed node".format(node_id)) + else: + parent_context = workflow_context + logger.info("[Arize/Phoenix] Node {} (end) is direct child of workflow".format(node_id)) + + elif parent_node_id and parent_node_id in node_spans: + # This node has a parent in the workflow graph that's already processed + parent_span = node_spans[parent_node_id] + parent_context = trace.set_span_in_context(parent_span) + logger.info("[Arize/Phoenix] Node {} ({}) is child of {} (from graph)".format( + node_id, node_type, parent_node_id)) + + elif node_execution.node_type in ['tool', 'llm', 'http-request']: + # For execution nodes, try to find logical parent from execution context + logical_parent = self._find_logical_parent_span(node_execution, node_spans, execution_context) + if logical_parent: + parent_context = trace.set_span_in_context(logical_parent) + logger.info("[Arize/Phoenix] Node {} ({}) found logical parent".format(node_id, node_type)) + else: + parent_context = workflow_context + logger.info("[Arize/Phoenix] Node {} ({}) using workflow as parent (no logical parent)".format( + node_id, node_type)) + + else: + # Default: use workflow as parent + parent_context = workflow_context + if parent_node_id: + logger.debug("[Arize/Phoenix] Node {} ({}) parent {} not yet processed, using workflow".format( + node_id, node_type, parent_node_id)) + else: + logger.debug("[Arize/Phoenix] Node {} ({}) is direct child of workflow (no parent in graph)".format( + node_id, node_type)) + + # Create descriptive span name using node_type and human-readable title + node_index = getattr(node_execution, 'index', 0) + node_title_clean = getattr(node_execution, 'title', '').replace(' ', '_').replace('-', '_')[:20] + + # Smart naming: avoid repetition when title equals type + if node_title_clean: + # Check if title is just the node_type (case-insensitive) + if node_title_clean.lower() == node_execution.node_type.lower(): + # Use just the node_type when title is redundant + base_name = node_execution.node_type + else: + # Use type_title when they're different + base_name = "{}_{}".format(node_execution.node_type, node_title_clean) + else: + # Fallback to ID if no title + base_name = "{}_{}".format(node_execution.node_type, node_execution.id[:8]) + + logger.debug("[Arize/Phoenix] Node naming: type='{}', title='{}' -> span='{}'".format( + node_execution.node_type, getattr(node_execution, 'title', ''), base_name)) + + # Add decision context to span name + if node_execution.node_type == 'question-classifier': + decision_output = self._extract_decision_output(node_execution) + if decision_output and decision_output.get('class_name'): + # Truncate class name for readability + class_name = decision_output['class_name'][:30].replace('\n', ' ') + span_name = "{}_classifier_[{}]".format(base_name, class_name) + else: + span_name = "{}_classifier".format(base_name) + elif node_execution.node_type == 'if-else': + span_name = "{}_condition".format(base_name) + elif node_execution.node_type == 'loop': + span_name = "{}_main_loop".format(base_name) + elif node_execution.node_type == 'llm': + # Add model info to LLM spans + model_name = process_data.get('model_name', '').replace('claude-3-', 'c3-') + span_name = "{}_{}".format(base_name, model_name) + elif node_execution.node_type == 'workflow': + # Enhanced naming for workflow tool calls + workflow_tool_name = self._get_workflow_tool_name(node_execution) + if workflow_tool_name: + span_name = "{}_tool_[{}]".format(base_name, workflow_tool_name[:20]) + else: + span_name = "{}_workflow_tool".format(base_name) + elif node_execution.node_type == 'tool': + # Check if this tool triggers a child workflow (via timing) + child_workflow_id = self._find_child_workflow_by_timing(node_execution) + if child_workflow_id: + # ENHANCED NAMING SCHEME: Flexible based on tool name availability + tool_name = self._get_tool_name(node_execution) # Don't default to "UnknownTool" + + # Get child workflow app name for better linking + child_app_info = self._get_app_info_from_workflow_run_id(child_workflow_id) + child_workflow_name = child_app_info.get("app_name", "UnknownWorkflow").replace(' ', '_').replace('-', '_')[:15] + child_workflow_id_short = child_workflow_id[:8] + + if tool_name and tool_name not in ['workflow', 'unknown', 'UnknownTool']: + # Format: tool_{toolname}_{workflowname}_{corresponding_subworkflow_runid} + tool_name_clean = tool_name.replace(' ', '_').replace('-', '_')[:15] + span_name = "tool_{}_{}_{}_{}".format( + base_name.replace('tool_', ''), # Remove prefix to avoid tool_tool_ + tool_name_clean, + child_workflow_name, + child_workflow_id_short + ) + logger.info("[Arize/Phoenix] Tool (with name) triggers child workflow - naming: {} -> child: {}".format( + span_name, child_workflow_id[:8])) + else: + # Skip tool_name if unknown, use: tool_{workflowname}_{corresponding_subworkflow_runid} + span_name = "tool_{}_{}_{}".format( + base_name.replace('tool_', ''), # Remove prefix + child_workflow_name, + child_workflow_id_short + ) + logger.info("[Arize/Phoenix] Tool (no name) triggers child workflow - naming: {} -> child: {}".format( + span_name, child_workflow_id[:8])) + else: + # Regular tool + tool_name = self._get_tool_name(node_execution) + if tool_name: + span_name = "{}_[{}]".format(base_name, tool_name[:20]) + else: + span_name = "{}_tool".format(base_name) + elif node_execution.node_type == 'http-request': + # Enhanced naming for API calls + api_info = self._get_api_info(node_execution) + if api_info and api_info.get('method') and api_info.get('url'): + method = api_info['method'] + url_part = api_info['url'].split('/')[-1][:15] if '/' in api_info['url'] else api_info['url'][:15] + span_name = "{}_{}[{}]".format(base_name, method, url_part) + else: + span_name = "{}_api_call".format(base_name) + else: + span_name = base_name + + logger.info("[Arize/Phoenix] Creating node span: {}".format(span_name)) + + # Create span with proper parent context + node_span = self.tracer.start_span( + name=span_name, + attributes={ + SpanAttributes.INPUT_VALUE: node_execution.inputs or "{}", + SpanAttributes.OUTPUT_VALUE: node_execution.outputs or "{}", + SpanAttributes.OPENINFERENCE_SPAN_KIND: span_kind, + SpanAttributes.METADATA: json.dumps(node_metadata, ensure_ascii=False), + SpanAttributes.SESSION_ID: trace_info.conversation_id or "", + }, + start_time=datetime_to_nanos(created_at), + context=parent_context, # Use determined parent context + ) + + # Store the span for potential use as parent by successor nodes + node_id = getattr(node_execution, 'node_id', node_execution.id) + node_spans[node_id] = node_span + + # For tool nodes that trigger child workflows, prepare for direct nesting + if node_execution.node_type == 'tool': + child_workflow_id = self._find_child_workflow_by_timing(node_execution) + if child_workflow_id: + # Mark tool as triggering a child workflow - child will nest under this tool + node_metadata["triggers_child_workflow"] = "true" + node_metadata["child_workflow_run_id"] = child_workflow_id + + logger.info("[Arize/Phoenix] Tool {} will have child workflow {} nested under it".format( + node_execution.id[:8], child_workflow_id[:8])) + + + try: + if node_execution.node_type == "llm": + llm_attributes: dict[str, Any] = { + SpanAttributes.INPUT_VALUE: json.dumps(process_data.get("prompts", []), ensure_ascii=False), + } + provider = process_data.get("model_provider") + model = process_data.get("model_name") + if provider: + llm_attributes[SpanAttributes.LLM_PROVIDER] = provider + if model: + llm_attributes[SpanAttributes.LLM_MODEL_NAME] = model + + try: + llm_outputs = json.loads(node_execution.outputs) if node_execution.outputs else {} + llm_outputs_usage = llm_outputs.get("usage", {}) + except (json.JSONDecodeError, TypeError): + llm_outputs_usage = {} + logger.warning("[Arize/Phoenix] Invalid LLM outputs JSON for node {}".format(node_execution.id)) + + usage_data = ( + process_data.get("usage", {}) if "usage" in process_data else llm_outputs_usage + ) + if usage_data: + llm_attributes[SpanAttributes.LLM_TOKEN_COUNT_TOTAL] = usage_data.get("total_tokens", 0) + llm_attributes[SpanAttributes.LLM_TOKEN_COUNT_PROMPT] = usage_data.get("prompt_tokens", 0) + llm_attributes[SpanAttributes.LLM_TOKEN_COUNT_COMPLETION] = usage_data.get( + "completion_tokens", 0 + ) + llm_attributes.update(self._construct_llm_attributes(process_data.get("prompts", []))) + node_span.set_attributes(llm_attributes) + finally: + node_span.end(end_time=datetime_to_nanos(finished_at)) + + except AttributeError as e: + logger.error(f"[Arize/Phoenix] Node data access error for {getattr(node_execution, 'id', 'unknown')}: {e}") + continue + except (json.JSONDecodeError, TypeError) as e: + logger.error(f"[Arize/Phoenix] JSON parsing error for node {getattr(node_execution, 'id', 'unknown')}: {e}") + continue + except Exception as e: + logger.error(f"[Arize/Phoenix] Unexpected error processing node {getattr(node_execution, 'id', 'unknown')}: {e}") + continue + + logger.info("[Arize/Phoenix] Completed workflow trace with {} nodes".format(len(nodes))) + + except Exception as e: + logger.error(f"[Arize/Phoenix] Workflow tracing failed: {e}", exc_info=True) + raise ValueError(f"[Arize/Phoenix] Workflow trace failed: {str(e)}") finally: - if trace_info.error: - set_span_status(workflow_span, trace_info.error) - else: - set_span_status(workflow_span) workflow_span.end(end_time=datetime_to_nanos(trace_info.end_time)) def message_trace(self, trace_info: MessageTraceInfo): if trace_info.message_data is None: - logger.warning("[Arize/Phoenix] Message data is None, skipping message trace.") return - file_list = trace_info.file_list if isinstance(trace_info.file_list, list) else [] + file_list = cast(list[str], trace_info.file_list) or [] message_file_data: MessageFile | None = trace_info.message_file_data if message_file_data is not None: file_url = f"{self.file_base_url}/{message_file_data.url}" if message_file_data else "" file_list.append(file_url) - metadata = wrap_span_metadata( - trace_info.metadata, - trace_id=trace_info.trace_id or "", - message_id=trace_info.message_id or "", - status=trace_info.message_data.status or "", - status_message=trace_info.error or "", - level="ERROR" if trace_info.error else "DEFAULT", - trace_entity_type="message", - conversation_model=trace_info.conversation_model or "", - message_tokens=trace_info.message_tokens or 0, - answer_tokens=trace_info.answer_tokens or 0, - total_tokens=trace_info.total_tokens or 0, - conversation_mode=trace_info.conversation_mode or "", - gen_ai_server_time_to_first_token=trace_info.gen_ai_server_time_to_first_token or 0, - llm_streaming_time_to_generate=trace_info.llm_streaming_time_to_generate or 0, - is_streaming_request=trace_info.is_streaming_request or False, - user_id=trace_info.message_data.from_account_id or "", - file_list=safe_json_dumps(file_list), - model_provider=trace_info.message_data.model_provider or "", - model_id=trace_info.message_data.model_id or "", - ) + message_metadata = { + "message_id": trace_info.message_id or "", + "conversation_mode": str(trace_info.conversation_mode or ""), + "user_id": trace_info.message_data.from_account_id or "", + "file_list": json.dumps(file_list), + "status": trace_info.message_data.status or "", + "status_message": trace_info.error or "", + "level": "ERROR" if trace_info.error else "DEFAULT", + "total_tokens": trace_info.total_tokens or 0, + "prompt_tokens": trace_info.message_tokens or 0, + "completion_tokens": trace_info.answer_tokens or 0, + "ls_provider": trace_info.message_data.model_provider or "", + "ls_model_name": trace_info.message_data.model_id or "", + } + message_metadata.update(trace_info.metadata) # Add end user data if available if trace_info.message_data.from_end_user_id: - end_user_data: EndUser | None = db.session.get(EndUser, trace_info.message_data.from_end_user_id) + end_user_data: EndUser | None = ( + db.session.query(EndUser).where(EndUser.id == trace_info.message_data.from_end_user_id).first() + ) if end_user_data is not None: - metadata["end_user_id"] = end_user_data.session_id + message_metadata["end_user_id"] = end_user_data.session_id attributes = { - SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.CHAIN.value, SpanAttributes.INPUT_VALUE: trace_info.message_data.query, - SpanAttributes.INPUT_MIME_TYPE: OpenInferenceMimeTypeValues.TEXT.value, SpanAttributes.OUTPUT_VALUE: trace_info.message_data.answer, - SpanAttributes.OUTPUT_MIME_TYPE: OpenInferenceMimeTypeValues.TEXT.value, - SpanAttributes.METADATA: safe_json_dumps(metadata), - SpanAttributes.SESSION_ID: trace_info.message_data.conversation_id or "", + SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.CHAIN.value, + SpanAttributes.METADATA: json.dumps(message_metadata, ensure_ascii=False), + SpanAttributes.SESSION_ID: trace_info.message_data.conversation_id, } - dify_trace_id = trace_info.trace_id or trace_info.message_id - self.ensure_root_span(dify_trace_id) - root_span_context = self.propagator.extract(carrier=self.carrier) + trace_id = string_to_trace_id128(trace_info.trace_id or trace_info.message_id) + message_span_id = string_to_span_id64(trace_info.message_id) + span_context = SpanContext( + trace_id=trace_id, + span_id=message_span_id, + is_remote=False, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + trace_state=TraceState(), + ) message_span = self.tracer.start_span( name=TraceTaskName.MESSAGE_TRACE.value, attributes=attributes, start_time=datetime_to_nanos(trace_info.start_time), - context=root_span_context, + context=trace.set_span_in_context(trace.NonRecordingSpan(span_context)), ) try: + if trace_info.error: + message_span.add_event( + "exception", + attributes={ + "exception.message": trace_info.error, + "exception.type": "Error", + "exception.stacktrace": trace_info.error, + }, + ) + # Convert outputs to string based on type - outputs_mime_type = OpenInferenceMimeTypeValues.TEXT.value if isinstance(trace_info.outputs, dict | list): - outputs_str = safe_json_dumps(trace_info.outputs) - outputs_mime_type = OpenInferenceMimeTypeValues.JSON.value + outputs_str = json.dumps(trace_info.outputs, ensure_ascii=False) elif isinstance(trace_info.outputs, str): outputs_str = trace_info.outputs else: @@ -449,12 +774,10 @@ class ArizePhoenixDataTrace(BaseTraceInstance): llm_attributes = { SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.LLM.value, - SpanAttributes.INPUT_VALUE: safe_json_dumps(trace_info.inputs), - SpanAttributes.INPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, + SpanAttributes.INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False), SpanAttributes.OUTPUT_VALUE: outputs_str, - SpanAttributes.OUTPUT_MIME_TYPE: outputs_mime_type, - SpanAttributes.METADATA: safe_json_dumps(metadata), - SpanAttributes.SESSION_ID: trace_info.message_data.conversation_id or "", + SpanAttributes.METADATA: json.dumps(message_metadata, ensure_ascii=False), + SpanAttributes.SESSION_ID: trace_info.message_data.conversation_id, } llm_attributes.update(self._construct_llm_attributes(trace_info.inputs)) if trace_info.total_tokens is not None and trace_info.total_tokens > 0: @@ -470,176 +793,195 @@ class ArizePhoenixDataTrace(BaseTraceInstance): llm_attributes[SpanAttributes.LLM_PROVIDER] = trace_info.message_data.model_provider if trace_info.message_data and trace_info.message_data.message_metadata: - metadata_dict = JSON_DICT_ADAPTER.validate_json(trace_info.message_data.message_metadata) - if model_params := metadata_dict.get("model_parameters"): + metadata_dict = json.loads(trace_info.message_data.message_metadata) + model_params = metadata_dict.get("model_parameters") + if model_params: llm_attributes[SpanAttributes.LLM_INVOCATION_PARAMETERS] = json.dumps(model_params) - message_span_context = set_span_in_context(message_span) llm_span = self.tracer.start_span( name="llm", attributes=llm_attributes, start_time=datetime_to_nanos(trace_info.start_time), - context=message_span_context, + context=trace.set_span_in_context(trace.NonRecordingSpan(span_context)), ) try: - if trace_info.message_data.error: - set_span_status(llm_span, trace_info.message_data.error) - else: - set_span_status(llm_span) + if trace_info.error: + llm_span.add_event( + "exception", + attributes={ + "exception.message": trace_info.error, + "exception.type": "Error", + "exception.stacktrace": trace_info.error, + }, + ) finally: llm_span.end(end_time=datetime_to_nanos(trace_info.end_time)) finally: - if trace_info.error: - set_span_status(message_span, trace_info.error) - else: - set_span_status(message_span) message_span.end(end_time=datetime_to_nanos(trace_info.end_time)) def moderation_trace(self, trace_info: ModerationTraceInfo): if trace_info.message_data is None: - logger.warning("[Arize/Phoenix] Message data is None, skipping moderation trace.") return - metadata = wrap_span_metadata( - trace_info.metadata, - trace_id=trace_info.trace_id or "", - message_id=trace_info.message_id or "", - status=trace_info.message_data.status or "", - status_message=trace_info.message_data.error or "", - level="ERROR" if trace_info.message_data.error else "DEFAULT", - trace_entity_type="moderation", - model_provider=trace_info.message_data.model_provider or "", - model_id=trace_info.message_data.model_id or "", - ) + metadata = { + "message_id": trace_info.message_id, + "tool_name": "moderation", + "status": trace_info.message_data.status, + "status_message": trace_info.message_data.error or "", + "level": "ERROR" if trace_info.message_data.error else "DEFAULT", + } + metadata.update(trace_info.metadata) - dify_trace_id = trace_info.trace_id or trace_info.message_id - self.ensure_root_span(dify_trace_id) - root_span_context = self.propagator.extract(carrier=self.carrier) + trace_id = string_to_trace_id128(trace_info.message_id) + span_id = string_to_span_id64(trace_info.message_id) + context = SpanContext( + trace_id=trace_id, + span_id=span_id, + is_remote=False, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + trace_state=TraceState(), + ) span = self.tracer.start_span( name=TraceTaskName.MODERATION_TRACE.value, attributes={ - SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.TOOL.value, - SpanAttributes.INPUT_VALUE: safe_json_dumps(trace_info.inputs), - SpanAttributes.INPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, - SpanAttributes.OUTPUT_VALUE: safe_json_dumps( + SpanAttributes.INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False), + SpanAttributes.OUTPUT_VALUE: json.dumps( { - "flagged": trace_info.flagged, "action": trace_info.action, + "flagged": trace_info.flagged, "preset_response": trace_info.preset_response, - "query": trace_info.query, - } + "inputs": trace_info.inputs, + }, + ensure_ascii=False, ), - SpanAttributes.OUTPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, - SpanAttributes.METADATA: safe_json_dumps(metadata), + SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.CHAIN.value, + SpanAttributes.METADATA: json.dumps(metadata, ensure_ascii=False), }, start_time=datetime_to_nanos(trace_info.start_time), - context=root_span_context, + context=trace.set_span_in_context(trace.NonRecordingSpan(context)), ) try: if trace_info.message_data.error: - set_span_status(span, trace_info.message_data.error) - else: - set_span_status(span) + span.add_event( + "exception", + attributes={ + "exception.message": trace_info.message_data.error, + "exception.type": "Error", + "exception.stacktrace": trace_info.message_data.error, + }, + ) finally: span.end(end_time=datetime_to_nanos(trace_info.end_time)) def suggested_question_trace(self, trace_info: SuggestedQuestionTraceInfo): if trace_info.message_data is None: - logger.warning("[Arize/Phoenix] Message data is None, skipping suggested question trace.") return start_time = trace_info.start_time or trace_info.message_data.created_at end_time = trace_info.end_time or trace_info.message_data.updated_at - metadata = wrap_span_metadata( - trace_info.metadata, - trace_id=trace_info.trace_id or "", - message_id=trace_info.message_id or "", - status=trace_info.status or "", - status_message=trace_info.status_message or "", - level=trace_info.level or "", - trace_entity_type="suggested_question", - total_tokens=trace_info.total_tokens or 0, - from_account_id=trace_info.from_account_id or "", - agent_based=trace_info.agent_based or False, - from_source=trace_info.from_source or "", - model_provider=trace_info.model_provider or "", - model_id=trace_info.model_id or "", - workflow_run_id=trace_info.workflow_run_id or "", - ) + metadata = { + "message_id": trace_info.message_id, + "tool_name": "suggested_question", + "status": trace_info.status, + "status_message": trace_info.error or "", + "level": "ERROR" if trace_info.error else "DEFAULT", + "total_tokens": trace_info.total_tokens, + "ls_provider": trace_info.model_provider or "", + "ls_model_name": trace_info.model_id or "", + } + metadata.update(trace_info.metadata) - dify_trace_id = trace_info.trace_id or trace_info.message_id - self.ensure_root_span(dify_trace_id) - root_span_context = self.propagator.extract(carrier=self.carrier) + trace_id = string_to_trace_id128(trace_info.message_id) + span_id = string_to_span_id64(trace_info.message_id) + context = SpanContext( + trace_id=trace_id, + span_id=span_id, + is_remote=False, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + trace_state=TraceState(), + ) span = self.tracer.start_span( name=TraceTaskName.SUGGESTED_QUESTION_TRACE.value, attributes={ - SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.TOOL.value, - SpanAttributes.INPUT_VALUE: safe_json_dumps(trace_info.inputs), - SpanAttributes.INPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, - SpanAttributes.OUTPUT_VALUE: safe_json_dumps(trace_info.suggested_question), - SpanAttributes.OUTPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, - SpanAttributes.METADATA: safe_json_dumps(metadata), + SpanAttributes.INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False), + SpanAttributes.OUTPUT_VALUE: json.dumps(trace_info.suggested_question, ensure_ascii=False), + SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.CHAIN.value, + SpanAttributes.METADATA: json.dumps(metadata, ensure_ascii=False), }, start_time=datetime_to_nanos(start_time), - context=root_span_context, + context=trace.set_span_in_context(trace.NonRecordingSpan(context)), ) try: if trace_info.error: - set_span_status(span, trace_info.error) - else: - set_span_status(span) + span.add_event( + "exception", + attributes={ + "exception.message": trace_info.error, + "exception.type": "Error", + "exception.stacktrace": trace_info.error, + }, + ) finally: span.end(end_time=datetime_to_nanos(end_time)) def dataset_retrieval_trace(self, trace_info: DatasetRetrievalTraceInfo): if trace_info.message_data is None: - logger.warning("[Arize/Phoenix] Message data is None, skipping dataset retrieval trace.") return start_time = trace_info.start_time or trace_info.message_data.created_at end_time = trace_info.end_time or trace_info.message_data.updated_at - metadata = wrap_span_metadata( - trace_info.metadata, - trace_id=trace_info.trace_id or "", - message_id=trace_info.message_id or "", - status=trace_info.message_data.status or "", - status_message=trace_info.error or "", - level="ERROR" if trace_info.error else "DEFAULT", - trace_entity_type="dataset_retrieval", - model_provider=trace_info.message_data.model_provider or "", - model_id=trace_info.message_data.model_id or "", - ) + metadata = { + "message_id": trace_info.message_id, + "tool_name": "dataset_retrieval", + "status": trace_info.message_data.status, + "status_message": trace_info.message_data.error or "", + "level": "ERROR" if trace_info.message_data.error else "DEFAULT", + "ls_provider": trace_info.message_data.model_provider or "", + "ls_model_name": trace_info.message_data.model_id or "", + } + metadata.update(trace_info.metadata) - dify_trace_id = trace_info.trace_id or trace_info.message_id - self.ensure_root_span(dify_trace_id) - root_span_context = self.propagator.extract(carrier=self.carrier) + trace_id = string_to_trace_id128(trace_info.message_id) + span_id = string_to_span_id64(trace_info.message_id) + context = SpanContext( + trace_id=trace_id, + span_id=span_id, + is_remote=False, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + trace_state=TraceState(), + ) span = self.tracer.start_span( name=TraceTaskName.DATASET_RETRIEVAL_TRACE.value, attributes={ + SpanAttributes.INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False), + SpanAttributes.OUTPUT_VALUE: json.dumps({"documents": trace_info.documents}, ensure_ascii=False), SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.RETRIEVER.value, - SpanAttributes.INPUT_VALUE: safe_json_dumps(trace_info.inputs), - SpanAttributes.INPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, - SpanAttributes.OUTPUT_VALUE: safe_json_dumps({"documents": trace_info.documents}), - SpanAttributes.OUTPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, - SpanAttributes.METADATA: safe_json_dumps(metadata), + SpanAttributes.METADATA: json.dumps(metadata, ensure_ascii=False), + "start_time": start_time.isoformat() if start_time else "", + "end_time": end_time.isoformat() if end_time else "", }, start_time=datetime_to_nanos(start_time), - context=root_span_context, + context=trace.set_span_in_context(trace.NonRecordingSpan(context)), ) try: - if trace_info.error: - set_span_status(span, trace_info.error) - else: - set_span_status(span) + if trace_info.message_data.error: + span.add_event( + "exception", + attributes={ + "exception.message": trace_info.message_data.error, + "exception.type": "Error", + "exception.stacktrace": trace_info.message_data.error, + }, + ) finally: span.end(end_time=datetime_to_nanos(end_time)) @@ -648,110 +990,110 @@ class ArizePhoenixDataTrace(BaseTraceInstance): logger.warning("[Arize/Phoenix] Message data is None, skipping tool trace.") return - metadata = wrap_span_metadata( - trace_info.metadata, - trace_id=trace_info.trace_id or "", - message_id=trace_info.message_id or "", - status=trace_info.message_data.status or "", - status_message=trace_info.error or "", - level="ERROR" if trace_info.error else "DEFAULT", - trace_entity_type="tool", - tool_config=safe_json_dumps(trace_info.tool_config), - time_cost=trace_info.time_cost or 0, - file_url=trace_info.file_url or "", + metadata = { + "message_id": trace_info.message_id, + "tool_config": json.dumps(trace_info.tool_config, ensure_ascii=False), + } + + trace_id = string_to_trace_id128(trace_info.message_id) + tool_span_id = string_to_span_id64(f"{trace_info.message_id}_{trace_info.tool_name}") + logger.info("[Arize/Phoenix] Creating tool trace with trace_id: %s, span_id: %s", trace_id, tool_span_id) + + # Create span context with the same trace_id as the parent + # todo: Create with the appropriate parent span context, so that the tool span is + # a child of the appropriate span (e.g. message span) + span_context = SpanContext( + trace_id=trace_id, + span_id=tool_span_id, + is_remote=False, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + trace_state=TraceState(), ) - dify_trace_id = trace_info.trace_id or trace_info.message_id - self.ensure_root_span(dify_trace_id) - root_span_context = self.propagator.extract(carrier=self.carrier) + tool_params_str = ( + json.dumps(trace_info.tool_parameters, ensure_ascii=False) + if isinstance(trace_info.tool_parameters, dict) + else str(trace_info.tool_parameters) + ) span = self.tracer.start_span( name=trace_info.tool_name, attributes={ - SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.TOOL.value, - SpanAttributes.INPUT_VALUE: safe_json_dumps(trace_info.tool_inputs), - SpanAttributes.INPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, + SpanAttributes.INPUT_VALUE: json.dumps(trace_info.tool_inputs, ensure_ascii=False), SpanAttributes.OUTPUT_VALUE: trace_info.tool_outputs, - SpanAttributes.OUTPUT_MIME_TYPE: OpenInferenceMimeTypeValues.TEXT.value, - SpanAttributes.METADATA: safe_json_dumps(metadata), + SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.TOOL.value, + SpanAttributes.METADATA: json.dumps(metadata, ensure_ascii=False), SpanAttributes.TOOL_NAME: trace_info.tool_name, - SpanAttributes.TOOL_PARAMETERS: safe_json_dumps(trace_info.tool_parameters), + SpanAttributes.TOOL_PARAMETERS: tool_params_str, }, start_time=datetime_to_nanos(trace_info.start_time), - context=root_span_context, + context=trace.set_span_in_context(trace.NonRecordingSpan(span_context)), ) try: if trace_info.error: - set_span_status(span, trace_info.error) - else: - set_span_status(span) + span.add_event( + "exception", + attributes={ + "exception.message": trace_info.error, + "exception.type": "Error", + "exception.stacktrace": trace_info.error, + }, + ) finally: span.end(end_time=datetime_to_nanos(trace_info.end_time)) def generate_name_trace(self, trace_info: GenerateNameTraceInfo): if trace_info.message_data is None: - logger.warning("[Arize/Phoenix] Message data is None, skipping generate name trace.") return - metadata = wrap_span_metadata( - trace_info.metadata, - trace_id=trace_info.trace_id or "", - message_id=trace_info.message_id or "", - status=trace_info.message_data.status or "", - status_message=trace_info.message_data.error or "", - level="ERROR" if trace_info.message_data.error else "DEFAULT", - trace_entity_type="generate_name", - model_provider=trace_info.message_data.model_provider or "", - model_id=trace_info.message_data.model_id or "", - conversation_id=trace_info.conversation_id or "", - tenant_id=trace_info.tenant_id, - ) + metadata = { + "project_name": self.project, + "message_id": trace_info.message_id, + "status": trace_info.message_data.status, + "status_message": trace_info.message_data.error or "", + "level": "ERROR" if trace_info.message_data.error else "DEFAULT", + } + metadata.update(trace_info.metadata) - dify_trace_id = trace_info.trace_id or trace_info.message_id or trace_info.conversation_id - self.ensure_root_span(dify_trace_id) - root_span_context = self.propagator.extract(carrier=self.carrier) + trace_id = string_to_trace_id128(trace_info.message_id) + span_id = string_to_span_id64(trace_info.message_id) + context = SpanContext( + trace_id=trace_id, + span_id=span_id, + is_remote=False, + trace_flags=TraceFlags(TraceFlags.SAMPLED), + trace_state=TraceState(), + ) span = self.tracer.start_span( name=TraceTaskName.GENERATE_NAME_TRACE.value, attributes={ + SpanAttributes.INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False), + SpanAttributes.OUTPUT_VALUE: json.dumps(trace_info.outputs, ensure_ascii=False), SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.CHAIN.value, - SpanAttributes.INPUT_VALUE: safe_json_dumps(trace_info.inputs), - SpanAttributes.INPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, - SpanAttributes.OUTPUT_VALUE: safe_json_dumps(trace_info.outputs), - SpanAttributes.OUTPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, - SpanAttributes.METADATA: safe_json_dumps(metadata), - SpanAttributes.SESSION_ID: trace_info.conversation_id or "", + SpanAttributes.METADATA: json.dumps(metadata, ensure_ascii=False), + SpanAttributes.SESSION_ID: trace_info.message_data.conversation_id, + "start_time": trace_info.start_time.isoformat() if trace_info.start_time else "", + "end_time": trace_info.end_time.isoformat() if trace_info.end_time else "", }, start_time=datetime_to_nanos(trace_info.start_time), - context=root_span_context, + context=trace.set_span_in_context(trace.NonRecordingSpan(context)), ) try: if trace_info.message_data.error: - set_span_status(span, trace_info.message_data.error) - else: - set_span_status(span) + span.add_event( + "exception", + attributes={ + "exception.message": trace_info.message_data.error, + "exception.type": "Error", + "exception.stacktrace": trace_info.message_data.error, + }, + ) finally: span.end(end_time=datetime_to_nanos(trace_info.end_time)) - def ensure_root_span(self, dify_trace_id: str | None): - """Ensure a unique root span exists for the given Dify trace ID.""" - if str(dify_trace_id) not in self.dify_trace_ids: - self.carrier: dict[str, str] = {} - - root_span = self.tracer.start_span(name="Dify") - root_span.set_attribute(SpanAttributes.OPENINFERENCE_SPAN_KIND, OpenInferenceSpanKindValues.CHAIN.value) - root_span.set_attribute("dify_project_name", str(self.project)) - root_span.set_attribute("dify_trace_id", str(dify_trace_id)) - - with use_span(root_span, end_on_exit=False): - self.propagator.inject(carrier=self.carrier) - - set_span_status(root_span) - root_span.end() - self.dify_trace_ids.add(str(dify_trace_id)) - def api_check(self): try: with self.tracer.start_span("api_check") as span: @@ -762,87 +1104,1255 @@ class ArizePhoenixDataTrace(BaseTraceInstance): raise ValueError(f"[Arize/Phoenix] API check failed: {str(e)}") def get_project_url(self): - """Build a redirect URL that forwards the user to the correct project for Arize/Phoenix.""" try: - project_name = self.arize_phoenix_config.project - endpoint = self.arize_phoenix_config.endpoint.rstrip("/") + if self.arize_phoenix_config.endpoint == "https://otlp.arize.com": + return "https://app.arize.com/" + else: + return f"{self.arize_phoenix_config.endpoint}/projects/" + except Exception as e: + logger.info("[Arize/Phoenix] Get run url failed: %s", str(e), exc_info=True) + raise ValueError(f"[Arize/Phoenix] Get run url failed: {str(e)}") - # Arize - if isinstance(self.arize_phoenix_config, ArizeConfig): - return f"https://app.arize.com/?redirect_project_name={project_name}" + def _get_workflow_nodes(self, workflow_run_id: str): + """Helper method to get workflow nodes""" + workflow_nodes = db.session.execute( + select(WorkflowNodeExecutionModel).where( + WorkflowNodeExecutionModel.workflow_run_id == workflow_run_id + ) + ).scalars().all() - # Phoenix - return f"{endpoint}/projects/?redirect_project_name={project_name}" + # Debug: Check what we're actually getting + logger.info("[Arize/Phoenix] Query returned {} nodes".format(len(workflow_nodes))) + if workflow_nodes: + first_node = workflow_nodes[0] + logger.info("[Arize/Phoenix] First node type: {}".format(type(first_node))) + logger.info("[Arize/Phoenix] First node value: {}".format(first_node)) + + return workflow_nodes + + def _get_workflow_graph(self, workflow_run_id: str): + """Get the original workflow graph to understand node relationships""" + try: + # Test database connectivity first + logger.info("[Arize/Phoenix] Testing database access for workflow_run_id: {}".format(workflow_run_id)) + + # Test basic database connection + test_result = db.session.execute(select(text('1'))).scalar() + logger.info("[Arize/Phoenix] Database connection test result: {}".format(test_result)) + + # Import here to avoid circular imports + from models.workflow import WorkflowRun, Workflow + + # Test WorkflowRun table access + logger.info("[Arize/Phoenix] Attempting to query workflow_runs table...") + workflow_run = db.session.execute( + select(WorkflowRun).where(WorkflowRun.id == workflow_run_id) + ).scalars().first() + + if not workflow_run: + # Try to find any workflow runs to verify table access + logger.info("[Arize/Phoenix] Specific workflow run not found, checking if table has any records...") + any_run = db.session.execute(select(WorkflowRun).limit(1)).scalars().first() + if any_run: + logger.info("[Arize/Phoenix] WorkflowRun table accessible, but run {} not found".format(workflow_run_id)) + else: + logger.info("[Arize/Phoenix] WorkflowRun table appears empty or inaccessible") + return None + + logger.info("[Arize/Phoenix] Found workflow run, workflow_id: {}".format(workflow_run.workflow_id)) + + # Test Workflow table access + logger.info("[Arize/Phoenix] Attempting to query workflows table...") + workflow = db.session.execute( + select(Workflow).where(Workflow.id == workflow_run.workflow_id) + ).scalars().first() + + if not workflow: + logger.warning("[Arize/Phoenix] Workflow not found for id: {}".format(workflow_run.workflow_id)) + return None + + if not workflow.graph: + logger.warning("[Arize/Phoenix] Workflow graph is empty for workflow: {}".format(workflow_run.workflow_id)) + return None + + logger.info("[Arize/Phoenix] Found workflow graph, type: {}, length: {}".format( + type(workflow.graph), len(str(workflow.graph)))) + + import json + graph = json.loads(workflow.graph) if isinstance(workflow.graph, str) else workflow.graph + logger.info("[Arize/Phoenix] Parsed workflow graph with {} nodes, {} edges".format( + len(graph.get('nodes', [])), len(graph.get('edges', [])))) + + return graph except Exception as e: - logger.info("[Arize/Phoenix] Failed to construct project URL: %s", str(e), exc_info=True) - raise ValueError(f"[Arize/Phoenix] Failed to construct project URL: {str(e)}") + logger.error("[Arize/Phoenix] Failed to get workflow graph: {}".format(str(e)), exc_info=True) + return None - def _construct_llm_attributes(self, prompts: dict[str, Any] | list[Any] | str | None) -> dict[str, str]: - """Construct LLM attributes with passed prompts for Arize/Phoenix.""" - attributes: dict[str, str] = {} + def _build_comprehensive_hierarchy(self, workflow_graph, execution_nodes): + """Build comprehensive hierarchy using both graph design AND execution data""" + hierarchy_map = {} + node_mapping = {} + execution_context = {} - def set_attribute(path: str, value: object) -> None: - """Store an attribute safely as a string.""" - if value is None: - return - try: - if isinstance(value, (dict, list)): - value = safe_json_dumps(value) - attributes[path] = str(value) - except Exception: - attributes[path] = str(value) + # 1. Build graph-based relationships + if workflow_graph and 'edges' in workflow_graph: + for edge in workflow_graph['edges']: + source = edge.get('source') + target = edge.get('target') + if source and target: + hierarchy_map[target] = source - def set_message_attribute(message_index: int, key: str, value: object) -> None: - path = f"{SpanAttributes.LLM_INPUT_MESSAGES}.{message_index}.{key}" - set_attribute(path, value) + # 2. Map execution nodes to graph nodes + for exec_node in execution_nodes: + node_id = getattr(exec_node, 'node_id', exec_node.id) + exec_id = exec_node.id + node_mapping[exec_id] = node_id - def set_tool_call_attributes( - message_index: int, tool_index: int, tool_call: dict[str, Any] | object | None - ) -> None: - """Extract and assign tool call details safely.""" - if not tool_call: - return + # Store execution context + execution_context[exec_id] = { + 'node_type': exec_node.node_type, + 'status': exec_node.status, + 'index': getattr(exec_node, 'index', 0), + 'created_at': exec_node.created_at + } - def safe_get(obj, key, default=None): - if isinstance(obj, dict): - return obj.get(key, default) - return getattr(obj, key, default) + # 3. Handle decision node outputs for conditional hierarchy + decision_relationships = self._resolve_decision_paths(execution_nodes, hierarchy_map) + hierarchy_map.update(decision_relationships) - function_obj = safe_get(tool_call, "function", {}) - function_name = safe_get(function_obj, "name", "") - function_args = safe_get(function_obj, "arguments", {}) - call_id = safe_get(tool_call, "id", "") + # 4. Handle loop-based execution + loop_relationships = self._resolve_loop_hierarchy(execution_nodes, workflow_graph) + hierarchy_map.update(loop_relationships) - base_path = ( - f"{SpanAttributes.LLM_INPUT_MESSAGES}." - f"{message_index}.{MessageAttributes.MESSAGE_TOOL_CALLS}.{tool_index}" - ) + logger.info("[Arize/Phoenix] Built comprehensive hierarchy:") + logger.info(" Graph relationships: {}".format(len(hierarchy_map))) + logger.info(" Execution mappings: {}".format(len(node_mapping))) + logger.info(" Decision paths: {}".format(len(decision_relationships))) + logger.info(" Loop relationships: {}".format(len(loop_relationships))) - set_attribute(f"{base_path}.{ToolCallAttributes.TOOL_CALL_FUNCTION_NAME}", function_name) - set_attribute(f"{base_path}.{ToolCallAttributes.TOOL_CALL_FUNCTION_ARGUMENTS_JSON}", function_args) - set_attribute(f"{base_path}.{ToolCallAttributes.TOOL_CALL_ID}", call_id) + return hierarchy_map, node_mapping, execution_context - # Handle list of messages - if isinstance(prompts, list): - for message_index, message in enumerate(prompts): - if not isinstance(message, dict): + def _get_parent_workflow_context(self, trace_info): + """Detect if this workflow is called as a tool from another workflow""" + try: + # Check if this workflow run has a parent workflow context + # This could be stored in trace_info.metadata or determined by checking + # if there's a recent workflow tool execution that matches this run + + # Look for parent context in metadata + if hasattr(trace_info, 'metadata') and trace_info.metadata: + parent_trace_id = trace_info.metadata.get('parent_trace_id') + parent_span_id = trace_info.metadata.get('parent_span_id') + + if parent_trace_id and parent_span_id: + return { + 'trace_id': int(parent_trace_id), + 'parent_span_id': int(parent_span_id) + } + + # Alternative: Check if there's a workflow tool node in another workflow + # that references this workflow_run_id (more complex database query) + parent_context = self._find_parent_workflow_tool(trace_info.workflow_run_id) + if parent_context: + return parent_context + + except Exception as e: + logger.warning("[Arize/Phoenix] Could not determine parent workflow context: {}".format(str(e))) + + return None + + def _find_parent_workflow_tool(self, child_workflow_run_id): + """Find parent workflow that called this workflow as a tool""" + try: + from models.workflow import WorkflowRun, Workflow, WorkflowNodeExecutionModel + from models.tools import WorkflowToolProvider + + # Step 1: Get child workflow's app_id + child_workflow_app_id = self._get_workflow_app_id(child_workflow_run_id) + if not child_workflow_app_id: + logger.warning("[Arize/Phoenix] Could not get app_id for workflow_run: {}".format(child_workflow_run_id[:8])) + return None + + # Step 2: Check if this app is registered as a workflow tool + workflow_tool = db.session.execute( + select(WorkflowToolProvider).where( + WorkflowToolProvider.app_id == child_workflow_app_id + ) + ).scalars().first() + + if not workflow_tool: + logger.debug("[Arize/Phoenix] App {} is not registered as a workflow tool".format(child_workflow_app_id[:8])) + return None + + logger.info("[Arize/Phoenix] Found workflow tool registration: {} (app: {})".format( + workflow_tool.name, child_workflow_app_id[:8])) + + # Step 3: Get child workflow timing for correlation + child_run = db.session.execute( + select(WorkflowRun).where(WorkflowRun.id == child_workflow_run_id) + ).scalars().first() + + if not child_run or not child_run.created_at: + return None + + # Step 4: Look for recent tool executions that might have called this workflow tool + # Wider window for tools in loops/iterations that may have delays + start_window = child_run.created_at - timedelta(seconds=60) # Increased from 30s + end_window = child_run.created_at + timedelta(seconds=15) # Increased from 5s + + logger.debug("[Arize/Phoenix] Child workflow timing - start: {}, search window: {} to {}".format( + child_run.created_at, start_window, end_window)) + + # Search for tool nodes that executed this specific workflow tool + potential_parent_tools = db.session.execute( + select(WorkflowNodeExecutionModel).where( + WorkflowNodeExecutionModel.node_type.in_(['workflow', 'tool']), + WorkflowNodeExecutionModel.created_at >= start_window, + WorkflowNodeExecutionModel.created_at <= end_window + ).order_by(WorkflowNodeExecutionModel.created_at.desc()) + ).scalars().all() + + logger.info("[Arize/Phoenix] Found {} potential parent tool executions in time window for workflow '{}'".format( + len(potential_parent_tools), workflow_tool.name)) + + # Debug: Log details of each potential parent tool + for i, tool in enumerate(potential_parent_tools[:5]): # Log first 5 only + try: + inputs = json.loads(tool.inputs) if tool.inputs else {} + topic = inputs.get('topic', 'No topic') + logger.debug("[Arize/Phoenix] Potential parent {}: node_id={}, type={}, topic='{}', time={}".format( + i + 1, tool.id[:8], tool.node_type, topic, tool.created_at)) + except Exception: + logger.debug("[Arize/Phoenix] Potential parent {}: node_id={}, type={}, time={}".format( + i + 1, tool.id[:8], tool.node_type, tool.created_at)) + + # Step 5: Direct SQL approach - find workflows created after tool execution + logger.debug("[Arize/Phoenix] Trying direct SQL approach for tool->workflow mapping") + direct_parent = self._find_parent_tool_by_sql(child_workflow_run_id, potential_parent_tools) + if direct_parent: + return direct_parent + + # Step 6: Check each tool execution to see if it called our workflow tool + for tool_node in potential_parent_tools: + if self._tool_called_workflow_tool(tool_node, workflow_tool, child_workflow_run_id): + parent_run_id = tool_node.workflow_run_id + parent_trace_id = string_to_trace_id128(parent_run_id) + + logger.info("[Arize/Phoenix] Found parent workflow tool: {} -> child workflow: {} (tool: {})".format( + tool_node.id[:8], child_workflow_run_id[:8], workflow_tool.name)) + + # Get parent app name for additional context + parent_app_info = self._get_app_info_from_workflow_run_id(parent_run_id) + + return { + 'trace_id': parent_trace_id, + 'parent_span_id': string_to_span_id64(f"{parent_run_id}_{tool_node.id}"), + 'parent_workflow_run_id': parent_run_id, # Add parent workflow run ID + 'workflow_tool_name': workflow_tool.name, + 'parent_app_name': parent_app_info.get("app_name", "Unknown Parent App") + } + + # Step 6: Fallback to old method if no specific tool match found + logger.debug("[Arize/Phoenix] No specific tool match found, trying fallback timing correlation") + return self._fallback_parent_tool_search(child_workflow_run_id) + + except Exception as e: + logger.warning("[Arize/Phoenix] Error finding parent workflow tool: {}".format(str(e))) + + return None + + def _find_parent_tool_by_sql(self, child_workflow_run_id, potential_parent_tools): + """Direct SQL approach to find which tool created this workflow""" + try: + from models.workflow import WorkflowRun + + # Get child workflow details + child_run = db.session.execute( + select(WorkflowRun).where(WorkflowRun.id == child_workflow_run_id) + ).scalars().first() + + if not child_run: + return None + + logger.debug("[Arize/Phoenix] Child workflow: app_id={}, start_time={}".format( + child_run.app_id, child_run.created_at)) + + # Look for tools that executed just before child workflow + for tool_node in potential_parent_tools: + if not tool_node.created_at: continue - role = message.get("role", "user") - content = message.get("text") or message.get("content") or "" + # Calculate time difference + time_diff = (child_run.created_at - tool_node.created_at).total_seconds() - set_message_attribute(message_index, MessageAttributes.MESSAGE_ROLE, role) - set_message_attribute(message_index, MessageAttributes.MESSAGE_CONTENT, content) + # Check if timing is reasonable (0-30 seconds after tool) + if 0 <= time_diff <= 30: + # Check if this tool has workflow execution indicators + has_workflow_output = self._tool_has_workflow_output(tool_node, child_workflow_run_id) - tool_calls = message.get("tool_calls") or [] - if isinstance(tool_calls, list): - for tool_index, tool_call in enumerate(tool_calls): - set_tool_call_attributes(message_index, tool_index, tool_call) + if has_workflow_output: + parent_run_id = tool_node.workflow_run_id + parent_trace_id = string_to_trace_id128(parent_run_id) - # Handle single dict or plain string prompt - elif isinstance(prompts, (dict, str)): - set_message_attribute(0, MessageAttributes.MESSAGE_CONTENT, prompts) - set_message_attribute(0, MessageAttributes.MESSAGE_ROLE, "user") + logger.info("[Arize/Phoenix] SQL Direct Match: Tool {} -> Child workflow {} ({}s gap)".format( + tool_node.id[:8], child_workflow_run_id[:8], time_diff)) + + # Get parent app info + parent_app_info = self._get_app_info_from_workflow_run_id(parent_run_id) + + return { + 'trace_id': parent_trace_id, + 'parent_span_id': string_to_span_id64(f"{parent_run_id}_{tool_node.id}"), + 'parent_workflow_run_id': parent_run_id, # Add parent workflow run ID + 'workflow_tool_name': f'direct_sql_match_{tool_node.node_type}', + 'parent_app_name': parent_app_info.get("app_name", "Unknown Parent App") + } + + return None + + except Exception as e: + logger.warning("[Arize/Phoenix] Error in SQL parent tool search: {}".format(str(e))) + return None + + def _tool_has_workflow_output(self, tool_node, target_workflow_run_id): + """Check if tool node has indicators of creating a workflow""" + try: + # Check outputs for workflow execution evidence + if hasattr(tool_node, 'outputs') and tool_node.outputs: + import json + outputs = json.loads(tool_node.outputs) if isinstance(tool_node.outputs, str) else tool_node.outputs + + if isinstance(outputs, dict): + output_str = str(outputs) + # Direct match + if target_workflow_run_id in output_str: + logger.debug("[Arize/Phoenix] Tool {} output contains target workflow_run_id".format(tool_node.id[:8])) + return True + + # Pattern match for workflow indicators + workflow_indicators = ['workflow', 'run_id', 'execution', 'started', 'completed'] + if any(indicator in output_str.lower() for indicator in workflow_indicators): + logger.debug("[Arize/Phoenix] Tool {} output has workflow indicators".format(tool_node.id[:8])) + return True + + # Check process_data for workflow creation + if hasattr(tool_node, 'process_data') and tool_node.process_data: + import json + process_data = json.loads(tool_node.process_data) if isinstance(tool_node.process_data, str) else tool_node.process_data + + if isinstance(process_data, dict): + process_str = str(process_data) + if target_workflow_run_id in process_str: + logger.debug("[Arize/Phoenix] Tool {} process_data contains target workflow_run_id".format(tool_node.id[:8])) + return True + + return False + + except Exception as e: + logger.debug("[Arize/Phoenix] Error checking tool workflow output: {}".format(str(e))) + return False + + def _get_workflow_app_id(self, workflow_run_id): + """Get the app_id for a workflow_run_id""" + try: + from models.workflow import WorkflowRun, Workflow + + # Get workflow run -> workflow -> app_id + workflow_run = db.session.execute( + select(WorkflowRun).where(WorkflowRun.id == workflow_run_id) + ).scalars().first() + + if not workflow_run: + return None + + workflow = db.session.execute( + select(Workflow).where(Workflow.id == workflow_run.workflow_id) + ).scalars().first() + + if not workflow: + return None + + return workflow.app_id + + except Exception as e: + logger.warning("[Arize/Phoenix] Error getting app_id for workflow_run {}: {}".format( + workflow_run_id[:8], str(e))) + return None + + def _tool_called_workflow_tool(self, tool_node, workflow_tool, child_workflow_run_id): + """Check if a tool node execution called the specific workflow tool""" + try: + # Method 1: Check process_data for workflow tool references + if hasattr(tool_node, 'process_data') and tool_node.process_data: + import json + process_data = json.loads(tool_node.process_data) if isinstance(tool_node.process_data, str) else tool_node.process_data + + if isinstance(process_data, dict): + # Look for workflow tool specific identifiers + tool_provider = process_data.get('tool_provider', '') + tool_name = process_data.get('tool_name', '') + app_id_ref = process_data.get('app_id', '') + + # Check if this matches our workflow tool + if (workflow_tool.name in tool_name or + workflow_tool.app_id == app_id_ref or + 'workflow' in tool_provider.lower()): + logger.debug("[Arize/Phoenix] Tool {} matches workflow tool by process_data".format(tool_node.id[:8])) + return True + + # Method 2: Check inputs for workflow tool references + if hasattr(tool_node, 'inputs') and tool_node.inputs: + import json + inputs = json.loads(tool_node.inputs) if isinstance(tool_node.inputs, str) else tool_node.inputs + + if isinstance(inputs, dict): + # Check for app_id or workflow references in inputs + if workflow_tool.app_id in str(inputs): + logger.debug("[Arize/Phoenix] Tool {} matches workflow tool by app_id in inputs".format(tool_node.id[:8])) + return True + + # Log for debugging + logger.debug("[Arize/Phoenix] Tool {} inputs checked, no app_id match".format(tool_node.id[:8])) + + # Method 3: Check outputs for child workflow_run_id (MOST IMPORTANT) + if hasattr(tool_node, 'outputs') and tool_node.outputs: + import json + outputs = json.loads(tool_node.outputs) if isinstance(tool_node.outputs, str) else tool_node.outputs + + if isinstance(outputs, dict): + # Direct match: tool output contains this workflow_run_id + if child_workflow_run_id in str(outputs): + logger.info("[Arize/Phoenix] FOUND: Tool {} created child workflow {} (in outputs)".format( + tool_node.id[:8], child_workflow_run_id[:8])) + return True + + # Check for workflow execution references in outputs + output_str = str(outputs).lower() + if any(key in output_str for key in ['workflow_run_id', 'execution_id', 'run_id']): + logger.debug("[Arize/Phoenix] Tool {} has workflow references in outputs".format(tool_node.id[:8])) + # Additional logging for debugging + logger.debug("[Arize/Phoenix] Tool outputs keys: {}".format(list(outputs.keys()))) + + # Method 4: Enhanced process_data checking + if hasattr(tool_node, 'process_data') and tool_node.process_data: + import json + try: + process_data = json.loads(tool_node.process_data) if isinstance(tool_node.process_data, str) else tool_node.process_data + + if isinstance(process_data, dict): + # Look for workflow execution metadata in process_data + if child_workflow_run_id in str(process_data): + logger.info("[Arize/Phoenix] FOUND: Tool {} created child workflow {} (in process_data)".format( + tool_node.id[:8], child_workflow_run_id[:8])) + return True + + except Exception as e: + logger.debug("[Arize/Phoenix] Could not parse process_data for tool {}: {}".format(tool_node.id[:8], e)) + + # Method 4: Timing-based correlation (within 15 seconds) + if hasattr(tool_node, 'created_at') and tool_node.created_at: + from models.workflow import WorkflowRun + + child_run = db.session.execute( + select(WorkflowRun).where(WorkflowRun.id == child_workflow_run_id) + ).scalars().first() + + if child_run and child_run.created_at: + time_diff = (child_run.created_at - tool_node.created_at).total_seconds() + if 0 <= time_diff <= 15: # Child started within 15 seconds after tool + logger.debug("[Arize/Phoenix] Tool {} matches by timing correlation ({:.1f}s)".format( + tool_node.id[:8], time_diff)) + return True + + except Exception as e: + logger.debug("[Arize/Phoenix] Error checking tool-workflow correlation: {}".format(str(e))) + + return False + + def _fallback_parent_tool_search(self, child_workflow_run_id): + """Fallback to original timing-based search method""" + try: + from models.workflow import WorkflowNodeExecutionModel + + # Original method: look for recent tool/workflow nodes + recent_workflow_tools = db.session.execute( + select(WorkflowNodeExecutionModel).where( + WorkflowNodeExecutionModel.node_type.in_(['workflow', 'tool']) + ).order_by(WorkflowNodeExecutionModel.created_at.desc()).limit(20) + ).scalars().all() + + for tool_node in recent_workflow_tools: + # Use original timing-based method + if self._tool_created_workflow(tool_node, child_workflow_run_id): + parent_run_id = tool_node.workflow_run_id + parent_trace_id = string_to_trace_id128(parent_run_id) + + logger.info("[Arize/Phoenix] Fallback: Found parent workflow tool: {} -> child: {}".format( + tool_node.id[:8], child_workflow_run_id[:8])) + + # Get parent app name even for fallback cases + parent_app_info = self._get_app_info_from_workflow_run_id(parent_run_id) + + return { + 'trace_id': parent_trace_id, + 'parent_span_id': string_to_span_id64(f"{parent_run_id}_{tool_node.id}"), + 'parent_workflow_run_id': parent_run_id, # Add parent workflow run ID + 'workflow_tool_name': 'unknown_workflow_tool', + 'parent_app_name': parent_app_info.get("app_name", "Unknown Parent App") + } + + except Exception as e: + logger.warning("[Arize/Phoenix] Error in fallback parent tool search: {}".format(str(e))) + + return None + + def _tool_created_workflow(self, tool_node, child_workflow_run_id): + """Check if a tool node created a specific child workflow""" + try: + # Method 1: Check outputs for workflow reference + if tool_node.outputs: + import json + outputs = json.loads(tool_node.outputs) if isinstance(tool_node.outputs, str) else tool_node.outputs + + # Check if outputs contain reference to the child workflow + if isinstance(outputs, dict): + # Look for various workflow reference patterns + workflow_refs = [ + outputs.get('workflow_run_id'), + outputs.get('app_id'), + outputs.get('run_id'), + outputs.get('execution_id') + ] + if any(ref == child_workflow_run_id for ref in workflow_refs if ref): + return True + + # Method 2: Check process_data for workflow tool configuration + if hasattr(tool_node, 'process_data') and tool_node.process_data: + import json + process_data = json.loads(tool_node.process_data) if isinstance(tool_node.process_data, str) else tool_node.process_data + + # Check if this tool is configured to call workflows + if isinstance(process_data, dict): + tool_type = process_data.get('tool_type') or process_data.get('provider') + if tool_type and 'workflow' in str(tool_type).lower(): + # This is likely a workflow tool - check timing + return self._check_timing_relationship(tool_node, child_workflow_run_id) + + # Method 3: Check timing for any tool node (fallback) + return self._check_timing_relationship(tool_node, child_workflow_run_id) + + except Exception as e: + logger.debug("[Arize/Phoenix] Error checking tool-workflow relationship: {}".format(str(e))) + + return False + + def _check_timing_relationship(self, tool_node, child_workflow_run_id): + """Check timing relationship between tool execution and child workflow start""" + try: + from models.workflow import WorkflowRun + + # Get the child workflow run start time + child_run = db.session.execute( + select(WorkflowRun).where(WorkflowRun.id == child_workflow_run_id) + ).scalars().first() + + if child_run and tool_node.created_at and child_run.created_at: + time_diff = (child_run.created_at - tool_node.created_at).total_seconds() + + # Based on analysis: child workflows start within 15 seconds, often 0-1s + if 0 <= time_diff <= 15: + logger.info("[Arize/Phoenix] Timing match: tool {} -> workflow {} ({:.1f}s delay)".format( + tool_node.id[:8], child_workflow_run_id[:8], time_diff)) + return True + + except Exception: + pass + + return False + + def _find_child_workflow_by_timing(self, tool_node): + """Find child workflow triggered by this tool using enhanced correlation""" + try: + from models.workflow import WorkflowRun + + if not tool_node.created_at: + return None + + # Enhanced method: Check if this tool has already been associated with a child workflow + # This prevents the same workflow from being tagged to multiple tools + tool_node_id = tool_node.id + + # First, check outputs for direct workflow reference + if hasattr(tool_node, 'outputs') and tool_node.outputs: + outputs = json.loads(tool_node.outputs) if isinstance(tool_node.outputs, str) else tool_node.outputs + if isinstance(outputs, dict): + # Look for direct workflow_run_id reference in outputs + for key, value in outputs.items(): + if 'workflow_run_id' in str(key).lower() or 'run_id' in str(key).lower(): + potential_workflow_id = str(value) + if len(potential_workflow_id) > 30: # Workflow IDs are long UUIDs + logger.info("[Arize/Phoenix] Found direct workflow reference in tool {} outputs: {}".format( + tool_node_id[:8], potential_workflow_id[:8])) + return potential_workflow_id + + # Second, check process_data for workflow execution evidence + if hasattr(tool_node, 'process_data') and tool_node.process_data: + process_data = json.loads(tool_node.process_data) if isinstance(tool_node.process_data, str) else tool_node.process_data + if isinstance(process_data, dict): + # Look for workflow execution metadata + for key, value in process_data.items(): + if 'workflow' in str(key).lower() and 'run' in str(key).lower(): + potential_workflow_id = str(value) + if len(potential_workflow_id) > 30: + logger.info("[Arize/Phoenix] Found workflow reference in tool {} process_data: {}".format( + tool_node_id[:8], potential_workflow_id[:8])) + return potential_workflow_id + + # Third, enhanced timing correlation with uniqueness check + potential_children = db.session.execute( + select(WorkflowRun).where( + WorkflowRun.created_at >= tool_node.created_at, + WorkflowRun.created_at <= tool_node.created_at + timedelta(seconds=15) + ).order_by(WorkflowRun.created_at) + ).scalars().all() + + if not potential_children: + return None + + # Check if any of these workflows are already associated with other tools in this workflow + tool_workflow_run_id = tool_node.workflow_run_id + already_associated = set() + + # Get other tool nodes from the same workflow + from models.workflow import WorkflowNodeExecutionModel + other_tools = db.session.execute( + select(WorkflowNodeExecutionModel).where( + WorkflowNodeExecutionModel.workflow_run_id == tool_workflow_run_id, + WorkflowNodeExecutionModel.node_type == 'tool', + WorkflowNodeExecutionModel.id != tool_node_id + ) + ).scalars().all() + + # Check what child workflows other tools have already claimed + for other_tool in other_tools: + if other_tool.created_at and other_tool.created_at <= tool_node.created_at: + # This tool executed before our tool, check its potential children + for child_run in potential_children: + time_diff = (child_run.created_at - other_tool.created_at).total_seconds() + if 0 <= time_diff <= 15: # This child might belong to the earlier tool + already_associated.add(child_run.id) + logger.debug("[Arize/Phoenix] Child workflow {} already associated with earlier tool {}".format( + child_run.id[:8], other_tool.id[:8])) + + # Find the best available match that's not already associated + best_match = None + best_time_diff = float('inf') + + for child_run in potential_children: + if child_run.id not in already_associated: # Only consider unassociated workflows + time_diff = (child_run.created_at - tool_node.created_at).total_seconds() + if 0 <= time_diff < best_time_diff: + best_time_diff = time_diff + best_match = child_run + + if best_match and best_time_diff <= 15: + logger.info("[Arize/Phoenix] Found unique child workflow: tool {} -> {} ({:.1f}s)".format( + tool_node.id[:8], best_match.id[:8], best_time_diff)) + return best_match.id + + if potential_children: + logger.debug("[Arize/Phoenix] Tool {} - all potential children already associated with other tools".format( + tool_node_id[:8])) + + except Exception as e: + logger.debug("[Arize/Phoenix] Error finding child workflow: {}".format(str(e))) + + return None + + def _find_parent_tools_by_timing(self, child_workflow_run_id): + """Find parent tool nodes that might have triggered this workflow""" + try: + from models.workflow import WorkflowRun, WorkflowNodeExecutionModel + + # Get the child workflow start time + child_run = db.session.execute( + select(WorkflowRun).where(WorkflowRun.id == child_workflow_run_id) + ).scalars().first() + + if not child_run or not child_run.created_at: + return [] + + # Look for tool nodes that executed shortly before this workflow started + start_window = child_run.created_at - timedelta(seconds=30) + end_window = child_run.created_at + timedelta(seconds=5) + + potential_parent_tools = db.session.execute( + select(WorkflowNodeExecutionModel).where( + WorkflowNodeExecutionModel.node_type == 'tool', + WorkflowNodeExecutionModel.created_at >= start_window, + WorkflowNodeExecutionModel.created_at <= end_window + ).order_by(WorkflowNodeExecutionModel.created_at.desc()) + ).scalars().all() + + # Filter to tools that are close in timing + matching_tools = [] + for tool in potential_parent_tools: + if tool.created_at: + time_diff = (child_run.created_at - tool.created_at).total_seconds() + if 0 <= time_diff <= 15: # Within 15 seconds before child start + matching_tools.append(tool.id) + + return matching_tools + + except Exception as e: + logger.debug("[Arize/Phoenix] Error finding parent tools: {}".format(str(e))) + return [] + + def _get_parent_workflow_run_from_tool(self, tool_node_id): + """Get the parent workflow run ID from a tool node ID""" + try: + from models.workflow import WorkflowNodeExecutionModel + + tool_node = db.session.execute( + select(WorkflowNodeExecutionModel).where(WorkflowNodeExecutionModel.id == tool_node_id) + ).scalars().first() + + if tool_node: + return tool_node.workflow_run_id + + except Exception: + pass + + return None + + def _find_logical_parent_span(self, node_execution, node_spans, execution_context): + """Find logical parent span based on execution context and node relationships""" + try: + node_id = getattr(node_execution, 'node_id', node_execution.id) + node_index = getattr(node_execution, 'index', 0) + + # For tools and LLMs, try to find the most recent parent span + # Look for nodes with lower index (executed before this one) + potential_parents = [] + + for span_node_id, span in node_spans.items(): + # Get execution context for this span + if span_node_id in execution_context: + context = execution_context[span_node_id] + span_index = context.get('index', 0) + + # This span executed before current node + if span_index < node_index: + potential_parents.append((span_index, span)) + + # Return the most recent parent (highest index that's still lower than current) + if potential_parents: + potential_parents.sort(key=lambda x: x[0], reverse=True) + return potential_parents[0][1] + + except Exception as e: + logger.debug("[Arize/Phoenix] Error finding logical parent: {}".format(str(e))) + + return None + + def _get_workflow_tool_name(self, node_execution): + """Extract workflow tool name from node execution data""" + try: + if hasattr(node_execution, 'process_data') and node_execution.process_data: + process_data = json.loads(node_execution.process_data) if isinstance(node_execution.process_data, str) else node_execution.process_data + + # Look for workflow name in process data + workflow_name = process_data.get('workflow_name') or process_data.get('app_name') + if workflow_name: + return workflow_name + + # Fallback: Check inputs for workflow reference + if node_execution.inputs: + inputs = json.loads(node_execution.inputs) if isinstance(node_execution.inputs, str) else node_execution.inputs + + workflow_ref = inputs.get('workflow_id') or inputs.get('app_id') + if workflow_ref: + return "workflow_{}".format(workflow_ref[:8]) + + except Exception as e: + logger.debug("[Arize/Phoenix] Could not extract workflow tool name: {}".format(str(e))) + + return None + + def _is_workflow_tool(self, node_execution): + """Check if a tool node is actually calling a workflow""" + try: + if hasattr(node_execution, 'process_data') and node_execution.process_data: + process_data = json.loads(node_execution.process_data) if isinstance(node_execution.process_data, str) else node_execution.process_data + + # Check for workflow tool indicators + if isinstance(process_data, dict): + tool_type = process_data.get('tool_type') or process_data.get('provider') or process_data.get('tool_name', '') + + # Look for workflow-related indicators + workflow_indicators = ['workflow', 'app', 'dify'] + if any(indicator in str(tool_type).lower() for indicator in workflow_indicators): + return True + + # Check for app_id in tool configuration + if 'app_id' in process_data or 'workflow_id' in process_data: + return True + + except Exception: + pass + + return False + + def _get_tool_name(self, node_execution): + """Extract tool name from regular tool nodes - enhanced with database lookup""" + try: + # Method 1: Try to get tool name from process_data + if hasattr(node_execution, 'process_data') and node_execution.process_data: + process_data = json.loads(node_execution.process_data) if isinstance(node_execution.process_data, str) else node_execution.process_data + + if isinstance(process_data, dict): + # Try different name fields + tool_name = (process_data.get('tool_name') or + process_data.get('provider') or + process_data.get('tool_type') or + process_data.get('name')) + + if tool_name and tool_name not in ['workflow', 'unknown', '']: + return str(tool_name).replace('_', ' ').title() + + # Method 2: Try to get from tool configuration in process_data + tool_config = process_data.get('tool_config', {}) + if isinstance(tool_config, dict): + config_name = tool_config.get('name') or tool_config.get('provider') + if config_name and config_name not in ['workflow', 'unknown', '']: + return str(config_name).replace('_', ' ').title() + + # Method 3: Try to extract from inputs if it's a workflow tool + if hasattr(node_execution, 'inputs') and node_execution.inputs: + inputs = json.loads(node_execution.inputs) if isinstance(node_execution.inputs, str) else node_execution.inputs + if isinstance(inputs, dict): + # Check if there's a tool reference in inputs + app_id = inputs.get('app_id') + if app_id: + # Query the tools table for this app_id + tool_from_db = self._get_tool_name_from_db(app_id) + if tool_from_db: + return tool_from_db + + # Method 4: Try to get tool name from database using node metadata + if hasattr(node_execution, 'execution_metadata') and node_execution.execution_metadata: + metadata = json.loads(node_execution.execution_metadata) if isinstance(node_execution.execution_metadata, str) else node_execution.execution_metadata + if isinstance(metadata, dict): + tool_provider_id = metadata.get('tool_provider_id') + if tool_provider_id: + tool_from_db = self._get_tool_name_by_provider_id(tool_provider_id) + if tool_from_db: + return tool_from_db + + except Exception as e: + logger.debug("[Arize/Phoenix] Error extracting tool name: {}".format(str(e))) + + return None + + def _is_child_workflow(self, trace_info): + """Determine if this is a child workflow based on parent context""" + try: + # Check if this workflow was called as a tool from another workflow + parent_context = self._get_parent_workflow_context(trace_info) + return parent_context is not None + except Exception: + return False + + def _ensure_child_workflows_processed(self, main_trace_info): + """Ensure all child workflows are processed before main workflow""" + try: + # Get all tool nodes from this workflow that might trigger child workflows + nodes = self._get_workflow_nodes(main_trace_info.workflow_run_id) + tool_nodes = [n for n in nodes if n.node_type == 'tool'] + + for tool_node in tool_nodes: + child_workflow_id = self._find_child_workflow_by_timing(tool_node) + if child_workflow_id: + # Check if child workflow trace exists + if not self._child_workflow_trace_exists(child_workflow_id): + logger.info("[Arize/Phoenix] Child workflow {} not yet traced, should be processed first".format( + child_workflow_id[:8])) + # In a real implementation, you might queue the child workflow for processing + # For now, we'll log this as a processing order issue + + except Exception as e: + logger.debug("[Arize/Phoenix] Error ensuring child workflows processed: {}".format(str(e))) + + def _child_workflow_trace_exists(self, workflow_run_id): + """Check if a child workflow has already been traced""" + try: + # This could check a cache, database, or trace store + # For now, we'll implement basic logic + return False # Placeholder - implement based on your trace storage + except Exception: + return False + + def _get_child_workflow_references(self, main_workflow_run_id): + """Get references to child workflow traces for main workflow""" + try: + child_refs = [] + nodes = self._get_workflow_nodes(main_workflow_run_id) + tool_nodes = [n for n in nodes if n.node_type == 'tool'] + + for tool_node in tool_nodes: + child_workflow_id = self._find_child_workflow_by_timing(tool_node) + if child_workflow_id: + trace_info = self._get_child_workflow_trace_info(child_workflow_id) + if trace_info: + child_refs.append({ + 'child_workflow_run_id': child_workflow_id, + 'child_trace_id': trace_info.get('trace_id', ''), + 'calling_tool_id': tool_node.id, + 'tool_index': getattr(tool_node, 'index', 0) + }) + + return child_refs + + except Exception as e: + logger.debug("[Arize/Phoenix] Error getting child workflow references: {}".format(str(e))) + return [] + + def _get_child_workflow_trace_info(self, child_workflow_run_id): + """Get trace information for a child workflow""" + try: + # This would typically query your trace store or cache + # For now, we'll generate the expected trace/span IDs + trace_id = string_to_trace_id128(child_workflow_run_id) + span_id = string_to_span_id64(child_workflow_run_id) + + return { + 'trace_id': hex(trace_id), + 'span_id': hex(span_id), + 'workflow_run_id': child_workflow_run_id + } + + except Exception as e: + logger.debug("[Arize/Phoenix] Error getting child workflow trace info: {}".format(str(e))) + return None + + def _get_tool_name_from_db(self, app_id): + """Query tools table to get tool name by app_id""" + try: + from models.tools import WorkflowToolProvider + + tool_provider = db.session.execute( + select(WorkflowToolProvider).where(WorkflowToolProvider.app_id == app_id) + ).scalars().first() + + if tool_provider: + return tool_provider.name + + except Exception as e: + logger.debug("[Arize/Phoenix] Error getting tool name from DB by app_id: {}".format(str(e))) + + return None + + def _get_tool_name_by_provider_id(self, provider_id): + """Query tools table to get tool name by provider_id""" + try: + # Try different tool tables based on provider type + from models.tools import BuiltinToolProvider, ApiToolProvider, WorkflowToolProvider + + # Try workflow tool provider first + tool = db.session.execute( + select(WorkflowToolProvider).where(WorkflowToolProvider.id == provider_id) + ).scalars().first() + + if tool: + return tool.name + + # Try builtin tool provider + tool = db.session.execute( + select(BuiltinToolProvider).where(BuiltinToolProvider.id == provider_id) + ).scalars().first() + + if tool: + return tool.name + + # Try API tool provider + tool = db.session.execute( + select(ApiToolProvider).where(ApiToolProvider.id == provider_id) + ).scalars().first() + + if tool: + return tool.name + + except Exception as e: + logger.debug("[Arize/Phoenix] Error getting tool name from DB by provider_id: {}".format(str(e))) + + return None + + def _get_api_info(self, node_execution): + """Extract API call information from HTTP request node""" + try: + if hasattr(node_execution, 'process_data') and node_execution.process_data: + process_data = json.loads(node_execution.process_data) if isinstance(node_execution.process_data, str) else node_execution.process_data + + return { + 'method': process_data.get('method', ''), + 'url': process_data.get('url', ''), + 'status_code': process_data.get('status_code', ''), + } + + except Exception: + pass + + return None + + def _resolve_decision_paths(self, execution_nodes, base_hierarchy): + """Resolve actual execution paths from decision nodes (question-classifier, if-else)""" + decision_relationships = {} + + # Group nodes by execution order + sorted_nodes = sorted(execution_nodes, key=lambda n: getattr(n, 'index', 0)) + + for i, node in enumerate(sorted_nodes): + if node.node_type in ['question-classifier', 'if-else']: + # Find which nodes executed after this decision node + next_nodes = sorted_nodes[i+1:] + + # Get the decision output to understand which path was taken + decision_output = self._extract_decision_output(node) + + # For question-classifier, use class_id to determine next node + if node.node_type == 'question-classifier' and decision_output: + class_id = decision_output.get('class_id') + if class_id and next_nodes: + # The immediate next node is likely the one chosen by classifier + next_node = next_nodes[0] + next_node_id = getattr(next_node, 'node_id', next_node.id) + decision_node_id = getattr(node, 'node_id', node.id) + + # Override hierarchy to show actual execution path + decision_relationships[next_node_id] = decision_node_id + + logger.info("[Arize/Phoenix] Decision path: {} -> {} (class: {})".format( + decision_node_id, next_node_id, decision_output.get('class_name', '')[:50])) + + # For if-else, determine which branch was taken based on subsequent execution + elif node.node_type == 'if-else' and next_nodes: + # The next executed node shows which branch was taken + next_node = next_nodes[0] + next_node_id = getattr(next_node, 'node_id', next_node.id) + decision_node_id = getattr(node, 'node_id', node.id) + + decision_relationships[next_node_id] = decision_node_id + + logger.info("[Arize/Phoenix] IF/ELSE path: {} -> {}".format( + decision_node_id, next_node_id)) + + return decision_relationships + + def _extract_decision_output(self, node): + """Extract decision output from question-classifier or if-else node""" + try: + if node.outputs: + import json + outputs = json.loads(node.outputs) if isinstance(node.outputs, str) else node.outputs + return outputs + except Exception as e: + logger.warning("[Arize/Phoenix] Could not parse decision output for node {}: {}".format(node.id, str(e))) + return None + + def _resolve_loop_hierarchy(self, execution_nodes, workflow_graph): + """Handle loop-based execution hierarchy with enhanced iteration detection""" + loop_relationships = {} + + # Find loop and loop-start nodes + loop_nodes = [n for n in execution_nodes if n.node_type == 'loop'] + iteration_nodes = [n for n in execution_nodes if n.node_type == 'iteration'] # Add iteration support + + # Handle traditional loops + for loop_node in loop_nodes: + loop_node_id = getattr(loop_node, 'node_id', loop_node.id) + loop_index = getattr(loop_node, 'index', 0) + + logger.info("[Arize/Phoenix] Processing loop node: {} (index: {})".format(loop_node_id, loop_index)) + + # Find nodes that should be children of this loop + # These are nodes that execute after the loop starts and before the loop ends + for node in execution_nodes: + if node != loop_node and node.node_type not in ['start', 'end']: + node_id = getattr(node, 'node_id', node.id) + node_index = getattr(node, 'index', 0) + + # Check if node is inside loop based on execution order and graph structure + if self._is_node_in_loop_execution(node, loop_node, execution_nodes, workflow_graph): + loop_relationships[node_id] = loop_node_id + logger.info("[Arize/Phoenix] Loop child: {} -> loop: {}".format(node_id, loop_node_id)) + + # Handle iteration nodes (similar to loops but different node type) + for iteration_node in iteration_nodes: + iteration_node_id = getattr(iteration_node, 'node_id', iteration_node.id) + iteration_index = getattr(iteration_node, 'index', 0) + + logger.info("[Arize/Phoenix] Processing iteration node: {} (index: {})".format(iteration_node_id, iteration_index)) + + # Find tools and other nodes that should be children of this iteration + for node in execution_nodes: + if node != iteration_node and node.node_type not in ['start', 'end']: + node_id = getattr(node, 'node_id', node.id) + node_index = getattr(node, 'index', 0) + + # For iterations, tools executed within the iteration timeframe should be children + if self._is_node_in_iteration_execution(node, iteration_node, execution_nodes): + loop_relationships[node_id] = iteration_node_id + logger.info("[Arize/Phoenix] Iteration child: {} -> iteration: {}".format(node_id, iteration_node_id)) + + return loop_relationships + + def _is_node_in_loop(self, node_id, loop_node_id, workflow_graph): + """Check if a node is inside a loop based on graph structure""" + try: + # Look for nodes with parentId matching the loop + for graph_node in workflow_graph.get('nodes', []): + if graph_node.get('id') == node_id: + parent_id = graph_node.get('parentId') + if parent_id == loop_node_id: + return True + return False + except Exception: + return False + + def _is_node_in_loop_execution(self, node, loop_node, execution_nodes, workflow_graph): + """Enhanced check if a node is inside a loop based on execution order and graph structure""" + try: + node_index = getattr(node, 'index', 0) + loop_index = getattr(loop_node, 'index', 0) + node_id = getattr(node, 'node_id', node.id) + loop_node_id = getattr(loop_node, 'node_id', loop_node.id) + + # Method 1: Graph-based check (if available) + if workflow_graph and self._is_node_in_loop(node_id, loop_node_id, workflow_graph): + return True + + # Method 2: Execution order check + # Nodes inside loops typically execute after the loop node + if node_index > loop_index: + # Check if there's a reasonable execution gap (not too far apart) + if (node_index - loop_index) <= 50: # Reasonable iteration range + return True + + # Method 3: Check for loop iteration patterns + # Tools that execute multiple times are likely inside loops + node_type = node.node_type + if node_type in ['tool', 'llm'] and self._node_appears_multiple_times(node, execution_nodes): + return True + + return False + + except Exception: + return False + + def _is_node_in_iteration_execution(self, node, iteration_node, execution_nodes): + """Check if a node is inside an iteration based on execution timing""" + try: + node_created = getattr(node, 'created_at', None) + iteration_created = getattr(iteration_node, 'created_at', None) + + if not node_created or not iteration_created: + return False + + # Check if node executed during or shortly after the iteration + from datetime import timedelta + time_diff = (node_created - iteration_created).total_seconds() + + # Node executed within reasonable time after iteration started (0 to 300 seconds) + if 0 <= time_diff <= 300: + return True + + return False + + except Exception: + return False + + def _node_appears_multiple_times(self, node, execution_nodes): + """Check if a node appears to be executed multiple times (indicating loop iteration)""" + try: + node_id = getattr(node, 'node_id', node.id) + count = 0 + + for exec_node in execution_nodes: + exec_node_id = getattr(exec_node, 'node_id', exec_node.id) + if exec_node_id == node_id: + count += 1 + + return count > 1 + + except Exception: + return False + + def _construct_llm_attributes(self, prompts: dict | list | str | None) -> dict[str, str]: + """Helper method to construct LLM attributes with passed prompts.""" + attributes = {} + if isinstance(prompts, list): + for i, msg in enumerate(prompts): + if isinstance(msg, dict): + attributes[f"{SpanAttributes.LLM_INPUT_MESSAGES}.{i}.message.content"] = msg.get("text", "") + attributes[f"{SpanAttributes.LLM_INPUT_MESSAGES}.{i}.message.role"] = msg.get("role", "user") + # todo: handle assistant and tool role messages, as they don't always + # have a text field, but may have a tool_calls field instead + # e.g. 'tool_calls': [{'id': '98af3a29-b066-45a5-b4b1-46c74ddafc58', + # 'type': 'function', 'function': {'name': 'current_time', 'arguments': '{}'}}]} + elif isinstance(prompts, dict): + attributes[f"{SpanAttributes.LLM_INPUT_MESSAGES}.0.message.content"] = json.dumps(prompts) + attributes[f"{SpanAttributes.LLM_INPUT_MESSAGES}.0.message.role"] = "user" + elif isinstance(prompts, str): + attributes[f"{SpanAttributes.LLM_INPUT_MESSAGES}.0.message.content"] = prompts + attributes[f"{SpanAttributes.LLM_INPUT_MESSAGES}.0.message.role"] = "user" return attributes + + def _get_app_info_from_workflow_run_id(self, workflow_run_id: str) -> dict: + """ + Get app info (id and name) from workflow_run_id for enhanced tracing metadata + + Database relationship: workflow_run -> workflow -> app -> app.name, app.id + Returns: {"app_id": str, "app_name": str} + """ + try: + from models.workflow import WorkflowRun, Workflow + from models.model import App + + # Get workflow run + workflow_run = db.session.execute( + select(WorkflowRun).where(WorkflowRun.id == workflow_run_id) + ).scalars().first() + + if not workflow_run: + logger.warning("[Arize/Phoenix] WorkflowRun not found for ID: %s", workflow_run_id) + return {"app_id": "unknown", "app_name": "Unknown Workflow Run"} + + # Get workflow + workflow = db.session.execute( + select(Workflow).where(Workflow.id == workflow_run.workflow_id) + ).scalars().first() + + if not workflow: + logger.warning("[Arize/Phoenix] Workflow not found for ID: %s", workflow_run.workflow_id) + return {"app_id": "unknown", "app_name": "Unknown Workflow"} + + # Get app + app = db.session.execute( + select(App).where(App.id == workflow.app_id) + ).scalars().first() + + if not app: + logger.warning("[Arize/Phoenix] App not found for ID: %s", workflow.app_id) + return {"app_id": workflow.app_id, "app_name": "Unknown App"} + + logger.info("[Arize/Phoenix] Found app: %s (%s) for workflow_run: %s", + app.name, app.id, workflow_run_id[:8]) + + return {"app_id": app.id, "app_name": app.name} + + except Exception as e: + logger.error("[Arize/Phoenix] Error getting app info for workflow_run %s: %s", + workflow_run_id[:8], str(e)) + return {"app_id": "error", "app_name": f"Error: {str(e)[:50]}"}