lemur/lemur/notifications.py

185 lines
5.1 KiB
Python

"""
.. module: lemur.notifications
:platform: Unix
:copyright: (c) 2015 by Netflix Inc., see AUTHORS for more
:license: Apache, see LICENSE for more details.
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
"""
import ssl
import socket
import arrow
from flask import current_app
from lemur import database
from lemur.common.services.aws import ses
from lemur.certificates.models import Certificate
from lemur.domains.models import Domain
NOTIFICATION_INTERVALS = [30, 15, 5, 2]
def _get_domain_certificate(name):
"""
Fetch the SSL certificate currently hosted at a given domain (if any) and
compare it against our all of our know certificates to determine if a new
SSL certificate has already been deployed
:param name:
:return:
"""
query = database.session_query(Certificate)
try:
pub_key = ssl.get_server_certificate((name, 443))
return query.filter(Certificate.body == pub_key.strip()).first()
except socket.gaierror as e:
current_app.logger.info(str(e))
def _find_superseded(domains):
"""
Here we try to fetch any domain in the certificate to see if we can resolve it
and to try and see if it is currently serving the certificate we are
alerting on
:param domains:
:return:
"""
query = database.session_query(Certificate)
ss_list = []
for domain in domains:
dc = _get_domain_certificate(domain.name)
if dc:
ss_list.append(dc)
current_app.logger.info("Trying to resolve {0}".format(domain.name))
query = query.filter(Certificate.domains.any(Domain.name.in_([x.name for x in domains])))
query = query.filter(Certificate.active == True) # noqa
query = query.filter(Certificate.not_after >= arrow.utcnow().format('YYYY-MM-DD'))
ss_list.extend(query.all())
return ss_list
def send_expiration_notifications():
"""
This function will check for upcoming certificate expiration,
and send out notification emails at given intervals.
"""
notifications = 0
certs = _get_expiring_certs()
alerts = []
for cert in certs:
if _is_eligible_for_notifications(cert):
data = _get_message_data(cert)
recipients = _get_message_recipients(cert)
alerts.append((data, recipients))
roll_ups = _create_roll_ups(alerts)
for messages, recipients in roll_ups:
notifications += 1
ses.send("Certificate Expiration", dict(messages=messages), 'event', recipients)
print notifications
current_app.logger.info("Lemur has sent {0} certification notifications".format(notifications))
def _get_message_recipients(cert):
"""
Determine who the recipients of the certificate expiration should be
:param cert:
:return:
"""
recipients = []
if current_app.config.get('SECURITY_TEAM_EMAIL'):
recipients.extend(current_app.config.get('SECURITY_TEAM_EMAIL'))
recipients.append(cert.owner)
if cert.user:
recipients.append(cert.user.email)
return list(set(recipients))
def _get_message_data(cert):
"""
Parse our the certification information needed for our notification
:param cert:
:return:
"""
cert_dict = cert.as_dict()
cert_dict['domains'] = [x .name for x in cert.domains]
cert_dict['superseded'] = list(set([x.name for x in _find_superseded(cert.domains) if cert.name != x]))
return cert_dict
def _get_expiring_certs(outlook=30):
"""
Find all the certificates expiring within a given outlook
:param outlook: int days to look forward
:return:
"""
now = arrow.utcnow()
query = database.session_query(Certificate)
attr = Certificate.not_after
# get all certs expiring in the next 30 days
to = now.replace(days=+outlook).format('YYYY-MM-DD')
certs = []
for cert in query.filter(attr <= to).filter(attr >= now.format('YYYY-MM-DD')).all():
if _is_eligible_for_notifications(cert):
certs.append(cert)
return certs
def _is_eligible_for_notifications(cert, intervals=None):
"""
Determine if notifications for a given certificate should
currently be sent
:param cert:
:param intervals: list of days to alert on
:return:
"""
now = arrow.utcnow()
if cert.active:
days = (cert.not_after - now.naive).days
if not intervals:
intervals = NOTIFICATION_INTERVALS
if days in intervals:
return cert
def _create_roll_ups(messages):
"""
Take all of the messages that should be sent and provide
a roll up to the same set if the recipients are the same
:param messages:
"""
roll_ups = []
for message_data, recipients in messages:
for m, r in roll_ups:
if r == recipients:
m.append(message_data)
current_app.logger.info(
"Sending email expiration alert about {0} to {1}".format(
message_data['name'], ",".join(recipients)))
break
else:
roll_ups.append(([message_data], recipients))
return roll_ups