mirror of https://github.com/langgenius/dify.git
add leader submit graph data
This commit is contained in:
parent
7dc8557033
commit
d44be2d835
|
|
@ -13,6 +13,7 @@ import { syncWorkflowDraft } from '@/service/workflow'
|
|||
import { useFeaturesStore } from '@/app/components/base/features/hooks'
|
||||
import { API_PREFIX } from '@/config'
|
||||
import { useWorkflowRefreshDraft } from '.'
|
||||
import { collaborationManager } from '@/app/components/workflow/collaboration/core/collaboration-manager'
|
||||
|
||||
export const useNodesSyncDraft = () => {
|
||||
const store = useStoreApi()
|
||||
|
|
@ -93,9 +94,20 @@ export const useNodesSyncDraft = () => {
|
|||
const syncWorkflowDraftWhenPageClose = useCallback(() => {
|
||||
if (getNodesReadOnly())
|
||||
return
|
||||
|
||||
// Check leader status at sync time
|
||||
const currentIsLeader = collaborationManager.getIsLeader()
|
||||
|
||||
// Only allow leader to sync data
|
||||
if (!currentIsLeader) {
|
||||
console.log('Not leader, skipping sync on page close')
|
||||
return
|
||||
}
|
||||
|
||||
const postParams = getPostParams()
|
||||
|
||||
if (postParams) {
|
||||
console.log('Leader syncing workflow draft on page close')
|
||||
navigator.sendBeacon(
|
||||
`${API_PREFIX}/apps/${params.appId}/workflows/draft?_token=${localStorage.getItem('console_token')}`,
|
||||
JSON.stringify(postParams.params),
|
||||
|
|
@ -113,6 +125,18 @@ export const useNodesSyncDraft = () => {
|
|||
) => {
|
||||
if (getNodesReadOnly())
|
||||
return
|
||||
|
||||
// Check leader status at sync time
|
||||
const currentIsLeader = collaborationManager.getIsLeader()
|
||||
|
||||
// Only allow leader to sync data
|
||||
if (!currentIsLeader) {
|
||||
console.log('Not leader, skipping workflow draft sync')
|
||||
callback?.onSettled?.()
|
||||
return
|
||||
}
|
||||
|
||||
console.log('Leader performing workflow draft sync')
|
||||
const postParams = getPostParams()
|
||||
|
||||
if (postParams) {
|
||||
|
|
@ -124,9 +148,11 @@ export const useNodesSyncDraft = () => {
|
|||
const res = await syncWorkflowDraft(postParams)
|
||||
setSyncWorkflowDraftHash(res.hash)
|
||||
setDraftUpdatedAt(res.updated_at)
|
||||
console.log('Leader successfully synced workflow draft')
|
||||
callback?.onSuccess && callback.onSuccess()
|
||||
}
|
||||
catch (error: any) {
|
||||
console.error('Leader failed to sync workflow draft:', error)
|
||||
if (error && error.json && !error.bodyUsed) {
|
||||
error.json().then((err: any) => {
|
||||
if (err.code === 'draft_workflow_not_sync' && !notRefreshWhenSyncError)
|
||||
|
|
|
|||
|
|
@ -17,6 +17,7 @@ export class CollaborationManager {
|
|||
private cursors: Record<string, CursorPosition> = {}
|
||||
private isLeader = false
|
||||
private leaderId: string | null = null
|
||||
private activeConnections = new Set<string>()
|
||||
|
||||
init = (appId: string, reactFlowStore: any): void => {
|
||||
if (!reactFlowStore) {
|
||||
|
|
@ -42,25 +43,57 @@ export class CollaborationManager {
|
|||
this.disconnect()
|
||||
}
|
||||
|
||||
async connect(appId: string, reactFlowStore: any): Promise<void> {
|
||||
if (this.currentAppId === appId && this.doc) return
|
||||
async connect(appId: string, reactFlowStore?: any): Promise<string> {
|
||||
const connectionId = Math.random().toString(36).substring(2, 11)
|
||||
|
||||
this.disconnect()
|
||||
this.activeConnections.add(connectionId)
|
||||
|
||||
if (this.currentAppId === appId && this.doc) {
|
||||
// Already connected to the same app, only update store if provided and we don't have one
|
||||
if (reactFlowStore && !this.reactFlowStore)
|
||||
this.reactFlowStore = reactFlowStore
|
||||
|
||||
return connectionId
|
||||
}
|
||||
|
||||
// Only disconnect if switching to a different app
|
||||
if (this.currentAppId && this.currentAppId !== appId)
|
||||
this.forceDisconnect()
|
||||
|
||||
this.currentAppId = appId
|
||||
this.reactFlowStore = reactFlowStore
|
||||
// Only set store if provided
|
||||
if (reactFlowStore)
|
||||
this.reactFlowStore = reactFlowStore
|
||||
|
||||
const socket = webSocketClient.connect(appId)
|
||||
|
||||
// Setup event listeners BEFORE any other operations
|
||||
this.setupSocketEventListeners(socket)
|
||||
|
||||
this.doc = new LoroDoc()
|
||||
this.nodesMap = this.doc.getMap('nodes')
|
||||
this.edgesMap = this.doc.getMap('edges')
|
||||
this.provider = new CRDTProvider(socket, this.doc)
|
||||
|
||||
this.setupSubscriptions()
|
||||
this.setupSocketEventListeners(socket)
|
||||
|
||||
// Force user_connect if already connected
|
||||
if (socket.connected)
|
||||
socket.emit('user_connect', { workflow_id: appId })
|
||||
|
||||
return connectionId
|
||||
}
|
||||
|
||||
disconnect = (): void => {
|
||||
disconnect = (connectionId?: string): void => {
|
||||
if (connectionId)
|
||||
this.activeConnections.delete(connectionId)
|
||||
|
||||
// Only disconnect when no more connections
|
||||
if (this.activeConnections.size === 0)
|
||||
this.forceDisconnect()
|
||||
}
|
||||
|
||||
private forceDisconnect = (): void => {
|
||||
if (this.currentAppId)
|
||||
webSocketClient.disconnect(this.currentAppId)
|
||||
|
||||
|
|
@ -72,8 +105,16 @@ export class CollaborationManager {
|
|||
this.currentAppId = null
|
||||
this.reactFlowStore = null
|
||||
this.cursors = {}
|
||||
|
||||
// Only reset leader status when actually disconnecting
|
||||
const wasLeader = this.isLeader
|
||||
this.isLeader = false
|
||||
this.leaderId = null
|
||||
|
||||
if (wasLeader)
|
||||
this.eventEmitter.emit('leaderChange', false)
|
||||
|
||||
this.activeConnections.clear()
|
||||
this.eventEmitter.removeAllListeners()
|
||||
}
|
||||
|
||||
|
|
@ -131,6 +172,17 @@ export class CollaborationManager {
|
|||
return this.isLeader
|
||||
}
|
||||
|
||||
debugLeaderStatus(): void {
|
||||
console.log('=== Leader Status Debug ===')
|
||||
console.log('Current leader status:', this.isLeader)
|
||||
console.log('Current leader ID:', this.leaderId)
|
||||
console.log('Active connections:', this.activeConnections.size)
|
||||
console.log('Connected:', this.isConnected())
|
||||
console.log('Current app ID:', this.currentAppId)
|
||||
console.log('Has ReactFlow store:', !!this.reactFlowStore)
|
||||
console.log('========================')
|
||||
}
|
||||
|
||||
private syncNodes(oldNodes: Node[], newNodes: Node[]): void {
|
||||
if (!this.nodesMap) return
|
||||
|
||||
|
|
@ -254,12 +306,6 @@ export class CollaborationManager {
|
|||
if (data.leader && typeof data.leader === 'string')
|
||||
this.leaderId = data.leader
|
||||
|
||||
console.log('Updated online users and leader info:', {
|
||||
users: data.users,
|
||||
leader: data.leader,
|
||||
currentLeader: this.leaderId,
|
||||
})
|
||||
|
||||
this.eventEmitter.emit('onlineUsers', data.users)
|
||||
this.eventEmitter.emit('cursors', { ...this.cursors })
|
||||
}
|
||||
|
|
@ -278,8 +324,6 @@ export class CollaborationManager {
|
|||
const wasLeader = this.isLeader
|
||||
this.isLeader = data.isLeader
|
||||
|
||||
console.log(`Leader status update: ${wasLeader ? 'was' : 'was not'} leader, ${this.isLeader ? 'now is' : 'now is not'} leader`)
|
||||
|
||||
if (wasLeader !== this.isLeader)
|
||||
this.eventEmitter.emit('leaderChange', this.isLeader)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,6 +16,8 @@ export function useCollaboration(appId: string, reactFlowStore?: any) {
|
|||
useEffect(() => {
|
||||
if (!appId) return
|
||||
|
||||
let connectionId: string | null = null
|
||||
|
||||
if (!cursorServiceRef.current) {
|
||||
cursorServiceRef.current = new CursorService({
|
||||
minMoveDistance: 10,
|
||||
|
|
@ -24,7 +26,7 @@ export function useCollaboration(appId: string, reactFlowStore?: any) {
|
|||
}
|
||||
|
||||
const initCollaboration = async () => {
|
||||
await collaborationManager.connect(appId, reactFlowStore)
|
||||
connectionId = await collaborationManager.connect(appId, reactFlowStore)
|
||||
setState((prev: any) => ({ ...prev, appId, isConnected: collaborationManager.isConnected() }))
|
||||
}
|
||||
|
||||
|
|
@ -55,7 +57,8 @@ export function useCollaboration(appId: string, reactFlowStore?: any) {
|
|||
unsubscribeUsers()
|
||||
unsubscribeLeaderChange()
|
||||
cursorServiceRef.current?.stopTracking()
|
||||
collaborationManager.disconnect()
|
||||
if (connectionId)
|
||||
collaborationManager.disconnect(connectionId)
|
||||
}
|
||||
}, [appId, reactFlowStore])
|
||||
|
||||
|
|
@ -71,7 +74,7 @@ export function useCollaboration(appId: string, reactFlowStore?: any) {
|
|||
cursorServiceRef.current?.stopTracking()
|
||||
}
|
||||
|
||||
return {
|
||||
const result = {
|
||||
isConnected: state.isConnected || false,
|
||||
onlineUsers: state.onlineUsers || [],
|
||||
cursors: state.cursors || {},
|
||||
|
|
@ -80,4 +83,6 @@ export function useCollaboration(appId: string, reactFlowStore?: any) {
|
|||
startCursorTracking,
|
||||
stopCursorTracking,
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,61 +0,0 @@
|
|||
import React from 'react'
|
||||
import { useCollaboration } from '../hooks/use-collaboration'
|
||||
|
||||
type LeaderTestProps = {
|
||||
appId: string
|
||||
}
|
||||
|
||||
export function LeaderTest({ appId }: LeaderTestProps) {
|
||||
const { isConnected, isLeader, leaderId, onlineUsers } = useCollaboration(appId)
|
||||
|
||||
return (
|
||||
<div className="rounded-lg border bg-gray-50 p-4">
|
||||
<h3 className="mb-4 text-lg font-semibold">Leader Election Test</h3>
|
||||
|
||||
<div className="space-y-2">
|
||||
<div className="flex items-center space-x-2">
|
||||
<span className="font-medium">Connection:</span>
|
||||
<span className={`rounded px-2 py-1 text-sm ${
|
||||
isConnected ? 'bg-green-100 text-green-800' : 'bg-red-100 text-red-800'
|
||||
}`}>
|
||||
{isConnected ? 'Connected' : 'Disconnected'}
|
||||
</span>
|
||||
</div>
|
||||
|
||||
<div className="flex items-center space-x-2">
|
||||
<span className="font-medium">I am Leader:</span>
|
||||
<span className={`rounded px-2 py-1 text-sm ${
|
||||
isLeader ? 'bg-blue-100 text-blue-800' : 'bg-gray-100 text-gray-800'
|
||||
}`}>
|
||||
{isLeader ? 'YES' : 'NO'}
|
||||
</span>
|
||||
</div>
|
||||
|
||||
<div className="flex items-center space-x-2">
|
||||
<span className="font-medium">Current Leader ID:</span>
|
||||
<span className="rounded bg-gray-100 px-2 py-1 font-mono text-sm">
|
||||
{leaderId || 'None'}
|
||||
</span>
|
||||
</div>
|
||||
|
||||
<div className="mt-4">
|
||||
<span className="font-medium">Online Users ({onlineUsers.length}):</span>
|
||||
<div className="mt-2 space-y-1">
|
||||
{onlineUsers.map((user: any) => (
|
||||
<div key={user.user_id} className="flex items-center space-x-2 text-sm">
|
||||
<span className={`h-2 w-2 rounded-full ${
|
||||
user.user_id === leaderId ? 'bg-blue-500' : 'bg-green-500'
|
||||
}`} />
|
||||
<span className="font-mono">{user.user_id}</span>
|
||||
<span>({user.username})</span>
|
||||
{user.user_id === leaderId && (
|
||||
<span className="font-medium text-blue-600">👑 Leader</span>
|
||||
)}
|
||||
</div>
|
||||
))}
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
)
|
||||
}
|
||||
Loading…
Reference in New Issue