From 51c8c50b82c41f3484cdd5250fee23ce3f7b0cda Mon Sep 17 00:00:00 2001 From: hjlarry Date: Thu, 22 Jan 2026 09:30:51 +0800 Subject: [PATCH] expire leader key in redis --- .../workflow_collaboration_service.py | 17 ++++++++-- .../test_workflow_collaboration_service.py | 31 +++++++++++++++++++ 2 files changed, 46 insertions(+), 2 deletions(-) diff --git a/api/services/workflow_collaboration_service.py b/api/services/workflow_collaboration_service.py index e739e879e1..02d08fcaff 100644 --- a/api/services/workflow_collaboration_service.py +++ b/api/services/workflow_collaboration_service.py @@ -70,7 +70,7 @@ class WorkflowCollaborationService: workflow_id = mapping["workflow_id"] user_id = mapping["user_id"] - self._repository.refresh_session_state(workflow_id, sid) + self.refresh_session_state(workflow_id, sid) event_type = data.get("type") event_data = data.get("data") @@ -94,7 +94,7 @@ class WorkflowCollaborationService: return {"msg": "unauthorized"}, 401 workflow_id = mapping["workflow_id"] - self._repository.refresh_session_state(workflow_id, sid) + self.refresh_session_state(workflow_id, sid) self._socketio.emit("graph_update", data, room=workflow_id, skip_sid=sid) @@ -163,6 +163,19 @@ class WorkflowCollaborationService: def refresh_session_state(self, workflow_id: str, sid: str) -> None: self._repository.refresh_session_state(workflow_id, sid) + self._ensure_leader(workflow_id, sid) + + def _ensure_leader(self, workflow_id: str, sid: str) -> None: + current_leader = self._repository.get_current_leader(workflow_id) + if current_leader and self.is_session_active(workflow_id, current_leader): + self._repository.expire_leader(workflow_id) + return + + if current_leader: + self._repository.delete_leader(workflow_id) + + self._repository.set_leader(workflow_id, sid) + self.broadcast_leader_change(workflow_id, sid) def is_session_active(self, workflow_id: str, sid: str) -> bool: if not sid: diff --git a/api/tests/unit_tests/services/test_workflow_collaboration_service.py b/api/tests/unit_tests/services/test_workflow_collaboration_service.py index 2f0b89e3fb..d5334d5e34 100644 --- a/api/tests/unit_tests/services/test_workflow_collaboration_service.py +++ b/api/tests/unit_tests/services/test_workflow_collaboration_service.py @@ -226,6 +226,37 @@ class TestWorkflowCollaborationService: room="wf-1", ) + def test_refresh_session_state_expires_active_leader( + self, service: tuple[WorkflowCollaborationService, Mock, Mock] + ) -> None: + # Arrange + collaboration_service, repository, _socketio = service + repository.get_current_leader.return_value = "sid-1" + + with patch.object(collaboration_service, "is_session_active", return_value=True): + # Act + collaboration_service.refresh_session_state("wf-1", "sid-1") + + # Assert + repository.refresh_session_state.assert_called_once_with("wf-1", "sid-1") + repository.expire_leader.assert_called_once_with("wf-1") + repository.set_leader.assert_not_called() + + def test_refresh_session_state_sets_leader_when_missing( + self, service: tuple[WorkflowCollaborationService, Mock, Mock] + ) -> None: + # Arrange + collaboration_service, repository, _socketio = service + repository.get_current_leader.return_value = None + + with patch.object(collaboration_service, "broadcast_leader_change") as broadcast_leader_change: + # Act + collaboration_service.refresh_session_state("wf-1", "sid-2") + + # Assert + repository.set_leader.assert_called_once_with("wf-1", "sid-2") + broadcast_leader_change.assert_called_once_with("wf-1", "sid-2") + def test_relay_graph_event_emits_update(self, service: tuple[WorkflowCollaborationService, Mock, Mock]) -> None: # Arrange collaboration_service, repository, socketio = service