Merge branch 'master' into master

This commit is contained in:
sirferl
2020-08-19 12:29:02 +02:00
committed by GitHub
49 changed files with 2150 additions and 871 deletions

View File

@@ -54,18 +54,30 @@ class AcmeHandler(object):
current_app.logger.error(f"Unable to fetch DNS Providers: {e}")
self.all_dns_providers = []
def find_dns_challenge(self, host, authorizations):
def get_dns_challenges(self, host, authorizations):
"""Get dns challenges for provided domain"""
domain_to_validate, is_wildcard = self.strip_wildcard(host)
dns_challenges = []
for authz in authorizations:
if not authz.body.identifier.value.lower() == host.lower():
if not authz.body.identifier.value.lower() == domain_to_validate.lower():
continue
if is_wildcard and not authz.body.wildcard:
continue
if not is_wildcard and authz.body.wildcard:
continue
for combo in authz.body.challenges:
if isinstance(combo.chall, challenges.DNS01):
dns_challenges.append(combo)
return dns_challenges
def maybe_remove_wildcard(self, host):
return host.replace("*.", "")
def strip_wildcard(self, host):
"""Removes the leading *. and returns Host and whether it was removed or not (True/False)"""
prefix = "*."
if host.startswith(prefix):
return host[len(prefix):], True
return host, False
def maybe_add_extension(self, host, dns_provider_options):
if dns_provider_options and dns_provider_options.get(
@@ -86,9 +98,8 @@ class AcmeHandler(object):
current_app.logger.debug("Starting DNS challenge for {0}".format(host))
change_ids = []
host_to_validate = self.maybe_remove_wildcard(host)
dns_challenges = self.find_dns_challenge(host_to_validate, order.authorizations)
dns_challenges = self.get_dns_challenges(host, order.authorizations)
host_to_validate, _ = self.strip_wildcard(host)
host_to_validate = self.maybe_add_extension(
host_to_validate, dns_provider_options
)
@@ -172,7 +183,7 @@ class AcmeHandler(object):
except (AcmeError, TimeoutError):
sentry.captureException(extra={"order_url": str(order.uri)})
metrics.send("request_certificate_error", "counter", 1)
metrics.send("request_certificate_error", "counter", 1, metric_tags={"uri": order.uri})
current_app.logger.error(
f"Unable to resolve Acme order: {order.uri}", exc_info=True
)
@@ -183,15 +194,26 @@ class AcmeHandler(object):
else:
raise
metrics.send("request_certificate_success", "counter", 1, metric_tags={"uri": order.uri})
current_app.logger.info(
f"Successfully resolved Acme order: {order.uri}", exc_info=True
)
pem_certificate = OpenSSL.crypto.dump_certificate(
OpenSSL.crypto.FILETYPE_PEM,
OpenSSL.crypto.load_certificate(
OpenSSL.crypto.FILETYPE_PEM, orderr.fullchain_pem
),
).decode()
pem_certificate_chain = orderr.fullchain_pem[
len(pem_certificate) : # noqa
].lstrip()
if current_app.config.get("IDENTRUST_CROSS_SIGNED_LE_ICA", False) \
and datetime.datetime.now() < datetime.datetime.strptime(
current_app.config.get("IDENTRUST_CROSS_SIGNED_LE_ICA_EXPIRATION_DATE", "17/03/21"), '%d/%m/%y'):
pem_certificate_chain = current_app.config.get("IDENTRUST_CROSS_SIGNED_LE_ICA")
else:
pem_certificate_chain = orderr.fullchain_pem[
len(pem_certificate) : # noqa
].lstrip()
current_app.logger.debug(
"{0} {1}".format(type(pem_certificate), type(pem_certificate_chain))
@@ -320,7 +342,7 @@ class AcmeHandler(object):
)
dns_provider_options = json.loads(dns_provider.credentials)
account_number = dns_provider_options.get("account_id")
host_to_validate = self.maybe_remove_wildcard(authz_record.host)
host_to_validate, _ = self.strip_wildcard(authz_record.host)
host_to_validate = self.maybe_add_extension(
host_to_validate, dns_provider_options
)
@@ -352,7 +374,7 @@ class AcmeHandler(object):
dns_provider_options = json.loads(dns_provider.credentials)
account_number = dns_provider_options.get("account_id")
dns_challenges = authz_record.dns_challenge
host_to_validate = self.maybe_remove_wildcard(authz_record.host)
host_to_validate, _ = self.strip_wildcard(authz_record.host)
host_to_validate = self.maybe_add_extension(
host_to_validate, dns_provider_options
)

View File

@@ -1,11 +1,10 @@
import time
import requests
import json
import sys
import time
import lemur.common.utils as utils
import lemur.dns_providers.util as dnsutil
import requests
from flask import current_app
from lemur.extensions import metrics, sentry
@@ -17,7 +16,9 @@ REQUIRED_VARIABLES = [
class Zone:
""" This class implements a PowerDNS zone in JSON. """
"""
This class implements a PowerDNS zone in JSON.
"""
def __init__(self, _data):
self._data = _data
@@ -39,7 +40,9 @@ class Zone:
class Record:
""" This class implements a PowerDNS record. """
"""
This class implements a PowerDNS record.
"""
def __init__(self, _data):
self._data = _data
@@ -49,20 +52,30 @@ class Record:
return self._data["name"]
@property
def disabled(self):
return self._data["disabled"]
def type(self):
return self._data["type"]
@property
def ttl(self):
return self._data["ttl"]
@property
def content(self):
return self._data["content"]
@property
def ttl(self):
return self._data["ttl"]
def disabled(self):
return self._data["disabled"]
def get_zones(account_number):
"""Retrieve authoritative zones from the PowerDNS API and return a list"""
"""
Retrieve authoritative zones from the PowerDNS API and return a list of zones
:param account_number:
:raise: Exception
:return: list of Zone Objects
"""
_check_conf()
server_id = current_app.config.get("ACME_POWERDNS_SERVERID", "localhost")
path = f"/api/v1/servers/{server_id}/zones"
@@ -90,44 +103,41 @@ def get_zones(account_number):
def create_txt_record(domain, token, account_number):
""" Create a TXT record for the given domain and token and return a change_id tuple """
"""
Create a TXT record for the given domain and token and return a change_id tuple
:param domain: FQDN
:param token: challenge value
:param account_number:
:return: tuple of domain/token
"""
_check_conf()
zone_name = _get_zone_name(domain, account_number)
server_id = current_app.config.get("ACME_POWERDNS_SERVERID", "localhost")
zone_id = zone_name + "."
domain_id = domain + "."
path = f"/api/v1/servers/{server_id}/zones/{zone_id}"
payload = {
"rrsets": [
{
"name": domain_id,
"type": "TXT",
"ttl": 300,
"changetype": "REPLACE",
"records": [
{
"content": f"\"{token}\"",
"disabled": False
}
],
"comments": []
}
]
}
function = sys._getframe().f_code.co_name
log_data = {
"function": function,
"fqdn": domain,
"token": token,
}
# Create new record
domain_id = domain + "."
records = [Record({'name': domain_id, 'content': f"\"{token}\"", 'disabled': False})]
# Get current records
cur_records = _get_txt_records(domain)
for record in cur_records:
if record.content != token:
records.append(record)
try:
_patch(path, payload)
log_data["message"] = "TXT record successfully created"
_patch_txt_records(domain, account_number, records)
log_data["message"] = "TXT record(s) successfully created"
current_app.logger.debug(log_data)
except Exception as e:
sentry.captureException()
log_data["Exception"] = e
log_data["message"] = "Unable to create TXT record"
log_data["message"] = "Unable to create TXT record(s)"
current_app.logger.debug(log_data)
change_id = (domain, token)
@@ -136,8 +146,11 @@ def create_txt_record(domain, token, account_number):
def wait_for_dns_change(change_id, account_number=None):
"""
Checks the authoritative DNS Server to see if changes have propagated to DNS
Retries and waits until successful.
Checks the authoritative DNS Server to see if changes have propagated.
:param change_id: tuple of domain/token
:param account_number:
:return:
"""
_check_conf()
domain, token = change_id
@@ -171,53 +184,115 @@ def wait_for_dns_change(change_id, account_number=None):
def delete_txt_record(change_id, account_number, domain, token):
""" Delete the TXT record for the given domain and token """
"""
Delete the TXT record for the given domain and token
:param change_id: tuple of domain/token
:param account_number:
:param domain: FQDN
:param token: challenge to delete
:return:
"""
_check_conf()
zone_name = _get_zone_name(domain, account_number)
server_id = current_app.config.get("ACME_POWERDNS_SERVERID", "localhost")
zone_id = zone_name + "."
domain_id = domain + "."
path = f"/api/v1/servers/{server_id}/zones/{zone_id}"
payload = {
"rrsets": [
{
"name": domain_id,
"type": "TXT",
"ttl": 300,
"changetype": "DELETE",
"records": [
{
"content": f"\"{token}\"",
"disabled": False
}
],
"comments": []
}
]
}
function = sys._getframe().f_code.co_name
log_data = {
"function": function,
"fqdn": domain,
"token": token
"token": token,
}
try:
_patch(path, payload)
log_data["message"] = "TXT record successfully deleted"
current_app.logger.debug(log_data)
except Exception as e:
sentry.captureException()
log_data["Exception"] = e
log_data["message"] = "Unable to delete TXT record"
"""
Get existing TXT records matching the domain from DNS
The token to be deleted should already exist
There may be other records with different tokens as well
"""
cur_records = _get_txt_records(domain)
found = False
new_records = []
for record in cur_records:
if record.content == f"\"{token}\"":
found = True
else:
new_records.append(record)
# Since the matching token is not in DNS, there is nothing to delete
if not found:
log_data["message"] = "Unable to delete TXT record: Token not found in existing TXT records"
current_app.logger.debug(log_data)
return
# The record to delete has been found AND there are other tokens set on the same domain
# Since we only want to delete one token value from the RRSet, we need to use the Patch command to
# overwrite the current RRSet with the existing records.
elif new_records:
try:
_patch_txt_records(domain, account_number, new_records)
log_data["message"] = "TXT record successfully deleted"
current_app.logger.debug(log_data)
except Exception as e:
sentry.captureException()
log_data["Exception"] = e
log_data["message"] = "Unable to delete TXT record: patching exception"
current_app.logger.debug(log_data)
# The record to delete has been found AND there are no other token values set on the same domain
# Use the Delete command to delete the whole RRSet.
else:
zone_name = _get_zone_name(domain, account_number)
server_id = current_app.config.get("ACME_POWERDNS_SERVERID", "localhost")
zone_id = zone_name + "."
domain_id = domain + "."
path = f"/api/v1/servers/{server_id}/zones/{zone_id}"
payload = {
"rrsets": [
{
"name": domain_id,
"type": "TXT",
"ttl": 300,
"changetype": "DELETE",
"records": [
{
"content": f"\"{token}\"",
"disabled": False
}
],
"comments": []
}
]
}
function = sys._getframe().f_code.co_name
log_data = {
"function": function,
"fqdn": domain,
"token": token
}
try:
_patch(path, payload)
log_data["message"] = "TXT record successfully deleted"
current_app.logger.debug(log_data)
except Exception as e:
sentry.captureException()
log_data["Exception"] = e
log_data["message"] = "Unable to delete TXT record"
current_app.logger.debug(log_data)
def _check_conf():
"""
Verifies required configuration variables are set
:return:
"""
utils.validate_conf(current_app, REQUIRED_VARIABLES)
def _generate_header():
"""Generate a PowerDNS API header and return it as a dictionary"""
"""
Generate a PowerDNS API header and return it as a dictionary
:return: Dict of header parameters
"""
api_key_name = current_app.config.get("ACME_POWERDNS_APIKEYNAME")
api_key = current_app.config.get("ACME_POWERDNS_APIKEY")
headers = {api_key_name: api_key}
@@ -225,7 +300,13 @@ def _generate_header():
def _get_zone_name(domain, account_number):
"""Get most specific matching zone for the given domain and return as a String"""
"""
Get most specific matching zone for the given domain and return as a String
:param domain: FQDN
:param account_number:
:return: FQDN of domain
"""
zones = get_zones(account_number)
zone_name = ""
for z in zones:
@@ -243,25 +324,113 @@ def _get_zone_name(domain, account_number):
return zone_name
def _get_txt_records(domain):
"""
Retrieve TXT records for a given domain and return list of Record Objects
:param domain: FQDN
:return: list of Record objects
"""
server_id = current_app.config.get("ACME_POWERDNS_SERVERID", "localhost")
path = f"/api/v1/servers/{server_id}/search-data?q={domain}&max=100&object_type=record"
function = sys._getframe().f_code.co_name
log_data = {
"function": function
}
try:
records = _get(path)
log_data["message"] = "Retrieved TXT Records Successfully"
current_app.logger.debug(log_data)
except Exception as e:
sentry.captureException()
log_data["Exception"] = e
log_data["message"] = "Failed to Retrieve TXT Records"
current_app.logger.debug(log_data)
return []
txt_records = []
for record in records:
cur_record = Record(record)
txt_records.append(cur_record)
return txt_records
def _get(path, params=None):
""" Execute a GET request on the given URL (base_uri + path) and return response as JSON object """
"""
Execute a GET request on the given URL (base_uri + path) and return response as JSON object
:param path: Relative URL path
:param params: additional parameters
:return: json response
"""
base_uri = current_app.config.get("ACME_POWERDNS_DOMAIN")
verify_value = current_app.config.get("ACME_POWERDNS_VERIFY", True)
resp = requests.get(
f"{base_uri}{path}",
headers=_generate_header(),
params=params,
verify=True,
verify=verify_value
)
resp.raise_for_status()
return resp.json()
def _patch_txt_records(domain, account_number, records):
"""
Send Patch request to PowerDNS Server
:param domain: FQDN
:param account_number:
:param records: List of Record objects
:return:
"""
domain_id = domain + "."
# Create records
txt_records = []
for record in records:
txt_records.append(
{'content': record.content, 'disabled': record.disabled}
)
# Create RRSet
payload = {
"rrsets": [
{
"name": domain_id,
"type": "TXT",
"ttl": 300,
"changetype": "REPLACE",
"records": txt_records,
"comments": []
}
]
}
# Create Txt Records
server_id = current_app.config.get("ACME_POWERDNS_SERVERID", "localhost")
zone_name = _get_zone_name(domain, account_number)
zone_id = zone_name + "."
path = f"/api/v1/servers/{server_id}/zones/{zone_id}"
_patch(path, payload)
def _patch(path, payload):
""" Execute a Patch request on the given URL (base_uri + path) with given payload """
"""
Execute a Patch request on the given URL (base_uri + path) with given payload
:param path:
:param payload:
:return:
"""
base_uri = current_app.config.get("ACME_POWERDNS_DOMAIN")
verify_value = current_app.config.get("ACME_POWERDNS_VERIFY", True)
resp = requests.patch(
f"{base_uri}{path}",
data=json.dumps(payload),
headers=_generate_header()
headers=_generate_header(),
verify=verify_value
)
resp.raise_for_status()

View File

@@ -35,9 +35,10 @@ def get_zones(client=None):
zones = []
for page in paginator.paginate():
for zone in page["HostedZones"]:
zones.append(
zone["Name"][:-1]
) # We need [:-1] to strip out the trailing dot.
if not zone["Config"]["PrivateZone"]:
zones.append(
zone["Name"][:-1]
) # We need [:-1] to strip out the trailing dot.
return zones

View File

@@ -1,11 +1,9 @@
import unittest
from unittest.mock import patch, Mock
from cryptography.x509 import DNSName
from requests.models import Response
from mock import MagicMock, Mock, patch
from lemur.plugins.lemur_acme import plugin, ultradns
from lemur.plugins.lemur_acme import plugin
from mock import MagicMock
class TestAcme(unittest.TestCase):
@@ -23,11 +21,12 @@ class TestAcme(unittest.TestCase):
}
@patch("lemur.plugins.lemur_acme.plugin.len", return_value=1)
def test_find_dns_challenge(self, mock_len):
def test_get_dns_challenges(self, mock_len):
assert mock_len
from acme import challenges
host = "example.com"
c = challenges.DNS01()
mock_authz = Mock()
@@ -35,9 +34,18 @@ class TestAcme(unittest.TestCase):
mock_entry = Mock()
mock_entry.chall = c
mock_authz.body.resolved_combinations.append(mock_entry)
result = yield self.acme.find_dns_challenge(mock_authz)
result = yield self.acme.get_dns_challenges(host, mock_authz)
self.assertEqual(result, mock_entry)
def test_strip_wildcard(self):
expected = ("example.com", False)
result = self.acme.strip_wildcard("example.com")
self.assertEqual(expected, result)
expected = ("example.com", True)
result = self.acme.strip_wildcard("*.example.com")
self.assertEqual(expected, result)
def test_authz_record(self):
a = plugin.AuthorizationRecord("host", "authz", "challenge", "id")
self.assertEqual(type(a), plugin.AuthorizationRecord)
@@ -45,9 +53,9 @@ class TestAcme(unittest.TestCase):
@patch("acme.client.Client")
@patch("lemur.plugins.lemur_acme.plugin.current_app")
@patch("lemur.plugins.lemur_acme.plugin.len", return_value=1)
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.find_dns_challenge")
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.get_dns_challenges")
def test_start_dns_challenge(
self, mock_find_dns_challenge, mock_len, mock_app, mock_acme
self, mock_get_dns_challenges, mock_len, mock_app, mock_acme
):
assert mock_len
mock_order = Mock()
@@ -65,7 +73,7 @@ class TestAcme(unittest.TestCase):
mock_dns_provider.create_txt_record = Mock(return_value=1)
values = [mock_entry]
iterable = mock_find_dns_challenge.return_value
iterable = mock_get_dns_challenges.return_value
iterator = iter(values)
iterable.__iter__.return_value = iterator
result = self.acme.start_dns_challenge(
@@ -78,7 +86,7 @@ class TestAcme(unittest.TestCase):
@patch("lemur.plugins.lemur_acme.cloudflare.wait_for_dns_change")
@patch("time.sleep")
def test_complete_dns_challenge_success(
self, mock_sleep, mock_wait_for_dns_change, mock_current_app, mock_acme
self, mock_sleep, mock_wait_for_dns_change, mock_current_app, mock_acme
):
mock_dns_provider = Mock()
mock_dns_provider.wait_for_dns_change = Mock(return_value=True)
@@ -102,7 +110,7 @@ class TestAcme(unittest.TestCase):
@patch("lemur.plugins.lemur_acme.plugin.current_app")
@patch("lemur.plugins.lemur_acme.cloudflare.wait_for_dns_change")
def test_complete_dns_challenge_fail(
self, mock_wait_for_dns_change, mock_current_app, mock_acme
self, mock_wait_for_dns_change, mock_current_app, mock_acme
):
mock_dns_provider = Mock()
mock_dns_provider.wait_for_dns_change = Mock(return_value=True)
@@ -127,15 +135,15 @@ class TestAcme(unittest.TestCase):
@patch("acme.client.Client")
@patch("OpenSSL.crypto", return_value="mock_cert")
@patch("josepy.util.ComparableX509")
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.find_dns_challenge")
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.get_dns_challenges")
@patch("lemur.plugins.lemur_acme.plugin.current_app")
def test_request_certificate(
self,
mock_current_app,
mock_find_dns_challenge,
mock_jose,
mock_crypto,
mock_acme,
self,
mock_current_app,
mock_get_dns_challenges,
mock_jose,
mock_crypto,
mock_acme,
):
mock_cert_response = Mock()
mock_cert_response.body = "123"
@@ -148,6 +156,7 @@ class TestAcme(unittest.TestCase):
mock_acme.fetch_chain = Mock(return_value="mock_chain")
mock_crypto.dump_certificate = Mock(return_value=b"chain")
mock_order = Mock()
mock_current_app.config = {}
self.acme.request_certificate(mock_acme, [], mock_order)
def test_setup_acme_client_fail(self):
@@ -172,7 +181,7 @@ class TestAcme(unittest.TestCase):
assert result_client
assert result_registration
@patch("lemur.plugins.lemur_acme.plugin.current_app")
@patch('lemur.plugins.lemur_acme.plugin.current_app')
def test_get_domains_single(self, mock_current_app):
options = {"common_name": "test.netflix.net"}
result = self.acme.get_domains(options)
@@ -256,11 +265,11 @@ class TestAcme(unittest.TestCase):
@patch("lemur.plugins.lemur_acme.cloudflare.current_app")
@patch("lemur.plugins.lemur_acme.plugin.dns_provider_service")
def test_get_dns_provider(
self,
mock_dns_provider_service,
mock_current_app_cloudflare,
mock_current_app_dyn,
mock_current_app,
self,
mock_dns_provider_service,
mock_current_app_cloudflare,
mock_current_app_dyn,
mock_current_app,
):
provider = plugin.ACMEIssuerPlugin()
route53 = provider.get_dns_provider("route53")
@@ -278,14 +287,14 @@ class TestAcme(unittest.TestCase):
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.finalize_authorizations")
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.request_certificate")
def test_get_ordered_certificate(
self,
mock_request_certificate,
mock_finalize_authorizations,
mock_get_authorizations,
mock_dns_provider_service,
mock_authorization_service,
mock_current_app,
mock_acme,
self,
mock_request_certificate,
mock_finalize_authorizations,
mock_get_authorizations,
mock_dns_provider_service,
mock_authorization_service,
mock_current_app,
mock_acme,
):
mock_client = Mock()
mock_acme.return_value = (mock_client, "")
@@ -309,14 +318,14 @@ class TestAcme(unittest.TestCase):
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.finalize_authorizations")
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.request_certificate")
def test_get_ordered_certificates(
self,
mock_request_certificate,
mock_finalize_authorizations,
mock_get_authorizations,
mock_dns_provider_service,
mock_authorization_service,
mock_current_app,
mock_acme,
self,
mock_request_certificate,
mock_finalize_authorizations,
mock_get_authorizations,
mock_dns_provider_service,
mock_authorization_service,
mock_current_app,
mock_acme,
):
mock_client = Mock()
mock_acme.return_value = (mock_client, "")
@@ -349,14 +358,14 @@ class TestAcme(unittest.TestCase):
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.request_certificate")
@patch("lemur.plugins.lemur_acme.plugin.authorization_service")
def test_create_certificate(
self,
mock_authorization_service,
mock_request_certificate,
mock_finalize_authorizations,
mock_get_authorizations,
mock_current_app,
mock_dns_provider_service,
mock_acme,
self,
mock_authorization_service,
mock_request_certificate,
mock_finalize_authorizations,
mock_get_authorizations,
mock_current_app,
mock_dns_provider_service,
mock_acme,
):
provider = plugin.ACMEIssuerPlugin()
mock_authority = Mock()
@@ -378,121 +387,3 @@ class TestAcme(unittest.TestCase):
mock_request_certificate.return_value = ("pem_certificate", "chain")
result = provider.create_certificate(csr, issuer_options)
assert result
@patch("lemur.plugins.lemur_acme.ultradns.requests")
@patch("lemur.plugins.lemur_acme.ultradns.current_app")
def test_ultradns_get_token(self, mock_current_app, mock_requests):
# ret_val = json.dumps({"access_token": "access"})
the_response = Response()
the_response._content = b'{"access_token": "access"}'
mock_requests.post = Mock(return_value=the_response)
mock_current_app.config.get = Mock(return_value="Test")
result = ultradns.get_ultradns_token()
self.assertTrue(len(result) > 0)
@patch("lemur.plugins.lemur_acme.ultradns.current_app")
def test_ultradns_create_txt_record(self, mock_current_app):
domain = "_acme_challenge.test.example.com"
zone = "test.example.com"
token = "ABCDEFGHIJ"
account_number = "1234567890"
change_id = (domain, token)
ultradns.get_zone_name = Mock(return_value=zone)
mock_current_app.logger.debug = Mock()
ultradns._post = Mock()
log_data = {
"function": "create_txt_record",
"fqdn": domain,
"token": token,
"message": "TXT record created"
}
result = ultradns.create_txt_record(domain, token, account_number)
mock_current_app.logger.debug.assert_called_with(log_data)
self.assertEqual(result, change_id)
@patch("lemur.plugins.lemur_acme.ultradns.current_app")
@patch("lemur.extensions.metrics")
def test_ultradns_delete_txt_record(self, mock_metrics, mock_current_app):
domain = "_acme_challenge.test.example.com"
zone = "test.example.com"
token = "ABCDEFGHIJ"
account_number = "1234567890"
change_id = (domain, token)
mock_current_app.logger.debug = Mock()
ultradns.get_zone_name = Mock(return_value=zone)
ultradns._post = Mock()
ultradns._get = Mock()
ultradns._get.return_value = {'zoneName': 'test.example.com.com',
'rrSets': [{'ownerName': '_acme-challenge.test.example.com.',
'rrtype': 'TXT (16)', 'ttl': 5, 'rdata': ['ABCDEFGHIJ']}],
'queryInfo': {'sort': 'OWNER', 'reverse': False, 'limit': 100},
'resultInfo': {'totalCount': 1, 'offset': 0, 'returnedCount': 1}}
ultradns._delete = Mock()
mock_metrics.send = Mock()
ultradns.delete_txt_record(change_id, account_number, domain, token)
mock_current_app.logger.debug.assert_not_called()
mock_metrics.send.assert_not_called()
@patch("lemur.plugins.lemur_acme.ultradns.current_app")
@patch("lemur.extensions.metrics")
def test_ultradns_wait_for_dns_change(self, mock_metrics, mock_current_app):
ultradns._has_dns_propagated = Mock(return_value=True)
nameserver = "1.1.1.1"
ultradns.get_authoritative_nameserver = Mock(return_value=nameserver)
mock_metrics.send = Mock()
domain = "_acme-challenge.test.example.com"
token = "ABCDEFGHIJ"
change_id = (domain, token)
mock_current_app.logger.debug = Mock()
ultradns.wait_for_dns_change(change_id)
# mock_metrics.send.assert_not_called()
log_data = {
"function": "wait_for_dns_change",
"fqdn": domain,
"status": True,
"message": "Record status on Public DNS"
}
mock_current_app.logger.debug.assert_called_with(log_data)
def test_ultradns_get_zone_name(self):
zones = ['example.com', 'test.example.com']
zone = "test.example.com"
domain = "_acme-challenge.test.example.com"
account_number = "1234567890"
ultradns.get_zones = Mock(return_value=zones)
result = ultradns.get_zone_name(domain, account_number)
self.assertEqual(result, zone)
def test_ultradns_get_zones(self):
account_number = "1234567890"
path = "a/b/c"
zones = ['example.com', 'test.example.com']
paginate_response = [{
'properties': {
'name': 'example.com.', 'accountName': 'example', 'type': 'PRIMARY',
'dnssecStatus': 'UNSIGNED', 'status': 'ACTIVE', 'resourceRecordCount': 9,
'lastModifiedDateTime': '2017-06-14T06:45Z'},
'registrarInfo': {
'nameServers': {'missing': ['example.ultradns.com.', 'example.ultradns.net.',
'example.ultradns.biz.', 'example.ultradns.org.']}},
'inherit': 'ALL'}, {
'properties': {
'name': 'test.example.com.', 'accountName': 'example', 'type': 'PRIMARY',
'dnssecStatus': 'UNSIGNED', 'status': 'ACTIVE', 'resourceRecordCount': 9,
'lastModifiedDateTime': '2017-06-14T06:45Z'},
'registrarInfo': {
'nameServers': {'missing': ['example.ultradns.com.', 'example.ultradns.net.',
'example.ultradns.biz.', 'example.ultradns.org.']}},
'inherit': 'ALL'}, {
'properties': {
'name': 'example2.com.', 'accountName': 'example', 'type': 'SECONDARY',
'dnssecStatus': 'UNSIGNED', 'status': 'ACTIVE', 'resourceRecordCount': 9,
'lastModifiedDateTime': '2017-06-14T06:45Z'},
'registrarInfo': {
'nameServers': {'missing': ['example.ultradns.com.', 'example.ultradns.net.',
'example.ultradns.biz.', 'example.ultradns.org.']}},
'inherit': 'ALL'}]
ultradns._paginate = Mock(path, "zones")
ultradns._paginate.side_effect = [[paginate_response]]
result = ultradns.get_zones(account_number)
self.assertEqual(result, zones)

View File

@@ -1,5 +1,5 @@
import unittest
from mock import Mock, patch
from unittest.mock import patch, Mock
from lemur.plugins.lemur_acme import plugin, powerdns
@@ -48,13 +48,14 @@ class TestPowerdns(unittest.TestCase):
self.assertEqual(result, zone)
@patch("lemur.plugins.lemur_acme.powerdns.current_app")
def test_create_txt_record(self, mock_current_app):
def test_create_txt_record_write_only(self, mock_current_app):
domain = "_acme_challenge.test.example.com"
zone = "test.example.com"
token = "ABCDEFGHIJ"
account_number = "1234567890"
change_id = (domain, token)
powerdns._check_conf = Mock()
powerdns._get_txt_records = Mock(return_value=[])
powerdns._get_zone_name = Mock(return_value=zone)
mock_current_app.logger.debug = Mock()
mock_current_app.config.get = Mock(return_value="localhost")
@@ -63,24 +64,74 @@ class TestPowerdns(unittest.TestCase):
"function": "create_txt_record",
"fqdn": domain,
"token": token,
"message": "TXT record successfully created"
"message": "TXT record(s) successfully created"
}
result = powerdns.create_txt_record(domain, token, account_number)
mock_current_app.logger.debug.assert_called_with(log_data)
self.assertEqual(result, change_id)
@patch("lemur.plugins.lemur_acme.powerdns.current_app")
def test_create_txt_record_append(self, mock_current_app):
domain = "_acme_challenge.test.example.com"
zone = "test.example.com"
token = "ABCDEFGHIJ"
account_number = "1234567890"
change_id = (domain, token)
powerdns._check_conf = Mock()
cur_token = "123456"
cur_records = [powerdns.Record({'name': domain, 'content': f"\"{cur_token}\"", 'disabled': False})]
powerdns._get_txt_records = Mock(return_value=cur_records)
powerdns._get_zone_name = Mock(return_value=zone)
mock_current_app.logger.debug = Mock()
mock_current_app.config.get = Mock(return_value="localhost")
powerdns._patch = Mock()
log_data = {
"function": "create_txt_record",
"fqdn": domain,
"token": token,
"message": "TXT record(s) successfully created"
}
expected_path = "/api/v1/servers/localhost/zones/test.example.com."
expected_payload = {
"rrsets": [
{
"name": domain + ".",
"type": "TXT",
"ttl": 300,
"changetype": "REPLACE",
"records": [
{
"content": f"\"{token}\"",
"disabled": False
},
{
"content": f"\"{cur_token}\"",
"disabled": False
}
],
"comments": []
}
]
}
result = powerdns.create_txt_record(domain, token, account_number)
mock_current_app.logger.debug.assert_called_with(log_data)
powerdns._patch.assert_called_with(expected_path, expected_payload)
self.assertEqual(result, change_id)
@patch("lemur.plugins.lemur_acme.powerdns.dnsutil")
@patch("lemur.plugins.lemur_acme.powerdns.current_app")
@patch("lemur.extensions.metrics")
@patch("time.sleep")
def test_wait_for_dns_change(self, mock_sleep, mock_metrics, mock_current_app, mock_dnsutil):
domain = "_acme-challenge.test.example.com"
token = "ABCDEFG"
token1 = "ABCDEFG"
token2 = "HIJKLMN"
zone_name = "test.example.com"
nameserver = "1.1.1.1"
change_id = (domain, token)
change_id = (domain, token1)
powerdns._check_conf = Mock()
mock_records = (token,)
mock_records = (token2, token1)
mock_current_app.config.get = Mock(return_value=1)
powerdns._get_zone_name = Mock(return_value=zone_name)
mock_dnsutil.get_authoritative_nameserver = Mock(return_value=nameserver)
@@ -114,7 +165,7 @@ class TestPowerdns(unittest.TestCase):
"function": "delete_txt_record",
"fqdn": domain,
"token": token,
"message": "TXT record successfully deleted"
"message": "Unable to delete TXT record: Token not found in existing TXT records"
}
powerdns.delete_txt_record(change_id, account_number, domain, token)
mock_current_app.logger.debug.assert_called_with(log_data)

