Prevent unintended access to sensitive fields (passwords, private keys) (#876)
Make sure that fields specified in filter, sortBy, etc. are model fields and may be accessed. This is fixes a potential security issue. The filter() function allowed guessing the content of password hashes one character at a time. The sort() function allowed the user to call an arbitrary method of an arbitrary model attribute, for example sortBy=id&sortDir=distinct would produce an unexpected error.
This commit is contained in:
parent
b40c6a1c67
commit
cf805f530f
|
@ -125,6 +125,8 @@ class Certificate(db.Model):
|
||||||
endpoints = relationship('Endpoint', backref='certificate')
|
endpoints = relationship('Endpoint', backref='certificate')
|
||||||
rotation_policy = relationship("RotationPolicy")
|
rotation_policy = relationship("RotationPolicy")
|
||||||
|
|
||||||
|
sensitive_fields = ('private_key',)
|
||||||
|
|
||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
cert = lemur.common.utils.parse_certificate(kwargs['body'])
|
cert = lemur.common.utils.parse_certificate(kwargs['body'])
|
||||||
|
|
||||||
|
|
|
@ -75,6 +75,16 @@ def add(model):
|
||||||
db.session.add(model)
|
db.session.add(model)
|
||||||
|
|
||||||
|
|
||||||
|
def get_model_column(model, field):
|
||||||
|
if field in getattr(model, 'sensitive_fields', ()):
|
||||||
|
raise AttrNotFound(field)
|
||||||
|
column = model.__table__.columns._data.get(field, None)
|
||||||
|
if column is None:
|
||||||
|
raise AttrNotFound(field)
|
||||||
|
|
||||||
|
return column
|
||||||
|
|
||||||
|
|
||||||
def find_all(query, model, kwargs):
|
def find_all(query, model, kwargs):
|
||||||
"""
|
"""
|
||||||
Returns a query object that ensures that all kwargs
|
Returns a query object that ensures that all kwargs
|
||||||
|
@ -91,7 +101,7 @@ def find_all(query, model, kwargs):
|
||||||
if not isinstance(value, list):
|
if not isinstance(value, list):
|
||||||
value = value.split(',')
|
value = value.split(',')
|
||||||
|
|
||||||
conditions.append(getattr(model, attr).in_(value))
|
conditions.append(get_model_column(model, attr).in_(value))
|
||||||
|
|
||||||
return query.filter(and_(*conditions))
|
return query.filter(and_(*conditions))
|
||||||
|
|
||||||
|
@ -108,7 +118,7 @@ def find_any(query, model, kwargs):
|
||||||
"""
|
"""
|
||||||
or_args = []
|
or_args = []
|
||||||
for attr, value in kwargs.items():
|
for attr, value in kwargs.items():
|
||||||
or_args.append(or_(getattr(model, attr) == value))
|
or_args.append(or_(get_model_column(model, attr) == value))
|
||||||
exprs = or_(*or_args)
|
exprs = or_(*or_args)
|
||||||
return query.filter(exprs)
|
return query.filter(exprs)
|
||||||
|
|
||||||
|
@ -123,7 +133,7 @@ def get(model, value, field="id"):
|
||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
query = session_query(model)
|
query = session_query(model)
|
||||||
return query.filter(getattr(model, field) == value).scalar()
|
return query.filter(get_model_column(model, field) == value).scalar()
|
||||||
|
|
||||||
|
|
||||||
def get_all(model, value, field="id"):
|
def get_all(model, value, field="id"):
|
||||||
|
@ -136,7 +146,7 @@ def get_all(model, value, field="id"):
|
||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
query = session_query(model)
|
query = session_query(model)
|
||||||
return query.filter(getattr(model, field) == value)
|
return query.filter(get_model_column(model, field) == value)
|
||||||
|
|
||||||
|
|
||||||
def create(model):
|
def create(model):
|
||||||
|
@ -188,7 +198,8 @@ def filter(query, model, terms):
|
||||||
:param terms:
|
:param terms:
|
||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
return query.filter(getattr(model, terms[0]).ilike('%{}%'.format(terms[1])))
|
column = get_model_column(model, terms[0])
|
||||||
|
return query.filter(column.ilike('%{}%'.format(terms[1])))
|
||||||
|
|
||||||
|
|
||||||
def sort(query, model, field, direction):
|
def sort(query, model, field, direction):
|
||||||
|
@ -201,13 +212,8 @@ def sort(query, model, field, direction):
|
||||||
:param field:
|
:param field:
|
||||||
:param direction:
|
:param direction:
|
||||||
"""
|
"""
|
||||||
try:
|
column = get_model_column(model, field)
|
||||||
field = getattr(model, field)
|
return query.order_by(column.desc() if direction == 'desc' else column.asc())
|
||||||
direction = getattr(field, direction)
|
|
||||||
query = query.order_by(direction())
|
|
||||||
return query
|
|
||||||
except AttributeError:
|
|
||||||
raise AttrNotFound(field)
|
|
||||||
|
|
||||||
|
|
||||||
def paginate(query, page, count):
|
def paginate(query, page, count):
|
||||||
|
|
|
@ -29,7 +29,7 @@ class AttrNotFound(LemurException):
|
||||||
self.field = field
|
self.field = field
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return repr("The field '{0}' is not sortable".format(self.field))
|
return repr("The field '{0}' is not sortable or filterable".format(self.field))
|
||||||
|
|
||||||
|
|
||||||
class InvalidConfiguration(Exception):
|
class InvalidConfiguration(Exception):
|
||||||
|
|
|
@ -30,5 +30,7 @@ class Role(db.Model):
|
||||||
users = relationship("User", secondary=roles_users, passive_deletes=True, backref="role")
|
users = relationship("User", secondary=roles_users, passive_deletes=True, backref="role")
|
||||||
certificates = relationship("Certificate", secondary=roles_certificates, backref="role")
|
certificates = relationship("Certificate", secondary=roles_certificates, backref="role")
|
||||||
|
|
||||||
|
sensitive_fields = ('password',)
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return "Role(name={name})".format(name=self.name)
|
return "Role(name={name})".format(name=self.name)
|
||||||
|
|
|
@ -1,15 +1,14 @@
|
||||||
from __future__ import unicode_literals # at top of module
|
from __future__ import unicode_literals # at top of module
|
||||||
|
|
||||||
import json
|
|
||||||
import pytest
|
|
||||||
import datetime
|
import datetime
|
||||||
import arrow
|
import json
|
||||||
|
|
||||||
from freezegun import freeze_time
|
import arrow
|
||||||
|
import pytest
|
||||||
from cryptography import x509
|
from cryptography import x509
|
||||||
|
from freezegun import freeze_time
|
||||||
|
|
||||||
from lemur.certificates.views import * # noqa
|
from lemur.certificates.views import * # noqa
|
||||||
|
|
||||||
from lemur.tests.vectors import VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN, CSR_STR, \
|
from lemur.tests.vectors import VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN, CSR_STR, \
|
||||||
INTERNAL_VALID_LONG_STR, INTERNAL_VALID_SAN_STR, PRIVATE_KEY_STR
|
INTERNAL_VALID_LONG_STR, INTERNAL_VALID_SAN_STR, PRIVATE_KEY_STR
|
||||||
|
|
||||||
|
@ -535,3 +534,8 @@ def test_certificates_upload_delete(client, token, status):
|
||||||
])
|
])
|
||||||
def test_certificates_upload_patch(client, token, status):
|
def test_certificates_upload_patch(client, token, status):
|
||||||
assert client.patch(api.url_for(CertificatesUpload), data={}, headers=token).status_code == status
|
assert client.patch(api.url_for(CertificatesUpload), data={}, headers=token).status_code == status
|
||||||
|
|
||||||
|
|
||||||
|
def test_sensitive_sort(client):
|
||||||
|
resp = client.get(api.url_for(CertificatesList) + '?sortBy=private_key&sortDir=asc', headers=VALID_ADMIN_HEADER_TOKEN)
|
||||||
|
assert "'private_key' is not sortable or filterable" in resp.json['message']
|
||||||
|
|
|
@ -1,10 +1,9 @@
|
||||||
import json
|
import json
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from lemur.roles.views import * # noqa
|
from lemur.roles.views import * # noqa
|
||||||
from lemur.tests.factories import RoleFactory, AuthorityFactory, CertificateFactory, UserFactory
|
from lemur.tests.factories import RoleFactory, AuthorityFactory, CertificateFactory, UserFactory
|
||||||
|
|
||||||
|
|
||||||
from .vectors import VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN
|
from .vectors import VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN
|
||||||
|
|
||||||
|
|
||||||
|
@ -165,3 +164,8 @@ def test_role_list_delete(client, token, status):
|
||||||
])
|
])
|
||||||
def test_role_list_patch(client, token, status):
|
def test_role_list_patch(client, token, status):
|
||||||
assert client.patch(api.url_for(RolesList), data={}, headers=token).status_code == status
|
assert client.patch(api.url_for(RolesList), data={}, headers=token).status_code == status
|
||||||
|
|
||||||
|
|
||||||
|
def test_sensitive_filter(client):
|
||||||
|
resp = client.get(api.url_for(RolesList) + '?filter=password;a', headers=VALID_ADMIN_HEADER_TOKEN)
|
||||||
|
assert "'password' is not sortable or filterable" in resp.json['message']
|
||||||
|
|
|
@ -1,8 +1,6 @@
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
from lemur.users.views import * # noqa
|
from lemur.users.views import * # noqa
|
||||||
|
|
||||||
|
|
||||||
from .vectors import VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN
|
from .vectors import VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN
|
||||||
|
|
||||||
|
|
||||||
|
@ -99,3 +97,13 @@ def test_user_list_delete(client, token, status):
|
||||||
])
|
])
|
||||||
def test_user_list_patch(client, token, status):
|
def test_user_list_patch(client, token, status):
|
||||||
assert client.patch(api.url_for(UsersList), data={}, headers=token).status_code == status
|
assert client.patch(api.url_for(UsersList), data={}, headers=token).status_code == status
|
||||||
|
|
||||||
|
|
||||||
|
def test_sensitive_filter(client):
|
||||||
|
resp = client.get(api.url_for(UsersList) + '?filter=password;a', headers=VALID_ADMIN_HEADER_TOKEN)
|
||||||
|
assert "'password' is not sortable or filterable" in resp.json['message']
|
||||||
|
|
||||||
|
|
||||||
|
def test_sensitive_sort(client):
|
||||||
|
resp = client.get(api.url_for(UsersList) + '?sortBy=password&sortDir=asc', headers=VALID_ADMIN_HEADER_TOKEN)
|
||||||
|
assert "'password' is not sortable or filterable" in resp.json['message']
|
||||||
|
|
|
@ -46,6 +46,8 @@ class User(db.Model):
|
||||||
authorities = relationship('Authority', backref=db.backref('user'), lazy='dynamic')
|
authorities = relationship('Authority', backref=db.backref('user'), lazy='dynamic')
|
||||||
logs = relationship('Log', backref=db.backref('user'), lazy='dynamic')
|
logs = relationship('Log', backref=db.backref('user'), lazy='dynamic')
|
||||||
|
|
||||||
|
sensitive_fields = ('password',)
|
||||||
|
|
||||||
def check_password(self, password):
|
def check_password(self, password):
|
||||||
"""
|
"""
|
||||||
Hash a given password and check it against the stored value
|
Hash a given password and check it against the stored value
|
||||||
|
|
Loading…
Reference in New Issue