diff --git a/api/controllers/console/app/online_user.py b/api/controllers/console/app/online_user.py index 8e2f6f2a3e..ae829681ce 100644 --- a/api/controllers/console/app/online_user.py +++ b/api/controllers/console/app/online_user.py @@ -9,6 +9,35 @@ from libs.passport import PassportService from libs.token import extract_access_token from services.account_service import AccountService +SESSION_STATE_TTL_SECONDS = 3600 +WORKFLOW_ONLINE_USERS_PREFIX = "workflow_online_users:" +WORKFLOW_LEADER_PREFIX = "workflow_leader:" +WS_SID_MAP_PREFIX = "ws_sid_map:" + + +def _workflow_key(workflow_id: str) -> str: + return f"{WORKFLOW_ONLINE_USERS_PREFIX}{workflow_id}" + + +def _leader_key(workflow_id: str) -> str: + return f"{WORKFLOW_LEADER_PREFIX}{workflow_id}" + + +def _sid_key(sid: str) -> str: + return f"{WS_SID_MAP_PREFIX}{sid}" + + +def _refresh_session_state(workflow_id: str, sid: str) -> None: + """ + Refresh TTLs for workflow + session keys so healthy sessions do not linger forever after crashes. + """ + workflow_key = _workflow_key(workflow_id) + sid_key = _sid_key(sid) + if redis_client.exists(workflow_key): + redis_client.expire(workflow_key, SESSION_STATE_TTL_SECONDS) + if redis_client.exists(sid_key): + redis_client.expire(sid_key, SESSION_STATE_TTL_SECONDS) + @sio.on("connect") def socket_connect(sid, environ, auth): @@ -73,9 +102,15 @@ def handle_user_connect(sid, data): "connected_at": int(time.time()), # Add timestamp to differentiate tabs } + workflow_key = _workflow_key(workflow_id) # Store session info with sid as key - redis_client.hset(f"workflow_online_users:{workflow_id}", sid, json.dumps(session_info)) - redis_client.set(f"ws_sid_map:{sid}", json.dumps({"workflow_id": workflow_id, "user_id": user_id})) + redis_client.hset(workflow_key, sid, json.dumps(session_info)) + redis_client.set( + _sid_key(sid), + json.dumps({"workflow_id": workflow_id, "user_id": user_id}), + ex=SESSION_STATE_TTL_SECONDS, + ) + _refresh_session_state(workflow_id, sid) # Leader election: first session becomes the leader leader_sid = get_or_set_leader(workflow_id, sid) @@ -95,14 +130,14 @@ def handle_disconnect(sid): """ Handle session disconnect event. Remove the specific session from online users. """ - mapping = redis_client.get(f"ws_sid_map:{sid}") + mapping = redis_client.get(_sid_key(sid)) if mapping: data = json.loads(mapping) workflow_id = data["workflow_id"] # Remove this specific session - redis_client.hdel(f"workflow_online_users:{workflow_id}", sid) - redis_client.delete(f"ws_sid_map:{sid}") + redis_client.hdel(_workflow_key(workflow_id), sid) + redis_client.delete(_sid_key(sid)) # Handle leader re-election if the leader session disconnected handle_leader_disconnect(workflow_id, sid) @@ -111,8 +146,8 @@ def handle_disconnect(sid): def _clear_session_state(workflow_id: str, sid: str) -> None: - redis_client.hdel(f"workflow_online_users:{workflow_id}", sid) - redis_client.delete(f"ws_sid_map:{sid}") + redis_client.hdel(_workflow_key(workflow_id), sid) + redis_client.delete(_sid_key(sid)) def _is_session_active(workflow_id: str, sid: str) -> bool: @@ -125,10 +160,10 @@ def _is_session_active(workflow_id: str, sid: str) -> bool: except AttributeError: return False - if not redis_client.hexists(f"workflow_online_users:{workflow_id}", sid): + if not redis_client.hexists(_workflow_key(workflow_id), sid): return False - if not redis_client.exists(f"ws_sid_map:{sid}"): + if not redis_client.exists(_sid_key(sid)): return False return True @@ -139,19 +174,18 @@ def get_or_set_leader(workflow_id: str, sid: str) -> str: Get current leader session or set this session as leader if no valid leader exists. Returns the leader session id (sid). """ - leader_key = f"workflow_leader:{workflow_id}" - raw_leader = redis_client.get(leader_key) + raw_leader = redis_client.get(_leader_key(workflow_id)) current_leader = raw_leader.decode("utf-8") if isinstance(raw_leader, bytes) else raw_leader leader_replaced = False if current_leader and not _is_session_active(workflow_id, current_leader): _clear_session_state(workflow_id, current_leader) - redis_client.delete(leader_key) + redis_client.delete(_leader_key(workflow_id)) current_leader = None leader_replaced = True if not current_leader: - redis_client.set(leader_key, sid, ex=3600) # Expire in 1 hour + redis_client.set(_leader_key(workflow_id), sid, ex=SESSION_STATE_TTL_SECONDS) # Expire in 1 hour if leader_replaced: broadcast_leader_change(workflow_id, sid) return sid @@ -164,15 +198,14 @@ def handle_leader_disconnect(workflow_id, disconnected_sid): Handle leader re-election when a session disconnects. If the disconnected session was the leader, elect a new leader from remaining sessions. """ - leader_key = f"workflow_leader:{workflow_id}" - current_leader = redis_client.get(leader_key) + current_leader = redis_client.get(_leader_key(workflow_id)) if current_leader: current_leader = current_leader.decode("utf-8") if isinstance(current_leader, bytes) else current_leader if current_leader == disconnected_sid: # Leader session disconnected, elect a new leader - sessions_json = redis_client.hgetall(f"workflow_online_users:{workflow_id}") + sessions_json = redis_client.hgetall(_workflow_key(workflow_id)) if sessions_json: # Get the first remaining session as new leader @@ -180,20 +213,20 @@ def handle_leader_disconnect(workflow_id, disconnected_sid): if isinstance(new_leader_sid, bytes): new_leader_sid = new_leader_sid.decode("utf-8") - redis_client.set(leader_key, new_leader_sid, ex=3600) + redis_client.set(_leader_key(workflow_id), new_leader_sid, ex=SESSION_STATE_TTL_SECONDS) # Notify all sessions about the new leader broadcast_leader_change(workflow_id, new_leader_sid) else: # No sessions left, remove leader - redis_client.delete(leader_key) + redis_client.delete(_leader_key(workflow_id)) def broadcast_leader_change(workflow_id, new_leader_sid): """ Broadcast leader change to all sessions in the workflow. """ - sessions_json = redis_client.hgetall(f"workflow_online_users:{workflow_id}") + sessions_json = redis_client.hgetall(_workflow_key(workflow_id)) for sid, session_info_json in sessions_json.items(): try: @@ -209,8 +242,7 @@ def get_current_leader(workflow_id): """ Get the current leader for a workflow. """ - leader_key = f"workflow_leader:{workflow_id}" - leader = redis_client.get(leader_key) + leader = redis_client.get(_leader_key(workflow_id)) return leader.decode("utf-8") if leader and isinstance(leader, bytes) else leader @@ -219,7 +251,7 @@ def broadcast_online_users(workflow_id): Broadcast online users to the workflow room. Each session is shown as a separate user (even if same person has multiple tabs). """ - sessions_json = redis_client.hgetall(f"workflow_online_users:{workflow_id}") + sessions_json = redis_client.hgetall(_workflow_key(workflow_id)) users = [] for sid, session_info_json in sessions_json.items(): @@ -261,7 +293,7 @@ def handle_collaboration_event(sid, data): 8. node_panel_presence """ - mapping = redis_client.get(f"ws_sid_map:{sid}") + mapping = redis_client.get(_sid_key(sid)) if not mapping: return {"msg": "unauthorized"}, 401 @@ -269,6 +301,7 @@ def handle_collaboration_event(sid, data): mapping_data = json.loads(mapping) workflow_id = mapping_data["workflow_id"] user_id = mapping_data["user_id"] + _refresh_session_state(workflow_id, sid) event_type = data.get("type") event_data = data.get("data") @@ -292,13 +325,14 @@ def handle_graph_event(sid, data): """ Handle graph events - simple broadcast relay. """ - mapping = redis_client.get(f"ws_sid_map:{sid}") + mapping = redis_client.get(_sid_key(sid)) if not mapping: return {"msg": "unauthorized"}, 401 mapping_data = json.loads(mapping) workflow_id = mapping_data["workflow_id"] + _refresh_session_state(workflow_id, sid) sio.emit("graph_update", data, room=workflow_id, skip_sid=sid)