added PowerDNS support for create_txt_record and associated tests
This commit is contained in:
parent
3080a9527c
commit
915ec0ba63
|
@ -57,6 +57,18 @@ def _get(path, params=None):
|
||||||
resp.raise_for_status()
|
resp.raise_for_status()
|
||||||
return resp.json()
|
return resp.json()
|
||||||
|
|
||||||
|
def _patch(path, payload):
|
||||||
|
"""
|
||||||
|
Function to execute a Patch request on the given URL (base_uri + path) with given data
|
||||||
|
"""
|
||||||
|
base_uri = current_app.config.get("ACME_POWERDNS_DOMAIN", "")
|
||||||
|
resp = requests.patch(
|
||||||
|
f"{base_uri}{path}",
|
||||||
|
headers=_generate_header(),
|
||||||
|
data=json.dumps(payload)
|
||||||
|
)
|
||||||
|
resp.raise_for_status()
|
||||||
|
|
||||||
|
|
||||||
def get_zones(account_number):
|
def get_zones(account_number):
|
||||||
"""Get zones from the PowerDNS"""
|
"""Get zones from the PowerDNS"""
|
||||||
|
@ -69,6 +81,23 @@ def get_zones(account_number):
|
||||||
zones.append(zone.name)
|
zones.append(zone.name)
|
||||||
return zones
|
return zones
|
||||||
|
|
||||||
|
def _get_zone_name(domain, account_number):
|
||||||
|
"""Get the matching zone for the given domain"""
|
||||||
|
zones = get_zones(account_number)
|
||||||
|
zone_name = ""
|
||||||
|
for z in zones:
|
||||||
|
if domain.endswith(z):
|
||||||
|
# Find the most specific zone possible for the domain
|
||||||
|
# Ex: If fqdn is a.b.c.com, there is a zone for c.com,
|
||||||
|
# and a zone for b.c.com, we want to use b.c.com.
|
||||||
|
if z.count(".") > zone_name.count("."):
|
||||||
|
zone_name = z
|
||||||
|
if not zone_name:
|
||||||
|
function = sys._getframe().f_code.co_name
|
||||||
|
metrics.send(f"{function}.fail", "counter", 1)
|
||||||
|
raise Exception(f"No PowerDNS zone found for domain: {domain}")
|
||||||
|
return zone_name
|
||||||
|
|
||||||
def create_txt_record(domain, token, account_number):
|
def create_txt_record(domain, token, account_number):
|
||||||
"""
|
"""
|
||||||
Create a TXT record for the given domain.
|
Create a TXT record for the given domain.
|
||||||
|
@ -81,7 +110,57 @@ def create_txt_record(domain, token, account_number):
|
||||||
Matching zone - example.com
|
Matching zone - example.com
|
||||||
Owner name - _acme-challenge.lemur
|
Owner name - _acme-challenge.lemur
|
||||||
"""
|
"""
|
||||||
pass
|
|
||||||
|
zone_name = _get_zone_name(domain, account_number)
|
||||||
|
node_name = domain[:-len(".".join(zone_name))]
|
||||||
|
|
||||||
|
server_id = current_app.config.get("ACME_POWERDNS_SERVERID", "")
|
||||||
|
zone_id = zone_name.join(".")
|
||||||
|
domain_id = domain.join(".")
|
||||||
|
|
||||||
|
path = f"/api/v1/servers/{server_id}/zones/{zone_id}"
|
||||||
|
payload = {
|
||||||
|
"rrsets": [
|
||||||
|
{
|
||||||
|
"name": f"{domain_id}",
|
||||||
|
"type": "TXT",
|
||||||
|
"ttl": "300",
|
||||||
|
"changetype": "REPLACE",
|
||||||
|
"records": [
|
||||||
|
{
|
||||||
|
"content": f"{token}",
|
||||||
|
"disabled": "false"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"comments": []
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
_patch(path, payload)
|
||||||
|
function = sys._getframe().f_code.co_name
|
||||||
|
log_data = {
|
||||||
|
"function": function,
|
||||||
|
"fqdn": domain,
|
||||||
|
"token": token,
|
||||||
|
"message": "TXT record successfully created"
|
||||||
|
}
|
||||||
|
current_app.logger.debug(log_data)
|
||||||
|
except Exception as e:
|
||||||
|
function = sys._getframe().f_code.co_name
|
||||||
|
log_data = {
|
||||||
|
"function": function,
|
||||||
|
"domain": domain,
|
||||||
|
"token": token,
|
||||||
|
"Exception": e,
|
||||||
|
"message": "Unable to create TXT record"
|
||||||
|
}
|
||||||
|
current_app.logger.debug(log_data)
|
||||||
|
|
||||||
|
change_id = (domain, token)
|
||||||
|
return change_id
|
||||||
|
|
||||||
|
|
||||||
def wait_for_dns_change(change_id, account_number=None):
|
def wait_for_dns_change(change_id, account_number=None):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -5,6 +5,7 @@ from mock import MagicMock, Mock, patch
|
||||||
|
|
||||||
from lemur.plugins.lemur_acme import plugin, powerdns
|
from lemur.plugins.lemur_acme import plugin, powerdns
|
||||||
|
|
||||||
|
|
||||||
class TestPowerdns(unittest.TestCase):
|
class TestPowerdns(unittest.TestCase):
|
||||||
@patch("lemur.plugins.lemur_acme.plugin.dns_provider_service")
|
@patch("lemur.plugins.lemur_acme.plugin.dns_provider_service")
|
||||||
def setUp(self, mock_dns_provider_service):
|
def setUp(self, mock_dns_provider_service):
|
||||||
|
@ -19,37 +20,6 @@ class TestPowerdns(unittest.TestCase):
|
||||||
"test.fakedomain.net": [mock_dns_provider],
|
"test.fakedomain.net": [mock_dns_provider],
|
||||||
}
|
}
|
||||||
|
|
||||||
@patch("lemur.plugins.lemur_acme.powerdns.requests")
|
|
||||||
@patch("lemur.plugins.lemur_acme.powerdns.current_app")
|
|
||||||
def test_powerdns_get_token(self, mock_current_app, mock_requests):
|
|
||||||
# ret_val = json.dumps({"access_token": "access"})
|
|
||||||
the_response = Response()
|
|
||||||
the_response._content = b'{"access_token": "access"}'
|
|
||||||
mock_requests.post = Mock(return_value=the_response)
|
|
||||||
mock_current_app.config.get = Mock(return_value="Test")
|
|
||||||
result = powerdns.get_powerdns_token()
|
|
||||||
self.assertTrue(len(result) > 0)
|
|
||||||
|
|
||||||
@patch("lemur.plugins.lemur_acme.powerdns.current_app")
|
|
||||||
def test_powerdns_create_txt_record(self, mock_current_app):
|
|
||||||
domain = "_acme_challenge.test.example.com"
|
|
||||||
zone = "test.example.com"
|
|
||||||
token = "ABCDEFGHIJ"
|
|
||||||
account_number = "1234567890"
|
|
||||||
change_id = (domain, token)
|
|
||||||
powerdns.get_zone_name = Mock(return_value=zone)
|
|
||||||
mock_current_app.logger.debug = Mock()
|
|
||||||
powerdns._post = Mock()
|
|
||||||
log_data = {
|
|
||||||
"function": "create_txt_record",
|
|
||||||
"fqdn": domain,
|
|
||||||
"token": token,
|
|
||||||
"message": "TXT record created"
|
|
||||||
}
|
|
||||||
result = powerdns.create_txt_record(domain, token, account_number)
|
|
||||||
mock_current_app.logger.debug.assert_called_with(log_data)
|
|
||||||
self.assertEqual(result, change_id)
|
|
||||||
|
|
||||||
@patch("lemur.plugins.lemur_acme.powerdns.current_app")
|
@patch("lemur.plugins.lemur_acme.powerdns.current_app")
|
||||||
@patch("lemur.extensions.metrics")
|
@patch("lemur.extensions.metrics")
|
||||||
def test_powerdns_delete_txt_record(self, mock_metrics, mock_current_app):
|
def test_powerdns_delete_txt_record(self, mock_metrics, mock_current_app):
|
||||||
|
@ -94,15 +64,6 @@ class TestPowerdns(unittest.TestCase):
|
||||||
}
|
}
|
||||||
mock_current_app.logger.debug.assert_called_with(log_data)
|
mock_current_app.logger.debug.assert_called_with(log_data)
|
||||||
|
|
||||||
def test_powerdns_get_zone_name(self):
|
|
||||||
zones = ['example.com', 'test.example.com']
|
|
||||||
zone = "test.example.com"
|
|
||||||
domain = "_acme-challenge.test.example.com"
|
|
||||||
account_number = "1234567890"
|
|
||||||
powerdns.get_zones = Mock(return_value=zones)
|
|
||||||
result = powerdns.get_zone_name(domain, account_number)
|
|
||||||
self.assertEqual(result, zone)
|
|
||||||
|
|
||||||
@patch("lemur.plugins.lemur_acme.powerdns.current_app")
|
@patch("lemur.plugins.lemur_acme.powerdns.current_app")
|
||||||
def test_powerdns_get_zones(self, mock_current_app):
|
def test_powerdns_get_zones(self, mock_current_app):
|
||||||
account_number = "1234567890"
|
account_number = "1234567890"
|
||||||
|
@ -121,4 +82,45 @@ class TestPowerdns(unittest.TestCase):
|
||||||
powerdns._get.side_effect = [get_response]
|
powerdns._get.side_effect = [get_response]
|
||||||
mock_current_app.config.get = Mock(return_value="localhost")
|
mock_current_app.config.get = Mock(return_value="localhost")
|
||||||
result = powerdns.get_zones(account_number)
|
result = powerdns.get_zones(account_number)
|
||||||
self.assertEqual(result, zones)
|
self.assertEqual(result, zones)
|
||||||
|
|
||||||
|
def test_powerdns_get_zone_name(self):
|
||||||
|
zones = ['example.com', 'test.example.com']
|
||||||
|
zone = "test.example.com"
|
||||||
|
domain = "_acme-challenge.test.example.com"
|
||||||
|
account_number = "1234567890"
|
||||||
|
powerdns.get_zones = Mock(return_value=zones)
|
||||||
|
result = powerdns._get_zone_name(domain, account_number)
|
||||||
|
self.assertEqual(result, zone)
|
||||||
|
|
||||||
|
def mock_current_app_config_get(a, b):
|
||||||
|
""" Mock of current_app.config.get() """
|
||||||
|
config = {
|
||||||
|
'ACME_POWERDNS_APIKEYNAME': 'X-API-Key',
|
||||||
|
'ACME_POWERDNS_APIKEY': 'KEY',
|
||||||
|
'ACME_POWERDNS_DOMAIN': 'http://internal-dnshiddenmaster-1486232504.us-east-1.elb.amazonaws.com',
|
||||||
|
'ACME_POWERDNS_SERVERID': 'localhost'
|
||||||
|
}
|
||||||
|
return config[a]
|
||||||
|
|
||||||
|
@patch("lemur.plugins.lemur_acme.powerdns.current_app")
|
||||||
|
# @patch("lemur.plugins.lemur_acme.powerdns.current_app.config.get", side_effect=mock_current_app_config_get)
|
||||||
|
def test_powerdns_create_txt_record(self, mock_current_app):
|
||||||
|
domain = "_acme_challenge.test.example.com"
|
||||||
|
zone = "test.example.com"
|
||||||
|
token = "ABCDEFGHIJ"
|
||||||
|
account_number = "1234567890"
|
||||||
|
change_id = (domain, token)
|
||||||
|
powerdns._get_zone_name = Mock(return_value=zone)
|
||||||
|
mock_current_app.logger.debug = Mock()
|
||||||
|
mock_current_app.config.get = Mock(return_value="localhost")
|
||||||
|
powerdns._patch = Mock()
|
||||||
|
log_data = {
|
||||||
|
"function": "create_txt_record",
|
||||||
|
"fqdn": domain,
|
||||||
|
"token": token,
|
||||||
|
"message": "TXT record successfully created"
|
||||||
|
}
|
||||||
|
result = powerdns.create_txt_record(domain, token, account_number)
|
||||||
|
mock_current_app.logger.debug.assert_called_with(log_data)
|
||||||
|
self.assertEqual(result, change_id)
|
Loading…
Reference in New Issue