Adding in some initial tests

This commit is contained in:
Kevin Glisson 2015-06-24 16:48:40 -07:00 committed by kevgliss
parent eadfaaeed0
commit 39ad270dad
29 changed files with 386 additions and 298 deletions

View File

@ -15,7 +15,7 @@ from sqlalchemy import exc
from sqlalchemy.sql import and_, or_ from sqlalchemy.sql import and_, or_
from lemur.extensions import db from lemur.extensions import db
from lemur.exceptions import AttrNotFound, IntegrityError from lemur.exceptions import AttrNotFound, IntegrityError, DuplicateError
def filter_none(kwargs): def filter_none(kwargs):
@ -153,9 +153,10 @@ def create(model):
try: try:
db.session.add(model) db.session.add(model)
commit() commit()
db.session.refresh(model)
except exc.IntegrityError as e: except exc.IntegrityError as e:
raise IntegrityError(e.orig.diag.message_detail) raise DuplicateError(e.orig.diag.message_detail)
db.session.refresh(model)
return model return model

View File

@ -11,6 +11,14 @@ class LemurException(Exception):
current_app.logger.error(self) current_app.logger.error(self)
class DuplicateError(LemurException):
def __init__(self, key):
self.key = key
def __str__(self):
return repr("Duplicate found! Could not create: {0}".format(self.key))
class AuthenticationFailedException(LemurException): class AuthenticationFailedException(LemurException):
def __init__(self, remote_ip, user_agent): def __init__(self, remote_ip, user_agent):
self.remote_ip = remote_ip self.remote_ip = remote_ip

View File

@ -4,7 +4,41 @@
:license: Apache, see LICENSE for more details. :license: Apache, see LICENSE for more details.
""" """
from flask.ext.sqlalchemy import SQLAlchemy from flask.ext.sqlalchemy import SQLAlchemy, SignallingSession, SessionBase
class _SignallingSession(SignallingSession):
"""A subclass of `SignallingSession` that allows for `binds` to be specified
in the `options` keyword arguments.
"""
def __init__(self, db, autocommit=False, autoflush=True, **options):
self.app = db.get_app()
self._model_changes = {}
self.emit_modification_signals = \
self.app.config['SQLALCHEMY_TRACK_MODIFICATIONS']
bind = options.pop('bind', None)
if bind is None:
bind = db.engine
binds = options.pop('binds', None)
if binds is None:
binds = db.get_binds(self.app)
SessionBase.__init__(self,
autocommit=autocommit,
autoflush=autoflush,
bind=bind,
binds=binds,
**options)
class _SQLAlchemy(SQLAlchemy):
"""A subclass of `SQLAlchemy` that uses `_SignallingSession`."""
def create_session(self, options):
return _SignallingSession(self, **options)
db = SQLAlchemy() db = SQLAlchemy()
from flask.ext.migrate import Migrate from flask.ext.migrate import Migrate

View File

@ -51,6 +51,12 @@ def create_app(app_name=None, blueprints=None, config=None):
configure_blueprints(app, blueprints) configure_blueprints(app, blueprints)
configure_extensions(app) configure_extensions(app)
configure_logging(app) configure_logging(app)
@app.teardown_appcontext
def teardown(exception=None):
if db.session:
db.session.remove()
return app return app

View File

@ -1,16 +1,4 @@
import unittest import unittest
from nose.tools import eq_
from lemur import app
test_app = app.test_client()
HEADERS = {'Content-Type': 'application/json'}
def check_content_type(headers):
eq_(headers['Content-Type'], 'application/json')
class LemurTestCase(unittest.TestCase): class LemurTestCase(unittest.TestCase):
pass pass

View File

