diff --git a/api/controllers/console/app/online_user.py b/api/controllers/console/app/online_user.py index b58643a0b9..0c653f47bc 100644 --- a/api/controllers/console/app/online_user.py +++ b/api/controllers/console/app/online_user.py @@ -140,10 +140,10 @@ def handle_collaboration_event(sid, data): return {"msg": "event_broadcasted"} -@sio.on("yjs_update") -def handle_yjs_update(sid, data): +@sio.on("graph_update") +def handle_graph_update(sid, data): """ - Handle Y.js document updates - simple broadcast relay. + Handle graph updates - simple broadcast relay. """ mapping = redis_client.get(f"ws_sid_map:{sid}") @@ -152,8 +152,7 @@ def handle_yjs_update(sid, data): mapping_data = json.loads(mapping) workflow_id = mapping_data["workflow_id"] - user_id = mapping_data["user_id"] - sio.emit("yjs_update", data, room=workflow_id, skip_sid=sid) + sio.emit("graph_update", data, room=workflow_id, skip_sid=sid) - return {"msg": "yjs_update_broadcasted"} + return {"msg": "graph_update_broadcasted"} diff --git a/web/app/components/workflow-app/hooks/use-workflow-init.ts b/web/app/components/workflow-app/hooks/use-workflow-init.ts index b682435e8a..fb007c031f 100644 --- a/web/app/components/workflow-app/hooks/use-workflow-init.ts +++ b/web/app/components/workflow-app/hooks/use-workflow-init.ts @@ -40,14 +40,14 @@ export const useWorkflowInit = () => { }, [workflowStore]) useWorkflowConfig(appDetail.id, handleUpdateWorkflowConfig) - const initializeCollaboration = async (appId: string) => { + const initializeCollaboration = async (appId: string) => { const { initCollaboration } = useCollaborationStore.getState() initCollaboration(appId) return new Promise((resolve) => { const checkInitialized = () => { - const { yNodesMap, yEdgesMap } = useCollaborationStore.getState() - if (yNodesMap && yEdgesMap) + const { nodesMap, edgesMap } = useCollaborationStore.getState() + if (nodesMap && edgesMap) resolve() else setTimeout(checkInitialized, 50) @@ -56,21 +56,18 @@ export const useWorkflowInit = () => { }) } - const populateYjsWithServerData = async (serverData: any) => { - const { yNodesMap, yEdgesMap } = useCollaborationStore.getState() - - if (yNodesMap && yEdgesMap && serverData.graph) { - const { ydoc } = useCollaborationStore.getState() - ydoc?.transact(() => { - serverData.graph.nodes?.forEach((node: any) => { - yNodesMap.set(node.id, node) - }) - - serverData.graph.edges?.forEach((edge: any) => { - yEdgesMap.set(edge.id, edge) - }) + const populateCollaborationWithServerData = async (serverData: any) => { + const { nodesMap, edgesMap, loroDoc } = useCollaborationStore.getState() + serverData.graph.nodes?.forEach((node: any) => { + console.log('Setting node:', node.id, node) + nodesMap.set(node.id, node) }) - } + + serverData.graph.edges?.forEach((edge: any) => { + console.log('Setting edge:', edge.id, edge) + edgesMap.set(edge.id, edge) + }) + loroDoc.commit() } const handleGetInitialWorkflowData = useCallback(async () => { @@ -88,7 +85,7 @@ export const useWorkflowInit = () => { environmentVariables: res.environment_variables?.map(env => env.value_type === 'secret' ? { ...env, value: '[__HIDDEN__]' } : env) || [], conversationVariables: res.conversation_variables || [], }) - await populateYjsWithServerData(res) + await populateCollaborationWithServerData(res) setSyncWorkflowDraftHash(res.hash) setIsLoading(false) } diff --git a/web/app/components/workflow/hooks/use-nodes-interactions.ts b/web/app/components/workflow/hooks/use-nodes-interactions.ts index 58018a8c97..6a9b708eca 100644 --- a/web/app/components/workflow/hooks/use-nodes-interactions.ts +++ b/web/app/components/workflow/hooks/use-nodes-interactions.ts @@ -1498,18 +1498,15 @@ export const useNodesInteractions = () => { }) setEdges(newEdges) - const { yNodesMap, yEdgesMap, ydoc } = useCollaborationStore.getState() - if (yNodesMap && yEdgesMap && ydoc) { - ydoc.transact(() => { - newNodes.forEach((node) => { - yNodesMap.set(node.id, node) - }) - console.log('Before edge delete, yEdgesMap size:', yEdgesMap?.size) - connectedEdges.forEach((edge) => { - yEdgesMap.delete(edge.id) - }) - console.log('After edge delete, yEdgesMap size:', yEdgesMap?.size) + const { nodesMap, edgesMap, loroDoc } = useCollaborationStore.getState() + if (nodesMap && edgesMap && loroDoc) { + // newNodes.forEach((node) => { + // nodesMap.set(node.id, node) + // }) + connectedEdges.forEach((edge) => { + edgesMap.delete(edge.id) }) + loroDoc.commit() } handleSyncWorkflowDraft() diff --git a/web/app/components/workflow/index.tsx b/web/app/components/workflow/index.tsx index ebfc1b52fb..377a3fe572 100644 --- a/web/app/components/workflow/index.tsx +++ b/web/app/components/workflow/index.tsx @@ -140,7 +140,6 @@ export const Workflow: FC = memo(({ }, [collaborationNodes, setNodes]) useEffect(() => { - console.log('collaborationEdges changed:', collaborationEdges, 122112) setEdges(collaborationEdges) }, [collaborationEdges, setEdges]) diff --git a/web/app/components/workflow/store/collaboration-store.ts b/web/app/components/workflow/store/collaboration-store.ts index 3a3ae3ac0a..c9fe1f0e44 100644 --- a/web/app/components/workflow/store/collaboration-store.ts +++ b/web/app/components/workflow/store/collaboration-store.ts @@ -1,132 +1,118 @@ import { create } from 'zustand' -import * as Y from 'yjs' + +import { LoroDoc } from 'loro-crdt' import type { Edge, Node } from '../types' import { useWebSocketStore } from './websocket-store' -let globalYDoc: Y.Doc | null = null -let globalYNodesMap: Y.Map | null = null -let globalYEdgesMap: Y.Map | null = null - -class YjsSocketIOProvider { - private doc: Y.Doc +class LoroSocketIOProvider { + private doc: any private socket: any - private isDestroyed = false - private onRemoteUpdate?: () => void - constructor(socket: any, doc: Y.Doc, onRemoteUpdate?: () => void) { + constructor(socket: any, doc: any) { this.socket = socket this.doc = doc - this.onRemoteUpdate = onRemoteUpdate this.setupEventListeners() } private setupEventListeners() { - this.doc.on('update', (update: Uint8Array, origin: any) => { - if (origin !== 'remote') - this.socket.emit('yjs_update', update) + this.doc.subscribe((event: any) => { + if (event.origin !== 'remote') { + const update = this.doc.export({ mode: 'update' }) + this.socket.emit('graph_update', update) + } }) - this.socket.on('yjs_update', (updateData: Uint8Array) => { - Y.applyUpdate(this.doc, new Uint8Array(updateData), 'remote') - - if (this.onRemoteUpdate) - this.onRemoteUpdate() + this.socket.on('graph_update', (updateData: Uint8Array) => { + try { + const data = new Uint8Array(updateData) + this.doc.import(data) + } + catch (error) { + console.error('Error importing graph update:', error) + } }) } destroy() { - this.isDestroyed = true + this.socket.off('graph_update') } } type CollaborationStore = { - ydoc: Y.Doc | null - provider: YjsSocketIOProvider | null - - yNodesMap: Y.Map | null - yEdgesMap: Y.Map | null - - yTestMap: Y.Map | null - yTestArray: Y.Array | null - + loroDoc: any | null + provider: LoroSocketIOProvider | null + nodesMap: any | null + edgesMap: any | null nodes: Node[] edges: Edge[] - + updateNodes?: () => void + updateEdges?: () => void initCollaboration: (appId: string) => void destroyCollaboration: () => void - } export const useCollaborationStore = create((set, get) => ({ - ydoc: null, + loroDoc: null, provider: null, - yNodesMap: null, - yEdgesMap: null, - yTestMap: null, - yTestArray: null, + nodesMap: null, + edgesMap: null, nodes: [], edges: [], initCollaboration: (appId: string) => { - if (!globalYDoc) { - console.log('Creating new global Y.Doc instance') - globalYDoc = new Y.Doc() - globalYNodesMap = globalYDoc.getMap('nodes') - globalYEdgesMap = globalYDoc.getMap('edges') - } - else { - console.log('Reusing existing global Y.Doc instance') - } - const ydoc = globalYDoc - const yNodesMap = globalYNodesMap! - const yEdgesMap = globalYEdgesMap! - const { getSocket } = useWebSocketStore.getState() const socket = getSocket(appId) + const doc = new LoroDoc() + const nodesMap = doc.getMap('nodes') + const edgesMap = doc.getMap('edges') - const updateReactState = () => { - console.log('updateReactState called') - - const nodes = Array.from(yNodesMap.values()) - const edges = Array.from(yEdgesMap.values()) - console.log('Y.js data - nodes:', nodes.length, 'edges:', edges.length) - - set({ - nodes: [...nodes] as Node[], - edges: [...edges] as Edge[], - }) + const updateNodes = () => { + const nodes = Array.from(nodesMap.values()) + set({ nodes }) } - const provider = new YjsSocketIOProvider(socket, globalYDoc, updateReactState) + const updateEdges = () => { + const edges = Array.from(edgesMap.values()) + set({ edges }) + } - yNodesMap.observe((event) => { - console.log('yNodesMap changed:', event) - updateReactState() - }) - yEdgesMap.observe((event) => { - console.log('yEdgesMap changed:', event) - updateReactState() - }) - - updateReactState() + const provider = new LoroSocketIOProvider(socket, doc) set({ - ydoc, + loroDoc: doc, provider, - yNodesMap, - yEdgesMap, + nodesMap, + edgesMap, + nodes: [], + edges: [], + updateNodes, + updateEdges, + }) + + nodesMap.subscribe((event: any) => { + console.log('NodesMap changed:', event) + updateNodes() + }) + + edgesMap.subscribe((event: any) => { + console.log('EdgesMap changed:', event) + updateEdges() }) }, destroyCollaboration: () => { const { provider } = get() - provider?.destroy() - set({ - ydoc: null, - provider: null, - yNodesMap: null, - yEdgesMap: null, - }) + if (provider) { + provider.destroy() + set({ + loroDoc: null, + provider: null, + nodesMap: null, + edgesMap: null, + nodes: [], + edges: [], + }) + } }, })) diff --git a/web/next.config.js b/web/next.config.js index 9ce1b35644..f37bd3b9ec 100644 --- a/web/next.config.js +++ b/web/next.config.js @@ -25,6 +25,15 @@ const nextConfig = { assetPrefix, webpack: (config, { dev, isServer }) => { config.plugins.push(codeInspectorPlugin({ bundler: 'webpack' })) + + config.experiments = { + asyncWebAssembly: true, + layers: true, + } + config.output.environment = { + asyncFunction: true, + } + return config }, productionBrowserSourceMaps: false, // enable browser source map generation during the production build diff --git a/web/package.json b/web/package.json index a6e7b41a36..792d2fcc92 100644 --- a/web/package.json +++ b/web/package.json @@ -99,6 +99,7 @@ "lexical": "^0.30.0", "line-clamp": "^1.0.0", "lodash-es": "^4.17.21", + "loro-crdt": "^1.5.9", "mermaid": "11.4.1", "mime": "^4.0.4", "mitt": "^3.0.1", @@ -148,7 +149,6 @@ "tldts": "^7.0.9", "use-context-selector": "^2.0.0", "uuid": "^10.0.0", - "yjs": "^13.6.27", "zod": "^3.23.8", "zundo": "^2.1.0", "zustand": "^4.5.2" diff --git a/web/pnpm-lock.yaml b/web/pnpm-lock.yaml index 021616427a..29380f8644 100644 --- a/web/pnpm-lock.yaml +++ b/web/pnpm-lock.yaml @@ -198,6 +198,9 @@ importers: lodash-es: specifier: ^4.17.21 version: 4.17.21 + loro-crdt: + specifier: ^1.5.9 + version: 1.5.9 mermaid: specifier: 11.4.1 version: 11.4.1 @@ -345,9 +348,6 @@ importers: uuid: specifier: ^10.0.0 version: 10.0.0 - yjs: - specifier: ^13.6.27 - version: 13.6.27 zod: specifier: ^3.23.8 version: 3.24.2 @@ -6418,6 +6418,9 @@ packages: resolution: {integrity: sha512-lyuxPGr/Wfhrlem2CL/UcnUc1zcqKAImBDzukY7Y5F/yQiNdko6+fRLevlw1HgMySw7f611UIY408EtxRSoK3Q==} hasBin: true + loro-crdt@1.5.9: + resolution: {integrity: sha512-edkfCYh5W42O7EmNVCx6iPrI0yTAhn0DRbEAk6tmiKaL42IpUoqyDREvtFDmdVVgofxKNgceDBQx8fIIjMdr2Q==} + loupe@3.1.3: resolution: {integrity: sha512-kkIp7XSkP78ZxJEsSxW3712C6teJVoeHHwgo9zJ380de7IYyJ2ISlxojcH2pC5OFLewESmnRi/+XCDIEEVyoug==} @@ -16133,6 +16136,8 @@ snapshots: dependencies: js-tokens: 4.0.0 + loro-crdt@1.5.9: {} + loupe@3.1.3: {} lower-case@2.0.2: diff --git a/web/service/demo/online-user.ts b/web/service/demo/online-user.ts index 9d190f982b..bd8eaaa696 100644 --- a/web/service/demo/online-user.ts +++ b/web/service/demo/online-user.ts @@ -24,7 +24,6 @@ export function connectOnlineUserWebSocket(appId: string): Socket { lastAppId = appId - // Add your event listeners here socket.on('connect', () => { socket?.emit('user_connect', { workflow_id: appId }) console.log('WebSocket connected') @@ -41,9 +40,6 @@ export function connectOnlineUserWebSocket(appId: string): Socket { return socket } -/** - * Disconnect the websocket connection. - */ export function disconnectOnlineUserWebSocket() { if (socket) { socket.disconnect()