fix race condition in concurrent leader election

This commit is contained in:
hjlarry 2026-01-13 18:01:12 +08:00
parent ac6b540fd8
commit 7730c88c74
1 changed file with 32 additions and 11 deletions

View File

@ -177,23 +177,44 @@ def get_or_set_leader(workflow_id: str, sid: str) -> str:
Get current leader session or set this session as leader if no valid leader exists.
Returns the leader session id (sid).
"""
raw_leader = redis_client.get(_leader_key(workflow_id))
leader_key = _leader_key(workflow_id)
# 1. Check existing leader
raw_leader = redis_client.get(leader_key)
current_leader = raw_leader.decode("utf-8") if isinstance(raw_leader, bytes) else raw_leader
leader_replaced = False
if current_leader and not _is_session_active(workflow_id, current_leader):
_clear_session_state(workflow_id, current_leader)
redis_client.delete(_leader_key(workflow_id))
current_leader = None
leader_replaced = True
# 2. If leader exists, verify aliveness
if current_leader:
if _is_session_active(workflow_id, current_leader):
return current_leader
else:
# Leader dead, clear it
_clear_session_state(workflow_id, current_leader)
redis_client.delete(leader_key)
# Fall through to elect new leader
if not current_leader:
redis_client.set(_leader_key(workflow_id), sid, ex=SESSION_STATE_TTL_SECONDS) # Expire in 1 hour
if leader_replaced:
# 3. Try to become leader using atomic SETNX
# nx=True ensures we only set if key doesn't exist
was_set = redis_client.set(leader_key, sid, nx=True, ex=SESSION_STATE_TTL_SECONDS)
if was_set:
# We won the election!
# If we replaced a dead leader, notify everyone
if current_leader:
broadcast_leader_change(workflow_id, sid)
return sid
return current_leader
# 4. If SETNX failed, someone else became leader concurrently
# Fetch again
raw_leader = redis_client.get(leader_key)
current_leader = raw_leader.decode("utf-8") if isinstance(raw_leader, bytes) else raw_leader
if current_leader:
return current_leader
# 5. Fallback (extremely rare race where leader expired/deleted right after SETNX failed)
# Just return self (or recurse)
return sid
def handle_leader_disconnect(workflow_id, disconnected_sid):