diff --git a/api/providers/trace/trace-arize-phoenix/src/dify_trace_arize_phoenix/arize_phoenix_trace.py b/api/providers/trace/trace-arize-phoenix/src/dify_trace_arize_phoenix/arize_phoenix_trace.py index 9b35612b52..3150a67bc9 100644 --- a/api/providers/trace/trace-arize-phoenix/src/dify_trace_arize_phoenix/arize_phoenix_trace.py +++ b/api/providers/trace/trace-arize-phoenix/src/dify_trace_arize_phoenix/arize_phoenix_trace.py @@ -4,6 +4,7 @@ import os import re import traceback from collections.abc import Mapping, Sequence +from dataclasses import dataclass, field from datetime import datetime, timedelta from typing import Any, Protocol, Union, cast from urllib.parse import urlparse @@ -59,6 +60,7 @@ _PHOENIX_PARENT_SPAN_CONTEXT_TTL_SECONDS = 300 _TRACEPARENT_PATTERN = re.compile( r"^(?P[0-9a-f]{2})-(?P[0-9a-f]{32})-(?P[0-9a-f]{16})-(?P[0-9a-f]{2})$" ) +_WRAPPER_INDEX_PATTERN = re.compile(r"^[A-Za-z0-9_.:-]+$") def _phoenix_parent_span_redis_key(parent_node_execution_id: str) -> str: @@ -261,21 +263,7 @@ def set_span_status(current_span: Span, error: Exception | str | None = None): if error: error_string = error_to_string(error) current_span.set_status(Status(StatusCode.ERROR, error_string)) - - if isinstance(error, Exception): - current_span.record_exception(error) - else: - exception_type = error.__class__.__name__ - exception_message = str(error) - if not exception_message: - exception_message = repr(error) - attributes: dict[str, AttributeValue] = { - exception_attributes.EXCEPTION_TYPE: exception_type, - exception_attributes.EXCEPTION_MESSAGE: exception_message, - exception_attributes.EXCEPTION_ESCAPED: False, - exception_attributes.EXCEPTION_STACKTRACE: error_string, - } - current_span.add_event(name="exception", attributes=attributes) + _record_exception_event(current_span, error) else: current_span.set_status(Status(StatusCode.OK)) @@ -473,6 +461,155 @@ def _build_execution_id_by_node_id(node_executions: Sequence[_NodeExecutionIdent return execution_id_by_node_id +@dataclass(frozen=True) +class _WrapperGroupKey: + wrapper_type: str + container_execution_id: str + index: str + + +@dataclass +class _WrapperGroup: + key: _WrapperGroupKey + container_execution_id: str + child_execution_ids: set[str] = field(default_factory=set) + start_time: datetime | None = None + end_time: datetime | None = None + has_error: bool = False + + +def _normalize_wrapper_index(value: Any) -> str | None: + """Normalize stable loop/iteration indexes used for synthetic wrapper spans.""" + if isinstance(value, bool): + return None + if isinstance(value, int): + return str(value) if value >= 0 else None + if isinstance(value, str) and _WRAPPER_INDEX_PATTERN.fullmatch(value): + return value + return None + + +def _execution_metadata_mapping(node_execution: object) -> Mapping[Any, Any]: + """Return execution metadata from repository models and test doubles.""" + execution_metadata = getattr(node_execution, "execution_metadata_dict", None) + if isinstance(execution_metadata, Mapping): + return execution_metadata + + execution_metadata = getattr(node_execution, "metadata", None) + if isinstance(execution_metadata, Mapping): + return execution_metadata + + return {} + + +def _node_finished_at(node_execution: object) -> datetime: + end_time = getattr(node_execution, "end_time", None) + if isinstance(end_time, datetime): + return end_time + created_at = getattr(node_execution, "created_at", None) or getattr(node_execution, "start_time", None) + if not isinstance(created_at, datetime): + created_at = datetime.now() + elapsed_time = getattr(node_execution, "elapsed_time", None) or 0.0 + return created_at + timedelta(seconds=elapsed_time) + + +def _metadata_or_attr(node_execution: object, node_metadata: Mapping[str, Any], key: str) -> Any: + value = node_metadata.get(key) + if value is not None: + return value + return getattr(node_execution, key, None) + + +def _node_execution_failed(node_execution: object) -> bool: + status = getattr(node_execution, "status", None) + return status in {WorkflowNodeExecutionStatus.FAILED, "failed"} + + +def _node_execution_handled_exception(node_execution: object) -> bool: + status = getattr(node_execution, "status", None) + return status in {WorkflowNodeExecutionStatus.EXCEPTION, "exception"} + + +def _record_exception_event(current_span: Span, error: Exception | str | None = None) -> None: + if not error: + return + + error_string = error_to_string(error) + if isinstance(error, Exception): + current_span.record_exception(error) + return + + exception_message = str(error) or repr(error) + attributes: dict[str, AttributeValue] = { + exception_attributes.EXCEPTION_TYPE: error.__class__.__name__, + exception_attributes.EXCEPTION_MESSAGE: exception_message, + exception_attributes.EXCEPTION_ESCAPED: False, + exception_attributes.EXCEPTION_STACKTRACE: error_string, + } + current_span.add_event(name="exception", attributes=attributes) + + +def _resolve_wrapper_group_key( + node_execution: _NodeExecutionIdentityLike, + node_metadata: Mapping[Any, Any], + execution_id_by_node_id: Mapping[str, str], +) -> _WrapperGroupKey | None: + for wrapper_type, container_key, index_key in ( + ("iteration", "iteration_id", "iteration_index"), + ("loop", "loop_id", "loop_index"), + ): + container_id = _metadata_or_attr(node_execution, node_metadata, container_key) + if not isinstance(container_id, str) or not container_id: + continue + + container_execution_id = execution_id_by_node_id.get(container_id) + if container_execution_id is None or container_execution_id == _get_node_execution_id(node_execution): + continue + + index = _normalize_wrapper_index(_metadata_or_attr(node_execution, node_metadata, index_key)) + if index is None: + continue + + return _WrapperGroupKey( + wrapper_type=wrapper_type, + container_execution_id=container_execution_id, + index=index, + ) + + return None + + +def _build_wrapper_groups( + node_executions: Sequence[_NodeExecutionIdentityLike], +) -> dict[_WrapperGroupKey, _WrapperGroup]: + """Group repeated loop/iteration body executions behind synthetic Phoenix spans.""" + execution_id_by_node_id = _build_execution_id_by_node_id(node_executions) + groups: dict[_WrapperGroupKey, _WrapperGroup] = {} + + for node_execution in node_executions: + node_metadata = _execution_metadata_mapping(node_execution) + group_key = _resolve_wrapper_group_key(node_execution, node_metadata, execution_id_by_node_id) + if group_key is None: + continue + + group = groups.setdefault( + group_key, + _WrapperGroup(key=group_key, container_execution_id=group_key.container_execution_id), + ) + execution_id = _get_node_execution_id(node_execution) + group.child_execution_ids.add(execution_id) + + created_at = getattr(node_execution, "created_at", None) or getattr(node_execution, "start_time", None) + if not isinstance(created_at, datetime): + created_at = datetime.now() + finished_at = _node_finished_at(node_execution) + group.start_time = created_at if group.start_time is None else min(group.start_time, created_at) + group.end_time = finished_at if group.end_time is None else max(group.end_time, finished_at) + group.has_error = group.has_error or _node_execution_failed(node_execution) + + return groups + + def _build_graph_parent_index(node_executions: Sequence[_NodeExecutionIdentityLike]) -> dict[str, str]: """Build an execution-id parent index from predecessor node ids.""" execution_id_by_node_id = _build_execution_id_by_node_id(node_executions) @@ -634,16 +771,19 @@ class ArizePhoenixDataTrace(BaseTraceInstance): if not isinstance(workflow_root_span_name, str) or not workflow_root_span_name.strip(): workflow_root_span_name = None - workflow_parent_carrier = self.ensure_root_span( - root_trace_id, - root_span_name=workflow_root_span_name, - root_span_attributes={ + root_span_kwargs: dict[str, Any] = { + "root_span_name": workflow_root_span_name, + "root_span_attributes": { SpanAttributes.INPUT_VALUE: safe_json_dumps(trace_info.workflow_run_inputs), SpanAttributes.INPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, SpanAttributes.OUTPUT_VALUE: safe_json_dumps(trace_info.workflow_run_outputs), SpanAttributes.OUTPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, }, - ) + } + if trace_info.error: + root_span_kwargs["root_span_error"] = trace_info.error + + workflow_parent_carrier = self.ensure_root_span(root_trace_id, **root_span_kwargs) workflow_span_context = self.propagator.extract(carrier=workflow_parent_carrier) @@ -691,10 +831,67 @@ class ArizePhoenixDataTrace(BaseTraceInstance): } span_by_execution_id: dict[str, Span] = {} emitting_execution_ids: set[str] = set() + wrapper_groups = _build_wrapper_groups(workflow_node_executions) + wrapper_span_by_key: dict[_WrapperGroupKey, Span] = {} + finalized_wrapper_keys: set[_WrapperGroupKey] = set() + wrapper_key_by_child_execution_id: dict[str, _WrapperGroupKey] = {} + for group_key, group in wrapper_groups.items(): + for child_execution_id in group.child_execution_ids: + wrapper_key_by_child_execution_id[child_execution_id] = group_key workflow_span_error: Exception | str | None = trace_info.error try: + def emit_wrapper_span(group_key: _WrapperGroupKey) -> Span | None: + existing_span = wrapper_span_by_key.get(group_key) + if existing_span is not None: + return existing_span + + group = wrapper_groups.get(group_key) + if group is None: + return None + + if group.container_execution_id not in span_by_execution_id: + parent_node_execution = node_execution_by_execution_id.get(group.container_execution_id) + if parent_node_execution is not None: + emit_node_span(parent_node_execution) + + container_span = span_by_execution_id.get(group.container_execution_id) + if container_span is None: + return None + + metadata = { + "synthetic": True, + "wrapper_type": group_key.wrapper_type, + "wrapper_index": group_key.index, + "container_execution_id": group.container_execution_id, + } + wrapper_span = self.tracer.start_span( + name=f"{group_key.wrapper_type}[{group_key.index}]", + attributes={ + SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.CHAIN.value, + SpanAttributes.METADATA: safe_json_dumps(metadata), + SpanAttributes.SESSION_ID: workflow_session_id or "", + "dify.wrapper.synthetic": True, + "dify.wrapper.type": group_key.wrapper_type, + "dify.wrapper.index": group_key.index, + "dify.wrapper.container_execution_id": group.container_execution_id, + }, + start_time=datetime_to_nanos(group.start_time), + context=set_span_in_context(container_span), + ) + wrapper_span_by_key[group_key] = wrapper_span + return wrapper_span + + def finalize_wrapper_spans() -> None: + for group_key, wrapper_span in list(wrapper_span_by_key.items()): + if group_key in finalized_wrapper_keys: + continue + group = wrapper_groups[group_key] + set_span_status(wrapper_span, "wrapper child failed" if group.has_error else None) + wrapper_span.end(end_time=datetime_to_nanos(group.end_time)) + finalized_wrapper_keys.add(group_key) + def emit_node_span(node_execution: _NodeExecutionLike) -> Span: execution_id = _get_node_execution_id(node_execution) existing_span = span_by_execution_id.get(execution_id) @@ -726,11 +923,10 @@ class ArizePhoenixDataTrace(BaseTraceInstance): outputs_value = node_execution.outputs or {} created_at = node_execution.created_at or datetime.now() - elapsed_time = node_execution.elapsed_time or 0 - finished_at = created_at + timedelta(seconds=elapsed_time) + finished_at = _node_finished_at(node_execution) process_data = node_execution.process_data or {} - execution_metadata = node_execution.metadata or {} + execution_metadata = _execution_metadata_mapping(node_execution) node_metadata = {str(k): v for k, v in execution_metadata.items()} node_metadata.update( @@ -744,6 +940,10 @@ class ArizePhoenixDataTrace(BaseTraceInstance): "status": node_execution.status, "status_message": node_execution.error or "", "level": "ERROR" if node_execution.status == WorkflowNodeExecutionStatus.FAILED else "DEFAULT", + "loop_id": _metadata_or_attr(node_execution, node_metadata, "loop_id"), + "loop_index": _metadata_or_attr(node_execution, node_metadata, "loop_index"), + "iteration_id": _metadata_or_attr(node_execution, node_metadata, "iteration_id"), + "iteration_index": _metadata_or_attr(node_execution, node_metadata, "iteration_index"), } ) @@ -765,7 +965,9 @@ class ArizePhoenixDataTrace(BaseTraceInstance): node_metadata["prompt_tokens"] = usage_data.get("prompt_tokens", 0) node_metadata["completion_tokens"] = usage_data.get("completion_tokens", 0) - parent_span = _resolve_node_parent( + wrapper_group_key = wrapper_key_by_child_execution_id.get(execution_id) + wrapper_parent_span = emit_wrapper_span(wrapper_group_key) if wrapper_group_key else None + parent_span = wrapper_parent_span or _resolve_node_parent( execution_id=execution_id, predecessor_execution_id=None, structured_parent_execution_id=structured_parent_execution_id, @@ -774,6 +976,8 @@ class ArizePhoenixDataTrace(BaseTraceInstance): workflow_span=workflow_span, ) workflow_span_context = set_span_in_context(parent_span) + loop_index = node_metadata.get("loop_index") + iteration_index = node_metadata.get("iteration_index") node_span = self.tracer.start_span( name=_resolve_workflow_node_span_name(node_execution, node_title_by_id), attributes={ @@ -784,6 +988,13 @@ class ArizePhoenixDataTrace(BaseTraceInstance): SpanAttributes.OUTPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value, SpanAttributes.METADATA: safe_json_dumps(node_metadata), SpanAttributes.SESSION_ID: workflow_session_id or "", + "dify.node.execution_id": execution_id, + "dify.node.loop_id": cast(AttributeValue, node_metadata.get("loop_id") or ""), + "dify.node.loop_index": cast(AttributeValue, loop_index if loop_index is not None else ""), + "dify.node.iteration_id": cast(AttributeValue, node_metadata.get("iteration_id") or ""), + "dify.node.iteration_index": cast( + AttributeValue, iteration_index if iteration_index is not None else "" + ), }, start_time=datetime_to_nanos(created_at), context=workflow_span_context, @@ -824,15 +1035,18 @@ class ArizePhoenixDataTrace(BaseTraceInstance): finally: if node_span_error is not None: set_span_status(node_span, node_span_error) - elif node_execution.status == WorkflowNodeExecutionStatus.FAILED: + elif _node_execution_failed(node_execution) or _node_execution_handled_exception(node_execution): set_span_status(node_span, node_execution.error) else: set_span_status(node_span) node_span.end(end_time=datetime_to_nanos(finished_at)) return node_span - for node_execution in workflow_node_executions: - emit_node_span(node_execution) + try: + for node_execution in workflow_node_executions: + emit_node_span(node_execution) + finally: + finalize_wrapper_spans() except Exception as e: workflow_span_error = e raise @@ -1205,6 +1419,7 @@ class ArizePhoenixDataTrace(BaseTraceInstance): dify_trace_id: str | None, *, root_span_name: str | None = None, + root_span_error: Exception | str | None = None, root_span_attributes: Mapping[str, AttributeValue] | None = None, ): """Ensure a unique root span exists for the given Dify trace ID.""" @@ -1226,7 +1441,7 @@ class ArizePhoenixDataTrace(BaseTraceInstance): with use_span(root_span, end_on_exit=False): self.propagator.inject(carrier=carrier) - set_span_status(root_span) + set_span_status(root_span, root_span_error) root_span.end() self.dify_trace_ids.add(trace_key) self.root_span_carriers[trace_key] = carrier diff --git a/api/providers/trace/trace-arize-phoenix/tests/unit_tests/arize_phoenix_trace/test_arize_phoenix_trace.py b/api/providers/trace/trace-arize-phoenix/tests/unit_tests/arize_phoenix_trace/test_arize_phoenix_trace.py index 9b244e3008..0c62880715 100644 --- a/api/providers/trace/trace-arize-phoenix/tests/unit_tests/arize_phoenix_trace/test_arize_phoenix_trace.py +++ b/api/providers/trace/trace-arize-phoenix/tests/unit_tests/arize_phoenix_trace/test_arize_phoenix_trace.py @@ -12,7 +12,9 @@ from dify_trace_arize_phoenix.arize_phoenix_trace import ( ArizePhoenixDataTrace, _app_uses_phoenix_provider, _build_graph_parent_index, + _build_wrapper_groups, _get_node_span_kind, + _normalize_wrapper_index, _parent_workflow_can_publish_span_context, _phoenix_parent_span_redis_key, _resolve_node_parent, @@ -49,7 +51,7 @@ from core.ops.entities.trace_entity import ( WorkflowTraceInfo, ) from core.ops.exceptions import PendingTraceParentContextError -from graphon.enums import BUILT_IN_NODE_TYPES, BuiltinNodeTypes +from graphon.enums import BUILT_IN_NODE_TYPES, BuiltinNodeTypes, WorkflowNodeExecutionStatus # --- Helpers --- @@ -275,6 +277,32 @@ def test_app_uses_phoenix_provider_only_for_enabled_arize_or_phoenix(): assert _app_uses_phoenix_provider(None) is False +def test_normalize_wrapper_index_accepts_stable_values(): + assert _normalize_wrapper_index(0) == "0" + assert _normalize_wrapper_index(12) == "12" + assert _normalize_wrapper_index("01") == "01" + assert _normalize_wrapper_index("branch-1_A.2:3") == "branch-1_A.2:3" + + +@pytest.mark.parametrize( + "value", + [ + True, + False, + -1, + 1.0, + "", + " 1", + "1 ", + "group/1", + "group]1", + None, + ], +) +def test_normalize_wrapper_index_rejects_unstable_values(value): + assert _normalize_wrapper_index(value) is None + + def test_parent_workflow_can_publish_span_context_keeps_unknown_parent_retryable(monkeypatch): monkeypatch.setattr( "dify_trace_arize_phoenix.arize_phoenix_trace.db.session.query", @@ -557,6 +585,59 @@ class TestWorkflowHierarchyHelpers: assert structured_parent_execution_id == "iteration-execution-1" + def test_build_wrapper_groups_groups_loop_children_by_index(self): + loop = _make_workflow_node_trace_info( + node_execution_id="loop-execution-1", + node_id="loop-node-1", + node_type="loop", + ) + first = _make_workflow_node_trace_info( + node_execution_id="first-execution-1", + node_id="body-node-1", + node_type="tool", + loop_id="loop-node-1", + loop_index=0, + ) + second = _make_workflow_node_trace_info( + node_execution_id="second-execution-1", + node_id="body-node-2", + node_type="tool", + loop_id="loop-node-1", + loop_index=1, + ) + + groups = _build_wrapper_groups([loop, first, second]) + + assert [(group.key.wrapper_type, group.key.index) for group in groups.values()] == [ + ("loop", "0"), + ("loop", "1"), + ] + assert [group.container_execution_id for group in groups.values()] == [ + "loop-execution-1", + "loop-execution-1", + ] + + def test_build_wrapper_groups_skips_ambiguous_container_graph_ids(self): + first_loop = _make_workflow_node_trace_info( + node_execution_id="loop-execution-1", + node_id="loop-node-1", + node_type="loop", + ) + second_loop = _make_workflow_node_trace_info( + node_execution_id="loop-execution-2", + node_id="loop-node-1", + node_type="loop", + ) + child = _make_workflow_node_trace_info( + node_execution_id="child-execution-1", + node_id="body-node-1", + node_type="tool", + loop_id="loop-node-1", + loop_index=0, + ) + + assert _build_wrapper_groups([first_loop, second_loop, child]) == {} + @patch("dify_trace_arize_phoenix.arize_phoenix_trace.GrpcOTLPSpanExporter") @patch("dify_trace_arize_phoenix.arize_phoenix_trace.trace_sdk.TracerProvider") @@ -764,6 +845,34 @@ def test_workflow_trace_uses_workflow_run_id_for_root_span_and_populates_root_in assert root_span_call.kwargs["attributes"][SpanAttributes.OUTPUT_MIME_TYPE] == "application/json" +@patch("dify_trace_arize_phoenix.arize_phoenix_trace.db") +@patch("dify_trace_arize_phoenix.arize_phoenix_trace.DifyCoreRepositoryFactory") +@patch("dify_trace_arize_phoenix.arize_phoenix_trace.sessionmaker") +def test_workflow_trace_propagates_workflow_error_to_root_span( + mock_sessionmaker, + mock_repo_factory, + mock_db, + trace_instance, +): + mock_db.engine = MagicMock() + info = _make_workflow_info( + workflow_run_status="failed", + error="Traceback (most recent call last): RuntimeError: workflow failed", + ) + repo = MagicMock() + repo.get_by_workflow_execution.return_value = [] + mock_repo_factory.create_workflow_node_execution_repository.return_value = repo + + with ( + patch.object(trace_instance, "get_service_account_with_tenant", return_value=MagicMock()), + patch.object(trace_instance, "ensure_root_span", return_value={}) as mock_ensure_root_span, + ): + trace_instance.workflow_trace(info) + + mock_ensure_root_span.assert_called_once() + assert mock_ensure_root_span.call_args.kwargs["root_span_error"] == info.error + + @patch("dify_trace_arize_phoenix.arize_phoenix_trace.db") @patch("dify_trace_arize_phoenix.arize_phoenix_trace.DifyCoreRepositoryFactory") @patch("dify_trace_arize_phoenix.arize_phoenix_trace.sessionmaker") @@ -1512,6 +1621,213 @@ def test_workflow_trace_keeps_duplicate_body_node_children_under_enclosing_struc assert child_node_call.kwargs["context"] == f"context:{enclosing_node_type}" +@patch("dify_trace_arize_phoenix.arize_phoenix_trace.db") +@patch("dify_trace_arize_phoenix.arize_phoenix_trace.DifyCoreRepositoryFactory") +@patch("dify_trace_arize_phoenix.arize_phoenix_trace.sessionmaker") +def test_workflow_trace_records_exception_node_event_without_failing_root_span( + mock_sessionmaker, mock_repo_factory, mock_db, trace_instance +): + mock_db.engine = MagicMock() + info = _make_workflow_info(workflow_run_status="succeeded", error=None) + repo = MagicMock() + handled_error_node = _make_node_execution( + id="code-execution-1", + node_execution_id="code-execution-1", + node_id="code-node-1", + node_type="code", + title="Code with error handling", + status=WorkflowNodeExecutionStatus.EXCEPTION, + error="Traceback (most recent call last): RuntimeError: handled failure", + metadata={"error_strategy": "fail-branch"}, + ) + repo.get_by_workflow_execution.return_value = [handled_error_node] + mock_repo_factory.create_workflow_node_execution_repository.return_value = repo + + root_span = MagicMock(name="root-span") + workflow_span = MagicMock(name="workflow-span") + node_span = MagicMock(name="node-span") + trace_instance.tracer.start_span.side_effect = [root_span, workflow_span, node_span] + + with ( + patch.object(trace_instance, "get_service_account_with_tenant", return_value=MagicMock()), + patch.object(trace_instance.propagator, "extract", return_value="root-context"), + ): + trace_instance.workflow_trace(info) + + root_span.set_status.assert_called_once() + assert root_span.set_status.call_args.args[0].status_code == StatusCode.OK + workflow_span.set_status.assert_called_once() + assert workflow_span.set_status.call_args.args[0].status_code == StatusCode.OK + node_span.set_status.assert_called_once() + assert node_span.set_status.call_args.args[0].status_code == StatusCode.ERROR + node_span.add_event.assert_called_once() + assert node_span.add_event.call_args.kwargs["name"] == "exception" + assert node_span.add_event.call_args.kwargs["attributes"][OTELSpanAttributes.EXCEPTION_MESSAGE] == ( + "Traceback (most recent call last): RuntimeError: handled failure" + ) + + +@patch("dify_trace_arize_phoenix.arize_phoenix_trace.db") +@patch("dify_trace_arize_phoenix.arize_phoenix_trace.DifyCoreRepositoryFactory") +@patch("dify_trace_arize_phoenix.arize_phoenix_trace.sessionmaker") +def test_workflow_trace_groups_loop_iteration_children_under_wrapper_spans( + mock_sessionmaker, mock_repo_factory, mock_db, trace_instance +): + mock_db.engine = MagicMock() + info = _make_workflow_info(conversation_id="conversation-1") + repo = MagicMock() + loop_node = _make_node_execution( + id="loop-execution-1", + node_execution_id="loop-execution-1", + node_id="loop-node-1", + node_type="loop", + title="Main Loop", + ) + first_body_node = _make_node_execution( + id="body-execution-1", + node_execution_id="body-execution-1", + node_id="body-node-1", + node_type="tool", + loop_id="loop-node-1", + metadata={"loop_id": "loop-node-1", "loop_index": 0}, + ) + second_body_node = _make_node_execution( + id="body-execution-2", + node_execution_id="body-execution-2", + node_id="body-node-1", + node_type="tool", + loop_id="loop-node-1", + metadata={"loop_id": "loop-node-1", "loop_index": 1}, + ) + repo.get_by_workflow_execution.return_value = [loop_node, first_body_node, second_body_node] + mock_repo_factory.create_workflow_node_execution_repository.return_value = repo + + workflow_span = MagicMock(name="workflow-span") + workflow_span._context_label = "workflow" + loop_span = MagicMock(name="loop-span") + loop_span._context_label = "loop" + first_wrapper_span = MagicMock(name="loop-wrapper-0") + first_wrapper_span._context_label = "loop-wrapper-0" + first_body_span = MagicMock(name="body-span-1") + first_body_span._context_label = "body-1" + second_wrapper_span = MagicMock(name="loop-wrapper-1") + second_wrapper_span._context_label = "loop-wrapper-1" + second_body_span = MagicMock(name="body-span-2") + second_body_span._context_label = "body-2" + trace_instance.tracer.start_span.side_effect = [ + workflow_span, + loop_span, + first_wrapper_span, + first_body_span, + second_wrapper_span, + second_body_span, + ] + + with ( + patch.object(trace_instance, "get_service_account_with_tenant", return_value=MagicMock()), + patch.object(trace_instance, "ensure_root_span", return_value={}), + patch.object(trace_instance.propagator, "extract", return_value="root-context"), + patch( + "dify_trace_arize_phoenix.arize_phoenix_trace.set_span_in_context", + side_effect=lambda span: f"context:{span._context_label}", + ), + ): + trace_instance.workflow_trace(info) + + first_wrapper_call = _get_start_span_call(trace_instance.tracer.start_span, span_name="loop[0]") + second_wrapper_call = _get_start_span_call(trace_instance.tracer.start_span, span_name="loop[1]") + first_body_call = trace_instance.tracer.start_span.call_args_list[3] + second_body_call = trace_instance.tracer.start_span.call_args_list[5] + + assert first_wrapper_call.kwargs["context"] == "context:loop" + assert second_wrapper_call.kwargs["context"] == "context:loop" + assert first_wrapper_call.kwargs["attributes"]["dify.wrapper.synthetic"] is True + assert first_wrapper_call.kwargs["attributes"]["dify.wrapper.index"] == "0" + assert first_body_call.kwargs["context"] == "context:loop-wrapper-0" + assert second_body_call.kwargs["context"] == "context:loop-wrapper-1" + assert first_body_call.kwargs["attributes"][SpanAttributes.SESSION_ID] == "conversation-1" + assert json.loads(first_body_call.kwargs["attributes"][SpanAttributes.METADATA])["loop_index"] == 0 + assert first_body_call.kwargs["attributes"]["dify.node.loop_index"] == 0 + + +@patch("dify_trace_arize_phoenix.arize_phoenix_trace.db") +@patch("dify_trace_arize_phoenix.arize_phoenix_trace.DifyCoreRepositoryFactory") +@patch("dify_trace_arize_phoenix.arize_phoenix_trace.sessionmaker") +def test_workflow_trace_finalizes_loop_wrapper_with_child_time_bounds_and_error_status( + mock_sessionmaker, mock_repo_factory, mock_db, trace_instance +): + mock_db.engine = MagicMock() + info = _make_workflow_info() + repo = MagicMock() + loop_node = _make_node_execution( + id="loop-execution-1", + node_execution_id="loop-execution-1", + node_id="loop-node-1", + node_type="loop", + ) + first_body_node = _make_node_execution( + id="body-execution-1", + node_execution_id="body-execution-1", + node_id="body-node-1", + node_type="tool", + created_at=_dt() + timedelta(seconds=3), + elapsed_time=2.0, + loop_id="loop-node-1", + metadata={"loop_id": "loop-node-1", "loop_index": 0}, + ) + failed_body_node = _make_node_execution( + id="body-execution-2", + node_execution_id="body-execution-2", + node_id="body-node-2", + node_type="tool", + status=WorkflowNodeExecutionStatus.FAILED, + error="body failed", + created_at=_dt() + timedelta(seconds=4), + elapsed_time=5.0, + loop_id="loop-node-1", + metadata={"loop_id": "loop-node-1", "loop_index": 0}, + ) + repo.get_by_workflow_execution.return_value = [loop_node, first_body_node, failed_body_node] + mock_repo_factory.create_workflow_node_execution_repository.return_value = repo + + workflow_span = MagicMock(name="workflow-span") + workflow_span._context_label = "workflow" + loop_span = MagicMock(name="loop-span") + loop_span._context_label = "loop" + wrapper_span = MagicMock(name="loop-wrapper-0") + wrapper_span._context_label = "loop-wrapper-0" + first_body_span = MagicMock(name="body-span-1") + first_body_span._context_label = "body-1" + failed_body_span = MagicMock(name="body-span-2") + failed_body_span._context_label = "body-2" + trace_instance.tracer.start_span.side_effect = [ + workflow_span, + loop_span, + wrapper_span, + first_body_span, + failed_body_span, + ] + + with ( + patch.object(trace_instance, "get_service_account_with_tenant", return_value=MagicMock()), + patch.object(trace_instance, "ensure_root_span", return_value={}), + patch.object(trace_instance.propagator, "extract", return_value="root-context"), + patch( + "dify_trace_arize_phoenix.arize_phoenix_trace.set_span_in_context", + side_effect=lambda span: f"context:{span._context_label}", + ), + ): + trace_instance.workflow_trace(info) + + wrapper_call = _get_start_span_call(trace_instance.tracer.start_span, span_name="loop[0]") + assert wrapper_call.kwargs["start_time"] == datetime_to_nanos(first_body_node.created_at) + wrapper_span.end.assert_called_once_with( + end_time=datetime_to_nanos(failed_body_node.created_at + timedelta(seconds=failed_body_node.elapsed_time)) + ) + wrapper_span.set_status.assert_called_once() + assert wrapper_span.set_status.call_args.args[0].status_code == StatusCode.ERROR + + @patch("dify_trace_arize_phoenix.arize_phoenix_trace.db") @patch("dify_trace_arize_phoenix.arize_phoenix_trace.DifyCoreRepositoryFactory") @patch("dify_trace_arize_phoenix.arize_phoenix_trace.sessionmaker") @@ -1770,6 +2086,19 @@ def test_ensure_root_span_uses_custom_name_and_attributes(trace_instance): ) +def test_ensure_root_span_records_error_status_and_exception_event(trace_instance): + trace_instance.ensure_root_span("tid", root_span_error="workflow failed") + + root_span = trace_instance.tracer.start_span.return_value + root_span.set_status.assert_called_once() + assert root_span.set_status.call_args.args[0].status_code == StatusCode.ERROR + root_span.add_event.assert_called_once() + assert root_span.add_event.call_args.kwargs["name"] == "exception" + assert root_span.add_event.call_args.kwargs["attributes"][OTELSpanAttributes.EXCEPTION_MESSAGE] == ( + "workflow failed" + ) + + def test_ensure_root_span_falls_back_to_dify_name_when_custom_name_is_blank(trace_instance): trace_instance.ensure_root_span("tid", root_span_name=" ")