diff --git a/lemur/plugins/lemur_acme/plugin.py b/lemur/plugins/lemur_acme/plugin.py index f02133a4..b8cbdc55 100644 --- a/lemur/plugins/lemur_acme/plugin.py +++ b/lemur/plugins/lemur_acme/plugin.py @@ -49,6 +49,29 @@ class AuthorizationRecord(object): class AcmeHandler(object): + def reuse_account(self, authority): + if not authority.options: + raise InvalidAuthority("Invalid authority. Options not set") + existing_key = False + existing_regr = False + + for option in json.loads(authority.options): + if option["name"] == "acme_private_key" and option["value"]: + existing_key = True + if option["name"] == "acme_regr" and option["value"]: + existing_regr = True + + if not existing_key and current_app.config.get("ACME_PRIVATE_KEY"): + existing_key = True + + if not existing_regr and current_app.config.get("ACME_REGR"): + existing_regr = True + + if existing_key and existing_regr: + return True + else: + return False + def strip_wildcard(self, host): """Removes the leading *. and returns Host and whether it was removed or not (True/False)""" prefix = "*." @@ -755,6 +778,28 @@ class ACMEIssuerPlugin(IssuerPlugin): # Needed to override issuer function. pass + def revoke_certificate(self, certificate, comments): + self.acme = AcmeDnsHandler() + if not self.acme.reuse_account(certificate.authority): + raise InvalidConfiguration("There is no ACME account saved, unable to revoke the certificate.") + acme_client, _ = self.acme.setup_acme_client(certificate.authority) + + fullchain_com = jose.ComparableX509( + OpenSSL.crypto.load_certificate( + OpenSSL.crypto.FILETYPE_PEM, certificate.body)) + + try: + acme_client.revoke(fullchain_com, 0) # revocation reason = 0 + except (errors.ConflictError, errors.ClientError, errors.Error) as e: + # Certificate already revoked. + current_app.logger.error("Certificate revocation failed with message: " + e.detail) + metrics.send("acme_revoke_certificate_failure", "counter", 1) + return False + + current_app.logger.warning("Certificate succesfully revoked: " + certificate.name) + metrics.send("acme_revoke_certificate_success", "counter", 1) + return True + class ACMEHttpIssuerPlugin(IssuerPlugin): title = "Acme HTTP-01" @@ -884,8 +929,8 @@ class ACMEHttpIssuerPlugin(IssuerPlugin): len(pem_certificate): # noqa ].lstrip() - # TODO add external ID (if possible) - return pem_certificate, pem_certificate_chain, None + # validation is a random string, we use it as external id, to make it possible to implement revoke_certificate + return pem_certificate, pem_certificate_chain, validation[0:128] @staticmethod def create_authority(options): @@ -913,3 +958,25 @@ class ACMEHttpIssuerPlugin(IssuerPlugin): def cancel_ordered_certificate(self, pending_cert, **kwargs): # Needed to override issuer function. pass + + def revoke_certificate(self, certificate, comments): + self.acme = AcmeHandler() + if not self.acme.reuse_account(certificate.authority): + raise InvalidConfiguration("There is no ACME account saved, unable to revoke the certificate.") + acme_client, _ = self.acme.setup_acme_client(certificate.authority) + + fullchain_com = jose.ComparableX509( + OpenSSL.crypto.load_certificate( + OpenSSL.crypto.FILETYPE_PEM, certificate.body)) + + try: + acme_client.revoke(fullchain_com, 0) # revocation reason = 0 + except (errors.ConflictError, errors.ClientError, errors.Error) as e: + # Certificate already revoked. + current_app.logger.error("Certificate revocation failed with message: " + e.detail) + metrics.send("acme_revoke_certificate_failure", "counter", 1) + return False + + current_app.logger.warning("Certificate succesfully revoked: " + certificate.name) + metrics.send("acme_revoke_certificate_success", "counter", 1) + return True diff --git a/lemur/plugins/lemur_acme/tests/test_acme_handler.py b/lemur/plugins/lemur_acme/tests/test_acme_handler.py index b586aa9f..57ddcee6 100644 --- a/lemur/plugins/lemur_acme/tests/test_acme_handler.py +++ b/lemur/plugins/lemur_acme/tests/test_acme_handler.py @@ -28,6 +28,34 @@ class TestAcmeHandler(unittest.TestCase): with self.assertRaises(Exception): self.acme.setup_acme_client(mock_authority) + def test_reuse_account_not_defined(self): + mock_authority = Mock() + mock_authority.options = [] + with self.assertRaises(Exception): + self.acme.reuse_account(mock_authority) + + def test_reuse_account_from_authority(self): + mock_authority = Mock() + mock_authority.options = '[{"name": "acme_private_key", "value": "PRIVATE_KEY"}, {"name": "acme_regr", "value": "ACME_REGR"}]' + + self.assertTrue(self.acme.reuse_account(mock_authority)) + + @patch("lemur.plugins.lemur_acme.plugin.current_app") + def test_reuse_account_from_config(self, mock_current_app): + mock_authority = Mock() + mock_authority.options = '[{"name": "mock_name", "value": "mock_value"}]' + mock_current_app.config = {"ACME_PRIVATE_KEY": "PRIVATE_KEY", "ACME_REGR": "ACME_REGR"} + + self.assertTrue(self.acme.reuse_account(mock_authority)) + + @patch("lemur.plugins.lemur_acme.plugin.current_app") + def test_reuse_account_no_configuration(self, mock_current_app): + mock_authority = Mock() + mock_authority.options = '[{"name": "mock_name", "value": "mock_value"}]' + mock_current_app.config = {} + + self.assertFalse(self.acme.reuse_account(mock_authority)) + @patch("lemur.plugins.lemur_acme.plugin.BackwardsCompatibleClientV2") @patch("lemur.plugins.lemur_acme.plugin.current_app") def test_setup_acme_client_success(self, mock_current_app, mock_acme):