mirror of https://github.com/langgenius/dify.git
Merge branch '1.11.4-otel-telemetry-ee' into deploy/enterprise
commit 1ca7d69197
@@ -79,7 +79,7 @@ class RuleGenerateApi(Resource):
     @account_initialization_required
     def post(self):
         args = RuleGeneratePayload.model_validate(console_ns.payload)
-        _, current_tenant_id = current_account_with_tenant()
+        account, current_tenant_id = current_account_with_tenant()

         try:
             rules = LLMGenerator.generate_rule_config(
@@ -87,6 +87,8 @@ class RuleGenerateApi(Resource):
                 instruction=args.instruction,
                 model_config=args.model_config_data,
                 no_variable=args.no_variable,
+                user_id=account.id,
+                app_id=None,
             )
         except ProviderTokenNotInitError as ex:
             raise ProviderNotInitializeError(ex.description)
@@ -113,7 +115,7 @@ class RuleCodeGenerateApi(Resource):
     @account_initialization_required
     def post(self):
         args = RuleCodeGeneratePayload.model_validate(console_ns.payload)
-        _, current_tenant_id = current_account_with_tenant()
+        account, current_tenant_id = current_account_with_tenant()

         try:
             code_result = LLMGenerator.generate_code(
@@ -121,6 +123,8 @@ class RuleCodeGenerateApi(Resource):
                 instruction=args.instruction,
                 model_config=args.model_config_data,
                 code_language=args.code_language,
+                user_id=account.id,
+                app_id=None,
             )
         except ProviderTokenNotInitError as ex:
             raise ProviderNotInitializeError(ex.description)
@@ -147,13 +151,15 @@ class RuleStructuredOutputGenerateApi(Resource):
     @account_initialization_required
     def post(self):
         args = RuleStructuredOutputPayload.model_validate(console_ns.payload)
-        _, current_tenant_id = current_account_with_tenant()
+        account, current_tenant_id = current_account_with_tenant()

         try:
             structured_output = LLMGenerator.generate_structured_output(
                 tenant_id=current_tenant_id,
                 instruction=args.instruction,
                 model_config=args.model_config_data,
+                user_id=account.id,
+                app_id=None,
             )
         except ProviderTokenNotInitError as ex:
             raise ProviderNotInitializeError(ex.description)
@@ -180,14 +186,13 @@ class InstructionGenerateApi(Resource):
     @account_initialization_required
     def post(self):
         args = InstructionGeneratePayload.model_validate(console_ns.payload)
-        _, current_tenant_id = current_account_with_tenant()
+        account, current_tenant_id = current_account_with_tenant()
         providers: list[type[CodeNodeProvider]] = [Python3CodeProvider, JavascriptCodeProvider]
         code_provider: type[CodeNodeProvider] | None = next(
             (p for p in providers if p.is_accept_language(args.language)), None
         )
         code_template = code_provider.get_default_code() if code_provider else ""
         try:
             # Generate from nothing for a workflow node
             if (args.current in (code_template, "")) and args.node_id != "":
                 app = db.session.query(App).where(App.id == args.flow_id).first()
                 if not app:
@@ -207,6 +212,8 @@ class InstructionGenerateApi(Resource):
                             instruction=args.instruction,
                             model_config=args.model_config_data,
                             no_variable=True,
+                            user_id=account.id,
+                            app_id=args.flow_id,
                         )
                     case "agent":
                         return LLMGenerator.generate_rule_config(
@@ -214,6 +221,8 @@ class InstructionGenerateApi(Resource):
                             instruction=args.instruction,
                             model_config=args.model_config_data,
                             no_variable=True,
+                            user_id=account.id,
+                            app_id=args.flow_id,
                         )
                     case "code":
                         return LLMGenerator.generate_code(
@@ -221,10 +230,12 @@ class InstructionGenerateApi(Resource):
                             instruction=args.instruction,
                             model_config=args.model_config_data,
                             code_language=args.language,
+                            user_id=account.id,
+                            app_id=args.flow_id,
                         )
                     case _:
                         return {"error": f"invalid node type: {node_type}"}
-            if args.node_id == "" and args.current != "":  # For legacy app without a workflow
+            if args.node_id == "" and args.current != "":
                 return LLMGenerator.instruction_modify_legacy(
                     tenant_id=current_tenant_id,
                     flow_id=args.flow_id,
@@ -232,8 +243,10 @@ class InstructionGenerateApi(Resource):
                     instruction=args.instruction,
                     model_config=args.model_config_data,
                     ideal_output=args.ideal_output,
+                    user_id=account.id,
+                    app_id=args.flow_id,
                 )
-            if args.node_id != "" and args.current != "":  # For workflow node
+            if args.node_id != "" and args.current != "":
                 return LLMGenerator.instruction_modify_workflow(
                     tenant_id=current_tenant_id,
                     flow_id=args.flow_id,
@@ -243,6 +256,8 @@ class InstructionGenerateApi(Resource):
                     model_config=args.model_config_data,
                     ideal_output=args.ideal_output,
                     workflow_service=WorkflowService(),
+                    user_id=account.id,
+                    app_id=args.flow_id,
                 )
             return {"error": "incompatible parameters"}, 400
         except ProviderTokenNotInitError as ex:
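The controller hunks above all follow the same pattern: keep the acting account instead of discarding it, then forward a user id and an app id into the generator call so the generation can be traced. A minimal sketch of the new call shape, reusing only names that appear in this diff (the instruction and model_config values are purely illustrative, and the model_config key set is an assumption):

    account, current_tenant_id = current_account_with_tenant()
    rules = LLMGenerator.generate_rule_config(
        tenant_id=current_tenant_id,
        instruction="Write a prompt for a support bot",  # illustrative instruction
        model_config={"name": "gpt-4o", "completion_params": {}},  # assumed shape
        no_variable=False,
        user_id=account.id,  # new: attributes the generation to the acting account
        app_id=None,  # new: console-level generation has no app context
    )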
@@ -151,7 +151,15 @@ class LLMGenerator:
         return questions

     @classmethod
-    def generate_rule_config(cls, tenant_id: str, instruction: str, model_config: dict, no_variable: bool):
+    def generate_rule_config(
+        cls,
+        tenant_id: str,
+        instruction: str,
+        model_config: dict,
+        no_variable: bool,
+        user_id: str | None = None,
+        app_id: str | None = None,
+    ):
         output_parser = RuleConfigGeneratorOutputParser()

         error = ""
@@ -179,22 +187,40 @@ class LLMGenerator:
             model=model_config.get("name", ""),
         )

-        try:
-            response: LLMResult = model_instance.invoke_llm(
-                prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False
-            )
-
-            rule_config["prompt"] = cast(str, response.message.content)
-
-        except InvokeError as e:
-            error = str(e)
-            error_step = "generate rule config"
-        except Exception as e:
-            logger.exception("Failed to generate rule config, model: %s", model_config.get("name"))
-            rule_config["error"] = str(e)
+        llm_result = None
+        with measure_time() as timer:
+            try:
+                llm_result = model_instance.invoke_llm(
+                    prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False
+                )
+
+                rule_config["prompt"] = cast(str, llm_result.message.content)
+
+            except InvokeError as e:
+                error = str(e)
+                error_step = "generate rule config"
+            except Exception as e:
+                logger.exception("Failed to generate rule config, model: %s", model_config.get("name"))
+                rule_config["error"] = str(e)
+                error = str(e)

         rule_config["error"] = f"Failed to {error_step}. Error: {error}" if error else ""

+        if user_id:
+            prompt_value = rule_config.get("prompt", "")
+            generated_output = str(prompt_value) if prompt_value else ""
+            cls._emit_prompt_generation_trace(
+                tenant_id=tenant_id,
+                user_id=user_id,
+                app_id=app_id,
+                operation_type="rule_generate",
+                instruction=instruction,
+                generated_output=generated_output,
+                llm_result=llm_result,
+                timer=timer,
+                error=error or None,
+            )
+
         return rule_config

     # get rule config prompt, parameter and statement
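The new `with measure_time() as timer:` wrapper, together with the `timer.get("start")` / `timer.get("end")` reads in `_emit_prompt_generation_trace` further down, implies a small contract: the context manager yields a mapping whose start and end timestamps bracket the LLM call. A hypothetical sketch of that contract, not the actual Dify helper, just the shape this diff assumes:

    from contextlib import contextmanager
    from datetime import datetime, timezone

    @contextmanager
    def measure_time():
        # Yields a dict whose "start"/"end" datetimes bracket the wrapped block, so
        # callers can compute a latency fallback when no LLM usage data is available.
        timer = {"start": datetime.now(timezone.utc), "end": None}
        try:
            yield timer
        finally:
            timer["end"] = datetime.now(timezone.utc)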
@@ -286,7 +312,15 @@ class LLMGenerator:
         return rule_config

     @classmethod
-    def generate_code(cls, tenant_id: str, instruction: str, model_config: dict, code_language: str = "javascript"):
+    def generate_code(
+        cls,
+        tenant_id: str,
+        instruction: str,
+        model_config: dict,
+        code_language: str = "javascript",
+        user_id: str | None = None,
+        app_id: str | None = None,
+    ):
         if code_language == "python":
             prompt_template = PromptTemplateParser(PYTHON_CODE_GENERATOR_PROMPT_TEMPLATE)
         else:
@@ -310,22 +344,42 @@ class LLMGenerator:

         prompt_messages = [UserPromptMessage(content=prompt)]
         model_parameters = model_config.get("completion_params", {})
-        try:
-            response: LLMResult = model_instance.invoke_llm(
-                prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False
-            )
-
-            generated_code = cast(str, response.message.content)
-            return {"code": generated_code, "language": code_language, "error": ""}
-
-        except InvokeError as e:
-            error = str(e)
-            return {"code": "", "language": code_language, "error": f"Failed to generate code. Error: {error}"}
-        except Exception as e:
-            logger.exception(
-                "Failed to invoke LLM model, model: %s, language: %s", model_config.get("name"), code_language
-            )
-            return {"code": "", "language": code_language, "error": f"An unexpected error occurred: {str(e)}"}
+
+        llm_result = None
+        error = None
+        with measure_time() as timer:
+            try:
+                llm_result = model_instance.invoke_llm(
+                    prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False
+                )
+
+                generated_code = cast(str, llm_result.message.content)
+                result = {"code": generated_code, "language": code_language, "error": ""}
+
+            except InvokeError as e:
+                error = str(e)
+                result = {"code": "", "language": code_language, "error": f"Failed to generate code. Error: {error}"}
+            except Exception as e:
+                logger.exception(
+                    "Failed to invoke LLM model, model: %s, language: %s", model_config.get("name"), code_language
+                )
+                error = str(e)
+                result = {"code": "", "language": code_language, "error": f"An unexpected error occurred: {str(e)}"}
+
+        if user_id:
+            cls._emit_prompt_generation_trace(
+                tenant_id=tenant_id,
+                user_id=user_id,
+                app_id=app_id,
+                operation_type="code_generate",
+                instruction=instruction,
+                generated_output=result.get("code", ""),
+                llm_result=llm_result,
+                timer=timer,
+                error=error,
+            )
+
+        return result

     @classmethod
     def generate_qa_document(cls, tenant_id: str, query, document_language: str):
@@ -355,7 +409,9 @@ class LLMGenerator:
         return answer.strip()

     @classmethod
-    def generate_structured_output(cls, tenant_id: str, instruction: str, model_config: dict):
+    def generate_structured_output(
+        cls, tenant_id: str, instruction: str, model_config: dict, user_id: str | None = None, app_id: str | None = None
+    ):
         model_manager = ModelManager()
         model_instance = model_manager.get_model_instance(
             tenant_id=tenant_id,
@@ -370,43 +426,71 @@ class LLMGenerator:
         ]
         model_parameters = model_config.get("model_parameters", {})

-        try:
-            response: LLMResult = model_instance.invoke_llm(
-                prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False
-            )
-
-            raw_content = response.message.content
-
-            if not isinstance(raw_content, str):
-                raise ValueError(f"LLM response content must be a string, got: {type(raw_content)}")
-
-            try:
-                parsed_content = json.loads(raw_content)
-            except json.JSONDecodeError:
-                parsed_content = json_repair.loads(raw_content)
-
-            if not isinstance(parsed_content, dict | list):
-                raise ValueError(f"Failed to parse structured output from llm: {raw_content}")
-
-            generated_json_schema = json.dumps(parsed_content, indent=2, ensure_ascii=False)
-            return {"output": generated_json_schema, "error": ""}
-
-        except InvokeError as e:
-            error = str(e)
-            return {"output": "", "error": f"Failed to generate JSON Schema. Error: {error}"}
-        except Exception as e:
-            logger.exception("Failed to invoke LLM model, model: %s", model_config.get("name"))
-            return {"output": "", "error": f"An unexpected error occurred: {str(e)}"}
+        llm_result = None
+        error = None
+        result = {"output": "", "error": ""}
+
+        with measure_time() as timer:
+            try:
+                llm_result = model_instance.invoke_llm(
+                    prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False
+                )
+
+                raw_content = llm_result.message.content
+
+                if not isinstance(raw_content, str):
+                    raise ValueError(f"LLM response content must be a string, got: {type(raw_content)}")
+
+                try:
+                    parsed_content = json.loads(raw_content)
+                except json.JSONDecodeError:
+                    parsed_content = json_repair.loads(raw_content)
+
+                if not isinstance(parsed_content, dict | list):
+                    raise ValueError(f"Failed to parse structured output from llm: {raw_content}")
+
+                generated_json_schema = json.dumps(parsed_content, indent=2, ensure_ascii=False)
+                result = {"output": generated_json_schema, "error": ""}
+
+            except InvokeError as e:
+                error = str(e)
+                result = {"output": "", "error": f"Failed to generate JSON Schema. Error: {error}"}
+            except Exception as e:
+                logger.exception("Failed to invoke LLM model, model: %s", model_config.get("name"))
+                error = str(e)
+                result = {"output": "", "error": f"An unexpected error occurred: {str(e)}"}
+
+        if user_id:
+            cls._emit_prompt_generation_trace(
+                tenant_id=tenant_id,
+                user_id=user_id,
+                app_id=app_id,
+                operation_type="structured_output",
+                instruction=instruction,
+                generated_output=result.get("output", ""),
+                llm_result=llm_result,
+                timer=timer,
+                error=error,
+            )
+
+        return result

     @staticmethod
     def instruction_modify_legacy(
-        tenant_id: str, flow_id: str, current: str, instruction: str, model_config: dict, ideal_output: str | None
+        tenant_id: str,
+        flow_id: str,
+        current: str,
+        instruction: str,
+        model_config: dict,
+        ideal_output: str | None,
+        user_id: str | None = None,
+        app_id: str | None = None,
    ):
         last_run: Message | None = (
             db.session.query(Message).where(Message.app_id == flow_id).order_by(Message.created_at.desc()).first()
         )
         if not last_run:
-            return LLMGenerator.__instruction_modify_common(
+            result = LLMGenerator.__instruction_modify_common(
                 tenant_id=tenant_id,
                 model_config=model_config,
                 last_run=None,
@@ -415,22 +499,28 @@ class LLMGenerator:
                 instruction=instruction,
                 node_type="llm",
                 ideal_output=ideal_output,
+                user_id=user_id,
+                app_id=app_id,
             )
-        last_run_dict = {
-            "query": last_run.query,
-            "answer": last_run.answer,
-            "error": last_run.error,
-        }
-        return LLMGenerator.__instruction_modify_common(
-            tenant_id=tenant_id,
-            model_config=model_config,
-            last_run=last_run_dict,
-            current=current,
-            error_message=str(last_run.error),
-            instruction=instruction,
-            node_type="llm",
-            ideal_output=ideal_output,
-        )
+        else:
+            last_run_dict = {
+                "query": last_run.query,
+                "answer": last_run.answer,
+                "error": last_run.error,
+            }
+            result = LLMGenerator.__instruction_modify_common(
+                tenant_id=tenant_id,
+                model_config=model_config,
+                last_run=last_run_dict,
+                current=current,
+                error_message=str(last_run.error),
+                instruction=instruction,
+                node_type="llm",
+                ideal_output=ideal_output,
+                user_id=user_id,
+                app_id=app_id,
+            )
+        return result

     @staticmethod
     def instruction_modify_workflow(
@@ -442,6 +532,8 @@ class LLMGenerator:
         model_config: dict,
         ideal_output: str | None,
         workflow_service: WorkflowServiceInterface,
+        user_id: str | None = None,
+        app_id: str | None = None,
     ):
         session = db.session()

@@ -472,6 +564,8 @@ class LLMGenerator:
                 instruction=instruction,
                 node_type=node_type,
                 ideal_output=ideal_output,
+                user_id=user_id,
+                app_id=app_id,
             )

         def agent_log_of(node_execution: WorkflowNodeExecutionModel) -> Sequence:
@@ -505,6 +599,8 @@ class LLMGenerator:
             instruction=instruction,
             node_type=last_run.node_type,
             ideal_output=ideal_output,
+            user_id=user_id,
+            app_id=app_id,
         )

     @staticmethod
@@ -517,6 +613,8 @@ class LLMGenerator:
         instruction: str,
         node_type: str,
         ideal_output: str | None,
+        user_id: str | None = None,
+        app_id: str | None = None,
     ):
         LAST_RUN = "{{#last_run#}}"
         CURRENT = "{{#current#}}"
@@ -556,26 +654,114 @@ class LLMGenerator:
         ]
         model_parameters = {"temperature": 0.4}

-        try:
-            response: LLMResult = model_instance.invoke_llm(
-                prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False
-            )
-
-            generated_raw = response.message.get_text_content()
-            first_brace = generated_raw.find("{")
-            last_brace = generated_raw.rfind("}")
-            if first_brace == -1 or last_brace == -1 or last_brace < first_brace:
-                raise ValueError(f"Could not find a valid JSON object in response: {generated_raw}")
-            json_str = generated_raw[first_brace : last_brace + 1]
-            data = json_repair.loads(json_str)
-            if not isinstance(data, dict):
-                raise TypeError(f"Expected a JSON object, but got {type(data).__name__}")
-            return data
-        except InvokeError as e:
-            error = str(e)
-            return {"error": f"Failed to generate code. Error: {error}"}
-        except Exception as e:
-            logger.exception(
-                "Failed to invoke LLM model, model: %s", json.dumps(model_config.get("name")), exc_info=True
-            )
-            return {"error": f"An unexpected error occurred: {str(e)}"}
+        llm_result = None
+        error = None
+        result = {}
+
+        with measure_time() as timer:
+            try:
+                llm_result = model_instance.invoke_llm(
+                    prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False
+                )
+
+                generated_raw = llm_result.message.get_text_content()
+                first_brace = generated_raw.find("{")
+                last_brace = generated_raw.rfind("}")
+                if first_brace == -1 or last_brace == -1 or last_brace < first_brace:
+                    raise ValueError(f"Could not find a valid JSON object in response: {generated_raw}")
+                json_str = generated_raw[first_brace : last_brace + 1]
+                data = json_repair.loads(json_str)
+                if not isinstance(data, dict):
+                    raise TypeError(f"Expected a JSON object, but got {type(data).__name__}")
+                result = data
+            except InvokeError as e:
+                error = str(e)
+                result = {"error": f"Failed to generate code. Error: {error}"}
+            except Exception as e:
+                logger.exception(
+                    "Failed to invoke LLM model, model: %s", json.dumps(model_config.get("name")), exc_info=True
+                )
+                error = str(e)
+                result = {"error": f"An unexpected error occurred: {str(e)}"}
+
+        if user_id:
+            generated_output = ""
+            if isinstance(result, dict):
+                for key in ["prompt", "code", "output", "modified"]:
+                    if result.get(key):
+                        generated_output = str(result[key])
+                        break
+
+            LLMGenerator._emit_prompt_generation_trace(
+                tenant_id=tenant_id,
+                user_id=user_id,
+                app_id=app_id,
+                operation_type="instruction_modify",
+                instruction=instruction,
+                generated_output=generated_output,
+                llm_result=llm_result,
+                timer=timer,
+                error=error,
+            )
+
+        return result
+
+    @classmethod
+    def _emit_prompt_generation_trace(
+        cls,
+        tenant_id: str,
+        user_id: str,
+        app_id: str | None,
+        operation_type: str,
+        instruction: str,
+        generated_output: str,
+        llm_result: LLMResult | None,
+        timer,
+        error: str | None = None,
+    ):
+        if llm_result:
+            prompt_tokens = llm_result.usage.prompt_tokens
+            completion_tokens = llm_result.usage.completion_tokens
+            total_tokens = llm_result.usage.total_tokens
+            model_name = llm_result.model
+            model_provider = model_name.split("/")[0] if "/" in model_name else ""
+            latency = llm_result.usage.latency
+            total_price = float(llm_result.usage.total_price) if llm_result.usage.total_price else None
+            currency = llm_result.usage.currency
+        else:
+            prompt_tokens = 0
+            completion_tokens = 0
+            total_tokens = 0
+            model_provider = ""
+            model_name = ""
+            latency = 0.0
+            if timer:
+                start_time = timer.get("start")
+                end_time = timer.get("end")
+                if start_time and end_time:
+                    latency = (end_time - start_time).total_seconds()
+            total_price = None
+            currency = None
+
+        trace_manager = TraceQueueManager(app_id=app_id)
+        trace_manager.add_trace_task(
+            TraceTask(
+                TraceTaskName.PROMPT_GENERATION_TRACE,
+                tenant_id=tenant_id,
+                user_id=user_id,
+                app_id=app_id,
+                operation_type=operation_type,
+                instruction=instruction,
+                generated_output=generated_output,
+                prompt_tokens=prompt_tokens,
+                completion_tokens=completion_tokens,
+                total_tokens=total_tokens,
+                model_provider=model_provider,
+                model_name=model_name,
+                latency=latency,
+                total_price=total_price,
+                currency=currency,
+                timer=timer,
+                error=error,
+            )
+        )
@@ -118,6 +118,33 @@ class GenerateNameTraceInfo(BaseTraceInfo):
     tenant_id: str


+class PromptGenerationTraceInfo(BaseTraceInfo):
+    """Trace information for prompt generation operations (rule-generate, code-generate, etc.)."""
+
+    tenant_id: str
+    user_id: str
+    app_id: str | None = None
+
+    operation_type: str
+    instruction: str
+
+    prompt_tokens: int
+    completion_tokens: int
+    total_tokens: int
+
+    model_provider: str
+    model_name: str
+
+    latency: float
+
+    total_price: float | None = None
+    currency: str | None = None
+
+    error: str | None = None
+
+    model_config = ConfigDict(protected_namespaces=())
+
+
 class WorkflowNodeTraceInfo(BaseTraceInfo):
     workflow_id: str
     workflow_run_id: str
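For reference, constructing the new entity directly mirrors the call that `TraceTask.prompt_generation_trace` makes later in this diff; the field names come from the model above and from that constructor call (the `trace_id`, `inputs`, `outputs`, `start_time`, `end_time`, and `metadata` fields belong to `BaseTraceInfo`), while the concrete values here are made up for illustration:

    info = PromptGenerationTraceInfo(
        trace_id="trace-123",
        inputs="Generate a prompt for a support bot",
        outputs="You are a helpful support agent...",
        start_time=None,
        end_time=None,
        metadata={"operation_type": "rule_generate"},
        tenant_id="tenant-1",
        user_id="user-1",
        app_id=None,
        operation_type="rule_generate",
        instruction="Generate a prompt for a support bot",
        prompt_tokens=120,
        completion_tokens=80,
        total_tokens=200,
        model_provider="openai",
        model_name="openai/gpt-4o",
        latency=1.8,
    )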
@@ -178,6 +205,7 @@ trace_info_info_map = {
     "DatasetRetrievalTraceInfo": DatasetRetrievalTraceInfo,
     "ToolTraceInfo": ToolTraceInfo,
     "GenerateNameTraceInfo": GenerateNameTraceInfo,
+    "PromptGenerationTraceInfo": PromptGenerationTraceInfo,
     "WorkflowNodeTraceInfo": WorkflowNodeTraceInfo,
     "DraftNodeExecutionTrace": DraftNodeExecutionTrace,
 }
@@ -193,5 +221,6 @@ class TraceTaskName(StrEnum):
     DATASET_RETRIEVAL_TRACE = "dataset_retrieval"
     TOOL_TRACE = "tool"
     GENERATE_NAME_TRACE = "generate_conversation_name"
+    PROMPT_GENERATION_TRACE = "prompt_generation"
     DATASOURCE_TRACE = "datasource"
     NODE_EXECUTION_TRACE = "node_execution"
@@ -25,6 +25,7 @@ from core.ops.entities.trace_entity import (
     GenerateNameTraceInfo,
     MessageTraceInfo,
     ModerationTraceInfo,
+    PromptGenerationTraceInfo,
     SuggestedQuestionTraceInfo,
     TaskData,
     ToolTraceInfo,
@@ -590,6 +591,8 @@ class TraceTask:
         self.app_id = None
         self.trace_id = None
         self.kwargs = kwargs
+        if user_id is not None and "user_id" not in self.kwargs:
+            self.kwargs["user_id"] = user_id
         external_trace_id = kwargs.get("external_trace_id")
         if external_trace_id:
             self.trace_id = external_trace_id
@@ -619,6 +622,7 @@ class TraceTask:
             TraceTaskName.GENERATE_NAME_TRACE: lambda: self.generate_name_trace(
                 conversation_id=self.conversation_id, timer=self.timer, **self.kwargs
             ),
+            TraceTaskName.PROMPT_GENERATION_TRACE: lambda: self.prompt_generation_trace(**self.kwargs),
             TraceTaskName.NODE_EXECUTION_TRACE: lambda: self.node_execution_trace(**self.kwargs),
             TraceTaskName.DRAFT_NODE_EXECUTION_TRACE: lambda: self.draft_node_execution_trace(**self.kwargs),
         }
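The new dictionary entry above is the dispatch point: `_emit_prompt_generation_trace` enqueues a `TraceTask(TraceTaskName.PROMPT_GENERATION_TRACE, ...)`, its keyword arguments land in `self.kwargs`, and the lambda forwards them to `prompt_generation_trace(**self.kwargs)`. A condensed sketch of that producer side, with illustrative values only (both call sites appear elsewhere in this diff):

    trace_manager = TraceQueueManager(app_id=None)
    trace_manager.add_trace_task(
        TraceTask(
            TraceTaskName.PROMPT_GENERATION_TRACE,
            tenant_id="tenant-1",
            user_id="user-1",
            app_id=None,
            operation_type="rule_generate",
            instruction="Generate a prompt for a support bot",
            generated_output="You are a helpful support agent...",
            prompt_tokens=120,
            completion_tokens=80,
            total_tokens=200,
            model_provider="openai",
            model_name="openai/gpt-4o",
            latency=1.8,
        )
    )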
@@ -1062,6 +1066,71 @@ class TraceTask:

         return generate_name_trace_info

+    def prompt_generation_trace(self, **kwargs) -> PromptGenerationTraceInfo | dict:
+        tenant_id = kwargs.get("tenant_id", "")
+        user_id = kwargs.get("user_id", "")
+        app_id = kwargs.get("app_id")
+        operation_type = kwargs.get("operation_type", "")
+        instruction = kwargs.get("instruction", "")
+        generated_output = kwargs.get("generated_output", "")
+
+        prompt_tokens = kwargs.get("prompt_tokens", 0)
+        completion_tokens = kwargs.get("completion_tokens", 0)
+        total_tokens = kwargs.get("total_tokens", 0)
+
+        model_provider = kwargs.get("model_provider", "")
+        model_name = kwargs.get("model_name", "")
+
+        latency = kwargs.get("latency", 0.0)
+
+        timer = kwargs.get("timer")
+        start_time = timer.get("start") if timer else None
+        end_time = timer.get("end") if timer else None
+
+        total_price = kwargs.get("total_price")
+        currency = kwargs.get("currency")
+
+        error = kwargs.get("error")
+
+        app_name = None
+        workspace_name = None
+        if app_id:
+            app_name, workspace_name = _lookup_app_and_workspace_names(app_id, tenant_id)
+
+        metadata = {
+            "tenant_id": tenant_id,
+            "user_id": user_id,
+            "app_id": app_id or "",
+            "app_name": app_name,
+            "workspace_name": workspace_name,
+            "operation_type": operation_type,
+            "model_provider": model_provider,
+            "model_name": model_name,
+        }
+
+        return PromptGenerationTraceInfo(
+            trace_id=self.trace_id,
+            inputs=instruction,
+            outputs=generated_output,
+            start_time=start_time,
+            end_time=end_time,
+            metadata=metadata,
+            tenant_id=tenant_id,
+            user_id=user_id,
+            app_id=app_id,
+            operation_type=operation_type,
+            instruction=instruction,
+            prompt_tokens=prompt_tokens,
+            completion_tokens=completion_tokens,
+            total_tokens=total_tokens,
+            model_provider=model_provider,
+            model_name=model_name,
+            latency=latency,
+            total_price=total_price,
+            currency=currency,
+            error=error,
+        )
+
     def node_execution_trace(self, **kwargs) -> WorkflowNodeTraceInfo | dict:
         node_data: dict = kwargs.get("node_execution_data", {})
         if not node_data:
@@ -21,6 +21,7 @@ from core.ops.entities.trace_entity import (
     GenerateNameTraceInfo,
     MessageTraceInfo,
     ModerationTraceInfo,
+    PromptGenerationTraceInfo,
     SuggestedQuestionTraceInfo,
     ToolTraceInfo,
     WorkflowNodeTraceInfo,
@@ -70,6 +71,8 @@ class EnterpriseOtelTrace:
             self._dataset_retrieval_trace(trace_info)
         elif isinstance(trace_info, GenerateNameTraceInfo):
             self._generate_name_trace(trace_info)
+        elif isinstance(trace_info, PromptGenerationTraceInfo):
+            self._prompt_generation_trace(trace_info)

     def _common_attrs(self, trace_info: BaseTraceInfo) -> dict[str, Any]:
         return {
@@ -582,3 +585,74 @@ class EnterpriseOtelTrace:

         labels = {"tenant_id": info.tenant_id, "app_id": info.metadata.get("app_id", "")}
         self._exporter.increment_counter(EnterpriseTelemetryCounter.REQUESTS, 1, {**labels, "type": "generate_name"})
+
+    def _prompt_generation_trace(self, info: PromptGenerationTraceInfo) -> None:
+        attrs = {
+            "dify.trace_id": info.trace_id,
+            "dify.tenant_id": info.tenant_id,
+            "dify.user.id": info.user_id,
+            "dify.app.id": info.app_id or "",
+            "dify.app.name": info.metadata.get("app_name"),
+            "dify.workspace.name": info.metadata.get("workspace_name"),
+            "dify.operation.type": info.operation_type,
+            "gen_ai.provider.name": info.model_provider,
+            "gen_ai.request.model": info.model_name,
+            "gen_ai.usage.input_tokens": info.prompt_tokens,
+            "gen_ai.usage.output_tokens": info.completion_tokens,
+            "gen_ai.usage.total_tokens": info.total_tokens,
+            "dify.prompt_generation.latency": info.latency,
+            "dify.prompt_generation.error": info.error,
+        }
+
+        if info.total_price is not None:
+            attrs["dify.prompt_generation.total_price"] = info.total_price
+            attrs["dify.prompt_generation.currency"] = info.currency
+
+        if self._exporter.include_content:
+            attrs["dify.prompt_generation.instruction"] = info.instruction
+            attrs["dify.prompt_generation.output"] = self._maybe_json(info.outputs)
+        else:
+            ref = f"ref:trace_id={info.trace_id}"
+            attrs["dify.prompt_generation.instruction"] = ref
+            attrs["dify.prompt_generation.output"] = ref
+
+        emit_metric_only_event(
+            event_name="dify.prompt_generation.execution",
+            attributes=attrs,
+            tenant_id=info.tenant_id,
+            user_id=info.user_id,
+        )
+
+        labels: dict[str, Any] = {
+            "tenant_id": info.tenant_id,
+            "app_id": info.app_id or "",
+            "operation_type": info.operation_type,
+            "model_provider": info.model_provider,
+            "model_name": info.model_name,
+        }
+
+        self._exporter.increment_counter(EnterpriseTelemetryCounter.TOKENS, info.total_tokens, labels)
+        if info.prompt_tokens > 0:
+            self._exporter.increment_counter(EnterpriseTelemetryCounter.INPUT_TOKENS, info.prompt_tokens, labels)
+        if info.completion_tokens > 0:
+            self._exporter.increment_counter(EnterpriseTelemetryCounter.OUTPUT_TOKENS, info.completion_tokens, labels)
+
+        status = "failed" if info.error else "success"
+        self._exporter.increment_counter(
+            EnterpriseTelemetryCounter.REQUESTS,
+            1,
+            {**labels, "type": "prompt_generation", "status": status},
+        )
+
+        self._exporter.record_histogram(
+            EnterpriseTelemetryHistogram.PROMPT_GENERATION_DURATION,
+            info.latency,
+            labels,
+        )
+
+        if info.error:
+            self._exporter.increment_counter(
+                EnterpriseTelemetryCounter.ERRORS,
+                1,
+                {**labels, "type": "prompt_generation"},
+            )
@@ -23,6 +23,7 @@ class EnterpriseTelemetryHistogram(StrEnum):
     MESSAGE_DURATION = "message_duration"
     MESSAGE_TTFT = "message_ttft"
     TOOL_DURATION = "tool_duration"
+    PROMPT_GENERATION_DURATION = "prompt_generation_duration"


 __all__ = [
@@ -150,6 +150,9 @@ class EnterpriseExporter:
                 "dify.message.time_to_first_token", unit="s"
             ),
             EnterpriseTelemetryHistogram.TOOL_DURATION: meter.create_histogram("dify.tool.duration", unit="s"),
+            EnterpriseTelemetryHistogram.PROMPT_GENERATION_DURATION: meter.create_histogram(
+                "dify.prompt_generation.duration", unit="s"
+            ),
         }

     def export_span(
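Taken together with `_prompt_generation_trace` above, the new histogram receives one duration sample per generation, keyed by the same labels as the token counters. A rough sketch of the recording call; the `exporter` instance name and label values are illustrative, while the enum member and method come from the hunks above:

    labels = {
        "tenant_id": "tenant-1",
        "app_id": "",
        "operation_type": "rule_generate",
        "model_provider": "openai",
        "model_name": "openai/gpt-4o",
    }
    exporter.record_histogram(EnterpriseTelemetryHistogram.PROMPT_GENERATION_DURATION, 1.8, labels)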