diff --git a/api/controllers/console/app/workflow_statistic.py b/api/controllers/console/app/workflow_statistic.py index bbea04640a..c246b3ffd5 100644 --- a/api/controllers/console/app/workflow_statistic.py +++ b/api/controllers/console/app/workflow_statistic.py @@ -1,10 +1,9 @@ from datetime import datetime -from decimal import Decimal import pytz -import sqlalchemy as sa from flask import jsonify from flask_restx import Resource, reqparse +from sqlalchemy.orm import sessionmaker from controllers.console import api, console_ns from controllers.console.app.wraps import get_app_model @@ -14,10 +13,16 @@ from libs.helper import DatetimeString from libs.login import current_account_with_tenant, login_required from models.enums import WorkflowRunTriggeredFrom from models.model import AppMode +from repositories.factory import DifyAPIRepositoryFactory @console_ns.route("/apps//workflow/statistics/daily-conversations") class WorkflowDailyRunsStatistic(Resource): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + session_maker = sessionmaker(bind=db.engine, expire_on_commit=False) + self._workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker) + @api.doc("get_workflow_daily_runs_statistic") @api.doc(description="Get workflow daily runs statistics") @api.doc(params={"app_id": "Application ID"}) @@ -37,57 +42,44 @@ class WorkflowDailyRunsStatistic(Resource): ) args = parser.parse_args() - sql_query = """SELECT - DATE(DATE_TRUNC('day', created_at AT TIME ZONE 'UTC' AT TIME ZONE :tz )) AS date, - COUNT(id) AS runs -FROM - workflow_runs -WHERE - app_id = :app_id - AND triggered_from = :triggered_from""" - arg_dict = { - "tz": account.timezone, - "app_id": app_model.id, - "triggered_from": WorkflowRunTriggeredFrom.APP_RUN, - } assert account.timezone is not None timezone = pytz.timezone(account.timezone) utc_timezone = pytz.utc + start_date = None + end_date = None + if args["start"]: start_datetime = datetime.strptime(args["start"], "%Y-%m-%d %H:%M") start_datetime = start_datetime.replace(second=0) - start_datetime_timezone = timezone.localize(start_datetime) - start_datetime_utc = start_datetime_timezone.astimezone(utc_timezone) - - sql_query += " AND created_at >= :start" - arg_dict["start"] = start_datetime_utc + start_date = start_datetime_timezone.astimezone(utc_timezone) if args["end"]: end_datetime = datetime.strptime(args["end"], "%Y-%m-%d %H:%M") end_datetime = end_datetime.replace(second=0) - end_datetime_timezone = timezone.localize(end_datetime) - end_datetime_utc = end_datetime_timezone.astimezone(utc_timezone) + end_date = end_datetime_timezone.astimezone(utc_timezone) - sql_query += " AND created_at < :end" - arg_dict["end"] = end_datetime_utc - - sql_query += " GROUP BY date ORDER BY date" - - response_data = [] - - with db.engine.begin() as conn: - rs = conn.execute(sa.text(sql_query), arg_dict) - for i in rs: - response_data.append({"date": str(i.date), "runs": i.runs}) + response_data = self._workflow_run_repo.get_daily_runs_statistics( + tenant_id=app_model.tenant_id, + app_id=app_model.id, + triggered_from=WorkflowRunTriggeredFrom.APP_RUN, + start_date=start_date, + end_date=end_date, + timezone=account.timezone, + ) return jsonify({"data": response_data}) @console_ns.route("/apps//workflow/statistics/daily-terminals") class WorkflowDailyTerminalsStatistic(Resource): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + session_maker = sessionmaker(bind=db.engine, expire_on_commit=False) + self._workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker) + @api.doc("get_workflow_daily_terminals_statistic") @api.doc(description="Get workflow daily terminals statistics") @api.doc(params={"app_id": "Application ID"}) @@ -107,57 +99,44 @@ class WorkflowDailyTerminalsStatistic(Resource): ) args = parser.parse_args() - sql_query = """SELECT - DATE(DATE_TRUNC('day', created_at AT TIME ZONE 'UTC' AT TIME ZONE :tz )) AS date, - COUNT(DISTINCT workflow_runs.created_by) AS terminal_count -FROM - workflow_runs -WHERE - app_id = :app_id - AND triggered_from = :triggered_from""" - arg_dict = { - "tz": account.timezone, - "app_id": app_model.id, - "triggered_from": WorkflowRunTriggeredFrom.APP_RUN, - } assert account.timezone is not None timezone = pytz.timezone(account.timezone) utc_timezone = pytz.utc + start_date = None + end_date = None + if args["start"]: start_datetime = datetime.strptime(args["start"], "%Y-%m-%d %H:%M") start_datetime = start_datetime.replace(second=0) - start_datetime_timezone = timezone.localize(start_datetime) - start_datetime_utc = start_datetime_timezone.astimezone(utc_timezone) - - sql_query += " AND created_at >= :start" - arg_dict["start"] = start_datetime_utc + start_date = start_datetime_timezone.astimezone(utc_timezone) if args["end"]: end_datetime = datetime.strptime(args["end"], "%Y-%m-%d %H:%M") end_datetime = end_datetime.replace(second=0) - end_datetime_timezone = timezone.localize(end_datetime) - end_datetime_utc = end_datetime_timezone.astimezone(utc_timezone) + end_date = end_datetime_timezone.astimezone(utc_timezone) - sql_query += " AND created_at < :end" - arg_dict["end"] = end_datetime_utc - - sql_query += " GROUP BY date ORDER BY date" - - response_data = [] - - with db.engine.begin() as conn: - rs = conn.execute(sa.text(sql_query), arg_dict) - for i in rs: - response_data.append({"date": str(i.date), "terminal_count": i.terminal_count}) + response_data = self._workflow_run_repo.get_daily_terminals_statistics( + tenant_id=app_model.tenant_id, + app_id=app_model.id, + triggered_from=WorkflowRunTriggeredFrom.APP_RUN, + start_date=start_date, + end_date=end_date, + timezone=account.timezone, + ) return jsonify({"data": response_data}) @console_ns.route("/apps//workflow/statistics/token-costs") class WorkflowDailyTokenCostStatistic(Resource): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + session_maker = sessionmaker(bind=db.engine, expire_on_commit=False) + self._workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker) + @api.doc("get_workflow_daily_token_cost_statistic") @api.doc(description="Get workflow daily token cost statistics") @api.doc(params={"app_id": "Application ID"}) @@ -177,62 +156,44 @@ class WorkflowDailyTokenCostStatistic(Resource): ) args = parser.parse_args() - sql_query = """SELECT - DATE(DATE_TRUNC('day', created_at AT TIME ZONE 'UTC' AT TIME ZONE :tz )) AS date, - SUM(workflow_runs.total_tokens) AS token_count -FROM - workflow_runs -WHERE - app_id = :app_id - AND triggered_from = :triggered_from""" - arg_dict = { - "tz": account.timezone, - "app_id": app_model.id, - "triggered_from": WorkflowRunTriggeredFrom.APP_RUN, - } assert account.timezone is not None timezone = pytz.timezone(account.timezone) utc_timezone = pytz.utc + start_date = None + end_date = None + if args["start"]: start_datetime = datetime.strptime(args["start"], "%Y-%m-%d %H:%M") start_datetime = start_datetime.replace(second=0) - start_datetime_timezone = timezone.localize(start_datetime) - start_datetime_utc = start_datetime_timezone.astimezone(utc_timezone) - - sql_query += " AND created_at >= :start" - arg_dict["start"] = start_datetime_utc + start_date = start_datetime_timezone.astimezone(utc_timezone) if args["end"]: end_datetime = datetime.strptime(args["end"], "%Y-%m-%d %H:%M") end_datetime = end_datetime.replace(second=0) - end_datetime_timezone = timezone.localize(end_datetime) - end_datetime_utc = end_datetime_timezone.astimezone(utc_timezone) + end_date = end_datetime_timezone.astimezone(utc_timezone) - sql_query += " AND created_at < :end" - arg_dict["end"] = end_datetime_utc - - sql_query += " GROUP BY date ORDER BY date" - - response_data = [] - - with db.engine.begin() as conn: - rs = conn.execute(sa.text(sql_query), arg_dict) - for i in rs: - response_data.append( - { - "date": str(i.date), - "token_count": i.token_count, - } - ) + response_data = self._workflow_run_repo.get_daily_token_cost_statistics( + tenant_id=app_model.tenant_id, + app_id=app_model.id, + triggered_from=WorkflowRunTriggeredFrom.APP_RUN, + start_date=start_date, + end_date=end_date, + timezone=account.timezone, + ) return jsonify({"data": response_data}) @console_ns.route("/apps//workflow/statistics/average-app-interactions") class WorkflowAverageAppInteractionStatistic(Resource): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + session_maker = sessionmaker(bind=db.engine, expire_on_commit=False) + self._workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker) + @api.doc("get_workflow_average_app_interaction_statistic") @api.doc(description="Get workflow average app interaction statistics") @api.doc(params={"app_id": "Application ID"}) @@ -252,67 +213,32 @@ class WorkflowAverageAppInteractionStatistic(Resource): ) args = parser.parse_args() - sql_query = """SELECT - AVG(sub.interactions) AS interactions, - sub.date -FROM - ( - SELECT - DATE(DATE_TRUNC('day', c.created_at AT TIME ZONE 'UTC' AT TIME ZONE :tz )) AS date, - c.created_by, - COUNT(c.id) AS interactions - FROM - workflow_runs c - WHERE - c.app_id = :app_id - AND c.triggered_from = :triggered_from - {{start}} - {{end}} - GROUP BY - date, c.created_by - ) sub -GROUP BY - sub.date""" - arg_dict = { - "tz": account.timezone, - "app_id": app_model.id, - "triggered_from": WorkflowRunTriggeredFrom.APP_RUN, - } assert account.timezone is not None timezone = pytz.timezone(account.timezone) utc_timezone = pytz.utc + start_date = None + end_date = None + if args["start"]: start_datetime = datetime.strptime(args["start"], "%Y-%m-%d %H:%M") start_datetime = start_datetime.replace(second=0) - start_datetime_timezone = timezone.localize(start_datetime) - start_datetime_utc = start_datetime_timezone.astimezone(utc_timezone) - - sql_query = sql_query.replace("{{start}}", " AND c.created_at >= :start") - arg_dict["start"] = start_datetime_utc - else: - sql_query = sql_query.replace("{{start}}", "") + start_date = start_datetime_timezone.astimezone(utc_timezone) if args["end"]: end_datetime = datetime.strptime(args["end"], "%Y-%m-%d %H:%M") end_datetime = end_datetime.replace(second=0) - end_datetime_timezone = timezone.localize(end_datetime) - end_datetime_utc = end_datetime_timezone.astimezone(utc_timezone) + end_date = end_datetime_timezone.astimezone(utc_timezone) - sql_query = sql_query.replace("{{end}}", " AND c.created_at < :end") - arg_dict["end"] = end_datetime_utc - else: - sql_query = sql_query.replace("{{end}}", "") - - response_data = [] - - with db.engine.begin() as conn: - rs = conn.execute(sa.text(sql_query), arg_dict) - for i in rs: - response_data.append( - {"date": str(i.date), "interactions": float(i.interactions.quantize(Decimal("0.01")))} - ) + response_data = self._workflow_run_repo.get_average_app_interaction_statistics( + tenant_id=app_model.tenant_id, + app_id=app_model.id, + triggered_from=WorkflowRunTriggeredFrom.APP_RUN, + start_date=start_date, + end_date=end_date, + timezone=account.timezone, + ) return jsonify({"data": response_data}) diff --git a/api/core/memory/token_buffer_memory.py b/api/core/memory/token_buffer_memory.py index 35af742f2a..3ebbb60f85 100644 --- a/api/core/memory/token_buffer_memory.py +++ b/api/core/memory/token_buffer_memory.py @@ -1,6 +1,7 @@ from collections.abc import Sequence from sqlalchemy import select +from sqlalchemy.orm import sessionmaker from core.app.app_config.features.file_upload.manager import FileUploadConfigManager from core.file import file_manager @@ -18,7 +19,9 @@ from core.prompt.utils.extract_thread_messages import extract_thread_messages from extensions.ext_database import db from factories import file_factory from models.model import AppMode, Conversation, Message, MessageFile -from models.workflow import Workflow, WorkflowRun +from models.workflow import Workflow +from repositories.api_workflow_run_repository import APIWorkflowRunRepository +from repositories.factory import DifyAPIRepositoryFactory class TokenBufferMemory: @@ -29,6 +32,14 @@ class TokenBufferMemory: ): self.conversation = conversation self.model_instance = model_instance + self._workflow_run_repo: APIWorkflowRunRepository | None = None + + @property + def workflow_run_repo(self) -> APIWorkflowRunRepository: + if self._workflow_run_repo is None: + session_maker = sessionmaker(bind=db.engine, expire_on_commit=False) + self._workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker) + return self._workflow_run_repo def _build_prompt_message_with_files( self, @@ -50,7 +61,16 @@ class TokenBufferMemory: if self.conversation.mode in {AppMode.AGENT_CHAT, AppMode.COMPLETION, AppMode.CHAT}: file_extra_config = FileUploadConfigManager.convert(self.conversation.model_config) elif self.conversation.mode in {AppMode.ADVANCED_CHAT, AppMode.WORKFLOW}: - workflow_run = db.session.scalar(select(WorkflowRun).where(WorkflowRun.id == message.workflow_run_id)) + app = self.conversation.app + if not app: + raise ValueError("App not found for conversation") + + if not message.workflow_run_id: + raise ValueError("Workflow run ID not found") + + workflow_run = self.workflow_run_repo.get_workflow_run_by_id( + tenant_id=app.tenant_id, app_id=app.id, run_id=message.workflow_run_id + ) if not workflow_run: raise ValueError(f"Workflow run not found: {message.workflow_run_id}") workflow = db.session.scalar(select(Workflow).where(Workflow.id == workflow_run.workflow_id)) diff --git a/api/core/ops/ops_trace_manager.py b/api/core/ops/ops_trace_manager.py index 7db9b076d2..de0d4560e3 100644 --- a/api/core/ops/ops_trace_manager.py +++ b/api/core/ops/ops_trace_manager.py @@ -12,7 +12,7 @@ from uuid import UUID, uuid4 from cachetools import LRUCache from flask import current_app from sqlalchemy import select -from sqlalchemy.orm import Session +from sqlalchemy.orm import Session, sessionmaker from core.helper.encrypter import decrypt_token, encrypt_token, obfuscated_token from core.ops.entities.config_entity import ( @@ -34,7 +34,8 @@ from core.ops.utils import get_message_data from extensions.ext_database import db from extensions.ext_storage import storage from models.model import App, AppModelConfig, Conversation, Message, MessageFile, TraceAppConfig -from models.workflow import WorkflowAppLog, WorkflowRun +from models.workflow import WorkflowAppLog +from repositories.factory import DifyAPIRepositoryFactory from tasks.ops_trace_task import process_trace_tasks if TYPE_CHECKING: @@ -419,6 +420,18 @@ class OpsTraceManager: class TraceTask: + _workflow_run_repo = None + _repo_lock = threading.Lock() + + @classmethod + def _get_workflow_run_repo(cls): + if cls._workflow_run_repo is None: + with cls._repo_lock: + if cls._workflow_run_repo is None: + session_maker = sessionmaker(bind=db.engine, expire_on_commit=False) + cls._workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker) + return cls._workflow_run_repo + def __init__( self, trace_type: Any, @@ -486,27 +499,27 @@ class TraceTask: if not workflow_run_id: return {} + workflow_run_repo = self._get_workflow_run_repo() + workflow_run = workflow_run_repo.get_workflow_run_by_id_without_tenant(run_id=workflow_run_id) + if not workflow_run: + raise ValueError("Workflow run not found") + + workflow_id = workflow_run.workflow_id + tenant_id = workflow_run.tenant_id + workflow_run_id = workflow_run.id + workflow_run_elapsed_time = workflow_run.elapsed_time + workflow_run_status = workflow_run.status + workflow_run_inputs = workflow_run.inputs_dict + workflow_run_outputs = workflow_run.outputs_dict + workflow_run_version = workflow_run.version + error = workflow_run.error or "" + + total_tokens = workflow_run.total_tokens + + file_list = workflow_run_inputs.get("sys.file") or [] + query = workflow_run_inputs.get("query") or workflow_run_inputs.get("sys.query") or "" + with Session(db.engine) as session: - workflow_run_stmt = select(WorkflowRun).where(WorkflowRun.id == workflow_run_id) - workflow_run = session.scalars(workflow_run_stmt).first() - if not workflow_run: - raise ValueError("Workflow run not found") - - workflow_id = workflow_run.workflow_id - tenant_id = workflow_run.tenant_id - workflow_run_id = workflow_run.id - workflow_run_elapsed_time = workflow_run.elapsed_time - workflow_run_status = workflow_run.status - workflow_run_inputs = workflow_run.inputs_dict - workflow_run_outputs = workflow_run.outputs_dict - workflow_run_version = workflow_run.version - error = workflow_run.error or "" - - total_tokens = workflow_run.total_tokens - - file_list = workflow_run_inputs.get("sys.file") or [] - query = workflow_run_inputs.get("query") or workflow_run_inputs.get("sys.query") or "" - # get workflow_app_log_id workflow_app_log_data_stmt = select(WorkflowAppLog.id).where( WorkflowAppLog.tenant_id == tenant_id, @@ -523,43 +536,43 @@ class TraceTask: ) message_id = session.scalar(message_data_stmt) - metadata = { - "workflow_id": workflow_id, - "conversation_id": conversation_id, - "workflow_run_id": workflow_run_id, - "tenant_id": tenant_id, - "elapsed_time": workflow_run_elapsed_time, - "status": workflow_run_status, - "version": workflow_run_version, - "total_tokens": total_tokens, - "file_list": file_list, - "triggered_from": workflow_run.triggered_from, - "user_id": user_id, - "app_id": workflow_run.app_id, - } + metadata = { + "workflow_id": workflow_id, + "conversation_id": conversation_id, + "workflow_run_id": workflow_run_id, + "tenant_id": tenant_id, + "elapsed_time": workflow_run_elapsed_time, + "status": workflow_run_status, + "version": workflow_run_version, + "total_tokens": total_tokens, + "file_list": file_list, + "triggered_from": workflow_run.triggered_from, + "user_id": user_id, + "app_id": workflow_run.app_id, + } - workflow_trace_info = WorkflowTraceInfo( - trace_id=self.trace_id, - workflow_data=workflow_run.to_dict(), - conversation_id=conversation_id, - workflow_id=workflow_id, - tenant_id=tenant_id, - workflow_run_id=workflow_run_id, - workflow_run_elapsed_time=workflow_run_elapsed_time, - workflow_run_status=workflow_run_status, - workflow_run_inputs=workflow_run_inputs, - workflow_run_outputs=workflow_run_outputs, - workflow_run_version=workflow_run_version, - error=error, - total_tokens=total_tokens, - file_list=file_list, - query=query, - metadata=metadata, - workflow_app_log_id=workflow_app_log_id, - message_id=message_id, - start_time=workflow_run.created_at, - end_time=workflow_run.finished_at, - ) + workflow_trace_info = WorkflowTraceInfo( + trace_id=self.trace_id, + workflow_data=workflow_run.to_dict(), + conversation_id=conversation_id, + workflow_id=workflow_id, + tenant_id=tenant_id, + workflow_run_id=workflow_run_id, + workflow_run_elapsed_time=workflow_run_elapsed_time, + workflow_run_status=workflow_run_status, + workflow_run_inputs=workflow_run_inputs, + workflow_run_outputs=workflow_run_outputs, + workflow_run_version=workflow_run_version, + error=error, + total_tokens=total_tokens, + file_list=file_list, + query=query, + metadata=metadata, + workflow_app_log_id=workflow_app_log_id, + message_id=message_id, + start_time=workflow_run.created_at, + end_time=workflow_run.finished_at, + ) return workflow_trace_info def message_trace(self, message_id: str | None): diff --git a/api/repositories/api_workflow_run_repository.py b/api/repositories/api_workflow_run_repository.py index 72de9fed31..eb6d599224 100644 --- a/api/repositories/api_workflow_run_repository.py +++ b/api/repositories/api_workflow_run_repository.py @@ -28,7 +28,7 @@ Example: runs = repo.get_paginated_workflow_runs( tenant_id="tenant-123", app_id="app-456", - triggered_from="debugging", + triggered_from=WorkflowRunTriggeredFrom.DEBUGGING, limit=20 ) ``` @@ -40,7 +40,14 @@ from typing import Protocol from core.workflow.repositories.workflow_execution_repository import WorkflowExecutionRepository from libs.infinite_scroll_pagination import InfiniteScrollPagination +from models.enums import WorkflowRunTriggeredFrom from models.workflow import WorkflowRun +from repositories.types import ( + AverageInteractionStats, + DailyRunsStats, + DailyTerminalsStats, + DailyTokenCostStats, +) class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol): @@ -56,7 +63,7 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol): self, tenant_id: str, app_id: str, - triggered_from: str, + triggered_from: WorkflowRunTriggeredFrom | Sequence[WorkflowRunTriggeredFrom], limit: int = 20, last_id: str | None = None, status: str | None = None, @@ -71,7 +78,7 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol): Args: tenant_id: Tenant identifier for multi-tenant isolation app_id: Application identifier - triggered_from: Filter by trigger source (e.g., "debugging", "app-run") + triggered_from: Filter by trigger source(s) (e.g., "debugging", "app-run", or list of values) limit: Maximum number of records to return (default: 20) last_id: Cursor for pagination - ID of the last record from previous page status: Optional filter by status (e.g., "running", "succeeded", "failed") @@ -109,6 +116,31 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol): """ ... + def get_workflow_run_by_id_without_tenant( + self, + run_id: str, + ) -> WorkflowRun | None: + """ + Get a specific workflow run by ID without tenant/app context. + + Retrieves a single workflow run using only the run ID, without + requiring tenant_id or app_id. This method is intended for internal + system operations like tracing and monitoring where the tenant context + is not available upfront. + + Args: + run_id: Workflow run identifier + + Returns: + WorkflowRun object if found, None otherwise + + Note: + This method bypasses tenant isolation checks and should only be used + in trusted system contexts like ops trace collection. For user-facing + operations, use get_workflow_run_by_id() with proper tenant isolation. + """ + ... + def get_workflow_runs_count( self, tenant_id: str, @@ -218,3 +250,119 @@ class APIWorkflowRunRepository(WorkflowExecutionRepository, Protocol): and ensure proper data retention policies are followed. """ ... + + def get_daily_runs_statistics( + self, + tenant_id: str, + app_id: str, + triggered_from: str, + start_date: datetime | None = None, + end_date: datetime | None = None, + timezone: str = "UTC", + ) -> list[DailyRunsStats]: + """ + Get daily runs statistics. + + Retrieves daily workflow runs count grouped by date for a specific app + and trigger source. Used for workflow statistics dashboard. + + Args: + tenant_id: Tenant identifier for multi-tenant isolation + app_id: Application identifier + triggered_from: Filter by trigger source (e.g., "app-run") + start_date: Optional start date filter + end_date: Optional end date filter + timezone: Timezone for date grouping (default: "UTC") + + Returns: + List of dictionaries containing date and runs count: + [{"date": "2024-01-01", "runs": 10}, ...] + """ + ... + + def get_daily_terminals_statistics( + self, + tenant_id: str, + app_id: str, + triggered_from: str, + start_date: datetime | None = None, + end_date: datetime | None = None, + timezone: str = "UTC", + ) -> list[DailyTerminalsStats]: + """ + Get daily terminals statistics. + + Retrieves daily unique terminal count grouped by date for a specific app + and trigger source. Used for workflow statistics dashboard. + + Args: + tenant_id: Tenant identifier for multi-tenant isolation + app_id: Application identifier + triggered_from: Filter by trigger source (e.g., "app-run") + start_date: Optional start date filter + end_date: Optional end date filter + timezone: Timezone for date grouping (default: "UTC") + + Returns: + List of dictionaries containing date and terminal count: + [{"date": "2024-01-01", "terminal_count": 5}, ...] + """ + ... + + def get_daily_token_cost_statistics( + self, + tenant_id: str, + app_id: str, + triggered_from: str, + start_date: datetime | None = None, + end_date: datetime | None = None, + timezone: str = "UTC", + ) -> list[DailyTokenCostStats]: + """ + Get daily token cost statistics. + + Retrieves daily total token count grouped by date for a specific app + and trigger source. Used for workflow statistics dashboard. + + Args: + tenant_id: Tenant identifier for multi-tenant isolation + app_id: Application identifier + triggered_from: Filter by trigger source (e.g., "app-run") + start_date: Optional start date filter + end_date: Optional end date filter + timezone: Timezone for date grouping (default: "UTC") + + Returns: + List of dictionaries containing date and token count: + [{"date": "2024-01-01", "token_count": 1000}, ...] + """ + ... + + def get_average_app_interaction_statistics( + self, + tenant_id: str, + app_id: str, + triggered_from: str, + start_date: datetime | None = None, + end_date: datetime | None = None, + timezone: str = "UTC", + ) -> list[AverageInteractionStats]: + """ + Get average app interaction statistics. + + Retrieves daily average interactions per user grouped by date for a specific app + and trigger source. Used for workflow statistics dashboard. + + Args: + tenant_id: Tenant identifier for multi-tenant isolation + app_id: Application identifier + triggered_from: Filter by trigger source (e.g., "app-run") + start_date: Optional start date filter + end_date: Optional end date filter + timezone: Timezone for date grouping (default: "UTC") + + Returns: + List of dictionaries containing date and average interactions: + [{"date": "2024-01-01", "interactions": 2.5}, ...] + """ + ... diff --git a/api/repositories/sqlalchemy_api_workflow_run_repository.py b/api/repositories/sqlalchemy_api_workflow_run_repository.py index 68affb59f3..f08eab0b01 100644 --- a/api/repositories/sqlalchemy_api_workflow_run_repository.py +++ b/api/repositories/sqlalchemy_api_workflow_run_repository.py @@ -22,16 +22,25 @@ Implementation Notes: import logging from collections.abc import Sequence from datetime import datetime -from typing import cast +from decimal import Decimal +from typing import Any, cast +import sqlalchemy as sa from sqlalchemy import delete, func, select from sqlalchemy.engine import CursorResult from sqlalchemy.orm import Session, sessionmaker from libs.infinite_scroll_pagination import InfiniteScrollPagination from libs.time_parser import get_time_threshold +from models.enums import WorkflowRunTriggeredFrom from models.workflow import WorkflowRun from repositories.api_workflow_run_repository import APIWorkflowRunRepository +from repositories.types import ( + AverageInteractionStats, + DailyRunsStats, + DailyTerminalsStats, + DailyTokenCostStats, +) logger = logging.getLogger(__name__) @@ -61,7 +70,7 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository): self, tenant_id: str, app_id: str, - triggered_from: str, + triggered_from: WorkflowRunTriggeredFrom | Sequence[WorkflowRunTriggeredFrom], limit: int = 20, last_id: str | None = None, status: str | None = None, @@ -78,9 +87,14 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository): base_stmt = select(WorkflowRun).where( WorkflowRun.tenant_id == tenant_id, WorkflowRun.app_id == app_id, - WorkflowRun.triggered_from == triggered_from, ) + # Handle triggered_from values + if isinstance(triggered_from, WorkflowRunTriggeredFrom): + triggered_from = [triggered_from] + if triggered_from: + base_stmt = base_stmt.where(WorkflowRun.triggered_from.in_(triggered_from)) + # Add optional status filter if status: base_stmt = base_stmt.where(WorkflowRun.status == status) @@ -126,6 +140,17 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository): ) return session.scalar(stmt) + def get_workflow_run_by_id_without_tenant( + self, + run_id: str, + ) -> WorkflowRun | None: + """ + Get a specific workflow run by ID without tenant/app context. + """ + with self._session_maker() as session: + stmt = select(WorkflowRun).where(WorkflowRun.id == run_id) + return session.scalar(stmt) + def get_workflow_runs_count( self, tenant_id: str, @@ -275,3 +300,213 @@ class DifyAPISQLAlchemyWorkflowRunRepository(APIWorkflowRunRepository): logger.info("Total deleted %s workflow runs for app %s", total_deleted, app_id) return total_deleted + + def get_daily_runs_statistics( + self, + tenant_id: str, + app_id: str, + triggered_from: str, + start_date: datetime | None = None, + end_date: datetime | None = None, + timezone: str = "UTC", + ) -> list[DailyRunsStats]: + """ + Get daily runs statistics using raw SQL for optimal performance. + """ + sql_query = """SELECT + DATE(DATE_TRUNC('day', created_at AT TIME ZONE 'UTC' AT TIME ZONE :tz )) AS date, + COUNT(id) AS runs +FROM + workflow_runs +WHERE + tenant_id = :tenant_id + AND app_id = :app_id + AND triggered_from = :triggered_from""" + + arg_dict: dict[str, Any] = { + "tz": timezone, + "tenant_id": tenant_id, + "app_id": app_id, + "triggered_from": triggered_from, + } + + if start_date: + sql_query += " AND created_at >= :start_date" + arg_dict["start_date"] = start_date + + if end_date: + sql_query += " AND created_at < :end_date" + arg_dict["end_date"] = end_date + + sql_query += " GROUP BY date ORDER BY date" + + response_data = [] + with self._session_maker() as session: + rs = session.execute(sa.text(sql_query), arg_dict) + for row in rs: + response_data.append({"date": str(row.date), "runs": row.runs}) + + return cast(list[DailyRunsStats], response_data) + + def get_daily_terminals_statistics( + self, + tenant_id: str, + app_id: str, + triggered_from: str, + start_date: datetime | None = None, + end_date: datetime | None = None, + timezone: str = "UTC", + ) -> list[DailyTerminalsStats]: + """ + Get daily terminals statistics using raw SQL for optimal performance. + """ + sql_query = """SELECT + DATE(DATE_TRUNC('day', created_at AT TIME ZONE 'UTC' AT TIME ZONE :tz )) AS date, + COUNT(DISTINCT created_by) AS terminal_count +FROM + workflow_runs +WHERE + tenant_id = :tenant_id + AND app_id = :app_id + AND triggered_from = :triggered_from""" + + arg_dict: dict[str, Any] = { + "tz": timezone, + "tenant_id": tenant_id, + "app_id": app_id, + "triggered_from": triggered_from, + } + + if start_date: + sql_query += " AND created_at >= :start_date" + arg_dict["start_date"] = start_date + + if end_date: + sql_query += " AND created_at < :end_date" + arg_dict["end_date"] = end_date + + sql_query += " GROUP BY date ORDER BY date" + + response_data = [] + with self._session_maker() as session: + rs = session.execute(sa.text(sql_query), arg_dict) + for row in rs: + response_data.append({"date": str(row.date), "terminal_count": row.terminal_count}) + + return cast(list[DailyTerminalsStats], response_data) + + def get_daily_token_cost_statistics( + self, + tenant_id: str, + app_id: str, + triggered_from: str, + start_date: datetime | None = None, + end_date: datetime | None = None, + timezone: str = "UTC", + ) -> list[DailyTokenCostStats]: + """ + Get daily token cost statistics using raw SQL for optimal performance. + """ + sql_query = """SELECT + DATE(DATE_TRUNC('day', created_at AT TIME ZONE 'UTC' AT TIME ZONE :tz )) AS date, + SUM(total_tokens) AS token_count +FROM + workflow_runs +WHERE + tenant_id = :tenant_id + AND app_id = :app_id + AND triggered_from = :triggered_from""" + + arg_dict: dict[str, Any] = { + "tz": timezone, + "tenant_id": tenant_id, + "app_id": app_id, + "triggered_from": triggered_from, + } + + if start_date: + sql_query += " AND created_at >= :start_date" + arg_dict["start_date"] = start_date + + if end_date: + sql_query += " AND created_at < :end_date" + arg_dict["end_date"] = end_date + + sql_query += " GROUP BY date ORDER BY date" + + response_data = [] + with self._session_maker() as session: + rs = session.execute(sa.text(sql_query), arg_dict) + for row in rs: + response_data.append( + { + "date": str(row.date), + "token_count": row.token_count, + } + ) + + return cast(list[DailyTokenCostStats], response_data) + + def get_average_app_interaction_statistics( + self, + tenant_id: str, + app_id: str, + triggered_from: str, + start_date: datetime | None = None, + end_date: datetime | None = None, + timezone: str = "UTC", + ) -> list[AverageInteractionStats]: + """ + Get average app interaction statistics using raw SQL for optimal performance. + """ + sql_query = """SELECT + AVG(sub.interactions) AS interactions, + sub.date +FROM + ( + SELECT + DATE(DATE_TRUNC('day', c.created_at AT TIME ZONE 'UTC' AT TIME ZONE :tz )) AS date, + c.created_by, + COUNT(c.id) AS interactions + FROM + workflow_runs c + WHERE + c.tenant_id = :tenant_id + AND c.app_id = :app_id + AND c.triggered_from = :triggered_from + {{start}} + {{end}} + GROUP BY + date, c.created_by + ) sub +GROUP BY + sub.date""" + + arg_dict: dict[str, Any] = { + "tz": timezone, + "tenant_id": tenant_id, + "app_id": app_id, + "triggered_from": triggered_from, + } + + if start_date: + sql_query = sql_query.replace("{{start}}", " AND c.created_at >= :start_date") + arg_dict["start_date"] = start_date + else: + sql_query = sql_query.replace("{{start}}", "") + + if end_date: + sql_query = sql_query.replace("{{end}}", " AND c.created_at < :end_date") + arg_dict["end_date"] = end_date + else: + sql_query = sql_query.replace("{{end}}", "") + + response_data = [] + with self._session_maker() as session: + rs = session.execute(sa.text(sql_query), arg_dict) + for row in rs: + response_data.append( + {"date": str(row.date), "interactions": float(row.interactions.quantize(Decimal("0.01")))} + ) + + return cast(list[AverageInteractionStats], response_data) diff --git a/api/repositories/types.py b/api/repositories/types.py new file mode 100644 index 0000000000..3b3ef7f635 --- /dev/null +++ b/api/repositories/types.py @@ -0,0 +1,21 @@ +from typing import TypedDict + + +class DailyRunsStats(TypedDict): + date: str + runs: int + + +class DailyTerminalsStats(TypedDict): + date: str + terminal_count: int + + +class DailyTokenCostStats(TypedDict): + date: str + token_count: int + + +class AverageInteractionStats(TypedDict): + date: str + interactions: float diff --git a/api/services/rag_pipeline/rag_pipeline.py b/api/services/rag_pipeline/rag_pipeline.py index f6dddd75a3..50dec458a9 100644 --- a/api/services/rag_pipeline/rag_pipeline.py +++ b/api/services/rag_pipeline/rag_pipeline.py @@ -9,7 +9,7 @@ from typing import Any, Union, cast from uuid import uuid4 from flask_login import current_user -from sqlalchemy import func, or_, select +from sqlalchemy import func, select from sqlalchemy.orm import Session, sessionmaker import contexts @@ -94,6 +94,7 @@ class RagPipelineService: self._node_execution_service_repo = DifyAPIRepositoryFactory.create_api_workflow_node_execution_repository( session_maker ) + self._workflow_run_repo = DifyAPIRepositoryFactory.create_api_workflow_run_repository(session_maker) @classmethod def get_pipeline_templates(cls, type: str = "built-in", language: str = "en-US") -> dict: @@ -1015,48 +1016,21 @@ class RagPipelineService: :param args: request args """ limit = int(args.get("limit", 20)) + last_id = args.get("last_id") - base_query = db.session.query(WorkflowRun).where( - WorkflowRun.tenant_id == pipeline.tenant_id, - WorkflowRun.app_id == pipeline.id, - or_( - WorkflowRun.triggered_from == WorkflowRunTriggeredFrom.RAG_PIPELINE_RUN.value, - WorkflowRun.triggered_from == WorkflowRunTriggeredFrom.RAG_PIPELINE_DEBUGGING.value, - ), + triggered_from_values = [ + WorkflowRunTriggeredFrom.RAG_PIPELINE_RUN, + WorkflowRunTriggeredFrom.RAG_PIPELINE_DEBUGGING, + ] + + return self._workflow_run_repo.get_paginated_workflow_runs( + tenant_id=pipeline.tenant_id, + app_id=pipeline.id, + triggered_from=triggered_from_values, + limit=limit, + last_id=last_id, ) - if args.get("last_id"): - last_workflow_run = base_query.where( - WorkflowRun.id == args.get("last_id"), - ).first() - - if not last_workflow_run: - raise ValueError("Last workflow run not exists") - - workflow_runs = ( - base_query.where( - WorkflowRun.created_at < last_workflow_run.created_at, WorkflowRun.id != last_workflow_run.id - ) - .order_by(WorkflowRun.created_at.desc()) - .limit(limit) - .all() - ) - else: - workflow_runs = base_query.order_by(WorkflowRun.created_at.desc()).limit(limit).all() - - has_more = False - if len(workflow_runs) == limit: - current_page_first_workflow_run = workflow_runs[-1] - rest_count = base_query.where( - WorkflowRun.created_at < current_page_first_workflow_run.created_at, - WorkflowRun.id != current_page_first_workflow_run.id, - ).count() - - if rest_count > 0: - has_more = True - - return InfiniteScrollPagination(data=workflow_runs, limit=limit, has_more=has_more) - def get_rag_pipeline_workflow_run(self, pipeline: Pipeline, run_id: str) -> WorkflowRun | None: """ Get workflow run detail @@ -1064,18 +1038,12 @@ class RagPipelineService: :param app_model: app model :param run_id: workflow run id """ - workflow_run = ( - db.session.query(WorkflowRun) - .where( - WorkflowRun.tenant_id == pipeline.tenant_id, - WorkflowRun.app_id == pipeline.id, - WorkflowRun.id == run_id, - ) - .first() + return self._workflow_run_repo.get_workflow_run_by_id( + tenant_id=pipeline.tenant_id, + app_id=pipeline.id, + run_id=run_id, ) - return workflow_run - def get_rag_pipeline_workflow_run_node_executions( self, pipeline: Pipeline,