Refactor AcmeHandler, Move DNS stuff into AcmeDnsHandler
This commit is contained in:
parent
76dcfbd528
commit
e3e5ef7d66
@ -48,33 +48,6 @@ class AuthorizationRecord(object):
|
||||
|
||||
|
||||
class AcmeHandler(object):
|
||||
def __init__(self):
|
||||
self.dns_providers_for_domain = {}
|
||||
try:
|
||||
self.all_dns_providers = dns_provider_service.get_all_dns_providers()
|
||||
except Exception as e:
|
||||
metrics.send("AcmeHandler_init_error", "counter", 1)
|
||||
sentry.captureException()
|
||||
current_app.logger.error(f"Unable to fetch DNS Providers: {e}")
|
||||
self.all_dns_providers = []
|
||||
|
||||
def get_dns_challenges(self, host, authorizations):
|
||||
"""Get dns challenges for provided domain"""
|
||||
|
||||
domain_to_validate, is_wildcard = self.strip_wildcard(host)
|
||||
dns_challenges = []
|
||||
for authz in authorizations:
|
||||
if not authz.body.identifier.value.lower() == domain_to_validate.lower():
|
||||
continue
|
||||
if is_wildcard and not authz.body.wildcard:
|
||||
continue
|
||||
if not is_wildcard and authz.body.wildcard:
|
||||
continue
|
||||
for combo in authz.body.challenges:
|
||||
if isinstance(combo.chall, challenges.DNS01):
|
||||
dns_challenges.append(combo)
|
||||
|
||||
return dns_challenges
|
||||
|
||||
def strip_wildcard(self, host):
|
||||
"""Removes the leading *. and returns Host and whether it was removed or not (True/False)"""
|
||||
@ -90,91 +63,6 @@ class AcmeHandler(object):
|
||||
host = host + dns_provider_options.get("acme_challenge_extension")
|
||||
return host
|
||||
|
||||
def start_dns_challenge(
|
||||
self,
|
||||
acme_client,
|
||||
account_number,
|
||||
host,
|
||||
dns_provider,
|
||||
order,
|
||||
dns_provider_options,
|
||||
):
|
||||
current_app.logger.debug("Starting DNS challenge for {0}".format(host))
|
||||
|
||||
change_ids = []
|
||||
dns_challenges = self.get_dns_challenges(host, order.authorizations)
|
||||
host_to_validate, _ = self.strip_wildcard(host)
|
||||
host_to_validate = self.maybe_add_extension(
|
||||
host_to_validate, dns_provider_options
|
||||
)
|
||||
|
||||
if not dns_challenges:
|
||||
sentry.captureException()
|
||||
metrics.send("start_dns_challenge_error_no_dns_challenges", "counter", 1)
|
||||
raise Exception("Unable to determine DNS challenges from authorizations")
|
||||
|
||||
for dns_challenge in dns_challenges:
|
||||
change_id = dns_provider.create_txt_record(
|
||||
dns_challenge.validation_domain_name(host_to_validate),
|
||||
dns_challenge.validation(acme_client.client.net.key),
|
||||
account_number,
|
||||
)
|
||||
change_ids.append(change_id)
|
||||
|
||||
return AuthorizationRecord(
|
||||
host, order.authorizations, dns_challenges, change_ids
|
||||
)
|
||||
|
||||
def complete_dns_challenge(self, acme_client, authz_record):
|
||||
current_app.logger.debug(
|
||||
"Finalizing DNS challenge for {0}".format(
|
||||
authz_record.authz[0].body.identifier.value
|
||||
)
|
||||
)
|
||||
dns_providers = self.dns_providers_for_domain.get(authz_record.host)
|
||||
if not dns_providers:
|
||||
metrics.send("complete_dns_challenge_error_no_dnsproviders", "counter", 1)
|
||||
raise Exception(
|
||||
"No DNS providers found for domain: {}".format(authz_record.host)
|
||||
)
|
||||
|
||||
for dns_provider in dns_providers:
|
||||
# Grab account number (For Route53)
|
||||
dns_provider_options = json.loads(dns_provider.credentials)
|
||||
account_number = dns_provider_options.get("account_id")
|
||||
dns_provider_plugin = self.get_dns_provider(dns_provider.provider_type)
|
||||
for change_id in authz_record.change_id:
|
||||
try:
|
||||
dns_provider_plugin.wait_for_dns_change(
|
||||
change_id, account_number=account_number
|
||||
)
|
||||
except Exception:
|
||||
metrics.send("complete_dns_challenge_error", "counter", 1)
|
||||
sentry.captureException()
|
||||
current_app.logger.debug(
|
||||
f"Unable to resolve DNS challenge for change_id: {change_id}, account_id: "
|
||||
f"{account_number}",
|
||||
exc_info=True,
|
||||
)
|
||||
raise
|
||||
|
||||
for dns_challenge in authz_record.dns_challenge:
|
||||
response = dns_challenge.response(acme_client.client.net.key)
|
||||
|
||||
verified = response.simple_verify(
|
||||
dns_challenge.chall,
|
||||
authz_record.host,
|
||||
acme_client.client.net.key.public_key(),
|
||||
)
|
||||
|
||||
if not verified:
|
||||
metrics.send("complete_dns_challenge_verification_error", "counter", 1)
|
||||
raise ValueError("Failed verification")
|
||||
|
||||
time.sleep(5)
|
||||
res = acme_client.answer_challenge(dns_challenge, response)
|
||||
current_app.logger.debug(f"answer_challenge response: {res}")
|
||||
|
||||
def request_certificate(self, acme_client, authorizations, order):
|
||||
for authorization in authorizations:
|
||||
for authz in authorization.authz:
|
||||
@ -310,6 +198,135 @@ class AcmeHandler(object):
|
||||
current_app.logger.debug("Got these domains: {0}".format(domains))
|
||||
return domains
|
||||
|
||||
|
||||
class AcmeDnsHandler(AcmeHandler):
|
||||
|
||||
def __init__(self):
|
||||
self.dns_providers_for_domain = {}
|
||||
try:
|
||||
self.all_dns_providers = dns_provider_service.get_all_dns_providers()
|
||||
except Exception as e:
|
||||
metrics.send("AcmeHandler_init_error", "counter", 1)
|
||||
sentry.captureException()
|
||||
current_app.logger.error(f"Unable to fetch DNS Providers: {e}")
|
||||
self.all_dns_providers = []
|
||||
|
||||
def get_dns_challenges(self, host, authorizations):
|
||||
"""Get dns challenges for provided domain"""
|
||||
|
||||
domain_to_validate, is_wildcard = self.strip_wildcard(host)
|
||||
dns_challenges = []
|
||||
for authz in authorizations:
|
||||
if not authz.body.identifier.value.lower() == domain_to_validate.lower():
|
||||
continue
|
||||
if is_wildcard and not authz.body.wildcard:
|
||||
continue
|
||||
if not is_wildcard and authz.body.wildcard:
|
||||
continue
|
||||
for combo in authz.body.challenges:
|
||||
if isinstance(combo.chall, challenges.DNS01):
|
||||
dns_challenges.append(combo)
|
||||
|
||||
return dns_challenges
|
||||
|
||||
def get_dns_provider(self, type):
|
||||
provider_types = {
|
||||
"cloudflare": cloudflare,
|
||||
"dyn": dyn,
|
||||
"route53": route53,
|
||||
"ultradns": ultradns,
|
||||
"powerdns": powerdns
|
||||
}
|
||||
provider = provider_types.get(type)
|
||||
if not provider:
|
||||
raise UnknownProvider("No such DNS provider: {}".format(type))
|
||||
return provider
|
||||
|
||||
def start_dns_challenge(
|
||||
self,
|
||||
acme_client,
|
||||
account_number,
|
||||
host,
|
||||
dns_provider,
|
||||
order,
|
||||
dns_provider_options,
|
||||
):
|
||||
current_app.logger.debug("Starting DNS challenge for {0}".format(host))
|
||||
|
||||
change_ids = []
|
||||
dns_challenges = self.get_dns_challenges(host, order.authorizations)
|
||||
host_to_validate, _ = self.strip_wildcard(host)
|
||||
host_to_validate = self.maybe_add_extension(
|
||||
host_to_validate, dns_provider_options
|
||||
)
|
||||
|
||||
if not dns_challenges:
|
||||
sentry.captureException()
|
||||
metrics.send("start_dns_challenge_error_no_dns_challenges", "counter", 1)
|
||||
raise Exception("Unable to determine DNS challenges from authorizations")
|
||||
|
||||
for dns_challenge in dns_challenges:
|
||||
change_id = dns_provider.create_txt_record(
|
||||
dns_challenge.validation_domain_name(host_to_validate),
|
||||
dns_challenge.validation(acme_client.client.net.key),
|
||||
account_number,
|
||||
)
|
||||
change_ids.append(change_id)
|
||||
|
||||
return AuthorizationRecord(
|
||||
host, order.authorizations, dns_challenges, change_ids
|
||||
)
|
||||
|
||||
def complete_dns_challenge(self, acme_client, authz_record):
|
||||
current_app.logger.debug(
|
||||
"Finalizing DNS challenge for {0}".format(
|
||||
authz_record.authz[0].body.identifier.value
|
||||
)
|
||||
)
|
||||
dns_providers = self.dns_providers_for_domain.get(authz_record.host)
|
||||
if not dns_providers:
|
||||
metrics.send("complete_dns_challenge_error_no_dnsproviders", "counter", 1)
|
||||
raise Exception(
|
||||
"No DNS providers found for domain: {}".format(authz_record.host)
|
||||
)
|
||||
|
||||
for dns_provider in dns_providers:
|
||||
# Grab account number (For Route53)
|
||||
dns_provider_options = json.loads(dns_provider.credentials)
|
||||
account_number = dns_provider_options.get("account_id")
|
||||
dns_provider_plugin = self.get_dns_provider(dns_provider.provider_type)
|
||||
for change_id in authz_record.change_id:
|
||||
try:
|
||||
dns_provider_plugin.wait_for_dns_change(
|
||||
change_id, account_number=account_number
|
||||
)
|
||||
except Exception:
|
||||
metrics.send("complete_dns_challenge_error", "counter", 1)
|
||||
sentry.captureException()
|
||||
current_app.logger.debug(
|
||||
f"Unable to resolve DNS challenge for change_id: {change_id}, account_id: "
|
||||
f"{account_number}",
|
||||
exc_info=True,
|
||||
)
|
||||
raise
|
||||
|
||||
for dns_challenge in authz_record.dns_challenge:
|
||||
response = dns_challenge.response(acme_client.client.net.key)
|
||||
|
||||
verified = response.simple_verify(
|
||||
dns_challenge.chall,
|
||||
authz_record.host,
|
||||
acme_client.client.net.key.public_key(),
|
||||
)
|
||||
|
||||
if not verified:
|
||||
metrics.send("complete_dns_challenge_verification_error", "counter", 1)
|
||||
raise ValueError("Failed verification")
|
||||
|
||||
time.sleep(5)
|
||||
res = acme_client.answer_challenge(dns_challenge, response)
|
||||
current_app.logger.debug(f"answer_challenge response: {res}")
|
||||
|
||||
def get_authorizations(self, acme_client, order, order_info):
|
||||
authorizations = []
|
||||
|
||||
@ -421,19 +438,6 @@ class AcmeHandler(object):
|
||||
sentry.captureException()
|
||||
pass
|
||||
|
||||
def get_dns_provider(self, type):
|
||||
provider_types = {
|
||||
"cloudflare": cloudflare,
|
||||
"dyn": dyn,
|
||||
"route53": route53,
|
||||
"ultradns": ultradns,
|
||||
"powerdns": powerdns
|
||||
}
|
||||
provider = provider_types.get(type)
|
||||
if not provider:
|
||||
raise UnknownProvider("No such DNS provider: {}".format(type))
|
||||
return provider
|
||||
|
||||
|
||||
class ACMEIssuerPlugin(IssuerPlugin):
|
||||
title = "Acme"
|
||||
@ -487,7 +491,7 @@ class ACMEIssuerPlugin(IssuerPlugin):
|
||||
super(ACMEIssuerPlugin, self).__init__(*args, **kwargs)
|
||||
|
||||
def get_dns_provider(self, type):
|
||||
self.acme = AcmeHandler()
|
||||
self.acme = AcmeDnsHandler()
|
||||
|
||||
provider_types = {
|
||||
"cloudflare": cloudflare,
|
||||
@ -502,14 +506,14 @@ class ACMEIssuerPlugin(IssuerPlugin):
|
||||
return provider
|
||||
|
||||
def get_all_zones(self, dns_provider):
|
||||
self.acme = AcmeHandler()
|
||||
self.acme = AcmeDnsHandler()
|
||||
dns_provider_options = json.loads(dns_provider.credentials)
|
||||
account_number = dns_provider_options.get("account_id")
|
||||
dns_provider_plugin = self.get_dns_provider(dns_provider.provider_type)
|
||||
return dns_provider_plugin.get_zones(account_number=account_number)
|
||||
|
||||
def get_ordered_certificate(self, pending_cert):
|
||||
self.acme = AcmeHandler()
|
||||
self.acme = AcmeDnsHandler()
|
||||
acme_client, registration = self.acme.setup_acme_client(pending_cert.authority)
|
||||
order_info = authorization_service.get(pending_cert.external_id)
|
||||
if pending_cert.dns_provider_id:
|
||||
@ -555,7 +559,7 @@ class ACMEIssuerPlugin(IssuerPlugin):
|
||||
return cert
|
||||
|
||||
def get_ordered_certificates(self, pending_certs):
|
||||
self.acme = AcmeHandler()
|
||||
self.acme = AcmeDnsHandler()
|
||||
pending = []
|
||||
certs = []
|
||||
for pending_cert in pending_certs:
|
||||
@ -665,7 +669,7 @@ class ACMEIssuerPlugin(IssuerPlugin):
|
||||
:param issuer_options:
|
||||
:return: :raise Exception:
|
||||
"""
|
||||
self.acme = AcmeHandler()
|
||||
self.acme = AcmeDnsHandler()
|
||||
authority = issuer_options.get("authority")
|
||||
create_immediately = issuer_options.get("create_immediately", False)
|
||||
acme_client, registration = self.acme.setup_acme_client(authority)
|
||||
|
@ -12,7 +12,7 @@ class TestAcmeDns(unittest.TestCase):
|
||||
@patch("lemur.plugins.lemur_acme.plugin.dns_provider_service")
|
||||
def setUp(self, mock_dns_provider_service):
|
||||
self.ACMEIssuerPlugin = plugin.ACMEIssuerPlugin()
|
||||
self.acme = plugin.AcmeHandler()
|
||||
self.acme = plugin.AcmeDnsHandler()
|
||||
mock_dns_provider = Mock()
|
||||
mock_dns_provider.name = "cloudflare"
|
||||
mock_dns_provider.credentials = "{}"
|
||||
@ -42,7 +42,7 @@ class TestAcmeDns(unittest.TestCase):
|
||||
@patch("acme.client.Client")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.current_app")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.len", return_value=1)
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.get_dns_challenges")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeDnsHandler.get_dns_challenges")
|
||||
def test_start_dns_challenge(
|
||||
self, mock_get_dns_challenges, mock_len, mock_app, mock_acme
|
||||
):
|
||||
@ -124,7 +124,7 @@ class TestAcmeDns(unittest.TestCase):
|
||||
@patch("acme.client.Client")
|
||||
@patch("OpenSSL.crypto", return_value="mock_cert")
|
||||
@patch("josepy.util.ComparableX509")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.get_dns_challenges")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeDnsHandler.get_dns_challenges")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.current_app")
|
||||
def test_request_certificate(
|
||||
self,
|
||||
@ -326,9 +326,9 @@ class TestAcmeDns(unittest.TestCase):
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.setup_acme_client")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.dns_provider_service")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.current_app")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.get_authorizations")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.finalize_authorizations")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.request_certificate")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeDnsHandler.get_authorizations")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeDnsHandler.finalize_authorizations")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeDnsHandler.request_certificate")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.authorization_service")
|
||||
def test_create_certificate(
|
||||
self,
|
||||
@ -360,3 +360,110 @@ class TestAcmeDns(unittest.TestCase):
|
||||
mock_request_certificate.return_value = ("pem_certificate", "chain")
|
||||
result = provider.create_certificate(csr, issuer_options)
|
||||
assert result
|
||||
|
||||
@patch(
|
||||
"lemur.plugins.lemur_acme.plugin.AcmeDnsHandler.start_dns_challenge",
|
||||
return_value="test",
|
||||
)
|
||||
def test_get_authorizations(self, mock_start_dns_challenge):
|
||||
mock_order = Mock()
|
||||
mock_order.body.identifiers = []
|
||||
mock_domain = Mock()
|
||||
mock_order.body.identifiers.append(mock_domain)
|
||||
mock_order_info = Mock()
|
||||
mock_order_info.account_number = 1
|
||||
mock_order_info.domains = ["test.fakedomain.net"]
|
||||
result = self.acme.get_authorizations(
|
||||
"acme_client", mock_order, mock_order_info
|
||||
)
|
||||
self.assertEqual(result, ["test"])
|
||||
|
||||
@patch(
|
||||
"lemur.plugins.lemur_acme.plugin.AcmeDnsHandler.complete_dns_challenge",
|
||||
return_value="test",
|
||||
)
|
||||
def test_finalize_authorizations(self, mock_complete_dns_challenge):
|
||||
mock_authz = []
|
||||
mock_authz_record = MagicMock()
|
||||
mock_authz_record.authz = Mock()
|
||||
mock_authz_record.change_id = 1
|
||||
mock_authz_record.dns_challenge.validation_domain_name = Mock()
|
||||
mock_authz_record.dns_challenge.validation = Mock()
|
||||
mock_authz.append(mock_authz_record)
|
||||
mock_dns_provider = Mock()
|
||||
mock_dns_provider.delete_txt_record = Mock()
|
||||
|
||||
mock_acme_client = Mock()
|
||||
result = self.acme.finalize_authorizations(mock_acme_client, mock_authz)
|
||||
self.assertEqual(result, mock_authz)
|
||||
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.setup_acme_client")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.current_app")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.authorization_service")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.dns_provider_service")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeDnsHandler.get_authorizations")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeDnsHandler.finalize_authorizations")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeDnsHandler.request_certificate")
|
||||
def test_get_ordered_certificate(
|
||||
self,
|
||||
mock_request_certificate,
|
||||
mock_finalize_authorizations,
|
||||
mock_get_authorizations,
|
||||
mock_dns_provider_service,
|
||||
mock_authorization_service,
|
||||
mock_current_app,
|
||||
mock_acme,
|
||||
):
|
||||
mock_client = Mock()
|
||||
mock_acme.return_value = (mock_client, "")
|
||||
mock_request_certificate.return_value = ("pem_certificate", "chain")
|
||||
|
||||
mock_cert = Mock()
|
||||
mock_cert.external_id = 1
|
||||
|
||||
provider = plugin.ACMEIssuerPlugin()
|
||||
provider.get_dns_provider = Mock()
|
||||
result = provider.get_ordered_certificate(mock_cert)
|
||||
self.assertEqual(
|
||||
result, {"body": "pem_certificate", "chain": "chain", "external_id": "1"}
|
||||
)
|
||||
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.setup_acme_client")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.current_app")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.authorization_service")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.dns_provider_service")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeDnsHandler.get_authorizations")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeDnsHandler.finalize_authorizations")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeDnsHandler.request_certificate")
|
||||
def test_get_ordered_certificates(
|
||||
self,
|
||||
mock_request_certificate,
|
||||
mock_finalize_authorizations,
|
||||
mock_get_authorizations,
|
||||
mock_dns_provider_service,
|
||||
mock_authorization_service,
|
||||
mock_current_app,
|
||||
mock_acme,
|
||||
):
|
||||
mock_client = Mock()
|
||||
mock_acme.return_value = (mock_client, "")
|
||||
mock_request_certificate.return_value = ("pem_certificate", "chain")
|
||||
|
||||
mock_cert = Mock()
|
||||
mock_cert.external_id = 1
|
||||
|
||||
mock_cert2 = Mock()
|
||||
mock_cert2.external_id = 2
|
||||
|
||||
provider = plugin.ACMEIssuerPlugin()
|
||||
provider.get_dns_provider = Mock()
|
||||
result = provider.get_ordered_certificates([mock_cert, mock_cert2])
|
||||
self.assertEqual(len(result), 2)
|
||||
self.assertEqual(
|
||||
result[0]["cert"],
|
||||
{"body": "pem_certificate", "chain": "chain", "external_id": "1"},
|
||||
)
|
||||
self.assertEqual(
|
||||
result[1]["cert"],
|
||||
{"body": "pem_certificate", "chain": "chain", "external_id": "2"},
|
||||
)
|
||||
|
@ -3,21 +3,11 @@ from unittest.mock import patch, Mock
|
||||
|
||||
from cryptography.x509 import DNSName
|
||||
from lemur.plugins.lemur_acme import plugin
|
||||
from mock import MagicMock
|
||||
|
||||
|
||||
class TestAcmeHandler(unittest.TestCase):
|
||||
@patch("lemur.plugins.lemur_acme.plugin.dns_provider_service")
|
||||
def setUp(self, mock_dns_provider_service):
|
||||
def setUp(self):
|
||||
self.acme = plugin.AcmeHandler()
|
||||
mock_dns_provider = Mock()
|
||||
mock_dns_provider.name = "cloudflare"
|
||||
mock_dns_provider.credentials = "{}"
|
||||
mock_dns_provider.provider_type = "cloudflare"
|
||||
self.acme.dns_providers_for_domain = {
|
||||
"www.test.com": [mock_dns_provider],
|
||||
"test.fakedomain.net": [mock_dns_provider],
|
||||
}
|
||||
|
||||
def test_strip_wildcard(self):
|
||||
expected = ("example.com", False)
|
||||
@ -85,110 +75,3 @@ class TestAcmeHandler(unittest.TestCase):
|
||||
self.assertEqual(
|
||||
result, [options["common_name"], "test2.netflix.net"]
|
||||
)
|
||||
|
||||
@patch(
|
||||
"lemur.plugins.lemur_acme.plugin.AcmeHandler.start_dns_challenge",
|
||||
return_value="test",
|
||||
)
|
||||
def test_get_authorizations(self, mock_start_dns_challenge):
|
||||
mock_order = Mock()
|
||||
mock_order.body.identifiers = []
|
||||
mock_domain = Mock()
|
||||
mock_order.body.identifiers.append(mock_domain)
|
||||
mock_order_info = Mock()
|
||||
mock_order_info.account_number = 1
|
||||
mock_order_info.domains = ["test.fakedomain.net"]
|
||||
result = self.acme.get_authorizations(
|
||||
"acme_client", mock_order, mock_order_info
|
||||
)
|
||||
self.assertEqual(result, ["test"])
|
||||
|
||||
@patch(
|
||||
"lemur.plugins.lemur_acme.plugin.AcmeHandler.complete_dns_challenge",
|
||||
return_value="test",
|
||||
)
|
||||
def test_finalize_authorizations(self, mock_complete_dns_challenge):
|
||||
mock_authz = []
|
||||
mock_authz_record = MagicMock()
|
||||
mock_authz_record.authz = Mock()
|
||||
mock_authz_record.change_id = 1
|
||||
mock_authz_record.dns_challenge.validation_domain_name = Mock()
|
||||
mock_authz_record.dns_challenge.validation = Mock()
|
||||
mock_authz.append(mock_authz_record)
|
||||
mock_dns_provider = Mock()
|
||||
mock_dns_provider.delete_txt_record = Mock()
|
||||
|
||||
mock_acme_client = Mock()
|
||||
result = self.acme.finalize_authorizations(mock_acme_client, mock_authz)
|
||||
self.assertEqual(result, mock_authz)
|
||||
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.setup_acme_client")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.current_app")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.authorization_service")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.dns_provider_service")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.get_authorizations")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.finalize_authorizations")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.request_certificate")
|
||||
def test_get_ordered_certificate(
|
||||
self,
|
||||
mock_request_certificate,
|
||||
mock_finalize_authorizations,
|
||||
mock_get_authorizations,
|
||||
mock_dns_provider_service,
|
||||
mock_authorization_service,
|
||||
mock_current_app,
|
||||
mock_acme,
|
||||
):
|
||||
mock_client = Mock()
|
||||
mock_acme.return_value = (mock_client, "")
|
||||
mock_request_certificate.return_value = ("pem_certificate", "chain")
|
||||
|
||||
mock_cert = Mock()
|
||||
mock_cert.external_id = 1
|
||||
|
||||
provider = plugin.ACMEIssuerPlugin()
|
||||
provider.get_dns_provider = Mock()
|
||||
result = provider.get_ordered_certificate(mock_cert)
|
||||
self.assertEqual(
|
||||
result, {"body": "pem_certificate", "chain": "chain", "external_id": "1"}
|
||||
)
|
||||
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.setup_acme_client")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.current_app")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.authorization_service")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.dns_provider_service")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.get_authorizations")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.finalize_authorizations")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.request_certificate")
|
||||
def test_get_ordered_certificates(
|
||||
self,
|
||||
mock_request_certificate,
|
||||
mock_finalize_authorizations,
|
||||
mock_get_authorizations,
|
||||
mock_dns_provider_service,
|
||||
mock_authorization_service,
|
||||
mock_current_app,
|
||||
mock_acme,
|
||||
):
|
||||
mock_client = Mock()
|
||||
mock_acme.return_value = (mock_client, "")
|
||||
mock_request_certificate.return_value = ("pem_certificate", "chain")
|
||||
|
||||
mock_cert = Mock()
|
||||
mock_cert.external_id = 1
|
||||
|
||||
mock_cert2 = Mock()
|
||||
mock_cert2.external_id = 2
|
||||
|
||||
provider = plugin.ACMEIssuerPlugin()
|
||||
provider.get_dns_provider = Mock()
|
||||
result = provider.get_ordered_certificates([mock_cert, mock_cert2])
|
||||
self.assertEqual(len(result), 2)
|
||||
self.assertEqual(
|
||||
result[0]["cert"],
|
||||
{"body": "pem_certificate", "chain": "chain", "external_id": "1"},
|
||||
)
|
||||
self.assertEqual(
|
||||
result[1]["cert"],
|
||||
{"body": "pem_certificate", "chain": "chain", "external_id": "2"},
|
||||
)
|
||||
|
@ -6,19 +6,11 @@ from lemur.plugins.lemur_acme import plugin
|
||||
|
||||
|
||||
class TestAcmeHttp(unittest.TestCase):
|
||||
@patch("lemur.plugins.lemur_acme.plugin.dns_provider_service")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.destination_service")
|
||||
def setUp(self, mock_dns_provider_service, mock_destination_provider):
|
||||
def setUp(self, mock_destination_provider):
|
||||
self.ACMEHttpIssuerPlugin = plugin.ACMEHttpIssuerPlugin()
|
||||
self.acme = plugin.AcmeHandler()
|
||||
mock_dns_provider = Mock()
|
||||
mock_dns_provider.name = "cloudflare"
|
||||
mock_dns_provider.credentials = "{}"
|
||||
mock_dns_provider.provider_type = "cloudflare"
|
||||
self.acme.dns_providers_for_domain = {
|
||||
"www.test.com": [mock_dns_provider],
|
||||
"test.fakedomain.net": [mock_dns_provider],
|
||||
}
|
||||
|
||||
mock_destination_provider = Mock()
|
||||
mock_destination_provider.label = "mock-sftp-destination"
|
||||
mock_destination_provider.plugin_name = "sftp-destination"
|
||||
@ -38,20 +30,14 @@ class TestAcmeHttp(unittest.TestCase):
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.setup_acme_client")
|
||||
@patch("lemur.plugins.base.manager.PluginManager.get")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.destination_service")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.dns_provider_service")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.current_app")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.get_authorizations")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.finalize_authorizations")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.request_certificate")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.authorization_service")
|
||||
def test_create_certificate(
|
||||
self,
|
||||
mock_authorization_service,
|
||||
mock_request_certificate,
|
||||
mock_finalize_authorizations,
|
||||
mock_get_authorizations,
|
||||
mock_current_app,
|
||||
mock_dns_provider_service,
|
||||
mock_destination_service,
|
||||
mock_plugin_manager_get,
|
||||
mock_acme,
|
||||
@ -91,20 +77,14 @@ class TestAcmeHttp(unittest.TestCase):
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.setup_acme_client")
|
||||
@patch("lemur.plugins.base.manager.PluginManager.get")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.destination_service")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.dns_provider_service")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.current_app")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.get_authorizations")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.finalize_authorizations")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.request_certificate")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.authorization_service")
|
||||
def test_create_certificate_missing_destination_token(
|
||||
self,
|
||||
mock_authorization_service,
|
||||
mock_request_certificate,
|
||||
mock_finalize_authorizations,
|
||||
mock_get_authorizations,
|
||||
mock_current_app,
|
||||
mock_dns_provider_service,
|
||||
mock_destination_service,
|
||||
mock_plugin_manager_get,
|
||||
mock_acme,
|
||||
@ -145,20 +125,14 @@ class TestAcmeHttp(unittest.TestCase):
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.setup_acme_client")
|
||||
@patch("lemur.plugins.base.manager.PluginManager.get")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.destination_service")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.dns_provider_service")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.current_app")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.get_authorizations")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.finalize_authorizations")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.AcmeHandler.request_certificate")
|
||||
@patch("lemur.plugins.lemur_acme.plugin.authorization_service")
|
||||
def test_create_certificate_missing_http_challenge(
|
||||
self,
|
||||
mock_authorization_service,
|
||||
mock_request_certificate,
|
||||
mock_finalize_authorizations,
|
||||
mock_get_authorizations,
|
||||
mock_current_app,
|
||||
mock_dns_provider_service,
|
||||
mock_destination_service,
|
||||
mock_plugin_manager_get,
|
||||
mock_acme,
|
||||
|
Loading…
Reference in New Issue
Block a user