add redis key expire time for collaboration

This commit is contained in:
hjlarry 2025-11-11 16:12:50 +08:00
parent d930d8cc4a
commit 39de7673eb
1 changed files with 58 additions and 24 deletions

View File

@ -9,6 +9,35 @@ from libs.passport import PassportService
from libs.token import extract_access_token
from services.account_service import AccountService
SESSION_STATE_TTL_SECONDS = 3600
WORKFLOW_ONLINE_USERS_PREFIX = "workflow_online_users:"
WORKFLOW_LEADER_PREFIX = "workflow_leader:"
WS_SID_MAP_PREFIX = "ws_sid_map:"
def _workflow_key(workflow_id: str) -> str:
return f"{WORKFLOW_ONLINE_USERS_PREFIX}{workflow_id}"
def _leader_key(workflow_id: str) -> str:
return f"{WORKFLOW_LEADER_PREFIX}{workflow_id}"
def _sid_key(sid: str) -> str:
return f"{WS_SID_MAP_PREFIX}{sid}"
def _refresh_session_state(workflow_id: str, sid: str) -> None:
"""
Refresh TTLs for workflow + session keys so healthy sessions do not linger forever after crashes.
"""
workflow_key = _workflow_key(workflow_id)
sid_key = _sid_key(sid)
if redis_client.exists(workflow_key):
redis_client.expire(workflow_key, SESSION_STATE_TTL_SECONDS)
if redis_client.exists(sid_key):
redis_client.expire(sid_key, SESSION_STATE_TTL_SECONDS)
@sio.on("connect")
def socket_connect(sid, environ, auth):
@ -73,9 +102,15 @@ def handle_user_connect(sid, data):
"connected_at": int(time.time()), # Add timestamp to differentiate tabs
}
workflow_key = _workflow_key(workflow_id)
# Store session info with sid as key
redis_client.hset(f"workflow_online_users:{workflow_id}", sid, json.dumps(session_info))
redis_client.set(f"ws_sid_map:{sid}", json.dumps({"workflow_id": workflow_id, "user_id": user_id}))
redis_client.hset(workflow_key, sid, json.dumps(session_info))
redis_client.set(
_sid_key(sid),
json.dumps({"workflow_id": workflow_id, "user_id": user_id}),
ex=SESSION_STATE_TTL_SECONDS,
)
_refresh_session_state(workflow_id, sid)
# Leader election: first session becomes the leader
leader_sid = get_or_set_leader(workflow_id, sid)
@ -95,14 +130,14 @@ def handle_disconnect(sid):
"""
Handle session disconnect event. Remove the specific session from online users.
"""
mapping = redis_client.get(f"ws_sid_map:{sid}")
mapping = redis_client.get(_sid_key(sid))
if mapping:
data = json.loads(mapping)
workflow_id = data["workflow_id"]
# Remove this specific session
redis_client.hdel(f"workflow_online_users:{workflow_id}", sid)
redis_client.delete(f"ws_sid_map:{sid}")
redis_client.hdel(_workflow_key(workflow_id), sid)
redis_client.delete(_sid_key(sid))
# Handle leader re-election if the leader session disconnected
handle_leader_disconnect(workflow_id, sid)
@ -111,8 +146,8 @@ def handle_disconnect(sid):
def _clear_session_state(workflow_id: str, sid: str) -> None:
redis_client.hdel(f"workflow_online_users:{workflow_id}", sid)
redis_client.delete(f"ws_sid_map:{sid}")
redis_client.hdel(_workflow_key(workflow_id), sid)
redis_client.delete(_sid_key(sid))
def _is_session_active(workflow_id: str, sid: str) -> bool:
@ -125,10 +160,10 @@ def _is_session_active(workflow_id: str, sid: str) -> bool:
except AttributeError:
return False
if not redis_client.hexists(f"workflow_online_users:{workflow_id}", sid):
if not redis_client.hexists(_workflow_key(workflow_id), sid):
return False
if not redis_client.exists(f"ws_sid_map:{sid}"):
if not redis_client.exists(_sid_key(sid)):
return False
return True
@ -139,19 +174,18 @@ def get_or_set_leader(workflow_id: str, sid: str) -> str:
Get current leader session or set this session as leader if no valid leader exists.
Returns the leader session id (sid).
"""
leader_key = f"workflow_leader:{workflow_id}"
raw_leader = redis_client.get(leader_key)
raw_leader = redis_client.get(_leader_key(workflow_id))
current_leader = raw_leader.decode("utf-8") if isinstance(raw_leader, bytes) else raw_leader
leader_replaced = False
if current_leader and not _is_session_active(workflow_id, current_leader):
_clear_session_state(workflow_id, current_leader)
redis_client.delete(leader_key)
redis_client.delete(_leader_key(workflow_id))
current_leader = None
leader_replaced = True
if not current_leader:
redis_client.set(leader_key, sid, ex=3600) # Expire in 1 hour
redis_client.set(_leader_key(workflow_id), sid, ex=SESSION_STATE_TTL_SECONDS) # Expire in 1 hour
if leader_replaced:
broadcast_leader_change(workflow_id, sid)
return sid
@ -164,15 +198,14 @@ def handle_leader_disconnect(workflow_id, disconnected_sid):
Handle leader re-election when a session disconnects.
If the disconnected session was the leader, elect a new leader from remaining sessions.
"""
leader_key = f"workflow_leader:{workflow_id}"
current_leader = redis_client.get(leader_key)
current_leader = redis_client.get(_leader_key(workflow_id))
if current_leader:
current_leader = current_leader.decode("utf-8") if isinstance(current_leader, bytes) else current_leader
if current_leader == disconnected_sid:
# Leader session disconnected, elect a new leader
sessions_json = redis_client.hgetall(f"workflow_online_users:{workflow_id}")
sessions_json = redis_client.hgetall(_workflow_key(workflow_id))
if sessions_json:
# Get the first remaining session as new leader
@ -180,20 +213,20 @@ def handle_leader_disconnect(workflow_id, disconnected_sid):
if isinstance(new_leader_sid, bytes):
new_leader_sid = new_leader_sid.decode("utf-8")
redis_client.set(leader_key, new_leader_sid, ex=3600)
redis_client.set(_leader_key(workflow_id), new_leader_sid, ex=SESSION_STATE_TTL_SECONDS)
# Notify all sessions about the new leader
broadcast_leader_change(workflow_id, new_leader_sid)
else:
# No sessions left, remove leader
redis_client.delete(leader_key)
redis_client.delete(_leader_key(workflow_id))
def broadcast_leader_change(workflow_id, new_leader_sid):
"""
Broadcast leader change to all sessions in the workflow.
"""
sessions_json = redis_client.hgetall(f"workflow_online_users:{workflow_id}")
sessions_json = redis_client.hgetall(_workflow_key(workflow_id))
for sid, session_info_json in sessions_json.items():
try:
@ -209,8 +242,7 @@ def get_current_leader(workflow_id):
"""
Get the current leader for a workflow.
"""
leader_key = f"workflow_leader:{workflow_id}"
leader = redis_client.get(leader_key)
leader = redis_client.get(_leader_key(workflow_id))
return leader.decode("utf-8") if leader and isinstance(leader, bytes) else leader
@ -219,7 +251,7 @@ def broadcast_online_users(workflow_id):
Broadcast online users to the workflow room.
Each session is shown as a separate user (even if same person has multiple tabs).
"""
sessions_json = redis_client.hgetall(f"workflow_online_users:{workflow_id}")
sessions_json = redis_client.hgetall(_workflow_key(workflow_id))
users = []
for sid, session_info_json in sessions_json.items():
@ -261,7 +293,7 @@ def handle_collaboration_event(sid, data):
8. node_panel_presence
"""
mapping = redis_client.get(f"ws_sid_map:{sid}")
mapping = redis_client.get(_sid_key(sid))
if not mapping:
return {"msg": "unauthorized"}, 401
@ -269,6 +301,7 @@ def handle_collaboration_event(sid, data):
mapping_data = json.loads(mapping)
workflow_id = mapping_data["workflow_id"]
user_id = mapping_data["user_id"]
_refresh_session_state(workflow_id, sid)
event_type = data.get("type")
event_data = data.get("data")
@ -292,13 +325,14 @@ def handle_graph_event(sid, data):
"""
Handle graph events - simple broadcast relay.
"""
mapping = redis_client.get(f"ws_sid_map:{sid}")
mapping = redis_client.get(_sid_key(sid))
if not mapping:
return {"msg": "unauthorized"}, 401
mapping_data = json.loads(mapping)
workflow_id = mapping_data["workflow_id"]
_refresh_session_state(workflow_id, sid)
sio.emit("graph_update", data, room=workflow_id, skip_sid=sid)