mirror of https://github.com/langgenius/dify.git
Merge remote-tracking branch 'origin/feat/trigger' into feat/trigger
This commit is contained in:
commit
eb1686f04b
|
|
@ -1020,6 +1020,28 @@ class CeleryScheduleTasksConfig(BaseSettings):
|
|||
default=0,
|
||||
)
|
||||
|
||||
# Trigger provider refresh (simple version)
|
||||
ENABLE_TRIGGER_PROVIDER_REFRESH_TASK: bool = Field(
|
||||
description="Enable trigger provider refresh poller",
|
||||
default=True,
|
||||
)
|
||||
TRIGGER_PROVIDER_REFRESH_INTERVAL: int = Field(
|
||||
description="Trigger provider refresh poller interval in minutes",
|
||||
default=1,
|
||||
)
|
||||
TRIGGER_PROVIDER_REFRESH_BATCH_SIZE: int = Field(
|
||||
description="Max trigger subscriptions to process per tick",
|
||||
default=200,
|
||||
)
|
||||
TRIGGER_PROVIDER_CREDENTIAL_THRESHOLD_SECONDS: int = Field(
|
||||
description="Proactive credential refresh threshold in seconds",
|
||||
default=180,
|
||||
)
|
||||
TRIGGER_PROVIDER_SUBSCRIPTION_THRESHOLD_SECONDS: int = Field(
|
||||
description="Proactive subscription refresh threshold in seconds",
|
||||
default=300,
|
||||
)
|
||||
|
||||
|
||||
class PositionConfig(BaseSettings):
|
||||
POSITION_PROVIDER_PINS: str = Field(
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ from werkzeug.exceptions import BadRequest, Forbidden
|
|||
from configs import dify_config
|
||||
from controllers.console import api
|
||||
from controllers.console.wraps import account_initialization_required, setup_required
|
||||
from controllers.web.error import NotFoundError
|
||||
from core.model_runtime.utils.encoders import jsonable_encoder
|
||||
from core.plugin.entities.plugin_daemon import CredentialType
|
||||
from core.plugin.impl.oauth import OAuthHandler
|
||||
|
|
@ -318,7 +319,7 @@ class TriggerOAuthAuthorizeApi(Resource):
|
|||
)
|
||||
|
||||
if oauth_client_params is None:
|
||||
raise Forbidden("No OAuth client configuration found for this trigger provider")
|
||||
raise NotFoundError("No OAuth client configuration found for this trigger provider")
|
||||
|
||||
# Create subscription builder
|
||||
subscription_builder = TriggerSubscriptionBuilderService.create_trigger_subscription_builder(
|
||||
|
|
|
|||
|
|
@ -166,6 +166,12 @@ def init_app(app: DifyApp) -> Celery:
|
|||
"task": "schedule.workflow_schedule_task.poll_workflow_schedules",
|
||||
"schedule": timedelta(minutes=dify_config.WORKFLOW_SCHEDULE_POLLER_INTERVAL),
|
||||
}
|
||||
if dify_config.ENABLE_TRIGGER_PROVIDER_REFRESH_TASK:
|
||||
imports.append("schedule.trigger_provider_refresh_task")
|
||||
beat_schedule["trigger_provider_refresh"] = {
|
||||
"task": "schedule.trigger_provider_refresh_task.trigger_provider_refresh",
|
||||
"schedule": timedelta(minutes=dify_config.TRIGGER_PROVIDER_REFRESH_INTERVAL),
|
||||
}
|
||||
celery_app.conf.update(beat_schedule=beat_schedule, imports=imports)
|
||||
|
||||
return celery_app
|
||||
|
|
|
|||
|
|
@ -0,0 +1,21 @@
|
|||
import logging
|
||||
import time
|
||||
|
||||
import app
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _now_ts() -> int:
|
||||
return int(time.time())
|
||||
|
||||
|
||||
@app.celery.task(queue="trigger_refresh")
|
||||
def trigger_provider_refresh() -> None:
|
||||
"""
|
||||
Simple trigger provider refresh task.
|
||||
- Scans due trigger subscriptions in small batches
|
||||
- Refreshes OAuth credentials if needed
|
||||
- Refreshes subscription metadata if needed
|
||||
"""
|
||||
pass
|
||||
|
|
@ -336,7 +336,7 @@ class TriggerProviderService:
|
|||
|
||||
# Update credentials
|
||||
subscription.credentials = dict(encrypter.encrypt(dict(refreshed_credentials.credentials)))
|
||||
subscription.expires_at = refreshed_credentials.expires_at
|
||||
subscription.credential_expires_at = refreshed_credentials.expires_at
|
||||
session.commit()
|
||||
|
||||
# Clear cache
|
||||
|
|
@ -448,7 +448,9 @@ class TriggerProviderService:
|
|||
session.add(custom_client)
|
||||
|
||||
# Update client params if provided
|
||||
if client_params is not None:
|
||||
if client_params is None:
|
||||
custom_client.encrypted_oauth_params = json.dumps({})
|
||||
else:
|
||||
encrypter, cache = create_provider_encrypter(
|
||||
tenant_id=tenant_id,
|
||||
config=[x.to_basic_provider_config() for x in provider_controller.get_oauth_client_schema()],
|
||||
|
|
|
|||
Loading…
Reference in New Issue