fix(workflow): enforce tenant access in online users query

Validate requested workflow_ids against current tenant before reading collaboration online-user state from Redis.

Move workflow access-id lookup into WorkflowService to keep controller thin and aligned with layering.

Limit query size, and stop exposing sid in REST response fields.

Add unit tests for inaccessible workflow filtering and workflow_ids limit checks.
This commit is contained in:
hjlarry 2026-04-12 16:05:29 +08:00
parent 828276d672
commit 6b7574023e
4 changed files with 113 additions and 2 deletions

View File

@ -59,6 +59,7 @@ _file_access_controller = DatabaseFileAccessController()
LISTENING_RETRY_IN = 2000
DEFAULT_REF_TEMPLATE_SWAGGER_2_0 = "#/definitions/{model}"
RESTORE_SOURCE_WORKFLOW_MUST_BE_PUBLISHED_MESSAGE = "source workflow must be published"
MAX_WORKFLOW_ONLINE_USERS_QUERY_IDS = 50
# Register models for flask_restx to avoid dict type issues in Swagger
# Register in dependency order: base models first, then dependent models
@ -1391,10 +1392,24 @@ class WorkflowOnlineUsersApi(Resource):
def get(self):
args = WorkflowOnlineUsersQuery.model_validate(request.args.to_dict(flat=True)) # type: ignore
workflow_ids = [workflow_id.strip() for workflow_id in args.workflow_ids.split(",") if workflow_id.strip()]
workflow_ids = list(
dict.fromkeys(workflow_id.strip() for workflow_id in args.workflow_ids.split(",") if workflow_id.strip())
)
if len(workflow_ids) > MAX_WORKFLOW_ONLINE_USERS_QUERY_IDS:
raise BadRequest(f"Maximum {MAX_WORKFLOW_ONLINE_USERS_QUERY_IDS} workflow_ids are allowed per request.")
if not workflow_ids:
return {"data": []}
_, current_tenant_id = current_account_with_tenant()
workflow_service = WorkflowService()
accessible_workflow_ids = workflow_service.get_accessible_workflow_ids(workflow_ids, current_tenant_id)
results = []
for workflow_id in workflow_ids:
if workflow_id not in accessible_workflow_ids:
continue
users_json = redis_client.hgetall(f"{WORKFLOW_ONLINE_USERS_PREFIX}{workflow_id}")
users = []

View File

@ -4,7 +4,6 @@ online_user_partial_fields = {
"user_id": fields.String,
"username": fields.String,
"avatar": fields.String,
"sid": fields.String,
}
workflow_online_users_fields = {

View File

@ -199,6 +199,16 @@ class WorkflowService:
return workflow
def get_accessible_workflow_ids(self, workflow_ids: Sequence[str], tenant_id: str) -> set[str]:
"""
Return workflow IDs that belong to the given tenant.
"""
if not workflow_ids:
return set()
stmt = select(Workflow.id).where(Workflow.id.in_(workflow_ids), Workflow.tenant_id == tenant_id)
return {str(workflow_id) for workflow_id in db.session.scalars(stmt).all()}
def get_all_published_workflow(
self,
*,

View File

@ -1,5 +1,6 @@
from __future__ import annotations
import json
from datetime import datetime
from types import SimpleNamespace
from unittest.mock import Mock
@ -290,3 +291,89 @@ def test_advanced_chat_run_conversation_not_exists(app, monkeypatch: pytest.Monk
):
with pytest.raises(NotFound):
handler(api, app_model=SimpleNamespace(id="app"))
def test_workflow_online_users_filters_inaccessible_workflow(
app, monkeypatch: pytest.MonkeyPatch
) -> None:
workflow_id_1 = "11111111-1111-1111-1111-111111111111"
workflow_id_2 = "22222222-2222-2222-2222-222222222222"
monkeypatch.setattr(workflow_module, "current_account_with_tenant", lambda: (SimpleNamespace(), "tenant-1"))
monkeypatch.setattr(
workflow_module,
"WorkflowService",
lambda: SimpleNamespace(get_accessible_workflow_ids=lambda workflow_ids, tenant_id: {workflow_id_1}),
)
workflow_module.redis_client.hgetall.side_effect = lambda key: (
{
b"sid-1": json.dumps(
{
"user_id": "u-1",
"username": "Alice",
"avatar": "avatar-url",
"sid": "sid-1",
}
)
}
if key == f"{workflow_module.WORKFLOW_ONLINE_USERS_PREFIX}{workflow_id_1}"
else {}
)
api = workflow_module.WorkflowOnlineUsersApi()
handler = _unwrap(api.get)
with app.test_request_context(
f"/apps/workflows/online-users?workflow_ids={workflow_id_1},{workflow_id_2}",
method="GET",
):
response = handler(api)
assert response == {
"data": [
{
"workflow_id": workflow_id_1,
"users": [
{
"user_id": "u-1",
"username": "Alice",
"avatar": "avatar-url",
"sid": "sid-1",
}
],
}
]
}
workflow_module.redis_client.hgetall.assert_called_once_with(
f"{workflow_module.WORKFLOW_ONLINE_USERS_PREFIX}{workflow_id_1}"
)
def test_workflow_online_users_rejects_excessive_workflow_ids(
app, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.setattr(workflow_module, "current_account_with_tenant", lambda: (SimpleNamespace(), "tenant-1"))
accessible_workflow_ids = Mock(return_value=set())
monkeypatch.setattr(
workflow_module,
"WorkflowService",
lambda: SimpleNamespace(get_accessible_workflow_ids=accessible_workflow_ids),
)
excessive_ids = ",".join(
f"wf-{index}" for index in range(workflow_module.MAX_WORKFLOW_ONLINE_USERS_QUERY_IDS + 1)
)
api = workflow_module.WorkflowOnlineUsersApi()
handler = _unwrap(api.get)
with app.test_request_context(
f"/apps/workflows/online-users?workflow_ids={excessive_ids}",
method="GET",
):
with pytest.raises(HTTPException) as exc:
handler(api)
assert exc.value.code == 400
assert "Maximum" in exc.value.description
accessible_workflow_ids.assert_not_called()