From 4555c98d30e7839b36e93e748ef076235253b7a1 Mon Sep 17 00:00:00 2001
From: jyong <718720800@qq.com>
Date: Thu, 12 Mar 2026 16:24:39 +0800
Subject: [PATCH] evaluation runtime

---
 .../evaluation/base_evaluation_instance.py    | 167 ++++++++++++++++++
 .../evaluation/entities/evaluation_entity.py  |  10 +-
 .../runners/base_evaluation_runner.py         |  17 +-
 api/services/evaluation_service.py            |   7 +-
 4 files changed, 186 insertions(+), 15 deletions(-)

diff --git a/api/core/evaluation/base_evaluation_instance.py b/api/core/evaluation/base_evaluation_instance.py
index 6fcfcd5357..07044e2cda 100644
--- a/api/core/evaluation/base_evaluation_instance.py
+++ b/api/core/evaluation/base_evaluation_instance.py
@@ -5,12 +5,14 @@
 from collections.abc import Mapping
 from typing import Any
 
 from core.evaluation.entities.evaluation_entity import (
+    CustomizedMetrics,
     DefaultMetric,
     EvaluationCategory,
     EvaluationItemInput,
     EvaluationItemResult,
     EvaluationMetric,
 )
+from core.workflow.node_events import NodeRunResult
 
 logger = logging.getLogger(__name__)
 
@@ -59,3 +61,168 @@ class BaseEvaluationInstance(ABC):
         """Return the list of supported metric names for a given evaluation category."""
         ...
 
