Closes #278 and #199, Starting transition to marshmallow (#299)

* Closes #278  and #199, Starting transition to marshmallow
This commit is contained in:
kevgliss
2016-05-05 12:52:08 -07:00
parent 941d36ebfe
commit 52f44c3ea6
27 changed files with 1368 additions and 489 deletions

View File

@ -38,6 +38,12 @@ LEMUR_SECURITY_TEAM_EMAIL = []
LOG_LEVEL = "DEBUG"
LOG_FILE = "lemur.log"
LEMUR_DEFAULT_COUNTRY = 'US'
LEMUR_DEFAULT_STATE = 'California'
LEMUR_DEFAULT_LOCATION = 'Los Gatos'
LEMUR_DEFAULT_ORGANIZATION = 'Example, Inc.'
LEMUR_DEFAULT_ORGANIZATIONAL_UNIT = 'Example'
# Database

View File

@ -1,15 +1,15 @@
import os
import pytest
from flask import current_app
from flask.ext.principal import identity_changed, Identity
from lemur import create_app
from lemur.database import db as _db
from lemur.users import service as user_service
from lemur.roles import service as role_service
def pytest_addoption(parser):
parser.addoption("--lemurconfig", help="override the default test config")
parser.addoption("--runslow", action="store_true", help="run slow tests")
from .factories import AuthorityFactory, NotificationFactory, DestinationFactory, \
CertificateFactory, UserFactory, RoleFactory
def pytest_runtest_setup(item):
@ -35,10 +35,7 @@ def app(request):
Creates a new Flask application for a test duration.
Uses application factory `create_app`.
"""
if request.config.getoption('--lemurconfig'):
_app = create_app(request.config.getoption('--lemurconfig'))
else:
_app = create_app(os.path.dirname(os.path.realpath(__file__)) + '/conf.py')
_app = create_app(os.path.dirname(os.path.realpath(__file__)) + '/conf.py')
ctx = _app.app_context()
ctx.push()
@ -54,9 +51,10 @@ def db(app, request):
_db.app = app
user = user_service.create('user', 'test', 'user@example.com', True, None, [])
admin_role = role_service.create('admin')
admin = user_service.create('admin', 'admin', 'admin@example.com', True, None, [admin_role])
UserFactory()
r = RoleFactory(name='admin')
UserFactory(roles=[r])
_db.session.commit()
yield _db
@ -68,10 +66,52 @@ def session(db, request):
for test duration.
"""
db.session.begin_nested()
yield session
yield db.session
db.session.rollback()
@pytest.yield_fixture(scope="function")
def client(app, session, client):
yield client
@pytest.fixture
def authority(session):
a = AuthorityFactory()
session.commit()
return a
@pytest.fixture
def destination(session):
d = DestinationFactory()
session.commit()
return d
@pytest.fixture
def notification(session):
n = NotificationFactory()
session.commit()
return n
@pytest.fixture
def certificate(session):
c = CertificateFactory()
session.commit()
return c
@pytest.yield_fixture(scope="function")
def logged_in_user(app, user):
with app.test_request_context():
identity_changed.send(current_app._get_current_object(), identity=Identity(user.id))
yield
@pytest.yield_fixture(scope="function")
def logged_in_admin(app, admin_user):
with app.test_request_context():
identity_changed.send(current_app._get_current_object(), identity=Identity(admin_user.id))
yield

210
lemur/tests/factories.py Normal file
View File

