From 25df902ec4ca0440028070478d961617d6438221 Mon Sep 17 00:00:00 2001 From: NVIDIAN Date: Wed, 15 Apr 2026 21:39:59 -0700 Subject: [PATCH] refactor(api): add BaseModel workflow field schemas (#35297) Co-authored-by: ai-hpc --- api/fields/workflow_app_log_fields.py | 67 +++++++++- api/fields/workflow_run_fields.py | 184 +++++++++++++++++++++++++- 2 files changed, 245 insertions(+), 6 deletions(-) diff --git a/api/fields/workflow_app_log_fields.py b/api/fields/workflow_app_log_fields.py index d0e762f62b..1b2c71255d 100644 --- a/api/fields/workflow_app_log_fields.py +++ b/api/fields/workflow_app_log_fields.py @@ -1,8 +1,17 @@ -from flask_restx import Namespace, fields +from __future__ import annotations -from fields.end_user_fields import simple_end_user_fields -from fields.member_fields import simple_account_fields +from datetime import datetime +from typing import Any + +from flask_restx import Namespace, fields +from pydantic import field_validator + +from fields.base import ResponseModel +from fields.end_user_fields import SimpleEndUser, simple_end_user_fields +from fields.member_fields import SimpleAccount, simple_account_fields from fields.workflow_run_fields import ( + WorkflowRunForArchivedLogResponse, + WorkflowRunForLogResponse, build_workflow_run_for_archived_log_model, build_workflow_run_for_log_model, workflow_run_for_archived_log_fields, @@ -85,3 +94,55 @@ def build_workflow_archived_log_pagination_model(api_or_ns: Namespace): copied_fields = workflow_archived_log_pagination_fields.copy() copied_fields["data"] = fields.List(fields.Nested(workflow_archived_log_partial_model)) return api_or_ns.model("WorkflowArchivedLogPagination", copied_fields) + + +def _to_timestamp(value: datetime | int | None) -> int | None: + if isinstance(value, datetime): + return int(value.timestamp()) + return value + + +class WorkflowAppLogPartialResponse(ResponseModel): + id: str + workflow_run: WorkflowRunForLogResponse | None = None + details: Any = None + created_from: str | None = None + created_by_role: str | None = None + created_by_account: SimpleAccount | None = None + created_by_end_user: SimpleEndUser | None = None + created_at: int | None = None + + @field_validator("created_at", mode="before") + @classmethod + def _normalize_timestamp(cls, value: datetime | int | None) -> int | None: + return _to_timestamp(value) + + +class WorkflowArchivedLogPartialResponse(ResponseModel): + id: str + workflow_run: WorkflowRunForArchivedLogResponse | None = None + trigger_metadata: Any = None + created_by_account: SimpleAccount | None = None + created_by_end_user: SimpleEndUser | None = None + created_at: int | None = None + + @field_validator("created_at", mode="before") + @classmethod + def _normalize_timestamp(cls, value: datetime | int | None) -> int | None: + return _to_timestamp(value) + + +class WorkflowAppLogPaginationResponse(ResponseModel): + page: int + limit: int + total: int + has_more: bool + data: list[WorkflowAppLogPartialResponse] + + +class WorkflowArchivedLogPaginationResponse(ResponseModel): + page: int + limit: int + total: int + has_more: bool + data: list[WorkflowArchivedLogPartialResponse] diff --git a/api/fields/workflow_run_fields.py b/api/fields/workflow_run_fields.py index 35bb442c59..8c659086ed 100644 --- a/api/fields/workflow_run_fields.py +++ b/api/fields/workflow_run_fields.py @@ -1,7 +1,14 @@ -from flask_restx import Namespace, fields +from __future__ import annotations -from fields.end_user_fields import simple_end_user_fields -from fields.member_fields import simple_account_fields +from datetime import datetime +from typing import Any + +from flask_restx import Namespace, fields +from pydantic import Field, field_validator + +from fields.base import ResponseModel +from fields.end_user_fields import SimpleEndUser, simple_end_user_fields +from fields.member_fields import SimpleAccount, simple_account_fields from libs.helper import TimestampField workflow_run_for_log_fields = { @@ -147,3 +154,174 @@ workflow_run_node_execution_fields = { workflow_run_node_execution_list_fields = { "data": fields.List(fields.Nested(workflow_run_node_execution_fields)), } + + +def _to_timestamp(value: datetime | int | None) -> int | None: + if isinstance(value, datetime): + return int(value.timestamp()) + return value + + +class WorkflowRunForLogResponse(ResponseModel): + id: str + version: str | None = None + status: str | None = None + triggered_from: str | None = None + error: str | None = None + elapsed_time: float | None = None + total_tokens: int | None = None + total_steps: int | None = None + created_at: int | None = None + finished_at: int | None = None + exceptions_count: int | None = None + + @field_validator("status", mode="before") + @classmethod + def _normalize_status(cls, value: Any) -> str | None: + if value is None or isinstance(value, str): + return value + return str(getattr(value, "value", value)) + + @field_validator("created_at", "finished_at", mode="before") + @classmethod + def _normalize_timestamp(cls, value: datetime | int | None) -> int | None: + return _to_timestamp(value) + + +class WorkflowRunForArchivedLogResponse(ResponseModel): + id: str + status: str | None = None + triggered_from: str | None = None + elapsed_time: float | None = None + total_tokens: int | None = None + + @field_validator("status", mode="before") + @classmethod + def _normalize_status(cls, value: Any) -> str | None: + if value is None or isinstance(value, str): + return value + return str(getattr(value, "value", value)) + + +class WorkflowRunForListResponse(ResponseModel): + id: str + version: str | None = None + status: str | None = None + elapsed_time: float | None = None + total_tokens: int | None = None + total_steps: int | None = None + created_by_account: SimpleAccount | None = None + created_at: int | None = None + finished_at: int | None = None + exceptions_count: int | None = None + retry_index: int | None = None + + @field_validator("status", mode="before") + @classmethod + def _normalize_status(cls, value: Any) -> str | None: + if value is None or isinstance(value, str): + return value + return str(getattr(value, "value", value)) + + @field_validator("created_at", "finished_at", mode="before") + @classmethod + def _normalize_timestamp(cls, value: datetime | int | None) -> int | None: + return _to_timestamp(value) + + +class AdvancedChatWorkflowRunForListResponse(WorkflowRunForListResponse): + conversation_id: str | None = None + message_id: str | None = None + + +class AdvancedChatWorkflowRunPaginationResponse(ResponseModel): + limit: int + has_more: bool + data: list[AdvancedChatWorkflowRunForListResponse] + + +class WorkflowRunPaginationResponse(ResponseModel): + limit: int + has_more: bool + data: list[WorkflowRunForListResponse] + + +class WorkflowRunCountResponse(ResponseModel): + total: int + running: int + succeeded: int + failed: int + stopped: int + partial_succeeded: int = Field(validation_alias="partial-succeeded") + + +class WorkflowRunDetailResponse(ResponseModel): + id: str + version: str | None = None + graph: Any = Field(validation_alias="graph_dict") + inputs: Any = Field(validation_alias="inputs_dict") + status: str | None = None + outputs: Any = Field(validation_alias="outputs_dict") + error: str | None = None + elapsed_time: float | None = None + total_tokens: int | None = None + total_steps: int | None = None + created_by_role: str | None = None + created_by_account: SimpleAccount | None = None + created_by_end_user: SimpleEndUser | None = None + created_at: int | None = None + finished_at: int | None = None + exceptions_count: int | None = None + + @field_validator("status", mode="before") + @classmethod + def _normalize_status(cls, value: Any) -> str | None: + if value is None or isinstance(value, str): + return value + return str(getattr(value, "value", value)) + + @field_validator("created_at", "finished_at", mode="before") + @classmethod + def _normalize_timestamp(cls, value: datetime | int | None) -> int | None: + return _to_timestamp(value) + + +class WorkflowRunNodeExecutionResponse(ResponseModel): + id: str + index: int | None = None + predecessor_node_id: str | None = None + node_id: str | None = None + node_type: str | None = None + title: str | None = None + inputs: Any = Field(default=None, validation_alias="inputs_dict") + process_data: Any = Field(default=None, validation_alias="process_data_dict") + outputs: Any = Field(default=None, validation_alias="outputs_dict") + status: str | None = None + error: str | None = None + elapsed_time: float | None = None + execution_metadata: Any = Field(default=None, validation_alias="execution_metadata_dict") + extras: Any = None + created_at: int | None = None + created_by_role: str | None = None + created_by_account: SimpleAccount | None = None + created_by_end_user: SimpleEndUser | None = None + finished_at: int | None = None + inputs_truncated: bool | None = None + outputs_truncated: bool | None = None + process_data_truncated: bool | None = None + + @field_validator("status", mode="before") + @classmethod + def _normalize_status(cls, value: Any) -> str | None: + if value is None or isinstance(value, str): + return value + return str(getattr(value, "value", value)) + + @field_validator("created_at", "finished_at", mode="before") + @classmethod + def _normalize_timestamp(cls, value: datetime | int | None) -> int | None: + return _to_timestamp(value) + + +class WorkflowRunNodeExecutionListResponse(ResponseModel): + data: list[WorkflowRunNodeExecutionResponse]