From 1d248053e60dcb1ff974fd754b2a69cf2fb3b687 Mon Sep 17 00:00:00 2001
From: jyong <718720800@qq.com>
Date: Thu, 12 Mar 2026 14:32:36 +0800
Subject: [PATCH] evaluation runtime

---
 .../frameworks/ragas/ragas_evaluator.py      | 138 +++++++++++-------
 .../runners/retrieval_evaluation_runner.py   |   6 +-
 2 files changed, 84 insertions(+), 60 deletions(-)

diff --git a/api/core/evaluation/frameworks/ragas/ragas_evaluator.py b/api/core/evaluation/frameworks/ragas/ragas_evaluator.py
index 70a544a38f..88588f0d6e 100644
--- a/api/core/evaluation/frameworks/ragas/ragas_evaluator.py
+++ b/api/core/evaluation/frameworks/ragas/ragas_evaluator.py
@@ -92,13 +92,8 @@ class RagasEvaluator(BaseEvaluationInstance):
         tenant_id: str,
         category: EvaluationCategory,
     ) -> list[EvaluationItemResult]:
-        """Core evaluation logic using RAGAS.
-
-        Uses the Dify model wrapper as judge LLM. Falls back to simple
-        string similarity if RAGAS import fails.
-        """
+        """Core evaluation logic using RAGAS."""
         model_wrapper = DifyModelWrapper(model_provider, model_name, tenant_id)
-        # Extract metric names from metric_name string.
         requested_metrics = (
             [metric_name]
             if metric_name
@@ -118,54 +113,88 @@ class RagasEvaluator(BaseEvaluationInstance):
         model_wrapper: DifyModelWrapper,
         category: EvaluationCategory,
     ) -> list[EvaluationItemResult]:
-        """Evaluate using RAGAS library."""
-        from ragas import evaluate as ragas_evaluate
-        from ragas.dataset_schema import EvaluationDataset, SingleTurnSample
-
-        # Build RAGAS dataset
-        samples = []
+        """Evaluate using the RAGAS library.
+
+        Builds SingleTurnSample differently per category to match ragas requirements:
+        - LLM/Workflow: user_input=prompt, response=output, reference=expected_output
+        - Retrieval: user_input=query, reference=expected_output, retrieved_contexts=context
+        - Agent: not supported via EvaluationDataset (requires the message-based API)
+        """
+        from ragas import evaluate as ragas_evaluate
+        from ragas.dataset_schema import EvaluationDataset
+
+        samples: list[Any] = []
         for item in items:
-            sample = SingleTurnSample(
-                user_input=self._inputs_format(item.inputs, category),
-                response=item.expected_output or "",
-                retrieved_contexts=item.context or [],
-            )
-            if item.expected_output:
-                sample.reference = item.expected_output
+            sample = self._build_sample(item, category)
             samples.append(sample)
 
         dataset = EvaluationDataset(samples=samples)
-
-        # Build metric instances
         ragas_metrics = self._build_ragas_metrics(requested_metrics)
-
         if not ragas_metrics:
             logger.warning("No valid RAGAS metrics found for: %s", requested_metrics)
             return [EvaluationItemResult(index=item.index) for item in items]
 
-        # Run RAGAS evaluation
         try:
             result = ragas_evaluate(
                 dataset=dataset,
                 metrics=ragas_metrics,
             )
 
-            # Convert RAGAS results to our format
-            results = []
+            results: list[EvaluationItemResult] = []
             result_df = result.to_pandas()
             for i, item in enumerate(items):
-                metrics = []
-                for metric_name in requested_metrics:
-                    if metric_name in result_df.columns:
-                        score = result_df.iloc[i][metric_name]
-                        if score is not None and not (isinstance(score, float) and score != score):  # NaN check
-                            metrics.append(EvaluationMetric(name=metric_name, value=float(score)))
+                metrics: list[EvaluationMetric] = []
+                for m_name in requested_metrics:
+                    if m_name in result_df.columns:
+                        score = result_df.iloc[i][m_name]
+                        if score is not None and not (isinstance(score, float) and score != score):  # NaN check
+                            metrics.append(EvaluationMetric(name=m_name, value=float(score)))
                 results.append(EvaluationItemResult(index=item.index, metrics=metrics))
             return results
         except Exception:
             logger.exception("RAGAS evaluation failed, falling back to simple evaluation")
             return self._evaluate_simple(items, requested_metrics, model_wrapper)
 
+    @staticmethod
+    def _build_sample(item: EvaluationItemInput, category: EvaluationCategory) -> Any:
+        """Build a ragas SingleTurnSample with the correct fields per category.
+
+        ragas metric field requirements:
+        - faithfulness: user_input, response, retrieved_contexts
+        - answer_relevancy: user_input, response
+        - answer_correctness: user_input, response, reference
+        - semantic_similarity: user_input, response, reference
+        - context_precision: user_input, reference, retrieved_contexts
+        - context_recall: user_input, reference, retrieved_contexts
+        - context_relevance: user_input, retrieved_contexts
+        """
+        from ragas.dataset_schema import SingleTurnSample
+
+        user_input = _format_input(item.inputs, category)
+
+        match category:
+            case EvaluationCategory.LLM:
+                # response = actual LLM output, reference = expected output
+                return SingleTurnSample(
+                    user_input=user_input,
+                    response=item.output,
+                    reference=item.expected_output or "",
+                    retrieved_contexts=item.context or [],
+                )
+            case EvaluationCategory.RETRIEVAL:
+                # context_precision/recall only need reference + retrieved_contexts
+                return SingleTurnSample(
+                    user_input=user_input,
+                    reference=item.expected_output or "",
+                    retrieved_contexts=item.context or [],
+                )
+            case _:
+                return SingleTurnSample(
+                    user_input=user_input,
+                    response=item.output,
+                )
+
     def _evaluate_simple(
         self,
         items: list[EvaluationItemInput],
@@ -173,18 +202,15 @@ class RagasEvaluator(BaseEvaluationInstance):
         model_wrapper: DifyModelWrapper,
     ) -> list[EvaluationItemResult]:
         """Simple LLM-as-judge fallback when RAGAS is not available."""
-        results = []
+        results: list[EvaluationItemResult] = []
         for item in items:
-            metrics = []
-            query = self._inputs_to_query(item.inputs)
-
-            for metric_name in requested_metrics:
+            metrics: list[EvaluationMetric] = []
+            for m_name in requested_metrics:
                 try:
-                    score = self._judge_with_llm(model_wrapper, metric_name, query, item)
-                    metrics.append(EvaluationMetric(name=metric_name, value=score))
+                    score = self._judge_with_llm(model_wrapper, m_name, item)
+                    metrics.append(EvaluationMetric(name=m_name, value=score))
                 except Exception:
-                    logger.exception("Failed to compute metric %s for item %d", metric_name, item.index)
-
+                    logger.exception("Failed to compute metric %s for item %d", m_name, item.index)
             results.append(EvaluationItemResult(index=item.index, metrics=metrics))
         return results
 
@@ -192,19 +218,20 @@ class RagasEvaluator(BaseEvaluationInstance):
         self,
         model_wrapper: DifyModelWrapper,
         metric_name: str,
-        query: str,
         item: EvaluationItemInput,
     ) -> float:
         """Use the LLM to judge a single metric for a single item."""
-        prompt = self._build_judge_prompt(metric_name, query, item)
+        prompt = self._build_judge_prompt(metric_name, item)
         response = model_wrapper.invoke(prompt)
         return self._parse_score(response)
 
-    def _build_judge_prompt(self, metric_name: str, query: str, item: EvaluationItemInput) -> str:
+    @staticmethod
+    def _build_judge_prompt(metric_name: str, item: EvaluationItemInput) -> str:
         """Build a scoring prompt for the LLM judge."""
         parts = [
             f"Evaluate the following on the metric '{metric_name}' using a scale of 0.0 to 1.0.",
-            f"\nQuery: {query}",
+            f"\nInput: {item.inputs}",
+            f"\nOutput: {item.output}",
         ]
         if item.expected_output:
             parts.append(f"\nExpected Output: {item.expected_output}")
@@ -218,31 +245,19 @@ class RagasEvaluator(BaseEvaluationInstance):
     @staticmethod
     def _parse_score(response: str) -> float:
         """Parse a float score from LLM response."""
+        import re
+
        cleaned = response.strip()
         try:
             score = float(cleaned)
             return max(0.0, min(1.0, score))
         except ValueError:
-            # Try to extract first number from response
-            import re
-
             match = re.search(r"(\d+\.?\d*)", cleaned)
             if match:
                 score = float(match.group(1))
                 return max(0.0, min(1.0, score))
             return 0.0
 
-    @staticmethod
-    def _inputs_format(inputs: dict[str, Any], category: EvaluationCategory) -> str:
-        """Convert input dict to a prompt string."""
-        match category:
-            case EvaluationCategory.LLM:
-                return str(inputs["prompt"])
-            case EvaluationCategory.RETRIEVAL:
-                return str(inputs["query"])
-            case _:
-                return ""
-
     @staticmethod
     def _build_ragas_metrics(requested_metrics: list[str]) -> list[Any]:
         """Build RAGAS metric instances from metric names."""
@@ -280,3 +295,14 @@ class RagasEvaluator(BaseEvaluationInstance):
         except ImportError:
             logger.warning("RAGAS metrics not available")
             return []
+
+
+def _format_input(inputs: dict[str, Any], category: EvaluationCategory) -> str:
+    """Extract the user-facing input string from the inputs dict."""
+    match category:
+        case EvaluationCategory.LLM | EvaluationCategory.WORKFLOW:
+            return str(inputs.get("prompt", ""))
+        case EvaluationCategory.RETRIEVAL:
+            return str(inputs.get("query", ""))
+        case _:
+            return str(next(iter(inputs.values()), "")) if inputs else ""
diff --git a/api/core/evaluation/runners/retrieval_evaluation_runner.py b/api/core/evaluation/runners/retrieval_evaluation_runner.py
index a11032ec63..d1dc2a4151 100644
--- a/api/core/evaluation/runners/retrieval_evaluation_runner.py
+++ b/api/core/evaluation/runners/retrieval_evaluation_runner.py
@@ -42,13 +42,11 @@ class RetrievalEvaluationRunner(BaseEvaluationRunner):
         for i, node_result in enumerate(node_run_result_list):
             # Extract retrieved contexts from outputs
             outputs = node_result.outputs
-            contexts = list(outputs.get("retrieved_contexts", []))
             query = self._extract_query(dict(node_result.inputs))
             # Extract retrieved content from result list
             result_list = outputs.get("result", [])
-            output = "\n---\n".join(
-                str(item.get("content", "")) for item in result_list if item.get("content")
-            )
+            contexts = [item.get("content", "") for item in result_list if item.get("content")]
+            output = "\n---\n".join(contexts)
 
             merged_items.append(
                 EvaluationItemInput(
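
Reviewer notes (illustrative sketches, not part of the patch):

1) Per-category sample construction. A minimal end-to-end sketch of the field
mapping _build_sample adopts. The sample values are invented, and the lowercase
metric instances (context_precision, context_recall) plus a default-configured
judge LLM are assumed from the installed ragas version; in the patch itself the
DifyModelWrapper plays the judge role:

    from ragas import evaluate
    from ragas.dataset_schema import EvaluationDataset, SingleTurnSample
    from ragas.metrics import context_precision, context_recall

    # Retrieval-style sample: no `response` field, because context_precision
    # and context_recall compare retrieved_contexts against the reference
    # answer rather than against a generated response.
    sample = SingleTurnSample(
        user_input="What is the capital of France?",
        reference="Paris is the capital of France.",
        retrieved_contexts=["Paris is the capital and largest city of France."],
    )

    result = evaluate(
        dataset=EvaluationDataset(samples=[sample]),
        metrics=[context_precision, context_recall],
    )
    print(result.to_pandas())  # one column per metric; NaN where scoring failed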
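
2) Fallback score parsing. The LLM-as-judge fallback depends on _parse_score
being forgiving about reply formats; a behavior sketch with hypothetical judge
replies (scores are clamped to [0.0, 1.0]):

    assert RagasEvaluator._parse_score("0.85") == 0.85                      # direct float parse
    assert RagasEvaluator._parse_score("Score: 0.85, because ...") == 0.85  # first number via regex
    assert RagasEvaluator._parse_score("1.7") == 1.0                        # clamped at the upper bound
    assert RagasEvaluator._parse_score("I cannot judge this.") == 0.0       # no number found -> default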
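
3) Runner consistency. The retrieval runner now derives contexts and output
from the same `result` list, so the contexts handed to ragas can no longer
diverge from the joined output string (the data below is invented):

    result_list = [
        {"content": "chunk A"},
        {"content": ""},  # dropped by the `if item.get("content")` guard
        {"content": "chunk B"},
    ]
    contexts = [item.get("content", "") for item in result_list if item.get("content")]
    output = "\n---\n".join(contexts)
    assert contexts == ["chunk A", "chunk B"]
    assert output == "chunk A\n---\nchunk B"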