Rewrite Java Keystore/Truststore support based on pyjks library

This commit is contained in:
Marti Raudsepp 2019-04-05 17:52:55 +03:00
parent a865675537
commit dbf34a4d48
10 changed files with 243 additions and 312 deletions

View File

@ -85,7 +85,9 @@ def parse_cert_chain(pem_chain):
:param pem_chain: string :param pem_chain: string
:return: List of parsed certificates :return: List of parsed certificates
""" """
return [parse_certificate(cert) for cert in split_pem(pem_chain) if pem_chain] if pem_chain is None:
return []
return [parse_certificate(cert) for cert in split_pem(pem_chain) if cert]
def parse_csr(csr): def parse_csr(csr):

View File

@ -1,246 +0,0 @@
"""
.. module: lemur.plugins.lemur_java.plugin
:platform: Unix
:copyright: (c) 2018 by Netflix Inc., see AUTHORS for more
:license: Apache, see LICENSE for more details.
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
"""
import subprocess
from flask import current_app
from cryptography.fernet import Fernet
from lemur.utils import mktempfile, mktemppath
from lemur.plugins.bases import ExportPlugin
from lemur.plugins import lemur_java as java
from lemur.common.utils import parse_certificate
from lemur.common.defaults import common_name
def run_process(command):
"""
Runs a given command with pOpen and wraps some
error handling around it.
:param command:
:return:
"""
p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
if p.returncode != 0:
current_app.logger.debug(" ".join(command))
current_app.logger.error(stderr)
current_app.logger.error(stdout)
raise Exception(stderr)
def split_chain(chain):
"""
Split the chain into individual certificates for import into keystore
:param chain:
:return:
"""
certs = []
if not chain:
return certs
lines = chain.split('\n')
cert = []
for line in lines:
cert.append(line + '\n')
if line == '-----END CERTIFICATE-----':
certs.append("".join(cert))
cert = []
return certs
def create_truststore(cert, chain, jks_tmp, alias, passphrase):
assert isinstance(cert, str)
assert isinstance(chain, str)
with mktempfile() as cert_tmp:
with open(cert_tmp, 'w') as f:
f.write(cert)
run_process([
"keytool",
"-importcert",
"-file", cert_tmp,
"-keystore", jks_tmp,
"-alias", "{0}_cert".format(alias),
"-storepass", passphrase,
"-noprompt"
])
# Import the entire chain
for idx, cert in enumerate(split_chain(chain)):
with mktempfile() as c_tmp:
with open(c_tmp, 'w') as f:
f.write(cert)
# Import signed cert in to JKS keystore
run_process([
"keytool",
"-importcert",
"-file", c_tmp,
"-keystore", jks_tmp,
"-alias", "{0}_cert_{1}".format(alias, idx),
"-storepass", passphrase,
"-noprompt"
])
def create_keystore(cert, chain, jks_tmp, key, alias, passphrase):
assert isinstance(cert, str)
assert isinstance(chain, str)
assert isinstance(key, str)
# Create PKCS12 keystore from private key and public certificate
with mktempfile() as cert_tmp:
with open(cert_tmp, 'w') as f:
if chain:
f.writelines([key.strip() + "\n", cert.strip() + "\n", chain.strip() + "\n"])
else:
f.writelines([key.strip() + "\n", cert.strip() + "\n"])
with mktempfile() as p12_tmp:
run_process([
"openssl",
"pkcs12",
"-export",
"-nodes",
"-name", alias,
"-in", cert_tmp,
"-out", p12_tmp,
"-password", "pass:{}".format(passphrase)
])
# Convert PKCS12 keystore into a JKS keystore
run_process([
"keytool",
"-importkeystore",
"-destkeystore", jks_tmp,
"-srckeystore", p12_tmp,
"-srcstoretype", "pkcs12",
"-deststoretype", "JKS",
"-alias", alias,
"-srcstorepass", passphrase,
"-deststorepass", passphrase
])
class JavaTruststoreExportPlugin(ExportPlugin):
title = 'Java Truststore (JKS)'
slug = 'java-truststore-jks'
description = 'Attempts to generate a JKS truststore'
requires_key = False
version = java.VERSION
author = 'Kevin Glisson'
author_url = 'https://github.com/netflix/lemur'
options = [
{
'name': 'alias',
'type': 'str',
'required': False,
'helpMessage': 'Enter the alias you wish to use for the truststore.',
},
{
'name': 'passphrase',
'type': 'str',
'required': False,
'helpMessage': 'If no passphrase is given one will be generated for you, we highly recommend this. Minimum length is 8.',
'validation': ''
},
]
def export(self, body, chain, key, options, **kwargs):
"""
Generates a Java Truststore
:param key:
:param chain:
:param body:
:param options:
:param kwargs:
"""
if self.get_option('alias', options):
alias = self.get_option('alias', options)
else:
alias = "blah"
if self.get_option('passphrase', options):
passphrase = self.get_option('passphrase', options)
else:
passphrase = Fernet.generate_key().decode('utf-8')
with mktemppath() as jks_tmp:
create_truststore(body, chain, jks_tmp, alias, passphrase)
with open(jks_tmp, 'rb') as f:
raw = f.read()
return "jks", passphrase, raw
class JavaKeystoreExportPlugin(ExportPlugin):
title = 'Java Keystore (JKS)'
slug = 'java-keystore-jks'
description = 'Attempts to generate a JKS keystore'
version = java.VERSION
author = 'Kevin Glisson'
author_url = 'https://github.com/netflix/lemur'
options = [
{
'name': 'passphrase',
'type': 'str',
'required': False,
'helpMessage': 'If no passphrase is given one will be generated for you, we highly recommend this. Minimum length is 8.',
'validation': ''
},
{
'name': 'alias',
'type': 'str',
'required': False,
'helpMessage': 'Enter the alias you wish to use for the keystore.',
}
]
def export(self, body, chain, key, options, **kwargs):
"""
Generates a Java Keystore
:param key:
:param chain:
:param body:
:param options:
:param kwargs:
"""
if self.get_option('passphrase', options):
passphrase = self.get_option('passphrase', options)
else:
passphrase = Fernet.generate_key().decode('utf-8')
if self.get_option('alias', options):
alias = self.get_option('alias', options)
else:
alias = common_name(parse_certificate(body))
with mktemppath() as jks_tmp:
create_keystore(body, chain, jks_tmp, key, alias, passphrase)
with open(jks_tmp, 'rb') as f:
raw = f.read()
return "jks", passphrase, raw

