mirror of
https://github.com/langgenius/dify.git
synced 2026-04-13 22:57:26 +08:00
when new user connected should rebroadcast the graph data
This commit is contained in:
parent
23fec75c90
commit
8583992d23
@ -35,6 +35,7 @@ export class CollaborationManager {
|
||||
private nodePanelPresence: NodePanelPresenceMap = {}
|
||||
private activeConnections = new Set<string>()
|
||||
private isUndoRedoInProgress = false
|
||||
private pendingInitialSync = false
|
||||
|
||||
private getNodePanelPresenceSnapshot(): NodePanelPresenceMap {
|
||||
const snapshot: NodePanelPresenceMap = {}
|
||||
@ -741,6 +742,8 @@ export class CollaborationManager {
|
||||
.map(node => node.id),
|
||||
)
|
||||
|
||||
this.pendingInitialSync = false
|
||||
|
||||
const updatedNodes = Array
|
||||
.from(this.nodesMap.values())
|
||||
.map((node: Node) => {
|
||||
@ -790,6 +793,8 @@ export class CollaborationManager {
|
||||
const updatedEdges = Array.from(this.edgesMap.values())
|
||||
console.log('Updating React edges from subscription')
|
||||
|
||||
this.pendingInitialSync = false
|
||||
|
||||
// Call ReactFlow's native setter directly to avoid triggering collaboration
|
||||
state.setEdges(updatedEdges)
|
||||
})
|
||||
@ -852,6 +857,11 @@ export class CollaborationManager {
|
||||
this.eventEmitter.emit('syncRequest', {})
|
||||
}
|
||||
}
|
||||
else if (update.type === 'graph_resync_request') {
|
||||
console.log('Received graph resync request from collaborator')
|
||||
if (this.isLeader)
|
||||
this.broadcastCurrentGraph()
|
||||
}
|
||||
})
|
||||
|
||||
socket.on('online_users', (data: { users: OnlineUser[]; leader?: string }) => {
|
||||
@ -898,6 +908,11 @@ export class CollaborationManager {
|
||||
const wasLeader = this.isLeader
|
||||
this.isLeader = data.isLeader
|
||||
|
||||
if (this.isLeader)
|
||||
this.pendingInitialSync = false
|
||||
else
|
||||
this.requestInitialSyncIfNeeded()
|
||||
|
||||
if (wasLeader !== this.isLeader)
|
||||
this.eventEmitter.emit('leaderChange', this.isLeader)
|
||||
}
|
||||
@ -912,6 +927,10 @@ export class CollaborationManager {
|
||||
console.log(`Collaboration: I am now the ${this.isLeader ? 'Leader' : 'Follower'}.`)
|
||||
this.eventEmitter.emit('leaderChange', this.isLeader)
|
||||
}
|
||||
if (this.isLeader)
|
||||
this.pendingInitialSync = false
|
||||
else
|
||||
this.requestInitialSyncIfNeeded()
|
||||
})
|
||||
|
||||
socket.on('status', (data: { isLeader: boolean }) => {
|
||||
@ -920,11 +939,16 @@ export class CollaborationManager {
|
||||
console.log(`Collaboration: I am now the ${this.isLeader ? 'Leader' : 'Follower'}.`)
|
||||
this.eventEmitter.emit('leaderChange', this.isLeader)
|
||||
}
|
||||
if (this.isLeader)
|
||||
this.pendingInitialSync = false
|
||||
else
|
||||
this.requestInitialSyncIfNeeded()
|
||||
})
|
||||
|
||||
socket.on('connect', () => {
|
||||
console.log('WebSocket connected successfully')
|
||||
this.eventEmitter.emit('stateChange', { isConnected: true })
|
||||
this.pendingInitialSync = true
|
||||
})
|
||||
|
||||
socket.on('disconnect', (reason: string) => {
|
||||
@ -932,6 +956,7 @@ export class CollaborationManager {
|
||||
this.cursors = {}
|
||||
this.isLeader = false
|
||||
this.leaderId = null
|
||||
this.pendingInitialSync = false
|
||||
this.eventEmitter.emit('stateChange', { isConnected: false })
|
||||
this.eventEmitter.emit('cursors', {})
|
||||
})
|
||||
@ -945,6 +970,49 @@ export class CollaborationManager {
|
||||
console.error('WebSocket error:', error)
|
||||
})
|
||||
}
|
||||
|
||||
// We currently only relay CRDT updates; the server doesn't persist them.
|
||||
// When a follower joins mid-session, it might miss earlier broadcasts and render stale data.
|
||||
// This lightweight checkpoint asks the leader to rebroadcast the latest graph snapshot once.
|
||||
private requestInitialSyncIfNeeded(): void {
|
||||
if (!this.pendingInitialSync) return
|
||||
if (this.isLeader) {
|
||||
this.pendingInitialSync = false
|
||||
return
|
||||
}
|
||||
|
||||
this.emitGraphResyncRequest()
|
||||
this.pendingInitialSync = false
|
||||
}
|
||||
|
||||
private emitGraphResyncRequest(): void {
|
||||
if (!this.currentAppId || !webSocketClient.isConnected(this.currentAppId)) return
|
||||
|
||||
const socket = webSocketClient.getSocket(this.currentAppId)
|
||||
if (!socket) return
|
||||
|
||||
socket.emit('collaboration_event', {
|
||||
type: 'graph_resync_request',
|
||||
data: { timestamp: Date.now() },
|
||||
timestamp: Date.now(),
|
||||
})
|
||||
}
|
||||
|
||||
private broadcastCurrentGraph(): void {
|
||||
if (!this.currentAppId || !webSocketClient.isConnected(this.currentAppId)) return
|
||||
if (!this.doc) return
|
||||
|
||||
const socket = webSocketClient.getSocket(this.currentAppId)
|
||||
if (!socket) return
|
||||
|
||||
try {
|
||||
const snapshot = this.doc.export({ mode: 'snapshot' })
|
||||
socket.emit('graph_event', snapshot)
|
||||
}
|
||||
catch (error) {
|
||||
console.error('Failed to broadcast graph snapshot:', error)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const collaborationManager = new CollaborationManager()
|
||||
|
||||
Loading…
Reference in New Issue
Block a user