Merge pull request #12 from Netflix/0.1.x

0.1.1
This commit is contained in:
kevgliss 2015-06-30 09:42:54 -07:00
commit 477876df50
42 changed files with 1589 additions and 405 deletions

View File

@ -12,28 +12,28 @@ from flask import jsonify
from lemur import factory
from lemur.users.views import mod as users
from lemur.roles.views import mod as roles
from lemur.auth.views import mod as auth
from lemur.domains.views import mod as domains
from lemur.elbs.views import mod as elbs
from lemur.accounts.views import mod as accounts
from lemur.authorities.views import mod as authorities
from lemur.listeners.views import mod as listeners
from lemur.certificates.views import mod as certificates
from lemur.status.views import mod as status
from lemur.users.views import mod as users_bp
from lemur.roles.views import mod as roles_bp
from lemur.auth.views import mod as auth_bp
from lemur.domains.views import mod as domains_bp
from lemur.elbs.views import mod as elbs_bp
from lemur.accounts.views import mod as accounts_bp
from lemur.authorities.views import mod as authorities_bp
from lemur.listeners.views import mod as listeners_bp
from lemur.certificates.views import mod as certificates_bp
from lemur.status.views import mod as status_bp
LEMUR_BLUEPRINTS = (
users,
roles,
auth,
domains,
elbs,
accounts,
authorities,
listeners,
certificates,
status
users_bp,
roles_bp,
auth_bp,
domains_bp,
elbs_bp,
accounts_bp,
authorities_bp,
listeners_bp,
certificates_bp,
status_bp
)
def create_app(config=None):

View File

@ -181,7 +181,7 @@ class Accounts(AuthenticatedResource):
@marshal_items(FIELDS)
def put(self, account_id):
"""
.. http:post:: /accounts/1
.. http:put:: /accounts/1
Updates an account

View File

@ -13,7 +13,7 @@ from flask.ext.principal import Permission, RoleNeed
# Permissions
operator_permission = Permission(RoleNeed('operator'))
admin_permission = Permission(RoleNeed('secops@netflix.com'))
admin_permission = Permission(RoleNeed('admin'))
CertificateCreator = namedtuple('certificate', ['method', 'value'])
CertificateCreatorNeed = partial(CertificateCreator, 'certificateView')

View File

@ -96,7 +96,10 @@ def login_required(f):
response.status_code = 401
return response
token = request.headers.get('Authorization').split()[1]
try:
token = request.headers.get('Authorization').split()[1]
except Exception as e:
return dict(message='Token is invalid'), 403
try:
payload = jwt.decode(token, current_app.config['TOKEN_SECRET'])
@ -109,7 +112,7 @@ def login_required(f):
g.current_user = user_service.get(payload['sub'])
if not g.current_user.id:
if not g.current_user:
return dict(message='You are not logged in'), 403
# Tell Flask-Principal the identity changed

View File

@ -57,11 +57,14 @@ def create(kwargs):
cert = cert_service.save_cert(cert_body, None, intermediate, None, None, None)
cert.user = g.current_user
# we create and attach any roles that cloudCA gives us
# we create and attach any roles that the issuer gives us
role_objs = []
for r in issuer_roles:
role = role_service.create(r['name'], password=r['password'], description="CloudCA auto generated role",
username=r['username'])
role = role_service.create(
r['name'],
password=r['password'],
description="{0} auto generated role".format(kwargs.get('pluginName')),
username=r['username'])
# the user creating the authority should be able to administer it
if role.username == 'admin':
g.current_user.roles.append(role)

View File

@ -365,7 +365,11 @@ class CertificateAuthority(AuthenticatedResource):
:statuscode 200: no error
:statuscode 403: unauthenticated
"""
return certificate_service.get(certificate_id).authority
cert = certificate_service.get(certificate_id)
if not cert:
return dict(message="Certificate not found"), 404
return cert.authority
api.add_resource(AuthoritiesList, '/authorities', endpoint='authorities')
api.add_resource(Authorities, '/authorities/<int:authority_id>', endpoint='authority')

View File

@ -26,6 +26,36 @@ from lemur.constants import SAN_NAMING_TEMPLATE, DEFAULT_NAMING_TEMPLATE, NONSTA
from lemur.models import certificate_associations, certificate_account_associations
def create_name(issuer, not_before, not_after, common_name, san):
"""
Create a name for our certificate. A naming standard
is based on a series of templates. The name includes
useful information such as Common Name, Validation dates,
and Issuer.
:rtype : str
:return:
"""
delchars = ''.join(c for c in map(chr, range(256)) if not c.isalnum())
# aws doesn't allow special chars
subject = common_name.replace('*', "WILDCARD")
issuer = issuer.translate(None, delchars)
if san:
t = SAN_NAMING_TEMPLATE
else:
t = DEFAULT_NAMING_TEMPLATE
temp = t.format(
subject=subject,
issuer=issuer,
not_before=not_before.strftime('%Y%m%d'),
not_after=not_after.strftime('%Y%m%d')
)
return temp
def cert_get_cn(cert):
"""
Attempts to get a sane common name from a given certificate.
@ -33,12 +63,9 @@ def cert_get_cn(cert):
:param cert:
:return: Common name or None
"""
try:
return cert.subject.get_attributes_for_oid(
x509.OID_COMMON_NAME
)[0].value.strip()
except Exception as e:
current_app.logger.error("Unable to get CN! {0}".format(e))
return cert.subject.get_attributes_for_oid(
x509.OID_COMMON_NAME
)[0].value.strip()
def cert_get_domains(cert):
@ -48,17 +75,21 @@ def cert_get_domains(cert):
return the common name.
:param cert:
:return: List of domains
:return: List of domainss
"""
domains = []
try:
ext = cert.extensions.get_extension_for_oid(x509.OID_SUBJECT_ALTERNATIVE_NAME)
entries = ext.get_values_for(x509.DNSName)
entries = ext.value.get_values_for_type(x509.DNSName)
for entry in entries:
domains.append(entry.split(":")[1].strip(", "))
domains.append(entry)
except Exception as e:
current_app.logger.warning("Failed to get SubjectAltName: {0}".format(e))
domains.append(cert_get_cn(cert))
# do a simple check to make sure it's a real domain
common_name = cert_get_cn(cert)
if '.' in common_name:
domains.append(common_name)
return domains
@ -106,7 +137,7 @@ def cert_get_bitstrength(cert):
:param cert:
:return: Integer
"""
return cert.public_key().key_size * 8
return cert.public_key().key_size
def cert_get_issuer(cert):
@ -116,26 +147,14 @@ def cert_get_issuer(cert):
:param cert:
:return: Issuer
"""
delchars = ''.join(c for c in map(chr, range(256)) if not c.isalnum())
try:
return cert.subject.get_attributes_for_oid(x509.OID_ORGANIZATION_NAME)[0].value
issuer = str(cert.subject.get_attributes_for_oid(x509.OID_ORGANIZATION_NAME)[0].value)
return issuer.translate(None, delchars)
except Exception as e:
current_app.logger.error("Unable to get issuer! {0}".format(e))
def cert_is_internal(cert):
"""
Uses an internal resource in order to determine if
a given certificate was issued by an 'internal' certificate
authority.
:param cert:
:return: Bool
"""
if cert_get_issuer(cert) in current_app.config.get('INTERNAL_CA', []):
return True
return False
def cert_get_not_before(cert):
"""
Gets the naive datetime of the certificates 'not_before' field.
@ -223,49 +242,11 @@ class Certificate(db.Model):
self.san = cert_is_san(cert)
self.not_before = cert_get_not_before(cert)
self.not_after = cert_get_not_after(cert)
self.name = self.create_name
self.name = create_name(self.issuer, self.not_before, self.not_after, self.cn, self.san)
for domain in cert_get_domains(cert):
self.domains.append(Domain(name=domain))
@property
def create_name(self):
"""
Create a name for our certificate. A naming standard
is based on a series of templates. The name includes
useful information such as Common Name, Validation dates,
and Issuer.
:rtype : str
:return:
"""
# aws doesn't allow special chars
if self.cn:
subject = self.cn.replace('*', "WILDCARD")
if self.san:
t = SAN_NAMING_TEMPLATE
else:
t = DEFAULT_NAMING_TEMPLATE
temp = t.format(
subject=subject,
issuer=self.issuer,
not_before=self.not_before.strftime('%Y%m%d'),
not_after=self.not_after.strftime('%Y%m%d')
)
else:
t = NONSTANDARD_NAMING_TEMPLATE
temp = t.format(
issuer=self.issuer,
not_before=self.not_before.strftime('%Y%m%d'),
not_after=self.not_after.strftime('%Y%m%d')
)
return temp
@property
def is_expired(self):
if self.not_after < datetime.datetime.now():
@ -296,12 +277,3 @@ class Certificate(db.Model):
def as_dict(self):
return {c.name: getattr(self, c.name) for c in self.__table__.columns}
def serialize(self):
blob = self.as_dict()
# TODO this should be done with relationships
user = user_service.get(self.user_id)
if user:
blob['creator'] = user.username
return blob

View File

@ -90,7 +90,7 @@ def private_key_str(value, name):
:return: :raise ValueError:
"""
try:
serialization.load_pem_private_key(str(value), backend=default_backend())
serialization.load_pem_private_key(str(value), None, backend=default_backend())
except Exception as e:
raise ValueError("The parameter '{0}' needs to be a valid RSA private key".format(name))
return value
@ -437,6 +437,9 @@ class CertificatePrivateKey(AuthenticatedResource):
:statuscode 403: unauthenticated
"""
cert = service.get(certificate_id)
if not cert:
return dict(message="Cannot find specified certificate"), 404
role = role_service.get_by_name(cert.owner)
permission = ViewKeyPermission(certificate_id, hasattr(role, 'id'))

View File

@ -45,11 +45,14 @@ class marshal_items(object):
return marshal(resp, self.fields)
except Exception as e:
current_app.logger.exception(e)
# this is a little weird hack to respect flask restful parsing errors on marshaled functions
if hasattr(e, 'code'):
return {'message': e.data['message']}, 400
if hasattr(e, 'data'):
return {'message': e.data['message']}, 400
else:
return {'message': 'unknown'}, 400
else:
current_app.logger.exception(e)
return {'message': e.message}, 400
return wrapper

View File

@ -15,7 +15,7 @@ from sqlalchemy import exc
from sqlalchemy.sql import and_, or_
from lemur.extensions import db
from lemur.exceptions import AttrNotFound, IntegrityError
from lemur.exceptions import AttrNotFound, IntegrityError, DuplicateError
def filter_none(kwargs):
@ -153,9 +153,10 @@ def create(model):
try:
db.session.add(model)
commit()
db.session.refresh(model)
except exc.IntegrityError as e:
raise IntegrityError(e.orig.diag.message_detail)
raise DuplicateError(e.orig.diag.message_detail)
db.session.refresh(model)
return model

View File

@ -11,6 +11,14 @@ class LemurException(Exception):
current_app.logger.error(self)
class DuplicateError(LemurException):
def __init__(self, key):
self.key = key
def __str__(self):
return repr("Duplicate found! Could not create: {0}".format(self.key))
class AuthenticationFailedException(LemurException):
def __init__(self, remote_ip, user_agent):
self.remote_ip = remote_ip

