try to fix start node collaboration

This commit is contained in:
hjlarry 2025-10-16 10:18:37 +08:00
parent 9c66b92c34
commit 80b34598e9
3 changed files with 405 additions and 146 deletions

View File

@ -0,0 +1,69 @@
import { LoroDoc } from 'loro-crdt'
import { CollaborationManager } from '../collaboration-manager'
import type { Node } from '@/app/components/workflow/types'
import { BlockEnum } from '@/app/components/workflow/types'
const NODE_ID = 'node-1'
const createNode = (variables: string[]): Node => ({
id: NODE_ID,
type: 'custom',
position: { x: 0, y: 0 },
data: {
type: BlockEnum.Start,
title: 'Start',
desc: '',
variables: variables.map(name => ({
variable: name,
label: name,
type: 'text-input',
required: true,
default: '',
max_length: 48,
placeholder: '',
options: [],
hint: '',
})),
},
})
const getManager = (doc: LoroDoc) => {
const manager = new CollaborationManager()
;(manager as any).doc = doc
;(manager as any).nodesMap = doc.getMap('nodes')
;(manager as any).edgesMap = doc.getMap('edges')
return manager
}
const exportNodes = (manager: CollaborationManager) => manager.getNodes()
describe('Loro merge behavior smoke test', () => {
it('inspects concurrent edits after merge', () => {
const docA = new LoroDoc()
const managerA = getManager(docA)
managerA.syncNodes([], [createNode(['a'])])
const snapshot = docA.export({ mode: 'snapshot' })
const docB = LoroDoc.fromSnapshot(snapshot)
const managerB = getManager(docB)
managerA.syncNodes([createNode(['a'])], [createNode(['a', 'b'])])
managerB.syncNodes([createNode(['a'])], [createNode(['a', 'c'])])
const updateForA = docB.export({ mode: 'update', from: docA.version() })
docA.import(updateForA)
const updateForB = docA.export({ mode: 'update', from: docB.version() })
docB.import(updateForB)
const finalA = exportNodes(managerA)
const finalB = exportNodes(managerB)
console.log('Final nodes on docA:', JSON.stringify(finalA, null, 2))
console.log('Final nodes on docB:', JSON.stringify(finalB, null, 2))
expect(finalA.length).toBe(1)
expect(finalB.length).toBe(1)
})
})

View File

