From 8adb882c4bfede161e403ca6bdb6a93878f5a646 Mon Sep 17 00:00:00 2001 From: hjlarry Date: Tue, 11 Nov 2025 16:38:55 +0800 Subject: [PATCH] if session unauthorized, rejoin --- .../core/collaboration-manager.ts | 135 ++++++++++++------ .../collaboration/core/crdt-provider.ts | 7 +- .../collaboration/core/websocket-manager.ts | 39 ++++- 3 files changed, 132 insertions(+), 49 deletions(-) diff --git a/web/app/components/workflow/collaboration/core/collaboration-manager.ts b/web/app/components/workflow/collaboration/core/collaboration-manager.ts index 01657a0755..a02c9ac919 100644 --- a/web/app/components/workflow/collaboration/core/collaboration-manager.ts +++ b/web/app/components/workflow/collaboration/core/collaboration-manager.ts @@ -1,6 +1,7 @@ import { LoroDoc, LoroList, LoroMap, UndoManager } from 'loro-crdt' import { cloneDeep, isEqual } from 'lodash-es' -import { webSocketClient } from './websocket-manager' +import type { Socket } from 'socket.io-client' +import { emitWithAuthGuard, webSocketClient } from './websocket-manager' import { CRDTProvider } from './crdt-provider' import { EventEmitter } from './event-emitter' import type { Edge, Node } from '../../types' @@ -36,6 +37,58 @@ export class CollaborationManager { private activeConnections = new Set() private isUndoRedoInProgress = false private pendingInitialSync = false + private rejoinInProgress = false + + private getActiveSocket(): Socket | null { + if (!this.currentAppId) + return null + return webSocketClient.getSocket(this.currentAppId) + } + + private handleSessionUnauthorized = (): void => { + if (this.rejoinInProgress) + return + if (!this.currentAppId) + return + + const socket = this.getActiveSocket() + if (!socket) + return + + this.rejoinInProgress = true + console.warn('Collaboration session expired, attempting to rejoin workflow.') + emitWithAuthGuard( + socket, + 'user_connect', + { workflow_id: this.currentAppId }, + { + onAck: () => { + this.rejoinInProgress = false + }, + onUnauthorized: () => { + this.rejoinInProgress = false + console.error('Rejoin failed due to authorization error, forcing disconnect.') + this.forceDisconnect() + }, + }, + ) + } + + private sendCollaborationEvent(payload: any): void { + const socket = this.getActiveSocket() + if (!socket) + return + + emitWithAuthGuard(socket, 'collaboration_event', payload, { onUnauthorized: this.handleSessionUnauthorized }) + } + + private sendGraphEvent(payload: any): void { + const socket = this.getActiveSocket() + if (!socket) + return + + emitWithAuthGuard(socket, 'graph_event', payload, { onUnauthorized: this.handleSessionUnauthorized }) + } private getNodeContainer(nodeId: string): LoroMap { if (!this.nodesMap) @@ -362,13 +415,13 @@ export class CollaborationManager { }, }) - this.provider = new CRDTProvider(socket, this.doc) + this.provider = new CRDTProvider(socket, this.doc, this.handleSessionUnauthorized) this.setupSubscriptions() // Force user_connect if already connected if (socket.connected) - socket.emit('user_connect', { workflow_id: appId }) + emitWithAuthGuard(socket, 'user_connect', { workflow_id: appId }, { onUnauthorized: this.handleSessionUnauthorized }) return connectionId } @@ -397,6 +450,7 @@ export class CollaborationManager { this.cursors = {} this.nodePanelPresence = {} this.isUndoRedoInProgress = false + this.rejoinInProgress = false // Only reset leader status when actually disconnecting const wasLeader = this.isLeader @@ -426,49 +480,44 @@ export class CollaborationManager { emitCursorMove(position: CursorPosition): void { if (!this.currentAppId || !webSocketClient.isConnected(this.currentAppId)) return - const socket = webSocketClient.getSocket(this.currentAppId) - if (socket) { - socket.emit('collaboration_event', { - type: 'mouse_move', - userId: socket.id, - data: { x: position.x, y: position.y }, - timestamp: Date.now(), - }) - } + const socket = this.getActiveSocket() + if (!socket) + return + + this.sendCollaborationEvent({ + type: 'mouse_move', + userId: socket.id, + data: { x: position.x, y: position.y }, + timestamp: Date.now(), + }) } emitSyncRequest(): void { if (!this.currentAppId || !webSocketClient.isConnected(this.currentAppId)) return - const socket = webSocketClient.getSocket(this.currentAppId) - if (socket) { - console.log('Emitting sync request to leader') - socket.emit('collaboration_event', { - type: 'sync_request', - data: { timestamp: Date.now() }, - timestamp: Date.now(), - }) - } + console.log('Emitting sync request to leader') + this.sendCollaborationEvent({ + type: 'sync_request', + data: { timestamp: Date.now() }, + timestamp: Date.now(), + }) } emitWorkflowUpdate(appId: string): void { if (!this.currentAppId || !webSocketClient.isConnected(this.currentAppId)) return - const socket = webSocketClient.getSocket(this.currentAppId) - if (socket) { - console.log('Emitting Workflow update event') - socket.emit('collaboration_event', { - type: 'workflow_update', - data: { appId, timestamp: Date.now() }, - timestamp: Date.now(), - }) - } + console.log('Emitting Workflow update event') + this.sendCollaborationEvent({ + type: 'workflow_update', + data: { appId, timestamp: Date.now() }, + timestamp: Date.now(), + }) } emitNodePanelPresence(nodeId: string, isOpen: boolean, user: NodePanelPresenceUser): void { if (!this.currentAppId || !webSocketClient.isConnected(this.currentAppId)) return - const socket = webSocketClient.getSocket(this.currentAppId) + const socket = this.getActiveSocket() if (!socket || !nodeId || !user?.userId) return const payload: NodePanelPresenceEventData = { @@ -479,7 +528,7 @@ export class CollaborationManager { timestamp: Date.now(), } - socket.emit('collaboration_event', { + this.sendCollaborationEvent({ type: 'node_panel_presence', data: payload, timestamp: payload.timestamp, @@ -545,15 +594,12 @@ export class CollaborationManager { emitCommentsUpdate(appId: string): void { if (!this.currentAppId || !webSocketClient.isConnected(this.currentAppId)) return - const socket = webSocketClient.getSocket(this.currentAppId) - if (socket) { - console.log('Emitting Comments update event') - socket.emit('collaboration_event', { - type: 'comments_update', - data: { appId, timestamp: Date.now() }, - timestamp: Date.now(), - }) - } + console.log('Emitting Comments update event') + this.sendCollaborationEvent({ + type: 'comments_update', + data: { appId, timestamp: Date.now() }, + timestamp: Date.now(), + }) } onUndoRedoStateChange(callback: (state: { canUndo: boolean; canRedo: boolean }) => void): () => void { @@ -994,10 +1040,7 @@ export class CollaborationManager { private emitGraphResyncRequest(): void { if (!this.currentAppId || !webSocketClient.isConnected(this.currentAppId)) return - const socket = webSocketClient.getSocket(this.currentAppId) - if (!socket) return - - socket.emit('collaboration_event', { + this.sendCollaborationEvent({ type: 'graph_resync_request', data: { timestamp: Date.now() }, timestamp: Date.now(), @@ -1013,7 +1056,7 @@ export class CollaborationManager { try { const snapshot = this.doc.export({ mode: 'snapshot' }) - socket.emit('graph_event', snapshot) + this.sendGraphEvent(snapshot) } catch (error) { console.error('Failed to broadcast graph snapshot:', error) diff --git a/web/app/components/workflow/collaboration/core/crdt-provider.ts b/web/app/components/workflow/collaboration/core/crdt-provider.ts index 644addb863..fbe4b13e02 100644 --- a/web/app/components/workflow/collaboration/core/crdt-provider.ts +++ b/web/app/components/workflow/collaboration/core/crdt-provider.ts @@ -1,13 +1,16 @@ import type { LoroDoc } from 'loro-crdt' import type { Socket } from 'socket.io-client' +import { emitWithAuthGuard } from './websocket-manager' export class CRDTProvider { private doc: LoroDoc private socket: Socket + private onUnauthorized?: () => void - constructor(socket: Socket, doc: LoroDoc) { + constructor(socket: Socket, doc: LoroDoc, onUnauthorized?: () => void) { this.socket = socket this.doc = doc + this.onUnauthorized = onUnauthorized this.setupEventListeners() } @@ -15,7 +18,7 @@ export class CRDTProvider { this.doc.subscribe((event: any) => { if (event.by === 'local') { const update = this.doc.export({ mode: 'update' }) - this.socket.emit('graph_event', update) + emitWithAuthGuard(this.socket, 'graph_event', update, { onUnauthorized: this.onUnauthorized }) } }) diff --git a/web/app/components/workflow/collaboration/core/websocket-manager.ts b/web/app/components/workflow/collaboration/core/websocket-manager.ts index b0b80c853c..bef68e5269 100644 --- a/web/app/components/workflow/collaboration/core/websocket-manager.ts +++ b/web/app/components/workflow/collaboration/core/websocket-manager.ts @@ -3,6 +3,43 @@ import { io } from 'socket.io-client' import { ACCESS_TOKEN_LOCAL_STORAGE_NAME } from '@/config' import type { DebugInfo, WebSocketConfig } from '../types/websocket' +const isUnauthorizedAck = (...ackArgs: any[]): boolean => { + const [first, second] = ackArgs + + if (second === 401 || first === 401) + return true + + if (first && typeof first === 'object' && first.msg === 'unauthorized') + return true + + return false +} + +export type EmitAckOptions = { + onAck?: (...ackArgs: any[]) => void + onUnauthorized?: (...ackArgs: any[]) => void +} + +export const emitWithAuthGuard = ( + socket: Socket | null | undefined, + event: string, + payload: any, + options?: EmitAckOptions, +): void => { + if (!socket) + return + + socket.emit( + event, + payload, + (...ackArgs: any[]) => { + options?.onAck?.(...ackArgs) + if (isUnauthorizedAck(...ackArgs)) + options?.onUnauthorized?.(...ackArgs) + }, + ) +} + export class WebSocketClient { private connections: Map = new Map() private connecting: Set = new Set() @@ -115,7 +152,7 @@ export class WebSocketClient { private setupBaseEventListeners(socket: Socket, appId: string): void { socket.on('connect', () => { this.connecting.delete(appId) - socket.emit('user_connect', { workflow_id: appId }) + emitWithAuthGuard(socket, 'user_connect', { workflow_id: appId }) }) socket.on('disconnect', () => {