From fa910be0f65e79b6b93dc83ab2424797355b8267 Mon Sep 17 00:00:00 2001 From: Anubhav Singh Date: Thu, 20 Nov 2025 09:07:01 +0530 Subject: [PATCH] Fix duration displayed for workflow steps on Weave dashboard (#28289) --- api/core/ops/weave_trace/weave_trace.py | 94 +++++++++++++++++++++---- 1 file changed, 81 insertions(+), 13 deletions(-) diff --git a/api/core/ops/weave_trace/weave_trace.py b/api/core/ops/weave_trace/weave_trace.py index 9b3d7a8192..2134be0bce 100644 --- a/api/core/ops/weave_trace/weave_trace.py +++ b/api/core/ops/weave_trace/weave_trace.py @@ -1,12 +1,20 @@ import logging import os import uuid -from datetime import datetime, timedelta +from datetime import UTC, datetime, timedelta from typing import Any, cast import wandb import weave from sqlalchemy.orm import sessionmaker +from weave.trace_server.trace_server_interface import ( + CallEndReq, + CallStartReq, + EndedCallSchemaForInsert, + StartedCallSchemaForInsert, + SummaryInsertMap, + TraceStatus, +) from core.ops.base_trace_instance import BaseTraceInstance from core.ops.entities.config_entity import WeaveConfig @@ -57,6 +65,7 @@ class WeaveDataTrace(BaseTraceInstance): ) self.file_base_url = os.getenv("FILES_URL", "http://127.0.0.1:5001") self.calls: dict[str, Any] = {} + self.project_id = f"{self.weave_client.entity}/{self.weave_client.project}" def get_project_url( self, @@ -424,6 +433,13 @@ class WeaveDataTrace(BaseTraceInstance): logger.debug("Weave API check failed: %s", str(e)) raise ValueError(f"Weave API check failed: {str(e)}") + def _normalize_time(self, dt: datetime | None) -> datetime: + if dt is None: + return datetime.now(UTC) + if dt.tzinfo is None: + return dt.replace(tzinfo=UTC) + return dt + def start_call(self, run_data: WeaveTraceModel, parent_run_id: str | None = None): inputs = run_data.inputs if inputs is None: @@ -437,19 +453,71 @@ class WeaveDataTrace(BaseTraceInstance): elif not isinstance(attributes, dict): attributes = {"attributes": str(attributes)} - call = self.weave_client.create_call( - op=run_data.op, - inputs=inputs, - attributes=attributes, + start_time = attributes.get("start_time") if isinstance(attributes, dict) else None + started_at = self._normalize_time(start_time if isinstance(start_time, datetime) else None) + trace_id = attributes.get("trace_id") if isinstance(attributes, dict) else None + if trace_id is None: + trace_id = run_data.id + + call_start_req = CallStartReq( + start=StartedCallSchemaForInsert( + project_id=self.project_id, + id=run_data.id, + op_name=str(run_data.op), + trace_id=trace_id, + parent_id=parent_run_id, + started_at=started_at, + attributes=attributes, + inputs=inputs, + wb_user_id=None, + ) ) - self.calls[run_data.id] = call - if parent_run_id: - self.calls[run_data.id].parent_id = parent_run_id + self.weave_client.server.call_start(call_start_req) + self.calls[run_data.id] = {"trace_id": trace_id, "parent_id": parent_run_id} def finish_call(self, run_data: WeaveTraceModel): - call = self.calls.get(run_data.id) - if call: - exception = Exception(run_data.exception) if run_data.exception else None - self.weave_client.finish_call(call=call, output=run_data.outputs, exception=exception) - else: + call_meta = self.calls.get(run_data.id) + if not call_meta: raise ValueError(f"Call with id {run_data.id} not found") + + attributes = run_data.attributes + if attributes is None: + attributes = {} + elif not isinstance(attributes, dict): + attributes = {"attributes": str(attributes)} + + start_time = attributes.get("start_time") if isinstance(attributes, dict) else None + end_time = attributes.get("end_time") if isinstance(attributes, dict) else None + started_at = self._normalize_time(start_time if isinstance(start_time, datetime) else None) + ended_at = self._normalize_time(end_time if isinstance(end_time, datetime) else None) + elapsed_ms = int((ended_at - started_at).total_seconds() * 1000) + if elapsed_ms < 0: + elapsed_ms = 0 + + status_counts = { + TraceStatus.SUCCESS: 0, + TraceStatus.ERROR: 0, + } + if run_data.exception: + status_counts[TraceStatus.ERROR] = 1 + else: + status_counts[TraceStatus.SUCCESS] = 1 + + summary: dict[str, Any] = { + "status_counts": status_counts, + "weave": {"latency_ms": elapsed_ms}, + } + + exception_str = str(run_data.exception) if run_data.exception else None + + call_end_req = CallEndReq( + end=EndedCallSchemaForInsert( + project_id=self.project_id, + id=run_data.id, + ended_at=ended_at, + exception=exception_str, + output=run_data.outputs, + summary=cast(SummaryInsertMap, summary), + ) + ) + self.weave_client.server.call_end(call_end_req)