Adding ability to exclude certificates from expiration (#730)
* adding ability to exclude certificates from expiration * fixing tests
This commit is contained in:
parent
b0ea027769
commit
f0dde845db
|
@ -12,16 +12,21 @@ from lemur.notifications.messaging import send_expiration_notifications
|
|||
manager = Manager(usage="Handles notification related tasks.")
|
||||
|
||||
|
||||
@manager.command
|
||||
def expirations():
|
||||
@manager.option('-e', '--exclude', dest='exclude', nargs="*", help='Common name matching of certificates that should be excluded from notification')
|
||||
def expirations(exclude):
|
||||
"""
|
||||
Runs Lemur's notification engine, that looks for expired certificates and sends
|
||||
notifications out to those that have subscribed to them.
|
||||
|
||||
Every certificate receives notifications by default. When expiration notifications are handled outside of Lemur
|
||||
we exclude their names (or matching) from expiration notifications.
|
||||
|
||||
It performs simple subset matching and is case insensitive.
|
||||
|
||||
:return:
|
||||
"""
|
||||
print("Starting to notify subscribers about expiring certificates!")
|
||||
success, failed = send_expiration_notifications()
|
||||
success, failed = send_expiration_notifications(exclude)
|
||||
print(
|
||||
"Finished notifying subscribers about expiring certificates! Sent: {success} Failed: {failed}".format(
|
||||
success=success,
|
||||
|
|
|
@ -15,6 +15,8 @@ import arrow
|
|||
from datetime import timedelta
|
||||
from flask import current_app
|
||||
|
||||
from sqlalchemy import and_
|
||||
|
||||
from lemur import database, metrics
|
||||
from lemur.common.utils import windowed_query
|
||||
|
||||
|
@ -25,9 +27,10 @@ from lemur.plugins import plugins
|
|||
from lemur.plugins.utils import get_plugin_option
|
||||
|
||||
|
||||
def get_certificates():
|
||||
def get_certificates(exclude=None):
|
||||
"""
|
||||
Finds all certificates that are eligible for notifications.
|
||||
:param exclude:
|
||||
:return:
|
||||
"""
|
||||
now = arrow.utcnow()
|
||||
|
@ -38,6 +41,13 @@ def get_certificates():
|
|||
.filter(Certificate.notify == True) \
|
||||
.filter(Certificate.expired == False) # noqa
|
||||
|
||||
exclude_conditions = []
|
||||
if exclude:
|
||||
for e in exclude:
|
||||
exclude_conditions.append(~Certificate.name.ilike('%{}%'.format(e)))
|
||||
|
||||
q = q.filter(and_(*exclude_conditions))
|
||||
|
||||
certs = []
|
||||
|
||||
for c in windowed_query(q, Certificate.id, 100):
|
||||
|
@ -47,13 +57,14 @@ def get_certificates():
|
|||
return certs
|
||||
|
||||
|
||||
def get_eligible_certificates():
|
||||
def get_eligible_certificates(exclude=None):
|
||||
"""
|
||||
Finds all certificates that are eligible for certificate expiration.
|
||||
:param exclude:
|
||||
:return:
|
||||
"""
|
||||
certificates = defaultdict(dict)
|
||||
certs = get_certificates()
|
||||
certs = get_certificates(exclude=exclude)
|
||||
|
||||
# group by owner
|
||||
for owner, items in groupby(certs, lambda x: x.owner):
|
||||
|
@ -91,7 +102,7 @@ def send_notification(event_type, data, targets, notification):
|
|||
current_app.logger.exception(e)
|
||||
|
||||
|
||||
def send_expiration_notifications():
|
||||
def send_expiration_notifications(exclude):
|
||||
"""
|
||||
This function will check for upcoming certificate expiration,
|
||||
and send out notification emails at given intervals.
|
||||
|
@ -102,7 +113,7 @@ def send_expiration_notifications():
|
|||
security_email = current_app.config.get('LEMUR_SECURITY_TEAM_EMAIL')
|
||||
|
||||
security_data = []
|
||||
for owner, notification_group in get_eligible_certificates().items():
|
||||
for owner, notification_group in get_eligible_certificates(exclude=exclude).items():
|
||||
|
||||
for notification_label, certificates in notification_group.items():
|
||||
notification_data = []
|
||||
|
|
|
@ -40,7 +40,7 @@ def send_via_smtp(subject, body, targets):
|
|||
:param targets:
|
||||
:return:
|
||||
"""
|
||||
msg = Message(subject, recipients=targets)
|
||||
msg = Message(subject, recipients=targets, sender=current_app.config.get("LEMUR_EMAIL"))
|
||||
msg.body = "" # kinda a weird api for sending html emails
|
||||
msg.html = body
|
||||
smtp_mail.send(msg)
|
||||
|
|
|
@ -71,7 +71,7 @@ def test_send_expiration_notification(certificate, notification, notification_pl
|
|||
|
||||
delta = certificate.not_after - timedelta(days=10)
|
||||
with freeze_time(delta.datetime):
|
||||
assert send_expiration_notifications() == (2, 0)
|
||||
assert send_expiration_notifications([]) == (2, 0)
|
||||
|
||||
|
||||
@mock_ses
|
||||
|
|
Loading…
Reference in New Issue