diff --git a/api/controllers/console/workspace/plugin.py b/api/controllers/console/workspace/plugin.py index 6dd79a6d85..021dc96000 100644 --- a/api/controllers/console/workspace/plugin.py +++ b/api/controllers/console/workspace/plugin.py @@ -125,7 +125,7 @@ class PluginAssetApi(Resource): req.add_argument("file_name", type=str, required=True, location="args") args = req.parse_args() - current_user, tenant_id = current_account_with_tenant() + _, tenant_id = current_account_with_tenant() try: binary = PluginService.extract_asset(tenant_id, args["plugin_unique_identifier"], args["file_name"]) return send_file(io.BytesIO(binary), mimetype="application/octet-stream") @@ -715,7 +715,7 @@ class PluginReadmeApi(Resource): @login_required @account_initialization_required def get(self): - current_user, tenant_id = current_account_with_tenant() + _, tenant_id = current_account_with_tenant() parser = reqparse.RequestParser() parser.add_argument("plugin_unique_identifier", type=str, required=True, location="args") parser.add_argument("language", type=str, required=False, location="args") diff --git a/api/core/trigger/debug/event_selectors.py b/api/core/trigger/debug/event_selectors.py index f940d7f6b0..bd96157f89 100644 --- a/api/core/trigger/debug/event_selectors.py +++ b/api/core/trigger/debug/event_selectors.py @@ -137,7 +137,11 @@ class ScheduleTriggerDebugEventPoller(TriggerDebugEventPoller): ) if not schedule_event: return None - return TriggerDebugEvent(workflow_args=schedule_event.inputs, node_id=self.node_id) + workflow_args: Mapping[str, Any] = { + "inputs": schedule_event.inputs or {}, + "files": [], + } + return TriggerDebugEvent(workflow_args=workflow_args, node_id=self.node_id) def create_event_poller( diff --git a/api/services/workflow/entities.py b/api/services/workflow/entities.py index 30abde8329..bd04b3b150 100644 --- a/api/services/workflow/entities.py +++ b/api/services/workflow/entities.py @@ -58,12 +58,11 @@ class ScheduleTriggerData(TriggerData): class PluginTriggerMetadata(TriggerMetadata): """Plugin trigger metadata""" - plugin_id: str endpoint_id: str plugin_unique_identifier: str provider_id: str - icon_url: str - icon_dark_url: str + icon_filename: str + icon_dark_filename: str class PluginTriggerData(TriggerData): diff --git a/api/services/workflow_app_service.py b/api/services/workflow_app_service.py index 378562e590..71fbc338de 100644 --- a/api/services/workflow_app_service.py +++ b/api/services/workflow_app_service.py @@ -1,6 +1,7 @@ import json import uuid from datetime import datetime +from typing import Any from sqlalchemy import and_, func, or_, select from sqlalchemy.orm import Session @@ -9,6 +10,7 @@ from core.workflow.enums import WorkflowExecutionStatus from models import Account, App, EndUser, WorkflowAppLog, WorkflowRun from models.enums import CreatorUserRole from models.trigger import WorkflowTriggerLog +from services.plugin.plugin_service import PluginService # Since the workflow_app_log table has exceeded 100 million records, we use an additional details field to extend it @@ -23,22 +25,14 @@ class LogView: self.log = log self.details_ = details + @property + def details(self) -> dict | None: + return self.details_ + def __getattr__(self, name): return getattr(self.log, name) -# Helpers -def _safe_json_loads(val): - if not val: - return None - if isinstance(val, str): - try: - return json.loads(val) - except Exception: - return None - return val - - class WorkflowAppService: def get_paginate_workflow_app_logs( self, @@ -76,19 +70,15 @@ class WorkflowAppService: ) if detail: - # Correlated scalar subquery: fetch latest trigger_metadata per workflow_run_id - meta_expr = ( - select(WorkflowTriggerLog.trigger_metadata) - .where( - WorkflowTriggerLog.workflow_run_id == WorkflowAppLog.workflow_run_id, - WorkflowTriggerLog.app_id == app_model.id, + # Simple left join by workflow_run_id to fetch trigger_metadata + stmt = stmt.outerjoin( + WorkflowTriggerLog, + and_( WorkflowTriggerLog.tenant_id == app_model.tenant_id, - ) - .order_by(WorkflowTriggerLog.created_at.desc()) - .limit(1) - .scalar_subquery() - ) - stmt = stmt.add_columns(meta_expr) + WorkflowTriggerLog.app_id == app_model.id, + WorkflowTriggerLog.workflow_run_id == WorkflowAppLog.workflow_run_id, + ), + ).add_columns(WorkflowTriggerLog.trigger_metadata) if keyword or status: stmt = stmt.join(WorkflowRun, WorkflowRun.id == WorkflowAppLog.workflow_run_id) @@ -161,7 +151,10 @@ class WorkflowAppService: # Execute query and get items if detail: rows = session.execute(offset_stmt).all() - items = [LogView(log, {"trigger_metadata": _safe_json_loads(meta_val)}) for log, meta_val in rows] + items = [ + LogView(log, {"trigger_metadata": self.handle_trigger_metadata(app_model.tenant_id, meta_val)}) + for log, meta_val in rows + ] else: items = [LogView(log, None) for log in session.scalars(offset_stmt).all()] return { @@ -172,6 +165,30 @@ class WorkflowAppService: "data": items, } + def handle_trigger_metadata(self, tenant_id: str, meta_val: str) -> dict[str, Any]: + metadata: dict[str, Any] | None = self._safe_json_loads(meta_val) + if not metadata: + return {} + icon = metadata.get("icon_filename") + icon_dark = metadata.get("icon_dark_filename") + return { + "icon": PluginService.get_plugin_icon_url(tenant_id=tenant_id, filename=icon) if icon else None, + "icon_dark": PluginService.get_plugin_icon_url(tenant_id=tenant_id, filename=icon_dark) + if icon_dark + else None, + } + + @staticmethod + def _safe_json_loads(val): + if not val: + return None + if isinstance(val, str): + try: + return json.loads(val) + except Exception: + return None + return val + @staticmethod def _safe_parse_uuid(value: str): # fast check diff --git a/api/tasks/async_workflow_tasks.py b/api/tasks/async_workflow_tasks.py index c662b7c4a7..07edd96bc0 100644 --- a/api/tasks/async_workflow_tasks.py +++ b/api/tasks/async_workflow_tasks.py @@ -143,7 +143,7 @@ def _execute_workflow_common( invoke_from=InvokeFrom.SERVICE_API, streaming=False, call_depth=0, - triggered_from=trigger_data.trigger_type, + triggered_from=trigger_data.trigger_from, root_node_id=trigger_data.root_node_id, layers=[ TimeSliceLayer(cfs_plan_scheduler), diff --git a/api/tasks/trigger_processing_tasks.py b/api/tasks/trigger_processing_tasks.py index cb59d835ef..b9d24421e2 100644 --- a/api/tasks/trigger_processing_tasks.py +++ b/api/tasks/trigger_processing_tasks.py @@ -18,7 +18,7 @@ from core.plugin.entities.plugin_daemon import CredentialType from core.plugin.entities.request import TriggerInvokeEventResponse from core.trigger.debug.event_bus import TriggerDebugEventBus from core.trigger.debug.events import PluginTriggerDebugEvent, build_plugin_pool_key -from core.trigger.entities.api_entities import TriggerProviderApiEntity +from core.trigger.entities.entities import TriggerProviderEntity from core.trigger.provider import PluginTriggerProviderController from core.trigger.trigger_manager import TriggerManager from core.workflow.enums import NodeType @@ -139,7 +139,7 @@ def dispatch_triggered_workflow( provider_controller: PluginTriggerProviderController = TriggerManager.get_trigger_provider( tenant_id=subscription.tenant_id, provider_id=TriggerProviderID(subscription.provider_id) ) - trigger_entity: TriggerProviderApiEntity = provider_controller.to_api_entity() + trigger_entity: TriggerProviderEntity = provider_controller.entity with Session(db.engine) as session: workflows: Mapping[str, Workflow] = _get_latest_workflows_by_app_ids(session, subscribers) @@ -204,12 +204,11 @@ def dispatch_triggered_workflow( endpoint_id=subscription.endpoint_id, inputs=invoke_response.variables, trigger_metadata=PluginTriggerMetadata( - plugin_id=trigger_entity.plugin_id or "", - plugin_unique_identifier=trigger_entity.plugin_unique_identifier or "", + plugin_unique_identifier=provider_controller.plugin_unique_identifier or "", endpoint_id=subscription.endpoint_id, provider_id=subscription.provider_id, - icon_url=trigger_entity.icon or "", - icon_dark_url=trigger_entity.icon_dark or "", + icon_filename=trigger_entity.identity.icon or "", + icon_dark_filename=trigger_entity.identity.icon_dark or "", ), ) diff --git a/api/tasks/workflow_schedule_tasks.py b/api/tasks/workflow_schedule_tasks.py index 1599e249fd..3b6ea5dadf 100644 --- a/api/tasks/workflow_schedule_tasks.py +++ b/api/tasks/workflow_schedule_tasks.py @@ -1,7 +1,5 @@ import logging import time -from datetime import UTC, datetime -from zoneinfo import ZoneInfo from celery import shared_task from sqlalchemy.orm import sessionmaker @@ -47,10 +45,7 @@ def run_schedule_trigger(schedule_id: str) -> None: raise TenantOwnerNotFoundError(f"No owner or admin found for tenant {schedule.tenant_id}") try: - current_utc = datetime.now(UTC) - schedule_tz = ZoneInfo(schedule.timezone) if schedule.timezone else UTC - current_in_tz = current_utc.astimezone(schedule_tz) - inputs = {"current_time": current_in_tz.isoformat()} + inputs = {} # Production dispatch: Trigger the workflow normally response = AsyncWorkflowService.trigger_workflow_async(