mirror of https://github.com/langgenius/dify.git
expire leader key in redis
This commit is contained in:
parent
1b70a7e4c7
commit
51c8c50b82
|
|
@ -70,7 +70,7 @@ class WorkflowCollaborationService:
|
|||
|
||||
workflow_id = mapping["workflow_id"]
|
||||
user_id = mapping["user_id"]
|
||||
self._repository.refresh_session_state(workflow_id, sid)
|
||||
self.refresh_session_state(workflow_id, sid)
|
||||
|
||||
event_type = data.get("type")
|
||||
event_data = data.get("data")
|
||||
|
|
@ -94,7 +94,7 @@ class WorkflowCollaborationService:
|
|||
return {"msg": "unauthorized"}, 401
|
||||
|
||||
workflow_id = mapping["workflow_id"]
|
||||
self._repository.refresh_session_state(workflow_id, sid)
|
||||
self.refresh_session_state(workflow_id, sid)
|
||||
|
||||
self._socketio.emit("graph_update", data, room=workflow_id, skip_sid=sid)
|
||||
|
||||
|
|
@ -163,6 +163,19 @@ class WorkflowCollaborationService:
|
|||
|
||||
def refresh_session_state(self, workflow_id: str, sid: str) -> None:
|
||||
self._repository.refresh_session_state(workflow_id, sid)
|
||||
self._ensure_leader(workflow_id, sid)
|
||||
|
||||
def _ensure_leader(self, workflow_id: str, sid: str) -> None:
|
||||
current_leader = self._repository.get_current_leader(workflow_id)
|
||||
if current_leader and self.is_session_active(workflow_id, current_leader):
|
||||
self._repository.expire_leader(workflow_id)
|
||||
return
|
||||
|
||||
if current_leader:
|
||||
self._repository.delete_leader(workflow_id)
|
||||
|
||||
self._repository.set_leader(workflow_id, sid)
|
||||
self.broadcast_leader_change(workflow_id, sid)
|
||||
|
||||
def is_session_active(self, workflow_id: str, sid: str) -> bool:
|
||||
if not sid:
|
||||
|
|
|
|||
|
|
@ -226,6 +226,37 @@ class TestWorkflowCollaborationService:
|
|||
room="wf-1",
|
||||
)
|
||||
|
||||
def test_refresh_session_state_expires_active_leader(
|
||||
self, service: tuple[WorkflowCollaborationService, Mock, Mock]
|
||||
) -> None:
|
||||
# Arrange
|
||||
collaboration_service, repository, _socketio = service
|
||||
repository.get_current_leader.return_value = "sid-1"
|
||||
|
||||
with patch.object(collaboration_service, "is_session_active", return_value=True):
|
||||
# Act
|
||||
collaboration_service.refresh_session_state("wf-1", "sid-1")
|
||||
|
||||
# Assert
|
||||
repository.refresh_session_state.assert_called_once_with("wf-1", "sid-1")
|
||||
repository.expire_leader.assert_called_once_with("wf-1")
|
||||
repository.set_leader.assert_not_called()
|
||||
|
||||
def test_refresh_session_state_sets_leader_when_missing(
|
||||
self, service: tuple[WorkflowCollaborationService, Mock, Mock]
|
||||
) -> None:
|
||||
# Arrange
|
||||
collaboration_service, repository, _socketio = service
|
||||
repository.get_current_leader.return_value = None
|
||||
|
||||
with patch.object(collaboration_service, "broadcast_leader_change") as broadcast_leader_change:
|
||||
# Act
|
||||
collaboration_service.refresh_session_state("wf-1", "sid-2")
|
||||
|
||||
# Assert
|
||||
repository.set_leader.assert_called_once_with("wf-1", "sid-2")
|
||||
broadcast_leader_change.assert_called_once_with("wf-1", "sid-2")
|
||||
|
||||
def test_relay_graph_event_emits_update(self, service: tuple[WorkflowCollaborationService, Mock, Mock]) -> None:
|
||||
# Arrange
|
||||
collaboration_service, repository, socketio = service
|
||||
|
|
|
|||
Loading…
Reference in New Issue