feat(telemetry): add prompt generation telemetry to Enterprise OTEL

- Add PromptGenerationTraceInfo trace entity with operation_type field
- Implement telemetry for rule-generate, code-generate, structured-output, instruction-modify operations
- Emit metrics: tokens (total/input/output), duration histogram, requests counter, errors counter
- Emit structured logs with model info and operation context
- Content redaction controlled by ENTERPRISE_INCLUDE_CONTENT env var
- Fix user_id propagation in TraceTask kwargs
- Fix latency calculation when llm_result is None

No spans exported; metrics and logs only, for lightweight observability.
GareArc 2026-02-04 00:38:17 -08:00
parent bada2d18a1
commit c56e5a5b71
7 changed files with 474 additions and 97 deletions
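
Below is a minimal sketch of the content-redaction behavior described in the commit message. It is illustrative only, not the shipped exporter code: the attribute keys and the ref:trace_id placeholder follow the exporter changes in this commit, while the env-var parsing and the build_prompt_generation_attrs helper are assumptions made so the example runs standalone.

import os

def build_prompt_generation_attrs(trace_id: str, instruction: str, output: str) -> dict:
    # Hypothetical standalone helper; the real exporter reads its configuration
    # once at construction time rather than per call.
    include_content = os.getenv("ENTERPRISE_INCLUDE_CONTENT", "false").lower() in ("1", "true", "yes")
    attrs: dict = {"dify.trace_id": trace_id}
    if include_content:
        # Content inclusion enabled: attach the raw instruction and generated output.
        attrs["dify.prompt_generation.instruction"] = instruction
        attrs["dify.prompt_generation.output"] = output
    else:
        # Redacted: substitute a back-reference so the emitted event stays free
        # of user content but can still be correlated through the trace id.
        ref = f"ref:trace_id={trace_id}"
        attrs["dify.prompt_generation.instruction"] = ref
        attrs["dify.prompt_generation.output"] = ref
    return attrs

if __name__ == "__main__":
    print(build_prompt_generation_attrs("abc123", "Write a haiku about logs", "(generated text)"))

The same defensive pattern appears in the latency fix: when llm_result is None, _emit_prompt_generation_trace derives the duration from the measure_time timer (end minus start) instead of reading it from LLM usage.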

View File

@ -79,7 +79,7 @@ class RuleGenerateApi(Resource):
@account_initialization_required
def post(self):
args = RuleGeneratePayload.model_validate(console_ns.payload)
_, current_tenant_id = current_account_with_tenant()
account, current_tenant_id = current_account_with_tenant()
try:
rules = LLMGenerator.generate_rule_config(
@ -87,6 +87,8 @@ class RuleGenerateApi(Resource):
instruction=args.instruction,
model_config=args.model_config_data,
no_variable=args.no_variable,
user_id=account.id,
app_id=None,
)
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)
@ -113,7 +115,7 @@ class RuleCodeGenerateApi(Resource):
@account_initialization_required
def post(self):
args = RuleCodeGeneratePayload.model_validate(console_ns.payload)
_, current_tenant_id = current_account_with_tenant()
account, current_tenant_id = current_account_with_tenant()
try:
code_result = LLMGenerator.generate_code(
@ -121,6 +123,8 @@ class RuleCodeGenerateApi(Resource):
instruction=args.instruction,
model_config=args.model_config_data,
code_language=args.code_language,
user_id=account.id,
app_id=None,
)
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)
@ -147,13 +151,15 @@ class RuleStructuredOutputGenerateApi(Resource):
@account_initialization_required
def post(self):
args = RuleStructuredOutputPayload.model_validate(console_ns.payload)
_, current_tenant_id = current_account_with_tenant()
account, current_tenant_id = current_account_with_tenant()
try:
structured_output = LLMGenerator.generate_structured_output(
tenant_id=current_tenant_id,
instruction=args.instruction,
model_config=args.model_config_data,
user_id=account.id,
app_id=None,
)
except ProviderTokenNotInitError as ex:
raise ProviderNotInitializeError(ex.description)
@ -180,14 +186,13 @@ class InstructionGenerateApi(Resource):
@account_initialization_required
def post(self):
args = InstructionGeneratePayload.model_validate(console_ns.payload)
_, current_tenant_id = current_account_with_tenant()
account, current_tenant_id = current_account_with_tenant()
providers: list[type[CodeNodeProvider]] = [Python3CodeProvider, JavascriptCodeProvider]
code_provider: type[CodeNodeProvider] | None = next(
(p for p in providers if p.is_accept_language(args.language)), None
)
code_template = code_provider.get_default_code() if code_provider else ""
try:
# Generate from nothing for a workflow node
if (args.current in (code_template, "")) and args.node_id != "":
app = db.session.query(App).where(App.id == args.flow_id).first()
if not app:
@ -207,6 +212,8 @@ class InstructionGenerateApi(Resource):
instruction=args.instruction,
model_config=args.model_config_data,
no_variable=True,
user_id=account.id,
app_id=args.flow_id,
)
case "agent":
return LLMGenerator.generate_rule_config(
@ -214,6 +221,8 @@ class InstructionGenerateApi(Resource):
instruction=args.instruction,
model_config=args.model_config_data,
no_variable=True,
user_id=account.id,
app_id=args.flow_id,
)
case "code":
return LLMGenerator.generate_code(
@ -221,10 +230,12 @@ class InstructionGenerateApi(Resource):
instruction=args.instruction,
model_config=args.model_config_data,
code_language=args.language,
user_id=account.id,
app_id=args.flow_id,
)
case _:
return {"error": f"invalid node type: {node_type}"}
if args.node_id == "" and args.current != "": # For legacy app without a workflow
if args.node_id == "" and args.current != "":
return LLMGenerator.instruction_modify_legacy(
tenant_id=current_tenant_id,
flow_id=args.flow_id,
@ -232,8 +243,10 @@ class InstructionGenerateApi(Resource):
instruction=args.instruction,
model_config=args.model_config_data,
ideal_output=args.ideal_output,
user_id=account.id,
app_id=args.flow_id,
)
if args.node_id != "" and args.current != "": # For workflow node
if args.node_id != "" and args.current != "":
return LLMGenerator.instruction_modify_workflow(
tenant_id=current_tenant_id,
flow_id=args.flow_id,
@ -243,6 +256,8 @@ class InstructionGenerateApi(Resource):
model_config=args.model_config_data,
ideal_output=args.ideal_output,
workflow_service=WorkflowService(),
user_id=account.id,
app_id=args.flow_id,
)
return {"error": "incompatible parameters"}, 400
except ProviderTokenNotInitError as ex:

View File

@ -151,7 +151,15 @@ class LLMGenerator:
return questions
@classmethod
def generate_rule_config(cls, tenant_id: str, instruction: str, model_config: dict, no_variable: bool):
def generate_rule_config(
cls,
tenant_id: str,
instruction: str,
model_config: dict,
no_variable: bool,
user_id: str | None = None,
app_id: str | None = None,
):
output_parser = RuleConfigGeneratorOutputParser()
error = ""
@ -179,22 +187,40 @@ class LLMGenerator:
model=model_config.get("name", ""),
)
try:
response: LLMResult = model_instance.invoke_llm(
prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False
)
llm_result = None
with measure_time() as timer:
try:
llm_result = model_instance.invoke_llm(
prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False
)
rule_config["prompt"] = cast(str, response.message.content)
rule_config["prompt"] = cast(str, llm_result.message.content)
except InvokeError as e:
error = str(e)
error_step = "generate rule config"
except Exception as e:
logger.exception("Failed to generate rule config, model: %s", model_config.get("name"))
rule_config["error"] = str(e)
except InvokeError as e:
error = str(e)
error_step = "generate rule config"
except Exception as e:
logger.exception("Failed to generate rule config, model: %s", model_config.get("name"))
rule_config["error"] = str(e)
error = str(e)
rule_config["error"] = f"Failed to {error_step}. Error: {error}" if error else ""
if user_id:
prompt_value = rule_config.get("prompt", "")
generated_output = str(prompt_value) if prompt_value else ""
cls._emit_prompt_generation_trace(
tenant_id=tenant_id,
user_id=user_id,
app_id=app_id,
operation_type="rule_generate",
instruction=instruction,
generated_output=generated_output,
llm_result=llm_result,
timer=timer,
error=error or None,
)
return rule_config
# get rule config prompt, parameter and statement
@ -286,7 +312,15 @@ class LLMGenerator:
return rule_config
@classmethod
def generate_code(cls, tenant_id: str, instruction: str, model_config: dict, code_language: str = "javascript"):
def generate_code(
cls,
tenant_id: str,
instruction: str,
model_config: dict,
code_language: str = "javascript",
user_id: str | None = None,
app_id: str | None = None,
):
if code_language == "python":
prompt_template = PromptTemplateParser(PYTHON_CODE_GENERATOR_PROMPT_TEMPLATE)
else:
@ -310,22 +344,42 @@ class LLMGenerator:
prompt_messages = [UserPromptMessage(content=prompt)]
model_parameters = model_config.get("completion_params", {})
try:
response: LLMResult = model_instance.invoke_llm(
prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False
llm_result = None
error = None
with measure_time() as timer:
try:
llm_result = model_instance.invoke_llm(
prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False
)
generated_code = cast(str, llm_result.message.content)
result = {"code": generated_code, "language": code_language, "error": ""}
except InvokeError as e:
error = str(e)
result = {"code": "", "language": code_language, "error": f"Failed to generate code. Error: {error}"}
except Exception as e:
logger.exception(
"Failed to invoke LLM model, model: %s, language: %s", model_config.get("name"), code_language
)
error = str(e)
result = {"code": "", "language": code_language, "error": f"An unexpected error occurred: {str(e)}"}
if user_id:
cls._emit_prompt_generation_trace(
tenant_id=tenant_id,
user_id=user_id,
app_id=app_id,
operation_type="code_generate",
instruction=instruction,
generated_output=result.get("code", ""),
llm_result=llm_result,
timer=timer,
error=error,
)
generated_code = cast(str, response.message.content)
return {"code": generated_code, "language": code_language, "error": ""}
except InvokeError as e:
error = str(e)
return {"code": "", "language": code_language, "error": f"Failed to generate code. Error: {error}"}
except Exception as e:
logger.exception(
"Failed to invoke LLM model, model: %s, language: %s", model_config.get("name"), code_language
)
return {"code": "", "language": code_language, "error": f"An unexpected error occurred: {str(e)}"}
return result
@classmethod
def generate_qa_document(cls, tenant_id: str, query, document_language: str):
@ -355,7 +409,9 @@ class LLMGenerator:
return answer.strip()
@classmethod
def generate_structured_output(cls, tenant_id: str, instruction: str, model_config: dict):
def generate_structured_output(
cls, tenant_id: str, instruction: str, model_config: dict, user_id: str | None = None, app_id: str | None = None
):
model_manager = ModelManager()
model_instance = model_manager.get_model_instance(
tenant_id=tenant_id,
@ -370,43 +426,71 @@ class LLMGenerator:
]
model_parameters = model_config.get("model_parameters", {})
try:
response: LLMResult = model_instance.invoke_llm(
prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False
llm_result = None
error = None
result = {"output": "", "error": ""}
with measure_time() as timer:
try:
llm_result = model_instance.invoke_llm(
prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False
)
raw_content = llm_result.message.content
if not isinstance(raw_content, str):
raise ValueError(f"LLM response content must be a string, got: {type(raw_content)}")
try:
parsed_content = json.loads(raw_content)
except json.JSONDecodeError:
parsed_content = json_repair.loads(raw_content)
if not isinstance(parsed_content, dict | list):
raise ValueError(f"Failed to parse structured output from llm: {raw_content}")
generated_json_schema = json.dumps(parsed_content, indent=2, ensure_ascii=False)
result = {"output": generated_json_schema, "error": ""}
except InvokeError as e:
error = str(e)
result = {"output": "", "error": f"Failed to generate JSON Schema. Error: {error}"}
except Exception as e:
logger.exception("Failed to invoke LLM model, model: %s", model_config.get("name"))
error = str(e)
result = {"output": "", "error": f"An unexpected error occurred: {str(e)}"}
if user_id:
cls._emit_prompt_generation_trace(
tenant_id=tenant_id,
user_id=user_id,
app_id=app_id,
operation_type="structured_output",
instruction=instruction,
generated_output=result.get("output", ""),
llm_result=llm_result,
timer=timer,
error=error,
)
raw_content = response.message.content
if not isinstance(raw_content, str):
raise ValueError(f"LLM response content must be a string, got: {type(raw_content)}")
try:
parsed_content = json.loads(raw_content)
except json.JSONDecodeError:
parsed_content = json_repair.loads(raw_content)
if not isinstance(parsed_content, dict | list):
raise ValueError(f"Failed to parse structured output from llm: {raw_content}")
generated_json_schema = json.dumps(parsed_content, indent=2, ensure_ascii=False)
return {"output": generated_json_schema, "error": ""}
except InvokeError as e:
error = str(e)
return {"output": "", "error": f"Failed to generate JSON Schema. Error: {error}"}
except Exception as e:
logger.exception("Failed to invoke LLM model, model: %s", model_config.get("name"))
return {"output": "", "error": f"An unexpected error occurred: {str(e)}"}
return result
@staticmethod
def instruction_modify_legacy(
tenant_id: str, flow_id: str, current: str, instruction: str, model_config: dict, ideal_output: str | None
tenant_id: str,
flow_id: str,
current: str,
instruction: str,
model_config: dict,
ideal_output: str | None,
user_id: str | None = None,
app_id: str | None = None,
):
last_run: Message | None = (
db.session.query(Message).where(Message.app_id == flow_id).order_by(Message.created_at.desc()).first()
)
if not last_run:
return LLMGenerator.__instruction_modify_common(
result = LLMGenerator.__instruction_modify_common(
tenant_id=tenant_id,
model_config=model_config,
last_run=None,
@ -415,22 +499,28 @@ class LLMGenerator:
instruction=instruction,
node_type="llm",
ideal_output=ideal_output,
user_id=user_id,
app_id=app_id,
)
last_run_dict = {
"query": last_run.query,
"answer": last_run.answer,
"error": last_run.error,
}
return LLMGenerator.__instruction_modify_common(
tenant_id=tenant_id,
model_config=model_config,
last_run=last_run_dict,
current=current,
error_message=str(last_run.error),
instruction=instruction,
node_type="llm",
ideal_output=ideal_output,
)
else:
last_run_dict = {
"query": last_run.query,
"answer": last_run.answer,
"error": last_run.error,
}
result = LLMGenerator.__instruction_modify_common(
tenant_id=tenant_id,
model_config=model_config,
last_run=last_run_dict,
current=current,
error_message=str(last_run.error),
instruction=instruction,
node_type="llm",
ideal_output=ideal_output,
user_id=user_id,
app_id=app_id,
)
return result
@staticmethod
def instruction_modify_workflow(
@ -442,6 +532,8 @@ class LLMGenerator:
model_config: dict,
ideal_output: str | None,
workflow_service: WorkflowServiceInterface,
user_id: str | None = None,
app_id: str | None = None,
):
session = db.session()
@ -472,6 +564,8 @@ class LLMGenerator:
instruction=instruction,
node_type=node_type,
ideal_output=ideal_output,
user_id=user_id,
app_id=app_id,
)
def agent_log_of(node_execution: WorkflowNodeExecutionModel) -> Sequence:
@ -505,6 +599,8 @@ class LLMGenerator:
instruction=instruction,
node_type=last_run.node_type,
ideal_output=ideal_output,
user_id=user_id,
app_id=app_id,
)
@staticmethod
@ -517,6 +613,8 @@ class LLMGenerator:
instruction: str,
node_type: str,
ideal_output: str | None,
user_id: str | None = None,
app_id: str | None = None,
):
LAST_RUN = "{{#last_run#}}"
CURRENT = "{{#current#}}"
@ -556,26 +654,114 @@ class LLMGenerator:
]
model_parameters = {"temperature": 0.4}
try:
response: LLMResult = model_instance.invoke_llm(
prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False
llm_result = None
error = None
result = {}
with measure_time() as timer:
try:
llm_result = model_instance.invoke_llm(
prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False
)
generated_raw = llm_result.message.get_text_content()
first_brace = generated_raw.find("{")
last_brace = generated_raw.rfind("}")
if first_brace == -1 or last_brace == -1 or last_brace < first_brace:
raise ValueError(f"Could not find a valid JSON object in response: {generated_raw}")
json_str = generated_raw[first_brace : last_brace + 1]
data = json_repair.loads(json_str)
if not isinstance(data, dict):
raise TypeError(f"Expected a JSON object, but got {type(data).__name__}")
result = data
except InvokeError as e:
error = str(e)
result = {"error": f"Failed to generate code. Error: {error}"}
except Exception as e:
logger.exception(
"Failed to invoke LLM model, model: %s", json.dumps(model_config.get("name")), exc_info=True
)
error = str(e)
result = {"error": f"An unexpected error occurred: {str(e)}"}
if user_id:
generated_output = ""
if isinstance(result, dict):
for key in ["prompt", "code", "output", "modified"]:
if result.get(key):
generated_output = str(result[key])
break
LLMGenerator._emit_prompt_generation_trace(
tenant_id=tenant_id,
user_id=user_id,
app_id=app_id,
operation_type="instruction_modify",
instruction=instruction,
generated_output=generated_output,
llm_result=llm_result,
timer=timer,
error=error,
)
generated_raw = response.message.get_text_content()
first_brace = generated_raw.find("{")
last_brace = generated_raw.rfind("}")
if first_brace == -1 or last_brace == -1 or last_brace < first_brace:
raise ValueError(f"Could not find a valid JSON object in response: {generated_raw}")
json_str = generated_raw[first_brace : last_brace + 1]
data = json_repair.loads(json_str)
if not isinstance(data, dict):
raise TypeError(f"Expected a JSON object, but got {type(data).__name__}")
return data
except InvokeError as e:
error = str(e)
return {"error": f"Failed to generate code. Error: {error}"}
except Exception as e:
logger.exception(
"Failed to invoke LLM model, model: %s", json.dumps(model_config.get("name")), exc_info=True
return result
@classmethod
def _emit_prompt_generation_trace(
cls,
tenant_id: str,
user_id: str,
app_id: str | None,
operation_type: str,
instruction: str,
generated_output: str,
llm_result: LLMResult | None,
timer,
error: str | None = None,
):
if llm_result:
prompt_tokens = llm_result.usage.prompt_tokens
completion_tokens = llm_result.usage.completion_tokens
total_tokens = llm_result.usage.total_tokens
model_name = llm_result.model
model_provider = model_name.split("/")[0] if "/" in model_name else ""
latency = llm_result.usage.latency
total_price = float(llm_result.usage.total_price) if llm_result.usage.total_price else None
currency = llm_result.usage.currency
else:
prompt_tokens = 0
completion_tokens = 0
total_tokens = 0
model_provider = ""
model_name = ""
latency = 0.0
if timer:
start_time = timer.get("start")
end_time = timer.get("end")
if start_time and end_time:
latency = (end_time - start_time).total_seconds()
total_price = None
currency = None
trace_manager = TraceQueueManager(app_id=app_id)
trace_manager.add_trace_task(
TraceTask(
TraceTaskName.PROMPT_GENERATION_TRACE,
tenant_id=tenant_id,
user_id=user_id,
app_id=app_id,
operation_type=operation_type,
instruction=instruction,
generated_output=generated_output,
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=total_tokens,
model_provider=model_provider,
model_name=model_name,
latency=latency,
total_price=total_price,
currency=currency,
timer=timer,
error=error,
)
return {"error": f"An unexpected error occurred: {str(e)}"}
)

View File

@ -118,6 +118,33 @@ class GenerateNameTraceInfo(BaseTraceInfo):
tenant_id: str
class PromptGenerationTraceInfo(BaseTraceInfo):
"""Trace information for prompt generation operations (rule-generate, code-generate, etc.)."""
tenant_id: str
user_id: str
app_id: str | None = None
operation_type: str
instruction: str
prompt_tokens: int
completion_tokens: int
total_tokens: int
model_provider: str
model_name: str
latency: float
total_price: float | None = None
currency: str | None = None
error: str | None = None
model_config = ConfigDict(protected_namespaces=())
class WorkflowNodeTraceInfo(BaseTraceInfo):
workflow_id: str
workflow_run_id: str
@ -178,6 +205,7 @@ trace_info_info_map = {
"DatasetRetrievalTraceInfo": DatasetRetrievalTraceInfo,
"ToolTraceInfo": ToolTraceInfo,
"GenerateNameTraceInfo": GenerateNameTraceInfo,
"PromptGenerationTraceInfo": PromptGenerationTraceInfo,
"WorkflowNodeTraceInfo": WorkflowNodeTraceInfo,
"DraftNodeExecutionTrace": DraftNodeExecutionTrace,
}
@ -193,5 +221,6 @@ class TraceTaskName(StrEnum):
DATASET_RETRIEVAL_TRACE = "dataset_retrieval"
TOOL_TRACE = "tool"
GENERATE_NAME_TRACE = "generate_conversation_name"
PROMPT_GENERATION_TRACE = "prompt_generation"
DATASOURCE_TRACE = "datasource"
NODE_EXECUTION_TRACE = "node_execution"

View File

@ -25,6 +25,7 @@ from core.ops.entities.trace_entity import (
GenerateNameTraceInfo,
MessageTraceInfo,
ModerationTraceInfo,
PromptGenerationTraceInfo,
SuggestedQuestionTraceInfo,
TaskData,
ToolTraceInfo,
@ -590,6 +591,8 @@ class TraceTask:
self.app_id = None
self.trace_id = None
self.kwargs = kwargs
if user_id is not None and "user_id" not in self.kwargs:
self.kwargs["user_id"] = user_id
external_trace_id = kwargs.get("external_trace_id")
if external_trace_id:
self.trace_id = external_trace_id
@ -619,6 +622,7 @@ class TraceTask:
TraceTaskName.GENERATE_NAME_TRACE: lambda: self.generate_name_trace(
conversation_id=self.conversation_id, timer=self.timer, **self.kwargs
),
TraceTaskName.PROMPT_GENERATION_TRACE: lambda: self.prompt_generation_trace(**self.kwargs),
TraceTaskName.NODE_EXECUTION_TRACE: lambda: self.node_execution_trace(**self.kwargs),
TraceTaskName.DRAFT_NODE_EXECUTION_TRACE: lambda: self.draft_node_execution_trace(**self.kwargs),
}
@ -1062,6 +1066,71 @@ class TraceTask:
return generate_name_trace_info
def prompt_generation_trace(self, **kwargs) -> PromptGenerationTraceInfo | dict:
tenant_id = kwargs.get("tenant_id", "")
user_id = kwargs.get("user_id", "")
app_id = kwargs.get("app_id")
operation_type = kwargs.get("operation_type", "")
instruction = kwargs.get("instruction", "")
generated_output = kwargs.get("generated_output", "")
prompt_tokens = kwargs.get("prompt_tokens", 0)
completion_tokens = kwargs.get("completion_tokens", 0)
total_tokens = kwargs.get("total_tokens", 0)
model_provider = kwargs.get("model_provider", "")
model_name = kwargs.get("model_name", "")
latency = kwargs.get("latency", 0.0)
timer = kwargs.get("timer")
start_time = timer.get("start") if timer else None
end_time = timer.get("end") if timer else None
total_price = kwargs.get("total_price")
currency = kwargs.get("currency")
error = kwargs.get("error")
app_name = None
workspace_name = None
if app_id:
app_name, workspace_name = _lookup_app_and_workspace_names(app_id, tenant_id)
metadata = {
"tenant_id": tenant_id,
"user_id": user_id,
"app_id": app_id or "",
"app_name": app_name,
"workspace_name": workspace_name,
"operation_type": operation_type,
"model_provider": model_provider,
"model_name": model_name,
}
return PromptGenerationTraceInfo(
trace_id=self.trace_id,
inputs=instruction,
outputs=generated_output,
start_time=start_time,
end_time=end_time,
metadata=metadata,
tenant_id=tenant_id,
user_id=user_id,
app_id=app_id,
operation_type=operation_type,
instruction=instruction,
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=total_tokens,
model_provider=model_provider,
model_name=model_name,
latency=latency,
total_price=total_price,
currency=currency,
error=error,
)
def node_execution_trace(self, **kwargs) -> WorkflowNodeTraceInfo | dict:
node_data: dict = kwargs.get("node_execution_data", {})
if not node_data:

View File

@ -21,6 +21,7 @@ from core.ops.entities.trace_entity import (
GenerateNameTraceInfo,
MessageTraceInfo,
ModerationTraceInfo,
PromptGenerationTraceInfo,
SuggestedQuestionTraceInfo,
ToolTraceInfo,
WorkflowNodeTraceInfo,
@ -70,6 +71,8 @@ class EnterpriseOtelTrace:
self._dataset_retrieval_trace(trace_info)
elif isinstance(trace_info, GenerateNameTraceInfo):
self._generate_name_trace(trace_info)
elif isinstance(trace_info, PromptGenerationTraceInfo):
self._prompt_generation_trace(trace_info)
def _common_attrs(self, trace_info: BaseTraceInfo) -> dict[str, Any]:
return {
@ -582,3 +585,74 @@ class EnterpriseOtelTrace:
labels = {"tenant_id": info.tenant_id, "app_id": info.metadata.get("app_id", "")}
self._exporter.increment_counter(EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": "generate_name"})
def _prompt_generation_trace(self, info: PromptGenerationTraceInfo) -> None:
attrs = {
"dify.trace_id": info.trace_id,
"dify.tenant_id": info.tenant_id,
"dify.user.id": info.user_id,
"dify.app.id": info.app_id or "",
"dify.app.name": info.metadata.get("app_name"),
"dify.workspace.name": info.metadata.get("workspace_name"),
"dify.operation.type": info.operation_type,
"gen_ai.provider.name": info.model_provider,
"gen_ai.request.model": info.model_name,
"gen_ai.usage.input_tokens": info.prompt_tokens,
"gen_ai.usage.output_tokens": info.completion_tokens,
"gen_ai.usage.total_tokens": info.total_tokens,
"dify.prompt_generation.latency": info.latency,
"dify.prompt_generation.error": info.error,
}
if info.total_price is not None:
attrs["dify.prompt_generation.total_price"] = info.total_price
attrs["dify.prompt_generation.currency"] = info.currency
if self._exporter.include_content:
attrs["dify.prompt_generation.instruction"] = info.instruction
attrs["dify.prompt_generation.output"] = self._maybe_json(info.outputs)
else:
ref = f"ref:trace_id={info.trace_id}"
attrs["dify.prompt_generation.instruction"] = ref
attrs["dify.prompt_generation.output"] = ref
emit_metric_only_event(
event_name="dify.prompt_generation.execution",
attributes=attrs,
tenant_id=info.tenant_id,
user_id=info.user_id,
)
labels: dict[str, Any] = {
"tenant_id": info.tenant_id,
"app_id": info.app_id or "",
"operation_type": info.operation_type,
"model_provider": info.model_provider,
"model_name": info.model_name,
}
self._exporter.increment_counter(EnterpriseTelemetryCounter.TOKENS, info.total_tokens, labels)
if info.prompt_tokens > 0:
self._exporter.increment_counter(EnterpriseTelemetryCounter.INPUT_TOKENS, info.prompt_tokens, labels)
if info.completion_tokens > 0:
self._exporter.increment_counter(EnterpriseTelemetryCounter.OUTPUT_TOKENS, info.completion_tokens, labels)
status = "failed" if info.error else "success"
self._exporter.increment_counter(
EnterpriseTelemetryCounter.REQUESTS,
1,
{**labels, "type": "prompt_generation", "status": status},
)
self._exporter.record_histogram(
EnterpriseTelemetryHistogram.PROMPT_GENERATION_DURATION,
info.latency,
labels,
)
if info.error:
self._exporter.increment_counter(
EnterpriseTelemetryCounter.ERRORS,
1,
{**labels, "type": "prompt_generation"},
)

View File

@ -23,6 +23,7 @@ class EnterpriseTelemetryHistogram(StrEnum):
MESSAGE_DURATION = "message_duration"
MESSAGE_TTFT = "message_ttft"
TOOL_DURATION = "tool_duration"
PROMPT_GENERATION_DURATION = "prompt_generation_duration"
__all__ = [

View File

@ -150,6 +150,9 @@ class EnterpriseExporter:
"dify.message.time_to_first_token", unit="s"
),
EnterpriseTelemetryHistogram.TOOL_DURATION: meter.create_histogram("dify.tool.duration", unit="s"),
EnterpriseTelemetryHistogram.PROMPT_GENERATION_DURATION: meter.create_histogram(
"dify.prompt_generation.duration", unit="s"
),
}
def export_span(