Adding in some initial tests
This commit is contained in:
parent
4330ac9c05
commit
5111f055fa
@ -15,7 +15,7 @@ from sqlalchemy import exc
|
||||
from sqlalchemy.sql import and_, or_
|
||||
|
||||
from lemur.extensions import db
|
||||
from lemur.exceptions import AttrNotFound, IntegrityError
|
||||
from lemur.exceptions import AttrNotFound, IntegrityError, DuplicateError
|
||||
|
||||
|
||||
def filter_none(kwargs):
|
||||
@ -153,9 +153,10 @@ def create(model):
|
||||
try:
|
||||
db.session.add(model)
|
||||
commit()
|
||||
db.session.refresh(model)
|
||||
except exc.IntegrityError as e:
|
||||
raise IntegrityError(e.orig.diag.message_detail)
|
||||
raise DuplicateError(e.orig.diag.message_detail)
|
||||
|
||||
db.session.refresh(model)
|
||||
return model
|
||||
|
||||
|
||||
|
@ -11,6 +11,14 @@ class LemurException(Exception):
|
||||
current_app.logger.error(self)
|
||||
|
||||
|
||||
class DuplicateError(LemurException):
|
||||
def __init__(self, key):
|
||||
self.key = key
|
||||
|
||||
def __str__(self):
|
||||
return repr("Duplicate found! Could not create: {0}".format(self.key))
|
||||
|
||||
|
||||
class AuthenticationFailedException(LemurException):
|
||||
def __init__(self, remote_ip, user_agent):
|
||||
self.remote_ip = remote_ip
|
||||
|
@ -4,7 +4,41 @@
|
||||
:license: Apache, see LICENSE for more details.
|
||||
"""
|
||||
|
||||
from flask.ext.sqlalchemy import SQLAlchemy
|
||||
from flask.ext.sqlalchemy import SQLAlchemy, SignallingSession, SessionBase
|
||||
|
||||
|
||||
class _SignallingSession(SignallingSession):
|
||||
"""A subclass of `SignallingSession` that allows for `binds` to be specified
|
||||
in the `options` keyword arguments.
|
||||
|
||||
"""
|
||||
def __init__(self, db, autocommit=False, autoflush=True, **options):
|
||||
self.app = db.get_app()
|
||||
self._model_changes = {}
|
||||
self.emit_modification_signals = \
|
||||
self.app.config['SQLALCHEMY_TRACK_MODIFICATIONS']
|
||||
|
||||
bind = options.pop('bind', None)
|
||||
if bind is None:
|
||||
bind = db.engine
|
||||
|
||||
binds = options.pop('binds', None)
|
||||
if binds is None:
|
||||
binds = db.get_binds(self.app)
|
||||
|
||||
SessionBase.__init__(self,
|
||||
autocommit=autocommit,
|
||||
autoflush=autoflush,
|
||||
bind=bind,
|
||||
binds=binds,
|
||||
**options)
|
||||
|
||||
|
||||
class _SQLAlchemy(SQLAlchemy):
|
||||
"""A subclass of `SQLAlchemy` that uses `_SignallingSession`."""
|
||||
def create_session(self, options):
|
||||
return _SignallingSession(self, **options)
|
||||
|
||||
db = SQLAlchemy()
|
||||
|
||||
from flask.ext.migrate import Migrate
|
||||
|
@ -51,6 +51,12 @@ def create_app(app_name=None, blueprints=None, config=None):
|
||||
configure_blueprints(app, blueprints)
|
||||
configure_extensions(app)
|
||||
configure_logging(app)
|
||||
|
||||
@app.teardown_appcontext
|
||||
def teardown(exception=None):
|
||||
if db.session:
|
||||
db.session.remove()
|
||||
|
||||
return app
|
||||
|
||||
|
||||
@ -84,7 +90,7 @@ def configure_app(app, config=None):
|
||||
:return:
|
||||
"""
|
||||
try:
|
||||
app.config.from_envvar("LEMUR_SETTINGS")
|
||||
app.config.from_envvar("LEMUR_CONF")
|
||||
except RuntimeError:
|
||||
if config and config != 'None':
|
||||
app.config.from_object(from_file(config))
|
||||
|
@ -1,16 +1,4 @@
|
||||
import unittest
|
||||
from nose.tools import eq_
|
||||
|
||||
from lemur import app
|
||||
|
||||
test_app = app.test_client()
|
||||
|
||||
HEADERS = {'Content-Type': 'application/json'}
|
||||
|
||||
|
||||
def check_content_type(headers):
|
||||
eq_(headers['Content-Type'], 'application/json')
|
||||
|
||||
|
||||
class LemurTestCase(unittest.TestCase):
|
||||
pass
|
||||
|
@ -1,87 +0,0 @@
|
||||
import os
|
||||
import shutil
|
||||
import boto
|
||||
|
||||
from lemur import app
|
||||
from lemur.tests import LemurTestCase
|
||||
from lemur.tests.constants import TEST_CERT, TEST_KEY
|
||||
|
||||
from moto import mock_iam, mock_sts, mock_s3
|
||||
|
||||
|
||||
class CertificateTestCase(LemurTestCase):
|
||||
def test_create_challenge(self):
|
||||
from lemur.certificates.service import create_challenge
|
||||
self.assertTrue(len(create_challenge()) >= 24)
|
||||
|
||||
def test_hash_domains(self):
|
||||
from lemur.certificates.service import hash_domains
|
||||
h = hash_domains(['netflix.com', 'www.netflix.com', 'movies.netflix.com'])
|
||||
self.assertEqual('c9c83253b46c7c1245c100ed3f7045eb', h)
|
||||
|
||||
def test_create_csr(self):
|
||||
from lemur.certificates.service import create_csr
|
||||
from lemur.tests.certificates.test_csr import TEST_CSR
|
||||
path = create_csr(['netflix.com'], TEST_CSR)
|
||||
files = len(os.listdir(path))
|
||||
self.assertEqual(files, 4)
|
||||
shutil.rmtree(path)
|
||||
|
||||
def test_create_san_csr(self):
|
||||
from lemur.certificates.service import create_csr
|
||||
from lemur.tests.certificates.test_csr import TEST_CSR
|
||||
path = create_csr(['netflix.com', 'www.netflix.com'], TEST_CSR)
|
||||
files = len(os.listdir(path))
|
||||
self.assertEqual(files, 4)
|
||||
shutil.rmtree(path)
|
||||
|
||||
def test_create_path(self):
|
||||
from lemur.certificates.service import create_path
|
||||
path = create_path("blah")
|
||||
self.assertIn('blah', path)
|
||||
shutil.rmtree(path)
|
||||
|
||||
@mock_s3
|
||||
@mock_sts
|
||||
@mock_iam
|
||||
def test_save_cert(self):
|
||||
from lemur.certificates.service import save_cert
|
||||
from lemur.common.services.aws.iam import get_all_server_certs
|
||||
conn = boto.connect_s3()
|
||||
bucket = conn.create_bucket(app.config.get('S3_BUCKET'))
|
||||
cert = save_cert(TEST_CERT, TEST_KEY, None, "blah", "blah", [1])
|
||||
count = 0
|
||||
for key in bucket.list():
|
||||
count += 1
|
||||
|
||||
self.assertEqual(count, 4)
|
||||
certs = get_all_server_certs('1111')
|
||||
self.assertEqual(len(certs), 1)
|
||||
|
||||
# @mock_s3
|
||||
# @mock_sts
|
||||
# @mock_iam
|
||||
# def test_upload_cert(self):
|
||||
# from lemur.certificates.service import upload
|
||||
# from lemur.common.services.aws.iam import get_all_server_certs
|
||||
# conn = boto.connect_s3()
|
||||
# bucket = conn.create_bucket(app.config.get('S3_BUCKET'))
|
||||
#
|
||||
# cert_up = {"public_cert": TEST_CERT, "private_key": TEST_KEY, "owner": "test@example.com", "accounts_ids": ['1111']}
|
||||
#
|
||||
# cert_name = upload(**cert_up)
|
||||
# valid_name = 'AHB-dfdsflkj.net-NetflixInc-20140525-20150525'
|
||||
# self.assertEqual(cert_name, valid_name)
|
||||
#
|
||||
# app.logger.debug(cert_name)
|
||||
# count = 0
|
||||
#
|
||||
# for key in bucket.list():
|
||||
# count += 1
|
||||
#
|
||||
# self.assertEqual(count, 2)
|
||||
# certs = get_all_server_certs('179727101194')
|
||||
# self.assertEqual(len(certs), 1)
|
||||
#
|
||||
#
|
||||
#
|
91
lemur/tests/conftest.py
Normal file
91
lemur/tests/conftest.py
Normal file
@ -0,0 +1,91 @@
|
||||
import pytest
|
||||
|
||||
from lemur import create_app
|
||||
from lemur.database import db as _db
|
||||
|
||||
from flask.ext.sqlalchemy import SignallingSession
|
||||
|
||||
from sqlalchemy import event
|
||||
|
||||
|
||||
def pytest_addoption(parser):
|
||||
parser.addoption("--runslow", action="store_true", help="run slow tests")
|
||||
|
||||
|
||||
def pytest_runtest_setup(item):
|
||||
if 'slow' in item.keywords and not item.config.getoption("--runslow"):
|
||||
pytest.skip("need --runslow option to run")
|
||||
|
||||
if "incremental" in item.keywords:
|
||||
previousfailed = getattr(item.parent, "_previousfailed", None)
|
||||
if previousfailed is not None:
|
||||
pytest.xfail("previous test failed ({0})".format(previousfailed.name))
|
||||
|
||||
|
||||
def pytest_runtest_makereport(item, call):
|
||||
if "incremental" in item.keywords:
|
||||
if call.excinfo is not None:
|
||||
parent = item.parent
|
||||
parent._previousfailed = item
|
||||
|
||||
|
||||
@pytest.yield_fixture(scope="session")
|
||||
def app():
|
||||
"""
|
||||
Creates a new Flask application for a test duration.
|
||||
Uses application factory `create_app`.
|
||||
"""
|
||||
app = create_app()
|
||||
|
||||
ctx = app.app_context()
|
||||
ctx.push()
|
||||
|
||||
yield app
|
||||
|
||||
ctx.pop()
|
||||
|
||||
|
||||
@pytest.yield_fixture(scope="session")
|
||||
def db():
|
||||
_db.create_all()
|
||||
|
||||
yield _db
|
||||
|
||||
_db.drop_all()
|
||||
|
||||
|
||||
@pytest.yield_fixture(scope="function")
|
||||
def session(app, db):
|
||||
"""
|
||||
Creates a new database session with (with working transaction)
|
||||
for test duration.
|
||||
"""
|
||||
connection = _db.engine.connect()
|
||||
transaction = connection.begin()
|
||||
|
||||
options = dict(bind=connection)
|
||||
session = _db.create_scoped_session(options=options)
|
||||
|
||||
# then each time that SAVEPOINT ends, reopen it
|
||||
@event.listens_for(SignallingSession, "after_transaction_end")
|
||||
def restart_savepoint(session, transaction):
|
||||
if transaction.nested and not transaction._parent.nested:
|
||||
|
||||
# ensure that state is expired the way
|
||||
# session.commit() at the top level normally does
|
||||
# (optional step)
|
||||
session.expire_all()
|
||||
|
||||
session.begin_nested()
|
||||
|
||||
# pushing new Flask application context for multiple-thread
|
||||
# tests to work
|
||||
|
||||
_db.session = session
|
||||
|
||||
yield session
|
||||
|
||||
# the code after the yield statement works as a teardown
|
||||
transaction.rollback()
|
||||
connection.close()
|
||||
session.remove()
|
@ -1,51 +0,0 @@
|
||||
TEST_KEY = """-----BEGIN RSA PRIVATE KEY-----
|
||||
MIIEogIBAAKCAQEAvNudwW+UeQqkpY71MIdEg501AFlPKuOXG2xU8DZhvZS6dKv+
|
||||
kDmIWdEqodDgkQiy0jyTgTwxwRqDSw96R6ZgrXefUoJJo66aCsosTBZtVaE85f1L
|
||||
bj2+3U678c+rekUdkrnGcGCo6b8QtdvBpiDy2clneox8tSvmffAdcR1uCv/790/k
|
||||
PzQ/djWDX9JcBRyDkcTJwYC0/ek7URvA/+MXmgUL13T+gWKqduaKuIBlFetonDjn
|
||||
nO11QUBiusIuHV62wzKn8m5Nc+4XoaBR0YWMFn/g6qXDYrwfCsMpka7vSWJFv5Ff
|
||||
yf+7kY3wU4xIwU2vXlIDcCsdUu6b/pYoQ0YOsQIDAQABAoIBAGbFH6iWnnXrq8MH
|
||||
8zcQNOFmF+RztRgCt0TOA76f6TowB/LbcXBsTl2J7CgYMUvbLuwm2KHX7r9FPTMI
|
||||
XiNFT5C16rYMfiQbLGo4sDhLb/3L+wawem6oHQfzA2VH++lSWRByFaEriF+CgIZl
|
||||
6pALl/uZlLzkXCx+kjPwCSV3vV0wFkDnNs6+wPrz2IhkePsuC8J0QKQLlwsES2It
|
||||
Gizzhpehdv9lc9MyZC//1QlD9gMDl5ok5Bt1Xm2c12XUEEcLlKQkJxiOrBOfXPmV
|
||||
PHCdLc7gZO30hc6dyQ1SSnLpywhz/a0ir2GMvkMbS5hculpcZmwEcdZl1HYD8ObP
|
||||
yOMbPE0CgYEA4LVGJKGtbM8RiBB0MstxNstMYVJ4mXB0lSQ0RazdO3S3ojn+oLpF
|
||||
b2pvV6m9WnHiCGigWkzhqtGGCo6aqE0MoiR4jTN8GhiZz4ggDDaVgc4Px5reUD+r
|
||||
tRsTpBHseGQ+ODGgkMI8eJYkdyqkECkYjAOrdy6uorvgxUAZecRIfJMCgYEA1yhM
|
||||
7NidTNRuA+huS5GcQwQweTM6P1qF7Kfk1JYQMVu4gibLZiLHlWCyHI9lrbI7IaMm
|
||||
g/4jXXoewv7IvyrrSEFulkPeVWxCe3mjfQ8JANfUj4kuR915LSn4lX2pbUgUS66K
|
||||
vJSUJtnzLUmb8khLEcOmDbmTFZl8D/bTHFFZlisCgYAeelfWNhuoq3lMRDcOgKuN
|
||||
bAujE6WJ4kfdxrhUTvr+ynjxxv3zXPB4CS6q7Dnjn5ix3UcKmGzvV1Xf7rGpbDHv
|
||||
eBTlyfrmKzoJfQQjw++JWKKpRycqKUin2tFSKqAxQB90Tb7ig4XiMTMm+qCgFILg
|
||||
0sqZ8rn7FpKJDoWmD2ppgwKBgG2Dl9QeVcKbhfv7PNi+HvmFkl6+knFY1D4nHzSN
|
||||
xWQ6OWoV8QXlwgzokQA0hR6qT6rJbntUyg90b1/1a5zSbbvzgiR+GxcD6bsLqQmo
|
||||
s354XTtKKgJuWpWAfYUp1ylGvP3gs8FVJyu3WC2+/9+MqJk8KrNlt9YQr7M4gTAy
|
||||
wBTNAoGAGU7Po4uI3xDKGLLK/ot3D3P8U9ByfeLlrUZtTz1PASsMOr92bkXmUPlE
|
||||
DYUd5uFfwwlvbMNT1Ooeyrzg3bARd9B6ATyMkOaJeGoQwFAI468iucnm9rNXB+/t
|
||||
U2rbIi1pXSm8zSNEY85tf6C8DU/5YbcAPf47a2UYhwCpYAJfMk0=
|
||||
-----END RSA PRIVATE KEY-----"""
|
||||
|
||||
TEST_CERT = """-----BEGIN CERTIFICATE-----
|
||||
MIIDcDCCAlgCCQC8msHu/aa61zANBgkqhkiG9w0BAQUFADB6MQswCQYDVQQGEwJV
|
||||
UzETMBEGA1UECBMKQ0FMSUZPUk5JQTESMBAGA1UEBxMJTG9zIEdhdG9zMRYwFAYD
|
||||
VQQKEw1OZXRmbGl4LCBJbmMuMRMwEQYDVQQLEwpPcGVyYXRpb25zMRUwEwYDVQQD
|
||||
EwxkZmRzZmxrai5uZXQwHhcNMTQwNTI1MTczMDMzWhcNMTUwNTI1MTczMDMzWjB6
|
||||
MQswCQYDVQQGEwJVUzETMBEGA1UECBMKQ0FMSUZPUk5JQTESMBAGA1UEBxMJTG9z
|
||||
IEdhdG9zMRYwFAYDVQQKEw1OZXRmbGl4LCBJbmMuMRMwEQYDVQQLEwpPcGVyYXRp
|
||||
b25zMRUwEwYDVQQDEwxkZmRzZmxrai5uZXQwggEiMA0GCSqGSIb3DQEBAQUAA4IB
|
||||
DwAwggEKAoIBAQC8253Bb5R5CqSljvUwh0SDnTUAWU8q45cbbFTwNmG9lLp0q/6Q
|
||||
OYhZ0Sqh0OCRCLLSPJOBPDHBGoNLD3pHpmCtd59SgkmjrpoKyixMFm1VoTzl/Utu
|
||||
Pb7dTrvxz6t6RR2SucZwYKjpvxC128GmIPLZyWd6jHy1K+Z98B1xHW4K//v3T+Q/
|
||||
ND92NYNf0lwFHIORxMnBgLT96TtRG8D/4xeaBQvXdP6BYqp25oq4gGUV62icOOec
|
||||
7XVBQGK6wi4dXrbDMqfybk1z7hehoFHRhYwWf+DqpcNivB8KwymRru9JYkW/kV/J
|
||||
/7uRjfBTjEjBTa9eUgNwKx1S7pv+lihDRg6xAgMBAAEwDQYJKoZIhvcNAQEFBQAD
|
||||
ggEBAJHwa4l2iSiFBb6wVFBJEWEt31qp+njiVCoTg2OJzCT60Xb26hkrsiTldIIh
|
||||
eB9+y+fwdfwopzWhkNbIOlCfudx/uxtpor8/3BRbjSlNwDUg2L8pfAircJMFLQUM
|
||||
O6nqPOBWCe8hXwe9FQM/oFOavf/AAw/FED+892xlytjirK9u3B28O20W11+fY7hp
|
||||
8LQVBrMoVxFeLWmmwETAltJ7HEYutplRzYTM0vLBARl4Vd5kLJlY3j2Dp1ZpRGcg
|
||||
CrQp26UD/oaAPGtiZQSC4LJ+4JfOuuqbm3CI24QMCh9rxv3ZoOQnFuC+7cZgqrat
|
||||
V4bxCrVvWhrrDSgy9+A80NVzQ3k=
|
||||
-----END CERTIFICATE-----"""
|
||||
|
||||
|
@ -1,5 +0,0 @@
|
||||
import os
|
||||
import shutil
|
||||
from lemur import app
|
||||
from lemur.tests import LemurTestCase
|
||||
|
@ -1,51 +0,0 @@
|
||||
import boto
|
||||
from lemur.tests import LemurTestCase
|
||||
|
||||
from moto import mock_elb, mock_sts
|
||||
|
||||
|
||||
class ELBTestCase(LemurTestCase):
|
||||
@mock_sts
|
||||
@mock_elb
|
||||
def test_add_listener(self):
|
||||
from lemur.common.services.aws.elb import create_new_listeners
|
||||
conn = boto.connect_elb()
|
||||
zones = ['us-east-1a', 'us-east-1b']
|
||||
ports = [(80, 8080, 'http')]
|
||||
conn.create_load_balancer('my-lb', zones, ports)
|
||||
create_new_listeners('111', 'us-east-1', 'my-lb', listeners=[('443', '80', 'HTTP')])
|
||||
balancer = conn.get_all_load_balancers()[0]
|
||||
self.assertEqual(balancer.name, "my-lb")
|
||||
self.assertEqual(len(balancer.listeners), 2)
|
||||
|
||||
@mock_sts
|
||||
@mock_elb
|
||||
def test_update_listener(self):
|
||||
from lemur.common.services.aws.elb import update_listeners
|
||||
conn = boto.connect_elb()
|
||||
zones = ['us-east-1a', 'us-east-1b']
|
||||
ports = [(80, 8080, 'http')]
|
||||
conn.create_load_balancer('my-lb', zones, ports)
|
||||
update_listeners('111', 'us-east-1', 'my-lb', listeners=[('80', '7001', 'http')])
|
||||
balancer = conn.get_all_load_balancers()[0]
|
||||
listener = balancer.listeners[0]
|
||||
self.assertEqual(listener.load_balancer_port, 80)
|
||||
self.assertEqual(listener.instance_port, 7001)
|
||||
self.assertEqual(listener.protocol, "HTTP")
|
||||
|
||||
@mock_sts
|
||||
@mock_elb
|
||||
def test_set_certificate(self):
|
||||
from lemur.common.services.aws.elb import attach_certificate
|
||||
conn = boto.connect_elb()
|
||||
zones = ['us-east-1a', 'us-east-1b']
|
||||
ports = [(443, 7001, 'https', 'sslcert')]
|
||||
conn.create_load_balancer('my-lb', zones, ports)
|
||||
attach_certificate('1111', 'us-east-1', 'my-lb', 443, 'somecert')
|
||||
balancer = conn.get_all_load_balancers()[0]
|
||||
listener = balancer.listeners[0]
|
||||
self.assertEqual(listener.load_balancer_port, 443)
|
||||
self.assertEqual(listener.instance_port, 7001)
|
||||
self.assertEqual(listener.protocol, "HTTPS")
|
||||
self.assertEqual(listener.ssl_certificate_id, 'somecert')
|
||||
|
@ -1,37 +0,0 @@
|
||||
from lemur import app
|
||||
from lemur.tests import LemurTestCase
|
||||
from lemur.tests.constants import TEST_CERT, TEST_KEY
|
||||
|
||||
from lemur.certificates.models import Certificate
|
||||
|
||||
from moto import mock_iam, mock_sts
|
||||
|
||||
|
||||
class IAMTestCase(LemurTestCase):
|
||||
@mock_sts
|
||||
@mock_iam
|
||||
def test_get_all_server_certs(self):
|
||||
from lemur.common.services.aws.iam import upload_cert, get_all_server_certs
|
||||
cert = Certificate(TEST_CERT)
|
||||
upload_cert('1111', cert, TEST_KEY)
|
||||
certs = get_all_server_certs('1111')
|
||||
self.assertEquals(len(certs), 1)
|
||||
|
||||
@mock_sts
|
||||
@mock_iam
|
||||
def test_get_server_cert(self):
|
||||
from lemur.common.services.aws.iam import upload_cert, get_cert_from_arn
|
||||
cert = Certificate(TEST_CERT)
|
||||
upload_cert('1111', cert, TEST_KEY)
|
||||
body, chain = get_cert_from_arn('arn:aws:iam::123456789012:server-certificate/AHB-dfdsflkj.net-NetflixInc-20140525-20150525')
|
||||
self.assertTrue(body)
|
||||
|
||||
@mock_sts
|
||||
@mock_iam
|
||||
def test_upload_server_cert(self):
|
||||
from lemur.common.services.aws.iam import upload_cert
|
||||
cert = Certificate(TEST_CERT)
|
||||
response = upload_cert('1111', cert, TEST_KEY)
|
||||
self.assertEquals(response['upload_server_certificate_response']['upload_server_certificate_result']['server_certificate_metadata']['server_certificate_name'], 'AHB-dfdsflkj.net-NetflixInc-20140525-20150525')
|
||||
|
||||
|
@ -1,23 +0,0 @@
|
||||
from lemur import app
|
||||
from lemur.tests import LemurTestCase
|
||||
from lemur.tests.constants import TEST_CERT, TEST_KEY
|
||||
|
||||
from lemur.certificates.models import Certificate
|
||||
|
||||
from moto import mock_iam, mock_sts
|
||||
|
||||
|
||||
class ManagerTestCase(LemurTestCase):
|
||||
def test_validate_authority(self):
|
||||
pass
|
||||
|
||||
def test_get_all_authorities(self):
|
||||
from lemur.common.services.issuers.manager import get_all_authorities
|
||||
authorities = get_all_authorities()
|
||||
self.assertEqual(len(authorities), 3)
|
||||
|
||||
def test_get_all_issuers(self):
|
||||
from lemur.common.services.issuers.manager import get_all_issuers
|
||||
issuers = get_all_issuers()
|
||||
self.assertEqual(len(issuers) > 1)
|
||||
|
@ -1,27 +0,0 @@
|
||||
import boto
|
||||
|
||||
from lemur.tests import LemurTestCase
|
||||
from lemur.tests.constants import TEST_CERT
|
||||
|
||||
from lemur.certificates.models import Certificate
|
||||
|
||||
from moto import mock_s3
|
||||
|
||||
|
||||
class S3TestCase(LemurTestCase):
|
||||
@mock_s3
|
||||
def test_save(self):
|
||||
from lemur.common.services.aws.s3 import save
|
||||
conn = boto.connect_s3()
|
||||
|
||||
cert = Certificate(TEST_CERT)
|
||||
|
||||
buck = conn.create_bucket('test')
|
||||
path = save(cert, 'private_key', None, 'csr_config', 'challenge')
|
||||
self.assertEqual(path, 'lemur/{}/{}/'.format(cert.issuer, cert.name))
|
||||
|
||||
count = 0
|
||||
for key in buck.list():
|
||||
count += 1
|
||||
|
||||
self.assertEqual(count, 4)
|
53
lemur/tests/test_accounts.py
Normal file
53
lemur/tests/test_accounts.py
Normal file
@ -0,0 +1,53 @@
|
||||
|
||||
import pytest
|
||||
from lemur.accounts.service import *
|
||||
from lemur.exceptions import DuplicateError
|
||||
|
||||
from lemur.accounts.views import *
|
||||
|
||||
#def test_crud(session):
|
||||
# account = create('111111', 'account1')
|
||||
# assert account.id > 0
|
||||
#
|
||||
# account = update(account.id, 11111, 'account2')
|
||||
# assert account.label == 'account2'
|
||||
#
|
||||
# assert len(get_all()) == 1
|
||||
#
|
||||
# delete(1)
|
||||
# assert len(get_all()) == 0
|
||||
#
|
||||
|
||||
#def test_duplicate(session):
|
||||
# account = create('111111', 'account1')
|
||||
# assert account.id > 0
|
||||
#
|
||||
# with pytest.raises(DuplicateError):
|
||||
# account = create('111111', 'account1')
|
||||
|
||||
|
||||
def test_basic_user_views(client):
|
||||
pass
|
||||
|
||||
|
||||
def test_admin_user_views(client):
|
||||
pass
|
||||
|
||||
def test_unauthenticated_views(client):
|
||||
assert client.get(api.url_for(Accounts, account_id=1)).status_code == 401
|
||||
assert client.post(api.url_for(Accounts, account_id=1), {}).status_code == 405
|
||||
assert client.put(api.url_for(Accounts, account_id=1), {}).status_code == 401
|
||||
assert client.delete(api.url_for(Accounts, account_id=1)).status_code == 401
|
||||
assert client.patch(api.url_for(Accounts, account_id=1), {}).status_code == 405
|
||||
|
||||
assert client.get(api.url_for(AccountsList)).status_code == 401
|
||||
assert client.post(api.url_for(AccountsList), {}).status_code == 401
|
||||
assert client.put(api.url_for(AccountsList), {}).status_code == 405
|
||||
assert client.delete(api.url_for(AccountsList)).status_code == 405
|
||||
assert client.patch(api.url_for(Accounts), {}).status_code == 405
|
||||
|
||||
assert client.get(api.url_for(CertificateAccounts, certificate_id=1)).status_code == 401
|
||||
assert client.post(api.url_for(CertificateAccounts), {}).status_code == 405
|
||||
assert client.put(api.url_for(CertificateAccounts), {}).status_code == 405
|
||||
assert client.delete(api.url_for(CertificateAccounts)).status_code == 405
|
||||
assert client.patch(api.url_for(CertificateAccounts), {}).status_code == 405
|
85
lemur/tests/test_certificates.py
Normal file
85
lemur/tests/test_certificates.py
Normal file
@ -0,0 +1,85 @@
|
||||
import os
|
||||
import shutil
|
||||
|
||||
import boto
|
||||
from moto import mock_iam, mock_sts, mock_s3
|
||||
|
||||
from lemur.tests import LemurTestCase
|
||||
|
||||
|
||||
#class CertificateTestCase(LemurTestCase):
|
||||
# def test_create_challenge(self):
|
||||
# from lemur.certificates.service import create_challenge
|
||||
# self.assertTrue(len(create_challenge()) >= 24)
|
||||
#
|
||||
# def test_hash_domains(self):
|
||||
# from lemur.certificates.service import hash_domains
|
||||
# h = hash_domains(['netflix.com', 'www.netflix.com', 'movies.netflix.com'])
|
||||
# self.assertEqual('c9c83253b46c7c1245c100ed3f7045eb', h)
|
||||
#
|
||||
# def test_create_csr(self):
|
||||
# from lemur.certificates.service import create_csr
|
||||
# from lemur.tests.test_csr import TEST_CSR
|
||||
# path = create_csr(['netflix.com'], TEST_CSR)
|
||||
# files = len(os.listdir(path))
|
||||
# self.assertEqual(files, 4)
|
||||
# shutil.rmtree(path)
|
||||
#
|
||||
# def test_create_san_csr(self):
|
||||
# from lemur.certificates.service import create_csr
|
||||
# from lemur.tests.test_csr import TEST_CSR
|
||||
# path = create_csr(['netflix.com', 'www.netflix.com'], TEST_CSR)
|
||||
# files = len(os.listdir(path))
|
||||
# self.assertEqual(files, 4)
|
||||
# shutil.rmtree(path)
|
||||
#
|
||||
# def test_create_path(self):
|
||||
# from lemur.certificates.service import create_path
|
||||
# path = create_path("blah")
|
||||
# self.assertIn('blah', path)
|
||||
# shutil.rmtree(path)
|
||||
#
|
||||
# @mock_s3
|
||||
# @mock_sts
|
||||
# @mock_iam
|
||||
# def test_save_cert(self):
|
||||
# from lemur.certificates.service import save_cert
|
||||
# from lemur.common.services.aws.iam import get_all_server_certs
|
||||
# conn = boto.connect_s3()
|
||||
# bucket = conn.create_bucket(app.config.get('S3_BUCKET'))
|
||||
# cert = save_cert(TEST_CERT, TEST_KEY, None, "blah", "blah", [1])
|
||||
# count = 0
|
||||
# for key in bucket.list():
|
||||
# count += 1
|
||||
#
|
||||
# self.assertEqual(count, 4)
|
||||
# certs = get_all_server_certs('1111')
|
||||
# self.assertEqual(len(certs), 1)
|
||||
#
|
||||
## @mock_s3
|
||||
## @mock_sts
|
||||
## @mock_iam
|
||||
## def test_upload_cert(self):
|
||||
## from lemur.certificates.service import upload
|
||||
## from lemur.common.services.aws.iam import get_all_server_certs
|
||||
## conn = boto.connect_s3()
|
||||
## bucket = conn.create_bucket(app.config.get('S3_BUCKET'))
|
||||
##
|
||||
## cert_up = {"public_cert": TEST_CERT, "private_key": TEST_KEY, "owner": "test@example.com", "accounts_ids": ['1111']}
|
||||
##
|
||||
## cert_name = upload(**cert_up)
|
||||
## valid_name = 'AHB-dfdsflkj.net-NetflixInc-20140525-20150525'
|
||||
## self.assertEqual(cert_name, valid_name)
|
||||
##
|
||||
## app.logger.debug(cert_name)
|
||||
## count = 0
|
||||
##
|
||||
## for key in bucket.list():
|
||||
## count += 1
|
||||
##
|
||||
## self.assertEqual(count, 2)
|
||||
## certs = get_all_server_certs('179727101194')
|
||||
## self.assertEqual(len(certs), 1)
|
||||
##
|
||||
##
|
||||
##
|
51
lemur/tests/test_elb.py
Normal file
51
lemur/tests/test_elb.py
Normal file
@ -0,0 +1,51 @@
|
||||
import boto
|
||||
from lemur.tests import LemurTestCase
|
||||
|
||||
from moto import mock_elb, mock_sts
|
||||
|
||||
|
||||
#class ELBTestCase(LemurTestCase):
|
||||
# @mock_sts
|
||||
# @mock_elb
|
||||
# def test_add_listener(self):
|
||||
# from lemur.common.services.aws.elb import create_new_listeners
|
||||
# conn = boto.connect_elb()
|
||||
# zones = ['us-east-1a', 'us-east-1b']
|
||||
# ports = [(80, 8080, 'http')]
|
||||
# conn.create_load_balancer('my-lb', zones, ports)
|
||||
# create_new_listeners('111', 'us-east-1', 'my-lb', listeners=[('443', '80', 'HTTP')])
|
||||
# balancer = conn.get_all_load_balancers()[0]
|
||||
# self.assertEqual(balancer.name, "my-lb")
|
||||
# self.assertEqual(len(balancer.listeners), 2)
|
||||
#
|
||||
# @mock_sts
|
||||
# @mock_elb
|
||||
# def test_update_listener(self):
|
||||
# from lemur.common.services.aws.elb import update_listeners
|
||||
# conn = boto.connect_elb()
|
||||
# zones = ['us-east-1a', 'us-east-1b']
|
||||
# ports = [(80, 8080, 'http')]
|
||||
# conn.create_load_balancer('my-lb', zones, ports)
|
||||
# update_listeners('111', 'us-east-1', 'my-lb', listeners=[('80', '7001', 'http')])
|
||||
# balancer = conn.get_all_load_balancers()[0]
|
||||
# listener = balancer.listeners[0]
|
||||
# self.assertEqual(listener.load_balancer_port, 80)
|
||||
# self.assertEqual(listener.instance_port, 7001)
|
||||
# self.assertEqual(listener.protocol, "HTTP")
|
||||
#
|
||||
# @mock_sts
|
||||
# @mock_elb
|
||||
# def test_set_certificate(self):
|
||||
# from lemur.common.services.aws.elb import attach_certificate
|
||||
# conn = boto.connect_elb()
|
||||
# zones = ['us-east-1a', 'us-east-1b']
|
||||
# ports = [(443, 7001, 'https', 'sslcert')]
|
||||
# conn.create_load_balancer('my-lb', zones, ports)
|
||||
# attach_certificate('1111', 'us-east-1', 'my-lb', 443, 'somecert')
|
||||
# balancer = conn.get_all_load_balancers()[0]
|
||||
# listener = balancer.listeners[0]
|
||||
# self.assertEqual(listener.load_balancer_port, 443)
|
||||
# self.assertEqual(listener.instance_port, 7001)
|
||||
# self.assertEqual(listener.protocol, "HTTPS")
|
||||
# self.assertEqual(listener.ssl_certificate_id, 'somecert')
|
||||
#
|
35
lemur/tests/test_iam.py
Normal file
35
lemur/tests/test_iam.py
Normal file
@ -0,0 +1,35 @@
|
||||
from lemur.tests import LemurTestCase
|
||||
|
||||
from lemur.certificates.models import Certificate
|
||||
|
||||
from moto import mock_iam, mock_sts
|
||||
|
||||
|
||||
#class IAMTestCase(LemurTestCase):
|
||||
# @mock_sts
|
||||
# @mock_iam
|
||||
# def test_get_all_server_certs(self):
|
||||
# from lemur.common.services.aws.iam import upload_cert, get_all_server_certs
|
||||
# cert = Certificate(TEST_CERT)
|
||||
# upload_cert('1111', cert, TEST_KEY)
|
||||
# certs = get_all_server_certs('1111')
|
||||
# self.assertEquals(len(certs), 1)
|
||||
#
|
||||
# @mock_sts
|
||||
# @mock_iam
|
||||
# def test_get_server_cert(self):
|
||||
# from lemur.common.services.aws.iam import upload_cert, get_cert_from_arn
|
||||
# cert = Certificate(TEST_CERT)
|
||||
# upload_cert('1111', cert, TEST_KEY)
|
||||
# body, chain = get_cert_from_arn('arn:aws:iam::123456789012:server-certificate/AHB-dfdsflkj.net-NetflixInc-20140525-20150525')
|
||||
# self.assertTrue(body)
|
||||
#
|
||||
# @mock_sts
|
||||
# @mock_iam
|
||||
# def test_upload_server_cert(self):
|
||||
# from lemur.common.services.aws.iam import upload_cert
|
||||
# cert = Certificate(TEST_CERT)
|
||||
# response = upload_cert('1111', cert, TEST_KEY)
|
||||
# self.assertEquals(response['upload_server_certificate_response']['upload_server_certificate_result']['server_certificate_metadata']['server_certificate_name'], 'AHB-dfdsflkj.net-NetflixInc-20140525-20150525')
|
||||
#
|
||||
#
|
16
lemur/tests/test_issuer_manager.py
Normal file
16
lemur/tests/test_issuer_manager.py
Normal file
@ -0,0 +1,16 @@
|
||||
from lemur.tests import LemurTestCase
|
||||
|
||||
#class ManagerTestCase(LemurTestCase):
|
||||
# def test_validate_authority(self):
|
||||
# pass
|
||||
#
|
||||
# def test_get_all_authorities(self):
|
||||
# from lemur.common.services.issuers.manager import get_all_authorities
|
||||
# authorities = get_all_authorities()
|
||||
# self.assertEqual(len(authorities), 3)
|
||||
#
|
||||
# def test_get_all_issuers(self):
|
||||
# from lemur.common.services.issuers.manager import get_all_issuers
|
||||
# issuers = get_all_issuers()
|
||||
# self.assertEqual(len(issuers) > 1)
|
||||
#
|
Loading…
Reference in New Issue
Block a user