mirror of
https://github.com/langgenius/dify.git
synced 2026-05-11 23:18:39 +08:00
Merge remote-tracking branch 'origin/feat/evaluation' into feat/evaluation
This commit is contained in:
commit
0439624481
@ -21,7 +21,7 @@ class BaseEvaluationInstance(ABC):
|
||||
def evaluate_llm(
|
||||
self,
|
||||
items: list[EvaluationItemInput],
|
||||
default_metrics: list[dict[str, Any]],
|
||||
default_metrics: str,
|
||||
model_provider: str,
|
||||
model_name: str,
|
||||
tenant_id: str,
|
||||
@ -33,7 +33,7 @@ class BaseEvaluationInstance(ABC):
|
||||
def evaluate_retrieval(
|
||||
self,
|
||||
items: list[EvaluationItemInput],
|
||||
default_metrics: list[dict[str, Any]],
|
||||
default_metrics: str,
|
||||
model_provider: str,
|
||||
model_name: str,
|
||||
tenant_id: str,
|
||||
@ -45,7 +45,7 @@ class BaseEvaluationInstance(ABC):
|
||||
def evaluate_agent(
|
||||
self,
|
||||
items: list[EvaluationItemInput],
|
||||
default_metrics: list[dict[str, Any]],
|
||||
default_metrics: str,
|
||||
model_provider: str,
|
||||
model_name: str,
|
||||
tenant_id: str,
|
||||
@ -53,188 +53,8 @@ class BaseEvaluationInstance(ABC):
|
||||
"""Evaluate agent outputs using the configured framework."""
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def evaluate_workflow(
|
||||
self,
|
||||
items: list[EvaluationItemInput],
|
||||
default_metrics: list[dict[str, Any]],
|
||||
model_provider: str,
|
||||
model_name: str,
|
||||
tenant_id: str,
|
||||
) -> list[EvaluationItemResult]:
|
||||
"""Evaluate workflow outputs using the configured framework."""
|
||||
...
|
||||
|
||||
@abstractmethod
|
||||
def get_supported_metrics(self, category: EvaluationCategory) -> list[str]:
|
||||
"""Return the list of supported metric names for a given evaluation category."""
|
||||
...
|
||||
|
||||
def evaluate_with_customized_workflow(
|
||||
self,
|
||||
items: list[EvaluationItemInput],
|
||||
results: list[EvaluationItemResult],
|
||||
customized_metrics: dict[str, Any],
|
||||
tenant_id: str,
|
||||
) -> list[EvaluationItemResult]:
|
||||
"""Evaluate using a published workflow as the evaluator.
|
||||
|
||||
The evaluator workflow's output variables are treated as metrics:
|
||||
each output variable name becomes a metric name, and its value
|
||||
becomes the score.
|
||||
|
||||
Args:
|
||||
items: Evaluation items with inputs, expected_output, context.
|
||||
results: Results from Phase 1 (with actual_output populated).
|
||||
customized_metrics: Must contain ``evaluation_workflow_id``
|
||||
pointing to a published WORKFLOW-type App.
|
||||
tenant_id: Tenant scope.
|
||||
|
||||
Returns:
|
||||
A list of ``EvaluationItemResult`` with metrics extracted from
|
||||
the workflow outputs.
|
||||
"""
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from core.app.apps.workflow.app_generator import WorkflowAppGenerator
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom
|
||||
from core.evaluation.runners import get_service_account_for_app
|
||||
from models.engine import db
|
||||
from models.model import App
|
||||
from services.workflow_service import WorkflowService
|
||||
|
||||
workflow_id = customized_metrics.get("evaluation_workflow_id")
|
||||
if not workflow_id:
|
||||
raise ValueError(
|
||||
"customized_metrics must contain 'evaluation_workflow_id' for customized evaluator"
|
||||
)
|
||||
|
||||
# Load the evaluator workflow resources using a dedicated session
|
||||
with Session(db.engine, expire_on_commit=False) as session, session.begin():
|
||||
app = session.query(App).filter_by(
|
||||
id=workflow_id, tenant_id=tenant_id
|
||||
).first()
|
||||
if not app:
|
||||
raise ValueError(
|
||||
f"Evaluation workflow app {workflow_id} not found in tenant {tenant_id}"
|
||||
)
|
||||
service_account = get_service_account_for_app(session, workflow_id)
|
||||
|
||||
workflow_service = WorkflowService()
|
||||
published_workflow = workflow_service.get_published_workflow(app_model=app)
|
||||
if not published_workflow:
|
||||
raise ValueError(
|
||||
f"No published workflow found for evaluation app {workflow_id}"
|
||||
)
|
||||
|
||||
result_by_index = {r.index: r for r in results}
|
||||
eval_results: list[EvaluationItemResult] = []
|
||||
for item in items:
|
||||
result = result_by_index.get(item.index)
|
||||
try:
|
||||
workflow_inputs = self._build_workflow_inputs(item, result)
|
||||
|
||||
generator = WorkflowAppGenerator()
|
||||
response: Mapping[str, Any] = generator.generate(
|
||||
app_model=app,
|
||||
workflow=published_workflow,
|
||||
user=service_account,
|
||||
args={"inputs": workflow_inputs},
|
||||
invoke_from=InvokeFrom.SERVICE_API,
|
||||
streaming=False,
|
||||
)
|
||||
|
||||
metrics = self._extract_workflow_metrics(response)
|
||||
eval_results.append(
|
||||
EvaluationItemResult(
|
||||
index=item.index,
|
||||
metrics=metrics,
|
||||
metadata={
|
||||
"workflow_response": _safe_serialize(response),
|
||||
},
|
||||
)
|
||||
)
|
||||
except Exception:
|
||||
logger.exception(
|
||||
"Customized evaluator failed for item %d with workflow %s",
|
||||
item.index,
|
||||
workflow_id,
|
||||
)
|
||||
eval_results.append(EvaluationItemResult(index=item.index))
|
||||
|
||||
return eval_results
|
||||
|
||||
@staticmethod
|
||||
def _build_workflow_inputs(
|
||||
item: EvaluationItemInput,
|
||||
result: EvaluationItemResult | None,
|
||||
) -> dict[str, Any]:
|
||||
"""Build workflow input dict from evaluation data.
|
||||
|
||||
Maps evaluation data to conventional workflow input variable names:
|
||||
- ``actual_output``: The target's actual output (from ``result``).
|
||||
- ``expected_output``: The expected/reference output.
|
||||
- ``inputs``: The original evaluation inputs as a JSON string.
|
||||
- ``context``: All context strings joined by newlines.
|
||||
"""
|
||||
workflow_inputs: dict[str, Any] = {}
|
||||
|
||||
if result and result.actual_output:
|
||||
workflow_inputs["actual_output"] = result.actual_output
|
||||
|
||||
if item.expected_output:
|
||||
workflow_inputs["expected_output"] = item.expected_output
|
||||
|
||||
if item.inputs:
|
||||
workflow_inputs["inputs"] = json.dumps(item.inputs, ensure_ascii=False)
|
||||
|
||||
if item.context:
|
||||
workflow_inputs["context"] = "\n\n".join(item.context)
|
||||
|
||||
return workflow_inputs
|
||||
|
||||
@staticmethod
|
||||
def _extract_workflow_metrics(
|
||||
response: Mapping[str, Any],
|
||||
) -> list[EvaluationMetric]:
|
||||
"""Extract evaluation metrics from workflow output variables.
|
||||
|
||||
Each output variable is treated as a metric. The variable name
|
||||
becomes the metric name, and its value becomes the score.
|
||||
Non-numeric values are recorded with ``score=0.0`` and the raw
|
||||
value stored in ``details``.
|
||||
"""
|
||||
metrics: list[EvaluationMetric] = []
|
||||
|
||||
data = response.get("data", {})
|
||||
if not isinstance(data, Mapping):
|
||||
logger.warning("Unexpected workflow response format: missing 'data' dict")
|
||||
return metrics
|
||||
|
||||
outputs = data.get("outputs", {})
|
||||
if not isinstance(outputs, Mapping):
|
||||
logger.warning(
|
||||
"Unexpected workflow response format: 'outputs' is not a dict"
|
||||
)
|
||||
return metrics
|
||||
|
||||
for key, value in outputs.items():
|
||||
try:
|
||||
score = float(value)
|
||||
metrics.append(EvaluationMetric(name=key, score=score))
|
||||
except (TypeError, ValueError):
|
||||
metrics.append(
|
||||
EvaluationMetric(
|
||||
name=key, score=0.0, details={"raw_value": value}
|
||||
)
|
||||
)
|
||||
|
||||
return metrics
|
||||
|
||||
|
||||
def _safe_serialize(response: Mapping[str, Any]) -> dict[str, Any]:
|
||||
"""Safely serialize workflow response for metadata storage."""
|
||||
try:
|
||||
return dict(response)
|
||||
except Exception:
|
||||
return {"raw": str(response)}
|
||||
|
||||
@ -24,6 +24,7 @@ class EvaluationMetric(BaseModel):
|
||||
class EvaluationItemInput(BaseModel):
|
||||
index: int
|
||||
inputs: dict[str, Any]
|
||||
output: str
|
||||
expected_output: str | None = None
|
||||
context: list[str] | None = None
|
||||
|
||||
@ -89,4 +90,4 @@ class EvaluationRunData(BaseModel):
|
||||
default_metrics: list[DefaultMetric] = Field(default_factory=list)
|
||||
customized_metrics: CustomizedMetrics | None = None
|
||||
judgment_config: JudgmentConfig | None = None
|
||||
input_list: list[dict]
|
||||
input_list: list[EvaluationItemInput]
|
||||
|
||||
@ -42,7 +42,7 @@ class RagasEvaluator(BaseEvaluationInstance):
|
||||
def evaluate_llm(
|
||||
self,
|
||||
items: list[EvaluationItemInput],
|
||||
default_metrics: list[dict[str, Any]],
|
||||
default_metrics: str,
|
||||
model_provider: str,
|
||||
model_name: str,
|
||||
tenant_id: str,
|
||||
@ -52,7 +52,7 @@ class RagasEvaluator(BaseEvaluationInstance):
|
||||
def evaluate_retrieval(
|
||||
self,
|
||||
items: list[EvaluationItemInput],
|
||||
default_metrics: list[dict[str, Any]],
|
||||
default_metrics: str,
|
||||
model_provider: str,
|
||||
model_name: str,
|
||||
tenant_id: str,
|
||||
@ -64,7 +64,7 @@ class RagasEvaluator(BaseEvaluationInstance):
|
||||
def evaluate_agent(
|
||||
self,
|
||||
items: list[EvaluationItemInput],
|
||||
default_metrics: list[dict[str, Any]],
|
||||
default_metrics: str,
|
||||
model_provider: str,
|
||||
model_name: str,
|
||||
tenant_id: str,
|
||||
@ -74,7 +74,7 @@ class RagasEvaluator(BaseEvaluationInstance):
|
||||
def evaluate_workflow(
|
||||
self,
|
||||
items: list[EvaluationItemInput],
|
||||
default_metrics: list[dict[str, Any]],
|
||||
default_metrics: str,
|
||||
model_provider: str,
|
||||
model_name: str,
|
||||
tenant_id: str,
|
||||
@ -86,7 +86,7 @@ class RagasEvaluator(BaseEvaluationInstance):
|
||||
def _evaluate(
|
||||
self,
|
||||
items: list[EvaluationItemInput],
|
||||
default_metrics: list[dict[str, Any]],
|
||||
default_metrics: str,
|
||||
model_provider: str,
|
||||
model_name: str,
|
||||
tenant_id: str,
|
||||
@ -98,9 +98,9 @@ class RagasEvaluator(BaseEvaluationInstance):
|
||||
string similarity if RAGAS import fails.
|
||||
"""
|
||||
model_wrapper = DifyModelWrapper(model_provider, model_name, tenant_id)
|
||||
# Extract metric names from default_metrics list; each item has a "metric" key.
|
||||
# Extract metric names from default_metrics string.
|
||||
requested_metrics = (
|
||||
[m["metric"] for m in default_metrics if "metric" in m]
|
||||
[default_metrics]
|
||||
if default_metrics
|
||||
else self.get_supported_metrics(category)
|
||||
)
|
||||
|
||||
@ -69,8 +69,8 @@ class AgentEvaluationRunner(BaseEvaluationRunner):
|
||||
|
||||
def evaluate_metrics(
|
||||
self,
|
||||
node_run_result_mapping: dict[str, NodeRunResult] | None,
|
||||
node_run_result: NodeRunResult | None,
|
||||
node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None,
|
||||
node_run_result_list: list[NodeRunResult] | None,
|
||||
default_metric: DefaultMetric | None,
|
||||
customized_metrics: CustomizedMetrics | None,
|
||||
model_provider: str,
|
||||
|
||||
@ -42,8 +42,8 @@ class BaseEvaluationRunner(ABC):
|
||||
@abstractmethod
|
||||
def evaluate_metrics(
|
||||
self,
|
||||
node_run_result_mapping: dict[str, NodeRunResult] | None,
|
||||
node_run_result: NodeRunResult | None,
|
||||
node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None,
|
||||
node_run_result_list: list[NodeRunResult] | None,
|
||||
default_metric: DefaultMetric | None,
|
||||
customized_metrics: CustomizedMetrics | None,
|
||||
model_provider: str,
|
||||
@ -59,12 +59,12 @@ class BaseEvaluationRunner(ABC):
|
||||
tenant_id: str,
|
||||
target_id: str,
|
||||
target_type: str,
|
||||
node_run_result: NodeRunResult | None = None,
|
||||
node_run_result_list: list[NodeRunResult] | None = None,
|
||||
default_metric: DefaultMetric | None = None,
|
||||
customized_metrics: CustomizedMetrics | None = None,
|
||||
model_provider: str = "",
|
||||
model_name: str = "",
|
||||
node_run_result_mapping: dict[str, NodeRunResult] | None = None,
|
||||
node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None = None,
|
||||
) -> list[EvaluationItemResult]:
|
||||
"""Orchestrate target execution + metric evaluation + judgment for all items."""
|
||||
evaluation_run = self.session.query(EvaluationRun).filter_by(id=evaluation_run_id).first()
|
||||
@ -82,11 +82,11 @@ class BaseEvaluationRunner(ABC):
|
||||
results: list[EvaluationItemResult] = []
|
||||
|
||||
# Phase 1: run evaluation
|
||||
if node_run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED:
|
||||
if default_metric and node_run_result_list:
|
||||
try:
|
||||
evaluated_results = self.evaluate_metrics(
|
||||
node_run_result_mapping=node_run_result_mapping,
|
||||
node_run_result=node_run_result,
|
||||
node_run_result_mapping_list=node_run_result_mapping_list,
|
||||
node_run_result_list=node_run_result_list,
|
||||
default_metric=default_metric,
|
||||
customized_metrics=customized_metrics,
|
||||
model_provider=model_provider,
|
||||
@ -100,6 +100,19 @@ class BaseEvaluationRunner(ABC):
|
||||
results[i] = evaluated_by_index[result.index]
|
||||
except Exception:
|
||||
logger.exception("Failed to compute metrics for evaluation run %s", evaluation_run_id)
|
||||
if customized_metrics and node_run_result_mapping_list:
|
||||
try:
|
||||
evaluated_results = self.evaluate_metrics(
|
||||
node_run_result_mapping_list=node_run_result_mapping_list,
|
||||
node_run_result_list=node_run_result_list,
|
||||
default_metric=default_metric,
|
||||
customized_metrics=customized_metrics,
|
||||
model_provider=model_provider,
|
||||
model_name=model_name,
|
||||
tenant_id=tenant_id,
|
||||
)
|
||||
except Exception:
|
||||
logger.exception("Failed to compute metrics for evaluation run %s", evaluation_run_id)
|
||||
|
||||
# Phase 4: Persist individual items
|
||||
for result in results:
|
||||
|
||||
@ -26,8 +26,8 @@ class LLMEvaluationRunner(BaseEvaluationRunner):
|
||||
|
||||
def evaluate_metrics(
|
||||
self,
|
||||
node_run_result_mapping: dict[str, NodeRunResult] | None,
|
||||
node_run_result: NodeRunResult | None,
|
||||
node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None,
|
||||
node_run_result_list: list[NodeRunResult] | None,
|
||||
default_metric: DefaultMetric | None,
|
||||
customized_metrics: CustomizedMetrics | None,
|
||||
model_provider: str,
|
||||
@ -36,7 +36,9 @@ class LLMEvaluationRunner(BaseEvaluationRunner):
|
||||
) -> list[EvaluationItemResult]:
|
||||
"""Use the evaluation instance to compute LLM metrics."""
|
||||
# Merge actual_output into items for evaluation
|
||||
merged_items = self._merge_results_into_items(items, results)
|
||||
if not node_run_result_list:
|
||||
return []
|
||||
merged_items = self._merge_results_into_items(node_run_result_list)
|
||||
return self.evaluation_instance.evaluate_llm(
|
||||
merged_items, default_metrics, model_provider, model_name, tenant_id
|
||||
)
|
||||
@ -70,23 +72,19 @@ class LLMEvaluationRunner(BaseEvaluationRunner):
|
||||
|
||||
@staticmethod
|
||||
def _merge_results_into_items(
|
||||
items: list[EvaluationItemInput],
|
||||
results: list[EvaluationItemResult],
|
||||
items: list[NodeRunResult],
|
||||
) -> list[EvaluationItemInput]:
|
||||
"""Create new items with actual_output set as expected_output context for metrics."""
|
||||
result_by_index = {r.index: r for r in results}
|
||||
merged = []
|
||||
for item in items:
|
||||
result = result_by_index.get(item.index)
|
||||
if result and result.actual_output:
|
||||
merged.append(
|
||||
EvaluationItemInput(
|
||||
index=item.index,
|
||||
inputs=item.inputs,
|
||||
expected_output=item.expected_output,
|
||||
context=[result.actual_output] + (item.context or []),
|
||||
)
|
||||
merged.append(
|
||||
EvaluationItemInput(
|
||||
index=item.index,
|
||||
inputs={
|
||||
"prompt": item.prompt,
|
||||
},
|
||||
output=item.output,
|
||||
expected_output=item.expected_output,
|
||||
)
|
||||
else:
|
||||
merged.append(item)
|
||||
)
|
||||
return merged
|
||||
|
||||
@ -24,8 +24,8 @@ class RetrievalEvaluationRunner(BaseEvaluationRunner):
|
||||
|
||||
def evaluate_metrics(
|
||||
self,
|
||||
node_run_result_mapping: dict[str, NodeRunResult] | None,
|
||||
node_run_result: NodeRunResult | None,
|
||||
node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None,
|
||||
node_run_result_list: list[NodeRunResult] | None,
|
||||
default_metric: DefaultMetric | None,
|
||||
customized_metrics: CustomizedMetrics | None,
|
||||
model_provider: str,
|
||||
|
||||
@ -92,8 +92,8 @@ class SnippetEvaluationRunner(BaseEvaluationRunner):
|
||||
|
||||
def evaluate_metrics(
|
||||
self,
|
||||
node_run_result_mapping: dict[str, NodeRunResult] | None,
|
||||
node_run_result: NodeRunResult | None,
|
||||
node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None,
|
||||
node_run_result_list: list[NodeRunResult] | None,
|
||||
default_metric: DefaultMetric | None,
|
||||
customized_metrics: CustomizedMetrics | None,
|
||||
model_provider: str,
|
||||
|
||||
@ -26,8 +26,8 @@ class WorkflowEvaluationRunner(BaseEvaluationRunner):
|
||||
|
||||
def evaluate_metrics(
|
||||
self,
|
||||
node_run_result_mapping: dict[str, NodeRunResult] | None,
|
||||
node_run_result: NodeRunResult | None,
|
||||
node_run_result_mapping_list: list[dict[str, NodeRunResult]] | None,
|
||||
node_run_result_list: list[NodeRunResult] | None,
|
||||
default_metric: DefaultMetric | None,
|
||||
customized_metrics: CustomizedMetrics | None,
|
||||
model_provider: str,
|
||||
|
||||
@ -27,6 +27,7 @@ from extensions.ext_database import db
|
||||
from libs.datetime_utils import naive_utc_now
|
||||
from models.evaluation import EvaluationRun, EvaluationRunStatus
|
||||
from models.model import UploadFile
|
||||
from services.evaluation_service import EvaluationService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@ -76,7 +77,20 @@ def _execute_evaluation(session: Any, run_data: EvaluationRunData) -> None:
|
||||
if evaluation_instance is None:
|
||||
raise ValueError("Evaluation framework not configured")
|
||||
|
||||
_execute_evaluation_runner(session, run_data, evaluation_instance, node_run_result_mapping)
|
||||
evaluation_service = EvaluationService()
|
||||
node_run_result_mapping_list: list[dict[str, NodeRunResult]] = evaluation_service.execute_targets(
|
||||
tenant_id=run_data.tenant_id,
|
||||
target_type=run_data.target_type,
|
||||
target_id=run_data.target_id,
|
||||
input_list=run_data.input_list,
|
||||
)
|
||||
|
||||
results: list[EvaluationItemResult] = _execute_evaluation_runner(
|
||||
session,
|
||||
run_data,
|
||||
evaluation_instance,
|
||||
node_run_result_mapping_list,
|
||||
)
|
||||
|
||||
|
||||
# Compute summary metrics
|
||||
@ -106,15 +120,19 @@ def _execute_evaluation_runner(
|
||||
session: Any,
|
||||
run_data: EvaluationRunData,
|
||||
evaluation_instance: BaseEvaluationInstance,
|
||||
node_run_result_mapping: dict[str, NodeRunResult],
|
||||
node_run_result_mapping_list: list[dict[str, NodeRunResult]],
|
||||
) -> list[EvaluationItemResult]:
|
||||
"""Execute the evaluation runner."""
|
||||
default_metrics = run_data.default_metrics
|
||||
customized_metrics = run_data.customized_metrics
|
||||
for default_metric in default_metrics:
|
||||
for node_info in default_metric.node_info_list:
|
||||
node_run_result = node_run_result_mapping.get(node_info.node_id)
|
||||
if node_run_result:
|
||||
node_run_result_list: list[NodeRunResult] = []
|
||||
for node_run_result_mapping in node_run_result_mapping_list:
|
||||
node_run_result = node_run_result_mapping.get(node_info.node_id)
|
||||
if node_run_result is not None:
|
||||
node_run_result_list.append(node_run_result)
|
||||
if node_run_result_list:
|
||||
runner = _create_runner(EvaluationCategory(node_info.type), evaluation_instance, session)
|
||||
runner.run(
|
||||
evaluation_run_id=run_data.evaluation_run_id,
|
||||
@ -125,10 +143,8 @@ def _execute_evaluation_runner(
|
||||
customized_metrics=None,
|
||||
model_provider=run_data.evaluation_model_provider,
|
||||
model_name=run_data.evaluation_model,
|
||||
node_run_result=node_run_result,
|
||||
node_run_result_list=node_run_result_list,
|
||||
)
|
||||
else:
|
||||
default_metric.score = 0
|
||||
if customized_metrics:
|
||||
runner = _create_runner(EvaluationCategory.WORKFLOW, evaluation_instance, session)
|
||||
runner.run(
|
||||
@ -138,8 +154,8 @@ def _execute_evaluation_runner(
|
||||
target_type=run_data.target_type,
|
||||
default_metric=None,
|
||||
customized_metrics=customized_metrics,
|
||||
node_run_result=None,
|
||||
node_run_result_mapping=node_run_result_mapping,
|
||||
node_run_result_list=None,
|
||||
node_run_result_mapping_list=node_run_result_mapping_list,
|
||||
)
|
||||
|
||||
def _create_runner(
|
||||
|
||||
Loading…
Reference in New Issue
Block a user