Merge branch 'master' into le_Log_orderurl

This commit is contained in:
csine-nflx 2020-02-13 16:41:14 -08:00 committed by GitHub
commit b521aaf579
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 716 additions and 35 deletions

View File

@ -973,6 +973,41 @@ Will be the sender of all notifications, so ensure that it is verified with AWS.
SES if the default notification gateway and will be used unless SMTP settings are configured in the application configuration
settings.
PowerDNS ACME Plugin
~~~~~~~~~~~~~~~~~~~~~~
The following configuration properties are required to use the PowerDNS ACME Plugin for domain validation.
.. data:: ACME_POWERDNS_DOMAIN
:noindex:
This is the FQDN for the PowerDNS API (without path)
.. data:: ACME_POWERDNS_SERVERID
:noindex:
This is the ServerID attribute of the PowerDNS API Server (i.e. "localhost")
.. data:: ACME_POWERDNS_APIKEYNAME
:noindex:
This is the Key name to use for authentication (i.e. "X-API-Key")
.. data:: ACME_POWERDNS_APIKEY
:noindex:
This is the API Key to use for authentication (i.e. "Password")
.. data:: ACME_POWERDNS_RETRIES
:noindex:
This is the number of times DNS Verification should be attempted (i.e. 20)
.. _CommandLineInterface:
Command Line Interface
@ -1071,6 +1106,15 @@ All commands default to `~/.lemur/lemur.conf.py` if a configuration is not speci
lemur notify
.. data:: acme
Handles all ACME related tasks, like ACME plugin testing.
::
lemur acme
Sub-commands
------------
@ -1172,11 +1216,12 @@ Acme
Kevin Glisson <kglisson@netflix.com>,
Curtis Castrapel <ccastrapel@netflix.com>,
Hossein Shafagh <hshafagh@netflix.com>,
Mikhail Khodorovskiy <mikhail.khodorovskiy@jivesoftware.com>
Mikhail Khodorovskiy <mikhail.khodorovskiy@jivesoftware.com>,
Chad Sine <csine@netflix.com>
:Type:
Issuer
:Description:
Adds support for the ACME protocol (including LetsEncrypt) with domain validation being handled Route53.
Adds support for the ACME protocol (including LetsEncrypt) with domain validation using several providers.
Atlas

View File

View File

@ -0,0 +1,86 @@
import time
import json
from flask_script import Manager
from flask import current_app
from lemur.extensions import sentry
from lemur.constants import SUCCESS_METRIC_STATUS
from lemur.plugins.lemur_acme.plugin import AcmeHandler
manager = Manager(
usage="Handles all ACME related tasks"
)
@manager.option(
"-d",
"--domain",
dest="domain",
required=True,
help="Name of the Domain to store to (ex. \"_acme-chall.test.com\".",
)
@manager.option(
"-t",
"--token",
dest="token",
required=True,
help="Value of the Token to store in DNS as content.",
)
def dnstest(domain, token):
"""
Create, verify, and delete DNS TXT records using an autodetected provider.
"""
print("[+] Starting ACME Tests.")
change_id = (domain, token)
acme_handler = AcmeHandler()
acme_handler.autodetect_dns_providers(domain)
if not acme_handler.dns_providers_for_domain[domain]:
raise Exception(f"No DNS providers found for domain: {format(domain)}.")
# Create TXT Records
for dns_provider in acme_handler.dns_providers_for_domain[domain]:
dns_provider_plugin = acme_handler.get_dns_provider(dns_provider.provider_type)
dns_provider_options = json.loads(dns_provider.credentials)
account_number = dns_provider_options.get("account_id")
print(f"[+] Creating TXT Record in `{dns_provider.name}` provider")
change_id = dns_provider_plugin.create_txt_record(domain, token, account_number)
print("[+] Verifying TXT Record has propagated to DNS.")
print("[+] This step could take a while...")
time.sleep(10)
# Verify TXT Records
for dns_provider in acme_handler.dns_providers_for_domain[domain]:
dns_provider_plugin = acme_handler.get_dns_provider(dns_provider.provider_type)
dns_provider_options = json.loads(dns_provider.credentials)
account_number = dns_provider_options.get("account_id")
try:
dns_provider_plugin.wait_for_dns_change(change_id, account_number)
print(f"[+] Verified TXT Record in `{dns_provider.name}` provider")
except Exception:
sentry.captureException()
current_app.logger.debug(
f"Unable to resolve DNS challenge for change_id: {change_id}, account_id: "
f"{account_number}",
exc_info=True,
)
print(f"[+] Unable to Verify TXT Record in `{dns_provider.name}` provider")
time.sleep(10)
# Delete TXT Records
for dns_provider in acme_handler.dns_providers_for_domain[domain]:
dns_provider_plugin = acme_handler.get_dns_provider(dns_provider.provider_type)
dns_provider_options = json.loads(dns_provider.credentials)
account_number = dns_provider_options.get("account_id")
# TODO(csine@: Add Exception Handling
dns_provider_plugin.delete_txt_record(change_id, account_number, domain, token)
print(f"[+] Deleted TXT Record in `{dns_provider.name}` provider")
status = SUCCESS_METRIC_STATUS
print("[+] Done with ACME Tests.")

