created CLI options for testing ACME over DNS. Example: acme dnstest -d _acme-chall.foo.com -t token1
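--- a/lemur/dns_providers/util.py
+++ b/lemur/dns_providers/util.py
@@ -0,0 +1,103 @@
+import sys
+import dns
+import dns.exception
+import dns.name
+import dns.query
+import dns.resolver
+import re
+
+from flask import current_app
+from lemur.extensions import metrics, sentry
+
+
+class DNSError(Exception):
+    """Base class for DNS Exceptions."""
+    pass
+
+
+class BadDomainError(DNSError):
+    """Error for when a Bad Domain Name is given."""
+
+    def __init__(self, message):
+        self.message = message
+
+
+class DNSResolveError(DNSError):
+    """Error for DNS Resolution Errors."""
+
+    def __init__(self, message):
+        self.message = message
+
+
+def is_valid_domain(domain):
+    """Checks if a domain is syntactically valid and returns a bool"""
+    if len(domain) > 253:
+        return False
+    if domain[-1] == ".":
+        domain = domain[:-1]
+    fqdn_re = re.compile("(?=^.{1,254}$)(^(?:(?!\d+\.|-)[a-zA-Z0-9_\-]{1,63}(?<!-)\.?)+(?:[a-zA-Z]{2,})$)", re.IGNORECASE)
+    return all(fqdn_re.match(d) for d in domain.split("."))
+
+
+def get_authoritative_nameserver(domain):
+    """Get the authoritative nameservers for the given domain"""
+    if not is_valid_domain(domain):
+        raise BadDomainError(f"{domain} is not a valid FQDN")
+
+    n = dns.name.from_text(domain)
+
+    depth = 2
+    default = dns.resolver.get_default_resolver()
+    nameserver = default.nameservers[0]
+
+    last = False
+    while not last:
+        s = n.split(depth)
+
+        last = s[0].to_unicode() == u"@"
+        sub = s[1]
+
+        query = dns.message.make_query(sub, dns.rdatatype.NS)
+        response = dns.query.udp(query, nameserver)
+
+        rcode = response.rcode()
+        if rcode != dns.rcode.NOERROR:
+            function = sys._getframe().f_code.co_name
+            metrics.send(f"{function}.error", "counter", 1)
+            if rcode == dns.rcode.NXDOMAIN:
+                raise DNSResolveError(f"{sub} does not exist.")
+            else:
+                raise DNSResolveError(f"Error: {dns.rcode.to_text(rcode)}")
+
+        if len(response.authority) > 0:
+            rrset = response.authority[0]
+        else:
+            rrset = response.answer[0]
+
+        rr = rrset[0]
+        if rr.rdtype != dns.rdatatype.SOA:
+            authority = rr.target
+            nameserver = default.query(authority).rrset[0].to_text()
+
+        depth += 1
+
+    return nameserver
+
+
+def get_dns_records(domain, rdtype, nameserver):
+    """Retrieves the DNS records matching the name and type and returns a list of records"""
+    # if not nameserver:
+    #     nameserver = get_authoritative_nameserver(domain)[0]
+
+    records = []
+    try:
+        dns_resolver = dns.resolver.Resolver()
+        dns_resolver.nameservers = [nameserver]
+        dns_response = dns_resolver.query(domain, rdtype)
+        for rdata in dns_response:
+            for record in rdata.strings:
+                records.append(record.decode("utf-8"))
+    except dns.exception.DNSException:
+        function = sys._getframe().f_code.co_name
+        metrics.send(f"{function}.fail", "counter", 1)
+    return records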