134 lines
5.4 KiB
Python
134 lines
5.4 KiB
Python
from lemur.plugins.bases import IssuerPlugin, SourcePlugin
|
|
import requests
|
|
from lemur.plugins import lemur_adcs as ADCS
|
|
from certsrv import Certsrv
|
|
from OpenSSL import crypto
|
|
from flask import current_app
|
|
|
|
|
|
class ADCSIssuerPlugin(IssuerPlugin):
    """Issuer plugin that requests certificates from ADCS via ``certsrv``.

    Connection settings (``ADCS_SERVER``, ``ADCS_USER``, ``ADCS_PWD``,
    ``ADCS_AUTH_METHOD``) and the certificate template are read from the
    Flask application config at call time.
    """

    title = "ADCS"
    slug = "adcs-issuer"
    description = "Enables the creation of certificates by ADCS (Active Directory Certificate Services)"
    version = ADCS.VERSION

    author = "sirferl"
    author_url = "https://github.com/sirferl/lemur"

    def __init__(self, *args, **kwargs):
        """Initialize the issuer with the appropriate details."""
        self.session = requests.Session()
        super(ADCSIssuerPlugin, self).__init__(*args, **kwargs)

    @staticmethod
    def create_authority(options):
        """Create an authority.

        Creates an authority, this authority is then used by Lemur to
        allow a user to specify which Certificate Authority they want
        to sign their certificate.

        :param options: authority options; not used by this implementation
        :return: a 3-tuple of the ``ADCS_ROOT`` config value, the
            ``ADCS_ISSUING`` config value and a list holding a single
            role dict named ``adcs`` with empty credentials
        """
        adcs_root = current_app.config.get("ADCS_ROOT")
        adcs_issuing = current_app.config.get("ADCS_ISSUING")
        role = {"username": "", "password": "", "name": "adcs"}
        return adcs_root, adcs_issuing, [role]

    def create_certificate(self, csr, issuer_options):
        """Submit ``csr`` to the ADCS server and return the issued certificate.

        :param csr: the certificate signing request passed to ``get_cert``
        :param issuer_options: mapping whose ``"authority"`` entry has a
            ``name`` attribute; the name selects the certificate template
        :return: a 3-tuple ``(cert, chain, None)`` where ``cert`` and
            ``chain`` are the base64 responses decoded as UTF-8 with
            ``\\r\\n`` line endings normalized to ``\\n``
        """
        config = current_app.config
        adcs_server = config.get("ADCS_SERVER")
        adcs_user = config.get("ADCS_USER")
        adcs_pwd = config.get("ADCS_PWD")
        adcs_auth_method = config.get("ADCS_AUTH_METHOD")

        # if there is a config variable ADCS_TEMPLATE_<upper(authority.name)> take the value as Cert template
        # else default to ADCS_TEMPLATE to be compatible with former versions
        authority = issuer_options.get("authority").name.upper()
        adcs_template = config.get(
            "ADCS_TEMPLATE_{0}".format(authority), config.get("ADCS_TEMPLATE")
        )

        ca_server = Certsrv(
            adcs_server, adcs_user, adcs_pwd, auth_method=adcs_auth_method
        )
        current_app.logger.info("Requesting CSR: {0}".format(csr))
        current_app.logger.info("Issuer options: {0}".format(issuer_options))

        cert = (
            ca_server.get_cert(csr, adcs_template, encoding="b64")
            .decode("utf-8")
            .replace("\r\n", "\n")
        )
        chain = (
            ca_server.get_ca_cert(encoding="b64").decode("utf-8").replace("\r\n", "\n")
        )
        return cert, chain, None

    def revoke_certificate(self, certificate, comments):
        """Not supported by this plugin.

        :raises NotImplementedError: always
        """
        raise NotImplementedError("Not implemented\n", self, certificate, comments)

    def get_ordered_certificate(self, order_id):
        """Not supported by this plugin.

        :raises NotImplementedError: always
        """
        raise NotImplementedError("Not implemented\n", self, order_id)

    def canceled_ordered_certificate(self, pending_cert, **kwargs):
        """Not supported by this plugin.

        :raises NotImplementedError: always
        """
        # Exception constructors reject keyword arguments, so forwarding
        # **kwargs would raise TypeError instead of NotImplementedError.
        # Attach the keyword arguments as one positional detail instead.
        details = ("Not implemented\n", self, pending_cert)
        if kwargs:
            details += (kwargs,)
        raise NotImplementedError(*details)