View File

@ -99,6 +99,7 @@ def get_types():
},
{"name": "dyn"},
{"name": "ultradns"},
{"name": "powerdns"},
]
},
)

101
lemur/dns_providers/util.py Normal file
View File

@ -0,0 +1,101 @@
import sys
import dns
import dns.exception
import dns.name
import dns.query
import dns.resolver
import re
from lemur.extensions import sentry
from lemur.extensions import metrics
class DNSError(Exception):
"""Base class for DNS Exceptions."""
pass
class BadDomainError(DNSError):
"""Error for when a Bad Domain Name is given."""
def __init__(self, message):
self.message = message
class DNSResolveError(DNSError):
"""Error for DNS Resolution Errors."""
def __init__(self, message):
self.message = message
def is_valid_domain(domain):
"""Checks if a domain is syntactically valid and returns a bool"""
if len(domain) > 253:
return False
if domain[-1] == ".":
domain = domain[:-1]
fqdn_re = re.compile("(?=^.{1,254}$)(^(?:(?!\d+\.|-)[a-zA-Z0-9_\-]{1,63}(?<!-)\.?)+(?:[a-zA-Z]{2,})$)", re.IGNORECASE)
return all(fqdn_re.match(d) for d in domain.split("."))
def get_authoritative_nameserver(domain):
"""Get the authoritative nameservers for the given domain"""
if not is_valid_domain(domain):
raise BadDomainError(f"{domain} is not a valid FQDN")
n = dns.name.from_text(domain)
depth = 2
default = dns.resolver.get_default_resolver()
nameserver = default.nameservers[0]
last = False
while not last:
s = n.split(depth)
last = s[0].to_unicode() == u"@"
sub = s[1]
query = dns.message.make_query(sub, dns.rdatatype.NS)
response = dns.query.udp(query, nameserver)
rcode = response.rcode()
if rcode != dns.rcode.NOERROR:
function = sys._getframe().f_code.co_name
metrics.send(f"{function}.error", "counter", 1)
if rcode == dns.rcode.NXDOMAIN:
raise DNSResolveError(f"{sub} does not exist.")
else:
raise DNSResolveError(f"Error: {dns.rcode.to_text(rcode)}")
if len(response.authority) > 0:
rrset = response.authority[0]
else:
rrset = response.answer[0]
rr = rrset[0]
if rr.rdtype != dns.rdatatype.SOA:
authority = rr.target
nameserver = default.query(authority).rrset[0].to_text()
depth += 1
return nameserver
def get_dns_records(domain, rdtype, nameserver):
"""Retrieves the DNS records matching the name and type and returns a list of records"""
records = []
try:
dns_resolver = dns.resolver.Resolver()
dns_resolver.nameservers = [nameserver]
dns_response = dns_resolver.query(domain, rdtype)
for rdata in dns_response:
for record in rdata.strings:
records.append(record.decode("utf-8"))
except dns.exception.DNSException:
sentry.captureException()
function = sys._getframe().f_code.co_name
metrics.send(f"{function}.fail", "counter", 1)
return records

