refactor workflow collaboration service

hjlarry 2026-01-19 19:56:18 +08:00
parent 805bb7c468
commit 41473ff450
7 changed files with 439 additions and 373 deletions

View File

@@ -58,7 +58,6 @@ from .app import (
mcp_server,
message,
model_config,
online_user,
ops_trace,
site,
statistic,
@@ -114,6 +113,7 @@ from .explore import (
recommended_app,
saved_message,
)
from .socketio import workflow as socketio_workflow
# Import tag controllers
from .tag import tags
@@ -183,7 +183,6 @@ __all__ = [
"models",
"oauth",
"oauth_server",
"online_user",
"ops_trace",
"parameter",
"ping",

View File

@@ -1,370 +0,0 @@
import json
import logging
import time
from collections.abc import Callable
from typing import Any, cast
from flask import Request as FlaskRequest
from extensions.ext_redis import redis_client
from extensions.ext_socketio import sio
from libs.passport import PassportService
from libs.token import extract_access_token
from services.account_service import AccountService
SESSION_STATE_TTL_SECONDS = 3600
WORKFLOW_ONLINE_USERS_PREFIX = "workflow_online_users:"
WORKFLOW_LEADER_PREFIX = "workflow_leader:"
WS_SID_MAP_PREFIX = "ws_sid_map:"
def _workflow_key(workflow_id: str) -> str:
return f"{WORKFLOW_ONLINE_USERS_PREFIX}{workflow_id}"
def _leader_key(workflow_id: str) -> str:
return f"{WORKFLOW_LEADER_PREFIX}{workflow_id}"
def _sid_key(sid: str) -> str:
return f"{WS_SID_MAP_PREFIX}{sid}"
def _refresh_session_state(workflow_id: str, sid: str) -> None:
"""
Refresh TTLs for the workflow and session keys so active sessions stay alive while stale state left behind by crashes eventually expires.
"""
workflow_key = _workflow_key(workflow_id)
sid_key = _sid_key(sid)
if redis_client.exists(workflow_key):
redis_client.expire(workflow_key, SESSION_STATE_TTL_SECONDS)
if redis_client.exists(sid_key):
redis_client.expire(sid_key, SESSION_STATE_TTL_SECONDS)
def _sio_on(event: str) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
return cast(Callable[[Callable[..., Any]], Callable[..., Any]], sio.on(event))
@_sio_on("connect")
def socket_connect(sid, environ, auth):
"""
Handle the WebSocket connect event and authenticate the client.
"""
token = None
if auth and isinstance(auth, dict):
token = auth.get("token")
if not token:
try:
request_environ = FlaskRequest(environ)
token = extract_access_token(request_environ)
except Exception:
logging.exception("Failed to extract token")
token = None
if not token:
return False
try:
decoded = PassportService().verify(token)
user_id = decoded.get("user_id")
if not user_id:
return False
with sio.app.app_context():
user = AccountService.load_logged_in_account(account_id=user_id)
if not user:
return False
sio.save_session(sid, {"user_id": user.id, "username": user.name, "avatar": user.avatar})
return True
except Exception:
logging.exception("Socket authentication failed")
return False
@_sio_on("user_connect")
def handle_user_connect(sid, data):
"""
Handle user connect event. Each session (tab) is treated as an independent collaborator.
"""
workflow_id = data.get("workflow_id")
if not workflow_id:
return {"msg": "workflow_id is required"}, 400
session = sio.get_session(sid)
user_id = session.get("user_id")
if not user_id:
return {"msg": "unauthorized"}, 401
# Each session is stored independently with sid as key
session_info = {
"user_id": user_id,
"username": session.get("username", "Unknown"),
"avatar": session.get("avatar", None),
"sid": sid,
"connected_at": int(time.time()), # Add timestamp to differentiate tabs
}
workflow_key = _workflow_key(workflow_id)
# Store session info with sid as key
redis_client.hset(workflow_key, sid, json.dumps(session_info))
redis_client.set(
_sid_key(sid),
json.dumps({"workflow_id": workflow_id, "user_id": user_id}),
ex=SESSION_STATE_TTL_SECONDS,
)
_refresh_session_state(workflow_id, sid)
# Leader election: first session becomes the leader
leader_sid = get_or_set_leader(workflow_id, sid)
is_leader = leader_sid == sid
sio.enter_room(sid, workflow_id)
broadcast_online_users(workflow_id)
# Notify this session of their leader status
sio.emit("status", {"isLeader": is_leader}, room=sid)
return {"msg": "connected", "user_id": user_id, "sid": sid, "isLeader": is_leader}
@_sio_on("disconnect")
def handle_disconnect(sid):
"""
Handle session disconnect event. Remove the specific session from online users.
"""
mapping = redis_client.get(_sid_key(sid))
if mapping:
data = json.loads(mapping)
workflow_id = data["workflow_id"]
# Remove this specific session
redis_client.hdel(_workflow_key(workflow_id), sid)
redis_client.delete(_sid_key(sid))
# Handle leader re-election if the leader session disconnected
handle_leader_disconnect(workflow_id, sid)
broadcast_online_users(workflow_id)
def _clear_session_state(workflow_id: str, sid: str) -> None:
redis_client.hdel(_workflow_key(workflow_id), sid)
redis_client.delete(_sid_key(sid))
def _is_session_active(workflow_id: str, sid: str) -> bool:
if not sid:
return False
try:
if not sio.manager.is_connected(sid, "/"):
return False
except AttributeError:
return False
if not redis_client.hexists(_workflow_key(workflow_id), sid):
return False
if not redis_client.exists(_sid_key(sid)):
return False
return True
def get_or_set_leader(workflow_id: str, sid: str) -> str:
"""
Get current leader session or set this session as leader if no valid leader exists.
Returns the leader session id (sid).
"""
leader_key = _leader_key(workflow_id)
# 1. Check existing leader
raw_leader = redis_client.get(leader_key)
current_leader = raw_leader.decode("utf-8") if isinstance(raw_leader, bytes) else raw_leader
# 2. If leader exists, verify aliveness
if current_leader:
if _is_session_active(workflow_id, current_leader):
return current_leader
else:
# Leader dead, clear it
_clear_session_state(workflow_id, current_leader)
redis_client.delete(leader_key)
# Fall through to elect new leader
# 3. Try to become leader using atomic SETNX
# nx=True ensures we only set if key doesn't exist
was_set = redis_client.set(leader_key, sid, nx=True, ex=SESSION_STATE_TTL_SECONDS)
if was_set:
# We won the election!
# If we replaced a dead leader, notify everyone
if current_leader:
broadcast_leader_change(workflow_id, sid)
return sid
# 4. If SETNX failed, someone else became leader concurrently
# Fetch again
raw_leader = redis_client.get(leader_key)
current_leader = raw_leader.decode("utf-8") if isinstance(raw_leader, bytes) else raw_leader
if current_leader:
return current_leader
# 5. Fallback (extremely rare race where leader expired/deleted right after SETNX failed)
# Just return self (or recurse)
return sid
def handle_leader_disconnect(workflow_id, disconnected_sid):
"""
Handle leader re-election when a session disconnects.
If the disconnected session was the leader, elect a new leader from remaining sessions.
"""
current_leader = redis_client.get(_leader_key(workflow_id))
if current_leader:
current_leader = current_leader.decode("utf-8") if isinstance(current_leader, bytes) else current_leader
if current_leader == disconnected_sid:
# Leader session disconnected, elect a new leader
sessions_json = redis_client.hgetall(_workflow_key(workflow_id))
if sessions_json:
# Get the first remaining session as new leader
new_leader_sid = list(sessions_json.keys())[0]
if isinstance(new_leader_sid, bytes):
new_leader_sid = new_leader_sid.decode("utf-8")
redis_client.set(_leader_key(workflow_id), new_leader_sid, ex=SESSION_STATE_TTL_SECONDS)
# Notify all sessions about the new leader
broadcast_leader_change(workflow_id, new_leader_sid)
else:
# No sessions left, remove leader
redis_client.delete(_leader_key(workflow_id))
def broadcast_leader_change(workflow_id, new_leader_sid):
"""
Broadcast leader change to all sessions in the workflow.
"""
sessions_json = redis_client.hgetall(_workflow_key(workflow_id))
for sid in sessions_json:
try:
sid_str = sid.decode("utf-8") if isinstance(sid, bytes) else sid
is_leader = sid_str == new_leader_sid
# Emit to each session whether they are the new leader
sio.emit("status", {"isLeader": is_leader}, room=sid_str)
except Exception:
logging.exception("Failed to emit leader status to session %s", sid)
continue
def get_current_leader(workflow_id):
"""
Get the current leader for a workflow.
"""
leader = redis_client.get(_leader_key(workflow_id))
return leader.decode("utf-8") if leader and isinstance(leader, bytes) else leader
def broadcast_online_users(workflow_id):
"""
Broadcast online users to the workflow room.
Each session is shown as a separate user (even if same person has multiple tabs).
"""
sessions_json = redis_client.hgetall(_workflow_key(workflow_id))
users = []
for session_info_json in sessions_json.values():
try:
session_info = json.loads(session_info_json)
# Each session appears as a separate "user" in the UI
users.append(
{
"user_id": session_info["user_id"],
"username": session_info["username"],
"avatar": session_info.get("avatar"),
"sid": session_info["sid"],
"connected_at": session_info.get("connected_at"),
}
)
except Exception:
continue
# Sort by connection time to maintain consistent order
users.sort(key=lambda x: x.get("connected_at") or 0)
# Get current leader session
leader_sid = get_current_leader(workflow_id)
sio.emit("online_users", {"workflow_id": workflow_id, "users": users, "leader": leader_sid}, room=workflow_id)
@_sio_on("collaboration_event")
def handle_collaboration_event(sid, data):
"""
Handle general collaboration events, including:
1. mouse_move
2. vars_and_features_update
3. sync_request (ask leader to update graph)
4. app_state_update
5. mcp_server_update
6. workflow_update
7. comments_update
8. node_panel_presence
"""
mapping = redis_client.get(_sid_key(sid))
if not mapping:
return {"msg": "unauthorized"}, 401
mapping_data = json.loads(mapping)
workflow_id = mapping_data["workflow_id"]
user_id = mapping_data["user_id"]
_refresh_session_state(workflow_id, sid)
event_type = data.get("type")
event_data = data.get("data")
timestamp = data.get("timestamp", int(time.time()))
if not event_type:
return {"msg": "invalid event type"}, 400
sio.emit(
"collaboration_update",
{"type": event_type, "userId": user_id, "data": event_data, "timestamp": timestamp},
room=workflow_id,
skip_sid=sid,
)
return {"msg": "event_broadcasted"}
@_sio_on("graph_event")
def handle_graph_event(sid, data):
"""
Handle graph events - simple broadcast relay.
"""
mapping = redis_client.get(_sid_key(sid))
if not mapping:
return {"msg": "unauthorized"}, 401
mapping_data = json.loads(mapping)
workflow_id = mapping_data["workflow_id"]
_refresh_session_state(workflow_id, sid)
sio.emit("graph_update", data, room=workflow_id, skip_sid=sid)
return {"msg": "graph_update_broadcasted"}

View File

@@ -12,7 +12,6 @@ from werkzeug.exceptions import Forbidden, InternalServerError, NotFound
import services
from controllers.console import console_ns
from controllers.console.app.error import ConversationCompletedError, DraftWorkflowNotExist, DraftWorkflowNotSync
from controllers.console.app.online_user import WORKFLOW_ONLINE_USERS_PREFIX
from controllers.console.app.wraps import get_app_model
from controllers.console.wraps import account_initialization_required, edit_permission_required, setup_required
from controllers.web.error import InvokeRateLimitError as InvokeRateLimitHttpError
@@ -46,6 +45,7 @@ from libs.login import current_account_with_tenant, login_required
from models import App
from models.model import AppMode
from models.workflow import Workflow
from repositories.workflow_collaboration_repository import WORKFLOW_ONLINE_USERS_PREFIX
from services.app_generate_service import AppGenerateService
from services.errors.app import WorkflowHashNotEqualError
from services.errors.llm import InvokeRateLimitError

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,107 @@
import logging
from collections.abc import Callable
from typing import cast
from flask import Request as FlaskRequest
from extensions.ext_socketio import sio
from libs.passport import PassportService
from libs.token import extract_access_token
from repositories.workflow_collaboration_repository import WorkflowCollaborationRepository
from services.account_service import AccountService
from services.workflow_collaboration_service import WorkflowCollaborationService
repository = WorkflowCollaborationRepository()
collaboration_service = WorkflowCollaborationService(repository, sio)
def _sio_on(event: str) -> Callable[[Callable[..., object]], Callable[..., object]]:
return cast(Callable[[Callable[..., object]], Callable[..., object]], sio.on(event))
@_sio_on("connect")
def socket_connect(sid, environ, auth):
"""
Handle the WebSocket connect event and authenticate the client.
"""
token = None
if auth and isinstance(auth, dict):
token = auth.get("token")
if not token:
try:
request_environ = FlaskRequest(environ)
token = extract_access_token(request_environ)
except Exception:
logging.exception("Failed to extract token")
token = None
if not token:
return False
try:
decoded = PassportService().verify(token)
user_id = decoded.get("user_id")
if not user_id:
return False
with sio.app.app_context():
user = AccountService.load_logged_in_account(account_id=user_id)
if not user:
return False
collaboration_service.save_session(sid, user)
return True
except Exception:
logging.exception("Socket authentication failed")
return False
@_sio_on("user_connect")
def handle_user_connect(sid, data):
"""
Handle user connect event. Each session (tab) is treated as an independent collaborator.
"""
workflow_id = data.get("workflow_id")
if not workflow_id:
return {"msg": "workflow_id is required"}, 400
result = collaboration_service.register_session(workflow_id, sid)
if not result:
return {"msg": "unauthorized"}, 401
user_id, is_leader = result
return {"msg": "connected", "user_id": user_id, "sid": sid, "isLeader": is_leader}
@_sio_on("disconnect")
def handle_disconnect(sid):
"""
Handle session disconnect event. Remove the specific session from online users.
"""
collaboration_service.disconnect_session(sid)
@_sio_on("collaboration_event")
def handle_collaboration_event(sid, data):
"""
Handle general collaboration events, including:
1. mouse_move
2. vars_and_features_update
3. sync_request (ask leader to update graph)
4. app_state_update
5. mcp_server_update
6. workflow_update
7. comments_update
8. node_panel_presence
"""
return collaboration_service.relay_collaboration_event(sid, data)
@_sio_on("graph_event")
def handle_graph_event(sid, data):
"""
Handle graph events - simple broadcast relay.
"""
return collaboration_service.relay_graph_event(sid, data)
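
For orientation, here is a minimal client-side sketch of the protocol these handlers expose. It is a sketch only: it assumes the standard python-socketio client package, a hypothetical server URL, and placeholder token/workflow ids; the event names come from the handlers above.

# Minimal collaborating client sketch (python-socketio; URL and ids are placeholders).
import socketio

client = socketio.Client()

@client.on("status")
def on_status(data):
    # The server tells each session whether it is the current leader.
    print("leader:", data["isLeader"])

@client.on("online_users")
def on_online_users(data):
    # Broadcast whenever a session joins or leaves the workflow room.
    print("online:", [u["username"] for u in data["users"]])

@client.on("collaboration_update")
def on_collaboration_update(data):
    # Events relayed from other sessions (mouse_move, workflow_update, ...).
    print("update from", data["userId"], data["type"])

# Authenticate with an access token, then join a workflow room.
client.connect("http://localhost:5001", auth={"token": "<access-token>"})
ack = client.call("user_connect", {"workflow_id": "<workflow-id>"})
print(ack)  # e.g. {"msg": "connected", "user_id": ..., "sid": ..., "isLeader": ...}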

View File

@@ -0,0 +1,144 @@
from __future__ import annotations
import json
from typing import TypedDict
from extensions.ext_redis import redis_client
SESSION_STATE_TTL_SECONDS = 3600
WORKFLOW_ONLINE_USERS_PREFIX = "workflow_online_users:"
WORKFLOW_LEADER_PREFIX = "workflow_leader:"
WS_SID_MAP_PREFIX = "ws_sid_map:"
class WorkflowSessionInfo(TypedDict):
user_id: str
username: str
avatar: str | None
sid: str
connected_at: int
class SidMapping(TypedDict):
workflow_id: str
user_id: str
class WorkflowCollaborationRepository:
def __init__(self) -> None:
self._redis = redis_client
def __repr__(self) -> str:
return f"{self.__class__.__name__}(redis_client={self._redis})"
@staticmethod
def workflow_key(workflow_id: str) -> str:
return f"{WORKFLOW_ONLINE_USERS_PREFIX}{workflow_id}"
@staticmethod
def leader_key(workflow_id: str) -> str:
return f"{WORKFLOW_LEADER_PREFIX}{workflow_id}"
@staticmethod
def sid_key(sid: str) -> str:
return f"{WS_SID_MAP_PREFIX}{sid}"
@staticmethod
def _decode(value: str | bytes | None) -> str | None:
if value is None:
return None
if isinstance(value, bytes):
return value.decode("utf-8")
return value
def refresh_session_state(self, workflow_id: str, sid: str) -> None:
workflow_key = self.workflow_key(workflow_id)
sid_key = self.sid_key(sid)
if self._redis.exists(workflow_key):
self._redis.expire(workflow_key, SESSION_STATE_TTL_SECONDS)
if self._redis.exists(sid_key):
self._redis.expire(sid_key, SESSION_STATE_TTL_SECONDS)
def set_session_info(self, workflow_id: str, session_info: WorkflowSessionInfo) -> None:
workflow_key = self.workflow_key(workflow_id)
self._redis.hset(workflow_key, session_info["sid"], json.dumps(session_info))
self._redis.set(
self.sid_key(session_info["sid"]),
json.dumps({"workflow_id": workflow_id, "user_id": session_info["user_id"]}),
ex=SESSION_STATE_TTL_SECONDS,
)
self.refresh_session_state(workflow_id, session_info["sid"])
def get_sid_mapping(self, sid: str) -> SidMapping | None:
raw = self._redis.get(self.sid_key(sid))
if not raw:
return None
value = self._decode(raw)
if not value:
return None
try:
return json.loads(value)
except (TypeError, json.JSONDecodeError):
return None
def delete_session(self, workflow_id: str, sid: str) -> None:
self._redis.hdel(self.workflow_key(workflow_id), sid)
self._redis.delete(self.sid_key(sid))
def session_exists(self, workflow_id: str, sid: str) -> bool:
return bool(self._redis.hexists(self.workflow_key(workflow_id), sid))
def sid_mapping_exists(self, sid: str) -> bool:
return bool(self._redis.exists(self.sid_key(sid)))
def get_session_sids(self, workflow_id: str) -> list[str]:
raw_sids = self._redis.hkeys(self.workflow_key(workflow_id))
return [self._decode(sid) for sid in raw_sids if self._decode(sid)]
def list_sessions(self, workflow_id: str) -> list[WorkflowSessionInfo]:
sessions_json = self._redis.hgetall(self.workflow_key(workflow_id))
users: list[WorkflowSessionInfo] = []
for session_info_json in sessions_json.values():
value = self._decode(session_info_json)
if not value:
continue
try:
session_info = json.loads(value)
except (TypeError, json.JSONDecodeError):
continue
if not isinstance(session_info, dict):
continue
if "user_id" not in session_info or "username" not in session_info or "sid" not in session_info:
continue
users.append(
{
"user_id": str(session_info["user_id"]),
"username": str(session_info["username"]),
"avatar": session_info.get("avatar"),
"sid": str(session_info["sid"]),
"connected_at": int(session_info.get("connected_at") or 0),
}
)
return users
def get_current_leader(self, workflow_id: str) -> str | None:
raw = self._redis.get(self.leader_key(workflow_id))
return self._decode(raw)
def set_leader_if_absent(self, workflow_id: str, sid: str) -> bool:
return bool(
self._redis.set(self.leader_key(workflow_id), sid, nx=True, ex=SESSION_STATE_TTL_SECONDS)
)
def set_leader(self, workflow_id: str, sid: str) -> None:
self._redis.set(self.leader_key(workflow_id), sid, ex=SESSION_STATE_TTL_SECONDS)
def delete_leader(self, workflow_id: str) -> None:
self._redis.delete(self.leader_key(workflow_id))
def expire_leader(self, workflow_id: str) -> None:
self._redis.expire(self.leader_key(workflow_id), SESSION_STATE_TTL_SECONDS)
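
For reference, a sketch of the Redis state the repository maintains for a single session; the workflow, user, and session ids below are placeholders.

# Sketch of the keys written for one session (ids are placeholders).
repo = WorkflowCollaborationRepository()
repo.set_session_info(
    "wf-123",
    {
        "user_id": "user-1",
        "username": "Alice",
        "avatar": None,
        "sid": "sid-abc",
        "connected_at": 1700000000,
    },
)
# Resulting keys, each with a 3600-second TTL:
#   workflow_online_users:wf-123   hash    sid-abc -> session info JSON
#   ws_sid_map:sid-abc             string  {"workflow_id": "wf-123", "user_id": "user-1"}
#   workflow_leader:wf-123         string  leader sid, written separately via set_leader_if_absent / set_leader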

View File

@@ -0,0 +1,185 @@
from __future__ import annotations
import logging
import time
from collections.abc import Mapping
from models.account import Account
from repositories.workflow_collaboration_repository import WorkflowCollaborationRepository, WorkflowSessionInfo
class WorkflowCollaborationService:
def __init__(self, repository: WorkflowCollaborationRepository, socketio) -> None:
self._repository = repository
self._socketio = socketio
def __repr__(self) -> str:
return f"{self.__class__.__name__}(repository={self._repository})"
def save_session(self, sid: str, user: Account) -> None:
self._socketio.save_session(
sid,
{
"user_id": user.id,
"username": user.name,
"avatar": user.avatar,
},
)
def register_session(self, workflow_id: str, sid: str) -> tuple[str, bool] | None:
session = self._socketio.get_session(sid)
user_id = session.get("user_id")
if not user_id:
return None
session_info: WorkflowSessionInfo = {
"user_id": str(user_id),
"username": str(session.get("username", "Unknown")),
"avatar": session.get("avatar"),
"sid": sid,
"connected_at": int(time.time()),
}
self._repository.set_session_info(workflow_id, session_info)
leader_sid = self.get_or_set_leader(workflow_id, sid)
is_leader = leader_sid == sid
self._socketio.enter_room(sid, workflow_id)
self.broadcast_online_users(workflow_id)
self._socketio.emit("status", {"isLeader": is_leader}, room=sid)
return str(user_id), is_leader
def disconnect_session(self, sid: str) -> None:
mapping = self._repository.get_sid_mapping(sid)
if not mapping:
return
workflow_id = mapping["workflow_id"]
self._repository.delete_session(workflow_id, sid)
self.handle_leader_disconnect(workflow_id, sid)
self.broadcast_online_users(workflow_id)
def relay_collaboration_event(
self, sid: str, data: Mapping[str, object]
) -> tuple[dict[str, str], int]:
mapping = self._repository.get_sid_mapping(sid)
if not mapping:
return {"msg": "unauthorized"}, 401
workflow_id = mapping["workflow_id"]
user_id = mapping["user_id"]
self._repository.refresh_session_state(workflow_id, sid)
event_type = data.get("type")
event_data = data.get("data")
timestamp = data.get("timestamp", int(time.time()))
if not event_type:
return {"msg": "invalid event type"}, 400
self._socketio.emit(
"collaboration_update",
{"type": event_type, "userId": user_id, "data": event_data, "timestamp": timestamp},
room=workflow_id,
skip_sid=sid,
)
return {"msg": "event_broadcasted"}, 200
def relay_graph_event(self, sid: str, data: object) -> tuple[dict[str, str], int]:
mapping = self._repository.get_sid_mapping(sid)
if not mapping:
return {"msg": "unauthorized"}, 401
workflow_id = mapping["workflow_id"]
self._repository.refresh_session_state(workflow_id, sid)
self._socketio.emit("graph_update", data, room=workflow_id, skip_sid=sid)
return {"msg": "graph_update_broadcasted"}, 200
def get_or_set_leader(self, workflow_id: str, sid: str) -> str:
current_leader = self._repository.get_current_leader(workflow_id)
if current_leader:
if self.is_session_active(workflow_id, current_leader):
return current_leader
self._repository.delete_session(workflow_id, current_leader)
self._repository.delete_leader(workflow_id)
was_set = self._repository.set_leader_if_absent(workflow_id, sid)
if was_set:
if current_leader:
self.broadcast_leader_change(workflow_id, sid)
return sid
current_leader = self._repository.get_current_leader(workflow_id)
if current_leader:
return current_leader
return sid
def handle_leader_disconnect(self, workflow_id: str, disconnected_sid: str) -> None:
current_leader = self._repository.get_current_leader(workflow_id)
if not current_leader:
return
if current_leader != disconnected_sid:
return
session_sids = self._repository.get_session_sids(workflow_id)
if session_sids:
new_leader_sid = session_sids[0]
self._repository.set_leader(workflow_id, new_leader_sid)
self.broadcast_leader_change(workflow_id, new_leader_sid)
else:
self._repository.delete_leader(workflow_id)
def broadcast_leader_change(self, workflow_id: str, new_leader_sid: str) -> None:
for sid in self._repository.get_session_sids(workflow_id):
try:
is_leader = sid == new_leader_sid
self._socketio.emit("status", {"isLeader": is_leader}, room=sid)
except Exception:
logging.exception("Failed to emit leader status to session %s", sid)
def get_current_leader(self, workflow_id: str) -> str | None:
return self._repository.get_current_leader(workflow_id)
def broadcast_online_users(self, workflow_id: str) -> None:
users = self._repository.list_sessions(workflow_id)
users.sort(key=lambda x: x.get("connected_at") or 0)
leader_sid = self.get_current_leader(workflow_id)
self._socketio.emit(
"online_users",
{"workflow_id": workflow_id, "users": users, "leader": leader_sid},
room=workflow_id,
)
def refresh_session_state(self, workflow_id: str, sid: str) -> None:
self._repository.refresh_session_state(workflow_id, sid)
def is_session_active(self, workflow_id: str, sid: str) -> bool:
if not sid:
return False
try:
if not self._socketio.manager.is_connected(sid, "/"):
return False
except AttributeError:
return False
if not self._repository.session_exists(workflow_id, sid):
return False
if not self._repository.sid_mapping_exists(sid):
return False
return True
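
Because the service takes the repository and the Socket.IO server as constructor arguments, it can be exercised in isolation with stubs. A rough sketch with unittest.mock, assuming the module paths introduced in this commit are importable:

# Rough test sketch with a stubbed repository and Socket.IO server.
from unittest.mock import MagicMock

from services.workflow_collaboration_service import WorkflowCollaborationService

repository = MagicMock()
repository.get_current_leader.return_value = None   # no leader yet
repository.set_leader_if_absent.return_value = True # this session wins the election
repository.list_sessions.return_value = []

socketio = MagicMock()
socketio.get_session.return_value = {"user_id": "user-1", "username": "Alice", "avatar": None}

service = WorkflowCollaborationService(repository, socketio)
user_id, is_leader = service.register_session("wf-123", "sid-abc")

assert user_id == "user-1"
assert is_leader is True  # first session becomes the leader
socketio.enter_room.assert_called_once_with("sid-abc", "wf-123")
socketio.emit.assert_any_call("status", {"isLeader": True}, room="sid-abc")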