From c62bcd1456bc35198a5895588e6ab042d0213fe5 Mon Sep 17 00:00:00 2001 From: sirferl Date: Mon, 7 Jan 2019 10:02:37 +0100 Subject: [PATCH] repaired several lint errors --- lemur/plugins/lemur_adcs/plugin.py | 68 ++++++++++++++---------------- 1 file changed, 32 insertions(+), 36 deletions(-) diff --git a/lemur/plugins/lemur_adcs/plugin.py b/lemur/plugins/lemur_adcs/plugin.py index 48a3e85b..31dba7b2 100644 --- a/lemur/plugins/lemur_adcs/plugin.py +++ b/lemur/plugins/lemur_adcs/plugin.py @@ -1,12 +1,11 @@ from lemur.plugins.bases import IssuerPlugin, SourcePlugin import requests -import datetime import lemur_adcs as ADCS from certsrv import Certsrv -import ssl from OpenSSL import crypto from flask import current_app + class ADCSIssuerPlugin(IssuerPlugin): title = 'ADCS' slug = 'adcs-issuer' 
@@ -27,36 +26,37 @@ class ADCSIssuerPlugin(IssuerPlugin): Creates an authority, this authority is then used by Lemur to allow a user to specify which Certificate Authority they want to sign their certificate. - + :param options: :return: """ + adcs_root = current_app.config.get('ADCS_ROOT') + adcs_issuing = current_app.config.get('ADCS_ISSUING') role = {'username': '', 'password': '', 'name': 'adcs'} - return constants.ADCS_ROOT, constants.ADCS_ISSUING, [role] + return adcs_root, adcs_issuing, [role] def create_certificate(self, csr, issuer_options): adcs_server = current_app.config.get('ADCS_SERVER') adcs_user = current_app.config.get('ADCS_USER') adcs_pwd = current_app.config.get('ADCS_PWD') adcs_auth_method = current_app.config.get('ADCS_AUTH_METHOD') - ca_server = Certsrv(adcs_server, adcs_user, adcs_pwd, auth_method = adcs_auth_method) + adcs_template = current_app.config.get('ADCS_TEMPLATE') + ca_server = Certsrv(adcs_server, adcs_user, adcs_pwd, auth_method=adcs_auth_method) current_app.logger.info("Requesting CSR: {0}".format(csr)) current_app.logger.info("Issuer options: {0}".format(issuer_options)) - cert, req_id = ca_server.get_cert(csr, ADCS_TEMPLATE, encoding='b64').decode('utf-8').replace('\r\n', '\n') + cert, req_id = ca_server.get_cert(csr, adcs_template, encoding='b64').decode('utf-8').replace('\r\n', '\n') chain = ca_server.get_ca_cert(encoding='b64').decode('utf-8').replace('\r\n', '\n') return cert, chain, req_id - + def revoke_certificate(self, certificate, comments): - # requests.put('a third party') - raise NotImplementedError('Not implemented\n', self,certificate, comments) - + raise NotImplementedError('Not implemented\n', self, certificate, comments) + def get_ordered_certificate(self, order_id): - # requests.get('already existing certificate') - raise NotImplementedError('Not implemented\n',self, order_id) - + raise NotImplementedError('Not implemented\n', self, order_id) + def canceled_ordered_certificate(self, pending_cert, **kwargs): - 
# requests.put('cancel an order that has yet to be issued') - raise NotImplementedError('Not implemented\n',self, pending_cert, **kwargs) + raise NotImplementedError('Not implemented\n', self, pending_cert, **kwargs) + class ADCSSourcePlugin(SourcePlugin): title = 'ADCS' 
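Review bot: In `create_certificate`, the line this hunk rewrites still unpacks a string into two names: `ca_server.get_cert(...).decode('utf-8').replace('\r\n', '\n')` evaluates to a single `str`, so `cert, req_id = ...` raises `ValueError` for any real certificate and issuance cannot succeed. Assign the result to `cert` alone, `cert = ca_server.get_cert(csr, adcs_template, encoding='b64').decode('utf-8').replace('\r\n', '\n')`, and return `cert, chain, None` unless the request id can be obtained from the CA some other way. The values now read with `current_app.config.get(...)` (`ADCS_ROOT`, `ADCS_ISSUING`, `ADCS_TEMPLATE`) are `None` when a key is missing, so a misconfigured deployment would pass `None` on as a root certificate or template name. Failing with a clear error on a missing key would be safer.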
@@ -67,54 +67,50 @@ class ADCSSourcePlugin(SourcePlugin): author = 'sirferl' author_url = 'https://github.com/sirferl/lemur' options = [ - { + { 'name': 'dummy', 'type': 'str', 'required': False, 'validation': '/^[0-9]{12,12}$/', 'helpMessage': 'Just to prevent error' } - ] - - def get_certificates(self,options, **kwargs): + + def get_certificates(self, options, **kwargs): adcs_server = current_app.config.get('ADCS_SERVER') adcs_user = current_app.config.get('ADCS_USER') adcs_pwd = current_app.config.get('ADCS_PWD') adcs_auth_method = current_app.config.get('ADCS_AUTH_METHOD') adcs_start = current_app.config.get('ADCS_START') adcs_stop = current_app.config.get('ADCS_STOP') - ca_server = Certsrv(adcs_server, adcs_user, adcs_pwd, auth_method = adcs_auth_method) + ca_server = Certsrv(adcs_server, adcs_user, adcs_pwd, auth_method=adcs_auth_method) out_certlist = [] - for id in range(adcs_start,adcs_stop): - try: + for id in range(adcs_start, adcs_stop): + try: cert = ca_server.get_existing_cert(id, encoding='b64').decode('utf-8').replace('\r\n', '\n') except Exception as err: if '{0}'.format(err).find("CERTSRV_E_PROPERTY_EMPTY"): - #this error indicates end of certificate list(?), so we stop + # this error indicates end of certificate list(?), so we stop break else: # We do nothing in case there is no certificate returned with the current id for other reasons current_app.logger.info("Error with id {0}: {1}".format(id, err)) - else: - #we have a certificate + else: + # we have a certificate pubkey = crypto.load_certificate(crypto.FILETYPE_PEM, cert) - #loop through extensions to see if we find "TLS Web Server Authentication" - for e_id in range(0,pubkey.get_extension_count()-1): + # loop through extensions to see if we find "TLS Web Server Authentication" + for e_id in range(0, pubkey.get_extension_count() - 1): try: extension = '{0}'.format(pubkey.get_extension(e_id)) - except: + except Exception: extensionn = '' - if extension.find("TLS Web Server Authentication") != 
-1: - out_certlist.append ( { + if extension.find("TLS Web Server Authentication") != -1: + out_certlist.append({ 'name': format(pubkey.get_subject().CN), - 'body' : cert}) + 'body': cert}) break - return out_certlist - - def get_endpoints(self, options, **kwargs): + def get_endpoints(self, options, **kwargs): # There are no endpoints in the ADCS - raise NotImplementedError('Not implemented\n',self, options, **kwargs) - + raise NotImplementedError('Not implemented\n', self, options, **kwargs)