View File

@ -1,63 +0,0 @@
import pytest
from lemur.tests.vectors import INTERNAL_CERTIFICATE_A_STR, INTERNAL_PRIVATE_KEY_A_STR
@pytest.mark.skip(reason="no way of currently testing this")
def test_export_truststore(app):
from lemur.plugins.base import plugins
p = plugins.get('java-truststore-jks')
options = [{'name': 'passphrase', 'value': 'test1234'}]
actual = p.export(INTERNAL_CERTIFICATE_A_STR, "", "", options)
assert actual[0] == 'jks'
assert actual[1] == 'test1234'
assert isinstance(actual[2], bytes)
@pytest.mark.skip(reason="no way of currently testing this")
def test_export_truststore_default_password(app):
from lemur.plugins.base import plugins
p = plugins.get('java-truststore-jks')
options = []
actual = p.export(INTERNAL_CERTIFICATE_A_STR, "", "", options)
assert actual[0] == 'jks'
assert isinstance(actual[1], str)
assert isinstance(actual[2], bytes)
@pytest.mark.skip(reason="no way of currently testing this")
def test_export_keystore(app):
from lemur.plugins.base import plugins
p = plugins.get('java-keystore-jks')
options = [{'name': 'passphrase', 'value': 'test1234'}]
with pytest.raises(Exception):
p.export(INTERNAL_CERTIFICATE_A_STR, "", "", options)
actual = p.export(INTERNAL_CERTIFICATE_A_STR, "", INTERNAL_PRIVATE_KEY_A_STR, options)
assert actual[0] == 'jks'
assert actual[1] == 'test1234'
assert isinstance(actual[2], bytes)
@pytest.mark.skip(reason="no way of currently testing this")
def test_export_keystore_default_password(app):
from lemur.plugins.base import plugins
p = plugins.get('java-keystore-jks')
options = []
with pytest.raises(Exception):
p.export(INTERNAL_CERTIFICATE_A_STR, "", "", options)
actual = p.export(INTERNAL_CERTIFICATE_A_STR, "", INTERNAL_PRIVATE_KEY_A_STR, options)
assert actual[0] == 'jks'
assert isinstance(actual[1], str)
assert isinstance(actual[2], bytes)

View File

