diff --git a/api/controllers/console/evaluation/evaluation.py b/api/controllers/console/evaluation/evaluation.py index e994bbf5c2..c64d6fc806 100644 --- a/api/controllers/console/evaluation/evaluation.py +++ b/api/controllers/console/evaluation/evaluation.py @@ -184,6 +184,13 @@ evaluation_default_metrics_response_model = console_ns.model( }, ) +evaluation_dataset_columns_response_model = console_ns.model( + "EvaluationDatasetColumnsResponse", + { + "columns": fields.List(fields.String), + }, +) + def get_evaluation_target[**P, R](view_func: Callable[P, R]) -> Callable[P, R]: """ @@ -326,9 +333,9 @@ class EvaluationDetailApi(Resource): return { "evaluation_model": config.evaluation_model, "evaluation_model_provider": config.evaluation_model_provider, - "default_metrics": config.default_metrics_list, + "default_metrics": EvaluationService.serialize_console_default_metrics(config.default_metrics_list), "customized_metrics": config.customized_metrics_dict, - "judgment_config": config.judgment_config_dict, + "judgment_config": EvaluationService.serialize_console_judgment_config(config.judgment_config_dict), } @console_ns.doc("save_evaluation_detail") @@ -364,9 +371,36 @@ class EvaluationDetailApi(Resource): return { "evaluation_model": config.evaluation_model, "evaluation_model_provider": config.evaluation_model_provider, - "default_metrics": config.default_metrics_list, + "default_metrics": EvaluationService.serialize_console_default_metrics(config.default_metrics_list), "customized_metrics": config.customized_metrics_dict, - "judgment_config": config.judgment_config_dict, + "judgment_config": EvaluationService.serialize_console_judgment_config(config.judgment_config_dict), + } + + +@console_ns.route("/<string:target_type>/<string:target_id>/evaluation/template-columns") +class EvaluationTemplateColumnsApi(Resource): + @console_ns.doc("get_evaluation_template_columns") + @console_ns.response(200, "Evaluation dataset columns resolved", evaluation_dataset_columns_response_model) + @console_ns.response(400, "Invalid request body") + @console_ns.response(404, "Target not found") + @setup_required + @login_required + @account_initialization_required + @get_evaluation_target + def post(self, target: Union[App, CustomizedSnippet], target_type: str): + """Return the dataset column names implied by the current evaluation config.""" + body = request.get_json(silent=True) or {} + try: + config_data = EvaluationConfigData.model_validate(body) + except Exception as e: + raise BadRequest(f"Invalid request body: {e}") + + return { + "columns": EvaluationService.get_dataset_column_names( + target=target, + target_type=target_type, + data=config_data, + ) } @@ -595,6 +629,8 @@ class EvaluationMetricsApi(Resource): """ result = {} for category in EvaluationCategory: + if category in EvaluationService.CONSOLE_DISABLED_CATEGORIES: + continue result[category.value] = EvaluationService.get_supported_metrics(category) return {"metrics": result} @@ -622,7 +658,11 @@ class EvaluationDefaultMetricsApi(Resource): target=target, target_type=target_type, ) - return {"default_metrics": [m.model_dump() for m in default_metrics]} + return { + "default_metrics": [ + m.model_dump() for m in EvaluationService.filter_console_default_metrics(default_metrics) + ] + } @console_ns.route("/<string:target_type>/<string:target_id>/evaluation/node-info") @@ -652,6 +692,20 @@ class EvaluationNodeInfoApi(Resource): target_type=target_type, metrics=metrics, ) + if not metrics: + result = { + "all": [ + node + for node in result.get("all", []) + if node.get("type") not in
EvaluationService.CONSOLE_DISABLED_CATEGORIES ] } else: result = { metric: nodes for metric, nodes in result.items() if metric not in EvaluationService.CONSOLE_DISABLED_METRICS } return result @@ -664,7 +718,13 @@ class EvaluationAvailableMetricsApi(Resource): @account_initialization_required def get(self): """Return the centrally-defined list of evaluation metrics.""" - return {"metrics": EvaluationService.get_available_metrics()} + return { + "metrics": [ + metric + for metric in EvaluationService.get_available_metrics() + if metric not in EvaluationService.CONSOLE_DISABLED_METRICS + ] + } @console_ns.route("/<string:target_type>/<string:target_id>/evaluation/files/<string:file_id>") diff --git a/api/core/evaluation/entities/evaluation_entity.py b/api/core/evaluation/entities/evaluation_entity.py index a87354b526..b4c7b5e3cb 100644 --- a/api/core/evaluation/entities/evaluation_entity.py +++ b/api/core/evaluation/entities/evaluation_entity.py @@ -1,3 +1,4 @@ +import json from enum import StrEnum from typing import Any @@ -164,9 +165,62 @@ class EvaluationItemInput(BaseModel): class EvaluationDatasetInput(BaseModel): + """Parsed dataset row used throughout evaluation execution. + + ``expected_output`` keeps backward compatibility with the original + single-reference template. When users upload node-specific reference + columns such as ``LLM 1 : expected_output``, they are stored in + ``expected_outputs`` and resolved by node title at execution time. + """ + index: int inputs: dict[str, Any] expected_output: str | None = None + expected_outputs: dict[str, str] = Field(default_factory=dict) + + def get_expected_output_for_node(self, node_title: str | None) -> str | None: + """Return the best matching reference answer for the given node title.""" + if node_title: + if node_title in self.expected_outputs: + return self.expected_outputs[node_title] + + if self.expected_output is not None: + return self.expected_output + + if len(self.expected_outputs) == 1: + return next(iter(self.expected_outputs.values())) + + return None + + def serialize_expected_output(self) -> str | None: + """Serialize references for persistence and API responses. + + Single-reference datasets stay unchanged, while multi-node references + are stored as JSON so history/detail APIs can still expose the full + uploaded payload without changing the database schema.
+ """ + if self.expected_output is not None and not self.expected_outputs: + return self.expected_output + + if not self.expected_outputs: + return None + + serialized_expected_outputs = dict(self.expected_outputs) + if self.expected_output is not None: + serialized_expected_outputs = {"expected_output": self.expected_output, **serialized_expected_outputs} + + return json.dumps(serialized_expected_outputs, ensure_ascii=False, sort_keys=True) + + def iter_expected_output_columns(self) -> list[tuple[str, str]]: + """Return uploaded expected-output columns in display order.""" + columns: list[tuple[str, str]] = [] + if self.expected_output is not None: + columns.append(("expected_output", self.expected_output)) + + for node_title, value in self.expected_outputs.items(): + columns.append((f"{node_title} : expected_output", value)) + + return columns class EvaluationItemResult(BaseModel): diff --git a/api/core/evaluation/entities/judgment_entity.py b/api/core/evaluation/entities/judgment_entity.py index 4a59879c06..dd7b2f9855 100644 --- a/api/core/evaluation/entities/judgment_entity.py +++ b/api/core/evaluation/entities/judgment_entity.py @@ -26,10 +26,19 @@ Typical usage:: from collections.abc import Sequence from typing import Any, Literal -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, field_validator from graphon.utils.condition.entities import SupportedComparisonOperator +COMPARISON_OPERATOR_ALIASES: dict[str, str] = { + "==": "=", + "!=": "≠", + ">=": "≥", + "<=": "≤", + "is null": "null", + "is not null": "not null", +} + class JudgmentCondition(BaseModel): """A single judgment condition that checks one metric value. @@ -48,6 +57,19 @@ class JudgmentCondition(BaseModel): comparison_operator: SupportedComparisonOperator value: str | Sequence[str] | bool | None = None + @field_validator("comparison_operator", mode="before") + @classmethod + def normalize_comparison_operator(cls, value: Any) -> Any: + """Accept common ASCII/API aliases for workflow comparison operators.""" + if not isinstance(value, str): + return value + + normalized_value = value.strip().lower() + alias = COMPARISON_OPERATOR_ALIASES.get(normalized_value) + if alias is not None: + return alias + return value.strip() + class JudgmentConfig(BaseModel): """A group of judgment conditions combined with a logical operator. 
diff --git a/api/core/evaluation/frameworks/ragas/ragas_evaluator.py b/api/core/evaluation/frameworks/ragas/ragas_evaluator.py index ec2320439d..50e8d4d15a 100644 --- a/api/core/evaluation/frameworks/ragas/ragas_evaluator.py +++ b/api/core/evaluation/frameworks/ragas/ragas_evaluator.py @@ -1,4 +1,5 @@ import logging +from importlib import import_module from typing import Any from core.evaluation.base_evaluation_instance import BaseEvaluationInstance @@ -138,12 +139,13 @@ class RagasEvaluator(BaseEvaluationInstance): ragas_metrics = self._build_ragas_metrics(requested_metrics) if not ragas_metrics: logger.warning("No valid RAGAS metrics found for: %s", requested_metrics) - return [EvaluationItemResult(index=item.index) for item in items] + return [EvaluationItemResult(index=item.index, actual_output=item.output) for item in items] try: result = ragas_evaluate( dataset=dataset, metrics=ragas_metrics, + llm=model_wrapper, ) results: list[EvaluationItemResult] = [] @@ -155,7 +157,7 @@ class RagasEvaluator(BaseEvaluationInstance): score = result_df.iloc[i][m_name] if score is not None and not (isinstance(score, float) and score != score): metrics.append(EvaluationMetric(name=m_name, value=float(score))) - results.append(EvaluationItemResult(index=item.index, metrics=metrics)) + results.append(EvaluationItemResult(index=item.index, metrics=metrics, actual_output=item.output)) return results except Exception: logger.exception("RAGAS evaluation failed, falling back to simple evaluation") @@ -216,7 +218,7 @@ class RagasEvaluator(BaseEvaluationInstance): metrics.append(EvaluationMetric(name=m_name, value=score)) except Exception: logger.exception("Failed to compute metric %s for item %d", m_name, item.index) - results.append(EvaluationItemResult(index=item.index, metrics=metrics)) + results.append(EvaluationItemResult(index=item.index, metrics=metrics, actual_output=item.output)) return results def _judge_with_llm( @@ -265,34 +267,33 @@ class RagasEvaluator(BaseEvaluationInstance): def _build_ragas_metrics(requested_metrics: list[str]) -> list[Any]: """Build RAGAS metric instances from canonical metric names.""" try: - from ragas.metrics.collections import ( - AnswerCorrectness, - AnswerRelevancy, - ContextPrecision, - ContextRecall, - ContextRelevance, - Faithfulness, - SemanticSimilarity, - ToolCallAccuracy, - ) + metrics_module = _import_ragas_metrics_module() # Maps canonical name → ragas metric class ragas_class_map: dict[str, Any] = { - EvaluationMetricName.FAITHFULNESS: Faithfulness, - EvaluationMetricName.ANSWER_RELEVANCY: AnswerRelevancy, - EvaluationMetricName.ANSWER_CORRECTNESS: AnswerCorrectness, - EvaluationMetricName.SEMANTIC_SIMILARITY: SemanticSimilarity, - EvaluationMetricName.CONTEXT_PRECISION: ContextPrecision, - EvaluationMetricName.CONTEXT_RECALL: ContextRecall, - EvaluationMetricName.CONTEXT_RELEVANCE: ContextRelevance, - EvaluationMetricName.TOOL_CORRECTNESS: ToolCallAccuracy, + EvaluationMetricName.FAITHFULNESS: getattr(metrics_module, "Faithfulness"), + EvaluationMetricName.ANSWER_RELEVANCY: getattr(metrics_module, "AnswerRelevancy"), + EvaluationMetricName.ANSWER_CORRECTNESS: getattr(metrics_module, "AnswerCorrectness"), + EvaluationMetricName.SEMANTIC_SIMILARITY: getattr(metrics_module, "SemanticSimilarity"), + EvaluationMetricName.CONTEXT_PRECISION: getattr(metrics_module, "ContextPrecision"), + EvaluationMetricName.CONTEXT_RECALL: getattr(metrics_module, "ContextRecall"), + EvaluationMetricName.CONTEXT_RELEVANCE: getattr(metrics_module, "ContextRelevance"), + 
EvaluationMetricName.TOOL_CORRECTNESS: getattr(metrics_module, "ToolCallAccuracy"), } metrics = [] for name in requested_metrics: metric_class = ragas_class_map.get(name) if metric_class: - metrics.append(metric_class()) + if name == EvaluationMetricName.ANSWER_CORRECTNESS: + # ragas answer_correctness blends factuality with semantic + # similarity. The latter requires an embeddings backend, + # which is not wired through Dify's evaluation stack yet. + # Keep the metric usable by relying on the factuality + # component only for now. + metrics.append(metric_class(weights=[1.0, 0.0], embeddings=_NoopRagasEmbeddings())) + else: + metrics.append(metric_class()) else: logger.warning("Metric '%s' is not supported by RAGAS, skipping", name) return metrics @@ -301,6 +302,38 @@ class RagasEvaluator(BaseEvaluationInstance): return [] +def _import_ragas_metrics_module() -> Any: + """Load ragas metric classes across supported ragas versions. + + ragas 0.3.x exposes metric classes from ``ragas.metrics`` while some older + versions used ``ragas.metrics.collections``. Support both so worker + environments do not silently drop all metrics because of a module path + mismatch. + """ + try: + return import_module("ragas.metrics") + except ImportError: + return import_module("ragas.metrics.collections") + + +class _NoopRagasEmbeddings: + """Placeholder embeddings for ragas metrics whose embedding branch is disabled. + + ragas eagerly injects a default embeddings backend for any metric that + subclasses ``MetricWithEmbeddings``. For answer_correctness we currently + disable the semantic-similarity weight, so no real embedding call should + happen. Supplying this placeholder keeps ragas from constructing its + default OpenAI embeddings client during setup. + """ + + async def aembed_query(self, text: str) -> list[float]: + del text + return [0.0] + + async def aembed_documents(self, texts: list[str]) -> list[list[float]]: + return [[0.0] for _ in texts] + + def _format_input(inputs: dict[str, Any], category: EvaluationCategory) -> str: """Extract the user-facing input string from the inputs dict.""" match category: diff --git a/api/core/evaluation/frameworks/ragas/ragas_model_wrapper.py b/api/core/evaluation/frameworks/ragas/ragas_model_wrapper.py index e0a5e14914..7f64a9e438 100644 --- a/api/core/evaluation/frameworks/ragas/ragas_model_wrapper.py +++ b/api/core/evaluation/frameworks/ragas/ragas_model_wrapper.py @@ -1,26 +1,53 @@ +import asyncio import logging from typing import Any +from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, SystemMessage +from langchain_core.outputs import ChatGeneration, LLMResult +from langchain_core.prompt_values import PromptValue + +try: + from ragas.llms.base import BaseRagasLLM +except ImportError: + class BaseRagasLLM: # type: ignore[no-redef] + """Lightweight shim so the module stays importable without ragas installed.""" + + def __init__(self, *args: Any, **kwargs: Any) -> None: + del args, kwargs + + @staticmethod + def get_temperature(n: int) -> float: + return 0.3 if n > 1 else 1e-8 + logger = logging.getLogger(__name__) -class DifyModelWrapper: - """Wraps Dify's model invocation interface for use by RAGAS as an LLM judge. +class DifyModelWrapper(BaseRagasLLM): + """Bridge Dify model invocation to ragas and fallback LLM-as-judge flows. - RAGAS requires an LLM to compute certain metrics (faithfulness, answer_relevancy, etc.). - This wrapper bridges Dify's ModelInstance to a callable that RAGAS can use. 
+ ragas can accept a custom ``BaseRagasLLM`` instance. Using one here keeps + evaluation requests on Dify's provider stack instead of falling back to + ragas' default OpenAI factory, which would require standalone environment + credentials and bypass tenant-scoped model configuration. """ - def __init__(self, model_provider: str, model_name: str, tenant_id: str): + model_provider: str + model_name: str + tenant_id: str + user_id: str | None + + def __init__(self, model_provider: str, model_name: str, tenant_id: str, user_id: str | None = None): + super().__init__() self.model_provider = model_provider self.model_name = model_name self.tenant_id = tenant_id + self.user_id = user_id def _get_model_instance(self) -> Any: - from core.model_manager import ModelManager - from core.model_runtime.entities.model_entities import ModelType + from core.plugin.impl.model_runtime_factory import create_plugin_model_manager + from graphon.model_runtime.entities.model_entities import ModelType - model_manager = ModelManager() + model_manager = create_plugin_model_manager(tenant_id=self.tenant_id, user_id=self.user_id) model_instance = model_manager.get_model_instance( tenant_id=self.tenant_id, provider=self.model_provider, @@ -30,11 +57,8 @@ class DifyModelWrapper: return model_instance def invoke(self, prompt: str) -> str: - """Invoke the model with a text prompt and return the text response.""" - from core.model_runtime.entities.message_entities import ( - SystemPromptMessage, - UserPromptMessage, - ) + """Invoke the configured Dify model with a plain-text evaluation prompt.""" + from graphon.model_runtime.entities.message_entities import SystemPromptMessage, UserPromptMessage model_instance = self._get_model_instance() result = model_instance.invoke_llm( @@ -42,7 +66,100 @@ class DifyModelWrapper: SystemPromptMessage(content="You are an evaluation judge. Answer precisely and concisely."), UserPromptMessage(content=prompt), ], - model_parameters={"temperature": 0.0, "max_tokens": 2048}, + model_parameters={"temperature": 0.0}, stream=False, ) return result.message.content + + def generate_text( + self, + prompt: PromptValue, + n: int = 1, + temperature: float = 1e-8, + stop: list[str] | None = None, + callbacks: Any = None, + ) -> LLMResult: + """Implement ragas' sync LLM interface on top of Dify's model runtime.""" + del callbacks # Dify's invocation path does not currently use LangChain callbacks here. 
+ prompt_messages = _convert_prompt_value(prompt) + model_instance = self._get_model_instance() + + generations: list[list[ChatGeneration]] = [[]] + completions = max(1, n) + for _ in range(completions): + result = model_instance.invoke_llm( + prompt_messages=prompt_messages, + model_parameters={"temperature": temperature}, + stop=stop, + stream=False, + ) + text = result.message.content + generations[0].append( + ChatGeneration( + text=text, + message=AIMessage(content=text, response_metadata={"finish_reason": "stop"}), + generation_info={"finish_reason": "stop"}, + ) + ) + + return LLMResult(generations=generations) + + async def agenerate_text( + self, + prompt: PromptValue, + n: int = 1, + temperature: float | None = None, + stop: list[str] | None = None, + callbacks: Any = None, + ) -> LLMResult: + """Async ragas hook backed by the sync Dify invocation path.""" + return await asyncio.to_thread( + self.generate_text, + prompt, + n, + self.get_temperature(n) if temperature is None else temperature, + stop, + callbacks, + ) + + +def _convert_prompt_value(prompt: PromptValue) -> list[Any]: + """Translate LangChain prompt values into graphon prompt messages.""" + from graphon.model_runtime.entities.message_entities import ( + AssistantPromptMessage, + SystemPromptMessage, + UserPromptMessage, + ) + + prompt_messages: list[Any] = [] + for message in prompt.to_messages(): + content = _message_content_to_text(message) + if isinstance(message, SystemMessage): + prompt_messages.append(SystemPromptMessage(content=content)) + elif isinstance(message, AIMessage): + prompt_messages.append(AssistantPromptMessage(content=content)) + elif isinstance(message, HumanMessage): + prompt_messages.append(UserPromptMessage(content=content)) + else: + prompt_messages.append(UserPromptMessage(content=content)) + + return prompt_messages + + +def _message_content_to_text(message: BaseMessage) -> str: + """Flatten LangChain message content into a plain-text string for Dify.""" + if isinstance(message.content, str): + return message.content + + if isinstance(message.content, list): + parts: list[str] = [] + for block in message.content: + if isinstance(block, str): + parts.append(block) + elif isinstance(block, dict): + text = block.get("text") + if text: + parts.append(str(text)) + return "\n".join(part for part in parts if part) + + return str(message.content or "") diff --git a/api/core/evaluation/runners/agent_evaluation_runner.py b/api/core/evaluation/runners/agent_evaluation_runner.py index ef3bbe704c..aaf5d10e51 100644 --- a/api/core/evaluation/runners/agent_evaluation_runner.py +++ b/api/core/evaluation/runners/agent_evaluation_runner.py @@ -5,8 +5,10 @@ from typing import Any from core.evaluation.base_evaluation_instance import BaseEvaluationInstance from core.evaluation.entities.evaluation_entity import ( DefaultMetric, + EvaluationDatasetInput, EvaluationItemInput, EvaluationItemResult, + NodeInfo, ) from core.evaluation.runners.base_evaluation_runner import BaseEvaluationRunner from graphon.node_events import NodeRunResult @@ -27,6 +29,8 @@ class AgentEvaluationRunner(BaseEvaluationRunner): model_provider: str, model_name: str, tenant_id: str, + dataset_items: list[EvaluationDatasetInput] | None = None, + node_info: NodeInfo | None = None, ) -> list[EvaluationItemResult]: """Compute agent evaluation metrics.""" if not node_run_result_list: diff --git a/api/core/evaluation/runners/base_evaluation_runner.py b/api/core/evaluation/runners/base_evaluation_runner.py index 9046c2ddad..a1b5b29029 100644 --- 
a/api/core/evaluation/runners/base_evaluation_runner.py +++ b/api/core/evaluation/runners/base_evaluation_runner.py @@ -14,6 +14,8 @@ from abc import ABC, abstractmethod from core.evaluation.base_evaluation_instance import BaseEvaluationInstance from core.evaluation.entities.evaluation_entity import ( DefaultMetric, + EvaluationDatasetInput, EvaluationItemResult, + NodeInfo, ) from graphon.node_events import NodeRunResult @@ -40,6 +42,8 @@ class BaseEvaluationRunner(ABC): model_provider: str, model_name: str, tenant_id: str, + dataset_items: list[EvaluationDatasetInput] | None = None, + node_info: NodeInfo | None = None, ) -> list[EvaluationItemResult]: """Compute evaluation metrics on the collected results. diff --git a/api/core/evaluation/runners/llm_evaluation_runner.py b/api/core/evaluation/runners/llm_evaluation_runner.py index 4b1c244838..dc4f0e9975 100644 --- a/api/core/evaluation/runners/llm_evaluation_runner.py +++ b/api/core/evaluation/runners/llm_evaluation_runner.py @@ -1,12 +1,15 @@ import logging +import re from collections.abc import Mapping from typing import Any from core.evaluation.base_evaluation_instance import BaseEvaluationInstance from core.evaluation.entities.evaluation_entity import ( DefaultMetric, + EvaluationDatasetInput, EvaluationItemInput, EvaluationItemResult, + NodeInfo, ) from core.evaluation.runners.base_evaluation_runner import BaseEvaluationRunner from graphon.node_events import NodeRunResult @@ -27,11 +30,13 @@ class LLMEvaluationRunner(BaseEvaluationRunner): model_provider: str, model_name: str, tenant_id: str, + dataset_items: list[EvaluationDatasetInput] | None = None, + node_info: NodeInfo | None = None, ) -> list[EvaluationItemResult]: """Use the evaluation instance to compute LLM metrics.""" if not node_run_result_list: return [] - merged_items = self._merge_results_into_items(node_run_result_list) + merged_items = self._merge_results_into_items(node_run_result_list, dataset_items, node_info) return self.evaluation_instance.evaluate_llm( merged_items, [default_metric.metric], model_provider, model_name, tenant_id ) @@ -39,6 +44,8 @@ class LLMEvaluationRunner(BaseEvaluationRunner): @staticmethod def _merge_results_into_items( items: list[NodeRunResult], + dataset_items: list[EvaluationDatasetInput] | None = None, + node_info: NodeInfo | None = None, ) -> list[EvaluationItemInput]: """Create new items from NodeRunResult for ragas evaluation. @@ -48,13 +55,17 @@ class LLMEvaluationRunner(BaseEvaluationRunner): """ merged = [] for i, item in enumerate(items): - prompt = _format_prompts(item.process_data.get("prompts", [])) + prompts = item.process_data.get("prompts", []) + prompt = _format_prompts(prompts) output = _extract_llm_output(item.outputs) + dataset_item = dataset_items[i] if dataset_items and i < len(dataset_items) else None merged.append( EvaluationItemInput( index=i, inputs={"prompt": prompt}, output=output, + expected_output=dataset_item.get_expected_output_for_node(node_info.title if node_info else None) if dataset_item else None, + context=_extract_context_blocks(prompts), ) ) return merged @@ -81,3 +92,16 @@ def _extract_llm_output(outputs: Mapping[str, Any]) -> str: return str(outputs["answer"]) values = list(outputs.values()) return str(values[0]) if values else "" + + +def _extract_context_blocks(prompts: list[dict[str, Any]]) -> list[str] | None: + """Extract tagged context blocks from rendered prompts. + + Evaluation only treats prompt content wrapped in ``<context>...</context>`` + as retrieved evidence.
This keeps faithfulness-style metrics opt-in and + avoids guessing which arbitrary prompt text should be considered context. + """ + prompt_text = "\n".join(str(prompt.get("text", "")) for prompt in prompts) + matches = re.findall(r"<context>(.*?)</context>", prompt_text, flags=re.DOTALL) + contexts = [match.strip() for match in matches if match.strip()] + return contexts or None diff --git a/api/core/evaluation/runners/retrieval_evaluation_runner.py b/api/core/evaluation/runners/retrieval_evaluation_runner.py index 66b8ab7360..4d6c3e1dc7 100644 --- a/api/core/evaluation/runners/retrieval_evaluation_runner.py +++ b/api/core/evaluation/runners/retrieval_evaluation_runner.py @@ -4,8 +4,10 @@ from typing import Any from core.evaluation.base_evaluation_instance import BaseEvaluationInstance from core.evaluation.entities.evaluation_entity import ( DefaultMetric, + EvaluationDatasetInput, EvaluationItemInput, EvaluationItemResult, + NodeInfo, ) from core.evaluation.runners.base_evaluation_runner import BaseEvaluationRunner from graphon.node_events import NodeRunResult @@ -26,6 +28,8 @@ class RetrievalEvaluationRunner(BaseEvaluationRunner): model_provider: str, model_name: str, tenant_id: str, + dataset_items: list[EvaluationDatasetInput] | None = None, + node_info: NodeInfo | None = None, ) -> list[EvaluationItemResult]: """Compute retrieval evaluation metrics.""" if not node_run_result_list: @@ -38,12 +42,14 @@ class RetrievalEvaluationRunner(BaseEvaluationRunner): result_list = outputs.get("result", []) contexts = [item.get("content", "") for item in result_list if item.get("content")] output = "\n---\n".join(contexts) + dataset_item = dataset_items[i] if dataset_items and i < len(dataset_items) else None merged_items.append( EvaluationItemInput( index=i, inputs={"query": query}, output=output, + expected_output=dataset_item.get_expected_output_for_node(node_info.title if node_info else None) if dataset_item else None, context=contexts, ) ) diff --git a/api/core/evaluation/runners/snippet_evaluation_runner.py b/api/core/evaluation/runners/snippet_evaluation_runner.py index bc516f9ee8..c72d3e1d05 100644 --- a/api/core/evaluation/runners/snippet_evaluation_runner.py +++ b/api/core/evaluation/runners/snippet_evaluation_runner.py @@ -11,8 +11,10 @@ from typing import Any from core.evaluation.base_evaluation_instance import BaseEvaluationInstance from core.evaluation.entities.evaluation_entity import ( DefaultMetric, + EvaluationDatasetInput, EvaluationItemInput, EvaluationItemResult, + NodeInfo, ) from core.evaluation.runners.base_evaluation_runner import BaseEvaluationRunner from graphon.node_events import NodeRunResult @@ -33,6 +35,8 @@ class SnippetEvaluationRunner(BaseEvaluationRunner): model_provider: str, model_name: str, tenant_id: str, + dataset_items: list[EvaluationDatasetInput] | None = None, + node_info: NodeInfo | None = None, ) -> list[EvaluationItemResult]: """Compute evaluation metrics for snippet outputs.""" if not node_run_result_list: diff --git a/api/core/evaluation/runners/workflow_evaluation_runner.py b/api/core/evaluation/runners/workflow_evaluation_runner.py index e1cc9defdb..bb4642c8ce 100644 --- a/api/core/evaluation/runners/workflow_evaluation_runner.py +++ b/api/core/evaluation/runners/workflow_evaluation_runner.py @@ -5,8 +5,10 @@ from typing import Any from core.evaluation.base_evaluation_instance import BaseEvaluationInstance from core.evaluation.entities.evaluation_entity import ( DefaultMetric, + EvaluationDatasetInput, EvaluationItemInput, EvaluationItemResult, + NodeInfo, ) from
core.evaluation.runners.base_evaluation_runner import BaseEvaluationRunner from graphon.node_events import NodeRunResult @@ -27,6 +29,8 @@ class WorkflowEvaluationRunner(BaseEvaluationRunner): model_provider: str, model_name: str, tenant_id: str, + dataset_items: list[EvaluationDatasetInput] | None = None, + node_info: NodeInfo | None = None, ) -> list[EvaluationItemResult]: """Compute workflow evaluation metrics (end-to-end).""" if not node_run_result_list: diff --git a/api/docker/entrypoint.sh b/api/docker/entrypoint.sh index fc118df5bc..c2998c615c 100755 --- a/api/docker/entrypoint.sh +++ b/api/docker/entrypoint.sh @@ -35,10 +35,10 @@ if [[ "${MODE}" == "worker" ]]; then if [[ -z "${CELERY_QUEUES}" ]]; then if [[ "${EDITION}" == "CLOUD" ]]; then # Cloud edition: separate queues for dataset and trigger tasks - DEFAULT_QUEUES="api_token,dataset,dataset_summary,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow_professional,workflow_team,workflow_sandbox,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_publisher,trigger_refresh_executor,retention,workflow_based_app_execution" + DEFAULT_QUEUES="api_token,dataset,dataset_summary,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow_professional,workflow_team,workflow_sandbox,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_publisher,trigger_refresh_executor,retention,workflow_based_app_execution,evaluation" else # Community edition (SELF_HOSTED): dataset, pipeline and workflow have separate queues - DEFAULT_QUEUES="api_token,dataset,dataset_summary,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_publisher,trigger_refresh_executor,retention,workflow_based_app_execution" + DEFAULT_QUEUES="api_token,dataset,dataset_summary,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_publisher,trigger_refresh_executor,retention,workflow_based_app_execution,evaluation" fi else DEFAULT_QUEUES="${CELERY_QUEUES}" diff --git a/api/extensions/ext_celery.py b/api/extensions/ext_celery.py index 340f514fcc..1bec0fbc77 100644 --- a/api/extensions/ext_celery.py +++ b/api/extensions/ext_celery.py @@ -148,6 +148,7 @@ def init_app(app: DifyApp) -> Celery: "tasks.trigger_processing_tasks", # async trigger processing "tasks.generate_summary_index_task", # summary index generation "tasks.regenerate_summary_index_task", # summary index regeneration + "tasks.evaluation_task", # evaluation run execution ] day = dify_config.CELERY_BEAT_SCHEDULER_TIME diff --git a/api/services/evaluation_service.py b/api/services/evaluation_service.py index b5c9d682d5..97143efeaa 100644 --- a/api/services/evaluation_service.py +++ b/api/services/evaluation_service.py @@ -63,6 +63,17 @@ class EvaluationService: # Excluded app modes that don't support evaluation templates EXCLUDED_APP_MODES = {AppMode.RAG_PIPELINE} + CONSOLE_DISABLED_CATEGORIES = {EvaluationCategory.AGENT} + CONSOLE_DISABLED_METRICS = { + EvaluationMetricName.TOOL_CORRECTNESS.value, + EvaluationMetricName.TASK_COMPLETION.value, + } + METRICS_REQUIRING_EXPECTED_OUTPUT = { + EvaluationMetricName.ANSWER_CORRECTNESS.value, + 
EvaluationMetricName.SEMANTIC_SIMILARITY.value, + EvaluationMetricName.CONTEXT_PRECISION.value, + EvaluationMetricName.CONTEXT_RECALL.value, + } @classmethod def generate_dataset_template( @@ -400,6 +411,7 @@ class EvaluationService: target_id=target_id, evaluation_config_id=config.id, status=EvaluationRunStatus.PENDING, + dataset_file_id=run_request.file_id, total_items=len(items), created_by=account_id, ) @@ -588,11 +600,126 @@ class EvaluationService: def get_supported_metrics(cls, category: EvaluationCategory) -> list[str]: return EvaluationManager.get_supported_metrics(category) - @staticmethod - def get_available_metrics() -> list[str]: + @classmethod + def get_available_metrics(cls) -> list[str]: """Return the centrally-defined list of evaluation metrics.""" return [m.value for m in EvaluationMetricName] + @classmethod + def filter_console_default_metrics( + cls, + default_metrics: list[DefaultMetric | Mapping[str, Any]], + ) -> list[DefaultMetric]: + """Drop agent-only metrics/nodes from console-facing evaluation configs.""" + filtered: list[DefaultMetric] = [] + for raw_metric in default_metrics: + metric = raw_metric if isinstance(raw_metric, DefaultMetric) else DefaultMetric.model_validate(raw_metric) + if metric.metric in cls.CONSOLE_DISABLED_METRICS: + continue + + visible_nodes = [node for node in metric.node_info_list if node.type not in cls.CONSOLE_DISABLED_CATEGORIES] + if not visible_nodes: + continue + + filtered.append(metric.model_copy(update={"node_info_list": visible_nodes})) + return filtered + + @classmethod + def filter_console_judgment_config( + cls, + judgment_config: JudgmentConfig | Mapping[str, Any] | None, + ) -> JudgmentConfig | None: + """Strip judgment conditions that reference metrics hidden from console evaluation.""" + if judgment_config is None: + return None + + config = ( + judgment_config + if isinstance(judgment_config, JudgmentConfig) + else JudgmentConfig.model_validate(judgment_config) + ) + visible_conditions = [ + condition + for condition in config.conditions + if len(condition.variable_selector) < 2 or condition.variable_selector[1] not in cls.CONSOLE_DISABLED_METRICS + ] + if not visible_conditions: + return None + return config.model_copy(update={"conditions": visible_conditions}) + + @classmethod + def serialize_console_default_metrics( + cls, + default_metrics: list[Mapping[str, Any]], + ) -> list[dict[str, Any]]: + """Return sanitized default metrics payload for console responses.""" + return [metric.model_dump() for metric in cls.filter_console_default_metrics(default_metrics)] + + @classmethod + def serialize_console_judgment_config( + cls, + judgment_config: Mapping[str, Any] | None, + ) -> dict[str, Any] | None: + """Return sanitized judgment config payload for console responses.""" + filtered = cls.filter_console_judgment_config(judgment_config) + return filtered.model_dump() if filtered else None + + @classmethod + def get_dataset_column_names( + cls, + target: Union[App, CustomizedSnippet], + target_type: str, + data: EvaluationConfigData, + ) -> list[str]: + """Build dataset column names from target inputs and the selected evaluation config.""" + input_columns = cls._get_target_input_column_names(target, target_type) + expected_output_columns = cls._get_expected_output_column_names(data.default_metrics) + return ["index", *input_columns, *expected_output_columns] + + @classmethod + def _get_target_input_column_names( + cls, + target: Union[App, CustomizedSnippet], + target_type: str, + ) -> list[str]: + """Resolve user-input 
variables for the target in workflow order.""" + if target_type == EvaluationTargetType.APPS.value and isinstance(target, App): + input_fields = cls._get_app_input_fields(target) + elif target_type == EvaluationTargetType.SNIPPETS.value and isinstance(target, CustomizedSnippet): + input_fields = cls._get_snippet_input_fields(target) + else: + raise ValueError(f"Unsupported target type: {target_type}") + + columns: list[str] = [] + seen: set[str] = set() + for field in input_fields: + column_name = str(field.get("variable") or field.get("label") or "").strip() + if not column_name or column_name in seen: + continue + seen.add(column_name) + columns.append(column_name) + return columns + + @classmethod + def _get_expected_output_column_names( + cls, + default_metrics: list[DefaultMetric | Mapping[str, Any]], + ) -> list[str]: + """Build one expected_output column per visible node that needs a reference answer.""" + columns: list[str] = [] + seen: set[str] = set() + for metric in cls.filter_console_default_metrics(default_metrics): + if metric.metric not in cls.METRICS_REQUIRING_EXPECTED_OUTPUT: + continue + for node_info in metric.node_info_list: + node_title = node_info.title or node_info.node_id + column_name = f"{node_title} : expected_output" + if column_name in seen: + continue + seen.add(column_name) + columns.append(column_name) + return columns + @classmethod def _nodes_for_metrics_from_workflow( cls, @@ -602,6 +729,8 @@ node_type_to_nodes: dict[str, list[dict[str, str]]] = {} for node_id, node_data in workflow.walk_nodes(): ntype = node_data.get("type", "") + if ntype in cls.CONSOLE_DISABLED_CATEGORIES: + continue node_type_to_nodes.setdefault(ntype, []).append( NodeInfo(node_id=node_id, type=ntype, title=node_data.get("title", "")).model_dump() ) @@ -682,6 +811,7 @@ all_nodes = [ NodeInfo(node_id=node_id, type=node_data.get("type", ""), title=node_data.get("title", "")).model_dump() for node_id, node_data in workflow.walk_nodes() + if node_data.get("type", "") not in cls.CONSOLE_DISABLED_CATEGORIES ] return {"all": all_nodes} @@ -942,7 +1072,16 @@ @classmethod def _parse_dataset(cls, file_content: bytes, filename: str) -> list[EvaluationDatasetInput]: - """Parse evaluation dataset from CSV or XLSX content.""" + """Parse evaluation dataset from CSV or XLSX content. + + Supported schemas: + + - ``index,<input columns...>,expected_output`` for the legacy single-reference flow + - ``index,<input columns...>,<node title> : expected_output`` for node-specific references + + Column parsing treats the *last* ``:`` as the separator so node titles + can themselves contain ``:`` characters.
+ """ filename_lower = filename.lower() if filename_lower.endswith(".csv"): return cls._parse_csv_dataset(file_content) @@ -964,35 +1103,7 @@ class EvaluationService: if not headers or headers[0].lower() != "index": raise EvaluationDatasetInvalidError("First column header must be 'index'.") - input_headers = headers[1:] # Skip 'index' - items = [] - for row_idx, row in enumerate(rows[1:], start=1): - values = list(row) - if all(v is None or str(v).strip() == "" for v in values): - continue # Skip empty rows - - index_val = values[0] if values else row_idx - try: - index = int(str(index_val)) - except (TypeError, ValueError): - index = row_idx - - inputs: dict[str, Any] = {} - for col_idx, header in enumerate(input_headers): - val = values[col_idx + 1] if col_idx + 1 < len(values) else None - inputs[header] = str(val) if val is not None else "" - - # Extract expected_output column into dedicated field - expected_output = inputs.pop("expected_output", None) - - items.append( - EvaluationDatasetInput( - index=index, - inputs=inputs, - expected_output=expected_output, - ) - ) - + items = cls._rows_to_dataset_items(rows) wb.close() return items @@ -1002,7 +1113,8 @@ class EvaluationService: CSV follows the same schema as XLSX: the first column must be `index`, remaining columns become inputs, - and `expected_output` is extracted into a dedicated field. + and expected-output columns can be either the legacy + ``expected_output`` or node-scoped `` : expected_output``. """ try: decoded = csv_content.decode("utf-8-sig") @@ -1018,11 +1130,19 @@ class EvaluationService: if not headers or headers[0].lower() != "index": raise EvaluationDatasetInvalidError("First column header must be 'index'.") + return cls._rows_to_dataset_items(rows) + + @classmethod + def _rows_to_dataset_items(cls, rows: list[list[Any] | tuple[Any, ...]]) -> list[EvaluationDatasetInput]: + """Normalize spreadsheet rows into dataset items with index validation.""" + headers = [str(h).strip() if h is not None else "" for h in rows[0]] input_headers = headers[1:] + items: list[EvaluationDatasetInput] = [] + seen_indices: set[int] = set() for row_idx, row in enumerate(rows[1:], start=1): values = list(row) - if all(str(v).strip() == "" for v in values): + if all(v is None or str(v).strip() == "" for v in values): continue index_val = values[0] if values else row_idx @@ -1031,16 +1151,59 @@ class EvaluationService: except (TypeError, ValueError): index = row_idx + if index in seen_indices: + raise EvaluationDatasetInvalidError(f"Dataset index '{index}' is duplicated.") + seen_indices.add(index) + inputs: dict[str, Any] = {} + expected_output: str | None = None + expected_outputs: dict[str, str] = {} for col_idx, header in enumerate(input_headers): val = values[col_idx + 1] if col_idx + 1 < len(values) else None - inputs[header] = str(val) if val is not None else "" + string_value = str(val) if val is not None else "" + expected_output_target = cls._parse_expected_output_header(header) + if expected_output_target is None: + inputs[header] = string_value + continue - expected_output = inputs.pop("expected_output", None) - items.append(EvaluationDatasetInput(index=index, inputs=inputs, expected_output=expected_output)) + if expected_output_target == "": + expected_output = string_value + else: + expected_outputs[expected_output_target] = string_value + + items.append( + EvaluationDatasetInput( + index=index, + inputs=inputs, + expected_output=expected_output, + expected_outputs=expected_outputs, + ) + ) return items + @staticmethod + def 
_parse_expected_output_header(header: str) -> str | None: + """Return the node title for an expected-output column, if any. + + ``expected_output`` keeps the legacy single-reference behaviour and + returns an empty string marker. Node-specific columns split on the last + ``:`` so titles such as ``RAG: final judge`` remain valid. + """ + normalized_header = header.strip() + if normalized_header == "expected_output": + return "" + + if ":" not in normalized_header: + return None + + node_title, suffix = normalized_header.rsplit(":", 1) + if suffix.strip() != "expected_output": + return None + + normalized_title = node_title.strip() + return normalized_title or None + @classmethod def _build_stub_results( cls, @@ -1095,11 +1258,20 @@ class EvaluationService: "value_type": output_field.value_type, "customized": True, }, + node_info=NodeInfo( + node_id=run_request.customized_metrics.evaluation_workflow_id, + type="customized", + title="customized", + ), ) ) judgment = cls._evaluate_stub_judgment(metrics, run_request.judgment_config) - actual_output = item.expected_output or cls._build_stub_output(item_position, item.inputs) + if isinstance(item, EvaluationDatasetInput): + actual_output = item.get_expected_output_for_node(None) + else: + actual_output = getattr(item, "expected_output", None) + actual_output = actual_output or cls._build_stub_output(item_position, item.inputs) results.append( EvaluationItemResult( diff --git a/api/tasks/evaluation_task.py b/api/tasks/evaluation_task.py index 556c162a42..7e0f0bc8f9 100644 --- a/api/tasks/evaluation_task.py +++ b/api/tasks/evaluation_task.py @@ -67,7 +67,12 @@ def run_evaluation(run_data_dict: dict[str, Any]) -> None: def _execute_evaluation(session: Any, run_data: EvaluationRunData) -> None: - """Core evaluation execution logic.""" + """Core evaluation execution logic. + + The task stores progress using dataset row indices instead of positional + offsets so retries, sparse runner outputs, and result-file generation all + stay aligned with the user-uploaded dataset. 
+ """ evaluation_run = session.query(EvaluationRun).filter_by(id=run_data.evaluation_run_id).first() if not evaluation_run: logger.error("EvaluationRun %s not found", run_data.evaluation_run_id) @@ -136,6 +141,8 @@ def _execute_evaluation(session: Any, run_data: EvaluationRunData) -> None: if evaluation_run: evaluation_run.status = EvaluationRunStatus.COMPLETED evaluation_run.completed_at = naive_utc_now() + evaluation_run.completed_items = sum(1 for result in results if not result.error) + evaluation_run.failed_items = len(run_data.input_list) - evaluation_run.completed_items evaluation_run.metrics_summary = json.dumps(metrics_summary) if result_file_id: evaluation_run.result_file_id = result_file_id @@ -166,12 +173,14 @@ def _execute_evaluation_runner( for default_metric in run_data.default_metrics: for node_info in default_metric.node_info_list: node_run_result_list: list[NodeRunResult] = [] - item_indices: list[int] = [] + dataset_items: list[EvaluationDatasetInput] = [] + dataset_indices: list[int] = [] for i, mapping in enumerate(node_run_result_mapping_list): node_result = mapping.get(node_info.node_id) if node_result is not None: node_run_result_list.append(node_result) - item_indices.append(i) + dataset_items.append(run_data.input_list[i]) + dataset_indices.append(run_data.input_list[i].index) if not node_run_result_list: continue @@ -184,6 +193,8 @@ def _execute_evaluation_runner( model_provider=run_data.evaluation_model_provider, model_name=run_data.evaluation_model, tenant_id=run_data.tenant_id, + dataset_items=dataset_items, + node_info=node_info, ) except Exception: logger.exception( @@ -191,7 +202,7 @@ def _execute_evaluation_runner( ) continue - _stamp_and_merge(evaluated, item_indices, node_info, results_by_index) + _stamp_and_merge(evaluated, dataset_indices, node_info, results_by_index) # Phase 2: Customized metrics if run_data.customized_metrics: @@ -201,14 +212,16 @@ def _execute_evaluation_runner( customized_metrics=run_data.customized_metrics, tenant_id=run_data.tenant_id, ) - for result in customized_results: - _merge_result(results_by_index, result.index, result) + _merge_customized_results(results_by_index, customized_results, run_data.input_list) except Exception: logger.exception("Failed customized metrics for run %s", run_data.evaluation_run_id) - results = list(results_by_index.values()) - # Phase 3: Judgment + results = _finalize_results( + input_list=run_data.input_list, + results_by_index=results_by_index, + missing_errors=_build_missing_result_errors(run_data.input_list, node_run_result_mapping_list), + ) if run_data.judgment_config: results = _apply_judgment(results, run_data.judgment_config) @@ -249,13 +262,17 @@ def _execute_retrieval_test( model_provider=run_data.evaluation_model_provider, model_name=run_data.evaluation_model, tenant_id=run_data.tenant_id, + dataset_items=run_data.input_list, ) - item_indices = list(range(len(node_run_result_list))) - _stamp_and_merge(evaluated, item_indices, None, results_by_index) + dataset_indices = [item.index for item in run_data.input_list] + _stamp_and_merge(evaluated, dataset_indices, None, results_by_index) except Exception: logger.exception("Failed retrieval metrics for run %s", run_data.evaluation_run_id) - results = list(results_by_index.values()) + results = _finalize_results( + input_list=run_data.input_list, + results_by_index=results_by_index, + ) if run_data.judgment_config: results = _apply_judgment(results, run_data.judgment_config) @@ -272,17 +289,17 @@ def _execute_retrieval_test( def 
_stamp_and_merge( evaluated: list[EvaluationItemResult], - item_indices: list[int], + dataset_indices: list[int], node_info: NodeInfo | None, results_by_index: dict[int, EvaluationItemResult], ) -> None: - """Attach node_info to each metric and merge into results_by_index.""" + """Attach node_info to each metric and remap positional results to dataset indices.""" for result in evaluated: - original_index = item_indices[result.index] + dataset_index = dataset_indices[result.index] if node_info is not None: for metric in result.metrics: metric.node_info = node_info - _merge_result(results_by_index, original_index, result) + _merge_result(results_by_index, dataset_index, result) def _merge_result( @@ -295,13 +312,81 @@ def _merge_result( if existing: merged_metrics = existing.metrics + new_result.metrics actual = existing.actual_output or new_result.actual_output + merged_metadata = {**existing.metadata, **new_result.metadata} + merged_error = new_result.error + if merged_error is None and new_result.metrics: + merged_error = None + elif merged_error is None: + merged_error = existing.error results_by_index[index] = existing.model_copy( - update={"metrics": merged_metrics, "actual_output": actual} + update={ + "metrics": merged_metrics, + "actual_output": actual, + "metadata": merged_metadata, + "error": merged_error, + } ) else: results_by_index[index] = new_result.model_copy(update={"index": index}) +def _merge_customized_results( + results_by_index: dict[int, EvaluationItemResult], + customized_results: list[EvaluationItemResult], + input_list: list[EvaluationDatasetInput], +) -> None: + """Remap customized workflow results from positional indices to dataset indices.""" + for result in customized_results: + if 0 <= result.index < len(input_list): + dataset_index = input_list[result.index].index + else: + dataset_index = result.index + _merge_result(results_by_index, dataset_index, result) + + +def _build_missing_result_errors( + input_list: list[EvaluationDatasetInput], + node_run_result_mapping_list: list[dict[str, NodeRunResult]], +) -> dict[int, str]: + """Describe why a dataset row produced no merged evaluation result.""" + errors: dict[int, str] = {} + for item, node_results in zip(input_list, node_run_result_mapping_list): + if node_results: + errors[item.index] = "No evaluation metrics were generated for this row." + else: + errors[item.index] = "Target execution produced no node results for this row." + return errors + + +def _finalize_results( + input_list: list[EvaluationDatasetInput], + results_by_index: dict[int, EvaluationItemResult], + missing_errors: dict[int, str] | None = None, +) -> list[EvaluationItemResult]: + """Return ordered results with one item per dataset row. + + Runners can legitimately emit sparse outputs when a target execution or a + metric batch fails. Before persistence we materialize a placeholder result + for every missing dataset row so history/detail APIs always remain + one-input-row to one-result-row. 
+ """ + finalized: list[EvaluationItemResult] = [] + for item in input_list: + result = results_by_index.get(item.index) + if result is not None: + finalized.append(result) + continue + + finalized.append( + EvaluationItemResult( + index=item.index, + error=(missing_errors or {}).get(item.index, "No evaluation result was generated for this row."), + ) + ) + + return finalized + + def _apply_judgment( results: list[EvaluationItemResult], judgment_config: JudgmentConfig, @@ -328,17 +413,17 @@ def _persist_results( workflow_run_id_map: dict[int, str] | None = None, ) -> None: """Persist evaluation results — one EvaluationRunItem per test-data row.""" - dataset_map = {item.index: item for item in input_list} + result_map = {result.index: result for result in results} wf_map = workflow_run_id_map or {} - for result in results: - item_input = dataset_map.get(result.index) + for item_input in input_list: + result = result_map.get(item_input.index, EvaluationItemResult(index=item_input.index, error="Missing result")) run_item = EvaluationRunItem( evaluation_run_id=evaluation_run_id, - workflow_run_id=wf_map.get(result.index), - item_index=result.index, - inputs=json.dumps(item_input.inputs) if item_input else None, - expected_output=item_input.expected_output if item_input else None, + workflow_run_id=wf_map.get(item_input.index), + item_index=item_input.index, + inputs=json.dumps(item_input.inputs), + expected_output=item_input.serialize_expected_output(), actual_output=result.actual_output, metrics=json.dumps([m.model_dump() for m in result.metrics]) if result.metrics else None, judgment=json.dumps(result.judgment.model_dump()) if result.judgment else None, @@ -450,12 +535,16 @@ def _generate_result_xlsx( if key not in input_keys: input_keys.append(key) + expected_output_headers: list[str] = [] + for item in input_list: + for header, _ in item.iter_expected_output_columns(): + if header not in expected_output_headers: + expected_output_headers.append(header) + has_judgment = any(bool(r.judgment.condition_results) for r in results) judgment_headers = ["judgment"] if has_judgment else [] - headers = ( - ["index"] + input_keys + ["expected_output", "actual_output"] + all_metric_names + judgment_headers + ["error"] - ) + headers = ["index"] + input_keys + expected_output_headers + ["actual_output"] + all_metric_names + judgment_headers + ["error"] for col_idx, header in enumerate(headers, start=1): cell = ws.cell(row=1, column=col_idx, value=header) @@ -482,8 +571,9 @@ def _generate_result_xlsx( ws.cell(row=row_idx, column=col, value=str(val)).border = thin_border col += 1 - ws.cell(row=row_idx, column=col, value=item.expected_output or "").border = thin_border - col += 1 + for header in expected_output_headers: + ws.cell(row=row_idx, column=col, value=_get_expected_output_cell_value(item, header)).border = thin_border + col += 1 ws.cell(row=row_idx, column=col, value=result.actual_output if result else "").border = thin_border col += 1 @@ -510,6 +600,14 @@ def _generate_result_xlsx( return output.getvalue() +def _get_expected_output_cell_value(item: EvaluationDatasetInput, header: str) -> str: + """Return the expected-output cell value for a generated result file header.""" + for column_header, value in item.iter_expected_output_columns(): + if column_header == header: + return value + return "" + + def _store_result_file( tenant_id: str, run_id: str, diff --git a/api/tests/unit_tests/core/evaluation/judgment/test_processor.py b/api/tests/unit_tests/core/evaluation/judgment/test_processor.py index 
6f4cdc6708..243e38ffb2 100644 --- a/api/tests/unit_tests/core/evaluation/judgment/test_processor.py +++ b/api/tests/unit_tests/core/evaluation/judgment/test_processor.py @@ -143,3 +143,14 @@ def test_evaluate_string_contains_operator() -> None: ) assert result.passed is True + + +def test_judgment_condition_accepts_ascii_operator_aliases() -> None: + """Request payloads may use ASCII operators such as ``>=`` and should normalize.""" + condition = JudgmentCondition( + variable_selector=["node_a", "score"], + comparison_operator=">=", + value="0.8", + ) + + assert condition.comparison_operator == "≥" diff --git a/api/tests/unit_tests/tasks/test_evaluation_task.py b/api/tests/unit_tests/tasks/test_evaluation_task.py index 6922ec522e..9e65582d25 100644 --- a/api/tests/unit_tests/tasks/test_evaluation_task.py +++ b/api/tests/unit_tests/tasks/test_evaluation_task.py @@ -1,12 +1,27 @@ """Unit tests for evaluation task helpers.""" +import io + +from openpyxl import load_workbook + +from core.evaluation.entities.evaluation_entity import EvaluationDatasetInput from core.evaluation.entities.evaluation_entity import EvaluationItemResult, EvaluationMetric, NodeInfo from core.evaluation.entities.judgment_entity import ( JudgmentCondition, JudgmentConfig, JudgmentResult, ) -from tasks.evaluation_task import _compute_metrics_summary, _merge_result, _stamp_and_merge +from graphon.node_events import NodeRunResult +from tasks.evaluation_task import ( + _apply_judgment, + _build_missing_result_errors, + _compute_metrics_summary, + _finalize_results, + _generate_result_xlsx, + _merge_customized_results, + _merge_result, + _stamp_and_merge, +) _NODE_INFO = NodeInfo(node_id="llm_1", type="llm", title="LLM Node") @@ -95,3 +110,112 @@ def test_stamp_and_merge_attaches_node_info() -> None: assert metric.node_info is not None assert metric.node_info.node_id == "llm_1" assert metric.node_info.type == "llm" + + +def test_finalize_results_materializes_missing_dataset_rows() -> None: + input_list = [ + EvaluationDatasetInput(index=101, inputs={"query": "first"}), + EvaluationDatasetInput(index=205, inputs={"query": "second"}), + ] + results_by_index = { + 205: EvaluationItemResult(index=205, metrics=[EvaluationMetric(name="faithfulness", value=0.9)]) + } + + finalized = _finalize_results( + input_list=input_list, + results_by_index=results_by_index, + missing_errors={101: "Target execution produced no node results for this row."}, + ) + + assert [result.index for result in finalized] == [101, 205] + assert finalized[0].error == "Target execution produced no node results for this row." 
+ assert finalized[1].metrics[0].name == "faithfulness" + + +def test_build_missing_result_errors_marks_empty_node_runs() -> None: + input_list = [ + EvaluationDatasetInput(index=1, inputs={"query": "hello"}), + EvaluationDatasetInput(index=2, inputs={"query": "world"}), + ] + node_run_results = [ + {}, + {"llm-node": NodeRunResult(outputs={"text": "answer"})}, + ] + + errors = _build_missing_result_errors(input_list, node_run_results) + + assert errors == { + 1: "Target execution produced no node results for this row.", + 2: "No evaluation metrics were generated for this row.", + } + + +def test_apply_judgment_supports_customized_metric_scope() -> None: + judgment_config = JudgmentConfig( + logical_operator="and", + conditions=[ + JudgmentCondition( + variable_selector=["workflow-app-1", "score"], + comparison_operator="≥", + value="0.8", + ) + ], + ) + results = [ + EvaluationItemResult( + index=1, + metrics=[ + EvaluationMetric( + name="score", + value=0.91, + node_info=NodeInfo(node_id="workflow-app-1", type="customized", title="customized"), + ) + ], + ) + ] + + judged = _apply_judgment(results, judgment_config) + + assert judged[0].judgment.passed is True + + +def test_merge_customized_results_remaps_positional_indices() -> None: + results_by_index: dict[int, EvaluationItemResult] = {} + input_list = [ + EvaluationDatasetInput(index=101, inputs={"query": "first"}), + EvaluationDatasetInput(index=205, inputs={"query": "second"}), + ] + customized_results = [ + EvaluationItemResult(index=1, metrics=[EvaluationMetric(name="score", value=0.88)]), + ] + + _merge_customized_results(results_by_index, customized_results, input_list) + + assert list(results_by_index.keys()) == [205] + assert results_by_index[205].metrics[0].name == "score" + + +def test_generate_result_xlsx_preserves_multiple_expected_output_columns() -> None: + input_list = [ + EvaluationDatasetInput( + index=1, + inputs={"query": "hello"}, + expected_outputs={"llm1": "world", "knowledge1": "chunk"}, + ) + ] + results = [EvaluationItemResult(index=1, actual_output="answer")] + + content = _generate_result_xlsx(input_list, results) + workbook = load_workbook(io.BytesIO(content)) + worksheet = workbook.active + + headers = [cell.value for cell in worksheet[1]] + + assert headers == [ + "index", + "query", + "llm1 : expected_output", + "knowledge1 : expected_output", + "actual_output", + "error", + ] diff --git a/dev/start-worker b/dev/start-worker index 8baa36f1ed..932cd2a9e2 100755 --- a/dev/start-worker +++ b/dev/start-worker @@ -107,10 +107,10 @@ if [[ -z "${QUEUES}" ]]; then # Configure queues based on edition if [[ "${EDITION}" == "CLOUD" ]]; then # Cloud edition: separate queues for dataset and trigger tasks - QUEUES="dataset,dataset_summary,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow_professional,workflow_team,workflow_sandbox,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention,workflow_based_app_execution" + QUEUES="dataset,dataset_summary,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow_professional,workflow_team,workflow_sandbox,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention,workflow_based_app_execution,evaluation" else # Community edition (SELF_HOSTED): dataset and workflow have separate queues - 
QUEUES="dataset,dataset_summary,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention,workflow_based_app_execution" + QUEUES="dataset,dataset_summary,priority_dataset,priority_pipeline,pipeline,mail,ops_trace,app_deletion,plugin,workflow_storage,conversation,workflow,schedule_poller,schedule_executor,triggered_workflow_dispatcher,trigger_refresh_executor,retention,workflow_based_app_execution,evaluation" fi echo "No queues specified, using edition-based defaults: ${QUEUES}"