+    def evaluate_with_customized_workflow(
+        self,
+        node_run_result_mapping_list: list[dict[str, NodeRunResult]],
+        customized_metrics: CustomizedMetrics,
+        tenant_id: str,
+    ) -> list[EvaluationItemResult]:
+        """Evaluate using a published workflow as the evaluator.
+
+        The evaluator workflow's output variables are treated as metrics:
+        each output variable name becomes a metric name, and its value
+        becomes the score.
+
+        Args:
+            node_run_result_mapping_list: One node-ID to ``NodeRunResult``
+                mapping per evaluation item, produced in Phase 1.
+            customized_metrics: Customized metrics config whose
+                ``evaluation_workflow_id`` points to a published
+                WORKFLOW-type App.
+            tenant_id: Tenant scope.
+
+        Returns:
+            A list of ``EvaluationItemResult`` with metrics extracted from
+            the workflow outputs.
+        """
+        from sqlalchemy.orm import Session
+
+        from core.app.apps.workflow.app_generator import WorkflowAppGenerator
+        from core.app.entities.app_invoke_entities import InvokeFrom
+        from core.evaluation.runners import get_service_account_for_app
+        from models.engine import db
+        from models.model import App
+        from services.workflow_service import WorkflowService
+
+        workflow_id = customized_metrics.evaluation_workflow_id
+        if not workflow_id:
+            raise ValueError(
+                "customized_metrics must contain 'evaluation_workflow_id' for customized evaluator"
+            )
+
+        # Load the evaluator workflow resources using a dedicated session
+        with Session(db.engine, expire_on_commit=False) as session, session.begin():
+            app = session.query(App).filter_by(
+                id=workflow_id, tenant_id=tenant_id
+            ).first()
+            if not app:
+                raise ValueError(
+                    f"Evaluation workflow app {workflow_id} not found in tenant {tenant_id}"
+                )
+            service_account = get_service_account_for_app(session, workflow_id)
+
+        workflow_service = WorkflowService()
+        published_workflow = workflow_service.get_published_workflow(app_model=app)
+        if not published_workflow:
+            raise ValueError(
+                f"No published workflow found for evaluation app {workflow_id}"
+            )
+
+        eval_results: list[EvaluationItemResult] = []
+        for index, node_run_result_mapping in enumerate(node_run_result_mapping_list):
+            try:
+                workflow_inputs = self._build_workflow_inputs(
+                    customized_metrics.input_fields, node_run_result_mapping
+                )
+
+                generator = WorkflowAppGenerator()
+                response: Mapping[str, Any] = generator.generate(
+                    app_model=app,
+                    workflow=published_workflow,
+                    user=service_account,
+                    args={"inputs": workflow_inputs},
+                    invoke_from=InvokeFrom.SERVICE_API,
+                    streaming=False,
+                )
+
+                metrics = self._extract_workflow_metrics(response)
+                eval_results.append(
+                    EvaluationItemResult(
+                        index=index,
+                        metrics=metrics,
+                        metadata={
+                            "workflow_response": _safe_serialize(response),
+                        },
+                    )
+                )
+            except Exception:
+                logger.exception(
+                    "Customized evaluator failed for item %d with workflow %s",
+                    index,
+                    workflow_id,
+                )
+                eval_results.append(EvaluationItemResult(index=index))
+
+        return eval_results
+
+    @staticmethod
+    def _build_workflow_inputs(
+        input_fields: dict[str, Any],
+        node_run_result_mapping: dict[str, NodeRunResult],
+    ) -> dict[str, Any]:
+        """Build the evaluator workflow inputs from node run results.
+
+        Each entry in ``input_fields`` maps an evaluator workflow input
+        variable name to a value. Values shaped like a variable selector
+        ``[node_id, output_key]`` are resolved against
+        ``node_run_result_mapping``; any other value is passed through
+        as a literal.
+        """
+        workflow_inputs: dict[str, Any] = {}
+
+        for input_name, selector in input_fields.items():
+            # NOTE: the [node_id, output_key] selector format is an assumption;
+            # adjust if input_fields carries a different mapping convention.
+            if (
+                isinstance(selector, (list, tuple))
+                and len(selector) >= 2
+                and selector[0] in node_run_result_mapping
+            ):
+                node_result = node_run_result_mapping[selector[0]]
+                node_outputs = node_result.outputs or {}
+                workflow_inputs[input_name] = node_outputs.get(selector[1])
+            else:
+                workflow_inputs[input_name] = selector
+
+        return workflow_inputs
+
+    @staticmethod
+    def _extract_workflow_metrics(
+        response: Mapping[str, Any],
+    ) -> list[EvaluationMetric]:
+        """Extract evaluation metrics from workflow output variables.
+
+        Each output variable is treated as a metric. The variable name
+        becomes the metric name, and its value becomes the score.
+        Non-numeric values are recorded with ``score=0.0`` and the raw
+        value stored in ``details``.
+        """
+        metrics: list[EvaluationMetric] = []
+
+        data = response.get("data", {})
+        if not isinstance(data, Mapping):
+            logger.warning("Unexpected workflow response format: missing 'data' dict")
+            return metrics
+
+        outputs = data.get("outputs", {})
+        if not isinstance(outputs, Mapping):
+            logger.warning(
+                "Unexpected workflow response format: 'outputs' is not a dict"
+            )
+            return metrics
+
+        for key, value in outputs.items():
+            try:
+                score = float(value)
+                metrics.append(EvaluationMetric(name=key, score=score))
+            except (TypeError, ValueError):
+                metrics.append(
+                    EvaluationMetric(
+                        name=key, score=0.0, details={"raw_value": value}
+                    )
+                )
+
+        return metrics
+
+
+def _safe_serialize(response: Mapping[str, Any]) -> dict[str, Any]:
+    """Safely serialize workflow response for metadata storage."""
+    try:
+        return dict(response)
+    except Exception:
+        return {"raw": str(response)}
diff --git a/api/core/evaluation/entities/evaluation_entity.py b/api/core/evaluation/entities/evaluation_entity.py
index 53138d253c..63fe180899 100644
--- a/api/core/evaluation/entities/evaluation_entity.py
+++ b/api/core/evaluation/entities/evaluation_entity.py
@@ -29,6 +29,12 @@ class EvaluationItemInput(BaseModel):
     context: list[str] | None = None
 
 
+class EvaluationDatasetInput(BaseModel):
+    index: int
+    inputs: dict[str, Any]
+    expected_output: str | None = None
+
+
 class EvaluationItemResult(BaseModel):
     index: int
     actual_output: str | None = None
@@ -61,7 +67,7 @@ class CustomizedMetricOutputField(BaseModel):
 
 class CustomizedMetrics(BaseModel):
     evaluation_workflow_id: str