@ -0,0 +1,210 @@
from datetime import date
from factory import Sequence, post_generation
from factory.alchemy import SQLAlchemyModelFactory
from factory.fuzzy import FuzzyChoice, FuzzyText, FuzzyDate
from lemur.database import db
from lemur.authorities.models import Authority
from lemur.certificates.models import Certificate
from lemur.destinations.models import Destination
from lemur.notifications.models import Notification
from lemur.users.models import User
from lemur.roles.models import Role
from .vectors import INTERNAL_VALID_SAN_STR, PRIVATE_KEY_STR
class BaseFactory(SQLAlchemyModelFactory):
"""Base factory."""
class Meta:
"""Factory configuration."""
abstract = True
sqlalchemy_session = db.session
class AuthorityFactory(BaseFactory):
"""Authority factory."""
name = Sequence(lambda n: 'authority{0}'.format(n))
owner = 'joe@example.com'
plugin_name = 'TheRing'
body = INTERNAL_VALID_SAN_STR
class Meta:
"""Factory configuration."""
model = Authority
@post_generation
def roles(self, create, extracted, **kwargs):
if not create:
return
if extracted:
for role in extracted:
self.roles.append(role)
class CertificateFactory(BaseFactory):
"""Certificate factory."""
name = Sequence(lambda n: 'certificate{0}'.format(n))
chain = INTERNAL_VALID_SAN_STR
body = INTERNAL_VALID_SAN_STR
private_key = PRIVATE_KEY_STR
owner = 'joe@example.com'
status = FuzzyChoice(['valid', 'revoked', 'unknown'])
deleted = False
bits = 2048
issuer = 'Example'
serial = FuzzyText(length=128)
cn = 'test.example.com'
description = FuzzyText(length=128)
active = True
san = 'true'
not_before = FuzzyDate(date(2016, 1, 1), date(2020, 1, 1))
not_after = FuzzyDate(date(2016, 1, 1), date(2020, 1, 1))
date_created = FuzzyDate(date(2016, 1, 1), date(2020, 1, 1))
class Meta:
"""Factory Configuration."""
model = Certificate
@post_generation
def user(self, create, extracted, **kwargs):
if not create:
return
if extracted:
self.user_id = extracted.id
@post_generation
def authority(self, create, extracted, **kwargs):
if not create:
return
if extracted:
self.authority_id = extracted.id
@post_generation
def notifications(self, create, extracted, **kwargs):
if not create:
return
if extracted:
for notification in extracted:
self.notifications.append(notification)
@post_generation
def destinations(self, create, extracted, **kwargs):
if not create:
return
if extracted:
for destination in extracted:
self.destintations.append(destination)
@post_generation
def replaces(self, create, extracted, **kwargs):
if not create:
return
if extracted:
for replace in extracted:
self.replaces.append(replace)
@post_generation
def sources(self, create, extracted, **kwargs):
if not create:
return
if extracted:
for source in extracted:
self.sources.append(source)
@post_generation
def domains(self, create, extracted, **kwargs):
if not create:
return
if extracted:
for domain in extracted:
self.domains.append(domain)
class DestinationFactory(BaseFactory):
"""Destination factory."""
plugin_name = Sequence(lambda n: 'destination{0}'.format(n))
label = Sequence(lambda n: 'destination{0}'.format(n))
class Meta:
"""Factory Configuration."""
model = Destination
class NotificationFactory(BaseFactory):
"""Notification factory."""
plugin_name = Sequence(lambda n: 'notification{0}'.format(n))
label = Sequence(lambda n: 'notification{0}'.format(n))
class Meta:
"""Factory Configuration."""
model = Notification
class RoleFactory(BaseFactory):
"""Role factory."""
name = Sequence(lambda n: 'role{0}'.format(n))
class Meta:
"""Factory Configuration."""
model = Role
@post_generation
def users(self, create, extracted, **kwargs):
if not create:
return
if extracted:
for user in extracted:
self.users.append(user)
class UserFactory(BaseFactory):
"""User Factory."""
username = Sequence(lambda n: 'user{0}'.format(n))
email = Sequence(lambda n: 'user{0}@example.com'.format(n))
active = True
password = FuzzyText(length=24)
class Meta:
"""Factory Configuration."""
model = User
@post_generation
def roles(self, create, extracted, **kwargs):
if not create:
return
if extracted:
for role in extracted:
self.roles.append(role)
@post_generation
def certificates(self, create, extracted, **kwargs):
if not create:
return
if extracted:
for cert in extracted:
self.certificates.append(cert)
@post_generation
def authorities(self, create, extracted, **kwargs):
if not create:
return
if extracted:
for authority in extracted:
self.authorities.append(authority)

