Check that stored private keys match certificates
This is done in two places: * Certificate import validator -- throws validation errors. * Certificate model constructor -- to ensure integrity of Lemur's data even when issuer plugins or other code paths have bugs.
This commit is contained in:
parent
d60b0c8805
commit
542e953919
@ -19,7 +19,7 @@ from sqlalchemy.sql.expression import case, extract
|
||||
from sqlalchemy_utils.types.arrow import ArrowType
|
||||
from werkzeug.utils import cached_property
|
||||
|
||||
from lemur.common import defaults, utils
|
||||
from lemur.common import defaults, utils, validators
|
||||
from lemur.constants import SUCCESS_METRIC_STATUS, FAILURE_METRIC_STATUS
|
||||
from lemur.database import db
|
||||
from lemur.domains.models import Domain
|
||||
@ -186,6 +186,18 @@ class Certificate(db.Model):
|
||||
for domain in defaults.domains(cert):
|
||||
self.domains.append(Domain(name=domain))
|
||||
|
||||
# Check integrity before saving anything into the database.
|
||||
# For user-facing API calls, validation should also be done in schema validators.
|
||||
self.check_integrity()
|
||||
|
||||
def check_integrity(self):
|
||||
"""
|
||||
Integrity checks: Does the cert have a matching private key?
|
||||
"""
|
||||
if self.private_key:
|
||||
validators.verify_private_key_match(utils.parse_private_key(self.private_key), self.parsed_cert,
|
||||
error_class=AssertionError)
|
||||
|
||||
@cached_property
|
||||
def parsed_cert(self):
|
||||
assert self.body, "Certificate body not set"
|
||||
|
@ -10,7 +10,7 @@ from marshmallow import fields, validate, validates_schema, post_load, pre_load
|
||||
from marshmallow.exceptions import ValidationError
|
||||
|
||||
from lemur.authorities.schemas import AuthorityNestedOutputSchema
|
||||
from lemur.common import validators, missing
|
||||
from lemur.common import missing, utils, validators
|
||||
from lemur.common.fields import ArrowDateTime, Hex
|
||||
from lemur.common.schema import LemurInputSchema, LemurOutputSchema
|
||||
from lemur.constants import CERTIFICATE_KEY_TYPES
|
||||
@ -242,8 +242,8 @@ class CertificateUploadInputSchema(CertificateCreationSchema):
|
||||
authority = fields.Nested(AssociatedAuthoritySchema, required=False)
|
||||
notify = fields.Boolean(missing=True)
|
||||
external_id = fields.String(missing=None, allow_none=True)
|
||||
private_key = fields.String(validate=validators.private_key)
|
||||
body = fields.String(required=True, validate=validators.public_certificate)
|
||||
private_key = fields.String()
|
||||
body = fields.String(required=True)
|
||||
chain = fields.String(validate=validators.public_certificate, missing=None,
|
||||
allow_none=True) # TODO this could be multiple certificates
|
||||
|
||||
@ -258,6 +258,26 @@ class CertificateUploadInputSchema(CertificateCreationSchema):
|
||||
if not data.get('private_key'):
|
||||
raise ValidationError('Destinations require private key.')
|
||||
|
||||
@validates_schema
|
||||
def validate_cert_private_key(self, data):
|
||||
cert = None
|
||||
key = None
|
||||
if data.get('body'):
|
||||
try:
|
||||
cert = utils.parse_certificate(data['body'])
|
||||
except ValueError:
|
||||
raise ValidationError("Public certificate presented is not valid.", field_names=['body'])
|
||||
|
||||
if data.get('private_key'):
|
||||
try:
|
||||
key = utils.parse_private_key(data['private_key'])
|
||||
except ValueError:
|
||||
raise ValidationError("Private key presented is not valid.", field_names=['private_key'])
|
||||
|
||||
if cert and key:
|
||||
# Throws ValidationError
|
||||
validators.verify_private_key_match(key, cert)
|
||||
|
||||
|
||||
class CertificateExportInputSchema(LemurInputSchema):
|
||||
plugin = fields.Nested(PluginInputSchema)
|
||||
|
@ -13,6 +13,7 @@ import sqlalchemy
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives.asymmetric import rsa, ec
|
||||
from cryptography.hazmat.primitives.serialization import load_pem_private_key
|
||||
from flask_restful.reqparse import RequestParser
|
||||
from sqlalchemy import and_, func
|
||||
|
||||
@ -52,6 +53,20 @@ def parse_certificate(body):
|
||||
return x509.load_pem_x509_certificate(body, default_backend())
|
||||
|
||||
|
||||
def parse_private_key(private_key):
|
||||
"""
|
||||
Parses a PEM-format private key (RSA, DSA, ECDSA or any other supported algorithm).
|
||||
|
||||
Raises ValueError for an invalid string.
|
||||
|
||||
:param private_key: String containing PEM private key
|
||||
"""
|
||||
if isinstance(private_key, str):
|
||||
private_key = private_key.encode('utf8')
|
||||
|
||||
return load_pem_private_key(private_key, password=None, backend=default_backend())
|
||||
|
||||
|
||||
def parse_csr(csr):
|
||||
"""
|
||||
Helper function that parses a CSR.
|
||||
|
@ -2,14 +2,12 @@ import re
|
||||
|
||||
from cryptography import x509
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import serialization
|
||||
from cryptography.x509 import NameOID
|
||||
from flask import current_app
|
||||
from marshmallow.exceptions import ValidationError
|
||||
|
||||
from lemur.auth.permissions import SensitiveDomainPermission
|
||||
from lemur.common.utils import parse_certificate, is_weekend
|
||||
from lemur.domains import service as domain_service
|
||||
|
||||
|
||||
def public_certificate(body):
|
||||
@ -26,22 +24,6 @@ def public_certificate(body):
|
||||
raise ValidationError('Public certificate presented is not valid.')
|
||||
|
||||
|
||||
def private_key(key):
|
||||
"""
|
||||
User to validate that a given string is a RSA private key
|
||||
|
||||
:param key:
|
||||
:return: :raise ValueError:
|
||||
"""
|
||||
try:
|
||||
if isinstance(key, bytes):
|
||||
serialization.load_pem_private_key(key, None, backend=default_backend())
|
||||
else:
|
||||
serialization.load_pem_private_key(key.encode('utf-8'), None, backend=default_backend())
|
||||
except Exception:
|
||||
raise ValidationError('Private key presented is not valid.')
|
||||
|
||||
|
||||
def common_name(value):
|
||||
"""If the common name could be a domain name, apply domain validation rules."""
|
||||
# Common name could be a domain name, or a human-readable name of the subject (often used in CA names or client
|
||||
@ -66,6 +48,9 @@ def sensitive_domain(domain):
|
||||
raise ValidationError('Domain {0} does not match whitelisted domain patterns. '
|
||||
'Contact an administrator to issue the certificate.'.format(domain))
|
||||
|
||||
# Avoid circular import.
|
||||
from lemur.domains import service as domain_service
|
||||
|
||||
if any(d.sensitive for d in domain_service.get_by_name(domain)):
|
||||
raise ValidationError('Domain {0} has been marked as sensitive. '
|
||||
'Contact an administrator to issue the certificate.'.format(domain))
|
||||
@ -141,3 +126,15 @@ def dates(data):
|
||||
raise ValidationError('Validity end must not be after {0}'.format(data['authority'].authority_certificate.not_after))
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def verify_private_key_match(key, cert, error_class=ValidationError):
|
||||
"""
|
||||
Checks that the supplied private key matches the certificate.
|
||||
|
||||
:param cert: Parsed certificate
|
||||
:param key: Parsed private key
|
||||
:param error_class: Exception class to raise on error
|
||||
"""
|
||||
if key.public_key().public_numbers() != cert.public_key().public_numbers():
|
||||
raise error_class("Private key does not match certificate.")
|
||||
|
@ -15,7 +15,7 @@ from lemur.tests.vectors import SAN_CERT_KEY, INTERMEDIATE_KEY
|
||||
|
||||
from .factories import ApiKeyFactory, AuthorityFactory, NotificationFactory, DestinationFactory, \
|
||||
CertificateFactory, UserFactory, RoleFactory, SourceFactory, EndpointFactory, \
|
||||
RotationPolicyFactory, PendingCertificateFactory, AsyncAuthorityFactory
|
||||
RotationPolicyFactory, PendingCertificateFactory, AsyncAuthorityFactory, CryptoAuthorityFactory
|
||||
|
||||
|
||||
def pytest_runtest_setup(item):
|
||||
@ -91,6 +91,13 @@ def authority(session):
|
||||
return a
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def crypto_authority(session):
|
||||
a = CryptoAuthorityFactory()
|
||||
session.commit()
|
||||
return a
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def async_authority(session):
|
||||
a = AsyncAuthorityFactory()
|
||||
|
@ -168,6 +168,11 @@ class AsyncAuthorityFactory(AuthorityFactory):
|
||||
authority_certificate = SubFactory(CertificateFactory)
|
||||
|
||||
|
||||
class CryptoAuthorityFactory(AuthorityFactory):
|
||||
"""Authority factory based on 'cryptography' plugin."""
|
||||
plugin = {'slug': 'cryptography-issuer'}
|
||||
|
||||
|
||||
class DestinationFactory(BaseFactory):
|
||||
"""Destination factory."""
|
||||
plugin_name = 'test-destination'
|
||||
|
@ -18,7 +18,7 @@ from lemur.domains.models import Domain
|
||||
|
||||
|
||||
from lemur.tests.vectors import VALID_ADMIN_API_TOKEN, VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN, CSR_STR, \
|
||||
INTERMEDIATE_CERT_STR, SAN_CERT_STR, SAN_CERT_KEY
|
||||
INTERMEDIATE_CERT_STR, SAN_CERT_STR, SAN_CERT_KEY, ROOTCA_KEY, ROOTCA_CERT_STR
|
||||
|
||||
|
||||
def test_get_or_increase_name(session, certificate):
|
||||
@ -365,6 +365,85 @@ def test_certificate_sensitive_name(client, authority, session, logged_in_user):
|
||||
assert errors['common_name'][0].startswith("Domain sensitive.example.com has been marked as sensitive")
|
||||
|
||||
|
||||
def test_certificate_upload_schema_ok(client):
|
||||
from lemur.certificates.schemas import CertificateUploadInputSchema
|
||||
data = {
|
||||
'name': 'Jane',
|
||||
'owner': 'pwner@example.com',
|
||||
'body': SAN_CERT_STR,
|
||||
'privateKey': SAN_CERT_KEY,
|
||||
'chain': INTERMEDIATE_CERT_STR,
|
||||
'external_id': '1234',
|
||||
}
|
||||
data, errors = CertificateUploadInputSchema().load(data)
|
||||
assert not errors
|
||||
|
||||
|
||||
def test_certificate_upload_schema_minimal(client):
|
||||
from lemur.certificates.schemas import CertificateUploadInputSchema
|
||||
data = {
|
||||
'owner': 'pwner@example.com',
|
||||
'body': SAN_CERT_STR,
|
||||
}
|
||||
data, errors = CertificateUploadInputSchema().load(data)
|
||||
assert not errors
|
||||
|
||||
|
||||
def test_certificate_upload_schema_long_chain(client):
|
||||
from lemur.certificates.schemas import CertificateUploadInputSchema
|
||||
data = {
|
||||
'owner': 'pwner@example.com',
|
||||
'body': SAN_CERT_STR,
|
||||
'chain': INTERMEDIATE_CERT_STR + '\n' + ROOTCA_CERT_STR
|
||||
}
|
||||
data, errors = CertificateUploadInputSchema().load(data)
|
||||
assert not errors
|
||||
|
||||
|
||||
def test_certificate_upload_schema_invalid_body(client):
|
||||
from lemur.certificates.schemas import CertificateUploadInputSchema
|
||||
data = {
|
||||
'owner': 'pwner@example.com',
|
||||
'body': 'Hereby I certify that this is a valid body',
|
||||
}
|
||||
data, errors = CertificateUploadInputSchema().load(data)
|
||||
assert errors == {'body': ['Public certificate presented is not valid.']}
|
||||
|
||||
|
||||
def test_certificate_upload_schema_invalid_pkey(client):
|
||||
from lemur.certificates.schemas import CertificateUploadInputSchema
|
||||
data = {
|
||||
'owner': 'pwner@example.com',
|
||||
'body': SAN_CERT_STR,
|
||||
'privateKey': 'Look at me Im a private key!!111',
|
||||
}
|
||||
data, errors = CertificateUploadInputSchema().load(data)
|
||||
assert errors == {'private_key': ['Private key presented is not valid.']}
|
||||
|
||||
|
||||
def test_certificate_upload_schema_invalid_chain(client):
|
||||
from lemur.certificates.schemas import CertificateUploadInputSchema
|
||||
data = {
|
||||
'body': SAN_CERT_STR,
|
||||
'chain': 'CHAINSAW',
|
||||
'owner': 'pwner@example.com',
|
||||
}
|
||||
data, errors = CertificateUploadInputSchema().load(data)
|
||||
assert errors == {'chain': ['Public certificate presented is not valid.']}
|
||||
|
||||
|
||||
def test_certificate_upload_schema_wrong_pkey(client):
|
||||
from lemur.certificates.schemas import CertificateUploadInputSchema
|
||||
data = {
|
||||
'body': SAN_CERT_STR,
|
||||
'privateKey': ROOTCA_KEY,
|
||||
'chain': INTERMEDIATE_CERT_STR,
|
||||
'owner': 'pwner@example.com',
|
||||
}
|
||||
data, errors = CertificateUploadInputSchema().load(data)
|
||||
assert errors == {'_schema': ['Private key does not match certificate.']}
|
||||
|
||||
|
||||
def test_create_basic_csr(client):
|
||||
csr_config = dict(
|
||||
common_name='example.com',
|
||||
@ -462,8 +541,11 @@ def test_create_certificate(issuer_plugin, authority, user):
|
||||
assert cert.name == 'ACustomName1'
|
||||
|
||||
|
||||
def test_reissue_certificate(issuer_plugin, authority, certificate):
|
||||
def test_reissue_certificate(issuer_plugin, crypto_authority, certificate, logged_in_user):
|
||||
from lemur.certificates.service import reissue_certificate
|
||||
|
||||
# test-authority would return a mismatching private key, so use 'cryptography-issuer' plugin instead.
|
||||
certificate.authority = crypto_authority
|
||||
new_cert = reissue_certificate(certificate)
|
||||
assert new_cert
|
||||
|
||||
@ -487,7 +569,7 @@ def test_import(user):
|
||||
assert str(cert.not_after) == '2047-12-31T22:00:00+00:00'
|
||||
assert str(cert.not_before) == '2017-12-31T22:00:00+00:00'
|
||||
assert cert.issuer == 'LemurTrustUnittestsClass1CA2018'
|
||||
assert cert.name == 'SAN-san.example.org-LemurTrustUnittestsClass1CA2018-20171231-20471231-AFF2DB4F8D2D4D8E80FA382AE27C2333-2'
|
||||
assert cert.name.startswith('SAN-san.example.org-LemurTrustUnittestsClass1CA2018-20171231-20471231')
|
||||
|
||||
cert = import_certificate(body=SAN_CERT_STR, chain=INTERMEDIATE_CERT_STR, private_key=SAN_CERT_KEY, owner='joe@example.com', name='ACustomName2', creator=user['user'])
|
||||
assert cert.name == 'ACustomName2'
|
||||
|
@ -1,16 +1,28 @@
|
||||
import pytest
|
||||
from datetime import datetime
|
||||
from .vectors import SAN_CERT_KEY
|
||||
|
||||
import pytest
|
||||
from marshmallow.exceptions import ValidationError
|
||||
|
||||
from lemur.common.utils import parse_private_key
|
||||
from lemur.common.validators import verify_private_key_match
|
||||
from lemur.tests.vectors import INTERMEDIATE_CERT, SAN_CERT, SAN_CERT_KEY
|
||||
|
||||
|
||||
def test_private_key(session):
|
||||
from lemur.common.validators import private_key
|
||||
parse_private_key(SAN_CERT_KEY)
|
||||
|
||||
private_key(SAN_CERT_KEY)
|
||||
with pytest.raises(ValueError):
|
||||
parse_private_key('invalid_private_key')
|
||||
|
||||
|
||||
def test_validate_private_key(session):
|
||||
key = parse_private_key(SAN_CERT_KEY)
|
||||
|
||||
verify_private_key_match(key, SAN_CERT)
|
||||
|
||||
with pytest.raises(ValidationError):
|
||||
private_key('invalid_private_key')
|
||||
# Wrong key for certificate
|
||||
verify_private_key_match(key, INTERMEDIATE_CERT)
|
||||
|
||||
|
||||
def test_sub_alt_type(session):
|
||||
|
Loading…
Reference in New Issue
Block a user