mirror of
https://github.com/langgenius/dify.git
synced 2026-05-11 14:58:23 +08:00
evaluation runtime
This commit is contained in:
parent
1ce0610c4c
commit
a3cf1a18a3
@ -1366,6 +1366,32 @@ class SandboxExpiredRecordsCleanConfig(BaseSettings):
|
||||
)
|
||||
|
||||
|
||||
class EvaluationConfig(BaseSettings):
|
||||
"""
|
||||
Configuration for evaluation runtime
|
||||
"""
|
||||
|
||||
EVALUATION_FRAMEWORK: str = Field(
|
||||
description="Evaluation framework to use (ragas/deepeval/none)",
|
||||
default="none",
|
||||
)
|
||||
|
||||
EVALUATION_MAX_CONCURRENT_RUNS: PositiveInt = Field(
|
||||
description="Maximum number of concurrent evaluation runs per tenant",
|
||||
default=3,
|
||||
)
|
||||
|
||||
EVALUATION_MAX_DATASET_ROWS: PositiveInt = Field(
|
||||
description="Maximum number of rows allowed in an evaluation dataset",
|
||||
default=1000,
|
||||
)
|
||||
|
||||
EVALUATION_TASK_TIMEOUT: PositiveInt = Field(
|
||||
description="Timeout in seconds for a single evaluation task",
|
||||
default=3600,
|
||||
)
|
||||
|
||||
|
||||
class FeatureConfig(
|
||||
# place the configs in alphabet order
|
||||
AppExecutionConfig,
|
||||
@ -1378,6 +1404,7 @@ class FeatureConfig(
|
||||
MarketplaceConfig,
|
||||
DataSetConfig,
|
||||
EndpointConfig,
|
||||
EvaluationConfig,
|
||||
FileAccessConfig,
|
||||
FileUploadConfig,
|
||||
HttpConfig,
|
||||
|
||||
@ -1,7 +1,9 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from collections.abc import Callable
|
||||
from functools import wraps
|
||||
from typing import ParamSpec, TypeVar, Union
|
||||
from typing import TYPE_CHECKING, ParamSpec, TypeVar, Union
|
||||
from urllib.parse import quote
|
||||
|
||||
from flask import Response, request
|
||||
@ -9,7 +11,7 @@ from flask_restx import Resource, fields
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
from werkzeug.exceptions import NotFound
|
||||
from werkzeug.exceptions import BadRequest, NotFound
|
||||
|
||||
from controllers.common.schema import register_schema_models
|
||||
from controllers.console import console_ns
|
||||
@ -18,6 +20,7 @@ from controllers.console.wraps import (
|
||||
edit_permission_required,
|
||||
setup_required,
|
||||
)
|
||||
from core.evaluation.entities.evaluation_entity import EvaluationCategory
|
||||
from core.file import helpers as file_helpers
|
||||
from extensions.ext_database import db
|
||||
from libs.helper import TimestampField
|
||||
@ -25,8 +28,17 @@ from libs.login import current_account_with_tenant, login_required
|
||||
from models import App
|
||||
from models.model import UploadFile
|
||||
from models.snippet import CustomizedSnippet
|
||||
from services.errors.evaluation import (
|
||||
EvaluationDatasetInvalidError,
|
||||
EvaluationFrameworkNotConfiguredError,
|
||||
EvaluationMaxConcurrentRunsError,
|
||||
EvaluationNotFoundError,
|
||||
)
|
||||
from services.evaluation_service import EvaluationService
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from models.evaluation import EvaluationRun, EvaluationRunItem
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
P = ParamSpec("P")
|
||||
@ -208,25 +220,70 @@ class EvaluationDetailApi(Resource):
|
||||
@get_evaluation_target
|
||||
def get(self, target: Union[App, CustomizedSnippet], target_type: str):
|
||||
"""
|
||||
Get evaluation details for the target.
|
||||
Get evaluation configuration for the target.
|
||||
|
||||
Returns evaluation configuration including model settings,
|
||||
customized matrix, and judgement conditions.
|
||||
metrics config, and judgement conditions.
|
||||
"""
|
||||
# TODO: Implement actual evaluation detail retrieval
|
||||
# This is a placeholder implementation
|
||||
_, current_tenant_id = current_account_with_tenant()
|
||||
|
||||
with Session(db.engine, expire_on_commit=False) as session:
|
||||
config = EvaluationService.get_evaluation_config(
|
||||
session, current_tenant_id, target_type, str(target.id)
|
||||
)
|
||||
|
||||
if config is None:
|
||||
return {
|
||||
"evaluation_model": None,
|
||||
"evaluation_model_provider": None,
|
||||
"metrics_config": None,
|
||||
"judgement_conditions": None,
|
||||
}
|
||||
|
||||
return {
|
||||
"evaluation_model": None,
|
||||
"evaluation_model_provider": None,
|
||||
"customized_matrix": None,
|
||||
"judgement_conditions": None,
|
||||
"evaluation_model": config.evaluation_model,
|
||||
"evaluation_model_provider": config.evaluation_model_provider,
|
||||
"metrics_config": config.metrics_config_dict,
|
||||
"judgement_conditions": config.judgement_conditions_dict,
|
||||
}
|
||||
|
||||
@console_ns.doc("save_evaluation_detail")
|
||||
@console_ns.response(200, "Evaluation configuration saved successfully")
|
||||
@console_ns.response(404, "Target not found")
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
@get_evaluation_target
|
||||
@edit_permission_required
|
||||
def put(self, target: Union[App, CustomizedSnippet], target_type: str):
|
||||
"""
|
||||
Save evaluation configuration for the target.
|
||||
"""
|
||||
current_account, current_tenant_id = current_account_with_tenant()
|
||||
data = request.get_json(force=True)
|
||||
|
||||
with Session(db.engine, expire_on_commit=False) as session:
|
||||
config = EvaluationService.save_evaluation_config(
|
||||
session=session,
|
||||
tenant_id=current_tenant_id,
|
||||
target_type=target_type,
|
||||
target_id=str(target.id),
|
||||
account_id=str(current_account.id),
|
||||
data=data,
|
||||
)
|
||||
|
||||
return {
|
||||
"evaluation_model": config.evaluation_model,
|
||||
"evaluation_model_provider": config.evaluation_model_provider,
|
||||
"metrics_config": config.metrics_config_dict,
|
||||
"judgement_conditions": config.judgement_conditions_dict,
|
||||
}
|
||||
|
||||
|
||||
@console_ns.route("/<string:evaluate_target_type>/<uuid:evaluate_target_id>/evaluation/logs")
|
||||
class EvaluationLogsApi(Resource):
|
||||
@console_ns.doc("get_evaluation_logs")
|
||||
@console_ns.response(200, "Evaluation logs retrieved successfully", evaluation_log_list_model)
|
||||
@console_ns.response(200, "Evaluation logs retrieved successfully")
|
||||
@console_ns.response(404, "Target not found")
|
||||
@setup_required
|
||||
@login_required
|
||||
@ -234,18 +291,192 @@ class EvaluationLogsApi(Resource):
|
||||
@get_evaluation_target
|
||||
def get(self, target: Union[App, CustomizedSnippet], target_type: str):
|
||||
"""
|
||||
Get offline evaluation logs for the target.
|
||||
Get evaluation run history for the target.
|
||||
|
||||
Returns a list of evaluation runs with test files,
|
||||
result files, and version information.
|
||||
Returns a paginated list of evaluation runs.
|
||||
"""
|
||||
# TODO: Implement actual evaluation logs retrieval
|
||||
# This is a placeholder implementation
|
||||
_, current_tenant_id = current_account_with_tenant()
|
||||
page = request.args.get("page", 1, type=int)
|
||||
page_size = request.args.get("page_size", 20, type=int)
|
||||
|
||||
with Session(db.engine, expire_on_commit=False) as session:
|
||||
runs, total = EvaluationService.get_evaluation_runs(
|
||||
session=session,
|
||||
tenant_id=current_tenant_id,
|
||||
target_type=target_type,
|
||||
target_id=str(target.id),
|
||||
page=page,
|
||||
page_size=page_size,
|
||||
)
|
||||
|
||||
return {
|
||||
"data": [],
|
||||
"data": [_serialize_evaluation_run(run) for run in runs],
|
||||
"total": total,
|
||||
"page": page,
|
||||
"page_size": page_size,
|
||||
}
|
||||
|
||||
|
||||
@console_ns.route("/<string:evaluate_target_type>/<uuid:evaluate_target_id>/evaluation/run")
|
||||
class EvaluationRunApi(Resource):
|
||||
@console_ns.doc("start_evaluation_run")
|
||||
@console_ns.response(200, "Evaluation run started")
|
||||
@console_ns.response(400, "Invalid request")
|
||||
@console_ns.response(404, "Target not found")
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
@get_evaluation_target
|
||||
@edit_permission_required
|
||||
def post(self, target: Union[App, CustomizedSnippet], target_type: str):
|
||||
"""
|
||||
Start an evaluation run.
|
||||
|
||||
Expects multipart form data with:
|
||||
- file: XLSX dataset file
|
||||
- evaluation_category: one of llm, retrieval, agent, workflow
|
||||
"""
|
||||
current_account, current_tenant_id = current_account_with_tenant()
|
||||
|
||||
# Validate file upload
|
||||
if "file" not in request.files:
|
||||
raise BadRequest("Dataset file is required.")
|
||||
file = request.files["file"]
|
||||
if not file.filename or not file.filename.endswith(".xlsx"):
|
||||
raise BadRequest("Dataset file must be an XLSX file.")
|
||||
|
||||
dataset_content = file.read()
|
||||
if not dataset_content:
|
||||
raise BadRequest("Dataset file is empty.")
|
||||
|
||||
# Validate evaluation category
|
||||
category_str = request.form.get("evaluation_category", "llm")
|
||||
try:
|
||||
evaluation_category = EvaluationCategory(category_str)
|
||||
except ValueError:
|
||||
raise BadRequest(
|
||||
f"Invalid evaluation_category: {category_str}. "
|
||||
f"Must be one of: {', '.join(e.value for e in EvaluationCategory)}"
|
||||
)
|
||||
|
||||
try:
|
||||
with Session(db.engine, expire_on_commit=False) as session:
|
||||
evaluation_run = EvaluationService.start_evaluation_run(
|
||||
session=session,
|
||||
tenant_id=current_tenant_id,
|
||||
target_type=target_type,
|
||||
target_id=str(target.id),
|
||||
account_id=str(current_account.id),
|
||||
dataset_file_content=dataset_content,
|
||||
evaluation_category=evaluation_category,
|
||||
)
|
||||
return _serialize_evaluation_run(evaluation_run), 200
|
||||
except EvaluationFrameworkNotConfiguredError as e:
|
||||
return {"message": str(e.description)}, 400
|
||||
except EvaluationNotFoundError as e:
|
||||
return {"message": str(e.description)}, 404
|
||||
except EvaluationMaxConcurrentRunsError as e:
|
||||
return {"message": str(e.description)}, 429
|
||||
except EvaluationDatasetInvalidError as e:
|
||||
return {"message": str(e.description)}, 400
|
||||
|
||||
|
||||
@console_ns.route(
|
||||
"/<string:evaluate_target_type>/<uuid:evaluate_target_id>/evaluation/runs/<uuid:run_id>"
|
||||
)
|
||||
class EvaluationRunDetailApi(Resource):
|
||||
@console_ns.doc("get_evaluation_run_detail")
|
||||
@console_ns.response(200, "Evaluation run detail retrieved")
|
||||
@console_ns.response(404, "Run not found")
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
@get_evaluation_target
|
||||
def get(self, target: Union[App, CustomizedSnippet], target_type: str, run_id: str):
|
||||
"""
|
||||
Get evaluation run detail including items.
|
||||
"""
|
||||
_, current_tenant_id = current_account_with_tenant()
|
||||
run_id = str(run_id)
|
||||
page = request.args.get("page", 1, type=int)
|
||||
page_size = request.args.get("page_size", 50, type=int)
|
||||
|
||||
try:
|
||||
with Session(db.engine, expire_on_commit=False) as session:
|
||||
run = EvaluationService.get_evaluation_run_detail(
|
||||
session=session,
|
||||
tenant_id=current_tenant_id,
|
||||
run_id=run_id,
|
||||
)
|
||||
items, total_items = EvaluationService.get_evaluation_run_items(
|
||||
session=session,
|
||||
run_id=run_id,
|
||||
page=page,
|
||||
page_size=page_size,
|
||||
)
|
||||
|
||||
return {
|
||||
"run": _serialize_evaluation_run(run),
|
||||
"items": {
|
||||
"data": [_serialize_evaluation_run_item(item) for item in items],
|
||||
"total": total_items,
|
||||
"page": page,
|
||||
"page_size": page_size,
|
||||
},
|
||||
}
|
||||
except EvaluationNotFoundError as e:
|
||||
return {"message": str(e.description)}, 404
|
||||
|
||||
|
||||
@console_ns.route(
|
||||
"/<string:evaluate_target_type>/<uuid:evaluate_target_id>/evaluation/runs/<uuid:run_id>/cancel"
|
||||
)
|
||||
class EvaluationRunCancelApi(Resource):
|
||||
@console_ns.doc("cancel_evaluation_run")
|
||||
@console_ns.response(200, "Evaluation run cancelled")
|
||||
@console_ns.response(404, "Run not found")
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
@get_evaluation_target
|
||||
@edit_permission_required
|
||||
def post(self, target: Union[App, CustomizedSnippet], target_type: str, run_id: str):
|
||||
"""Cancel a running evaluation."""
|
||||
_, current_tenant_id = current_account_with_tenant()
|
||||
run_id = str(run_id)
|
||||
|
||||
try:
|
||||
with Session(db.engine, expire_on_commit=False) as session:
|
||||
run = EvaluationService.cancel_evaluation_run(
|
||||
session=session,
|
||||
tenant_id=current_tenant_id,
|
||||
run_id=run_id,
|
||||
)
|
||||
return _serialize_evaluation_run(run)
|
||||
except EvaluationNotFoundError as e:
|
||||
return {"message": str(e.description)}, 404
|
||||
except ValueError as e:
|
||||
return {"message": str(e)}, 400
|
||||
|
||||
|
||||
@console_ns.route("/<string:evaluate_target_type>/<uuid:evaluate_target_id>/evaluation/metrics")
|
||||
class EvaluationMetricsApi(Resource):
|
||||
@console_ns.doc("get_evaluation_metrics")
|
||||
@console_ns.response(200, "Available metrics retrieved")
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
@get_evaluation_target
|
||||
def get(self, target: Union[App, CustomizedSnippet], target_type: str):
|
||||
"""
|
||||
Get available evaluation metrics for the current framework.
|
||||
"""
|
||||
result = {}
|
||||
for category in EvaluationCategory:
|
||||
result[category.value] = EvaluationService.get_supported_metrics(category)
|
||||
return {"metrics": result}
|
||||
|
||||
|
||||
@console_ns.route("/<string:evaluate_target_type>/<uuid:evaluate_target_id>/evaluation/files/<uuid:file_id>")
|
||||
class EvaluationFileDownloadApi(Resource):
|
||||
@console_ns.doc("download_evaluation_file")
|
||||
@ -309,8 +540,6 @@ class EvaluationVersionApi(Resource):
|
||||
if not version:
|
||||
return {"message": "version parameter is required"}, 400
|
||||
|
||||
# TODO: Implement actual version detail retrieval
|
||||
# For now, return the current graph if available
|
||||
graph = {}
|
||||
if target_type == "snippets" and isinstance(target, CustomizedSnippet):
|
||||
graph = target.graph_dict
|
||||
@ -318,3 +547,43 @@ class EvaluationVersionApi(Resource):
|
||||
return {
|
||||
"graph": graph,
|
||||
}
|
||||
|
||||
|
||||
# ---- Serialization Helpers ----
|
||||
|
||||
|
||||
def _serialize_evaluation_run(run: EvaluationRun) -> dict[str, object]:
|
||||
return {
|
||||
"id": run.id,
|
||||
"tenant_id": run.tenant_id,
|
||||
"target_type": run.target_type,
|
||||
"target_id": run.target_id,
|
||||
"evaluation_config_id": run.evaluation_config_id,
|
||||
"status": run.status,
|
||||
"dataset_file_id": run.dataset_file_id,
|
||||
"result_file_id": run.result_file_id,
|
||||
"total_items": run.total_items,
|
||||
"completed_items": run.completed_items,
|
||||
"failed_items": run.failed_items,
|
||||
"progress": run.progress,
|
||||
"metrics_summary": run.metrics_summary_dict,
|
||||
"error": run.error,
|
||||
"created_by": run.created_by,
|
||||
"started_at": int(run.started_at.timestamp()) if run.started_at else None,
|
||||
"completed_at": int(run.completed_at.timestamp()) if run.completed_at else None,
|
||||
"created_at": int(run.created_at.timestamp()) if run.created_at else None,
|
||||
}
|
||||
|
||||
|
||||
def _serialize_evaluation_run_item(item: EvaluationRunItem) -> dict[str, object]:
|
||||
return {
|
||||
"id": item.id,
|
||||
"item_index": item.item_index,
|
||||
"inputs": item.inputs_dict,
|
||||
"expected_output": item.expected_output,
|
||||
"actual_output": item.actual_output,
|
||||
"metrics": item.metrics_list,
|
||||
"metadata": item.metadata_dict,
|
||||
"error": item.error,
|
||||
"overall_score": item.overall_score,
|
||||
}
|
||||
|
||||
0
api/core/evaluation/__init__.py
Normal file
0
api/core/evaluation/__init__.py
Normal file
64
api/core/evaluation/base_evaluation_instance.py
Normal file
64
api/core/evaluation/base_evaluation_instance.py
Normal file
@ -0,0 +1,64 @@
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from core.evaluation.entities.evaluation_entity import (
|
||||
EvaluationCategory,
|
||||
EvaluationItemInput,
|
||||
EvaluationItemResult,
|
||||
)
|
||||
|
||||
|
||||
class BaseEvaluationInstance(ABC):
|
||||
"""Abstract base class for evaluation framework adapters."""
|
||||
|
||||
@abstractmethod
|
||||
def evaluate_llm(
|
||||
self,
|
||||
items: list[EvaluationItemInput],
|
||||
metrics_config: dict,
|
||||
model_provider: str,
|
||||
model_name: str,
|
||||
tenant_id: str,
|
||||
) -> list[EvaluationItemResult]:
|
||||
"""Evaluate LLM outputs using the configured framework."""
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def evaluate_retrieval(
|
||||
self,
|
||||
items: list[EvaluationItemInput],
|
||||
metrics_config: dict,
|
||||
model_provider: str,
|
||||
model_name: str,
|
||||
tenant_id: str,
|
||||
) -> list[EvaluationItemResult]:
|
||||
"""Evaluate retrieval quality using the configured framework."""
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def evaluate_agent(
|
||||
self,
|
||||
items: list[EvaluationItemInput],
|
||||
metrics_config: dict,
|
||||
model_provider: str,
|
||||
model_name: str,
|
||||
tenant_id: str,
|
||||
) -> list[EvaluationItemResult]:
|
||||
"""Evaluate agent outputs using the configured framework."""
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def evaluate_workflow(
|
||||
self,
|
||||
items: list[EvaluationItemInput],
|
||||
metrics_config: dict,
|
||||
model_provider: str,
|
||||
model_name: str,
|
||||
tenant_id: str,
|
||||
) -> list[EvaluationItemResult]:
|
||||
"""Evaluate workflow outputs using the configured framework."""
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def get_supported_metrics(self, category: EvaluationCategory) -> list[str]:
|
||||
"""Return the list of supported metric names for a given evaluation category."""
|
||||
...
|
||||
0
api/core/evaluation/entities/__init__.py
Normal file
0
api/core/evaluation/entities/__init__.py
Normal file
19
api/core/evaluation/entities/config_entity.py
Normal file
19
api/core/evaluation/entities/config_entity.py
Normal file
@ -0,0 +1,19 @@
|
||||
from enum import StrEnum
|
||||
|
||||
from pydantic import BaseModel
|
||||
|
||||
|
||||
class EvaluationFrameworkEnum(StrEnum):
|
||||
RAGAS = "ragas"
|
||||
DEEPEVAL = "deepeval"
|
||||
NONE = "none"
|
||||
|
||||
|
||||
class BaseEvaluationConfig(BaseModel):
|
||||
"""Base configuration for evaluation frameworks."""
|
||||
pass
|
||||
|
||||
|
||||
class RagasConfig(BaseEvaluationConfig):
|
||||
"""RAGAS-specific configuration."""
|
||||
pass
|
||||
52
api/core/evaluation/entities/evaluation_entity.py
Normal file
52
api/core/evaluation/entities/evaluation_entity.py
Normal file
@ -0,0 +1,52 @@
|
||||
from enum import StrEnum
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class EvaluationCategory(StrEnum):
|
||||
LLM = "llm"
|
||||
RETRIEVAL = "retrieval"
|
||||
AGENT = "agent"
|
||||
WORKFLOW = "workflow"
|
||||
|
||||
|
||||
class EvaluationMetric(BaseModel):
|
||||
name: str
|
||||
score: float
|
||||
details: dict[str, Any] = Field(default_factory=dict)
|
||||
|
||||
|
||||
class EvaluationItemInput(BaseModel):
|
||||
index: int
|
||||
inputs: dict[str, Any]
|
||||
expected_output: str | None = None
|
||||
context: list[str] | None = None
|
||||
|
||||
|
||||
class EvaluationItemResult(BaseModel):
|
||||
index: int
|
||||
actual_output: str | None = None
|
||||
metrics: list[EvaluationMetric] = Field(default_factory=list)
|
||||
metadata: dict[str, Any] = Field(default_factory=dict)
|
||||
error: str | None = None
|
||||
|
||||
@property
|
||||
def overall_score(self) -> float | None:
|
||||
if not self.metrics:
|
||||
return None
|
||||
scores = [m.score for m in self.metrics]
|
||||
return sum(scores) / len(scores)
|
||||
|
||||
|
||||
class EvaluationRunData(BaseModel):
|
||||
"""Serializable data for Celery task."""
|
||||
evaluation_run_id: str
|
||||
tenant_id: str
|
||||
target_type: str
|
||||
target_id: str
|
||||
evaluation_category: EvaluationCategory
|
||||
evaluation_model_provider: str
|
||||
evaluation_model: str
|
||||
metrics_config: dict[str, Any] = Field(default_factory=dict)
|
||||
items: list[EvaluationItemInput]
|
||||
61
api/core/evaluation/evaluation_manager.py
Normal file
61
api/core/evaluation/evaluation_manager.py
Normal file
@ -0,0 +1,61 @@
|
||||
import collections
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from configs import dify_config
|
||||
from core.evaluation.base_evaluation_instance import BaseEvaluationInstance
|
||||
from core.evaluation.entities.config_entity import EvaluationFrameworkEnum
|
||||
from core.evaluation.entities.evaluation_entity import EvaluationCategory
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class EvaluationFrameworkConfigMap(collections.UserDict[str, dict[str, Any]]):
|
||||
"""Registry mapping framework enum -> {config_class, evaluator_class}."""
|
||||
|
||||
def __getitem__(self, framework: str) -> dict[str, Any]:
|
||||
match framework:
|
||||
case EvaluationFrameworkEnum.RAGAS:
|
||||
from core.evaluation.entities.config_entity import RagasConfig
|
||||
from core.evaluation.frameworks.ragas.ragas_evaluator import RagasEvaluator
|
||||
|
||||
return {
|
||||
"config_class": RagasConfig,
|
||||
"evaluator_class": RagasEvaluator,
|
||||
}
|
||||
case EvaluationFrameworkEnum.DEEPEVAL:
|
||||
raise NotImplementedError("DeepEval adapter is not yet implemented.")
|
||||
case _:
|
||||
raise ValueError(f"Unknown evaluation framework: {framework}")
|
||||
|
||||
|
||||
evaluation_framework_config_map = EvaluationFrameworkConfigMap()
|
||||
|
||||
|
||||
class EvaluationManager:
|
||||
"""Factory for evaluation instances based on global configuration."""
|
||||
|
||||
@staticmethod
|
||||
def get_evaluation_instance() -> BaseEvaluationInstance | None:
|
||||
"""Create and return an evaluation instance based on EVALUATION_FRAMEWORK env var."""
|
||||
framework = dify_config.EVALUATION_FRAMEWORK
|
||||
if not framework or framework == EvaluationFrameworkEnum.NONE:
|
||||
return None
|
||||
|
||||
try:
|
||||
config_map = evaluation_framework_config_map[framework]
|
||||
evaluator_class = config_map["evaluator_class"]
|
||||
config_class = config_map["config_class"]
|
||||
config = config_class()
|
||||
return evaluator_class(config)
|
||||
except Exception:
|
||||
logger.exception("Failed to create evaluation instance for framework: %s", framework)
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def get_supported_metrics(category: EvaluationCategory) -> list[str]:
|
||||
"""Return supported metrics for the current framework and given category."""
|
||||
instance = EvaluationManager.get_evaluation_instance()
|
||||
if instance is None:
|
||||
return []
|
||||
return instance.get_supported_metrics(category)
|
||||
0
api/core/evaluation/frameworks/__init__.py
Normal file
0
api/core/evaluation/frameworks/__init__.py
Normal file
0
api/core/evaluation/frameworks/ragas/__init__.py
Normal file
0
api/core/evaluation/frameworks/ragas/__init__.py
Normal file
279
api/core/evaluation/frameworks/ragas/ragas_evaluator.py
Normal file
279
api/core/evaluation/frameworks/ragas/ragas_evaluator.py
Normal file
@ -0,0 +1,279 @@
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from core.evaluation.base_evaluation_instance import BaseEvaluationInstance
|
||||
from core.evaluation.entities.config_entity import RagasConfig
|
||||
from core.evaluation.entities.evaluation_entity import (
|
||||
EvaluationCategory,
|
||||
EvaluationItemInput,
|
||||
EvaluationItemResult,
|
||||
EvaluationMetric,
|
||||
)
|
||||
from core.evaluation.frameworks.ragas.ragas_model_wrapper import DifyModelWrapper
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Metric name mappings per category
|
||||
LLM_METRICS = ["faithfulness", "answer_relevancy", "answer_correctness", "answer_similarity"]
|
||||
RETRIEVAL_METRICS = ["context_precision", "context_recall", "context_relevancy"]
|
||||
AGENT_METRICS = ["tool_call_accuracy", "answer_correctness"]
|
||||
WORKFLOW_METRICS = ["faithfulness", "answer_correctness"]
|
||||
|
||||
|
||||
class RagasEvaluator(BaseEvaluationInstance):
|
||||
"""RAGAS framework adapter for evaluation."""
|
||||
|
||||
def __init__(self, config: RagasConfig):
|
||||
self.config = config
|
||||
|
||||
def get_supported_metrics(self, category: EvaluationCategory) -> list[str]:
|
||||
match category:
|
||||
case EvaluationCategory.LLM:
|
||||
return LLM_METRICS
|
||||
case EvaluationCategory.RETRIEVAL:
|
||||
return RETRIEVAL_METRICS
|
||||
case EvaluationCategory.AGENT:
|
||||
return AGENT_METRICS
|
||||
case EvaluationCategory.WORKFLOW:
|
||||
return WORKFLOW_METRICS
|
||||
case _:
|
||||
return []
|
||||
|
||||
def evaluate_llm(
|
||||
self,
|
||||
items: list[EvaluationItemInput],
|
||||
metrics_config: dict,
|
||||
model_provider: str,
|
||||
model_name: str,
|
||||
tenant_id: str,
|
||||
) -> list[EvaluationItemResult]:
|
||||
return self._evaluate(items, metrics_config, model_provider, model_name, tenant_id, EvaluationCategory.LLM)
|
||||
|
||||
def evaluate_retrieval(
|
||||
self,
|
||||
items: list[EvaluationItemInput],
|
||||
metrics_config: dict,
|
||||
model_provider: str,
|
||||
model_name: str,
|
||||
tenant_id: str,
|
||||
) -> list[EvaluationItemResult]:
|
||||
return self._evaluate(
|
||||
items, metrics_config, model_provider, model_name, tenant_id, EvaluationCategory.RETRIEVAL
|
||||
)
|
||||
|
||||
def evaluate_agent(
|
||||
self,
|
||||
items: list[EvaluationItemInput],
|
||||
metrics_config: dict,
|
||||
model_provider: str,
|
||||
model_name: str,
|
||||
tenant_id: str,
|
||||
) -> list[EvaluationItemResult]:
|
||||
return self._evaluate(items, metrics_config, model_provider, model_name, tenant_id, EvaluationCategory.AGENT)
|
||||
|
||||
def evaluate_workflow(
|
||||
self,
|
||||
items: list[EvaluationItemInput],
|
||||
metrics_config: dict,
|
||||
model_provider: str,
|
||||
model_name: str,
|
||||
tenant_id: str,
|
||||
) -> list[EvaluationItemResult]:
|
||||
return self._evaluate(
|
||||
items, metrics_config, model_provider, model_name, tenant_id, EvaluationCategory.WORKFLOW
|
||||
)
|
||||
|
||||
def _evaluate(
|
||||
self,
|
||||
items: list[EvaluationItemInput],
|
||||
metrics_config: dict,
|
||||
model_provider: str,
|
||||
model_name: str,
|
||||
tenant_id: str,
|
||||
category: EvaluationCategory,
|
||||
) -> list[EvaluationItemResult]:
|
||||
"""Core evaluation logic using RAGAS.
|
||||
|
||||
Uses the Dify model wrapper as judge LLM. Falls back to simple
|
||||
string similarity if RAGAS import fails.
|
||||
"""
|
||||
model_wrapper = DifyModelWrapper(model_provider, model_name, tenant_id)
|
||||
requested_metrics = metrics_config.get("metrics", self.get_supported_metrics(category))
|
||||
|
||||
try:
|
||||
return self._evaluate_with_ragas(items, requested_metrics, model_wrapper, category)
|
||||
except ImportError:
|
||||
logger.warning("RAGAS not installed, falling back to simple evaluation")
|
||||
return self._evaluate_simple(items, requested_metrics, model_wrapper)
|
||||
|
||||
def _evaluate_with_ragas(
|
||||
self,
|
||||
items: list[EvaluationItemInput],
|
||||
requested_metrics: list[str],
|
||||
model_wrapper: DifyModelWrapper,
|
||||
category: EvaluationCategory,
|
||||
) -> list[EvaluationItemResult]:
|
||||
"""Evaluate using RAGAS library."""
|
||||
from ragas import evaluate as ragas_evaluate
|
||||
from ragas.dataset_schema import EvaluationDataset, SingleTurnSample
|
||||
from ragas.llms import LangchainLLMWrapper
|
||||
from ragas.metrics import (
|
||||
Faithfulness,
|
||||
ResponseRelevancy,
|
||||
)
|
||||
|
||||
# Build RAGAS dataset
|
||||
samples = []
|
||||
for item in items:
|
||||
sample = SingleTurnSample(
|
||||
user_input=self._inputs_to_query(item.inputs),
|
||||
response=item.expected_output or "",
|
||||
retrieved_contexts=item.context or [],
|
||||
)
|
||||
if item.expected_output:
|
||||
sample.reference = item.expected_output
|
||||
samples.append(sample)
|
||||
|
||||
dataset = EvaluationDataset(samples=samples)
|
||||
|
||||
# Build metric instances
|
||||
ragas_metrics = self._build_ragas_metrics(requested_metrics)
|
||||
|
||||
if not ragas_metrics:
|
||||
logger.warning("No valid RAGAS metrics found for: %s", requested_metrics)
|
||||
return [EvaluationItemResult(index=item.index) for item in items]
|
||||
|
||||
# Run RAGAS evaluation
|
||||
try:
|
||||
result = ragas_evaluate(
|
||||
dataset=dataset,
|
||||
metrics=ragas_metrics,
|
||||
)
|
||||
|
||||
# Convert RAGAS results to our format
|
||||
results = []
|
||||
result_df = result.to_pandas()
|
||||
for i, item in enumerate(items):
|
||||
metrics = []
|
||||
for metric_name in requested_metrics:
|
||||
if metric_name in result_df.columns:
|
||||
score = result_df.iloc[i][metric_name]
|
||||
if score is not None and not (isinstance(score, float) and score != score): # NaN check
|
||||
metrics.append(EvaluationMetric(name=metric_name, score=float(score)))
|
||||
results.append(EvaluationItemResult(index=item.index, metrics=metrics))
|
||||
return results
|
||||
except Exception:
|
||||
logger.exception("RAGAS evaluation failed, falling back to simple evaluation")
|
||||
return self._evaluate_simple(items, requested_metrics, model_wrapper)
|
||||
|
||||
def _evaluate_simple(
|
||||
self,
|
||||
items: list[EvaluationItemInput],
|
||||
requested_metrics: list[str],
|
||||
model_wrapper: DifyModelWrapper,
|
||||
) -> list[EvaluationItemResult]:
|
||||
"""Simple LLM-as-judge fallback when RAGAS is not available."""
|
||||
results = []
|
||||
for item in items:
|
||||
metrics = []
|
||||
query = self._inputs_to_query(item.inputs)
|
||||
|
||||
for metric_name in requested_metrics:
|
||||
try:
|
||||
score = self._judge_with_llm(model_wrapper, metric_name, query, item)
|
||||
metrics.append(EvaluationMetric(name=metric_name, score=score))
|
||||
except Exception:
|
||||
logger.exception("Failed to compute metric %s for item %d", metric_name, item.index)
|
||||
|
||||
results.append(EvaluationItemResult(index=item.index, metrics=metrics))
|
||||
return results
|
||||
|
||||
def _judge_with_llm(
|
||||
self,
|
||||
model_wrapper: DifyModelWrapper,
|
||||
metric_name: str,
|
||||
query: str,
|
||||
item: EvaluationItemInput,
|
||||
) -> float:
|
||||
"""Use the LLM to judge a single metric for a single item."""
|
||||
prompt = self._build_judge_prompt(metric_name, query, item)
|
||||
response = model_wrapper.invoke(prompt)
|
||||
return self._parse_score(response)
|
||||
|
||||
def _build_judge_prompt(self, metric_name: str, query: str, item: EvaluationItemInput) -> str:
|
||||
"""Build a scoring prompt for the LLM judge."""
|
||||
parts = [
|
||||
f"Evaluate the following on the metric '{metric_name}' using a scale of 0.0 to 1.0.",
|
||||
f"\nQuery: {query}",
|
||||
]
|
||||
if item.expected_output:
|
||||
parts.append(f"\nExpected Output: {item.expected_output}")
|
||||
if item.context:
|
||||
parts.append(f"\nContext: {'; '.join(item.context)}")
|
||||
parts.append(
|
||||
"\nRespond with ONLY a single floating point number between 0.0 and 1.0, nothing else."
|
||||
)
|
||||
return "\n".join(parts)
|
||||
|
||||
@staticmethod
|
||||
def _parse_score(response: str) -> float:
|
||||
"""Parse a float score from LLM response."""
|
||||
cleaned = response.strip()
|
||||
try:
|
||||
score = float(cleaned)
|
||||
return max(0.0, min(1.0, score))
|
||||
except ValueError:
|
||||
# Try to extract first number from response
|
||||
import re
|
||||
|
||||
match = re.search(r"(\d+\.?\d*)", cleaned)
|
||||
if match:
|
||||
score = float(match.group(1))
|
||||
return max(0.0, min(1.0, score))
|
||||
return 0.0
|
||||
|
||||
@staticmethod
|
||||
def _inputs_to_query(inputs: dict[str, Any]) -> str:
|
||||
"""Convert input dict to a query string."""
|
||||
if "query" in inputs:
|
||||
return str(inputs["query"])
|
||||
if "question" in inputs:
|
||||
return str(inputs["question"])
|
||||
# Fallback: concatenate all input values
|
||||
return " ".join(str(v) for v in inputs.values())
|
||||
|
||||
@staticmethod
|
||||
def _build_ragas_metrics(requested_metrics: list[str]) -> list[Any]:
|
||||
"""Build RAGAS metric instances from metric names."""
|
||||
try:
|
||||
from ragas.metrics import (
|
||||
AnswerCorrectness,
|
||||
AnswerRelevancy,
|
||||
AnswerSimilarity,
|
||||
ContextPrecision,
|
||||
ContextRecall,
|
||||
ContextRelevancy,
|
||||
Faithfulness,
|
||||
)
|
||||
|
||||
metric_map: dict[str, Any] = {
|
||||
"faithfulness": Faithfulness,
|
||||
"answer_relevancy": AnswerRelevancy,
|
||||
"answer_correctness": AnswerCorrectness,
|
||||
"answer_similarity": AnswerSimilarity,
|
||||
"context_precision": ContextPrecision,
|
||||
"context_recall": ContextRecall,
|
||||
"context_relevancy": ContextRelevancy,
|
||||
}
|
||||
|
||||
metrics = []
|
||||
for name in requested_metrics:
|
||||
metric_class = metric_map.get(name)
|
||||
if metric_class:
|
||||
metrics.append(metric_class())
|
||||
else:
|
||||
logger.warning("Unknown RAGAS metric: %s", name)
|
||||
return metrics
|
||||
except ImportError:
|
||||
logger.warning("RAGAS metrics not available")
|
||||
return []
|
||||
48
api/core/evaluation/frameworks/ragas/ragas_model_wrapper.py
Normal file
48
api/core/evaluation/frameworks/ragas/ragas_model_wrapper.py
Normal file
@ -0,0 +1,48 @@
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class DifyModelWrapper:
|
||||
"""Wraps Dify's model invocation interface for use by RAGAS as an LLM judge.
|
||||
|
||||
RAGAS requires an LLM to compute certain metrics (faithfulness, answer_relevancy, etc.).
|
||||
This wrapper bridges Dify's ModelInstance to a callable that RAGAS can use.
|
||||
"""
|
||||
|
||||
def __init__(self, model_provider: str, model_name: str, tenant_id: str):
|
||||
self.model_provider = model_provider
|
||||
self.model_name = model_name
|
||||
self.tenant_id = tenant_id
|
||||
|
||||
def _get_model_instance(self) -> Any:
|
||||
from core.model_manager import ModelManager
|
||||
from core.model_runtime.entities.model_entities import ModelType
|
||||
|
||||
model_manager = ModelManager()
|
||||
model_instance = model_manager.get_model_instance(
|
||||
tenant_id=self.tenant_id,
|
||||
provider=self.model_provider,
|
||||
model_type=ModelType.LLM,
|
||||
model=self.model_name,
|
||||
)
|
||||
return model_instance
|
||||
|
||||
def invoke(self, prompt: str) -> str:
|
||||
"""Invoke the model with a text prompt and return the text response."""
|
||||
from core.model_runtime.entities.message_entities import (
|
||||
SystemPromptMessage,
|
||||
UserPromptMessage,
|
||||
)
|
||||
|
||||
model_instance = self._get_model_instance()
|
||||
result = model_instance.invoke_llm(
|
||||
prompt_messages=[
|
||||
SystemPromptMessage(content="You are an evaluation judge. Answer precisely and concisely."),
|
||||
UserPromptMessage(content=prompt),
|
||||
],
|
||||
model_parameters={"temperature": 0.0, "max_tokens": 2048},
|
||||
stream=False,
|
||||
)
|
||||
return result.message.content
|
||||
32
api/core/evaluation/runners/__init__.py
Normal file
32
api/core/evaluation/runners/__init__.py
Normal file
@ -0,0 +1,32 @@
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from models import Account, App, TenantAccountJoin
|
||||
|
||||
|
||||
def get_service_account_for_app(session: Session, app_id: str) -> Account:
|
||||
"""Get the creator account for an app with tenant context set up.
|
||||
|
||||
This follows the same pattern as BaseTraceInstance.get_service_account_with_tenant().
|
||||
"""
|
||||
app = session.scalar(select(App).where(App.id == app_id))
|
||||
if not app:
|
||||
raise ValueError(f"App with id {app_id} not found")
|
||||
|
||||
if not app.created_by:
|
||||
raise ValueError(f"App with id {app_id} has no creator")
|
||||
|
||||
account = session.scalar(select(Account).where(Account.id == app.created_by))
|
||||
if not account:
|
||||
raise ValueError(f"Creator account not found for app {app_id}")
|
||||
|
||||
current_tenant = (
|
||||
session.query(TenantAccountJoin)
|
||||
.filter_by(account_id=account.id, current=True)
|
||||
.first()
|
||||
)
|
||||
if not current_tenant:
|
||||
raise ValueError(f"Current tenant not found for account {account.id}")
|
||||
|
||||
account.set_tenant_id(current_tenant.tenant_id)
|
||||
return account
|
||||
152
api/core/evaluation/runners/agent_evaluation_runner.py
Normal file
152
api/core/evaluation/runners/agent_evaluation_runner.py
Normal file
@ -0,0 +1,152 @@
|
||||
import logging
|
||||
from typing import Any, Mapping, Union
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from core.evaluation.base_evaluation_instance import BaseEvaluationInstance
|
||||
from core.evaluation.entities.evaluation_entity import (
|
||||
EvaluationItemInput,
|
||||
EvaluationItemResult,
|
||||
)
|
||||
from core.evaluation.runners.base_evaluation_runner import BaseEvaluationRunner
|
||||
from models.model import App, AppMode
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AgentEvaluationRunner(BaseEvaluationRunner):
|
||||
"""Runner for agent evaluation: executes agent-type App, collects tool calls and final output."""
|
||||
|
||||
def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session):
|
||||
super().__init__(evaluation_instance, session)
|
||||
|
||||
def execute_target(
|
||||
self,
|
||||
tenant_id: str,
|
||||
target_id: str,
|
||||
target_type: str,
|
||||
item: EvaluationItemInput,
|
||||
) -> EvaluationItemResult:
|
||||
"""Execute agent app and collect response with tool call information."""
|
||||
from core.app.apps.agent_chat.app_generator import AgentChatAppGenerator
|
||||
from core.evaluation.runners import get_service_account_for_app
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom
|
||||
|
||||
app = self.session.query(App).filter_by(id=target_id).first()
|
||||
if not app:
|
||||
raise ValueError(f"App {target_id} not found")
|
||||
|
||||
service_account = get_service_account_for_app(self.session, target_id)
|
||||
|
||||
query = self._extract_query(item.inputs)
|
||||
args: dict[str, Any] = {
|
||||
"inputs": item.inputs,
|
||||
"query": query,
|
||||
}
|
||||
|
||||
generator = AgentChatAppGenerator()
|
||||
# Agent chat requires streaming - collect full response
|
||||
response_generator = generator.generate(
|
||||
app_model=app,
|
||||
user=service_account,
|
||||
args=args,
|
||||
invoke_from=InvokeFrom.SERVICE_API,
|
||||
streaming=True,
|
||||
)
|
||||
|
||||
# Consume the stream to get the full response
|
||||
actual_output, tool_calls = self._consume_agent_stream(response_generator)
|
||||
|
||||
return EvaluationItemResult(
|
||||
index=item.index,
|
||||
actual_output=actual_output,
|
||||
metadata={"tool_calls": tool_calls},
|
||||
)
|
||||
|
||||
def evaluate_metrics(
|
||||
self,
|
||||
items: list[EvaluationItemInput],
|
||||
results: list[EvaluationItemResult],
|
||||
metrics_config: dict,
|
||||
model_provider: str,
|
||||
model_name: str,
|
||||
tenant_id: str,
|
||||
) -> list[EvaluationItemResult]:
|
||||
"""Compute agent evaluation metrics."""
|
||||
result_by_index = {r.index: r for r in results}
|
||||
merged_items = []
|
||||
for item in items:
|
||||
result = result_by_index.get(item.index)
|
||||
context = []
|
||||
if result and result.actual_output:
|
||||
context.append(result.actual_output)
|
||||
merged_items.append(
|
||||
EvaluationItemInput(
|
||||
index=item.index,
|
||||
inputs=item.inputs,
|
||||
expected_output=item.expected_output,
|
||||
context=context + (item.context or []),
|
||||
)
|
||||
)
|
||||
|
||||
evaluated = self.evaluation_instance.evaluate_agent(
|
||||
merged_items, metrics_config, model_provider, model_name, tenant_id
|
||||
)
|
||||
|
||||
# Merge metrics back preserving metadata
|
||||
eval_by_index = {r.index: r for r in evaluated}
|
||||
final_results = []
|
||||
for result in results:
|
||||
if result.index in eval_by_index:
|
||||
eval_result = eval_by_index[result.index]
|
||||
final_results.append(
|
||||
EvaluationItemResult(
|
||||
index=result.index,
|
||||
actual_output=result.actual_output,
|
||||
metrics=eval_result.metrics,
|
||||
metadata=result.metadata,
|
||||
error=result.error,
|
||||
)
|
||||
)
|
||||
else:
|
||||
final_results.append(result)
|
||||
return final_results
|
||||
|
||||
@staticmethod
|
||||
def _extract_query(inputs: dict[str, Any]) -> str:
|
||||
for key in ("query", "question", "input", "text"):
|
||||
if key in inputs:
|
||||
return str(inputs[key])
|
||||
values = list(inputs.values())
|
||||
return str(values[0]) if values else ""
|
||||
|
||||
@staticmethod
|
||||
def _consume_agent_stream(response_generator: Any) -> tuple[str, list[dict]]:
|
||||
"""Consume agent streaming response and extract final answer + tool calls."""
|
||||
answer_parts: list[str] = []
|
||||
tool_calls: list[dict] = []
|
||||
|
||||
try:
|
||||
for chunk in response_generator:
|
||||
if isinstance(chunk, Mapping):
|
||||
event = chunk.get("event")
|
||||
if event == "agent_thought":
|
||||
thought = chunk.get("thought", "")
|
||||
if thought:
|
||||
answer_parts.append(thought)
|
||||
tool = chunk.get("tool")
|
||||
if tool:
|
||||
tool_calls.append({
|
||||
"tool": tool,
|
||||
"tool_input": chunk.get("tool_input", ""),
|
||||
})
|
||||
elif event == "message":
|
||||
answer = chunk.get("answer", "")
|
||||
if answer:
|
||||
answer_parts.append(answer)
|
||||
elif isinstance(chunk, str):
|
||||
answer_parts.append(chunk)
|
||||
except Exception:
|
||||
logger.exception("Error consuming agent stream")
|
||||
|
||||
return "".join(answer_parts), tool_calls
|
||||
130
api/core/evaluation/runners/base_evaluation_runner.py
Normal file
130
api/core/evaluation/runners/base_evaluation_runner.py
Normal file
@ -0,0 +1,130 @@
|
||||
import json
|
||||
import logging
|
||||
from abc import ABC, abstractmethod
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from core.evaluation.base_evaluation_instance import BaseEvaluationInstance
|
||||
from core.evaluation.entities.evaluation_entity import (
|
||||
EvaluationItemInput,
|
||||
EvaluationItemResult,
|
||||
)
|
||||
from libs.datetime_utils import naive_utc_now
|
||||
from models.evaluation import EvaluationRun, EvaluationRunItem, EvaluationRunStatus
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BaseEvaluationRunner(ABC):
|
||||
"""Abstract base class for evaluation runners.
|
||||
|
||||
Runners are responsible for executing the target (App/Snippet/Retrieval)
|
||||
to collect actual outputs, then delegating to the evaluation instance
|
||||
for metric computation.
|
||||
"""
|
||||
|
||||
def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session):
|
||||
self.evaluation_instance = evaluation_instance
|
||||
self.session = session
|
||||
|
||||
@abstractmethod
|
||||
def execute_target(
|
||||
self,
|
||||
tenant_id: str,
|
||||
target_id: str,
|
||||
target_type: str,
|
||||
item: EvaluationItemInput,
|
||||
) -> EvaluationItemResult:
|
||||
"""Execute the evaluation target for a single item and return the result with actual_output populated."""
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def evaluate_metrics(
|
||||
self,
|
||||
items: list[EvaluationItemInput],
|
||||
results: list[EvaluationItemResult],
|
||||
metrics_config: dict,
|
||||
model_provider: str,
|
||||
model_name: str,
|
||||
tenant_id: str,
|
||||
) -> list[EvaluationItemResult]:
|
||||
"""Compute evaluation metrics on the collected results."""
|
||||
...
|
||||
|
||||
def run(
|
||||
self,
|
||||
evaluation_run_id: str,
|
||||
tenant_id: str,
|
||||
target_id: str,
|
||||
target_type: str,
|
||||
items: list[EvaluationItemInput],
|
||||
metrics_config: dict,
|
||||
model_provider: str,
|
||||
model_name: str,
|
||||
) -> list[EvaluationItemResult]:
|
||||
"""Orchestrate target execution + metric evaluation for all items."""
|
||||
evaluation_run = self.session.query(EvaluationRun).filter_by(id=evaluation_run_id).first()
|
||||
if not evaluation_run:
|
||||
raise ValueError(f"EvaluationRun {evaluation_run_id} not found")
|
||||
|
||||
# Update status to running
|
||||
evaluation_run.status = EvaluationRunStatus.RUNNING
|
||||
evaluation_run.started_at = naive_utc_now()
|
||||
self.session.commit()
|
||||
|
||||
results: list[EvaluationItemResult] = []
|
||||
|
||||
# Phase 1: Execute target for each item
|
||||
for item in items:
|
||||
try:
|
||||
result = self.execute_target(tenant_id, target_id, target_type, item)
|
||||
results.append(result)
|
||||
evaluation_run.completed_items += 1
|
||||
except Exception as e:
|
||||
logger.exception("Failed to execute target for item %d", item.index)
|
||||
results.append(
|
||||
EvaluationItemResult(
|
||||
index=item.index,
|
||||
error=str(e),
|
||||
)
|
||||
)
|
||||
evaluation_run.failed_items += 1
|
||||
self.session.commit()
|
||||
|
||||
# Phase 2: Compute metrics on successful results
|
||||
successful_items = [item for item, result in zip(items, results) if result.error is None]
|
||||
successful_results = [r for r in results if r.error is None]
|
||||
|
||||
if successful_items and successful_results:
|
||||
try:
|
||||
evaluated_results = self.evaluate_metrics(
|
||||
successful_items, successful_results, metrics_config, model_provider, model_name, tenant_id
|
||||
)
|
||||
# Merge evaluated metrics back into results
|
||||
evaluated_by_index = {r.index: r for r in evaluated_results}
|
||||
for i, result in enumerate(results):
|
||||
if result.index in evaluated_by_index:
|
||||
results[i] = evaluated_by_index[result.index]
|
||||
except Exception:
|
||||
logger.exception("Failed to compute metrics for evaluation run %s", evaluation_run_id)
|
||||
|
||||
# Phase 3: Persist individual items
|
||||
for result in results:
|
||||
item_input = next((item for item in items if item.index == result.index), None)
|
||||
run_item = EvaluationRunItem(
|
||||
evaluation_run_id=evaluation_run_id,
|
||||
item_index=result.index,
|
||||
inputs=json.dumps(item_input.inputs) if item_input else None,
|
||||
expected_output=item_input.expected_output if item_input else None,
|
||||
context=json.dumps(item_input.context) if item_input and item_input.context else None,
|
||||
actual_output=result.actual_output,
|
||||
metrics=json.dumps([m.model_dump() for m in result.metrics]) if result.metrics else None,
|
||||
metadata_json=json.dumps(result.metadata) if result.metadata else None,
|
||||
error=result.error,
|
||||
overall_score=result.overall_score,
|
||||
)
|
||||
self.session.add(run_item)
|
||||
|
||||
self.session.commit()
|
||||
|
||||
return results
|
||||
152
api/core/evaluation/runners/llm_evaluation_runner.py
Normal file
152
api/core/evaluation/runners/llm_evaluation_runner.py
Normal file
@ -0,0 +1,152 @@
|
||||
import logging
|
||||
from typing import Any, Mapping, Union
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from core.evaluation.base_evaluation_instance import BaseEvaluationInstance
|
||||
from core.evaluation.entities.evaluation_entity import (
|
||||
EvaluationItemInput,
|
||||
EvaluationItemResult,
|
||||
)
|
||||
from core.evaluation.runners.base_evaluation_runner import BaseEvaluationRunner
|
||||
from models.model import App, AppMode
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class LLMEvaluationRunner(BaseEvaluationRunner):
|
||||
"""Runner for LLM evaluation: executes App to get responses, then evaluates."""
|
||||
|
||||
def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session):
|
||||
super().__init__(evaluation_instance, session)
|
||||
|
||||
def execute_target(
|
||||
self,
|
||||
tenant_id: str,
|
||||
target_id: str,
|
||||
target_type: str,
|
||||
item: EvaluationItemInput,
|
||||
) -> EvaluationItemResult:
|
||||
"""Execute the App/Snippet with the given inputs and collect the response."""
|
||||
from core.app.apps.completion.app_generator import CompletionAppGenerator
|
||||
from core.app.apps.workflow.app_generator import WorkflowAppGenerator
|
||||
from core.evaluation.runners import get_service_account_for_app
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom
|
||||
from services.workflow_service import WorkflowService
|
||||
|
||||
app = self.session.query(App).filter_by(id=target_id).first()
|
||||
if not app:
|
||||
raise ValueError(f"App {target_id} not found")
|
||||
|
||||
# Get a service account for invocation
|
||||
service_account = get_service_account_for_app(self.session, target_id)
|
||||
|
||||
app_mode = AppMode.value_of(app.mode)
|
||||
|
||||
# Build args from evaluation item inputs
|
||||
args: dict[str, Any] = {
|
||||
"inputs": item.inputs,
|
||||
}
|
||||
# For completion/chat modes, first text input becomes query
|
||||
if app_mode in (AppMode.COMPLETION, AppMode.CHAT):
|
||||
query = self._extract_query(item.inputs)
|
||||
args["query"] = query
|
||||
|
||||
if app_mode in (AppMode.WORKFLOW, AppMode.ADVANCED_CHAT):
|
||||
workflow_service = WorkflowService()
|
||||
workflow = workflow_service.get_published_workflow(app_model=app)
|
||||
if not workflow:
|
||||
raise ValueError(f"No published workflow found for app {target_id}")
|
||||
|
||||
generator = WorkflowAppGenerator()
|
||||
response: Mapping[str, Any] = generator.generate(
|
||||
app_model=app,
|
||||
workflow=workflow,
|
||||
user=service_account,
|
||||
args=args,
|
||||
invoke_from=InvokeFrom.SERVICE_API,
|
||||
streaming=False,
|
||||
)
|
||||
elif app_mode == AppMode.COMPLETION:
|
||||
generator = CompletionAppGenerator()
|
||||
response = generator.generate(
|
||||
app_model=app,
|
||||
user=service_account,
|
||||
args=args,
|
||||
invoke_from=InvokeFrom.SERVICE_API,
|
||||
streaming=False,
|
||||
)
|
||||
else:
|
||||
raise ValueError(f"Unsupported app mode for LLM evaluation: {app_mode}")
|
||||
|
||||
actual_output = self._extract_output(response)
|
||||
return EvaluationItemResult(
|
||||
index=item.index,
|
||||
actual_output=actual_output,
|
||||
)
|
||||
|
||||
def evaluate_metrics(
|
||||
self,
|
||||
items: list[EvaluationItemInput],
|
||||
results: list[EvaluationItemResult],
|
||||
metrics_config: dict,
|
||||
model_provider: str,
|
||||
model_name: str,
|
||||
tenant_id: str,
|
||||
) -> list[EvaluationItemResult]:
|
||||
"""Use the evaluation instance to compute LLM metrics."""
|
||||
# Merge actual_output into items for evaluation
|
||||
merged_items = self._merge_results_into_items(items, results)
|
||||
return self.evaluation_instance.evaluate_llm(
|
||||
merged_items, metrics_config, model_provider, model_name, tenant_id
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _extract_query(inputs: dict[str, Any]) -> str:
|
||||
"""Extract query from inputs."""
|
||||
for key in ("query", "question", "input", "text"):
|
||||
if key in inputs:
|
||||
return str(inputs[key])
|
||||
values = list(inputs.values())
|
||||
return str(values[0]) if values else ""
|
||||
|
||||
@staticmethod
|
||||
def _extract_output(response: Union[Mapping[str, Any], Any]) -> str:
|
||||
"""Extract text output from app response."""
|
||||
if isinstance(response, Mapping):
|
||||
# Workflow response
|
||||
if "data" in response and isinstance(response["data"], Mapping):
|
||||
outputs = response["data"].get("outputs", {})
|
||||
if isinstance(outputs, Mapping):
|
||||
values = list(outputs.values())
|
||||
return str(values[0]) if values else ""
|
||||
return str(outputs)
|
||||
# Completion response
|
||||
if "answer" in response:
|
||||
return str(response["answer"])
|
||||
if "text" in response:
|
||||
return str(response["text"])
|
||||
return str(response)
|
||||
|
||||
@staticmethod
|
||||
def _merge_results_into_items(
|
||||
items: list[EvaluationItemInput],
|
||||
results: list[EvaluationItemResult],
|
||||
) -> list[EvaluationItemInput]:
|
||||
"""Create new items with actual_output set as expected_output context for metrics."""
|
||||
result_by_index = {r.index: r for r in results}
|
||||
merged = []
|
||||
for item in items:
|
||||
result = result_by_index.get(item.index)
|
||||
if result and result.actual_output:
|
||||
merged.append(
|
||||
EvaluationItemInput(
|
||||
index=item.index,
|
||||
inputs=item.inputs,
|
||||
expected_output=item.expected_output,
|
||||
context=[result.actual_output] + (item.context or []),
|
||||
)
|
||||
)
|
||||
else:
|
||||
merged.append(item)
|
||||
return merged
|
||||
111
api/core/evaluation/runners/retrieval_evaluation_runner.py
Normal file
111
api/core/evaluation/runners/retrieval_evaluation_runner.py
Normal file
@ -0,0 +1,111 @@
|
||||
import logging
|
||||
from typing import Any
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from core.evaluation.base_evaluation_instance import BaseEvaluationInstance
|
||||
from core.evaluation.entities.evaluation_entity import (
|
||||
EvaluationItemInput,
|
||||
EvaluationItemResult,
|
||||
)
|
||||
from core.evaluation.runners.base_evaluation_runner import BaseEvaluationRunner
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RetrievalEvaluationRunner(BaseEvaluationRunner):
|
||||
"""Runner for retrieval evaluation: performs knowledge base retrieval, then evaluates."""
|
||||
|
||||
def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session):
|
||||
super().__init__(evaluation_instance, session)
|
||||
|
||||
def execute_target(
|
||||
self,
|
||||
tenant_id: str,
|
||||
target_id: str,
|
||||
target_type: str,
|
||||
item: EvaluationItemInput,
|
||||
) -> EvaluationItemResult:
|
||||
"""Execute retrieval using DatasetRetrieval and collect context documents."""
|
||||
from core.rag.retrieval.dataset_retrieval import DatasetRetrieval
|
||||
|
||||
query = self._extract_query(item.inputs)
|
||||
|
||||
dataset_retrieval = DatasetRetrieval()
|
||||
|
||||
# Use knowledge_retrieval for structured results
|
||||
try:
|
||||
from core.rag.retrieval.dataset_retrieval import KnowledgeRetrievalRequest
|
||||
|
||||
request = KnowledgeRetrievalRequest(
|
||||
query=query,
|
||||
app_id=target_id,
|
||||
tenant_id=tenant_id,
|
||||
)
|
||||
sources = dataset_retrieval.knowledge_retrieval(request)
|
||||
retrieved_contexts = [source.content for source in sources if source.content]
|
||||
except (ImportError, AttributeError):
|
||||
logger.warning("KnowledgeRetrievalRequest not available, using simple retrieval")
|
||||
retrieved_contexts = []
|
||||
|
||||
return EvaluationItemResult(
|
||||
index=item.index,
|
||||
actual_output="\n\n".join(retrieved_contexts) if retrieved_contexts else "",
|
||||
metadata={"retrieved_contexts": retrieved_contexts},
|
||||
)
|
||||
|
||||
def evaluate_metrics(
|
||||
self,
|
||||
items: list[EvaluationItemInput],
|
||||
results: list[EvaluationItemResult],
|
||||
metrics_config: dict,
|
||||
model_provider: str,
|
||||
model_name: str,
|
||||
tenant_id: str,
|
||||
) -> list[EvaluationItemResult]:
|
||||
"""Compute retrieval evaluation metrics."""
|
||||
# Merge retrieved contexts into items
|
||||
result_by_index = {r.index: r for r in results}
|
||||
merged_items = []
|
||||
for item in items:
|
||||
result = result_by_index.get(item.index)
|
||||
contexts = result.metadata.get("retrieved_contexts", []) if result else []
|
||||
merged_items.append(
|
||||
EvaluationItemInput(
|
||||
index=item.index,
|
||||
inputs=item.inputs,
|
||||
expected_output=item.expected_output,
|
||||
context=contexts,
|
||||
)
|
||||
)
|
||||
|
||||
evaluated = self.evaluation_instance.evaluate_retrieval(
|
||||
merged_items, metrics_config, model_provider, model_name, tenant_id
|
||||
)
|
||||
|
||||
# Merge metrics back into original results (preserve actual_output and metadata)
|
||||
eval_by_index = {r.index: r for r in evaluated}
|
||||
final_results = []
|
||||
for result in results:
|
||||
if result.index in eval_by_index:
|
||||
eval_result = eval_by_index[result.index]
|
||||
final_results.append(
|
||||
EvaluationItemResult(
|
||||
index=result.index,
|
||||
actual_output=result.actual_output,
|
||||
metrics=eval_result.metrics,
|
||||
metadata=result.metadata,
|
||||
error=result.error,
|
||||
)
|
||||
)
|
||||
else:
|
||||
final_results.append(result)
|
||||
return final_results
|
||||
|
||||
@staticmethod
|
||||
def _extract_query(inputs: dict[str, Any]) -> str:
|
||||
for key in ("query", "question", "input", "text"):
|
||||
if key in inputs:
|
||||
return str(inputs[key])
|
||||
values = list(inputs.values())
|
||||
return str(values[0]) if values else ""
|
||||
133 api/core/evaluation/runners/workflow_evaluation_runner.py Normal file
@ -0,0 +1,133 @@
import logging
from typing import Any, Mapping

from sqlalchemy.orm import Session

from core.evaluation.base_evaluation_instance import BaseEvaluationInstance
from core.evaluation.entities.evaluation_entity import (
    EvaluationItemInput,
    EvaluationItemResult,
)
from core.evaluation.runners.base_evaluation_runner import BaseEvaluationRunner
from models.model import App

logger = logging.getLogger(__name__)


class WorkflowEvaluationRunner(BaseEvaluationRunner):
    """Runner for workflow evaluation: executes workflow App in non-streaming mode."""

    def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session):
        super().__init__(evaluation_instance, session)

    def execute_target(
        self,
        tenant_id: str,
        target_id: str,
        target_type: str,
        item: EvaluationItemInput,
    ) -> EvaluationItemResult:
        """Execute workflow and collect outputs."""
        from core.app.apps.workflow.app_generator import WorkflowAppGenerator
        from core.evaluation.runners import get_service_account_for_app
        from core.app.entities.app_invoke_entities import InvokeFrom
        from services.workflow_service import WorkflowService

        app = self.session.query(App).filter_by(id=target_id).first()
        if not app:
            raise ValueError(f"App {target_id} not found")

        service_account = get_service_account_for_app(self.session, target_id)
        workflow_service = WorkflowService()
        workflow = workflow_service.get_published_workflow(app_model=app)
        if not workflow:
            raise ValueError(f"No published workflow found for app {target_id}")

        args: dict[str, Any] = {"inputs": item.inputs}

        generator = WorkflowAppGenerator()
        response: Mapping[str, Any] = generator.generate(
            app_model=app,
            workflow=workflow,
            user=service_account,
            args=args,
            invoke_from=InvokeFrom.SERVICE_API,
            streaming=False,
        )

        actual_output = self._extract_output(response)
        node_executions = self._extract_node_executions(response)

        return EvaluationItemResult(
            index=item.index,
            actual_output=actual_output,
            metadata={"node_executions": node_executions},
        )

    def evaluate_metrics(
        self,
        items: list[EvaluationItemInput],
        results: list[EvaluationItemResult],
        metrics_config: dict,
        model_provider: str,
        model_name: str,
        tenant_id: str,
    ) -> list[EvaluationItemResult]:
        """Compute workflow evaluation metrics (end-to-end)."""
        result_by_index = {r.index: r for r in results}
        merged_items = []
        for item in items:
            result = result_by_index.get(item.index)
            context = []
            if result and result.actual_output:
                context.append(result.actual_output)
            merged_items.append(
                EvaluationItemInput(
                    index=item.index,
                    inputs=item.inputs,
                    expected_output=item.expected_output,
                    context=context + (item.context or []),
                )
            )

        evaluated = self.evaluation_instance.evaluate_workflow(
            merged_items, metrics_config, model_provider, model_name, tenant_id
        )

        # Merge metrics back preserving metadata
        eval_by_index = {r.index: r for r in evaluated}
        final_results = []
        for result in results:
            if result.index in eval_by_index:
                eval_result = eval_by_index[result.index]
                final_results.append(
                    EvaluationItemResult(
                        index=result.index,
                        actual_output=result.actual_output,
                        metrics=eval_result.metrics,
                        metadata=result.metadata,
                        error=result.error,
                    )
                )
            else:
                final_results.append(result)
        return final_results

    @staticmethod
    def _extract_output(response: Mapping[str, Any]) -> str:
        """Extract text output from workflow response."""
        if "data" in response and isinstance(response["data"], Mapping):
            outputs = response["data"].get("outputs", {})
            if isinstance(outputs, Mapping):
                values = list(outputs.values())
                return str(values[0]) if values else ""
            return str(outputs)
        return str(response)

    @staticmethod
    def _extract_node_executions(response: Mapping[str, Any]) -> list[dict]:
        """Extract node execution trace from workflow response."""
        data = response.get("data", {})
        if isinstance(data, Mapping):
            return data.get("node_executions", [])
        return []
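A small sketch of what the two extraction helpers return for a non-streaming workflow result; the response shape below is an assumption for illustration, the real mapping comes from WorkflowAppGenerator.generate().

# Hypothetical response mapping, not taken from the commit.
response = {
    "data": {
        "outputs": {"answer": "42", "confidence": 0.9},
        "node_executions": [{"node_id": "llm-1", "status": "succeeded"}],
    }
}
assert WorkflowEvaluationRunner._extract_output(response) == "42"  # first output value, stringified
assert WorkflowEvaluationRunner._extract_node_executions(response) == [
    {"node_id": "llm-1", "status": "succeeded"}
]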
@ -0,0 +1,113 @@
"""add_evaluation_tables

Revision ID: a1b2c3d4e5f6
Revises: 1c05e80d2380
Create Date: 2026-03-03 00:01:00.000000

"""

import sqlalchemy as sa
from alembic import op

import models as models


# revision identifiers, used by Alembic.
revision = "a1b2c3d4e5f6"
down_revision = "1c05e80d2380"
branch_labels = None
depends_on = None


def upgrade():
    # evaluation_configurations
    op.create_table(
        "evaluation_configurations",
        sa.Column("id", models.types.StringUUID(), nullable=False),
        sa.Column("tenant_id", models.types.StringUUID(), nullable=False),
        sa.Column("target_type", sa.String(length=20), nullable=False),
        sa.Column("target_id", models.types.StringUUID(), nullable=False),
        sa.Column("evaluation_model_provider", sa.String(length=255), nullable=True),
        sa.Column("evaluation_model", sa.String(length=255), nullable=True),
        sa.Column("metrics_config", models.types.LongText(), nullable=True),
        sa.Column("judgement_conditions", models.types.LongText(), nullable=True),
        sa.Column("created_by", models.types.StringUUID(), nullable=False),
        sa.Column("updated_by", models.types.StringUUID(), nullable=False),
        sa.Column("created_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False),
        sa.Column("updated_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False),
        sa.PrimaryKeyConstraint("id", name="evaluation_configuration_pkey"),
        sa.UniqueConstraint("tenant_id", "target_type", "target_id", name="evaluation_configuration_unique"),
    )
    with op.batch_alter_table("evaluation_configurations", schema=None) as batch_op:
        batch_op.create_index(
            "evaluation_configuration_target_idx", ["tenant_id", "target_type", "target_id"], unique=False
        )

    # evaluation_runs
    op.create_table(
        "evaluation_runs",
        sa.Column("id", models.types.StringUUID(), nullable=False),
        sa.Column("tenant_id", models.types.StringUUID(), nullable=False),
        sa.Column("target_type", sa.String(length=20), nullable=False),
        sa.Column("target_id", models.types.StringUUID(), nullable=False),
        sa.Column("evaluation_config_id", models.types.StringUUID(), nullable=False),
        sa.Column("status", sa.String(length=20), nullable=False, server_default=sa.text("'pending'")),
        sa.Column("dataset_file_id", models.types.StringUUID(), nullable=True),
        sa.Column("result_file_id", models.types.StringUUID(), nullable=True),
        sa.Column("total_items", sa.Integer(), nullable=False, server_default=sa.text("0")),
        sa.Column("completed_items", sa.Integer(), nullable=False, server_default=sa.text("0")),
        sa.Column("failed_items", sa.Integer(), nullable=False, server_default=sa.text("0")),
        sa.Column("metrics_summary", models.types.LongText(), nullable=True),
        sa.Column("error", sa.Text(), nullable=True),
        sa.Column("celery_task_id", sa.String(length=255), nullable=True),
        sa.Column("created_by", models.types.StringUUID(), nullable=False),
        sa.Column("started_at", sa.DateTime(), nullable=True),
        sa.Column("completed_at", sa.DateTime(), nullable=True),
        sa.Column("created_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False),
        sa.Column("updated_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False),
        sa.PrimaryKeyConstraint("id", name="evaluation_run_pkey"),
    )
    with op.batch_alter_table("evaluation_runs", schema=None) as batch_op:
        batch_op.create_index(
            "evaluation_run_target_idx", ["tenant_id", "target_type", "target_id"], unique=False
        )
        batch_op.create_index("evaluation_run_status_idx", ["tenant_id", "status"], unique=False)

    # evaluation_run_items
    op.create_table(
        "evaluation_run_items",
        sa.Column("id", models.types.StringUUID(), nullable=False),
        sa.Column("evaluation_run_id", models.types.StringUUID(), nullable=False),
        sa.Column("item_index", sa.Integer(), nullable=False),
        sa.Column("inputs", models.types.LongText(), nullable=True),
        sa.Column("expected_output", models.types.LongText(), nullable=True),
        sa.Column("context", models.types.LongText(), nullable=True),
        sa.Column("actual_output", models.types.LongText(), nullable=True),
        sa.Column("metrics", models.types.LongText(), nullable=True),
        sa.Column("metadata_json", models.types.LongText(), nullable=True),
        sa.Column("error", sa.Text(), nullable=True),
        sa.Column("overall_score", sa.Float(), nullable=True),
        sa.Column("created_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False),
        sa.PrimaryKeyConstraint("id", name="evaluation_run_item_pkey"),
    )
    with op.batch_alter_table("evaluation_run_items", schema=None) as batch_op:
        batch_op.create_index("evaluation_run_item_run_idx", ["evaluation_run_id"], unique=False)
        batch_op.create_index(
            "evaluation_run_item_index_idx", ["evaluation_run_id", "item_index"], unique=False
        )


def downgrade():
    with op.batch_alter_table("evaluation_run_items", schema=None) as batch_op:
        batch_op.drop_index("evaluation_run_item_index_idx")
        batch_op.drop_index("evaluation_run_item_run_idx")
    op.drop_table("evaluation_run_items")

    with op.batch_alter_table("evaluation_runs", schema=None) as batch_op:
        batch_op.drop_index("evaluation_run_status_idx")
        batch_op.drop_index("evaluation_run_target_idx")
    op.drop_table("evaluation_runs")

    with op.batch_alter_table("evaluation_configurations", schema=None) as batch_op:
        batch_op.drop_index("evaluation_configuration_target_idx")
    op.drop_table("evaluation_configurations")
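The new revision chains off 1c05e80d2380. A minimal sketch of applying or rolling it back directly through Alembic's command API; the alembic.ini path is an assumption here, since Dify normally drives migrations through its own tooling.

from alembic import command
from alembic.config import Config

cfg = Config("alembic.ini")  # illustrative path, adjust to the project's migration config
command.upgrade(cfg, "a1b2c3d4e5f6")    # create the three evaluation tables
command.downgrade(cfg, "1c05e80d2380")  # drop them again, indexes first as in downgrade()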
@ -26,6 +26,13 @@ from .dataset import (
    TidbAuthBinding,
    Whitelist,
)
from .evaluation import (
    EvaluationConfiguration,
    EvaluationRun,
    EvaluationRunItem,
    EvaluationRunStatus,
    EvaluationTargetType,
)
from .enums import (
    AppTriggerStatus,
    AppTriggerType,
@ -158,6 +165,11 @@ __all__ = [
    "Document",
    "DocumentSegment",
    "Embedding",
    "EvaluationConfiguration",
    "EvaluationRun",
    "EvaluationRunItem",
    "EvaluationRunStatus",
    "EvaluationTargetType",
    "EndUser",
    "ExecutionExtraContent",
    "ExporleBanner",
193 api/models/evaluation.py Normal file
@ -0,0 +1,193 @@
from __future__ import annotations

import json
from datetime import datetime
from enum import StrEnum
from typing import Any

import sqlalchemy as sa
from sqlalchemy import DateTime, Float, Integer, String, Text, func
from sqlalchemy.orm import Mapped, mapped_column

from libs.uuid_utils import uuidv7

from .base import Base
from .types import LongText, StringUUID


class EvaluationRunStatus(StrEnum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


class EvaluationTargetType(StrEnum):
    APP = "app"
    SNIPPETS = "snippets"


class EvaluationConfiguration(Base):
    """Stores evaluation configuration for each target (App or Snippet)."""

    __tablename__ = "evaluation_configurations"
    __table_args__ = (
        sa.PrimaryKeyConstraint("id", name="evaluation_configuration_pkey"),
        sa.Index("evaluation_configuration_target_idx", "tenant_id", "target_type", "target_id"),
        sa.UniqueConstraint("tenant_id", "target_type", "target_id", name="evaluation_configuration_unique"),
    )

    id: Mapped[str] = mapped_column(StringUUID, default=lambda: str(uuidv7()))
    tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    target_type: Mapped[str] = mapped_column(String(20), nullable=False)
    target_id: Mapped[str] = mapped_column(StringUUID, nullable=False)

    evaluation_model_provider: Mapped[str | None] = mapped_column(String(255), nullable=True)
    evaluation_model: Mapped[str | None] = mapped_column(String(255), nullable=True)
    metrics_config: Mapped[str | None] = mapped_column(LongText, nullable=True)
    judgement_conditions: Mapped[str | None] = mapped_column(LongText, nullable=True)

    created_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
    updated_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
    created_at: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, server_default=func.current_timestamp()
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
    )

    @property
    def metrics_config_dict(self) -> dict[str, Any]:
        if self.metrics_config:
            return json.loads(self.metrics_config)
        return {}

    @metrics_config_dict.setter
    def metrics_config_dict(self, value: dict[str, Any]) -> None:
        self.metrics_config = json.dumps(value)

    @property
    def judgement_conditions_dict(self) -> dict[str, Any]:
        if self.judgement_conditions:
            return json.loads(self.judgement_conditions)
        return {}

    @judgement_conditions_dict.setter
    def judgement_conditions_dict(self, value: dict[str, Any]) -> None:
        self.judgement_conditions = json.dumps(value)

    def __repr__(self) -> str:
        return f"<EvaluationConfiguration(id={self.id}, target={self.target_type}:{self.target_id})>"


class EvaluationRun(Base):
    """Stores each evaluation run record."""

    __tablename__ = "evaluation_runs"
    __table_args__ = (
        sa.PrimaryKeyConstraint("id", name="evaluation_run_pkey"),
        sa.Index("evaluation_run_target_idx", "tenant_id", "target_type", "target_id"),
        sa.Index("evaluation_run_status_idx", "tenant_id", "status"),
    )

    id: Mapped[str] = mapped_column(StringUUID, default=lambda: str(uuidv7()))
    tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    target_type: Mapped[str] = mapped_column(String(20), nullable=False)
    target_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
    evaluation_config_id: Mapped[str] = mapped_column(StringUUID, nullable=False)

    status: Mapped[str] = mapped_column(
        String(20), nullable=False, default=EvaluationRunStatus.PENDING
    )
    dataset_file_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
    result_file_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)

    total_items: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    completed_items: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    failed_items: Mapped[int] = mapped_column(Integer, nullable=False, default=0)

    metrics_summary: Mapped[str | None] = mapped_column(LongText, nullable=True)
    error: Mapped[str | None] = mapped_column(Text, nullable=True)

    celery_task_id: Mapped[str | None] = mapped_column(String(255), nullable=True)

    created_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
    started_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
    completed_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
    created_at: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, server_default=func.current_timestamp()
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
    )

    @property
    def metrics_summary_dict(self) -> dict[str, Any]:
        if self.metrics_summary:
            return json.loads(self.metrics_summary)
        return {}

    @metrics_summary_dict.setter
    def metrics_summary_dict(self, value: dict[str, Any]) -> None:
        self.metrics_summary = json.dumps(value)

    @property
    def progress(self) -> float:
        if self.total_items == 0:
            return 0.0
        return (self.completed_items + self.failed_items) / self.total_items

    def __repr__(self) -> str:
        return f"<EvaluationRun(id={self.id}, status={self.status})>"


class EvaluationRunItem(Base):
    """Stores per-row evaluation results."""

    __tablename__ = "evaluation_run_items"
    __table_args__ = (
        sa.PrimaryKeyConstraint("id", name="evaluation_run_item_pkey"),
        sa.Index("evaluation_run_item_run_idx", "evaluation_run_id"),
        sa.Index("evaluation_run_item_index_idx", "evaluation_run_id", "item_index"),
    )

    id: Mapped[str] = mapped_column(StringUUID, default=lambda: str(uuidv7()))
    evaluation_run_id: Mapped[str] = mapped_column(StringUUID, nullable=False)

    item_index: Mapped[int] = mapped_column(Integer, nullable=False)
    inputs: Mapped[str | None] = mapped_column(LongText, nullable=True)
    expected_output: Mapped[str | None] = mapped_column(LongText, nullable=True)
    context: Mapped[str | None] = mapped_column(LongText, nullable=True)
    actual_output: Mapped[str | None] = mapped_column(LongText, nullable=True)

    metrics: Mapped[str | None] = mapped_column(LongText, nullable=True)
    metadata_json: Mapped[str | None] = mapped_column(LongText, nullable=True)
    error: Mapped[str | None] = mapped_column(Text, nullable=True)

    overall_score: Mapped[float | None] = mapped_column(Float, nullable=True)

    created_at: Mapped[datetime] = mapped_column(
        DateTime, nullable=False, server_default=func.current_timestamp()
    )

    @property
    def inputs_dict(self) -> dict[str, Any]:
        if self.inputs:
            return json.loads(self.inputs)
        return {}

    @property
    def metrics_list(self) -> list[dict[str, Any]]:
        if self.metrics:
            return json.loads(self.metrics)
        return []

    @property
    def metadata_dict(self) -> dict[str, Any]:
        if self.metadata_json:
            return json.loads(self.metadata_json)
        return {}

    def __repr__(self) -> str:
        return f"<EvaluationRunItem(id={self.id}, run={self.evaluation_run_id}, index={self.item_index})>"
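The JSON columns are exposed through dict/list helpers rather than typed columns; a short sketch of how a caller might round-trip them (all values below are illustrative only, not from the commit).

config = EvaluationConfiguration(
    tenant_id="tenant-uuid",
    target_type=EvaluationTargetType.APP,
    target_id="app-uuid",
    created_by="account-uuid",
    updated_by="account-uuid",
)
# The *_dict setters serialize into the underlying LongText columns.
config.metrics_config_dict = {"faithfulness": {"enabled": True, "threshold": 0.7}}
assert config.metrics_config == '{"faithfulness": {"enabled": true, "threshold": 0.7}}'

run = EvaluationRun(total_items=10, completed_items=6, failed_items=1)
assert run.progress == 0.7  # (completed + failed) / total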
@ -199,6 +199,12 @@ storage = [
############################################################
tools = ["cloudscraper~=1.2.71", "nltk~=3.9.1"]

############################################################
# [ Evaluation ] dependency group
# Required for evaluation frameworks
############################################################
evaluation = ["ragas>=0.2.0"]

############################################################
# [ VDB ] dependency group
# Required by vector store clients
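Because ragas lives in an optional dependency group, any import of it has to be guarded at runtime; a minimal sketch of that pattern (the helper name is illustrative, not part of this commit).

def load_ragas_or_none():
    """Return the ragas module if the optional 'evaluation' group is installed, else None."""
    try:
        import ragas  # provided by the optional dependency group above
    except ImportError:
        return None
    return ragas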
21 api/services/errors/evaluation.py Normal file
@ -0,0 +1,21 @@
from services.errors.base import BaseServiceError


class EvaluationFrameworkNotConfiguredError(BaseServiceError):
    def __init__(self, description: str | None = None):
        super().__init__(description or "Evaluation framework is not configured. Set EVALUATION_FRAMEWORK env var.")


class EvaluationNotFoundError(BaseServiceError):
    def __init__(self, description: str | None = None):
        super().__init__(description or "Evaluation not found.")


class EvaluationDatasetInvalidError(BaseServiceError):
    def __init__(self, description: str | None = None):
        super().__init__(description or "Evaluation dataset is invalid.")


class EvaluationMaxConcurrentRunsError(BaseServiceError):
    def __init__(self, description: str | None = None):
        super().__init__(description or "Maximum number of concurrent evaluation runs reached.")
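These service errors are what the console controllers translate into HTTP responses; a rough sketch of such a mapping (status-code choices and the helper are assumptions, not taken from the commit).

from werkzeug.exceptions import BadRequest, NotFound, TooManyRequests

def raise_http_error(error: BaseServiceError) -> None:
    """Map evaluation service errors to HTTP errors (illustrative mapping only)."""
    if isinstance(error, EvaluationNotFoundError):
        raise NotFound(error.description)
    if isinstance(error, EvaluationMaxConcurrentRunsError):
        raise TooManyRequests(error.description)
    if isinstance(error, (EvaluationDatasetInvalidError, EvaluationFrameworkNotConfiguredError)):
        raise BadRequest(error.description)
    raise error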
@ -1,13 +1,34 @@
import io
import json
import logging
from typing import Union
from typing import Any, Union

from openpyxl import Workbook
from openpyxl import Workbook, load_workbook
from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
from openpyxl.utils import get_column_letter
from sqlalchemy.orm import Session

from configs import dify_config
from core.evaluation.entities.evaluation_entity import (
    EvaluationCategory,
    EvaluationItemInput,
    EvaluationRunData,
)
from core.evaluation.evaluation_manager import EvaluationManager
from models.evaluation import (
    EvaluationConfiguration,
    EvaluationRun,
    EvaluationRunItem,
    EvaluationRunStatus,
)
from models.model import App, AppMode
from models.snippet import CustomizedSnippet
from services.errors.evaluation import (
    EvaluationDatasetInvalidError,
    EvaluationFrameworkNotConfiguredError,
    EvaluationMaxConcurrentRunsError,
    EvaluationNotFoundError,
)
from services.snippet_service import SnippetService
from services.workflow_service import WorkflowService
@ -176,3 +197,264 @@ class EvaluationService:
        output.seek(0)

        return output.getvalue()

    # ---- Evaluation Configuration CRUD ----

    @classmethod
    def get_evaluation_config(
        cls,
        session: Session,
        tenant_id: str,
        target_type: str,
        target_id: str,
    ) -> EvaluationConfiguration | None:
        return (
            session.query(EvaluationConfiguration)
            .filter_by(tenant_id=tenant_id, target_type=target_type, target_id=target_id)
            .first()
        )

    @classmethod
    def save_evaluation_config(
        cls,
        session: Session,
        tenant_id: str,
        target_type: str,
        target_id: str,
        account_id: str,
        data: dict[str, Any],
    ) -> EvaluationConfiguration:
        config = cls.get_evaluation_config(session, tenant_id, target_type, target_id)
        if config is None:
            config = EvaluationConfiguration(
                tenant_id=tenant_id,
                target_type=target_type,
                target_id=target_id,
                created_by=account_id,
                updated_by=account_id,
            )
            session.add(config)

        config.evaluation_model_provider = data.get("evaluation_model_provider")
        config.evaluation_model = data.get("evaluation_model")
        config.metrics_config = json.dumps(data.get("metrics_config", {}))
        config.judgement_conditions = json.dumps(data.get("judgement_conditions", {}))
        config.updated_by = account_id
        session.commit()
        session.refresh(config)
        return config

    # ---- Evaluation Run Management ----

    @classmethod
    def start_evaluation_run(
        cls,
        session: Session,
        tenant_id: str,
        target_type: str,
        target_id: str,
        account_id: str,
        dataset_file_content: bytes,
        evaluation_category: EvaluationCategory,
    ) -> EvaluationRun:
        """Validate dataset, create run record, dispatch Celery task."""
        # Check framework is configured
        evaluation_instance = EvaluationManager.get_evaluation_instance()
        if evaluation_instance is None:
            raise EvaluationFrameworkNotConfiguredError()

        # Check evaluation config exists
        config = cls.get_evaluation_config(session, tenant_id, target_type, target_id)
        if config is None:
            raise EvaluationNotFoundError("Evaluation configuration not found. Please configure evaluation first.")

        # Check concurrent run limit
        active_runs = (
            session.query(EvaluationRun)
            .filter_by(tenant_id=tenant_id)
            .filter(EvaluationRun.status.in_([EvaluationRunStatus.PENDING, EvaluationRunStatus.RUNNING]))
            .count()
        )
        max_concurrent = dify_config.EVALUATION_MAX_CONCURRENT_RUNS
        if active_runs >= max_concurrent:
            raise EvaluationMaxConcurrentRunsError(
                f"Maximum concurrent runs ({max_concurrent}) reached."
            )

        # Parse dataset
        items = cls._parse_dataset(dataset_file_content)
        max_rows = dify_config.EVALUATION_MAX_DATASET_ROWS
        if len(items) > max_rows:
            raise EvaluationDatasetInvalidError(f"Dataset has {len(items)} rows, max is {max_rows}.")

        # Create evaluation run
        evaluation_run = EvaluationRun(
            tenant_id=tenant_id,
            target_type=target_type,
            target_id=target_id,
            evaluation_config_id=config.id,
            status=EvaluationRunStatus.PENDING,
            total_items=len(items),
            created_by=account_id,
        )
        session.add(evaluation_run)
        session.commit()
        session.refresh(evaluation_run)

        # Build Celery task data
        run_data = EvaluationRunData(
            evaluation_run_id=evaluation_run.id,
            tenant_id=tenant_id,
            target_type=target_type,
            target_id=target_id,
            evaluation_category=evaluation_category,
            evaluation_model_provider=config.evaluation_model_provider or "",
            evaluation_model=config.evaluation_model or "",
            metrics_config=config.metrics_config_dict,
            items=items,
        )

        # Dispatch Celery task
        from tasks.evaluation_task import run_evaluation

        task = run_evaluation.delay(run_data.model_dump())
        evaluation_run.celery_task_id = task.id
        session.commit()

        return evaluation_run

    @classmethod
    def get_evaluation_runs(
        cls,
        session: Session,
        tenant_id: str,
        target_type: str,
        target_id: str,
        page: int = 1,
        page_size: int = 20,
    ) -> tuple[list[EvaluationRun], int]:
        """Query evaluation run history with pagination."""
        query = (
            session.query(EvaluationRun)
            .filter_by(tenant_id=tenant_id, target_type=target_type, target_id=target_id)
            .order_by(EvaluationRun.created_at.desc())
        )
        total = query.count()
        runs = query.offset((page - 1) * page_size).limit(page_size).all()
        return runs, total

    @classmethod
    def get_evaluation_run_detail(
        cls,
        session: Session,
        tenant_id: str,
        run_id: str,
    ) -> EvaluationRun:
        run = (
            session.query(EvaluationRun)
            .filter_by(id=run_id, tenant_id=tenant_id)
            .first()
        )
        if not run:
            raise EvaluationNotFoundError("Evaluation run not found.")
        return run

    @classmethod
    def get_evaluation_run_items(
        cls,
        session: Session,
        run_id: str,
        page: int = 1,
        page_size: int = 50,
    ) -> tuple[list[EvaluationRunItem], int]:
        """Query evaluation run items with pagination."""
        query = (
            session.query(EvaluationRunItem)
            .filter_by(evaluation_run_id=run_id)
            .order_by(EvaluationRunItem.item_index.asc())
        )
        total = query.count()
        items = query.offset((page - 1) * page_size).limit(page_size).all()
        return items, total

    @classmethod
    def cancel_evaluation_run(
        cls,
        session: Session,
        tenant_id: str,
        run_id: str,
    ) -> EvaluationRun:
        run = cls.get_evaluation_run_detail(session, tenant_id, run_id)
        if run.status not in (EvaluationRunStatus.PENDING, EvaluationRunStatus.RUNNING):
            raise ValueError(f"Cannot cancel evaluation run in status: {run.status}")

        run.status = EvaluationRunStatus.CANCELLED

        # Revoke Celery task if running
        if run.celery_task_id:
            try:
                from celery import current_app as celery_app

                celery_app.control.revoke(run.celery_task_id, terminate=True)
            except Exception:
                logger.exception("Failed to revoke Celery task %s", run.celery_task_id)

        session.commit()
        return run

    @classmethod
    def get_supported_metrics(cls, category: EvaluationCategory) -> list[str]:
        return EvaluationManager.get_supported_metrics(category)

    # ---- Dataset Parsing ----

    @classmethod
    def _parse_dataset(cls, xlsx_content: bytes) -> list[EvaluationItemInput]:
        """Parse evaluation dataset from XLSX bytes."""
        wb = load_workbook(io.BytesIO(xlsx_content), read_only=True)
        ws = wb.active
        if ws is None:
            raise EvaluationDatasetInvalidError("XLSX file has no active worksheet.")

        rows = list(ws.iter_rows(values_only=True))
        if len(rows) < 2:
            raise EvaluationDatasetInvalidError("Dataset must have at least a header row and one data row.")

        headers = [str(h).strip() if h is not None else "" for h in rows[0]]
        if not headers or headers[0].lower() != "index":
            raise EvaluationDatasetInvalidError("First column header must be 'index'.")

        input_headers = headers[1:]  # Skip 'index'
        items = []
        for row_idx, row in enumerate(rows[1:], start=1):
            values = list(row)
            if all(v is None or str(v).strip() == "" for v in values):
                continue  # Skip empty rows

            index_val = values[0] if values else row_idx
            try:
                index = int(index_val)
            except (TypeError, ValueError):
                index = row_idx

            inputs: dict[str, Any] = {}
            for col_idx, header in enumerate(input_headers):
                val = values[col_idx + 1] if col_idx + 1 < len(values) else None
                inputs[header] = str(val) if val is not None else ""

            # Check for expected_output column
            expected_output = inputs.pop("expected_output", None)
            context_str = inputs.pop("context", None)
            context = context_str.split(";") if context_str else None

            items.append(
                EvaluationItemInput(
                    index=index,
                    inputs=inputs,
                    expected_output=expected_output,
                    context=context,
                )
            )

        wb.close()
        return items
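For clarity, the XLSX layout _parse_dataset expects: an 'index' column first, then one column per target input, with optional 'expected_output' and 'context' columns (contexts separated by ';'). A small sketch building such a file with openpyxl; the column names are examples only.

from openpyxl import Workbook

wb = Workbook()
ws = wb.active
ws.append(["index", "query", "expected_output", "context"])            # header row
ws.append([1, "What is Dify?", "An LLM app platform", "doc-a;doc-b"])  # one data row
wb.save("evaluation-dataset.xlsx")
# _parse_dataset would yield one EvaluationItemInput with
# inputs={"query": "What is Dify?"}, expected_output="An LLM app platform",
# context=["doc-a", "doc-b"].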
309 api/tasks/evaluation_task.py Normal file
@ -0,0 +1,309 @@
import io
import json
import logging
from typing import Any

from celery import shared_task
from openpyxl import Workbook
from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
from openpyxl.utils import get_column_letter

from core.evaluation.entities.evaluation_entity import (
    EvaluationCategory,
    EvaluationItemResult,
    EvaluationRunData,
)
from core.evaluation.evaluation_manager import EvaluationManager
from core.evaluation.runners.agent_evaluation_runner import AgentEvaluationRunner
from core.evaluation.runners.llm_evaluation_runner import LLMEvaluationRunner
from core.evaluation.runners.retrieval_evaluation_runner import RetrievalEvaluationRunner
from core.evaluation.runners.workflow_evaluation_runner import WorkflowEvaluationRunner
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
from models.evaluation import EvaluationRun, EvaluationRunStatus

logger = logging.getLogger(__name__)


@shared_task(queue="evaluation")
def run_evaluation(run_data_dict: dict[str, Any]) -> None:
    """Celery task for running evaluations asynchronously.

    Workflow:
    1. Deserialize EvaluationRunData
    2. Update status to RUNNING
    3. Select appropriate Runner based on evaluation_category
    4. Execute runner.run() which handles target execution + metric computation
    5. Generate result XLSX
    6. Update EvaluationRun status to COMPLETED
    """
    run_data = EvaluationRunData.model_validate(run_data_dict)

    with db.engine.connect() as connection:
        from sqlalchemy.orm import Session

        session = Session(bind=connection)

        try:
            _execute_evaluation(session, run_data)
        except Exception as e:
            logger.exception("Evaluation run %s failed", run_data.evaluation_run_id)
            _mark_run_failed(session, run_data.evaluation_run_id, str(e))
        finally:
            session.close()


def _execute_evaluation(session: Any, run_data: EvaluationRunData) -> None:
    """Core evaluation execution logic."""
    evaluation_run = session.query(EvaluationRun).filter_by(id=run_data.evaluation_run_id).first()
    if not evaluation_run:
        logger.error("EvaluationRun %s not found", run_data.evaluation_run_id)
        return

    # Check if cancelled
    if evaluation_run.status == EvaluationRunStatus.CANCELLED:
        logger.info("EvaluationRun %s was cancelled", run_data.evaluation_run_id)
        return

    # Get evaluation instance
    evaluation_instance = EvaluationManager.get_evaluation_instance()
    if evaluation_instance is None:
        raise ValueError("Evaluation framework not configured")

    # Select runner based on category
    runner = _create_runner(run_data.evaluation_category, evaluation_instance, session)

    # Execute evaluation
    results = runner.run(
        evaluation_run_id=run_data.evaluation_run_id,
        tenant_id=run_data.tenant_id,
        target_id=run_data.target_id,
        target_type=run_data.target_type,
        items=run_data.items,
        metrics_config=run_data.metrics_config,
        model_provider=run_data.evaluation_model_provider,
        model_name=run_data.evaluation_model,
    )

    # Compute summary metrics
    metrics_summary = _compute_metrics_summary(results)

    # Generate result XLSX
    result_xlsx = _generate_result_xlsx(run_data.items, results)

    # Store result file
    result_file_id = _store_result_file(
        run_data.tenant_id, run_data.evaluation_run_id, result_xlsx, session
    )

    # Update run to completed
    evaluation_run = session.query(EvaluationRun).filter_by(id=run_data.evaluation_run_id).first()
    if evaluation_run:
        evaluation_run.status = EvaluationRunStatus.COMPLETED
        evaluation_run.completed_at = naive_utc_now()
        evaluation_run.metrics_summary = json.dumps(metrics_summary)
        if result_file_id:
            evaluation_run.result_file_id = result_file_id
        session.commit()

    logger.info("Evaluation run %s completed successfully", run_data.evaluation_run_id)


def _create_runner(
    category: EvaluationCategory,
    evaluation_instance: Any,
    session: Any,
) -> Any:
    """Create the appropriate runner for the evaluation category."""
    match category:
        case EvaluationCategory.LLM:
            return LLMEvaluationRunner(evaluation_instance, session)
        case EvaluationCategory.RETRIEVAL:
            return RetrievalEvaluationRunner(evaluation_instance, session)
        case EvaluationCategory.AGENT:
            return AgentEvaluationRunner(evaluation_instance, session)
        case EvaluationCategory.WORKFLOW:
            return WorkflowEvaluationRunner(evaluation_instance, session)
        case _:
            raise ValueError(f"Unknown evaluation category: {category}")


def _mark_run_failed(session: Any, run_id: str, error: str) -> None:
    """Mark an evaluation run as failed."""
    try:
        evaluation_run = session.query(EvaluationRun).filter_by(id=run_id).first()
        if evaluation_run:
            evaluation_run.status = EvaluationRunStatus.FAILED
            evaluation_run.error = error[:2000]  # Truncate error
            evaluation_run.completed_at = naive_utc_now()
            session.commit()
    except Exception:
        logger.exception("Failed to mark run %s as failed", run_id)


def _compute_metrics_summary(results: list[EvaluationItemResult]) -> dict[str, Any]:
    """Compute average scores per metric across all results."""
    metric_scores: dict[str, list[float]] = {}
    for result in results:
        if result.error:
            continue
        for metric in result.metrics:
            if metric.name not in metric_scores:
                metric_scores[metric.name] = []
            metric_scores[metric.name].append(metric.score)

    summary: dict[str, Any] = {}
    for name, scores in metric_scores.items():
        summary[name] = {
            "average": sum(scores) / len(scores) if scores else 0.0,
            "min": min(scores) if scores else 0.0,
            "max": max(scores) if scores else 0.0,
            "count": len(scores),
        }

    # Overall average
    all_scores = [s for scores in metric_scores.values() for s in scores]
    summary["_overall"] = {
        "average": sum(all_scores) / len(all_scores) if all_scores else 0.0,
        "total_items": len(results),
        "successful_items": sum(1 for r in results if r.error is None),
        "failed_items": sum(1 for r in results if r.error is not None),
    }

    return summary


def _generate_result_xlsx(
    items: list[Any],
    results: list[EvaluationItemResult],
) -> bytes:
    """Generate result XLSX with input data, actual output, and metric scores."""
    wb = Workbook()
    ws = wb.active
    ws.title = "Evaluation Results"

    header_font = Font(bold=True, color="FFFFFF")
    header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid")
    header_alignment = Alignment(horizontal="center", vertical="center")
    thin_border = Border(
        left=Side(style="thin"),
        right=Side(style="thin"),
        top=Side(style="thin"),
        bottom=Side(style="thin"),
    )

    # Collect all metric names
    all_metric_names: list[str] = []
    for result in results:
        for metric in result.metrics:
            if metric.name not in all_metric_names:
                all_metric_names.append(metric.name)

    # Collect all input keys
    input_keys: list[str] = []
    for item in items:
        for key in item.inputs:
            if key not in input_keys:
                input_keys.append(key)

    # Build headers
    headers = ["index"] + input_keys + ["expected_output", "actual_output"] + all_metric_names + ["overall_score", "error"]

    # Write header row
    for col_idx, header in enumerate(headers, start=1):
        cell = ws.cell(row=1, column=col_idx, value=header)
        cell.font = header_font
        cell.fill = header_fill
        cell.alignment = header_alignment
        cell.border = thin_border

    # Set column widths
    ws.column_dimensions["A"].width = 10
    for col_idx in range(2, len(headers) + 1):
        ws.column_dimensions[get_column_letter(col_idx)].width = 25

    # Build result lookup
    result_by_index = {r.index: r for r in results}

    # Write data rows
    for row_idx, item in enumerate(items, start=2):
        result = result_by_index.get(item.index)

        col = 1
        # Index
        ws.cell(row=row_idx, column=col, value=item.index).border = thin_border
        col += 1

        # Input values
        for key in input_keys:
            val = item.inputs.get(key, "")
            ws.cell(row=row_idx, column=col, value=str(val)).border = thin_border
            col += 1

        # Expected output
        ws.cell(row=row_idx, column=col, value=item.expected_output or "").border = thin_border
        col += 1

        # Actual output
        ws.cell(row=row_idx, column=col, value=result.actual_output if result else "").border = thin_border
        col += 1

        # Metric scores
        metric_scores = {m.name: m.score for m in result.metrics} if result else {}
        for metric_name in all_metric_names:
            score = metric_scores.get(metric_name)
            ws.cell(row=row_idx, column=col, value=score if score is not None else "").border = thin_border
            col += 1

        # Overall score
        ws.cell(
            row=row_idx, column=col, value=result.overall_score if result else ""
        ).border = thin_border
        col += 1

        # Error
        ws.cell(row=row_idx, column=col, value=result.error if result else "").border = thin_border

    output = io.BytesIO()
    wb.save(output)
    output.seek(0)
    return output.getvalue()


def _store_result_file(
    tenant_id: str,
    run_id: str,
    xlsx_content: bytes,
    session: Any,
) -> str | None:
    """Store result XLSX file and return the UploadFile ID."""
    try:
        from extensions.ext_storage import storage
        from models.model import UploadFile

        from libs.uuid_utils import uuidv7

        file_id = str(uuidv7())
        filename = f"evaluation-result-{run_id[:8]}.xlsx"
        storage_key = f"evaluation_results/{tenant_id}/{file_id}.xlsx"

        storage.save(storage_key, xlsx_content)

        upload_file = UploadFile(
            id=file_id,
            tenant_id=tenant_id,
            storage_type="evaluation_result",
            key=storage_key,
            name=filename,
            size=len(xlsx_content),
            extension="xlsx",
            mime_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
            created_by_role="account",
            created_by="system",
        )
        session.add(upload_file)
        session.commit()
        return file_id
    except Exception:
        logger.exception("Failed to store result file for run %s", run_id)
        return None
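To make the summary shape concrete, a tiny sketch of _compute_metrics_summary on hypothetical results, assuming metric entries expose .name and .score as the loop above relies on.

from types import SimpleNamespace

fake_results = [
    SimpleNamespace(error=None, metrics=[SimpleNamespace(name="faithfulness", score=0.8)]),
    SimpleNamespace(error=None, metrics=[SimpleNamespace(name="faithfulness", score=0.6)]),
    SimpleNamespace(error="timeout", metrics=[]),
]
summary = _compute_metrics_summary(fake_results)  # type: ignore[arg-type]
# summary["faithfulness"] -> average ~0.7, min 0.6, max 0.8, count 2
# summary["_overall"] -> total_items 3, successful_items 2, failed_items 1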