From df5168765b06a6d0875f1c652962c6857e13bb74 Mon Sep 17 00:00:00 2001 From: Curtis Castrapel Date: Tue, 8 May 2018 11:03:17 -0700 Subject: [PATCH] more tests --- lemur/exceptions.py | 4 + lemur/plugins/lemur_acme/dyn.py | 15 +-- lemur/plugins/lemur_acme/plugin.py | 9 +- lemur/plugins/lemur_acme/tests/test_acme.py | 100 +++++++++++++++++++- 4 files changed, 118 insertions(+), 10 deletions(-) diff --git a/lemur/exceptions.py b/lemur/exceptions.py index a9909e87..3bf90b77 100644 --- a/lemur/exceptions.py +++ b/lemur/exceptions.py @@ -38,3 +38,7 @@ class InvalidConfiguration(Exception): class InvalidAuthority(Exception): pass + + +class UnknownProvider(Exception): + pass diff --git a/lemur/plugins/lemur_acme/dyn.py b/lemur/plugins/lemur_acme/dyn.py index b413180f..d46011d1 100644 --- a/lemur/plugins/lemur_acme/dyn.py +++ b/lemur/plugins/lemur_acme/dyn.py @@ -7,13 +7,14 @@ from dyn.tm.zones import Node, Zone from flask import current_app from tld import get_tld -current_app.logger.debug("Logging in to Dyn API") -dynect_session = DynectSession( - current_app.config.get('ACME_DYN_CUSTOMER_NAME', ''), - current_app.config.get('ACME_DYN_USERNAME', ''), - current_app.config.get('ACME_DYN_PASSWORD', ''), -) +def get_dynect_session(): + dynect_session = DynectSession( + current_app.config.get('ACME_DYN_CUSTOMER_NAME', ''), + current_app.config.get('ACME_DYN_USERNAME', ''), + current_app.config.get('ACME_DYN_PASSWORD', ''), + ) + return dynect_session def _has_dns_propagated(name, token): @@ -47,6 +48,7 @@ def wait_for_dns_change(change_id, account_number=None): def create_txt_record(domain, token, account_number): + get_dynect_session() zone_name = get_tld('http://' + domain) zone_parts = len(zone_name.split('.')) node_name = '.'.join(domain.split('.')[:-zone_parts]) 
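Review bot: `get_dynect_session()` in the first dyn.py hunk still falls back to `''` for `ACME_DYN_CUSTOMER_NAME`, `ACME_DYN_USERNAME` and `ACME_DYN_PASSWORD`, so a missing setting becomes a login attempt with blank credentials rather than a clear configuration error. Checking the three values and raising `InvalidConfiguration` (already defined in `lemur/exceptions.py`) before constructing `DynectSession` would fail earlier and more legibly. The calls added in `create_txt_record` and `delete_txt_record` (the next two dyn.py hunks) discard the return value, so they presumably rely on the dyn client keeping the session globally; a docstring such as `"""Log in to Dyn; the client keeps the session for later Zone/Node calls."""` would stop a reader from removing the apparently unused call. The removed debug line `"Logging in to Dyn API"` has no replacement, so these logins, which now appear to happen on every record change, are no longer logged.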
@@ -61,6 +63,7 @@ def create_txt_record(domain, token, account_number): def delete_txt_record(change_id, account_number, domain, token): + get_dynect_session() if not domain: current_app.logger.debug("delete_txt_record: No domain passed") return diff --git a/lemur/plugins/lemur_acme/plugin.py b/lemur/plugins/lemur_acme/plugin.py index 61f22eb4..dc529549 100644 --- a/lemur/plugins/lemur_acme/plugin.py +++ b/lemur/plugins/lemur_acme/plugin.py @@ -24,9 +24,10 @@ from flask import current_app from lemur.authorizations import service as authorization_service from lemur.common.utils import generate_private_key from lemur.dns_providers import service as dns_provider_service -from lemur.exceptions import InvalidAuthority, InvalidConfiguration +from lemur.exceptions import InvalidAuthority, InvalidConfiguration, UnknownProvider from lemur.plugins import lemur_acme as acme from lemur.plugins.bases import IssuerPlugin +from lemur.plugins.lemur_acme import cloudflare, dyn, route53 def find_dns_challenge(authz): @@ -222,13 +223,15 @@ class ACMEIssuerPlugin(IssuerPlugin): super(ACMEIssuerPlugin, self).__init__(*args, **kwargs) def get_dns_provider(self, type): - from lemur.plugins.lemur_acme import cloudflare, dyn, route53 provider_types = { 'cloudflare': cloudflare, 'dyn': dyn, 'route53': route53, } - return provider_types[type] + provider = provider_types.get(type) + if not provider: + raise UnknownProvider("No such DNS provider: {}".format(type)) + return provider def get_ordered_certificate(self, pending_cert): acme_client, registration = setup_acme_client(pending_cert.authority) diff --git a/lemur/plugins/lemur_acme/tests/test_acme.py b/lemur/plugins/lemur_acme/tests/test_acme.py index 88af1b11..7bfd1673 100644 --- a/lemur/plugins/lemur_acme/tests/test_acme.py +++ b/lemur/plugins/lemur_acme/tests/test_acme.py 
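Review bot: `get_dns_provider` in the second plugin.py hunk has no docstring recording that an unrecognised provider type now raises `UnknownProvider` where the old `provider_types[type]` lookup raised `KeyError`. Any caller outside this hunk that caught `KeyError` would need updating, which cannot be checked from here. I would add `"""Return the DNS provider module for *type*; raise UnknownProvider if it is not supported."""` and write the guard as `if provider is None:`, since only a missing key is meant. The parameter name `type` shadows the builtin, but renaming it would change the method's keyword interface, so that is better left for a separate change.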
@@ -38,7 +38,7 @@ class TestAcme(unittest.TestCase):
 mock_app.logger.debug = Mock()
 mock_authz = Mock()
 mock_authz.body.resolved_combinations = []
- mock_entry = MagicMock()
+ mock_entry = MagicMock()t
 from acme import challenges
 c = challenges.DNS01()
 mock_entry.chall = c
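Review bot: The replacement line reads `mock_entry = MagicMock()t`; the trailing `t` is a syntax error, so `test_acme.py` would fail to import and none of the tests in the file would run, including the ones this commit adds further down. The line should go back to `mock_entry = MagicMock()`. The enclosing test method starts and ends outside the hunk.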
@@ -180,3 +180,101 @@ class TestAcme(unittest.TestCase):
 self.assertEqual(acme_root, "123")
 self.assertEqual(b, "")
 self.assertEqual(role, [{'username': '', 'password': '', 'name': 'acme'}])
+
+ @patch('lemur.plugins.lemur_acme.plugin.current_app')
+ @patch('lemur.plugins.lemur_acme.dyn.current_app')
+ @patch('lemur.plugins.lemur_acme.cloudflare.current_app')
+ def test_get_dns_provider(self, mock_current_app_cloudflare, mock_current_app_dyn, mock_current_app):
+ provider = plugin.ACMEIssuerPlugin()
+ route53 = provider.get_dns_provider("route53")
+ assert route53
+ cloudflare = provider.get_dns_provider("cloudflare")
+ assert cloudflare
+ dyn = provider.get_dns_provider("dyn")
+ assert dyn
+
+ @patch('lemur.plugins.lemur_acme.plugin.setup_acme_client')
+ @patch('lemur.plugins.lemur_acme.plugin.current_app')
+ @patch('lemur.plugins.lemur_acme.plugin.authorization_service')
+ @patch('lemur.plugins.lemur_acme.plugin.dns_provider_service')
+ @patch('lemur.plugins.lemur_acme.plugin.get_authorizations')
+ @patch('lemur.plugins.lemur_acme.plugin.finalize_authorizations')
+ @patch('lemur.plugins.lemur_acme.plugin.request_certificate')
+ def test_get_ordered_certificate(
+ self, mock_request_certificate, mock_finalize_authorizations, mock_get_authorizations,
+ mock_dns_provider_service, mock_authorization_service, mock_current_app, mock_acme):
+ mock_client = Mock()
+ mock_acme.return_value = (mock_client, "")
+ mock_request_certificate.return_value = ("pem_certificate", "chain")
+
+ mock_cert = Mock()
+ mock_cert.external_id = 1
+
+ provider = plugin.ACMEIssuerPlugin()
+ provider.get_dns_provider = Mock()
+ result = provider.get_ordered_certificate(mock_cert)
+ self.assertEqual(
+ result,
+ {
+ 'body': "pem_certificate",
+ 'chain': "chain",
+ 'external_id': "1"
+ }
+ )
+
+ @patch('lemur.plugins.lemur_acme.plugin.setup_acme_client')
+ @patch('lemur.plugins.lemur_acme.plugin.current_app')
+ @patch('lemur.plugins.lemur_acme.plugin.authorization_service')
+ @patch('lemur.plugins.lemur_acme.plugin.dns_provider_service')
+ @patch('lemur.plugins.lemur_acme.plugin.get_authorizations')
+ @patch('lemur.plugins.lemur_acme.plugin.finalize_authorizations')
+ @patch('lemur.plugins.lemur_acme.plugin.request_certificate')
+ def test_get_ordered_certificates(
+ self, mock_request_certificate, mock_finalize_authorizations, mock_get_authorizations,
+ mock_dns_provider_service, mock_authorization_service, mock_current_app, mock_acme):
+ mock_client = Mock()
+ mock_acme.return_value = (mock_client, "")
+ mock_request_certificate.return_value = ("pem_certificate", "chain")
+
+ mock_cert = Mock()
+ mock_cert.external_id = 1
+
+ mock_cert2 = Mock()
+ mock_cert2.external_id = 2
+
+ provider = plugin.ACMEIssuerPlugin()
+ provider.get_dns_provider = Mock()
+ result = provider.get_ordered_certificates([mock_cert, mock_cert2])
+ self.assertEqual(len(result), 2)
+ self.assertEqual(result[0]['cert'], {'body': 'pem_certificate', 'chain': 'chain', 'external_id': '1'})
+ self.assertEqual(result[1]['cert'], {'body': 'pem_certificate', 'chain': 'chain', 'external_id': '2'})
+
+ @patch('lemur.plugins.lemur_acme.plugin.setup_acme_client')
+ @patch('lemur.plugins.lemur_acme.plugin.dns_provider_service')
+ @patch('lemur.plugins.lemur_acme.plugin.current_app')
+ @patch('lemur.plugins.lemur_acme.plugin.get_authorizations')
+ @patch('lemur.plugins.lemur_acme.plugin.finalize_authorizations')
+ @patch('lemur.plugins.lemur_acme.plugin.request_certificate')
+ @patch('lemur.plugins.lemur_acme.plugin.authorization_service')
+ def test_create_certificate(self, mock_authorization_service, mock_request_certificate, mock_finalize_authorizations, mock_get_authorizations,
+ mock_current_app, mock_dns_provider_service, mock_acme):
+ provider = plugin.ACMEIssuerPlugin()
+ mock_authority = Mock()
+ issuer_options = {
+ 'authority': mock_authority,
+ 'dns_provider': {"id": 1},
+ "common_name": "test.netflix.net"
+ }
+
+ mock_client = Mock()
+ mock_acme.return_value = (mock_client, "")
+
+ mock_dns_provider = Mock()
+ mock_dns_provider.credentials = '{"account_id": 1}'
+ mock_dns_provider.provider_type = "route53"
+ mock_dns_provider_service.get.return_value = mock_dns_provider
+
+ csr = "123"
+ mock_request_certificate.return_value = ("pem_certificate", "chain")
+ result = provider.create_certificate(csr, issuer_options)
+ assert result
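Review bot: `test_get_dns_provider` exercises only the three supported names, so the `UnknownProvider` branch this commit adds to `get_dns_provider` is never covered. A negative case such as `with self.assertRaises(UnknownProvider): provider.get_dns_provider("not_a_provider")` would pin the fail-closed behaviour; it needs `UnknownProvider` imported in the test module, whose imports are outside the hunk. `test_create_certificate` ends in a bare `assert result`, which only shows a truthy return; asserting the expected fields with `self.assertEqual`, as the two `get_ordered_certificate` tests do, would be a stronger check.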