-    input_fields: dict[str, str]
+    input_fields: dict[str, Any]
     output_fields: list[CustomizedMetricOutputField]
 
 
@@ -90,4 +96,4 @@ class EvaluationRunData(BaseModel):
     default_metrics: list[DefaultMetric] = Field(default_factory=list)
     customized_metrics: CustomizedMetrics | None = None
     judgment_config: JudgmentConfig | None = None
-    input_list: list[EvaluationItemInput]
+    input_list: list[EvaluationDatasetInput]
diff --git a/api/core/evaluation/runners/base_evaluation_runner.py b/api/core/evaluation/runners/base_evaluation_runner.py
index e75748ed17..2223bfbe32 100644
--- a/api/core/evaluation/runners/base_evaluation_runner.py
+++ b/api/core/evaluation/runners/base_evaluation_runner.py
@@ -102,13 +102,9 @@ class BaseEvaluationRunner(ABC):
                 logger.exception("Failed to compute metrics for evaluation run %s", evaluation_run_id)
         if customized_metrics and node_run_result_mapping_list:
             try:
-                evaluated_results = self.evaluate_metrics(
+                evaluated_results = self._evaluate_customized(
                     node_run_result_mapping_list=node_run_result_mapping_list,
-                    node_run_result_list=node_run_result_list,
-                    default_metric=default_metric,
                     customized_metrics=customized_metrics,
-                    model_provider=model_provider,
-                    model_name=model_name,
                     tenant_id=tenant_id,
                 )
             except Exception:
@@ -138,9 +134,8 @@
     def _evaluate_customized(
         self,
-        items: list[EvaluationItemInput],
-        results: list[EvaluationItemResult],
-        customized_metrics: dict[str, Any],
+        node_run_result_mapping_list: list[dict[str, NodeRunResult]],
+        customized_metrics: CustomizedMetrics,
         tenant_id: str,
     ) -> list[EvaluationItemResult]:
         """Delegate to the instance's customized workflow evaluator.
 
-        ``evaluate_with_customized_workflow()`` reads ``actual_output``
-        from each ``EvaluationItemResult``.
+        ``evaluate_with_customized_workflow()`` builds the evaluator
+        workflow inputs from each node-run-result mapping in the list.
""" - evaluated = self.evaluation_instance.evaluate_with_customized_workflow( - items, results, customized_metrics, tenant_id, + evaluated_results = self.evaluation_instance.evaluate_with_customized_workflow( + node_run_result_mapping_list=node_run_result_mapping_list, + customized_metrics=customized_metrics, + tenant_id=tenant_id, ) # Merge metrics back preserving actual_output and metadata from Phase 1 diff --git a/api/services/evaluation_service.py b/api/services/evaluation_service.py index 00c402eb3e..101d794c20 100644 --- a/api/services/evaluation_service.py +++ b/api/services/evaluation_service.py @@ -14,6 +14,7 @@ from core.evaluation.entities.evaluation_entity import ( DefaultMetric, EvaluationCategory, EvaluationConfigData, + EvaluationDatasetInput, EvaluationItemInput, EvaluationRunData, EvaluationRunRequest, @@ -455,7 +456,7 @@ class EvaluationService: tenant_id: str, target_type: str, target_id: str, - input_list: list[EvaluationItemInput], + input_list: list[EvaluationDatasetInput], max_workers: int = 5, ) -> list[dict[str, NodeRunResult]]: """Execute the evaluation target for every test-data item in parallel. @@ -475,7 +476,7 @@ class EvaluationService: flask_app: Flask = current_app._get_current_object() # type: ignore - def _worker(item: EvaluationItemInput) -> dict[str, NodeRunResult]: + def _worker(item: EvaluationDatasetInput) -> dict[str, NodeRunResult]: with flask_app.app_context(): from models.engine import db @@ -532,7 +533,7 @@ class EvaluationService: session: Session, target_type: str, target_id: str, - item: EvaluationItemInput, + item: EvaluationDatasetInput, ) -> Mapping[str, object]: """Execute a single evaluation target with one test-data item.