@ -1,87 +0,0 @@
import os
import shutil
import boto
from lemur import app
from lemur.tests import LemurTestCase
from lemur.tests.constants import TEST_CERT, TEST_KEY
from moto import mock_iam, mock_sts, mock_s3
class CertificateTestCase(LemurTestCase):
def test_create_challenge(self):
from lemur.certificates.service import create_challenge
self.assertTrue(len(create_challenge()) >= 24)
def test_hash_domains(self):
from lemur.certificates.service import hash_domains
h = hash_domains(['netflix.com', 'www.netflix.com', 'movies.netflix.com'])
self.assertEqual('c9c83253b46c7c1245c100ed3f7045eb', h)
def test_create_csr(self):
from lemur.certificates.service import create_csr
from lemur.tests.certificates.test_csr import TEST_CSR
path = create_csr(['netflix.com'], TEST_CSR)
files = len(os.listdir(path))
self.assertEqual(files, 4)
shutil.rmtree(path)
def test_create_san_csr(self):
from lemur.certificates.service import create_csr
from lemur.tests.certificates.test_csr import TEST_CSR
path = create_csr(['netflix.com', 'www.netflix.com'], TEST_CSR)
files = len(os.listdir(path))
self.assertEqual(files, 4)
shutil.rmtree(path)
def test_create_path(self):
from lemur.certificates.service import create_path
path = create_path("blah")
self.assertIn('blah', path)
shutil.rmtree(path)
@mock_s3
@mock_sts
@mock_iam
def test_save_cert(self):
from lemur.certificates.service import save_cert
from lemur.common.services.aws.iam import get_all_server_certs
conn = boto.connect_s3()
bucket = conn.create_bucket(app.config.get('S3_BUCKET'))
cert = save_cert(TEST_CERT, TEST_KEY, None, "blah", "blah", [1])
count = 0
for key in bucket.list():
count += 1
self.assertEqual(count, 4)
certs = get_all_server_certs('1111')
self.assertEqual(len(certs), 1)
# @mock_s3
# @mock_sts
# @mock_iam
# def test_upload_cert(self):
# from lemur.certificates.service import upload
# from lemur.common.services.aws.iam import get_all_server_certs
# conn = boto.connect_s3()
# bucket = conn.create_bucket(app.config.get('S3_BUCKET'))
#
# cert_up = {"public_cert": TEST_CERT, "private_key": TEST_KEY, "owner": "test@example.com", "accounts_ids": ['1111']}
#
# cert_name = upload(**cert_up)
# valid_name = 'AHB-dfdsflkj.net-NetflixInc-20140525-20150525'
# self.assertEqual(cert_name, valid_name)
#
# app.logger.debug(cert_name)
# count = 0
#
# for key in bucket.list():
# count += 1
#
# self.assertEqual(count, 2)
# certs = get_all_server_certs('179727101194')
# self.assertEqual(len(certs), 1)
#
#
#

91
lemur/tests/conftest.py Normal file
View File

@ -0,0 +1,91 @@
import pytest
from lemur import create_app
from lemur.database import db as _db
from flask.ext.sqlalchemy import SignallingSession
from sqlalchemy import event
def pytest_addoption(parser):
parser.addoption("--runslow", action="store_true", help="run slow tests")
def pytest_runtest_setup(item):
if 'slow' in item.keywords and not item.config.getoption("--runslow"):
pytest.skip("need --runslow option to run")
if "incremental" in item.keywords:
previousfailed = getattr(item.parent, "_previousfailed", None)
if previousfailed is not None:
pytest.xfail("previous test failed ({0})".format(previousfailed.name))
def pytest_runtest_makereport(item, call):
if "incremental" in item.keywords:
if call.excinfo is not None:
parent = item.parent
parent._previousfailed = item
@pytest.yield_fixture(scope="session")
def app():
"""
Creates a new Flask application for a test duration.
Uses application factory `create_app`.
"""
app = create_app()
ctx = app.app_context()
ctx.push()
yield app
ctx.pop()
@pytest.yield_fixture(scope="session")
def db():
_db.create_all()
yield _db
_db.drop_all()
@pytest.yield_fixture(scope="function")
def session(app, db):
"""
Creates a new database session with (with working transaction)
for test duration.
"""
connection = _db.engine.connect()
transaction = connection.begin()
options = dict(bind=connection)
session = _db.create_scoped_session(options=options)
# then each time that SAVEPOINT ends, reopen it
@event.listens_for(SignallingSession, "after_transaction_end")
def restart_savepoint(session, transaction):
if transaction.nested and not transaction._parent.nested:
# ensure that state is expired the way
# session.commit() at the top level normally does
# (optional step)
session.expire_all()
session.begin_nested()
# pushing new Flask application context for multiple-thread
# tests to work
_db.session = session
yield session
# the code after the yield statement works as a teardown
transaction.rollback()
connection.close()
session.remove()

View File

