From bb9ae66f81765fb70c94cbb8b62d7c6549078440 Mon Sep 17 00:00:00 2001 From: hjlarry Date: Sat, 7 Feb 2026 09:34:13 +0800 Subject: [PATCH 1/3] fix: ensure leader online to accept graph change --- .../workflow_collaboration_service.py | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/api/services/workflow_collaboration_service.py b/api/services/workflow_collaboration_service.py index 8999008d70..fce9fe95a5 100644 --- a/api/services/workflow_collaboration_service.py +++ b/api/services/workflow_collaboration_service.py @@ -120,6 +120,30 @@ class WorkflowCollaborationService: return {"msg": "skill_file_active_updated"}, 200 + if event_type == "sync_request": + leader_sid = self._repository.get_current_leader(workflow_id) + if leader_sid and ( + self.is_session_active(workflow_id, leader_sid) + and self._repository.is_graph_active(workflow_id, leader_sid) + ): + target_sid = leader_sid + else: + if leader_sid: + self._repository.delete_leader(workflow_id) + target_sid = self._select_active_graph_leader(workflow_id, preferred_sid=sid) + if target_sid: + self._repository.set_leader(workflow_id, target_sid) + self.broadcast_leader_change(workflow_id, target_sid) + if not target_sid: + return {"msg": "no_active_leader"}, 200 + + self._socketio.emit( + "collaboration_update", + {"type": event_type, "userId": user_id, "data": event_data, "timestamp": timestamp}, + room=target_sid, + ) + return {"msg": "sync_request_forwarded"}, 200 + self._socketio.emit( "collaboration_update", {"type": event_type, "userId": user_id, "data": event_data, "timestamp": timestamp}, @@ -301,6 +325,18 @@ class WorkflowCollaborationService: return preferred_sid return session_sids[0] + def _select_active_graph_leader(self, workflow_id: str, preferred_sid: str | None = None) -> str | None: + session_sids = [ + session["sid"] + for session in self._repository.list_sessions(workflow_id) + if session.get("graph_active") and self.is_session_active(workflow_id, session["sid"]) + ] + if not session_sids: + return None + if preferred_sid and preferred_sid in session_sids: + return preferred_sid + return session_sids[0] + def _select_skill_leader(self, workflow_id: str, file_id: str, preferred_sid: str | None = None) -> str | None: session_sids = [ sid From 865b221ce6ae599b2557e14dead632ee8990eb6b Mon Sep 17 00:00:00 2001 From: hjlarry Date: Sat, 7 Feb 2026 09:48:07 +0800 Subject: [PATCH 2/3] fix: make sure restart server not get ghost online user --- .../workflow_collaboration_service.py | 47 +++++++++++---- .../test_workflow_collaboration_service.py | 60 ++++++++++++++++--- 2 files changed, 88 insertions(+), 19 deletions(-) diff --git a/api/services/workflow_collaboration_service.py b/api/services/workflow_collaboration_service.py index fce9fe95a5..8968a624ff 100644 --- a/api/services/workflow_collaboration_service.py +++ b/api/services/workflow_collaboration_service.py @@ -130,7 +130,7 @@ class WorkflowCollaborationService: else: if leader_sid: self._repository.delete_leader(workflow_id) - target_sid = self._select_active_graph_leader(workflow_id, preferred_sid=sid) + target_sid = self._select_graph_leader(workflow_id, preferred_sid=sid) if target_sid: self._repository.set_leader(workflow_id, target_sid) self.broadcast_leader_change(workflow_id, target_sid) @@ -256,11 +256,44 @@ class WorkflowCollaborationService: def get_current_leader(self, workflow_id: str) -> str | None: return self._repository.get_current_leader(workflow_id) + def _prune_inactive_sessions(self, workflow_id: str) -> list[WorkflowSessionInfo]: + """Remove inactive sessions from storage and return active sessions only.""" + sessions = self._repository.list_sessions(workflow_id) + if not sessions: + return [] + + active_sessions: list[WorkflowSessionInfo] = [] + stale_sids: list[str] = [] + for session in sessions: + sid = session["sid"] + if self.is_session_active(workflow_id, sid): + active_sessions.append(session) + else: + stale_sids.append(sid) + + for sid in stale_sids: + self._repository.delete_session(workflow_id, sid) + + return active_sessions + def broadcast_online_users(self, workflow_id: str) -> None: - users = self._repository.list_sessions(workflow_id) + users = self._prune_inactive_sessions(workflow_id) users.sort(key=lambda x: x.get("connected_at") or 0) leader_sid = self.get_current_leader(workflow_id) + previous_leader = leader_sid + active_sids = {user["sid"] for user in users} + if leader_sid and leader_sid not in active_sids: + self._repository.delete_leader(workflow_id) + leader_sid = None + + if not leader_sid and users: + leader_sid = self._select_graph_leader(workflow_id) + if leader_sid: + self._repository.set_leader(workflow_id, leader_sid) + + if leader_sid != previous_leader: + self.broadcast_leader_change(workflow_id, leader_sid) self._socketio.emit( "online_users", @@ -316,16 +349,6 @@ class WorkflowCollaborationService: self.broadcast_skill_leader_change(workflow_id, file_id, new_leader_sid) def _select_graph_leader(self, workflow_id: str, preferred_sid: str | None = None) -> str | None: - session_sids = [ - session["sid"] for session in self._repository.list_sessions(workflow_id) if session.get("graph_active") - ] - if not session_sids: - return None - if preferred_sid and preferred_sid in session_sids: - return preferred_sid - return session_sids[0] - - def _select_active_graph_leader(self, workflow_id: str, preferred_sid: str | None = None) -> str | None: session_sids = [ session["sid"] for session in self._repository.list_sessions(workflow_id) diff --git a/api/tests/unit_tests/services/test_workflow_collaboration_service.py b/api/tests/unit_tests/services/test_workflow_collaboration_service.py index d5334d5e34..f1484f2822 100644 --- a/api/tests/unit_tests/services/test_workflow_collaboration_service.py +++ b/api/tests/unit_tests/services/test_workflow_collaboration_service.py @@ -140,9 +140,19 @@ class TestWorkflowCollaborationService: collaboration_service, repository, _socketio = service repository.get_current_leader.return_value = "sid-1" repository.set_leader_if_absent.return_value = True + repository.list_sessions.return_value = [ + { + "user_id": "u-2", + "username": "B", + "avatar": None, + "sid": "sid-2", + "connected_at": 1, + "graph_active": True, + } + ] with ( - patch.object(collaboration_service, "is_session_active", return_value=False), + patch.object(collaboration_service, "is_session_active", side_effect=lambda _wf, sid: sid != "sid-1"), patch.object(collaboration_service, "broadcast_leader_change") as broadcast_leader_change, ): # Act @@ -161,6 +171,16 @@ class TestWorkflowCollaborationService: collaboration_service, repository, _socketio = service repository.get_current_leader.side_effect = [None, "sid-3"] repository.set_leader_if_absent.return_value = False + repository.list_sessions.return_value = [ + { + "user_id": "u-2", + "username": "B", + "avatar": None, + "sid": "sid-2", + "connected_at": 1, + "graph_active": True, + } + ] # Act result = collaboration_service.get_or_set_leader("wf-1", "sid-2") @@ -174,9 +194,21 @@ class TestWorkflowCollaborationService: # Arrange collaboration_service, repository, _socketio = service repository.get_current_leader.return_value = "sid-1" - repository.get_session_sids.return_value = ["sid-2"] + repository.list_sessions.return_value = [ + { + "user_id": "u-2", + "username": "B", + "avatar": None, + "sid": "sid-2", + "connected_at": 1, + "graph_active": True, + } + ] - with patch.object(collaboration_service, "broadcast_leader_change") as broadcast_leader_change: + with ( + patch.object(collaboration_service, "is_session_active", return_value=True), + patch.object(collaboration_service, "broadcast_leader_change") as broadcast_leader_change, + ): # Act collaboration_service.handle_leader_disconnect("wf-1", "sid-1") @@ -190,7 +222,7 @@ class TestWorkflowCollaborationService: # Arrange collaboration_service, repository, _socketio = service repository.get_current_leader.return_value = "sid-1" - repository.get_session_sids.return_value = [] + repository.list_sessions.return_value = [] # Act collaboration_service.handle_leader_disconnect("wf-1", "sid-1") @@ -209,8 +241,9 @@ class TestWorkflowCollaborationService: ] repository.get_current_leader.return_value = "sid-1" - # Act - collaboration_service.broadcast_online_users("wf-1") + with patch.object(collaboration_service, "is_session_active", return_value=True): + # Act + collaboration_service.broadcast_online_users("wf-1") # Assert socketio.emit.assert_called_once_with( @@ -248,8 +281,21 @@ class TestWorkflowCollaborationService: # Arrange collaboration_service, repository, _socketio = service repository.get_current_leader.return_value = None + repository.list_sessions.return_value = [ + { + "user_id": "u-2", + "username": "B", + "avatar": None, + "sid": "sid-2", + "connected_at": 1, + "graph_active": True, + } + ] - with patch.object(collaboration_service, "broadcast_leader_change") as broadcast_leader_change: + with ( + patch.object(collaboration_service, "is_session_active", return_value=True), + patch.object(collaboration_service, "broadcast_leader_change") as broadcast_leader_change, + ): # Act collaboration_service.refresh_session_state("wf-1", "sid-2") From e10996c3686ee0a9bd0d87d3f84b106ccb1a7945 Mon Sep 17 00:00:00 2001 From: hjlarry Date: Sat, 7 Feb 2026 10:12:47 +0800 Subject: [PATCH 3/3] chore: log 20 recent crdt import changes --- .../core/collaboration-manager.ts | 127 ++++++++++++++++++ .../workflow/hooks/use-shortcuts.ts | 8 ++ 2 files changed, 135 insertions(+) diff --git a/web/app/components/workflow/collaboration/core/collaboration-manager.ts b/web/app/components/workflow/collaboration/core/collaboration-manager.ts index 573cd369ff..a23717a96a 100644 --- a/web/app/components/workflow/collaboration/core/collaboration-manager.ts +++ b/web/app/components/workflow/collaboration/core/collaboration-manager.ts @@ -56,6 +56,28 @@ type LoroContainer = { getAttached?: () => unknown } +type GraphImportLogEntry = { + timestamp: number + appId: string | null + sources: Array<'nodes' | 'edges'> + before: { + nodes: Node[] + edges: Edge[] + } + after: { + nodes: Node[] + edges: Edge[] + } + meta: { + leaderId: string | null + isLeader: boolean + graphViewActive: boolean | null + pendingInitialSync: boolean + } +} + +const GRAPH_IMPORT_LOG_LIMIT = 20 + const toLoroValue = (value: unknown): Value => cloneDeep(value) as Value const toLoroRecord = (value: unknown): Record => cloneDeep(value) as Record export class CollaborationManager { @@ -78,6 +100,15 @@ export class CollaborationManager { private rejoinInProgress = false private pendingGraphImportEmit = false private graphViewActive: boolean | null = null + private graphImportLogs: GraphImportLogEntry[] = [] + private pendingImportLog: { + timestamp: number + sources: Set<'nodes' | 'edges'> + before: { + nodes: Node[] + edges: Edge[] + } + } | null = null private getActiveSocket(): Socket | null { if (!this.currentAppId) @@ -504,6 +535,7 @@ export class CollaborationManager { this.onlineUsers = [] this.isUndoRedoInProgress = false this.rejoinInProgress = false + this.clearGraphImportLog() // Only reset leader status when actually disconnecting const wasLeader = this.isLeader @@ -908,6 +940,7 @@ export class CollaborationManager { requestAnimationFrame(() => { const state = reactFlowStore.getState() const previousNodes: Node[] = state.getNodes() + this.startImportLog('nodes', { nodes: previousNodes, edges: state.getEdges() }) const previousNodeMap = new Map(previousNodes.map(node => [node.id, node])) const selectedIds = new Set( previousNodes @@ -964,6 +997,7 @@ export class CollaborationManager { requestAnimationFrame(() => { // Get ReactFlow's native setters, not the collaborative ones const state = reactFlowStore.getState() + this.startImportLog('edges', { nodes: state.getNodes(), edges: state.getEdges() }) const updatedEdges = Array.from(this.edgesMap?.values() || []) as Edge[] this.pendingInitialSync = false @@ -984,6 +1018,7 @@ export class CollaborationManager { this.pendingGraphImportEmit = true requestAnimationFrame(() => { this.pendingGraphImportEmit = false + this.finalizeImportLog() const mergedNodes = this.mergeLocalNodeState(this.getNodes()) this.eventEmitter.emit('graphImport', { nodes: mergedNodes, @@ -1034,6 +1069,98 @@ export class CollaborationManager { }) } + getGraphImportLog(): GraphImportLogEntry[] { + return cloneDeep(this.graphImportLogs) + } + + clearGraphImportLog(): void { + this.graphImportLogs = [] + this.pendingImportLog = null + } + + downloadGraphImportLog(): void { + if (this.graphImportLogs.length === 0) + return + + const payload = { + appId: this.currentAppId, + generatedAt: new Date().toISOString(), + entries: this.graphImportLogs, + } + const stamp = new Date().toISOString().replace(/[:.]/g, '-') + const appSuffix = this.currentAppId ?? 'unknown' + const fileName = `workflow-graph-import-log-${appSuffix}-${stamp}.json` + const blob = new Blob([JSON.stringify(payload, null, 2)], { type: 'application/json' }) + const url = URL.createObjectURL(blob) + const link = document.createElement('a') + link.href = url + link.download = fileName + link.click() + URL.revokeObjectURL(url) + } + + private snapshotReactFlowGraph(): { nodes: Node[], edges: Edge[] } { + if (!this.reactFlowStore) { + return { + nodes: this.getNodes(), + edges: this.getEdges(), + } + } + + const state = this.reactFlowStore.getState() + return { + nodes: cloneDeep(state.getNodes()), + edges: cloneDeep(state.getEdges()), + } + } + + private startImportLog(source: 'nodes' | 'edges', before?: { nodes: Node[], edges: Edge[] }): void { + if (!this.pendingImportLog) { + const snapshot = before ?? this.snapshotReactFlowGraph() + this.pendingImportLog = { + timestamp: Date.now(), + sources: new Set([source]), + before: { + nodes: cloneDeep(snapshot.nodes), + edges: cloneDeep(snapshot.edges), + }, + } + return + } + this.pendingImportLog.sources.add(source) + } + + private finalizeImportLog(): void { + if (!this.pendingImportLog) + return + + const afterSnapshot = this.snapshotReactFlowGraph() + const entry: GraphImportLogEntry = { + timestamp: this.pendingImportLog.timestamp, + appId: this.currentAppId, + sources: Array.from(this.pendingImportLog.sources), + before: { + nodes: this.pendingImportLog.before.nodes, + edges: this.pendingImportLog.before.edges, + }, + after: { + nodes: cloneDeep(afterSnapshot.nodes), + edges: cloneDeep(afterSnapshot.edges), + }, + meta: { + leaderId: this.leaderId, + isLeader: this.isLeader, + graphViewActive: this.graphViewActive, + pendingInitialSync: this.pendingInitialSync, + }, + } + + this.graphImportLogs.push(entry) + if (this.graphImportLogs.length > GRAPH_IMPORT_LOG_LIMIT) + this.graphImportLogs.splice(0, this.graphImportLogs.length - GRAPH_IMPORT_LOG_LIMIT) + this.pendingImportLog = null + } + private setupSocketEventListeners(socket: Socket): void { socket.on('collaboration_update', (update: CollaborationUpdate) => { if (update.type === 'mouse_move') { diff --git a/web/app/components/workflow/hooks/use-shortcuts.ts b/web/app/components/workflow/hooks/use-shortcuts.ts index 6eb7cd2236..7f91d3ab1e 100644 --- a/web/app/components/workflow/hooks/use-shortcuts.ts +++ b/web/app/components/workflow/hooks/use-shortcuts.ts @@ -10,6 +10,7 @@ import { useWorkflowMoveMode, useWorkflowOrganize, } from '.' +import { collaborationManager } from '../collaboration/core/collaboration-manager' import { useWorkflowStore } from '../store' import { getKeyboardKeyCodeBySystem, @@ -266,6 +267,13 @@ export const useShortcuts = (enabled = true): void => { useCapture: true, }) + useKeyPress(`${getKeyboardKeyCodeBySystem('ctrl')}.shift.l`, (e) => { + if (shouldHandleShortcut(e)) { + e.preventDefault() + collaborationManager.downloadGraphImportLog() + } + }, { exactMatch: true, useCapture: true }) + // Shift ↓ useKeyPress( 'shift',