From 5111f055fa92c52754791da3ae9caf92870bbf39 Mon Sep 17 00:00:00 2001 From: Kevin Glisson Date: Wed, 24 Jun 2015 16:48:40 -0700 Subject: [PATCH 01/13] Adding in some initial tests --- lemur/database.py | 7 +- lemur/exceptions.py | 8 ++ lemur/extensions.py | 36 +++++++- lemur/factory.py | 8 +- lemur/tests/__init__.py | 12 --- lemur/tests/certificates/test_certificates.py | 87 ------------------ lemur/tests/conftest.py | 91 +++++++++++++++++++ lemur/tests/constants.py | 51 ----------- lemur/tests/elbs/__init__.py | 0 lemur/tests/elbs/test_elbs.py | 5 - lemur/tests/listeners/__init__.py | 0 lemur/tests/services/__init__.py | 0 lemur/tests/services/test_elb.py | 51 ----------- lemur/tests/services/test_iam.py | 37 -------- lemur/tests/services/test_issuer_manager.py | 23 ----- lemur/tests/services/test_s3.py | 27 ------ lemur/tests/test_accounts.py | 53 +++++++++++ lemur/tests/test_certificates.py | 85 +++++++++++++++++ .../__init__.py => test_crypto.py} | 0 lemur/tests/{certificates => }/test_csr.py | 0 lemur/tests/test_elb.py | 51 +++++++++++ lemur/tests/test_iam.py | 35 +++++++ lemur/tests/test_issuer_manager.py | 16 ++++ .../test_pack/challenge.txt | 0 .../test_pack/csr_config.txt | 0 .../{certificates => }/test_pack/private.key | 0 .../{certificates => }/test_pack/request.csr | 0 .../{certificates => }/test_pack/server.crt | 0 setup.py | 3 +- 29 files changed, 387 insertions(+), 299 deletions(-) delete mode 100644 lemur/tests/certificates/test_certificates.py create mode 100644 lemur/tests/conftest.py delete mode 100644 lemur/tests/constants.py delete mode 100644 lemur/tests/elbs/__init__.py delete mode 100644 lemur/tests/elbs/test_elbs.py delete mode 100644 lemur/tests/listeners/__init__.py delete mode 100644 lemur/tests/services/__init__.py delete mode 100644 lemur/tests/services/test_elb.py delete mode 100644 lemur/tests/services/test_iam.py delete mode 100644 lemur/tests/services/test_issuer_manager.py delete mode 100644 lemur/tests/services/test_s3.py create mode 100644 lemur/tests/test_accounts.py create mode 100644 lemur/tests/test_certificates.py rename lemur/tests/{certificates/__init__.py => test_crypto.py} (100%) rename lemur/tests/{certificates => }/test_csr.py (100%) create mode 100644 lemur/tests/test_elb.py create mode 100644 lemur/tests/test_iam.py create mode 100644 lemur/tests/test_issuer_manager.py rename lemur/tests/{certificates => }/test_pack/challenge.txt (100%) rename lemur/tests/{certificates => }/test_pack/csr_config.txt (100%) rename lemur/tests/{certificates => }/test_pack/private.key (100%) rename lemur/tests/{certificates => }/test_pack/request.csr (100%) rename lemur/tests/{certificates => }/test_pack/server.crt (100%) diff --git a/lemur/database.py b/lemur/database.py index d2a7c742..fe591ced 100644 --- a/lemur/database.py +++ b/lemur/database.py @@ -15,7 +15,7 @@ from sqlalchemy import exc from sqlalchemy.sql import and_, or_ from lemur.extensions import db -from lemur.exceptions import AttrNotFound, IntegrityError +from lemur.exceptions import AttrNotFound, IntegrityError, DuplicateError def filter_none(kwargs): @@ -153,9 +153,10 @@ def create(model): try: db.session.add(model) commit() - db.session.refresh(model) except exc.IntegrityError as e: - raise IntegrityError(e.orig.diag.message_detail) + raise DuplicateError(e.orig.diag.message_detail) + + db.session.refresh(model) return model diff --git a/lemur/exceptions.py b/lemur/exceptions.py index f36bda21..42543a59 100644 --- a/lemur/exceptions.py +++ b/lemur/exceptions.py @@ -11,6 +11,14 @@ class LemurException(Exception): current_app.logger.error(self) +class DuplicateError(LemurException): + def __init__(self, key): + self.key = key + + def __str__(self): + return repr("Duplicate found! Could not create: {0}".format(self.key)) + + class AuthenticationFailedException(LemurException): def __init__(self, remote_ip, user_agent): self.remote_ip = remote_ip diff --git a/lemur/extensions.py b/lemur/extensions.py index 07101c4d..101432e8 100644 --- a/lemur/extensions.py +++ b/lemur/extensions.py @@ -4,7 +4,41 @@ :license: Apache, see LICENSE for more details. """ -from flask.ext.sqlalchemy import SQLAlchemy +from flask.ext.sqlalchemy import SQLAlchemy, SignallingSession, SessionBase + + +class _SignallingSession(SignallingSession): + """A subclass of `SignallingSession` that allows for `binds` to be specified + in the `options` keyword arguments. + + """ + def __init__(self, db, autocommit=False, autoflush=True, **options): + self.app = db.get_app() + self._model_changes = {} + self.emit_modification_signals = \ + self.app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] + + bind = options.pop('bind', None) + if bind is None: + bind = db.engine + + binds = options.pop('binds', None) + if binds is None: + binds = db.get_binds(self.app) + + SessionBase.__init__(self, + autocommit=autocommit, + autoflush=autoflush, + bind=bind, + binds=binds, + **options) + + +class _SQLAlchemy(SQLAlchemy): + """A subclass of `SQLAlchemy` that uses `_SignallingSession`.""" + def create_session(self, options): + return _SignallingSession(self, **options) + db = SQLAlchemy() from flask.ext.migrate import Migrate diff --git a/lemur/factory.py b/lemur/factory.py index 520ee977..9730ebaa 100644 --- a/lemur/factory.py +++ b/lemur/factory.py @@ -51,6 +51,12 @@ def create_app(app_name=None, blueprints=None, config=None): configure_blueprints(app, blueprints) configure_extensions(app) configure_logging(app) + + @app.teardown_appcontext + def teardown(exception=None): + if db.session: + db.session.remove() + return app @@ -84,7 +90,7 @@ def configure_app(app, config=None): :return: """ try: - app.config.from_envvar("LEMUR_SETTINGS") + app.config.from_envvar("LEMUR_CONF") except RuntimeError: if config and config != 'None': app.config.from_object(from_file(config)) diff --git a/lemur/tests/__init__.py b/lemur/tests/__init__.py index 0b6502e9..c293a1cc 100644 --- a/lemur/tests/__init__.py +++ b/lemur/tests/__init__.py @@ -1,16 +1,4 @@ import unittest -from nose.tools import eq_ - -from lemur import app - -test_app = app.test_client() - -HEADERS = {'Content-Type': 'application/json'} - - -def check_content_type(headers): - eq_(headers['Content-Type'], 'application/json') - class LemurTestCase(unittest.TestCase): pass diff --git a/lemur/tests/certificates/test_certificates.py b/lemur/tests/certificates/test_certificates.py deleted file mode 100644 index 1480818c..00000000 --- a/lemur/tests/certificates/test_certificates.py +++ /dev/null @@ -1,87 +0,0 @@ -import os -import shutil -import boto - -from lemur import app -from lemur.tests import LemurTestCase -from lemur.tests.constants import TEST_CERT, TEST_KEY - -from moto import mock_iam, mock_sts, mock_s3 - - -class CertificateTestCase(LemurTestCase): - def test_create_challenge(self): - from lemur.certificates.service import create_challenge - self.assertTrue(len(create_challenge()) >= 24) - - def test_hash_domains(self): - from lemur.certificates.service import hash_domains - h = hash_domains(['netflix.com', 'www.netflix.com', 'movies.netflix.com']) - self.assertEqual('c9c83253b46c7c1245c100ed3f7045eb', h) - - def test_create_csr(self): - from lemur.certificates.service import create_csr - from lemur.tests.certificates.test_csr import TEST_CSR - path = create_csr(['netflix.com'], TEST_CSR) - files = len(os.listdir(path)) - self.assertEqual(files, 4) - shutil.rmtree(path) - - def test_create_san_csr(self): - from lemur.certificates.service import create_csr - from lemur.tests.certificates.test_csr import TEST_CSR - path = create_csr(['netflix.com', 'www.netflix.com'], TEST_CSR) - files = len(os.listdir(path)) - self.assertEqual(files, 4) - shutil.rmtree(path) - - def test_create_path(self): - from lemur.certificates.service import create_path - path = create_path("blah") - self.assertIn('blah', path) - shutil.rmtree(path) - - @mock_s3 - @mock_sts - @mock_iam - def test_save_cert(self): - from lemur.certificates.service import save_cert - from lemur.common.services.aws.iam import get_all_server_certs - conn = boto.connect_s3() - bucket = conn.create_bucket(app.config.get('S3_BUCKET')) - cert = save_cert(TEST_CERT, TEST_KEY, None, "blah", "blah", [1]) - count = 0 - for key in bucket.list(): - count += 1 - - self.assertEqual(count, 4) - certs = get_all_server_certs('1111') - self.assertEqual(len(certs), 1) - -# @mock_s3 -# @mock_sts -# @mock_iam -# def test_upload_cert(self): -# from lemur.certificates.service import upload -# from lemur.common.services.aws.iam import get_all_server_certs -# conn = boto.connect_s3() -# bucket = conn.create_bucket(app.config.get('S3_BUCKET')) -# -# cert_up = {"public_cert": TEST_CERT, "private_key": TEST_KEY, "owner": "test@example.com", "accounts_ids": ['1111']} -# -# cert_name = upload(**cert_up) -# valid_name = 'AHB-dfdsflkj.net-NetflixInc-20140525-20150525' -# self.assertEqual(cert_name, valid_name) -# -# app.logger.debug(cert_name) -# count = 0 -# -# for key in bucket.list(): -# count += 1 -# -# self.assertEqual(count, 2) -# certs = get_all_server_certs('179727101194') -# self.assertEqual(len(certs), 1) -# -# -# diff --git a/lemur/tests/conftest.py b/lemur/tests/conftest.py new file mode 100644 index 00000000..135b5ca7 --- /dev/null +++ b/lemur/tests/conftest.py @@ -0,0 +1,91 @@ +import pytest + +from lemur import create_app +from lemur.database import db as _db + +from flask.ext.sqlalchemy import SignallingSession + +from sqlalchemy import event + + +def pytest_addoption(parser): + parser.addoption("--runslow", action="store_true", help="run slow tests") + + +def pytest_runtest_setup(item): + if 'slow' in item.keywords and not item.config.getoption("--runslow"): + pytest.skip("need --runslow option to run") + + if "incremental" in item.keywords: + previousfailed = getattr(item.parent, "_previousfailed", None) + if previousfailed is not None: + pytest.xfail("previous test failed ({0})".format(previousfailed.name)) + + +def pytest_runtest_makereport(item, call): + if "incremental" in item.keywords: + if call.excinfo is not None: + parent = item.parent + parent._previousfailed = item + + +@pytest.yield_fixture(scope="session") +def app(): + """ + Creates a new Flask application for a test duration. + Uses application factory `create_app`. + """ + app = create_app() + + ctx = app.app_context() + ctx.push() + + yield app + + ctx.pop() + + +@pytest.yield_fixture(scope="session") +def db(): + _db.create_all() + + yield _db + + _db.drop_all() + + +@pytest.yield_fixture(scope="function") +def session(app, db): + """ + Creates a new database session with (with working transaction) + for test duration. + """ + connection = _db.engine.connect() + transaction = connection.begin() + + options = dict(bind=connection) + session = _db.create_scoped_session(options=options) + + # then each time that SAVEPOINT ends, reopen it + @event.listens_for(SignallingSession, "after_transaction_end") + def restart_savepoint(session, transaction): + if transaction.nested and not transaction._parent.nested: + + # ensure that state is expired the way + # session.commit() at the top level normally does + # (optional step) + session.expire_all() + + session.begin_nested() + + # pushing new Flask application context for multiple-thread + # tests to work + + _db.session = session + + yield session + + # the code after the yield statement works as a teardown + transaction.rollback() + connection.close() + session.remove() diff --git a/lemur/tests/constants.py b/lemur/tests/constants.py deleted file mode 100644 index 8632e76a..00000000 --- a/lemur/tests/constants.py +++ /dev/null @@ -1,51 +0,0 @@ -TEST_KEY = """-----BEGIN RSA PRIVATE KEY----- -MIIEogIBAAKCAQEAvNudwW+UeQqkpY71MIdEg501AFlPKuOXG2xU8DZhvZS6dKv+ -kDmIWdEqodDgkQiy0jyTgTwxwRqDSw96R6ZgrXefUoJJo66aCsosTBZtVaE85f1L -bj2+3U678c+rekUdkrnGcGCo6b8QtdvBpiDy2clneox8tSvmffAdcR1uCv/790/k -PzQ/djWDX9JcBRyDkcTJwYC0/ek7URvA/+MXmgUL13T+gWKqduaKuIBlFetonDjn -nO11QUBiusIuHV62wzKn8m5Nc+4XoaBR0YWMFn/g6qXDYrwfCsMpka7vSWJFv5Ff -yf+7kY3wU4xIwU2vXlIDcCsdUu6b/pYoQ0YOsQIDAQABAoIBAGbFH6iWnnXrq8MH -8zcQNOFmF+RztRgCt0TOA76f6TowB/LbcXBsTl2J7CgYMUvbLuwm2KHX7r9FPTMI -XiNFT5C16rYMfiQbLGo4sDhLb/3L+wawem6oHQfzA2VH++lSWRByFaEriF+CgIZl -6pALl/uZlLzkXCx+kjPwCSV3vV0wFkDnNs6+wPrz2IhkePsuC8J0QKQLlwsES2It -Gizzhpehdv9lc9MyZC//1QlD9gMDl5ok5Bt1Xm2c12XUEEcLlKQkJxiOrBOfXPmV -PHCdLc7gZO30hc6dyQ1SSnLpywhz/a0ir2GMvkMbS5hculpcZmwEcdZl1HYD8ObP -yOMbPE0CgYEA4LVGJKGtbM8RiBB0MstxNstMYVJ4mXB0lSQ0RazdO3S3ojn+oLpF -b2pvV6m9WnHiCGigWkzhqtGGCo6aqE0MoiR4jTN8GhiZz4ggDDaVgc4Px5reUD+r -tRsTpBHseGQ+ODGgkMI8eJYkdyqkECkYjAOrdy6uorvgxUAZecRIfJMCgYEA1yhM -7NidTNRuA+huS5GcQwQweTM6P1qF7Kfk1JYQMVu4gibLZiLHlWCyHI9lrbI7IaMm -g/4jXXoewv7IvyrrSEFulkPeVWxCe3mjfQ8JANfUj4kuR915LSn4lX2pbUgUS66K -vJSUJtnzLUmb8khLEcOmDbmTFZl8D/bTHFFZlisCgYAeelfWNhuoq3lMRDcOgKuN -bAujE6WJ4kfdxrhUTvr+ynjxxv3zXPB4CS6q7Dnjn5ix3UcKmGzvV1Xf7rGpbDHv -eBTlyfrmKzoJfQQjw++JWKKpRycqKUin2tFSKqAxQB90Tb7ig4XiMTMm+qCgFILg -0sqZ8rn7FpKJDoWmD2ppgwKBgG2Dl9QeVcKbhfv7PNi+HvmFkl6+knFY1D4nHzSN -xWQ6OWoV8QXlwgzokQA0hR6qT6rJbntUyg90b1/1a5zSbbvzgiR+GxcD6bsLqQmo -s354XTtKKgJuWpWAfYUp1ylGvP3gs8FVJyu3WC2+/9+MqJk8KrNlt9YQr7M4gTAy -wBTNAoGAGU7Po4uI3xDKGLLK/ot3D3P8U9ByfeLlrUZtTz1PASsMOr92bkXmUPlE -DYUd5uFfwwlvbMNT1Ooeyrzg3bARd9B6ATyMkOaJeGoQwFAI468iucnm9rNXB+/t -U2rbIi1pXSm8zSNEY85tf6C8DU/5YbcAPf47a2UYhwCpYAJfMk0= ------END RSA PRIVATE KEY-----""" - -TEST_CERT = """-----BEGIN CERTIFICATE----- -MIIDcDCCAlgCCQC8msHu/aa61zANBgkqhkiG9w0BAQUFADB6MQswCQYDVQQGEwJV -UzETMBEGA1UECBMKQ0FMSUZPUk5JQTESMBAGA1UEBxMJTG9zIEdhdG9zMRYwFAYD -VQQKEw1OZXRmbGl4LCBJbmMuMRMwEQYDVQQLEwpPcGVyYXRpb25zMRUwEwYDVQQD -EwxkZmRzZmxrai5uZXQwHhcNMTQwNTI1MTczMDMzWhcNMTUwNTI1MTczMDMzWjB6 -MQswCQYDVQQGEwJVUzETMBEGA1UECBMKQ0FMSUZPUk5JQTESMBAGA1UEBxMJTG9z -IEdhdG9zMRYwFAYDVQQKEw1OZXRmbGl4LCBJbmMuMRMwEQYDVQQLEwpPcGVyYXRp -b25zMRUwEwYDVQQDEwxkZmRzZmxrai5uZXQwggEiMA0GCSqGSIb3DQEBAQUAA4IB -DwAwggEKAoIBAQC8253Bb5R5CqSljvUwh0SDnTUAWU8q45cbbFTwNmG9lLp0q/6Q -OYhZ0Sqh0OCRCLLSPJOBPDHBGoNLD3pHpmCtd59SgkmjrpoKyixMFm1VoTzl/Utu -Pb7dTrvxz6t6RR2SucZwYKjpvxC128GmIPLZyWd6jHy1K+Z98B1xHW4K//v3T+Q/ -ND92NYNf0lwFHIORxMnBgLT96TtRG8D/4xeaBQvXdP6BYqp25oq4gGUV62icOOec -7XVBQGK6wi4dXrbDMqfybk1z7hehoFHRhYwWf+DqpcNivB8KwymRru9JYkW/kV/J -/7uRjfBTjEjBTa9eUgNwKx1S7pv+lihDRg6xAgMBAAEwDQYJKoZIhvcNAQEFBQAD -ggEBAJHwa4l2iSiFBb6wVFBJEWEt31qp+njiVCoTg2OJzCT60Xb26hkrsiTldIIh -eB9+y+fwdfwopzWhkNbIOlCfudx/uxtpor8/3BRbjSlNwDUg2L8pfAircJMFLQUM -O6nqPOBWCe8hXwe9FQM/oFOavf/AAw/FED+892xlytjirK9u3B28O20W11+fY7hp -8LQVBrMoVxFeLWmmwETAltJ7HEYutplRzYTM0vLBARl4Vd5kLJlY3j2Dp1ZpRGcg -CrQp26UD/oaAPGtiZQSC4LJ+4JfOuuqbm3CI24QMCh9rxv3ZoOQnFuC+7cZgqrat -V4bxCrVvWhrrDSgy9+A80NVzQ3k= ------END CERTIFICATE-----""" - - diff --git a/lemur/tests/elbs/__init__.py b/lemur/tests/elbs/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/lemur/tests/elbs/test_elbs.py b/lemur/tests/elbs/test_elbs.py deleted file mode 100644 index 1d31fd12..00000000 --- a/lemur/tests/elbs/test_elbs.py +++ /dev/null @@ -1,5 +0,0 @@ -import os -import shutil -from lemur import app -from lemur.tests import LemurTestCase - diff --git a/lemur/tests/listeners/__init__.py b/lemur/tests/listeners/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/lemur/tests/services/__init__.py b/lemur/tests/services/__init__.py deleted file mode 100644 index e69de29b..00000000 diff --git a/lemur/tests/services/test_elb.py b/lemur/tests/services/test_elb.py deleted file mode 100644 index 0303f77e..00000000 --- a/lemur/tests/services/test_elb.py +++ /dev/null @@ -1,51 +0,0 @@ -import boto -from lemur.tests import LemurTestCase - -from moto import mock_elb, mock_sts - - -class ELBTestCase(LemurTestCase): - @mock_sts - @mock_elb - def test_add_listener(self): - from lemur.common.services.aws.elb import create_new_listeners - conn = boto.connect_elb() - zones = ['us-east-1a', 'us-east-1b'] - ports = [(80, 8080, 'http')] - conn.create_load_balancer('my-lb', zones, ports) - create_new_listeners('111', 'us-east-1', 'my-lb', listeners=[('443', '80', 'HTTP')]) - balancer = conn.get_all_load_balancers()[0] - self.assertEqual(balancer.name, "my-lb") - self.assertEqual(len(balancer.listeners), 2) - - @mock_sts - @mock_elb - def test_update_listener(self): - from lemur.common.services.aws.elb import update_listeners - conn = boto.connect_elb() - zones = ['us-east-1a', 'us-east-1b'] - ports = [(80, 8080, 'http')] - conn.create_load_balancer('my-lb', zones, ports) - update_listeners('111', 'us-east-1', 'my-lb', listeners=[('80', '7001', 'http')]) - balancer = conn.get_all_load_balancers()[0] - listener = balancer.listeners[0] - self.assertEqual(listener.load_balancer_port, 80) - self.assertEqual(listener.instance_port, 7001) - self.assertEqual(listener.protocol, "HTTP") - - @mock_sts - @mock_elb - def test_set_certificate(self): - from lemur.common.services.aws.elb import attach_certificate - conn = boto.connect_elb() - zones = ['us-east-1a', 'us-east-1b'] - ports = [(443, 7001, 'https', 'sslcert')] - conn.create_load_balancer('my-lb', zones, ports) - attach_certificate('1111', 'us-east-1', 'my-lb', 443, 'somecert') - balancer = conn.get_all_load_balancers()[0] - listener = balancer.listeners[0] - self.assertEqual(listener.load_balancer_port, 443) - self.assertEqual(listener.instance_port, 7001) - self.assertEqual(listener.protocol, "HTTPS") - self.assertEqual(listener.ssl_certificate_id, 'somecert') - diff --git a/lemur/tests/services/test_iam.py b/lemur/tests/services/test_iam.py deleted file mode 100644 index 68315e1b..00000000 --- a/lemur/tests/services/test_iam.py +++ /dev/null @@ -1,37 +0,0 @@ -from lemur import app -from lemur.tests import LemurTestCase -from lemur.tests.constants import TEST_CERT, TEST_KEY - -from lemur.certificates.models import Certificate - -from moto import mock_iam, mock_sts - - -class IAMTestCase(LemurTestCase): - @mock_sts - @mock_iam - def test_get_all_server_certs(self): - from lemur.common.services.aws.iam import upload_cert, get_all_server_certs - cert = Certificate(TEST_CERT) - upload_cert('1111', cert, TEST_KEY) - certs = get_all_server_certs('1111') - self.assertEquals(len(certs), 1) - - @mock_sts - @mock_iam - def test_get_server_cert(self): - from lemur.common.services.aws.iam import upload_cert, get_cert_from_arn - cert = Certificate(TEST_CERT) - upload_cert('1111', cert, TEST_KEY) - body, chain = get_cert_from_arn('arn:aws:iam::123456789012:server-certificate/AHB-dfdsflkj.net-NetflixInc-20140525-20150525') - self.assertTrue(body) - - @mock_sts - @mock_iam - def test_upload_server_cert(self): - from lemur.common.services.aws.iam import upload_cert - cert = Certificate(TEST_CERT) - response = upload_cert('1111', cert, TEST_KEY) - self.assertEquals(response['upload_server_certificate_response']['upload_server_certificate_result']['server_certificate_metadata']['server_certificate_name'], 'AHB-dfdsflkj.net-NetflixInc-20140525-20150525') - - diff --git a/lemur/tests/services/test_issuer_manager.py b/lemur/tests/services/test_issuer_manager.py deleted file mode 100644 index 6408438d..00000000 --- a/lemur/tests/services/test_issuer_manager.py +++ /dev/null @@ -1,23 +0,0 @@ -from lemur import app -from lemur.tests import LemurTestCase -from lemur.tests.constants import TEST_CERT, TEST_KEY - -from lemur.certificates.models import Certificate - -from moto import mock_iam, mock_sts - - -class ManagerTestCase(LemurTestCase): - def test_validate_authority(self): - pass - - def test_get_all_authorities(self): - from lemur.common.services.issuers.manager import get_all_authorities - authorities = get_all_authorities() - self.assertEqual(len(authorities), 3) - - def test_get_all_issuers(self): - from lemur.common.services.issuers.manager import get_all_issuers - issuers = get_all_issuers() - self.assertEqual(len(issuers) > 1) - diff --git a/lemur/tests/services/test_s3.py b/lemur/tests/services/test_s3.py deleted file mode 100644 index f5de669f..00000000 --- a/lemur/tests/services/test_s3.py +++ /dev/null @@ -1,27 +0,0 @@ -import boto - -from lemur.tests import LemurTestCase -from lemur.tests.constants import TEST_CERT - -from lemur.certificates.models import Certificate - -from moto import mock_s3 - - -class S3TestCase(LemurTestCase): - @mock_s3 - def test_save(self): - from lemur.common.services.aws.s3 import save - conn = boto.connect_s3() - - cert = Certificate(TEST_CERT) - - buck = conn.create_bucket('test') - path = save(cert, 'private_key', None, 'csr_config', 'challenge') - self.assertEqual(path, 'lemur/{}/{}/'.format(cert.issuer, cert.name)) - - count = 0 - for key in buck.list(): - count += 1 - - self.assertEqual(count, 4) diff --git a/lemur/tests/test_accounts.py b/lemur/tests/test_accounts.py new file mode 100644 index 00000000..19f5bd8d --- /dev/null +++ b/lemur/tests/test_accounts.py @@ -0,0 +1,53 @@ + +import pytest +from lemur.accounts.service import * +from lemur.exceptions import DuplicateError + +from lemur.accounts.views import * + +#def test_crud(session): +# account = create('111111', 'account1') +# assert account.id > 0 +# +# account = update(account.id, 11111, 'account2') +# assert account.label == 'account2' +# +# assert len(get_all()) == 1 +# +# delete(1) +# assert len(get_all()) == 0 +# + +#def test_duplicate(session): +# account = create('111111', 'account1') +# assert account.id > 0 +# +# with pytest.raises(DuplicateError): +# account = create('111111', 'account1') + + +def test_basic_user_views(client): + pass + + +def test_admin_user_views(client): + pass + +def test_unauthenticated_views(client): + assert client.get(api.url_for(Accounts, account_id=1)).status_code == 401 + assert client.post(api.url_for(Accounts, account_id=1), {}).status_code == 405 + assert client.put(api.url_for(Accounts, account_id=1), {}).status_code == 401 + assert client.delete(api.url_for(Accounts, account_id=1)).status_code == 401 + assert client.patch(api.url_for(Accounts, account_id=1), {}).status_code == 405 + + assert client.get(api.url_for(AccountsList)).status_code == 401 + assert client.post(api.url_for(AccountsList), {}).status_code == 401 + assert client.put(api.url_for(AccountsList), {}).status_code == 405 + assert client.delete(api.url_for(AccountsList)).status_code == 405 + assert client.patch(api.url_for(Accounts), {}).status_code == 405 + + assert client.get(api.url_for(CertificateAccounts, certificate_id=1)).status_code == 401 + assert client.post(api.url_for(CertificateAccounts), {}).status_code == 405 + assert client.put(api.url_for(CertificateAccounts), {}).status_code == 405 + assert client.delete(api.url_for(CertificateAccounts)).status_code == 405 + assert client.patch(api.url_for(CertificateAccounts), {}).status_code == 405 diff --git a/lemur/tests/test_certificates.py b/lemur/tests/test_certificates.py new file mode 100644 index 00000000..e24251c5 --- /dev/null +++ b/lemur/tests/test_certificates.py @@ -0,0 +1,85 @@ +import os +import shutil + +import boto +from moto import mock_iam, mock_sts, mock_s3 + +from lemur.tests import LemurTestCase + + +#class CertificateTestCase(LemurTestCase): +# def test_create_challenge(self): +# from lemur.certificates.service import create_challenge +# self.assertTrue(len(create_challenge()) >= 24) +# +# def test_hash_domains(self): +# from lemur.certificates.service import hash_domains +# h = hash_domains(['netflix.com', 'www.netflix.com', 'movies.netflix.com']) +# self.assertEqual('c9c83253b46c7c1245c100ed3f7045eb', h) +# +# def test_create_csr(self): +# from lemur.certificates.service import create_csr +# from lemur.tests.test_csr import TEST_CSR +# path = create_csr(['netflix.com'], TEST_CSR) +# files = len(os.listdir(path)) +# self.assertEqual(files, 4) +# shutil.rmtree(path) +# +# def test_create_san_csr(self): +# from lemur.certificates.service import create_csr +# from lemur.tests.test_csr import TEST_CSR +# path = create_csr(['netflix.com', 'www.netflix.com'], TEST_CSR) +# files = len(os.listdir(path)) +# self.assertEqual(files, 4) +# shutil.rmtree(path) +# +# def test_create_path(self): +# from lemur.certificates.service import create_path +# path = create_path("blah") +# self.assertIn('blah', path) +# shutil.rmtree(path) +# +# @mock_s3 +# @mock_sts +# @mock_iam +# def test_save_cert(self): +# from lemur.certificates.service import save_cert +# from lemur.common.services.aws.iam import get_all_server_certs +# conn = boto.connect_s3() +# bucket = conn.create_bucket(app.config.get('S3_BUCKET')) +# cert = save_cert(TEST_CERT, TEST_KEY, None, "blah", "blah", [1]) +# count = 0 +# for key in bucket.list(): +# count += 1 +# +# self.assertEqual(count, 4) +# certs = get_all_server_certs('1111') +# self.assertEqual(len(certs), 1) +# +## @mock_s3 +## @mock_sts +## @mock_iam +## def test_upload_cert(self): +## from lemur.certificates.service import upload +## from lemur.common.services.aws.iam import get_all_server_certs +## conn = boto.connect_s3() +## bucket = conn.create_bucket(app.config.get('S3_BUCKET')) +## +## cert_up = {"public_cert": TEST_CERT, "private_key": TEST_KEY, "owner": "test@example.com", "accounts_ids": ['1111']} +## +## cert_name = upload(**cert_up) +## valid_name = 'AHB-dfdsflkj.net-NetflixInc-20140525-20150525' +## self.assertEqual(cert_name, valid_name) +## +## app.logger.debug(cert_name) +## count = 0 +## +## for key in bucket.list(): +## count += 1 +## +## self.assertEqual(count, 2) +## certs = get_all_server_certs('179727101194') +## self.assertEqual(len(certs), 1) +## +## +## diff --git a/lemur/tests/certificates/__init__.py b/lemur/tests/test_crypto.py similarity index 100% rename from lemur/tests/certificates/__init__.py rename to lemur/tests/test_crypto.py diff --git a/lemur/tests/certificates/test_csr.py b/lemur/tests/test_csr.py similarity index 100% rename from lemur/tests/certificates/test_csr.py rename to lemur/tests/test_csr.py diff --git a/lemur/tests/test_elb.py b/lemur/tests/test_elb.py new file mode 100644 index 00000000..bc4a10c7 --- /dev/null +++ b/lemur/tests/test_elb.py @@ -0,0 +1,51 @@ +import boto +from lemur.tests import LemurTestCase + +from moto import mock_elb, mock_sts + + +#class ELBTestCase(LemurTestCase): +# @mock_sts +# @mock_elb +# def test_add_listener(self): +# from lemur.common.services.aws.elb import create_new_listeners +# conn = boto.connect_elb() +# zones = ['us-east-1a', 'us-east-1b'] +# ports = [(80, 8080, 'http')] +# conn.create_load_balancer('my-lb', zones, ports) +# create_new_listeners('111', 'us-east-1', 'my-lb', listeners=[('443', '80', 'HTTP')]) +# balancer = conn.get_all_load_balancers()[0] +# self.assertEqual(balancer.name, "my-lb") +# self.assertEqual(len(balancer.listeners), 2) +# +# @mock_sts +# @mock_elb +# def test_update_listener(self): +# from lemur.common.services.aws.elb import update_listeners +# conn = boto.connect_elb() +# zones = ['us-east-1a', 'us-east-1b'] +# ports = [(80, 8080, 'http')] +# conn.create_load_balancer('my-lb', zones, ports) +# update_listeners('111', 'us-east-1', 'my-lb', listeners=[('80', '7001', 'http')]) +# balancer = conn.get_all_load_balancers()[0] +# listener = balancer.listeners[0] +# self.assertEqual(listener.load_balancer_port, 80) +# self.assertEqual(listener.instance_port, 7001) +# self.assertEqual(listener.protocol, "HTTP") +# +# @mock_sts +# @mock_elb +# def test_set_certificate(self): +# from lemur.common.services.aws.elb import attach_certificate +# conn = boto.connect_elb() +# zones = ['us-east-1a', 'us-east-1b'] +# ports = [(443, 7001, 'https', 'sslcert')] +# conn.create_load_balancer('my-lb', zones, ports) +# attach_certificate('1111', 'us-east-1', 'my-lb', 443, 'somecert') +# balancer = conn.get_all_load_balancers()[0] +# listener = balancer.listeners[0] +# self.assertEqual(listener.load_balancer_port, 443) +# self.assertEqual(listener.instance_port, 7001) +# self.assertEqual(listener.protocol, "HTTPS") +# self.assertEqual(listener.ssl_certificate_id, 'somecert') +# diff --git a/lemur/tests/test_iam.py b/lemur/tests/test_iam.py new file mode 100644 index 00000000..2405f9b5 --- /dev/null +++ b/lemur/tests/test_iam.py @@ -0,0 +1,35 @@ +from lemur.tests import LemurTestCase + +from lemur.certificates.models import Certificate + +from moto import mock_iam, mock_sts + + +#class IAMTestCase(LemurTestCase): +# @mock_sts +# @mock_iam +# def test_get_all_server_certs(self): +# from lemur.common.services.aws.iam import upload_cert, get_all_server_certs +# cert = Certificate(TEST_CERT) +# upload_cert('1111', cert, TEST_KEY) +# certs = get_all_server_certs('1111') +# self.assertEquals(len(certs), 1) +# +# @mock_sts +# @mock_iam +# def test_get_server_cert(self): +# from lemur.common.services.aws.iam import upload_cert, get_cert_from_arn +# cert = Certificate(TEST_CERT) +# upload_cert('1111', cert, TEST_KEY) +# body, chain = get_cert_from_arn('arn:aws:iam::123456789012:server-certificate/AHB-dfdsflkj.net-NetflixInc-20140525-20150525') +# self.assertTrue(body) +# +# @mock_sts +# @mock_iam +# def test_upload_server_cert(self): +# from lemur.common.services.aws.iam import upload_cert +# cert = Certificate(TEST_CERT) +# response = upload_cert('1111', cert, TEST_KEY) +# self.assertEquals(response['upload_server_certificate_response']['upload_server_certificate_result']['server_certificate_metadata']['server_certificate_name'], 'AHB-dfdsflkj.net-NetflixInc-20140525-20150525') +# +# diff --git a/lemur/tests/test_issuer_manager.py b/lemur/tests/test_issuer_manager.py new file mode 100644 index 00000000..5eaec09b --- /dev/null +++ b/lemur/tests/test_issuer_manager.py @@ -0,0 +1,16 @@ +from lemur.tests import LemurTestCase + +#class ManagerTestCase(LemurTestCase): +# def test_validate_authority(self): +# pass +# +# def test_get_all_authorities(self): +# from lemur.common.services.issuers.manager import get_all_authorities +# authorities = get_all_authorities() +# self.assertEqual(len(authorities), 3) +# +# def test_get_all_issuers(self): +# from lemur.common.services.issuers.manager import get_all_issuers +# issuers = get_all_issuers() +# self.assertEqual(len(issuers) > 1) +# diff --git a/lemur/tests/certificates/test_pack/challenge.txt b/lemur/tests/test_pack/challenge.txt similarity index 100% rename from lemur/tests/certificates/test_pack/challenge.txt rename to lemur/tests/test_pack/challenge.txt diff --git a/lemur/tests/certificates/test_pack/csr_config.txt b/lemur/tests/test_pack/csr_config.txt similarity index 100% rename from lemur/tests/certificates/test_pack/csr_config.txt rename to lemur/tests/test_pack/csr_config.txt diff --git a/lemur/tests/certificates/test_pack/private.key b/lemur/tests/test_pack/private.key similarity index 100% rename from lemur/tests/certificates/test_pack/private.key rename to lemur/tests/test_pack/private.key diff --git a/lemur/tests/certificates/test_pack/request.csr b/lemur/tests/test_pack/request.csr similarity index 100% rename from lemur/tests/certificates/test_pack/request.csr rename to lemur/tests/test_pack/request.csr diff --git a/lemur/tests/certificates/test_pack/server.crt b/lemur/tests/test_pack/server.crt similarity index 100% rename from lemur/tests/certificates/test_pack/server.crt rename to lemur/tests/test_pack/server.crt diff --git a/setup.py b/setup.py index ad3815a0..5e07860b 100644 --- a/setup.py +++ b/setup.py @@ -46,7 +46,8 @@ install_requires=[ tests_require = [ 'pyflakes', 'moto', - 'nose' + 'nose', + 'pytest' ] docs_require = [ From 75e5bdfa55476d70e1354b730bf6b6d52f5616e3 Mon Sep 17 00:00:00 2001 From: Kevin Glisson Date: Thu, 25 Jun 2015 13:43:42 -0700 Subject: [PATCH 02/13] Adding some structure for authenticated tests --- lemur/accounts/views.py | 2 +- lemur/auth/service.py | 3 +-- lemur/tests/conftest.py | 45 ++++++++++++++++++++++++++---------- lemur/tests/test_accounts.py | 13 +++-------- 4 files changed, 38 insertions(+), 25 deletions(-) diff --git a/lemur/accounts/views.py b/lemur/accounts/views.py index b3304af4..2729ec47 100644 --- a/lemur/accounts/views.py +++ b/lemur/accounts/views.py @@ -181,7 +181,7 @@ class Accounts(AuthenticatedResource): @marshal_items(FIELDS) def put(self, account_id): """ - .. http:post:: /accounts/1 + .. http:put:: /accounts/1 Updates an account diff --git a/lemur/auth/service.py b/lemur/auth/service.py index 0675f640..facad7c4 100644 --- a/lemur/auth/service.py +++ b/lemur/auth/service.py @@ -96,9 +96,8 @@ def login_required(f): response.status_code = 401 return response - token = request.headers.get('Authorization').split()[1] - try: + token = request.headers.get('Authorization').split()[1] payload = jwt.decode(token, current_app.config['TOKEN_SECRET']) except jwt.DecodeError: return dict(message='Token is invalid'), 403 diff --git a/lemur/tests/conftest.py b/lemur/tests/conftest.py index 135b5ca7..2d680850 100644 --- a/lemur/tests/conftest.py +++ b/lemur/tests/conftest.py @@ -1,9 +1,11 @@ import pytest +from flask import current_app + from lemur import create_app -from lemur.database import db as _db from flask.ext.sqlalchemy import SignallingSession +from flask.ext.principal import Identity, identity_changed from sqlalchemy import event @@ -45,26 +47,45 @@ def app(): ctx.pop() -@pytest.yield_fixture(scope="session") -def db(): - _db.create_all() - - yield _db - - _db.drop_all() +@pytest.yield_fixture(scope="function") +def unauth_client(app): + with app.test_client() as client: + yield client @pytest.yield_fixture(scope="function") -def session(app, db): +def auth_client(app): + with app.test_client() as client: + yield client + + +@pytest.yield_fixture(scope="function") +def admin_client(app): + with app.test_client() as client: + yield client + + + +@pytest.yield_fixture(scope="session") +def database(app): + app.db.create_all() + + yield app.db + + app.db.drop_all() + + +@pytest.yield_fixture(scope="function") +def session(database): """ Creates a new database session with (with working transaction) for test duration. """ - connection = _db.engine.connect() + connection = database.engine.connect() transaction = connection.begin() options = dict(bind=connection) - session = _db.create_scoped_session(options=options) + session = database.create_scoped_session(options=options) # then each time that SAVEPOINT ends, reopen it @event.listens_for(SignallingSession, "after_transaction_end") @@ -81,7 +102,7 @@ def session(app, db): # pushing new Flask application context for multiple-thread # tests to work - _db.session = session + database.session = session yield session diff --git a/lemur/tests/test_accounts.py b/lemur/tests/test_accounts.py index 19f5bd8d..3d1de94d 100644 --- a/lemur/tests/test_accounts.py +++ b/lemur/tests/test_accounts.py @@ -40,14 +40,7 @@ def test_unauthenticated_views(client): assert client.delete(api.url_for(Accounts, account_id=1)).status_code == 401 assert client.patch(api.url_for(Accounts, account_id=1), {}).status_code == 405 - assert client.get(api.url_for(AccountsList)).status_code == 401 - assert client.post(api.url_for(AccountsList), {}).status_code == 401 - assert client.put(api.url_for(AccountsList), {}).status_code == 405 - assert client.delete(api.url_for(AccountsList)).status_code == 405 - assert client.patch(api.url_for(Accounts), {}).status_code == 405 +VALID_TOKEN = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyMzMzNjksInN1YiI6MSwiZXhwIjoxNTIxNTQ2OTY5fQ.1qCi0Ip7mzKbjNh0tVd3_eJOrae3rNa_9MCVdA4WtQI' - assert client.get(api.url_for(CertificateAccounts, certificate_id=1)).status_code == 401 - assert client.post(api.url_for(CertificateAccounts), {}).status_code == 405 - assert client.put(api.url_for(CertificateAccounts), {}).status_code == 405 - assert client.delete(api.url_for(CertificateAccounts)).status_code == 405 - assert client.patch(api.url_for(CertificateAccounts), {}).status_code == 405 +def test_auth_account_get(auth_client): + assert auth_client.get(api.url_for(Accounts, account_id=1), headers={'Authorization': 'Basic ' + VALID_TOKEN}).status_code == 200 \ No newline at end of file From 2a3fac11e48573f50437d78a1db5b1bc83433c38 Mon Sep 17 00:00:00 2001 From: Kevin Glisson Date: Thu, 25 Jun 2015 18:05:52 -0700 Subject: [PATCH 03/13] Adding more tests to the accounts model --- lemur/extensions.py | 36 +-------- lemur/tests/conftest.py | 90 ++++++++------------- lemur/tests/test_accounts.py | 150 +++++++++++++++++++++++++++-------- 3 files changed, 152 insertions(+), 124 deletions(-) diff --git a/lemur/extensions.py b/lemur/extensions.py index 101432e8..07101c4d 100644 --- a/lemur/extensions.py +++ b/lemur/extensions.py @@ -4,41 +4,7 @@ :license: Apache, see LICENSE for more details. """ -from flask.ext.sqlalchemy import SQLAlchemy, SignallingSession, SessionBase - - -class _SignallingSession(SignallingSession): - """A subclass of `SignallingSession` that allows for `binds` to be specified - in the `options` keyword arguments. - - """ - def __init__(self, db, autocommit=False, autoflush=True, **options): - self.app = db.get_app() - self._model_changes = {} - self.emit_modification_signals = \ - self.app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] - - bind = options.pop('bind', None) - if bind is None: - bind = db.engine - - binds = options.pop('binds', None) - if binds is None: - binds = db.get_binds(self.app) - - SessionBase.__init__(self, - autocommit=autocommit, - autoflush=autoflush, - bind=bind, - binds=binds, - **options) - - -class _SQLAlchemy(SQLAlchemy): - """A subclass of `SQLAlchemy` that uses `_SignallingSession`.""" - def create_session(self, options): - return _SignallingSession(self, **options) - +from flask.ext.sqlalchemy import SQLAlchemy db = SQLAlchemy() from flask.ext.migrate import Migrate diff --git a/lemur/tests/conftest.py b/lemur/tests/conftest.py index 2d680850..254a7e27 100644 --- a/lemur/tests/conftest.py +++ b/lemur/tests/conftest.py @@ -1,13 +1,9 @@ import pytest -from flask import current_app - from lemur import create_app - -from flask.ext.sqlalchemy import SignallingSession -from flask.ext.principal import Identity, identity_changed - -from sqlalchemy import event +from lemur.database import db as _db +from lemur.users import service as user_service +from lemur.roles import service as role_service def pytest_addoption(parser): @@ -38,6 +34,7 @@ def app(): Uses application factory `create_app`. """ app = create_app() + app.config['TESTING'] = True ctx = app.app_context() ctx.push() @@ -47,66 +44,45 @@ def app(): ctx.pop() -@pytest.yield_fixture(scope="function") -def unauth_client(app): - with app.test_client() as client: - yield client - - -@pytest.yield_fixture(scope="function") -def auth_client(app): - with app.test_client() as client: - yield client - - -@pytest.yield_fixture(scope="function") -def admin_client(app): - with app.test_client() as client: - yield client - - @pytest.yield_fixture(scope="session") -def database(app): - app.db.create_all() +def db(app, request): + _db.drop_all() + _db.create_all() - yield app.db + _db.app = app - app.db.drop_all() + yield _db + + _db.drop_all() @pytest.yield_fixture(scope="function") -def session(database): +def session(db, request): """ Creates a new database session with (with working transaction) for test duration. """ - connection = database.engine.connect() - transaction = connection.begin() - - options = dict(bind=connection) - session = database.create_scoped_session(options=options) - - # then each time that SAVEPOINT ends, reopen it - @event.listens_for(SignallingSession, "after_transaction_end") - def restart_savepoint(session, transaction): - if transaction.nested and not transaction._parent.nested: - - # ensure that state is expired the way - # session.commit() at the top level normally does - # (optional step) - session.expire_all() - - session.begin_nested() - - # pushing new Flask application context for multiple-thread - # tests to work - - database.session = session - + db.session.begin_nested() yield session + db.session.rollback() + + +@pytest.yield_fixture(scope="session") +def default_user(db): + user = user_service.create('user', 'test', 'user@example.com', True, None, []) + yield user + + +@pytest.yield_fixture(scope="session") +def admin_user(db): + admin_role = role_service.create('admin') + admin = user_service.create('admin', 'admin', 'admin@example.com', True, None, [admin_role]) + yield admin + + +@pytest.yield_fixture(scope="function") +def client(app): + with app.test_client() as client: + yield client - # the code after the yield statement works as a teardown - transaction.rollback() - connection.close() - session.remove() diff --git a/lemur/tests/test_accounts.py b/lemur/tests/test_accounts.py index 3d1de94d..08239afb 100644 --- a/lemur/tests/test_accounts.py +++ b/lemur/tests/test_accounts.py @@ -1,46 +1,132 @@ - -import pytest from lemur.accounts.service import * -from lemur.exceptions import DuplicateError - from lemur.accounts.views import * -#def test_crud(session): -# account = create('111111', 'account1') -# assert account.id > 0 -# -# account = update(account.id, 11111, 'account2') -# assert account.label == 'account2' -# -# assert len(get_all()) == 1 -# -# delete(1) -# assert len(get_all()) == 0 -# - -#def test_duplicate(session): -# account = create('111111', 'account1') -# assert account.id > 0 -# -# with pytest.raises(DuplicateError): -# account = create('111111', 'account1') +from json import dumps -def test_basic_user_views(client): - pass +def test_crud(session): + account = create('111111', 'account1') + assert account.id > 0 + + account = update(account.id, 11111, 'account2') + assert account.label == 'account2' + + assert len(get_all()) == 1 + + delete(1) + assert len(get_all()) == 0 -def test_admin_user_views(client): - pass - -def test_unauthenticated_views(client): +def test_account_get(client): assert client.get(api.url_for(Accounts, account_id=1)).status_code == 401 + + +def test_account_post(client): assert client.post(api.url_for(Accounts, account_id=1), {}).status_code == 405 + + +def test_account_put(client): assert client.put(api.url_for(Accounts, account_id=1), {}).status_code == 401 + + +def test_account_delete(client): assert client.delete(api.url_for(Accounts, account_id=1)).status_code == 401 + + +def test_account_patch(client): assert client.patch(api.url_for(Accounts, account_id=1), {}).status_code == 405 -VALID_TOKEN = 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyMzMzNjksInN1YiI6MSwiZXhwIjoxNTIxNTQ2OTY5fQ.1qCi0Ip7mzKbjNh0tVd3_eJOrae3rNa_9MCVdA4WtQI' -def test_auth_account_get(auth_client): - assert auth_client.get(api.url_for(Accounts, account_id=1), headers={'Authorization': 'Basic ' + VALID_TOKEN}).status_code == 200 \ No newline at end of file +VALID_USER_HEADER_TOKEN = { + 'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyMzMzNjksInN1YiI6MSwiZXhwIjoxNTIxNTQ2OTY5fQ.1qCi0Ip7mzKbjNh0tVd3_eJOrae3rNa_9MCVdA4WtQI'} + +def test_auth_account_get(client, default_user): + assert client.get(api.url_for(Accounts, account_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 200 + + +def test_auth_account_post_(client, default_user): + assert client.post(api.url_for(Accounts, account_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_account_put(client, default_user): + assert client.put(api.url_for(Accounts, account_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 403 + + +def test_auth_account_delete(client, default_user): + assert client.delete(api.url_for(Accounts, account_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 403 + + +def test_auth_account_patch(client, default_user): + assert client.patch(api.url_for(Accounts, account_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +VALID_ADMIN_HEADER_TOKEN = { + 'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyNTAyMTgsInN1YiI6MiwiZXhwIjoxNTIxNTYzODE4fQ.6mbq4-Ro6K5MmuNiTJBB153RDhlM5LGJBjI7GBKkfqA'} + +def test_admin_account_get(client, admin_user): + assert client.get(api.url_for(Accounts, account_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200 + + +def test_admin_account_post(client, admin_user): + assert client.post(api.url_for(Accounts, account_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_account_put(client, admin_user): + assert client.put(api.url_for(Accounts, account_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 400 + + +def test_admin_account_delete(client, admin_user): + assert client.delete(api.url_for(Accounts, account_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 500 + + +def test_admin_account_patch(client, admin_user): + assert client.patch(api.url_for(Accounts, account_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_accounts_get(client): + assert client.get(api.url_for(AccountsList)).status_code == 401 + + +def test_accounts_post(client): + assert client.post(api.url_for(AccountsList), {}).status_code == 401 + + +def test_accounts_put(client): + assert client.put(api.url_for(AccountsList), {}).status_code == 405 + + +def test_accounts_delete(client): + assert client.delete(api.url_for(AccountsList)).status_code == 405 + + +def test_accounts_patch(client): + assert client.patch(api.url_for(AccountsList), {}).status_code == 405 + + +def test_auth_accounts_get(client, default_user): + assert client.get(api.url_for(AccountsList), headers=VALID_USER_HEADER_TOKEN).status_code == 200 + + +def test_auth_accounts_post(client, default_user): + assert client.post(api.url_for(AccountsList), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 403 + + +def test_admin_accounts_get(client, admin_user): + resp = client.get(api.url_for(AccountsList), headers=VALID_ADMIN_HEADER_TOKEN) + assert resp.status_code == 200 + assert resp.json == {'items': [], 'total': 0} + + +def test_admin_accounts_crud(client, admin_user): + assert client.post(api.url_for(AccountsList), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 400 + data = {'accountNumber': 111, 'label': 'test', 'comments': 'test'} + resp = client.post(api.url_for(AccountsList), data=dumps(data), content_type='application/json', headers=VALID_ADMIN_HEADER_TOKEN) + assert resp.status_code == 200 + assert client.get(api.url_for(Accounts, account_id=resp.json['id']), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200 + resp = client.get(api.url_for(AccountsList), headers=VALID_ADMIN_HEADER_TOKEN) + assert resp.status_code == 200 + assert resp.json == {'items': [{'accountNumber': 111, 'label': 'test', 'comments': 'test', 'id': 2}], 'total': 1} + assert client.delete(api.url_for(Accounts, account_id=2), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200 + resp = client.get(api.url_for(AccountsList), headers=VALID_ADMIN_HEADER_TOKEN) + assert resp.status_code == 200 + assert resp.json == {'items': [], 'total': 0} From 457a63c000242880a2848cd35e66582b32f3e3e1 Mon Sep 17 00:00:00 2001 From: Kevin Glisson Date: Thu, 25 Jun 2015 18:06:47 -0700 Subject: [PATCH 04/13] Removing netflix specific role --- lemur/auth/permissions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lemur/auth/permissions.py b/lemur/auth/permissions.py index 16d686b7..137b650b 100644 --- a/lemur/auth/permissions.py +++ b/lemur/auth/permissions.py @@ -13,7 +13,7 @@ from flask.ext.principal import Permission, RoleNeed # Permissions operator_permission = Permission(RoleNeed('operator')) -admin_permission = Permission(RoleNeed('secops@netflix.com')) +admin_permission = Permission(RoleNeed('admin')) CertificateCreator = namedtuple('certificate', ['method', 'value']) CertificateCreatorNeed = partial(CertificateCreator, 'certificateView') From 7c996e2f484a0cb45553920f5154bf4b8fd6c304 Mon Sep 17 00:00:00 2001 From: Kevin Glisson Date: Thu, 25 Jun 2015 18:07:21 -0700 Subject: [PATCH 05/13] Removing duplicated commit --- lemur/users/service.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/lemur/users/service.py b/lemur/users/service.py index f3fd84ba..c4eebcee 100644 --- a/lemur/users/service.py +++ b/lemur/users/service.py @@ -31,8 +31,7 @@ def create(username, password, email, active, profile_picture, roles): profile_picture=profile_picture, role=roles ) - user = database.create(user) - return database.update(user) + return database.create(user) def update(user_id, username, email, active, profile_picture, roles): From e2475fb024ffb372021b38584a4bd702a744cfed Mon Sep 17 00:00:00 2001 From: Kevin Glisson Date: Thu, 25 Jun 2015 18:08:04 -0700 Subject: [PATCH 06/13] Adding for handling proxy-based errors --- lemur/auth/service.py | 6 +++++- lemur/common/utils.py | 7 +++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/lemur/auth/service.py b/lemur/auth/service.py index facad7c4..5fd20f42 100644 --- a/lemur/auth/service.py +++ b/lemur/auth/service.py @@ -98,6 +98,10 @@ def login_required(f): try: token = request.headers.get('Authorization').split()[1] + except Exception as e: + return dict(message='Token is invalid'), 403 + + try: payload = jwt.decode(token, current_app.config['TOKEN_SECRET']) except jwt.DecodeError: return dict(message='Token is invalid'), 403 @@ -108,7 +112,7 @@ def login_required(f): g.current_user = user_service.get(payload['sub']) - if not g.current_user.id: + if not g.current_user: return dict(message='You are not logged in'), 403 # Tell Flask-Principal the identity changed diff --git a/lemur/common/utils.py b/lemur/common/utils.py index 6b23b3d8..55f411e2 100644 --- a/lemur/common/utils.py +++ b/lemur/common/utils.py @@ -45,11 +45,14 @@ class marshal_items(object): return marshal(resp, self.fields) except Exception as e: + current_app.logger.exception(e) # this is a little weird hack to respect flask restful parsing errors on marshaled functions if hasattr(e, 'code'): - return {'message': e.data['message']}, 400 + if hasattr(e, 'data'): + return {'message': e.data['message']}, 400 + else: + return {'message': 'unknown'}, 400 else: - current_app.logger.exception(e) return {'message': e.message}, 400 return wrapper From 9637383f635c232f28080d6f8ef745e57384688c Mon Sep 17 00:00:00 2001 From: Kevin Glisson Date: Fri, 26 Jun 2015 08:09:10 -0700 Subject: [PATCH 07/13] Adding domain module tests --- lemur/tests/test_domains.py | 123 ++++++++++++++++++++++++++++++++++++ 1 file changed, 123 insertions(+) create mode 100644 lemur/tests/test_domains.py diff --git a/lemur/tests/test_domains.py b/lemur/tests/test_domains.py new file mode 100644 index 00000000..2f9b1a7f --- /dev/null +++ b/lemur/tests/test_domains.py @@ -0,0 +1,123 @@ +from lemur.domains.views import * + +def test_domain_get(client): + assert client.get(api.url_for(Domains, domain_id=1)).status_code == 401 + + +def test_domain_post(client): + assert client.post(api.url_for(Domains, domain_id=1), {}).status_code == 405 + + +def test_domain_put(client): + assert client.put(api.url_for(Domains, domain_id=1), {}).status_code == 405 + + +def test_domain_delete(client): + assert client.delete(api.url_for(Domains, domain_id=1)).status_code == 405 + + +def test_domain_patch(client): + assert client.patch(api.url_for(Domains, domain_id=1), {}).status_code == 405 + + +VALID_USER_HEADER_TOKEN = { + 'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyMzMzNjksInN1YiI6MSwiZXhwIjoxNTIxNTQ2OTY5fQ.1qCi0Ip7mzKbjNh0tVd3_eJOrae3rNa_9MCVdA4WtQI'} + +def test_auth_domain_get(client, default_user): + assert client.get(api.url_for(Domains, domain_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 200 + + +def test_auth_domain_post_(client, default_user): + assert client.post(api.url_for(Domains, domain_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_domain_put(client, default_user): + assert client.put(api.url_for(Domains, domain_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_domain_delete(client, default_user): + assert client.delete(api.url_for(Domains, domain_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_domain_patch(client, default_user): + assert client.patch(api.url_for(Domains, domain_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +VALID_ADMIN_HEADER_TOKEN = { + 'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyNTAyMTgsInN1YiI6MiwiZXhwIjoxNTIxNTYzODE4fQ.6mbq4-Ro6K5MmuNiTJBB153RDhlM5LGJBjI7GBKkfqA'} + +def test_admin_domain_get(client, admin_user): + assert client.get(api.url_for(Domains, domain_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200 + + +def test_admin_domain_post(client, admin_user): + assert client.post(api.url_for(Domains, domain_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_domain_put(client, admin_user): + assert client.put(api.url_for(Domains, domain_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_domain_delete(client, admin_user): + assert client.delete(api.url_for(Domains, domain_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_domain_patch(client, admin_user): + assert client.patch(api.url_for(Domains, domain_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_domains_get(client): + assert client.get(api.url_for(DomainsList)).status_code == 401 + + +def test_domains_post(client): + assert client.post(api.url_for(DomainsList), {}).status_code == 405 + + +def test_domains_put(client): + assert client.put(api.url_for(DomainsList), {}).status_code == 405 + + +def test_domains_delete(client): + assert client.delete(api.url_for(DomainsList)).status_code == 405 + + +def test_domains_patch(client): + assert client.patch(api.url_for(DomainsList), {}).status_code == 405 + + +def test_auth_domains_get(client, default_user): + assert client.get(api.url_for(DomainsList), headers=VALID_USER_HEADER_TOKEN).status_code == 200 + + +def test_admin_domains_get(client, admin_user): + resp = client.get(api.url_for(DomainsList), headers=VALID_ADMIN_HEADER_TOKEN) + assert resp.status_code == 200 + assert resp.json == {'items': [], 'total': 0} + + +def test_certificate_domains_get(client): + assert client.get(api.url_for(CertificateDomains, certificate_id=1)).status_code == 401 + + +def test_certificate_domains_post(client): + assert client.post(api.url_for(CertificateDomains, certificate_id=1), {}).status_code == 405 + + +def test_certificate_domains_put(client): + assert client.put(api.url_for(CertificateDomains, certificate_id=1), {}).status_code == 405 + + +def test_certificate_domains_delete(client): + assert client.delete(api.url_for(CertificateDomains, certificate_id=1)).status_code == 405 + + +def test_certificate_domains_patch(client): + assert client.patch(api.url_for(CertificateDomains, certificate_id=1), {}).status_code == 405 + + +def test_auth_certificate_domains_get(client, default_user): + assert client.get(api.url_for(CertificateDomains, certificate_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 200 + +def test_admin_certificate_domains_get(client, admin_user): + assert client.get(api.url_for(CertificateDomains, certificate_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200 From c6ae689dc8d760279d92dae6d071fdb60914217c Mon Sep 17 00:00:00 2001 From: Kevin Glisson Date: Fri, 26 Jun 2015 10:31:55 -0700 Subject: [PATCH 08/13] Adding role tests --- lemur/certificates/views.py | 2 +- lemur/tests/conftest.py | 22 +-- lemur/tests/test_accounts.py | 28 ++-- lemur/tests/test_domains.py | 28 ++-- lemur/tests/test_roles.py | 311 +++++++++++++++++++++++++++++++++++ 5 files changed, 346 insertions(+), 45 deletions(-) create mode 100644 lemur/tests/test_roles.py diff --git a/lemur/certificates/views.py b/lemur/certificates/views.py index 3de200d5..4bc317f1 100644 --- a/lemur/certificates/views.py +++ b/lemur/certificates/views.py @@ -90,7 +90,7 @@ def private_key_str(value, name): :return: :raise ValueError: """ try: - serialization.load_pem_private_key(str(value), backend=default_backend()) + serialization.load_pem_private_key(str(value), None, backend=default_backend()) except Exception as e: raise ValueError("The parameter '{0}' needs to be a valid RSA private key".format(name)) return value diff --git a/lemur/tests/conftest.py b/lemur/tests/conftest.py index 254a7e27..50bfd144 100644 --- a/lemur/tests/conftest.py +++ b/lemur/tests/conftest.py @@ -35,6 +35,7 @@ def app(): """ app = create_app() app.config['TESTING'] = True + app.config['LEMUR_ENCRYPTION_KEY'] = 'test' ctx = app.app_context() ctx.push() @@ -52,10 +53,12 @@ def db(app, request): _db.app = app + user = user_service.create('user', 'test', 'user@example.com', True, None, []) + admin_role = role_service.create('admin') + admin = user_service.create('admin', 'admin', 'admin@example.com', True, None, [admin_role]) + _db.session.commit() yield _db - _db.drop_all() - @pytest.yield_fixture(scope="function") def session(db, request): @@ -68,21 +71,8 @@ def session(db, request): db.session.rollback() -@pytest.yield_fixture(scope="session") -def default_user(db): - user = user_service.create('user', 'test', 'user@example.com', True, None, []) - yield user - - -@pytest.yield_fixture(scope="session") -def admin_user(db): - admin_role = role_service.create('admin') - admin = user_service.create('admin', 'admin', 'admin@example.com', True, None, [admin_role]) - yield admin - - @pytest.yield_fixture(scope="function") -def client(app): +def client(app, session): with app.test_client() as client: yield client diff --git a/lemur/tests/test_accounts.py b/lemur/tests/test_accounts.py index 08239afb..2712947c 100644 --- a/lemur/tests/test_accounts.py +++ b/lemur/tests/test_accounts.py @@ -40,46 +40,46 @@ def test_account_patch(client): VALID_USER_HEADER_TOKEN = { 'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyMzMzNjksInN1YiI6MSwiZXhwIjoxNTIxNTQ2OTY5fQ.1qCi0Ip7mzKbjNh0tVd3_eJOrae3rNa_9MCVdA4WtQI'} -def test_auth_account_get(client, default_user): +def test_auth_account_get(client): assert client.get(api.url_for(Accounts, account_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 200 -def test_auth_account_post_(client, default_user): +def test_auth_account_post_(client): assert client.post(api.url_for(Accounts, account_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 -def test_auth_account_put(client, default_user): +def test_auth_account_put(client): assert client.put(api.url_for(Accounts, account_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 403 -def test_auth_account_delete(client, default_user): +def test_auth_account_delete(client): assert client.delete(api.url_for(Accounts, account_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 403 -def test_auth_account_patch(client, default_user): +def test_auth_account_patch(client): assert client.patch(api.url_for(Accounts, account_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 VALID_ADMIN_HEADER_TOKEN = { 'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyNTAyMTgsInN1YiI6MiwiZXhwIjoxNTIxNTYzODE4fQ.6mbq4-Ro6K5MmuNiTJBB153RDhlM5LGJBjI7GBKkfqA'} -def test_admin_account_get(client, admin_user): +def test_admin_account_get(client): assert client.get(api.url_for(Accounts, account_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200 -def test_admin_account_post(client, admin_user): +def test_admin_account_post(client): assert client.post(api.url_for(Accounts, account_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 -def test_admin_account_put(client, admin_user): +def test_admin_account_put(client): assert client.put(api.url_for(Accounts, account_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 400 -def test_admin_account_delete(client, admin_user): +def test_admin_account_delete(client): assert client.delete(api.url_for(Accounts, account_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 500 -def test_admin_account_patch(client, admin_user): +def test_admin_account_patch(client): assert client.patch(api.url_for(Accounts, account_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 @@ -103,21 +103,21 @@ def test_accounts_patch(client): assert client.patch(api.url_for(AccountsList), {}).status_code == 405 -def test_auth_accounts_get(client, default_user): +def test_auth_accounts_get(client): assert client.get(api.url_for(AccountsList), headers=VALID_USER_HEADER_TOKEN).status_code == 200 -def test_auth_accounts_post(client, default_user): +def test_auth_accounts_post(client): assert client.post(api.url_for(AccountsList), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 403 -def test_admin_accounts_get(client, admin_user): +def test_admin_accounts_get(client): resp = client.get(api.url_for(AccountsList), headers=VALID_ADMIN_HEADER_TOKEN) assert resp.status_code == 200 assert resp.json == {'items': [], 'total': 0} -def test_admin_accounts_crud(client, admin_user): +def test_admin_accounts_crud(client): assert client.post(api.url_for(AccountsList), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 400 data = {'accountNumber': 111, 'label': 'test', 'comments': 'test'} resp = client.post(api.url_for(AccountsList), data=dumps(data), content_type='application/json', headers=VALID_ADMIN_HEADER_TOKEN) diff --git a/lemur/tests/test_domains.py b/lemur/tests/test_domains.py index 2f9b1a7f..9d57f142 100644 --- a/lemur/tests/test_domains.py +++ b/lemur/tests/test_domains.py @@ -23,46 +23,46 @@ def test_domain_patch(client): VALID_USER_HEADER_TOKEN = { 'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyMzMzNjksInN1YiI6MSwiZXhwIjoxNTIxNTQ2OTY5fQ.1qCi0Ip7mzKbjNh0tVd3_eJOrae3rNa_9MCVdA4WtQI'} -def test_auth_domain_get(client, default_user): +def test_auth_domain_get(client): assert client.get(api.url_for(Domains, domain_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 200 -def test_auth_domain_post_(client, default_user): +def test_auth_domain_post_(client): assert client.post(api.url_for(Domains, domain_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 -def test_auth_domain_put(client, default_user): +def test_auth_domain_put(client): assert client.put(api.url_for(Domains, domain_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 -def test_auth_domain_delete(client, default_user): +def test_auth_domain_delete(client): assert client.delete(api.url_for(Domains, domain_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 405 -def test_auth_domain_patch(client, default_user): +def test_auth_domain_patch(client): assert client.patch(api.url_for(Domains, domain_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 VALID_ADMIN_HEADER_TOKEN = { 'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyNTAyMTgsInN1YiI6MiwiZXhwIjoxNTIxNTYzODE4fQ.6mbq4-Ro6K5MmuNiTJBB153RDhlM5LGJBjI7GBKkfqA'} -def test_admin_domain_get(client, admin_user): +def test_admin_domain_get(client): assert client.get(api.url_for(Domains, domain_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200 -def test_admin_domain_post(client, admin_user): +def test_admin_domain_post(client): assert client.post(api.url_for(Domains, domain_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 -def test_admin_domain_put(client, admin_user): +def test_admin_domain_put(client): assert client.put(api.url_for(Domains, domain_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 -def test_admin_domain_delete(client, admin_user): +def test_admin_domain_delete(client): assert client.delete(api.url_for(Domains, domain_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 -def test_admin_domain_patch(client, admin_user): +def test_admin_domain_patch(client): assert client.patch(api.url_for(Domains, domain_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 @@ -86,11 +86,11 @@ def test_domains_patch(client): assert client.patch(api.url_for(DomainsList), {}).status_code == 405 -def test_auth_domains_get(client, default_user): +def test_auth_domains_get(client): assert client.get(api.url_for(DomainsList), headers=VALID_USER_HEADER_TOKEN).status_code == 200 -def test_admin_domains_get(client, admin_user): +def test_admin_domains_get(client): resp = client.get(api.url_for(DomainsList), headers=VALID_ADMIN_HEADER_TOKEN) assert resp.status_code == 200 assert resp.json == {'items': [], 'total': 0} @@ -116,8 +116,8 @@ def test_certificate_domains_patch(client): assert client.patch(api.url_for(CertificateDomains, certificate_id=1), {}).status_code == 405 -def test_auth_certificate_domains_get(client, default_user): +def test_auth_certificate_domains_get(client): assert client.get(api.url_for(CertificateDomains, certificate_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 200 -def test_admin_certificate_domains_get(client, admin_user): +def test_admin_certificate_domains_get(client): assert client.get(api.url_for(CertificateDomains, certificate_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200 diff --git a/lemur/tests/test_roles.py b/lemur/tests/test_roles.py new file mode 100644 index 00000000..b40e0772 --- /dev/null +++ b/lemur/tests/test_roles.py @@ -0,0 +1,311 @@ +from json import dumps +from lemur.roles.service import * +from lemur.roles.views import * + + +def test_crud(session): + role = create('role1') + assert role.id > 0 + + role = update(role.id, 'role_new', None, []) + assert role.name == 'role_new' + delete(role.id) + assert get(role.id) == None + + +def test_role_get(client): + assert client.get(api.url_for(Roles, role_id=1)).status_code == 401 + + +def test_role_post(client): + assert client.post(api.url_for(Roles, role_id=1), {}).status_code == 405 + + +def test_role_put(client): + assert client.put(api.url_for(Roles, role_id=1), {}).status_code == 401 + + +def test_role_delete(client): + assert client.delete(api.url_for(Roles, role_id=1)).status_code == 401 + + +def test_role_patch(client): + assert client.patch(api.url_for(Roles, role_id=1), {}).status_code == 405 + + +def test_roles_get(client): + assert client.get(api.url_for(RolesList)).status_code == 401 + + +def test_roles_post(client): + assert client.post(api.url_for(RolesList), {}).status_code == 401 + + +def test_roles_put(client): + assert client.put(api.url_for(RolesList), {}).status_code == 405 + + +def test_roles_delete(client): + assert client.delete(api.url_for(RolesList)).status_code == 405 + + +def test_roles_patch(client): + assert client.patch(api.url_for(RolesList), {}).status_code == 405 + + +def test_role_credentials_get(client): + assert client.get(api.url_for(RoleViewCredentials, role_id=1)).status_code == 401 + + +def test_role_credentials_post(client): + assert client.post(api.url_for(RoleViewCredentials, role_id=1), {}).status_code == 405 + + +def test_role_credentials_put(client): + assert client.put(api.url_for(RoleViewCredentials, role_id=1), {}).status_code == 405 + + +def test_role_credentials_delete(client): + assert client.delete(api.url_for(RoleViewCredentials, role_id=1)).status_code == 405 + + +def test_role_credentials_patch(client): + assert client.patch(api.url_for(RoleViewCredentials, role_id=1), {}).status_code == 405 + + +def test_user_roles_get(client): + assert client.get(api.url_for(UserRolesList, user_id=1)).status_code == 401 + + +def test_user_roles_post(client): + assert client.post(api.url_for(UserRolesList, user_id=1), {}).status_code == 405 + + +def test_user_roles_put(client): + assert client.put(api.url_for(UserRolesList, user_id=1), {}).status_code == 405 + + +def test_user_roles_delete(client): + assert client.delete(api.url_for(UserRolesList, user_id=1)).status_code == 405 + + +def test_user_roles_patch(client): + assert client.patch(api.url_for(UserRolesList, user_id=1), {}).status_code == 405 + + +def test_authority_roles_get(client): + assert client.get(api.url_for(AuthorityRolesList, authority_id=1)).status_code == 401 + + +def test_authority_roles_post(client): + assert client.post(api.url_for(AuthorityRolesList, authority_id=1), {}).status_code == 405 + + +def test_authority_roles_put(client): + assert client.put(api.url_for(AuthorityRolesList, authority_id=1), {}).status_code == 405 + + +def test_authority_roles_delete(client): + assert client.delete(api.url_for(AuthorityRolesList, authority_id=1)).status_code == 405 + + +def test_authority_roles_patch(client): + assert client.patch(api.url_for(AuthorityRolesList, authority_id=1), {}).status_code == 405 + + +VALID_USER_HEADER_TOKEN = { + 'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyMzMzNjksInN1YiI6MSwiZXhwIjoxNTIxNTQ2OTY5fQ.1qCi0Ip7mzKbjNh0tVd3_eJOrae3rNa_9MCVdA4WtQI'} + + +def test_auth_role_get(client): + assert client.get(api.url_for(Roles, role_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 400 + + +def test_auth_role_post_(client): + assert client.post(api.url_for(Roles, role_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_role_put(client): + assert client.put(api.url_for(Roles, role_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 400 + + +def test_auth_role_delete(client): + assert client.delete(api.url_for(Roles, role_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 403 + + +def test_auth_role_patch(client): + assert client.patch(api.url_for(Roles, role_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_roles_get(client): + assert client.get(api.url_for(RolesList), headers=VALID_USER_HEADER_TOKEN).status_code == 200 + + +def test_auth_roles_post(client): + assert client.post(api.url_for(RolesList), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 403 + + +def test_auth_role_credentials_get(client): + assert client.get(api.url_for(RoleViewCredentials, role_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 403 + + +def test_auth_role_credentials_post(client): + assert client.post(api.url_for(RoleViewCredentials, role_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_role_credentials_put(client): + assert client.put(api.url_for(RoleViewCredentials, role_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_role_credentials_delete(client): + assert client.delete(api.url_for(RoleViewCredentials, role_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_role_credentials_patch(client): + assert client.patch(api.url_for(RoleViewCredentials, role_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_user_roles_get(client): + assert client.get(api.url_for(UserRolesList, user_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 200 + + +def test_auth_user_roles_post(client): + assert client.post(api.url_for(UserRolesList, user_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_user_roles_put(client): + assert client.put(api.url_for(UserRolesList, user_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_user_roles_delete(client): + assert client.delete(api.url_for(UserRolesList, user_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_user_roles_patch(client): + assert client.patch(api.url_for(UserRolesList, user_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_authority_roles_get(client): + assert client.get(api.url_for(AuthorityRolesList, authority_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 200 + + +def test_auth_authority_roles_post(client): + assert client.post(api.url_for(AuthorityRolesList, authority_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_authority_roles_put(client): + assert client.put(api.url_for(AuthorityRolesList, authority_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_authority_roles_delete(client): + assert client.delete(api.url_for(AuthorityRolesList, authority_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_authority_roles_patch(client): + assert client.patch(api.url_for(AuthorityRolesList, authority_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +VALID_ADMIN_HEADER_TOKEN = { + 'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyNTAyMTgsInN1YiI6MiwiZXhwIjoxNTIxNTYzODE4fQ.6mbq4-Ro6K5MmuNiTJBB153RDhlM5LGJBjI7GBKkfqA'} + + +def test_admin_role_get(client): + assert client.get(api.url_for(Roles, role_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200 + + +def test_admin_role_post(client): + assert client.post(api.url_for(Roles, role_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_role_put(client): + assert client.put(api.url_for(Roles, role_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 400 + + +def test_admin_role_delete(client): + assert client.delete(api.url_for(Roles, role_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200 + + +def test_admin_role_patch(client): + assert client.patch(api.url_for(Roles, role_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_roles_get(client): + resp = client.get(api.url_for(RolesList), headers=VALID_ADMIN_HEADER_TOKEN) + assert resp.status_code == 200 + assert resp.json['total'] > 0 + + +def test_admin_role_credentials_get(client): + assert client.get(api.url_for(RolesList), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200 + + +def test_admin_role_credentials_post(client): + assert client.post(api.url_for(RolesList), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 400 + + +def test_admin_role_credentials_put(client): + assert client.put(api.url_for(RolesList), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_role_credentials_delete(client): + assert client.delete(api.url_for(RolesList), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_role_credentials_patch(client): + assert client.patch(api.url_for(RolesList), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_user_roles_get(client): + assert client.get(api.url_for(UserRolesList, user_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200 + + +def test_admin_user_roles_post(client): + assert client.post(api.url_for(UserRolesList, user_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_user_roles_put(client): + assert client.put(api.url_for(UserRolesList, user_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_user_roles_delete(client): + assert client.delete(api.url_for(UserRolesList, user_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_user_roles_patch(client): + assert client.patch(api.url_for(UserRolesList, user_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_authority_roles_get(client): + assert client.get(api.url_for(AuthorityRolesList, authority_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200 + + +def test_admin_authority_roles_post(client): + assert client.post(api.url_for(AuthorityRolesList, authority_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_authority_roles_put(client): + assert client.put(api.url_for(AuthorityRolesList, authority_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_authority_roles_delete(client): + assert client.delete(api.url_for(AuthorityRolesList, authority_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_authority_roles_patch(client): + assert client.patch(api.url_for(AuthorityRolesList, authority_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_roles_crud(client): + assert client.post(api.url_for(RolesList), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 400 + data = {'name': 'role', 'description': 'test'} + resp = client.post(api.url_for(RolesList), data=dumps(data), content_type='application/json', headers=VALID_ADMIN_HEADER_TOKEN) + assert resp.status_code == 200 + role_id = resp.json['id'] + assert client.get(api.url_for(Roles, role_id=role_id), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200 + resp = client.get(api.url_for(RolesList), headers=VALID_ADMIN_HEADER_TOKEN) + assert resp.status_code == 200 + assert resp.json['total'] == 2 + assert client.delete(api.url_for(Roles, role_id=role_id), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200 + resp = client.get(api.url_for(RolesList), headers=VALID_ADMIN_HEADER_TOKEN) + assert resp.status_code == 200 + assert resp.json['total'] == 1 From c8cbc82062b4df0d06eac0d6300cd5770e2a8673 Mon Sep 17 00:00:00 2001 From: Kevin Glisson Date: Fri, 26 Jun 2015 16:16:13 -0700 Subject: [PATCH 09/13] Starting add certificate tests --- lemur/tests/certs.py | 183 ++++++++++++++ lemur/tests/test_certificates.py | 416 +++++++++++++++++++++++++------ 2 files changed, 517 insertions(+), 82 deletions(-) create mode 100644 lemur/tests/certs.py diff --git a/lemur/tests/certs.py b/lemur/tests/certs.py new file mode 100644 index 00000000..29e62705 --- /dev/null +++ b/lemur/tests/certs.py @@ -0,0 +1,183 @@ +from cryptography import x509 +from cryptography.hazmat.backends import default_backend + +INTERNAL_VALID_LONG_STR = """ +-----BEGIN CERTIFICATE----- +MIID1zCCAr+gAwIBAgIBATANBgkqhkiG9w0BAQsFADCBjDELMAkGA1UEBhMCVVMx +CzAJBgNVBAgMAkNBMRAwDgYDVQQHDAdBIHBsYWNlMRcwFQYDVQQDDA5sb25nLmxp +dmVkLmNvbTEQMA4GA1UECgwHRXhhbXBsZTETMBEGA1UECwwKT3BlcmF0aW9uczEe +MBwGCSqGSIb3DQEJARYPamltQGV4YW1wbGUuY29tMB4XDTE1MDYyNjIwMzA1MloX +DTQwMDEwMTIwMzA1MlowgYwxCzAJBgNVBAYTAlVTMQswCQYDVQQIDAJDQTEQMA4G +A1UEBwwHQSBwbGFjZTEXMBUGA1UEAwwObG9uZy5saXZlZC5jb20xEDAOBgNVBAoM +B0V4YW1wbGUxEzARBgNVBAsMCk9wZXJhdGlvbnMxHjAcBgkqhkiG9w0BCQEWD2pp +bUBleGFtcGxlLmNvbTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBAKeg +sqb0HI10i2eRSx3pLeA7JoGdUpud7hy3bGws/1HgOSpRMin9Y65DEpVq2Ia9oir7 +XOJLpSTEIulnBkgDHNOsdKVYHDR6k0gUisnIKSl2C3IgKHpCouwiOvvVPwd3PExg +17+d7KLBIu8LpG28wkXKFU8vSz5i7H4i/XCEChnKJ4oGJuGAJJM4Zn022U156pco +97aEAc9ZXR/1dm2njr4XxCXmrnKCYTElfRhLkmxtv+mCi6eV//5d12z7mY3dTBkQ +EG2xpb5DQ+ITQ8BzsKcPX80rz8rTzgYFwaV3gUg38+bgka/JGJq8HgBuNnHv5CeT +1T/EoZTRYW2oPfOgQK8CAwEAAaNCMEAwDwYDVR0TAQH/BAUwAwEB/zAOBgNVHQ8B +Af8EBAMCAQYwHQYDVR0OBBYEFIuDY73dQIhj2nnd4DG2SvseHVVaMA0GCSqGSIb3 +DQEBCwUAA4IBAQBk/WwfoWYdS0M8rz5tJda/cMdYFSugUbTn6JJdmHuw6RmiKzKG +8NzfSqBR6m8MWdSTuAZ/chsUZH9YEIjS9tAH9/FfUFBrsUE7TXaUgpNBm4DBLLfl +fj5xDmEyj17JPN/C36amQ9eU5BNesdCx9EkdWLyVJaM50HFRo71W0/FrpKZyKK68 +XPhd1z9w/xgfCfYhe7PjEmrmNPN5Tgk5TyXW+UUhOepDctAv2DBetptcx+gHrtW+ +Ygk1wptlt/tg7uUmstmXZA4vTPx83f4P3KSS3XHIYFIyGFWUDs23C20K6mmW1iXa +h0S8LN4iv/+vNFPNiM1z9X/SZgfbwZXrLsSi +-----END CERTIFICATE----- +""" +INTERNAL_VALID_LONG_CERT = x509.load_pem_x509_certificate(INTERNAL_VALID_LONG_STR, default_backend()) + + +INTERNAL_INVALID_STR = """ +-----BEGIN CERTIFICATE----- +MIIEFTCCAv2gAwIBAgICA+gwDQYJKoZIhvcNAQELBQAwgYwxCzAJBgNVBAYTAlVT +MQswCQYDVQQIDAJDQTEQMA4GA1UEBwwHQSBwbGFjZTEXMBUGA1UEAwwObG9uZy5s +aXZlZC5jb20xEDAOBgNVBAoMB0V4YW1wbGUxEzARBgNVBAsMCk9wZXJhdGlvbnMx +HjAcBgkqhkiG9w0BCQEWD2ppbUBleGFtcGxlLmNvbTAeFw0xNTA2MjYyMDM2NDha +Fw0xNTA2MjcyMDM2NDhaMGkxCzAJBgNVBAYTAlVTMQswCQYDVQQIEwJDQTEQMA4G +A1UEBxMHQSBwbGFjZTEQMA4GA1UEChMHRXhhbXBsZTETMBEGA1UECxMKT3BlcmF0 +aW9uczEUMBIGA1UEAxMLZXhwaXJlZC5jb20wggEiMA0GCSqGSIb3DQEBAQUAA4IB +DwAwggEKAoIBAQCcSMzRxB6+UONPqYMy1Ojw3Wi8DIpt9USnSR60I8LiEuRK2ayr +0RMjLJ6sBEgy/hISEqpLgTsciDpxwaTC/WNrkT9vaMcwfiG3V0Red8zbKHQzC+Ty +cLRg9wbC3v613kaIZCQCoE7Aouru9WbVPmuRoasfztrgksWmH9infQbL4TDcmcxo +qGaMn4ajQTVAD63CKnut+CULZIMBREBVlSTLiOO7qZdTrd+vjtLWvdXVPcWLSBrd +Vpu3YnhqqTte+DMzQHwY7A2s3fu4Cg4H4npzcR+0H1H/B5z64kxqZq9FWGIcZcz7 +0xXeHN9UUKPDSTgsjtIzKTaIOe9eML3jGSU7AgMBAAGjgaIwgZ8wDAYDVR0TAQH/ +BAIwADAOBgNVHQ8BAf8EBAMCBaAwFgYDVR0lAQH/BAwwCgYIKwYBBQUHAwEwHQYD +VR0OBBYEFKwBYaxCLxK0csmV319rbRdqDllWMEgGA1UdHwRBMD8wPaA7oDmGN2h0 +dHA6Ly90ZXN0LmNsb3VkY2EuY3JsLm5ldGZsaXguY29tL2xvbmdsaXZlZENBL2Ny +bC5wZW0wDQYJKoZIhvcNAQELBQADggEBADFngqsMsGnNBWknphLDvnoWu5MTrpsD +AgN0bktv5ACKRWhi/qtCmkEf6TieecRMwpQNMpE50dko3LGGdWlZRCI8wdH/zrw2 +8MnOeCBxuS1nB4muUGjbf4LIbtuwoHSESrkfmuKjGGK9JTszLL6Hb9YnoFefeg8L +T7W3s8mm5bVHhQM7J9tV6dz/sVDmpOSuzL8oZkqeKP+lWU6ytaohFFpbdzaxWipU +3+GobVe4vRqoF1kwuhQ8YbMbXWDK6zlrT9pjFABcQ/b5nveiW93JDQUbjmVccx/u +kP+oGWtHvhteUAe8Gloo5NchZJ0/BqlYRCD5aAHcmbXRsDid9mO4ADU= +-----END CERTIFICATE----- +""" +INTERNAL_INVALID_CERT = x509.load_pem_x509_certificate(INTERNAL_INVALID_STR, default_backend()) + + +INTERNAL_VALID_SAN_STR = """ +-----BEGIN CERTIFICATE----- +MIIESjCCAzKgAwIBAgICA+kwDQYJKoZIhvcNAQELBQAwgYwxCzAJBgNVBAYTAlVT +MQswCQYDVQQIDAJDQTEQMA4GA1UEBwwHQSBwbGFjZTEXMBUGA1UEAwwObG9uZy5s +aXZlZC5jb20xEDAOBgNVBAoMB0V4YW1wbGUxEzARBgNVBAsMCk9wZXJhdGlvbnMx +HjAcBgkqhkiG9w0BCQEWD2ppbUBleGFtcGxlLmNvbTAeFw0xNTA2MjYyMDU5MDZa +Fw0yMDAxMDEyMDU5MDZaMG0xCzAJBgNVBAYTAlVTMQswCQYDVQQIEwJDQTEQMA4G +A1UEBxMHQSBwbGFjZTEQMA4GA1UEChMHRXhhbXBsZTETMBEGA1UECxMKT3BlcmF0 +aW9uczEYMBYGA1UEAxMPc2FuLmV4YW1wbGUuY29tMIIBIjANBgkqhkiG9w0BAQEF +AAOCAQ8AMIIBCgKCAQEA2Nq5zFh2WiqtNIPssdSwQ9/00j370VcKPlOATLqK24Q+ +dr2hWP1WlZJ0NOoPefhoIysccs2tRivosTpViRAzNJXigBHhxe8ger0QhVW6AXIp +ov327N689TgY4GzRrwqavjz8cqussIcnEUr4NLLsU5AvXE7e3WxYkkskzO497UOI +uCBtWdCXZ4cAGhtVkkA5uQHfPsLmgRVoUmdMDt5ZmA8HhLX4X6vkT3oGIhdGCw6T +W+Cu7PfYlSaggSBbBniU0YKTFLfGLkYFZN/b6bxzvt6CTJLoVFAYXyLJwUvd3EAm +u23HgUflIyZNG3xVPml/lah0OIX7RtSigXUSLm7lYwIDAQABo4HTMIHQMAwGA1Ud +EwEB/wQCMAAwDgYDVR0PAQH/BAQDAgWgMBYGA1UdJQEB/wQMMAoGCCsGAQUFBwMB +MC8GA1UdEQQoMCaCEWV4YW1wbGUyLmxvbmcuY29tghFleGFtcGxlMy5sb25nLmNv +bTAdBgNVHQ4EFgQUiiIyclcBIfJ5PE3OCcTXwzJAM+0wSAYDVR0fBEEwPzA9oDug +OYY3aHR0cDovL3Rlc3QuY2xvdWRjYS5jcmwubmV0ZmxpeC5jb20vbG9uZ2xpdmVk +Q0EvY3JsLnBlbTANBgkqhkiG9w0BAQsFAAOCAQEAgcTioq70B/aPWovNTy+84wLw +VX1q6bCdH3FJwAv2rc28CHp5mCGdR6JqfT/H/CbfRwT1Yh/5i7T5kEVyz+Dp3+p+ +AJ2xauHrTvWn0QHQYbUWICwkuZ7VTI9nd0Fry1FQI1EeKiCmyrzNljiN2l+GZw6i +NJUpVNtwRyWRzB+yIx2E9wyydqDFH+sROuQok7EgzlQileitPrF4RrkfIhQp2/ki +YBrY/duF15YpoMKAlFhDBh6R9/nb5kI2n3pY6I5h6LEYfLStazXbIu61M8zu9TM/ ++t5Oz6rmcjohL22+sEmmRz86dQZlrBBUxX0kCQj6OAFB4awtRd4fKtkCkZhvhQ== +-----END CERTIFICATE----- +""" +INTERNAL_VALID_SAN_CERT = x509.load_pem_x509_certificate(INTERNAL_VALID_SAN_STR, default_backend()) + + +INTERNAL_VALID_WILDCARD_STR = """ +-----BEGIN CERTIFICATE----- +MIIEHDCCAwSgAwIBAgICA+owDQYJKoZIhvcNAQELBQAwgYwxCzAJBgNVBAYTAlVT +MQswCQYDVQQIDAJDQTEQMA4GA1UEBwwHQSBwbGFjZTEXMBUGA1UEAwwObG9uZy5s +aXZlZC5jb20xEDAOBgNVBAoMB0V4YW1wbGUxEzARBgNVBAsMCk9wZXJhdGlvbnMx +HjAcBgkqhkiG9w0BCQEWD2ppbUBleGFtcGxlLmNvbTAeFw0xNTA2MjYyMTEzMTBa +Fw0yMDAxMDEyMTEzMTBaMHAxCzAJBgNVBAYTAlVTMQswCQYDVQQIEwJDQTEQMA4G +A1UEBxMHQSBwbGFjZTEQMA4GA1UEChMHRXhhbXBsZTETMBEGA1UECxMKT3BlcmF0 +aW9uczEbMBkGA1UEAxQSKi50ZXN0LmV4YW1wbGUuY29tMIIBIjANBgkqhkiG9w0B +AQEFAAOCAQ8AMIIBCgKCAQEA0T7OEY9FxMIdhe1CwLc+TbDeSfDN6KRHlp0I9MwK +3Pre7A1+1vmRzLiS5qAdOh3Oexelmgdkn/fZUFI+IqEVJwmeUiq13Kib3BFnVtbB +N1RdT7rZF24Bqwygf1DHAekEBYdvu4dGD/gYKsLYsSMD7g6glUuhTbgR871updcV +USYJ801y640CcHjai8UCLxpqtkP/Alob+/KDczUHbhdxYgmH34aQgxC8zg+uzuq6 +bIqUAc6SctI+6ArXOqri7wSMgZUnogpF4R5QbCnlDfSzNcNxJFtGp8cy7CNWebMd +IWgBYwee8i8S6Q90B2QUFD9EGG2pEZldpudTxWUpq0tWmwIDAQABo4GiMIGfMAwG +A1UdEwEB/wQCMAAwDgYDVR0PAQH/BAQDAgWgMBYGA1UdJQEB/wQMMAoGCCsGAQUF +BwMBMB0GA1UdDgQWBBTH2KIECrqPHMbsVysGv7ggkYYZGDBIBgNVHR8EQTA/MD2g +O6A5hjdodHRwOi8vdGVzdC5jbG91ZGNhLmNybC5uZXRmbGl4LmNvbS9sb25nbGl2 +ZWRDQS9jcmwucGVtMA0GCSqGSIb3DQEBCwUAA4IBAQBjjfur2B6BcdIQIouwhXGk +IFE5gUYMK5S8Crf/lpMxwHdWK8QM1BpJu9gIo6VoM8uFVa8qlY8LN0SyNyWw+qU5 +Jc8X/qCeeJwXEyXY3dIYRT/1aj7FCc7EFn1j6pcHPD6/0M2z0Zmj+1rWNBJdcYor +pCy27OgRoJKZ6YhEYekzwIPeFPL6irIN9xKPnfH0b2cnYa/g56DyGmyKH2Kkhz0A +UGniiUh4bAUuppbtSIvUTsRsJuPYOqHC3h8791JZ/3Sr5uB7QbCdz9K14c9zi6Z1 +S0Xb3ZauZJQI7OdHeUPDRVq+8hcG77sopN9pEYrIH08oxvLX2US3GqrowjOxthRa +-----END CERTIFICATE----- +""" +INTERNAL_VALID_WILDCARD_CERT = x509.load_pem_x509_certificate(INTERNAL_VALID_WILDCARD_STR, default_backend()) + + +EXTERNAL_VALID_STR = """ +-----BEGIN CERTIFICATE----- +MIIFHzCCBAegAwIBAgIQGFWCciDWzbOej/TbAJN0WzANBgkqhkiG9w0BAQsFADCB +pDELMAkGA1UEBhMCVVMxHTAbBgNVBAoTFFN5bWFudGVjIENvcnBvcmF0aW9uMR8w +HQYDVQQLExZGT1IgVEVTVCBQVVJQT1NFUyBPTkxZMR8wHQYDVQQLExZTeW1hbnRl +YyBUcnVzdCBOZXR3b3JrMTQwMgYDVQQDEytTeW1hbnRlYyBDbGFzcyAzIFNlY3Vy +ZSBTZXJ2ZXIgVEVTVCBDQSAtIEc0MB4XDTE1MDYyNDAwMDAwMFoXDTE1MDYyNTIz +NTk1OVowgYMxCzAJBgNVBAYTAlVTMRMwEQYDVQQIDApDQUxJRk9STklBMRIwEAYD +VQQHDAlMb3MgR2F0b3MxFjAUBgNVBAoMDU5ldGZsaXgsIEluYy4xEzARBgNVBAsM +Ck9wZXJhdGlvbnMxHjAcBgNVBAMMFXR0dHQyLm5ldGZsaXh0ZXN0Lm5ldDCCASIw +DQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBALwMY/yod9YGLKLCzbbsSUBWm4ZC +DfcgbUNL3JLtZaFCaOeUPLa4YNqty+9ACXBLYPNMm+dgsRHix8N2uwtZrGazHILK +qey96eSTosPsvKFt0KLNpUl8GC/YxA69L128SJgFaaq5Dr2Mp3NP0rt0RIz5luPj +Oae0hkGOS8uS0dySlAmfOw2OsJY3gCw5UHcmpcCHpO2f7uU+tWKmgfz4U/PpQ0kz +WVJno+JhcaXIximtiLreCNF1LpraAjrcZJ+ySJwYaLaYMiJoFkdXUtKJcyqmkbA3 +Splt7N4Hb8c+5aXv225uQYCh0HXQeMyBotlaIrAddP5obrtjxhXBxB4ysEcCAwEA +AaOCAWowggFmMCAGA1UdEQQZMBeCFXR0dHQyLm5ldGZsaXh0ZXN0Lm5ldDAJBgNV +HRMEAjAAMA4GA1UdDwEB/wQEAwIFoDAdBgNVHSUEFjAUBggrBgEFBQcDAQYIKwYB +BQUHAwIwYQYDVR0gBFowWDBWBgZngQwBAgIwTDAjBggrBgEFBQcCARYXaHR0cHM6 +Ly9kLnN5bWNiLmNvbS9jcHMwJQYIKwYBBQUHAgIwGRoXaHR0cHM6Ly9kLnN5bWNi +LmNvbS9ycGEwHwYDVR0jBBgwFoAUNI9UtT8KH1K6nLJl7bqLCGcZ4AQwKwYDVR0f +BCQwIjAgoB6gHIYaaHR0cDovL3NzLnN5bWNiLmNvbS9zcy5jcmwwVwYIKwYBBQUH +AQEESzBJMB8GCCsGAQUFBzABhhNodHRwOi8vc3Muc3ltY2QuY29tMCYGCCsGAQUF +BzAChhpodHRwOi8vc3Muc3ltY2IuY29tL3NzLmNydDANBgkqhkiG9w0BAQsFAAOC +AQEAQuIfyBltvCZ9orqNdS6PUo2PaeUgJzkmdDwbDVd7rTwbZIwGZXZjeKseqMSb +L+r/jN6DWrScVylleiz0N/D0lSUhC609dQKuicGpy3yQaXwhfYZ6duxrW3Ii/+Vz +pFv7DnG3JPZjIXCmVhQVIv/8oaV0bfUF/1mrWRFwZiBILxa7iaycRhjusJEVRtzN +Ot/qkLluHO0wbEHnASV4P9Y5NuR/bliuFS/DeRczofNS78jJuZrGvl2AqS/19Hvm +Bs63gULVCqWygt5KEbv990m/XGuRMaXuHzHCHB4v5LRM30FiFmqCzyD8d+btzW9B +1hZ5s3rj+a6UwvpinKJoPfgkgg== +-----END CERTIFICATE----- +""" +EXTERNAL_CERT = x509.load_pem_x509_certificate(EXTERNAL_VALID_STR, default_backend()) + + +PRIVATE_KEY_STR = """ +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEAnEjM0cQevlDjT6mDMtTo8N1ovAyKbfVEp0ketCPC4hLkStms +q9ETIyyerARIMv4SEhKqS4E7HIg6ccGkwv1ja5E/b2jHMH4ht1dEXnfM2yh0Mwvk +8nC0YPcGwt7+td5GiGQkAqBOwKLq7vVm1T5rkaGrH87a4JLFph/Yp30Gy+Ew3JnM +aKhmjJ+Go0E1QA+twip7rfglC2SDAURAVZUky4jju6mXU63fr47S1r3V1T3Fi0ga +3Vabt2J4aqk7XvgzM0B8GOwNrN37uAoOB+J6c3EftB9R/wec+uJMamavRVhiHGXM ++9MV3hzfVFCjw0k4LI7SMyk2iDnvXjC94xklOwIDAQABAoIBAGeykly5MeD70OgB +xPEMfoebkav88jklnekVxk6mz9+rw1i6+CyFLJqRN7NRoApdtOXTBrXUyMEUzxq9 +7zIGaVptZNbqggh2GK8LM20vNnlQbVGVmdMX30fbgNv6lK1eEBTdxVsMvVRqhVIK ++LGTmlJmICKZ4XdTS9v/k4UGm2TZPCt2pvrNzIpT7TIm2QybCbZoOPY8SHx0U8c5 +lmtdqmIsy2JPNSOsOCiJgzQIvkR/fMGWFgNE4fEHsHAfubgpK97TGzwLiFRmlTb+ +QUDaz0YbwhF+5bQjHtaGUGATcg5bvV1UWBUvp+g4gRIfwzG+3PAGacYE/djouAdG +PHbxuCkCgYEAz/LsgMgsaV3arlounviSwc8wG9WcI5gbYw5qwX0P57ZoxS7EBAGu +yYtudurJrU9SfsSV44GL11UzBcAGOeS0btddrcMiNBhc7fY7P/1xaufQ3GjG06/v +kH4gOjzsGSTJliZ709g4J6hnMCxz0O0PS31Qg5cBD8UG8xO7/AV0is0CgYEAwGWy +A6YPinpZuenaxrivM5AcVDWmj7aeC29M63l/GY+O5LQH2PKVESH0vL5PvG3LkrCR +SUbaMKdKR0wnZsJ89z21eZ54ydUgj41bZJczl8drxcY0GSajj6XZXGTUjtoVrWsB +A0kJbjsrpd+8J316Y9iCgpopmbVd965pUHe4ACcCgYAamJlDB1cWytgzQHmB/4zV +mOgwRyvHKacnDir9QD+OhTf1MDwFvylZwamJMBJHRkPozr/U7zaxfcYe0CZ7tRKW +spjapoBzZUJNdRay4nllEO0Xo5b6cCAVvOvmRvBzbs8Rky53M8pK2DEKakUNzaQN +JaPskJ2kJLD02etLGm+DaQKBgQCTI/NNmQ2foUzHw1J+0jWjoJ4ZxOI6XLZoFlnk +aInMuZ7Vx92MjJF2hdqPEpkWiX28FO839EjgFsDW4CXuD+XUjEwi1BCagzWgs8Hm +n0Bk3q3MlnW3mnZSYMtoPvDUw3L6qrAenBfrRrNt6zsRlIQqoiXFzjLsi+luh+Oh +F74P1wKBgQCPQGKLUcfAvjIcZp4ECH0K8sBEmoEf8pceuALZ3H5vneYDzqMDIceo +t5Gpocpt77LJnNiszXSerj/KjX2MflY5xUXeekWowLVTBOK5+CZ8+XBIgBt1hIG3 +XKxcRgm/Va4QMEAnec0qXfdTVJaJiAW0bdKwKRRrrbwcTdNRGibdng== +-----END RSA PRIVATE KEY----- +""" \ No newline at end of file diff --git a/lemur/tests/test_certificates.py b/lemur/tests/test_certificates.py index e24251c5..3201287a 100644 --- a/lemur/tests/test_certificates.py +++ b/lemur/tests/test_certificates.py @@ -1,85 +1,337 @@ -import os -import shutil +import pytest +from lemur.certificates.views import * -import boto -from moto import mock_iam, mock_sts, mock_s3 - -from lemur.tests import LemurTestCase +#def test_crud(session): +# role = create('role1') +# assert role.id > 0 +# +# role = update(role.id, 'role_new', None, []) +# assert role.name == 'role_new' +# delete(role.id) +# assert get(role.id) == None -#class CertificateTestCase(LemurTestCase): -# def test_create_challenge(self): -# from lemur.certificates.service import create_challenge -# self.assertTrue(len(create_challenge()) >= 24) -# -# def test_hash_domains(self): -# from lemur.certificates.service import hash_domains -# h = hash_domains(['netflix.com', 'www.netflix.com', 'movies.netflix.com']) -# self.assertEqual('c9c83253b46c7c1245c100ed3f7045eb', h) -# -# def test_create_csr(self): -# from lemur.certificates.service import create_csr -# from lemur.tests.test_csr import TEST_CSR -# path = create_csr(['netflix.com'], TEST_CSR) -# files = len(os.listdir(path)) -# self.assertEqual(files, 4) -# shutil.rmtree(path) -# -# def test_create_san_csr(self): -# from lemur.certificates.service import create_csr -# from lemur.tests.test_csr import TEST_CSR -# path = create_csr(['netflix.com', 'www.netflix.com'], TEST_CSR) -# files = len(os.listdir(path)) -# self.assertEqual(files, 4) -# shutil.rmtree(path) -# -# def test_create_path(self): -# from lemur.certificates.service import create_path -# path = create_path("blah") -# self.assertIn('blah', path) -# shutil.rmtree(path) -# -# @mock_s3 -# @mock_sts -# @mock_iam -# def test_save_cert(self): -# from lemur.certificates.service import save_cert -# from lemur.common.services.aws.iam import get_all_server_certs -# conn = boto.connect_s3() -# bucket = conn.create_bucket(app.config.get('S3_BUCKET')) -# cert = save_cert(TEST_CERT, TEST_KEY, None, "blah", "blah", [1]) -# count = 0 -# for key in bucket.list(): -# count += 1 -# -# self.assertEqual(count, 4) -# certs = get_all_server_certs('1111') -# self.assertEqual(len(certs), 1) -# -## @mock_s3 -## @mock_sts -## @mock_iam -## def test_upload_cert(self): -## from lemur.certificates.service import upload -## from lemur.common.services.aws.iam import get_all_server_certs -## conn = boto.connect_s3() -## bucket = conn.create_bucket(app.config.get('S3_BUCKET')) -## -## cert_up = {"public_cert": TEST_CERT, "private_key": TEST_KEY, "owner": "test@example.com", "accounts_ids": ['1111']} -## -## cert_name = upload(**cert_up) -## valid_name = 'AHB-dfdsflkj.net-NetflixInc-20140525-20150525' -## self.assertEqual(cert_name, valid_name) -## -## app.logger.debug(cert_name) -## count = 0 -## -## for key in bucket.list(): -## count += 1 -## -## self.assertEqual(count, 2) -## certs = get_all_server_certs('179727101194') -## self.assertEqual(len(certs), 1) -## -## -## +def test_valid_authority(session): + assert 1 == 2 + + +def test_pem_str(): + from lemur.tests.certs import INTERNAL_VALID_LONG_STR + assert pem_str(INTERNAL_VALID_LONG_STR, 'test') == INTERNAL_VALID_LONG_STR + + with pytest.raises(ValueError): + pem_str('sdfsdfds', 'test') + + +def test_private_key_str(): + from lemur.tests.certs import PRIVATE_KEY_STR + assert private_key_str(PRIVATE_KEY_STR, 'test') == PRIVATE_KEY_STR + + with pytest.raises(ValueError): + private_key_str('dfsdfsdf', 'test') + + +def test_create_csr(): + assert 1 == 2 + + +def test_create_path(): + assert 1 == 2 + + +def test_load_ssl_pack(): + assert 1 == 2 + + +def test_delete_ssl_path(): + assert 1 == 2 + + +def test_import_certificate(session): + assert 1 == 2 + + +def test_mint(): + assert 1 == 2 + + +def test_disassociate_aws_account(): + assert 1 == 2 + + +def test_cert_get_cn(): + from lemur.tests.certs import INTERNAL_VALID_LONG_CERT + from lemur.certificates.models import cert_get_cn + + assert cert_get_cn(INTERNAL_VALID_LONG_CERT) == 'long.lived.com' + + +def test_cert_get_domains(): + from lemur.tests.certs import INTERNAL_VALID_SAN_CERT, INTERNAL_VALID_LONG_CERT + from lemur.certificates.models import cert_get_domains + + assert cert_get_domains(INTERNAL_VALID_LONG_CERT) == ['long.lived.com'] + assert cert_get_domains(INTERNAL_VALID_SAN_CERT) == ['example2.long.com', 'example3.long.com', 'san.example.com'] + + +def test_cert_is_san(): + from lemur.tests.certs import INTERNAL_VALID_SAN_CERT, INTERNAL_VALID_LONG_CERT + from lemur.certificates.models import cert_is_san + + assert cert_is_san(INTERNAL_VALID_LONG_CERT) == False + assert cert_is_san(INTERNAL_VALID_SAN_CERT) == True + + +def test_cert_is_wildcard(): + from lemur.tests.certs import INTERNAL_VALID_WILDCARD_CERT, INTERNAL_VALID_LONG_CERT + from lemur.certificates.models import cert_is_wildcard + assert cert_is_wildcard(INTERNAL_VALID_WILDCARD_CERT) == True + assert cert_is_wildcard(INTERNAL_VALID_LONG_CERT) == False + + +def test_cert_get_bitstrength(): + from lemur.tests.certs import INTERNAL_VALID_LONG_CERT + from lemur.certificates.models import cert_get_bitstrength + assert cert_get_bitstrength(INTERNAL_VALID_LONG_CERT) == 2048 + +def test_cert_get_issuer(): + from lemur.tests.certs import INTERNAL_VALID_LONG_CERT + from lemur.certificates.models import cert_get_issuer + assert cert_get_issuer(INTERNAL_VALID_LONG_CERT) == 'Example' + + +def test_get_name_from_arn(): + from lemur.certificates.models import get_name_from_arn + arn = 'arn:aws:iam::11111111:server-certificate/mycertificate' + assert get_name_from_arn(arn) == 'mycertificate' + + +def test_get_account_number(): + from lemur.certificates.models import get_account_number + arn = 'arn:aws:iam::11111111:server-certificate/mycertificate' + assert get_account_number(arn) == '11111111' + + +def test_create_name(): + from lemur.certificates.models import create_name + from datetime import datetime + assert create_name( + 'Example Inc,', + datetime(2015, 5, 7, 0, 0, 0), + datetime(2015, 5, 12, 0, 0, 0), + 'example.com', + False + ) == 'example.com-ExampleInc-20150507-20150512' + assert create_name( + 'Example Inc,', + datetime(2015, 5, 7, 0, 0, 0), + datetime(2015, 5, 12, 0, 0, 0), + 'example.com', + True + ) == 'SAN-example.com-ExampleInc-20150507-20150512' + +def test_is_expired(): + assert 1 == 2 + + +def test_certificate_get(client): + assert client.get(api.url_for(Certificates, certificate_id=1)).status_code == 401 + + +def test_certificate_post(client): + assert client.post(api.url_for(Certificates, certificate_id=1), {}).status_code == 405 + + +def test_certificate_put(client): + assert client.put(api.url_for(Certificates, certificate_id=1), {}).status_code == 401 + + +def test_certificate_delete(client): + assert client.delete(api.url_for(Certificates, certificate_id=1)).status_code == 405 + + +def test_certificate_patch(client): + assert client.patch(api.url_for(Certificates, certificate_id=1), {}).status_code == 405 + + +def test_certificates_get(client): + assert client.get(api.url_for(CertificatesList)).status_code == 401 + + +def test_certificates_post(client): + assert client.post(api.url_for(CertificatesList), {}).status_code == 401 + + +def test_certificates_put(client): + assert client.put(api.url_for(CertificatesList), {}).status_code == 405 + + +def test_certificates_delete(client): + assert client.delete(api.url_for(CertificatesList)).status_code == 405 + + +def test_certificates_patch(client): + assert client.patch(api.url_for(CertificatesList), {}).status_code == 405 + + +def test_certificate_credentials_get(client): + assert client.get(api.url_for(CertificatePrivateKey, certificate_id=1)).status_code == 401 + + +def test_certificate_credentials_post(client): + assert client.post(api.url_for(CertificatePrivateKey, certificate_id=1), {}).status_code == 405 + + +def test_certificate_credentials_put(client): + assert client.put(api.url_for(CertificatePrivateKey, certificate_id=1), {}).status_code == 405 + + +def test_certificate_credentials_delete(client): + assert client.delete(api.url_for(CertificatePrivateKey, certificate_id=1)).status_code == 405 + + +def test_certificate_credentials_patch(client): + assert client.patch(api.url_for(CertificatePrivateKey, certificate_id=1), {}).status_code == 405 + + +def test_certificates_upload_get(client): + assert client.get(api.url_for(CertificatesUpload)).status_code == 405 + + +def test_certificates_upload_post(client): + assert client.post(api.url_for(CertificatesUpload), {}).status_code == 401 + + +def test_certificates_upload_put(client): + assert client.put(api.url_for(CertificatesUpload), {}).status_code == 405 + + +def test_certificates_upload_delete(client): + assert client.delete(api.url_for(CertificatesUpload)).status_code == 405 + + +def test_certificates_upload_patch(client): + assert client.patch(api.url_for(CertificatesUpload), {}).status_code == 405 + + +VALID_USER_HEADER_TOKEN = { + 'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyMzMzNjksInN1YiI6MSwiZXhwIjoxNTIxNTQ2OTY5fQ.1qCi0Ip7mzKbjNh0tVd3_eJOrae3rNa_9MCVdA4WtQI'} + + +def test_auth_certificate_get(client): + assert client.get(api.url_for(Certificates, certificate_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 200 + + +def test_auth_certificate_post_(client): + assert client.post(api.url_for(Certificates, certificate_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_certificate_put(client): + assert client.put(api.url_for(Certificates, certificate_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 400 + + +def test_auth_certificate_delete(client): + assert client.delete(api.url_for(Certificates, certificate_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_certificate_patch(client): + assert client.patch(api.url_for(Certificates, certificate_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_certificates_get(client): + assert client.get(api.url_for(CertificatesList), headers=VALID_USER_HEADER_TOKEN).status_code == 200 + + +def test_auth_certificates_post(client): + assert client.post(api.url_for(CertificatesList), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 400 + + +def test_auth_certificate_credentials_get(client): + assert client.get(api.url_for(CertificatePrivateKey, certificate_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 404 + + +def test_auth_certificate_credentials_post(client): + assert client.post(api.url_for(CertificatePrivateKey, certificate_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_certificate_credentials_put(client): + assert client.put(api.url_for(CertificatePrivateKey, certificate_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_certificate_credentials_delete(client): + assert client.delete(api.url_for(CertificatePrivateKey, certificate_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_certificate_credentials_patch(client): + assert client.patch(api.url_for(CertificatePrivateKey, certificate_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_certificates_upload_get(client): + assert client.get(api.url_for(CertificatesUpload), headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_certificates_upload_post(client): + assert client.post(api.url_for(CertificatesUpload), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 400 + + +def test_auth_certificates_upload_put(client): + assert client.put(api.url_for(CertificatesUpload), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_certificates_upload_delete(client): + assert client.delete(api.url_for(CertificatesUpload), headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_certificates_upload_patch(client): + assert client.patch(api.url_for(CertificatesUpload), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +VALID_ADMIN_HEADER_TOKEN = { + 'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyNTAyMTgsInN1YiI6MiwiZXhwIjoxNTIxNTYzODE4fQ.6mbq4-Ro6K5MmuNiTJBB153RDhlM5LGJBjI7GBKkfqA'} + + +def test_admin_certificate_get(client): + assert client.get(api.url_for(Certificates, certificate_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200 + + +def test_admin_certificate_post(client): + assert client.post(api.url_for(Certificates, certificate_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_certificate_put(client): + assert client.put(api.url_for(Certificates, certificate_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 400 + + +def test_admin_certificate_delete(client): + assert client.delete(api.url_for(Certificates, certificate_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_certificate_patch(client): + assert client.patch(api.url_for(Certificates, certificate_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_certificates_get(client): + resp = client.get(api.url_for(CertificatesList), headers=VALID_ADMIN_HEADER_TOKEN) + assert resp.status_code == 200 + assert resp.json['total'] == 0 + + +def test_admin_certificate_credentials_get(client): + assert client.get(api.url_for(CertificatePrivateKey, certificate_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 404 + + +def test_admin_certificate_credentials_post(client): + assert client.post(api.url_for(CertificatePrivateKey, certificate_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_certificate_credentials_put(client): + assert client.put(api.url_for(CertificatePrivateKey, certificate_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_certificate_credentials_delete(client): + assert client.delete(api.url_for(CertificatePrivateKey, certificate_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_certificate_credentials_patch(client): + assert client.patch(api.url_for(CertificatePrivateKey, certificate_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + From 1f9d943a4cb9ce41502de3076d58f8d4694a9edd Mon Sep 17 00:00:00 2001 From: Kevin Glisson Date: Fri, 26 Jun 2015 16:17:22 -0700 Subject: [PATCH 10/13] Ensuring a 404 is returned when we can't find the specified certificate --- lemur/certificates/views.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lemur/certificates/views.py b/lemur/certificates/views.py index 4bc317f1..1312d665 100644 --- a/lemur/certificates/views.py +++ b/lemur/certificates/views.py @@ -437,6 +437,9 @@ class CertificatePrivateKey(AuthenticatedResource): :statuscode 403: unauthenticated """ cert = service.get(certificate_id) + if not cert: + return dict(message="Cannot find specified certificate"), 404 + role = role_service.get_by_name(cert.owner) permission = ViewKeyPermission(certificate_id, hasattr(role, 'id')) From 964d1c1c524e7b2ea4406d42864cfd8870914672 Mon Sep 17 00:00:00 2001 From: Kevin Glisson Date: Fri, 26 Jun 2015 16:18:31 -0700 Subject: [PATCH 11/13] Refactoring 'create_name' out of our certificate class, fixed an issuer were key size was being calculated and removing unused functions --- lemur/certificates/models.py | 116 +++++++++++++---------------------- 1 file changed, 43 insertions(+), 73 deletions(-) diff --git a/lemur/certificates/models.py b/lemur/certificates/models.py index 81947bfc..6eca4afc 100644 --- a/lemur/certificates/models.py +++ b/lemur/certificates/models.py @@ -26,6 +26,36 @@ from lemur.constants import SAN_NAMING_TEMPLATE, DEFAULT_NAMING_TEMPLATE, NONSTA from lemur.models import certificate_associations, certificate_account_associations +def create_name(issuer, not_before, not_after, common_name, san): + """ + Create a name for our certificate. A naming standard + is based on a series of templates. The name includes + useful information such as Common Name, Validation dates, + and Issuer. + + :rtype : str + :return: + """ + delchars = ''.join(c for c in map(chr, range(256)) if not c.isalnum()) + # aws doesn't allow special chars + subject = common_name.replace('*', "WILDCARD") + issuer = issuer.translate(None, delchars) + + if san: + t = SAN_NAMING_TEMPLATE + else: + t = DEFAULT_NAMING_TEMPLATE + + temp = t.format( + subject=subject, + issuer=issuer, + not_before=not_before.strftime('%Y%m%d'), + not_after=not_after.strftime('%Y%m%d') + ) + + return temp + + def cert_get_cn(cert): """ Attempts to get a sane common name from a given certificate. @@ -33,12 +63,9 @@ def cert_get_cn(cert): :param cert: :return: Common name or None """ - try: - return cert.subject.get_attributes_for_oid( - x509.OID_COMMON_NAME - )[0].value.strip() - except Exception as e: - current_app.logger.error("Unable to get CN! {0}".format(e)) + return cert.subject.get_attributes_for_oid( + x509.OID_COMMON_NAME + )[0].value.strip() def cert_get_domains(cert): @@ -48,17 +75,21 @@ def cert_get_domains(cert): return the common name. :param cert: - :return: List of domains + :return: List of domainss """ domains = [] try: ext = cert.extensions.get_extension_for_oid(x509.OID_SUBJECT_ALTERNATIVE_NAME) - entries = ext.get_values_for(x509.DNSName) + entries = ext.value.get_values_for_type(x509.DNSName) for entry in entries: - domains.append(entry.split(":")[1].strip(", ")) + domains.append(entry) except Exception as e: current_app.logger.warning("Failed to get SubjectAltName: {0}".format(e)) - domains.append(cert_get_cn(cert)) + + # do a simple check to make sure it's a real domain + common_name = cert_get_cn(cert) + if '.' in common_name: + domains.append(common_name) return domains @@ -106,7 +137,7 @@ def cert_get_bitstrength(cert): :param cert: :return: Integer """ - return cert.public_key().key_size * 8 + return cert.public_key().key_size def cert_get_issuer(cert): @@ -122,20 +153,6 @@ def cert_get_issuer(cert): current_app.logger.error("Unable to get issuer! {0}".format(e)) -def cert_is_internal(cert): - """ - Uses an internal resource in order to determine if - a given certificate was issued by an 'internal' certificate - authority. - - :param cert: - :return: Bool - """ - if cert_get_issuer(cert) in current_app.config.get('INTERNAL_CA', []): - return True - return False - - def cert_get_not_before(cert): """ Gets the naive datetime of the certificates 'not_before' field. @@ -223,49 +240,11 @@ class Certificate(db.Model): self.san = cert_is_san(cert) self.not_before = cert_get_not_before(cert) self.not_after = cert_get_not_after(cert) - self.name = self.create_name + self.name = create_name(self.issuer, self.not_before, self.not_after, self.cn, self.san) for domain in cert_get_domains(cert): self.domains.append(Domain(name=domain)) - @property - def create_name(self): - """ - Create a name for our certificate. A naming standard - is based on a series of templates. The name includes - useful information such as Common Name, Validation dates, - and Issuer. - - :rtype : str - :return: - """ - # aws doesn't allow special chars - if self.cn: - subject = self.cn.replace('*', "WILDCARD") - - if self.san: - t = SAN_NAMING_TEMPLATE - else: - t = DEFAULT_NAMING_TEMPLATE - - temp = t.format( - subject=subject, - issuer=self.issuer, - not_before=self.not_before.strftime('%Y%m%d'), - not_after=self.not_after.strftime('%Y%m%d') - ) - - else: - t = NONSTANDARD_NAMING_TEMPLATE - - temp = t.format( - issuer=self.issuer, - not_before=self.not_before.strftime('%Y%m%d'), - not_after=self.not_after.strftime('%Y%m%d') - ) - - return temp - @property def is_expired(self): if self.not_after < datetime.datetime.now(): @@ -296,12 +275,3 @@ class Certificate(db.Model): def as_dict(self): return {c.name: getattr(self, c.name) for c in self.__table__.columns} - def serialize(self): - blob = self.as_dict() - # TODO this should be done with relationships - user = user_service.get(self.user_id) - if user: - blob['creator'] = user.username - - return blob - From b025a45046ab577ef6e998c221a91125c6b4869e Mon Sep 17 00:00:00 2001 From: Kevin Glisson Date: Mon, 29 Jun 2015 12:36:27 -0700 Subject: [PATCH 12/13] Adding basic authority tests. --- lemur/authorities/service.py | 9 +- lemur/authorities/views.py | 6 +- lemur/tests/test_authorities.py | 163 ++++++++++++++++++++++++++++++++ 3 files changed, 174 insertions(+), 4 deletions(-) create mode 100644 lemur/tests/test_authorities.py diff --git a/lemur/authorities/service.py b/lemur/authorities/service.py index d7d82a48..84861665 100644 --- a/lemur/authorities/service.py +++ b/lemur/authorities/service.py @@ -57,11 +57,14 @@ def create(kwargs): cert = cert_service.save_cert(cert_body, None, intermediate, None, None, None) cert.user = g.current_user - # we create and attach any roles that cloudCA gives us + # we create and attach any roles that the issuer gives us role_objs = [] for r in issuer_roles: - role = role_service.create(r['name'], password=r['password'], description="CloudCA auto generated role", - username=r['username']) + role = role_service.create( + r['name'], + password=r['password'], + description="{0} auto generated role".format(kwargs.get('pluginName')), + username=r['username']) # the user creating the authority should be able to administer it if role.username == 'admin': g.current_user.roles.append(role) diff --git a/lemur/authorities/views.py b/lemur/authorities/views.py index e973a10c..cb1797ed 100644 --- a/lemur/authorities/views.py +++ b/lemur/authorities/views.py @@ -365,7 +365,11 @@ class CertificateAuthority(AuthenticatedResource): :statuscode 200: no error :statuscode 403: unauthenticated """ - return certificate_service.get(certificate_id).authority + cert = certificate_service.get(certificate_id) + if not cert: + return dict(message="Certificate not found"), 404 + + return cert.authority api.add_resource(AuthoritiesList, '/authorities', endpoint='authorities') api.add_resource(Authorities, '/authorities/', endpoint='authority') diff --git a/lemur/tests/test_authorities.py b/lemur/tests/test_authorities.py new file mode 100644 index 00000000..e55db48a --- /dev/null +++ b/lemur/tests/test_authorities.py @@ -0,0 +1,163 @@ +import pytest +from lemur.authorities.views import * + +#def test_crud(session): +# role = create('role1') +# assert role.id > 0 +# +# role = update(role.id, 'role_new', None, []) +# assert role.name == 'role_new' +# delete(role.id) +# assert get(role.id) == None + + +def test_authority_get(client): + assert client.get(api.url_for(Authorities, authority_id=1)).status_code == 401 + + +def test_authority_post(client): + assert client.post(api.url_for(Authorities, authority_id=1), {}).status_code == 405 + + +def test_authority_put(client): + assert client.put(api.url_for(Authorities, authority_id=1), {}).status_code == 401 + + +def test_authority_delete(client): + assert client.delete(api.url_for(Authorities, authority_id=1)).status_code == 405 + + +def test_authority_patch(client): + assert client.patch(api.url_for(Authorities, authority_id=1), {}).status_code == 405 + + +def test_authorities_get(client): + assert client.get(api.url_for(AuthoritiesList)).status_code == 401 + + +def test_authorities_post(client): + assert client.post(api.url_for(AuthoritiesList), {}).status_code == 401 + + +def test_authorities_put(client): + assert client.put(api.url_for(AuthoritiesList), {}).status_code == 405 + + +def test_authorities_delete(client): + assert client.delete(api.url_for(AuthoritiesList)).status_code == 405 + + +def test_authorities_patch(client): + assert client.patch(api.url_for(AuthoritiesList), {}).status_code == 405 + + +def test_certificate_authorities_get(client): + assert client.get(api.url_for(AuthoritiesList)).status_code == 401 + + +def test_certificate_authorities_post(client): + assert client.post(api.url_for(AuthoritiesList), {}).status_code == 401 + + +def test_certificate_authorities_put(client): + assert client.put(api.url_for(AuthoritiesList), {}).status_code == 405 + + +def test_certificate_authorities_delete(client): + assert client.delete(api.url_for(AuthoritiesList)).status_code == 405 + + +def test_certificate_authorities_patch(client): + assert client.patch(api.url_for(AuthoritiesList), {}).status_code == 405 + + +VALID_USER_HEADER_TOKEN = { + 'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyMzMzNjksInN1YiI6MSwiZXhwIjoxNTIxNTQ2OTY5fQ.1qCi0Ip7mzKbjNh0tVd3_eJOrae3rNa_9MCVdA4WtQI'} + + +def test_auth_authority_get(client): + assert client.get(api.url_for(Authorities, authority_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 200 + + +def test_auth_authority_post_(client): + assert client.post(api.url_for(Authorities, authority_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_authority_put(client): + assert client.put(api.url_for(Authorities, authority_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 400 + + +def test_auth_authority_delete(client): + assert client.delete(api.url_for(Authorities, authority_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_authority_patch(client): + assert client.patch(api.url_for(Authorities, authority_id=1), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 405 + + +def test_auth_authorities_get(client): + assert client.get(api.url_for(AuthoritiesList), headers=VALID_USER_HEADER_TOKEN).status_code == 200 + + +def test_auth_authorities_post(client): + assert client.post(api.url_for(AuthoritiesList), {}, headers=VALID_USER_HEADER_TOKEN).status_code == 400 + + +def test_auth_certificates_authorities_get(client): + assert client.get(api.url_for(CertificateAuthority, certificate_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 404 + + +VALID_ADMIN_HEADER_TOKEN = { + 'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyNTAyMTgsInN1YiI6MiwiZXhwIjoxNTIxNTYzODE4fQ.6mbq4-Ro6K5MmuNiTJBB153RDhlM5LGJBjI7GBKkfqA'} + + +def test_admin_authority_get(client): + assert client.get(api.url_for(Authorities, authority_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200 + + +def test_admin_authority_post(client): + assert client.post(api.url_for(Authorities, authority_id=1), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_authority_put(client): + assert client.put(api.url_for(Authorities, authority_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 400 + + +def test_admin_authority_delete(client): + assert client.delete(api.url_for(Authorities, authority_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_authority_patch(client): + assert client.patch(api.url_for(Authorities, authority_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_authorities_get(client): + assert client.get(api.url_for(AuthoritiesList), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200 + + +def test_admin_authorities_post(client): + assert client.post(api.url_for(AuthoritiesList), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 400 + + +def test_admin_authorities_put(client): + assert client.put(api.url_for(AuthoritiesList), {}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_authorities_delete(client): + assert client.delete(api.url_for(AuthoritiesList), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_certificate_authorities_get(client): + assert client.get(api.url_for(CertificateAuthority, certificate_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 404 + + +def test_admin_certificate_authorities_post(client): + assert client.post(api.url_for(CertificateAuthority, certficate_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_certificate_authorities_put(client): + assert client.put(api.url_for(CertificateAuthority, certificate_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 + + +def test_admin_certificate_authorities_delete(client): + assert client.delete(api.url_for(CertificateAuthority, certificate_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405 From bde556aa10d5802eb71f0311c119b02213bb25da Mon Sep 17 00:00:00 2001 From: Kevin Glisson Date: Mon, 29 Jun 2015 13:51:52 -0700 Subject: [PATCH 13/13] Extending certificate tests. --- lemur/__init__.py | 40 ++++++++++++++++---------------- lemur/tests/certs.py | 38 ++++++++++++++++++++++++++++++ lemur/tests/test_certificates.py | 9 ++++++- 3 files changed, 66 insertions(+), 21 deletions(-) diff --git a/lemur/__init__.py b/lemur/__init__.py index 388914bd..3eb69e3b 100644 --- a/lemur/__init__.py +++ b/lemur/__init__.py @@ -12,28 +12,28 @@ from flask import jsonify from lemur import factory -from lemur.users.views import mod as users -from lemur.roles.views import mod as roles -from lemur.auth.views import mod as auth -from lemur.domains.views import mod as domains -from lemur.elbs.views import mod as elbs -from lemur.accounts.views import mod as accounts -from lemur.authorities.views import mod as authorities -from lemur.listeners.views import mod as listeners -from lemur.certificates.views import mod as certificates -from lemur.status.views import mod as status +from lemur.users.views import mod as users_bp +from lemur.roles.views import mod as roles_bp +from lemur.auth.views import mod as auth_bp +from lemur.domains.views import mod as domains_bp +from lemur.elbs.views import mod as elbs_bp +from lemur.accounts.views import mod as accounts_bp +from lemur.authorities.views import mod as authorities_bp +from lemur.listeners.views import mod as listeners_bp +from lemur.certificates.views import mod as certificates_bp +from lemur.status.views import mod as status_bp LEMUR_BLUEPRINTS = ( - users, - roles, - auth, - domains, - elbs, - accounts, - authorities, - listeners, - certificates, - status + users_bp, + roles_bp, + auth_bp, + domains_bp, + elbs_bp, + accounts_bp, + authorities_bp, + listeners_bp, + certificates_bp, + status_bp ) def create_app(config=None): diff --git a/lemur/tests/certs.py b/lemur/tests/certs.py index 29e62705..ce2f123b 100644 --- a/lemur/tests/certs.py +++ b/lemur/tests/certs.py @@ -180,4 +180,42 @@ F74P1wKBgQCPQGKLUcfAvjIcZp4ECH0K8sBEmoEf8pceuALZ3H5vneYDzqMDIceo t5Gpocpt77LJnNiszXSerj/KjX2MflY5xUXeekWowLVTBOK5+CZ8+XBIgBt1hIG3 XKxcRgm/Va4QMEAnec0qXfdTVJaJiAW0bdKwKRRrrbwcTdNRGibdng== -----END RSA PRIVATE KEY----- +""" + +CSR_CONFIG = """ + # Configuration for standard CSR generation for Netflix + # Used for procuring VeriSign certificates + # Author: jbob + # Contact: security@example.com + + [ req ] + # Use a 2048 bit private key + default_bits = 2048 + default_keyfile = key.pem + prompt = no + encrypt_key = no + + # base request + distinguished_name = req_distinguished_name + + # extensions + # Uncomment the following line if you are requesting a SAN cert + #req_extensions = req_ext + + # distinguished_name + [ req_distinguished_name ] + countryName = "US" # C= + stateOrProvinceName = "CALIFORNIA" # ST= + localityName = "A place" # L= + organizationName = "Example, Inc." # O= + organizationalUnitName = "Operations" # OU= + # This is the hostname/subject name on the certificate + commonName = "example.net" # CN= + + [ req_ext ] + # Uncomment the following line if you are requesting a SAN cert + #subjectAltName = @alt_names + + [alt_names] + # Put your SANs here """ \ No newline at end of file diff --git a/lemur/tests/test_certificates.py b/lemur/tests/test_certificates.py index 3201287a..4470149d 100644 --- a/lemur/tests/test_certificates.py +++ b/lemur/tests/test_certificates.py @@ -1,4 +1,6 @@ +import os import pytest +from mock import mock_open, patch from lemur.certificates.views import * #def test_crud(session): @@ -32,7 +34,12 @@ def test_private_key_str(): def test_create_csr(): - assert 1 == 2 + from lemur.tests.certs import CSR_CONFIG + from lemur.certificates.service import create_csr + m = mock_open() + with patch('lemur.certificates.service.open', m, create=True): + path = create_csr(CSR_CONFIG) + assert path == '' def test_create_path():