mirror of https://github.com/langgenius/dify.git
refactor: improve trigger metadata handling and streamline workflow service
- Updated ScheduleTriggerDebugEventPoller to include an empty files list in workflow_args.
- Enhanced WorkflowAppService trigger metadata handling: added a handle_trigger_metadata method that resolves icon filenames to URLs, replaced the correlated scalar subquery with a simple left join, and moved the module-level _safe_json_loads helper onto the service as a static method.
- Adjusted PluginTriggerMetadata to store icon_filename and icon_dark_filename instead of icon_url and icon_dark_url for clarity.
- Changed the async workflow task call to pass trigger_data.trigger_from instead of trigger_data.trigger_type for consistency.
This commit is contained in:
parent 0d686fc6ae
commit c16421df27
@@ -125,7 +125,7 @@ class PluginAssetApi(Resource):
         req.add_argument("file_name", type=str, required=True, location="args")
         args = req.parse_args()
 
-        current_user, tenant_id = current_account_with_tenant()
+        _, tenant_id = current_account_with_tenant()
         try:
             binary = PluginService.extract_asset(tenant_id, args["plugin_unique_identifier"], args["file_name"])
             return send_file(io.BytesIO(binary), mimetype="application/octet-stream")
@@ -715,7 +715,7 @@ class PluginReadmeApi(Resource):
     @login_required
     @account_initialization_required
     def get(self):
-        current_user, tenant_id = current_account_with_tenant()
+        _, tenant_id = current_account_with_tenant()
         parser = reqparse.RequestParser()
         parser.add_argument("plugin_unique_identifier", type=str, required=True, location="args")
         parser.add_argument("language", type=str, required=False, location="args")
@@ -137,7 +137,11 @@ class ScheduleTriggerDebugEventPoller(TriggerDebugEventPoller):
         )
         if not schedule_event:
             return None
-        return TriggerDebugEvent(workflow_args=schedule_event.inputs, node_id=self.node_id)
+        workflow_args: Mapping[str, Any] = {
+            "inputs": schedule_event.inputs or {},
+            "files": [],
+        }
+        return TriggerDebugEvent(workflow_args=workflow_args, node_id=self.node_id)
 
 
 def create_event_poller(
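For orientation, the debug poller now returns workflow_args with both an "inputs" and a "files" key, matching the shape the workflow runner consumes. The snippet below is a hypothetical, standalone sketch (the helper name is invented, not part of this commit) of how a consumer might unpack that mapping:

# Hypothetical, standalone sketch -- not code from this commit.
from typing import Any, Mapping


def unpack_workflow_args(workflow_args: Mapping[str, Any]) -> tuple[dict[str, Any], list[Any]]:
    # After this change both keys are always present; .get() keeps the helper defensive anyway.
    inputs = dict(workflow_args.get("inputs") or {})
    files = list(workflow_args.get("files") or [])
    return inputs, files


if __name__ == "__main__":
    print(unpack_workflow_args({"inputs": {"current_time": "2024-01-01T00:00:00"}, "files": []}))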
@@ -58,12 +58,11 @@ class ScheduleTriggerData(TriggerData):
 class PluginTriggerMetadata(TriggerMetadata):
     """Plugin trigger metadata"""
 
     plugin_id: str
     endpoint_id: str
     plugin_unique_identifier: str
     provider_id: str
-    icon_url: str
-    icon_dark_url: str
+    icon_filename: str
+    icon_dark_filename: str
 
 
 class PluginTriggerData(TriggerData):
@@ -1,6 +1,7 @@
 import json
 import uuid
 from datetime import datetime
+from typing import Any
 
 from sqlalchemy import and_, func, or_, select
 from sqlalchemy.orm import Session
@@ -9,6 +10,7 @@ from core.workflow.enums import WorkflowExecutionStatus
 from models import Account, App, EndUser, WorkflowAppLog, WorkflowRun
 from models.enums import CreatorUserRole
 from models.trigger import WorkflowTriggerLog
+from services.plugin.plugin_service import PluginService
 
 
 # Since the workflow_app_log table has exceeded 100 million records, we use an additional details field to extend it
@@ -23,22 +25,14 @@ class LogView:
         self.log = log
         self.details_ = details
 
     @property
     def details(self) -> dict | None:
         return self.details_
 
     def __getattr__(self, name):
         return getattr(self.log, name)
 
 
-# Helpers
-def _safe_json_loads(val):
-    if not val:
-        return None
-    if isinstance(val, str):
-        try:
-            return json.loads(val)
-        except Exception:
-            return None
-    return val
-
-
 class WorkflowAppService:
     def get_paginate_workflow_app_logs(
         self,
@@ -76,19 +70,15 @@ class WorkflowAppService:
         )
 
         if detail:
-            # Correlated scalar subquery: fetch latest trigger_metadata per workflow_run_id
-            meta_expr = (
-                select(WorkflowTriggerLog.trigger_metadata)
-                .where(
-                    WorkflowTriggerLog.workflow_run_id == WorkflowAppLog.workflow_run_id,
-                    WorkflowTriggerLog.app_id == app_model.id,
+            # Simple left join by workflow_run_id to fetch trigger_metadata
+            stmt = stmt.outerjoin(
+                WorkflowTriggerLog,
+                and_(
                     WorkflowTriggerLog.tenant_id == app_model.tenant_id,
-                )
-                .order_by(WorkflowTriggerLog.created_at.desc())
-                .limit(1)
-                .scalar_subquery()
-            )
-            stmt = stmt.add_columns(meta_expr)
+                    WorkflowTriggerLog.app_id == app_model.id,
+                    WorkflowTriggerLog.workflow_run_id == WorkflowAppLog.workflow_run_id,
+                ),
+            ).add_columns(WorkflowTriggerLog.trigger_metadata)
 
         if keyword or status:
             stmt = stmt.join(WorkflowRun, WorkflowRun.id == WorkflowAppLog.workflow_run_id)
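To illustrate the query shape after this change, here is a minimal, self-contained SQLAlchemy sketch using toy models (not the Dify schema): a LEFT OUTER JOIN plus add_columns yields (log, trigger_metadata) row tuples, with trigger_metadata None when no trigger log exists for a run.

# Toy models for illustration only; the real code joins WorkflowAppLog and WorkflowTriggerLog.
from sqlalchemy import Column, String, and_, create_engine, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class AppLog(Base):
    __tablename__ = "app_log"
    id = Column(String, primary_key=True)
    workflow_run_id = Column(String)


class TriggerLog(Base):
    __tablename__ = "trigger_log"
    id = Column(String, primary_key=True)
    workflow_run_id = Column(String)
    trigger_metadata = Column(String)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all(
        [
            AppLog(id="l1", workflow_run_id="r1"),
            AppLog(id="l2", workflow_run_id="r2"),
            TriggerLog(id="t1", workflow_run_id="r1", trigger_metadata='{"icon_filename": "icon.svg"}'),
        ]
    )
    session.commit()

    # Same pattern as the new code: outerjoin + add_columns instead of a correlated scalar subquery.
    stmt = (
        select(AppLog)
        .outerjoin(TriggerLog, and_(TriggerLog.workflow_run_id == AppLog.workflow_run_id))
        .add_columns(TriggerLog.trigger_metadata)
    )
    for log, meta in session.execute(stmt).all():
        print(log.id, meta)  # l1 -> JSON string, l2 -> None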
@@ -161,7 +151,10 @@ class WorkflowAppService:
         # Execute query and get items
         if detail:
             rows = session.execute(offset_stmt).all()
-            items = [LogView(log, {"trigger_metadata": _safe_json_loads(meta_val)}) for log, meta_val in rows]
+            items = [
+                LogView(log, {"trigger_metadata": self.handle_trigger_metadata(app_model.tenant_id, meta_val)})
+                for log, meta_val in rows
+            ]
         else:
             items = [LogView(log, None) for log in session.scalars(offset_stmt).all()]
         return {
@@ -172,6 +165,30 @@ class WorkflowAppService:
             "data": items,
         }
 
+    def handle_trigger_metadata(self, tenant_id: str, meta_val: str) -> dict[str, Any]:
+        metadata: dict[str, Any] | None = self._safe_json_loads(meta_val)
+        if not metadata:
+            return {}
+        icon = metadata.get("icon_filename")
+        icon_dark = metadata.get("icon_dark_filename")
+        return {
+            "icon": PluginService.get_plugin_icon_url(tenant_id=tenant_id, filename=icon) if icon else None,
+            "icon_dark": PluginService.get_plugin_icon_url(tenant_id=tenant_id, filename=icon_dark)
+            if icon_dark
+            else None,
+        }
+
+    @staticmethod
+    def _safe_json_loads(val):
+        if not val:
+            return None
+        if isinstance(val, str):
+            try:
+                return json.loads(val)
+            except Exception:
+                return None
+        return val
+
     @staticmethod
     def _safe_parse_uuid(value: str):
         # fast check
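As a rough, self-contained illustration of the new parse-then-resolve flow in handle_trigger_metadata: the stored trigger_metadata JSON is parsed tolerantly, then icon_filename and icon_dark_filename are mapped to display URLs. The URL builder below is a stub with a placeholder URL shape; the real code calls PluginService.get_plugin_icon_url.

# Illustrative sketch only -- not the service code from this commit.
import json
from typing import Any


def build_icon_url(tenant_id: str, filename: str) -> str:
    # Placeholder URL for the sketch; the real URL comes from PluginService.get_plugin_icon_url.
    return f"https://example.invalid/plugins/{tenant_id}/assets/{filename}"


def resolve_trigger_icons(tenant_id: str, meta_val: str | None) -> dict[str, Any]:
    # Tolerate missing or invalid JSON, then map stored filenames to URLs.
    try:
        metadata = json.loads(meta_val) if meta_val else None
    except Exception:
        metadata = None
    if not metadata:
        return {}
    icon = metadata.get("icon_filename")
    icon_dark = metadata.get("icon_dark_filename")
    return {
        "icon": build_icon_url(tenant_id, icon) if icon else None,
        "icon_dark": build_icon_url(tenant_id, icon_dark) if icon_dark else None,
    }


print(resolve_trigger_icons("tenant-1", '{"icon_filename": "icon.svg"}'))
# -> {'icon': 'https://example.invalid/plugins/tenant-1/assets/icon.svg', 'icon_dark': None}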
@@ -143,7 +143,7 @@ def _execute_workflow_common(
         invoke_from=InvokeFrom.SERVICE_API,
         streaming=False,
         call_depth=0,
-        triggered_from=trigger_data.trigger_type,
+        triggered_from=trigger_data.trigger_from,
         root_node_id=trigger_data.root_node_id,
         layers=[
             TimeSliceLayer(cfs_plan_scheduler),
@@ -18,7 +18,7 @@ from core.plugin.entities.plugin_daemon import CredentialType
 from core.plugin.entities.request import TriggerInvokeEventResponse
 from core.trigger.debug.event_bus import TriggerDebugEventBus
 from core.trigger.debug.events import PluginTriggerDebugEvent, build_plugin_pool_key
-from core.trigger.entities.api_entities import TriggerProviderApiEntity
+from core.trigger.entities.entities import TriggerProviderEntity
 from core.trigger.provider import PluginTriggerProviderController
 from core.trigger.trigger_manager import TriggerManager
 from core.workflow.enums import NodeType
@@ -139,7 +139,7 @@ def dispatch_triggered_workflow(
     provider_controller: PluginTriggerProviderController = TriggerManager.get_trigger_provider(
         tenant_id=subscription.tenant_id, provider_id=TriggerProviderID(subscription.provider_id)
     )
-    trigger_entity: TriggerProviderApiEntity = provider_controller.to_api_entity()
+    trigger_entity: TriggerProviderEntity = provider_controller.entity
     with Session(db.engine) as session:
         workflows: Mapping[str, Workflow] = _get_latest_workflows_by_app_ids(session, subscribers)
 
@@ -204,12 +204,11 @@ def dispatch_triggered_workflow(
                 endpoint_id=subscription.endpoint_id,
                 inputs=invoke_response.variables,
                 trigger_metadata=PluginTriggerMetadata(
                     plugin_id=trigger_entity.plugin_id or "",
-                    plugin_unique_identifier=trigger_entity.plugin_unique_identifier or "",
+                    plugin_unique_identifier=provider_controller.plugin_unique_identifier or "",
                     endpoint_id=subscription.endpoint_id,
                     provider_id=subscription.provider_id,
-                    icon_url=trigger_entity.icon or "",
-                    icon_dark_url=trigger_entity.icon_dark or "",
+                    icon_filename=trigger_entity.identity.icon or "",
+                    icon_dark_filename=trigger_entity.identity.icon_dark or "",
                 ),
             )
@@ -1,7 +1,5 @@
 import logging
 import time
-from datetime import UTC, datetime
-from zoneinfo import ZoneInfo
 
 from celery import shared_task
 from sqlalchemy.orm import sessionmaker
@@ -47,10 +45,7 @@ def run_schedule_trigger(schedule_id: str) -> None:
         raise TenantOwnerNotFoundError(f"No owner or admin found for tenant {schedule.tenant_id}")
 
     try:
-        current_utc = datetime.now(UTC)
-        schedule_tz = ZoneInfo(schedule.timezone) if schedule.timezone else UTC
-        current_in_tz = current_utc.astimezone(schedule_tz)
-        inputs = {"current_time": current_in_tz.isoformat()}
+        inputs = {}
 
         # Production dispatch: Trigger the workflow normally
         response = AsyncWorkflowService.trigger_workflow_async(