adding dns tests and assorted exception handling

This commit is contained in:
csine-nflx 2020-01-31 13:16:37 -08:00
parent 969a7107fe
commit be7736d350
4 changed files with 25 additions and 16 deletions

View File

@ -10,7 +10,7 @@ from lemur.constants import SUCCESS_METRIC_STATUS
from lemur.plugins.lemur_acme.plugin import AcmeHandler from lemur.plugins.lemur_acme.plugin import AcmeHandler
manager = Manager( manager = Manager(
usage="This provides ability to test ACME issuance" usage="Handles all ACME related tasks"
) )
@ -30,7 +30,7 @@ manager = Manager(
) )
def dnstest(domain, token): def dnstest(domain, token):
""" """
Attempts to create, verify, and delete DNS TXT records with an autodetected provider. Create, verify, and delete DNS TXT records using an autodetected provider.
""" """
print("[+] Starting ACME Tests.") print("[+] Starting ACME Tests.")
change_id = (domain, token) change_id = (domain, token)
@ -53,7 +53,7 @@ def dnstest(domain, token):
change_id = dns_provider_plugin.create_txt_record(domain, token, account_number) change_id = dns_provider_plugin.create_txt_record(domain, token, account_number)
print("[+] Verifying TXT Record has propagated to DNS.") print("[+] Verifying TXT Record has propagated to DNS.")
print("[+] Waiting 60 second before continuing...") print("[+] This step could take a while...")
time.sleep(10) time.sleep(10)
# Verify TXT Records # Verify TXT Records
@ -64,7 +64,7 @@ def dnstest(domain, token):
try: try:
dns_provider_plugin.wait_for_dns_change(change_id, account_number) dns_provider_plugin.wait_for_dns_change(change_id, account_number)
print(f"[+] Verfied TXT Record in `{dns_provider.name}` provider") print(f"[+] Verified TXT Record in `{dns_provider.name}` provider")
except Exception: except Exception:
metrics.send("complete_dns_challenge_error", "counter", 1) metrics.send("complete_dns_challenge_error", "counter", 1)
sentry.captureException() sentry.captureException()

View File

@ -6,8 +6,7 @@ import dns.query
import dns.resolver import dns.resolver
import re import re
from flask import current_app from lemur.extensions import metrics
from lemur.extensions import metrics, sentry
class DNSError(Exception): class DNSError(Exception):
@ -86,9 +85,6 @@ def get_authoritative_nameserver(domain):
def get_dns_records(domain, rdtype, nameserver): def get_dns_records(domain, rdtype, nameserver):
"""Retrieves the DNS records matching the name and type and returns a list of records""" """Retrieves the DNS records matching the name and type and returns a list of records"""
# if not nameserver:
# nameserver = get_authoritative_nameserver(domain)[0]
records = [] records = []
try: try:
dns_resolver = dns.resolver.Resolver() dns_resolver = dns.resolver.Resolver()

View File

@ -67,20 +67,20 @@ def get_zones(account_number):
"message": "Retrieved Zones Successfully" "message": "Retrieved Zones Successfully"
} }
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
for record in records:
zone = Zone(record)
if zone.kind == 'Master':
zones.append(zone.name)
return zones
except Exception as e: except Exception as e:
records = _get(path)
function = sys._getframe().f_code.co_name function = sys._getframe().f_code.co_name
log_data = { log_data = {
"function": function, "function": function,
"message": "Failed to Retrieve Zone Data" "message": "Failed to Retrieve Zone Data"
} }
current_app.logger.debug(log_data) current_app.logger.debug(log_data)
raise
for record in records:
zone = Zone(record)
if zone.kind == 'Master':
zones.append(zone.name)
return zones
def create_txt_record(domain, token, account_number): def create_txt_record(domain, token, account_number):

View File

@ -0,0 +1,13 @@
import unittest
from mock import Mock, patch
from lemur.dns_providers import util as dnsutil
class TestDNSProvider(unittest.TestCase):
def test_is_valid_domain(self):
self.assertTrue(dnsutil.is_valid_domain("example.com"))
self.assertTrue(dnsutil.is_valid_domain("foo.bar.org"))
self.assertTrue(dnsutil.is_valid_domain("_acme-chall.example.com"))
self.assertFalse(dnsutil.is_valid_domain("e/xample.com"))
self.assertFalse(dnsutil.is_valid_domain("exam\ple.com"))
self.assertFalse(dnsutil.is_valid_domain("*.example.com"))