fix: virtual batch test run for frontend. (#35727)

Co-authored-by: jyong <718720800@qq.com>
Co-authored-by: Yansong Zhang <916125788@qq.com>
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
Co-authored-by: hj24 <mambahj24@gmail.com>
Co-authored-by: hj24 <huangjian@dify.ai>
Co-authored-by: Joel <iamjoel007@gmail.com>
Co-authored-by: Stephen Zhou <38493346+hyoban@users.noreply.github.com>
Co-authored-by: CodingOnStar <hanxujiang@dify.com>
Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
FFXN 2026-04-30 15:03:55 +08:00 committed by GitHub
parent 10e6fbe721
commit 3373b63716
3 changed files with 271 additions and 22 deletions


@@ -234,6 +234,32 @@ def get_evaluation_target(view_func: Callable[P, R]):
    return decorated_view


def _load_evaluation_run_request_and_dataset(tenant_id: str) -> tuple[EvaluationRunRequest, bytes]:
    """Validate the run payload and load the uploaded dataset bytes."""
    body = request.get_json(force=True)
    if not body:
        raise BadRequest("Request body is required.")
    try:
        run_request = EvaluationRunRequest.model_validate(body)
    except Exception as e:
        raise BadRequest(f"Invalid request body: {e}")
    upload_file = db.session.query(UploadFile).filter_by(id=run_request.file_id, tenant_id=tenant_id).first()
    if not upload_file:
        raise NotFound("Dataset file not found.")
    try:
        dataset_content = storage.load_once(upload_file.key)
    except Exception:
        raise BadRequest("Failed to read dataset file.")
    if not dataset_content:
        raise BadRequest("Dataset file is empty.")
    return run_request, dataset_content


@console_ns.route("/<string:evaluate_target_type>/<uuid:evaluate_target_id>/dataset-template/download")
class EvaluationDatasetTemplateDownloadApi(Resource):
    @console_ns.doc("download_evaluation_dataset_template")

@@ -408,31 +434,56 @@ class EvaluationRunApi(Resource):
        - judgment_config: judgment conditions config (optional)
        """
        current_account, current_tenant_id = current_account_with_tenant()
        body = request.get_json(force=True)
        if not body:
            raise BadRequest("Request body is required.")
        # Validate and parse request body
        try:
            run_request = EvaluationRunRequest.model_validate(body)
        except Exception as e:
            raise BadRequest(f"Invalid request body: {e}")
        # Load dataset file
        upload_file = (
            db.session.query(UploadFile).filter_by(id=run_request.file_id, tenant_id=current_tenant_id).first()
        )
        if not upload_file:
            raise NotFound("Dataset file not found.")
        run_request, dataset_content = _load_evaluation_run_request_and_dataset(current_tenant_id)
        try:
            dataset_content = storage.load_once(upload_file.key)
        except Exception:
            raise BadRequest("Failed to read dataset file.")
            with Session(db.engine, expire_on_commit=False) as session:
                if target_type == EvaluationTargetType.APPS.value:
                    evaluation_run = EvaluationService.start_stub_evaluation_run(
                        session=session,
                        tenant_id=current_tenant_id,
                        target_type=target_type,
                        target_id=str(target.id),
                        account_id=str(current_account.id),
                        dataset_file_content=dataset_content,
                        run_request=run_request,
                    )
                else:
                    evaluation_run = EvaluationService.start_evaluation_run(
                        session=session,
                        tenant_id=current_tenant_id,
                        target_type=target_type,
                        target_id=str(target.id),
                        account_id=str(current_account.id),
                        dataset_file_content=dataset_content,
                        run_request=run_request,
                    )
            return _serialize_evaluation_run(evaluation_run), 200
        except EvaluationFrameworkNotConfiguredError as e:
            return {"message": str(e.description)}, 400
        except EvaluationNotFoundError as e:
            return {"message": str(e.description)}, 404
        except EvaluationMaxConcurrentRunsError as e:
            return {"message": str(e.description)}, 429
        except EvaluationDatasetInvalidError as e:
            return {"message": str(e.description)}, 400
        if not dataset_content:
            raise BadRequest("Dataset file is empty.")


@console_ns.route("/<string:evaluate_target_type>/<uuid:evaluate_target_id>/evaluation/run1")
class EvaluationRunRealApi(Resource):
    @console_ns.doc("start_evaluation_run_real")
    @console_ns.response(200, "Evaluation run started")
    @console_ns.response(400, "Invalid request")
    @console_ns.response(404, "Target not found")
    @setup_required
    @login_required
    @account_initialization_required
    @get_evaluation_target
    @edit_permission_required
    def post(self, target: Union[App, CustomizedSnippet, Dataset], target_type: str):
        """Start the real evaluation execution flow on the temporary dev path."""
        current_account, current_tenant_id = current_account_with_tenant()
        run_request, dataset_content = _load_evaluation_run_request_and_dataset(current_tenant_id)
        try:
            with Session(db.engine, expire_on_commit=False) as session:
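
Note on exercising the new path: a minimal client-side sketch of calling the temporary /evaluation/run1 endpoint is shown below. Only the route suffix and the file_id field come from this diff; the console API prefix, auth header, and target identifiers are placeholder assumptions for illustration.

import requests

# Placeholder values -- only "/evaluation/run1" and the "file_id" field are
# taken from this diff; everything else here is an assumption for illustration.
BASE_URL = "http://localhost:5001/console/api"      # assumed console API prefix
TARGET_TYPE = "apps"                                # EvaluationTargetType.APPS
TARGET_ID = "00000000-0000-0000-0000-000000000000"  # placeholder target UUID

resp = requests.post(
    f"{BASE_URL}/{TARGET_TYPE}/{TARGET_ID}/evaluation/run1",
    headers={"Authorization": "Bearer <console-access-token>"},  # assumed auth scheme
    json={"file_id": "<uploaded-dataset-file-id>"},              # validated by EvaluationRunRequest
    timeout=30,
)
print(resp.status_code, resp.json())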


@@ -125,6 +125,7 @@ class EvaluationRun(Base):
    total_items: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    completed_items: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    failed_items: Mapped[int] = mapped_column(Integer, nullable=False, default=0)
    metrics_summary: Mapped[str | None] = mapped_column(LongText, nullable=True)
    error: Mapped[str | None] = mapped_column(Text, nullable=True)
    celery_task_id: Mapped[str | None] = mapped_column(String(255), nullable=True)

@@ -143,6 +144,12 @@ class EvaluationRun(Base):
            return 0.0
        return (self.completed_items + self.failed_items) / self.total_items

    @property
    def metrics_summary_dict(self) -> dict[str, Any]:
        if self.metrics_summary:
            return json.loads(self.metrics_summary)
        return {}

    def __repr__(self) -> str:
        return f"<EvaluationRun(id={self.id}, status={self.status})>"


@@ -17,14 +17,19 @@ from core.evaluation.entities.evaluation_entity import (
    EvaluationCategory,
    EvaluationConfigData,
    EvaluationDatasetInput,
    EvaluationItemResult,
    EvaluationMetric,
    EvaluationMetricName,
    EvaluationRunData,
    EvaluationRunRequest,
    NodeInfo,
)
from core.evaluation.entities.judgment_entity import JudgmentConfig, JudgmentResult
from core.evaluation.evaluation_manager import EvaluationManager
from core.evaluation.judgment.processor import JudgmentProcessor
from graphon.enums import WorkflowNodeExecutionMetadataKey
from graphon.node_events.base import NodeRunResult
from libs.datetime_utils import naive_utc_now
from models.evaluation import (
    EvaluationConfiguration,
    EvaluationRun,

@@ -423,6 +428,78 @@ class EvaluationService:
        return evaluation_run

    @classmethod
    def start_stub_evaluation_run(
        cls,
        session: Session,
        tenant_id: str,
        target_type: str,
        target_id: str,
        account_id: str,
        dataset_file_content: bytes,
        run_request: EvaluationRunRequest,
    ) -> EvaluationRun:
        """Persist a completed synthetic run for frontend integration testing.

        This temporary path keeps the existing read flows (`logs`, `run detail`,
        and result-file download) working for app evaluations while the real
        execution logic is moved to `/evaluation/run1` for backend iteration.
        """
        from tasks.evaluation_task import (
            _compute_metrics_summary,
            _generate_result_xlsx,
            _persist_results,
            _store_result_file,
        )

        config = cls.save_evaluation_config(
            session=session,
            tenant_id=tenant_id,
            target_type=target_type,
            target_id=target_id,
            account_id=account_id,
            data=run_request,
        )

        items = cls._parse_dataset(dataset_file_content)
        max_rows = dify_config.EVALUATION_MAX_DATASET_ROWS
        if len(items) > max_rows:
            raise EvaluationDatasetInvalidError(f"Dataset has {len(items)} rows, max is {max_rows}.")

        now = naive_utc_now()
        results = cls._build_stub_results(input_list=items, run_request=run_request)
        metrics_summary = _compute_metrics_summary(results, run_request.judgment_config)

        evaluation_run = EvaluationRun(
            tenant_id=tenant_id,
            target_type=target_type,
            target_id=target_id,
            evaluation_config_id=config.id,
            status=EvaluationRunStatus.COMPLETED,
            dataset_file_id=run_request.file_id,
            total_items=len(items),
            completed_items=len(items),
            failed_items=0,
            metrics_summary=json.dumps(metrics_summary),
            created_by=account_id,
            started_at=now,
            completed_at=now,
        )
        session.add(evaluation_run)
        session.commit()
        session.refresh(evaluation_run)

        _persist_results(session, evaluation_run.id, results, items)
        result_xlsx = _generate_result_xlsx(items, results)
        result_file_id = _store_result_file(tenant_id, evaluation_run.id, result_xlsx, session)
        if result_file_id:
            evaluation_run.result_file_id = result_file_id
            session.commit()
            session.refresh(evaluation_run)

        return evaluation_run

    @classmethod
    def get_evaluation_runs(
        cls,

@@ -902,6 +979,120 @@ class EvaluationService:
        wb.close()
        return items

    @classmethod
    def _build_stub_results(
        cls,
        input_list: list[EvaluationDatasetInput],
        run_request: EvaluationRunRequest,
    ) -> list[EvaluationItemResult]:
        """Create deterministic synthetic results that match the real read models."""
        results: list[EvaluationItemResult] = []
        for item_position, item in enumerate(input_list):
            metrics: list[EvaluationMetric] = []
            for metric_position, default_metric in enumerate(run_request.default_metrics):
                metric_value_type = default_metric.value_type or METRIC_VALUE_TYPE_MAPPING.get(
                    default_metric.metric, ""
                )
                for node_position, node_info in enumerate(default_metric.node_info_list):
                    metrics.append(
                        EvaluationMetric(
                            name=default_metric.metric,
                            value=cls._build_stub_metric_value(
                                item_index=item.index,
                                metric_position=metric_position,
                                node_position=node_position,
                                value_type=metric_value_type,
                                metric_name=default_metric.metric,
                            ),
                            details={
                                "stubbed": True,
                                "source": "console-evaluation-run",
                                "value_type": metric_value_type,
                            },
                            node_info=node_info,
                        )
                    )
            if run_request.customized_metrics:
                for output_position, output_field in enumerate(run_request.customized_metrics.output_fields):
                    metrics.append(
                        EvaluationMetric(
                            name=output_field.variable,
                            value=cls._build_stub_metric_value(
                                item_index=item.index,
                                metric_position=len(metrics),
                                node_position=output_position,
                                value_type=output_field.value_type,
                                metric_name=output_field.variable,
                            ),
                            details={
                                "stubbed": True,
                                "source": "console-evaluation-run",
                                "value_type": output_field.value_type,
                                "customized": True,
                            },
                        )
                    )

            judgment = cls._evaluate_stub_judgment(metrics, run_request.judgment_config)
            actual_output = item.expected_output or cls._build_stub_output(item_position, item.inputs)
            results.append(
                EvaluationItemResult(
                    index=item.index,
                    actual_output=actual_output,
                    metrics=metrics,
                    metadata={
                        "stubbed": True,
                        "source": "console-evaluation-run",
                        "row": item_position + 1,
                    },
                    judgment=judgment,
                )
            )
        return results

    @staticmethod
    def _build_stub_output(item_position: int, inputs: dict[str, Any]) -> str:
        """Build a readable synthetic output for one dataset row."""
        first_key = next(iter(inputs.keys()), "input")
        first_value = inputs.get(first_key, "")
        return f"Stub output #{item_position + 1}: processed {first_key}={first_value}"

    @staticmethod
    def _build_stub_metric_value(
        item_index: int,
        metric_position: int,
        node_position: int,
        value_type: str,
        metric_name: str,
    ) -> Any:
        """Return a deterministic placeholder metric value by declared type."""
        base_seed = item_index + metric_position + node_position
        normalized_type = value_type.lower()
        if normalized_type == "number" or not normalized_type:
            return round(0.72 + (base_seed % 18) / 100, 3)
        if normalized_type == "boolean":
            return base_seed % 2 == 0
        return f"stub-{metric_name}-{item_index}"

    @staticmethod
    def _evaluate_stub_judgment(
        metrics: list[EvaluationMetric],
        judgment_config: JudgmentConfig | None,
    ) -> JudgmentResult:
        """Apply the same judgment processor used by real evaluations."""
        if not judgment_config or not judgment_config.conditions:
            return JudgmentResult()
        metric_values: dict[tuple[str, str], object] = {
            (metric.node_info.node_id, metric.name): metric.value for metric in metrics if metric.node_info
        }
        return JudgmentProcessor.evaluate(metric_values, judgment_config)

    @classmethod
    def execute_retrieval_test_targets(
        cls,
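
The stub values are deterministic per row, metric, and node, so repeated runs over the same dataset surface identical numbers in the frontend. A standalone recomputation of the scheme used by _build_stub_metric_value (just the arithmetic from the diff, no Dify imports):

# Mirrors the placeholder-value arithmetic in _build_stub_metric_value; for illustration only.
def stub_value(item_index: int, metric_position: int, node_position: int,
               value_type: str, metric_name: str):
    base_seed = item_index + metric_position + node_position
    normalized_type = value_type.lower()
    if normalized_type == "number" or not normalized_type:
        # numeric values cycle through [0.72, 0.89] with a period of 18 seeds
        return round(0.72 + (base_seed % 18) / 100, 3)
    if normalized_type == "boolean":
        return base_seed % 2 == 0
    return f"stub-{metric_name}-{item_index}"

print(stub_value(0, 0, 0, "number", "accuracy"))   # 0.72
print(stub_value(5, 1, 0, "number", "accuracy"))   # 0.78
print(stub_value(3, 0, 1, "boolean", "passed"))    # True
print(stub_value(2, 0, 0, "string", "verdict"))    # stub-verdict-2

Note that _evaluate_stub_judgment only feeds metrics carrying node_info into JudgmentProcessor.evaluate, keyed by (node_id, metric name); customized-output metrics are appended without node_info and are therefore excluded from the stubbed judgment.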