From 2e1efd62e1a9e568d9d587c063b10eed04d3100b Mon Sep 17 00:00:00 2001
From: -LAN-
Date: Thu, 11 Dec 2025 12:53:37 +0800
Subject: [PATCH] revert: "fix(ops): add streaming metrics and LLM span for agent-chat traces" (#29469)

---
 .../advanced_chat/generate_task_pipeline.py  | 90 ++-----------------
 api/core/app/entities/task_entities.py       |  3 -
 .../easy_ui_based_generate_task_pipeline.py  | 18 ----
 api/core/ops/tencent_trace/span_builder.py   | 53 -----------
 api/core/ops/tencent_trace/tencent_trace.py  |  6 +-
 api/models/model.py                          |  8 --
 6 files changed, 7 insertions(+), 171 deletions(-)

diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py
index b297f3ff20..da1e9f19b6 100644
--- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py
+++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py
@@ -62,8 +62,7 @@ from core.app.task_pipeline.message_cycle_manager import MessageCycleManager
 from core.base.tts import AppGeneratorTTSPublisher, AudioTrunk
 from core.model_runtime.entities.llm_entities import LLMUsage
 from core.model_runtime.utils.encoders import jsonable_encoder
-from core.ops.entities.trace_entity import TraceTaskName
-from core.ops.ops_trace_manager import TraceQueueManager, TraceTask
+from core.ops.ops_trace_manager import TraceQueueManager
 from core.workflow.enums import WorkflowExecutionStatus
 from core.workflow.nodes import NodeType
 from core.workflow.repositories.draft_variable_repository import DraftVariableSaverFactory
@@ -73,7 +72,7 @@ from extensions.ext_database import db
 from libs.datetime_utils import naive_utc_now
 from models import Account, Conversation, EndUser, Message, MessageFile
 from models.enums import CreatorUserRole
-from models.workflow import Workflow, WorkflowNodeExecutionModel
+from models.workflow import Workflow
 
 logger = logging.getLogger(__name__)
 
@@ -581,7 +580,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
 
                 with self._database_session() as session:
                     # Save message
-                    self._save_message(session=session, graph_runtime_state=resolved_state, trace_manager=trace_manager)
+                    self._save_message(session=session, graph_runtime_state=resolved_state)
 
                 yield workflow_finish_resp
             elif event.stopped_by in (
@@ -591,7 +590,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
                 # When hitting input-moderation or annotation-reply, the workflow will not start
                 with self._database_session() as session:
                     # Save message
-                    self._save_message(session=session, trace_manager=trace_manager)
+                    self._save_message(session=session)
 
                 yield self._message_end_to_stream_response()
 
@@ -600,7 +599,6 @@
         event: QueueAdvancedChatMessageEndEvent,
         *,
         graph_runtime_state: GraphRuntimeState | None = None,
-        trace_manager: TraceQueueManager | None = None,
         **kwargs,
     ) -> Generator[StreamResponse, None, None]:
         """Handle advanced chat message end events."""
@@ -618,7 +616,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
 
         # Save message
         with self._database_session() as session:
-            self._save_message(session=session, graph_runtime_state=resolved_state, trace_manager=trace_manager)
+            self._save_message(session=session, graph_runtime_state=resolved_state)
 
         yield self._message_end_to_stream_response()
 
@@ -772,13 +770,7 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
         if self._conversation_name_generate_thread:
             logger.debug("Conversation name generation running as daemon thread")
 
-    def _save_message(
-        self,
-        *,
-        session: Session,
-        graph_runtime_state: GraphRuntimeState | None = None,
-        trace_manager: TraceQueueManager | None = None,
-    ):
+    def _save_message(self, *, session: Session, graph_runtime_state: GraphRuntimeState | None = None):
         message = self._get_message(session=session)
 
         # If there are assistant files, remove markdown image links from answer
@@ -817,14 +809,6 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
 
         metadata = self._task_state.metadata.model_dump()
         message.message_metadata = json.dumps(jsonable_encoder(metadata))
-
-        # Extract model provider and model_id from workflow node executions for tracing
-        if message.workflow_run_id:
-            model_info = self._extract_model_info_from_workflow(session, message.workflow_run_id)
-            if model_info:
-                message.model_provider = model_info.get("provider")
-                message.model_id = model_info.get("model")
-
         message_files = [
             MessageFile(
                 message_id=message.id,
@@ -842,68 +826,6 @@ class AdvancedChatAppGenerateTaskPipeline(GraphRuntimeStateSupport):
         ]
         session.add_all(message_files)
 
-        # Trigger MESSAGE_TRACE for tracing integrations
-        if trace_manager:
-            trace_manager.add_trace_task(
-                TraceTask(
-                    TraceTaskName.MESSAGE_TRACE, conversation_id=self._conversation_id, message_id=self._message_id
-                )
-            )
-
-    def _extract_model_info_from_workflow(self, session: Session, workflow_run_id: str) -> dict[str, str] | None:
-        """
-        Extract model provider and model_id from workflow node executions.
-        Returns dict with 'provider' and 'model' keys, or None if not found.
-        """
-        try:
-            # Query workflow node executions for LLM or Agent nodes
-            stmt = (
-                select(WorkflowNodeExecutionModel)
-                .where(WorkflowNodeExecutionModel.workflow_run_id == workflow_run_id)
-                .where(WorkflowNodeExecutionModel.node_type.in_(["llm", "agent"]))
-                .order_by(WorkflowNodeExecutionModel.created_at.desc())
-                .limit(1)
-            )
-            node_execution = session.scalar(stmt)
-
-            if not node_execution:
-                return None
-
-            # Try to extract from execution_metadata for agent nodes
-            if node_execution.execution_metadata:
-                try:
-                    metadata = json.loads(node_execution.execution_metadata)
-                    agent_log = metadata.get("agent_log", [])
-                    # Look for the first agent thought with provider info
-                    for log_entry in agent_log:
-                        entry_metadata = log_entry.get("metadata", {})
-                        provider_str = entry_metadata.get("provider")
-                        if provider_str:
-                            # Parse format like "langgenius/deepseek/deepseek"
-                            parts = provider_str.split("/")
-                            if len(parts) >= 3:
-                                return {"provider": parts[1], "model": parts[2]}
-                            elif len(parts) == 2:
-                                return {"provider": parts[0], "model": parts[1]}
-                except (json.JSONDecodeError, KeyError, AttributeError) as e:
-                    logger.debug("Failed to parse execution_metadata: %s", e)
-
-            # Try to extract from process_data for llm nodes
-            if node_execution.process_data:
-                try:
-                    process_data = json.loads(node_execution.process_data)
-                    provider = process_data.get("model_provider")
-                    model = process_data.get("model_name")
-                    if provider and model:
-                        return {"provider": provider, "model": model}
-                except (json.JSONDecodeError, KeyError) as e:
-                    logger.debug("Failed to parse process_data: %s", e)
-
-            return None
-        except Exception as e:
-            logger.warning("Failed to extract model info from workflow: %s", e)
-            return None
-
     def _seed_graph_runtime_state_from_queue_manager(self) -> None:
         """Bootstrap the cached runtime state from the queue manager when present."""
         candidate = self._base_task_pipeline.queue_manager.graph_runtime_state
diff --git a/api/core/app/entities/task_entities.py b/api/core/app/entities/task_entities.py
index 7692128985..79a5e657b3 100644
--- a/api/core/app/entities/task_entities.py
+++ b/api/core/app/entities/task_entities.py
@@ -40,9 +40,6 @@ class EasyUITaskState(TaskState):
     """
 
     llm_result: LLMResult
-    first_token_time: float | None = None
-    last_token_time: float | None = None
-    is_streaming_response: bool = False
 
 
 class WorkflowTaskState(TaskState):
diff --git a/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py b/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py
index 98548ddfbb..5c169f4db1 100644
--- a/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py
+++ b/api/core/app/task_pipeline/easy_ui_based_generate_task_pipeline.py
@@ -332,12 +332,6 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline):
                 if not self._task_state.llm_result.prompt_messages:
                     self._task_state.llm_result.prompt_messages = chunk.prompt_messages
 
-                # Track streaming response times
-                if self._task_state.first_token_time is None:
-                    self._task_state.first_token_time = time.perf_counter()
-                    self._task_state.is_streaming_response = True
-                self._task_state.last_token_time = time.perf_counter()
-
                 # handle output moderation chunk
                 should_direct_answer = self._handle_output_moderation_chunk(cast(str, delta_text))
                 if should_direct_answer:
@@ -404,18 +398,6 @@ class EasyUIBasedGenerateTaskPipeline(BasedGenerateTaskPipeline):
         message.total_price = usage.total_price
         message.currency = usage.currency
         self._task_state.llm_result.usage.latency = message.provider_response_latency
-
-        # Add streaming metrics to usage if available
-        if self._task_state.is_streaming_response and self._task_state.first_token_time:
-            start_time = self.start_at
-            first_token_time = self._task_state.first_token_time
-            last_token_time = self._task_state.last_token_time or first_token_time
-            usage.time_to_first_token = round(first_token_time - start_time, 3)
-            usage.time_to_generate = round(last_token_time - first_token_time, 3)
-
-        # Update metadata with the complete usage info
-        self._task_state.metadata.usage = usage
-
         message.message_metadata = self._task_state.metadata.model_dump_json()
 
         if trace_manager:
diff --git a/api/core/ops/tencent_trace/span_builder.py b/api/core/ops/tencent_trace/span_builder.py
index db92e9b8bd..26e8779e3e 100644
--- a/api/core/ops/tencent_trace/span_builder.py
+++ b/api/core/ops/tencent_trace/span_builder.py
@@ -222,59 +222,6 @@ class TencentSpanBuilder:
             links=links,
         )
 
-    @staticmethod
-    def build_message_llm_span(
-        trace_info: MessageTraceInfo, trace_id: int, parent_span_id: int, user_id: str
-    ) -> SpanData:
-        """Build LLM span for message traces with detailed LLM attributes."""
-        status = Status(StatusCode.OK)
-        if trace_info.error:
-            status = Status(StatusCode.ERROR, trace_info.error)
-
-        # Extract model information from `metadata`` or `message_data`
-        trace_metadata = trace_info.metadata or {}
-        message_data = trace_info.message_data or {}
-
-        model_provider = trace_metadata.get("ls_provider") or (
-            message_data.get("model_provider", "") if isinstance(message_data, dict) else ""
-        )
-        model_name = trace_metadata.get("ls_model_name") or (
-            message_data.get("model_id", "") if isinstance(message_data, dict) else ""
-        )
-
-        inputs_str = str(trace_info.inputs or "")
-        outputs_str = str(trace_info.outputs or "")
-
-        attributes = {
-            GEN_AI_SESSION_ID: trace_metadata.get("conversation_id", ""),
-            GEN_AI_USER_ID: str(user_id),
-            GEN_AI_SPAN_KIND: GenAISpanKind.GENERATION.value,
-            GEN_AI_FRAMEWORK: "dify",
-            GEN_AI_MODEL_NAME: str(model_name),
-            GEN_AI_PROVIDER: str(model_provider),
-            GEN_AI_USAGE_INPUT_TOKENS: str(trace_info.message_tokens or 0),
-            GEN_AI_USAGE_OUTPUT_TOKENS: str(trace_info.answer_tokens or 0),
-            GEN_AI_USAGE_TOTAL_TOKENS: str(trace_info.total_tokens or 0),
-            GEN_AI_PROMPT: inputs_str,
-            GEN_AI_COMPLETION: outputs_str,
-            INPUT_VALUE: inputs_str,
-            OUTPUT_VALUE: outputs_str,
-        }
-
-        if trace_info.is_streaming_request:
-            attributes[GEN_AI_IS_STREAMING_REQUEST] = "true"
-
-        return SpanData(
-            trace_id=trace_id,
-            parent_span_id=parent_span_id,
-            span_id=TencentTraceUtils.convert_to_span_id(trace_info.message_id, "llm"),
-            name="GENERATION",
-            start_time=TencentSpanBuilder._get_time_nanoseconds(trace_info.start_time),
-            end_time=TencentSpanBuilder._get_time_nanoseconds(trace_info.end_time),
-            attributes=attributes,
-            status=status,
-        )
-
     @staticmethod
     def build_tool_span(trace_info: ToolTraceInfo, trace_id: int, parent_span_id: int) -> SpanData:
         """Build tool span."""
diff --git a/api/core/ops/tencent_trace/tencent_trace.py b/api/core/ops/tencent_trace/tencent_trace.py
index c345cee7a9..93ec186863 100644
--- a/api/core/ops/tencent_trace/tencent_trace.py
+++ b/api/core/ops/tencent_trace/tencent_trace.py
@@ -107,12 +107,8 @@ class TencentDataTrace(BaseTraceInstance):
             links.append(TencentTraceUtils.create_link(trace_info.trace_id))
 
         message_span = TencentSpanBuilder.build_message_span(trace_info, trace_id, str(user_id), links)
-        self.trace_client.add_span(message_span)
 
-        # Add LLM child span with detailed attributes
-        parent_span_id = TencentTraceUtils.convert_to_span_id(trace_info.message_id, "message")
-        llm_span = TencentSpanBuilder.build_message_llm_span(trace_info, trace_id, parent_span_id, str(user_id))
-        self.trace_client.add_span(llm_span)
+        self.trace_client.add_span(message_span)
 
         self._record_message_llm_metrics(trace_info)
 
diff --git a/api/models/model.py b/api/models/model.py
index 6b0bf4b4a2..c8fa6fd406 100644
--- a/api/models/model.py
+++ b/api/models/model.py
@@ -1255,13 +1255,9 @@
             "id": self.id,
             "app_id": self.app_id,
             "conversation_id": self.conversation_id,
-            "model_provider": self.model_provider,
             "model_id": self.model_id,
             "inputs": self.inputs,
             "query": self.query,
-            "message_tokens": self.message_tokens,
-            "answer_tokens": self.answer_tokens,
-            "provider_response_latency": self.provider_response_latency,
            "total_price": self.total_price,
             "message": self.message,
             "answer": self.answer,
@@ -1283,12 +1279,8 @@
             id=data["id"],
             app_id=data["app_id"],
             conversation_id=data["conversation_id"],
-            model_provider=data.get("model_provider"),
             model_id=data["model_id"],
             inputs=data["inputs"],
-            message_tokens=data.get("message_tokens", 0),
-            answer_tokens=data.get("answer_tokens", 0),
-            provider_response_latency=data.get("provider_response_latency", 0.0),
             total_price=data["total_price"],
             query=data["query"],
             message=data["message"],