From 052f50805f6e3d88c789ce424a0666348308a286 Mon Sep 17 00:00:00 2001 From: GareArc Date: Wed, 4 Feb 2026 20:24:50 -0800 Subject: [PATCH] feat(telemetry): add node_execution_id and app_id support to trace metadata - Forward kwargs to message_trace to preserve node_execution_id - Add node_execution_id extraction to all trace methods - Add app_id parameter to prompt generation API endpoints - Enable app_id tracing for rule_generate, code_generate, and structured_output operations --- api/controllers/console/app/generator.py | 17 ++++++++---- api/controllers/console/app/ops_trace.py | 15 ++++++++--- api/core/ops/ops_trace_manager.py | 20 +++++++++++++-- api/enterprise/telemetry/enterprise_trace.py | 27 ++++++++++++++++++++ api/services/ops_service.py | 21 ++++++++++++--- 5 files changed, 87 insertions(+), 13 deletions(-) diff --git a/api/controllers/console/app/generator.py b/api/controllers/console/app/generator.py index f81bcf7c0b..280b021bb5 100644 --- a/api/controllers/console/app/generator.py +++ b/api/controllers/console/app/generator.py @@ -30,6 +30,7 @@ class RuleGeneratePayload(BaseModel): instruction: str = Field(..., description="Rule generation instruction") model_config_data: dict[str, Any] = Field(..., alias="model_config", description="Model configuration") no_variable: bool = Field(default=False, description="Whether to exclude variables") + app_id: str | None = Field(default=None, description="App ID for prompt generation tracing") class RuleCodeGeneratePayload(RuleGeneratePayload): @@ -39,6 +40,7 @@ class RuleCodeGeneratePayload(RuleGeneratePayload): class RuleStructuredOutputPayload(BaseModel): instruction: str = Field(..., description="Structured output generation instruction") model_config_data: dict[str, Any] = Field(..., alias="model_config", description="Model configuration") + app_id: str | None = Field(default=None, description="App ID for prompt generation tracing") class InstructionGeneratePayload(BaseModel): @@ -49,6 +51,7 @@ class 
InstructionGeneratePayload(BaseModel): instruction: str = Field(..., description="Instruction for generation") model_config_data: dict[str, Any] = Field(..., alias="model_config", description="Model configuration") ideal_output: str = Field(default="", description="Expected ideal output") + app_id: str | None = Field(default=None, description="App ID for prompt generation tracing") class InstructionTemplatePayload(BaseModel): @@ -88,6 +91,7 @@ class RuleGenerateApi(Resource): model_config=args.model_config_data, no_variable=args.no_variable, user_id=account.id, + app_id=args.app_id, ) except ProviderTokenNotInitError as ex: raise ProviderNotInitializeError(ex.description) @@ -123,6 +127,7 @@ class RuleCodeGenerateApi(Resource): model_config=args.model_config_data, code_language=args.code_language, user_id=account.id, + app_id=args.app_id, ) except ProviderTokenNotInitError as ex: raise ProviderNotInitializeError(ex.description) @@ -157,6 +162,7 @@ class RuleStructuredOutputGenerateApi(Resource): instruction=args.instruction, model_config=args.model_config_data, user_id=account.id, + app_id=args.app_id, ) except ProviderTokenNotInitError as ex: raise ProviderNotInitializeError(ex.description) @@ -184,6 +190,7 @@ class InstructionGenerateApi(Resource): def post(self): args = InstructionGeneratePayload.model_validate(console_ns.payload) account, current_tenant_id = current_account_with_tenant() + app_id = args.app_id or args.flow_id providers: list[type[CodeNodeProvider]] = [Python3CodeProvider, JavascriptCodeProvider] code_provider: type[CodeNodeProvider] | None = next( (p for p in providers if p.is_accept_language(args.language)), None @@ -210,7 +217,7 @@ class InstructionGenerateApi(Resource): model_config=args.model_config_data, no_variable=True, user_id=account.id, - app_id=args.flow_id, + app_id=app_id, ) case "agent": return LLMGenerator.generate_rule_config( @@ -219,7 +226,7 @@ class InstructionGenerateApi(Resource): model_config=args.model_config_data, 
no_variable=True, user_id=account.id, - app_id=args.flow_id, + app_id=app_id, ) case "code": return LLMGenerator.generate_code( @@ -228,7 +235,7 @@ class InstructionGenerateApi(Resource): model_config=args.model_config_data, code_language=args.language, user_id=account.id, - app_id=args.flow_id, + app_id=app_id, ) case _: return {"error": f"invalid node type: {node_type}"} @@ -241,7 +248,7 @@ class InstructionGenerateApi(Resource): model_config=args.model_config_data, ideal_output=args.ideal_output, user_id=account.id, - app_id=args.flow_id, + app_id=app_id, ) if args.node_id != "" and args.current != "": return LLMGenerator.instruction_modify_workflow( @@ -254,7 +261,7 @@ class InstructionGenerateApi(Resource): ideal_output=args.ideal_output, workflow_service=WorkflowService(), user_id=account.id, - app_id=args.flow_id, + app_id=app_id, ) return {"error": "incompatible parameters"}, 400 except ProviderTokenNotInitError as ex: diff --git a/api/controllers/console/app/ops_trace.py b/api/controllers/console/app/ops_trace.py index cbcf513162..c5622c7006 100644 --- a/api/controllers/console/app/ops_trace.py +++ b/api/controllers/console/app/ops_trace.py @@ -1,6 +1,7 @@ from typing import Any from flask import request +from flask_login import current_user from flask_restx import Resource, fields from pydantic import BaseModel, Field from werkzeug.exceptions import BadRequest @@ -77,7 +78,10 @@ class TraceAppConfigApi(Resource): try: result = OpsService.create_tracing_app_config( - app_id=app_id, tracing_provider=args.tracing_provider, tracing_config=args.tracing_config + app_id=app_id, + tracing_provider=args.tracing_provider, + tracing_config=args.tracing_config, + account_id=current_user.id, ) if not result: raise TracingConfigIsExist() @@ -102,7 +106,10 @@ class TraceAppConfigApi(Resource): try: result = OpsService.update_tracing_app_config( - app_id=app_id, tracing_provider=args.tracing_provider, tracing_config=args.tracing_config + app_id=app_id, + 
tracing_provider=args.tracing_provider, + tracing_config=args.tracing_config, + account_id=current_user.id, ) if not result: raise TracingConfigNotExist() @@ -124,7 +131,9 @@ class TraceAppConfigApi(Resource): args = TraceProviderQuery.model_validate(request.args.to_dict(flat=True)) # type: ignore try: - result = OpsService.delete_tracing_app_config(app_id=app_id, tracing_provider=args.tracing_provider) + result = OpsService.delete_tracing_app_config( + app_id=app_id, tracing_provider=args.tracing_provider, account_id=current_user.id + ) if not result: raise TracingConfigNotExist() return {"result": "success"}, 204 diff --git a/api/core/ops/ops_trace_manager.py b/api/core/ops/ops_trace_manager.py index 4998c764ec..efeea1ee11 100644 --- a/api/core/ops/ops_trace_manager.py +++ b/api/core/ops/ops_trace_manager.py @@ -608,7 +608,7 @@ class TraceTask: TraceTaskName.WORKFLOW_TRACE: lambda: self.workflow_trace( workflow_run_id=self.workflow_run_id, conversation_id=self.conversation_id, user_id=self.user_id ), - TraceTaskName.MESSAGE_TRACE: lambda: self.message_trace(message_id=self.message_id), + TraceTaskName.MESSAGE_TRACE: lambda: self.message_trace(message_id=self.message_id, **self.kwargs), TraceTaskName.MODERATION_TRACE: lambda: self.moderation_trace( message_id=self.message_id, timer=self.timer, **self.kwargs ), @@ -736,7 +736,7 @@ class TraceTask: ) return workflow_trace_info - def message_trace(self, message_id: str | None): + def message_trace(self, message_id: str | None, **kwargs): if not message_id: return {} message_data = get_message_data(message_id) @@ -784,6 +784,8 @@ class TraceTask: "app_name": app_name, "workspace_name": workspace_name, } + if node_execution_id := kwargs.get("node_execution_id"): + metadata["node_execution_id"] = node_execution_id message_tokens = message_data.message_tokens @@ -825,6 +827,8 @@ class TraceTask: "preset_response": moderation_result.preset_response, "query": moderation_result.query, } + if node_execution_id := 
kwargs.get("node_execution_id"): + metadata["node_execution_id"] = node_execution_id # get workflow_app_log_id workflow_app_log_id = None @@ -866,6 +870,8 @@ class TraceTask: "workflow_run_id": message_data.workflow_run_id, "from_source": message_data.from_source, } + if node_execution_id := kwargs.get("node_execution_id"): + metadata["node_execution_id"] = node_execution_id # get workflow_app_log_id workflow_app_log_id = None @@ -952,6 +958,8 @@ class TraceTask: "workspace_name": workspace_name, "embedding_models": embedding_models, } + if node_execution_id := kwargs.get("node_execution_id"): + metadata["node_execution_id"] = node_execution_id dataset_retrieval_trace_info = DatasetRetrievalTraceInfo( trace_id=self.trace_id, @@ -1000,6 +1008,10 @@ class TraceTask: "error": error, "tool_parameters": tool_parameters, } + if message_data.workflow_run_id: + metadata["workflow_run_id"] = message_data.workflow_run_id + if node_execution_id := kwargs.get("node_execution_id"): + metadata["node_execution_id"] = node_execution_id file_url = "" message_file_data = db.session.query(MessageFile).filter_by(message_id=message_id).first() @@ -1054,6 +1066,8 @@ class TraceTask: "conversation_id": conversation_id, "tenant_id": tenant_id, } + if node_execution_id := kwargs.get("node_execution_id"): + metadata["node_execution_id"] = node_execution_id generate_name_trace_info = GenerateNameTraceInfo( trace_id=self.trace_id, @@ -1109,6 +1123,8 @@ class TraceTask: "model_provider": model_provider, "model_name": model_name, } + if node_execution_id := kwargs.get("node_execution_id"): + metadata["node_execution_id"] = node_execution_id return PromptGenerationTraceInfo( trace_id=self.trace_id, diff --git a/api/enterprise/telemetry/enterprise_trace.py b/api/enterprise/telemetry/enterprise_trace.py index f23f84637e..6388356fd7 100644 --- a/api/enterprise/telemetry/enterprise_trace.py +++ b/api/enterprise/telemetry/enterprise_trace.py @@ -357,6 +357,8 @@ class EnterpriseOtelTrace: 
"dify.workflow.run_id": info.metadata.get("workflow_run_id"), } ) + if info.metadata.get("node_execution_id"): + attrs["dify.node.execution_id"] = info.metadata.get("node_execution_id") if self._exporter.include_content: attrs["dify.message.inputs"] = self._maybe_json(info.inputs) @@ -370,6 +372,7 @@ class EnterpriseOtelTrace: event_name="dify.message.run", attributes=attrs, trace_id_source=info.metadata.get("workflow_run_id") or str(info.message_id) if info.message_id else None, + span_id_source=info.metadata.get("node_execution_id"), tenant_id=info.metadata.get("tenant_id"), user_id=info.metadata.get("user_id"), ) @@ -407,8 +410,11 @@ class EnterpriseOtelTrace: "gen_ai.tool.name": info.tool_name, "dify.tool.time_cost": info.time_cost, "dify.tool.error": info.error, + "dify.workflow.run_id": info.metadata.get("workflow_run_id"), } ) + if info.metadata.get("node_execution_id"): + attrs["dify.node.execution_id"] = info.metadata.get("node_execution_id") if self._exporter.include_content: attrs["dify.tool.inputs"] = self._maybe_json(info.tool_inputs) @@ -425,6 +431,7 @@ class EnterpriseOtelTrace: emit_metric_only_event( event_name="dify.tool.execution", attributes=attrs, + span_id_source=info.metadata.get("node_execution_id"), tenant_id=info.metadata.get("tenant_id"), user_id=info.metadata.get("user_id"), ) @@ -447,8 +454,11 @@ class EnterpriseOtelTrace: "dify.moderation.flagged": info.flagged, "dify.moderation.action": info.action, "dify.moderation.preset_response": info.preset_response, + "dify.workflow.run_id": info.metadata.get("workflow_run_id"), } ) + if info.metadata.get("node_execution_id"): + attrs["dify.node.execution_id"] = info.metadata.get("node_execution_id") if self._exporter.include_content: attrs["dify.moderation.query"] = info.query @@ -458,6 +468,7 @@ class EnterpriseOtelTrace: emit_metric_only_event( event_name="dify.moderation.check", attributes=attrs, + span_id_source=info.metadata.get("node_execution_id"), 
tenant_id=info.metadata.get("tenant_id"), user_id=info.metadata.get("user_id"), ) @@ -475,8 +486,11 @@ class EnterpriseOtelTrace: "gen_ai.provider.name": info.model_provider, "gen_ai.request.model": info.model_id, "dify.suggested_question.count": len(info.suggested_question), + "dify.workflow.run_id": info.metadata.get("workflow_run_id"), } ) + if info.metadata.get("node_execution_id"): + attrs["dify.node.execution_id"] = info.metadata.get("node_execution_id") if self._exporter.include_content: attrs["dify.suggested_question.questions"] = self._maybe_json(info.suggested_question) @@ -486,6 +500,7 @@ class EnterpriseOtelTrace: emit_metric_only_event( event_name="dify.suggested_question.generation", attributes=attrs, + span_id_source=info.metadata.get("node_execution_id"), tenant_id=info.metadata.get("tenant_id"), user_id=info.metadata.get("user_id"), ) @@ -498,6 +513,9 @@ class EnterpriseOtelTrace: def _dataset_retrieval_trace(self, info: DatasetRetrievalTraceInfo) -> None: attrs = self._common_attrs(info) attrs["dify.dataset.error"] = info.error + attrs["dify.workflow.run_id"] = info.metadata.get("workflow_run_id") + if info.metadata.get("node_execution_id"): + attrs["dify.node.execution_id"] = info.metadata.get("node_execution_id") docs = info.documents or [] dataset_ids: list[str] = [] @@ -550,6 +568,9 @@ class EnterpriseOtelTrace: emit_metric_only_event( event_name="dify.dataset.retrieval", attributes=attrs, + trace_id_source=info.metadata.get("workflow_run_id") or (str(info.message_id) if info.message_id else None), + span_id_source=info.metadata.get("node_execution_id") + or (str(info.message_id) if info.message_id else None), tenant_id=info.metadata.get("tenant_id"), user_id=info.metadata.get("user_id"), ) @@ -567,6 +588,8 @@ class EnterpriseOtelTrace: def _generate_name_trace(self, info: GenerateNameTraceInfo) -> None: attrs = self._common_attrs(info) attrs["dify.conversation.id"] = info.conversation_id + if info.metadata.get("node_execution_id"): +
attrs["dify.node.execution_id"] = info.metadata.get("node_execution_id") if self._exporter.include_content: attrs["dify.generate_name.inputs"] = self._maybe_json(info.inputs) @@ -579,6 +602,7 @@ class EnterpriseOtelTrace: emit_metric_only_event( event_name="dify.generate_name.execution", attributes=attrs, + span_id_source=info.metadata.get("node_execution_id"), tenant_id=info.tenant_id, user_id=info.metadata.get("user_id"), ) @@ -603,6 +627,8 @@ class EnterpriseOtelTrace: "dify.prompt_generation.latency": info.latency, "dify.prompt_generation.error": info.error, } + if info.metadata.get("node_execution_id"): + attrs["dify.node.execution_id"] = info.metadata.get("node_execution_id") if info.total_price is not None: attrs["dify.prompt_generation.total_price"] = info.total_price @@ -619,6 +645,7 @@ class EnterpriseOtelTrace: emit_metric_only_event( event_name="dify.prompt_generation.execution", attributes=attrs, + span_id_source=info.metadata.get("node_execution_id"), tenant_id=info.tenant_id, user_id=info.user_id, ) diff --git a/api/services/ops_service.py b/api/services/ops_service.py index 50ea832085..c1c92b2de8 100644 --- a/api/services/ops_service.py +++ b/api/services/ops_service.py @@ -1,3 +1,4 @@ +import logging from typing import Any from core.ops.entities.config_entity import BaseTracingConfig @@ -5,6 +6,8 @@ from core.ops.ops_trace_manager import OpsTraceManager, provider_config_map from extensions.ext_database import db from models.model import App, TraceAppConfig +logger = logging.getLogger(__name__) + class OpsService: @classmethod @@ -135,12 +138,13 @@ class OpsService: return trace_config_data.to_dict() @classmethod - def create_tracing_app_config(cls, app_id: str, tracing_provider: str, tracing_config: dict): + def create_tracing_app_config(cls, app_id: str, tracing_provider: str, tracing_config: dict, account_id: str): """ Create tracing app config :param app_id: app id :param tracing_provider: tracing provider :param tracing_config: tracing config + 
:param account_id: account id of the user creating the config :return: """ try: @@ -207,15 +211,19 @@ class OpsService: db.session.add(trace_config_data) db.session.commit() + # Log the creation with modifier information + logger.info("Trace config created: app_id=%s, provider=%s, created_by=%s", app_id, tracing_provider, account_id) + return {"result": "success"} @classmethod - def update_tracing_app_config(cls, app_id: str, tracing_provider: str, tracing_config: dict): + def update_tracing_app_config(cls, app_id: str, tracing_provider: str, tracing_config: dict, account_id: str): """ Update tracing app config :param app_id: app id :param tracing_provider: tracing provider :param tracing_config: tracing config + :param account_id: account id of the user updating the config :return: """ try: @@ -251,14 +259,18 @@ class OpsService: current_trace_config.tracing_config = tracing_config db.session.commit() + # Log the update with modifier information + logger.info("Trace config updated: app_id=%s, provider=%s, updated_by=%s", app_id, tracing_provider, account_id) + return current_trace_config.to_dict() @classmethod - def delete_tracing_app_config(cls, app_id: str, tracing_provider: str): + def delete_tracing_app_config(cls, app_id: str, tracing_provider: str, account_id: str): """ Delete tracing app config :param app_id: app id :param tracing_provider: tracing provider + :param account_id: account id of the user deleting the config :return: """ trace_config = ( @@ -270,6 +282,9 @@ class OpsService: if not trace_config: return None + # Log the deletion with modifier information + logger.info("Trace config deleted: app_id=%s, provider=%s, deleted_by=%s", app_id, tracing_provider, account_id) + db.session.delete(trace_config) db.session.commit()