fix(telemetry): ensure CE safety for enterprise-only imports and DB lookups

- Move enqueue_draft_node_execution_trace import inside call site in workflow_service.py
- Make gateway.py enterprise type imports lazy (routing dicts built on first call)
- Restore typed ModelConfig in llm_generator method signatures (revert dict regression)
- Fix generate_structured_output reading the wrong key (model_parameters -> completion_params)
- Replace unsafe cast(str, msg.content) with get_text_content() across llm_generator
- Remove duplicated payload classes from generator.py, import from core.llm_generator.entities
- Gate _lookup_app_and_workspace_names and credential lookups in ops_trace_manager behind is_enterprise_telemetry_enabled()
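
Pattern note: every change above is the same shape — check the gate first, then defer the enterprise import to the call site so CE never evaluates it at module import time. A minimal runnable sketch (the gate stand-in and the enterprise_pkg names below are hypothetical; only is_enterprise_telemetry_enabled mirrors core.telemetry.gateway):

    # Sketch only: "enterprise_pkg" and lookup_app_and_workspace_names are
    # illustrative stand-ins, not this repo's real modules.
    import importlib.util

    def is_enterprise_telemetry_enabled() -> bool:
        # Stand-in gate: true only when the enterprise package is installed.
        return importlib.util.find_spec("enterprise_pkg") is not None

    def lookup_names(app_id: str) -> tuple[str, str]:
        if not is_enterprise_telemetry_enabled():
            return "", ""  # CE: skip both the import and the DB lookup
        # Deferred import: unreachable on CE, so module import stays safe.
        from enterprise_pkg.telemetry import lookup_app_and_workspace_names
        return lookup_app_and_workspace_names(app_id)
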
GareArc 2026-03-02 18:45:33 -08:00
parent d6de27a25a
commit 8a3485454a
8 changed files with 167 additions and 131 deletions

View File

@@ -1,5 +1,4 @@
 from collections.abc import Sequence
-from typing import Any

 from flask_restx import Resource
 from pydantic import BaseModel, Field
@@ -12,10 +11,12 @@ from controllers.console.app.error import (
     ProviderQuotaExceededError,
 )
 from controllers.console.wraps import account_initialization_required, setup_required
+from core.app.app_config.entities import ModelConfig
 from core.errors.error import ModelCurrentlyNotSupportError, ProviderTokenNotInitError, QuotaExceededError
 from core.helper.code_executor.code_node_provider import CodeNodeProvider
 from core.helper.code_executor.javascript.javascript_code_provider import JavascriptCodeProvider
 from core.helper.code_executor.python3.python3_code_provider import Python3CodeProvider
+from core.llm_generator.entities import RuleCodeGeneratePayload, RuleGeneratePayload, RuleStructuredOutputPayload
 from core.llm_generator.llm_generator import LLMGenerator
 from core.model_runtime.errors.invoke import InvokeError
 from extensions.ext_database import db
@@ -26,30 +27,13 @@ from services.workflow_service import WorkflowService
 DEFAULT_REF_TEMPLATE_SWAGGER_2_0 = "#/definitions/{model}"

-class RuleGeneratePayload(BaseModel):
-    instruction: str = Field(..., description="Rule generation instruction")
-    model_config_data: dict[str, Any] = Field(..., alias="model_config", description="Model configuration")
-    no_variable: bool = Field(default=False, description="Whether to exclude variables")
-    app_id: str | None = Field(default=None, description="App ID for prompt generation tracing")
-
-class RuleCodeGeneratePayload(RuleGeneratePayload):
-    code_language: str = Field(default="javascript", description="Programming language for code generation")
-
-class RuleStructuredOutputPayload(BaseModel):
-    instruction: str = Field(..., description="Structured output generation instruction")
-    model_config_data: dict[str, Any] = Field(..., alias="model_config", description="Model configuration")
-    app_id: str | None = Field(default=None, description="App ID for prompt generation tracing")
-
 class InstructionGeneratePayload(BaseModel):
     flow_id: str = Field(..., description="Workflow/Flow ID")
     node_id: str = Field(default="", description="Node ID for workflow context")
     current: str = Field(default="", description="Current instruction text")
     language: str = Field(default="javascript", description="Programming language (javascript/python)")
     instruction: str = Field(..., description="Instruction for generation")
-    model_config_data: dict[str, Any] = Field(..., alias="model_config", description="Model configuration")
+    model_config_data: ModelConfig = Field(..., alias="model_config", description="Model configuration")
     ideal_output: str = Field(default="", description="Expected ideal output")
     app_id: str | None = Field(default=None, description="App ID for prompt generation tracing")

View File

@@ -9,6 +9,7 @@ class RuleGeneratePayload(BaseModel):
     instruction: str = Field(..., description="Rule generation instruction")
     model_config_data: ModelConfig = Field(..., alias="model_config", description="Model configuration")
     no_variable: bool = Field(default=False, description="Whether to exclude variables")
+    app_id: str | None = Field(default=None, description="App ID for prompt generation tracing")

 class RuleCodeGeneratePayload(RuleGeneratePayload):
@@ -18,3 +19,4 @@ class RuleCodeGeneratePayload(RuleGeneratePayload):
 class RuleStructuredOutputPayload(BaseModel):
     instruction: str = Field(..., description="Structured output generation instruction")
     model_config_data: ModelConfig = Field(..., alias="model_config", description="Model configuration")
+    app_id: str | None = Field(default=None, description="App ID for prompt generation tracing")

View File

@@ -6,6 +6,7 @@ from typing import Protocol, cast
 import json_repair

+from core.app.app_config.entities import ModelConfig
 from core.llm_generator.output_parser.rule_config_generator import RuleConfigGeneratorOutputParser
 from core.llm_generator.output_parser.suggested_questions_after_answer import SuggestedQuestionsAfterAnswerOutputParser
 from core.llm_generator.prompts import (
@@ -72,7 +73,7 @@ class LLMGenerator:
         response: LLMResult = model_instance.invoke_llm(
             prompt_messages=list(prompts), model_parameters={"max_tokens": 500, "temperature": 1}, stream=False
         )
-        answer = cast(str, response.message.content)
+        answer = response.message.get_text_content()
         if answer is None:
             return ""
         try:
@@ -158,7 +159,7 @@ class LLMGenerator:
         cls,
         tenant_id: str,
         instruction: str,
-        model_config: dict,
+        model_config: ModelConfig,
         no_variable: bool,
         user_id: str | None = None,
         app_id: str | None = None,
@@ -168,7 +169,7 @@ class LLMGenerator:
         error = ""
         error_step = ""
         rule_config = {"prompt": "", "variables": [], "opening_statement": "", "error": ""}
-        model_parameters = model_config.get("completion_params", {})
+        model_parameters = model_config.completion_params

         if no_variable:
             prompt_template = PromptTemplateParser(WORKFLOW_RULE_CONFIG_PROMPT_GENERATE_TEMPLATE)
@@ -186,8 +187,8 @@ class LLMGenerator:
         model_instance = model_manager.get_model_instance(
             tenant_id=tenant_id,
             model_type=ModelType.LLM,
-            provider=model_config.get("provider", ""),
-            model=model_config.get("name", ""),
+            provider=model_config.provider,
+            model=model_config.name,
         )

         llm_result = None
@@ -197,13 +198,13 @@ class LLMGenerator:
                 prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False
             )

-            rule_config["prompt"] = cast(str, llm_result.message.content)
+            rule_config["prompt"] = llm_result.message.get_text_content() or ""

         except InvokeError as e:
             error = str(e)
             error_step = "generate rule config"
         except Exception as e:
-            logger.exception("Failed to generate rule config, model: %s", model_config.get("name"))
+            logger.exception("Failed to generate rule config, model: %s", model_config.name)
             rule_config["error"] = str(e)
             error = str(e)
@@ -250,8 +251,8 @@ class LLMGenerator:
         model_instance = model_manager.get_model_instance(
             tenant_id=tenant_id,
             model_type=ModelType.LLM,
-            provider=model_config.get("provider", ""),
-            model=model_config.get("name", ""),
+            provider=model_config.provider,
+            model=model_config.name,
         )

         llm_result = None
@@ -284,7 +285,7 @@ class LLMGenerator:
                 return rule_config

-            rule_config["prompt"] = cast(str, prompt_content.message.content)
+            rule_config["prompt"] = prompt_content.message.get_text_content() or ""

             if not isinstance(prompt_content.message.content, str):
                 raise NotImplementedError("prompt content is not a string")
@@ -311,7 +312,7 @@ class LLMGenerator:
                 prompt_messages=list(parameter_messages), model_parameters=model_parameters, stream=False
             )
             rule_config["variables"] = re.findall(
-                r'"\s*([^"]+)\s*"', cast(str, parameter_content.message.content)
+                r'"\s*([^"]+)\s*"', parameter_content.message.get_text_content() or ""
             )
         except InvokeError as e:
             error = str(e)
@@ -321,13 +322,13 @@ class LLMGenerator:
             statement_content: LLMResult = model_instance.invoke_llm(
                 prompt_messages=list(statement_messages), model_parameters=model_parameters, stream=False
             )
-            rule_config["opening_statement"] = cast(str, statement_content.message.content)
+            rule_config["opening_statement"] = statement_content.message.get_text_content() or ""

         except InvokeError as e:
             error = str(e)
             error_step = "generate conversation opener"
         except Exception as e:
-            logger.exception("Failed to generate rule config, model: %s", model_config.get("name"))
+            logger.exception("Failed to generate rule config, model: %s", model_config.name)
             rule_config["error"] = str(e)
             error = str(e)
@@ -355,7 +356,7 @@ class LLMGenerator:
         cls,
         tenant_id: str,
         instruction: str,
-        model_config: dict,
+        model_config: ModelConfig,
         code_language: str = "javascript",
         user_id: str | None = None,
         app_id: str | None = None,
@@ -377,12 +378,12 @@ class LLMGenerator:
         model_instance = model_manager.get_model_instance(
             tenant_id=tenant_id,
             model_type=ModelType.LLM,
-            provider=model_config.get("provider", ""),
-            model=model_config.get("name", ""),
+            provider=model_config.provider,
+            model=model_config.name,
         )

         prompt_messages = [UserPromptMessage(content=prompt)]
-        model_parameters = model_config.get("completion_params", {})
+        model_parameters = model_config.completion_params

         llm_result = None
         error = None
@@ -392,7 +393,7 @@ class LLMGenerator:
                 prompt_messages=list(prompt_messages), model_parameters=model_parameters, stream=False
             )

-            generated_code = cast(str, llm_result.message.content)
+            generated_code = llm_result.message.get_text_content() or ""
             result = {"code": generated_code, "language": code_language, "error": ""}

         except InvokeError as e:
@@ -400,7 +401,7 @@ class LLMGenerator:
             result = {"code": "", "language": code_language, "error": f"Failed to generate code. Error: {error}"}
         except Exception as e:
             logger.exception(
-                "Failed to invoke LLM model, model: %s, language: %s", model_config.get("name"), code_language
+                "Failed to invoke LLM model, model: %s, language: %s", model_config.name, code_language
             )
             error = str(e)
             result = {"code": "", "language": code_language, "error": f"An unexpected error occurred: {str(e)}"}
@@ -445,26 +446,31 @@ class LLMGenerator:
             raise TypeError("Expected LLMResult when stream=False")
         response = result

-        answer = cast(str, response.message.content)
+        answer = response.message.get_text_content() or ""
         return answer.strip()

     @classmethod
     def generate_structured_output(
-        cls, tenant_id: str, instruction: str, model_config: dict, user_id: str | None = None, app_id: str | None = None
+        cls,
+        tenant_id: str,
+        instruction: str,
+        model_config: ModelConfig,
+        user_id: str | None = None,
+        app_id: str | None = None,
     ):
         model_manager = ModelManager()
         model_instance = model_manager.get_model_instance(
             tenant_id=tenant_id,
             model_type=ModelType.LLM,
-            provider=model_config.get("provider", ""),
-            model=model_config.get("name", ""),
+            provider=model_config.provider,
+            model=model_config.name,
         )

         prompt_messages = [
             SystemPromptMessage(content=SYSTEM_STRUCTURED_OUTPUT_GENERATE),
             UserPromptMessage(content=instruction),
         ]
-        model_parameters = model_config.get("model_parameters", {})
+        model_parameters = model_config.completion_params

         llm_result = None
         error = None
@@ -496,7 +502,7 @@ class LLMGenerator:
             error = str(e)
             result = {"output": "", "error": f"Failed to generate JSON Schema. Error: {error}"}
         except Exception as e:
-            logger.exception("Failed to invoke LLM model, model: %s", model_config.get("name"))
+            logger.exception("Failed to invoke LLM model, model: %s", model_config.name)
             error = str(e)
             result = {"output": "", "error": f"An unexpected error occurred: {str(e)}"}
@@ -522,7 +528,7 @@ class LLMGenerator:
         flow_id: str,
         current: str,
         instruction: str,
-        model_config: dict,
+        model_config: ModelConfig,
         ideal_output: str | None,
         user_id: str | None = None,
         app_id: str | None = None,
@@ -570,7 +576,7 @@ class LLMGenerator:
         node_id: str,
         current: str,
         instruction: str,
-        model_config: dict,
+        model_config: ModelConfig,
         ideal_output: str | None,
         workflow_service: WorkflowServiceInterface,
         user_id: str | None = None,
@@ -647,7 +653,7 @@ class LLMGenerator:
     @staticmethod
     def __instruction_modify_common(
         tenant_id: str,
-        model_config: dict,
+        model_config: ModelConfig,
         last_run: dict | None,
         current: str | None,
         error_message: str | None,
@@ -670,8 +676,8 @@ class LLMGenerator:
         model_instance = ModelManager().get_model_instance(
             tenant_id=tenant_id,
             model_type=ModelType.LLM,
-            provider=model_config.get("provider", ""),
-            model=model_config.get("name", ""),
+            provider=model_config.provider,
+            model=model_config.name,
         )
         match node_type:
             case "llm" | "agent":
@@ -719,9 +725,7 @@ class LLMGenerator:
             error = str(e)
             result = {"error": f"Failed to generate code. Error: {error}"}
         except Exception as e:
-            logger.exception(
-                "Failed to invoke LLM model, model: %s", json.dumps(model_config.get("name")), exc_info=True
-            )
+            logger.exception("Failed to invoke LLM model, model: %s", json.dumps(model_config.name), exc_info=True)
             error = str(e)
             result = {"error": f"An unexpected error occurred: {str(e)}"}
@@ -758,7 +762,7 @@ class LLMGenerator:
         instruction: str,
         generated_output: str,
         llm_result: LLMResult | None,
-        model_config: dict | None = None,
+        model_config: ModelConfig | None = None,
         timer=None,
         error: str | None = None,
     ):
@@ -768,8 +772,8 @@ class LLMGenerator:
             total_tokens = llm_result.usage.total_tokens
             model_name = llm_result.model
             # Extract provider from model_config if available, otherwise fall back to parsing model name
-            if model_config and model_config.get("provider"):
-                model_provider = model_config.get("provider", "")
+            if model_config and model_config.provider:
+                model_provider = model_config.provider
             else:
                 model_provider = model_name.split("/")[0] if "/" in model_name else ""
             latency = llm_result.usage.latency
@@ -779,8 +783,8 @@ class LLMGenerator:
             prompt_tokens = 0
             completion_tokens = 0
             total_tokens = 0
-            model_provider = model_config.get("provider", "") if model_config else ""
-            model_name = model_config.get("name", "") if model_config else ""
+            model_provider = model_config.provider if model_config else ""
+            model_name = model_config.name if model_config else ""
             latency = 0.0
             if timer:
                 start_time = timer.get("start")

View File

@@ -39,8 +39,8 @@ from extensions.ext_storage import storage
 from models.account import Tenant
 from models.dataset import Dataset
 from models.model import App, AppModelConfig, Conversation, Message, MessageFile, TraceAppConfig
-from models.tools import ApiToolProvider, BuiltinToolProvider, MCPToolProvider, WorkflowToolProvider
 from models.provider import Provider, ProviderCredential, ProviderModel, ProviderModelCredential, ProviderType
+from models.tools import ApiToolProvider, BuiltinToolProvider, MCPToolProvider, WorkflowToolProvider
 from models.workflow import WorkflowAppLog
 from tasks.ops_trace_task import process_trace_tasks
@@ -153,9 +153,7 @@ def _lookup_llm_credential_info(
     else:
         # Query ProviderCredential
         cred_name = session.scalar(
-            select(ProviderCredential.credential_name).where(
-                ProviderCredential.id == credential_id
-            )
+            select(ProviderCredential.credential_name).where(ProviderCredential.id == credential_id)
         )

         if cred_name:
@@ -788,7 +786,12 @@ class TraceTask:
             )
             message_id = session.scalar(message_data_stmt)

-        app_name, workspace_name = _lookup_app_and_workspace_names(workflow_run.app_id, tenant_id)
+        from core.telemetry.gateway import is_enterprise_telemetry_enabled
+
+        if is_enterprise_telemetry_enabled():
+            app_name, workspace_name = _lookup_app_and_workspace_names(workflow_run.app_id, tenant_id)
+        else:
+            app_name, workspace_name = "", ""

         metadata: dict[str, Any] = {
             "workflow_id": workflow_id,
@@ -867,7 +870,12 @@ class TraceTask:
         if tid:
             tenant_id = str(tid)

-        app_name, workspace_name = _lookup_app_and_workspace_names(message_data.app_id, tenant_id)
+        from core.telemetry.gateway import is_enterprise_telemetry_enabled
+
+        if is_enterprise_telemetry_enabled():
+            app_name, workspace_name = _lookup_app_and_workspace_names(message_data.app_id, tenant_id)
+        else:
+            app_name, workspace_name = "", ""

         metadata = {
             "conversation_id": message_data.conversation_id,
@@ -904,7 +912,9 @@ class TraceTask:
             outputs=message_data.answer,
             file_list=file_list,
             start_time=created_at,
-            end_time=message_data.updated_at if message_data.updated_at and message_data.updated_at > created_at else created_at + timedelta(seconds=message_data.provider_response_latency),
+            end_time=message_data.updated_at
+            if message_data.updated_at and message_data.updated_at > created_at
+            else created_at + timedelta(seconds=message_data.provider_response_latency),
             metadata=metadata,
             message_file_data=message_file_data,
             conversation_mode=conversation_mode,
@@ -1019,7 +1029,12 @@ class TraceTask:
         if tid:
             tenant_id = str(tid)

-        app_name, workspace_name = _lookup_app_and_workspace_names(message_data.app_id, tenant_id)
+        from core.telemetry.gateway import is_enterprise_telemetry_enabled
+
+        if is_enterprise_telemetry_enabled():
+            app_name, workspace_name = _lookup_app_and_workspace_names(message_data.app_id, tenant_id)
+        else:
+            app_name, workspace_name = "", ""

         doc_list = [doc.model_dump() for doc in documents] if documents else []
         dataset_ids: set[str] = set()
@@ -1269,24 +1284,32 @@ class TraceTask:
         if not node_data:
             return {}

-        app_name, workspace_name = _lookup_app_and_workspace_names(node_data.get("app_id"), node_data.get("tenant_id"))
+        from core.telemetry.gateway import is_enterprise_telemetry_enabled
+
+        if is_enterprise_telemetry_enabled():
+            app_name, workspace_name = _lookup_app_and_workspace_names(
+                node_data.get("app_id"), node_data.get("tenant_id")
+            )
+        else:
+            app_name, workspace_name = "", ""

         # Try tool credential lookup first
         credential_id = node_data.get("credential_id")
-        credential_name = _lookup_credential_name(credential_id, node_data.get("credential_provider_type"))
-        # If no credential_id found (e.g., LLM nodes), try LLM credential lookup
-        if not credential_id:
-            llm_cred_id, llm_cred_name = _lookup_llm_credential_info(
-                tenant_id=node_data.get("tenant_id"),
-                provider=node_data.get("model_provider"),
-                model=node_data.get("model_name"),
-                model_type="llm",
-            )
-            if llm_cred_id:
-                credential_id = llm_cred_id
-                credential_name = llm_cred_name
+        if is_enterprise_telemetry_enabled():
+            credential_name = _lookup_credential_name(credential_id, node_data.get("credential_provider_type"))
+            # If no credential_id found (e.g., LLM nodes), try LLM credential lookup
+            if not credential_id:
+                llm_cred_id, llm_cred_name = _lookup_llm_credential_info(
+                    tenant_id=node_data.get("tenant_id"),
+                    provider=node_data.get("model_provider"),
+                    model=node_data.get("model_name"),
+                    model_type="llm",
+                )
+                if llm_cred_id:
+                    credential_id = llm_cred_id
+                    credential_name = llm_cred_name
+        else:
+            credential_name = ""

         metadata: dict[str, Any] = {
             "tenant_id": node_data.get("tenant_id"),
             "app_id": node_data.get("app_id"),

View File

@@ -18,11 +18,11 @@ import uuid
 from typing import TYPE_CHECKING, Any

 from core.ops.entities.trace_entity import TraceTaskName
-from enterprise.telemetry.contracts import CaseRoute, SignalType, TelemetryCase, TelemetryEnvelope
 from extensions.ext_storage import storage

 if TYPE_CHECKING:
     from core.ops.ops_trace_manager import TraceQueueManager
+    from enterprise.telemetry.contracts import TelemetryCase

 logger = logging.getLogger(__name__)
@@ -32,40 +32,56 @@ PAYLOAD_SIZE_THRESHOLD_BYTES = 1 * 1024 * 1024
 # Routing table — authoritative mapping for all editions
 # ---------------------------------------------------------------------------
-CASE_TO_TRACE_TASK: dict[TelemetryCase, TraceTaskName] = {
-    TelemetryCase.WORKFLOW_RUN: TraceTaskName.WORKFLOW_TRACE,
-    TelemetryCase.MESSAGE_RUN: TraceTaskName.MESSAGE_TRACE,
-    TelemetryCase.NODE_EXECUTION: TraceTaskName.NODE_EXECUTION_TRACE,
-    TelemetryCase.DRAFT_NODE_EXECUTION: TraceTaskName.DRAFT_NODE_EXECUTION_TRACE,
-    TelemetryCase.PROMPT_GENERATION: TraceTaskName.PROMPT_GENERATION_TRACE,
-    TelemetryCase.TOOL_EXECUTION: TraceTaskName.TOOL_TRACE,
-    TelemetryCase.MODERATION_CHECK: TraceTaskName.MODERATION_TRACE,
-    TelemetryCase.SUGGESTED_QUESTION: TraceTaskName.SUGGESTED_QUESTION_TRACE,
-    TelemetryCase.DATASET_RETRIEVAL: TraceTaskName.DATASET_RETRIEVAL_TRACE,
-    TelemetryCase.GENERATE_NAME: TraceTaskName.GENERATE_NAME_TRACE,
-}
-
-TRACE_TASK_TO_CASE: dict[TraceTaskName, TelemetryCase] = {v: k for k, v in CASE_TO_TRACE_TASK.items()}
-
-CASE_ROUTING: dict[TelemetryCase, CaseRoute] = {
-    # TRACE — CE-eligible (flow in both CE and EE)
-    TelemetryCase.WORKFLOW_RUN: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
-    TelemetryCase.MESSAGE_RUN: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
-    TelemetryCase.TOOL_EXECUTION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
-    TelemetryCase.MODERATION_CHECK: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
-    TelemetryCase.SUGGESTED_QUESTION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
-    TelemetryCase.DATASET_RETRIEVAL: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
-    TelemetryCase.GENERATE_NAME: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
-    # TRACE — enterprise-only
-    TelemetryCase.NODE_EXECUTION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=False),
-    TelemetryCase.DRAFT_NODE_EXECUTION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=False),
-    TelemetryCase.PROMPT_GENERATION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=False),
-    # METRIC_LOG — enterprise-only (signal-driven, not trace)
-    TelemetryCase.APP_CREATED: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False),
-    TelemetryCase.APP_UPDATED: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False),
-    TelemetryCase.APP_DELETED: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False),
-    TelemetryCase.FEEDBACK_CREATED: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False),
-}
+_case_to_trace_task: dict | None = None
+_case_routing: dict | None = None
+
+
+def _get_case_to_trace_task() -> dict:
+    global _case_to_trace_task
+    if _case_to_trace_task is None:
+        from enterprise.telemetry.contracts import TelemetryCase
+
+        _case_to_trace_task = {
+            TelemetryCase.WORKFLOW_RUN: TraceTaskName.WORKFLOW_TRACE,
+            TelemetryCase.MESSAGE_RUN: TraceTaskName.MESSAGE_TRACE,
+            TelemetryCase.NODE_EXECUTION: TraceTaskName.NODE_EXECUTION_TRACE,
+            TelemetryCase.DRAFT_NODE_EXECUTION: TraceTaskName.DRAFT_NODE_EXECUTION_TRACE,
+            TelemetryCase.PROMPT_GENERATION: TraceTaskName.PROMPT_GENERATION_TRACE,
+            TelemetryCase.TOOL_EXECUTION: TraceTaskName.TOOL_TRACE,
+            TelemetryCase.MODERATION_CHECK: TraceTaskName.MODERATION_TRACE,
+            TelemetryCase.SUGGESTED_QUESTION: TraceTaskName.SUGGESTED_QUESTION_TRACE,
+            TelemetryCase.DATASET_RETRIEVAL: TraceTaskName.DATASET_RETRIEVAL_TRACE,
+            TelemetryCase.GENERATE_NAME: TraceTaskName.GENERATE_NAME_TRACE,
+        }
+    return _case_to_trace_task
+
+
+def _get_case_routing() -> dict:
+    global _case_routing
+    if _case_routing is None:
+        from enterprise.telemetry.contracts import CaseRoute, SignalType, TelemetryCase
+
+        _case_routing = {
+            # TRACE — CE-eligible (flow in both CE and EE)
+            TelemetryCase.WORKFLOW_RUN: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
+            TelemetryCase.MESSAGE_RUN: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
+            TelemetryCase.TOOL_EXECUTION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
+            TelemetryCase.MODERATION_CHECK: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
+            TelemetryCase.SUGGESTED_QUESTION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
+            TelemetryCase.DATASET_RETRIEVAL: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
+            TelemetryCase.GENERATE_NAME: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=True),
+            # TRACE — enterprise-only
+            TelemetryCase.NODE_EXECUTION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=False),
+            TelemetryCase.DRAFT_NODE_EXECUTION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=False),
+            TelemetryCase.PROMPT_GENERATION: CaseRoute(signal_type=SignalType.TRACE, ce_eligible=False),
+            # METRIC_LOG — enterprise-only (signal-driven, not trace)
+            TelemetryCase.APP_CREATED: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False),
+            TelemetryCase.APP_UPDATED: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False),
+            TelemetryCase.APP_DELETED: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False),
+            TelemetryCase.FEEDBACK_CREATED: CaseRoute(signal_type=SignalType.METRIC_LOG, ce_eligible=False),
+        }
+    return _case_routing

 # ---------------------------------------------------------------------------
 # Helpers
@@ -132,7 +148,7 @@ def emit(
     METRIC_LOG events are dispatched to the enterprise Celery queue;
     silently dropped when enterprise telemetry is unavailable.
     """
-    route = CASE_ROUTING.get(case)
+    route = _get_case_routing().get(case)
     if route is None:
         logger.warning("Unknown telemetry case: %s, dropping event", case)
         return
@@ -141,7 +157,7 @@ def emit(
         logger.debug("Dropping EE-only event: case=%s (EE disabled)", case)
         return

-    if route.signal_type is SignalType.TRACE:
+    if route.signal_type == "trace":
         _emit_trace(case, context, payload, trace_manager)
     else:
         _emit_metric_log(case, context, payload)
@@ -156,7 +172,7 @@ def _emit_trace(
     from core.ops.ops_trace_manager import TraceQueueManager as LocalTraceQueueManager
     from core.ops.ops_trace_manager import TraceTask

-    trace_task_name = CASE_TO_TRACE_TASK.get(case)
+    trace_task_name = _get_case_to_trace_task().get(case)
     if trace_task_name is None:
         logger.warning("No TraceTaskName mapping for case: %s", case)
         return
@@ -189,6 +205,8 @@ def _emit_metric_log(
     payload_for_envelope, payload_ref = _handle_payload_sizing(payload, tenant_id, event_id)

+    from enterprise.telemetry.contracts import TelemetryEnvelope
+
     envelope = TelemetryEnvelope(
         case=case,
         tenant_id=tenant_id,

View File

@@ -14,14 +14,13 @@ from opentelemetry.trace import Span
 from opentelemetry.trace.status import Status, StatusCode
 from pydantic import BaseModel

+from configs import dify_config
 from core.file.models import File
 from core.variables import Segment
 from core.workflow.enums import NodeType
 from core.workflow.graph_events import GraphNodeEventBase
 from core.workflow.nodes.base.node import Node
 from extensions.otel.semconv.gen_ai import ChainAttributes, GenAIAttributes
-from configs import dify_config


 def should_include_content() -> bool:
@@ -34,6 +33,7 @@ def should_include_content() -> bool:
         return True
     return dify_config.ENTERPRISE_INCLUDE_CONTENT

+
 def safe_json_dumps(obj: Any, ensure_ascii: bool = False) -> str:
     """
     Safely serialize objects to JSON, handling non-serializable types.

View File

@@ -43,7 +43,11 @@ class ToolNodeOTelParser:
         # Tool call arguments and result — gated by content policy
         if should_include_content():
             if result_event and result_event.node_run_result and result_event.node_run_result.inputs:
-                span.set_attribute(ToolAttributes.TOOL_CALL_ARGUMENTS, safe_json_dumps(result_event.node_run_result.inputs))
+                span.set_attribute(
+                    ToolAttributes.TOOL_CALL_ARGUMENTS, safe_json_dumps(result_event.node_run_result.inputs)
+                )
             if result_event and result_event.node_run_result and result_event.node_run_result.outputs:
-                span.set_attribute(ToolAttributes.TOOL_CALL_RESULT, safe_json_dumps(result_event.node_run_result.outputs))
+                span.set_attribute(
+                    ToolAttributes.TOOL_CALL_RESULT, safe_json_dumps(result_event.node_run_result.outputs)
+                )

View File

@@ -27,7 +27,6 @@ from core.workflow.nodes.start.entities import StartNodeData
 from core.workflow.runtime import VariablePool
 from core.workflow.system_variable import SystemVariable
 from core.workflow.workflow_entry import WorkflowEntry
-from enterprise.telemetry.draft_trace import enqueue_draft_node_execution_trace
 from enums.cloud_plan import CloudPlan
 from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated
 from extensions.ext_database import db
@@ -734,6 +733,8 @@ class WorkflowService:
         with Session(db.engine) as session:
             outputs = workflow_node_execution.load_full_outputs(session, storage)

+        from enterprise.telemetry.draft_trace import enqueue_draft_node_execution_trace
+
         enqueue_draft_node_execution_trace(
             execution=workflow_node_execution,
             outputs=outputs,