Adding tests to AWS plugin
This commit is contained in:
@ -1,5 +1,5 @@
|
||||
"""
|
||||
.. module: lemur.common.services.aws.iam
|
||||
.. module: lemur.plugins.lemur_aws.iam
|
||||
:platform: Unix
|
||||
:synopsis: Contains helper functions for interactive with AWS IAM Apis.
|
||||
:copyright: (c) 2015 by Netflix Inc., see AUTHORS for more
|
||||
@ -19,21 +19,6 @@ def get_name_from_arn(arn):
|
||||
return arn.split("/", 1)[1]
|
||||
|
||||
|
||||
def ssl_split(param_string):
|
||||
"""
|
||||
|
||||
:param param_string:
|
||||
:return:
|
||||
"""
|
||||
output = {}
|
||||
parts = str(param_string).split("/")
|
||||
for part in parts:
|
||||
if "=" in part:
|
||||
key, value = part.split("=", 1)
|
||||
output[key] = value
|
||||
return output
|
||||
|
||||
|
||||
def upload_cert(account_number, cert, private_key, cert_chain=None):
|
||||
"""
|
||||
Upload a certificate to AWS
|
||||
|
@ -58,10 +58,22 @@ class AWSSourcePlugin(SourcePlugin):
|
||||
author = 'Kevin Glisson'
|
||||
author_url = 'https://github.com/netflix/lemur'
|
||||
|
||||
options = {
|
||||
'accountNumber': {'type': 'int'},
|
||||
'pollRate': {'type': 'int', 'default': '60'}
|
||||
}
|
||||
options = [
|
||||
{
|
||||
'name': 'accountNumber',
|
||||
'type': 'int',
|
||||
'required': True,
|
||||
'validation': '/^[0-9]{12,12}$/',
|
||||
'helpMessage': 'Must be a valid AWS account number!',
|
||||
},
|
||||
{
|
||||
'name': 'pollRate',
|
||||
'type': 'int',
|
||||
'required': False,
|
||||
'helpMessage': 'Rate in seconds to poll source for new information.',
|
||||
'default': '60',
|
||||
}
|
||||
]
|
||||
|
||||
def get_certificates(self, **kwargs):
|
||||
certs = []
|
||||
|
34
lemur/plugins/lemur_aws/tests/test_iam.py
Normal file
34
lemur/plugins/lemur_aws/tests/test_iam.py
Normal file
@ -0,0 +1,34 @@
|
||||
import pytest
|
||||
from moto import mock_iam, mock_sts
|
||||
|
||||
from lemur.certificates.models import Certificate
|
||||
|
||||
from lemur.tests.certs import EXTERNAL_VALID_STR, PRIVATE_KEY_STR
|
||||
from lemur.tests.conftest import app # noqa
|
||||
|
||||
|
||||
def test_get_name_from_arn():
|
||||
from lemur.plugins.lemur_aws.iam import get_name_from_arn
|
||||
arn = 'arn:aws:iam::123456789012:server-certificate/tttt2.netflixtest.net-NetflixInc-20150624-20150625'
|
||||
assert get_name_from_arn(arn) == 'tttt2.netflixtest.net-NetflixInc-20150624-20150625'
|
||||
|
||||
|
||||
@mock_sts()
|
||||
@mock_iam()
|
||||
def test_get_all_server_certs(app):
|
||||
from lemur.plugins.lemur_aws.iam import upload_cert, get_all_server_certs
|
||||
cert = Certificate(EXTERNAL_VALID_STR)
|
||||
upload_cert('123456789012', cert, PRIVATE_KEY_STR)
|
||||
certs = get_all_server_certs('123456789012')
|
||||
assert len(certs) == 1
|
||||
|
||||
|
||||
@mock_sts()
|
||||
@mock_iam()
|
||||
def test_get_cert_from_arn(app):
|
||||
from lemur.plugins.lemur_aws.iam import upload_cert, get_cert_from_arn
|
||||
cert = Certificate(EXTERNAL_VALID_STR)
|
||||
upload_cert('123456789012', cert, PRIVATE_KEY_STR)
|
||||
body, chain = get_cert_from_arn('arn:aws:iam::123456789012:server-certificate/tttt2.netflixtest.net-NetflixInc-20150624-20150625')
|
||||
assert body.replace('\n', '') == EXTERNAL_VALID_STR.replace('\n', '')
|
||||
|
@ -16,6 +16,7 @@ from flask import current_app
|
||||
from lemur.plugins.bases import IssuerPlugin
|
||||
from lemur.plugins import lemur_verisign as verisign
|
||||
from lemur.plugins.lemur_verisign import constants
|
||||
from lemur.common.utils import get_psuedo_random_string
|
||||
|
||||
|
||||
# https://support.venafi.com/entries/66445046-Info-VeriSign-Error-Codes
|
||||
@ -58,9 +59,57 @@ VERISIGN_ERRORS = {
|
||||
}
|
||||
|
||||
|
||||
def process_options(options):
|
||||
"""
|
||||
Processes and maps the incoming issuer options to fields/options that
|
||||
verisign understands
|
||||
|
||||
:param options:
|
||||
:return: dict or valid verisign options
|
||||
"""
|
||||
data = {
|
||||
'challenge': get_psuedo_random_string(),
|
||||
'serverType': 'Apache',
|
||||
'certProductType': 'Server',
|
||||
'firstName': current_app.config.get("VERISIGN_FIRST_NAME"),
|
||||
'lastName': current_app.config.get("VERISIGN_LAST_NAME"),
|
||||
'signatureAlgorithm': 'sha256WithRSAEncryption',
|
||||
'email': current_app.config.get("VERISIGN_EMAIL")
|
||||
}
|
||||
|
||||
if options.get('validityEnd'):
|
||||
end_date, period = get_default_issuance(options)
|
||||
data['specificEndDate'] = end_date
|
||||
data['validityPeriod'] = period
|
||||
|
||||
return data
|
||||
|
||||
|
||||
def get_default_issuance(options):
|
||||
"""
|
||||
Gets the default time range for certificates
|
||||
|
||||
:param options:
|
||||
:return:
|
||||
"""
|
||||
specific_end_date = arrow.get(options['validityEnd']).replace(days=-1).format("MM/DD/YYYY")
|
||||
|
||||
now = arrow.utcnow()
|
||||
then = arrow.get(options['validityEnd'])
|
||||
|
||||
if then < now.replace(years=+1):
|
||||
validity_period = '1Y'
|
||||
elif then < now.replace(years=+2):
|
||||
validity_period = '2Y'
|
||||
else:
|
||||
raise Exception("Verisign issued certificates cannot exceed two years in validity")
|
||||
|
||||
return specific_end_date, validity_period
|
||||
|
||||
|
||||
def handle_response(content):
|
||||
"""
|
||||
Helper function that helps with parsing responses from the Verisign API.
|
||||
Helper function for parsing responses from the Verisign API.
|
||||
:param content:
|
||||
:return: :raise Exception:
|
||||
"""
|
||||
@ -99,29 +148,8 @@ class VerisignIssuerPlugin(IssuerPlugin):
|
||||
"""
|
||||
url = current_app.config.get("VERISIGN_URL") + '/enroll'
|
||||
|
||||
data = {
|
||||
'csr': csr,
|
||||
'challenge': issuer_options['challenge'],
|
||||
'serverType': 'Apache',
|
||||
'certProductType': 'Server',
|
||||
'firstName': current_app.config.get("VERISIGN_FIRST_NAME"),
|
||||
'lastName': current_app.config.get("VERISIGN_LAST_NAME"),
|
||||
'signatureAlgorithm': 'sha256WithRSAEncryption',
|
||||
'email': current_app.config.get("VERISIGN_EMAIL")
|
||||
}
|
||||
|
||||
if issuer_options.get('validityEnd'):
|
||||
data['specificEndDate'] = arrow.get(issuer_options['validityEnd']).replace(days=-1).format("MM/DD/YYYY")
|
||||
|
||||
now = arrow.utcnow()
|
||||
then = arrow.get(issuer_options['validityEnd'])
|
||||
|
||||
if then < now.replace(years=+1):
|
||||
data['validityPeriod'] = '1Y'
|
||||
elif then < now.replace(years=+2):
|
||||
data['validityPeriod'] = '2Y'
|
||||
else:
|
||||
raise Exception("Verisign issued certificates cannot exceed two years in validity")
|
||||
data = process_options(issuer_options)
|
||||
data['csr'] = csr
|
||||
|
||||
current_app.logger.info("Requesting a new verisign certificate: {0}".format(data))
|
||||
|
||||
|
Reference in New Issue
Block a user