Merge pull request #2930 from Netflix/powerdnsplugin_01

Fixing authorization lookup code to fix duplicate domain authorizations bug
This commit is contained in:
csine-nflx 2020-03-16 11:49:09 -07:00 committed by GitHub
commit db3920ed25
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 54 additions and 33 deletions

View File

@ -54,18 +54,30 @@ class AcmeHandler(object):
current_app.logger.error(f"Unable to fetch DNS Providers: {e}")
self.all_dns_providers = []
def find_dns_challenge(self, host, authorizations):
def get_dns_challenges(self, host, authorizations):
"""Get dns challenges for provided domain"""
domain_to_validate, is_wildcard = self.strip_wildcard(host)
dns_challenges = []
for authz in authorizations:
if not authz.body.identifier.value.lower() == host.lower():
if not authz.body.identifier.value.lower() == domain_to_validate.lower():
continue
if is_wildcard and not authz.body.wildcard:
continue
if not is_wildcard and authz.body.wildcard:
continue
for combo in authz.body.challenges:
if isinstance(combo.chall, challenges.DNS01):
dns_challenges.append(combo)
return dns_challenges
def maybe_remove_wildcard(self, host):
return host.replace("*.", "")
def strip_wildcard(self, host):
"""Removes the leading *. and returns Host and whether it was removed or not (True/False)"""
prefix = "*."
if host.startswith(prefix):
return host[len(prefix):], True
return host, False
def maybe_add_extension(self, host, dns_provider_options):
if dns_provider_options and dns_provider_options.get(
@ -86,9 +98,8 @@ class AcmeHandler(object):
current_app.logger.debug("Starting DNS challenge for {0}".format(host))
change_ids = []
host_to_validate = self.maybe_remove_wildcard(host)
dns_challenges = self.find_dns_challenge(host_to_validate, order.authorizations)
dns_challenges = self.get_dns_challenges(host, order.authorizations)
host_to_validate, _ = self.strip_wildcard(host)
host_to_validate = self.maybe_add_extension(
host_to_validate, dns_provider_options
)
@ -325,7 +336,7 @@ class AcmeHandler(object):
)
dns_provider_options = json.loads(dns_provider.credentials)
account_number = dns_provider_options.get("account_id")
host_to_validate = self.maybe_remove_wildcard(authz_record.host)
host_to_validate, _ = self.strip_wildcard(authz_record.host)
host_to_validate = self.maybe_add_extension(
host_to_validate, dns_provider_options
)
@ -357,7 +368,7 @@ class AcmeHandler(object):
dns_provider_options = json.loads(dns_provider.credentials)
account_number = dns_provider_options.get("account_id")
dns_challenges = authz_record.dns_challenge
host_to_validate = self.maybe_remove_wildcard(authz_record.host)
host_to_validate, _ = self.strip_wildcard(authz_record.host)
host_to_validate = self.maybe_add_extension(
host_to_validate, dns_provider_options
)

View File

