mirror of
https://github.com/langgenius/dify.git
synced 2026-06-22 19:21:13 +08:00
fix(agent): align roster observability logs contract (#37578)
Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
parent
baf775134e
commit
59f8f2e7b3
@ -31,6 +31,8 @@ from fields.agent_fields import (
|
||||
AgentConfigSnapshotListResponse,
|
||||
AgentInviteOptionsResponse,
|
||||
AgentLogListResponse,
|
||||
AgentLogMessageListResponse,
|
||||
AgentLogSourceListResponse,
|
||||
AgentPublishedReferenceResponse,
|
||||
AgentRosterListResponse,
|
||||
AgentStatisticSummaryEnvelopeResponse,
|
||||
@ -153,6 +155,8 @@ register_response_schema_models(
|
||||
AgentConfigSnapshotListResponse,
|
||||
AgentInviteOptionsResponse,
|
||||
AgentLogListResponse,
|
||||
AgentLogMessageListResponse,
|
||||
AgentLogSourceListResponse,
|
||||
AgentPublishedReferenceResponse,
|
||||
AgentRosterListResponse,
|
||||
AgentStatisticSummaryEnvelopeResponse,
|
||||
@ -416,6 +420,7 @@ class AgentLogsApi(Resource):
|
||||
try:
|
||||
payload = _agent_observability_service().list_logs(
|
||||
app=app_model,
|
||||
agent_id=str(agent_id),
|
||||
params=AgentLogQueryParams(
|
||||
page=query.page,
|
||||
limit=query.limit,
|
||||
@ -431,6 +436,53 @@ class AgentLogsApi(Resource):
|
||||
return dump_response(AgentLogListResponse, payload)
|
||||
|
||||
|
||||
@console_ns.route("/agent/<uuid:agent_id>/logs/<uuid:conversation_id>/messages")
|
||||
class AgentLogMessagesApi(Resource):
|
||||
@console_ns.doc(params=query_params_from_model(AgentLogsQuery))
|
||||
@console_ns.response(200, "Agent log messages", console_ns.models[AgentLogMessageListResponse.__name__])
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
@with_current_user
|
||||
@with_current_tenant_id
|
||||
def get(self, tenant_id: str, current_user: Account, agent_id: UUID, conversation_id: UUID):
|
||||
app_model = _resolve_agent_app_model(tenant_id=tenant_id, agent_id=agent_id)
|
||||
query = AgentLogsQuery.model_validate(request.args.to_dict(flat=True))
|
||||
start, end = _parse_observability_time_range(query.start, query.end, current_user)
|
||||
try:
|
||||
payload = _agent_observability_service().list_log_messages(
|
||||
app=app_model,
|
||||
agent_id=str(agent_id),
|
||||
conversation_id=str(conversation_id),
|
||||
params=AgentLogQueryParams(
|
||||
page=query.page,
|
||||
limit=query.limit,
|
||||
keyword=query.keyword,
|
||||
status=query.status,
|
||||
source=query.source,
|
||||
start=start,
|
||||
end=end,
|
||||
),
|
||||
)
|
||||
except ValueError as exc:
|
||||
abort(400, description=str(exc))
|
||||
return dump_response(AgentLogMessageListResponse, payload)
|
||||
|
||||
|
||||
@console_ns.route("/agent/<uuid:agent_id>/log-sources")
|
||||
class AgentLogSourcesApi(Resource):
|
||||
@console_ns.response(200, "Agent log sources", console_ns.models[AgentLogSourceListResponse.__name__])
|
||||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
@with_current_user
|
||||
@with_current_tenant_id
|
||||
def get(self, tenant_id: str, current_user: Account, agent_id: UUID):
|
||||
app_model = _resolve_agent_app_model(tenant_id=tenant_id, agent_id=agent_id)
|
||||
payload = _agent_observability_service().list_log_sources(app=app_model, agent_id=str(agent_id))
|
||||
return dump_response(AgentLogSourceListResponse, payload)
|
||||
|
||||
|
||||
@console_ns.route("/agent/<uuid:agent_id>/statistics/summary")
|
||||
class AgentStatisticsSummaryApi(Resource):
|
||||
@console_ns.doc(params=query_params_from_model(AgentStatisticsQuery))
|
||||
@ -452,6 +504,7 @@ class AgentStatisticsSummaryApi(Resource):
|
||||
try:
|
||||
payload = _agent_observability_service().get_statistics_summary(
|
||||
app=app_model,
|
||||
agent_id=str(agent_id),
|
||||
params=AgentStatisticsQueryParams(source=query.source, start=start, end=end, timezone=timezone),
|
||||
)
|
||||
except ValueError as exc:
|
||||
|
||||
@ -107,17 +107,58 @@ class AgentInviteOptionsResponse(ResponseModel):
|
||||
has_more: bool
|
||||
|
||||
|
||||
class AgentLogItemResponse(ResponseModel):
|
||||
class AgentLogSourceResponse(ResponseModel):
|
||||
id: str
|
||||
type: Literal["webapp", "workflow"]
|
||||
app_id: str
|
||||
app_name: str
|
||||
app_icon_type: str | None = None
|
||||
app_icon: str | None = None
|
||||
app_icon_background: str | None = None
|
||||
workflow_id: str | None = None
|
||||
workflow_version: str | None = None
|
||||
node_id: str | None = None
|
||||
|
||||
|
||||
class AgentLogSourceGroupResponse(ResponseModel):
|
||||
type: Literal["webapp", "workflow"]
|
||||
label: str
|
||||
sources: list[AgentLogSourceResponse] = Field(default_factory=list)
|
||||
|
||||
|
||||
class AgentLogSourceListResponse(ResponseModel):
|
||||
data: list[AgentLogSourceResponse]
|
||||
groups: list[AgentLogSourceGroupResponse]
|
||||
|
||||
|
||||
class AgentLogConversationItemResponse(ResponseModel):
|
||||
id: str
|
||||
conversation_id: str
|
||||
title: str | None = None
|
||||
end_user_id: str | None = None
|
||||
message_count: int
|
||||
user_rate: float | None = None
|
||||
operation_rate: float | None = None
|
||||
unread: bool
|
||||
source: AgentLogSourceResponse | None = None
|
||||
status: Literal["success", "failed", "paused"]
|
||||
created_at: int | None = None
|
||||
updated_at: int | None = None
|
||||
|
||||
@field_validator("created_at", "updated_at", mode="before")
|
||||
@classmethod
|
||||
def _normalize_timestamp(cls, value: datetime | int | None) -> int | None:
|
||||
return to_timestamp(value)
|
||||
|
||||
|
||||
class AgentLogMessageItemResponse(ResponseModel):
|
||||
id: str
|
||||
message_id: str
|
||||
conversation_id: str
|
||||
conversation_name: str | None = None
|
||||
query: str
|
||||
answer: str
|
||||
status: str
|
||||
error: str | None = None
|
||||
source: str | None = None
|
||||
from_source: str | None = None
|
||||
from_end_user_id: str | None = None
|
||||
from_account_id: str | None = None
|
||||
message_tokens: int
|
||||
@ -136,7 +177,15 @@ class AgentLogItemResponse(ResponseModel):
|
||||
|
||||
|
||||
class AgentLogListResponse(ResponseModel):
|
||||
data: list[AgentLogItemResponse]
|
||||
data: list[AgentLogConversationItemResponse]
|
||||
page: int
|
||||
limit: int
|
||||
total: int
|
||||
has_more: bool
|
||||
|
||||
|
||||
class AgentLogMessageListResponse(ResponseModel):
|
||||
data: list[AgentLogMessageItemResponse]
|
||||
page: int
|
||||
limit: int
|
||||
total: int
|
||||
|
||||
@ -658,6 +658,19 @@ Commit an uploaded file into the Agent App drive under files/<name>
|
||||
| ---- | ----------- | ------ |
|
||||
| 201 | File committed into the agent drive | **application/json**: [AgentDriveFileCommitResponse](#agentdrivefilecommitresponse)<br> |
|
||||
|
||||
### [GET] /agent/{agent_id}/log-sources
|
||||
#### Parameters
|
||||
|
||||
| Name | Located in | Description | Required | Schema |
|
||||
| ---- | ---------- | ----------- | -------- | ------ |
|
||||
| agent_id | path | | Yes | string |
|
||||
|
||||
#### Responses
|
||||
|
||||
| Code | Description | Schema |
|
||||
| ---- | ----------- | ------ |
|
||||
| 200 | Agent log sources | **application/json**: [AgentLogSourceListResponse](#agentlogsourcelistresponse)<br> |
|
||||
|
||||
### [GET] /agent/{agent_id}/logs
|
||||
#### Parameters
|
||||
|
||||
@ -678,6 +691,27 @@ Commit an uploaded file into the Agent App drive under files/<name>
|
||||
| ---- | ----------- | ------ |
|
||||
| 200 | Agent logs | **application/json**: [AgentLogListResponse](#agentloglistresponse)<br> |
|
||||
|
||||
### [GET] /agent/{agent_id}/logs/{conversation_id}/messages
|
||||
#### Parameters
|
||||
|
||||
| Name | Located in | Description | Required | Schema |
|
||||
| ---- | ---------- | ----------- | -------- | ------ |
|
||||
| end | query | End date (YYYY-MM-DD HH:MM) | No | string |
|
||||
| keyword | query | Search query, answer, or conversation name | No | string |
|
||||
| limit | query | Page size | No | integer, <br>**Default:** 20 |
|
||||
| page | query | Page number | No | integer, <br>**Default:** 1 |
|
||||
| source | query | Filter by all, console/explore, api/service-api, web-app, debugger, openapi, or trigger | No | string |
|
||||
| start | query | Start date (YYYY-MM-DD HH:MM) | No | string |
|
||||
| status | query | Filter by success, failed, or paused | No | string |
|
||||
| agent_id | path | | Yes | string |
|
||||
| conversation_id | path | | Yes | string |
|
||||
|
||||
#### Responses
|
||||
|
||||
| Code | Description | Schema |
|
||||
| ---- | ----------- | ------ |
|
||||
| 200 | Agent log messages | **application/json**: [AgentLogMessageListResponse](#agentlogmessagelistresponse)<br> |
|
||||
|
||||
### [GET] /agent/{agent_id}/messages/{message_id}
|
||||
Get Agent App message details by ID
|
||||
|
||||
@ -11941,36 +11975,60 @@ the current roster/workflow APIs scoped to Dify Agent.
|
||||
| ---- | ---- | ----------- | -------- |
|
||||
| AgentKnowledgeQueryMode | string | | |
|
||||
|
||||
#### AgentLogItemResponse
|
||||
#### AgentLogConversationItemResponse
|
||||
|
||||
| Name | Type | Description | Required |
|
||||
| ---- | ---- | ----------- | -------- |
|
||||
| conversation_id | string | | Yes |
|
||||
| created_at | integer | | No |
|
||||
| end_user_id | string | | No |
|
||||
| id | string | | Yes |
|
||||
| message_count | integer | | Yes |
|
||||
| operation_rate | number | | No |
|
||||
| source | [AgentLogSourceResponse](#agentlogsourceresponse) | | No |
|
||||
| status | string, <br>**Available values:** "failed", "paused", "success" | *Enum:* `"failed"`, `"paused"`, `"success"` | Yes |
|
||||
| title | string | | No |
|
||||
| unread | boolean | | Yes |
|
||||
| updated_at | integer | | No |
|
||||
| user_rate | number | | No |
|
||||
|
||||
#### AgentLogListResponse
|
||||
|
||||
| Name | Type | Description | Required |
|
||||
| ---- | ---- | ----------- | -------- |
|
||||
| data | [ [AgentLogConversationItemResponse](#agentlogconversationitemresponse) ] | | Yes |
|
||||
| has_more | boolean | | Yes |
|
||||
| limit | integer | | Yes |
|
||||
| page | integer | | Yes |
|
||||
| total | integer | | Yes |
|
||||
|
||||
#### AgentLogMessageItemResponse
|
||||
|
||||
| Name | Type | Description | Required |
|
||||
| ---- | ---- | ----------- | -------- |
|
||||
| answer | string | | Yes |
|
||||
| answer_tokens | integer | | Yes |
|
||||
| conversation_id | string | | Yes |
|
||||
| conversation_name | string | | No |
|
||||
| created_at | integer | | No |
|
||||
| currency | string | | Yes |
|
||||
| error | string | | No |
|
||||
| from_account_id | string | | No |
|
||||
| from_end_user_id | string | | No |
|
||||
| from_source | string | | No |
|
||||
| id | string | | Yes |
|
||||
| latency | number | | Yes |
|
||||
| message_id | string | | Yes |
|
||||
| message_tokens | integer | | Yes |
|
||||
| query | string | | Yes |
|
||||
| source | string | | No |
|
||||
| status | string | | Yes |
|
||||
| total_price | string | | Yes |
|
||||
| total_tokens | integer | | Yes |
|
||||
| updated_at | integer | | No |
|
||||
|
||||
#### AgentLogListResponse
|
||||
#### AgentLogMessageListResponse
|
||||
|
||||
| Name | Type | Description | Required |
|
||||
| ---- | ---- | ----------- | -------- |
|
||||
| data | [ [AgentLogItemResponse](#agentlogitemresponse) ] | | Yes |
|
||||
| data | [ [AgentLogMessageItemResponse](#agentlogmessageitemresponse) ] | | Yes |
|
||||
| has_more | boolean | | Yes |
|
||||
| limit | integer | | Yes |
|
||||
| page | integer | | Yes |
|
||||
@ -12003,6 +12061,36 @@ the current roster/workflow APIs scoped to Dify Agent.
|
||||
| iterations | [ [AgentIterationLogResponse](#agentiterationlogresponse) ] | | Yes |
|
||||
| meta | [AgentLogMetaResponse](#agentlogmetaresponse) | | Yes |
|
||||
|
||||
#### AgentLogSourceGroupResponse
|
||||
|
||||
| Name | Type | Description | Required |
|
||||
| ---- | ---- | ----------- | -------- |
|
||||
| label | string | | Yes |
|
||||
| sources | [ [AgentLogSourceResponse](#agentlogsourceresponse) ] | | No |
|
||||
| type | string, <br>**Available values:** "webapp", "workflow" | *Enum:* `"webapp"`, `"workflow"` | Yes |
|
||||
|
||||
#### AgentLogSourceListResponse
|
||||
|
||||
| Name | Type | Description | Required |
|
||||
| ---- | ---- | ----------- | -------- |
|
||||
| data | [ [AgentLogSourceResponse](#agentlogsourceresponse) ] | | Yes |
|
||||
| groups | [ [AgentLogSourceGroupResponse](#agentlogsourcegroupresponse) ] | | Yes |
|
||||
|
||||
#### AgentLogSourceResponse
|
||||
|
||||
| Name | Type | Description | Required |
|
||||
| ---- | ---- | ----------- | -------- |
|
||||
| app_icon | string | | No |
|
||||
| app_icon_background | string | | No |
|
||||
| app_icon_type | string | | No |
|
||||
| app_id | string | | Yes |
|
||||
| app_name | string | | Yes |
|
||||
| id | string | | Yes |
|
||||
| node_id | string | | No |
|
||||
| type | string, <br>**Available values:** "webapp", "workflow" | *Enum:* `"webapp"`, `"workflow"` | Yes |
|
||||
| workflow_id | string | | No |
|
||||
| workflow_version | string | | No |
|
||||
|
||||
#### AgentLogsQuery
|
||||
|
||||
| Name | Type | Description | Required |
|
||||
|
||||
@ -6,12 +6,15 @@ from decimal import Decimal
|
||||
from typing import Any
|
||||
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy import func, or_, select
|
||||
from sqlalchemy import and_, func, or_, select
|
||||
from sqlalchemy.orm import aliased
|
||||
|
||||
from core.app.entities.app_invoke_entities import InvokeFrom
|
||||
from libs.helper import convert_datetime_to_date, escape_like_pattern, to_timestamp
|
||||
from models.agent import WorkflowAgentNodeBinding
|
||||
from models.enums import MessageStatus
|
||||
from models.model import App, Conversation, Message
|
||||
from models.workflow import WorkflowNodeExecutionModel, WorkflowRun
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
@ -33,6 +36,16 @@ class AgentStatisticsQueryParams:
|
||||
timezone: str = "UTC"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class AgentSourceFilter:
|
||||
kind: str
|
||||
app_id: str | None = None
|
||||
workflow_id: str | None = None
|
||||
workflow_version: str | None = None
|
||||
node_id: str | None = None
|
||||
invoke_from: InvokeFrom | None = None
|
||||
|
||||
|
||||
class AgentObservabilityService:
|
||||
_SOURCE_ALIASES: dict[str, InvokeFrom] = {
|
||||
"api": InvokeFrom.SERVICE_API,
|
||||
@ -66,6 +79,31 @@ class AgentObservabilityService:
|
||||
except KeyError as exc:
|
||||
raise ValueError(f"Unsupported source: {source}") from exc
|
||||
|
||||
@classmethod
|
||||
def resolve_source_filter(cls, source: str | None) -> AgentSourceFilter:
|
||||
if not source or source.strip().lower() == "all":
|
||||
return AgentSourceFilter(kind="all")
|
||||
normalized = source.strip()
|
||||
lowered = normalized.lower()
|
||||
if lowered == "webapp":
|
||||
return AgentSourceFilter(kind="webapp")
|
||||
if lowered.startswith("webapp:"):
|
||||
return AgentSourceFilter(kind="webapp", app_id=normalized.split(":", 1)[1] or None)
|
||||
if lowered == "workflow":
|
||||
return AgentSourceFilter(kind="workflow")
|
||||
if lowered.startswith("workflow:"):
|
||||
parts = normalized.split(":", 4)
|
||||
if len(parts) != 5 or not all(parts[1:]):
|
||||
raise ValueError(f"Unsupported source: {source}")
|
||||
return AgentSourceFilter(
|
||||
kind="workflow",
|
||||
app_id=parts[1],
|
||||
workflow_id=parts[2],
|
||||
workflow_version=parts[3],
|
||||
node_id=parts[4],
|
||||
)
|
||||
return AgentSourceFilter(kind="webapp", invoke_from=cls.resolve_source(source))
|
||||
|
||||
@staticmethod
|
||||
def _message_status(message: Message) -> str:
|
||||
if message.error or message.status == MessageStatus.ERROR:
|
||||
@ -104,19 +142,255 @@ class AgentObservabilityService:
|
||||
"updated_at": to_timestamp(message.updated_at),
|
||||
}
|
||||
|
||||
def list_logs(self, *, app: App, params: AgentLogQueryParams) -> dict[str, Any]:
|
||||
source = self.resolve_source(params.source)
|
||||
stmt = (
|
||||
select(Message, Conversation)
|
||||
.join(Conversation, Conversation.id == Message.conversation_id)
|
||||
.where(Message.app_id == app.id, Conversation.app_id == app.id)
|
||||
)
|
||||
stmt = self._apply_source_filter(stmt, source)
|
||||
def list_logs(self, *, app: App, agent_id: str, params: AgentLogQueryParams) -> dict[str, Any]:
|
||||
source_filter = self.resolve_source_filter(params.source)
|
||||
rows: list[dict[str, Any]] = []
|
||||
if source_filter.kind in {"all", "webapp"}:
|
||||
rows.extend(self._list_webapp_conversation_logs(app=app, params=params, source_filter=source_filter))
|
||||
if source_filter.kind in {"all", "workflow"}:
|
||||
rows.extend(
|
||||
self._list_workflow_conversation_logs(
|
||||
app=app,
|
||||
agent_id=agent_id,
|
||||
params=params,
|
||||
source_filter=source_filter,
|
||||
)
|
||||
)
|
||||
rows.sort(key=lambda row: (row["updated_at"] or 0, row["id"]), reverse=True)
|
||||
|
||||
if params.start:
|
||||
stmt = stmt.where(Message.created_at >= params.start)
|
||||
if params.end:
|
||||
stmt = stmt.where(Message.created_at < params.end)
|
||||
total = len(rows)
|
||||
start = (params.page - 1) * params.limit
|
||||
end = start + params.limit
|
||||
return {
|
||||
"data": rows[start:end],
|
||||
"page": params.page,
|
||||
"limit": params.limit,
|
||||
"total": total,
|
||||
"has_more": end < total,
|
||||
}
|
||||
|
||||
def list_log_messages(
|
||||
self, *, app: App, agent_id: str, conversation_id: str, params: AgentLogQueryParams
|
||||
) -> dict[str, Any]:
|
||||
source_filter = self.resolve_source_filter(params.source)
|
||||
rows: list[Message] = []
|
||||
if source_filter.kind in {"all", "webapp"}:
|
||||
rows.extend(
|
||||
self._list_webapp_messages(
|
||||
app=app,
|
||||
conversation_id=conversation_id,
|
||||
params=params,
|
||||
source_filter=source_filter,
|
||||
)
|
||||
)
|
||||
if source_filter.kind in {"all", "workflow"}:
|
||||
rows.extend(
|
||||
self._list_workflow_messages(
|
||||
app=app,
|
||||
agent_id=agent_id,
|
||||
conversation_id=conversation_id,
|
||||
params=params,
|
||||
source_filter=source_filter,
|
||||
)
|
||||
)
|
||||
|
||||
deduped = {message.id: message for message in rows}
|
||||
sorted_rows = sorted(deduped.values(), key=lambda message: (message.created_at, message.id), reverse=True)
|
||||
total = len(sorted_rows)
|
||||
start = (params.page - 1) * params.limit
|
||||
end = start + params.limit
|
||||
return {
|
||||
"data": [self.serialize_log_message(message) for message in sorted_rows[start:end]],
|
||||
"page": params.page,
|
||||
"limit": params.limit,
|
||||
"total": total,
|
||||
"has_more": end < total,
|
||||
}
|
||||
|
||||
def list_log_sources(self, *, app: App, agent_id: str) -> dict[str, Any]:
|
||||
webapp_source = self._serialize_webapp_source(app)
|
||||
workflow_sources = self._list_workflow_sources(app=app, agent_id=agent_id)
|
||||
return {
|
||||
"data": [webapp_source, *workflow_sources],
|
||||
"groups": [
|
||||
{"type": "webapp", "label": "WEBAPP", "sources": [webapp_source]},
|
||||
{"type": "workflow", "label": "WORKFLOW", "sources": workflow_sources},
|
||||
],
|
||||
}
|
||||
|
||||
def _list_webapp_conversation_logs(
|
||||
self, *, app: App, params: AgentLogQueryParams, source_filter: AgentSourceFilter
|
||||
) -> list[dict[str, Any]]:
|
||||
stmt = (
|
||||
select(
|
||||
Conversation,
|
||||
func.count(Message.id).label("message_count"),
|
||||
func.max(Message.created_at).label("created_at"),
|
||||
func.max(Message.updated_at).label("updated_at"),
|
||||
func.sum(sa.case((Message.status == MessageStatus.PAUSED, 1), else_=0)).label("paused_count"),
|
||||
func.sum(
|
||||
sa.case((or_(Message.error.is_not(None), Message.status == MessageStatus.ERROR), 1), else_=0)
|
||||
).label("failed_count"),
|
||||
)
|
||||
.join(Message, Message.conversation_id == Conversation.id)
|
||||
.where(Message.app_id == app.id, Conversation.app_id == app.id)
|
||||
.group_by(Conversation.id)
|
||||
)
|
||||
stmt = self._apply_observability_filters(stmt, params=params, source_filter=source_filter)
|
||||
rows = list(self._session.execute(stmt).all())
|
||||
return [
|
||||
self._serialize_conversation_log(
|
||||
conversation=row[0],
|
||||
message_count=row.message_count,
|
||||
paused_count=row.paused_count,
|
||||
failed_count=row.failed_count,
|
||||
source=self._serialize_webapp_source(app),
|
||||
created_at=row.created_at,
|
||||
updated_at=row.updated_at,
|
||||
)
|
||||
for row in rows
|
||||
]
|
||||
|
||||
def _list_workflow_conversation_logs(
|
||||
self, *, app: App, agent_id: str, params: AgentLogQueryParams, source_filter: AgentSourceFilter
|
||||
) -> list[dict[str, Any]]:
|
||||
workflow_app = aliased(App)
|
||||
stmt = (
|
||||
select(
|
||||
Conversation,
|
||||
workflow_app,
|
||||
WorkflowAgentNodeBinding.workflow_id,
|
||||
WorkflowAgentNodeBinding.workflow_version,
|
||||
WorkflowAgentNodeBinding.node_id,
|
||||
func.count(sa.distinct(Message.id)).label("message_count"),
|
||||
func.max(Message.created_at).label("created_at"),
|
||||
func.max(Message.updated_at).label("updated_at"),
|
||||
func.sum(sa.case((Message.status == MessageStatus.PAUSED, 1), else_=0)).label("paused_count"),
|
||||
func.sum(
|
||||
sa.case((or_(Message.error.is_not(None), Message.status == MessageStatus.ERROR), 1), else_=0)
|
||||
).label("failed_count"),
|
||||
)
|
||||
.select_from(Message)
|
||||
.join(Conversation, Conversation.id == Message.conversation_id)
|
||||
.join(WorkflowRun, WorkflowRun.id == Message.workflow_run_id)
|
||||
.join(
|
||||
WorkflowAgentNodeBinding,
|
||||
and_(
|
||||
WorkflowAgentNodeBinding.tenant_id == app.tenant_id,
|
||||
WorkflowAgentNodeBinding.agent_id == agent_id,
|
||||
WorkflowAgentNodeBinding.app_id == WorkflowRun.app_id,
|
||||
WorkflowAgentNodeBinding.workflow_id == WorkflowRun.workflow_id,
|
||||
WorkflowAgentNodeBinding.workflow_version == WorkflowRun.version,
|
||||
),
|
||||
)
|
||||
.join(
|
||||
WorkflowNodeExecutionModel,
|
||||
and_(
|
||||
WorkflowNodeExecutionModel.workflow_run_id == WorkflowRun.id,
|
||||
WorkflowNodeExecutionModel.node_id == WorkflowAgentNodeBinding.node_id,
|
||||
),
|
||||
)
|
||||
.join(workflow_app, workflow_app.id == WorkflowAgentNodeBinding.app_id)
|
||||
.where(Message.workflow_run_id.is_not(None), Conversation.app_id == WorkflowAgentNodeBinding.app_id)
|
||||
.group_by(
|
||||
Conversation.id,
|
||||
workflow_app.id,
|
||||
WorkflowAgentNodeBinding.workflow_id,
|
||||
WorkflowAgentNodeBinding.workflow_version,
|
||||
WorkflowAgentNodeBinding.node_id,
|
||||
)
|
||||
)
|
||||
stmt = self._apply_observability_filters(stmt, params=params, source_filter=source_filter)
|
||||
stmt = self._apply_workflow_source_filter(stmt, source_filter)
|
||||
rows = list(self._session.execute(stmt).all())
|
||||
return [
|
||||
self._serialize_conversation_log(
|
||||
conversation=row[0],
|
||||
message_count=row.message_count,
|
||||
paused_count=row.paused_count,
|
||||
failed_count=row.failed_count,
|
||||
source=self._serialize_workflow_source(
|
||||
app=row[1],
|
||||
workflow_id=row.workflow_id,
|
||||
workflow_version=row.workflow_version,
|
||||
node_id=row.node_id,
|
||||
),
|
||||
created_at=row.created_at,
|
||||
updated_at=row.updated_at,
|
||||
)
|
||||
for row in rows
|
||||
]
|
||||
|
||||
def _list_webapp_messages(
|
||||
self, *, app: App, conversation_id: str, params: AgentLogQueryParams, source_filter: AgentSourceFilter
|
||||
) -> list[Message]:
|
||||
stmt = select(Message).where(Message.app_id == app.id, Message.conversation_id == conversation_id)
|
||||
stmt = self._apply_message_filters(stmt, params=params, source_filter=source_filter)
|
||||
return list(self._session.scalars(stmt.order_by(Message.created_at.desc(), Message.id.desc())).all())
|
||||
|
||||
def _list_workflow_messages(
|
||||
self,
|
||||
*,
|
||||
app: App,
|
||||
agent_id: str,
|
||||
conversation_id: str,
|
||||
params: AgentLogQueryParams,
|
||||
source_filter: AgentSourceFilter,
|
||||
) -> list[Message]:
|
||||
stmt = (
|
||||
select(Message)
|
||||
.join(WorkflowRun, WorkflowRun.id == Message.workflow_run_id)
|
||||
.join(
|
||||
WorkflowAgentNodeBinding,
|
||||
and_(
|
||||
WorkflowAgentNodeBinding.tenant_id == app.tenant_id,
|
||||
WorkflowAgentNodeBinding.agent_id == agent_id,
|
||||
WorkflowAgentNodeBinding.app_id == WorkflowRun.app_id,
|
||||
WorkflowAgentNodeBinding.workflow_id == WorkflowRun.workflow_id,
|
||||
WorkflowAgentNodeBinding.workflow_version == WorkflowRun.version,
|
||||
),
|
||||
)
|
||||
.join(
|
||||
WorkflowNodeExecutionModel,
|
||||
and_(
|
||||
WorkflowNodeExecutionModel.workflow_run_id == WorkflowRun.id,
|
||||
WorkflowNodeExecutionModel.node_id == WorkflowAgentNodeBinding.node_id,
|
||||
),
|
||||
)
|
||||
.where(Message.conversation_id == conversation_id)
|
||||
)
|
||||
stmt = self._apply_message_filters(stmt, params=params, source_filter=source_filter)
|
||||
stmt = self._apply_workflow_source_filter(stmt, source_filter)
|
||||
return list(self._session.scalars(stmt.order_by(Message.created_at.desc(), Message.id.desc())).all())
|
||||
|
||||
def _list_workflow_sources(self, *, app: App, agent_id: str) -> list[dict[str, Any]]:
|
||||
workflow_app = aliased(App)
|
||||
stmt = (
|
||||
select(
|
||||
workflow_app,
|
||||
WorkflowAgentNodeBinding.workflow_id,
|
||||
WorkflowAgentNodeBinding.workflow_version,
|
||||
WorkflowAgentNodeBinding.node_id,
|
||||
)
|
||||
.join(workflow_app, workflow_app.id == WorkflowAgentNodeBinding.app_id)
|
||||
.where(WorkflowAgentNodeBinding.tenant_id == app.tenant_id, WorkflowAgentNodeBinding.agent_id == agent_id)
|
||||
.order_by(workflow_app.name.asc(), WorkflowAgentNodeBinding.node_id.asc())
|
||||
)
|
||||
rows = self._session.execute(stmt).all()
|
||||
deduped: dict[str, dict[str, Any]] = {}
|
||||
for row in rows:
|
||||
source = self._serialize_workflow_source(
|
||||
app=row[0],
|
||||
workflow_id=row.workflow_id,
|
||||
workflow_version=row.workflow_version,
|
||||
node_id=row.node_id,
|
||||
)
|
||||
deduped[source["id"]] = source
|
||||
return list(deduped.values())
|
||||
|
||||
@classmethod
|
||||
def _apply_observability_filters(cls, stmt, *, params: AgentLogQueryParams, source_filter: AgentSourceFilter):
|
||||
stmt = cls._apply_message_filters(stmt, params=params, source_filter=source_filter, include_keyword=False)
|
||||
if params.keyword:
|
||||
escaped_keyword = escape_like_pattern(params.keyword)
|
||||
pattern = f"%{escaped_keyword}%"
|
||||
@ -127,27 +401,41 @@ class AgentObservabilityService:
|
||||
Conversation.name.ilike(pattern, escape="\\"),
|
||||
)
|
||||
)
|
||||
if params.status:
|
||||
stmt = self._apply_status_filter(stmt, params.status)
|
||||
return stmt
|
||||
|
||||
total = self._session.scalar(select(func.count()).select_from(stmt.subquery())) or 0
|
||||
rows = list(
|
||||
self._session.execute(
|
||||
stmt.order_by(Message.created_at.desc(), Message.id.desc())
|
||||
.offset((params.page - 1) * params.limit)
|
||||
.limit(params.limit)
|
||||
).all()
|
||||
)
|
||||
data = []
|
||||
for message, conversation in rows:
|
||||
data.append(self.serialize_log_message(message, conversation))
|
||||
return {
|
||||
"data": data,
|
||||
"page": params.page,
|
||||
"limit": params.limit,
|
||||
"total": total,
|
||||
"has_more": params.page * params.limit < total,
|
||||
}
|
||||
@classmethod
|
||||
def _apply_message_filters(
|
||||
cls, stmt, *, params: AgentLogQueryParams, source_filter: AgentSourceFilter, include_keyword: bool = True
|
||||
):
|
||||
stmt = cls._apply_source_filter(stmt, source_filter.invoke_from)
|
||||
if params.start:
|
||||
stmt = stmt.where(Message.created_at >= params.start)
|
||||
if params.end:
|
||||
stmt = stmt.where(Message.created_at < params.end)
|
||||
if include_keyword and params.keyword:
|
||||
escaped_keyword = escape_like_pattern(params.keyword)
|
||||
pattern = f"%{escaped_keyword}%"
|
||||
stmt = stmt.where(
|
||||
or_(
|
||||
Message.query.ilike(pattern, escape="\\"),
|
||||
Message.answer.ilike(pattern, escape="\\"),
|
||||
)
|
||||
)
|
||||
if params.status:
|
||||
stmt = cls._apply_status_filter(stmt, params.status)
|
||||
return stmt
|
||||
|
||||
@staticmethod
|
||||
def _apply_workflow_source_filter(stmt, source_filter: AgentSourceFilter):
|
||||
if source_filter.app_id:
|
||||
stmt = stmt.where(WorkflowAgentNodeBinding.app_id == source_filter.app_id)
|
||||
if source_filter.workflow_id:
|
||||
stmt = stmt.where(WorkflowAgentNodeBinding.workflow_id == source_filter.workflow_id)
|
||||
if source_filter.workflow_version:
|
||||
stmt = stmt.where(WorkflowAgentNodeBinding.workflow_version == source_filter.workflow_version)
|
||||
if source_filter.node_id:
|
||||
stmt = stmt.where(WorkflowAgentNodeBinding.node_id == source_filter.node_id)
|
||||
return stmt
|
||||
|
||||
@classmethod
|
||||
def _apply_source_filter(cls, stmt, source: InvokeFrom | None):
|
||||
@ -166,22 +454,95 @@ class AgentObservabilityService:
|
||||
return stmt.where(Message.status == MessageStatus.PAUSED)
|
||||
raise ValueError(f"Unsupported status: {status}")
|
||||
|
||||
def get_statistics_summary(self, *, app: App, params: AgentStatisticsQueryParams) -> dict[str, Any]:
|
||||
source = self.resolve_source(params.source)
|
||||
rows = self._load_daily_statistics(app=app, params=params, source=source)
|
||||
@classmethod
|
||||
def _serialize_conversation_log(
|
||||
cls,
|
||||
*,
|
||||
conversation: Conversation,
|
||||
message_count: int,
|
||||
paused_count: int,
|
||||
failed_count: int,
|
||||
source: dict[str, Any],
|
||||
created_at: datetime | None,
|
||||
updated_at: datetime | None,
|
||||
) -> dict[str, Any]:
|
||||
return {
|
||||
"id": conversation.id,
|
||||
"conversation_id": conversation.id,
|
||||
"title": conversation.name,
|
||||
"end_user_id": conversation.from_end_user_id,
|
||||
"message_count": int(message_count or 0),
|
||||
"user_rate": None,
|
||||
"operation_rate": None,
|
||||
"unread": conversation.read_at is None,
|
||||
"source": source,
|
||||
"status": cls._conversation_status(paused_count=paused_count, failed_count=failed_count),
|
||||
"created_at": to_timestamp(created_at or conversation.created_at),
|
||||
"updated_at": to_timestamp(updated_at or conversation.updated_at),
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _conversation_status(*, paused_count: int, failed_count: int) -> str:
|
||||
if paused_count:
|
||||
return "paused"
|
||||
if failed_count:
|
||||
return "failed"
|
||||
return "success"
|
||||
|
||||
@staticmethod
|
||||
def _serialize_webapp_source(app: App) -> dict[str, Any]:
|
||||
icon_type = app.icon_type.value if app.icon_type else None
|
||||
return {
|
||||
"id": f"webapp:{app.id}",
|
||||
"type": "webapp",
|
||||
"app_id": app.id,
|
||||
"app_name": app.name,
|
||||
"app_icon_type": icon_type,
|
||||
"app_icon": app.icon,
|
||||
"app_icon_background": app.icon_background,
|
||||
"workflow_id": None,
|
||||
"workflow_version": None,
|
||||
"node_id": None,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _serialize_workflow_source(
|
||||
*,
|
||||
app: App,
|
||||
workflow_id: str,
|
||||
workflow_version: str,
|
||||
node_id: str,
|
||||
) -> dict[str, Any]:
|
||||
icon_type = app.icon_type.value if app.icon_type else None
|
||||
return {
|
||||
"id": f"workflow:{app.id}:{workflow_id}:{workflow_version}:{node_id}",
|
||||
"type": "workflow",
|
||||
"app_id": app.id,
|
||||
"app_name": app.name,
|
||||
"app_icon_type": icon_type,
|
||||
"app_icon": app.icon,
|
||||
"app_icon_background": app.icon_background,
|
||||
"workflow_id": workflow_id,
|
||||
"workflow_version": workflow_version,
|
||||
"node_id": node_id,
|
||||
}
|
||||
|
||||
def get_statistics_summary(self, *, app: App, agent_id: str, params: AgentStatisticsQueryParams) -> dict[str, Any]:
|
||||
source_filter = self.resolve_source_filter(params.source)
|
||||
rows = self._load_daily_statistics(app=app, agent_id=agent_id, params=params, source_filter=source_filter)
|
||||
charts = self._build_charts(rows)
|
||||
summary = self._build_summary(rows)
|
||||
return {
|
||||
"source": source.value if source else "all",
|
||||
"source": params.source or "all",
|
||||
"summary": summary,
|
||||
"charts": charts,
|
||||
}
|
||||
|
||||
def _load_daily_statistics(
|
||||
self, *, app: App, params: AgentStatisticsQueryParams, source: InvokeFrom | None
|
||||
self, *, app: App, agent_id: str, params: AgentStatisticsQueryParams, source_filter: AgentSourceFilter
|
||||
) -> list[dict[str, Any]]:
|
||||
converted_created_at = convert_datetime_to_date("m.created_at")
|
||||
source_condition = "AND m.invoke_from != :debugger" if source is None else "AND m.invoke_from = :source"
|
||||
message_scope = self._statistics_message_scope_sql(source_filter)
|
||||
sql_query = f"""SELECT
|
||||
{converted_created_at} AS date,
|
||||
COUNT(m.id) AS message_count,
|
||||
@ -197,15 +558,24 @@ FROM messages m
|
||||
LEFT JOIN message_feedbacks mf
|
||||
ON mf.message_id = m.id AND mf.rating = 'like'
|
||||
WHERE
|
||||
m.app_id = :app_id
|
||||
{source_condition}"""
|
||||
{message_scope}"""
|
||||
args: dict[str, Any] = {
|
||||
"tz": params.timezone,
|
||||
"app_id": app.id,
|
||||
"tenant_id": app.tenant_id,
|
||||
"agent_id": agent_id,
|
||||
"debugger": InvokeFrom.DEBUGGER,
|
||||
}
|
||||
if source is not None:
|
||||
args["source"] = source
|
||||
if source_filter.invoke_from is not None:
|
||||
args["source"] = source_filter.invoke_from
|
||||
if source_filter.app_id:
|
||||
args["source_app_id"] = source_filter.app_id
|
||||
if source_filter.workflow_id:
|
||||
args["workflow_id"] = source_filter.workflow_id
|
||||
if source_filter.workflow_version:
|
||||
args["workflow_version"] = source_filter.workflow_version
|
||||
if source_filter.node_id:
|
||||
args["node_id"] = source_filter.node_id
|
||||
if params.start:
|
||||
sql_query += " AND m.created_at >= :start"
|
||||
args["start"] = params.start
|
||||
@ -216,6 +586,45 @@ WHERE
|
||||
|
||||
return [dict(row._mapping) for row in self._session.execute(sa.text(sql_query), args).all()]
|
||||
|
||||
@staticmethod
|
||||
def _statistics_message_scope_sql(source_filter: AgentSourceFilter) -> str:
|
||||
app_scope = "m.app_id = :app_id"
|
||||
if source_filter.invoke_from is None:
|
||||
app_scope += " AND m.invoke_from != :debugger"
|
||||
else:
|
||||
app_scope += " AND m.invoke_from = :source"
|
||||
workflow_binding_filters = []
|
||||
if source_filter.app_id:
|
||||
workflow_binding_filters.append("wanb.app_id = :source_app_id")
|
||||
if source_filter.workflow_id:
|
||||
workflow_binding_filters.append("wanb.workflow_id = :workflow_id")
|
||||
if source_filter.workflow_version:
|
||||
workflow_binding_filters.append("wanb.workflow_version = :workflow_version")
|
||||
if source_filter.node_id:
|
||||
workflow_binding_filters.append("wanb.node_id = :node_id")
|
||||
extra_workflow_filters = f"AND {' AND '.join(workflow_binding_filters)}" if workflow_binding_filters else ""
|
||||
workflow_scope = f"""m.workflow_run_id IS NOT NULL
|
||||
AND EXISTS (
|
||||
SELECT 1
|
||||
FROM workflow_runs wr
|
||||
JOIN workflow_agent_node_bindings wanb
|
||||
ON wanb.tenant_id = :tenant_id
|
||||
AND wanb.agent_id = :agent_id
|
||||
AND wanb.app_id = wr.app_id
|
||||
AND wanb.workflow_id = wr.workflow_id
|
||||
AND wanb.workflow_version = wr.version
|
||||
{extra_workflow_filters}
|
||||
JOIN workflow_node_executions wne
|
||||
ON wne.workflow_run_id = wr.id
|
||||
AND wne.node_id = wanb.node_id
|
||||
WHERE wr.id = m.workflow_run_id
|
||||
)"""
|
||||
if source_filter.kind == "webapp":
|
||||
return app_scope
|
||||
if source_filter.kind == "workflow":
|
||||
return workflow_scope
|
||||
return f"(({app_scope}) OR ({workflow_scope}))"
|
||||
|
||||
@staticmethod
|
||||
def _build_charts(rows: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
|
||||
messages = []
|
||||
|
||||
@ -24,7 +24,9 @@ from controllers.console.agent.roster import (
|
||||
AgentAppCopyApi,
|
||||
AgentAppListApi,
|
||||
AgentInviteOptionsApi,
|
||||
AgentLogMessagesApi,
|
||||
AgentLogsApi,
|
||||
AgentLogSourcesApi,
|
||||
AgentRosterVersionDetailApi,
|
||||
AgentRosterVersionsApi,
|
||||
AgentStatisticsSummaryApi,
|
||||
@ -153,6 +155,8 @@ def test_agent_v2_console_routes_are_agent_id_first() -> None:
|
||||
"/agent/<uuid:agent_id>/chat-messages/<uuid:message_id>/suggested-questions",
|
||||
"/agent/<uuid:agent_id>/messages/<uuid:message_id>",
|
||||
"/agent/<uuid:agent_id>/logs",
|
||||
"/agent/<uuid:agent_id>/logs/<uuid:conversation_id>/messages",
|
||||
"/agent/<uuid:agent_id>/log-sources",
|
||||
"/agent/<uuid:agent_id>/statistics/summary",
|
||||
"/agent/invite-options",
|
||||
):
|
||||
@ -461,21 +465,59 @@ def test_agent_observability_routes_resolve_app_from_agent_id(
|
||||
captured: dict[str, object] = {}
|
||||
|
||||
class FakeObservabilityService:
|
||||
def list_logs(self, *, app, params):
|
||||
captured["logs"] = {"app": app, "params": params}
|
||||
def list_logs(self, *, app, agent_id, params):
|
||||
captured["logs"] = {"app": app, "agent_id": agent_id, "params": params}
|
||||
return {
|
||||
"data": [
|
||||
{
|
||||
"conversation_id": "conversation-1",
|
||||
"id": "conversation-1",
|
||||
"title": "Debug",
|
||||
"end_user_id": "end-user-1",
|
||||
"message_count": 2,
|
||||
"user_rate": None,
|
||||
"operation_rate": None,
|
||||
"unread": True,
|
||||
"source": {
|
||||
"id": "webapp:app-1",
|
||||
"type": "webapp",
|
||||
"app_id": "app-1",
|
||||
"app_name": "Iris",
|
||||
"app_icon_type": "emoji",
|
||||
"app_icon": "robot",
|
||||
"app_icon_background": "#fff",
|
||||
"workflow_id": None,
|
||||
"workflow_version": None,
|
||||
"node_id": None,
|
||||
},
|
||||
"status": "success",
|
||||
"created_at": 1,
|
||||
"updated_at": 2,
|
||||
}
|
||||
],
|
||||
"page": 2,
|
||||
"limit": 5,
|
||||
"total": 6,
|
||||
"has_more": False,
|
||||
}
|
||||
|
||||
def list_log_messages(self, *, app, agent_id, conversation_id, params):
|
||||
captured["messages"] = {
|
||||
"app": app,
|
||||
"agent_id": agent_id,
|
||||
"conversation_id": conversation_id,
|
||||
"params": params,
|
||||
}
|
||||
return {
|
||||
"data": [
|
||||
{
|
||||
"id": "message-1",
|
||||
"message_id": "message-1",
|
||||
"conversation_id": "conversation-1",
|
||||
"conversation_name": "Debug",
|
||||
"query": "hello",
|
||||
"answer": "hi",
|
||||
"status": "success",
|
||||
"error": None,
|
||||
"source": "explore",
|
||||
"from_source": "console",
|
||||
"from_end_user_id": None,
|
||||
"from_account_id": account_id,
|
||||
"message_tokens": 1,
|
||||
@ -488,14 +530,34 @@ def test_agent_observability_routes_resolve_app_from_agent_id(
|
||||
"updated_at": 2,
|
||||
}
|
||||
],
|
||||
"page": 2,
|
||||
"limit": 5,
|
||||
"total": 6,
|
||||
"page": 1,
|
||||
"limit": 20,
|
||||
"total": 1,
|
||||
"has_more": False,
|
||||
}
|
||||
|
||||
def get_statistics_summary(self, *, app, params):
|
||||
captured["statistics"] = {"app": app, "params": params}
|
||||
def list_log_sources(self, *, app, agent_id):
|
||||
captured["sources"] = {"app": app, "agent_id": agent_id}
|
||||
return {
|
||||
"data": [
|
||||
{
|
||||
"id": "webapp:app-1",
|
||||
"type": "webapp",
|
||||
"app_id": "app-1",
|
||||
"app_name": "Iris",
|
||||
"app_icon_type": "emoji",
|
||||
"app_icon": "robot",
|
||||
"app_icon_background": "#fff",
|
||||
"workflow_id": None,
|
||||
"workflow_version": None,
|
||||
"node_id": None,
|
||||
}
|
||||
],
|
||||
"groups": [{"type": "webapp", "label": "WEBAPP", "sources": []}],
|
||||
}
|
||||
|
||||
def get_statistics_summary(self, *, app, agent_id, params):
|
||||
captured["statistics"] = {"app": app, "agent_id": agent_id, "params": params}
|
||||
return {
|
||||
"source": "all",
|
||||
"summary": {
|
||||
@ -532,9 +594,11 @@ def test_agent_observability_routes_resolve_app_from_agent_id(
|
||||
):
|
||||
logs = unwrap(AgentLogsApi.get)(AgentLogsApi(), "tenant-1", account, agent_id)
|
||||
|
||||
assert logs["data"][0]["id"] == "message-1"
|
||||
assert logs["data"][0]["id"] == "conversation-1"
|
||||
assert logs["data"][0]["source"]["id"] == "webapp:app-1"
|
||||
logs_call = cast(dict[str, object], captured["logs"])
|
||||
assert logs_call["app"] is app_model
|
||||
assert logs_call["agent_id"] == agent_id
|
||||
logs_params = cast(Any, logs_call["params"])
|
||||
assert logs_params.page == 2
|
||||
assert logs_params.limit == 5
|
||||
@ -542,6 +606,31 @@ def test_agent_observability_routes_resolve_app_from_agent_id(
|
||||
assert logs_params.status == "success"
|
||||
assert logs_params.source == "console"
|
||||
|
||||
with app.test_request_context(
|
||||
"/console/api/agent/00000000-0000-0000-0000-000000000001/logs/00000000-0000-0000-0000-000000000002/messages"
|
||||
):
|
||||
messages = unwrap(AgentLogMessagesApi.get)(
|
||||
AgentLogMessagesApi(),
|
||||
"tenant-1",
|
||||
account,
|
||||
agent_id,
|
||||
"00000000-0000-0000-0000-000000000002",
|
||||
)
|
||||
|
||||
assert messages["data"][0]["id"] == "message-1"
|
||||
messages_call = cast(dict[str, object], captured["messages"])
|
||||
assert messages_call["app"] is app_model
|
||||
assert messages_call["agent_id"] == agent_id
|
||||
assert messages_call["conversation_id"] == "00000000-0000-0000-0000-000000000002"
|
||||
|
||||
with app.test_request_context("/console/api/agent/00000000-0000-0000-0000-000000000001/log-sources"):
|
||||
sources = unwrap(AgentLogSourcesApi.get)(AgentLogSourcesApi(), "tenant-1", account, agent_id)
|
||||
|
||||
assert sources["data"][0]["id"] == "webapp:app-1"
|
||||
sources_call = cast(dict[str, object], captured["sources"])
|
||||
assert sources_call["app"] is app_model
|
||||
assert sources_call["agent_id"] == agent_id
|
||||
|
||||
with app.test_request_context(
|
||||
"/console/api/agent/00000000-0000-0000-0000-000000000001/statistics/summary?source=api"
|
||||
):
|
||||
@ -550,6 +639,7 @@ def test_agent_observability_routes_resolve_app_from_agent_id(
|
||||
assert statistics["summary"]["total_messages"] == 1
|
||||
stats_call = cast(dict[str, object], captured["statistics"])
|
||||
assert stats_call["app"] is app_model
|
||||
assert stats_call["agent_id"] == agent_id
|
||||
stats_params = cast(Any, stats_call["params"])
|
||||
assert stats_params.source == "api"
|
||||
assert stats_params.timezone == "UTC"
|
||||
|
||||
@ -20,6 +20,60 @@ def test_resolve_source_accepts_frontend_aliases() -> None:
|
||||
AgentObservabilityService.resolve_source("unknown")
|
||||
|
||||
|
||||
def test_resolve_source_filter_accepts_structured_sources() -> None:
|
||||
assert AgentObservabilityService.resolve_source_filter(None).kind == "all"
|
||||
assert AgentObservabilityService.resolve_source_filter("webapp").kind == "webapp"
|
||||
assert AgentObservabilityService.resolve_source_filter("webapp:app-1").app_id == "app-1"
|
||||
|
||||
workflow_filter = AgentObservabilityService.resolve_source_filter("workflow:app-2:workflow-1:v1:node-1")
|
||||
assert workflow_filter.kind == "workflow"
|
||||
assert workflow_filter.app_id == "app-2"
|
||||
assert workflow_filter.workflow_id == "workflow-1"
|
||||
assert workflow_filter.workflow_version == "v1"
|
||||
assert workflow_filter.node_id == "node-1"
|
||||
|
||||
legacy_filter = AgentObservabilityService.resolve_source_filter("console")
|
||||
assert legacy_filter.kind == "webapp"
|
||||
assert legacy_filter.invoke_from == InvokeFrom.EXPLORE
|
||||
|
||||
with pytest.raises(ValueError, match="Unsupported source"):
|
||||
AgentObservabilityService.resolve_source_filter("workflow:broken")
|
||||
|
||||
|
||||
def test_source_serializers_return_structured_frontend_shape() -> None:
|
||||
app = SimpleNamespace(
|
||||
id="app-1",
|
||||
name="Iris",
|
||||
icon_type=SimpleNamespace(value="emoji"),
|
||||
icon="robot",
|
||||
icon_background="#fff",
|
||||
)
|
||||
|
||||
webapp_source = AgentObservabilityService._serialize_webapp_source(app) # type: ignore[arg-type]
|
||||
workflow_source = AgentObservabilityService._serialize_workflow_source(
|
||||
app=app, # type: ignore[arg-type]
|
||||
workflow_id="workflow-1",
|
||||
workflow_version="v1",
|
||||
node_id="node-1",
|
||||
)
|
||||
|
||||
assert webapp_source == {
|
||||
"id": "webapp:app-1",
|
||||
"type": "webapp",
|
||||
"app_id": "app-1",
|
||||
"app_name": "Iris",
|
||||
"app_icon_type": "emoji",
|
||||
"app_icon": "robot",
|
||||
"app_icon_background": "#fff",
|
||||
"workflow_id": None,
|
||||
"workflow_version": None,
|
||||
"node_id": None,
|
||||
}
|
||||
assert workflow_source["id"] == "workflow:app-1:workflow-1:v1:node-1"
|
||||
assert workflow_source["type"] == "workflow"
|
||||
assert workflow_source["workflow_id"] == "workflow-1"
|
||||
|
||||
|
||||
def test_serialize_log_message_returns_frontend_log_shape() -> None:
|
||||
created_at = datetime(2026, 6, 17, 1, 2, 3, tzinfo=UTC)
|
||||
updated_at = datetime(2026, 6, 17, 1, 3, 3, tzinfo=UTC)
|
||||
|
||||
@ -29,6 +29,11 @@ import {
|
||||
zGetAgentByAgentIdDriveFilesPreviewResponse,
|
||||
zGetAgentByAgentIdDriveFilesQuery,
|
||||
zGetAgentByAgentIdDriveFilesResponse,
|
||||
zGetAgentByAgentIdLogsByConversationIdMessagesPath,
|
||||
zGetAgentByAgentIdLogsByConversationIdMessagesQuery,
|
||||
zGetAgentByAgentIdLogsByConversationIdMessagesResponse,
|
||||
zGetAgentByAgentIdLogSourcesPath,
|
||||
zGetAgentByAgentIdLogSourcesResponse,
|
||||
zGetAgentByAgentIdLogsPath,
|
||||
zGetAgentByAgentIdLogsQuery,
|
||||
zGetAgentByAgentIdLogsResponse,
|
||||
@ -417,6 +422,45 @@ export const files2 = {
|
||||
}
|
||||
|
||||
export const get9 = oc
|
||||
.route({
|
||||
inputStructure: 'detailed',
|
||||
method: 'GET',
|
||||
operationId: 'getAgentByAgentIdLogSources',
|
||||
path: '/agent/{agent_id}/log-sources',
|
||||
tags: ['console'],
|
||||
})
|
||||
.input(z.object({ params: zGetAgentByAgentIdLogSourcesPath }))
|
||||
.output(zGetAgentByAgentIdLogSourcesResponse)
|
||||
|
||||
export const logSources = {
|
||||
get: get9,
|
||||
}
|
||||
|
||||
export const get10 = oc
|
||||
.route({
|
||||
inputStructure: 'detailed',
|
||||
method: 'GET',
|
||||
operationId: 'getAgentByAgentIdLogsByConversationIdMessages',
|
||||
path: '/agent/{agent_id}/logs/{conversation_id}/messages',
|
||||
tags: ['console'],
|
||||
})
|
||||
.input(
|
||||
z.object({
|
||||
params: zGetAgentByAgentIdLogsByConversationIdMessagesPath,
|
||||
query: zGetAgentByAgentIdLogsByConversationIdMessagesQuery.optional(),
|
||||
}),
|
||||
)
|
||||
.output(zGetAgentByAgentIdLogsByConversationIdMessagesResponse)
|
||||
|
||||
export const messages = {
|
||||
get: get10,
|
||||
}
|
||||
|
||||
export const byConversationId = {
|
||||
messages,
|
||||
}
|
||||
|
||||
export const get11 = oc
|
||||
.route({
|
||||
inputStructure: 'detailed',
|
||||
method: 'GET',
|
||||
@ -430,13 +474,14 @@ export const get9 = oc
|
||||
.output(zGetAgentByAgentIdLogsResponse)
|
||||
|
||||
export const logs = {
|
||||
get: get9,
|
||||
get: get11,
|
||||
byConversationId,
|
||||
}
|
||||
|
||||
/**
|
||||
* Get Agent App message details by ID
|
||||
*/
|
||||
export const get10 = oc
|
||||
export const get12 = oc
|
||||
.route({
|
||||
description: 'Get Agent App message details by ID',
|
||||
inputStructure: 'detailed',
|
||||
@ -449,17 +494,17 @@ export const get10 = oc
|
||||
.output(zGetAgentByAgentIdMessagesByMessageIdResponse)
|
||||
|
||||
export const byMessageId2 = {
|
||||
get: get10,
|
||||
get: get12,
|
||||
}
|
||||
|
||||
export const messages = {
|
||||
export const messages2 = {
|
||||
byMessageId: byMessageId2,
|
||||
}
|
||||
|
||||
/**
|
||||
* List workflow apps that reference this Agent App's bound Agent (read-only)
|
||||
*/
|
||||
export const get11 = oc
|
||||
export const get13 = oc
|
||||
.route({
|
||||
description: 'List workflow apps that reference this Agent App\'s bound Agent (read-only)',
|
||||
inputStructure: 'detailed',
|
||||
@ -472,13 +517,13 @@ export const get11 = oc
|
||||
.output(zGetAgentByAgentIdReferencingWorkflowsResponse)
|
||||
|
||||
export const referencingWorkflows = {
|
||||
get: get11,
|
||||
get: get13,
|
||||
}
|
||||
|
||||
/**
|
||||
* Read a text/binary preview file in an Agent App conversation sandbox
|
||||
*/
|
||||
export const get12 = oc
|
||||
export const get14 = oc
|
||||
.route({
|
||||
description: 'Read a text/binary preview file in an Agent App conversation sandbox',
|
||||
inputStructure: 'detailed',
|
||||
@ -496,7 +541,7 @@ export const get12 = oc
|
||||
.output(zGetAgentByAgentIdSandboxFilesReadResponse)
|
||||
|
||||
export const read = {
|
||||
get: get12,
|
||||
get: get14,
|
||||
}
|
||||
|
||||
/**
|
||||
@ -526,7 +571,7 @@ export const upload = {
|
||||
/**
|
||||
* List a directory in an Agent App conversation sandbox
|
||||
*/
|
||||
export const get13 = oc
|
||||
export const get15 = oc
|
||||
.route({
|
||||
description: 'List a directory in an Agent App conversation sandbox',
|
||||
inputStructure: 'detailed',
|
||||
@ -544,7 +589,7 @@ export const get13 = oc
|
||||
.output(zGetAgentByAgentIdSandboxFilesResponse)
|
||||
|
||||
export const files3 = {
|
||||
get: get13,
|
||||
get: get15,
|
||||
read,
|
||||
upload,
|
||||
}
|
||||
@ -638,7 +683,7 @@ export const skills = {
|
||||
bySlug,
|
||||
}
|
||||
|
||||
export const get14 = oc
|
||||
export const get16 = oc
|
||||
.route({
|
||||
inputStructure: 'detailed',
|
||||
method: 'GET',
|
||||
@ -655,14 +700,14 @@ export const get14 = oc
|
||||
.output(zGetAgentByAgentIdStatisticsSummaryResponse)
|
||||
|
||||
export const summary = {
|
||||
get: get14,
|
||||
get: get16,
|
||||
}
|
||||
|
||||
export const statistics = {
|
||||
summary,
|
||||
}
|
||||
|
||||
export const get15 = oc
|
||||
export const get17 = oc
|
||||
.route({
|
||||
inputStructure: 'detailed',
|
||||
method: 'GET',
|
||||
@ -674,10 +719,10 @@ export const get15 = oc
|
||||
.output(zGetAgentByAgentIdVersionsByVersionIdResponse)
|
||||
|
||||
export const byVersionId = {
|
||||
get: get15,
|
||||
get: get17,
|
||||
}
|
||||
|
||||
export const get16 = oc
|
||||
export const get18 = oc
|
||||
.route({
|
||||
inputStructure: 'detailed',
|
||||
method: 'GET',
|
||||
@ -689,7 +734,7 @@ export const get16 = oc
|
||||
.output(zGetAgentByAgentIdVersionsResponse)
|
||||
|
||||
export const versions = {
|
||||
get: get16,
|
||||
get: get18,
|
||||
byVersionId,
|
||||
}
|
||||
|
||||
@ -705,7 +750,7 @@ export const delete3 = oc
|
||||
.input(z.object({ params: zDeleteAgentByAgentIdPath }))
|
||||
.output(zDeleteAgentByAgentIdResponse)
|
||||
|
||||
export const get17 = oc
|
||||
export const get19 = oc
|
||||
.route({
|
||||
inputStructure: 'detailed',
|
||||
method: 'GET',
|
||||
@ -729,7 +774,7 @@ export const put2 = oc
|
||||
|
||||
export const byAgentId = {
|
||||
delete: delete3,
|
||||
get: get17,
|
||||
get: get19,
|
||||
put: put2,
|
||||
chatMessages,
|
||||
composer,
|
||||
@ -738,8 +783,9 @@ export const byAgentId = {
|
||||
features,
|
||||
feedbacks,
|
||||
files: files2,
|
||||
logSources,
|
||||
logs,
|
||||
messages,
|
||||
messages: messages2,
|
||||
referencingWorkflows,
|
||||
sandbox,
|
||||
skills,
|
||||
@ -747,7 +793,7 @@ export const byAgentId = {
|
||||
versions,
|
||||
}
|
||||
|
||||
export const get18 = oc
|
||||
export const get20 = oc
|
||||
.route({
|
||||
inputStructure: 'detailed',
|
||||
method: 'GET',
|
||||
@ -771,7 +817,7 @@ export const post11 = oc
|
||||
.output(zPostAgentResponse)
|
||||
|
||||
export const agent = {
|
||||
get: get18,
|
||||
get: get20,
|
||||
post: post11,
|
||||
inviteOptions,
|
||||
byAgentId,
|
||||
|
||||
@ -177,8 +177,21 @@ export type AgentDriveFileCommitResponse = {
|
||||
file: AgentDriveFileResponse
|
||||
}
|
||||
|
||||
export type AgentLogSourceListResponse = {
|
||||
data: Array<AgentLogSourceResponse>
|
||||
groups: Array<AgentLogSourceGroupResponse>
|
||||
}
|
||||
|
||||
export type AgentLogListResponse = {
|
||||
data: Array<AgentLogItemResponse>
|
||||
data: Array<AgentLogConversationItemResponse>
|
||||
has_more: boolean
|
||||
limit: number
|
||||
page: number
|
||||
total: number
|
||||
}
|
||||
|
||||
export type AgentLogMessageListResponse = {
|
||||
data: Array<AgentLogMessageItemResponse>
|
||||
has_more: boolean
|
||||
limit: number
|
||||
page: number
|
||||
@ -554,23 +567,54 @@ export type AgentDriveFileResponse = {
|
||||
size?: number | null
|
||||
}
|
||||
|
||||
export type AgentLogItemResponse = {
|
||||
export type AgentLogSourceResponse = {
|
||||
app_icon?: string | null
|
||||
app_icon_background?: string | null
|
||||
app_icon_type?: string | null
|
||||
app_id: string
|
||||
app_name: string
|
||||
id: string
|
||||
node_id?: string | null
|
||||
type: 'webapp' | 'workflow'
|
||||
workflow_id?: string | null
|
||||
workflow_version?: string | null
|
||||
}
|
||||
|
||||
export type AgentLogSourceGroupResponse = {
|
||||
label: string
|
||||
sources?: Array<AgentLogSourceResponse>
|
||||
type: 'webapp' | 'workflow'
|
||||
}
|
||||
|
||||
export type AgentLogConversationItemResponse = {
|
||||
conversation_id: string
|
||||
created_at?: number | null
|
||||
end_user_id?: string | null
|
||||
id: string
|
||||
message_count: number
|
||||
operation_rate?: number | null
|
||||
source?: AgentLogSourceResponse | null
|
||||
status: 'failed' | 'paused' | 'success'
|
||||
title?: string | null
|
||||
unread: boolean
|
||||
updated_at?: number | null
|
||||
user_rate?: number | null
|
||||
}
|
||||
|
||||
export type AgentLogMessageItemResponse = {
|
||||
answer: string
|
||||
answer_tokens: number
|
||||
conversation_id: string
|
||||
conversation_name?: string | null
|
||||
created_at?: number | null
|
||||
currency: string
|
||||
error?: string | null
|
||||
from_account_id?: string | null
|
||||
from_end_user_id?: string | null
|
||||
from_source?: string | null
|
||||
id: string
|
||||
latency: number
|
||||
message_id: string
|
||||
message_tokens: number
|
||||
query: string
|
||||
source?: string | null
|
||||
status: string
|
||||
total_price: string
|
||||
total_tokens: number
|
||||
@ -1861,6 +1905,22 @@ export type PostAgentByAgentIdFilesResponses = {
|
||||
export type PostAgentByAgentIdFilesResponse
|
||||
= PostAgentByAgentIdFilesResponses[keyof PostAgentByAgentIdFilesResponses]
|
||||
|
||||
export type GetAgentByAgentIdLogSourcesData = {
|
||||
body?: never
|
||||
path: {
|
||||
agent_id: string
|
||||
}
|
||||
query?: never
|
||||
url: '/agent/{agent_id}/log-sources'
|
||||
}
|
||||
|
||||
export type GetAgentByAgentIdLogSourcesResponses = {
|
||||
200: AgentLogSourceListResponse
|
||||
}
|
||||
|
||||
export type GetAgentByAgentIdLogSourcesResponse
|
||||
= GetAgentByAgentIdLogSourcesResponses[keyof GetAgentByAgentIdLogSourcesResponses]
|
||||
|
||||
export type GetAgentByAgentIdLogsData = {
|
||||
body?: never
|
||||
path: {
|
||||
@ -1885,6 +1945,31 @@ export type GetAgentByAgentIdLogsResponses = {
|
||||
export type GetAgentByAgentIdLogsResponse
|
||||
= GetAgentByAgentIdLogsResponses[keyof GetAgentByAgentIdLogsResponses]
|
||||
|
||||
export type GetAgentByAgentIdLogsByConversationIdMessagesData = {
|
||||
body?: never
|
||||
path: {
|
||||
agent_id: string
|
||||
conversation_id: string
|
||||
}
|
||||
query?: {
|
||||
end?: string
|
||||
keyword?: string
|
||||
limit?: number
|
||||
page?: number
|
||||
source?: string
|
||||
start?: string
|
||||
status?: string
|
||||
}
|
||||
url: '/agent/{agent_id}/logs/{conversation_id}/messages'
|
||||
}
|
||||
|
||||
export type GetAgentByAgentIdLogsByConversationIdMessagesResponses = {
|
||||
200: AgentLogMessageListResponse
|
||||
}
|
||||
|
||||
export type GetAgentByAgentIdLogsByConversationIdMessagesResponse
|
||||
= GetAgentByAgentIdLogsByConversationIdMessagesResponses[keyof GetAgentByAgentIdLogsByConversationIdMessagesResponses]
|
||||
|
||||
export type GetAgentByAgentIdMessagesByMessageIdData = {
|
||||
body?: never
|
||||
path: {
|
||||
|
||||
@ -333,25 +333,84 @@ export const zAgentDriveFileCommitResponse = z.object({
|
||||
})
|
||||
|
||||
/**
|
||||
* AgentLogItemResponse
|
||||
* AgentLogSourceResponse
|
||||
*/
|
||||
export const zAgentLogItemResponse = z.object({
|
||||
export const zAgentLogSourceResponse = z.object({
|
||||
app_icon: z.string().nullish(),
|
||||
app_icon_background: z.string().nullish(),
|
||||
app_icon_type: z.string().nullish(),
|
||||
app_id: z.string(),
|
||||
app_name: z.string(),
|
||||
id: z.string(),
|
||||
node_id: z.string().nullish(),
|
||||
type: z.enum(['webapp', 'workflow']),
|
||||
workflow_id: z.string().nullish(),
|
||||
workflow_version: z.string().nullish(),
|
||||
})
|
||||
|
||||
/**
|
||||
* AgentLogSourceGroupResponse
|
||||
*/
|
||||
export const zAgentLogSourceGroupResponse = z.object({
|
||||
label: z.string(),
|
||||
sources: z.array(zAgentLogSourceResponse).optional(),
|
||||
type: z.enum(['webapp', 'workflow']),
|
||||
})
|
||||
|
||||
/**
|
||||
* AgentLogSourceListResponse
|
||||
*/
|
||||
export const zAgentLogSourceListResponse = z.object({
|
||||
data: z.array(zAgentLogSourceResponse),
|
||||
groups: z.array(zAgentLogSourceGroupResponse),
|
||||
})
|
||||
|
||||
/**
|
||||
* AgentLogConversationItemResponse
|
||||
*/
|
||||
export const zAgentLogConversationItemResponse = z.object({
|
||||
conversation_id: z.string(),
|
||||
created_at: z.int().nullish(),
|
||||
end_user_id: z.string().nullish(),
|
||||
id: z.string(),
|
||||
message_count: z.int(),
|
||||
operation_rate: z.number().nullish(),
|
||||
source: zAgentLogSourceResponse.nullish(),
|
||||
status: z.enum(['failed', 'paused', 'success']),
|
||||
title: z.string().nullish(),
|
||||
unread: z.boolean(),
|
||||
updated_at: z.int().nullish(),
|
||||
user_rate: z.number().nullish(),
|
||||
})
|
||||
|
||||
/**
|
||||
* AgentLogListResponse
|
||||
*/
|
||||
export const zAgentLogListResponse = z.object({
|
||||
data: z.array(zAgentLogConversationItemResponse),
|
||||
has_more: z.boolean(),
|
||||
limit: z.int(),
|
||||
page: z.int(),
|
||||
total: z.int(),
|
||||
})
|
||||
|
||||
/**
|
||||
* AgentLogMessageItemResponse
|
||||
*/
|
||||
export const zAgentLogMessageItemResponse = z.object({
|
||||
answer: z.string(),
|
||||
answer_tokens: z.int(),
|
||||
conversation_id: z.string(),
|
||||
conversation_name: z.string().nullish(),
|
||||
created_at: z.int().nullish(),
|
||||
currency: z.string(),
|
||||
error: z.string().nullish(),
|
||||
from_account_id: z.string().nullish(),
|
||||
from_end_user_id: z.string().nullish(),
|
||||
from_source: z.string().nullish(),
|
||||
id: z.string(),
|
||||
latency: z.number(),
|
||||
message_id: z.string(),
|
||||
message_tokens: z.int(),
|
||||
query: z.string(),
|
||||
source: z.string().nullish(),
|
||||
status: z.string(),
|
||||
total_price: z.string(),
|
||||
total_tokens: z.int(),
|
||||
@ -359,10 +418,10 @@ export const zAgentLogItemResponse = z.object({
|
||||
})
|
||||
|
||||
/**
|
||||
* AgentLogListResponse
|
||||
* AgentLogMessageListResponse
|
||||
*/
|
||||
export const zAgentLogListResponse = z.object({
|
||||
data: z.array(zAgentLogItemResponse),
|
||||
export const zAgentLogMessageListResponse = z.object({
|
||||
data: z.array(zAgentLogMessageItemResponse),
|
||||
has_more: z.boolean(),
|
||||
limit: z.int(),
|
||||
page: z.int(),
|
||||
@ -2287,6 +2346,15 @@ export const zPostAgentByAgentIdFilesPath = z.object({
|
||||
*/
|
||||
export const zPostAgentByAgentIdFilesResponse = zAgentDriveFileCommitResponse
|
||||
|
||||
export const zGetAgentByAgentIdLogSourcesPath = z.object({
|
||||
agent_id: z.string(),
|
||||
})
|
||||
|
||||
/**
|
||||
* Agent log sources
|
||||
*/
|
||||
export const zGetAgentByAgentIdLogSourcesResponse = zAgentLogSourceListResponse
|
||||
|
||||
export const zGetAgentByAgentIdLogsPath = z.object({
|
||||
agent_id: z.uuid(),
|
||||
})
|
||||
@ -2306,6 +2374,26 @@ export const zGetAgentByAgentIdLogsQuery = z.object({
|
||||
*/
|
||||
export const zGetAgentByAgentIdLogsResponse = zAgentLogListResponse
|
||||
|
||||
export const zGetAgentByAgentIdLogsByConversationIdMessagesPath = z.object({
|
||||
agent_id: z.string(),
|
||||
conversation_id: z.string(),
|
||||
})
|
||||
|
||||
export const zGetAgentByAgentIdLogsByConversationIdMessagesQuery = z.object({
|
||||
end: z.string().optional(),
|
||||
keyword: z.string().optional(),
|
||||
limit: z.int().gte(1).lte(100).optional().default(20),
|
||||
page: z.int().gte(1).optional().default(1),
|
||||
source: z.string().optional(),
|
||||
start: z.string().optional(),
|
||||
status: z.string().optional(),
|
||||
})
|
||||
|
||||
/**
|
||||
* Agent log messages
|
||||
*/
|
||||
export const zGetAgentByAgentIdLogsByConversationIdMessagesResponse = zAgentLogMessageListResponse
|
||||
|
||||
export const zGetAgentByAgentIdMessagesByMessageIdPath = z.object({
|
||||
agent_id: z.uuid(),
|
||||
message_id: z.uuid(),
|
||||
|
||||
Loading…
Reference in New Issue
Block a user