add leader session more check

This commit is contained in:
hjlarry 2025-09-29 14:01:42 +08:00
parent 930fdc8fb4
commit 5a1fae1171
1 changed files with 38 additions and 5 deletions

View File

@ -99,20 +99,53 @@ def handle_disconnect(sid):
broadcast_online_users(workflow_id)
def get_or_set_leader(workflow_id, sid):
def _clear_session_state(workflow_id: str, sid: str) -> None:
redis_client.hdel(f"workflow_online_users:{workflow_id}", sid)
redis_client.delete(f"ws_sid_map:{sid}")
def _is_session_active(workflow_id: str, sid: str) -> bool:
if not sid:
return False
try:
if not sio.manager.is_connected(sid, "/"):
return False
except AttributeError:
return False
if not redis_client.hexists(f"workflow_online_users:{workflow_id}", sid):
return False
if not redis_client.exists(f"ws_sid_map:{sid}"):
return False
return True
def get_or_set_leader(workflow_id: str, sid: str) -> str:
"""
Get current leader session or set this session as leader if no leader exists.
Get current leader session or set this session as leader if no valid leader exists.
Returns the leader session id (sid).
"""
leader_key = f"workflow_leader:{workflow_id}"
current_leader = redis_client.get(leader_key)
raw_leader = redis_client.get(leader_key)
current_leader = raw_leader.decode("utf-8") if isinstance(raw_leader, bytes) else raw_leader
leader_replaced = False
if current_leader and not _is_session_active(workflow_id, current_leader):
_clear_session_state(workflow_id, current_leader)
redis_client.delete(leader_key)
current_leader = None
leader_replaced = True
if not current_leader:
# No leader exists, make this session the leader
redis_client.set(leader_key, sid, ex=3600) # Expire in 1 hour
if leader_replaced:
broadcast_leader_change(workflow_id, sid)
return sid
return current_leader.decode("utf-8") if isinstance(current_leader, bytes) else current_leader
return current_leader
def handle_leader_disconnect(workflow_id, disconnected_sid):