feat(telemetry): add node_execution_id and app_id support to trace metadata

- Forward kwargs to message_trace to preserve node_execution_id
- Add node_execution_id extraction to all trace methods
- Add app_id parameter to prompt generation API endpoints
- Enable app_id tracing for rule_generate, code_generate, and structured_output operations
This commit is contained in:
GareArc 2026-02-04 20:24:50 -08:00
parent f5043a8ac8
commit 052f50805f
No known key found for this signature in database
5 changed files with 87 additions and 13 deletions

View File

@@ -30,6 +30,7 @@ class RuleGeneratePayload(BaseModel):
instruction: str = Field(..., description="Rule generation instruction")
model_config_data: dict[str, Any] = Field(..., alias="model_config", description="Model configuration")
no_variable: bool = Field(default=False, description="Whether to exclude variables")
app_id: str | None = Field(default=None, description="App ID for prompt generation tracing")
class RuleCodeGeneratePayload(RuleGeneratePayload):
@@ -39,6 +40,7 @@ class RuleCodeGeneratePayload(RuleGeneratePayload):
class RuleStructuredOutputPayload(BaseModel):
instruction: str = Field(..., description="Structured output generation instruction")
model_config_data: dict[str, Any] = Field(..., alias="model_config", description="Model configuration")
app_id: str | None = Field(default=None, description="App ID for prompt generation tracing")
class InstructionGeneratePayload(BaseModel):
@@ -49,6 +51,7 @@ class InstructionGeneratePayload(BaseModel):
instruction: str = Field(..., description="Instruction for generation")
model_config_data: dict[str, Any] = Field(..., alias="model_config", description="Model configuration")
ideal_output: str = Field(default="", description="Expected ideal output")
app_id: str | None = Field(default=None, description="App ID for prompt generation tracing")
class InstructionTemplatePayload(BaseModel):
@@ -88,6 +91,7 @@ class RuleGenerateApi(Resource):
model_config=args.model_config_data,
no_variable=args.no_variable,
user_id=account.id,
app_id=args.app_id,
)
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)
@@ -123,6 +127,7 @@ class RuleCodeGenerateApi(Resource):
model_config=args.model_config_data,
code_language=args.code_language,
user_id=account.id,
app_id=args.app_id,
)
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)
@@ -157,6 +162,7 @@ class RuleStructuredOutputGenerateApi(Resource):
instruction=args.instruction,
model_config=args.model_config_data,
user_id=account.id,
app_id=args.app_id,
)
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)
@@ -184,6 +190,7 @@ class InstructionGenerateApi(Resource):
def post(self):
args = InstructionGeneratePayload.model_validate(console_ns.payload)
account, current_tenant_id = current_account_with_tenant()
app_id = args.app_id or args.flow_id
providers: list[type[CodeNodeProvider]] = [Python3CodeProvider, JavascriptCodeProvider]
code_provider: type[CodeNodeProvider] | None = next(
(p for p in providers if p.is_accept_language(args.language)), None
@@ -210,7 +217,7 @@ class InstructionGenerateApi(Resource):
model_config=args.model_config_data,
no_variable=True,
user_id=account.id,
app_id=args.flow_id,
app_id=app_id,
)
case "agent":
return LLMGenerator.generate_rule_config(
@@ -219,7 +226,7 @@ class InstructionGenerateApi(Resource):
model_config=args.model_config_data,
no_variable=True,
user_id=account.id,
app_id=args.flow_id,
app_id=app_id,
)
case "code":
return LLMGenerator.generate_code(
@@ -228,7 +235,7 @@ class InstructionGenerateApi(Resource):
model_config=args.model_config_data,
code_language=args.language,
user_id=account.id,
app_id=args.flow_id,
app_id=app_id,
)
case _:
return {"error": f"invalid node type: {node_type}"}
@@ -241,7 +248,7 @@ class InstructionGenerateApi(Resource):
model_config=args.model_config_data,
ideal_output=args.ideal_output,
user_id=account.id,
app_id=args.flow_id,
app_id=app_id,
)
if args.node_id != "" and args.current != "":
return LLMGenerator.instruction_modify_workflow(
@@ -254,7 +261,7 @@ class InstructionGenerateApi(Resource):
ideal_output=args.ideal_output,
workflow_service=WorkflowService(),
user_id=account.id,
app_id=args.flow_id,
app_id=app_id,
)
return {"error": "incompatible parameters"}, 400
except ProviderTokenNotInitError as ex:

