fix(collaboration): align online-users keys to app_id

Switch /apps/workflows/online-users request and response schema from workflow_ids/workflow_id to app_ids/app_id without compatibility fallback.

Align app list online avatar lookup and online-user map indexing to app.id, matching socket room identity.

Update backend access checks, API fields, frontend contract/types, and unit tests accordingly.
This commit is contained in:
hjlarry 2026-04-12 16:16:47 +08:00
parent 6b7574023e
commit 3288f5e100
8 changed files with 44 additions and 47 deletions

View File

@ -158,7 +158,7 @@ class WorkflowFeaturesPayload(BaseModel):
class WorkflowOnlineUsersQuery(BaseModel):
workflow_ids: str = Field(..., description="Comma-separated workflow IDs")
app_ids: str = Field(..., description="Comma-separated app IDs")
class DraftWorkflowTriggerRunPayload(BaseModel):
@ -1392,25 +1392,23 @@ class WorkflowOnlineUsersApi(Resource):
def get(self):
args = WorkflowOnlineUsersQuery.model_validate(request.args.to_dict(flat=True)) # type: ignore
workflow_ids = list(
dict.fromkeys(workflow_id.strip() for workflow_id in args.workflow_ids.split(",") if workflow_id.strip())
)
if len(workflow_ids) > MAX_WORKFLOW_ONLINE_USERS_QUERY_IDS:
raise BadRequest(f"Maximum {MAX_WORKFLOW_ONLINE_USERS_QUERY_IDS} workflow_ids are allowed per request.")
app_ids = list(dict.fromkeys(app_id.strip() for app_id in args.app_ids.split(",") if app_id.strip()))
if len(app_ids) > MAX_WORKFLOW_ONLINE_USERS_QUERY_IDS:
raise BadRequest(f"Maximum {MAX_WORKFLOW_ONLINE_USERS_QUERY_IDS} app_ids are allowed per request.")
if not workflow_ids:
if not app_ids:
return {"data": []}
_, current_tenant_id = current_account_with_tenant()
workflow_service = WorkflowService()
accessible_workflow_ids = workflow_service.get_accessible_workflow_ids(workflow_ids, current_tenant_id)
accessible_app_ids = workflow_service.get_accessible_app_ids(app_ids, current_tenant_id)
results = []
for workflow_id in workflow_ids:
if workflow_id not in accessible_workflow_ids:
for app_id in app_ids:
if app_id not in accessible_app_ids:
continue
users_json = redis_client.hgetall(f"{WORKFLOW_ONLINE_USERS_PREFIX}{workflow_id}")
users_json = redis_client.hgetall(f"{WORKFLOW_ONLINE_USERS_PREFIX}{app_id}")
users = []
for _, user_info_json in users_json.items():
@ -1418,6 +1416,6 @@ class WorkflowOnlineUsersApi(Resource):
users.append(json.loads(user_info_json))
except Exception:
continue
results.append({"workflow_id": workflow_id, "users": users})
results.append({"app_id": app_id, "users": users})
return {"data": results}

View File

@ -7,7 +7,7 @@ online_user_partial_fields = {
}
workflow_online_users_fields = {
"workflow_id": fields.String,
"app_id": fields.String,
"users": fields.List(fields.Nested(online_user_partial_fields)),
}

View File