@ -1,51 +0,0 @@
TEST_KEY = """-----BEGIN RSA PRIVATE KEY-----
MIIEogIBAAKCAQEAvNudwW+UeQqkpY71MIdEg501AFlPKuOXG2xU8DZhvZS6dKv+
kDmIWdEqodDgkQiy0jyTgTwxwRqDSw96R6ZgrXefUoJJo66aCsosTBZtVaE85f1L
bj2+3U678c+rekUdkrnGcGCo6b8QtdvBpiDy2clneox8tSvmffAdcR1uCv/790/k
PzQ/djWDX9JcBRyDkcTJwYC0/ek7URvA/+MXmgUL13T+gWKqduaKuIBlFetonDjn
nO11QUBiusIuHV62wzKn8m5Nc+4XoaBR0YWMFn/g6qXDYrwfCsMpka7vSWJFv5Ff
yf+7kY3wU4xIwU2vXlIDcCsdUu6b/pYoQ0YOsQIDAQABAoIBAGbFH6iWnnXrq8MH
8zcQNOFmF+RztRgCt0TOA76f6TowB/LbcXBsTl2J7CgYMUvbLuwm2KHX7r9FPTMI
XiNFT5C16rYMfiQbLGo4sDhLb/3L+wawem6oHQfzA2VH++lSWRByFaEriF+CgIZl
6pALl/uZlLzkXCx+kjPwCSV3vV0wFkDnNs6+wPrz2IhkePsuC8J0QKQLlwsES2It
Gizzhpehdv9lc9MyZC//1QlD9gMDl5ok5Bt1Xm2c12XUEEcLlKQkJxiOrBOfXPmV
PHCdLc7gZO30hc6dyQ1SSnLpywhz/a0ir2GMvkMbS5hculpcZmwEcdZl1HYD8ObP
yOMbPE0CgYEA4LVGJKGtbM8RiBB0MstxNstMYVJ4mXB0lSQ0RazdO3S3ojn+oLpF
b2pvV6m9WnHiCGigWkzhqtGGCo6aqE0MoiR4jTN8GhiZz4ggDDaVgc4Px5reUD+r
tRsTpBHseGQ+ODGgkMI8eJYkdyqkECkYjAOrdy6uorvgxUAZecRIfJMCgYEA1yhM
7NidTNRuA+huS5GcQwQweTM6P1qF7Kfk1JYQMVu4gibLZiLHlWCyHI9lrbI7IaMm
g/4jXXoewv7IvyrrSEFulkPeVWxCe3mjfQ8JANfUj4kuR915LSn4lX2pbUgUS66K
vJSUJtnzLUmb8khLEcOmDbmTFZl8D/bTHFFZlisCgYAeelfWNhuoq3lMRDcOgKuN
bAujE6WJ4kfdxrhUTvr+ynjxxv3zXPB4CS6q7Dnjn5ix3UcKmGzvV1Xf7rGpbDHv
eBTlyfrmKzoJfQQjw++JWKKpRycqKUin2tFSKqAxQB90Tb7ig4XiMTMm+qCgFILg
0sqZ8rn7FpKJDoWmD2ppgwKBgG2Dl9QeVcKbhfv7PNi+HvmFkl6+knFY1D4nHzSN
xWQ6OWoV8QXlwgzokQA0hR6qT6rJbntUyg90b1/1a5zSbbvzgiR+GxcD6bsLqQmo
s354XTtKKgJuWpWAfYUp1ylGvP3gs8FVJyu3WC2+/9+MqJk8KrNlt9YQr7M4gTAy
wBTNAoGAGU7Po4uI3xDKGLLK/ot3D3P8U9ByfeLlrUZtTz1PASsMOr92bkXmUPlE
DYUd5uFfwwlvbMNT1Ooeyrzg3bARd9B6ATyMkOaJeGoQwFAI468iucnm9rNXB+/t
U2rbIi1pXSm8zSNEY85tf6C8DU/5YbcAPf47a2UYhwCpYAJfMk0=
-----END RSA PRIVATE KEY-----"""
TEST_CERT = """-----BEGIN CERTIFICATE-----
MIIDcDCCAlgCCQC8msHu/aa61zANBgkqhkiG9w0BAQUFADB6MQswCQYDVQQGEwJV
UzETMBEGA1UECBMKQ0FMSUZPUk5JQTESMBAGA1UEBxMJTG9zIEdhdG9zMRYwFAYD
VQQKEw1OZXRmbGl4LCBJbmMuMRMwEQYDVQQLEwpPcGVyYXRpb25zMRUwEwYDVQQD
EwxkZmRzZmxrai5uZXQwHhcNMTQwNTI1MTczMDMzWhcNMTUwNTI1MTczMDMzWjB6
MQswCQYDVQQGEwJVUzETMBEGA1UECBMKQ0FMSUZPUk5JQTESMBAGA1UEBxMJTG9z
IEdhdG9zMRYwFAYDVQQKEw1OZXRmbGl4LCBJbmMuMRMwEQYDVQQLEwpPcGVyYXRp
b25zMRUwEwYDVQQDEwxkZmRzZmxrai5uZXQwggEiMA0GCSqGSIb3DQEBAQUAA4IB
DwAwggEKAoIBAQC8253Bb5R5CqSljvUwh0SDnTUAWU8q45cbbFTwNmG9lLp0q/6Q
OYhZ0Sqh0OCRCLLSPJOBPDHBGoNLD3pHpmCtd59SgkmjrpoKyixMFm1VoTzl/Utu
Pb7dTrvxz6t6RR2SucZwYKjpvxC128GmIPLZyWd6jHy1K+Z98B1xHW4K//v3T+Q/
ND92NYNf0lwFHIORxMnBgLT96TtRG8D/4xeaBQvXdP6BYqp25oq4gGUV62icOOec
7XVBQGK6wi4dXrbDMqfybk1z7hehoFHRhYwWf+DqpcNivB8KwymRru9JYkW/kV/J
/7uRjfBTjEjBTa9eUgNwKx1S7pv+lihDRg6xAgMBAAEwDQYJKoZIhvcNAQEFBQAD
ggEBAJHwa4l2iSiFBb6wVFBJEWEt31qp+njiVCoTg2OJzCT60Xb26hkrsiTldIIh
eB9+y+fwdfwopzWhkNbIOlCfudx/uxtpor8/3BRbjSlNwDUg2L8pfAircJMFLQUM
O6nqPOBWCe8hXwe9FQM/oFOavf/AAw/FED+892xlytjirK9u3B28O20W11+fY7hp
8LQVBrMoVxFeLWmmwETAltJ7HEYutplRzYTM0vLBARl4Vd5kLJlY3j2Dp1ZpRGcg
CrQp26UD/oaAPGtiZQSC4LJ+4JfOuuqbm3CI24QMCh9rxv3ZoOQnFuC+7cZgqrat
V4bxCrVvWhrrDSgy9+A80NVzQ3k=
-----END CERTIFICATE-----"""

