lemur/lemur/tests/conftest.py

332 lines
7.6 KiB
Python

import os
import datetime
import pytest
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from flask import current_app
from flask_principal import identity_changed, Identity
from sqlalchemy.sql import text
from lemur import create_app
from lemur.common.utils import parse_private_key
from lemur.database import db as _db
from lemur.auth.service import create_token
from lemur.tests.vectors import (
SAN_CERT_KEY,
INTERMEDIATE_KEY,
ROOTCA_CERT_STR,
ROOTCA_KEY,
)
from .factories import (
ApiKeyFactory,
AuthorityFactory,
NotificationFactory,
DestinationFactory,
CertificateFactory,
UserFactory,
RoleFactory,
SourceFactory,
EndpointFactory,
RotationPolicyFactory,
PendingCertificateFactory,
AsyncAuthorityFactory,
InvalidCertificateFactory,
CryptoAuthorityFactory,
CACertificateFactory,
)
def pytest_runtest_setup(item):
    """
    Decide, right before a test item runs, whether to skip or xfail it.

    * Items carrying the "slow" keyword are skipped unless the ``--runslow``
      option evaluates truthy.
    * Items carrying the "incremental" keyword are xfailed when their parent
      has a ``_previousfailed`` attribute that is not ``None``.

    :param item: the pytest item about to be executed
    """
    if "slow" in item.keywords:
        # The option is only consulted for items actually marked slow.
        if not item.config.getoption("--runslow"):
            pytest.skip("need --runslow option to run")

    if "incremental" not in item.keywords:
        return

    earlier_failure = getattr(item.parent, "_previousfailed", None)
    if earlier_failure is None:
        return
    pytest.xfail("previous test failed ({0})".format(earlier_failure.name))
def pytest_runtest_makereport(item, call):
    """
    Remember a failing "incremental" item on its parent.

    When ``item`` carries the "incremental" keyword and ``call.excinfo`` is
    set, the item is stored as ``item.parent._previousfailed``. Otherwise
    nothing is changed.

    :param item: the pytest item that was run
    :param call: call information whose ``excinfo`` is ``None`` when no
        exception was raised
    """
    if "incremental" not in item.keywords or call.excinfo is None:
        return
    item.parent._previousfailed = item
@pytest.fixture(scope="session")
def app(request):
    """
    Creates a new Flask application for a test duration.
    Uses application factory `create_app`.

    The application is configured from the ``conf.py`` file located next to
    this module. An application context is pushed before the app is yielded
    and popped again on teardown.
    """
    # ``pytest.fixture`` supports ``yield`` directly; the deprecated
    # ``pytest.yield_fixture`` alias is not needed.
    tests_dir = os.path.dirname(os.path.realpath(__file__))
    _app = create_app(config_path=os.path.join(tests_dir, "conf.py"))
    ctx = _app.app_context()
    ctx.push()

    yield _app

    ctx.pop()
@pytest.fixture(scope="session")
def db(app, request):
    """
    Session-wide database fixture.

    Drops and recreates all tables, seeds a few baseline rows, commits them
    and yields the database object. All tables are dropped again on teardown.
    """
    _db.drop_all()
    # Make sure the pg_trgm extension exists before the tables are created.
    _db.engine.execute(text("CREATE EXTENSION IF NOT EXISTS pg_trgm"))
    _db.create_all()
    _db.app = app

    # Baseline data: a plain user, an "admin" role with a user holding it,
    # a "default" rotation policy and an API key for the admin user.
    # NOTE(review): presumably these two users receive ids 1 and 2, matching
    # Identity(1) / Identity(2) in logged_in_user / logged_in_admin; confirm.
    UserFactory()
    admin_role = RoleFactory(name="admin")
    admin = UserFactory(roles=[admin_role])
    RotationPolicyFactory(name="default")
    ApiKeyFactory(user=admin)

    _db.session.commit()

    yield _db

    _db.drop_all()
@pytest.fixture(scope="function")
def session(db, request):
    """
    Provides the database session, wrapped in a nested transaction, for the
    duration of a single test.

    ``begin_nested()`` is called before the session is yielded and
    ``rollback()`` is called on teardown.
    """
    db.session.begin_nested()

    yield db.session

    db.session.rollback()
@pytest.fixture(scope="function")
def client(app, session, client):
    """
    Re-exports the ``client`` fixture so that it also depends on ``app`` and
    ``session``.

    The incoming ``client`` argument is the fixture of the same name defined
    outside this file (presumably by a pytest plugin; verify against the
    project's test dependencies). It is yielded unchanged.
    """
    yield client
@pytest.fixture
def authority(session):
    """Return an ``AuthorityFactory`` instance after committing the session."""
    created_authority = AuthorityFactory()
    session.commit()
    return created_authority
@pytest.fixture
def crypto_authority(session):
    """Return a ``CryptoAuthorityFactory`` instance after committing the session."""
    created_authority = CryptoAuthorityFactory()
    session.commit()
    return created_authority