View File

@ -17,6 +17,7 @@ from flask_migrate import Migrate, MigrateCommand, stamp
from flask_script.commands import ShowUrls, Clean, Server
from lemur.dns_providers.cli import manager as dns_provider_manager
from lemur.acme_providers.cli import manager as acme_manager
from lemur.sources.cli import manager as source_manager
from lemur.policies.cli import manager as policy_manager
from lemur.reporting.cli import manager as report_manager
@ -584,6 +585,7 @@ def main():
manager.add_command("policy", policy_manager)
manager.add_command("pending_certs", pending_certificate_manager)
manager.add_command("dns_providers", dns_provider_manager)
manager.add_command("acme", acme_manager)
manager.run()

View File

@ -31,7 +31,7 @@ from lemur.exceptions import InvalidAuthority, InvalidConfiguration, UnknownProv
from lemur.extensions import metrics, sentry
from lemur.plugins import lemur_acme as acme
from lemur.plugins.bases import IssuerPlugin
from lemur.plugins.lemur_acme import cloudflare, dyn, route53, ultradns
from lemur.plugins.lemur_acme import cloudflare, dyn, route53, ultradns, powerdns
from retrying import retry
@ -259,8 +259,9 @@ class AcmeHandler(object):
domains = [options["common_name"]]
if options.get("extensions"):
for name in options["extensions"]["sub_alt_names"]["names"]:
domains.append(name)
for dns_name in options["extensions"]["sub_alt_names"]["names"]:
if dns_name.value not in domains:
domains.append(dns_name.value)
current_app.logger.debug("Got these domains: {0}".format(domains))
return domains
@ -382,6 +383,7 @@ class AcmeHandler(object):
"dyn": dyn,
"route53": route53,
"ultradns": ultradns,
"powerdns": powerdns
}
provider = provider_types.get(type)
if not provider:
@ -441,6 +443,7 @@ class ACMEIssuerPlugin(IssuerPlugin):
"dyn": dyn,
"route53": route53,
"ultradns": ultradns,
"powerdns": powerdns
}
provider = provider_types.get(type)
if not provider:
@ -643,15 +646,8 @@ class ACMEIssuerPlugin(IssuerPlugin):
domains = self.acme.get_domains(issuer_options)
if not create_immediately:
# Create pending authorizations that we'll need to do the creation
authz_domains = []
for d in domains:
if type(d) == str:
authz_domains.append(d)
else:
authz_domains.append(d.value)
dns_authorization = authorization_service.create(
account_number, authz_domains, provider_type
account_number, domains, provider_type
)
# Return id of the DNS Authorization
return None, None, dns_authorization.id

View File

