mirror of
https://github.com/langgenius/dify.git
synced 2026-06-23 04:11:09 +08:00
710 lines
30 KiB
Python
710 lines
30 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from datetime import datetime
|
|
from decimal import Decimal
|
|
from typing import Any
|
|
|
|
import sqlalchemy as sa
|
|
from sqlalchemy import and_, func, or_, select
|
|
from sqlalchemy.orm import aliased
|
|
|
|
from core.app.entities.app_invoke_entities import InvokeFrom
|
|
from libs.helper import convert_datetime_to_date, escape_like_pattern, to_timestamp
|
|
from models.agent import WorkflowAgentNodeBinding
|
|
from models.enums import MessageStatus
|
|
from models.model import App, Conversation, Message
|
|
from models.workflow import WorkflowNodeExecutionModel, WorkflowRun
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class AgentLogQueryParams:
|
|
page: int = 1
|
|
limit: int = 20
|
|
keyword: str | None = None
|
|
status: str | None = None
|
|
source: str | None = None
|
|
start: datetime | None = None
|
|
end: datetime | None = None
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class AgentStatisticsQueryParams:
|
|
source: str | None = None
|
|
start: datetime | None = None
|
|
end: datetime | None = None
|
|
timezone: str = "UTC"
|
|
|
|
|
|
@dataclass(frozen=True)
|
|
class AgentSourceFilter:
|
|
kind: str
|
|
app_id: str | None = None
|
|
workflow_id: str | None = None
|
|
workflow_version: str | None = None
|
|
node_id: str | None = None
|
|
invoke_from: InvokeFrom | None = None
|
|
|
|
|
|
class AgentObservabilityService:
|
|
_SOURCE_ALIASES: dict[str, InvokeFrom] = {
|
|
"api": InvokeFrom.SERVICE_API,
|
|
"service-api": InvokeFrom.SERVICE_API,
|
|
"service_api": InvokeFrom.SERVICE_API,
|
|
"console": InvokeFrom.EXPLORE,
|
|
"explore": InvokeFrom.EXPLORE,
|
|
"explore-app": InvokeFrom.EXPLORE,
|
|
"explore_app": InvokeFrom.EXPLORE,
|
|
"web": InvokeFrom.WEB_APP,
|
|
"web-app": InvokeFrom.WEB_APP,
|
|
"web_app": InvokeFrom.WEB_APP,
|
|
"debugger": InvokeFrom.DEBUGGER,
|
|
"dev": InvokeFrom.DEBUGGER,
|
|
"openapi": InvokeFrom.OPENAPI,
|
|
"trigger": InvokeFrom.TRIGGER,
|
|
}
|
|
|
|
def __init__(self, session: Any):
|
|
self._session = session
|
|
|
|
@classmethod
|
|
def resolve_source(cls, source: str | None) -> InvokeFrom | None:
|
|
if not source or source == "all":
|
|
return None
|
|
normalized = source.strip().lower()
|
|
if not normalized or normalized == "all":
|
|
return None
|
|
try:
|
|
return cls._SOURCE_ALIASES[normalized]
|
|
except KeyError as exc:
|
|
raise ValueError(f"Unsupported source: {source}") from exc
|
|
|
|
@classmethod
|
|
def resolve_source_filter(cls, source: str | None) -> AgentSourceFilter:
|
|
if not source or source.strip().lower() == "all":
|
|
return AgentSourceFilter(kind="all")
|
|
normalized = source.strip()
|
|
lowered = normalized.lower()
|
|
if lowered == "webapp":
|
|
return AgentSourceFilter(kind="webapp")
|
|
if lowered.startswith("webapp:"):
|
|
return AgentSourceFilter(kind="webapp", app_id=normalized.split(":", 1)[1] or None)
|
|
if lowered == "workflow":
|
|
return AgentSourceFilter(kind="workflow")
|
|
if lowered.startswith("workflow:"):
|
|
parts = normalized.split(":", 4)
|
|
if len(parts) != 5 or not all(parts[1:]):
|
|
raise ValueError(f"Unsupported source: {source}")
|
|
return AgentSourceFilter(
|
|
kind="workflow",
|
|
app_id=parts[1],
|
|
workflow_id=parts[2],
|
|
workflow_version=parts[3],
|
|
node_id=parts[4],
|
|
)
|
|
return AgentSourceFilter(kind="webapp", invoke_from=cls.resolve_source(source))
|
|
|
|
@staticmethod
|
|
def _message_status(message: Message) -> str:
|
|
if message.error or message.status == MessageStatus.ERROR:
|
|
return "failed"
|
|
if message.status == MessageStatus.PAUSED:
|
|
return "paused"
|
|
return "success"
|
|
|
|
@staticmethod
|
|
def _total_tokens(message: Message) -> int:
|
|
return int(message.message_tokens or 0) + int(message.answer_tokens or 0)
|
|
|
|
@classmethod
|
|
def serialize_log_message(cls, message: Message, conversation: Conversation | None = None) -> dict[str, Any]:
|
|
invoke_from = message.invoke_from.value if message.invoke_from else None
|
|
return {
|
|
"id": message.id,
|
|
"message_id": message.id,
|
|
"conversation_id": message.conversation_id,
|
|
"conversation_name": conversation.name if conversation else None,
|
|
"query": message.query,
|
|
"answer": message.answer,
|
|
"status": cls._message_status(message),
|
|
"error": message.error,
|
|
"source": invoke_from,
|
|
"from_source": message.from_source.value if message.from_source else None,
|
|
"from_end_user_id": message.from_end_user_id,
|
|
"from_account_id": message.from_account_id,
|
|
"message_tokens": int(message.message_tokens or 0),
|
|
"answer_tokens": int(message.answer_tokens or 0),
|
|
"total_tokens": cls._total_tokens(message),
|
|
"total_price": str(message.total_price or Decimal(0)),
|
|
"currency": message.currency,
|
|
"latency": float(message.provider_response_latency or 0),
|
|
"created_at": to_timestamp(message.created_at),
|
|
"updated_at": to_timestamp(message.updated_at),
|
|
}
|
|
|
|
def list_logs(self, *, app: App, agent_id: str, params: AgentLogQueryParams) -> dict[str, Any]:
|
|
source_filter = self.resolve_source_filter(params.source)
|
|
rows: list[dict[str, Any]] = []
|
|
if source_filter.kind in {"all", "webapp"}:
|
|
rows.extend(self._list_webapp_conversation_logs(app=app, params=params, source_filter=source_filter))
|
|
if source_filter.kind in {"all", "workflow"}:
|
|
rows.extend(
|
|
self._list_workflow_conversation_logs(
|
|
app=app,
|
|
agent_id=agent_id,
|
|
params=params,
|
|
source_filter=source_filter,
|
|
)
|
|
)
|
|
rows.sort(key=lambda row: (row["updated_at"] or 0, row["id"]), reverse=True)
|
|
|
|
total = len(rows)
|
|
start = (params.page - 1) * params.limit
|
|
end = start + params.limit
|
|
return {
|
|
"data": rows[start:end],
|
|
"page": params.page,
|
|
"limit": params.limit,
|
|
"total": total,
|
|
"has_more": end < total,
|
|
}
|
|
|
|
def list_log_messages(
|
|
self, *, app: App, agent_id: str, conversation_id: str, params: AgentLogQueryParams
|
|
) -> dict[str, Any]:
|
|
source_filter = self.resolve_source_filter(params.source)
|
|
rows: list[Message] = []
|
|
if source_filter.kind in {"all", "webapp"}:
|
|
rows.extend(
|
|
self._list_webapp_messages(
|
|
app=app,
|
|
conversation_id=conversation_id,
|
|
params=params,
|
|
source_filter=source_filter,
|
|
)
|
|
)
|
|
if source_filter.kind in {"all", "workflow"}:
|
|
rows.extend(
|
|
self._list_workflow_messages(
|
|
app=app,
|
|
agent_id=agent_id,
|
|
conversation_id=conversation_id,
|
|
params=params,
|
|
source_filter=source_filter,
|
|
)
|
|
)
|
|
|
|
deduped = {message.id: message for message in rows}
|
|
sorted_rows = sorted(deduped.values(), key=lambda message: (message.created_at, message.id), reverse=True)
|
|
total = len(sorted_rows)
|
|
start = (params.page - 1) * params.limit
|
|
end = start + params.limit
|
|
return {
|
|
"data": [self.serialize_log_message(message) for message in sorted_rows[start:end]],
|
|
"page": params.page,
|
|
"limit": params.limit,
|
|
"total": total,
|
|
"has_more": end < total,
|
|
}
|
|
|
|
def list_log_sources(self, *, app: App, agent_id: str) -> dict[str, Any]:
|
|
webapp_source = self._serialize_webapp_source(app)
|
|
workflow_sources = self._list_workflow_sources(app=app, agent_id=agent_id)
|
|
return {
|
|
"data": [webapp_source, *workflow_sources],
|
|
"groups": [
|
|
{"type": "webapp", "label": "WEBAPP", "sources": [webapp_source]},
|
|
{"type": "workflow", "label": "WORKFLOW", "sources": workflow_sources},
|
|
],
|
|
}
|
|
|
|
def _list_webapp_conversation_logs(
|
|
self, *, app: App, params: AgentLogQueryParams, source_filter: AgentSourceFilter
|
|
) -> list[dict[str, Any]]:
|
|
stmt = (
|
|
select(
|
|
Conversation,
|
|
func.count(Message.id).label("message_count"),
|
|
func.max(Message.created_at).label("created_at"),
|
|
func.max(Message.updated_at).label("updated_at"),
|
|
func.sum(sa.case((Message.status == MessageStatus.PAUSED, 1), else_=0)).label("paused_count"),
|
|
func.sum(
|
|
sa.case((or_(Message.error.is_not(None), Message.status == MessageStatus.ERROR), 1), else_=0)
|
|
).label("failed_count"),
|
|
)
|
|
.join(Message, Message.conversation_id == Conversation.id)
|
|
.where(Message.app_id == app.id, Conversation.app_id == app.id)
|
|
.group_by(Conversation.id)
|
|
)
|
|
stmt = self._apply_observability_filters(stmt, params=params, source_filter=source_filter)
|
|
rows = list(self._session.execute(stmt).all())
|
|
return [
|
|
self._serialize_conversation_log(
|
|
conversation=row[0],
|
|
message_count=row.message_count,
|
|
paused_count=row.paused_count,
|
|
failed_count=row.failed_count,
|
|
source=self._serialize_webapp_source(app),
|
|
created_at=row.created_at,
|
|
updated_at=row.updated_at,
|
|
)
|
|
for row in rows
|
|
]
|
|
|
|
def _list_workflow_conversation_logs(
|
|
self, *, app: App, agent_id: str, params: AgentLogQueryParams, source_filter: AgentSourceFilter
|
|
) -> list[dict[str, Any]]:
|
|
workflow_app = aliased(App)
|
|
stmt = (
|
|
select(
|
|
Conversation,
|
|
workflow_app,
|
|
WorkflowAgentNodeBinding.workflow_id,
|
|
WorkflowAgentNodeBinding.workflow_version,
|
|
WorkflowAgentNodeBinding.node_id,
|
|
func.count(sa.distinct(Message.id)).label("message_count"),
|
|
func.max(Message.created_at).label("created_at"),
|
|
func.max(Message.updated_at).label("updated_at"),
|
|
func.sum(sa.case((Message.status == MessageStatus.PAUSED, 1), else_=0)).label("paused_count"),
|
|
func.sum(
|
|
sa.case((or_(Message.error.is_not(None), Message.status == MessageStatus.ERROR), 1), else_=0)
|
|
).label("failed_count"),
|
|
)
|
|
.select_from(Message)
|
|
.join(Conversation, Conversation.id == Message.conversation_id)
|
|
.join(WorkflowRun, WorkflowRun.id == Message.workflow_run_id)
|
|
.join(
|
|
WorkflowAgentNodeBinding,
|
|
and_(
|
|
WorkflowAgentNodeBinding.tenant_id == app.tenant_id,
|
|
WorkflowAgentNodeBinding.agent_id == agent_id,
|
|
WorkflowAgentNodeBinding.app_id == WorkflowRun.app_id,
|
|
WorkflowAgentNodeBinding.workflow_id == WorkflowRun.workflow_id,
|
|
WorkflowAgentNodeBinding.workflow_version == WorkflowRun.version,
|
|
),
|
|
)
|
|
.join(
|
|
WorkflowNodeExecutionModel,
|
|
and_(
|
|
WorkflowNodeExecutionModel.workflow_run_id == WorkflowRun.id,
|
|
WorkflowNodeExecutionModel.node_id == WorkflowAgentNodeBinding.node_id,
|
|
),
|
|
)
|
|
.join(workflow_app, workflow_app.id == WorkflowAgentNodeBinding.app_id)
|
|
.where(Message.workflow_run_id.is_not(None), Conversation.app_id == WorkflowAgentNodeBinding.app_id)
|
|
.group_by(
|
|
Conversation.id,
|
|
workflow_app.id,
|
|
WorkflowAgentNodeBinding.workflow_id,
|
|
WorkflowAgentNodeBinding.workflow_version,
|
|
WorkflowAgentNodeBinding.node_id,
|
|
)
|
|
)
|
|
stmt = self._apply_observability_filters(stmt, params=params, source_filter=source_filter)
|
|
stmt = self._apply_workflow_source_filter(stmt, source_filter)
|
|
rows = list(self._session.execute(stmt).all())
|
|
return [
|
|
self._serialize_conversation_log(
|
|
conversation=row[0],
|
|
message_count=row.message_count,
|
|
paused_count=row.paused_count,
|
|
failed_count=row.failed_count,
|
|
source=self._serialize_workflow_source(
|
|
app=row[1],
|
|
workflow_id=row.workflow_id,
|
|
workflow_version=row.workflow_version,
|
|
node_id=row.node_id,
|
|
),
|
|
created_at=row.created_at,
|
|
updated_at=row.updated_at,
|
|
)
|
|
for row in rows
|
|
]
|
|
|
|
def _list_webapp_messages(
|
|
self, *, app: App, conversation_id: str, params: AgentLogQueryParams, source_filter: AgentSourceFilter
|
|
) -> list[Message]:
|
|
stmt = select(Message).where(Message.app_id == app.id, Message.conversation_id == conversation_id)
|
|
stmt = self._apply_message_filters(stmt, params=params, source_filter=source_filter)
|
|
return list(self._session.scalars(stmt.order_by(Message.created_at.desc(), Message.id.desc())).all())
|
|
|
|
def _list_workflow_messages(
|
|
self,
|
|
*,
|
|
app: App,
|
|
agent_id: str,
|
|
conversation_id: str,
|
|
params: AgentLogQueryParams,
|
|
source_filter: AgentSourceFilter,
|
|
) -> list[Message]:
|
|
stmt = (
|
|
select(Message)
|
|
.join(WorkflowRun, WorkflowRun.id == Message.workflow_run_id)
|
|
.join(
|
|
WorkflowAgentNodeBinding,
|
|
and_(
|
|
WorkflowAgentNodeBinding.tenant_id == app.tenant_id,
|
|
WorkflowAgentNodeBinding.agent_id == agent_id,
|
|
WorkflowAgentNodeBinding.app_id == WorkflowRun.app_id,
|
|
WorkflowAgentNodeBinding.workflow_id == WorkflowRun.workflow_id,
|
|
WorkflowAgentNodeBinding.workflow_version == WorkflowRun.version,
|
|
),
|
|
)
|
|
.join(
|
|
WorkflowNodeExecutionModel,
|
|
and_(
|
|
WorkflowNodeExecutionModel.workflow_run_id == WorkflowRun.id,
|
|
WorkflowNodeExecutionModel.node_id == WorkflowAgentNodeBinding.node_id,
|
|
),
|
|
)
|
|
.where(Message.conversation_id == conversation_id)
|
|
)
|
|
stmt = self._apply_message_filters(stmt, params=params, source_filter=source_filter)
|
|
stmt = self._apply_workflow_source_filter(stmt, source_filter)
|
|
return list(self._session.scalars(stmt.order_by(Message.created_at.desc(), Message.id.desc())).all())
|
|
|
|
def _list_workflow_sources(self, *, app: App, agent_id: str) -> list[dict[str, Any]]:
|
|
workflow_app = aliased(App)
|
|
stmt = (
|
|
select(
|
|
workflow_app,
|
|
WorkflowAgentNodeBinding.workflow_id,
|
|
WorkflowAgentNodeBinding.workflow_version,
|
|
WorkflowAgentNodeBinding.node_id,
|
|
)
|
|
.join(workflow_app, workflow_app.id == WorkflowAgentNodeBinding.app_id)
|
|
.where(WorkflowAgentNodeBinding.tenant_id == app.tenant_id, WorkflowAgentNodeBinding.agent_id == agent_id)
|
|
.order_by(workflow_app.name.asc(), WorkflowAgentNodeBinding.node_id.asc())
|
|
)
|
|
rows = self._session.execute(stmt).all()
|
|
deduped: dict[str, dict[str, Any]] = {}
|
|
for row in rows:
|
|
source = self._serialize_workflow_source(
|
|
app=row[0],
|
|
workflow_id=row.workflow_id,
|
|
workflow_version=row.workflow_version,
|
|
node_id=row.node_id,
|
|
)
|
|
deduped[source["id"]] = source
|
|
return list(deduped.values())
|
|
|
|
@classmethod
|
|
def _apply_observability_filters(cls, stmt, *, params: AgentLogQueryParams, source_filter: AgentSourceFilter):
|
|
stmt = cls._apply_message_filters(stmt, params=params, source_filter=source_filter, include_keyword=False)
|
|
if params.keyword:
|
|
escaped_keyword = escape_like_pattern(params.keyword)
|
|
pattern = f"%{escaped_keyword}%"
|
|
stmt = stmt.where(
|
|
or_(
|
|
Message.query.ilike(pattern, escape="\\"),
|
|
Message.answer.ilike(pattern, escape="\\"),
|
|
Conversation.name.ilike(pattern, escape="\\"),
|
|
)
|
|
)
|
|
return stmt
|
|
|
|
@classmethod
|
|
def _apply_message_filters(
|
|
cls, stmt, *, params: AgentLogQueryParams, source_filter: AgentSourceFilter, include_keyword: bool = True
|
|
):
|
|
stmt = cls._apply_source_filter(stmt, source_filter.invoke_from)
|
|
if params.start:
|
|
stmt = stmt.where(Message.created_at >= params.start)
|
|
if params.end:
|
|
stmt = stmt.where(Message.created_at < params.end)
|
|
if include_keyword and params.keyword:
|
|
escaped_keyword = escape_like_pattern(params.keyword)
|
|
pattern = f"%{escaped_keyword}%"
|
|
stmt = stmt.where(
|
|
or_(
|
|
Message.query.ilike(pattern, escape="\\"),
|
|
Message.answer.ilike(pattern, escape="\\"),
|
|
)
|
|
)
|
|
if params.status:
|
|
stmt = cls._apply_status_filter(stmt, params.status)
|
|
return stmt
|
|
|
|
@staticmethod
|
|
def _apply_workflow_source_filter(stmt, source_filter: AgentSourceFilter):
|
|
if source_filter.app_id:
|
|
stmt = stmt.where(WorkflowAgentNodeBinding.app_id == source_filter.app_id)
|
|
if source_filter.workflow_id:
|
|
stmt = stmt.where(WorkflowAgentNodeBinding.workflow_id == source_filter.workflow_id)
|
|
if source_filter.workflow_version:
|
|
stmt = stmt.where(WorkflowAgentNodeBinding.workflow_version == source_filter.workflow_version)
|
|
if source_filter.node_id:
|
|
stmt = stmt.where(WorkflowAgentNodeBinding.node_id == source_filter.node_id)
|
|
return stmt
|
|
|
|
@classmethod
|
|
def _apply_source_filter(cls, stmt, source: InvokeFrom | None):
|
|
if source is None:
|
|
return stmt.where(Message.invoke_from != InvokeFrom.DEBUGGER)
|
|
return stmt.where(Message.invoke_from == source)
|
|
|
|
@staticmethod
|
|
def _apply_status_filter(stmt, status: str):
|
|
normalized = status.strip().lower()
|
|
if normalized in {"success", "normal"}:
|
|
return stmt.where(Message.error.is_(None), Message.status == MessageStatus.NORMAL)
|
|
if normalized in {"failed", "error"}:
|
|
return stmt.where(or_(Message.error.is_not(None), Message.status == MessageStatus.ERROR))
|
|
if normalized == "paused":
|
|
return stmt.where(Message.status == MessageStatus.PAUSED)
|
|
raise ValueError(f"Unsupported status: {status}")
|
|
|
|
@classmethod
|
|
def _serialize_conversation_log(
|
|
cls,
|
|
*,
|
|
conversation: Conversation,
|
|
message_count: int,
|
|
paused_count: int,
|
|
failed_count: int,
|
|
source: dict[str, Any],
|
|
created_at: datetime | None,
|
|
updated_at: datetime | None,
|
|
) -> dict[str, Any]:
|
|
return {
|
|
"id": conversation.id,
|
|
"conversation_id": conversation.id,
|
|
"title": conversation.name,
|
|
"end_user_id": conversation.from_end_user_id,
|
|
"message_count": int(message_count or 0),
|
|
"user_rate": None,
|
|
"operation_rate": None,
|
|
"unread": conversation.read_at is None,
|
|
"source": source,
|
|
"status": cls._conversation_status(paused_count=paused_count, failed_count=failed_count),
|
|
"created_at": to_timestamp(created_at or conversation.created_at),
|
|
"updated_at": to_timestamp(updated_at or conversation.updated_at),
|
|
}
|
|
|
|
@staticmethod
|
|
def _conversation_status(*, paused_count: int, failed_count: int) -> str:
|
|
if paused_count:
|
|
return "paused"
|
|
if failed_count:
|
|
return "failed"
|
|
return "success"
|
|
|
|
@staticmethod
|
|
def _serialize_webapp_source(app: App) -> dict[str, Any]:
|
|
icon_type = app.icon_type.value if app.icon_type else None
|
|
return {
|
|
"id": f"webapp:{app.id}",
|
|
"type": "webapp",
|
|
"app_id": app.id,
|
|
"app_name": app.name,
|
|
"app_icon_type": icon_type,
|
|
"app_icon": app.icon,
|
|
"app_icon_background": app.icon_background,
|
|
"workflow_id": None,
|
|
"workflow_version": None,
|
|
"node_id": None,
|
|
}
|
|
|
|
@staticmethod
|
|
def _serialize_workflow_source(
|
|
*,
|
|
app: App,
|
|
workflow_id: str,
|
|
workflow_version: str,
|
|
node_id: str,
|
|
) -> dict[str, Any]:
|
|
icon_type = app.icon_type.value if app.icon_type else None
|
|
return {
|
|
"id": f"workflow:{app.id}:{workflow_id}:{workflow_version}:{node_id}",
|
|
"type": "workflow",
|
|
"app_id": app.id,
|
|
"app_name": app.name,
|
|
"app_icon_type": icon_type,
|
|
"app_icon": app.icon,
|
|
"app_icon_background": app.icon_background,
|
|
"workflow_id": workflow_id,
|
|
"workflow_version": workflow_version,
|
|
"node_id": node_id,
|
|
}
|
|
|
|
def get_statistics_summary(self, *, app: App, agent_id: str, params: AgentStatisticsQueryParams) -> dict[str, Any]:
|
|
source_filter = self.resolve_source_filter(params.source)
|
|
rows = self._load_daily_statistics(app=app, agent_id=agent_id, params=params, source_filter=source_filter)
|
|
charts = self._build_charts(rows)
|
|
summary = self._build_summary(rows)
|
|
return {
|
|
"source": params.source or "all",
|
|
"summary": summary,
|
|
"charts": charts,
|
|
}
|
|
|
|
def _load_daily_statistics(
|
|
self, *, app: App, agent_id: str, params: AgentStatisticsQueryParams, source_filter: AgentSourceFilter
|
|
) -> list[dict[str, Any]]:
|
|
converted_created_at = convert_datetime_to_date("m.created_at")
|
|
message_scope = self._statistics_message_scope_sql(source_filter)
|
|
sql_query = f"""SELECT
|
|
{converted_created_at} AS date,
|
|
COUNT(m.id) AS message_count,
|
|
COUNT(DISTINCT m.conversation_id) AS conversation_count,
|
|
COUNT(DISTINCT m.from_end_user_id) AS end_user_count,
|
|
COALESCE(SUM(COALESCE(m.message_tokens, 0) + COALESCE(m.answer_tokens, 0)), 0) AS token_count,
|
|
COALESCE(SUM(COALESCE(m.total_price, 0)), 0) AS total_price,
|
|
COALESCE(AVG(m.provider_response_latency), 0) AS avg_latency,
|
|
COALESCE(SUM(m.provider_response_latency), 0) AS latency_sum,
|
|
COALESCE(SUM(m.answer_tokens), 0) AS answer_tokens,
|
|
COUNT(mf.id) AS like_count
|
|
FROM messages m
|
|
LEFT JOIN message_feedbacks mf
|
|
ON mf.message_id = m.id AND mf.rating = 'like'
|
|
WHERE
|
|
{message_scope}"""
|
|
args: dict[str, Any] = {
|
|
"tz": params.timezone,
|
|
"app_id": app.id,
|
|
"tenant_id": app.tenant_id,
|
|
"agent_id": agent_id,
|
|
"debugger": InvokeFrom.DEBUGGER,
|
|
}
|
|
if source_filter.invoke_from is not None:
|
|
args["source"] = source_filter.invoke_from
|
|
if source_filter.app_id:
|
|
args["source_app_id"] = source_filter.app_id
|
|
if source_filter.workflow_id:
|
|
args["workflow_id"] = source_filter.workflow_id
|
|
if source_filter.workflow_version:
|
|
args["workflow_version"] = source_filter.workflow_version
|
|
if source_filter.node_id:
|
|
args["node_id"] = source_filter.node_id
|
|
if params.start:
|
|
sql_query += " AND m.created_at >= :start"
|
|
args["start"] = params.start
|
|
if params.end:
|
|
sql_query += " AND m.created_at < :end"
|
|
args["end"] = params.end
|
|
sql_query += " GROUP BY date ORDER BY date"
|
|
|
|
return [dict(row._mapping) for row in self._session.execute(sa.text(sql_query), args).all()]
|
|
|
|
@staticmethod
|
|
def _statistics_message_scope_sql(source_filter: AgentSourceFilter) -> str:
|
|
app_scope = "m.app_id = :app_id"
|
|
if source_filter.invoke_from is None:
|
|
app_scope += " AND m.invoke_from != :debugger"
|
|
else:
|
|
app_scope += " AND m.invoke_from = :source"
|
|
workflow_binding_filters = []
|
|
if source_filter.app_id:
|
|
workflow_binding_filters.append("wanb.app_id = :source_app_id")
|
|
if source_filter.workflow_id:
|
|
workflow_binding_filters.append("wanb.workflow_id = :workflow_id")
|
|
if source_filter.workflow_version:
|
|
workflow_binding_filters.append("wanb.workflow_version = :workflow_version")
|
|
if source_filter.node_id:
|
|
workflow_binding_filters.append("wanb.node_id = :node_id")
|
|
extra_workflow_filters = f"AND {' AND '.join(workflow_binding_filters)}" if workflow_binding_filters else ""
|
|
workflow_scope = f"""m.workflow_run_id IS NOT NULL
|
|
AND EXISTS (
|
|
SELECT 1
|
|
FROM workflow_runs wr
|
|
JOIN workflow_agent_node_bindings wanb
|
|
ON wanb.tenant_id = :tenant_id
|
|
AND wanb.agent_id = :agent_id
|
|
AND wanb.app_id = wr.app_id
|
|
AND wanb.workflow_id = wr.workflow_id
|
|
AND wanb.workflow_version = wr.version
|
|
{extra_workflow_filters}
|
|
JOIN workflow_node_executions wne
|
|
ON wne.workflow_run_id = wr.id
|
|
AND wne.node_id = wanb.node_id
|
|
WHERE wr.id = m.workflow_run_id
|
|
)"""
|
|
if source_filter.kind == "webapp":
|
|
return app_scope
|
|
if source_filter.kind == "workflow":
|
|
return workflow_scope
|
|
return f"(({app_scope}) OR ({workflow_scope}))"
|
|
|
|
@staticmethod
|
|
def _build_charts(rows: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
|
|
messages = []
|
|
conversations = []
|
|
end_users = []
|
|
token_usage = []
|
|
average_session_interactions = []
|
|
average_response_time = []
|
|
tokens_per_second = []
|
|
user_satisfaction_rate = []
|
|
|
|
for row in rows:
|
|
date = str(row["date"])
|
|
message_count = int(row["message_count"] or 0)
|
|
conversation_count = int(row["conversation_count"] or 0)
|
|
token_count = int(row["token_count"] or 0)
|
|
total_price = row["total_price"] or Decimal(0)
|
|
avg_latency = float(row["avg_latency"] or 0)
|
|
latency_sum = float(row["latency_sum"] or 0)
|
|
answer_tokens = int(row["answer_tokens"] or 0)
|
|
like_count = int(row["like_count"] or 0)
|
|
|
|
messages.append({"date": date, "message_count": message_count})
|
|
conversations.append({"date": date, "conversation_count": conversation_count})
|
|
end_users.append({"date": date, "terminal_count": int(row["end_user_count"] or 0)})
|
|
token_usage.append(
|
|
{
|
|
"date": date,
|
|
"token_count": token_count,
|
|
"total_price": str(total_price),
|
|
"currency": "USD",
|
|
}
|
|
)
|
|
average_session_interactions.append(
|
|
{
|
|
"date": date,
|
|
"interactions": round(message_count / conversation_count, 2) if conversation_count else 0,
|
|
}
|
|
)
|
|
average_response_time.append({"date": date, "latency": round(avg_latency * 1000, 4)})
|
|
tokens_per_second.append({"date": date, "tps": round(answer_tokens / latency_sum, 4) if latency_sum else 0})
|
|
user_satisfaction_rate.append(
|
|
{"date": date, "rate": round(like_count * 100 / message_count, 2) if message_count else 0}
|
|
)
|
|
|
|
return {
|
|
"daily_messages": messages,
|
|
"daily_conversations": conversations,
|
|
"daily_end_users": end_users,
|
|
"token_usage": token_usage,
|
|
"average_session_interactions": average_session_interactions,
|
|
"average_response_time": average_response_time,
|
|
"tokens_per_second": tokens_per_second,
|
|
"user_satisfaction_rate": user_satisfaction_rate,
|
|
}
|
|
|
|
@staticmethod
|
|
def _build_summary(rows: list[dict[str, Any]]) -> dict[str, Any]:
|
|
total_messages = sum(int(row["message_count"] or 0) for row in rows)
|
|
total_conversations = sum(int(row["conversation_count"] or 0) for row in rows)
|
|
total_end_users = sum(int(row["end_user_count"] or 0) for row in rows)
|
|
total_tokens = sum(int(row["token_count"] or 0) for row in rows)
|
|
total_price = sum(Decimal(str(row["total_price"] or 0)) for row in rows)
|
|
total_answer_tokens = sum(int(row["answer_tokens"] or 0) for row in rows)
|
|
total_latency = sum(float(row["latency_sum"] or 0) for row in rows)
|
|
weighted_latency = sum(float(row["avg_latency"] or 0) * int(row["message_count"] or 0) for row in rows)
|
|
total_likes = sum(int(row["like_count"] or 0) for row in rows)
|
|
|
|
return {
|
|
"total_messages": total_messages,
|
|
"total_conversations": total_conversations,
|
|
"total_end_users": total_end_users,
|
|
"total_tokens": total_tokens,
|
|
"total_price": str(total_price),
|
|
"currency": "USD",
|
|
"average_session_interactions": round(total_messages / total_conversations, 2)
|
|
if total_conversations
|
|
else 0,
|
|
"average_response_time": round((weighted_latency / total_messages) * 1000, 4) if total_messages else 0,
|
|
"tokens_per_second": round(total_answer_tokens / total_latency, 4) if total_latency else 0,
|
|
"user_satisfaction_rate": round(total_likes * 100 / total_messages, 2) if total_messages else 0,
|
|
}
|