From 8583992d23ead053ae8903de456e5a8b93878013 Mon Sep 17 00:00:00 2001 From: hjlarry Date: Tue, 14 Oct 2025 16:57:02 +0800 Subject: [PATCH] when new user connected should rebroadcast the graph data --- .../core/collaboration-manager.ts | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/web/app/components/workflow/collaboration/core/collaboration-manager.ts b/web/app/components/workflow/collaboration/core/collaboration-manager.ts index 1cb8a8afe8..fd784ea57c 100644 --- a/web/app/components/workflow/collaboration/core/collaboration-manager.ts +++ b/web/app/components/workflow/collaboration/core/collaboration-manager.ts @@ -35,6 +35,7 @@ export class CollaborationManager { private nodePanelPresence: NodePanelPresenceMap = {} private activeConnections = new Set() private isUndoRedoInProgress = false + private pendingInitialSync = false private getNodePanelPresenceSnapshot(): NodePanelPresenceMap { const snapshot: NodePanelPresenceMap = {} @@ -741,6 +742,8 @@ export class CollaborationManager { .map(node => node.id), ) + this.pendingInitialSync = false + const updatedNodes = Array .from(this.nodesMap.values()) .map((node: Node) => { @@ -790,6 +793,8 @@ export class CollaborationManager { const updatedEdges = Array.from(this.edgesMap.values()) console.log('Updating React edges from subscription') + this.pendingInitialSync = false + // Call ReactFlow's native setter directly to avoid triggering collaboration state.setEdges(updatedEdges) }) @@ -852,6 +857,11 @@ export class CollaborationManager { this.eventEmitter.emit('syncRequest', {}) } } + else if (update.type === 'graph_resync_request') { + console.log('Received graph resync request from collaborator') + if (this.isLeader) + this.broadcastCurrentGraph() + } }) socket.on('online_users', (data: { users: OnlineUser[]; leader?: string }) => { @@ -898,6 +908,11 @@ export class CollaborationManager { const wasLeader = this.isLeader this.isLeader = data.isLeader + if (this.isLeader) + this.pendingInitialSync = false + else + this.requestInitialSyncIfNeeded() + if (wasLeader !== this.isLeader) this.eventEmitter.emit('leaderChange', this.isLeader) } @@ -912,6 +927,10 @@ export class CollaborationManager { console.log(`Collaboration: I am now the ${this.isLeader ? 'Leader' : 'Follower'}.`) this.eventEmitter.emit('leaderChange', this.isLeader) } + if (this.isLeader) + this.pendingInitialSync = false + else + this.requestInitialSyncIfNeeded() }) socket.on('status', (data: { isLeader: boolean }) => { @@ -920,11 +939,16 @@ export class CollaborationManager { console.log(`Collaboration: I am now the ${this.isLeader ? 'Leader' : 'Follower'}.`) this.eventEmitter.emit('leaderChange', this.isLeader) } + if (this.isLeader) + this.pendingInitialSync = false + else + this.requestInitialSyncIfNeeded() }) socket.on('connect', () => { console.log('WebSocket connected successfully') this.eventEmitter.emit('stateChange', { isConnected: true }) + this.pendingInitialSync = true }) socket.on('disconnect', (reason: string) => { @@ -932,6 +956,7 @@ export class CollaborationManager { this.cursors = {} this.isLeader = false this.leaderId = null + this.pendingInitialSync = false this.eventEmitter.emit('stateChange', { isConnected: false }) this.eventEmitter.emit('cursors', {}) }) @@ -945,6 +970,49 @@ export class CollaborationManager { console.error('WebSocket error:', error) }) } + + // We currently only relay CRDT updates; the server doesn't persist them. + // When a follower joins mid-session, it might miss earlier broadcasts and render stale data. + // This lightweight checkpoint asks the leader to rebroadcast the latest graph snapshot once. + private requestInitialSyncIfNeeded(): void { + if (!this.pendingInitialSync) return + if (this.isLeader) { + this.pendingInitialSync = false + return + } + + this.emitGraphResyncRequest() + this.pendingInitialSync = false + } + + private emitGraphResyncRequest(): void { + if (!this.currentAppId || !webSocketClient.isConnected(this.currentAppId)) return + + const socket = webSocketClient.getSocket(this.currentAppId) + if (!socket) return + + socket.emit('collaboration_event', { + type: 'graph_resync_request', + data: { timestamp: Date.now() }, + timestamp: Date.now(), + }) + } + + private broadcastCurrentGraph(): void { + if (!this.currentAppId || !webSocketClient.isConnected(this.currentAppId)) return + if (!this.doc) return + + const socket = webSocketClient.getSocket(this.currentAppId) + if (!socket) return + + try { + const snapshot = this.doc.export({ mode: 'snapshot' }) + socket.emit('graph_event', snapshot) + } + catch (error) { + console.error('Failed to broadcast graph snapshot:', error) + } + } } export const collaborationManager = new CollaborationManager()