View File

@@ -0,0 +1,138 @@
import unittest
from unittest.mock import patch, Mock
from lemur.plugins.lemur_acme import plugin, ultradns
from requests.models import Response
class TestUltradns(unittest.TestCase):
@patch("lemur.plugins.lemur_acme.plugin.dns_provider_service")
def setUp(self, mock_dns_provider_service):
self.ACMEIssuerPlugin = plugin.ACMEIssuerPlugin()
self.acme = plugin.AcmeHandler()
mock_dns_provider = Mock()
mock_dns_provider.name = "cloudflare"
mock_dns_provider.credentials = "{}"
mock_dns_provider.provider_type = "cloudflare"
self.acme.dns_providers_for_domain = {
"www.test.com": [mock_dns_provider],
"test.fakedomain.net": [mock_dns_provider],
}
@patch("lemur.plugins.lemur_acme.ultradns.requests")
@patch("lemur.plugins.lemur_acme.ultradns.current_app")
def test_ultradns_get_token(self, mock_current_app, mock_requests):
# ret_val = json.dumps({"access_token": "access"})
the_response = Response()
the_response._content = b'{"access_token": "access"}'
mock_requests.post = Mock(return_value=the_response)
mock_current_app.config.get = Mock(return_value="Test")
result = ultradns.get_ultradns_token()
self.assertTrue(len(result) > 0)
@patch("lemur.plugins.lemur_acme.ultradns.current_app")
def test_ultradns_create_txt_record(self, mock_current_app):
domain = "_acme_challenge.test.example.com"
zone = "test.example.com"
token = "ABCDEFGHIJ"
account_number = "1234567890"
change_id = (domain, token)
ultradns.get_zone_name = Mock(return_value=zone)
mock_current_app.logger.debug = Mock()
ultradns._post = Mock()
log_data = {
"function": "create_txt_record",
"fqdn": domain,
"token": token,
"message": "TXT record created"
}
result = ultradns.create_txt_record(domain, token, account_number)
mock_current_app.logger.debug.assert_called_with(log_data)
self.assertEqual(result, change_id)
@patch("lemur.plugins.lemur_acme.ultradns.current_app")
@patch("lemur.extensions.metrics")
def test_ultradns_delete_txt_record(self, mock_metrics, mock_current_app):
domain = "_acme_challenge.test.example.com"
zone = "test.example.com"
token = "ABCDEFGHIJ"
account_number = "1234567890"
change_id = (domain, token)
mock_current_app.logger.debug = Mock()
ultradns.get_zone_name = Mock(return_value=zone)
ultradns._post = Mock()
ultradns._get = Mock()
ultradns._get.return_value = {'zoneName': 'test.example.com.com',
'rrSets': [{'ownerName': '_acme-challenge.test.example.com.',
'rrtype': 'TXT (16)', 'ttl': 5, 'rdata': ['ABCDEFGHIJ']}],
'queryInfo': {'sort': 'OWNER', 'reverse': False, 'limit': 100},
'resultInfo': {'totalCount': 1, 'offset': 0, 'returnedCount': 1}}
ultradns._delete = Mock()
mock_metrics.send = Mock()
ultradns.delete_txt_record(change_id, account_number, domain, token)
mock_current_app.logger.debug.assert_not_called()
mock_metrics.send.assert_not_called()
@patch("lemur.plugins.lemur_acme.ultradns.current_app")
@patch("lemur.extensions.metrics")
def test_ultradns_wait_for_dns_change(self, mock_metrics, mock_current_app):
ultradns._has_dns_propagated = Mock(return_value=True)
nameserver = "1.1.1.1"
ultradns.get_authoritative_nameserver = Mock(return_value=nameserver)
mock_metrics.send = Mock()
domain = "_acme-challenge.test.example.com"
token = "ABCDEFGHIJ"
change_id = (domain, token)
mock_current_app.logger.debug = Mock()
ultradns.wait_for_dns_change(change_id)
# mock_metrics.send.assert_not_called()
log_data = {
"function": "wait_for_dns_change",
"fqdn": domain,
"status": True,
"message": "Record status on Public DNS"
}
mock_current_app.logger.debug.assert_called_with(log_data)
def test_ultradns_get_zone_name(self):
zones = ['example.com', 'test.example.com']
zone = "test.example.com"
domain = "_acme-challenge.test.example.com"
account_number = "1234567890"
ultradns.get_zones = Mock(return_value=zones)
result = ultradns.get_zone_name(domain, account_number)
self.assertEqual(result, zone)
def test_ultradns_get_zones(self):
account_number = "1234567890"
path = "a/b/c"
zones = ['example.com', 'test.example.com']
paginate_response = [{
'properties': {
'name': 'example.com.', 'accountName': 'example', 'type': 'PRIMARY',
'dnssecStatus': 'UNSIGNED', 'status': 'ACTIVE', 'resourceRecordCount': 9,
'lastModifiedDateTime': '2017-06-14T06:45Z'},
'registrarInfo': {
'nameServers': {'missing': ['example.ultradns.com.', 'example.ultradns.net.',
'example.ultradns.biz.', 'example.ultradns.org.']}},
'inherit': 'ALL'}, {
'properties': {
'name': 'test.example.com.', 'accountName': 'example', 'type': 'PRIMARY',
'dnssecStatus': 'UNSIGNED', 'status': 'ACTIVE', 'resourceRecordCount': 9,
'lastModifiedDateTime': '2017-06-14T06:45Z'},
'registrarInfo': {
'nameServers': {'missing': ['example.ultradns.com.', 'example.ultradns.net.',
'example.ultradns.biz.', 'example.ultradns.org.']}},
'inherit': 'ALL'}, {
'properties': {
'name': 'example2.com.', 'accountName': 'example', 'type': 'SECONDARY',
'dnssecStatus': 'UNSIGNED', 'status': 'ACTIVE', 'resourceRecordCount': 9,
'lastModifiedDateTime': '2017-06-14T06:45Z'},
'registrarInfo': {
'nameServers': {'missing': ['example.ultradns.com.', 'example.ultradns.net.',
'example.ultradns.biz.', 'example.ultradns.org.']}},
'inherit': 'ALL'}]
ultradns._paginate = Mock(path, "zones")
ultradns._paginate.side_effect = [[paginate_response]]
result = ultradns.get_zones(account_number)
self.assertEqual(result, zones)

