From 542e9539199d4c3a51c77ee9911e28cff7afcf90 Mon Sep 17 00:00:00 2001 From: Marti Raudsepp Date: Wed, 20 Jun 2018 18:42:34 +0300 Subject: [PATCH] Check that stored private keys match certificates This is done in two places: * Certificate import validator -- throws validation errors. * Certificate model constructor -- to ensure integrity of Lemur's data even when issuer plugins or other code paths have bugs. --- lemur/certificates/models.py | 14 ++++- lemur/certificates/schemas.py | 26 ++++++++-- lemur/common/utils.py | 15 ++++++ lemur/common/validators.py | 33 ++++++------ lemur/tests/conftest.py | 9 +++- lemur/tests/factories.py | 5 ++ lemur/tests/test_certificates.py | 88 ++++++++++++++++++++++++++++++-- lemur/tests/test_validators.py | 22 ++++++-- 8 files changed, 181 insertions(+), 31 deletions(-) diff --git a/lemur/certificates/models.py b/lemur/certificates/models.py index e2ac2cba..3eaba746 100644 --- a/lemur/certificates/models.py +++ b/lemur/certificates/models.py @@ -19,7 +19,7 @@ from sqlalchemy.sql.expression import case, extract from sqlalchemy_utils.types.arrow import ArrowType from werkzeug.utils import cached_property -from lemur.common import defaults, utils +from lemur.common import defaults, utils, validators from lemur.constants import SUCCESS_METRIC_STATUS, FAILURE_METRIC_STATUS from lemur.database import db from lemur.domains.models import Domain @@ -186,6 +186,18 @@ class Certificate(db.Model): for domain in defaults.domains(cert): self.domains.append(Domain(name=domain)) + # Check integrity before saving anything into the database. + # For user-facing API calls, validation should also be done in schema validators. + self.check_integrity() + + def check_integrity(self): + """ + Integrity checks: Does the cert have a matching private key? + """ + if self.private_key: + validators.verify_private_key_match(utils.parse_private_key(self.private_key), self.parsed_cert, + error_class=AssertionError) + @cached_property def parsed_cert(self): assert self.body, "Certificate body not set" diff --git a/lemur/certificates/schemas.py b/lemur/certificates/schemas.py index bf18eac9..6b457086 100644 --- a/lemur/certificates/schemas.py +++ b/lemur/certificates/schemas.py @@ -10,7 +10,7 @@ from marshmallow import fields, validate, validates_schema, post_load, pre_load from marshmallow.exceptions import ValidationError from lemur.authorities.schemas import AuthorityNestedOutputSchema -from lemur.common import validators, missing +from lemur.common import missing, utils, validators from lemur.common.fields import ArrowDateTime, Hex from lemur.common.schema import LemurInputSchema, LemurOutputSchema from lemur.constants import CERTIFICATE_KEY_TYPES @@ -242,8 +242,8 @@ class CertificateUploadInputSchema(CertificateCreationSchema): authority = fields.Nested(AssociatedAuthoritySchema, required=False) notify = fields.Boolean(missing=True) external_id = fields.String(missing=None, allow_none=True) - private_key = fields.String(validate=validators.private_key) - body = fields.String(required=True, validate=validators.public_certificate) + private_key = fields.String() + body = fields.String(required=True) chain = fields.String(validate=validators.public_certificate, missing=None, allow_none=True) # TODO this could be multiple certificates @@ -258,6 +258,26 @@ class CertificateUploadInputSchema(CertificateCreationSchema): if not data.get('private_key'): raise ValidationError('Destinations require private key.') + @validates_schema + def validate_cert_private_key(self, data): + cert = None + key = None + if data.get('body'): + try: + cert = utils.parse_certificate(data['body']) + except ValueError: + raise ValidationError("Public certificate presented is not valid.", field_names=['body']) + + if data.get('private_key'): + try: + key = utils.parse_private_key(data['private_key']) + except ValueError: + raise ValidationError("Private key presented is not valid.", field_names=['private_key']) + + if cert and key: + # Throws ValidationError + validators.verify_private_key_match(key, cert) + class CertificateExportInputSchema(LemurInputSchema): plugin = fields.Nested(PluginInputSchema) diff --git a/lemur/common/utils.py b/lemur/common/utils.py index 7ea9d7f2..62e59d69 100644 --- a/lemur/common/utils.py +++ b/lemur/common/utils.py @@ -13,6 +13,7 @@ import sqlalchemy from cryptography import x509 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives.asymmetric import rsa, ec +from cryptography.hazmat.primitives.serialization import load_pem_private_key from flask_restful.reqparse import RequestParser from sqlalchemy import and_, func @@ -52,6 +53,20 @@ def parse_certificate(body): return x509.load_pem_x509_certificate(body, default_backend()) +def parse_private_key(private_key): + """ + Parses a PEM-format private key (RSA, DSA, ECDSA or any other supported algorithm). + + Raises ValueError for an invalid string. + + :param private_key: String containing PEM private key + """ + if isinstance(private_key, str): + private_key = private_key.encode('utf8') + + return load_pem_private_key(private_key, password=None, backend=default_backend()) + + def parse_csr(csr): """ Helper function that parses a CSR. diff --git a/lemur/common/validators.py b/lemur/common/validators.py index 47a94a30..90169553 100644 --- a/lemur/common/validators.py +++ b/lemur/common/validators.py @@ -2,14 +2,12 @@ import re from cryptography import x509 from cryptography.hazmat.backends import default_backend -from cryptography.hazmat.primitives import serialization from cryptography.x509 import NameOID from flask import current_app from marshmallow.exceptions import ValidationError from lemur.auth.permissions import SensitiveDomainPermission from lemur.common.utils import parse_certificate, is_weekend -from lemur.domains import service as domain_service def public_certificate(body): @@ -26,22 +24,6 @@ def public_certificate(body): raise ValidationError('Public certificate presented is not valid.') -def private_key(key): - """ - User to validate that a given string is a RSA private key - - :param key: - :return: :raise ValueError: - """ - try: - if isinstance(key, bytes): - serialization.load_pem_private_key(key, None, backend=default_backend()) - else: - serialization.load_pem_private_key(key.encode('utf-8'), None, backend=default_backend()) - except Exception: - raise ValidationError('Private key presented is not valid.') - - def common_name(value): """If the common name could be a domain name, apply domain validation rules.""" # Common name could be a domain name, or a human-readable name of the subject (often used in CA names or client @@ -66,6 +48,9 @@ def sensitive_domain(domain): raise ValidationError('Domain {0} does not match whitelisted domain patterns. ' 'Contact an administrator to issue the certificate.'.format(domain)) + # Avoid circular import. + from lemur.domains import service as domain_service + if any(d.sensitive for d in domain_service.get_by_name(domain)): raise ValidationError('Domain {0} has been marked as sensitive. ' 'Contact an administrator to issue the certificate.'.format(domain)) @@ -141,3 +126,15 @@ def dates(data): raise ValidationError('Validity end must not be after {0}'.format(data['authority'].authority_certificate.not_after)) return data + + +def verify_private_key_match(key, cert, error_class=ValidationError): + """ + Checks that the supplied private key matches the certificate. + + :param cert: Parsed certificate + :param key: Parsed private key + :param error_class: Exception class to raise on error + """ + if key.public_key().public_numbers() != cert.public_key().public_numbers(): + raise error_class("Private key does not match certificate.") diff --git a/lemur/tests/conftest.py b/lemur/tests/conftest.py index d292e6d6..9a48eb94 100644 --- a/lemur/tests/conftest.py +++ b/lemur/tests/conftest.py @@ -15,7 +15,7 @@ from lemur.tests.vectors import SAN_CERT_KEY, INTERMEDIATE_KEY from .factories import ApiKeyFactory, AuthorityFactory, NotificationFactory, DestinationFactory, \ CertificateFactory, UserFactory, RoleFactory, SourceFactory, EndpointFactory, \ - RotationPolicyFactory, PendingCertificateFactory, AsyncAuthorityFactory + RotationPolicyFactory, PendingCertificateFactory, AsyncAuthorityFactory, CryptoAuthorityFactory def pytest_runtest_setup(item): @@ -91,6 +91,13 @@ def authority(session): return a +@pytest.fixture +def crypto_authority(session): + a = CryptoAuthorityFactory() + session.commit() + return a + + @pytest.fixture def async_authority(session): a = AsyncAuthorityFactory() diff --git a/lemur/tests/factories.py b/lemur/tests/factories.py index cae2c354..3717c64d 100644 --- a/lemur/tests/factories.py +++ b/lemur/tests/factories.py @@ -168,6 +168,11 @@ class AsyncAuthorityFactory(AuthorityFactory): authority_certificate = SubFactory(CertificateFactory) +class CryptoAuthorityFactory(AuthorityFactory): + """Authority factory based on 'cryptography' plugin.""" + plugin = {'slug': 'cryptography-issuer'} + + class DestinationFactory(BaseFactory): """Destination factory.""" plugin_name = 'test-destination' diff --git a/lemur/tests/test_certificates.py b/lemur/tests/test_certificates.py index 87416a7a..a1df1c0d 100644 --- a/lemur/tests/test_certificates.py +++ b/lemur/tests/test_certificates.py @@ -18,7 +18,7 @@ from lemur.domains.models import Domain from lemur.tests.vectors import VALID_ADMIN_API_TOKEN, VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN, CSR_STR, \ - INTERMEDIATE_CERT_STR, SAN_CERT_STR, SAN_CERT_KEY + INTERMEDIATE_CERT_STR, SAN_CERT_STR, SAN_CERT_KEY, ROOTCA_KEY, ROOTCA_CERT_STR def test_get_or_increase_name(session, certificate): @@ -365,6 +365,85 @@ def test_certificate_sensitive_name(client, authority, session, logged_in_user): assert errors['common_name'][0].startswith("Domain sensitive.example.com has been marked as sensitive") +def test_certificate_upload_schema_ok(client): + from lemur.certificates.schemas import CertificateUploadInputSchema + data = { + 'name': 'Jane', + 'owner': 'pwner@example.com', + 'body': SAN_CERT_STR, + 'privateKey': SAN_CERT_KEY, + 'chain': INTERMEDIATE_CERT_STR, + 'external_id': '1234', + } + data, errors = CertificateUploadInputSchema().load(data) + assert not errors + + +def test_certificate_upload_schema_minimal(client): + from lemur.certificates.schemas import CertificateUploadInputSchema + data = { + 'owner': 'pwner@example.com', + 'body': SAN_CERT_STR, + } + data, errors = CertificateUploadInputSchema().load(data) + assert not errors + + +def test_certificate_upload_schema_long_chain(client): + from lemur.certificates.schemas import CertificateUploadInputSchema + data = { + 'owner': 'pwner@example.com', + 'body': SAN_CERT_STR, + 'chain': INTERMEDIATE_CERT_STR + '\n' + ROOTCA_CERT_STR + } + data, errors = CertificateUploadInputSchema().load(data) + assert not errors + + +def test_certificate_upload_schema_invalid_body(client): + from lemur.certificates.schemas import CertificateUploadInputSchema + data = { + 'owner': 'pwner@example.com', + 'body': 'Hereby I certify that this is a valid body', + } + data, errors = CertificateUploadInputSchema().load(data) + assert errors == {'body': ['Public certificate presented is not valid.']} + + +def test_certificate_upload_schema_invalid_pkey(client): + from lemur.certificates.schemas import CertificateUploadInputSchema + data = { + 'owner': 'pwner@example.com', + 'body': SAN_CERT_STR, + 'privateKey': 'Look at me Im a private key!!111', + } + data, errors = CertificateUploadInputSchema().load(data) + assert errors == {'private_key': ['Private key presented is not valid.']} + + +def test_certificate_upload_schema_invalid_chain(client): + from lemur.certificates.schemas import CertificateUploadInputSchema + data = { + 'body': SAN_CERT_STR, + 'chain': 'CHAINSAW', + 'owner': 'pwner@example.com', + } + data, errors = CertificateUploadInputSchema().load(data) + assert errors == {'chain': ['Public certificate presented is not valid.']} + + +def test_certificate_upload_schema_wrong_pkey(client): + from lemur.certificates.schemas import CertificateUploadInputSchema + data = { + 'body': SAN_CERT_STR, + 'privateKey': ROOTCA_KEY, + 'chain': INTERMEDIATE_CERT_STR, + 'owner': 'pwner@example.com', + } + data, errors = CertificateUploadInputSchema().load(data) + assert errors == {'_schema': ['Private key does not match certificate.']} + + def test_create_basic_csr(client): csr_config = dict( common_name='example.com', @@ -462,8 +541,11 @@ def test_create_certificate(issuer_plugin, authority, user): assert cert.name == 'ACustomName1' -def test_reissue_certificate(issuer_plugin, authority, certificate): +def test_reissue_certificate(issuer_plugin, crypto_authority, certificate, logged_in_user): from lemur.certificates.service import reissue_certificate + + # test-authority would return a mismatching private key, so use 'cryptography-issuer' plugin instead. + certificate.authority = crypto_authority new_cert = reissue_certificate(certificate) assert new_cert @@ -487,7 +569,7 @@ def test_import(user): assert str(cert.not_after) == '2047-12-31T22:00:00+00:00' assert str(cert.not_before) == '2017-12-31T22:00:00+00:00' assert cert.issuer == 'LemurTrustUnittestsClass1CA2018' - assert cert.name == 'SAN-san.example.org-LemurTrustUnittestsClass1CA2018-20171231-20471231-AFF2DB4F8D2D4D8E80FA382AE27C2333-2' + assert cert.name.startswith('SAN-san.example.org-LemurTrustUnittestsClass1CA2018-20171231-20471231') cert = import_certificate(body=SAN_CERT_STR, chain=INTERMEDIATE_CERT_STR, private_key=SAN_CERT_KEY, owner='joe@example.com', name='ACustomName2', creator=user['user']) assert cert.name == 'ACustomName2' diff --git a/lemur/tests/test_validators.py b/lemur/tests/test_validators.py index 815b7c9d..c3d5357d 100644 --- a/lemur/tests/test_validators.py +++ b/lemur/tests/test_validators.py @@ -1,16 +1,28 @@ -import pytest from datetime import datetime -from .vectors import SAN_CERT_KEY + +import pytest from marshmallow.exceptions import ValidationError +from lemur.common.utils import parse_private_key +from lemur.common.validators import verify_private_key_match +from lemur.tests.vectors import INTERMEDIATE_CERT, SAN_CERT, SAN_CERT_KEY + def test_private_key(session): - from lemur.common.validators import private_key + parse_private_key(SAN_CERT_KEY) - private_key(SAN_CERT_KEY) + with pytest.raises(ValueError): + parse_private_key('invalid_private_key') + + +def test_validate_private_key(session): + key = parse_private_key(SAN_CERT_KEY) + + verify_private_key_match(key, SAN_CERT) with pytest.raises(ValidationError): - private_key('invalid_private_key') + # Wrong key for certificate + verify_private_key_match(key, INTERMEDIATE_CERT) def test_sub_alt_type(session):