diff --git a/api/core/evaluation/entities/evaluation_entity.py b/api/core/evaluation/entities/evaluation_entity.py
index 96a2cf461b..39668be1fb 100644
--- a/api/core/evaluation/entities/evaluation_entity.py
+++ b/api/core/evaluation/entities/evaluation_entity.py
@@ -16,7 +16,7 @@ class EvaluationCategory(StrEnum):
 
 class EvaluationMetric(BaseModel):
     name: str
-    score: float
+    value: Any
     details: dict[str, Any] = Field(default_factory=dict)
 
 
@@ -31,8 +31,6 @@ class EvaluationItemResult(BaseModel):
     index: int
     actual_output: str | None = None
     metrics: list[EvaluationMetric] = Field(default_factory=list)
-    judgment: JudgmentResult | None = None
-    metadata: dict[str, Any] = Field(default_factory=dict)
     error: str | None = None
 
     @property
@@ -85,10 +83,9 @@ class EvaluationRunData(BaseModel):
     tenant_id: str
     target_type: str
     target_id: str
-    evaluation_category: EvaluationCategory
     evaluation_model_provider: str
     evaluation_model: str
-    default_metrics: list[dict[str, Any]] = Field(default_factory=list)
-    customized_metrics: dict[str, Any] | None = None
+    default_metrics: list[DefaultMetric] = Field(default_factory=list)
+    customized_metrics: CustomizedMetrics | None = None
     judgment_config: JudgmentConfig | None = None
-    items: list[EvaluationItemInput]
+    input_list: list[dict]
diff --git a/api/core/evaluation/runners/base_evaluation_runner.py b/api/core/evaluation/runners/base_evaluation_runner.py
index c88de0169e..d851ed6401 100644
--- a/api/core/evaluation/runners/base_evaluation_runner.py
+++ b/api/core/evaluation/runners/base_evaluation_runner.py
@@ -17,11 +17,15 @@ from sqlalchemy.orm import Session
 
 from core.evaluation.base_evaluation_instance import BaseEvaluationInstance
 from core.evaluation.entities.evaluation_entity import (
+    CustomizedMetrics,
+    DefaultMetric,
     EvaluationItemInput,
     EvaluationItemResult,
 )
 from core.evaluation.entities.judgment_entity import JudgmentConfig
 from core.evaluation.judgment.processor import JudgmentProcessor
+from core.workflow.enums import WorkflowNodeExecutionStatus
+from core.workflow.node_events import NodeRunResult
 from libs.datetime_utils import naive_utc_now
 from models.evaluation import EvaluationRun, EvaluationRunItem, EvaluationRunStatus
 
@@ -35,17 +39,6 @@ class BaseEvaluationRunner(ABC):
         self.evaluation_instance = evaluation_instance
         self.session = session
 
-    @abstractmethod
-    def execute_target(
-        self,
-        tenant_id: str,
-        target_id: str,
-        target_type: str,
-        item: EvaluationItemInput,
-    ) -> EvaluationItemResult:
-        """Execute the evaluation target for a single item and return the result with actual_output populated."""
-        ...
-
     @abstractmethod
     def evaluate_metrics(
         self,
@@ -65,17 +58,19 @@ class BaseEvaluationRunner(ABC):
         tenant_id: str,
         target_id: str,
         target_type: str,
-        items: list[EvaluationItemInput],
-        default_metrics: list[dict[str, Any]],
-        customized_metrics: dict[str, Any] | None = None,
+        node_run_result: NodeRunResult,
+        default_metric: DefaultMetric | None = None,
+        customized_metrics: CustomizedMetrics | None = None,
         model_provider: str = "",
-        model_name: str = "",
-        judgment_config: JudgmentConfig | None = None,
+        model_name: str = "",
     ) -> list[EvaluationItemResult]:
         """Orchestrate target execution + metric evaluation + judgment for all items."""
         evaluation_run = self.session.query(EvaluationRun).filter_by(id=evaluation_run_id).first()
         if not evaluation_run:
             raise ValueError(f"EvaluationRun {evaluation_run_id} not found")
+
+        if not default_metric and not customized_metrics:
+            raise ValueError("Either default_metric or customized_metrics must be provided")
 
         # Update status to running
         evaluation_run.status = EvaluationRunStatus.RUNNING
@@ -84,28 +79,8 @@ class BaseEvaluationRunner(ABC):
 
         results: list[EvaluationItemResult] = []
 
-        # Phase 1: Execute target for each item
-        for item in items:
-            try:
-                result = self.execute_target(tenant_id, target_id, target_type, item)
-                results.append(result)
-                evaluation_run.completed_items += 1
-            except Exception as e:
-                logger.exception("Failed to execute target for item %d", item.index)
-                results.append(
-                    EvaluationItemResult(
-                        index=item.index,
-                        error=str(e),
-                    )
-                )
-                evaluation_run.failed_items += 1
-            self.session.commit()
-
-        # Phase 2: Compute metrics on successful results
-        successful_items = [item for item, result in zip(items, results) if result.error is None]
-        successful_results = [r for r in results if r.error is None]
-
-        if successful_items and successful_results:
+        # Phase 1: run evaluation
+        if node_run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED:
             try:
                 if customized_metrics is not None:
                     # Customized workflow evaluation — target-type agnostic
@@ -126,10 +101,6 @@
             except Exception:
                 logger.exception("Failed to compute metrics for evaluation run %s", evaluation_run_id)
 
-        # Phase 3: Apply judgment conditions on metrics
-        if judgment_config and judgment_config.conditions:
-            results = self._apply_judgment(results, items, judgment_config)
-
         # Phase 4: Persist individual items
         for result in results:
             item_input = next((item for item in items if item.index == result.index), None)
diff --git a/api/core/evaluation/runners/llm_evaluation_runner.py b/api/core/evaluation/runners/llm_evaluation_runner.py
index 7c92322557..aa746751ca 100644
--- a/api/core/evaluation/runners/llm_evaluation_runner.py
+++ b/api/core/evaluation/runners/llm_evaluation_runner.py
@@ -21,71 +21,6 @@ class LLMEvaluationRunner(BaseEvaluationRunner):
     def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session):
         super().__init__(evaluation_instance, session)
 
-    def execute_target(
-        self,
-        tenant_id: str,
-        target_id: str,
-        target_type: str,
-        item: EvaluationItemInput,
-    ) -> EvaluationItemResult:
-        """Execute the App/Snippet with the given inputs and collect the response."""
-        from core.app.apps.completion.app_generator import CompletionAppGenerator
-        from core.app.apps.workflow.app_generator import WorkflowAppGenerator
-        from core.app.entities.app_invoke_entities import InvokeFrom
-        from core.evaluation.runners import get_service_account_for_app
-        from services.workflow_service import WorkflowService
-
-        app = self.session.query(App).filter_by(id=target_id).first()
-        if not app:
-            raise ValueError(f"App {target_id} not found")
-
-        # Get a service account for invocation
-        service_account = get_service_account_for_app(self.session, target_id)
-
-        app_mode = AppMode.value_of(app.mode)
-
-        # Build args from evaluation item inputs
-        args: dict[str, Any] = {
-            "inputs": item.inputs,
-        }
-        # For completion/chat modes, first text input becomes query
-        if app_mode in (AppMode.COMPLETION, AppMode.CHAT):
-            query = self._extract_query(item.inputs)
-            args["query"] = query
-
-        if app_mode in (AppMode.WORKFLOW, AppMode.ADVANCED_CHAT):
-            workflow_service = WorkflowService()
-            workflow = workflow_service.get_published_workflow(app_model=app)
-            if not workflow:
-                raise ValueError(f"No published workflow found for app {target_id}")
-
-            generator = WorkflowAppGenerator()
-            response: Mapping[str, Any] = generator.generate(
-                app_model=app,
-                workflow=workflow,
-                user=service_account,
-                args=args,
-                invoke_from=InvokeFrom.SERVICE_API,
-                streaming=False,
-            )
-        elif app_mode == AppMode.COMPLETION:
-            generator = CompletionAppGenerator()
-            response = generator.generate(
-                app_model=app,
-                user=service_account,
-                args=args,
-                invoke_from=InvokeFrom.SERVICE_API,
-                streaming=False,
-            )
-        else:
-            raise ValueError(f"Unsupported app mode for LLM evaluation: {app_mode}")
-
-        actual_output = self._extract_output(response)
-        return EvaluationItemResult(
-            index=item.index,
-            actual_output=actual_output,
-        )
-
     def evaluate_metrics(
         self,
         items: list[EvaluationItemInput],
diff --git a/api/core/evaluation/runners/retrieval_evaluation_runner.py b/api/core/evaluation/runners/retrieval_evaluation_runner.py
index 3949dc4ed9..57cffd2e9e 100644
--- a/api/core/evaluation/runners/retrieval_evaluation_runner.py
+++ b/api/core/evaluation/runners/retrieval_evaluation_runner.py
@@ -19,41 +19,6 @@ class RetrievalEvaluationRunner(BaseEvaluationRunner):
     def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session):
         super().__init__(evaluation_instance, session)
 
-    def execute_target(
-        self,
-        tenant_id: str,
-        target_id: str,
-        target_type: str,
-        item: EvaluationItemInput,
-    ) -> EvaluationItemResult:
-        """Execute retrieval using DatasetRetrieval and collect context documents."""
-        from core.rag.retrieval.dataset_retrieval import DatasetRetrieval
-
-        query = self._extract_query(item.inputs)
-
-        dataset_retrieval = DatasetRetrieval()
-
-        # Use knowledge_retrieval for structured results
-        try:
-            from core.rag.retrieval.dataset_retrieval import KnowledgeRetrievalRequest
-
-            request = KnowledgeRetrievalRequest(
-                query=query,
-                app_id=target_id,
-                tenant_id=tenant_id,
-            )
-            sources = dataset_retrieval.knowledge_retrieval(request)
-            retrieved_contexts = [source.content for source in sources if source.content]
-        except (ImportError, AttributeError):
-            logger.warning("KnowledgeRetrievalRequest not available, using simple retrieval")
-            retrieved_contexts = []
-
-        return EvaluationItemResult(
-            index=item.index,
-            actual_output="\n\n".join(retrieved_contexts) if retrieved_contexts else "",
-            metadata={"retrieved_contexts": retrieved_contexts},
-        )
-
     def evaluate_metrics(
         self,
         items: list[EvaluationItemInput],
diff --git a/api/core/evaluation/runners/workflow_evaluation_runner.py b/api/core/evaluation/runners/workflow_evaluation_runner.py
index 9508251f56..dc968b93b7 100644
--- a/api/core/evaluation/runners/workflow_evaluation_runner.py
+++ b/api/core/evaluation/runners/workflow_evaluation_runner.py
@@ -21,50 +21,6 @@ class WorkflowEvaluationRunner(BaseEvaluationRunner):
     def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session):
         super().__init__(evaluation_instance, session)
 
-    def execute_target(
-        self,
-        tenant_id: str,
-        target_id: str,
-        target_type: str,
-        item: EvaluationItemInput,
-    ) -> EvaluationItemResult:
-        """Execute workflow and collect outputs."""
-        from core.app.apps.workflow.app_generator import WorkflowAppGenerator
-        from core.app.entities.app_invoke_entities import InvokeFrom
-        from core.evaluation.runners import get_service_account_for_app
-        from services.workflow_service import WorkflowService
-
-        app = self.session.query(App).filter_by(id=target_id).first()
-        if not app:
-            raise ValueError(f"App {target_id} not found")
-
-        service_account = get_service_account_for_app(self.session, target_id)
-        workflow_service = WorkflowService()
-        workflow = workflow_service.get_published_workflow(app_model=app)
-        if not workflow:
-            raise ValueError(f"No published workflow found for app {target_id}")
-
-        args: dict[str, Any] = {"inputs": item.inputs}
-
-        generator = WorkflowAppGenerator()
-        response: Mapping[str, Any] = generator.generate(
-            app_model=app,
-            workflow=workflow,
-            user=service_account,
-            args=args,
-            invoke_from=InvokeFrom.SERVICE_API,
-            streaming=False,
-        )
-
-        actual_output = self._extract_output(response)
-        node_executions = self._extract_node_executions(response)
-
-        return EvaluationItemResult(
-            index=item.index,
-            actual_output=actual_output,
-            metadata={"node_executions": node_executions},
-        )
-
     def evaluate_metrics(
         self,
         items: list[EvaluationItemInput],
diff --git a/api/services/evaluation_service.py b/api/services/evaluation_service.py
index 40b3217e3b..73d3995895 100644
--- a/api/services/evaluation_service.py
+++ b/api/services/evaluation_service.py
@@ -327,7 +327,6 @@ class EvaluationService:
             tenant_id=tenant_id,
             target_type=target_type,
             target_id=target_id,
-            evaluation_category=evaluation_category,
            evaluation_model_provider=run_request.evaluation_model_provider,
             evaluation_model=run_request.evaluation_model,
             default_metrics=[m.model_dump() for m in run_request.default_metrics],
diff --git a/api/tasks/evaluation_task.py b/api/tasks/evaluation_task.py
index b61a7f6399..6cb50c47c2 100644
--- a/api/tasks/evaluation_task.py
+++ b/api/tasks/evaluation_task.py
@@ -9,6 +9,7 @@ from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
 from openpyxl.utils import get_column_letter
 
 from configs import dify_config
+from core.evaluation.base_evaluation_instance import BaseEvaluationInstance
 from core.evaluation.entities.evaluation_entity import (
     EvaluationCategory,
     EvaluationItemResult,
@@ -16,9 +17,11 @@ from core.evaluation.entities.evaluation_entity import (
 )
 from core.evaluation.evaluation_manager import EvaluationManager
 from core.evaluation.runners.agent_evaluation_runner import AgentEvaluationRunner
+from core.evaluation.runners.base_evaluation_runner import BaseEvaluationRunner
 from core.evaluation.runners.llm_evaluation_runner import LLMEvaluationRunner
 from core.evaluation.runners.retrieval_evaluation_runner import RetrievalEvaluationRunner
 from core.evaluation.runners.workflow_evaluation_runner import WorkflowEvaluationRunner
+from core.workflow.node_events.base import NodeRunResult
 from extensions.ext_database import db
 from libs.datetime_utils import naive_utc_now
 from models.evaluation import EvaluationRun, EvaluationRunStatus
@@ -72,22 +75,8 @@ def _execute_evaluation(session: Any, run_data: EvaluationRunData) -> None:
     if evaluation_instance is None:
         raise ValueError("Evaluation framework not configured")
 
-    # Select runner based on category
-    runner = _create_runner(run_data.evaluation_category, evaluation_instance, session)
+    results = _execute_evaluation_runner(session, run_data, evaluation_instance, node_run_result_mapping)
 
-    # Execute evaluation
-    results = runner.run(
-        evaluation_run_id=run_data.evaluation_run_id,
-        tenant_id=run_data.tenant_id,
-        target_id=run_data.target_id,
-        target_type=run_data.target_type,
-        items=run_data.items,
-        default_metrics=run_data.default_metrics,
-        customized_metrics=run_data.customized_metrics,
-        model_provider=run_data.evaluation_model_provider,
-        model_name=run_data.evaluation_model,
-        judgment_config=run_data.judgment_config,
-    )
 
     # Compute summary metrics
     metrics_summary = _compute_metrics_summary(results)
@@ -112,12 +101,46 @@ def _execute_evaluation(session: Any, run_data: EvaluationRunData) -> None:
 
     logger.info("Evaluation run %s completed successfully", run_data.evaluation_run_id)
 
 
+def _execute_evaluation_runner(
+    session: Any,
+    run_data: EvaluationRunData,
+    evaluation_instance: BaseEvaluationInstance,
+    node_run_result_mapping: dict[str, NodeRunResult],
+) -> list[EvaluationItemResult]:
+    """Execute the evaluation runner."""
+    default_metrics = run_data.default_metrics
+    customized_metrics = run_data.customized_metrics
+    # Collect per-item results so the caller can build the summary from them
+    results: list[EvaluationItemResult] = []
+    for default_metric in default_metrics:
+        for node_info in default_metric.node_info_list:
+            node_run_result = node_run_result_mapping.get(node_info.node_id)
+            if node_run_result:
+                runner = _create_runner(EvaluationCategory(node_info.type), evaluation_instance, session)
+                results += runner.run(
+                    evaluation_run_id=run_data.evaluation_run_id,
+                    tenant_id=run_data.tenant_id,
+                    target_id=run_data.target_id,
+                    target_type=run_data.target_type,
+                    default_metric=default_metric,
+                    customized_metrics=None,
+                    model_provider=run_data.evaluation_model_provider,
+                    model_name=run_data.evaluation_model,
+                    node_run_result=node_run_result,
+                )
+            else:
+                default_metric.score = 0
+    for customized_metric in customized_metrics:
+        runner = _create_runner(run_data.evaluation_category, evaluation_instance, session)
+        results += runner.run(
+            evaluation_run_id=run_data.evaluation_run_id,
+            tenant_id=run_data.tenant_id,
+            target_id=run_data.target_id,
+        )
+    return results
+
+
 def _create_runner(
     category: EvaluationCategory,
-    evaluation_instance: Any,
+    evaluation_instance: BaseEvaluationInstance,
     session: Any,
-) -> Any:
+) -> BaseEvaluationRunner:
     """Create the appropriate runner for the evaluation category."""
     match category:
         case EvaluationCategory.LLM:
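
Note (not part of the diff above): DefaultMetric, NodeInfo-style entries, and CustomizedMetrics are referenced by these hunks but not defined in them. As orientation only, here is a minimal sketch of the shapes the new task code appears to assume, inferred purely from usage (node_info_list, node_id, type, score, and the reshaped EvaluationRunData fields); the real definitions in evaluation_entity.py may differ, and evaluation_run_id is an assumed field because the task code reads run_data.evaluation_run_id.

# Illustrative sketch only; field shapes are inferred from usage in this diff,
# not taken from the actual evaluation_entity.py definitions.
from typing import Any

from pydantic import BaseModel, Field


class NodeInfo(BaseModel):
    node_id: str  # looked up in node_run_result_mapping
    type: str     # fed into EvaluationCategory(node_info.type)


class DefaultMetric(BaseModel):
    node_info_list: list[NodeInfo] = Field(default_factory=list)
    score: float | None = None  # set to 0 when a node has no run result


class CustomizedMetrics(BaseModel):
    # Shape unknown from this diff; passed through to runner.run(customized_metrics=...)
    config: dict[str, Any] = Field(default_factory=dict)


class EvaluationRunData(BaseModel):
    # Post-change shape of the hunk in evaluation_entity.py, plus the assumed
    # evaluation_run_id field used by tasks/evaluation_task.py.
    evaluation_run_id: str
    tenant_id: str
    target_type: str
    target_id: str
    evaluation_model_provider: str
    evaluation_model: str
    default_metrics: list[DefaultMetric] = Field(default_factory=list)
    customized_metrics: CustomizedMetrics | None = None
    judgment_config: Any | None = None  # JudgmentConfig in the real code
    input_list: list[dict] = Field(default_factory=list)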