support broadcast online users

This commit is contained in:
hjlarry 2025-07-18 15:02:34 +08:00
parent 2f35cc9188
commit 74b4719af8
3 changed files with 82 additions and 7 deletions

View File

@ -2,6 +2,7 @@ import json
from flask import request
from flask_restful import Resource, marshal_with, reqparse
from flask_socketio import join_room
from controllers.console import api
from controllers.console.wraps import account_initialization_required, setup_required
@ -36,8 +37,11 @@ def handle_user_connect(data):
}
redis_client.hset(f"workflow_online_users:{workflow_id}", current_user.id, json.dumps(user_info))
redis_client.set(f"ws_sid_map:{sid}", json.dumps({"workflow_id": workflow_id, "user_id": current_user.id}))
join_room(workflow_id)
broadcast_online_users(workflow_id)
return {"msg": "connected", "user_id": current_user.id, "sid": sid}
@ -55,6 +59,25 @@ def handle_disconnect():
redis_client.hdel(f"workflow_online_users:{workflow_id}", user_id)
redis_client.delete(f"ws_sid_map:{sid}")
broadcast_online_users(workflow_id)
def broadcast_online_users(workflow_id):
"""
broadcast online users to the workflow room
"""
users_json = redis_client.hgetall(f"workflow_online_users:{workflow_id}")
users = []
for _, user_info_json in users_json.items():
try:
users.append(json.loads(user_info_json))
except Exception:
continue
ext_socketio.emit(
"online_users",
{"workflow_id": workflow_id, "users": users},
room=workflow_id
)
class OnlineUserApi(Resource):
@setup_required

View File

@ -1,5 +1,6 @@
import {
useCallback,
useEffect,
useMemo,
} from 'react'
import { useFeaturesStore } from '@/app/components/base/features/hooks'
@ -17,6 +18,8 @@ import {
} from '../hooks'
import { useStore, useWorkflowStore } from '@/app/components/workflow/store'
import { useCollaborativeCursors } from '../hooks'
import { connectOnlineUserWebSocket } from '@/service/demo/online-user'
import type { OnlineUser } from '@/service/demo/online-user'
type WorkflowMainProps = Pick<WorkflowProps, 'nodes' | 'edges' | 'viewport'>
const WorkflowMain = ({
@ -26,6 +29,7 @@ const WorkflowMain = ({
}: WorkflowMainProps) => {
const featuresStore = useFeaturesStore()
const workflowStore = useWorkflowStore()
const appId = useStore(s => s.appId)
const handleWorkflowDataUpdate = useCallback((payload: any) => {
const {
@ -65,10 +69,36 @@ const WorkflowMain = ({
handleWorkflowStartRunInChatflow,
handleWorkflowStartRunInWorkflow,
} = useWorkflowStartRun()
const appId = useStore(s => s.appId)
const { cursors, myUserId } = useCollaborativeCursors(appId)
// Add online users logging
useEffect(() => {
if (!appId) return
// Connect to WebSocket for online users
const socket = connectOnlineUserWebSocket(appId)
// Handle online users update
const handleOnlineUsersUpdate = (data: { users: OnlineUser[] }) => {
data.users.forEach((user) => {
console.log(`👤 User: ${user.username} (ID: ${user.user_id})`)
})
}
// Add event listeners
socket.on('online_users', handleOnlineUsersUpdate)
// Log initial connection
console.log('🔌 Connecting to online users WebSocket for app:', appId)
// Cleanup function
return () => {
console.log(' Cleaning up online users WebSocket listeners')
socket.off('online_users', handleOnlineUsersUpdate)
}
}, [appId])
const { fetchInspectVars } = useSetWorkflowVarsWithValue({
flowId: appId,
...useConfigsMap(),

View File

@ -1,15 +1,11 @@
'use client'
import { get } from '../base'
import type { Socket } from 'socket.io-client'
import { io } from 'socket.io-client'
let socket: Socket | null = null
let lastAppId: string | null = null
/**
* Connect to the online user websocket server.
* @param appId The app id to join a specific room or namespace.
* @returns The socket instance.
*/
export function connectOnlineUserWebSocket(appId: string): Socket {
if (socket && lastAppId === appId)
return socket
@ -58,3 +54,29 @@ export function disconnectOnlineUserWebSocket() {
socket = null
}
}
export const getOnlineUsersSocket = (): Socket | null => {
return socket
}
export type OnlineUser = {
user_id: string
username: string
avatar: string
sid: string
}
export type WorkflowOnlineUsers = {
workflow_id: string
users: OnlineUser[]
}
export type OnlineUserListResponse = {
data: WorkflowOnlineUsers[]
}
export const fetchAppsOnlineUsers = (appIds: string[]) => {
return get<OnlineUserListResponse>('/online-users', {
params: { app_ids: appIds.join(',') },
})
}