diff --git a/api/controllers/console/__init__.py b/api/controllers/console/__init__.py index c44ea6a100..fad1af707f 100644 --- a/api/controllers/console/__init__.py +++ b/api/controllers/console/__init__.py @@ -58,7 +58,6 @@ from .app import ( mcp_server, message, model_config, - online_user, ops_trace, site, statistic, @@ -114,6 +113,7 @@ from .explore import ( recommended_app, saved_message, ) +from .socketio import workflow as socketio_workflow # Import tag controllers from .tag import tags @@ -183,7 +183,6 @@ __all__ = [ "models", "oauth", "oauth_server", - "online_user", "ops_trace", "parameter", "ping", diff --git a/api/controllers/console/app/online_user.py b/api/controllers/console/app/online_user.py deleted file mode 100644 index dc50d2d8c4..0000000000 --- a/api/controllers/console/app/online_user.py +++ /dev/null @@ -1,370 +0,0 @@ -import json -import logging -import time -from collections.abc import Callable -from typing import Any, cast - -from flask import Request as FlaskRequest - -from extensions.ext_redis import redis_client -from extensions.ext_socketio import sio -from libs.passport import PassportService -from libs.token import extract_access_token -from services.account_service import AccountService - -SESSION_STATE_TTL_SECONDS = 3600 -WORKFLOW_ONLINE_USERS_PREFIX = "workflow_online_users:" -WORKFLOW_LEADER_PREFIX = "workflow_leader:" -WS_SID_MAP_PREFIX = "ws_sid_map:" - - -def _workflow_key(workflow_id: str) -> str: - return f"{WORKFLOW_ONLINE_USERS_PREFIX}{workflow_id}" - - -def _leader_key(workflow_id: str) -> str: - return f"{WORKFLOW_LEADER_PREFIX}{workflow_id}" - - -def _sid_key(sid: str) -> str: - return f"{WS_SID_MAP_PREFIX}{sid}" - - -def _refresh_session_state(workflow_id: str, sid: str) -> None: - """ - Refresh TTLs for workflow + session keys so healthy sessions do not linger forever after crashes. - """ - workflow_key = _workflow_key(workflow_id) - sid_key = _sid_key(sid) - if redis_client.exists(workflow_key): - redis_client.expire(workflow_key, SESSION_STATE_TTL_SECONDS) - if redis_client.exists(sid_key): - redis_client.expire(sid_key, SESSION_STATE_TTL_SECONDS) - - -def _sio_on(event: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]: - return cast(Callable[[Callable[..., Any]], Callable[..., Any]], sio.on(event)) - - -@_sio_on("connect") -def socket_connect(sid, environ, auth): - """ - WebSocket connect event, do authentication here. - """ - token = None - if auth and isinstance(auth, dict): - token = auth.get("token") - - if not token: - try: - request_environ = FlaskRequest(environ) - token = extract_access_token(request_environ) - except Exception: - logging.exception("Failed to extract token") - token = None - - if not token: - return False - - try: - decoded = PassportService().verify(token) - user_id = decoded.get("user_id") - if not user_id: - return False - - with sio.app.app_context(): - user = AccountService.load_logged_in_account(account_id=user_id) - if not user: - return False - - sio.save_session(sid, {"user_id": user.id, "username": user.name, "avatar": user.avatar}) - - return True - - except Exception: - logging.exception("Socket authentication failed") - return False - - -@_sio_on("user_connect") -def handle_user_connect(sid, data): - """ - Handle user connect event. Each session (tab) is treated as an independent collaborator. - """ - - workflow_id = data.get("workflow_id") - if not workflow_id: - return {"msg": "workflow_id is required"}, 400 - - session = sio.get_session(sid) - user_id = session.get("user_id") - - if not user_id: - return {"msg": "unauthorized"}, 401 - - # Each session is stored independently with sid as key - session_info = { - "user_id": user_id, - "username": session.get("username", "Unknown"), - "avatar": session.get("avatar", None), - "sid": sid, - "connected_at": int(time.time()), # Add timestamp to differentiate tabs - } - - workflow_key = _workflow_key(workflow_id) - # Store session info with sid as key - redis_client.hset(workflow_key, sid, json.dumps(session_info)) - redis_client.set( - _sid_key(sid), - json.dumps({"workflow_id": workflow_id, "user_id": user_id}), - ex=SESSION_STATE_TTL_SECONDS, - ) - _refresh_session_state(workflow_id, sid) - - # Leader election: first session becomes the leader - leader_sid = get_or_set_leader(workflow_id, sid) - is_leader = leader_sid == sid - - sio.enter_room(sid, workflow_id) - broadcast_online_users(workflow_id) - - # Notify this session of their leader status - sio.emit("status", {"isLeader": is_leader}, room=sid) - - return {"msg": "connected", "user_id": user_id, "sid": sid, "isLeader": is_leader} - - -@_sio_on("disconnect") -def handle_disconnect(sid): - """ - Handle session disconnect event. Remove the specific session from online users. - """ - mapping = redis_client.get(_sid_key(sid)) - if mapping: - data = json.loads(mapping) - workflow_id = data["workflow_id"] - - # Remove this specific session - redis_client.hdel(_workflow_key(workflow_id), sid) - redis_client.delete(_sid_key(sid)) - - # Handle leader re-election if the leader session disconnected - handle_leader_disconnect(workflow_id, sid) - - broadcast_online_users(workflow_id) - - -def _clear_session_state(workflow_id: str, sid: str) -> None: - redis_client.hdel(_workflow_key(workflow_id), sid) - redis_client.delete(_sid_key(sid)) - - -def _is_session_active(workflow_id: str, sid: str) -> bool: - if not sid: - return False - - try: - if not sio.manager.is_connected(sid, "/"): - return False - except AttributeError: - return False - - if not redis_client.hexists(_workflow_key(workflow_id), sid): - return False - - if not redis_client.exists(_sid_key(sid)): - return False - - return True - - -def get_or_set_leader(workflow_id: str, sid: str) -> str: - """ - Get current leader session or set this session as leader if no valid leader exists. - Returns the leader session id (sid). - """ - leader_key = _leader_key(workflow_id) - - # 1. Check existing leader - raw_leader = redis_client.get(leader_key) - current_leader = raw_leader.decode("utf-8") if isinstance(raw_leader, bytes) else raw_leader - - # 2. If leader exists, verify aliveness - if current_leader: - if _is_session_active(workflow_id, current_leader): - return current_leader - else: - # Leader dead, clear it - _clear_session_state(workflow_id, current_leader) - redis_client.delete(leader_key) - # Fall through to elect new leader - - # 3. Try to become leader using atomic SETNX - # nx=True ensures we only set if key doesn't exist - was_set = redis_client.set(leader_key, sid, nx=True, ex=SESSION_STATE_TTL_SECONDS) - - if was_set: - # We won the election! - # If we replaced a dead leader, notify everyone - if current_leader: - broadcast_leader_change(workflow_id, sid) - return sid - - # 4. If SETNX failed, someone else became leader concurrently - # Fetch again - raw_leader = redis_client.get(leader_key) - current_leader = raw_leader.decode("utf-8") if isinstance(raw_leader, bytes) else raw_leader - - if current_leader: - return current_leader - - # 5. Fallback (extremely rare race where leader expired/deleted right after SETNX failed) - # Just return self (or recurse) - return sid - - -def handle_leader_disconnect(workflow_id, disconnected_sid): - """ - Handle leader re-election when a session disconnects. - If the disconnected session was the leader, elect a new leader from remaining sessions. - """ - current_leader = redis_client.get(_leader_key(workflow_id)) - - if current_leader: - current_leader = current_leader.decode("utf-8") if isinstance(current_leader, bytes) else current_leader - - if current_leader == disconnected_sid: - # Leader session disconnected, elect a new leader - sessions_json = redis_client.hgetall(_workflow_key(workflow_id)) - - if sessions_json: - # Get the first remaining session as new leader - new_leader_sid = list(sessions_json.keys())[0] - if isinstance(new_leader_sid, bytes): - new_leader_sid = new_leader_sid.decode("utf-8") - - redis_client.set(_leader_key(workflow_id), new_leader_sid, ex=SESSION_STATE_TTL_SECONDS) - - # Notify all sessions about the new leader - broadcast_leader_change(workflow_id, new_leader_sid) - else: - # No sessions left, remove leader - redis_client.delete(_leader_key(workflow_id)) - - -def broadcast_leader_change(workflow_id, new_leader_sid): - """ - Broadcast leader change to all sessions in the workflow. - """ - sessions_json = redis_client.hgetall(_workflow_key(workflow_id)) - - for sid in sessions_json: - try: - sid_str = sid.decode("utf-8") if isinstance(sid, bytes) else sid - is_leader = sid_str == new_leader_sid - # Emit to each session whether they are the new leader - sio.emit("status", {"isLeader": is_leader}, room=sid_str) - except Exception: - logging.exception("Failed to emit leader status to session %s", sid) - continue - - -def get_current_leader(workflow_id): - """ - Get the current leader for a workflow. - """ - leader = redis_client.get(_leader_key(workflow_id)) - return leader.decode("utf-8") if leader and isinstance(leader, bytes) else leader - - -def broadcast_online_users(workflow_id): - """ - Broadcast online users to the workflow room. - Each session is shown as a separate user (even if same person has multiple tabs). - """ - sessions_json = redis_client.hgetall(_workflow_key(workflow_id)) - users = [] - - for session_info_json in sessions_json.values(): - try: - session_info = json.loads(session_info_json) - # Each session appears as a separate "user" in the UI - users.append( - { - "user_id": session_info["user_id"], - "username": session_info["username"], - "avatar": session_info.get("avatar"), - "sid": session_info["sid"], - "connected_at": session_info.get("connected_at"), - } - ) - except Exception: - continue - - # Sort by connection time to maintain consistent order - users.sort(key=lambda x: x.get("connected_at") or 0) - - # Get current leader session - leader_sid = get_current_leader(workflow_id) - - sio.emit("online_users", {"workflow_id": workflow_id, "users": users, "leader": leader_sid}, room=workflow_id) - - -@_sio_on("collaboration_event") -def handle_collaboration_event(sid, data): - """ - Handle general collaboration events, include: - 1. mouse_move - 2. vars_and_features_update - 3. sync_request (ask leader to update graph) - 4. app_state_update - 5. mcp_server_update - 6. workflow_update - 7. comments_update - 8. node_panel_presence - - """ - mapping = redis_client.get(_sid_key(sid)) - - if not mapping: - return {"msg": "unauthorized"}, 401 - - mapping_data = json.loads(mapping) - workflow_id = mapping_data["workflow_id"] - user_id = mapping_data["user_id"] - _refresh_session_state(workflow_id, sid) - - event_type = data.get("type") - event_data = data.get("data") - timestamp = data.get("timestamp", int(time.time())) - - if not event_type: - return {"msg": "invalid event type"}, 400 - - sio.emit( - "collaboration_update", - {"type": event_type, "userId": user_id, "data": event_data, "timestamp": timestamp}, - room=workflow_id, - skip_sid=sid, - ) - - return {"msg": "event_broadcasted"} - - -@_sio_on("graph_event") -def handle_graph_event(sid, data): - """ - Handle graph events - simple broadcast relay. - """ - mapping = redis_client.get(_sid_key(sid)) - - if not mapping: - return {"msg": "unauthorized"}, 401 - - mapping_data = json.loads(mapping) - workflow_id = mapping_data["workflow_id"] - _refresh_session_state(workflow_id, sid) - - sio.emit("graph_update", data, room=workflow_id, skip_sid=sid) - - return {"msg": "graph_update_broadcasted"} diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index ea0b918508..aea88859a4 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -12,7 +12,6 @@ from werkzeug.exceptions import Forbidden, InternalServerError, NotFound import services from controllers.console import console_ns from controllers.console.app.error import ConversationCompletedError, DraftWorkflowNotExist, DraftWorkflowNotSync -from controllers.console.app.online_user import WORKFLOW_ONLINE_USERS_PREFIX from controllers.console.app.wraps import get_app_model from controllers.console.wraps import account_initialization_required, edit_permission_required, setup_required from controllers.web.error import InvokeRateLimitError as InvokeRateLimitHttpError @@ -46,6 +45,7 @@ from libs.login import current_account_with_tenant, login_required from models import App from models.model import AppMode from models.workflow import Workflow +from repositories.workflow_collaboration_repository import WORKFLOW_ONLINE_USERS_PREFIX from services.app_generate_service import AppGenerateService from services.errors.app import WorkflowHashNotEqualError from services.errors.llm import InvokeRateLimitError diff --git a/api/controllers/console/socketio/__init__.py b/api/controllers/console/socketio/__init__.py new file mode 100644 index 0000000000..8b13789179 --- /dev/null +++ b/api/controllers/console/socketio/__init__.py @@ -0,0 +1 @@ + diff --git a/api/controllers/console/socketio/workflow.py b/api/controllers/console/socketio/workflow.py new file mode 100644 index 0000000000..619487e78c --- /dev/null +++ b/api/controllers/console/socketio/workflow.py @@ -0,0 +1,107 @@ +import logging +from collections.abc import Callable +from typing import cast + +from flask import Request as FlaskRequest + +from extensions.ext_socketio import sio +from libs.passport import PassportService +from libs.token import extract_access_token +from repositories.workflow_collaboration_repository import WorkflowCollaborationRepository +from services.account_service import AccountService +from services.workflow_collaboration_service import WorkflowCollaborationService + +repository = WorkflowCollaborationRepository() +collaboration_service = WorkflowCollaborationService(repository, sio) + + +def _sio_on(event: str) -> Callable[[Callable[..., object]], Callable[..., object]]: + return cast(Callable[[Callable[..., object]], Callable[..., object]], sio.on(event)) + + +@_sio_on("connect") +def socket_connect(sid, environ, auth): + """ + WebSocket connect event, do authentication here. + """ + token = None + if auth and isinstance(auth, dict): + token = auth.get("token") + + if not token: + try: + request_environ = FlaskRequest(environ) + token = extract_access_token(request_environ) + except Exception: + logging.exception("Failed to extract token") + token = None + + if not token: + return False + + try: + decoded = PassportService().verify(token) + user_id = decoded.get("user_id") + if not user_id: + return False + + with sio.app.app_context(): + user = AccountService.load_logged_in_account(account_id=user_id) + if not user: + return False + + collaboration_service.save_session(sid, user) + return True + + except Exception: + logging.exception("Socket authentication failed") + return False + + +@_sio_on("user_connect") +def handle_user_connect(sid, data): + """ + Handle user connect event. Each session (tab) is treated as an independent collaborator. + """ + workflow_id = data.get("workflow_id") + if not workflow_id: + return {"msg": "workflow_id is required"}, 400 + + result = collaboration_service.register_session(workflow_id, sid) + if not result: + return {"msg": "unauthorized"}, 401 + + user_id, is_leader = result + return {"msg": "connected", "user_id": user_id, "sid": sid, "isLeader": is_leader} + + +@_sio_on("disconnect") +def handle_disconnect(sid): + """ + Handle session disconnect event. Remove the specific session from online users. + """ + collaboration_service.disconnect_session(sid) + + +@_sio_on("collaboration_event") +def handle_collaboration_event(sid, data): + """ + Handle general collaboration events, include: + 1. mouse_move + 2. vars_and_features_update + 3. sync_request (ask leader to update graph) + 4. app_state_update + 5. mcp_server_update + 6. workflow_update + 7. comments_update + 8. node_panel_presence + """ + return collaboration_service.relay_collaboration_event(sid, data) + + +@_sio_on("graph_event") +def handle_graph_event(sid, data): + """ + Handle graph events - simple broadcast relay. + """ + return collaboration_service.relay_graph_event(sid, data) diff --git a/api/repositories/workflow_collaboration_repository.py b/api/repositories/workflow_collaboration_repository.py new file mode 100644 index 0000000000..2c06e90755 --- /dev/null +++ b/api/repositories/workflow_collaboration_repository.py @@ -0,0 +1,144 @@ +from __future__ import annotations + +import json +from typing import TypedDict + +from extensions.ext_redis import redis_client + +SESSION_STATE_TTL_SECONDS = 3600 +WORKFLOW_ONLINE_USERS_PREFIX = "workflow_online_users:" +WORKFLOW_LEADER_PREFIX = "workflow_leader:" +WS_SID_MAP_PREFIX = "ws_sid_map:" + + +class WorkflowSessionInfo(TypedDict): + user_id: str + username: str + avatar: str | None + sid: str + connected_at: int + + +class SidMapping(TypedDict): + workflow_id: str + user_id: str + + +class WorkflowCollaborationRepository: + def __init__(self) -> None: + self._redis = redis_client + + def __repr__(self) -> str: + return f"{self.__class__.__name__}(redis_client={self._redis})" + + @staticmethod + def workflow_key(workflow_id: str) -> str: + return f"{WORKFLOW_ONLINE_USERS_PREFIX}{workflow_id}" + + @staticmethod + def leader_key(workflow_id: str) -> str: + return f"{WORKFLOW_LEADER_PREFIX}{workflow_id}" + + @staticmethod + def sid_key(sid: str) -> str: + return f"{WS_SID_MAP_PREFIX}{sid}" + + @staticmethod + def _decode(value: str | bytes | None) -> str | None: + if value is None: + return None + if isinstance(value, bytes): + return value.decode("utf-8") + return value + + def refresh_session_state(self, workflow_id: str, sid: str) -> None: + workflow_key = self.workflow_key(workflow_id) + sid_key = self.sid_key(sid) + if self._redis.exists(workflow_key): + self._redis.expire(workflow_key, SESSION_STATE_TTL_SECONDS) + if self._redis.exists(sid_key): + self._redis.expire(sid_key, SESSION_STATE_TTL_SECONDS) + + def set_session_info(self, workflow_id: str, session_info: WorkflowSessionInfo) -> None: + workflow_key = self.workflow_key(workflow_id) + self._redis.hset(workflow_key, session_info["sid"], json.dumps(session_info)) + self._redis.set( + self.sid_key(session_info["sid"]), + json.dumps({"workflow_id": workflow_id, "user_id": session_info["user_id"]}), + ex=SESSION_STATE_TTL_SECONDS, + ) + self.refresh_session_state(workflow_id, session_info["sid"]) + + def get_sid_mapping(self, sid: str) -> SidMapping | None: + raw = self._redis.get(self.sid_key(sid)) + if not raw: + return None + value = self._decode(raw) + if not value: + return None + try: + return json.loads(value) + except (TypeError, json.JSONDecodeError): + return None + + def delete_session(self, workflow_id: str, sid: str) -> None: + self._redis.hdel(self.workflow_key(workflow_id), sid) + self._redis.delete(self.sid_key(sid)) + + def session_exists(self, workflow_id: str, sid: str) -> bool: + return bool(self._redis.hexists(self.workflow_key(workflow_id), sid)) + + def sid_mapping_exists(self, sid: str) -> bool: + return bool(self._redis.exists(self.sid_key(sid))) + + def get_session_sids(self, workflow_id: str) -> list[str]: + raw_sids = self._redis.hkeys(self.workflow_key(workflow_id)) + return [self._decode(sid) for sid in raw_sids if self._decode(sid)] + + def list_sessions(self, workflow_id: str) -> list[WorkflowSessionInfo]: + sessions_json = self._redis.hgetall(self.workflow_key(workflow_id)) + users: list[WorkflowSessionInfo] = [] + + for session_info_json in sessions_json.values(): + value = self._decode(session_info_json) + if not value: + continue + try: + session_info = json.loads(value) + except (TypeError, json.JSONDecodeError): + continue + + if not isinstance(session_info, dict): + continue + if "user_id" not in session_info or "username" not in session_info or "sid" not in session_info: + continue + + users.append( + { + "user_id": str(session_info["user_id"]), + "username": str(session_info["username"]), + "avatar": session_info.get("avatar"), + "sid": str(session_info["sid"]), + "connected_at": int(session_info.get("connected_at") or 0), + } + ) + + return users + + def get_current_leader(self, workflow_id: str) -> str | None: + raw = self._redis.get(self.leader_key(workflow_id)) + return self._decode(raw) + + def set_leader_if_absent(self, workflow_id: str, sid: str) -> bool: + return bool( + self._redis.set(self.leader_key(workflow_id), sid, nx=True, ex=SESSION_STATE_TTL_SECONDS) + ) + + def set_leader(self, workflow_id: str, sid: str) -> None: + self._redis.set(self.leader_key(workflow_id), sid, ex=SESSION_STATE_TTL_SECONDS) + + def delete_leader(self, workflow_id: str) -> None: + self._redis.delete(self.leader_key(workflow_id)) + + def expire_leader(self, workflow_id: str) -> None: + self._redis.expire(self.leader_key(workflow_id), SESSION_STATE_TTL_SECONDS) diff --git a/api/services/workflow_collaboration_service.py b/api/services/workflow_collaboration_service.py new file mode 100644 index 0000000000..badb1b0575 --- /dev/null +++ b/api/services/workflow_collaboration_service.py @@ -0,0 +1,185 @@ +from __future__ import annotations + +import logging +import time +from collections.abc import Mapping + +from models.account import Account +from repositories.workflow_collaboration_repository import WorkflowCollaborationRepository, WorkflowSessionInfo + + +class WorkflowCollaborationService: + def __init__(self, repository: WorkflowCollaborationRepository, socketio) -> None: + self._repository = repository + self._socketio = socketio + + def __repr__(self) -> str: + return f"{self.__class__.__name__}(repository={self._repository})" + + def save_session(self, sid: str, user: Account) -> None: + self._socketio.save_session( + sid, + { + "user_id": user.id, + "username": user.name, + "avatar": user.avatar, + }, + ) + + def register_session(self, workflow_id: str, sid: str) -> tuple[str, bool] | None: + session = self._socketio.get_session(sid) + user_id = session.get("user_id") + if not user_id: + return None + + session_info: WorkflowSessionInfo = { + "user_id": str(user_id), + "username": str(session.get("username", "Unknown")), + "avatar": session.get("avatar"), + "sid": sid, + "connected_at": int(time.time()), + } + + self._repository.set_session_info(workflow_id, session_info) + + leader_sid = self.get_or_set_leader(workflow_id, sid) + is_leader = leader_sid == sid + + self._socketio.enter_room(sid, workflow_id) + self.broadcast_online_users(workflow_id) + + self._socketio.emit("status", {"isLeader": is_leader}, room=sid) + + return str(user_id), is_leader + + def disconnect_session(self, sid: str) -> None: + mapping = self._repository.get_sid_mapping(sid) + if not mapping: + return + + workflow_id = mapping["workflow_id"] + self._repository.delete_session(workflow_id, sid) + + self.handle_leader_disconnect(workflow_id, sid) + self.broadcast_online_users(workflow_id) + + def relay_collaboration_event( + self, sid: str, data: Mapping[str, object] + ) -> tuple[dict[str, str], int]: + mapping = self._repository.get_sid_mapping(sid) + if not mapping: + return {"msg": "unauthorized"}, 401 + + workflow_id = mapping["workflow_id"] + user_id = mapping["user_id"] + self._repository.refresh_session_state(workflow_id, sid) + + event_type = data.get("type") + event_data = data.get("data") + timestamp = data.get("timestamp", int(time.time())) + + if not event_type: + return {"msg": "invalid event type"}, 400 + + self._socketio.emit( + "collaboration_update", + {"type": event_type, "userId": user_id, "data": event_data, "timestamp": timestamp}, + room=workflow_id, + skip_sid=sid, + ) + + return {"msg": "event_broadcasted"}, 200 + + def relay_graph_event(self, sid: str, data: object) -> tuple[dict[str, str], int]: + mapping = self._repository.get_sid_mapping(sid) + if not mapping: + return {"msg": "unauthorized"}, 401 + + workflow_id = mapping["workflow_id"] + self._repository.refresh_session_state(workflow_id, sid) + + self._socketio.emit("graph_update", data, room=workflow_id, skip_sid=sid) + + return {"msg": "graph_update_broadcasted"}, 200 + + def get_or_set_leader(self, workflow_id: str, sid: str) -> str: + current_leader = self._repository.get_current_leader(workflow_id) + + if current_leader: + if self.is_session_active(workflow_id, current_leader): + return current_leader + self._repository.delete_session(workflow_id, current_leader) + self._repository.delete_leader(workflow_id) + + was_set = self._repository.set_leader_if_absent(workflow_id, sid) + + if was_set: + if current_leader: + self.broadcast_leader_change(workflow_id, sid) + return sid + + current_leader = self._repository.get_current_leader(workflow_id) + if current_leader: + return current_leader + + return sid + + def handle_leader_disconnect(self, workflow_id: str, disconnected_sid: str) -> None: + current_leader = self._repository.get_current_leader(workflow_id) + if not current_leader: + return + + if current_leader != disconnected_sid: + return + + session_sids = self._repository.get_session_sids(workflow_id) + if session_sids: + new_leader_sid = session_sids[0] + self._repository.set_leader(workflow_id, new_leader_sid) + self.broadcast_leader_change(workflow_id, new_leader_sid) + else: + self._repository.delete_leader(workflow_id) + + def broadcast_leader_change(self, workflow_id: str, new_leader_sid: str) -> None: + for sid in self._repository.get_session_sids(workflow_id): + try: + is_leader = sid == new_leader_sid + self._socketio.emit("status", {"isLeader": is_leader}, room=sid) + except Exception: + logging.exception("Failed to emit leader status to session %s", sid) + + def get_current_leader(self, workflow_id: str) -> str | None: + return self._repository.get_current_leader(workflow_id) + + def broadcast_online_users(self, workflow_id: str) -> None: + users = self._repository.list_sessions(workflow_id) + users.sort(key=lambda x: x.get("connected_at") or 0) + + leader_sid = self.get_current_leader(workflow_id) + + self._socketio.emit( + "online_users", + {"workflow_id": workflow_id, "users": users, "leader": leader_sid}, + room=workflow_id, + ) + + def refresh_session_state(self, workflow_id: str, sid: str) -> None: + self._repository.refresh_session_state(workflow_id, sid) + + def is_session_active(self, workflow_id: str, sid: str) -> bool: + if not sid: + return False + + try: + if not self._socketio.manager.is_connected(sid, "/"): + return False + except AttributeError: + return False + + if not self._repository.session_exists(workflow_id, sid): + return False + + if not self._repository.sid_mapping_exists(sid): + return False + + return True