Merge pull request #2269 from intgr/verify-cert-private-key

Check that stored private keys match certificates
This commit is contained in:
Hossein Shafagh 2019-01-10 17:01:34 -08:00 committed by GitHub
commit 5118681cb5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 181 additions and 31 deletions

View File

@ -19,7 +19,7 @@ from sqlalchemy.sql.expression import case, extract
from sqlalchemy_utils.types.arrow import ArrowType from sqlalchemy_utils.types.arrow import ArrowType
from werkzeug.utils import cached_property from werkzeug.utils import cached_property
from lemur.common import defaults, utils from lemur.common import defaults, utils, validators
from lemur.constants import SUCCESS_METRIC_STATUS, FAILURE_METRIC_STATUS from lemur.constants import SUCCESS_METRIC_STATUS, FAILURE_METRIC_STATUS
from lemur.database import db from lemur.database import db
from lemur.domains.models import Domain from lemur.domains.models import Domain
@ -186,6 +186,18 @@ class Certificate(db.Model):
for domain in defaults.domains(cert): for domain in defaults.domains(cert):
self.domains.append(Domain(name=domain)) self.domains.append(Domain(name=domain))
# Check integrity before saving anything into the database.
# For user-facing API calls, validation should also be done in schema validators.
self.check_integrity()
def check_integrity(self):
"""
Integrity checks: Does the cert have a matching private key?
"""
if self.private_key:
validators.verify_private_key_match(utils.parse_private_key(self.private_key), self.parsed_cert,
error_class=AssertionError)
@cached_property @cached_property
def parsed_cert(self): def parsed_cert(self):
assert self.body, "Certificate body not set" assert self.body, "Certificate body not set"

View File

