each browser tab session a ws connected obj

This commit is contained in:
hjlarry 2025-09-05 22:19:16 +08:00
parent ca8d15bc64
commit cf72184ce4
1 changed files with 65 additions and 55 deletions

View File

@ -40,7 +40,7 @@ def socket_connect(sid, environ, auth):
@sio.on("user_connect")
def handle_user_connect(sid, data):
"""
Handle user connect event, check login and get user info.
Handle user connect event. Each session (tab) is treated as an independent collaborator.
"""
workflow_id = data.get("workflow_id")
@ -53,31 +53,27 @@ def handle_user_connect(sid, data):
if not user_id:
return {"msg": "unauthorized"}, 401
old_info_json = redis_client.hget(f"workflow_online_users:{workflow_id}", user_id)
if old_info_json:
old_info = json.loads(old_info_json)
old_sid = old_info.get("sid")
if old_sid and old_sid != sid:
sio.disconnect(sid=old_sid)
user_info = {
# Each session is stored independently with sid as key
session_info = {
"user_id": user_id,
"username": session.get("username", "Unknown"),
"avatar": session.get("avatar", None),
"sid": sid,
"connected_at": int(time.time()), # Add timestamp to differentiate tabs
}
redis_client.hset(f"workflow_online_users:{workflow_id}", user_id, json.dumps(user_info))
# Store session info with sid as key
redis_client.hset(f"workflow_online_users:{workflow_id}", sid, json.dumps(session_info))
redis_client.set(f"ws_sid_map:{sid}", json.dumps({"workflow_id": workflow_id, "user_id": user_id}))
# Leader election: first user becomes the leader
leader_id = get_or_set_leader(workflow_id, user_id)
is_leader = leader_id == user_id
# Leader election: first session becomes the leader
leader_sid = get_or_set_leader(workflow_id, sid)
is_leader = leader_sid == sid
sio.enter_room(sid, workflow_id)
broadcast_online_users(workflow_id)
# Notify user of their status
# Notify this session of their leader status
sio.emit("status", {"isLeader": is_leader}, room=sid)
return {"msg": "connected", "user_id": user_id, "sid": sid, "isLeader": is_leader}
@ -86,41 +82,43 @@ def handle_user_connect(sid, data):
@sio.on("disconnect")
def handle_disconnect(sid):
"""
Handle user disconnect event, remove user from workflow's online user list.
Handle session disconnect event. Remove the specific session from online users.
"""
mapping = redis_client.get(f"ws_sid_map:{sid}")
if mapping:
data = json.loads(mapping)
workflow_id = data["workflow_id"]
user_id = data["user_id"]
redis_client.hdel(f"workflow_online_users:{workflow_id}", user_id)
# Remove this specific session
redis_client.hdel(f"workflow_online_users:{workflow_id}", sid)
redis_client.delete(f"ws_sid_map:{sid}")
# Handle leader re-election if the leader disconnected
handle_leader_disconnect(workflow_id, user_id)
# Handle leader re-election if the leader session disconnected
handle_leader_disconnect(workflow_id, sid)
broadcast_online_users(workflow_id)
def get_or_set_leader(workflow_id, user_id):
def get_or_set_leader(workflow_id, sid):
"""
Get current leader or set the user as leader if no leader exists.
Returns the leader user_id.
Get current leader session or set this session as leader if no leader exists.
Returns the leader session id (sid).
"""
leader_key = f"workflow_leader:{workflow_id}"
current_leader = redis_client.get(leader_key)
if not current_leader:
# No leader exists, make this user the leader
redis_client.set(leader_key, user_id, ex=3600) # Expire in 1 hour
return user_id
# No leader exists, make this session the leader
redis_client.set(leader_key, sid, ex=3600) # Expire in 1 hour
return sid
return current_leader.decode("utf-8") if isinstance(current_leader, bytes) else current_leader
def handle_leader_disconnect(workflow_id, disconnected_user_id):
def handle_leader_disconnect(workflow_id, disconnected_sid):
"""
Handle leader re-election when a user disconnects.
Handle leader re-election when a session disconnects.
If the disconnected session was the leader, elect a new leader from remaining sessions.
"""
leader_key = f"workflow_leader:{workflow_id}"
current_leader = redis_client.get(leader_key)
@ -128,38 +126,37 @@ def handle_leader_disconnect(workflow_id, disconnected_user_id):
if current_leader:
current_leader = current_leader.decode("utf-8") if isinstance(current_leader, bytes) else current_leader
if current_leader == disconnected_user_id:
# Leader disconnected, elect a new leader
users_json = redis_client.hgetall(f"workflow_online_users:{workflow_id}")
if current_leader == disconnected_sid:
# Leader session disconnected, elect a new leader
sessions_json = redis_client.hgetall(f"workflow_online_users:{workflow_id}")
if users_json:
# Get the first remaining user as new leader
new_leader_id = list(users_json.keys())[0]
if isinstance(new_leader_id, bytes):
new_leader_id = new_leader_id.decode("utf-8")
if sessions_json:
# Get the first remaining session as new leader
new_leader_sid = list(sessions_json.keys())[0]
if isinstance(new_leader_sid, bytes):
new_leader_sid = new_leader_sid.decode("utf-8")
redis_client.set(leader_key, new_leader_id, ex=3600)
redis_client.set(leader_key, new_leader_sid, ex=3600)
# Notify all users about the new leader
broadcast_leader_change(workflow_id, new_leader_id)
# Notify all sessions about the new leader
broadcast_leader_change(workflow_id, new_leader_sid)
else:
# No users left, remove leader
# No sessions left, remove leader
redis_client.delete(leader_key)
def broadcast_leader_change(workflow_id, new_leader_id):
def broadcast_leader_change(workflow_id, new_leader_sid):
"""
Broadcast leader change to all users in the workflow.
Broadcast leader change to all sessions in the workflow.
"""
users_json = redis_client.hgetall(f"workflow_online_users:{workflow_id}")
sessions_json = redis_client.hgetall(f"workflow_online_users:{workflow_id}")
for user_id, user_info_json in users_json.items():
for sid, session_info_json in sessions_json.items():
try:
user_info = json.loads(user_info_json)
user_sid = user_info.get("sid")
if user_sid:
is_leader = (user_id.decode("utf-8") if isinstance(user_id, bytes) else user_id) == new_leader_id
sio.emit("status", {"isLeader": is_leader}, room=user_sid)
sid_str = sid.decode("utf-8") if isinstance(sid, bytes) else sid
is_leader = sid_str == new_leader_sid
# Emit to each session whether they are the new leader
sio.emit("status", {"isLeader": is_leader}, room=sid_str)
except Exception:
continue
@ -175,20 +172,33 @@ def get_current_leader(workflow_id):
def broadcast_online_users(workflow_id):
"""
broadcast online users to the workflow room
Broadcast online users to the workflow room.
Each session is shown as a separate user (even if same person has multiple tabs).
"""
users_json = redis_client.hgetall(f"workflow_online_users:{workflow_id}")
sessions_json = redis_client.hgetall(f"workflow_online_users:{workflow_id}")
users = []
for _, user_info_json in users_json.items():
for sid, session_info_json in sessions_json.items():
try:
users.append(json.loads(user_info_json))
session_info = json.loads(session_info_json)
# Each session appears as a separate "user" in the UI
users.append({
"user_id": session_info["user_id"],
"username": session_info["username"],
"avatar": session_info.get("avatar"),
"sid": session_info["sid"],
"connected_at": session_info.get("connected_at"),
})
except Exception:
continue
# Get current leader
leader_id = get_current_leader(workflow_id)
# Sort by connection time to maintain consistent order
users.sort(key=lambda x: x.get("connected_at") or 0)
sio.emit("online_users", {"workflow_id": workflow_id, "users": users, "leader": leader_id}, room=workflow_id)
# Get current leader session
leader_sid = get_current_leader(workflow_id)
sio.emit("online_users", {"workflow_id": workflow_id, "users": users, "leader": leader_sid}, room=workflow_id)
@sio.on("collaboration_event")