adding PowerDNS get_zones functionality and unit tests

This commit is contained in:
csine-nflx 2020-01-17 18:29:37 -08:00
parent 7f73417bda
commit 3080a9527c
4 changed files with 241 additions and 7 deletions

View File

@ -31,7 +31,7 @@ from lemur.exceptions import InvalidAuthority, InvalidConfiguration, UnknownProv
from lemur.extensions import metrics, sentry
from lemur.plugins import lemur_acme as acme
from lemur.plugins.bases import IssuerPlugin
from lemur.plugins.lemur_acme import cloudflare, dyn, route53, ultradns
from lemur.plugins.lemur_acme import cloudflare, dyn, route53, ultradns, powerdns
from retrying import retry
@ -377,6 +377,7 @@ class AcmeHandler(object):
"dyn": dyn,
"route53": route53,
"ultradns": ultradns,
# "powerdns": powerdns,
}
provider = provider_types.get(type)
if not provider:
@ -436,6 +437,7 @@ class ACMEIssuerPlugin(IssuerPlugin):
"dyn": dyn,
"route53": route53,
"ultradns": ultradns,
# "powerdns": powerdns,
}
provider = provider_types.get(type)
if not provider:

View File

@ -0,0 +1,108 @@
import time
import requests
import json
import sys
import dns
import dns.exception
import dns.name
import dns.query
import dns.resolver
from flask import current_app
from lemur.extensions import metrics, sentry
class Zone:
""" This class implements a PowerDNS zone in JSON. """
def __init__(self, _data):
self._data = _data
@property
def id(self):
""" Zone id, has a trailing "." at the end, which we manually remove. """
return self._data["id"][:-1]
@property
def name(self):
""" Zone name, has a trailing "." at the end, which we manually remove. """
return self._data["name"][:-1]
@property
def kind(self):
""" Indicates whether the zone is setup as a PRIMARY or SECONDARY """
return self._data["kind"]
def _generate_header():
"""Function to generate the header for a request using the PowerDNS API Key"""
api_key_name = current_app.config.get("ACME_POWERDNS_APIKEYNAME", "")
api_key = current_app.config.get("ACME_POWERDNS_APIKEY", "")
return {api_key_name: api_key}
def _get(path, params=None):
"""
Function to execute a GET request on the given URL (base_uri + path) with given params
Returns JSON response object
"""
base_uri = current_app.config.get("ACME_POWERDNS_DOMAIN", "")
resp = requests.get(
f"{base_uri}{path}",
headers=_generate_header(),
params=params,
verify=True,
)
resp.raise_for_status()
return resp.json()
def get_zones(account_number):
"""Get zones from the PowerDNS"""
server_id = current_app.config.get("ACME_POWERDNS_SERVERID", "")
path = f"/api/v1/servers/{server_id}/zones"
zones = []
for elem in _get(path):
zone = Zone(elem)
if zone.kind == 'Master':
zones.append(zone.name)
return zones
def create_txt_record(domain, token, account_number):
"""
Create a TXT record for the given domain.
The part of the domain that matches with the zone becomes the zone name.
The remainder becomes the owner name (referred to as node name here)
Example: Let's say we have a zone named "exmaple.com" in PowerDNS and we
get a request to create a cert for lemur.example.com
Domain - _acme-challenge.lemur.example.com
Matching zone - example.com
Owner name - _acme-challenge.lemur
"""
pass
def wait_for_dns_change(change_id, account_number=None):
"""
Waits and checks if the DNS changes have propagated or not.
First check the domains authoritative server. Once this succeeds,
we ask a public DNS server (Google <8.8.8.8> in our case).
"""
pass
def delete_txt_record(change_id, account_number, domain, token):
"""
Delete the TXT record that was created in the create_txt_record() function.
UltraDNS handles records differently compared to Dyn. It creates an RRSet
which is a set of records of the same type and owner. This means
that while deleting the record, we cannot delete any individual record from
the RRSet. Instead, we have to delete the entire RRSet. If multiple certs are
being created for the same domain at the same time, the challenge TXT records
that are created will be added under the same RRSet. If the RRSet had more
than 1 record, then we create a new RRSet on UltraDNS minus the record that
has to be deleted.
"""
pass

View File

