diff --git a/api/app.py b/api/app.py index 080df69b57..47b59dc3a5 100644 --- a/api/app.py +++ b/api/app.py @@ -39,11 +39,10 @@ else: if __name__ == "__main__": from gevent import pywsgi - from geventwebsocket.handler import WebSocketHandler host = os.environ.get('HOST', '0.0.0.0') port = int(os.environ.get('PORT', 5001)) print(f"Starting server on {host}:{port}") - server = pywsgi.WSGIServer((host, port), socketio_app, handler_class=WebSocketHandler) + server = pywsgi.WSGIServer((host, port), socketio_app) server.serve_forever() diff --git a/api/controllers/console/app/online_user.py b/api/controllers/console/app/online_user.py index 7d5653dd14..9727c62871 100644 --- a/api/controllers/console/app/online_user.py +++ b/api/controllers/console/app/online_user.py @@ -1,14 +1,44 @@ import json import time -from flask_restful import Resource, marshal_with, reqparse - -from controllers.console import api -from controllers.console.wraps import account_initialization_required, setup_required from extensions.ext_redis import redis_client from extensions.ext_socketio import sio -from fields.online_user_fields import online_user_list_fields -from libs.login import login_required +from libs.passport import PassportService +from services.account_service import AccountService + + +@sio.on('connect') +def socket_connect(sid, environ, auth): + """ + WebSocket connect event, do authentication here. + """ + token = None + if auth and isinstance(auth, dict): + token = auth.get('token') + if not token: + return False + + try: + decoded = PassportService().verify(token) + user_id = decoded.get("user_id") + if not user_id: + return False + + with sio.app.app_context(): + user = AccountService.load_logged_in_account(account_id=user_id) + if not user: + return False + + sio.save_session(sid, { + 'user_id': user.id, + 'username': user.name, + 'avatar': user.avatar + }) + + return True + + except Exception: + return False @sio.on("user_connect") @@ -84,36 +114,6 @@ def broadcast_online_users(workflow_id): ) -class OnlineUserApi(Resource): - @setup_required - @login_required - @account_initialization_required - @marshal_with(online_user_list_fields) - def get(self): - parser = reqparse.RequestParser() - parser.add_argument("workflow_ids", type=str, required=True, location="args") - args = parser.parse_args() - - workflow_ids = [id.strip() for id in args["workflow_ids"].split(",")] - - results = {} - for workflow_id in workflow_ids: - users_json = redis_client.hgetall(f"workflow_online_users:{workflow_id}") - - users = [] - for _, user_info_json in users_json.items(): - try: - users.append(json.loads(user_info_json)) - except Exception: - continue - results[workflow_id] = users - - return {"data": results} - - -api.add_resource(OnlineUserApi, "/online-users") - - @sio.on("collaboration_event") def handle_collaboration_event(sid, data): """ diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index a9f088a276..d95be3665e 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -25,6 +25,7 @@ from core.app.entities.app_invoke_entities import InvokeFrom from core.file.models import File from extensions.ext_database import db from factories import file_factory, variable_factory +from fields.online_user_fields import online_user_list_fields from fields.workflow_fields import workflow_fields, workflow_pagination_fields from fields.workflow_run_fields import workflow_run_node_execution_fields from libs import helper @@ -788,6 +789,32 @@ class DraftWorkflowNodeLastRunApi(Resource): raise NotFound("last run not found") return node_exec +class WorkflowOnlineUsersApi(Resource): + @setup_required + @login_required + @account_initialization_required + @marshal_with(online_user_list_fields) + def get(self): + parser = reqparse.RequestParser() + parser.add_argument("workflow_ids", type=str, required=True, location="args") + args = parser.parse_args() + + workflow_ids = [id.strip() for id in args["workflow_ids"].split(",")] + + results = {} + for workflow_id in workflow_ids: + users_json = redis_client.hgetall(f"workflow_online_users:{workflow_id}") + + users = [] + for _, user_info_json in users_json.items(): + try: + users.append(json.loads(user_info_json)) + except Exception: + continue + results[workflow_id] = users + + return {"data": results} + api.add_resource( DraftWorkflowApi, @@ -857,3 +884,4 @@ api.add_resource( DraftWorkflowNodeLastRunApi, "/apps//workflows/draft/nodes//last-run", ) +api.add_resource(WorkflowOnlineUsersApi, "/apps/workflows/online-users") diff --git a/api/extensions/ext_login.py b/api/extensions/ext_login.py index 72069465fc..11d1856ac4 100644 --- a/api/extensions/ext_login.py +++ b/api/extensions/ext_login.py @@ -8,7 +8,6 @@ from werkzeug.exceptions import NotFound, Unauthorized from configs import dify_config from dify_app import DifyApp from extensions.ext_database import db -from extensions.ext_socketio import sio from libs.passport import PassportService from models.account import Account, Tenant, TenantAccountJoin from models.model import AppMCPServer, EndUser @@ -114,39 +113,5 @@ def unauthorized_handler(): ) -@sio.on('connect') -def socket_connect(sid, environ, auth): - """ - WebSocket connect event, do authentication here. - """ - token = None - if auth and isinstance(auth, dict): - token = auth.get('token') - if not token: - return False - - try: - decoded = PassportService().verify(token) - user_id = decoded.get("user_id") - if not user_id: - return False - - with sio.app.app_context(): - user = AccountService.load_logged_in_account(account_id=user_id) - if not user: - return False - - sio.save_session(sid, { - 'user_id': user.id, - 'username': user.name, - 'avatar': user.avatar - }) - - return True - - except Exception: - return False - - def init_app(app: DifyApp): login_manager.init_app(app) diff --git a/api/pyproject.toml b/api/pyproject.toml index ede97ac197..9f4ea9c4e7 100644 --- a/api/pyproject.toml +++ b/api/pyproject.toml @@ -19,10 +19,8 @@ dependencies = [ "flask-login~=0.6.3", "flask-migrate~=4.0.7", "flask-restful~=0.3.10", - "flask-socketio~=5.5.1", "flask-sqlalchemy~=3.1.1", "gevent~=24.11.1", - "gevent-websocket~=0.10.1", "gmpy2~=2.2.1", "google-api-core==2.18.0", "google-api-python-client==2.90.0", @@ -71,6 +69,7 @@ dependencies = [ "pypdfium2==4.30.0", "python-docx~=1.1.0", "python-dotenv==1.0.1", + "python-socketio~=5.13.0", "pyyaml~=6.0.1", "readabilipy~=0.3.0", "redis[hiredis]~=6.1.0", diff --git a/api/uv.lock b/api/uv.lock index c98b8b7df8..36740ccd3c 100644 --- a/api/uv.lock +++ b/api/uv.lock @@ -1244,10 +1244,8 @@ dependencies = [ { name = "flask-login" }, { name = "flask-migrate" }, { name = "flask-restful" }, - { name = "flask-socketio" }, { name = "flask-sqlalchemy" }, { name = "gevent" }, - { name = "gevent-websocket" }, { name = "gmpy2" }, { name = "google-api-core" }, { name = "google-api-python-client" }, @@ -1295,6 +1293,7 @@ dependencies = [ { name = "pypdfium2" }, { name = "python-docx" }, { name = "python-dotenv" }, + { name = "python-socketio" }, { name = "pyyaml" }, { name = "readabilipy" }, { name = "redis", extra = ["hiredis"] }, @@ -1428,10 +1427,8 @@ requires-dist = [ { name = "flask-login", specifier = "~=0.6.3" }, { name = "flask-migrate", specifier = "~=4.0.7" }, { name = "flask-restful", specifier = "~=0.3.10" }, - { name = "flask-socketio", specifier = "~=5.5.1" }, { name = "flask-sqlalchemy", specifier = "~=3.1.1" }, { name = "gevent", specifier = "~=24.11.1" }, - { name = "gevent-websocket", specifier = "~=0.10.1" }, { name = "gmpy2", specifier = "~=2.2.1" }, { name = "google-api-core", specifier = "==2.18.0" }, { name = "google-api-python-client", specifier = "==2.90.0" }, @@ -1479,6 +1476,7 @@ requires-dist = [ { name = "pypdfium2", specifier = "==4.30.0" }, { name = "python-docx", specifier = "~=1.1.0" }, { name = "python-dotenv", specifier = "==1.0.1" }, + { name = "python-socketio", specifier = "~=5.13.0" }, { name = "pyyaml", specifier = "~=6.0.1" }, { name = "readabilipy", specifier = "~=0.3.0" }, { name = "redis", extras = ["hiredis"], specifier = "~=6.1.0" }, @@ -1844,19 +1842,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d7/7b/f0b45f0df7d2978e5ae51804bb5939b7897b2ace24306009da0cc34d8d1f/Flask_RESTful-0.3.10-py2.py3-none-any.whl", hash = "sha256:1cf93c535172f112e080b0d4503a8d15f93a48c88bdd36dd87269bdaf405051b", size = 26217 }, ] -[[package]] -name = "flask-socketio" -version = "5.5.1" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "flask" }, - { name = "python-socketio" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/d1/1f/54d3de4982df695682af99c65d4b89f8a46fe6739780c5a68690195835a0/flask_socketio-5.5.1.tar.gz", hash = "sha256:d946c944a1074ccad8e99485a6f5c79bc5789e3ea4df0bb9c864939586c51ec4", size = 37401 } -wheels = [ - { url = "https://files.pythonhosted.org/packages/47/38/1b75b3ba3452860211ec87710f9854112911a436ee4d155533e0b83b5cd9/Flask_SocketIO-5.5.1-py3-none-any.whl", hash = "sha256:35a50166db44d055f68021d6ec32cb96f1f925cd82de4504314be79139ea846f", size = 18259 }, -] - [[package]] name = "flask-sqlalchemy" version = "3.1.1" @@ -1970,18 +1955,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/11/b2/5d20664ef6a077bec9f27f7a7ee761edc64946d0b1e293726a3d074a9a18/gevent-24.11.1-cp312-cp312-win_amd64.whl", hash = "sha256:68bee86b6e1c041a187347ef84cf03a792f0b6c7238378bf6ba4118af11feaae", size = 1541631 }, ] -[[package]] -name = "gevent-websocket" -version = "0.10.1" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "gevent" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/98/d2/6fa19239ff1ab072af40ebf339acd91fb97f34617c2ee625b8e34bf42393/gevent-websocket-0.10.1.tar.gz", hash = "sha256:7eaef32968290c9121f7c35b973e2cc302ffb076d018c9068d2f5ca8b2d85fb0", size = 18366 } -wheels = [ - { url = "https://files.pythonhosted.org/packages/7b/84/2dc373eb6493e00c884cc11e6c059ec97abae2678d42f06bf780570b0193/gevent_websocket-0.10.1-py3-none-any.whl", hash = "sha256:17b67d91282f8f4c973eba0551183fc84f56f1c90c8f6b6b30256f31f66f5242", size = 22987 }, -] - [[package]] name = "gitdb" version = "4.0.12"