From 4cc01c8aa86960445e1c16d4258534a4beb6ab2d Mon Sep 17 00:00:00 2001 From: hjlarry Date: Wed, 30 Jul 2025 14:36:29 +0800 Subject: [PATCH] try a lot for yjs, but update data still not work... --- api/controllers/console/app/online_user.py | 19 +++ .../workflow-app/hooks/use-workflow-init.ts | 41 +++++- .../workflow/hooks/use-nodes-interactions.ts | 16 +++ web/app/components/workflow/index.tsx | 17 +++ .../workflow/store/collaboration-store.ts | 132 ++++++++++++++++++ web/package.json | 1 + web/pnpm-lock.yaml | 19 +-- 7 files changed, 236 insertions(+), 9 deletions(-) create mode 100644 web/app/components/workflow/store/collaboration-store.ts diff --git a/api/controllers/console/app/online_user.py b/api/controllers/console/app/online_user.py index 93c4a34f6b..b58643a0b9 100644 --- a/api/controllers/console/app/online_user.py +++ b/api/controllers/console/app/online_user.py @@ -138,3 +138,22 @@ def handle_collaboration_event(sid, data): ) return {"msg": "event_broadcasted"} + + +@sio.on("yjs_update") +def handle_yjs_update(sid, data): + """ + Handle Y.js document updates - simple broadcast relay. + """ + mapping = redis_client.get(f"ws_sid_map:{sid}") + + if not mapping: + return {"msg": "unauthorized"}, 401 + + mapping_data = json.loads(mapping) + workflow_id = mapping_data["workflow_id"] + user_id = mapping_data["user_id"] + + sio.emit("yjs_update", data, room=workflow_id, skip_sid=sid) + + return {"msg": "yjs_update_broadcasted"} diff --git a/web/app/components/workflow-app/hooks/use-workflow-init.ts b/web/app/components/workflow-app/hooks/use-workflow-init.ts index 6d16dc5c44..b682435e8a 100644 --- a/web/app/components/workflow-app/hooks/use-workflow-init.ts +++ b/web/app/components/workflow-app/hooks/use-workflow-init.ts @@ -17,6 +17,8 @@ import { } from '@/service/workflow' import type { FetchWorkflowDraftResponse } from '@/types/workflow' import { useWorkflowConfig } from '@/service/use-workflow' +import { useCollaborationStore } from '@/app/components/workflow/store/collaboration-store' + export const useWorkflowInit = () => { const workflowStore = useWorkflowStore() const { @@ -38,9 +40,45 @@ export const useWorkflowInit = () => { }, [workflowStore]) useWorkflowConfig(appDetail.id, handleUpdateWorkflowConfig) + const initializeCollaboration = async (appId: string) => { + const { initCollaboration } = useCollaborationStore.getState() + initCollaboration(appId) + + return new Promise((resolve) => { + const checkInitialized = () => { + const { yNodesMap, yEdgesMap } = useCollaborationStore.getState() + if (yNodesMap && yEdgesMap) + resolve() + else + setTimeout(checkInitialized, 50) + } + checkInitialized() + }) + } + + const populateYjsWithServerData = async (serverData: any) => { + const { yNodesMap, yEdgesMap } = useCollaborationStore.getState() + + if (yNodesMap && yEdgesMap && serverData.graph) { + const { ydoc } = useCollaborationStore.getState() + ydoc?.transact(() => { + serverData.graph.nodes?.forEach((node: any) => { + yNodesMap.set(node.id, node) + }) + + serverData.graph.edges?.forEach((edge: any) => { + yEdgesMap.set(edge.id, edge) + }) + }) + } + } + const handleGetInitialWorkflowData = useCallback(async () => { try { - const res = await fetchWorkflowDraft(`/apps/${appDetail.id}/workflows/draft`) + const [res] = await Promise.all([ + fetchWorkflowDraft(`/apps/${appDetail.id}/workflows/draft`), + initializeCollaboration(appDetail.id), + ]) setData(res) workflowStore.setState({ envSecrets: (res.environment_variables || []).filter(env => env.value_type === 'secret').reduce((acc, env) => { @@ -50,6 +88,7 @@ export const useWorkflowInit = () => { environmentVariables: res.environment_variables?.map(env => env.value_type === 'secret' ? { ...env, value: '[__HIDDEN__]' } : env) || [], conversationVariables: res.conversation_variables || [], }) + await populateYjsWithServerData(res) setSyncWorkflowDraftHash(res.hash) setIsLoading(false) } diff --git a/web/app/components/workflow/hooks/use-nodes-interactions.ts b/web/app/components/workflow/hooks/use-nodes-interactions.ts index b598951adb..58018a8c97 100644 --- a/web/app/components/workflow/hooks/use-nodes-interactions.ts +++ b/web/app/components/workflow/hooks/use-nodes-interactions.ts @@ -61,6 +61,7 @@ import { } from './use-workflow' import { WorkflowHistoryEvent, useWorkflowHistory } from './use-workflow-history' import useInspectVarsCrud from './use-inspect-vars-crud' +import { useCollaborationStore } from '@/app/components/workflow/store/collaboration-store' export const useNodesInteractions = () => { const { t } = useTranslation() @@ -1496,6 +1497,21 @@ export const useNodesInteractions = () => { return draft.filter(edge => !connectedEdges.find(connectedEdge => connectedEdge.id === edge.id)) }) setEdges(newEdges) + + const { yNodesMap, yEdgesMap, ydoc } = useCollaborationStore.getState() + if (yNodesMap && yEdgesMap && ydoc) { + ydoc.transact(() => { + newNodes.forEach((node) => { + yNodesMap.set(node.id, node) + }) + console.log('Before edge delete, yEdgesMap size:', yEdgesMap?.size) + connectedEdges.forEach((edge) => { + yEdgesMap.delete(edge.id) + }) + console.log('After edge delete, yEdgesMap size:', yEdgesMap?.size) + }) + } + handleSyncWorkflowDraft() saveStateToHistory(WorkflowHistoryEvent.EdgeDelete) }, [store, getNodesReadOnly, handleSyncWorkflowDraft, saveStateToHistory]) diff --git a/web/app/components/workflow/index.tsx b/web/app/components/workflow/index.tsx index 8ea861ebb4..ebfc1b52fb 100644 --- a/web/app/components/workflow/index.tsx +++ b/web/app/components/workflow/index.tsx @@ -83,6 +83,7 @@ import Confirm from '@/app/components/base/confirm' import DatasetsDetailProvider from './datasets-detail-store/provider' import { HooksStoreContextProvider } from './hooks-store' import type { Shape as HooksStoreShape } from './hooks-store' +import { useCollaborationStore } from '@/app/components/workflow/store/collaboration-store' const nodeTypes = { [CUSTOM_NODE]: CustomNode, @@ -127,6 +128,22 @@ export const Workflow: FC = memo(({ return workflowCanvasHeight - bottomPanelHeight }, [workflowCanvasHeight, bottomPanelHeight]) + const collaborationNodes = useCollaborationStore((state) => { + return state.nodes + }) + const collaborationEdges = useCollaborationStore((state) => { + return state.edges + }) + + useEffect(() => { + setNodes(collaborationNodes) + }, [collaborationNodes, setNodes]) + + useEffect(() => { + console.log('collaborationEdges changed:', collaborationEdges, 122112) + setEdges(collaborationEdges) + }, [collaborationEdges, setEdges]) + // update workflow Canvas width and height useEffect(() => { if (workflowContainerRef.current) { diff --git a/web/app/components/workflow/store/collaboration-store.ts b/web/app/components/workflow/store/collaboration-store.ts new file mode 100644 index 0000000000..3a3ae3ac0a --- /dev/null +++ b/web/app/components/workflow/store/collaboration-store.ts @@ -0,0 +1,132 @@ +import { create } from 'zustand' +import * as Y from 'yjs' +import type { Edge, Node } from '../types' +import { useWebSocketStore } from './websocket-store' + +let globalYDoc: Y.Doc | null = null +let globalYNodesMap: Y.Map | null = null +let globalYEdgesMap: Y.Map | null = null + +class YjsSocketIOProvider { + private doc: Y.Doc + private socket: any + private isDestroyed = false + private onRemoteUpdate?: () => void + + constructor(socket: any, doc: Y.Doc, onRemoteUpdate?: () => void) { + this.socket = socket + this.doc = doc + this.onRemoteUpdate = onRemoteUpdate + + this.setupEventListeners() + } + + private setupEventListeners() { + this.doc.on('update', (update: Uint8Array, origin: any) => { + if (origin !== 'remote') + this.socket.emit('yjs_update', update) + }) + + this.socket.on('yjs_update', (updateData: Uint8Array) => { + Y.applyUpdate(this.doc, new Uint8Array(updateData), 'remote') + + if (this.onRemoteUpdate) + this.onRemoteUpdate() + }) + } + + destroy() { + this.isDestroyed = true + } +} + +type CollaborationStore = { + ydoc: Y.Doc | null + provider: YjsSocketIOProvider | null + + yNodesMap: Y.Map | null + yEdgesMap: Y.Map | null + + yTestMap: Y.Map | null + yTestArray: Y.Array | null + + nodes: Node[] + edges: Edge[] + + initCollaboration: (appId: string) => void + destroyCollaboration: () => void + +} + +export const useCollaborationStore = create((set, get) => ({ + ydoc: null, + provider: null, + yNodesMap: null, + yEdgesMap: null, + yTestMap: null, + yTestArray: null, + nodes: [], + edges: [], + + initCollaboration: (appId: string) => { + if (!globalYDoc) { + console.log('Creating new global Y.Doc instance') + globalYDoc = new Y.Doc() + globalYNodesMap = globalYDoc.getMap('nodes') + globalYEdgesMap = globalYDoc.getMap('edges') + } + else { + console.log('Reusing existing global Y.Doc instance') + } + const ydoc = globalYDoc + const yNodesMap = globalYNodesMap! + const yEdgesMap = globalYEdgesMap! + + const { getSocket } = useWebSocketStore.getState() + const socket = getSocket(appId) + + const updateReactState = () => { + console.log('updateReactState called') + + const nodes = Array.from(yNodesMap.values()) + const edges = Array.from(yEdgesMap.values()) + console.log('Y.js data - nodes:', nodes.length, 'edges:', edges.length) + + set({ + nodes: [...nodes] as Node[], + edges: [...edges] as Edge[], + }) + } + + const provider = new YjsSocketIOProvider(socket, globalYDoc, updateReactState) + + yNodesMap.observe((event) => { + console.log('yNodesMap changed:', event) + updateReactState() + }) + yEdgesMap.observe((event) => { + console.log('yEdgesMap changed:', event) + updateReactState() + }) + + updateReactState() + + set({ + ydoc, + provider, + yNodesMap, + yEdgesMap, + }) + }, + + destroyCollaboration: () => { + const { provider } = get() + provider?.destroy() + set({ + ydoc: null, + provider: null, + yNodesMap: null, + yEdgesMap: null, + }) + }, +})) diff --git a/web/package.json b/web/package.json index c16481e6e5..a6e7b41a36 100644 --- a/web/package.json +++ b/web/package.json @@ -148,6 +148,7 @@ "tldts": "^7.0.9", "use-context-selector": "^2.0.0", "uuid": "^10.0.0", + "yjs": "^13.6.27", "zod": "^3.23.8", "zundo": "^2.1.0", "zustand": "^4.5.2" diff --git a/web/pnpm-lock.yaml b/web/pnpm-lock.yaml index 217dbc6f0a..021616427a 100644 --- a/web/pnpm-lock.yaml +++ b/web/pnpm-lock.yaml @@ -56,7 +56,7 @@ importers: version: 0.30.0 '@lexical/react': specifier: ^0.30.0 - version: 0.30.0(react-dom@19.1.0(react@19.1.0))(react@19.1.0)(yjs@13.6.24) + version: 0.30.0(react-dom@19.1.0(react@19.1.0))(react@19.1.0)(yjs@13.6.27) '@lexical/selection': specifier: ^0.30.0 version: 0.30.0 @@ -345,6 +345,9 @@ importers: uuid: specifier: ^10.0.0 version: 10.0.0 + yjs: + specifier: ^13.6.27 + version: 13.6.27 zod: specifier: ^3.23.8 version: 3.24.2 @@ -8895,8 +8898,8 @@ packages: resolution: {integrity: sha512-7dSzzRQ++CKnNI/krKnYRV7JKKPUXMEh61soaHKg9mrWEhzFWhFnxPxGl+69cD1Ou63C13NUPCnmIcrvqCuM6w==} engines: {node: '>=12'} - yjs@13.6.24: - resolution: {integrity: sha512-xn/pYLTZa3uD1uDG8lpxfLRo5SR/rp0frdASOl2a71aYNvUXdWcLtVL91s2y7j+Q8ppmjZ9H3jsGVgoFMbT2VA==} + yjs@13.6.27: + resolution: {integrity: sha512-OIDwaflOaq4wC6YlPBy2L6ceKeKuF7DeTxx+jPzv1FHn9tCZ0ZwSRnUBxD05E3yed46fv/FWJbvR+Ud7x0L7zw==} engines: {node: '>=16.0.0', npm: '>=8.0.0'} yn@3.1.1: @@ -10780,7 +10783,7 @@ snapshots: '@lexical/utils': 0.30.0 lexical: 0.30.0 - '@lexical/react@0.30.0(react-dom@19.1.0(react@19.1.0))(react@19.1.0)(yjs@13.6.24)': + '@lexical/react@0.30.0(react-dom@19.1.0(react@19.1.0))(react@19.1.0)(yjs@13.6.27)': dependencies: '@lexical/devtools-core': 0.30.0(react-dom@19.1.0(react@19.1.0))(react@19.1.0) '@lexical/dragon': 0.30.0 @@ -10796,7 +10799,7 @@ snapshots: '@lexical/table': 0.30.0 '@lexical/text': 0.30.0 '@lexical/utils': 0.30.0 - '@lexical/yjs': 0.30.0(yjs@13.6.24) + '@lexical/yjs': 0.30.0(yjs@13.6.27) lexical: 0.30.0 react: 19.1.0 react-dom: 19.1.0(react@19.1.0) @@ -10832,12 +10835,12 @@ snapshots: '@lexical/table': 0.30.0 lexical: 0.30.0 - '@lexical/yjs@0.30.0(yjs@13.6.24)': + '@lexical/yjs@0.30.0(yjs@13.6.27)': dependencies: '@lexical/offset': 0.30.0 '@lexical/selection': 0.30.0 lexical: 0.30.0 - yjs: 13.6.24 + yjs: 13.6.27 '@mapbox/node-pre-gyp@1.0.11': dependencies: @@ -19239,7 +19242,7 @@ snapshots: y18n: 5.0.8 yargs-parser: 21.1.1 - yjs@13.6.24: + yjs@13.6.27: dependencies: lib0: 0.2.102