@ -364,7 +364,7 @@ class TestAcme(unittest.TestCase):
@patch("lemur.plugins.lemur_acme.ultradns.requests")
@patch("lemur.plugins.lemur_acme.ultradns.current_app")
def test_get_ultradns_token(self, mock_current_app, mock_requests):
def test_ultradns_get_token(self, mock_current_app, mock_requests):
# ret_val = json.dumps({"access_token": "access"})
the_response = Response()
the_response._content = b'{"access_token": "access"}'
@ -374,7 +374,7 @@ class TestAcme(unittest.TestCase):
self.assertTrue(len(result) > 0)
@patch("lemur.plugins.lemur_acme.ultradns.current_app")
def test_create_txt_record(self, mock_current_app):
def test_ultradns_create_txt_record(self, mock_current_app):
domain = "_acme_challenge.test.example.com"
zone = "test.example.com"
token = "ABCDEFGHIJ"
@ -395,7 +395,7 @@ class TestAcme(unittest.TestCase):
@patch("lemur.plugins.lemur_acme.ultradns.current_app")
@patch("lemur.extensions.metrics")
def test_delete_txt_record(self, mock_metrics, mock_current_app):
def test_ultradns_delete_txt_record(self, mock_metrics, mock_current_app):
domain = "_acme_challenge.test.example.com"
zone = "test.example.com"
token = "ABCDEFGHIJ"
@ -418,7 +418,7 @@ class TestAcme(unittest.TestCase):
@patch("lemur.plugins.lemur_acme.ultradns.current_app")
@patch("lemur.extensions.metrics")
def test_wait_for_dns_change(self, mock_metrics, mock_current_app):
def test_ultradns_wait_for_dns_change(self, mock_metrics, mock_current_app):
ultradns._has_dns_propagated = Mock(return_value=True)
nameserver = "1.1.1.1"
ultradns.get_authoritative_nameserver = Mock(return_value=nameserver)
@ -437,7 +437,7 @@ class TestAcme(unittest.TestCase):
}
mock_current_app.logger.debug.assert_called_with(log_data)
def test_get_zone_name(self):
def test_ultradns_get_zone_name(self):
zones = ['example.com', 'test.example.com']
zone = "test.example.com"
domain = "_acme-challenge.test.example.com"
@ -446,7 +446,7 @@ class TestAcme(unittest.TestCase):
result = ultradns.get_zone_name(domain, account_number)
self.assertEqual(result, zone)
def test_get_zones(self):
def test_ultradns_get_zones(self):
account_number = "1234567890"
path = "a/b/c"
zones = ['example.com', 'test.example.com']

View File

