diff --git a/api/core/evaluation/base_evaluation_instance.py b/api/core/evaluation/base_evaluation_instance.py
index 07044e2cda..f6c64eac96 100644
--- a/api/core/evaluation/base_evaluation_instance.py
+++ b/api/core/evaluation/base_evaluation_instance.py
@@ -1,4 +1,3 @@
-import json
 import logging
 from abc import ABC, abstractmethod
 from collections.abc import Mapping
@@ -6,7 +5,6 @@ from typing import Any

 from core.evaluation.entities.evaluation_entity import (
     CustomizedMetrics,
-    DefaultMetric,
     EvaluationCategory,
     EvaluationItemInput,
     EvaluationItemResult,
@@ -74,15 +72,17 @@ class BaseEvaluationInstance(ABC):
         becomes the score.

         Args:
-            items: Evaluation items with inputs, expected_output, context.
-            results: Results from Phase 1 (with actual_output populated).
-            customized_metrics: Must contain ``evaluation_workflow_id``
-                pointing to a published WORKFLOW-type App.
+            node_run_result_mapping_list: One mapping per test-data item,
+                where each mapping is ``{node_id: NodeRunResult}`` from the
+                target execution.
+            customized_metrics: Contains ``evaluation_workflow_id`` (the
+                published evaluator workflow) and ``input_fields`` (value
+                sources for the evaluator's input variables).
             tenant_id: Tenant scope.

         Returns:
             A list of ``EvaluationItemResult`` with metrics extracted from
-            the workflow outputs.
+            the evaluator workflow's output variables.
         """
         from sqlalchemy.orm import Session

@@ -93,7 +93,7 @@ class BaseEvaluationInstance(ABC):
         from models.model import App
         from services.workflow_service import WorkflowService

-        workflow_id = customized_metrics.get("evaluation_workflow_id")
+        workflow_id = customized_metrics.evaluation_workflow_id
         if not workflow_id:
             raise ValueError(
                 "customized_metrics must contain 'evaluation_workflow_id' for customized evaluator"
             )
@@ -118,9 +118,11 @@ class BaseEvaluationInstance(ABC):
             )

         eval_results: list[EvaluationItemResult] = []
-        for node_run_result_mapping in node_run_result_mapping_list:
+        for idx, node_run_result_mapping in enumerate(node_run_result_mapping_list):
             try:
-                workflow_inputs = self._build_workflow_inputs(customized_metrics.input_fields, node_run_result_mapping)
+                workflow_inputs = self._build_workflow_inputs(
+                    customized_metrics.input_fields, node_run_result_mapping,
+                )

                 generator = WorkflowAppGenerator()
                 response: Mapping[str, Any] = generator.generate(
@@ -130,25 +132,23 @@ class BaseEvaluationInstance(ABC):
                     args={"inputs": workflow_inputs},
                     invoke_from=InvokeFrom.SERVICE_API,
                     streaming=False,
+                    call_depth=0,
                 )

                 metrics = self._extract_workflow_metrics(response)
                 eval_results.append(
                     EvaluationItemResult(
-                        index=item.index,
+                        index=idx,
                         metrics=metrics,
-                        metadata={
-                            "workflow_response": _safe_serialize(response),
-                        },
                     )
                 )
             except Exception:
                 logger.exception(
                     "Customized evaluator failed for item %d with workflow %s",
-                    item.index,
+                    idx,
                     workflow_id,
                 )
-                eval_results.append(EvaluationItemResult(index=item.index))
+                eval_results.append(EvaluationItemResult(index=idx))

         return eval_results

@@ -157,72 +157,126 @@ class BaseEvaluationInstance(ABC):
         input_fields: dict[str, Any],
         node_run_result_mapping: dict[str, NodeRunResult],
     ) -> dict[str, Any]:
-        """Build workflow input dict from evaluation data.
+        """Build customized workflow inputs by resolving value sources.
+
+        Each entry in ``input_fields`` maps a workflow input variable name
+        to its value source, which can be:
+
+        - **Constant**: a plain string without ``{{#…#}}`` used as-is.
+        - **Expression**: a string containing one or more
+          ``{{#node_id.output_key#}}`` selectors (same format as
+          ``VariableTemplateParser``) resolved from
+          ``node_run_result_mapping``.
-        Maps evaluation data to conventional workflow input variable names:
-        - ``actual_output``: The target's actual output (from ``result``).
-        - ``expected_output``: The expected/reference output.
-        - ``inputs``: The original evaluation inputs as a JSON string.
-        - ``context``: All context strings joined by newlines.
         """
+        from core.workflow.nodes.base.variable_template_parser import REGEX as VARIABLE_REGEX
+
         workflow_inputs: dict[str, Any] = {}

-        if result and result.actual_output:
-            workflow_inputs["actual_output"] = result.actual_output
+        for field_name, value_source in input_fields.items():
+            if not isinstance(value_source, str):
+                # Non-string values (numbers, bools, dicts) are used directly.
+                workflow_inputs[field_name] = value_source
+                continue

-        if item.expected_output:
-            workflow_inputs["expected_output"] = item.expected_output
-
-        if item.inputs:
-            workflow_inputs["inputs"] = json.dumps(item.inputs, ensure_ascii=False)
-
-        if item.context:
-            workflow_inputs["context"] = "\n\n".join(item.context)
+            # Check if the entire value is a single expression.
+            full_match = VARIABLE_REGEX.fullmatch(value_source)
+            if full_match:
+                workflow_inputs[field_name] = _resolve_variable_selector(
+                    full_match.group(1), node_run_result_mapping,
+                )
+            elif VARIABLE_REGEX.search(value_source):
+                # Mixed template: interpolate all expressions as strings.
+                workflow_inputs[field_name] = VARIABLE_REGEX.sub(
+                    lambda m: str(
+                        _resolve_variable_selector(m.group(1), node_run_result_mapping)
+                    ),
+                    value_source,
+                )
+            else:
+                # Plain constant — no expression markers.
+                workflow_inputs[field_name] = value_source

         return workflow_inputs

     @staticmethod
     def _extract_workflow_metrics(
-        response: Mapping[str, Any],
+        response: Mapping[str, object],
     ) -> list[EvaluationMetric]:
         """Extract evaluation metrics from workflow output variables.

         Each output variable is treated as a metric. The variable name
-        becomes the metric name, and its value becomes the score.
-        Non-numeric values are recorded with ``score=0.0`` and the raw
-        value stored in ``details``.
+        becomes the metric name, and its value is stored as-is regardless
+        of type (numeric, string, dict, etc.).
""" metrics: list[EvaluationMetric] = [] - data = response.get("data", {}) + data = response.get("data") if not isinstance(data, Mapping): logger.warning("Unexpected workflow response format: missing 'data' dict") return metrics - outputs = data.get("outputs", {}) - if not isinstance(outputs, Mapping): + outputs = data.get("outputs") + if not isinstance(outputs, dict): logger.warning( "Unexpected workflow response format: 'outputs' is not a dict" ) return metrics - for key, value in outputs.items(): - try: - score = float(value) - metrics.append(EvaluationMetric(name=key, score=score)) - except (TypeError, ValueError): - metrics.append( - EvaluationMetric( - name=key, score=0.0, details={"raw_value": value} - ) - ) + for key, raw_value in outputs.items(): + if not isinstance(key, str): + continue + metrics.append(EvaluationMetric(name=key, value=raw_value)) return metrics -def _safe_serialize(response: Mapping[str, Any]) -> dict[str, Any]: - """Safely serialize workflow response for metadata storage.""" - try: - return dict(response) - except Exception: - return {"raw": str(response)} +def _resolve_variable_selector( + selector_raw: str, + node_run_result_mapping: dict[str, NodeRunResult], +) -> object: + """Resolve a ``#node_id.output_key#`` selector against node run results. + Returns the resolved value in its original type, or an empty string + if the node or any key along the path is not found. + """ + # "#node_id.output_key#" → "node_id.output_key" + cleaned = selector_raw.strip("#") + parts = cleaned.split(".") + + if len(parts) < 2: + logger.warning( + "Selector '%s' must have at least node_id.output_key", selector_raw, + ) + return "" + + node_id = parts[0] + output_path = parts[1:] + + node_result = node_run_result_mapping.get(node_id) + if not node_result or not node_result.outputs: + logger.warning( + "Selector '%s': node '%s' not found or has no outputs", + selector_raw, node_id, + ) + return "" + + # Traverse the output path to support nested keys. 
+    current: object = node_result.outputs
+    for key in output_path:
+        if isinstance(current, Mapping):
+            next_val = current.get(key)
+            if next_val is None:
+                logger.warning(
+                    "Selector '%s': key '%s' not found in node '%s' outputs",
+                    selector_raw, key, node_id,
+                )
+                return ""
+            current = next_val
+        else:
+            logger.warning(
+                "Selector '%s': cannot traverse into non-dict value at key '%s'",
+                selector_raw, key,
+            )
+            return ""
+
+    return current if current is not None else ""
diff --git a/api/core/evaluation/entities/evaluation_entity.py b/api/core/evaluation/entities/evaluation_entity.py
index 63fe180899..16148f8408 100644
--- a/api/core/evaluation/entities/evaluation_entity.py
+++ b/api/core/evaluation/entities/evaluation_entity.py
@@ -39,6 +39,8 @@ class EvaluationItemResult(BaseModel):
     index: int
     actual_output: str | None = None
     metrics: list[EvaluationMetric] = Field(default_factory=list)
+    metadata: dict[str, Any] = Field(default_factory=dict)
+    judgment: JudgmentResult | None = None
     error: str | None = None

     @property
diff --git a/api/core/evaluation/runners/base_evaluation_runner.py b/api/core/evaluation/runners/base_evaluation_runner.py
index 2223bfbe32..0c8f3e4f5f 100644
--- a/api/core/evaluation/runners/base_evaluation_runner.py
+++ b/api/core/evaluation/runners/base_evaluation_runner.py
@@ -11,7 +11,6 @@ Orchestrates the evaluation lifecycle in four phases:
 import json
 import logging
 from abc import ABC, abstractmethod
-from typing import Any

 from sqlalchemy.orm import Session

@@ -19,12 +18,10 @@ from core.evaluation.base_evaluation_instance import BaseEvaluationInstance
 from core.evaluation.entities.evaluation_entity import (
     CustomizedMetrics,
     DefaultMetric,
-    EvaluationItemInput,
     EvaluationItemResult,
 )
 from core.evaluation.entities.judgment_entity import JudgmentConfig
 from core.evaluation.judgment.processor import JudgmentProcessor
-from core.workflow.enums import WorkflowNodeExecutionStatus
 from core.workflow.node_events import NodeRunResult
 from libs.datetime_utils import naive_utc_now
 from models.evaluation import EvaluationRun, EvaluationRunItem, EvaluationRunStatus
@@ -79,7 +76,7 @@ class BaseEvaluationRunner(ABC):
         evaluation_run.started_at = naive_utc_now()
         self.session.commit()

-        results: list[EvaluationItemResult] = []
+        results_by_index: dict[int, EvaluationItemResult] = {}

         # Phase 1: run evaluation
         if default_metric and node_run_result_list:
@@ -93,22 +90,30 @@ class BaseEvaluationRunner(ABC):
                     model_name=model_name,
                     tenant_id=tenant_id,
                 )
-                # Merge evaluated metrics back into results
-                evaluated_by_index = {r.index: r for r in evaluated_results}
-                for i, result in enumerate(results):
-                    if result.index in evaluated_by_index:
-                        results[i] = evaluated_by_index[result.index]
+                for r in evaluated_results:
+                    results_by_index[r.index] = r
             except Exception:
                 logger.exception("Failed to compute metrics for evaluation run %s", evaluation_run_id)

         if customized_metrics and node_run_result_mapping_list:
             try:
-                evaluated_results = self._evaluate_customized(
+                customized_results = self.evaluation_instance.evaluate_with_customized_workflow(
                     node_run_result_mapping_list=node_run_result_mapping_list,
                     customized_metrics=customized_metrics,
                     tenant_id=tenant_id,
                 )
+                for r in customized_results:
+                    existing = results_by_index.get(r.index)
+                    if existing:
+                        # Merge: combine metrics from both sources into one result
+                        results_by_index[r.index] = existing.model_copy(
+                            update={"metrics": existing.metrics + r.metrics}
+                        )
+                    else:
+                        results_by_index[r.index] = r
             except Exception:
logger.exception("Failed to compute metrics for evaluation run %s", evaluation_run_id) + logger.exception("Failed to compute customized metrics for evaluation run %s", evaluation_run_id) + + results = list(results_by_index.values()) # Phase 4: Persist individual items for result in results: @@ -132,79 +137,32 @@ class BaseEvaluationRunner(ABC): return results - def _evaluate_customized( - self, - node_run_result_mapping_list: list[dict[str, NodeRunResult]], - customized_metrics: CustomizedMetrics, - tenant_id: str, - ) -> list[EvaluationItemResult]: - """Delegate to the instance's customized workflow evaluator. - - Unlike the framework path (which merges ``actual_output`` into - ``context``), here we pass ``results`` directly — the instance's - ``evaluate_with_customized_workflow()`` reads ``actual_output`` - from each ``EvaluationItemResult``. - """ - evaluated_results = self.evaluation_instance.evaluate_with_customized_workflow( - node_run_result_mapping_list=node_run_result_mapping_list, - customized_metrics=customized_metrics, - tenant_id=tenant_id, - ) - - # Merge metrics back preserving actual_output and metadata from Phase 1 - eval_by_index = {r.index: r for r in evaluated} - final_results: list[EvaluationItemResult] = [] - for result in results: - if result.index in eval_by_index: - eval_result = eval_by_index[result.index] - final_results.append( - EvaluationItemResult( - index=result.index, - actual_output=result.actual_output, - metrics=eval_result.metrics, - metadata={**result.metadata, **eval_result.metadata}, - error=eval_result.error, - ) - ) - else: - final_results.append(result) - return final_results - @staticmethod def _apply_judgment( results: list[EvaluationItemResult], - items: list[EvaluationItemInput], judgment_config: JudgmentConfig, + node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None = None, ) -> list[EvaluationItemResult]: """Apply judgment conditions to each result's metrics. - - Builds a metric_name → value mapping from each result's metrics, - and a variable_values dict from the evaluation target's runtime data - (inputs, actual_output, expected_output) for variable-type conditions. - Results with errors are skipped. """ - items_by_index = {item.index: item for item in items} judged_results: list[EvaluationItemResult] = [] - for result in results: + for idx, result in enumerate(results): if result.error is not None or not result.metrics: judged_results.append(result) continue - metric_values: dict[str, object] = {m.name: m.score for m in result.metrics} + # Left side: only metrics + metric_values: dict[str, object] = {m.name: m.value for m in result.metrics} - # Build variable pool from the evaluation target's runtime data. - # These variables can be referenced in conditions with value_source="variable". 
-            item_input = items_by_index.get(result.index)
-            variable_values: dict[str, object] = {}
-            if item_input:
-                variable_values.update(item_input.inputs)
-                if item_input.expected_output is not None:
-                    variable_values["expected_output"] = item_input.expected_output
-                if item_input.context:
-                    variable_values["context"] = "; ".join(item_input.context)
-            if result.actual_output is not None:
-                variable_values["actual_output"] = result.actual_output
+            # Right side variable pool: metrics + intermediate node run results
+            variable_values: dict[str, object] = dict(metric_values)
+            if node_run_result_mapping_list and idx < len(node_run_result_mapping_list):
+                node_run_result_mapping = node_run_result_mapping_list[idx]
+                for node_id, node_result in node_run_result_mapping.items():
+                    if node_result.outputs:
+                        for output_key, output_value in node_result.outputs.items():
+                            variable_values[f"{node_id}.{output_key}"] = output_value

             judgment_result = JudgmentProcessor.evaluate(
                 metric_values, judgment_config, variable_values=variable_values
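The notes below are editorial additions for reviewers, not part of the diff.

First, a minimal self-contained sketch of how the new value-source resolution in `_build_workflow_inputs` / `_resolve_variable_selector` is intended to behave. The regex is an assumed stand-in for `VariableTemplateParser.REGEX`, the node id `llm_node` and its output keys are hypothetical, and the sketch works on plain dicts rather than `NodeRunResult` objects.

```python
import re
from typing import Any

# Assumed approximation of VariableTemplateParser.REGEX: captures "#node_id.output_key#"
# inside a "{{#...#}}" expression. Not copied from the repo.
VARIABLE_REGEX = re.compile(r"\{\{(#[a-zA-Z0-9_]+(?:\.[a-zA-Z0-9_]+)+#)\}\}")


def resolve_selector(selector_raw: str, node_outputs: dict[str, dict[str, Any]]) -> Any:
    """Resolve '#node_id.output_key#' against per-node outputs; '' when anything is missing."""
    parts = selector_raw.strip("#").split(".")
    if len(parts) < 2:
        return ""
    current: Any = node_outputs.get(parts[0], {})
    for key in parts[1:]:
        if not isinstance(current, dict) or key not in current:
            return ""
        current = current[key]
    return current


def build_inputs(input_fields: dict[str, Any], node_outputs: dict[str, dict[str, Any]]) -> dict[str, Any]:
    """Mirror the three branches of the new _build_workflow_inputs on plain dicts."""
    resolved: dict[str, Any] = {}
    for name, source in input_fields.items():
        if not isinstance(source, str):
            resolved[name] = source  # non-string constants pass through unchanged
            continue
        full = VARIABLE_REGEX.fullmatch(source)
        if full:
            # Single expression: keep the resolved value's original type.
            resolved[name] = resolve_selector(full.group(1), node_outputs)
        elif VARIABLE_REGEX.search(source):
            # Mixed template: every expression is interpolated as a string.
            resolved[name] = VARIABLE_REGEX.sub(
                lambda m: str(resolve_selector(m.group(1), node_outputs)), source
            )
        else:
            resolved[name] = source  # plain constant
    return resolved


# Hypothetical node ids and outputs, purely for illustration.
node_outputs = {"llm_node": {"text": "Paris", "usage": {"total_tokens": 42}}}
print(build_inputs(
    {
        "actual_output": "{{#llm_node.text#}}",         # -> "Paris" (original type kept)
        "summary": "model said: {{#llm_node.text#}}",   # -> "model said: Paris"
        "tokens": "{{#llm_node.usage.total_tokens#}}",  # nested key -> 42
        "threshold": 0.8,                                # non-string constant
    },
    node_outputs,
))
```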
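Second, the runner now merges default and customized metrics for the same item index with pydantic's `model_copy(update=...)`. A stand-in model is used below (field names echo `EvaluationItemResult`, but this is not the repo's class) to show the shape of that merge.

```python
from typing import Any

from pydantic import BaseModel, Field


class Metric(BaseModel):
    name: str
    value: Any = None


class ItemResult(BaseModel):
    index: int
    metrics: list[Metric] = Field(default_factory=list)


default_side = ItemResult(index=0, metrics=[Metric(name="similarity", value=0.8)])
custom_side = ItemResult(index=0, metrics=[Metric(name="verdict", value="pass")])

# Same shape as the runner's merge: keep the existing result, append the new metrics.
merged = default_side.model_copy(update={"metrics": default_side.metrics + custom_side.metrics})
print([m.name for m in merged.metrics])  # ['similarity', 'verdict']
```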
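Third, the right-hand-side variable pool for judgment conditions is now keyed by metric name plus dotted `node_id.output_key` entries taken from the intermediate node run results. The snippet only shows the resulting dictionary shape with hypothetical node ids and metrics; `JudgmentProcessor` itself is not reproduced.

```python
from typing import Any

# Metrics produced by the evaluator workflow (left side of conditions).
metric_values: dict[str, Any] = {"accuracy": 0.92, "verdict": "pass"}

# One item's {node_id: outputs} mapping from the target run (hypothetical ids).
node_run_outputs: dict[str, dict[str, Any]] = {
    "llm_node": {"text": "Paris", "usage": {"total_tokens": 42}},
    "retriever": {"documents": ["doc-1", "doc-2"]},
}

# Right-side pool: metric names first, then flattened "node_id.output_key" entries.
variable_values: dict[str, Any] = dict(metric_values)
for node_id, outputs in node_run_outputs.items():
    for output_key, output_value in outputs.items():
        variable_values[f"{node_id}.{output_key}"] = output_value

# A condition can now reference e.g. "accuracy" or "llm_node.text" on the right side.
print(sorted(variable_values))
# ['accuracy', 'llm_node.text', 'llm_node.usage', 'retriever.documents', 'verdict']
```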