diff --git a/api/controllers/console/app/workflow.py b/api/controllers/console/app/workflow.py index 478f783eb0..68dd8b7a8d 100644 --- a/api/controllers/console/app/workflow.py +++ b/api/controllers/console/app/workflow.py @@ -60,7 +60,8 @@ _file_access_controller = DatabaseFileAccessController() LISTENING_RETRY_IN = 2000 DEFAULT_REF_TEMPLATE_SWAGGER_2_0 = "#/definitions/{model}" RESTORE_SOURCE_WORKFLOW_MUST_BE_PUBLISHED_MESSAGE = "source workflow must be published" -MAX_WORKFLOW_ONLINE_USERS_QUERY_IDS = 50 +MAX_WORKFLOW_ONLINE_USERS_REQUEST_IDS = 1000 +WORKFLOW_ONLINE_USERS_REDIS_BATCH_SIZE = 50 # Register models for flask_restx to avoid dict type issues in Swagger # Register in dependency order: base models first, then dependent models @@ -158,8 +159,13 @@ class WorkflowFeaturesPayload(BaseModel): features: dict[str, Any] = Field(..., description="Workflow feature configuration") -class WorkflowOnlineUsersQuery(BaseModel): - app_ids: str = Field(..., description="Comma-separated app IDs") +class WorkflowOnlineUsersPayload(BaseModel): + app_ids: list[str] = Field(default_factory=list, description="App IDs") + + @field_validator("app_ids") + @classmethod + def normalize_app_ids(cls, app_ids: list[str]) -> list[str]: + return list(dict.fromkeys(app_id.strip() for app_id in app_ids if app_id.strip())) class DraftWorkflowTriggerRunPayload(BaseModel): @@ -186,7 +192,7 @@ reg(ConvertToWorkflowPayload) reg(WorkflowListQuery) reg(WorkflowUpdatePayload) reg(WorkflowFeaturesPayload) -reg(WorkflowOnlineUsersQuery) +reg(WorkflowOnlineUsersPayload) reg(DraftWorkflowTriggerRunPayload) reg(DraftWorkflowTriggerRunAllPayload) @@ -1384,19 +1390,19 @@ class DraftWorkflowTriggerRunAllApi(Resource): @console_ns.route("/apps/workflows/online-users") class WorkflowOnlineUsersApi(Resource): - @console_ns.expect(console_ns.models[WorkflowOnlineUsersQuery.__name__]) + @console_ns.expect(console_ns.models[WorkflowOnlineUsersPayload.__name__]) @console_ns.doc("get_workflow_online_users") @console_ns.doc(description="Get workflow online users") @setup_required @login_required @account_initialization_required @marshal_with(online_user_list_fields) - def get(self): - args = WorkflowOnlineUsersQuery.model_validate(request.args.to_dict(flat=True)) # type: ignore + def post(self): + args = WorkflowOnlineUsersPayload.model_validate(console_ns.payload or {}) - app_ids = list(dict.fromkeys(app_id.strip() for app_id in args.app_ids.split(",") if app_id.strip())) - if len(app_ids) > MAX_WORKFLOW_ONLINE_USERS_QUERY_IDS: - raise BadRequest(f"Maximum {MAX_WORKFLOW_ONLINE_USERS_QUERY_IDS} app_ids are allowed per request.") + app_ids = args.app_ids + if len(app_ids) > MAX_WORKFLOW_ONLINE_USERS_REQUEST_IDS: + raise BadRequest(f"Maximum {MAX_WORKFLOW_ONLINE_USERS_REQUEST_IDS} app_ids are allowed per request.") if not app_ids: return {"data": []} @@ -1404,13 +1410,24 @@ class WorkflowOnlineUsersApi(Resource): _, current_tenant_id = current_account_with_tenant() workflow_service = WorkflowService() accessible_app_ids = workflow_service.get_accessible_app_ids(app_ids, current_tenant_id) + ordered_accessible_app_ids = [app_id for app_id in app_ids if app_id in accessible_app_ids] + + users_json_by_app_id: dict[str, Any] = {} + for start_index in range(0, len(ordered_accessible_app_ids), WORKFLOW_ONLINE_USERS_REDIS_BATCH_SIZE): + app_id_batch = ordered_accessible_app_ids[ + start_index : start_index + WORKFLOW_ONLINE_USERS_REDIS_BATCH_SIZE + ] + pipe = redis_client.pipeline(transaction=False) + for app_id in app_id_batch: + pipe.hgetall(f"{WORKFLOW_ONLINE_USERS_PREFIX}{app_id}") + + users_json_batch = pipe.execute() + for app_id, users_json in zip(app_id_batch, users_json_batch): + users_json_by_app_id[app_id] = users_json results = [] - for app_id in app_ids: - if app_id not in accessible_app_ids: - continue - - users_json = redis_client.hgetall(f"{WORKFLOW_ONLINE_USERS_PREFIX}{app_id}") + for app_id in ordered_accessible_app_ids: + users_json = users_json_by_app_id.get(app_id, {}) users = [] for _, user_info_json in users_json.items(): diff --git a/api/tests/unit_tests/controllers/console/app/test_workflow.py b/api/tests/unit_tests/controllers/console/app/test_workflow.py index e91c0a0597..7c470eb9a8 100644 --- a/api/tests/unit_tests/controllers/console/app/test_workflow.py +++ b/api/tests/unit_tests/controllers/console/app/test_workflow.py @@ -363,7 +363,8 @@ def test_workflow_online_users_filters_inaccessible_workflow(app, monkeypatch: p ) monkeypatch.setattr(workflow_module.file_helpers, "get_signed_file_url", sign_avatar) - workflow_module.redis_client.hgetall.side_effect = lambda key: ( + redis_pipeline = Mock() + redis_pipeline.execute.return_value = [ { b"sid-1": json.dumps( { @@ -374,16 +375,16 @@ def test_workflow_online_users_filters_inaccessible_workflow(app, monkeypatch: p } ) } - if key == f"{workflow_module.WORKFLOW_ONLINE_USERS_PREFIX}{app_id_1}" - else {} - ) + ] + workflow_module.redis_client.pipeline.return_value = redis_pipeline api = workflow_module.WorkflowOnlineUsersApi() - handler = _unwrap(api.get) + handler = _unwrap(api.post) with app.test_request_context( - f"/apps/workflows/online-users?app_ids={app_id_1},{app_id_2}", - method="GET", + "/apps/workflows/online-users", + method="POST", + json={"app_ids": [app_id_1, app_id_2]}, ): response = handler(api) @@ -402,12 +403,43 @@ def test_workflow_online_users_filters_inaccessible_workflow(app, monkeypatch: p } ] } - workflow_module.redis_client.hgetall.assert_called_once_with( - f"{workflow_module.WORKFLOW_ONLINE_USERS_PREFIX}{app_id_1}" - ) + workflow_module.redis_client.pipeline.assert_called_once_with(transaction=False) + redis_pipeline.hgetall.assert_called_once_with(f"{workflow_module.WORKFLOW_ONLINE_USERS_PREFIX}{app_id_1}") + redis_pipeline.execute.assert_called_once_with() sign_avatar.assert_called_once_with("avatar-file-id") +def test_workflow_online_users_batches_redis_reads(app, monkeypatch: pytest.MonkeyPatch) -> None: + app_ids = [f"wf-{index}" for index in range(workflow_module.WORKFLOW_ONLINE_USERS_REDIS_BATCH_SIZE + 1)] + monkeypatch.setattr(workflow_module, "current_account_with_tenant", lambda: (SimpleNamespace(), "tenant-1")) + monkeypatch.setattr( + workflow_module, + "WorkflowService", + lambda: SimpleNamespace(get_accessible_app_ids=lambda app_ids, tenant_id: set(app_ids)), + ) + + first_pipeline = Mock() + first_pipeline.execute.return_value = [{} for _ in range(workflow_module.WORKFLOW_ONLINE_USERS_REDIS_BATCH_SIZE)] + second_pipeline = Mock() + second_pipeline.execute.return_value = [{}] + workflow_module.redis_client.pipeline.side_effect = [first_pipeline, second_pipeline] + + api = workflow_module.WorkflowOnlineUsersApi() + handler = _unwrap(api.post) + + with app.test_request_context( + "/apps/workflows/online-users", + method="POST", + json={"app_ids": app_ids}, + ): + response = handler(api) + + assert len(response["data"]) == len(app_ids) + assert workflow_module.redis_client.pipeline.call_count == 2 + assert first_pipeline.hgetall.call_count == workflow_module.WORKFLOW_ONLINE_USERS_REDIS_BATCH_SIZE + assert second_pipeline.hgetall.call_count == 1 + + def test_workflow_online_users_rejects_excessive_workflow_ids(app, monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setattr(workflow_module, "current_account_with_tenant", lambda: (SimpleNamespace(), "tenant-1")) accessible_app_ids = Mock(return_value=set()) @@ -417,14 +449,15 @@ def test_workflow_online_users_rejects_excessive_workflow_ids(app, monkeypatch: lambda: SimpleNamespace(get_accessible_app_ids=accessible_app_ids), ) - excessive_ids = ",".join(f"wf-{index}" for index in range(workflow_module.MAX_WORKFLOW_ONLINE_USERS_QUERY_IDS + 1)) + excessive_ids = [f"wf-{index}" for index in range(workflow_module.MAX_WORKFLOW_ONLINE_USERS_REQUEST_IDS + 1)] api = workflow_module.WorkflowOnlineUsersApi() - handler = _unwrap(api.get) + handler = _unwrap(api.post) with app.test_request_context( - f"/apps/workflows/online-users?app_ids={excessive_ids}", - method="GET", + "/apps/workflows/online-users", + method="POST", + json={"app_ids": excessive_ids}, ): with pytest.raises(HTTPException) as exc: handler(api) diff --git a/web/app/components/apps/list.tsx b/web/app/components/apps/list.tsx index d1bdf533fe..b744fe77aa 100644 --- a/web/app/components/apps/list.tsx +++ b/web/app/components/apps/list.tsx @@ -187,15 +187,16 @@ const List: FC = ({ }, [isCreatedByMe, setQuery]) const pages = useMemo(() => data?.pages ?? [], [data?.pages]) - const appIds = useMemo(() => { - const ids = new Set() - pages.forEach((page) => { - page.data?.forEach((app) => { - if (app.id) - ids.add(app.id) + + const workflowOnlineUserAppIds = useMemo(() => { + const appIds = new Set() + pages.forEach(({ data: apps }) => { + apps.forEach((app) => { + if (app.mode === AppModeEnum.WORKFLOW || app.mode === AppModeEnum.ADVANCED_CHAT) + appIds.add(app.id) }) }) - return Array.from(ids) + return Array.from(appIds) }, [pages]) const refreshWorkflowOnlineUsers = useCallback(async () => { @@ -204,19 +205,19 @@ const List: FC = ({ return } - if (!appIds.length) { + if (!workflowOnlineUserAppIds.length) { setWorkflowOnlineUsersMap({}) return } try { - const onlineUsersMap = await fetchWorkflowOnlineUsers({ appIds }) + const onlineUsersMap = await fetchWorkflowOnlineUsers({ appIds: workflowOnlineUserAppIds }) setWorkflowOnlineUsersMap(onlineUsersMap) } catch { setWorkflowOnlineUsersMap({}) } - }, [appIds, systemFeatures.enable_collaboration_mode]) + }, [systemFeatures.enable_collaboration_mode, workflowOnlineUserAppIds]) useEffect(() => { void refreshWorkflowOnlineUsers() diff --git a/web/contract/console/apps.ts b/web/contract/console/apps.ts index ff4f5096b2..2f5f16c25e 100644 --- a/web/contract/console/apps.ts +++ b/web/contract/console/apps.ts @@ -17,11 +17,11 @@ export const appDeleteContract = base export const workflowOnlineUsersContract = base .route({ path: '/apps/workflows/online-users', - method: 'GET', + method: 'POST', }) .input(type<{ - query: { - app_ids: string + body: { + app_ids: string[] } }>()) .output(type()) diff --git a/web/service/apps.ts b/web/service/apps.ts index d2c6593a34..221e83cf39 100644 --- a/web/service/apps.ts +++ b/web/service/apps.ts @@ -14,7 +14,7 @@ export const fetchWorkflowOnlineUsers = async ({ appIds }: { appIds: string[] }) return {} const response = await consoleClient.apps.workflowOnlineUsers({ - query: { app_ids: appIds.join(',') }, + body: { app_ids: appIds }, }) if (!response?.data)