dify/api/services/workflow_app_service.py

240 lines
9.0 KiB
Python

import json
import uuid
from datetime import datetime
from typing import Any
from sqlalchemy import and_, func, or_, select
from sqlalchemy.orm import Session
from core.trigger.trigger_manager import TriggerManager
from core.workflow.enums import WorkflowExecutionStatus
from models import Account, App, EndUser, WorkflowAppLog, WorkflowRun
from models.enums import AppTriggerType, CreatorUserRole
from models.provider_ids import TriggerProviderID
from models.trigger import WorkflowPluginTrigger, WorkflowTriggerLog
class WorkflowAppService:
def get_paginate_workflow_app_logs(
self,
*,
session: Session,
app_model: App,
keyword: str | None = None,
status: WorkflowExecutionStatus | None = None,
created_at_before: datetime | None = None,
created_at_after: datetime | None = None,
page: int = 1,
limit: int = 20,
created_by_end_user_session_id: str | None = None,
created_by_account: str | None = None,
):
"""
Get paginate workflow app logs using SQLAlchemy 2.0 style
:param session: SQLAlchemy session
:param app_model: app model
:param keyword: search keyword
:param status: filter by status
:param created_at_before: filter logs created before this timestamp
:param created_at_after: filter logs created after this timestamp
:param page: page number
:param limit: items per page
:param created_by_end_user_session_id: filter by end user session id
:param created_by_account: filter by account email
:return: Pagination object
"""
# Build base statement using SQLAlchemy 2.0 style
stmt = select(WorkflowAppLog).where(
WorkflowAppLog.tenant_id == app_model.tenant_id, WorkflowAppLog.app_id == app_model.id
)
if keyword or status:
stmt = stmt.join(WorkflowRun, WorkflowRun.id == WorkflowAppLog.workflow_run_id)
if keyword:
keyword_like_val = f"%{keyword[:30].encode('unicode_escape').decode('utf-8')}%".replace(r"\u", r"\\u")
keyword_conditions = [
WorkflowRun.inputs.ilike(keyword_like_val),
WorkflowRun.outputs.ilike(keyword_like_val),
# filter keyword by end user session id if created by end user role
and_(WorkflowRun.created_by_role == "end_user", EndUser.session_id.ilike(keyword_like_val)),
]
# filter keyword by workflow run id
keyword_uuid = self._safe_parse_uuid(keyword)
if keyword_uuid:
keyword_conditions.append(WorkflowRun.id == keyword_uuid)
stmt = stmt.outerjoin(
EndUser,
and_(WorkflowRun.created_by == EndUser.id, WorkflowRun.created_by_role == CreatorUserRole.END_USER),
).where(or_(*keyword_conditions))
if status:
stmt = stmt.where(WorkflowRun.status == status)
# Add time-based filtering
if created_at_before:
stmt = stmt.where(WorkflowAppLog.created_at <= created_at_before)
if created_at_after:
stmt = stmt.where(WorkflowAppLog.created_at >= created_at_after)
# Filter by end user session id or account email
if created_by_end_user_session_id:
stmt = stmt.join(
EndUser,
and_(
WorkflowAppLog.created_by == EndUser.id,
WorkflowAppLog.created_by_role == CreatorUserRole.END_USER,
EndUser.session_id == created_by_end_user_session_id,
),
)
if created_by_account:
account = session.scalar(select(Account).where(Account.email == created_by_account))
if not account:
raise ValueError(f"Account not found: {created_by_account}")
stmt = stmt.join(
Account,
and_(
WorkflowAppLog.created_by == Account.id,
WorkflowAppLog.created_by_role == CreatorUserRole.ACCOUNT,
Account.id == account.id,
),
)
stmt = stmt.order_by(WorkflowAppLog.created_at.desc())
# Get total count using the same filters
count_stmt = select(func.count()).select_from(stmt.subquery())
total = session.scalar(count_stmt) or 0
# Apply pagination limits
offset_stmt = stmt.offset((page - 1) * limit).limit(limit)
# Execute query and get items
items = list(session.scalars(offset_stmt).all())
trigger_info_map = self._build_trigger_info_map(session, app_model, items)
for log in items:
log.trigger_info = trigger_info_map.get(log.workflow_run_id)
return {
"page": page,
"limit": limit,
"total": total,
"has_more": total > page * limit,
"data": items,
}
@staticmethod
def _safe_parse_uuid(value: str):
# fast check
if len(value) < 32:
return None
try:
return uuid.UUID(value)
except ValueError:
return None
def _build_trigger_info_map(self, session: Session, app_model: App, logs: list[WorkflowAppLog]) -> dict[str, dict]:
run_ids = [log.workflow_run_id for log in logs if log.workflow_run_id]
if not run_ids:
return {}
trigger_logs = (
session.execute(select(WorkflowTriggerLog).where(WorkflowTriggerLog.workflow_run_id.in_(run_ids)))
.scalars()
.all()
)
if not trigger_logs:
return {}
trigger_data_map: dict[str, dict] = {}
node_ids: set[str] = set()
for trigger_log in trigger_logs:
if not trigger_log.workflow_run_id:
continue
try:
trigger_data = json.loads(trigger_log.trigger_data)
except json.JSONDecodeError:
trigger_data = {}
node_id = trigger_data.get("root_node_id")
if node_id:
node_ids.add(node_id)
trigger_data_map[trigger_log.workflow_run_id] = {
"log": trigger_log,
"node_id": node_id,
}
plugin_trigger_map: dict[str, WorkflowPluginTrigger] = {}
if node_ids:
plugin_triggers = (
session.execute(
select(WorkflowPluginTrigger).where(
WorkflowPluginTrigger.app_id == app_model.id,
WorkflowPluginTrigger.node_id.in_(node_ids),
)
)
.scalars()
.all()
)
plugin_trigger_map = {plugin.node_id: plugin for plugin in plugin_triggers}
provider_cache: dict[str, dict[str, Any]] = {}
def resolve_provider(provider_id: str) -> dict[str, Any]:
if provider_id in provider_cache:
return provider_cache[provider_id]
metadata: dict[str, Any] = {}
try:
controller = TriggerManager.get_trigger_provider(app_model.tenant_id, TriggerProviderID(provider_id))
api_entity = controller.to_api_entity()
metadata = {
"provider_name": api_entity.name,
"provider_label": api_entity.label.to_dict(),
"icon": api_entity.icon or "",
"plugin_id": controller.plugin_id,
"plugin_unique_identifier": controller.plugin_unique_identifier,
}
except Exception:
metadata = {}
provider_cache[provider_id] = metadata
return metadata
trigger_info_map: dict[str, dict] = {}
for run_id, context in trigger_data_map.items():
trigger_log = context["log"]
if isinstance(trigger_log.trigger_type, AppTriggerType):
trigger_type_value = trigger_log.trigger_type.value
else:
trigger_type_value = trigger_log.trigger_type
info = {
"type": trigger_type_value,
"node_id": context["node_id"],
"workflow_trigger_log_id": trigger_log.id,
}
if (
trigger_log.trigger_type == AppTriggerType.TRIGGER_PLUGIN # type: ignore[comparison-overlap]
and context["node_id"]
):
plugin_trigger = plugin_trigger_map.get(context["node_id"])
if plugin_trigger:
info.update(
{
"provider_id": plugin_trigger.provider_id,
"subscription_id": plugin_trigger.subscription_id,
"event_name": plugin_trigger.event_name,
}
)
provider_metadata = resolve_provider(plugin_trigger.provider_id)
if provider_metadata:
info.update(provider_metadata)
trigger_info_map[run_id] = info
return trigger_info_map