repaired several lint errors

This commit is contained in:
sirferl 2019-01-07 10:02:37 +01:00
parent f02178c154
commit c62bcd1456
1 changed files with 32 additions and 36 deletions

View File

@ -1,12 +1,11 @@
from lemur.plugins.bases import IssuerPlugin, SourcePlugin from lemur.plugins.bases import IssuerPlugin, SourcePlugin
import requests import requests
import datetime
import lemur_adcs as ADCS import lemur_adcs as ADCS
from certsrv import Certsrv from certsrv import Certsrv
import ssl
from OpenSSL import crypto from OpenSSL import crypto
from flask import current_app from flask import current_app
class ADCSIssuerPlugin(IssuerPlugin): class ADCSIssuerPlugin(IssuerPlugin):
title = 'ADCS' title = 'ADCS'
slug = 'adcs-issuer' slug = 'adcs-issuer'
@ -31,32 +30,33 @@ class ADCSIssuerPlugin(IssuerPlugin):
:param options: :param options:
:return: :return:
""" """
adcs_root = current_app.config.get('ADCS_ROOT')
adcs_issuing = current_app.config.get('ADCS_ISSUING')
role = {'username': '', 'password': '', 'name': 'adcs'} role = {'username': '', 'password': '', 'name': 'adcs'}
return constants.ADCS_ROOT, constants.ADCS_ISSUING, [role] return adcs_root, adcs_issuing, [role]
def create_certificate(self, csr, issuer_options): def create_certificate(self, csr, issuer_options):
adcs_server = current_app.config.get('ADCS_SERVER') adcs_server = current_app.config.get('ADCS_SERVER')
adcs_user = current_app.config.get('ADCS_USER') adcs_user = current_app.config.get('ADCS_USER')
adcs_pwd = current_app.config.get('ADCS_PWD') adcs_pwd = current_app.config.get('ADCS_PWD')
adcs_auth_method = current_app.config.get('ADCS_AUTH_METHOD') adcs_auth_method = current_app.config.get('ADCS_AUTH_METHOD')
ca_server = Certsrv(adcs_server, adcs_user, adcs_pwd, auth_method = adcs_auth_method) adcs_template = current_app.config.get('ADCS_TEMPLATE')
ca_server = Certsrv(adcs_server, adcs_user, adcs_pwd, auth_method=adcs_auth_method)
current_app.logger.info("Requesting CSR: {0}".format(csr)) current_app.logger.info("Requesting CSR: {0}".format(csr))
current_app.logger.info("Issuer options: {0}".format(issuer_options)) current_app.logger.info("Issuer options: {0}".format(issuer_options))
cert, req_id = ca_server.get_cert(csr, ADCS_TEMPLATE, encoding='b64').decode('utf-8').replace('\r\n', '\n') cert, req_id = ca_server.get_cert(csr, adcs_template, encoding='b64').decode('utf-8').replace('\r\n', '\n')
chain = ca_server.get_ca_cert(encoding='b64').decode('utf-8').replace('\r\n', '\n') chain = ca_server.get_ca_cert(encoding='b64').decode('utf-8').replace('\r\n', '\n')
return cert, chain, req_id return cert, chain, req_id
def revoke_certificate(self, certificate, comments): def revoke_certificate(self, certificate, comments):
# requests.put('a third party') raise NotImplementedError('Not implemented\n', self, certificate, comments)
raise NotImplementedError('Not implemented\n', self,certificate, comments)
def get_ordered_certificate(self, order_id): def get_ordered_certificate(self, order_id):
# requests.get('already existing certificate') raise NotImplementedError('Not implemented\n', self, order_id)
raise NotImplementedError('Not implemented\n',self, order_id)
def canceled_ordered_certificate(self, pending_cert, **kwargs): def canceled_ordered_certificate(self, pending_cert, **kwargs):
# requests.put('cancel an order that has yet to be issued') raise NotImplementedError('Not implemented\n', self, pending_cert, **kwargs)
raise NotImplementedError('Not implemented\n',self, pending_cert, **kwargs)
class ADCSSourcePlugin(SourcePlugin): class ADCSSourcePlugin(SourcePlugin):
title = 'ADCS' title = 'ADCS'
@ -67,54 +67,50 @@ class ADCSSourcePlugin(SourcePlugin):
author = 'sirferl' author = 'sirferl'
author_url = 'https://github.com/sirferl/lemur' author_url = 'https://github.com/sirferl/lemur'
options = [ options = [
{ {
'name': 'dummy', 'name': 'dummy',
'type': 'str', 'type': 'str',
'required': False, 'required': False,
'validation': '/^[0-9]{12,12}$/', 'validation': '/^[0-9]{12,12}$/',
'helpMessage': 'Just to prevent error' 'helpMessage': 'Just to prevent error'
} }
] ]
def get_certificates(self,options, **kwargs): def get_certificates(self, options, **kwargs):
adcs_server = current_app.config.get('ADCS_SERVER') adcs_server = current_app.config.get('ADCS_SERVER')
adcs_user = current_app.config.get('ADCS_USER') adcs_user = current_app.config.get('ADCS_USER')
adcs_pwd = current_app.config.get('ADCS_PWD') adcs_pwd = current_app.config.get('ADCS_PWD')
adcs_auth_method = current_app.config.get('ADCS_AUTH_METHOD') adcs_auth_method = current_app.config.get('ADCS_AUTH_METHOD')
adcs_start = current_app.config.get('ADCS_START') adcs_start = current_app.config.get('ADCS_START')
adcs_stop = current_app.config.get('ADCS_STOP') adcs_stop = current_app.config.get('ADCS_STOP')
ca_server = Certsrv(adcs_server, adcs_user, adcs_pwd, auth_method = adcs_auth_method) ca_server = Certsrv(adcs_server, adcs_user, adcs_pwd, auth_method=adcs_auth_method)
out_certlist = [] out_certlist = []
for id in range(adcs_start,adcs_stop): for id in range(adcs_start, adcs_stop):
try: try:
cert = ca_server.get_existing_cert(id, encoding='b64').decode('utf-8').replace('\r\n', '\n') cert = ca_server.get_existing_cert(id, encoding='b64').decode('utf-8').replace('\r\n', '\n')
except Exception as err: except Exception as err:
if '{0}'.format(err).find("CERTSRV_E_PROPERTY_EMPTY"): if '{0}'.format(err).find("CERTSRV_E_PROPERTY_EMPTY"):
#this error indicates end of certificate list(?), so we stop # this error indicates end of certificate list(?), so we stop
break break
else: else:
# We do nothing in case there is no certificate returned with the current id for other reasons # We do nothing in case there is no certificate returned with the current id for other reasons
current_app.logger.info("Error with id {0}: {1}".format(id, err)) current_app.logger.info("Error with id {0}: {1}".format(id, err))
else: else:
#we have a certificate # we have a certificate
pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, cert) pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
#loop through extensions to see if we find "TLS Web Server Authentication" # loop through extensions to see if we find "TLS Web Server Authentication"
for e_id in range(0,pubkey.get_extension_count()-1): for e_id in range(0, pubkey.get_extension_count() - 1):
try: try:
extension = '{0}'.format(pubkey.get_extension(e_id)) extension = '{0}'.format(pubkey.get_extension(e_id))
except: except Exception:
extensionn = '' extensionn = ''
if extension.find("TLS Web Server Authentication") != -1: if extension.find("TLS Web Server Authentication") != -1:
out_certlist.append ( { out_certlist.append({
'name': format(pubkey.get_subject().CN), 'name': format(pubkey.get_subject().CN),
'body' : cert}) 'body': cert})
break break
return out_certlist return out_certlist
def get_endpoints(self, options, **kwargs):
def get_endpoints(self, options, **kwargs):
# There are no endpoints in the ADCS # There are no endpoints in the ADCS
raise NotImplementedError('Not implemented\n',self, options, **kwargs) raise NotImplementedError('Not implemented\n', self, options, **kwargs)