This commit is contained in:
Jasmine Schladen 2020-10-20 11:48:54 -07:00
commit d6075ebc11
20 changed files with 313 additions and 170 deletions

View File

@ -155,17 +155,12 @@ Specifying the `SQLALCHEMY_MAX_OVERFLOW` to 0 will enforce limit to not create c
LEMUR_ENCRYPTION_KEYS = ['1YeftooSbxCiX2zo8m1lXtpvQjy27smZcUUaGmffhMY=', 'LAfQt6yrkLqOK5lwpvQcT4jf2zdeTQJV1uYeh9coT5s=']
.. data:: PUBLIC_CA_AUTHORITY_NAMES
:noindex:
A list of public issuers which would be checked against to determine whether limit of max validity of 397 days
should be applied to the certificate. Configure public CA authority names in this list to enforce validity check.
This is an optional setting. Using this will allow the sanity check as mentioned. The name check is a case-insensitive
string comparision.
.. data:: PUBLIC_CA_MAX_VALIDITY_DAYS
:noindex:
Use this config to override the limit of 397 days of validity for certificates issued by public issuers configured
using PUBLIC_CA_AUTHORITY_NAMES. Below example overrides the default validity of 397 days and sets it to 365 days.
Use this config to override the limit of 397 days of validity for certificates issued by CA/Browser compliant authorities.
The authorities with cab_compliant option set to true will use this config. The example below overrides the default validity
of 397 days and sets it to 365 days.
::
@ -175,8 +170,8 @@ Specifying the `SQLALCHEMY_MAX_OVERFLOW` to 0 will enforce limit to not create c
.. data:: DEFAULT_VALIDITY_DAYS
:noindex:
Use this config to override the default validity of 365 days for certificates offered through Lemur UI. Any CA which
is not listed in PUBLIC_CA_AUTHORITY_NAMES will be using this value as default validity to be displayed on UI. Please
note that this config is used for cert issuance only through Lemur UI. Below example overrides the default validity
is not CA/Browser Forum compliant will be using this value as default validity to be displayed on UI. Please
note that this config is used for cert issuance only through Lemur UI. The example below overrides the default validity
of 365 days and sets it to 1095 days (3 years).
::

View File

