diff --git a/web/app/components/workflow-app/hooks/use-nodes-sync-draft.ts b/web/app/components/workflow-app/hooks/use-nodes-sync-draft.ts index 312d38a259..c38e60cf97 100644 --- a/web/app/components/workflow-app/hooks/use-nodes-sync-draft.ts +++ b/web/app/components/workflow-app/hooks/use-nodes-sync-draft.ts @@ -13,6 +13,7 @@ import { syncWorkflowDraft } from '@/service/workflow' import { useFeaturesStore } from '@/app/components/base/features/hooks' import { API_PREFIX } from '@/config' import { useWorkflowRefreshDraft } from '.' +import { collaborationManager } from '@/app/components/workflow/collaboration/core/collaboration-manager' export const useNodesSyncDraft = () => { const store = useStoreApi() @@ -93,9 +94,20 @@ export const useNodesSyncDraft = () => { const syncWorkflowDraftWhenPageClose = useCallback(() => { if (getNodesReadOnly()) return + + // Check leader status at sync time + const currentIsLeader = collaborationManager.getIsLeader() + + // Only allow leader to sync data + if (!currentIsLeader) { + console.log('Not leader, skipping sync on page close') + return + } + const postParams = getPostParams() if (postParams) { + console.log('Leader syncing workflow draft on page close') navigator.sendBeacon( `${API_PREFIX}/apps/${params.appId}/workflows/draft?_token=${localStorage.getItem('console_token')}`, JSON.stringify(postParams.params), @@ -113,6 +125,18 @@ export const useNodesSyncDraft = () => { ) => { if (getNodesReadOnly()) return + + // Check leader status at sync time + const currentIsLeader = collaborationManager.getIsLeader() + + // Only allow leader to sync data + if (!currentIsLeader) { + console.log('Not leader, skipping workflow draft sync') + callback?.onSettled?.() + return + } + + console.log('Leader performing workflow draft sync') const postParams = getPostParams() if (postParams) { @@ -124,9 +148,11 @@ export const useNodesSyncDraft = () => { const res = await syncWorkflowDraft(postParams) setSyncWorkflowDraftHash(res.hash) setDraftUpdatedAt(res.updated_at) + console.log('Leader successfully synced workflow draft') callback?.onSuccess && callback.onSuccess() } catch (error: any) { + console.error('Leader failed to sync workflow draft:', error) if (error && error.json && !error.bodyUsed) { error.json().then((err: any) => { if (err.code === 'draft_workflow_not_sync' && !notRefreshWhenSyncError) diff --git a/web/app/components/workflow/collaboration/core/collaboration-manager.ts b/web/app/components/workflow/collaboration/core/collaboration-manager.ts index 6d6177cb37..a0075ecb1f 100644 --- a/web/app/components/workflow/collaboration/core/collaboration-manager.ts +++ b/web/app/components/workflow/collaboration/core/collaboration-manager.ts @@ -17,6 +17,7 @@ export class CollaborationManager { private cursors: Record = {} private isLeader = false private leaderId: string | null = null + private activeConnections = new Set() init = (appId: string, reactFlowStore: any): void => { if (!reactFlowStore) { @@ -42,25 +43,57 @@ export class CollaborationManager { this.disconnect() } - async connect(appId: string, reactFlowStore: any): Promise { - if (this.currentAppId === appId && this.doc) return + async connect(appId: string, reactFlowStore?: any): Promise { + const connectionId = Math.random().toString(36).substring(2, 11) - this.disconnect() + this.activeConnections.add(connectionId) + + if (this.currentAppId === appId && this.doc) { + // Already connected to the same app, only update store if provided and we don't have one + if (reactFlowStore && !this.reactFlowStore) + this.reactFlowStore = reactFlowStore + + return connectionId + } + + // Only disconnect if switching to a different app + if (this.currentAppId && this.currentAppId !== appId) + this.forceDisconnect() this.currentAppId = appId - this.reactFlowStore = reactFlowStore + // Only set store if provided + if (reactFlowStore) + this.reactFlowStore = reactFlowStore const socket = webSocketClient.connect(appId) + + // Setup event listeners BEFORE any other operations + this.setupSocketEventListeners(socket) + this.doc = new LoroDoc() this.nodesMap = this.doc.getMap('nodes') this.edgesMap = this.doc.getMap('edges') this.provider = new CRDTProvider(socket, this.doc) this.setupSubscriptions() - this.setupSocketEventListeners(socket) + + // Force user_connect if already connected + if (socket.connected) + socket.emit('user_connect', { workflow_id: appId }) + + return connectionId } - disconnect = (): void => { + disconnect = (connectionId?: string): void => { + if (connectionId) + this.activeConnections.delete(connectionId) + + // Only disconnect when no more connections + if (this.activeConnections.size === 0) + this.forceDisconnect() + } + + private forceDisconnect = (): void => { if (this.currentAppId) webSocketClient.disconnect(this.currentAppId) @@ -72,8 +105,16 @@ export class CollaborationManager { this.currentAppId = null this.reactFlowStore = null this.cursors = {} + + // Only reset leader status when actually disconnecting + const wasLeader = this.isLeader this.isLeader = false this.leaderId = null + + if (wasLeader) + this.eventEmitter.emit('leaderChange', false) + + this.activeConnections.clear() this.eventEmitter.removeAllListeners() } @@ -131,6 +172,17 @@ export class CollaborationManager { return this.isLeader } + debugLeaderStatus(): void { + console.log('=== Leader Status Debug ===') + console.log('Current leader status:', this.isLeader) + console.log('Current leader ID:', this.leaderId) + console.log('Active connections:', this.activeConnections.size) + console.log('Connected:', this.isConnected()) + console.log('Current app ID:', this.currentAppId) + console.log('Has ReactFlow store:', !!this.reactFlowStore) + console.log('========================') + } + private syncNodes(oldNodes: Node[], newNodes: Node[]): void { if (!this.nodesMap) return @@ -254,12 +306,6 @@ export class CollaborationManager { if (data.leader && typeof data.leader === 'string') this.leaderId = data.leader - console.log('Updated online users and leader info:', { - users: data.users, - leader: data.leader, - currentLeader: this.leaderId, - }) - this.eventEmitter.emit('onlineUsers', data.users) this.eventEmitter.emit('cursors', { ...this.cursors }) } @@ -278,8 +324,6 @@ export class CollaborationManager { const wasLeader = this.isLeader this.isLeader = data.isLeader - console.log(`Leader status update: ${wasLeader ? 'was' : 'was not'} leader, ${this.isLeader ? 'now is' : 'now is not'} leader`) - if (wasLeader !== this.isLeader) this.eventEmitter.emit('leaderChange', this.isLeader) } diff --git a/web/app/components/workflow/collaboration/hooks/use-collaboration.ts b/web/app/components/workflow/collaboration/hooks/use-collaboration.ts index 8b6fc9ae77..10fd1c7444 100644 --- a/web/app/components/workflow/collaboration/hooks/use-collaboration.ts +++ b/web/app/components/workflow/collaboration/hooks/use-collaboration.ts @@ -16,6 +16,8 @@ export function useCollaboration(appId: string, reactFlowStore?: any) { useEffect(() => { if (!appId) return + let connectionId: string | null = null + if (!cursorServiceRef.current) { cursorServiceRef.current = new CursorService({ minMoveDistance: 10, @@ -24,7 +26,7 @@ export function useCollaboration(appId: string, reactFlowStore?: any) { } const initCollaboration = async () => { - await collaborationManager.connect(appId, reactFlowStore) + connectionId = await collaborationManager.connect(appId, reactFlowStore) setState((prev: any) => ({ ...prev, appId, isConnected: collaborationManager.isConnected() })) } @@ -55,7 +57,8 @@ export function useCollaboration(appId: string, reactFlowStore?: any) { unsubscribeUsers() unsubscribeLeaderChange() cursorServiceRef.current?.stopTracking() - collaborationManager.disconnect() + if (connectionId) + collaborationManager.disconnect(connectionId) } }, [appId, reactFlowStore]) @@ -71,7 +74,7 @@ export function useCollaboration(appId: string, reactFlowStore?: any) { cursorServiceRef.current?.stopTracking() } - return { + const result = { isConnected: state.isConnected || false, onlineUsers: state.onlineUsers || [], cursors: state.cursors || {}, @@ -80,4 +83,6 @@ export function useCollaboration(appId: string, reactFlowStore?: any) { startCursorTracking, stopCursorTracking, } + + return result } diff --git a/web/app/components/workflow/collaboration/test/leader-test.tsx b/web/app/components/workflow/collaboration/test/leader-test.tsx deleted file mode 100644 index b6115b42b8..0000000000 --- a/web/app/components/workflow/collaboration/test/leader-test.tsx +++ /dev/null @@ -1,61 +0,0 @@ -import React from 'react' -import { useCollaboration } from '../hooks/use-collaboration' - -type LeaderTestProps = { - appId: string -} - -export function LeaderTest({ appId }: LeaderTestProps) { - const { isConnected, isLeader, leaderId, onlineUsers } = useCollaboration(appId) - - return ( -
-

Leader Election Test

- -
-
- Connection: - - {isConnected ? 'Connected' : 'Disconnected'} - -
- -
- I am Leader: - - {isLeader ? 'YES' : 'NO'} - -
- -
- Current Leader ID: - - {leaderId || 'None'} - -
- -
- Online Users ({onlineUsers.length}): -
- {onlineUsers.map((user: any) => ( -
- - {user.user_id} - ({user.username}) - {user.user_id === leaderId && ( - 👑 Leader - )} -
- ))} -
-
-
-
- ) -}