@ -0,0 +1,180 @@
import { LoroDoc } from 'loro-crdt'
import { CollaborationManager } from '@/app/components/workflow/collaboration/core/collaboration-manager'
import { BlockEnum } from '@/app/components/workflow/types'
import type { Node } from '@/app/components/workflow/types'
const NODE_ID = '1760342909316'
type WorkflowVariable = {
default: string
hint: string
label: string
max_length: number
options: string[]
placeholder: string
required: boolean
type: string
variable: string
}
const createVariable = (name: string, overrides: Partial<WorkflowVariable> = {}): WorkflowVariable => ({
default: '',
hint: '',
label: name,
max_length: 48,
options: [],
placeholder: '',
required: true,
type: 'text-input',
variable: name,
...overrides,
})
const deepClone = <T>(value: T): T => JSON.parse(JSON.stringify(value))
const createNodeSnapshot = (variableNames: string[]): Node<{ variables: WorkflowVariable[] }> => ({
id: NODE_ID,
type: 'custom',
position: { x: 0, y: 24 },
positionAbsolute: { x: 0, y: 24 },
height: 88,
width: 242,
selected: true,
selectable: true,
draggable: true,
sourcePosition: 'right',
targetPosition: 'left',
data: {
selected: true,
title: '开始',
desc: '',
type: BlockEnum.Start,
variables: variableNames.map(createVariable),
},
})
const getVariables = (node: Node): string[] => {
const variables = (node.data as any)?.variables ?? []
return variables.map((item: WorkflowVariable) => item.variable)
}
const getVariableObject = (node: Node, name: string): WorkflowVariable | undefined => {
const variables = (node.data as any)?.variables ?? []
return variables.find((item: WorkflowVariable) => item.variable === name)
}
describe('CollaborationManager syncNodes', () => {
let manager: CollaborationManager
beforeEach(() => {
manager = new CollaborationManager()
// Bypass private guards for targeted unit testing
const doc = new LoroDoc()
;(manager as any).doc = doc
;(manager as any).nodesMap = doc.getMap('nodes')
;(manager as any).edgesMap = doc.getMap('edges')
const initialNode = createNodeSnapshot(['a'])
;(manager as any).syncNodes([], [deepClone(initialNode)])
})
it('updates collaborators map when a single client adds a variable', () => {
const base = [createNodeSnapshot(['a'])]
const next = [createNodeSnapshot(['a', 'b'])]
;(manager as any).syncNodes(base, next)
const stored = (manager.getNodes() as Node[]).find(node => node.id === NODE_ID)
expect(stored).toBeDefined()
expect(getVariables(stored!)).toEqual(['a', 'b'])
})
it('applies the latest parallel additions derived from the same base snapshot', () => {
const base = [createNodeSnapshot(['a'])]
const userA = [createNodeSnapshot(['a', 'b'])]
const userB = [createNodeSnapshot(['a', 'c'])]
;(manager as any).syncNodes(base, userA)
const afterUserA = (manager.getNodes() as Node[]).find(node => node.id === NODE_ID)
expect(getVariables(afterUserA!)).toEqual(['a', 'b'])
;(manager as any).syncNodes(base, userB)
const finalNode = (manager.getNodes() as Node[]).find(node => node.id === NODE_ID)
const finalVariables = getVariables(finalNode!)
expect(finalVariables).toEqual(['a', 'c'])
})
it('prefers the incoming mutation when the same variable is edited concurrently', () => {
const base = [createNodeSnapshot(['a'])]
const userA = [
{
...createNodeSnapshot(['a']),
data: {
...createNodeSnapshot(['a']).data,
variables: [
createVariable('a', { label: 'A from userA', hint: 'hintA' }),
],
},
},
]
const userB = [
{
...createNodeSnapshot(['a']),
data: {
...createNodeSnapshot(['a']).data,
variables: [
createVariable('a', { label: 'A from userB', hint: 'hintB' }),
],
},
},
]
;(manager as any).syncNodes(base, userA)
;(manager as any).syncNodes(base, userB)
const finalNode = (manager.getNodes() as Node[]).find(node => node.id === NODE_ID)
const finalVariable = getVariableObject(finalNode!, 'a')
expect(finalVariable?.label).toBe('A from userB')
expect(finalVariable?.hint).toBe('hintB')
})
it('reflects the last writer when concurrent removal and edits happen', () => {
const base = [createNodeSnapshot(['a', 'b'])]
;(manager as any).syncNodes([], [deepClone(base[0])])
const userA = [
{
...createNodeSnapshot(['a']),
data: {
...createNodeSnapshot(['a']).data,
variables: [
createVariable('a', { label: 'A after deletion' }),
],
},
},
]
const userB = [
{
...createNodeSnapshot(['a', 'b']),
data: {
...createNodeSnapshot(['a']).data,
variables: [
createVariable('a'),
createVariable('b', { label: 'B edited but should vanish' }),
],
},
},
]
;(manager as any).syncNodes(base, userA)
;(manager as any).syncNodes(base, userB)
const finalNode = (manager.getNodes() as Node[]).find(node => node.id === NODE_ID)
const finalVariables = getVariables(finalNode!)
expect(finalVariables).toEqual(['a', 'b'])
expect(getVariableObject(finalNode!, 'b')).toBeDefined()
})
})

View File

