adding PowerDNS delete_txt_record with associated tests
This commit is contained in:
parent
52c7686d58
commit
bddae6e428
|
@ -12,6 +12,7 @@ import dns.resolver
|
||||||
from flask import current_app
|
from flask import current_app
|
||||||
from lemur.extensions import metrics, sentry
|
from lemur.extensions import metrics, sentry
|
||||||
|
|
||||||
|
|
||||||
class Zone:
|
class Zone:
|
||||||
""" This class implements a PowerDNS zone in JSON. """
|
""" This class implements a PowerDNS zone in JSON. """
|
||||||
|
|
||||||
|
@ -33,16 +34,11 @@ class Zone:
|
||||||
""" Indicates whether the zone is setup as a PRIMARY or SECONDARY """
|
""" Indicates whether the zone is setup as a PRIMARY or SECONDARY """
|
||||||
return self._data["kind"]
|
return self._data["kind"]
|
||||||
|
|
||||||
class Record:
|
|
||||||
"""
|
|
||||||
This class implements a PowerDNS record.
|
|
||||||
|
|
||||||
Accepts the response from the API call as the argument.
|
class Record:
|
||||||
"""
|
""" This class implements a PowerDNS record. """
|
||||||
|
|
||||||
def __init__(self, _data):
|
def __init__(self, _data):
|
||||||
# Since we are dealing with only TXT records for Lemur, we expect only 1 RRSet in the response.
|
|
||||||
# Thus we default to picking up the first entry (_data["rrsets"][0]) from the response.
|
|
||||||
self._data = _data
|
self._data = _data
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@ -62,44 +58,8 @@ class Record:
|
||||||
return self._data["ttl"]
|
return self._data["ttl"]
|
||||||
|
|
||||||
|
|
||||||
def _generate_header():
|
|
||||||
"""Function to generate the header for a request using the PowerDNS API Key"""
|
|
||||||
|
|
||||||
api_key_name = current_app.config.get("ACME_POWERDNS_APIKEYNAME", "")
|
|
||||||
api_key = current_app.config.get("ACME_POWERDNS_APIKEY", "")
|
|
||||||
return {api_key_name: api_key}
|
|
||||||
|
|
||||||
|
|
||||||
def _get(path, params=None):
|
|
||||||
"""
|
|
||||||
Function to execute a GET request on the given URL (base_uri + path) with given params
|
|
||||||
Returns JSON response object
|
|
||||||
"""
|
|
||||||
base_uri = current_app.config.get("ACME_POWERDNS_DOMAIN", "")
|
|
||||||
resp = requests.get(
|
|
||||||
f"{base_uri}{path}",
|
|
||||||
headers=_generate_header(),
|
|
||||||
params=params,
|
|
||||||
verify=True,
|
|
||||||
)
|
|
||||||
resp.raise_for_status()
|
|
||||||
return resp.json()
|
|
||||||
|
|
||||||
def _patch(path, payload):
|
|
||||||
"""
|
|
||||||
Function to execute a Patch request on the given URL (base_uri + path) with given data
|
|
||||||
"""
|
|
||||||
base_uri = current_app.config.get("ACME_POWERDNS_DOMAIN", "")
|
|
||||||
resp = requests.patch(
|
|
||||||
f"{base_uri}{path}",
|
|
||||||
headers=_generate_header(),
|
|
||||||
data=json.dumps(payload)
|
|
||||||
)
|
|
||||||
resp.raise_for_status()
|
|
||||||
|
|
||||||
|
|
||||||
def get_zones(account_number):
|
def get_zones(account_number):
|
||||||
"""Get zones from the PowerDNS"""
|
"""Retrieve authoritative zones from the PowerDNS API and return a list"""
|
||||||
server_id = current_app.config.get("ACME_POWERDNS_SERVERID", "")
|
server_id = current_app.config.get("ACME_POWERDNS_SERVERID", "")
|
||||||
path = f"/api/v1/servers/{server_id}/zones"
|
path = f"/api/v1/servers/{server_id}/zones"
|
||||||
zones = []
|
zones = []
|
||||||
|
@ -109,43 +69,13 @@ def get_zones(account_number):
|
||||||
zones.append(zone.name)
|
zones.append(zone.name)
|
||||||
return zones
|
return zones
|
||||||
|
|
||||||
def _get_zone_name(domain, account_number):
|
|
||||||
"""Get the matching zone for the given domain"""
|
|
||||||
zones = get_zones(account_number)
|
|
||||||
zone_name = ""
|
|
||||||
for z in zones:
|
|
||||||
if domain.endswith(z):
|
|
||||||
# Find the most specific zone possible for the domain
|
|
||||||
# Ex: If fqdn is a.b.c.com, there is a zone for c.com,
|
|
||||||
# and a zone for b.c.com, we want to use b.c.com.
|
|
||||||
if z.count(".") > zone_name.count("."):
|
|
||||||
zone_name = z
|
|
||||||
if not zone_name:
|
|
||||||
function = sys._getframe().f_code.co_name
|
|
||||||
metrics.send(f"{function}.fail", "counter", 1)
|
|
||||||
raise Exception(f"No PowerDNS zone found for domain: {domain}")
|
|
||||||
return zone_name
|
|
||||||
|
|
||||||
def create_txt_record(domain, token, account_number):
|
def create_txt_record(domain, token, account_number):
|
||||||
"""
|
""" Create a TXT record for the given domain and token and return a change_id tuple """
|
||||||
Create a TXT record for the given domain.
|
|
||||||
|
|
||||||
The part of the domain that matches with the zone becomes the zone name.
|
|
||||||
The remainder becomes the owner name (referred to as node name here)
|
|
||||||
Example: Let's say we have a zone named "exmaple.com" in PowerDNS and we
|
|
||||||
get a request to create a cert for lemur.example.com
|
|
||||||
Domain - _acme-challenge.lemur.example.com
|
|
||||||
Matching zone - example.com
|
|
||||||
Owner name - _acme-challenge.lemur
|
|
||||||
"""
|
|
||||||
|
|
||||||
zone_name = _get_zone_name(domain, account_number)
|
zone_name = _get_zone_name(domain, account_number)
|
||||||
node_name = domain[:-len(".".join(zone_name))]
|
|
||||||
|
|
||||||
server_id = current_app.config.get("ACME_POWERDNS_SERVERID", "")
|
server_id = current_app.config.get("ACME_POWERDNS_SERVERID", "")
|
||||||
zone_id = zone_name.join(".")
|
zone_id = zone_name.join(".")
|
||||||
domain_id = domain.join(".")
|
domain_id = domain.join(".")
|
||||||
|
|
||||||
path = f"/api/v1/servers/{server_id}/zones/{zone_id}"
|
path = f"/api/v1/servers/{server_id}/zones/{zone_id}"
|
||||||
payload = {
|
payload = {
|
||||||
"rrsets": [
|
"rrsets": [
|
||||||
|
@ -189,6 +119,146 @@ def create_txt_record(domain, token, account_number):
|
||||||
change_id = (domain, token)
|
change_id = (domain, token)
|
||||||
return change_id
|
return change_id
|
||||||
|
|
||||||
|
|
||||||
|
def wait_for_dns_change(change_id, account_number=None):
|
||||||
|
"""
|
||||||
|
Checks if changes have propagated to DNS
|
||||||
|
Verifies both the authoritative DNS server and a public DNS server(Google <8.8.8.8> in our case)
|
||||||
|
Retries and waits until successful.
|
||||||
|
"""
|
||||||
|
domain, token = change_id
|
||||||
|
number_of_attempts = 20
|
||||||
|
|
||||||
|
nameserver = _get_authoritative_nameserver(domain)
|
||||||
|
status = False
|
||||||
|
for attempts in range(0, number_of_attempts):
|
||||||
|
status = _has_dns_propagated(domain, token, nameserver)
|
||||||
|
function = sys._getframe().f_code.co_name
|
||||||
|
log_data = {
|
||||||
|
"function": function,
|
||||||
|
"fqdn": domain,
|
||||||
|
"status": status,
|
||||||
|
"message": "Record status on UltraDNS authoritative server"
|
||||||
|
}
|
||||||
|
current_app.logger.debug(log_data)
|
||||||
|
if status:
|
||||||
|
time.sleep(10)
|
||||||
|
break
|
||||||
|
time.sleep(10)
|
||||||
|
if status:
|
||||||
|
nameserver = _get_public_authoritative_nameserver()
|
||||||
|
for attempts in range(0, number_of_attempts):
|
||||||
|
status = _has_dns_propagated(domain, token, nameserver)
|
||||||
|
function = sys._getframe().f_code.co_name
|
||||||
|
log_data = {
|
||||||
|
"function": function,
|
||||||
|
"fqdn": domain,
|
||||||
|
"status": status,
|
||||||
|
"message": "Record status on Public DNS"
|
||||||
|
}
|
||||||
|
current_app.logger.debug(log_data)
|
||||||
|
if status:
|
||||||
|
metrics.send(f"{function}.success", "counter", 1)
|
||||||
|
break
|
||||||
|
time.sleep(10)
|
||||||
|
if not status:
|
||||||
|
metrics.send(f"{function}.fail", "counter", 1, metric_tags={"fqdn": domain, "txt_record": token})
|
||||||
|
sentry.captureException(extra={"fqdn": str(domain), "txt_record": str(token)})
|
||||||
|
|
||||||
|
|
||||||
|
def delete_txt_record(change_id, account_number, domain, token):
|
||||||
|
""" Delete the TXT record for the given domain and token """
|
||||||
|
zone_name = _get_zone_name(domain, account_number)
|
||||||
|
server_id = current_app.config.get("ACME_POWERDNS_SERVERID", "")
|
||||||
|
zone_id = zone_name.join(".")
|
||||||
|
domain_id = domain.join(".")
|
||||||
|
path = f"/api/v1/servers/{server_id}/zones/{zone_id}"
|
||||||
|
payload = {
|
||||||
|
"rrsets": [
|
||||||
|
{
|
||||||
|
"name": f"{domain_id}",
|
||||||
|
"type": "TXT",
|
||||||
|
"ttl": "300",
|
||||||
|
"changetype": "DELETE",
|
||||||
|
"records": [
|
||||||
|
{
|
||||||
|
"content": f"{token}",
|
||||||
|
"disabled": "false"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"comments": []
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
try:
|
||||||
|
_patch(path, payload)
|
||||||
|
function = sys._getframe().f_code.co_name
|
||||||
|
log_data = {
|
||||||
|
"function": function,
|
||||||
|
"fqdn": domain,
|
||||||
|
"token": token,
|
||||||
|
"message": "TXT record successfully deleted"
|
||||||
|
}
|
||||||
|
current_app.logger.debug(log_data)
|
||||||
|
except requests.exceptions.RequestException as e:
|
||||||
|
function = sys._getframe().f_code.co_name
|
||||||
|
log_data = {
|
||||||
|
"function": function,
|
||||||
|
"domain": domain,
|
||||||
|
"token": token,
|
||||||
|
"Exception": e,
|
||||||
|
"message": "Unable to delete TXT record"
|
||||||
|
}
|
||||||
|
current_app.logger.debug(log_data)
|
||||||
|
|
||||||
|
|
||||||
|
def _generate_header():
|
||||||
|
"""Generate a PowerDNS API header and return it as a dictionary"""
|
||||||
|
api_key_name = current_app.config.get("ACME_POWERDNS_APIKEYNAME", "")
|
||||||
|
api_key = current_app.config.get("ACME_POWERDNS_APIKEY", "")
|
||||||
|
return {api_key_name: api_key}
|
||||||
|
|
||||||
|
|
||||||
|
def _get(path, params=None):
|
||||||
|
""" Execute a GET request on the given URL (base_uri + path) and return response as JSON object """
|
||||||
|
base_uri = current_app.config.get("ACME_POWERDNS_DOMAIN", "")
|
||||||
|
resp = requests.get(
|
||||||
|
f"{base_uri}{path}",
|
||||||
|
headers=_generate_header(),
|
||||||
|
params=params,
|
||||||
|
verify=True,
|
||||||
|
)
|
||||||
|
resp.raise_for_status()
|
||||||
|
return resp.json()
|
||||||
|
|
||||||
|
|
||||||
|
def _patch(path, payload):
|
||||||
|
""" Execute a Patch request on the given URL (base_uri + path) with given payload """
|
||||||
|
base_uri = current_app.config.get("ACME_POWERDNS_DOMAIN", "")
|
||||||
|
resp = requests.patch(
|
||||||
|
f"{base_uri}{path}",
|
||||||
|
headers=_generate_header(),
|
||||||
|
data=json.dumps(payload)
|
||||||
|
)
|
||||||
|
resp.raise_for_status()
|
||||||
|
|
||||||
|
|
||||||
|
def _get_zone_name(domain, account_number):
|
||||||
|
"""Get most specific matching zone for the given domain and return as a String"""
|
||||||
|
zones = get_zones(account_number)
|
||||||
|
zone_name = ""
|
||||||
|
for z in zones:
|
||||||
|
if domain.endswith(z):
|
||||||
|
if z.count(".") > zone_name.count("."):
|
||||||
|
zone_name = z
|
||||||
|
if not zone_name:
|
||||||
|
function = sys._getframe().f_code.co_name
|
||||||
|
metrics.send(f"{function}.fail", "counter", 1)
|
||||||
|
raise Exception(f"No PowerDNS zone found for domain: {domain}")
|
||||||
|
return zone_name
|
||||||
|
|
||||||
|
|
||||||
def _get_authoritative_nameserver(domain):
|
def _get_authoritative_nameserver(domain):
|
||||||
"""Get the authoritative nameserver for the given domain"""
|
"""Get the authoritative nameserver for the given domain"""
|
||||||
n = dns.name.from_text(domain)
|
n = dns.name.from_text(domain)
|
||||||
|
@ -234,12 +304,9 @@ def _get_authoritative_nameserver(domain):
|
||||||
def _get_public_authoritative_nameserver():
|
def _get_public_authoritative_nameserver():
|
||||||
return "8.8.8.8"
|
return "8.8.8.8"
|
||||||
|
|
||||||
def _has_dns_propagated(name, token, domain):
|
|
||||||
"""
|
|
||||||
Check whether the DNS change made by Lemur have propagated to the public DNS or not.
|
|
||||||
|
|
||||||
Invoked by wait_for_dns_change() function
|
def _has_dns_propagated(name, token, domain):
|
||||||
"""
|
"""Check whether the DNS change has propagated to the public DNS"""
|
||||||
txt_records = []
|
txt_records = []
|
||||||
try:
|
try:
|
||||||
dns_resolver = dns.resolver.Resolver()
|
dns_resolver = dns.resolver.Resolver()
|
||||||
|
@ -260,65 +327,3 @@ def _has_dns_propagated(name, token, domain):
|
||||||
return True
|
return True
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
def wait_for_dns_change(change_id, account_number=None):
|
|
||||||
"""
|
|
||||||
Waits and checks if the DNS changes have propagated or not.
|
|
||||||
|
|
||||||
First check the domains authoritative server. Once this succeeds,
|
|
||||||
we ask a public DNS server (Google <8.8.8.8> in our case).
|
|
||||||
"""
|
|
||||||
domain, token = change_id
|
|
||||||
number_of_attempts = 20
|
|
||||||
|
|
||||||
# Check if Record exists via DNS
|
|
||||||
nameserver = _get_authoritative_nameserver(domain)
|
|
||||||
for attempts in range(0, number_of_attempts):
|
|
||||||
status = _has_dns_propagated(domain, token, nameserver)
|
|
||||||
function = sys._getframe().f_code.co_name
|
|
||||||
log_data = {
|
|
||||||
"function": function,
|
|
||||||
"fqdn": domain,
|
|
||||||
"status": status,
|
|
||||||
"message": "Record status on ultraDNS authoritative server"
|
|
||||||
}
|
|
||||||
current_app.logger.debug(log_data)
|
|
||||||
if status:
|
|
||||||
time.sleep(10)
|
|
||||||
break
|
|
||||||
time.sleep(10)
|
|
||||||
if status:
|
|
||||||
nameserver = _get_public_authoritative_nameserver()
|
|
||||||
for attempts in range(0, number_of_attempts):
|
|
||||||
status = _has_dns_propagated(domain, token, nameserver)
|
|
||||||
log_data = {
|
|
||||||
"function": function,
|
|
||||||
"fqdn": domain,
|
|
||||||
"status": status,
|
|
||||||
"message": "Record status on Public DNS"
|
|
||||||
}
|
|
||||||
current_app.logger.debug(log_data)
|
|
||||||
if status:
|
|
||||||
metrics.send(f"{function}.success", "counter", 1)
|
|
||||||
break
|
|
||||||
time.sleep(10)
|
|
||||||
if not status:
|
|
||||||
metrics.send(f"{function}.fail", "counter", 1, metric_tags={"fqdn": domain, "txt_record": token})
|
|
||||||
sentry.captureException(extra={"fqdn": str(domain), "txt_record": str(token)})
|
|
||||||
return
|
|
||||||
|
|
||||||
def delete_txt_record(change_id, account_number, domain, token):
|
|
||||||
"""
|
|
||||||
Delete the TXT record that was created in the create_txt_record() function.
|
|
||||||
|
|
||||||
UltraDNS handles records differently compared to Dyn. It creates an RRSet
|
|
||||||
which is a set of records of the same type and owner. This means
|
|
||||||
that while deleting the record, we cannot delete any individual record from
|
|
||||||
the RRSet. Instead, we have to delete the entire RRSet. If multiple certs are
|
|
||||||
being created for the same domain at the same time, the challenge TXT records
|
|
||||||
that are created will be added under the same RRSet. If the RRSet had more
|
|
||||||
than 1 record, then we create a new RRSet on UltraDNS minus the record that
|
|
||||||
has to be deleted.
|
|
||||||
"""
|
|
||||||
pass
|
|
||||||
|
|
|
@ -21,30 +21,7 @@ class TestPowerdns(unittest.TestCase):
|
||||||
}
|
}
|
||||||
|
|
||||||
@patch("lemur.plugins.lemur_acme.powerdns.current_app")
|
@patch("lemur.plugins.lemur_acme.powerdns.current_app")
|
||||||
@patch("lemur.extensions.metrics")
|
def test_get_zones(self, mock_current_app):
|
||||||
def test_powerdns_delete_txt_record(self, mock_metrics, mock_current_app):
|
|
||||||
domain = "_acme_challenge.test.example.com"
|
|
||||||
zone = "test.example.com"
|
|
||||||
token = "ABCDEFGHIJ"
|
|
||||||
account_number = "1234567890"
|
|
||||||
change_id = (domain, token)
|
|
||||||
mock_current_app.logger.debug = Mock()
|
|
||||||
powerdns.get_zone_name = Mock(return_value=zone)
|
|
||||||
powerdns._post = Mock()
|
|
||||||
powerdns._get = Mock()
|
|
||||||
powerdns._get.return_value = {'zoneName': 'test.example.com.com',
|
|
||||||
'rrSets': [{'ownerName': '_acme-challenge.test.example.com.',
|
|
||||||
'rrtype': 'TXT (16)', 'ttl': 5, 'rdata': ['ABCDEFGHIJ']}],
|
|
||||||
'queryInfo': {'sort': 'OWNER', 'reverse': False, 'limit': 100},
|
|
||||||
'resultInfo': {'totalCount': 1, 'offset': 0, 'returnedCount': 1}}
|
|
||||||
powerdns._delete = Mock()
|
|
||||||
mock_metrics.send = Mock()
|
|
||||||
powerdns.delete_txt_record(change_id, account_number, domain, token)
|
|
||||||
mock_current_app.logger.debug.assert_not_called()
|
|
||||||
mock_metrics.send.assert_not_called()
|
|
||||||
|
|
||||||
@patch("lemur.plugins.lemur_acme.powerdns.current_app")
|
|
||||||
def test_powerdns_get_zones(self, mock_current_app):
|
|
||||||
account_number = "1234567890"
|
account_number = "1234567890"
|
||||||
path = "a/b/c"
|
path = "a/b/c"
|
||||||
zones = ['example.com', 'test.example.com']
|
zones = ['example.com', 'test.example.com']
|
||||||
|
@ -63,7 +40,7 @@ class TestPowerdns(unittest.TestCase):
|
||||||
result = powerdns.get_zones(account_number)
|
result = powerdns.get_zones(account_number)
|
||||||
self.assertEqual(result, zones)
|
self.assertEqual(result, zones)
|
||||||
|
|
||||||
def test_powerdns_get_zone_name(self):
|
def test_get_zone_name(self):
|
||||||
zones = ['example.com', 'test.example.com']
|
zones = ['example.com', 'test.example.com']
|
||||||
zone = "test.example.com"
|
zone = "test.example.com"
|
||||||
domain = "_acme-challenge.test.example.com"
|
domain = "_acme-challenge.test.example.com"
|
||||||
|
@ -72,19 +49,8 @@ class TestPowerdns(unittest.TestCase):
|
||||||
result = powerdns._get_zone_name(domain, account_number)
|
result = powerdns._get_zone_name(domain, account_number)
|
||||||
self.assertEqual(result, zone)
|
self.assertEqual(result, zone)
|
||||||
|
|
||||||
def mock_current_app_config_get(a, b):
|
|
||||||
""" Mock of current_app.config.get() """
|
|
||||||
config = {
|
|
||||||
'ACME_POWERDNS_APIKEYNAME': 'X-API-Key',
|
|
||||||
'ACME_POWERDNS_APIKEY': 'KEY',
|
|
||||||
'ACME_POWERDNS_DOMAIN': 'http://internal-dnshiddenmaster-1486232504.us-east-1.elb.amazonaws.com',
|
|
||||||
'ACME_POWERDNS_SERVERID': 'localhost'
|
|
||||||
}
|
|
||||||
return config[a]
|
|
||||||
|
|
||||||
@patch("lemur.plugins.lemur_acme.powerdns.current_app")
|
@patch("lemur.plugins.lemur_acme.powerdns.current_app")
|
||||||
# @patch("lemur.plugins.lemur_acme.powerdns.current_app.config.get", side_effect=mock_current_app_config_get)
|
def test_create_txt_record(self, mock_current_app):
|
||||||
def test_powerdns_create_txt_record(self, mock_current_app):
|
|
||||||
domain = "_acme_challenge.test.example.com"
|
domain = "_acme_challenge.test.example.com"
|
||||||
zone = "test.example.com"
|
zone = "test.example.com"
|
||||||
token = "ABCDEFGHIJ"
|
token = "ABCDEFGHIJ"
|
||||||
|
@ -106,21 +72,50 @@ class TestPowerdns(unittest.TestCase):
|
||||||
|
|
||||||
@patch("lemur.plugins.lemur_acme.powerdns.current_app")
|
@patch("lemur.plugins.lemur_acme.powerdns.current_app")
|
||||||
@patch("lemur.extensions.metrics")
|
@patch("lemur.extensions.metrics")
|
||||||
def test_powerdns_wait_for_dns_change(self, mock_metrics, mock_current_app):
|
@patch("time.sleep")
|
||||||
powerdns._has_dns_propagated = Mock(return_value=True)
|
def test_wait_for_dns_change(self, mock_sleep, mock_metrics, mock_current_app):
|
||||||
nameserver = "1.1.1.1"
|
nameserver = "1.1.1.1"
|
||||||
powerdns._get_authoritative_nameserver = Mock(return_value=nameserver)
|
powerdns._get_authoritative_nameserver = Mock(return_value=nameserver)
|
||||||
|
powerdns._has_dns_propagated = Mock(return_value=True)
|
||||||
mock_metrics.send = Mock()
|
mock_metrics.send = Mock()
|
||||||
|
mock_sleep.return_value = False
|
||||||
domain = "_acme-challenge.test.example.com"
|
domain = "_acme-challenge.test.example.com"
|
||||||
token = "ABCDEFGHIJ"
|
token = "ABCDEFGHIJ"
|
||||||
change_id = (domain, token)
|
change_id = (domain, token)
|
||||||
mock_current_app.logger.debug = Mock()
|
mock_current_app.logger.debug = Mock()
|
||||||
powerdns.wait_for_dns_change(change_id)
|
powerdns.wait_for_dns_change(change_id)
|
||||||
# mock_metrics.send.assert_not_called()
|
|
||||||
log_data = {
|
auth_log_data = {
|
||||||
|
"function": "wait_for_dns_change",
|
||||||
|
"fqdn": domain,
|
||||||
|
"status": True,
|
||||||
|
"message": "Record status on UltraDNS authoritative server"
|
||||||
|
}
|
||||||
|
pub_log_data = {
|
||||||
"function": "wait_for_dns_change",
|
"function": "wait_for_dns_change",
|
||||||
"fqdn": domain,
|
"fqdn": domain,
|
||||||
"status": True,
|
"status": True,
|
||||||
"message": "Record status on Public DNS"
|
"message": "Record status on Public DNS"
|
||||||
}
|
}
|
||||||
|
mock_current_app.logger.debug.assert_any_call(auth_log_data)
|
||||||
|
mock_current_app.logger.debug.assert_any_call(pub_log_data)
|
||||||
|
|
||||||
|
@patch("lemur.plugins.lemur_acme.powerdns.current_app")
|
||||||
|
def test_delete_txt_record(self, mock_current_app):
|
||||||
|
domain = "_acme_challenge.test.example.com"
|
||||||
|
zone = "test.example.com"
|
||||||
|
token = "ABCDEFGHIJ"
|
||||||
|
account_number = "1234567890"
|
||||||
|
change_id = (domain, token)
|
||||||
|
powerdns._get_zone_name = Mock(return_value=zone)
|
||||||
|
mock_current_app.logger.debug = Mock()
|
||||||
|
mock_current_app.config.get = Mock(return_value="localhost")
|
||||||
|
powerdns._patch = Mock()
|
||||||
|
log_data = {
|
||||||
|
"function": "delete_txt_record",
|
||||||
|
"fqdn": domain,
|
||||||
|
"token": token,
|
||||||
|
"message": "TXT record successfully deleted"
|
||||||
|
}
|
||||||
|
powerdns.delete_txt_record(change_id, account_number, domain, token)
|
||||||
mock_current_app.logger.debug.assert_called_with(log_data)
|
mock_current_app.logger.debug.assert_called_with(log_data)
|
Loading…
Reference in New Issue