refactor all the frontend code

This commit is contained in:
hjlarry 2025-08-07 10:58:53 +08:00
parent 3f3b37b843
commit e43b46786d
21 changed files with 733 additions and 541 deletions

View File

@ -26,7 +26,7 @@ import Loading from '@/app/components/base/loading'
import useBreakpoints, { MediaType } from '@/hooks/use-breakpoints'
import type { App } from '@/types/app'
import useDocumentTitle from '@/hooks/use-document-title'
import { webSocketClient } from '@/app/components/workflow/collaboration/core/websocket-client'
import { webSocketClient } from '@/app/components/workflow/collaboration/core/websocket-manager'
export type IAppDetailLayoutProps = {
children: React.ReactNode
@ -117,7 +117,7 @@ const AppDetailLayout: FC<IAppDetailLayoutProps> = (props) => {
setAppDetailRes(res)
// Only connect for workflow/advanced-chat apps and if not already connected
if ((res.mode === 'workflow' || res.mode === 'advanced-chat') && !webSocketClient.isConnected(appId))
webSocketClient.getClient(appId)
webSocketClient.connect(appId)
}).catch((e: any) => {
if (e.status === 404)
router.replace('/apps')

View File

@ -3,7 +3,6 @@ import {
useEffect,
useMemo,
useRef,
useState,
} from 'react'
import { useFeaturesStore } from '@/app/components/base/features/hooks'
import type { Features as FeaturesData } from '@/app/components/base/features/types'
@ -22,10 +21,9 @@ import {
useWorkflowStartRun,
} from '../hooks'
import { useStore, useWorkflowStore } from '@/app/components/workflow/store'
import { webSocketClient } from '@/app/components/workflow/collaboration/core/websocket-client'
import { useCollaborativeCursors } from '../hooks'
import type { OnlineUser } from '@/app/components/workflow/collaboration/core/websocket-client'
import { collaborationManager } from '@/app/components/workflow/collaboration/manage'
import { useCollaboration } from '@/app/components/workflow/collaboration'
import { collaborationManager } from '@/app/components/workflow/collaboration'
import { fetchWorkflowDraft } from '@/service/workflow'
import { useStoreApi } from 'reactflow'
@ -39,19 +37,18 @@ const WorkflowMain = ({
const workflowStore = useWorkflowStore()
const appId = useStore(s => s.appId)
const containerRef = useRef<HTMLDivElement>(null)
const lastEmitTimeRef = useRef<number>(0)
const lastPositionRef = useRef<{ x: number; y: number } | null>(null)
const store = useStoreApi()
const { startCursorTracking, stopCursorTracking, onlineUsers } = useCollaboration(appId, store)
useEffect(() => {
collaborationManager.init(appId, store)
}, [appId, store])
if (containerRef.current)
startCursorTracking(containerRef as React.RefObject<HTMLElement>)
// Get the socket for current app
const wsClient = useMemo(() => {
return appId ? webSocketClient.getClient(appId) : null
}, [appId])
return () => {
stopCursorTracking()
}
}, [startCursorTracking, stopCursorTracking])
const handleWorkflowDataUpdate = useCallback((payload: any) => {
const {
@ -102,64 +99,19 @@ const WorkflowMain = ({
useEffect(() => {
if (!appId) return
wsClient?.on('collaboration_update', async (update: any) => {
if (update.type === 'varsAndFeaturesUpdate') {
try {
const response = await fetchWorkflowDraft(`/apps/${appId}/workflows/draft`)
handleWorkflowDataUpdate(response)
}
catch (error) {
console.error('workflow vars and features update failed:', error)
}
const unsubscribe = collaborationManager.onVarsAndFeaturesUpdate(async (update: any) => {
try {
const response = await fetchWorkflowDraft(`/apps/${appId}/workflows/draft`)
handleWorkflowDataUpdate(response)
}
catch (error) {
console.error('workflow vars and features update failed:', error)
}
})
}, [appId, wsClient])
const handleMouseMove = useCallback((event: MouseEvent) => {
if (!containerRef.current || !wsClient?.connected) return
const rect = containerRef.current.getBoundingClientRect()
const x = event.clientX - rect.left
const y = event.clientY - rect.top
// Only emit if mouse is within the container
if (x >= 0 && y >= 0 && x <= rect.width && y <= rect.height) {
const now = Date.now()
const timeSinceLastEmit = now - lastEmitTimeRef.current
if (timeSinceLastEmit >= 300) {
lastEmitTimeRef.current = now
lastPositionRef.current = { x, y }
const eventData = {
type: 'mouseMove',
data: { x, y },
timestamp: now,
}
wsClient.emit('collaboration_event', eventData)
// Debug log
if (process.env.NODE_ENV === 'development')
console.log('Mouse move emitted:', eventData)
}
else {
// Update position for potential future emit
lastPositionRef.current = { x, y }
}
}
}, [wsClient])
useEffect(() => {
const container = containerRef.current
if (!container) return
container.addEventListener('mousemove', handleMouseMove)
return () => {
container.removeEventListener('mousemove', handleMouseMove)
}
}, [handleMouseMove])
return unsubscribe
}, [appId, handleWorkflowDataUpdate])
const {
doSyncWorkflowDraft,
@ -180,25 +132,6 @@ const WorkflowMain = ({
} = useWorkflowStartRun()
const { cursors, myUserId } = useCollaborativeCursors(appId)
const [onlineUsers, setOnlineUsers] = useState<Record<string, OnlineUser>>({})
useEffect(() => {
if (!appId || !wsClient) return
const handleOnlineUsersUpdate = (data: { users: OnlineUser[] }) => {
const usersMap = data.users.reduce((acc, user) => {
acc[user.user_id] = user
return acc
}, {} as Record<string, OnlineUser>)
setOnlineUsers(usersMap)
}
wsClient.on('online_users', handleOnlineUsersUpdate)
return () => {
wsClient.off('online_users', handleOnlineUsersUpdate)
}
}, [appId, wsClient])
const { fetchInspectVars } = useSetWorkflowVarsWithValue({
flowId: appId,
@ -302,7 +235,7 @@ const WorkflowMain = ({
if (userId === myUserId)
return null
const userInfo = onlineUsers[userId]
const userInfo = onlineUsers.find(user => user.user_id === userId)
const userName = userInfo?.username || `User ${userId.slice(-4)}`
const getUserColor = (id: string) => {

View File

@ -1,45 +1,18 @@
import { useEffect, useState } from 'react'
import { webSocketClient } from '@/app/components/workflow/collaboration/core/websocket-client'
import { useCollaboration } from '@/app/components/workflow/collaboration'
export function useCollaborativeCursors(appId: string) {
const [cursors, setCursors] = useState<Record<string, any>>({})
const { cursors, isConnected } = useCollaboration(appId)
const [myUserId, setMyUserId] = useState<string | null>(null)
useEffect(() => {
if (!appId) return
if (isConnected)
setMyUserId('current-user')
}, [isConnected])
// Get existing socket or create new one
const wsClient = webSocketClient.getClient(appId)
const filteredCursors = Object.fromEntries(
Object.entries(cursors).filter(([userId]) => userId !== myUserId),
)
const handleConnect = () => {
setMyUserId(wsClient.id || 'unknown')
}
// Listen to collaboration events for this specific app
const unsubscribeMouseMove = wsClient.on('collaboration_update', (update: any) => {
if (update.type === 'mouseMove' && update.userId !== myUserId) {
setCursors(prev => ({
...prev,
[update.userId]: {
x: update.data.x,
y: update.data.y,
userId: update.userId,
timestamp: update.timestamp,
},
}))
}
})
if (wsClient.connected)
handleConnect()
else
wsClient.on('connect', handleConnect)
return () => {
unsubscribeMouseMove()
wsClient.off('connect', handleConnect)
}
}, [appId])
return { cursors, myUserId }
return { cursors: filteredCursors, myUserId }
}

View File

@ -1,4 +1,5 @@
import {
useEffect,
useMemo,
} from 'react'
import useSWR from 'swr'
@ -23,22 +24,31 @@ import {
} from '@/app/components/workflow/context'
import { createWorkflowSlice } from './store/workflow/workflow-slice'
import WorkflowAppMain from './components/workflow-main'
import { collaborationManager } from '@/app/components/workflow/collaboration/manage'
import { collaborationManager } from '@/app/components/workflow/collaboration/core/collaboration-manager'
import { useStore } from '@/app/components/workflow/store'
const WorkflowAppWithAdditionalContext = () => {
const {
data,
isLoading,
} = useWorkflowInit()
const { setNodes, setEdges } = collaborationManager
const appId = useStore(s => s.appId)
const { data: fileUploadConfigResponse } = useSWR({ url: '/files/upload' }, fetchFileUploadConfig)
useEffect(() => {
if (appId && data)
collaborationManager.init(appId, null)
return () => {
collaborationManager.destroy()
}
}, [appId, data])
const nodesData = useMemo(() => {
if (data) {
const processedNodes = initialNodes(data.graph.nodes, data.graph.edges)
setNodes([], processedNodes)
collaborationManager.setNodes([], processedNodes)
return processedNodes
}
return []
@ -47,8 +57,7 @@ const WorkflowAppWithAdditionalContext = () => {
const edgesData = useMemo(() => {
if (data) {
const processedEdges = initialEdges(data.graph.edges, data.graph.nodes)
setEdges([], processedEdges)
collaborationManager.setEdges([], processedEdges)
return processedEdges
}
return []

View File

@ -0,0 +1,218 @@
import { LoroDoc } from 'loro-crdt'
import { isEqual } from 'lodash-es'
import { webSocketClient } from './websocket-manager'
import { CRDTProvider } from './crdt-provider'
import { EventEmitter } from './event-emitter'
import type { Edge, Node } from '../../types'
import type { CollaborationState, CursorPosition, OnlineUser } from '../types/collaboration'
export class CollaborationManager {
private doc: LoroDoc | null = null
private provider: CRDTProvider | null = null
private nodesMap: any = null
private edgesMap: any = null
private eventEmitter = new EventEmitter()
private currentAppId: string | null = null
private reactFlowStore: any = null
init = (appId: string, reactFlowStore: any): void => {
this.connect(appId, reactFlowStore)
}
setNodes = (oldNodes: Node[], newNodes: Node[]): void => {
this.syncNodes(oldNodes, newNodes)
if (this.doc)
this.doc.commit()
}
setEdges = (oldEdges: Edge[], newEdges: Edge[]): void => {
this.syncEdges(oldEdges, newEdges)
if (this.doc)
this.doc.commit()
}
destroy = (): void => {
this.disconnect()
}
async connect(appId: string, reactFlowStore: any): Promise<void> {
if (this.currentAppId === appId && this.doc) return
this.disconnect()
this.currentAppId = appId
this.reactFlowStore = reactFlowStore
const socket = webSocketClient.connect(appId)
this.doc = new LoroDoc()
this.nodesMap = this.doc.getMap('nodes')
this.edgesMap = this.doc.getMap('edges')
this.provider = new CRDTProvider(socket, this.doc)
this.setupSubscriptions()
this.setupSocketEventListeners(socket)
}
disconnect = (): void => {
if (this.currentAppId)
webSocketClient.disconnect(this.currentAppId)
this.provider?.destroy()
this.doc = null
this.provider = null
this.nodesMap = null
this.edgesMap = null
this.currentAppId = null
this.reactFlowStore = null
this.eventEmitter.removeAllListeners()
}
isConnected(): boolean {
return this.currentAppId ? webSocketClient.isConnected(this.currentAppId) : false
}
getNodes(): Node[] {
return this.nodesMap ? Array.from(this.nodesMap.values()) : []
}
getEdges(): Edge[] {
return this.edgesMap ? Array.from(this.edgesMap.values()) : []
}
emitCursorMove(position: CursorPosition): void {
if (!this.currentAppId || !webSocketClient.isConnected(this.currentAppId)) return
const socket = webSocketClient.getSocket(this.currentAppId)
if (socket) {
socket.emit('collaboration_event', {
type: 'mouseMove',
userId: socket.id,
data: { x: position.x, y: position.y },
timestamp: Date.now(),
})
}
}
onStateChange(callback: (state: Partial<CollaborationState>) => void): () => void {
return this.eventEmitter.on('stateChange', callback)
}
onCursorUpdate(callback: (cursors: Record<string, CursorPosition>) => void): () => void {
return this.eventEmitter.on('cursors', callback)
}
onOnlineUsersUpdate(callback: (users: OnlineUser[]) => void): () => void {
return this.eventEmitter.on('onlineUsers', callback)
}
onVarsAndFeaturesUpdate(callback: (update: any) => void): () => void {
return this.eventEmitter.on('varsAndFeaturesUpdate', callback)
}
private syncNodes(oldNodes: Node[], newNodes: Node[]): void {
if (!this.nodesMap) return
const oldNodesMap = new Map(oldNodes.map(node => [node.id, node]))
const newNodesMap = new Map(newNodes.map(node => [node.id, node]))
oldNodes.forEach((oldNode) => {
if (!newNodesMap.has(oldNode.id))
this.nodesMap.delete(oldNode.id)
})
newNodes.forEach((newNode) => {
const oldNode = oldNodesMap.get(newNode.id)
if (!oldNode) {
this.nodesMap.set(newNode.id, newNode)
}
else {
const oldPersistentData = this.getPersistentNodeData(oldNode)
const newPersistentData = this.getPersistentNodeData(newNode)
if (!isEqual(oldPersistentData, newPersistentData))
this.nodesMap.set(newNode.id, newPersistentData)
}
})
}
private syncEdges(oldEdges: Edge[], newEdges: Edge[]): void {
if (!this.edgesMap) return
const oldEdgesMap = new Map(oldEdges.map(edge => [edge.id, edge]))
const newEdgesMap = new Map(newEdges.map(edge => [edge.id, edge]))
oldEdges.forEach((oldEdge) => {
if (!newEdgesMap.has(oldEdge.id))
this.edgesMap.delete(oldEdge.id)
})
newEdges.forEach((newEdge) => {
const oldEdge = oldEdgesMap.get(newEdge.id)
if (!oldEdge)
this.edgesMap.set(newEdge.id, newEdge)
else if (!isEqual(oldEdge, newEdge))
this.edgesMap.set(newEdge.id, newEdge)
})
}
private getPersistentNodeData(node: Node): any {
const { data, ...rest } = node
const filteredData = Object.fromEntries(
Object.entries(data).filter(([key]) => !key.startsWith('_')),
)
return { ...rest, data: filteredData }
}
private setupSubscriptions(): void {
this.nodesMap?.subscribe((event: any) => {
if (event.by === 'import' && this.reactFlowStore) {
requestAnimationFrame(() => {
const { setNodes } = this.reactFlowStore.getState()
const updatedNodes = Array.from(this.nodesMap.values())
setNodes(updatedNodes)
})
}
})
this.edgesMap?.subscribe((event: any) => {
if (event.by === 'import' && this.reactFlowStore) {
requestAnimationFrame(() => {
const { setEdges } = this.reactFlowStore.getState()
const updatedEdges = Array.from(this.edgesMap.values())
setEdges(updatedEdges)
})
}
})
}
private setupSocketEventListeners(socket: any): void {
socket.on('collaboration_update', (update: any) => {
if (update.type === 'mouseMove') {
this.eventEmitter.emit('cursors', {
[update.userId]: {
x: update.data.x,
y: update.data.y,
userId: update.userId,
timestamp: update.timestamp,
},
})
}
else if (update.type === 'varsAndFeaturesUpdate') {
this.eventEmitter.emit('varsAndFeaturesUpdate', update)
}
})
socket.on('online_users', (data: { users: OnlineUser[] }) => {
this.eventEmitter.emit('onlineUsers', data.users)
})
socket.on('connect', () => {
this.eventEmitter.emit('stateChange', { isConnected: true })
})
socket.on('disconnect', () => {
this.eventEmitter.emit('stateChange', { isConnected: false })
})
}
}
export const collaborationManager = new CollaborationManager()

View File

@ -0,0 +1,36 @@
import type { LoroDoc } from 'loro-crdt'
import type { Socket } from 'socket.io-client'
export class CRDTProvider {
private doc: LoroDoc
private socket: Socket
constructor(socket: Socket, doc: LoroDoc) {
this.socket = socket
this.doc = doc
this.setupEventListeners()
}
private setupEventListeners(): void {
this.doc.subscribe((event: any) => {
if (event.by === 'local') {
const update = this.doc.export({ mode: 'update' })
this.socket.emit('graph_event', update)
}
})
this.socket.on('graph_update', (updateData: Uint8Array) => {
try {
const data = new Uint8Array(updateData)
this.doc.import(data)
}
catch (error) {
console.error('Error importing graph update:', error)
}
})
}
destroy(): void {
this.socket.off('graph_update')
}
}

View File

@ -0,0 +1,49 @@
export type EventHandler<T = any> = (data: T) => void
export class EventEmitter {
private events: Map<string, Set<EventHandler>> = new Map()
on<T = any>(event: string, handler: EventHandler<T>): () => void {
if (!this.events.has(event))
this.events.set(event, new Set())
this.events.get(event)!.add(handler)
return () => this.off(event, handler)
}
off<T = any>(event: string, handler?: EventHandler<T>): void {
if (!this.events.has(event)) return
const handlers = this.events.get(event)!
if (handler)
handlers.delete(handler)
else
handlers.clear()
if (handlers.size === 0)
this.events.delete(event)
}
emit<T = any>(event: string, data: T): void {
if (!this.events.has(event)) return
const handlers = this.events.get(event)!
handlers.forEach((handler) => {
try {
handler(data)
}
catch (error) {
console.error(`Error in event handler for ${event}:`, error)
}
})
}
removeAllListeners(): void {
this.events.clear()
}
getListenerCount(event: string): number {
return this.events.get(event)?.size || 0
}
}

View File

@ -1,253 +0,0 @@
import type { Socket } from 'socket.io-client'
import { io } from 'socket.io-client'
export type WebSocketConfig = {
url?: string
token?: string
transports?: string[]
withCredentials?: boolean
}
export type OnlineUser = {
user_id: string
username: string
avatar: string
sid: string
}
export type WorkflowOnlineUsers = {
workflow_id: string
users: OnlineUser[]
}
export type OnlineUserListResponse = {
data: WorkflowOnlineUsers[]
}
/**
* App-specific WebSocket client
* Provides a clean API for a specific app's WebSocket operations
*/
export class AppWebSocketClient {
constructor(
private appId: string,
private socket: Socket,
private manager: WebSocketClient,
) {
// Initialize app-specific WebSocket client
}
/**
* Listen to events
*/
on(event: string, handler: (...args: any[]) => void): () => void {
this.socket.on(event, handler)
return () => this.socket.off(event, handler)
}
/**
* Remove event listener
*/
off(event: string, handler?: (...args: any[]) => void): void {
this.socket.off(event, handler)
}
/**
* Emit event
*/
emit(event: string, ...args: any[]): void {
if (this.socket.connected)
this.socket.emit(event, ...args)
}
/**
* Check connection status
*/
get connected(): boolean {
return this.socket.connected
}
/**
* Get socket ID
*/
get id(): string | undefined {
return this.socket.id
}
/**
* Disconnect this specific app
*/
disconnect(): void {
this.manager.disconnect(this.appId)
}
/**
* Get the underlying socket (for advanced usage)
*/
getSocket(): Socket {
return this.socket
}
}
/**
* Multi-connection WebSocket manager
* Supports multiple concurrent connections for different apps
*/
export class WebSocketClient {
private connections: Map<string, Socket> = new Map()
private connecting: Set<string> = new Set()
private config: WebSocketConfig
constructor(config: WebSocketConfig = {}) {
this.config = {
url: config.url || process.env.NEXT_PUBLIC_SOCKET_URL || 'ws://localhost:5001',
transports: config.transports || ['websocket'],
withCredentials: config.withCredentials !== false,
...config,
}
}
/**
* Get or create app-specific WebSocket client
*/
getClient(appId: string): AppWebSocketClient {
let socket = this.connections.get(appId)
if (!socket || !socket.connected)
socket = this.connect(appId)
return new AppWebSocketClient(appId, socket, this)
}
/**
* Connect to WebSocket server for specific app
*/
private connect(appId: string): Socket {
// Return existing connection if available and connected
const existingSocket = this.connections.get(appId)
if (existingSocket?.connected)
return existingSocket
// If already connecting, return the pending socket
if (this.connecting.has(appId)) {
const pendingSocket = this.connections.get(appId)
if (pendingSocket)
return pendingSocket
}
// Clean up disconnected socket
if (existingSocket && !existingSocket.connected) {
existingSocket.disconnect()
this.connections.delete(appId)
}
// Mark as connecting to prevent duplicate connections
this.connecting.add(appId)
const authToken = localStorage.getItem('console_token')
const socket = io(this.config.url!, {
path: '/socket.io',
transports: this.config.transports,
auth: { token: authToken },
withCredentials: this.config.withCredentials,
})
this.connections.set(appId, socket)
this.setupBaseEventListeners(socket, appId)
return socket
}
/**
* Disconnect from specific app or all connections
*/
disconnect(appId?: string): void {
if (appId) {
const socket = this.connections.get(appId)
if (socket) {
socket.disconnect()
this.connections.delete(appId)
this.connecting.delete(appId)
}
}
else {
// Disconnect all connections
this.connections.forEach(socket => socket.disconnect())
this.connections.clear()
this.connecting.clear()
}
}
/**
* Get socket instance for specific app (for backward compatibility)
*/
getSocket(appId: string): Socket | null {
return this.connections.get(appId) || null
}
/**
* Check connection status for specific app
*/
isConnected(appId: string): boolean {
return this.connections.get(appId)?.connected || false
}
/**
* Get all connected app IDs
*/
getConnectedApps(): string[] {
const connectedApps: string[] = []
this.connections.forEach((socket, appId) => {
if (socket.connected)
connectedApps.push(appId)
})
return connectedApps
}
/**
* Debug method: get connection status for all apps
*/
getDebugInfo(): Record<string, { connected: boolean; connecting: boolean; socketId?: string }> {
const info: Record<string, { connected: boolean; connecting: boolean; socketId?: string }> = {}
this.connections.forEach((socket, appId) => {
info[appId] = {
connected: socket.connected,
connecting: this.connecting.has(appId),
socketId: socket.id,
}
})
return info
}
private setupBaseEventListeners(socket: Socket, appId: string): void {
socket.on('connect', () => {
this.connecting.delete(appId)
socket.emit('user_connect', { workflow_id: appId })
console.log(`WebSocket connected for app: ${appId}`)
})
socket.on('disconnect', () => {
this.connecting.delete(appId)
console.log(`WebSocket disconnected for app: ${appId}`)
})
socket.on('connect_error', (err) => {
this.connecting.delete(appId)
console.error(`WebSocket connection error for app ${appId}:`, err)
})
}
}
// Singleton instance
export const webSocketClient = new WebSocketClient()
// Online users API
export const fetchAppsOnlineUsers = async (appIds: string[]) => {
const response = await fetch(`/api/online-users?${new URLSearchParams({
app_ids: appIds.join(','),
})}`)
return response.json()
}

View File

@ -0,0 +1,119 @@
import type { Socket } from 'socket.io-client'
import { io } from 'socket.io-client'
import type { DebugInfo, WebSocketConfig } from '../types/websocket'
export class WebSocketClient {
private connections: Map<string, Socket> = new Map()
private connecting: Set<string> = new Set()
private config: WebSocketConfig
constructor(config: WebSocketConfig = {}) {
this.config = {
url: config.url || process.env.NEXT_PUBLIC_SOCKET_URL || 'ws://localhost:5001',
transports: config.transports || ['websocket'],
withCredentials: config.withCredentials !== false,
...config,
}
}
connect(appId: string): Socket {
const existingSocket = this.connections.get(appId)
if (existingSocket?.connected)
return existingSocket
if (this.connecting.has(appId)) {
const pendingSocket = this.connections.get(appId)
if (pendingSocket)
return pendingSocket
}
if (existingSocket && !existingSocket.connected) {
existingSocket.disconnect()
this.connections.delete(appId)
}
this.connecting.add(appId)
const authToken = localStorage.getItem('console_token')
const socket = io(this.config.url!, {
path: '/socket.io',
transports: this.config.transports,
auth: { token: authToken },
withCredentials: this.config.withCredentials,
})
this.connections.set(appId, socket)
this.setupBaseEventListeners(socket, appId)
return socket
}
disconnect(appId?: string): void {
if (appId) {
const socket = this.connections.get(appId)
if (socket) {
socket.disconnect()
this.connections.delete(appId)
this.connecting.delete(appId)
}
}
else {
this.connections.forEach(socket => socket.disconnect())
this.connections.clear()
this.connecting.clear()
}
}
getSocket(appId: string): Socket | null {
return this.connections.get(appId) || null
}
isConnected(appId: string): boolean {
return this.connections.get(appId)?.connected || false
}
getConnectedApps(): string[] {
const connectedApps: string[] = []
this.connections.forEach((socket, appId) => {
if (socket.connected)
connectedApps.push(appId)
})
return connectedApps
}
getDebugInfo(): DebugInfo {
const info: DebugInfo = {}
this.connections.forEach((socket, appId) => {
info[appId] = {
connected: socket.connected,
connecting: this.connecting.has(appId),
socketId: socket.id,
}
})
return info
}
private setupBaseEventListeners(socket: Socket, appId: string): void {
socket.on('connect', () => {
this.connecting.delete(appId)
socket.emit('user_connect', { workflow_id: appId })
})
socket.on('disconnect', () => {
this.connecting.delete(appId)
})
socket.on('connect_error', () => {
this.connecting.delete(appId)
})
}
}
export const webSocketClient = new WebSocketClient()
export const fetchAppsOnlineUsers = async (appIds: string[]) => {
const response = await fetch(`/api/online-users?${new URLSearchParams({
app_ids: appIds.join(','),
})}`)
return response.json()
}

View File

@ -0,0 +1,72 @@
import { useEffect, useRef, useState } from 'react'
import { collaborationManager } from '../core/collaboration-manager'
import { CursorService } from '../services/cursor-service'
import type { CollaborationState } from '../types/collaboration'
export function useCollaboration(appId: string, reactFlowStore?: any) {
const [state, setState] = useState<Partial<CollaborationState>>({
isConnected: false,
onlineUsers: [],
cursors: {},
})
const cursorServiceRef = useRef<CursorService | null>(null)
useEffect(() => {
if (!appId) return
if (!cursorServiceRef.current) {
cursorServiceRef.current = new CursorService({
minMoveDistance: 10,
throttleMs: 300,
})
}
const initCollaboration = async () => {
await collaborationManager.connect(appId, reactFlowStore)
setState((prev: any) => ({ ...prev, appId, isConnected: collaborationManager.isConnected() }))
}
initCollaboration()
const unsubscribeStateChange = collaborationManager.onStateChange((newState: any) => {
setState((prev: any) => ({ ...prev, ...newState }))
})
const unsubscribeCursors = collaborationManager.onCursorUpdate((cursors: any) => {
setState((prev: any) => ({ ...prev, cursors }))
})
const unsubscribeUsers = collaborationManager.onOnlineUsersUpdate((users: any) => {
setState((prev: any) => ({ ...prev, onlineUsers: users }))
})
return () => {
unsubscribeStateChange()
unsubscribeCursors()
unsubscribeUsers()
cursorServiceRef.current?.stopTracking()
collaborationManager.disconnect()
}
}, [appId, reactFlowStore])
const startCursorTracking = (containerRef: React.RefObject<HTMLElement>) => {
if (cursorServiceRef.current) {
cursorServiceRef.current.startTracking(containerRef, (position) => {
collaborationManager.emitCursorMove(position)
})
}
}
const stopCursorTracking = () => {
cursorServiceRef.current?.stopTracking()
}
return {
isConnected: state.isConnected || false,
onlineUsers: state.onlineUsers || [],
cursors: state.cursors || {},
startCursorTracking,
stopCursorTracking,
}
}

View File

@ -0,0 +1,5 @@
export { collaborationManager } from './core/collaboration-manager'
export { webSocketClient, fetchAppsOnlineUsers } from './core/websocket-manager'
export { CursorService } from './services/cursor-service'
export { useCollaboration } from './hooks/use-collaboration'
export * from './types'

View File

@ -1,153 +0,0 @@
import { LoroDoc } from 'loro-crdt'
import { isEqual } from 'lodash-es'
import { webSocketClient } from './core/websocket-client'
import type { Edge, Node } from '../types'
class LoroSocketIOProvider {
private doc: LoroDoc
private socket: any
constructor(socket: any, doc: LoroDoc) {
this.socket = socket
this.doc = doc
this.setupEventListeners()
}
private setupEventListeners() {
this.doc.subscribe((event: any) => {
if (event.by === 'local') {
const update = this.doc.export({ mode: 'update' })
this.socket.emit('graph_event', update)
}
})
this.socket.on('graph_update', (updateData: Uint8Array) => {
try {
const data = new Uint8Array(updateData)
this.doc.import(data)
}
catch (error) {
console.error('Error importing graph update:', error)
}
})
}
destroy() {
this.socket.off('graph_update')
}
}
class CollaborationManager {
private doc: LoroDoc | null = null
private provider: LoroSocketIOProvider | null = null
private nodesMap: any = null
private edgesMap: any = null
init(appId: string, reactFlowStore: any) {
const socket = webSocketClient.getClient(appId)
this.doc = new LoroDoc()
this.nodesMap = this.doc.getMap('nodes')
this.edgesMap = this.doc.getMap('edges')
this.provider = new LoroSocketIOProvider(socket, this.doc)
this.setupSubscriptions(reactFlowStore)
}
private setupSubscriptions(reactFlowStore: any) {
this.nodesMap?.subscribe((event: any) => {
console.log('nodesMap', event)
if (event.by === 'import') {
requestAnimationFrame(() => {
const { setNodes: reactFlowSetNodes } = reactFlowStore.getState()
const updatedNodes = Array.from(this.nodesMap.values())
reactFlowSetNodes(updatedNodes)
})
}
})
this.edgesMap?.subscribe((event: any) => {
if (event.by === 'import') {
requestAnimationFrame(() => {
const { setEdges: reactFlowSetEdges } = reactFlowStore.getState()
const updatedEdges = Array.from(this.edgesMap.values())
reactFlowSetEdges(updatedEdges)
})
}
})
}
getNodes() {
return this.nodesMap ? Array.from(this.nodesMap.values()) : []
}
getEdges() {
return this.edgesMap ? Array.from(this.edgesMap.values()) : []
}
private getPersistentNodeData = (node: Node) => {
const { data, ...rest } = node
const filteredData = Object.fromEntries(
Object.entries(data).filter(([key]) => !key.startsWith('_')),
)
return { ...rest, data: filteredData }
}
setNodes = (oldNodes: Node[], newNodes: Node[]) => {
if (!this.nodesMap || !this.doc) return
const oldNodesMap = new Map(oldNodes.map(node => [node.id, node]))
const newNodesMap = new Map(newNodes.map(node => [node.id, node]))
oldNodes.forEach((oldNode) => {
if (!newNodesMap.has(oldNode.id))
this.nodesMap.delete(oldNode.id)
})
newNodes.forEach((newNode) => {
const oldNode = oldNodesMap.get(newNode.id)
if (!oldNode) {
this.nodesMap.set(newNode.id, newNode)
}
else {
const oldPersistentData = this.getPersistentNodeData(oldNode)
const newPersistentData = this.getPersistentNodeData(newNode)
if (!isEqual(oldPersistentData, newPersistentData))
this.nodesMap.set(newNode.id, newPersistentData)
}
})
this.doc.commit()
}
setEdges = (oldEdges: Edge[], newEdges: Edge[]) => {
if (!this.edgesMap || !this.doc) return
const oldEdgesMap = new Map(oldEdges.map(edge => [edge.id, edge]))
const newEdgesMap = new Map(newEdges.map(edge => [edge.id, edge]))
oldEdges.forEach((oldEdge) => {
if (!newEdgesMap.has(oldEdge.id))
this.edgesMap.delete(oldEdge.id)
})
newEdges.forEach((newEdge) => {
const oldEdge = oldEdgesMap.get(newEdge.id)
if (!oldEdge)
this.edgesMap.set(newEdge.id, newEdge)
else if (!isEqual(oldEdge, newEdge))
this.edgesMap.set(newEdge.id, newEdge)
})
this.doc.commit()
}
destroy() {
this.provider?.destroy()
this.doc = null
this.provider = null
this.nodesMap = null
this.edgesMap = null
}
}
export const collaborationManager = new CollaborationManager()

View File

@ -0,0 +1,84 @@
import type { RefObject } from 'react'
import type { CursorPosition } from '../types/collaboration'
export type CursorServiceConfig = {
minMoveDistance?: number
throttleMs?: number
}
export class CursorService {
private containerRef: RefObject<HTMLElement> | null = null
private isTracking = false
private onCursorUpdate: ((cursors: Record<string, CursorPosition>) => void) | null = null
private onEmitPosition: ((position: CursorPosition) => void) | null = null
private lastEmitTime = 0
private lastPosition: { x: number; y: number } | null = null
private config: Required<CursorServiceConfig>
constructor(config: CursorServiceConfig = {}) {
this.config = {
minMoveDistance: config.minMoveDistance ?? 5,
throttleMs: config.throttleMs ?? 300,
}
}
startTracking(
containerRef: RefObject<HTMLElement>,
onEmitPosition: (position: CursorPosition) => void,
): void {
if (this.isTracking) this.stopTracking()
this.containerRef = containerRef
this.onEmitPosition = onEmitPosition
this.isTracking = true
if (containerRef.current)
containerRef.current.addEventListener('mousemove', this.handleMouseMove)
}
stopTracking(): void {
if (this.containerRef?.current)
this.containerRef.current.removeEventListener('mousemove', this.handleMouseMove)
this.containerRef = null
this.onEmitPosition = null
this.isTracking = false
this.lastPosition = null
}
setCursorUpdateHandler(handler: (cursors: Record<string, CursorPosition>) => void): void {
this.onCursorUpdate = handler
}
updateCursors(cursors: Record<string, CursorPosition>): void {
if (this.onCursorUpdate)
this.onCursorUpdate(cursors)
}
private handleMouseMove = (event: MouseEvent): void => {
if (!this.containerRef?.current || !this.onEmitPosition) return
const rect = this.containerRef.current.getBoundingClientRect()
const x = event.clientX - rect.left
const y = event.clientY - rect.top
if (x >= 0 && y >= 0 && x <= rect.width && y <= rect.height) {
const now = Date.now()
const timeThrottled = now - this.lastEmitTime > this.config.throttleMs
const distanceThrottled = !this.lastPosition
|| (Math.abs(x - this.lastPosition.x) > this.config.minMoveDistance
|| Math.abs(y - this.lastPosition.y) > this.config.minMoveDistance)
if (timeThrottled && distanceThrottled) {
this.lastPosition = { x, y }
this.lastEmitTime = now
this.onEmitPosition({
x,
y,
userId: '',
timestamp: now,
})
}
}
}
}

View File

@ -0,0 +1,43 @@
import type { Edge, Node } from '../../types'
export type OnlineUser = {
user_id: string
username: string
avatar: string
sid: string
}
export type WorkflowOnlineUsers = {
workflow_id: string
users: OnlineUser[]
}
export type OnlineUserListResponse = {
data: WorkflowOnlineUsers[]
}
export type CursorPosition = {
x: number
y: number
userId: string
timestamp: number
}
export type CollaborationState = {
appId: string
isConnected: boolean
onlineUsers: OnlineUser[]
cursors: Record<string, CursorPosition>
}
export type GraphSyncData = {
nodes: Node[]
edges: Edge[]
}
export type CollaborationUpdate = {
type: 'mouseMove' | 'graphUpdate' | 'userJoin' | 'userLeave'
userId: string
data: any
timestamp: number
}

View File

@ -0,0 +1,38 @@
export type CollaborationEvent = {
type: string
data: any
timestamp: number
}
export type GraphUpdateEvent = {
type: 'graph_update'
data: Uint8Array
} & CollaborationEvent
export type CursorMoveEvent = {
type: 'cursor_move'
data: {
x: number
y: number
userId: string
}
} & CollaborationEvent
export type UserConnectEvent = {
type: 'user_connect'
data: {
workflow_id: string
}
} & CollaborationEvent
export type OnlineUsersEvent = {
type: 'online_users'
data: {
users: Array<{
user_id: string
username: string
avatar: string
sid: string
}>
}
} & CollaborationEvent

View File

@ -0,0 +1,3 @@
export * from './websocket'
export * from './collaboration'
export * from './events'

View File

@ -0,0 +1,16 @@
export type WebSocketConfig = {
url?: string
token?: string
transports?: string[]
withCredentials?: boolean
}
export type ConnectionInfo = {
connected: boolean
connecting: boolean
socketId?: string
}
export type DebugInfo = {
[appId: string]: ConnectionInfo
}

View File

@ -14,7 +14,7 @@ import useConfig from './nodes/start/use-config'
import type { StartNodeType } from './nodes/start/types'
import type { PromptVariable } from '@/models/debug'
import NewFeaturePanel from '@/app/components/base/features/new-feature-panel'
import { webSocketClient } from '@/app/components/workflow/collaboration/core/websocket-client'
import { webSocketClient } from '@/app/components/workflow/collaboration/core/websocket-manager'
const Features = () => {
const setShowFeaturesPanel = useStore(s => s.setShowFeaturesPanel)

View File

@ -1,7 +1,7 @@
import { useCallback } from 'react'
import { useStoreApi } from 'reactflow'
import type { Edge, Node } from '../types'
import { collaborationManager } from '../collaboration/manage'
import { collaborationManager } from '../collaboration/core/collaboration-manager'
export const useCollaborativeWorkflow = () => {
const store = useStoreApi()

View File

@ -21,7 +21,7 @@ import type {
import { findUsedVarNodes, updateNodeVars } from '@/app/components/workflow/nodes/_base/components/variable/utils'
import { useNodesSyncDraft } from '@/app/components/workflow/hooks/use-nodes-sync-draft'
import { BlockEnum } from '@/app/components/workflow/types'
import { webSocketClient } from '@/app/components/workflow/collaboration/core/websocket-client'
import { webSocketClient } from '@/app/components/workflow/collaboration/core/websocket-manager'
import { useDocLink } from '@/context/i18n'
import cn from '@/utils/classnames'
import useInspectVarsCrud from '../../hooks/use-inspect-vars-crud'

View File

@ -18,7 +18,7 @@ import { findUsedVarNodes, updateNodeVars } from '@/app/components/workflow/node
import RemoveEffectVarConfirm from '@/app/components/workflow/nodes/_base/components/remove-effect-var-confirm'
import cn from '@/utils/classnames'
import { useNodesSyncDraft } from '@/app/components/workflow/hooks/use-nodes-sync-draft'
import { webSocketClient } from '@/app/components/workflow/collaboration/core/websocket-client'
import { webSocketClient } from '@/app/components/workflow/collaboration/core/websocket-manager'
import { useStore as useWorkflowStore } from '@/app/components/workflow/store'
const EnvPanel = () => {