View File

@ -1,35 +1,288 @@
from __future__ import unicode_literals # at top of module
import pytest
import json
from lemur.certificates.views import * # noqa
def test_pem_str():
from lemur.tests.certs import INTERNAL_VALID_LONG_STR
assert pem_str(INTERNAL_VALID_LONG_STR, 'test') == INTERNAL_VALID_LONG_STR
with pytest.raises(ValueError):
pem_str('sdfsdfds', 'test')
from .vectors import VALID_ADMIN_HEADER_TOKEN, VALID_USER_HEADER_TOKEN
def test_private_key_str():
from lemur.tests.certs import PRIVATE_KEY_STR
assert private_key_str(PRIVATE_KEY_STR, 'test') == PRIVATE_KEY_STR
def test_authority_identifier_schema():
from lemur.certificates.schemas import AuthorityIdentifierSchema
input_data = {'useAuthorityCert': True}
with pytest.raises(ValueError):
private_key_str('dfsdfsdf', 'test')
data, errors = AuthorityIdentifierSchema().load(input_data)
assert data == {'use_authority_cert': True}
assert not errors
data, errors = AuthorityIdentifierSchema().dumps(data)
assert not errors
assert data == json.dumps(input_data)
def test_create_basic_csr():
def test_authority_key_identifier_schema():
from lemur.certificates.schemas import AuthorityKeyIdentifierSchema
input_data = {'useKeyIdentifier': True}
data, errors = AuthorityKeyIdentifierSchema().load(input_data)
assert data == {'use_key_identifier': True}
assert not errors
data, errors = AuthorityKeyIdentifierSchema().dumps(data)
assert data == json.dumps(input_data)
assert not errors
def test_certificate_info_access_schema():
from lemur.certificates.schemas import CertificateInfoAccessSchema
input_data = {'includeAIA': True}
data, errors = CertificateInfoAccessSchema().load(input_data)
assert not errors
assert data == {'include_aia': True}
data, errors = CertificateInfoAccessSchema().dump(data)
assert not errors
assert data == input_data
def test_subject_key_identifier_schema():
from lemur.certificates.schemas import SubjectKeyIdentifierSchema
input_data = {'includeSKI': True}
data, errors = SubjectKeyIdentifierSchema().load(input_data)
assert not errors
assert data == {'include_ski': True}
data, errors = SubjectKeyIdentifierSchema().dump(data)
assert not errors
assert data == input_data
def test_extension_schema():
from lemur.certificates.schemas import ExtensionSchema
input_data = {
'keyUsage': {
'useKeyEncipherment': True,
'useDigitalSignature': True
},
'extendedKeyUsage': {
'useServerAuthentication': True
},
'subjectKeyIdentifier': {
'includeSKI': True
},
'subAltNames': {
'names': [
{'nameType': 'DNSName', 'value': 'test.example.com'}
]
}
}
data, errors = ExtensionSchema().load(input_data)
assert not errors
def test_certificate_input_schema(client, authority):
from lemur.certificates.schemas import CertificateInputSchema
input_data = {
'commonName': 'test.example.com',
'owner': 'jim@example.com',
'authority': {'id': authority.id},
'description': 'testtestest',
}
data, errors = CertificateInputSchema().load(input_data)
assert not errors
assert data['authority'].id == authority.id
# make sure the defaults got set
assert data['common_name'] == 'test.example.com'
assert data['country'] == 'US'
assert data['location'] == 'Los Gatos'
assert len(data.keys()) == 12
def test_certificate_input_with_extensions(client, authority):
from lemur.certificates.schemas import CertificateInputSchema
input_data = {
'commonName': 'test.example.com',
'owner': 'jim@example.com',
'authority': {'id': authority.id},
'description': 'testtestest',
'extensions': {
'keyUsage': {
'useKeyEncipherment': True,
'useDigitalSignature': True
},
'extendedKeyUsage': {
'useServerAuthentication': True
},
'subjectKeyIdentifier': {
'includeSKI': True
},
'subAltNames': {
'names': [
{'nameType': 'DNSName', 'value': 'test.example.com'}
]
}
}
}
data, errors = CertificateInputSchema().load(input_data)
assert not errors
def test_certificate_out_of_range_date(client, authority):
from lemur.certificates.schemas import CertificateInputSchema
input_data = {
'commonName': 'test.example.com',
'owner': 'jim@example.com',
'authority': {'id': authority.id},
'description': 'testtestest',
'validityYears': 100
}
data, errors = CertificateInputSchema().load(input_data)
assert errors
input_data['validityStart'] = '2017-04-30T00:12:34.513631'
data, errors = CertificateInputSchema().load(input_data)
assert errors
input_data['validityEnd'] = '2018-04-30T00:12:34.513631'
data, errors = CertificateInputSchema().load(input_data)
assert errors
def test_certificate_valid_years(client, authority):
from lemur.certificates.schemas import CertificateInputSchema
input_data = {
'commonName': 'test.example.com',
'owner': 'jim@example.com',
'authority': {'id': authority.id},
'description': 'testtestest',
'validityYears': 3
}
data, errors = CertificateInputSchema().load(input_data)
assert not errors
def test_certificate_valid_dates(client, authority):
from lemur.certificates.schemas import CertificateInputSchema
input_data = {
'commonName': 'test.example.com',
'owner': 'jim@example.com',
'authority': {'id': authority.id},
'description': 'testtestest',
'validityStart': '2017-04-30T00:12:34.513631',
'validityEnd': '2018-04-30T00:12:34.513631'
}
data, errors = CertificateInputSchema().load(input_data)
assert not errors
def test_sub_alt_name_schema():
from lemur.certificates.schemas import SubAltNameSchema, SubAltNamesSchema
input_data = {'nameType': 'DNSName', 'value': 'test.example.com'}
data, errors = SubAltNameSchema().load(input_data)
assert not errors
assert data == {'name_type': 'DNSName', 'value': 'test.example.com'}
data, errors = SubAltNameSchema().dumps(data)
assert data == json.dumps(input_data)
assert not errors
input_datas = {'names': [input_data]}
data, errors = SubAltNamesSchema().load(input_datas)
assert not errors
assert data == {'names': [{'name_type': 'DNSName', 'value': 'test.example.com'}]}
data, errors = SubAltNamesSchema().dumps(data)
assert data == json.dumps(input_datas)
assert not errors
def test_key_usage_schema():
from lemur.certificates.schemas import KeyUsageSchema
input_data = {
'useCRLSign': True,
'useDataEncipherment': True,
'useDecipherOnly': True,
'useEncipherOnly': True,
'useKeyEncipherment': True,
'useDigitalSignature': True,
'useNonRepudiation': True
}
data, errors = KeyUsageSchema().load(input_data)
assert not errors
assert data == {
'use_crl_sign': True,
'use_data_encipherment': True,
'use_decipher_only': True,
'use_encipher_only': True,
'use_key_encipherment': True,
'use_digital_signature': True,
'use_non_repudiation': True
}
def test_extended_key_usage_schema():
from lemur.certificates.schemas import ExtendedKeyUsageSchema
input_data = {
'useServerAuthentication': True,
'useClientAuthentication': True,
'useEapOverLAN': True,
'useEapOverPPP': True,
'useOCSPSigning': True,
'useSmartCardAuthentication': True,
'useTimestamping': True
}
data, errors = ExtendedKeyUsageSchema().load(input_data)
assert not errors
assert data == {
'use_server_authentication': True,
'use_client_authentication': True,
'use_eap_over_lan': True,
'use_eap_over_ppp': True,
'use_ocsp_signing': True,
'use_smart_card_authentication': True,
'use_timestamping': True
}
def test_create_basic_csr(client):
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from lemur.certificates.service import create_csr
csr_config = dict(
commonName='example.com',
common_name='example.com',
organization='Example, Inc.',
organizationalUnit='Operations',
organizational_unit='Operations',
country='US',
state='CA',
location='A place',
extensions=dict(names=dict(subAltNames=['test.example.com', 'test2.example.com']))
extensions=dict(names=dict(sub_alt_names=['test.example.com', 'test2.example.com']))
)
csr, pem = create_csr(csr_config)
@ -39,61 +292,61 @@ def test_create_basic_csr():
assert name.value in csr_config.values()
def test_cert_get_cn():
from lemur.tests.certs import INTERNAL_VALID_LONG_CERT
def test_cert_get_cn(client):
from .vectors import INTERNAL_VALID_LONG_CERT
from lemur.certificates.models import get_cn
assert get_cn(INTERNAL_VALID_LONG_CERT) == 'long.lived.com'
def test_cert_get_subAltDomains():
from lemur.tests.certs import INTERNAL_VALID_SAN_CERT, INTERNAL_VALID_LONG_CERT
def test_cert_get_sub_alt_domains(client):
from .vectors import INTERNAL_VALID_SAN_CERT, INTERNAL_VALID_LONG_CERT
from lemur.certificates.models import get_domains
assert get_domains(INTERNAL_VALID_LONG_CERT) == []
assert get_domains(INTERNAL_VALID_SAN_CERT) == ['example2.long.com', 'example3.long.com']
def test_cert_is_san():
from lemur.tests.certs import INTERNAL_VALID_SAN_CERT, INTERNAL_VALID_LONG_CERT
def test_cert_is_san(client):
from .vectors import INTERNAL_VALID_SAN_CERT, INTERNAL_VALID_LONG_CERT
from lemur.certificates.models import is_san
assert is_san(INTERNAL_VALID_LONG_CERT) == None # noqa
assert is_san(INTERNAL_VALID_SAN_CERT) == True # noqa
assert not is_san(INTERNAL_VALID_LONG_CERT)
assert is_san(INTERNAL_VALID_SAN_CERT)
def test_cert_is_wildcard():
from lemur.tests.certs import INTERNAL_VALID_WILDCARD_CERT, INTERNAL_VALID_LONG_CERT
def test_cert_is_wildcard(client):
from .vectors import INTERNAL_VALID_WILDCARD_CERT, INTERNAL_VALID_LONG_CERT
from lemur.certificates.models import is_wildcard
assert is_wildcard(INTERNAL_VALID_WILDCARD_CERT) == True # noqa
assert is_wildcard(INTERNAL_VALID_LONG_CERT) == None # noqa
assert is_wildcard(INTERNAL_VALID_WILDCARD_CERT)
assert not is_wildcard(INTERNAL_VALID_LONG_CERT)
def test_cert_get_bitstrength():
from lemur.tests.certs import INTERNAL_VALID_LONG_CERT
def test_cert_get_bitstrength(client):
from .vectors import INTERNAL_VALID_LONG_CERT
from lemur.certificates.models import get_bitstrength
assert get_bitstrength(INTERNAL_VALID_LONG_CERT) == 2048
def test_cert_get_issuer():
from lemur.tests.certs import INTERNAL_VALID_LONG_CERT
def test_cert_get_issuer(client):
from .vectors import INTERNAL_VALID_LONG_CERT
from lemur.certificates.models import get_issuer
assert get_issuer(INTERNAL_VALID_LONG_CERT) == 'Example'
def test_get_name_from_arn():
def test_get_name_from_arn(client):
from lemur.certificates.models import get_name_from_arn
arn = 'arn:aws:iam::11111111:server-certificate/mycertificate'
assert get_name_from_arn(arn) == 'mycertificate'
def test_get_account_number():
def test_get_account_number(client):
from lemur.certificates.models import get_account_number
arn = 'arn:aws:iam::11111111:server-certificate/mycertificate'
assert get_account_number(arn) == '11111111'
def test_create_name():
def test_create_name(client):
from lemur.certificates.models import create_name
from datetime import datetime
assert create_name(
@ -112,203 +365,181 @@ def test_create_name():
) == 'SAN-example.com-ExampleInc-20150507-20150512'
def test_certificate_get(client):
assert client.get(api.url_for(Certificates, certificate_id=1)).status_code == 401
def test_certificate_post(client):
assert client.post(api.url_for(Certificates, certificate_id=1), data={}).status_code == 405
def test_certificate_put(client):
assert client.put(api.url_for(Certificates, certificate_id=1), data={}).status_code == 401
def test_certificate_delete(client):
assert client.delete(api.url_for(Certificates, certificate_id=1)).status_code == 405
def test_certificate_patch(client):
assert client.patch(api.url_for(Certificates, certificate_id=1), data={}).status_code == 405
def test_certificates_get(client):
assert client.get(api.url_for(CertificatesList)).status_code == 401
def test_certificates_post(client):
assert client.post(api.url_for(CertificatesList), data={}).status_code == 401
def test_certificates_put(client):
assert client.put(api.url_for(CertificatesList), data={}).status_code == 405
def test_certificates_delete(client):
assert client.delete(api.url_for(CertificatesList)).status_code == 405
def test_certificates_patch(client):
assert client.patch(api.url_for(CertificatesList), data={}).status_code == 405
def test_certificate_credentials_get(client):
assert client.get(api.url_for(CertificatePrivateKey, certificate_id=1)).status_code == 401
def test_certificate_credentials_post(client):
assert client.post(api.url_for(CertificatePrivateKey, certificate_id=1), data={}).status_code == 405
def test_certificate_credentials_put(client):
assert client.put(api.url_for(CertificatePrivateKey, certificate_id=1), data={}).status_code == 405
def test_certificate_credentials_delete(client):
assert client.delete(api.url_for(CertificatePrivateKey, certificate_id=1)).status_code == 405
def test_certificate_credentials_patch(client):
assert client.patch(api.url_for(CertificatePrivateKey, certificate_id=1), data={}).status_code == 405
def test_certificates_upload_get(client):
assert client.get(api.url_for(CertificatesUpload)).status_code == 405
def test_certificates_upload_post(client):
assert client.post(api.url_for(CertificatesUpload), data={}).status_code == 401
def test_certificates_upload_put(client):
assert client.put(api.url_for(CertificatesUpload), data={}).status_code == 405
def test_certificates_upload_delete(client):
assert client.delete(api.url_for(CertificatesUpload)).status_code == 405
def test_certificates_upload_patch(client):
assert client.patch(api.url_for(CertificatesUpload), data={}).status_code == 405
VALID_USER_HEADER_TOKEN = {
'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyMzMzNjksInN1YiI6MSwiZXhwIjoxNTIxNTQ2OTY5fQ.1qCi0Ip7mzKbjNh0tVd3_eJOrae3rNa_9MCVdA4WtQI'}
def test_auth_certificate_get(client):
assert client.get(api.url_for(Certificates, certificate_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 200
def test_auth_certificate_post_(client):
assert client.post(api.url_for(Certificates, certificate_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_certificate_put(client):
assert client.put(api.url_for(Certificates, certificate_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 400
def test_auth_certificate_delete(client):
assert client.delete(api.url_for(Certificates, certificate_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_certificate_patch(client):
assert client.patch(api.url_for(Certificates, certificate_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_certificates_get(client):
assert client.get(api.url_for(CertificatesList), headers=VALID_USER_HEADER_TOKEN).status_code == 200
def test_auth_certificates_post(client):
assert client.post(api.url_for(CertificatesList), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 400
def test_auth_certificate_credentials_get(client):
assert client.get(api.url_for(CertificatePrivateKey, certificate_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 404
def test_auth_certificate_credentials_post(client):
assert client.post(api.url_for(CertificatePrivateKey, certificate_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_certificate_credentials_put(client):
assert client.put(api.url_for(CertificatePrivateKey, certificate_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_certificate_credentials_delete(client):
assert client.delete(api.url_for(CertificatePrivateKey, certificate_id=1), headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_certificate_credentials_patch(client):
assert client.patch(api.url_for(CertificatePrivateKey, certificate_id=1), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_certificates_upload_get(client):
assert client.get(api.url_for(CertificatesUpload), headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_certificates_upload_post(client):
assert client.post(api.url_for(CertificatesUpload), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 400
def test_auth_certificates_upload_put(client):
assert client.put(api.url_for(CertificatesUpload), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_certificates_upload_delete(client):
assert client.delete(api.url_for(CertificatesUpload), headers=VALID_USER_HEADER_TOKEN).status_code == 405
def test_auth_certificates_upload_patch(client):
assert client.patch(api.url_for(CertificatesUpload), data={}, headers=VALID_USER_HEADER_TOKEN).status_code == 405
VALID_ADMIN_HEADER_TOKEN = {
'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyNTAyMTgsInN1YiI6MiwiZXhwIjoxNTIxNTYzODE4fQ.6mbq4-Ro6K5MmuNiTJBB153RDhlM5LGJBjI7GBKkfqA'}
def test_admin_certificate_get(client):
assert client.get(api.url_for(Certificates, certificate_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 200
def test_admin_certificate_post(client):
assert client.post(api.url_for(Certificates, certificate_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_certificate_put(client):
assert client.put(api.url_for(Certificates, certificate_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 400
def test_admin_certificate_delete(client):
assert client.delete(api.url_for(Certificates, certificate_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_certificate_patch(client):
assert client.patch(api.url_for(Certificates, certificate_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_certificates_get(client):
resp = client.get(api.url_for(CertificatesList), headers=VALID_ADMIN_HEADER_TOKEN)
assert resp.status_code == 200
assert resp.json['total'] == 0
def test_admin_certificate_credentials_get(client):
assert client.get(api.url_for(CertificatePrivateKey, certificate_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 404
def test_admin_certificate_credentials_post(client):
assert client.post(api.url_for(CertificatePrivateKey, certificate_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_certificate_credentials_put(client):
assert client.put(api.url_for(CertificatePrivateKey, certificate_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_certificate_credentials_delete(client):
assert client.delete(api.url_for(CertificatePrivateKey, certificate_id=1), headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
def test_admin_certificate_credentials_patch(client):
assert client.patch(api.url_for(CertificatePrivateKey, certificate_id=1), data={}, headers=VALID_ADMIN_HEADER_TOKEN).status_code == 405
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 404),
(VALID_ADMIN_HEADER_TOKEN, 404),
('', 401)
])
def test_certificate_get(client, token, status):
assert client.get(api.url_for(Certificates, certificate_id=1), headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405),
('', 405)
])
def test_certificate_post(client, token, status):
assert client.post(api.url_for(Certificates, certificate_id=1), data={}, headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 400),
(VALID_ADMIN_HEADER_TOKEN, 400),
('', 401)
])
def test_certificate_put(client, token, status):
assert client.put(api.url_for(Certificates, certificate_id=1), data={}, headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405),
('', 405)
])
def test_certificate_delete(client, token, status):
assert client.delete(api.url_for(Certificates, certificate_id=1), headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405),
('', 405)
])
def test_certificate_patch(client, token, status):
assert client.patch(api.url_for(Certificates, certificate_id=1), data={}, headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 200),
(VALID_ADMIN_HEADER_TOKEN, 200),
('', 401)
])
def test_certificates_get(client, token, status):
assert client.get(api.url_for(CertificatesList), headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 400),
(VALID_ADMIN_HEADER_TOKEN, 400),
('', 401)
])
def test_certificates_post(client, token, status):
assert client.post(api.url_for(CertificatesList), data={}, headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405),
('', 405)
])
def test_certificates_put(client, token, status):
assert client.put(api.url_for(CertificatesList), data={}, headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405),
('', 405)
])
def test_certificates_delete(client, token, status):
assert client.delete(api.url_for(CertificatesList), headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405),
('', 405)
])
def test_certificates_patch(client, token, status):
assert client.patch(api.url_for(CertificatesList), data={}, headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 404),
(VALID_ADMIN_HEADER_TOKEN, 404),
('', 401)
])
def test_certificate_credentials_get(client, token, status):
assert client.get(api.url_for(CertificatePrivateKey, certificate_id=1), headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405),
('', 405)
])
def test_certificate_credentials_post(client, token, status):
assert client.post(api.url_for(CertificatePrivateKey, certificate_id=1), data={}, headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405),
('', 405)
])
def test_certificate_credentials_put(client, token, status):
assert client.put(api.url_for(CertificatePrivateKey, certificate_id=1), data={}, headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405),
('', 405)
])
def test_certificate_credentials_delete(client, token, status):
assert client.delete(api.url_for(CertificatePrivateKey, certificate_id=1), headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405),
('', 405)
])
def test_certificate_credentials_patch(client, token, status):
assert client.patch(api.url_for(CertificatePrivateKey, certificate_id=1), data={}, headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405),
('', 405)
])
def test_certificates_upload_get(client, token, status):
assert client.get(api.url_for(CertificatesUpload), headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 400),
(VALID_ADMIN_HEADER_TOKEN, 400),
('', 401)
])
def test_certificates_upload_post(client, token, status):
assert client.post(api.url_for(CertificatesUpload), data={}, headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405),
('', 405)
])
def test_certificates_upload_put(client, token, status):
assert client.put(api.url_for(CertificatesUpload), data={}, headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405),
('', 405)
])
def test_certificates_upload_delete(client, token, status):
assert client.delete(api.url_for(CertificatesUpload), headers=token).status_code == status
@pytest.mark.parametrize("token,status", [
(VALID_USER_HEADER_TOKEN, 405),
(VALID_ADMIN_HEADER_TOKEN, 405),
('', 405)
])
def test_certificates_upload_patch(client, token, status):
assert client.patch(api.url_for(CertificatesUpload), data={}, headers=token).status_code == status

View File

@ -1,6 +1,14 @@
from cryptography import x509
from cryptography.hazmat.backends import default_backend
VALID_USER_HEADER_TOKEN = {
'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyMzMzNjksInN1YiI6MSwiZXhwIjoxNTIxNTQ2OTY5fQ.1qCi0Ip7mzKbjNh0tVd3_eJOrae3rNa_9MCVdA4WtQI'}
VALID_ADMIN_HEADER_TOKEN = {
'Authorization': 'Basic ' + 'eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJpYXQiOjE0MzUyNTAyMTgsInN1YiI6MiwiZXhwIjoxNTIxNTYzODE4fQ.6mbq4-Ro6K5MmuNiTJBB153RDhlM5LGJBjI7GBKkfqA'}
INTERNAL_VALID_LONG_STR = b"""
-----BEGIN CERTIFICATE-----
MIID1zCCAr+gAwIBAgIBATANBgkqhkiG9w0BAQsFADCBjDELMAkGA1UEBhMCVVMx