mirror of https://github.com/langgenius/dify.git
refactor collaboration
This commit is contained in:
parent
7233b4de55
commit
348fd18230
|
|
@ -22,6 +22,8 @@ import { useStore, useWorkflowStore } from '@/app/components/workflow/store'
|
|||
import { useWebSocketStore } from '@/app/components/workflow/store/websocket-store'
|
||||
import { useCollaborativeCursors } from '../hooks'
|
||||
import type { OnlineUser } from '@/service/demo/online-user'
|
||||
import { collaborationManager } from '@/app/components/workflow/collaboration/manage'
|
||||
import { useStoreApi } from 'reactflow'
|
||||
|
||||
type WorkflowMainProps = Pick<WorkflowProps, 'nodes' | 'edges' | 'viewport'>
|
||||
const WorkflowMain = ({
|
||||
|
|
@ -36,6 +38,12 @@ const WorkflowMain = ({
|
|||
const lastEmitTimeRef = useRef<number>(0)
|
||||
const lastPositionRef = useRef<{ x: number; y: number } | null>(null)
|
||||
|
||||
const store = useStoreApi()
|
||||
|
||||
useEffect(() => {
|
||||
collaborationManager.init(appId, store)
|
||||
}, [appId, store])
|
||||
|
||||
const { emit, getSocket } = useWebSocketStore()
|
||||
|
||||
const handleWorkflowDataUpdate = useCallback((payload: any) => {
|
||||
|
|
@ -87,7 +95,6 @@ const WorkflowMain = ({
|
|||
}
|
||||
}, [emit])
|
||||
|
||||
// Add mouse move event listener
|
||||
useEffect(() => {
|
||||
const container = containerRef.current
|
||||
if (!container) return
|
||||
|
|
@ -132,7 +139,6 @@ const WorkflowMain = ({
|
|||
setOnlineUsers(usersMap)
|
||||
}
|
||||
socket.on('online_users', handleOnlineUsersUpdate)
|
||||
// clean up
|
||||
return () => {
|
||||
socket.off('online_users', handleOnlineUsersUpdate)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -130,7 +130,9 @@ export const useNodesSyncDraft = () => {
|
|||
if (error && error.json && !error.bodyUsed) {
|
||||
error.json().then((err: any) => {
|
||||
if (err.code === 'draft_workflow_not_sync' && !notRefreshWhenSyncError)
|
||||
handleRefreshWorkflowDraft()
|
||||
// TODO: hjlarry test collaboration
|
||||
// handleRefreshWorkflowDraft()
|
||||
console.error('draft_workflow_not_sync', err)
|
||||
})
|
||||
}
|
||||
callback?.onError && callback.onError()
|
||||
|
|
|
|||
|
|
@ -17,7 +17,6 @@ import {
|
|||
} from '@/service/workflow'
|
||||
import type { FetchWorkflowDraftResponse } from '@/types/workflow'
|
||||
import { useWorkflowConfig } from '@/service/use-workflow'
|
||||
import { useCollaborationStore } from '@/app/components/workflow/store/collaboration-store'
|
||||
|
||||
export const useWorkflowInit = () => {
|
||||
const workflowStore = useWorkflowStore()
|
||||
|
|
@ -40,28 +39,9 @@ export const useWorkflowInit = () => {
|
|||
}, [workflowStore])
|
||||
useWorkflowConfig(appDetail.id, handleUpdateWorkflowConfig)
|
||||
|
||||
const initializeCollaboration = async (appId: string) => {
|
||||
const { initCollaboration } = useCollaborationStore.getState()
|
||||
initCollaboration(appId)
|
||||
|
||||
return new Promise<void>((resolve) => {
|
||||
const checkInitialized = () => {
|
||||
const { nodesMap, edgesMap } = useCollaborationStore.getState()
|
||||
if (nodesMap && edgesMap)
|
||||
resolve()
|
||||
else
|
||||
setTimeout(checkInitialized, 50)
|
||||
}
|
||||
checkInitialized()
|
||||
})
|
||||
}
|
||||
|
||||
const handleGetInitialWorkflowData = useCallback(async () => {
|
||||
try {
|
||||
const [res] = await Promise.all([
|
||||
fetchWorkflowDraft(`/apps/${appDetail.id}/workflows/draft`),
|
||||
initializeCollaboration(appDetail.id),
|
||||
])
|
||||
const res = await fetchWorkflowDraft(`/apps/${appDetail.id}/workflows/draft`)
|
||||
setData(res)
|
||||
workflowStore.setState({
|
||||
envSecrets: (res.environment_variables || []).filter(env => env.value_type === 'secret').reduce((acc, env) => {
|
||||
|
|
|
|||
|
|
@ -1,7 +1,6 @@
|
|||
import { useCallback } from 'react'
|
||||
import { useWorkflowStore } from '@/app/components/workflow/store'
|
||||
import { fetchWorkflowDraft } from '@/service/workflow'
|
||||
import type { WorkflowDataUpdater } from '@/app/components/workflow/types'
|
||||
import { useWorkflowUpdate } from '@/app/components/workflow/hooks'
|
||||
|
||||
export const useWorkflowRefreshDraft = () => {
|
||||
|
|
@ -19,7 +18,8 @@ export const useWorkflowRefreshDraft = () => {
|
|||
} = workflowStore.getState()
|
||||
setIsSyncingWorkflowDraft(true)
|
||||
fetchWorkflowDraft(`/apps/${appId}/workflows/draft`).then((response) => {
|
||||
handleUpdateWorkflowCanvas(response.graph as WorkflowDataUpdater)
|
||||
// TODO: hjlarry test collaboration
|
||||
// handleUpdateWorkflowCanvas(response.graph as WorkflowDataUpdater)
|
||||
setSyncWorkflowDraftHash(response.hash)
|
||||
setEnvSecrets((response.environment_variables || []).filter(env => env.value_type === 'secret').reduce((acc, env) => {
|
||||
acc[env.id] = env.value
|
||||
|
|
|
|||
|
|
@ -23,21 +23,21 @@ import {
|
|||
} from '@/app/components/workflow/context'
|
||||
import { createWorkflowSlice } from './store/workflow/workflow-slice'
|
||||
import WorkflowAppMain from './components/workflow-main'
|
||||
import { useCollaborationStore } from '@/app/components/workflow/store/collaboration-store'
|
||||
import { collaborationManager } from '@/app/components/workflow/collaboration/manage'
|
||||
|
||||
const WorkflowAppWithAdditionalContext = () => {
|
||||
const {
|
||||
data,
|
||||
isLoading,
|
||||
} = useWorkflowInit()
|
||||
const { setNodes, setEdges } = collaborationManager
|
||||
|
||||
const { data: fileUploadConfigResponse } = useSWR({ url: '/files/upload' }, fetchFileUploadConfig)
|
||||
|
||||
const nodesData = useMemo(() => {
|
||||
if (data) {
|
||||
const processedNodes = initialNodes(data.graph.nodes, data.graph.edges)
|
||||
|
||||
const { setNodes } = useCollaborationStore.getState()
|
||||
setNodes(processedNodes)
|
||||
setNodes([], processedNodes)
|
||||
|
||||
return processedNodes
|
||||
}
|
||||
|
|
@ -47,9 +47,7 @@ const WorkflowAppWithAdditionalContext = () => {
|
|||
const edgesData = useMemo(() => {
|
||||
if (data) {
|
||||
const processedEdges = initialEdges(data.graph.edges, data.graph.nodes)
|
||||
|
||||
const { setEdges } = useCollaborationStore.getState()
|
||||
setEdges(processedEdges)
|
||||
setEdges([], processedEdges)
|
||||
|
||||
return processedEdges
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,154 @@
|
|||
import { LoroDoc } from 'loro-crdt'
|
||||
import { isEqual } from 'lodash-es'
|
||||
import { useWebSocketStore } from '../store/websocket-store'
|
||||
import type { Edge, Node } from '../types'
|
||||
|
||||
class LoroSocketIOProvider {
|
||||
private doc: any
|
||||
private socket: any
|
||||
|
||||
constructor(socket: any, doc: any) {
|
||||
this.socket = socket
|
||||
this.doc = doc
|
||||
this.setupEventListeners()
|
||||
}
|
||||
|
||||
private setupEventListeners() {
|
||||
this.doc.subscribe((event: any) => {
|
||||
if (event.origin !== 'remote') {
|
||||
const update = this.doc.export({ mode: 'update' })
|
||||
this.socket.emit('graph_update', update)
|
||||
}
|
||||
})
|
||||
|
||||
this.socket.on('graph_update', (updateData: Uint8Array) => {
|
||||
try {
|
||||
const data = new Uint8Array(updateData)
|
||||
this.doc.import(data)
|
||||
}
|
||||
catch (error) {
|
||||
console.error('Error importing graph update:', error)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
destroy() {
|
||||
this.socket.off('graph_update')
|
||||
}
|
||||
}
|
||||
|
||||
class CollaborationManager {
|
||||
private doc: LoroDoc | null = null
|
||||
private provider: LoroSocketIOProvider | null = null
|
||||
private nodesMap: any = null
|
||||
private edgesMap: any = null
|
||||
|
||||
init(appId: string, reactFlowStore: any) {
|
||||
const { getSocket } = useWebSocketStore.getState()
|
||||
const socket = getSocket(appId)
|
||||
this.doc = new LoroDoc()
|
||||
this.nodesMap = this.doc.getMap('nodes')
|
||||
this.edgesMap = this.doc.getMap('edges')
|
||||
this.provider = new LoroSocketIOProvider(socket, this.doc)
|
||||
|
||||
this.setupSubscriptions(reactFlowStore)
|
||||
}
|
||||
|
||||
private setupSubscriptions(reactFlowStore: any) {
|
||||
this.nodesMap?.subscribe((event: any) => {
|
||||
console.log('nodesMap', event)
|
||||
if (event.by === 'import') {
|
||||
requestAnimationFrame(() => {
|
||||
const { setNodes } = reactFlowStore.getState()
|
||||
const updatedNodes = Array.from(this.nodesMap.values())
|
||||
setNodes(updatedNodes)
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
this.edgesMap?.subscribe((event: any) => {
|
||||
if (event.by === 'import') {
|
||||
requestAnimationFrame(() => {
|
||||
const { setEdges } = reactFlowStore.getState()
|
||||
const updatedEdges = Array.from(this.edgesMap.values())
|
||||
setEdges(updatedEdges)
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
getNodes() {
|
||||
return this.nodesMap ? Array.from(this.nodesMap.values()) : []
|
||||
}
|
||||
|
||||
getEdges() {
|
||||
return this.edgesMap ? Array.from(this.edgesMap.values()) : []
|
||||
}
|
||||
|
||||
private getPersistentNodeData = (node: Node) => {
|
||||
const { data, ...rest } = node
|
||||
const filteredData = Object.fromEntries(
|
||||
Object.entries(data).filter(([key]) => !key.startsWith('_')),
|
||||
)
|
||||
return { ...rest, data: filteredData }
|
||||
}
|
||||
|
||||
setNodes = (oldNodes: Node[], newNodes: Node[]) => {
|
||||
if (!this.nodesMap || !this.doc) return
|
||||
|
||||
const oldNodesMap = new Map(oldNodes.map(node => [node.id, node]))
|
||||
const newNodesMap = new Map(newNodes.map(node => [node.id, node]))
|
||||
|
||||
oldNodes.forEach((oldNode) => {
|
||||
if (!newNodesMap.has(oldNode.id))
|
||||
this.nodesMap.delete(oldNode.id)
|
||||
})
|
||||
|
||||
newNodes.forEach((newNode) => {
|
||||
const oldNode = oldNodesMap.get(newNode.id)
|
||||
if (!oldNode) {
|
||||
this.nodesMap.set(newNode.id, newNode)
|
||||
}
|
||||
else {
|
||||
const oldPersistentData = this.getPersistentNodeData(oldNode)
|
||||
const newPersistentData = this.getPersistentNodeData(newNode)
|
||||
if (!isEqual(oldPersistentData, newPersistentData))
|
||||
this.nodesMap.set(newNode.id, newPersistentData)
|
||||
}
|
||||
})
|
||||
|
||||
this.doc.commit()
|
||||
}
|
||||
|
||||
setEdges = (oldEdges: Edge[], newEdges: Edge[]) => {
|
||||
if (!this.edgesMap || !this.doc) return
|
||||
|
||||
const oldEdgesMap = new Map(oldEdges.map(edge => [edge.id, edge]))
|
||||
const newEdgesMap = new Map(newEdges.map(edge => [edge.id, edge]))
|
||||
|
||||
oldEdges.forEach((oldEdge) => {
|
||||
if (!newEdgesMap.has(oldEdge.id))
|
||||
this.edgesMap.delete(oldEdge.id)
|
||||
})
|
||||
|
||||
newEdges.forEach((newEdge) => {
|
||||
const oldEdge = oldEdgesMap.get(newEdge.id)
|
||||
if (!oldEdge)
|
||||
this.edgesMap.set(newEdge.id, newEdge)
|
||||
else if (!isEqual(oldEdge, newEdge))
|
||||
this.edgesMap.set(newEdge.id, newEdge)
|
||||
})
|
||||
|
||||
this.doc.commit()
|
||||
}
|
||||
|
||||
destroy() {
|
||||
this.provider?.destroy()
|
||||
this.doc = null
|
||||
this.provider = null
|
||||
this.nodesMap = null
|
||||
this.edgesMap = null
|
||||
}
|
||||
}
|
||||
|
||||
export const collaborationManager = new CollaborationManager()
|
||||
|
|
@ -0,0 +1,41 @@
|
|||
import { useCallback } from 'react'
|
||||
import { useStoreApi } from 'reactflow'
|
||||
import type { Edge, Node } from '../types'
|
||||
import { collaborationManager } from '../collaboration/manage'
|
||||
|
||||
export const useCollaborativeWorkflow = () => {
|
||||
const store = useStoreApi()
|
||||
const { setNodes: collabSetNodes, setEdges: collabSetEdges } = collaborationManager
|
||||
|
||||
const setNodes = useCallback((newNodes: Node[]) => {
|
||||
const { getNodes, setNodes: reactFlowSetNodes } = store.getState()
|
||||
const oldNodes = getNodes()
|
||||
collabSetNodes(oldNodes, newNodes)
|
||||
reactFlowSetNodes(newNodes)
|
||||
}, [store, collabSetNodes])
|
||||
|
||||
const setEdges = useCallback((newEdges: Edge[]) => {
|
||||
const { edges, setEdges: reactFlowSetEdges } = store.getState()
|
||||
collabSetEdges(edges, newEdges)
|
||||
reactFlowSetEdges(newEdges)
|
||||
}, [store, collabSetEdges])
|
||||
|
||||
const collaborativeStore = useCallback(() => {
|
||||
const state = store.getState()
|
||||
return {
|
||||
|
||||
nodes: state.getNodes(),
|
||||
edges: state.edges,
|
||||
|
||||
setNodes,
|
||||
setEdges,
|
||||
|
||||
}
|
||||
}, [store, setNodes, setEdges])
|
||||
|
||||
return {
|
||||
getState: collaborativeStore,
|
||||
setNodes,
|
||||
setEdges,
|
||||
}
|
||||
}
|
||||
|
|
@ -61,11 +61,12 @@ import {
|
|||
} from './use-workflow'
|
||||
import { WorkflowHistoryEvent, useWorkflowHistory } from './use-workflow-history'
|
||||
import useInspectVarsCrud from './use-inspect-vars-crud'
|
||||
import { useCollaborationStore } from '@/app/components/workflow/store/collaboration-store'
|
||||
import { useCollaborativeWorkflow } from './use-collaborative-workflow'
|
||||
|
||||
export const useNodesInteractions = () => {
|
||||
const { t } = useTranslation()
|
||||
const store = useStoreApi()
|
||||
const collaborativeWorkflow = useCollaborativeWorkflow()
|
||||
const workflowStore = useWorkflowStore()
|
||||
const reactflow = useReactFlow()
|
||||
const { store: workflowHistoryStore } = useWorkflowHistoryStore()
|
||||
|
|
@ -114,13 +115,9 @@ export const useNodesInteractions = () => {
|
|||
if (node.type === CUSTOM_LOOP_START_NODE)
|
||||
return
|
||||
|
||||
const {
|
||||
getNodes,
|
||||
setNodes,
|
||||
} = store.getState()
|
||||
e.stopPropagation()
|
||||
|
||||
const nodes = getNodes()
|
||||
const { nodes, setNodes } = collaborativeWorkflow.getState()
|
||||
|
||||
const { restrictPosition } = handleNodeIterationChildDrag(node)
|
||||
const { restrictPosition: restrictLoopPosition } = handleNodeLoopChildDrag(node)
|
||||
|
|
@ -188,13 +185,7 @@ export const useNodesInteractions = () => {
|
|||
if (node.type === CUSTOM_LOOP_START_NODE || node.type === CUSTOM_NOTE_NODE)
|
||||
return
|
||||
|
||||
const {
|
||||
getNodes,
|
||||
setNodes,
|
||||
edges,
|
||||
setEdges,
|
||||
} = store.getState()
|
||||
const nodes = getNodes()
|
||||
const { nodes, edges, setNodes, setEdges } = collaborativeWorkflow.getState()
|
||||
const {
|
||||
connectingNodePayload,
|
||||
setEnteringNodePayload,
|
||||
|
|
@ -269,13 +260,9 @@ export const useNodesInteractions = () => {
|
|||
setEnteringNodePayload,
|
||||
} = workflowStore.getState()
|
||||
setEnteringNodePayload(undefined)
|
||||
const {
|
||||
getNodes,
|
||||
setNodes,
|
||||
edges,
|
||||
setEdges,
|
||||
} = store.getState()
|
||||
const newNodes = produce(getNodes(), (draft) => {
|
||||
|
||||
const { nodes, edges, setNodes, setEdges } = collaborativeWorkflow.getState()
|
||||
const newNodes = produce(nodes, (draft) => {
|
||||
draft.forEach((node) => {
|
||||
node.data._isEntering = false
|
||||
node.data._inParallelHovering = false
|
||||
|
|
@ -284,7 +271,8 @@ export const useNodesInteractions = () => {
|
|||
setNodes(newNodes)
|
||||
const newEdges = produce(edges, (draft) => {
|
||||
draft.forEach((edge) => {
|
||||
edge.data._connectedNodeIsHovering = false
|
||||
if (edge.data)
|
||||
edge.data._connectedNodeIsHovering = false
|
||||
})
|
||||
})
|
||||
setEdges(newEdges)
|
||||
|
|
@ -293,14 +281,8 @@ export const useNodesInteractions = () => {
|
|||
const handleNodeSelect = useCallback((nodeId: string, cancelSelection?: boolean, initShowLastRunTab?: boolean) => {
|
||||
if(initShowLastRunTab)
|
||||
workflowStore.setState({ initShowLastRunTab: true })
|
||||
const {
|
||||
getNodes,
|
||||
setNodes,
|
||||
edges,
|
||||
setEdges,
|
||||
} = store.getState()
|
||||
|
||||
const nodes = getNodes()
|
||||
const { nodes, edges, setNodes, setEdges } = collaborativeWorkflow.getState()
|
||||
const selectedNode = nodes.find(node => node.data.selected)
|
||||
|
||||
if (!cancelSelection && selectedNode?.id === nodeId)
|
||||
|
|
@ -357,13 +339,7 @@ export const useNodesInteractions = () => {
|
|||
if (getNodesReadOnly())
|
||||
return
|
||||
|
||||
const {
|
||||
getNodes,
|
||||
setNodes,
|
||||
edges,
|
||||
setEdges,
|
||||
} = store.getState()
|
||||
const nodes = getNodes()
|
||||
const { nodes, edges, setNodes, setEdges } = collaborativeWorkflow.getState()
|
||||
const targetNode = nodes.find(node => node.id === target!)
|
||||
const sourceNode = nodes.find(node => node.id === source!)
|
||||
|
||||
|
|
@ -440,8 +416,8 @@ export const useNodesInteractions = () => {
|
|||
|
||||
if (nodeId && handleType) {
|
||||
const { setConnectingNodePayload } = workflowStore.getState()
|
||||
const { getNodes } = store.getState()
|
||||
const node = getNodes().find(n => n.id === nodeId)!
|
||||
const { nodes } = collaborativeWorkflow.getState()
|
||||
const node = nodes.find(n => n.id === nodeId)!
|
||||
|
||||
if (node.type === CUSTOM_NOTE_NODE)
|
||||
return
|
||||
|
|
@ -476,11 +452,7 @@ export const useNodesInteractions = () => {
|
|||
hoveringAssignVariableGroupId,
|
||||
} = workflowStore.getState()
|
||||
const { screenToFlowPosition } = reactflow
|
||||
const {
|
||||
getNodes,
|
||||
setNodes,
|
||||
} = store.getState()
|
||||
const nodes = getNodes()
|
||||
const { nodes, setNodes } = collaborativeWorkflow.getState()
|
||||
const fromHandleType = connectingNodePayload.handleType
|
||||
const fromHandleId = connectingNodePayload.handleId
|
||||
const fromNode = nodes.find(n => n.id === connectingNodePayload.nodeId)!
|
||||
|
|
@ -540,14 +512,7 @@ export const useNodesInteractions = () => {
|
|||
if (getNodesReadOnly())
|
||||
return
|
||||
|
||||
const {
|
||||
getNodes,
|
||||
setNodes,
|
||||
edges,
|
||||
setEdges,
|
||||
} = store.getState()
|
||||
|
||||
const nodes = getNodes()
|
||||
const { nodes, edges, setNodes, setEdges } = collaborativeWorkflow.getState()
|
||||
const currentNodeIndex = nodes.findIndex(node => node.id === nodeId)
|
||||
const currentNode = nodes[currentNodeIndex]
|
||||
|
||||
|
|
@ -681,13 +646,7 @@ export const useNodesInteractions = () => {
|
|||
if (getNodesReadOnly())
|
||||
return
|
||||
|
||||
const {
|
||||
getNodes,
|
||||
setNodes,
|
||||
edges,
|
||||
setEdges,
|
||||
} = store.getState()
|
||||
const nodes = getNodes()
|
||||
const { nodes, edges, setNodes, setEdges } = collaborativeWorkflow.getState()
|
||||
const nodesWithSameType = nodes.filter(node => node.data.type === nodeType)
|
||||
const {
|
||||
newNode,
|
||||
|
|
@ -1111,13 +1070,7 @@ export const useNodesInteractions = () => {
|
|||
if (getNodesReadOnly())
|
||||
return
|
||||
|
||||
const {
|
||||
getNodes,
|
||||
setNodes,
|
||||
edges,
|
||||
setEdges,
|
||||
} = store.getState()
|
||||
const nodes = getNodes()
|
||||
const { nodes, edges, setNodes, setEdges } = collaborativeWorkflow.getState()
|
||||
const currentNode = nodes.find(node => node.id === currentNodeId)!
|
||||
const connectedEdges = getConnectedEdges([currentNode], edges)
|
||||
const nodesWithSameType = nodes.filter(node => node.data.type === nodeType)
|
||||
|
|
@ -1185,12 +1138,7 @@ export const useNodesInteractions = () => {
|
|||
}, [getNodesReadOnly, store, t, handleSyncWorkflowDraft, saveStateToHistory])
|
||||
|
||||
const handleNodesCancelSelected = useCallback(() => {
|
||||
const {
|
||||
getNodes,
|
||||
setNodes,
|
||||
} = store.getState()
|
||||
|
||||
const nodes = getNodes()
|
||||
const { nodes, setNodes } = collaborativeWorkflow.getState()
|
||||
const newNodes = produce(nodes, (draft) => {
|
||||
draft.forEach((node) => {
|
||||
node.data.selected = false
|
||||
|
|
@ -1225,11 +1173,7 @@ export const useNodesInteractions = () => {
|
|||
|
||||
const { setClipboardElements } = workflowStore.getState()
|
||||
|
||||
const {
|
||||
getNodes,
|
||||
} = store.getState()
|
||||
|
||||
const nodes = getNodes()
|
||||
const { nodes } = collaborativeWorkflow.getState()
|
||||
|
||||
if (nodeId) {
|
||||
// If nodeId is provided, copy that specific node
|
||||
|
|
@ -1264,16 +1208,10 @@ export const useNodesInteractions = () => {
|
|||
mousePosition,
|
||||
} = workflowStore.getState()
|
||||
|
||||
const {
|
||||
getNodes,
|
||||
setNodes,
|
||||
edges,
|
||||
setEdges,
|
||||
} = store.getState()
|
||||
const { nodes, edges, setNodes, setEdges } = collaborativeWorkflow.getState()
|
||||
|
||||
const nodesToPaste: Node[] = []
|
||||
const edgesToPaste: Edge[] = []
|
||||
const nodes = getNodes()
|
||||
|
||||
if (clipboardElements.length) {
|
||||
const { x, y } = getTopLeftNodePosition(clipboardElements)
|
||||
|
|
@ -1382,12 +1320,8 @@ export const useNodesInteractions = () => {
|
|||
if (getNodesReadOnly())
|
||||
return
|
||||
|
||||
const {
|
||||
getNodes,
|
||||
edges,
|
||||
} = store.getState()
|
||||
const { nodes, edges } = collaborativeWorkflow.getState()
|
||||
|
||||
const nodes = getNodes()
|
||||
const bundledNodes = nodes.filter(node => node.data._isBundled && node.data.type !== BlockEnum.Start)
|
||||
|
||||
if (bundledNodes.length) {
|
||||
|
|
@ -1410,13 +1344,9 @@ export const useNodesInteractions = () => {
|
|||
if (getNodesReadOnly())
|
||||
return
|
||||
|
||||
const {
|
||||
getNodes,
|
||||
setNodes,
|
||||
} = store.getState()
|
||||
const { nodes, setNodes } = collaborativeWorkflow.getState()
|
||||
const { x, y, width, height } = params
|
||||
|
||||
const nodes = getNodes()
|
||||
const currentNode = nodes.find(n => n.id === nodeId)!
|
||||
const childrenNodes = nodes.filter(n => currentNode.data._children?.find((c: any) => c.nodeId === n.id))
|
||||
let rightNode: Node
|
||||
|
|
@ -1469,12 +1399,7 @@ export const useNodesInteractions = () => {
|
|||
if (getNodesReadOnly())
|
||||
return
|
||||
|
||||
const {
|
||||
getNodes,
|
||||
edges,
|
||||
} = store.getState()
|
||||
const nodes = getNodes()
|
||||
const { setNodes, setEdges } = useCollaborationStore.getState()
|
||||
const { nodes, edges, setNodes, setEdges } = collaborativeWorkflow.getState()
|
||||
const currentNode = nodes.find(node => node.id === nodeId)!
|
||||
const connectedEdges = getConnectedEdges([currentNode], edges)
|
||||
const nodesConnectedSourceOrTargetHandleIdsMap = getNodesConnectedSourceOrTargetHandleIdsMap(
|
||||
|
|
@ -1504,7 +1429,7 @@ export const useNodesInteractions = () => {
|
|||
if (getNodesReadOnly() || getWorkflowReadOnly())
|
||||
return
|
||||
|
||||
const { setEdges, setNodes } = store.getState()
|
||||
const { setNodes, setEdges } = collaborativeWorkflow.getState()
|
||||
undo()
|
||||
|
||||
const { edges, nodes } = workflowHistoryStore.getState()
|
||||
|
|
@ -1519,7 +1444,7 @@ export const useNodesInteractions = () => {
|
|||
if (getNodesReadOnly() || getWorkflowReadOnly())
|
||||
return
|
||||
|
||||
const { setEdges, setNodes } = store.getState()
|
||||
const { setNodes, setEdges } = collaborativeWorkflow.getState()
|
||||
redo()
|
||||
|
||||
const { edges, nodes } = workflowHistoryStore.getState()
|
||||
|
|
|
|||
|
|
@ -83,7 +83,6 @@ import Confirm from '@/app/components/base/confirm'
|
|||
import DatasetsDetailProvider from './datasets-detail-store/provider'
|
||||
import { HooksStoreContextProvider } from './hooks-store'
|
||||
import type { Shape as HooksStoreShape } from './hooks-store'
|
||||
import { useCollaborationStore } from '@/app/components/workflow/store/collaboration-store'
|
||||
|
||||
const nodeTypes = {
|
||||
[CUSTOM_NODE]: CustomNode,
|
||||
|
|
@ -128,21 +127,6 @@ export const Workflow: FC<WorkflowProps> = memo(({
|
|||
return workflowCanvasHeight - bottomPanelHeight
|
||||
}, [workflowCanvasHeight, bottomPanelHeight])
|
||||
|
||||
const collaborationNodes = useCollaborationStore((state) => {
|
||||
return state.nodes
|
||||
})
|
||||
const collaborationEdges = useCollaborationStore((state) => {
|
||||
return state.edges
|
||||
})
|
||||
|
||||
useEffect(() => {
|
||||
setNodes(collaborationNodes)
|
||||
}, [collaborationNodes, setNodes])
|
||||
|
||||
useEffect(() => {
|
||||
setEdges(collaborationEdges)
|
||||
}, [collaborationEdges, setEdges])
|
||||
|
||||
// update workflow Canvas width and height
|
||||
useEffect(() => {
|
||||
if (workflowContainerRef.current) {
|
||||
|
|
|
|||
|
|
@ -1,194 +0,0 @@
|
|||
import { create } from 'zustand'
|
||||
|
||||
import { LoroDoc } from 'loro-crdt'
|
||||
import { isEqual } from 'lodash-es'
|
||||
import type { Edge, Node } from '../types'
|
||||
import { useWebSocketStore } from './websocket-store'
|
||||
|
||||
class LoroSocketIOProvider {
|
||||
private doc: any
|
||||
private socket: any
|
||||
|
||||
constructor(socket: any, doc: any) {
|
||||
this.socket = socket
|
||||
this.doc = doc
|
||||
|
||||
this.setupEventListeners()
|
||||
}
|
||||
|
||||
private setupEventListeners() {
|
||||
this.doc.subscribe((event: any) => {
|
||||
if (event.origin !== 'remote') {
|
||||
const update = this.doc.export({ mode: 'update' })
|
||||
this.socket.emit('graph_update', update)
|
||||
}
|
||||
})
|
||||
|
||||
this.socket.on('graph_update', (updateData: Uint8Array) => {
|
||||
try {
|
||||
const data = new Uint8Array(updateData)
|
||||
this.doc.import(data)
|
||||
}
|
||||
catch (error) {
|
||||
console.error('Error importing graph update:', error)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
destroy() {
|
||||
this.socket.off('graph_update')
|
||||
}
|
||||
}
|
||||
|
||||
type CollaborationStore = {
|
||||
loroDoc: any | null
|
||||
provider: LoroSocketIOProvider | null
|
||||
nodesMap: any | null
|
||||
edgesMap: any | null
|
||||
nodes: Node[]
|
||||
edges: Edge[]
|
||||
updateNodes?: () => void
|
||||
updateEdges?: () => void
|
||||
|
||||
setNodes: (newNodes: Node[]) => void
|
||||
setEdges: (newEdges: Edge[]) => void
|
||||
initCollaboration: (appId: string) => void
|
||||
destroyCollaboration: () => void
|
||||
}
|
||||
|
||||
export const useCollaborationStore = create<CollaborationStore>((set, get) => ({
|
||||
loroDoc: null,
|
||||
provider: null,
|
||||
nodesMap: null,
|
||||
edgesMap: null,
|
||||
nodes: [],
|
||||
edges: [],
|
||||
|
||||
setNodes: (newNodes: Node[]) => {
|
||||
const { nodes: oldNodes, nodesMap, loroDoc } = get()
|
||||
|
||||
const oldNodesMap = new Map(oldNodes.map(node => [node.id, node]))
|
||||
const newNodesMap = new Map(newNodes.map(node => [node.id, node]))
|
||||
|
||||
const getPersistentNodeData = (node: Node) => {
|
||||
const { data, ...rest } = node
|
||||
const filteredData = Object.fromEntries(
|
||||
Object.entries(data).filter(([key]) =>
|
||||
!key.startsWith('_') && key !== 'selected',
|
||||
),
|
||||
)
|
||||
|
||||
return {
|
||||
...rest,
|
||||
data: filteredData,
|
||||
}
|
||||
}
|
||||
|
||||
// delete
|
||||
oldNodes.forEach((oldNode) => {
|
||||
if (!newNodesMap.has(oldNode.id))
|
||||
nodesMap.delete(oldNode.id)
|
||||
})
|
||||
|
||||
newNodes.forEach((newNode) => {
|
||||
const oldNode = oldNodesMap.get(newNode.id)
|
||||
if (!oldNode) {
|
||||
// add
|
||||
nodesMap.set(newNode.id, newNode)
|
||||
}
|
||||
else {
|
||||
const oldPersistentData = getPersistentNodeData(oldNode)
|
||||
const newPersistentData = getPersistentNodeData(newNode)
|
||||
|
||||
if (!isEqual(oldPersistentData, newPersistentData)) {
|
||||
// update
|
||||
nodesMap.set(newNode.id, newNode)
|
||||
}
|
||||
}
|
||||
})
|
||||
loroDoc.commit()
|
||||
},
|
||||
|
||||
setEdges: (newEdges: Edge[]) => {
|
||||
const { edges: oldEdges, edgesMap, loroDoc } = get()
|
||||
|
||||
const oldEdgesMap = new Map(oldEdges.map(edge => [edge.id, edge]))
|
||||
const newEdgesMap = new Map(newEdges.map(edge => [edge.id, edge]))
|
||||
|
||||
// delete
|
||||
oldEdges.forEach((oldEdge) => {
|
||||
if (!newEdgesMap.has(oldEdge.id))
|
||||
edgesMap.delete(oldEdge.id)
|
||||
})
|
||||
|
||||
newEdges.forEach((newEdge) => {
|
||||
const oldEdge = oldEdgesMap.get(newEdge.id)
|
||||
if (!oldEdge) {
|
||||
// add
|
||||
edgesMap.set(newEdge.id, newEdge)
|
||||
}
|
||||
else if (!isEqual(oldEdge, newEdge)) {
|
||||
// update
|
||||
edgesMap.set(newEdge.id, newEdge)
|
||||
}
|
||||
})
|
||||
|
||||
loroDoc.commit()
|
||||
},
|
||||
|
||||
initCollaboration: (appId: string) => {
|
||||
const { getSocket } = useWebSocketStore.getState()
|
||||
const socket = getSocket(appId)
|
||||
const doc = new LoroDoc()
|
||||
const nodesMap = doc.getMap('nodes')
|
||||
const edgesMap = doc.getMap('edges')
|
||||
|
||||
const updateNodes = () => {
|
||||
const nodes = Array.from(nodesMap.values())
|
||||
set({ nodes })
|
||||
}
|
||||
|
||||
const updateEdges = () => {
|
||||
const edges = Array.from(edgesMap.values())
|
||||
set({ edges })
|
||||
}
|
||||
|
||||
const provider = new LoroSocketIOProvider(socket, doc)
|
||||
|
||||
set({
|
||||
loroDoc: doc,
|
||||
provider,
|
||||
nodesMap,
|
||||
edgesMap,
|
||||
nodes: [],
|
||||
edges: [],
|
||||
updateNodes,
|
||||
updateEdges,
|
||||
})
|
||||
|
||||
nodesMap.subscribe((event: any) => {
|
||||
console.log('NodesMap changed:', event)
|
||||
updateNodes()
|
||||
})
|
||||
|
||||
edgesMap.subscribe((event: any) => {
|
||||
console.log('EdgesMap changed:', event)
|
||||
updateEdges()
|
||||
})
|
||||
},
|
||||
|
||||
destroyCollaboration: () => {
|
||||
const { provider } = get()
|
||||
if (provider) {
|
||||
provider.destroy()
|
||||
set({
|
||||
loroDoc: null,
|
||||
provider: null,
|
||||
nodesMap: null,
|
||||
edgesMap: null,
|
||||
nodes: [],
|
||||
edges: [],
|
||||
})
|
||||
}
|
||||
},
|
||||
}))
|
||||
|
|
@ -26,6 +26,7 @@ export type WorkflowDraftSliceShape = {
|
|||
export const createWorkflowDraftSlice: StateCreator<WorkflowDraftSliceShape> = set => ({
|
||||
backupDraft: undefined,
|
||||
setBackupDraft: backupDraft => set(() => ({ backupDraft })),
|
||||
// TODO: hjlarry test collaboration
|
||||
debouncedSyncWorkflowDraft: debounce((syncWorkflowDraft) => {
|
||||
syncWorkflowDraft()
|
||||
}, 500000),
|
||||
|
|
|
|||
Loading…
Reference in New Issue