@ -10,7 +10,7 @@ from marshmallow import fields, validate, validates_schema, post_load, pre_load
from marshmallow.exceptions import ValidationError from marshmallow.exceptions import ValidationError
from lemur.authorities.schemas import AuthorityNestedOutputSchema from lemur.authorities.schemas import AuthorityNestedOutputSchema
from lemur.common import validators, missing from lemur.common import missing, utils, validators
from lemur.common.fields import ArrowDateTime, Hex from lemur.common.fields import ArrowDateTime, Hex
from lemur.common.schema import LemurInputSchema, LemurOutputSchema from lemur.common.schema import LemurInputSchema, LemurOutputSchema
from lemur.constants import CERTIFICATE_KEY_TYPES from lemur.constants import CERTIFICATE_KEY_TYPES
@ -242,8 +242,8 @@ class CertificateUploadInputSchema(CertificateCreationSchema):
authority = fields.Nested(AssociatedAuthoritySchema, required=False) authority = fields.Nested(AssociatedAuthoritySchema, required=False)
notify = fields.Boolean(missing=True) notify = fields.Boolean(missing=True)
external_id = fields.String(missing=None, allow_none=True) external_id = fields.String(missing=None, allow_none=True)
private_key = fields.String(validate=validators.private_key) private_key = fields.String()
body = fields.String(required=True, validate=validators.public_certificate) body = fields.String(required=True)
chain = fields.String(validate=validators.public_certificate, missing=None, chain = fields.String(validate=validators.public_certificate, missing=None,
allow_none=True) # TODO this could be multiple certificates allow_none=True) # TODO this could be multiple certificates
@ -258,6 +258,26 @@ class CertificateUploadInputSchema(CertificateCreationSchema):
if not data.get('private_key'): if not data.get('private_key'):
raise ValidationError('Destinations require private key.') raise ValidationError('Destinations require private key.')
@validates_schema
def validate_cert_private_key(self, data):
cert = None
key = None
if data.get('body'):
try:
cert = utils.parse_certificate(data['body'])
except ValueError:
raise ValidationError("Public certificate presented is not valid.", field_names=['body'])
if data.get('private_key'):
try:
key = utils.parse_private_key(data['private_key'])
except ValueError:
raise ValidationError("Private key presented is not valid.", field_names=['private_key'])
if cert and key:
# Throws ValidationError
validators.verify_private_key_match(key, cert)
class CertificateExportInputSchema(LemurInputSchema): class CertificateExportInputSchema(LemurInputSchema):
plugin = fields.Nested(PluginInputSchema) plugin = fields.Nested(PluginInputSchema)

View File

@ -13,6 +13,7 @@ import sqlalchemy
from cryptography import x509 from cryptography import x509
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.asymmetric import rsa, ec from cryptography.hazmat.primitives.asymmetric import rsa, ec
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from flask_restful.reqparse import RequestParser from flask_restful.reqparse import RequestParser
from sqlalchemy import and_, func from sqlalchemy import and_, func
@ -52,6 +53,20 @@ def parse_certificate(body):
return x509.load_pem_x509_certificate(body, default_backend()) return x509.load_pem_x509_certificate(body, default_backend())
def parse_private_key(private_key):
"""
Parses a PEM-format private key (RSA, DSA, ECDSA or any other supported algorithm).
Raises ValueError for an invalid string.
:param private_key: String containing PEM private key
"""
if isinstance(private_key, str):
private_key = private_key.encode('utf8')
return load_pem_private_key(private_key, password=None, backend=default_backend())
def parse_csr(csr): def parse_csr(csr):
""" """
Helper function that parses a CSR. Helper function that parses a CSR.

View File

@ -2,14 +2,12 @@ import re
from cryptography import x509 from cryptography import x509
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.x509 import NameOID from cryptography.x509 import NameOID
from flask import current_app from flask import current_app
from marshmallow.exceptions import ValidationError from marshmallow.exceptions import ValidationError
from lemur.auth.permissions import SensitiveDomainPermission from lemur.auth.permissions import SensitiveDomainPermission
from lemur.common.utils import parse_certificate, is_weekend from lemur.common.utils import parse_certificate, is_weekend
from lemur.domains import service as domain_service
def public_certificate(body): def public_certificate(body):
@ -26,22 +24,6 @@ def public_certificate(body):
raise ValidationError('Public certificate presented is not valid.') raise ValidationError('Public certificate presented is not valid.')
def private_key(key):
"""
User to validate that a given string is a RSA private key
:param key:
:return: :raise ValueError:
"""
try:
if isinstance(key, bytes):
serialization.load_pem_private_key(key, None, backend=default_backend())
else:
serialization.load_pem_private_key(key.encode('utf-8'), None, backend=default_backend())
except Exception:
raise ValidationError('Private key presented is not valid.')
def common_name(value): def common_name(value):
"""If the common name could be a domain name, apply domain validation rules.""" """If the common name could be a domain name, apply domain validation rules."""
# Common name could be a domain name, or a human-readable name of the subject (often used in CA names or client # Common name could be a domain name, or a human-readable name of the subject (often used in CA names or client
@ -66,6 +48,9 @@ def sensitive_domain(domain):
raise ValidationError('Domain {0} does not match whitelisted domain patterns. ' raise ValidationError('Domain {0} does not match whitelisted domain patterns. '
'Contact an administrator to issue the certificate.'.format(domain)) 'Contact an administrator to issue the certificate.'.format(domain))
# Avoid circular import.
from lemur.domains import service as domain_service
if any(d.sensitive for d in domain_service.get_by_name(domain)): if any(d.sensitive for d in domain_service.get_by_name(domain)):
raise ValidationError('Domain {0} has been marked as sensitive. ' raise ValidationError('Domain {0} has been marked as sensitive. '
'Contact an administrator to issue the certificate.'.format(domain)) 'Contact an administrator to issue the certificate.'.format(domain))
@ -141,3 +126,15 @@ def dates(data):
raise ValidationError('Validity end must not be after {0}'.format(data['authority'].authority_certificate.not_after)) raise ValidationError('Validity end must not be after {0}'.format(data['authority'].authority_certificate.not_after))
return data return data
def verify_private_key_match(key, cert, error_class=ValidationError):
"""
Checks that the supplied private key matches the certificate.
:param cert: Parsed certificate
:param key: Parsed private key
:param error_class: Exception class to raise on error
"""
if key.public_key().public_numbers() != cert.public_key().public_numbers():
raise error_class("Private key does not match certificate.")

View File

@ -15,7 +15,7 @@ from lemur.tests.vectors import SAN_CERT_KEY, INTERMEDIATE_KEY
from .factories import ApiKeyFactory, AuthorityFactory, NotificationFactory, DestinationFactory, \ from .factories import ApiKeyFactory, AuthorityFactory, NotificationFactory, DestinationFactory, \
CertificateFactory, UserFactory, RoleFactory, SourceFactory, EndpointFactory, \ CertificateFactory, UserFactory, RoleFactory, SourceFactory, EndpointFactory, \
RotationPolicyFactory, PendingCertificateFactory, AsyncAuthorityFactory RotationPolicyFactory, PendingCertificateFactory, AsyncAuthorityFactory, CryptoAuthorityFactory
def pytest_runtest_setup(item): def pytest_runtest_setup(item):
@ -91,6 +91,13 @@ def authority(session):
return a return a
@pytest.fixture
def crypto_authority(session):
a = CryptoAuthorityFactory()
session.commit()
return a
@pytest.fixture @pytest.fixture
def async_authority(session): def async_authority(session):
a = AsyncAuthorityFactory() a = AsyncAuthorityFactory()

View File

@ -168,6 +168,11 @@ class AsyncAuthorityFactory(AuthorityFactory):
authority_certificate = SubFactory(CertificateFactory) authority_certificate = SubFactory(CertificateFactory)
class CryptoAuthorityFactory(AuthorityFactory):
"""Authority factory based on 'cryptography' plugin."""
plugin = {'slug': 'cryptography-issuer'}
class DestinationFactory(BaseFactory): class DestinationFactory(BaseFactory):
"""Destination factory.""" """Destination factory."""
plugin_name = 'test-destination' plugin_name = 'test-destination'

View File

@ -18,7 +18,7 @@ from lemur.domains.models import Domain
from lemur.tests.vectors import VALID_ADMIN_API_TOKEN, VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN, CSR_STR, \ from lemur.tests.vectors import VALID_ADMIN_API_TOKEN, VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN, CSR_STR, \
INTERMEDIATE_CERT_STR, SAN_CERT_STR, SAN_CERT_KEY INTERMEDIATE_CERT_STR, SAN_CERT_STR, SAN_CERT_KEY, ROOTCA_KEY, ROOTCA_CERT_STR
def test_get_or_increase_name(session, certificate): def test_get_or_increase_name(session, certificate):
@ -365,6 +365,85 @@ def test_certificate_sensitive_name(client, authority, session, logged_in_user):
assert errors['common_name'][0].startswith("Domain sensitive.example.com has been marked as sensitive") assert errors['common_name'][0].startswith("Domain sensitive.example.com has been marked as sensitive")
def test_certificate_upload_schema_ok(client):
from lemur.certificates.schemas import CertificateUploadInputSchema
data = {
'name': 'Jane',
'owner': 'pwner@example.com',
'body': SAN_CERT_STR,
'privateKey': SAN_CERT_KEY,
'chain': INTERMEDIATE_CERT_STR,
'external_id': '1234',
}
data, errors = CertificateUploadInputSchema().load(data)
assert not errors
def test_certificate_upload_schema_minimal(client):
from lemur.certificates.schemas import CertificateUploadInputSchema
data = {
'owner': 'pwner@example.com',
'body': SAN_CERT_STR,
}
data, errors = CertificateUploadInputSchema().load(data)
assert not errors
def test_certificate_upload_schema_long_chain(client):
from lemur.certificates.schemas import CertificateUploadInputSchema
data = {
'owner': 'pwner@example.com',
'body': SAN_CERT_STR,
'chain': INTERMEDIATE_CERT_STR + '\n' + ROOTCA_CERT_STR
}
data, errors = CertificateUploadInputSchema().load(data)
assert not errors
def test_certificate_upload_schema_invalid_body(client):
from lemur.certificates.schemas import CertificateUploadInputSchema
data = {
'owner': 'pwner@example.com',
'body': 'Hereby I certify that this is a valid body',
}
data, errors = CertificateUploadInputSchema().load(data)
assert errors == {'body': ['Public certificate presented is not valid.']}
def test_certificate_upload_schema_invalid_pkey(client):
from lemur.certificates.schemas import CertificateUploadInputSchema
data = {
'owner': 'pwner@example.com',
'body': SAN_CERT_STR,
'privateKey': 'Look at me Im a private key!!111',
}
data, errors = CertificateUploadInputSchema().load(data)
assert errors == {'private_key': ['Private key presented is not valid.']}
def test_certificate_upload_schema_invalid_chain(client):
from lemur.certificates.schemas import CertificateUploadInputSchema
data = {
'body': SAN_CERT_STR,
'chain': 'CHAINSAW',
'owner': 'pwner@example.com',
}
data, errors = CertificateUploadInputSchema().load(data)
assert errors == {'chain': ['Public certificate presented is not valid.']}
def test_certificate_upload_schema_wrong_pkey(client):
from lemur.certificates.schemas import CertificateUploadInputSchema
data = {
'body': SAN_CERT_STR,
'privateKey': ROOTCA_KEY,
'chain': INTERMEDIATE_CERT_STR,
'owner': 'pwner@example.com',
}
data, errors = CertificateUploadInputSchema().load(data)
assert errors == {'_schema': ['Private key does not match certificate.']}
def test_create_basic_csr(client): def test_create_basic_csr(client):
csr_config = dict( csr_config = dict(
common_name='example.com', common_name='example.com',
@ -462,8 +541,11 @@ def test_create_certificate(issuer_plugin, authority, user):
assert cert.name == 'ACustomName1' assert cert.name == 'ACustomName1'
def test_reissue_certificate(issuer_plugin, authority, certificate): def test_reissue_certificate(issuer_plugin, crypto_authority, certificate, logged_in_user):
from lemur.certificates.service import reissue_certificate from lemur.certificates.service import reissue_certificate
# test-authority would return a mismatching private key, so use 'cryptography-issuer' plugin instead.
certificate.authority = crypto_authority
new_cert = reissue_certificate(certificate) new_cert = reissue_certificate(certificate)
assert new_cert assert new_cert
@ -487,7 +569,7 @@ def test_import(user):
assert str(cert.not_after) == '2047-12-31T22:00:00+00:00' assert str(cert.not_after) == '2047-12-31T22:00:00+00:00'
assert str(cert.not_before) == '2017-12-31T22:00:00+00:00' assert str(cert.not_before) == '2017-12-31T22:00:00+00:00'
assert cert.issuer == 'LemurTrustUnittestsClass1CA2018' assert cert.issuer == 'LemurTrustUnittestsClass1CA2018'
assert cert.name == 'SAN-san.example.org-LemurTrustUnittestsClass1CA2018-20171231-20471231-AFF2DB4F8D2D4D8E80FA382AE27C2333-2' assert cert.name.startswith('SAN-san.example.org-LemurTrustUnittestsClass1CA2018-20171231-20471231')
cert = import_certificate(body=SAN_CERT_STR, chain=INTERMEDIATE_CERT_STR, private_key=SAN_CERT_KEY, owner='joe@example.com', name='ACustomName2', creator=user['user']) cert = import_certificate(body=SAN_CERT_STR, chain=INTERMEDIATE_CERT_STR, private_key=SAN_CERT_KEY, owner='joe@example.com', name='ACustomName2', creator=user['user'])
assert cert.name == 'ACustomName2' assert cert.name == 'ACustomName2'

View File

@ -1,16 +1,28 @@
import pytest
from datetime import datetime from datetime import datetime
from .vectors import SAN_CERT_KEY
import pytest
from marshmallow.exceptions import ValidationError from marshmallow.exceptions import ValidationError
from lemur.common.utils import parse_private_key
from lemur.common.validators import verify_private_key_match
from lemur.tests.vectors import INTERMEDIATE_CERT, SAN_CERT, SAN_CERT_KEY
def test_private_key(session): def test_private_key(session):
from lemur.common.validators import private_key parse_private_key(SAN_CERT_KEY)
private_key(SAN_CERT_KEY) with pytest.raises(ValueError):
parse_private_key('invalid_private_key')
def test_validate_private_key(session):
key = parse_private_key(SAN_CERT_KEY)
verify_private_key_match(key, SAN_CERT)
with pytest.raises(ValidationError): with pytest.raises(ValidationError):
private_key('invalid_private_key') # Wrong key for certificate
verify_private_key_match(key, INTERMEDIATE_CERT)
def test_sub_alt_type(session): def test_sub_alt_type(session):