From 865b221ce6ae599b2557e14dead632ee8990eb6b Mon Sep 17 00:00:00 2001 From: hjlarry Date: Sat, 7 Feb 2026 09:48:07 +0800 Subject: [PATCH] fix: make sure restart server not get ghost online user --- .../workflow_collaboration_service.py | 47 +++++++++++---- .../test_workflow_collaboration_service.py | 60 ++++++++++++++++--- 2 files changed, 88 insertions(+), 19 deletions(-) diff --git a/api/services/workflow_collaboration_service.py b/api/services/workflow_collaboration_service.py index fce9fe95a5..8968a624ff 100644 --- a/api/services/workflow_collaboration_service.py +++ b/api/services/workflow_collaboration_service.py @@ -130,7 +130,7 @@ class WorkflowCollaborationService: else: if leader_sid: self._repository.delete_leader(workflow_id) - target_sid = self._select_active_graph_leader(workflow_id, preferred_sid=sid) + target_sid = self._select_graph_leader(workflow_id, preferred_sid=sid) if target_sid: self._repository.set_leader(workflow_id, target_sid) self.broadcast_leader_change(workflow_id, target_sid) @@ -256,11 +256,44 @@ class WorkflowCollaborationService: def get_current_leader(self, workflow_id: str) -> str | None: return self._repository.get_current_leader(workflow_id) + def _prune_inactive_sessions(self, workflow_id: str) -> list[WorkflowSessionInfo]: + """Remove inactive sessions from storage and return active sessions only.""" + sessions = self._repository.list_sessions(workflow_id) + if not sessions: + return [] + + active_sessions: list[WorkflowSessionInfo] = [] + stale_sids: list[str] = [] + for session in sessions: + sid = session["sid"] + if self.is_session_active(workflow_id, sid): + active_sessions.append(session) + else: + stale_sids.append(sid) + + for sid in stale_sids: + self._repository.delete_session(workflow_id, sid) + + return active_sessions + def broadcast_online_users(self, workflow_id: str) -> None: - users = self._repository.list_sessions(workflow_id) + users = self._prune_inactive_sessions(workflow_id) users.sort(key=lambda x: x.get("connected_at") or 0) leader_sid = self.get_current_leader(workflow_id) + previous_leader = leader_sid + active_sids = {user["sid"] for user in users} + if leader_sid and leader_sid not in active_sids: + self._repository.delete_leader(workflow_id) + leader_sid = None + + if not leader_sid and users: + leader_sid = self._select_graph_leader(workflow_id) + if leader_sid: + self._repository.set_leader(workflow_id, leader_sid) + + if leader_sid != previous_leader: + self.broadcast_leader_change(workflow_id, leader_sid) self._socketio.emit( "online_users", @@ -316,16 +349,6 @@ class WorkflowCollaborationService: self.broadcast_skill_leader_change(workflow_id, file_id, new_leader_sid) def _select_graph_leader(self, workflow_id: str, preferred_sid: str | None = None) -> str | None: - session_sids = [ - session["sid"] for session in self._repository.list_sessions(workflow_id) if session.get("graph_active") - ] - if not session_sids: - return None - if preferred_sid and preferred_sid in session_sids: - return preferred_sid - return session_sids[0] - - def _select_active_graph_leader(self, workflow_id: str, preferred_sid: str | None = None) -> str | None: session_sids = [ session["sid"] for session in self._repository.list_sessions(workflow_id) diff --git a/api/tests/unit_tests/services/test_workflow_collaboration_service.py b/api/tests/unit_tests/services/test_workflow_collaboration_service.py index d5334d5e34..f1484f2822 100644 --- a/api/tests/unit_tests/services/test_workflow_collaboration_service.py +++ b/api/tests/unit_tests/services/test_workflow_collaboration_service.py @@ -140,9 +140,19 @@ class TestWorkflowCollaborationService: collaboration_service, repository, _socketio = service repository.get_current_leader.return_value = "sid-1" repository.set_leader_if_absent.return_value = True + repository.list_sessions.return_value = [ + { + "user_id": "u-2", + "username": "B", + "avatar": None, + "sid": "sid-2", + "connected_at": 1, + "graph_active": True, + } + ] with ( - patch.object(collaboration_service, "is_session_active", return_value=False), + patch.object(collaboration_service, "is_session_active", side_effect=lambda _wf, sid: sid != "sid-1"), patch.object(collaboration_service, "broadcast_leader_change") as broadcast_leader_change, ): # Act @@ -161,6 +171,16 @@ class TestWorkflowCollaborationService: collaboration_service, repository, _socketio = service repository.get_current_leader.side_effect = [None, "sid-3"] repository.set_leader_if_absent.return_value = False + repository.list_sessions.return_value = [ + { + "user_id": "u-2", + "username": "B", + "avatar": None, + "sid": "sid-2", + "connected_at": 1, + "graph_active": True, + } + ] # Act result = collaboration_service.get_or_set_leader("wf-1", "sid-2") @@ -174,9 +194,21 @@ class TestWorkflowCollaborationService: # Arrange collaboration_service, repository, _socketio = service repository.get_current_leader.return_value = "sid-1" - repository.get_session_sids.return_value = ["sid-2"] + repository.list_sessions.return_value = [ + { + "user_id": "u-2", + "username": "B", + "avatar": None, + "sid": "sid-2", + "connected_at": 1, + "graph_active": True, + } + ] - with patch.object(collaboration_service, "broadcast_leader_change") as broadcast_leader_change: + with ( + patch.object(collaboration_service, "is_session_active", return_value=True), + patch.object(collaboration_service, "broadcast_leader_change") as broadcast_leader_change, + ): # Act collaboration_service.handle_leader_disconnect("wf-1", "sid-1") @@ -190,7 +222,7 @@ class TestWorkflowCollaborationService: # Arrange collaboration_service, repository, _socketio = service repository.get_current_leader.return_value = "sid-1" - repository.get_session_sids.return_value = [] + repository.list_sessions.return_value = [] # Act collaboration_service.handle_leader_disconnect("wf-1", "sid-1") @@ -209,8 +241,9 @@ class TestWorkflowCollaborationService: ] repository.get_current_leader.return_value = "sid-1" - # Act - collaboration_service.broadcast_online_users("wf-1") + with patch.object(collaboration_service, "is_session_active", return_value=True): + # Act + collaboration_service.broadcast_online_users("wf-1") # Assert socketio.emit.assert_called_once_with( @@ -248,8 +281,21 @@ class TestWorkflowCollaborationService: # Arrange collaboration_service, repository, _socketio = service repository.get_current_leader.return_value = None + repository.list_sessions.return_value = [ + { + "user_id": "u-2", + "username": "B", + "avatar": None, + "sid": "sid-2", + "connected_at": 1, + "graph_active": True, + } + ] - with patch.object(collaboration_service, "broadcast_leader_change") as broadcast_leader_change: + with ( + patch.object(collaboration_service, "is_session_active", return_value=True), + patch.object(collaboration_service, "broadcast_leader_change") as broadcast_leader_change, + ): # Act collaboration_service.refresh_session_state("wf-1", "sid-2")