View File

@@ -24,6 +24,12 @@ def retry_throttled(exception):
if exception.response["Error"]["Code"] == "NoSuchEntity":
return False
# No need to retry deletion requests if there is a DeleteConflict error.
# This error indicates that the certificate is still attached to an entity
# and cannot be deleted.
if exception.response["Error"]["Code"] == "DeleteConflict":
return False
metrics.send("iam_retry", "counter", 1, metric_tags={"exception": str(exception)})
return True

View File

@@ -216,22 +216,24 @@ class AWSSourcePlugin(SourcePlugin):
for region in regions:
elbs = elb.get_all_elbs(account_number=account_number, region=region)
current_app.logger.info(
"Describing classic load balancers in {0}-{1}".format(
account_number, region
)
)
current_app.logger.info({
"message": "Describing classic load balancers",
"account_number": account_number,
"region": region,
"number_of_load_balancers": len(elbs)
})
for e in elbs:
endpoints.extend(get_elb_endpoints(account_number, region, e))
# fetch advanced ELBs
elbs_v2 = elb.get_all_elbs_v2(account_number=account_number, region=region)
current_app.logger.info(
"Describing advanced load balancers in {0}-{1}".format(
account_number, region
)
)
current_app.logger.info({
"message": "Describing advanced load balancers",
"account_number": account_number,
"region": region,
"number_of_load_balancers": len(elbs_v2)
})
for e in elbs_v2:
endpoints.extend(get_elb_endpoints_v2(account_number, region, e))
@@ -325,14 +327,17 @@ class AWSDestinationPlugin(DestinationPlugin):
]
def upload(self, name, body, private_key, cert_chain, options, **kwargs):
iam.upload_cert(
name,
body,
private_key,
self.get_option("path", options),
cert_chain=cert_chain,
account_number=self.get_option("accountNumber", options),
)
try:
iam.upload_cert(
name,
body,
private_key,
self.get_option("path", options),
cert_chain=cert_chain,
account_number=self.get_option("accountNumber", options),
)
except ClientError:
sentry.captureException()
def deploy(self, elb_name, account, region, certificate):
pass

