From 5a1fae11717ca26003181aa6da87d4776b5a1035 Mon Sep 17 00:00:00 2001 From: hjlarry Date: Mon, 29 Sep 2025 14:01:42 +0800 Subject: [PATCH] add leader session more check --- api/controllers/console/app/online_user.py | 43 +++++++++++++++++++--- 1 file changed, 38 insertions(+), 5 deletions(-) diff --git a/api/controllers/console/app/online_user.py b/api/controllers/console/app/online_user.py index f69ea32752..d3103a6666 100644 --- a/api/controllers/console/app/online_user.py +++ b/api/controllers/console/app/online_user.py @@ -99,20 +99,53 @@ def handle_disconnect(sid): broadcast_online_users(workflow_id) -def get_or_set_leader(workflow_id, sid): +def _clear_session_state(workflow_id: str, sid: str) -> None: + redis_client.hdel(f"workflow_online_users:{workflow_id}", sid) + redis_client.delete(f"ws_sid_map:{sid}") + + +def _is_session_active(workflow_id: str, sid: str) -> bool: + if not sid: + return False + + try: + if not sio.manager.is_connected(sid, "/"): + return False + except AttributeError: + return False + + if not redis_client.hexists(f"workflow_online_users:{workflow_id}", sid): + return False + + if not redis_client.exists(f"ws_sid_map:{sid}"): + return False + + return True + + +def get_or_set_leader(workflow_id: str, sid: str) -> str: """ - Get current leader session or set this session as leader if no leader exists. + Get current leader session or set this session as leader if no valid leader exists. Returns the leader session id (sid). """ leader_key = f"workflow_leader:{workflow_id}" - current_leader = redis_client.get(leader_key) + raw_leader = redis_client.get(leader_key) + current_leader = raw_leader.decode("utf-8") if isinstance(raw_leader, bytes) else raw_leader + leader_replaced = False + + if current_leader and not _is_session_active(workflow_id, current_leader): + _clear_session_state(workflow_id, current_leader) + redis_client.delete(leader_key) + current_leader = None + leader_replaced = True if not current_leader: - # No leader exists, make this session the leader redis_client.set(leader_key, sid, ex=3600) # Expire in 1 hour + if leader_replaced: + broadcast_leader_change(workflow_id, sid) return sid - return current_leader.decode("utf-8") if isinstance(current_leader, bytes) else current_leader + return current_leader def handle_leader_disconnect(workflow_id, disconnected_sid):