@pytest.fixture
def async_authority(session):
    """Return an ``AsyncAuthorityFactory`` instance after committing the session."""
    created_authority = AsyncAuthorityFactory()
    session.commit()
    return created_authority
@pytest.fixture
def destination(session):
    """Return a ``DestinationFactory`` instance after committing the session."""
    created_destination = DestinationFactory()
    session.commit()
    return created_destination
@pytest.fixture
def source(session):
    """Return a ``SourceFactory`` instance after committing the session."""
    created_source = SourceFactory()
    session.commit()
    return created_source
@pytest.fixture
def notification(session):
    """Return a ``NotificationFactory`` instance after committing the session."""
    created_notification = NotificationFactory()
    session.commit()
    return created_notification
@pytest.fixture
def certificate(session):
    """
    Return a ``CertificateFactory`` instance tied to a freshly created user
    and authority, after committing the session.
    """
    owner = UserFactory()
    issuing_authority = AuthorityFactory()
    created_certificate = CertificateFactory(
        user=owner,
        authority=issuing_authority,
    )
    session.commit()
    return created_certificate
@pytest.fixture
def endpoint(session):
    """
    Return an ``EndpointFactory`` instance attached to a freshly created
    source, after committing the session.
    """
    backing_source = SourceFactory()
    created_endpoint = EndpointFactory(source=backing_source)
    session.commit()
    return created_endpoint
@pytest.fixture
def role(session):
    """Return a ``RoleFactory`` instance after committing the session."""
    created_role = RoleFactory()
    session.commit()
    return created_role
@pytest.fixture
def user(session):
    """
    Create and commit a user, then return it with a ready-made auth header.

    :return: dict with the user under ``"user"`` and, under ``"token"``, a
        headers dict whose ``Authorization`` value is ``"Basic "`` followed
        by the result of ``create_token`` for that user
    """
    new_user = UserFactory()
    session.commit()
    auth_headers = {"Authorization": "Basic " + create_token(new_user)}
    return {"user": new_user, "token": auth_headers}
@pytest.fixture
def pending_certificate(session):
    """
    Return a ``PendingCertificateFactory`` instance tied to a freshly created
    user and an ``AsyncAuthorityFactory`` authority, after committing the
    session.
    """
    owner = UserFactory()
    issuing_authority = AsyncAuthorityFactory()
    pending = PendingCertificateFactory(
        user=owner,
        authority=issuing_authority,
    )
    session.commit()
    return pending
@pytest.fixture
def pending_certificate_from_full_chain_ca(session):
    """
    Return a ``PendingCertificateFactory`` instance tied to a freshly created
    user and a default ``AuthorityFactory`` authority, after committing the
    session.
    """
    owner = UserFactory()
    issuing_authority = AuthorityFactory()
    pending = PendingCertificateFactory(
        user=owner,
        authority=issuing_authority,
    )
    session.commit()
    return pending
@pytest.fixture
def pending_certificate_from_partial_chain_ca(session):
    """
    Return a ``PendingCertificateFactory`` instance whose authority uses a CA
    certificate built from ``ROOTCA_CERT_STR`` / ``ROOTCA_KEY`` with no chain.

    The session is committed before the pending certificate is returned.
    """
    owner = UserFactory()
    # The authority certificate is created explicitly with ``chain=None``.
    ca_certificate = CACertificateFactory(
        body=ROOTCA_CERT_STR,
        private_key=ROOTCA_KEY,
        chain=None,
    )
    issuing_authority = AuthorityFactory(authority_certificate=ca_certificate)
    pending = PendingCertificateFactory(
        user=owner,
        authority=issuing_authority,
    )
    session.commit()
    return pending
@pytest.fixture
def invalid_certificate(session):
    """
    Return an ``InvalidCertificateFactory`` instance tied to a freshly
    created user and an ``AsyncAuthorityFactory`` authority, after committing
    the session.
    """
    owner = UserFactory()
    issuing_authority = AsyncAuthorityFactory()
    invalid = InvalidCertificateFactory(
        user=owner,
        authority=issuing_authority,
    )
    session.commit()
    return invalid
@pytest.fixture
def admin_user(session):
    """
    Create and commit a user holding a role named "admin", then return it
    with a ready-made auth header.

    :return: dict with the user under ``"user"`` and, under ``"token"``, a
        headers dict whose ``Authorization`` value is ``"Basic "`` followed
        by the result of ``create_token`` for that user
    """
    admin = UserFactory()
    admin.roles.append(RoleFactory(name="admin"))
    session.commit()
    auth_headers = {"Authorization": "Basic " + create_token(admin)}
    return {"user": admin, "token": auth_headers}
@pytest.fixture
def async_issuer_plugin():
    """Register ``TestAsyncIssuerPlugin`` with Lemur's plugin registry and return the class."""
    # Imported lazily, inside the fixture, exactly as the module-level code
    # never imports the plugin machinery.
    from lemur.plugins.base import register as register_plugin
    from .plugins.issuer_plugin import TestAsyncIssuerPlugin as plugin_cls

    register_plugin(plugin_cls)
    return plugin_cls
