Merge pull request #2337 from intgr/enforce-certs-pkeys-are-str

Enforce that PEM strings (certs, keys, CSR) are internally passed as str, not bytes
This commit is contained in:
Hossein Shafagh 2019-02-01 16:30:25 -08:00 committed by GitHub
commit 73ac1591e0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 27 additions and 61 deletions

View File

@ -221,11 +221,6 @@ def upload(**kwargs):
else: else:
kwargs['roles'] = roles kwargs['roles'] = roles
if kwargs.get('private_key'):
private_key = kwargs['private_key']
if not isinstance(private_key, bytes):
kwargs['private_key'] = private_key.encode('utf-8')
cert = Certificate(**kwargs) cert = Certificate(**kwargs)
cert.authority = kwargs.get('authority') cert.authority = kwargs.get('authority')
cert = database.create(cert) cert = database.create(cert)
@ -432,10 +427,7 @@ def create_csr(**csr_config):
encoding=serialization.Encoding.PEM, encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL, # would like to use PKCS8 but AWS ELBs don't like it format=serialization.PrivateFormat.TraditionalOpenSSL, # would like to use PKCS8 but AWS ELBs don't like it
encryption_algorithm=serialization.NoEncryption() encryption_algorithm=serialization.NoEncryption()
) ).decode('utf-8')
if isinstance(private_key, bytes):
private_key = private_key.decode('utf-8')
csr = request.public_bytes( csr = request.public_bytes(
encoding=serialization.Encoding.PEM encoding=serialization.Encoding.PEM

View File

@ -48,24 +48,22 @@ def parse_certificate(body):
:param body: :param body:
:return: :return:
""" """
if isinstance(body, str): assert isinstance(body, str)
body = body.encode('utf-8')
return x509.load_pem_x509_certificate(body, default_backend()) return x509.load_pem_x509_certificate(body.encode('utf-8'), default_backend())
def parse_private_key(private_key): def parse_private_key(private_key):
""" """
Parses a PEM-format private key (RSA, DSA, ECDSA or any other supported algorithm). Parses a PEM-format private key (RSA, DSA, ECDSA or any other supported algorithm).
Raises ValueError for an invalid string. Raises ValueError for an invalid string. Raises AssertionError when passed value is not str-type.
:param private_key: String containing PEM private key :param private_key: String containing PEM private key
""" """
if isinstance(private_key, str): assert isinstance(private_key, str)
private_key = private_key.encode('utf8')
return load_pem_private_key(private_key, password=None, backend=default_backend()) return load_pem_private_key(private_key.encode('utf8'), password=None, backend=default_backend())
def parse_csr(csr): def parse_csr(csr):
@ -75,10 +73,9 @@ def parse_csr(csr):
:param csr: :param csr:
:return: :return:
""" """
if isinstance(csr, str): assert isinstance(csr, str)
csr = csr.encode('utf-8')
return x509.load_pem_x509_csr(csr, default_backend()) return x509.load_pem_x509_csr(csr.encode('utf-8'), default_backend())
def get_authority_key(body): def get_authority_key(body):

View File

@ -64,6 +64,7 @@ def upload_cert(name, body, private_key, path, cert_chain=None, **kwargs):
:param path: :param path:
:return: :return:
""" """
assert isinstance(private_key, str)
client = kwargs.pop('client') client = kwargs.pop('client')
if not path or path == '/': if not path or path == '/':
@ -72,8 +73,6 @@ def upload_cert(name, body, private_key, path, cert_chain=None, **kwargs):
name = name + '-' + path.strip('/') name = name + '-' + path.strip('/')
try: try:
if isinstance(private_key, bytes):
private_key = private_key.decode("utf-8")
if cert_chain: if cert_chain:
return client.upload_server_certificate( return client.upload_server_certificate(
Path=path, Path=path,

View File

@ -14,6 +14,7 @@ from cryptography import x509
from cryptography.hazmat.backends import default_backend from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives import hashes, serialization
from lemur.common.utils import parse_private_key
from lemur.plugins.bases import IssuerPlugin from lemur.plugins.bases import IssuerPlugin
from lemur.plugins import lemur_cryptography as cryptography_issuer from lemur.plugins import lemur_cryptography as cryptography_issuer
@ -40,7 +41,8 @@ def issue_certificate(csr, options, private_key=None):
if options.get("authority"): if options.get("authority"):
# Issue certificate signed by an existing lemur_certificates authority # Issue certificate signed by an existing lemur_certificates authority
issuer_subject = options['authority'].authority_certificate.subject issuer_subject = options['authority'].authority_certificate.subject
issuer_private_key = options['authority'].authority_certificate.private_key assert private_key is None, "Private would be ignored, authority key used instead"
private_key = options['authority'].authority_certificate.private_key
chain_cert_pem = options['authority'].authority_certificate.body chain_cert_pem = options['authority'].authority_certificate.body
authority_key_identifier_public = options['authority'].authority_certificate.public_key authority_key_identifier_public = options['authority'].authority_certificate.public_key
authority_key_identifier_subject = x509.SubjectKeyIdentifier.from_public_key(authority_key_identifier_public) authority_key_identifier_subject = x509.SubjectKeyIdentifier.from_public_key(authority_key_identifier_public)
@ -52,7 +54,6 @@ def issue_certificate(csr, options, private_key=None):
else: else:
# Issue certificate that is self-signed (new lemur_certificates root authority) # Issue certificate that is self-signed (new lemur_certificates root authority)
issuer_subject = csr.subject issuer_subject = csr.subject
issuer_private_key = private_key
chain_cert_pem = "" chain_cert_pem = ""
authority_key_identifier_public = csr.public_key() authority_key_identifier_public = csr.public_key()
authority_key_identifier_subject = None authority_key_identifier_subject = None
@ -112,11 +113,7 @@ def issue_certificate(csr, options, private_key=None):
# FIXME: Not implemented in lemur/schemas.py yet https://github.com/Netflix/lemur/issues/662 # FIXME: Not implemented in lemur/schemas.py yet https://github.com/Netflix/lemur/issues/662
pass pass
private_key = serialization.load_pem_private_key( private_key = parse_private_key(private_key)
bytes(str(issuer_private_key).encode('utf-8')),
password=None,
backend=default_backend()
)
cert = builder.sign(private_key, hashes.SHA256(), default_backend()) cert = builder.sign(private_key, hashes.SHA256(), default_backend())
cert_pem = cert.public_bytes( cert_pem = cert.public_bytes(

View File

@ -38,14 +38,9 @@ def create_csr(cert, chain, csr_tmp, key):
:param csr_tmp: :param csr_tmp:
:param key: :param key:
""" """
if isinstance(cert, bytes): assert isinstance(cert, str)
cert = cert.decode('utf-8') assert isinstance(chain, str)
assert isinstance(key, str)
if isinstance(chain, bytes):
chain = chain.decode('utf-8')
if isinstance(key, bytes):
key = key.decode('utf-8')
with mktempfile() as key_tmp: with mktempfile() as key_tmp:
with open(key_tmp, 'w') as f: with open(key_tmp, 'w') as f:

View File

@ -59,11 +59,8 @@ def split_chain(chain):
def create_truststore(cert, chain, jks_tmp, alias, passphrase): def create_truststore(cert, chain, jks_tmp, alias, passphrase):
if isinstance(cert, bytes): assert isinstance(cert, str)
cert = cert.decode('utf-8') assert isinstance(chain, str)
if isinstance(chain, bytes):
chain = chain.decode('utf-8')
with mktempfile() as cert_tmp: with mktempfile() as cert_tmp:
with open(cert_tmp, 'w') as f: with open(cert_tmp, 'w') as f:
@ -98,14 +95,9 @@ def create_truststore(cert, chain, jks_tmp, alias, passphrase):
def create_keystore(cert, chain, jks_tmp, key, alias, passphrase): def create_keystore(cert, chain, jks_tmp, key, alias, passphrase):
if isinstance(cert, bytes): assert isinstance(cert, str)
cert = cert.decode('utf-8') assert isinstance(chain, str)
assert isinstance(key, str)
if isinstance(chain, bytes):
chain = chain.decode('utf-8')
if isinstance(key, bytes):
key = key.decode('utf-8')
# Create PKCS12 keystore from private key and public certificate # Create PKCS12 keystore from private key and public certificate
with mktempfile() as cert_tmp: with mktempfile() as cert_tmp:

View File

@ -44,14 +44,9 @@ def create_pkcs12(cert, chain, p12_tmp, key, alias, passphrase):
:param alias: :param alias:
:param passphrase: :param passphrase:
""" """
if isinstance(cert, bytes): assert isinstance(cert, str)
cert = cert.decode('utf-8') assert isinstance(chain, str)
assert isinstance(key, str)
if isinstance(chain, bytes):
chain = chain.decode('utf-8')
if isinstance(key, bytes):
key = key.decode('utf-8')
with mktempfile() as key_tmp: with mktempfile() as key_tmp:
with open(key_tmp, 'w') as f: with open(key_tmp, 'w') as f:

View File

@ -3,12 +3,11 @@ import os
import datetime import datetime
import pytest import pytest
from cryptography import x509 from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from flask import current_app from flask import current_app
from flask_principal import identity_changed, Identity from flask_principal import identity_changed, Identity
from lemur import create_app from lemur import create_app
from lemur.common.utils import parse_private_key
from lemur.database import db as _db from lemur.database import db as _db
from lemur.auth.service import create_token from lemur.auth.service import create_token
from lemur.tests.vectors import SAN_CERT_KEY, INTERMEDIATE_KEY from lemur.tests.vectors import SAN_CERT_KEY, INTERMEDIATE_KEY
@ -235,12 +234,12 @@ def logged_in_admin(session, app):
@pytest.fixture @pytest.fixture
def private_key(): def private_key():
return load_pem_private_key(SAN_CERT_KEY.encode(), password=None, backend=default_backend()) return parse_private_key(SAN_CERT_KEY)
@pytest.fixture @pytest.fixture
def issuer_private_key(): def issuer_private_key():
return load_pem_private_key(INTERMEDIATE_KEY.encode(), password=None, backend=default_backend()) return parse_private_key(INTERMEDIATE_KEY)
@pytest.fixture @pytest.fixture