View File

@ -1,5 +0,0 @@
import os
import shutil
from lemur import app
from lemur.tests import LemurTestCase

View File

@ -1,51 +0,0 @@
import boto
from lemur.tests import LemurTestCase
from moto import mock_elb, mock_sts
class ELBTestCase(LemurTestCase):
@mock_sts
@mock_elb
def test_add_listener(self):
from lemur.common.services.aws.elb import create_new_listeners
conn = boto.connect_elb()
zones = ['us-east-1a', 'us-east-1b']
ports = [(80, 8080, 'http')]
conn.create_load_balancer('my-lb', zones, ports)
create_new_listeners('111', 'us-east-1', 'my-lb', listeners=[('443', '80', 'HTTP')])
balancer = conn.get_all_load_balancers()[0]
self.assertEqual(balancer.name, "my-lb")
self.assertEqual(len(balancer.listeners), 2)
@mock_sts
@mock_elb
def test_update_listener(self):
from lemur.common.services.aws.elb import update_listeners
conn = boto.connect_elb()
zones = ['us-east-1a', 'us-east-1b']
ports = [(80, 8080, 'http')]
conn.create_load_balancer('my-lb', zones, ports)
update_listeners('111', 'us-east-1', 'my-lb', listeners=[('80', '7001', 'http')])
balancer = conn.get_all_load_balancers()[0]
listener = balancer.listeners[0]
self.assertEqual(listener.load_balancer_port, 80)
self.assertEqual(listener.instance_port, 7001)
self.assertEqual(listener.protocol, "HTTP")
@mock_sts
@mock_elb
def test_set_certificate(self):
from lemur.common.services.aws.elb import attach_certificate
conn = boto.connect_elb()
zones = ['us-east-1a', 'us-east-1b']
ports = [(443, 7001, 'https', 'sslcert')]
conn.create_load_balancer('my-lb', zones, ports)
attach_certificate('1111', 'us-east-1', 'my-lb', 443, 'somecert')
balancer = conn.get_all_load_balancers()[0]
listener = balancer.listeners[0]
self.assertEqual(listener.load_balancer_port, 443)
self.assertEqual(listener.instance_port, 7001)
self.assertEqual(listener.protocol, "HTTPS")
self.assertEqual(listener.ssl_certificate_id, 'somecert')

