Implement revoke certificate for ACME
This commit is contained in:
parent
215070b327
commit
81b078604c
|
@ -49,6 +49,29 @@ class AuthorizationRecord(object):
|
||||||
|
|
||||||
class AcmeHandler(object):
|
class AcmeHandler(object):
|
||||||
|
|
||||||
|
def reuse_account(self, authority):
|
||||||
|
if not authority.options:
|
||||||
|
raise InvalidAuthority("Invalid authority. Options not set")
|
||||||
|
existing_key = False
|
||||||
|
existing_regr = False
|
||||||
|
|
||||||
|
for option in json.loads(authority.options):
|
||||||
|
if option["name"] == "acme_private_key" and option["value"]:
|
||||||
|
existing_key = True
|
||||||
|
if option["name"] == "acme_regr" and option["value"]:
|
||||||
|
existing_regr = True
|
||||||
|
|
||||||
|
if not existing_key and current_app.config.get("ACME_PRIVATE_KEY"):
|
||||||
|
existing_key = True
|
||||||
|
|
||||||
|
if not existing_regr and current_app.config.get("ACME_REGR"):
|
||||||
|
existing_regr = True
|
||||||
|
|
||||||
|
if existing_key and existing_regr:
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
return False
|
||||||
|
|
||||||
def strip_wildcard(self, host):
|
def strip_wildcard(self, host):
|
||||||
"""Removes the leading *. and returns Host and whether it was removed or not (True/False)"""
|
"""Removes the leading *. and returns Host and whether it was removed or not (True/False)"""
|
||||||
prefix = "*."
|
prefix = "*."
|
||||||
|
@ -755,6 +778,28 @@ class ACMEIssuerPlugin(IssuerPlugin):
|
||||||
# Needed to override issuer function.
|
# Needed to override issuer function.
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def revoke_certificate(self, certificate, comments):
|
||||||
|
self.acme = AcmeDnsHandler()
|
||||||
|
if not self.acme.reuse_account(certificate.authority):
|
||||||
|
raise InvalidConfiguration("There is no ACME account saved, unable to revoke the certificate.")
|
||||||
|
acme_client, _ = self.acme.setup_acme_client(certificate.authority)
|
||||||
|
|
||||||
|
fullchain_com = jose.ComparableX509(
|
||||||
|
OpenSSL.crypto.load_certificate(
|
||||||
|
OpenSSL.crypto.FILETYPE_PEM, certificate.body))
|
||||||
|
|
||||||
|
try:
|
||||||
|
acme_client.revoke(fullchain_com, 0) # revocation reason = 0
|
||||||
|
except (errors.ConflictError, errors.ClientError, errors.Error) as e:
|
||||||
|
# Certificate already revoked.
|
||||||
|
current_app.logger.error("Certificate revocation failed with message: " + e.detail)
|
||||||
|
metrics.send("acme_revoke_certificate_failure", "counter", 1)
|
||||||
|
return False
|
||||||
|
|
||||||
|
current_app.logger.warning("Certificate succesfully revoked: " + certificate.name)
|
||||||
|
metrics.send("acme_revoke_certificate_success", "counter", 1)
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
class ACMEHttpIssuerPlugin(IssuerPlugin):
|
class ACMEHttpIssuerPlugin(IssuerPlugin):
|
||||||
title = "Acme HTTP-01"
|
title = "Acme HTTP-01"
|
||||||
|
@ -884,8 +929,8 @@ class ACMEHttpIssuerPlugin(IssuerPlugin):
|
||||||
len(pem_certificate): # noqa
|
len(pem_certificate): # noqa
|
||||||
].lstrip()
|
].lstrip()
|
||||||
|
|
||||||
# TODO add external ID (if possible)
|
# validation is a random string, we use it as external id, to make it possible to implement revoke_certificate
|
||||||
return pem_certificate, pem_certificate_chain, None
|
return pem_certificate, pem_certificate_chain, validation[0:128]
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def create_authority(options):
|
def create_authority(options):
|
||||||
|
@ -913,3 +958,25 @@ class ACMEHttpIssuerPlugin(IssuerPlugin):
|
||||||
def cancel_ordered_certificate(self, pending_cert, **kwargs):
|
def cancel_ordered_certificate(self, pending_cert, **kwargs):
|
||||||
# Needed to override issuer function.
|
# Needed to override issuer function.
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def revoke_certificate(self, certificate, comments):
|
||||||
|
self.acme = AcmeHandler()
|
||||||
|
if not self.acme.reuse_account(certificate.authority):
|
||||||
|
raise InvalidConfiguration("There is no ACME account saved, unable to revoke the certificate.")
|
||||||
|
acme_client, _ = self.acme.setup_acme_client(certificate.authority)
|
||||||
|
|
||||||
|
fullchain_com = jose.ComparableX509(
|
||||||
|
OpenSSL.crypto.load_certificate(
|
||||||
|
OpenSSL.crypto.FILETYPE_PEM, certificate.body))
|
||||||
|
|
||||||
|
try:
|
||||||
|
acme_client.revoke(fullchain_com, 0) # revocation reason = 0
|
||||||
|
except (errors.ConflictError, errors.ClientError, errors.Error) as e:
|
||||||
|
# Certificate already revoked.
|
||||||
|
current_app.logger.error("Certificate revocation failed with message: " + e.detail)
|
||||||
|
metrics.send("acme_revoke_certificate_failure", "counter", 1)
|
||||||
|
return False
|
||||||
|
|
||||||
|
current_app.logger.warning("Certificate succesfully revoked: " + certificate.name)
|
||||||
|
metrics.send("acme_revoke_certificate_success", "counter", 1)
|
||||||
|
return True
|
||||||
|
|
|
@ -28,6 +28,34 @@ class TestAcmeHandler(unittest.TestCase):
|
||||||
with self.assertRaises(Exception):
|
with self.assertRaises(Exception):
|
||||||
self.acme.setup_acme_client(mock_authority)
|
self.acme.setup_acme_client(mock_authority)
|
||||||
|
|
||||||
|
def test_reuse_account_not_defined(self):
|
||||||
|
mock_authority = Mock()
|
||||||
|
mock_authority.options = []
|
||||||
|
with self.assertRaises(Exception):
|
||||||
|
self.acme.reuse_account(mock_authority)
|
||||||
|
|
||||||
|
def test_reuse_account_from_authority(self):
|
||||||
|
mock_authority = Mock()
|
||||||
|
mock_authority.options = '[{"name": "acme_private_key", "value": "PRIVATE_KEY"}, {"name": "acme_regr", "value": "ACME_REGR"}]'
|
||||||
|
|
||||||
|
self.assertTrue(self.acme.reuse_account(mock_authority))
|
||||||
|
|
||||||
|
@patch("lemur.plugins.lemur_acme.plugin.current_app")
|
||||||
|
def test_reuse_account_from_config(self, mock_current_app):
|
||||||
|
mock_authority = Mock()
|
||||||
|
mock_authority.options = '[{"name": "mock_name", "value": "mock_value"}]'
|
||||||
|
mock_current_app.config = {"ACME_PRIVATE_KEY": "PRIVATE_KEY", "ACME_REGR": "ACME_REGR"}
|
||||||
|
|
||||||
|
self.assertTrue(self.acme.reuse_account(mock_authority))
|
||||||
|
|
||||||
|
@patch("lemur.plugins.lemur_acme.plugin.current_app")
|
||||||
|
def test_reuse_account_no_configuration(self, mock_current_app):
|
||||||
|
mock_authority = Mock()
|
||||||
|
mock_authority.options = '[{"name": "mock_name", "value": "mock_value"}]'
|
||||||
|
mock_current_app.config = {}
|
||||||
|
|
||||||
|
self.assertFalse(self.acme.reuse_account(mock_authority))
|
||||||
|
|
||||||
@patch("lemur.plugins.lemur_acme.plugin.BackwardsCompatibleClientV2")
|
@patch("lemur.plugins.lemur_acme.plugin.BackwardsCompatibleClientV2")
|
||||||
@patch("lemur.plugins.lemur_acme.plugin.current_app")
|
@patch("lemur.plugins.lemur_acme.plugin.current_app")
|
||||||
def test_setup_acme_client_success(self, mock_current_app, mock_acme):
|
def test_setup_acme_client_success(self, mock_current_app, mock_acme):
|
||||||
|
|
Loading…
Reference in New Issue