@ -8,6 +8,7 @@
"""
import json
from flask import current_app
from sqlalchemy.orm import relationship
from sqlalchemy import (
Column,
@ -98,5 +99,17 @@ class Authority(db.Model):
return None
@property
def max_issuance_days(self):
if self.is_cab_compliant:
return current_app.config.get("PUBLIC_CA_MAX_VALIDITY_DAYS", 397)
@property
def default_validity_days(self):
if self.is_cab_compliant:
return current_app.config.get("PUBLIC_CA_MAX_VALIDITY_DAYS", 397)
return current_app.config.get("DEFAULT_VALIDITY_DAYS", 365) # 1 year default
def __repr__(self):
return "Authority(name={name})".format(name=self.name)

View File

@ -111,8 +111,6 @@ class RootAuthorityCertificateOutputSchema(LemurOutputSchema):
cn = fields.String()
not_after = fields.DateTime()
not_before = fields.DateTime()
max_issuance_days = fields.Integer()
default_validity_days = fields.Integer()
owner = fields.Email()
status = fields.Boolean()
user = fields.Nested(UserNestedOutputSchema)
@ -127,6 +125,8 @@ class AuthorityOutputSchema(LemurOutputSchema):
active = fields.Boolean()
options = fields.Dict()
roles = fields.List(fields.Nested(AssociatedRoleSchema))
max_issuance_days = fields.Integer()
default_validity_days = fields.Integer()
authority_certificate = fields.Nested(RootAuthorityCertificateOutputSchema)
@ -138,8 +138,10 @@ class AuthorityNestedOutputSchema(LemurOutputSchema):
owner = fields.Email()
plugin = fields.Nested(PluginOutputSchema)
active = fields.Boolean()
authority_certificate = fields.Nested(RootAuthorityCertificateOutputSchema, only=["max_issuance_days", "default_validity_days"])
authority_certificate = fields.Nested(RootAuthorityCertificateOutputSchema, only=["not_after", "not_before"])
is_cab_compliant = fields.Boolean()
max_issuance_days = fields.Integer()
default_validity_days = fields.Integer()
authority_update_schema = AuthorityUpdateSchema()

View File

@ -317,20 +317,6 @@ class Certificate(db.Model):
def validity_range(self):
return self.not_after - self.not_before
@property
def max_issuance_days(self):
public_CA = current_app.config.get("PUBLIC_CA_AUTHORITY_NAMES", [])
if self.name.lower() in [ca.lower() for ca in public_CA]:
return current_app.config.get("PUBLIC_CA_MAX_VALIDITY_DAYS", 397)
@property
def default_validity_days(self):
public_CA = current_app.config.get("PUBLIC_CA_AUTHORITY_NAMES", [])
if self.name.lower() in [ca.lower() for ca in public_CA]:
return current_app.config.get("PUBLIC_CA_MAX_VALIDITY_DAYS", 397)
return current_app.config.get("DEFAULT_VALIDITY_DAYS", 365) # 1 year default
@property
def subject(self):
return self.parsed_cert.subject

View File

@ -8,6 +8,7 @@
.. moduleauthor:: Kevin Glisson <kglisson@netflix.com>
"""
import sys
from collections import defaultdict
from datetime import timedelta
from itertools import groupby
@ -36,8 +37,6 @@ def get_certificates(exclude=None):
now = arrow.utcnow()
max = now + timedelta(days=90)
print("ALPACA: Checking for certs not after {0} with notify enabled and not expired".format(max))
q = (
database.db.session.query(Certificate)
.filter(Certificate.not_after <= max)
@ -45,8 +44,6 @@ def get_certificates(exclude=None):
.filter(Certificate.expired == False)
) # noqa
print("ALPACA: Excluding {0}".format(exclude))
exclude_conditions = []
if exclude:
for e in exclude:
@ -60,8 +57,6 @@ def get_certificates(exclude=None):
if needs_notification(c):
certs.append(c)
print("ALPACA: Found {0} eligible certs".format(len(certs)))
return certs
@ -75,27 +70,21 @@ def get_eligible_certificates(exclude=None):
certificates = defaultdict(dict)
certs = get_certificates(exclude=exclude)
print("ALPACA: Found {0} certificates to check for notifications".format(len(certs)))
# group by owner
for owner, items in groupby(certs, lambda x: x.owner):
notification_groups = []
for certificate in items:
notifications = needs_notification(certificate)
print("ALPACA: Considering sending {0} notifications for cert {1}".format(len(notifications), certificate))
if notifications:
for notification in notifications:
print("ALPACA: Will send notification {0} for certificate {1}".format(notification, certificate))
notification_groups.append((notification, certificate))
# group by notification
for notification, items in groupby(notification_groups, lambda x: x[0].label):
certificates[owner][notification] = list(items)
print("ALPACA: Certificates that need notifications: {0}".format(certificates))
return certificates
@ -109,15 +98,20 @@ def send_plugin_notification(event_type, data, recipients, notification):
:param notification:
:return:
"""
function = f"{__name__}.{sys._getframe().f_code.co_name}"
log_data = {
"function": function,
"message": f"Sending expiration notification for to recipients {recipients}",
"notification_type": "expiration",
"certificate_targets": recipients,
}
status = FAILURE_METRIC_STATUS
try:
print("ALPACA: Trying to send notification {0} (plugin: {1})".format(notification, notification.plugin))
notification.plugin.send(event_type, data, recipients, notification.options)
status = SUCCESS_METRIC_STATUS
except Exception as e:
current_app.logger.error(
"Unable to send notification {}.".format(notification), exc_info=True
)
log_data["message"] = f"Unable to send expiration notification to recipients {recipients}"
current_app.logger.error(log_data, exc_info=True)
sentry.captureException()
metrics.send(
@ -157,8 +151,6 @@ def send_expiration_notifications(exclude):
notification_data.append(cert_data)
security_data.append(cert_data)
print("ALPACA: Sending owner notification to {0} for certificate {1}. Data: {2}".format(owner, certificates, notification_data))
if send_default_notification(
"expiration", notification_data, [owner], notification.options
):
@ -166,9 +158,8 @@ def send_expiration_notifications(exclude):
else:
failure += 1
recipients = notification.plugin.filter_recipients(security_email + [owner], notification.options)
recipients = notification.plugin.filter_recipients(notification.options, security_email + [owner])
print("ALPACA: Sending plugin notification {0} for certificate {1} to recipients {2}".format(notification, certificates, recipients))
if send_plugin_notification(
"expiration",
notification_data,
@ -179,7 +170,6 @@ def send_expiration_notifications(exclude):
else:
failure += 1
print("ALPACA: Sending security notification to {0}".format(security_email))
if send_default_notification(
"expiration", security_data, security_email, notification.options
):
@ -201,6 +191,12 @@ def send_default_notification(notification_type, data, targets, notification_opt
:param notification_options:
:return:
"""
function = f"{__name__}.{sys._getframe().f_code.co_name}"
log_data = {
"function": function,
"message": f"Sending notification for certificate data {data}",
"notification_type": notification_type,
}
status = FAILURE_METRIC_STATUS
notification_plugin = plugins.get(
current_app.config.get("LEMUR_DEFAULT_NOTIFICATION_PLUGIN", "email-notification")
@ -211,9 +207,9 @@ def send_default_notification(notification_type, data, targets, notification_opt
notification_plugin.send(notification_type, data, targets, notification_options)
status = SUCCESS_METRIC_STATUS
except Exception as e:
current_app.logger.error(
"Unable to send notification to {}.".format(targets), exc_info=True
)
log_data["message"] = f"Unable to send {notification_type} notification for certificate data {data} " \
f"to target {targets}"
current_app.logger.error(log_data, exc_info=True)
sentry.captureException()
metrics.send(
@ -272,11 +268,7 @@ def needs_notification(certificate):
notifications = []
print("ALPACA: Considering if cert {0} needs notifications".format(certificate))
print("ALPACA: Notifications for {0}: {1}".format(certificate, certificate.notifications))
for notification in certificate.notifications:
print("ALPACA: Considering if cert {0} needs notification {1}".format(certificate, notification))
if not notification.active or not notification.options:
continue
@ -294,11 +286,10 @@ def needs_notification(certificate):
else:
raise Exception(
"Invalid base unit for expiration interval: {0}".format(unit)
f"Invalid base unit for expiration interval: {unit}"
)
print("ALPACA: Considering if cert {0} is applicable for notification {1}: {2} days remaining, configured as "
"{3} days".format(certificate, notification, days, interval))
print(f"Does cert {certificate.name} need a notification {notification.label}? Actual: {days}, "
f"configured: {interval}") # TODO REMOVE
if days == interval:
notifications.append(notification)
return notifications

View File

@ -434,7 +434,7 @@ class SNSNotificationPlugin(ExpirationNotificationPlugin):
"helpMessage": "Region in which the SNS topic is located, e.g. \"us-east-1\"",
},
{
"name": "Topic Name",
"name": "topicName",
"type": "str",
"required": True,
# base topic name is 1-256 characters (alphanumeric plus underscore and hyphen)
@ -449,13 +449,12 @@ class SNSNotificationPlugin(ExpirationNotificationPlugin):
plugin configuration, and can't reasonably be changed dynamically.
"""
topic_arn = "arn:aws:sns:{0}:{1}:{2}".format(self.get_option("region", options),
self.get_option("accountNumber", options),
self.get_option("Topic Name", options))
topic_arn = f"arn:aws:sns:{self.get_option('region', options)}:" \
f"{self.get_option('accountNumber', options)}:" \
f"{self.get_option('topicName', options)}"
current_app.logger.debug("Publishing {0} notification to topic {1}".format(notification_type, topic_arn))
print("ALPACA: Trying to send {0} SNS notification to topic {1}".format(notification_type, topic_arn))
current_app.logger.debug(f"Publishing {notification_type} notification to topic {topic_arn}")
try:
sns.publish(topic_arn, message, notification_type, region_name=self.get_option("region", options))
except Exception:
current_app.logger.exception("Error publishing {0} notification to topic {1}".format(notification_type, topic_arn))
current_app.logger.exception(f"Error publishing {notification_type} notification to topic {topic_arn}")

View File

@ -14,7 +14,6 @@ from flask import current_app
def publish(topic_arn, certificates, notification_type, **kwargs):
sns_client = boto3.client("sns", **kwargs)
print("ALPACA: SNS client: {0}, certificates: {1}".format(sns_client, certificates))
message_ids = {}
for certificate in certificates:
message_ids[certificate["name"]] = publish_single(sns_client, topic_arn, certificate, notification_type)
@ -30,11 +29,9 @@ def publish_single(sns_client, topic_arn, certificate, notification_type):
response_code = response["ResponseMetadata"]["HTTPStatusCode"]
if response_code != 200:
raise Exception("Failed to publish notification to SNS, response code was {}".format(response_code))
raise Exception(f"Failed to publish notification to SNS, response code was {response_code}")
current_app.logger.debug(
"AWS SNS message published to topic [{0}]: [{1}]".format(topic_arn, response)
)
current_app.logger.debug(f"AWS SNS message published to topic [{topic_arn}]: [{response}]")
return response["MessageId"]

View File

@ -1,15 +1,19 @@
from moto import mock_sts, mock_sns, mock_sqs
import boto3
import json
from datetime import timedelta
import arrow
import boto3
from moto import mock_sns, mock_sqs, mock_ses
from lemur.certificates.schemas import certificate_notification_output_schema
from lemur.plugins.lemur_aws.sns import format_message
from lemur.plugins.lemur_aws.sns import publish
from lemur.certificates.schemas import certificate_notification_output_schema
from lemur.tests.factories import NotificationFactory, CertificateFactory
from lemur.tests.test_messaging import verify_sender_email
@mock_sns()
def test_format(certificate, endpoint):
data = [certificate_notification_output_schema.dump(certificate).data]
for certificate in data:
@ -25,10 +29,7 @@ def test_format(certificate, endpoint):
@mock_sns()
@mock_sqs()
def test_publish(certificate, endpoint):
data = [certificate_notification_output_schema.dump(certificate).data]
def create_and_subscribe_to_topic():
sns_client = boto3.client("sns", region_name="us-east-1")
topic_arn = sns_client.create_topic(Name='lemursnstest')["TopicArn"]
@ -38,13 +39,82 @@ def test_publish(certificate, endpoint):
queue_arn = sqs_client.get_queue_attributes(QueueUrl=queue_url)["Attributes"]["QueueArn"]
sns_client.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)
return [topic_arn, sqs_client, queue_url]
@mock_sns()
@mock_sqs()
def test_publish(certificate, endpoint):
data = [certificate_notification_output_schema.dump(certificate).data]
topic_arn, sqs_client, queue_url = create_and_subscribe_to_topic()
message_ids = publish(topic_arn, data, "expiration", region_name="us-east-1")
assert len(message_ids) == len(data)
received_messages = sqs_client.receive_message(QueueUrl=queue_url)["Messages"]
print("ALPACA: Received messages = {}".format(received_messages))
for certificate in data:
expected_message_id = message_ids[certificate["name"]]
actual_message = next((m for m in received_messages if json.loads(m["Body"])["MessageId"] == expected_message_id), None)
actual_message = next(
(m for m in received_messages if json.loads(m["Body"])["MessageId"] == expected_message_id), None)
assert json.loads(actual_message["Body"])["Message"] == format_message(certificate, "expiration")
def get_options():
return [
{"name": "interval", "value": 10},
{"name": "unit", "value": "days"},
{"name": "region", "value": "us-east-1"},
{"name": "accountNumber", "value": "123456789012"},
{"name": "topicName", "value": "lemursnstest"},
]
@mock_sns()
@mock_sqs()
@mock_ses() # because email notifications are also sent
def test_send_expiration_notification():
from lemur.notifications.messaging import send_expiration_notifications
verify_sender_email() # emails are sent to owner and security; SNS only used for configured notification
topic_arn, sqs_client, queue_url = create_and_subscribe_to_topic()
notification = NotificationFactory(plugin_name="aws-sns")
notification.options = get_options()
now = arrow.utcnow()
in_ten_days = now + timedelta(days=10, hours=1) # a bit more than 10 days since we'll check in the future
certificate = CertificateFactory()
certificate.not_after = in_ten_days
certificate.notifications.append(notification)
assert send_expiration_notifications([]) == (3, 0) # owner, SNS, and security
received_messages = sqs_client.receive_message(QueueUrl=queue_url)["Messages"]
assert len(received_messages) == 1
expected_message = format_message(certificate_notification_output_schema.dump(certificate).data, "expiration")
actual_message = json.loads(received_messages[0]["Body"])["Message"]
assert actual_message == expected_message
# Currently disabled as the SNS plugin doesn't support this type of notification
# def test_send_rotation_notification(endpoint, source_plugin):
# from lemur.notifications.messaging import send_rotation_notification
# from lemur.deployment.service import rotate_certificate
#
# notification = NotificationFactory(plugin_name="aws-sns")
# notification.options = get_options()
#
# new_certificate = CertificateFactory()
# rotate_certificate(endpoint, new_certificate)
# assert endpoint.certificate == new_certificate
#
# assert send_rotation_notification(new_certificate)
# Currently disabled as the SNS plugin doesn't support this type of notification
# def test_send_pending_failure_notification(user, pending_certificate, async_issuer_plugin):
# from lemur.notifications.messaging import send_pending_failure_notification
#
# assert send_pending_failure_notification(pending_certificate)

View File

@ -20,14 +20,16 @@ from lemur.plugins.lemur_email.templates.config import env
from lemur.plugins.utils import get_plugin_option
def render_html(template_name, message):
def render_html(template_name, options, certificates):
"""
Renders the html for our email notification.
:param template_name:
:param message:
:param options:
:param certificates:
:return:
"""
message = {"options": options, "certificates": certificates}
template = env.get_template("{}.html".format(template_name))
return template.render(
dict(message=message, hostname=current_app.config.get("LEMUR_HOSTNAME"))
@ -101,25 +103,21 @@ class EmailNotificationPlugin(ExpirationNotificationPlugin):
subject = "Lemur: {0} Notification".format(notification_type.capitalize())
data = {"options": options, "certificates": message}
body = render_html(notification_type, data)
body = render_html(notification_type, options, message)
s_type = current_app.config.get("LEMUR_EMAIL_SENDER", "ses").lower()
current_app.logger.info("ALPACA: Would send an email to {0} with subject \"{1}\" here".format(targets, subject))
print(f"Would send {s_type} email to {targets}: {subject}")
# if s_type == "ses":
# if s_type == "ses":
# send_via_ses(subject, body, targets)
#
# elif s_type == "smtp":
# send_via_smtp(subject, body, targets)
@staticmethod
def filter_recipients(options, excluded_recipients):
print("ALPACA: Getting recipients for notification {0}".format(options))
def filter_recipients(options, excluded_recipients, **kwargs):
notification_recipients = get_plugin_option("recipients", options)
print(
"ALPACA: Sending certificate notifications to recipients {0}".format(notification_recipients.split(",")))
if notification_recipients:
notification_recipients = notification_recipients.split(",")
# removing owner and security_email from notification_recipient

View File

@ -83,12 +83,12 @@
<td width="32px"></td>
<td width="16px"></td>
<td style="line-height:1.2">
<span style="font-family:Roboto-Regular,Helvetica,Arial,sans-serif;font-size:20px;color:#202020">{{ certificate.name }}</span>
<span style="font-family:Roboto-Regular,Helvetica,Arial,sans-serif;font-size:20px;color:#202020">{{ message.certificates.name }}</span>
<br>
<span style="font-family:Roboto-Regular,Helvetica,Arial,sans-serif;font-size:13px;color:#727272">
<br>{{ certificate.owner }}
<br>{{ certificate.validityEnd | time }}
<a href="https://{{ hostname }}/#/certificates/{{ certificate.name }}" target="_blank">Details</a>
<br>{{ message.certificates.owner }}
<br>{{ message.certificates.validityEnd | time }}
<a href="https://{{ hostname }}/#/certificates/{{ message.certificates.name }}" target="_blank">Details</a>
</span>
</td>
</tr>
@ -110,12 +110,12 @@
<td width="32px"></td>
<td width="16px"></td>
<td style="line-height:1.2">
<span style="font-family:Roboto-Regular,Helvetica,Arial,sans-serif;font-size:20px;color:#202020">{{ certificate.replacedBy[0].name }}</span>
<span style="font-family:Roboto-Regular,Helvetica,Arial,sans-serif;font-size:20px;color:#202020">{{ message.certificates.name }}</span>
<br>
<span style="font-family:Roboto-Regular,Helvetica,Arial,sans-serif;font-size:13px;color:#727272">
<br>{{ certificate.replacedBy[0].owner }}
<br>{{ certificate.replacedBy[0].validityEnd | time }}
<a href="https://{{ hostname }}/#/certificates/{{ certificate.replacedBy[0].name }}" target="_blank">Details</a>
<br>{{ message.certificates.owner }}
<br>{{ message.certificates.validityEnd | time }}
<a href="https://{{ hostname }}/#/certificates/{{ message.certificates.name }}" target="_blank">Details</a>
</span>
</td>
</tr>
@ -133,7 +133,7 @@
<table border="0" cellspacing="0" cellpadding="0"
style="margin-top:48px;margin-bottom:48px">
<tbody>
{% for endpoint in certificate.endpoints %}
{% for endpoint in message.certificates.endpoints %}
<tr valign="middle">
<td width="32px"></td>
<td width="16px"></td>

View File

@ -1,39 +1,81 @@
import os
from lemur.plugins.lemur_email.templates.config import env
from datetime import timedelta
import arrow
from moto import mock_ses
from lemur.certificates.schemas import certificate_notification_output_schema
from lemur.plugins.lemur_email.plugin import render_html
from lemur.tests.factories import CertificateFactory
from lemur.tests.test_messaging import verify_sender_email
dir_path = os.path.dirname(os.path.realpath(__file__))
def test_render(certificate, endpoint):
from lemur.certificates.schemas import certificate_notification_output_schema
def get_options():
return [
{"name": "interval", "value": 10},
{"name": "unit", "value": "days"},
{"name": "recipients", "value": "person1@example.com,person2@example.com"},
]
def test_render_expiration(certificate, endpoint):
new_cert = CertificateFactory()
new_cert.replaces.append(certificate)
data = {
"certificates": [certificate_notification_output_schema.dump(certificate).data],
"options": [
{"name": "interval", "value": 10},
{"name": "unit", "value": "days"},
],
}
assert render_html("expiration", get_options(), [certificate_notification_output_schema.dump(certificate).data])
template = env.get_template("{}.html".format("expiration"))
body = template.render(dict(message=data, hostname="lemur.test.example.com"))
template = env.get_template("{}.html".format("rotation"))
def test_render_rotation(certificate, endpoint):
certificate.endpoints.append(endpoint)
body = template.render(
dict(
certificate=certificate_notification_output_schema.dump(certificate).data,
hostname="lemur.test.example.com",
)
)
assert render_html("rotation", get_options(), certificate_notification_output_schema.dump(certificate).data)
def test_render_rotation_failure(pending_certificate):
assert render_html("failed", get_options(), certificate_notification_output_schema.dump(pending_certificate).data)
@mock_ses
def test_send_expiration_notification():
from lemur.notifications.messaging import send_expiration_notifications
from lemur.tests.factories import CertificateFactory
from lemur.tests.factories import NotificationFactory
now = arrow.utcnow()
in_ten_days = now + timedelta(days=10, hours=1) # a bit more than 10 days since we'll check in the future
certificate = CertificateFactory()
notification = NotificationFactory(plugin_name="email-notification")
certificate.not_after = in_ten_days
certificate.notifications.append(notification)
certificate.notifications[0].options = get_options()
verify_sender_email()
assert send_expiration_notifications([]) == (3, 0) # owner, recipients (only counted as 1), and security
@mock_ses
def test_send_rotation_notification(endpoint, source_plugin):
from lemur.notifications.messaging import send_rotation_notification
from lemur.deployment.service import rotate_certificate
new_certificate = CertificateFactory()
rotate_certificate(endpoint, new_certificate)
assert endpoint.certificate == new_certificate
verify_sender_email()
assert send_rotation_notification(new_certificate)
@mock_ses
def test_send_pending_failure_notification(user, pending_certificate, async_issuer_plugin):
from lemur.notifications.messaging import send_pending_failure_notification
verify_sender_email()
assert send_pending_failure_notification(pending_certificate)
def test_filter_recipients(certificate, endpoint):

View File

@ -58,7 +58,6 @@ def create_rotation_attachments(certificate):
"title": certificate["name"],
"title_link": create_certificate_url(certificate["name"]),
"fields": [
{
{"title": "Owner", "value": certificate["owner"], "short": True},
{
"title": "Expires",
@ -67,17 +66,11 @@ def create_rotation_attachments(certificate):
),
"short": True,
},
{
"title": "Replaced By",
"value": len(certificate["replaced"][0]["name"]),
"short": True,
},
{
"title": "Endpoints Rotated",
"value": len(certificate["endpoints"]),
"short": True,
},
}
],
}

View File

@ -1,3 +1,12 @@
from datetime import timedelta
import arrow
from moto import mock_ses
from lemur.tests.factories import NotificationFactory, CertificateFactory
from lemur.tests.test_messaging import verify_sender_email
def test_formatting(certificate):
from lemur.plugins.lemur_slack.plugin import create_expiration_attachments
from lemur.certificates.schemas import certificate_notification_output_schema
@ -21,3 +30,52 @@ def test_formatting(certificate):
}
assert attachment == create_expiration_attachments(data)[0]
def get_options():
return [
{"name": "interval", "value": 10},
{"name": "unit", "value": "days"},
{"name": "webhook", "value": "https://slack.com/api/api.test"},
]
@mock_ses() # because email notifications are also sent
def test_send_expiration_notification():
from lemur.notifications.messaging import send_expiration_notifications
verify_sender_email() # emails are sent to owner and security; Slack only used for configured notification
notification = NotificationFactory(plugin_name="slack-notification")
notification.options = get_options()
now = arrow.utcnow()
in_ten_days = now + timedelta(days=10, hours=1) # a bit more than 10 days since we'll check in the future
certificate = CertificateFactory()
certificate.not_after = in_ten_days
certificate.notifications.append(notification)
assert send_expiration_notifications([]) == (3, 0) # owner, Slack, and security
# Currently disabled as the Slack plugin doesn't support this type of notification
# def test_send_rotation_notification(endpoint, source_plugin):
# from lemur.notifications.messaging import send_rotation_notification
# from lemur.deployment.service import rotate_certificate
#
# notification = NotificationFactory(plugin_name="slack-notification")
# notification.options = get_options()
#
# new_certificate = CertificateFactory()
# rotate_certificate(endpoint, new_certificate)
# assert endpoint.certificate == new_certificate
#
# assert send_rotation_notification(new_certificate, notification_plugin=notification.plugin)
# Currently disabled as the Slack plugin doesn't support this type of notification
# def test_send_pending_failure_notification(user, pending_certificate, async_issuer_plugin):
# from lemur.notifications.messaging import send_pending_failure_notification
#
# assert send_pending_failure_notification(pending_certificate, notification_plugin=plugins.get("slack-notification"))

View File

@ -190,7 +190,7 @@ angular.module('lemur')
function populateValidityDateAsPerDefault(certificate) {
// calculate start and end date as per default validity
let startDate = new Date(), endDate = new Date();
endDate.setDate(startDate.getDate() + certificate.authority.authorityCertificate.defaultValidityDays);
endDate.setDate(startDate.getDate() + certificate.authority.defaultValidityDays);
certificate.validityStart = startDate;
certificate.validityEnd = endDate;
}
@ -359,7 +359,7 @@ angular.module('lemur')
function populateValidityDateAsPerDefault(certificate) {
// calculate start and end date as per default validity
let startDate = new Date(), endDate = new Date();
endDate.setDate(startDate.getDate() + certificate.authority.authorityCertificate.defaultValidityDays);
endDate.setDate(startDate.getDate() + certificate.authority.defaultValidityDays);
certificate.validityStart = startDate;
certificate.validityEnd = endDate;
}

View File

@ -139,7 +139,7 @@
<div class="col-sm-4">
<div class="btn-group btn-group-toggle" data-toggle="buttons">
<label class="btn btn-info" ng-model="certificate.validityType" uib-btn-radio="'defaultDays'" ng-click="clearDates()">
Default ({{certificate.authority.authorityCertificate.defaultValidityDays}} days)</label>
Default ({{certificate.authority.defaultValidityDays}} days)</label>
<label class="btn btn-info" ng-model="certificate.validityType" uib-btn-radio="'customDates'" ng-change="clearDates()">Custom</label>
</div>
</div>

View File

@ -172,12 +172,12 @@ angular.module('lemur')
// Minimum end date will be same as selected start date
this.authority.authorityCertificate.minValidityEnd = value;
if(!this.authority.authorityCertificate || !this.authority.authorityCertificate.maxIssuanceDays) {
if(!this.authority.maxIssuanceDays) {
this.authority.authorityCertificate.maxValidityEnd = this.authority.authorityCertificate.notAfter;
} else {
// Move max end date by maxIssuanceDays
let endDate = new Date(value);
endDate.setDate(endDate.getDate() + this.authority.authorityCertificate.maxIssuanceDays);
endDate.setDate(endDate.getDate() + this.authority.maxIssuanceDays);
this.authority.authorityCertificate.maxValidityEnd = endDate;
}
}

View File

@ -152,12 +152,12 @@ angular.module('lemur')
// Minimum end date will be same as selected start date
this.authority.authorityCertificate.minValidityEnd = value;
if(!this.authority.authorityCertificate || !this.authority.authorityCertificate.maxIssuanceDays) {
if(!this.authority.maxIssuanceDays) {
this.authority.authorityCertificate.maxValidityEnd = this.authority.authorityCertificate.notAfter;
} else {
// Move max end date by maxIssuanceDays
let endDate = new Date(value);
endDate.setDate(endDate.getDate() + this.authority.authorityCertificate.maxIssuanceDays);
endDate.setDate(endDate.getDate() + this.authority.maxIssuanceDays);
this.authority.authorityCertificate.maxValidityEnd = endDate;
}
}

View File

@ -46,7 +46,7 @@ LEMUR_ALLOWED_DOMAINS = [
# Lemur currently only supports SES for sending email, this address
# needs to be verified
LEMUR_EMAIL = ""
LEMUR_EMAIL = "lemur@example.com"
LEMUR_SECURITY_TEAM_EMAIL = ["security@example.com"]
LEMUR_HOSTNAME = "lemur.example.com"

View File

@ -14,5 +14,4 @@ class TestNotificationPlugin(NotificationPlugin):
@staticmethod
def send(notification_type, message, targets, options, **kwargs):
print("TODO REMOVE: sending email to {}".format(targets))
return

View File

@ -1,11 +1,18 @@
from datetime import timedelta
import arrow
import boto3
import pytest
from freezegun import freeze_time
from datetime import timedelta
import arrow
from moto import mock_ses
@mock_ses
def verify_sender_email():
ses_client = boto3.client("ses", region_name="us-east-1")
ses_client.verify_email_identity(EmailAddress="lemur@example.com")
def test_needs_notification(app, certificate, notification):
from lemur.notifications.messaging import needs_notification
@ -78,6 +85,7 @@ def test_get_eligible_certificates(app, certificate, notification):
@mock_ses
def test_send_expiration_notification(certificate, notification, notification_plugin):
from lemur.notifications.messaging import send_expiration_notifications
verify_sender_email()
certificate.notifications.append(notification)
certificate.notifications[0].options = [
@ -105,23 +113,15 @@ def test_send_expiration_notification_with_no_notifications(
@mock_ses
def test_send_rotation_notification(notification_plugin, certificate):
from lemur.tests.factories import UserFactory
from lemur.tests.factories import CertificateFactory
from lemur.notifications.messaging import send_rotation_notification
verify_sender_email()
user = UserFactory(email="jschladen@netflix.com")
new_cert = CertificateFactory(user=user)
assert send_rotation_notification(new_cert)
assert send_rotation_notification(certificate)
@mock_ses
def test_send_pending_failure_notification(certificate, endpoint):
from lemur.tests.factories import UserFactory
from lemur.tests.factories import PendingCertificateFactory
def test_send_pending_failure_notification(notification_plugin, async_issuer_plugin, pending_certificate):
from lemur.notifications.messaging import send_pending_failure_notification
verify_sender_email()
user = UserFactory(email="jschladen@netflix.com")
pending_cert = PendingCertificateFactory(user=user)
assert send_pending_failure_notification(pending_cert)
assert send_pending_failure_notification(pending_certificate)