|
|
|
|
|
|
class ADCSSourcePlugin(SourcePlugin):
    """Source plugin that collects already-issued certificates from ADCS.

    Certificates are fetched by request id over the range
    ``ADCS_START`` .. ``ADCS_STOP`` (stop excluded) taken from the Flask
    application config.
    """

    title = "ADCS"
    slug = "adcs-source"
    description = "Enables the collecion of certificates"
    version = ADCS.VERSION

    author = "sirferl"
    author_url = "https://github.com/sirferl/lemur"

    # Placeholder option; per its help message it exists only to prevent
    # an error.
    options = [
        {
            "name": "dummy",
            "type": "str",
            "required": False,
            "validation": "/^[0-9]{12,12}$/",
            "helpMessage": "Just to prevent error",
        }
    ]

    def get_certificates(self, options, **kwargs):
        """Collect server-authentication certificates from the ADCS server.

        Every id in ``range(ADCS_START, ADCS_STOP)`` is requested. A
        certificate is kept when one of its extensions renders as text
        containing ``TLS Web Server Authentication``.

        :param options: source options; not used by this implementation
        :param kwargs: ignored
        :return: list of dicts with keys ``"name"`` (subject CN) and
            ``"body"`` (certificate text with ``\\n`` line endings)
        """
        config = current_app.config
        adcs_server = config.get("ADCS_SERVER")
        adcs_user = config.get("ADCS_USER")
        adcs_pwd = config.get("ADCS_PWD")
        adcs_auth_method = config.get("ADCS_AUTH_METHOD")
        adcs_start = config.get("ADCS_START")
        adcs_stop = config.get("ADCS_STOP")

        ca_server = Certsrv(
            adcs_server, adcs_user, adcs_pwd, auth_method=adcs_auth_method
        )

        out_certlist = []
        for cert_id in range(adcs_start, adcs_stop):
            try:
                cert = (
                    ca_server.get_existing_cert(cert_id, encoding="b64")
                    .decode("utf-8")
                    .replace("\r\n", "\n")
                )
            except Exception as err:
                # Use a membership test: str.find() returns -1 (truthy)
                # when the marker is absent, which would stop the scan on
                # any unrelated error.
                if "CERTSRV_E_PROPERTY_EMPTY" in "{0}".format(err):
                    # this error indicates end of certificate list(?), so we stop
                    break
                # We do nothing in case there is no certificate returned for other reasons
                current_app.logger.info("Error with id {0}: {1}".format(cert_id, err))
                continue

            # we have a certificate
            x509_cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert)
            # loop through ALL extensions (indices 0 .. count - 1) to see if
            # we find "TLS Web Server Authentication"
            for e_id in range(x509_cert.get_extension_count()):
                try:
                    extension = "{0}".format(x509_cert.get_extension(e_id))
                except Exception:
                    # An extension that cannot be rendered is treated as
                    # non-matching rather than aborting the scan.
                    extension = ""
                if "TLS Web Server Authentication" in extension:
                    out_certlist.append(
                        {"name": format(x509_cert.get_subject().CN), "body": cert}
                    )
                    break
        return out_certlist

    def get_endpoints(self, options, **kwargs):
        """Not supported by this plugin.

        :raises NotImplementedError: always
        """
        # There are no endpoints in the ADCS
        # Exception constructors reject keyword arguments, so forwarding
        # **kwargs would raise TypeError instead of NotImplementedError.
        details = ("Not implemented\n", self, options)
        if kwargs:
            details += (kwargs,)
        raise NotImplementedError(*details)
|