From 76dcfbd528aa3a835e0062be0685ee5cba7edbca Mon Sep 17 00:00:00 2001 From: Mathias Petermann Date: Wed, 30 Sep 2020 11:35:27 +0200 Subject: [PATCH] Add more tests --- lemur/plugins/lemur_acme/plugin.py | 1 + .../lemur_acme/tests/test_acme_http.py | 132 +++++++++++++----- 2 files changed, 100 insertions(+), 33 deletions(-) diff --git a/lemur/plugins/lemur_acme/plugin.py b/lemur/plugins/lemur_acme/plugin.py index d42573df..a3d15b6d 100644 --- a/lemur/plugins/lemur_acme/plugin.py +++ b/lemur/plugins/lemur_acme/plugin.py @@ -841,6 +841,7 @@ class ACMEHttpIssuerPlugin(IssuerPlugin): else: # Here we probably should create a pending certificate and make use of celery, but for now # I'll ignore all of that + token_destination = None for option in json.loads(issuer_options["authority"].options): if option["name"] == "tokenDestination": token_destination = destination_service.get_by_label(option["value"]) diff --git a/lemur/plugins/lemur_acme/tests/test_acme_http.py b/lemur/plugins/lemur_acme/tests/test_acme_http.py index f6183062..14d46344 100644 --- a/lemur/plugins/lemur_acme/tests/test_acme_http.py +++ b/lemur/plugins/lemur_acme/tests/test_acme_http.py @@ -3,7 +3,6 @@ from unittest.mock import patch, Mock from acme import challenges from lemur.plugins.lemur_acme import plugin -from mock import MagicMock class TestAcmeHttp(unittest.TestCase): @@ -25,33 +24,6 @@ class TestAcmeHttp(unittest.TestCase): mock_destination_provider.plugin_name = "sftp-destination" self.ACMEHttpIssuerPlugin.destination_list = ["mock-sftp-destination", "mock-s3-destination"] - @patch("acme.client.Client") - @patch("OpenSSL.crypto", return_value="mock_cert") - @patch("josepy.util.ComparableX509") - @patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.get_dns_challenges") - @patch("lemur.plugins.lemur_acme.plugin.current_app") - def test_request_certificate( - self, - mock_current_app, - mock_get_dns_challenges, - mock_jose, - mock_crypto, - mock_acme, - ): - mock_cert_response = Mock() - mock_cert_response.body = "123" - mock_cert_response_full = [mock_cert_response, True] - mock_acme.poll_and_request_issuance = Mock(return_value=mock_cert_response_full) - mock_authz = [] - mock_authz_record = MagicMock() - mock_authz_record.authz = Mock() - mock_authz.append(mock_authz_record) - mock_acme.fetch_chain = Mock(return_value="mock_chain") - mock_crypto.dump_certificate = Mock(return_value=b"chain") - mock_order = Mock() - mock_current_app.config = {} - self.acme.request_certificate(mock_acme, [], mock_order) - @patch("lemur.plugins.lemur_acme.plugin.current_app") def test_create_authority(self, mock_current_app): mock_current_app.config = Mock() @@ -97,11 +69,6 @@ class TestAcmeHttp(unittest.TestCase): mock_client.new_order.return_value = mock_order_resource mock_acme.return_value = (mock_client, "") - mock_dns_provider = Mock() - mock_dns_provider.credentials = '{"account_id": 1}' - mock_dns_provider.provider_type = "route53" - mock_dns_provider_service.get.return_value = mock_dns_provider - mock_destination = Mock() mock_destination.label = "mock-sftp-destination" mock_destination.plugin_name = "SFTPDestinationPlugin" @@ -120,3 +87,102 @@ class TestAcmeHttp(unittest.TestCase): mock_request_certificate.return_value = ("pem_certificate", "chain") result = provider.create_certificate(csr, issuer_options) assert result + + @patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.setup_acme_client") + @patch("lemur.plugins.base.manager.PluginManager.get") + @patch("lemur.plugins.lemur_acme.plugin.destination_service") + @patch("lemur.plugins.lemur_acme.plugin.dns_provider_service") + @patch("lemur.plugins.lemur_acme.plugin.current_app") + @patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.get_authorizations") + @patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.finalize_authorizations") + @patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.request_certificate") + @patch("lemur.plugins.lemur_acme.plugin.authorization_service") + def test_create_certificate_missing_destination_token( + self, + mock_authorization_service, + mock_request_certificate, + mock_finalize_authorizations, + mock_get_authorizations, + mock_current_app, + mock_dns_provider_service, + mock_destination_service, + mock_plugin_manager_get, + mock_acme, + ): + provider = plugin.ACMEHttpIssuerPlugin() + mock_authority = Mock() + mock_authority.options = '[{"name": "mock_name", "value": "mock_value"}]' + + mock_order_resource = Mock() + mock_order_resource.authorizations = [Mock()] + mock_order_resource.authorizations[0].body.challenges = [Mock()] + mock_order_resource.authorizations[0].body.challenges[0].chall = challenges.HTTP01( + token=b'\x0f\x1c\xbe#od\xd1\x9c\xa6j\\\xa4\r\xed\xe5\xbf0pz\xeaxnl)\xea[i\xbc\x95\x08\x96\x1f') + + mock_client = Mock() + mock_client.new_order.return_value = mock_order_resource + mock_acme.return_value = (mock_client, "") + + mock_destination = Mock() + mock_destination.label = "mock-sftp-destination" + mock_destination.plugin_name = "SFTPDestinationPlugin" + mock_destination_service.get_by_label.return_value = mock_destination + + mock_destination_plugin = Mock() + mock_destination_plugin.upload_acme_token.return_value = True + mock_plugin_manager_get.return_value = mock_destination_plugin + + issuer_options = { + "authority": mock_authority, + "tokenDestination": "mock-sftp-destination", + "common_name": "test.netflix.net", + } + csr = "123" + mock_request_certificate.return_value = ("pem_certificate", "chain") + with self.assertRaisesRegexp(Exception, "No token_destination configured"): + provider.create_certificate(csr, issuer_options) + + @patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.setup_acme_client") + @patch("lemur.plugins.base.manager.PluginManager.get") + @patch("lemur.plugins.lemur_acme.plugin.destination_service") + @patch("lemur.plugins.lemur_acme.plugin.dns_provider_service") + @patch("lemur.plugins.lemur_acme.plugin.current_app") + @patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.get_authorizations") + @patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.finalize_authorizations") + @patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.request_certificate") + @patch("lemur.plugins.lemur_acme.plugin.authorization_service") + def test_create_certificate_missing_http_challenge( + self, + mock_authorization_service, + mock_request_certificate, + mock_finalize_authorizations, + mock_get_authorizations, + mock_current_app, + mock_dns_provider_service, + mock_destination_service, + mock_plugin_manager_get, + mock_acme, + ): + provider = plugin.ACMEHttpIssuerPlugin() + mock_authority = Mock() + mock_authority.options = '[{"name": "tokenDestination", "value": "mock-sftp-destination"}]' + + mock_order_resource = Mock() + mock_order_resource.authorizations = [Mock()] + mock_order_resource.authorizations[0].body.challenges = [Mock()] + mock_order_resource.authorizations[0].body.challenges[0].chall = challenges.DNS01( + token=b'\x0f\x1c\xbe#od\xd1\x9c\xa6j\\\xa4\r\xed\xe5\xbf0pz\xeaxnl)\xea[i\xbc\x95\x08\x96\x1f') + + mock_client = Mock() + mock_client.new_order.return_value = mock_order_resource + mock_acme.return_value = (mock_client, "") + + issuer_options = { + "authority": mock_authority, + "tokenDestination": "mock-sftp-destination", + "common_name": "test.netflix.net", + } + csr = "123" + mock_request_certificate.return_value = ("pem_certificate", "chain") + with self.assertRaisesRegexp(Exception, "HTTP-01 challenge was not offered"): + provider.create_certificate(csr, issuer_options)