Merge branch 'master' into skip_duplicate_tasks

This commit is contained in:
Curtis 2019-03-12 14:53:42 -07:00 committed by GitHub
commit 3b3faa66f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 96 additions and 20 deletions

View File

@ -192,12 +192,16 @@ class Certificate(db.Model):
def check_integrity(self): def check_integrity(self):
""" """
Integrity checks: Does the cert have a matching private key? Integrity checks: Does the cert have a valid chain and matching private key?
""" """
if self.private_key: if self.private_key:
validators.verify_private_key_match(utils.parse_private_key(self.private_key), self.parsed_cert, validators.verify_private_key_match(utils.parse_private_key(self.private_key), self.parsed_cert,
error_class=AssertionError) error_class=AssertionError)
if self.chain:
chain = [self.parsed_cert] + utils.parse_cert_chain(self.chain)
validators.verify_cert_chain(chain, error_class=AssertionError)
@cached_property @cached_property
def parsed_cert(self): def parsed_cert(self):
assert self.body, "Certificate body not set" assert self.body, "Certificate body not set"

View File

@ -245,8 +245,7 @@ class CertificateUploadInputSchema(CertificateCreationSchema):
external_id = fields.String(missing=None, allow_none=True) external_id = fields.String(missing=None, allow_none=True)
private_key = fields.String() private_key = fields.String()
body = fields.String(required=True) body = fields.String(required=True)
chain = fields.String(validate=validators.public_certificate, missing=None, chain = fields.String(missing=None, allow_none=True)
allow_none=True) # TODO this could be multiple certificates
destinations = fields.Nested(AssociatedDestinationSchema, missing=[], many=True) destinations = fields.Nested(AssociatedDestinationSchema, missing=[], many=True)
notifications = fields.Nested(AssociatedNotificationSchema, missing=[], many=True) notifications = fields.Nested(AssociatedNotificationSchema, missing=[], many=True)
@ -260,7 +259,7 @@ class CertificateUploadInputSchema(CertificateCreationSchema):
raise ValidationError('Destinations require private key.') raise ValidationError('Destinations require private key.')
@validates_schema @validates_schema
def validate_cert_private_key(self, data): def validate_cert_private_key_chain(self, data):
cert = None cert = None
key = None key = None
if data.get('body'): if data.get('body'):
@ -279,6 +278,15 @@ class CertificateUploadInputSchema(CertificateCreationSchema):
# Throws ValidationError # Throws ValidationError
validators.verify_private_key_match(key, cert) validators.verify_private_key_match(key, cert)
if data.get('chain'):
try:
chain = utils.parse_cert_chain(data['chain'])
except ValueError:
raise ValidationError("Invalid certificate in certificate chain.", field_names=['chain'])
# Throws ValidationError
validators.verify_cert_chain([cert] + chain)
class CertificateExportInputSchema(LemurInputSchema): class CertificateExportInputSchema(LemurInputSchema):
plugin = fields.Nested(PluginInputSchema) plugin = fields.Nested(PluginInputSchema)

View File

@ -7,6 +7,7 @@
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com> .. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
""" """
import random import random
import re
import string import string
import sqlalchemy import sqlalchemy
@ -67,6 +68,26 @@ def parse_private_key(private_key):
return load_pem_private_key(private_key.encode('utf8'), password=None, backend=default_backend()) return load_pem_private_key(private_key.encode('utf8'), password=None, backend=default_backend())
def split_pem(data):
"""
Split a string of several PEM payloads to a list of strings.
:param data: String
:return: List of strings
"""
return re.split("\n(?=-----BEGIN )", data)
def parse_cert_chain(pem_chain):
"""
Helper function to split and parse a series of PEM certificates.
:param pem_chain: string
:return: List of parsed certificates
"""
return [parse_certificate(cert) for cert in split_pem(pem_chain) if pem_chain]
def parse_csr(csr): def parse_csr(csr):
""" """
Helper function that parses a CSR. Helper function that parses a CSR.

View File

