From 7730c88c744e8c286d6b6ed710a02ff4163c3d81 Mon Sep 17 00:00:00 2001 From: hjlarry Date: Tue, 13 Jan 2026 18:01:12 +0800 Subject: [PATCH] fix leader election concurrently --- api/controllers/console/app/online_user.py | 43 ++++++++++++++++------ 1 file changed, 32 insertions(+), 11 deletions(-) diff --git a/api/controllers/console/app/online_user.py b/api/controllers/console/app/online_user.py index d333ad3ba1..d16093f957 100644 --- a/api/controllers/console/app/online_user.py +++ b/api/controllers/console/app/online_user.py @@ -177,23 +177,44 @@ def get_or_set_leader(workflow_id: str, sid: str) -> str: Get current leader session or set this session as leader if no valid leader exists. Returns the leader session id (sid). """ - raw_leader = redis_client.get(_leader_key(workflow_id)) + leader_key = _leader_key(workflow_id) + + # 1. Check existing leader + raw_leader = redis_client.get(leader_key) current_leader = raw_leader.decode("utf-8") if isinstance(raw_leader, bytes) else raw_leader - leader_replaced = False - if current_leader and not _is_session_active(workflow_id, current_leader): - _clear_session_state(workflow_id, current_leader) - redis_client.delete(_leader_key(workflow_id)) - current_leader = None - leader_replaced = True + # 2. If leader exists, verify aliveness + if current_leader: + if _is_session_active(workflow_id, current_leader): + return current_leader + else: + # Leader dead, clear it + _clear_session_state(workflow_id, current_leader) + redis_client.delete(leader_key) + # Fall through to elect new leader - if not current_leader: - redis_client.set(_leader_key(workflow_id), sid, ex=SESSION_STATE_TTL_SECONDS) # Expire in 1 hour - if leader_replaced: + # 3. Try to become leader using atomic SETNX + # nx=True ensures we only set if key doesn't exist + was_set = redis_client.set(leader_key, sid, nx=True, ex=SESSION_STATE_TTL_SECONDS) + + if was_set: + # We won the election! + # If we replaced a dead leader, notify everyone + if current_leader: broadcast_leader_change(workflow_id, sid) return sid - return current_leader + # 4. If SETNX failed, someone else became leader concurrently + # Fetch again + raw_leader = redis_client.get(leader_key) + current_leader = raw_leader.decode("utf-8") if isinstance(raw_leader, bytes) else raw_leader + + if current_leader: + return current_leader + + # 5. Fallback (extremely rare race where leader expired/deleted right after SETNX failed) + # Just return self (or recurse) + return sid def handle_leader_disconnect(workflow_id, disconnected_sid):