View File

@ -1,37 +0,0 @@
from lemur import app
from lemur.tests import LemurTestCase
from lemur.tests.constants import TEST_CERT, TEST_KEY
from lemur.certificates.models import Certificate
from moto import mock_iam, mock_sts
class IAMTestCase(LemurTestCase):
@mock_sts
@mock_iam
def test_get_all_server_certs(self):
from lemur.common.services.aws.iam import upload_cert, get_all_server_certs
cert = Certificate(TEST_CERT)
upload_cert('1111', cert, TEST_KEY)
certs = get_all_server_certs('1111')
self.assertEquals(len(certs), 1)
@mock_sts
@mock_iam
def test_get_server_cert(self):
from lemur.common.services.aws.iam import upload_cert, get_cert_from_arn
cert = Certificate(TEST_CERT)
upload_cert('1111', cert, TEST_KEY)
body, chain = get_cert_from_arn('arn:aws:iam::123456789012:server-certificate/AHB-dfdsflkj.net-NetflixInc-20140525-20150525')
self.assertTrue(body)
@mock_sts
@mock_iam
def test_upload_server_cert(self):
from lemur.common.services.aws.iam import upload_cert
cert = Certificate(TEST_CERT)
response = upload_cert('1111', cert, TEST_KEY)
self.assertEquals(response['upload_server_certificate_response']['upload_server_certificate_result']['server_certificate_metadata']['server_certificate_name'], 'AHB-dfdsflkj.net-NetflixInc-20140525-20150525')

View File

@ -1,23 +0,0 @@
from lemur import app
from lemur.tests import LemurTestCase
from lemur.tests.constants import TEST_CERT, TEST_KEY
from lemur.certificates.models import Certificate
from moto import mock_iam, mock_sts
class ManagerTestCase(LemurTestCase):
def test_validate_authority(self):
pass
def test_get_all_authorities(self):
from lemur.common.services.issuers.manager import get_all_authorities
authorities = get_all_authorities()
self.assertEqual(len(authorities), 3)
def test_get_all_issuers(self):
from lemur.common.services.issuers.manager import get_all_issuers
issuers = get_all_issuers()
self.assertEqual(len(issuers) > 1)

View File

@ -1,27 +0,0 @@
import boto
from lemur.tests import LemurTestCase
from lemur.tests.constants import TEST_CERT
from lemur.certificates.models import Certificate
from moto import mock_s3
class S3TestCase(LemurTestCase):
@mock_s3
def test_save(self):
from lemur.common.services.aws.s3 import save
conn = boto.connect_s3()
cert = Certificate(TEST_CERT)
buck = conn.create_bucket('test')
path = save(cert, 'private_key', None, 'csr_config', 'challenge')
self.assertEqual(path, 'lemur/{}/{}/'.format(cert.issuer, cert.name))
count = 0
for key in buck.list():
count += 1
self.assertEqual(count, 4)

View File