View File

@@ -1,6 +1,7 @@
from typing import Any
from flask import request
from flask_login import current_user
from flask_restx import Resource, fields
from pydantic import BaseModel, Field
from werkzeug.exceptions import BadRequest
@@ -77,7 +78,10 @@ class TraceAppConfigApi(Resource):
try:
result = OpsService.create_tracing_app_config(
app_id=app_id, tracing_provider=args.tracing_provider, tracing_config=args.tracing_config
app_id=app_id,
tracing_provider=args.tracing_provider,
tracing_config=args.tracing_config,
account_id=current_user.id,
)
if not result:
raise TracingConfigIsExist()
@@ -102,7 +106,10 @@ class TraceAppConfigApi(Resource):
try:
result = OpsService.update_tracing_app_config(
app_id=app_id, tracing_provider=args.tracing_provider, tracing_config=args.tracing_config
app_id=app_id,
tracing_provider=args.tracing_provider,
tracing_config=args.tracing_config,
account_id=current_user.id,
)
if not result:
raise TracingConfigNotExist()
@@ -124,7 +131,9 @@ class TraceAppConfigApi(Resource):
args = TraceProviderQuery.model_validate(request.args.to_dict(flat=True)) # type: ignore
try:
result = OpsService.delete_tracing_app_config(app_id=app_id, tracing_provider=args.tracing_provider)
result = OpsService.delete_tracing_app_config(
app_id=app_id, tracing_provider=args.tracing_provider, account_id=current_user.id
)
if not result:
raise TracingConfigNotExist()
return {"result": "success"}, 204

View File

