establish websocket connection

This commit is contained in:
hjlarry 2025-07-17 15:36:50 +08:00
parent a4f421028c
commit 0d7d27ec0b
11 changed files with 2420 additions and 2122 deletions

View File

@ -1,5 +1,6 @@
import os
import sys
import logging
def is_db_command():
@ -33,9 +34,10 @@ else:
psycogreen.gevent.patch_psycopg()
from app_factory import create_app
from extensions.ext_socketio import ext_socketio
app = create_app()
celery = app.extensions["celery"]
if __name__ == "__main__":
app.run(host="0.0.0.0", port=5001)
ext_socketio.run(app, host="0.0.0.0", port=5001, debug=True)

View File

@ -57,6 +57,7 @@ def initialize_extensions(app: DifyApp):
ext_request_logging,
ext_sentry,
ext_set_secretkey,
ext_socketio,
ext_storage,
ext_timezone,
ext_warnings,
@ -85,6 +86,7 @@ def initialize_extensions(app: DifyApp):
ext_commands,
ext_otel,
ext_request_logging,
ext_socketio,
]
for ext in extensions:
short_name = ext.__name__.split(".")[-1]

View File

@ -0,0 +1,60 @@
import json
from flask_login import current_user, login_required
from flask import request
from extensions.ext_socketio import ext_socketio
from extensions.ext_redis import redis_client
@ext_socketio.on('user_connect')
@login_required
def handle_user_connect(data):
"""
Handle user connect event, check login and get user info.
"""
sid = request.sid
workflow_id = data.get('workflow_id')
old_info_json = redis_client.hget(f"workflow_online_users:{workflow_id}", current_user.id)
if old_info_json:
old_info = json.loads(old_info_json)
old_sid = old_info.get("sid")
if old_sid and old_sid != sid:
ext_socketio.server.disconnect(sid=old_sid)
user_info = {
"user_id": current_user.id,
"username": getattr(current_user, "username", ""),
"avatar": getattr(current_user, "avatar", ""),
"sid": sid
}
redis_client.hset(
f"workflow_online_users:{workflow_id}",
current_user.id,
json.dumps(user_info)
)
redis_client.set(
f"ws_sid_map:{sid}",
json.dumps({
"workflow_id": workflow_id,
"user_id": current_user.id
})
)
return {'msg': 'connected', 'user_id': current_user.id, 'sid': sid}
@ext_socketio.on('disconnect')
def handle_disconnect():
"""
Handle user disconnect event, remove user from workflow's online user list.
"""
sid = request.sid
mapping = redis_client.get(f"ws_sid_map:{sid}")
if mapping:
data = json.loads(mapping)
workflow_id = data["workflow_id"]
user_id = data["user_id"]
redis_client.hdel(f"workflow_online_users:{workflow_id}", user_id)
redis_client.delete(f"ws_sid_map:{sid}")

View File

@ -28,12 +28,13 @@ elif [[ "${MODE}" == "beat" ]]; then
exec celery -A app.celery beat --loglevel ${LOG_LEVEL:-INFO}
else
if [[ "${DEBUG}" == "true" ]]; then
# TODO: add socketio support
exec flask run --host=${DIFY_BIND_ADDRESS:-0.0.0.0} --port=${DIFY_PORT:-5001} --debug
else
exec gunicorn \
--bind "${DIFY_BIND_ADDRESS:-0.0.0.0}:${DIFY_PORT:-5001}" \
--workers ${SERVER_WORKER_AMOUNT:-1} \
--worker-class ${SERVER_WORKER_CLASS:-gevent} \
--worker-class ${SERVER_WORKER_CLASS:-geventwebsocket.gunicorn.workers.GeventWebSocketWorker} \
--worker-connections ${SERVER_WORKER_CONNECTIONS:-10} \
--timeout ${GUNICORN_TIMEOUT:-200} \
app:app

View File

@ -0,0 +1,10 @@
from flask_socketio import SocketIO
from configs import dify_config
from dify_app import DifyApp
ext_socketio = SocketIO()
def init_app(app: DifyApp):
ext_socketio.init_app(app, async_mode='gevent', cors_allowed_origins=dify_config.CONSOLE_CORS_ALLOW_ORIGINS)

