From 2d1621c43d90029c607d305741271825a5e3043a Mon Sep 17 00:00:00 2001 From: hjlarry Date: Fri, 8 Aug 2025 14:54:18 +0800 Subject: [PATCH] add leader but not review --- api/controllers/console/app/online_user.py | 63 +++++++++++++++++-- .../hooks/use-nodes-sync-draft.ts | 9 ++- .../core/collaboration-manager.ts | 25 +++++++- .../collaboration/hooks/use-collaboration.ts | 9 ++- 4 files changed, 98 insertions(+), 8 deletions(-) diff --git a/api/controllers/console/app/online_user.py b/api/controllers/console/app/online_user.py index 9720a1ae0e..fca4498b8f 100644 --- a/api/controllers/console/app/online_user.py +++ b/api/controllers/console/app/online_user.py @@ -67,7 +67,24 @@ def handle_user_connect(sid, data): "sid": sid, } - redis_client.hset(f"workflow_online_users:{workflow_id}", user_id, json.dumps(user_info)) + # --- Leader Election Logic --- + workflow_users_key = f"workflow_online_users:{workflow_id}" + workflow_order_key = f"workflow_user_order:{workflow_id}" + + # Remove user from list in case of reconnection, to add them to the end + redis_client.lrem(workflow_order_key, 0, user_id) + # Add user to the end of the list + redis_client.rpush(workflow_order_key, user_id) + + # The first user in the list is the leader + leader_user_id_bytes = redis_client.lindex(workflow_order_key, 0) + is_leader = leader_user_id_bytes and leader_user_id_bytes.decode("utf-8") == user_id + + # Notify the connecting client of their leader status + sio.emit("status", {"isLeader": is_leader}, room=sid) + # --- End of Leader Election Logic --- + + redis_client.hset(workflow_users_key, user_id, json.dumps(user_info)) redis_client.set(f"ws_sid_map:{sid}", json.dumps({"workflow_id": workflow_id, "user_id": user_id})) sio.enter_room(sid, workflow_id) @@ -86,8 +103,35 @@ def handle_disconnect(sid): data = json.loads(mapping) workflow_id = data["workflow_id"] user_id = data["user_id"] - redis_client.hdel(f"workflow_online_users:{workflow_id}", user_id) + + workflow_users_key = f"workflow_online_users:{workflow_id}" + workflow_order_key = f"workflow_user_order:{workflow_id}" + + # Get leader before any modification + leader_user_id_bytes = redis_client.lindex(workflow_order_key, 0) + was_leader = leader_user_id_bytes and leader_user_id_bytes.decode("utf-8") == user_id + + # Remove user + redis_client.hdel(workflow_users_key, user_id) redis_client.delete(f"ws_sid_map:{sid}") + redis_client.lrem(workflow_order_key, 0, user_id) + + # Check if leader disconnected and a new one needs to be elected + if was_leader: + new_leader_user_id_bytes = redis_client.lindex(workflow_order_key, 0) + if new_leader_user_id_bytes: + new_leader_user_id = new_leader_user_id_bytes.decode("utf-8") + # get new leader's info to find their sid + new_leader_info_json = redis_client.hget(workflow_users_key, new_leader_user_id) + if new_leader_info_json: + new_leader_info = json.loads(new_leader_info_json) + new_leader_sid = new_leader_info.get("sid") + if new_leader_sid: + sio.emit("status", {"isLeader": True}, room=new_leader_sid) + + # If the room is empty, clean up the redis key + if redis_client.llen(workflow_order_key) == 0: + redis_client.delete(workflow_order_key) broadcast_online_users(workflow_id) @@ -96,14 +140,25 @@ def broadcast_online_users(workflow_id): """ broadcast online users to the workflow room """ - users_json = redis_client.hgetall(f"workflow_online_users:{workflow_id}") + workflow_users_key = f"workflow_online_users:{workflow_id}" + workflow_order_key = f"workflow_user_order:{workflow_id}" + + # The first user in the list is the leader + leader_user_id_bytes = redis_client.lindex(workflow_order_key, 0) + leader_user_id = leader_user_id_bytes.decode("utf-8") if leader_user_id_bytes else None + + users_json = redis_client.hgetall(workflow_users_key) users = [] for _, user_info_json in users_json.items(): try: users.append(json.loads(user_info_json)) except Exception: continue - sio.emit("online_users", {"workflow_id": workflow_id, "users": users}, room=workflow_id) + sio.emit( + "online_users", + {"workflow_id": workflow_id, "users": users, "leader": leader_user_id}, + room=workflow_id, + ) @sio.on("collaboration_event") diff --git a/web/app/components/workflow-app/hooks/use-nodes-sync-draft.ts b/web/app/components/workflow-app/hooks/use-nodes-sync-draft.ts index 312d38a259..8516fdc711 100644 --- a/web/app/components/workflow-app/hooks/use-nodes-sync-draft.ts +++ b/web/app/components/workflow-app/hooks/use-nodes-sync-draft.ts @@ -13,6 +13,7 @@ import { syncWorkflowDraft } from '@/service/workflow' import { useFeaturesStore } from '@/app/components/base/features/hooks' import { API_PREFIX } from '@/config' import { useWorkflowRefreshDraft } from '.' +import { useCollaboration } from '@/app/components/workflow/collaboration/hooks/use-collaboration' export const useNodesSyncDraft = () => { const store = useStoreApi() @@ -21,6 +22,7 @@ export const useNodesSyncDraft = () => { const { getNodesReadOnly } = useNodesReadOnly() const { handleRefreshWorkflowDraft } = useWorkflowRefreshDraft() const params = useParams() + const { isLeader } = useCollaboration(params.appId as string) const getPostParams = useCallback(() => { const { @@ -85,13 +87,14 @@ export const useNodesSyncDraft = () => { environment_variables: environmentVariables, conversation_variables: conversationVariables, hash: syncWorkflowDraftHash, + _is_collaborative: true, }, } } }, [store, featuresStore, workflowStore]) const syncWorkflowDraftWhenPageClose = useCallback(() => { - if (getNodesReadOnly()) + if (getNodesReadOnly() || !isLeader) return const postParams = getPostParams() @@ -111,8 +114,10 @@ export const useNodesSyncDraft = () => { onSettled?: () => void }, ) => { - if (getNodesReadOnly()) + if (getNodesReadOnly() || !isLeader) return + + console.log('I am the leader, saving draft...') const postParams = getPostParams() if (postParams) { diff --git a/web/app/components/workflow/collaboration/core/collaboration-manager.ts b/web/app/components/workflow/collaboration/core/collaboration-manager.ts index e491ebc2fd..8feb0770c7 100644 --- a/web/app/components/workflow/collaboration/core/collaboration-manager.ts +++ b/web/app/components/workflow/collaboration/core/collaboration-manager.ts @@ -14,6 +14,8 @@ export class CollaborationManager { private eventEmitter = new EventEmitter() private currentAppId: string | null = null private reactFlowStore: any = null + private isLeader = false + private leaderId: string | null = null private cursors: Record = {} init = (appId: string, reactFlowStore: any): void => { @@ -115,6 +117,10 @@ export class CollaborationManager { return this.eventEmitter.on('varsAndFeaturesUpdate', callback) } + onLeaderChange(callback: (isLeader: boolean) => void): () => void { + return this.eventEmitter.on('leaderChange', callback) + } + private syncNodes(oldNodes: Node[], newNodes: Node[]): void { if (!this.nodesMap) return @@ -223,7 +229,7 @@ export class CollaborationManager { } }) - socket.on('online_users', (data: { users: OnlineUser[] }) => { + socket.on('online_users', (data: { users: OnlineUser[]; leader: string }) => { const onlineUserIds = new Set(data.users.map(user => user.user_id)) // Remove cursors for offline users @@ -233,10 +239,27 @@ export class CollaborationManager { }) console.log('Updated online users and cleaned offline cursors:', data.users) + this.leaderId = data.leader this.eventEmitter.emit('onlineUsers', data.users) this.eventEmitter.emit('cursors', { ...this.cursors }) }) + socket.on('status', (data: { isLeader: boolean }) => { + if (this.isLeader !== data.isLeader) { + this.isLeader = data.isLeader + console.log(`Collaboration: I am now the ${this.isLeader ? 'Leader' : 'Follower'}.`) + this.eventEmitter.emit('leaderChange', this.isLeader) + } + }) + + socket.on('status', (data: { isLeader: boolean }) => { + if (this.isLeader !== data.isLeader) { + this.isLeader = data.isLeader + console.log(`Collaboration: I am now the ${this.isLeader ? 'Leader' : 'Follower'}.`) + this.eventEmitter.emit('leaderChange', this.isLeader) + } + }) + socket.on('connect', () => { this.eventEmitter.emit('stateChange', { isConnected: true }) }) diff --git a/web/app/components/workflow/collaboration/hooks/use-collaboration.ts b/web/app/components/workflow/collaboration/hooks/use-collaboration.ts index bd570508a9..b4941870bb 100644 --- a/web/app/components/workflow/collaboration/hooks/use-collaboration.ts +++ b/web/app/components/workflow/collaboration/hooks/use-collaboration.ts @@ -4,10 +4,11 @@ import { CursorService } from '../services/cursor-service' import type { CollaborationState } from '../types/collaboration' export function useCollaboration(appId: string, reactFlowStore?: any) { - const [state, setState] = useState>({ + const [state, setState] = useState>({ isConnected: false, onlineUsers: [], cursors: {}, + isLeader: false, }) const cursorServiceRef = useRef(null) @@ -44,10 +45,15 @@ export function useCollaboration(appId: string, reactFlowStore?: any) { setState((prev: any) => ({ ...prev, onlineUsers: users })) }) + const unsubscribeLeaderChange = collaborationManager.onLeaderChange((isLeader: boolean) => { + setState((prev: any) => ({ ...prev, isLeader })) + }) + return () => { unsubscribeStateChange() unsubscribeCursors() unsubscribeUsers() + unsubscribeLeaderChange() cursorServiceRef.current?.stopTracking() collaborationManager.disconnect() } @@ -69,6 +75,7 @@ export function useCollaboration(appId: string, reactFlowStore?: any) { isConnected: state.isConnected || false, onlineUsers: state.onlineUsers || [], cursors: state.cursors || {}, + isLeader: state.isLeader || false, startCursorTracking, stopCursorTracking, }