Merge branch 'master' into expanding-S3-plugin
This commit is contained in:
@ -20,6 +20,15 @@ class NotificationPlugin(Plugin):
|
||||
def send(self, notification_type, message, targets, options, **kwargs):
|
||||
raise NotImplementedError
|
||||
|
||||
def filter_recipients(self, options, excluded_recipients):
|
||||
"""
|
||||
Given a set of options (which should include configured recipient info), filters out recipients that
|
||||
we do NOT want to notify.
|
||||
|
||||
For any notification types where recipients can't be dynamically modified, this returns an empty list.
|
||||
"""
|
||||
return []
|
||||
|
||||
|
||||
class ExpirationNotificationPlugin(NotificationPlugin):
|
||||
"""
|
||||
@ -50,5 +59,5 @@ class ExpirationNotificationPlugin(NotificationPlugin):
|
||||
def options(self):
|
||||
return self.default_options + self.additional_options
|
||||
|
||||
def send(self, notification_type, message, targets, options, **kwargs):
|
||||
def send(self, notification_type, message, excluded_targets, options, **kwargs):
|
||||
raise NotImplementedError
|
||||
|
@ -32,13 +32,14 @@
|
||||
.. moduleauthor:: Mikhail Khodorovskiy <mikhail.khodorovskiy@jivesoftware.com>
|
||||
.. moduleauthor:: Harm Weites <harm@weites.com>
|
||||
"""
|
||||
|
||||
from acme.errors import ClientError
|
||||
from flask import current_app
|
||||
from lemur.extensions import sentry, metrics
|
||||
|
||||
from lemur.plugins import lemur_aws as aws
|
||||
from lemur.extensions import sentry, metrics
|
||||
from lemur.plugins import lemur_aws as aws, ExpirationNotificationPlugin
|
||||
from lemur.plugins.bases import DestinationPlugin, ExportDestinationPlugin, SourcePlugin
|
||||
from lemur.plugins.lemur_aws import iam, s3, elb, ec2
|
||||
from lemur.plugins.lemur_aws import iam, s3, elb, ec2, sns
|
||||
|
||||
|
||||
def get_region_from_dns(dns):
|
||||
@ -407,6 +408,7 @@ class S3DestinationPlugin(ExportDestinationPlugin):
|
||||
account_number=self.get_option("accountNumber", options),
|
||||
)
|
||||
|
||||
|
||||
def upload_acme_token(self, token_path, token, options, **kwargs):
|
||||
"""
|
||||
This is called from the acme http challenge
|
||||
@ -433,3 +435,52 @@ class S3DestinationPlugin(ExportDestinationPlugin):
|
||||
data=token,
|
||||
encrypt=False,
|
||||
account_number=account_number)
|
||||
|
||||
|
||||
class SNSNotificationPlugin(ExpirationNotificationPlugin):
|
||||
title = "AWS SNS"
|
||||
slug = "aws-sns"
|
||||
description = "Sends notifications to AWS SNS"
|
||||
version = aws.VERSION
|
||||
|
||||
author = "Jasmine Schladen <jschladen@netflix.com>"
|
||||
author_url = "https://github.com/Netflix/lemur"
|
||||
|
||||
additional_options = [
|
||||
{
|
||||
"name": "accountNumber",
|
||||
"type": "str",
|
||||
"required": True,
|
||||
"validation": "[0-9]{12}",
|
||||
"helpMessage": "A valid AWS account number with permission to access the SNS topic",
|
||||
},
|
||||
{
|
||||
"name": "region",
|
||||
"type": "str",
|
||||
"required": True,
|
||||
"validation": "[0-9a-z\\-]{1,25}",
|
||||
"helpMessage": "Region in which the SNS topic is located, e.g. \"us-east-1\"",
|
||||
},
|
||||
{
|
||||
"name": "topicName",
|
||||
"type": "str",
|
||||
"required": True,
|
||||
# base topic name is 1-256 characters (alphanumeric plus underscore and hyphen)
|
||||
"validation": "^[a-zA-Z0-9_\\-]{1,256}$",
|
||||
"helpMessage": "The name of the topic to use for expiration notifications",
|
||||
}
|
||||
]
|
||||
|
||||
def send(self, notification_type, message, excluded_targets, options, **kwargs):
|
||||
"""
|
||||
While we receive a `targets` parameter here, it is unused, as the SNS topic is pre-configured in the
|
||||
plugin configuration, and can't reasonably be changed dynamically.
|
||||
"""
|
||||
|
||||
topic_arn = f"arn:aws:sns:{self.get_option('region', options)}:" \
|
||||
f"{self.get_option('accountNumber', options)}:" \
|
||||
f"{self.get_option('topicName', options)}"
|
||||
|
||||
current_app.logger.info(f"Publishing {notification_type} notification to topic {topic_arn}")
|
||||
sns.publish(topic_arn, message, notification_type, region_name=self.get_option("region", options))
|
||||
|
||||
|
55
lemur/plugins/lemur_aws/sns.py
Normal file
55
lemur/plugins/lemur_aws/sns.py
Normal file
@ -0,0 +1,55 @@
|
||||
"""
|
||||
.. module: lemur.plugins.lemur_aws.sts
|
||||
:platform: Unix
|
||||
:copyright: (c) 2020 by Netflix Inc., see AUTHORS for more
|
||||
:license: Apache, see LICENSE for more details.
|
||||
.. moduleauthor:: Jasmine Schladen <jschladen@netflix.com>
|
||||
"""
|
||||
import json
|
||||
|
||||
import arrow
|
||||
import boto3
|
||||
from flask import current_app
|
||||
|
||||
|
||||
def publish(topic_arn, certificates, notification_type, **kwargs):
|
||||
sns_client = boto3.client("sns", **kwargs)
|
||||
message_ids = {}
|
||||
for certificate in certificates:
|
||||
message_ids[certificate["name"]] = publish_single(sns_client, topic_arn, certificate, notification_type)
|
||||
|
||||
return message_ids
|
||||
|
||||
|
||||
def publish_single(sns_client, topic_arn, certificate, notification_type):
|
||||
response = sns_client.publish(
|
||||
TopicArn=topic_arn,
|
||||
Message=format_message(certificate, notification_type),
|
||||
)
|
||||
|
||||
response_code = response["ResponseMetadata"]["HTTPStatusCode"]
|
||||
if response_code != 200:
|
||||
raise Exception(f"Failed to publish {notification_type} notification to SNS topic {topic_arn}. "
|
||||
f"SNS response: {response_code} {response}")
|
||||
|
||||
current_app.logger.info(f"AWS SNS message published to topic [{topic_arn}] with message ID {response['MessageId']}")
|
||||
current_app.logger.debug(f"AWS SNS message published to topic [{topic_arn}]: [{response}]")
|
||||
|
||||
return response["MessageId"]
|
||||
|
||||
|
||||
def create_certificate_url(name):
|
||||
return "https://{hostname}/#/certificates/{name}".format(
|
||||
hostname=current_app.config.get("LEMUR_HOSTNAME"), name=name
|
||||
)
|
||||
|
||||
|
||||
def format_message(certificate, notification_type):
|
||||
json_message = {
|
||||
"notification_type": notification_type,
|
||||
"certificate_name": certificate["name"],
|
||||
"expires": arrow.get(certificate["validityEnd"]).format("YYYY-MM-ddTHH:mm:ss"), # 2047-12-T22:00:00
|
||||
"endpoints_detected": len(certificate["endpoints"]),
|
||||
"details": create_certificate_url(certificate["name"])
|
||||
}
|
||||
return json.dumps(json_message)
|
120
lemur/plugins/lemur_aws/tests/test_sns.py
Normal file
120
lemur/plugins/lemur_aws/tests/test_sns.py
Normal file
@ -0,0 +1,120 @@
|
||||
import json
|
||||
from datetime import timedelta
|
||||
|
||||
import arrow
|
||||
import boto3
|
||||
from moto import mock_sns, mock_sqs, mock_ses
|
||||
|
||||
from lemur.certificates.schemas import certificate_notification_output_schema
|
||||
from lemur.plugins.lemur_aws.sns import format_message
|
||||
from lemur.plugins.lemur_aws.sns import publish
|
||||
from lemur.tests.factories import NotificationFactory, CertificateFactory
|
||||
from lemur.tests.test_messaging import verify_sender_email
|
||||
|
||||
|
||||
@mock_sns()
|
||||
def test_format(certificate, endpoint):
|
||||
data = [certificate_notification_output_schema.dump(certificate).data]
|
||||
|
||||
for certificate in data:
|
||||
expected_message = {
|
||||
"notification_type": "expiration",
|
||||
"certificate_name": certificate["name"],
|
||||
"expires": arrow.get(certificate["validityEnd"]).format("YYYY-MM-ddTHH:mm:ss"),
|
||||
"endpoints_detected": 0,
|
||||
"details": "https://lemur.example.com/#/certificates/{name}".format(name=certificate["name"])
|
||||
}
|
||||
assert expected_message == json.loads(format_message(certificate, "expiration"))
|
||||
|
||||
|
||||
@mock_sns()
|
||||
@mock_sqs()
|
||||
def create_and_subscribe_to_topic():
|
||||
sns_client = boto3.client("sns", region_name="us-east-1")
|
||||
topic_arn = sns_client.create_topic(Name='lemursnstest')["TopicArn"]
|
||||
|
||||
sqs_client = boto3.client("sqs", region_name="us-east-1")
|
||||
queue = sqs_client.create_queue(QueueName="lemursnstestqueue")
|
||||
queue_url = queue["QueueUrl"]
|
||||
queue_arn = sqs_client.get_queue_attributes(QueueUrl=queue_url)["Attributes"]["QueueArn"]
|
||||
sns_client.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)
|
||||
|
||||
return [topic_arn, sqs_client, queue_url]
|
||||
|
||||
|
||||
@mock_sns()
|
||||
@mock_sqs()
|
||||
def test_publish(certificate, endpoint):
|
||||
data = [certificate_notification_output_schema.dump(certificate).data]
|
||||
|
||||
topic_arn, sqs_client, queue_url = create_and_subscribe_to_topic()
|
||||
|
||||
message_ids = publish(topic_arn, data, "expiration", region_name="us-east-1")
|
||||
assert len(message_ids) == len(data)
|
||||
received_messages = sqs_client.receive_message(QueueUrl=queue_url)["Messages"]
|
||||
|
||||
for certificate in data:
|
||||
expected_message_id = message_ids[certificate["name"]]
|
||||
actual_message = next(
|
||||
(m for m in received_messages if json.loads(m["Body"])["MessageId"] == expected_message_id), None)
|
||||
assert json.loads(actual_message["Body"])["Message"] == format_message(certificate, "expiration")
|
||||
|
||||
|
||||
def get_options():
|
||||
return [
|
||||
{"name": "interval", "value": 10},
|
||||
{"name": "unit", "value": "days"},
|
||||
{"name": "region", "value": "us-east-1"},
|
||||
{"name": "accountNumber", "value": "123456789012"},
|
||||
{"name": "topicName", "value": "lemursnstest"},
|
||||
]
|
||||
|
||||
|
||||
@mock_sns()
|
||||
@mock_sqs()
|
||||
@mock_ses() # because email notifications are also sent
|
||||
def test_send_expiration_notification():
|
||||
from lemur.notifications.messaging import send_expiration_notifications
|
||||
|
||||
verify_sender_email() # emails are sent to owner and security; SNS only used for configured notification
|
||||
topic_arn, sqs_client, queue_url = create_and_subscribe_to_topic()
|
||||
|
||||
notification = NotificationFactory(plugin_name="aws-sns")
|
||||
notification.options = get_options()
|
||||
|
||||
now = arrow.utcnow()
|
||||
in_ten_days = now + timedelta(days=10, hours=1) # a bit more than 10 days since we'll check in the future
|
||||
|
||||
certificate = CertificateFactory()
|
||||
certificate.not_after = in_ten_days
|
||||
certificate.notifications.append(notification)
|
||||
|
||||
assert send_expiration_notifications([]) == (3, 0) # owner, SNS, and security
|
||||
|
||||
received_messages = sqs_client.receive_message(QueueUrl=queue_url)["Messages"]
|
||||
assert len(received_messages) == 1
|
||||
expected_message = format_message(certificate_notification_output_schema.dump(certificate).data, "expiration")
|
||||
actual_message = json.loads(received_messages[0]["Body"])["Message"]
|
||||
assert actual_message == expected_message
|
||||
|
||||
|
||||
# Currently disabled as the SNS plugin doesn't support this type of notification
|
||||
# def test_send_rotation_notification(endpoint, source_plugin):
|
||||
# from lemur.notifications.messaging import send_rotation_notification
|
||||
# from lemur.deployment.service import rotate_certificate
|
||||
#
|
||||
# notification = NotificationFactory(plugin_name="aws-sns")
|
||||
# notification.options = get_options()
|
||||
#
|
||||
# new_certificate = CertificateFactory()
|
||||
# rotate_certificate(endpoint, new_certificate)
|
||||
# assert endpoint.certificate == new_certificate
|
||||
#
|
||||
# assert send_rotation_notification(new_certificate)
|
||||
|
||||
|
||||
# Currently disabled as the SNS plugin doesn't support this type of notification
|
||||
# def test_send_pending_failure_notification(user, pending_certificate, async_issuer_plugin):
|
||||
# from lemur.notifications.messaging import send_pending_failure_notification
|
||||
#
|
||||
# assert send_pending_failure_notification(pending_certificate)
|
@ -234,7 +234,7 @@ def handle_cis_response(response):
|
||||
return response.json()
|
||||
|
||||
|
||||
@retry(stop_max_attempt_number=10, wait_fixed=10000)
|
||||
@retry(stop_max_attempt_number=10, wait_fixed=1000)
|
||||
def get_certificate_id(session, base_url, order_id):
|
||||
"""Retrieve certificate order id from Digicert API."""
|
||||
order_url = "{0}/services/v2/order/certificate/{1}".format(base_url, order_id)
|
||||
@ -245,7 +245,7 @@ def get_certificate_id(session, base_url, order_id):
|
||||
return response_data["certificate"]["id"]
|
||||
|
||||
|
||||
@retry(stop_max_attempt_number=10, wait_fixed=10000)
|
||||
@retry(stop_max_attempt_number=10, wait_fixed=1000)
|
||||
def get_cis_certificate(session, base_url, order_id):
|
||||
"""Retrieve certificate order id from Digicert API, including the chain"""
|
||||
certificate_url = "{0}/platform/cis/certificate/{1}/download".format(base_url, order_id)
|
||||
|
@ -17,6 +17,7 @@ from lemur.plugins.bases import ExpirationNotificationPlugin
|
||||
from lemur.plugins import lemur_email as email
|
||||
|
||||
from lemur.plugins.lemur_email.templates.config import env
|
||||
from lemur.plugins.utils import get_plugin_option
|
||||
|
||||
|
||||
def render_html(template_name, options, certificates):
|
||||
@ -111,3 +112,13 @@ class EmailNotificationPlugin(ExpirationNotificationPlugin):
|
||||
|
||||
elif s_type == "smtp":
|
||||
send_via_smtp(subject, body, targets)
|
||||
|
||||
@staticmethod
|
||||
def filter_recipients(options, excluded_recipients, **kwargs):
|
||||
notification_recipients = get_plugin_option("recipients", options)
|
||||
if notification_recipients:
|
||||
notification_recipients = notification_recipients.split(",")
|
||||
# removing owner and security_email from notification_recipient
|
||||
notification_recipients = [i for i in notification_recipients if i not in excluded_recipients]
|
||||
|
||||
return notification_recipients
|
||||
|
@ -2,26 +2,21 @@ import os
|
||||
from datetime import timedelta
|
||||
|
||||
import arrow
|
||||
import boto3
|
||||
from moto import mock_ses
|
||||
|
||||
from lemur.certificates.schemas import certificate_notification_output_schema
|
||||
from lemur.plugins.lemur_email.plugin import render_html
|
||||
from lemur.tests.factories import CertificateFactory
|
||||
from lemur.tests.test_messaging import verify_sender_email
|
||||
|
||||
dir_path = os.path.dirname(os.path.realpath(__file__))
|
||||
|
||||
|
||||
@mock_ses
|
||||
def verify_sender_email():
|
||||
ses_client = boto3.client("ses", region_name="us-east-1")
|
||||
ses_client.verify_email_identity(EmailAddress="lemur@example.com")
|
||||
|
||||
|
||||
def get_options():
|
||||
return [
|
||||
{"name": "interval", "value": 10},
|
||||
{"name": "unit", "value": "days"},
|
||||
{"name": "recipients", "value": "person1@example.com,person2@example.com"},
|
||||
]
|
||||
|
||||
|
||||
@ -59,7 +54,7 @@ def test_send_expiration_notification():
|
||||
certificate.notifications[0].options = get_options()
|
||||
|
||||
verify_sender_email()
|
||||
assert send_expiration_notifications([]) == (2, 0)
|
||||
assert send_expiration_notifications([]) == (3, 0) # owner, recipients (only counted as 1), and security
|
||||
|
||||
|
||||
@mock_ses
|
||||
@ -81,3 +76,15 @@ def test_send_pending_failure_notification(user, pending_certificate, async_issu
|
||||
|
||||
verify_sender_email()
|
||||
assert send_pending_failure_notification(pending_certificate)
|
||||
|
||||
|
||||
def test_filter_recipients(certificate, endpoint):
|
||||
from lemur.plugins.lemur_email.plugin import EmailNotificationPlugin
|
||||
|
||||
options = [{"name": "recipients", "value": "security@example.com,bob@example.com,joe@example.com"}]
|
||||
assert EmailNotificationPlugin.filter_recipients(options, []) == ["security@example.com", "bob@example.com",
|
||||
"joe@example.com"]
|
||||
assert EmailNotificationPlugin.filter_recipients(options, ["security@example.com"]) == ["bob@example.com",
|
||||
"joe@example.com"]
|
||||
assert EmailNotificationPlugin.filter_recipients(options, ["security@example.com", "bob@example.com",
|
||||
"joe@example.com"]) == []
|
||||
|
@ -1,9 +1,9 @@
|
||||
|
||||
import arrow
|
||||
import requests
|
||||
import json
|
||||
import sys
|
||||
from flask import current_app
|
||||
from retrying import retry
|
||||
|
||||
from lemur.plugins import lemur_entrust as entrust
|
||||
from lemur.plugins.bases import IssuerPlugin, SourcePlugin
|
||||
@ -78,7 +78,6 @@ def process_options(options):
|
||||
"eku": "SERVER_AND_CLIENT_AUTH",
|
||||
"certType": product_type,
|
||||
"certExpiryDate": validity_end,
|
||||
# "keyType": "RSA", Entrust complaining about this parameter
|
||||
"tracking": tracking_data
|
||||
}
|
||||
return data
|
||||
@ -87,7 +86,7 @@ def process_options(options):
|
||||
def handle_response(my_response):
|
||||
"""
|
||||
Helper function for parsing responses from the Entrust API.
|
||||
:param content:
|
||||
:param my_response:
|
||||
:return: :raise Exception:
|
||||
"""
|
||||
msg = {
|
||||
@ -100,27 +99,47 @@ def handle_response(my_response):
|
||||
}
|
||||
|
||||
try:
|
||||
d = json.loads(my_response.content)
|
||||
data = json.loads(my_response.content)
|
||||
except ValueError:
|
||||
# catch an empty jason object here
|
||||
d = {'response': 'No detailed message'}
|
||||
s = my_response.status_code
|
||||
if s > 399:
|
||||
raise Exception(f"ENTRUST error: {msg.get(s, s)}\n{d['errors']}")
|
||||
data = {'response': 'No detailed message'}
|
||||
status_code = my_response.status_code
|
||||
if status_code > 399:
|
||||
raise Exception(f"ENTRUST error: {msg.get(status_code, status_code)}\n{data['errors']}")
|
||||
|
||||
log_data = {
|
||||
"function": f"{__name__}.{sys._getframe().f_code.co_name}",
|
||||
"message": "Response",
|
||||
"status": s,
|
||||
"response": d
|
||||
"status": status_code,
|
||||
"response": data
|
||||
}
|
||||
current_app.logger.info(log_data)
|
||||
if d == {'response': 'No detailed message'}:
|
||||
if data == {'response': 'No detailed message'}:
|
||||
# status if no data
|
||||
return s
|
||||
return status_code
|
||||
else:
|
||||
# return data from the response
|
||||
return d
|
||||
return data
|
||||
|
||||
|
||||
@retry(stop_max_attempt_number=3, wait_fixed=5000)
|
||||
def order_and_download_certificate(session, url, data):
|
||||
"""
|
||||
Helper function to place a certificacte order and download it
|
||||
:param session:
|
||||
:param url: Entrust endpoint url
|
||||
:param data: CSR, and the required order details, such as validity length
|
||||
:return: the cert chain
|
||||
:raise Exception:
|
||||
"""
|
||||
try:
|
||||
response = session.post(url, json=data, timeout=(15, 40))
|
||||
except requests.exceptions.Timeout:
|
||||
raise Exception("Timeout for POST")
|
||||
except requests.exceptions.RequestException as e:
|
||||
raise Exception(f"Error for POST {e}")
|
||||
|
||||
return handle_response(response)
|
||||
|
||||
|
||||
class EntrustIssuerPlugin(IssuerPlugin):
|
||||
@ -178,14 +197,8 @@ class EntrustIssuerPlugin(IssuerPlugin):
|
||||
data = process_options(issuer_options)
|
||||
data["csr"] = csr
|
||||
|
||||
try:
|
||||
response = self.session.post(url, json=data, timeout=(15, 40))
|
||||
except requests.exceptions.Timeout:
|
||||
raise Exception("Timeout for POST")
|
||||
except requests.exceptions.RequestException as e:
|
||||
raise Exception(f"Error for POST {e}")
|
||||
response_dict = order_and_download_certificate(self.session, url, data)
|
||||
|
||||
response_dict = handle_response(response)
|
||||
external_id = response_dict['trackingId']
|
||||
cert = response_dict['endEntityCert']
|
||||
if len(response_dict['chainCerts']) < 2:
|
||||
@ -200,6 +213,7 @@ class EntrustIssuerPlugin(IssuerPlugin):
|
||||
|
||||
return cert, chain, external_id
|
||||
|
||||
@retry(stop_max_attempt_number=3, wait_fixed=1000)
|
||||
def revoke_certificate(self, certificate, comments):
|
||||
"""Revoke an Entrust certificate."""
|
||||
base_url = current_app.config.get("ENTRUST_URL")
|
||||
@ -216,6 +230,7 @@ class EntrustIssuerPlugin(IssuerPlugin):
|
||||
metrics.send("entrust_revoke_certificate", "counter", 1)
|
||||
return handle_response(response)
|
||||
|
||||
@retry(stop_max_attempt_number=3, wait_fixed=1000)
|
||||
def deactivate_certificate(self, certificate):
|
||||
"""Deactivates an Entrust certificate."""
|
||||
base_url = current_app.config.get("ENTRUST_URL")
|
||||
@ -244,7 +259,7 @@ class EntrustIssuerPlugin(IssuerPlugin):
|
||||
def get_ordered_certificate(self, order_id):
|
||||
raise NotImplementedError("Not implemented\n", self, order_id)
|
||||
|
||||
def canceled_ordered_certificate(self, pending_cert, **kwargs):
|
||||
def cancel_ordered_certificate(self, pending_cert, **kwargs):
|
||||
raise NotImplementedError("Not implemented\n", self, pending_cert, **kwargs)
|
||||
|
||||
|
||||
|
@ -112,6 +112,9 @@ class SlackNotificationPlugin(ExpirationNotificationPlugin):
|
||||
"""
|
||||
A typical check can be performed using the notify command:
|
||||
`lemur notify`
|
||||
|
||||
While we receive a `targets` parameter here, it is unused, as Slack webhooks do not allow
|
||||
dynamic re-targeting of messages. The webhook itself specifies a channel.
|
||||
"""
|
||||
attachments = None
|
||||
if notification_type == "expiration":
|
||||
@ -124,7 +127,7 @@ class SlackNotificationPlugin(ExpirationNotificationPlugin):
|
||||
raise Exception("Unable to create message attachments")
|
||||
|
||||
body = {
|
||||
"text": "Lemur {0} Notification".format(notification_type.capitalize()),
|
||||
"text": f"Lemur {notification_type.capitalize()} Notification",
|
||||
"attachments": attachments,
|
||||
"channel": self.get_option("recipients", options),
|
||||
"username": self.get_option("username", options),
|
||||
@ -133,8 +136,8 @@ class SlackNotificationPlugin(ExpirationNotificationPlugin):
|
||||
r = requests.post(self.get_option("webhook", options), json.dumps(body))
|
||||
|
||||
if r.status_code not in [200]:
|
||||
raise Exception("Failed to send message")
|
||||
raise Exception(f"Failed to send message. Slack response: {r.status_code} {body}")
|
||||
|
||||
current_app.logger.error(
|
||||
"Slack response: {0} Message Body: {1}".format(r.status_code, body)
|
||||
current_app.logger.info(
|
||||
f"Slack response: {r.status_code} Message Body: {body}"
|
||||
)
|
||||
|
@ -1,8 +1,10 @@
|
||||
from datetime import timedelta
|
||||
|
||||
import arrow
|
||||
from moto import mock_ses
|
||||
|
||||
from lemur.tests.factories import NotificationFactory, CertificateFactory
|
||||
from lemur.tests.test_messaging import verify_sender_email
|
||||
|
||||
|
||||
def test_formatting(certificate):
|
||||
@ -38,9 +40,12 @@ def get_options():
|
||||
]
|
||||
|
||||
|
||||
@mock_ses() # because email notifications are also sent
|
||||
def test_send_expiration_notification():
|
||||
from lemur.notifications.messaging import send_expiration_notifications
|
||||
|
||||
verify_sender_email() # emails are sent to owner and security; Slack only used for configured notification
|
||||
|
||||
notification = NotificationFactory(plugin_name="slack-notification")
|
||||
notification.options = get_options()
|
||||
|
||||
@ -51,7 +56,7 @@ def test_send_expiration_notification():
|
||||
certificate.not_after = in_ten_days
|
||||
certificate.notifications.append(notification)
|
||||
|
||||
assert send_expiration_notifications([]) == (2, 0)
|
||||
assert send_expiration_notifications([]) == (3, 0) # owner, Slack, and security
|
||||
|
||||
|
||||
# Currently disabled as the Slack plugin doesn't support this type of notification
|
||||
|
Reference in New Issue
Block a user