View File

@ -19,8 +19,10 @@ dependencies = [
"flask-login~=0.6.3",
"flask-migrate~=4.0.7",
"flask-restful~=0.3.10",
"flask-socketio~=5.5.1",
"flask-sqlalchemy~=3.1.1",
"gevent~=24.11.1",
"gevent-websocket~=0.10.1",
"gmpy2~=2.2.1",
"google-api-core==2.18.0",
"google-api-python-client==2.90.0",

File diff suppressed because it is too large Load Diff

View File

@ -26,6 +26,7 @@ import Loading from '@/app/components/base/loading'
import useBreakpoints, { MediaType } from '@/hooks/use-breakpoints'
import type { App } from '@/types/app'
import useDocumentTitle from '@/hooks/use-document-title'
import { connectOnlineUserWebSocket, disconnectOnlineUserWebSocket } from '@/service/demo/online-user'
export type IAppDetailLayoutProps = {
children: React.ReactNode
@ -114,6 +115,8 @@ const AppDetailLayout: FC<IAppDetailLayoutProps> = (props) => {
setIsLoadingAppDetail(true)
fetchAppDetail({ url: '/apps', id: appId }).then((res) => {
setAppDetailRes(res)
if (res.mode === 'workflow' || res.mode === 'advanced-chat')
connectOnlineUserWebSocket(appId)
}).catch((e: any) => {
if (e.status === 404)
router.replace('/apps')
@ -148,6 +151,7 @@ const AppDetailLayout: FC<IAppDetailLayoutProps> = (props) => {
useUnmount(() => {
setAppDetail()
disconnectOnlineUserWebSocket()
})
if (!appDetail) {

View File

@ -141,6 +141,7 @@
"server-only": "^0.0.1",
"sharp": "^0.33.2",
"shave": "^5.0.4",
"socket.io-client": "^4.8.1",
"sortablejs": "^1.15.0",
"swr": "^2.3.0",
"tailwind-merge": "^2.5.4",
@ -242,5 +243,6 @@
"prismjs@<1.30.0": "1.30.0",
"brace-expansion@<2.0.2": "2.0.2"
}
}
},
"packageManager": "pnpm@10.11.1+sha512.e519b9f7639869dc8d5c3c5dfef73b3f091094b0a006d7317353c72b124e80e1afd429732e28705ad6bfa1ee879c1fce46c128ccebd3192101f43dd67c667912"
}

View File

@ -324,6 +324,9 @@ importers:
shave:
specifier: ^5.0.4
version: 5.0.4
socket.io-client:
specifier: ^4.8.1
version: 4.8.1
sortablejs:
specifier: ^1.15.0
version: 1.15.6
@ -2690,6 +2693,9 @@ packages:
'@sinonjs/fake-timers@10.3.0':
resolution: {integrity: sha512-V4BG07kuYSUkTCSBHG8G8TNhM+F19jXFWnQtzj+we8DrkpSBCee9Z3Ms8yiGer/dlmhe35/Xdgyo3/0rQKg7YA==}
'@socket.io/component-emitter@3.1.2':
resolution: {integrity: sha512-9BCxFwvbGg/RsZK9tjXd8s4UcwR0MWeFQ1XEKIQVVvAGJyINdrqKMcTRyLoK8Rse1GjzLV9cwjWV1olXRWEXVA==}
'@storybook/addon-actions@8.5.0':
resolution: {integrity: sha512-6CW9+17rk5eNx6I8EKqCxRKtsJFTR/lHL+xiJ6/iBWApIm8sg63vhXvUTJ58UixmIkT5oLh0+ESNPh+x10D8fw==}
peerDependencies:
@ -4569,6 +4575,15 @@ packages:
supports-color:
optional: true
debug@4.3.7:
resolution: {integrity: sha512-Er2nc/H7RrMXZBFCEim6TCmMk02Z8vLC2Rbi1KEBggpo0fS6l0S1nnapwmIi3yW/+GOJap1Krg4w0Hg80oCqgQ==}
engines: {node: '>=6.0'}
peerDependencies:
supports-color: '*'
peerDependenciesMeta:
supports-color:
optional: true
debug@4.4.0:
resolution: {integrity: sha512-6WTZ/IxCY/T6BALoZHaE4ctp9xm+Z5kY/pzYaCHRFeyVhojxlrm+46y68HA6hr0TcwEssoxNiDEUJQjfPZ/RYA==}
engines: {node: '>=6.0'}
@ -4775,6 +4790,13 @@ packages:
endent@2.1.0:
resolution: {integrity: sha512-r8VyPX7XL8U01Xgnb1CjZ3XV+z90cXIJ9JPE/R9SEC9vpw2P6CfsRPJmp20DppC5N7ZAMCmjYkJIa744Iyg96w==}
engine.io-client@6.6.3:
resolution: {integrity: sha512-T0iLjnyNWahNyv/lcjS2y4oE358tVS/SYQNxYXGAJ9/GLgH4VCvOQ/mhTjqU88mLZCQgiG8RIegFHYCdVC+j5w==}
engine.io-parser@5.2.3:
resolution: {integrity: sha512-HqD3yTBfnBxIrbnM1DoD6Pcq8NECnh8d4As1Qgh0z5Gg3jRRIqijury0CL3ghu/edArpUYiYqQiDUQBIs4np3Q==}
engines: {node: '>=10.0.0'}
enhanced-resolve@5.18.1:
resolution: {integrity: sha512-ZSW3ma5GkcQBIpwZTSRAI8N71Uuwgs93IezB7mf7R60tC8ZbJideoDNKjHn2O9KIlx6rkGTTEk1xUCK2E1Y2Yg==}
engines: {node: '>=10.13.0'}
@ -7940,6 +7962,14 @@ packages:
resolution: {integrity: sha512-bSiSngZ/jWeX93BqeIAbImyTbEihizcwNjFoRUIY/T1wWQsfsm2Vw1agPKylXvQTU7iASGdHhyqRlqQzfz+Htg==}
engines: {node: '>=18'}
socket.io-client@4.8.1:
resolution: {integrity: sha512-hJVXfu3E28NmzGk8o1sHhN3om52tRvwYeidbj7xKy2eIIse5IoKX3USlS6Tqt3BHAtflLIkCQBkzVrEEfWUyYQ==}
engines: {node: '>=10.0.0'}
socket.io-parser@4.2.4:
resolution: {integrity: sha512-/GbIKmo8ioc+NIWIhwdecY0ge+qVBSMdgxGygevmdHj24bsfgtCmcUUcQ5ZzcylGFHsN3k4HB4Cgkl96KVnuew==}
engines: {node: '>=10.0.0'}
sortablejs@1.15.6:
resolution: {integrity: sha512-aNfiuwMEpfBM/CN6LY0ibyhxPfPbyFeBTYJKCvzkJ2GkUpazIt3H+QIPAMHwqQ7tMKaHz1Qj+rJJCqljnf4p3A==}
@ -8798,6 +8828,18 @@ packages:
resolution: {integrity: sha512-7KxauUdBmSdWnmpaGFg+ppNjKF8uNLry8LyzjauQDOVONfFLNKrKvQOxZ/VuTIcS/gge/YNahf5RIIQWTSarlg==}
engines: {node: ^12.13.0 || ^14.15.0 || >=16.0.0}
ws@8.17.1:
resolution: {integrity: sha512-6XQFvXTkbfUOZOKKILFG1PDK2NDQs4azKQl26T0YS5CxqWLgXajbPZ+h4gZekJyRqFU8pvnbAbbs/3TgRPy+GQ==}
engines: {node: '>=10.0.0'}
peerDependencies:
bufferutil: ^4.0.1
utf-8-validate: '>=5.0.2'
peerDependenciesMeta:
bufferutil:
optional: true
utf-8-validate:
optional: true
ws@8.18.1:
resolution: {integrity: sha512-RKW2aJZMXeMxVpnZ6bck+RswznaxmzdULiBr6KY7XkTnW8uvt0iT9H5DkHUChXrc+uurzwa0rVI16n/Xzjdz1w==}
engines: {node: '>=10.0.0'}
@ -8814,6 +8856,10 @@ packages:
resolution: {integrity: sha512-ICP2e+jsHvAj2E2lIHxa5tjXRlKDJo4IdvPvCXbXQGdzSfmSpNVyIKMvoZHjDY9DP0zV17iI85o90vRFXNccRw==}
engines: {node: '>=12'}
xmlhttprequest-ssl@2.1.2:
resolution: {integrity: sha512-TEU+nJVUUnA4CYJFLvK5X9AOeH4KvDvhIfm0vV1GaQRtchnG0hgK5p8hw/xjv8cunWYCsiPCSDzObPyhEwq3KQ==}
engines: {node: '>=0.4.0'}
xtend@4.0.2:
resolution: {integrity: sha512-LKYU1iAXJXUgAXn9URjiu+MWhyUXHsvfp7mcuYm9dSUKK0/CjtrUwFAxD82/mCWbtLsGjFIad0wIsod4zrTAEQ==}
engines: {node: '>=0.4'}
@ -11306,6 +11352,8 @@ snapshots:
dependencies:
'@sinonjs/commons': 3.0.1
'@socket.io/component-emitter@3.1.2': {}
'@storybook/addon-actions@8.5.0(storybook@8.5.0)':
dependencies:
'@storybook/global': 5.0.0
@ -13612,6 +13660,10 @@ snapshots:
dependencies:
ms: 2.1.3
debug@4.3.7:
dependencies:
ms: 2.1.3
debug@4.4.0:
dependencies:
ms: 2.1.3
@ -13802,6 +13854,20 @@ snapshots:
fast-json-parse: 1.0.3
objectorarray: 1.0.5
engine.io-client@6.6.3:
dependencies:
'@socket.io/component-emitter': 3.1.2
debug: 4.3.7
engine.io-parser: 5.2.3
ws: 8.17.1
xmlhttprequest-ssl: 2.1.2
transitivePeerDependencies:
- bufferutil
- supports-color
- utf-8-validate
engine.io-parser@5.2.3: {}
enhanced-resolve@5.18.1:
dependencies:
graceful-fs: 4.2.11
@ -18150,6 +18216,24 @@ snapshots:
ansi-styles: 6.2.1
is-fullwidth-code-point: 5.0.0
socket.io-client@4.8.1:
dependencies:
'@socket.io/component-emitter': 3.1.2
debug: 4.3.7
engine.io-client: 6.6.3
socket.io-parser: 4.2.4
transitivePeerDependencies:
- bufferutil
- supports-color
- utf-8-validate
socket.io-parser@4.2.4:
dependencies:
'@socket.io/component-emitter': 3.1.2
debug: 4.3.7
transitivePeerDependencies:
- supports-color
sortablejs@1.15.6: {}
source-map-js@1.2.1: {}
@ -19117,10 +19201,14 @@ snapshots:
imurmurhash: 0.1.4
signal-exit: 3.0.7
ws@8.17.1: {}
ws@8.18.1: {}
xml-name-validator@4.0.0: {}
xmlhttprequest-ssl@2.1.2: {}
xtend@4.0.2: {}
y18n@5.0.8: {}

View File

@ -0,0 +1,54 @@
'use client'
import type { Socket } from 'socket.io-client'
import { io } from 'socket.io-client'
let socket: Socket | null = null
/**
* Connect to the online user websocket server.
* @param appId The app id to join a specific room or namespace.
* @returns The socket instance.
*/
export function connectOnlineUserWebSocket(appId: string): Socket {
// If already connected, disconnect first
if (socket)
socket.disconnect()
const url = process.env.NEXT_PUBLIC_SOCKET_URL || 'ws://localhost:5001'
socket = io(url, {
path: '/socket.io',
transports: ['websocket'],
query: { app_id: appId },
withCredentials: true,
})
// Add your event listeners here
socket.on('connect', () => {
console.log('WebSocket connected')
})
socket.on('disconnect', () => {
console.log('WebSocket disconnected')
})
socket.on('online_users', (data) => {
console.log('Online users:', data)
})
socket.on('connect_error', (err) => {
console.error('WebSocket connection error:', err)
})
return socket
}
/**
* Disconnect the websocket connection.
*/
export function disconnectOnlineUserWebSocket() {
if (socket) {
socket.disconnect()
socket = null
}
}