diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py index d37cff63e9..47f83d1ec0 100644 --- a/api/configs/feature/__init__.py +++ b/api/configs/feature/__init__.py @@ -1366,6 +1366,32 @@ class SandboxExpiredRecordsCleanConfig(BaseSettings): ) +class EvaluationConfig(BaseSettings): + """ + Configuration for evaluation runtime + """ + + EVALUATION_FRAMEWORK: str = Field( + description="Evaluation framework to use (ragas/deepeval/none)", + default="none", + ) + + EVALUATION_MAX_CONCURRENT_RUNS: PositiveInt = Field( + description="Maximum number of concurrent evaluation runs per tenant", + default=3, + ) + + EVALUATION_MAX_DATASET_ROWS: PositiveInt = Field( + description="Maximum number of rows allowed in an evaluation dataset", + default=1000, + ) + + EVALUATION_TASK_TIMEOUT: PositiveInt = Field( + description="Timeout in seconds for a single evaluation task", + default=3600, + ) + + class FeatureConfig( # place the configs in alphabet order AppExecutionConfig, @@ -1378,6 +1404,7 @@ class FeatureConfig( MarketplaceConfig, DataSetConfig, EndpointConfig, + EvaluationConfig, FileAccessConfig, FileUploadConfig, HttpConfig, diff --git a/api/controllers/console/evaluation/evaluation.py b/api/controllers/console/evaluation/evaluation.py index 37516106f9..d301f52b49 100644 --- a/api/controllers/console/evaluation/evaluation.py +++ b/api/controllers/console/evaluation/evaluation.py @@ -1,7 +1,9 @@ +from __future__ import annotations + import logging from collections.abc import Callable from functools import wraps -from typing import ParamSpec, TypeVar, Union +from typing import TYPE_CHECKING, ParamSpec, TypeVar, Union from urllib.parse import quote from flask import Response, request @@ -9,7 +11,7 @@ from flask_restx import Resource, fields from pydantic import BaseModel from sqlalchemy import select from sqlalchemy.orm import Session -from werkzeug.exceptions import NotFound +from werkzeug.exceptions import BadRequest, NotFound from controllers.common.schema import register_schema_models from controllers.console import console_ns @@ -18,6 +20,7 @@ from controllers.console.wraps import ( edit_permission_required, setup_required, ) +from core.evaluation.entities.evaluation_entity import EvaluationCategory from core.file import helpers as file_helpers from extensions.ext_database import db from libs.helper import TimestampField @@ -25,8 +28,17 @@ from libs.login import current_account_with_tenant, login_required from models import App from models.model import UploadFile from models.snippet import CustomizedSnippet +from services.errors.evaluation import ( + EvaluationDatasetInvalidError, + EvaluationFrameworkNotConfiguredError, + EvaluationMaxConcurrentRunsError, + EvaluationNotFoundError, +) from services.evaluation_service import EvaluationService +if TYPE_CHECKING: + from models.evaluation import EvaluationRun, EvaluationRunItem + logger = logging.getLogger(__name__) P = ParamSpec("P") @@ -208,25 +220,70 @@ class EvaluationDetailApi(Resource): @get_evaluation_target def get(self, target: Union[App, CustomizedSnippet], target_type: str): """ - Get evaluation details for the target. + Get evaluation configuration for the target. Returns evaluation configuration including model settings, - customized matrix, and judgement conditions. + metrics config, and judgement conditions. 
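+
+        A minimal illustrative response when a configuration exists (field values are
+        hypothetical; the keys mirror what is serialized below):
+
+            {
+                "evaluation_model": "gpt-4o",
+                "evaluation_model_provider": "openai",
+                "metrics_config": {"metrics": ["faithfulness"]},
+                "judgement_conditions": {}
+            }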
""" - # TODO: Implement actual evaluation detail retrieval - # This is a placeholder implementation + _, current_tenant_id = current_account_with_tenant() + + with Session(db.engine, expire_on_commit=False) as session: + config = EvaluationService.get_evaluation_config( + session, current_tenant_id, target_type, str(target.id) + ) + + if config is None: + return { + "evaluation_model": None, + "evaluation_model_provider": None, + "metrics_config": None, + "judgement_conditions": None, + } + return { - "evaluation_model": None, - "evaluation_model_provider": None, - "customized_matrix": None, - "judgement_conditions": None, + "evaluation_model": config.evaluation_model, + "evaluation_model_provider": config.evaluation_model_provider, + "metrics_config": config.metrics_config_dict, + "judgement_conditions": config.judgement_conditions_dict, + } + + @console_ns.doc("save_evaluation_detail") + @console_ns.response(200, "Evaluation configuration saved successfully") + @console_ns.response(404, "Target not found") + @setup_required + @login_required + @account_initialization_required + @get_evaluation_target + @edit_permission_required + def put(self, target: Union[App, CustomizedSnippet], target_type: str): + """ + Save evaluation configuration for the target. + """ + current_account, current_tenant_id = current_account_with_tenant() + data = request.get_json(force=True) + + with Session(db.engine, expire_on_commit=False) as session: + config = EvaluationService.save_evaluation_config( + session=session, + tenant_id=current_tenant_id, + target_type=target_type, + target_id=str(target.id), + account_id=str(current_account.id), + data=data, + ) + + return { + "evaluation_model": config.evaluation_model, + "evaluation_model_provider": config.evaluation_model_provider, + "metrics_config": config.metrics_config_dict, + "judgement_conditions": config.judgement_conditions_dict, } @console_ns.route("///evaluation/logs") class EvaluationLogsApi(Resource): @console_ns.doc("get_evaluation_logs") - @console_ns.response(200, "Evaluation logs retrieved successfully", evaluation_log_list_model) + @console_ns.response(200, "Evaluation logs retrieved successfully") @console_ns.response(404, "Target not found") @setup_required @login_required @@ -234,18 +291,192 @@ class EvaluationLogsApi(Resource): @get_evaluation_target def get(self, target: Union[App, CustomizedSnippet], target_type: str): """ - Get offline evaluation logs for the target. + Get evaluation run history for the target. - Returns a list of evaluation runs with test files, - result files, and version information. + Returns a paginated list of evaluation runs. 
""" - # TODO: Implement actual evaluation logs retrieval - # This is a placeholder implementation + _, current_tenant_id = current_account_with_tenant() + page = request.args.get("page", 1, type=int) + page_size = request.args.get("page_size", 20, type=int) + + with Session(db.engine, expire_on_commit=False) as session: + runs, total = EvaluationService.get_evaluation_runs( + session=session, + tenant_id=current_tenant_id, + target_type=target_type, + target_id=str(target.id), + page=page, + page_size=page_size, + ) + return { - "data": [], + "data": [_serialize_evaluation_run(run) for run in runs], + "total": total, + "page": page, + "page_size": page_size, } +@console_ns.route("///evaluation/run") +class EvaluationRunApi(Resource): + @console_ns.doc("start_evaluation_run") + @console_ns.response(200, "Evaluation run started") + @console_ns.response(400, "Invalid request") + @console_ns.response(404, "Target not found") + @setup_required + @login_required + @account_initialization_required + @get_evaluation_target + @edit_permission_required + def post(self, target: Union[App, CustomizedSnippet], target_type: str): + """ + Start an evaluation run. + + Expects multipart form data with: + - file: XLSX dataset file + - evaluation_category: one of llm, retrieval, agent, workflow + """ + current_account, current_tenant_id = current_account_with_tenant() + + # Validate file upload + if "file" not in request.files: + raise BadRequest("Dataset file is required.") + file = request.files["file"] + if not file.filename or not file.filename.endswith(".xlsx"): + raise BadRequest("Dataset file must be an XLSX file.") + + dataset_content = file.read() + if not dataset_content: + raise BadRequest("Dataset file is empty.") + + # Validate evaluation category + category_str = request.form.get("evaluation_category", "llm") + try: + evaluation_category = EvaluationCategory(category_str) + except ValueError: + raise BadRequest( + f"Invalid evaluation_category: {category_str}. " + f"Must be one of: {', '.join(e.value for e in EvaluationCategory)}" + ) + + try: + with Session(db.engine, expire_on_commit=False) as session: + evaluation_run = EvaluationService.start_evaluation_run( + session=session, + tenant_id=current_tenant_id, + target_type=target_type, + target_id=str(target.id), + account_id=str(current_account.id), + dataset_file_content=dataset_content, + evaluation_category=evaluation_category, + ) + return _serialize_evaluation_run(evaluation_run), 200 + except EvaluationFrameworkNotConfiguredError as e: + return {"message": str(e.description)}, 400 + except EvaluationNotFoundError as e: + return {"message": str(e.description)}, 404 + except EvaluationMaxConcurrentRunsError as e: + return {"message": str(e.description)}, 429 + except EvaluationDatasetInvalidError as e: + return {"message": str(e.description)}, 400 + + +@console_ns.route( + "///evaluation/runs/" +) +class EvaluationRunDetailApi(Resource): + @console_ns.doc("get_evaluation_run_detail") + @console_ns.response(200, "Evaluation run detail retrieved") + @console_ns.response(404, "Run not found") + @setup_required + @login_required + @account_initialization_required + @get_evaluation_target + def get(self, target: Union[App, CustomizedSnippet], target_type: str, run_id: str): + """ + Get evaluation run detail including items. 
+ """ + _, current_tenant_id = current_account_with_tenant() + run_id = str(run_id) + page = request.args.get("page", 1, type=int) + page_size = request.args.get("page_size", 50, type=int) + + try: + with Session(db.engine, expire_on_commit=False) as session: + run = EvaluationService.get_evaluation_run_detail( + session=session, + tenant_id=current_tenant_id, + run_id=run_id, + ) + items, total_items = EvaluationService.get_evaluation_run_items( + session=session, + run_id=run_id, + page=page, + page_size=page_size, + ) + + return { + "run": _serialize_evaluation_run(run), + "items": { + "data": [_serialize_evaluation_run_item(item) for item in items], + "total": total_items, + "page": page, + "page_size": page_size, + }, + } + except EvaluationNotFoundError as e: + return {"message": str(e.description)}, 404 + + +@console_ns.route( + "///evaluation/runs//cancel" +) +class EvaluationRunCancelApi(Resource): + @console_ns.doc("cancel_evaluation_run") + @console_ns.response(200, "Evaluation run cancelled") + @console_ns.response(404, "Run not found") + @setup_required + @login_required + @account_initialization_required + @get_evaluation_target + @edit_permission_required + def post(self, target: Union[App, CustomizedSnippet], target_type: str, run_id: str): + """Cancel a running evaluation.""" + _, current_tenant_id = current_account_with_tenant() + run_id = str(run_id) + + try: + with Session(db.engine, expire_on_commit=False) as session: + run = EvaluationService.cancel_evaluation_run( + session=session, + tenant_id=current_tenant_id, + run_id=run_id, + ) + return _serialize_evaluation_run(run) + except EvaluationNotFoundError as e: + return {"message": str(e.description)}, 404 + except ValueError as e: + return {"message": str(e)}, 400 + + +@console_ns.route("///evaluation/metrics") +class EvaluationMetricsApi(Resource): + @console_ns.doc("get_evaluation_metrics") + @console_ns.response(200, "Available metrics retrieved") + @setup_required + @login_required + @account_initialization_required + @get_evaluation_target + def get(self, target: Union[App, CustomizedSnippet], target_type: str): + """ + Get available evaluation metrics for the current framework. 
+ """ + result = {} + for category in EvaluationCategory: + result[category.value] = EvaluationService.get_supported_metrics(category) + return {"metrics": result} + + @console_ns.route("///evaluation/files/") class EvaluationFileDownloadApi(Resource): @console_ns.doc("download_evaluation_file") @@ -309,8 +540,6 @@ class EvaluationVersionApi(Resource): if not version: return {"message": "version parameter is required"}, 400 - # TODO: Implement actual version detail retrieval - # For now, return the current graph if available graph = {} if target_type == "snippets" and isinstance(target, CustomizedSnippet): graph = target.graph_dict @@ -318,3 +547,43 @@ class EvaluationVersionApi(Resource): return { "graph": graph, } + + +# ---- Serialization Helpers ---- + + +def _serialize_evaluation_run(run: EvaluationRun) -> dict[str, object]: + return { + "id": run.id, + "tenant_id": run.tenant_id, + "target_type": run.target_type, + "target_id": run.target_id, + "evaluation_config_id": run.evaluation_config_id, + "status": run.status, + "dataset_file_id": run.dataset_file_id, + "result_file_id": run.result_file_id, + "total_items": run.total_items, + "completed_items": run.completed_items, + "failed_items": run.failed_items, + "progress": run.progress, + "metrics_summary": run.metrics_summary_dict, + "error": run.error, + "created_by": run.created_by, + "started_at": int(run.started_at.timestamp()) if run.started_at else None, + "completed_at": int(run.completed_at.timestamp()) if run.completed_at else None, + "created_at": int(run.created_at.timestamp()) if run.created_at else None, + } + + +def _serialize_evaluation_run_item(item: EvaluationRunItem) -> dict[str, object]: + return { + "id": item.id, + "item_index": item.item_index, + "inputs": item.inputs_dict, + "expected_output": item.expected_output, + "actual_output": item.actual_output, + "metrics": item.metrics_list, + "metadata": item.metadata_dict, + "error": item.error, + "overall_score": item.overall_score, + } diff --git a/api/core/evaluation/__init__.py b/api/core/evaluation/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/core/evaluation/base_evaluation_instance.py b/api/core/evaluation/base_evaluation_instance.py new file mode 100644 index 0000000000..c5fc3f946e --- /dev/null +++ b/api/core/evaluation/base_evaluation_instance.py @@ -0,0 +1,64 @@ +from abc import ABC, abstractmethod + +from core.evaluation.entities.evaluation_entity import ( + EvaluationCategory, + EvaluationItemInput, + EvaluationItemResult, +) + + +class BaseEvaluationInstance(ABC): + """Abstract base class for evaluation framework adapters.""" + + @abstractmethod + def evaluate_llm( + self, + items: list[EvaluationItemInput], + metrics_config: dict, + model_provider: str, + model_name: str, + tenant_id: str, + ) -> list[EvaluationItemResult]: + """Evaluate LLM outputs using the configured framework.""" + ... + + @abstractmethod + def evaluate_retrieval( + self, + items: list[EvaluationItemInput], + metrics_config: dict, + model_provider: str, + model_name: str, + tenant_id: str, + ) -> list[EvaluationItemResult]: + """Evaluate retrieval quality using the configured framework.""" + ... + + @abstractmethod + def evaluate_agent( + self, + items: list[EvaluationItemInput], + metrics_config: dict, + model_provider: str, + model_name: str, + tenant_id: str, + ) -> list[EvaluationItemResult]: + """Evaluate agent outputs using the configured framework.""" + ... 
+ + @abstractmethod + def evaluate_workflow( + self, + items: list[EvaluationItemInput], + metrics_config: dict, + model_provider: str, + model_name: str, + tenant_id: str, + ) -> list[EvaluationItemResult]: + """Evaluate workflow outputs using the configured framework.""" + ... + + @abstractmethod + def get_supported_metrics(self, category: EvaluationCategory) -> list[str]: + """Return the list of supported metric names for a given evaluation category.""" + ... diff --git a/api/core/evaluation/entities/__init__.py b/api/core/evaluation/entities/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/core/evaluation/entities/config_entity.py b/api/core/evaluation/entities/config_entity.py new file mode 100644 index 0000000000..b1a48b894d --- /dev/null +++ b/api/core/evaluation/entities/config_entity.py @@ -0,0 +1,19 @@ +from enum import StrEnum + +from pydantic import BaseModel + + +class EvaluationFrameworkEnum(StrEnum): + RAGAS = "ragas" + DEEPEVAL = "deepeval" + NONE = "none" + + +class BaseEvaluationConfig(BaseModel): + """Base configuration for evaluation frameworks.""" + pass + + +class RagasConfig(BaseEvaluationConfig): + """RAGAS-specific configuration.""" + pass diff --git a/api/core/evaluation/entities/evaluation_entity.py b/api/core/evaluation/entities/evaluation_entity.py new file mode 100644 index 0000000000..61a067a3a3 --- /dev/null +++ b/api/core/evaluation/entities/evaluation_entity.py @@ -0,0 +1,52 @@ +from enum import StrEnum +from typing import Any + +from pydantic import BaseModel, Field + + +class EvaluationCategory(StrEnum): + LLM = "llm" + RETRIEVAL = "retrieval" + AGENT = "agent" + WORKFLOW = "workflow" + + +class EvaluationMetric(BaseModel): + name: str + score: float + details: dict[str, Any] = Field(default_factory=dict) + + +class EvaluationItemInput(BaseModel): + index: int + inputs: dict[str, Any] + expected_output: str | None = None + context: list[str] | None = None + + +class EvaluationItemResult(BaseModel): + index: int + actual_output: str | None = None + metrics: list[EvaluationMetric] = Field(default_factory=list) + metadata: dict[str, Any] = Field(default_factory=dict) + error: str | None = None + + @property + def overall_score(self) -> float | None: + if not self.metrics: + return None + scores = [m.score for m in self.metrics] + return sum(scores) / len(scores) + + +class EvaluationRunData(BaseModel): + """Serializable data for Celery task.""" + evaluation_run_id: str + tenant_id: str + target_type: str + target_id: str + evaluation_category: EvaluationCategory + evaluation_model_provider: str + evaluation_model: str + metrics_config: dict[str, Any] = Field(default_factory=dict) + items: list[EvaluationItemInput] diff --git a/api/core/evaluation/evaluation_manager.py b/api/core/evaluation/evaluation_manager.py new file mode 100644 index 0000000000..5499b96cad --- /dev/null +++ b/api/core/evaluation/evaluation_manager.py @@ -0,0 +1,61 @@ +import collections +import logging +from typing import Any + +from configs import dify_config +from core.evaluation.base_evaluation_instance import BaseEvaluationInstance +from core.evaluation.entities.config_entity import EvaluationFrameworkEnum +from core.evaluation.entities.evaluation_entity import EvaluationCategory + +logger = logging.getLogger(__name__) + + +class EvaluationFrameworkConfigMap(collections.UserDict[str, dict[str, Any]]): + """Registry mapping framework enum -> {config_class, evaluator_class}.""" + + def __getitem__(self, framework: str) -> dict[str, Any]: + match 
framework: + case EvaluationFrameworkEnum.RAGAS: + from core.evaluation.entities.config_entity import RagasConfig + from core.evaluation.frameworks.ragas.ragas_evaluator import RagasEvaluator + + return { + "config_class": RagasConfig, + "evaluator_class": RagasEvaluator, + } + case EvaluationFrameworkEnum.DEEPEVAL: + raise NotImplementedError("DeepEval adapter is not yet implemented.") + case _: + raise ValueError(f"Unknown evaluation framework: {framework}") + + +evaluation_framework_config_map = EvaluationFrameworkConfigMap() + + +class EvaluationManager: + """Factory for evaluation instances based on global configuration.""" + + @staticmethod + def get_evaluation_instance() -> BaseEvaluationInstance | None: + """Create and return an evaluation instance based on EVALUATION_FRAMEWORK env var.""" + framework = dify_config.EVALUATION_FRAMEWORK + if not framework or framework == EvaluationFrameworkEnum.NONE: + return None + + try: + config_map = evaluation_framework_config_map[framework] + evaluator_class = config_map["evaluator_class"] + config_class = config_map["config_class"] + config = config_class() + return evaluator_class(config) + except Exception: + logger.exception("Failed to create evaluation instance for framework: %s", framework) + return None + + @staticmethod + def get_supported_metrics(category: EvaluationCategory) -> list[str]: + """Return supported metrics for the current framework and given category.""" + instance = EvaluationManager.get_evaluation_instance() + if instance is None: + return [] + return instance.get_supported_metrics(category) diff --git a/api/core/evaluation/frameworks/__init__.py b/api/core/evaluation/frameworks/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/core/evaluation/frameworks/ragas/__init__.py b/api/core/evaluation/frameworks/ragas/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/core/evaluation/frameworks/ragas/ragas_evaluator.py b/api/core/evaluation/frameworks/ragas/ragas_evaluator.py new file mode 100644 index 0000000000..ceab472cfc --- /dev/null +++ b/api/core/evaluation/frameworks/ragas/ragas_evaluator.py @@ -0,0 +1,279 @@ +import logging +from typing import Any + +from core.evaluation.base_evaluation_instance import BaseEvaluationInstance +from core.evaluation.entities.config_entity import RagasConfig +from core.evaluation.entities.evaluation_entity import ( + EvaluationCategory, + EvaluationItemInput, + EvaluationItemResult, + EvaluationMetric, +) +from core.evaluation.frameworks.ragas.ragas_model_wrapper import DifyModelWrapper + +logger = logging.getLogger(__name__) + +# Metric name mappings per category +LLM_METRICS = ["faithfulness", "answer_relevancy", "answer_correctness", "answer_similarity"] +RETRIEVAL_METRICS = ["context_precision", "context_recall", "context_relevancy"] +AGENT_METRICS = ["tool_call_accuracy", "answer_correctness"] +WORKFLOW_METRICS = ["faithfulness", "answer_correctness"] + + +class RagasEvaluator(BaseEvaluationInstance): + """RAGAS framework adapter for evaluation.""" + + def __init__(self, config: RagasConfig): + self.config = config + + def get_supported_metrics(self, category: EvaluationCategory) -> list[str]: + match category: + case EvaluationCategory.LLM: + return LLM_METRICS + case EvaluationCategory.RETRIEVAL: + return RETRIEVAL_METRICS + case EvaluationCategory.AGENT: + return AGENT_METRICS + case EvaluationCategory.WORKFLOW: + return WORKFLOW_METRICS + case _: + return [] + + def evaluate_llm( + self, + items: list[EvaluationItemInput], + 
metrics_config: dict, + model_provider: str, + model_name: str, + tenant_id: str, + ) -> list[EvaluationItemResult]: + return self._evaluate(items, metrics_config, model_provider, model_name, tenant_id, EvaluationCategory.LLM) + + def evaluate_retrieval( + self, + items: list[EvaluationItemInput], + metrics_config: dict, + model_provider: str, + model_name: str, + tenant_id: str, + ) -> list[EvaluationItemResult]: + return self._evaluate( + items, metrics_config, model_provider, model_name, tenant_id, EvaluationCategory.RETRIEVAL + ) + + def evaluate_agent( + self, + items: list[EvaluationItemInput], + metrics_config: dict, + model_provider: str, + model_name: str, + tenant_id: str, + ) -> list[EvaluationItemResult]: + return self._evaluate(items, metrics_config, model_provider, model_name, tenant_id, EvaluationCategory.AGENT) + + def evaluate_workflow( + self, + items: list[EvaluationItemInput], + metrics_config: dict, + model_provider: str, + model_name: str, + tenant_id: str, + ) -> list[EvaluationItemResult]: + return self._evaluate( + items, metrics_config, model_provider, model_name, tenant_id, EvaluationCategory.WORKFLOW + ) + + def _evaluate( + self, + items: list[EvaluationItemInput], + metrics_config: dict, + model_provider: str, + model_name: str, + tenant_id: str, + category: EvaluationCategory, + ) -> list[EvaluationItemResult]: + """Core evaluation logic using RAGAS. + + Uses the Dify model wrapper as judge LLM. Falls back to simple + string similarity if RAGAS import fails. + """ + model_wrapper = DifyModelWrapper(model_provider, model_name, tenant_id) + requested_metrics = metrics_config.get("metrics", self.get_supported_metrics(category)) + + try: + return self._evaluate_with_ragas(items, requested_metrics, model_wrapper, category) + except ImportError: + logger.warning("RAGAS not installed, falling back to simple evaluation") + return self._evaluate_simple(items, requested_metrics, model_wrapper) + + def _evaluate_with_ragas( + self, + items: list[EvaluationItemInput], + requested_metrics: list[str], + model_wrapper: DifyModelWrapper, + category: EvaluationCategory, + ) -> list[EvaluationItemResult]: + """Evaluate using RAGAS library.""" + from ragas import evaluate as ragas_evaluate + from ragas.dataset_schema import EvaluationDataset, SingleTurnSample + from ragas.llms import LangchainLLMWrapper + from ragas.metrics import ( + Faithfulness, + ResponseRelevancy, + ) + + # Build RAGAS dataset + samples = [] + for item in items: + sample = SingleTurnSample( + user_input=self._inputs_to_query(item.inputs), + response=item.expected_output or "", + retrieved_contexts=item.context or [], + ) + if item.expected_output: + sample.reference = item.expected_output + samples.append(sample) + + dataset = EvaluationDataset(samples=samples) + + # Build metric instances + ragas_metrics = self._build_ragas_metrics(requested_metrics) + + if not ragas_metrics: + logger.warning("No valid RAGAS metrics found for: %s", requested_metrics) + return [EvaluationItemResult(index=item.index) for item in items] + + # Run RAGAS evaluation + try: + result = ragas_evaluate( + dataset=dataset, + metrics=ragas_metrics, + ) + + # Convert RAGAS results to our format + results = [] + result_df = result.to_pandas() + for i, item in enumerate(items): + metrics = [] + for metric_name in requested_metrics: + if metric_name in result_df.columns: + score = result_df.iloc[i][metric_name] + if score is not None and not (isinstance(score, float) and score != score): # NaN check + 
metrics.append(EvaluationMetric(name=metric_name, score=float(score))) + results.append(EvaluationItemResult(index=item.index, metrics=metrics)) + return results + except Exception: + logger.exception("RAGAS evaluation failed, falling back to simple evaluation") + return self._evaluate_simple(items, requested_metrics, model_wrapper) + + def _evaluate_simple( + self, + items: list[EvaluationItemInput], + requested_metrics: list[str], + model_wrapper: DifyModelWrapper, + ) -> list[EvaluationItemResult]: + """Simple LLM-as-judge fallback when RAGAS is not available.""" + results = [] + for item in items: + metrics = [] + query = self._inputs_to_query(item.inputs) + + for metric_name in requested_metrics: + try: + score = self._judge_with_llm(model_wrapper, metric_name, query, item) + metrics.append(EvaluationMetric(name=metric_name, score=score)) + except Exception: + logger.exception("Failed to compute metric %s for item %d", metric_name, item.index) + + results.append(EvaluationItemResult(index=item.index, metrics=metrics)) + return results + + def _judge_with_llm( + self, + model_wrapper: DifyModelWrapper, + metric_name: str, + query: str, + item: EvaluationItemInput, + ) -> float: + """Use the LLM to judge a single metric for a single item.""" + prompt = self._build_judge_prompt(metric_name, query, item) + response = model_wrapper.invoke(prompt) + return self._parse_score(response) + + def _build_judge_prompt(self, metric_name: str, query: str, item: EvaluationItemInput) -> str: + """Build a scoring prompt for the LLM judge.""" + parts = [ + f"Evaluate the following on the metric '{metric_name}' using a scale of 0.0 to 1.0.", + f"\nQuery: {query}", + ] + if item.expected_output: + parts.append(f"\nExpected Output: {item.expected_output}") + if item.context: + parts.append(f"\nContext: {'; '.join(item.context)}") + parts.append( + "\nRespond with ONLY a single floating point number between 0.0 and 1.0, nothing else." 
+ ) + return "\n".join(parts) + + @staticmethod + def _parse_score(response: str) -> float: + """Parse a float score from LLM response.""" + cleaned = response.strip() + try: + score = float(cleaned) + return max(0.0, min(1.0, score)) + except ValueError: + # Try to extract first number from response + import re + + match = re.search(r"(\d+\.?\d*)", cleaned) + if match: + score = float(match.group(1)) + return max(0.0, min(1.0, score)) + return 0.0 + + @staticmethod + def _inputs_to_query(inputs: dict[str, Any]) -> str: + """Convert input dict to a query string.""" + if "query" in inputs: + return str(inputs["query"]) + if "question" in inputs: + return str(inputs["question"]) + # Fallback: concatenate all input values + return " ".join(str(v) for v in inputs.values()) + + @staticmethod + def _build_ragas_metrics(requested_metrics: list[str]) -> list[Any]: + """Build RAGAS metric instances from metric names.""" + try: + from ragas.metrics import ( + AnswerCorrectness, + AnswerRelevancy, + AnswerSimilarity, + ContextPrecision, + ContextRecall, + ContextRelevancy, + Faithfulness, + ) + + metric_map: dict[str, Any] = { + "faithfulness": Faithfulness, + "answer_relevancy": AnswerRelevancy, + "answer_correctness": AnswerCorrectness, + "answer_similarity": AnswerSimilarity, + "context_precision": ContextPrecision, + "context_recall": ContextRecall, + "context_relevancy": ContextRelevancy, + } + + metrics = [] + for name in requested_metrics: + metric_class = metric_map.get(name) + if metric_class: + metrics.append(metric_class()) + else: + logger.warning("Unknown RAGAS metric: %s", name) + return metrics + except ImportError: + logger.warning("RAGAS metrics not available") + return [] diff --git a/api/core/evaluation/frameworks/ragas/ragas_model_wrapper.py b/api/core/evaluation/frameworks/ragas/ragas_model_wrapper.py new file mode 100644 index 0000000000..e0a5e14914 --- /dev/null +++ b/api/core/evaluation/frameworks/ragas/ragas_model_wrapper.py @@ -0,0 +1,48 @@ +import logging +from typing import Any + +logger = logging.getLogger(__name__) + + +class DifyModelWrapper: + """Wraps Dify's model invocation interface for use by RAGAS as an LLM judge. + + RAGAS requires an LLM to compute certain metrics (faithfulness, answer_relevancy, etc.). + This wrapper bridges Dify's ModelInstance to a callable that RAGAS can use. + """ + + def __init__(self, model_provider: str, model_name: str, tenant_id: str): + self.model_provider = model_provider + self.model_name = model_name + self.tenant_id = tenant_id + + def _get_model_instance(self) -> Any: + from core.model_manager import ModelManager + from core.model_runtime.entities.model_entities import ModelType + + model_manager = ModelManager() + model_instance = model_manager.get_model_instance( + tenant_id=self.tenant_id, + provider=self.model_provider, + model_type=ModelType.LLM, + model=self.model_name, + ) + return model_instance + + def invoke(self, prompt: str) -> str: + """Invoke the model with a text prompt and return the text response.""" + from core.model_runtime.entities.message_entities import ( + SystemPromptMessage, + UserPromptMessage, + ) + + model_instance = self._get_model_instance() + result = model_instance.invoke_llm( + prompt_messages=[ + SystemPromptMessage(content="You are an evaluation judge. 
Answer precisely and concisely."), + UserPromptMessage(content=prompt), + ], + model_parameters={"temperature": 0.0, "max_tokens": 2048}, + stream=False, + ) + return result.message.content diff --git a/api/core/evaluation/runners/__init__.py b/api/core/evaluation/runners/__init__.py new file mode 100644 index 0000000000..62184ad3f1 --- /dev/null +++ b/api/core/evaluation/runners/__init__.py @@ -0,0 +1,32 @@ +from sqlalchemy import select +from sqlalchemy.orm import Session + +from models import Account, App, TenantAccountJoin + + +def get_service_account_for_app(session: Session, app_id: str) -> Account: + """Get the creator account for an app with tenant context set up. + + This follows the same pattern as BaseTraceInstance.get_service_account_with_tenant(). + """ + app = session.scalar(select(App).where(App.id == app_id)) + if not app: + raise ValueError(f"App with id {app_id} not found") + + if not app.created_by: + raise ValueError(f"App with id {app_id} has no creator") + + account = session.scalar(select(Account).where(Account.id == app.created_by)) + if not account: + raise ValueError(f"Creator account not found for app {app_id}") + + current_tenant = ( + session.query(TenantAccountJoin) + .filter_by(account_id=account.id, current=True) + .first() + ) + if not current_tenant: + raise ValueError(f"Current tenant not found for account {account.id}") + + account.set_tenant_id(current_tenant.tenant_id) + return account diff --git a/api/core/evaluation/runners/agent_evaluation_runner.py b/api/core/evaluation/runners/agent_evaluation_runner.py new file mode 100644 index 0000000000..2f0bc210a5 --- /dev/null +++ b/api/core/evaluation/runners/agent_evaluation_runner.py @@ -0,0 +1,152 @@ +import logging +from typing import Any, Mapping, Union + +from sqlalchemy.orm import Session + +from core.evaluation.base_evaluation_instance import BaseEvaluationInstance +from core.evaluation.entities.evaluation_entity import ( + EvaluationItemInput, + EvaluationItemResult, +) +from core.evaluation.runners.base_evaluation_runner import BaseEvaluationRunner +from models.model import App, AppMode + +logger = logging.getLogger(__name__) + + +class AgentEvaluationRunner(BaseEvaluationRunner): + """Runner for agent evaluation: executes agent-type App, collects tool calls and final output.""" + + def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session): + super().__init__(evaluation_instance, session) + + def execute_target( + self, + tenant_id: str, + target_id: str, + target_type: str, + item: EvaluationItemInput, + ) -> EvaluationItemResult: + """Execute agent app and collect response with tool call information.""" + from core.app.apps.agent_chat.app_generator import AgentChatAppGenerator + from core.evaluation.runners import get_service_account_for_app + from core.app.entities.app_invoke_entities import InvokeFrom + + app = self.session.query(App).filter_by(id=target_id).first() + if not app: + raise ValueError(f"App {target_id} not found") + + service_account = get_service_account_for_app(self.session, target_id) + + query = self._extract_query(item.inputs) + args: dict[str, Any] = { + "inputs": item.inputs, + "query": query, + } + + generator = AgentChatAppGenerator() + # Agent chat requires streaming - collect full response + response_generator = generator.generate( + app_model=app, + user=service_account, + args=args, + invoke_from=InvokeFrom.SERVICE_API, + streaming=True, + ) + + # Consume the stream to get the full response + actual_output, tool_calls = 
self._consume_agent_stream(response_generator) + + return EvaluationItemResult( + index=item.index, + actual_output=actual_output, + metadata={"tool_calls": tool_calls}, + ) + + def evaluate_metrics( + self, + items: list[EvaluationItemInput], + results: list[EvaluationItemResult], + metrics_config: dict, + model_provider: str, + model_name: str, + tenant_id: str, + ) -> list[EvaluationItemResult]: + """Compute agent evaluation metrics.""" + result_by_index = {r.index: r for r in results} + merged_items = [] + for item in items: + result = result_by_index.get(item.index) + context = [] + if result and result.actual_output: + context.append(result.actual_output) + merged_items.append( + EvaluationItemInput( + index=item.index, + inputs=item.inputs, + expected_output=item.expected_output, + context=context + (item.context or []), + ) + ) + + evaluated = self.evaluation_instance.evaluate_agent( + merged_items, metrics_config, model_provider, model_name, tenant_id + ) + + # Merge metrics back preserving metadata + eval_by_index = {r.index: r for r in evaluated} + final_results = [] + for result in results: + if result.index in eval_by_index: + eval_result = eval_by_index[result.index] + final_results.append( + EvaluationItemResult( + index=result.index, + actual_output=result.actual_output, + metrics=eval_result.metrics, + metadata=result.metadata, + error=result.error, + ) + ) + else: + final_results.append(result) + return final_results + + @staticmethod + def _extract_query(inputs: dict[str, Any]) -> str: + for key in ("query", "question", "input", "text"): + if key in inputs: + return str(inputs[key]) + values = list(inputs.values()) + return str(values[0]) if values else "" + + @staticmethod + def _consume_agent_stream(response_generator: Any) -> tuple[str, list[dict]]: + """Consume agent streaming response and extract final answer + tool calls.""" + answer_parts: list[str] = [] + tool_calls: list[dict] = [] + + try: + for chunk in response_generator: + if isinstance(chunk, Mapping): + event = chunk.get("event") + if event == "agent_thought": + thought = chunk.get("thought", "") + if thought: + answer_parts.append(thought) + tool = chunk.get("tool") + if tool: + tool_calls.append({ + "tool": tool, + "tool_input": chunk.get("tool_input", ""), + }) + elif event == "message": + answer = chunk.get("answer", "") + if answer: + answer_parts.append(answer) + elif isinstance(chunk, str): + answer_parts.append(chunk) + except Exception: + logger.exception("Error consuming agent stream") + + return "".join(answer_parts), tool_calls diff --git a/api/core/evaluation/runners/base_evaluation_runner.py b/api/core/evaluation/runners/base_evaluation_runner.py new file mode 100644 index 0000000000..c82935a280 --- /dev/null +++ b/api/core/evaluation/runners/base_evaluation_runner.py @@ -0,0 +1,130 @@ +import json +import logging +from abc import ABC, abstractmethod + +from sqlalchemy.orm import Session + +from core.evaluation.base_evaluation_instance import BaseEvaluationInstance +from core.evaluation.entities.evaluation_entity import ( + EvaluationItemInput, + EvaluationItemResult, +) +from libs.datetime_utils import naive_utc_now +from models.evaluation import EvaluationRun, EvaluationRunItem, EvaluationRunStatus + +logger = logging.getLogger(__name__) + + +class BaseEvaluationRunner(ABC): + """Abstract base class for evaluation runners. 
+ + Runners are responsible for executing the target (App/Snippet/Retrieval) + to collect actual outputs, then delegating to the evaluation instance + for metric computation. + """ + + def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session): + self.evaluation_instance = evaluation_instance + self.session = session + + @abstractmethod + def execute_target( + self, + tenant_id: str, + target_id: str, + target_type: str, + item: EvaluationItemInput, + ) -> EvaluationItemResult: + """Execute the evaluation target for a single item and return the result with actual_output populated.""" + ... + + @abstractmethod + def evaluate_metrics( + self, + items: list[EvaluationItemInput], + results: list[EvaluationItemResult], + metrics_config: dict, + model_provider: str, + model_name: str, + tenant_id: str, + ) -> list[EvaluationItemResult]: + """Compute evaluation metrics on the collected results.""" + ... + + def run( + self, + evaluation_run_id: str, + tenant_id: str, + target_id: str, + target_type: str, + items: list[EvaluationItemInput], + metrics_config: dict, + model_provider: str, + model_name: str, + ) -> list[EvaluationItemResult]: + """Orchestrate target execution + metric evaluation for all items.""" + evaluation_run = self.session.query(EvaluationRun).filter_by(id=evaluation_run_id).first() + if not evaluation_run: + raise ValueError(f"EvaluationRun {evaluation_run_id} not found") + + # Update status to running + evaluation_run.status = EvaluationRunStatus.RUNNING + evaluation_run.started_at = naive_utc_now() + self.session.commit() + + results: list[EvaluationItemResult] = [] + + # Phase 1: Execute target for each item + for item in items: + try: + result = self.execute_target(tenant_id, target_id, target_type, item) + results.append(result) + evaluation_run.completed_items += 1 + except Exception as e: + logger.exception("Failed to execute target for item %d", item.index) + results.append( + EvaluationItemResult( + index=item.index, + error=str(e), + ) + ) + evaluation_run.failed_items += 1 + self.session.commit() + + # Phase 2: Compute metrics on successful results + successful_items = [item for item, result in zip(items, results) if result.error is None] + successful_results = [r for r in results if r.error is None] + + if successful_items and successful_results: + try: + evaluated_results = self.evaluate_metrics( + successful_items, successful_results, metrics_config, model_provider, model_name, tenant_id + ) + # Merge evaluated metrics back into results + evaluated_by_index = {r.index: r for r in evaluated_results} + for i, result in enumerate(results): + if result.index in evaluated_by_index: + results[i] = evaluated_by_index[result.index] + except Exception: + logger.exception("Failed to compute metrics for evaluation run %s", evaluation_run_id) + + # Phase 3: Persist individual items + for result in results: + item_input = next((item for item in items if item.index == result.index), None) + run_item = EvaluationRunItem( + evaluation_run_id=evaluation_run_id, + item_index=result.index, + inputs=json.dumps(item_input.inputs) if item_input else None, + expected_output=item_input.expected_output if item_input else None, + context=json.dumps(item_input.context) if item_input and item_input.context else None, + actual_output=result.actual_output, + metrics=json.dumps([m.model_dump() for m in result.metrics]) if result.metrics else None, + metadata_json=json.dumps(result.metadata) if result.metadata else None, + error=result.error, + 
overall_score=result.overall_score, + ) + self.session.add(run_item) + + self.session.commit() + + return results diff --git a/api/core/evaluation/runners/llm_evaluation_runner.py b/api/core/evaluation/runners/llm_evaluation_runner.py new file mode 100644 index 0000000000..896c14acf0 --- /dev/null +++ b/api/core/evaluation/runners/llm_evaluation_runner.py @@ -0,0 +1,152 @@ +import logging +from typing import Any, Mapping, Union + +from sqlalchemy.orm import Session + +from core.evaluation.base_evaluation_instance import BaseEvaluationInstance +from core.evaluation.entities.evaluation_entity import ( + EvaluationItemInput, + EvaluationItemResult, +) +from core.evaluation.runners.base_evaluation_runner import BaseEvaluationRunner +from models.model import App, AppMode + +logger = logging.getLogger(__name__) + + +class LLMEvaluationRunner(BaseEvaluationRunner): + """Runner for LLM evaluation: executes App to get responses, then evaluates.""" + + def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session): + super().__init__(evaluation_instance, session) + + def execute_target( + self, + tenant_id: str, + target_id: str, + target_type: str, + item: EvaluationItemInput, + ) -> EvaluationItemResult: + """Execute the App/Snippet with the given inputs and collect the response.""" + from core.app.apps.completion.app_generator import CompletionAppGenerator + from core.app.apps.workflow.app_generator import WorkflowAppGenerator + from core.evaluation.runners import get_service_account_for_app + from core.app.entities.app_invoke_entities import InvokeFrom + from services.workflow_service import WorkflowService + + app = self.session.query(App).filter_by(id=target_id).first() + if not app: + raise ValueError(f"App {target_id} not found") + + # Get a service account for invocation + service_account = get_service_account_for_app(self.session, target_id) + + app_mode = AppMode.value_of(app.mode) + + # Build args from evaluation item inputs + args: dict[str, Any] = { + "inputs": item.inputs, + } + # For completion/chat modes, first text input becomes query + if app_mode in (AppMode.COMPLETION, AppMode.CHAT): + query = self._extract_query(item.inputs) + args["query"] = query + + if app_mode in (AppMode.WORKFLOW, AppMode.ADVANCED_CHAT): + workflow_service = WorkflowService() + workflow = workflow_service.get_published_workflow(app_model=app) + if not workflow: + raise ValueError(f"No published workflow found for app {target_id}") + + generator = WorkflowAppGenerator() + response: Mapping[str, Any] = generator.generate( + app_model=app, + workflow=workflow, + user=service_account, + args=args, + invoke_from=InvokeFrom.SERVICE_API, + streaming=False, + ) + elif app_mode == AppMode.COMPLETION: + generator = CompletionAppGenerator() + response = generator.generate( + app_model=app, + user=service_account, + args=args, + invoke_from=InvokeFrom.SERVICE_API, + streaming=False, + ) + else: + raise ValueError(f"Unsupported app mode for LLM evaluation: {app_mode}") + + actual_output = self._extract_output(response) + return EvaluationItemResult( + index=item.index, + actual_output=actual_output, + ) + + def evaluate_metrics( + self, + items: list[EvaluationItemInput], + results: list[EvaluationItemResult], + metrics_config: dict, + model_provider: str, + model_name: str, + tenant_id: str, + ) -> list[EvaluationItemResult]: + """Use the evaluation instance to compute LLM metrics.""" + # Merge actual_output into items for evaluation + merged_items = self._merge_results_into_items(items, results) + 
return self.evaluation_instance.evaluate_llm( + merged_items, metrics_config, model_provider, model_name, tenant_id + ) + + @staticmethod + def _extract_query(inputs: dict[str, Any]) -> str: + """Extract query from inputs.""" + for key in ("query", "question", "input", "text"): + if key in inputs: + return str(inputs[key]) + values = list(inputs.values()) + return str(values[0]) if values else "" + + @staticmethod + def _extract_output(response: Union[Mapping[str, Any], Any]) -> str: + """Extract text output from app response.""" + if isinstance(response, Mapping): + # Workflow response + if "data" in response and isinstance(response["data"], Mapping): + outputs = response["data"].get("outputs", {}) + if isinstance(outputs, Mapping): + values = list(outputs.values()) + return str(values[0]) if values else "" + return str(outputs) + # Completion response + if "answer" in response: + return str(response["answer"]) + if "text" in response: + return str(response["text"]) + return str(response) + + @staticmethod + def _merge_results_into_items( + items: list[EvaluationItemInput], + results: list[EvaluationItemResult], + ) -> list[EvaluationItemInput]: + """Create new items with actual_output set as expected_output context for metrics.""" + result_by_index = {r.index: r for r in results} + merged = [] + for item in items: + result = result_by_index.get(item.index) + if result and result.actual_output: + merged.append( + EvaluationItemInput( + index=item.index, + inputs=item.inputs, + expected_output=item.expected_output, + context=[result.actual_output] + (item.context or []), + ) + ) + else: + merged.append(item) + return merged diff --git a/api/core/evaluation/runners/retrieval_evaluation_runner.py b/api/core/evaluation/runners/retrieval_evaluation_runner.py new file mode 100644 index 0000000000..285e2c33b6 --- /dev/null +++ b/api/core/evaluation/runners/retrieval_evaluation_runner.py @@ -0,0 +1,111 @@ +import logging +from typing import Any + +from sqlalchemy.orm import Session + +from core.evaluation.base_evaluation_instance import BaseEvaluationInstance +from core.evaluation.entities.evaluation_entity import ( + EvaluationItemInput, + EvaluationItemResult, +) +from core.evaluation.runners.base_evaluation_runner import BaseEvaluationRunner + +logger = logging.getLogger(__name__) + + +class RetrievalEvaluationRunner(BaseEvaluationRunner): + """Runner for retrieval evaluation: performs knowledge base retrieval, then evaluates.""" + + def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session): + super().__init__(evaluation_instance, session) + + def execute_target( + self, + tenant_id: str, + target_id: str, + target_type: str, + item: EvaluationItemInput, + ) -> EvaluationItemResult: + """Execute retrieval using DatasetRetrieval and collect context documents.""" + from core.rag.retrieval.dataset_retrieval import DatasetRetrieval + + query = self._extract_query(item.inputs) + + dataset_retrieval = DatasetRetrieval() + + # Use knowledge_retrieval for structured results + try: + from core.rag.retrieval.dataset_retrieval import KnowledgeRetrievalRequest + + request = KnowledgeRetrievalRequest( + query=query, + app_id=target_id, + tenant_id=tenant_id, + ) + sources = dataset_retrieval.knowledge_retrieval(request) + retrieved_contexts = [source.content for source in sources if source.content] + except (ImportError, AttributeError): + logger.warning("KnowledgeRetrievalRequest not available, using simple retrieval") + retrieved_contexts = [] + + return EvaluationItemResult( + 
index=item.index, + actual_output="\n\n".join(retrieved_contexts) if retrieved_contexts else "", + metadata={"retrieved_contexts": retrieved_contexts}, + ) + + def evaluate_metrics( + self, + items: list[EvaluationItemInput], + results: list[EvaluationItemResult], + metrics_config: dict, + model_provider: str, + model_name: str, + tenant_id: str, + ) -> list[EvaluationItemResult]: + """Compute retrieval evaluation metrics.""" + # Merge retrieved contexts into items + result_by_index = {r.index: r for r in results} + merged_items = [] + for item in items: + result = result_by_index.get(item.index) + contexts = result.metadata.get("retrieved_contexts", []) if result else [] + merged_items.append( + EvaluationItemInput( + index=item.index, + inputs=item.inputs, + expected_output=item.expected_output, + context=contexts, + ) + ) + + evaluated = self.evaluation_instance.evaluate_retrieval( + merged_items, metrics_config, model_provider, model_name, tenant_id + ) + + # Merge metrics back into original results (preserve actual_output and metadata) + eval_by_index = {r.index: r for r in evaluated} + final_results = [] + for result in results: + if result.index in eval_by_index: + eval_result = eval_by_index[result.index] + final_results.append( + EvaluationItemResult( + index=result.index, + actual_output=result.actual_output, + metrics=eval_result.metrics, + metadata=result.metadata, + error=result.error, + ) + ) + else: + final_results.append(result) + return final_results + + @staticmethod + def _extract_query(inputs: dict[str, Any]) -> str: + for key in ("query", "question", "input", "text"): + if key in inputs: + return str(inputs[key]) + values = list(inputs.values()) + return str(values[0]) if values else "" diff --git a/api/core/evaluation/runners/workflow_evaluation_runner.py b/api/core/evaluation/runners/workflow_evaluation_runner.py new file mode 100644 index 0000000000..1f580eda2b --- /dev/null +++ b/api/core/evaluation/runners/workflow_evaluation_runner.py @@ -0,0 +1,133 @@ +import logging +from typing import Any, Mapping + +from sqlalchemy.orm import Session + +from core.evaluation.base_evaluation_instance import BaseEvaluationInstance +from core.evaluation.entities.evaluation_entity import ( + EvaluationItemInput, + EvaluationItemResult, +) +from core.evaluation.runners.base_evaluation_runner import BaseEvaluationRunner +from models.model import App + +logger = logging.getLogger(__name__) + + +class WorkflowEvaluationRunner(BaseEvaluationRunner): + """Runner for workflow evaluation: executes workflow App in non-streaming mode.""" + + def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session): + super().__init__(evaluation_instance, session) + + def execute_target( + self, + tenant_id: str, + target_id: str, + target_type: str, + item: EvaluationItemInput, + ) -> EvaluationItemResult: + """Execute workflow and collect outputs.""" + from core.app.apps.workflow.app_generator import WorkflowAppGenerator + from core.evaluation.runners import get_service_account_for_app + from core.app.entities.app_invoke_entities import InvokeFrom + from services.workflow_service import WorkflowService + + app = self.session.query(App).filter_by(id=target_id).first() + if not app: + raise ValueError(f"App {target_id} not found") + + service_account = get_service_account_for_app(self.session, target_id) + workflow_service = WorkflowService() + workflow = workflow_service.get_published_workflow(app_model=app) + if not workflow: + raise ValueError(f"No published workflow found for app 
{target_id}") + + args: dict[str, Any] = {"inputs": item.inputs} + + generator = WorkflowAppGenerator() + response: Mapping[str, Any] = generator.generate( + app_model=app, + workflow=workflow, + user=service_account, + args=args, + invoke_from=InvokeFrom.SERVICE_API, + streaming=False, + ) + + actual_output = self._extract_output(response) + node_executions = self._extract_node_executions(response) + + return EvaluationItemResult( + index=item.index, + actual_output=actual_output, + metadata={"node_executions": node_executions}, + ) + + def evaluate_metrics( + self, + items: list[EvaluationItemInput], + results: list[EvaluationItemResult], + metrics_config: dict, + model_provider: str, + model_name: str, + tenant_id: str, + ) -> list[EvaluationItemResult]: + """Compute workflow evaluation metrics (end-to-end).""" + result_by_index = {r.index: r for r in results} + merged_items = [] + for item in items: + result = result_by_index.get(item.index) + context = [] + if result and result.actual_output: + context.append(result.actual_output) + merged_items.append( + EvaluationItemInput( + index=item.index, + inputs=item.inputs, + expected_output=item.expected_output, + context=context + (item.context or []), + ) + ) + + evaluated = self.evaluation_instance.evaluate_workflow( + merged_items, metrics_config, model_provider, model_name, tenant_id + ) + + # Merge metrics back preserving metadata + eval_by_index = {r.index: r for r in evaluated} + final_results = [] + for result in results: + if result.index in eval_by_index: + eval_result = eval_by_index[result.index] + final_results.append( + EvaluationItemResult( + index=result.index, + actual_output=result.actual_output, + metrics=eval_result.metrics, + metadata=result.metadata, + error=result.error, + ) + ) + else: + final_results.append(result) + return final_results + + @staticmethod + def _extract_output(response: Mapping[str, Any]) -> str: + """Extract text output from workflow response.""" + if "data" in response and isinstance(response["data"], Mapping): + outputs = response["data"].get("outputs", {}) + if isinstance(outputs, Mapping): + values = list(outputs.values()) + return str(values[0]) if values else "" + return str(outputs) + return str(response) + + @staticmethod + def _extract_node_executions(response: Mapping[str, Any]) -> list[dict]: + """Extract node execution trace from workflow response.""" + data = response.get("data", {}) + if isinstance(data, Mapping): + return data.get("node_executions", []) + return [] diff --git a/api/migrations/versions/2026_03_03_0001-a1b2c3d4e5f6_add_evaluation_tables.py b/api/migrations/versions/2026_03_03_0001-a1b2c3d4e5f6_add_evaluation_tables.py new file mode 100644 index 0000000000..9138256f36 --- /dev/null +++ b/api/migrations/versions/2026_03_03_0001-a1b2c3d4e5f6_add_evaluation_tables.py @@ -0,0 +1,113 @@ +"""add_evaluation_tables + +Revision ID: a1b2c3d4e5f6 +Revises: 1c05e80d2380 +Create Date: 2026-03-03 00:01:00.000000 + +""" + +import sqlalchemy as sa +from alembic import op + +import models as models + + +# revision identifiers, used by Alembic. 
+revision = "a1b2c3d4e5f6" +down_revision = "1c05e80d2380" +branch_labels = None +depends_on = None + + +def upgrade(): + # evaluation_configurations + op.create_table( + "evaluation_configurations", + sa.Column("id", models.types.StringUUID(), nullable=False), + sa.Column("tenant_id", models.types.StringUUID(), nullable=False), + sa.Column("target_type", sa.String(length=20), nullable=False), + sa.Column("target_id", models.types.StringUUID(), nullable=False), + sa.Column("evaluation_model_provider", sa.String(length=255), nullable=True), + sa.Column("evaluation_model", sa.String(length=255), nullable=True), + sa.Column("metrics_config", models.types.LongText(), nullable=True), + sa.Column("judgement_conditions", models.types.LongText(), nullable=True), + sa.Column("created_by", models.types.StringUUID(), nullable=False), + sa.Column("updated_by", models.types.StringUUID(), nullable=False), + sa.Column("created_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False), + sa.Column("updated_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False), + sa.PrimaryKeyConstraint("id", name="evaluation_configuration_pkey"), + sa.UniqueConstraint("tenant_id", "target_type", "target_id", name="evaluation_configuration_unique"), + ) + with op.batch_alter_table("evaluation_configurations", schema=None) as batch_op: + batch_op.create_index( + "evaluation_configuration_target_idx", ["tenant_id", "target_type", "target_id"], unique=False + ) + + # evaluation_runs + op.create_table( + "evaluation_runs", + sa.Column("id", models.types.StringUUID(), nullable=False), + sa.Column("tenant_id", models.types.StringUUID(), nullable=False), + sa.Column("target_type", sa.String(length=20), nullable=False), + sa.Column("target_id", models.types.StringUUID(), nullable=False), + sa.Column("evaluation_config_id", models.types.StringUUID(), nullable=False), + sa.Column("status", sa.String(length=20), nullable=False, server_default=sa.text("'pending'")), + sa.Column("dataset_file_id", models.types.StringUUID(), nullable=True), + sa.Column("result_file_id", models.types.StringUUID(), nullable=True), + sa.Column("total_items", sa.Integer(), nullable=False, server_default=sa.text("0")), + sa.Column("completed_items", sa.Integer(), nullable=False, server_default=sa.text("0")), + sa.Column("failed_items", sa.Integer(), nullable=False, server_default=sa.text("0")), + sa.Column("metrics_summary", models.types.LongText(), nullable=True), + sa.Column("error", sa.Text(), nullable=True), + sa.Column("celery_task_id", sa.String(length=255), nullable=True), + sa.Column("created_by", models.types.StringUUID(), nullable=False), + sa.Column("started_at", sa.DateTime(), nullable=True), + sa.Column("completed_at", sa.DateTime(), nullable=True), + sa.Column("created_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False), + sa.Column("updated_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False), + sa.PrimaryKeyConstraint("id", name="evaluation_run_pkey"), + ) + with op.batch_alter_table("evaluation_runs", schema=None) as batch_op: + batch_op.create_index( + "evaluation_run_target_idx", ["tenant_id", "target_type", "target_id"], unique=False + ) + batch_op.create_index("evaluation_run_status_idx", ["tenant_id", "status"], unique=False) + + # evaluation_run_items + op.create_table( + "evaluation_run_items", + sa.Column("id", models.types.StringUUID(), nullable=False), + sa.Column("evaluation_run_id", models.types.StringUUID(), nullable=False), + 
sa.Column("item_index", sa.Integer(), nullable=False), + sa.Column("inputs", models.types.LongText(), nullable=True), + sa.Column("expected_output", models.types.LongText(), nullable=True), + sa.Column("context", models.types.LongText(), nullable=True), + sa.Column("actual_output", models.types.LongText(), nullable=True), + sa.Column("metrics", models.types.LongText(), nullable=True), + sa.Column("metadata_json", models.types.LongText(), nullable=True), + sa.Column("error", sa.Text(), nullable=True), + sa.Column("overall_score", sa.Float(), nullable=True), + sa.Column("created_at", sa.DateTime(), server_default=sa.func.current_timestamp(), nullable=False), + sa.PrimaryKeyConstraint("id", name="evaluation_run_item_pkey"), + ) + with op.batch_alter_table("evaluation_run_items", schema=None) as batch_op: + batch_op.create_index("evaluation_run_item_run_idx", ["evaluation_run_id"], unique=False) + batch_op.create_index( + "evaluation_run_item_index_idx", ["evaluation_run_id", "item_index"], unique=False + ) + + +def downgrade(): + with op.batch_alter_table("evaluation_run_items", schema=None) as batch_op: + batch_op.drop_index("evaluation_run_item_index_idx") + batch_op.drop_index("evaluation_run_item_run_idx") + op.drop_table("evaluation_run_items") + + with op.batch_alter_table("evaluation_runs", schema=None) as batch_op: + batch_op.drop_index("evaluation_run_status_idx") + batch_op.drop_index("evaluation_run_target_idx") + op.drop_table("evaluation_runs") + + with op.batch_alter_table("evaluation_configurations", schema=None) as batch_op: + batch_op.drop_index("evaluation_configuration_target_idx") + op.drop_table("evaluation_configurations") diff --git a/api/models/__init__.py b/api/models/__init__.py index 3fdca7f7a1..edd0001093 100644 --- a/api/models/__init__.py +++ b/api/models/__init__.py @@ -26,6 +26,13 @@ from .dataset import ( TidbAuthBinding, Whitelist, ) +from .evaluation import ( + EvaluationConfiguration, + EvaluationRun, + EvaluationRunItem, + EvaluationRunStatus, + EvaluationTargetType, +) from .enums import ( AppTriggerStatus, AppTriggerType, @@ -158,6 +165,11 @@ __all__ = [ "Document", "DocumentSegment", "Embedding", + "EvaluationConfiguration", + "EvaluationRun", + "EvaluationRunItem", + "EvaluationRunStatus", + "EvaluationTargetType", "EndUser", "ExecutionExtraContent", "ExporleBanner", diff --git a/api/models/evaluation.py b/api/models/evaluation.py new file mode 100644 index 0000000000..1a428b590c --- /dev/null +++ b/api/models/evaluation.py @@ -0,0 +1,193 @@ +from __future__ import annotations + +import json +from datetime import datetime +from enum import StrEnum +from typing import Any + +import sqlalchemy as sa +from sqlalchemy import DateTime, Float, Integer, String, Text, func +from sqlalchemy.orm import Mapped, mapped_column + +from libs.uuid_utils import uuidv7 + +from .base import Base +from .types import LongText, StringUUID + + +class EvaluationRunStatus(StrEnum): + PENDING = "pending" + RUNNING = "running" + COMPLETED = "completed" + FAILED = "failed" + CANCELLED = "cancelled" + + +class EvaluationTargetType(StrEnum): + APP = "app" + SNIPPETS = "snippets" + + +class EvaluationConfiguration(Base): + """Stores evaluation configuration for each target (App or Snippet).""" + + __tablename__ = "evaluation_configurations" + __table_args__ = ( + sa.PrimaryKeyConstraint("id", name="evaluation_configuration_pkey"), + sa.Index("evaluation_configuration_target_idx", "tenant_id", "target_type", "target_id"), + sa.UniqueConstraint("tenant_id", "target_type", "target_id", 
name="evaluation_configuration_unique"), + ) + + id: Mapped[str] = mapped_column(StringUUID, default=lambda: str(uuidv7())) + tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + target_type: Mapped[str] = mapped_column(String(20), nullable=False) + target_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + + evaluation_model_provider: Mapped[str | None] = mapped_column(String(255), nullable=True) + evaluation_model: Mapped[str | None] = mapped_column(String(255), nullable=True) + metrics_config: Mapped[str | None] = mapped_column(LongText, nullable=True) + judgement_conditions: Mapped[str | None] = mapped_column(LongText, nullable=True) + + created_by: Mapped[str] = mapped_column(StringUUID, nullable=False) + updated_by: Mapped[str] = mapped_column(StringUUID, nullable=False) + created_at: Mapped[datetime] = mapped_column( + DateTime, nullable=False, server_default=func.current_timestamp() + ) + updated_at: Mapped[datetime] = mapped_column( + DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp() + ) + + @property + def metrics_config_dict(self) -> dict[str, Any]: + if self.metrics_config: + return json.loads(self.metrics_config) + return {} + + @metrics_config_dict.setter + def metrics_config_dict(self, value: dict[str, Any]) -> None: + self.metrics_config = json.dumps(value) + + @property + def judgement_conditions_dict(self) -> dict[str, Any]: + if self.judgement_conditions: + return json.loads(self.judgement_conditions) + return {} + + @judgement_conditions_dict.setter + def judgement_conditions_dict(self, value: dict[str, Any]) -> None: + self.judgement_conditions = json.dumps(value) + + def __repr__(self) -> str: + return f"" + + +class EvaluationRun(Base): + """Stores each evaluation run record.""" + + __tablename__ = "evaluation_runs" + __table_args__ = ( + sa.PrimaryKeyConstraint("id", name="evaluation_run_pkey"), + sa.Index("evaluation_run_target_idx", "tenant_id", "target_type", "target_id"), + sa.Index("evaluation_run_status_idx", "tenant_id", "status"), + ) + + id: Mapped[str] = mapped_column(StringUUID, default=lambda: str(uuidv7())) + tenant_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + target_type: Mapped[str] = mapped_column(String(20), nullable=False) + target_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + evaluation_config_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + + status: Mapped[str] = mapped_column( + String(20), nullable=False, default=EvaluationRunStatus.PENDING + ) + dataset_file_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True) + result_file_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True) + + total_items: Mapped[int] = mapped_column(Integer, nullable=False, default=0) + completed_items: Mapped[int] = mapped_column(Integer, nullable=False, default=0) + failed_items: Mapped[int] = mapped_column(Integer, nullable=False, default=0) + + metrics_summary: Mapped[str | None] = mapped_column(LongText, nullable=True) + error: Mapped[str | None] = mapped_column(Text, nullable=True) + + celery_task_id: Mapped[str | None] = mapped_column(String(255), nullable=True) + + created_by: Mapped[str] = mapped_column(StringUUID, nullable=False) + started_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True) + completed_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True) + created_at: Mapped[datetime] = mapped_column( + DateTime, nullable=False, server_default=func.current_timestamp() + ) + 
updated_at: Mapped[datetime] = mapped_column( + DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp() + ) + + @property + def metrics_summary_dict(self) -> dict[str, Any]: + if self.metrics_summary: + return json.loads(self.metrics_summary) + return {} + + @metrics_summary_dict.setter + def metrics_summary_dict(self, value: dict[str, Any]) -> None: + self.metrics_summary = json.dumps(value) + + @property + def progress(self) -> float: + if self.total_items == 0: + return 0.0 + return (self.completed_items + self.failed_items) / self.total_items + + def __repr__(self) -> str: + return f"" + + +class EvaluationRunItem(Base): + """Stores per-row evaluation results.""" + + __tablename__ = "evaluation_run_items" + __table_args__ = ( + sa.PrimaryKeyConstraint("id", name="evaluation_run_item_pkey"), + sa.Index("evaluation_run_item_run_idx", "evaluation_run_id"), + sa.Index("evaluation_run_item_index_idx", "evaluation_run_id", "item_index"), + ) + + id: Mapped[str] = mapped_column(StringUUID, default=lambda: str(uuidv7())) + evaluation_run_id: Mapped[str] = mapped_column(StringUUID, nullable=False) + + item_index: Mapped[int] = mapped_column(Integer, nullable=False) + inputs: Mapped[str | None] = mapped_column(LongText, nullable=True) + expected_output: Mapped[str | None] = mapped_column(LongText, nullable=True) + context: Mapped[str | None] = mapped_column(LongText, nullable=True) + actual_output: Mapped[str | None] = mapped_column(LongText, nullable=True) + + metrics: Mapped[str | None] = mapped_column(LongText, nullable=True) + metadata_json: Mapped[str | None] = mapped_column(LongText, nullable=True) + error: Mapped[str | None] = mapped_column(Text, nullable=True) + + overall_score: Mapped[float | None] = mapped_column(Float, nullable=True) + + created_at: Mapped[datetime] = mapped_column( + DateTime, nullable=False, server_default=func.current_timestamp() + ) + + @property + def inputs_dict(self) -> dict[str, Any]: + if self.inputs: + return json.loads(self.inputs) + return {} + + @property + def metrics_list(self) -> list[dict[str, Any]]: + if self.metrics: + return json.loads(self.metrics) + return [] + + @property + def metadata_dict(self) -> dict[str, Any]: + if self.metadata_json: + return json.loads(self.metadata_json) + return {} + + def __repr__(self) -> str: + return f"" diff --git a/api/pyproject.toml b/api/pyproject.toml index 530b0c0da3..26cffc43f4 100644 --- a/api/pyproject.toml +++ b/api/pyproject.toml @@ -199,6 +199,12 @@ storage = [ ############################################################ tools = ["cloudscraper~=1.2.71", "nltk~=3.9.1"] +############################################################ +# [ Evaluation ] dependency group +# Required for evaluation frameworks +############################################################ +evaluation = ["ragas>=0.2.0"] + ############################################################ # [ VDB ] dependency group # Required by vector store clients diff --git a/api/services/errors/evaluation.py b/api/services/errors/evaluation.py new file mode 100644 index 0000000000..6affb68d21 --- /dev/null +++ b/api/services/errors/evaluation.py @@ -0,0 +1,21 @@ +from services.errors.base import BaseServiceError + + +class EvaluationFrameworkNotConfiguredError(BaseServiceError): + def __init__(self, description: str | None = None): + super().__init__(description or "Evaluation framework is not configured. 
Set EVALUATION_FRAMEWORK env var.") + + +class EvaluationNotFoundError(BaseServiceError): + def __init__(self, description: str | None = None): + super().__init__(description or "Evaluation not found.") + + +class EvaluationDatasetInvalidError(BaseServiceError): + def __init__(self, description: str | None = None): + super().__init__(description or "Evaluation dataset is invalid.") + + +class EvaluationMaxConcurrentRunsError(BaseServiceError): + def __init__(self, description: str | None = None): + super().__init__(description or "Maximum number of concurrent evaluation runs reached.") diff --git a/api/services/evaluation_service.py b/api/services/evaluation_service.py index 812aade9b3..3c55595fbe 100644 --- a/api/services/evaluation_service.py +++ b/api/services/evaluation_service.py @@ -1,13 +1,34 @@ import io +import json import logging -from typing import Union +from typing import Any, Union -from openpyxl import Workbook +from openpyxl import Workbook, load_workbook from openpyxl.styles import Alignment, Border, Font, PatternFill, Side from openpyxl.utils import get_column_letter +from sqlalchemy.orm import Session +from configs import dify_config +from core.evaluation.entities.evaluation_entity import ( + EvaluationCategory, + EvaluationItemInput, + EvaluationRunData, +) +from core.evaluation.evaluation_manager import EvaluationManager +from models.evaluation import ( + EvaluationConfiguration, + EvaluationRun, + EvaluationRunItem, + EvaluationRunStatus, +) from models.model import App, AppMode from models.snippet import CustomizedSnippet +from services.errors.evaluation import ( + EvaluationDatasetInvalidError, + EvaluationFrameworkNotConfiguredError, + EvaluationMaxConcurrentRunsError, + EvaluationNotFoundError, +) from services.snippet_service import SnippetService from services.workflow_service import WorkflowService @@ -176,3 +197,264 @@ class EvaluationService: output.seek(0) return output.getvalue() + + # ---- Evaluation Configuration CRUD ---- + + @classmethod + def get_evaluation_config( + cls, + session: Session, + tenant_id: str, + target_type: str, + target_id: str, + ) -> EvaluationConfiguration | None: + return ( + session.query(EvaluationConfiguration) + .filter_by(tenant_id=tenant_id, target_type=target_type, target_id=target_id) + .first() + ) + + @classmethod + def save_evaluation_config( + cls, + session: Session, + tenant_id: str, + target_type: str, + target_id: str, + account_id: str, + data: dict[str, Any], + ) -> EvaluationConfiguration: + config = cls.get_evaluation_config(session, tenant_id, target_type, target_id) + if config is None: + config = EvaluationConfiguration( + tenant_id=tenant_id, + target_type=target_type, + target_id=target_id, + created_by=account_id, + updated_by=account_id, + ) + session.add(config) + + config.evaluation_model_provider = data.get("evaluation_model_provider") + config.evaluation_model = data.get("evaluation_model") + config.metrics_config = json.dumps(data.get("metrics_config", {})) + config.judgement_conditions = json.dumps(data.get("judgement_conditions", {})) + config.updated_by = account_id + session.commit() + session.refresh(config) + return config + + # ---- Evaluation Run Management ---- + + @classmethod + def start_evaluation_run( + cls, + session: Session, + tenant_id: str, + target_type: str, + target_id: str, + account_id: str, + dataset_file_content: bytes, + evaluation_category: EvaluationCategory, + ) -> EvaluationRun: + """Validate dataset, create run record, dispatch Celery task.""" + # Check framework is 
configured + evaluation_instance = EvaluationManager.get_evaluation_instance() + if evaluation_instance is None: + raise EvaluationFrameworkNotConfiguredError() + + # Check evaluation config exists + config = cls.get_evaluation_config(session, tenant_id, target_type, target_id) + if config is None: + raise EvaluationNotFoundError("Evaluation configuration not found. Please configure evaluation first.") + + # Check concurrent run limit + active_runs = ( + session.query(EvaluationRun) + .filter_by(tenant_id=tenant_id) + .filter(EvaluationRun.status.in_([EvaluationRunStatus.PENDING, EvaluationRunStatus.RUNNING])) + .count() + ) + max_concurrent = dify_config.EVALUATION_MAX_CONCURRENT_RUNS + if active_runs >= max_concurrent: + raise EvaluationMaxConcurrentRunsError( + f"Maximum concurrent runs ({max_concurrent}) reached." + ) + + # Parse dataset + items = cls._parse_dataset(dataset_file_content) + max_rows = dify_config.EVALUATION_MAX_DATASET_ROWS + if len(items) > max_rows: + raise EvaluationDatasetInvalidError(f"Dataset has {len(items)} rows, max is {max_rows}.") + + # Create evaluation run + evaluation_run = EvaluationRun( + tenant_id=tenant_id, + target_type=target_type, + target_id=target_id, + evaluation_config_id=config.id, + status=EvaluationRunStatus.PENDING, + total_items=len(items), + created_by=account_id, + ) + session.add(evaluation_run) + session.commit() + session.refresh(evaluation_run) + + # Build Celery task data + run_data = EvaluationRunData( + evaluation_run_id=evaluation_run.id, + tenant_id=tenant_id, + target_type=target_type, + target_id=target_id, + evaluation_category=evaluation_category, + evaluation_model_provider=config.evaluation_model_provider or "", + evaluation_model=config.evaluation_model or "", + metrics_config=config.metrics_config_dict, + items=items, + ) + + # Dispatch Celery task + from tasks.evaluation_task import run_evaluation + + task = run_evaluation.delay(run_data.model_dump()) + evaluation_run.celery_task_id = task.id + session.commit() + + return evaluation_run + + @classmethod + def get_evaluation_runs( + cls, + session: Session, + tenant_id: str, + target_type: str, + target_id: str, + page: int = 1, + page_size: int = 20, + ) -> tuple[list[EvaluationRun], int]: + """Query evaluation run history with pagination.""" + query = ( + session.query(EvaluationRun) + .filter_by(tenant_id=tenant_id, target_type=target_type, target_id=target_id) + .order_by(EvaluationRun.created_at.desc()) + ) + total = query.count() + runs = query.offset((page - 1) * page_size).limit(page_size).all() + return runs, total + + @classmethod + def get_evaluation_run_detail( + cls, + session: Session, + tenant_id: str, + run_id: str, + ) -> EvaluationRun: + run = ( + session.query(EvaluationRun) + .filter_by(id=run_id, tenant_id=tenant_id) + .first() + ) + if not run: + raise EvaluationNotFoundError("Evaluation run not found.") + return run + + @classmethod + def get_evaluation_run_items( + cls, + session: Session, + run_id: str, + page: int = 1, + page_size: int = 50, + ) -> tuple[list[EvaluationRunItem], int]: + """Query evaluation run items with pagination.""" + query = ( + session.query(EvaluationRunItem) + .filter_by(evaluation_run_id=run_id) + .order_by(EvaluationRunItem.item_index.asc()) + ) + total = query.count() + items = query.offset((page - 1) * page_size).limit(page_size).all() + return items, total + + @classmethod + def cancel_evaluation_run( + cls, + session: Session, + tenant_id: str, + run_id: str, + ) -> EvaluationRun: + run = 
cls.get_evaluation_run_detail(session, tenant_id, run_id) + if run.status not in (EvaluationRunStatus.PENDING, EvaluationRunStatus.RUNNING): + raise ValueError(f"Cannot cancel evaluation run in status: {run.status}") + + run.status = EvaluationRunStatus.CANCELLED + + # Revoke Celery task if running + if run.celery_task_id: + try: + from celery import current_app as celery_app + + celery_app.control.revoke(run.celery_task_id, terminate=True) + except Exception: + logger.exception("Failed to revoke Celery task %s", run.celery_task_id) + + session.commit() + return run + + @classmethod + def get_supported_metrics(cls, category: EvaluationCategory) -> list[str]: + return EvaluationManager.get_supported_metrics(category) + + # ---- Dataset Parsing ---- + + @classmethod + def _parse_dataset(cls, xlsx_content: bytes) -> list[EvaluationItemInput]: + """Parse evaluation dataset from XLSX bytes.""" + wb = load_workbook(io.BytesIO(xlsx_content), read_only=True) + ws = wb.active + if ws is None: + raise EvaluationDatasetInvalidError("XLSX file has no active worksheet.") + + rows = list(ws.iter_rows(values_only=True)) + if len(rows) < 2: + raise EvaluationDatasetInvalidError("Dataset must have at least a header row and one data row.") + + headers = [str(h).strip() if h is not None else "" for h in rows[0]] + if not headers or headers[0].lower() != "index": + raise EvaluationDatasetInvalidError("First column header must be 'index'.") + + input_headers = headers[1:] # Skip 'index' + items = [] + for row_idx, row in enumerate(rows[1:], start=1): + values = list(row) + if all(v is None or str(v).strip() == "" for v in values): + continue # Skip empty rows + + index_val = values[0] if values else row_idx + try: + index = int(index_val) + except (TypeError, ValueError): + index = row_idx + + inputs: dict[str, Any] = {} + for col_idx, header in enumerate(input_headers): + val = values[col_idx + 1] if col_idx + 1 < len(values) else None + inputs[header] = str(val) if val is not None else "" + + # Check for expected_output column + expected_output = inputs.pop("expected_output", None) + context_str = inputs.pop("context", None) + context = context_str.split(";") if context_str else None + + items.append( + EvaluationItemInput( + index=index, + inputs=inputs, + expected_output=expected_output, + context=context, + ) + ) + + wb.close() + return items diff --git a/api/tasks/evaluation_task.py b/api/tasks/evaluation_task.py new file mode 100644 index 0000000000..8bfd9cbf87 --- /dev/null +++ b/api/tasks/evaluation_task.py @@ -0,0 +1,309 @@ +import io +import json +import logging +from typing import Any + +from celery import shared_task +from openpyxl import Workbook +from openpyxl.styles import Alignment, Border, Font, PatternFill, Side +from openpyxl.utils import get_column_letter + +from core.evaluation.entities.evaluation_entity import ( + EvaluationCategory, + EvaluationItemResult, + EvaluationRunData, +) +from core.evaluation.evaluation_manager import EvaluationManager +from core.evaluation.runners.agent_evaluation_runner import AgentEvaluationRunner +from core.evaluation.runners.llm_evaluation_runner import LLMEvaluationRunner +from core.evaluation.runners.retrieval_evaluation_runner import RetrievalEvaluationRunner +from core.evaluation.runners.workflow_evaluation_runner import WorkflowEvaluationRunner +from extensions.ext_database import db +from libs.datetime_utils import naive_utc_now +from models.evaluation import EvaluationRun, EvaluationRunStatus + +logger = logging.getLogger(__name__) + + 
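+# Dispatched by EvaluationService.start_evaluation_run via run_evaluation.delay(); a Celery
+# worker must consume the dedicated "evaluation" queue for these runs to make progress.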
+@shared_task(queue="evaluation") +def run_evaluation(run_data_dict: dict[str, Any]) -> None: + """Celery task for running evaluations asynchronously. + + Workflow: + 1. Deserialize EvaluationRunData + 2. Update status to RUNNING + 3. Select appropriate Runner based on evaluation_category + 4. Execute runner.run() which handles target execution + metric computation + 5. Generate result XLSX + 6. Update EvaluationRun status to COMPLETED + """ + run_data = EvaluationRunData.model_validate(run_data_dict) + + with db.engine.connect() as connection: + from sqlalchemy.orm import Session + + session = Session(bind=connection) + + try: + _execute_evaluation(session, run_data) + except Exception as e: + logger.exception("Evaluation run %s failed", run_data.evaluation_run_id) + _mark_run_failed(session, run_data.evaluation_run_id, str(e)) + finally: + session.close() + + +def _execute_evaluation(session: Any, run_data: EvaluationRunData) -> None: + """Core evaluation execution logic.""" + evaluation_run = session.query(EvaluationRun).filter_by(id=run_data.evaluation_run_id).first() + if not evaluation_run: + logger.error("EvaluationRun %s not found", run_data.evaluation_run_id) + return + + # Check if cancelled + if evaluation_run.status == EvaluationRunStatus.CANCELLED: + logger.info("EvaluationRun %s was cancelled", run_data.evaluation_run_id) + return + + # Get evaluation instance + evaluation_instance = EvaluationManager.get_evaluation_instance() + if evaluation_instance is None: + raise ValueError("Evaluation framework not configured") + + # Select runner based on category + runner = _create_runner(run_data.evaluation_category, evaluation_instance, session) + + # Execute evaluation + results = runner.run( + evaluation_run_id=run_data.evaluation_run_id, + tenant_id=run_data.tenant_id, + target_id=run_data.target_id, + target_type=run_data.target_type, + items=run_data.items, + metrics_config=run_data.metrics_config, + model_provider=run_data.evaluation_model_provider, + model_name=run_data.evaluation_model, + ) + + # Compute summary metrics + metrics_summary = _compute_metrics_summary(results) + + # Generate result XLSX + result_xlsx = _generate_result_xlsx(run_data.items, results) + + # Store result file + result_file_id = _store_result_file( + run_data.tenant_id, run_data.evaluation_run_id, result_xlsx, session + ) + + # Update run to completed + evaluation_run = session.query(EvaluationRun).filter_by(id=run_data.evaluation_run_id).first() + if evaluation_run: + evaluation_run.status = EvaluationRunStatus.COMPLETED + evaluation_run.completed_at = naive_utc_now() + evaluation_run.metrics_summary = json.dumps(metrics_summary) + if result_file_id: + evaluation_run.result_file_id = result_file_id + session.commit() + + logger.info("Evaluation run %s completed successfully", run_data.evaluation_run_id) + + +def _create_runner( + category: EvaluationCategory, + evaluation_instance: Any, + session: Any, +) -> Any: + """Create the appropriate runner for the evaluation category.""" + match category: + case EvaluationCategory.LLM: + return LLMEvaluationRunner(evaluation_instance, session) + case EvaluationCategory.RETRIEVAL: + return RetrievalEvaluationRunner(evaluation_instance, session) + case EvaluationCategory.AGENT: + return AgentEvaluationRunner(evaluation_instance, session) + case EvaluationCategory.WORKFLOW: + return WorkflowEvaluationRunner(evaluation_instance, session) + case _: + raise ValueError(f"Unknown evaluation category: {category}") + + +def _mark_run_failed(session: Any, run_id: str, 
error: str) -> None: + """Mark an evaluation run as failed.""" + try: + evaluation_run = session.query(EvaluationRun).filter_by(id=run_id).first() + if evaluation_run: + evaluation_run.status = EvaluationRunStatus.FAILED + evaluation_run.error = error[:2000] # Truncate error + evaluation_run.completed_at = naive_utc_now() + session.commit() + except Exception: + logger.exception("Failed to mark run %s as failed", run_id) + + +def _compute_metrics_summary(results: list[EvaluationItemResult]) -> dict[str, Any]: + """Compute average scores per metric across all results.""" + metric_scores: dict[str, list[float]] = {} + for result in results: + if result.error: + continue + for metric in result.metrics: + if metric.name not in metric_scores: + metric_scores[metric.name] = [] + metric_scores[metric.name].append(metric.score) + + summary: dict[str, Any] = {} + for name, scores in metric_scores.items(): + summary[name] = { + "average": sum(scores) / len(scores) if scores else 0.0, + "min": min(scores) if scores else 0.0, + "max": max(scores) if scores else 0.0, + "count": len(scores), + } + + # Overall average + all_scores = [s for scores in metric_scores.values() for s in scores] + summary["_overall"] = { + "average": sum(all_scores) / len(all_scores) if all_scores else 0.0, + "total_items": len(results), + "successful_items": sum(1 for r in results if r.error is None), + "failed_items": sum(1 for r in results if r.error is not None), + } + + return summary + + +def _generate_result_xlsx( + items: list[Any], + results: list[EvaluationItemResult], +) -> bytes: + """Generate result XLSX with input data, actual output, and metric scores.""" + wb = Workbook() + ws = wb.active + ws.title = "Evaluation Results" + + header_font = Font(bold=True, color="FFFFFF") + header_fill = PatternFill(start_color="4472C4", end_color="4472C4", fill_type="solid") + header_alignment = Alignment(horizontal="center", vertical="center") + thin_border = Border( + left=Side(style="thin"), + right=Side(style="thin"), + top=Side(style="thin"), + bottom=Side(style="thin"), + ) + + # Collect all metric names + all_metric_names: list[str] = [] + for result in results: + for metric in result.metrics: + if metric.name not in all_metric_names: + all_metric_names.append(metric.name) + + # Collect all input keys + input_keys: list[str] = [] + for item in items: + for key in item.inputs: + if key not in input_keys: + input_keys.append(key) + + # Build headers + headers = ["index"] + input_keys + ["expected_output", "actual_output"] + all_metric_names + ["overall_score", "error"] + + # Write header row + for col_idx, header in enumerate(headers, start=1): + cell = ws.cell(row=1, column=col_idx, value=header) + cell.font = header_font + cell.fill = header_fill + cell.alignment = header_alignment + cell.border = thin_border + + # Set column widths + ws.column_dimensions["A"].width = 10 + for col_idx in range(2, len(headers) + 1): + ws.column_dimensions[get_column_letter(col_idx)].width = 25 + + # Build result lookup + result_by_index = {r.index: r for r in results} + + # Write data rows + for row_idx, item in enumerate(items, start=2): + result = result_by_index.get(item.index) + + col = 1 + # Index + ws.cell(row=row_idx, column=col, value=item.index).border = thin_border + col += 1 + + # Input values + for key in input_keys: + val = item.inputs.get(key, "") + ws.cell(row=row_idx, column=col, value=str(val)).border = thin_border + col += 1 + + # Expected output + ws.cell(row=row_idx, column=col, value=item.expected_output or "").border 
= thin_border + col += 1 + + # Actual output + ws.cell(row=row_idx, column=col, value=result.actual_output if result else "").border = thin_border + col += 1 + + # Metric scores + metric_scores = {m.name: m.score for m in result.metrics} if result else {} + for metric_name in all_metric_names: + score = metric_scores.get(metric_name) + ws.cell(row=row_idx, column=col, value=score if score is not None else "").border = thin_border + col += 1 + + # Overall score + ws.cell( + row=row_idx, column=col, value=result.overall_score if result else "" + ).border = thin_border + col += 1 + + # Error + ws.cell(row=row_idx, column=col, value=result.error if result else "").border = thin_border + + output = io.BytesIO() + wb.save(output) + output.seek(0) + return output.getvalue() + + +def _store_result_file( + tenant_id: str, + run_id: str, + xlsx_content: bytes, + session: Any, +) -> str | None: + """Store result XLSX file and return the UploadFile ID.""" + try: + from extensions.ext_storage import storage + from models.model import UploadFile + + from libs.uuid_utils import uuidv7 + + file_id = str(uuidv7()) + filename = f"evaluation-result-{run_id[:8]}.xlsx" + storage_key = f"evaluation_results/{tenant_id}/{file_id}.xlsx" + + storage.save(storage_key, xlsx_content) + + upload_file = UploadFile( + id=file_id, + tenant_id=tenant_id, + storage_type="evaluation_result", + key=storage_key, + name=filename, + size=len(xlsx_content), + extension="xlsx", + mime_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", + created_by_role="account", + created_by="system", + ) + session.add(upload_file) + session.commit() + return file_id + except Exception: + logger.exception("Failed to store result file for run %s", run_id) + return None
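
Usage note (illustration only, not part of the patch): a minimal sketch of how the new service API fits together, assuming a Flask app context, an existing tenant and app, and EVALUATION_FRAMEWORK configured; the tenant/app/account IDs and the provider/model names below are placeholders.

    import io

    from openpyxl import Workbook
    from sqlalchemy.orm import Session

    from core.evaluation.entities.evaluation_entity import EvaluationCategory
    from extensions.ext_database import db
    from services.evaluation_service import EvaluationService

    with Session(db.engine, expire_on_commit=False) as session:
        # 1. Save an evaluation configuration for the target (same payload shape the PUT endpoint accepts).
        EvaluationService.save_evaluation_config(
            session=session,
            tenant_id="TENANT_ID",      # placeholder
            target_type="app",
            target_id="APP_ID",         # placeholder
            account_id="ACCOUNT_ID",    # placeholder
            data={
                "evaluation_model_provider": "openai",   # placeholder provider
                "evaluation_model": "gpt-4o-mini",       # placeholder model
                "metrics_config": {},                    # framework-specific metric settings
                "judgement_conditions": {},
            },
        )

        # 2. Build a dataset in the format _parse_dataset expects: the first header must be
        #    "index"; "expected_output" and "context" columns are optional, and context
        #    values are ";"-separated.
        wb = Workbook()
        ws = wb.active
        ws.append(["index", "query", "expected_output", "context"])
        ws.append([1, "What is Dify?", "An open-source LLM app platform.", "doc_a;doc_b"])
        buf = io.BytesIO()
        wb.save(buf)

        # 3. Start a run; this creates an EvaluationRun row and dispatches the Celery task.
        run = EvaluationService.start_evaluation_run(
            session=session,
            tenant_id="TENANT_ID",
            target_type="app",
            target_id="APP_ID",
            account_id="ACCOUNT_ID",
            dataset_file_content=buf.getvalue(),
            evaluation_category=EvaluationCategory.LLM,
        )
        print(run.id, run.status, run.total_items)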
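
For reference, _compute_metrics_summary stores one entry per metric plus an "_overall" entry in EvaluationRun.metrics_summary. A hedged illustration for two items scored on a single metric (the metric name "faithfulness" is a placeholder, not something the patch defines):

    {
        "faithfulness": {"average": 0.85, "min": 0.8, "max": 0.9, "count": 2},
        "_overall": {
            "average": 0.85,
            "total_items": 2,
            "successful_items": 2,
            "failed_items": 0,
        },
    }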