diff --git a/api/core/app/apps/advanced_chat/generate_task_pipeline.py b/api/core/app/apps/advanced_chat/generate_task_pipeline.py
index 71588870fa..e021b0aca7 100644
--- a/api/core/app/apps/advanced_chat/generate_task_pipeline.py
+++ b/api/core/app/apps/advanced_chat/generate_task_pipeline.py
@@ -551,7 +551,7 @@ class AdvancedChatAppGenerateTaskPipeline:
                 total_steps=validated_state.node_run_steps,
                 outputs=event.outputs,
                 exceptions_count=event.exceptions_count,
-                conversation_id=None,
+                conversation_id=self._conversation_id,
                 trace_manager=trace_manager,
                 external_trace_id=self._application_generate_entity.extras.get("external_trace_id"),
             )
diff --git a/api/core/ops/aliyun_trace/aliyun_trace.py b/api/core/ops/aliyun_trace/aliyun_trace.py
index 7e817a6bff..c0727326ce 100644
--- a/api/core/ops/aliyun_trace/aliyun_trace.py
+++ b/api/core/ops/aliyun_trace/aliyun_trace.py
@@ -1,38 +1,28 @@
-import json
 import logging
 from collections.abc import Sequence
-from urllib.parse import urljoin

-from opentelemetry.trace import Link, Status, StatusCode
-from sqlalchemy import select
-from sqlalchemy.orm import Session, sessionmaker
+from sqlalchemy.orm import sessionmaker

 from core.ops.aliyun_trace.data_exporter.traceclient import (
     TraceClient,
+    build_endpoint,
     convert_datetime_to_nanoseconds,
     convert_to_span_id,
     convert_to_trace_id,
-    create_link,
     generate_span_id,
 )
-from core.ops.aliyun_trace.entities.aliyun_trace_entity import SpanData
+from core.ops.aliyun_trace.entities.aliyun_trace_entity import SpanData, TraceMetadata
 from core.ops.aliyun_trace.entities.semconv import (
     GEN_AI_COMPLETION,
-    GEN_AI_FRAMEWORK,
     GEN_AI_MODEL_NAME,
     GEN_AI_PROMPT,
     GEN_AI_PROMPT_TEMPLATE_TEMPLATE,
     GEN_AI_PROMPT_TEMPLATE_VARIABLE,
     GEN_AI_RESPONSE_FINISH_REASON,
-    GEN_AI_SESSION_ID,
-    GEN_AI_SPAN_KIND,
     GEN_AI_SYSTEM,
     GEN_AI_USAGE_INPUT_TOKENS,
     GEN_AI_USAGE_OUTPUT_TOKENS,
     GEN_AI_USAGE_TOTAL_TOKENS,
-    GEN_AI_USER_ID,
-    INPUT_VALUE,
-    OUTPUT_VALUE,
     RETRIEVAL_DOCUMENT,
     RETRIEVAL_QUERY,
     TOOL_DESCRIPTION,
@@ -40,6 +30,15 @@ from core.ops.aliyun_trace.entities.semconv import (
     TOOL_PARAMETERS,
     GenAISpanKind,
 )
+from core.ops.aliyun_trace.utils import (
+    create_common_span_attributes,
+    create_links_from_trace_id,
+    create_status_from_error,
+    extract_retrieval_documents,
+    get_user_id_from_message_data,
+    get_workflow_node_status,
+    serialize_json_data,
+)
 from core.ops.base_trace_instance import BaseTraceInstance
 from core.ops.entities.config_entity import AliyunConfig
 from core.ops.entities.trace_entity import (
@@ -52,12 +51,11 @@ from core.ops.entities.trace_entity import (
     ToolTraceInfo,
     WorkflowTraceInfo,
 )
-from core.rag.models.document import Document
 from core.repositories import SQLAlchemyWorkflowNodeExecutionRepository
 from core.workflow.entities import WorkflowNodeExecution
-from core.workflow.enums import NodeType, WorkflowNodeExecutionMetadataKey, WorkflowNodeExecutionStatus
+from core.workflow.enums import NodeType, WorkflowNodeExecutionMetadataKey
 from extensions.ext_database import db
-from models import Account, App, EndUser, TenantAccountJoin, WorkflowNodeExecutionTriggeredFrom
+from models import WorkflowNodeExecutionTriggeredFrom

 logger = logging.getLogger(__name__)

@@ -68,8 +66,7 @@ class AliyunDataTrace(BaseTraceInstance):
         aliyun_config: AliyunConfig,
     ):
         super().__init__(aliyun_config)
-        base_url = aliyun_config.endpoint.rstrip("/")
-        endpoint = urljoin(base_url, f"adapt_{aliyun_config.license_key}/api/otlp/traces")
+        endpoint = build_endpoint(aliyun_config.endpoint, aliyun_config.license_key)
         self.trace_client = TraceClient(service_name=aliyun_config.app_name, endpoint=endpoint)

     def trace(self, trace_info: BaseTraceInfo):
@@ -95,423 +92,422 @@ class AliyunDataTrace(BaseTraceInstance):
         try:
             return self.trace_client.get_project_url()
         except Exception as e:
-            logger.info("Aliyun get run url failed: %s", str(e), exc_info=True)
-            raise ValueError(f"Aliyun get run url failed: {str(e)}")
+            logger.info("Aliyun get project url failed: %s", str(e), exc_info=True)
+            raise ValueError(f"Aliyun get project url failed: {str(e)}")

     def workflow_trace(self, trace_info: WorkflowTraceInfo):
-        trace_id = convert_to_trace_id(trace_info.workflow_run_id)
-        links = []
-        if trace_info.trace_id:
-            links.append(create_link(trace_id_str=trace_info.trace_id))
-        workflow_span_id = convert_to_span_id(trace_info.workflow_run_id, "workflow")
-        self.add_workflow_span(trace_id, workflow_span_id, trace_info, links)
+        trace_metadata = TraceMetadata(
+            trace_id=convert_to_trace_id(trace_info.workflow_run_id),
+            workflow_span_id=convert_to_span_id(trace_info.workflow_run_id, "workflow"),
+            session_id=trace_info.metadata.get("conversation_id") or "",
+            user_id=str(trace_info.metadata.get("user_id") or ""),
+            links=create_links_from_trace_id(trace_info.trace_id),
+        )
+
+        self.add_workflow_span(trace_info, trace_metadata)

         workflow_node_executions = self.get_workflow_node_executions(trace_info)
         for node_execution in workflow_node_executions:
-            node_span = self.build_workflow_node_span(node_execution, trace_id, trace_info, workflow_span_id)
+            node_span = self.build_workflow_node_span(node_execution, trace_info, trace_metadata)
             self.trace_client.add_span(node_span)

     def message_trace(self, trace_info: MessageTraceInfo):
         message_data = trace_info.message_data
         if message_data is None:
             return
+
         message_id = trace_info.message_id
+        user_id = get_user_id_from_message_data(message_data)
+        status = create_status_from_error(trace_info.error)

-        user_id = message_data.from_account_id
-        if message_data.from_end_user_id:
-            end_user_data: EndUser | None = (
-                db.session.query(EndUser).where(EndUser.id == message_data.from_end_user_id).first()
-            )
-            if end_user_data is not None:
-                user_id = end_user_data.session_id
+        trace_metadata = TraceMetadata(
+            trace_id=convert_to_trace_id(message_id),
+            workflow_span_id=0,
+            session_id=trace_info.metadata.get("conversation_id") or "",
+            user_id=user_id,
+            links=create_links_from_trace_id(trace_info.trace_id),
+        )

-        status: Status = Status(StatusCode.OK)
-        if trace_info.error:
-            status = Status(StatusCode.ERROR, trace_info.error)
-
-        trace_id = convert_to_trace_id(message_id)
-        links = []
-        if trace_info.trace_id:
-            links.append(create_link(trace_id_str=trace_info.trace_id))
+        inputs_json = serialize_json_data(trace_info.inputs)
+        outputs_str = str(trace_info.outputs)

         message_span_id = convert_to_span_id(message_id, "message")
         message_span = SpanData(
-            trace_id=trace_id,
+            trace_id=trace_metadata.trace_id,
             parent_span_id=None,
             span_id=message_span_id,
             name="message",
             start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
             end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
-            attributes={
-                GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id") or "",
-                GEN_AI_USER_ID: str(user_id),
-                GEN_AI_SPAN_KIND: GenAISpanKind.CHAIN.value,
-                GEN_AI_FRAMEWORK: "dify",
-                INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False),
-                OUTPUT_VALUE: str(trace_info.outputs),
-            },
+            attributes=create_common_span_attributes(
+                session_id=trace_metadata.session_id,
+                user_id=trace_metadata.user_id,
+                span_kind=GenAISpanKind.CHAIN,
+                inputs=inputs_json,
+                outputs=outputs_str,
+            ),
             status=status,
-            links=links,
+            links=trace_metadata.links,
         )
         self.trace_client.add_span(message_span)

-        app_model_config = getattr(trace_info.message_data, "app_model_config", {})
+        app_model_config = getattr(message_data, "app_model_config", {})
         pre_prompt = getattr(app_model_config, "pre_prompt", "")
-        inputs_data = getattr(trace_info.message_data, "inputs", {})
+        inputs_data = getattr(message_data, "inputs", {})
+
         llm_span = SpanData(
-            trace_id=trace_id,
+            trace_id=trace_metadata.trace_id,
             parent_span_id=message_span_id,
             span_id=convert_to_span_id(message_id, "llm"),
             name="llm",
             start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
             end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
             attributes={
-                GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id") or "",
-                GEN_AI_USER_ID: str(user_id),
-                GEN_AI_SPAN_KIND: GenAISpanKind.LLM.value,
-                GEN_AI_FRAMEWORK: "dify",
+                **create_common_span_attributes(
+                    session_id=trace_metadata.session_id,
+                    user_id=trace_metadata.user_id,
+                    span_kind=GenAISpanKind.LLM,
+                    inputs=inputs_json,
+                    outputs=outputs_str,
+                ),
                 GEN_AI_MODEL_NAME: trace_info.metadata.get("ls_model_name") or "",
                 GEN_AI_SYSTEM: trace_info.metadata.get("ls_provider") or "",
                 GEN_AI_USAGE_INPUT_TOKENS: str(trace_info.message_tokens),
                 GEN_AI_USAGE_OUTPUT_TOKENS: str(trace_info.answer_tokens),
                 GEN_AI_USAGE_TOTAL_TOKENS: str(trace_info.total_tokens),
-                GEN_AI_PROMPT_TEMPLATE_VARIABLE: json.dumps(inputs_data, ensure_ascii=False),
+                GEN_AI_PROMPT_TEMPLATE_VARIABLE: serialize_json_data(inputs_data),
                 GEN_AI_PROMPT_TEMPLATE_TEMPLATE: pre_prompt,
-                GEN_AI_PROMPT: json.dumps(trace_info.inputs, ensure_ascii=False),
-                GEN_AI_COMPLETION: str(trace_info.outputs),
-                INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False),
-                OUTPUT_VALUE: str(trace_info.outputs),
+                GEN_AI_PROMPT: inputs_json,
+                GEN_AI_COMPLETION: outputs_str,
             },
             status=status,
+            links=trace_metadata.links,
         )
         self.trace_client.add_span(llm_span)

     def dataset_retrieval_trace(self, trace_info: DatasetRetrievalTraceInfo):
         if trace_info.message_data is None:
             return
+
         message_id = trace_info.message_id

-        trace_id = convert_to_trace_id(message_id)
-        links = []
-        if trace_info.trace_id:
-            links.append(create_link(trace_id_str=trace_info.trace_id))
+        trace_metadata = TraceMetadata(
+            trace_id=convert_to_trace_id(message_id),
+            workflow_span_id=0,
+            session_id=trace_info.metadata.get("conversation_id") or "",
+            user_id=str(trace_info.metadata.get("user_id") or ""),
+            links=create_links_from_trace_id(trace_info.trace_id),
+        )

         documents_data = extract_retrieval_documents(trace_info.documents)
+        documents_json = serialize_json_data(documents_data)
+        inputs_str = str(trace_info.inputs)
+
         dataset_retrieval_span = SpanData(
-            trace_id=trace_id,
+            trace_id=trace_metadata.trace_id,
             parent_span_id=convert_to_span_id(message_id, "message"),
             span_id=generate_span_id(),
             name="dataset_retrieval",
             start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
             end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
             attributes={
-                GEN_AI_SPAN_KIND: GenAISpanKind.RETRIEVER.value,
-                GEN_AI_FRAMEWORK: "dify",
-                RETRIEVAL_QUERY: str(trace_info.inputs),
-                RETRIEVAL_DOCUMENT: json.dumps(documents_data, ensure_ascii=False),
-                INPUT_VALUE: str(trace_info.inputs),
-                OUTPUT_VALUE: json.dumps(documents_data, ensure_ascii=False),
+                **create_common_span_attributes(
+                    session_id=trace_metadata.session_id,
+                    user_id=trace_metadata.user_id,
+                    span_kind=GenAISpanKind.RETRIEVER,
+                    inputs=inputs_str,
+                    outputs=documents_json,
+                ),
+                RETRIEVAL_QUERY: inputs_str,
+                RETRIEVAL_DOCUMENT: documents_json,
             },
-            links=links,
+            links=trace_metadata.links,
         )
         self.trace_client.add_span(dataset_retrieval_span)

     def tool_trace(self, trace_info: ToolTraceInfo):
         if trace_info.message_data is None:
             return
+
         message_id = trace_info.message_id
+        status = create_status_from_error(trace_info.error)

-        status: Status = Status(StatusCode.OK)
-        if trace_info.error:
-            status = Status(StatusCode.ERROR, trace_info.error)
+        trace_metadata = TraceMetadata(
+            trace_id=convert_to_trace_id(message_id),
+            workflow_span_id=0,
+            session_id=trace_info.metadata.get("conversation_id") or "",
+            user_id=str(trace_info.metadata.get("user_id") or ""),
+            links=create_links_from_trace_id(trace_info.trace_id),
+        )

-        trace_id = convert_to_trace_id(message_id)
-        links = []
-        if trace_info.trace_id:
-            links.append(create_link(trace_id_str=trace_info.trace_id))
+        tool_config_json = serialize_json_data(trace_info.tool_config)
+        tool_inputs_json = serialize_json_data(trace_info.tool_inputs)
+        inputs_json = serialize_json_data(trace_info.inputs)

         tool_span = SpanData(
-            trace_id=trace_id,
+            trace_id=trace_metadata.trace_id,
             parent_span_id=convert_to_span_id(message_id, "message"),
             span_id=generate_span_id(),
             name=trace_info.tool_name,
             start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
             end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
             attributes={
-                GEN_AI_SPAN_KIND: GenAISpanKind.TOOL.value,
-                GEN_AI_FRAMEWORK: "dify",
+                **create_common_span_attributes(
+                    session_id=trace_metadata.session_id,
+                    user_id=trace_metadata.user_id,
+                    span_kind=GenAISpanKind.TOOL,
+                    inputs=inputs_json,
+                    outputs=str(trace_info.tool_outputs),
+                ),
                 TOOL_NAME: trace_info.tool_name,
-                TOOL_DESCRIPTION: json.dumps(trace_info.tool_config, ensure_ascii=False),
-                TOOL_PARAMETERS: json.dumps(trace_info.tool_inputs, ensure_ascii=False),
-                INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False),
-                OUTPUT_VALUE: str(trace_info.tool_outputs),
+                TOOL_DESCRIPTION: tool_config_json,
+                TOOL_PARAMETERS: tool_inputs_json,
             },
             status=status,
-            links=links,
+            links=trace_metadata.links,
         )
         self.trace_client.add_span(tool_span)

     def get_workflow_node_executions(self, trace_info: WorkflowTraceInfo) -> Sequence[WorkflowNodeExecution]:
-        # through workflow_run_id get all_nodes_execution using repository
-        session_factory = sessionmaker(bind=db.engine)
-        # Find the app's creator account
-        with Session(db.engine, expire_on_commit=False) as session:
-            # Get the app to find its creator
-            app_id = trace_info.metadata.get("app_id")
-            if not app_id:
-                raise ValueError("No app_id found in trace_info metadata")
+        app_id = trace_info.metadata.get("app_id")
+        if not app_id:
+            raise ValueError("No app_id found in trace_info metadata")
-            app_stmt = select(App).where(App.id == app_id)
-            app = session.scalar(app_stmt)
-            if not app:
-                raise ValueError(f"App with id {app_id} not found")
-            if not app.created_by:
-                raise ValueError(f"App with id {app_id} has no creator (created_by is None)")
-            account_stmt = select(Account).where(Account.id == app.created_by)
-            service_account = session.scalar(account_stmt)
-            if not service_account:
-                raise ValueError(f"Creator account with id {app.created_by} not found for app {app_id}")
-            current_tenant = (
-                session.query(TenantAccountJoin).filter_by(account_id=service_account.id, current=True).first()
-            )
-            if not current_tenant:
ValueError(f"Current tenant not found for account {service_account.id}") - service_account.set_tenant_id(current_tenant.tenant_id) + service_account = self.get_service_account_with_tenant(app_id) + + session_factory = sessionmaker(bind=db.engine) workflow_node_execution_repository = SQLAlchemyWorkflowNodeExecutionRepository( session_factory=session_factory, user=service_account, app_id=app_id, triggered_from=WorkflowNodeExecutionTriggeredFrom.WORKFLOW_RUN, ) - # Get all executions for this workflow run - workflow_node_executions = workflow_node_execution_repository.get_by_workflow_run( - workflow_run_id=trace_info.workflow_run_id - ) - return workflow_node_executions + + return workflow_node_execution_repository.get_by_workflow_run(workflow_run_id=trace_info.workflow_run_id) def build_workflow_node_span( - self, node_execution: WorkflowNodeExecution, trace_id: int, trace_info: WorkflowTraceInfo, workflow_span_id: int + self, node_execution: WorkflowNodeExecution, trace_info: WorkflowTraceInfo, trace_metadata: TraceMetadata ): try: if node_execution.node_type == NodeType.LLM: - node_span = self.build_workflow_llm_span(trace_id, workflow_span_id, trace_info, node_execution) + node_span = self.build_workflow_llm_span(trace_info, node_execution, trace_metadata) elif node_execution.node_type == NodeType.KNOWLEDGE_RETRIEVAL: - node_span = self.build_workflow_retrieval_span(trace_id, workflow_span_id, trace_info, node_execution) + node_span = self.build_workflow_retrieval_span(trace_info, node_execution, trace_metadata) elif node_execution.node_type == NodeType.TOOL: - node_span = self.build_workflow_tool_span(trace_id, workflow_span_id, trace_info, node_execution) + node_span = self.build_workflow_tool_span(trace_info, node_execution, trace_metadata) else: - node_span = self.build_workflow_task_span(trace_id, workflow_span_id, trace_info, node_execution) + node_span = self.build_workflow_task_span(trace_info, node_execution, trace_metadata) return node_span except Exception as e: logger.debug("Error occurred in build_workflow_node_span: %s", e, exc_info=True) return None - def get_workflow_node_status(self, node_execution: WorkflowNodeExecution) -> Status: - span_status: Status = Status(StatusCode.UNSET) - if node_execution.status == WorkflowNodeExecutionStatus.SUCCEEDED: - span_status = Status(StatusCode.OK) - elif node_execution.status in [WorkflowNodeExecutionStatus.FAILED, WorkflowNodeExecutionStatus.EXCEPTION]: - span_status = Status(StatusCode.ERROR, str(node_execution.error)) - return span_status - def build_workflow_task_span( - self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo, node_execution: WorkflowNodeExecution + self, trace_info: WorkflowTraceInfo, node_execution: WorkflowNodeExecution, trace_metadata: TraceMetadata ) -> SpanData: + inputs_json = serialize_json_data(node_execution.inputs) + outputs_json = serialize_json_data(node_execution.outputs) return SpanData( - trace_id=trace_id, - parent_span_id=workflow_span_id, + trace_id=trace_metadata.trace_id, + parent_span_id=trace_metadata.workflow_span_id, span_id=convert_to_span_id(node_execution.id, "node"), name=node_execution.title, start_time=convert_datetime_to_nanoseconds(node_execution.created_at), end_time=convert_datetime_to_nanoseconds(node_execution.finished_at), - attributes={ - GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id") or "", - GEN_AI_SPAN_KIND: GenAISpanKind.TASK.value, - GEN_AI_FRAMEWORK: "dify", - INPUT_VALUE: json.dumps(node_execution.inputs, ensure_ascii=False), - 
-                OUTPUT_VALUE: json.dumps(node_execution.outputs, ensure_ascii=False),
-            },
-            status=self.get_workflow_node_status(node_execution),
+            attributes=create_common_span_attributes(
+                session_id=trace_metadata.session_id,
+                user_id=trace_metadata.user_id,
+                span_kind=GenAISpanKind.TASK,
+                inputs=inputs_json,
+                outputs=outputs_json,
+            ),
+            status=get_workflow_node_status(node_execution),
+            links=trace_metadata.links,
         )

     def build_workflow_tool_span(
-        self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo, node_execution: WorkflowNodeExecution
+        self, trace_info: WorkflowTraceInfo, node_execution: WorkflowNodeExecution, trace_metadata: TraceMetadata
     ) -> SpanData:
         tool_des = {}
         if node_execution.metadata:
             tool_des = node_execution.metadata.get(WorkflowNodeExecutionMetadataKey.TOOL_INFO, {})
+
+        inputs_json = serialize_json_data(node_execution.inputs or {})
+        outputs_json = serialize_json_data(node_execution.outputs)
+
         return SpanData(
-            trace_id=trace_id,
-            parent_span_id=workflow_span_id,
+            trace_id=trace_metadata.trace_id,
+            parent_span_id=trace_metadata.workflow_span_id,
             span_id=convert_to_span_id(node_execution.id, "node"),
             name=node_execution.title,
             start_time=convert_datetime_to_nanoseconds(node_execution.created_at),
             end_time=convert_datetime_to_nanoseconds(node_execution.finished_at),
             attributes={
-                GEN_AI_SPAN_KIND: GenAISpanKind.TOOL.value,
-                GEN_AI_FRAMEWORK: "dify",
+                **create_common_span_attributes(
+                    session_id=trace_metadata.session_id,
+                    user_id=trace_metadata.user_id,
+                    span_kind=GenAISpanKind.TOOL,
+                    inputs=inputs_json,
+                    outputs=outputs_json,
+                ),
                 TOOL_NAME: node_execution.title,
-                TOOL_DESCRIPTION: json.dumps(tool_des, ensure_ascii=False),
-                TOOL_PARAMETERS: json.dumps(node_execution.inputs or {}, ensure_ascii=False),
-                INPUT_VALUE: json.dumps(node_execution.inputs or {}, ensure_ascii=False),
-                OUTPUT_VALUE: json.dumps(node_execution.outputs, ensure_ascii=False),
+                TOOL_DESCRIPTION: serialize_json_data(tool_des),
+                TOOL_PARAMETERS: inputs_json,
             },
-            status=self.get_workflow_node_status(node_execution),
+            status=get_workflow_node_status(node_execution),
+            links=trace_metadata.links,
         )

     def build_workflow_retrieval_span(
-        self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo, node_execution: WorkflowNodeExecution
+        self, trace_info: WorkflowTraceInfo, node_execution: WorkflowNodeExecution, trace_metadata: TraceMetadata
     ) -> SpanData:
-        input_value = ""
-        if node_execution.inputs:
-            input_value = str(node_execution.inputs.get("query", ""))
-        output_value = ""
-        if node_execution.outputs:
-            output_value = json.dumps(node_execution.outputs.get("result", []), ensure_ascii=False)
+        input_value = str(node_execution.inputs.get("query", "")) if node_execution.inputs else ""
+        output_value = serialize_json_data(node_execution.outputs.get("result", [])) if node_execution.outputs else ""
+
         return SpanData(
-            trace_id=trace_id,
-            parent_span_id=workflow_span_id,
+            trace_id=trace_metadata.trace_id,
+            parent_span_id=trace_metadata.workflow_span_id,
             span_id=convert_to_span_id(node_execution.id, "node"),
             name=node_execution.title,
             start_time=convert_datetime_to_nanoseconds(node_execution.created_at),
             end_time=convert_datetime_to_nanoseconds(node_execution.finished_at),
             attributes={
-                GEN_AI_SPAN_KIND: GenAISpanKind.RETRIEVER.value,
-                GEN_AI_FRAMEWORK: "dify",
+                **create_common_span_attributes(
+                    session_id=trace_metadata.session_id,
+                    user_id=trace_metadata.user_id,
+                    span_kind=GenAISpanKind.RETRIEVER,
+                    inputs=input_value,
+                    outputs=output_value,
+                ),
                 RETRIEVAL_QUERY: input_value,
                 RETRIEVAL_DOCUMENT: output_value,
-                INPUT_VALUE: input_value,
-                OUTPUT_VALUE: output_value,
             },
-            status=self.get_workflow_node_status(node_execution),
+            status=get_workflow_node_status(node_execution),
+            links=trace_metadata.links,
         )

     def build_workflow_llm_span(
-        self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo, node_execution: WorkflowNodeExecution
+        self, trace_info: WorkflowTraceInfo, node_execution: WorkflowNodeExecution, trace_metadata: TraceMetadata
     ) -> SpanData:
         process_data = node_execution.process_data or {}
         outputs = node_execution.outputs or {}
         usage_data = process_data.get("usage", {}) if "usage" in process_data else outputs.get("usage", {})
+
+        prompts_json = serialize_json_data(process_data.get("prompts", []))
+        text_output = str(outputs.get("text", ""))
+
         return SpanData(
-            trace_id=trace_id,
-            parent_span_id=workflow_span_id,
+            trace_id=trace_metadata.trace_id,
+            parent_span_id=trace_metadata.workflow_span_id,
             span_id=convert_to_span_id(node_execution.id, "node"),
             name=node_execution.title,
             start_time=convert_datetime_to_nanoseconds(node_execution.created_at),
             end_time=convert_datetime_to_nanoseconds(node_execution.finished_at),
             attributes={
-                GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id") or "",
-                GEN_AI_SPAN_KIND: GenAISpanKind.LLM.value,
-                GEN_AI_FRAMEWORK: "dify",
+                **create_common_span_attributes(
+                    session_id=trace_metadata.session_id,
+                    user_id=trace_metadata.user_id,
+                    span_kind=GenAISpanKind.LLM,
+                    inputs=prompts_json,
+                    outputs=text_output,
+                ),
                 GEN_AI_MODEL_NAME: process_data.get("model_name") or "",
                 GEN_AI_SYSTEM: process_data.get("model_provider") or "",
                 GEN_AI_USAGE_INPUT_TOKENS: str(usage_data.get("prompt_tokens", 0)),
                 GEN_AI_USAGE_OUTPUT_TOKENS: str(usage_data.get("completion_tokens", 0)),
                 GEN_AI_USAGE_TOTAL_TOKENS: str(usage_data.get("total_tokens", 0)),
-                GEN_AI_PROMPT: json.dumps(process_data.get("prompts", []), ensure_ascii=False),
-                GEN_AI_COMPLETION: str(outputs.get("text", "")),
+                GEN_AI_PROMPT: prompts_json,
+                GEN_AI_COMPLETION: text_output,
                 GEN_AI_RESPONSE_FINISH_REASON: outputs.get("finish_reason") or "",
-                INPUT_VALUE: json.dumps(process_data.get("prompts", []), ensure_ascii=False),
-                OUTPUT_VALUE: str(outputs.get("text", "")),
             },
-            status=self.get_workflow_node_status(node_execution),
+            status=get_workflow_node_status(node_execution),
+            links=trace_metadata.links,
         )

-    def add_workflow_span(
-        self, trace_id: int, workflow_span_id: int, trace_info: WorkflowTraceInfo, links: Sequence[Link]
-    ):
+    def add_workflow_span(self, trace_info: WorkflowTraceInfo, trace_metadata: TraceMetadata):
         message_span_id = None
         if trace_info.message_id:
             message_span_id = convert_to_span_id(trace_info.message_id, "message")
-        user_id = trace_info.metadata.get("user_id")
-        status: Status = Status(StatusCode.OK)
-        if trace_info.error:
-            status = Status(StatusCode.ERROR, trace_info.error)
-        if message_span_id:  # chatflow
+        status = create_status_from_error(trace_info.error)
+
+        inputs_json = serialize_json_data(trace_info.workflow_run_inputs)
+        outputs_json = serialize_json_data(trace_info.workflow_run_outputs)
+
+        if message_span_id:
             message_span = SpanData(
-                trace_id=trace_id,
+                trace_id=trace_metadata.trace_id,
                 parent_span_id=None,
                 span_id=message_span_id,
                 name="message",
                 start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
                 end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
-                attributes={
-                    GEN_AI_SESSION_ID: trace_info.metadata.get("conversation_id") or "",
-                    GEN_AI_USER_ID: str(user_id),
-                    GEN_AI_SPAN_KIND: GenAISpanKind.CHAIN.value,
-                    GEN_AI_FRAMEWORK: "dify",
-                    INPUT_VALUE: trace_info.workflow_run_inputs.get("sys.query") or "",
-                    OUTPUT_VALUE: json.dumps(trace_info.workflow_run_outputs, ensure_ascii=False),
-                },
+                attributes=create_common_span_attributes(
+                    session_id=trace_metadata.session_id,
+                    user_id=trace_metadata.user_id,
+                    span_kind=GenAISpanKind.CHAIN,
+                    inputs=trace_info.workflow_run_inputs.get("sys.query") or "",
+                    outputs=outputs_json,
+                ),
                 status=status,
-                links=links,
+                links=trace_metadata.links,
             )
             self.trace_client.add_span(message_span)

         workflow_span = SpanData(
-            trace_id=trace_id,
+            trace_id=trace_metadata.trace_id,
             parent_span_id=message_span_id,
-            span_id=workflow_span_id,
+            span_id=trace_metadata.workflow_span_id,
             name="workflow",
             start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
             end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
-            attributes={
-                GEN_AI_USER_ID: str(user_id),
-                GEN_AI_SPAN_KIND: GenAISpanKind.CHAIN.value,
-                GEN_AI_FRAMEWORK: "dify",
-                INPUT_VALUE: json.dumps(trace_info.workflow_run_inputs, ensure_ascii=False),
-                OUTPUT_VALUE: json.dumps(trace_info.workflow_run_outputs, ensure_ascii=False),
-            },
+            attributes=create_common_span_attributes(
+                session_id=trace_metadata.session_id,
+                user_id=trace_metadata.user_id,
+                span_kind=GenAISpanKind.CHAIN,
+                inputs=inputs_json,
+                outputs=outputs_json,
+            ),
             status=status,
-            links=links,
+            links=trace_metadata.links,
         )
         self.trace_client.add_span(workflow_span)

     def suggested_question_trace(self, trace_info: SuggestedQuestionTraceInfo):
         message_id = trace_info.message_id
-        status: Status = Status(StatusCode.OK)
-        if trace_info.error:
-            status = Status(StatusCode.ERROR, trace_info.error)
+        status = create_status_from_error(trace_info.error)

-        trace_id = convert_to_trace_id(message_id)
-        links = []
-        if trace_info.trace_id:
-            links.append(create_link(trace_id_str=trace_info.trace_id))
+        trace_metadata = TraceMetadata(
+            trace_id=convert_to_trace_id(message_id),
+            workflow_span_id=0,
+            session_id=trace_info.metadata.get("conversation_id") or "",
+            user_id=str(trace_info.metadata.get("user_id") or ""),
+            links=create_links_from_trace_id(trace_info.trace_id),
+        )
+
+        inputs_json = serialize_json_data(trace_info.inputs)
+        suggested_question_json = serialize_json_data(trace_info.suggested_question)

         suggested_question_span = SpanData(
-            trace_id=trace_id,
+            trace_id=trace_metadata.trace_id,
             parent_span_id=convert_to_span_id(message_id, "message"),
             span_id=convert_to_span_id(message_id, "suggested_question"),
             name="suggested_question",
             start_time=convert_datetime_to_nanoseconds(trace_info.start_time),
             end_time=convert_datetime_to_nanoseconds(trace_info.end_time),
             attributes={
-                GEN_AI_SPAN_KIND: GenAISpanKind.LLM.value,
-                GEN_AI_FRAMEWORK: "dify",
+                **create_common_span_attributes(
+                    session_id=trace_metadata.session_id,
+                    user_id=trace_metadata.user_id,
+                    span_kind=GenAISpanKind.LLM,
+                    inputs=inputs_json,
+                    outputs=suggested_question_json,
+                ),
                 GEN_AI_MODEL_NAME: trace_info.metadata.get("ls_model_name") or "",
                 GEN_AI_SYSTEM: trace_info.metadata.get("ls_provider") or "",
-                GEN_AI_PROMPT: json.dumps(trace_info.inputs, ensure_ascii=False),
-                GEN_AI_COMPLETION: json.dumps(trace_info.suggested_question, ensure_ascii=False),
-                INPUT_VALUE: json.dumps(trace_info.inputs, ensure_ascii=False),
-                OUTPUT_VALUE: json.dumps(trace_info.suggested_question, ensure_ascii=False),
+                GEN_AI_PROMPT: inputs_json,
+                GEN_AI_COMPLETION: suggested_question_json,
             },
             status=status,
-            links=links,
+            links=trace_metadata.links,
         )
         self.trace_client.add_span(suggested_question_span)
-
-
-def extract_retrieval_documents(documents: list[Document]):
-    documents_data = []
-    for document in documents:
-        document_data = {
-            "content": document.page_content,
-            "metadata": {
-                "dataset_id": document.metadata.get("dataset_id"),
-                "doc_id": document.metadata.get("doc_id"),
-                "document_id": document.metadata.get("document_id"),
-            },
-            "score": document.metadata.get("score"),
-        }
-        documents_data.append(document_data)
-    return documents_data
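
Note on the file above: a single TraceMetadata value now carries the shared trace identity (trace id, workflow span id, session, user, links) through every span of a run, instead of threading four positional parameters through each builder. A minimal standalone sketch of the pattern; the names mirror this patch but nothing below imports Dify:

from dataclasses import dataclass, field
from typing import Any


@dataclass
class TraceMetadata:
    # Shared identity for every span emitted during one traced run.
    trace_id: int
    workflow_span_id: int
    session_id: str = ""
    user_id: str = ""
    links: list[Any] = field(default_factory=list)


def create_common_span_attributes(session_id: str, user_id: str, span_kind: str,
                                  inputs: str = "", outputs: str = "") -> dict[str, Any]:
    # Mirrors utils.create_common_span_attributes: the base attributes shared by all spans.
    return {
        "gen_ai.session.id": session_id,
        "gen_ai.user.id": user_id,
        "gen_ai.span.kind": span_kind,
        "gen_ai.framework": "dify",
        "input.value": inputs,
        "output.value": outputs,
    }


meta = TraceMetadata(trace_id=0xABC, workflow_span_id=0x123, session_id="conv-1", user_id="u-1")
# A node span reuses the run-wide identity and layers its own keys on top,
# exactly like the **create_common_span_attributes(...) spreads in the patch.
node_attrs = {
    **create_common_span_attributes(meta.session_id, meta.user_id, "TOOL"),
    "tool.name": "search",
}
print(node_attrs["gen_ai.span.kind"], hex(meta.workflow_span_id))  # TOOL 0x123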
diff --git a/api/core/ops/aliyun_trace/data_exporter/traceclient.py b/api/core/ops/aliyun_trace/data_exporter/traceclient.py
index baaf9fd9f6..f54405b5de 100644
--- a/api/core/ops/aliyun_trace/data_exporter/traceclient.py
+++ b/api/core/ops/aliyun_trace/data_exporter/traceclient.py
@@ -7,6 +7,8 @@ import uuid
 from collections import deque
 from collections.abc import Sequence
 from datetime import datetime
+from typing import Final
+from urllib.parse import urljoin

 import httpx
 from opentelemetry import trace as trace_api
@@ -20,8 +22,12 @@ from opentelemetry.trace import Link, SpanContext, TraceFlags
 from configs import dify_config
 from core.ops.aliyun_trace.entities.aliyun_trace_entity import SpanData

-INVALID_SPAN_ID = 0x0000000000000000
-INVALID_TRACE_ID = 0x00000000000000000000000000000000
+INVALID_SPAN_ID: Final[int] = 0x0000000000000000
+INVALID_TRACE_ID: Final[int] = 0x00000000000000000000000000000000
+DEFAULT_TIMEOUT: Final[int] = 5
+DEFAULT_MAX_QUEUE_SIZE: Final[int] = 1000
+DEFAULT_SCHEDULE_DELAY_SEC: Final[int] = 5
+DEFAULT_MAX_EXPORT_BATCH_SIZE: Final[int] = 50

 logger = logging.getLogger(__name__)

@@ -31,9 +37,9 @@ class TraceClient:
         self,
         service_name: str,
         endpoint: str,
-        max_queue_size: int = 1000,
-        schedule_delay_sec: int = 5,
-        max_export_batch_size: int = 50,
+        max_queue_size: int = DEFAULT_MAX_QUEUE_SIZE,
+        schedule_delay_sec: int = DEFAULT_SCHEDULE_DELAY_SEC,
+        max_export_batch_size: int = DEFAULT_MAX_EXPORT_BATCH_SIZE,
     ):
         self.endpoint = endpoint
         self.resource = Resource(
@@ -63,9 +69,9 @@ class TraceClient:
     def export(self, spans: Sequence[ReadableSpan]):
         self.exporter.export(spans)

-    def api_check(self):
+    def api_check(self) -> bool:
         try:
-            response = httpx.head(self.endpoint, timeout=5)
+            response = httpx.head(self.endpoint, timeout=DEFAULT_TIMEOUT)
             if response.status_code == 405:
                 return True
             else:
@@ -75,12 +81,13 @@ class TraceClient:
             logger.debug("AliyunTrace API check failed: %s", str(e))
             raise ValueError(f"AliyunTrace API check failed: {str(e)}")

-    def get_project_url(self):
+    def get_project_url(self) -> str:
         return "https://arms.console.aliyun.com/#/llm"

-    def add_span(self, span_data: SpanData):
+    def add_span(self, span_data: SpanData | None) -> None:
         if span_data is None:
             return
+
         span: ReadableSpan = self.span_builder.build_span(span_data)
         with self.condition:
             if len(self.queue) == self.max_queue_size:
@@ -92,14 +99,14 @@ class TraceClient:
             if len(self.queue) >= self.max_export_batch_size:
                 self.condition.notify()

-    def _worker(self):
+    def _worker(self) -> None:
         while not self.done:
             with self.condition:
                 if len(self.queue) < self.max_export_batch_size and not self.done:
                     self.condition.wait(timeout=self.schedule_delay_sec)
             self._export_batch()

-    def _export_batch(self):
+    def _export_batch(self) -> None:
         spans_to_export: list[ReadableSpan] = []
         with self.condition:
             while len(spans_to_export) < self.max_export_batch_size and self.queue:
@@ -111,7 +118,7 @@ class TraceClient:
             except Exception as e:
                 logger.debug("Error exporting spans: %s", e)

-    def shutdown(self):
+    def shutdown(self) -> None:
         with self.condition:
             self.done = True
             self.condition.notify_all()
@@ -121,7 +128,7 @@ class TraceClient:


 class SpanBuilder:
-    def __init__(self, resource):
+    def __init__(self, resource: Resource) -> None:
         self.resource = resource
         self.instrumentation_scope = InstrumentationScope(
             __name__,
@@ -167,8 +174,12 @@ class SpanBuilder:


 def create_link(trace_id_str: str) -> Link:
-    placeholder_span_id = 0x0000000000000000
-    trace_id = int(trace_id_str, 16)
+    placeholder_span_id = INVALID_SPAN_ID
+    try:
+        trace_id = int(trace_id_str, 16)
+    except ValueError as e:
+        raise ValueError(f"Invalid trace ID format: {trace_id_str}") from e
+
     span_context = SpanContext(
         trace_id=trace_id, span_id=placeholder_span_id, is_remote=False, trace_flags=TraceFlags(TraceFlags.SAMPLED)
     )
@@ -184,26 +195,29 @@ def generate_span_id() -> int:


 def convert_to_trace_id(uuid_v4: str | None) -> int:
+    if uuid_v4 is None:
+        raise ValueError("UUID cannot be None")
     try:
         uuid_obj = uuid.UUID(uuid_v4)
         return uuid_obj.int
-    except Exception as e:
-        raise ValueError(f"Invalid UUID input: {e}")
+    except ValueError as e:
+        raise ValueError(f"Invalid UUID input: {uuid_v4}") from e


 def convert_string_to_id(string: str | None) -> int:
     if not string:
         return generate_span_id()
     hash_bytes = hashlib.sha256(string.encode("utf-8")).digest()
-    id = int.from_bytes(hash_bytes[:8], byteorder="big", signed=False)
-    return id
+    return int.from_bytes(hash_bytes[:8], byteorder="big", signed=False)


 def convert_to_span_id(uuid_v4: str | None, span_type: str) -> int:
+    if uuid_v4 is None:
+        raise ValueError("UUID cannot be None")
     try:
         uuid_obj = uuid.UUID(uuid_v4)
-    except Exception as e:
-        raise ValueError(f"Invalid UUID input: {e}")
+    except ValueError as e:
+        raise ValueError(f"Invalid UUID input: {uuid_v4}") from e
     combined_key = f"{uuid_obj.hex}-{span_type}"
     return convert_string_to_id(combined_key)

@@ -212,5 +226,11 @@ def convert_datetime_to_nanoseconds(start_time_a: datetime | None) -> int | None:
     if start_time_a is None:
         return None
     timestamp_in_seconds = start_time_a.timestamp()
-    timestamp_in_nanoseconds = int(timestamp_in_seconds * 1e9)
-    return timestamp_in_nanoseconds
+    return int(timestamp_in_seconds * 1e9)
+
+
+def build_endpoint(base_url: str, license_key: str) -> str:
+    if "log.aliyuncs.com" in base_url:  # cms2.0 endpoint
+        return urljoin(base_url, f"adapt_{license_key}/api/v1/traces")
+    else:  # xtrace endpoint
+        return urljoin(base_url, f"adapt_{license_key}/api/otlp/traces")
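
Note on build_endpoint: the exporter URL is now routed by host rather than hard-coded. Bases on *.log.aliyuncs.com (CMS 2.0) get the /api/v1/traces suffix; anything else (X-Trace) keeps the original /api/otlp/traces. A standalone check of the routing; the function body is copied from the hunk above and the license key is a placeholder:

from urllib.parse import urljoin


def build_endpoint(base_url: str, license_key: str) -> str:
    if "log.aliyuncs.com" in base_url:  # cms2.0 endpoint
        return urljoin(base_url, f"adapt_{license_key}/api/v1/traces")
    else:  # xtrace endpoint
        return urljoin(base_url, f"adapt_{license_key}/api/otlp/traces")


# xtrace-style base without a path: the relative join lands under the root.
print(build_endpoint("https://tracing-analysis-dc-hz.aliyuncs.com", "KEY"))
# -> https://tracing-analysis-dc-hz.aliyuncs.com/adapt_KEY/api/otlp/traces
# Keep in mind urljoin resolves relative to the last path segment, so a base
# that already carries a path keeps only its directory portion.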
logger.debug("Error exporting spans: %s", e) - def shutdown(self): + def shutdown(self) -> None: with self.condition: self.done = True self.condition.notify_all() @@ -121,7 +128,7 @@ class TraceClient: class SpanBuilder: - def __init__(self, resource): + def __init__(self, resource: Resource) -> None: self.resource = resource self.instrumentation_scope = InstrumentationScope( __name__, @@ -167,8 +174,12 @@ class SpanBuilder: def create_link(trace_id_str: str) -> Link: - placeholder_span_id = 0x0000000000000000 - trace_id = int(trace_id_str, 16) + placeholder_span_id = INVALID_SPAN_ID + try: + trace_id = int(trace_id_str, 16) + except ValueError as e: + raise ValueError(f"Invalid trace ID format: {trace_id_str}") from e + span_context = SpanContext( trace_id=trace_id, span_id=placeholder_span_id, is_remote=False, trace_flags=TraceFlags(TraceFlags.SAMPLED) ) @@ -184,26 +195,29 @@ def generate_span_id() -> int: def convert_to_trace_id(uuid_v4: str | None) -> int: + if uuid_v4 is None: + raise ValueError("UUID cannot be None") try: uuid_obj = uuid.UUID(uuid_v4) return uuid_obj.int - except Exception as e: - raise ValueError(f"Invalid UUID input: {e}") + except ValueError as e: + raise ValueError(f"Invalid UUID input: {uuid_v4}") from e def convert_string_to_id(string: str | None) -> int: if not string: return generate_span_id() hash_bytes = hashlib.sha256(string.encode("utf-8")).digest() - id = int.from_bytes(hash_bytes[:8], byteorder="big", signed=False) - return id + return int.from_bytes(hash_bytes[:8], byteorder="big", signed=False) def convert_to_span_id(uuid_v4: str | None, span_type: str) -> int: + if uuid_v4 is None: + raise ValueError("UUID cannot be None") try: uuid_obj = uuid.UUID(uuid_v4) - except Exception as e: - raise ValueError(f"Invalid UUID input: {e}") + except ValueError as e: + raise ValueError(f"Invalid UUID input: {uuid_v4}") from e combined_key = f"{uuid_obj.hex}-{span_type}" return convert_string_to_id(combined_key) @@ -212,5 +226,11 @@ def convert_datetime_to_nanoseconds(start_time_a: datetime | None) -> int | None if start_time_a is None: return None timestamp_in_seconds = start_time_a.timestamp() - timestamp_in_nanoseconds = int(timestamp_in_seconds * 1e9) - return timestamp_in_nanoseconds + return int(timestamp_in_seconds * 1e9) + + +def build_endpoint(base_url: str, license_key: str) -> str: + if "log.aliyuncs.com" in base_url: # cms2.0 endpoint + return urljoin(base_url, f"adapt_{license_key}/api/v1/traces") + else: # xtrace endpoint + return urljoin(base_url, f"adapt_{license_key}/api/otlp/traces") diff --git a/api/core/ops/aliyun_trace/entities/aliyun_trace_entity.py b/api/core/ops/aliyun_trace/entities/aliyun_trace_entity.py index f3dcbc5b8f..0ee71fc23f 100644 --- a/api/core/ops/aliyun_trace/entities/aliyun_trace_entity.py +++ b/api/core/ops/aliyun_trace/entities/aliyun_trace_entity.py @@ -1,18 +1,33 @@ from collections.abc import Sequence +from dataclasses import dataclass +from typing import Any from opentelemetry import trace as trace_api from opentelemetry.sdk.trace import Event, Status, StatusCode from pydantic import BaseModel, Field +@dataclass +class TraceMetadata: + """Metadata for trace operations, containing common attributes for all spans in a trace.""" + + trace_id: int + workflow_span_id: int + session_id: str + user_id: str + links: list[trace_api.Link] + + class SpanData(BaseModel): + """Data model for span information in Aliyun trace system.""" + model_config = {"arbitrary_types_allowed": True} trace_id: int = Field(..., description="The 
diff --git a/api/core/ops/aliyun_trace/entities/semconv.py b/api/core/ops/aliyun_trace/entities/semconv.py
index c9427c776a..7a22db21e2 100644
--- a/api/core/ops/aliyun_trace/entities/semconv.py
+++ b/api/core/ops/aliyun_trace/entities/semconv.py
@@ -1,56 +1,37 @@
 from enum import StrEnum
+from typing import Final

-# public
-GEN_AI_SESSION_ID = "gen_ai.session.id"
+# Public attributes
+GEN_AI_SESSION_ID: Final[str] = "gen_ai.session.id"
+GEN_AI_USER_ID: Final[str] = "gen_ai.user.id"
+GEN_AI_USER_NAME: Final[str] = "gen_ai.user.name"
+GEN_AI_SPAN_KIND: Final[str] = "gen_ai.span.kind"
+GEN_AI_FRAMEWORK: Final[str] = "gen_ai.framework"

-GEN_AI_USER_ID = "gen_ai.user.id"
+# Chain attributes
+INPUT_VALUE: Final[str] = "input.value"
+OUTPUT_VALUE: Final[str] = "output.value"

-GEN_AI_USER_NAME = "gen_ai.user.name"
+# Retriever attributes
+RETRIEVAL_QUERY: Final[str] = "retrieval.query"
+RETRIEVAL_DOCUMENT: Final[str] = "retrieval.document"

-GEN_AI_SPAN_KIND = "gen_ai.span.kind"
+# LLM attributes
+GEN_AI_MODEL_NAME: Final[str] = "gen_ai.model_name"
+GEN_AI_SYSTEM: Final[str] = "gen_ai.system"
+GEN_AI_USAGE_INPUT_TOKENS: Final[str] = "gen_ai.usage.input_tokens"
+GEN_AI_USAGE_OUTPUT_TOKENS: Final[str] = "gen_ai.usage.output_tokens"
+GEN_AI_USAGE_TOTAL_TOKENS: Final[str] = "gen_ai.usage.total_tokens"
+GEN_AI_PROMPT_TEMPLATE_TEMPLATE: Final[str] = "gen_ai.prompt_template.template"
+GEN_AI_PROMPT_TEMPLATE_VARIABLE: Final[str] = "gen_ai.prompt_template.variable"
+GEN_AI_PROMPT: Final[str] = "gen_ai.prompt"
+GEN_AI_COMPLETION: Final[str] = "gen_ai.completion"
+GEN_AI_RESPONSE_FINISH_REASON: Final[str] = "gen_ai.response.finish_reason"

-GEN_AI_FRAMEWORK = "gen_ai.framework"
-
-
-# Chain
-INPUT_VALUE = "input.value"
-
-OUTPUT_VALUE = "output.value"
-
-
-# Retriever
-RETRIEVAL_QUERY = "retrieval.query"
-
-RETRIEVAL_DOCUMENT = "retrieval.document"
-
-
-# LLM
-GEN_AI_MODEL_NAME = "gen_ai.model_name"
-
-GEN_AI_SYSTEM = "gen_ai.system"
-
-GEN_AI_USAGE_INPUT_TOKENS = "gen_ai.usage.input_tokens"
-
-GEN_AI_USAGE_OUTPUT_TOKENS = "gen_ai.usage.output_tokens"
-
-GEN_AI_USAGE_TOTAL_TOKENS = "gen_ai.usage.total_tokens"
-
-GEN_AI_PROMPT_TEMPLATE_TEMPLATE = "gen_ai.prompt_template.template"
-
-GEN_AI_PROMPT_TEMPLATE_VARIABLE = "gen_ai.prompt_template.variable"
-
-GEN_AI_PROMPT = "gen_ai.prompt"
-
-GEN_AI_COMPLETION = "gen_ai.completion"
-
-GEN_AI_RESPONSE_FINISH_REASON = "gen_ai.response.finish_reason"
-
-# Tool
-TOOL_NAME = "tool.name"
-
-TOOL_DESCRIPTION = "tool.description"
-
-TOOL_PARAMETERS = "tool.parameters"
+# Tool attributes
+TOOL_NAME: Final[str] = "tool.name"
+TOOL_DESCRIPTION: Final[str] = "tool.description"
+TOOL_PARAMETERS: Final[str] = "tool.parameters"


 class GenAISpanKind(StrEnum):
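
Note on semconv: GenAISpanKind is a StrEnum, so its members are str instances. That is why the call sites in aliyun_trace.py can now pass span_kind=GenAISpanKind.LLM directly and drop the earlier .value. A quick stdlib check (Python 3.11+; the member value is assumed, since the enum body is not shown in this diff):

from enum import StrEnum


class GenAISpanKind(StrEnum):
    CHAIN = "CHAIN"  # value assumed for illustration


kind = GenAISpanKind.CHAIN
print(isinstance(kind, str), kind == "CHAIN", f"{kind}")  # True True CHAIN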
diff --git a/api/core/ops/aliyun_trace/utils.py b/api/core/ops/aliyun_trace/utils.py
new file mode 100644
index 0000000000..2ec9e75dcd
--- /dev/null
+++ b/api/core/ops/aliyun_trace/utils.py
@@ -0,0 +1,95 @@
+import json
+from typing import Any
+
+from opentelemetry.trace import Link, Status, StatusCode
+
+from core.ops.aliyun_trace.entities.semconv import (
+    GEN_AI_FRAMEWORK,
+    GEN_AI_SESSION_ID,
+    GEN_AI_SPAN_KIND,
+    GEN_AI_USER_ID,
+    INPUT_VALUE,
+    OUTPUT_VALUE,
+    GenAISpanKind,
+)
+from core.rag.models.document import Document
+from core.workflow.entities import WorkflowNodeExecution
+from core.workflow.enums import WorkflowNodeExecutionStatus
+from extensions.ext_database import db
+from models import EndUser
+
+# Constants
+DEFAULT_JSON_ENSURE_ASCII = False
+DEFAULT_FRAMEWORK_NAME = "dify"
+
+
+def get_user_id_from_message_data(message_data) -> str:
+    user_id = message_data.from_account_id
+    if message_data.from_end_user_id:
+        end_user_data: EndUser | None = (
+            db.session.query(EndUser).where(EndUser.id == message_data.from_end_user_id).first()
+        )
+        if end_user_data is not None:
+            user_id = end_user_data.session_id
+    return user_id
+
+
+def create_status_from_error(error: str | None) -> Status:
+    if error:
+        return Status(StatusCode.ERROR, error)
+    return Status(StatusCode.OK)
+
+
+def get_workflow_node_status(node_execution: WorkflowNodeExecution) -> Status:
+    if node_execution.status == WorkflowNodeExecutionStatus.SUCCEEDED:
+        return Status(StatusCode.OK)
+    if node_execution.status in [WorkflowNodeExecutionStatus.FAILED, WorkflowNodeExecutionStatus.EXCEPTION]:
+        return Status(StatusCode.ERROR, str(node_execution.error))
+    return Status(StatusCode.UNSET)
+
+
+def create_links_from_trace_id(trace_id: str | None) -> list[Link]:
+    from core.ops.aliyun_trace.data_exporter.traceclient import create_link
+
+    links = []
+    if trace_id:
+        links.append(create_link(trace_id_str=trace_id))
+    return links
+
+
+def extract_retrieval_documents(documents: list[Document]) -> list[dict[str, Any]]:
+    documents_data = []
+    for document in documents:
+        document_data = {
+            "content": document.page_content,
+            "metadata": {
+                "dataset_id": document.metadata.get("dataset_id"),
+                "doc_id": document.metadata.get("doc_id"),
+                "document_id": document.metadata.get("document_id"),
+            },
+            "score": document.metadata.get("score"),
+        }
+        documents_data.append(document_data)
+    return documents_data
+
+
+def serialize_json_data(data: Any, ensure_ascii: bool = DEFAULT_JSON_ENSURE_ASCII) -> str:
+    return json.dumps(data, ensure_ascii=ensure_ascii)
+
+
+def create_common_span_attributes(
+    session_id: str = "",
+    user_id: str = "",
+    span_kind: str = GenAISpanKind.CHAIN,
+    framework: str = DEFAULT_FRAMEWORK_NAME,
+    inputs: str = "",
+    outputs: str = "",
+) -> dict[str, Any]:
+    return {
+        GEN_AI_SESSION_ID: session_id,
+        GEN_AI_USER_ID: user_id,
+        GEN_AI_SPAN_KIND: span_kind,
+        GEN_AI_FRAMEWORK: framework,
+        INPUT_VALUE: inputs,
+        OUTPUT_VALUE: outputs,
+    }
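
Note on utils: serialize_json_data defaults to ensure_ascii=False via DEFAULT_JSON_ENSURE_ASCII, so non-ASCII payloads stay readable in the trace backend instead of being \u-escaped. Standalone comparison:

import json


def serialize_json_data(data, ensure_ascii: bool = False) -> str:
    # Same shape as utils.serialize_json_data.
    return json.dumps(data, ensure_ascii=ensure_ascii)


print(serialize_json_data({"query": "你好"}))  # {"query": "你好"}
print(json.dumps({"query": "你好"}))           # {"query": "\u4f60\u597d"}

The function-local import inside create_links_from_trace_id is presumably there to avoid a circular import between utils and traceclient, since traceclient sits deeper in the same package.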
"ops_trace/" diff --git a/api/tests/unit_tests/core/ops/test_config_entity.py b/api/tests/unit_tests/core/ops/test_config_entity.py index 1dc380ad0b..2cbff54c42 100644 --- a/api/tests/unit_tests/core/ops/test_config_entity.py +++ b/api/tests/unit_tests/core/ops/test_config_entity.py @@ -329,20 +329,20 @@ class TestAliyunConfig: assert config.endpoint == "https://tracing-analysis-dc-hz.aliyuncs.com" def test_endpoint_validation_with_path(self): - """Test endpoint validation normalizes URL by removing path""" + """Test endpoint validation preserves path for Aliyun endpoints""" config = AliyunConfig( license_key="test_license", endpoint="https://tracing-analysis-dc-hz.aliyuncs.com/api/v1/traces" ) - assert config.endpoint == "https://tracing-analysis-dc-hz.aliyuncs.com" + assert config.endpoint == "https://tracing-analysis-dc-hz.aliyuncs.com/api/v1/traces" def test_endpoint_validation_invalid_scheme(self): """Test endpoint validation rejects invalid schemes""" - with pytest.raises(ValidationError, match="URL scheme must be one of"): + with pytest.raises(ValidationError, match="URL must start with https:// or http://"): AliyunConfig(license_key="test_license", endpoint="ftp://invalid.tracing-analysis-dc-hz.aliyuncs.com") def test_endpoint_validation_no_scheme(self): """Test endpoint validation rejects URLs without scheme""" - with pytest.raises(ValidationError, match="URL scheme must be one of"): + with pytest.raises(ValidationError, match="URL must start with https:// or http://"): AliyunConfig(license_key="test_license", endpoint="invalid.tracing-analysis-dc-hz.aliyuncs.com") def test_license_key_required(self): @@ -350,6 +350,23 @@ class TestAliyunConfig: with pytest.raises(ValidationError): AliyunConfig(license_key="", endpoint="https://tracing-analysis-dc-hz.aliyuncs.com") + def test_valid_endpoint_format_examples(self): + """Test valid endpoint format examples from comments""" + valid_endpoints = [ + # cms2.0 public endpoint + "https://proj-xtrace-123456-cn-heyuan.cn-heyuan.log.aliyuncs.com/apm/trace/opentelemetry", + # cms2.0 intranet endpoint + "https://proj-xtrace-123456-cn-heyuan.cn-heyuan-intranet.log.aliyuncs.com/apm/trace/opentelemetry", + # xtrace public endpoint + "http://tracing-cn-heyuan.arms.aliyuncs.com", + # xtrace intranet endpoint + "http://tracing-cn-heyuan-internal.arms.aliyuncs.com", + ] + + for endpoint in valid_endpoints: + config = AliyunConfig(license_key="test_license", endpoint=endpoint) + assert config.endpoint == endpoint + class TestConfigIntegration: """Integration tests for configuration classes""" @@ -382,7 +399,7 @@ class TestConfigIntegration: assert arize_config.endpoint == "https://arize.com" assert phoenix_with_path_config.endpoint == "https://app.phoenix.arize.com/s/dify-integration" assert phoenix_without_path_config.endpoint == "https://app.phoenix.arize.com" - assert aliyun_config.endpoint == "https://tracing-analysis-dc-hz.aliyuncs.com" + assert aliyun_config.endpoint == "https://tracing-analysis-dc-hz.aliyuncs.com/api/v1/traces" def test_project_default_values(self): """Test that project default values are set correctly"""