fix(security): enforce tenant scoping on app trace-config endpoints

The /console/api/apps/<app_id>/trace-config endpoints (GET/POST/PATCH/
DELETE) only checked that the caller was authenticated; they did not
verify that the target app_id belonged to the caller's tenant.

A logged-in user from tenant A could read, modify, or delete the
tracing configuration of an app in tenant B (e.g., redirect outbound
traces to an attacker-controlled Langfuse endpoint).

Apply the established @get_app_model decorator (api/controllers/
console/app/wraps.py) to all four verbs. The decorator loads the App
with App.tenant_id == current_tenant_id and raises AppNotFoundError
on mismatch — same pattern used by mcp_server.py and workflow_trigger.py.

Refs: GHSA-48xc-wmw8-3jr3 (reported by zafido via Huntr).
This commit is contained in:
xr843 2026-05-05 14:22:50 +08:00
parent c5ac191a79
commit 9909742371

View File

@ -7,8 +7,10 @@ from werkzeug.exceptions import BadRequest
from controllers.console import console_ns
from controllers.console.app.error import TracingConfigCheckError, TracingConfigIsExist, TracingConfigNotExist
from controllers.console.app.wraps import get_app_model
from controllers.console.wraps import account_initialization_required, setup_required
from libs.login import login_required
from models import App
from services.ops_service import OpsService
DEFAULT_REF_TEMPLATE_SWAGGER_2_0 = "#/definitions/{model}"
@ -49,11 +51,12 @@ class TraceAppConfigApi(Resource):
@setup_required
@login_required
@account_initialization_required
def get(self, app_id):
@get_app_model
def get(self, app_model: App):
args = TraceProviderQuery.model_validate(request.args.to_dict(flat=True)) # type: ignore
try:
trace_config = OpsService.get_tracing_app_config(app_id=app_id, tracing_provider=args.tracing_provider)
trace_config = OpsService.get_tracing_app_config(app_id=app_model.id, tracing_provider=args.tracing_provider)
if not trace_config:
return {"has_not_configured": True}
return trace_config
@ -71,13 +74,14 @@ class TraceAppConfigApi(Resource):
@setup_required
@login_required
@account_initialization_required
def post(self, app_id):
@get_app_model
def post(self, app_model: App):
"""Create a new trace app configuration"""
args = TraceConfigPayload.model_validate(console_ns.payload)
try:
result = OpsService.create_tracing_app_config(
app_id=app_id, tracing_provider=args.tracing_provider, tracing_config=args.tracing_config
app_id=app_model.id, tracing_provider=args.tracing_provider, tracing_config=args.tracing_config
)
if not result:
raise TracingConfigIsExist()
@ -96,13 +100,14 @@ class TraceAppConfigApi(Resource):
@setup_required
@login_required
@account_initialization_required
def patch(self, app_id):
@get_app_model
def patch(self, app_model: App):
"""Update an existing trace app configuration"""
args = TraceConfigPayload.model_validate(console_ns.payload)
try:
result = OpsService.update_tracing_app_config(
app_id=app_id, tracing_provider=args.tracing_provider, tracing_config=args.tracing_config
app_id=app_model.id, tracing_provider=args.tracing_provider, tracing_config=args.tracing_config
)
if not result:
raise TracingConfigNotExist()
@ -119,12 +124,13 @@ class TraceAppConfigApi(Resource):
@setup_required
@login_required
@account_initialization_required
def delete(self, app_id):
@get_app_model
def delete(self, app_model: App):
"""Delete an existing trace app configuration"""
args = TraceProviderQuery.model_validate(request.args.to_dict(flat=True)) # type: ignore
try:
result = OpsService.delete_tracing_app_config(app_id=app_id, tracing_provider=args.tracing_provider)
result = OpsService.delete_tracing_app_config(app_id=app_model.id, tracing_provider=args.tracing_provider)
if not result:
raise TracingConfigNotExist()
return {"result": "success"}, 204