@ -0,0 +1,140 @@
"""
.. module: lemur.plugins.lemur_jks.plugin
:platform: Unix
:copyright: (c) 2018 by Netflix Inc., see AUTHORS for more
:license: Apache, see LICENSE for more details.
.. moduleauthor:: Marti Raudsepp <marti@juffo.org>
"""
from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import serialization
from jks import PrivateKeyEntry, KeyStore, TrustedCertEntry
from lemur.common.defaults import common_name
from lemur.common.utils import parse_certificate, parse_cert_chain, parse_private_key
from lemur.plugins import lemur_jks as jks
from lemur.plugins.bases import ExportPlugin
def cert_chain_as_der(cert, chain):
"""Return a certificate and its chain in a list format, as expected by pyjks."""
certs = [parse_certificate(cert)]
certs.extend(parse_cert_chain(chain))
# certs (list) A list of certificates, as byte strings. The first one should be the one belonging to the private
# key, the others the chain (in correct order).
return [cert.public_bytes(encoding=serialization.Encoding.DER) for cert in certs]
def create_truststore(cert, chain, alias, passphrase):
entries = []
for idx, cert_bytes in enumerate(cert_chain_as_der(cert, chain)):
# The original cert gets name <ALIAS>_cert, first chain element is <ALIAS>_cert_1, etc.
cert_alias = alias + '_cert' + ('_{}'.format(idx) if idx else '')
entries.append(TrustedCertEntry.new(cert_alias, cert_bytes))
return KeyStore.new('jks', entries).saves(passphrase)
def create_keystore(cert, chain, key, alias, passphrase):
certs_bytes = cert_chain_as_der(cert, chain)
key_bytes = parse_private_key(key).private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.NoEncryption()
)
entry = PrivateKeyEntry.new(alias, certs_bytes, key_bytes)
return KeyStore.new('jks', [entry]).saves(passphrase)
class JavaTruststoreExportPlugin(ExportPlugin):
title = 'Java Truststore (JKS)'
slug = 'java-truststore-jks'
description = 'Generates a JKS truststore'
requires_key = False
version = jks.VERSION
author = 'Marti Raudsepp'
author_url = 'https://github.com/intgr'
options = [
{
'name': 'alias',
'type': 'str',
'required': False,
'helpMessage': 'Enter the alias you wish to use for the truststore.',
},
{
'name': 'passphrase',
'type': 'str',
'required': False,
'helpMessage': 'If no passphrase is given one will be generated for you, we highly recommend this.',
'validation': ''
},
]
def export(self, body, chain, key, options, **kwargs):
"""
Generates a Java Truststore
"""
if self.get_option('alias', options):
alias = self.get_option('alias', options)
else:
alias = common_name(parse_certificate(body))
if self.get_option('passphrase', options):
passphrase = self.get_option('passphrase', options)
else:
passphrase = Fernet.generate_key().decode('utf-8')
raw = create_truststore(body, chain, alias, passphrase)
return 'jks', passphrase, raw
class JavaKeystoreExportPlugin(ExportPlugin):
title = 'Java Keystore (JKS)'
slug = 'java-keystore-jks'
description = 'Generates a JKS keystore'
version = jks.VERSION
author = 'Marti Raudsepp'
author_url = 'https://github.com/intgr'
options = [
{
'name': 'passphrase',
'type': 'str',
'required': False,
'helpMessage': 'If no passphrase is given one will be generated for you, we highly recommend this.',
'validation': ''
},
{
'name': 'alias',
'type': 'str',
'required': False,
'helpMessage': 'Enter the alias you wish to use for the keystore.',
}
]
def export(self, body, chain, key, options, **kwargs):
"""
Generates a Java Keystore
"""
if self.get_option('passphrase', options):
passphrase = self.get_option('passphrase', options)
else:
passphrase = Fernet.generate_key().decode('utf-8')
if self.get_option('alias', options):
alias = self.get_option('alias', options)
else:
alias = common_name(parse_certificate(body))
raw = create_keystore(body, chain, key, alias, passphrase)
return 'jks', passphrase, raw

View File