@ -0,0 +1,53 @@
import pytest
from lemur.accounts.service import *
from lemur.exceptions import DuplicateError
from lemur.accounts.views import *
#def test_crud(session):
# account = create('111111', 'account1')
# assert account.id > 0
#
# account = update(account.id, 11111, 'account2')
# assert account.label == 'account2'
#
# assert len(get_all()) == 1
#
# delete(1)
# assert len(get_all()) == 0
#
#def test_duplicate(session):
# account = create('111111', 'account1')
# assert account.id > 0
#
# with pytest.raises(DuplicateError):
# account = create('111111', 'account1')
def test_basic_user_views(client):
pass
def test_admin_user_views(client):
pass
def test_unauthenticated_views(client):
assert client.get(api.url_for(Accounts, account_id=1)).status_code == 401
assert client.post(api.url_for(Accounts, account_id=1), {}).status_code == 405
assert client.put(api.url_for(Accounts, account_id=1), {}).status_code == 401
assert client.delete(api.url_for(Accounts, account_id=1)).status_code == 401
assert client.patch(api.url_for(Accounts, account_id=1), {}).status_code == 405
assert client.get(api.url_for(AccountsList)).status_code == 401
assert client.post(api.url_for(AccountsList), {}).status_code == 401
assert client.put(api.url_for(AccountsList), {}).status_code == 405
assert client.delete(api.url_for(AccountsList)).status_code == 405
assert client.patch(api.url_for(Accounts), {}).status_code == 405
assert client.get(api.url_for(CertificateAccounts, certificate_id=1)).status_code == 401
assert client.post(api.url_for(CertificateAccounts), {}).status_code == 405
assert client.put(api.url_for(CertificateAccounts), {}).status_code == 405
assert client.delete(api.url_for(CertificateAccounts)).status_code == 405
assert client.patch(api.url_for(CertificateAccounts), {}).status_code == 405

View File

@ -0,0 +1,85 @@
import os
import shutil
import boto
from moto import mock_iam, mock_sts, mock_s3
from lemur.tests import LemurTestCase
#class CertificateTestCase(LemurTestCase):
# def test_create_challenge(self):
# from lemur.certificates.service import create_challenge
# self.assertTrue(len(create_challenge()) >= 24)
#
# def test_hash_domains(self):
# from lemur.certificates.service import hash_domains
# h = hash_domains(['netflix.com', 'www.netflix.com', 'movies.netflix.com'])
# self.assertEqual('c9c83253b46c7c1245c100ed3f7045eb', h)
#
# def test_create_csr(self):
# from lemur.certificates.service import create_csr
# from lemur.tests.test_csr import TEST_CSR
# path = create_csr(['netflix.com'], TEST_CSR)
# files = len(os.listdir(path))
# self.assertEqual(files, 4)
# shutil.rmtree(path)
#
# def test_create_san_csr(self):
# from lemur.certificates.service import create_csr
# from lemur.tests.test_csr import TEST_CSR
# path = create_csr(['netflix.com', 'www.netflix.com'], TEST_CSR)
# files = len(os.listdir(path))
# self.assertEqual(files, 4)
# shutil.rmtree(path)
#
# def test_create_path(self):
# from lemur.certificates.service import create_path
# path = create_path("blah")
# self.assertIn('blah', path)
# shutil.rmtree(path)
#
# @mock_s3
# @mock_sts
# @mock_iam
# def test_save_cert(self):
# from lemur.certificates.service import save_cert
# from lemur.common.services.aws.iam import get_all_server_certs
# conn = boto.connect_s3()
# bucket = conn.create_bucket(app.config.get('S3_BUCKET'))
# cert = save_cert(TEST_CERT, TEST_KEY, None, "blah", "blah", [1])
# count = 0
# for key in bucket.list():
# count += 1
#
# self.assertEqual(count, 4)
# certs = get_all_server_certs('1111')
# self.assertEqual(len(certs), 1)
#
## @mock_s3
## @mock_sts
## @mock_iam
## def test_upload_cert(self):
## from lemur.certificates.service import upload
## from lemur.common.services.aws.iam import get_all_server_certs
## conn = boto.connect_s3()
## bucket = conn.create_bucket(app.config.get('S3_BUCKET'))
##
## cert_up = {"public_cert": TEST_CERT, "private_key": TEST_KEY, "owner": "test@example.com", "accounts_ids": ['1111']}
##
## cert_name = upload(**cert_up)
## valid_name = 'AHB-dfdsflkj.net-NetflixInc-20140525-20150525'
## self.assertEqual(cert_name, valid_name)
##
## app.logger.debug(cert_name)
## count = 0
##
## for key in bucket.list():
## count += 1
##
## self.assertEqual(count, 2)
## certs = get_all_server_certs('179727101194')
## self.assertEqual(len(certs), 1)
##
##
##

51
lemur/tests/test_elb.py Normal file
View File

