CSR Export Plugin (#988)

This plugin allows a certificate to be exported as a CSR via OpenSSL
x509.  The workflow will be:
* Create self-signed cert via Cryptography authority
* Export CSR via this plugin
* Sign your own cert outside of Lemur
* Import new cert with private key

Change-Id: Id3f7db2506bd959236cd3a6df622841058abda5a
This commit is contained in:
James Chuong 2017-11-14 10:11:06 -08:00 committed by kevgliss
parent e30e17038b
commit 4b544ae207
5 changed files with 124 additions and 0 deletions

View File

@ -0,0 +1,5 @@
try:
VERSION = __import__('pkg_resources') \
.get_distribution(__name__).version
except Exception as e:
VERSION = 'unknown'

View File

@ -0,0 +1,104 @@
"""
.. module: lemur.plugins.lemur_csr.plugin
An export plugin that exports CSR from a private key and certificate.
"""
from io import open
import subprocess
from flask import current_app
from lemur.utils import mktempfile, mktemppath
from lemur.plugins.bases import ExportPlugin
from lemur.plugins import lemur_csr as csr
def run_process(command):
"""
Runs a given command with pOpen and wraps some
error handling around it.
:param command:
:return:
"""
p = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
current_app.logger.debug(command)
stdout, stderr = p.communicate()
if p.returncode != 0:
current_app.logger.debug(" ".join(command))
current_app.logger.error(stderr)
raise Exception(stderr)
def create_csr(cert, chain, csr_tmp, key):
"""
Creates a csr from key and cert file.
:param cert:
:param chain:
:param csr_tmp:
:param key:
"""
if isinstance(cert, bytes):
cert = cert.decode('utf-8')
if isinstance(chain, bytes):
chain = chain.decode('utf-8')
if isinstance(key, bytes):
key = key.decode('utf-8')
with mktempfile() as key_tmp:
with open(key_tmp, 'w') as f:
f.write(key)
with mktempfile() as cert_tmp:
with open(cert_tmp, 'w') as f:
if chain:
f.writelines([cert.strip() + "\n", chain.strip() + "\n"])
else:
f.writelines([cert.strip() + "\n"])
output = subprocess.check_output([
"openssl",
"x509",
"-x509toreq",
"-in", cert_tmp,
"-signkey", key_tmp,
])
subprocess.run([
"openssl",
"req",
"-out", csr_tmp
], input=output)
class CSRExportPlugin(ExportPlugin):
title = 'CSR'
slug = 'openssl-csr'
description = 'Exports a CSR'
version = csr.VERSION
author = 'jchuong'
author_url = 'https://github.com/jchuong'
def export(self, body, chain, key, options, **kwargs):
"""
Creates CSR from certificate
:param key:
:param chain:
:param body:
:param options:
:param kwargs:
"""
with mktemppath() as output_tmp:
if not key:
raise Exception("Private Key required by CSR")
create_csr(body, chain, output_tmp, key)
extension = "csr"
with open(output_tmp, 'rb') as f:
raw = f.read()
# passphrase is None
return extension, None, raw

View File

@ -0,0 +1 @@
from lemur.tests.conftest import * # noqa

View File

@ -0,0 +1,13 @@
import pytest
from lemur.tests.vectors import INTERNAL_PRIVATE_KEY_A_STR, INTERNAL_CERTIFICATE_A_STR
def test_export_certificate_to_csr(app):
from lemur.plugins.base import plugins
p = plugins.get('openssl-csr')
options = []
with pytest.raises(Exception):
p.export(INTERNAL_CERTIFICATE_A_STR, "", "", options)
raw = p.export(INTERNAL_CERTIFICATE_A_STR, "", INTERNAL_PRIVATE_KEY_A_STR, options)
assert raw != b""

View File

@ -197,6 +197,7 @@ setup(
'digicert_issuer = lemur.plugins.lemur_digicert.plugin:DigiCertIssuerPlugin', 'digicert_issuer = lemur.plugins.lemur_digicert.plugin:DigiCertIssuerPlugin',
'digicert_cis_issuer = lemur.plugins.lemur_digicert.plugin:DigiCertCISIssuerPlugin', 'digicert_cis_issuer = lemur.plugins.lemur_digicert.plugin:DigiCertCISIssuerPlugin',
'digicert_cis_source = lemur.plugins.lemur_digicert.plugin:DigiCertCISSourcePlugin' 'digicert_cis_source = lemur.plugins.lemur_digicert.plugin:DigiCertCISSourcePlugin'
'csr_export = lemur.plugins.lemur_csr.plugin:CSRExportPlugin',
], ],
}, },
classifiers=[ classifiers=[