diff --git a/api/controllers/console/app/online_user.py b/api/controllers/console/app/online_user.py index a4f0dea592..34423f40da 100644 --- a/api/controllers/console/app/online_user.py +++ b/api/controllers/console/app/online_user.py @@ -40,7 +40,7 @@ def socket_connect(sid, environ, auth): @sio.on("user_connect") def handle_user_connect(sid, data): """ - Handle user connect event, check login and get user info. + Handle user connect event. Each session (tab) is treated as an independent collaborator. """ workflow_id = data.get("workflow_id") @@ -53,31 +53,27 @@ def handle_user_connect(sid, data): if not user_id: return {"msg": "unauthorized"}, 401 - old_info_json = redis_client.hget(f"workflow_online_users:{workflow_id}", user_id) - if old_info_json: - old_info = json.loads(old_info_json) - old_sid = old_info.get("sid") - if old_sid and old_sid != sid: - sio.disconnect(sid=old_sid) - - user_info = { + # Each session is stored independently with sid as key + session_info = { "user_id": user_id, "username": session.get("username", "Unknown"), "avatar": session.get("avatar", None), "sid": sid, + "connected_at": int(time.time()), # Add timestamp to differentiate tabs } - redis_client.hset(f"workflow_online_users:{workflow_id}", user_id, json.dumps(user_info)) + # Store session info with sid as key + redis_client.hset(f"workflow_online_users:{workflow_id}", sid, json.dumps(session_info)) redis_client.set(f"ws_sid_map:{sid}", json.dumps({"workflow_id": workflow_id, "user_id": user_id})) - # Leader election: first user becomes the leader - leader_id = get_or_set_leader(workflow_id, user_id) - is_leader = leader_id == user_id + # Leader election: first session becomes the leader + leader_sid = get_or_set_leader(workflow_id, sid) + is_leader = leader_sid == sid sio.enter_room(sid, workflow_id) broadcast_online_users(workflow_id) - # Notify user of their status + # Notify this session of their leader status sio.emit("status", {"isLeader": is_leader}, room=sid) return {"msg": "connected", "user_id": user_id, "sid": sid, "isLeader": is_leader} @@ -86,41 +82,43 @@ def handle_user_connect(sid, data): @sio.on("disconnect") def handle_disconnect(sid): """ - Handle user disconnect event, remove user from workflow's online user list. + Handle session disconnect event. Remove the specific session from online users. """ mapping = redis_client.get(f"ws_sid_map:{sid}") if mapping: data = json.loads(mapping) workflow_id = data["workflow_id"] - user_id = data["user_id"] - redis_client.hdel(f"workflow_online_users:{workflow_id}", user_id) + + # Remove this specific session + redis_client.hdel(f"workflow_online_users:{workflow_id}", sid) redis_client.delete(f"ws_sid_map:{sid}") - # Handle leader re-election if the leader disconnected - handle_leader_disconnect(workflow_id, user_id) + # Handle leader re-election if the leader session disconnected + handle_leader_disconnect(workflow_id, sid) broadcast_online_users(workflow_id) -def get_or_set_leader(workflow_id, user_id): +def get_or_set_leader(workflow_id, sid): """ - Get current leader or set the user as leader if no leader exists. - Returns the leader user_id. + Get current leader session or set this session as leader if no leader exists. + Returns the leader session id (sid). """ leader_key = f"workflow_leader:{workflow_id}" current_leader = redis_client.get(leader_key) if not current_leader: - # No leader exists, make this user the leader - redis_client.set(leader_key, user_id, ex=3600) # Expire in 1 hour - return user_id + # No leader exists, make this session the leader + redis_client.set(leader_key, sid, ex=3600) # Expire in 1 hour + return sid return current_leader.decode("utf-8") if isinstance(current_leader, bytes) else current_leader -def handle_leader_disconnect(workflow_id, disconnected_user_id): +def handle_leader_disconnect(workflow_id, disconnected_sid): """ - Handle leader re-election when a user disconnects. + Handle leader re-election when a session disconnects. + If the disconnected session was the leader, elect a new leader from remaining sessions. """ leader_key = f"workflow_leader:{workflow_id}" current_leader = redis_client.get(leader_key) @@ -128,38 +126,37 @@ def handle_leader_disconnect(workflow_id, disconnected_user_id): if current_leader: current_leader = current_leader.decode("utf-8") if isinstance(current_leader, bytes) else current_leader - if current_leader == disconnected_user_id: - # Leader disconnected, elect a new leader - users_json = redis_client.hgetall(f"workflow_online_users:{workflow_id}") + if current_leader == disconnected_sid: + # Leader session disconnected, elect a new leader + sessions_json = redis_client.hgetall(f"workflow_online_users:{workflow_id}") - if users_json: - # Get the first remaining user as new leader - new_leader_id = list(users_json.keys())[0] - if isinstance(new_leader_id, bytes): - new_leader_id = new_leader_id.decode("utf-8") + if sessions_json: + # Get the first remaining session as new leader + new_leader_sid = list(sessions_json.keys())[0] + if isinstance(new_leader_sid, bytes): + new_leader_sid = new_leader_sid.decode("utf-8") - redis_client.set(leader_key, new_leader_id, ex=3600) + redis_client.set(leader_key, new_leader_sid, ex=3600) - # Notify all users about the new leader - broadcast_leader_change(workflow_id, new_leader_id) + # Notify all sessions about the new leader + broadcast_leader_change(workflow_id, new_leader_sid) else: - # No users left, remove leader + # No sessions left, remove leader redis_client.delete(leader_key) -def broadcast_leader_change(workflow_id, new_leader_id): +def broadcast_leader_change(workflow_id, new_leader_sid): """ - Broadcast leader change to all users in the workflow. + Broadcast leader change to all sessions in the workflow. """ - users_json = redis_client.hgetall(f"workflow_online_users:{workflow_id}") + sessions_json = redis_client.hgetall(f"workflow_online_users:{workflow_id}") - for user_id, user_info_json in users_json.items(): + for sid, session_info_json in sessions_json.items(): try: - user_info = json.loads(user_info_json) - user_sid = user_info.get("sid") - if user_sid: - is_leader = (user_id.decode("utf-8") if isinstance(user_id, bytes) else user_id) == new_leader_id - sio.emit("status", {"isLeader": is_leader}, room=user_sid) + sid_str = sid.decode("utf-8") if isinstance(sid, bytes) else sid + is_leader = sid_str == new_leader_sid + # Emit to each session whether they are the new leader + sio.emit("status", {"isLeader": is_leader}, room=sid_str) except Exception: continue @@ -175,20 +172,33 @@ def get_current_leader(workflow_id): def broadcast_online_users(workflow_id): """ - broadcast online users to the workflow room + Broadcast online users to the workflow room. + Each session is shown as a separate user (even if same person has multiple tabs). """ - users_json = redis_client.hgetall(f"workflow_online_users:{workflow_id}") + sessions_json = redis_client.hgetall(f"workflow_online_users:{workflow_id}") users = [] - for _, user_info_json in users_json.items(): + + for sid, session_info_json in sessions_json.items(): try: - users.append(json.loads(user_info_json)) + session_info = json.loads(session_info_json) + # Each session appears as a separate "user" in the UI + users.append({ + "user_id": session_info["user_id"], + "username": session_info["username"], + "avatar": session_info.get("avatar"), + "sid": session_info["sid"], + "connected_at": session_info.get("connected_at"), + }) except Exception: continue - # Get current leader - leader_id = get_current_leader(workflow_id) + # Sort by connection time to maintain consistent order + users.sort(key=lambda x: x.get("connected_at") or 0) - sio.emit("online_users", {"workflow_id": workflow_id, "users": users, "leader": leader_id}, room=workflow_id) + # Get current leader session + leader_sid = get_current_leader(workflow_id) + + sio.emit("online_users", {"workflow_id": workflow_id, "users": users, "leader": leader_sid}, room=workflow_id) @sio.on("collaboration_event")