try a lot for yjs, but update data still not work...

This commit is contained in:
hjlarry 2025-07-30 14:36:29 +08:00
parent 41372168b6
commit 4cc01c8aa8
7 changed files with 236 additions and 9 deletions

View File

@ -138,3 +138,22 @@ def handle_collaboration_event(sid, data):
)
return {"msg": "event_broadcasted"}
@sio.on("yjs_update")
def handle_yjs_update(sid, data):
"""
Handle Y.js document updates - simple broadcast relay.
"""
mapping = redis_client.get(f"ws_sid_map:{sid}")
if not mapping:
return {"msg": "unauthorized"}, 401
mapping_data = json.loads(mapping)
workflow_id = mapping_data["workflow_id"]
user_id = mapping_data["user_id"]
sio.emit("yjs_update", data, room=workflow_id, skip_sid=sid)
return {"msg": "yjs_update_broadcasted"}

View File

@ -17,6 +17,8 @@ import {
} from '@/service/workflow'
import type { FetchWorkflowDraftResponse } from '@/types/workflow'
import { useWorkflowConfig } from '@/service/use-workflow'
import { useCollaborationStore } from '@/app/components/workflow/store/collaboration-store'
export const useWorkflowInit = () => {
const workflowStore = useWorkflowStore()
const {
@ -38,9 +40,45 @@ export const useWorkflowInit = () => {
}, [workflowStore])
useWorkflowConfig(appDetail.id, handleUpdateWorkflowConfig)
const initializeCollaboration = async (appId: string) => {
const { initCollaboration } = useCollaborationStore.getState()
initCollaboration(appId)
return new Promise<void>((resolve) => {
const checkInitialized = () => {
const { yNodesMap, yEdgesMap } = useCollaborationStore.getState()
if (yNodesMap && yEdgesMap)
resolve()
else
setTimeout(checkInitialized, 50)
}
checkInitialized()
})
}
const populateYjsWithServerData = async (serverData: any) => {
const { yNodesMap, yEdgesMap } = useCollaborationStore.getState()
if (yNodesMap && yEdgesMap && serverData.graph) {
const { ydoc } = useCollaborationStore.getState()
ydoc?.transact(() => {
serverData.graph.nodes?.forEach((node: any) => {
yNodesMap.set(node.id, node)
})
serverData.graph.edges?.forEach((edge: any) => {
yEdgesMap.set(edge.id, edge)
})
})
}
}
const handleGetInitialWorkflowData = useCallback(async () => {
try {
const res = await fetchWorkflowDraft(`/apps/${appDetail.id}/workflows/draft`)
const [res] = await Promise.all([
fetchWorkflowDraft(`/apps/${appDetail.id}/workflows/draft`),
initializeCollaboration(appDetail.id),
])
setData(res)
workflowStore.setState({
envSecrets: (res.environment_variables || []).filter(env => env.value_type === 'secret').reduce((acc, env) => {
@ -50,6 +88,7 @@ export const useWorkflowInit = () => {
environmentVariables: res.environment_variables?.map(env => env.value_type === 'secret' ? { ...env, value: '[__HIDDEN__]' } : env) || [],
conversationVariables: res.conversation_variables || [],
})
await populateYjsWithServerData(res)
setSyncWorkflowDraftHash(res.hash)
setIsLoading(false)
}

View File

@ -61,6 +61,7 @@ import {
} from './use-workflow'
import { WorkflowHistoryEvent, useWorkflowHistory } from './use-workflow-history'
import useInspectVarsCrud from './use-inspect-vars-crud'
import { useCollaborationStore } from '@/app/components/workflow/store/collaboration-store'
export const useNodesInteractions = () => {
const { t } = useTranslation()
@ -1496,6 +1497,21 @@ export const useNodesInteractions = () => {
return draft.filter(edge => !connectedEdges.find(connectedEdge => connectedEdge.id === edge.id))
})
setEdges(newEdges)
const { yNodesMap, yEdgesMap, ydoc } = useCollaborationStore.getState()
if (yNodesMap && yEdgesMap && ydoc) {
ydoc.transact(() => {
newNodes.forEach((node) => {
yNodesMap.set(node.id, node)
})
console.log('Before edge delete, yEdgesMap size:', yEdgesMap?.size)
connectedEdges.forEach((edge) => {
yEdgesMap.delete(edge.id)
})
console.log('After edge delete, yEdgesMap size:', yEdgesMap?.size)
})
}
handleSyncWorkflowDraft()
saveStateToHistory(WorkflowHistoryEvent.EdgeDelete)
}, [store, getNodesReadOnly, handleSyncWorkflowDraft, saveStateToHistory])

View File

@ -83,6 +83,7 @@ import Confirm from '@/app/components/base/confirm'
import DatasetsDetailProvider from './datasets-detail-store/provider'
import { HooksStoreContextProvider } from './hooks-store'
import type { Shape as HooksStoreShape } from './hooks-store'
import { useCollaborationStore } from '@/app/components/workflow/store/collaboration-store'
const nodeTypes = {
[CUSTOM_NODE]: CustomNode,
@ -127,6 +128,22 @@ export const Workflow: FC<WorkflowProps> = memo(({
return workflowCanvasHeight - bottomPanelHeight
}, [workflowCanvasHeight, bottomPanelHeight])
const collaborationNodes = useCollaborationStore((state) => {
return state.nodes
})
const collaborationEdges = useCollaborationStore((state) => {
return state.edges
})
useEffect(() => {
setNodes(collaborationNodes)
}, [collaborationNodes, setNodes])
useEffect(() => {
console.log('collaborationEdges changed:', collaborationEdges, 122112)
setEdges(collaborationEdges)
}, [collaborationEdges, setEdges])
// update workflow Canvas width and height
useEffect(() => {
if (workflowContainerRef.current) {

View File

@ -0,0 +1,132 @@
import { create } from 'zustand'
import * as Y from 'yjs'
import type { Edge, Node } from '../types'
import { useWebSocketStore } from './websocket-store'
let globalYDoc: Y.Doc | null = null
let globalYNodesMap: Y.Map<any> | null = null
let globalYEdgesMap: Y.Map<any> | null = null
class YjsSocketIOProvider {
private doc: Y.Doc
private socket: any
private isDestroyed = false
private onRemoteUpdate?: () => void
constructor(socket: any, doc: Y.Doc, onRemoteUpdate?: () => void) {
this.socket = socket
this.doc = doc
this.onRemoteUpdate = onRemoteUpdate
this.setupEventListeners()
}
private setupEventListeners() {
this.doc.on('update', (update: Uint8Array, origin: any) => {
if (origin !== 'remote')
this.socket.emit('yjs_update', update)
})
this.socket.on('yjs_update', (updateData: Uint8Array) => {
Y.applyUpdate(this.doc, new Uint8Array(updateData), 'remote')
if (this.onRemoteUpdate)
this.onRemoteUpdate()
})
}
destroy() {
this.isDestroyed = true
}
}
type CollaborationStore = {
ydoc: Y.Doc | null
provider: YjsSocketIOProvider | null
yNodesMap: Y.Map<any> | null
yEdgesMap: Y.Map<any> | null
yTestMap: Y.Map<any> | null
yTestArray: Y.Array<any> | null
nodes: Node[]
edges: Edge[]
initCollaboration: (appId: string) => void
destroyCollaboration: () => void
}
export const useCollaborationStore = create<CollaborationStore>((set, get) => ({
ydoc: null,
provider: null,
yNodesMap: null,
yEdgesMap: null,
yTestMap: null,
yTestArray: null,
nodes: [],
edges: [],
initCollaboration: (appId: string) => {
if (!globalYDoc) {
console.log('Creating new global Y.Doc instance')
globalYDoc = new Y.Doc()
globalYNodesMap = globalYDoc.getMap<any>('nodes')
globalYEdgesMap = globalYDoc.getMap<any>('edges')
}
else {
console.log('Reusing existing global Y.Doc instance')
}
const ydoc = globalYDoc
const yNodesMap = globalYNodesMap!
const yEdgesMap = globalYEdgesMap!
const { getSocket } = useWebSocketStore.getState()
const socket = getSocket(appId)
const updateReactState = () => {
console.log('updateReactState called')
const nodes = Array.from(yNodesMap.values())
const edges = Array.from(yEdgesMap.values())
console.log('Y.js data - nodes:', nodes.length, 'edges:', edges.length)
set({
nodes: [...nodes] as Node[],
edges: [...edges] as Edge[],
})
}
const provider = new YjsSocketIOProvider(socket, globalYDoc, updateReactState)
yNodesMap.observe((event) => {
console.log('yNodesMap changed:', event)
updateReactState()
})
yEdgesMap.observe((event) => {
console.log('yEdgesMap changed:', event)
updateReactState()
})
updateReactState()
set({
ydoc,
provider,
yNodesMap,
yEdgesMap,
})
},
destroyCollaboration: () => {
const { provider } = get()
provider?.destroy()
set({
ydoc: null,
provider: null,
yNodesMap: null,
yEdgesMap: null,
})
},
}))

View File

@ -148,6 +148,7 @@
"tldts": "^7.0.9",
"use-context-selector": "^2.0.0",
"uuid": "^10.0.0",
"yjs": "^13.6.27",
"zod": "^3.23.8",
"zundo": "^2.1.0",
"zustand": "^4.5.2"

View File

@ -56,7 +56,7 @@ importers:
version: 0.30.0
'@lexical/react':
specifier: ^0.30.0
version: 0.30.0(react-dom@19.1.0(react@19.1.0))(react@19.1.0)(yjs@13.6.24)
version: 0.30.0(react-dom@19.1.0(react@19.1.0))(react@19.1.0)(yjs@13.6.27)
'@lexical/selection':
specifier: ^0.30.0
version: 0.30.0
@ -345,6 +345,9 @@ importers:
uuid:
specifier: ^10.0.0
version: 10.0.0
yjs:
specifier: ^13.6.27
version: 13.6.27
zod:
specifier: ^3.23.8
version: 3.24.2
@ -8895,8 +8898,8 @@ packages:
resolution: {integrity: sha512-7dSzzRQ++CKnNI/krKnYRV7JKKPUXMEh61soaHKg9mrWEhzFWhFnxPxGl+69cD1Ou63C13NUPCnmIcrvqCuM6w==}
engines: {node: '>=12'}
yjs@13.6.24:
resolution: {integrity: sha512-xn/pYLTZa3uD1uDG8lpxfLRo5SR/rp0frdASOl2a71aYNvUXdWcLtVL91s2y7j+Q8ppmjZ9H3jsGVgoFMbT2VA==}
yjs@13.6.27:
resolution: {integrity: sha512-OIDwaflOaq4wC6YlPBy2L6ceKeKuF7DeTxx+jPzv1FHn9tCZ0ZwSRnUBxD05E3yed46fv/FWJbvR+Ud7x0L7zw==}
engines: {node: '>=16.0.0', npm: '>=8.0.0'}
yn@3.1.1:
@ -10780,7 +10783,7 @@ snapshots:
'@lexical/utils': 0.30.0
lexical: 0.30.0
'@lexical/react@0.30.0(react-dom@19.1.0(react@19.1.0))(react@19.1.0)(yjs@13.6.24)':
'@lexical/react@0.30.0(react-dom@19.1.0(react@19.1.0))(react@19.1.0)(yjs@13.6.27)':
dependencies:
'@lexical/devtools-core': 0.30.0(react-dom@19.1.0(react@19.1.0))(react@19.1.0)
'@lexical/dragon': 0.30.0
@ -10796,7 +10799,7 @@ snapshots:
'@lexical/table': 0.30.0
'@lexical/text': 0.30.0
'@lexical/utils': 0.30.0
'@lexical/yjs': 0.30.0(yjs@13.6.24)
'@lexical/yjs': 0.30.0(yjs@13.6.27)
lexical: 0.30.0
react: 19.1.0
react-dom: 19.1.0(react@19.1.0)
@ -10832,12 +10835,12 @@ snapshots:
'@lexical/table': 0.30.0
lexical: 0.30.0
'@lexical/yjs@0.30.0(yjs@13.6.24)':
'@lexical/yjs@0.30.0(yjs@13.6.27)':
dependencies:
'@lexical/offset': 0.30.0
'@lexical/selection': 0.30.0
lexical: 0.30.0
yjs: 13.6.24
yjs: 13.6.27
'@mapbox/node-pre-gyp@1.0.11':
dependencies:
@ -19239,7 +19242,7 @@ snapshots:
y18n: 5.0.8
yargs-parser: 21.1.1
yjs@13.6.24:
yjs@13.6.27:
dependencies:
lib0: 0.2.102