@ -0,0 +1,267 @@
import time
import requests
import json
import sys
import lemur.common.utils as utils
import lemur.dns_providers.util as dnsutil
from flask import current_app
from lemur.extensions import metrics, sentry
REQUIRED_VARIABLES = [
"ACME_POWERDNS_APIKEYNAME",
"ACME_POWERDNS_APIKEY",
"ACME_POWERDNS_DOMAIN",
]
class Zone:
""" This class implements a PowerDNS zone in JSON. """
def __init__(self, _data):
self._data = _data
@property
def id(self):
""" Zone id, has a trailing "." at the end, which we manually remove. """
return self._data["id"][:-1]
@property
def name(self):
""" Zone name, has a trailing "." at the end, which we manually remove. """
return self._data["name"][:-1]
@property
def kind(self):
""" Indicates whether the zone is setup as a PRIMARY or SECONDARY """
return self._data["kind"]
class Record:
""" This class implements a PowerDNS record. """
def __init__(self, _data):
self._data = _data
@property
def name(self):
return self._data["name"]
@property
def disabled(self):
return self._data["disabled"]
@property
def content(self):
return self._data["content"]
@property
def ttl(self):
return self._data["ttl"]
def get_zones(account_number):
"""Retrieve authoritative zones from the PowerDNS API and return a list"""
_check_conf()
server_id = current_app.config.get("ACME_POWERDNS_SERVERID", "localhost")
path = f"/api/v1/servers/{server_id}/zones"
zones = []
function = sys._getframe().f_code.co_name
log_data = {
"function": function
}
try:
records = _get(path)
log_data["message"] = "Retrieved Zones Successfully"
current_app.logger.debug(log_data)
except Exception as e:
sentry.captureException()
log_data["message"] = "Failed to Retrieve Zone Data"
current_app.logger.debug(log_data)
raise
for record in records:
zone = Zone(record)
if zone.kind == 'Master':
zones.append(zone.name)
return zones
def create_txt_record(domain, token, account_number):
""" Create a TXT record for the given domain and token and return a change_id tuple """
_check_conf()
zone_name = _get_zone_name(domain, account_number)
server_id = current_app.config.get("ACME_POWERDNS_SERVERID", "localhost")
zone_id = zone_name + "."
domain_id = domain + "."
path = f"/api/v1/servers/{server_id}/zones/{zone_id}"
payload = {
"rrsets": [
{
"name": domain_id,
"type": "TXT",
"ttl": 300,
"changetype": "REPLACE",
"records": [
{
"content": f"\"{token}\"",
"disabled": False
}
],
"comments": []
}
]
}
function = sys._getframe().f_code.co_name
log_data = {
"function": function,
"fqdn": domain,
"token": token,
}
try:
_patch(path, payload)
log_data["message"] = "TXT record successfully created"
current_app.logger.debug(log_data)
except Exception as e:
sentry.captureException()
log_data["Exception"] = e
log_data["message"] = "Unable to create TXT record"
current_app.logger.debug(log_data)
change_id = (domain, token)
return change_id
def wait_for_dns_change(change_id, account_number=None):
"""
Checks the authoritative DNS Server to see if changes have propagated to DNS
Retries and waits until successful.
"""
_check_conf()
domain, token = change_id
number_of_attempts = current_app.config.get("ACME_POWERDNS_RETRIES", 3)
zone_name = _get_zone_name(domain, account_number)
nameserver = dnsutil.get_authoritative_nameserver(zone_name)
record_found = False
for attempts in range(0, number_of_attempts):
txt_records = dnsutil.get_dns_records(domain, "TXT", nameserver)
for txt_record in txt_records:
if txt_record == token:
record_found = True
break
if record_found:
break
time.sleep(10)
function = sys._getframe().f_code.co_name
log_data = {
"function": function,
"fqdn": domain,
"status": record_found,
"message": "Record status on PowerDNS authoritative server"
}
current_app.logger.debug(log_data)
if record_found:
metrics.send(f"{function}.success", "counter", 1, metric_tags={"fqdn": domain, "txt_record": token})
else:
metrics.send(f"{function}.fail", "counter", 1, metric_tags={"fqdn": domain, "txt_record": token})
def delete_txt_record(change_id, account_number, domain, token):
""" Delete the TXT record for the given domain and token """
_check_conf()
zone_name = _get_zone_name(domain, account_number)
server_id = current_app.config.get("ACME_POWERDNS_SERVERID", "localhost")
zone_id = zone_name + "."
domain_id = domain + "."
path = f"/api/v1/servers/{server_id}/zones/{zone_id}"
payload = {
"rrsets": [
{
"name": domain_id,
"type": "TXT",
"ttl": 300,
"changetype": "DELETE",
"records": [
{
"content": f"\"{token}\"",
"disabled": False
}
],
"comments": []
}
]
}
function = sys._getframe().f_code.co_name
log_data = {
"function": function,
"fqdn": domain,
"token": token
}
try:
_patch(path, payload)
log_data["message"] = "TXT record successfully deleted"
current_app.logger.debug(log_data)
except Exception as e:
sentry.captureException()
log_data["Exception"] = e
log_data["message"] = "Unable to delete TXT record"
current_app.logger.debug(log_data)
def _check_conf():
utils.validate_conf(current_app, REQUIRED_VARIABLES)
def _generate_header():
"""Generate a PowerDNS API header and return it as a dictionary"""
api_key_name = current_app.config.get("ACME_POWERDNS_APIKEYNAME")
api_key = current_app.config.get("ACME_POWERDNS_APIKEY")
headers = {api_key_name: api_key}
return headers
def _get_zone_name(domain, account_number):
"""Get most specific matching zone for the given domain and return as a String"""
zones = get_zones(account_number)
zone_name = ""
for z in zones:
if domain.endswith(z):
if z.count(".") > zone_name.count("."):
zone_name = z
if not zone_name:
function = sys._getframe().f_code.co_name
log_data = {
"function": function,
"fqdn": domain,
"message": "No PowerDNS zone name found.",
}
metrics.send(f"{function}.fail", "counter", 1)
return zone_name
def _get(path, params=None):
""" Execute a GET request on the given URL (base_uri + path) and return response as JSON object """
base_uri = current_app.config.get("ACME_POWERDNS_DOMAIN")
resp = requests.get(
f"{base_uri}{path}",
headers=_generate_header(),
params=params,
verify=True,
)
resp.raise_for_status()
return resp.json()
def _patch(path, payload):
""" Execute a Patch request on the given URL (base_uri + path) with given payload """
base_uri = current_app.config.get("ACME_POWERDNS_DOMAIN")
resp = requests.patch(
f"{base_uri}{path}",
data=json.dumps(payload),
headers=_generate_header()
)
resp.raise_for_status()

