diff --git a/api/controllers/console/app/generator.py b/api/controllers/console/app/generator.py
index b4fc44767a..917d4f0753 100644
--- a/api/controllers/console/app/generator.py
+++ b/api/controllers/console/app/generator.py
@@ -79,7 +79,7 @@ class RuleGenerateApi(Resource):
     @account_initialization_required
     def post(self):
         args = RuleGeneratePayload.model_validate(console_ns.payload)
-        _, current_tenant_id = current_account_with_tenant()
+        account, current_tenant_id = current_account_with_tenant()
 
         try:
             rules = LLMGenerator.generate_rule_config(
@@ -87,6 +87,8 @@ class RuleGenerateApi(Resource):
                 instruction=args.instruction,
                 model_config=args.model_config_data,
                 no_variable=args.no_variable,
+                user_id=account.id,
+                app_id=None,
             )
         except ProviderTokenNotInitError as ex:
             raise ProviderNotInitializeError(ex.description)
@@ -113,7 +115,7 @@ class RuleCodeGenerateApi(Resource):
     @account_initialization_required
     def post(self):
         args = RuleCodeGeneratePayload.model_validate(console_ns.payload)
-        _, current_tenant_id = current_account_with_tenant()
+        account, current_tenant_id = current_account_with_tenant()
 
         try:
             code_result = LLMGenerator.generate_code(
@@ -121,6 +123,8 @@ class RuleCodeGenerateApi(Resource):
                 instruction=args.instruction,
                 model_config=args.model_config_data,
                 code_language=args.code_language,
+                user_id=account.id,
+                app_id=None,
             )
         except ProviderTokenNotInitError as ex:
             raise ProviderNotInitializeError(ex.description)
@@ -147,13 +151,15 @@ class RuleStructuredOutputGenerateApi(Resource):
     @account_initialization_required
     def post(self):
         args = RuleStructuredOutputPayload.model_validate(console_ns.payload)
-        _, current_tenant_id = current_account_with_tenant()
+        account, current_tenant_id = current_account_with_tenant()
 
         try:
             structured_output = LLMGenerator.generate_structured_output(
                 tenant_id=current_tenant_id,
                 instruction=args.instruction,
                 model_config=args.model_config_data,
+                user_id=account.id,
+                app_id=None,
             )
         except ProviderTokenNotInitError as ex:
             raise ProviderNotInitializeError(ex.description)
@@ -180,14 +186,13 @@ class InstructionGenerateApi(Resource):
     @account_initialization_required
     def post(self):
         args = InstructionGeneratePayload.model_validate(console_ns.payload)
-        _, current_tenant_id = current_account_with_tenant()
+        account, current_tenant_id = current_account_with_tenant()
         providers: list[type[CodeNodeProvider]] = [Python3CodeProvider, JavascriptCodeProvider]
         code_provider: type[CodeNodeProvider] | None = next(
             (p for p in providers if p.is_accept_language(args.language)), None
         )
         code_template = code_provider.get_default_code() if code_provider else ""
         try:
-            # Generate from nothing for a workflow node
             if (args.current in (code_template, "")) and args.node_id != "":
                 app = db.session.query(App).where(App.id == args.flow_id).first()
                 if not app:
@@ -207,6 +212,8 @@ class InstructionGenerateApi(Resource):
                             instruction=args.instruction,
                             model_config=args.model_config_data,
                             no_variable=True,
+                            user_id=account.id,
+                            app_id=args.flow_id,
                         )
                     case "agent":
                         return LLMGenerator.generate_rule_config(
@@ -214,6 +221,8 @@ class InstructionGenerateApi(Resource):
                             instruction=args.instruction,
                             model_config=args.model_config_data,
                             no_variable=True,
+                            user_id=account.id,
+                            app_id=args.flow_id,
                         )
                     case "code":
                         return LLMGenerator.generate_code(
@@ -221,10 +230,12 @@ class InstructionGenerateApi(Resource):
                             instruction=args.instruction,
                             model_config=args.model_config_data,
                             code_language=args.language,
+                            user_id=account.id,
+                            app_id=args.flow_id,
                         )
                     case _:
                         return {"error": f"invalid node type: {node_type}"}
-            if args.node_id == "" and args.current != "":  # For legacy app without a workflow
+            if args.node_id == "" and args.current != "":
                 return LLMGenerator.instruction_modify_legacy(
                     tenant_id=current_tenant_id,
                     flow_id=args.flow_id,
@@ -232,8 +243,10 @@ class InstructionGenerateApi(Resource):
                     instruction=args.instruction,
                     model_config=args.model_config_data,
                     ideal_output=args.ideal_output,
+                    user_id=account.id,
+                    app_id=args.flow_id,
                 )
-            if args.node_id != "" and args.current != "":  # For workflow node
+            if args.node_id != "" and args.current != "":
                 return LLMGenerator.instruction_modify_workflow(
                     tenant_id=current_tenant_id,
                     flow_id=args.flow_id,
@@ -243,6 +256,8 @@ class InstructionGenerateApi(Resource):
                     model_config=args.model_config_data,
                     ideal_output=args.ideal_output,
                     workflow_service=WorkflowService(),
+                    user_id=account.id,
+                    app_id=args.flow_id,
                 )
             return {"error": "incompatible parameters"}, 400
         except ProviderTokenNotInitError as ex:
Error: {error}" if error else "" + if user_id: + prompt_value = rule_config.get("prompt", "") + generated_output = str(prompt_value) if prompt_value else "" + cls._emit_prompt_generation_trace( + tenant_id=tenant_id, + user_id=user_id, + app_id=app_id, + operation_type="rule_generate", + instruction=instruction, + generated_output=generated_output, + llm_result=llm_result, + timer=timer, + error=error or None, + ) + return rule_config # get rule config prompt, parameter and statement @@ -286,7 +312,15 @@ class LLMGenerator: return rule_config @classmethod - def generate_code(cls, tenant_id: str, instruction: str, model_config: dict, code_language: str = "javascript"): + def generate_code( + cls, + tenant_id: str, + instruction: str, + model_config: dict, + code_language: str = "javascript", + user_id: str | None = None, + app_id: str | None = None, + ): if code_language == "python": prompt_template = PromptTemplateParser(PYTHON_CODE_GENERATOR_PROMPT_TEMPLATE) else: @@ -310,22 +344,42 @@ class LLMGenerator: prompt_messages = [UserPromptMessage(content=prompt)] model_parameters = model_config.get("completion_params", {}) - try: - response: LLMResult = model_instance.invoke_llm( - prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False + + llm_result = None + error = None + with measure_time() as timer: + try: + llm_result = model_instance.invoke_llm( + prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False + ) + + generated_code = cast(str, llm_result.message.content) + result = {"code": generated_code, "language": code_language, "error": ""} + + except InvokeError as e: + error = str(e) + result = {"code": "", "language": code_language, "error": f"Failed to generate code. Error: {error}"} + except Exception as e: + logger.exception( + "Failed to invoke LLM model, model: %s, language: %s", model_config.get("name"), code_language + ) + error = str(e) + result = {"code": "", "language": code_language, "error": f"An unexpected error occurred: {str(e)}"} + + if user_id: + cls._emit_prompt_generation_trace( + tenant_id=tenant_id, + user_id=user_id, + app_id=app_id, + operation_type="code_generate", + instruction=instruction, + generated_output=result.get("code", ""), + llm_result=llm_result, + timer=timer, + error=error, ) - generated_code = cast(str, response.message.content) - return {"code": generated_code, "language": code_language, "error": ""} - - except InvokeError as e: - error = str(e) - return {"code": "", "language": code_language, "error": f"Failed to generate code. 
Error: {error}"} - except Exception as e: - logger.exception( - "Failed to invoke LLM model, model: %s, language: %s", model_config.get("name"), code_language - ) - return {"code": "", "language": code_language, "error": f"An unexpected error occurred: {str(e)}"} + return result @classmethod def generate_qa_document(cls, tenant_id: str, query, document_language: str): @@ -355,7 +409,9 @@ class LLMGenerator: return answer.strip() @classmethod - def generate_structured_output(cls, tenant_id: str, instruction: str, model_config: dict): + def generate_structured_output( + cls, tenant_id: str, instruction: str, model_config: dict, user_id: str | None = None, app_id: str | None = None + ): model_manager = ModelManager() model_instance = model_manager.get_model_instance( tenant_id=tenant_id, @@ -370,43 +426,71 @@ class LLMGenerator: ] model_parameters = model_config.get("model_parameters", {}) - try: - response: LLMResult = model_instance.invoke_llm( - prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False + llm_result = None + error = None + result = {"output": "", "error": ""} + + with measure_time() as timer: + try: + llm_result = model_instance.invoke_llm( + prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False + ) + + raw_content = llm_result.message.content + + if not isinstance(raw_content, str): + raise ValueError(f"LLM response content must be a string, got: {type(raw_content)}") + + try: + parsed_content = json.loads(raw_content) + except json.JSONDecodeError: + parsed_content = json_repair.loads(raw_content) + + if not isinstance(parsed_content, dict | list): + raise ValueError(f"Failed to parse structured output from llm: {raw_content}") + + generated_json_schema = json.dumps(parsed_content, indent=2, ensure_ascii=False) + result = {"output": generated_json_schema, "error": ""} + + except InvokeError as e: + error = str(e) + result = {"output": "", "error": f"Failed to generate JSON Schema. Error: {error}"} + except Exception as e: + logger.exception("Failed to invoke LLM model, model: %s", model_config.get("name")) + error = str(e) + result = {"output": "", "error": f"An unexpected error occurred: {str(e)}"} + + if user_id: + cls._emit_prompt_generation_trace( + tenant_id=tenant_id, + user_id=user_id, + app_id=app_id, + operation_type="structured_output", + instruction=instruction, + generated_output=result.get("output", ""), + llm_result=llm_result, + timer=timer, + error=error, ) - raw_content = response.message.content - - if not isinstance(raw_content, str): - raise ValueError(f"LLM response content must be a string, got: {type(raw_content)}") - - try: - parsed_content = json.loads(raw_content) - except json.JSONDecodeError: - parsed_content = json_repair.loads(raw_content) - - if not isinstance(parsed_content, dict | list): - raise ValueError(f"Failed to parse structured output from llm: {raw_content}") - - generated_json_schema = json.dumps(parsed_content, indent=2, ensure_ascii=False) - return {"output": generated_json_schema, "error": ""} - - except InvokeError as e: - error = str(e) - return {"output": "", "error": f"Failed to generate JSON Schema. 
Error: {error}"} - except Exception as e: - logger.exception("Failed to invoke LLM model, model: %s", model_config.get("name")) - return {"output": "", "error": f"An unexpected error occurred: {str(e)}"} + return result @staticmethod def instruction_modify_legacy( - tenant_id: str, flow_id: str, current: str, instruction: str, model_config: dict, ideal_output: str | None + tenant_id: str, + flow_id: str, + current: str, + instruction: str, + model_config: dict, + ideal_output: str | None, + user_id: str | None = None, + app_id: str | None = None, ): last_run: Message | None = ( db.session.query(Message).where(Message.app_id == flow_id).order_by(Message.created_at.desc()).first() ) if not last_run: - return LLMGenerator.__instruction_modify_common( + result = LLMGenerator.__instruction_modify_common( tenant_id=tenant_id, model_config=model_config, last_run=None, @@ -415,22 +499,28 @@ class LLMGenerator: instruction=instruction, node_type="llm", ideal_output=ideal_output, + user_id=user_id, + app_id=app_id, ) - last_run_dict = { - "query": last_run.query, - "answer": last_run.answer, - "error": last_run.error, - } - return LLMGenerator.__instruction_modify_common( - tenant_id=tenant_id, - model_config=model_config, - last_run=last_run_dict, - current=current, - error_message=str(last_run.error), - instruction=instruction, - node_type="llm", - ideal_output=ideal_output, - ) + else: + last_run_dict = { + "query": last_run.query, + "answer": last_run.answer, + "error": last_run.error, + } + result = LLMGenerator.__instruction_modify_common( + tenant_id=tenant_id, + model_config=model_config, + last_run=last_run_dict, + current=current, + error_message=str(last_run.error), + instruction=instruction, + node_type="llm", + ideal_output=ideal_output, + user_id=user_id, + app_id=app_id, + ) + return result @staticmethod def instruction_modify_workflow( @@ -442,6 +532,8 @@ class LLMGenerator: model_config: dict, ideal_output: str | None, workflow_service: WorkflowServiceInterface, + user_id: str | None = None, + app_id: str | None = None, ): session = db.session() @@ -472,6 +564,8 @@ class LLMGenerator: instruction=instruction, node_type=node_type, ideal_output=ideal_output, + user_id=user_id, + app_id=app_id, ) def agent_log_of(node_execution: WorkflowNodeExecutionModel) -> Sequence: @@ -505,6 +599,8 @@ class LLMGenerator: instruction=instruction, node_type=last_run.node_type, ideal_output=ideal_output, + user_id=user_id, + app_id=app_id, ) @staticmethod @@ -517,6 +613,8 @@ class LLMGenerator: instruction: str, node_type: str, ideal_output: str | None, + user_id: str | None = None, + app_id: str | None = None, ): LAST_RUN = "{{#last_run#}}" CURRENT = "{{#current#}}" @@ -556,26 +654,114 @@ class LLMGenerator: ] model_parameters = {"temperature": 0.4} - try: - response: LLMResult = model_instance.invoke_llm( - prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False + llm_result = None + error = None + result = {} + + with measure_time() as timer: + try: + llm_result = model_instance.invoke_llm( + prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False + ) + + generated_raw = llm_result.message.get_text_content() + first_brace = generated_raw.find("{") + last_brace = generated_raw.rfind("}") + if first_brace == -1 or last_brace == -1 or last_brace < first_brace: + raise ValueError(f"Could not find a valid JSON object in response: {generated_raw}") + json_str = generated_raw[first_brace : last_brace + 1] + data = json_repair.loads(json_str) + 
if not isinstance(data, dict): + raise TypeError(f"Expected a JSON object, but got {type(data).__name__}") + result = data + except InvokeError as e: + error = str(e) + result = {"error": f"Failed to generate code. Error: {error}"} + except Exception as e: + logger.exception( + "Failed to invoke LLM model, model: %s", json.dumps(model_config.get("name")), exc_info=True + ) + error = str(e) + result = {"error": f"An unexpected error occurred: {str(e)}"} + + if user_id: + generated_output = "" + if isinstance(result, dict): + for key in ["prompt", "code", "output", "modified"]: + if result.get(key): + generated_output = str(result[key]) + break + + LLMGenerator._emit_prompt_generation_trace( + tenant_id=tenant_id, + user_id=user_id, + app_id=app_id, + operation_type="instruction_modify", + instruction=instruction, + generated_output=generated_output, + llm_result=llm_result, + timer=timer, + error=error, ) - generated_raw = response.message.get_text_content() - first_brace = generated_raw.find("{") - last_brace = generated_raw.rfind("}") - if first_brace == -1 or last_brace == -1 or last_brace < first_brace: - raise ValueError(f"Could not find a valid JSON object in response: {generated_raw}") - json_str = generated_raw[first_brace : last_brace + 1] - data = json_repair.loads(json_str) - if not isinstance(data, dict): - raise TypeError(f"Expected a JSON object, but got {type(data).__name__}") - return data - except InvokeError as e: - error = str(e) - return {"error": f"Failed to generate code. Error: {error}"} - except Exception as e: - logger.exception( - "Failed to invoke LLM model, model: %s", json.dumps(model_config.get("name")), exc_info=True + return result + + @classmethod + def _emit_prompt_generation_trace( + cls, + tenant_id: str, + user_id: str, + app_id: str | None, + operation_type: str, + instruction: str, + generated_output: str, + llm_result: LLMResult | None, + timer, + error: str | None = None, + ): + if llm_result: + prompt_tokens = llm_result.usage.prompt_tokens + completion_tokens = llm_result.usage.completion_tokens + total_tokens = llm_result.usage.total_tokens + model_name = llm_result.model + model_provider = model_name.split("/")[0] if "/" in model_name else "" + latency = llm_result.usage.latency + total_price = float(llm_result.usage.total_price) if llm_result.usage.total_price else None + currency = llm_result.usage.currency + else: + prompt_tokens = 0 + completion_tokens = 0 + total_tokens = 0 + model_provider = "" + model_name = "" + latency = 0.0 + if timer: + start_time = timer.get("start") + end_time = timer.get("end") + if start_time and end_time: + latency = (end_time - start_time).total_seconds() + total_price = None + currency = None + + trace_manager = TraceQueueManager(app_id=app_id) + trace_manager.add_trace_task( + TraceTask( + TraceTaskName.PROMPT_GENERATION_TRACE, + tenant_id=tenant_id, + user_id=user_id, + app_id=app_id, + operation_type=operation_type, + instruction=instruction, + generated_output=generated_output, + prompt_tokens=prompt_tokens, + completion_tokens=completion_tokens, + total_tokens=total_tokens, + model_provider=model_provider, + model_name=model_name, + latency=latency, + total_price=total_price, + currency=currency, + timer=timer, + error=error, ) - return {"error": f"An unexpected error occurred: {str(e)}"} + ) diff --git a/api/core/ops/entities/trace_entity.py b/api/core/ops/entities/trace_entity.py index fa2ec3f21f..47074da8b1 100644 --- a/api/core/ops/entities/trace_entity.py +++ b/api/core/ops/entities/trace_entity.py @@ 
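The `_emit_prompt_generation_trace` helper above takes latency from the LLM usage object when the call succeeded, and falls back to the `measure_time()` timer when it raised. A self-contained sketch of that fallback, assuming the timer is a dict with datetime `start`/`end` keys, which is how the helper reads it:

```python
from datetime import datetime

def fallback_latency(timer: dict | None) -> float:
    # Mirrors the error path: no usage object exists, so reconstruct the
    # wall-clock duration from the timer captured around invoke_llm().
    latency = 0.0
    if timer:
        start_time = timer.get("start")
        end_time = timer.get("end")
        if start_time and end_time:
            latency = (end_time - start_time).total_seconds()
    return latency

timer = {"start": datetime(2024, 1, 1, 12, 0, 0), "end": datetime(2024, 1, 1, 12, 0, 2, 350000)}
print(fallback_latency(timer))  # 2.35
print(fallback_latency(None))   # 0.0 when no timer was captured
```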
diff --git a/api/core/ops/entities/trace_entity.py b/api/core/ops/entities/trace_entity.py
index fa2ec3f21f..47074da8b1 100644
--- a/api/core/ops/entities/trace_entity.py
+++ b/api/core/ops/entities/trace_entity.py
@@ -118,6 +118,33 @@ class GenerateNameTraceInfo(BaseTraceInfo):
     tenant_id: str
 
 
+class PromptGenerationTraceInfo(BaseTraceInfo):
+    """Trace information for prompt generation operations (rule-generate, code-generate, etc.)."""
+
+    tenant_id: str
+    user_id: str
+    app_id: str | None = None
+
+    operation_type: str
+    instruction: str
+
+    prompt_tokens: int
+    completion_tokens: int
+    total_tokens: int
+
+    model_provider: str
+    model_name: str
+
+    latency: float
+
+    total_price: float | None = None
+    currency: str | None = None
+
+    error: str | None = None
+
+    model_config = ConfigDict(protected_namespaces=())
+
+
 class WorkflowNodeTraceInfo(BaseTraceInfo):
     workflow_id: str
     workflow_run_id: str
@@ -178,6 +205,7 @@ trace_info_info_map = {
     "DatasetRetrievalTraceInfo": DatasetRetrievalTraceInfo,
     "ToolTraceInfo": ToolTraceInfo,
     "GenerateNameTraceInfo": GenerateNameTraceInfo,
+    "PromptGenerationTraceInfo": PromptGenerationTraceInfo,
     "WorkflowNodeTraceInfo": WorkflowNodeTraceInfo,
     "DraftNodeExecutionTrace": DraftNodeExecutionTrace,
 }
@@ -193,5 +221,6 @@ class TraceTaskName(StrEnum):
     DATASET_RETRIEVAL_TRACE = "dataset_retrieval"
     TOOL_TRACE = "tool"
     GENERATE_NAME_TRACE = "generate_conversation_name"
+    PROMPT_GENERATION_TRACE = "prompt_generation"
     DATASOURCE_TRACE = "datasource"
     NODE_EXECUTION_TRACE = "node_execution"
diff --git a/api/core/ops/ops_trace_manager.py b/api/core/ops/ops_trace_manager.py
index 7a876fb54f..2927fee1d2 100644
--- a/api/core/ops/ops_trace_manager.py
+++ b/api/core/ops/ops_trace_manager.py
@@ -25,6 +25,7 @@ from core.ops.entities.trace_entity import (
     GenerateNameTraceInfo,
     MessageTraceInfo,
     ModerationTraceInfo,
+    PromptGenerationTraceInfo,
     SuggestedQuestionTraceInfo,
     TaskData,
     ToolTraceInfo,
@@ -590,6 +591,8 @@ class TraceTask:
         self.app_id = None
         self.trace_id = None
         self.kwargs = kwargs
+        if user_id is not None and "user_id" not in self.kwargs:
+            self.kwargs["user_id"] = user_id
         external_trace_id = kwargs.get("external_trace_id")
         if external_trace_id:
             self.trace_id = external_trace_id
@@ -619,6 +622,7 @@ class TraceTask:
             TraceTaskName.GENERATE_NAME_TRACE: lambda: self.generate_name_trace(
                 conversation_id=self.conversation_id, timer=self.timer, **self.kwargs
             ),
+            TraceTaskName.PROMPT_GENERATION_TRACE: lambda: self.prompt_generation_trace(**self.kwargs),
             TraceTaskName.NODE_EXECUTION_TRACE: lambda: self.node_execution_trace(**self.kwargs),
             TraceTaskName.DRAFT_NODE_EXECUTION_TRACE: lambda: self.draft_node_execution_trace(**self.kwargs),
         }
@@ -1062,6 +1066,71 @@ class TraceTask:
 
         return generate_name_trace_info
 
+    def prompt_generation_trace(self, **kwargs) -> PromptGenerationTraceInfo | dict:
+        tenant_id = kwargs.get("tenant_id", "")
+        user_id = kwargs.get("user_id", "")
+        app_id = kwargs.get("app_id")
+        operation_type = kwargs.get("operation_type", "")
+        instruction = kwargs.get("instruction", "")
+        generated_output = kwargs.get("generated_output", "")
+
+        prompt_tokens = kwargs.get("prompt_tokens", 0)
+        completion_tokens = kwargs.get("completion_tokens", 0)
+        total_tokens = kwargs.get("total_tokens", 0)
+
+        model_provider = kwargs.get("model_provider", "")
+        model_name = kwargs.get("model_name", "")
+
+        latency = kwargs.get("latency", 0.0)
+
+        timer = kwargs.get("timer")
+        start_time = timer.get("start") if timer else None
+        end_time = timer.get("end") if timer else None
+
+        total_price = kwargs.get("total_price")
+        currency = kwargs.get("currency")
+
+        error = kwargs.get("error")
+
+        app_name = None
+        workspace_name = None
+        if app_id:
+            app_name, workspace_name = _lookup_app_and_workspace_names(app_id, tenant_id)
+
+        metadata = {
+            "tenant_id": tenant_id,
+            "user_id": user_id,
+            "app_id": app_id or "",
+            "app_name": app_name,
+            "workspace_name": workspace_name,
+            "operation_type": operation_type,
+            "model_provider": model_provider,
+            "model_name": model_name,
+        }
+
+        return PromptGenerationTraceInfo(
+            trace_id=self.trace_id,
+            inputs=instruction,
+            outputs=generated_output,
+            start_time=start_time,
+            end_time=end_time,
+            metadata=metadata,
+            tenant_id=tenant_id,
+            user_id=user_id,
+            app_id=app_id,
+            operation_type=operation_type,
+            instruction=instruction,
+            prompt_tokens=prompt_tokens,
+            completion_tokens=completion_tokens,
+            total_tokens=total_tokens,
+            model_provider=model_provider,
+            model_name=model_name,
+            latency=latency,
+            total_price=total_price,
+            currency=currency,
+            error=error,
+        )
+
     def node_execution_trace(self, **kwargs) -> WorkflowNodeTraceInfo | dict:
         node_data: dict = kwargs.get("node_execution_data", {})
         if not node_data:
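`prompt_generation_trace` above reads every field through `kwargs.get()` with a neutral default, so a `TraceTask` that was only partially populated still yields a well-formed info object instead of raising `KeyError`. A stripped-down sketch of that defensive pattern (hypothetical field subset):

```python
def build_trace_payload(**kwargs) -> dict:
    # Every lookup degrades to an empty/zero value; the timer is optional.
    timer = kwargs.get("timer")
    return {
        "tenant_id": kwargs.get("tenant_id", ""),
        "operation_type": kwargs.get("operation_type", ""),
        "prompt_tokens": kwargs.get("prompt_tokens", 0),
        "start_time": timer.get("start") if timer else None,
        "error": kwargs.get("error"),
    }

print(build_trace_payload(tenant_id="t-1", operation_type="code_generate"))
```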
diff --git a/api/enterprise/telemetry/enterprise_trace.py b/api/enterprise/telemetry/enterprise_trace.py
index a1d52e0441..f23f84637e 100644
--- a/api/enterprise/telemetry/enterprise_trace.py
+++ b/api/enterprise/telemetry/enterprise_trace.py
@@ -21,6 +21,7 @@ from core.ops.entities.trace_entity import (
     GenerateNameTraceInfo,
     MessageTraceInfo,
     ModerationTraceInfo,
+    PromptGenerationTraceInfo,
     SuggestedQuestionTraceInfo,
     ToolTraceInfo,
     WorkflowNodeTraceInfo,
@@ -70,6 +71,8 @@ class EnterpriseOtelTrace:
             self._dataset_retrieval_trace(trace_info)
         elif isinstance(trace_info, GenerateNameTraceInfo):
             self._generate_name_trace(trace_info)
+        elif isinstance(trace_info, PromptGenerationTraceInfo):
+            self._prompt_generation_trace(trace_info)
 
     def _common_attrs(self, trace_info: BaseTraceInfo) -> dict[str, Any]:
         return {
@@ -582,3 +585,74 @@ class EnterpriseOtelTrace:
 
         labels = {"tenant_id": info.tenant_id, "app_id": info.metadata.get("app_id", "")}
         self._exporter.increment_counter(EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": "generate_name"})
+
+    def _prompt_generation_trace(self, info: PromptGenerationTraceInfo) -> None:
+        attrs = {
+            "dify.trace_id": info.trace_id,
+            "dify.tenant_id": info.tenant_id,
+            "dify.user.id": info.user_id,
+            "dify.app.id": info.app_id or "",
+            "dify.app.name": info.metadata.get("app_name"),
+            "dify.workspace.name": info.metadata.get("workspace_name"),
+            "dify.operation.type": info.operation_type,
+            "gen_ai.provider.name": info.model_provider,
+            "gen_ai.request.model": info.model_name,
+            "gen_ai.usage.input_tokens": info.prompt_tokens,
+            "gen_ai.usage.output_tokens": info.completion_tokens,
+            "gen_ai.usage.total_tokens": info.total_tokens,
+            "dify.prompt_generation.latency": info.latency,
+            "dify.prompt_generation.error": info.error,
+        }
+
+        if info.total_price is not None:
+            attrs["dify.prompt_generation.total_price"] = info.total_price
+            attrs["dify.prompt_generation.currency"] = info.currency
+
+        if self._exporter.include_content:
+            attrs["dify.prompt_generation.instruction"] = info.instruction
+            attrs["dify.prompt_generation.output"] = self._maybe_json(info.outputs)
+        else:
+            ref = f"ref:trace_id={info.trace_id}"
+            attrs["dify.prompt_generation.instruction"] = ref
+            attrs["dify.prompt_generation.output"] = ref
+
+        emit_metric_only_event(
+            event_name="dify.prompt_generation.execution",
+            attributes=attrs,
+            tenant_id=info.tenant_id,
+            user_id=info.user_id,
+        )
+
+        labels: dict[str, Any] = {
+            "tenant_id": info.tenant_id,
+            "app_id": info.app_id or "",
+            "operation_type": info.operation_type,
+            "model_provider": info.model_provider,
+            "model_name": info.model_name,
+        }
+
+        self._exporter.increment_counter(EnterpriseTelemetryCounter.TOKENS, info.total_tokens, labels)
+        if info.prompt_tokens > 0:
+            self._exporter.increment_counter(EnterpriseTelemetryCounter.INPUT_TOKENS, info.prompt_tokens, labels)
+        if info.completion_tokens > 0:
+            self._exporter.increment_counter(EnterpriseTelemetryCounter.OUTPUT_TOKENS, info.completion_tokens, labels)
+
+        status = "failed" if info.error else "success"
+        self._exporter.increment_counter(
+            EnterpriseTelemetryCounter.REQUESTS,
+            1,
+            {**labels, "type": "prompt_generation", "status": status},
+        )
+
+        self._exporter.record_histogram(
+            EnterpriseTelemetryHistogram.PROMPT_GENERATION_DURATION,
+            info.latency,
+            labels,
+        )
+
+        if info.error:
+            self._exporter.increment_counter(
+                EnterpriseTelemetryCounter.ERRORS,
+                1,
+                {**labels, "type": "prompt_generation"},
+            )
diff --git a/api/enterprise/telemetry/entities/__init__.py b/api/enterprise/telemetry/entities/__init__.py
index c7fd99bd01..c0a5499c6c 100644
--- a/api/enterprise/telemetry/entities/__init__.py
+++ b/api/enterprise/telemetry/entities/__init__.py
@@ -23,6 +23,7 @@ class EnterpriseTelemetryHistogram(StrEnum):
     MESSAGE_DURATION = "message_duration"
     MESSAGE_TTFT = "message_ttft"
     TOOL_DURATION = "tool_duration"
+    PROMPT_GENERATION_DURATION = "prompt_generation_duration"
 
 
 __all__ = [
diff --git a/api/enterprise/telemetry/exporter.py b/api/enterprise/telemetry/exporter.py
index cbbdbb84bf..529c38741a 100644
--- a/api/enterprise/telemetry/exporter.py
+++ b/api/enterprise/telemetry/exporter.py
@@ -150,6 +150,9 @@ class EnterpriseExporter:
                 "dify.message.time_to_first_token", unit="s"
             ),
             EnterpriseTelemetryHistogram.TOOL_DURATION: meter.create_histogram("dify.tool.duration", unit="s"),
+            EnterpriseTelemetryHistogram.PROMPT_GENERATION_DURATION: meter.create_histogram(
+                "dify.prompt_generation.duration", unit="s"
+            ),
         }
 
     def export_span(
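For reference, a self-contained sketch of the metric bookkeeping `_prompt_generation_trace` performs per trace: one requests-counter increment tagged success/failed, token counters guarded against zero values, and a duration-histogram sample. The in-memory exporter stand-in below is hypothetical, for illustration only:

```python
from collections import Counter

counters: Counter = Counter()
histograms: dict[str, list[float]] = {}

def record_prompt_generation(tenant_id: str, operation_type: str,
                             total_tokens: int, latency: float,
                             error: str | None) -> None:
    labels = (tenant_id, operation_type)
    status = "failed" if error else "success"
    counters[("requests", *labels, status)] += 1
    if total_tokens > 0:  # token counters are only bumped for non-zero usage
        counters[("tokens", *labels)] += total_tokens
    histograms.setdefault("prompt_generation_duration", []).append(latency)
    if error:
        counters[("errors", *labels)] += 1

record_prompt_generation("t-1", "rule_generate", 512, 1.8, None)
record_prompt_generation("t-1", "code_generate", 0, 0.4, "timeout")
print(counters, histograms)
```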