diff --git a/api/core/ops/opik_trace/opik_trace.py b/api/core/ops/opik_trace/opik_trace.py index 2f59f6a54a..4eb47e3556 100644 --- a/api/core/ops/opik_trace/opik_trace.py +++ b/api/core/ops/opik_trace/opik_trace.py @@ -36,7 +36,7 @@ def wrap_dict(key_name, data): def wrap_metadata(metadata, **kwargs): """Add common metatada to all Traces and Spans""" - metadata["created_from"] = "opik" + metadata["created_from"] = "dify" metadata.update(kwargs) @@ -75,16 +75,23 @@ class OpikDataTrace(BaseTraceInstance): self.generate_name_trace(trace_info) def workflow_trace(self, trace_info: WorkflowTraceInfo): - dify_trace_id = trace_info.message_id or trace_info.workflow_app_log_id or trace_info.workflow_run_id + dify_trace_id = trace_info.workflow_run_id opik_trace_id = uuid4_to_uuid7(trace_info.start_time, dify_trace_id) + workflow_metadata = wrap_metadata( + trace_info.metadata, message_id=trace_info.message_id, workflow_app_log_id=trace_info.workflow_app_log_id + ) + root_span_id = None if trace_info.message_id: + dify_trace_id = trace_info.message_id + opik_trace_id = uuid4_to_uuid7(trace_info.start_time, dify_trace_id) + trace_data = { "id": opik_trace_id, "name": TraceTaskName.MESSAGE_TRACE.value, "start_time": trace_info.start_time, "end_time": trace_info.end_time, - "metadata": wrap_metadata(trace_info.metadata, message_id=trace_info.message_id), + "metadata": workflow_metadata, "input": wrap_dict("input", trace_info.workflow_run_inputs), "output": wrap_dict("output", trace_info.workflow_run_outputs), "tags": ["message", "workflow"], @@ -92,23 +99,34 @@ class OpikDataTrace(BaseTraceInstance): } self.add_trace(trace_data) - span_id = trace_info.workflow_app_log_id or trace_info.workflow_run_id - span_data = { - "trace_id": opik_trace_id, - "id": uuid4_to_uuid7(trace_info.start_time, span_id), - "parent_span_id": None, - "name": TraceTaskName.WORKFLOW_TRACE.value, - "type": "tool", - "start_time": trace_info.start_time, - "end_time": trace_info.end_time, - "metadata": wrap_metadata(trace_info.metadata), - "input": wrap_dict("input", trace_info.workflow_run_inputs), - "output": wrap_dict("output", trace_info.workflow_run_outputs), - "tags": ["workflow"], - "project_name": self.project, - } - - self.add_span(span_data) + root_span_id = uuid4_to_uuid7(trace_info.start_time, trace_info.workflow_run_id) + span_data = { + "id": root_span_id, + "parent_span_id": None, + "trace_id": opik_trace_id, + "name": TraceTaskName.WORKFLOW_TRACE.value, + "input": wrap_dict("input", trace_info.workflow_run_inputs), + "output": wrap_dict("output", trace_info.workflow_run_outputs), + "start_time": trace_info.start_time, + "end_time": trace_info.end_time, + "metadata": workflow_metadata, + "tags": ["workflow"], + "project_name": self.project, + } + self.add_span(span_data) + else: + trace_data = { + "id": opik_trace_id, + "name": TraceTaskName.MESSAGE_TRACE.value, + "start_time": trace_info.start_time, + "end_time": trace_info.end_time, + "metadata": workflow_metadata, + "input": wrap_dict("input", trace_info.workflow_run_inputs), + "output": wrap_dict("output", trace_info.workflow_run_outputs), + "tags": ["workflow"], + "project_name": self.project, + } + self.add_trace(trace_data) # through workflow_run_id get all_nodes_execution workflow_nodes_execution_id_records = (