@@ -608,7 +608,7 @@ class TraceTask:
TraceTaskName.WORKFLOW_TRACE: lambda: self.workflow_trace(
workflow_run_id=self.workflow_run_id, conversation_id=self.conversation_id, user_id=self.user_id
),
TraceTaskName.MESSAGE_TRACE: lambda: self.message_trace(message_id=self.message_id),
TraceTaskName.MESSAGE_TRACE: lambda: self.message_trace(message_id=self.message_id, **self.kwargs),
TraceTaskName.MODERATION_TRACE: lambda: self.moderation_trace(
message_id=self.message_id, timer=self.timer, **self.kwargs
),
@@ -736,7 +736,7 @@ class TraceTask:
)
return workflow_trace_info
def message_trace(self, message_id: str | None):
def message_trace(self, message_id: str | None, **kwargs):
if not message_id:
return {}
message_data = get_message_data(message_id)
@@ -784,6 +784,8 @@ class TraceTask:
"app_name": app_name,
"workspace_name": workspace_name,
}
if node_execution_id := kwargs.get("node_execution_id"):
metadata["node_execution_id"] = node_execution_id
message_tokens = message_data.message_tokens
@@ -825,6 +827,8 @@ class TraceTask:
"preset_response": moderation_result.preset_response,
"query": moderation_result.query,
}
if node_execution_id := kwargs.get("node_execution_id"):
metadata["node_execution_id"] = node_execution_id
# get workflow_app_log_id
workflow_app_log_id = None
@@ -866,6 +870,8 @@ class TraceTask:
"workflow_run_id": message_data.workflow_run_id,
"from_source": message_data.from_source,
}
if node_execution_id := kwargs.get("node_execution_id"):
metadata["node_execution_id"] = node_execution_id
# get workflow_app_log_id
workflow_app_log_id = None
@@ -952,6 +958,8 @@ class TraceTask:
"workspace_name": workspace_name,
"embedding_models": embedding_models,
}
if node_execution_id := kwargs.get("node_execution_id"):
metadata["node_execution_id"] = node_execution_id
dataset_retrieval_trace_info = DatasetRetrievalTraceInfo(
trace_id=self.trace_id,
@@ -1000,6 +1008,10 @@ class TraceTask:
"error": error,
"tool_parameters": tool_parameters,
}
if message_data.workflow_run_id:
metadata["workflow_run_id"] = message_data.workflow_run_id
if node_execution_id := kwargs.get("node_execution_id"):
metadata["node_execution_id"] = node_execution_id
file_url = ""
message_file_data = db.session.query(MessageFile).filter_by(message_id=message_id).first()
@@ -1054,6 +1066,8 @@ class TraceTask:
"conversation_id": conversation_id,
"tenant_id": tenant_id,
}
if node_execution_id := kwargs.get("node_execution_id"):
metadata["node_execution_id"] = node_execution_id
generate_name_trace_info = GenerateNameTraceInfo(
trace_id=self.trace_id,
@@ -1109,6 +1123,8 @@ class TraceTask:
"model_provider": model_provider,
"model_name": model_name,
}
if node_execution_id := kwargs.get("node_execution_id"):
metadata["node_execution_id"] = node_execution_id
return PromptGenerationTraceInfo(
trace_id=self.trace_id,

View File

@@ -357,6 +357,8 @@ class EnterpriseOtelTrace:
"dify.workflow.run_id": info.metadata.get("workflow_run_id"),
}
)
if info.metadata.get("node_execution_id"):
attrs["dify.node.execution_id"] = info.metadata.get("node_execution_id")
if self._exporter.include_content:
attrs["dify.message.inputs"] = self._maybe_json(info.inputs)
@@ -370,6 +372,7 @@ class EnterpriseOtelTrace:
event_name="dify.message.run",
attributes=attrs,
trace_id_source=info.metadata.get("workflow_run_id") or str(info.message_id) if info.message_id else None,
span_id_source=info.metadata.get("node_execution_id"),
tenant_id=info.metadata.get("tenant_id"),
user_id=info.metadata.get("user_id"),
)
@@ -407,8 +410,11 @@ class EnterpriseOtelTrace:
"gen_ai.tool.name": info.tool_name,
"dify.tool.time_cost": info.time_cost,
"dify.tool.error": info.error,
"dify.workflow.run_id": info.metadata.get("workflow_run_id"),
}
)
if info.metadata.get("node_execution_id"):
attrs["dify.node.execution_id"] = info.metadata.get("node_execution_id")
if self._exporter.include_content:
attrs["dify.tool.inputs"] = self._maybe_json(info.tool_inputs)
@@ -425,6 +431,7 @@ class EnterpriseOtelTrace:
emit_metric_only_event(
event_name="dify.tool.execution",
attributes=attrs,
span_id_source=info.metadata.get("node_execution_id"),
tenant_id=info.metadata.get("tenant_id"),
user_id=info.metadata.get("user_id"),
)
@@ -447,8 +454,11 @@ class EnterpriseOtelTrace:
"dify.moderation.flagged": info.flagged,
"dify.moderation.action": info.action,
"dify.moderation.preset_response": info.preset_response,
"dify.workflow.run_id": info.metadata.get("workflow_run_id"),
}
)
if info.metadata.get("node_execution_id"):
attrs["dify.node.execution_id"] = info.metadata.get("node_execution_id")
if self._exporter.include_content:
attrs["dify.moderation.query"] = info.query
@@ -458,6 +468,7 @@ class EnterpriseOtelTrace:
emit_metric_only_event(
event_name="dify.moderation.check",
attributes=attrs,
span_id_source=info.metadata.get("node_execution_id"),
tenant_id=info.metadata.get("tenant_id"),
user_id=info.metadata.get("user_id"),
)
@@ -475,8 +486,11 @@ class EnterpriseOtelTrace:
"gen_ai.provider.name": info.model_provider,
"gen_ai.request.model": info.model_id,
"dify.suggested_question.count": len(info.suggested_question),
"dify.workflow.run_id": info.metadata.get("workflow_run_id"),
}
)
if info.metadata.get("node_execution_id"):
attrs["dify.node.execution_id"] = info.metadata.get("node_execution_id")
if self._exporter.include_content:
attrs["dify.suggested_question.questions"] = self._maybe_json(info.suggested_question)
@@ -486,6 +500,7 @@ class EnterpriseOtelTrace:
emit_metric_only_event(
event_name="dify.suggested_question.generation",
attributes=attrs,
span_id_source=info.metadata.get("node_execution_id"),
tenant_id=info.metadata.get("tenant_id"),
user_id=info.metadata.get("user_id"),
)
@@ -498,6 +513,9 @@ class EnterpriseOtelTrace:
def _dataset_retrieval_trace(self, info: DatasetRetrievalTraceInfo) -> None:
attrs = self._common_attrs(info)
attrs["dify.dataset.error"] = info.error
attrs["dify.workflow.run_id"] = info.metadata.get("workflow_run_id")
if info.metadata.get("node_execution_id"):
attrs["dify.node.execution_id"] = info.metadata.get("node_execution_id")
docs = info.documents or []
dataset_ids: list[str] = []
@@ -550,6 +568,9 @@ class EnterpriseOtelTrace:
emit_metric_only_event(
event_name="dify.dataset.retrieval",
attributes=attrs,
trace_id_source=info.metadata.get("workflow_run_id") or str(info.message_id) if info.message_id else None,
span_id_source=info.metadata.get("node_execution_id")
or (str(info.message_id) if info.message_id else None),
tenant_id=info.metadata.get("tenant_id"),
user_id=info.metadata.get("user_id"),
)
@@ -567,6 +588,8 @@ class EnterpriseOtelTrace:
def _generate_name_trace(self, info: GenerateNameTraceInfo) -> None:
attrs = self._common_attrs(info)
attrs["dify.conversation.id"] = info.conversation_id
if info.metadata.get("node_execution_id"):
attrs["dify.node.execution_id"] = info.metadata.get("node_execution_id")
if self._exporter.include_content:
attrs["dify.generate_name.inputs"] = self._maybe_json(info.inputs)
@@ -579,6 +602,7 @@ class EnterpriseOtelTrace:
emit_metric_only_event(
event_name="dify.generate_name.execution",
attributes=attrs,
span_id_source=info.metadata.get("node_execution_id"),
tenant_id=info.tenant_id,
user_id=info.metadata.get("user_id"),
)
@@ -603,6 +627,8 @@ class EnterpriseOtelTrace:
"dify.prompt_generation.latency": info.latency,
"dify.prompt_generation.error": info.error,
}
if info.metadata.get("node_execution_id"):
attrs["dify.node.execution_id"] = info.metadata.get("node_execution_id")
if info.total_price is not None:
attrs["dify.prompt_generation.total_price"] = info.total_price
@@ -619,6 +645,7 @@ class EnterpriseOtelTrace:
emit_metric_only_event(
event_name="dify.prompt_generation.execution",
attributes=attrs,
span_id_source=info.metadata.get("node_execution_id"),
tenant_id=info.tenant_id,
user_id=info.user_id,
)