@ -0,0 +1,124 @@
import unittest
from requests.models import Response
from mock import MagicMock, Mock, patch
from lemur.plugins.lemur_acme import plugin, powerdns
class TestPowerdns(unittest.TestCase):
@patch("lemur.plugins.lemur_acme.plugin.dns_provider_service")
def setUp(self, mock_dns_provider_service):
self.ACMEIssuerPlugin = plugin.ACMEIssuerPlugin()
self.acme = plugin.AcmeHandler()
mock_dns_provider = Mock()
mock_dns_provider.name = "powerdns"
mock_dns_provider.credentials = "{}"
mock_dns_provider.provider_type = "powerdns"
self.acme.dns_providers_for_domain = {
"www.test.com": [mock_dns_provider],
"test.fakedomain.net": [mock_dns_provider],
}
@patch("lemur.plugins.lemur_acme.powerdns.requests")
@patch("lemur.plugins.lemur_acme.powerdns.current_app")
def test_powerdns_get_token(self, mock_current_app, mock_requests):
# ret_val = json.dumps({"access_token": "access"})
the_response = Response()
the_response._content = b'{"access_token": "access"}'
mock_requests.post = Mock(return_value=the_response)
mock_current_app.config.get = Mock(return_value="Test")
result = powerdns.get_powerdns_token()
self.assertTrue(len(result) > 0)
@patch("lemur.plugins.lemur_acme.powerdns.current_app")
def test_powerdns_create_txt_record(self, mock_current_app):
domain = "_acme_challenge.test.example.com"
zone = "test.example.com"
token = "ABCDEFGHIJ"
account_number = "1234567890"
change_id = (domain, token)
powerdns.get_zone_name = Mock(return_value=zone)
mock_current_app.logger.debug = Mock()
powerdns._post = Mock()
log_data = {
"function": "create_txt_record",
"fqdn": domain,
"token": token,
"message": "TXT record created"
}
result = powerdns.create_txt_record(domain, token, account_number)
mock_current_app.logger.debug.assert_called_with(log_data)
self.assertEqual(result, change_id)
@patch("lemur.plugins.lemur_acme.powerdns.current_app")
@patch("lemur.extensions.metrics")
def test_powerdns_delete_txt_record(self, mock_metrics, mock_current_app):
domain = "_acme_challenge.test.example.com"
zone = "test.example.com"
token = "ABCDEFGHIJ"
account_number = "1234567890"
change_id = (domain, token)
mock_current_app.logger.debug = Mock()
powerdns.get_zone_name = Mock(return_value=zone)
powerdns._post = Mock()
powerdns._get = Mock()
powerdns._get.return_value = {'zoneName': 'test.example.com.com',
'rrSets': [{'ownerName': '_acme-challenge.test.example.com.',
'rrtype': 'TXT (16)', 'ttl': 5, 'rdata': ['ABCDEFGHIJ']}],
'queryInfo': {'sort': 'OWNER', 'reverse': False, 'limit': 100},
'resultInfo': {'totalCount': 1, 'offset': 0, 'returnedCount': 1}}
powerdns._delete = Mock()
mock_metrics.send = Mock()
powerdns.delete_txt_record(change_id, account_number, domain, token)
mock_current_app.logger.debug.assert_not_called()
mock_metrics.send.assert_not_called()
@patch("lemur.plugins.lemur_acme.powerdns.current_app")
@patch("lemur.extensions.metrics")
def test_powerdns_wait_for_dns_change(self, mock_metrics, mock_current_app):
powerdns._has_dns_propagated = Mock(return_value=True)
nameserver = "1.1.1.1"
powerdns.get_authoritative_nameserver = Mock(return_value=nameserver)
mock_metrics.send = Mock()
domain = "_acme-challenge.test.example.com"
token = "ABCDEFGHIJ"
change_id = (domain, token)
mock_current_app.logger.debug = Mock()
powerdns.wait_for_dns_change(change_id)
# mock_metrics.send.assert_not_called()
log_data = {
"function": "wait_for_dns_change",
"fqdn": domain,
"status": True,
"message": "Record status on Public DNS"
}
mock_current_app.logger.debug.assert_called_with(log_data)
def test_powerdns_get_zone_name(self):
zones = ['example.com', 'test.example.com']
zone = "test.example.com"
domain = "_acme-challenge.test.example.com"
account_number = "1234567890"
powerdns.get_zones = Mock(return_value=zones)
result = powerdns.get_zone_name(domain, account_number)
self.assertEqual(result, zone)
@patch("lemur.plugins.lemur_acme.powerdns.current_app")
def test_powerdns_get_zones(self, mock_current_app):
account_number = "1234567890"
path = "a/b/c"
zones = ['example.com', 'test.example.com']
get_response = [{'account': '', 'dnssec': 'False', 'id': 'example.com.', 'kind': 'Master', 'last_check': 0, 'masters': [],
'name': 'example.com.', 'notified_serial': '2019111907', 'serial': '2019111907',
'url': '/api/v1/servers/localhost/zones/example.com.'},
{'account': '', 'dnssec': 'False', 'id': 'bad.example.com.', 'kind': 'Secondary', 'last_check': 0, 'masters': [],
'name': 'bad.example.com.', 'notified_serial': '2018053104', 'serial': '2018053104',
'url': '/api/v1/servers/localhost/zones/bad.example.com.'},
{'account': '', 'dnssec': 'False', 'id': 'test.example.com.', 'kind': 'Master', 'last_check': 0,
'masters': [], 'name': 'test.example.com.', 'notified_serial': '2019112501', 'serial': '2019112501',
'url': '/api/v1/servers/localhost/zones/test.example.com.'}]
powerdns._get = Mock(path)
powerdns._get.side_effect = [get_response]
mock_current_app.config.get = Mock(return_value="localhost")
result = powerdns.get_zones(account_number)
self.assertEqual(result, zones)