Enforce that PEM strings (certs, keys, CSR) are internally passed as str, not bytes

This was already true in most places but not 100%, leading to lots of redundant checks and conversions.
This commit is contained in:
Marti Raudsepp 2018-12-26 19:49:56 +02:00
parent c60b712523
commit e24a94d798
8 changed files with 27 additions and 61 deletions

View File

@ -221,11 +221,6 @@ def upload(**kwargs):
else:
kwargs['roles'] = roles
if kwargs.get('private_key'):
private_key = kwargs['private_key']
if not isinstance(private_key, bytes):
kwargs['private_key'] = private_key.encode('utf-8')
cert = Certificate(**kwargs)
cert.authority = kwargs.get('authority')
cert = database.create(cert)
@ -432,10 +427,7 @@ def create_csr(**csr_config):
encoding=serialization.Encoding.PEM,
format=serialization.PrivateFormat.TraditionalOpenSSL, # would like to use PKCS8 but AWS ELBs don't like it
encryption_algorithm=serialization.NoEncryption()
)
if isinstance(private_key, bytes):
private_key = private_key.decode('utf-8')
).decode('utf-8')
csr = request.public_bytes(
encoding=serialization.Encoding.PEM

View File

@ -48,24 +48,22 @@ def parse_certificate(body):
:param body:
:return:
"""
if isinstance(body, str):
body = body.encode('utf-8')
assert isinstance(body, str)
return x509.load_pem_x509_certificate(body, default_backend())
return x509.load_pem_x509_certificate(body.encode('utf-8'), default_backend())
def parse_private_key(private_key):
"""
Parses a PEM-format private key (RSA, DSA, ECDSA or any other supported algorithm).
Raises ValueError for an invalid string.
Raises ValueError for an invalid string. Raises AssertionError when passed value is not str-type.
:param private_key: String containing PEM private key
"""
if isinstance(private_key, str):
private_key = private_key.encode('utf8')
assert isinstance(private_key, str)
return load_pem_private_key(private_key, password=None, backend=default_backend())
return load_pem_private_key(private_key.encode('utf8'), password=None, backend=default_backend())
def parse_csr(csr):
@ -75,10 +73,9 @@ def parse_csr(csr):
:param csr:
:return:
"""
if isinstance(csr, str):
csr = csr.encode('utf-8')
assert isinstance(csr, str)
return x509.load_pem_x509_csr(csr, default_backend())
return x509.load_pem_x509_csr(csr.encode('utf-8'), default_backend())
def get_authority_key(body):

View File

@ -64,6 +64,7 @@ def upload_cert(name, body, private_key, path, cert_chain=None, **kwargs):
:param path:
:return:
"""
assert isinstance(private_key, str)
client = kwargs.pop('client')
if not path or path == '/':
@ -72,8 +73,6 @@ def upload_cert(name, body, private_key, path, cert_chain=None, **kwargs):
name = name + '-' + path.strip('/')
try:
if isinstance(private_key, bytes):
private_key = private_key.decode("utf-8")
if cert_chain:
return client.upload_server_certificate(
Path=path,

View File

@ -14,6 +14,7 @@ from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from lemur.common.utils import parse_private_key
from lemur.plugins.bases import IssuerPlugin
from lemur.plugins import lemur_cryptography as cryptography_issuer
@ -40,7 +41,8 @@ def issue_certificate(csr, options, private_key=None):
if options.get("authority"):
# Issue certificate signed by an existing lemur_certificates authority
issuer_subject = options['authority'].authority_certificate.subject
issuer_private_key = options['authority'].authority_certificate.private_key
assert private_key is None, "Private would be ignored, authority key used instead"
private_key = options['authority'].authority_certificate.private_key
chain_cert_pem = options['authority'].authority_certificate.body
authority_key_identifier_public = options['authority'].authority_certificate.public_key
authority_key_identifier_subject = x509.SubjectKeyIdentifier.from_public_key(authority_key_identifier_public)
@ -52,7 +54,6 @@ def issue_certificate(csr, options, private_key=None):
else:
# Issue certificate that is self-signed (new lemur_certificates root authority)
issuer_subject = csr.subject
issuer_private_key = private_key
chain_cert_pem = ""
authority_key_identifier_public = csr.public_key()
authority_key_identifier_subject = None
@ -112,11 +113,7 @@ def issue_certificate(csr, options, private_key=None):
# FIXME: Not implemented in lemur/schemas.py yet https://github.com/Netflix/lemur/issues/662
pass
private_key = serialization.load_pem_private_key(
bytes(str(issuer_private_key).encode('utf-8')),
password=None,
backend=default_backend()
)
private_key = parse_private_key(private_key)
cert = builder.sign(private_key, hashes.SHA256(), default_backend())
cert_pem = cert.public_bytes(

View File

@ -38,14 +38,9 @@ def create_csr(cert, chain, csr_tmp, key):
:param csr_tmp:
:param key:
"""
if isinstance(cert, bytes):
cert = cert.decode('utf-8')
if isinstance(chain, bytes):
chain = chain.decode('utf-8')
if isinstance(key, bytes):
key = key.decode('utf-8')
assert isinstance(cert, str)
assert isinstance(chain, str)
assert isinstance(key, str)
with mktempfile() as key_tmp:
with open(key_tmp, 'w') as f:

View File

@ -59,11 +59,8 @@ def split_chain(chain):
def create_truststore(cert, chain, jks_tmp, alias, passphrase):
if isinstance(cert, bytes):
cert = cert.decode('utf-8')
if isinstance(chain, bytes):
chain = chain.decode('utf-8')
assert isinstance(cert, str)
assert isinstance(chain, str)
with mktempfile() as cert_tmp:
with open(cert_tmp, 'w') as f:
@ -98,14 +95,9 @@ def create_truststore(cert, chain, jks_tmp, alias, passphrase):
def create_keystore(cert, chain, jks_tmp, key, alias, passphrase):
if isinstance(cert, bytes):
cert = cert.decode('utf-8')
if isinstance(chain, bytes):
chain = chain.decode('utf-8')
if isinstance(key, bytes):
key = key.decode('utf-8')
assert isinstance(cert, str)
assert isinstance(chain, str)
assert isinstance(key, str)
# Create PKCS12 keystore from private key and public certificate
with mktempfile() as cert_tmp:

View File

@ -44,14 +44,9 @@ def create_pkcs12(cert, chain, p12_tmp, key, alias, passphrase):
:param alias:
:param passphrase:
"""
if isinstance(cert, bytes):
cert = cert.decode('utf-8')
if isinstance(chain, bytes):
chain = chain.decode('utf-8')
if isinstance(key, bytes):
key = key.decode('utf-8')
assert isinstance(cert, str)
assert isinstance(chain, str)
assert isinstance(key, str)
with mktempfile() as key_tmp:
with open(key_tmp, 'w') as f:

View File

@ -3,12 +3,11 @@ import os
import datetime
import pytest
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives.serialization import load_pem_private_key
from flask import current_app
from flask_principal import identity_changed, Identity
from lemur import create_app
from lemur.common.utils import parse_private_key
from lemur.database import db as _db
from lemur.auth.service import create_token
from lemur.tests.vectors import SAN_CERT_KEY, INTERMEDIATE_KEY
@ -235,12 +234,12 @@ def logged_in_admin(session, app):
@pytest.fixture
def private_key():
return load_pem_private_key(SAN_CERT_KEY.encode(), password=None, backend=default_backend())
return parse_private_key(SAN_CERT_KEY)
@pytest.fixture
def issuer_private_key():
return load_pem_private_key(INTERMEDIATE_KEY.encode(), password=None, backend=default_backend())
return parse_private_key(INTERMEDIATE_KEY)
@pytest.fixture