View File

@ -1,4 +1,6 @@
import unittest
from cryptography.x509 import DNSName
from requests.models import Response
from mock import MagicMock, Mock, patch
@ -74,12 +76,14 @@ class TestAcme(unittest.TestCase):
@patch("acme.client.Client")
@patch("lemur.plugins.lemur_acme.plugin.current_app")
@patch("lemur.plugins.lemur_acme.cloudflare.wait_for_dns_change")
@patch("time.sleep")
def test_complete_dns_challenge_success(
self, mock_wait_for_dns_change, mock_current_app, mock_acme
self, mock_sleep, mock_wait_for_dns_change, mock_current_app, mock_acme
):
mock_dns_provider = Mock()
mock_dns_provider.wait_for_dns_change = Mock(return_value=True)
mock_authz = Mock()
mock_sleep.return_value = False
mock_authz.dns_challenge.response = Mock()
mock_authz.dns_challenge.response.simple_verify = Mock(return_value=True)
mock_authz.authz = []
@ -179,7 +183,7 @@ class TestAcme(unittest.TestCase):
options = {
"common_name": "test.netflix.net",
"extensions": {
"sub_alt_names": {"names": ["test2.netflix.net", "test3.netflix.net"]}
"sub_alt_names": {"names": [DNSName("test2.netflix.net"), DNSName("test3.netflix.net")]}
},
}
result = self.acme.get_domains(options)
@ -187,6 +191,19 @@ class TestAcme(unittest.TestCase):
result, [options["common_name"], "test2.netflix.net", "test3.netflix.net"]
)
@patch("lemur.plugins.lemur_acme.plugin.current_app")
def test_get_domains_san(self, mock_current_app):
options = {
"common_name": "test.netflix.net",
"extensions": {
"sub_alt_names": {"names": [DNSName("test.netflix.net"), DNSName("test2.netflix.net")]}
},
}
result = self.acme.get_domains(options)
self.assertEqual(
result, [options["common_name"], "test2.netflix.net"]
)
@patch(
"lemur.plugins.lemur_acme.plugin.AcmeHandler.start_dns_challenge",
return_value="test",
@ -364,7 +381,7 @@ class TestAcme(unittest.TestCase):
@patch("lemur.plugins.lemur_acme.ultradns.requests")
@patch("lemur.plugins.lemur_acme.ultradns.current_app")
def test_get_ultradns_token(self, mock_current_app, mock_requests):
def test_ultradns_get_token(self, mock_current_app, mock_requests):
# ret_val = json.dumps({"access_token": "access"})
the_response = Response()
the_response._content = b'{"access_token": "access"}'
@ -374,7 +391,7 @@ class TestAcme(unittest.TestCase):
self.assertTrue(len(result) > 0)
@patch("lemur.plugins.lemur_acme.ultradns.current_app")
def test_create_txt_record(self, mock_current_app):
def test_ultradns_create_txt_record(self, mock_current_app):
domain = "_acme_challenge.test.example.com"
zone = "test.example.com"
token = "ABCDEFGHIJ"
@ -395,7 +412,7 @@ class TestAcme(unittest.TestCase):
@patch("lemur.plugins.lemur_acme.ultradns.current_app")
@patch("lemur.extensions.metrics")
def test_delete_txt_record(self, mock_metrics, mock_current_app):
def test_ultradns_delete_txt_record(self, mock_metrics, mock_current_app):
domain = "_acme_challenge.test.example.com"
zone = "test.example.com"
token = "ABCDEFGHIJ"
@ -418,7 +435,7 @@ class TestAcme(unittest.TestCase):
@patch("lemur.plugins.lemur_acme.ultradns.current_app")
@patch("lemur.extensions.metrics")
def test_wait_for_dns_change(self, mock_metrics, mock_current_app):
def test_ultradns_wait_for_dns_change(self, mock_metrics, mock_current_app):
ultradns._has_dns_propagated = Mock(return_value=True)
nameserver = "1.1.1.1"
ultradns.get_authoritative_nameserver = Mock(return_value=nameserver)
@ -437,7 +454,7 @@ class TestAcme(unittest.TestCase):
}
mock_current_app.logger.debug.assert_called_with(log_data)
def test_get_zone_name(self):
def test_ultradns_get_zone_name(self):
zones = ['example.com', 'test.example.com']
zone = "test.example.com"
domain = "_acme-challenge.test.example.com"
@ -446,7 +463,7 @@ class TestAcme(unittest.TestCase):
result = ultradns.get_zone_name(domain, account_number)
self.assertEqual(result, zone)
def test_get_zones(self):
def test_ultradns_get_zones(self):
account_number = "1234567890"
path = "a/b/c"
zones = ['example.com', 'test.example.com']

