adding wait_for_dns_change() and tests for PowerDNS ACME plugin

This commit is contained in:
csine-nflx 2020-01-21 18:47:21 -08:00
parent 915ec0ba63
commit 52c7686d58
2 changed files with 161 additions and 24 deletions

View File

@ -33,6 +33,34 @@ class Zone:
""" Indicates whether the zone is setup as a PRIMARY or SECONDARY """ """ Indicates whether the zone is setup as a PRIMARY or SECONDARY """
return self._data["kind"] return self._data["kind"]
class Record:
"""
This class implements a PowerDNS record.
Accepts the response from the API call as the argument.
"""
def __init__(self, _data):
# Since we are dealing with only TXT records for Lemur, we expect only 1 RRSet in the response.
# Thus we default to picking up the first entry (_data["rrsets"][0]) from the response.
self._data = _data
@property
def name(self):
return self._data["name"]
@property
def disabled(self):
return self._data["disabled"]
@property
def content(self):
return self._data["content"]
@property
def ttl(self):
return self._data["ttl"]
def _generate_header(): def _generate_header():
"""Function to generate the header for a request using the PowerDNS API Key""" """Function to generate the header for a request using the PowerDNS API Key"""
@ -147,7 +175,7 @@ def create_txt_record(domain, token, account_number):
"message": "TXT record successfully created" "message": "TXT record successfully created"
} }
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
except Exception as e: except requests.exceptions.RequestException as e:
function = sys._getframe().f_code.co_name function = sys._getframe().f_code.co_name
log_data = { log_data = {
"function": function, "function": function,
@ -161,6 +189,78 @@ def create_txt_record(domain, token, account_number):
change_id = (domain, token) change_id = (domain, token)
return change_id return change_id
def _get_authoritative_nameserver(domain):
"""Get the authoritative nameserver for the given domain"""
n = dns.name.from_text(domain)
depth = 2
default = dns.resolver.get_default_resolver()
nameserver = default.nameservers[0]
last = False
while not last:
s = n.split(depth)
last = s[0].to_unicode() == u"@"
sub = s[1]
query = dns.message.make_query(sub, dns.rdatatype.NS)
response = dns.query.udp(query, nameserver)
rcode = response.rcode()
if rcode != dns.rcode.NOERROR:
function = sys._getframe().f_code.co_name
metrics.send(f"{function}.error", "counter", 1)
if rcode == dns.rcode.NXDOMAIN:
raise Exception("%s does not exist." % sub)
else:
raise Exception("Error %s" % dns.rcode.to_text(rcode))
if len(response.authority) > 0:
rrset = response.authority[0]
else:
rrset = response.answer[0]
rr = rrset[0]
if rr.rdtype != dns.rdatatype.SOA:
authority = rr.target
nameserver = default.query(authority).rrset[0].to_text()
depth += 1
return nameserver
def _get_public_authoritative_nameserver():
return "8.8.8.8"
def _has_dns_propagated(name, token, domain):
"""
Check whether the DNS change made by Lemur have propagated to the public DNS or not.
Invoked by wait_for_dns_change() function
"""
txt_records = []
try:
dns_resolver = dns.resolver.Resolver()
dns_resolver.nameservers = [domain]
dns_response = dns_resolver.query(name, "TXT")
for rdata in dns_response:
for txt_record in rdata.strings:
txt_records.append(txt_record.decode("utf-8"))
except dns.exception.DNSException:
function = sys._getframe().f_code.co_name
metrics.send(f"{function}.fail", "counter", 1)
return False
for txt_record in txt_records:
if txt_record == token:
function = sys._getframe().f_code.co_name
metrics.send(f"{function}.success", "counter", 1)
return True
return False
def wait_for_dns_change(change_id, account_number=None): def wait_for_dns_change(change_id, account_number=None):
""" """
@ -169,7 +269,44 @@ def wait_for_dns_change(change_id, account_number=None):
First check the domains authoritative server. Once this succeeds, First check the domains authoritative server. Once this succeeds,
we ask a public DNS server (Google <8.8.8.8> in our case). we ask a public DNS server (Google <8.8.8.8> in our case).
""" """
pass domain, token = change_id
number_of_attempts = 20
# Check if Record exists via DNS
nameserver = _get_authoritative_nameserver(domain)
for attempts in range(0, number_of_attempts):
status = _has_dns_propagated(domain, token, nameserver)
function = sys._getframe().f_code.co_name
log_data = {
"function": function,
"fqdn": domain,
"status": status,
"message": "Record status on ultraDNS authoritative server"
}
current_app.logger.debug(log_data)
if status:
time.sleep(10)
break
time.sleep(10)
if status:
nameserver = _get_public_authoritative_nameserver()
for attempts in range(0, number_of_attempts):
status = _has_dns_propagated(domain, token, nameserver)
log_data = {
"function": function,
"fqdn": domain,
"status": status,
"message": "Record status on Public DNS"
}
current_app.logger.debug(log_data)
if status:
metrics.send(f"{function}.success", "counter", 1)
break
time.sleep(10)
if not status:
metrics.send(f"{function}.fail", "counter", 1, metric_tags={"fqdn": domain, "txt_record": token})
sentry.captureException(extra={"fqdn": str(domain), "txt_record": str(token)})
return
def delete_txt_record(change_id, account_number, domain, token): def delete_txt_record(change_id, account_number, domain, token):
""" """

View File

@ -43,27 +43,6 @@ class TestPowerdns(unittest.TestCase):
mock_current_app.logger.debug.assert_not_called() mock_current_app.logger.debug.assert_not_called()
mock_metrics.send.assert_not_called() mock_metrics.send.assert_not_called()
@patch("lemur.plugins.lemur_acme.powerdns.current_app")
@patch("lemur.extensions.metrics")
def test_powerdns_wait_for_dns_change(self, mock_metrics, mock_current_app):
powerdns._has_dns_propagated = Mock(return_value=True)
nameserver = "1.1.1.1"
powerdns.get_authoritative_nameserver = Mock(return_value=nameserver)
mock_metrics.send = Mock()
domain = "_acme-challenge.test.example.com"
token = "ABCDEFGHIJ"
change_id = (domain, token)
mock_current_app.logger.debug = Mock()
powerdns.wait_for_dns_change(change_id)
# mock_metrics.send.assert_not_called()
log_data = {
"function": "wait_for_dns_change",
"fqdn": domain,
"status": True,
"message": "Record status on Public DNS"
}
mock_current_app.logger.debug.assert_called_with(log_data)
@patch("lemur.plugins.lemur_acme.powerdns.current_app") @patch("lemur.plugins.lemur_acme.powerdns.current_app")
def test_powerdns_get_zones(self, mock_current_app): def test_powerdns_get_zones(self, mock_current_app):
account_number = "1234567890" account_number = "1234567890"
@ -124,3 +103,24 @@ class TestPowerdns(unittest.TestCase):
result = powerdns.create_txt_record(domain, token, account_number) result = powerdns.create_txt_record(domain, token, account_number)
mock_current_app.logger.debug.assert_called_with(log_data) mock_current_app.logger.debug.assert_called_with(log_data)
self.assertEqual(result, change_id) self.assertEqual(result, change_id)
@patch("lemur.plugins.lemur_acme.powerdns.current_app")
@patch("lemur.extensions.metrics")
def test_powerdns_wait_for_dns_change(self, mock_metrics, mock_current_app):
powerdns._has_dns_propagated = Mock(return_value=True)
nameserver = "1.1.1.1"
powerdns._get_authoritative_nameserver = Mock(return_value=nameserver)
mock_metrics.send = Mock()
domain = "_acme-challenge.test.example.com"
token = "ABCDEFGHIJ"
change_id = (domain, token)
mock_current_app.logger.debug = Mock()
powerdns.wait_for_dns_change(change_id)
# mock_metrics.send.assert_not_called()
log_data = {
"function": "wait_for_dns_change",
"fqdn": domain,
"status": True,
"message": "Record status on Public DNS"
}
mock_current_app.logger.debug.assert_called_with(log_data)