diff --git a/lemur/plugins/lemur_acme/powerdns.py b/lemur/plugins/lemur_acme/powerdns.py index 688a84f2..1efe0752 100644 --- a/lemur/plugins/lemur_acme/powerdns.py +++ b/lemur/plugins/lemur_acme/powerdns.py @@ -224,6 +224,21 @@ def _generate_header(): return headers +def _get_zone_name(domain, account_number): + """Get most specific matching zone for the given domain and return as a String""" + zones = get_zones(account_number) + zone_name = "" + for z in zones: + if domain.endswith(z): + if z.count(".") > zone_name.count("."): + zone_name = z + if not zone_name: + function = sys._getframe().f_code.co_name + metrics.send(f"{function}.fail", "counter", 1) + raise Exception(f"No PowerDNS zone found for domain: {domain}") + return zone_name + + def _get(path, params=None): """ Execute a GET request on the given URL (base_uri + path) and return response as JSON object """ base_uri = current_app.config.get("ACME_POWERDNS_DOMAIN", "") @@ -246,19 +261,3 @@ def _patch(path, payload): headers=_generate_header() ) resp.raise_for_status() - - -def _get_zone_name(domain, account_number): - """Get most specific matching zone for the given domain and return as a String""" - zones = get_zones(account_number) - zone_name = "" - for z in zones: - if domain.endswith(z): - if z.count(".") > zone_name.count("."): - zone_name = z - if not zone_name: - function = sys._getframe().f_code.co_name - metrics.send(f"{function}.fail", "counter", 1) - raise Exception(f"No PowerDNS zone found for domain: {domain}") - return zone_name - diff --git a/lemur/plugins/lemur_acme/tests/test_powerdns.py b/lemur/plugins/lemur_acme/tests/test_powerdns.py index f1190732..4c483741 100644 --- a/lemur/plugins/lemur_acme/tests/test_powerdns.py +++ b/lemur/plugins/lemur_acme/tests/test_powerdns.py @@ -67,35 +67,34 @@ class TestPowerdns(unittest.TestCase): mock_current_app.logger.debug.assert_called_with(log_data) self.assertEqual(result, change_id) + @patch("lemur.plugins.lemur_acme.powerdns.dnsutil") @patch("lemur.plugins.lemur_acme.powerdns.current_app") @patch("lemur.extensions.metrics") @patch("time.sleep") - def test_wait_for_dns_change(self, mock_sleep, mock_metrics, mock_current_app): - nameserver = "1.1.1.1" - powerdns._get_authoritative_nameserver = Mock(return_value=nameserver) - powerdns._has_dns_propagated = Mock(return_value=True) - mock_metrics.send = Mock() - mock_sleep.return_value = False + def test_wait_for_dns_change(self, mock_sleep, mock_metrics, mock_current_app, mock_dnsutil): domain = "_acme-challenge.test.example.com" - token = "ABCDEFGHIJ" + token = "ABCDEFG" + zone_name = "test.example.com" + nameserver = "1.1.1.1" change_id = (domain, token) + mock_records = (token,) + + mock_current_app.config.get = Mock(return_value=1) + powerdns._get_zone_name = Mock(return_value=zone_name) + mock_dnsutil.get_authoritative_nameserver = Mock(return_value=nameserver) + mock_dnsutil.get_dns_records = Mock(return_value=mock_records) + mock_sleep.return_value = False + mock_metrics.send = Mock() mock_current_app.logger.debug = Mock() powerdns.wait_for_dns_change(change_id) - auth_log_data = { + log_data = { "function": "wait_for_dns_change", "fqdn": domain, "status": True, - "message": "Record status on UltraDNS authoritative server" + "message": "Record status on PowerDNS authoritative server" } - pub_log_data = { - "function": "wait_for_dns_change", - "fqdn": domain, - "status": True, - "message": "Record status on Public DNS" - } - mock_current_app.logger.debug.assert_any_call(auth_log_data) - mock_current_app.logger.debug.assert_any_call(pub_log_data) + mock_current_app.logger.debug.assert_called_with(log_data) @patch("lemur.plugins.lemur_acme.powerdns.current_app") def test_delete_txt_record(self, mock_current_app):