View File

@ -0,0 +1,120 @@
import unittest
from mock import Mock, patch
from lemur.plugins.lemur_acme import plugin, powerdns
class TestPowerdns(unittest.TestCase):
@patch("lemur.plugins.lemur_acme.plugin.dns_provider_service")
def setUp(self, mock_dns_provider_service):
self.ACMEIssuerPlugin = plugin.ACMEIssuerPlugin()
self.acme = plugin.AcmeHandler()
mock_dns_provider = Mock()
mock_dns_provider.name = "powerdns"
mock_dns_provider.credentials = "{}"
mock_dns_provider.provider_type = "powerdns"
self.acme.dns_providers_for_domain = {
"www.test.com": [mock_dns_provider],
"test.fakedomain.net": [mock_dns_provider],
}
@patch("lemur.plugins.lemur_acme.powerdns.current_app")
def test_get_zones(self, mock_current_app):
account_number = "1234567890"
path = "a/b/c"
zones = ['example.com', 'test.example.com']
get_response = [{'account': '', 'dnssec': 'False', 'id': 'example.com.', 'kind': 'Master', 'last_check': 0, 'masters': [],
'name': 'example.com.', 'notified_serial': '2019111907', 'serial': '2019111907',
'url': '/api/v1/servers/localhost/zones/example.com.'},
{'account': '', 'dnssec': 'False', 'id': 'bad.example.com.', 'kind': 'Secondary', 'last_check': 0, 'masters': [],
'name': 'bad.example.com.', 'notified_serial': '2018053104', 'serial': '2018053104',
'url': '/api/v1/servers/localhost/zones/bad.example.com.'},
{'account': '', 'dnssec': 'False', 'id': 'test.example.com.', 'kind': 'Master', 'last_check': 0,
'masters': [], 'name': 'test.example.com.', 'notified_serial': '2019112501', 'serial': '2019112501',
'url': '/api/v1/servers/localhost/zones/test.example.com.'}]
powerdns._check_conf = Mock()
powerdns._get = Mock(path)
powerdns._get.side_effect = [get_response]
mock_current_app.config.get = Mock(return_value="localhost")
result = powerdns.get_zones(account_number)
self.assertEqual(result, zones)
def test_get_zone_name(self):
zones = ['example.com', 'test.example.com']
zone = "test.example.com"
domain = "_acme-challenge.test.example.com"
account_number = "1234567890"
powerdns.get_zones = Mock(return_value=zones)
result = powerdns._get_zone_name(domain, account_number)
self.assertEqual(result, zone)
@patch("lemur.plugins.lemur_acme.powerdns.current_app")
def test_create_txt_record(self, mock_current_app):
domain = "_acme_challenge.test.example.com"
zone = "test.example.com"
token = "ABCDEFGHIJ"
account_number = "1234567890"
change_id = (domain, token)
powerdns._check_conf = Mock()
powerdns._get_zone_name = Mock(return_value=zone)
mock_current_app.logger.debug = Mock()
mock_current_app.config.get = Mock(return_value="localhost")
powerdns._patch = Mock()
log_data = {
"function": "create_txt_record",
"fqdn": domain,
"token": token,
"message": "TXT record successfully created"
}
result = powerdns.create_txt_record(domain, token, account_number)
mock_current_app.logger.debug.assert_called_with(log_data)
self.assertEqual(result, change_id)
@patch("lemur.plugins.lemur_acme.powerdns.dnsutil")
@patch("lemur.plugins.lemur_acme.powerdns.current_app")
@patch("lemur.extensions.metrics")
@patch("time.sleep")
def test_wait_for_dns_change(self, mock_sleep, mock_metrics, mock_current_app, mock_dnsutil):
domain = "_acme-challenge.test.example.com"
token = "ABCDEFG"
zone_name = "test.example.com"
nameserver = "1.1.1.1"
change_id = (domain, token)
powerdns._check_conf = Mock()
mock_records = (token,)
mock_current_app.config.get = Mock(return_value=1)
powerdns._get_zone_name = Mock(return_value=zone_name)
mock_dnsutil.get_authoritative_nameserver = Mock(return_value=nameserver)
mock_dnsutil.get_dns_records = Mock(return_value=mock_records)
mock_sleep.return_value = False
mock_metrics.send = Mock()
mock_current_app.logger.debug = Mock()
powerdns.wait_for_dns_change(change_id)
log_data = {
"function": "wait_for_dns_change",
"fqdn": domain,
"status": True,
"message": "Record status on PowerDNS authoritative server"
}
mock_current_app.logger.debug.assert_called_with(log_data)
@patch("lemur.plugins.lemur_acme.powerdns.current_app")
def test_delete_txt_record(self, mock_current_app):
domain = "_acme_challenge.test.example.com"
zone = "test.example.com"
token = "ABCDEFGHIJ"
account_number = "1234567890"
change_id = (domain, token)
powerdns._check_conf = Mock()
powerdns._get_zone_name = Mock(return_value=zone)
mock_current_app.logger.debug = Mock()
mock_current_app.config.get = Mock(return_value="localhost")
powerdns._patch = Mock()
log_data = {
"function": "delete_txt_record",
"fqdn": domain,
"token": token,
"message": "TXT record successfully deleted"
}
powerdns.delete_txt_record(change_id, account_number, domain, token)
mock_current_app.logger.debug.assert_called_with(log_data)

