diff --git a/api/core/evaluation/base_evaluation_instance.py b/api/core/evaluation/base_evaluation_instance.py
index 3a5b853c22..03979efc04 100644
--- a/api/core/evaluation/base_evaluation_instance.py
+++ b/api/core/evaluation/base_evaluation_instance.py
@@ -21,7 +21,7 @@ class BaseEvaluationInstance(ABC):
     def evaluate_llm(
         self,
         items: list[EvaluationItemInput],
-        default_metrics: list[dict[str, Any]],
+        default_metrics: str,
         model_provider: str,
         model_name: str,
         tenant_id: str,
@@ -33,7 +33,7 @@ class BaseEvaluationInstance(ABC):
     def evaluate_retrieval(
         self,
         items: list[EvaluationItemInput],
-        default_metrics: list[dict[str, Any]],
+        default_metrics: str,
         model_provider: str,
         model_name: str,
         tenant_id: str,
@@ -45,7 +45,7 @@ class BaseEvaluationInstance(ABC):
     def evaluate_agent(
         self,
         items: list[EvaluationItemInput],
-        default_metrics: list[dict[str, Any]],
+        default_metrics: str,
         model_provider: str,
         model_name: str,
         tenant_id: str,
@@ -53,188 +53,8 @@ class BaseEvaluationInstance(ABC):
         """Evaluate agent outputs using the configured framework."""
         ...
 
-    @abstractmethod
-    def evaluate_workflow(
-        self,
-        items: list[EvaluationItemInput],
-        default_metrics: list[dict[str, Any]],
-        model_provider: str,
-        model_name: str,
-        tenant_id: str,
-    ) -> list[EvaluationItemResult]:
-        """Evaluate workflow outputs using the configured framework."""
-        ...
-
     @abstractmethod
     def get_supported_metrics(self, category: EvaluationCategory) -> list[str]:
         """Return the list of supported metric names for a given evaluation category."""
         ...
-
-    def evaluate_with_customized_workflow(
-        self,
-        items: list[EvaluationItemInput],
-        results: list[EvaluationItemResult],
-        customized_metrics: dict[str, Any],
-        tenant_id: str,
-    ) -> list[EvaluationItemResult]:
-        """Evaluate using a published workflow as the evaluator.
-
-        The evaluator workflow's output variables are treated as metrics:
-        each output variable name becomes a metric name, and its value
-        becomes the score.
-
-        Args:
-            items: Evaluation items with inputs, expected_output, context.
-            results: Results from Phase 1 (with actual_output populated).
-            customized_metrics: Must contain ``evaluation_workflow_id``
-                pointing to a published WORKFLOW-type App.
-            tenant_id: Tenant scope.
-
-        Returns:
-            A list of ``EvaluationItemResult`` with metrics extracted from
-            the workflow outputs.
-        """
-        from sqlalchemy.orm import Session
-
-        from core.app.apps.workflow.app_generator import WorkflowAppGenerator
-        from core.app.entities.app_invoke_entities import InvokeFrom
-        from core.evaluation.runners import get_service_account_for_app
-        from models.engine import db
-        from models.model import App
-        from services.workflow_service import WorkflowService
-
-        workflow_id = customized_metrics.get("evaluation_workflow_id")
-        if not workflow_id:
-            raise ValueError(
-                "customized_metrics must contain 'evaluation_workflow_id' for customized evaluator"
-            )
-
-        # Load the evaluator workflow resources using a dedicated session
-        with Session(db.engine, expire_on_commit=False) as session, session.begin():
-            app = session.query(App).filter_by(
-                id=workflow_id, tenant_id=tenant_id
-            ).first()
-            if not app:
-                raise ValueError(
-                    f"Evaluation workflow app {workflow_id} not found in tenant {tenant_id}"
-                )
-            service_account = get_service_account_for_app(session, workflow_id)
-
-        workflow_service = WorkflowService()
-        published_workflow = workflow_service.get_published_workflow(app_model=app)
-        if not published_workflow:
-            raise ValueError(
-                f"No published workflow found for evaluation app {workflow_id}"
-            )
-
-        result_by_index = {r.index: r for r in results}
-        eval_results: list[EvaluationItemResult] = []
-        for item in items:
-            result = result_by_index.get(item.index)
-            try:
-                workflow_inputs = self._build_workflow_inputs(item, result)
-
-                generator = WorkflowAppGenerator()
-                response: Mapping[str, Any] = generator.generate(
-                    app_model=app,
-                    workflow=published_workflow,
-                    user=service_account,
-                    args={"inputs": workflow_inputs},
-                    invoke_from=InvokeFrom.SERVICE_API,
-                    streaming=False,
-                )
-
-                metrics = self._extract_workflow_metrics(response)
-                eval_results.append(
-                    EvaluationItemResult(
-                        index=item.index,
-                        metrics=metrics,
-                        metadata={
-                            "workflow_response": _safe_serialize(response),
-                        },
-                    )
-                )
-            except Exception:
-                logger.exception(
-                    "Customized evaluator failed for item %d with workflow %s",
-                    item.index,
-                    workflow_id,
-                )
-                eval_results.append(EvaluationItemResult(index=item.index))
-
-        return eval_results
-
-    @staticmethod
-    def _build_workflow_inputs(
-        item: EvaluationItemInput,
-        result: EvaluationItemResult | None,
-    ) -> dict[str, Any]:
-        """Build workflow input dict from evaluation data.
-
-        Maps evaluation data to conventional workflow input variable names:
-        - ``actual_output``: The target's actual output (from ``result``).
-        - ``expected_output``: The expected/reference output.
-        - ``inputs``: The original evaluation inputs as a JSON string.
-        - ``context``: All context strings joined by newlines.
-        """
-        workflow_inputs: dict[str, Any] = {}
-
-        if result and result.actual_output:
-            workflow_inputs["actual_output"] = result.actual_output
-
-        if item.expected_output:
-            workflow_inputs["expected_output"] = item.expected_output
-
-        if item.inputs:
-            workflow_inputs["inputs"] = json.dumps(item.inputs, ensure_ascii=False)
-
-        if item.context:
-            workflow_inputs["context"] = "\n\n".join(item.context)
-
-        return workflow_inputs
-
-    @staticmethod
-    def _extract_workflow_metrics(
-        response: Mapping[str, Any],
-    ) -> list[EvaluationMetric]:
-        """Extract evaluation metrics from workflow output variables.
-
-        Each output variable is treated as a metric. The variable name
-        becomes the metric name, and its value becomes the score.
-        Non-numeric values are recorded with ``score=0.0`` and the raw
-        value stored in ``details``.
-        """
-        metrics: list[EvaluationMetric] = []
-
-        data = response.get("data", {})
-        if not isinstance(data, Mapping):
-            logger.warning("Unexpected workflow response format: missing 'data' dict")
-            return metrics
-
-        outputs = data.get("outputs", {})
-        if not isinstance(outputs, Mapping):
-            logger.warning(
-                "Unexpected workflow response format: 'outputs' is not a dict"
-            )
-            return metrics
-
-        for key, value in outputs.items():
-            try:
-                score = float(value)
-                metrics.append(EvaluationMetric(name=key, score=score))
-            except (TypeError, ValueError):
-                metrics.append(
-                    EvaluationMetric(
-                        name=key, score=0.0, details={"raw_value": value}
-                    )
-                )
-
-        return metrics
-
-
-def _safe_serialize(response: Mapping[str, Any]) -> dict[str, Any]:
-    """Safely serialize workflow response for metadata storage."""
-    try:
-        return dict(response)
-    except Exception:
-        return {"raw": str(response)}
diff --git a/api/core/evaluation/entities/evaluation_entity.py b/api/core/evaluation/entities/evaluation_entity.py
index 7d84e02c15..53138d253c 100644
--- a/api/core/evaluation/entities/evaluation_entity.py
+++ b/api/core/evaluation/entities/evaluation_entity.py
@@ -24,6 +24,7 @@ class EvaluationMetric(BaseModel):
 class EvaluationItemInput(BaseModel):
     index: int
     inputs: dict[str, Any]
+    output: str
     expected_output: str | None = None
     context: list[str] | None = None
 
@@ -89,4 +90,4 @@ class EvaluationRunData(BaseModel):
     default_metrics: list[DefaultMetric] = Field(default_factory=list)
     customized_metrics: CustomizedMetrics | None = None
     judgment_config: JudgmentConfig | None = None
-    input_list: list[dict]
+    input_list: list[EvaluationItemInput]
diff --git a/api/core/evaluation/frameworks/ragas/ragas_evaluator.py b/api/core/evaluation/frameworks/ragas/ragas_evaluator.py
index 31f08639fd..5fba73fafa 100644
--- a/api/core/evaluation/frameworks/ragas/ragas_evaluator.py
+++ b/api/core/evaluation/frameworks/ragas/ragas_evaluator.py
@@ -42,7 +42,7 @@ class RagasEvaluator(BaseEvaluationInstance):
     def evaluate_llm(
         self,
         items: list[EvaluationItemInput],
-        default_metrics: list[dict[str, Any]],
+        default_metrics: str,
         model_provider: str,
         model_name: str,
         tenant_id: str,
@@ -52,7 +52,7 @@ class RagasEvaluator(BaseEvaluationInstance):
     def evaluate_retrieval(
         self,
         items: list[EvaluationItemInput],
-        default_metrics: list[dict[str, Any]],
+        default_metrics: str,
         model_provider: str,
         model_name: str,
         tenant_id: str,
@@ -64,7 +64,7 @@ class RagasEvaluator(BaseEvaluationInstance):
     def evaluate_agent(
         self,
         items: list[EvaluationItemInput],
-        default_metrics: list[dict[str, Any]],
+        default_metrics: str,
         model_provider: str,
         model_name: str,
         tenant_id: str,
@@ -74,7 +74,7 @@ class RagasEvaluator(BaseEvaluationInstance):
     def evaluate_workflow(
         self,
         items: list[EvaluationItemInput],
-        default_metrics: list[dict[str, Any]],
+        default_metrics: str,
         model_provider: str,
         model_name: str,
         tenant_id: str,
@@ -86,7 +86,7 @@ class RagasEvaluator(BaseEvaluationInstance):
     def _evaluate(
         self,
         items: list[EvaluationItemInput],
-        default_metrics: list[dict[str, Any]],
+        default_metrics: str,
         model_provider: str,
         model_name: str,
         tenant_id: str,
@@ -98,9 +98,11 @@ class RagasEvaluator(BaseEvaluationInstance):
         string similarity if RAGAS import fails.
         """
         model_wrapper = DifyModelWrapper(model_provider, model_name, tenant_id)
-        # Extract metric names from default_metrics list; each item has a "metric" key.
+        # default_metrics is a single metric name; fall back to all supported metrics when it is empty.
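+        # e.g. default_metrics == "faithfulness" -> requested_metrics == ["faithfulness"];
+        # an empty value -> every metric this category supports.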
         requested_metrics = (
-            [m["metric"] for m in default_metrics if "metric" in m]
+            [default_metrics]
             if default_metrics
             else self.get_supported_metrics(category)
         )
diff --git a/api/core/evaluation/runners/agent_evaluation_runner.py b/api/core/evaluation/runners/agent_evaluation_runner.py
index 5c03c376dd..8cc5c3cc5b 100644
--- a/api/core/evaluation/runners/agent_evaluation_runner.py
+++ b/api/core/evaluation/runners/agent_evaluation_runner.py
@@ -69,8 +69,8 @@ class AgentEvaluationRunner(BaseEvaluationRunner):
 
     def evaluate_metrics(
         self,
-        node_run_result_mapping: dict[str, NodeRunResult] | None,
-        node_run_result: NodeRunResult | None,
+        node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None,
+        node_run_result_list: list[NodeRunResult] | None,
         default_metric: DefaultMetric | None,
         customized_metrics: CustomizedMetrics | None,
         model_provider: str,
diff --git a/api/core/evaluation/runners/base_evaluation_runner.py b/api/core/evaluation/runners/base_evaluation_runner.py
index 1c57f9c99e..e75748ed17 100644
--- a/api/core/evaluation/runners/base_evaluation_runner.py
+++ b/api/core/evaluation/runners/base_evaluation_runner.py
@@ -42,8 +42,8 @@ class BaseEvaluationRunner(ABC):
     @abstractmethod
     def evaluate_metrics(
         self,
-        node_run_result_mapping: dict[str, NodeRunResult] | None,
-        node_run_result: NodeRunResult | None,
+        node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None,
+        node_run_result_list: list[NodeRunResult] | None,
         default_metric: DefaultMetric | None,
         customized_metrics: CustomizedMetrics | None,
         model_provider: str,
@@ -59,12 +59,12 @@ class BaseEvaluationRunner(ABC):
         tenant_id: str,
         target_id: str,
         target_type: str,
-        node_run_result: NodeRunResult | None = None,
+        node_run_result_list: list[NodeRunResult] | None = None,
        default_metric: DefaultMetric | None = None,
         customized_metrics: CustomizedMetrics | None = None,
         model_provider: str = "",
         model_name: str = "",
-        node_run_result_mapping: dict[str, NodeRunResult] | None = None,
+        node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None = None,
     ) -> list[EvaluationItemResult]:
         """Orchestrate target execution + metric evaluation + judgment for all items."""
         evaluation_run = self.session.query(EvaluationRun).filter_by(id=evaluation_run_id).first()
@@ -82,11 +82,11 @@ class BaseEvaluationRunner(ABC):
         results: list[EvaluationItemResult] = []
 
         # Phase 1: run evaluation
-        if node_run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED:
+        if default_metric and node_run_result_list:
             try:
                 evaluated_results = self.evaluate_metrics(
-                    node_run_result_mapping=node_run_result_mapping,
-                    node_run_result=node_run_result,
+                    node_run_result_mapping_list=node_run_result_mapping_list,
+                    node_run_result_list=node_run_result_list,
                     default_metric=default_metric,
                     customized_metrics=customized_metrics,
                     model_provider=model_provider,
@@ -100,6 +100,27 @@ class BaseEvaluationRunner(ABC):
                     results[i] = evaluated_by_index[result.index]
             except Exception:
                 logger.exception("Failed to compute metrics for evaluation run %s", evaluation_run_id)
+        if customized_metrics and node_run_result_mapping_list:
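+            # The customized evaluator is fed the full per-item node-result
+            # mappings; presumably this lets a user-defined evaluator workflow
+            # reference any node's output rather than one target node's.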
+            try:
+                evaluated_results = self.evaluate_metrics(
+                    node_run_result_mapping_list=node_run_result_mapping_list,
+                    node_run_result_list=node_run_result_list,
+                    default_metric=default_metric,
+                    customized_metrics=customized_metrics,
+                    model_provider=model_provider,
+                    model_name=model_name,
+                    tenant_id=tenant_id,
+                )
+                # Merge customized-metric scores back into results (mirrors the default pass).
+                evaluated_by_index = {r.index: r for r in evaluated_results}
+                for i, result in enumerate(results):
+                    if result.index in evaluated_by_index:
+                        results[i] = evaluated_by_index[result.index]
+            except Exception:
+                logger.exception("Failed to compute customized metrics for evaluation run %s", evaluation_run_id)
 
         # Phase 4: Persist individual items
         for result in results:
diff --git a/api/core/evaluation/runners/llm_evaluation_runner.py b/api/core/evaluation/runners/llm_evaluation_runner.py
index b34d57f9f6..7221ad08fc 100644
--- a/api/core/evaluation/runners/llm_evaluation_runner.py
+++ b/api/core/evaluation/runners/llm_evaluation_runner.py
@@ -26,8 +26,8 @@ class LLMEvaluationRunner(BaseEvaluationRunner):
 
     def evaluate_metrics(
         self,
-        node_run_result_mapping: dict[str, NodeRunResult] | None,
-        node_run_result: NodeRunResult | None,
+        node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None,
+        node_run_result_list: list[NodeRunResult] | None,
         default_metric: DefaultMetric | None,
         customized_metrics: CustomizedMetrics | None,
         model_provider: str,
@@ -36,7 +36,9 @@ class LLMEvaluationRunner(BaseEvaluationRunner):
     ) -> list[EvaluationItemResult]:
         """Use the evaluation instance to compute LLM metrics."""
-        # Merge actual_output into items for evaluation
-        merged_items = self._merge_results_into_items(items, results)
+        # Convert node run results into evaluation items before scoring
+        if not node_run_result_list:
+            return []
+        merged_items = self._merge_results_into_items(node_run_result_list)
         return self.evaluation_instance.evaluate_llm(
             merged_items, default_metrics, model_provider, model_name, tenant_id
         )
@@ -70,23 +72,21 @@ class LLMEvaluationRunner(BaseEvaluationRunner):
 
     @staticmethod
     def _merge_results_into_items(
-        items: list[EvaluationItemInput],
-        results: list[EvaluationItemResult],
+        items: list[NodeRunResult],
     ) -> list[EvaluationItemInput]:
-        """Create new items with actual_output set as expected_output context for metrics."""
-        result_by_index = {r.index: r for r in results}
+        """Convert node run results into evaluation items for metric computation."""
         merged = []
         for item in items:
-            result = result_by_index.get(item.index)
-            if result and result.actual_output:
-                merged.append(
-                    EvaluationItemInput(
-                        index=item.index,
-                        inputs=item.inputs,
-                        expected_output=item.expected_output,
-                        context=[result.actual_output] + (item.context or []),
-                    )
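+            # Flatten each node run into an EvaluationItemInput: the prompt is
+            # the sole input and the node's output is the answer being judged.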
+            merged.append(
+                EvaluationItemInput(
+                    index=item.index,
+                    inputs={
+                        "prompt": item.prompt,
+                    },
+                    output=item.output,
+                    expected_output=item.expected_output,
                 )
-            else:
-                merged.append(item)
+            )
         return merged
diff --git a/api/core/evaluation/runners/retrieval_evaluation_runner.py b/api/core/evaluation/runners/retrieval_evaluation_runner.py
index 44bf67ea34..311edee2d5 100644
--- a/api/core/evaluation/runners/retrieval_evaluation_runner.py
+++ b/api/core/evaluation/runners/retrieval_evaluation_runner.py
@@ -24,8 +24,8 @@ class RetrievalEvaluationRunner(BaseEvaluationRunner):
 
     def evaluate_metrics(
         self,
-        node_run_result_mapping: dict[str, NodeRunResult] | None,
-        node_run_result: NodeRunResult | None,
+        node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None,
+        node_run_result_list: list[NodeRunResult] | None,
         default_metric: DefaultMetric | None,
         customized_metrics: CustomizedMetrics | None,
         model_provider: str,
diff --git a/api/core/evaluation/runners/snippet_evaluation_runner.py b/api/core/evaluation/runners/snippet_evaluation_runner.py
index b7e16f9772..8bef78ce22 100644
--- a/api/core/evaluation/runners/snippet_evaluation_runner.py
+++ b/api/core/evaluation/runners/snippet_evaluation_runner.py
@@ -92,8 +92,8 @@ class SnippetEvaluationRunner(BaseEvaluationRunner):
 
     def evaluate_metrics(
         self,
-        node_run_result_mapping: dict[str, NodeRunResult] | None,
-        node_run_result: NodeRunResult | None,
+        node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None,
+        node_run_result_list: list[NodeRunResult] | None,
         default_metric: DefaultMetric | None,
         customized_metrics: CustomizedMetrics | None,
         model_provider: str,
diff --git a/api/core/evaluation/runners/workflow_evaluation_runner.py b/api/core/evaluation/runners/workflow_evaluation_runner.py
index 2fcab86ef0..c0fcfb4114 100644
--- a/api/core/evaluation/runners/workflow_evaluation_runner.py
+++ b/api/core/evaluation/runners/workflow_evaluation_runner.py
@@ -26,8 +26,8 @@ class WorkflowEvaluationRunner(BaseEvaluationRunner):
 
     def evaluate_metrics(
         self,
-        node_run_result_mapping: dict[str, NodeRunResult] | None,
-        node_run_result: NodeRunResult | None,
+        node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None,
+        node_run_result_list: list[NodeRunResult] | None,
         default_metric: DefaultMetric | None,
         customized_metrics: CustomizedMetrics | None,
         model_provider: str,
diff --git a/api/tasks/evaluation_task.py b/api/tasks/evaluation_task.py
index 2304b01586..8c08c09c7b 100644
--- a/api/tasks/evaluation_task.py
+++ b/api/tasks/evaluation_task.py
@@ -27,6 +27,7 @@ from extensions.ext_database import db
 from libs.datetime_utils import naive_utc_now
 from models.evaluation import EvaluationRun, EvaluationRunStatus
 from models.model import UploadFile
+from services.evaluation_service import EvaluationService
 
 logger = logging.getLogger(__name__)
 
@@ -76,7 +77,20 @@ def _execute_evaluation(session: Any, run_data: EvaluationRunData) -> None:
     if evaluation_instance is None:
         raise ValueError("Evaluation framework not configured")
 
-    _execute_evaluation_runner(session, run_data, evaluation_instance, node_run_result_mapping)
+    evaluation_service = EvaluationService()
+    node_run_result_mapping_list: list[dict[str, NodeRunResult]] = evaluation_service.execute_targets(
+        tenant_id=run_data.tenant_id,
+        target_type=run_data.target_type,
+        target_id=run_data.target_id,
+        input_list=run_data.input_list,
+    )
+
+    results: list[EvaluationItemResult] = _execute_evaluation_runner(
+        session,
+        run_data,
+        evaluation_instance,
+        node_run_result_mapping_list,
+    )
 
     # Compute summary metrics
 
@@ -106,15 +120,19 @@ def _execute_evaluation_runner(
     session: Any,
     run_data: EvaluationRunData,
     evaluation_instance: BaseEvaluationInstance,
-    node_run_result_mapping: dict[str, NodeRunResult],
+    node_run_result_mapping_list: list[dict[str, NodeRunResult]],
 ) -> list[EvaluationItemResult]:
     """Execute the evaluation runner."""
     default_metrics = run_data.default_metrics
     customized_metrics = run_data.customized_metrics
     for default_metric in default_metrics:
         for node_info in default_metric.node_info_list:
-            node_run_result = node_run_result_mapping.get(node_info.node_id)
-            if node_run_result:
+            node_run_result_list: list[NodeRunResult] = []
+            for node_run_result_mapping in node_run_result_mapping_list:
+                node_run_result = node_run_result_mapping.get(node_info.node_id)
+                if node_run_result is not None:
+                    node_run_result_list.append(node_run_result)
+            if node_run_result_list:
                 runner = _create_runner(EvaluationCategory(node_info.type), evaluation_instance, session)
                 runner.run(
                     evaluation_run_id=run_data.evaluation_run_id,
                     target_id=run_data.target_id,
                     tenant_id=run_data.tenant_id,
                     target_type=run_data.target_type,
                     default_metric=default_metric,
                     customized_metrics=None,
                     model_provider=run_data.evaluation_model_provider,
                     model_name=run_data.evaluation_model,
-                    node_run_result=node_run_result,
+                    node_run_result_list=node_run_result_list,
                 )
-            else:
-                default_metric.score = 0
     if customized_metrics:
         runner = _create_runner(EvaluationCategory.WORKFLOW, evaluation_instance, session)
         runner.run(
             evaluation_run_id=run_data.evaluation_run_id,
             tenant_id=run_data.tenant_id,
             target_id=run_data.target_id,
             target_type=run_data.target_type,
             default_metric=None,
             customized_metrics=customized_metrics,
-            node_run_result=None,
-            node_run_result_mapping=node_run_result_mapping,
+            node_run_result_list=None,
+            node_run_result_mapping_list=node_run_result_mapping_list,
         )
 
 
 def _create_runner(