fix(api): add Phoenix wrapper spans and error tracing (#36388)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
Blackoutta 2026-05-19 18:09:23 +08:00 committed by GitHub
parent 2565637e36
commit d1417bbe4b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 573 additions and 29 deletions

View File

@ -4,6 +4,7 @@ import os
import re
import traceback
from collections.abc import Mapping, Sequence
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Any, Protocol, Union, cast
from urllib.parse import urlparse
@ -59,6 +60,7 @@ _PHOENIX_PARENT_SPAN_CONTEXT_TTL_SECONDS = 300
_TRACEPARENT_PATTERN = re.compile(
r"^(?P<version>[0-9a-f]{2})-(?P<trace_id>[0-9a-f]{32})-(?P<span_id>[0-9a-f]{16})-(?P<flags>[0-9a-f]{2})$"
)
_WRAPPER_INDEX_PATTERN = re.compile(r"^[A-Za-z0-9_.:-]+$")
def _phoenix_parent_span_redis_key(parent_node_execution_id: str) -> str:
@ -261,21 +263,7 @@ def set_span_status(current_span: Span, error: Exception | str | None = None):
if error:
error_string = error_to_string(error)
current_span.set_status(Status(StatusCode.ERROR, error_string))
if isinstance(error, Exception):
current_span.record_exception(error)
else:
exception_type = error.__class__.__name__
exception_message = str(error)
if not exception_message:
exception_message = repr(error)
attributes: dict[str, AttributeValue] = {
exception_attributes.EXCEPTION_TYPE: exception_type,
exception_attributes.EXCEPTION_MESSAGE: exception_message,
exception_attributes.EXCEPTION_ESCAPED: False,
exception_attributes.EXCEPTION_STACKTRACE: error_string,
}
current_span.add_event(name="exception", attributes=attributes)
_record_exception_event(current_span, error)
else:
current_span.set_status(Status(StatusCode.OK))
@ -473,6 +461,155 @@ def _build_execution_id_by_node_id(node_executions: Sequence[_NodeExecutionIdent
return execution_id_by_node_id
@dataclass(frozen=True)
class _WrapperGroupKey:
wrapper_type: str
container_execution_id: str
index: str
@dataclass
class _WrapperGroup:
key: _WrapperGroupKey
container_execution_id: str
child_execution_ids: set[str] = field(default_factory=set)
start_time: datetime | None = None
end_time: datetime | None = None
has_error: bool = False
def _normalize_wrapper_index(value: Any) -> str | None:
"""Normalize stable loop/iteration indexes used for synthetic wrapper spans."""
if isinstance(value, bool):
return None
if isinstance(value, int):
return str(value) if value >= 0 else None
if isinstance(value, str) and _WRAPPER_INDEX_PATTERN.fullmatch(value):
return value
return None
def _execution_metadata_mapping(node_execution: object) -> Mapping[Any, Any]:
"""Return execution metadata from repository models and test doubles."""
execution_metadata = getattr(node_execution, "execution_metadata_dict", None)
if isinstance(execution_metadata, Mapping):
return execution_metadata
execution_metadata = getattr(node_execution, "metadata", None)
if isinstance(execution_metadata, Mapping):
return execution_metadata
return {}
def _node_finished_at(node_execution: object) -> datetime:
end_time = getattr(node_execution, "end_time", None)
if isinstance(end_time, datetime):
return end_time
created_at = getattr(node_execution, "created_at", None) or getattr(node_execution, "start_time", None)
if not isinstance(created_at, datetime):
created_at = datetime.now()
elapsed_time = getattr(node_execution, "elapsed_time", None) or 0.0
return created_at + timedelta(seconds=elapsed_time)
def _metadata_or_attr(node_execution: object, node_metadata: Mapping[str, Any], key: str) -> Any:
value = node_metadata.get(key)
if value is not None:
return value
return getattr(node_execution, key, None)
def _node_execution_failed(node_execution: object) -> bool:
status = getattr(node_execution, "status", None)
return status in {WorkflowNodeExecutionStatus.FAILED, "failed"}
def _node_execution_handled_exception(node_execution: object) -> bool:
status = getattr(node_execution, "status", None)
return status in {WorkflowNodeExecutionStatus.EXCEPTION, "exception"}
def _record_exception_event(current_span: Span, error: Exception | str | None = None) -> None:
if not error:
return
error_string = error_to_string(error)
if isinstance(error, Exception):
current_span.record_exception(error)
return
exception_message = str(error) or repr(error)
attributes: dict[str, AttributeValue] = {
exception_attributes.EXCEPTION_TYPE: error.__class__.__name__,
exception_attributes.EXCEPTION_MESSAGE: exception_message,
exception_attributes.EXCEPTION_ESCAPED: False,
exception_attributes.EXCEPTION_STACKTRACE: error_string,
}
current_span.add_event(name="exception", attributes=attributes)
def _resolve_wrapper_group_key(
node_execution: _NodeExecutionIdentityLike,
node_metadata: Mapping[Any, Any],
execution_id_by_node_id: Mapping[str, str],
) -> _WrapperGroupKey | None:
for wrapper_type, container_key, index_key in (
("iteration", "iteration_id", "iteration_index"),
("loop", "loop_id", "loop_index"),
):
container_id = _metadata_or_attr(node_execution, node_metadata, container_key)
if not isinstance(container_id, str) or not container_id:
continue
container_execution_id = execution_id_by_node_id.get(container_id)
if container_execution_id is None or container_execution_id == _get_node_execution_id(node_execution):
continue
index = _normalize_wrapper_index(_metadata_or_attr(node_execution, node_metadata, index_key))
if index is None:
continue
return _WrapperGroupKey(
wrapper_type=wrapper_type,
container_execution_id=container_execution_id,
index=index,
)
return None
def _build_wrapper_groups(
node_executions: Sequence[_NodeExecutionIdentityLike],
) -> dict[_WrapperGroupKey, _WrapperGroup]:
"""Group repeated loop/iteration body executions behind synthetic Phoenix spans."""
execution_id_by_node_id = _build_execution_id_by_node_id(node_executions)
groups: dict[_WrapperGroupKey, _WrapperGroup] = {}
for node_execution in node_executions:
node_metadata = _execution_metadata_mapping(node_execution)
group_key = _resolve_wrapper_group_key(node_execution, node_metadata, execution_id_by_node_id)
if group_key is None:
continue
group = groups.setdefault(
group_key,
_WrapperGroup(key=group_key, container_execution_id=group_key.container_execution_id),
)
execution_id = _get_node_execution_id(node_execution)
group.child_execution_ids.add(execution_id)
created_at = getattr(node_execution, "created_at", None) or getattr(node_execution, "start_time", None)
if not isinstance(created_at, datetime):
created_at = datetime.now()
finished_at = _node_finished_at(node_execution)
group.start_time = created_at if group.start_time is None else min(group.start_time, created_at)
group.end_time = finished_at if group.end_time is None else max(group.end_time, finished_at)
group.has_error = group.has_error or _node_execution_failed(node_execution)
return groups
def _build_graph_parent_index(node_executions: Sequence[_NodeExecutionIdentityLike]) -> dict[str, str]:
"""Build an execution-id parent index from predecessor node ids."""
execution_id_by_node_id = _build_execution_id_by_node_id(node_executions)
@ -634,16 +771,19 @@ class ArizePhoenixDataTrace(BaseTraceInstance):
if not isinstance(workflow_root_span_name, str) or not workflow_root_span_name.strip():
workflow_root_span_name = None
workflow_parent_carrier = self.ensure_root_span(
root_trace_id,
root_span_name=workflow_root_span_name,
root_span_attributes={
root_span_kwargs: dict[str, Any] = {
"root_span_name": workflow_root_span_name,
"root_span_attributes": {
SpanAttributes.INPUT_VALUE: safe_json_dumps(trace_info.workflow_run_inputs),
SpanAttributes.INPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value,
SpanAttributes.OUTPUT_VALUE: safe_json_dumps(trace_info.workflow_run_outputs),
SpanAttributes.OUTPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value,
},
)
}
if trace_info.error:
root_span_kwargs["root_span_error"] = trace_info.error
workflow_parent_carrier = self.ensure_root_span(root_trace_id, **root_span_kwargs)
workflow_span_context = self.propagator.extract(carrier=workflow_parent_carrier)
@ -691,10 +831,67 @@ class ArizePhoenixDataTrace(BaseTraceInstance):
}
span_by_execution_id: dict[str, Span] = {}
emitting_execution_ids: set[str] = set()
wrapper_groups = _build_wrapper_groups(workflow_node_executions)
wrapper_span_by_key: dict[_WrapperGroupKey, Span] = {}
finalized_wrapper_keys: set[_WrapperGroupKey] = set()
wrapper_key_by_child_execution_id: dict[str, _WrapperGroupKey] = {}
for group_key, group in wrapper_groups.items():
for child_execution_id in group.child_execution_ids:
wrapper_key_by_child_execution_id[child_execution_id] = group_key
workflow_span_error: Exception | str | None = trace_info.error
try:
def emit_wrapper_span(group_key: _WrapperGroupKey) -> Span | None:
existing_span = wrapper_span_by_key.get(group_key)
if existing_span is not None:
return existing_span
group = wrapper_groups.get(group_key)
if group is None:
return None
if group.container_execution_id not in span_by_execution_id:
parent_node_execution = node_execution_by_execution_id.get(group.container_execution_id)
if parent_node_execution is not None:
emit_node_span(parent_node_execution)
container_span = span_by_execution_id.get(group.container_execution_id)
if container_span is None:
return None
metadata = {
"synthetic": True,
"wrapper_type": group_key.wrapper_type,
"wrapper_index": group_key.index,
"container_execution_id": group.container_execution_id,
}
wrapper_span = self.tracer.start_span(
name=f"{group_key.wrapper_type}[{group_key.index}]",
attributes={
SpanAttributes.OPENINFERENCE_SPAN_KIND: OpenInferenceSpanKindValues.CHAIN.value,
SpanAttributes.METADATA: safe_json_dumps(metadata),
SpanAttributes.SESSION_ID: workflow_session_id or "",
"dify.wrapper.synthetic": True,
"dify.wrapper.type": group_key.wrapper_type,
"dify.wrapper.index": group_key.index,
"dify.wrapper.container_execution_id": group.container_execution_id,
},
start_time=datetime_to_nanos(group.start_time),
context=set_span_in_context(container_span),
)
wrapper_span_by_key[group_key] = wrapper_span
return wrapper_span
def finalize_wrapper_spans() -> None:
for group_key, wrapper_span in list(wrapper_span_by_key.items()):
if group_key in finalized_wrapper_keys:
continue
group = wrapper_groups[group_key]
set_span_status(wrapper_span, "wrapper child failed" if group.has_error else None)
wrapper_span.end(end_time=datetime_to_nanos(group.end_time))
finalized_wrapper_keys.add(group_key)
def emit_node_span(node_execution: _NodeExecutionLike) -> Span:
execution_id = _get_node_execution_id(node_execution)
existing_span = span_by_execution_id.get(execution_id)
@ -726,11 +923,10 @@ class ArizePhoenixDataTrace(BaseTraceInstance):
outputs_value = node_execution.outputs or {}
created_at = node_execution.created_at or datetime.now()
elapsed_time = node_execution.elapsed_time or 0
finished_at = created_at + timedelta(seconds=elapsed_time)
finished_at = _node_finished_at(node_execution)
process_data = node_execution.process_data or {}
execution_metadata = node_execution.metadata or {}
execution_metadata = _execution_metadata_mapping(node_execution)
node_metadata = {str(k): v for k, v in execution_metadata.items()}
node_metadata.update(
@ -744,6 +940,10 @@ class ArizePhoenixDataTrace(BaseTraceInstance):
"status": node_execution.status,
"status_message": node_execution.error or "",
"level": "ERROR" if node_execution.status == WorkflowNodeExecutionStatus.FAILED else "DEFAULT",
"loop_id": _metadata_or_attr(node_execution, node_metadata, "loop_id"),
"loop_index": _metadata_or_attr(node_execution, node_metadata, "loop_index"),
"iteration_id": _metadata_or_attr(node_execution, node_metadata, "iteration_id"),
"iteration_index": _metadata_or_attr(node_execution, node_metadata, "iteration_index"),
}
)
@ -765,7 +965,9 @@ class ArizePhoenixDataTrace(BaseTraceInstance):
node_metadata["prompt_tokens"] = usage_data.get("prompt_tokens", 0)
node_metadata["completion_tokens"] = usage_data.get("completion_tokens", 0)
parent_span = _resolve_node_parent(
wrapper_group_key = wrapper_key_by_child_execution_id.get(execution_id)
wrapper_parent_span = emit_wrapper_span(wrapper_group_key) if wrapper_group_key else None
parent_span = wrapper_parent_span or _resolve_node_parent(
execution_id=execution_id,
predecessor_execution_id=None,
structured_parent_execution_id=structured_parent_execution_id,
@ -774,6 +976,8 @@ class ArizePhoenixDataTrace(BaseTraceInstance):
workflow_span=workflow_span,
)
workflow_span_context = set_span_in_context(parent_span)
loop_index = node_metadata.get("loop_index")
iteration_index = node_metadata.get("iteration_index")
node_span = self.tracer.start_span(
name=_resolve_workflow_node_span_name(node_execution, node_title_by_id),
attributes={
@ -784,6 +988,13 @@ class ArizePhoenixDataTrace(BaseTraceInstance):
SpanAttributes.OUTPUT_MIME_TYPE: OpenInferenceMimeTypeValues.JSON.value,
SpanAttributes.METADATA: safe_json_dumps(node_metadata),
SpanAttributes.SESSION_ID: workflow_session_id or "",
"dify.node.execution_id": execution_id,
"dify.node.loop_id": cast(AttributeValue, node_metadata.get("loop_id") or ""),
"dify.node.loop_index": cast(AttributeValue, loop_index if loop_index is not None else ""),
"dify.node.iteration_id": cast(AttributeValue, node_metadata.get("iteration_id") or ""),
"dify.node.iteration_index": cast(
AttributeValue, iteration_index if iteration_index is not None else ""
),
},
start_time=datetime_to_nanos(created_at),
context=workflow_span_context,
@ -824,15 +1035,18 @@ class ArizePhoenixDataTrace(BaseTraceInstance):
finally:
if node_span_error is not None:
set_span_status(node_span, node_span_error)
elif node_execution.status == WorkflowNodeExecutionStatus.FAILED:
elif _node_execution_failed(node_execution) or _node_execution_handled_exception(node_execution):
set_span_status(node_span, node_execution.error)
else:
set_span_status(node_span)
node_span.end(end_time=datetime_to_nanos(finished_at))
return node_span
for node_execution in workflow_node_executions:
emit_node_span(node_execution)
try:
for node_execution in workflow_node_executions:
emit_node_span(node_execution)
finally:
finalize_wrapper_spans()
except Exception as e:
workflow_span_error = e
raise
@ -1205,6 +1419,7 @@ class ArizePhoenixDataTrace(BaseTraceInstance):
dify_trace_id: str | None,
*,
root_span_name: str | None = None,
root_span_error: Exception | str | None = None,
root_span_attributes: Mapping[str, AttributeValue] | None = None,
):
"""Ensure a unique root span exists for the given Dify trace ID."""
@ -1226,7 +1441,7 @@ class ArizePhoenixDataTrace(BaseTraceInstance):
with use_span(root_span, end_on_exit=False):
self.propagator.inject(carrier=carrier)
set_span_status(root_span)
set_span_status(root_span, root_span_error)
root_span.end()
self.dify_trace_ids.add(trace_key)
self.root_span_carriers[trace_key] = carrier

View File

@ -12,7 +12,9 @@ from dify_trace_arize_phoenix.arize_phoenix_trace import (
ArizePhoenixDataTrace,
_app_uses_phoenix_provider,
_build_graph_parent_index,
_build_wrapper_groups,
_get_node_span_kind,
_normalize_wrapper_index,
_parent_workflow_can_publish_span_context,
_phoenix_parent_span_redis_key,
_resolve_node_parent,
@ -49,7 +51,7 @@ from core.ops.entities.trace_entity import (
WorkflowTraceInfo,
)
from core.ops.exceptions import PendingTraceParentContextError
from graphon.enums import BUILT_IN_NODE_TYPES, BuiltinNodeTypes
from graphon.enums import BUILT_IN_NODE_TYPES, BuiltinNodeTypes, WorkflowNodeExecutionStatus
# --- Helpers ---
@ -275,6 +277,32 @@ def test_app_uses_phoenix_provider_only_for_enabled_arize_or_phoenix():
assert _app_uses_phoenix_provider(None) is False
def test_normalize_wrapper_index_accepts_stable_values():
assert _normalize_wrapper_index(0) == "0"
assert _normalize_wrapper_index(12) == "12"
assert _normalize_wrapper_index("01") == "01"
assert _normalize_wrapper_index("branch-1_A.2:3") == "branch-1_A.2:3"
@pytest.mark.parametrize(
"value",
[
True,
False,
-1,
1.0,
"",
" 1",
"1 ",
"group/1",
"group]1",
None,
],
)
def test_normalize_wrapper_index_rejects_unstable_values(value):
assert _normalize_wrapper_index(value) is None
def test_parent_workflow_can_publish_span_context_keeps_unknown_parent_retryable(monkeypatch):
monkeypatch.setattr(
"dify_trace_arize_phoenix.arize_phoenix_trace.db.session.query",
@ -557,6 +585,59 @@ class TestWorkflowHierarchyHelpers:
assert structured_parent_execution_id == "iteration-execution-1"
def test_build_wrapper_groups_groups_loop_children_by_index(self):
loop = _make_workflow_node_trace_info(
node_execution_id="loop-execution-1",
node_id="loop-node-1",
node_type="loop",
)
first = _make_workflow_node_trace_info(
node_execution_id="first-execution-1",
node_id="body-node-1",
node_type="tool",
loop_id="loop-node-1",
loop_index=0,
)
second = _make_workflow_node_trace_info(
node_execution_id="second-execution-1",
node_id="body-node-2",
node_type="tool",
loop_id="loop-node-1",
loop_index=1,
)
groups = _build_wrapper_groups([loop, first, second])
assert [(group.key.wrapper_type, group.key.index) for group in groups.values()] == [
("loop", "0"),
("loop", "1"),
]
assert [group.container_execution_id for group in groups.values()] == [
"loop-execution-1",
"loop-execution-1",
]
def test_build_wrapper_groups_skips_ambiguous_container_graph_ids(self):
first_loop = _make_workflow_node_trace_info(
node_execution_id="loop-execution-1",
node_id="loop-node-1",
node_type="loop",
)
second_loop = _make_workflow_node_trace_info(
node_execution_id="loop-execution-2",
node_id="loop-node-1",
node_type="loop",
)
child = _make_workflow_node_trace_info(
node_execution_id="child-execution-1",
node_id="body-node-1",
node_type="tool",
loop_id="loop-node-1",
loop_index=0,
)
assert _build_wrapper_groups([first_loop, second_loop, child]) == {}
@patch("dify_trace_arize_phoenix.arize_phoenix_trace.GrpcOTLPSpanExporter")
@patch("dify_trace_arize_phoenix.arize_phoenix_trace.trace_sdk.TracerProvider")
@ -764,6 +845,34 @@ def test_workflow_trace_uses_workflow_run_id_for_root_span_and_populates_root_in
assert root_span_call.kwargs["attributes"][SpanAttributes.OUTPUT_MIME_TYPE] == "application/json"
@patch("dify_trace_arize_phoenix.arize_phoenix_trace.db")
@patch("dify_trace_arize_phoenix.arize_phoenix_trace.DifyCoreRepositoryFactory")
@patch("dify_trace_arize_phoenix.arize_phoenix_trace.sessionmaker")
def test_workflow_trace_propagates_workflow_error_to_root_span(
mock_sessionmaker,
mock_repo_factory,
mock_db,
trace_instance,
):
mock_db.engine = MagicMock()
info = _make_workflow_info(
workflow_run_status="failed",
error="Traceback (most recent call last): RuntimeError: workflow failed",
)
repo = MagicMock()
repo.get_by_workflow_execution.return_value = []
mock_repo_factory.create_workflow_node_execution_repository.return_value = repo
with (
patch.object(trace_instance, "get_service_account_with_tenant", return_value=MagicMock()),
patch.object(trace_instance, "ensure_root_span", return_value={}) as mock_ensure_root_span,
):
trace_instance.workflow_trace(info)
mock_ensure_root_span.assert_called_once()
assert mock_ensure_root_span.call_args.kwargs["root_span_error"] == info.error
@patch("dify_trace_arize_phoenix.arize_phoenix_trace.db")
@patch("dify_trace_arize_phoenix.arize_phoenix_trace.DifyCoreRepositoryFactory")
@patch("dify_trace_arize_phoenix.arize_phoenix_trace.sessionmaker")
@ -1512,6 +1621,213 @@ def test_workflow_trace_keeps_duplicate_body_node_children_under_enclosing_struc
assert child_node_call.kwargs["context"] == f"context:{enclosing_node_type}"
@patch("dify_trace_arize_phoenix.arize_phoenix_trace.db")
@patch("dify_trace_arize_phoenix.arize_phoenix_trace.DifyCoreRepositoryFactory")
@patch("dify_trace_arize_phoenix.arize_phoenix_trace.sessionmaker")
def test_workflow_trace_records_exception_node_event_without_failing_root_span(
mock_sessionmaker, mock_repo_factory, mock_db, trace_instance
):
mock_db.engine = MagicMock()
info = _make_workflow_info(workflow_run_status="succeeded", error=None)
repo = MagicMock()
handled_error_node = _make_node_execution(
id="code-execution-1",
node_execution_id="code-execution-1",
node_id="code-node-1",
node_type="code",
title="Code with error handling",
status=WorkflowNodeExecutionStatus.EXCEPTION,
error="Traceback (most recent call last): RuntimeError: handled failure",
metadata={"error_strategy": "fail-branch"},
)
repo.get_by_workflow_execution.return_value = [handled_error_node]
mock_repo_factory.create_workflow_node_execution_repository.return_value = repo
root_span = MagicMock(name="root-span")
workflow_span = MagicMock(name="workflow-span")
node_span = MagicMock(name="node-span")
trace_instance.tracer.start_span.side_effect = [root_span, workflow_span, node_span]
with (
patch.object(trace_instance, "get_service_account_with_tenant", return_value=MagicMock()),
patch.object(trace_instance.propagator, "extract", return_value="root-context"),
):
trace_instance.workflow_trace(info)
root_span.set_status.assert_called_once()
assert root_span.set_status.call_args.args[0].status_code == StatusCode.OK
workflow_span.set_status.assert_called_once()
assert workflow_span.set_status.call_args.args[0].status_code == StatusCode.OK
node_span.set_status.assert_called_once()
assert node_span.set_status.call_args.args[0].status_code == StatusCode.ERROR
node_span.add_event.assert_called_once()
assert node_span.add_event.call_args.kwargs["name"] == "exception"
assert node_span.add_event.call_args.kwargs["attributes"][OTELSpanAttributes.EXCEPTION_MESSAGE] == (
"Traceback (most recent call last): RuntimeError: handled failure"
)
@patch("dify_trace_arize_phoenix.arize_phoenix_trace.db")
@patch("dify_trace_arize_phoenix.arize_phoenix_trace.DifyCoreRepositoryFactory")
@patch("dify_trace_arize_phoenix.arize_phoenix_trace.sessionmaker")
def test_workflow_trace_groups_loop_iteration_children_under_wrapper_spans(
mock_sessionmaker, mock_repo_factory, mock_db, trace_instance
):
mock_db.engine = MagicMock()
info = _make_workflow_info(conversation_id="conversation-1")
repo = MagicMock()
loop_node = _make_node_execution(
id="loop-execution-1",
node_execution_id="loop-execution-1",
node_id="loop-node-1",
node_type="loop",
title="Main Loop",
)
first_body_node = _make_node_execution(
id="body-execution-1",
node_execution_id="body-execution-1",
node_id="body-node-1",
node_type="tool",
loop_id="loop-node-1",
metadata={"loop_id": "loop-node-1", "loop_index": 0},
)
second_body_node = _make_node_execution(
id="body-execution-2",
node_execution_id="body-execution-2",
node_id="body-node-1",
node_type="tool",
loop_id="loop-node-1",
metadata={"loop_id": "loop-node-1", "loop_index": 1},
)
repo.get_by_workflow_execution.return_value = [loop_node, first_body_node, second_body_node]
mock_repo_factory.create_workflow_node_execution_repository.return_value = repo
workflow_span = MagicMock(name="workflow-span")
workflow_span._context_label = "workflow"
loop_span = MagicMock(name="loop-span")
loop_span._context_label = "loop"
first_wrapper_span = MagicMock(name="loop-wrapper-0")
first_wrapper_span._context_label = "loop-wrapper-0"
first_body_span = MagicMock(name="body-span-1")
first_body_span._context_label = "body-1"
second_wrapper_span = MagicMock(name="loop-wrapper-1")
second_wrapper_span._context_label = "loop-wrapper-1"
second_body_span = MagicMock(name="body-span-2")
second_body_span._context_label = "body-2"
trace_instance.tracer.start_span.side_effect = [
workflow_span,
loop_span,
first_wrapper_span,
first_body_span,
second_wrapper_span,
second_body_span,
]
with (
patch.object(trace_instance, "get_service_account_with_tenant", return_value=MagicMock()),
patch.object(trace_instance, "ensure_root_span", return_value={}),
patch.object(trace_instance.propagator, "extract", return_value="root-context"),
patch(
"dify_trace_arize_phoenix.arize_phoenix_trace.set_span_in_context",
side_effect=lambda span: f"context:{span._context_label}",
),
):
trace_instance.workflow_trace(info)
first_wrapper_call = _get_start_span_call(trace_instance.tracer.start_span, span_name="loop[0]")
second_wrapper_call = _get_start_span_call(trace_instance.tracer.start_span, span_name="loop[1]")
first_body_call = trace_instance.tracer.start_span.call_args_list[3]
second_body_call = trace_instance.tracer.start_span.call_args_list[5]
assert first_wrapper_call.kwargs["context"] == "context:loop"
assert second_wrapper_call.kwargs["context"] == "context:loop"
assert first_wrapper_call.kwargs["attributes"]["dify.wrapper.synthetic"] is True
assert first_wrapper_call.kwargs["attributes"]["dify.wrapper.index"] == "0"
assert first_body_call.kwargs["context"] == "context:loop-wrapper-0"
assert second_body_call.kwargs["context"] == "context:loop-wrapper-1"
assert first_body_call.kwargs["attributes"][SpanAttributes.SESSION_ID] == "conversation-1"
assert json.loads(first_body_call.kwargs["attributes"][SpanAttributes.METADATA])["loop_index"] == 0
assert first_body_call.kwargs["attributes"]["dify.node.loop_index"] == 0
@patch("dify_trace_arize_phoenix.arize_phoenix_trace.db")
@patch("dify_trace_arize_phoenix.arize_phoenix_trace.DifyCoreRepositoryFactory")
@patch("dify_trace_arize_phoenix.arize_phoenix_trace.sessionmaker")
def test_workflow_trace_finalizes_loop_wrapper_with_child_time_bounds_and_error_status(
mock_sessionmaker, mock_repo_factory, mock_db, trace_instance
):
mock_db.engine = MagicMock()
info = _make_workflow_info()
repo = MagicMock()
loop_node = _make_node_execution(
id="loop-execution-1",
node_execution_id="loop-execution-1",
node_id="loop-node-1",
node_type="loop",
)
first_body_node = _make_node_execution(
id="body-execution-1",
node_execution_id="body-execution-1",
node_id="body-node-1",
node_type="tool",
created_at=_dt() + timedelta(seconds=3),
elapsed_time=2.0,
loop_id="loop-node-1",
metadata={"loop_id": "loop-node-1", "loop_index": 0},
)
failed_body_node = _make_node_execution(
id="body-execution-2",
node_execution_id="body-execution-2",
node_id="body-node-2",
node_type="tool",
status=WorkflowNodeExecutionStatus.FAILED,
error="body failed",
created_at=_dt() + timedelta(seconds=4),
elapsed_time=5.0,
loop_id="loop-node-1",
metadata={"loop_id": "loop-node-1", "loop_index": 0},
)
repo.get_by_workflow_execution.return_value = [loop_node, first_body_node, failed_body_node]
mock_repo_factory.create_workflow_node_execution_repository.return_value = repo
workflow_span = MagicMock(name="workflow-span")
workflow_span._context_label = "workflow"
loop_span = MagicMock(name="loop-span")
loop_span._context_label = "loop"
wrapper_span = MagicMock(name="loop-wrapper-0")
wrapper_span._context_label = "loop-wrapper-0"
first_body_span = MagicMock(name="body-span-1")
first_body_span._context_label = "body-1"
failed_body_span = MagicMock(name="body-span-2")
failed_body_span._context_label = "body-2"
trace_instance.tracer.start_span.side_effect = [
workflow_span,
loop_span,
wrapper_span,
first_body_span,
failed_body_span,
]
with (
patch.object(trace_instance, "get_service_account_with_tenant", return_value=MagicMock()),
patch.object(trace_instance, "ensure_root_span", return_value={}),
patch.object(trace_instance.propagator, "extract", return_value="root-context"),
patch(
"dify_trace_arize_phoenix.arize_phoenix_trace.set_span_in_context",
side_effect=lambda span: f"context:{span._context_label}",
),
):
trace_instance.workflow_trace(info)
wrapper_call = _get_start_span_call(trace_instance.tracer.start_span, span_name="loop[0]")
assert wrapper_call.kwargs["start_time"] == datetime_to_nanos(first_body_node.created_at)
wrapper_span.end.assert_called_once_with(
end_time=datetime_to_nanos(failed_body_node.created_at + timedelta(seconds=failed_body_node.elapsed_time))
)
wrapper_span.set_status.assert_called_once()
assert wrapper_span.set_status.call_args.args[0].status_code == StatusCode.ERROR
@patch("dify_trace_arize_phoenix.arize_phoenix_trace.db")
@patch("dify_trace_arize_phoenix.arize_phoenix_trace.DifyCoreRepositoryFactory")
@patch("dify_trace_arize_phoenix.arize_phoenix_trace.sessionmaker")
@ -1770,6 +2086,19 @@ def test_ensure_root_span_uses_custom_name_and_attributes(trace_instance):
)
def test_ensure_root_span_records_error_status_and_exception_event(trace_instance):
trace_instance.ensure_root_span("tid", root_span_error="workflow failed")
root_span = trace_instance.tracer.start_span.return_value
root_span.set_status.assert_called_once()
assert root_span.set_status.call_args.args[0].status_code == StatusCode.ERROR
root_span.add_event.assert_called_once()
assert root_span.add_event.call_args.kwargs["name"] == "exception"
assert root_span.add_event.call_args.kwargs["attributes"][OTELSpanAttributes.EXCEPTION_MESSAGE] == (
"workflow failed"
)
def test_ensure_root_span_falls_back_to_dify_name_when_custom_name_is_blank(trace_instance):
trace_instance.ensure_root_span("tid", root_span_name=" ")