From 772ff636ec92b5b35635769a17ab0e49c3576df9 Mon Sep 17 00:00:00 2001 From: Xiyuan Chen <52963600+GareArc@users.noreply.github.com> Date: Wed, 14 Jan 2026 23:33:24 -0800 Subject: [PATCH] feat: credential sync fix for enterprise edition (#30626) --- api/events/event_handlers/__init__.py | 2 + ...eue_credential_sync_when_tenant_created.py | 19 ++++++ api/services/enterprise/workspace_sync.py | 58 +++++++++++++++++++ 3 files changed, 79 insertions(+) create mode 100644 api/events/event_handlers/queue_credential_sync_when_tenant_created.py create mode 100644 api/services/enterprise/workspace_sync.py diff --git a/api/events/event_handlers/__init__.py b/api/events/event_handlers/__init__.py index c79764983b..d37217e168 100644 --- a/api/events/event_handlers/__init__.py +++ b/api/events/event_handlers/__init__.py @@ -6,6 +6,7 @@ from .create_site_record_when_app_created import handle as handle_create_site_re from .delete_tool_parameters_cache_when_sync_draft_workflow import ( handle as handle_delete_tool_parameters_cache_when_sync_draft_workflow, ) +from .queue_credential_sync_when_tenant_created import handle as handle_queue_credential_sync_when_tenant_created from .sync_plugin_trigger_when_app_created import handle as handle_sync_plugin_trigger_when_app_created from .sync_webhook_when_app_created import handle as handle_sync_webhook_when_app_created from .sync_workflow_schedule_when_app_published import handle as handle_sync_workflow_schedule_when_app_published @@ -30,6 +31,7 @@ __all__ = [ "handle_create_installed_app_when_app_created", "handle_create_site_record_when_app_created", "handle_delete_tool_parameters_cache_when_sync_draft_workflow", + "handle_queue_credential_sync_when_tenant_created", "handle_sync_plugin_trigger_when_app_created", "handle_sync_webhook_when_app_created", "handle_sync_workflow_schedule_when_app_published", diff --git a/api/events/event_handlers/queue_credential_sync_when_tenant_created.py b/api/events/event_handlers/queue_credential_sync_when_tenant_created.py new file mode 100644 index 0000000000..6566c214b0 --- /dev/null +++ b/api/events/event_handlers/queue_credential_sync_when_tenant_created.py @@ -0,0 +1,19 @@ +from configs import dify_config +from events.tenant_event import tenant_was_created +from services.enterprise.workspace_sync import WorkspaceSyncService + + +@tenant_was_created.connect +def handle(sender, **kwargs): + """Queue credential sync when a tenant/workspace is created.""" + # Only queue sync tasks if plugin manager (enterprise feature) is enabled + if not dify_config.ENTERPRISE_ENABLED: + return + + tenant = sender + + # Determine source from kwargs if available, otherwise use generic + source = kwargs.get("source", "tenant_created") + + # Queue credential sync task to Redis for enterprise backend to process + WorkspaceSyncService.queue_credential_sync(tenant.id, source=source) diff --git a/api/services/enterprise/workspace_sync.py b/api/services/enterprise/workspace_sync.py new file mode 100644 index 0000000000..acfe325397 --- /dev/null +++ b/api/services/enterprise/workspace_sync.py @@ -0,0 +1,58 @@ +import json +import logging +import uuid +from datetime import UTC, datetime + +from redis import RedisError + +from extensions.ext_redis import redis_client + +logger = logging.getLogger(__name__) + +WORKSPACE_SYNC_QUEUE = "enterprise:workspace:sync:queue" +WORKSPACE_SYNC_PROCESSING = "enterprise:workspace:sync:processing" + + +class WorkspaceSyncService: + """Service to publish workspace sync tasks to Redis queue for enterprise backend consumption""" + + @staticmethod + def queue_credential_sync(workspace_id: str, *, source: str) -> bool: + """ + Queue a credential sync task for a newly created workspace. + + This publishes a task to Redis that will be consumed by the enterprise backend + worker to sync credentials with the plugin-manager. + + Args: + workspace_id: The workspace/tenant ID to sync credentials for + source: Source of the sync request (for debugging/tracking) + + Returns: + bool: True if task was queued successfully, False otherwise + """ + try: + task = { + "task_id": str(uuid.uuid4()), + "workspace_id": workspace_id, + "retry_count": 0, + "created_at": datetime.now(UTC).isoformat(), + "source": source, + } + + # Push to Redis list (queue) - LPUSH adds to the head, worker consumes from tail with RPOP + redis_client.lpush(WORKSPACE_SYNC_QUEUE, json.dumps(task)) + + logger.info( + "Queued credential sync task for workspace %s, task_id: %s, source: %s", + workspace_id, + task["task_id"], + source, + ) + return True + + except (RedisError, TypeError) as e: + logger.error("Failed to queue credential sync for workspace %s: %s", workspace_id, str(e), exc_info=True) + # Don't raise - we don't want to fail workspace creation if queueing fails + # The scheduled task will catch it later + return False