View File

@ -50,11 +50,19 @@ class VaultSourcePlugin(SourcePlugin):
"helpMessage": "Version of the Vault KV API to use",
},
{
"name": "vaultAuthTokenFile",
"name": "authenticationMethod",
"type": "select",
"value": "token",
"available": ["token", "kubernetes"],
"required": True,
"helpMessage": "Authentication method to use",
},
{
"name": "tokenFile/VaultRole",
"type": "str",
"required": True,
"validation": "(/[^/]+)+",
"helpMessage": "Must be a valid file path!",
"validation": "^([a-zA-Z0-9/._-]+/?)+$",
"helpMessage": "Must be vaild file path for token based auth and valid role if k8s based auth",
},
{
"name": "vaultMount",
@ -85,7 +93,8 @@ class VaultSourcePlugin(SourcePlugin):
cert = []
body = ""
url = self.get_option("vaultUrl", options)
token_file = self.get_option("vaultAuthTokenFile", options)
auth_method = self.get_option("authenticationMethod", options)
auth_key = self.get_option("tokenFile/vaultRole", options)
mount = self.get_option("vaultMount", options)
path = self.get_option("vaultPath", options)
obj_name = self.get_option("objectName", options)
@ -93,10 +102,18 @@ class VaultSourcePlugin(SourcePlugin):
cert_filter = "-----BEGIN CERTIFICATE-----"
cert_delimiter = "-----END CERTIFICATE-----"
with open(token_file, "r") as tfile:
client = hvac.Client(url=url)
if auth_method == 'token':
with open(auth_key, "r") as tfile:
token = tfile.readline().rstrip("\n")
client.token = token
if auth_method == 'kubernetes':
token_path = '/var/run/secrets/kubernetes.io/serviceaccount/token'
with open(token_path, 'r') as f:
jwt = f.read()
client.auth_kubernetes(auth_key, jwt)
client = hvac.Client(url=url, token=token)
client.secrets.kv.default_kv_version = api_version
path = "{0}/{1}".format(path, obj_name)
@ -160,11 +177,19 @@ class VaultDestinationPlugin(DestinationPlugin):
"helpMessage": "Version of the Vault KV API to use",
},
{
"name": "vaultAuthTokenFile",
"name": "authenticationMethod",
"type": "select",
"value": "token",
"available": ["token", "kubernetes"],
"required": True,
"helpMessage": "Authentication method to use",
},
{
"name": "tokenFile/VaultRole",
"type": "str",
"required": True,
"validation": "(/[^/]+)+",
"helpMessage": "Must be a valid file path!",
"validation": "^([a-zA-Z0-9/._-]+/?)+$",
"helpMessage": "Must be vaild file path for token based auth and valid role if k8s based auth",
},
{
"name": "vaultMount",
@ -219,7 +244,8 @@ class VaultDestinationPlugin(DestinationPlugin):
cname = common_name(parse_certificate(body))
url = self.get_option("vaultUrl", options)
token_file = self.get_option("vaultAuthTokenFile", options)
auth_method = self.get_option("authenticationMethod", options)
auth_key = self.get_option("tokenFile/vaultRole", options)
mount = self.get_option("vaultMount", options)
path = self.get_option("vaultPath", options)
bundle = self.get_option("bundleChain", options)
@ -245,10 +271,18 @@ class VaultDestinationPlugin(DestinationPlugin):
exc_info=True,
)
with open(token_file, "r") as tfile:
client = hvac.Client(url=url)
if auth_method == 'token':
with open(auth_key, "r") as tfile:
token = tfile.readline().rstrip("\n")
client.token = token
if auth_method == 'kubernetes':
token_path = '/var/run/secrets/kubernetes.io/serviceaccount/token'
with open(token_path, 'r') as f:
jwt = f.read()
client.auth_kubernetes(auth_key, jwt)
client = hvac.Client(url=url, token=token)
client.secrets.kv.default_kv_version = api_version
if obj_name:

View File

@ -0,0 +1,12 @@
import unittest
from lemur.dns_providers import util as dnsutil
class TestDNSProvider(unittest.TestCase):
def test_is_valid_domain(self):
self.assertTrue(dnsutil.is_valid_domain("example.com"))
self.assertTrue(dnsutil.is_valid_domain("foo.bar.org"))
self.assertTrue(dnsutil.is_valid_domain("_acme-chall.example.com"))
self.assertFalse(dnsutil.is_valid_domain("e/xample.com"))
self.assertFalse(dnsutil.is_valid_domain("exam\ple.com"))
self.assertFalse(dnsutil.is_valid_domain("*.example.com"))