evaluation runtime

This commit is contained in:
jyong 2026-03-16 18:08:46 +08:00
parent f60084fc43
commit f81bcf53e3
22 changed files with 237 additions and 417 deletions

View File

@ -1383,7 +1383,7 @@ class EvaluationConfig(BaseSettings):
EVALUATION_MAX_DATASET_ROWS: PositiveInt = Field(
description="Maximum number of rows allowed in an evaluation dataset",
default=1000,
default=500,
)
EVALUATION_TASK_TIMEOUT: PositiveInt = Field(

View File

@ -106,6 +106,9 @@ from .datasets.rag_pipeline import (
rag_pipeline_workflow,
)
# Import evaluation controllers
from .evaluation import evaluation
# Import explore controllers
from .explore import (
banner,
@ -116,9 +119,6 @@ from .explore import (
trial,
)
# Import evaluation controllers
from .evaluation import evaluation
# Import snippet controllers
from .snippets import snippet_workflow

View File

@ -153,9 +153,7 @@ def get_evaluation_target(view_func: Callable[P, R]):
target: Union[App, CustomizedSnippet] | None = None
if target_type == "app":
target = (
db.session.query(App).where(App.id == target_id, App.tenant_id == current_tenant_id).first()
)
target = db.session.query(App).where(App.id == target_id, App.tenant_id == current_tenant_id).first()
elif target_type == "snippets":
target = (
db.session.query(CustomizedSnippet)
@ -229,9 +227,7 @@ class EvaluationDetailApi(Resource):
_, current_tenant_id = current_account_with_tenant()
with Session(db.engine, expire_on_commit=False) as session:
config = EvaluationService.get_evaluation_config(
session, current_tenant_id, target_type, str(target.id)
)
config = EvaluationService.get_evaluation_config(session, current_tenant_id, target_type, str(target.id))
if config is None:
return {
@ -360,9 +356,7 @@ class EvaluationRunApi(Resource):
# Load dataset file
upload_file = (
db.session.query(UploadFile)
.filter_by(id=run_request.file_id, tenant_id=current_tenant_id)
.first()
db.session.query(UploadFile).filter_by(id=run_request.file_id, tenant_id=current_tenant_id).first()
)
if not upload_file:
raise NotFound("Dataset file not found.")
@ -397,9 +391,7 @@ class EvaluationRunApi(Resource):
return {"message": str(e.description)}, 400
@console_ns.route(
"/<string:evaluate_target_type>/<uuid:evaluate_target_id>/evaluation/runs/<uuid:run_id>"
)
@console_ns.route("/<string:evaluate_target_type>/<uuid:evaluate_target_id>/evaluation/runs/<uuid:run_id>")
class EvaluationRunDetailApi(Resource):
@console_ns.doc("get_evaluation_run_detail")
@console_ns.response(200, "Evaluation run detail retrieved")
@ -444,9 +436,7 @@ class EvaluationRunDetailApi(Resource):
return {"message": str(e.description)}, 404
@console_ns.route(
"/<string:evaluate_target_type>/<uuid:evaluate_target_id>/evaluation/runs/<uuid:run_id>/cancel"
)
@console_ns.route("/<string:evaluate_target_type>/<uuid:evaluate_target_id>/evaluation/runs/<uuid:run_id>/cancel")
class EvaluationRunCancelApi(Resource):
@console_ns.doc("cancel_evaluation_run")
@console_ns.response(200, "Evaluation run cancelled")
@ -599,6 +589,7 @@ def _serialize_evaluation_run_item(item: EvaluationRunItem) -> dict[str, object]
"expected_output": item.expected_output,
"actual_output": item.actual_output,
"metrics": item.metrics_list,
"judgment": item.judgment_dict,
"metadata": item.metadata_dict,
"error": item.error,
"overall_score": item.overall_score,

View File

@ -237,9 +237,7 @@ class CustomizedSnippetExportApi(Resource):
with Session(db.engine) as session:
export_service = SnippetDslService(session)
result = export_service.export_snippet_dsl(
snippet=snippet, include_secret=query.include_secret == "true"
)
result = export_service.export_snippet_dsl(snippet=snippet, include_secret=query.include_secret == "true")
return {"data": result}, 200

View File

@ -16,7 +16,7 @@ logger = logging.getLogger(__name__)
class BaseEvaluationInstance(ABC):
"""Abstract base class for evaluation framework adapters. """
"""Abstract base class for evaluation framework adapters."""
@abstractmethod
def evaluate_llm(
@ -95,33 +95,26 @@ class BaseEvaluationInstance(ABC):
workflow_id = customized_metrics.evaluation_workflow_id
if not workflow_id:
raise ValueError(
"customized_metrics must contain 'evaluation_workflow_id' for customized evaluator"
)
raise ValueError("customized_metrics must contain 'evaluation_workflow_id' for customized evaluator")
# Load the evaluator workflow resources using a dedicated session
with Session(db.engine, expire_on_commit=False) as session, session.begin():
app = session.query(App).filter_by(
id=workflow_id, tenant_id=tenant_id
).first()
app = session.query(App).filter_by(id=workflow_id, tenant_id=tenant_id).first()
if not app:
raise ValueError(
f"Evaluation workflow app {workflow_id} not found in tenant {tenant_id}"
)
raise ValueError(f"Evaluation workflow app {workflow_id} not found in tenant {tenant_id}")
service_account = get_service_account_for_app(session, workflow_id)
workflow_service = WorkflowService()
published_workflow = workflow_service.get_published_workflow(app_model=app)
if not published_workflow:
raise ValueError(
f"No published workflow found for evaluation app {workflow_id}"
)
raise ValueError(f"No published workflow found for evaluation app {workflow_id}")
eval_results: list[EvaluationItemResult] = []
for idx, node_run_result_mapping in enumerate(node_run_result_mapping_list):
try:
workflow_inputs = self._build_workflow_inputs(
customized_metrics.input_fields, node_run_result_mapping,
customized_metrics.input_fields,
node_run_result_mapping,
)
generator = WorkflowAppGenerator()
@ -183,14 +176,13 @@ class BaseEvaluationInstance(ABC):
full_match = VARIABLE_REGEX.fullmatch(value_source)
if full_match:
workflow_inputs[field_name] = resolve_variable_selector(
full_match.group(1), node_run_result_mapping,
full_match.group(1),
node_run_result_mapping,
)
elif VARIABLE_REGEX.search(value_source):
# Mixed template: interpolate all expressions as strings.
workflow_inputs[field_name] = VARIABLE_REGEX.sub(
lambda m: str(
resolve_variable_selector(m.group(1), node_run_result_mapping)
),
lambda m: str(resolve_variable_selector(m.group(1), node_run_result_mapping)),
value_source,
)
else:
@ -213,9 +205,7 @@ class BaseEvaluationInstance(ABC):
outputs = data.get("outputs")
if not isinstance(outputs, dict):
logger.warning(
"Unexpected workflow response format: 'outputs' is not a dict"
)
logger.warning("Unexpected workflow response format: 'outputs' is not a dict")
return metrics
for key, raw_value in outputs.items():
@ -239,7 +229,8 @@ def resolve_variable_selector(
if len(parts) < 2:
logger.warning(
"Selector '%s' must have at least node_id.output_key", selector_raw,
"Selector '%s' must have at least node_id.output_key",
selector_raw,
)
return ""
@ -250,7 +241,8 @@ def resolve_variable_selector(
if not node_result or not node_result.outputs:
logger.warning(
"Selector '%s': node '%s' not found or has no outputs",
selector_raw, node_id,
selector_raw,
node_id,
)
return ""
@ -262,14 +254,17 @@ def resolve_variable_selector(
if next_val is None:
logger.warning(
"Selector '%s': key '%s' not found in node '%s' outputs",
selector_raw, key, node_id,
selector_raw,
key,
node_id,
)
return ""
current = next_val
else:
logger.warning(
"Selector '%s': cannot traverse into non-dict value at key '%s'",
selector_raw, key,
selector_raw,
key,
)
return ""

View File

@ -11,14 +11,17 @@ class EvaluationFrameworkEnum(StrEnum):
class BaseEvaluationConfig(BaseModel):
"""Base configuration for evaluation frameworks."""
pass
class RagasConfig(BaseEvaluationConfig):
"""RAGAS-specific configuration."""
pass
class DeepEvalConfig(BaseEvaluationConfig):
"""DeepEval-specific configuration."""
pass

View File

@ -40,9 +40,10 @@ class EvaluationItemResult(BaseModel):
actual_output: str | None = None
metrics: list[EvaluationMetric] = Field(default_factory=list)
metadata: dict[str, Any] = Field(default_factory=dict)
judgment: JudgmentResult | None = None
judgment: JudgmentResult = Field(default_factory=JudgmentResult)
error: str | None = None
class NodeInfo(BaseModel):
node_id: str
type: str
@ -67,6 +68,7 @@ class CustomizedMetrics(BaseModel):
class EvaluationConfigData(BaseModel):
"""Structured data for saving evaluation configuration."""
evaluation_model: str = ""
evaluation_model_provider: str = ""
default_metrics: list[DefaultMetric] = Field(default_factory=list)
@ -76,11 +78,13 @@ class EvaluationConfigData(BaseModel):
class EvaluationRunRequest(EvaluationConfigData):
"""Request body for starting an evaluation run."""
file_id: str
class EvaluationRunData(BaseModel):
"""Serializable data for Celery task."""
evaluation_run_id: str
tenant_id: str
target_type: str

View File

@ -1,11 +1,6 @@
"""Judgment condition entities for evaluation metric assessment.
Key concepts:
- **value_source**: Where the comparison target comes from.
- "constant": a literal value supplied by the user (e.g. threshold "0.8").
- "variable": a named field from the evaluation target's runtime data
(inputs, actual_output, expected_output). The ``value`` field holds the
variable key; the actual comparison value is resolved at evaluation time.
- **condition_type**: Determines operator semantics and type coercion.
- "string": string operators (contains, is, start with, …).
- "number": numeric operators (>, <, =, ≠, ≥, ≤).
@ -18,34 +13,19 @@ Typical usage:
JudgmentCondition(
metric_name="faithfulness",
comparison_operator=">",
value="0.8",
condition_value="0.8",
condition_type="number",
),
JudgmentCondition(
metric_name="output",
comparison_operator="contains",
value="expected_output",
value_source="variable",
condition_type="string",
),
)
],
)
"""
from collections.abc import Sequence
from enum import StrEnum
from typing import Any, Literal
from pydantic import BaseModel, Field
class JudgmentValueSource(StrEnum):
"""Where the comparison target value comes from."""
CONSTANT = "constant"
VARIABLE = "variable"
class JudgmentConditionType(StrEnum):
"""Category of the condition, controls operator semantics and type coercion."""
@ -90,22 +70,15 @@ class JudgmentCondition(BaseModel):
metric_name: The name of the evaluation metric to check (left side).
Must match an EvaluationMetric.name in the results.
comparison_operator: The comparison operator to apply.
value: The comparison target (right side).
- When value_source is "constant": the literal threshold/expected value.
- When value_source is "variable": the variable key name to look up
from the evaluation target's runtime data.
For unary operators (empty, null, etc.), this can be None.
value_source: Where the comparison value comes from.
"constant" (default) for user-supplied literals,
"variable" for references to evaluation target data.
condition_value: The comparison target (right side). For unary operators
such as ``empty`` or ``null`` this can be ``None``.
condition_type: Controls type coercion and which operators are valid.
"string" (default), "number", or "datetime".
"""
metric_name: str
comparison_operator: JudgmentComparisonOperator
value: str | Sequence[str] | None = None
value_source: JudgmentValueSource = JudgmentValueSource.CONSTANT
condition_value: Any | None = None
condition_type: JudgmentConditionType = JudgmentConditionType.STRING

View File

@ -67,9 +67,7 @@ class DeepEvalEvaluator(BaseEvaluationInstance):
model_name: str,
tenant_id: str,
) -> list[EvaluationItemResult]:
return self._evaluate(
items, metric_name, model_provider, model_name, tenant_id, EvaluationCategory.RETRIEVAL
)
return self._evaluate(items, metric_name, model_provider, model_name, tenant_id, EvaluationCategory.RETRIEVAL)
def evaluate_agent(
self,
@ -79,9 +77,7 @@ class DeepEvalEvaluator(BaseEvaluationInstance):
model_name: str,
tenant_id: str,
) -> list[EvaluationItemResult]:
return self._evaluate(
items, metric_name, model_provider, model_name, tenant_id, EvaluationCategory.AGENT
)
return self._evaluate(items, metric_name, model_provider, model_name, tenant_id, EvaluationCategory.AGENT)
def evaluate_workflow(
self,
@ -91,9 +87,7 @@ class DeepEvalEvaluator(BaseEvaluationInstance):
model_name: str,
tenant_id: str,
) -> list[EvaluationItemResult]:
return self._evaluate(
items, metric_name, model_provider, model_name, tenant_id, EvaluationCategory.WORKFLOW
)
return self._evaluate(items, metric_name, model_provider, model_name, tenant_id, EvaluationCategory.WORKFLOW)
def _evaluate(
self,
@ -106,11 +100,7 @@ class DeepEvalEvaluator(BaseEvaluationInstance):
) -> list[EvaluationItemResult]:
"""Core evaluation logic using DeepEval."""
model_wrapper = DifyModelWrapper(model_provider, model_name, tenant_id)
requested_metrics = (
[metric_name]
if metric_name
else self.get_supported_metrics(category)
)
requested_metrics = [metric_name] if metric_name else self.get_supported_metrics(category)
try:
return self._evaluate_with_deepeval(items, requested_metrics, category)
@ -144,9 +134,7 @@ class DeepEvalEvaluator(BaseEvaluationInstance):
try:
metric.measure(test_case)
if metric.score is not None:
metrics.append(
EvaluationMetric(name=metric.__class__.__name__, value=float(metric.score))
)
metrics.append(EvaluationMetric(name=metric.__class__.__name__, value=float(metric.score)))
except Exception:
logger.exception(
"Failed to compute metric %s for item %d",
@ -229,9 +217,7 @@ class DeepEvalEvaluator(BaseEvaluationInstance):
parts.append(f"\nExpected Output: {item.expected_output}")
if item.context:
parts.append(f"\nContext: {'; '.join(item.context)}")
parts.append(
"\nRespond with ONLY a single floating point number between 0.0 and 1.0, nothing else."
)
parts.append("\nRespond with ONLY a single floating point number between 0.0 and 1.0, nothing else.")
return "\n".join(parts)
@staticmethod

View File

@ -57,9 +57,7 @@ class RagasEvaluator(BaseEvaluationInstance):
model_name: str,
tenant_id: str,
) -> list[EvaluationItemResult]:
return self._evaluate(
items, metric_name, model_provider, model_name, tenant_id, EvaluationCategory.RETRIEVAL
)
return self._evaluate(items, metric_name, model_provider, model_name, tenant_id, EvaluationCategory.RETRIEVAL)
def evaluate_agent(
self,
@ -79,9 +77,7 @@ class RagasEvaluator(BaseEvaluationInstance):
model_name: str,
tenant_id: str,
) -> list[EvaluationItemResult]:
return self._evaluate(
items, metric_name, model_provider, model_name, tenant_id, EvaluationCategory.WORKFLOW
)
return self._evaluate(items, metric_name, model_provider, model_name, tenant_id, EvaluationCategory.WORKFLOW)
def _evaluate(
self,
@ -94,11 +90,7 @@ class RagasEvaluator(BaseEvaluationInstance):
) -> list[EvaluationItemResult]:
"""Core evaluation logic using RAGAS."""
model_wrapper = DifyModelWrapper(model_provider, model_name, tenant_id)
requested_metrics = (
[metric_name]
if metric_name
else self.get_supported_metrics(category)
)
requested_metrics = [metric_name] if metric_name else self.get_supported_metrics(category)
try:
return self._evaluate_with_ragas(items, requested_metrics, model_wrapper, category)
@ -237,9 +229,7 @@ class RagasEvaluator(BaseEvaluationInstance):
parts.append(f"\nExpected Output: {item.expected_output}")
if item.context:
parts.append(f"\nContext: {'; '.join(item.context)}")
parts.append(
"\nRespond with ONLY a single floating point number between 0.0 and 1.0, nothing else."
)
parts.append("\nRespond with ONLY a single floating point number between 0.0 and 1.0, nothing else.")
return "\n".join(parts)
@staticmethod

View File

@ -1,35 +1,20 @@
"""Judgment condition processor for evaluation metrics.
Evaluates pass/fail judgment conditions against evaluation metric values.
Reuses the core comparison engine from the workflow condition system
(core.workflow.utils.condition.processor._evaluate_condition) to ensure
consistent operator semantics across the platform.
Each condition uses:
- ``metric_name`` as the left-hand side lookup key from ``metric_values``
- ``comparison_operator`` as the operator
- ``condition_value`` as the right-hand side comparison value
The processor is intentionally decoupled from evaluation frameworks
(RAGAS / Customized) and runners. It operates on plain ``dict`` mappings
and can be invoked from any context.
Typical usage::
metrics = {"faithfulness": 0.85, "answer_relevancy": 0.6}
variables = {"expected_output": "Hello World", "created_at": "2025-01-01T00:00:00"}
config = JudgmentConfig(
logical_operator="and",
conditions=[
JudgmentCondition(metric_name="faithfulness", comparison_operator=">",
value="0.8", condition_type="number"),
JudgmentCondition(metric_name="output", comparison_operator="contains",
value="expected_output", value_source="variable",
condition_type="string"),
],
)
result = JudgmentProcessor.evaluate(metrics, config, variable_values=variables)
The processor is intentionally decoupled from evaluation frameworks and
runners. It operates on plain ``dict`` mappings and can be invoked anywhere
that already has per-item metric results.
"""
import logging
from collections.abc import Sequence
from datetime import datetime
from typing import Any
from typing import Any, cast
from core.evaluation.entities.judgment_entity import (
JudgmentCondition,
@ -37,9 +22,9 @@ from core.evaluation.entities.judgment_entity import (
JudgmentConditionType,
JudgmentConfig,
JudgmentResult,
JudgmentValueSource,
)
from core.workflow.utils.condition.processor import _evaluate_condition
from core.workflow.utils.condition.entities import SupportedComparisonOperator
from core.workflow.utils.condition.processor import _evaluate_condition # pyright: ignore[reportPrivateUsage]
logger = logging.getLogger(__name__)
@ -48,12 +33,10 @@ _UNARY_OPERATORS = frozenset({"null", "not null", "empty", "not empty"})
class JudgmentProcessor:
@staticmethod
def evaluate(
metric_values: dict[str, Any],
config: JudgmentConfig,
variable_values: dict[str, Any] | None = None,
) -> JudgmentResult:
"""Evaluate all judgment conditions against the given metric values.
@ -61,9 +44,6 @@ class JudgmentProcessor:
metric_values: Mapping of metric name → metric value
(e.g. ``{"faithfulness": 0.85, "status": "success"}``).
config: The judgment configuration with logical_operator and conditions.
variable_values: Optional mapping of variable name value, used when
a condition's ``value_source`` is ``"variable"``. Typically built
from the evaluation target's inputs / outputs.
Returns:
JudgmentResult with overall pass/fail and per-condition details.
@ -78,9 +58,7 @@ class JudgmentProcessor:
condition_results: list[JudgmentConditionResult] = []
for condition in config.conditions:
result = JudgmentProcessor._evaluate_single_condition(
metric_values, condition, variable_values
)
result = JudgmentProcessor._evaluate_single_condition(metric_values, condition)
condition_results.append(result)
if config.logical_operator == "and" and not result.passed:
@ -112,14 +90,12 @@ class JudgmentProcessor:
def _evaluate_single_condition(
metric_values: dict[str, Any],
condition: JudgmentCondition,
variable_values: dict[str, Any] | None = None,
) -> JudgmentConditionResult:
"""Evaluate a single judgment condition.
Steps:
1. Look up the metric value (left side) by ``metric_name``.
2. Resolve the comparison value (right side) either a constant
or a variable reference.
2. Read ``condition_value`` as the comparison value (right side).
3. Dispatch to the correct type handler (string / number / datetime).
"""
metric_name = condition.metric_name
@ -130,41 +106,28 @@ class JudgmentProcessor:
return JudgmentConditionResult(
metric_name=metric_name,
comparison_operator=condition.comparison_operator,
expected_value=condition.value,
expected_value=condition.condition_value,
actual_value=None,
passed=False,
error=f"Metric '{metric_name}' not found in evaluation results",
)
# Resolve the comparison value (right side)
try:
resolved_value = JudgmentProcessor._resolve_comparison_value(
condition, variable_values
)
except ValueError as e:
return JudgmentConditionResult(
metric_name=metric_name,
comparison_operator=condition.comparison_operator,
expected_value=condition.value,
actual_value=actual_value,
passed=False,
error=str(e),
)
resolved_value = condition.condition_value
# Dispatch to the appropriate type handler
try:
match condition.condition_type:
case JudgmentConditionType.DATETIME:
passed = _evaluate_datetime_condition(
actual_value, condition.comparison_operator, resolved_value
)
passed = _evaluate_datetime_condition(actual_value, condition.comparison_operator, resolved_value)
case JudgmentConditionType.NUMBER:
passed = _evaluate_number_condition(
actual_value, condition.comparison_operator, resolved_value
)
passed = _evaluate_number_condition(actual_value, condition.comparison_operator, resolved_value)
case _: # STRING (default) — delegate to workflow engine
if condition.comparison_operator in {"before", "after"}:
raise ValueError(
f"Operator '{condition.comparison_operator}' is not supported for string conditions"
)
passed = _evaluate_condition(
operator=condition.comparison_operator,
operator=cast(SupportedComparisonOperator, condition.comparison_operator),
value=actual_value,
expected=resolved_value,
)
@ -191,51 +154,6 @@ class JudgmentProcessor:
error=str(e),
)
@staticmethod
def _resolve_comparison_value(
condition: JudgmentCondition,
variable_values: dict[str, Any] | None,
) -> str | Sequence[str] | None:
"""Resolve the right-side comparison value.
For ``value_source == "constant"``, returns ``condition.value`` as-is.
For ``value_source == "variable"``, looks up ``condition.value`` (as a key)
in ``variable_values`` and returns the resolved value (converted to string
for compatibility with the comparison engine).
Raises:
ValueError: If the variable cannot be resolved.
"""
if condition.value_source == JudgmentValueSource.CONSTANT:
return condition.value
# Variable resolution
if condition.value is None:
raise ValueError("Variable name (value) must be provided when value_source is 'variable'")
if not variable_values:
raise ValueError(
f"Cannot resolve variable '{condition.value}': no variable values provided"
)
var_key = condition.value if isinstance(condition.value, str) else str(condition.value)
if var_key not in variable_values:
raise ValueError(
f"Variable '{var_key}' not found in evaluation target data. "
f"Available variables: {list(variable_values.keys())}"
)
resolved = variable_values[var_key]
# Convert to string for the comparison engine, unless it's already
# a str/Sequence[str]/None which the engine expects.
if resolved is None:
return None
if isinstance(resolved, str):
return resolved
if isinstance(resolved, Sequence) and all(isinstance(v, str) for v in resolved):
return resolved
return str(resolved)
_DATETIME_FORMATS = [
"%Y-%m-%dT%H:%M:%S",
@ -348,7 +266,11 @@ def _evaluate_number_condition(
"""
# Unary operators — delegate to workflow engine as-is
if operator in _UNARY_OPERATORS:
return _evaluate_condition(operator=operator, value=actual, expected=expected)
return _evaluate_condition(
operator=cast(SupportedComparisonOperator, operator),
value=actual,
expected=cast(str | Sequence[str] | bool | Sequence[bool] | None, expected),
)
if actual is None:
return False
@ -356,7 +278,7 @@ def _evaluate_number_condition(
# Coerce actual to numeric
if not isinstance(actual, (int, float)):
try:
actual = float(actual)
actual = float(cast(str | int | float, actual))
except (TypeError, ValueError) as e:
raise ValueError(f"Cannot convert actual value '{actual}' to number") from e
@ -365,4 +287,8 @@ def _evaluate_number_condition(
if expected is not None and not isinstance(expected, str):
expected = str(expected)
return _evaluate_condition(operator=operator, value=actual, expected=expected)
return _evaluate_condition(
operator=cast(SupportedComparisonOperator, operator),
value=actual,
expected=expected,
)

View File

@ -20,11 +20,7 @@ def get_service_account_for_app(session: Session, app_id: str) -> Account:
if not account:
raise ValueError(f"Creator account not found for app {app_id}")
current_tenant = (
session.query(TenantAccountJoin)
.filter_by(account_id=account.id, current=True)
.first()
)
current_tenant = session.query(TenantAccountJoin).filter_by(account_id=account.id, current=True).first()
if not current_tenant:
raise ValueError(f"Current tenant not found for account {account.id}")
@ -48,11 +44,7 @@ def get_service_account_for_snippet(session: Session, snippet_id: str) -> Accoun
if not account:
raise ValueError(f"Creator account not found for snippet {snippet_id}")
current_tenant = (
session.query(TenantAccountJoin)
.filter_by(account_id=account.id, current=True)
.first()
)
current_tenant = session.query(TenantAccountJoin).filter_by(account_id=account.id, current=True).first()
if not current_tenant:
raise ValueError(f"Current tenant not found for account {account.id}")

View File

@ -141,10 +141,12 @@ class AgentEvaluationRunner(BaseEvaluationRunner):
answer_parts.append(thought)
tool = chunk.get("tool")
if tool:
tool_calls.append({
"tool": tool,
"tool_input": chunk.get("tool_input", ""),
})
tool_calls.append(
{
"tool": tool,
"tool_input": chunk.get("tool_input", ""),
}
)
elif event == "message":
answer = chunk.get("answer", "")
if answer:

View File

@ -1,11 +1,14 @@
"""Base evaluation runner.
Orchestrates the evaluation lifecycle in four phases:
1. execute_target — run the target and collect actual outputs (abstract)
1. execute_target — run the target and collect actual outputs (abstract)
2. evaluate_metrics — compute metrics via framework or customized workflow
3. apply_judgment — evaluate pass/fail judgment conditions on metrics
4. persist — save results to the database
The persisted ``EvaluationRunItem.judgment`` payload must reflect the final
judgment result for each evaluated item, so judgment evaluation happens before
the persistence phase whenever a ``JudgmentConfig`` is supplied.
"""
import json
@ -30,7 +33,7 @@ logger = logging.getLogger(__name__)
class BaseEvaluationRunner(ABC):
"""Abstract base class for evaluation runners. """
"""Abstract base class for evaluation runners."""
def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session):
self.evaluation_instance = evaluation_instance
@ -62,6 +65,7 @@ class BaseEvaluationRunner(ABC):
model_provider: str = "",
model_name: str = "",
node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None = None,
judgment_config: JudgmentConfig | None = None,
) -> list[EvaluationItemResult]:
"""Orchestrate target execution + metric evaluation + judgment for all items."""
evaluation_run = self.session.query(EvaluationRun).filter_by(id=evaluation_run_id).first()
@ -115,9 +119,16 @@ class BaseEvaluationRunner(ABC):
results = list(results_by_index.values())
if judgment_config is not None:
results = self._apply_judgment(
results=results,
judgment_config=judgment_config,
node_run_result_mapping_list=node_run_result_mapping_list,
)
# Phase 4: Persist individual items
for result in results:
item_input = next((item for item in items if item.index == result.index), None)
item_input = next((item for item in evaluation_run.input_list if item.index == result.index), None)
run_item = EvaluationRunItem(
evaluation_run_id=evaluation_run_id,
item_index=result.index,
@ -129,7 +140,7 @@ class BaseEvaluationRunner(ABC):
judgment=json.dumps(result.judgment.model_dump()) if result.judgment else None,
metadata_json=json.dumps(result.metadata) if result.metadata else None,
error=result.error,
overall_score=result.overall_score,
overall_score=getattr(result, "overall_score", None),
)
self.session.add(run_item)
@ -145,54 +156,21 @@ class BaseEvaluationRunner(ABC):
) -> list[EvaluationItemResult]:
"""Apply judgment conditions to each result's metrics.
Left side (``metric_name``): looked up from evaluate-phase metrics only.
Right side: when ``value_source="variable"``, ``condition.value``
contains an expression (e.g. ``{{#node_id.output_key#}}``). The
expression is parsed and resolved against the corresponding
``node_run_result_mapping`` to obtain the actual comparison value.
Judgment is computed only from the per-item metric values and the
supplied ``JudgmentConfig``. ``metric_name`` selects the left-hand side
metric, and ``condition_value`` is used as the comparison target.
"""
from core.evaluation.base_evaluation_instance import resolve_variable_selector
from core.evaluation.entities.judgment_entity import JudgmentValueSource
from core.workflow.nodes.base.variable_template_parser import REGEX as VARIABLE_REGEX
judged_results: list[EvaluationItemResult] = []
for idx, result in enumerate(results):
for result in results:
if result.error is not None or not result.metrics:
judged_results.append(result)
continue
# Left side: only metrics
metric_values: dict[str, object] = {m.name: m.value for m in result.metrics}
judgment_result = JudgmentProcessor.evaluate(metric_values, judgment_config)
# Right side: pre-resolve variable expressions against node run results.
# Each condition.value expression (e.g. "{{#llm1.text#}}") is resolved
# and stored in variable_values keyed by the raw expression string, so
# that JudgmentProcessor._resolve_comparison_value can look it up.
variable_values: dict[str, object] = {}
node_run_result_mapping = (
node_run_result_mapping_list[idx]
if node_run_result_mapping_list and idx < len(node_run_result_mapping_list)
else {}
)
for condition in judgment_config.conditions:
if (
condition.value_source == JudgmentValueSource.VARIABLE
and isinstance(condition.value, str)
and node_run_result_mapping
):
match = VARIABLE_REGEX.fullmatch(condition.value)
if match:
resolved = resolve_variable_selector(
match.group(1), node_run_result_mapping
)
variable_values[condition.value] = resolved
judgment_result = JudgmentProcessor.evaluate(
metric_values, judgment_config, variable_values=variable_values
)
judged_results.append(
result.model_copy(update={"judgment": judgment_result})
)
judged_results.append(result.model_copy(update={"judgment": judgment_result}))
return judged_results

View File

@ -63,7 +63,7 @@ class RetrievalEvaluationRunner(BaseEvaluationRunner):
@staticmethod
def _extract_query(inputs: dict[str, Any]) -> str:
for key in ("query"):
for key in "query":
if key in inputs:
return str(inputs[key])
return ""

View File

@ -75,11 +75,15 @@ class SnippetEvaluationRunner(BaseEvaluationRunner):
# Retrieve per-node execution records from DB
workflow_run_id = self._extract_workflow_run_id(response)
node_executions = self._query_node_executions(
tenant_id=tenant_id,
app_id=target_id,
workflow_run_id=workflow_run_id,
) if workflow_run_id else []
node_executions = (
self._query_node_executions(
tenant_id=tenant_id,
app_id=target_id,
workflow_run_id=workflow_run_id,
)
if workflow_run_id
else []
)
return EvaluationItemResult(
index=item.index,
@ -189,18 +193,18 @@ class SnippetEvaluationRunner(BaseEvaluationRunner):
Returns a list of serialisable dicts for storage in ``metadata``.
"""
stmt = WorkflowNodeExecutionModel.preload_offload_data(
select(WorkflowNodeExecutionModel)
).where(
WorkflowNodeExecutionModel.tenant_id == tenant_id,
WorkflowNodeExecutionModel.app_id == app_id,
WorkflowNodeExecutionModel.workflow_run_id == workflow_run_id,
).order_by(asc(WorkflowNodeExecutionModel.created_at))
node_models: Sequence[WorkflowNodeExecutionModel] = (
self.session.execute(stmt).scalars().all()
stmt = (
WorkflowNodeExecutionModel.preload_offload_data(select(WorkflowNodeExecutionModel))
.where(
WorkflowNodeExecutionModel.tenant_id == tenant_id,
WorkflowNodeExecutionModel.app_id == app_id,
WorkflowNodeExecutionModel.workflow_run_id == workflow_run_id,
)
.order_by(asc(WorkflowNodeExecutionModel.created_at))
)
node_models: Sequence[WorkflowNodeExecutionModel] = self.session.execute(stmt).scalars().all()
return [self._serialize_node_execution(node) for node in node_models]
@staticmethod
@ -211,6 +215,7 @@ class SnippetEvaluationRunner(BaseEvaluationRunner):
status, error, and elapsed_time. The virtual Start node injected by
SnippetGenerateService is filtered out by the caller if needed.
"""
def _safe_parse_json(value: str | None) -> Any:
if not value:
return None

View File

@ -13,7 +13,6 @@ from core.evaluation.entities.evaluation_entity import (
)
from core.evaluation.runners.base_evaluation_runner import BaseEvaluationRunner
from core.workflow.node_events import NodeRunResult
from models.model import App
logger = logging.getLogger(__name__)

View File

@ -26,13 +26,6 @@ from .dataset import (
TidbAuthBinding,
Whitelist,
)
from .evaluation import (
EvaluationConfiguration,
EvaluationRun,
EvaluationRunItem,
EvaluationRunStatus,
EvaluationTargetType,
)
from .enums import (
AppTriggerStatus,
AppTriggerType,
@ -41,6 +34,13 @@ from .enums import (
WorkflowRunTriggeredFrom,
WorkflowTriggerStatus,
)
from .evaluation import (
EvaluationConfiguration,
EvaluationRun,
EvaluationRunItem,
EvaluationRunStatus,
EvaluationTargetType,
)
from .execution_extra_content import ExecutionExtraContent, HumanInputContent
from .human_input import HumanInputForm
from .model import (
@ -165,12 +165,12 @@ __all__ = [
"Document",
"DocumentSegment",
"Embedding",
"EndUser",
"EvaluationConfiguration",
"EvaluationRun",
"EvaluationRunItem",
"EvaluationRunStatus",
"EvaluationTargetType",
"EndUser",
"ExecutionExtraContent",
"ExporleBanner",
"ExternalKnowledgeApis",

View File

@ -50,9 +50,7 @@ class EvaluationConfiguration(Base):
created_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
updated_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
created_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, server_default=func.current_timestamp()
)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
updated_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
)
@ -97,17 +95,13 @@ class EvaluationRun(Base):
target_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
evaluation_config_id: Mapped[str] = mapped_column(StringUUID, nullable=False)
status: Mapped[str] = mapped_column(
String(20), nullable=False, default=EvaluationRunStatus.PENDING
)
status: Mapped[str] = mapped_column(String(20), nullable=False, default=EvaluationRunStatus.PENDING)
dataset_file_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
result_file_id: Mapped[str | None] = mapped_column(StringUUID, nullable=True)
total_items: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
completed_items: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
failed_items: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
metrics_summary: Mapped[str | None] = mapped_column(LongText, nullable=True)
error: Mapped[str | None] = mapped_column(Text, nullable=True)
celery_task_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
@ -115,23 +109,11 @@ class EvaluationRun(Base):
created_by: Mapped[str] = mapped_column(StringUUID, nullable=False)
started_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
completed_at: Mapped[datetime | None] = mapped_column(DateTime, nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, server_default=func.current_timestamp()
)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
updated_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, server_default=func.current_timestamp(), onupdate=func.current_timestamp()
)
@property
def metrics_summary_dict(self) -> dict[str, Any]:
    """Return the ``metrics_summary`` JSON text decoded, or ``{}`` when it is empty."""
    raw_summary = self.metrics_summary
    return json.loads(raw_summary) if raw_summary else {}

@metrics_summary_dict.setter
def metrics_summary_dict(self, value: dict[str, Any]) -> None:
    """Serialise ``value`` to JSON and store it in ``metrics_summary``."""
    encoded = json.dumps(value)
    self.metrics_summary = encoded
@property
def progress(self) -> float:
if self.total_items == 0:
@ -168,9 +150,7 @@ class EvaluationRunItem(Base):
overall_score: Mapped[float | None] = mapped_column(Float, nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime, nullable=False, server_default=func.current_timestamp()
)
created_at: Mapped[datetime] = mapped_column(DateTime, nullable=False, server_default=func.current_timestamp())
@property
def inputs_dict(self) -> dict[str, Any]:
@ -184,6 +164,12 @@ class EvaluationRunItem(Base):
return json.loads(self.metrics)
return []
@property
def judgment_dict(self) -> dict[str, Any]:
    """Return the ``judgment`` JSON text decoded, or ``{}`` when it is empty."""
    raw_judgment = self.judgment
    return json.loads(raw_judgment) if raw_judgment else {}
@property
def metadata_dict(self) -> dict[str, Any]:
if self.metadata_json:

View File

@ -245,13 +245,13 @@ class EvaluationService:
config.evaluation_model_provider = data.evaluation_model_provider
config.evaluation_model = data.evaluation_model
config.metrics_config = json.dumps({
"default_metrics": [m.model_dump() for m in data.default_metrics],
"customized_metrics": data.customized_metrics.model_dump() if data.customized_metrics else None,
})
config.judgement_conditions = json.dumps(
data.judgment_config.model_dump() if data.judgment_config else {}
config.metrics_config = json.dumps(
{
"default_metrics": [m.model_dump() for m in data.default_metrics],
"customized_metrics": data.customized_metrics.model_dump() if data.customized_metrics else None,
}
)
config.judgement_conditions = json.dumps(data.judgment_config.model_dump() if data.judgment_config else {})
config.updated_by = account_id
session.commit()
session.refresh(config)
@ -299,9 +299,7 @@ class EvaluationService:
)
max_concurrent = dify_config.EVALUATION_MAX_CONCURRENT_RUNS
if active_runs >= max_concurrent:
raise EvaluationMaxConcurrentRunsError(
f"Maximum concurrent runs ({max_concurrent}) reached."
)
raise EvaluationMaxConcurrentRunsError(f"Maximum concurrent runs ({max_concurrent}) reached.")
# Parse dataset
items = cls._parse_dataset(dataset_file_content)
@ -373,11 +371,7 @@ class EvaluationService:
tenant_id: str,
run_id: str,
) -> EvaluationRun:
run = (
session.query(EvaluationRun)
.filter_by(id=run_id, tenant_id=tenant_id)
.first()
)
run = session.query(EvaluationRun).filter_by(id=run_id, tenant_id=tenant_id).first()
if not run:
raise EvaluationNotFoundError("Evaluation run not found.")
return run
@ -587,8 +581,7 @@ class EvaluationService:
@staticmethod
def _extract_workflow_run_id(response: Mapping[str, object]) -> str | None:
"""Extract ``workflow_run_id`` from a blocking workflow response.
"""
"""Extract ``workflow_run_id`` from a blocking workflow response."""
wf_run_id = response.get("workflow_run_id")
if wf_run_id:
return str(wf_run_id)
@ -610,13 +603,15 @@ class EvaluationService:
from core.workflow.enums import WorkflowNodeExecutionStatus
from models.workflow import WorkflowNodeExecutionModel
stmt = WorkflowNodeExecutionModel.preload_offload_data(
select(WorkflowNodeExecutionModel)
).where(
WorkflowNodeExecutionModel.tenant_id == tenant_id,
WorkflowNodeExecutionModel.app_id == app_id,
WorkflowNodeExecutionModel.workflow_run_id == workflow_run_id,
).order_by(asc(WorkflowNodeExecutionModel.created_at))
stmt = (
WorkflowNodeExecutionModel.preload_offload_data(select(WorkflowNodeExecutionModel))
.where(
WorkflowNodeExecutionModel.tenant_id == tenant_id,
WorkflowNodeExecutionModel.app_id == app_id,
WorkflowNodeExecutionModel.workflow_run_id == workflow_run_id,
)
.order_by(asc(WorkflowNodeExecutionModel.created_at))
)
node_models: list[WorkflowNodeExecutionModel] = list(session.execute(stmt).scalars().all())

View File

@ -452,8 +452,9 @@ class SnippetDslService:
},
}
self._append_workflow_export_data(export_data=export_data, snippet=snippet, workflow=workflow,
include_secret=include_secret)
self._append_workflow_export_data(
export_data=export_data, snippet=snippet, workflow=workflow, include_secret=include_secret
)
return yaml.dump(export_data, allow_unicode=True) # type: ignore

View File

@ -15,6 +15,7 @@ from core.evaluation.entities.evaluation_entity import (
EvaluationItemResult,
EvaluationRunData,
)
from core.evaluation.entities.judgment_entity import JudgmentConfig
from core.evaluation.evaluation_manager import EvaluationManager
from core.evaluation.runners.agent_evaluation_runner import AgentEvaluationRunner
from core.evaluation.runners.base_evaluation_runner import BaseEvaluationRunner
@ -87,23 +88,20 @@ def _execute_evaluation(session: Any, run_data: EvaluationRunData) -> None:
)
results: list[EvaluationItemResult] = _execute_evaluation_runner(
session,
run_data,
evaluation_instance,
session,
run_data,
evaluation_instance,
node_run_result_mapping_list,
)
# Compute summary metrics
metrics_summary = _compute_metrics_summary(results)
metrics_summary = _compute_metrics_summary(results, run_data.judgment_config)
# Generate result XLSX
result_xlsx = _generate_result_xlsx(run_data.items, results)
# Store result file
result_file_id = _store_result_file(
run_data.tenant_id, run_data.evaluation_run_id, result_xlsx, session
)
result_file_id = _store_result_file(run_data.tenant_id, run_data.evaluation_run_id, result_xlsx, session)
# Update run to completed
evaluation_run = session.query(EvaluationRun).filter_by(id=run_data.evaluation_run_id).first()
@ -119,9 +117,9 @@ def _execute_evaluation(session: Any, run_data: EvaluationRunData) -> None:
def _execute_evaluation_runner(
session: Any,
run_data: EvaluationRunData,
evaluation_instance: BaseEvaluationInstance,
session: Any,
run_data: EvaluationRunData,
evaluation_instance: BaseEvaluationInstance,
node_run_result_mapping_list: list[dict[str, NodeRunResult]],
) -> list[EvaluationItemResult]:
"""Execute the evaluation runner."""
@ -137,31 +135,37 @@ def _execute_evaluation_runner(
node_run_result_list.append(node_run_result)
if node_run_result_list:
runner = _create_runner(EvaluationCategory(node_info.type), evaluation_instance, session)
results.extend(runner.run(
evaluation_run_id=run_data.evaluation_run_id,
tenant_id=run_data.tenant_id,
target_id=run_data.target_id,
target_type=run_data.target_type,
default_metric=default_metric,
customized_metrics=None,
model_provider=run_data.evaluation_model_provider,
model_name=run_data.evaluation_model,
node_run_result_list=node_run_result_list,
))
results.extend(
runner.run(
evaluation_run_id=run_data.evaluation_run_id,
tenant_id=run_data.tenant_id,
target_id=run_data.target_id,
target_type=run_data.target_type,
default_metric=default_metric,
customized_metrics=None,
model_provider=run_data.evaluation_model_provider,
model_name=run_data.evaluation_model,
node_run_result_list=node_run_result_list,
judgment_config=run_data.judgment_config,
)
)
if customized_metrics:
runner = _create_runner(EvaluationCategory.WORKFLOW, evaluation_instance, session)
results.extend(runner.run(
evaluation_run_id=run_data.evaluation_run_id,
tenant_id=run_data.tenant_id,
target_id=run_data.target_id,
target_type=run_data.target_type,
default_metric=None,
customized_metrics=customized_metrics,
node_run_result_list=None,
node_run_result_mapping_list=node_run_result_mapping_list,
))
results.extend(
runner.run(
evaluation_run_id=run_data.evaluation_run_id,
tenant_id=run_data.tenant_id,
target_id=run_data.target_id,
target_type=run_data.target_type,
default_metric=None,
customized_metrics=customized_metrics,
node_run_result_list=None,
node_run_result_mapping_list=node_run_result_mapping_list,
judgment_config=run_data.judgment_config,
)
)
return results
def _create_runner(
category: EvaluationCategory,
@ -197,34 +201,32 @@ def _mark_run_failed(session: Any, run_id: str, error: str) -> None:
logger.exception("Failed to mark run %s as failed", run_id)
def _compute_metrics_summary(results: list[EvaluationItemResult]) -> dict[str, Any]:
"""Compute average scores per metric across all results."""
metric_scores: dict[str, list[float]] = {}
for result in results:
if result.error:
continue
for metric in result.metrics:
if metric.name not in metric_scores:
metric_scores[metric.name] = []
metric_scores[metric.name].append(float(metric.value))
def _compute_metrics_summary(
results: list[EvaluationItemResult],
judgment_config: JudgmentConfig | None,
) -> dict[str, Any]:
"""Compute aggregate metric and judgment summaries for an evaluation run.
Metric statistics are calculated from successful item results only. When a
judgment config is present, the summary also reports how many successful
items passed or failed the configured judgment rules.
"""
summary: dict[str, Any] = {}
for name, scores in metric_scores.items():
summary[name] = {
"average": sum(scores) / len(scores) if scores else 0.0,
"min": min(scores) if scores else 0.0,
"max": max(scores) if scores else 0.0,
"count": len(scores),
}
# Overall average
all_scores = [s for scores in metric_scores.values() for s in scores]
summary["_overall"] = {
"average": sum(all_scores) / len(all_scores) if all_scores else 0.0,
"total_items": len(results),
"successful_items": sum(1 for r in results if r.error is None),
"failed_items": sum(1 for r in results if r.error is not None),
}
if judgment_config is not None and judgment_config.conditions:
evaluated_results: list[EvaluationItemResult] = [result for result in results if result.error is None and result.metrics]
passed_items = sum(1 for result in evaluated_results if result.judgment.passed)
evaluated_items = len(evaluated_results)
summary["_judgment"] = {
"enabled": True,
"logical_operator": judgment_config.logical_operator,
"configured_conditions": len(judgment_config.conditions),
"evaluated_items": evaluated_items,
"passed_items": passed_items,
"failed_items": evaluated_items - passed_items,
"pass_rate": passed_items / evaluated_items if evaluated_items else 0.0,
}
return summary
@ -266,11 +268,7 @@ def _generate_result_xlsx(
# Build headers
headers = (
["index"]
+ input_keys
+ ["expected_output", "actual_output"]
+ all_metric_names
+ ["overall_score", "error"]
["index"] + input_keys + ["expected_output", "actual_output"] + all_metric_names + ["overall_score", "error"]
)
# Write header row
@ -320,9 +318,7 @@ def _generate_result_xlsx(
col += 1
# Overall score
ws.cell(
row=row_idx, column=col, value=result.overall_score if result else ""
).border = thin_border
ws.cell(row=row_idx, column=col, value=result.overall_score if result else "").border = thin_border
col += 1
# Error