refactor to support mutli websocket connections

This commit is contained in:
hjlarry 2025-08-06 17:05:39 +08:00
parent 2ecf9f6ddf
commit 3f3b37b843
10 changed files with 357 additions and 213 deletions

View File

@ -26,7 +26,7 @@ import Loading from '@/app/components/base/loading'
import useBreakpoints, { MediaType } from '@/hooks/use-breakpoints'
import type { App } from '@/types/app'
import useDocumentTitle from '@/hooks/use-document-title'
import { connectOnlineUserWebSocket, disconnectOnlineUserWebSocket } from '@/service/demo/online-user'
import { webSocketClient } from '@/app/components/workflow/collaboration/core/websocket-client'
export type IAppDetailLayoutProps = {
children: React.ReactNode
@ -115,8 +115,9 @@ const AppDetailLayout: FC<IAppDetailLayoutProps> = (props) => {
setIsLoadingAppDetail(true)
fetchAppDetail({ url: '/apps', id: appId }).then((res) => {
setAppDetailRes(res)
if (res.mode === 'workflow' || res.mode === 'advanced-chat')
connectOnlineUserWebSocket(appId)
// Only connect for workflow/advanced-chat apps and if not already connected
if ((res.mode === 'workflow' || res.mode === 'advanced-chat') && !webSocketClient.isConnected(appId))
webSocketClient.getClient(appId)
}).catch((e: any) => {
if (e.status === 404)
router.replace('/apps')
@ -151,7 +152,7 @@ const AppDetailLayout: FC<IAppDetailLayoutProps> = (props) => {
useUnmount(() => {
setAppDetail()
disconnectOnlineUserWebSocket()
webSocketClient.disconnect(appId)
})
if (!appDetail) {

View File

@ -22,9 +22,9 @@ import {
useWorkflowStartRun,
} from '../hooks'
import { useStore, useWorkflowStore } from '@/app/components/workflow/store'
import { useWebSocketStore } from '@/app/components/workflow/store/websocket-store'
import { webSocketClient } from '@/app/components/workflow/collaboration/core/websocket-client'
import { useCollaborativeCursors } from '../hooks'
import type { OnlineUser } from '@/service/demo/online-user'
import type { OnlineUser } from '@/app/components/workflow/collaboration/core/websocket-client'
import { collaborationManager } from '@/app/components/workflow/collaboration/manage'
import { fetchWorkflowDraft } from '@/service/workflow'
import { useStoreApi } from 'reactflow'
@ -48,7 +48,10 @@ const WorkflowMain = ({
collaborationManager.init(appId, store)
}, [appId, store])
const { emit, getSocket, on } = useWebSocketStore()
// Get the socket for current app
const wsClient = useMemo(() => {
return appId ? webSocketClient.getClient(appId) : null
}, [appId])
const handleWorkflowDataUpdate = useCallback((payload: any) => {
const {
@ -99,24 +102,21 @@ const WorkflowMain = ({
useEffect(() => {
if (!appId) return
const unsubscribeConversationVarsUpdate = on('varsAndFeaturesUpdate', async () => {
try {
const response = await fetchWorkflowDraft(`/apps/${appId}/workflows/draft`)
handleWorkflowDataUpdate(response)
}
catch (error) {
console.error('workflow vars and features update failed:', error)
wsClient?.on('collaboration_update', async (update: any) => {
if (update.type === 'varsAndFeaturesUpdate') {
try {
const response = await fetchWorkflowDraft(`/apps/${appId}/workflows/draft`)
handleWorkflowDataUpdate(response)
}
catch (error) {
console.error('workflow vars and features update failed:', error)
}
}
})
return () => {
unsubscribeConversationVarsUpdate()
}
}, [appId, on])
}, [appId, wsClient])
const handleMouseMove = useCallback((event: MouseEvent) => {
if (!containerRef.current) return
if (!containerRef.current || !wsClient?.connected) return
const rect = containerRef.current.getBoundingClientRect()
const x = event.clientX - rect.left
@ -131,17 +131,24 @@ const WorkflowMain = ({
lastEmitTimeRef.current = now
lastPositionRef.current = { x, y }
emit('mouseMove', {
x,
y,
})
const eventData = {
type: 'mouseMove',
data: { x, y },
timestamp: now,
}
wsClient.emit('collaboration_event', eventData)
// Debug log
if (process.env.NODE_ENV === 'development')
console.log('Mouse move emitted:', eventData)
}
else {
// Update position for potential future emit
lastPositionRef.current = { x, y }
}
}
}, [emit])
}, [wsClient])
useEffect(() => {
const container = containerRef.current
@ -176,8 +183,7 @@ const WorkflowMain = ({
const [onlineUsers, setOnlineUsers] = useState<Record<string, OnlineUser>>({})
useEffect(() => {
if (!appId) return
const socket = getSocket(appId)
if (!appId || !wsClient) return
const handleOnlineUsersUpdate = (data: { users: OnlineUser[] }) => {
const usersMap = data.users.reduce((acc, user) => {
@ -186,11 +192,13 @@ const WorkflowMain = ({
}, {} as Record<string, OnlineUser>)
setOnlineUsers(usersMap)
}
socket.on('online_users', handleOnlineUsersUpdate)
wsClient.on('online_users', handleOnlineUsersUpdate)
return () => {
socket.off('online_users', handleOnlineUsersUpdate)
wsClient.off('online_users', handleOnlineUsersUpdate)
}
}, [appId])
}, [appId, wsClient])
const { fetchInspectVars } = useSetWorkflowVarsWithValue({
flowId: appId,

View File

@ -1,22 +1,23 @@
import { useEffect, useState } from 'react'
import { useWebSocketStore } from '@/app/components/workflow/store/websocket-store'
import { webSocketClient } from '@/app/components/workflow/collaboration/core/websocket-client'
export function useCollaborativeCursors(appId: string) {
const [cursors, setCursors] = useState<Record<string, any>>({})
const [myUserId, setMyUserId] = useState<string | null>(null)
const { getSocket, on } = useWebSocketStore()
useEffect(() => {
if (!appId) return
const socket = getSocket(appId)
// Get existing socket or create new one
const wsClient = webSocketClient.getClient(appId)
const handleConnect = () => {
setMyUserId(socket.id || 'unknown')
setMyUserId(wsClient.id || 'unknown')
}
const unsubscribeMouseMove = on('mouseMove', (update: any) => {
if (update.userId !== myUserId) {
// Listen to collaboration events for this specific app
const unsubscribeMouseMove = wsClient.on('collaboration_update', (update: any) => {
if (update.type === 'mouseMove' && update.userId !== myUserId) {
setCursors(prev => ({
...prev,
[update.userId]: {
@ -29,16 +30,16 @@ export function useCollaborativeCursors(appId: string) {
}
})
if (socket.connected)
if (wsClient.connected)
handleConnect()
else
socket.on('connect', handleConnect)
wsClient.on('connect', handleConnect)
return () => {
unsubscribeMouseMove()
socket.off('connect', handleConnect)
wsClient.off('connect', handleConnect)
}
}, [appId, getSocket, on, myUserId])
}, [appId])
return { cursors, myUserId }
}

View File

@ -0,0 +1,253 @@
import type { Socket } from 'socket.io-client'
import { io } from 'socket.io-client'
export type WebSocketConfig = {
url?: string
token?: string
transports?: string[]
withCredentials?: boolean
}
export type OnlineUser = {
user_id: string
username: string
avatar: string
sid: string
}
export type WorkflowOnlineUsers = {
workflow_id: string
users: OnlineUser[]
}
export type OnlineUserListResponse = {
data: WorkflowOnlineUsers[]
}
/**
* App-specific WebSocket client
* Provides a clean API for a specific app's WebSocket operations
*/
export class AppWebSocketClient {
constructor(
private appId: string,
private socket: Socket,
private manager: WebSocketClient,
) {
// Initialize app-specific WebSocket client
}
/**
* Listen to events
*/
on(event: string, handler: (...args: any[]) => void): () => void {
this.socket.on(event, handler)
return () => this.socket.off(event, handler)
}
/**
* Remove event listener
*/
off(event: string, handler?: (...args: any[]) => void): void {
this.socket.off(event, handler)
}
/**
* Emit event
*/
emit(event: string, ...args: any[]): void {
if (this.socket.connected)
this.socket.emit(event, ...args)
}
/**
* Check connection status
*/
get connected(): boolean {
return this.socket.connected
}
/**
* Get socket ID
*/
get id(): string | undefined {
return this.socket.id
}
/**
* Disconnect this specific app
*/
disconnect(): void {
this.manager.disconnect(this.appId)
}
/**
* Get the underlying socket (for advanced usage)
*/
getSocket(): Socket {
return this.socket
}
}
/**
* Multi-connection WebSocket manager
* Supports multiple concurrent connections for different apps
*/
export class WebSocketClient {
private connections: Map<string, Socket> = new Map()
private connecting: Set<string> = new Set()
private config: WebSocketConfig
constructor(config: WebSocketConfig = {}) {
this.config = {
url: config.url || process.env.NEXT_PUBLIC_SOCKET_URL || 'ws://localhost:5001',
transports: config.transports || ['websocket'],
withCredentials: config.withCredentials !== false,
...config,
}
}
/**
* Get or create app-specific WebSocket client
*/
getClient(appId: string): AppWebSocketClient {
let socket = this.connections.get(appId)
if (!socket || !socket.connected)
socket = this.connect(appId)
return new AppWebSocketClient(appId, socket, this)
}
/**
* Connect to WebSocket server for specific app
*/
private connect(appId: string): Socket {
// Return existing connection if available and connected
const existingSocket = this.connections.get(appId)
if (existingSocket?.connected)
return existingSocket
// If already connecting, return the pending socket
if (this.connecting.has(appId)) {
const pendingSocket = this.connections.get(appId)
if (pendingSocket)
return pendingSocket
}
// Clean up disconnected socket
if (existingSocket && !existingSocket.connected) {
existingSocket.disconnect()
this.connections.delete(appId)
}
// Mark as connecting to prevent duplicate connections
this.connecting.add(appId)
const authToken = localStorage.getItem('console_token')
const socket = io(this.config.url!, {
path: '/socket.io',
transports: this.config.transports,
auth: { token: authToken },
withCredentials: this.config.withCredentials,
})
this.connections.set(appId, socket)
this.setupBaseEventListeners(socket, appId)
return socket
}
/**
* Disconnect from specific app or all connections
*/
disconnect(appId?: string): void {
if (appId) {
const socket = this.connections.get(appId)
if (socket) {
socket.disconnect()
this.connections.delete(appId)
this.connecting.delete(appId)
}
}
else {
// Disconnect all connections
this.connections.forEach(socket => socket.disconnect())
this.connections.clear()
this.connecting.clear()
}
}
/**
* Get socket instance for specific app (for backward compatibility)
*/
getSocket(appId: string): Socket | null {
return this.connections.get(appId) || null
}
/**
* Check connection status for specific app
*/
isConnected(appId: string): boolean {
return this.connections.get(appId)?.connected || false
}
/**
* Get all connected app IDs
*/
getConnectedApps(): string[] {
const connectedApps: string[] = []
this.connections.forEach((socket, appId) => {
if (socket.connected)
connectedApps.push(appId)
})
return connectedApps
}
/**
* Debug method: get connection status for all apps
*/
getDebugInfo(): Record<string, { connected: boolean; connecting: boolean; socketId?: string }> {
const info: Record<string, { connected: boolean; connecting: boolean; socketId?: string }> = {}
this.connections.forEach((socket, appId) => {
info[appId] = {
connected: socket.connected,
connecting: this.connecting.has(appId),
socketId: socket.id,
}
})
return info
}
private setupBaseEventListeners(socket: Socket, appId: string): void {
socket.on('connect', () => {
this.connecting.delete(appId)
socket.emit('user_connect', { workflow_id: appId })
console.log(`WebSocket connected for app: ${appId}`)
})
socket.on('disconnect', () => {
this.connecting.delete(appId)
console.log(`WebSocket disconnected for app: ${appId}`)
})
socket.on('connect_error', (err) => {
this.connecting.delete(appId)
console.error(`WebSocket connection error for app ${appId}:`, err)
})
}
}
// Singleton instance
export const webSocketClient = new WebSocketClient()
// Online users API
export const fetchAppsOnlineUsers = async (appIds: string[]) => {
const response = await fetch(`/api/online-users?${new URLSearchParams({
app_ids: appIds.join(','),
})}`)
return response.json()
}

View File

@ -1,13 +1,13 @@
import { LoroDoc } from 'loro-crdt'
import { isEqual } from 'lodash-es'
import { type WebSocketInstance, useWebSocketStore } from '../store/websocket-store'
import { webSocketClient } from './core/websocket-client'
import type { Edge, Node } from '../types'
class LoroSocketIOProvider {
private doc: LoroDoc
private socket: WebSocketInstance
private socket: any
constructor(socket: WebSocketInstance, doc: LoroDoc) {
constructor(socket: any, doc: LoroDoc) {
this.socket = socket
this.doc = doc
this.setupEventListeners()
@ -44,8 +44,7 @@ class CollaborationManager {
private edgesMap: any = null
init(appId: string, reactFlowStore: any) {
const { getSocket } = useWebSocketStore.getState()
const socket = getSocket(appId)
const socket = webSocketClient.getClient(appId)
this.doc = new LoroDoc()
this.nodesMap = this.doc.getMap('nodes')
this.edgesMap = this.doc.getMap('edges')

View File

@ -14,15 +14,15 @@ import useConfig from './nodes/start/use-config'
import type { StartNodeType } from './nodes/start/types'
import type { PromptVariable } from '@/models/debug'
import NewFeaturePanel from '@/app/components/base/features/new-feature-panel'
import { useWebSocketStore } from '@/app/components/workflow/store/websocket-store'
import { webSocketClient } from '@/app/components/workflow/collaboration/core/websocket-client'
const Features = () => {
const setShowFeaturesPanel = useStore(s => s.setShowFeaturesPanel)
const appId = useStore(s => s.appId)
const isChatMode = useIsChatMode()
const { nodesReadOnly } = useNodesReadOnly()
const { doSyncWorkflowDraft } = useNodesSyncDraft()
const nodes = useNodes<CommonNodeType>()
const { emit } = useWebSocketStore()
const startNode = nodes.find(node => node.data.type === 'start')
const { id, data } = startNode as Node<StartNodeType>
const { handleAddVariable } = useConfig(id, data)
@ -43,11 +43,18 @@ const Features = () => {
const handleFeaturesChange = useCallback(() => {
doSyncWorkflowDraft(false, {
onSuccess() {
emit('varsAndFeaturesUpdate')
if (appId) {
const socket = webSocketClient.getSocket(appId)
if (socket) {
socket.emit('collaboration_event', {
type: 'varsAndFeaturesUpdate',
})
}
}
},
})
setShowFeaturesPanel(true)
}, [doSyncWorkflowDraft, setShowFeaturesPanel])
}, [doSyncWorkflowDraft, setShowFeaturesPanel, appId])
return (
<NewFeaturePanel

View File

@ -21,7 +21,7 @@ import type {
import { findUsedVarNodes, updateNodeVars } from '@/app/components/workflow/nodes/_base/components/variable/utils'
import { useNodesSyncDraft } from '@/app/components/workflow/hooks/use-nodes-sync-draft'
import { BlockEnum } from '@/app/components/workflow/types'
import { useWebSocketStore } from '@/app/components/workflow/store/websocket-store'
import { webSocketClient } from '@/app/components/workflow/collaboration/core/websocket-client'
import { useDocLink } from '@/context/i18n'
import cn from '@/utils/classnames'
import useInspectVarsCrud from '../../hooks/use-inspect-vars-crud'
@ -33,19 +33,26 @@ const ChatVariablePanel = () => {
const setShowChatVariablePanel = useStore(s => s.setShowChatVariablePanel)
const varList = useStore(s => s.conversationVariables) as ConversationVariable[]
const updateChatVarList = useStore(s => s.setConversationVariables)
const appId = useStore(s => s.appId)
const { doSyncWorkflowDraft } = useNodesSyncDraft()
const {
invalidateConversationVarValues,
} = useInspectVarsCrud()
const { emit } = useWebSocketStore()
const handleVarChanged = useCallback(() => {
doSyncWorkflowDraft(false, {
onSuccess() {
invalidateConversationVarValues()
emit('varsAndFeaturesUpdate')
if (appId) {
const socket = webSocketClient.getSocket(appId)
if (socket) {
socket.emit('collaboration_event', {
type: 'varsAndFeaturesUpdate',
})
}
}
},
})
}, [doSyncWorkflowDraft, invalidateConversationVarValues])
}, [doSyncWorkflowDraft, invalidateConversationVarValues, appId])
const [showTip, setShowTip] = useState(true)
const [showVariableModal, setShowVariableModal] = useState(false)

View File

@ -18,7 +18,8 @@ import { findUsedVarNodes, updateNodeVars } from '@/app/components/workflow/node
import RemoveEffectVarConfirm from '@/app/components/workflow/nodes/_base/components/remove-effect-var-confirm'
import cn from '@/utils/classnames'
import { useNodesSyncDraft } from '@/app/components/workflow/hooks/use-nodes-sync-draft'
import { useWebSocketStore } from '@/app/components/workflow/store/websocket-store'
import { webSocketClient } from '@/app/components/workflow/collaboration/core/websocket-client'
import { useStore as useWorkflowStore } from '@/app/components/workflow/store'
const EnvPanel = () => {
const { t } = useTranslation()
@ -29,7 +30,7 @@ const EnvPanel = () => {
const updateEnvList = useStore(s => s.setEnvironmentVariables)
const setEnvSecrets = useStore(s => s.setEnvSecrets)
const { doSyncWorkflowDraft } = useNodesSyncDraft()
const { emit } = useWebSocketStore()
const appId = useWorkflowStore(s => s.appId)
const [showVariableModal, setShowVariableModal] = useState(false)
const [currentVar, setCurrentVar] = useState<EnvironmentVariable>()
@ -73,13 +74,22 @@ const EnvPanel = () => {
setCacheForDelete(undefined)
setShowRemoveConfirm(false)
await doSyncWorkflowDraft()
emit('varsAndFeaturesUpdate')
// Emit update event to other connected clients
const socket = webSocketClient.getSocket(appId)
if (socket?.connected) {
socket.emit('collaboration_event', {
type: 'varsAndFeaturesUpdate',
timestamp: Date.now(),
})
}
if (env.value_type === 'secret') {
const newMap = { ...envSecrets }
delete newMap[env.id]
setEnvSecrets(newMap)
}
}, [doSyncWorkflowDraft, envList, envSecrets, removeUsedVarInNodes, setEnvSecrets, updateEnvList])
}, [doSyncWorkflowDraft, envList, envSecrets, removeUsedVarInNodes, setEnvSecrets, updateEnvList, appId])
const deleteCheck = useCallback((env: EnvironmentVariable) => {
const effectedNodes = getEffectedNodes(env)
@ -105,7 +115,12 @@ const EnvPanel = () => {
const newList = [env, ...envList]
updateEnvList(newList)
await doSyncWorkflowDraft()
emit('varsAndFeaturesUpdate')
const socket = webSocketClient.getSocket(appId)
if (socket) {
socket.emit('collaboration_event', {
type: 'varsAndFeaturesUpdate',
})
}
updateEnvList(newList.map(e => (e.id === env.id && env.value_type === 'secret') ? { ...e, value: '[__HIDDEN__]' } : e))
return
}
@ -147,9 +162,14 @@ const EnvPanel = () => {
setNodes(newNodes)
}
await doSyncWorkflowDraft()
emit('varsAndFeaturesUpdate')
const socket = webSocketClient.getSocket(appId)
if (socket) {
socket.emit('collaboration_event', {
type: 'varsAndFeaturesUpdate',
})
}
updateEnvList(newList.map(e => (e.id === env.id && env.value_type === 'secret') ? { ...e, value: '[__HIDDEN__]' } : e))
}, [currentVar, doSyncWorkflowDraft, envList, envSecrets, getEffectedNodes, setEnvSecrets, store, updateEnvList])
}, [currentVar, doSyncWorkflowDraft, envList, envSecrets, getEffectedNodes, setEnvSecrets, store, updateEnvList, appId])
return (
<div

View File

@ -1,78 +0,0 @@
import { create } from 'zustand'
import { connectOnlineUserWebSocket } from '@/service/demo/online-user'
export type WebSocketInstance = ReturnType<typeof connectOnlineUserWebSocket>
type WebSocketStore = {
socket: WebSocketInstance | null
listeners: Map<string, Set<(data: any) => void>>
isConnected: () => boolean
getSocket: (appId: string) => WebSocketInstance
emit: (eventType: string, data?: any) => void
on: (eventType: string, handler: (data?: any) => void) => () => void
}
export const useWebSocketStore = create<WebSocketStore>((set, get) => ({
socket: null,
listeners: new Map(),
isConnected: () => {
const { socket } = get()
return socket?.connected || false
},
getSocket: (appId: string) => {
let { socket } = get()
if (!socket) {
socket = connectOnlineUserWebSocket(appId)
socket.on('collaboration_update', (update) => {
const { listeners } = get()
const eventListeners = listeners.get(update.type)
if (eventListeners) {
eventListeners.forEach((handler) => {
try {
handler(update)
}
catch (error) {
console.error(`Error in collaboration event handler for ${update.type}:`, error)
}
})
}
})
set({ socket })
}
return socket
},
emit: (eventType: string, data?: any) => {
const { socket } = get()
if (socket?.connected) {
socket.emit('collaboration_event', {
type: eventType,
data,
timestamp: Date.now(),
})
}
},
on: (eventType: string, handler: (data: any) => void) => {
const { listeners } = get()
if (!listeners.has(eventType))
listeners.set(eventType, new Set())
listeners.get(eventType)!.add(handler)
return () => {
const currentListeners = get().listeners.get(eventType)
if (currentListeners) {
currentListeners.delete(handler)
if (currentListeners.size === 0)
get().listeners.delete(eventType)
}
}
},
}))

View File

@ -1,74 +0,0 @@
'use client'
import { get } from '../base'
import type { Socket } from 'socket.io-client'
import { io } from 'socket.io-client'
let socket: Socket | null = null
let lastAppId: string | null = null
export function connectOnlineUserWebSocket(appId: string): Socket {
if (socket && lastAppId === appId)
return socket
if (socket)
socket.disconnect()
const url = process.env.NEXT_PUBLIC_SOCKET_URL || 'ws://localhost:5001'
const token = localStorage.getItem('console_token')
socket = io(url, {
path: '/socket.io',
transports: ['websocket'],
auth: { token },
withCredentials: true,
})
lastAppId = appId
socket.on('connect', () => {
socket?.emit('user_connect', { workflow_id: appId })
console.log('WebSocket connected')
})
socket.on('disconnect', () => {
console.log('WebSocket disconnected')
})
socket.on('connect_error', (err) => {
console.error('WebSocket connection error:', err)
})
return socket
}
export function disconnectOnlineUserWebSocket() {
if (socket) {
socket.disconnect()
socket = null
}
}
export const getOnlineUsersSocket = (): Socket | null => {
return socket
}
export type OnlineUser = {
user_id: string
username: string
avatar: string
sid: string
}
export type WorkflowOnlineUsers = {
workflow_id: string
users: OnlineUser[]
}
export type OnlineUserListResponse = {
data: WorkflowOnlineUsers[]
}
export const fetchAppsOnlineUsers = (appIds: string[]) => {
return get<OnlineUserListResponse>('/online-users', {
params: { app_ids: appIds.join(',') },
})
}