diff --git a/web/app/components/workflow-app/components/workflow-main.tsx b/web/app/components/workflow-app/components/workflow-main.tsx index ab23bb3d95..dacc553841 100644 --- a/web/app/components/workflow-app/components/workflow-main.tsx +++ b/web/app/components/workflow-app/components/workflow-main.tsx @@ -22,6 +22,8 @@ import { useStore, useWorkflowStore } from '@/app/components/workflow/store' import { useWebSocketStore } from '@/app/components/workflow/store/websocket-store' import { useCollaborativeCursors } from '../hooks' import type { OnlineUser } from '@/service/demo/online-user' +import { collaborationManager } from '@/app/components/workflow/collaboration/manage' +import { useStoreApi } from 'reactflow' type WorkflowMainProps = Pick const WorkflowMain = ({ @@ -36,6 +38,12 @@ const WorkflowMain = ({ const lastEmitTimeRef = useRef(0) const lastPositionRef = useRef<{ x: number; y: number } | null>(null) + const store = useStoreApi() + + useEffect(() => { + collaborationManager.init(appId, store) + }, [appId, store]) + const { emit, getSocket } = useWebSocketStore() const handleWorkflowDataUpdate = useCallback((payload: any) => { @@ -87,7 +95,6 @@ const WorkflowMain = ({ } }, [emit]) - // Add mouse move event listener useEffect(() => { const container = containerRef.current if (!container) return @@ -132,7 +139,6 @@ const WorkflowMain = ({ setOnlineUsers(usersMap) } socket.on('online_users', handleOnlineUsersUpdate) - // clean up return () => { socket.off('online_users', handleOnlineUsersUpdate) } diff --git a/web/app/components/workflow-app/hooks/use-nodes-sync-draft.ts b/web/app/components/workflow-app/hooks/use-nodes-sync-draft.ts index db21cfb05e..312d38a259 100644 --- a/web/app/components/workflow-app/hooks/use-nodes-sync-draft.ts +++ b/web/app/components/workflow-app/hooks/use-nodes-sync-draft.ts @@ -130,7 +130,9 @@ export const useNodesSyncDraft = () => { if (error && error.json && !error.bodyUsed) { error.json().then((err: any) => { if (err.code === 'draft_workflow_not_sync' && !notRefreshWhenSyncError) - handleRefreshWorkflowDraft() + // TODO: hjlarry test collaboration + // handleRefreshWorkflowDraft() + console.error('draft_workflow_not_sync', err) }) } callback?.onError && callback.onError() diff --git a/web/app/components/workflow-app/hooks/use-workflow-init.ts b/web/app/components/workflow-app/hooks/use-workflow-init.ts index ff0fd50e58..e1c4c25a4e 100644 --- a/web/app/components/workflow-app/hooks/use-workflow-init.ts +++ b/web/app/components/workflow-app/hooks/use-workflow-init.ts @@ -17,7 +17,6 @@ import { } from '@/service/workflow' import type { FetchWorkflowDraftResponse } from '@/types/workflow' import { useWorkflowConfig } from '@/service/use-workflow' -import { useCollaborationStore } from '@/app/components/workflow/store/collaboration-store' export const useWorkflowInit = () => { const workflowStore = useWorkflowStore() @@ -40,28 +39,9 @@ export const useWorkflowInit = () => { }, [workflowStore]) useWorkflowConfig(appDetail.id, handleUpdateWorkflowConfig) - const initializeCollaboration = async (appId: string) => { - const { initCollaboration } = useCollaborationStore.getState() - initCollaboration(appId) - - return new Promise((resolve) => { - const checkInitialized = () => { - const { nodesMap, edgesMap } = useCollaborationStore.getState() - if (nodesMap && edgesMap) - resolve() - else - setTimeout(checkInitialized, 50) - } - checkInitialized() - }) - } - const handleGetInitialWorkflowData = useCallback(async () => { try { - const [res] = await Promise.all([ - fetchWorkflowDraft(`/apps/${appDetail.id}/workflows/draft`), - initializeCollaboration(appDetail.id), - ]) + const res = await fetchWorkflowDraft(`/apps/${appDetail.id}/workflows/draft`) setData(res) workflowStore.setState({ envSecrets: (res.environment_variables || []).filter(env => env.value_type === 'secret').reduce((acc, env) => { diff --git a/web/app/components/workflow-app/hooks/use-workflow-refresh-draft.ts b/web/app/components/workflow-app/hooks/use-workflow-refresh-draft.ts index c944e10c4c..6580763e8e 100644 --- a/web/app/components/workflow-app/hooks/use-workflow-refresh-draft.ts +++ b/web/app/components/workflow-app/hooks/use-workflow-refresh-draft.ts @@ -1,7 +1,6 @@ import { useCallback } from 'react' import { useWorkflowStore } from '@/app/components/workflow/store' import { fetchWorkflowDraft } from '@/service/workflow' -import type { WorkflowDataUpdater } from '@/app/components/workflow/types' import { useWorkflowUpdate } from '@/app/components/workflow/hooks' export const useWorkflowRefreshDraft = () => { @@ -19,7 +18,8 @@ export const useWorkflowRefreshDraft = () => { } = workflowStore.getState() setIsSyncingWorkflowDraft(true) fetchWorkflowDraft(`/apps/${appId}/workflows/draft`).then((response) => { - handleUpdateWorkflowCanvas(response.graph as WorkflowDataUpdater) + // TODO: hjlarry test collaboration + // handleUpdateWorkflowCanvas(response.graph as WorkflowDataUpdater) setSyncWorkflowDraftHash(response.hash) setEnvSecrets((response.environment_variables || []).filter(env => env.value_type === 'secret').reduce((acc, env) => { acc[env.id] = env.value diff --git a/web/app/components/workflow-app/index.tsx b/web/app/components/workflow-app/index.tsx index db404a7efc..0795efe318 100644 --- a/web/app/components/workflow-app/index.tsx +++ b/web/app/components/workflow-app/index.tsx @@ -23,21 +23,21 @@ import { } from '@/app/components/workflow/context' import { createWorkflowSlice } from './store/workflow/workflow-slice' import WorkflowAppMain from './components/workflow-main' -import { useCollaborationStore } from '@/app/components/workflow/store/collaboration-store' +import { collaborationManager } from '@/app/components/workflow/collaboration/manage' const WorkflowAppWithAdditionalContext = () => { const { data, isLoading, } = useWorkflowInit() + const { setNodes, setEdges } = collaborationManager + const { data: fileUploadConfigResponse } = useSWR({ url: '/files/upload' }, fetchFileUploadConfig) const nodesData = useMemo(() => { if (data) { const processedNodes = initialNodes(data.graph.nodes, data.graph.edges) - - const { setNodes } = useCollaborationStore.getState() - setNodes(processedNodes) + setNodes([], processedNodes) return processedNodes } @@ -47,9 +47,7 @@ const WorkflowAppWithAdditionalContext = () => { const edgesData = useMemo(() => { if (data) { const processedEdges = initialEdges(data.graph.edges, data.graph.nodes) - - const { setEdges } = useCollaborationStore.getState() - setEdges(processedEdges) + setEdges([], processedEdges) return processedEdges } diff --git a/web/app/components/workflow/collaboration/manage.ts b/web/app/components/workflow/collaboration/manage.ts new file mode 100644 index 0000000000..4ff51ead57 --- /dev/null +++ b/web/app/components/workflow/collaboration/manage.ts @@ -0,0 +1,154 @@ +import { LoroDoc } from 'loro-crdt' +import { isEqual } from 'lodash-es' +import { useWebSocketStore } from '../store/websocket-store' +import type { Edge, Node } from '../types' + +class LoroSocketIOProvider { + private doc: any + private socket: any + + constructor(socket: any, doc: any) { + this.socket = socket + this.doc = doc + this.setupEventListeners() + } + + private setupEventListeners() { + this.doc.subscribe((event: any) => { + if (event.origin !== 'remote') { + const update = this.doc.export({ mode: 'update' }) + this.socket.emit('graph_update', update) + } + }) + + this.socket.on('graph_update', (updateData: Uint8Array) => { + try { + const data = new Uint8Array(updateData) + this.doc.import(data) + } + catch (error) { + console.error('Error importing graph update:', error) + } + }) + } + + destroy() { + this.socket.off('graph_update') + } +} + +class CollaborationManager { + private doc: LoroDoc | null = null + private provider: LoroSocketIOProvider | null = null + private nodesMap: any = null + private edgesMap: any = null + + init(appId: string, reactFlowStore: any) { + const { getSocket } = useWebSocketStore.getState() + const socket = getSocket(appId) + this.doc = new LoroDoc() + this.nodesMap = this.doc.getMap('nodes') + this.edgesMap = this.doc.getMap('edges') + this.provider = new LoroSocketIOProvider(socket, this.doc) + + this.setupSubscriptions(reactFlowStore) + } + + private setupSubscriptions(reactFlowStore: any) { + this.nodesMap?.subscribe((event: any) => { + console.log('nodesMap', event) + if (event.by === 'import') { + requestAnimationFrame(() => { + const { setNodes } = reactFlowStore.getState() + const updatedNodes = Array.from(this.nodesMap.values()) + setNodes(updatedNodes) + }) + } + }) + + this.edgesMap?.subscribe((event: any) => { + if (event.by === 'import') { + requestAnimationFrame(() => { + const { setEdges } = reactFlowStore.getState() + const updatedEdges = Array.from(this.edgesMap.values()) + setEdges(updatedEdges) + }) + } + }) + } + + getNodes() { + return this.nodesMap ? Array.from(this.nodesMap.values()) : [] + } + + getEdges() { + return this.edgesMap ? Array.from(this.edgesMap.values()) : [] + } + + private getPersistentNodeData = (node: Node) => { + const { data, ...rest } = node + const filteredData = Object.fromEntries( + Object.entries(data).filter(([key]) => !key.startsWith('_')), + ) + return { ...rest, data: filteredData } + } + + setNodes = (oldNodes: Node[], newNodes: Node[]) => { + if (!this.nodesMap || !this.doc) return + + const oldNodesMap = new Map(oldNodes.map(node => [node.id, node])) + const newNodesMap = new Map(newNodes.map(node => [node.id, node])) + + oldNodes.forEach((oldNode) => { + if (!newNodesMap.has(oldNode.id)) + this.nodesMap.delete(oldNode.id) + }) + + newNodes.forEach((newNode) => { + const oldNode = oldNodesMap.get(newNode.id) + if (!oldNode) { + this.nodesMap.set(newNode.id, newNode) + } + else { + const oldPersistentData = this.getPersistentNodeData(oldNode) + const newPersistentData = this.getPersistentNodeData(newNode) + if (!isEqual(oldPersistentData, newPersistentData)) + this.nodesMap.set(newNode.id, newPersistentData) + } + }) + + this.doc.commit() + } + + setEdges = (oldEdges: Edge[], newEdges: Edge[]) => { + if (!this.edgesMap || !this.doc) return + + const oldEdgesMap = new Map(oldEdges.map(edge => [edge.id, edge])) + const newEdgesMap = new Map(newEdges.map(edge => [edge.id, edge])) + + oldEdges.forEach((oldEdge) => { + if (!newEdgesMap.has(oldEdge.id)) + this.edgesMap.delete(oldEdge.id) + }) + + newEdges.forEach((newEdge) => { + const oldEdge = oldEdgesMap.get(newEdge.id) + if (!oldEdge) + this.edgesMap.set(newEdge.id, newEdge) + else if (!isEqual(oldEdge, newEdge)) + this.edgesMap.set(newEdge.id, newEdge) + }) + + this.doc.commit() + } + + destroy() { + this.provider?.destroy() + this.doc = null + this.provider = null + this.nodesMap = null + this.edgesMap = null + } +} + +export const collaborationManager = new CollaborationManager() diff --git a/web/app/components/workflow/hooks/use-collaborative-workflow.ts b/web/app/components/workflow/hooks/use-collaborative-workflow.ts new file mode 100644 index 0000000000..40fe310cad --- /dev/null +++ b/web/app/components/workflow/hooks/use-collaborative-workflow.ts @@ -0,0 +1,41 @@ +import { useCallback } from 'react' +import { useStoreApi } from 'reactflow' +import type { Edge, Node } from '../types' +import { collaborationManager } from '../collaboration/manage' + +export const useCollaborativeWorkflow = () => { + const store = useStoreApi() + const { setNodes: collabSetNodes, setEdges: collabSetEdges } = collaborationManager + + const setNodes = useCallback((newNodes: Node[]) => { + const { getNodes, setNodes: reactFlowSetNodes } = store.getState() + const oldNodes = getNodes() + collabSetNodes(oldNodes, newNodes) + reactFlowSetNodes(newNodes) + }, [store, collabSetNodes]) + + const setEdges = useCallback((newEdges: Edge[]) => { + const { edges, setEdges: reactFlowSetEdges } = store.getState() + collabSetEdges(edges, newEdges) + reactFlowSetEdges(newEdges) + }, [store, collabSetEdges]) + + const collaborativeStore = useCallback(() => { + const state = store.getState() + return { + + nodes: state.getNodes(), + edges: state.edges, + + setNodes, + setEdges, + + } + }, [store, setNodes, setEdges]) + + return { + getState: collaborativeStore, + setNodes, + setEdges, + } +} diff --git a/web/app/components/workflow/hooks/use-nodes-interactions.ts b/web/app/components/workflow/hooks/use-nodes-interactions.ts index 263f7f8e67..7a3009ebe8 100644 --- a/web/app/components/workflow/hooks/use-nodes-interactions.ts +++ b/web/app/components/workflow/hooks/use-nodes-interactions.ts @@ -61,11 +61,12 @@ import { } from './use-workflow' import { WorkflowHistoryEvent, useWorkflowHistory } from './use-workflow-history' import useInspectVarsCrud from './use-inspect-vars-crud' -import { useCollaborationStore } from '@/app/components/workflow/store/collaboration-store' +import { useCollaborativeWorkflow } from './use-collaborative-workflow' export const useNodesInteractions = () => { const { t } = useTranslation() const store = useStoreApi() + const collaborativeWorkflow = useCollaborativeWorkflow() const workflowStore = useWorkflowStore() const reactflow = useReactFlow() const { store: workflowHistoryStore } = useWorkflowHistoryStore() @@ -114,13 +115,9 @@ export const useNodesInteractions = () => { if (node.type === CUSTOM_LOOP_START_NODE) return - const { - getNodes, - setNodes, - } = store.getState() e.stopPropagation() - const nodes = getNodes() + const { nodes, setNodes } = collaborativeWorkflow.getState() const { restrictPosition } = handleNodeIterationChildDrag(node) const { restrictPosition: restrictLoopPosition } = handleNodeLoopChildDrag(node) @@ -188,13 +185,7 @@ export const useNodesInteractions = () => { if (node.type === CUSTOM_LOOP_START_NODE || node.type === CUSTOM_NOTE_NODE) return - const { - getNodes, - setNodes, - edges, - setEdges, - } = store.getState() - const nodes = getNodes() + const { nodes, edges, setNodes, setEdges } = collaborativeWorkflow.getState() const { connectingNodePayload, setEnteringNodePayload, @@ -269,13 +260,9 @@ export const useNodesInteractions = () => { setEnteringNodePayload, } = workflowStore.getState() setEnteringNodePayload(undefined) - const { - getNodes, - setNodes, - edges, - setEdges, - } = store.getState() - const newNodes = produce(getNodes(), (draft) => { + + const { nodes, edges, setNodes, setEdges } = collaborativeWorkflow.getState() + const newNodes = produce(nodes, (draft) => { draft.forEach((node) => { node.data._isEntering = false node.data._inParallelHovering = false @@ -284,7 +271,8 @@ export const useNodesInteractions = () => { setNodes(newNodes) const newEdges = produce(edges, (draft) => { draft.forEach((edge) => { - edge.data._connectedNodeIsHovering = false + if (edge.data) + edge.data._connectedNodeIsHovering = false }) }) setEdges(newEdges) @@ -293,14 +281,8 @@ export const useNodesInteractions = () => { const handleNodeSelect = useCallback((nodeId: string, cancelSelection?: boolean, initShowLastRunTab?: boolean) => { if(initShowLastRunTab) workflowStore.setState({ initShowLastRunTab: true }) - const { - getNodes, - setNodes, - edges, - setEdges, - } = store.getState() - const nodes = getNodes() + const { nodes, edges, setNodes, setEdges } = collaborativeWorkflow.getState() const selectedNode = nodes.find(node => node.data.selected) if (!cancelSelection && selectedNode?.id === nodeId) @@ -357,13 +339,7 @@ export const useNodesInteractions = () => { if (getNodesReadOnly()) return - const { - getNodes, - setNodes, - edges, - setEdges, - } = store.getState() - const nodes = getNodes() + const { nodes, edges, setNodes, setEdges } = collaborativeWorkflow.getState() const targetNode = nodes.find(node => node.id === target!) const sourceNode = nodes.find(node => node.id === source!) @@ -440,8 +416,8 @@ export const useNodesInteractions = () => { if (nodeId && handleType) { const { setConnectingNodePayload } = workflowStore.getState() - const { getNodes } = store.getState() - const node = getNodes().find(n => n.id === nodeId)! + const { nodes } = collaborativeWorkflow.getState() + const node = nodes.find(n => n.id === nodeId)! if (node.type === CUSTOM_NOTE_NODE) return @@ -476,11 +452,7 @@ export const useNodesInteractions = () => { hoveringAssignVariableGroupId, } = workflowStore.getState() const { screenToFlowPosition } = reactflow - const { - getNodes, - setNodes, - } = store.getState() - const nodes = getNodes() + const { nodes, setNodes } = collaborativeWorkflow.getState() const fromHandleType = connectingNodePayload.handleType const fromHandleId = connectingNodePayload.handleId const fromNode = nodes.find(n => n.id === connectingNodePayload.nodeId)! @@ -540,14 +512,7 @@ export const useNodesInteractions = () => { if (getNodesReadOnly()) return - const { - getNodes, - setNodes, - edges, - setEdges, - } = store.getState() - - const nodes = getNodes() + const { nodes, edges, setNodes, setEdges } = collaborativeWorkflow.getState() const currentNodeIndex = nodes.findIndex(node => node.id === nodeId) const currentNode = nodes[currentNodeIndex] @@ -681,13 +646,7 @@ export const useNodesInteractions = () => { if (getNodesReadOnly()) return - const { - getNodes, - setNodes, - edges, - setEdges, - } = store.getState() - const nodes = getNodes() + const { nodes, edges, setNodes, setEdges } = collaborativeWorkflow.getState() const nodesWithSameType = nodes.filter(node => node.data.type === nodeType) const { newNode, @@ -1111,13 +1070,7 @@ export const useNodesInteractions = () => { if (getNodesReadOnly()) return - const { - getNodes, - setNodes, - edges, - setEdges, - } = store.getState() - const nodes = getNodes() + const { nodes, edges, setNodes, setEdges } = collaborativeWorkflow.getState() const currentNode = nodes.find(node => node.id === currentNodeId)! const connectedEdges = getConnectedEdges([currentNode], edges) const nodesWithSameType = nodes.filter(node => node.data.type === nodeType) @@ -1185,12 +1138,7 @@ export const useNodesInteractions = () => { }, [getNodesReadOnly, store, t, handleSyncWorkflowDraft, saveStateToHistory]) const handleNodesCancelSelected = useCallback(() => { - const { - getNodes, - setNodes, - } = store.getState() - - const nodes = getNodes() + const { nodes, setNodes } = collaborativeWorkflow.getState() const newNodes = produce(nodes, (draft) => { draft.forEach((node) => { node.data.selected = false @@ -1225,11 +1173,7 @@ export const useNodesInteractions = () => { const { setClipboardElements } = workflowStore.getState() - const { - getNodes, - } = store.getState() - - const nodes = getNodes() + const { nodes } = collaborativeWorkflow.getState() if (nodeId) { // If nodeId is provided, copy that specific node @@ -1264,16 +1208,10 @@ export const useNodesInteractions = () => { mousePosition, } = workflowStore.getState() - const { - getNodes, - setNodes, - edges, - setEdges, - } = store.getState() + const { nodes, edges, setNodes, setEdges } = collaborativeWorkflow.getState() const nodesToPaste: Node[] = [] const edgesToPaste: Edge[] = [] - const nodes = getNodes() if (clipboardElements.length) { const { x, y } = getTopLeftNodePosition(clipboardElements) @@ -1382,12 +1320,8 @@ export const useNodesInteractions = () => { if (getNodesReadOnly()) return - const { - getNodes, - edges, - } = store.getState() + const { nodes, edges } = collaborativeWorkflow.getState() - const nodes = getNodes() const bundledNodes = nodes.filter(node => node.data._isBundled && node.data.type !== BlockEnum.Start) if (bundledNodes.length) { @@ -1410,13 +1344,9 @@ export const useNodesInteractions = () => { if (getNodesReadOnly()) return - const { - getNodes, - setNodes, - } = store.getState() + const { nodes, setNodes } = collaborativeWorkflow.getState() const { x, y, width, height } = params - const nodes = getNodes() const currentNode = nodes.find(n => n.id === nodeId)! const childrenNodes = nodes.filter(n => currentNode.data._children?.find((c: any) => c.nodeId === n.id)) let rightNode: Node @@ -1469,12 +1399,7 @@ export const useNodesInteractions = () => { if (getNodesReadOnly()) return - const { - getNodes, - edges, - } = store.getState() - const nodes = getNodes() - const { setNodes, setEdges } = useCollaborationStore.getState() + const { nodes, edges, setNodes, setEdges } = collaborativeWorkflow.getState() const currentNode = nodes.find(node => node.id === nodeId)! const connectedEdges = getConnectedEdges([currentNode], edges) const nodesConnectedSourceOrTargetHandleIdsMap = getNodesConnectedSourceOrTargetHandleIdsMap( @@ -1504,7 +1429,7 @@ export const useNodesInteractions = () => { if (getNodesReadOnly() || getWorkflowReadOnly()) return - const { setEdges, setNodes } = store.getState() + const { setNodes, setEdges } = collaborativeWorkflow.getState() undo() const { edges, nodes } = workflowHistoryStore.getState() @@ -1519,7 +1444,7 @@ export const useNodesInteractions = () => { if (getNodesReadOnly() || getWorkflowReadOnly()) return - const { setEdges, setNodes } = store.getState() + const { setNodes, setEdges } = collaborativeWorkflow.getState() redo() const { edges, nodes } = workflowHistoryStore.getState() diff --git a/web/app/components/workflow/index.tsx b/web/app/components/workflow/index.tsx index 377a3fe572..8ea861ebb4 100644 --- a/web/app/components/workflow/index.tsx +++ b/web/app/components/workflow/index.tsx @@ -83,7 +83,6 @@ import Confirm from '@/app/components/base/confirm' import DatasetsDetailProvider from './datasets-detail-store/provider' import { HooksStoreContextProvider } from './hooks-store' import type { Shape as HooksStoreShape } from './hooks-store' -import { useCollaborationStore } from '@/app/components/workflow/store/collaboration-store' const nodeTypes = { [CUSTOM_NODE]: CustomNode, @@ -128,21 +127,6 @@ export const Workflow: FC = memo(({ return workflowCanvasHeight - bottomPanelHeight }, [workflowCanvasHeight, bottomPanelHeight]) - const collaborationNodes = useCollaborationStore((state) => { - return state.nodes - }) - const collaborationEdges = useCollaborationStore((state) => { - return state.edges - }) - - useEffect(() => { - setNodes(collaborationNodes) - }, [collaborationNodes, setNodes]) - - useEffect(() => { - setEdges(collaborationEdges) - }, [collaborationEdges, setEdges]) - // update workflow Canvas width and height useEffect(() => { if (workflowContainerRef.current) { diff --git a/web/app/components/workflow/store/collaboration-store.ts b/web/app/components/workflow/store/collaboration-store.ts deleted file mode 100644 index 51fe6f1c30..0000000000 --- a/web/app/components/workflow/store/collaboration-store.ts +++ /dev/null @@ -1,194 +0,0 @@ -import { create } from 'zustand' - -import { LoroDoc } from 'loro-crdt' -import { isEqual } from 'lodash-es' -import type { Edge, Node } from '../types' -import { useWebSocketStore } from './websocket-store' - -class LoroSocketIOProvider { - private doc: any - private socket: any - - constructor(socket: any, doc: any) { - this.socket = socket - this.doc = doc - - this.setupEventListeners() - } - - private setupEventListeners() { - this.doc.subscribe((event: any) => { - if (event.origin !== 'remote') { - const update = this.doc.export({ mode: 'update' }) - this.socket.emit('graph_update', update) - } - }) - - this.socket.on('graph_update', (updateData: Uint8Array) => { - try { - const data = new Uint8Array(updateData) - this.doc.import(data) - } - catch (error) { - console.error('Error importing graph update:', error) - } - }) - } - - destroy() { - this.socket.off('graph_update') - } -} - -type CollaborationStore = { - loroDoc: any | null - provider: LoroSocketIOProvider | null - nodesMap: any | null - edgesMap: any | null - nodes: Node[] - edges: Edge[] - updateNodes?: () => void - updateEdges?: () => void - - setNodes: (newNodes: Node[]) => void - setEdges: (newEdges: Edge[]) => void - initCollaboration: (appId: string) => void - destroyCollaboration: () => void -} - -export const useCollaborationStore = create((set, get) => ({ - loroDoc: null, - provider: null, - nodesMap: null, - edgesMap: null, - nodes: [], - edges: [], - - setNodes: (newNodes: Node[]) => { - const { nodes: oldNodes, nodesMap, loroDoc } = get() - - const oldNodesMap = new Map(oldNodes.map(node => [node.id, node])) - const newNodesMap = new Map(newNodes.map(node => [node.id, node])) - - const getPersistentNodeData = (node: Node) => { - const { data, ...rest } = node - const filteredData = Object.fromEntries( - Object.entries(data).filter(([key]) => - !key.startsWith('_') && key !== 'selected', - ), - ) - - return { - ...rest, - data: filteredData, - } - } - - // delete - oldNodes.forEach((oldNode) => { - if (!newNodesMap.has(oldNode.id)) - nodesMap.delete(oldNode.id) - }) - - newNodes.forEach((newNode) => { - const oldNode = oldNodesMap.get(newNode.id) - if (!oldNode) { - // add - nodesMap.set(newNode.id, newNode) - } - else { - const oldPersistentData = getPersistentNodeData(oldNode) - const newPersistentData = getPersistentNodeData(newNode) - - if (!isEqual(oldPersistentData, newPersistentData)) { - // update - nodesMap.set(newNode.id, newNode) - } - } - }) - loroDoc.commit() - }, - - setEdges: (newEdges: Edge[]) => { - const { edges: oldEdges, edgesMap, loroDoc } = get() - - const oldEdgesMap = new Map(oldEdges.map(edge => [edge.id, edge])) - const newEdgesMap = new Map(newEdges.map(edge => [edge.id, edge])) - - // delete - oldEdges.forEach((oldEdge) => { - if (!newEdgesMap.has(oldEdge.id)) - edgesMap.delete(oldEdge.id) - }) - - newEdges.forEach((newEdge) => { - const oldEdge = oldEdgesMap.get(newEdge.id) - if (!oldEdge) { - // add - edgesMap.set(newEdge.id, newEdge) - } - else if (!isEqual(oldEdge, newEdge)) { - // update - edgesMap.set(newEdge.id, newEdge) - } - }) - - loroDoc.commit() - }, - - initCollaboration: (appId: string) => { - const { getSocket } = useWebSocketStore.getState() - const socket = getSocket(appId) - const doc = new LoroDoc() - const nodesMap = doc.getMap('nodes') - const edgesMap = doc.getMap('edges') - - const updateNodes = () => { - const nodes = Array.from(nodesMap.values()) - set({ nodes }) - } - - const updateEdges = () => { - const edges = Array.from(edgesMap.values()) - set({ edges }) - } - - const provider = new LoroSocketIOProvider(socket, doc) - - set({ - loroDoc: doc, - provider, - nodesMap, - edgesMap, - nodes: [], - edges: [], - updateNodes, - updateEdges, - }) - - nodesMap.subscribe((event: any) => { - console.log('NodesMap changed:', event) - updateNodes() - }) - - edgesMap.subscribe((event: any) => { - console.log('EdgesMap changed:', event) - updateEdges() - }) - }, - - destroyCollaboration: () => { - const { provider } = get() - if (provider) { - provider.destroy() - set({ - loroDoc: null, - provider: null, - nodesMap: null, - edgesMap: null, - nodes: [], - edges: [], - }) - } - }, -})) diff --git a/web/app/components/workflow/store/workflow/workflow-draft-slice.ts b/web/app/components/workflow/store/workflow/workflow-draft-slice.ts index c6d62e4628..4a6c7b433e 100644 --- a/web/app/components/workflow/store/workflow/workflow-draft-slice.ts +++ b/web/app/components/workflow/store/workflow/workflow-draft-slice.ts @@ -26,6 +26,7 @@ export type WorkflowDraftSliceShape = { export const createWorkflowDraftSlice: StateCreator = set => ({ backupDraft: undefined, setBackupDraft: backupDraft => set(() => ({ backupDraft })), + // TODO: hjlarry test collaboration debouncedSyncWorkflowDraft: debounce((syncWorkflowDraft) => { syncWorkflowDraft() }, 500000),