From 7dc85570334f6628229936943e654d25d30a6d3b Mon Sep 17 00:00:00 2001 From: hjlarry Date: Thu, 21 Aug 2025 16:17:16 +0800 Subject: [PATCH] add Leader election --- api/controllers/console/app/online_user.py | 93 +++++++++++++++++- .../core/collaboration-manager.ts | 96 ++++++++++++++++--- .../collaboration/hooks/use-collaboration.ts | 12 ++- .../collaboration/test/leader-test.tsx | 61 ++++++++++++ 4 files changed, 243 insertions(+), 19 deletions(-) create mode 100644 web/app/components/workflow/collaboration/test/leader-test.tsx diff --git a/api/controllers/console/app/online_user.py b/api/controllers/console/app/online_user.py index 9720a1ae0e..1e054c3690 100644 --- a/api/controllers/console/app/online_user.py +++ b/api/controllers/console/app/online_user.py @@ -70,10 +70,17 @@ def handle_user_connect(sid, data): redis_client.hset(f"workflow_online_users:{workflow_id}", user_id, json.dumps(user_info)) redis_client.set(f"ws_sid_map:{sid}", json.dumps({"workflow_id": workflow_id, "user_id": user_id})) + # Leader election: first user becomes the leader + leader_id = get_or_set_leader(workflow_id, user_id) + is_leader = leader_id == user_id + sio.enter_room(sid, workflow_id) broadcast_online_users(workflow_id) + + # Notify user of their status + sio.emit("status", {"isLeader": is_leader}, room=sid) - return {"msg": "connected", "user_id": user_id, "sid": sid} + return {"msg": "connected", "user_id": user_id, "sid": sid, "isLeader": is_leader} @sio.on("disconnect") @@ -89,9 +96,83 @@ def handle_disconnect(sid): redis_client.hdel(f"workflow_online_users:{workflow_id}", user_id) redis_client.delete(f"ws_sid_map:{sid}") + # Handle leader re-election if the leader disconnected + handle_leader_disconnect(workflow_id, user_id) + broadcast_online_users(workflow_id) +def get_or_set_leader(workflow_id, user_id): + """ + Get current leader or set the user as leader if no leader exists. + Returns the leader user_id. + """ + leader_key = f"workflow_leader:{workflow_id}" + current_leader = redis_client.get(leader_key) + + if not current_leader: + # No leader exists, make this user the leader + redis_client.set(leader_key, user_id, ex=3600) # Expire in 1 hour + return user_id + + return current_leader.decode('utf-8') if isinstance(current_leader, bytes) else current_leader + + +def handle_leader_disconnect(workflow_id, disconnected_user_id): + """ + Handle leader re-election when a user disconnects. + """ + leader_key = f"workflow_leader:{workflow_id}" + current_leader = redis_client.get(leader_key) + + if current_leader: + current_leader = current_leader.decode('utf-8') if isinstance(current_leader, bytes) else current_leader + + if current_leader == disconnected_user_id: + # Leader disconnected, elect a new leader + users_json = redis_client.hgetall(f"workflow_online_users:{workflow_id}") + + if users_json: + # Get the first remaining user as new leader + new_leader_id = list(users_json.keys())[0] + if isinstance(new_leader_id, bytes): + new_leader_id = new_leader_id.decode('utf-8') + + redis_client.set(leader_key, new_leader_id, ex=3600) + + # Notify all users about the new leader + broadcast_leader_change(workflow_id, new_leader_id) + else: + # No users left, remove leader + redis_client.delete(leader_key) + + +def broadcast_leader_change(workflow_id, new_leader_id): + """ + Broadcast leader change to all users in the workflow. + """ + users_json = redis_client.hgetall(f"workflow_online_users:{workflow_id}") + + for user_id, user_info_json in users_json.items(): + try: + user_info = json.loads(user_info_json) + user_sid = user_info.get("sid") + if user_sid: + is_leader = (user_id.decode('utf-8') if isinstance(user_id, bytes) else user_id) == new_leader_id + sio.emit("status", {"isLeader": is_leader}, room=user_sid) + except Exception: + continue + + +def get_current_leader(workflow_id): + """ + Get the current leader for a workflow. + """ + leader_key = f"workflow_leader:{workflow_id}" + leader = redis_client.get(leader_key) + return leader.decode('utf-8') if leader and isinstance(leader, bytes) else leader + + def broadcast_online_users(workflow_id): """ broadcast online users to the workflow room @@ -103,7 +184,15 @@ def broadcast_online_users(workflow_id): users.append(json.loads(user_info_json)) except Exception: continue - sio.emit("online_users", {"workflow_id": workflow_id, "users": users}, room=workflow_id) + + # Get current leader + leader_id = get_current_leader(workflow_id) + + sio.emit("online_users", { + "workflow_id": workflow_id, + "users": users, + "leader": leader_id + }, room=workflow_id) @sio.on("collaboration_event") diff --git a/web/app/components/workflow/collaboration/core/collaboration-manager.ts b/web/app/components/workflow/collaboration/core/collaboration-manager.ts index e491ebc2fd..6d6177cb37 100644 --- a/web/app/components/workflow/collaboration/core/collaboration-manager.ts +++ b/web/app/components/workflow/collaboration/core/collaboration-manager.ts @@ -15,6 +15,8 @@ export class CollaborationManager { private currentAppId: string | null = null private reactFlowStore: any = null private cursors: Record = {} + private isLeader = false + private leaderId: string | null = null init = (appId: string, reactFlowStore: any): void => { if (!reactFlowStore) { @@ -70,6 +72,8 @@ export class CollaborationManager { this.currentAppId = null this.reactFlowStore = null this.cursors = {} + this.isLeader = false + this.leaderId = null this.eventEmitter.removeAllListeners() } @@ -115,6 +119,18 @@ export class CollaborationManager { return this.eventEmitter.on('varsAndFeaturesUpdate', callback) } + onLeaderChange(callback: (isLeader: boolean) => void): () => void { + return this.eventEmitter.on('leaderChange', callback) + } + + getLeaderId(): string | null { + return this.leaderId + } + + getIsLeader(): boolean { + return this.isLeader + } + private syncNodes(oldNodes: Node[], newNodes: Node[]): void { if (!this.nodesMap) return @@ -203,8 +219,6 @@ export class CollaborationManager { socket.on('collaboration_update', (update: any) => { if (update.type === 'mouseMove') { - console.log('Processing mouseMove event:', update) - // Update cursor state for this user this.cursors[update.userId] = { x: update.data.x, @@ -213,8 +227,6 @@ export class CollaborationManager { timestamp: update.timestamp, } - // Emit the complete cursor state - console.log('Emitting complete cursor state:', this.cursors) this.eventEmitter.emit('cursors', { ...this.cursors }) } else if (update.type === 'varsAndFeaturesUpdate') { @@ -223,26 +235,80 @@ export class CollaborationManager { } }) - socket.on('online_users', (data: { users: OnlineUser[] }) => { - const onlineUserIds = new Set(data.users.map(user => user.user_id)) + socket.on('online_users', (data: { users: OnlineUser[]; leader?: string }) => { + try { + if (!data || !Array.isArray(data.users)) { + console.warn('Invalid online_users data structure:', data) + return + } - // Remove cursors for offline users - Object.keys(this.cursors).forEach((userId) => { - if (!onlineUserIds.has(userId)) - delete this.cursors[userId] - }) + const onlineUserIds = new Set(data.users.map((user: OnlineUser) => user.user_id)) - console.log('Updated online users and cleaned offline cursors:', data.users) - this.eventEmitter.emit('onlineUsers', data.users) - this.eventEmitter.emit('cursors', { ...this.cursors }) + // Remove cursors for offline users + Object.keys(this.cursors).forEach((userId) => { + if (!onlineUserIds.has(userId)) + delete this.cursors[userId] + }) + + // Update leader information + if (data.leader && typeof data.leader === 'string') + this.leaderId = data.leader + + console.log('Updated online users and leader info:', { + users: data.users, + leader: data.leader, + currentLeader: this.leaderId, + }) + + this.eventEmitter.emit('onlineUsers', data.users) + this.eventEmitter.emit('cursors', { ...this.cursors }) + } + catch (error) { + console.error('Error processing online_users update:', error) + } + }) + + socket.on('status', (data: any) => { + try { + if (!data || typeof data.isLeader !== 'boolean') { + console.warn('Invalid status data:', data) + return + } + + const wasLeader = this.isLeader + this.isLeader = data.isLeader + + console.log(`Leader status update: ${wasLeader ? 'was' : 'was not'} leader, ${this.isLeader ? 'now is' : 'now is not'} leader`) + + if (wasLeader !== this.isLeader) + this.eventEmitter.emit('leaderChange', this.isLeader) + } + catch (error) { + console.error('Error processing status update:', error) + } }) socket.on('connect', () => { + console.log('WebSocket connected successfully') this.eventEmitter.emit('stateChange', { isConnected: true }) }) - socket.on('disconnect', () => { + socket.on('disconnect', (reason: string) => { + console.log('WebSocket disconnected:', reason) + this.cursors = {} + this.isLeader = false + this.leaderId = null this.eventEmitter.emit('stateChange', { isConnected: false }) + this.eventEmitter.emit('cursors', {}) + }) + + socket.on('connect_error', (error: any) => { + console.error('WebSocket connection error:', error) + this.eventEmitter.emit('stateChange', { isConnected: false, error: error.message }) + }) + + socket.on('error', (error: any) => { + console.error('WebSocket error:', error) }) } } diff --git a/web/app/components/workflow/collaboration/hooks/use-collaboration.ts b/web/app/components/workflow/collaboration/hooks/use-collaboration.ts index bd570508a9..8b6fc9ae77 100644 --- a/web/app/components/workflow/collaboration/hooks/use-collaboration.ts +++ b/web/app/components/workflow/collaboration/hooks/use-collaboration.ts @@ -4,10 +4,11 @@ import { CursorService } from '../services/cursor-service' import type { CollaborationState } from '../types/collaboration' export function useCollaboration(appId: string, reactFlowStore?: any) { - const [state, setState] = useState>({ + const [state, setState] = useState>({ isConnected: false, onlineUsers: [], cursors: {}, + isLeader: false, }) const cursorServiceRef = useRef(null) @@ -35,7 +36,6 @@ export function useCollaboration(appId: string, reactFlowStore?: any) { }) const unsubscribeCursors = collaborationManager.onCursorUpdate((cursors: any) => { - console.log('Cursor update received:', cursors) setState((prev: any) => ({ ...prev, cursors })) }) @@ -44,10 +44,16 @@ export function useCollaboration(appId: string, reactFlowStore?: any) { setState((prev: any) => ({ ...prev, onlineUsers: users })) }) + const unsubscribeLeaderChange = collaborationManager.onLeaderChange((isLeader: boolean) => { + console.log('Leader status changed:', isLeader) + setState((prev: any) => ({ ...prev, isLeader })) + }) + return () => { unsubscribeStateChange() unsubscribeCursors() unsubscribeUsers() + unsubscribeLeaderChange() cursorServiceRef.current?.stopTracking() collaborationManager.disconnect() } @@ -69,6 +75,8 @@ export function useCollaboration(appId: string, reactFlowStore?: any) { isConnected: state.isConnected || false, onlineUsers: state.onlineUsers || [], cursors: state.cursors || {}, + isLeader: state.isLeader || false, + leaderId: collaborationManager.getLeaderId(), startCursorTracking, stopCursorTracking, } diff --git a/web/app/components/workflow/collaboration/test/leader-test.tsx b/web/app/components/workflow/collaboration/test/leader-test.tsx new file mode 100644 index 0000000000..b6115b42b8 --- /dev/null +++ b/web/app/components/workflow/collaboration/test/leader-test.tsx @@ -0,0 +1,61 @@ +import React from 'react' +import { useCollaboration } from '../hooks/use-collaboration' + +type LeaderTestProps = { + appId: string +} + +export function LeaderTest({ appId }: LeaderTestProps) { + const { isConnected, isLeader, leaderId, onlineUsers } = useCollaboration(appId) + + return ( +
+

Leader Election Test

+ +
+
+ Connection: + + {isConnected ? 'Connected' : 'Disconnected'} + +
+ +
+ I am Leader: + + {isLeader ? 'YES' : 'NO'} + +
+ +
+ Current Leader ID: + + {leaderId || 'None'} + +
+ +
+ Online Users ({onlineUsers.length}): +
+ {onlineUsers.map((user: any) => ( +
+ + {user.user_id} + ({user.username}) + {user.user_id === leaderId && ( + 👑 Leader + )} +
+ ))} +
+
+
+
+ ) +}