evaluation runtime

jyong 2026-03-09 15:17:35 +08:00
parent b88195c7d9
commit 2ffd7e519f
7 changed files with 57 additions and 211 deletions

View File

@@ -16,7 +16,7 @@ class EvaluationCategory(StrEnum):
class EvaluationMetric(BaseModel):
name: str
score: float
value: Any
details: dict[str, Any] = Field(default_factory=dict)
@@ -31,8 +31,6 @@ class EvaluationItemResult(BaseModel):
index: int
actual_output: str | None = None
metrics: list[EvaluationMetric] = Field(default_factory=list)
judgment: JudgmentResult | None = None
metadata: dict[str, Any] = Field(default_factory=dict)
error: str | None = None
@property
@@ -85,10 +83,9 @@ class EvaluationRunData(BaseModel):
tenant_id: str
target_type: str
target_id: str
evaluation_category: EvaluationCategory
evaluation_model_provider: str
evaluation_model: str
default_metrics: list[dict[str, Any]] = Field(default_factory=list)
customized_metrics: dict[str, Any] | None = None
default_metrics: list[DefaultMetric] = Field(default_factory=list)
customized_metrics: CustomizedMetrics | None = None
judgment_config: JudgmentConfig | None = None
items: list[EvaluationItemInput]
input_list: list[dict]
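The dict-based metric fields are replaced with typed models. Their definitions are not part of this diff; the task code further down only reads node_info_list, node_id, type, and score, so a minimal sketch consistent with that usage (every other detail is an assumption) might be:

from pydantic import BaseModel, Field


class NodeInfo(BaseModel):
    # node a default metric is evaluated against; `type` is coerced
    # into an EvaluationCategory by the task code
    node_id: str
    type: str


class DefaultMetric(BaseModel):
    # built-in metric computed from a node's run result; `score` is
    # zeroed when no NodeRunResult exists for the node
    node_info_list: list[NodeInfo] = Field(default_factory=list)
    score: float = 0.0

CustomizedMetrics is left out of the sketch because nothing in this diff reveals its fields.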

View File

@@ -17,11 +17,15 @@ from sqlalchemy.orm import Session
from core.evaluation.base_evaluation_instance import BaseEvaluationInstance
from core.evaluation.entities.evaluation_entity import (
CustomizedMetrics,
DefaultMetric,
EvaluationItemInput,
EvaluationItemResult,
)
from core.evaluation.entities.judgment_entity import JudgmentConfig
from core.evaluation.judgment.processor import JudgmentProcessor
from core.workflow.enums import WorkflowNodeExecutionStatus
from core.workflow.node_events import NodeRunResult
from libs.datetime_utils import naive_utc_now
from models.evaluation import EvaluationRun, EvaluationRunItem, EvaluationRunStatus
@@ -35,17 +39,6 @@ class BaseEvaluationRunner(ABC):
self.evaluation_instance = evaluation_instance
self.session = session
@abstractmethod
def execute_target(
self,
tenant_id: str,
target_id: str,
target_type: str,
item: EvaluationItemInput,
) -> EvaluationItemResult:
"""Execute the evaluation target for a single item and return the result with actual_output populated."""
...
@abstractmethod
def evaluate_metrics(
self,
@@ -65,17 +58,19 @@ class BaseEvaluationRunner(ABC):
tenant_id: str,
target_id: str,
target_type: str,
items: list[EvaluationItemInput],
default_metrics: list[dict[str, Any]],
customized_metrics: dict[str, Any] | None = None,
node_run_result: NodeRunResult,
default_metric: DefaultMetric | None = None,
customized_metrics: CustomizedMetrics | None = None,
model_provider: str = "",
model_name: str = "",
judgment_config: JudgmentConfig | None = None,
model_name: str = "",
) -> list[EvaluationItemResult]:
"""Orchestrate target execution + metric evaluation + judgment for all items."""
evaluation_run = self.session.query(EvaluationRun).filter_by(id=evaluation_run_id).first()
if not evaluation_run:
raise ValueError(f"EvaluationRun {evaluation_run_id} not found")
if not default_metric and not customized_metrics:
raise ValueError("Either default_metric or customized_metrics must be provided")
# Update status to running
evaluation_run.status = EvaluationRunStatus.RUNNING
@@ -84,28 +79,8 @@ class BaseEvaluationRunner(ABC):
results: list[EvaluationItemResult] = []
# Phase 1: Execute target for each item
for item in items:
try:
result = self.execute_target(tenant_id, target_id, target_type, item)
results.append(result)
evaluation_run.completed_items += 1
except Exception as e:
logger.exception("Failed to execute target for item %d", item.index)
results.append(
EvaluationItemResult(
index=item.index,
error=str(e),
)
)
evaluation_run.failed_items += 1
self.session.commit()
# Phase 2: Compute metrics on successful results
successful_items = [item for item, result in zip(items, results) if result.error is None]
successful_results = [r for r in results if r.error is None]
if successful_items and successful_results:
# Phase 1: run evaluation
if node_run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED:
try:
if customized_metrics is not None:
# Customized workflow evaluation — target-type agnostic
@@ -126,10 +101,6 @@ class BaseEvaluationRunner(ABC):
except Exception:
logger.exception("Failed to compute metrics for evaluation run %s", evaluation_run_id)
# Phase 3: Apply judgment conditions on metrics
if judgment_config and judgment_config.conditions:
results = self._apply_judgment(results, items, judgment_config)
# Phase 4: Persist individual items
for result in results:
item_input = next((item for item in items if item.index == result.index), None)
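With execute_target and the per-item loop removed, the runner no longer drives the target itself: it receives a NodeRunResult produced upstream and evaluates exactly one of default_metric or customized_metrics against it. A hedged sketch of a call site for the new signature, using only the keyword names visible in the hunks above (the values are placeholders):

# hypothetical call site; whether judgment_config remains a keyword of run()
# is not fully clear from the hunk, so it is omitted here
results = runner.run(
    evaluation_run_id=evaluation_run_id,
    tenant_id=tenant_id,
    target_id=target_id,
    target_type=target_type,
    node_run_result=node_run_result,   # captured from a prior workflow/node execution
    default_metric=default_metric,     # pass this ...
    customized_metrics=None,           # ... or customized_metrics, but not neither
    model_provider=model_provider,
    model_name=model_name,
)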

View File

@@ -21,71 +21,6 @@ class LLMEvaluationRunner(BaseEvaluationRunner):
def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session):
super().__init__(evaluation_instance, session)
def execute_target(
self,
tenant_id: str,
target_id: str,
target_type: str,
item: EvaluationItemInput,
) -> EvaluationItemResult:
"""Execute the App/Snippet with the given inputs and collect the response."""
from core.app.apps.completion.app_generator import CompletionAppGenerator
from core.app.apps.workflow.app_generator import WorkflowAppGenerator
from core.app.entities.app_invoke_entities import InvokeFrom
from core.evaluation.runners import get_service_account_for_app
from services.workflow_service import WorkflowService
app = self.session.query(App).filter_by(id=target_id).first()
if not app:
raise ValueError(f"App {target_id} not found")
# Get a service account for invocation
service_account = get_service_account_for_app(self.session, target_id)
app_mode = AppMode.value_of(app.mode)
# Build args from evaluation item inputs
args: dict[str, Any] = {
"inputs": item.inputs,
}
# For completion/chat modes, first text input becomes query
if app_mode in (AppMode.COMPLETION, AppMode.CHAT):
query = self._extract_query(item.inputs)
args["query"] = query
if app_mode in (AppMode.WORKFLOW, AppMode.ADVANCED_CHAT):
workflow_service = WorkflowService()
workflow = workflow_service.get_published_workflow(app_model=app)
if not workflow:
raise ValueError(f"No published workflow found for app {target_id}")
generator = WorkflowAppGenerator()
response: Mapping[str, Any] = generator.generate(
app_model=app,
workflow=workflow,
user=service_account,
args=args,
invoke_from=InvokeFrom.SERVICE_API,
streaming=False,
)
elif app_mode == AppMode.COMPLETION:
generator = CompletionAppGenerator()
response = generator.generate(
app_model=app,
user=service_account,
args=args,
invoke_from=InvokeFrom.SERVICE_API,
streaming=False,
)
else:
raise ValueError(f"Unsupported app mode for LLM evaluation: {app_mode}")
actual_output = self._extract_output(response)
return EvaluationItemResult(
index=item.index,
actual_output=actual_output,
)
def evaluate_metrics(
self,
items: list[EvaluationItemInput],

View File

@@ -19,41 +19,6 @@ class RetrievalEvaluationRunner(BaseEvaluationRunner):
def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session):
super().__init__(evaluation_instance, session)
def execute_target(
self,
tenant_id: str,
target_id: str,
target_type: str,
item: EvaluationItemInput,
) -> EvaluationItemResult:
"""Execute retrieval using DatasetRetrieval and collect context documents."""
from core.rag.retrieval.dataset_retrieval import DatasetRetrieval
query = self._extract_query(item.inputs)
dataset_retrieval = DatasetRetrieval()
# Use knowledge_retrieval for structured results
try:
from core.rag.retrieval.dataset_retrieval import KnowledgeRetrievalRequest
request = KnowledgeRetrievalRequest(
query=query,
app_id=target_id,
tenant_id=tenant_id,
)
sources = dataset_retrieval.knowledge_retrieval(request)
retrieved_contexts = [source.content for source in sources if source.content]
except (ImportError, AttributeError):
logger.warning("KnowledgeRetrievalRequest not available, using simple retrieval")
retrieved_contexts = []
return EvaluationItemResult(
index=item.index,
actual_output="\n\n".join(retrieved_contexts) if retrieved_contexts else "",
metadata={"retrieved_contexts": retrieved_contexts},
)
def evaluate_metrics(
self,
items: list[EvaluationItemInput],

View File

@@ -21,50 +21,6 @@ class WorkflowEvaluationRunner(BaseEvaluationRunner):
def __init__(self, evaluation_instance: BaseEvaluationInstance, session: Session):
super().__init__(evaluation_instance, session)
def execute_target(
self,
tenant_id: str,
target_id: str,
target_type: str,
item: EvaluationItemInput,
) -> EvaluationItemResult:
"""Execute workflow and collect outputs."""
from core.app.apps.workflow.app_generator import WorkflowAppGenerator
from core.app.entities.app_invoke_entities import InvokeFrom
from core.evaluation.runners import get_service_account_for_app
from services.workflow_service import WorkflowService
app = self.session.query(App).filter_by(id=target_id).first()
if not app:
raise ValueError(f"App {target_id} not found")
service_account = get_service_account_for_app(self.session, target_id)
workflow_service = WorkflowService()
workflow = workflow_service.get_published_workflow(app_model=app)
if not workflow:
raise ValueError(f"No published workflow found for app {target_id}")
args: dict[str, Any] = {"inputs": item.inputs}
generator = WorkflowAppGenerator()
response: Mapping[str, Any] = generator.generate(
app_model=app,
workflow=workflow,
user=service_account,
args=args,
invoke_from=InvokeFrom.SERVICE_API,
streaming=False,
)
actual_output = self._extract_output(response)
node_executions = self._extract_node_executions(response)
return EvaluationItemResult(
index=item.index,
actual_output=actual_output,
metadata={"node_executions": node_executions},
)
def evaluate_metrics(
self,
items: list[EvaluationItemInput],

View File

@@ -327,7 +327,6 @@ class EvaluationService:
tenant_id=tenant_id,
target_type=target_type,
target_id=target_id,
evaluation_category=evaluation_category,
evaluation_model_provider=run_request.evaluation_model_provider,
evaluation_model=run_request.evaluation_model,
default_metrics=[m.model_dump() for m in run_request.default_metrics],

View File

@@ -9,6 +9,7 @@ from openpyxl.styles import Alignment, Border, Font, PatternFill, Side
from openpyxl.utils import get_column_letter
from configs import dify_config
from core.evaluation.base_evaluation_instance import BaseEvaluationInstance
from core.evaluation.entities.evaluation_entity import (
EvaluationCategory,
EvaluationItemResult,
@@ -16,9 +17,11 @@ from core.evaluation.entities.evaluation_entity import (
)
from core.evaluation.evaluation_manager import EvaluationManager
from core.evaluation.runners.agent_evaluation_runner import AgentEvaluationRunner
from core.evaluation.runners.base_evaluation_runner import BaseEvaluationRunner
from core.evaluation.runners.llm_evaluation_runner import LLMEvaluationRunner
from core.evaluation.runners.retrieval_evaluation_runner import RetrievalEvaluationRunner
from core.evaluation.runners.workflow_evaluation_runner import WorkflowEvaluationRunner
from core.workflow.node_events.base import NodeRunResult
from extensions.ext_database import db
from libs.datetime_utils import naive_utc_now
from models.evaluation import EvaluationRun, EvaluationRunStatus
@@ -72,22 +75,8 @@ def _execute_evaluation(session: Any, run_data: EvaluationRunData) -> None:
if evaluation_instance is None:
raise ValueError("Evaluation framework not configured")
# Select runner based on category
runner = _create_runner(run_data.evaluation_category, evaluation_instance, session)
_execute_evaluation_runner(session, run_data, evaluation_instance, node_run_result_mapping)
# Execute evaluation
results = runner.run(
evaluation_run_id=run_data.evaluation_run_id,
tenant_id=run_data.tenant_id,
target_id=run_data.target_id,
target_type=run_data.target_type,
items=run_data.items,
default_metrics=run_data.default_metrics,
customized_metrics=run_data.customized_metrics,
model_provider=run_data.evaluation_model_provider,
model_name=run_data.evaluation_model,
judgment_config=run_data.judgment_config,
)
# Compute summary metrics
metrics_summary = _compute_metrics_summary(results)
@@ -112,12 +101,46 @@ def _execute_evaluation(session: Any, run_data: EvaluationRunData) -> None:
logger.info("Evaluation run %s completed successfully", run_data.evaluation_run_id)
def _execute_evaluation_runner(
session: Any,
run_data: EvaluationRunData,
evaluation_instance: BaseEvaluationInstance,
node_run_result_mapping: dict[str, NodeRunResult],
) -> list[EvaluationItemResult]:
"""Execute the evaluation runner."""
default_metrics = run_data.default_metrics
customized_metrics = run_data.customized_metrics
for default_metric in default_metrics:
for node_info in default_metric.node_info_list:
node_run_result = node_run_result_mapping.get(node_info.node_id)
if node_run_result:
runner = _create_runner(EvaluationCategory(node_info.type), evaluation_instance, session)
runner.run(
evaluation_run_id=run_data.evaluation_run_id,
tenant_id=run_data.tenant_id,
target_id=run_data.target_id,
target_type=run_data.target_type,
default_metric=default_metric,
customized_metrics=None,
model_provider=run_data.evaluation_model_provider,
model_name=run_data.evaluation_model,
node_run_result=node_run_result,
)
else:
default_metric.score = 0
for customized_metric in customized_metrics:
runner = _create_runner(run_data.evaluation_category, evaluation_instance, session)
runner.run(
evaluation_run_id=run_data.evaluation_run_id,
tenant_id=run_data.tenant_id,
target_id=run_data.target_id,
)
def _create_runner(
category: EvaluationCategory,
evaluation_instance: Any,
evaluation_instance: BaseEvaluationInstance,
session: Any,
) -> Any:
) -> BaseEvaluationRunner:
"""Create the appropriate runner for the evaluation category."""
match category:
case EvaluationCategory.LLM:
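_create_runner now returns BaseEvaluationRunner instead of Any, and the hunk cuts off inside the match statement. Judging from the runner imports added at the top of this file, the full mapping presumably resembles the sketch below; the non-LLM case names are assumptions matching the imported classes:

# sketch of the category-to-runner mapping; only the LLM case is visible
# in the hunk, the remaining cases are inferred from the imports
match category:
    case EvaluationCategory.LLM:
        return LLMEvaluationRunner(evaluation_instance, session)
    case EvaluationCategory.RETRIEVAL:
        return RetrievalEvaluationRunner(evaluation_instance, session)
    case EvaluationCategory.WORKFLOW:
        return WorkflowEvaluationRunner(evaluation_instance, session)
    case EvaluationCategory.AGENT:
        return AgentEvaluationRunner(evaluation_instance, session)
    case _:
        raise ValueError(f"Unsupported evaluation category: {category}")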