From 80b34598e91aa2fd1d620807cafc95a54ca54def Mon Sep 17 00:00:00 2001 From: hjlarry Date: Thu, 16 Oct 2025 10:18:37 +0800 Subject: [PATCH] try to fix start node collaboration --- ...llaboration-manager.merge-behavior.test.ts | 69 ++++ .../collaboration-manager.syncNodes.test.ts | 180 +++++++++++ .../core/collaboration-manager.ts | 302 +++++++++--------- 3 files changed, 405 insertions(+), 146 deletions(-) create mode 100644 web/app/components/workflow/collaboration/core/__tests__/collaboration-manager.merge-behavior.test.ts create mode 100644 web/app/components/workflow/collaboration/core/__tests__/collaboration-manager.syncNodes.test.ts diff --git a/web/app/components/workflow/collaboration/core/__tests__/collaboration-manager.merge-behavior.test.ts b/web/app/components/workflow/collaboration/core/__tests__/collaboration-manager.merge-behavior.test.ts new file mode 100644 index 0000000000..786385c42c --- /dev/null +++ b/web/app/components/workflow/collaboration/core/__tests__/collaboration-manager.merge-behavior.test.ts @@ -0,0 +1,69 @@ +import { LoroDoc } from 'loro-crdt' +import { CollaborationManager } from '../collaboration-manager' +import type { Node } from '@/app/components/workflow/types' +import { BlockEnum } from '@/app/components/workflow/types' + +const NODE_ID = 'node-1' + +const createNode = (variables: string[]): Node => ({ + id: NODE_ID, + type: 'custom', + position: { x: 0, y: 0 }, + data: { + type: BlockEnum.Start, + title: 'Start', + desc: '', + variables: variables.map(name => ({ + variable: name, + label: name, + type: 'text-input', + required: true, + default: '', + max_length: 48, + placeholder: '', + options: [], + hint: '', + })), + }, +}) + +const getManager = (doc: LoroDoc) => { + const manager = new CollaborationManager() + ;(manager as any).doc = doc + ;(manager as any).nodesMap = doc.getMap('nodes') + ;(manager as any).edgesMap = doc.getMap('edges') + return manager +} + +const exportNodes = (manager: CollaborationManager) => manager.getNodes() + +describe('Loro merge behavior smoke test', () => { + it('inspects concurrent edits after merge', () => { + const docA = new LoroDoc() + const managerA = getManager(docA) + managerA.syncNodes([], [createNode(['a'])]) + + const snapshot = docA.export({ mode: 'snapshot' }) + + const docB = LoroDoc.fromSnapshot(snapshot) + const managerB = getManager(docB) + + managerA.syncNodes([createNode(['a'])], [createNode(['a', 'b'])]) + managerB.syncNodes([createNode(['a'])], [createNode(['a', 'c'])]) + + const updateForA = docB.export({ mode: 'update', from: docA.version() }) + docA.import(updateForA) + + const updateForB = docA.export({ mode: 'update', from: docB.version() }) + docB.import(updateForB) + + const finalA = exportNodes(managerA) + const finalB = exportNodes(managerB) + + console.log('Final nodes on docA:', JSON.stringify(finalA, null, 2)) + + console.log('Final nodes on docB:', JSON.stringify(finalB, null, 2)) + expect(finalA.length).toBe(1) + expect(finalB.length).toBe(1) + }) +}) diff --git a/web/app/components/workflow/collaboration/core/__tests__/collaboration-manager.syncNodes.test.ts b/web/app/components/workflow/collaboration/core/__tests__/collaboration-manager.syncNodes.test.ts new file mode 100644 index 0000000000..5c6596c9d6 --- /dev/null +++ b/web/app/components/workflow/collaboration/core/__tests__/collaboration-manager.syncNodes.test.ts @@ -0,0 +1,180 @@ +import { LoroDoc } from 'loro-crdt' +import { CollaborationManager } from '@/app/components/workflow/collaboration/core/collaboration-manager' +import { BlockEnum } from '@/app/components/workflow/types' +import type { Node } from '@/app/components/workflow/types' + +const NODE_ID = '1760342909316' + +type WorkflowVariable = { + default: string + hint: string + label: string + max_length: number + options: string[] + placeholder: string + required: boolean + type: string + variable: string +} + +const createVariable = (name: string, overrides: Partial = {}): WorkflowVariable => ({ + default: '', + hint: '', + label: name, + max_length: 48, + options: [], + placeholder: '', + required: true, + type: 'text-input', + variable: name, + ...overrides, +}) + +const deepClone = (value: T): T => JSON.parse(JSON.stringify(value)) + +const createNodeSnapshot = (variableNames: string[]): Node<{ variables: WorkflowVariable[] }> => ({ + id: NODE_ID, + type: 'custom', + position: { x: 0, y: 24 }, + positionAbsolute: { x: 0, y: 24 }, + height: 88, + width: 242, + selected: true, + selectable: true, + draggable: true, + sourcePosition: 'right', + targetPosition: 'left', + data: { + selected: true, + title: '开始', + desc: '', + type: BlockEnum.Start, + variables: variableNames.map(createVariable), + }, +}) + +const getVariables = (node: Node): string[] => { + const variables = (node.data as any)?.variables ?? [] + return variables.map((item: WorkflowVariable) => item.variable) +} + +const getVariableObject = (node: Node, name: string): WorkflowVariable | undefined => { + const variables = (node.data as any)?.variables ?? [] + return variables.find((item: WorkflowVariable) => item.variable === name) +} + +describe('CollaborationManager syncNodes', () => { + let manager: CollaborationManager + + beforeEach(() => { + manager = new CollaborationManager() + // Bypass private guards for targeted unit testing + const doc = new LoroDoc() + ;(manager as any).doc = doc + ;(manager as any).nodesMap = doc.getMap('nodes') + ;(manager as any).edgesMap = doc.getMap('edges') + + const initialNode = createNodeSnapshot(['a']) + ;(manager as any).syncNodes([], [deepClone(initialNode)]) + }) + + it('updates collaborators map when a single client adds a variable', () => { + const base = [createNodeSnapshot(['a'])] + const next = [createNodeSnapshot(['a', 'b'])] + + ;(manager as any).syncNodes(base, next) + + const stored = (manager.getNodes() as Node[]).find(node => node.id === NODE_ID) + expect(stored).toBeDefined() + expect(getVariables(stored!)).toEqual(['a', 'b']) + }) + + it('applies the latest parallel additions derived from the same base snapshot', () => { + const base = [createNodeSnapshot(['a'])] + const userA = [createNodeSnapshot(['a', 'b'])] + const userB = [createNodeSnapshot(['a', 'c'])] + + ;(manager as any).syncNodes(base, userA) + + const afterUserA = (manager.getNodes() as Node[]).find(node => node.id === NODE_ID) + expect(getVariables(afterUserA!)).toEqual(['a', 'b']) + + ;(manager as any).syncNodes(base, userB) + + const finalNode = (manager.getNodes() as Node[]).find(node => node.id === NODE_ID) + const finalVariables = getVariables(finalNode!) + + expect(finalVariables).toEqual(['a', 'c']) + }) + + it('prefers the incoming mutation when the same variable is edited concurrently', () => { + const base = [createNodeSnapshot(['a'])] + const userA = [ + { + ...createNodeSnapshot(['a']), + data: { + ...createNodeSnapshot(['a']).data, + variables: [ + createVariable('a', { label: 'A from userA', hint: 'hintA' }), + ], + }, + }, + ] + const userB = [ + { + ...createNodeSnapshot(['a']), + data: { + ...createNodeSnapshot(['a']).data, + variables: [ + createVariable('a', { label: 'A from userB', hint: 'hintB' }), + ], + }, + }, + ] + + ;(manager as any).syncNodes(base, userA) + ;(manager as any).syncNodes(base, userB) + + const finalNode = (manager.getNodes() as Node[]).find(node => node.id === NODE_ID) + const finalVariable = getVariableObject(finalNode!, 'a') + + expect(finalVariable?.label).toBe('A from userB') + expect(finalVariable?.hint).toBe('hintB') + }) + + it('reflects the last writer when concurrent removal and edits happen', () => { + const base = [createNodeSnapshot(['a', 'b'])] + ;(manager as any).syncNodes([], [deepClone(base[0])]) + const userA = [ + { + ...createNodeSnapshot(['a']), + data: { + ...createNodeSnapshot(['a']).data, + variables: [ + createVariable('a', { label: 'A after deletion' }), + ], + }, + }, + ] + const userB = [ + { + ...createNodeSnapshot(['a', 'b']), + data: { + ...createNodeSnapshot(['a']).data, + variables: [ + createVariable('a'), + createVariable('b', { label: 'B edited but should vanish' }), + ], + }, + }, + ] + + ;(manager as any).syncNodes(base, userA) + ;(manager as any).syncNodes(base, userB) + + const finalNode = (manager.getNodes() as Node[]).find(node => node.id === NODE_ID) + const finalVariables = getVariables(finalNode!) + expect(finalVariables).toEqual(['a', 'b']) + expect(getVariableObject(finalNode!, 'b')).toBeDefined() + }) +}) diff --git a/web/app/components/workflow/collaboration/core/collaboration-manager.ts b/web/app/components/workflow/collaboration/core/collaboration-manager.ts index fd784ea57c..b6047cae0e 100644 --- a/web/app/components/workflow/collaboration/core/collaboration-manager.ts +++ b/web/app/components/workflow/collaboration/core/collaboration-manager.ts @@ -1,4 +1,4 @@ -import { LoroDoc, UndoManager } from 'loro-crdt' +import { LoroDoc, LoroList, LoroMap, UndoManager } from 'loro-crdt' import { cloneDeep, isEqual } from 'lodash-es' import { webSocketClient } from './websocket-manager' import { CRDTProvider } from './crdt-provider' @@ -37,6 +37,152 @@ export class CollaborationManager { private isUndoRedoInProgress = false private pendingInitialSync = false + private getNodeContainer(nodeId: string): LoroMap { + if (!this.nodesMap) + throw new Error('Nodes map not initialized') + + let container = this.nodesMap.get(nodeId) as any + + if (!container || typeof container.kind !== 'function' || container.kind() !== 'Map') { + const previousValue = container + const newContainer = this.nodesMap.setContainer(nodeId, new LoroMap()) + container = typeof newContainer.getAttached === 'function' ? newContainer.getAttached() ?? newContainer : newContainer + if (previousValue && typeof previousValue === 'object') + this.populateNodeContainer(container, previousValue as Node) + } + else { + container = typeof container.getAttached === 'function' ? container.getAttached() ?? container : container + } + + return container + } + + private ensureDataContainer(nodeContainer: LoroMap): LoroMap { + let dataContainer = nodeContainer.get('data') as any + + if (!dataContainer || typeof dataContainer.kind !== 'function' || dataContainer.kind() !== 'Map') + dataContainer = nodeContainer.setContainer('data', new LoroMap()) + + return typeof dataContainer.getAttached === 'function' ? dataContainer.getAttached() ?? dataContainer : dataContainer + } + + private ensureVariableList(nodeContainer: LoroMap): LoroList { + console.log('variable list, for debug online') + const dataContainer = this.ensureDataContainer(nodeContainer) + let list = dataContainer.get('variables') as any + + if (!list || typeof list.kind !== 'function' || list.kind() !== 'List') + list = dataContainer.setContainer('variables', new LoroList()) + + return typeof list.getAttached === 'function' ? list.getAttached() ?? list : list + } + + private exportNode(nodeId: string): Node { + const container = this.getNodeContainer(nodeId) + const json = container.toJSON() as any + return { + ...json, + data: json.data || {}, + } + } + + private populateNodeContainer(container: LoroMap, node: Node): void { + container.set('id', node.id) + container.set('type', node.type) + container.set('position', cloneDeep(node.position)) + container.set('sourcePosition', node.sourcePosition) + container.set('targetPosition', node.targetPosition) + + if (node.width === undefined) container.delete('width') + else container.set('width', node.width) + + if (node.height === undefined) container.delete('height') + else container.set('height', node.height) + + if (node.selected === undefined) container.delete('selected') + else container.set('selected', node.selected) + + const optionalProps: Array = [ + 'parentId', + 'positionAbsolute', + 'extent', + 'zIndex', + 'draggable', + 'selectable', + 'dragHandle', + 'dragging', + 'connectable', + 'expandParent', + 'focusable', + 'hidden', + 'style', + 'className', + 'ariaLabel', + 'resizing', + 'deletable', + ] + + optionalProps.forEach((prop) => { + const value = node[prop] + if (value === undefined) + container.delete(prop as string) + else + container.set(prop as string, cloneDeep(value as any)) + }) + + const dataContainer = this.ensureDataContainer(container) + const handledKeys = new Set() + + Object.entries(node.data || {}).forEach(([key, value]) => { + if (!this.shouldSyncDataKey(key)) return + handledKeys.add(key) + + if (key === 'variables') + this.syncVariables(container, Array.isArray(value) ? value : []) + else + dataContainer.set(key, cloneDeep(value)) + }) + + const existingData = dataContainer.toJSON() || {} + Object.keys(existingData).forEach((key) => { + if (!this.shouldSyncDataKey(key)) return + if (handledKeys.has(key)) return + + if (key === 'variables') + dataContainer.delete('variables') + + else + dataContainer.delete(key) + }) + } + + private shouldSyncDataKey(key: string): boolean { + const syncDataAllowList = new Set(['_children', '_connectedSourceHandleIds', '_connectedTargetHandleIds', '_targetBranches']) + return (syncDataAllowList.has(key) || !key.startsWith('_')) && key !== 'selected' + } + + private syncVariables(nodeContainer: LoroMap, desired: any[]): void { + const list = this.ensureVariableList(nodeContainer) + const current = list.toJSON() as any[] + const target = Array.isArray(desired) ? desired : [] + const minLength = Math.min(current.length, target.length) + + for (let i = 0; i < minLength; i += 1) { + if (!isEqual(current[i], target[i])) { + list.delete(i, 1) + list.insert(i, cloneDeep(target[i])) + } + } + + if (current.length > target.length) { + list.delete(target.length, current.length - target.length) + } + else if (target.length > current.length) { + for (let i = current.length; i < target.length; i += 1) + list.insert(i, cloneDeep(target[i])) + } + } + private getNodePanelPresenceSnapshot(): NodePanelPresenceMap { const snapshot: NodePanelPresenceMap = {} Object.entries(this.nodePanelPresence).forEach(([nodeId, viewers]) => { @@ -273,7 +419,8 @@ export class CollaborationManager { } getNodes(): Node[] { - return this.nodesMap ? Array.from(this.nodesMap.values()) : [] + if (!this.nodesMap) return [] + return Array.from(this.nodesMap.keys()).map(id => this.exportNode(id as string)) } getEdges(): Edge[] { @@ -547,154 +694,16 @@ export class CollaborationManager { private syncNodes(oldNodes: Node[], newNodes: Node[]): void { if (!this.nodesMap || !this.doc) return - const oldNodesMap = new Map(oldNodes.map(node => [node.id, node])) - const newNodesMap = new Map(newNodes.map(node => [node.id, node])) - const syncDataAllowList = new Set(['_children', '_connectedSourceHandleIds', '_connectedTargetHandleIds', '_targetBranches']) - const shouldSyncDataKey = (key: string) => (syncDataAllowList.has(key) || !key.startsWith('_')) && key !== 'selected' + const newIdSet = new Set(newNodes.map(node => node.id)) - // Delete removed nodes oldNodes.forEach((oldNode) => { - if (!newNodesMap.has(oldNode.id)) + if (!newIdSet.has(oldNode.id)) this.nodesMap.delete(oldNode.id) }) - // Add or update nodes with fine-grained sync for data properties - const copyOptionalNodeProps = (source: Node, target: any) => { - const optionalProps: Array = [ - 'parentId', - 'positionAbsolute', - 'extent', - 'zIndex', - 'draggable', - 'selectable', - 'dragHandle', - 'dragging', - 'connectable', - 'expandParent', - 'focusable', - 'hidden', - 'style', - 'className', - 'ariaLabel', - 'markerStart', - 'markerEnd', - 'resizing', - 'deletable', - ] - - optionalProps.forEach((prop) => { - const value = (source as any)[prop] - if (value === undefined) { - if (prop in target) - delete target[prop] - return - } - - if (value !== null && typeof value === 'object') - target[prop as string] = cloneDeep(value) - else - target[prop as string] = value - }) - } - newNodes.forEach((newNode) => { - const oldNode = oldNodesMap.get(newNode.id) - - if (!oldNode) { - // New node - create as nested structure - const nodeData: any = { - id: newNode.id, - type: newNode.type, - position: { ...newNode.position }, - width: newNode.width, - height: newNode.height, - sourcePosition: newNode.sourcePosition, - targetPosition: newNode.targetPosition, - data: {}, - } - - copyOptionalNodeProps(newNode, nodeData) - - // Clone data properties, excluding private ones - Object.entries(newNode.data).forEach(([key, value]) => { - if (shouldSyncDataKey(key) && value !== undefined) - nodeData.data[key] = cloneDeep(value) - }) - - this.nodesMap.set(newNode.id, nodeData) - } - else { - // Get existing node from CRDT - const existingNode = this.nodesMap.get(newNode.id) - - if (existingNode) { - // Create a deep copy to modify - const updatedNode = cloneDeep(existingNode) - - // Update position only if changed - if (oldNode.position.x !== newNode.position.x || oldNode.position.y !== newNode.position.y) - updatedNode.position = { ...newNode.position } - - // Update dimensions only if changed - if (oldNode.width !== newNode.width) - updatedNode.width = newNode.width - - if (oldNode.height !== newNode.height) - updatedNode.height = newNode.height - - // Ensure optional node props stay in sync - copyOptionalNodeProps(newNode, updatedNode) - - // Ensure data object exists - if (!updatedNode.data) - updatedNode.data = {} - - // Fine-grained update of data properties - const oldData = oldNode.data || {} - const newData = newNode.data || {} - - // Only update changed properties in data - Object.entries(newData).forEach(([key, value]) => { - if (shouldSyncDataKey(key)) { - const oldValue = (oldData as any)[key] - if (!isEqual(oldValue, value)) - updatedNode.data[key] = cloneDeep(value) - } - }) - - // Remove deleted properties from data - Object.keys(oldData).forEach((key) => { - if (shouldSyncDataKey(key) && !(key in newData)) - delete updatedNode.data[key] - }) - - // Only update in CRDT if something actually changed - if (!isEqual(existingNode, updatedNode)) - this.nodesMap.set(newNode.id, updatedNode) - } - else { - // Node exists locally but not in CRDT yet - const nodeData: any = { - id: newNode.id, - type: newNode.type, - position: { ...newNode.position }, - width: newNode.width, - height: newNode.height, - sourcePosition: newNode.sourcePosition, - targetPosition: newNode.targetPosition, - data: {}, - } - - copyOptionalNodeProps(newNode, nodeData) - - Object.entries(newNode.data).forEach(([key, value]) => { - if (shouldSyncDataKey(key) && value !== undefined) - nodeData.data[key] = cloneDeep(value) - }) - - this.nodesMap.set(newNode.id, nodeData) - } - } + const nodeContainer = this.getNodeContainer(newNode.id) + this.populateNodeContainer(nodeContainer, newNode) }) } @@ -745,8 +754,9 @@ export class CollaborationManager { this.pendingInitialSync = false const updatedNodes = Array - .from(this.nodesMap.values()) - .map((node: Node) => { + .from(this.nodesMap.keys()) + .map((nodeId) => { + const node = this.exportNode(nodeId as string) const clonedNode: Node = { ...node, data: {