@ -1,27 +1,14 @@
import re import re
from cryptography import x509 from cryptography import x509
from cryptography.exceptions import UnsupportedAlgorithm, InvalidSignature
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.x509 import NameOID from cryptography.x509 import NameOID
from flask import current_app from flask import current_app
from marshmallow.exceptions import ValidationError from marshmallow.exceptions import ValidationError
from lemur.auth.permissions import SensitiveDomainPermission from lemur.auth.permissions import SensitiveDomainPermission
from lemur.common.utils import parse_certificate, is_weekend from lemur.common.utils import check_cert_signature, is_weekend
def public_certificate(body):
"""
Determines if specified string is valid public certificate.
:param body:
:return:
"""
try:
parse_certificate(body)
except Exception as e:
current_app.logger.exception(e)
raise ValidationError('Public certificate presented is not valid.')
def common_name(value): def common_name(value):
@ -138,3 +125,34 @@ def verify_private_key_match(key, cert, error_class=ValidationError):
""" """
if key.public_key().public_numbers() != cert.public_key().public_numbers(): if key.public_key().public_numbers() != cert.public_key().public_numbers():
raise error_class("Private key does not match certificate.") raise error_class("Private key does not match certificate.")
def verify_cert_chain(certs, error_class=ValidationError):
"""
Verifies that the certificates in the chain are correct.
We don't bother with full cert validation but just check that certs in the chain are signed by the next, to avoid
basic human errors -- such as pasting the wrong certificate.
:param certs: List of parsed certificates, use parse_cert_chain()
:param error_class: Exception class to raise on error
"""
cert = certs[0]
for issuer in certs[1:]:
# Use the current cert's public key to verify the previous signature.
# "certificate validation is a complex problem that involves much more than just signature checks"
try:
check_cert_signature(cert, issuer.public_key())
except InvalidSignature:
# Avoid circular import.
from lemur.common import defaults
raise error_class("Incorrect chain certificate(s) provided: '%s' is not signed by '%s'"
% (defaults.common_name(cert) or 'Unknown', defaults.common_name(issuer)))
except UnsupportedAlgorithm as err:
current_app.logger.warning("Skipping chain validation: %s", err)
# Next loop will validate that *this issuer* cert is signed by the next chain cert.
cert = issuer

View File

@ -140,6 +140,7 @@ class CACertificateFactory(CertificateFactory):
class InvalidCertificateFactory(CertificateFactory): class InvalidCertificateFactory(CertificateFactory):
body = INVALID_CERT_STR body = INVALID_CERT_STR
private_key = '' private_key = ''
chain = ''
class AuthorityFactory(BaseFactory): class AuthorityFactory(BaseFactory):

View File

@ -512,7 +512,7 @@ def test_certificate_upload_schema_invalid_chain(client):
'owner': 'pwner@example.com', 'owner': 'pwner@example.com',
} }
data, errors = CertificateUploadInputSchema().load(data) data, errors = CertificateUploadInputSchema().load(data)
assert errors == {'chain': ['Public certificate presented is not valid.']} assert errors == {'chain': ['Invalid certificate in certificate chain.']}
def test_certificate_upload_schema_wrong_pkey(client): def test_certificate_upload_schema_wrong_pkey(client):
@ -527,6 +527,30 @@ def test_certificate_upload_schema_wrong_pkey(client):
assert errors == {'_schema': ['Private key does not match certificate.']} assert errors == {'_schema': ['Private key does not match certificate.']}
def test_certificate_upload_schema_wrong_chain(client):
from lemur.certificates.schemas import CertificateUploadInputSchema
data = {
'owner': 'pwner@example.com',
'body': SAN_CERT_STR,
'chain': ROOTCA_CERT_STR,
}
data, errors = CertificateUploadInputSchema().load(data)
assert errors == {'_schema': ["Incorrect chain certificate(s) provided: 'san.example.org' is not signed by "
"'LemurTrust Unittests Root CA 2018'"]}
def test_certificate_upload_schema_wrong_chain_2nd(client):
from lemur.certificates.schemas import CertificateUploadInputSchema
data = {
'owner': 'pwner@example.com',
'body': SAN_CERT_STR,
'chain': INTERMEDIATE_CERT_STR + '\n' + SAN_CERT_STR,
}
data, errors = CertificateUploadInputSchema().load(data)
assert errors == {'_schema': ["Incorrect chain certificate(s) provided: 'LemurTrust Unittests Class 1 CA 2018' is "
"not signed by 'san.example.org'"]}
def test_create_basic_csr(client): def test_create_basic_csr(client):
csr_config = dict( csr_config = dict(
common_name='example.com', common_name='example.com',