mirror of https://github.com/langgenius/dify.git
fix: cycle imports
This commit is contained in:
parent
8bc5035624
commit
6260a1a28c
|
|
@ -1072,7 +1072,9 @@ class DraftWorkflowTriggerNodeApi(Resource):
|
|||
return jsonable_encoder(node_execution)
|
||||
except Exception as e:
|
||||
logger.exception("Error running draft workflow trigger node")
|
||||
return jsonable_encoder({"status": "error", "error": "An unexpected error occurred while running the node."}), 500
|
||||
return jsonable_encoder(
|
||||
{"status": "error", "error": "An unexpected error occurred while running the node."}
|
||||
), 500
|
||||
|
||||
|
||||
@console_ns.route("/apps/<uuid:app_id>/workflows/draft/trigger/run-all")
|
||||
|
|
|
|||
|
|
@ -20,8 +20,8 @@ from models.account import Account
|
|||
from models.provider_ids import TriggerProviderID
|
||||
from services.plugin.oauth_service import OAuthProxyService
|
||||
from services.trigger.trigger_provider_service import TriggerProviderService
|
||||
from services.trigger.trigger_service import TriggerService
|
||||
from services.trigger.trigger_subscription_builder_service import TriggerSubscriptionBuilderService
|
||||
from services.trigger.trigger_subscription_operator_service import TriggerSubscriptionOperatorService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
@ -265,7 +265,7 @@ class TriggerSubscriptionDeleteApi(Resource):
|
|||
@setup_required
|
||||
@login_required
|
||||
@account_initialization_required
|
||||
def post(self, subscription_id):
|
||||
def post(self, subscription_id: str):
|
||||
"""Delete a subscription instance"""
|
||||
user = current_user
|
||||
assert isinstance(user, Account)
|
||||
|
|
@ -282,7 +282,7 @@ class TriggerSubscriptionDeleteApi(Resource):
|
|||
subscription_id=subscription_id,
|
||||
)
|
||||
# Delete plugin triggers
|
||||
TriggerService.delete_plugin_trigger_by_subscription(
|
||||
TriggerSubscriptionOperatorService.delete_plugin_trigger_by_subscription(
|
||||
session=session,
|
||||
tenant_id=user.current_tenant_id,
|
||||
subscription_id=subscription_id,
|
||||
|
|
|
|||
|
|
@ -14,6 +14,7 @@ from libs.schedule_utils import calculate_next_run_at, convert_12h_to_24h
|
|||
from models.account import Account, TenantAccountJoin
|
||||
from models.trigger import WorkflowSchedulePlan
|
||||
from models.workflow import Workflow
|
||||
from services.errors.account import AccountNotFoundError
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
@ -124,7 +125,7 @@ class ScheduleService:
|
|||
session.flush()
|
||||
|
||||
@staticmethod
|
||||
def get_tenant_owner(session: Session, tenant_id: str) -> Optional[Account]:
|
||||
def get_tenant_owner(session: Session, tenant_id: str) -> Account:
|
||||
"""
|
||||
Returns an account to execute scheduled workflows on behalf of the tenant.
|
||||
Prioritizes owner over admin to ensure proper authorization hierarchy.
|
||||
|
|
@ -144,7 +145,12 @@ class ScheduleService:
|
|||
).scalar_one_or_none()
|
||||
|
||||
if result:
|
||||
return session.get(Account, result.account_id)
|
||||
account = session.get(Account, result.account_id)
|
||||
if not account:
|
||||
raise AccountNotFoundError(f"Account not found: {result.account_id}")
|
||||
return account
|
||||
else:
|
||||
raise AccountNotFoundError(f"Account not found for tenant: {tenant_id}")
|
||||
|
||||
@staticmethod
|
||||
def update_next_run_at(
|
||||
|
|
|
|||
|
|
@ -6,7 +6,7 @@ from typing import Any
|
|||
|
||||
from flask import Request, Response
|
||||
from pydantic import BaseModel
|
||||
from sqlalchemy import and_, select
|
||||
from sqlalchemy import select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from core.plugin.entities.plugin_daemon import CredentialType
|
||||
|
|
@ -22,7 +22,7 @@ from extensions.ext_database import db
|
|||
from extensions.ext_redis import redis_client
|
||||
from models.model import App
|
||||
from models.provider_ids import TriggerProviderID
|
||||
from models.trigger import AppTrigger, AppTriggerStatus, TriggerSubscription, WorkflowPluginTrigger
|
||||
from models.trigger import TriggerSubscription, WorkflowPluginTrigger
|
||||
from models.workflow import Workflow
|
||||
from services.trigger.trigger_provider_service import TriggerProviderService
|
||||
from services.trigger.trigger_request_service import TriggerHttpRequestCachingService
|
||||
|
|
@ -148,68 +148,6 @@ class TriggerService:
|
|||
)
|
||||
return dispatch_response.response
|
||||
|
||||
@classmethod
|
||||
def get_subscriber_triggers(
|
||||
cls, tenant_id: str, subscription_id: str, event_name: str
|
||||
) -> list[WorkflowPluginTrigger]:
|
||||
"""
|
||||
Get WorkflowPluginTriggers for a subscription and trigger.
|
||||
|
||||
Args:
|
||||
tenant_id: Tenant ID
|
||||
subscription_id: Subscription ID
|
||||
event_name: Event name
|
||||
"""
|
||||
with Session(db.engine, expire_on_commit=False) as session:
|
||||
subscribers = session.scalars(
|
||||
select(WorkflowPluginTrigger)
|
||||
.join(
|
||||
AppTrigger,
|
||||
and_(
|
||||
AppTrigger.tenant_id == WorkflowPluginTrigger.tenant_id,
|
||||
AppTrigger.app_id == WorkflowPluginTrigger.app_id,
|
||||
AppTrigger.node_id == WorkflowPluginTrigger.node_id,
|
||||
),
|
||||
)
|
||||
.where(
|
||||
WorkflowPluginTrigger.tenant_id == tenant_id,
|
||||
WorkflowPluginTrigger.subscription_id == subscription_id,
|
||||
WorkflowPluginTrigger.event_name == event_name,
|
||||
AppTrigger.status == AppTriggerStatus.ENABLED,
|
||||
)
|
||||
).all()
|
||||
return list(subscribers)
|
||||
|
||||
@classmethod
|
||||
def delete_plugin_trigger_by_subscription(
|
||||
cls,
|
||||
session: Session,
|
||||
tenant_id: str,
|
||||
subscription_id: str,
|
||||
) -> None:
|
||||
"""Delete a plugin trigger by tenant_id and subscription_id within an existing session
|
||||
|
||||
Args:
|
||||
session: Database session
|
||||
tenant_id: The tenant ID
|
||||
subscription_id: The subscription ID
|
||||
|
||||
Raises:
|
||||
NotFound: If plugin trigger not found
|
||||
"""
|
||||
# Find plugin trigger using indexed columns
|
||||
plugin_trigger = session.scalar(
|
||||
select(WorkflowPluginTrigger).where(
|
||||
WorkflowPluginTrigger.tenant_id == tenant_id,
|
||||
WorkflowPluginTrigger.subscription_id == subscription_id,
|
||||
)
|
||||
)
|
||||
|
||||
if not plugin_trigger:
|
||||
return
|
||||
|
||||
session.delete(plugin_trigger)
|
||||
|
||||
@classmethod
|
||||
def sync_plugin_trigger_relationships(cls, app: App, workflow: Workflow):
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -0,0 +1,70 @@
|
|||
from sqlalchemy import and_, select
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from extensions.ext_database import db
|
||||
from models.enums import AppTriggerStatus
|
||||
from models.trigger import AppTrigger, WorkflowPluginTrigger
|
||||
|
||||
|
||||
class TriggerSubscriptionOperatorService:
|
||||
@classmethod
|
||||
def get_subscriber_triggers(
|
||||
cls, tenant_id: str, subscription_id: str, event_name: str
|
||||
) -> list[WorkflowPluginTrigger]:
|
||||
"""
|
||||
Get WorkflowPluginTriggers for a subscription and trigger.
|
||||
|
||||
Args:
|
||||
tenant_id: Tenant ID
|
||||
subscription_id: Subscription ID
|
||||
event_name: Event name
|
||||
"""
|
||||
with Session(db.engine, expire_on_commit=False) as session:
|
||||
subscribers = session.scalars(
|
||||
select(WorkflowPluginTrigger)
|
||||
.join(
|
||||
AppTrigger,
|
||||
and_(
|
||||
AppTrigger.tenant_id == WorkflowPluginTrigger.tenant_id,
|
||||
AppTrigger.app_id == WorkflowPluginTrigger.app_id,
|
||||
AppTrigger.node_id == WorkflowPluginTrigger.node_id,
|
||||
),
|
||||
)
|
||||
.where(
|
||||
WorkflowPluginTrigger.tenant_id == tenant_id,
|
||||
WorkflowPluginTrigger.subscription_id == subscription_id,
|
||||
WorkflowPluginTrigger.event_name == event_name,
|
||||
AppTrigger.status == AppTriggerStatus.ENABLED,
|
||||
)
|
||||
).all()
|
||||
return list(subscribers)
|
||||
|
||||
@classmethod
|
||||
def delete_plugin_trigger_by_subscription(
|
||||
cls,
|
||||
session: Session,
|
||||
tenant_id: str,
|
||||
subscription_id: str,
|
||||
) -> None:
|
||||
"""Delete a plugin trigger by tenant_id and subscription_id within an existing session
|
||||
|
||||
Args:
|
||||
session: Database session
|
||||
tenant_id: The tenant ID
|
||||
subscription_id: The subscription ID
|
||||
|
||||
Raises:
|
||||
NotFound: If plugin trigger not found
|
||||
"""
|
||||
# Find plugin trigger using indexed columns
|
||||
plugin_trigger = session.scalar(
|
||||
select(WorkflowPluginTrigger).where(
|
||||
WorkflowPluginTrigger.tenant_id == tenant_id,
|
||||
WorkflowPluginTrigger.subscription_id == subscription_id,
|
||||
)
|
||||
)
|
||||
|
||||
if not plugin_trigger:
|
||||
return
|
||||
|
||||
session.delete(plugin_trigger)
|
||||
|
|
@ -32,6 +32,7 @@ from services.async_workflow_service import AsyncWorkflowService
|
|||
from services.end_user_service import EndUserService
|
||||
from services.trigger.trigger_provider_service import TriggerProviderService
|
||||
from services.trigger.trigger_request_service import TriggerHttpRequestCachingService
|
||||
from services.trigger.trigger_subscription_operator_service import TriggerSubscriptionOperatorService
|
||||
from services.workflow.entities import PluginTriggerData, PluginTriggerDispatchData, PluginTriggerMetadata
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -121,10 +122,7 @@ def dispatch_triggered_workflow(
|
|||
request = TriggerHttpRequestCachingService.get_request(request_id)
|
||||
payload = TriggerHttpRequestCachingService.get_payload(request_id)
|
||||
|
||||
from services.trigger.trigger_service import TriggerService
|
||||
# FIXME: we should avoid import modules inside methods
|
||||
|
||||
subscribers: list[WorkflowPluginTrigger] = TriggerService.get_subscriber_triggers(
|
||||
subscribers: list[WorkflowPluginTrigger] = TriggerSubscriptionOperatorService.get_subscriber_triggers(
|
||||
tenant_id=subscription.tenant_id, subscription_id=subscription.id, event_name=event_name
|
||||
)
|
||||
if not subscribers:
|
||||
|
|
|
|||
Loading…
Reference in New Issue