From e43b46786de73a7fb4671b89830d416f2db6e66c Mon Sep 17 00:00:00 2001 From: hjlarry Date: Thu, 7 Aug 2025 10:58:53 +0800 Subject: [PATCH] refactor all the frontend code --- .../(appDetailLayout)/[appId]/layout-main.tsx | 4 +- .../workflow-app/components/workflow-main.tsx | 107 ++------ .../hooks/use-workflow-websocket.ts | 45 +--- web/app/components/workflow-app/index.tsx | 21 +- .../core/collaboration-manager.ts | 218 +++++++++++++++ .../collaboration/core/crdt-provider.ts | 36 +++ .../collaboration/core/event-emitter.ts | 49 ++++ .../collaboration/core/websocket-client.ts | 253 ------------------ .../collaboration/core/websocket-manager.ts | 119 ++++++++ .../collaboration/hooks/use-collaboration.ts | 72 +++++ .../workflow/collaboration/index.ts | 5 + .../workflow/collaboration/manage.ts | 153 ----------- .../collaboration/services/cursor-service.ts | 84 ++++++ .../collaboration/types/collaboration.ts | 43 +++ .../workflow/collaboration/types/events.ts | 38 +++ .../workflow/collaboration/types/index.ts | 3 + .../workflow/collaboration/types/websocket.ts | 16 ++ web/app/components/workflow/features.tsx | 2 +- .../hooks/use-collaborative-workflow.ts | 2 +- .../panel/chat-variable-panel/index.tsx | 2 +- .../workflow/panel/env-panel/index.tsx | 2 +- 21 files changed, 733 insertions(+), 541 deletions(-) create mode 100644 web/app/components/workflow/collaboration/core/collaboration-manager.ts create mode 100644 web/app/components/workflow/collaboration/core/crdt-provider.ts create mode 100644 web/app/components/workflow/collaboration/core/event-emitter.ts delete mode 100644 web/app/components/workflow/collaboration/core/websocket-client.ts create mode 100644 web/app/components/workflow/collaboration/core/websocket-manager.ts create mode 100644 web/app/components/workflow/collaboration/hooks/use-collaboration.ts create mode 100644 web/app/components/workflow/collaboration/index.ts delete mode 100644 web/app/components/workflow/collaboration/manage.ts create mode 100644 web/app/components/workflow/collaboration/services/cursor-service.ts create mode 100644 web/app/components/workflow/collaboration/types/collaboration.ts create mode 100644 web/app/components/workflow/collaboration/types/events.ts create mode 100644 web/app/components/workflow/collaboration/types/index.ts create mode 100644 web/app/components/workflow/collaboration/types/websocket.ts diff --git a/web/app/(commonLayout)/app/(appDetailLayout)/[appId]/layout-main.tsx b/web/app/(commonLayout)/app/(appDetailLayout)/[appId]/layout-main.tsx index 2b87dddc4c..b27f842f12 100644 --- a/web/app/(commonLayout)/app/(appDetailLayout)/[appId]/layout-main.tsx +++ b/web/app/(commonLayout)/app/(appDetailLayout)/[appId]/layout-main.tsx @@ -26,7 +26,7 @@ import Loading from '@/app/components/base/loading' import useBreakpoints, { MediaType } from '@/hooks/use-breakpoints' import type { App } from '@/types/app' import useDocumentTitle from '@/hooks/use-document-title' -import { webSocketClient } from '@/app/components/workflow/collaboration/core/websocket-client' +import { webSocketClient } from '@/app/components/workflow/collaboration/core/websocket-manager' export type IAppDetailLayoutProps = { children: React.ReactNode @@ -117,7 +117,7 @@ const AppDetailLayout: FC = (props) => { setAppDetailRes(res) // Only connect for workflow/advanced-chat apps and if not already connected if ((res.mode === 'workflow' || res.mode === 'advanced-chat') && !webSocketClient.isConnected(appId)) - webSocketClient.getClient(appId) + webSocketClient.connect(appId) }).catch((e: any) => { if (e.status === 404) router.replace('/apps') diff --git a/web/app/components/workflow-app/components/workflow-main.tsx b/web/app/components/workflow-app/components/workflow-main.tsx index 1899e8bb25..f6704026b5 100644 --- a/web/app/components/workflow-app/components/workflow-main.tsx +++ b/web/app/components/workflow-app/components/workflow-main.tsx @@ -3,7 +3,6 @@ import { useEffect, useMemo, useRef, - useState, } from 'react' import { useFeaturesStore } from '@/app/components/base/features/hooks' import type { Features as FeaturesData } from '@/app/components/base/features/types' @@ -22,10 +21,9 @@ import { useWorkflowStartRun, } from '../hooks' import { useStore, useWorkflowStore } from '@/app/components/workflow/store' -import { webSocketClient } from '@/app/components/workflow/collaboration/core/websocket-client' import { useCollaborativeCursors } from '../hooks' -import type { OnlineUser } from '@/app/components/workflow/collaboration/core/websocket-client' -import { collaborationManager } from '@/app/components/workflow/collaboration/manage' +import { useCollaboration } from '@/app/components/workflow/collaboration' +import { collaborationManager } from '@/app/components/workflow/collaboration' import { fetchWorkflowDraft } from '@/service/workflow' import { useStoreApi } from 'reactflow' @@ -39,19 +37,18 @@ const WorkflowMain = ({ const workflowStore = useWorkflowStore() const appId = useStore(s => s.appId) const containerRef = useRef(null) - const lastEmitTimeRef = useRef(0) - const lastPositionRef = useRef<{ x: number; y: number } | null>(null) const store = useStoreApi() + const { startCursorTracking, stopCursorTracking, onlineUsers } = useCollaboration(appId, store) useEffect(() => { - collaborationManager.init(appId, store) - }, [appId, store]) + if (containerRef.current) + startCursorTracking(containerRef as React.RefObject) - // Get the socket for current app - const wsClient = useMemo(() => { - return appId ? webSocketClient.getClient(appId) : null - }, [appId]) + return () => { + stopCursorTracking() + } + }, [startCursorTracking, stopCursorTracking]) const handleWorkflowDataUpdate = useCallback((payload: any) => { const { @@ -102,64 +99,19 @@ const WorkflowMain = ({ useEffect(() => { if (!appId) return - wsClient?.on('collaboration_update', async (update: any) => { - if (update.type === 'varsAndFeaturesUpdate') { - try { - const response = await fetchWorkflowDraft(`/apps/${appId}/workflows/draft`) - handleWorkflowDataUpdate(response) - } - catch (error) { - console.error('workflow vars and features update failed:', error) - } + + const unsubscribe = collaborationManager.onVarsAndFeaturesUpdate(async (update: any) => { + try { + const response = await fetchWorkflowDraft(`/apps/${appId}/workflows/draft`) + handleWorkflowDataUpdate(response) + } + catch (error) { + console.error('workflow vars and features update failed:', error) } }) - }, [appId, wsClient]) - const handleMouseMove = useCallback((event: MouseEvent) => { - if (!containerRef.current || !wsClient?.connected) return - - const rect = containerRef.current.getBoundingClientRect() - const x = event.clientX - rect.left - const y = event.clientY - rect.top - - // Only emit if mouse is within the container - if (x >= 0 && y >= 0 && x <= rect.width && y <= rect.height) { - const now = Date.now() - const timeSinceLastEmit = now - lastEmitTimeRef.current - - if (timeSinceLastEmit >= 300) { - lastEmitTimeRef.current = now - lastPositionRef.current = { x, y } - - const eventData = { - type: 'mouseMove', - data: { x, y }, - timestamp: now, - } - - wsClient.emit('collaboration_event', eventData) - - // Debug log - if (process.env.NODE_ENV === 'development') - console.log('Mouse move emitted:', eventData) - } - else { - // Update position for potential future emit - lastPositionRef.current = { x, y } - } - } - }, [wsClient]) - - useEffect(() => { - const container = containerRef.current - if (!container) return - - container.addEventListener('mousemove', handleMouseMove) - - return () => { - container.removeEventListener('mousemove', handleMouseMove) - } - }, [handleMouseMove]) + return unsubscribe + }, [appId, handleWorkflowDataUpdate]) const { doSyncWorkflowDraft, @@ -180,25 +132,6 @@ const WorkflowMain = ({ } = useWorkflowStartRun() const { cursors, myUserId } = useCollaborativeCursors(appId) - const [onlineUsers, setOnlineUsers] = useState>({}) - - useEffect(() => { - if (!appId || !wsClient) return - - const handleOnlineUsersUpdate = (data: { users: OnlineUser[] }) => { - const usersMap = data.users.reduce((acc, user) => { - acc[user.user_id] = user - return acc - }, {} as Record) - setOnlineUsers(usersMap) - } - - wsClient.on('online_users', handleOnlineUsersUpdate) - - return () => { - wsClient.off('online_users', handleOnlineUsersUpdate) - } - }, [appId, wsClient]) const { fetchInspectVars } = useSetWorkflowVarsWithValue({ flowId: appId, @@ -302,7 +235,7 @@ const WorkflowMain = ({ if (userId === myUserId) return null - const userInfo = onlineUsers[userId] + const userInfo = onlineUsers.find(user => user.user_id === userId) const userName = userInfo?.username || `User ${userId.slice(-4)}` const getUserColor = (id: string) => { diff --git a/web/app/components/workflow-app/hooks/use-workflow-websocket.ts b/web/app/components/workflow-app/hooks/use-workflow-websocket.ts index 12b2b06b04..22f762c3c9 100644 --- a/web/app/components/workflow-app/hooks/use-workflow-websocket.ts +++ b/web/app/components/workflow-app/hooks/use-workflow-websocket.ts @@ -1,45 +1,18 @@ import { useEffect, useState } from 'react' -import { webSocketClient } from '@/app/components/workflow/collaboration/core/websocket-client' +import { useCollaboration } from '@/app/components/workflow/collaboration' export function useCollaborativeCursors(appId: string) { - const [cursors, setCursors] = useState>({}) + const { cursors, isConnected } = useCollaboration(appId) const [myUserId, setMyUserId] = useState(null) useEffect(() => { - if (!appId) return + if (isConnected) + setMyUserId('current-user') + }, [isConnected]) - // Get existing socket or create new one - const wsClient = webSocketClient.getClient(appId) + const filteredCursors = Object.fromEntries( + Object.entries(cursors).filter(([userId]) => userId !== myUserId), + ) - const handleConnect = () => { - setMyUserId(wsClient.id || 'unknown') - } - - // Listen to collaboration events for this specific app - const unsubscribeMouseMove = wsClient.on('collaboration_update', (update: any) => { - if (update.type === 'mouseMove' && update.userId !== myUserId) { - setCursors(prev => ({ - ...prev, - [update.userId]: { - x: update.data.x, - y: update.data.y, - userId: update.userId, - timestamp: update.timestamp, - }, - })) - } - }) - - if (wsClient.connected) - handleConnect() - else - wsClient.on('connect', handleConnect) - - return () => { - unsubscribeMouseMove() - wsClient.off('connect', handleConnect) - } - }, [appId]) - - return { cursors, myUserId } + return { cursors: filteredCursors, myUserId } } diff --git a/web/app/components/workflow-app/index.tsx b/web/app/components/workflow-app/index.tsx index 0795efe318..3348f905a8 100644 --- a/web/app/components/workflow-app/index.tsx +++ b/web/app/components/workflow-app/index.tsx @@ -1,4 +1,5 @@ import { + useEffect, useMemo, } from 'react' import useSWR from 'swr' @@ -23,22 +24,31 @@ import { } from '@/app/components/workflow/context' import { createWorkflowSlice } from './store/workflow/workflow-slice' import WorkflowAppMain from './components/workflow-main' -import { collaborationManager } from '@/app/components/workflow/collaboration/manage' +import { collaborationManager } from '@/app/components/workflow/collaboration/core/collaboration-manager' +import { useStore } from '@/app/components/workflow/store' const WorkflowAppWithAdditionalContext = () => { const { data, isLoading, } = useWorkflowInit() - const { setNodes, setEdges } = collaborationManager + const appId = useStore(s => s.appId) const { data: fileUploadConfigResponse } = useSWR({ url: '/files/upload' }, fetchFileUploadConfig) + useEffect(() => { + if (appId && data) + collaborationManager.init(appId, null) + + return () => { + collaborationManager.destroy() + } + }, [appId, data]) + const nodesData = useMemo(() => { if (data) { const processedNodes = initialNodes(data.graph.nodes, data.graph.edges) - setNodes([], processedNodes) - + collaborationManager.setNodes([], processedNodes) return processedNodes } return [] @@ -47,8 +57,7 @@ const WorkflowAppWithAdditionalContext = () => { const edgesData = useMemo(() => { if (data) { const processedEdges = initialEdges(data.graph.edges, data.graph.nodes) - setEdges([], processedEdges) - + collaborationManager.setEdges([], processedEdges) return processedEdges } return [] diff --git a/web/app/components/workflow/collaboration/core/collaboration-manager.ts b/web/app/components/workflow/collaboration/core/collaboration-manager.ts new file mode 100644 index 0000000000..09278ded8a --- /dev/null +++ b/web/app/components/workflow/collaboration/core/collaboration-manager.ts @@ -0,0 +1,218 @@ +import { LoroDoc } from 'loro-crdt' +import { isEqual } from 'lodash-es' +import { webSocketClient } from './websocket-manager' +import { CRDTProvider } from './crdt-provider' +import { EventEmitter } from './event-emitter' +import type { Edge, Node } from '../../types' +import type { CollaborationState, CursorPosition, OnlineUser } from '../types/collaboration' + +export class CollaborationManager { + private doc: LoroDoc | null = null + private provider: CRDTProvider | null = null + private nodesMap: any = null + private edgesMap: any = null + private eventEmitter = new EventEmitter() + private currentAppId: string | null = null + private reactFlowStore: any = null + + init = (appId: string, reactFlowStore: any): void => { + this.connect(appId, reactFlowStore) + } + + setNodes = (oldNodes: Node[], newNodes: Node[]): void => { + this.syncNodes(oldNodes, newNodes) + if (this.doc) + this.doc.commit() + } + + setEdges = (oldEdges: Edge[], newEdges: Edge[]): void => { + this.syncEdges(oldEdges, newEdges) + if (this.doc) + this.doc.commit() + } + + destroy = (): void => { + this.disconnect() + } + + async connect(appId: string, reactFlowStore: any): Promise { + if (this.currentAppId === appId && this.doc) return + + this.disconnect() + + this.currentAppId = appId + this.reactFlowStore = reactFlowStore + + const socket = webSocketClient.connect(appId) + this.doc = new LoroDoc() + this.nodesMap = this.doc.getMap('nodes') + this.edgesMap = this.doc.getMap('edges') + this.provider = new CRDTProvider(socket, this.doc) + + this.setupSubscriptions() + this.setupSocketEventListeners(socket) + } + + disconnect = (): void => { + if (this.currentAppId) + webSocketClient.disconnect(this.currentAppId) + + this.provider?.destroy() + this.doc = null + this.provider = null + this.nodesMap = null + this.edgesMap = null + this.currentAppId = null + this.reactFlowStore = null + this.eventEmitter.removeAllListeners() + } + + isConnected(): boolean { + return this.currentAppId ? webSocketClient.isConnected(this.currentAppId) : false + } + + getNodes(): Node[] { + return this.nodesMap ? Array.from(this.nodesMap.values()) : [] + } + + getEdges(): Edge[] { + return this.edgesMap ? Array.from(this.edgesMap.values()) : [] + } + + emitCursorMove(position: CursorPosition): void { + if (!this.currentAppId || !webSocketClient.isConnected(this.currentAppId)) return + + const socket = webSocketClient.getSocket(this.currentAppId) + if (socket) { + socket.emit('collaboration_event', { + type: 'mouseMove', + userId: socket.id, + data: { x: position.x, y: position.y }, + timestamp: Date.now(), + }) + } + } + + onStateChange(callback: (state: Partial) => void): () => void { + return this.eventEmitter.on('stateChange', callback) + } + + onCursorUpdate(callback: (cursors: Record) => void): () => void { + return this.eventEmitter.on('cursors', callback) + } + + onOnlineUsersUpdate(callback: (users: OnlineUser[]) => void): () => void { + return this.eventEmitter.on('onlineUsers', callback) + } + + onVarsAndFeaturesUpdate(callback: (update: any) => void): () => void { + return this.eventEmitter.on('varsAndFeaturesUpdate', callback) + } + + private syncNodes(oldNodes: Node[], newNodes: Node[]): void { + if (!this.nodesMap) return + + const oldNodesMap = new Map(oldNodes.map(node => [node.id, node])) + const newNodesMap = new Map(newNodes.map(node => [node.id, node])) + + oldNodes.forEach((oldNode) => { + if (!newNodesMap.has(oldNode.id)) + this.nodesMap.delete(oldNode.id) + }) + + newNodes.forEach((newNode) => { + const oldNode = oldNodesMap.get(newNode.id) + if (!oldNode) { + this.nodesMap.set(newNode.id, newNode) + } + else { + const oldPersistentData = this.getPersistentNodeData(oldNode) + const newPersistentData = this.getPersistentNodeData(newNode) + if (!isEqual(oldPersistentData, newPersistentData)) + this.nodesMap.set(newNode.id, newPersistentData) + } + }) + } + + private syncEdges(oldEdges: Edge[], newEdges: Edge[]): void { + if (!this.edgesMap) return + + const oldEdgesMap = new Map(oldEdges.map(edge => [edge.id, edge])) + const newEdgesMap = new Map(newEdges.map(edge => [edge.id, edge])) + + oldEdges.forEach((oldEdge) => { + if (!newEdgesMap.has(oldEdge.id)) + this.edgesMap.delete(oldEdge.id) + }) + + newEdges.forEach((newEdge) => { + const oldEdge = oldEdgesMap.get(newEdge.id) + if (!oldEdge) + this.edgesMap.set(newEdge.id, newEdge) + else if (!isEqual(oldEdge, newEdge)) + this.edgesMap.set(newEdge.id, newEdge) + }) + } + + private getPersistentNodeData(node: Node): any { + const { data, ...rest } = node + const filteredData = Object.fromEntries( + Object.entries(data).filter(([key]) => !key.startsWith('_')), + ) + return { ...rest, data: filteredData } + } + + private setupSubscriptions(): void { + this.nodesMap?.subscribe((event: any) => { + if (event.by === 'import' && this.reactFlowStore) { + requestAnimationFrame(() => { + const { setNodes } = this.reactFlowStore.getState() + const updatedNodes = Array.from(this.nodesMap.values()) + setNodes(updatedNodes) + }) + } + }) + + this.edgesMap?.subscribe((event: any) => { + if (event.by === 'import' && this.reactFlowStore) { + requestAnimationFrame(() => { + const { setEdges } = this.reactFlowStore.getState() + const updatedEdges = Array.from(this.edgesMap.values()) + setEdges(updatedEdges) + }) + } + }) + } + + private setupSocketEventListeners(socket: any): void { + socket.on('collaboration_update', (update: any) => { + if (update.type === 'mouseMove') { + this.eventEmitter.emit('cursors', { + [update.userId]: { + x: update.data.x, + y: update.data.y, + userId: update.userId, + timestamp: update.timestamp, + }, + }) + } + else if (update.type === 'varsAndFeaturesUpdate') { + this.eventEmitter.emit('varsAndFeaturesUpdate', update) + } + }) + + socket.on('online_users', (data: { users: OnlineUser[] }) => { + this.eventEmitter.emit('onlineUsers', data.users) + }) + + socket.on('connect', () => { + this.eventEmitter.emit('stateChange', { isConnected: true }) + }) + + socket.on('disconnect', () => { + this.eventEmitter.emit('stateChange', { isConnected: false }) + }) + } +} + +export const collaborationManager = new CollaborationManager() diff --git a/web/app/components/workflow/collaboration/core/crdt-provider.ts b/web/app/components/workflow/collaboration/core/crdt-provider.ts new file mode 100644 index 0000000000..644addb863 --- /dev/null +++ b/web/app/components/workflow/collaboration/core/crdt-provider.ts @@ -0,0 +1,36 @@ +import type { LoroDoc } from 'loro-crdt' +import type { Socket } from 'socket.io-client' + +export class CRDTProvider { + private doc: LoroDoc + private socket: Socket + + constructor(socket: Socket, doc: LoroDoc) { + this.socket = socket + this.doc = doc + this.setupEventListeners() + } + + private setupEventListeners(): void { + this.doc.subscribe((event: any) => { + if (event.by === 'local') { + const update = this.doc.export({ mode: 'update' }) + this.socket.emit('graph_event', update) + } + }) + + this.socket.on('graph_update', (updateData: Uint8Array) => { + try { + const data = new Uint8Array(updateData) + this.doc.import(data) + } + catch (error) { + console.error('Error importing graph update:', error) + } + }) + } + + destroy(): void { + this.socket.off('graph_update') + } +} diff --git a/web/app/components/workflow/collaboration/core/event-emitter.ts b/web/app/components/workflow/collaboration/core/event-emitter.ts new file mode 100644 index 0000000000..4308a66dd8 --- /dev/null +++ b/web/app/components/workflow/collaboration/core/event-emitter.ts @@ -0,0 +1,49 @@ +export type EventHandler = (data: T) => void + +export class EventEmitter { + private events: Map> = new Map() + + on(event: string, handler: EventHandler): () => void { + if (!this.events.has(event)) + this.events.set(event, new Set()) + + this.events.get(event)!.add(handler) + + return () => this.off(event, handler) + } + + off(event: string, handler?: EventHandler): void { + if (!this.events.has(event)) return + + const handlers = this.events.get(event)! + if (handler) + handlers.delete(handler) + else + handlers.clear() + + if (handlers.size === 0) + this.events.delete(event) + } + + emit(event: string, data: T): void { + if (!this.events.has(event)) return + + const handlers = this.events.get(event)! + handlers.forEach((handler) => { + try { + handler(data) + } + catch (error) { + console.error(`Error in event handler for ${event}:`, error) + } + }) + } + + removeAllListeners(): void { + this.events.clear() + } + + getListenerCount(event: string): number { + return this.events.get(event)?.size || 0 + } +} diff --git a/web/app/components/workflow/collaboration/core/websocket-client.ts b/web/app/components/workflow/collaboration/core/websocket-client.ts deleted file mode 100644 index 73f0ea43df..0000000000 --- a/web/app/components/workflow/collaboration/core/websocket-client.ts +++ /dev/null @@ -1,253 +0,0 @@ -import type { Socket } from 'socket.io-client' -import { io } from 'socket.io-client' - -export type WebSocketConfig = { - url?: string - token?: string - transports?: string[] - withCredentials?: boolean -} - -export type OnlineUser = { - user_id: string - username: string - avatar: string - sid: string -} - -export type WorkflowOnlineUsers = { - workflow_id: string - users: OnlineUser[] -} - -export type OnlineUserListResponse = { - data: WorkflowOnlineUsers[] -} - -/** - * App-specific WebSocket client - * Provides a clean API for a specific app's WebSocket operations - */ -export class AppWebSocketClient { - constructor( - private appId: string, - private socket: Socket, - private manager: WebSocketClient, - ) { - // Initialize app-specific WebSocket client - } - - /** - * Listen to events - */ - on(event: string, handler: (...args: any[]) => void): () => void { - this.socket.on(event, handler) - return () => this.socket.off(event, handler) - } - - /** - * Remove event listener - */ - off(event: string, handler?: (...args: any[]) => void): void { - this.socket.off(event, handler) - } - - /** - * Emit event - */ - emit(event: string, ...args: any[]): void { - if (this.socket.connected) - this.socket.emit(event, ...args) - } - - /** - * Check connection status - */ - get connected(): boolean { - return this.socket.connected - } - - /** - * Get socket ID - */ - get id(): string | undefined { - return this.socket.id - } - - /** - * Disconnect this specific app - */ - disconnect(): void { - this.manager.disconnect(this.appId) - } - - /** - * Get the underlying socket (for advanced usage) - */ - getSocket(): Socket { - return this.socket - } -} - -/** - * Multi-connection WebSocket manager - * Supports multiple concurrent connections for different apps - */ -export class WebSocketClient { - private connections: Map = new Map() - private connecting: Set = new Set() - private config: WebSocketConfig - - constructor(config: WebSocketConfig = {}) { - this.config = { - url: config.url || process.env.NEXT_PUBLIC_SOCKET_URL || 'ws://localhost:5001', - transports: config.transports || ['websocket'], - withCredentials: config.withCredentials !== false, - ...config, - } - } - - /** - * Get or create app-specific WebSocket client - */ - getClient(appId: string): AppWebSocketClient { - let socket = this.connections.get(appId) - - if (!socket || !socket.connected) - socket = this.connect(appId) - - return new AppWebSocketClient(appId, socket, this) - } - - /** - * Connect to WebSocket server for specific app - */ - private connect(appId: string): Socket { - // Return existing connection if available and connected - const existingSocket = this.connections.get(appId) - if (existingSocket?.connected) - return existingSocket - - // If already connecting, return the pending socket - if (this.connecting.has(appId)) { - const pendingSocket = this.connections.get(appId) - if (pendingSocket) - return pendingSocket - } - - // Clean up disconnected socket - if (existingSocket && !existingSocket.connected) { - existingSocket.disconnect() - this.connections.delete(appId) - } - - // Mark as connecting to prevent duplicate connections - this.connecting.add(appId) - - const authToken = localStorage.getItem('console_token') - - const socket = io(this.config.url!, { - path: '/socket.io', - transports: this.config.transports, - auth: { token: authToken }, - withCredentials: this.config.withCredentials, - }) - - this.connections.set(appId, socket) - this.setupBaseEventListeners(socket, appId) - - return socket - } - - /** - * Disconnect from specific app or all connections - */ - disconnect(appId?: string): void { - if (appId) { - const socket = this.connections.get(appId) - if (socket) { - socket.disconnect() - this.connections.delete(appId) - this.connecting.delete(appId) - } - } - else { - // Disconnect all connections - this.connections.forEach(socket => socket.disconnect()) - this.connections.clear() - this.connecting.clear() - } - } - - /** - * Get socket instance for specific app (for backward compatibility) - */ - getSocket(appId: string): Socket | null { - return this.connections.get(appId) || null - } - - /** - * Check connection status for specific app - */ - isConnected(appId: string): boolean { - return this.connections.get(appId)?.connected || false - } - - /** - * Get all connected app IDs - */ - getConnectedApps(): string[] { - const connectedApps: string[] = [] - this.connections.forEach((socket, appId) => { - if (socket.connected) - connectedApps.push(appId) - }) - return connectedApps - } - - /** - * Debug method: get connection status for all apps - */ - getDebugInfo(): Record { - const info: Record = {} - - this.connections.forEach((socket, appId) => { - info[appId] = { - connected: socket.connected, - connecting: this.connecting.has(appId), - socketId: socket.id, - } - }) - - return info - } - - private setupBaseEventListeners(socket: Socket, appId: string): void { - socket.on('connect', () => { - this.connecting.delete(appId) - socket.emit('user_connect', { workflow_id: appId }) - console.log(`WebSocket connected for app: ${appId}`) - }) - - socket.on('disconnect', () => { - this.connecting.delete(appId) - console.log(`WebSocket disconnected for app: ${appId}`) - }) - - socket.on('connect_error', (err) => { - this.connecting.delete(appId) - console.error(`WebSocket connection error for app ${appId}:`, err) - }) - } -} - -// Singleton instance -export const webSocketClient = new WebSocketClient() - -// Online users API -export const fetchAppsOnlineUsers = async (appIds: string[]) => { - const response = await fetch(`/api/online-users?${new URLSearchParams({ - app_ids: appIds.join(','), - })}`) - return response.json() -} diff --git a/web/app/components/workflow/collaboration/core/websocket-manager.ts b/web/app/components/workflow/collaboration/core/websocket-manager.ts new file mode 100644 index 0000000000..b5d63c04d2 --- /dev/null +++ b/web/app/components/workflow/collaboration/core/websocket-manager.ts @@ -0,0 +1,119 @@ +import type { Socket } from 'socket.io-client' +import { io } from 'socket.io-client' +import type { DebugInfo, WebSocketConfig } from '../types/websocket' + +export class WebSocketClient { + private connections: Map = new Map() + private connecting: Set = new Set() + private config: WebSocketConfig + + constructor(config: WebSocketConfig = {}) { + this.config = { + url: config.url || process.env.NEXT_PUBLIC_SOCKET_URL || 'ws://localhost:5001', + transports: config.transports || ['websocket'], + withCredentials: config.withCredentials !== false, + ...config, + } + } + + connect(appId: string): Socket { + const existingSocket = this.connections.get(appId) + if (existingSocket?.connected) + return existingSocket + + if (this.connecting.has(appId)) { + const pendingSocket = this.connections.get(appId) + if (pendingSocket) + return pendingSocket + } + + if (existingSocket && !existingSocket.connected) { + existingSocket.disconnect() + this.connections.delete(appId) + } + + this.connecting.add(appId) + + const authToken = localStorage.getItem('console_token') + const socket = io(this.config.url!, { + path: '/socket.io', + transports: this.config.transports, + auth: { token: authToken }, + withCredentials: this.config.withCredentials, + }) + + this.connections.set(appId, socket) + this.setupBaseEventListeners(socket, appId) + + return socket + } + + disconnect(appId?: string): void { + if (appId) { + const socket = this.connections.get(appId) + if (socket) { + socket.disconnect() + this.connections.delete(appId) + this.connecting.delete(appId) + } + } + else { + this.connections.forEach(socket => socket.disconnect()) + this.connections.clear() + this.connecting.clear() + } + } + + getSocket(appId: string): Socket | null { + return this.connections.get(appId) || null + } + + isConnected(appId: string): boolean { + return this.connections.get(appId)?.connected || false + } + + getConnectedApps(): string[] { + const connectedApps: string[] = [] + this.connections.forEach((socket, appId) => { + if (socket.connected) + connectedApps.push(appId) + }) + return connectedApps + } + + getDebugInfo(): DebugInfo { + const info: DebugInfo = {} + this.connections.forEach((socket, appId) => { + info[appId] = { + connected: socket.connected, + connecting: this.connecting.has(appId), + socketId: socket.id, + } + }) + return info + } + + private setupBaseEventListeners(socket: Socket, appId: string): void { + socket.on('connect', () => { + this.connecting.delete(appId) + socket.emit('user_connect', { workflow_id: appId }) + }) + + socket.on('disconnect', () => { + this.connecting.delete(appId) + }) + + socket.on('connect_error', () => { + this.connecting.delete(appId) + }) + } +} + +export const webSocketClient = new WebSocketClient() + +export const fetchAppsOnlineUsers = async (appIds: string[]) => { + const response = await fetch(`/api/online-users?${new URLSearchParams({ + app_ids: appIds.join(','), + })}`) + return response.json() +} diff --git a/web/app/components/workflow/collaboration/hooks/use-collaboration.ts b/web/app/components/workflow/collaboration/hooks/use-collaboration.ts new file mode 100644 index 0000000000..5924afc0c5 --- /dev/null +++ b/web/app/components/workflow/collaboration/hooks/use-collaboration.ts @@ -0,0 +1,72 @@ +import { useEffect, useRef, useState } from 'react' +import { collaborationManager } from '../core/collaboration-manager' +import { CursorService } from '../services/cursor-service' +import type { CollaborationState } from '../types/collaboration' + +export function useCollaboration(appId: string, reactFlowStore?: any) { + const [state, setState] = useState>({ + isConnected: false, + onlineUsers: [], + cursors: {}, + }) + + const cursorServiceRef = useRef(null) + + useEffect(() => { + if (!appId) return + + if (!cursorServiceRef.current) { + cursorServiceRef.current = new CursorService({ + minMoveDistance: 10, + throttleMs: 300, + }) + } + + const initCollaboration = async () => { + await collaborationManager.connect(appId, reactFlowStore) + setState((prev: any) => ({ ...prev, appId, isConnected: collaborationManager.isConnected() })) + } + + initCollaboration() + + const unsubscribeStateChange = collaborationManager.onStateChange((newState: any) => { + setState((prev: any) => ({ ...prev, ...newState })) + }) + + const unsubscribeCursors = collaborationManager.onCursorUpdate((cursors: any) => { + setState((prev: any) => ({ ...prev, cursors })) + }) + + const unsubscribeUsers = collaborationManager.onOnlineUsersUpdate((users: any) => { + setState((prev: any) => ({ ...prev, onlineUsers: users })) + }) + + return () => { + unsubscribeStateChange() + unsubscribeCursors() + unsubscribeUsers() + cursorServiceRef.current?.stopTracking() + collaborationManager.disconnect() + } + }, [appId, reactFlowStore]) + + const startCursorTracking = (containerRef: React.RefObject) => { + if (cursorServiceRef.current) { + cursorServiceRef.current.startTracking(containerRef, (position) => { + collaborationManager.emitCursorMove(position) + }) + } + } + + const stopCursorTracking = () => { + cursorServiceRef.current?.stopTracking() + } + + return { + isConnected: state.isConnected || false, + onlineUsers: state.onlineUsers || [], + cursors: state.cursors || {}, + startCursorTracking, + stopCursorTracking, + } +} diff --git a/web/app/components/workflow/collaboration/index.ts b/web/app/components/workflow/collaboration/index.ts new file mode 100644 index 0000000000..51cbb1a489 --- /dev/null +++ b/web/app/components/workflow/collaboration/index.ts @@ -0,0 +1,5 @@ +export { collaborationManager } from './core/collaboration-manager' +export { webSocketClient, fetchAppsOnlineUsers } from './core/websocket-manager' +export { CursorService } from './services/cursor-service' +export { useCollaboration } from './hooks/use-collaboration' +export * from './types' diff --git a/web/app/components/workflow/collaboration/manage.ts b/web/app/components/workflow/collaboration/manage.ts deleted file mode 100644 index 9c24e31395..0000000000 --- a/web/app/components/workflow/collaboration/manage.ts +++ /dev/null @@ -1,153 +0,0 @@ -import { LoroDoc } from 'loro-crdt' -import { isEqual } from 'lodash-es' -import { webSocketClient } from './core/websocket-client' -import type { Edge, Node } from '../types' - -class LoroSocketIOProvider { - private doc: LoroDoc - private socket: any - - constructor(socket: any, doc: LoroDoc) { - this.socket = socket - this.doc = doc - this.setupEventListeners() - } - - private setupEventListeners() { - this.doc.subscribe((event: any) => { - if (event.by === 'local') { - const update = this.doc.export({ mode: 'update' }) - this.socket.emit('graph_event', update) - } - }) - - this.socket.on('graph_update', (updateData: Uint8Array) => { - try { - const data = new Uint8Array(updateData) - this.doc.import(data) - } - catch (error) { - console.error('Error importing graph update:', error) - } - }) - } - - destroy() { - this.socket.off('graph_update') - } -} - -class CollaborationManager { - private doc: LoroDoc | null = null - private provider: LoroSocketIOProvider | null = null - private nodesMap: any = null - private edgesMap: any = null - - init(appId: string, reactFlowStore: any) { - const socket = webSocketClient.getClient(appId) - this.doc = new LoroDoc() - this.nodesMap = this.doc.getMap('nodes') - this.edgesMap = this.doc.getMap('edges') - this.provider = new LoroSocketIOProvider(socket, this.doc) - - this.setupSubscriptions(reactFlowStore) - } - - private setupSubscriptions(reactFlowStore: any) { - this.nodesMap?.subscribe((event: any) => { - console.log('nodesMap', event) - if (event.by === 'import') { - requestAnimationFrame(() => { - const { setNodes: reactFlowSetNodes } = reactFlowStore.getState() - const updatedNodes = Array.from(this.nodesMap.values()) - reactFlowSetNodes(updatedNodes) - }) - } - }) - - this.edgesMap?.subscribe((event: any) => { - if (event.by === 'import') { - requestAnimationFrame(() => { - const { setEdges: reactFlowSetEdges } = reactFlowStore.getState() - const updatedEdges = Array.from(this.edgesMap.values()) - reactFlowSetEdges(updatedEdges) - }) - } - }) - } - - getNodes() { - return this.nodesMap ? Array.from(this.nodesMap.values()) : [] - } - - getEdges() { - return this.edgesMap ? Array.from(this.edgesMap.values()) : [] - } - - private getPersistentNodeData = (node: Node) => { - const { data, ...rest } = node - const filteredData = Object.fromEntries( - Object.entries(data).filter(([key]) => !key.startsWith('_')), - ) - return { ...rest, data: filteredData } - } - - setNodes = (oldNodes: Node[], newNodes: Node[]) => { - if (!this.nodesMap || !this.doc) return - - const oldNodesMap = new Map(oldNodes.map(node => [node.id, node])) - const newNodesMap = new Map(newNodes.map(node => [node.id, node])) - - oldNodes.forEach((oldNode) => { - if (!newNodesMap.has(oldNode.id)) - this.nodesMap.delete(oldNode.id) - }) - - newNodes.forEach((newNode) => { - const oldNode = oldNodesMap.get(newNode.id) - if (!oldNode) { - this.nodesMap.set(newNode.id, newNode) - } - else { - const oldPersistentData = this.getPersistentNodeData(oldNode) - const newPersistentData = this.getPersistentNodeData(newNode) - if (!isEqual(oldPersistentData, newPersistentData)) - this.nodesMap.set(newNode.id, newPersistentData) - } - }) - - this.doc.commit() - } - - setEdges = (oldEdges: Edge[], newEdges: Edge[]) => { - if (!this.edgesMap || !this.doc) return - - const oldEdgesMap = new Map(oldEdges.map(edge => [edge.id, edge])) - const newEdgesMap = new Map(newEdges.map(edge => [edge.id, edge])) - - oldEdges.forEach((oldEdge) => { - if (!newEdgesMap.has(oldEdge.id)) - this.edgesMap.delete(oldEdge.id) - }) - - newEdges.forEach((newEdge) => { - const oldEdge = oldEdgesMap.get(newEdge.id) - if (!oldEdge) - this.edgesMap.set(newEdge.id, newEdge) - else if (!isEqual(oldEdge, newEdge)) - this.edgesMap.set(newEdge.id, newEdge) - }) - - this.doc.commit() - } - - destroy() { - this.provider?.destroy() - this.doc = null - this.provider = null - this.nodesMap = null - this.edgesMap = null - } -} - -export const collaborationManager = new CollaborationManager() diff --git a/web/app/components/workflow/collaboration/services/cursor-service.ts b/web/app/components/workflow/collaboration/services/cursor-service.ts new file mode 100644 index 0000000000..f33d595a4a --- /dev/null +++ b/web/app/components/workflow/collaboration/services/cursor-service.ts @@ -0,0 +1,84 @@ +import type { RefObject } from 'react' +import type { CursorPosition } from '../types/collaboration' + +export type CursorServiceConfig = { + minMoveDistance?: number + throttleMs?: number +} + +export class CursorService { + private containerRef: RefObject | null = null + private isTracking = false + private onCursorUpdate: ((cursors: Record) => void) | null = null + private onEmitPosition: ((position: CursorPosition) => void) | null = null + private lastEmitTime = 0 + private lastPosition: { x: number; y: number } | null = null + private config: Required + + constructor(config: CursorServiceConfig = {}) { + this.config = { + minMoveDistance: config.minMoveDistance ?? 5, + throttleMs: config.throttleMs ?? 300, + } + } + + startTracking( + containerRef: RefObject, + onEmitPosition: (position: CursorPosition) => void, + ): void { + if (this.isTracking) this.stopTracking() + + this.containerRef = containerRef + this.onEmitPosition = onEmitPosition + this.isTracking = true + + if (containerRef.current) + containerRef.current.addEventListener('mousemove', this.handleMouseMove) + } + + stopTracking(): void { + if (this.containerRef?.current) + this.containerRef.current.removeEventListener('mousemove', this.handleMouseMove) + + this.containerRef = null + this.onEmitPosition = null + this.isTracking = false + this.lastPosition = null + } + + setCursorUpdateHandler(handler: (cursors: Record) => void): void { + this.onCursorUpdate = handler + } + + updateCursors(cursors: Record): void { + if (this.onCursorUpdate) + this.onCursorUpdate(cursors) + } + + private handleMouseMove = (event: MouseEvent): void => { + if (!this.containerRef?.current || !this.onEmitPosition) return + + const rect = this.containerRef.current.getBoundingClientRect() + const x = event.clientX - rect.left + const y = event.clientY - rect.top + + if (x >= 0 && y >= 0 && x <= rect.width && y <= rect.height) { + const now = Date.now() + const timeThrottled = now - this.lastEmitTime > this.config.throttleMs + const distanceThrottled = !this.lastPosition + || (Math.abs(x - this.lastPosition.x) > this.config.minMoveDistance + || Math.abs(y - this.lastPosition.y) > this.config.minMoveDistance) + + if (timeThrottled && distanceThrottled) { + this.lastPosition = { x, y } + this.lastEmitTime = now + this.onEmitPosition({ + x, + y, + userId: '', + timestamp: now, + }) + } + } + } +} diff --git a/web/app/components/workflow/collaboration/types/collaboration.ts b/web/app/components/workflow/collaboration/types/collaboration.ts new file mode 100644 index 0000000000..7b69897b63 --- /dev/null +++ b/web/app/components/workflow/collaboration/types/collaboration.ts @@ -0,0 +1,43 @@ +import type { Edge, Node } from '../../types' + +export type OnlineUser = { + user_id: string + username: string + avatar: string + sid: string +} + +export type WorkflowOnlineUsers = { + workflow_id: string + users: OnlineUser[] +} + +export type OnlineUserListResponse = { + data: WorkflowOnlineUsers[] +} + +export type CursorPosition = { + x: number + y: number + userId: string + timestamp: number +} + +export type CollaborationState = { + appId: string + isConnected: boolean + onlineUsers: OnlineUser[] + cursors: Record +} + +export type GraphSyncData = { + nodes: Node[] + edges: Edge[] +} + +export type CollaborationUpdate = { + type: 'mouseMove' | 'graphUpdate' | 'userJoin' | 'userLeave' + userId: string + data: any + timestamp: number +} diff --git a/web/app/components/workflow/collaboration/types/events.ts b/web/app/components/workflow/collaboration/types/events.ts new file mode 100644 index 0000000000..e995f9e876 --- /dev/null +++ b/web/app/components/workflow/collaboration/types/events.ts @@ -0,0 +1,38 @@ +export type CollaborationEvent = { + type: string + data: any + timestamp: number +} + +export type GraphUpdateEvent = { + type: 'graph_update' + data: Uint8Array +} & CollaborationEvent + +export type CursorMoveEvent = { + type: 'cursor_move' + data: { + x: number + y: number + userId: string + } +} & CollaborationEvent + +export type UserConnectEvent = { + type: 'user_connect' + data: { + workflow_id: string + } +} & CollaborationEvent + +export type OnlineUsersEvent = { + type: 'online_users' + data: { + users: Array<{ + user_id: string + username: string + avatar: string + sid: string + }> + } +} & CollaborationEvent diff --git a/web/app/components/workflow/collaboration/types/index.ts b/web/app/components/workflow/collaboration/types/index.ts new file mode 100644 index 0000000000..e79ed35da0 --- /dev/null +++ b/web/app/components/workflow/collaboration/types/index.ts @@ -0,0 +1,3 @@ +export * from './websocket' +export * from './collaboration' +export * from './events' diff --git a/web/app/components/workflow/collaboration/types/websocket.ts b/web/app/components/workflow/collaboration/types/websocket.ts new file mode 100644 index 0000000000..37b3a8ad17 --- /dev/null +++ b/web/app/components/workflow/collaboration/types/websocket.ts @@ -0,0 +1,16 @@ +export type WebSocketConfig = { + url?: string + token?: string + transports?: string[] + withCredentials?: boolean +} + +export type ConnectionInfo = { + connected: boolean + connecting: boolean + socketId?: string +} + +export type DebugInfo = { + [appId: string]: ConnectionInfo +} diff --git a/web/app/components/workflow/features.tsx b/web/app/components/workflow/features.tsx index 9b937ec987..f712fb9887 100644 --- a/web/app/components/workflow/features.tsx +++ b/web/app/components/workflow/features.tsx @@ -14,7 +14,7 @@ import useConfig from './nodes/start/use-config' import type { StartNodeType } from './nodes/start/types' import type { PromptVariable } from '@/models/debug' import NewFeaturePanel from '@/app/components/base/features/new-feature-panel' -import { webSocketClient } from '@/app/components/workflow/collaboration/core/websocket-client' +import { webSocketClient } from '@/app/components/workflow/collaboration/core/websocket-manager' const Features = () => { const setShowFeaturesPanel = useStore(s => s.setShowFeaturesPanel) diff --git a/web/app/components/workflow/hooks/use-collaborative-workflow.ts b/web/app/components/workflow/hooks/use-collaborative-workflow.ts index ddf7c08f31..39ad3ccf33 100644 --- a/web/app/components/workflow/hooks/use-collaborative-workflow.ts +++ b/web/app/components/workflow/hooks/use-collaborative-workflow.ts @@ -1,7 +1,7 @@ import { useCallback } from 'react' import { useStoreApi } from 'reactflow' import type { Edge, Node } from '../types' -import { collaborationManager } from '../collaboration/manage' +import { collaborationManager } from '../collaboration/core/collaboration-manager' export const useCollaborativeWorkflow = () => { const store = useStoreApi() diff --git a/web/app/components/workflow/panel/chat-variable-panel/index.tsx b/web/app/components/workflow/panel/chat-variable-panel/index.tsx index 20d74e9ff3..e87db1eafe 100644 --- a/web/app/components/workflow/panel/chat-variable-panel/index.tsx +++ b/web/app/components/workflow/panel/chat-variable-panel/index.tsx @@ -21,7 +21,7 @@ import type { import { findUsedVarNodes, updateNodeVars } from '@/app/components/workflow/nodes/_base/components/variable/utils' import { useNodesSyncDraft } from '@/app/components/workflow/hooks/use-nodes-sync-draft' import { BlockEnum } from '@/app/components/workflow/types' -import { webSocketClient } from '@/app/components/workflow/collaboration/core/websocket-client' +import { webSocketClient } from '@/app/components/workflow/collaboration/core/websocket-manager' import { useDocLink } from '@/context/i18n' import cn from '@/utils/classnames' import useInspectVarsCrud from '../../hooks/use-inspect-vars-crud' diff --git a/web/app/components/workflow/panel/env-panel/index.tsx b/web/app/components/workflow/panel/env-panel/index.tsx index 8a514f3251..052a80bd59 100644 --- a/web/app/components/workflow/panel/env-panel/index.tsx +++ b/web/app/components/workflow/panel/env-panel/index.tsx @@ -18,7 +18,7 @@ import { findUsedVarNodes, updateNodeVars } from '@/app/components/workflow/node import RemoveEffectVarConfirm from '@/app/components/workflow/nodes/_base/components/remove-effect-var-confirm' import cn from '@/utils/classnames' import { useNodesSyncDraft } from '@/app/components/workflow/hooks/use-nodes-sync-draft' -import { webSocketClient } from '@/app/components/workflow/collaboration/core/websocket-client' +import { webSocketClient } from '@/app/components/workflow/collaboration/core/websocket-manager' import { useStore as useWorkflowStore } from '@/app/components/workflow/store' const EnvPanel = () => {