feat: implement querying node infos related to metrics

This commit is contained in:
FFXN 2026-03-17 14:07:23 +08:00
parent c20be9c815
commit df78acd169
3 changed files with 137 additions and 0 deletions

View File

@ -483,6 +483,48 @@ class EvaluationMetricsApi(Resource):
return {"metrics": result}
@console_ns.route("/<string:evaluate_target_type>/<uuid:evaluate_target_id>/evaluation/node-info")
class EvaluationNodeInfoApi(Resource):
    @console_ns.doc("get_evaluation_node_info")
    @console_ns.response(200, "Node info grouped by metric")
    @console_ns.response(404, "Target not found")
    @setup_required
    @login_required
    @account_initialization_required
    @get_evaluation_target
    def post(self, target: Union[App, CustomizedSnippet], target_type: str):
        """Group workflow/snippet node info by the metrics named in the request.

        Request body (JSON):
            - metrics: optional list of metric names; when omitted, or when an
              empty list is sent, every node is returned under the key ``"all"``.

        Response:
            ``{metric_or_all: [{"node_id": ..., "type": ..., "title": ...}, ...]}``
        """
        payload = request.get_json(silent=True) or {}
        # An absent key and an empty list are both normalized to None so the
        # service layer sees a single "no filter" signal.
        requested: list[str] | None = payload.get("metrics") or None
        return EvaluationService.get_nodes_for_metrics(
            target=target,
            target_type=target_type,
            metrics=requested,
        )
@console_ns.route("/evaluation/available-metrics")
class EvaluationAvailableMetricsApi(Resource):
    @console_ns.doc("get_available_evaluation_metrics")
    @console_ns.response(200, "Available metrics list")
    @setup_required
    @login_required
    @account_initialization_required
    def get(self):
        """List every evaluation metric the service defines centrally."""
        metrics = EvaluationService.get_available_metrics()
        return {"metrics": metrics}
@console_ns.route("/<string:evaluate_target_type>/<uuid:evaluate_target_id>/evaluation/files/<uuid:file_id>")
class EvaluationFileDownloadApi(Resource):
@console_ns.doc("download_evaluation_file")

View File

@ -15,6 +15,29 @@ class EvaluationCategory(StrEnum):
RETRIEVAL_TEST = "retrieval_test"
# ---- Default Metrics & Node-Type Mapping ----
# Single source of truth: each metric maps to the workflow node type it
# evaluates.  The public metric list is derived from the mapping's keys so the
# two definitions can never drift apart.  Dicts preserve insertion order, so
# the order below is exactly the order exposed to clients.
METRIC_NODE_TYPE_MAPPING: dict[str, str] = {
    "Groundedness": "llm",
    "Correctness": "llm",
    "Context Precision": "knowledge-retrieval",
    "Context Recall": "knowledge-retrieval",
    "Tool Correctness": "agent",
    "Task Completion": "agent",
    "Argument Correctness": "agent",
}

# Names of all supported evaluation metrics, in display order.
EVALUATION_METRICS: list[str] = list(METRIC_NODE_TYPE_MAPPING)
class EvaluationMetric(BaseModel):
name: str
value: Any

View File

@ -11,12 +11,15 @@ from sqlalchemy.orm import Session
from configs import dify_config
from core.evaluation.entities.evaluation_entity import (
EVALUATION_METRICS,
METRIC_NODE_TYPE_MAPPING,
DefaultMetric,
EvaluationCategory,
EvaluationConfigData,
EvaluationDatasetInput,
EvaluationRunData,
EvaluationRunRequest,
NodeInfo,
)
from core.evaluation.evaluation_manager import EvaluationManager
from core.workflow.enums import WorkflowNodeExecutionMetadataKey
@ -423,6 +426,75 @@ class EvaluationService:
def get_supported_metrics(cls, category: EvaluationCategory) -> list[str]:
return EvaluationManager.get_supported_metrics(category)
@staticmethod
def get_available_metrics() -> list[str]:
    """Expose the centrally-defined metric names as a fresh, caller-owned list."""
    # Copy so callers can mutate their result without touching the module constant.
    return [*EVALUATION_METRICS]
@classmethod
def get_nodes_for_metrics(
    cls,
    target: Union[App, CustomizedSnippet],
    target_type: str,
    metrics: list[str] | None = None,
) -> dict[str, list[dict[str, str]]]:
    """Return node info grouped by metric (or all nodes when *metrics* is empty).

    :param target: App or CustomizedSnippet instance.
    :param target_type: ``"app"`` or ``"snippets"``.
    :param metrics: Optional list of metric names to filter by.
        When *None* or empty, returns ``{"all": [<every node>]}``.
    :returns: ``{metric_name: [NodeInfo dict, ...]}`` or
        ``{"all": [NodeInfo dict, ...]}``.
    """
    workflow = cls._resolve_workflow(target, target_type)
    if not workflow:
        # No workflow resolved: keep the response shape, with empty lists.
        if metrics:
            return {name: [] for name in metrics}
        return {"all": []}

    def _info(node_id, node_data) -> dict[str, str]:
        # Normalize one walked node into the serialized NodeInfo shape.
        return NodeInfo(
            node_id=node_id,
            type=node_data.get("type", ""),
            title=node_data.get("title", ""),
        ).model_dump()

    if not metrics:
        return {"all": [_info(nid, data) for nid, data in workflow.walk_nodes()]}

    # Bucket every node by its type once, then answer each metric from the buckets.
    by_type: dict[str, list[dict[str, str]]] = {}
    for nid, data in workflow.walk_nodes():
        by_type.setdefault(data.get("type", ""), []).append(_info(nid, data))

    grouped: dict[str, list[dict[str, str]]] = {}
    for metric in metrics:
        node_type = METRIC_NODE_TYPE_MAPPING.get(metric)
        # Unknown metrics get an empty list; the None check keeps them from
        # colliding with nodes whose "type" field happens to be "".
        grouped[metric] = [] if node_type is None else by_type.get(node_type, [])
    return grouped
@classmethod
def _resolve_workflow(
    cls,
    target: Union[App, CustomizedSnippet],
    target_type: str,
) -> "Workflow | None":
    """Resolve the target's workflow, preferring published over draft."""
    if target_type == "snippets" and isinstance(target, CustomizedSnippet):
        service = SnippetService()
        # Fall back to the draft only when nothing is published.
        return (
            service.get_published_workflow(snippet=target)
            or service.get_draft_workflow(snippet=target)
        )
    if target_type == "app" and isinstance(target, App):
        service = WorkflowService()
        return (
            service.get_published_workflow(app_model=target)
            or service.get_draft_workflow(app_model=target)
        )
    # Unknown target type or a type/instance mismatch: nothing to resolve.
    return None
# ---- Category Resolution ----
@classmethod