@ -0,0 +1,51 @@
import boto
from lemur.tests import LemurTestCase
from moto import mock_elb, mock_sts
#class ELBTestCase(LemurTestCase):
# @mock_sts
# @mock_elb
# def test_add_listener(self):
# from lemur.common.services.aws.elb import create_new_listeners
# conn = boto.connect_elb()
# zones = ['us-east-1a', 'us-east-1b']
# ports = [(80, 8080, 'http')]
# conn.create_load_balancer('my-lb', zones, ports)
# create_new_listeners('111', 'us-east-1', 'my-lb', listeners=[('443', '80', 'HTTP')])
# balancer = conn.get_all_load_balancers()[0]
# self.assertEqual(balancer.name, "my-lb")
# self.assertEqual(len(balancer.listeners), 2)
#
# @mock_sts
# @mock_elb
# def test_update_listener(self):
# from lemur.common.services.aws.elb import update_listeners
# conn = boto.connect_elb()
# zones = ['us-east-1a', 'us-east-1b']
# ports = [(80, 8080, 'http')]
# conn.create_load_balancer('my-lb', zones, ports)
# update_listeners('111', 'us-east-1', 'my-lb', listeners=[('80', '7001', 'http')])
# balancer = conn.get_all_load_balancers()[0]
# listener = balancer.listeners[0]
# self.assertEqual(listener.load_balancer_port, 80)
# self.assertEqual(listener.instance_port, 7001)
# self.assertEqual(listener.protocol, "HTTP")
#
# @mock_sts
# @mock_elb
# def test_set_certificate(self):
# from lemur.common.services.aws.elb import attach_certificate
# conn = boto.connect_elb()
# zones = ['us-east-1a', 'us-east-1b']
# ports = [(443, 7001, 'https', 'sslcert')]
# conn.create_load_balancer('my-lb', zones, ports)
# attach_certificate('1111', 'us-east-1', 'my-lb', 443, 'somecert')
# balancer = conn.get_all_load_balancers()[0]
# listener = balancer.listeners[0]
# self.assertEqual(listener.load_balancer_port, 443)
# self.assertEqual(listener.instance_port, 7001)
# self.assertEqual(listener.protocol, "HTTPS")
# self.assertEqual(listener.ssl_certificate_id, 'somecert')
#

35
lemur/tests/test_iam.py Normal file
View File

@ -0,0 +1,35 @@
from lemur.tests import LemurTestCase
from lemur.certificates.models import Certificate
from moto import mock_iam, mock_sts
#class IAMTestCase(LemurTestCase):
# @mock_sts
# @mock_iam
# def test_get_all_server_certs(self):
# from lemur.common.services.aws.iam import upload_cert, get_all_server_certs
# cert = Certificate(TEST_CERT)
# upload_cert('1111', cert, TEST_KEY)
# certs = get_all_server_certs('1111')
# self.assertEquals(len(certs), 1)
#
# @mock_sts
# @mock_iam
# def test_get_server_cert(self):
# from lemur.common.services.aws.iam import upload_cert, get_cert_from_arn
# cert = Certificate(TEST_CERT)
# upload_cert('1111', cert, TEST_KEY)
# body, chain = get_cert_from_arn('arn:aws:iam::123456789012:server-certificate/AHB-dfdsflkj.net-NetflixInc-20140525-20150525')
# self.assertTrue(body)
#
# @mock_sts
# @mock_iam
# def test_upload_server_cert(self):
# from lemur.common.services.aws.iam import upload_cert
# cert = Certificate(TEST_CERT)
# response = upload_cert('1111', cert, TEST_KEY)
# self.assertEquals(response['upload_server_certificate_response']['upload_server_certificate_result']['server_certificate_metadata']['server_certificate_name'], 'AHB-dfdsflkj.net-NetflixInc-20140525-20150525')
#
#

View File

@ -0,0 +1,16 @@
from lemur.tests import LemurTestCase
#class ManagerTestCase(LemurTestCase):
# def test_validate_authority(self):
# pass
#
# def test_get_all_authorities(self):
# from lemur.common.services.issuers.manager import get_all_authorities
# authorities = get_all_authorities()
# self.assertEqual(len(authorities), 3)
#
# def test_get_all_issuers(self):
# from lemur.common.services.issuers.manager import get_all_issuers
# issuers = get_all_issuers()
# self.assertEqual(len(issuers) > 1)
#

View File

@ -46,7 +46,8 @@ install_requires=[
tests_require = [ tests_require = [
'pyflakes', 'pyflakes',
'moto', 'moto',
'nose' 'nose',
'pytest'
] ]
docs_require = [ docs_require = [