View File

@ -51,6 +51,12 @@ def create_app(app_name=None, blueprints=None, config=None):
configure_blueprints(app, blueprints)
configure_extensions(app)
configure_logging(app)
@app.teardown_appcontext
def teardown(exception=None):
if db.session:
db.session.remove()
return app
@ -84,7 +90,7 @@ def configure_app(app, config=None):
:return:
"""
try:
app.config.from_envvar("LEMUR_SETTINGS")
app.config.from_envvar("LEMUR_CONF")
except RuntimeError:
if config and config != 'None':
app.config.from_object(from_file(config))

View File

@ -1,16 +1,4 @@
import unittest
from nose.tools import eq_
from lemur import app
test_app = app.test_client()
HEADERS = {'Content-Type': 'application/json'}
def check_content_type(headers):
eq_(headers['Content-Type'], 'application/json')
class LemurTestCase(unittest.TestCase):
pass

View File

@ -1,87 +0,0 @@
import os
import shutil
import boto
from lemur import app
from lemur.tests import LemurTestCase
from lemur.tests.constants import TEST_CERT, TEST_KEY
from moto import mock_iam, mock_sts, mock_s3
class CertificateTestCase(LemurTestCase):
def test_create_challenge(self):
from lemur.certificates.service import create_challenge
self.assertTrue(len(create_challenge()) >= 24)
def test_hash_domains(self):
from lemur.certificates.service import hash_domains
h = hash_domains(['netflix.com', 'www.netflix.com', 'movies.netflix.com'])
self.assertEqual('c9c83253b46c7c1245c100ed3f7045eb', h)
def test_create_csr(self):
from lemur.certificates.service import create_csr
from lemur.tests.certificates.test_csr import TEST_CSR
path = create_csr(['netflix.com'], TEST_CSR)
files = len(os.listdir(path))
self.assertEqual(files, 4)
shutil.rmtree(path)
def test_create_san_csr(self):
from lemur.certificates.service import create_csr
from lemur.tests.certificates.test_csr import TEST_CSR
path = create_csr(['netflix.com', 'www.netflix.com'], TEST_CSR)
files = len(os.listdir(path))
self.assertEqual(files, 4)
shutil.rmtree(path)
def test_create_path(self):
from lemur.certificates.service import create_path
path = create_path("blah")
self.assertIn('blah', path)
shutil.rmtree(path)
@mock_s3
@mock_sts
@mock_iam
def test_save_cert(self):
from lemur.certificates.service import save_cert
from lemur.common.services.aws.iam import get_all_server_certs
conn = boto.connect_s3()
bucket = conn.create_bucket(app.config.get('S3_BUCKET'))
cert = save_cert(TEST_CERT, TEST_KEY, None, "blah", "blah", [1])
count = 0
for key in bucket.list():
count += 1
self.assertEqual(count, 4)
certs = get_all_server_certs('1111')
self.assertEqual(len(certs), 1)
# @mock_s3
# @mock_sts
# @mock_iam
# def test_upload_cert(self):
# from lemur.certificates.service import upload
# from lemur.common.services.aws.iam import get_all_server_certs
# conn = boto.connect_s3()
# bucket = conn.create_bucket(app.config.get('S3_BUCKET'))
#
# cert_up = {"public_cert": TEST_CERT, "private_key": TEST_KEY, "owner": "test@example.com", "accounts_ids": ['1111']}
#
# cert_name = upload(**cert_up)
# valid_name = 'AHB-dfdsflkj.net-NetflixInc-20140525-20150525'
# self.assertEqual(cert_name, valid_name)
#
# app.logger.debug(cert_name)
# count = 0
#
# for key in bucket.list():
# count += 1
#
# self.assertEqual(count, 2)
# certs = get_all_server_certs('179727101194')
# self.assertEqual(len(certs), 1)
#
#
#

221
lemur/tests/certs.py Normal file
View File

@ -0,0 +1,221 @@
from cryptography import x509
from cryptography.hazmat.backends import default_backend
INTERNAL_VALID_LONG_STR = """
-----BEGIN CERTIFICATE-----
MIID1zCCAr+gAwIBAgIBATANBgkqhkiG9w0BAQsFADCBjDELMAkGA1UEBhMCVVMx
CzAJBgNVBAgMAkNBMRAwDgYDVQQHDAdBIHBsYWNlMRcwFQYDVQQDDA5sb25nLmxp
dmVkLmNvbTEQMA4GA1UECgwHRXhhbXBsZTETMBEGA1UECwwKT3BlcmF0aW9uczEe
MBwGCSqGSIb3DQEJARYPamltQGV4YW1wbGUuY29tMB4XDTE1MDYyNjIwMzA1MloX
DTQwMDEwMTIwMzA1MlowgYwxCzAJBgNVBAYTAlVTMQswCQYDVQQIDAJDQTEQMA4G
A1UEBwwHQSBwbGFjZTEXMBUGA1UEAwwObG9uZy5saXZlZC5jb20xEDAOBgNVBAoM
B0V4YW1wbGUxEzARBgNVBAsMCk9wZXJhdGlvbnMxHjAcBgkqhkiG9w0BCQEWD2pp
bUBleGFtcGxlLmNvbTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKeg
sqb0HI10i2eRSx3pLeA7JoGdUpud7hy3bGws/1HgOSpRMin9Y65DEpVq2Ia9oir7
XOJLpSTEIulnBkgDHNOsdKVYHDR6k0gUisnIKSl2C3IgKHpCouwiOvvVPwd3PExg
17+d7KLBIu8LpG28wkXKFU8vSz5i7H4i/XCEChnKJ4oGJuGAJJM4Zn022U156pco
97aEAc9ZXR/1dm2njr4XxCXmrnKCYTElfRhLkmxtv+mCi6eV//5d12z7mY3dTBkQ
EG2xpb5DQ+ITQ8BzsKcPX80rz8rTzgYFwaV3gUg38+bgka/JGJq8HgBuNnHv5CeT
1T/EoZTRYW2oPfOgQK8CAwEAAaNCMEAwDwYDVR0TAQH/BAUwAwEB/zAOBgNVHQ8B
Af8EBAMCAQYwHQYDVR0OBBYEFIuDY73dQIhj2nnd4DG2SvseHVVaMA0GCSqGSIb3
DQEBCwUAA4IBAQBk/WwfoWYdS0M8rz5tJda/cMdYFSugUbTn6JJdmHuw6RmiKzKG
8NzfSqBR6m8MWdSTuAZ/chsUZH9YEIjS9tAH9/FfUFBrsUE7TXaUgpNBm4DBLLfl
fj5xDmEyj17JPN/C36amQ9eU5BNesdCx9EkdWLyVJaM50HFRo71W0/FrpKZyKK68
XPhd1z9w/xgfCfYhe7PjEmrmNPN5Tgk5TyXW+UUhOepDctAv2DBetptcx+gHrtW+
Ygk1wptlt/tg7uUmstmXZA4vTPx83f4P3KSS3XHIYFIyGFWUDs23C20K6mmW1iXa
h0S8LN4iv/+vNFPNiM1z9X/SZgfbwZXrLsSi
-----END CERTIFICATE-----
"""
INTERNAL_VALID_LONG_CERT = x509.load_pem_x509_certificate(INTERNAL_VALID_LONG_STR, default_backend())
INTERNAL_INVALID_STR = """
-----BEGIN CERTIFICATE-----
MIIEFTCCAv2gAwIBAgICA+gwDQYJKoZIhvcNAQELBQAwgYwxCzAJBgNVBAYTAlVT
MQswCQYDVQQIDAJDQTEQMA4GA1UEBwwHQSBwbGFjZTEXMBUGA1UEAwwObG9uZy5s
aXZlZC5jb20xEDAOBgNVBAoMB0V4YW1wbGUxEzARBgNVBAsMCk9wZXJhdGlvbnMx
HjAcBgkqhkiG9w0BCQEWD2ppbUBleGFtcGxlLmNvbTAeFw0xNTA2MjYyMDM2NDha
Fw0xNTA2MjcyMDM2NDhaMGkxCzAJBgNVBAYTAlVTMQswCQYDVQQIEwJDQTEQMA4G
A1UEBxMHQSBwbGFjZTEQMA4GA1UEChMHRXhhbXBsZTETMBEGA1UECxMKT3BlcmF0
aW9uczEUMBIGA1UEAxMLZXhwaXJlZC5jb20wggEiMA0GCSqGSIb3DQEBAQUAA4IB
DwAwggEKAoIBAQCcSMzRxB6+UONPqYMy1Ojw3Wi8DIpt9USnSR60I8LiEuRK2ayr
0RMjLJ6sBEgy/hISEqpLgTsciDpxwaTC/WNrkT9vaMcwfiG3V0Red8zbKHQzC+Ty
cLRg9wbC3v613kaIZCQCoE7Aouru9WbVPmuRoasfztrgksWmH9infQbL4TDcmcxo
qGaMn4ajQTVAD63CKnut+CULZIMBREBVlSTLiOO7qZdTrd+vjtLWvdXVPcWLSBrd
Vpu3YnhqqTte+DMzQHwY7A2s3fu4Cg4H4npzcR+0H1H/B5z64kxqZq9FWGIcZcz7
0xXeHN9UUKPDSTgsjtIzKTaIOe9eML3jGSU7AgMBAAGjgaIwgZ8wDAYDVR0TAQH/
BAIwADAOBgNVHQ8BAf8EBAMCBaAwFgYDVR0lAQH/BAwwCgYIKwYBBQUHAwEwHQYD
VR0OBBYEFKwBYaxCLxK0csmV319rbRdqDllWMEgGA1UdHwRBMD8wPaA7oDmGN2h0
dHA6Ly90ZXN0LmNsb3VkY2EuY3JsLm5ldGZsaXguY29tL2xvbmdsaXZlZENBL2Ny
bC5wZW0wDQYJKoZIhvcNAQELBQADggEBADFngqsMsGnNBWknphLDvnoWu5MTrpsD
AgN0bktv5ACKRWhi/qtCmkEf6TieecRMwpQNMpE50dko3LGGdWlZRCI8wdH/zrw2
8MnOeCBxuS1nB4muUGjbf4LIbtuwoHSESrkfmuKjGGK9JTszLL6Hb9YnoFefeg8L
T7W3s8mm5bVHhQM7J9tV6dz/sVDmpOSuzL8oZkqeKP+lWU6ytaohFFpbdzaxWipU
3+GobVe4vRqoF1kwuhQ8YbMbXWDK6zlrT9pjFABcQ/b5nveiW93JDQUbjmVccx/u
kP+oGWtHvhteUAe8Gloo5NchZJ0/BqlYRCD5aAHcmbXRsDid9mO4ADU=
-----END CERTIFICATE-----
"""
INTERNAL_INVALID_CERT = x509.load_pem_x509_certificate(INTERNAL_INVALID_STR, default_backend())
INTERNAL_VALID_SAN_STR = """
-----BEGIN CERTIFICATE-----
MIIESjCCAzKgAwIBAgICA+kwDQYJKoZIhvcNAQELBQAwgYwxCzAJBgNVBAYTAlVT
MQswCQYDVQQIDAJDQTEQMA4GA1UEBwwHQSBwbGFjZTEXMBUGA1UEAwwObG9uZy5s
aXZlZC5jb20xEDAOBgNVBAoMB0V4YW1wbGUxEzARBgNVBAsMCk9wZXJhdGlvbnMx
HjAcBgkqhkiG9w0BCQEWD2ppbUBleGFtcGxlLmNvbTAeFw0xNTA2MjYyMDU5MDZa
Fw0yMDAxMDEyMDU5MDZaMG0xCzAJBgNVBAYTAlVTMQswCQYDVQQIEwJDQTEQMA4G
A1UEBxMHQSBwbGFjZTEQMA4GA1UEChMHRXhhbXBsZTETMBEGA1UECxMKT3BlcmF0
aW9uczEYMBYGA1UEAxMPc2FuLmV4YW1wbGUuY29tMIIBIjANBgkqhkiG9w0BAQEF
AAOCAQ8AMIIBCgKCAQEA2Nq5zFh2WiqtNIPssdSwQ9/00j370VcKPlOATLqK24Q+
dr2hWP1WlZJ0NOoPefhoIysccs2tRivosTpViRAzNJXigBHhxe8ger0QhVW6AXIp
ov327N689TgY4GzRrwqavjz8cqussIcnEUr4NLLsU5AvXE7e3WxYkkskzO497UOI
uCBtWdCXZ4cAGhtVkkA5uQHfPsLmgRVoUmdMDt5ZmA8HhLX4X6vkT3oGIhdGCw6T
W+Cu7PfYlSaggSBbBniU0YKTFLfGLkYFZN/b6bxzvt6CTJLoVFAYXyLJwUvd3EAm
u23HgUflIyZNG3xVPml/lah0OIX7RtSigXUSLm7lYwIDAQABo4HTMIHQMAwGA1Ud
EwEB/wQCMAAwDgYDVR0PAQH/BAQDAgWgMBYGA1UdJQEB/wQMMAoGCCsGAQUFBwMB
MC8GA1UdEQQoMCaCEWV4YW1wbGUyLmxvbmcuY29tghFleGFtcGxlMy5sb25nLmNv
bTAdBgNVHQ4EFgQUiiIyclcBIfJ5PE3OCcTXwzJAM+0wSAYDVR0fBEEwPzA9oDug
OYY3aHR0cDovL3Rlc3QuY2xvdWRjYS5jcmwubmV0ZmxpeC5jb20vbG9uZ2xpdmVk
Q0EvY3JsLnBlbTANBgkqhkiG9w0BAQsFAAOCAQEAgcTioq70B/aPWovNTy+84wLw
VX1q6bCdH3FJwAv2rc28CHp5mCGdR6JqfT/H/CbfRwT1Yh/5i7T5kEVyz+Dp3+p+
AJ2xauHrTvWn0QHQYbUWICwkuZ7VTI9nd0Fry1FQI1EeKiCmyrzNljiN2l+GZw6i
NJUpVNtwRyWRzB+yIx2E9wyydqDFH+sROuQok7EgzlQileitPrF4RrkfIhQp2/ki
YBrY/duF15YpoMKAlFhDBh6R9/nb5kI2n3pY6I5h6LEYfLStazXbIu61M8zu9TM/
+t5Oz6rmcjohL22+sEmmRz86dQZlrBBUxX0kCQj6OAFB4awtRd4fKtkCkZhvhQ==
-----END CERTIFICATE-----
"""
INTERNAL_VALID_SAN_CERT = x509.load_pem_x509_certificate(INTERNAL_VALID_SAN_STR, default_backend())
INTERNAL_VALID_WILDCARD_STR = """
-----BEGIN CERTIFICATE-----
MIIEHDCCAwSgAwIBAgICA+owDQYJKoZIhvcNAQELBQAwgYwxCzAJBgNVBAYTAlVT
MQswCQYDVQQIDAJDQTEQMA4GA1UEBwwHQSBwbGFjZTEXMBUGA1UEAwwObG9uZy5s
aXZlZC5jb20xEDAOBgNVBAoMB0V4YW1wbGUxEzARBgNVBAsMCk9wZXJhdGlvbnMx
HjAcBgkqhkiG9w0BCQEWD2ppbUBleGFtcGxlLmNvbTAeFw0xNTA2MjYyMTEzMTBa
Fw0yMDAxMDEyMTEzMTBaMHAxCzAJBgNVBAYTAlVTMQswCQYDVQQIEwJDQTEQMA4G
A1UEBxMHQSBwbGFjZTEQMA4GA1UEChMHRXhhbXBsZTETMBEGA1UECxMKT3BlcmF0
aW9uczEbMBkGA1UEAxQSKi50ZXN0LmV4YW1wbGUuY29tMIIBIjANBgkqhkiG9w0B
AQEFAAOCAQ8AMIIBCgKCAQEA0T7OEY9FxMIdhe1CwLc+TbDeSfDN6KRHlp0I9MwK
3Pre7A1+1vmRzLiS5qAdOh3Oexelmgdkn/fZUFI+IqEVJwmeUiq13Kib3BFnVtbB
N1RdT7rZF24Bqwygf1DHAekEBYdvu4dGD/gYKsLYsSMD7g6glUuhTbgR871updcV
USYJ801y640CcHjai8UCLxpqtkP/Alob+/KDczUHbhdxYgmH34aQgxC8zg+uzuq6
bIqUAc6SctI+6ArXOqri7wSMgZUnogpF4R5QbCnlDfSzNcNxJFtGp8cy7CNWebMd
IWgBYwee8i8S6Q90B2QUFD9EGG2pEZldpudTxWUpq0tWmwIDAQABo4GiMIGfMAwG
A1UdEwEB/wQCMAAwDgYDVR0PAQH/BAQDAgWgMBYGA1UdJQEB/wQMMAoGCCsGAQUF
BwMBMB0GA1UdDgQWBBTH2KIECrqPHMbsVysGv7ggkYYZGDBIBgNVHR8EQTA/MD2g
O6A5hjdodHRwOi8vdGVzdC5jbG91ZGNhLmNybC5uZXRmbGl4LmNvbS9sb25nbGl2
ZWRDQS9jcmwucGVtMA0GCSqGSIb3DQEBCwUAA4IBAQBjjfur2B6BcdIQIouwhXGk
IFE5gUYMK5S8Crf/lpMxwHdWK8QM1BpJu9gIo6VoM8uFVa8qlY8LN0SyNyWw+qU5
Jc8X/qCeeJwXEyXY3dIYRT/1aj7FCc7EFn1j6pcHPD6/0M2z0Zmj+1rWNBJdcYor
pCy27OgRoJKZ6YhEYekzwIPeFPL6irIN9xKPnfH0b2cnYa/g56DyGmyKH2Kkhz0A
UGniiUh4bAUuppbtSIvUTsRsJuPYOqHC3h8791JZ/3Sr5uB7QbCdz9K14c9zi6Z1
S0Xb3ZauZJQI7OdHeUPDRVq+8hcG77sopN9pEYrIH08oxvLX2US3GqrowjOxthRa
-----END CERTIFICATE-----
"""
INTERNAL_VALID_WILDCARD_CERT = x509.load_pem_x509_certificate(INTERNAL_VALID_WILDCARD_STR, default_backend())
EXTERNAL_VALID_STR = """
-----BEGIN CERTIFICATE-----
MIIFHzCCBAegAwIBAgIQGFWCciDWzbOej/TbAJN0WzANBgkqhkiG9w0BAQsFADCB
pDELMAkGA1UEBhMCVVMxHTAbBgNVBAoTFFN5bWFudGVjIENvcnBvcmF0aW9uMR8w
HQYDVQQLExZGT1IgVEVTVCBQVVJQT1NFUyBPTkxZMR8wHQYDVQQLExZTeW1hbnRl
YyBUcnVzdCBOZXR3b3JrMTQwMgYDVQQDEytTeW1hbnRlYyBDbGFzcyAzIFNlY3Vy
ZSBTZXJ2ZXIgVEVTVCBDQSAtIEc0MB4XDTE1MDYyNDAwMDAwMFoXDTE1MDYyNTIz
NTk1OVowgYMxCzAJBgNVBAYTAlVTMRMwEQYDVQQIDApDQUxJRk9STklBMRIwEAYD
VQQHDAlMb3MgR2F0b3MxFjAUBgNVBAoMDU5ldGZsaXgsIEluYy4xEzARBgNVBAsM
Ck9wZXJhdGlvbnMxHjAcBgNVBAMMFXR0dHQyLm5ldGZsaXh0ZXN0Lm5ldDCCASIw
DQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBALwMY/yod9YGLKLCzbbsSUBWm4ZC
DfcgbUNL3JLtZaFCaOeUPLa4YNqty+9ACXBLYPNMm+dgsRHix8N2uwtZrGazHILK
qey96eSTosPsvKFt0KLNpUl8GC/YxA69L128SJgFaaq5Dr2Mp3NP0rt0RIz5luPj
Oae0hkGOS8uS0dySlAmfOw2OsJY3gCw5UHcmpcCHpO2f7uU+tWKmgfz4U/PpQ0kz
WVJno+JhcaXIximtiLreCNF1LpraAjrcZJ+ySJwYaLaYMiJoFkdXUtKJcyqmkbA3
Splt7N4Hb8c+5aXv225uQYCh0HXQeMyBotlaIrAddP5obrtjxhXBxB4ysEcCAwEA
AaOCAWowggFmMCAGA1UdEQQZMBeCFXR0dHQyLm5ldGZsaXh0ZXN0Lm5ldDAJBgNV
HRMEAjAAMA4GA1UdDwEB/wQEAwIFoDAdBgNVHSUEFjAUBggrBgEFBQcDAQYIKwYB
BQUHAwIwYQYDVR0gBFowWDBWBgZngQwBAgIwTDAjBggrBgEFBQcCARYXaHR0cHM6
Ly9kLnN5bWNiLmNvbS9jcHMwJQYIKwYBBQUHAgIwGRoXaHR0cHM6Ly9kLnN5bWNi
LmNvbS9ycGEwHwYDVR0jBBgwFoAUNI9UtT8KH1K6nLJl7bqLCGcZ4AQwKwYDVR0f
BCQwIjAgoB6gHIYaaHR0cDovL3NzLnN5bWNiLmNvbS9zcy5jcmwwVwYIKwYBBQUH
AQEESzBJMB8GCCsGAQUFBzABhhNodHRwOi8vc3Muc3ltY2QuY29tMCYGCCsGAQUF
BzAChhpodHRwOi8vc3Muc3ltY2IuY29tL3NzLmNydDANBgkqhkiG9w0BAQsFAAOC
AQEAQuIfyBltvCZ9orqNdS6PUo2PaeUgJzkmdDwbDVd7rTwbZIwGZXZjeKseqMSb
L+r/jN6DWrScVylleiz0N/D0lSUhC609dQKuicGpy3yQaXwhfYZ6duxrW3Ii/+Vz
pFv7DnG3JPZjIXCmVhQVIv/8oaV0bfUF/1mrWRFwZiBILxa7iaycRhjusJEVRtzN
Ot/qkLluHO0wbEHnASV4P9Y5NuR/bliuFS/DeRczofNS78jJuZrGvl2AqS/19Hvm
Bs63gULVCqWygt5KEbv990m/XGuRMaXuHzHCHB4v5LRM30FiFmqCzyD8d+btzW9B
1hZ5s3rj+a6UwvpinKJoPfgkgg==
-----END CERTIFICATE-----
"""
EXTERNAL_CERT = x509.load_pem_x509_certificate(EXTERNAL_VALID_STR, default_backend())
PRIVATE_KEY_STR = """
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAnEjM0cQevlDjT6mDMtTo8N1ovAyKbfVEp0ketCPC4hLkStms
q9ETIyyerARIMv4SEhKqS4E7HIg6ccGkwv1ja5E/b2jHMH4ht1dEXnfM2yh0Mwvk
8nC0YPcGwt7+td5GiGQkAqBOwKLq7vVm1T5rkaGrH87a4JLFph/Yp30Gy+Ew3JnM
aKhmjJ+Go0E1QA+twip7rfglC2SDAURAVZUky4jju6mXU63fr47S1r3V1T3Fi0ga
3Vabt2J4aqk7XvgzM0B8GOwNrN37uAoOB+J6c3EftB9R/wec+uJMamavRVhiHGXM
+9MV3hzfVFCjw0k4LI7SMyk2iDnvXjC94xklOwIDAQABAoIBAGeykly5MeD70OgB
xPEMfoebkav88jklnekVxk6mz9+rw1i6+CyFLJqRN7NRoApdtOXTBrXUyMEUzxq9
7zIGaVptZNbqggh2GK8LM20vNnlQbVGVmdMX30fbgNv6lK1eEBTdxVsMvVRqhVIK
+LGTmlJmICKZ4XdTS9v/k4UGm2TZPCt2pvrNzIpT7TIm2QybCbZoOPY8SHx0U8c5
lmtdqmIsy2JPNSOsOCiJgzQIvkR/fMGWFgNE4fEHsHAfubgpK97TGzwLiFRmlTb+
QUDaz0YbwhF+5bQjHtaGUGATcg5bvV1UWBUvp+g4gRIfwzG+3PAGacYE/djouAdG
PHbxuCkCgYEAz/LsgMgsaV3arlounviSwc8wG9WcI5gbYw5qwX0P57ZoxS7EBAGu
yYtudurJrU9SfsSV44GL11UzBcAGOeS0btddrcMiNBhc7fY7P/1xaufQ3GjG06/v
kH4gOjzsGSTJliZ709g4J6hnMCxz0O0PS31Qg5cBD8UG8xO7/AV0is0CgYEAwGWy
A6YPinpZuenaxrivM5AcVDWmj7aeC29M63l/GY+O5LQH2PKVESH0vL5PvG3LkrCR
SUbaMKdKR0wnZsJ89z21eZ54ydUgj41bZJczl8drxcY0GSajj6XZXGTUjtoVrWsB
A0kJbjsrpd+8J316Y9iCgpopmbVd965pUHe4ACcCgYAamJlDB1cWytgzQHmB/4zV
mOgwRyvHKacnDir9QD+OhTf1MDwFvylZwamJMBJHRkPozr/U7zaxfcYe0CZ7tRKW
spjapoBzZUJNdRay4nllEO0Xo5b6cCAVvOvmRvBzbs8Rky53M8pK2DEKakUNzaQN
JaPskJ2kJLD02etLGm+DaQKBgQCTI/NNmQ2foUzHw1J+0jWjoJ4ZxOI6XLZoFlnk
aInMuZ7Vx92MjJF2hdqPEpkWiX28FO839EjgFsDW4CXuD+XUjEwi1BCagzWgs8Hm
n0Bk3q3MlnW3mnZSYMtoPvDUw3L6qrAenBfrRrNt6zsRlIQqoiXFzjLsi+luh+Oh
F74P1wKBgQCPQGKLUcfAvjIcZp4ECH0K8sBEmoEf8pceuALZ3H5vneYDzqMDIceo
t5Gpocpt77LJnNiszXSerj/KjX2MflY5xUXeekWowLVTBOK5+CZ8+XBIgBt1hIG3
XKxcRgm/Va4QMEAnec0qXfdTVJaJiAW0bdKwKRRrrbwcTdNRGibdng==
-----END RSA PRIVATE KEY-----
"""
CSR_CONFIG = """
# Configuration for standard CSR generation for Netflix
# Used for procuring VeriSign certificates
# Author: jbob
# Contact: security@example.com
[ req ]
# Use a 2048 bit private key
default_bits = 2048
default_keyfile = key.pem
prompt = no
encrypt_key = no
# base request
distinguished_name = req_distinguished_name
# extensions
# Uncomment the following line if you are requesting a SAN cert
#req_extensions = req_ext
# distinguished_name
[ req_distinguished_name ]
countryName = "US" # C=
stateOrProvinceName = "CALIFORNIA" # ST=
localityName = "A place" # L=
organizationName = "Example, Inc." # O=
organizationalUnitName = "Operations" # OU=
# This is the hostname/subject name on the certificate
commonName = "example.net" # CN=
[ req_ext ]
# Uncomment the following line if you are requesting a SAN cert
#subjectAltName = @alt_names
[alt_names]
# Put your SANs here
"""

78
lemur/tests/conftest.py Normal file
View File

@ -0,0 +1,78 @@
import pytest
from lemur import create_app
from lemur.database import db as _db
from lemur.users import service as user_service
from lemur.roles import service as role_service
def pytest_addoption(parser):
parser.addoption("--runslow", action="store_true", help="run slow tests")
def pytest_runtest_setup(item):
if 'slow' in item.keywords and not item.config.getoption("--runslow"):
pytest.skip("need --runslow option to run")
if "incremental" in item.keywords:
previousfailed = getattr(item.parent, "_previousfailed", None)
if previousfailed is not None:
pytest.xfail("previous test failed ({0})".format(previousfailed.name))
def pytest_runtest_makereport(item, call):
if "incremental" in item.keywords:
if call.excinfo is not None:
parent = item.parent
parent._previousfailed = item
@pytest.yield_fixture(scope="session")
def app():
"""
Creates a new Flask application for a test duration.
Uses application factory `create_app`.
"""
app = create_app()
app.config['TESTING'] = True
app.config['LEMUR_ENCRYPTION_KEY'] = 'test'
ctx = app.app_context()
ctx.push()
yield app
ctx.pop()
@pytest.yield_fixture(scope="session")
def db(app, request):
_db.drop_all()
_db.create_all()
_db.app = app
user = user_service.create('user', 'test', 'user@example.com', True, None, [])
admin_role = role_service.create('admin')
admin = user_service.create('admin', 'admin', 'admin@example.com', True, None, [admin_role])
_db.session.commit()
yield _db
@pytest.yield_fixture(scope="function")
def session(db, request):
"""
Creates a new database session with (with working transaction)
for test duration.
"""
db.session.begin_nested()
yield session
db.session.rollback()
@pytest.yield_fixture(scope="function")
def client(app, session):
with app.test_client() as client:
yield client

View File

@ -1,51 +0,0 @@
TEST_KEY = """-----BEGIN RSA PRIVATE KEY-----
MIIEogIBAAKCAQEAvNudwW+UeQqkpY71MIdEg501AFlPKuOXG2xU8DZhvZS6dKv+
kDmIWdEqodDgkQiy0jyTgTwxwRqDSw96R6ZgrXefUoJJo66aCsosTBZtVaE85f1L
bj2+3U678c+rekUdkrnGcGCo6b8QtdvBpiDy2clneox8tSvmffAdcR1uCv/790/k
PzQ/djWDX9JcBRyDkcTJwYC0/ek7URvA/+MXmgUL13T+gWKqduaKuIBlFetonDjn
nO11QUBiusIuHV62wzKn8m5Nc+4XoaBR0YWMFn/g6qXDYrwfCsMpka7vSWJFv5Ff
yf+7kY3wU4xIwU2vXlIDcCsdUu6b/pYoQ0YOsQIDAQABAoIBAGbFH6iWnnXrq8MH
8zcQNOFmF+RztRgCt0TOA76f6TowB/LbcXBsTl2J7CgYMUvbLuwm2KHX7r9FPTMI
XiNFT5C16rYMfiQbLGo4sDhLb/3L+wawem6oHQfzA2VH++lSWRByFaEriF+CgIZl
6pALl/uZlLzkXCx+kjPwCSV3vV0wFkDnNs6+wPrz2IhkePsuC8J0QKQLlwsES2It
Gizzhpehdv9lc9MyZC//1QlD9gMDl5ok5Bt1Xm2c12XUEEcLlKQkJxiOrBOfXPmV
PHCdLc7gZO30hc6dyQ1SSnLpywhz/a0ir2GMvkMbS5hculpcZmwEcdZl1HYD8ObP
yOMbPE0CgYEA4LVGJKGtbM8RiBB0MstxNstMYVJ4mXB0lSQ0RazdO3S3ojn+oLpF
b2pvV6m9WnHiCGigWkzhqtGGCo6aqE0MoiR4jTN8GhiZz4ggDDaVgc4Px5reUD+r
tRsTpBHseGQ+ODGgkMI8eJYkdyqkECkYjAOrdy6uorvgxUAZecRIfJMCgYEA1yhM
7NidTNRuA+huS5GcQwQweTM6P1qF7Kfk1JYQMVu4gibLZiLHlWCyHI9lrbI7IaMm
g/4jXXoewv7IvyrrSEFulkPeVWxCe3mjfQ8JANfUj4kuR915LSn4lX2pbUgUS66K
vJSUJtnzLUmb8khLEcOmDbmTFZl8D/bTHFFZlisCgYAeelfWNhuoq3lMRDcOgKuN
bAujE6WJ4kfdxrhUTvr+ynjxxv3zXPB4CS6q7Dnjn5ix3UcKmGzvV1Xf7rGpbDHv
eBTlyfrmKzoJfQQjw++JWKKpRycqKUin2tFSKqAxQB90Tb7ig4XiMTMm+qCgFILg
0sqZ8rn7FpKJDoWmD2ppgwKBgG2Dl9QeVcKbhfv7PNi+HvmFkl6+knFY1D4nHzSN
xWQ6OWoV8QXlwgzokQA0hR6qT6rJbntUyg90b1/1a5zSbbvzgiR+GxcD6bsLqQmo
s354XTtKKgJuWpWAfYUp1ylGvP3gs8FVJyu3WC2+/9+MqJk8KrNlt9YQr7M4gTAy
wBTNAoGAGU7Po4uI3xDKGLLK/ot3D3P8U9ByfeLlrUZtTz1PASsMOr92bkXmUPlE
DYUd5uFfwwlvbMNT1Ooeyrzg3bARd9B6ATyMkOaJeGoQwFAI468iucnm9rNXB+/t
U2rbIi1pXSm8zSNEY85tf6C8DU/5YbcAPf47a2UYhwCpYAJfMk0=
-----END RSA PRIVATE KEY-----"""
TEST_CERT = """-----BEGIN CERTIFICATE-----
MIIDcDCCAlgCCQC8msHu/aa61zANBgkqhkiG9w0BAQUFADB6MQswCQYDVQQGEwJV
UzETMBEGA1UECBMKQ0FMSUZPUk5JQTESMBAGA1UEBxMJTG9zIEdhdG9zMRYwFAYD
VQQKEw1OZXRmbGl4LCBJbmMuMRMwEQYDVQQLEwpPcGVyYXRpb25zMRUwEwYDVQQD
EwxkZmRzZmxrai5uZXQwHhcNMTQwNTI1MTczMDMzWhcNMTUwNTI1MTczMDMzWjB6
MQswCQYDVQQGEwJVUzETMBEGA1UECBMKQ0FMSUZPUk5JQTESMBAGA1UEBxMJTG9z
IEdhdG9zMRYwFAYDVQQKEw1OZXRmbGl4LCBJbmMuMRMwEQYDVQQLEwpPcGVyYXRp
b25zMRUwEwYDVQQDEwxkZmRzZmxrai5uZXQwggEiMA0GCSqGSIb3DQEBAQUAA4IB
DwAwggEKAoIBAQC8253Bb5R5CqSljvUwh0SDnTUAWU8q45cbbFTwNmG9lLp0q/6Q
OYhZ0Sqh0OCRCLLSPJOBPDHBGoNLD3pHpmCtd59SgkmjrpoKyixMFm1VoTzl/Utu
Pb7dTrvxz6t6RR2SucZwYKjpvxC128GmIPLZyWd6jHy1K+Z98B1xHW4K//v3T+Q/
ND92NYNf0lwFHIORxMnBgLT96TtRG8D/4xeaBQvXdP6BYqp25oq4gGUV62icOOec
7XVBQGK6wi4dXrbDMqfybk1z7hehoFHRhYwWf+DqpcNivB8KwymRru9JYkW/kV/J
/7uRjfBTjEjBTa9eUgNwKx1S7pv+lihDRg6xAgMBAAEwDQYJKoZIhvcNAQEFBQAD
ggEBAJHwa4l2iSiFBb6wVFBJEWEt31qp+njiVCoTg2OJzCT60Xb26hkrsiTldIIh
eB9+y+fwdfwopzWhkNbIOlCfudx/uxtpor8/3BRbjSlNwDUg2L8pfAircJMFLQUM
O6nqPOBWCe8hXwe9FQM/oFOavf/AAw/FED+892xlytjirK9u3B28O20W11+fY7hp
8LQVBrMoVxFeLWmmwETAltJ7HEYutplRzYTM0vLBARl4Vd5kLJlY3j2Dp1ZpRGcg
CrQp26UD/oaAPGtiZQSC4LJ+4JfOuuqbm3CI24QMCh9rxv3ZoOQnFuC+7cZgqrat
V4bxCrVvWhrrDSgy9+A80NVzQ3k=
-----END CERTIFICATE-----"""

View File

@ -1,5 +0,0 @@
import os
import shutil
from lemur import app
from lemur.tests import LemurTestCase

View File

@ -1,51 +0,0 @@
import boto
from lemur.tests import LemurTestCase
from moto import mock_elb, mock_sts
class ELBTestCase(LemurTestCase):
@mock_sts
@mock_elb
def test_add_listener(self):
from lemur.common.services.aws.elb import create_new_listeners
conn = boto.connect_elb()
zones = ['us-east-1a', 'us-east-1b']
ports = [(80, 8080, 'http')]
conn.create_load_balancer('my-lb', zones, ports)
create_new_listeners('111', 'us-east-1', 'my-lb', listeners=[('443', '80', 'HTTP')])
balancer = conn.get_all_load_balancers()[0]
self.assertEqual(balancer.name, "my-lb")
self.assertEqual(len(balancer.listeners), 2)
@mock_sts
@mock_elb
def test_update_listener(self):
from lemur.common.services.aws.elb import update_listeners
conn = boto.connect_elb()
zones = ['us-east-1a', 'us-east-1b']
ports = [(80, 8080, 'http')]
conn.create_load_balancer('my-lb', zones, ports)
update_listeners('111', 'us-east-1', 'my-lb', listeners=[('80', '7001', 'http')])
balancer = conn.get_all_load_balancers()[0]
listener = balancer.listeners[0]
self.assertEqual(listener.load_balancer_port, 80)
self.assertEqual(listener.instance_port, 7001)
self.assertEqual(listener.protocol, "HTTP")
@mock_sts
@mock_elb
def test_set_certificate(self):
from lemur.common.services.aws.elb import attach_certificate
conn = boto.connect_elb()
zones = ['us-east-1a', 'us-east-1b']
ports = [(443, 7001, 'https', 'sslcert')]
conn.create_load_balancer('my-lb', zones, ports)
attach_certificate('1111', 'us-east-1', 'my-lb', 443, 'somecert')
balancer = conn.get_all_load_balancers()[0]
listener = balancer.listeners[0]
self.assertEqual(listener.load_balancer_port, 443)
self.assertEqual(listener.instance_port, 7001)
self.assertEqual(listener.protocol, "HTTPS")
self.assertEqual(listener.ssl_certificate_id, 'somecert')

View File

@ -1,37 +0,0 @@
from lemur import app
from lemur.tests import LemurTestCase
from lemur.tests.constants import TEST_CERT, TEST_KEY
from lemur.certificates.models import Certificate
from moto import mock_iam, mock_sts
class IAMTestCase(LemurTestCase):
@mock_sts
@mock_iam
def test_get_all_server_certs(self):
from lemur.common.services.aws.iam import upload_cert, get_all_server_certs
cert = Certificate(TEST_CERT)
upload_cert('1111', cert, TEST_KEY)
certs = get_all_server_certs('1111')
self.assertEquals(len(certs), 1)
@mock_sts
@mock_iam
def test_get_server_cert(self):
from lemur.common.services.aws.iam import upload_cert, get_cert_from_arn
cert = Certificate(TEST_CERT)
upload_cert('1111', cert, TEST_KEY)
body, chain = get_cert_from_arn('arn:aws:iam::123456789012:server-certificate/AHB-dfdsflkj.net-NetflixInc-20140525-20150525')
self.assertTrue(body)
@mock_sts
@mock_iam
def test_upload_server_cert(self):
from lemur.common.services.aws.iam import upload_cert
cert = Certificate(TEST_CERT)
response = upload_cert('1111', cert, TEST_KEY)
self.assertEquals(response['upload_server_certificate_response']['upload_server_certificate_result']['server_certificate_metadata']['server_certificate_name'], 'AHB-dfdsflkj.net-NetflixInc-20140525-20150525')

View File

@ -1,23 +0,0 @@
from lemur import app
from lemur.tests import LemurTestCase
from lemur.tests.constants import TEST_CERT, TEST_KEY
from lemur.certificates.models import Certificate
from moto import mock_iam, mock_sts
class ManagerTestCase(LemurTestCase):
def test_validate_authority(self):
pass
def test_get_all_authorities(self):
from lemur.common.services.issuers.manager import get_all_authorities
authorities = get_all_authorities()
self.assertEqual(len(authorities), 3)
def test_get_all_issuers(self):
from lemur.common.services.issuers.manager import get_all_issuers
issuers = get_all_issuers()
self.assertEqual(len(issuers) > 1)

View File

@ -1,27 +0,0 @@
import boto
from lemur.tests import LemurTestCase
from lemur.tests.constants import TEST_CERT
from lemur.certificates.models import Certificate
from moto import mock_s3
class S3TestCase(LemurTestCase):
@mock_s3
def test_save(self):
from lemur.common.services.aws.s3 import save
conn = boto.connect_s3()
cert = Certificate(TEST_CERT)
buck = conn.create_bucket('test')
path = save(cert, 'private_key', None, 'csr_config', 'challenge')
self.assertEqual(path, 'lemur/{}/{}/'.format(cert.issuer, cert.name))
count = 0
for key in buck.list():
count += 1
self.assertEqual(count, 4)

View File

@ -0,0 +1,132 @@
from lemur.accounts.service import *
from lemur.accounts.views import *
from json import dumps
def test_crud(session):
account = create('111111', 'account1')
assert account.id > 0
account = update(account.id, 11111, 'account2')
assert account.label == 'account2'
assert len(get_all()) == 1
delete(1)
assert len(get_all()) == 0
def test_account_get(client):
assert client.get(api.url_for(Accounts, account_id=1)).status_code == 401
def test_account_post(client):
assert client.post(api.url_for(Accounts, account_id=1), {}).status_code == 405
def test_account_put(client):
assert client.put(api.url_for(Accounts, account_id=1), {}).status_code == 401
def test_account_delete(client):
assert client.delete(api.url_for(Accounts, account_id=1)).status_code == 401
def test_account_patch(client):
assert client.patch(api.url_for(Accounts, account_id=1), {}).status_code == 405
VALID_USER_HEADER_TOKEN = {
'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyMzMzNjksInN1YiI6MSwiZXhwIjoxNTIxNTQ2OTY5fQ.1qCi0Ip7mzKbjNh0tVd3_eJOrae3rNa_9MCVdA4WtQI'}
def test_auth_account_get(client):
assert client.get(api.url_for(Accounts, account_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 200
def test_auth_account_post_(client):
assert client.post(api.url_for(Accounts, account_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_account_put(client):
assert client.put(api.url_for(Accounts, account_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 403
def test_auth_account_delete(client):
assert client.delete(api.url_for(Accounts, account_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 403
def test_auth_account_patch(client):
assert client.patch(api.url_for(Accounts, account_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
VALID_ADMIN_HEADER_TOKEN = {
'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyNTAyMTgsInN1YiI6MiwiZXhwIjoxNTIxNTYzODE4fQ.6mbq4-Ro6K5MmuNiTJBB153RDhlM5LGJBjI7GBKkfqA'}
def test_admin_account_get(client):
assert client.get(api.url_for(Accounts, account_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200
def test_admin_account_post(client):
assert client.post(api.url_for(Accounts, account_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_account_put(client):
assert client.put(api.url_for(Accounts, account_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 400
def test_admin_account_delete(client):
assert client.delete(api.url_for(Accounts, account_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 500
def test_admin_account_patch(client):
assert client.patch(api.url_for(Accounts, account_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_accounts_get(client):
assert client.get(api.url_for(AccountsList)).status_code == 401
def test_accounts_post(client):
assert client.post(api.url_for(AccountsList), {}).status_code == 401
def test_accounts_put(client):
assert client.put(api.url_for(AccountsList), {}).status_code == 405
def test_accounts_delete(client):
assert client.delete(api.url_for(AccountsList)).status_code == 405
def test_accounts_patch(client):
assert client.patch(api.url_for(AccountsList), {}).status_code == 405
def test_auth_accounts_get(client):
assert client.get(api.url_for(AccountsList), headers=VALID_USER_HEADER_TOKEN).status_code == 200
def test_auth_accounts_post(client):
assert client.post(api.url_for(AccountsList), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 403
def test_admin_accounts_get(client):
resp = client.get(api.url_for(AccountsList), headers=VALID_ADMIN_HEADER_TOKEN)
assert resp.status_code == 200
assert resp.json == {'items': [], 'total': 0}
def test_admin_accounts_crud(client):
assert client.post(api.url_for(AccountsList), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 400
data = {'accountNumber': 111, 'label': 'test', 'comments': 'test'}
resp = client.post(api.url_for(AccountsList), data=dumps(data), content_type='application/json', headers=VALID_ADMIN_HEADER_TOKEN)
assert resp.status_code == 200
assert client.get(api.url_for(Accounts, account_id=resp.json['id']), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200
resp = client.get(api.url_for(AccountsList), headers=VALID_ADMIN_HEADER_TOKEN)
assert resp.status_code == 200
assert resp.json == {'items': [{'accountNumber': 111, 'label': 'test', 'comments': 'test', 'id': 2}], 'total': 1}
assert client.delete(api.url_for(Accounts, account_id=2), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200
resp = client.get(api.url_for(AccountsList), headers=VALID_ADMIN_HEADER_TOKEN)
assert resp.status_code == 200
assert resp.json == {'items': [], 'total': 0}

View File

@ -0,0 +1,163 @@
import pytest
from lemur.authorities.views import *
#def test_crud(session):
# role = create('role1')
# assert role.id > 0
#
# role = update(role.id, 'role_new', None, [])
# assert role.name == 'role_new'
# delete(role.id)
# assert get(role.id) == None
def test_authority_get(client):
assert client.get(api.url_for(Authorities, authority_id=1)).status_code == 401
def test_authority_post(client):
assert client.post(api.url_for(Authorities, authority_id=1), {}).status_code == 405
def test_authority_put(client):
assert client.put(api.url_for(Authorities, authority_id=1), {}).status_code == 401
def test_authority_delete(client):
assert client.delete(api.url_for(Authorities, authority_id=1)).status_code == 405
def test_authority_patch(client):
assert client.patch(api.url_for(Authorities, authority_id=1), {}).status_code == 405
def test_authorities_get(client):
assert client.get(api.url_for(AuthoritiesList)).status_code == 401
def test_authorities_post(client):
assert client.post(api.url_for(AuthoritiesList), {}).status_code == 401
def test_authorities_put(client):
assert client.put(api.url_for(AuthoritiesList), {}).status_code == 405
def test_authorities_delete(client):
assert client.delete(api.url_for(AuthoritiesList)).status_code == 405
def test_authorities_patch(client):
assert client.patch(api.url_for(AuthoritiesList), {}).status_code == 405
def test_certificate_authorities_get(client):
assert client.get(api.url_for(AuthoritiesList)).status_code == 401
def test_certificate_authorities_post(client):
assert client.post(api.url_for(AuthoritiesList), {}).status_code == 401
def test_certificate_authorities_put(client):
assert client.put(api.url_for(AuthoritiesList), {}).status_code == 405
def test_certificate_authorities_delete(client):
assert client.delete(api.url_for(AuthoritiesList)).status_code == 405
def test_certificate_authorities_patch(client):
assert client.patch(api.url_for(AuthoritiesList), {}).status_code == 405
VALID_USER_HEADER_TOKEN = {
'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyMzMzNjksInN1YiI6MSwiZXhwIjoxNTIxNTQ2OTY5fQ.1qCi0Ip7mzKbjNh0tVd3_eJOrae3rNa_9MCVdA4WtQI'}
def test_auth_authority_get(client):
assert client.get(api.url_for(Authorities, authority_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 200
def test_auth_authority_post_(client):
assert client.post(api.url_for(Authorities, authority_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_authority_put(client):
assert client.put(api.url_for(Authorities, authority_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 400
def test_auth_authority_delete(client):
assert client.delete(api.url_for(Authorities, authority_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_authority_patch(client):
assert client.patch(api.url_for(Authorities, authority_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_authorities_get(client):
assert client.get(api.url_for(AuthoritiesList), headers=VALID_USER_HEADER_TOKEN).status_code == 200
def test_auth_authorities_post(client):
assert client.post(api.url_for(AuthoritiesList), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 400
def test_auth_certificates_authorities_get(client):
assert client.get(api.url_for(CertificateAuthority, certificate_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 404
VALID_ADMIN_HEADER_TOKEN = {
'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyNTAyMTgsInN1YiI6MiwiZXhwIjoxNTIxNTYzODE4fQ.6mbq4-Ro6K5MmuNiTJBB153RDhlM5LGJBjI7GBKkfqA'}
def test_admin_authority_get(client):
assert client.get(api.url_for(Authorities, authority_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200
def test_admin_authority_post(client):
assert client.post(api.url_for(Authorities, authority_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_authority_put(client):
assert client.put(api.url_for(Authorities, authority_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 400
def test_admin_authority_delete(client):
assert client.delete(api.url_for(Authorities, authority_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_authority_patch(client):
assert client.patch(api.url_for(Authorities, authority_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_authorities_get(client):
assert client.get(api.url_for(AuthoritiesList), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200
def test_admin_authorities_post(client):
assert client.post(api.url_for(AuthoritiesList), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 400
def test_admin_authorities_put(client):
assert client.put(api.url_for(AuthoritiesList), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_authorities_delete(client):
assert client.delete(api.url_for(AuthoritiesList), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_certificate_authorities_get(client):
assert client.get(api.url_for(CertificateAuthority, certificate_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 404
def test_admin_certificate_authorities_post(client):
assert client.post(api.url_for(CertificateAuthority, certficate_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_certificate_authorities_put(client):
assert client.put(api.url_for(CertificateAuthority, certificate_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_certificate_authorities_delete(client):
assert client.delete(api.url_for(CertificateAuthority, certificate_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405

View File

@ -0,0 +1,344 @@
import os
import pytest
from mock import mock_open, patch
from lemur.certificates.views import *
#def test_crud(session):
# role = create('role1')
# assert role.id > 0
#
# role = update(role.id, 'role_new', None, [])
# assert role.name == 'role_new'
# delete(role.id)
# assert get(role.id) == None
def test_valid_authority(session):
assert 1 == 2
def test_pem_str():
from lemur.tests.certs import INTERNAL_VALID_LONG_STR
assert pem_str(INTERNAL_VALID_LONG_STR, 'test') == INTERNAL_VALID_LONG_STR
with pytest.raises(ValueError):
pem_str('sdfsdfds', 'test')
def test_private_key_str():
from lemur.tests.certs import PRIVATE_KEY_STR
assert private_key_str(PRIVATE_KEY_STR, 'test') == PRIVATE_KEY_STR
with pytest.raises(ValueError):
private_key_str('dfsdfsdf', 'test')
def test_create_csr():
from lemur.tests.certs import CSR_CONFIG
from lemur.certificates.service import create_csr
m = mock_open()
with patch('lemur.certificates.service.open', m, create=True):
path = create_csr(CSR_CONFIG)
assert path == ''
def test_create_path():
assert 1 == 2
def test_load_ssl_pack():
assert 1 == 2
def test_delete_ssl_path():
assert 1 == 2
def test_import_certificate(session):
assert 1 == 2
def test_mint():
assert 1 == 2
def test_disassociate_aws_account():
assert 1 == 2
def test_cert_get_cn():
from lemur.tests.certs import INTERNAL_VALID_LONG_CERT
from lemur.certificates.models import cert_get_cn
assert cert_get_cn(INTERNAL_VALID_LONG_CERT) == 'long.lived.com'
def test_cert_get_domains():
from lemur.tests.certs import INTERNAL_VALID_SAN_CERT, INTERNAL_VALID_LONG_CERT
from lemur.certificates.models import cert_get_domains
assert cert_get_domains(INTERNAL_VALID_LONG_CERT) == ['long.lived.com']
assert cert_get_domains(INTERNAL_VALID_SAN_CERT) == ['example2.long.com', 'example3.long.com', 'san.example.com']
def test_cert_is_san():
from lemur.tests.certs import INTERNAL_VALID_SAN_CERT, INTERNAL_VALID_LONG_CERT
from lemur.certificates.models import cert_is_san
assert cert_is_san(INTERNAL_VALID_LONG_CERT) == False
assert cert_is_san(INTERNAL_VALID_SAN_CERT) == True
def test_cert_is_wildcard():
from lemur.tests.certs import INTERNAL_VALID_WILDCARD_CERT, INTERNAL_VALID_LONG_CERT
from lemur.certificates.models import cert_is_wildcard
assert cert_is_wildcard(INTERNAL_VALID_WILDCARD_CERT) == True
assert cert_is_wildcard(INTERNAL_VALID_LONG_CERT) == False
def test_cert_get_bitstrength():
from lemur.tests.certs import INTERNAL_VALID_LONG_CERT
from lemur.certificates.models import cert_get_bitstrength
assert cert_get_bitstrength(INTERNAL_VALID_LONG_CERT) == 2048
def test_cert_get_issuer():
from lemur.tests.certs import INTERNAL_VALID_LONG_CERT
from lemur.certificates.models import cert_get_issuer
assert cert_get_issuer(INTERNAL_VALID_LONG_CERT) == 'Example'
def test_get_name_from_arn():
from lemur.certificates.models import get_name_from_arn
arn = 'arn:aws:iam::11111111:server-certificate/mycertificate'
assert get_name_from_arn(arn) == 'mycertificate'
def test_get_account_number():
from lemur.certificates.models import get_account_number
arn = 'arn:aws:iam::11111111:server-certificate/mycertificate'
assert get_account_number(arn) == '11111111'
def test_create_name():
from lemur.certificates.models import create_name
from datetime import datetime
assert create_name(
'Example Inc,',
datetime(2015, 5, 7, 0, 0, 0),
datetime(2015, 5, 12, 0, 0, 0),
'example.com',
False
) == 'example.com-ExampleInc-20150507-20150512'
assert create_name(
'Example Inc,',
datetime(2015, 5, 7, 0, 0, 0),
datetime(2015, 5, 12, 0, 0, 0),
'example.com',
True
) == 'SAN-example.com-ExampleInc-20150507-20150512'
def test_is_expired():
assert 1 == 2
def test_certificate_get(client):
assert client.get(api.url_for(Certificates, certificate_id=1)).status_code == 401
def test_certificate_post(client):
assert client.post(api.url_for(Certificates, certificate_id=1), {}).status_code == 405
def test_certificate_put(client):
assert client.put(api.url_for(Certificates, certificate_id=1), {}).status_code == 401
def test_certificate_delete(client):
assert client.delete(api.url_for(Certificates, certificate_id=1)).status_code == 405
def test_certificate_patch(client):
assert client.patch(api.url_for(Certificates, certificate_id=1), {}).status_code == 405
def test_certificates_get(client):
assert client.get(api.url_for(CertificatesList)).status_code == 401
def test_certificates_post(client):
assert client.post(api.url_for(CertificatesList), {}).status_code == 401
def test_certificates_put(client):
assert client.put(api.url_for(CertificatesList), {}).status_code == 405
def test_certificates_delete(client):
assert client.delete(api.url_for(CertificatesList)).status_code == 405
def test_certificates_patch(client):
assert client.patch(api.url_for(CertificatesList), {}).status_code == 405
def test_certificate_credentials_get(client):
assert client.get(api.url_for(CertificatePrivateKey, certificate_id=1)).status_code == 401
def test_certificate_credentials_post(client):
assert client.post(api.url_for(CertificatePrivateKey, certificate_id=1), {}).status_code == 405
def test_certificate_credentials_put(client):
assert client.put(api.url_for(CertificatePrivateKey, certificate_id=1), {}).status_code == 405
def test_certificate_credentials_delete(client):
assert client.delete(api.url_for(CertificatePrivateKey, certificate_id=1)).status_code == 405
def test_certificate_credentials_patch(client):
assert client.patch(api.url_for(CertificatePrivateKey, certificate_id=1), {}).status_code == 405
def test_certificates_upload_get(client):
assert client.get(api.url_for(CertificatesUpload)).status_code == 405
def test_certificates_upload_post(client):
assert client.post(api.url_for(CertificatesUpload), {}).status_code == 401
def test_certificates_upload_put(client):
assert client.put(api.url_for(CertificatesUpload), {}).status_code == 405
def test_certificates_upload_delete(client):
assert client.delete(api.url_for(CertificatesUpload)).status_code == 405
def test_certificates_upload_patch(client):
assert client.patch(api.url_for(CertificatesUpload), {}).status_code == 405
VALID_USER_HEADER_TOKEN = {
'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyMzMzNjksInN1YiI6MSwiZXhwIjoxNTIxNTQ2OTY5fQ.1qCi0Ip7mzKbjNh0tVd3_eJOrae3rNa_9MCVdA4WtQI'}
def test_auth_certificate_get(client):
assert client.get(api.url_for(Certificates, certificate_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 200
def test_auth_certificate_post_(client):
assert client.post(api.url_for(Certificates, certificate_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_certificate_put(client):
assert client.put(api.url_for(Certificates, certificate_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 400
def test_auth_certificate_delete(client):
assert client.delete(api.url_for(Certificates, certificate_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_certificate_patch(client):
assert client.patch(api.url_for(Certificates, certificate_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_certificates_get(client):
assert client.get(api.url_for(CertificatesList), headers=VALID_USER_HEADER_TOKEN).status_code == 200
def test_auth_certificates_post(client):
assert client.post(api.url_for(CertificatesList), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 400
def test_auth_certificate_credentials_get(client):
assert client.get(api.url_for(CertificatePrivateKey, certificate_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 404
def test_auth_certificate_credentials_post(client):
assert client.post(api.url_for(CertificatePrivateKey, certificate_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_certificate_credentials_put(client):
assert client.put(api.url_for(CertificatePrivateKey, certificate_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_certificate_credentials_delete(client):
assert client.delete(api.url_for(CertificatePrivateKey, certificate_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_certificate_credentials_patch(client):
assert client.patch(api.url_for(CertificatePrivateKey, certificate_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_certificates_upload_get(client):
assert client.get(api.url_for(CertificatesUpload), headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_certificates_upload_post(client):
assert client.post(api.url_for(CertificatesUpload), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 400
def test_auth_certificates_upload_put(client):
assert client.put(api.url_for(CertificatesUpload), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_certificates_upload_delete(client):
assert client.delete(api.url_for(CertificatesUpload), headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_certificates_upload_patch(client):
assert client.patch(api.url_for(CertificatesUpload), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
VALID_ADMIN_HEADER_TOKEN = {
'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyNTAyMTgsInN1YiI6MiwiZXhwIjoxNTIxNTYzODE4fQ.6mbq4-Ro6K5MmuNiTJBB153RDhlM5LGJBjI7GBKkfqA'}
def test_admin_certificate_get(client):
assert client.get(api.url_for(Certificates, certificate_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200
def test_admin_certificate_post(client):
assert client.post(api.url_for(Certificates, certificate_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_certificate_put(client):
assert client.put(api.url_for(Certificates, certificate_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 400
def test_admin_certificate_delete(client):
assert client.delete(api.url_for(Certificates, certificate_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_certificate_patch(client):
assert client.patch(api.url_for(Certificates, certificate_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_certificates_get(client):
resp = client.get(api.url_for(CertificatesList), headers=VALID_ADMIN_HEADER_TOKEN)
assert resp.status_code == 200
assert resp.json['total'] == 0
def test_admin_certificate_credentials_get(client):
assert client.get(api.url_for(CertificatePrivateKey, certificate_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 404
def test_admin_certificate_credentials_post(client):
assert client.post(api.url_for(CertificatePrivateKey, certificate_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_certificate_credentials_put(client):
assert client.put(api.url_for(CertificatePrivateKey, certificate_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_certificate_credentials_delete(client):
assert client.delete(api.url_for(CertificatePrivateKey, certificate_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_certificate_credentials_patch(client):
assert client.patch(api.url_for(CertificatePrivateKey, certificate_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405

123
lemur/tests/test_domains.py Normal file
View File

@ -0,0 +1,123 @@
from lemur.domains.views import *
def test_domain_get(client):
assert client.get(api.url_for(Domains, domain_id=1)).status_code == 401
def test_domain_post(client):
assert client.post(api.url_for(Domains, domain_id=1), {}).status_code == 405
def test_domain_put(client):
assert client.put(api.url_for(Domains, domain_id=1), {}).status_code == 405
def test_domain_delete(client):
assert client.delete(api.url_for(Domains, domain_id=1)).status_code == 405
def test_domain_patch(client):
assert client.patch(api.url_for(Domains, domain_id=1), {}).status_code == 405
VALID_USER_HEADER_TOKEN = {
'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyMzMzNjksInN1YiI6MSwiZXhwIjoxNTIxNTQ2OTY5fQ.1qCi0Ip7mzKbjNh0tVd3_eJOrae3rNa_9MCVdA4WtQI'}
def test_auth_domain_get(client):
assert client.get(api.url_for(Domains, domain_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 200
def test_auth_domain_post_(client):
assert client.post(api.url_for(Domains, domain_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_domain_put(client):
assert client.put(api.url_for(Domains, domain_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_domain_delete(client):
assert client.delete(api.url_for(Domains, domain_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_domain_patch(client):
assert client.patch(api.url_for(Domains, domain_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
VALID_ADMIN_HEADER_TOKEN = {
'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyNTAyMTgsInN1YiI6MiwiZXhwIjoxNTIxNTYzODE4fQ.6mbq4-Ro6K5MmuNiTJBB153RDhlM5LGJBjI7GBKkfqA'}
def test_admin_domain_get(client):
assert client.get(api.url_for(Domains, domain_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200
def test_admin_domain_post(client):
assert client.post(api.url_for(Domains, domain_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_domain_put(client):
assert client.put(api.url_for(Domains, domain_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_domain_delete(client):
assert client.delete(api.url_for(Domains, domain_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_domain_patch(client):
assert client.patch(api.url_for(Domains, domain_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_domains_get(client):
assert client.get(api.url_for(DomainsList)).status_code == 401
def test_domains_post(client):
assert client.post(api.url_for(DomainsList), {}).status_code == 405
def test_domains_put(client):
assert client.put(api.url_for(DomainsList), {}).status_code == 405
def test_domains_delete(client):
assert client.delete(api.url_for(DomainsList)).status_code == 405
def test_domains_patch(client):
assert client.patch(api.url_for(DomainsList), {}).status_code == 405
def test_auth_domains_get(client):
assert client.get(api.url_for(DomainsList), headers=VALID_USER_HEADER_TOKEN).status_code == 200
def test_admin_domains_get(client):
resp = client.get(api.url_for(DomainsList), headers=VALID_ADMIN_HEADER_TOKEN)
assert resp.status_code == 200
assert resp.json == {'items': [], 'total': 0}
def test_certificate_domains_get(client):
assert client.get(api.url_for(CertificateDomains, certificate_id=1)).status_code == 401
def test_certificate_domains_post(client):
assert client.post(api.url_for(CertificateDomains, certificate_id=1), {}).status_code == 405
def test_certificate_domains_put(client):
assert client.put(api.url_for(CertificateDomains, certificate_id=1), {}).status_code == 405
def test_certificate_domains_delete(client):
assert client.delete(api.url_for(CertificateDomains, certificate_id=1)).status_code == 405
def test_certificate_domains_patch(client):
assert client.patch(api.url_for(CertificateDomains, certificate_id=1), {}).status_code == 405
def test_auth_certificate_domains_get(client):
assert client.get(api.url_for(CertificateDomains, certificate_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 200
def test_admin_certificate_domains_get(client):
assert client.get(api.url_for(CertificateDomains, certificate_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200

51
lemur/tests/test_elb.py Normal file
View File

@ -0,0 +1,51 @@
import boto
from lemur.tests import LemurTestCase
from moto import mock_elb, mock_sts
#class ELBTestCase(LemurTestCase):
# @mock_sts
# @mock_elb
# def test_add_listener(self):
# from lemur.common.services.aws.elb import create_new_listeners
# conn = boto.connect_elb()
# zones = ['us-east-1a', 'us-east-1b']
# ports = [(80, 8080, 'http')]
# conn.create_load_balancer('my-lb', zones, ports)
# create_new_listeners('111', 'us-east-1', 'my-lb', listeners=[('443', '80', 'HTTP')])
# balancer = conn.get_all_load_balancers()[0]
# self.assertEqual(balancer.name, "my-lb")
# self.assertEqual(len(balancer.listeners), 2)
#
# @mock_sts
# @mock_elb
# def test_update_listener(self):
# from lemur.common.services.aws.elb import update_listeners
# conn = boto.connect_elb()
# zones = ['us-east-1a', 'us-east-1b']
# ports = [(80, 8080, 'http')]
# conn.create_load_balancer('my-lb', zones, ports)
# update_listeners('111', 'us-east-1', 'my-lb', listeners=[('80', '7001', 'http')])
# balancer = conn.get_all_load_balancers()[0]
# listener = balancer.listeners[0]
# self.assertEqual(listener.load_balancer_port, 80)
# self.assertEqual(listener.instance_port, 7001)
# self.assertEqual(listener.protocol, "HTTP")
#
# @mock_sts
# @mock_elb
# def test_set_certificate(self):
# from lemur.common.services.aws.elb import attach_certificate
# conn = boto.connect_elb()
# zones = ['us-east-1a', 'us-east-1b']
# ports = [(443, 7001, 'https', 'sslcert')]
# conn.create_load_balancer('my-lb', zones, ports)
# attach_certificate('1111', 'us-east-1', 'my-lb', 443, 'somecert')
# balancer = conn.get_all_load_balancers()[0]
# listener = balancer.listeners[0]
# self.assertEqual(listener.load_balancer_port, 443)
# self.assertEqual(listener.instance_port, 7001)
# self.assertEqual(listener.protocol, "HTTPS")
# self.assertEqual(listener.ssl_certificate_id, 'somecert')
#

35
lemur/tests/test_iam.py Normal file
View File

@ -0,0 +1,35 @@
from lemur.tests import LemurTestCase
from lemur.certificates.models import Certificate
from moto import mock_iam, mock_sts
#class IAMTestCase(LemurTestCase):
# @mock_sts
# @mock_iam
# def test_get_all_server_certs(self):
# from lemur.common.services.aws.iam import upload_cert, get_all_server_certs
# cert = Certificate(TEST_CERT)
# upload_cert('1111', cert, TEST_KEY)
# certs = get_all_server_certs('1111')
# self.assertEquals(len(certs), 1)
#
# @mock_sts
# @mock_iam
# def test_get_server_cert(self):
# from lemur.common.services.aws.iam import upload_cert, get_cert_from_arn
# cert = Certificate(TEST_CERT)
# upload_cert('1111', cert, TEST_KEY)
# body, chain = get_cert_from_arn('arn:aws:iam::123456789012:server-certificate/AHB-dfdsflkj.net-NetflixInc-20140525-20150525')
# self.assertTrue(body)
#
# @mock_sts
# @mock_iam
# def test_upload_server_cert(self):
# from lemur.common.services.aws.iam import upload_cert
# cert = Certificate(TEST_CERT)
# response = upload_cert('1111', cert, TEST_KEY)
# self.assertEquals(response['upload_server_certificate_response']['upload_server_certificate_result']['server_certificate_metadata']['server_certificate_name'], 'AHB-dfdsflkj.net-NetflixInc-20140525-20150525')
#
#

View File

@ -0,0 +1,16 @@
from lemur.tests import LemurTestCase
#class ManagerTestCase(LemurTestCase):
# def test_validate_authority(self):
# pass
#
# def test_get_all_authorities(self):
# from lemur.common.services.issuers.manager import get_all_authorities
# authorities = get_all_authorities()
# self.assertEqual(len(authorities), 3)
#
# def test_get_all_issuers(self):
# from lemur.common.services.issuers.manager import get_all_issuers
# issuers = get_all_issuers()
# self.assertEqual(len(issuers) > 1)
#

311
lemur/tests/test_roles.py Normal file
View File

@ -0,0 +1,311 @@
from json import dumps
from lemur.roles.service import *
from lemur.roles.views import *
def test_crud(session):
role = create('role1')
assert role.id > 0
role = update(role.id, 'role_new', None, [])
assert role.name == 'role_new'
delete(role.id)
assert get(role.id) == None
def test_role_get(client):
assert client.get(api.url_for(Roles, role_id=1)).status_code == 401
def test_role_post(client):
assert client.post(api.url_for(Roles, role_id=1), {}).status_code == 405
def test_role_put(client):
assert client.put(api.url_for(Roles, role_id=1), {}).status_code == 401
def test_role_delete(client):
assert client.delete(api.url_for(Roles, role_id=1)).status_code == 401
def test_role_patch(client):
assert client.patch(api.url_for(Roles, role_id=1), {}).status_code == 405
def test_roles_get(client):
assert client.get(api.url_for(RolesList)).status_code == 401
def test_roles_post(client):
assert client.post(api.url_for(RolesList), {}).status_code == 401
def test_roles_put(client):
assert client.put(api.url_for(RolesList), {}).status_code == 405
def test_roles_delete(client):
assert client.delete(api.url_for(RolesList)).status_code == 405
def test_roles_patch(client):
assert client.patch(api.url_for(RolesList), {}).status_code == 405
def test_role_credentials_get(client):
assert client.get(api.url_for(RoleViewCredentials, role_id=1)).status_code == 401
def test_role_credentials_post(client):
assert client.post(api.url_for(RoleViewCredentials, role_id=1), {}).status_code == 405
def test_role_credentials_put(client):
assert client.put(api.url_for(RoleViewCredentials, role_id=1), {}).status_code == 405
def test_role_credentials_delete(client):
assert client.delete(api.url_for(RoleViewCredentials, role_id=1)).status_code == 405
def test_role_credentials_patch(client):
assert client.patch(api.url_for(RoleViewCredentials, role_id=1), {}).status_code == 405
def test_user_roles_get(client):
assert client.get(api.url_for(UserRolesList, user_id=1)).status_code == 401
def test_user_roles_post(client):
assert client.post(api.url_for(UserRolesList, user_id=1), {}).status_code == 405
def test_user_roles_put(client):
assert client.put(api.url_for(UserRolesList, user_id=1), {}).status_code == 405
def test_user_roles_delete(client):
assert client.delete(api.url_for(UserRolesList, user_id=1)).status_code == 405
def test_user_roles_patch(client):
assert client.patch(api.url_for(UserRolesList, user_id=1), {}).status_code == 405
def test_authority_roles_get(client):
assert client.get(api.url_for(AuthorityRolesList, authority_id=1)).status_code == 401
def test_authority_roles_post(client):
assert client.post(api.url_for(AuthorityRolesList, authority_id=1), {}).status_code == 405
def test_authority_roles_put(client):
assert client.put(api.url_for(AuthorityRolesList, authority_id=1), {}).status_code == 405
def test_authority_roles_delete(client):
assert client.delete(api.url_for(AuthorityRolesList, authority_id=1)).status_code == 405
def test_authority_roles_patch(client):
assert client.patch(api.url_for(AuthorityRolesList, authority_id=1), {}).status_code == 405
VALID_USER_HEADER_TOKEN = {
'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyMzMzNjksInN1YiI6MSwiZXhwIjoxNTIxNTQ2OTY5fQ.1qCi0Ip7mzKbjNh0tVd3_eJOrae3rNa_9MCVdA4WtQI'}
def test_auth_role_get(client):
assert client.get(api.url_for(Roles, role_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 400
def test_auth_role_post_(client):
assert client.post(api.url_for(Roles, role_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_role_put(client):
assert client.put(api.url_for(Roles, role_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 400
def test_auth_role_delete(client):
assert client.delete(api.url_for(Roles, role_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 403
def test_auth_role_patch(client):
assert client.patch(api.url_for(Roles, role_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_roles_get(client):
assert client.get(api.url_for(RolesList), headers=VALID_USER_HEADER_TOKEN).status_code == 200
def test_auth_roles_post(client):
assert client.post(api.url_for(RolesList), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 403
def test_auth_role_credentials_get(client):
assert client.get(api.url_for(RoleViewCredentials, role_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 403
def test_auth_role_credentials_post(client):
assert client.post(api.url_for(RoleViewCredentials, role_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_role_credentials_put(client):
assert client.put(api.url_for(RoleViewCredentials, role_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_role_credentials_delete(client):
assert client.delete(api.url_for(RoleViewCredentials, role_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_role_credentials_patch(client):
assert client.patch(api.url_for(RoleViewCredentials, role_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_user_roles_get(client):
assert client.get(api.url_for(UserRolesList, user_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 200
def test_auth_user_roles_post(client):
assert client.post(api.url_for(UserRolesList, user_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_user_roles_put(client):
assert client.put(api.url_for(UserRolesList, user_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_user_roles_delete(client):
assert client.delete(api.url_for(UserRolesList, user_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_user_roles_patch(client):
assert client.patch(api.url_for(UserRolesList, user_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_authority_roles_get(client):
assert client.get(api.url_for(AuthorityRolesList, authority_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 200
def test_auth_authority_roles_post(client):
assert client.post(api.url_for(AuthorityRolesList, authority_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_authority_roles_put(client):
assert client.put(api.url_for(AuthorityRolesList, authority_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_authority_roles_delete(client):
assert client.delete(api.url_for(AuthorityRolesList, authority_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_authority_roles_patch(client):
assert client.patch(api.url_for(AuthorityRolesList, authority_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
VALID_ADMIN_HEADER_TOKEN = {
'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyNTAyMTgsInN1YiI6MiwiZXhwIjoxNTIxNTYzODE4fQ.6mbq4-Ro6K5MmuNiTJBB153RDhlM5LGJBjI7GBKkfqA'}
def test_admin_role_get(client):
assert client.get(api.url_for(Roles, role_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200
def test_admin_role_post(client):
assert client.post(api.url_for(Roles, role_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_role_put(client):
assert client.put(api.url_for(Roles, role_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 400
def test_admin_role_delete(client):
assert client.delete(api.url_for(Roles, role_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200
def test_admin_role_patch(client):
assert client.patch(api.url_for(Roles, role_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_roles_get(client):
resp = client.get(api.url_for(RolesList), headers=VALID_ADMIN_HEADER_TOKEN)
assert resp.status_code == 200
assert resp.json['total'] > 0
def test_admin_role_credentials_get(client):
assert client.get(api.url_for(RolesList), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200
def test_admin_role_credentials_post(client):
assert client.post(api.url_for(RolesList), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 400
def test_admin_role_credentials_put(client):
assert client.put(api.url_for(RolesList), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_role_credentials_delete(client):
assert client.delete(api.url_for(RolesList), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_role_credentials_patch(client):
assert client.patch(api.url_for(RolesList), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_user_roles_get(client):
assert client.get(api.url_for(UserRolesList, user_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200
def test_admin_user_roles_post(client):
assert client.post(api.url_for(UserRolesList, user_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_user_roles_put(client):
assert client.put(api.url_for(UserRolesList, user_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_user_roles_delete(client):
assert client.delete(api.url_for(UserRolesList, user_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_user_roles_patch(client):
assert client.patch(api.url_for(UserRolesList, user_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_authority_roles_get(client):
assert client.get(api.url_for(AuthorityRolesList, authority_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200
def test_admin_authority_roles_post(client):
assert client.post(api.url_for(AuthorityRolesList, authority_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_authority_roles_put(client):
assert client.put(api.url_for(AuthorityRolesList, authority_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_authority_roles_delete(client):
assert client.delete(api.url_for(AuthorityRolesList, authority_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_authority_roles_patch(client):
assert client.patch(api.url_for(AuthorityRolesList, authority_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_roles_crud(client):
assert client.post(api.url_for(RolesList), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 400
data = {'name': 'role', 'description': 'test'}
resp = client.post(api.url_for(RolesList), data=dumps(data), content_type='application/json', headers=VALID_ADMIN_HEADER_TOKEN)
assert resp.status_code == 200
role_id = resp.json['id']
assert client.get(api.url_for(Roles, role_id=role_id), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200
resp = client.get(api.url_for(RolesList), headers=VALID_ADMIN_HEADER_TOKEN)
assert resp.status_code == 200
assert resp.json['total'] == 2
assert client.delete(api.url_for(Roles, role_id=role_id), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200
resp = client.get(api.url_for(RolesList), headers=VALID_ADMIN_HEADER_TOKEN)
assert resp.status_code == 200
assert resp.json['total'] == 1

View File

@ -31,8 +31,7 @@ def create(username, password, email, active, profile_picture, roles):
profile_picture=profile_picture,
role=roles
)
user = database.create(user)
return database.update(user)
return database.create(user)
def update(user_id, username, email, active, profile_picture, roles):

View File

@ -46,7 +46,8 @@ install_requires=[
tests_require = [
'pyflakes',
'moto',
'nose'
'nose',
'pytest'
]
docs_require = [