Reworked sensitive domain name and restriction logic (#878)

* This is a fix for a potential security issue; the old code had edge
  cases with unexpected behavior.
* LEMUR_RESTRICTED_DOMAINS is no more, instead LEMUR_WHITELISTED_DOMAINS
  is a list of *allowed* domain name patterns. Per discussion in PR #600
* Domain restrictions are now checked everywhere: in domain name-like
  CN (common name) values and SAN DNSNames, including raw CSR requests.
* Common name values that contain a space are exempt, since they cannot
  be valid domain names.
This commit is contained in:
Marti Raudsepp 2017-08-17 05:24:49 +03:00 committed by kevgliss
parent 466df367e6
commit 7762d6ed52
10 changed files with 197 additions and 27 deletions

View File

@ -70,11 +70,18 @@ Basic Configuration
Specifies whether to allow certificates created by Lemur to expire on weekends. Default is True. Specifies whether to allow certificates created by Lemur to expire on weekends. Default is True.
.. data:: LEMUR_RESTRICTED_DOMAINS .. data:: LEMUR_WHITELISTED_DOMAINS
:noindex: :noindex:
This allows the administrator to mark a subset of domains or domains matching a particular regex as List of regular expressions for domain restrictions; if the list is not empty, normal users can only issue
*restricted*. This means that only an administrator is allows to issue the domains in question. certificates for domain names matching at least one pattern on this list. Administrators are exempt from this
restriction.
Cerificate common name is matched against these rules *if* it does not contain a space. SubjectAltName DNS names
are always matched against these rules.
Take care to write patterns in such way to not allow the `*` wildcard character inadvertently. To match a `.`
character, it must be escaped (as `\.`).
.. data:: LEMUR_TOKEN_SECRET .. data:: LEMUR_TOKEN_SECRET
:noindex: :noindex:

View File

@ -23,7 +23,7 @@ class AuthorityInputSchema(LemurInputSchema):
name = fields.String(required=True) name = fields.String(required=True)
owner = fields.Email(required=True) owner = fields.Email(required=True)
description = fields.String() description = fields.String()
common_name = fields.String(required=True, validate=validators.sensitive_domain) common_name = fields.String(required=True, validate=validators.common_name)
validity_start = ArrowDateTime() validity_start = ArrowDateTime()
validity_end = ArrowDateTime() validity_end = ArrowDateTime()

View File

@ -55,7 +55,7 @@ class CertificateCreationSchema(CertificateSchema):
class CertificateInputSchema(CertificateCreationSchema): class CertificateInputSchema(CertificateCreationSchema):
name = fields.String() name = fields.String()
common_name = fields.String(required=True, validate=validators.sensitive_domain) common_name = fields.String(required=True, validate=validators.common_name)
authority = fields.Nested(AssociatedAuthoritySchema, required=True) authority = fields.Nested(AssociatedAuthoritySchema, required=True)
validity_start = ArrowDateTime() validity_start = ArrowDateTime()

View File

@ -18,6 +18,8 @@ from marshmallow import utils
from marshmallow.fields import Field from marshmallow.fields import Field
from marshmallow.exceptions import ValidationError from marshmallow.exceptions import ValidationError
from lemur.common import validators
class Hex(Field): class Hex(Field):
""" """
@ -357,6 +359,7 @@ class SubjectAlternativeNameExtension(Field):
general_names = [] general_names = []
for name in value: for name in value:
if name['nameType'] == 'DNSName': if name['nameType'] == 'DNSName':
validators.sensitive_domain(name['value'])
general_names.append(x509.DNSName(name['value'])) general_names.append(x509.DNSName(name['value']))
elif name['nameType'] == 'IPAddress': elif name['nameType'] == 'IPAddress':

View File

@ -1,9 +1,10 @@
import re import re
from flask import current_app
from cryptography import x509 from cryptography import x509
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives import serialization
from cryptography.x509 import NameOID
from flask import current_app
from marshmallow.exceptions import ValidationError from marshmallow.exceptions import ValidationError
from lemur.auth.permissions import SensitiveDomainPermission from lemur.auth.permissions import SensitiveDomainPermission
@ -41,22 +42,33 @@ def private_key(key):
raise ValidationError('Private key presented is not valid.') raise ValidationError('Private key presented is not valid.')
def common_name(value):
"""If the common name could be a domain name, apply domain validation rules."""
# Common name could be a domain name, or a human-readable name of the subject (often used in CA names or client
# certificates). As a simple heuristic, we assume that human-readable names always include a space.
# However, to avoid confusion for humans, we also don't count spaces at the beginning or end of the string.
if ' ' not in value.strip():
return sensitive_domain(value)
def sensitive_domain(domain): def sensitive_domain(domain):
""" """
Determines if domain has been marked as sensitive. Checks if user has the admin role, the domain does not match sensitive domains and whitelisted domain patterns.
:param domain: :param domain: domain name (str)
:return: :return:
""" """
restricted_domains = current_app.config.get('LEMUR_RESTRICTED_DOMAINS', []) if SensitiveDomainPermission().can():
if restricted_domains: # User has permission, no need to check anything
domains = domain_service.get_by_name(domain) return
for domain in domains:
# we only care about non-admins whitelist = current_app.config.get('LEMUR_WHITELISTED_DOMAINS', [])
if not SensitiveDomainPermission().can(): if whitelist and not any(re.match(pattern, domain) for pattern in whitelist):
if domain.sensitive or any([re.match(pattern, domain.name) for pattern in restricted_domains]): raise ValidationError('Domain {0} does not match whitelisted domain patterns. '
raise ValidationError( 'Contact an administrator to issue the certificate.'.format(domain))
'Domain {0} has been marked as sensitive, contact and administrator \
to issue the certificate.'.format(domain)) if any(d.sensitive for d in domain_service.get_by_name(domain)):
raise ValidationError('Domain {0} has been marked as sensitive. '
'Contact an administrator to issue the certificate.'.format(domain))
def encoding(oid_encoding): def encoding(oid_encoding):
@ -84,15 +96,27 @@ def sub_alt_type(alt_type):
def csr(data): def csr(data):
""" """
Determines if the CSR is valid. Determines if the CSR is valid and allowed.
:param data: :param data:
:return: :return:
""" """
try: try:
x509.load_pem_x509_csr(data.encode('utf-8'), default_backend()) request = x509.load_pem_x509_csr(data.encode('utf-8'), default_backend())
except Exception: except Exception:
raise ValidationError('CSR presented is not valid.') raise ValidationError('CSR presented is not valid.')
# Validate common name and SubjectAltNames
for name in request.subject.get_attributes_for_oid(NameOID.COMMON_NAME):
common_name(name.value)
try:
alt_names = request.extensions.get_extension_for_class(x509.SubjectAlternativeName)
for name in alt_names.value.get_values_for_type(x509.DNSName):
sensitive_domain(name)
except x509.ExtensionNotFound:
pass
def dates(data): def dates(data):
if not data.get('validity_start') and data.get('validity_end'): if not data.get('validity_start') and data.get('validity_end'):

View File

@ -87,8 +87,8 @@ SECRET_KEY = '{flask_secret_key}'
LEMUR_TOKEN_SECRET = '{secret_token}' LEMUR_TOKEN_SECRET = '{secret_token}'
LEMUR_ENCRYPTION_KEYS = '{encryption_key}' LEMUR_ENCRYPTION_KEYS = '{encryption_key}'
# this is a list of domains as regexes that only admins can issue # List of domain regular expressions that non-admin users can issue
LEMUR_RESTRICTED_DOMAINS = [] LEMUR_WHITELISTED_DOMAINS = []
# Mail Server # Mail Server

View File

@ -21,8 +21,10 @@ SECRET_KEY = 'I/dVhOZNSMZMqrFJa5tWli6VQccOGudKerq3eWPMSzQNmHHVhMAQfQ=='
LEMUR_TOKEN_SECRET = 'test' LEMUR_TOKEN_SECRET = 'test'
LEMUR_ENCRYPTION_KEYS = 'o61sBLNBSGtAckngtNrfVNd8xy8Hp9LBGDstTbMbqCY=' LEMUR_ENCRYPTION_KEYS = 'o61sBLNBSGtAckngtNrfVNd8xy8Hp9LBGDstTbMbqCY='
# this is a list of domains as regexes that only admins can issue # List of domain regular expressions that non-admin users can issue
LEMUR_RESTRICTED_DOMAINS = [] LEMUR_WHITELISTED_DOMAINS = [
'^[a-zA-Z0-9-]+\.example\.com$'
]
# Mail Server # Mail Server

View File

@ -1,5 +1,7 @@
import os import os
import pytest import pytest
from flask import current_app
from flask_principal import identity_changed, Identity
from lemur import create_app from lemur import create_app
from lemur.database import db as _db from lemur.database import db as _db
@ -176,3 +178,17 @@ def source_plugin():
from .plugins.source_plugin import TestSourcePlugin from .plugins.source_plugin import TestSourcePlugin
register(TestSourcePlugin) register(TestSourcePlugin)
return TestSourcePlugin return TestSourcePlugin
@pytest.yield_fixture(scope="function")
def logged_in_user(session, app):
with app.test_request_context():
identity_changed.send(current_app._get_current_object(), identity=Identity(1))
yield
@pytest.yield_fixture(scope="function")
def logged_in_admin(session, app):
with app.test_request_context():
identity_changed.send(current_app._get_current_object(), identity=Identity(2))
yield

View File

@ -1,11 +1,11 @@
import pytest import pytest
from lemur.authorities.views import * # noqa
from lemur.authorities.views import * # noqa
from lemur.tests.vectors import VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN from lemur.tests.vectors import VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN
def test_authority_input_schema(client, role, issuer_plugin): def test_authority_input_schema(client, role, issuer_plugin, logged_in_user):
from lemur.authorities.schemas import AuthorityInputSchema from lemur.authorities.schemas import AuthorityInputSchema
input_data = { input_data = {

View File

@ -6,9 +6,13 @@ import json
import arrow import arrow
import pytest import pytest
from cryptography import x509 from cryptography import x509
from marshmallow import ValidationError
from freezegun import freeze_time from freezegun import freeze_time
from lemur.certificates.views import * # noqa from lemur.certificates.views import * # noqa
from lemur.domains.models import Domain
from lemur.tests.vectors import VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN, CSR_STR, \ from lemur.tests.vectors import VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN, CSR_STR, \
INTERNAL_VALID_LONG_STR, INTERNAL_VALID_SAN_STR, PRIVATE_KEY_STR INTERNAL_VALID_LONG_STR, INTERNAL_VALID_SAN_STR, PRIVATE_KEY_STR
@ -241,6 +245,89 @@ def test_certificate_valid_dates(client, authority):
assert not errors assert not errors
def test_certificate_cn_admin(client, authority, logged_in_admin):
"""Admin is exempt from CN/SAN domain restrictions."""
from lemur.certificates.schemas import CertificateInputSchema
input_data = {
'commonName': '*.admin-overrides-whitelist.com',
'owner': 'jim@example.com',
'authority': {'id': authority.id},
'description': 'testtestest',
'validityStart': '2020-01-01T00:00:00',
'validityEnd': '2020-01-01T00:00:01',
}
data, errors = CertificateInputSchema().load(input_data)
assert not errors
def test_certificate_allowed_names(client, authority, session, logged_in_user):
"""Test for allowed CN and SAN values."""
from lemur.certificates.schemas import CertificateInputSchema
input_data = {
'commonName': 'Names with spaces are not checked',
'owner': 'jim@example.com',
'authority': {'id': authority.id},
'description': 'testtestest',
'validityStart': '2020-01-01T00:00:00',
'validityEnd': '2020-01-01T00:00:01',
'extensions': {
'subAltNames': {
'names': [
{'nameType': 'DNSName', 'value': 'allowed.example.com'},
{'nameType': 'IPAddress', 'value': '127.0.0.1'},
]
}
}
}
data, errors = CertificateInputSchema().load(input_data)
assert not errors
def test_certificate_disallowed_names(client, authority, session, logged_in_user):
"""The CN and SAN are disallowed by LEMUR_WHITELISTED_DOMAINS."""
from lemur.certificates.schemas import CertificateInputSchema
input_data = {
'commonName': '*.example.com',
'owner': 'jim@example.com',
'authority': {'id': authority.id},
'description': 'testtestest',
'validityStart': '2020-01-01T00:00:00',
'validityEnd': '2020-01-01T00:00:01',
'extensions': {
'subAltNames': {
'names': [
{'nameType': 'DNSName', 'value': 'allowed.example.com'},
{'nameType': 'DNSName', 'value': 'evilhacker.org'},
]
}
}
}
data, errors = CertificateInputSchema().load(input_data)
assert errors['common_name'][0].startswith("Domain *.example.com does not match whitelisted domain patterns")
assert (errors['extensions']['sub_alt_names']['names'][0]
.startswith("Domain evilhacker.org does not match whitelisted domain patterns"))
def test_certificate_sensitive_name(client, authority, session, logged_in_user):
"""The CN is disallowed by 'sensitive' flag on Domain model."""
from lemur.certificates.schemas import CertificateInputSchema
input_data = {
'commonName': 'sensitive.example.com',
'owner': 'jim@example.com',
'authority': {'id': authority.id},
'description': 'testtestest',
'validityStart': '2020-01-01T00:00:00',
'validityEnd': '2020-01-01T00:00:01',
}
session.add(Domain(name='sensitive.example.com', sensitive=True))
data, errors = CertificateInputSchema().load(input_data)
assert errors['common_name'][0].startswith("Domain sensitive.example.com has been marked as sensitive")
def test_create_basic_csr(client): def test_create_basic_csr(client):
from cryptography import x509 from cryptography import x509
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
@ -263,6 +350,37 @@ def test_create_basic_csr(client):
assert name.value in csr_config.values() assert name.value in csr_config.values()
def test_csr_disallowed_cn(client, logged_in_user):
"""Domain name CN is disallowed via LEMUR_WHITELISTED_DOMAINS."""
from lemur.certificates.service import create_csr
from lemur.common import validators
request, pkey = create_csr(
common_name='evilhacker.org',
owner='joe@example.com',
key_type='RSA2048',
)
with pytest.raises(ValidationError) as err:
validators.csr(request)
assert str(err.value).startswith('Domain evilhacker.org does not match whitelisted domain patterns')
def test_csr_disallowed_san(client, logged_in_user):
"""SAN name is disallowed by LEMUR_WHITELISTED_DOMAINS."""
from lemur.certificates.service import create_csr
from lemur.common import validators
request, pkey = create_csr(
common_name="CN with spaces isn't a domain and is thus allowed",
owner='joe@example.com',
key_type='RSA2048',
extensions={'sub_alt_names': {'names': x509.SubjectAlternativeName([x509.DNSName('evilhacker.org')])}}
)
with pytest.raises(ValidationError) as err:
validators.csr(request)
assert str(err.value).startswith('Domain evilhacker.org does not match whitelisted domain patterns')
def test_get_name_from_arn(client): def test_get_name_from_arn(client):
from lemur.certificates.service import get_name_from_arn from lemur.certificates.service import get_name_from_arn
arn = 'arn:aws:iam::11111111:server-certificate/mycertificate' arn = 'arn:aws:iam::11111111:server-certificate/mycertificate'