View File

@@ -1,3 +1,4 @@
import logging
from typing import Any
from core.ops.entities.config_entity import BaseTracingConfig
@@ -5,6 +6,8 @@ from core.ops.ops_trace_manager import OpsTraceManager, provider_config_map
from extensions.ext_database import db
from models.model import App, TraceAppConfig
logger = logging.getLogger(__name__)
class OpsService:
@classmethod
@@ -135,12 +138,13 @@ class OpsService:
return trace_config_data.to_dict()
@classmethod
def create_tracing_app_config(cls, app_id: str, tracing_provider: str, tracing_config: dict):
def create_tracing_app_config(cls, app_id: str, tracing_provider: str, tracing_config: dict, account_id: str):
"""
Create tracing app config
:param app_id: app id
:param tracing_provider: tracing provider
:param tracing_config: tracing config
:param account_id: account id of the user creating the config
:return:
"""
try:
@@ -207,15 +211,19 @@ class OpsService:
db.session.add(trace_config_data)
db.session.commit()
# Log the creation with modifier information
logger.info("Trace config created: app_id=%s, provider=%s, created_by=%s", app_id, tracing_provider, account_id)
return {"result": "success"}
@classmethod
def update_tracing_app_config(cls, app_id: str, tracing_provider: str, tracing_config: dict):
def update_tracing_app_config(cls, app_id: str, tracing_provider: str, tracing_config: dict, account_id: str):
"""
Update tracing app config
:param app_id: app id
:param tracing_provider: tracing provider
:param tracing_config: tracing config
:param account_id: account id of the user updating the config
:return:
"""
try:
@@ -251,14 +259,18 @@ class OpsService:
current_trace_config.tracing_config = tracing_config
db.session.commit()
# Log the update with modifier information
logger.info("Trace config updated: app_id=%s, provider=%s, updated_by=%s", app_id, tracing_provider, account_id)
return current_trace_config.to_dict()
@classmethod
def delete_tracing_app_config(cls, app_id: str, tracing_provider: str):
def delete_tracing_app_config(cls, app_id: str, tracing_provider: str, account_id: str):
"""
Delete tracing app config
:param app_id: app id
:param tracing_provider: tracing provider
:param account_id: account id of the user deleting the config
:return:
"""
trace_config = (
@@ -270,6 +282,9 @@ class OpsService:
if not trace_config:
return None
# Log the deletion with modifier information
logger.info("Trace config deleted: app_id=%s, provider=%s, deleted_by=%s", app_id, tracing_provider, account_id)
db.session.delete(trace_config)
db.session.commit()