diff --git a/api/controllers/console/app/online_user.py b/api/controllers/console/app/online_user.py index b0b4dc5013..afb77e1ac6 100644 --- a/api/controllers/console/app/online_user.py +++ b/api/controllers/console/app/online_user.py @@ -2,6 +2,7 @@ import json from flask import request from flask_restful import Resource, marshal_with, reqparse +from flask_socketio import join_room from controllers.console import api from controllers.console.wraps import account_initialization_required, setup_required @@ -36,8 +37,11 @@ def handle_user_connect(data): } redis_client.hset(f"workflow_online_users:{workflow_id}", current_user.id, json.dumps(user_info)) - redis_client.set(f"ws_sid_map:{sid}", json.dumps({"workflow_id": workflow_id, "user_id": current_user.id})) + + join_room(workflow_id) + broadcast_online_users(workflow_id) + return {"msg": "connected", "user_id": current_user.id, "sid": sid} @@ -55,6 +59,25 @@ def handle_disconnect(): redis_client.hdel(f"workflow_online_users:{workflow_id}", user_id) redis_client.delete(f"ws_sid_map:{sid}") + broadcast_online_users(workflow_id) + +def broadcast_online_users(workflow_id): + """ + broadcast online users to the workflow room + """ + users_json = redis_client.hgetall(f"workflow_online_users:{workflow_id}") + users = [] + for _, user_info_json in users_json.items(): + try: + users.append(json.loads(user_info_json)) + except Exception: + continue + ext_socketio.emit( + "online_users", + {"workflow_id": workflow_id, "users": users}, + room=workflow_id + ) + class OnlineUserApi(Resource): @setup_required diff --git a/web/app/components/workflow-app/components/workflow-main.tsx b/web/app/components/workflow-app/components/workflow-main.tsx index 162062f293..30ec41ef35 100644 --- a/web/app/components/workflow-app/components/workflow-main.tsx +++ b/web/app/components/workflow-app/components/workflow-main.tsx @@ -1,5 +1,6 @@ import { useCallback, + useEffect, useMemo, } from 'react' import { useFeaturesStore } from '@/app/components/base/features/hooks' @@ -17,6 +18,8 @@ import { } from '../hooks' import { useStore, useWorkflowStore } from '@/app/components/workflow/store' import { useCollaborativeCursors } from '../hooks' +import { connectOnlineUserWebSocket } from '@/service/demo/online-user' +import type { OnlineUser } from '@/service/demo/online-user' type WorkflowMainProps = Pick const WorkflowMain = ({ @@ -26,6 +29,7 @@ const WorkflowMain = ({ }: WorkflowMainProps) => { const featuresStore = useFeaturesStore() const workflowStore = useWorkflowStore() + const appId = useStore(s => s.appId) const handleWorkflowDataUpdate = useCallback((payload: any) => { const { @@ -65,10 +69,36 @@ const WorkflowMain = ({ handleWorkflowStartRunInChatflow, handleWorkflowStartRunInWorkflow, } = useWorkflowStartRun() - const appId = useStore(s => s.appId) const { cursors, myUserId } = useCollaborativeCursors(appId) + // Add online users logging + useEffect(() => { + if (!appId) return + + // Connect to WebSocket for online users + const socket = connectOnlineUserWebSocket(appId) + + // Handle online users update + const handleOnlineUsersUpdate = (data: { users: OnlineUser[] }) => { + data.users.forEach((user) => { + console.log(`👤 User: ${user.username} (ID: ${user.user_id})`) + }) + } + + // Add event listeners + socket.on('online_users', handleOnlineUsersUpdate) + + // Log initial connection + console.log('🔌 Connecting to online users WebSocket for app:', appId) + + // Cleanup function + return () => { + console.log(' Cleaning up online users WebSocket listeners') + socket.off('online_users', handleOnlineUsersUpdate) + } + }, [appId]) + const { fetchInspectVars } = useSetWorkflowVarsWithValue({ flowId: appId, ...useConfigsMap(), diff --git a/web/service/demo/online-user.ts b/web/service/demo/online-user.ts index 889a78a9ef..50ff7232f5 100644 --- a/web/service/demo/online-user.ts +++ b/web/service/demo/online-user.ts @@ -1,15 +1,11 @@ 'use client' +import { get } from '../base' import type { Socket } from 'socket.io-client' import { io } from 'socket.io-client' let socket: Socket | null = null let lastAppId: string | null = null -/** - * Connect to the online user websocket server. - * @param appId The app id to join a specific room or namespace. - * @returns The socket instance. - */ export function connectOnlineUserWebSocket(appId: string): Socket { if (socket && lastAppId === appId) return socket @@ -58,3 +54,29 @@ export function disconnectOnlineUserWebSocket() { socket = null } } + +export const getOnlineUsersSocket = (): Socket | null => { + return socket +} + +export type OnlineUser = { + user_id: string + username: string + avatar: string + sid: string +} + +export type WorkflowOnlineUsers = { + workflow_id: string + users: OnlineUser[] +} + +export type OnlineUserListResponse = { + data: WorkflowOnlineUsers[] +} + +export const fetchAppsOnlineUsers = (appIds: string[]) => { + return get('/online-users', { + params: { app_ids: appIds.join(',') }, + }) +}