@ -199,15 +199,15 @@ class WorkflowService:
return workflow
def get_accessible_workflow_ids(self, workflow_ids: Sequence[str], tenant_id: str) -> set[str]:
def get_accessible_app_ids(self, app_ids: Sequence[str], tenant_id: str) -> set[str]:
"""
Return workflow IDs that belong to the given tenant.
Return app IDs that belong to the given tenant.
"""
if not workflow_ids:
if not app_ids:
return set()
stmt = select(Workflow.id).where(Workflow.id.in_(workflow_ids), Workflow.tenant_id == tenant_id)
return {str(workflow_id) for workflow_id in db.session.scalars(stmt).all()}
stmt = select(App.id).where(App.id.in_(app_ids), App.tenant_id == tenant_id)
return {str(app_id) for app_id in db.session.scalars(stmt).all()}
def get_all_published_workflow(
self,

View File

@ -296,13 +296,13 @@ def test_advanced_chat_run_conversation_not_exists(app, monkeypatch: pytest.Monk
def test_workflow_online_users_filters_inaccessible_workflow(
app, monkeypatch: pytest.MonkeyPatch
) -> None:
workflow_id_1 = "11111111-1111-1111-1111-111111111111"
workflow_id_2 = "22222222-2222-2222-2222-222222222222"
app_id_1 = "11111111-1111-1111-1111-111111111111"
app_id_2 = "22222222-2222-2222-2222-222222222222"
monkeypatch.setattr(workflow_module, "current_account_with_tenant", lambda: (SimpleNamespace(), "tenant-1"))
monkeypatch.setattr(
workflow_module,
"WorkflowService",
lambda: SimpleNamespace(get_accessible_workflow_ids=lambda workflow_ids, tenant_id: {workflow_id_1}),
lambda: SimpleNamespace(get_accessible_app_ids=lambda app_ids, tenant_id: {app_id_1}),
)
workflow_module.redis_client.hgetall.side_effect = lambda key: (
@ -316,7 +316,7 @@ def test_workflow_online_users_filters_inaccessible_workflow(
}
)
}
if key == f"{workflow_module.WORKFLOW_ONLINE_USERS_PREFIX}{workflow_id_1}"
if key == f"{workflow_module.WORKFLOW_ONLINE_USERS_PREFIX}{app_id_1}"
else {}
)
@ -324,7 +324,7 @@ def test_workflow_online_users_filters_inaccessible_workflow(
handler = _unwrap(api.get)
with app.test_request_context(
f"/apps/workflows/online-users?workflow_ids={workflow_id_1},{workflow_id_2}",
f"/apps/workflows/online-users?app_ids={app_id_1},{app_id_2}",
method="GET",
):
response = handler(api)
@ -332,7 +332,7 @@ def test_workflow_online_users_filters_inaccessible_workflow(
assert response == {
"data": [
{
"workflow_id": workflow_id_1,
"app_id": app_id_1,
"users": [
{
"user_id": "u-1",
@ -345,7 +345,7 @@ def test_workflow_online_users_filters_inaccessible_workflow(
]
}
workflow_module.redis_client.hgetall.assert_called_once_with(
f"{workflow_module.WORKFLOW_ONLINE_USERS_PREFIX}{workflow_id_1}"
f"{workflow_module.WORKFLOW_ONLINE_USERS_PREFIX}{app_id_1}"
)
@ -353,11 +353,11 @@ def test_workflow_online_users_rejects_excessive_workflow_ids(
app, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.setattr(workflow_module, "current_account_with_tenant", lambda: (SimpleNamespace(), "tenant-1"))
accessible_workflow_ids = Mock(return_value=set())
accessible_app_ids = Mock(return_value=set())
monkeypatch.setattr(
workflow_module,
"WorkflowService",
lambda: SimpleNamespace(get_accessible_workflow_ids=accessible_workflow_ids),
lambda: SimpleNamespace(get_accessible_app_ids=accessible_app_ids),
)
excessive_ids = ",".join(
@ -368,7 +368,7 @@ def test_workflow_online_users_rejects_excessive_workflow_ids(
handler = _unwrap(api.get)
with app.test_request_context(
f"/apps/workflows/online-users?workflow_ids={excessive_ids}",
f"/apps/workflows/online-users?app_ids={excessive_ids}",
method="GET",
):
with pytest.raises(HTTPException) as exc:
@ -376,4 +376,4 @@ def test_workflow_online_users_rejects_excessive_workflow_ids(
assert exc.value.code == 400
assert "Maximum" in exc.value.description
accessible_workflow_ids.assert_not_called()
accessible_app_ids.assert_not_called()

View File

@ -186,30 +186,29 @@ const List: FC<Props> = ({
}, [isCreatedByMe, setQuery])
const pages = data?.pages ?? []
const workflowIds = useMemo(() => {
const appIds = useMemo(() => {
const ids = new Set<string>()
pages.forEach((page) => {
page.data?.forEach((app) => {
const workflowId = app.workflow?.id
if (workflowId)
ids.add(workflowId)
if (app.id)
ids.add(app.id)
})
})
return Array.from(ids)
}, [pages])
const refreshWorkflowOnlineUsers = useCallback(async () => {
if (!workflowIds.length)
if (!appIds.length)
return
try {
const onlineUsersMap = await fetchWorkflowOnlineUsers({ workflowIds })
const onlineUsersMap = await fetchWorkflowOnlineUsers({ appIds })
setWorkflowOnlineUsersMap(onlineUsersMap)
}
catch {
setWorkflowOnlineUsersMap({})
}
}, [workflowIds])
}, [appIds])
useEffect(() => {
void refreshWorkflowOnlineUsers()
@ -286,7 +285,7 @@ const List: FC<Props> = ({
<AppCard
key={app.id}
app={app}
onlineUsers={app.workflow?.id ? (workflowOnlineUsersMap[app.workflow.id] ?? []) : []}
onlineUsers={workflowOnlineUsersMap[app.id] ?? []}
onRefresh={refetch}
/>
))

View File

@ -21,7 +21,7 @@ export const workflowOnlineUsersContract = base
})
.input(type<{
query: {
workflow_ids: string
app_ids: string
}
}>())
.output(type<WorkflowOnlineUsersResponse>())

View File

@ -113,7 +113,7 @@ export type WorkflowOnlineUser = {
export type WorkflowOnlineUsersResponse = {
data: Record<string, WorkflowOnlineUser[]> | Array<{
workflow_id: string
app_id: string
users: WorkflowOnlineUser[]
}>
}

View File

@ -9,12 +9,12 @@ export const fetchAppList = ({ url, params }: { url: string, params?: Record<str
return get<AppListResponse>(url, { params })
}
export const fetchWorkflowOnlineUsers = async ({ workflowIds }: { workflowIds: string[] }): Promise<Record<string, WorkflowOnlineUser[]>> => {
if (!workflowIds.length)
export const fetchWorkflowOnlineUsers = async ({ appIds }: { appIds: string[] }): Promise<Record<string, WorkflowOnlineUser[]>> => {
if (!appIds.length)
return {}
const response = await consoleClient.apps.workflowOnlineUsers({
query: { workflow_ids: workflowIds.join(',') },
query: { app_ids: appIds.join(',') },
})
if (!response?.data)
@ -22,15 +22,15 @@ export const fetchWorkflowOnlineUsers = async ({ workflowIds }: { workflowIds: s
if (Array.isArray(response.data)) {
return response.data.reduce<Record<string, WorkflowOnlineUser[]>>((acc, item) => {
if (item?.workflow_id)
acc[item.workflow_id] = item.users || []
if (item?.app_id)
acc[item.app_id] = item.users || []
return acc
}, {})
}
return Object.entries(response.data).reduce<Record<string, WorkflowOnlineUser[]>>((acc, [workflowId, users]) => {
if (workflowId)
acc[workflowId] = users || []
return Object.entries(response.data).reduce<Record<string, WorkflowOnlineUser[]>>((acc, [appId, users]) => {
if (appId)
acc[appId] = users || []
return acc
}, {})
}