refactor: Decouple Domain Models from Direct Database Access (#27316)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
heyszt 2025-10-28 09:59:30 +08:00 committed by GitHub
parent 341b3ae7c9
commit 543c5236e7
7 changed files with 595 additions and 264 deletions

View File

@@ -1,10 +1,9 @@
from datetime import datetime
from decimal import Decimal
import pytz
import sqlalchemy as sa
from flask import jsonify
from flask_restx import Resource, reqparse
from sqlalchemy.orm import sessionmaker
from controllers.console import api, console_ns
from controllers.console.app.wraps import get_app_model
@@ -14,10 +13,16 @@ from libs.helper import DatetimeString
from libs.login import current_account_with_tenant, login_required
from models.enums import WorkflowRunTriggeredFrom
from models.model import AppMode
from repositories.factory import DifyAPIRepositoryFactory
@console_ns.route("/apps/<uuid:app_id>/workflow/statistics/daily-conversations")
class WorkflowDailyRunsStatistic(Resource):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
session_maker = sessionmaker(bind=db.engine, expire_on_commit=False)
self._workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
@api.doc("get_workflow_daily_runs_statistic")
@api.doc(description="Get workflow daily runs statistics")
@api.doc(params={"app_id": "Application ID"})
@@ -37,57 +42,44 @@ class WorkflowDailyRunsStatistic(Resource):
)
args = parser.parse_args()
sql_query = """SELECT
DATE(DATE_TRUNC('day', created_at AT TIME ZONE 'UTC' AT TIME ZONE :tz )) AS date,
COUNT(id) AS runs
FROM
workflow_runs
WHERE
app_id = :app_id
AND triggered_from = :triggered_from"""
arg_dict = {
"tz": account.timezone,
"app_id": app_model.id,
"triggered_from": WorkflowRunTriggeredFrom.APP_RUN,
}
assert account.timezone is not None
timezone = pytz.timezone(account.timezone)
utc_timezone = pytz.utc
start_date = None
end_date = None
if args["start"]:
start_datetime = datetime.strptime(args["start"], "%Y-%m-%d %H:%M")
start_datetime = start_datetime.replace(second=0)
start_datetime_timezone = timezone.localize(start_datetime)
start_datetime_utc = start_datetime_timezone.astimezone(utc_timezone)
sql_query += " AND created_at >= :start"
arg_dict["start"] = start_datetime_utc
start_date = start_datetime_timezone.astimezone(utc_timezone)
if args["end"]:
end_datetime = datetime.strptime(args["end"], "%Y-%m-%d %H:%M")
end_datetime = end_datetime.replace(second=0)
end_datetime_timezone = timezone.localize(end_datetime)
end_datetime_utc = end_datetime_timezone.astimezone(utc_timezone)
end_date = end_datetime_timezone.astimezone(utc_timezone)
sql_query += " AND created_at < :end"
arg_dict["end"] = end_datetime_utc
sql_query += " GROUP BY date ORDER BY date"
response_data = []
with db.engine.begin() as conn:
rs = conn.execute(sa.text(sql_query), arg_dict)
for i in rs:
response_data.append({"date": str(i.date), "runs": i.runs})
response_data = self._workflow_run_repo.get_daily_runs_statistics(
tenant_id=app_model.tenant_id,
app_id=app_model.id,
triggered_from=WorkflowRunTriggeredFrom.APP_RUN,
start_date=start_date,
end_date=end_date,
timezone=account.timezone,
)
return jsonify({"data": response_data})
@console_ns.route("/apps/<uuid:app_id>/workflow/statistics/daily-terminals")
class WorkflowDailyTerminalsStatistic(Resource):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
session_maker = sessionmaker(bind=db.engine, expire_on_commit=False)
self._workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
@api.doc("get_workflow_daily_terminals_statistic")
@api.doc(description="Get workflow daily terminals statistics")
@api.doc(params={"app_id": "Application ID"})
@@ -107,57 +99,44 @@ class WorkflowDailyTerminalsStatistic(Resource):
)
args = parser.parse_args()
sql_query = """SELECT
DATE(DATE_TRUNC('day', created_at AT TIME ZONE 'UTC' AT TIME ZONE :tz )) AS date,
COUNT(DISTINCT workflow_runs.created_by) AS terminal_count
FROM
workflow_runs
WHERE
app_id = :app_id
AND triggered_from = :triggered_from"""
arg_dict = {
"tz": account.timezone,
"app_id": app_model.id,
"triggered_from": WorkflowRunTriggeredFrom.APP_RUN,
}
assert account.timezone is not None
timezone = pytz.timezone(account.timezone)
utc_timezone = pytz.utc
start_date = None
end_date = None
if args["start"]:
start_datetime = datetime.strptime(args["start"], "%Y-%m-%d %H:%M")
start_datetime = start_datetime.replace(second=0)
start_datetime_timezone = timezone.localize(start_datetime)
start_datetime_utc = start_datetime_timezone.astimezone(utc_timezone)
sql_query += " AND created_at >= :start"
arg_dict["start"] = start_datetime_utc
start_date = start_datetime_timezone.astimezone(utc_timezone)
if args["end"]:
end_datetime = datetime.strptime(args["end"], "%Y-%m-%d %H:%M")
end_datetime = end_datetime.replace(second=0)
end_datetime_timezone = timezone.localize(end_datetime)
end_datetime_utc = end_datetime_timezone.astimezone(utc_timezone)
end_date = end_datetime_timezone.astimezone(utc_timezone)
sql_query += " AND created_at < :end"
arg_dict["end"] = end_datetime_utc
sql_query += " GROUP BY date ORDER BY date"
response_data = []
with db.engine.begin() as conn:
rs = conn.execute(sa.text(sql_query), arg_dict)
for i in rs:
response_data.append({"date": str(i.date), "terminal_count": i.terminal_count})
response_data = self._workflow_run_repo.get_daily_terminals_statistics(
tenant_id=app_model.tenant_id,
app_id=app_model.id,
triggered_from=WorkflowRunTriggeredFrom.APP_RUN,
start_date=start_date,
end_date=end_date,
timezone=account.timezone,
)
return jsonify({"data": response_data})
@console_ns.route("/apps/<uuid:app_id>/workflow/statistics/token-costs")
class WorkflowDailyTokenCostStatistic(Resource):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
session_maker = sessionmaker(bind=db.engine, expire_on_commit=False)
self._workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
@api.doc("get_workflow_daily_token_cost_statistic")
@api.doc(description="Get workflow daily token cost statistics")
@api.doc(params={"app_id": "Application ID"})
@@ -177,62 +156,44 @@ class WorkflowDailyTokenCostStatistic(Resource):
)
args = parser.parse_args()
sql_query = """SELECT
DATE(DATE_TRUNC('day', created_at AT TIME ZONE 'UTC' AT TIME ZONE :tz )) AS date,
SUM(workflow_runs.total_tokens) AS token_count
FROM
workflow_runs
WHERE
app_id = :app_id
AND triggered_from = :triggered_from"""
arg_dict = {
"tz": account.timezone,
"app_id": app_model.id,
"triggered_from": WorkflowRunTriggeredFrom.APP_RUN,
}
assert account.timezone is not None
timezone = pytz.timezone(account.timezone)
utc_timezone = pytz.utc
start_date = None
end_date = None
if args["start"]:
start_datetime = datetime.strptime(args["start"], "%Y-%m-%d %H:%M")
start_datetime = start_datetime.replace(second=0)
start_datetime_timezone = timezone.localize(start_datetime)
start_datetime_utc = start_datetime_timezone.astimezone(utc_timezone)
sql_query += " AND created_at >= :start"
arg_dict["start"] = start_datetime_utc
start_date = start_datetime_timezone.astimezone(utc_timezone)
if args["end"]:
end_datetime = datetime.strptime(args["end"], "%Y-%m-%d %H:%M")
end_datetime = end_datetime.replace(second=0)
end_datetime_timezone = timezone.localize(end_datetime)
end_datetime_utc = end_datetime_timezone.astimezone(utc_timezone)
end_date = end_datetime_timezone.astimezone(utc_timezone)
sql_query += " AND created_at < :end"
arg_dict["end"] = end_datetime_utc
sql_query += " GROUP BY date ORDER BY date"
response_data = []
with db.engine.begin() as conn:
rs = conn.execute(sa.text(sql_query), arg_dict)
for i in rs:
response_data.append(
{
"date": str(i.date),
"token_count": i.token_count,
}
)
response_data = self._workflow_run_repo.get_daily_token_cost_statistics(
tenant_id=app_model.tenant_id,
app_id=app_model.id,
triggered_from=WorkflowRunTriggeredFrom.APP_RUN,
start_date=start_date,
end_date=end_date,
timezone=account.timezone,
)
return jsonify({"data": response_data})
@console_ns.route("/apps/<uuid:app_id>/workflow/statistics/average-app-interactions")
class WorkflowAverageAppInteractionStatistic(Resource):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
session_maker = sessionmaker(bind=db.engine, expire_on_commit=False)
self._workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
@api.doc("get_workflow_average_app_interaction_statistic")
@api.doc(description="Get workflow average app interaction statistics")
@api.doc(params={"app_id": "Application ID"})
@@ -252,67 +213,32 @@ class WorkflowAverageAppInteractionStatistic(Resource):
)
args = parser.parse_args()
sql_query = """SELECT
AVG(sub.interactions) AS interactions,
sub.date
FROM
(
SELECT
DATE(DATE_TRUNC('day', c.created_at AT TIME ZONE 'UTC' AT TIME ZONE :tz )) AS date,
c.created_by,
COUNT(c.id) AS interactions
FROM
workflow_runs c
WHERE
c.app_id = :app_id
AND c.triggered_from = :triggered_from
{{start}}
{{end}}
GROUP BY
date, c.created_by
) sub
GROUP BY
sub.date"""
arg_dict = {
"tz": account.timezone,
"app_id": app_model.id,
"triggered_from": WorkflowRunTriggeredFrom.APP_RUN,
}
assert account.timezone is not None
timezone = pytz.timezone(account.timezone)
utc_timezone = pytz.utc
start_date = None
end_date = None
if args["start"]:
start_datetime = datetime.strptime(args["start"], "%Y-%m-%d %H:%M")
start_datetime = start_datetime.replace(second=0)
start_datetime_timezone = timezone.localize(start_datetime)
start_datetime_utc = start_datetime_timezone.astimezone(utc_timezone)
sql_query = sql_query.replace("{{start}}", " AND c.created_at >= :start")
arg_dict["start"] = start_datetime_utc
else:
sql_query = sql_query.replace("{{start}}", "")
start_date = start_datetime_timezone.astimezone(utc_timezone)
if args["end"]:
end_datetime = datetime.strptime(args["end"], "%Y-%m-%d %H:%M")
end_datetime = end_datetime.replace(second=0)
end_datetime_timezone = timezone.localize(end_datetime)
end_datetime_utc = end_datetime_timezone.astimezone(utc_timezone)
end_date = end_datetime_timezone.astimezone(utc_timezone)
sql_query = sql_query.replace("{{end}}", " AND c.created_at < :end")
arg_dict["end"] = end_datetime_utc
else:
sql_query = sql_query.replace("{{end}}", "")
response_data = []
with db.engine.begin() as conn:
rs = conn.execute(sa.text(sql_query), arg_dict)
for i in rs:
response_data.append(
{"date": str(i.date), "interactions": float(i.interactions.quantize(Decimal("0.01")))}
)
response_data = self._workflow_run_repo.get_average_app_interaction_statistics(
tenant_id=app_model.tenant_id,
app_id=app_model.id,
triggered_from=WorkflowRunTriggeredFrom.APP_RUN,
start_date=start_date,
end_date=end_date,
timezone=account.timezone,
)
return jsonify({"data": response_data})
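
The four handlers above all repeat the same time handling before calling the repository: parse the "YYYY-MM-DD HH:MM" argument in the account's timezone, zero the seconds, and convert to UTC. A minimal sketch of that conversion factored into a helper (the helper name is hypothetical, not part of this commit):

```
from datetime import datetime

import pytz


def local_string_to_utc(value: str, tz_name: str) -> datetime:
    """Parse a local 'YYYY-MM-DD HH:MM' string and return the UTC instant."""
    naive = datetime.strptime(value, "%Y-%m-%d %H:%M").replace(second=0)
    localized = pytz.timezone(tz_name).localize(naive)  # attach the tz (DST-aware)
    return localized.astimezone(pytz.utc)


# Usage mirroring the handlers above:
# start_date = local_string_to_utc(args["start"], account.timezone) if args["start"] else None
# end_date = local_string_to_utc(args["end"], account.timezone) if args["end"] else None
```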

View File

@@ -1,6 +1,7 @@
from collections.abc import Sequence
from sqlalchemy import select
from sqlalchemy.orm import sessionmaker
from core.app.app_config.features.file_upload.manager import FileUploadConfigManager
from core.file import file_manager
@@ -18,7 +19,9 @@ from core.prompt.utils.extract_thread_messages import extract_thread_messages
from extensions.ext_database import db
from factories import file_factory
from models.model import AppMode, Conversation, Message, MessageFile
from models.workflow import Workflow, WorkflowRun
from models.workflow import Workflow
from repositories.api_workflow_run_repository import APIWorkflowRunRepository
from repositories.factory import DifyAPIRepositoryFactory
class TokenBufferMemory:
@@ -29,6 +32,14 @@ class TokenBufferMemory:
):
self.conversation = conversation
self.model_instance = model_instance
self._workflow_run_repo: APIWorkflowRunRepository | None = None
@property
def workflow_run_repo(self) -> APIWorkflowRunRepository:
if self._workflow_run_repo is None:
session_maker = sessionmaker(bind=db.engine, expire_on_commit=False)
self._workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
return self._workflow_run_repo
def _build_prompt_message_with_files(
self,
@@ -50,7 +61,16 @@ class TokenBufferMemory:
if self.conversation.mode in {AppMode.AGENT_CHAT, AppMode.COMPLETION, AppMode.CHAT}:
file_extra_config = FileUploadConfigManager.convert(self.conversation.model_config)
elif self.conversation.mode in {AppMode.ADVANCED_CHAT, AppMode.WORKFLOW}:
workflow_run = db.session.scalar(select(WorkflowRun).where(WorkflowRun.id == message.workflow_run_id))
app = self.conversation.app
if not app:
raise ValueError("App not found for conversation")
if not message.workflow_run_id:
raise ValueError("Workflow run ID not found")
workflow_run = self.workflow_run_repo.get_workflow_run_by_id(
tenant_id=app.tenant_id, app_id=app.id, run_id=message.workflow_run_id
)
if not workflow_run:
raise ValueError(f"Workflow run not found: {message.workflow_run_id}")
workflow = db.session.scalar(select(Workflow).where(Workflow.id == workflow_run.workflow_id))
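
The `workflow_run_repo` property above builds the repository lazily, on first access, so constructing a TokenBufferMemory never touches `db.engine` unless workflow data is actually needed. The same idea can be written with `functools.cached_property`; a sketch of that equivalent alternative (not what this commit uses):

```
from functools import cached_property

from sqlalchemy.orm import sessionmaker

from extensions.ext_database import db
from repositories.api_workflow_run_repository import APIWorkflowRunRepository
from repositories.factory import DifyAPIRepositoryFactory


class LazyRepoMixin:
    @cached_property
    def workflow_run_repo(self) -> APIWorkflowRunRepository:
        # Evaluated once on first access, then cached on the instance.
        session_maker = sessionmaker(bind=db.engine, expire_on_commit=False)
        return DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
```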

View File

@@ -12,7 +12,7 @@ from uuid import UUID, uuid4
from cachetools import LRUCache
from flask import current_app
from sqlalchemy import select
from sqlalchemy.orm import Session
from sqlalchemy.orm import Session, sessionmaker
from core.helper.encrypter import decrypt_token, encrypt_token, obfuscated_token
from core.ops.entities.config_entity import (
@@ -34,7 +34,8 @@ from core.ops.utils import get_message_data
from extensions.ext_database import db
from extensions.ext_storage import storage
from models.model import App, AppModelConfig, Conversation, Message, MessageFile, TraceAppConfig
from models.workflow import WorkflowAppLog, WorkflowRun
from models.workflow import WorkflowAppLog
from repositories.factory import DifyAPIRepositoryFactory
from tasks.ops_trace_task import process_trace_tasks
if TYPE_CHECKING:
@@ -419,6 +420,18 @@ class OpsTraceManager:
class TraceTask:
_workflow_run_repo = None
_repo_lock = threading.Lock()
@classmethod
def _get_workflow_run_repo(cls):
if cls._workflow_run_repo is None:
with cls._repo_lock:
if cls._workflow_run_repo is None:
session_maker = sessionmaker(bind=db.engine, expire_on_commit=False)
cls._workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
return cls._workflow_run_repo
def __init__(
self,
trace_type: Any,
@@ -486,27 +499,27 @@
if not workflow_run_id:
return {}
workflow_run_repo = self._get_workflow_run_repo()
workflow_run = workflow_run_repo.get_workflow_run_by_id_without_tenant(run_id=workflow_run_id)
if not workflow_run:
raise ValueError("Workflow run not found")
workflow_id = workflow_run.workflow_id
tenant_id = workflow_run.tenant_id
workflow_run_id = workflow_run.id
workflow_run_elapsed_time = workflow_run.elapsed_time
workflow_run_status = workflow_run.status
workflow_run_inputs = workflow_run.inputs_dict
workflow_run_outputs = workflow_run.outputs_dict
workflow_run_version = workflow_run.version
error = workflow_run.error or ""
total_tokens = workflow_run.total_tokens
file_list = workflow_run_inputs.get("sys.file") or []
query = workflow_run_inputs.get("query") or workflow_run_inputs.get("sys.query") or ""
with Session(db.engine) as session:
workflow_run_stmt = select(WorkflowRun).where(WorkflowRun.id == workflow_run_id)
workflow_run = session.scalars(workflow_run_stmt).first()
if not workflow_run:
raise ValueError("Workflow run not found")
workflow_id = workflow_run.workflow_id
tenant_id = workflow_run.tenant_id
workflow_run_id = workflow_run.id
workflow_run_elapsed_time = workflow_run.elapsed_time
workflow_run_status = workflow_run.status
workflow_run_inputs = workflow_run.inputs_dict
workflow_run_outputs = workflow_run.outputs_dict
workflow_run_version = workflow_run.version
error = workflow_run.error or ""
total_tokens = workflow_run.total_tokens
file_list = workflow_run_inputs.get("sys.file") or []
query = workflow_run_inputs.get("query") or workflow_run_inputs.get("sys.query") or ""
# get workflow_app_log_id
workflow_app_log_data_stmt = select(WorkflowAppLog.id).where(
WorkflowAppLog.tenant_id == tenant_id,
@@ -523,43 +536,43 @@
)
message_id = session.scalar(message_data_stmt)
metadata = {
"workflow_id": workflow_id,
"conversation_id": conversation_id,
"workflow_run_id": workflow_run_id,
"tenant_id": tenant_id,
"elapsed_time": workflow_run_elapsed_time,
"status": workflow_run_status,
"version": workflow_run_version,
"total_tokens": total_tokens,
"file_list": file_list,
"triggered_from": workflow_run.triggered_from,
"user_id": user_id,
"app_id": workflow_run.app_id,
}
metadata = {
"workflow_id": workflow_id,
"conversation_id": conversation_id,
"workflow_run_id": workflow_run_id,
"tenant_id": tenant_id,
"elapsed_time": workflow_run_elapsed_time,
"status": workflow_run_status,
"version": workflow_run_version,
"total_tokens": total_tokens,
"file_list": file_list,
"triggered_from": workflow_run.triggered_from,
"user_id": user_id,
"app_id": workflow_run.app_id,
}
workflow_trace_info = WorkflowTraceInfo(
trace_id=self.trace_id,
workflow_data=workflow_run.to_dict(),
conversation_id=conversation_id,
workflow_id=workflow_id,
tenant_id=tenant_id,
workflow_run_id=workflow_run_id,
workflow_run_elapsed_time=workflow_run_elapsed_time,
workflow_run_status=workflow_run_status,
workflow_run_inputs=workflow_run_inputs,
workflow_run_outputs=workflow_run_outputs,
workflow_run_version=workflow_run_version,
error=error,
total_tokens=total_tokens,
file_list=file_list,
query=query,
metadata=metadata,
workflow_app_log_id=workflow_app_log_id,
message_id=message_id,
start_time=workflow_run.created_at,
end_time=workflow_run.finished_at,
)
workflow_trace_info = WorkflowTraceInfo(
trace_id=self.trace_id,
workflow_data=workflow_run.to_dict(),
conversation_id=conversation_id,
workflow_id=workflow_id,
tenant_id=tenant_id,
workflow_run_id=workflow_run_id,
workflow_run_elapsed_time=workflow_run_elapsed_time,
workflow_run_status=workflow_run_status,
workflow_run_inputs=workflow_run_inputs,
workflow_run_outputs=workflow_run_outputs,
workflow_run_version=workflow_run_version,
error=error,
total_tokens=total_tokens,
file_list=file_list,
query=query,
metadata=metadata,
workflow_app_log_id=workflow_app_log_id,
message_id=message_id,
start_time=workflow_run.created_at,
end_time=workflow_run.finished_at,
)
return workflow_trace_info
def message_trace(self, message_id: str | None):
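
TraceTask caches one repository per process behind `_repo_lock`. The double-checked locking it uses is worth isolating: the outer check keeps the hot path lock-free after initialization, and the inner check stops two threads that raced past the first check from both constructing a repository. A generic sketch with illustrative names:

```
import threading


class _RepoHolder:
    """Illustrative; TraceTask above does the same with class attributes."""

    _repo = None
    _lock = threading.Lock()

    @classmethod
    def get(cls, factory):
        if cls._repo is None:            # fast path: no lock once initialized
            with cls._lock:
                if cls._repo is None:    # re-check: another thread may have won the race
                    cls._repo = factory()
        return cls._repo
```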

View File

@@ -28,7 +28,7 @@ Example:
runs = repo.get_paginated_workflow_runs(
tenant_id="tenant-123",
app_id="app-456",
triggered_from="debugging",
triggered_from=WorkflowRunTriggeredFrom.DEBUGGING,
limit=20
)
```
@@ -40,7 +40,14 @@ from typing import Protocol
from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository
from libs.infinite_scroll_pagination import InfiniteScrollPagination
from models.enums import WorkflowRunTriggeredFrom
from models.workflow import WorkflowRun
from repositories.types import (
AverageInteractionStats,
DailyRunsStats,
DailyTerminalsStats,
DailyTokenCostStats,
)
class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol):
@@ -56,7 +63,7 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol):
self,
tenant_id: str,
app_id: str,
triggered_from: str,
triggered_from: WorkflowRunTriggeredFrom | Sequence[WorkflowRunTriggeredFrom],
limit: int = 20,
last_id: str | None = None,
status: str | None = None,
@@ -71,7 +78,7 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol):
Args:
tenant_id: Tenant identifier for multi-tenant isolation
app_id: Application identifier
triggered_from: Filter by trigger source (e.g., "debugging", "app-run")
triggered_from: Filter by trigger source(s) (e.g., "debugging", "app-run", or list of values)
limit: Maximum number of records to return (default: 20)
last_id: Cursor for pagination - ID of the last record from previous page
status: Optional filter by status (e.g., "running", "succeeded", "failed")
@@ -109,6 +116,31 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol):
"""
...
def get_workflow_run_by_id_without_tenant(
self,
run_id: str,
) -> WorkflowRun | None:
"""
Get a specific workflow run by ID without tenant/app context.
Retrieves a single workflow run using only the run ID, without
requiring tenant_id or app_id. This method is intended for internal
system operations like tracing and monitoring where the tenant context
is not available upfront.
Args:
run_id: Workflow run identifier
Returns:
WorkflowRun object if found, None otherwise
Note:
This method bypasses tenant isolation checks and should only be used
in trusted system contexts like ops trace collection. For user-facing
operations, use get_workflow_run_by_id() with proper tenant isolation.
"""
...
def get_workflow_runs_count(
self,
tenant_id: str,
@@ -218,3 +250,119 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol):
and ensure proper data retention policies are followed.
"""
...
def get_daily_runs_statistics(
self,
tenant_id: str,
app_id: str,
triggered_from: str,
start_date: datetime | None = None,
end_date: datetime | None = None,
timezone: str = "UTC",
) -> list[DailyRunsStats]:
"""
Get daily runs statistics.
Retrieves daily workflow runs count grouped by date for a specific app
and trigger source. Used for workflow statistics dashboard.
Args:
tenant_id: Tenant identifier for multi-tenant isolation
app_id: Application identifier
triggered_from: Filter by trigger source (e.g., "app-run")
start_date: Optional start date filter
end_date: Optional end date filter
timezone: Timezone for date grouping (default: "UTC")
Returns:
List of dictionaries containing date and runs count:
[{"date": "2024-01-01", "runs": 10}, ...]
"""
...
def get_daily_terminals_statistics(
self,
tenant_id: str,
app_id: str,
triggered_from: str,
start_date: datetime | None = None,
end_date: datetime | None = None,
timezone: str = "UTC",
) -> list[DailyTerminalsStats]:
"""
Get daily terminals statistics.
Retrieves daily unique terminal count grouped by date for a specific app
and trigger source. Used for workflow statistics dashboard.
Args:
tenant_id: Tenant identifier for multi-tenant isolation
app_id: Application identifier
triggered_from: Filter by trigger source (e.g., "app-run")
start_date: Optional start date filter
end_date: Optional end date filter
timezone: Timezone for date grouping (default: "UTC")
Returns:
List of dictionaries containing date and terminal count:
[{"date": "2024-01-01", "terminal_count": 5}, ...]
"""
...
def get_daily_token_cost_statistics(
self,
tenant_id: str,
app_id: str,
triggered_from: str,
start_date: datetime | None = None,
end_date: datetime | None = None,
timezone: str = "UTC",
) -> list[DailyTokenCostStats]:
"""
Get daily token cost statistics.
Retrieves daily total token count grouped by date for a specific app
and trigger source. Used for workflow statistics dashboard.
Args:
tenant_id: Tenant identifier for multi-tenant isolation
app_id: Application identifier
triggered_from: Filter by trigger source (e.g., "app-run")
start_date: Optional start date filter
end_date: Optional end date filter
timezone: Timezone for date grouping (default: "UTC")
Returns:
List of dictionaries containing date and token count:
[{"date": "2024-01-01", "token_count": 1000}, ...]
"""
...
def get_average_app_interaction_statistics(
self,
tenant_id: str,
app_id: str,
triggered_from: str,
start_date: datetime | None = None,
end_date: datetime | None = None,
timezone: str = "UTC",
) -> list[AverageInteractionStats]:
"""
Get average app interaction statistics.
Retrieves daily average interactions per user grouped by date for a specific app
and trigger source. Used for workflow statistics dashboard.
Args:
tenant_id: Tenant identifier for multi-tenant isolation
app_id: Application identifier
triggered_from: Filter by trigger source (e.g., "app-run")
start_date: Optional start date filter
end_date: Optional end date filter
timezone: Timezone for date grouping (default: "UTC")
Returns:
List of dictionaries containing date and average interactions:
[{"date": "2024-01-01", "interactions": 2.5}, ...]
"""
...
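
Because `APIWorkflowRunRepository` is a `typing.Protocol`, implementations satisfy it structurally; nothing has to inherit from it. A toy test double like the following (hypothetical, not part of this commit) would type-check wherever the protocol is expected, provided it also supplies the remaining protocol methods:

```
from datetime import datetime

from repositories.types import DailyRunsStats


class FakeWorkflowRunRepo:
    """Illustrative stub; only one protocol method shown for brevity."""

    def get_daily_runs_statistics(
        self,
        tenant_id: str,
        app_id: str,
        triggered_from: str,
        start_date: datetime | None = None,
        end_date: datetime | None = None,
        timezone: str = "UTC",
    ) -> list[DailyRunsStats]:
        # Canned data instead of SQL; matches the documented return shape.
        return [{"date": "2024-01-01", "runs": 10}]
```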

View File

@@ -22,16 +22,25 @@ Implementation Notes:
import logging
from collections.abc import Sequence
from datetime import datetime
from typing import cast
from decimal import Decimal
from typing import Any, cast
import sqlalchemy as sa
from sqlalchemy import delete, func, select
from sqlalchemy.engine import CursorResult
from sqlalchemy.orm import Session, sessionmaker
from libs.infinite_scroll_pagination import InfiniteScrollPagination
from libs.time_parser import get_time_threshold
from models.enums import WorkflowRunTriggeredFrom
from models.workflow import WorkflowRun
from repositories.api_workflow_run_repository import APIWorkflowRunRepository
from repositories.types import (
AverageInteractionStats,
DailyRunsStats,
DailyTerminalsStats,
DailyTokenCostStats,
)
logger = logging.getLogger(__name__)
@@ -61,7 +70,7 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
self,
tenant_id: str,
app_id: str,
triggered_from: str,
triggered_from: WorkflowRunTriggeredFrom | Sequence[WorkflowRunTriggeredFrom],
limit: int = 20,
last_id: str | None = None,
status: str | None = None,
@@ -78,9 +87,14 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
base_stmt = select(WorkflowRun).where(
WorkflowRun.tenant_id == tenant_id,
WorkflowRun.app_id == app_id,
WorkflowRun.triggered_from == triggered_from,
)
# Normalize triggered_from: accept a single enum value or a sequence, then filter with IN
if isinstance(triggered_from, WorkflowRunTriggeredFrom):
triggered_from = [triggered_from]
if triggered_from:
base_stmt = base_stmt.where(WorkflowRun.triggered_from.in_(triggered_from))
# Add optional status filter
if status:
base_stmt = base_stmt.where(WorkflowRun.status == status)
@@ -126,6 +140,17 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
)
return session.scalar(stmt)
def get_workflow_run_by_id_without_tenant(
self,
run_id: str,
) -> WorkflowRun | None:
"""
Get a specific workflow run by ID without tenant/app context.
"""
with self._session_maker() as session:
stmt = select(WorkflowRun).where(WorkflowRun.id == run_id)
return session.scalar(stmt)
def get_workflow_runs_count(
self,
tenant_id: str,
@@ -275,3 +300,213 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository):
logger.info("Total deleted %s workflow runs for app %s", total_deleted, app_id)
return total_deleted
def get_daily_runs_statistics(
self,
tenant_id: str,
app_id: str,
triggered_from: str,
start_date: datetime | None = None,
end_date: datetime | None = None,
timezone: str = "UTC",
) -> list[DailyRunsStats]:
"""
Get daily runs statistics using raw SQL for optimal performance.
"""
sql_query = """SELECT
DATE(DATE_TRUNC('day', created_at AT TIME ZONE 'UTC' AT TIME ZONE :tz )) AS date,
COUNT(id) AS runs
FROM
workflow_runs
WHERE
tenant_id = :tenant_id
AND app_id = :app_id
AND triggered_from = :triggered_from"""
arg_dict: dict[str, Any] = {
"tz": timezone,
"tenant_id": tenant_id,
"app_id": app_id,
"triggered_from": triggered_from,
}
if start_date:
sql_query += " AND created_at >= :start_date"
arg_dict["start_date"] = start_date
if end_date:
sql_query += " AND created_at < :end_date"
arg_dict["end_date"] = end_date
sql_query += " GROUP BY date ORDER BY date"
response_data = []
with self._session_maker() as session:
rs = session.execute(sa.text(sql_query), arg_dict)
for row in rs:
response_data.append({"date": str(row.date), "runs": row.runs})
return cast(list[DailyRunsStats], response_data)
def get_daily_terminals_statistics(
self,
tenant_id: str,
app_id: str,
triggered_from: str,
start_date: datetime | None = None,
end_date: datetime | None = None,
timezone: str = "UTC",
) -> list[DailyTerminalsStats]:
"""
Get daily terminals statistics using raw SQL for optimal performance.
"""
sql_query = """SELECT
DATE(DATE_TRUNC('day', created_at AT TIME ZONE 'UTC' AT TIME ZONE :tz )) AS date,
COUNT(DISTINCT created_by) AS terminal_count
FROM
workflow_runs
WHERE
tenant_id = :tenant_id
AND app_id = :app_id
AND triggered_from = :triggered_from"""
arg_dict: dict[str, Any] = {
"tz": timezone,
"tenant_id": tenant_id,
"app_id": app_id,
"triggered_from": triggered_from,
}
if start_date:
sql_query += " AND created_at >= :start_date"
arg_dict["start_date"] = start_date
if end_date:
sql_query += " AND created_at < :end_date"
arg_dict["end_date"] = end_date
sql_query += " GROUP BY date ORDER BY date"
response_data = []
with self._session_maker() as session:
rs = session.execute(sa.text(sql_query), arg_dict)
for row in rs:
response_data.append({"date": str(row.date), "terminal_count": row.terminal_count})
return cast(list[DailyTerminalsStats], response_data)
def get_daily_token_cost_statistics(
self,
tenant_id: str,
app_id: str,
triggered_from: str,
start_date: datetime | None = None,
end_date: datetime | None = None,
timezone: str = "UTC",
) -> list[DailyTokenCostStats]:
"""
Get daily token cost statistics using raw SQL for optimal performance.
"""
sql_query = """SELECT
DATE(DATE_TRUNC('day', created_at AT TIME ZONE 'UTC' AT TIME ZONE :tz )) AS date,
SUM(total_tokens) AS token_count
FROM
workflow_runs
WHERE
tenant_id = :tenant_id
AND app_id = :app_id
AND triggered_from = :triggered_from"""
arg_dict: dict[str, Any] = {
"tz": timezone,
"tenant_id": tenant_id,
"app_id": app_id,
"triggered_from": triggered_from,
}
if start_date:
sql_query += " AND created_at >= :start_date"
arg_dict["start_date"] = start_date
if end_date:
sql_query += " AND created_at < :end_date"
arg_dict["end_date"] = end_date
sql_query += " GROUP BY date ORDER BY date"
response_data = []
with self._session_maker() as session:
rs = session.execute(sa.text(sql_query), arg_dict)
for row in rs:
response_data.append(
{
"date": str(row.date),
"token_count": row.token_count,
}
)
return cast(list[DailyTokenCostStats], response_data)
def get_average_app_interaction_statistics(
self,
tenant_id: str,
app_id: str,
triggered_from: str,
start_date: datetime | None = None,
end_date: datetime | None = None,
timezone: str = "UTC",
) -> list[AverageInteractionStats]:
"""
Get average app interaction statistics using raw SQL for optimal performance.
"""
sql_query = """SELECT
AVG(sub.interactions) AS interactions,
sub.date
FROM
(
SELECT
DATE(DATE_TRUNC('day', c.created_at AT TIME ZONE 'UTC' AT TIME ZONE :tz )) AS date,
c.created_by,
COUNT(c.id) AS interactions
FROM
workflow_runs c
WHERE
c.tenant_id = :tenant_id
AND c.app_id = :app_id
AND c.triggered_from = :triggered_from
{{start}}
{{end}}
GROUP BY
date, c.created_by
) sub
GROUP BY
sub.date"""
arg_dict: dict[str, Any] = {
"tz": timezone,
"tenant_id": tenant_id,
"app_id": app_id,
"triggered_from": triggered_from,
}
if start_date:
sql_query = sql_query.replace("{{start}}", " AND c.created_at >= :start_date")
arg_dict["start_date"] = start_date
else:
sql_query = sql_query.replace("{{start}}", "")
if end_date:
sql_query = sql_query.replace("{{end}}", " AND c.created_at < :end_date")
arg_dict["end_date"] = end_date
else:
sql_query = sql_query.replace("{{end}}", "")
response_data = []
with self._session_maker() as session:
rs = session.execute(sa.text(sql_query), arg_dict)
for row in rs:
response_data.append(
{"date": str(row.date), "interactions": float(row.interactions.quantize(Decimal("0.01")))}
)
return cast(list[AverageInteractionStats], response_data)
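
The recurring SQL expression `DATE(DATE_TRUNC('day', created_at AT TIME ZONE 'UTC' AT TIME ZONE :tz))` shifts the UTC-stored timestamp into the account's timezone before bucketing by day, so a run near UTC midnight can land on the next local day. The same arithmetic reproduced in Python for a single timestamp (example values are illustrative):

```
from datetime import datetime

import pytz

created_at = datetime(2024, 1, 1, 23, 30)   # stored naive, semantically UTC
tz = pytz.timezone("Asia/Shanghai")         # the :tz bind parameter

local = pytz.utc.localize(created_at).astimezone(tz)
bucket = local.date()                       # what DATE(DATE_TRUNC('day', ...)) yields
print(bucket)                               # 2024-01-02: counted toward the next local day
```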

api/repositories/types.py (new file, 21 lines)
View File

@@ -0,0 +1,21 @@
from typing import TypedDict
class DailyRunsStats(TypedDict):
date: str
runs: int
class DailyTerminalsStats(TypedDict):
date: str
terminal_count: int
class DailyTokenCostStats(TypedDict):
date: str
token_count: int
class AverageInteractionStats(TypedDict):
date: str
interactions: float
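
These TypedDicts are erased at runtime; they exist so type checkers can verify the row shapes the statistics methods return. A brief usage sketch:

```
from repositories.types import DailyRunsStats

row: DailyRunsStats = {"date": "2024-01-01", "runs": 10}   # OK
# row: DailyRunsStats = {"date": "2024-01-01"}             # type checker flags the missing "runs"
```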

View File

@@ -9,7 +9,7 @@ from typing import Any, Union, cast
from uuid import uuid4
from flask_login import current_user
from sqlalchemy import func, or_, select
from sqlalchemy import func, select
from sqlalchemy.orm import Session, sessionmaker
import contexts
@@ -94,6 +94,7 @@ class RagPipelineService:
self._node_execution_service_repo = DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository(
session_maker
)
self._workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker)
@classmethod
def get_pipeline_templates(cls, type: str = "built-in", language: str = "en-US") -> dict:
@@ -1015,48 +1016,21 @@ class RagPipelineService:
:param args: request args
"""
limit = int(args.get("limit", 20))
last_id = args.get("last_id")
base_query = db.session.query(WorkflowRun).where(
WorkflowRun.tenant_id == pipeline.tenant_id,
WorkflowRun.app_id == pipeline.id,
or_(
WorkflowRun.triggered_from == WorkflowRunTriggeredFrom.RAG_PIPELINE_RUN.value,
WorkflowRun.triggered_from == WorkflowRunTriggeredFrom.RAG_PIPELINE_DEBUGGING.value,
),
triggered_from_values = [
WorkflowRunTriggeredFrom.RAG_PIPELINE_RUN,
WorkflowRunTriggeredFrom.RAG_PIPELINE_DEBUGGING,
]
return self._workflow_run_repo.get_paginated_workflow_runs(
tenant_id=pipeline.tenant_id,
app_id=pipeline.id,
triggered_from=triggered_from_values,
limit=limit,
last_id=last_id,
)
if args.get("last_id"):
last_workflow_run = base_query.where(
WorkflowRun.id == args.get("last_id"),
).first()
if not last_workflow_run:
raise ValueError("Last workflow run not exists")
workflow_runs = (
base_query.where(
WorkflowRun.created_at < last_workflow_run.created_at, WorkflowRun.id != last_workflow_run.id
)
.order_by(WorkflowRun.created_at.desc())
.limit(limit)
.all()
)
else:
workflow_runs = base_query.order_by(WorkflowRun.created_at.desc()).limit(limit).all()
has_more = False
if len(workflow_runs) == limit:
current_page_first_workflow_run = workflow_runs[-1]
rest_count = base_query.where(
WorkflowRun.created_at < current_page_first_workflow_run.created_at,
WorkflowRun.id != current_page_first_workflow_run.id,
).count()
if rest_count > 0:
has_more = True
return InfiniteScrollPagination(data=workflow_runs, limit=limit, has_more=has_more)
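
The repository now owns the cursor logic this service previously inlined. A hedged sketch of paging through all runs via the new method, assuming `repo` is any APIWorkflowRunRepository and `pipeline` is in scope; the `data`/`has_more` attributes follow the InfiniteScrollPagination construction in the old code above:

```
from models.enums import WorkflowRunTriggeredFrom

last_id = None
while True:
    page = repo.get_paginated_workflow_runs(
        tenant_id=pipeline.tenant_id,
        app_id=pipeline.id,
        triggered_from=[
            WorkflowRunTriggeredFrom.RAG_PIPELINE_RUN,
            WorkflowRunTriggeredFrom.RAG_PIPELINE_DEBUGGING,
        ],
        limit=20,
        last_id=last_id,      # cursor: ID of the last record from the previous page
    )
    for run in page.data:
        ...                   # process each WorkflowRun
    if not page.has_more:
        break
    last_id = page.data[-1].id
```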
def get_rag_pipeline_workflow_run(self, pipeline: Pipeline, run_id: str) -> WorkflowRun | None:
"""
Get workflow run detail
@@ -1064,18 +1038,12 @@ class RagPipelineService:
:param app_model: app model
:param run_id: workflow run id
"""
workflow_run = (
db.session.query(WorkflowRun)
.where(
WorkflowRun.tenant_id == pipeline.tenant_id,
WorkflowRun.app_id == pipeline.id,
WorkflowRun.id == run_id,
)
.first()
return self._workflow_run_repo.get_workflow_run_by_id(
tenant_id=pipeline.tenant_id,
app_id=pipeline.id,
run_id=run_id,
)
return workflow_run
def get_rag_pipeline_workflow_run_node_executions(
self,
pipeline: Pipeline,