@ -1,4 +1,4 @@
import { LoroDoc, UndoManager } from 'loro-crdt'
import { LoroDoc, LoroList, LoroMap, UndoManager } from 'loro-crdt'
import { cloneDeep, isEqual } from 'lodash-es'
import { webSocketClient } from './websocket-manager'
import { CRDTProvider } from './crdt-provider'
@ -37,6 +37,152 @@ export class CollaborationManager {
private isUndoRedoInProgress = false
private pendingInitialSync = false
private getNodeContainer(nodeId: string): LoroMap<any> {
if (!this.nodesMap)
throw new Error('Nodes map not initialized')
let container = this.nodesMap.get(nodeId) as any
if (!container || typeof container.kind !== 'function' || container.kind() !== 'Map') {
const previousValue = container
const newContainer = this.nodesMap.setContainer(nodeId, new LoroMap())
container = typeof newContainer.getAttached === 'function' ? newContainer.getAttached() ?? newContainer : newContainer
if (previousValue && typeof previousValue === 'object')
this.populateNodeContainer(container, previousValue as Node)
}
else {
container = typeof container.getAttached === 'function' ? container.getAttached() ?? container : container
}
return container
}
private ensureDataContainer(nodeContainer: LoroMap<any>): LoroMap<any> {
let dataContainer = nodeContainer.get('data') as any
if (!dataContainer || typeof dataContainer.kind !== 'function' || dataContainer.kind() !== 'Map')
dataContainer = nodeContainer.setContainer('data', new LoroMap())
return typeof dataContainer.getAttached === 'function' ? dataContainer.getAttached() ?? dataContainer : dataContainer
}
private ensureVariableList(nodeContainer: LoroMap<any>): LoroList<any> {
console.log('variable list, for debug online')
const dataContainer = this.ensureDataContainer(nodeContainer)
let list = dataContainer.get('variables') as any
if (!list || typeof list.kind !== 'function' || list.kind() !== 'List')
list = dataContainer.setContainer('variables', new LoroList())
return typeof list.getAttached === 'function' ? list.getAttached() ?? list : list
}
private exportNode(nodeId: string): Node {
const container = this.getNodeContainer(nodeId)
const json = container.toJSON() as any
return {
...json,
data: json.data || {},
}
}
private populateNodeContainer(container: LoroMap<any>, node: Node): void {
container.set('id', node.id)
container.set('type', node.type)
container.set('position', cloneDeep(node.position))
container.set('sourcePosition', node.sourcePosition)
container.set('targetPosition', node.targetPosition)
if (node.width === undefined) container.delete('width')
else container.set('width', node.width)
if (node.height === undefined) container.delete('height')
else container.set('height', node.height)
if (node.selected === undefined) container.delete('selected')
else container.set('selected', node.selected)
const optionalProps: Array<keyof Node> = [
'parentId',
'positionAbsolute',
'extent',
'zIndex',
'draggable',
'selectable',
'dragHandle',
'dragging',
'connectable',
'expandParent',
'focusable',
'hidden',
'style',
'className',
'ariaLabel',
'resizing',
'deletable',
]
optionalProps.forEach((prop) => {
const value = node[prop]
if (value === undefined)
container.delete(prop as string)
else
container.set(prop as string, cloneDeep(value as any))
})
const dataContainer = this.ensureDataContainer(container)
const handledKeys = new Set<string>()
Object.entries(node.data || {}).forEach(([key, value]) => {
if (!this.shouldSyncDataKey(key)) return
handledKeys.add(key)
if (key === 'variables')
this.syncVariables(container, Array.isArray(value) ? value : [])
else
dataContainer.set(key, cloneDeep(value))
})
const existingData = dataContainer.toJSON() || {}
Object.keys(existingData).forEach((key) => {
if (!this.shouldSyncDataKey(key)) return
if (handledKeys.has(key)) return
if (key === 'variables')
dataContainer.delete('variables')
else
dataContainer.delete(key)
})
}
private shouldSyncDataKey(key: string): boolean {
const syncDataAllowList = new Set(['_children', '_connectedSourceHandleIds', '_connectedTargetHandleIds', '_targetBranches'])
return (syncDataAllowList.has(key) || !key.startsWith('_')) && key !== 'selected'
}
private syncVariables(nodeContainer: LoroMap<any>, desired: any[]): void {
const list = this.ensureVariableList(nodeContainer)
const current = list.toJSON() as any[]
const target = Array.isArray(desired) ? desired : []
const minLength = Math.min(current.length, target.length)
for (let i = 0; i < minLength; i += 1) {
if (!isEqual(current[i], target[i])) {
list.delete(i, 1)
list.insert(i, cloneDeep(target[i]))
}
}
if (current.length > target.length) {
list.delete(target.length, current.length - target.length)
}
else if (target.length > current.length) {
for (let i = current.length; i < target.length; i += 1)
list.insert(i, cloneDeep(target[i]))
}
}
private getNodePanelPresenceSnapshot(): NodePanelPresenceMap {
const snapshot: NodePanelPresenceMap = {}
Object.entries(this.nodePanelPresence).forEach(([nodeId, viewers]) => {
@ -273,7 +419,8 @@ export class CollaborationManager {
}
getNodes(): Node[] {
return this.nodesMap ? Array.from(this.nodesMap.values()) : []
if (!this.nodesMap) return []
return Array.from(this.nodesMap.keys()).map(id => this.exportNode(id as string))
}
getEdges(): Edge[] {
@ -547,154 +694,16 @@ export class CollaborationManager {
private syncNodes(oldNodes: Node[], newNodes: Node[]): void {
if (!this.nodesMap || !this.doc) return
const oldNodesMap = new Map(oldNodes.map(node => [node.id, node]))
const newNodesMap = new Map(newNodes.map(node => [node.id, node]))
const syncDataAllowList = new Set(['_children', '_connectedSourceHandleIds', '_connectedTargetHandleIds', '_targetBranches'])
const shouldSyncDataKey = (key: string) => (syncDataAllowList.has(key) || !key.startsWith('_')) && key !== 'selected'
const newIdSet = new Set(newNodes.map(node => node.id))
// Delete removed nodes
oldNodes.forEach((oldNode) => {
if (!newNodesMap.has(oldNode.id))
if (!newIdSet.has(oldNode.id))
this.nodesMap.delete(oldNode.id)
})
// Add or update nodes with fine-grained sync for data properties
const copyOptionalNodeProps = (source: Node, target: any) => {
const optionalProps: Array<keyof Node | keyof any> = [
'parentId',
'positionAbsolute',
'extent',
'zIndex',
'draggable',
'selectable',
'dragHandle',
'dragging',
'connectable',
'expandParent',
'focusable',
'hidden',
'style',
'className',
'ariaLabel',
'markerStart',
'markerEnd',
'resizing',
'deletable',
]
optionalProps.forEach((prop) => {
const value = (source as any)[prop]
if (value === undefined) {
if (prop in target)
delete target[prop]
return
}
if (value !== null && typeof value === 'object')
target[prop as string] = cloneDeep(value)
else
target[prop as string] = value
})
}
newNodes.forEach((newNode) => {
const oldNode = oldNodesMap.get(newNode.id)
if (!oldNode) {
// New node - create as nested structure
const nodeData: any = {
id: newNode.id,
type: newNode.type,
position: { ...newNode.position },
width: newNode.width,
height: newNode.height,
sourcePosition: newNode.sourcePosition,
targetPosition: newNode.targetPosition,
data: {},
}
copyOptionalNodeProps(newNode, nodeData)
// Clone data properties, excluding private ones
Object.entries(newNode.data).forEach(([key, value]) => {
if (shouldSyncDataKey(key) && value !== undefined)
nodeData.data[key] = cloneDeep(value)
})
this.nodesMap.set(newNode.id, nodeData)
}
else {
// Get existing node from CRDT
const existingNode = this.nodesMap.get(newNode.id)
if (existingNode) {
// Create a deep copy to modify
const updatedNode = cloneDeep(existingNode)
// Update position only if changed
if (oldNode.position.x !== newNode.position.x || oldNode.position.y !== newNode.position.y)
updatedNode.position = { ...newNode.position }
// Update dimensions only if changed
if (oldNode.width !== newNode.width)
updatedNode.width = newNode.width
if (oldNode.height !== newNode.height)
updatedNode.height = newNode.height
// Ensure optional node props stay in sync
copyOptionalNodeProps(newNode, updatedNode)
// Ensure data object exists
if (!updatedNode.data)
updatedNode.data = {}
// Fine-grained update of data properties
const oldData = oldNode.data || {}
const newData = newNode.data || {}
// Only update changed properties in data
Object.entries(newData).forEach(([key, value]) => {
if (shouldSyncDataKey(key)) {
const oldValue = (oldData as any)[key]
if (!isEqual(oldValue, value))
updatedNode.data[key] = cloneDeep(value)
}
})
// Remove deleted properties from data
Object.keys(oldData).forEach((key) => {
if (shouldSyncDataKey(key) && !(key in newData))
delete updatedNode.data[key]
})
// Only update in CRDT if something actually changed
if (!isEqual(existingNode, updatedNode))
this.nodesMap.set(newNode.id, updatedNode)
}
else {
// Node exists locally but not in CRDT yet
const nodeData: any = {
id: newNode.id,
type: newNode.type,
position: { ...newNode.position },
width: newNode.width,
height: newNode.height,
sourcePosition: newNode.sourcePosition,
targetPosition: newNode.targetPosition,
data: {},
}
copyOptionalNodeProps(newNode, nodeData)
Object.entries(newNode.data).forEach(([key, value]) => {
if (shouldSyncDataKey(key) && value !== undefined)
nodeData.data[key] = cloneDeep(value)
})
this.nodesMap.set(newNode.id, nodeData)
}
}
const nodeContainer = this.getNodeContainer(newNode.id)
this.populateNodeContainer(nodeContainer, newNode)
})
}
@ -745,8 +754,9 @@ export class CollaborationManager {
this.pendingInitialSync = false
const updatedNodes = Array
.from(this.nodesMap.values())
.map((node: Node) => {
.from(this.nodesMap.keys())
.map((nodeId) => {
const node = this.exportNode(nodeId as string)
const clonedNode: Node = {
...node,
data: {