mirror of https://github.com/langgenius/dify.git
add Leader election
This commit is contained in:
parent
72037a1865
commit
7dc8557033
|
|
@ -70,10 +70,17 @@ def handle_user_connect(sid, data):
|
|||
redis_client.hset(f"workflow_online_users:{workflow_id}", user_id, json.dumps(user_info))
|
||||
redis_client.set(f"ws_sid_map:{sid}", json.dumps({"workflow_id": workflow_id, "user_id": user_id}))
|
||||
|
||||
# Leader election: first user becomes the leader
|
||||
leader_id = get_or_set_leader(workflow_id, user_id)
|
||||
is_leader = leader_id == user_id
|
||||
|
||||
sio.enter_room(sid, workflow_id)
|
||||
broadcast_online_users(workflow_id)
|
||||
|
||||
# Notify user of their status
|
||||
sio.emit("status", {"isLeader": is_leader}, room=sid)
|
||||
|
||||
return {"msg": "connected", "user_id": user_id, "sid": sid}
|
||||
return {"msg": "connected", "user_id": user_id, "sid": sid, "isLeader": is_leader}
|
||||
|
||||
|
||||
@sio.on("disconnect")
|
||||
|
|
@ -89,9 +96,83 @@ def handle_disconnect(sid):
|
|||
redis_client.hdel(f"workflow_online_users:{workflow_id}", user_id)
|
||||
redis_client.delete(f"ws_sid_map:{sid}")
|
||||
|
||||
# Handle leader re-election if the leader disconnected
|
||||
handle_leader_disconnect(workflow_id, user_id)
|
||||
|
||||
broadcast_online_users(workflow_id)
|
||||
|
||||
|
||||
def get_or_set_leader(workflow_id, user_id):
|
||||
"""
|
||||
Get current leader or set the user as leader if no leader exists.
|
||||
Returns the leader user_id.
|
||||
"""
|
||||
leader_key = f"workflow_leader:{workflow_id}"
|
||||
current_leader = redis_client.get(leader_key)
|
||||
|
||||
if not current_leader:
|
||||
# No leader exists, make this user the leader
|
||||
redis_client.set(leader_key, user_id, ex=3600) # Expire in 1 hour
|
||||
return user_id
|
||||
|
||||
return current_leader.decode('utf-8') if isinstance(current_leader, bytes) else current_leader
|
||||
|
||||
|
||||
def handle_leader_disconnect(workflow_id, disconnected_user_id):
|
||||
"""
|
||||
Handle leader re-election when a user disconnects.
|
||||
"""
|
||||
leader_key = f"workflow_leader:{workflow_id}"
|
||||
current_leader = redis_client.get(leader_key)
|
||||
|
||||
if current_leader:
|
||||
current_leader = current_leader.decode('utf-8') if isinstance(current_leader, bytes) else current_leader
|
||||
|
||||
if current_leader == disconnected_user_id:
|
||||
# Leader disconnected, elect a new leader
|
||||
users_json = redis_client.hgetall(f"workflow_online_users:{workflow_id}")
|
||||
|
||||
if users_json:
|
||||
# Get the first remaining user as new leader
|
||||
new_leader_id = list(users_json.keys())[0]
|
||||
if isinstance(new_leader_id, bytes):
|
||||
new_leader_id = new_leader_id.decode('utf-8')
|
||||
|
||||
redis_client.set(leader_key, new_leader_id, ex=3600)
|
||||
|
||||
# Notify all users about the new leader
|
||||
broadcast_leader_change(workflow_id, new_leader_id)
|
||||
else:
|
||||
# No users left, remove leader
|
||||
redis_client.delete(leader_key)
|
||||
|
||||
|
||||
def broadcast_leader_change(workflow_id, new_leader_id):
|
||||
"""
|
||||
Broadcast leader change to all users in the workflow.
|
||||
"""
|
||||
users_json = redis_client.hgetall(f"workflow_online_users:{workflow_id}")
|
||||
|
||||
for user_id, user_info_json in users_json.items():
|
||||
try:
|
||||
user_info = json.loads(user_info_json)
|
||||
user_sid = user_info.get("sid")
|
||||
if user_sid:
|
||||
is_leader = (user_id.decode('utf-8') if isinstance(user_id, bytes) else user_id) == new_leader_id
|
||||
sio.emit("status", {"isLeader": is_leader}, room=user_sid)
|
||||
except Exception:
|
||||
continue
|
||||
|
||||
|
||||
def get_current_leader(workflow_id):
|
||||
"""
|
||||
Get the current leader for a workflow.
|
||||
"""
|
||||
leader_key = f"workflow_leader:{workflow_id}"
|
||||
leader = redis_client.get(leader_key)
|
||||
return leader.decode('utf-8') if leader and isinstance(leader, bytes) else leader
|
||||
|
||||
|
||||
def broadcast_online_users(workflow_id):
|
||||
"""
|
||||
broadcast online users to the workflow room
|
||||
|
|
@ -103,7 +184,15 @@ def broadcast_online_users(workflow_id):
|
|||
users.append(json.loads(user_info_json))
|
||||
except Exception:
|
||||
continue
|
||||
sio.emit("online_users", {"workflow_id": workflow_id, "users": users}, room=workflow_id)
|
||||
|
||||
# Get current leader
|
||||
leader_id = get_current_leader(workflow_id)
|
||||
|
||||
sio.emit("online_users", {
|
||||
"workflow_id": workflow_id,
|
||||
"users": users,
|
||||
"leader": leader_id
|
||||
}, room=workflow_id)
|
||||
|
||||
|
||||
@sio.on("collaboration_event")
|
||||
|
|
|
|||
|
|
@ -15,6 +15,8 @@ export class CollaborationManager {
|
|||
private currentAppId: string | null = null
|
||||
private reactFlowStore: any = null
|
||||
private cursors: Record<string, CursorPosition> = {}
|
||||
private isLeader = false
|
||||
private leaderId: string | null = null
|
||||
|
||||
init = (appId: string, reactFlowStore: any): void => {
|
||||
if (!reactFlowStore) {
|
||||
|
|
@ -70,6 +72,8 @@ export class CollaborationManager {
|
|||
this.currentAppId = null
|
||||
this.reactFlowStore = null
|
||||
this.cursors = {}
|
||||
this.isLeader = false
|
||||
this.leaderId = null
|
||||
this.eventEmitter.removeAllListeners()
|
||||
}
|
||||
|
||||
|
|
@ -115,6 +119,18 @@ export class CollaborationManager {
|
|||
return this.eventEmitter.on('varsAndFeaturesUpdate', callback)
|
||||
}
|
||||
|
||||
onLeaderChange(callback: (isLeader: boolean) => void): () => void {
|
||||
return this.eventEmitter.on('leaderChange', callback)
|
||||
}
|
||||
|
||||
getLeaderId(): string | null {
|
||||
return this.leaderId
|
||||
}
|
||||
|
||||
getIsLeader(): boolean {
|
||||
return this.isLeader
|
||||
}
|
||||
|
||||
private syncNodes(oldNodes: Node[], newNodes: Node[]): void {
|
||||
if (!this.nodesMap) return
|
||||
|
||||
|
|
@ -203,8 +219,6 @@ export class CollaborationManager {
|
|||
|
||||
socket.on('collaboration_update', (update: any) => {
|
||||
if (update.type === 'mouseMove') {
|
||||
console.log('Processing mouseMove event:', update)
|
||||
|
||||
// Update cursor state for this user
|
||||
this.cursors[update.userId] = {
|
||||
x: update.data.x,
|
||||
|
|
@ -213,8 +227,6 @@ export class CollaborationManager {
|
|||
timestamp: update.timestamp,
|
||||
}
|
||||
|
||||
// Emit the complete cursor state
|
||||
console.log('Emitting complete cursor state:', this.cursors)
|
||||
this.eventEmitter.emit('cursors', { ...this.cursors })
|
||||
}
|
||||
else if (update.type === 'varsAndFeaturesUpdate') {
|
||||
|
|
@ -223,26 +235,80 @@ export class CollaborationManager {
|
|||
}
|
||||
})
|
||||
|
||||
socket.on('online_users', (data: { users: OnlineUser[] }) => {
|
||||
const onlineUserIds = new Set(data.users.map(user => user.user_id))
|
||||
socket.on('online_users', (data: { users: OnlineUser[]; leader?: string }) => {
|
||||
try {
|
||||
if (!data || !Array.isArray(data.users)) {
|
||||
console.warn('Invalid online_users data structure:', data)
|
||||
return
|
||||
}
|
||||
|
||||
// Remove cursors for offline users
|
||||
Object.keys(this.cursors).forEach((userId) => {
|
||||
if (!onlineUserIds.has(userId))
|
||||
delete this.cursors[userId]
|
||||
})
|
||||
const onlineUserIds = new Set(data.users.map((user: OnlineUser) => user.user_id))
|
||||
|
||||
console.log('Updated online users and cleaned offline cursors:', data.users)
|
||||
this.eventEmitter.emit('onlineUsers', data.users)
|
||||
this.eventEmitter.emit('cursors', { ...this.cursors })
|
||||
// Remove cursors for offline users
|
||||
Object.keys(this.cursors).forEach((userId) => {
|
||||
if (!onlineUserIds.has(userId))
|
||||
delete this.cursors[userId]
|
||||
})
|
||||
|
||||
// Update leader information
|
||||
if (data.leader && typeof data.leader === 'string')
|
||||
this.leaderId = data.leader
|
||||
|
||||
console.log('Updated online users and leader info:', {
|
||||
users: data.users,
|
||||
leader: data.leader,
|
||||
currentLeader: this.leaderId,
|
||||
})
|
||||
|
||||
this.eventEmitter.emit('onlineUsers', data.users)
|
||||
this.eventEmitter.emit('cursors', { ...this.cursors })
|
||||
}
|
||||
catch (error) {
|
||||
console.error('Error processing online_users update:', error)
|
||||
}
|
||||
})
|
||||
|
||||
socket.on('status', (data: any) => {
|
||||
try {
|
||||
if (!data || typeof data.isLeader !== 'boolean') {
|
||||
console.warn('Invalid status data:', data)
|
||||
return
|
||||
}
|
||||
|
||||
const wasLeader = this.isLeader
|
||||
this.isLeader = data.isLeader
|
||||
|
||||
console.log(`Leader status update: ${wasLeader ? 'was' : 'was not'} leader, ${this.isLeader ? 'now is' : 'now is not'} leader`)
|
||||
|
||||
if (wasLeader !== this.isLeader)
|
||||
this.eventEmitter.emit('leaderChange', this.isLeader)
|
||||
}
|
||||
catch (error) {
|
||||
console.error('Error processing status update:', error)
|
||||
}
|
||||
})
|
||||
|
||||
socket.on('connect', () => {
|
||||
console.log('WebSocket connected successfully')
|
||||
this.eventEmitter.emit('stateChange', { isConnected: true })
|
||||
})
|
||||
|
||||
socket.on('disconnect', () => {
|
||||
socket.on('disconnect', (reason: string) => {
|
||||
console.log('WebSocket disconnected:', reason)
|
||||
this.cursors = {}
|
||||
this.isLeader = false
|
||||
this.leaderId = null
|
||||
this.eventEmitter.emit('stateChange', { isConnected: false })
|
||||
this.eventEmitter.emit('cursors', {})
|
||||
})
|
||||
|
||||
socket.on('connect_error', (error: any) => {
|
||||
console.error('WebSocket connection error:', error)
|
||||
this.eventEmitter.emit('stateChange', { isConnected: false, error: error.message })
|
||||
})
|
||||
|
||||
socket.on('error', (error: any) => {
|
||||
console.error('WebSocket error:', error)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -4,10 +4,11 @@ import { CursorService } from '../services/cursor-service'
|
|||
import type { CollaborationState } from '../types/collaboration'
|
||||
|
||||
export function useCollaboration(appId: string, reactFlowStore?: any) {
|
||||
const [state, setState] = useState<Partial<CollaborationState>>({
|
||||
const [state, setState] = useState<Partial<CollaborationState & { isLeader: boolean }>>({
|
||||
isConnected: false,
|
||||
onlineUsers: [],
|
||||
cursors: {},
|
||||
isLeader: false,
|
||||
})
|
||||
|
||||
const cursorServiceRef = useRef<CursorService | null>(null)
|
||||
|
|
@ -35,7 +36,6 @@ export function useCollaboration(appId: string, reactFlowStore?: any) {
|
|||
})
|
||||
|
||||
const unsubscribeCursors = collaborationManager.onCursorUpdate((cursors: any) => {
|
||||
console.log('Cursor update received:', cursors)
|
||||
setState((prev: any) => ({ ...prev, cursors }))
|
||||
})
|
||||
|
||||
|
|
@ -44,10 +44,16 @@ export function useCollaboration(appId: string, reactFlowStore?: any) {
|
|||
setState((prev: any) => ({ ...prev, onlineUsers: users }))
|
||||
})
|
||||
|
||||
const unsubscribeLeaderChange = collaborationManager.onLeaderChange((isLeader: boolean) => {
|
||||
console.log('Leader status changed:', isLeader)
|
||||
setState((prev: any) => ({ ...prev, isLeader }))
|
||||
})
|
||||
|
||||
return () => {
|
||||
unsubscribeStateChange()
|
||||
unsubscribeCursors()
|
||||
unsubscribeUsers()
|
||||
unsubscribeLeaderChange()
|
||||
cursorServiceRef.current?.stopTracking()
|
||||
collaborationManager.disconnect()
|
||||
}
|
||||
|
|
@ -69,6 +75,8 @@ export function useCollaboration(appId: string, reactFlowStore?: any) {
|
|||
isConnected: state.isConnected || false,
|
||||
onlineUsers: state.onlineUsers || [],
|
||||
cursors: state.cursors || {},
|
||||
isLeader: state.isLeader || false,
|
||||
leaderId: collaborationManager.getLeaderId(),
|
||||
startCursorTracking,
|
||||
stopCursorTracking,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,61 @@
|
|||
import React from 'react'
|
||||
import { useCollaboration } from '../hooks/use-collaboration'
|
||||
|
||||
type LeaderTestProps = {
|
||||
appId: string
|
||||
}
|
||||
|
||||
export function LeaderTest({ appId }: LeaderTestProps) {
|
||||
const { isConnected, isLeader, leaderId, onlineUsers } = useCollaboration(appId)
|
||||
|
||||
return (
|
||||
<div className="rounded-lg border bg-gray-50 p-4">
|
||||
<h3 className="mb-4 text-lg font-semibold">Leader Election Test</h3>
|
||||
|
||||
<div className="space-y-2">
|
||||
<div className="flex items-center space-x-2">
|
||||
<span className="font-medium">Connection:</span>
|
||||
<span className={`rounded px-2 py-1 text-sm ${
|
||||
isConnected ? 'bg-green-100 text-green-800' : 'bg-red-100 text-red-800'
|
||||
}`}>
|
||||
{isConnected ? 'Connected' : 'Disconnected'}
|
||||
</span>
|
||||
</div>
|
||||
|
||||
<div className="flex items-center space-x-2">
|
||||
<span className="font-medium">I am Leader:</span>
|
||||
<span className={`rounded px-2 py-1 text-sm ${
|
||||
isLeader ? 'bg-blue-100 text-blue-800' : 'bg-gray-100 text-gray-800'
|
||||
}`}>
|
||||
{isLeader ? 'YES' : 'NO'}
|
||||
</span>
|
||||
</div>
|
||||
|
||||
<div className="flex items-center space-x-2">
|
||||
<span className="font-medium">Current Leader ID:</span>
|
||||
<span className="rounded bg-gray-100 px-2 py-1 font-mono text-sm">
|
||||
{leaderId || 'None'}
|
||||
</span>
|
||||
</div>
|
||||
|
||||
<div className="mt-4">
|
||||
<span className="font-medium">Online Users ({onlineUsers.length}):</span>
|
||||
<div className="mt-2 space-y-1">
|
||||
{onlineUsers.map((user: any) => (
|
||||
<div key={user.user_id} className="flex items-center space-x-2 text-sm">
|
||||
<span className={`h-2 w-2 rounded-full ${
|
||||
user.user_id === leaderId ? 'bg-blue-500' : 'bg-green-500'
|
||||
}`} />
|
||||
<span className="font-mono">{user.user_id}</span>
|
||||
<span>({user.username})</span>
|
||||
{user.user_id === leaderId && (
|
||||
<span className="font-medium text-blue-600">👑 Leader</span>
|
||||
)}
|
||||
</div>
|
||||
))}
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
)
|
||||
}
|
||||
Loading…
Reference in New Issue