View File

@@ -24,7 +24,12 @@ from lemur.certificates.service import create_csr
def build_certificate_authority(options):
options["certificate_authority"] = True
csr, private_key = create_csr(**options)
cert_pem, chain_cert_pem = issue_certificate(csr, options, private_key)
if options.get("parent"):
# Intermediate Cert Issuance
cert_pem, chain_cert_pem = issue_certificate(csr, options, None)
else:
cert_pem, chain_cert_pem = issue_certificate(csr, options, private_key)
return cert_pem, private_key, chain_cert_pem

View File

@@ -25,6 +25,31 @@ def test_build_certificate_authority():
assert chain_cert_pem == ""
def test_build_intermediate_certificate_authority(authority):
from lemur.plugins.lemur_cryptography.plugin import build_certificate_authority
options = {
"key_type": "RSA2048",
"country": "US",
"state": "CA",
"location": "Example place",
"organization": "Example, Inc.",
"organizational_unit": "Example Unit",
"common_name": "Example INTERMEDIATE",
"validity_start": arrow.get("2016-12-01").datetime,
"validity_end": arrow.get("2016-12-02").datetime,
"first_serial": 1,
"serial_number": 1,
"owner": "owner@example.com",
"parent": authority
}
cert_pem, private_key_pem, chain_cert_pem = build_certificate_authority(options)
assert cert_pem
assert private_key_pem
assert chain_cert_pem == authority.authority_certificate.body
def test_issue_certificate(authority):
from lemur.tests.vectors import CSR_STR
from lemur.plugins.lemur_cryptography.plugin import issue_certificate

