From 684f7df1589413fd7cacfafb11746d16269c49c7 Mon Sep 17 00:00:00 2001 From: hjlarry Date: Mon, 8 Sep 2025 14:46:28 +0800 Subject: [PATCH] node data use crdt data --- .../core/collaboration-manager.ts | 104 +++++++++++++++--- .../workflow/hooks/use-node-data-update.ts | 9 +- 2 files changed, 91 insertions(+), 22 deletions(-) diff --git a/web/app/components/workflow/collaboration/core/collaboration-manager.ts b/web/app/components/workflow/collaboration/core/collaboration-manager.ts index a36fa7ce16..ba9a2db07a 100644 --- a/web/app/components/workflow/collaboration/core/collaboration-manager.ts +++ b/web/app/components/workflow/collaboration/core/collaboration-manager.ts @@ -202,29 +202,107 @@ export class CollaborationManager { } private syncNodes(oldNodes: Node[], newNodes: Node[]): void { - if (!this.nodesMap) return + if (!this.nodesMap || !this.doc) return const oldNodesMap = new Map(oldNodes.map(node => [node.id, node])) const newNodesMap = new Map(newNodes.map(node => [node.id, node])) + // Delete removed nodes oldNodes.forEach((oldNode) => { if (!newNodesMap.has(oldNode.id)) this.nodesMap.delete(oldNode.id) }) + // Add or update nodes with fine-grained sync for data properties newNodes.forEach((newNode) => { const oldNode = oldNodesMap.get(newNode.id) + if (!oldNode) { - const persistentData = this.getPersistentNodeData(newNode) - const clonedData = JSON.parse(JSON.stringify(persistentData)) - this.nodesMap.set(newNode.id, clonedData) + // New node - create as nested structure + const nodeData: any = { + id: newNode.id, + type: newNode.type, + position: { ...newNode.position }, + width: newNode.width, + height: newNode.height, + sourcePosition: newNode.sourcePosition, + targetPosition: newNode.targetPosition, + data: {}, + } + + // Clone data properties, excluding private ones + Object.entries(newNode.data).forEach(([key, value]) => { + if (!key.startsWith('_') && value !== undefined) + nodeData.data[key] = JSON.parse(JSON.stringify(value)) + }) + + this.nodesMap.set(newNode.id, nodeData) } else { - const oldPersistentData = this.getPersistentNodeData(oldNode) - const newPersistentData = this.getPersistentNodeData(newNode) - if (!isEqual(oldPersistentData, newPersistentData)) { - const clonedData = JSON.parse(JSON.stringify(newPersistentData)) - this.nodesMap.set(newNode.id, clonedData) + // Get existing node from CRDT + const existingNode = this.nodesMap.get(newNode.id) + + if (existingNode) { + // Create a deep copy to modify + const updatedNode = JSON.parse(JSON.stringify(existingNode)) + + // Update position only if changed + if (oldNode.position.x !== newNode.position.x || oldNode.position.y !== newNode.position.y) + updatedNode.position = { ...newNode.position } + + // Update dimensions only if changed + if (oldNode.width !== newNode.width) + updatedNode.width = newNode.width + + if (oldNode.height !== newNode.height) + updatedNode.height = newNode.height + + // Ensure data object exists + if (!updatedNode.data) + updatedNode.data = {} + + // Fine-grained update of data properties + const oldData = oldNode.data || {} + const newData = newNode.data || {} + + // Only update changed properties in data + Object.entries(newData).forEach(([key, value]) => { + if (!key.startsWith('_')) { + const oldValue = (oldData as any)[key] + if (!isEqual(oldValue, value)) + updatedNode.data[key] = JSON.parse(JSON.stringify(value)) + } + }) + + // Remove deleted properties from data + Object.keys(oldData).forEach((key) => { + if (!key.startsWith('_') && !(key in newData)) + delete updatedNode.data[key] + }) + + // Only update in CRDT if something actually changed + if (!isEqual(existingNode, updatedNode)) + this.nodesMap.set(newNode.id, updatedNode) + } + else { + // Node exists locally but not in CRDT yet + const nodeData: any = { + id: newNode.id, + type: newNode.type, + position: { ...newNode.position }, + width: newNode.width, + height: newNode.height, + sourcePosition: newNode.sourcePosition, + targetPosition: newNode.targetPosition, + data: {}, + } + + Object.entries(newNode.data).forEach(([key, value]) => { + if (!key.startsWith('_') && value !== undefined) + nodeData.data[key] = JSON.parse(JSON.stringify(value)) + }) + + this.nodesMap.set(newNode.id, nodeData) } } }) @@ -254,14 +332,6 @@ export class CollaborationManager { }) } - private getPersistentNodeData(node: Node): any { - const { data, ...rest } = node - const filteredData = Object.fromEntries( - Object.entries(data).filter(([key]) => !key.startsWith('_')), - ) - return { ...rest, data: filteredData } - } - private setupSubscriptions(): void { this.nodesMap?.subscribe((event: any) => { if (event.by === 'import' && this.reactFlowStore) { diff --git a/web/app/components/workflow/hooks/use-node-data-update.ts b/web/app/components/workflow/hooks/use-node-data-update.ts index c59c858184..732a01ed05 100644 --- a/web/app/components/workflow/hooks/use-node-data-update.ts +++ b/web/app/components/workflow/hooks/use-node-data-update.ts @@ -3,6 +3,7 @@ import produce from 'immer' import { useStoreApi } from 'reactflow' import { useNodesSyncDraft } from './use-nodes-sync-draft' import { useNodesReadOnly } from './use-workflow' +import { useCollaborativeWorkflow } from './use-collaborative-workflow' type NodeDataUpdatePayload = { id: string @@ -13,13 +14,11 @@ export const useNodeDataUpdate = () => { const store = useStoreApi() const { handleSyncWorkflowDraft } = useNodesSyncDraft() const { getNodesReadOnly } = useNodesReadOnly() + const collaborativeWorkflow = useCollaborativeWorkflow() const handleNodeDataUpdate = useCallback(({ id, data }: NodeDataUpdatePayload) => { - const { - getNodes, - setNodes, - } = store.getState() - const newNodes = produce(getNodes(), (draft) => { + const { nodes, setNodes } = collaborativeWorkflow.getState() + const newNodes = produce(nodes, (draft) => { const currentNode = draft.find(node => node.id === id)! if (currentNode)