diff --git a/lemur/certificates/models.py b/lemur/certificates/models.py index ab43cd01..bd6e8b5e 100644 --- a/lemur/certificates/models.py +++ b/lemur/certificates/models.py @@ -192,12 +192,16 @@ class Certificate(db.Model): def check_integrity(self): """ - Integrity checks: Does the cert have a matching private key? + Integrity checks: Does the cert have a valid chain and matching private key? """ if self.private_key: validators.verify_private_key_match(utils.parse_private_key(self.private_key), self.parsed_cert, error_class=AssertionError) + if self.chain: + chain = [self.parsed_cert] + utils.parse_cert_chain(self.chain) + validators.verify_cert_chain(chain, error_class=AssertionError) + @cached_property def parsed_cert(self): assert self.body, "Certificate body not set" diff --git a/lemur/certificates/schemas.py b/lemur/certificates/schemas.py index 946bd541..d20fd5a7 100644 --- a/lemur/certificates/schemas.py +++ b/lemur/certificates/schemas.py @@ -245,8 +245,7 @@ class CertificateUploadInputSchema(CertificateCreationSchema): external_id = fields.String(missing=None, allow_none=True) private_key = fields.String() body = fields.String(required=True) - chain = fields.String(validate=validators.public_certificate, missing=None, - allow_none=True) # TODO this could be multiple certificates + chain = fields.String(missing=None, allow_none=True) destinations = fields.Nested(AssociatedDestinationSchema, missing=[], many=True) notifications = fields.Nested(AssociatedNotificationSchema, missing=[], many=True) @@ -260,7 +259,7 @@ class CertificateUploadInputSchema(CertificateCreationSchema): raise ValidationError('Destinations require private key.') @validates_schema - def validate_cert_private_key(self, data): + def validate_cert_private_key_chain(self, data): cert = None key = None if data.get('body'): @@ -279,6 +278,15 @@ class CertificateUploadInputSchema(CertificateCreationSchema): # Throws ValidationError validators.verify_private_key_match(key, cert) + if data.get('chain'): + try: + chain = utils.parse_cert_chain(data['chain']) + except ValueError: + raise ValidationError("Invalid certificate in certificate chain.", field_names=['chain']) + + # Throws ValidationError + validators.verify_cert_chain([cert] + chain) + class CertificateExportInputSchema(LemurInputSchema): plugin = fields.Nested(PluginInputSchema) diff --git a/lemur/common/utils.py b/lemur/common/utils.py index 13e6e067..62c3182b 100644 --- a/lemur/common/utils.py +++ b/lemur/common/utils.py @@ -7,6 +7,7 @@ .. moduleauthor:: Kevin Glisson """ import random +import re import string import sqlalchemy @@ -67,6 +68,26 @@ def parse_private_key(private_key): return load_pem_private_key(private_key.encode('utf8'), password=None, backend=default_backend()) +def split_pem(data): + """ + Split a string of several PEM payloads to a list of strings. + + :param data: String + :return: List of strings + """ + return re.split("\n(?=-----BEGIN )", data) + + +def parse_cert_chain(pem_chain): + """ + Helper function to split and parse a series of PEM certificates. + + :param pem_chain: string + :return: List of parsed certificates + """ + return [parse_certificate(cert) for cert in split_pem(pem_chain) if pem_chain] + + def parse_csr(csr): """ Helper function that parses a CSR. diff --git a/lemur/common/validators.py b/lemur/common/validators.py index 90169553..91b831ba 100644 --- a/lemur/common/validators.py +++ b/lemur/common/validators.py @@ -1,27 +1,14 @@ import re from cryptography import x509 +from cryptography.exceptions import UnsupportedAlgorithm, InvalidSignature from cryptography.hazmat.backends import default_backend from cryptography.x509 import NameOID from flask import current_app from marshmallow.exceptions import ValidationError from lemur.auth.permissions import SensitiveDomainPermission -from lemur.common.utils import parse_certificate, is_weekend - - -def public_certificate(body): - """ - Determines if specified string is valid public certificate. - - :param body: - :return: - """ - try: - parse_certificate(body) - except Exception as e: - current_app.logger.exception(e) - raise ValidationError('Public certificate presented is not valid.') +from lemur.common.utils import check_cert_signature, is_weekend def common_name(value): @@ -138,3 +125,34 @@ def verify_private_key_match(key, cert, error_class=ValidationError): """ if key.public_key().public_numbers() != cert.public_key().public_numbers(): raise error_class("Private key does not match certificate.") + + +def verify_cert_chain(certs, error_class=ValidationError): + """ + Verifies that the certificates in the chain are correct. + + We don't bother with full cert validation but just check that certs in the chain are signed by the next, to avoid + basic human errors -- such as pasting the wrong certificate. + + :param certs: List of parsed certificates, use parse_cert_chain() + :param error_class: Exception class to raise on error + """ + cert = certs[0] + for issuer in certs[1:]: + # Use the current cert's public key to verify the previous signature. + # "certificate validation is a complex problem that involves much more than just signature checks" + try: + check_cert_signature(cert, issuer.public_key()) + + except InvalidSignature: + # Avoid circular import. + from lemur.common import defaults + + raise error_class("Incorrect chain certificate(s) provided: '%s' is not signed by '%s'" + % (defaults.common_name(cert) or 'Unknown', defaults.common_name(issuer))) + + except UnsupportedAlgorithm as err: + current_app.logger.warning("Skipping chain validation: %s", err) + + # Next loop will validate that *this issuer* cert is signed by the next chain cert. + cert = issuer diff --git a/lemur/tests/factories.py b/lemur/tests/factories.py index a4af3d43..de78f8a3 100644 --- a/lemur/tests/factories.py +++ b/lemur/tests/factories.py @@ -140,6 +140,7 @@ class CACertificateFactory(CertificateFactory): class InvalidCertificateFactory(CertificateFactory): body = INVALID_CERT_STR private_key = '' + chain = '' class AuthorityFactory(BaseFactory): diff --git a/lemur/tests/test_certificates.py b/lemur/tests/test_certificates.py index a020ac6b..4013d367 100644 --- a/lemur/tests/test_certificates.py +++ b/lemur/tests/test_certificates.py @@ -512,7 +512,7 @@ def test_certificate_upload_schema_invalid_chain(client): 'owner': 'pwner@example.com', } data, errors = CertificateUploadInputSchema().load(data) - assert errors == {'chain': ['Public certificate presented is not valid.']} + assert errors == {'chain': ['Invalid certificate in certificate chain.']} def test_certificate_upload_schema_wrong_pkey(client): @@ -527,6 +527,30 @@ def test_certificate_upload_schema_wrong_pkey(client): assert errors == {'_schema': ['Private key does not match certificate.']} +def test_certificate_upload_schema_wrong_chain(client): + from lemur.certificates.schemas import CertificateUploadInputSchema + data = { + 'owner': 'pwner@example.com', + 'body': SAN_CERT_STR, + 'chain': ROOTCA_CERT_STR, + } + data, errors = CertificateUploadInputSchema().load(data) + assert errors == {'_schema': ["Incorrect chain certificate(s) provided: 'san.example.org' is not signed by " + "'LemurTrust Unittests Root CA 2018'"]} + + +def test_certificate_upload_schema_wrong_chain_2nd(client): + from lemur.certificates.schemas import CertificateUploadInputSchema + data = { + 'owner': 'pwner@example.com', + 'body': SAN_CERT_STR, + 'chain': INTERMEDIATE_CERT_STR + '\n' + SAN_CERT_STR, + } + data, errors = CertificateUploadInputSchema().load(data) + assert errors == {'_schema': ["Incorrect chain certificate(s) provided: 'LemurTrust Unittests Class 1 CA 2018' is " + "not signed by 'san.example.org'"]} + + def test_create_basic_csr(client): csr_config = dict( common_name='example.com',