Merge branch 'entrust-plugin' of github.com:sirferl/lemur into azure-plugin

This commit is contained in:
sirferl
2020-11-24 12:17:14 +01:00
11 changed files with 125 additions and 81 deletions

View File

@ -20,14 +20,14 @@ class NotificationPlugin(Plugin):
def send(self, notification_type, message, targets, options, **kwargs):
raise NotImplementedError
def filter_recipients(self, options, excluded_recipients):
def get_recipients(self, options, additional_recipients):
"""
Given a set of options (which should include configured recipient info), filters out recipients that
we do NOT want to notify.
Given a set of options (which should include configured recipient info), returns the parsed list of recipients
from those options plus the additional recipients specified. The returned value has no duplicates.
For any notification types where recipients can't be dynamically modified, this returns an empty list.
For any notification types where recipients can't be dynamically modified, this returns only the additional recipients.
"""
return []
return additional_recipients
class ExpirationNotificationPlugin(NotificationPlugin):

View File

@ -224,7 +224,7 @@ class AcmeHandler(object):
def revoke_certificate(self, certificate):
if not self.reuse_account(certificate.authority):
raise InvalidConfiguration("There is no ACME account saved, unable to revoke the certificate.")
acme_client, _ = self.acme.setup_acme_client(certificate.authority)
acme_client, _ = self.setup_acme_client(certificate.authority)
fullchain_com = jose.ComparableX509(
OpenSSL.crypto.load_certificate(

View File

@ -105,6 +105,8 @@ class EmailNotificationPlugin(ExpirationNotificationPlugin):
@staticmethod
def send(notification_type, message, targets, options, **kwargs):
if not targets:
return
subject = "Lemur: {0} Notification".format(notification_type.capitalize())
@ -119,11 +121,9 @@ class EmailNotificationPlugin(ExpirationNotificationPlugin):
send_via_smtp(subject, body, targets)
@staticmethod
def filter_recipients(options, excluded_recipients, **kwargs):
def get_recipients(options, additional_recipients, **kwargs):
notification_recipients = get_plugin_option("recipients", options)
if notification_recipients:
notification_recipients = notification_recipients.split(",")
# removing owner and security_email from notification_recipient
notification_recipients = [i for i in notification_recipients if i not in excluded_recipients]
return notification_recipients
return list(set(notification_recipients + additional_recipients))

View File

@ -21,7 +21,6 @@ def get_options():
def test_render_expiration(certificate, endpoint):
new_cert = CertificateFactory()
new_cert.replaces.append(certificate)
@ -54,7 +53,7 @@ def test_send_expiration_notification():
certificate.notifications[0].options = get_options()
verify_sender_email()
assert send_expiration_notifications([]) == (3, 0) # owner, recipients (only counted as 1), and security
assert send_expiration_notifications([]) == (4, 0) # owner (1), recipients (2), and security (1)
@mock_ses
@ -76,15 +75,20 @@ def test_send_pending_failure_notification(user, pending_certificate, async_issu
verify_sender_email()
assert send_pending_failure_notification(pending_certificate)
assert send_pending_failure_notification(pending_certificate, True, True)
assert send_pending_failure_notification(pending_certificate, True, False)
assert send_pending_failure_notification(pending_certificate, False, True)
assert send_pending_failure_notification(pending_certificate, False, False)
def test_filter_recipients(certificate, endpoint):
def test_get_recipients(certificate, endpoint):
from lemur.plugins.lemur_email.plugin import EmailNotificationPlugin
options = [{"name": "recipients", "value": "security@example.com,bob@example.com,joe@example.com"}]
assert EmailNotificationPlugin.filter_recipients(options, []) == ["security@example.com", "bob@example.com",
"joe@example.com"]
assert EmailNotificationPlugin.filter_recipients(options, ["security@example.com"]) == ["bob@example.com",
"joe@example.com"]
assert EmailNotificationPlugin.filter_recipients(options, ["security@example.com", "bob@example.com",
"joe@example.com"]) == []
options = [{"name": "recipients", "value": "security@example.com,joe@example.com"}]
two_emails = sorted(["security@example.com", "joe@example.com"])
assert sorted(EmailNotificationPlugin.get_recipients(options, [])) == two_emails
assert sorted(EmailNotificationPlugin.get_recipients(options, ["security@example.com"])) == two_emails
three_emails = sorted(["security@example.com", "bob@example.com", "joe@example.com"])
assert sorted(EmailNotificationPlugin.get_recipients(options, ["bob@example.com"])) == three_emails
assert sorted(EmailNotificationPlugin.get_recipients(options, ["security@example.com", "bob@example.com",
"joe@example.com"])) == three_emails