Merge remote-tracking branch 'origin/feat/evaluation' into feat/evaluation

FFXN 2026-03-16 18:14:33 +08:00
commit c20be9c815
25 changed files with 459 additions and 440 deletions

View File

@ -1383,7 +1383,7 @@ class EvaluationConfig(BaseSettings):
EVALUATION_MAX_DATASET_ROWS: PositiveInt = Field(
description="Maximum number of rows allowed in an evaluation dataset",
default=1000,
default=500,
)
EVALUATION_TASK_TIMEOUT: PositiveInt = Field(
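Since EvaluationConfig extends pydantic's BaseSettings, the lowered 500-row default can still be raised per deployment through the environment; a minimal sketch of that override (the import path assumes pydantic v2's pydantic-settings package):

from pydantic import Field, PositiveInt
from pydantic_settings import BaseSettings

class EvaluationConfigSketch(BaseSettings):
    EVALUATION_MAX_DATASET_ROWS: PositiveInt = Field(
        description="Maximum number of rows allowed in an evaluation dataset",
        default=500,
    )

# BaseSettings reads matching environment variables, so e.g.
#   export EVALUATION_MAX_DATASET_ROWS=1000
# restores the previous cap without a code change.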

View File

@ -106,6 +106,9 @@ from .datasets.rag_pipeline import (
rag_pipeline_workflow,
)
# Import evaluation controllers
from .evaluation import evaluation
# Import explore controllers
from .explore import (
banner,
@ -116,9 +119,6 @@ from .explore import (
trial,
)
# Import evaluation controllers
from .evaluation import evaluation
# Import snippet controllers
from .snippets import snippet_workflow

View File

@ -153,9 +153,7 @@ def get_evaluation_target(view_func: Callable[P, R]):
target: Union[App, CustomizedSnippet] | None = None
if target_type == "app":
target = (
db.session.query(App).where(App.id == target_id, App.tenant_id == current_tenant_id).first()
)
target = db.session.query(App).where(App.id == target_id, App.tenant_id == current_tenant_id).first()
elif target_type == "snippets":
target = (
db.session.query(CustomizedSnippet)
@ -229,9 +227,7 @@ class EvaluationDetailApi(Resource):
_, current_tenant_id = current_account_with_tenant()
with Session(db.engine, expire_on_commit=False) as session:
config = EvaluationService.get_evaluation_config(
session, current_tenant_id, target_type, str(target.id)
)
config = EvaluationService.get_evaluation_config(session, current_tenant_id, target_type, str(target.id))
if config is None:
return {
@ -360,9 +356,7 @@ class EvaluationRunApi(Resource):
# Load dataset file
upload_file = (
db.session.query(UploadFile)
.filter_by(id=run_request.file_id, tenant_id=current_tenant_id)
.first()
db.session.query(UploadFile).filter_by(id=run_request.file_id, tenant_id=current_tenant_id).first()
)
if not upload_file:
raise NotFound("Dataset file not found.")
@ -397,9 +391,7 @@ class EvaluationRunApi(Resource):
return {"message": str(e.description)}, 400
@console_ns.route(
"/<string:evaluate_target_type>/<uuid:evaluate_target_id>/evaluation/runs/<uuid:run_id>"
)
@console_ns.route("/<string:evaluate_target_type>/<uuid:evaluate_target_id>/evaluation/runs/<uuid:run_id>")
class EvaluationRunDetailApi(Resource):
@console_ns.doc("get_evaluation_run_detail")
@console_ns.response(200, "Evaluation run detail retrieved")
@ -444,9 +436,7 @@ class EvaluationRunDetailApi(Resource):
return {"message": str(e.description)}, 404
@console_ns.route(
"/<string:evaluate_target_type>/<uuid:evaluate_target_id>/evaluation/runs/<uuid:run_id>/cancel"
)
@console_ns.route("/<string:evaluate_target_type>/<uuid:evaluate_target_id>/evaluation/runs/<uuid:run_id>/cancel")
class EvaluationRunCancelApi(Resource):
@console_ns.doc("cancel_evaluation_run")
@console_ns.response(200, "Evaluation run cancelled")
@ -599,6 +589,7 @@ def _serialize_evaluation_run_item(item: EvaluationRunItem) -> dict[str, object]
"expected_output": item.expected_output,
"actual_output": item.actual_output,
"metrics": item.metrics_list,
"judgment": item.judgment_dict,
"metadata": item.metadata_dict,
"error": item.error,
"overall_score": item.overall_score,

View File

@ -238,9 +238,7 @@ class CustomizedSnippetExportApi(Resource):
with Session(db.engine) as session:
export_service = SnippetDslService(session)
result = export_service.export_snippet_dsl(
snippet=snippet, include_secret=query.include_secret == "true"
)
result = export_service.export_snippet_dsl(snippet=snippet, include_secret=query.include_secret == "true")
# Set filename with .snippet extension
filename = f"{snippet.name}.snippet"

View File

@ -16,7 +16,7 @@ logger = logging.getLogger(__name__)
class BaseEvaluationInstance(ABC):
"""Abstract base class for evaluation framework adapters. """
"""Abstract base class for evaluation framework adapters."""
@abstractmethod
def evaluate_llm(
@ -95,33 +95,26 @@ class BaseEvaluationInstance(ABC):
workflow_id = customized_metrics.evaluation_workflow_id
if not workflow_id:
raise ValueError(
"customized_metrics must contain 'evaluation_workflow_id' for customized evaluator"
)
raise ValueError("customized_metrics must contain 'evaluation_workflow_id' for customized evaluator")
# Load the evaluator workflow resources using a dedicated session
with Session(db.engine, expire_on_commit=False) as session, session.begin():
app = session.query(App).filter_by(
id=workflow_id, tenant_id=tenant_id
).first()
app = session.query(App).filter_by(id=workflow_id, tenant_id=tenant_id).first()
if not app:
raise ValueError(
f"Evaluation workflow app {workflow_id} not found in tenant {tenant_id}"
)
raise ValueError(f"Evaluation workflow app {workflow_id} not found in tenant {tenant_id}")
service_account = get_service_account_for_app(session, workflow_id)
workflow_service = WorkflowService()
published_workflow = workflow_service.get_published_workflow(app_model=app)
if not published_workflow:
raise ValueError(
f"No published workflow found for evaluation app {workflow_id}"
)
raise ValueError(f"No published workflow found for evaluation app {workflow_id}")
eval_results: list[EvaluationItemResult] = []
for idx, node_run_result_mapping in enumerate(node_run_result_mapping_list):
try:
workflow_inputs = self._build_workflow_inputs(
customized_metrics.input_fields, node_run_result_mapping,
customized_metrics.input_fields,
node_run_result_mapping,
)
generator = WorkflowAppGenerator()
@ -183,14 +176,13 @@ class BaseEvaluationInstance(ABC):
full_match = VARIABLE_REGEX.fullmatch(value_source)
if full_match:
workflow_inputs[field_name] = resolve_variable_selector(
full_match.group(1), node_run_result_mapping,
full_match.group(1),
node_run_result_mapping,
)
elif VARIABLE_REGEX.search(value_source):
# Mixed template: interpolate all expressions as strings.
workflow_inputs[field_name] = VARIABLE_REGEX.sub(
lambda m: str(
resolve_variable_selector(m.group(1), node_run_result_mapping)
),
lambda m: str(resolve_variable_selector(m.group(1), node_run_result_mapping)),
value_source,
)
else:
@ -213,9 +205,7 @@ class BaseEvaluationInstance(ABC):
outputs = data.get("outputs")
if not isinstance(outputs, dict):
logger.warning(
"Unexpected workflow response format: 'outputs' is not a dict"
)
logger.warning("Unexpected workflow response format: 'outputs' is not a dict")
return metrics
for key, raw_value in outputs.items():
@ -239,7 +229,8 @@ def resolve_variable_selector(
if len(parts) < 2:
logger.warning(
"Selector '%s' must have at least node_id.output_key", selector_raw,
"Selector '%s' must have at least node_id.output_key",
selector_raw,
)
return ""
@ -250,7 +241,8 @@ def resolve_variable_selector(
if not node_result or not node_result.outputs:
logger.warning(
"Selector '%s': node '%s' not found or has no outputs",
selector_raw, node_id,
selector_raw,
node_id,
)
return ""
@ -262,14 +254,17 @@ def resolve_variable_selector(
if next_val is None:
logger.warning(
"Selector '%s': key '%s' not found in node '%s' outputs",
selector_raw, key, node_id,
selector_raw,
key,
node_id,
)
return ""
current = next_val
else:
logger.warning(
"Selector '%s': cannot traverse into non-dict value at key '%s'",
selector_raw, key,
selector_raw,
key,
)
return ""

View File

@ -11,14 +11,17 @@ class EvaluationFrameworkEnum(StrEnum):
class BaseEvaluationConfig(BaseModel):
"""Base configuration for evaluation frameworks."""
pass
class RagasConfig(BaseEvaluationConfig):
"""RAGAS-specific configuration."""
pass
class DeepEvalConfig(BaseEvaluationConfig):
"""DeepEval-specific configuration."""
pass

View File

@ -40,16 +40,9 @@ class EvaluationItemResult(BaseModel):
actual_output: str | None = None
metrics: list[EvaluationMetric] = Field(default_factory=list)
metadata: dict[str, Any] = Field(default_factory=dict)
judgment: JudgmentResult | None = None
judgment: JudgmentResult = Field(default_factory=JudgmentResult)
error: str | None = None
@property
def overall_score(self) -> float | None:
if not self.metrics:
return None
scores = [m.score for m in self.metrics]
return sum(scores) / len(scores)
class NodeInfo(BaseModel):
node_id: str
@ -75,6 +68,7 @@ class CustomizedMetrics(BaseModel):
class EvaluationConfigData(BaseModel):
"""Structured data for saving evaluation configuration."""
evaluation_model: str = ""
evaluation_model_provider: str = ""
default_metrics: list[DefaultMetric] = Field(default_factory=list)
@ -84,11 +78,13 @@ class EvaluationConfigData(BaseModel):
class EvaluationRunRequest(EvaluationConfigData):
"""Request body for starting an evaluation run."""
file_id: str
class EvaluationRunData(BaseModel):
"""Serializable data for Celery task."""
evaluation_run_id: str
tenant_id: str
target_type: str
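One behavioral consequence of the judgment field change above: with default_factory=JudgmentResult, judgment is never None, so downstream code can read .passed directly. A short sketch, assuming JudgmentResult's fields all have defaults (which default_factory requires):

from core.evaluation.entities.evaluation_entity import EvaluationItemResult

item = EvaluationItemResult(index=0, actual_output="result")
# judgment is a default JudgmentResult instance rather than None,
# so no None-check is needed before reading .passed:
assert item.judgment is not None
print(item.judgment.passed)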

View File

@ -1,11 +1,6 @@
"""Judgment condition entities for evaluation metric assessment.
Key concepts:
- **value_source**: Where the comparison target comes from.
- "constant": a literal value supplied by the user (e.g. threshold "0.8").
- "variable": a named field from the evaluation target's runtime data
(inputs, actual_output, expected_output). The ``value`` field holds the
variable key; the actual comparison value is resolved at evaluation time.
- **condition_type**: Determines operator semantics and type coercion.
- "string": string operators (contains, is, start with, ).
- "number": numeric operators (>, <, =, , , ).
@ -18,34 +13,19 @@ Typical usage:
JudgmentCondition(
metric_name="faithfulness",
comparison_operator=">",
value="0.8",
condition_value="0.8",
condition_type="number",
),
JudgmentCondition(
metric_name="output",
comparison_operator="contains",
value="expected_output",
value_source="variable",
condition_type="string",
),
)
],
)
"""
from collections.abc import Sequence
from enum import StrEnum
from typing import Any, Literal
from pydantic import BaseModel, Field
class JudgmentValueSource(StrEnum):
"""Where the comparison target value comes from."""
CONSTANT = "constant"
VARIABLE = "variable"
class JudgmentConditionType(StrEnum):
"""Category of the condition, controls operator semantics and type coercion."""
@ -90,22 +70,15 @@ class JudgmentCondition(BaseModel):
metric_name: The name of the evaluation metric to check (left side).
Must match an EvaluationMetric.name in the results.
comparison_operator: The comparison operator to apply.
value: The comparison target (right side).
- When value_source is "constant": the literal threshold/expected value.
- When value_source is "variable": the variable key name to look up
from the evaluation target's runtime data.
For unary operators (empty, null, etc.), this can be None.
value_source: Where the comparison value comes from.
"constant" (default) for user-supplied literals,
"variable" for references to evaluation target data.
condition_value: The comparison target (right side). For unary operators
such as ``empty`` or ``null`` this can be ``None``.
condition_type: Controls type coercion and which operators are valid.
"string" (default), "number", or "datetime".
"""
metric_name: str
comparison_operator: JudgmentComparisonOperator
value: str | Sequence[str] | None = None
value_source: JudgmentValueSource = JudgmentValueSource.CONSTANT
condition_value: Any | None = None
condition_type: JudgmentConditionType = JudgmentConditionType.STRING
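To make the rename concrete, the same threshold rule before and after this hunk (a sketch; only the field names differ):

from core.evaluation.entities.judgment_entity import JudgmentCondition

# Old shape (removed): value + value_source, with JudgmentValueSource
# deciding between a literal and a variable lookup.
# New shape: a single user-supplied literal in condition_value.
condition = JudgmentCondition(
    metric_name="faithfulness",
    comparison_operator=">",
    condition_value="0.8",
    condition_type="number",
)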

View File

@ -67,9 +67,7 @@ class DeepEvalEvaluator(BaseEvaluationInstance):
model_name: str,
tenant_id: str,
) -> list[EvaluationItemResult]:
return self._evaluate(
items, metric_name, model_provider, model_name, tenant_id, EvaluationCategory.RETRIEVAL
)
return self._evaluate(items, metric_name, model_provider, model_name, tenant_id, EvaluationCategory.RETRIEVAL)
def evaluate_agent(
self,
@ -79,9 +77,7 @@ class DeepEvalEvaluator(BaseEvaluationInstance):
model_name: str,
tenant_id: str,
) -> list[EvaluationItemResult]:
return self._evaluate(
items, metric_name, model_provider, model_name, tenant_id, EvaluationCategory.AGENT
)
return self._evaluate(items, metric_name, model_provider, model_name, tenant_id, EvaluationCategory.AGENT)
def evaluate_workflow(
self,
@ -91,9 +87,7 @@ class DeepEvalEvaluator(BaseEvaluationInstance):
model_name: str,
tenant_id: str,
) -> list[EvaluationItemResult]:
return self._evaluate(
items, metric_name, model_provider, model_name, tenant_id, EvaluationCategory.WORKFLOW
)
return self._evaluate(items, metric_name, model_provider, model_name, tenant_id, EvaluationCategory.WORKFLOW)
def _evaluate(
self,
@ -106,11 +100,7 @@ class DeepEvalEvaluator(BaseEvaluationInstance):
) -> list[EvaluationItemResult]:
"""Core evaluation logic using DeepEval."""
model_wrapper = DifyModelWrapper(model_provider, model_name, tenant_id)
requested_metrics = (
[metric_name]
if metric_name
else self.get_supported_metrics(category)
)
requested_metrics = [metric_name] if metric_name else self.get_supported_metrics(category)
try:
return self._evaluate_with_deepeval(items, requested_metrics, category)
@ -144,9 +134,7 @@ class DeepEvalEvaluator(BaseEvaluationInstance):
try:
metric.measure(test_case)
if metric.score is not None:
metrics.append(
EvaluationMetric(name=metric.__class__.__name__, value=float(metric.score))
)
metrics.append(EvaluationMetric(name=metric.__class__.__name__, value=float(metric.score)))
except Exception:
logger.exception(
"Failed to compute metric %s for item %d",
@ -229,9 +217,7 @@ class DeepEvalEvaluator(BaseEvaluationInstance):
parts.append(f"\nExpected Output: {item.expected_output}")
if item.context:
parts.append(f"\nContext: {'; '.join(item.context)}")
parts.append(
"\nRespond with ONLY a single floating point number between 0.0 and 1.0, nothing else."
)
parts.append("\nRespond with ONLY a single floating point number between 0.0 and 1.0, nothing else.")
return "\n".join(parts)
@staticmethod
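The hunk cuts off just before the response-parsing helper, so the following is only a plausible sketch of consuming the "single floating point number" reply requested above (the helper name and the clamping are assumptions, not the file's code):

def _parse_score_sketch(reply: str) -> float | None:
    # The prompt asks for a bare float between 0.0 and 1.0; anything else
    # means the model ignored the instruction.
    try:
        score = float(reply.strip())
    except ValueError:
        return None
    return min(max(score, 0.0), 1.0)  # clamp into [0.0, 1.0]

assert _parse_score_sketch(" 0.87 ") == 0.87
assert _parse_score_sketch("not a number") is None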

View File

@ -57,9 +57,7 @@ class RagasEvaluator(BaseEvaluationInstance):
model_name: str,
tenant_id: str,
) -> list[EvaluationItemResult]:
return self._evaluate(
items, metric_name, model_provider, model_name, tenant_id, EvaluationCategory.RETRIEVAL
)
return self._evaluate(items, metric_name, model_provider, model_name, tenant_id, EvaluationCategory.RETRIEVAL)
def evaluate_agent(
self,
@ -79,9 +77,7 @@ class RagasEvaluator(BaseEvaluationInstance):
model_name: str,
tenant_id: str,
) -> list[EvaluationItemResult]:
return self._evaluate(
items, metric_name, model_provider, model_name, tenant_id, EvaluationCategory.WORKFLOW
)
return self._evaluate(items, metric_name, model_provider, model_name, tenant_id, EvaluationCategory.WORKFLOW)
def _evaluate(
self,
@ -94,11 +90,7 @@ class RagasEvaluator(BaseEvaluationInstance):
) -> list[EvaluationItemResult]:
"""Core evaluation logic using RAGAS."""
model_wrapper = DifyModelWrapper(model_provider, model_name, tenant_id)
requested_metrics = (
[metric_name]
if metric_name
else self.get_supported_metrics(category)
)
requested_metrics = [metric_name] if metric_name else self.get_supported_metrics(category)
try:
return self._evaluate_with_ragas(items, requested_metrics, model_wrapper, category)
@ -237,9 +229,7 @@ class RagasEvaluator(BaseEvaluationInstance):
parts.append(f"\nExpected Output: {item.expected_output}")
if item.context:
parts.append(f"\nContext: {'; '.join(item.context)}")
parts.append(
"\nRespond with ONLY a single floating point number between 0.0 and 1.0, nothing else."
)
parts.append("\nRespond with ONLY a single floating point number between 0.0 and 1.0, nothing else.")
return "\n".join(parts)
@staticmethod

View File

@ -1,35 +1,20 @@
"""Judgment condition processor for evaluation metrics.
Evaluates pass/fail judgment conditions against evaluation metric values.
Reuses the core comparison engine from the workflow condition system
(core.workflow.utils.condition.processor._evaluate_condition) to ensure
consistent operator semantics across the platform.
Each condition uses:
- ``metric_name`` as the left-hand side lookup key from ``metric_values``
- ``comparison_operator`` as the operator
- ``condition_value`` as the right-hand side comparison value
The processor is intentionally decoupled from evaluation frameworks
(RAGAS / Customized) and runners. It operates on plain ``dict`` mappings
and can be invoked from any context.
Typical usage::
metrics = {"faithfulness": 0.85, "answer_relevancy": 0.6}
variables = {"expected_output": "Hello World", "created_at": "2025-01-01T00:00:00"}
config = JudgmentConfig(
logical_operator="and",
conditions=[
JudgmentCondition(metric_name="faithfulness", comparison_operator=">",
value="0.8", condition_type="number"),
JudgmentCondition(metric_name="output", comparison_operator="contains",
value="expected_output", value_source="variable",
condition_type="string"),
],
)
result = JudgmentProcessor.evaluate(metrics, config, variable_values=variables)
The processor is intentionally decoupled from evaluation frameworks and
runners. It operates on plain ``dict`` mappings and can be invoked anywhere
that already has per-item metric results.
"""
import logging
from collections.abc import Sequence
from datetime import datetime
from typing import Any
from typing import Any, cast
from core.evaluation.entities.judgment_entity import (
JudgmentCondition,
@ -37,9 +22,9 @@ from core.evaluation.entities.judgment_entity import (
JudgmentConditionType,
JudgmentConfig,
JudgmentResult,
JudgmentValueSource,
)
from core.workflow.utils.condition.processor import _evaluate_condition
from core.workflow.utils.condition.entities import SupportedComparisonOperator
from core.workflow.utils.condition.processor import _evaluate_condition # pyright: ignore[reportPrivateUsage]
logger = logging.getLogger(__name__)
@ -48,12 +33,10 @@ _UNARY_OPERATORS = frozenset({"null", "not null", "empty", "not empty"})
class JudgmentProcessor:
@staticmethod
def evaluate(
metric_values: dict[str, Any],
config: JudgmentConfig,
variable_values: dict[str, Any] | None = None,
) -> JudgmentResult:
"""Evaluate all judgment conditions against the given metric values.
@ -61,9 +44,6 @@ class JudgmentProcessor:
metric_values: Mapping of metric name → metric value
(e.g. ``{"faithfulness": 0.85, "status": "success"}``).
config: The judgment configuration with logical_operator and conditions.
variable_values: Optional mapping of variable name → value, used when
a condition's ``value_source`` is ``"variable"``. Typically built
from the evaluation target's inputs / outputs.
Returns:
JudgmentResult with overall pass/fail and per-condition details.
@ -78,9 +58,7 @@ class JudgmentProcessor:
condition_results: list[JudgmentConditionResult] = []
for condition in config.conditions:
result = JudgmentProcessor._evaluate_single_condition(
metric_values, condition, variable_values
)
result = JudgmentProcessor._evaluate_single_condition(metric_values, condition)
condition_results.append(result)
if config.logical_operator == "and" and not result.passed:
@ -112,14 +90,12 @@ class JudgmentProcessor:
def _evaluate_single_condition(
metric_values: dict[str, Any],
condition: JudgmentCondition,
variable_values: dict[str, Any] | None = None,
) -> JudgmentConditionResult:
"""Evaluate a single judgment condition.
Steps:
1. Look up the metric value (left side) by ``metric_name``.
2. Resolve the comparison value (right side) — either a constant
or a variable reference.
2. Read ``condition_value`` as the comparison value (right side).
3. Dispatch to the correct type handler (string / number / datetime).
"""
metric_name = condition.metric_name
@ -130,41 +106,28 @@ class JudgmentProcessor:
return JudgmentConditionResult(
metric_name=metric_name,
comparison_operator=condition.comparison_operator,
expected_value=condition.value,
expected_value=condition.condition_value,
actual_value=None,
passed=False,
error=f"Metric '{metric_name}' not found in evaluation results",
)
# Resolve the comparison value (right side)
try:
resolved_value = JudgmentProcessor._resolve_comparison_value(
condition, variable_values
)
except ValueError as e:
return JudgmentConditionResult(
metric_name=metric_name,
comparison_operator=condition.comparison_operator,
expected_value=condition.value,
actual_value=actual_value,
passed=False,
error=str(e),
)
resolved_value = condition.condition_value
# Dispatch to the appropriate type handler
try:
match condition.condition_type:
case JudgmentConditionType.DATETIME:
passed = _evaluate_datetime_condition(
actual_value, condition.comparison_operator, resolved_value
)
passed = _evaluate_datetime_condition(actual_value, condition.comparison_operator, resolved_value)
case JudgmentConditionType.NUMBER:
passed = _evaluate_number_condition(
actual_value, condition.comparison_operator, resolved_value
)
passed = _evaluate_number_condition(actual_value, condition.comparison_operator, resolved_value)
case _: # STRING (default) — delegate to workflow engine
if condition.comparison_operator in {"before", "after"}:
raise ValueError(
f"Operator '{condition.comparison_operator}' is not supported for string conditions"
)
passed = _evaluate_condition(
operator=condition.comparison_operator,
operator=cast(SupportedComparisonOperator, condition.comparison_operator),
value=actual_value,
expected=resolved_value,
)
@ -191,51 +154,6 @@ class JudgmentProcessor:
error=str(e),
)
@staticmethod
def _resolve_comparison_value(
condition: JudgmentCondition,
variable_values: dict[str, Any] | None,
) -> str | Sequence[str] | None:
"""Resolve the right-side comparison value.
For ``value_source == "constant"``, returns ``condition.value`` as-is.
For ``value_source == "variable"``, looks up ``condition.value`` (as a key)
in ``variable_values`` and returns the resolved value (converted to string
for compatibility with the comparison engine).
Raises:
ValueError: If the variable cannot be resolved.
"""
if condition.value_source == JudgmentValueSource.CONSTANT:
return condition.value
# Variable resolution
if condition.value is None:
raise ValueError("Variable name (value) must be provided when value_source is 'variable'")
if not variable_values:
raise ValueError(
f"Cannot resolve variable '{condition.value}': no variable values provided"
)
var_key = condition.value if isinstance(condition.value, str) else str(condition.value)
if var_key not in variable_values:
raise ValueError(
f"Variable '{var_key}' not found in evaluation target data. "
f"Available variables: {list(variable_values.keys())}"
)
resolved = variable_values[var_key]
# Convert to string for the comparison engine, unless it's already
# a str/Sequence[str]/None which the engine expects.
if resolved is None:
return None
if isinstance(resolved, str):
return resolved
if isinstance(resolved, Sequence) and all(isinstance(v, str) for v in resolved):
return resolved
return str(resolved)
_DATETIME_FORMATS = [
"%Y-%m-%dT%H:%M:%S",
@ -348,7 +266,11 @@ def _evaluate_number_condition(
"""
# Unary operators — delegate to workflow engine as-is
if operator in _UNARY_OPERATORS:
return _evaluate_condition(operator=operator, value=actual, expected=expected)
return _evaluate_condition(
operator=cast(SupportedComparisonOperator, operator),
value=actual,
expected=cast(str | Sequence[str] | bool | Sequence[bool] | None, expected),
)
if actual is None:
return False
@ -356,7 +278,7 @@ def _evaluate_number_condition(
# Coerce actual to numeric
if not isinstance(actual, (int, float)):
try:
actual = float(actual)
actual = float(cast(str | int | float, actual))
except (TypeError, ValueError) as e:
raise ValueError(f"Cannot convert actual value '{actual}' to number") from e
@ -365,4 +287,8 @@ def _evaluate_number_condition(
if expected is not None and not isinstance(expected, str):
expected = str(expected)
return _evaluate_condition(operator=operator, value=actual, expected=expected)
return _evaluate_condition(
operator=cast(SupportedComparisonOperator, operator),
value=actual,
expected=expected,
)
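_DATETIME_FORMATS is truncated above; a standalone sketch of the ordered-fallback parse such a list supports (the extra formats and the loop body are assumptions, not the file's actual contents):

from datetime import datetime

_DATETIME_FORMATS_SKETCH = [
    "%Y-%m-%dT%H:%M:%S",   # the one entry visible in the hunk
    "%Y-%m-%d %H:%M:%S",   # assumed additional formats
    "%Y-%m-%d",
]

def parse_datetime_sketch(value: str) -> datetime | None:
    # Try each format in order; the first successful parse wins.
    for fmt in _DATETIME_FORMATS_SKETCH:
        try:
            return datetime.strptime(value, fmt)
        except ValueError:
            continue
    return None

assert parse_datetime_sketch("2025-01-01T00:00:00") == datetime(2025, 1, 1)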

View File

@ -20,11 +20,7 @@ def get_service_account_for_app(session: Session, app_id: str) -> Account:
if not account:
raise ValueError(f"Creator account not found for app {app_id}")
current_tenant = (
session.query(TenantAccountJoin)
.filter_by(account_id=account.id, current=True)
.first()
)
current_tenant = session.query(TenantAccountJoin).filter_by(account_id=account.id, current=True).first()
if not current_tenant:
raise ValueError(f"Current tenant not found for account {account.id}")
@ -48,11 +44,7 @@ def get_service_account_for_snippet(session: Session, snippet_id: str) -> Accoun
if not account:
raise ValueError(f"Creator account not found for snippet {snippet_id}")
current_tenant = (
session.query(TenantAccountJoin)
.filter_by(account_id=account.id, current=True)
.first()
)
current_tenant = session.query(TenantAccountJoin).filter_by(account_id=account.id, current=True).first()
if not current_tenant:
raise ValueError(f"Current tenant not found for account {account.id}")

View File

@ -141,10 +141,12 @@ class AgentEvaluationRunner(BaseEvaluationRunner):
answer_parts.append(thought)
tool = chunk.get("tool")
if tool:
tool_calls.append({
"tool": tool,
"tool_input": chunk.get("tool_input", ""),
})
tool_calls.append(
{
"tool": tool,
"tool_input": chunk.get("tool_input", ""),
}
)
elif event == "message":
answer = chunk.get("answer", "")
if answer:

View File

@ -1,11 +1,14 @@
"""Base evaluation runner.
Orchestrates the evaluation lifecycle in four phases:
1. execute_target — run the target and collect actual outputs (abstract)
1. execute_target — run the target and collect actual outputs (abstract)
2. evaluate_metrics — compute metrics via framework or customized workflow
3. apply_judgment — evaluate pass/fail judgment conditions on metrics
4. persist — save results to the database
The persisted ``EvaluationRunItem.judgment`` payload must reflect the final
judgment result for each evaluated item, so judgment evaluation happens before
the persistence phase whenever a ``JudgmentConfig`` is supplied.
"""
import json
@ -30,7 +33,7 @@ logger = logging.getLogger(__name__)
class BaseEvaluationRunner(ABC):
"""Abstract base class for evaluation runners. """
"""Abstract base class for evaluation runners."""
def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session):
self.evaluation_instance = evaluation_instance
@ -62,6 +65,7 @@ class BaseEvaluationRunner(ABC):
model_provider: str = "",
model_name: str = "",
node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None = None,
judgment_config: JudgmentConfig | None = None,
) -> list[EvaluationItemResult]:
"""Orchestrate target execution + metric evaluation + judgment for all items."""
evaluation_run = self.session.query(EvaluationRun).filter_by(id=evaluation_run_id).first()
@ -115,9 +119,16 @@ class BaseEvaluationRunner(ABC):
results = list(results_by_index.values())
if judgment_config is not None:
results = self._apply_judgment(
results=results,
judgment_config=judgment_config,
node_run_result_mapping_list=node_run_result_mapping_list,
)
# Phase 4: Persist individual items
for result in results:
item_input = next((item for item in items if item.index == result.index), None)
item_input = next((item for item in evaluation_run.input_list if item.index == result.index), None)
run_item = EvaluationRunItem(
evaluation_run_id=evaluation_run_id,
item_index=result.index,
@ -129,7 +140,7 @@ class BaseEvaluationRunner(ABC):
judgment=json.dumps(result.judgment.model_dump()) if result.judgment else None,
metadata_json=json.dumps(result.metadata) if result.metadata else None,
error=result.error,
overall_score=result.overall_score,
overall_score=getattr(result, "overall_score", None),
)
self.session.add(run_item)
@ -145,54 +156,21 @@ class BaseEvaluationRunner(ABC):
) -> list[EvaluationItemResult]:
"""Apply judgment conditions to each result's metrics.
Left side (``metric_name``): looked up from evaluate-phase metrics only.
Right side: when ``value_source="variable"``, ``condition.value``
contains an expression (e.g. ``{{#node_id.output_key#}}``). The
expression is parsed and resolved against the corresponding
``node_run_result_mapping`` to obtain the actual comparison value.
Judgment is computed only from the per-item metric values and the
supplied ``JudgmentConfig``. ``metric_name`` selects the left-hand side
metric, and ``condition_value`` is used as the comparison target.
"""
from core.evaluation.base_evaluation_instance import resolve_variable_selector
from core.evaluation.entities.judgment_entity import JudgmentValueSource
from core.workflow.nodes.base.variable_template_parser import REGEX as VARIABLE_REGEX
judged_results: list[EvaluationItemResult] = []
for idx, result in enumerate(results):
for result in results:
if result.error is not None or not result.metrics:
judged_results.append(result)
continue
# Left side: only metrics
metric_values: dict[str, object] = {m.name: m.value for m in result.metrics}
judgment_result = JudgmentProcessor.evaluate(metric_values, judgment_config)
# Right side: pre-resolve variable expressions against node run results.
# Each condition.value expression (e.g. "{{#llm1.text#}}") is resolved
# and stored in variable_values keyed by the raw expression string, so
# that JudgmentProcessor._resolve_comparison_value can look it up.
variable_values: dict[str, object] = {}
node_run_result_mapping = (
node_run_result_mapping_list[idx]
if node_run_result_mapping_list and idx < len(node_run_result_mapping_list)
else {}
)
for condition in judgment_config.conditions:
if (
condition.value_source == JudgmentValueSource.VARIABLE
and isinstance(condition.value, str)
and node_run_result_mapping
):
match = VARIABLE_REGEX.fullmatch(condition.value)
if match:
resolved = resolve_variable_selector(
match.group(1), node_run_result_mapping
)
variable_values[condition.value] = resolved
judgment_result = JudgmentProcessor.evaluate(
metric_values, judgment_config, variable_values=variable_values
)
judged_results.append(
result.model_copy(update={"judgment": judgment_result})
)
judged_results.append(result.model_copy(update={"judgment": judgment_result}))
return judged_results
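Condensing the control flow this file now implements: judgment runs between metric evaluation and persistence, so each persisted row carries its final judgment. A schematic sketch (the method body is paraphrased, not copied):

def run_sketch(runner, judgment_config, node_run_result_mapping_list, **kwargs):
    # Phase 2: compute metrics per item (Phase 1, execute_target, is abstract).
    results = runner.evaluate_metrics(**kwargs)
    # Phase 3: apply judgment before anything is written.
    if judgment_config is not None:
        results = runner._apply_judgment(
            results=results,
            judgment_config=judgment_config,
            node_run_result_mapping_list=node_run_result_mapping_list,
        )
    # Phase 4: persist; EvaluationRunItem.judgment reflects the final result.
    return results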

View File

@ -63,7 +63,7 @@ class RetrievalEvaluationRunner(BaseEvaluationRunner):
@staticmethod
def _extract_query(inputs: dict[str, Any]) -> str:
for key in ("query"):
for key in "query":
if key in inputs:
return str(inputs[key])
return ""

View File

@ -75,11 +75,15 @@ class SnippetEvaluationRunner(BaseEvaluationRunner):
# Retrieve per-node execution records from DB
workflow_run_id = self._extract_workflow_run_id(response)
node_executions = self._query_node_executions(
tenant_id=tenant_id,
app_id=target_id,
workflow_run_id=workflow_run_id,
) if workflow_run_id else []
node_executions = (
self._query_node_executions(
tenant_id=tenant_id,
app_id=target_id,
workflow_run_id=workflow_run_id,
)
if workflow_run_id
else []
)
return EvaluationItemResult(
index=item.index,
@ -189,18 +193,18 @@ class SnippetEvaluationRunner(BaseEvaluationRunner):
Returns a list of serialisable dicts for storage in ``metadata``.
"""
stmt = WorkflowNodeExecutionModel.preload_offload_data(
select(WorkflowNodeExecutionModel)
).where(
WorkflowNodeExecutionModel.tenant_id == tenant_id,
WorkflowNodeExecutionModel.app_id == app_id,
WorkflowNodeExecutionModel.workflow_run_id == workflow_run_id,
).order_by(asc(WorkflowNodeExecutionModel.created_at))
node_models: Sequence[WorkflowNodeExecutionModel] = (
self.session.execute(stmt).scalars().all()
stmt = (
WorkflowNodeExecutionModel.preload_offload_data(select(WorkflowNodeExecutionModel))
.where(
WorkflowNodeExecutionModel.tenant_id == tenant_id,
WorkflowNodeExecutionModel.app_id == app_id,
WorkflowNodeExecutionModel.workflow_run_id == workflow_run_id,
)
.order_by(asc(WorkflowNodeExecutionModel.created_at))
)
node_models: Sequence[WorkflowNodeExecutionModel] = self.session.execute(stmt).scalars().all()
return [self._serialize_node_execution(node) for node in node_models]
@staticmethod
@ -211,6 +215,7 @@ class SnippetEvaluationRunner(BaseEvaluationRunner):
status, error, and elapsed_time. The virtual Start node injected by
SnippetGenerateService is filtered out by the caller if needed.
"""
def _safe_parse_json(value: str | None) -> Any:
if not value:
return None

View File

@ -13,7 +13,6 @@ from core.evaluation.entities.evaluation_entity import (
)
from core.evaluation.runners.base_evaluation_runner import BaseEvaluationRunner
from core.workflow.node_events import NodeRunResult
from models.model import App
logger = logging.getLogger(__name__)

View File

@ -26,13 +26,6 @@ from .dataset import (
TidbAuthBinding,
Whitelist,
)
from .evaluation import (
EvaluationConfiguration,
EvaluationRun,
EvaluationRunItem,
EvaluationRunStatus,
EvaluationTargetType,
)
from .enums import (
AppTriggerStatus,
AppTriggerType,
@ -41,6 +34,13 @@ from .enums import (
WorkflowRunTriggeredFrom,
WorkflowTriggerStatus,
)
from .evaluation import (
EvaluationConfiguration,
EvaluationRun,
EvaluationRunItem,
EvaluationRunStatus,
EvaluationTargetType,
)
from .execution_extra_content import ExecutionExtraContent, HumanInputContent
from .human_input import HumanInputForm
from .model import (
@ -165,12 +165,12 @@ __all__ = [
"Document",
"DocumentSegment",
"Embedding",
"EndUser",
"EvaluationConfiguration",
"EvaluationRun",
"EvaluationRunItem",
"EvaluationRunStatus",
"EvaluationTargetType",
"EndUser",
"ExecutionExtraContent",
"ExporleBanner",
"ExternalKnowledgeApis",

View File

@ -50,9 +50,7 @@ class EvaluationConfiguration(Base):
created_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
updated_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
created_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, server_default=func.current_timestamp()
)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
updated_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
)
@ -97,17 +95,13 @@ class EvaluationRun(Base):
target_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
evaluation_config_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
status: Mapped[str] = mapped_column(
String(20), nullable=False, default=EvaluationRunStatus.PENDING
)
status: Mapped[str] = mapped_column(String(20), nullable=False, default=EvaluationRunStatus.PENDING)
dataset_file_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
result_file_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
total_items: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
completed_items: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
failed_items: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
metrics_summary: Mapped[str | None] = mapped_column(LongText, nullable=True)
error: Mapped[str | None] = mapped_column(Text, nullable=True)
celery_task_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
@ -115,23 +109,11 @@ class EvaluationRun(Base):
created_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
started_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
completed_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, server_default=func.current_timestamp()
)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
updated_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
)
@property
def metrics_summary_dict(self) -> dict[str, Any]:
if self.metrics_summary:
return json.loads(self.metrics_summary)
return {}
@metrics_summary_dict.setter
def metrics_summary_dict(self, value: dict[str, Any]) -> None:
self.metrics_summary = json.dumps(value)
@property
def progress(self) -> float:
if self.total_items == 0:
@ -168,9 +150,7 @@ class EvaluationRunItem(Base):
overall_score: Mapped[float | None] = mapped_column(Float, nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, server_default=func.current_timestamp()
)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
@property
def inputs_dict(self) -> dict[str, Any]:
@ -184,6 +164,12 @@ class EvaluationRunItem(Base):
return json.loads(self.metrics)
return []
@property
def judgment_dict(self) -> dict[str, Any]:
if self.judgment:
return json.loads(self.judgment)
return {}
@property
def metadata_dict(self) -> dict[str, Any]:
if self.metadata_json:

View File

@ -15,7 +15,6 @@ from core.evaluation.entities.evaluation_entity import (
EvaluationCategory,
EvaluationConfigData,
EvaluationDatasetInput,
EvaluationItemInput,
EvaluationRunData,
EvaluationRunRequest,
)
@ -156,6 +155,8 @@ class EvaluationService:
"""
wb = Workbook()
ws = wb.active
if ws is None:
ws = wb.create_sheet("Evaluation Dataset")
sheet_name = "Evaluation Dataset"
ws.title = sheet_name
@ -174,7 +175,7 @@ class EvaluationService:
headers = ["index"]
for field in input_fields:
field_label = field.get("label") or field.get("variable")
field_label = str(field.get("label") or field.get("variable") or "")
headers.append(field_label)
# Write header row
@ -244,13 +245,13 @@ class EvaluationService:
config.evaluation_model_provider = data.evaluation_model_provider
config.evaluation_model = data.evaluation_model
config.metrics_config = json.dumps({
"default_metrics": [m.model_dump() for m in data.default_metrics],
"customized_metrics": data.customized_metrics.model_dump() if data.customized_metrics else None,
})
config.judgement_conditions = json.dumps(
data.judgment_config.model_dump() if data.judgment_config else {}
config.metrics_config = json.dumps(
{
"default_metrics": [m.model_dump() for m in data.default_metrics],
"customized_metrics": data.customized_metrics.model_dump() if data.customized_metrics else None,
}
)
config.judgement_conditions = json.dumps(data.judgment_config.model_dump() if data.judgment_config else {})
config.updated_by = account_id
session.commit()
session.refresh(config)
@ -279,9 +280,6 @@ class EvaluationService:
if evaluation_instance is None:
raise EvaluationFrameworkNotConfiguredError()
# Derive evaluation_category from default_metrics node types
evaluation_category = cls._resolve_evaluation_category(run_request.default_metrics)
# Save as latest EvaluationConfiguration
config = cls.save_evaluation_config(
session=session,
@ -301,9 +299,7 @@ class EvaluationService:
)
max_concurrent = dify_config.EVALUATION_MAX_CONCURRENT_RUNS
if active_runs >= max_concurrent:
raise EvaluationMaxConcurrentRunsError(
f"Maximum concurrent runs ({max_concurrent}) reached."
)
raise EvaluationMaxConcurrentRunsError(f"Maximum concurrent runs ({max_concurrent}) reached.")
# Parse dataset
items = cls._parse_dataset(dataset_file_content)
@ -333,12 +329,10 @@ class EvaluationService:
target_id=target_id,
evaluation_model_provider=run_request.evaluation_model_provider,
evaluation_model=run_request.evaluation_model,
default_metrics=[m.model_dump() for m in run_request.default_metrics],
customized_metrics=(
run_request.customized_metrics.model_dump() if run_request.customized_metrics else None
),
default_metrics=run_request.default_metrics,
customized_metrics=run_request.customized_metrics,
judgment_config=run_request.judgment_config,
items=items,
input_list=items,
)
# Dispatch Celery task
@ -377,11 +371,7 @@ class EvaluationService:
tenant_id: str,
run_id: str,
) -> EvaluationRun:
run = (
session.query(EvaluationRun)
.filter_by(id=run_id, tenant_id=tenant_id)
.first()
)
run = session.query(EvaluationRun).filter_by(id=run_id, tenant_id=tenant_id).first()
if not run:
raise EvaluationNotFoundError("Evaluation run not found.")
return run
@ -591,8 +581,7 @@ class EvaluationService:
@staticmethod
def _extract_workflow_run_id(response: Mapping[str, object]) -> str | None:
"""Extract ``workflow_run_id`` from a blocking workflow response.
"""
"""Extract ``workflow_run_id`` from a blocking workflow response."""
wf_run_id = response.get("workflow_run_id")
if wf_run_id:
return str(wf_run_id)
@ -614,13 +603,15 @@ class EvaluationService:
from core.workflow.enums import WorkflowNodeExecutionStatus
from models.workflow import WorkflowNodeExecutionModel
stmt = WorkflowNodeExecutionModel.preload_offload_data(
select(WorkflowNodeExecutionModel)
).where(
WorkflowNodeExecutionModel.tenant_id == tenant_id,
WorkflowNodeExecutionModel.app_id == app_id,
WorkflowNodeExecutionModel.workflow_run_id == workflow_run_id,
).order_by(asc(WorkflowNodeExecutionModel.created_at))
stmt = (
WorkflowNodeExecutionModel.preload_offload_data(select(WorkflowNodeExecutionModel))
.where(
WorkflowNodeExecutionModel.tenant_id == tenant_id,
WorkflowNodeExecutionModel.app_id == app_id,
WorkflowNodeExecutionModel.workflow_run_id == workflow_run_id,
)
.order_by(asc(WorkflowNodeExecutionModel.created_at))
)
node_models: list[WorkflowNodeExecutionModel] = list(session.execute(stmt).scalars().all())
@ -648,7 +639,7 @@ class EvaluationService:
# ---- Dataset Parsing ----
@classmethod
def _parse_dataset(cls, xlsx_content: bytes) -> list[EvaluationItemInput]:
def _parse_dataset(cls, xlsx_content: bytes) -> list[EvaluationDatasetInput]:
"""Parse evaluation dataset from XLSX bytes."""
wb = load_workbook(io.BytesIO(xlsx_content), read_only=True)
ws = wb.active
@ -672,7 +663,7 @@ class EvaluationService:
index_val = values[0] if values else row_idx
try:
index = int(index_val)
index = int(str(index_val))
except (TypeError, ValueError):
index = row_idx
@ -681,17 +672,14 @@ class EvaluationService:
val = values[col_idx + 1] if col_idx + 1 < len(values) else None
inputs[header] = str(val) if val is not None else ""
# Check for expected_output column
# Extract expected_output column into dedicated field
expected_output = inputs.pop("expected_output", None)
context_str = inputs.pop("context", None)
context = context_str.split(";") if context_str else None
items.append(
EvaluationItemInput(
EvaluationDatasetInput(
index=index,
inputs=inputs,
expected_output=expected_output,
context=context,
)
)
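To make _parse_dataset's expected layout concrete, a sketch that builds a matching workbook with openpyxl (the "query" column is illustrative; "index", "expected_output", and "context" are the columns the parser treats specially):

import io
from openpyxl import Workbook

wb = Workbook()
ws = wb.active
# Header row: "index" first, then input columns. "expected_output" and
# "context" are popped into dedicated fields; context is split on ";".
ws.append(["index", "query", "expected_output", "context"])
ws.append([1, "What is Dify?", "An LLM app platform", "doc-a;doc-b"])

buf = io.BytesIO()
wb.save(buf)
xlsx_bytes = buf.getvalue()  # bytes in the shape _parse_dataset consumes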

View File

@ -480,8 +480,9 @@ class SnippetDslService:
},
}
self._append_workflow_export_data(export_data=export_data, snippet=snippet, workflow=workflow,
include_secret=include_secret)
self._append_workflow_export_data(
export_data=export_data, snippet=snippet, workflow=workflow, include_secret=include_secret
)
return yaml.dump(export_data, allow_unicode=True) # type: ignore

View File

@ -15,6 +15,7 @@ from core.evaluation.entities.evaluation_entity import (
EvaluationItemResult,
EvaluationRunData,
)
from core.evaluation.entities.judgment_entity import JudgmentConfig
from core.evaluation.evaluation_manager import EvaluationManager
from core.evaluation.runners.agent_evaluation_runner import AgentEvaluationRunner
from core.evaluation.runners.base_evaluation_runner import BaseEvaluationRunner
@ -25,6 +26,7 @@ from core.evaluation.runners.workflow_evaluation_runner import WorkflowEvaluatio
from core.workflow.node_events.base import NodeRunResult
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
from models.enums import CreatorUserRole
from models.evaluation import EvaluationRun, EvaluationRunStatus
from models.model import UploadFile
from services.evaluation_service import EvaluationService
@ -86,23 +88,20 @@ def _execute_evaluation(session: Any, run_data: EvaluationRunData) -> None:
)
results: list[EvaluationItemResult] = _execute_evaluation_runner(
session,
run_data,
evaluation_instance,
session,
run_data,
evaluation_instance,
node_run_result_mapping_list,
)
# Compute summary metrics
metrics_summary = _compute_metrics_summary(results)
metrics_summary = _compute_metrics_summary(results, run_data.judgment_config)
# Generate result XLSX
result_xlsx = _generate_result_xlsx(run_data.items, results)
# Store result file
result_file_id = _store_result_file(
run_data.tenant_id, run_data.evaluation_run_id, result_xlsx, session
)
result_file_id = _store_result_file(run_data.tenant_id, run_data.evaluation_run_id, result_xlsx, session)
# Update run to completed
evaluation_run = session.query(EvaluationRun).filter_by(id=run_data.evaluation_run_id).first()
@ -116,15 +115,17 @@ def _execute_evaluation(session: Any, run_data: EvaluationRunData) -> None:
logger.info("Evaluation run %s completed successfully", run_data.evaluation_run_id)
def _execute_evaluation_runner(
session: Any,
run_data: EvaluationRunData,
evaluation_instance: BaseEvaluationInstance,
session: Any,
run_data: EvaluationRunData,
evaluation_instance: BaseEvaluationInstance,
node_run_result_mapping_list: list[dict[str, NodeRunResult]],
) -> list[EvaluationItemResult]:
"""Execute the evaluation runner."""
default_metrics = run_data.default_metrics
customized_metrics = run_data.customized_metrics
results: list[EvaluationItemResult] = []
for default_metric in default_metrics:
for node_info in default_metric.node_info_list:
node_run_result_list: list[NodeRunResult] = []
@ -134,29 +135,37 @@ def _execute_evaluation_runner(
node_run_result_list.append(node_run_result)
if node_run_result_list:
runner = _create_runner(EvaluationCategory(node_info.type), evaluation_instance, session)
runner.run(
evaluation_run_id=run_data.evaluation_run_id,
tenant_id=run_data.tenant_id,
target_id=run_data.target_id,
target_type=run_data.target_type,
default_metric=default_metric,
customized_metrics=None,
model_provider=run_data.evaluation_model_provider,
model_name=run_data.evaluation_model,
node_run_result_list=node_run_result_list,
results.extend(
runner.run(
evaluation_run_id=run_data.evaluation_run_id,
tenant_id=run_data.tenant_id,
target_id=run_data.target_id,
target_type=run_data.target_type,
default_metric=default_metric,
customized_metrics=None,
model_provider=run_data.evaluation_model_provider,
model_name=run_data.evaluation_model,
node_run_result_list=node_run_result_list,
judgment_config=run_data.judgment_config,
)
)
if customized_metrics:
runner = _create_runner(EvaluationCategory.WORKFLOW, evaluation_instance, session)
runner.run(
evaluation_run_id=run_data.evaluation_run_id,
tenant_id=run_data.tenant_id,
target_id=run_data.target_id,
target_type=run_data.target_type,
default_metric=None,
customized_metrics=customized_metrics,
node_run_result_list=None,
node_run_result_mapping_list=node_run_result_mapping_list,
results.extend(
runner.run(
evaluation_run_id=run_data.evaluation_run_id,
tenant_id=run_data.tenant_id,
target_id=run_data.target_id,
target_type=run_data.target_type,
default_metric=None,
customized_metrics=customized_metrics,
node_run_result_list=None,
node_run_result_mapping_list=node_run_result_mapping_list,
judgment_config=run_data.judgment_config,
)
)
return results
def _create_runner(
category: EvaluationCategory,
@ -192,34 +201,32 @@ def _mark_run_failed(session: Any, run_id: str, error: str) -> None:
logger.exception("Failed to mark run %s as failed", run_id)
def _compute_metrics_summary(results: list[EvaluationItemResult]) -> dict[str, Any]:
"""Compute average scores per metric across all results."""
metric_scores: dict[str, list[float]] = {}
for result in results:
if result.error:
continue
for metric in result.metrics:
if metric.name not in metric_scores:
metric_scores[metric.name] = []
metric_scores[metric.name].append(metric.score)
def _compute_metrics_summary(
results: list[EvaluationItemResult],
judgment_config: JudgmentConfig | None,
) -> dict[str, Any]:
"""Compute aggregate metric and judgment summaries for an evaluation run.
Metric statistics are calculated from successful item results only. When a
judgment config is present, the summary also reports how many successful
items passed or failed the configured judgment rules.
"""
summary: dict[str, Any] = {}
for name, scores in metric_scores.items():
summary[name] = {
"average": sum(scores) / len(scores) if scores else 0.0,
"min": min(scores) if scores else 0.0,
"max": max(scores) if scores else 0.0,
"count": len(scores),
}
# Overall average
all_scores = [s for scores in metric_scores.values() for s in scores]
summary["_overall"] = {
"average": sum(all_scores) / len(all_scores) if all_scores else 0.0,
"total_items": len(results),
"successful_items": sum(1 for r in results if r.error is None),
"failed_items": sum(1 for r in results if r.error is not None),
}
if judgment_config is not None and judgment_config.conditions:
evaluated_results: list[EvaluationItemResult] = [
    result for result in results if result.error is None and result.metrics
]
passed_items = sum(1 for result in evaluated_results if result.judgment.passed)
evaluated_items = len(evaluated_results)
summary["_judgment"] = {
"enabled": True,
"logical_operator": judgment_config.logical_operator,
"configured_conditions": len(judgment_config.conditions),
"evaluated_items": evaluated_items,
"passed_items": passed_items,
"failed_items": evaluated_items - passed_items,
"pass_rate": passed_items / evaluated_items if evaluated_items else 0.0,
}
return summary
@ -231,6 +238,8 @@ def _generate_result_xlsx(
"""Generate result XLSX with input data, actual output, and metric scores."""
wb = Workbook()
ws = wb.active
if ws is None:
ws = wb.create_sheet("Evaluation Results")
ws.title = "Evaluation Results"
header_font = Font(bold=True, color="FFFFFF")
@ -259,11 +268,7 @@ def _generate_result_xlsx(
# Build headers
headers = (
["index"]
+ input_keys
+ ["expected_output", "actual_output"]
+ all_metric_names
+ ["overall_score", "error"]
["index"] + input_keys + ["expected_output", "actual_output"] + all_metric_names + ["overall_score", "error"]
)
# Write header row
@ -306,16 +311,14 @@ def _generate_result_xlsx(
col += 1
# Metric scores
metric_scores = {m.name: m.score for m in result.metrics} if result else {}
metric_scores = {m.name: m.value for m in result.metrics} if result else {}
for metric_name in all_metric_names:
score = metric_scores.get(metric_name)
ws.cell(row=row_idx, column=col, value=score if score is not None else "").border = thin_border
col += 1
# Overall score
ws.cell(
row=row_idx, column=col, value=result.overall_score if result else ""
).border = thin_border
ws.cell(row=row_idx, column=col, value=result.overall_score if result else "").border = thin_border
col += 1
# Error
@ -351,7 +354,7 @@ def _store_result_file(
size=len(xlsx_content),
extension="xlsx",
mime_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
created_by_role="account",
created_by_role=CreatorUserRole.ACCOUNT,
created_by="system",
created_at=naive_utc_now(),
used=False,

View File

@ -0,0 +1,69 @@
"""Unit tests for metric-based judgment evaluation."""
from core.evaluation.entities.judgment_entity import JudgmentCondition, JudgmentConfig
from core.evaluation.judgment.processor import JudgmentProcessor
def test_evaluate_uses_and_conditions_against_metric_values() -> None:
"""All conditions must pass when the logical operator is ``and``."""
config = JudgmentConfig(
logical_operator="and",
conditions=[
JudgmentCondition(
metric_name="faithfulness",
comparison_operator=">",
condition_value="0.8",
condition_type="number",
),
JudgmentCondition(
metric_name="answer_relevancy",
comparison_operator="",
condition_value="0.7",
condition_type="number",
),
],
)
result = JudgmentProcessor.evaluate(
{
"faithfulness": 0.9,
"answer_relevancy": 0.75,
},
config,
)
assert result.passed is True
assert len(result.condition_results) == 2
assert all(condition_result.passed for condition_result in result.condition_results)
def test_evaluate_sets_passed_false_when_any_and_condition_fails() -> None:
"""A failed metric comparison should make the overall judgment fail."""
config = JudgmentConfig(
logical_operator="and",
conditions=[
JudgmentCondition(
metric_name="faithfulness",
comparison_operator=">",
condition_value="0.8",
condition_type="number",
),
JudgmentCondition(
metric_name="answer_relevancy",
comparison_operator="",
condition_value="0.7",
condition_type="number",
),
],
)
result = JudgmentProcessor.evaluate(
{
"faithfulness": 0.9,
"answer_relevancy": 0.6,
},
config,
)
assert result.passed is False
assert result.condition_results[-1].passed is False

View File

@ -0,0 +1,80 @@
"""Tests for judgment application in the base evaluation runner."""
from unittest.mock import Mock
from core.evaluation.entities.evaluation_entity import DefaultMetric, EvaluationItemResult, EvaluationMetric
from core.evaluation.entities.judgment_entity import JudgmentCondition, JudgmentConfig
from core.evaluation.runners.base_evaluation_runner import BaseEvaluationRunner
class _FakeItemInput:
def __init__(self, index: int) -> None:
self.index = index
self.inputs = {"query": "hello"}
self.expected_output = "world"
self.context = None
class _FakeEvaluationRun:
def __init__(self) -> None:
self.status = None
self.started_at = None
self.input_list = [_FakeItemInput(index=0)]
class _FakeRunner(BaseEvaluationRunner):
def evaluate_metrics(
self,
node_run_result_mapping_list,
node_run_result_list,
default_metric,
customized_metrics,
model_provider,
model_name,
tenant_id,
) -> list[EvaluationItemResult]:
return [
EvaluationItemResult(
index=0,
actual_output="result",
metrics=[EvaluationMetric(name="faithfulness", value=0.91)],
)
]
def test_run_applies_judgment_before_persisting_results() -> None:
"""Runner should evaluate judgment rules before persisting item rows."""
# Arrange
session = Mock()
evaluation_run = _FakeEvaluationRun()
session.query.return_value.filter_by.return_value.first.return_value = evaluation_run
runner = _FakeRunner(evaluation_instance=Mock(), session=session)
judgment_config = JudgmentConfig(
logical_operator="and",
conditions=[
JudgmentCondition(
metric_name="faithfulness",
comparison_operator=">",
condition_value="0.8",
condition_type="number",
)
],
)
# Act
results = runner.run(
evaluation_run_id="run-id",
tenant_id="tenant-id",
target_id="target-id",
target_type="app",
node_run_result_list=[Mock()],
default_metric=DefaultMetric(metric="faithfulness", node_info_list=[]),
judgment_config=judgment_config,
)
# Assert
assert results[0].judgment.passed is True
persisted_item = session.add.call_args.args[0]
assert persisted_item.judgment is not None
assert '"passed": true' in persisted_item.judgment

View File

@ -0,0 +1,58 @@
"""Unit tests for evaluation task judgment aggregation helpers."""
from core.evaluation.entities.evaluation_entity import EvaluationItemResult, EvaluationMetric
from core.evaluation.entities.judgment_entity import (
JudgmentCondition,
JudgmentConfig,
JudgmentResult,
)
from tasks.evaluation_task import _compute_metrics_summary
def test_compute_metrics_summary_includes_judgment_counts() -> None:
"""Summary should expose pass/fail counts when judgment rules are configured."""
# Arrange
judgment_config = JudgmentConfig(
logical_operator="and",
conditions=[
JudgmentCondition(
metric_name="faithfulness",
comparison_operator=">",
condition_value="0.8",
condition_type="number",
)
],
)
results = [
EvaluationItemResult(
index=0,
metrics=[EvaluationMetric(name="faithfulness", value=0.9)],
judgment=JudgmentResult(passed=True, logical_operator="and", condition_results=[]),
),
EvaluationItemResult(
index=1,
metrics=[EvaluationMetric(name="faithfulness", value=0.4)],
judgment=JudgmentResult(passed=False, logical_operator="and", condition_results=[]),
),
EvaluationItemResult(index=2, error="timeout"),
]
# Act
summary = _compute_metrics_summary(results, judgment_config)
# Assert
assert summary["faithfulness"] == {
"average": 0.65,
"min": 0.4,
"max": 0.9,
"count": 2,
}
assert summary["_judgment"] == {
"enabled": True,
"logical_operator": "and",
"configured_conditions": 1,
"evaluated_items": 2,
"passed_items": 1,
"failed_items": 1,
"pass_rate": 0.5,
}