@pytest.fixture
def issuer_plugin():
    """Register ``TestIssuerPlugin`` with Lemur's plugin registry and return the class."""
    from lemur.plugins.base import register as register_plugin
    from .plugins.issuer_plugin import TestIssuerPlugin as plugin_cls

    register_plugin(plugin_cls)
    return plugin_cls
@pytest.fixture
def notification_plugin():
    """Register ``TestNotificationPlugin`` with Lemur's plugin registry and return the class."""
    from lemur.plugins.base import register as register_plugin
    from .plugins.notification_plugin import TestNotificationPlugin as plugin_cls

    register_plugin(plugin_cls)
    return plugin_cls
@pytest.fixture
def destination_plugin():
    """Register ``TestDestinationPlugin`` with Lemur's plugin registry and return the class."""
    from lemur.plugins.base import register as register_plugin
    from .plugins.destination_plugin import TestDestinationPlugin as plugin_cls

    register_plugin(plugin_cls)
    return plugin_cls
@pytest.fixture
def source_plugin():
    """Register ``TestSourcePlugin`` with Lemur's plugin registry and return the class."""
    from lemur.plugins.base import register as register_plugin
    from .plugins.source_plugin import TestSourcePlugin as plugin_cls

    register_plugin(plugin_cls)
    return plugin_cls
@pytest.fixture(scope="function")
def logged_in_user(session, app):
    """
    Runs the test inside a test request context in which an
    ``identity_changed`` signal for ``Identity(1)`` has been sent.

    NOTE(review): id 1 is presumably the first user seeded by the ``db``
    fixture; confirm against the database sequence.
    """
    with app.test_request_context():
        # The signal is sent from the real application object, not the proxy.
        identity_changed.send(current_app._get_current_object(), identity=Identity(1))
        yield
@pytest.fixture(scope="function")
def logged_in_admin(session, app):
    """
    Runs the test inside a test request context in which an
    ``identity_changed`` signal for ``Identity(2)`` has been sent.

    NOTE(review): id 2 is presumably the admin user seeded by the ``db``
    fixture; confirm against the database sequence.
    """
    with app.test_request_context():
        # The signal is sent from the real application object, not the proxy.
        identity_changed.send(current_app._get_current_object(), identity=Identity(2))
        yield
@pytest.fixture
def private_key():
    """Return the result of ``parse_private_key`` applied to ``SAN_CERT_KEY``."""
    parsed_key = parse_private_key(SAN_CERT_KEY)
    return parsed_key
@pytest.fixture
def issuer_private_key():
    """Return the result of ``parse_private_key`` applied to ``INTERMEDIATE_KEY``."""
    parsed_key = parse_private_key(INTERMEDIATE_KEY)
    return parsed_key
@pytest.fixture
def cert_builder(private_key):
    """
    Return an unsigned ``x509.CertificateBuilder`` for "foo.com".

    Subject and issuer both carry the common name "foo.com", the serial
    number is 1, the public key is taken from the ``private_key`` fixture and
    the validity window runs from 2017-12-22 to 2040-01-01.
    """
    # Subject and issuer are the same name, so one Name object serves both.
    foo_name = x509.Name(
        [x509.NameAttribute(x509.NameOID.COMMON_NAME, "foo.com")]
    )
    valid_from = datetime.datetime(2017, 12, 22)
    valid_until = datetime.datetime(2040, 1, 1)

    builder = x509.CertificateBuilder()
    builder = builder.subject_name(foo_name)
    builder = builder.issuer_name(foo_name)
    builder = builder.serial_number(1)
    builder = builder.public_key(private_key.public_key())
    builder = builder.not_valid_before(valid_from)
    builder = builder.not_valid_after(valid_until)
    return builder
@pytest.fixture
def selfsigned_cert(cert_builder, private_key):
    """
    Return the certificate produced by signing ``cert_builder`` with
    ``private_key`` using SHA-256.
    """
    # The builder's public key comes from this same private key, so the
    # resulting certificate is self-signed.
    digest = hashes.SHA256()
    backend = default_backend()
    return cert_builder.sign(private_key, digest, backend)
@pytest.fixture(scope="function")
def aws_credentials():
    """
    Puts placeholder AWS credentials into ``os.environ`` for one test.

    Each of the four AWS variables is set to ``"testing"`` before the test
    runs. On teardown every variable is restored to the value it had before
    (or removed if it was not set), so the placeholders do not leak into
    tests that did not request this fixture.
    """
    placeholders = {
        "AWS_ACCESS_KEY_ID": "testing",
        "AWS_SECRET_ACCESS_KEY": "testing",
        "AWS_SECURITY_TOKEN": "testing",
        "AWS_SESSION_TOKEN": "testing",
    }
    # Remember what was there so teardown can put it back.
    previous = {name: os.environ.get(name) for name in placeholders}
    os.environ.update(placeholders)

    yield

    for name, old_value in previous.items():
        if old_value is None:
            os.environ.pop(name, None)
        else:
            os.environ[name] = old_value