use loro for crdt data

This commit is contained in:
hjlarry 2025-07-31 14:02:53 +08:00
parent 4cc01c8aa8
commit 965b65db6e
9 changed files with 113 additions and 125 deletions

View File

@ -140,10 +140,10 @@ def handle_collaboration_event(sid, data):
return {"msg": "event_broadcasted"}
@sio.on("yjs_update")
def handle_yjs_update(sid, data):
@sio.on("graph_update")
def handle_graph_update(sid, data):
"""
Handle Y.js document updates - simple broadcast relay.
Handle graph updates - simple broadcast relay.
"""
mapping = redis_client.get(f"ws_sid_map:{sid}")
@ -152,8 +152,7 @@ def handle_yjs_update(sid, data):
mapping_data = json.loads(mapping)
workflow_id = mapping_data["workflow_id"]
user_id = mapping_data["user_id"]
sio.emit("yjs_update", data, room=workflow_id, skip_sid=sid)
sio.emit("graph_update", data, room=workflow_id, skip_sid=sid)
return {"msg": "yjs_update_broadcasted"}
return {"msg": "graph_update_broadcasted"}

View File

@ -40,14 +40,14 @@ export const useWorkflowInit = () => {
}, [workflowStore])
useWorkflowConfig(appDetail.id, handleUpdateWorkflowConfig)
const initializeCollaboration = async (appId: string) => {
const initializeCollaboration = async (appId: string) => {
const { initCollaboration } = useCollaborationStore.getState()
initCollaboration(appId)
return new Promise<void>((resolve) => {
const checkInitialized = () => {
const { yNodesMap, yEdgesMap } = useCollaborationStore.getState()
if (yNodesMap && yEdgesMap)
const { nodesMap, edgesMap } = useCollaborationStore.getState()
if (nodesMap && edgesMap)
resolve()
else
setTimeout(checkInitialized, 50)
@ -56,21 +56,18 @@ export const useWorkflowInit = () => {
})
}
const populateYjsWithServerData = async (serverData: any) => {
const { yNodesMap, yEdgesMap } = useCollaborationStore.getState()
if (yNodesMap && yEdgesMap && serverData.graph) {
const { ydoc } = useCollaborationStore.getState()
ydoc?.transact(() => {
serverData.graph.nodes?.forEach((node: any) => {
yNodesMap.set(node.id, node)
})
serverData.graph.edges?.forEach((edge: any) => {
yEdgesMap.set(edge.id, edge)
})
const populateCollaborationWithServerData = async (serverData: any) => {
const { nodesMap, edgesMap, loroDoc } = useCollaborationStore.getState()
serverData.graph.nodes?.forEach((node: any) => {
console.log('Setting node:', node.id, node)
nodesMap.set(node.id, node)
})
}
serverData.graph.edges?.forEach((edge: any) => {
console.log('Setting edge:', edge.id, edge)
edgesMap.set(edge.id, edge)
})
loroDoc.commit()
}
const handleGetInitialWorkflowData = useCallback(async () => {
@ -88,7 +85,7 @@ export const useWorkflowInit = () => {
environmentVariables: res.environment_variables?.map(env => env.value_type === 'secret' ? { ...env, value: '[__HIDDEN__]' } : env) || [],
conversationVariables: res.conversation_variables || [],
})
await populateYjsWithServerData(res)
await populateCollaborationWithServerData(res)
setSyncWorkflowDraftHash(res.hash)
setIsLoading(false)
}

View File

@ -1498,18 +1498,15 @@ export const useNodesInteractions = () => {
})
setEdges(newEdges)
const { yNodesMap, yEdgesMap, ydoc } = useCollaborationStore.getState()
if (yNodesMap && yEdgesMap && ydoc) {
ydoc.transact(() => {
newNodes.forEach((node) => {
yNodesMap.set(node.id, node)
})
console.log('Before edge delete, yEdgesMap size:', yEdgesMap?.size)
connectedEdges.forEach((edge) => {
yEdgesMap.delete(edge.id)
})
console.log('After edge delete, yEdgesMap size:', yEdgesMap?.size)
const { nodesMap, edgesMap, loroDoc } = useCollaborationStore.getState()
if (nodesMap && edgesMap && loroDoc) {
// newNodes.forEach((node) => {
// nodesMap.set(node.id, node)
// })
connectedEdges.forEach((edge) => {
edgesMap.delete(edge.id)
})
loroDoc.commit()
}
handleSyncWorkflowDraft()

View File

@ -140,7 +140,6 @@ export const Workflow: FC<WorkflowProps> = memo(({
}, [collaborationNodes, setNodes])
useEffect(() => {
console.log('collaborationEdges changed:', collaborationEdges, 122112)
setEdges(collaborationEdges)
}, [collaborationEdges, setEdges])

View File

@ -1,132 +1,118 @@
import { create } from 'zustand'
import * as Y from 'yjs'
import { LoroDoc } from 'loro-crdt'
import type { Edge, Node } from '../types'
import { useWebSocketStore } from './websocket-store'
let globalYDoc: Y.Doc | null = null
let globalYNodesMap: Y.Map<any> | null = null
let globalYEdgesMap: Y.Map<any> | null = null
class YjsSocketIOProvider {
private doc: Y.Doc
class LoroSocketIOProvider {
private doc: any
private socket: any
private isDestroyed = false
private onRemoteUpdate?: () => void
constructor(socket: any, doc: Y.Doc, onRemoteUpdate?: () => void) {
constructor(socket: any, doc: any) {
this.socket = socket
this.doc = doc
this.onRemoteUpdate = onRemoteUpdate
this.setupEventListeners()
}
private setupEventListeners() {
this.doc.on('update', (update: Uint8Array, origin: any) => {
if (origin !== 'remote')
this.socket.emit('yjs_update', update)
this.doc.subscribe((event: any) => {
if (event.origin !== 'remote') {
const update = this.doc.export({ mode: 'update' })
this.socket.emit('graph_update', update)
}
})
this.socket.on('yjs_update', (updateData: Uint8Array) => {
Y.applyUpdate(this.doc, new Uint8Array(updateData), 'remote')
if (this.onRemoteUpdate)
this.onRemoteUpdate()
this.socket.on('graph_update', (updateData: Uint8Array) => {
try {
const data = new Uint8Array(updateData)
this.doc.import(data)
}
catch (error) {
console.error('Error importing graph update:', error)
}
})
}
destroy() {
this.isDestroyed = true
this.socket.off('graph_update')
}
}
type CollaborationStore = {
ydoc: Y.Doc | null
provider: YjsSocketIOProvider | null
yNodesMap: Y.Map<any> | null
yEdgesMap: Y.Map<any> | null
yTestMap: Y.Map<any> | null
yTestArray: Y.Array<any> | null
loroDoc: any | null
provider: LoroSocketIOProvider | null
nodesMap: any | null
edgesMap: any | null
nodes: Node[]
edges: Edge[]
updateNodes?: () => void
updateEdges?: () => void
initCollaboration: (appId: string) => void
destroyCollaboration: () => void
}
export const useCollaborationStore = create<CollaborationStore>((set, get) => ({
ydoc: null,
loroDoc: null,
provider: null,
yNodesMap: null,
yEdgesMap: null,
yTestMap: null,
yTestArray: null,
nodesMap: null,
edgesMap: null,
nodes: [],
edges: [],
initCollaboration: (appId: string) => {
if (!globalYDoc) {
console.log('Creating new global Y.Doc instance')
globalYDoc = new Y.Doc()
globalYNodesMap = globalYDoc.getMap<any>('nodes')
globalYEdgesMap = globalYDoc.getMap<any>('edges')
}
else {
console.log('Reusing existing global Y.Doc instance')
}
const ydoc = globalYDoc
const yNodesMap = globalYNodesMap!
const yEdgesMap = globalYEdgesMap!
const { getSocket } = useWebSocketStore.getState()
const socket = getSocket(appId)
const doc = new LoroDoc()
const nodesMap = doc.getMap('nodes')
const edgesMap = doc.getMap('edges')
const updateReactState = () => {
console.log('updateReactState called')
const nodes = Array.from(yNodesMap.values())
const edges = Array.from(yEdgesMap.values())
console.log('Y.js data - nodes:', nodes.length, 'edges:', edges.length)
set({
nodes: [...nodes] as Node[],
edges: [...edges] as Edge[],
})
const updateNodes = () => {
const nodes = Array.from(nodesMap.values())
set({ nodes })
}
const provider = new YjsSocketIOProvider(socket, globalYDoc, updateReactState)
const updateEdges = () => {
const edges = Array.from(edgesMap.values())
set({ edges })
}
yNodesMap.observe((event) => {
console.log('yNodesMap changed:', event)
updateReactState()
})
yEdgesMap.observe((event) => {
console.log('yEdgesMap changed:', event)
updateReactState()
})
updateReactState()
const provider = new LoroSocketIOProvider(socket, doc)
set({
ydoc,
loroDoc: doc,
provider,
yNodesMap,
yEdgesMap,
nodesMap,
edgesMap,
nodes: [],
edges: [],
updateNodes,
updateEdges,
})
nodesMap.subscribe((event: any) => {
console.log('NodesMap changed:', event)
updateNodes()
})
edgesMap.subscribe((event: any) => {
console.log('EdgesMap changed:', event)
updateEdges()
})
},
destroyCollaboration: () => {
const { provider } = get()
provider?.destroy()
set({
ydoc: null,
provider: null,
yNodesMap: null,
yEdgesMap: null,
})
if (provider) {
provider.destroy()
set({
loroDoc: null,
provider: null,
nodesMap: null,
edgesMap: null,
nodes: [],
edges: [],
})
}
},
}))

View File

@ -25,6 +25,15 @@ const nextConfig = {
assetPrefix,
webpack: (config, { dev, isServer }) => {
config.plugins.push(codeInspectorPlugin({ bundler: 'webpack' }))
config.experiments = {
asyncWebAssembly: true,
layers: true,
}
config.output.environment = {
asyncFunction: true,
}
return config
},
productionBrowserSourceMaps: false, // enable browser source map generation during the production build

View File

@ -99,6 +99,7 @@
"lexical": "^0.30.0",
"line-clamp": "^1.0.0",
"lodash-es": "^4.17.21",
"loro-crdt": "^1.5.9",
"mermaid": "11.4.1",
"mime": "^4.0.4",
"mitt": "^3.0.1",
@ -148,7 +149,6 @@
"tldts": "^7.0.9",
"use-context-selector": "^2.0.0",
"uuid": "^10.0.0",
"yjs": "^13.6.27",
"zod": "^3.23.8",
"zundo": "^2.1.0",
"zustand": "^4.5.2"

View File

@ -198,6 +198,9 @@ importers:
lodash-es:
specifier: ^4.17.21
version: 4.17.21
loro-crdt:
specifier: ^1.5.9
version: 1.5.9
mermaid:
specifier: 11.4.1
version: 11.4.1
@ -345,9 +348,6 @@ importers:
uuid:
specifier: ^10.0.0
version: 10.0.0
yjs:
specifier: ^13.6.27
version: 13.6.27
zod:
specifier: ^3.23.8
version: 3.24.2
@ -6418,6 +6418,9 @@ packages:
resolution: {integrity: sha512-lyuxPGr/Wfhrlem2CL/UcnUc1zcqKAImBDzukY7Y5F/yQiNdko6+fRLevlw1HgMySw7f611UIY408EtxRSoK3Q==}
hasBin: true
loro-crdt@1.5.9:
resolution: {integrity: sha512-edkfCYh5W42O7EmNVCx6iPrI0yTAhn0DRbEAk6tmiKaL42IpUoqyDREvtFDmdVVgofxKNgceDBQx8fIIjMdr2Q==}
loupe@3.1.3:
resolution: {integrity: sha512-kkIp7XSkP78ZxJEsSxW3712C6teJVoeHHwgo9zJ380de7IYyJ2ISlxojcH2pC5OFLewESmnRi/+XCDIEEVyoug==}
@ -16133,6 +16136,8 @@ snapshots:
dependencies:
js-tokens: 4.0.0
loro-crdt@1.5.9: {}
loupe@3.1.3: {}
lower-case@2.0.2:

View File

@ -24,7 +24,6 @@ export function connectOnlineUserWebSocket(appId: string): Socket {
lastAppId = appId
// Add your event listeners here
socket.on('connect', () => {
socket?.emit('user_connect', { workflow_id: appId })
console.log('WebSocket connected')
@ -41,9 +40,6 @@ export function connectOnlineUserWebSocket(appId: string): Socket {
return socket
}
/**
* Disconnect the websocket connection.
*/
export function disconnectOnlineUserWebSocket() {
if (socket) {
socket.disconnect()