fix: workflow online users polling for large app lists (#35786)

Co-authored-by: autofix-ci[bot] <114827586+autofix-ci[bot]@users.noreply.github.com>
This commit is contained in:
非法操作 2026-05-05 18:48:14 +08:00 committed by GitHub
parent a0af10abc8
commit c0431ec843
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 94 additions and 43 deletions

View File

@ -60,7 +60,8 @@ _file_access_controller = DatabaseFileAccessController()
LISTENING_RETRY_IN = 2000
DEFAULT_REF_TEMPLATE_SWAGGER_2_0 = "#/definitions/{model}"
RESTORE_SOURCE_WORKFLOW_MUST_BE_PUBLISHED_MESSAGE = "source workflow must be published"
MAX_WORKFLOW_ONLINE_USERS_QUERY_IDS = 50
MAX_WORKFLOW_ONLINE_USERS_REQUEST_IDS = 1000
WORKFLOW_ONLINE_USERS_REDIS_BATCH_SIZE = 50
# Register models for flask_restx to avoid dict type issues in Swagger
# Register in dependency order: base models first, then dependent models
@ -158,8 +159,13 @@ class WorkflowFeaturesPayload(BaseModel):
features: dict[str, Any] = Field(..., description="Workflow feature configuration")
class WorkflowOnlineUsersQuery(BaseModel):
app_ids: str = Field(..., description="Comma-separated app IDs")
class WorkflowOnlineUsersPayload(BaseModel):
app_ids: list[str] = Field(default_factory=list, description="App IDs")
@field_validator("app_ids")
@classmethod
def normalize_app_ids(cls, app_ids: list[str]) -> list[str]:
return list(dict.fromkeys(app_id.strip() for app_id in app_ids if app_id.strip()))
class DraftWorkflowTriggerRunPayload(BaseModel):
@ -186,7 +192,7 @@ reg(ConvertToWorkflowPayload)
reg(WorkflowListQuery)
reg(WorkflowUpdatePayload)
reg(WorkflowFeaturesPayload)
reg(WorkflowOnlineUsersQuery)
reg(WorkflowOnlineUsersPayload)
reg(DraftWorkflowTriggerRunPayload)
reg(DraftWorkflowTriggerRunAllPayload)
@ -1384,19 +1390,19 @@ class DraftWorkflowTriggerRunAllApi(Resource):
@console_ns.route("/apps/workflows/online-users")
class WorkflowOnlineUsersApi(Resource):
@console_ns.expect(console_ns.models[WorkflowOnlineUsersQuery.__name__])
@console_ns.expect(console_ns.models[WorkflowOnlineUsersPayload.__name__])
@console_ns.doc("get_workflow_online_users")
@console_ns.doc(description="Get workflow online users")
@setup_required
@login_required
@account_initialization_required
@marshal_with(online_user_list_fields)
def get(self):
args = WorkflowOnlineUsersQuery.model_validate(request.args.to_dict(flat=True)) # type: ignore
def post(self):
args = WorkflowOnlineUsersPayload.model_validate(console_ns.payload or {})
app_ids = list(dict.fromkeys(app_id.strip() for app_id in args.app_ids.split(",") if app_id.strip()))
if len(app_ids) > MAX_WORKFLOW_ONLINE_USERS_QUERY_IDS:
raise BadRequest(f"Maximum {MAX_WORKFLOW_ONLINE_USERS_QUERY_IDS} app_ids are allowed per request.")
app_ids = args.app_ids
if len(app_ids) > MAX_WORKFLOW_ONLINE_USERS_REQUEST_IDS:
raise BadRequest(f"Maximum {MAX_WORKFLOW_ONLINE_USERS_REQUEST_IDS} app_ids are allowed per request.")
if not app_ids:
return {"data": []}
@ -1404,13 +1410,24 @@ class WorkflowOnlineUsersApi(Resource):
_, current_tenant_id = current_account_with_tenant()
workflow_service = WorkflowService()
accessible_app_ids = workflow_service.get_accessible_app_ids(app_ids, current_tenant_id)
ordered_accessible_app_ids = [app_id for app_id in app_ids if app_id in accessible_app_ids]
users_json_by_app_id: dict[str, Any] = {}
for start_index in range(0, len(ordered_accessible_app_ids), WORKFLOW_ONLINE_USERS_REDIS_BATCH_SIZE):
app_id_batch = ordered_accessible_app_ids[
start_index : start_index + WORKFLOW_ONLINE_USERS_REDIS_BATCH_SIZE
]
pipe = redis_client.pipeline(transaction=False)
for app_id in app_id_batch:
pipe.hgetall(f"{WORKFLOW_ONLINE_USERS_PREFIX}{app_id}")
users_json_batch = pipe.execute()
for app_id, users_json in zip(app_id_batch, users_json_batch):
users_json_by_app_id[app_id] = users_json
results = []
for app_id in app_ids:
if app_id not in accessible_app_ids:
continue
users_json = redis_client.hgetall(f"{WORKFLOW_ONLINE_USERS_PREFIX}{app_id}")
for app_id in ordered_accessible_app_ids:
users_json = users_json_by_app_id.get(app_id, {})
users = []
for _, user_info_json in users_json.items():

View File

@ -363,7 +363,8 @@ def test_workflow_online_users_filters_inaccessible_workflow(app, monkeypatch: p
)
monkeypatch.setattr(workflow_module.file_helpers, "get_signed_file_url", sign_avatar)
workflow_module.redis_client.hgetall.side_effect = lambda key: (
redis_pipeline = Mock()
redis_pipeline.execute.return_value = [
{
b"sid-1": json.dumps(
{
@ -374,16 +375,16 @@ def test_workflow_online_users_filters_inaccessible_workflow(app, monkeypatch: p
}
)
}
if key == f"{workflow_module.WORKFLOW_ONLINE_USERS_PREFIX}{app_id_1}"
else {}
)
]
workflow_module.redis_client.pipeline.return_value = redis_pipeline
api = workflow_module.WorkflowOnlineUsersApi()
handler = _unwrap(api.get)
handler = _unwrap(api.post)
with app.test_request_context(
f"/apps/workflows/online-users?app_ids={app_id_1},{app_id_2}",
method="GET",
"/apps/workflows/online-users",
method="POST",
json={"app_ids": [app_id_1, app_id_2]},
):
response = handler(api)
@ -402,12 +403,43 @@ def test_workflow_online_users_filters_inaccessible_workflow(app, monkeypatch: p
}
]
}
workflow_module.redis_client.hgetall.assert_called_once_with(
f"{workflow_module.WORKFLOW_ONLINE_USERS_PREFIX}{app_id_1}"
)
workflow_module.redis_client.pipeline.assert_called_once_with(transaction=False)
redis_pipeline.hgetall.assert_called_once_with(f"{workflow_module.WORKFLOW_ONLINE_USERS_PREFIX}{app_id_1}")
redis_pipeline.execute.assert_called_once_with()
sign_avatar.assert_called_once_with("avatar-file-id")
def test_workflow_online_users_batches_redis_reads(app, monkeypatch: pytest.MonkeyPatch) -> None:
app_ids = [f"wf-{index}" for index in range(workflow_module.WORKFLOW_ONLINE_USERS_REDIS_BATCH_SIZE + 1)]
monkeypatch.setattr(workflow_module, "current_account_with_tenant", lambda: (SimpleNamespace(), "tenant-1"))
monkeypatch.setattr(
workflow_module,
"WorkflowService",
lambda: SimpleNamespace(get_accessible_app_ids=lambda app_ids, tenant_id: set(app_ids)),
)
first_pipeline = Mock()
first_pipeline.execute.return_value = [{} for _ in range(workflow_module.WORKFLOW_ONLINE_USERS_REDIS_BATCH_SIZE)]
second_pipeline = Mock()
second_pipeline.execute.return_value = [{}]
workflow_module.redis_client.pipeline.side_effect = [first_pipeline, second_pipeline]
api = workflow_module.WorkflowOnlineUsersApi()
handler = _unwrap(api.post)
with app.test_request_context(
"/apps/workflows/online-users",
method="POST",
json={"app_ids": app_ids},
):
response = handler(api)
assert len(response["data"]) == len(app_ids)
assert workflow_module.redis_client.pipeline.call_count == 2
assert first_pipeline.hgetall.call_count == workflow_module.WORKFLOW_ONLINE_USERS_REDIS_BATCH_SIZE
assert second_pipeline.hgetall.call_count == 1
def test_workflow_online_users_rejects_excessive_workflow_ids(app, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(workflow_module, "current_account_with_tenant", lambda: (SimpleNamespace(), "tenant-1"))
accessible_app_ids = Mock(return_value=set())
@ -417,14 +449,15 @@ def test_workflow_online_users_rejects_excessive_workflow_ids(app, monkeypatch:
lambda: SimpleNamespace(get_accessible_app_ids=accessible_app_ids),
)
excessive_ids = ",".join(f"wf-{index}" for index in range(workflow_module.MAX_WORKFLOW_ONLINE_USERS_QUERY_IDS + 1))
excessive_ids = [f"wf-{index}" for index in range(workflow_module.MAX_WORKFLOW_ONLINE_USERS_REQUEST_IDS + 1)]
api = workflow_module.WorkflowOnlineUsersApi()
handler = _unwrap(api.get)
handler = _unwrap(api.post)
with app.test_request_context(
f"/apps/workflows/online-users?app_ids={excessive_ids}",
method="GET",
"/apps/workflows/online-users",
method="POST",
json={"app_ids": excessive_ids},
):
with pytest.raises(HTTPException) as exc:
handler(api)

View File

@ -187,15 +187,16 @@ const List: FC<Props> = ({
}, [isCreatedByMe, setQuery])
const pages = useMemo(() => data?.pages ?? [], [data?.pages])
const appIds = useMemo(() => {
const ids = new Set<string>()
pages.forEach((page) => {
page.data?.forEach((app) => {
if (app.id)
ids.add(app.id)
const workflowOnlineUserAppIds = useMemo(() => {
const appIds = new Set<string>()
pages.forEach(({ data: apps }) => {
apps.forEach((app) => {
if (app.mode === AppModeEnum.WORKFLOW || app.mode === AppModeEnum.ADVANCED_CHAT)
appIds.add(app.id)
})
})
return Array.from(ids)
return Array.from(appIds)
}, [pages])
const refreshWorkflowOnlineUsers = useCallback(async () => {
@ -204,19 +205,19 @@ const List: FC<Props> = ({
return
}
if (!appIds.length) {
if (!workflowOnlineUserAppIds.length) {
setWorkflowOnlineUsersMap({})
return
}
try {
const onlineUsersMap = await fetchWorkflowOnlineUsers({ appIds })
const onlineUsersMap = await fetchWorkflowOnlineUsers({ appIds: workflowOnlineUserAppIds })
setWorkflowOnlineUsersMap(onlineUsersMap)
}
catch {
setWorkflowOnlineUsersMap({})
}
}, [appIds, systemFeatures.enable_collaboration_mode])
}, [systemFeatures.enable_collaboration_mode, workflowOnlineUserAppIds])
useEffect(() => {
void refreshWorkflowOnlineUsers()

View File

@ -17,11 +17,11 @@ export const appDeleteContract = base
export const workflowOnlineUsersContract = base
.route({
path: '/apps/workflows/online-users',
method: 'GET',
method: 'POST',
})
.input(type<{
query: {
app_ids: string
body: {
app_ids: string[]
}
}>())
.output(type<WorkflowOnlineUsersResponse>())

View File

@ -14,7 +14,7 @@ export const fetchWorkflowOnlineUsers = async ({ appIds }: { appIds: string[] })
return {}
const response = await consoleClient.apps.workflowOnlineUsers({
query: { app_ids: appIds.join(',') },
body: { app_ids: appIds },
})
if (!response?.data)