From c3287755e39245262f87d939bc00c6bded575b9d Mon Sep 17 00:00:00 2001 From: hjlarry Date: Mon, 8 Sep 2025 09:00:20 +0800 Subject: [PATCH] add request leader to sync graph --- .../workflow-app/components/workflow-main.tsx | 36 ++++++++++++------- .../hooks/use-nodes-sync-draft.ts | 11 +++--- .../core/collaboration-manager.ts | 26 ++++++++++++++ 3 files changed, 56 insertions(+), 17 deletions(-) diff --git a/web/app/components/workflow-app/components/workflow-main.tsx b/web/app/components/workflow-app/components/workflow-main.tsx index d310d0383c..6bc2fbcc31 100644 --- a/web/app/components/workflow-app/components/workflow-main.tsx +++ b/web/app/components/workflow-app/components/workflow-main.tsx @@ -109,6 +109,19 @@ const WorkflowMain = ({ } }, [featuresStore, workflowStore]) + const { + doSyncWorkflowDraft, + syncWorkflowDraftWhenPageClose, + } = useNodesSyncDraft() + const { handleRefreshWorkflowDraft } = useWorkflowRefreshDraft() + const { + handleBackupDraft, + handleLoadBackupDraft, + handleRestoreFromPublishedWorkflow, + handleRun, + handleStopRun, + } = useWorkflowRun() + useEffect(() => { if (!appId) return @@ -125,18 +138,17 @@ const WorkflowMain = ({ return unsubscribe }, [appId, handleWorkflowDataUpdate]) - const { - doSyncWorkflowDraft, - syncWorkflowDraftWhenPageClose, - } = useNodesSyncDraft() - const { handleRefreshWorkflowDraft } = useWorkflowRefreshDraft() - const { - handleBackupDraft, - handleLoadBackupDraft, - handleRestoreFromPublishedWorkflow, - handleRun, - handleStopRun, - } = useWorkflowRun() + // Listen for sync requests from other users (only processed by leader) + useEffect(() => { + if (!appId) return + + const unsubscribe = collaborationManager.onSyncRequest(() => { + console.log('Leader received sync request, performing sync') + doSyncWorkflowDraft() + }) + + return unsubscribe + }, [appId, doSyncWorkflowDraft]) const { handleStartWorkflowRun, handleWorkflowStartRunInChatflow, diff --git a/web/app/components/workflow-app/hooks/use-nodes-sync-draft.ts b/web/app/components/workflow-app/hooks/use-nodes-sync-draft.ts index c38e60cf97..09b0cf2e19 100644 --- a/web/app/components/workflow-app/hooks/use-nodes-sync-draft.ts +++ b/web/app/components/workflow-app/hooks/use-nodes-sync-draft.ts @@ -129,9 +129,10 @@ export const useNodesSyncDraft = () => { // Check leader status at sync time const currentIsLeader = collaborationManager.getIsLeader() - // Only allow leader to sync data + // If not leader, request the leader to sync if (!currentIsLeader) { - console.log('Not leader, skipping workflow draft sync') + console.log('Not leader, requesting leader to sync workflow draft') + collaborationManager.emitSyncRequest() callback?.onSettled?.() return } @@ -155,10 +156,10 @@ export const useNodesSyncDraft = () => { console.error('Leader failed to sync workflow draft:', error) if (error && error.json && !error.bodyUsed) { error.json().then((err: any) => { - if (err.code === 'draft_workflow_not_sync' && !notRefreshWhenSyncError) - // TODO: hjlarry test collaboration - // handleRefreshWorkflowDraft() + if (err.code === 'draft_workflow_not_sync' && !notRefreshWhenSyncError) { console.error('draft_workflow_not_sync', err) + handleRefreshWorkflowDraft() + } }) } callback?.onError && callback.onError() diff --git a/web/app/components/workflow/collaboration/core/collaboration-manager.ts b/web/app/components/workflow/collaboration/core/collaboration-manager.ts index a0075ecb1f..a36fa7ce16 100644 --- a/web/app/components/workflow/collaboration/core/collaboration-manager.ts +++ b/web/app/components/workflow/collaboration/core/collaboration-manager.ts @@ -144,6 +144,24 @@ export class CollaborationManager { } } + emitSyncRequest(): void { + if (!this.currentAppId || !webSocketClient.isConnected(this.currentAppId)) return + + const socket = webSocketClient.getSocket(this.currentAppId) + if (socket) { + console.log('Emitting sync request to leader') + socket.emit('collaboration_event', { + type: 'syncRequest', + data: { timestamp: Date.now() }, + timestamp: Date.now(), + }) + } + } + + onSyncRequest(callback: () => void): () => void { + return this.eventEmitter.on('syncRequest', callback) + } + onStateChange(callback: (state: Partial) => void): () => void { return this.eventEmitter.on('stateChange', callback) } @@ -285,6 +303,14 @@ export class CollaborationManager { console.log('Processing varsAndFeaturesUpdate event:', update) this.eventEmitter.emit('varsAndFeaturesUpdate', update) } + else if (update.type === 'syncRequest') { + console.log('Received sync request from another user') + // Only process if we are the leader + if (this.isLeader) { + console.log('Leader received sync request, triggering sync') + this.eventEmitter.emit('syncRequest', {}) + } + } }) socket.on('online_users', (data: { users: OnlineUser[]; leader?: string }) => {