@ -23,11 +23,12 @@ class TestAcme(unittest.TestCase):
}
@patch("lemur.plugins.lemur_acme.plugin.len", return_value=1)
def test_find_dns_challenge(self, mock_len):
def test_get_dns_challenges(self, mock_len):
assert mock_len
from acme import challenges
host = "example.com"
c = challenges.DNS01()
mock_authz = Mock()
@ -35,9 +36,18 @@ class TestAcme(unittest.TestCase):
mock_entry = Mock()
mock_entry.chall = c
mock_authz.body.resolved_combinations.append(mock_entry)
result = yield self.acme.find_dns_challenge(mock_authz)
result = yield self.acme.get_dns_challenges(host, mock_authz)
self.assertEqual(result, mock_entry)
def test_strip_wildcard(self):
expected = ("example.com", False)
result = self.acme.strip_wildcard("example.com")
self.assertEqual(expected, result)
expected = ("example.com", True)
result = self.acme.strip_wildcard("*.example.com")
self.assertEqual(expected, result)
def test_authz_record(self):
a = plugin.AuthorizationRecord("host", "authz", "challenge", "id")
self.assertEqual(type(a), plugin.AuthorizationRecord)
@ -45,9 +55,9 @@ class TestAcme(unittest.TestCase):
@patch("acme.client.Client")
@patch("lemur.plugins.lemur_acme.plugin.current_app")
@patch("lemur.plugins.lemur_acme.plugin.len", return_value=1)
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.find_dns_challenge")
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.get_dns_challenges")
def test_start_dns_challenge(
self, mock_find_dns_challenge, mock_len, mock_app, mock_acme
self, mock_get_dns_challenges, mock_len, mock_app, mock_acme
):
assert mock_len
mock_order = Mock()
@ -65,7 +75,7 @@ class TestAcme(unittest.TestCase):
mock_dns_provider.create_txt_record = Mock(return_value=1)
values = [mock_entry]
iterable = mock_find_dns_challenge.return_value
iterable = mock_get_dns_challenges.return_value
iterator = iter(values)
iterable.__iter__.return_value = iterator
result = self.acme.start_dns_challenge(
@ -127,12 +137,12 @@ class TestAcme(unittest.TestCase):
@patch("acme.client.Client")
@patch("OpenSSL.crypto", return_value="mock_cert")
@patch("josepy.util.ComparableX509")
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.find_dns_challenge")
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.get_dns_challenges")
@patch("lemur.plugins.lemur_acme.plugin.current_app")
def test_request_certificate(
self,
mock_current_app,
mock_find_dns_challenge,
mock_get_dns_challenges,
mock_jose,
mock_crypto,
mock_acme,
@ -256,11 +266,11 @@ class TestAcme(unittest.TestCase):
@patch("lemur.plugins.lemur_acme.cloudflare.current_app")
@patch("lemur.plugins.lemur_acme.plugin.dns_provider_service")
def test_get_dns_provider(
self,
mock_dns_provider_service,
mock_current_app_cloudflare,
mock_current_app_dyn,
mock_current_app,
self,
mock_dns_provider_service,
mock_current_app_cloudflare,
mock_current_app_dyn,
mock_current_app,
):
provider = plugin.ACMEIssuerPlugin()
route53 = provider.get_dns_provider("route53")
@ -349,14 +359,14 @@ class TestAcme(unittest.TestCase):
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.request_certificate")
@patch("lemur.plugins.lemur_acme.plugin.authorization_service")
def test_create_certificate(
self,
mock_authorization_service,
mock_request_certificate,
mock_finalize_authorizations,
mock_get_authorizations,
mock_current_app,
mock_dns_provider_service,
mock_acme,
self,
mock_authorization_service,
mock_request_certificate,
mock_finalize_authorizations,
mock_get_authorizations,
mock_current_app,
mock_dns_provider_service,
mock_acme,
):
provider = plugin.ACMEIssuerPlugin()
mock_authority = Mock()
@ -423,10 +433,10 @@ class TestAcme(unittest.TestCase):
ultradns._post = Mock()
ultradns._get = Mock()
ultradns._get.return_value = {'zoneName': 'test.example.com.com',
'rrSets': [{'ownerName': '_acme-challenge.test.example.com.',
'rrtype': 'TXT (16)', 'ttl': 5, 'rdata': ['ABCDEFGHIJ']}],
'queryInfo': {'sort': 'OWNER', 'reverse': False, 'limit': 100},
'resultInfo': {'totalCount': 1, 'offset': 0, 'returnedCount': 1}}
'rrSets': [{'ownerName': '_acme-challenge.test.example.com.',
'rrtype': 'TXT (16)', 'ttl': 5, 'rdata': ['ABCDEFGHIJ']}],
'queryInfo': {'sort': 'OWNER', 'reverse': False, 'limit': 100},
'resultInfo': {'totalCount': 1, 'offset': 0, 'returnedCount': 1}}
ultradns._delete = Mock()
mock_metrics.send = Mock()
ultradns.delete_txt_record(change_id, account_number, domain, token)