View File

@@ -14,21 +14,17 @@
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
"""
import json
import arrow
import requests
import pem
from retrying import retry
from flask import current_app
import requests
from cryptography import x509
from lemur.extensions import metrics
from flask import current_app
from lemur.common.utils import validate_conf
from lemur.plugins.bases import IssuerPlugin, SourcePlugin
from lemur.extensions import metrics
from lemur.plugins import lemur_digicert as digicert
from lemur.plugins.bases import IssuerPlugin, SourcePlugin
from retrying import retry
def log_status_code(r, *args, **kwargs):
@@ -64,24 +60,35 @@ def signature_hash(signing_algorithm):
raise Exception("Unsupported signing algorithm.")
def determine_validity_years(end_date):
"""Given an end date determine how many years into the future that date is.
def determine_validity_years(years):
"""
Considering maximum allowed certificate validity period of 397 days, this method should not return
more than 1 year of validity. Thus changing it to always return 1.
Lemur will change this method in future to handle validity in months (determine_validity_months)
instead of years. This will allow flexibility to handle short-lived certificates.
:param years:
:return: 1
"""
return 1
def determine_end_date(end_date):
"""
Determine appropriate end date
:param end_date:
:return: str validity in years
:return: validity_end
"""
now = arrow.utcnow()
default_days = current_app.config.get("DIGICERT_DEFAULT_VALIDITY_DAYS", 397)
max_validity_end = arrow.utcnow().shift(days=current_app.config.get("DIGICERT_MAX_VALIDITY_DAYS", default_days))
if end_date < now.shift(years=+1):
return 1
elif end_date < now.shift(years=+2):
return 2
elif end_date < now.shift(years=+3):
return 3
if not end_date:
end_date = arrow.utcnow().shift(days=default_days)
raise Exception(
"DigiCert issued certificates cannot exceed three" " years in validity"
)
if end_date > max_validity_end:
end_date = max_validity_end
return end_date
def get_additional_names(options):
@@ -107,12 +114,6 @@ def map_fields(options, csr):
:param csr:
:return: dict or valid DigiCert options
"""
if not options.get("validity_years"):
if not options.get("validity_end"):
options["validity_years"] = current_app.config.get(
"DIGICERT_DEFAULT_VALIDITY", 1
)
data = dict(
certificate={
"common_name": options["common_name"],
@@ -125,9 +126,11 @@ def map_fields(options, csr):
data["certificate"]["dns_names"] = get_additional_names(options)
if options.get("validity_years"):
data["validity_years"] = options["validity_years"]
data["validity_years"] = determine_validity_years(options.get("validity_years"))
elif options.get("validity_end"):
data["custom_expiration_date"] = determine_end_date(options.get("validity_end")).format("YYYY-MM-DD")
else:
data["custom_expiration_date"] = options["validity_end"].format("YYYY-MM-DD")
data["validity_years"] = determine_validity_years(0)
if current_app.config.get("DIGICERT_PRIVATE", False):
if "product" in data:
@@ -144,18 +147,15 @@ def map_cis_fields(options, csr):
:param options:
:param csr:
:return:
:return: data
"""
if not options.get("validity_years"):
if not options.get("validity_end"):
options["validity_end"] = arrow.utcnow().shift(
years=current_app.config.get("DIGICERT_DEFAULT_VALIDITY", 1)
)
options["validity_years"] = determine_validity_years(options["validity_end"])
if options.get("validity_years"):
validity_end = determine_end_date(arrow.utcnow().shift(years=options["validity_years"]))
elif options.get("validity_end"):
validity_end = determine_end_date(options.get("validity_end"))
else:
options["validity_end"] = arrow.utcnow().shift(
years=options["validity_years"]
)
validity_end = determine_end_date(False)
data = {
"profile_name": current_app.config.get("DIGICERT_CIS_PROFILE_NAMES", {}).get(options['authority'].name),
@@ -164,7 +164,7 @@ def map_cis_fields(options, csr):
"csr": csr,
"signature_hash": signature_hash(options.get("signing_algorithm")),
"validity": {
"valid_to": options["validity_end"].format("YYYY-MM-DDTHH:MM") + "Z"
"valid_to": validity_end.format("YYYY-MM-DDTHH:MM") + "Z"
},
"organization": {
"name": options["organization"],
@@ -173,7 +173,8 @@ def map_cis_fields(options, csr):
}
# possibility to default to a SIGNING_ALGORITHM for a given profile
if current_app.config.get("DIGICERT_CIS_SIGNING_ALGORITHMS", {}).get(options['authority'].name):
data["signature_hash"] = current_app.config.get("DIGICERT_CIS_SIGNING_ALGORITHMS", {}).get(options['authority'].name)
data["signature_hash"] = current_app.config.get("DIGICERT_CIS_SIGNING_ALGORITHMS", {}).get(
options['authority'].name)
return data

View File

@@ -1,117 +1,122 @@
import pytest
import arrow
import json
from unittest.mock import patch
from unittest.mock import patch, Mock
import arrow
import pytest
from cryptography import x509
from freezegun import freeze_time
from lemur.plugins.lemur_digicert import plugin
from lemur.tests.vectors import CSR_STR
from cryptography import x509
def test_map_fields_with_validity_end_and_start(app):
from lemur.plugins.lemur_digicert.plugin import map_fields
names = [u"one.example.com", u"two.example.com", u"three.example.com"]
options = {
"common_name": "example.com",
"owner": "bob@example.com",
"description": "test certificate",
"extensions": {"sub_alt_names": {"names": [x509.DNSName(x) for x in names]}},
"validity_end": arrow.get(2017, 5, 7),
"validity_start": arrow.get(2016, 10, 30),
}
data = map_fields(options, CSR_STR)
assert data == {
"certificate": {
"csr": CSR_STR,
"common_name": "example.com",
"dns_names": names,
"signature_hash": "sha256",
},
"organization": {"id": 111111},
"custom_expiration_date": arrow.get(2017, 5, 7).format("YYYY-MM-DD"),
def config_mock(*args):
values = {
"DIGICERT_ORG_ID": 111111,
"DIGICERT_PRIVATE": False,
"DIGICERT_DEFAULT_SIGNING_ALGORITHM": "sha256",
"DIGICERT_CIS_PROFILE_NAMES": {"digicert": 'digicert'},
"DIGICERT_CIS_SIGNING_ALGORITHMS": {"digicert": 'digicert'},
}
return values[args[0]]
def test_map_fields_with_validity_years(app):
from lemur.plugins.lemur_digicert.plugin import map_fields
names = [u"one.example.com", u"two.example.com", u"three.example.com"]
options = {
"common_name": "example.com",
"owner": "bob@example.com",
"description": "test certificate",
"extensions": {"sub_alt_names": {"names": [x509.DNSName(x) for x in names]}},
"validity_years": 2,
"validity_end": arrow.get(2017, 10, 30),
}
data = map_fields(options, CSR_STR)
assert data == {
"certificate": {
"csr": CSR_STR,
"common_name": "example.com",
"dns_names": names,
"signature_hash": "sha256",
},
"organization": {"id": 111111},
"validity_years": 2,
}
@patch("lemur.plugins.lemur_digicert.plugin.current_app")
def test_determine_validity_years(mock_current_app):
assert plugin.determine_validity_years(1) == 1
assert plugin.determine_validity_years(0) == 1
assert plugin.determine_validity_years(3) == 1
def test_map_cis_fields(app, authority):
from lemur.plugins.lemur_digicert.plugin import map_cis_fields
names = [u"one.example.com", u"two.example.com", u"three.example.com"]
options = {
"common_name": "example.com",
"owner": "bob@example.com",
"description": "test certificate",
"extensions": {"sub_alt_names": {"names": [x509.DNSName(x) for x in names]}},
"organization": "Example, Inc.",
"organizational_unit": "Example Org",
"validity_end": arrow.get(2017, 5, 7),
"validity_start": arrow.get(2016, 10, 30),
"authority": authority,
}
data = map_cis_fields(options, CSR_STR)
assert data == {
"common_name": "example.com",
"csr": CSR_STR,
"additional_dns_names": names,
"signature_hash": "sha256",
"organization": {"name": "Example, Inc.", "units": ["Example Org"]},
"validity": {
"valid_to": arrow.get(2017, 5, 7).format("YYYY-MM-DDTHH:MM") + "Z"
},
"profile_name": None,
}
options = {
"common_name": "example.com",
"owner": "bob@example.com",
"description": "test certificate",
"extensions": {"sub_alt_names": {"names": [x509.DNSName(x) for x in names]}},
"organization": "Example, Inc.",
"organizational_unit": "Example Org",
"validity_years": 2,
"authority": authority,
}
@patch("lemur.plugins.lemur_digicert.plugin.current_app")
def test_determine_end_date(mock_current_app):
mock_current_app.config.get = Mock(return_value=397) # 397 days validity
with freeze_time(time_to_freeze=arrow.get(2016, 11, 3).datetime):
data = map_cis_fields(options, CSR_STR)
assert arrow.get(2017, 12, 5) == plugin.determine_end_date(0) # 397 days from (2016, 11, 3)
assert arrow.get(2017, 12, 5) == plugin.determine_end_date(arrow.get(2017, 12, 5))
assert arrow.get(2017, 12, 5) == plugin.determine_end_date(arrow.get(2020, 5, 7))
assert data == {
@patch("lemur.plugins.lemur_digicert.plugin.current_app")
def test_map_fields_with_validity_years(mock_current_app):
mock_current_app.config.get = Mock(side_effect=config_mock)
with patch('lemur.plugins.lemur_digicert.plugin.signature_hash') as mock_signature_hash:
mock_signature_hash.return_value = "sha256"
names = [u"one.example.com", u"two.example.com", u"three.example.com"]
options = {
"common_name": "example.com",
"owner": "bob@example.com",
"description": "test certificate",
"extensions": {"sub_alt_names": {"names": [x509.DNSName(x) for x in names]}},
"validity_years": 1
}
expected = {
"certificate": {
"csr": CSR_STR,
"common_name": "example.com",
"dns_names": names,
"signature_hash": "sha256",
},
"organization": {"id": 111111},
"validity_years": 1,
}
assert expected == plugin.map_fields(options, CSR_STR)
@patch("lemur.plugins.lemur_digicert.plugin.current_app")
def test_map_fields_with_validity_end_and_start(mock_current_app):
mock_current_app.config.get = Mock(side_effect=config_mock)
plugin.determine_end_date = Mock(return_value=arrow.get(2017, 5, 7))
with patch('lemur.plugins.lemur_digicert.plugin.signature_hash') as mock_signature_hash:
mock_signature_hash.return_value = "sha256"
names = [u"one.example.com", u"two.example.com", u"three.example.com"]
options = {
"common_name": "example.com",
"owner": "bob@example.com",
"description": "test certificate",
"extensions": {"sub_alt_names": {"names": [x509.DNSName(x) for x in names]}},
"validity_end": arrow.get(2017, 5, 7),
"validity_start": arrow.get(2016, 10, 30),
}
expected = {
"certificate": {
"csr": CSR_STR,
"common_name": "example.com",
"dns_names": names,
"signature_hash": "sha256",
},
"organization": {"id": 111111},
"custom_expiration_date": arrow.get(2017, 5, 7).format("YYYY-MM-DD"),
}
assert expected == plugin.map_fields(options, CSR_STR)
@patch("lemur.plugins.lemur_digicert.plugin.current_app")
def test_map_cis_fields_with_validity_years(mock_current_app, authority):
mock_current_app.config.get = Mock(side_effect=config_mock)
plugin.determine_end_date = Mock(return_value=arrow.get(2018, 11, 3))
with patch('lemur.plugins.lemur_digicert.plugin.signature_hash') as mock_signature_hash:
mock_signature_hash.return_value = "sha256"
names = [u"one.example.com", u"two.example.com", u"three.example.com"]
options = {
"common_name": "example.com",
"owner": "bob@example.com",
"description": "test certificate",
"extensions": {"sub_alt_names": {"names": [x509.DNSName(x) for x in names]}},
"organization": "Example, Inc.",
"organizational_unit": "Example Org",
"validity_years": 2,
"authority": authority,
}
expected = {
"common_name": "example.com",
"csr": CSR_STR,
"additional_dns_names": names,
@@ -123,21 +128,59 @@ def test_map_cis_fields(app, authority):
"profile_name": None,
}
assert expected == plugin.map_cis_fields(options, CSR_STR)
def test_signature_hash(app):
from lemur.plugins.lemur_digicert.plugin import signature_hash
assert signature_hash(None) == "sha256"
assert signature_hash("sha256WithRSA") == "sha256"
assert signature_hash("sha384WithRSA") == "sha384"
assert signature_hash("sha512WithRSA") == "sha512"
@patch("lemur.plugins.lemur_digicert.plugin.current_app")
def test_map_cis_fields_with_validity_end_and_start(mock_current_app, app, authority):
mock_current_app.config.get = Mock(side_effect=config_mock)
plugin.determine_end_date = Mock(return_value=arrow.get(2017, 5, 7))
with patch('lemur.plugins.lemur_digicert.plugin.signature_hash') as mock_signature_hash:
mock_signature_hash.return_value = "sha256"
names = [u"one.example.com", u"two.example.com", u"three.example.com"]
options = {
"common_name": "example.com",
"owner": "bob@example.com",
"description": "test certificate",
"extensions": {"sub_alt_names": {"names": [x509.DNSName(x) for x in names]}},
"organization": "Example, Inc.",
"organizational_unit": "Example Org",
"validity_end": arrow.get(2017, 5, 7),
"validity_start": arrow.get(2016, 10, 30),
"authority": authority
}
expected = {
"common_name": "example.com",
"csr": CSR_STR,
"additional_dns_names": names,
"signature_hash": "sha256",
"organization": {"name": "Example, Inc.", "units": ["Example Org"]},
"validity": {
"valid_to": arrow.get(2017, 5, 7).format("YYYY-MM-DDTHH:MM") + "Z"
},
"profile_name": None,
}
assert expected == plugin.map_cis_fields(options, CSR_STR)
@patch("lemur.plugins.lemur_digicert.plugin.current_app")
def test_signature_hash(mock_current_app, app):
mock_current_app.config.get = Mock(side_effect=config_mock)
assert plugin.signature_hash(None) == "sha256"
assert plugin.signature_hash("sha256WithRSA") == "sha256"
assert plugin.signature_hash("sha384WithRSA") == "sha384"
assert plugin.signature_hash("sha512WithRSA") == "sha512"
with pytest.raises(Exception):
signature_hash("sdfdsf")
plugin.signature_hash("sdfdsf")
def test_issuer_plugin_create_certificate(
certificate_="""\
certificate_="""\
-----BEGIN CERTIFICATE-----
abc
-----END CERTIFICATE-----

View File

@@ -75,7 +75,8 @@
</tr>
<tr>
<td style="font-family:Roboto-Regular,Helvetica,Arial,sans-serif;font-size:13px;color:#202020;line-height:1.5">
<br>This is a Lemur certificate expiration notice. Please verify that the following certificates are no longer used.
<br>This is a Lemur certificate expiration notice. Please verify that the following certificates are no longer used,
and disable notifications via the Notify toggle in Lemur, if applicable.
<table border="0" cellspacing="0" cellpadding="0"
style="margin-top:48px;margin-bottom:48px">
<tbody>

View File

@@ -14,7 +14,7 @@ import re
import hvac
from flask import current_app
from lemur.common.defaults import common_name
from lemur.common.defaults import common_name, country, state, location, organizational_unit, organization
from lemur.common.utils import parse_certificate
from lemur.plugins.bases import DestinationPlugin
from lemur.plugins.bases import SourcePlugin
@@ -58,7 +58,7 @@ class VaultSourcePlugin(SourcePlugin):
"helpMessage": "Authentication method to use",
},
{
"name": "tokenFile/VaultRole",
"name": "tokenFileOrVaultRole",
"type": "str",
"required": True,
"validation": "^([a-zA-Z0-9/._-]+/?)+$",
@@ -94,7 +94,7 @@ class VaultSourcePlugin(SourcePlugin):
body = ""
url = self.get_option("vaultUrl", options)
auth_method = self.get_option("authenticationMethod", options)
auth_key = self.get_option("tokenFile/vaultRole", options)
auth_key = self.get_option("tokenFileOrVaultRole", options)
mount = self.get_option("vaultMount", options)
path = self.get_option("vaultPath", options)
obj_name = self.get_option("objectName", options)
@@ -185,7 +185,7 @@ class VaultDestinationPlugin(DestinationPlugin):
"helpMessage": "Authentication method to use",
},
{
"name": "tokenFile/VaultRole",
"name": "tokenFileOrVaultRole",
"type": "str",
"required": True,
"validation": "^([a-zA-Z0-9/._-]+/?)+$",
@@ -202,15 +202,15 @@ class VaultDestinationPlugin(DestinationPlugin):
"name": "vaultPath",
"type": "str",
"required": True,
"validation": "^([a-zA-Z0-9._-]+/?)+$",
"helpMessage": "Must be a valid Vault secrets path",
"validation": "^(([a-zA-Z0-9._-]+|{(CN|OU|O|L|S|C)})+/?)+$",
"helpMessage": "Must be a valid Vault secrets path. Support vars: {CN|OU|O|L|S|C}",
},
{
"name": "objectName",
"type": "str",
"required": False,
"validation": "[0-9a-zA-Z.:_-]+",
"helpMessage": "Name to bundle certs under, if blank use cn",
"validation": "^([0-9a-zA-Z.:_-]+|{(CN|OU|O|L|S|C)})+$",
"helpMessage": "Name to bundle certs under, if blank use {CN}. Support vars: {CN|OU|O|L|S|C}",
},
{
"name": "bundleChain",
@@ -241,11 +241,12 @@ class VaultDestinationPlugin(DestinationPlugin):
:param cert_chain:
:return:
"""
cname = common_name(parse_certificate(body))
cert = parse_certificate(body)
cname = common_name(cert)
url = self.get_option("vaultUrl", options)
auth_method = self.get_option("authenticationMethod", options)
auth_key = self.get_option("tokenFile/vaultRole", options)
auth_key = self.get_option("tokenFileOrVaultRole", options)
mount = self.get_option("vaultMount", options)
path = self.get_option("vaultPath", options)
bundle = self.get_option("bundleChain", options)
@@ -285,10 +286,27 @@ class VaultDestinationPlugin(DestinationPlugin):
client.secrets.kv.default_kv_version = api_version
if obj_name:
path = "{0}/{1}".format(path, obj_name)
else:
path = "{0}/{1}".format(path, cname)
t_path = path.format(
CN=cname,
OU=organizational_unit(cert),
O=organization(cert), # noqa: E741
L=location(cert),
S=state(cert),
C=country(cert)
)
if not obj_name:
obj_name = '{CN}'
f_obj_name = obj_name.format(
CN=cname,
OU=organizational_unit(cert),
O=organization(cert), # noqa: E741
L=location(cert),
S=state(cert),
C=country(cert)
)
path = "{0}/{1}".format(t_path, f_obj_name)
secret = get_secret(client, mount, path)
secret["data"][cname] = {}