diff --git a/api/configs/feature/__init__.py b/api/configs/feature/__init__.py index bcedd32ae3..af33410a95 100644 --- a/api/configs/feature/__init__.py +++ b/api/configs/feature/__init__.py @@ -1020,6 +1020,28 @@ class CeleryScheduleTasksConfig(BaseSettings): default=0, ) + # Trigger provider refresh (simple version) + ENABLE_TRIGGER_PROVIDER_REFRESH_TASK: bool = Field( + description="Enable trigger provider refresh poller", + default=True, + ) + TRIGGER_PROVIDER_REFRESH_INTERVAL: int = Field( + description="Trigger provider refresh poller interval in minutes", + default=1, + ) + TRIGGER_PROVIDER_REFRESH_BATCH_SIZE: int = Field( + description="Max trigger subscriptions to process per tick", + default=200, + ) + TRIGGER_PROVIDER_CREDENTIAL_THRESHOLD_SECONDS: int = Field( + description="Proactive credential refresh threshold in seconds", + default=180, + ) + TRIGGER_PROVIDER_SUBSCRIPTION_THRESHOLD_SECONDS: int = Field( + description="Proactive subscription refresh threshold in seconds", + default=300, + ) + class PositionConfig(BaseSettings): POSITION_PROVIDER_PINS: str = Field( diff --git a/api/controllers/console/workspace/trigger_providers.py b/api/controllers/console/workspace/trigger_providers.py index 8afec9a94d..631b82979b 100644 --- a/api/controllers/console/workspace/trigger_providers.py +++ b/api/controllers/console/workspace/trigger_providers.py @@ -8,6 +8,7 @@ from werkzeug.exceptions import BadRequest, Forbidden from configs import dify_config from controllers.console import api from controllers.console.wraps import account_initialization_required, setup_required +from controllers.web.error import NotFoundError from core.model_runtime.utils.encoders import jsonable_encoder from core.plugin.entities.plugin_daemon import CredentialType from core.plugin.impl.oauth import OAuthHandler @@ -318,7 +319,7 @@ class TriggerOAuthAuthorizeApi(Resource): ) if oauth_client_params is None: - raise Forbidden("No OAuth client configuration found for this trigger provider") + raise NotFoundError("No OAuth client configuration found for this trigger provider") # Create subscription builder subscription_builder = TriggerSubscriptionBuilderService.create_trigger_subscription_builder( diff --git a/api/extensions/ext_celery.py b/api/extensions/ext_celery.py index ab73242eee..5cf4984709 100644 --- a/api/extensions/ext_celery.py +++ b/api/extensions/ext_celery.py @@ -166,6 +166,12 @@ def init_app(app: DifyApp) -> Celery: "task": "schedule.workflow_schedule_task.poll_workflow_schedules", "schedule": timedelta(minutes=dify_config.WORKFLOW_SCHEDULE_POLLER_INTERVAL), } + if dify_config.ENABLE_TRIGGER_PROVIDER_REFRESH_TASK: + imports.append("schedule.trigger_provider_refresh_task") + beat_schedule["trigger_provider_refresh"] = { + "task": "schedule.trigger_provider_refresh_task.trigger_provider_refresh", + "schedule": timedelta(minutes=dify_config.TRIGGER_PROVIDER_REFRESH_INTERVAL), + } celery_app.conf.update(beat_schedule=beat_schedule, imports=imports) return celery_app diff --git a/api/schedule/trigger_provider_refresh_task.py b/api/schedule/trigger_provider_refresh_task.py new file mode 100644 index 0000000000..f91a849a71 --- /dev/null +++ b/api/schedule/trigger_provider_refresh_task.py @@ -0,0 +1,21 @@ +import logging +import time + +import app + +logger = logging.getLogger(__name__) + + +def _now_ts() -> int: + return int(time.time()) + + +@app.celery.task(queue="trigger_refresh") +def trigger_provider_refresh() -> None: + """ + Simple trigger provider refresh task. + - Scans due trigger subscriptions in small batches + - Refreshes OAuth credentials if needed + - Refreshes subscription metadata if needed + """ + pass \ No newline at end of file diff --git a/api/services/trigger/trigger_provider_service.py b/api/services/trigger/trigger_provider_service.py index 5b6b18c37b..4cc19b485a 100644 --- a/api/services/trigger/trigger_provider_service.py +++ b/api/services/trigger/trigger_provider_service.py @@ -336,7 +336,7 @@ class TriggerProviderService: # Update credentials subscription.credentials = dict(encrypter.encrypt(dict(refreshed_credentials.credentials))) - subscription.expires_at = refreshed_credentials.expires_at + subscription.credential_expires_at = refreshed_credentials.expires_at session.commit() # Clear cache @@ -448,7 +448,9 @@ class TriggerProviderService: session.add(custom_client) # Update client params if provided - if client_params is not None: + if client_params is None: + custom_client.encrypted_oauth_params = json.dumps({}) + else: encrypter, cache = create_provider_encrypter( tenant_id=tenant_id, config=[x.to_basic_provider_config() for x in provider_controller.get_oauth_client_schema()],