From 3373b637168017e9fe04ec4eb71e2dd73c4d0501 Mon Sep 17 00:00:00 2001
From: FFXN <31929997+FFXN@users.noreply.github.com>
Date: Thu, 30 Apr 2026 15:03:55 +0800
Subject: [PATCH] fix: virtual batch test run for frontend. (#35727)

Co-authored-by: jyong <718720800@qq.com>
Co-authored-by: Yansong Zhang <916125788@qq.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: hj24
Co-authored-by: Joel
Co-authored-by: Stephen Zhou <38493346+hyoban@users.noreply.github.com>
Co-authored-by: CodingOnStar
Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
---
 .../console/evaluation/evaluation.py |  95 +++++++--
 api/models/evaluation.py             |   7 +
 api/services/evaluation_service.py   | 191 ++++++++++++++++++
 3 files changed, 271 insertions(+), 22 deletions(-)

diff --git a/api/controllers/console/evaluation/evaluation.py b/api/controllers/console/evaluation/evaluation.py
index 1abceffcb5..54a34e88b9 100644
--- a/api/controllers/console/evaluation/evaluation.py
+++ b/api/controllers/console/evaluation/evaluation.py
@@ -234,6 +234,32 @@ def get_evaluation_target(view_func: Callable[P, R]):
     return decorated_view
 
 
+def _load_evaluation_run_request_and_dataset(tenant_id: str) -> tuple[EvaluationRunRequest, bytes]:
+    """Validate the run payload and load the uploaded dataset bytes."""
+    body = request.get_json(force=True)
+    if not body:
+        raise BadRequest("Request body is required.")
+
+    try:
+        run_request = EvaluationRunRequest.model_validate(body)
+    except Exception as e:
+        raise BadRequest(f"Invalid request body: {e}")
+
+    upload_file = db.session.query(UploadFile).filter_by(id=run_request.file_id, tenant_id=tenant_id).first()
+    if not upload_file:
+        raise NotFound("Dataset file not found.")
+
+    try:
+        dataset_content = storage.load_once(upload_file.key)
+    except Exception:
+        raise BadRequest("Failed to read dataset file.")
+
+    if not dataset_content:
+        raise BadRequest("Dataset file is empty.")
+
+    return run_request, dataset_content
+
+
 @console_ns.route("/<string:target_type>/<string:target_id>/dataset-template/download")
 class EvaluationDatasetTemplateDownloadApi(Resource):
     @console_ns.doc("download_evaluation_dataset_template")
@@ -408,31 +434,56 @@ class EvaluationRunApi(Resource):
             - judgment_config: judgment conditions config (optional)
         """
         current_account, current_tenant_id = current_account_with_tenant()
-
-        body = request.get_json(force=True)
-        if not body:
-            raise BadRequest("Request body is required.")
-
-        # Validate and parse request body
-        try:
-            run_request = EvaluationRunRequest.model_validate(body)
-        except Exception as e:
-            raise BadRequest(f"Invalid request body: {e}")
-
-        # Load dataset file
-        upload_file = (
-            db.session.query(UploadFile).filter_by(id=run_request.file_id, tenant_id=current_tenant_id).first()
-        )
-        if not upload_file:
-            raise NotFound("Dataset file not found.")
+        run_request, dataset_content = _load_evaluation_run_request_and_dataset(current_tenant_id)
 
         try:
-            dataset_content = storage.load_once(upload_file.key)
-        except Exception:
-            raise BadRequest("Failed to read dataset file.")
+            with Session(db.engine, expire_on_commit=False) as session:
+                if target_type == EvaluationTargetType.APPS.value:
+                    evaluation_run = EvaluationService.start_stub_evaluation_run(
+                        session=session,
+                        tenant_id=current_tenant_id,
+                        target_type=target_type,
+                        target_id=str(target.id),
+                        account_id=str(current_account.id),
+                        dataset_file_content=dataset_content,
+                        run_request=run_request,
+                    )
+                else:
+                    evaluation_run = EvaluationService.start_evaluation_run(
+                        session=session,
+                        tenant_id=current_tenant_id,
+                        target_type=target_type,
+                        target_id=str(target.id),
+                        account_id=str(current_account.id),
+                        dataset_file_content=dataset_content,
+                        run_request=run_request,
+                    )
+            return _serialize_evaluation_run(evaluation_run), 200
+        except EvaluationFrameworkNotConfiguredError as e:
+            return {"message": str(e.description)}, 400
+        except EvaluationNotFoundError as e:
+            return {"message": str(e.description)}, 404
+        except EvaluationMaxConcurrentRunsError as e:
+            return {"message": str(e.description)}, 429
+        except EvaluationDatasetInvalidError as e:
+            return {"message": str(e.description)}, 400
 
-        if not dataset_content:
-            raise BadRequest("Dataset file is empty.")
+
+@console_ns.route("/<string:target_type>/<string:target_id>/evaluation/run1")
+class EvaluationRunRealApi(Resource):
+    @console_ns.doc("start_evaluation_run_real")
+    @console_ns.response(200, "Evaluation run started")
+    @console_ns.response(400, "Invalid request")
+    @console_ns.response(404, "Target not found")
+    @setup_required
+    @login_required
+    @account_initialization_required
+    @get_evaluation_target
+    @edit_permission_required
+    def post(self, target: Union[App, CustomizedSnippet, Dataset], target_type: str):
+        """Start the real evaluation execution flow on the temporary dev path."""
+        current_account, current_tenant_id = current_account_with_tenant()
+        run_request, dataset_content = _load_evaluation_run_request_and_dataset(current_tenant_id)
 
         try:
             with Session(db.engine, expire_on_commit=False) as session:
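As a quick manual check of the controller change, the sketch below posts the same payload to both routes. It is only a smoke-test sketch: the host, token, and IDs are placeholders, the console API prefix is assumed, and the existing run route is assumed to mirror the run1 route above.

    # Hypothetical smoke test; replace host, token, and IDs with real values.
    import requests

    BASE = "http://localhost:5001/console/api"  # assumed console API prefix
    HEADERS = {"Authorization": "Bearer REPLACE_ME"}
    APP_ID = "REPLACE_ME"
    payload = {"file_id": "REPLACE_ME"}  # an already-uploaded dataset file

    # App targets now receive the synthetic, already-completed run.
    stub = requests.post(f"{BASE}/apps/{APP_ID}/evaluation/run", json=payload, headers=HEADERS)
    print(stub.status_code, stub.json())

    # The temporary dev route still exercises the real execution flow.
    real = requests.post(f"{BASE}/apps/{APP_ID}/evaluation/run1", json=payload, headers=HEADERS)
    print(real.status_code)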
diff --git a/api/models/evaluation.py b/api/models/evaluation.py
index 4dcc5f87ed..8b752633c6 100644
--- a/api/models/evaluation.py
+++ b/api/models/evaluation.py
@@ -125,6 +125,7 @@ class EvaluationRun(Base):
     total_items: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
     completed_items: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
     failed_items: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
+    metrics_summary: Mapped[str | None] = mapped_column(LongText, nullable=True)
     error: Mapped[str | None] = mapped_column(Text, nullable=True)
     celery_task_id: Mapped[str | None] = mapped_column(String(255), nullable=True)
 
@@ -143,6 +144,12 @@ class EvaluationRun(Base):
             return 0.0
         return (self.completed_items + self.failed_items) / self.total_items
 
+    @property
+    def metrics_summary_dict(self) -> dict[str, Any]:
+        if self.metrics_summary:
+            return json.loads(self.metrics_summary)
+        return {}
+
     def __repr__(self) -> str:
         return f"<EvaluationRun {self.id}>"
 
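The new column stores serialized JSON and the property reads it back, so the round-trip is plain json.dumps/json.loads. A standalone sketch of that behavior (the stand-in class is hypothetical; only the property body matches the model):

    import json
    from typing import Any

    class _RunStandIn:
        metrics_summary: str | None = None

        @property
        def metrics_summary_dict(self) -> dict[str, Any]:
            # Same shape as EvaluationRun.metrics_summary_dict: parse stored JSON, default to {}.
            if self.metrics_summary:
                return json.loads(self.metrics_summary)
            return {}

    run = _RunStandIn()
    assert run.metrics_summary_dict == {}  # unset column reads as an empty dict
    run.metrics_summary = json.dumps({"accuracy": {"avg": 0.81}})
    print(run.metrics_summary_dict["accuracy"])  # {'avg': 0.81}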
diff --git a/api/services/evaluation_service.py b/api/services/evaluation_service.py
index e5ff01ebc2..c6fad3b90c 100644
--- a/api/services/evaluation_service.py
+++ b/api/services/evaluation_service.py
@@ -17,14 +17,19 @@ from core.evaluation.entities.evaluation_entity import (
     EvaluationCategory,
     EvaluationConfigData,
     EvaluationDatasetInput,
+    EvaluationItemResult,
+    EvaluationMetric,
     EvaluationMetricName,
     EvaluationRunData,
     EvaluationRunRequest,
     NodeInfo,
 )
+from core.evaluation.entities.judgment_entity import JudgmentConfig, JudgmentResult
 from core.evaluation.evaluation_manager import EvaluationManager
+from core.evaluation.judgment.processor import JudgmentProcessor
 from graphon.enums import WorkflowNodeExecutionMetadataKey
 from graphon.node_events.base import NodeRunResult
+from libs.datetime_utils import naive_utc_now
 from models.evaluation import (
     EvaluationConfiguration,
     EvaluationRun,
@@ -423,6 +428,78 @@ class EvaluationService:
 
         return evaluation_run
 
+    @classmethod
+    def start_stub_evaluation_run(
+        cls,
+        session: Session,
+        tenant_id: str,
+        target_type: str,
+        target_id: str,
+        account_id: str,
+        dataset_file_content: bytes,
+        run_request: EvaluationRunRequest,
+    ) -> EvaluationRun:
+        """Persist a completed synthetic run for frontend integration testing.
+
+        This temporary path keeps the existing read flows (`logs`, `run detail`,
+        and result-file download) working for app evaluations while the real
+        execution logic is moved to `/evaluation/run1` for backend iteration.
+        """
+        from tasks.evaluation_task import (
+            _compute_metrics_summary,
+            _generate_result_xlsx,
+            _persist_results,
+            _store_result_file,
+        )
+
+        config = cls.save_evaluation_config(
+            session=session,
+            tenant_id=tenant_id,
+            target_type=target_type,
+            target_id=target_id,
+            account_id=account_id,
+            data=run_request,
+        )
+
+        items = cls._parse_dataset(dataset_file_content)
+        max_rows = dify_config.EVALUATION_MAX_DATASET_ROWS
+        if len(items) > max_rows:
+            raise EvaluationDatasetInvalidError(f"Dataset has {len(items)} rows, max is {max_rows}.")
+
+        now = naive_utc_now()
+        results = cls._build_stub_results(input_list=items, run_request=run_request)
+        metrics_summary = _compute_metrics_summary(results, run_request.judgment_config)
+
+        evaluation_run = EvaluationRun(
+            tenant_id=tenant_id,
+            target_type=target_type,
+            target_id=target_id,
+            evaluation_config_id=config.id,
+            status=EvaluationRunStatus.COMPLETED,
+            dataset_file_id=run_request.file_id,
+            total_items=len(items),
+            completed_items=len(items),
+            failed_items=0,
+            metrics_summary=json.dumps(metrics_summary),
+            created_by=account_id,
+            started_at=now,
+            completed_at=now,
+        )
+        session.add(evaluation_run)
+        session.commit()
+        session.refresh(evaluation_run)
+
+        _persist_results(session, evaluation_run.id, results, items)
+
+        result_xlsx = _generate_result_xlsx(items, results)
+        result_file_id = _store_result_file(tenant_id, evaluation_run.id, result_xlsx, session)
+        if result_file_id:
+            evaluation_run.result_file_id = result_file_id
+            session.commit()
+            session.refresh(evaluation_run)
+
+        return evaluation_run
+
     @classmethod
     def get_evaluation_runs(
         cls,
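The stub run is inserted already finished: status COMPLETED, completed_items equal to total_items, and started_at/completed_at set to the same timestamp. Per the progress property shown in the models hunk above, such a run reports progress 1.0 the moment it is created, which is what the frontend list and detail views need. A one-function restatement of that arithmetic:

    def progress(total_items: int, completed_items: int, failed_items: int) -> float:
        # Mirrors EvaluationRun.progress from api/models/evaluation.py.
        if total_items == 0:
            return 0.0
        return (completed_items + failed_items) / total_items

    print(progress(total_items=10, completed_items=10, failed_items=0))  # 1.0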
@@ -902,6 +979,120 @@ class EvaluationService:
         wb.close()
         return items
 
+    @classmethod
+    def _build_stub_results(
+        cls,
+        input_list: list[EvaluationDatasetInput],
+        run_request: EvaluationRunRequest,
+    ) -> list[EvaluationItemResult]:
+        """Create deterministic synthetic results that match the real read models."""
+        results: list[EvaluationItemResult] = []
+
+        for item_position, item in enumerate(input_list):
+            metrics: list[EvaluationMetric] = []
+
+            for metric_position, default_metric in enumerate(run_request.default_metrics):
+                metric_value_type = default_metric.value_type or METRIC_VALUE_TYPE_MAPPING.get(default_metric.metric, "")
+                for node_position, node_info in enumerate(default_metric.node_info_list):
+                    metrics.append(
+                        EvaluationMetric(
+                            name=default_metric.metric,
+                            value=cls._build_stub_metric_value(
+                                item_index=item.index,
+                                metric_position=metric_position,
+                                node_position=node_position,
+                                value_type=metric_value_type,
+                                metric_name=default_metric.metric,
+                            ),
+                            details={
+                                "stubbed": True,
+                                "source": "console-evaluation-run",
+                                "value_type": metric_value_type,
+                            },
+                            node_info=node_info,
+                        )
+                    )
+
+            if run_request.customized_metrics:
+                for output_position, output_field in enumerate(run_request.customized_metrics.output_fields):
+                    metrics.append(
+                        EvaluationMetric(
+                            name=output_field.variable,
+                            value=cls._build_stub_metric_value(
+                                item_index=item.index,
+                                metric_position=len(metrics),
+                                node_position=output_position,
+                                value_type=output_field.value_type,
+                                metric_name=output_field.variable,
+                            ),
+                            details={
+                                "stubbed": True,
+                                "source": "console-evaluation-run",
+                                "value_type": output_field.value_type,
+                                "customized": True,
+                            },
+                        )
+                    )
+
+            judgment = cls._evaluate_stub_judgment(metrics, run_request.judgment_config)
+            actual_output = item.expected_output or cls._build_stub_output(item_position, item.inputs)
+
+            results.append(
+                EvaluationItemResult(
+                    index=item.index,
+                    actual_output=actual_output,
+                    metrics=metrics,
+                    metadata={
+                        "stubbed": True,
+                        "source": "console-evaluation-run",
+                        "row": item_position + 1,
+                    },
+                    judgment=judgment,
+                )
+            )
+
+        return results
+
+    @staticmethod
+    def _build_stub_output(item_position: int, inputs: dict[str, Any]) -> str:
+        """Build a readable synthetic output for one dataset row."""
+        first_key = next(iter(inputs.keys()), "input")
+        first_value = inputs.get(first_key, "")
+        return f"Stub output #{item_position + 1}: processed {first_key}={first_value}"
+
+    @staticmethod
+    def _build_stub_metric_value(
+        item_index: int,
+        metric_position: int,
+        node_position: int,
+        value_type: str,
+        metric_name: str,
+    ) -> Any:
+        """Return a deterministic placeholder metric value by declared type."""
+        base_seed = item_index + metric_position + node_position
+        normalized_type = value_type.lower()
+
+        if normalized_type == "number" or not normalized_type:
+            return round(0.72 + (base_seed % 18) / 100, 3)
+        if normalized_type == "boolean":
+            return base_seed % 2 == 0
+
+        return f"stub-{metric_name}-{item_index}"
+
+    @staticmethod
+    def _evaluate_stub_judgment(
+        metrics: list[EvaluationMetric],
+        judgment_config: JudgmentConfig | None,
+    ) -> JudgmentResult:
+        """Apply the same judgment processor used by real evaluations."""
+        if not judgment_config or not judgment_config.conditions:
+            return JudgmentResult()
+
+        metric_values: dict[tuple[str, str], object] = {
+            (metric.node_info.node_id, metric.name): metric.value for metric in metrics if metric.node_info
+        }
+        return JudgmentProcessor.evaluate(metric_values, judgment_config)
+
     @classmethod
     def execute_retrieval_test_targets(
         cls,
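For reviewers who want to see exactly which values the frontend will render, here is a standalone copy of the scheme in _build_stub_metric_value: numeric metrics cycle deterministically through [0.72, 0.89], booleans alternate with the seed's parity, and any other declared type gets a stable string tag. The function body matches the diff above; the sample calls and metric names are illustrative only.

    from typing import Any

    def stub_metric_value(
        item_index: int, metric_position: int, node_position: int, value_type: str, metric_name: str
    ) -> Any:
        # Deterministic seed: same row/metric/node always yields the same value.
        base_seed = item_index + metric_position + node_position
        normalized_type = value_type.lower()
        if normalized_type == "number" or not normalized_type:
            return round(0.72 + (base_seed % 18) / 100, 3)
        if normalized_type == "boolean":
            return base_seed % 2 == 0
        return f"stub-{metric_name}-{item_index}"

    print(stub_metric_value(0, 0, 0, "number", "accuracy"))  # 0.72
    print(stub_metric_value(1, 0, 0, "number", "accuracy"))  # 0.73
    print(stub_metric_value(3, 1, 0, "boolean", "passed"))   # True
    print(stub_metric_value(2, 0, 0, "string", "verdict"))   # stub-verdict-2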