feat: implement customized workflow-based evaluation and add judgment conditions after evaluate_metrics

FFXN 2026-03-04 14:46:24 +08:00
parent 62bf286220
commit 7251bffae1
10 changed files with 558 additions and 3 deletions

View File

@@ -6,6 +6,7 @@ from pydantic import BaseModel
class EvaluationFrameworkEnum(StrEnum):
RAGAS = "ragas"
DEEPEVAL = "deepeval"
CUSTOMIZED = "customized"
NONE = "none"
@@ -17,3 +18,8 @@ class BaseEvaluationConfig(BaseModel):
class RagasConfig(BaseEvaluationConfig):
"""RAGAS-specific configuration."""
pass
class CustomizedEvaluatorConfig(BaseEvaluationConfig):
"""Configuration for the customized workflow-based evaluator."""
pass

View File

@@ -3,6 +3,8 @@ from typing import Any
from pydantic import BaseModel, Field
from core.evaluation.entities.judgment_entity import JudgmentConfig, JudgmentResult
class EvaluationCategory(StrEnum):
LLM = "llm"
@@ -28,6 +30,7 @@ class EvaluationItemResult(BaseModel):
index: int
actual_output: str | None = None
metrics: list[EvaluationMetric] = Field(default_factory=list)
judgment: JudgmentResult | None = None
metadata: dict[str, Any] = Field(default_factory=dict)
error: str | None = None
@@ -49,4 +52,5 @@ class EvaluationRunData(BaseModel):
evaluation_model_provider: str
evaluation_model: str
metrics_config: dict[str, Any] = Field(default_factory=dict)
judgment_config: JudgmentConfig | None = None
items: list[EvaluationItemInput]
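
Over the wire, the new judgment_config field carries a structure like the following; a minimal sketch of validating it into the entity (field names are taken from this hunk, the other required EvaluationRunData fields are elided):

from core.evaluation.entities.judgment_entity import JudgmentConfig

judgment_config = JudgmentConfig.model_validate({
    "logical_operator": "and",
    "conditions": [
        {"metric_name": "faithfulness", "comparison_operator": ">", "value": "0.8"},
    ],
})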

View File

@@ -0,0 +1,84 @@
"""Judgment condition entities for evaluation metric assessment.
Typical usage:
judgment_config = JudgmentConfig(
logical_operator="and",
conditions=[
JudgmentCondition(metric_name="faithfulness", comparison_operator=">", value="0.8"),
JudgmentCondition(metric_name="answer_relevancy", comparison_operator="", value="0.7"),
],
)
"""
from collections.abc import Sequence
from typing import Any, Literal
from pydantic import BaseModel, Field
from core.workflow.utils.condition.entities import SupportedComparisonOperator
class JudgmentCondition(BaseModel):
"""A single judgment condition that checks one metric value.
Attributes:
metric_name: The name of the evaluation metric to check
(must match an EvaluationMetric.name in the results).
comparison_operator: The comparison operator to apply
(reuses the same operator set as workflow condition branches).
value: The expected/threshold value to compare against.
For numeric operators (>, <, =, etc.), this should be a numeric string.
For string operators (contains, is, etc.), this should be a string.
For unary operators (empty, null, etc.), this can be None.
"""
metric_name: str
comparison_operator: SupportedComparisonOperator
value: str | Sequence[str] | None = None
class JudgmentConfig(BaseModel):
"""A group of judgment conditions combined with a logical operator.
Attributes:
logical_operator: How to combine condition results: "and" requires
all conditions to pass, "or" requires at least one.
conditions: The list of individual conditions to evaluate.
"""
logical_operator: Literal["and", "or"] = "and"
conditions: list[JudgmentCondition] = Field(default_factory=list)
class JudgmentConditionResult(BaseModel):
"""Result of evaluating a single judgment condition.
Attributes:
metric_name: Which metric was checked.
comparison_operator: The operator that was applied.
expected_value: The threshold/expected value from the condition config.
actual_value: The actual metric value that was evaluated.
passed: Whether this individual condition passed.
error: Error message if the condition evaluation failed.
"""
metric_name: str
comparison_operator: str
expected_value: Any = None
actual_value: Any = None
passed: bool = False
error: str | None = None
class JudgmentResult(BaseModel):
"""Overall result of evaluating all judgment conditions for one item.
Attributes:
passed: Whether the overall judgment passed (based on logical_operator).
logical_operator: The logical operator used to combine conditions.
condition_results: Detailed result for each individual condition.
"""
passed: bool = False
logical_operator: Literal["and", "or"] = "and"
condition_results: list[JudgmentConditionResult] = Field(default_factory=list)
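
Downstream code can inspect the per-condition details. A minimal sketch built only on the entities above (failing_conditions is a hypothetical helper, not part of this commit):

from core.evaluation.entities.judgment_entity import JudgmentResult

def failing_conditions(result: JudgmentResult) -> list[str]:
    # Describe each condition that did not pass, with expected vs. actual values.
    return [
        f"{c.metric_name} {c.comparison_operator} {c.expected_value} (actual: {c.actual_value})"
        for c in result.condition_results
        if not c.passed
    ]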

View File

@@ -25,6 +25,14 @@ class EvaluationFrameworkConfigMap(collections.UserDict[str, dict[str, Any]]):
}
case EvaluationFrameworkEnum.DEEPEVAL:
raise NotImplementedError("DeepEval adapter is not yet implemented.")
case EvaluationFrameworkEnum.CUSTOMIZED:
from core.evaluation.entities.config_entity import CustomizedEvaluatorConfig
from core.evaluation.frameworks.customized.customized_evaluator import CustomizedEvaluator
return {
"config_class": CustomizedEvaluatorConfig,
"evaluator_class": CustomizedEvaluator,
}
case _:
raise ValueError(f"Unknown evaluation framework: {framework}")

View File

@@ -0,0 +1,267 @@
"""Customized workflow-based evaluator.
Uses a published workflow as the evaluation strategy. The target's actual output,
expected output, original inputs, and context are passed as workflow inputs.
The workflow's output variables are treated as evaluation metrics.
The evaluation workflow_id is provided per evaluation run via
metrics_config["workflow_id"].
"""
import json
import logging
from collections.abc import Mapping
from typing import Any
from core.evaluation.base_evaluation_instance import BaseEvaluationInstance
from core.evaluation.entities.config_entity import CustomizedEvaluatorConfig
from core.evaluation.entities.evaluation_entity import (
EvaluationCategory,
EvaluationItemInput,
EvaluationItemResult,
EvaluationMetric,
)
logger = logging.getLogger(__name__)
class CustomizedEvaluator(BaseEvaluationInstance):
"""Evaluate using a published workflow."""
def __init__(self, config: CustomizedEvaluatorConfig):
self.config = config
def evaluate_llm(
self,
items: list[EvaluationItemInput],
metrics_config: dict,
model_provider: str,
model_name: str,
tenant_id: str,
) -> list[EvaluationItemResult]:
return self._evaluate_with_workflow(items, metrics_config, tenant_id)
def evaluate_retrieval(
self,
items: list[EvaluationItemInput],
metrics_config: dict,
model_provider: str,
model_name: str,
tenant_id: str,
) -> list[EvaluationItemResult]:
return self._evaluate_with_workflow(items, metrics_config, tenant_id)
def evaluate_agent(
self,
items: list[EvaluationItemInput],
metrics_config: dict,
model_provider: str,
model_name: str,
tenant_id: str,
) -> list[EvaluationItemResult]:
return self._evaluate_with_workflow(items, metrics_config, tenant_id)
def evaluate_workflow(
self,
items: list[EvaluationItemInput],
metrics_config: dict,
model_provider: str,
model_name: str,
tenant_id: str,
) -> list[EvaluationItemResult]:
return self._evaluate_with_workflow(items, metrics_config, tenant_id)
def get_supported_metrics(self, category: EvaluationCategory) -> list[str]:
"""Metrics are dynamic and defined by the evaluation workflow outputs.
Return an empty list since available metrics depend on the specific
workflow chosen at runtime.
"""
return []
def _evaluate_with_workflow(
self,
items: list[EvaluationItemInput],
metrics_config: dict,
tenant_id: str,
) -> list[EvaluationItemResult]:
"""Run the evaluation workflow for each item and extract metric scores.
Args:
items: Evaluation items with inputs, expected_output, and context
(context typically contains the target's actual_output, merged
by the Runner's evaluate_metrics method).
metrics_config: Must contain "workflow_id" pointing to a published
WORKFLOW-type App.
tenant_id: Tenant scope for database and workflow execution.
Returns:
List of EvaluationItemResult with metrics extracted from workflow outputs.
Raises:
ValueError: If workflow_id is missing from metrics_config or the
workflow/app cannot be found.
"""
workflow_id = metrics_config.get("workflow_id")
if not workflow_id:
raise ValueError(
"metrics_config must contain 'workflow_id' for customized evaluator"
)
app, workflow, service_account = self._load_workflow_resources(workflow_id, tenant_id)
results: list[EvaluationItemResult] = []
for item in items:
try:
result = self._evaluate_single_item(app, workflow, service_account, item)
results.append(result)
except Exception:
logger.exception(
"Customized evaluator failed for item %d with workflow %s",
item.index,
workflow_id,
)
results.append(EvaluationItemResult(index=item.index))
return results
def _evaluate_single_item(
self,
app: Any,
workflow: Any,
service_account: Any,
item: EvaluationItemInput,
) -> EvaluationItemResult:
"""Run the evaluation workflow for a single item.
Builds workflow inputs from the item data and executes the workflow
in non-streaming mode. Extracts metrics from the workflow's output
variables.
"""
from core.app.apps.workflow.app_generator import WorkflowAppGenerator
from core.app.entities.app_invoke_entities import InvokeFrom
workflow_inputs = self._build_workflow_inputs(item)
generator = WorkflowAppGenerator()
response: Mapping[str, Any] = generator.generate(
app_model=app,
workflow=workflow,
user=service_account,
args={"inputs": workflow_inputs},
invoke_from=InvokeFrom.SERVICE_API,
streaming=False,
)
metrics = self._extract_metrics(response)
return EvaluationItemResult(
index=item.index,
metrics=metrics,
metadata={"workflow_response": self._safe_serialize(response)},
)
def _load_workflow_resources(
self, workflow_id: str, tenant_id: str
) -> tuple[Any, Any, Any]:
"""Load the evaluation workflow App, its published workflow, and a service account.
Args:
workflow_id: The App ID of the evaluation workflow.
tenant_id: Tenant scope.
Returns:
Tuple of (app, workflow, service_account).
Raises:
ValueError: If the app or published workflow cannot be found.
"""
from sqlalchemy.orm import Session
from core.evaluation.runners import get_service_account_for_app
from models.engine import db
from models.model import App
from services.workflow_service import WorkflowService
with Session(db.engine, expire_on_commit=False) as session, session.begin():
app = session.query(App).filter_by(id=workflow_id, tenant_id=tenant_id).first()
if not app:
raise ValueError(
f"Evaluation workflow app {workflow_id} not found in tenant {tenant_id}"
)
service_account = get_service_account_for_app(session, workflow_id)
workflow_service = WorkflowService()
published_workflow = workflow_service.get_published_workflow(app_model=app)
if not published_workflow:
raise ValueError(
f"No published workflow found for evaluation app {workflow_id}"
)
return app, published_workflow, service_account
@staticmethod
def _build_workflow_inputs(item: EvaluationItemInput) -> dict[str, Any]:
"""Build workflow input dict from an evaluation item.
Maps evaluation data to conventional workflow input variable names:
- actual_output: The target's actual output (context[0] when single, otherwise all context entries joined)
- expected_output: The expected/reference output
- inputs: The original evaluation inputs as a JSON string
- context: All context strings joined by blank lines (set only when there is more than one entry)
"""
workflow_inputs: dict[str, Any] = {}
# The actual_output is typically the first element in context
# (merged by the Runner's evaluate_metrics method)
if item.context:
workflow_inputs["actual_output"] = item.context[0] if len(item.context) == 1 else "\n\n".join(item.context)
if item.expected_output:
workflow_inputs["expected_output"] = item.expected_output
if item.inputs:
workflow_inputs["inputs"] = json.dumps(item.inputs, ensure_ascii=False)
if item.context and len(item.context) > 1:
workflow_inputs["context"] = "\n\n".join(item.context)
return workflow_inputs
@staticmethod
def _extract_metrics(response: Mapping[str, Any]) -> list[EvaluationMetric]:
"""Extract evaluation metrics from workflow output variables.
Each output variable is treated as a metric.
"""
metrics: list[EvaluationMetric] = []
data = response.get("data", {})
if not isinstance(data, Mapping):
logger.warning("Unexpected workflow response format: missing 'data' dict")
return metrics
outputs = data.get("outputs", {})
if not isinstance(outputs, Mapping):
logger.warning("Unexpected workflow response format: 'outputs' is not a dict")
return metrics
for key, value in outputs.items():
try:
score = float(value)
metrics.append(EvaluationMetric(name=key, score=score))
except (TypeError, ValueError):
metrics.append(
EvaluationMetric(name=key, score=0.0, details={"raw_value": value})
)
return metrics
@staticmethod
def _safe_serialize(response: Mapping[str, Any]) -> dict[str, Any]:
"""Safely serialize workflow response for metadata storage."""
try:
return dict(response)
except Exception:
return {"raw": str(response)}

View File

View File

@@ -0,0 +1,144 @@
"""Judgment condition processor for evaluation metrics.
Evaluates pass/fail judgment conditions against evaluation metric values.
Reuses the core comparison engine from the workflow condition system
(core.workflow.utils.condition.processor._evaluate_condition) to ensure
consistent operator semantics across the platform.
"""
import logging
from typing import Any
from core.evaluation.entities.judgment_entity import (
JudgmentCondition,
JudgmentConditionResult,
JudgmentConfig,
JudgmentResult,
)
from core.workflow.utils.condition.processor import _evaluate_condition
logger = logging.getLogger(__name__)
class JudgmentProcessor:
@staticmethod
def evaluate(
metric_values: dict[str, Any],
config: JudgmentConfig,
) -> JudgmentResult:
"""Evaluate all judgment conditions against the given metric values.
Args:
metric_values: Mapping of metric name to its value
(e.g. {"faithfulness": 0.85, "status": "success"}).
config: The judgment configuration with logical_operator and conditions.
Returns:
JudgmentResult with overall pass/fail and per-condition details.
"""
if not config.conditions:
return JudgmentResult(
passed=True,
logical_operator=config.logical_operator,
condition_results=[],
)
condition_results: list[JudgmentConditionResult] = []
for condition in config.conditions:
result = JudgmentProcessor._evaluate_single_condition(
metric_values, condition
)
condition_results.append(result)
if config.logical_operator == "and" and not result.passed:
return JudgmentResult(
passed=False,
logical_operator=config.logical_operator,
condition_results=condition_results,
)
if config.logical_operator == "or" and result.passed:
return JudgmentResult(
passed=True,
logical_operator=config.logical_operator,
condition_results=condition_results,
)
if config.logical_operator == "and":
final_passed = all(r.passed for r in condition_results)
else:
final_passed = any(r.passed for r in condition_results)
return JudgmentResult(
passed=final_passed,
logical_operator=config.logical_operator,
condition_results=condition_results,
)
@staticmethod
def _evaluate_single_condition(
metric_values: dict[str, Any],
condition: JudgmentCondition,
) -> JudgmentConditionResult:
"""Evaluate a single judgment condition against the metric values.
Looks up the metric by name, then delegates to the workflow condition
engine for the actual comparison.
Args:
metric_values: Mapping of metric name to its value.
condition: The condition to evaluate.
Returns:
JudgmentConditionResult with pass/fail and details.
"""
metric_name = condition.metric_name
actual_value = metric_values.get(metric_name)
# Handle metric not found
if actual_value is None and condition.comparison_operator not in (
"null",
"not null",
"empty",
"not empty",
"exists",
"not exists",
):
return JudgmentConditionResult(
metric_name=metric_name,
comparison_operator=condition.comparison_operator,
expected_value=condition.value,
actual_value=None,
passed=False,
error=f"Metric '{metric_name}' not found in evaluation results",
)
try:
passed = _evaluate_condition(
operator=condition.comparison_operator,
value=actual_value,
expected=condition.value,
)
return JudgmentConditionResult(
metric_name=metric_name,
comparison_operator=condition.comparison_operator,
expected_value=condition.value,
actual_value=actual_value,
passed=passed,
)
except Exception as e:
logger.warning(
"Judgment condition evaluation failed for metric '%s': %s",
metric_name,
str(e),
)
return JudgmentConditionResult(
metric_name=metric_name,
comparison_operator=condition.comparison_operator,
expected_value=condition.value,
actual_value=actual_value,
passed=False,
error=str(e),
)
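
A short usage sketch of the processor in isolation, reusing the operator strings from the entity docstring:

from core.evaluation.entities.judgment_entity import JudgmentCondition, JudgmentConfig
from core.evaluation.judgment.processor import JudgmentProcessor

config = JudgmentConfig(
    logical_operator="or",
    conditions=[
        JudgmentCondition(metric_name="faithfulness", comparison_operator=">", value="0.8"),
        JudgmentCondition(metric_name="answer_relevancy", comparison_operator="≥", value="0.7"),
    ],
)
result = JudgmentProcessor.evaluate({"faithfulness": 0.85, "answer_relevancy": 0.4}, config)
assert result.passed  # the faithfulness condition alone satisfies "or"

Because of the early returns above, evaluation short-circuits: with "or" it stops at the first passing condition, with "and" at the first failure, so condition_results may not cover every configured condition.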

View File

@@ -9,6 +9,8 @@ from core.evaluation.entities.evaluation_entity import (
EvaluationItemInput,
EvaluationItemResult,
)
from core.evaluation.entities.judgment_entity import JudgmentConfig
from core.evaluation.judgment.processor import JudgmentProcessor
from libs.datetime_utils import naive_utc_now
from models.evaluation import EvaluationRun, EvaluationRunItem, EvaluationRunStatus
@@ -20,7 +22,13 @@ class BaseEvaluationRunner(ABC):
Runners are responsible for executing the target (App/Snippet/Retrieval)
to collect actual outputs, then delegating to the evaluation instance
for metric computation.
for metric computation, and optionally applying judgment conditions.
Execution phases:
1. execute_target: run the target and collect actual outputs
2. evaluate_metrics: compute evaluation metrics via the framework
3. apply_judgment: evaluate pass/fail judgment conditions on metrics
4. persist: save results to the database
"""
def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session):
@@ -61,8 +69,11 @@
metrics_config: dict,
model_provider: str,
model_name: str,
judgment_config: JudgmentConfig | None = None,
) -> list[EvaluationItemResult]:
"""Orchestrate target execution + metric evaluation for all items."""
"""Orchestrate target execution + metric evaluation + judgment for all items.
"""
evaluation_run = self.session.query(EvaluationRun).filter_by(id=evaluation_run_id).first()
if not evaluation_run:
raise ValueError(f"EvaluationRun {evaluation_run_id} not found")
@@ -108,7 +119,11 @@
except Exception:
logger.exception("Failed to compute metrics for evaluation run %s", evaluation_run_id)
# Phase 3: Persist individual items
# Phase 3: Apply judgment conditions on metrics
if judgment_config and judgment_config.conditions:
results = self._apply_judgment(results, judgment_config)
# Phase 4: Persist individual items
for result in results:
item_input = next((item for item in items if item.index == result.index), None)
run_item = EvaluationRunItem(
@@ -119,6 +134,7 @@
context=json.dumps(item_input.context) if item_input and item_input.context else None,
actual_output=result.actual_output,
metrics=json.dumps([m.model_dump() for m in result.metrics]) if result.metrics else None,
judgment=json.dumps(result.judgment.model_dump()) if result.judgment else None,
metadata_json=json.dumps(result.metadata) if result.metadata else None,
error=result.error,
overall_score=result.overall_score,
@@ -128,3 +144,28 @@
self.session.commit()
return results
@staticmethod
def _apply_judgment(
results: list[EvaluationItemResult],
judgment_config: JudgmentConfig,
) -> list[EvaluationItemResult]:
"""Apply judgment conditions to each result's metrics.
Builds a metric_name -> score mapping from each result's metrics,
then delegates to JudgmentProcessor for condition evaluation.
Results with errors are skipped.
"""
judged_results: list[EvaluationItemResult] = []
for result in results:
if result.error is not None or not result.metrics:
judged_results.append(result)
continue
metric_values = {m.name: m.score for m in result.metrics}
judgment_result = JudgmentProcessor.evaluate(metric_values, judgment_config)
judged_results.append(
result.model_copy(update={"judgment": judgment_result})
)
return judged_results
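
The judgment phase can also be exercised on its own; a sketch with hand-built results standing in for what evaluate_metrics would return (the BaseEvaluationRunner import path is assumed):

from core.evaluation.entities.evaluation_entity import EvaluationItemResult, EvaluationMetric
from core.evaluation.entities.judgment_entity import JudgmentCondition, JudgmentConfig
from core.evaluation.runners.base_runner import BaseEvaluationRunner  # module path assumed

results = [
    EvaluationItemResult(index=0, metrics=[EvaluationMetric(name="faithfulness", score=0.9)]),
    EvaluationItemResult(index=1, error="target execution failed"),
]
config = JudgmentConfig(
    conditions=[JudgmentCondition(metric_name="faithfulness", comparison_operator=">", value="0.8")],
)
judged = BaseEvaluationRunner._apply_judgment(results, config)
assert judged[0].judgment is not None and judged[0].judgment.passed
assert judged[1].judgment is None  # errored items pass through unjudged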

View File

@@ -162,6 +162,7 @@ class EvaluationRunItem(Base):
actual_output: Mapped[str | None] = mapped_column(LongText, nullable=True)
metrics: Mapped[str | None] = mapped_column(LongText, nullable=True)
judgment: Mapped[str | None] = mapped_column(LongText, nullable=True)
metadata_json: Mapped[str | None] = mapped_column(LongText, nullable=True)
error: Mapped[str | None] = mapped_column(Text, nullable=True)
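
The new judgment column stores the JudgmentResult as JSON (see the runner's persist phase above). A small sketch of rehydrating it, assuming pydantic v2's model_validate:

import json

from core.evaluation.entities.judgment_entity import JudgmentResult
from models.evaluation import EvaluationRunItem

def load_judgment(run_item: EvaluationRunItem) -> JudgmentResult | None:
    # Decode the JSON written by the runner's persist phase, if present.
    if run_item.judgment is None:
        return None
    return JudgmentResult.model_validate(json.loads(run_item.judgment))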