@ -0,0 +1,96 @@
import pytest
from jks import KeyStore, TrustedCertEntry, PrivateKeyEntry
from lemur.tests.vectors import INTERNAL_CERTIFICATE_A_STR, SAN_CERT_STR, INTERMEDIATE_CERT_STR, ROOTCA_CERT_STR, \
SAN_CERT_KEY
def test_export_truststore(app):
from lemur.plugins.base import plugins
p = plugins.get('java-truststore-jks')
options = [
{'name': 'passphrase', 'value': 'hunter2'},
{'name': 'alias', 'value': 'AzureDiamond'},
]
chain = INTERMEDIATE_CERT_STR + '\n' + ROOTCA_CERT_STR
ext, password, raw = p.export(SAN_CERT_STR, chain, SAN_CERT_KEY, options)
assert ext == 'jks'
assert password == 'hunter2'
assert isinstance(raw, bytes)
ks = KeyStore.loads(raw, 'hunter2')
assert ks.store_type == 'jks'
# JKS lower-cases alias strings
assert ks.entries.keys() == {'azurediamond_cert', 'azurediamond_cert_1', 'azurediamond_cert_2'}
assert isinstance(ks.entries['azurediamond_cert'], TrustedCertEntry)
def test_export_truststore_defaults(app):
from lemur.plugins.base import plugins
p = plugins.get('java-truststore-jks')
options = []
ext, password, raw = p.export(INTERNAL_CERTIFICATE_A_STR, '', '', options)
assert ext == 'jks'
assert isinstance(password, str)
assert isinstance(raw, bytes)
ks = KeyStore.loads(raw, password)
assert ks.store_type == 'jks'
# JKS lower-cases alias strings
assert ks.entries.keys() == {'acommonname_cert'}
assert isinstance(ks.entries['acommonname_cert'], TrustedCertEntry)
def test_export_keystore(app):
from lemur.plugins.base import plugins
p = plugins.get('java-keystore-jks')
options = [
{'name': 'passphrase', 'value': 'hunter2'},
{'name': 'alias', 'value': 'AzureDiamond'},
]
chain = INTERMEDIATE_CERT_STR + '\n' + ROOTCA_CERT_STR
with pytest.raises(Exception):
p.export(INTERNAL_CERTIFICATE_A_STR, chain, '', options)
ext, password, raw = p.export(SAN_CERT_STR, chain, SAN_CERT_KEY, options)
assert ext == 'jks'
assert password == 'hunter2'
assert isinstance(raw, bytes)
ks = KeyStore.loads(raw, password)
assert ks.store_type == 'jks'
# JKS lower-cases alias strings
assert ks.entries.keys() == {'azurediamond'}
entry = ks.entries['azurediamond']
assert isinstance(entry, PrivateKeyEntry)
assert len(entry.cert_chain) == 3 # Cert and chain were provided
def test_export_keystore_defaults(app):
from lemur.plugins.base import plugins
p = plugins.get('java-keystore-jks')
options = []
with pytest.raises(Exception):
p.export(INTERNAL_CERTIFICATE_A_STR, '', '', options)
ext, password, raw = p.export(SAN_CERT_STR, '', SAN_CERT_KEY, options)
assert ext == 'jks'
assert isinstance(password, str)
assert isinstance(raw, bytes)
ks = KeyStore.loads(raw, password)
assert ks.store_type == 'jks'
assert ks.entries.keys() == {'san.example.org'}
entry = ks.entries['san.example.org']
assert isinstance(entry, PrivateKeyEntry)
assert len(entry.cert_chain) == 1 # Only cert itself, no chain was provided

View File

@ -47,3 +47,4 @@ SQLAlchemy-Utils
tabulate tabulate
xmltodict xmltodict
pyyaml>=4.2b1 #high severity alert pyyaml>=4.2b1 #high severity alert
pyjks

View File

@ -86,3 +86,4 @@ urllib3==1.24.1 # via botocore, requests
vine==1.3.0 # via amqp vine==1.3.0 # via amqp
werkzeug==0.15.1 # via flask werkzeug==0.15.1 # via flask
xmltodict==0.12.0 xmltodict==0.12.0
pyjks==18.0.0

View File

@ -143,8 +143,8 @@ setup(
'aws_s3 = lemur.plugins.lemur_aws.plugin:S3DestinationPlugin', 'aws_s3 = lemur.plugins.lemur_aws.plugin:S3DestinationPlugin',
'email_notification = lemur.plugins.lemur_email.plugin:EmailNotificationPlugin', 'email_notification = lemur.plugins.lemur_email.plugin:EmailNotificationPlugin',
'slack_notification = lemur.plugins.lemur_slack.plugin:SlackNotificationPlugin', 'slack_notification = lemur.plugins.lemur_slack.plugin:SlackNotificationPlugin',
'java_truststore_export = lemur.plugins.lemur_java.plugin:JavaTruststoreExportPlugin', 'java_truststore_export = lemur.plugins.lemur_jks.plugin:JavaTruststoreExportPlugin',
'java_keystore_export = lemur.plugins.lemur_java.plugin:JavaKeystoreExportPlugin', 'java_keystore_export = lemur.plugins.lemur_jks.plugin:JavaKeystoreExportPlugin',
'openssl_export = lemur.plugins.lemur_openssl.plugin:OpenSSLExportPlugin', 'openssl_export = lemur.plugins.lemur_openssl.plugin:OpenSSLExportPlugin',
'atlas_metric = lemur.plugins.lemur_atlas.plugin:AtlasMetricPlugin', 'atlas_metric = lemur.plugins.lemur_atlas.plugin:AtlasMetricPlugin',
'kubernetes_destination = lemur.plugins.lemur_kubernetes.plugin:KubernetesDestinationPlugin', 'kubernetes_destination = lemur.plugins.lemur_kubernetes.plugin:KubernetesDestinationPlugin',