Lemur cryptography refactor and updates (#668)
* Renaming the function so it sounds less root-specific * Refactoring lemur_cryptography * Adding to the certificate interface an easy way to request the subject and public_key of a certificate * Turning the create authority functionality into a wrapper of creating a CSR in the certificate codebase and issueing that certificate in this plugin. (Dependent on https://github.com/Netflix/lemur/pull/666 changes first) * Ensuring that intermediate certificates and signed certificates retain their chain cert data * Handling extensions that are the responsibility of the CA Implementing authority_key_identifier for lemur_cryptography signatures and including skeletons of handling the certificate_info_access and crl_distribution_points * Fixing errors found with linter * Updating plugin unit tests * Changing this for Python3. Underlying cryptography library expects these to be bytes now. * Updating tests to match new function names/interfaces * Another naming update in the plugin tests * Appears that create_csr won't like this input without an owner. * Undoing last commit and putting it into the right place this time. * create_csr should be good now with these options, and chain certs will be blank in tests * This won't be blank in issue_certificate, like it will in creating an authority. * Much cleaner * unnecessary import
This commit is contained in:
parent
b7833d8e09
commit
117009c0a2
|
@ -200,6 +200,16 @@ class Certificate(db.Model):
|
|||
def validity_range(self):
|
||||
return self.not_after - self.not_before
|
||||
|
||||
@property
|
||||
def subject(self):
|
||||
cert = lemur.common.utils.parse_certificate(self.body)
|
||||
return cert.subject
|
||||
|
||||
@property
|
||||
def public_key(self):
|
||||
cert = lemur.common.utils.parse_certificate(self.body)
|
||||
return cert.public_key()
|
||||
|
||||
@hybrid_property
|
||||
def expired(self):
|
||||
if self.not_after <= arrow.utcnow():
|
||||
|
|
|
@ -17,78 +17,114 @@ from cryptography.hazmat.primitives import hashes, serialization
|
|||
from lemur.plugins.bases import IssuerPlugin
|
||||
from lemur.plugins import lemur_cryptography as cryptography_issuer
|
||||
|
||||
from lemur.common.utils import generate_private_key
|
||||
from lemur.certificates.service import create_csr
|
||||
|
||||
|
||||
def build_root_certificate(options):
|
||||
private_key = generate_private_key(options.get('key_type'))
|
||||
def build_certificate_authority(options):
|
||||
options['certificate_authority'] = True
|
||||
csr, private_key = create_csr(**options)
|
||||
cert_pem, chain_cert_pem = issue_certificate(csr, options, private_key)
|
||||
|
||||
subject = issuer = x509.Name([
|
||||
x509.NameAttribute(x509.OID_COUNTRY_NAME, options['country']),
|
||||
x509.NameAttribute(x509.OID_STATE_OR_PROVINCE_NAME, options['state']),
|
||||
x509.NameAttribute(x509.OID_LOCALITY_NAME, options['location']),
|
||||
x509.NameAttribute(x509.OID_ORGANIZATION_NAME, options['organization']),
|
||||
x509.NameAttribute(x509.OID_ORGANIZATIONAL_UNIT_NAME, options['organizational_unit']),
|
||||
x509.NameAttribute(x509.OID_COMMON_NAME, options['common_name'])
|
||||
])
|
||||
|
||||
builder = x509.CertificateBuilder(
|
||||
subject_name=subject,
|
||||
issuer_name=issuer,
|
||||
public_key=private_key.public_key(),
|
||||
not_valid_after=options['validity_end'],
|
||||
not_valid_before=options['validity_start'],
|
||||
serial_number=options['first_serial']
|
||||
)
|
||||
|
||||
builder.add_extension(x509.SubjectAlternativeName([x509.DNSName(options['common_name'])]), critical=False)
|
||||
|
||||
cert = builder.sign(private_key, hashes.SHA256(), default_backend())
|
||||
|
||||
cert_pem = cert.public_bytes(
|
||||
encoding=serialization.Encoding.PEM
|
||||
).decode('utf-8')
|
||||
|
||||
private_key_pem = private_key.private_bytes(
|
||||
encoding=serialization.Encoding.PEM,
|
||||
format=serialization.PrivateFormat.TraditionalOpenSSL, # would like to use PKCS8 but AWS ELBs don't like it
|
||||
encryption_algorithm=serialization.NoEncryption()
|
||||
)
|
||||
|
||||
return cert_pem, private_key_pem
|
||||
return cert_pem, private_key, chain_cert_pem
|
||||
|
||||
|
||||
def issue_certificate(csr, options):
|
||||
def issue_certificate(csr, options, private_key=None):
|
||||
csr = x509.load_pem_x509_csr(csr.encode('utf-8'), default_backend())
|
||||
|
||||
if options.get("parent"):
|
||||
# creating intermediate authorities will have options['parent'] to specify the issuer
|
||||
# creating certificates will have options['authority'] to specify the issuer
|
||||
# This works around that by making sure options['authority'] can be referenced for either
|
||||
options['authority'] = options['parent']
|
||||
|
||||
if options.get("authority"):
|
||||
# Issue certificate signed by an existing lemur_certificates authority
|
||||
issuer_subject = options['authority'].authority_certificate.subject
|
||||
issuer_private_key = options['authority'].authority_certificate.private_key
|
||||
chain_cert_pem = options['authority'].authority_certificate.body
|
||||
authority_key_identifier_public = options['authority'].authority_certificate.public_key
|
||||
authority_key_identifier_subject = x509.SubjectKeyIdentifier.from_public_key(authority_key_identifier_public)
|
||||
authority_key_identifier_issuer = issuer_subject
|
||||
authority_key_identifier_serial = int(options['authority'].authority_certificate.serial)
|
||||
# TODO figure out a better way to increment serial
|
||||
# New authorities have a value at options['serial_number'] that is being ignored here.
|
||||
serial = int(uuid.uuid4())
|
||||
else:
|
||||
# Issue certificate that is self-signed (new lemur_certificates root authority)
|
||||
issuer_subject = csr.subject
|
||||
issuer_private_key = private_key
|
||||
chain_cert_pem = ""
|
||||
authority_key_identifier_public = csr.public_key()
|
||||
authority_key_identifier_subject = None
|
||||
authority_key_identifier_issuer = csr.subject
|
||||
authority_key_identifier_serial = options['serial_number']
|
||||
# TODO figure out a better way to increment serial
|
||||
serial = int(uuid.uuid4())
|
||||
|
||||
builder = x509.CertificateBuilder(
|
||||
issuer_name=x509.Name([
|
||||
x509.NameAttribute(
|
||||
x509.OID_ORGANIZATION_NAME,
|
||||
options['authority'].authority_certificate.issuer
|
||||
)]
|
||||
),
|
||||
issuer_name=issuer_subject,
|
||||
subject_name=csr.subject,
|
||||
public_key=csr.public_key(),
|
||||
not_valid_before=options['validity_start'],
|
||||
not_valid_after=options['validity_end'],
|
||||
extensions=csr.extensions)
|
||||
serial_number=serial,
|
||||
extensions=csr.extensions._extensions)
|
||||
|
||||
# TODO figure out a better way to increment serial
|
||||
builder = builder.serial_number(int(uuid.uuid4()))
|
||||
for k, v in options.get('extensions', {}).items():
|
||||
if k == 'authority_key_identifier':
|
||||
# One or both of these options may be present inside the aki extension
|
||||
(authority_key_identifier, authority_identifier) = (False, False)
|
||||
for k2, v2 in v.items():
|
||||
if k2 == 'use_key_identifier' and v2:
|
||||
authority_key_identifier = True
|
||||
if k2 == 'use_authority_cert' and v2:
|
||||
authority_identifier = True
|
||||
if authority_key_identifier:
|
||||
if authority_key_identifier_subject:
|
||||
# FIXME in python-cryptography.
|
||||
# from_issuer_subject_key_identifier(cls, ski) is looking for ski.value.digest
|
||||
# but the digest of the ski is at just ski.digest. Until that library is fixed,
|
||||
# this function won't work. The second line has the same result.
|
||||
# aki = x509.AuthorityKeyIdentifier.from_issuer_subject_key_identifier(authority_key_identifier_subject)
|
||||
aki = x509.AuthorityKeyIdentifier(authority_key_identifier_subject.digest, None, None)
|
||||
else:
|
||||
aki = x509.AuthorityKeyIdentifier.from_issuer_public_key(authority_key_identifier_public)
|
||||
if authority_key_identifier and authority_identifier:
|
||||
aki = x509.AuthorityKeyIdentifier(aki.key_identifier, [x509.DirectoryName(authority_key_identifier_issuer)], authority_key_identifier_serial)
|
||||
elif authority_identifier:
|
||||
aki = x509.AuthorityKeyIdentifier(None, [x509.DirectoryName(authority_key_identifier_issuer)], authority_key_identifier_serial)
|
||||
builder = builder.add_extension(aki, critical=False)
|
||||
if k == 'certificate_info_access':
|
||||
# FIXME: Implement the AuthorityInformationAccess extension
|
||||
# descriptions = [
|
||||
# x509.AccessDescription(x509.oid.AuthorityInformationAccessOID.OCSP, x509.UniformResourceIdentifier(u"http://FIXME")),
|
||||
# x509.AccessDescription(x509.oid.AuthorityInformationAccessOID.CA_ISSUERS, x509.UniformResourceIdentifier(u"http://FIXME"))
|
||||
# ]
|
||||
# for k2, v2 in v.items():
|
||||
# if k2 == 'include_aia' and v2 == True:
|
||||
# builder = builder.add_extension(
|
||||
# x509.AuthorityInformationAccess(descriptions),
|
||||
# critical=False
|
||||
# )
|
||||
pass
|
||||
if k == 'crl_distribution_points':
|
||||
# FIXME: Implement the CRLDistributionPoints extension
|
||||
# FIXME: Not implemented in lemur/schemas.py yet https://github.com/Netflix/lemur/issues/662
|
||||
pass
|
||||
|
||||
private_key = serialization.load_pem_private_key(
|
||||
bytes(str(options['authority'].authority_certificate.private_key).encode('utf-8')),
|
||||
bytes(str(issuer_private_key).encode('utf-8')),
|
||||
password=None,
|
||||
backend=default_backend()
|
||||
)
|
||||
|
||||
cert = builder.sign(private_key, hashes.SHA256(), default_backend())
|
||||
|
||||
return cert.public_bytes(
|
||||
cert_pem = cert.public_bytes(
|
||||
encoding=serialization.Encoding.PEM
|
||||
).decode('utf-8')
|
||||
|
||||
return cert_pem, chain_cert_pem
|
||||
|
||||
|
||||
class CryptographyIssuerPlugin(IssuerPlugin):
|
||||
title = 'Cryptography'
|
||||
|
@ -108,8 +144,8 @@ class CryptographyIssuerPlugin(IssuerPlugin):
|
|||
:return: :raise Exception:
|
||||
"""
|
||||
current_app.logger.debug("Issuing new cryptography certificate with options: {0}".format(options))
|
||||
cert = issue_certificate(csr, options)
|
||||
return cert, ""
|
||||
cert_pem, chain_cert_pem = issue_certificate(csr, options)
|
||||
return cert_pem, chain_cert_pem
|
||||
|
||||
@staticmethod
|
||||
def create_authority(options):
|
||||
|
@ -121,9 +157,9 @@ class CryptographyIssuerPlugin(IssuerPlugin):
|
|||
:return:
|
||||
"""
|
||||
current_app.logger.debug("Issuing new cryptography authority with options: {0}".format(options))
|
||||
cert, private_key = build_root_certificate(options)
|
||||
cert_pem, private_key, chain_cert_pem = build_certificate_authority(options)
|
||||
roles = [
|
||||
{'username': '', 'password': '', 'name': options['name'] + '_admin'},
|
||||
{'username': '', 'password': '', 'name': options['name'] + '_operator'}
|
||||
]
|
||||
return cert, private_key, "", roles
|
||||
return cert_pem, private_key, chain_cert_pem, roles
|
||||
|
|
|
@ -1,8 +1,8 @@
|
|||
import arrow
|
||||
|
||||
|
||||
def test_build_root_certificate():
|
||||
from lemur.plugins.lemur_cryptography.plugin import build_root_certificate
|
||||
def test_build_certificate_authority():
|
||||
from lemur.plugins.lemur_cryptography.plugin import build_certificate_authority
|
||||
|
||||
options = {
|
||||
'key_type': 'RSA2048',
|
||||
|
@ -14,13 +14,15 @@ def test_build_root_certificate():
|
|||
'common_name': 'Example ROOT',
|
||||
'validity_start': arrow.get('2016-12-01').datetime,
|
||||
'validity_end': arrow.get('2016-12-02').datetime,
|
||||
'first_serial': 1
|
||||
|
||||
'first_serial': 1,
|
||||
'serial_number': 1,
|
||||
'owner': 'owner@example.com'
|
||||
}
|
||||
cert_pem, private_key_pem = build_root_certificate(options)
|
||||
cert_pem, private_key_pem, chain_cert_pem = build_certificate_authority(options)
|
||||
|
||||
assert cert_pem
|
||||
assert private_key_pem
|
||||
assert chain_cert_pem == ''
|
||||
|
||||
|
||||
def test_issue_certificate(authority):
|
||||
|
@ -32,5 +34,6 @@ def test_issue_certificate(authority):
|
|||
'validity_start': arrow.get('2016-12-01').datetime,
|
||||
'validity_end': arrow.get('2016-12-02').datetime
|
||||
}
|
||||
cert = issue_certificate(CSR_STR, options)
|
||||
assert cert
|
||||
cert_pem, chain_cert_pem = issue_certificate(CSR_STR, options)
|
||||
assert cert_pem
|
||||
assert chain_cert_pem
|
||||
|
|
Loading…
Reference in New Issue