diff --git a/api/core/evaluation/entities/config_entity.py b/api/core/evaluation/entities/config_entity.py
index b1a48b894d..dc36f435b3 100644
--- a/api/core/evaluation/entities/config_entity.py
+++ b/api/core/evaluation/entities/config_entity.py
@@ -6,6 +6,7 @@ from pydantic import BaseModel
 
 
 class EvaluationFrameworkEnum(StrEnum):
     RAGAS = "ragas"
     DEEPEVAL = "deepeval"
+    CUSTOMIZED = "customized"
     NONE = "none"
@@ -17,3 +18,8 @@ class BaseEvaluationConfig(BaseModel):
 class RagasConfig(BaseEvaluationConfig):
     """RAGAS-specific configuration."""
     pass
+
+
+class CustomizedEvaluatorConfig(BaseEvaluationConfig):
+    """Configuration for the customized workflow-based evaluator."""
+    pass
diff --git a/api/core/evaluation/entities/evaluation_entity.py b/api/core/evaluation/entities/evaluation_entity.py
index 61a067a3a3..026fde642e 100644
--- a/api/core/evaluation/entities/evaluation_entity.py
+++ b/api/core/evaluation/entities/evaluation_entity.py
@@ -3,6 +3,8 @@ from typing import Any
 
 from pydantic import BaseModel, Field
 
+from core.evaluation.entities.judgment_entity import JudgmentConfig, JudgmentResult
+
 
 class EvaluationCategory(StrEnum):
     LLM = "llm"
@@ -28,6 +30,7 @@ class EvaluationItemResult(BaseModel):
     index: int
     actual_output: str | None = None
     metrics: list[EvaluationMetric] = Field(default_factory=list)
+    judgment: JudgmentResult | None = None
     metadata: dict[str, Any] = Field(default_factory=dict)
     error: str | None = None
 
@@ -49,4 +52,5 @@ class EvaluationRunData(BaseModel):
     evaluation_model_provider: str
     evaluation_model: str
     metrics_config: dict[str, Any] = Field(default_factory=dict)
+    judgment_config: JudgmentConfig | None = None
     items: list[EvaluationItemInput]
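A minimal sketch of how the new optional fields behave, using only defaults plus the entities introduced in this diff (judgment_entity.py follows below); values are illustrative:

```python
from core.evaluation.entities.evaluation_entity import EvaluationItemResult, EvaluationMetric
from core.evaluation.entities.judgment_entity import JudgmentResult

result = EvaluationItemResult(
    index=0,
    metrics=[EvaluationMetric(name="faithfulness", score=0.85)],  # illustrative value
)
assert result.judgment is None  # stays None until the runner's judgment phase runs

result = result.model_copy(update={"judgment": JudgmentResult(passed=True)})
assert result.judgment.passed
```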
+ """ + + logical_operator: Literal["and", "or"] = "and" + conditions: list[JudgmentCondition] = Field(default_factory=list) + + +class JudgmentConditionResult(BaseModel): + """Result of evaluating a single judgment condition. + + Attributes: + metric_name: Which metric was checked. + comparison_operator: The operator that was applied. + expected_value: The threshold/expected value from the condition config. + actual_value: The actual metric value that was evaluated. + passed: Whether this individual condition passed. + error: Error message if the condition evaluation failed. + """ + + metric_name: str + comparison_operator: str + expected_value: Any = None + actual_value: Any = None + passed: bool = False + error: str | None = None + + +class JudgmentResult(BaseModel): + """Overall result of evaluating all judgment conditions for one item. + + Attributes: + passed: Whether the overall judgment passed (based on logical_operator). + logical_operator: The logical operator used to combine conditions. + condition_results: Detailed result for each individual condition. + """ + + passed: bool = False + logical_operator: Literal["and", "or"] = "and" + condition_results: list[JudgmentConditionResult] = Field(default_factory=list) diff --git a/api/core/evaluation/evaluation_manager.py b/api/core/evaluation/evaluation_manager.py index 5499b96cad..06a3bed221 100644 --- a/api/core/evaluation/evaluation_manager.py +++ b/api/core/evaluation/evaluation_manager.py @@ -25,6 +25,14 @@ class EvaluationFrameworkConfigMap(collections.UserDict[str, dict[str, Any]]): } case EvaluationFrameworkEnum.DEEPEVAL: raise NotImplementedError("DeepEval adapter is not yet implemented.") + case EvaluationFrameworkEnum.CUSTOMIZED: + from core.evaluation.entities.config_entity import CustomizedEvaluatorConfig + from core.evaluation.frameworks.customized.customized_evaluator import CustomizedEvaluator + + return { + "config_class": CustomizedEvaluatorConfig, + "evaluator_class": CustomizedEvaluator, + } case _: raise ValueError(f"Unknown evaluation framework: {framework}") diff --git a/api/core/evaluation/frameworks/customized/__init__.py b/api/core/evaluation/frameworks/customized/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/api/core/evaluation/frameworks/customized/customized_evaluator.py b/api/core/evaluation/frameworks/customized/customized_evaluator.py new file mode 100644 index 0000000000..501af17833 --- /dev/null +++ b/api/core/evaluation/frameworks/customized/customized_evaluator.py @@ -0,0 +1,267 @@ +"""Customized workflow-based evaluator. + +Uses a published workflow as the evaluation strategy. The target's actual output, +expected output, original inputs, and context are passed as workflow inputs. +The workflow's output variables are treated as evaluation metrics. + +The evaluation workflow_id is provided per evaluation run via +metrics_config["workflow_id"]. 
diff --git a/api/core/evaluation/evaluation_manager.py b/api/core/evaluation/evaluation_manager.py
index 5499b96cad..06a3bed221 100644
--- a/api/core/evaluation/evaluation_manager.py
+++ b/api/core/evaluation/evaluation_manager.py
@@ -25,6 +25,14 @@ class EvaluationFrameworkConfigMap(collections.UserDict[str, dict[str, Any]]):
                 }
             case EvaluationFrameworkEnum.DEEPEVAL:
                 raise NotImplementedError("DeepEval adapter is not yet implemented.")
+            case EvaluationFrameworkEnum.CUSTOMIZED:
+                from core.evaluation.entities.config_entity import CustomizedEvaluatorConfig
+                from core.evaluation.frameworks.customized.customized_evaluator import CustomizedEvaluator
+
+                return {
+                    "config_class": CustomizedEvaluatorConfig,
+                    "evaluator_class": CustomizedEvaluator,
+                }
             case _:
                 raise ValueError(f"Unknown evaluation framework: {framework}")
 
diff --git a/api/core/evaluation/frameworks/customized/__init__.py b/api/core/evaluation/frameworks/customized/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
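A hedged sketch of what the new registry entry gives a caller. It assumes CustomizedEvaluatorConfig can be constructed with no arguments (i.e. BaseEvaluationConfig declares no required fields); adjust if it does:

```python
from core.evaluation.entities.config_entity import CustomizedEvaluatorConfig
from core.evaluation.entities.evaluation_entity import EvaluationCategory
from core.evaluation.frameworks.customized.customized_evaluator import CustomizedEvaluator

evaluator = CustomizedEvaluator(CustomizedEvaluatorConfig())  # assumes no required config fields
# Metrics are defined by the chosen workflow's outputs, so none are advertised up front:
assert evaluator.get_supported_metrics(EvaluationCategory.LLM) == []
```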
+ """ + workflow_id = metrics_config.get("workflow_id") + if not workflow_id: + raise ValueError( + "metrics_config must contain 'workflow_id' for customized evaluator" + ) + + app, workflow, service_account = self._load_workflow_resources(workflow_id, tenant_id) + + results: list[EvaluationItemResult] = [] + for item in items: + try: + result = self._evaluate_single_item(app, workflow, service_account, item) + results.append(result) + except Exception: + logger.exception( + "Customized evaluator failed for item %d with workflow %s", + item.index, + workflow_id, + ) + results.append(EvaluationItemResult(index=item.index)) + return results + + def _evaluate_single_item( + self, + app: Any, + workflow: Any, + service_account: Any, + item: EvaluationItemInput, + ) -> EvaluationItemResult: + """Run the evaluation workflow for a single item. + + Builds workflow inputs from the item data and executes the workflow + in non-streaming mode. Extracts metrics from the workflow's output + variables. + """ + from core.app.apps.workflow.app_generator import WorkflowAppGenerator + from core.app.entities.app_invoke_entities import InvokeFrom + + workflow_inputs = self._build_workflow_inputs(item) + + generator = WorkflowAppGenerator() + response: Mapping[str, Any] = generator.generate( + app_model=app, + workflow=workflow, + user=service_account, + args={"inputs": workflow_inputs}, + invoke_from=InvokeFrom.SERVICE_API, + streaming=False, + ) + + metrics = self._extract_metrics(response) + return EvaluationItemResult( + index=item.index, + metrics=metrics, + metadata={"workflow_response": self._safe_serialize(response)}, + ) + + def _load_workflow_resources( + self, workflow_id: str, tenant_id: str + ) -> tuple[Any, Any, Any]: + """Load the evaluation workflow App, its published workflow, and a service account. + + Args: + workflow_id: The App ID of the evaluation workflow. + tenant_id: Tenant scope. + + Returns: + Tuple of (app, workflow, service_account). + + Raises: + ValueError: If the app or published workflow cannot be found. + """ + from sqlalchemy.orm import Session + + from core.evaluation.runners import get_service_account_for_app + from models.engine import db + from models.model import App + from services.workflow_service import WorkflowService + + with Session(db.engine, expire_on_commit=False) as session, session.begin(): + app = session.query(App).filter_by(id=workflow_id, tenant_id=tenant_id).first() + if not app: + raise ValueError( + f"Evaluation workflow app {workflow_id} not found in tenant {tenant_id}" + ) + + service_account = get_service_account_for_app(session, workflow_id) + + workflow_service = WorkflowService() + published_workflow = workflow_service.get_published_workflow(app_model=app) + if not published_workflow: + raise ValueError( + f"No published workflow found for evaluation app {workflow_id}" + ) + + return app, published_workflow, service_account + + @staticmethod + def _build_workflow_inputs(item: EvaluationItemInput) -> dict[str, Any]: + """Build workflow input dict from an evaluation item. 
diff --git a/api/core/evaluation/judgment/__init__.py b/api/core/evaluation/judgment/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
diff --git a/api/core/evaluation/judgment/processor.py b/api/core/evaluation/judgment/processor.py
new file mode 100644
index 0000000000..0345110ca1
--- /dev/null
+++ b/api/core/evaluation/judgment/processor.py
@@ -0,0 +1,144 @@
+"""Judgment condition processor for evaluation metrics.
+
+Evaluates pass/fail judgment conditions against evaluation metric values.
+Reuses the core comparison engine from the workflow condition system
+(core.workflow.utils.condition.processor._evaluate_condition) to ensure
+consistent operator semantics across the platform.
+"""
+
+import logging
+from typing import Any
+
+from core.evaluation.entities.judgment_entity import (
+    JudgmentCondition,
+    JudgmentConditionResult,
+    JudgmentConfig,
+    JudgmentResult,
+)
+from core.workflow.utils.condition.processor import _evaluate_condition
+
+logger = logging.getLogger(__name__)
+
+
+class JudgmentProcessor:
+    """Evaluates judgment conditions against computed metric values."""
+
+    @staticmethod
+    def evaluate(
+        metric_values: dict[str, Any],
+        config: JudgmentConfig,
+    ) -> JudgmentResult:
+        """Evaluate all judgment conditions against the given metric values.
+
+        Args:
+            metric_values: Mapping of metric name to its value
+                (e.g. {"faithfulness": 0.85, "status": "success"}).
+            config: The judgment configuration with logical_operator and conditions.
+
+        Returns:
+            JudgmentResult with overall pass/fail and per-condition details.
+        """
+        if not config.conditions:
+            return JudgmentResult(
+                passed=True,
+                logical_operator=config.logical_operator,
+                condition_results=[],
+            )
+
+        condition_results: list[JudgmentConditionResult] = []
+
+        for condition in config.conditions:
+            result = JudgmentProcessor._evaluate_single_condition(
+                metric_values, condition
+            )
+            condition_results.append(result)
+
+            if config.logical_operator == "and" and not result.passed:
+                return JudgmentResult(
+                    passed=False,
+                    logical_operator=config.logical_operator,
+                    condition_results=condition_results,
+                )
+            if config.logical_operator == "or" and result.passed:
+                return JudgmentResult(
+                    passed=True,
+                    logical_operator=config.logical_operator,
+                    condition_results=condition_results,
+                )
+
+        if config.logical_operator == "and":
+            final_passed = all(r.passed for r in condition_results)
+        else:
+            final_passed = any(r.passed for r in condition_results)
+
+        return JudgmentResult(
+            passed=final_passed,
+            logical_operator=config.logical_operator,
+            condition_results=condition_results,
+        )
+
+    @staticmethod
+    def _evaluate_single_condition(
+        metric_values: dict[str, Any],
+        condition: JudgmentCondition,
+    ) -> JudgmentConditionResult:
+        """Evaluate a single judgment condition against the metric values.
+
+        Looks up the metric by name, then delegates to the workflow condition
+        engine for the actual comparison.
+
+        Args:
+            metric_values: Mapping of metric name to its value.
+            condition: The condition to evaluate.
+
+        Returns:
+            JudgmentConditionResult with pass/fail and details.
+        """
+        metric_name = condition.metric_name
+        actual_value = metric_values.get(metric_name)
+
+        # A missing metric fails the condition unless the operator checks for absence
+        if actual_value is None and condition.comparison_operator not in (
+            "null",
+            "not null",
+            "empty",
+            "not empty",
+            "exists",
+            "not exists",
+        ):
+            return JudgmentConditionResult(
+                metric_name=metric_name,
+                comparison_operator=condition.comparison_operator,
+                expected_value=condition.value,
+                actual_value=None,
+                passed=False,
+                error=f"Metric '{metric_name}' not found in evaluation results",
+            )
+
+        try:
+            passed = _evaluate_condition(
+                operator=condition.comparison_operator,
+                value=actual_value,
+                expected=condition.value,
+            )
+            return JudgmentConditionResult(
+                metric_name=metric_name,
+                comparison_operator=condition.comparison_operator,
+                expected_value=condition.value,
+                actual_value=actual_value,
+                passed=passed,
+            )
+        except Exception as e:
+            logger.warning(
+                "Judgment condition evaluation failed for metric '%s': %s",
+                metric_name,
+                str(e),
+            )
+            return JudgmentConditionResult(
+                metric_name=metric_name,
+                comparison_operator=condition.comparison_operator,
+                expected_value=condition.value,
+                actual_value=actual_value,
+                passed=False,
+                error=str(e),
+            )
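A usage sketch. Operator semantics come from the shared workflow condition engine, so numeric thresholds are passed as strings (as documented in judgment_entity.py); the comparison below assumes that engine coerces the threshold to a number:

```python
from core.evaluation.entities.judgment_entity import JudgmentCondition, JudgmentConfig
from core.evaluation.judgment.processor import JudgmentProcessor

config = JudgmentConfig(
    logical_operator="and",
    conditions=[
        JudgmentCondition(metric_name="faithfulness", comparison_operator=">", value="0.8"),
        JudgmentCondition(metric_name="answer_relevancy", comparison_operator="≥", value="0.7"),
    ],
)

result = JudgmentProcessor.evaluate({"faithfulness": 0.85, "answer_relevancy": 0.65}, config)
assert not result.passed  # second condition fails, and the operator is "and"
for cr in result.condition_results:
    print(f"{cr.metric_name}: passed={cr.passed} (actual={cr.actual_value})")
```

Note the short-circuit: under "and" the loop returns on the first failing condition, so condition_results may not cover every configured condition.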
+ """ + if not config.conditions: + return JudgmentResult( + passed=True, + logical_operator=config.logical_operator, + condition_results=[], + ) + + condition_results: list[JudgmentConditionResult] = [] + + for condition in config.conditions: + result = JudgmentProcessor._evaluate_single_condition( + metric_values, condition + ) + condition_results.append(result) + + if config.logical_operator == "and" and not result.passed: + return JudgmentResult( + passed=False, + logical_operator=config.logical_operator, + condition_results=condition_results, + ) + if config.logical_operator == "or" and result.passed: + return JudgmentResult( + passed=True, + logical_operator=config.logical_operator, + condition_results=condition_results, + ) + + if config.logical_operator == "and": + final_passed = all(r.passed for r in condition_results) + else: + final_passed = any(r.passed for r in condition_results) + + return JudgmentResult( + passed=final_passed, + logical_operator=config.logical_operator, + condition_results=condition_results, + ) + + @staticmethod + def _evaluate_single_condition( + metric_values: dict[str, Any], + condition: JudgmentCondition, + ) -> JudgmentConditionResult: + """Evaluate a single judgment condition against the metric values. + + Looks up the metric by name, then delegates to the workflow condition + engine for the actual comparison. + + Args: + metric_values: Mapping of metric name to its value. + condition: The condition to evaluate. + + Returns: + JudgmentConditionResult with pass/fail and details. + """ + metric_name = condition.metric_name + actual_value = metric_values.get(metric_name) + + # Handle metric not found + if actual_value is None and condition.comparison_operator not in ( + "null", + "not null", + "empty", + "not empty", + "exists", + "not exists", + ): + return JudgmentConditionResult( + metric_name=metric_name, + comparison_operator=condition.comparison_operator, + expected_value=condition.value, + actual_value=None, + passed=False, + error=f"Metric '{metric_name}' not found in evaluation results", + ) + + try: + passed = _evaluate_condition( + operator=condition.comparison_operator, + value=actual_value, + expected=condition.value, + ) + return JudgmentConditionResult( + metric_name=metric_name, + comparison_operator=condition.comparison_operator, + expected_value=condition.value, + actual_value=actual_value, + passed=passed, + ) + except Exception as e: + logger.warning( + "Judgment condition evaluation failed for metric '%s': %s", + metric_name, + str(e), + ) + return JudgmentConditionResult( + metric_name=metric_name, + comparison_operator=condition.comparison_operator, + expected_value=condition.value, + actual_value=actual_value, + passed=False, + error=str(e), + ) diff --git a/api/core/evaluation/runners/base_evaluation_runner.py b/api/core/evaluation/runners/base_evaluation_runner.py index c82935a280..d5d93a9808 100644 --- a/api/core/evaluation/runners/base_evaluation_runner.py +++ b/api/core/evaluation/runners/base_evaluation_runner.py @@ -9,6 +9,8 @@ from core.evaluation.entities.evaluation_entity import ( EvaluationItemInput, EvaluationItemResult, ) +from core.evaluation.entities.judgment_entity import JudgmentConfig +from core.evaluation.judgment.processor import JudgmentProcessor from libs.datetime_utils import naive_utc_now from models.evaluation import EvaluationRun, EvaluationRunItem, EvaluationRunStatus @@ -20,7 +22,13 @@ class BaseEvaluationRunner(ABC): Runners are responsible for executing the target (App/Snippet/Retrieval) to collect 
diff --git a/api/models/evaluation.py b/api/models/evaluation.py
index 1a428b590c..898cecb6a3 100644
--- a/api/models/evaluation.py
+++ b/api/models/evaluation.py
@@ -162,6 +162,7 @@ class EvaluationRunItem(Base):
 
     actual_output: Mapped[str | None] = mapped_column(LongText, nullable=True)
     metrics: Mapped[str | None] = mapped_column(LongText, nullable=True)
+    judgment: Mapped[str | None] = mapped_column(LongText, nullable=True)
     metadata_json: Mapped[str | None] = mapped_column(LongText, nullable=True)
     error: Mapped[str | None] = mapped_column(Text, nullable=True)
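The new column stores the runner's json.dumps(result.judgment.model_dump()) output, so reading it back is straightforward; load_judgment below is a hypothetical helper, not part of this diff:

```python
import json

from core.evaluation.entities.judgment_entity import JudgmentResult


def load_judgment(raw: str | None) -> JudgmentResult | None:
    """Rehydrate EvaluationRunItem.judgment as written by BaseEvaluationRunner."""
    if not raw:
        return None
    return JudgmentResult.model_validate(json.loads(raw))
```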