diff --git a/lemur/plugins/lemur_acme/powerdns.py b/lemur/plugins/lemur_acme/powerdns.py index 2f80e54f..f68828d1 100644 --- a/lemur/plugins/lemur_acme/powerdns.py +++ b/lemur/plugins/lemur_acme/powerdns.py @@ -57,6 +57,18 @@ def _get(path, params=None): resp.raise_for_status() return resp.json() +def _patch(path, payload): + """ + Function to execute a Patch request on the given URL (base_uri + path) with given data + """ + base_uri = current_app.config.get("ACME_POWERDNS_DOMAIN", "") + resp = requests.patch( + f"{base_uri}{path}", + headers=_generate_header(), + data=json.dumps(payload) + ) + resp.raise_for_status() + def get_zones(account_number): """Get zones from the PowerDNS""" @@ -69,6 +81,23 @@ def get_zones(account_number): zones.append(zone.name) return zones +def _get_zone_name(domain, account_number): + """Get the matching zone for the given domain""" + zones = get_zones(account_number) + zone_name = "" + for z in zones: + if domain.endswith(z): + # Find the most specific zone possible for the domain + # Ex: If fqdn is a.b.c.com, there is a zone for c.com, + # and a zone for b.c.com, we want to use b.c.com. + if z.count(".") > zone_name.count("."): + zone_name = z + if not zone_name: + function = sys._getframe().f_code.co_name + metrics.send(f"{function}.fail", "counter", 1) + raise Exception(f"No PowerDNS zone found for domain: {domain}") + return zone_name + def create_txt_record(domain, token, account_number): """ Create a TXT record for the given domain. 
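Review bot: `requests.patch` in `_patch` is issued without a `timeout`, so a PowerDNS API endpoint that accepts the connection and never answers blocks the ACME challenge indefinitely; add a bounded timeout, e.g. `resp = requests.patch(f"{base_uri}{path}", headers=_generate_header(), json=payload, timeout=REQUEST_TIMEOUT)` with `REQUEST_TIMEOUT` standing in for a module-level constant of the project's choosing. Passing `json=payload` also sets `Content-Type: application/json` itself, whereas `data=json.dumps(payload)` relies on `_generate_header()` (outside this hunk) to do so. The docstring should state that the function returns nothing and raises `requests.HTTPError` on a non-2xx response, since `create_txt_record` depends on that exception path. I cannot see whether `_get` above already carries a timeout; if not, the same change applies there.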
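Review bot: The `domain.endswith(z)` test in `_get_zone_name` has no label boundary, so a zone `example.com` also matches `_acme-challenge.notexample.com` and the TXT record would be aimed at an unrelated zone; use `if domain == z or domain.endswith("." + z):`. The most-specific comparison `z.count(".") > zone_name.count(".")` can never select a zone name without a dot, because `zone_name` starts as `""` with count 0 — if that is intended it deserves a comment, otherwise track a `found` flag separately. If `get_zones` returns the canonical trailing-dot form the PowerDNS API normally uses, neither comparison matches a bare FQDN; the tests feed names without a trailing dot, so confirm which form the API actually returns. A narrower exception type than bare `Exception` would let callers distinguish "no zone" from transport errors.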
@@ -81,7 +110,57 @@ def create_txt_record(domain, token, account_number): Matching zone - example.com Owner name - _acme-challenge.lemur """ - pass + + zone_name = _get_zone_name(domain, account_number) + node_name = domain[:-len(".".join(zone_name))] + + server_id = current_app.config.get("ACME_POWERDNS_SERVERID", "") + zone_id = zone_name.join(".") + domain_id = domain.join(".") + + path = f"/api/v1/servers/{server_id}/zones/{zone_id}" + payload = { + "rrsets": [ + { + "name": f"{domain_id}", + "type": "TXT", + "ttl": "300", + "changetype": "REPLACE", + "records": [ + { + "content": f"{token}", + "disabled": "false" + } + ], + "comments": [] + } + ] + } + + try: + _patch(path, payload) + function = sys._getframe().f_code.co_name + log_data = { + "function": function, + "fqdn": domain, + "token": token, + "message": "TXT record successfully created" + } + current_app.logger.debug(log_data) + except Exception as e: + function = sys._getframe().f_code.co_name + log_data = { + "function": function, + "domain": domain, + "token": token, + "Exception": e, + "message": "Unable to create TXT record" + } + current_app.logger.debug(log_data) + + change_id = (domain, token) + return change_id + def wait_for_dns_change(change_id, account_number=None): """ diff --git a/lemur/plugins/lemur_acme/tests/test_powerdns.py b/lemur/plugins/lemur_acme/tests/test_powerdns.py index f39ef3c5..e0808d68 100644 --- a/lemur/plugins/lemur_acme/tests/test_powerdns.py +++ b/lemur/plugins/lemur_acme/tests/test_powerdns.py @@ -5,6 +5,7 @@ from mock import MagicMock, Mock, patch from lemur.plugins.lemur_acme import plugin, powerdns + class TestPowerdns(unittest.TestCase): @patch("lemur.plugins.lemur_acme.plugin.dns_provider_service") def setUp(self, mock_dns_provider_service): 
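Review bot: `zone_id = zone_name.join(".")` and `domain_id = domain.join(".")` in `create_txt_record` call `str.join` on the zone/domain string with `"."` as the iterable, so both evaluate to the literal `"."`, the PATCH goes to `/zones/.` and the rrset is named `.`; the test passes only because `_patch` is mocked. `node_name = domain[:-len(".".join(zone_name))]` likewise joins the characters of `zone_name` and is unused. The `except Exception` branch logs at debug level and still returns `change_id`, so a failed PATCH (a 401 from a bad API key, or a 422 from the malformed rrset) is silently treated as success and the challenge fails later with no trace; it should record a metric, log at error level and re-raise. To my knowledge the PowerDNS API types `ttl` as integer and `disabled` as boolean and expects TXT content enclosed in double quotes, so `"300"`, `"false"` and the bare token should be checked against the schema of the PowerDNS version in use. The function's signature and opening docstring precede this hunk; the rewrite below covers the body from the zone lookup to the return.
```python
    zone_name = _get_zone_name(domain, account_number)
    server_id = current_app.config.get("ACME_POWERDNS_SERVERID", "")
    # Zone and record names are sent in canonical (trailing-dot) form;
    # str.join on the name string produced a bare "." here.
    zone_id = f"{zone_name}."
    domain_id = f"{domain}."

    path = f"/api/v1/servers/{server_id}/zones/{zone_id}"
    payload = {
        "rrsets": [
            {
                "name": domain_id,
                "type": "TXT",
                "ttl": 300,  # integer/boolean per the API schema — verify for the PowerDNS version in use
                "changetype": "REPLACE",
                # TXT content is quoted, as the PowerDNS API expects
                "records": [{"content": f'"{token}"', "disabled": False}],
                "comments": [],
            }
        ]
    }

    function = sys._getframe().f_code.co_name
    try:
        _patch(path, payload)
    except Exception as e:
        metrics.send(f"{function}.fail", "counter", 1)
        current_app.logger.error({
            "function": function,
            "fqdn": domain,
            "token": token,
            "error": str(e),
            "message": "Unable to create TXT record",
        })
        raise  # a missing TXT record must fail the challenge, not look like success
    # … unchanged: success log_data / current_app.logger.debug …
    return domain, token
```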
@@ -19,37 +20,6 @@ class TestPowerdns(unittest.TestCase): "test.fakedomain.net": [mock_dns_provider], } - @patch("lemur.plugins.lemur_acme.powerdns.requests") - @patch("lemur.plugins.lemur_acme.powerdns.current_app") - def test_powerdns_get_token(self, mock_current_app, mock_requests): - # ret_val = json.dumps({"access_token": "access"}) - the_response = Response() - the_response._content = b'{"access_token": "access"}' - mock_requests.post = Mock(return_value=the_response) - mock_current_app.config.get = Mock(return_value="Test") - result = powerdns.get_powerdns_token() - self.assertTrue(len(result) > 0) - - @patch("lemur.plugins.lemur_acme.powerdns.current_app") - def test_powerdns_create_txt_record(self, mock_current_app): - domain = "_acme_challenge.test.example.com" - zone = "test.example.com" - token = "ABCDEFGHIJ" - account_number = "1234567890" - change_id = (domain, token) - powerdns.get_zone_name = Mock(return_value=zone) - mock_current_app.logger.debug = Mock() - powerdns._post = Mock() - log_data = { - "function": "create_txt_record", - "fqdn": domain, - "token": token, - "message": "TXT record created" - } - result = powerdns.create_txt_record(domain, token, account_number) - mock_current_app.logger.debug.assert_called_with(log_data) - self.assertEqual(result, change_id) - @patch("lemur.plugins.lemur_acme.powerdns.current_app") @patch("lemur.extensions.metrics") def test_powerdns_delete_txt_record(self, mock_metrics, mock_current_app): 
@@ -94,15 +64,6 @@ class TestPowerdns(unittest.TestCase): } mock_current_app.logger.debug.assert_called_with(log_data) - def test_powerdns_get_zone_name(self): - zones = ['example.com', 'test.example.com'] - zone = "test.example.com" - domain = "_acme-challenge.test.example.com" - account_number = "1234567890" - powerdns.get_zones = Mock(return_value=zones) - result = powerdns.get_zone_name(domain, account_number) - self.assertEqual(result, zone) - @patch("lemur.plugins.lemur_acme.powerdns.current_app") def test_powerdns_get_zones(self, mock_current_app): account_number = "1234567890" 
@@ -121,4 +82,45 @@ class TestPowerdns(unittest.TestCase): powerdns._get.side_effect = [get_response] mock_current_app.config.get = Mock(return_value="localhost") result = powerdns.get_zones(account_number) - self.assertEqual(result, zones) \ No newline at end of file + self.assertEqual(result, zones) + + def test_powerdns_get_zone_name(self): + zones = ['example.com', 'test.example.com'] + zone = "test.example.com" + domain = "_acme-challenge.test.example.com" + account_number = "1234567890" + powerdns.get_zones = Mock(return_value=zones) + result = powerdns._get_zone_name(domain, account_number) + self.assertEqual(result, zone) + + def mock_current_app_config_get(a, b): + """ Mock of current_app.config.get() """ + config = { + 'ACME_POWERDNS_APIKEYNAME': 'X-API-Key', + 'ACME_POWERDNS_APIKEY': 'KEY', + 'ACME_POWERDNS_DOMAIN': 'http://internal-dnshiddenmaster-1486232504.us-east-1.elb.amazonaws.com', + 'ACME_POWERDNS_SERVERID': 'localhost' + } + return config[a] + + @patch("lemur.plugins.lemur_acme.powerdns.current_app") + # @patch("lemur.plugins.lemur_acme.powerdns.current_app.config.get", side_effect=mock_current_app_config_get) + def test_powerdns_create_txt_record(self, mock_current_app): + domain = "_acme_challenge.test.example.com" + zone = "test.example.com" + token = "ABCDEFGHIJ" + account_number = "1234567890" + change_id = (domain, token) + powerdns._get_zone_name = Mock(return_value=zone) + mock_current_app.logger.debug = Mock() + mock_current_app.config.get = Mock(return_value="localhost") + powerdns._patch = Mock() + log_data = { + "function": "create_txt_record", + "fqdn": domain, + "token": token, + "message": "TXT record successfully created" + } + result = powerdns.create_txt_record(domain, token, account_number) + mock_current_app.logger.debug.assert_called_with(log_data) + self.assertEqual(result, change_id) \ No newline at end of file
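Review bot: `test_powerdns_create_txt_record` replaces `powerdns._get_zone_name` and `powerdns._patch` with module-level `Mock` objects and never restores them, so every later test in the process sees the mocks (the same applies to `powerdns.get_zones` in `test_powerdns_get_zone_name`); use `@patch("lemur.plugins.lemur_acme.powerdns._patch")` as a decorator so the replacement is undone when the test ends. Because `_patch` is mocked but never inspected, the test cannot catch a wrong path or payload; adding `mock_patch.assert_called_once()` and checking that the first positional argument ends with `/zones/test.example.com.` would exercise the code the test is meant to cover. `mock_current_app_config_get(a, b)` is defined as a method without `self`, is referenced only from a commented-out decorator and is dead code, and the `ACME_POWERDNS_DOMAIN` value it carries is an internal ELB hostname that should not live in a committed fixture — replace it with a placeholder such as `http://powerdns.example.test`.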